Chromium Code Reviews| Index: third_party/buildbot_8_4p1/buildbot/db/pool.py | 
| diff --git a/third_party/buildbot_8_4p1/buildbot/db/pool.py b/third_party/buildbot_8_4p1/buildbot/db/pool.py | 
| index fe3540ad68b91faa9c4fc688bf6f9e4a5d5e7002..258f036be8d90fbabe5d1c9d1bf42724bea0f99a 100644 | 
| --- a/third_party/buildbot_8_4p1/buildbot/db/pool.py | 
| +++ b/third_party/buildbot_8_4p1/buildbot/db/pool.py | 
| @@ -13,9 +13,11 @@ | 
| # | 
| # Copyright Buildbot Team Members | 
| +import time | 
| import os | 
| import sqlalchemy as sa | 
| import twisted | 
| +from buildbot.process import metrics | 
| from twisted.internet import reactor, threads, defer | 
| from twisted.python import threadpool, failure, versions, log | 
| @@ -87,70 +89,92 @@ class DBThreadPool(threadpool.ThreadPool): | 
| reactor.removeSystemEventTrigger(self._stop_evt) | 
| self._stop() | 
| - def do(self, callable, *args, **kwargs): | 
| - def thd(): | 
| - conn = self.engine.contextual_connect() | 
| + # Try about 170 times over the space of a day, with the last few tries | 
| 
 
ghost stip (do not use)
2016/07/01 01:30:16
uh
 
 | 
| + # being about an hour apart. This is designed to span a reasonable amount | 
| + # of time for repairing a broken database server, while still failing | 
| + # actual problematic queries eventually | 
| + BACKOFF_START = 1.0 | 
| + BACKOFF_MULT = 1.05 | 
| + MAX_OPERATIONALERROR_TIME = 3600*24 # one day | 
| + def __thd(self, with_engine, callable, args, kwargs): | 
| + # try to call callable(arg, *args, **kwargs) repeatedly until no | 
| + # OperationalErrors occur, where arg is either the engine (with_engine) | 
| + # or a connection (not with_engine) | 
| + backoff = self.BACKOFF_START | 
| + start = time.time() | 
| + while True: | 
| + if with_engine: | 
| + arg = self.engine | 
| + else: | 
| + arg = self.engine.contextual_connect() | 
| + | 
| if self.__broken_sqlite: # see bug #1810 | 
| - conn.execute("select * from sqlite_master") | 
| + arg.execute("select * from sqlite_master") | 
| try: | 
| - rv = callable(conn, *args, **kwargs) | 
| + rv = callable(arg, *args, **kwargs) | 
| assert not isinstance(rv, sa.engine.ResultProxy), \ | 
| "do not return ResultProxy objects!" | 
| + except sa.exc.OperationalError, e: | 
| + text = e.orig.args[0] | 
| + if "Lost connection" in text \ | 
| + or "database is locked" in text: | 
| + | 
| + # see if we've retried too much | 
| + elapsed = time.time() - start | 
| + if elapsed > self.MAX_OPERATIONALERROR_TIME: | 
| + raise | 
| + | 
| + metrics.MetricCountEvent.log( | 
| + "DBThreadPool.retry-on-OperationalError") | 
| + log.msg("automatically retrying query after " | 
| + "OperationalError (%ss sleep)" % backoff) | 
| + | 
| + # sleep (remember, we're in a thread..) | 
| + time.sleep(backoff) | 
| + backoff *= self.BACKOFF_MULT | 
| + | 
| + # and re-try | 
| + continue | 
| + else: | 
| + raise | 
| finally: | 
| - conn.close() | 
| - return rv | 
| - return threads.deferToThreadPool(reactor, self, thd) | 
| + if not with_engine: | 
| + arg.close() | 
| + break | 
| + return rv | 
| + | 
| + def do(self, callable, *args, **kwargs): | 
| + return threads.deferToThreadPool(reactor, self, | 
| + self.__thd, False, callable, args, kwargs) | 
| def do_with_engine(self, callable, *args, **kwargs): | 
| - def thd(): | 
| - if self.__broken_sqlite: # see bug #1810 | 
| - self.engine.execute("select * from sqlite_master") | 
| - rv = callable(self.engine, *args, **kwargs) | 
| - assert not isinstance(rv, sa.engine.ResultProxy), \ | 
| - "do not return ResultProxy objects!" | 
| - return rv | 
| - return threads.deferToThreadPool(reactor, self, thd) | 
| + return threads.deferToThreadPool(reactor, self, | 
| + self.__thd, True, callable, args, kwargs) | 
| # older implementations for twisted < 0.8.2, which does not have | 
| # deferToThreadPool; this basically re-implements it, although it gets some | 
| # of the synchronization wrong - the thread may still be "in use" when the | 
| # deferred fires in the parent, which can lead to database accesses hopping | 
| # between threads. In practice, this should not cause any difficulty. | 
| - def do_081(self, callable, *args, **kwargs): # pragma: no cover | 
| - d = defer.Deferred() | 
| - def thd(): | 
| - try: | 
| - conn = self.engine.contextual_connect() | 
| - if self.__broken_sqlite: # see bug #1810 | 
| - conn.execute("select * from sqlite_master") | 
| - try: | 
| - rv = callable(conn, *args, **kwargs) | 
| - assert not isinstance(rv, sa.engine.ResultProxy), \ | 
| - "do not return ResultProxy objects!" | 
| - finally: | 
| - conn.close() | 
| - reactor.callFromThread(d.callback, rv) | 
| - except: | 
| - reactor.callFromThread(d.errback, failure.Failure()) | 
| - self.callInThread(thd) | 
| - return d | 
| - | 
| - def do_with_engine_081(self, callable, *args, **kwargs): # pragma: no cover | 
| - d = defer.Deferred() | 
| - def thd(): | 
| - try: | 
| - conn = self.engine | 
| - if self.__broken_sqlite: # see bug #1810 | 
| - conn.execute("select * from sqlite_master") | 
| - rv = callable(conn, *args, **kwargs) | 
| - assert not isinstance(rv, sa.engine.ResultProxy), \ | 
| - "do not return ResultProxy objects!" | 
| - reactor.callFromThread(d.callback, rv) | 
| - except: | 
| - reactor.callFromThread(d.errback, failure.Failure()) | 
| - self.callInThread(thd) | 
| - return d | 
| if twisted.version < versions.Version('twisted', 8, 2, 0): | 
| + def __081_wrap(self, with_engine, callable, args, kwargs): # pragma: no cover | 
| + d = defer.Deferred() | 
| + def thd(): | 
| + try: | 
| + reactor.callFromThread(d.callback, | 
| + self.__thd(with_engine, callable, args, kwargs)) | 
| + except: | 
| + reactor.callFromThread(d.errback, | 
| + failure.Failure()) | 
| + self.callInThread(thd) | 
| + return d | 
| + | 
| + def do_081(self, callable, *args, **kwargs): # pragma: no cover | 
| + return self.__081_wrap(False, callable, args, kwargs) | 
| + | 
| + def do_with_engine_081(self, callable, *args, **kwargs): # pragma: no cover | 
| + return self.__081_wrap(True, callable, args, kwargs) | 
| + | 
| do = do_081 | 
| do_with_engine = do_with_engine_081 |