OLD | NEW |
1 # This file is part of Buildbot. Buildbot is free software: you can | 1 # This file is part of Buildbot. Buildbot is free software: you can |
2 # redistribute it and/or modify it under the terms of the GNU General Public | 2 # redistribute it and/or modify it under the terms of the GNU General Public |
3 # License as published by the Free Software Foundation, version 2. | 3 # License as published by the Free Software Foundation, version 2. |
4 # | 4 # |
5 # This program is distributed in the hope that it will be useful, but WITHOUT | 5 # This program is distributed in the hope that it will be useful, but WITHOUT |
6 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS | 6 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
7 # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more | 7 # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
8 # details. | 8 # details. |
9 # | 9 # |
10 # You should have received a copy of the GNU General Public License along with | 10 # You should have received a copy of the GNU General Public License along with |
11 # this program; if not, write to the Free Software Foundation, Inc., 51 | 11 # this program; if not, write to the Free Software Foundation, Inc., 51 |
12 # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | 12 # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
13 # | 13 # |
14 # Copyright Buildbot Team Members | 14 # Copyright Buildbot Team Members |
15 | 15 |
16 import time | |
17 import os | 16 import os |
18 import sqlalchemy as sa | 17 import sqlalchemy as sa |
19 import twisted | 18 import twisted |
20 from buildbot.process import metrics | |
21 from twisted.internet import reactor, threads, defer | 19 from twisted.internet import reactor, threads, defer |
22 from twisted.python import threadpool, failure, versions, log | 20 from twisted.python import threadpool, failure, versions, log |
23 | 21 |
24 | 22 |
25 class DBThreadPool(threadpool.ThreadPool): | 23 class DBThreadPool(threadpool.ThreadPool): |
26 | 24 |
27 running = False | 25 running = False |
28 | 26 |
29 # Some versions of SQLite incorrectly cache metadata about which tables are | 27 # Some versions of SQLite incorrectly cache metadata about which tables are |
30 # and are not present on a per-connection basis. This cache can be flushed | 28 # and are not present on a per-connection basis. This cache can be flushed |
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
82 | 80 |
83 def shutdown(self): | 81 def shutdown(self): |
84 """Manually stop the pool. This is only necessary from tests, as the | 82 """Manually stop the pool. This is only necessary from tests, as the |
85 pool will stop itself when the reactor stops under normal | 83 pool will stop itself when the reactor stops under normal |
86 circumstances.""" | 84 circumstances.""" |
87 if not self._stop_evt: | 85 if not self._stop_evt: |
88 return # pool is already stopped | 86 return # pool is already stopped |
89 reactor.removeSystemEventTrigger(self._stop_evt) | 87 reactor.removeSystemEventTrigger(self._stop_evt) |
90 self._stop() | 88 self._stop() |
91 | 89 |
92 # Try about 170 times over the space of a day, with the last few tries | 90 def do(self, callable, *args, **kwargs): |
93 # being about an hour apart. This is designed to span a reasonable amount | 91 def thd(): |
94 # of time for repairing a broken database server, while still failing | 92 conn = self.engine.contextual_connect() |
95 # actual problematic queries eventually | |
96 BACKOFF_START = 1.0 | |
97 BACKOFF_MULT = 1.05 | |
98 MAX_OPERATIONALERROR_TIME = 3600*24 # one day | |
99 def __thd(self, with_engine, callable, args, kwargs): | |
100 # try to call callable(arg, *args, **kwargs) repeatedly until no | |
101 # OperationalErrors occur, where arg is either the engine (with_engine) | |
102 # or a connection (not with_engine) | |
103 backoff = self.BACKOFF_START | |
104 start = time.time() | |
105 while True: | |
106 if with_engine: | |
107 arg = self.engine | |
108 else: | |
109 arg = self.engine.contextual_connect() | |
110 | |
111 if self.__broken_sqlite: # see bug #1810 | 93 if self.__broken_sqlite: # see bug #1810 |
112 arg.execute("select * from sqlite_master") | 94 conn.execute("select * from sqlite_master") |
113 try: | 95 try: |
114 rv = callable(arg, *args, **kwargs) | 96 rv = callable(conn, *args, **kwargs) |
115 assert not isinstance(rv, sa.engine.ResultProxy), \ | 97 assert not isinstance(rv, sa.engine.ResultProxy), \ |
116 "do not return ResultProxy objects!" | 98 "do not return ResultProxy objects!" |
117 except sa.exc.OperationalError, e: | |
118 text = e.orig.args[0] | |
119 if "Lost connection" in text \ | |
120 or "database is locked" in text: | |
121 | |
122 # see if we've retried too much | |
123 elapsed = time.time() - start | |
124 if elapsed > self.MAX_OPERATIONALERROR_TIME: | |
125 raise | |
126 | |
127 metrics.MetricCountEvent.log( | |
128 "DBThreadPool.retry-on-OperationalError") | |
129 log.msg("automatically retrying query after " | |
130 "OperationalError (%ss sleep)" % backoff) | |
131 | |
132 # sleep (remember, we're in a thread..) | |
133 time.sleep(backoff) | |
134 backoff *= self.BACKOFF_MULT | |
135 | |
136 # and re-try | |
137 continue | |
138 else: | |
139 raise | |
140 finally: | 99 finally: |
141 if not with_engine: | 100 conn.close() |
142 arg.close() | 101 return rv |
143 break | 102 return threads.deferToThreadPool(reactor, self, thd) |
144 return rv | |
145 | |
146 def do(self, callable, *args, **kwargs): | |
147 return threads.deferToThreadPool(reactor, self, | |
148 self.__thd, False, callable, args, kwargs) | |
149 | 103 |
150 def do_with_engine(self, callable, *args, **kwargs): | 104 def do_with_engine(self, callable, *args, **kwargs): |
151 return threads.deferToThreadPool(reactor, self, | 105 def thd(): |
152 self.__thd, True, callable, args, kwargs) | 106 if self.__broken_sqlite: # see bug #1810 |
| 107 self.engine.execute("select * from sqlite_master") |
| 108 rv = callable(self.engine, *args, **kwargs) |
| 109 assert not isinstance(rv, sa.engine.ResultProxy), \ |
| 110 "do not return ResultProxy objects!" |
| 111 return rv |
| 112 return threads.deferToThreadPool(reactor, self, thd) |
153 | 113 |
154 # older implementations for twisted < 0.8.2, which does not have | 114 # older implementations for twisted < 0.8.2, which does not have |
155 # deferToThreadPool; this basically re-implements it, although it gets some | 115 # deferToThreadPool; this basically re-implements it, although it gets some |
156 # of the synchronization wrong - the thread may still be "in use" when the | 116 # of the synchronization wrong - the thread may still be "in use" when the |
157 # deferred fires in the parent, which can lead to database accesses hopping | 117 # deferred fires in the parent, which can lead to database accesses hopping |
158 # between threads. In practice, this should not cause any difficulty. | 118 # between threads. In practice, this should not cause any difficulty. |
| 119 def do_081(self, callable, *args, **kwargs): # pragma: no cover |
| 120 d = defer.Deferred() |
| 121 def thd(): |
| 122 try: |
| 123 conn = self.engine.contextual_connect() |
| 124 if self.__broken_sqlite: # see bug #1810 |
| 125 conn.execute("select * from sqlite_master") |
| 126 try: |
| 127 rv = callable(conn, *args, **kwargs) |
| 128 assert not isinstance(rv, sa.engine.ResultProxy), \ |
| 129 "do not return ResultProxy objects!" |
| 130 finally: |
| 131 conn.close() |
| 132 reactor.callFromThread(d.callback, rv) |
| 133 except: |
| 134 reactor.callFromThread(d.errback, failure.Failure()) |
| 135 self.callInThread(thd) |
| 136 return d |
| 137 |
| 138 def do_with_engine_081(self, callable, *args, **kwargs): # pragma: no cover |
| 139 d = defer.Deferred() |
| 140 def thd(): |
| 141 try: |
| 142 conn = self.engine |
| 143 if self.__broken_sqlite: # see bug #1810 |
| 144 conn.execute("select * from sqlite_master") |
| 145 rv = callable(conn, *args, **kwargs) |
| 146 assert not isinstance(rv, sa.engine.ResultProxy), \ |
| 147 "do not return ResultProxy objects!" |
| 148 reactor.callFromThread(d.callback, rv) |
| 149 except: |
| 150 reactor.callFromThread(d.errback, failure.Failure()) |
| 151 self.callInThread(thd) |
| 152 return d |
159 if twisted.version < versions.Version('twisted', 8, 2, 0): | 153 if twisted.version < versions.Version('twisted', 8, 2, 0): |
160 def __081_wrap(self, with_engine, callable, args, kwargs): # pragma: no
cover | |
161 d = defer.Deferred() | |
162 def thd(): | |
163 try: | |
164 reactor.callFromThread(d.callback, | |
165 self.__thd(with_engine, callable, args, kwargs)) | |
166 except: | |
167 reactor.callFromThread(d.errback, | |
168 failure.Failure()) | |
169 self.callInThread(thd) | |
170 return d | |
171 | |
172 def do_081(self, callable, *args, **kwargs): # pragma: no cover | |
173 return self.__081_wrap(False, callable, args, kwargs) | |
174 | |
175 def do_with_engine_081(self, callable, *args, **kwargs): # pragma: no co
ver | |
176 return self.__081_wrap(True, callable, args, kwargs) | |
177 | |
178 do = do_081 | 154 do = do_081 |
179 do_with_engine = do_with_engine_081 | 155 do_with_engine = do_with_engine_081 |
180 | 156 |
181 def detect_bug1810(self): | 157 def detect_bug1810(self): |
182 # detect buggy SQLite implementations; call only for a known-sqlite | 158 # detect buggy SQLite implementations; call only for a known-sqlite |
183 # dialect | 159 # dialect |
184 try: | 160 try: |
185 import pysqlite2.dbapi2 as sqlite | 161 import pysqlite2.dbapi2 as sqlite |
186 sqlite = sqlite | 162 sqlite = sqlite |
187 except ImportError: | 163 except ImportError: |
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
227 except: | 203 except: |
228 return (0,) | 204 return (0,) |
229 | 205 |
230 if vers_row: | 206 if vers_row: |
231 try: | 207 try: |
232 return tuple(map(int, vers_row[0].split('.'))) | 208 return tuple(map(int, vers_row[0].split('.'))) |
233 except (TypeError, ValueError): | 209 except (TypeError, ValueError): |
234 return (0,) | 210 return (0,) |
235 else: | 211 else: |
236 return (0,) | 212 return (0,) |
OLD | NEW |