Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(403)

Side by Side Diff: third_party/twisted_8_1/twisted/test/test_reflector.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # Copyright (c) 2001-2007 Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4 """
5 Tests for twisted.enterprise reflectors.
6 """
7
8 import random
9
10 from twisted.internet import reactor, interfaces, defer
11 from twisted.enterprise.row import RowObject
12 from twisted.enterprise.reflector import EQUAL
13 from twisted.enterprise.sqlreflector import SQLReflector
14 from twisted.enterprise import util
15 from twisted.test.test_adbapi import makeSQLTests
16 from twisted.trial.util import suppress as suppressWarning
17 from twisted.trial import unittest
18
19
20 tableName = "testTable"
21 childTableName = "childTable"
22
23
24 class TestRow(RowObject):
25 rowColumns = [("key_string", "varchar"),
26 ("col2", "int"),
27 ("another_column", "varchar"),
28 ("Column4", "varchar"),
29 ("column_5_", "int")]
30 rowKeyColumns = [("key_string", "varchar")]
31 rowTableName = tableName
32
33
34 class ChildRow(RowObject):
35 rowColumns = [("childId", "int"),
36 ("foo", "varchar"),
37 ("test_key", "varchar"),
38 ("stuff", "varchar"),
39 ("gogogo", "int"),
40 ("data", "varchar")]
41 rowKeyColumns = [("childId", "int")]
42 rowTableName = childTableName
43 rowForeignKeys = [(tableName,
44 [("test_key","varchar")],
45 [("key_string","varchar")],
46 None, 1)]
47
48
49 main_table_schema = """
50 CREATE TABLE testTable (
51 key_string varchar(64),
52 col2 integer,
53 another_column varchar(64),
54 Column4 varchar(64),
55 column_5_ integer
56 )
57 """
58
59 child_table_schema = """
60 CREATE TABLE childTable (
61 childId integer,
62 foo varchar(64),
63 test_key varchar(64),
64 stuff varchar(64),
65 gogogo integer,
66 data varchar(64)
67 )
68 """
69
70
71 def randomizeRow(row, nulls_ok=True, trailing_spaces_ok=True):
72 values = {}
73 for name, type in row.rowColumns:
74 if util.getKeyColumn(row, name):
75 values[name] = getattr(row, name)
76 continue
77 elif nulls_ok and random.randint(0, 9) == 0:
78 value = None # null
79 elif type == 'int':
80 value = random.randint(-10000, 10000)
81 else:
82 if random.randint(0, 9) == 0:
83 value = ''
84 else:
85 value = ''.join(map(lambda i:chr(random.randrange(32,127)),
86 xrange(random.randint(1, 64))))
87 if not trailing_spaces_ok:
88 value = value.rstrip()
89 setattr(row, name, value)
90 values[name] = value
91 return values
92
93
94 def rowMatches(row, values):
95 for name, type in row.rowColumns:
96 if getattr(row, name) != values[name]:
97 print ("Mismatch on column %s: |%s| (row) |%s| (values)" %
98 (name, getattr(row, name), values[name]))
99 return False
100 return True
101
102
103 rowObjectSuppression = suppressWarning(
104 message="twisted.enterprise.row is deprecated since Twisted 8.0",
105 category=DeprecationWarning)
106
107
108 reflectorSuppression = suppressWarning(
109 message="twisted.enterprise.reflector is deprecated since Twisted 8.0",
110 category=DeprecationWarning)
111
112
113 class ReflectorTestBase:
114 """
115 Base class for testing reflectors.
116 """
117
118 if interfaces.IReactorThreads(reactor, None) is None:
119 skip = "No thread support, no reflector tests"
120
121 count = 100 # a parameter used for running iterative tests
122
123 def randomizeRow(self, row):
124 return randomizeRow(row, self.nulls_ok, self.trailing_spaces_ok)
125
126 def setUp(self):
127 d = self.createReflector()
128 d.addCallback(self._cbSetUp)
129 return d
130
131 def _cbSetUp(self, reflector):
132 self.reflector = reflector
133
134 def tearDown(self):
135 return self.destroyReflector()
136
137 def destroyReflector(self):
138 pass
139
140 def test_reflector(self):
141 """
142 Full featured tests of reflector.
143 """
144 # create one row to work with
145 row = TestRow()
146 row.assignKeyAttr("key_string", "first")
147 values = self.randomizeRow(row)
148
149 # save it
150 d = self.reflector.insertRow(row)
151
152 def _loadBack(_):
153 # now load it back in
154 whereClause = [("key_string", EQUAL, "first")]
155 d = self.reflector.loadObjectsFrom(tableName,
156 whereClause=whereClause)
157 return d.addCallback(self.gotData)
158
159 def _getParent(_):
160 # make sure it came back as what we saved
161 self.failUnless(len(self.data) == 1, "no row")
162 parent = self.data[0]
163 self.failUnless(rowMatches(parent, values), "no match")
164 return parent
165
166 d.addCallback(_loadBack)
167 d.addCallback(_getParent)
168 d.addCallback(self._cbTestReflector)
169 return d
170 test_reflector.suppress = [rowObjectSuppression, reflectorSuppression]
171
172 def _cbTestReflector(self, parent):
173 # create some child rows
174 test_values = {}
175 inserts = []
176 child_values = {}
177 for i in range(0, self.num_iterations):
178 row = ChildRow()
179 row.assignKeyAttr("childId", i)
180 values = self.randomizeRow(row)
181 values['test_key'] = row.test_key = "first"
182 child_values[i] = values
183 inserts.append(self.reflector.insertRow(row))
184 row = None
185 #del inserts
186 d = defer.gatherResults(inserts)
187 values = [None]
188
189 def _loadObjects(_):
190 d = self.reflector.loadObjectsFrom(childTableName, parentRow=parent)
191 return d.addCallback(self.gotData)
192
193 def _checkLoadObjects(_):
194 self.failUnless(len(self.data) == self.num_iterations,
195 "no rows on query")
196 self.failUnless(len(parent.childRows) == self.num_iterations,
197 "did not load child rows: %d" % len(parent.childRows ))
198 for child in parent.childRows:
199 self.failUnless(rowMatches(child, child_values[child.childId]),
200 "child %d does not match" % child.childId)
201
202 def _checkLoadObjects2(_):
203 self.failUnless(len(self.data) == self.num_iterations,
204 "no rows on query")
205 self.failUnless(len(parent.childRows) == self.num_iterations,
206 "child rows added twice!: %d" % len(parent.childRows ))
207
208 def _changeParent(_):
209 # now change the parent
210 values[0] = self.randomizeRow(parent)
211 return self.reflector.updateRow(parent)
212
213 def _loadBack(_):
214 # now load it back in
215 whereClause = [("key_string", EQUAL, "first")]
216 d = self.reflector.loadObjectsFrom(tableName, whereClause=whereClaus e)
217 return d.addCallback(self.gotData)
218
219 def _checkLoadBack(_):
220 # make sure it came back as what we saved
221 self.failUnless(len(self.data) == 1, "no row")
222 parent = self.data[0]
223 self.failUnless(rowMatches(parent, values[0]), "no match")
224 # save parent
225 test_values[parent.key_string] = values[0]
226 parent = None
227
228 def _saveMoreTestRows(_):
229 # save some more test rows
230 ds = []
231 for i in range(0, self.num_iterations):
232 row = TestRow()
233 row.assignKeyAttr("key_string", "bulk%d"%i)
234 test_values[row.key_string] = self.randomizeRow(row)
235 ds.append(self.reflector.insertRow(row))
236 return defer.gatherResults(ds)
237
238 def _loadRowsBack(_):
239 # now load them all back in
240 d = self.reflector.loadObjectsFrom("testTable")
241 return d.addCallback(self.gotData)
242
243 def _checkRowsBack(_):
244 # make sure they are the same
245 self.failUnless(len(self.data) == self.num_iterations + 1,
246 "query did not get rows")
247 for row in self.data:
248 self.failUnless(rowMatches(row, test_values[row.key_string]),
249 "child %s does not match" % row.key_string)
250
251 def _changeRows(_):
252 # now change them all
253 ds = []
254 for row in self.data:
255 test_values[row.key_string] = self.randomizeRow(row)
256 ds.append(self.reflector.updateRow(row))
257 d = defer.gatherResults(ds)
258 return d.addCallback(_cbChangeRows)
259
260 def _cbChangeRows(_):
261 self.data = None
262
263 def _deleteRows(_):
264 # now delete them
265 ds = []
266 for row in self.data:
267 ds.append(self.reflector.deleteRow(row))
268 d = defer.gatherResults(ds)
269 return d.addCallback(_cbChangeRows)
270
271 def _checkRowsDeleted(_):
272 self.failUnless(len(self.data) == 0, "rows were not deleted")
273
274 d.addCallback(_loadObjects)
275 d.addCallback(_checkLoadObjects)
276 d.addCallback(_loadObjects)
277 d.addCallback(_checkLoadObjects2)
278 d.addCallback(_changeParent)
279 d.addCallback(_loadBack)
280 d.addCallback(_checkLoadBack)
281 d.addCallback(_saveMoreTestRows)
282 d.addCallback(_loadRowsBack)
283 d.addCallback(_checkRowsBack)
284 d.addCallback(_changeRows)
285 d.addCallback(_loadRowsBack)
286 d.addCallback(_checkRowsBack)
287 d.addCallback(_deleteRows)
288 d.addCallback(_loadRowsBack)
289 d.addCallback(_checkRowsDeleted)
290 return d
291
292
293 def test_saveAndDelete(self):
294 """
295 Create a row and then try to delete it.
296 """
297 # create one row to work with
298 row = TestRow()
299 row.assignKeyAttr("key_string", "first")
300 values = self.randomizeRow(row)
301 # save it
302 d = self.reflector.insertRow(row)
303 def _deleteRow(_):
304 # delete it
305 return self.reflector.deleteRow(row)
306 d.addCallback(_deleteRow)
307 return d
308 test_saveAndDelete.suppress = [rowObjectSuppression, reflectorSuppression]
309
310
311 def gotData(self, data):
312 self.data = data
313
314
315 ReflectorTestBase.timeout = 30.0
316
317
318 class SQLReflectorTestBase(ReflectorTestBase):
319 """
320 Base class for the SQL reflector.
321 """
322
323 def createReflector(self):
324 self.startDB()
325 self.dbpool = self.makePool()
326 self.dbpool.start()
327
328 if self.can_clear:
329 d = self.dbpool.runOperation('DROP TABLE testTable')
330 d.addCallback(lambda _:
331 self.dbpool.runOperation('DROP TABLE childTable'))
332 d.addErrback(lambda _: None)
333 else:
334 d = defer.succeed(None)
335
336 d.addCallback(lambda _: self.dbpool.runOperation(main_table_schema))
337 d.addCallback(lambda _: self.dbpool.runOperation(child_table_schema))
338 reflectorClass = self.escape_slashes and SQLReflector \
339 or NoSlashSQLReflector
340 d.addCallback(lambda _:
341 reflectorClass(self.dbpool, [TestRow, ChildRow]))
342 return d
343
344 def destroyReflector(self):
345 d = self.dbpool.runOperation('DROP TABLE testTable')
346 d.addCallback(lambda _:
347 self.dbpool.runOperation('DROP TABLE childTable'))
348 def close(_):
349 self.dbpool.close()
350 self.stopDB()
351 d.addCallback(close)
352 return d
353
354
355 class DeprecationTestCase(unittest.TestCase):
356 """
357 Test various deprecations of twisted.enterprise.
358 """
359
360 def test_rowDeprecation(self):
361 """
362 Test deprecation of L{RowObject}.
363 """
364 def wrapper():
365 return TestRow()
366 self.assertWarns(DeprecationWarning,
367 "twisted.enterprise.row is deprecated since Twisted 8.0",
368 __file__,
369 wrapper)
370
371 def test_reflectorDeprecation(self):
372 """
373 Test deprecation of L{SQLReflector}.
374 """
375 def wrapper():
376 return SQLReflector(None, ())
377 from twisted.enterprise import sqlreflector
378 self.assertWarns(DeprecationWarning,
379 "twisted.enterprise.reflector is deprecated since Twisted 8.0",
380 sqlreflector.__file__,
381 wrapper)
382
383
384 # GadflyReflectorTestCase SQLiteReflectorTestCase PyPgSQLReflectorTestCase
385 # PsycopgReflectorTestCase MySQLReflectorTestCase FirebirdReflectorTestCase
386 makeSQLTests(SQLReflectorTestBase, 'ReflectorTestCase', globals())
387
388
389 class NoSlashSQLReflector(SQLReflector):
390 """
391 An sql reflector that only escapes single quotes.
392 """
393
394 def escape_string(self, text):
395 return text.replace("'", "''")
396
OLDNEW
« no previous file with comments | « third_party/twisted_8_1/twisted/test/test_reflect.py ('k') | third_party/twisted_8_1/twisted/test/test_roots.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698