OLD | NEW |
1 # This file is part of Buildbot. Buildbot is free software: you can | 1 # This file is part of Buildbot. Buildbot is free software: you can |
2 # redistribute it and/or modify it under the terms of the GNU General Public | 2 # redistribute it and/or modify it under the terms of the GNU General Public |
3 # License as published by the Free Software Foundation, version 2. | 3 # License as published by the Free Software Foundation, version 2. |
4 # | 4 # |
5 # This program is distributed in the hope that it will be useful, but WITHOUT | 5 # This program is distributed in the hope that it will be useful, but WITHOUT |
6 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS | 6 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
7 # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more | 7 # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more |
8 # details. | 8 # details. |
9 # | 9 # |
10 # You should have received a copy of the GNU General Public License along with | 10 # You should have received a copy of the GNU General Public License along with |
11 # this program; if not, write to the Free Software Foundation, Inc., 51 | 11 # this program; if not, write to the Free Software Foundation, Inc., 51 |
12 # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | 12 # Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. |
13 # | 13 # |
14 # Copyright Buildbot Team Members | 14 # Copyright Buildbot Team Members |
15 | 15 |
16 import os | 16 import os |
17 import sqlalchemy as sa | 17 import sqlalchemy as sa |
18 import twisted | 18 import twisted |
19 from twisted.internet import reactor, threads, defer, task | 19 from twisted.internet import reactor, threads, defer |
20 from twisted.python import threadpool, failure, versions, log | 20 from twisted.python import threadpool, failure, versions, log |
21 | 21 |
22 # Hack for bug #1992. In as-yet-unknown circumstances, select() fails to | |
23 # notice that a selfpipe has been written to, thus causing callFromThread, as | |
24 # used in deferToThreadPool, to hang indefinitely. The workaround is to wake | |
25 # up the select loop every second by ensuring that there is an event occuring | |
26 # every second, with this busy loop: | |
27 def bug1992hack(f): | |
28 def w(*args, **kwargs): | |
29 busyloop = task.LoopingCall(lambda : None) | |
30 busyloop.start(1) | |
31 d = f(*args, **kwargs) | |
32 def stop_loop(r): | |
33 busyloop.stop() | |
34 return r | |
35 d.addBoth(stop_loop) | |
36 return d | |
37 w.__name__ = f.__name__ | |
38 w.__doc__ = f.__doc__ | |
39 return w | |
40 | |
41 | 22 |
42 class DBThreadPool(threadpool.ThreadPool): | 23 class DBThreadPool(threadpool.ThreadPool): |
43 """ | 24 """ |
44 A pool of threads ready and waiting to execute queries. | 25 A pool of threads ready and waiting to execute queries. |
45 | 26 |
46 If the engine has an C{optimal_thread_pool_size} attribute, then the | 27 If the engine has an C{optimal_thread_pool_size} attribute, then the |
47 maxthreads of the thread pool will be set to that value. This is most | 28 maxthreads of the thread pool will be set to that value. This is most |
48 useful for SQLite in-memory connections, where exactly one connection | 29 useful for SQLite in-memory connections, where exactly one connection |
49 (and thus thread) should be used. | 30 (and thus thread) should be used. |
50 """ | 31 """ |
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
89 | 70 |
90 def shutdown(self): | 71 def shutdown(self): |
91 """Manually stop the pool. This is only necessary from tests, as the | 72 """Manually stop the pool. This is only necessary from tests, as the |
92 pool will stop itself when the reactor stops under normal | 73 pool will stop itself when the reactor stops under normal |
93 circumstances.""" | 74 circumstances.""" |
94 if not self._stop_evt: | 75 if not self._stop_evt: |
95 return # pool is already stopped | 76 return # pool is already stopped |
96 reactor.removeSystemEventTrigger(self._stop_evt) | 77 reactor.removeSystemEventTrigger(self._stop_evt) |
97 self._stop() | 78 self._stop() |
98 | 79 |
99 @bug1992hack | |
100 def do(self, callable, *args, **kwargs): | 80 def do(self, callable, *args, **kwargs): |
101 """ | 81 """ |
102 Call C{callable} in a thread, with a Connection as first argument. | 82 Call C{callable} in a thread, with a Connection as first argument. |
103 Returns a deferred that will indicate the results of the callable. | 83 Returns a deferred that will indicate the results of the callable. |
104 | 84 |
105 Note: do not return any SQLAlchemy objects via this deferred! | 85 Note: do not return any SQLAlchemy objects via this deferred! |
106 """ | 86 """ |
107 def thd(): | 87 def thd(): |
108 conn = self.engine.contextual_connect() | 88 conn = self.engine.contextual_connect() |
109 if self.__broken_sqlite: # see bug #1810 | 89 if self.__broken_sqlite: # see bug #1810 |
110 conn.execute("select * from sqlite_master") | 90 conn.execute("select * from sqlite_master") |
111 try: | 91 try: |
112 rv = callable(conn, *args, **kwargs) | 92 rv = callable(conn, *args, **kwargs) |
113 assert not isinstance(rv, sa.engine.ResultProxy), \ | 93 assert not isinstance(rv, sa.engine.ResultProxy), \ |
114 "do not return ResultProxy objects!" | 94 "do not return ResultProxy objects!" |
115 finally: | 95 finally: |
116 conn.close() | 96 conn.close() |
117 return rv | 97 return rv |
118 return threads.deferToThreadPool(reactor, self, thd) | 98 return threads.deferToThreadPool(reactor, self, thd) |
119 | 99 |
120 @bug1992hack | |
121 def do_with_engine(self, callable, *args, **kwargs): | 100 def do_with_engine(self, callable, *args, **kwargs): |
122 """ | 101 """ |
123 Like L{do}, but with an SQLAlchemy Engine as the first argument. This | 102 Like L{do}, but with an SQLAlchemy Engine as the first argument. This |
124 is only used for schema manipulation, and is not used at master | 103 is only used for schema manipulation, and is not used at master |
125 runtime. | 104 runtime. |
126 """ | 105 """ |
127 def thd(): | 106 def thd(): |
128 if self.__broken_sqlite: # see bug #1810 | 107 if self.__broken_sqlite: # see bug #1810 |
129 self.engine.execute("select * from sqlite_master") | 108 self.engine.execute("select * from sqlite_master") |
130 rv = callable(self.engine, *args, **kwargs) | 109 rv = callable(self.engine, *args, **kwargs) |
131 assert not isinstance(rv, sa.engine.ResultProxy), \ | 110 assert not isinstance(rv, sa.engine.ResultProxy), \ |
132 "do not return ResultProxy objects!" | 111 "do not return ResultProxy objects!" |
133 return rv | 112 return rv |
134 return threads.deferToThreadPool(reactor, self, thd) | 113 return threads.deferToThreadPool(reactor, self, thd) |
135 | 114 |
136 # older implementations for twisted < 0.8.2, which does not have | 115 # older implementations for twisted < 0.8.2, which does not have |
137 # deferToThreadPool; this basically re-implements it, although it gets some | 116 # deferToThreadPool; this basically re-implements it, although it gets some |
138 # of the synchronization wrong - the thread may still be "in use" when the | 117 # of the synchronization wrong - the thread may still be "in use" when the |
139 # deferred fires in the parent, which can lead to database accesses hopping | 118 # deferred fires in the parent, which can lead to database accesses hopping |
140 # between threads. In practice, this should not cause any difficulty. | 119 # between threads. In practice, this should not cause any difficulty. |
141 @bug1992hack | |
142 def do_081(self, callable, *args, **kwargs): # pragma: no cover | 120 def do_081(self, callable, *args, **kwargs): # pragma: no cover |
143 d = defer.Deferred() | 121 d = defer.Deferred() |
144 def thd(): | 122 def thd(): |
145 try: | 123 try: |
146 conn = self.engine.contextual_connect() | 124 conn = self.engine.contextual_connect() |
147 if self.__broken_sqlite: # see bug #1810 | 125 if self.__broken_sqlite: # see bug #1810 |
148 conn.execute("select * from sqlite_master") | 126 conn.execute("select * from sqlite_master") |
149 try: | 127 try: |
150 rv = callable(conn, *args, **kwargs) | 128 rv = callable(conn, *args, **kwargs) |
151 assert not isinstance(rv, sa.engine.ResultProxy), \ | 129 assert not isinstance(rv, sa.engine.ResultProxy), \ |
152 "do not return ResultProxy objects!" | 130 "do not return ResultProxy objects!" |
153 finally: | 131 finally: |
154 conn.close() | 132 conn.close() |
155 reactor.callFromThread(d.callback, rv) | 133 reactor.callFromThread(d.callback, rv) |
156 except: | 134 except: |
157 reactor.callFromThread(d.errback, failure.Failure()) | 135 reactor.callFromThread(d.errback, failure.Failure()) |
158 self.callInThread(thd) | 136 self.callInThread(thd) |
159 return d | 137 return d |
160 | 138 |
161 @bug1992hack | |
162 def do_with_engine_081(self, callable, *args, **kwargs): # pragma: no cover | 139 def do_with_engine_081(self, callable, *args, **kwargs): # pragma: no cover |
163 d = defer.Deferred() | 140 d = defer.Deferred() |
164 def thd(): | 141 def thd(): |
165 try: | 142 try: |
166 conn = self.engine | 143 conn = self.engine |
167 if self.__broken_sqlite: # see bug #1810 | 144 if self.__broken_sqlite: # see bug #1810 |
168 conn.execute("select * from sqlite_master") | 145 conn.execute("select * from sqlite_master") |
169 rv = callable(conn, *args, **kwargs) | 146 rv = callable(conn, *args, **kwargs) |
170 assert not isinstance(rv, sa.engine.ResultProxy), \ | 147 assert not isinstance(rv, sa.engine.ResultProxy), \ |
171 "do not return ResultProxy objects!" | 148 "do not return ResultProxy objects!" |
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
208 | 185 |
209 try: | 186 try: |
210 test() | 187 test() |
211 except sqlite.OperationalError: | 188 except sqlite.OperationalError: |
212 # this is the expected error indicating it's broken | 189 # this is the expected error indicating it's broken |
213 return True | 190 return True |
214 | 191 |
215 # but this version should not fail.. | 192 # but this version should not fail.. |
216 test(select_from_sqlite_master=True) | 193 test(select_from_sqlite_master=True) |
217 return False # not broken - no workaround required | 194 return False # not broken - no workaround required |
OLD | NEW |