OLD | NEW |
| (Empty) |
1 # -*- test-case-name: twisted.test.test_adbapi -*- | |
2 # Copyright (c) 2001-2008 Twisted Matrix Laboratories. | |
3 # See LICENSE for details. | |
4 | |
5 | |
6 """ | |
7 An asynchronous mapping to U{DB-API 2.0<http://www.python.org/topics/database/Da
tabaseAPI-2.0.html>}. | |
8 """ | |
9 | |
10 from twisted.internet import defer, threads | |
11 from twisted.python import reflect, log | |
12 from twisted.python.deprecate import deprecated | |
13 from twisted.python.versions import Version | |
14 | |
15 | |
16 class ConnectionLost(Exception): | |
17 """This exception means that a db connection has been lost. | |
18 Client code may try again.""" | |
19 pass | |
20 | |
21 | |
22 class Connection(object): | |
23 """A wrapper for a DB-API connection instance. | |
24 | |
25 The wrapper passes almost everything to the wrapped connection and so has | |
26 the same API. However, the Connection knows about its pool and also | |
27 handle reconnecting should when the real connection dies. | |
28 """ | |
29 | |
30 def __init__(self, pool): | |
31 self._pool = pool | |
32 self._connection = None | |
33 self.reconnect() | |
34 | |
35 def close(self): | |
36 # The way adbapi works right now means that closing a connection is | |
37 # a really bad thing as it leaves a dead connection associated with | |
38 # a thread in the thread pool. | |
39 # Really, I think closing a pooled connection should return it to the | |
40 # pool but that's handled by the runWithConnection method already so, | |
41 # rather than upsetting anyone by raising an exception, let's ignore | |
42 # the request | |
43 pass | |
44 | |
45 def rollback(self): | |
46 if not self._pool.reconnect: | |
47 self._connection.rollback() | |
48 return | |
49 | |
50 try: | |
51 self._connection.rollback() | |
52 curs = self._connection.cursor() | |
53 curs.execute(self._pool.good_sql) | |
54 curs.close() | |
55 self._connection.commit() | |
56 return | |
57 except: | |
58 pass | |
59 | |
60 self._pool.disconnect(self._connection) | |
61 | |
62 if self._pool.noisy: | |
63 log.msg('Connection lost.') | |
64 | |
65 raise ConnectionLost() | |
66 | |
67 def reconnect(self): | |
68 if self._connection is not None: | |
69 self._pool.disconnect(self._connection) | |
70 self._connection = self._pool.connect() | |
71 | |
72 def __getattr__(self, name): | |
73 return getattr(self._connection, name) | |
74 | |
75 | |
76 class Transaction: | |
77 """A lightweight wrapper for a DB-API 'cursor' object. | |
78 | |
79 Relays attribute access to the DB cursor. That is, you can call | |
80 execute(), fetchall(), etc., and they will be called on the | |
81 underlying DB-API cursor object. Attributes will also be | |
82 retrieved from there. | |
83 """ | |
84 _cursor = None | |
85 | |
86 def __init__(self, pool, connection): | |
87 self._pool = pool | |
88 self._connection = connection | |
89 self.reopen() | |
90 | |
91 def close(self): | |
92 _cursor = self._cursor | |
93 self._cursor = None | |
94 _cursor.close() | |
95 | |
96 def reopen(self): | |
97 if self._cursor is not None: | |
98 self.close() | |
99 | |
100 try: | |
101 self._cursor = self._connection.cursor() | |
102 return | |
103 except: | |
104 if not self._pool.reconnect: | |
105 raise | |
106 | |
107 if self._pool.noisy: | |
108 log.msg('Connection lost, reconnecting') | |
109 | |
110 self.reconnect() | |
111 self._cursor = self._connection.cursor() | |
112 | |
113 def reconnect(self): | |
114 self._connection.reconnect() | |
115 self._cursor = None | |
116 | |
117 def __getattr__(self, name): | |
118 return getattr(self._cursor, name) | |
119 | |
120 | |
121 class ConnectionPool: | |
122 """I represent a pool of connections to a DB-API 2.0 compliant database. | |
123 """ | |
124 | |
125 CP_ARGS = "min max name noisy openfun reconnect good_sql".split() | |
126 | |
127 noisy = True # if true, generate informational log messages | |
128 min = 3 # minimum number of connections in pool | |
129 max = 5 # maximum number of connections in pool | |
130 name = None # Name to assign to thread pool for debugging | |
131 openfun = None # A function to call on new connections | |
132 reconnect = False # reconnect when connections fail | |
133 good_sql = 'select 1' # a query which should always succeed | |
134 | |
135 running = False # true when the pool is operating | |
136 | |
137 def __init__(self, dbapiName, *connargs, **connkw): | |
138 """Create a new ConnectionPool. | |
139 | |
140 Any positional or keyword arguments other than those documented here | |
141 are passed to the DB-API object when connecting. Use these arguments to | |
142 pass database names, usernames, passwords, etc. | |
143 | |
144 @param dbapiName: an import string to use to obtain a DB-API compatible | |
145 module (e.g. 'pyPgSQL.PgSQL') | |
146 | |
147 @param cp_min: the minimum number of connections in pool (default 3) | |
148 | |
149 @param cp_max: the maximum number of connections in pool (default 5) | |
150 | |
151 @param cp_noisy: generate informational log messages during operation | |
152 (default False) | |
153 | |
154 @param cp_openfun: a callback invoked after every connect() on the | |
155 underlying DB-API object. The callback is passed a | |
156 new DB-API connection object. This callback can | |
157 setup per-connection state such as charset, | |
158 timezone, etc. | |
159 | |
160 @param cp_reconnect: detect connections which have failed and reconnect | |
161 (default False). Failed connections may result in | |
162 ConnectionLost exceptions, which indicate the | |
163 query may need to be re-sent. | |
164 | |
165 @param cp_good_sql: an sql query which should always succeed and change | |
166 no state (default 'select 1') | |
167 """ | |
168 | |
169 self.dbapiName = dbapiName | |
170 self.dbapi = reflect.namedModule(dbapiName) | |
171 | |
172 if getattr(self.dbapi, 'apilevel', None) != '2.0': | |
173 log.msg('DB API module not DB API 2.0 compliant.') | |
174 | |
175 if getattr(self.dbapi, 'threadsafety', 0) < 1: | |
176 log.msg('DB API module not sufficiently thread-safe.') | |
177 | |
178 self.connargs = connargs | |
179 self.connkw = connkw | |
180 | |
181 for arg in self.CP_ARGS: | |
182 cp_arg = 'cp_%s' % arg | |
183 if connkw.has_key(cp_arg): | |
184 setattr(self, arg, connkw[cp_arg]) | |
185 del connkw[cp_arg] | |
186 | |
187 self.min = min(self.min, self.max) | |
188 self.max = max(self.min, self.max) | |
189 | |
190 self.connections = {} # all connections, hashed on thread id | |
191 | |
192 # these are optional so import them here | |
193 from twisted.python import threadpool | |
194 import thread | |
195 | |
196 self.threadID = thread.get_ident | |
197 self.threadpool = threadpool.ThreadPool(self.min, self.max) | |
198 | |
199 from twisted.internet import reactor | |
200 self.startID = reactor.callWhenRunning(self._start) | |
201 | |
202 def _start(self): | |
203 self.startID = None | |
204 return self.start() | |
205 | |
206 def start(self): | |
207 """Start the connection pool. | |
208 | |
209 If you are using the reactor normally, this function does *not* | |
210 need to be called. | |
211 """ | |
212 | |
213 if not self.running: | |
214 from twisted.internet import reactor | |
215 self.threadpool.start() | |
216 self.shutdownID = reactor.addSystemEventTrigger('during', | |
217 'shutdown', | |
218 self.finalClose) | |
219 self.running = True | |
220 | |
221 def runWithConnection(self, func, *args, **kw): | |
222 return self._deferToThread(self._runWithConnection, | |
223 func, *args, **kw) | |
224 | |
225 def _runWithConnection(self, func, *args, **kw): | |
226 conn = Connection(self) | |
227 try: | |
228 result = func(conn, *args, **kw) | |
229 conn.commit() | |
230 return result | |
231 except: | |
232 conn.rollback() | |
233 raise | |
234 | |
235 def runInteraction(self, interaction, *args, **kw): | |
236 """Interact with the database and return the result. | |
237 | |
238 The 'interaction' is a callable object which will be executed | |
239 in a thread using a pooled connection. It will be passed an | |
240 L{Transaction} object as an argument (whose interface is | |
241 identical to that of the database cursor for your DB-API | |
242 module of choice), and its results will be returned as a | |
243 Deferred. If running the method raises an exception, the | |
244 transaction will be rolled back. If the method returns a | |
245 value, the transaction will be committed. | |
246 | |
247 NOTE that the function you pass is *not* run in the main | |
248 thread: you may have to worry about thread-safety in the | |
249 function you pass to this if it tries to use non-local | |
250 objects. | |
251 | |
252 @param interaction: a callable object whose first argument is | |
253 L{adbapi.Transaction}. *args,**kw will be passed as | |
254 additional arguments. | |
255 | |
256 @return: a Deferred which will fire the return value of | |
257 'interaction(Transaction(...))', or a Failure. | |
258 """ | |
259 | |
260 return self._deferToThread(self._runInteraction, | |
261 interaction, *args, **kw) | |
262 | |
263 def runQuery(self, *args, **kw): | |
264 """Execute an SQL query and return the result. | |
265 | |
266 A DB-API cursor will will be invoked with cursor.execute(*args, **kw). | |
267 The exact nature of the arguments will depend on the specific flavor | |
268 of DB-API being used, but the first argument in *args be an SQL | |
269 statement. The result of a subsequent cursor.fetchall() will be | |
270 fired to the Deferred which is returned. If either the 'execute' or | |
271 'fetchall' methods raise an exception, the transaction will be rolled | |
272 back and a Failure returned. | |
273 | |
274 The *args and **kw arguments will be passed to the DB-API cursor's | |
275 'execute' method. | |
276 | |
277 @return: a Deferred which will fire the return value of a DB-API | |
278 cursor's 'fetchall' method, or a Failure. | |
279 """ | |
280 | |
281 return self.runInteraction(self._runQuery, *args, **kw) | |
282 | |
283 def runOperation(self, *args, **kw): | |
284 """Execute an SQL query and return None. | |
285 | |
286 A DB-API cursor will will be invoked with cursor.execute(*args, **kw). | |
287 The exact nature of the arguments will depend on the specific flavor | |
288 of DB-API being used, but the first argument in *args will be an SQL | |
289 statement. This method will not attempt to fetch any results from the | |
290 query and is thus suitable for INSERT, DELETE, and other SQL statements | |
291 which do not return values. If the 'execute' method raises an | |
292 exception, the transaction will be rolled back and a Failure returned. | |
293 | |
294 The args and kw arguments will be passed to the DB-API cursor's | |
295 'execute' method. | |
296 | |
297 return: a Deferred which will fire None or a Failure. | |
298 """ | |
299 | |
300 return self.runInteraction(self._runOperation, *args, **kw) | |
301 | |
302 def close(self): | |
303 """Close all pool connections and shutdown the pool.""" | |
304 | |
305 from twisted.internet import reactor | |
306 if self.shutdownID: | |
307 reactor.removeSystemEventTrigger(self.shutdownID) | |
308 self.shutdownID = None | |
309 if self.startID: | |
310 reactor.removeSystemEventTrigger(self.startID) | |
311 self.startID = None | |
312 self.finalClose() | |
313 | |
314 def finalClose(self): | |
315 """This should only be called by the shutdown trigger.""" | |
316 | |
317 self.shutdownID = None | |
318 self.threadpool.stop() | |
319 self.running = False | |
320 for conn in self.connections.values(): | |
321 self._close(conn) | |
322 self.connections.clear() | |
323 | |
324 def connect(self): | |
325 """Return a database connection when one becomes available. | |
326 | |
327 This method blocks and should be run in a thread from the internal | |
328 threadpool. Don't call this method directly from non-threaded code. | |
329 Using this method outside the external threadpool may exceed the | |
330 maximum number of connections in the pool. | |
331 | |
332 @return: a database connection from the pool. | |
333 """ | |
334 | |
335 tid = self.threadID() | |
336 conn = self.connections.get(tid) | |
337 if conn is None: | |
338 if self.noisy: | |
339 log.msg('adbapi connecting: %s %s%s' % (self.dbapiName, | |
340 self.connargs or '', | |
341 self.connkw or '')) | |
342 conn = self.dbapi.connect(*self.connargs, **self.connkw) | |
343 if self.openfun != None: | |
344 self.openfun(conn) | |
345 self.connections[tid] = conn | |
346 return conn | |
347 | |
348 def disconnect(self, conn): | |
349 """Disconnect a database connection associated with this pool. | |
350 | |
351 Note: This function should only be used by the same thread which | |
352 called connect(). As with connect(), this function is not used | |
353 in normal non-threaded twisted code. | |
354 """ | |
355 | |
356 tid = self.threadID() | |
357 if conn is not self.connections.get(tid): | |
358 raise Exception("wrong connection for thread") | |
359 if conn is not None: | |
360 self._close(conn) | |
361 del self.connections[tid] | |
362 | |
363 def _close(self, conn): | |
364 if self.noisy: | |
365 log.msg('adbapi closing: %s' % (self.dbapiName,)) | |
366 try: | |
367 conn.close() | |
368 except: | |
369 pass | |
370 | |
371 def _runInteraction(self, interaction, *args, **kw): | |
372 conn = Connection(self) | |
373 trans = Transaction(self, conn) | |
374 try: | |
375 result = interaction(trans, *args, **kw) | |
376 trans.close() | |
377 conn.commit() | |
378 return result | |
379 except: | |
380 conn.rollback() | |
381 raise | |
382 | |
383 def _runQuery(self, trans, *args, **kw): | |
384 trans.execute(*args, **kw) | |
385 return trans.fetchall() | |
386 | |
387 def _runOperation(self, trans, *args, **kw): | |
388 trans.execute(*args, **kw) | |
389 | |
390 def __getstate__(self): | |
391 return {'dbapiName': self.dbapiName, | |
392 'min': self.min, | |
393 'max': self.max, | |
394 'noisy': self.noisy, | |
395 'reconnect': self.reconnect, | |
396 'good_sql': self.good_sql, | |
397 'connargs': self.connargs, | |
398 'connkw': self.connkw} | |
399 | |
400 def __setstate__(self, state): | |
401 self.__dict__ = state | |
402 self.__init__(self.dbapiName, *self.connargs, **self.connkw) | |
403 | |
404 def _deferToThread(self, f, *args, **kwargs): | |
405 """Internal function. | |
406 | |
407 Call f in one of the connection pool's threads. | |
408 """ | |
409 | |
410 d = defer.Deferred() | |
411 self.threadpool.callInThread(threads._putResultInDeferred, | |
412 d, f, args, kwargs) | |
413 return d | |
414 | |
415 | |
416 | |
417 # Common deprecation decorator used for all deprecations. | |
418 _unreleasedVersion = Version("Twisted", 2, 6, 0) | |
419 _unreleasedDeprecation = deprecated(_unreleasedVersion) | |
420 | |
421 | |
422 | |
423 def _safe(text): | |
424 """ | |
425 Something really stupid that replaces quotes with escaped quotes. | |
426 """ | |
427 return text.replace("'", "''").replace("\\", "\\\\") | |
428 | |
429 | |
430 | |
431 def safe(text): | |
432 """ | |
433 Make a string safe to include in an SQL statement. | |
434 """ | |
435 return _safe(text) | |
436 | |
437 safe = _unreleasedDeprecation(safe) | |
438 | |
439 | |
440 __all__ = ['Transaction', 'ConnectionPool', 'safe'] | |
OLD | NEW |