| OLD | NEW |
| 1 # This file is part of Buildbot. Buildbot is free software: you can | 1 # This file is part of Buildbot. Buildbot is free software: you can |
| 2 # redistribute it and/or modify it under the terms of the GNU General Public | 2 # redistribute it and/or modify it under the terms of the GNU General Public |
| 3 # License as published by the Free Software Foundation, version 2. | 3 # License as published by the Free Software Foundation, version 2. |
| 4 # | 4 # |
| 5 # This program is distributed in the hope that it will be useful, but WITHOUT | 5 # This program is distributed in the hope that it will be useful, but WITHOUT |
| 6 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS | 6 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| 7 # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more | 7 # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
| 8 # details. | 8 # details. |
| 9 # | 9 # |
| 10 # You should have received a copy of the GNU General Public License along with | 10 # You should have received a copy of the GNU General Public License along with |
| 11 # this program; if not, write to the Free Software Foundation, Inc., 51 | 11 # this program; if not, write to the Free Software Foundation, Inc., 51 |
| 12 # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | 12 # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
| 13 # | 13 # |
| 14 # Copyright Buildbot Team Members | 14 # Copyright Buildbot Team Members |
| 15 | 15 |
| 16 import os | 16 import os |
| 17 import sqlalchemy as sa | 17 import sqlalchemy as sa |
| 18 import twisted | 18 import twisted |
| 19 from twisted.internet import reactor, threads, defer | 19 from twisted.internet import reactor, threads, defer |
| 20 from twisted.python import threadpool, failure, versions, log | 20 from twisted.python import threadpool, failure, versions, log |
| 21 | 21 |
| 22 | 22 |
| 23 class DBThreadPool(threadpool.ThreadPool): | 23 class DBThreadPool(threadpool.ThreadPool): |
| 24 """ | |
| 25 A pool of threads ready and waiting to execute queries. | |
| 26 | |
| 27 If the engine has an C{optimal_thread_pool_size} attribute, then the | |
| 28 maxthreads of the thread pool will be set to that value. This is most | |
| 29 useful for SQLite in-memory connections, where exactly one connection | |
| 30 (and thus thread) should be used. | |
| 31 """ | |
| 32 | 24 |
| 33 running = False | 25 running = False |
| 34 | 26 |
| 35 # Some versions of SQLite incorrectly cache metadata about which tables are | 27 # Some versions of SQLite incorrectly cache metadata about which tables are |
| 36 # and are not present on a per-connection basis. This cache can be flushed | 28 # and are not present on a per-connection basis. This cache can be flushed |
| 37 # by querying the sqlite_master table. We currently assume all versions of | 29 # by querying the sqlite_master table. We currently assume all versions of |
| 38 # SQLite have this bug, although it has only been observed in 3.4.2. A | 30 # SQLite have this bug, although it has only been observed in 3.4.2. A |
| 39 # dynamic check for this bug would be more appropriate. This is documented | 31 # dynamic check for this bug would be more appropriate. This is documented |
| 40 # in bug #1810. | 32 # in bug #1810. |
| 41 __broken_sqlite = False | 33 __broken_sqlite = False |
| 42 | 34 |
| 43 def __init__(self, engine): | 35 def __init__(self, engine): |
| 44 pool_size = 5 | 36 pool_size = 5 |
| 37 |
| 38 # If the engine has an C{optimal_thread_pool_size} attribute, then the |
| 39 # maxthreads of the thread pool will be set to that value. This is |
| 40 # most useful for SQLite in-memory connections, where exactly one |
| 41 # connection (and thus thread) should be used. |
| 45 if hasattr(engine, 'optimal_thread_pool_size'): | 42 if hasattr(engine, 'optimal_thread_pool_size'): |
| 46 pool_size = engine.optimal_thread_pool_size | 43 pool_size = engine.optimal_thread_pool_size |
| 44 |
| 47 threadpool.ThreadPool.__init__(self, | 45 threadpool.ThreadPool.__init__(self, |
| 48 minthreads=1, | 46 minthreads=1, |
| 49 maxthreads=pool_size, | 47 maxthreads=pool_size, |
| 50 name='DBThreadPool') | 48 name='DBThreadPool') |
| 51 self.engine = engine | 49 self.engine = engine |
| 52 if engine.dialect.name == 'sqlite': | 50 if engine.dialect.name == 'sqlite': |
| 53 vers = self.get_sqlite_version() | 51 vers = self.get_sqlite_version() |
| 54 log.msg("Using SQLite Version %s" % (vers,)) | 52 log.msg("Using SQLite Version %s" % (vers,)) |
| 55 if vers < (3,7): | 53 if vers < (3,7): |
| 56 log.msg("NOTE: this old version of SQLite does not support " | 54 log.msg("NOTE: this old version of SQLite does not support " |
| (...skipping 25 matching lines...) Expand all Loading... |
| 82 def shutdown(self): | 80 def shutdown(self): |
| 83 """Manually stop the pool. This is only necessary from tests, as the | 81 """Manually stop the pool. This is only necessary from tests, as the |
| 84 pool will stop itself when the reactor stops under normal | 82 pool will stop itself when the reactor stops under normal |
| 85 circumstances.""" | 83 circumstances.""" |
| 86 if not self._stop_evt: | 84 if not self._stop_evt: |
| 87 return # pool is already stopped | 85 return # pool is already stopped |
| 88 reactor.removeSystemEventTrigger(self._stop_evt) | 86 reactor.removeSystemEventTrigger(self._stop_evt) |
| 89 self._stop() | 87 self._stop() |
| 90 | 88 |
| 91 def do(self, callable, *args, **kwargs): | 89 def do(self, callable, *args, **kwargs): |
| 92 """ | |
| 93 Call C{callable} in a thread, with a Connection as first argument. | |
| 94 Returns a deferred that will indicate the results of the callable. | |
| 95 | |
| 96 Note: do not return any SQLAlchemy objects via this deferred! | |
| 97 """ | |
| 98 def thd(): | 90 def thd(): |
| 99 conn = self.engine.contextual_connect() | 91 conn = self.engine.contextual_connect() |
| 100 if self.__broken_sqlite: # see bug #1810 | 92 if self.__broken_sqlite: # see bug #1810 |
| 101 conn.execute("select * from sqlite_master") | 93 conn.execute("select * from sqlite_master") |
| 102 try: | 94 try: |
| 103 rv = callable(conn, *args, **kwargs) | 95 rv = callable(conn, *args, **kwargs) |
| 104 assert not isinstance(rv, sa.engine.ResultProxy), \ | 96 assert not isinstance(rv, sa.engine.ResultProxy), \ |
| 105 "do not return ResultProxy objects!" | 97 "do not return ResultProxy objects!" |
| 106 finally: | 98 finally: |
| 107 conn.close() | 99 conn.close() |
| 108 return rv | 100 return rv |
| 109 return threads.deferToThreadPool(reactor, self, thd) | 101 return threads.deferToThreadPool(reactor, self, thd) |
| 110 | 102 |
| 111 def do_with_engine(self, callable, *args, **kwargs): | 103 def do_with_engine(self, callable, *args, **kwargs): |
| 112 """ | |
| 113 Like L{do}, but with an SQLAlchemy Engine as the first argument. This | |
| 114 is only used for schema manipulation, and is not used at master | |
| 115 runtime. | |
| 116 """ | |
| 117 def thd(): | 104 def thd(): |
| 118 if self.__broken_sqlite: # see bug #1810 | 105 if self.__broken_sqlite: # see bug #1810 |
| 119 self.engine.execute("select * from sqlite_master") | 106 self.engine.execute("select * from sqlite_master") |
| 120 rv = callable(self.engine, *args, **kwargs) | 107 rv = callable(self.engine, *args, **kwargs) |
| 121 assert not isinstance(rv, sa.engine.ResultProxy), \ | 108 assert not isinstance(rv, sa.engine.ResultProxy), \ |
| 122 "do not return ResultProxy objects!" | 109 "do not return ResultProxy objects!" |
| 123 return rv | 110 return rv |
| 124 return threads.deferToThreadPool(reactor, self, thd) | 111 return threads.deferToThreadPool(reactor, self, thd) |
| 125 | 112 |
| 126 # older implementations for twisted < 0.8.2, which does not have | 113 # older implementations for twisted < 0.8.2, which does not have |
| (...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 215 except: | 202 except: |
| 216 return (0,) | 203 return (0,) |
| 217 | 204 |
| 218 if vers_row: | 205 if vers_row: |
| 219 try: | 206 try: |
| 220 return tuple(map(int, vers_row[0].split('.'))) | 207 return tuple(map(int, vers_row[0].split('.'))) |
| 221 except (TypeError, ValueError): | 208 except (TypeError, ValueError): |
| 222 return (0,) | 209 return (0,) |
| 223 else: | 210 else: |
| 224 return (0,) | 211 return (0,) |
| OLD | NEW |