Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(279)

Side by Side Diff: third_party/twisted_8_1/twisted/test/test_adbapi.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 # Copyright (c) 2001-2008 Twisted Matrix Laboratories.
2 # See LICENSE for details.
3
4
5 """
6 Tests for twisted.enterprise.adbapi.
7 """
8
9 from twisted.trial import unittest
10
11 import os, stat
12
13 from twisted.enterprise.adbapi import ConnectionPool, ConnectionLost, safe
14 from twisted.enterprise.adbapi import _unreleasedVersion
15 from twisted.internet import reactor, defer, interfaces
16
17
18 simple_table_schema = """
19 CREATE TABLE simple (
20 x integer
21 )
22 """
23
24
25 class ADBAPITestBase:
26 """Test the asynchronous DB-API code."""
27
28 openfun_called = {}
29
30 if interfaces.IReactorThreads(reactor, None) is None:
31 skip = "ADB-API requires threads, no way to test without them"
32
33 def setUp(self):
34 self.startDB()
35 self.dbpool = self.makePool(cp_openfun=self.openfun)
36 self.dbpool.start()
37
38 def tearDown(self):
39 d = self.dbpool.runOperation('DROP TABLE simple')
40 d.addCallback(lambda res: self.dbpool.close())
41 d.addCallback(lambda res: self.stopDB())
42 return d
43
44 def openfun(self, conn):
45 self.openfun_called[conn] = True
46
47 def checkOpenfunCalled(self, conn=None):
48 if not conn:
49 self.failUnless(self.openfun_called)
50 else:
51 self.failUnless(self.openfun_called.has_key(conn))
52
53 def testPool(self):
54 d = self.dbpool.runOperation(simple_table_schema)
55 if self.test_failures:
56 d.addCallback(self._testPool_1_1)
57 d.addCallback(self._testPool_1_2)
58 d.addCallback(self._testPool_1_3)
59 d.addCallback(self._testPool_1_4)
60 d.addCallback(lambda res: self.flushLoggedErrors())
61 d.addCallback(self._testPool_2)
62 d.addCallback(self._testPool_3)
63 d.addCallback(self._testPool_4)
64 d.addCallback(self._testPool_5)
65 d.addCallback(self._testPool_6)
66 d.addCallback(self._testPool_7)
67 d.addCallback(self._testPool_8)
68 d.addCallback(self._testPool_9)
69 return d
70
71 def _testPool_1_1(self, res):
72 d = defer.maybeDeferred(self.dbpool.runQuery, "select * from NOTABLE")
73 d.addCallbacks(lambda res: self.fail('no exception'),
74 lambda f: None)
75 return d
76
77 def _testPool_1_2(self, res):
78 d = defer.maybeDeferred(self.dbpool.runOperation,
79 "deletexxx from NOTABLE")
80 d.addCallbacks(lambda res: self.fail('no exception'),
81 lambda f: None)
82 return d
83
84 def _testPool_1_3(self, res):
85 d = defer.maybeDeferred(self.dbpool.runInteraction,
86 self.bad_interaction)
87 d.addCallbacks(lambda res: self.fail('no exception'),
88 lambda f: None)
89 return d
90
91 def _testPool_1_4(self, res):
92 d = defer.maybeDeferred(self.dbpool.runWithConnection,
93 self.bad_withConnection)
94 d.addCallbacks(lambda res: self.fail('no exception'),
95 lambda f: None)
96 return d
97
98 def _testPool_2(self, res):
99 # verify simple table is empty
100 sql = "select count(1) from simple"
101 d = self.dbpool.runQuery(sql)
102 def _check(row):
103 self.failUnless(int(row[0][0]) == 0, "Interaction not rolled back")
104 self.checkOpenfunCalled()
105 d.addCallback(_check)
106 return d
107
108 def _testPool_3(self, res):
109 sql = "select count(1) from simple"
110 inserts = []
111 # add some rows to simple table (runOperation)
112 for i in range(self.num_iterations):
113 sql = "insert into simple(x) values(%d)" % i
114 inserts.append(self.dbpool.runOperation(sql))
115 d = defer.gatherResults(inserts)
116
117 def _select(res):
118 # make sure they were added (runQuery)
119 sql = "select x from simple order by x";
120 d = self.dbpool.runQuery(sql)
121 return d
122 d.addCallback(_select)
123
124 def _check(rows):
125 self.failUnless(len(rows) == self.num_iterations,
126 "Wrong number of rows")
127 for i in range(self.num_iterations):
128 self.failUnless(len(rows[i]) == 1, "Wrong size row")
129 self.failUnless(rows[i][0] == i, "Values not returned.")
130 d.addCallback(_check)
131
132 return d
133
134 def _testPool_4(self, res):
135 # runInteraction
136 d = self.dbpool.runInteraction(self.interaction)
137 d.addCallback(lambda res: self.assertEquals(res, "done"))
138 return d
139
140 def _testPool_5(self, res):
141 # withConnection
142 d = self.dbpool.runWithConnection(self.withConnection)
143 d.addCallback(lambda res: self.assertEquals(res, "done"))
144 return d
145
146 def _testPool_6(self, res):
147 # Test a withConnection cannot be closed
148 d = self.dbpool.runWithConnection(self.close_withConnection)
149 return d
150
151 def _testPool_7(self, res):
152 # give the pool a workout
153 ds = []
154 for i in range(self.num_iterations):
155 sql = "select x from simple where x = %d" % i
156 ds.append(self.dbpool.runQuery(sql))
157 dlist = defer.DeferredList(ds, fireOnOneErrback=True)
158 def _check(result):
159 for i in range(self.num_iterations):
160 self.failUnless(result[i][1][0][0] == i, "Value not returned")
161 dlist.addCallback(_check)
162 return dlist
163
164 def _testPool_8(self, res):
165 # now delete everything
166 ds = []
167 for i in range(self.num_iterations):
168 sql = "delete from simple where x = %d" % i
169 ds.append(self.dbpool.runOperation(sql))
170 dlist = defer.DeferredList(ds, fireOnOneErrback=True)
171 return dlist
172
173 def _testPool_9(self, res):
174 # verify simple table is empty
175 sql = "select count(1) from simple"
176 d = self.dbpool.runQuery(sql)
177 def _check(row):
178 self.failUnless(int(row[0][0]) == 0,
179 "Didn't successfully delete table contents")
180 self.checkConnect()
181 d.addCallback(_check)
182 return d
183
184 def checkConnect(self):
185 """Check the connect/disconnect synchronous calls."""
186 conn = self.dbpool.connect()
187 self.checkOpenfunCalled(conn)
188 curs = conn.cursor()
189 curs.execute("insert into simple(x) values(1)")
190 curs.execute("select x from simple")
191 res = curs.fetchall()
192 self.failUnlessEqual(len(res), 1)
193 self.failUnlessEqual(len(res[0]), 1)
194 self.failUnlessEqual(res[0][0], 1)
195 curs.execute("delete from simple")
196 curs.execute("select x from simple")
197 self.failUnlessEqual(len(curs.fetchall()), 0)
198 curs.close()
199 self.dbpool.disconnect(conn)
200
201 def interaction(self, transaction):
202 transaction.execute("select x from simple order by x")
203 for i in range(self.num_iterations):
204 row = transaction.fetchone()
205 self.failUnless(len(row) == 1, "Wrong size row")
206 self.failUnless(row[0] == i, "Value not returned.")
207 # should test this, but gadfly throws an exception instead
208 #self.failUnless(transaction.fetchone() is None, "Too many rows")
209 return "done"
210
211 def bad_interaction(self, transaction):
212 if self.can_rollback:
213 transaction.execute("insert into simple(x) values(0)")
214
215 transaction.execute("select * from NOTABLE")
216
217 def withConnection(self, conn):
218 curs = conn.cursor()
219 try:
220 curs.execute("select x from simple order by x")
221 for i in range(self.num_iterations):
222 row = curs.fetchone()
223 self.failUnless(len(row) == 1, "Wrong size row")
224 self.failUnless(row[0] == i, "Value not returned.")
225 # should test this, but gadfly throws an exception instead
226 #self.failUnless(transaction.fetchone() is None, "Too many rows")
227 finally:
228 curs.close()
229 return "done"
230
231 def close_withConnection(self, conn):
232 conn.close()
233
234 def bad_withConnection(self, conn):
235 curs = conn.cursor()
236 try:
237 curs.execute("select * from NOTABLE")
238 finally:
239 curs.close()
240
241
242 class ReconnectTestBase:
243 """Test the asynchronous DB-API code with reconnect."""
244
245 if interfaces.IReactorThreads(reactor, None) is None:
246 skip = "ADB-API requires threads, no way to test without them"
247
248 def setUp(self):
249 if self.good_sql is None:
250 raise unittest.SkipTest('no good sql for reconnect test')
251 self.startDB()
252 self.dbpool = self.makePool(cp_max=1, cp_reconnect=True,
253 cp_good_sql=self.good_sql)
254 self.dbpool.start()
255 return self.dbpool.runOperation(simple_table_schema)
256
257 def tearDown(self):
258 d = self.dbpool.runOperation('DROP TABLE simple')
259 d.addCallback(lambda res: self.dbpool.close())
260 d.addCallback(lambda res: self.stopDB())
261 return d
262
263 def testPool(self):
264 d = defer.succeed(None)
265 d.addCallback(self._testPool_1)
266 d.addCallback(self._testPool_2)
267 if not self.early_reconnect:
268 d.addCallback(self._testPool_3)
269 d.addCallback(self._testPool_4)
270 d.addCallback(self._testPool_5)
271 return d
272
273 def _testPool_1(self, res):
274 sql = "select count(1) from simple"
275 d = self.dbpool.runQuery(sql)
276 def _check(row):
277 self.failUnless(int(row[0][0]) == 0, "Table not empty")
278 d.addCallback(_check)
279 return d
280
281 def _testPool_2(self, res):
282 # reach in and close the connection manually
283 self.dbpool.connections.values()[0].close()
284
285 def _testPool_3(self, res):
286 sql = "select count(1) from simple"
287 d = defer.maybeDeferred(self.dbpool.runQuery, sql)
288 d.addCallbacks(lambda res: self.fail('no exception'),
289 lambda f: f.trap(ConnectionLost))
290 return d
291
292 def _testPool_4(self, res):
293 sql = "select count(1) from simple"
294 d = self.dbpool.runQuery(sql)
295 def _check(row):
296 self.failUnless(int(row[0][0]) == 0, "Table not empty")
297 d.addCallback(_check)
298 return d
299
300 def _testPool_5(self, res):
301 sql = "select * from NOTABLE" # bad sql
302 d = defer.maybeDeferred(self.dbpool.runQuery, sql)
303 d.addCallbacks(lambda res: self.fail('no exception'),
304 lambda f: self.failIf(f.check(ConnectionLost)))
305 return d
306
307
308 class DBTestConnector:
309 """A class which knows how to test for the presence of
310 and establish a connection to a relational database.
311
312 To enable test cases which use a central, system database,
313 you must create a database named DB_NAME with a user DB_USER
314 and password DB_PASS with full access rights to database DB_NAME.
315 """
316
317 TEST_PREFIX = None # used for creating new test cases
318
319 DB_NAME = "twisted_test"
320 DB_USER = 'twisted_test'
321 DB_PASS = 'twisted_test'
322
323 DB_DIR = None # directory for database storage
324
325 nulls_ok = True # nulls supported
326 trailing_spaces_ok = True # trailing spaces in strings preserved
327 can_rollback = True # rollback supported
328 test_failures = True # test bad sql?
329 escape_slashes = True # escape \ in sql?
330 good_sql = ConnectionPool.good_sql
331 early_reconnect = True # cursor() will fail on closed connection
332 can_clear = True # can try to clear out tables when starting
333 needs_dbdir = False # if a temporary directory is needed for the db
334
335 num_iterations = 50 # number of iterations for test loops
336 # (lower this for slow db's)
337
338 def setUpClass(self):
339 if self.needs_dbdir:
340 self.DB_DIR = self.mktemp()
341 os.mkdir(self.DB_DIR)
342
343 if not self.can_connect():
344 raise unittest.SkipTest('%s: Cannot access db' % self.TEST_PREFIX)
345
346 def can_connect(self):
347 """Return true if this database is present on the system
348 and can be used in a test."""
349 raise NotImplementedError()
350
351 def startDB(self):
352 """Take any steps needed to bring database up."""
353 pass
354
355 def stopDB(self):
356 """Bring database down, if needed."""
357 pass
358
359 def makePool(self, **newkw):
360 """Create a connection pool with additional keyword arguments."""
361 args, kw = self.getPoolArgs()
362 kw = kw.copy()
363 kw.update(newkw)
364 return ConnectionPool(*args, **kw)
365
366 def getPoolArgs(self):
367 """Return a tuple (args, kw) of list and keyword arguments
368 that need to be passed to ConnectionPool to create a connection
369 to this database."""
370 raise NotImplementedError()
371
372 class GadflyConnector(DBTestConnector):
373 TEST_PREFIX = 'Gadfly'
374
375 nulls_ok = False
376 can_rollback = False
377 escape_slashes = False
378 good_sql = 'select * from simple where 1=0'
379 needs_dbdir = True
380
381 num_iterations = 1 # slow
382
383 def can_connect(self):
384 try: import gadfly
385 except: return False
386 if not getattr(gadfly, 'connect', None):
387 gadfly.connect = gadfly.gadfly
388 return True
389
390 def startDB(self):
391 import gadfly
392 conn = gadfly.gadfly()
393 conn.startup(self.DB_NAME, self.DB_DIR)
394
395 # gadfly seems to want us to create something to get the db going
396 cursor = conn.cursor()
397 cursor.execute("create table x (x integer)")
398 conn.commit()
399 conn.close()
400
401 def getPoolArgs(self):
402 args = ('gadfly', self.DB_NAME, self.DB_DIR)
403 kw = {'cp_max': 1}
404 return args, kw
405
406 class SQLiteConnector(DBTestConnector):
407 TEST_PREFIX = 'SQLite'
408
409 escape_slashes = False
410 needs_dbdir = True
411
412 num_iterations = 1 # slow
413
414 def can_connect(self):
415 try: import sqlite
416 except: return False
417 return True
418
419 def startDB(self):
420 self.database = os.path.join(self.DB_DIR, self.DB_NAME)
421 if os.path.exists(self.database):
422 os.unlink(self.database)
423
424 def getPoolArgs(self):
425 args = ('sqlite',)
426 kw = {'database': self.database, 'cp_max': 1}
427 return args, kw
428
429 class PyPgSQLConnector(DBTestConnector):
430 TEST_PREFIX = "PyPgSQL"
431
432 def can_connect(self):
433 try: from pyPgSQL import PgSQL
434 except: return False
435 try:
436 conn = PgSQL.connect(database=self.DB_NAME, user=self.DB_USER,
437 password=self.DB_PASS)
438 conn.close()
439 return True
440 except:
441 return False
442
443 def getPoolArgs(self):
444 args = ('pyPgSQL.PgSQL',)
445 kw = {'database': self.DB_NAME, 'user': self.DB_USER,
446 'password': self.DB_PASS, 'cp_min': 0}
447 return args, kw
448
449 class PsycopgConnector(DBTestConnector):
450 TEST_PREFIX = 'Psycopg'
451
452 def can_connect(self):
453 try: import psycopg
454 except: return False
455 try:
456 conn = psycopg.connect(database=self.DB_NAME, user=self.DB_USER,
457 password=self.DB_PASS)
458 conn.close()
459 return True
460 except:
461 return False
462
463 def getPoolArgs(self):
464 args = ('psycopg',)
465 kw = {'database': self.DB_NAME, 'user': self.DB_USER,
466 'password': self.DB_PASS, 'cp_min': 0}
467 return args, kw
468
469 class MySQLConnector(DBTestConnector):
470 TEST_PREFIX = 'MySQL'
471
472 trailing_spaces_ok = False
473 can_rollback = False
474 early_reconnect = False
475
476 def can_connect(self):
477 try: import MySQLdb
478 except: return False
479 try:
480 conn = MySQLdb.connect(db=self.DB_NAME, user=self.DB_USER,
481 passwd=self.DB_PASS)
482 conn.close()
483 return True
484 except:
485 return False
486
487 def getPoolArgs(self):
488 args = ('MySQLdb',)
489 kw = {'db': self.DB_NAME, 'user': self.DB_USER, 'passwd': self.DB_PASS}
490 return args, kw
491
492 class FirebirdConnector(DBTestConnector):
493 TEST_PREFIX = 'Firebird'
494
495 test_failures = False # failure testing causes problems
496 escape_slashes = False
497 good_sql = None # firebird doesn't handle failed sql well
498 can_clear = False # firebird is not so good
499 needs_dbdir = True
500
501 num_iterations = 5 # slow
502
503 def can_connect(self):
504 try: import kinterbasdb
505 except: return False
506 try:
507 self.startDB()
508 self.stopDB()
509 return True
510 except:
511 return False
512
513 def startDB(self):
514 import kinterbasdb
515 self.DB_NAME = os.path.join(self.DB_DIR, DBTestConnector.DB_NAME)
516 os.chmod(self.DB_DIR, stat.S_IRWXU + stat.S_IRWXG + stat.S_IRWXO)
517 sql = 'create database "%s" user "%s" password "%s"'
518 sql %= (self.DB_NAME, self.DB_USER, self.DB_PASS);
519 conn = kinterbasdb.create_database(sql)
520 conn.close()
521
522 def getPoolArgs(self):
523 args = ('kinterbasdb',)
524 kw = {'database': self.DB_NAME, 'host': '127.0.0.1',
525 'user': self.DB_USER, 'password': self.DB_PASS}
526 return args, kw
527
528 def stopDB(self):
529 import kinterbasdb
530 conn = kinterbasdb.connect(database=self.DB_NAME,
531 host='127.0.0.1', user=self.DB_USER,
532 password=self.DB_PASS)
533 conn.drop_database()
534
535 def makeSQLTests(base, suffix, globals):
536 """
537 Make a test case for every db connector which can connect.
538
539 @param base: Base class for test case. Additional base classes
540 will be a DBConnector subclass and unittest.TestCase
541 @param suffix: A suffix used to create test case names. Prefixes
542 are defined in the DBConnector subclasses.
543 """
544 connectors = [GadflyConnector, SQLiteConnector, PyPgSQLConnector,
545 PsycopgConnector, MySQLConnector, FirebirdConnector]
546 for connclass in connectors:
547 name = connclass.TEST_PREFIX + suffix
548 import new
549 klass = new.classobj(name, (connclass, base, unittest.TestCase), base.__ dict__)
550 globals[name] = klass
551
552 # GadflyADBAPITestCase SQLiteADBAPITestCase PyPgSQLADBAPITestCase
553 # PsycopgADBAPITestCase MySQLADBAPITestCase FirebirdADBAPITestCase
554 makeSQLTests(ADBAPITestBase, 'ADBAPITestCase', globals())
555
556 # GadflyReconnectTestCase SQLiteReconnectTestCase PyPgSQLReconnectTestCase
557 # PsycopgReconnectTestCase MySQLReconnectTestCase FirebirdReconnectTestCase
558 makeSQLTests(ReconnectTestBase, 'ReconnectTestCase', globals())
559
560
561
562 class DeprecationTestCase(unittest.TestCase):
563 """
564 Test deprecations in twisted.enterprise.adbapi
565 """
566
567 def test_safe(self):
568 """
569 Test deprecation of twisted.enterprise.adbapi.safe()
570 """
571 result = self.callDeprecated(_unreleasedVersion,
572 safe, "test'")
573
574 # make sure safe still behaves like the original
575 self.assertEqual(result, "test''")
OLDNEW
« no previous file with comments | « third_party/twisted_8_1/twisted/test/test_abstract.py ('k') | third_party/twisted_8_1/twisted/test/test_amp.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698