OLD | NEW |
1 # Copyright 2013 The Chromium Authors. All rights reserved. | 1 # Copyright 2013 The Chromium Authors. All rights reserved. |
2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
4 | 4 |
5 import logging | 5 import time |
6 import posixpath | |
7 import traceback | 6 import traceback |
8 | 7 |
9 from app_yaml_helper import AppYamlHelper | 8 from app_yaml_helper import AppYamlHelper |
10 from appengine_wrappers import IsDeadlineExceededError, logservice | 9 from appengine_wrappers import IsDeadlineExceededError, logservice, taskqueue |
11 from branch_utility import BranchUtility | 10 from branch_utility import BranchUtility |
12 from compiled_file_system import CompiledFileSystem | 11 from compiled_file_system import CompiledFileSystem |
| 12 from custom_logger import CustomLogger |
13 from data_source_registry import CreateDataSources | 13 from data_source_registry import CreateDataSources |
14 from environment import GetAppVersion, IsDevServer | 14 from environment import GetAppVersion |
15 from extensions_paths import EXAMPLES, PUBLIC_TEMPLATES, STATIC_DOCS | |
16 from file_system_util import CreateURLsFromPaths | |
17 from future import Future | |
18 from gcs_file_system_provider import CloudStorageFileSystemProvider | 15 from gcs_file_system_provider import CloudStorageFileSystemProvider |
19 from github_file_system_provider import GithubFileSystemProvider | 16 from github_file_system_provider import GithubFileSystemProvider |
20 from host_file_system_provider import HostFileSystemProvider | 17 from host_file_system_provider import HostFileSystemProvider |
21 from object_store_creator import ObjectStoreCreator | 18 from object_store_creator import ObjectStoreCreator |
22 from render_servlet import RenderServlet | 19 from render_refresher import RenderRefresher |
23 from server_instance import ServerInstance | 20 from server_instance import ServerInstance |
24 from servlet import Servlet, Request, Response | 21 from servlet import Servlet, Request, Response |
25 from special_paths import SITE_VERIFICATION_FILE | 22 from timer import Timer |
26 from timer import Timer, TimerClosure | |
27 | 23 |
28 | 24 |
29 class _SingletonRenderServletDelegate(RenderServlet.Delegate): | 25 _log = CustomLogger('cron') |
30 def __init__(self, server_instance): | |
31 self._server_instance = server_instance | |
32 | 26 |
33 def CreateServerInstance(self): | |
34 return self._server_instance | |
35 | |
36 class _CronLogger(object): | |
37 '''Wraps the logging.* methods to prefix them with 'cron' and flush | |
38 immediately. The flushing is important because often these cron runs time | |
39 out and we lose the logs. | |
40 ''' | |
41 def info(self, msg, *args): self._log(logging.info, msg, args) | |
42 def warning(self, msg, *args): self._log(logging.warning, msg, args) | |
43 def error(self, msg, *args): self._log(logging.error, msg, args) | |
44 | |
45 def _log(self, logfn, msg, args): | |
46 try: | |
47 logfn('cron: %s' % msg, *args) | |
48 finally: | |
49 logservice.flush() | |
50 | |
51 _cronlog = _CronLogger() | |
52 | |
53 def _RequestEachItem(title, items, request_callback): | |
54 '''Runs a task |request_callback| named |title| for each item in |items|. | |
55 |request_callback| must take an item and return a servlet response. | |
56 Returns true if every item was successfully run, false if any return a | |
57 non-200 response or raise an exception. | |
58 ''' | |
59 _cronlog.info('%s: starting', title) | |
60 success_count, failure_count = 0, 0 | |
61 timer = Timer() | |
62 try: | |
63 for i, item in enumerate(items): | |
64 def error_message(detail): | |
65 return '%s: error rendering %s (%s of %s): %s' % ( | |
66 title, item, i + 1, len(items), detail) | |
67 try: | |
68 response = request_callback(item) | |
69 if response.status == 200: | |
70 success_count += 1 | |
71 else: | |
72 _cronlog.error(error_message('response status %s' % response.status)) | |
73 failure_count += 1 | |
74 except Exception as e: | |
75 _cronlog.error(error_message(traceback.format_exc())) | |
76 failure_count += 1 | |
77 if IsDeadlineExceededError(e): raise | |
78 finally: | |
79 _cronlog.info('%s: rendered %s of %s with %s failures in %s', | |
80 title, success_count, len(items), failure_count, | |
81 timer.Stop().FormatElapsed()) | |
82 return success_count == len(items) | |
83 | 27 |
84 class CronServlet(Servlet): | 28 class CronServlet(Servlet): |
85 '''Servlet which runs a cron job. | 29 '''Servlet which runs a cron job. |
86 ''' | 30 ''' |
87 def __init__(self, request, delegate_for_test=None): | 31 def __init__(self, request, delegate_for_test=None): |
88 Servlet.__init__(self, request) | 32 Servlet.__init__(self, request) |
89 self._delegate = delegate_for_test or CronServlet.Delegate() | 33 self._delegate = delegate_for_test or CronServlet.Delegate() |
90 | 34 |
91 class Delegate(object): | 35 class Delegate(object): |
92 '''CronServlet's runtime dependencies. Override for testing. | 36 '''CronServlet's runtime dependencies. Override for testing. |
(...skipping 10 matching lines...) |
103 def CreateGithubFileSystemProvider(self, object_store_creator): | 47 def CreateGithubFileSystemProvider(self, object_store_creator): |
104 return GithubFileSystemProvider(object_store_creator) | 48 return GithubFileSystemProvider(object_store_creator) |
105 | 49 |
106 def CreateGCSFileSystemProvider(self, object_store_creator): | 50 def CreateGCSFileSystemProvider(self, object_store_creator): |
107 return CloudStorageFileSystemProvider(object_store_creator) | 51 return CloudStorageFileSystemProvider(object_store_creator) |
108 | 52 |
109 def GetAppVersion(self): | 53 def GetAppVersion(self): |
110 return GetAppVersion() | 54 return GetAppVersion() |
111 | 55 |
112 def Get(self): | 56 def Get(self): |
113 # Crons often time out, and if they do we need to make sure to flush the | 57 # Refreshes may time out, and if they do we need to make sure to flush the |
114 # logs before the process gets killed (Python gives us a couple of | 58 # logs before the process gets killed (Python gives us a couple of |
115 # seconds). | 59 # seconds). |
116 # | 60 # |
117 # So, manually flush logs at the end of the cron run. However, sometimes | 61 # So, manually flush logs at the end of the cron run. However, sometimes |
118 # even that isn't enough, which is why in this file we use _cronlog and | 62 # even that isn't enough, which is why in this file we use _log and |
119 # make it flush the log every time it's used. | 63 # make it flush the log every time it's used. |
120 logservice.AUTOFLUSH_ENABLED = False | 64 logservice.AUTOFLUSH_ENABLED = False |
121 try: | 65 try: |
122 return self._GetImpl() | 66 return self._GetImpl() |
123 except BaseException: | 67 except BaseException: |
124 _cronlog.error('Caught top-level exception! %s', traceback.format_exc()) | 68 _log.error('Caught top-level exception! %s', traceback.format_exc()) |
125 finally: | 69 finally: |
126 logservice.flush() | 70 logservice.flush() |
127 | 71 |
128 def _GetImpl(self): | 72 def _GetImpl(self): |
129 # Cron strategy: | 73 # Cron strategy: |
130 # | 74 # |
131 # Find all public template files and static files, and render them. Most of | 75 # Collect all DataSources, the PlatformBundle, the ContentProviders, and |
132 # the time these won't have changed since the last cron run, so it's a | 76 # any other statically rendered content (e.g. examples content), |
133 # little wasteful, but hopefully rendering is really fast (if it isn't we | 77 # and spin up taskqueue tasks which will refresh any cached data relevant |
134 # have a problem). | 78 # to these assets. |
135 _cronlog.info('starting') | 79 # |
| 80 # TODO(rockot/kalman): At the moment examples are not actually refreshed |
| 81 # because they're too slow. |
136 | 82 |
137 # This is returned every time RenderServlet wants to create a new | 83 _log.info('starting') |
138 # ServerInstance. | 84 |
139 # | |
140 # TODO(kalman): IMPORTANT. This sometimes throws an exception, breaking | |
141 # everything. Need retry logic at the fetcher level. | |
142 server_instance = self._GetSafeServerInstance() | 85 server_instance = self._GetSafeServerInstance() |
143 master_fs = server_instance.host_file_system_provider.GetMaster() | 86 master_fs = server_instance.host_file_system_provider.GetMaster() |
| 87 master_commit = master_fs.GetCommitID().Get() |
144 | 88 |
145 def render(path): | 89 # This is the guy that would be responsible for refreshing the cache of |
146 request = Request(path, self._request.host, self._request.headers) | 90 # examples. Here for posterity, hopefully it will be added to the targets |
147 delegate = _SingletonRenderServletDelegate(server_instance) | 91 # below someday. |
148 return RenderServlet(request, delegate).Get() | 92 render_refresher = RenderRefresher(server_instance, self._request) |
149 | 93 |
150 def request_files_in_dir(path, prefix='', strip_ext=None): | 94 # Get the default taskqueue |
151 '''Requests every file found under |path| in this host file system, with | 95 queue = taskqueue.Queue() |
152 a request prefix of |prefix|. |strip_ext| is an optional list of file | |
153 extensions that should be stripped from paths before requesting. | |
154 ''' | |
155 def maybe_strip_ext(name): | |
156 if name == SITE_VERIFICATION_FILE or not strip_ext: | |
157 return name | |
158 base, ext = posixpath.splitext(name) | |
159 return base if ext in strip_ext else name | |
160 files = [maybe_strip_ext(name) | |
161 for name, _ in CreateURLsFromPaths(master_fs, path, prefix)] | |
162 return _RequestEachItem(path, files, render) | |
163 | 96 |
164 results = [] | 97 # GAE documentation specifies that it's bad to add tasks to a queue |
| 98 # within one second of purging. We wait 2 seconds, because we like |
| 99 # to go the extra mile. |
| 100 queue.purge() |
| 101 time.sleep(2) |
165 | 102 |
| 103 success = True |
166 try: | 104 try: |
167 # Start running the hand-written Cron methods first; they can be run in | 105 data_sources = CreateDataSources(server_instance) |
168 # parallel. They are resolved at the end. | 106 targets = (data_sources.items() + |
169 def run_cron_for_future(target): | 107 [('content_providers', server_instance.content_providers), |
170 title = target.__class__.__name__ | 108 ('platform_bundle', server_instance.platform_bundle)]) |
171 future, init_timer = TimerClosure(target.Cron) | 109 title = 'initializing %s parallel targets' % len(targets) |
172 assert isinstance(future, Future), ( | 110 _log.info(title) |
173 '%s.Cron() did not return a Future' % title) | |
174 def resolve(): | |
175 resolve_timer = Timer() | |
176 try: | |
177 future.Get() | |
178 except Exception as e: | |
179 _cronlog.error('%s: error %s' % (title, traceback.format_exc())) | |
180 results.append(False) | |
181 if IsDeadlineExceededError(e): raise | |
182 finally: | |
183 resolve_timer.Stop() | |
184 _cronlog.info('%s took %s: %s to initialize and %s to resolve' % | |
185 (title, | |
186 init_timer.With(resolve_timer).FormatElapsed(), | |
187 init_timer.FormatElapsed(), | |
188 resolve_timer.FormatElapsed())) | |
189 return Future(callback=resolve) | |
190 | |
191 targets = (CreateDataSources(server_instance).values() + | |
192 [server_instance.content_providers, | |
193 server_instance.platform_bundle]) | |
194 title = 'initializing %s parallel Cron targets' % len(targets) | |
195 _cronlog.info(title) | |
196 timer = Timer() | 111 timer = Timer() |
197 try: | 112 for name, target in targets: |
198 cron_futures = [run_cron_for_future(target) for target in targets] | 113 refresh_paths = target.GetRefreshPaths() |
199 finally: | 114 for path in refresh_paths: |
200 _cronlog.info('%s took %s' % (title, timer.Stop().FormatElapsed())) | 115 queue.add(taskqueue.Task(url='/_refresh/%s/%s' % (name, path), |
201 | 116 params={'commit': master_commit})) |
202 # Samples are too expensive to run on the dev server, where there is no | 117 _log.info('%s took %s' % (title, timer.Stop().FormatElapsed())) |
203 # parallel fetch. | |
204 # | |
205 # XXX(kalman): Currently samples are *always* too expensive to fetch, so | |
206 # disabling them for now. It won't break anything so long as we're still | |
207 # not enforcing that everything gets cached for normal instances. | |
208 if False: # should be "not IsDevServer()": | |
209 # Fetch each individual sample file. | |
210 results.append(request_files_in_dir(EXAMPLES, | |
211 prefix='extensions/examples')) | |
212 | |
213 # Resolve the hand-written Cron method futures. | |
214 title = 'resolving %s parallel Cron targets' % len(targets) | |
215 _cronlog.info(title) | |
216 timer = Timer() | |
217 try: | |
218 for future in cron_futures: | |
219 future.Get() | |
220 finally: | |
221 _cronlog.info('%s took %s' % (title, timer.Stop().FormatElapsed())) | |
222 | |
223 except: | 118 except: |
224 results.append(False) | |
225 # This should never actually happen (each cron step does its own | 119 # This should never actually happen (each cron step does its own |
226 # conservative error checking), so re-raise no matter what it is. | 120 # conservative error checking), so re-raise no matter what it is. |
227 _cronlog.error('uncaught error: %s' % traceback.format_exc()) | 121 _log.error('uncaught error: %s' % traceback.format_exc()) |
| 122 success = False |
228 raise | 123 raise |
229 finally: | 124 finally: |
230 success = all(results) | 125 _log.info('finished (%s)', 'success' if success else 'FAILED') |
231 _cronlog.info('finished (%s)', 'success' if success else 'FAILED') | |
232 return (Response.Ok('Success') if success else | 126 return (Response.Ok('Success') if success else |
233 Response.InternalError('Failure')) | 127 Response.InternalError('Failure')) |
234 | 128 |
235 def _GetSafeServerInstance(self): | 129 def _GetSafeServerInstance(self): |
236 '''Returns a ServerInstance with a host file system at a safe commit, | 130 '''Returns a ServerInstance with a host file system at a safe commit, |
237 meaning the last commit at which the currently running version of the | 131 meaning the last commit at which the currently running version of the |
238 server existed. | 132 server existed. |
239 ''' | 133 ''' |
240 delegate = self._delegate | 134 delegate = self._delegate |
241 | 135 |
(...skipping 10 matching lines...) |
252 server_instance_near_head.host_file_system_provider) | 146 server_instance_near_head.host_file_system_provider) |
253 | 147 |
254 if app_yaml_handler.IsUpToDate(delegate.GetAppVersion()): | 148 if app_yaml_handler.IsUpToDate(delegate.GetAppVersion()): |
255 return server_instance_near_head | 149 return server_instance_near_head |
256 | 150 |
257 # The version in app.yaml is greater than the currently running app's. | 151 # The version in app.yaml is greater than the currently running app's. |
258 # The safe version is the one before it changed. | 152 # The safe version is the one before it changed. |
259 safe_revision = app_yaml_handler.GetFirstRevisionGreaterThan( | 153 safe_revision = app_yaml_handler.GetFirstRevisionGreaterThan( |
260 delegate.GetAppVersion()) - 1 | 154 delegate.GetAppVersion()) - 1 |
261 | 155 |
262 _cronlog.info('app version %s is out of date, safe is %s', | 156 _log.info('app version %s is out of date, safe is %s', |
263 delegate.GetAppVersion(), safe_revision) | 157 delegate.GetAppVersion(), safe_revision) |
264 | 158 |
265 return self._CreateServerInstance(safe_revision) | 159 return self._CreateServerInstance(safe_revision) |
266 | 160 |
267 def _GetMostRecentCommit(self): | 161 def _GetMostRecentCommit(self): |
268 '''Gets the commit of the most recent patch submitted to the host file | 162 '''Gets the commit of the most recent patch submitted to the host file |
269 system. This is similar to HEAD but it's a concrete commit so won't | 163 system. This is similar to HEAD but it's a concrete commit so won't |
270 change as the cron runs. | 164 change as the cron runs. |
271 ''' | 165 ''' |
272 head_fs = ( | 166 head_fs = ( |
(...skipping 12 matching lines...) |
285 github_file_system_provider = self._delegate.CreateGithubFileSystemProvider( | 179 github_file_system_provider = self._delegate.CreateGithubFileSystemProvider( |
286 object_store_creator) | 180 object_store_creator) |
287 gcs_file_system_provider = self._delegate.CreateGCSFileSystemProvider( | 181 gcs_file_system_provider = self._delegate.CreateGCSFileSystemProvider( |
288 object_store_creator) | 182 object_store_creator) |
289 return ServerInstance(object_store_creator, | 183 return ServerInstance(object_store_creator, |
290 CompiledFileSystem.Factory(object_store_creator), | 184 CompiledFileSystem.Factory(object_store_creator), |
291 branch_utility, | 185 branch_utility, |
292 host_file_system_provider, | 186 host_file_system_provider, |
293 github_file_system_provider, | 187 github_file_system_provider, |
294 gcs_file_system_provider) | 188 gcs_file_system_provider) |
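For reference, a minimal standalone sketch of the refresh-task pattern the new _GetImpl() adopts: purge the default queue, wait out the post-purge window, then enqueue one task per refresh path. It is written against the stock google.appengine.api.taskqueue API rather than the appengine_wrappers shim used in the change; the (name, target) pairs, GetRefreshPaths(), and the '/_refresh/<name>/<path>' URL scheme come from the diff itself, while the helper name enqueue_refresh_tasks is purely illustrative.

import time

from google.appengine.api import taskqueue


def enqueue_refresh_tasks(targets, commit):
  '''Enqueues one refresh task per refresh path of each (name, target) pair.

  Assumes each |target| exposes GetRefreshPaths(), as in the change above.
  '''
  queue = taskqueue.Queue()  # the default push queue
  # GAE advises against adding tasks within a second of purging a queue;
  # the change above waits 2 seconds to stay clear of that window.
  queue.purge()
  time.sleep(2)
  for name, target in targets:
    for path in target.GetRefreshPaths():
      queue.add(taskqueue.Task(url='/_refresh/%s/%s' % (name, path),
                               params={'commit': commit}))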