Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(158)

Side by Side Diff: third_party/buildbot_8_4p1/buildbot/db/pool.py

Issue 2128953002: Revert of Cherry-pick buildbot 95deef27d7c531ead19e0ac86a9aa1546d4ee7f9: (Closed) Base URL: https://chromium.googlesource.com/chromium/tools/build.git@buildbot-version-2
Patch Set: Created 4 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | third_party/buildbot_8_4p1/buildbot/test/unit/test_db_pool.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 # This file is part of Buildbot. Buildbot is free software: you can 1 # This file is part of Buildbot. Buildbot is free software: you can
2 # redistribute it and/or modify it under the terms of the GNU General Public 2 # redistribute it and/or modify it under the terms of the GNU General Public
3 # License as published by the Free Software Foundation, version 2. 3 # License as published by the Free Software Foundation, version 2.
4 # 4 #
5 # This program is distributed in the hope that it will be useful, but WITHOUT 5 # This program is distributed in the hope that it will be useful, but WITHOUT
6 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS 6 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
7 # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more 7 # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
8 # details. 8 # details.
9 # 9 #
10 # You should have received a copy of the GNU General Public License along with 10 # You should have received a copy of the GNU General Public License along with
11 # this program; if not, write to the Free Software Foundation, Inc., 51 11 # this program; if not, write to the Free Software Foundation, Inc., 51
12 # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. 12 # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
13 # 13 #
14 # Copyright Buildbot Team Members 14 # Copyright Buildbot Team Members
15 15
16 import time
17 import os 16 import os
18 import sqlalchemy as sa 17 import sqlalchemy as sa
19 import twisted 18 import twisted
20 from buildbot.process import metrics
21 from twisted.internet import reactor, threads, defer 19 from twisted.internet import reactor, threads, defer
22 from twisted.python import threadpool, failure, versions, log 20 from twisted.python import threadpool, failure, versions, log
23 21
24 22
25 class DBThreadPool(threadpool.ThreadPool): 23 class DBThreadPool(threadpool.ThreadPool):
26 24
27 running = False 25 running = False
28 26
29 # Some versions of SQLite incorrectly cache metadata about which tables are 27 # Some versions of SQLite incorrectly cache metadata about which tables are
30 # and are not present on a per-connection basis. This cache can be flushed 28 # and are not present on a per-connection basis. This cache can be flushed
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
82 80
83 def shutdown(self): 81 def shutdown(self):
84 """Manually stop the pool. This is only necessary from tests, as the 82 """Manually stop the pool. This is only necessary from tests, as the
85 pool will stop itself when the reactor stops under normal 83 pool will stop itself when the reactor stops under normal
86 circumstances.""" 84 circumstances."""
87 if not self._stop_evt: 85 if not self._stop_evt:
88 return # pool is already stopped 86 return # pool is already stopped
89 reactor.removeSystemEventTrigger(self._stop_evt) 87 reactor.removeSystemEventTrigger(self._stop_evt)
90 self._stop() 88 self._stop()
91 89
92 # Try about 170 times over the space of a day, with the last few tries 90 def do(self, callable, *args, **kwargs):
93 # being about an hour apart. This is designed to span a reasonable amount 91 def thd():
94 # of time for repairing a broken database server, while still failing 92 conn = self.engine.contextual_connect()
95 # actual problematic queries eventually
96 BACKOFF_START = 1.0
97 BACKOFF_MULT = 1.05
98 MAX_OPERATIONALERROR_TIME = 3600*24 # one day
99 def __thd(self, with_engine, callable, args, kwargs):
100 # try to call callable(arg, *args, **kwargs) repeatedly until no
101 # OperationalErrors occur, where arg is either the engine (with_engine)
102 # or a connection (not with_engine)
103 backoff = self.BACKOFF_START
104 start = time.time()
105 while True:
106 if with_engine:
107 arg = self.engine
108 else:
109 arg = self.engine.contextual_connect()
110
111 if self.__broken_sqlite: # see bug #1810 93 if self.__broken_sqlite: # see bug #1810
112 arg.execute("select * from sqlite_master") 94 conn.execute("select * from sqlite_master")
113 try: 95 try:
114 rv = callable(arg, *args, **kwargs) 96 rv = callable(conn, *args, **kwargs)
115 assert not isinstance(rv, sa.engine.ResultProxy), \ 97 assert not isinstance(rv, sa.engine.ResultProxy), \
116 "do not return ResultProxy objects!" 98 "do not return ResultProxy objects!"
117 except sa.exc.OperationalError, e:
118 text = e.orig.args[0]
119 if "Lost connection" in text \
120 or "database is locked" in text:
121
122 # see if we've retried too much
123 elapsed = time.time() - start
124 if elapsed > self.MAX_OPERATIONALERROR_TIME:
125 raise
126
127 metrics.MetricCountEvent.log(
128 "DBThreadPool.retry-on-OperationalError")
129 log.msg("automatically retrying query after "
130 "OperationalError (%ss sleep)" % backoff)
131
132 # sleep (remember, we're in a thread..)
133 time.sleep(backoff)
134 backoff *= self.BACKOFF_MULT
135
136 # and re-try
137 continue
138 else:
139 raise
140 finally: 99 finally:
141 if not with_engine: 100 conn.close()
142 arg.close() 101 return rv
143 break 102 return threads.deferToThreadPool(reactor, self, thd)
144 return rv
145
146 def do(self, callable, *args, **kwargs):
147 return threads.deferToThreadPool(reactor, self,
148 self.__thd, False, callable, args, kwargs)
149 103
150 def do_with_engine(self, callable, *args, **kwargs): 104 def do_with_engine(self, callable, *args, **kwargs):
151 return threads.deferToThreadPool(reactor, self, 105 def thd():
152 self.__thd, True, callable, args, kwargs) 106 if self.__broken_sqlite: # see bug #1810
107 self.engine.execute("select * from sqlite_master")
108 rv = callable(self.engine, *args, **kwargs)
109 assert not isinstance(rv, sa.engine.ResultProxy), \
110 "do not return ResultProxy objects!"
111 return rv
112 return threads.deferToThreadPool(reactor, self, thd)
153 113
154 # older implementations for twisted < 0.8.2, which does not have 114 # older implementations for twisted < 0.8.2, which does not have
155 # deferToThreadPool; this basically re-implements it, although it gets some 115 # deferToThreadPool; this basically re-implements it, although it gets some
156 # of the synchronization wrong - the thread may still be "in use" when the 116 # of the synchronization wrong - the thread may still be "in use" when the
157 # deferred fires in the parent, which can lead to database accesses hopping 117 # deferred fires in the parent, which can lead to database accesses hopping
158 # between threads. In practice, this should not cause any difficulty. 118 # between threads. In practice, this should not cause any difficulty.
119 def do_081(self, callable, *args, **kwargs): # pragma: no cover
120 d = defer.Deferred()
121 def thd():
122 try:
123 conn = self.engine.contextual_connect()
124 if self.__broken_sqlite: # see bug #1810
125 conn.execute("select * from sqlite_master")
126 try:
127 rv = callable(conn, *args, **kwargs)
128 assert not isinstance(rv, sa.engine.ResultProxy), \
129 "do not return ResultProxy objects!"
130 finally:
131 conn.close()
132 reactor.callFromThread(d.callback, rv)
133 except:
134 reactor.callFromThread(d.errback, failure.Failure())
135 self.callInThread(thd)
136 return d
137
138 def do_with_engine_081(self, callable, *args, **kwargs): # pragma: no cover
139 d = defer.Deferred()
140 def thd():
141 try:
142 conn = self.engine
143 if self.__broken_sqlite: # see bug #1810
144 conn.execute("select * from sqlite_master")
145 rv = callable(conn, *args, **kwargs)
146 assert not isinstance(rv, sa.engine.ResultProxy), \
147 "do not return ResultProxy objects!"
148 reactor.callFromThread(d.callback, rv)
149 except:
150 reactor.callFromThread(d.errback, failure.Failure())
151 self.callInThread(thd)
152 return d
159 if twisted.version < versions.Version('twisted', 8, 2, 0): 153 if twisted.version < versions.Version('twisted', 8, 2, 0):
160 def __081_wrap(self, with_engine, callable, args, kwargs): # pragma: no cover
161 d = defer.Deferred()
162 def thd():
163 try:
164 reactor.callFromThread(d.callback,
165 self.__thd(with_engine, callable, args, kwargs))
166 except:
167 reactor.callFromThread(d.errback,
168 failure.Failure())
169 self.callInThread(thd)
170 return d
171
172 def do_081(self, callable, *args, **kwargs): # pragma: no cover
173 return self.__081_wrap(False, callable, args, kwargs)
174
175 def do_with_engine_081(self, callable, *args, **kwargs): # pragma: no co ver
176 return self.__081_wrap(True, callable, args, kwargs)
177
178 do = do_081 154 do = do_081
179 do_with_engine = do_with_engine_081 155 do_with_engine = do_with_engine_081
180 156
181 def detect_bug1810(self): 157 def detect_bug1810(self):
182 # detect buggy SQLite implementations; call only for a known-sqlite 158 # detect buggy SQLite implementations; call only for a known-sqlite
183 # dialect 159 # dialect
184 try: 160 try:
185 import pysqlite2.dbapi2 as sqlite 161 import pysqlite2.dbapi2 as sqlite
186 sqlite = sqlite 162 sqlite = sqlite
187 except ImportError: 163 except ImportError:
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
227 except: 203 except:
228 return (0,) 204 return (0,)
229 205
230 if vers_row: 206 if vers_row:
231 try: 207 try:
232 return tuple(map(int, vers_row[0].split('.'))) 208 return tuple(map(int, vers_row[0].split('.')))
233 except (TypeError, ValueError): 209 except (TypeError, ValueError):
234 return (0,) 210 return (0,)
235 else: 211 else:
236 return (0,) 212 return (0,)
OLDNEW
« no previous file with comments | « no previous file | third_party/buildbot_8_4p1/buildbot/test/unit/test_db_pool.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698