| Index: third_party/buildbot_8_4p1/buildbot/db/pool.py
|
| diff --git a/third_party/buildbot_8_4p1/buildbot/db/pool.py b/third_party/buildbot_8_4p1/buildbot/db/pool.py
|
| index fe3540ad68b91faa9c4fc688bf6f9e4a5d5e7002..258f036be8d90fbabe5d1c9d1bf42724bea0f99a 100644
|
| --- a/third_party/buildbot_8_4p1/buildbot/db/pool.py
|
| +++ b/third_party/buildbot_8_4p1/buildbot/db/pool.py
|
| @@ -13,9 +13,11 @@
|
| #
|
| # Copyright Buildbot Team Members
|
|
|
| +import time
|
| import os
|
| import sqlalchemy as sa
|
| import twisted
|
| +from buildbot.process import metrics
|
| from twisted.internet import reactor, threads, defer
|
| from twisted.python import threadpool, failure, versions, log
|
|
|
| @@ -87,70 +89,92 @@ class DBThreadPool(threadpool.ThreadPool):
|
| reactor.removeSystemEventTrigger(self._stop_evt)
|
| self._stop()
|
|
|
| - def do(self, callable, *args, **kwargs):
|
| - def thd():
|
| - conn = self.engine.contextual_connect()
|
| + # Try about 170 times over the space of a day, with the last few tries
|
| + # being about an hour apart. This is designed to span a reasonable amount
|
| + # of time for repairing a broken database server, while still failing
|
| + # actual problematic queries eventually
|
| + BACKOFF_START = 1.0
|
| + BACKOFF_MULT = 1.05
|
| + MAX_OPERATIONALERROR_TIME = 3600*24 # one day
|
| + def __thd(self, with_engine, callable, args, kwargs):
|
| + # try to call callable(arg, *args, **kwargs) repeatedly until no
|
| + # OperationalErrors occur, where arg is either the engine (with_engine)
|
| + # or a connection (not with_engine)
|
| + backoff = self.BACKOFF_START
|
| + start = time.time()
|
| + while True:
|
| + if with_engine:
|
| + arg = self.engine
|
| + else:
|
| + arg = self.engine.contextual_connect()
|
| +
|
| if self.__broken_sqlite: # see bug #1810
|
| - conn.execute("select * from sqlite_master")
|
| + arg.execute("select * from sqlite_master")
|
| try:
|
| - rv = callable(conn, *args, **kwargs)
|
| + rv = callable(arg, *args, **kwargs)
|
| assert not isinstance(rv, sa.engine.ResultProxy), \
|
| "do not return ResultProxy objects!"
|
| + except sa.exc.OperationalError, e:
|
| + text = e.orig.args[0]
|
| + if "Lost connection" in text \
|
| + or "database is locked" in text:
|
| +
|
| + # see if we've retried too much
|
| + elapsed = time.time() - start
|
| + if elapsed > self.MAX_OPERATIONALERROR_TIME:
|
| + raise
|
| +
|
| + metrics.MetricCountEvent.log(
|
| + "DBThreadPool.retry-on-OperationalError")
|
| + log.msg("automatically retrying query after "
|
| + "OperationalError (%ss sleep)" % backoff)
|
| +
|
| + # sleep (remember, we're in a thread..)
|
| + time.sleep(backoff)
|
| + backoff *= self.BACKOFF_MULT
|
| +
|
| + # and re-try
|
| + continue
|
| + else:
|
| + raise
|
| finally:
|
| - conn.close()
|
| - return rv
|
| - return threads.deferToThreadPool(reactor, self, thd)
|
| + if not with_engine:
|
| + arg.close()
|
| + break
|
| + return rv
|
| +
|
| + def do(self, callable, *args, **kwargs):
|
| + return threads.deferToThreadPool(reactor, self,
|
| + self.__thd, False, callable, args, kwargs)
|
|
|
| def do_with_engine(self, callable, *args, **kwargs):
|
| - def thd():
|
| - if self.__broken_sqlite: # see bug #1810
|
| - self.engine.execute("select * from sqlite_master")
|
| - rv = callable(self.engine, *args, **kwargs)
|
| - assert not isinstance(rv, sa.engine.ResultProxy), \
|
| - "do not return ResultProxy objects!"
|
| - return rv
|
| - return threads.deferToThreadPool(reactor, self, thd)
|
| + return threads.deferToThreadPool(reactor, self,
|
| + self.__thd, True, callable, args, kwargs)
|
|
|
| # older implementations for twisted < 0.8.2, which does not have
|
| # deferToThreadPool; this basically re-implements it, although it gets some
|
| # of the synchronization wrong - the thread may still be "in use" when the
|
| # deferred fires in the parent, which can lead to database accesses hopping
|
| # between threads. In practice, this should not cause any difficulty.
|
| - def do_081(self, callable, *args, **kwargs): # pragma: no cover
|
| - d = defer.Deferred()
|
| - def thd():
|
| - try:
|
| - conn = self.engine.contextual_connect()
|
| - if self.__broken_sqlite: # see bug #1810
|
| - conn.execute("select * from sqlite_master")
|
| - try:
|
| - rv = callable(conn, *args, **kwargs)
|
| - assert not isinstance(rv, sa.engine.ResultProxy), \
|
| - "do not return ResultProxy objects!"
|
| - finally:
|
| - conn.close()
|
| - reactor.callFromThread(d.callback, rv)
|
| - except:
|
| - reactor.callFromThread(d.errback, failure.Failure())
|
| - self.callInThread(thd)
|
| - return d
|
| -
|
| - def do_with_engine_081(self, callable, *args, **kwargs): # pragma: no cover
|
| - d = defer.Deferred()
|
| - def thd():
|
| - try:
|
| - conn = self.engine
|
| - if self.__broken_sqlite: # see bug #1810
|
| - conn.execute("select * from sqlite_master")
|
| - rv = callable(conn, *args, **kwargs)
|
| - assert not isinstance(rv, sa.engine.ResultProxy), \
|
| - "do not return ResultProxy objects!"
|
| - reactor.callFromThread(d.callback, rv)
|
| - except:
|
| - reactor.callFromThread(d.errback, failure.Failure())
|
| - self.callInThread(thd)
|
| - return d
|
| if twisted.version < versions.Version('twisted', 8, 2, 0):
|
| + def __081_wrap(self, with_engine, callable, args, kwargs): # pragma: no cover
|
| + d = defer.Deferred()
|
| + def thd():
|
| + try:
|
| + reactor.callFromThread(d.callback,
|
| + self.__thd(with_engine, callable, args, kwargs))
|
| + except:
|
| + reactor.callFromThread(d.errback,
|
| + failure.Failure())
|
| + self.callInThread(thd)
|
| + return d
|
| +
|
| + def do_081(self, callable, *args, **kwargs): # pragma: no cover
|
| + return self.__081_wrap(False, callable, args, kwargs)
|
| +
|
| + def do_with_engine_081(self, callable, *args, **kwargs): # pragma: no cover
|
| + return self.__081_wrap(True, callable, args, kwargs)
|
| +
|
| do = do_081
|
| do_with_engine = do_with_engine_081
|
|
|
|
|