Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(50)

Side by Side Diff: third_party/twisted_8_1/twisted/test/test_threadable.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # Copyright (c) 2001-2004 Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 import sys, pickle
5
6 try:
7 import threading
8 except ImportError:
9 threading = None
10
11 from twisted.trial import unittest
12 from twisted.python import threadable
13 from twisted.internet import defer, reactor
14
15 class TestObject:
16 synchronized = ['aMethod']
17
18 x = -1
19 y = 1
20
21 def aMethod(self):
22 for i in xrange(10):
23 self.x, self.y = self.y, self.x
24 self.z = self.x + self.y
25 assert self.z == 0, "z == %d, not 0 as expected" % (self.z,)
26
27 threadable.synchronize(TestObject)
28
29 class SynchronizationTestCase(unittest.TestCase):
30 if hasattr(sys, 'getcheckinterval'):
31 def setUpClass(self):
32 self.checkInterval = sys.getcheckinterval()
33 sys.setcheckinterval(7)
34
35 def tearDownClass(self):
36 sys.setcheckinterval(self.checkInterval)
37
38
39 def setUp(self):
40 # XXX This is a trial hack. We need to make sure the reactor
41 # actually *starts* for isInIOThread() to have a meaningful result.
42 # Returning a Deferred here should force that to happen, if it has
43 # not happened already. In the future, this should not be
44 # necessary.
45 d = defer.Deferred()
46 reactor.callLater(0, d.callback, None)
47 return d
48
49
50 def testIsInIOThread(self):
51 foreignResult = []
52 t = threading.Thread(target=lambda: foreignResult.append(threadable.isIn IOThread()))
53 t.start()
54 t.join()
55 self.failIf(foreignResult[0], "Non-IO thread reported as IO thread")
56 self.failUnless(threadable.isInIOThread(), "IO thread reported as not IO thread")
57
58
59 def testThreadedSynchronization(self):
60 o = TestObject()
61
62 errors = []
63
64 def callMethodLots():
65 try:
66 for i in xrange(1000):
67 o.aMethod()
68 except AssertionError, e:
69 errors.append(str(e))
70
71 threads = []
72 for x in range(5):
73 t = threading.Thread(target=callMethodLots)
74 threads.append(t)
75 t.start()
76
77 for t in threads:
78 t.join()
79
80 if errors:
81 raise unittest.FailTest(errors)
82
83 def testUnthreadedSynchronization(self):
84 o = TestObject()
85 for i in xrange(1000):
86 o.aMethod()
87
88 class SerializationTestCase(unittest.TestCase):
89 def testPickling(self):
90 lock = threadable.XLock()
91 lockType = type(lock)
92 lockPickle = pickle.dumps(lock)
93 newLock = pickle.loads(lockPickle)
94 self.failUnless(isinstance(newLock, lockType))
95
96 def testUnpickling(self):
97 lockPickle = 'ctwisted.python.threadable\nunpickle_lock\np0\n(tp1\nRp2\n .'
98 lock = pickle.loads(lockPickle)
99 newPickle = pickle.dumps(lock, 2)
100 newLock = pickle.loads(newPickle)
101
102 if threading is None:
103 SynchronizationTestCase.testThreadedSynchronization.skip = "Platform lacks t hread support"
104 SerializationTestCase.testPickling.skip = "Platform lacks thread support"
OLDNEW
« no previous file with comments | « third_party/twisted_8_1/twisted/test/test_text.py ('k') | third_party/twisted_8_1/twisted/test/test_threadpool.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698