Index: third_party/buildbot_8_4p1/buildbot/db/pool.py |
diff --git a/third_party/buildbot_8_4p1/buildbot/db/pool.py b/third_party/buildbot_8_4p1/buildbot/db/pool.py |
index fe3540ad68b91faa9c4fc688bf6f9e4a5d5e7002..258f036be8d90fbabe5d1c9d1bf42724bea0f99a 100644 |
--- a/third_party/buildbot_8_4p1/buildbot/db/pool.py |
+++ b/third_party/buildbot_8_4p1/buildbot/db/pool.py |
@@ -13,9 +13,11 @@ |
# |
# Copyright Buildbot Team Members |
+import time |
import os |
import sqlalchemy as sa |
import twisted |
+from buildbot.process import metrics |
from twisted.internet import reactor, threads, defer |
from twisted.python import threadpool, failure, versions, log |
@@ -87,70 +89,92 @@ class DBThreadPool(threadpool.ThreadPool): |
reactor.removeSystemEventTrigger(self._stop_evt) |
self._stop() |
- def do(self, callable, *args, **kwargs): |
- def thd(): |
- conn = self.engine.contextual_connect() |
+ # Try about 170 times over the space of a day, with the last few tries |
ghost stip (do not use)
2016/07/01 01:30:16
uh
|
+ # being about an hour apart. This is designed to span a reasonable amount |
+ # of time for repairing a broken database server, while still failing |
+ # actual problematic queries eventually |
+ BACKOFF_START = 1.0 |
+ BACKOFF_MULT = 1.05 |
+ MAX_OPERATIONALERROR_TIME = 3600*24 # one day |
+ def __thd(self, with_engine, callable, args, kwargs): |
+ # try to call callable(arg, *args, **kwargs) repeatedly until no |
+ # OperationalErrors occur, where arg is either the engine (with_engine) |
+ # or a connection (not with_engine) |
+ backoff = self.BACKOFF_START |
+ start = time.time() |
+ while True: |
+ if with_engine: |
+ arg = self.engine |
+ else: |
+ arg = self.engine.contextual_connect() |
+ |
if self.__broken_sqlite: # see bug #1810 |
- conn.execute("select * from sqlite_master") |
+ arg.execute("select * from sqlite_master") |
try: |
- rv = callable(conn, *args, **kwargs) |
+ rv = callable(arg, *args, **kwargs) |
assert not isinstance(rv, sa.engine.ResultProxy), \ |
"do not return ResultProxy objects!" |
+ except sa.exc.OperationalError, e: |
+ text = e.orig.args[0] |
+ if "Lost connection" in text \ |
+ or "database is locked" in text: |
+ |
+ # see if we've retried too much |
+ elapsed = time.time() - start |
+ if elapsed > self.MAX_OPERATIONALERROR_TIME: |
+ raise |
+ |
+ metrics.MetricCountEvent.log( |
+ "DBThreadPool.retry-on-OperationalError") |
+ log.msg("automatically retrying query after " |
+ "OperationalError (%ss sleep)" % backoff) |
+ |
+ # sleep (remember, we're in a thread..) |
+ time.sleep(backoff) |
+ backoff *= self.BACKOFF_MULT |
+ |
+ # and re-try |
+ continue |
+ else: |
+ raise |
finally: |
- conn.close() |
- return rv |
- return threads.deferToThreadPool(reactor, self, thd) |
+ if not with_engine: |
+ arg.close() |
+ break |
+ return rv |
+ |
+ def do(self, callable, *args, **kwargs): |
+ return threads.deferToThreadPool(reactor, self, |
+ self.__thd, False, callable, args, kwargs) |
def do_with_engine(self, callable, *args, **kwargs): |
- def thd(): |
- if self.__broken_sqlite: # see bug #1810 |
- self.engine.execute("select * from sqlite_master") |
- rv = callable(self.engine, *args, **kwargs) |
- assert not isinstance(rv, sa.engine.ResultProxy), \ |
- "do not return ResultProxy objects!" |
- return rv |
- return threads.deferToThreadPool(reactor, self, thd) |
+ return threads.deferToThreadPool(reactor, self, |
+ self.__thd, True, callable, args, kwargs) |
# older implementations for twisted < 0.8.2, which does not have |
# deferToThreadPool; this basically re-implements it, although it gets some |
# of the synchronization wrong - the thread may still be "in use" when the |
# deferred fires in the parent, which can lead to database accesses hopping |
# between threads. In practice, this should not cause any difficulty. |
- def do_081(self, callable, *args, **kwargs): # pragma: no cover |
- d = defer.Deferred() |
- def thd(): |
- try: |
- conn = self.engine.contextual_connect() |
- if self.__broken_sqlite: # see bug #1810 |
- conn.execute("select * from sqlite_master") |
- try: |
- rv = callable(conn, *args, **kwargs) |
- assert not isinstance(rv, sa.engine.ResultProxy), \ |
- "do not return ResultProxy objects!" |
- finally: |
- conn.close() |
- reactor.callFromThread(d.callback, rv) |
- except: |
- reactor.callFromThread(d.errback, failure.Failure()) |
- self.callInThread(thd) |
- return d |
- |
- def do_with_engine_081(self, callable, *args, **kwargs): # pragma: no cover |
- d = defer.Deferred() |
- def thd(): |
- try: |
- conn = self.engine |
- if self.__broken_sqlite: # see bug #1810 |
- conn.execute("select * from sqlite_master") |
- rv = callable(conn, *args, **kwargs) |
- assert not isinstance(rv, sa.engine.ResultProxy), \ |
- "do not return ResultProxy objects!" |
- reactor.callFromThread(d.callback, rv) |
- except: |
- reactor.callFromThread(d.errback, failure.Failure()) |
- self.callInThread(thd) |
- return d |
if twisted.version < versions.Version('twisted', 8, 2, 0): |
+ def __081_wrap(self, with_engine, callable, args, kwargs): # pragma: no cover |
+ d = defer.Deferred() |
+ def thd(): |
+ try: |
+ reactor.callFromThread(d.callback, |
+ self.__thd(with_engine, callable, args, kwargs)) |
+ except: |
+ reactor.callFromThread(d.errback, |
+ failure.Failure()) |
+ self.callInThread(thd) |
+ return d |
+ |
+ def do_081(self, callable, *args, **kwargs): # pragma: no cover |
+ return self.__081_wrap(False, callable, args, kwargs) |
+ |
+ def do_with_engine_081(self, callable, *args, **kwargs): # pragma: no cover |
+ return self.__081_wrap(True, callable, args, kwargs) |
+ |
do = do_081 |
do_with_engine = do_with_engine_081 |