Index: third_party/buildbot_8_4p1/buildbot/db/pool.py |
diff --git a/third_party/buildbot_8_4p1/buildbot/db/pool.py b/third_party/buildbot_8_4p1/buildbot/db/pool.py |
index 258f036be8d90fbabe5d1c9d1bf42724bea0f99a..fe3540ad68b91faa9c4fc688bf6f9e4a5d5e7002 100644 |
--- a/third_party/buildbot_8_4p1/buildbot/db/pool.py |
+++ b/third_party/buildbot_8_4p1/buildbot/db/pool.py |
@@ -13,11 +13,9 @@ |
# |
# Copyright Buildbot Team Members |
-import time |
import os |
import sqlalchemy as sa |
import twisted |
-from buildbot.process import metrics |
from twisted.internet import reactor, threads, defer |
from twisted.python import threadpool, failure, versions, log |
@@ -89,92 +87,70 @@ |
reactor.removeSystemEventTrigger(self._stop_evt) |
self._stop() |
- # Try about 170 times over the space of a day, with the last few tries |
- # being about an hour apart. This is designed to span a reasonable amount |
- # of time for repairing a broken database server, while still failing |
- # actual problematic queries eventually |
- BACKOFF_START = 1.0 |
- BACKOFF_MULT = 1.05 |
- MAX_OPERATIONALERROR_TIME = 3600*24 # one day |
- def __thd(self, with_engine, callable, args, kwargs): |
- # try to call callable(arg, *args, **kwargs) repeatedly until no |
- # OperationalErrors occur, where arg is either the engine (with_engine) |
- # or a connection (not with_engine) |
- backoff = self.BACKOFF_START |
- start = time.time() |
- while True: |
- if with_engine: |
- arg = self.engine |
- else: |
- arg = self.engine.contextual_connect() |
- |
+ def do(self, callable, *args, **kwargs): |
+ def thd(): |
+ conn = self.engine.contextual_connect() |
if self.__broken_sqlite: # see bug #1810 |
- arg.execute("select * from sqlite_master") |
- try: |
- rv = callable(arg, *args, **kwargs) |
+ conn.execute("select * from sqlite_master") |
+ try: |
+ rv = callable(conn, *args, **kwargs) |
assert not isinstance(rv, sa.engine.ResultProxy), \ |
"do not return ResultProxy objects!" |
- except sa.exc.OperationalError, e: |
- text = e.orig.args[0] |
- if "Lost connection" in text \ |
- or "database is locked" in text: |
- |
- # see if we've retried too much |
- elapsed = time.time() - start |
- if elapsed > self.MAX_OPERATIONALERROR_TIME: |
- raise |
- |
- metrics.MetricCountEvent.log( |
- "DBThreadPool.retry-on-OperationalError") |
- log.msg("automatically retrying query after " |
- "OperationalError (%ss sleep)" % backoff) |
- |
- # sleep (remember, we're in a thread..) |
- time.sleep(backoff) |
- backoff *= self.BACKOFF_MULT |
- |
- # and re-try |
- continue |
- else: |
- raise |
finally: |
- if not with_engine: |
- arg.close() |
- break |
- return rv |
- |
- def do(self, callable, *args, **kwargs): |
- return threads.deferToThreadPool(reactor, self, |
- self.__thd, False, callable, args, kwargs) |
+ conn.close() |
+ return rv |
+ return threads.deferToThreadPool(reactor, self, thd) |
def do_with_engine(self, callable, *args, **kwargs): |
- return threads.deferToThreadPool(reactor, self, |
- self.__thd, True, callable, args, kwargs) |
+ def thd(): |
+ if self.__broken_sqlite: # see bug #1810 |
+ self.engine.execute("select * from sqlite_master") |
+ rv = callable(self.engine, *args, **kwargs) |
+ assert not isinstance(rv, sa.engine.ResultProxy), \ |
+ "do not return ResultProxy objects!" |
+ return rv |
+ return threads.deferToThreadPool(reactor, self, thd) |
# older implementations for twisted < 0.8.2, which does not have |
# deferToThreadPool; this basically re-implements it, although it gets some |
# of the synchronization wrong - the thread may still be "in use" when the |
# deferred fires in the parent, which can lead to database accesses hopping |
# between threads. In practice, this should not cause any difficulty. |
+ def do_081(self, callable, *args, **kwargs): # pragma: no cover |
+ d = defer.Deferred() |
+ def thd(): |
+ try: |
+ conn = self.engine.contextual_connect() |
+ if self.__broken_sqlite: # see bug #1810 |
+ conn.execute("select * from sqlite_master") |
+ try: |
+ rv = callable(conn, *args, **kwargs) |
+ assert not isinstance(rv, sa.engine.ResultProxy), \ |
+ "do not return ResultProxy objects!" |
+ finally: |
+ conn.close() |
+ reactor.callFromThread(d.callback, rv) |
+ except: |
+ reactor.callFromThread(d.errback, failure.Failure()) |
+ self.callInThread(thd) |
+ return d |
+ |
+ def do_with_engine_081(self, callable, *args, **kwargs): # pragma: no cover |
+ d = defer.Deferred() |
+ def thd(): |
+ try: |
+ conn = self.engine |
+ if self.__broken_sqlite: # see bug #1810 |
+ conn.execute("select * from sqlite_master") |
+ rv = callable(conn, *args, **kwargs) |
+ assert not isinstance(rv, sa.engine.ResultProxy), \ |
+ "do not return ResultProxy objects!" |
+ reactor.callFromThread(d.callback, rv) |
+ except: |
+ reactor.callFromThread(d.errback, failure.Failure()) |
+ self.callInThread(thd) |
+ return d |
if twisted.version < versions.Version('twisted', 8, 2, 0): |
- def __081_wrap(self, with_engine, callable, args, kwargs): # pragma: no cover |
- d = defer.Deferred() |
- def thd(): |
- try: |
- reactor.callFromThread(d.callback, |
- self.__thd(with_engine, callable, args, kwargs)) |
- except: |
- reactor.callFromThread(d.errback, |
- failure.Failure()) |
- self.callInThread(thd) |
- return d |
- |
- def do_081(self, callable, *args, **kwargs): # pragma: no cover |
- return self.__081_wrap(False, callable, args, kwargs) |
- |
- def do_with_engine_081(self, callable, *args, **kwargs): # pragma: no cover |
- return self.__081_wrap(True, callable, args, kwargs) |
- |
do = do_081 |
do_with_engine = do_with_engine_081 |