| OLD | NEW |
| (Empty) |
| 1 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
| 2 # See LICENSE for details. | |
| 3 | |
| 4 import sys, pickle | |
| 5 | |
| 6 try: | |
| 7 import threading | |
| 8 except ImportError: | |
| 9 threading = None | |
| 10 | |
| 11 from twisted.trial import unittest | |
| 12 from twisted.python import threadable | |
| 13 from twisted.internet import defer, reactor | |
| 14 | |
| 15 class TestObject: | |
| 16 synchronized = ['aMethod'] | |
| 17 | |
| 18 x = -1 | |
| 19 y = 1 | |
| 20 | |
| 21 def aMethod(self): | |
| 22 for i in xrange(10): | |
| 23 self.x, self.y = self.y, self.x | |
| 24 self.z = self.x + self.y | |
| 25 assert self.z == 0, "z == %d, not 0 as expected" % (self.z,) | |
| 26 | |
| 27 threadable.synchronize(TestObject) | |
| 28 | |
| 29 class SynchronizationTestCase(unittest.TestCase): | |
| 30 if hasattr(sys, 'getcheckinterval'): | |
| 31 def setUpClass(self): | |
| 32 self.checkInterval = sys.getcheckinterval() | |
| 33 sys.setcheckinterval(7) | |
| 34 | |
| 35 def tearDownClass(self): | |
| 36 sys.setcheckinterval(self.checkInterval) | |
| 37 | |
| 38 | |
| 39 def setUp(self): | |
| 40 # XXX This is a trial hack. We need to make sure the reactor | |
| 41 # actually *starts* for isInIOThread() to have a meaningful result. | |
| 42 # Returning a Deferred here should force that to happen, if it has | |
| 43 # not happened already. In the future, this should not be | |
| 44 # necessary. | |
| 45 d = defer.Deferred() | |
| 46 reactor.callLater(0, d.callback, None) | |
| 47 return d | |
| 48 | |
| 49 | |
| 50 def testIsInIOThread(self): | |
| 51 foreignResult = [] | |
| 52 t = threading.Thread(target=lambda: foreignResult.append(threadable.isIn
IOThread())) | |
| 53 t.start() | |
| 54 t.join() | |
| 55 self.failIf(foreignResult[0], "Non-IO thread reported as IO thread") | |
| 56 self.failUnless(threadable.isInIOThread(), "IO thread reported as not IO
thread") | |
| 57 | |
| 58 | |
| 59 def testThreadedSynchronization(self): | |
| 60 o = TestObject() | |
| 61 | |
| 62 errors = [] | |
| 63 | |
| 64 def callMethodLots(): | |
| 65 try: | |
| 66 for i in xrange(1000): | |
| 67 o.aMethod() | |
| 68 except AssertionError, e: | |
| 69 errors.append(str(e)) | |
| 70 | |
| 71 threads = [] | |
| 72 for x in range(5): | |
| 73 t = threading.Thread(target=callMethodLots) | |
| 74 threads.append(t) | |
| 75 t.start() | |
| 76 | |
| 77 for t in threads: | |
| 78 t.join() | |
| 79 | |
| 80 if errors: | |
| 81 raise unittest.FailTest(errors) | |
| 82 | |
| 83 def testUnthreadedSynchronization(self): | |
| 84 o = TestObject() | |
| 85 for i in xrange(1000): | |
| 86 o.aMethod() | |
| 87 | |
| 88 class SerializationTestCase(unittest.TestCase): | |
| 89 def testPickling(self): | |
| 90 lock = threadable.XLock() | |
| 91 lockType = type(lock) | |
| 92 lockPickle = pickle.dumps(lock) | |
| 93 newLock = pickle.loads(lockPickle) | |
| 94 self.failUnless(isinstance(newLock, lockType)) | |
| 95 | |
| 96 def testUnpickling(self): | |
| 97 lockPickle = 'ctwisted.python.threadable\nunpickle_lock\np0\n(tp1\nRp2\n
.' | |
| 98 lock = pickle.loads(lockPickle) | |
| 99 newPickle = pickle.dumps(lock, 2) | |
| 100 newLock = pickle.loads(newPickle) | |
| 101 | |
| 102 if threading is None: | |
| 103 SynchronizationTestCase.testThreadedSynchronization.skip = "Platform lacks t
hread support" | |
| 104 SerializationTestCase.testPickling.skip = "Platform lacks thread support" | |
| OLD | NEW |