| OLD | NEW |
| 1 # This file is part of Buildbot. Buildbot is free software: you can | 1 # This file is part of Buildbot. Buildbot is free software: you can |
| 2 # redistribute it and/or modify it under the terms of the GNU General Public | 2 # redistribute it and/or modify it under the terms of the GNU General Public |
| 3 # License as published by the Free Software Foundation, version 2. | 3 # License as published by the Free Software Foundation, version 2. |
| 4 # | 4 # |
| 5 # This program is distributed in the hope that it will be useful, but WITHOUT | 5 # This program is distributed in the hope that it will be useful, but WITHOUT |
| 6 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS | 6 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
| 7 # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more | 7 # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
| 8 # details. | 8 # details. |
| 9 # | 9 # |
| 10 # You should have received a copy of the GNU General Public License along with | 10 # You should have received a copy of the GNU General Public License along with |
| 11 # this program; if not, write to the Free Software Foundation, Inc., 51 | 11 # this program; if not, write to the Free Software Foundation, Inc., 51 |
| 12 # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | 12 # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
| 13 # | 13 # |
| 14 # Copyright Buildbot Team Members | 14 # Copyright Buildbot Team Members |
| 15 | 15 |
| 16 import time |
| 16 import os | 17 import os |
| 17 import sqlalchemy as sa | 18 import sqlalchemy as sa |
| 18 import twisted | 19 import twisted |
| 20 from buildbot.process import metrics |
| 19 from twisted.internet import reactor, threads, defer | 21 from twisted.internet import reactor, threads, defer |
| 20 from twisted.python import threadpool, failure, versions, log | 22 from twisted.python import threadpool, failure, versions, log |
| 21 | 23 |
| 22 | 24 |
| 23 class DBThreadPool(threadpool.ThreadPool): | 25 class DBThreadPool(threadpool.ThreadPool): |
| 24 | 26 |
| 25 running = False | 27 running = False |
| 26 | 28 |
| 27 # Some versions of SQLite incorrectly cache metadata about which tables are | 29 # Some versions of SQLite incorrectly cache metadata about which tables are |
| 28 # and are not present on a per-connection basis. This cache can be flushed | 30 # and are not present on a per-connection basis. This cache can be flushed |
| (...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 80 | 82 |
| 81 def shutdown(self): | 83 def shutdown(self): |
| 82 """Manually stop the pool. This is only necessary from tests, as the | 84 """Manually stop the pool. This is only necessary from tests, as the |
| 83 pool will stop itself when the reactor stops under normal | 85 pool will stop itself when the reactor stops under normal |
| 84 circumstances.""" | 86 circumstances.""" |
| 85 if not self._stop_evt: | 87 if not self._stop_evt: |
| 86 return # pool is already stopped | 88 return # pool is already stopped |
| 87 reactor.removeSystemEventTrigger(self._stop_evt) | 89 reactor.removeSystemEventTrigger(self._stop_evt) |
| 88 self._stop() | 90 self._stop() |
| 89 | 91 |
| 90 def do(self, callable, *args, **kwargs): | 92 # Try about 170 times over the space of a day, with the last few tries |
| 91 def thd(): | 93 # being about an hour apart. This is designed to span a reasonable amount |
| 92 conn = self.engine.contextual_connect() | 94 # of time for repairing a broken database server, while still failing |
| 95 # actual problematic queries eventually |
| 96 BACKOFF_START = 1.0 |
| 97 BACKOFF_MULT = 1.05 |
| 98 MAX_OPERATIONALERROR_TIME = 3600*24 # one day |
| 99 def __thd(self, with_engine, callable, args, kwargs): |
| 100 # try to call callable(arg, *args, **kwargs) repeatedly until no |
| 101 # OperationalErrors occur, where arg is either the engine (with_engine) |
| 102 # or a connection (not with_engine) |
| 103 backoff = self.BACKOFF_START |
| 104 start = time.time() |
| 105 while True: |
| 106 if with_engine: |
| 107 arg = self.engine |
| 108 else: |
| 109 arg = self.engine.contextual_connect() |
| 110 |
| 93 if self.__broken_sqlite: # see bug #1810 | 111 if self.__broken_sqlite: # see bug #1810 |
| 94 conn.execute("select * from sqlite_master") | 112 arg.execute("select * from sqlite_master") |
| 95 try: | 113 try: |
| 96 rv = callable(conn, *args, **kwargs) | 114 rv = callable(arg, *args, **kwargs) |
| 97 assert not isinstance(rv, sa.engine.ResultProxy), \ | 115 assert not isinstance(rv, sa.engine.ResultProxy), \ |
| 98 "do not return ResultProxy objects!" | 116 "do not return ResultProxy objects!" |
| 117 except sa.exc.OperationalError, e: |
| 118 text = e.orig.args[0] |
| 119 if "Lost connection" in text \ |
| 120 or "database is locked" in text: |
| 121 |
| 122 # see if we've retried too much |
| 123 elapsed = time.time() - start |
| 124 if elapsed > self.MAX_OPERATIONALERROR_TIME: |
| 125 raise |
| 126 |
| 127 metrics.MetricCountEvent.log( |
| 128 "DBThreadPool.retry-on-OperationalError") |
| 129 log.msg("automatically retrying query after " |
| 130 "OperationalError (%ss sleep)" % backoff) |
| 131 |
| 132 # sleep (remember, we're in a thread..) |
| 133 time.sleep(backoff) |
| 134 backoff *= self.BACKOFF_MULT |
| 135 |
| 136 # and re-try |
| 137 continue |
| 138 else: |
| 139 raise |
| 99 finally: | 140 finally: |
| 100 conn.close() | 141 if not with_engine: |
| 101 return rv | 142 arg.close() |
| 102 return threads.deferToThreadPool(reactor, self, thd) | 143 break |
| 144 return rv |
| 145 |
| 146 def do(self, callable, *args, **kwargs): |
| 147 return threads.deferToThreadPool(reactor, self, |
| 148 self.__thd, False, callable, args, kwargs) |
| 103 | 149 |
| 104 def do_with_engine(self, callable, *args, **kwargs): | 150 def do_with_engine(self, callable, *args, **kwargs): |
| 105 def thd(): | 151 return threads.deferToThreadPool(reactor, self, |
| 106 if self.__broken_sqlite: # see bug #1810 | 152 self.__thd, True, callable, args, kwargs) |
| 107 self.engine.execute("select * from sqlite_master") | |
| 108 rv = callable(self.engine, *args, **kwargs) | |
| 109 assert not isinstance(rv, sa.engine.ResultProxy), \ | |
| 110 "do not return ResultProxy objects!" | |
| 111 return rv | |
| 112 return threads.deferToThreadPool(reactor, self, thd) | |
| 113 | 153 |
| 114 # older implementations for twisted < 0.8.2, which does not have | 154 # older implementations for twisted < 0.8.2, which does not have |
| 115 # deferToThreadPool; this basically re-implements it, although it gets some | 155 # deferToThreadPool; this basically re-implements it, although it gets some |
| 116 # of the synchronization wrong - the thread may still be "in use" when the | 156 # of the synchronization wrong - the thread may still be "in use" when the |
| 117 # deferred fires in the parent, which can lead to database accesses hopping | 157 # deferred fires in the parent, which can lead to database accesses hopping |
| 118 # between threads. In practice, this should not cause any difficulty. | 158 # between threads. In practice, this should not cause any difficulty. |
| 119 def do_081(self, callable, *args, **kwargs): # pragma: no cover | 159 if twisted.version < versions.Version('twisted', 8, 2, 0): |
| 120 d = defer.Deferred() | 160 def __081_wrap(self, with_engine, callable, args, kwargs): # pragma: no
cover |
| 121 def thd(): | 161 d = defer.Deferred() |
| 122 try: | 162 def thd(): |
| 123 conn = self.engine.contextual_connect() | |
| 124 if self.__broken_sqlite: # see bug #1810 | |
| 125 conn.execute("select * from sqlite_master") | |
| 126 try: | 163 try: |
| 127 rv = callable(conn, *args, **kwargs) | 164 reactor.callFromThread(d.callback, |
| 128 assert not isinstance(rv, sa.engine.ResultProxy), \ | 165 self.__thd(with_engine, callable, args, kwargs)) |
| 129 "do not return ResultProxy objects!" | 166 except: |
| 130 finally: | 167 reactor.callFromThread(d.errback, |
| 131 conn.close() | 168 failure.Failure()) |
| 132 reactor.callFromThread(d.callback, rv) | 169 self.callInThread(thd) |
| 133 except: | 170 return d |
| 134 reactor.callFromThread(d.errback, failure.Failure()) | |
| 135 self.callInThread(thd) | |
| 136 return d | |
| 137 | 171 |
| 138 def do_with_engine_081(self, callable, *args, **kwargs): # pragma: no cover | 172 def do_081(self, callable, *args, **kwargs): # pragma: no cover |
| 139 d = defer.Deferred() | 173 return self.__081_wrap(False, callable, args, kwargs) |
| 140 def thd(): | 174 |
| 141 try: | 175 def do_with_engine_081(self, callable, *args, **kwargs): # pragma: no co
ver |
| 142 conn = self.engine | 176 return self.__081_wrap(True, callable, args, kwargs) |
| 143 if self.__broken_sqlite: # see bug #1810 | 177 |
| 144 conn.execute("select * from sqlite_master") | |
| 145 rv = callable(conn, *args, **kwargs) | |
| 146 assert not isinstance(rv, sa.engine.ResultProxy), \ | |
| 147 "do not return ResultProxy objects!" | |
| 148 reactor.callFromThread(d.callback, rv) | |
| 149 except: | |
| 150 reactor.callFromThread(d.errback, failure.Failure()) | |
| 151 self.callInThread(thd) | |
| 152 return d | |
| 153 if twisted.version < versions.Version('twisted', 8, 2, 0): | |
| 154 do = do_081 | 178 do = do_081 |
| 155 do_with_engine = do_with_engine_081 | 179 do_with_engine = do_with_engine_081 |
| 156 | 180 |
| 157 def detect_bug1810(self): | 181 def detect_bug1810(self): |
| 158 # detect buggy SQLite implementations; call only for a known-sqlite | 182 # detect buggy SQLite implementations; call only for a known-sqlite |
| 159 # dialect | 183 # dialect |
| 160 try: | 184 try: |
| 161 import pysqlite2.dbapi2 as sqlite | 185 import pysqlite2.dbapi2 as sqlite |
| 162 sqlite = sqlite | 186 sqlite = sqlite |
| 163 except ImportError: | 187 except ImportError: |
| (...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 203 except: | 227 except: |
| 204 return (0,) | 228 return (0,) |
| 205 | 229 |
| 206 if vers_row: | 230 if vers_row: |
| 207 try: | 231 try: |
| 208 return tuple(map(int, vers_row[0].split('.'))) | 232 return tuple(map(int, vers_row[0].split('.'))) |
| 209 except (TypeError, ValueError): | 233 except (TypeError, ValueError): |
| 210 return (0,) | 234 return (0,) |
| 211 else: | 235 else: |
| 212 return (0,) | 236 return (0,) |
| OLD | NEW |