| OLD | NEW |
| (Empty) |
| 1 import signal | |
| 2 import weakref | |
| 3 | |
| 4 from unittest2.compatibility import wraps | |
| 5 | |
| 6 __unittest = True | |
| 7 | |
| 8 | |
| 9 class _InterruptHandler(object): | |
| 10 def __init__(self, default_handler): | |
| 11 self.called = False | |
| 12 self.default_handler = default_handler | |
| 13 | |
| 14 def __call__(self, signum, frame): | |
| 15 installed_handler = signal.getsignal(signal.SIGINT) | |
| 16 if installed_handler is not self: | |
| 17 # if we aren't the installed handler, then delegate immediately | |
| 18 # to the default handler | |
| 19 self.default_handler(signum, frame) | |
| 20 | |
| 21 if self.called: | |
| 22 self.default_handler(signum, frame) | |
| 23 self.called = True | |
| 24 for result in _results.keys(): | |
| 25 result.stop() | |
| 26 | |
| 27 _results = weakref.WeakKeyDictionary() | |
| 28 def registerResult(result): | |
| 29 _results[result] = 1 | |
| 30 | |
| 31 def removeResult(result): | |
| 32 return bool(_results.pop(result, None)) | |
| 33 | |
| 34 _interrupt_handler = None | |
| 35 def installHandler(): | |
| 36 global _interrupt_handler | |
| 37 if _interrupt_handler is None: | |
| 38 default_handler = signal.getsignal(signal.SIGINT) | |
| 39 _interrupt_handler = _InterruptHandler(default_handler) | |
| 40 signal.signal(signal.SIGINT, _interrupt_handler) | |
| 41 | |
| 42 | |
| 43 def removeHandler(method=None): | |
| 44 if method is not None: | |
| 45 @wraps(method) | |
| 46 def inner(*args, **kwargs): | |
| 47 initial = signal.getsignal(signal.SIGINT) | |
| 48 removeHandler() | |
| 49 try: | |
| 50 return method(*args, **kwargs) | |
| 51 finally: | |
| 52 signal.signal(signal.SIGINT, initial) | |
| 53 return inner | |
| 54 | |
| 55 global _interrupt_handler | |
| 56 if _interrupt_handler is not None: | |
| 57 signal.signal(signal.SIGINT, _interrupt_handler.default_handler) | |
| OLD | NEW |