OLD | NEW |
1 # This file is part of Buildbot. Buildbot is free software: you can | 1 # This file is part of Buildbot. Buildbot is free software: you can |
2 # redistribute it and/or modify it under the terms of the GNU General Public | 2 # redistribute it and/or modify it under the terms of the GNU General Public |
3 # License as published by the Free Software Foundation, version 2. | 3 # License as published by the Free Software Foundation, version 2. |
4 # | 4 # |
5 # This program is distributed in the hope that it will be useful, but WITHOUT | 5 # This program is distributed in the hope that it will be useful, but WITHOUT |
6 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS | 6 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
7 # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more | 7 # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
8 # details. | 8 # details. |
9 # | 9 # |
10 # You should have received a copy of the GNU General Public License along with | 10 # You should have received a copy of the GNU General Public License along with |
11 # this program; if not, write to the Free Software Foundation, Inc., 51 | 11 # this program; if not, write to the Free Software Foundation, Inc., 51 |
12 # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | 12 # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
13 # | 13 # |
14 # Copyright Buildbot Team Members | 14 # Copyright Buildbot Team Members |
15 | 15 |
16 import os | 16 import os |
17 import sqlalchemy as sa | 17 import sqlalchemy as sa |
18 import twisted | 18 import twisted |
19 from twisted.internet import reactor, threads, defer | 19 from twisted.internet import reactor, threads, defer |
20 from twisted.python import threadpool, failure, versions, log | 20 from twisted.python import threadpool, failure, versions, log |
21 | 21 |
22 | 22 |
23 class DBThreadPool(threadpool.ThreadPool): | 23 class DBThreadPool(threadpool.ThreadPool): |
| 24 """ |
| 25 A pool of threads ready and waiting to execute queries. |
| 26 |
| 27 If the engine has an C{optimal_thread_pool_size} attribute, then the |
| 28 maxthreads of the thread pool will be set to that value. This is most |
| 29 useful for SQLite in-memory connections, where exactly one connection |
| 30 (and thus thread) should be used. |
| 31 """ |
24 | 32 |
25 running = False | 33 running = False |
26 | 34 |
27 # Some versions of SQLite incorrectly cache metadata about which tables are | 35 # Some versions of SQLite incorrectly cache metadata about which tables are |
28 # and are not present on a per-connection basis. This cache can be flushed | 36 # and are not present on a per-connection basis. This cache can be flushed |
29 # by querying the sqlite_master table. We currently assume all versions of | 37 # by querying the sqlite_master table. We currently assume all versions of |
30 # SQLite have this bug, although it has only been observed in 3.4.2. A | 38 # SQLite have this bug, although it has only been observed in 3.4.2. A |
31 # dynamic check for this bug would be more appropriate. This is documented | 39 # dynamic check for this bug would be more appropriate. This is documented |
32 # in bug #1810. | 40 # in bug #1810. |
33 __broken_sqlite = False | 41 __broken_sqlite = False |
34 | 42 |
35 def __init__(self, engine): | 43 def __init__(self, engine): |
36 pool_size = 5 | 44 pool_size = 5 |
37 | |
38 # If the engine has an C{optimal_thread_pool_size} attribute, then the | |
39 # maxthreads of the thread pool will be set to that value. This is | |
40 # most useful for SQLite in-memory connections, where exactly one | |
41 # connection (and thus thread) should be used. | |
42 if hasattr(engine, 'optimal_thread_pool_size'): | 45 if hasattr(engine, 'optimal_thread_pool_size'): |
43 pool_size = engine.optimal_thread_pool_size | 46 pool_size = engine.optimal_thread_pool_size |
44 | |
45 threadpool.ThreadPool.__init__(self, | 47 threadpool.ThreadPool.__init__(self, |
46 minthreads=1, | 48 minthreads=1, |
47 maxthreads=pool_size, | 49 maxthreads=pool_size, |
48 name='DBThreadPool') | 50 name='DBThreadPool') |
49 self.engine = engine | 51 self.engine = engine |
50 if engine.dialect.name == 'sqlite': | 52 if engine.dialect.name == 'sqlite': |
51 log.msg("applying SQLite workaround from Buildbot bug #1810") | 53 log.msg("applying SQLite workaround from Buildbot bug #1810") |
52 self.__broken_sqlite = self.detect_bug1810() | 54 self.__broken_sqlite = self.detect_bug1810() |
53 self._start_evt = reactor.callWhenRunning(self._start) | 55 self._start_evt = reactor.callWhenRunning(self._start) |
54 | 56 |
(...skipping 14 matching lines...) Expand all Loading... |
69 def shutdown(self): | 71 def shutdown(self): |
70 """Manually stop the pool. This is only necessary from tests, as the | 72 """Manually stop the pool. This is only necessary from tests, as the |
71 pool will stop itself when the reactor stops under normal | 73 pool will stop itself when the reactor stops under normal |
72 circumstances.""" | 74 circumstances.""" |
73 if not self._stop_evt: | 75 if not self._stop_evt: |
74 return # pool is already stopped | 76 return # pool is already stopped |
75 reactor.removeSystemEventTrigger(self._stop_evt) | 77 reactor.removeSystemEventTrigger(self._stop_evt) |
76 self._stop() | 78 self._stop() |
77 | 79 |
78 def do(self, callable, *args, **kwargs): | 80 def do(self, callable, *args, **kwargs): |
| 81 """ |
| 82 Call C{callable} in a thread, with a Connection as first argument. |
| 83 Returns a deferred that will indicate the results of the callable. |
| 84 |
| 85 Note: do not return any SQLAlchemy objects via this deferred! |
| 86 """ |
79 def thd(): | 87 def thd(): |
80 conn = self.engine.contextual_connect() | 88 conn = self.engine.contextual_connect() |
81 if self.__broken_sqlite: # see bug #1810 | 89 if self.__broken_sqlite: # see bug #1810 |
82 conn.execute("select * from sqlite_master") | 90 conn.execute("select * from sqlite_master") |
83 try: | 91 try: |
84 rv = callable(conn, *args, **kwargs) | 92 rv = callable(conn, *args, **kwargs) |
85 assert not isinstance(rv, sa.engine.ResultProxy), \ | 93 assert not isinstance(rv, sa.engine.ResultProxy), \ |
86 "do not return ResultProxy objects!" | 94 "do not return ResultProxy objects!" |
87 finally: | 95 finally: |
88 conn.close() | 96 conn.close() |
89 return rv | 97 return rv |
90 return threads.deferToThreadPool(reactor, self, thd) | 98 return threads.deferToThreadPool(reactor, self, thd) |
91 | 99 |
92 def do_with_engine(self, callable, *args, **kwargs): | 100 def do_with_engine(self, callable, *args, **kwargs): |
| 101 """ |
| 102 Like L{do}, but with an SQLAlchemy Engine as the first argument. This |
| 103 is only used for schema manipulation, and is not used at master |
| 104 runtime. |
| 105 """ |
93 def thd(): | 106 def thd(): |
94 if self.__broken_sqlite: # see bug #1810 | 107 if self.__broken_sqlite: # see bug #1810 |
95 self.engine.execute("select * from sqlite_master") | 108 self.engine.execute("select * from sqlite_master") |
96 rv = callable(self.engine, *args, **kwargs) | 109 rv = callable(self.engine, *args, **kwargs) |
97 assert not isinstance(rv, sa.engine.ResultProxy), \ | 110 assert not isinstance(rv, sa.engine.ResultProxy), \ |
98 "do not return ResultProxy objects!" | 111 "do not return ResultProxy objects!" |
99 return rv | 112 return rv |
100 return threads.deferToThreadPool(reactor, self, thd) | 113 return threads.deferToThreadPool(reactor, self, thd) |
101 | 114 |
102 # older implementations for twisted < 0.8.2, which does not have | 115 # older implementations for twisted < 0.8.2, which does not have |
(...skipping 69 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
172 | 185 |
173 try: | 186 try: |
174 test() | 187 test() |
175 except sqlite.OperationalError: | 188 except sqlite.OperationalError: |
176 # this is the expected error indicating it's broken | 189 # this is the expected error indicating it's broken |
177 return True | 190 return True |
178 | 191 |
179 # but this version should not fail.. | 192 # but this version should not fail.. |
180 test(select_from_sqlite_master=True) | 193 test(select_from_sqlite_master=True) |
181 return False # not broken - no workaround required | 194 return False # not broken - no workaround required |
OLD | NEW |