Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(153)

Unified Diff: third_party/twisted_8_1/twisted/test/test_adbapi.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/twisted_8_1/twisted/test/test_adbapi.py
diff --git a/third_party/twisted_8_1/twisted/test/test_adbapi.py b/third_party/twisted_8_1/twisted/test/test_adbapi.py
deleted file mode 100644
index fcda19a70905209571adfe8d86e67a96427b0e7a..0000000000000000000000000000000000000000
--- a/third_party/twisted_8_1/twisted/test/test_adbapi.py
+++ /dev/null
@@ -1,575 +0,0 @@
-# Copyright (c) 2001-2008 Twisted Matrix Laboratories.
-# See LICENSE for details.
-
-
-"""
-Tests for twisted.enterprise.adbapi.
-"""
-
-from twisted.trial import unittest
-
-import os, stat
-
-from twisted.enterprise.adbapi import ConnectionPool, ConnectionLost, safe
-from twisted.enterprise.adbapi import _unreleasedVersion
-from twisted.internet import reactor, defer, interfaces
-
-
-simple_table_schema = """
-CREATE TABLE simple (
- x integer
-)
-"""
-
-
-class ADBAPITestBase:
- """Test the asynchronous DB-API code."""
-
- openfun_called = {}
-
- if interfaces.IReactorThreads(reactor, None) is None:
- skip = "ADB-API requires threads, no way to test without them"
-
- def setUp(self):
- self.startDB()
- self.dbpool = self.makePool(cp_openfun=self.openfun)
- self.dbpool.start()
-
- def tearDown(self):
- d = self.dbpool.runOperation('DROP TABLE simple')
- d.addCallback(lambda res: self.dbpool.close())
- d.addCallback(lambda res: self.stopDB())
- return d
-
- def openfun(self, conn):
- self.openfun_called[conn] = True
-
- def checkOpenfunCalled(self, conn=None):
- if not conn:
- self.failUnless(self.openfun_called)
- else:
- self.failUnless(self.openfun_called.has_key(conn))
-
- def testPool(self):
- d = self.dbpool.runOperation(simple_table_schema)
- if self.test_failures:
- d.addCallback(self._testPool_1_1)
- d.addCallback(self._testPool_1_2)
- d.addCallback(self._testPool_1_3)
- d.addCallback(self._testPool_1_4)
- d.addCallback(lambda res: self.flushLoggedErrors())
- d.addCallback(self._testPool_2)
- d.addCallback(self._testPool_3)
- d.addCallback(self._testPool_4)
- d.addCallback(self._testPool_5)
- d.addCallback(self._testPool_6)
- d.addCallback(self._testPool_7)
- d.addCallback(self._testPool_8)
- d.addCallback(self._testPool_9)
- return d
-
- def _testPool_1_1(self, res):
- d = defer.maybeDeferred(self.dbpool.runQuery, "select * from NOTABLE")
- d.addCallbacks(lambda res: self.fail('no exception'),
- lambda f: None)
- return d
-
- def _testPool_1_2(self, res):
- d = defer.maybeDeferred(self.dbpool.runOperation,
- "deletexxx from NOTABLE")
- d.addCallbacks(lambda res: self.fail('no exception'),
- lambda f: None)
- return d
-
- def _testPool_1_3(self, res):
- d = defer.maybeDeferred(self.dbpool.runInteraction,
- self.bad_interaction)
- d.addCallbacks(lambda res: self.fail('no exception'),
- lambda f: None)
- return d
-
- def _testPool_1_4(self, res):
- d = defer.maybeDeferred(self.dbpool.runWithConnection,
- self.bad_withConnection)
- d.addCallbacks(lambda res: self.fail('no exception'),
- lambda f: None)
- return d
-
- def _testPool_2(self, res):
- # verify simple table is empty
- sql = "select count(1) from simple"
- d = self.dbpool.runQuery(sql)
- def _check(row):
- self.failUnless(int(row[0][0]) == 0, "Interaction not rolled back")
- self.checkOpenfunCalled()
- d.addCallback(_check)
- return d
-
- def _testPool_3(self, res):
- sql = "select count(1) from simple"
- inserts = []
- # add some rows to simple table (runOperation)
- for i in range(self.num_iterations):
- sql = "insert into simple(x) values(%d)" % i
- inserts.append(self.dbpool.runOperation(sql))
- d = defer.gatherResults(inserts)
-
- def _select(res):
- # make sure they were added (runQuery)
- sql = "select x from simple order by x";
- d = self.dbpool.runQuery(sql)
- return d
- d.addCallback(_select)
-
- def _check(rows):
- self.failUnless(len(rows) == self.num_iterations,
- "Wrong number of rows")
- for i in range(self.num_iterations):
- self.failUnless(len(rows[i]) == 1, "Wrong size row")
- self.failUnless(rows[i][0] == i, "Values not returned.")
- d.addCallback(_check)
-
- return d
-
- def _testPool_4(self, res):
- # runInteraction
- d = self.dbpool.runInteraction(self.interaction)
- d.addCallback(lambda res: self.assertEquals(res, "done"))
- return d
-
- def _testPool_5(self, res):
- # withConnection
- d = self.dbpool.runWithConnection(self.withConnection)
- d.addCallback(lambda res: self.assertEquals(res, "done"))
- return d
-
- def _testPool_6(self, res):
- # Test a withConnection cannot be closed
- d = self.dbpool.runWithConnection(self.close_withConnection)
- return d
-
- def _testPool_7(self, res):
- # give the pool a workout
- ds = []
- for i in range(self.num_iterations):
- sql = "select x from simple where x = %d" % i
- ds.append(self.dbpool.runQuery(sql))
- dlist = defer.DeferredList(ds, fireOnOneErrback=True)
- def _check(result):
- for i in range(self.num_iterations):
- self.failUnless(result[i][1][0][0] == i, "Value not returned")
- dlist.addCallback(_check)
- return dlist
-
- def _testPool_8(self, res):
- # now delete everything
- ds = []
- for i in range(self.num_iterations):
- sql = "delete from simple where x = %d" % i
- ds.append(self.dbpool.runOperation(sql))
- dlist = defer.DeferredList(ds, fireOnOneErrback=True)
- return dlist
-
- def _testPool_9(self, res):
- # verify simple table is empty
- sql = "select count(1) from simple"
- d = self.dbpool.runQuery(sql)
- def _check(row):
- self.failUnless(int(row[0][0]) == 0,
- "Didn't successfully delete table contents")
- self.checkConnect()
- d.addCallback(_check)
- return d
-
- def checkConnect(self):
- """Check the connect/disconnect synchronous calls."""
- conn = self.dbpool.connect()
- self.checkOpenfunCalled(conn)
- curs = conn.cursor()
- curs.execute("insert into simple(x) values(1)")
- curs.execute("select x from simple")
- res = curs.fetchall()
- self.failUnlessEqual(len(res), 1)
- self.failUnlessEqual(len(res[0]), 1)
- self.failUnlessEqual(res[0][0], 1)
- curs.execute("delete from simple")
- curs.execute("select x from simple")
- self.failUnlessEqual(len(curs.fetchall()), 0)
- curs.close()
- self.dbpool.disconnect(conn)
-
- def interaction(self, transaction):
- transaction.execute("select x from simple order by x")
- for i in range(self.num_iterations):
- row = transaction.fetchone()
- self.failUnless(len(row) == 1, "Wrong size row")
- self.failUnless(row[0] == i, "Value not returned.")
- # should test this, but gadfly throws an exception instead
- #self.failUnless(transaction.fetchone() is None, "Too many rows")
- return "done"
-
- def bad_interaction(self, transaction):
- if self.can_rollback:
- transaction.execute("insert into simple(x) values(0)")
-
- transaction.execute("select * from NOTABLE")
-
- def withConnection(self, conn):
- curs = conn.cursor()
- try:
- curs.execute("select x from simple order by x")
- for i in range(self.num_iterations):
- row = curs.fetchone()
- self.failUnless(len(row) == 1, "Wrong size row")
- self.failUnless(row[0] == i, "Value not returned.")
- # should test this, but gadfly throws an exception instead
- #self.failUnless(transaction.fetchone() is None, "Too many rows")
- finally:
- curs.close()
- return "done"
-
- def close_withConnection(self, conn):
- conn.close()
-
- def bad_withConnection(self, conn):
- curs = conn.cursor()
- try:
- curs.execute("select * from NOTABLE")
- finally:
- curs.close()
-
-
-class ReconnectTestBase:
- """Test the asynchronous DB-API code with reconnect."""
-
- if interfaces.IReactorThreads(reactor, None) is None:
- skip = "ADB-API requires threads, no way to test without them"
-
- def setUp(self):
- if self.good_sql is None:
- raise unittest.SkipTest('no good sql for reconnect test')
- self.startDB()
- self.dbpool = self.makePool(cp_max=1, cp_reconnect=True,
- cp_good_sql=self.good_sql)
- self.dbpool.start()
- return self.dbpool.runOperation(simple_table_schema)
-
- def tearDown(self):
- d = self.dbpool.runOperation('DROP TABLE simple')
- d.addCallback(lambda res: self.dbpool.close())
- d.addCallback(lambda res: self.stopDB())
- return d
-
- def testPool(self):
- d = defer.succeed(None)
- d.addCallback(self._testPool_1)
- d.addCallback(self._testPool_2)
- if not self.early_reconnect:
- d.addCallback(self._testPool_3)
- d.addCallback(self._testPool_4)
- d.addCallback(self._testPool_5)
- return d
-
- def _testPool_1(self, res):
- sql = "select count(1) from simple"
- d = self.dbpool.runQuery(sql)
- def _check(row):
- self.failUnless(int(row[0][0]) == 0, "Table not empty")
- d.addCallback(_check)
- return d
-
- def _testPool_2(self, res):
- # reach in and close the connection manually
- self.dbpool.connections.values()[0].close()
-
- def _testPool_3(self, res):
- sql = "select count(1) from simple"
- d = defer.maybeDeferred(self.dbpool.runQuery, sql)
- d.addCallbacks(lambda res: self.fail('no exception'),
- lambda f: f.trap(ConnectionLost))
- return d
-
- def _testPool_4(self, res):
- sql = "select count(1) from simple"
- d = self.dbpool.runQuery(sql)
- def _check(row):
- self.failUnless(int(row[0][0]) == 0, "Table not empty")
- d.addCallback(_check)
- return d
-
- def _testPool_5(self, res):
- sql = "select * from NOTABLE" # bad sql
- d = defer.maybeDeferred(self.dbpool.runQuery, sql)
- d.addCallbacks(lambda res: self.fail('no exception'),
- lambda f: self.failIf(f.check(ConnectionLost)))
- return d
-
-
-class DBTestConnector:
- """A class which knows how to test for the presence of
- and establish a connection to a relational database.
-
- To enable test cases which use a central, system database,
- you must create a database named DB_NAME with a user DB_USER
- and password DB_PASS with full access rights to database DB_NAME.
- """
-
- TEST_PREFIX = None # used for creating new test cases
-
- DB_NAME = "twisted_test"
- DB_USER = 'twisted_test'
- DB_PASS = 'twisted_test'
-
- DB_DIR = None # directory for database storage
-
- nulls_ok = True # nulls supported
- trailing_spaces_ok = True # trailing spaces in strings preserved
- can_rollback = True # rollback supported
- test_failures = True # test bad sql?
- escape_slashes = True # escape \ in sql?
- good_sql = ConnectionPool.good_sql
- early_reconnect = True # cursor() will fail on closed connection
- can_clear = True # can try to clear out tables when starting
- needs_dbdir = False # if a temporary directory is needed for the db
-
- num_iterations = 50 # number of iterations for test loops
- # (lower this for slow db's)
-
- def setUpClass(self):
- if self.needs_dbdir:
- self.DB_DIR = self.mktemp()
- os.mkdir(self.DB_DIR)
-
- if not self.can_connect():
- raise unittest.SkipTest('%s: Cannot access db' % self.TEST_PREFIX)
-
- def can_connect(self):
- """Return true if this database is present on the system
- and can be used in a test."""
- raise NotImplementedError()
-
- def startDB(self):
- """Take any steps needed to bring database up."""
- pass
-
- def stopDB(self):
- """Bring database down, if needed."""
- pass
-
- def makePool(self, **newkw):
- """Create a connection pool with additional keyword arguments."""
- args, kw = self.getPoolArgs()
- kw = kw.copy()
- kw.update(newkw)
- return ConnectionPool(*args, **kw)
-
- def getPoolArgs(self):
- """Return a tuple (args, kw) of list and keyword arguments
- that need to be passed to ConnectionPool to create a connection
- to this database."""
- raise NotImplementedError()
-
-class GadflyConnector(DBTestConnector):
- TEST_PREFIX = 'Gadfly'
-
- nulls_ok = False
- can_rollback = False
- escape_slashes = False
- good_sql = 'select * from simple where 1=0'
- needs_dbdir = True
-
- num_iterations = 1 # slow
-
- def can_connect(self):
- try: import gadfly
- except: return False
- if not getattr(gadfly, 'connect', None):
- gadfly.connect = gadfly.gadfly
- return True
-
- def startDB(self):
- import gadfly
- conn = gadfly.gadfly()
- conn.startup(self.DB_NAME, self.DB_DIR)
-
- # gadfly seems to want us to create something to get the db going
- cursor = conn.cursor()
- cursor.execute("create table x (x integer)")
- conn.commit()
- conn.close()
-
- def getPoolArgs(self):
- args = ('gadfly', self.DB_NAME, self.DB_DIR)
- kw = {'cp_max': 1}
- return args, kw
-
-class SQLiteConnector(DBTestConnector):
- TEST_PREFIX = 'SQLite'
-
- escape_slashes = False
- needs_dbdir = True
-
- num_iterations = 1 # slow
-
- def can_connect(self):
- try: import sqlite
- except: return False
- return True
-
- def startDB(self):
- self.database = os.path.join(self.DB_DIR, self.DB_NAME)
- if os.path.exists(self.database):
- os.unlink(self.database)
-
- def getPoolArgs(self):
- args = ('sqlite',)
- kw = {'database': self.database, 'cp_max': 1}
- return args, kw
-
-class PyPgSQLConnector(DBTestConnector):
- TEST_PREFIX = "PyPgSQL"
-
- def can_connect(self):
- try: from pyPgSQL import PgSQL
- except: return False
- try:
- conn = PgSQL.connect(database=self.DB_NAME, user=self.DB_USER,
- password=self.DB_PASS)
- conn.close()
- return True
- except:
- return False
-
- def getPoolArgs(self):
- args = ('pyPgSQL.PgSQL',)
- kw = {'database': self.DB_NAME, 'user': self.DB_USER,
- 'password': self.DB_PASS, 'cp_min': 0}
- return args, kw
-
-class PsycopgConnector(DBTestConnector):
- TEST_PREFIX = 'Psycopg'
-
- def can_connect(self):
- try: import psycopg
- except: return False
- try:
- conn = psycopg.connect(database=self.DB_NAME, user=self.DB_USER,
- password=self.DB_PASS)
- conn.close()
- return True
- except:
- return False
-
- def getPoolArgs(self):
- args = ('psycopg',)
- kw = {'database': self.DB_NAME, 'user': self.DB_USER,
- 'password': self.DB_PASS, 'cp_min': 0}
- return args, kw
-
-class MySQLConnector(DBTestConnector):
- TEST_PREFIX = 'MySQL'
-
- trailing_spaces_ok = False
- can_rollback = False
- early_reconnect = False
-
- def can_connect(self):
- try: import MySQLdb
- except: return False
- try:
- conn = MySQLdb.connect(db=self.DB_NAME, user=self.DB_USER,
- passwd=self.DB_PASS)
- conn.close()
- return True
- except:
- return False
-
- def getPoolArgs(self):
- args = ('MySQLdb',)
- kw = {'db': self.DB_NAME, 'user': self.DB_USER, 'passwd': self.DB_PASS}
- return args, kw
-
-class FirebirdConnector(DBTestConnector):
- TEST_PREFIX = 'Firebird'
-
- test_failures = False # failure testing causes problems
- escape_slashes = False
- good_sql = None # firebird doesn't handle failed sql well
- can_clear = False # firebird is not so good
- needs_dbdir = True
-
- num_iterations = 5 # slow
-
- def can_connect(self):
- try: import kinterbasdb
- except: return False
- try:
- self.startDB()
- self.stopDB()
- return True
- except:
- return False
-
- def startDB(self):
- import kinterbasdb
- self.DB_NAME = os.path.join(self.DB_DIR, DBTestConnector.DB_NAME)
- os.chmod(self.DB_DIR, stat.S_IRWXU + stat.S_IRWXG + stat.S_IRWXO)
- sql = 'create database "%s" user "%s" password "%s"'
- sql %= (self.DB_NAME, self.DB_USER, self.DB_PASS);
- conn = kinterbasdb.create_database(sql)
- conn.close()
-
- def getPoolArgs(self):
- args = ('kinterbasdb',)
- kw = {'database': self.DB_NAME, 'host': '127.0.0.1',
- 'user': self.DB_USER, 'password': self.DB_PASS}
- return args, kw
-
- def stopDB(self):
- import kinterbasdb
- conn = kinterbasdb.connect(database=self.DB_NAME,
- host='127.0.0.1', user=self.DB_USER,
- password=self.DB_PASS)
- conn.drop_database()
-
-def makeSQLTests(base, suffix, globals):
- """
- Make a test case for every db connector which can connect.
-
- @param base: Base class for test case. Additional base classes
- will be a DBConnector subclass and unittest.TestCase
- @param suffix: A suffix used to create test case names. Prefixes
- are defined in the DBConnector subclasses.
- """
- connectors = [GadflyConnector, SQLiteConnector, PyPgSQLConnector,
- PsycopgConnector, MySQLConnector, FirebirdConnector]
- for connclass in connectors:
- name = connclass.TEST_PREFIX + suffix
- import new
- klass = new.classobj(name, (connclass, base, unittest.TestCase), base.__dict__)
- globals[name] = klass
-
-# GadflyADBAPITestCase SQLiteADBAPITestCase PyPgSQLADBAPITestCase
-# PsycopgADBAPITestCase MySQLADBAPITestCase FirebirdADBAPITestCase
-makeSQLTests(ADBAPITestBase, 'ADBAPITestCase', globals())
-
-# GadflyReconnectTestCase SQLiteReconnectTestCase PyPgSQLReconnectTestCase
-# PsycopgReconnectTestCase MySQLReconnectTestCase FirebirdReconnectTestCase
-makeSQLTests(ReconnectTestBase, 'ReconnectTestCase', globals())
-
-
-
-class DeprecationTestCase(unittest.TestCase):
- """
- Test deprecations in twisted.enterprise.adbapi
- """
-
- def test_safe(self):
- """
- Test deprecation of twisted.enterprise.adbapi.safe()
- """
- result = self.callDeprecated(_unreleasedVersion,
- safe, "test'")
-
- # make sure safe still behaves like the original
- self.assertEqual(result, "test''")
« no previous file with comments | « third_party/twisted_8_1/twisted/test/test_abstract.py ('k') | third_party/twisted_8_1/twisted/test/test_amp.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698