| OLD | NEW |
| (Empty) |
| 1 # -*- test-case-name: twisted.test.test_adbapi -*- | |
| 2 # Copyright (c) 2001-2008 Twisted Matrix Laboratories. | |
| 3 # See LICENSE for details. | |
| 4 | |
| 5 | |
| 6 """ | |
| 7 An asynchronous mapping to U{DB-API 2.0<http://www.python.org/topics/database/Da
tabaseAPI-2.0.html>}. | |
| 8 """ | |
| 9 | |
| 10 from twisted.internet import defer, threads | |
| 11 from twisted.python import reflect, log | |
| 12 from twisted.python.deprecate import deprecated | |
| 13 from twisted.python.versions import Version | |
| 14 | |
| 15 | |
| 16 class ConnectionLost(Exception): | |
| 17 """This exception means that a db connection has been lost. | |
| 18 Client code may try again.""" | |
| 19 pass | |
| 20 | |
| 21 | |
| 22 class Connection(object): | |
| 23 """A wrapper for a DB-API connection instance. | |
| 24 | |
| 25 The wrapper passes almost everything to the wrapped connection and so has | |
| 26 the same API. However, the Connection knows about its pool and also | |
| 27 handle reconnecting should when the real connection dies. | |
| 28 """ | |
| 29 | |
| 30 def __init__(self, pool): | |
| 31 self._pool = pool | |
| 32 self._connection = None | |
| 33 self.reconnect() | |
| 34 | |
| 35 def close(self): | |
| 36 # The way adbapi works right now means that closing a connection is | |
| 37 # a really bad thing as it leaves a dead connection associated with | |
| 38 # a thread in the thread pool. | |
| 39 # Really, I think closing a pooled connection should return it to the | |
| 40 # pool but that's handled by the runWithConnection method already so, | |
| 41 # rather than upsetting anyone by raising an exception, let's ignore | |
| 42 # the request | |
| 43 pass | |
| 44 | |
| 45 def rollback(self): | |
| 46 if not self._pool.reconnect: | |
| 47 self._connection.rollback() | |
| 48 return | |
| 49 | |
| 50 try: | |
| 51 self._connection.rollback() | |
| 52 curs = self._connection.cursor() | |
| 53 curs.execute(self._pool.good_sql) | |
| 54 curs.close() | |
| 55 self._connection.commit() | |
| 56 return | |
| 57 except: | |
| 58 pass | |
| 59 | |
| 60 self._pool.disconnect(self._connection) | |
| 61 | |
| 62 if self._pool.noisy: | |
| 63 log.msg('Connection lost.') | |
| 64 | |
| 65 raise ConnectionLost() | |
| 66 | |
| 67 def reconnect(self): | |
| 68 if self._connection is not None: | |
| 69 self._pool.disconnect(self._connection) | |
| 70 self._connection = self._pool.connect() | |
| 71 | |
| 72 def __getattr__(self, name): | |
| 73 return getattr(self._connection, name) | |
| 74 | |
| 75 | |
| 76 class Transaction: | |
| 77 """A lightweight wrapper for a DB-API 'cursor' object. | |
| 78 | |
| 79 Relays attribute access to the DB cursor. That is, you can call | |
| 80 execute(), fetchall(), etc., and they will be called on the | |
| 81 underlying DB-API cursor object. Attributes will also be | |
| 82 retrieved from there. | |
| 83 """ | |
| 84 _cursor = None | |
| 85 | |
| 86 def __init__(self, pool, connection): | |
| 87 self._pool = pool | |
| 88 self._connection = connection | |
| 89 self.reopen() | |
| 90 | |
| 91 def close(self): | |
| 92 _cursor = self._cursor | |
| 93 self._cursor = None | |
| 94 _cursor.close() | |
| 95 | |
| 96 def reopen(self): | |
| 97 if self._cursor is not None: | |
| 98 self.close() | |
| 99 | |
| 100 try: | |
| 101 self._cursor = self._connection.cursor() | |
| 102 return | |
| 103 except: | |
| 104 if not self._pool.reconnect: | |
| 105 raise | |
| 106 | |
| 107 if self._pool.noisy: | |
| 108 log.msg('Connection lost, reconnecting') | |
| 109 | |
| 110 self.reconnect() | |
| 111 self._cursor = self._connection.cursor() | |
| 112 | |
| 113 def reconnect(self): | |
| 114 self._connection.reconnect() | |
| 115 self._cursor = None | |
| 116 | |
| 117 def __getattr__(self, name): | |
| 118 return getattr(self._cursor, name) | |
| 119 | |
| 120 | |
| 121 class ConnectionPool: | |
| 122 """I represent a pool of connections to a DB-API 2.0 compliant database. | |
| 123 """ | |
| 124 | |
| 125 CP_ARGS = "min max name noisy openfun reconnect good_sql".split() | |
| 126 | |
| 127 noisy = True # if true, generate informational log messages | |
| 128 min = 3 # minimum number of connections in pool | |
| 129 max = 5 # maximum number of connections in pool | |
| 130 name = None # Name to assign to thread pool for debugging | |
| 131 openfun = None # A function to call on new connections | |
| 132 reconnect = False # reconnect when connections fail | |
| 133 good_sql = 'select 1' # a query which should always succeed | |
| 134 | |
| 135 running = False # true when the pool is operating | |
| 136 | |
| 137 def __init__(self, dbapiName, *connargs, **connkw): | |
| 138 """Create a new ConnectionPool. | |
| 139 | |
| 140 Any positional or keyword arguments other than those documented here | |
| 141 are passed to the DB-API object when connecting. Use these arguments to | |
| 142 pass database names, usernames, passwords, etc. | |
| 143 | |
| 144 @param dbapiName: an import string to use to obtain a DB-API compatible | |
| 145 module (e.g. 'pyPgSQL.PgSQL') | |
| 146 | |
| 147 @param cp_min: the minimum number of connections in pool (default 3) | |
| 148 | |
| 149 @param cp_max: the maximum number of connections in pool (default 5) | |
| 150 | |
| 151 @param cp_noisy: generate informational log messages during operation | |
| 152 (default False) | |
| 153 | |
| 154 @param cp_openfun: a callback invoked after every connect() on the | |
| 155 underlying DB-API object. The callback is passed a | |
| 156 new DB-API connection object. This callback can | |
| 157 setup per-connection state such as charset, | |
| 158 timezone, etc. | |
| 159 | |
| 160 @param cp_reconnect: detect connections which have failed and reconnect | |
| 161 (default False). Failed connections may result in | |
| 162 ConnectionLost exceptions, which indicate the | |
| 163 query may need to be re-sent. | |
| 164 | |
| 165 @param cp_good_sql: an sql query which should always succeed and change | |
| 166 no state (default 'select 1') | |
| 167 """ | |
| 168 | |
| 169 self.dbapiName = dbapiName | |
| 170 self.dbapi = reflect.namedModule(dbapiName) | |
| 171 | |
| 172 if getattr(self.dbapi, 'apilevel', None) != '2.0': | |
| 173 log.msg('DB API module not DB API 2.0 compliant.') | |
| 174 | |
| 175 if getattr(self.dbapi, 'threadsafety', 0) < 1: | |
| 176 log.msg('DB API module not sufficiently thread-safe.') | |
| 177 | |
| 178 self.connargs = connargs | |
| 179 self.connkw = connkw | |
| 180 | |
| 181 for arg in self.CP_ARGS: | |
| 182 cp_arg = 'cp_%s' % arg | |
| 183 if connkw.has_key(cp_arg): | |
| 184 setattr(self, arg, connkw[cp_arg]) | |
| 185 del connkw[cp_arg] | |
| 186 | |
| 187 self.min = min(self.min, self.max) | |
| 188 self.max = max(self.min, self.max) | |
| 189 | |
| 190 self.connections = {} # all connections, hashed on thread id | |
| 191 | |
| 192 # these are optional so import them here | |
| 193 from twisted.python import threadpool | |
| 194 import thread | |
| 195 | |
| 196 self.threadID = thread.get_ident | |
| 197 self.threadpool = threadpool.ThreadPool(self.min, self.max) | |
| 198 | |
| 199 from twisted.internet import reactor | |
| 200 self.startID = reactor.callWhenRunning(self._start) | |
| 201 | |
| 202 def _start(self): | |
| 203 self.startID = None | |
| 204 return self.start() | |
| 205 | |
| 206 def start(self): | |
| 207 """Start the connection pool. | |
| 208 | |
| 209 If you are using the reactor normally, this function does *not* | |
| 210 need to be called. | |
| 211 """ | |
| 212 | |
| 213 if not self.running: | |
| 214 from twisted.internet import reactor | |
| 215 self.threadpool.start() | |
| 216 self.shutdownID = reactor.addSystemEventTrigger('during', | |
| 217 'shutdown', | |
| 218 self.finalClose) | |
| 219 self.running = True | |
| 220 | |
| 221 def runWithConnection(self, func, *args, **kw): | |
| 222 return self._deferToThread(self._runWithConnection, | |
| 223 func, *args, **kw) | |
| 224 | |
| 225 def _runWithConnection(self, func, *args, **kw): | |
| 226 conn = Connection(self) | |
| 227 try: | |
| 228 result = func(conn, *args, **kw) | |
| 229 conn.commit() | |
| 230 return result | |
| 231 except: | |
| 232 conn.rollback() | |
| 233 raise | |
| 234 | |
| 235 def runInteraction(self, interaction, *args, **kw): | |
| 236 """Interact with the database and return the result. | |
| 237 | |
| 238 The 'interaction' is a callable object which will be executed | |
| 239 in a thread using a pooled connection. It will be passed an | |
| 240 L{Transaction} object as an argument (whose interface is | |
| 241 identical to that of the database cursor for your DB-API | |
| 242 module of choice), and its results will be returned as a | |
| 243 Deferred. If running the method raises an exception, the | |
| 244 transaction will be rolled back. If the method returns a | |
| 245 value, the transaction will be committed. | |
| 246 | |
| 247 NOTE that the function you pass is *not* run in the main | |
| 248 thread: you may have to worry about thread-safety in the | |
| 249 function you pass to this if it tries to use non-local | |
| 250 objects. | |
| 251 | |
| 252 @param interaction: a callable object whose first argument is | |
| 253 L{adbapi.Transaction}. *args,**kw will be passed as | |
| 254 additional arguments. | |
| 255 | |
| 256 @return: a Deferred which will fire the return value of | |
| 257 'interaction(Transaction(...))', or a Failure. | |
| 258 """ | |
| 259 | |
| 260 return self._deferToThread(self._runInteraction, | |
| 261 interaction, *args, **kw) | |
| 262 | |
| 263 def runQuery(self, *args, **kw): | |
| 264 """Execute an SQL query and return the result. | |
| 265 | |
| 266 A DB-API cursor will will be invoked with cursor.execute(*args, **kw). | |
| 267 The exact nature of the arguments will depend on the specific flavor | |
| 268 of DB-API being used, but the first argument in *args be an SQL | |
| 269 statement. The result of a subsequent cursor.fetchall() will be | |
| 270 fired to the Deferred which is returned. If either the 'execute' or | |
| 271 'fetchall' methods raise an exception, the transaction will be rolled | |
| 272 back and a Failure returned. | |
| 273 | |
| 274 The *args and **kw arguments will be passed to the DB-API cursor's | |
| 275 'execute' method. | |
| 276 | |
| 277 @return: a Deferred which will fire the return value of a DB-API | |
| 278 cursor's 'fetchall' method, or a Failure. | |
| 279 """ | |
| 280 | |
| 281 return self.runInteraction(self._runQuery, *args, **kw) | |
| 282 | |
| 283 def runOperation(self, *args, **kw): | |
| 284 """Execute an SQL query and return None. | |
| 285 | |
| 286 A DB-API cursor will will be invoked with cursor.execute(*args, **kw). | |
| 287 The exact nature of the arguments will depend on the specific flavor | |
| 288 of DB-API being used, but the first argument in *args will be an SQL | |
| 289 statement. This method will not attempt to fetch any results from the | |
| 290 query and is thus suitable for INSERT, DELETE, and other SQL statements | |
| 291 which do not return values. If the 'execute' method raises an | |
| 292 exception, the transaction will be rolled back and a Failure returned. | |
| 293 | |
| 294 The args and kw arguments will be passed to the DB-API cursor's | |
| 295 'execute' method. | |
| 296 | |
| 297 return: a Deferred which will fire None or a Failure. | |
| 298 """ | |
| 299 | |
| 300 return self.runInteraction(self._runOperation, *args, **kw) | |
| 301 | |
| 302 def close(self): | |
| 303 """Close all pool connections and shutdown the pool.""" | |
| 304 | |
| 305 from twisted.internet import reactor | |
| 306 if self.shutdownID: | |
| 307 reactor.removeSystemEventTrigger(self.shutdownID) | |
| 308 self.shutdownID = None | |
| 309 if self.startID: | |
| 310 reactor.removeSystemEventTrigger(self.startID) | |
| 311 self.startID = None | |
| 312 self.finalClose() | |
| 313 | |
| 314 def finalClose(self): | |
| 315 """This should only be called by the shutdown trigger.""" | |
| 316 | |
| 317 self.shutdownID = None | |
| 318 self.threadpool.stop() | |
| 319 self.running = False | |
| 320 for conn in self.connections.values(): | |
| 321 self._close(conn) | |
| 322 self.connections.clear() | |
| 323 | |
| 324 def connect(self): | |
| 325 """Return a database connection when one becomes available. | |
| 326 | |
| 327 This method blocks and should be run in a thread from the internal | |
| 328 threadpool. Don't call this method directly from non-threaded code. | |
| 329 Using this method outside the external threadpool may exceed the | |
| 330 maximum number of connections in the pool. | |
| 331 | |
| 332 @return: a database connection from the pool. | |
| 333 """ | |
| 334 | |
| 335 tid = self.threadID() | |
| 336 conn = self.connections.get(tid) | |
| 337 if conn is None: | |
| 338 if self.noisy: | |
| 339 log.msg('adbapi connecting: %s %s%s' % (self.dbapiName, | |
| 340 self.connargs or '', | |
| 341 self.connkw or '')) | |
| 342 conn = self.dbapi.connect(*self.connargs, **self.connkw) | |
| 343 if self.openfun != None: | |
| 344 self.openfun(conn) | |
| 345 self.connections[tid] = conn | |
| 346 return conn | |
| 347 | |
| 348 def disconnect(self, conn): | |
| 349 """Disconnect a database connection associated with this pool. | |
| 350 | |
| 351 Note: This function should only be used by the same thread which | |
| 352 called connect(). As with connect(), this function is not used | |
| 353 in normal non-threaded twisted code. | |
| 354 """ | |
| 355 | |
| 356 tid = self.threadID() | |
| 357 if conn is not self.connections.get(tid): | |
| 358 raise Exception("wrong connection for thread") | |
| 359 if conn is not None: | |
| 360 self._close(conn) | |
| 361 del self.connections[tid] | |
| 362 | |
| 363 def _close(self, conn): | |
| 364 if self.noisy: | |
| 365 log.msg('adbapi closing: %s' % (self.dbapiName,)) | |
| 366 try: | |
| 367 conn.close() | |
| 368 except: | |
| 369 pass | |
| 370 | |
| 371 def _runInteraction(self, interaction, *args, **kw): | |
| 372 conn = Connection(self) | |
| 373 trans = Transaction(self, conn) | |
| 374 try: | |
| 375 result = interaction(trans, *args, **kw) | |
| 376 trans.close() | |
| 377 conn.commit() | |
| 378 return result | |
| 379 except: | |
| 380 conn.rollback() | |
| 381 raise | |
| 382 | |
| 383 def _runQuery(self, trans, *args, **kw): | |
| 384 trans.execute(*args, **kw) | |
| 385 return trans.fetchall() | |
| 386 | |
| 387 def _runOperation(self, trans, *args, **kw): | |
| 388 trans.execute(*args, **kw) | |
| 389 | |
| 390 def __getstate__(self): | |
| 391 return {'dbapiName': self.dbapiName, | |
| 392 'min': self.min, | |
| 393 'max': self.max, | |
| 394 'noisy': self.noisy, | |
| 395 'reconnect': self.reconnect, | |
| 396 'good_sql': self.good_sql, | |
| 397 'connargs': self.connargs, | |
| 398 'connkw': self.connkw} | |
| 399 | |
| 400 def __setstate__(self, state): | |
| 401 self.__dict__ = state | |
| 402 self.__init__(self.dbapiName, *self.connargs, **self.connkw) | |
| 403 | |
| 404 def _deferToThread(self, f, *args, **kwargs): | |
| 405 """Internal function. | |
| 406 | |
| 407 Call f in one of the connection pool's threads. | |
| 408 """ | |
| 409 | |
| 410 d = defer.Deferred() | |
| 411 self.threadpool.callInThread(threads._putResultInDeferred, | |
| 412 d, f, args, kwargs) | |
| 413 return d | |
| 414 | |
| 415 | |
| 416 | |
| 417 # Common deprecation decorator used for all deprecations. | |
| 418 _unreleasedVersion = Version("Twisted", 2, 6, 0) | |
| 419 _unreleasedDeprecation = deprecated(_unreleasedVersion) | |
| 420 | |
| 421 | |
| 422 | |
| 423 def _safe(text): | |
| 424 """ | |
| 425 Something really stupid that replaces quotes with escaped quotes. | |
| 426 """ | |
| 427 return text.replace("'", "''").replace("\\", "\\\\") | |
| 428 | |
| 429 | |
| 430 | |
| 431 def safe(text): | |
| 432 """ | |
| 433 Make a string safe to include in an SQL statement. | |
| 434 """ | |
| 435 return _safe(text) | |
| 436 | |
| 437 safe = _unreleasedDeprecation(safe) | |
| 438 | |
| 439 | |
| 440 __all__ = ['Transaction', 'ConnectionPool', 'safe'] | |
| OLD | NEW |