How to prevent a block of code from being interrupted by KeyboardInterrupt in Python?

Each Answer to this Q is separated by one/two green lines.

I’m writing a program that caches some results via the pickle module. What happens at the moment is that if I hit ctrl-c at while the dump operation is occurring, dump gets interrupted and the resulting file is corrupted (i.e. only partially written, so it cannot be loaded again.

Is there a way to make dump, or in general a block of code, uninterruptable? My current workaround looks something like this:

try:
  file = open(path, 'w')
  dump(obj, file)
  file.close()
except KeyboardInterrupt:
  file.close()
  file.open(path,'w')
  dump(obj, file)
  file.close()
  raise

It seems silly to restart the operation if it is interrupted, so I am searching for a way to defer the interrupt. How do I do this?

The following is a context manager that attaches a signal handler for SIGINT. If the context manager’s signal handler is called, the signal is delayed by only passing the signal to the original handler when the context manager exits.

import signal
import logging

class DelayedKeyboardInterrupt:

    def __enter__(self):
        self.signal_received = False
        self.old_handler = signal.signal(signal.SIGINT, self.handler)
                
    def handler(self, sig, frame):
        self.signal_received = (sig, frame)
        logging.debug('SIGINT received. Delaying KeyboardInterrupt.')
    
    def __exit__(self, type, value, traceback):
        signal.signal(signal.SIGINT, self.old_handler)
        if self.signal_received:
            self.old_handler(*self.signal_received)

with DelayedKeyboardInterrupt():
    # stuff here will not be interrupted by SIGINT
    critical_code()

Put the function in a thread, and wait for the thread to finish.

Python threads cannot be interrupted except with a special C api.

import time
from threading import Thread

def noInterrupt():
    for i in xrange(4):
        print i
        time.sleep(1)

a = Thread(target=noInterrupt)
a.start()
a.join()
print "done"


0
1
2
3
Traceback (most recent call last):
  File "C:\Users\Admin\Desktop\test.py", line 11, in <module>
    a.join()
  File "C:\Python26\lib\threading.py", line 634, in join
    self.__block.wait()
  File "C:\Python26\lib\threading.py", line 237, in wait
    waiter.acquire()
KeyboardInterrupt

See how the interrupt was deferred until the thread finished?

Here it is adapted to your use:

import time
from threading import Thread

def noInterrupt(path, obj):
    try:
        file = open(path, 'w')
        dump(obj, file)
    finally:
        file.close()

a = Thread(target=noInterrupt, args=(path,obj))
a.start()
a.join()

Use the signal module to disable SIGINT for the duration of the process:

s = signal.signal(signal.SIGINT, signal.SIG_IGN)
do_important_stuff()
signal.signal(signal.SIGINT, s)

In my opinion using threads for this is an overkill. You can make sure the file is being saved correctly by simply doing it in a loop until a successful write was done:

def saveToFile(obj, filename):
    file = open(filename, 'w')
    cPickle.dump(obj, file)
    file.close()
    return True

done = False
while not done:
    try:
        done = saveToFile(obj, 'file')
    except KeyboardInterrupt:
        print 'retry'
        continue

This question is about blocking the KeyboardInterrupt, but for this situation I find atomic file writing to be cleaner and provide additional protection.

With atomic writes either the entire file gets written correctly, or nothing does. Stackoverflow has a variety of solutions, but personally I like just using atomicwrites library.

After running pip install atomicwrites, just use it like this:

from atomicwrites import atomic_write

with atomic_write(path, overwrite=True) as file:
    dump(obj, file)

I’ve been thinking a lot about the criticisms of the answers to this question, and I believe I have implemented a better solution, which is used like so:

with signal_fence(signal.SIGINT):
  file = open(path, 'w')
  dump(obj, file)
  file.close()

The signal_fence context manager is below, followed by an explanation of its improvements on the previous answers. The docstring of this function documents its interface and guarantees.

import os
import signal
from contextlib import contextmanager
from types import FrameType
from typing import Callable, Iterator, Optional, Tuple
from typing_extensions import assert_never


@contextmanager
def signal_fence(
    signum: signal.Signals,
    *,
    on_deferred_signal: Callable[[int, Optional[FrameType]], None] = None,
) -> Iterator[None]:
    """
    A `signal_fence` creates an uninterruptible "fence" around a block of code. The
    fence defers a specific signal received inside of the fence until the fence is
    destroyed, at which point the original signal handler is called with the deferred
    signal. Multiple deferred signals will result in a single call to the original
    handler. An optional callback `on_deferred_signal` may be specified which will be
    called each time a signal is handled while the fence is active, and can be used
    to print a message or record the signal.

    A `signal_fence` guarantees the following with regards to exception-safety:

    1. If an exception occurs prior to creating the fence (installing a custom signal
    handler), the exception will bubble up as normal. The code inside of the fence will
    not run.
    2. If an exception occurs after creating the fence, including in the fenced code,
    the original signal handler will always be restored before the exception bubbles up.
    3. If an exception occurs while the fence is calling the original signal handler on
    destruction, the original handler may not be called, but the original handler will
    be restored. The exception will bubble up and can be detected by calling code.
    4. If an exception occurs while the fence is restoring the original signal handler
    (exceedingly rare), the original signal handler will be restored regardless.
    5. No guarantees about the fence's behavior are made if exceptions occur while
    exceptions are being handled.

    A `signal_fence` can only be used on the main thread, or else a `ValueError` will
    raise when entering the fence.
    """
    handled: Optional[Tuple[int, Optional[FrameType]]] = None

    def handler(signum: int, frame: Optional[FrameType]) -> None:
        nonlocal handled
        if handled is None:
            handled = (signum, frame)
        if on_deferred_signal is not None:
            try:
                on_deferred_signal(signum, frame)
            except:
                pass

    # https://docs.python.org/3/library/signal.html#signal.getsignal
    original_handler = signal.getsignal(signum)
    if original_handler is None:
        raise TypeError(
            "signal_fence cannot be used with signal handlers that were not installed"
            " from Python"
        )
    if isinstance(original_handler, int) and not isinstance(
        original_handler, signal.Handlers
    ):
        raise NotImplementedError(
            "Your Python interpreter's signal module is using raw integers to"
            " represent SIG_IGN and SIG_DFL, which shouldn't be possible!"
        )

    # N.B. to best guarantee the original handler is restored, the @contextmanager
    #      decorator is used rather than a class with __enter__/__exit__ methods so
    #      that the installation of the new handler can be done inside of a try block,
    #      whereas per [PEP 343](https://www.python.org/dev/peps/pep-0343/) the
    #      __enter__ call is not guaranteed to have a corresponding __exit__ call if an
    #      exception interleaves
    try:
        try:
            signal.signal(signum, handler)
            yield
        finally:
            if handled is not None:
                if isinstance(original_handler, signal.Handlers):
                    if original_handler is signal.Handlers.SIG_IGN:
                        pass
                    elif original_handler is signal.Handlers.SIG_DFL:
                        signal.signal(signum, signal.SIG_DFL)
                        os.kill(os.getpid(), signum)
                    else:
                        assert_never(original_handler)
                elif callable(original_handler):
                    original_handler(*handled)
                else:
                    assert_never(original_handler)
            signal.signal(signum, original_handler)
    except:
        signal.signal(signum, original_handler)
        raise

First, why not use a thread (accepted answer)? Running code in a non-daemon thread does guarantee that the thread will be joined on interpreter shutdown, but any exception on the main thread (e.g. KeyboardInterrupt) will not prevent the main thread from continuing to execute. Consider what would happen if the thread method is using some data that the main thread mutates in a finally block after the KeyboardInterrupt.

Second, to address @benrg‘s feedback on the most upvoted answer using a context manager:

  1. if an exception is raised after signal is called but before __enter__ returns, the signal will be permanently blocked;

My solution avoids this bug by using a generator context manager with the aid of the @contextmanager decorator. See the full comment in the code above for more details.

  1. this code may call third-party exception handlers in threads other than the main thread, which CPython never does;

I don’t think this bug is real. signal.signal is required to be called from the main thread, and raises ValueError otherwise. These context managers can only run on the main thread, and thus will only call third-party exception handlers from the main thread.

  1. if signal returns a non-callable value, __exit__ will crash

My solution handles all possible values of the signal handler and calls them appropriately. Additionally I use assert_never to benefit from exhaustiveness checking in static analyzers.

Do note that signal_fence is designed to handle one interruption on the main thread such as a KeyboardInterrupt. If your user is spamming Ctrl+C while the signal handler is being restored, not much can save you. This is unlikely given the relatively few opcodes that need to execute to restore the handler, but it’s possible.

I’ve fully unit tested this code and have confidence it is the most robust solution so far.

A generic approach would be to use a context manager that accepts a set of signal to suspend:

import signal

from contextlib import contextmanager


@contextmanager
def suspended_signals(*signals):
    """
    Suspends signal handling execution
    """
    signal.pthread_sigmask(signal.SIG_BLOCK, set(signals))
    try:
        yield None
    finally:
        signal.pthread_sigmask(signal.SIG_UNBLOCK, set(signals))


The answers/resolutions are collected from stackoverflow, are licensed under cc by-sa 2.5 , cc by-sa 3.0 and cc by-sa 4.0 .