OLD | NEW |
---|---|
1 # This file is part of Buildbot. Buildbot is free software: you can | 1 # This file is part of Buildbot. Buildbot is free software: you can |
2 # redistribute it and/or modify it under the terms of the GNU General Public | 2 # redistribute it and/or modify it under the terms of the GNU General Public |
3 # License as published by the Free Software Foundation, version 2. | 3 # License as published by the Free Software Foundation, version 2. |
4 # | 4 # |
5 # This program is distributed in the hope that it will be useful, but WITHOUT | 5 # This program is distributed in the hope that it will be useful, but WITHOUT |
6 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS | 6 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
7 # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more | 7 # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
8 # details. | 8 # details. |
9 # | 9 # |
10 # You should have received a copy of the GNU General Public License along with | 10 # You should have received a copy of the GNU General Public License along with |
11 # this program; if not, write to the Free Software Foundation, Inc., 51 | 11 # this program; if not, write to the Free Software Foundation, Inc., 51 |
12 # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | 12 # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
13 # | 13 # |
14 # Copyright Buildbot Team Members | 14 # Copyright Buildbot Team Members |
15 | 15 |
16 import time | |
16 import os | 17 import os |
17 import sqlalchemy as sa | 18 import sqlalchemy as sa |
18 import twisted | 19 import twisted |
20 from buildbot.process import metrics | |
19 from twisted.internet import reactor, threads, defer | 21 from twisted.internet import reactor, threads, defer |
20 from twisted.python import threadpool, failure, versions, log | 22 from twisted.python import threadpool, failure, versions, log |
21 | 23 |
22 | 24 |
23 class DBThreadPool(threadpool.ThreadPool): | 25 class DBThreadPool(threadpool.ThreadPool): |
24 | 26 |
25 running = False | 27 running = False |
26 | 28 |
27 # Some versions of SQLite incorrectly cache metadata about which tables are | 29 # Some versions of SQLite incorrectly cache metadata about which tables are |
28 # and are not present on a per-connection basis. This cache can be flushed | 30 # and are not present on a per-connection basis. This cache can be flushed |
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
80 | 82 |
81 def shutdown(self): | 83 def shutdown(self): |
82 """Manually stop the pool. This is only necessary from tests, as the | 84 """Manually stop the pool. This is only necessary from tests, as the |
83 pool will stop itself when the reactor stops under normal | 85 pool will stop itself when the reactor stops under normal |
84 circumstances.""" | 86 circumstances.""" |
85 if not self._stop_evt: | 87 if not self._stop_evt: |
86 return # pool is already stopped | 88 return # pool is already stopped |
87 reactor.removeSystemEventTrigger(self._stop_evt) | 89 reactor.removeSystemEventTrigger(self._stop_evt) |
88 self._stop() | 90 self._stop() |
89 | 91 |
90 def do(self, callable, *args, **kwargs): | 92 # Try about 170 times over the space of a day, with the last few tries |
ghost stip (do not use)
2016/07/01 01:30:16
uh
| |
91 def thd(): | 93 # being about an hour apart. This is designed to span a reasonable amount |
92 conn = self.engine.contextual_connect() | 94 # of time for repairing a broken database server, while still failing |
95 # actual problematic queries eventually | |
96 BACKOFF_START = 1.0 | |
97 BACKOFF_MULT = 1.05 | |
98 MAX_OPERATIONALERROR_TIME = 3600*24 # one day | |
99 def __thd(self, with_engine, callable, args, kwargs): | |
100 # try to call callable(arg, *args, **kwargs) repeatedly until no | |
101 # OperationalErrors occur, where arg is either the engine (with_engine) | |
102 # or a connection (not with_engine) | |
103 backoff = self.BACKOFF_START | |
104 start = time.time() | |
105 while True: | |
106 if with_engine: | |
107 arg = self.engine | |
108 else: | |
109 arg = self.engine.contextual_connect() | |
110 | |
93 if self.__broken_sqlite: # see bug #1810 | 111 if self.__broken_sqlite: # see bug #1810 |
94 conn.execute("select * from sqlite_master") | 112 arg.execute("select * from sqlite_master") |
95 try: | 113 try: |
96 rv = callable(conn, *args, **kwargs) | 114 rv = callable(arg, *args, **kwargs) |
97 assert not isinstance(rv, sa.engine.ResultProxy), \ | 115 assert not isinstance(rv, sa.engine.ResultProxy), \ |
98 "do not return ResultProxy objects!" | 116 "do not return ResultProxy objects!" |
117 except sa.exc.OperationalError, e: | |
118 text = e.orig.args[0] | |
119 if "Lost connection" in text \ | |
120 or "database is locked" in text: | |
121 | |
122 # see if we've retried too much | |
123 elapsed = time.time() - start | |
124 if elapsed > self.MAX_OPERATIONALERROR_TIME: | |
125 raise | |
126 | |
127 metrics.MetricCountEvent.log( | |
128 "DBThreadPool.retry-on-OperationalError") | |
129 log.msg("automatically retrying query after " | |
130 "OperationalError (%ss sleep)" % backoff) | |
131 | |
132 # sleep (remember, we're in a thread..) | |
133 time.sleep(backoff) | |
134 backoff *= self.BACKOFF_MULT | |
135 | |
136 # and re-try | |
137 continue | |
138 else: | |
139 raise | |
99 finally: | 140 finally: |
100 conn.close() | 141 if not with_engine: |
101 return rv | 142 arg.close() |
102 return threads.deferToThreadPool(reactor, self, thd) | 143 break |
144 return rv | |
145 | |
146 def do(self, callable, *args, **kwargs): | |
147 return threads.deferToThreadPool(reactor, self, | |
148 self.__thd, False, callable, args, kwargs) | |
103 | 149 |
104 def do_with_engine(self, callable, *args, **kwargs): | 150 def do_with_engine(self, callable, *args, **kwargs): |
105 def thd(): | 151 return threads.deferToThreadPool(reactor, self, |
106 if self.__broken_sqlite: # see bug #1810 | 152 self.__thd, True, callable, args, kwargs) |
107 self.engine.execute("select * from sqlite_master") | |
108 rv = callable(self.engine, *args, **kwargs) | |
109 assert not isinstance(rv, sa.engine.ResultProxy), \ | |
110 "do not return ResultProxy objects!" | |
111 return rv | |
112 return threads.deferToThreadPool(reactor, self, thd) | |
113 | 153 |
114 # older implementations for twisted < 0.8.2, which does not have | 154 # older implementations for twisted < 0.8.2, which does not have |
115 # deferToThreadPool; this basically re-implements it, although it gets some | 155 # deferToThreadPool; this basically re-implements it, although it gets some |
116 # of the synchronization wrong - the thread may still be "in use" when the | 156 # of the synchronization wrong - the thread may still be "in use" when the |
117 # deferred fires in the parent, which can lead to database accesses hopping | 157 # deferred fires in the parent, which can lead to database accesses hopping |
118 # between threads. In practice, this should not cause any difficulty. | 158 # between threads. In practice, this should not cause any difficulty. |
119 def do_081(self, callable, *args, **kwargs): # pragma: no cover | 159 if twisted.version < versions.Version('twisted', 8, 2, 0): |
120 d = defer.Deferred() | 160 def __081_wrap(self, with_engine, callable, args, kwargs): # pragma: no cover |
121 def thd(): | 161 d = defer.Deferred() |
122 try: | 162 def thd(): |
123 conn = self.engine.contextual_connect() | |
124 if self.__broken_sqlite: # see bug #1810 | |
125 conn.execute("select * from sqlite_master") | |
126 try: | 163 try: |
127 rv = callable(conn, *args, **kwargs) | 164 reactor.callFromThread(d.callback, |
128 assert not isinstance(rv, sa.engine.ResultProxy), \ | 165 self.__thd(with_engine, callable, args, kwargs)) |
129 "do not return ResultProxy objects!" | 166 except: |
130 finally: | 167 reactor.callFromThread(d.errback, |
131 conn.close() | 168 failure.Failure()) |
132 reactor.callFromThread(d.callback, rv) | 169 self.callInThread(thd) |
133 except: | 170 return d |
134 reactor.callFromThread(d.errback, failure.Failure()) | |
135 self.callInThread(thd) | |
136 return d | |
137 | 171 |
138 def do_with_engine_081(self, callable, *args, **kwargs): # pragma: no cover | 172 def do_081(self, callable, *args, **kwargs): # pragma: no cover |
139 d = defer.Deferred() | 173 return self.__081_wrap(False, callable, args, kwargs) |
140 def thd(): | 174 |
141 try: | 175 def do_with_engine_081(self, callable, *args, **kwargs): # pragma: no co ver |
142 conn = self.engine | 176 return self.__081_wrap(True, callable, args, kwargs) |
143 if self.__broken_sqlite: # see bug #1810 | 177 |
144 conn.execute("select * from sqlite_master") | |
145 rv = callable(conn, *args, **kwargs) | |
146 assert not isinstance(rv, sa.engine.ResultProxy), \ | |
147 "do not return ResultProxy objects!" | |
148 reactor.callFromThread(d.callback, rv) | |
149 except: | |
150 reactor.callFromThread(d.errback, failure.Failure()) | |
151 self.callInThread(thd) | |
152 return d | |
153 if twisted.version < versions.Version('twisted', 8, 2, 0): | |
154 do = do_081 | 178 do = do_081 |
155 do_with_engine = do_with_engine_081 | 179 do_with_engine = do_with_engine_081 |
156 | 180 |
157 def detect_bug1810(self): | 181 def detect_bug1810(self): |
158 # detect buggy SQLite implementations; call only for a known-sqlite | 182 # detect buggy SQLite implementations; call only for a known-sqlite |
159 # dialect | 183 # dialect |
160 try: | 184 try: |
161 import pysqlite2.dbapi2 as sqlite | 185 import pysqlite2.dbapi2 as sqlite |
162 sqlite = sqlite | 186 sqlite = sqlite |
163 except ImportError: | 187 except ImportError: |
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
203 except: | 227 except: |
204 return (0,) | 228 return (0,) |
205 | 229 |
206 if vers_row: | 230 if vers_row: |
207 try: | 231 try: |
208 return tuple(map(int, vers_row[0].split('.'))) | 232 return tuple(map(int, vers_row[0].split('.'))) |
209 except (TypeError, ValueError): | 233 except (TypeError, ValueError): |
210 return (0,) | 234 return (0,) |
211 else: | 235 else: |
212 return (0,) | 236 return (0,) |
OLD | NEW |