| Index: third_party/buildbot_8_4p1/buildbot/db/pool.py
|
| diff --git a/third_party/buildbot_8_4p1/buildbot/db/pool.py b/third_party/buildbot_8_4p1/buildbot/db/pool.py
|
| index 258f036be8d90fbabe5d1c9d1bf42724bea0f99a..fe3540ad68b91faa9c4fc688bf6f9e4a5d5e7002 100644
|
| --- a/third_party/buildbot_8_4p1/buildbot/db/pool.py
|
| +++ b/third_party/buildbot_8_4p1/buildbot/db/pool.py
|
| @@ -13,11 +13,9 @@
|
| #
|
| # Copyright Buildbot Team Members
|
|
|
| -import time
|
| import os
|
| import sqlalchemy as sa
|
| import twisted
|
| -from buildbot.process import metrics
|
| from twisted.internet import reactor, threads, defer
|
| from twisted.python import threadpool, failure, versions, log
|
|
|
| @@ -89,92 +87,70 @@
|
| reactor.removeSystemEventTrigger(self._stop_evt)
|
| self._stop()
|
|
|
| - # Try about 170 times over the space of a day, with the last few tries
|
| - # being about an hour apart. This is designed to span a reasonable amount
|
| - # of time for repairing a broken database server, while still failing
|
| - # actual problematic queries eventually
|
| - BACKOFF_START = 1.0
|
| - BACKOFF_MULT = 1.05
|
| - MAX_OPERATIONALERROR_TIME = 3600*24 # one day
|
| - def __thd(self, with_engine, callable, args, kwargs):
|
| - # try to call callable(arg, *args, **kwargs) repeatedly until no
|
| - # OperationalErrors occur, where arg is either the engine (with_engine)
|
| - # or a connection (not with_engine)
|
| - backoff = self.BACKOFF_START
|
| - start = time.time()
|
| - while True:
|
| - if with_engine:
|
| - arg = self.engine
|
| - else:
|
| - arg = self.engine.contextual_connect()
|
| -
|
| + def do(self, callable, *args, **kwargs):
|
| + def thd():
|
| + conn = self.engine.contextual_connect()
|
| if self.__broken_sqlite: # see bug #1810
|
| - arg.execute("select * from sqlite_master")
|
| - try:
|
| - rv = callable(arg, *args, **kwargs)
|
| + conn.execute("select * from sqlite_master")
|
| + try:
|
| + rv = callable(conn, *args, **kwargs)
|
| assert not isinstance(rv, sa.engine.ResultProxy), \
|
| "do not return ResultProxy objects!"
|
| - except sa.exc.OperationalError, e:
|
| - text = e.orig.args[0]
|
| - if "Lost connection" in text \
|
| - or "database is locked" in text:
|
| -
|
| - # see if we've retried too much
|
| - elapsed = time.time() - start
|
| - if elapsed > self.MAX_OPERATIONALERROR_TIME:
|
| - raise
|
| -
|
| - metrics.MetricCountEvent.log(
|
| - "DBThreadPool.retry-on-OperationalError")
|
| - log.msg("automatically retrying query after "
|
| - "OperationalError (%ss sleep)" % backoff)
|
| -
|
| - # sleep (remember, we're in a thread..)
|
| - time.sleep(backoff)
|
| - backoff *= self.BACKOFF_MULT
|
| -
|
| - # and re-try
|
| - continue
|
| - else:
|
| - raise
|
| finally:
|
| - if not with_engine:
|
| - arg.close()
|
| - break
|
| - return rv
|
| -
|
| - def do(self, callable, *args, **kwargs):
|
| - return threads.deferToThreadPool(reactor, self,
|
| - self.__thd, False, callable, args, kwargs)
|
| + conn.close()
|
| + return rv
|
| + return threads.deferToThreadPool(reactor, self, thd)
|
|
|
| def do_with_engine(self, callable, *args, **kwargs):
|
| - return threads.deferToThreadPool(reactor, self,
|
| - self.__thd, True, callable, args, kwargs)
|
| + def thd():
|
| + if self.__broken_sqlite: # see bug #1810
|
| + self.engine.execute("select * from sqlite_master")
|
| + rv = callable(self.engine, *args, **kwargs)
|
| + assert not isinstance(rv, sa.engine.ResultProxy), \
|
| + "do not return ResultProxy objects!"
|
| + return rv
|
| + return threads.deferToThreadPool(reactor, self, thd)
|
|
|
| # older implementations for twisted < 0.8.2, which does not have
|
| # deferToThreadPool; this basically re-implements it, although it gets some
|
| # of the synchronization wrong - the thread may still be "in use" when the
|
| # deferred fires in the parent, which can lead to database accesses hopping
|
| # between threads. In practice, this should not cause any difficulty.
|
| + def do_081(self, callable, *args, **kwargs): # pragma: no cover
|
| + d = defer.Deferred()
|
| + def thd():
|
| + try:
|
| + conn = self.engine.contextual_connect()
|
| + if self.__broken_sqlite: # see bug #1810
|
| + conn.execute("select * from sqlite_master")
|
| + try:
|
| + rv = callable(conn, *args, **kwargs)
|
| + assert not isinstance(rv, sa.engine.ResultProxy), \
|
| + "do not return ResultProxy objects!"
|
| + finally:
|
| + conn.close()
|
| + reactor.callFromThread(d.callback, rv)
|
| + except:
|
| + reactor.callFromThread(d.errback, failure.Failure())
|
| + self.callInThread(thd)
|
| + return d
|
| +
|
| + def do_with_engine_081(self, callable, *args, **kwargs): # pragma: no cover
|
| + d = defer.Deferred()
|
| + def thd():
|
| + try:
|
| + conn = self.engine
|
| + if self.__broken_sqlite: # see bug #1810
|
| + conn.execute("select * from sqlite_master")
|
| + rv = callable(conn, *args, **kwargs)
|
| + assert not isinstance(rv, sa.engine.ResultProxy), \
|
| + "do not return ResultProxy objects!"
|
| + reactor.callFromThread(d.callback, rv)
|
| + except:
|
| + reactor.callFromThread(d.errback, failure.Failure())
|
| + self.callInThread(thd)
|
| + return d
|
| if twisted.version < versions.Version('twisted', 8, 2, 0):
|
| - def __081_wrap(self, with_engine, callable, args, kwargs): # pragma: no cover
|
| - d = defer.Deferred()
|
| - def thd():
|
| - try:
|
| - reactor.callFromThread(d.callback,
|
| - self.__thd(with_engine, callable, args, kwargs))
|
| - except:
|
| - reactor.callFromThread(d.errback,
|
| - failure.Failure())
|
| - self.callInThread(thd)
|
| - return d
|
| -
|
| - def do_081(self, callable, *args, **kwargs): # pragma: no cover
|
| - return self.__081_wrap(False, callable, args, kwargs)
|
| -
|
| - def do_with_engine_081(self, callable, *args, **kwargs): # pragma: no cover
|
| - return self.__081_wrap(True, callable, args, kwargs)
|
| -
|
| do = do_081
|
| do_with_engine = do_with_engine_081
|
|
|
|
|