Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(54)

Side by Side Diff: third_party/buildbot_8_4p1/buildbot/db/pool.py

Issue 2103053002: Cherry-pick buildbot 95deef27d7c531ead19e0ac86a9aa1546d4ee7f9: (Closed) Base URL: https://chromium.googlesource.com/chromium/tools/build.git@buildbot-version-2
Patch Set: Disable the test again Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | third_party/buildbot_8_4p1/buildbot/test/unit/test_db_pool.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # This file is part of Buildbot. Buildbot is free software: you can 1 # This file is part of Buildbot. Buildbot is free software: you can
2 # redistribute it and/or modify it under the terms of the GNU General Public 2 # redistribute it and/or modify it under the terms of the GNU General Public
3 # License as published by the Free Software Foundation, version 2. 3 # License as published by the Free Software Foundation, version 2.
4 # 4 #
5 # This program is distributed in the hope that it will be useful, but WITHOUT 5 # This program is distributed in the hope that it will be useful, but WITHOUT
6 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 6 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
7 # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more 7 # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
8 # details. 8 # details.
9 # 9 #
10 # You should have received a copy of the GNU General Public License along with 10 # You should have received a copy of the GNU General Public License along with
11 # this program; if not, write to the Free Software Foundation, Inc., 51 11 # this program; if not, write to the Free Software Foundation, Inc., 51
12 # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 12 # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
13 # 13 #
14 # Copyright Buildbot Team Members 14 # Copyright Buildbot Team Members
15 15
16 import time
16 import os 17 import os
17 import sqlalchemy as sa 18 import sqlalchemy as sa
18 import twisted 19 import twisted
20 from buildbot.process import metrics
19 from twisted.internet import reactor, threads, defer 21 from twisted.internet import reactor, threads, defer
20 from twisted.python import threadpool, failure, versions, log 22 from twisted.python import threadpool, failure, versions, log
21 23
22 24
23 class DBThreadPool(threadpool.ThreadPool): 25 class DBThreadPool(threadpool.ThreadPool):
24 26
25 running = False 27 running = False
26 28
27 # Some versions of SQLite incorrectly cache metadata about which tables are 29 # Some versions of SQLite incorrectly cache metadata about which tables are
28 # and are not present on a per-connection basis. This cache can be flushed 30 # and are not present on a per-connection basis. This cache can be flushed
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
80 82
81 def shutdown(self): 83 def shutdown(self):
82 """Manually stop the pool. This is only necessary from tests, as the 84 """Manually stop the pool. This is only necessary from tests, as the
83 pool will stop itself when the reactor stops under normal 85 pool will stop itself when the reactor stops under normal
84 circumstances.""" 86 circumstances."""
85 if not self._stop_evt: 87 if not self._stop_evt:
86 return # pool is already stopped 88 return # pool is already stopped
87 reactor.removeSystemEventTrigger(self._stop_evt) 89 reactor.removeSystemEventTrigger(self._stop_evt)
88 self._stop() 90 self._stop()
89 91
90 def do(self, callable, *args, **kwargs): 92 # Try about 170 times over the space of a day, with the last few tries
ghost stip (do not use) 2016/07/01 01:30:16 uh
91 def thd(): 93 # being about an hour apart. This is designed to span a reasonable amount
92 conn = self.engine.contextual_connect() 94 # of time for repairing a broken database server, while still failing
95 # actual problematic queries eventually
96 BACKOFF_START = 1.0
97 BACKOFF_MULT = 1.05
98 MAX_OPERATIONALERROR_TIME = 3600*24 # one day
99 def __thd(self, with_engine, callable, args, kwargs):
100 # try to call callable(arg, *args, **kwargs) repeatedly until no
101 # OperationalErrors occur, where arg is either the engine (with_engine)
102 # or a connection (not with_engine)
103 backoff = self.BACKOFF_START
104 start = time.time()
105 while True:
106 if with_engine:
107 arg = self.engine
108 else:
109 arg = self.engine.contextual_connect()
110
93 if self.__broken_sqlite: # see bug #1810 111 if self.__broken_sqlite: # see bug #1810
94 conn.execute("select * from sqlite_master") 112 arg.execute("select * from sqlite_master")
95 try: 113 try:
96 rv = callable(conn, *args, **kwargs) 114 rv = callable(arg, *args, **kwargs)
97 assert not isinstance(rv, sa.engine.ResultProxy), \ 115 assert not isinstance(rv, sa.engine.ResultProxy), \
98 "do not return ResultProxy objects!" 116 "do not return ResultProxy objects!"
117 except sa.exc.OperationalError, e:
118 text = e.orig.args[0]
119 if "Lost connection" in text \
120 or "database is locked" in text:
121
122 # see if we've retried too much
123 elapsed = time.time() - start
124 if elapsed > self.MAX_OPERATIONALERROR_TIME:
125 raise
126
127 metrics.MetricCountEvent.log(
128 "DBThreadPool.retry-on-OperationalError")
129 log.msg("automatically retrying query after "
130 "OperationalError (%ss sleep)" % backoff)
131
132 # sleep (remember, we're in a thread..)
133 time.sleep(backoff)
134 backoff *= self.BACKOFF_MULT
135
136 # and re-try
137 continue
138 else:
139 raise
99 finally: 140 finally:
100 conn.close() 141 if not with_engine:
101 return rv 142 arg.close()
102 return threads.deferToThreadPool(reactor, self, thd) 143 break
144 return rv
145
146 def do(self, callable, *args, **kwargs):
147 return threads.deferToThreadPool(reactor, self,
148 self.__thd, False, callable, args, kwargs)
103 149
104 def do_with_engine(self, callable, *args, **kwargs): 150 def do_with_engine(self, callable, *args, **kwargs):
105 def thd(): 151 return threads.deferToThreadPool(reactor, self,
106 if self.__broken_sqlite: # see bug #1810 152 self.__thd, True, callable, args, kwargs)
107 self.engine.execute("select * from sqlite_master")
108 rv = callable(self.engine, *args, **kwargs)
109 assert not isinstance(rv, sa.engine.ResultProxy), \
110 "do not return ResultProxy objects!"
111 return rv
112 return threads.deferToThreadPool(reactor, self, thd)
113 153
114 # older implementations for twisted < 0.8.2, which does not have 154 # older implementations for twisted < 0.8.2, which does not have
115 # deferToThreadPool; this basically re-implements it, although it gets some 155 # deferToThreadPool; this basically re-implements it, although it gets some
116 # of the synchronization wrong - the thread may still be "in use" when the 156 # of the synchronization wrong - the thread may still be "in use" when the
117 # deferred fires in the parent, which can lead to database accesses hopping 157 # deferred fires in the parent, which can lead to database accesses hopping
118 # between threads. In practice, this should not cause any difficulty. 158 # between threads. In practice, this should not cause any difficulty.
119 def do_081(self, callable, *args, **kwargs): # pragma: no cover 159 if twisted.version < versions.Version('twisted', 8, 2, 0):
120 d = defer.Deferred() 160 def __081_wrap(self, with_engine, callable, args, kwargs): # pragma: no cover
121 def thd(): 161 d = defer.Deferred()
122 try: 162 def thd():
123 conn = self.engine.contextual_connect()
124 if self.__broken_sqlite: # see bug #1810
125 conn.execute("select * from sqlite_master")
126 try: 163 try:
127 rv = callable(conn, *args, **kwargs) 164 reactor.callFromThread(d.callback,
128 assert not isinstance(rv, sa.engine.ResultProxy), \ 165 self.__thd(with_engine, callable, args, kwargs))
129 "do not return ResultProxy objects!" 166 except:
130 finally: 167 reactor.callFromThread(d.errback,
131 conn.close() 168 failure.Failure())
132 reactor.callFromThread(d.callback, rv) 169 self.callInThread(thd)
133 except: 170 return d
134 reactor.callFromThread(d.errback, failure.Failure())
135 self.callInThread(thd)
136 return d
137 171
138 def do_with_engine_081(self, callable, *args, **kwargs): # pragma: no cover 172 def do_081(self, callable, *args, **kwargs): # pragma: no cover
139 d = defer.Deferred() 173 return self.__081_wrap(False, callable, args, kwargs)
140 def thd(): 174
141 try: 175 def do_with_engine_081(self, callable, *args, **kwargs): # pragma: no co ver
142 conn = self.engine 176 return self.__081_wrap(True, callable, args, kwargs)
143 if self.__broken_sqlite: # see bug #1810 177
144 conn.execute("select * from sqlite_master")
145 rv = callable(conn, *args, **kwargs)
146 assert not isinstance(rv, sa.engine.ResultProxy), \
147 "do not return ResultProxy objects!"
148 reactor.callFromThread(d.callback, rv)
149 except:
150 reactor.callFromThread(d.errback, failure.Failure())
151 self.callInThread(thd)
152 return d
153 if twisted.version < versions.Version('twisted', 8, 2, 0):
154 do = do_081 178 do = do_081
155 do_with_engine = do_with_engine_081 179 do_with_engine = do_with_engine_081
156 180
157 def detect_bug1810(self): 181 def detect_bug1810(self):
158 # detect buggy SQLite implementations; call only for a known-sqlite 182 # detect buggy SQLite implementations; call only for a known-sqlite
159 # dialect 183 # dialect
160 try: 184 try:
161 import pysqlite2.dbapi2 as sqlite 185 import pysqlite2.dbapi2 as sqlite
162 sqlite = sqlite 186 sqlite = sqlite
163 except ImportError: 187 except ImportError:
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
203 except: 227 except:
204 return (0,) 228 return (0,)
205 229
206 if vers_row: 230 if vers_row:
207 try: 231 try:
208 return tuple(map(int, vers_row[0].split('.'))) 232 return tuple(map(int, vers_row[0].split('.')))
209 except (TypeError, ValueError): 233 except (TypeError, ValueError):
210 return (0,) 234 return (0,)
211 else: 235 else:
212 return (0,) 236 return (0,)
OLDNEW
« no previous file with comments | « no previous file | third_party/buildbot_8_4p1/buildbot/test/unit/test_db_pool.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698