| OLD | NEW |
| 1 # This file is part of Buildbot. Buildbot is free software: you can | 1 # This file is part of Buildbot. Buildbot is free software: you can |
| 2 # redistribute it and/or modify it under the terms of the GNU General Public | 2 # redistribute it and/or modify it under the terms of the GNU General Public |
| 3 # License as published by the Free Software Foundation, version 2. | 3 # License as published by the Free Software Foundation, version 2. |
| 4 # | 4 # |
| 5 # This program is distributed in the hope that it will be useful, but WITHOUT | 5 # This program is distributed in the hope that it will be useful, but WITHOUT |
| 6 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS | 6 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| 7 # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more | 7 # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
| 8 # details. | 8 # details. |
| 9 # | 9 # |
| 10 # You should have received a copy of the GNU General Public License along with | 10 # You should have received a copy of the GNU General Public License along with |
| 11 # this program; if not, write to the Free Software Foundation, Inc., 51 | 11 # this program; if not, write to the Free Software Foundation, Inc., 51 |
| 12 # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | 12 # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
| 13 # | 13 # |
| 14 # Copyright Buildbot Team Members | 14 # Copyright Buildbot Team Members |
| 15 | 15 |
| 16 import os | 16 import os |
| 17 import sqlalchemy as sa | 17 import sqlalchemy as sa |
| 18 import twisted | 18 import twisted |
| 19 from twisted.internet import reactor, threads, defer, task | 19 from twisted.internet import reactor, threads, defer |
| 20 from twisted.python import threadpool, failure, versions, log | 20 from twisted.python import threadpool, failure, versions, log |
| 21 | 21 |
| 22 # Hack for bug #1992. In as-yet-unknown circumstances, select() fails to | |
| 23 # notice that a selfpipe has been written to, thus causing callFromThread, as | |
| 24 # used in deferToThreadPool, to hang indefinitely. The workaround is to wake | |
| 25 # up the select loop every second by ensuring that there is an event occuring | |
| 26 # every second, with this busy loop: | |
| 27 def bug1992hack(f): | |
| 28 def w(*args, **kwargs): | |
| 29 busyloop = task.LoopingCall(lambda : None) | |
| 30 busyloop.start(1) | |
| 31 d = f(*args, **kwargs) | |
| 32 def stop_loop(r): | |
| 33 busyloop.stop() | |
| 34 return r | |
| 35 d.addBoth(stop_loop) | |
| 36 return d | |
| 37 w.__name__ = f.__name__ | |
| 38 w.__doc__ = f.__doc__ | |
| 39 return w | |
| 40 | |
| 41 | 22 |
| 42 class DBThreadPool(threadpool.ThreadPool): | 23 class DBThreadPool(threadpool.ThreadPool): |
| 43 """ | 24 """ |
| 44 A pool of threads ready and waiting to execute queries. | 25 A pool of threads ready and waiting to execute queries. |
| 45 | 26 |
| 46 If the engine has an C{optimal_thread_pool_size} attribute, then the | 27 If the engine has an C{optimal_thread_pool_size} attribute, then the |
| 47 maxthreads of the thread pool will be set to that value. This is most | 28 maxthreads of the thread pool will be set to that value. This is most |
| 48 useful for SQLite in-memory connections, where exactly one connection | 29 useful for SQLite in-memory connections, where exactly one connection |
| 49 (and thus thread) should be used. | 30 (and thus thread) should be used. |
| 50 """ | 31 """ |
| (...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 100 | 81 |
| 101 def shutdown(self): | 82 def shutdown(self): |
| 102 """Manually stop the pool. This is only necessary from tests, as the | 83 """Manually stop the pool. This is only necessary from tests, as the |
| 103 pool will stop itself when the reactor stops under normal | 84 pool will stop itself when the reactor stops under normal |
| 104 circumstances.""" | 85 circumstances.""" |
| 105 if not self._stop_evt: | 86 if not self._stop_evt: |
| 106 return # pool is already stopped | 87 return # pool is already stopped |
| 107 reactor.removeSystemEventTrigger(self._stop_evt) | 88 reactor.removeSystemEventTrigger(self._stop_evt) |
| 108 self._stop() | 89 self._stop() |
| 109 | 90 |
| 110 @bug1992hack | |
| 111 def do(self, callable, *args, **kwargs): | 91 def do(self, callable, *args, **kwargs): |
| 112 """ | 92 """ |
| 113 Call C{callable} in a thread, with a Connection as first argument. | 93 Call C{callable} in a thread, with a Connection as first argument. |
| 114 Returns a deferred that will indicate the results of the callable. | 94 Returns a deferred that will indicate the results of the callable. |
| 115 | 95 |
| 116 Note: do not return any SQLAlchemy objects via this deferred! | 96 Note: do not return any SQLAlchemy objects via this deferred! |
| 117 """ | 97 """ |
| 118 def thd(): | 98 def thd(): |
| 119 conn = self.engine.contextual_connect() | 99 conn = self.engine.contextual_connect() |
| 120 if self.__broken_sqlite: # see bug #1810 | 100 if self.__broken_sqlite: # see bug #1810 |
| 121 conn.execute("select * from sqlite_master") | 101 conn.execute("select * from sqlite_master") |
| 122 try: | 102 try: |
| 123 rv = callable(conn, *args, **kwargs) | 103 rv = callable(conn, *args, **kwargs) |
| 124 assert not isinstance(rv, sa.engine.ResultProxy), \ | 104 assert not isinstance(rv, sa.engine.ResultProxy), \ |
| 125 "do not return ResultProxy objects!" | 105 "do not return ResultProxy objects!" |
| 126 finally: | 106 finally: |
| 127 conn.close() | 107 conn.close() |
| 128 return rv | 108 return rv |
| 129 return threads.deferToThreadPool(reactor, self, thd) | 109 return threads.deferToThreadPool(reactor, self, thd) |
| 130 | 110 |
| 131 @bug1992hack | |
| 132 def do_with_engine(self, callable, *args, **kwargs): | 111 def do_with_engine(self, callable, *args, **kwargs): |
| 133 """ | 112 """ |
| 134 Like L{do}, but with an SQLAlchemy Engine as the first argument. This | 113 Like L{do}, but with an SQLAlchemy Engine as the first argument. This |
| 135 is only used for schema manipulation, and is not used at master | 114 is only used for schema manipulation, and is not used at master |
| 136 runtime. | 115 runtime. |
| 137 """ | 116 """ |
| 138 def thd(): | 117 def thd(): |
| 139 if self.__broken_sqlite: # see bug #1810 | 118 if self.__broken_sqlite: # see bug #1810 |
| 140 self.engine.execute("select * from sqlite_master") | 119 self.engine.execute("select * from sqlite_master") |
| 141 rv = callable(self.engine, *args, **kwargs) | 120 rv = callable(self.engine, *args, **kwargs) |
| 142 assert not isinstance(rv, sa.engine.ResultProxy), \ | 121 assert not isinstance(rv, sa.engine.ResultProxy), \ |
| 143 "do not return ResultProxy objects!" | 122 "do not return ResultProxy objects!" |
| 144 return rv | 123 return rv |
| 145 return threads.deferToThreadPool(reactor, self, thd) | 124 return threads.deferToThreadPool(reactor, self, thd) |
| 146 | 125 |
| 147 # older implementations for twisted < 0.8.2, which does not have | 126 # older implementations for twisted < 0.8.2, which does not have |
| 148 # deferToThreadPool; this basically re-implements it, although it gets some | 127 # deferToThreadPool; this basically re-implements it, although it gets some |
| 149 # of the synchronization wrong - the thread may still be "in use" when the | 128 # of the synchronization wrong - the thread may still be "in use" when the |
| 150 # deferred fires in the parent, which can lead to database accesses hopping | 129 # deferred fires in the parent, which can lead to database accesses hopping |
| 151 # between threads. In practice, this should not cause any difficulty. | 130 # between threads. In practice, this should not cause any difficulty. |
| 152 @bug1992hack | |
| 153 def do_081(self, callable, *args, **kwargs): # pragma: no cover | 131 def do_081(self, callable, *args, **kwargs): # pragma: no cover |
| 154 d = defer.Deferred() | 132 d = defer.Deferred() |
| 155 def thd(): | 133 def thd(): |
| 156 try: | 134 try: |
| 157 conn = self.engine.contextual_connect() | 135 conn = self.engine.contextual_connect() |
| 158 if self.__broken_sqlite: # see bug #1810 | 136 if self.__broken_sqlite: # see bug #1810 |
| 159 conn.execute("select * from sqlite_master") | 137 conn.execute("select * from sqlite_master") |
| 160 try: | 138 try: |
| 161 rv = callable(conn, *args, **kwargs) | 139 rv = callable(conn, *args, **kwargs) |
| 162 assert not isinstance(rv, sa.engine.ResultProxy), \ | 140 assert not isinstance(rv, sa.engine.ResultProxy), \ |
| 163 "do not return ResultProxy objects!" | 141 "do not return ResultProxy objects!" |
| 164 finally: | 142 finally: |
| 165 conn.close() | 143 conn.close() |
| 166 reactor.callFromThread(d.callback, rv) | 144 reactor.callFromThread(d.callback, rv) |
| 167 except: | 145 except: |
| 168 reactor.callFromThread(d.errback, failure.Failure()) | 146 reactor.callFromThread(d.errback, failure.Failure()) |
| 169 self.callInThread(thd) | 147 self.callInThread(thd) |
| 170 return d | 148 return d |
| 171 | 149 |
| 172 @bug1992hack | |
| 173 def do_with_engine_081(self, callable, *args, **kwargs): # pragma: no cover | 150 def do_with_engine_081(self, callable, *args, **kwargs): # pragma: no cover |
| 174 d = defer.Deferred() | 151 d = defer.Deferred() |
| 175 def thd(): | 152 def thd(): |
| 176 try: | 153 try: |
| 177 conn = self.engine | 154 conn = self.engine |
| 178 if self.__broken_sqlite: # see bug #1810 | 155 if self.__broken_sqlite: # see bug #1810 |
| 179 conn.execute("select * from sqlite_master") | 156 conn.execute("select * from sqlite_master") |
| 180 rv = callable(conn, *args, **kwargs) | 157 rv = callable(conn, *args, **kwargs) |
| 181 assert not isinstance(rv, sa.engine.ResultProxy), \ | 158 assert not isinstance(rv, sa.engine.ResultProxy), \ |
| 182 "do not return ResultProxy objects!" | 159 "do not return ResultProxy objects!" |
| (...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 238 except: | 215 except: |
| 239 return (0,) | 216 return (0,) |
| 240 | 217 |
| 241 if vers_row: | 218 if vers_row: |
| 242 try: | 219 try: |
| 243 return tuple(map(int, vers_row[0].split('.'))) | 220 return tuple(map(int, vers_row[0].split('.'))) |
| 244 except (TypeError, ValueError): | 221 except (TypeError, ValueError): |
| 245 return (0,) | 222 return (0,) |
| 246 else: | 223 else: |
| 247 return (0,) | 224 return (0,) |
| OLD | NEW |