| OLD | NEW |
| 1 # This file is part of Buildbot. Buildbot is free software: you can | 1 # This file is part of Buildbot. Buildbot is free software: you can |
| 2 # redistribute it and/or modify it under the terms of the GNU General Public | 2 # redistribute it and/or modify it under the terms of the GNU General Public |
| 3 # License as published by the Free Software Foundation, version 2. | 3 # License as published by the Free Software Foundation, version 2. |
| 4 # | 4 # |
| 5 # This program is distributed in the hope that it will be useful, but WITHOUT | 5 # This program is distributed in the hope that it will be useful, but WITHOUT |
| 6 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS | 6 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| 7 # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more | 7 # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
| 8 # details. | 8 # details. |
| 9 # | 9 # |
| 10 # You should have received a copy of the GNU General Public License along with | 10 # You should have received a copy of the GNU General Public License along with |
| 11 # this program; if not, write to the Free Software Foundation, Inc., 51 | 11 # this program; if not, write to the Free Software Foundation, Inc., 51 |
| 12 # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | 12 # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
| 13 # | 13 # |
| 14 # Copyright Buildbot Team Members | 14 # Copyright Buildbot Team Members |
| 15 | 15 |
| 16 import os | 16 import os |
| 17 import sqlalchemy as sa | 17 import sqlalchemy as sa |
| 18 import twisted | 18 import twisted |
| 19 from twisted.internet import reactor, threads, defer | 19 from twisted.internet import reactor, threads, defer, task |
| 20 from twisted.python import threadpool, failure, versions, log | 20 from twisted.python import threadpool, failure, versions, log |
| 21 | 21 |
| 22 # Hack for bug #1992. In as-yet-unknown circumstances, select() fails to |
| 23 # notice that a selfpipe has been written to, thus causing callFromThread, as |
| 24 # used in deferToThreadPool, to hang indefinitely. The workaround is to wake |
| 25 # up the select loop every second by ensuring that there is an event occuring |
| 26 # every second, with this busy loop: |
| 27 def bug1992hack(f): |
| 28 def w(*args, **kwargs): |
| 29 busyloop = task.LoopingCall(lambda : None) |
| 30 busyloop.start(1) |
| 31 d = f(*args, **kwargs) |
| 32 def stop_loop(r): |
| 33 busyloop.stop() |
| 34 return r |
| 35 d.addBoth(stop_loop) |
| 36 return d |
| 37 w.__name__ = f.__name__ |
| 38 w.__doc__ = f.__doc__ |
| 39 return w |
| 40 |
| 22 | 41 |
| 23 class DBThreadPool(threadpool.ThreadPool): | 42 class DBThreadPool(threadpool.ThreadPool): |
| 24 """ | 43 """ |
| 25 A pool of threads ready and waiting to execute queries. | 44 A pool of threads ready and waiting to execute queries. |
| 26 | 45 |
| 27 If the engine has an C{optimal_thread_pool_size} attribute, then the | 46 If the engine has an C{optimal_thread_pool_size} attribute, then the |
| 28 maxthreads of the thread pool will be set to that value. This is most | 47 maxthreads of the thread pool will be set to that value. This is most |
| 29 useful for SQLite in-memory connections, where exactly one connection | 48 useful for SQLite in-memory connections, where exactly one connection |
| 30 (and thus thread) should be used. | 49 (and thus thread) should be used. |
| 31 """ | 50 """ |
| (...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 70 | 89 |
| 71 def shutdown(self): | 90 def shutdown(self): |
| 72 """Manually stop the pool. This is only necessary from tests, as the | 91 """Manually stop the pool. This is only necessary from tests, as the |
| 73 pool will stop itself when the reactor stops under normal | 92 pool will stop itself when the reactor stops under normal |
| 74 circumstances.""" | 93 circumstances.""" |
| 75 if not self._stop_evt: | 94 if not self._stop_evt: |
| 76 return # pool is already stopped | 95 return # pool is already stopped |
| 77 reactor.removeSystemEventTrigger(self._stop_evt) | 96 reactor.removeSystemEventTrigger(self._stop_evt) |
| 78 self._stop() | 97 self._stop() |
| 79 | 98 |
| 99 @bug1992hack |
| 80 def do(self, callable, *args, **kwargs): | 100 def do(self, callable, *args, **kwargs): |
| 81 """ | 101 """ |
| 82 Call C{callable} in a thread, with a Connection as first argument. | 102 Call C{callable} in a thread, with a Connection as first argument. |
| 83 Returns a deferred that will indicate the results of the callable. | 103 Returns a deferred that will indicate the results of the callable. |
| 84 | 104 |
| 85 Note: do not return any SQLAlchemy objects via this deferred! | 105 Note: do not return any SQLAlchemy objects via this deferred! |
| 86 """ | 106 """ |
| 87 def thd(): | 107 def thd(): |
| 88 conn = self.engine.contextual_connect() | 108 conn = self.engine.contextual_connect() |
| 89 if self.__broken_sqlite: # see bug #1810 | 109 if self.__broken_sqlite: # see bug #1810 |
| 90 conn.execute("select * from sqlite_master") | 110 conn.execute("select * from sqlite_master") |
| 91 try: | 111 try: |
| 92 rv = callable(conn, *args, **kwargs) | 112 rv = callable(conn, *args, **kwargs) |
| 93 assert not isinstance(rv, sa.engine.ResultProxy), \ | 113 assert not isinstance(rv, sa.engine.ResultProxy), \ |
| 94 "do not return ResultProxy objects!" | 114 "do not return ResultProxy objects!" |
| 95 finally: | 115 finally: |
| 96 conn.close() | 116 conn.close() |
| 97 return rv | 117 return rv |
| 98 return threads.deferToThreadPool(reactor, self, thd) | 118 return threads.deferToThreadPool(reactor, self, thd) |
| 99 | 119 |
| 120 @bug1992hack |
| 100 def do_with_engine(self, callable, *args, **kwargs): | 121 def do_with_engine(self, callable, *args, **kwargs): |
| 101 """ | 122 """ |
| 102 Like L{do}, but with an SQLAlchemy Engine as the first argument. This | 123 Like L{do}, but with an SQLAlchemy Engine as the first argument. This |
| 103 is only used for schema manipulation, and is not used at master | 124 is only used for schema manipulation, and is not used at master |
| 104 runtime. | 125 runtime. |
| 105 """ | 126 """ |
| 106 def thd(): | 127 def thd(): |
| 107 if self.__broken_sqlite: # see bug #1810 | 128 if self.__broken_sqlite: # see bug #1810 |
| 108 self.engine.execute("select * from sqlite_master") | 129 self.engine.execute("select * from sqlite_master") |
| 109 rv = callable(self.engine, *args, **kwargs) | 130 rv = callable(self.engine, *args, **kwargs) |
| 110 assert not isinstance(rv, sa.engine.ResultProxy), \ | 131 assert not isinstance(rv, sa.engine.ResultProxy), \ |
| 111 "do not return ResultProxy objects!" | 132 "do not return ResultProxy objects!" |
| 112 return rv | 133 return rv |
| 113 return threads.deferToThreadPool(reactor, self, thd) | 134 return threads.deferToThreadPool(reactor, self, thd) |
| 114 | 135 |
| 115 # older implementations for twisted < 0.8.2, which does not have | 136 # older implementations for twisted < 0.8.2, which does not have |
| 116 # deferToThreadPool; this basically re-implements it, although it gets some | 137 # deferToThreadPool; this basically re-implements it, although it gets some |
| 117 # of the synchronization wrong - the thread may still be "in use" when the | 138 # of the synchronization wrong - the thread may still be "in use" when the |
| 118 # deferred fires in the parent, which can lead to database accesses hopping | 139 # deferred fires in the parent, which can lead to database accesses hopping |
| 119 # between threads. In practice, this should not cause any difficulty. | 140 # between threads. In practice, this should not cause any difficulty. |
| 141 @bug1992hack |
| 120 def do_081(self, callable, *args, **kwargs): # pragma: no cover | 142 def do_081(self, callable, *args, **kwargs): # pragma: no cover |
| 121 d = defer.Deferred() | 143 d = defer.Deferred() |
| 122 def thd(): | 144 def thd(): |
| 123 try: | 145 try: |
| 124 conn = self.engine.contextual_connect() | 146 conn = self.engine.contextual_connect() |
| 125 if self.__broken_sqlite: # see bug #1810 | 147 if self.__broken_sqlite: # see bug #1810 |
| 126 conn.execute("select * from sqlite_master") | 148 conn.execute("select * from sqlite_master") |
| 127 try: | 149 try: |
| 128 rv = callable(conn, *args, **kwargs) | 150 rv = callable(conn, *args, **kwargs) |
| 129 assert not isinstance(rv, sa.engine.ResultProxy), \ | 151 assert not isinstance(rv, sa.engine.ResultProxy), \ |
| 130 "do not return ResultProxy objects!" | 152 "do not return ResultProxy objects!" |
| 131 finally: | 153 finally: |
| 132 conn.close() | 154 conn.close() |
| 133 reactor.callFromThread(d.callback, rv) | 155 reactor.callFromThread(d.callback, rv) |
| 134 except: | 156 except: |
| 135 reactor.callFromThread(d.errback, failure.Failure()) | 157 reactor.callFromThread(d.errback, failure.Failure()) |
| 136 self.callInThread(thd) | 158 self.callInThread(thd) |
| 137 return d | 159 return d |
| 138 | 160 |
| 161 @bug1992hack |
| 139 def do_with_engine_081(self, callable, *args, **kwargs): # pragma: no cover | 162 def do_with_engine_081(self, callable, *args, **kwargs): # pragma: no cover |
| 140 d = defer.Deferred() | 163 d = defer.Deferred() |
| 141 def thd(): | 164 def thd(): |
| 142 try: | 165 try: |
| 143 conn = self.engine | 166 conn = self.engine |
| 144 if self.__broken_sqlite: # see bug #1810 | 167 if self.__broken_sqlite: # see bug #1810 |
| 145 conn.execute("select * from sqlite_master") | 168 conn.execute("select * from sqlite_master") |
| 146 rv = callable(conn, *args, **kwargs) | 169 rv = callable(conn, *args, **kwargs) |
| 147 assert not isinstance(rv, sa.engine.ResultProxy), \ | 170 assert not isinstance(rv, sa.engine.ResultProxy), \ |
| 148 "do not return ResultProxy objects!" | 171 "do not return ResultProxy objects!" |
| (...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 185 | 208 |
| 186 try: | 209 try: |
| 187 test() | 210 test() |
| 188 except sqlite.OperationalError: | 211 except sqlite.OperationalError: |
| 189 # this is the expected error indicating it's broken | 212 # this is the expected error indicating it's broken |
| 190 return True | 213 return True |
| 191 | 214 |
| 192 # but this version should not fail.. | 215 # but this version should not fail.. |
| 193 test(select_from_sqlite_master=True) | 216 test(select_from_sqlite_master=True) |
| 194 return False # not broken - no workaround required | 217 return False # not broken - no workaround required |
| OLD | NEW |