Chromium Code Reviews| Index: third_party/buildbot_8_4p1/buildbot/db/pool.py |
| diff --git a/third_party/buildbot_8_4p1/buildbot/db/pool.py b/third_party/buildbot_8_4p1/buildbot/db/pool.py |
| index fe3540ad68b91faa9c4fc688bf6f9e4a5d5e7002..258f036be8d90fbabe5d1c9d1bf42724bea0f99a 100644 |
| --- a/third_party/buildbot_8_4p1/buildbot/db/pool.py |
| +++ b/third_party/buildbot_8_4p1/buildbot/db/pool.py |
| @@ -13,9 +13,11 @@ |
| # |
| # Copyright Buildbot Team Members |
| +import time |
| import os |
| import sqlalchemy as sa |
| import twisted |
| +from buildbot.process import metrics |
| from twisted.internet import reactor, threads, defer |
| from twisted.python import threadpool, failure, versions, log |
| @@ -87,70 +89,92 @@ class DBThreadPool(threadpool.ThreadPool): |
| reactor.removeSystemEventTrigger(self._stop_evt) |
| self._stop() |
| - def do(self, callable, *args, **kwargs): |
| - def thd(): |
| - conn = self.engine.contextual_connect() |
| + # Try about 170 times over the space of a day, with the last few tries |
|
ghost stip (do not use)
2016/07/01 01:30:16
uh
|
| + # being about an hour apart. This is designed to span a reasonable amount |
| + # of time for repairing a broken database server, while still eventually |
| + # failing queries that are genuinely problematic. |
| + BACKOFF_START = 1.0 |
| + BACKOFF_MULT = 1.05 |
| + MAX_OPERATIONALERROR_TIME = 3600*24 # one day |
| + def __thd(self, with_engine, callable, args, kwargs): |
| + # try to call callable(arg, *args, **kwargs) repeatedly until no |
| + # OperationalErrors occur, where arg is either the engine (with_engine) |
| + # or a connection (not with_engine) |
| + backoff = self.BACKOFF_START |
| + start = time.time() |
| + while True: |
| + if with_engine: |
| + arg = self.engine |
| + else: |
| + arg = self.engine.contextual_connect() |
| + |
| if self.__broken_sqlite: # see bug #1810 |
| - conn.execute("select * from sqlite_master") |
| + arg.execute("select * from sqlite_master") |
| try: |
| - rv = callable(conn, *args, **kwargs) |
| + rv = callable(arg, *args, **kwargs) |
| assert not isinstance(rv, sa.engine.ResultProxy), \ |
| "do not return ResultProxy objects!" |
| + except sa.exc.OperationalError, e: |
| + text = e.orig.args[0] |
| + if "Lost connection" in text \ |
| + or "database is locked" in text: |
| + |
| + # see if we've retried too much |
| + elapsed = time.time() - start |
| + if elapsed > self.MAX_OPERATIONALERROR_TIME: |
| + raise |
| + |
| + metrics.MetricCountEvent.log( |
| + "DBThreadPool.retry-on-OperationalError") |
| + log.msg("automatically retrying query after " |
| + "OperationalError (%ss sleep)" % backoff) |
| + |
| + # sleep (remember, we're in a thread..) |
| + time.sleep(backoff) |
| + backoff *= self.BACKOFF_MULT |
| + |
| + # and re-try |
| + continue |
| + else: |
| + raise |
| finally: |
| - conn.close() |
| - return rv |
| - return threads.deferToThreadPool(reactor, self, thd) |
| + if not with_engine: |
| + arg.close() |
| + break |
| + return rv |
| + |
| + def do(self, callable, *args, **kwargs): |
| + return threads.deferToThreadPool(reactor, self, |
| + self.__thd, False, callable, args, kwargs) |
| def do_with_engine(self, callable, *args, **kwargs): |
| - def thd(): |
| - if self.__broken_sqlite: # see bug #1810 |
| - self.engine.execute("select * from sqlite_master") |
| - rv = callable(self.engine, *args, **kwargs) |
| - assert not isinstance(rv, sa.engine.ResultProxy), \ |
| - "do not return ResultProxy objects!" |
| - return rv |
| - return threads.deferToThreadPool(reactor, self, thd) |
| + return threads.deferToThreadPool(reactor, self, |
| + self.__thd, True, callable, args, kwargs) |
| # older implementations for twisted < 8.2.0, which does not have |
| # deferToThreadPool; this basically re-implements it, although it gets some |
| # of the synchronization wrong - the thread may still be "in use" when the |
| # deferred fires in the parent, which can lead to database accesses hopping |
| # between threads. In practice, this should not cause any difficulty. |
| - def do_081(self, callable, *args, **kwargs): # pragma: no cover |
| - d = defer.Deferred() |
| - def thd(): |
| - try: |
| - conn = self.engine.contextual_connect() |
| - if self.__broken_sqlite: # see bug #1810 |
| - conn.execute("select * from sqlite_master") |
| - try: |
| - rv = callable(conn, *args, **kwargs) |
| - assert not isinstance(rv, sa.engine.ResultProxy), \ |
| - "do not return ResultProxy objects!" |
| - finally: |
| - conn.close() |
| - reactor.callFromThread(d.callback, rv) |
| - except: |
| - reactor.callFromThread(d.errback, failure.Failure()) |
| - self.callInThread(thd) |
| - return d |
| - |
| - def do_with_engine_081(self, callable, *args, **kwargs): # pragma: no cover |
| - d = defer.Deferred() |
| - def thd(): |
| - try: |
| - conn = self.engine |
| - if self.__broken_sqlite: # see bug #1810 |
| - conn.execute("select * from sqlite_master") |
| - rv = callable(conn, *args, **kwargs) |
| - assert not isinstance(rv, sa.engine.ResultProxy), \ |
| - "do not return ResultProxy objects!" |
| - reactor.callFromThread(d.callback, rv) |
| - except: |
| - reactor.callFromThread(d.errback, failure.Failure()) |
| - self.callInThread(thd) |
| - return d |
| if twisted.version < versions.Version('twisted', 8, 2, 0): |
| + def __081_wrap(self, with_engine, callable, args, kwargs): # pragma: no cover |
| + d = defer.Deferred() |
| + def thd(): |
| + try: |
| + reactor.callFromThread(d.callback, |
| + self.__thd(with_engine, callable, args, kwargs)) |
| + except: |
| + reactor.callFromThread(d.errback, |
| + failure.Failure()) |
| + self.callInThread(thd) |
| + return d |
| + |
| + def do_081(self, callable, *args, **kwargs): # pragma: no cover |
| + return self.__081_wrap(False, callable, args, kwargs) |
| + |
| + def do_with_engine_081(self, callable, *args, **kwargs): # pragma: no cover |
| + return self.__081_wrap(True, callable, args, kwargs) |
| + |
| do = do_081 |
| do_with_engine = do_with_engine_081 |