| OLD | NEW |
| 1 # Copyright 2013 The Chromium Authors. All rights reserved. | 1 # Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style license that can be | 2 # Use of this source code is governed by a BSD-style license that can be |
| 3 # found in the LICENSE file. | 3 # found in the LICENSE file. |
| 4 | 4 |
| 5 import logging | 5 import time |
| 6 import posixpath | |
| 7 import traceback | 6 import traceback |
| 8 | 7 |
| 9 from app_yaml_helper import AppYamlHelper | 8 from app_yaml_helper import AppYamlHelper |
| 10 from appengine_wrappers import IsDeadlineExceededError, logservice | 9 from appengine_wrappers import IsDeadlineExceededError, logservice, taskqueue |
| 11 from branch_utility import BranchUtility | 10 from branch_utility import BranchUtility |
| 12 from compiled_file_system import CompiledFileSystem | 11 from compiled_file_system import CompiledFileSystem |
| 12 from custom_logger import CustomLogger |
| 13 from data_source_registry import CreateDataSources | 13 from data_source_registry import CreateDataSources |
| 14 from environment import GetAppVersion, IsDevServer | 14 from environment import GetAppVersion |
| 15 from extensions_paths import EXAMPLES, PUBLIC_TEMPLATES, STATIC_DOCS | |
| 16 from file_system_util import CreateURLsFromPaths | |
| 17 from future import Future | |
| 18 from gcs_file_system_provider import CloudStorageFileSystemProvider | 15 from gcs_file_system_provider import CloudStorageFileSystemProvider |
| 19 from github_file_system_provider import GithubFileSystemProvider | 16 from github_file_system_provider import GithubFileSystemProvider |
| 20 from host_file_system_provider import HostFileSystemProvider | 17 from host_file_system_provider import HostFileSystemProvider |
| 21 from object_store_creator import ObjectStoreCreator | 18 from object_store_creator import ObjectStoreCreator |
| 22 from render_servlet import RenderServlet | 19 from render_refresher import RenderRefresher |
| 23 from server_instance import ServerInstance | 20 from server_instance import ServerInstance |
| 24 from servlet import Servlet, Request, Response | 21 from servlet import Servlet, Request, Response |
| 25 from special_paths import SITE_VERIFICATION_FILE | 22 from timer import Timer |
| 26 from timer import Timer, TimerClosure | |
| 27 | 23 |
| 28 | 24 |
| 29 class _SingletonRenderServletDelegate(RenderServlet.Delegate): | 25 _log = CustomLogger('cron') |
| 30 def __init__(self, server_instance): | |
| 31 self._server_instance = server_instance | |
| 32 | 26 |
| 33 def CreateServerInstance(self): | |
| 34 return self._server_instance | |
| 35 | |
| 36 class _CronLogger(object): | |
| 37 '''Wraps the logging.* methods to prefix them with 'cron' and flush | |
| 38 immediately. The flushing is important because often these cron runs time | |
| 39 out and we lose the logs. | |
| 40 ''' | |
| 41 def info(self, msg, *args): self._log(logging.info, msg, args) | |
| 42 def warning(self, msg, *args): self._log(logging.warning, msg, args) | |
| 43 def error(self, msg, *args): self._log(logging.error, msg, args) | |
| 44 | |
| 45 def _log(self, logfn, msg, args): | |
| 46 try: | |
| 47 logfn('cron: %s' % msg, *args) | |
| 48 finally: | |
| 49 logservice.flush() | |
| 50 | |
| 51 _cronlog = _CronLogger() | |
| 52 | |
| 53 def _RequestEachItem(title, items, request_callback): | |
| 54 '''Runs a task |request_callback| named |title| for each item in |items|. | |
| 55 |request_callback| must take an item and return a servlet response. | |
| 56 Returns true if every item was successfully run, false if any return a | |
| 57 non-200 response or raise an exception. | |
| 58 ''' | |
| 59 _cronlog.info('%s: starting', title) | |
| 60 success_count, failure_count = 0, 0 | |
| 61 timer = Timer() | |
| 62 try: | |
| 63 for i, item in enumerate(items): | |
| 64 def error_message(detail): | |
| 65 return '%s: error rendering %s (%s of %s): %s' % ( | |
| 66 title, item, i + 1, len(items), detail) | |
| 67 try: | |
| 68 response = request_callback(item) | |
| 69 if response.status == 200: | |
| 70 success_count += 1 | |
| 71 else: | |
| 72 _cronlog.error(error_message('response status %s' % response.status)) | |
| 73 failure_count += 1 | |
| 74 except Exception as e: | |
| 75 _cronlog.error(error_message(traceback.format_exc())) | |
| 76 failure_count += 1 | |
| 77 if IsDeadlineExceededError(e): raise | |
| 78 finally: | |
| 79 _cronlog.info('%s: rendered %s of %s with %s failures in %s', | |
| 80 title, success_count, len(items), failure_count, | |
| 81 timer.Stop().FormatElapsed()) | |
| 82 return success_count == len(items) | |
| 83 | 27 |
| 84 class CronServlet(Servlet): | 28 class CronServlet(Servlet): |
| 85 '''Servlet which runs a cron job. | 29 '''Servlet which runs a cron job. |
| 86 ''' | 30 ''' |
| 87 def __init__(self, request, delegate_for_test=None): | 31 def __init__(self, request, delegate_for_test=None): |
| 88 Servlet.__init__(self, request) | 32 Servlet.__init__(self, request) |
| 89 self._delegate = delegate_for_test or CronServlet.Delegate() | 33 self._delegate = delegate_for_test or CronServlet.Delegate() |
| 90 | 34 |
| 91 class Delegate(object): | 35 class Delegate(object): |
| 92 '''CronServlet's runtime dependencies. Override for testing. | 36 '''CronServlet's runtime dependencies. Override for testing. |
| (...skipping 10 matching lines...) Expand all Loading... |
| 103 def CreateGithubFileSystemProvider(self, object_store_creator): | 47 def CreateGithubFileSystemProvider(self, object_store_creator): |
| 104 return GithubFileSystemProvider(object_store_creator) | 48 return GithubFileSystemProvider(object_store_creator) |
| 105 | 49 |
| 106 def CreateGCSFileSystemProvider(self, object_store_creator): | 50 def CreateGCSFileSystemProvider(self, object_store_creator): |
| 107 return CloudStorageFileSystemProvider(object_store_creator) | 51 return CloudStorageFileSystemProvider(object_store_creator) |
| 108 | 52 |
| 109 def GetAppVersion(self): | 53 def GetAppVersion(self): |
| 110 return GetAppVersion() | 54 return GetAppVersion() |
| 111 | 55 |
| 112 def Get(self): | 56 def Get(self): |
| 113 # Crons often time out, and if they do we need to make sure to flush the | 57 # Refreshes may time out, and if they do we need to make sure to flush the |
| 114 # logs before the process gets killed (Python gives us a couple of | 58 # logs before the process gets killed (Python gives us a couple of |
| 115 # seconds). | 59 # seconds). |
| 116 # | 60 # |
| 117 # So, manually flush logs at the end of the cron run. However, sometimes | 61 # So, manually flush logs at the end of the cron run. However, sometimes |
| 118 # even that isn't enough, which is why in this file we use _cronlog and | 62 # even that isn't enough, which is why in this file we use _log and |
| 119 # make it flush the log every time it's used. | 63 # make it flush the log every time it's used. |
| 120 logservice.AUTOFLUSH_ENABLED = False | 64 logservice.AUTOFLUSH_ENABLED = False |
| 121 try: | 65 try: |
| 122 return self._GetImpl() | 66 return self._GetImpl() |
| 123 except BaseException: | 67 except BaseException: |
| 124 _cronlog.error('Caught top-level exception! %s', traceback.format_exc()) | 68 _log.error('Caught top-level exception! %s', traceback.format_exc()) |
| 125 finally: | 69 finally: |
| 126 logservice.flush() | 70 logservice.flush() |
| 127 | 71 |
| 128 def _GetImpl(self): | 72 def _GetImpl(self): |
| 129 # Cron strategy: | 73 # Cron strategy: |
| 130 # | 74 # |
| 131 # Find all public template files and static files, and render them. Most of | 75 # Collect all DataSources, the PlatformBundle, the ContentProviders, and |
| 132 # the time these won't have changed since the last cron run, so it's a | 76 # any other statically rendered contents (e.g. examples content), |
| 133 # little wasteful, but hopefully rendering is really fast (if it isn't we | 77 # and spin up taskqueue tasks which will refresh any cached data relevant |
| 134 # have a problem). | 78 # to these assets. |
| 135 _cronlog.info('starting') | 79 # |
| 80 # TODO(rockot/kalman): At the moment examples are not actually refreshed |
| 81 # because they're too slow. |
| 136 | 82 |
| 137 # This is returned every time RenderServlet wants to create a new | 83 _log.info('starting') |
| 138 # ServerInstance. | 84 |
| 139 # | |
| 140 # TODO(kalman): IMPORTANT. This sometimes throws an exception, breaking | |
| 141 # everything. Need retry logic at the fetcher level. | |
| 142 server_instance = self._GetSafeServerInstance() | 85 server_instance = self._GetSafeServerInstance() |
| 143 master_fs = server_instance.host_file_system_provider.GetMaster() | 86 master_fs = server_instance.host_file_system_provider.GetMaster() |
| 87 master_commit = master_fs.GetCommitID().Get() |
| 144 | 88 |
| 145 def render(path): | 89 # This is the guy that would be responsible for refreshing the cache of |
| 146 request = Request(path, self._request.host, self._request.headers) | 90 # examples. Here for posterity, hopefully it will be added to the targets |
| 147 delegate = _SingletonRenderServletDelegate(server_instance) | 91 # below someday. |
| 148 return RenderServlet(request, delegate).Get() | 92 render_refresher = RenderRefresher(server_instance, self._request) |
| 149 | 93 |
| 150 def request_files_in_dir(path, prefix='', strip_ext=None): | 94 # Get the default taskqueue |
| 151 '''Requests every file found under |path| in this host file system, with | 95 queue = taskqueue.Queue() |
| 152 a request prefix of |prefix|. |strip_ext| is an optional list of file | |
| 153 extensions that should be stripped from paths before requesting. | |
| 154 ''' | |
| 155 def maybe_strip_ext(name): | |
| 156 if name == SITE_VERIFICATION_FILE or not strip_ext: | |
| 157 return name | |
| 158 base, ext = posixpath.splitext(name) | |
| 159 return base if ext in strip_ext else name | |
| 160 files = [maybe_strip_ext(name) | |
| 161 for name, _ in CreateURLsFromPaths(master_fs, path, prefix)] | |
| 162 return _RequestEachItem(path, files, render) | |
| 163 | 96 |
| 164 results = [] | 97 # GAE documentation specifies that it's bad to add tasks to a queue |
| 98 # within one second of purging. We wait 2 seconds, because we like |
| 99 # to go the extra mile. |
| 100 queue.purge() |
| 101 time.sleep(2) |
| 165 | 102 |
| 103 success = True |
| 166 try: | 104 try: |
| 167 # Start running the hand-written Cron methods first; they can be run in | 105 data_sources = CreateDataSources(server_instance) |
| 168 # parallel. They are resolved at the end. | 106 targets = (data_sources.items() + |
| 169 def run_cron_for_future(target): | 107 [('content_providers', server_instance.content_providers), |
| 170 title = target.__class__.__name__ | 108 ('platform_bundle', server_instance.platform_bundle)]) |
| 171 future, init_timer = TimerClosure(target.Cron) | 109 title = 'initializing %s parallel targets' % len(targets) |
| 172 assert isinstance(future, Future), ( | 110 _log.info(title) |
| 173 '%s.Cron() did not return a Future' % title) | |
| 174 def resolve(): | |
| 175 resolve_timer = Timer() | |
| 176 try: | |
| 177 future.Get() | |
| 178 except Exception as e: | |
| 179 _cronlog.error('%s: error %s' % (title, traceback.format_exc())) | |
| 180 results.append(False) | |
| 181 if IsDeadlineExceededError(e): raise | |
| 182 finally: | |
| 183 resolve_timer.Stop() | |
| 184 _cronlog.info('%s took %s: %s to initialize and %s to resolve' % | |
| 185 (title, | |
| 186 init_timer.With(resolve_timer).FormatElapsed(), | |
| 187 init_timer.FormatElapsed(), | |
| 188 resolve_timer.FormatElapsed())) | |
| 189 return Future(callback=resolve) | |
| 190 | |
| 191 targets = (CreateDataSources(server_instance).values() + | |
| 192 [server_instance.content_providers, | |
| 193 server_instance.platform_bundle]) | |
| 194 title = 'initializing %s parallel Cron targets' % len(targets) | |
| 195 _cronlog.info(title) | |
| 196 timer = Timer() | 111 timer = Timer() |
| 197 try: | 112 for name, target in targets: |
| 198 cron_futures = [run_cron_for_future(target) for target in targets] | 113 refresh_paths = target.GetRefreshPaths() |
| 199 finally: | 114 for path in refresh_paths: |
| 200 _cronlog.info('%s took %s' % (title, timer.Stop().FormatElapsed())) | 115 queue.add(taskqueue.Task(url='/_refresh/%s/%s' % (name, path), |
| 201 | 116 params={'commit': master_commit})) |
| 202 # Samples are too expensive to run on the dev server, where there is no | 117 _log.info('%s took %s' % (title, timer.Stop().FormatElapsed())) |
| 203 # parallel fetch. | |
| 204 # | |
| 205 # XXX(kalman): Currently samples are *always* too expensive to fetch, so | |
| 206 # disabling them for now. It won't break anything so long as we're still | |
| 207 # not enforcing that everything gets cached for normal instances. | |
| 208 if False: # should be "not IsDevServer()": | |
| 209 # Fetch each individual sample file. | |
| 210 results.append(request_files_in_dir(EXAMPLES, | |
| 211 prefix='extensions/examples')) | |
| 212 | |
| 213 # Resolve the hand-written Cron method futures. | |
| 214 title = 'resolving %s parallel Cron targets' % len(targets) | |
| 215 _cronlog.info(title) | |
| 216 timer = Timer() | |
| 217 try: | |
| 218 for future in cron_futures: | |
| 219 future.Get() | |
| 220 finally: | |
| 221 _cronlog.info('%s took %s' % (title, timer.Stop().FormatElapsed())) | |
| 222 | |
| 223 except: | 118 except: |
| 224 results.append(False) | |
| 225 # This should never actually happen (each cron step does its own | 119 # This should never actually happen (each cron step does its own |
| 226 # conservative error checking), so re-raise no matter what it is. | 120 # conservative error checking), so re-raise no matter what it is. |
| 227 _cronlog.error('uncaught error: %s' % traceback.format_exc()) | 121 _log.error('uncaught error: %s' % traceback.format_exc()) |
| 122 success = False |
| 228 raise | 123 raise |
| 229 finally: | 124 finally: |
| 230 success = all(results) | 125 _log.info('finished (%s)', 'success' if success else 'FAILED') |
| 231 _cronlog.info('finished (%s)', 'success' if success else 'FAILED') | |
| 232 return (Response.Ok('Success') if success else | 126 return (Response.Ok('Success') if success else |
| 233 Response.InternalError('Failure')) | 127 Response.InternalError('Failure')) |
| 234 | 128 |
| 235 def _GetSafeServerInstance(self): | 129 def _GetSafeServerInstance(self): |
| 236 '''Returns a ServerInstance with a host file system at a safe commit, | 130 '''Returns a ServerInstance with a host file system at a safe commit, |
| 237 meaning the last commit that the current running version of the server | 131 meaning the last commit that the current running version of the server |
| 238 existed. | 132 existed. |
| 239 ''' | 133 ''' |
| 240 delegate = self._delegate | 134 delegate = self._delegate |
| 241 | 135 |
| (...skipping 10 matching lines...) Expand all Loading... |
| 252 server_instance_near_head.host_file_system_provider) | 146 server_instance_near_head.host_file_system_provider) |
| 253 | 147 |
| 254 if app_yaml_handler.IsUpToDate(delegate.GetAppVersion()): | 148 if app_yaml_handler.IsUpToDate(delegate.GetAppVersion()): |
| 255 return server_instance_near_head | 149 return server_instance_near_head |
| 256 | 150 |
| 257 # The version in app.yaml is greater than the currently running app's. | 151 # The version in app.yaml is greater than the currently running app's. |
| 258 # The safe version is the one before it changed. | 152 # The safe version is the one before it changed. |
| 259 safe_revision = app_yaml_handler.GetFirstRevisionGreaterThan( | 153 safe_revision = app_yaml_handler.GetFirstRevisionGreaterThan( |
| 260 delegate.GetAppVersion()) - 1 | 154 delegate.GetAppVersion()) - 1 |
| 261 | 155 |
| 262 _cronlog.info('app version %s is out of date, safe is %s', | 156 _log.info('app version %s is out of date, safe is %s', |
| 263 delegate.GetAppVersion(), safe_revision) | 157 delegate.GetAppVersion(), safe_revision) |
| 264 | 158 |
| 265 return self._CreateServerInstance(safe_revision) | 159 return self._CreateServerInstance(safe_revision) |
| 266 | 160 |
| 267 def _GetMostRecentCommit(self): | 161 def _GetMostRecentCommit(self): |
| 268 '''Gets the commit of the most recent patch submitted to the host file | 162 '''Gets the commit of the most recent patch submitted to the host file |
| 269 system. This is similar to HEAD but it's a concrete commit so won't | 163 system. This is similar to HEAD but it's a concrete commit so won't |
| 270 change as the cron runs. | 164 change as the cron runs. |
| 271 ''' | 165 ''' |
| 272 head_fs = ( | 166 head_fs = ( |
| (...skipping 12 matching lines...) Expand all Loading... |
| 285 github_file_system_provider = self._delegate.CreateGithubFileSystemProvider( | 179 github_file_system_provider = self._delegate.CreateGithubFileSystemProvider( |
| 286 object_store_creator) | 180 object_store_creator) |
| 287 gcs_file_system_provider = self._delegate.CreateGCSFileSystemProvider( | 181 gcs_file_system_provider = self._delegate.CreateGCSFileSystemProvider( |
| 288 object_store_creator) | 182 object_store_creator) |
| 289 return ServerInstance(object_store_creator, | 183 return ServerInstance(object_store_creator, |
| 290 CompiledFileSystem.Factory(object_store_creator), | 184 CompiledFileSystem.Factory(object_store_creator), |
| 291 branch_utility, | 185 branch_utility, |
| 292 host_file_system_provider, | 186 host_file_system_provider, |
| 293 github_file_system_provider, | 187 github_file_system_provider, |
| 294 gcs_file_system_provider) | 188 gcs_file_system_provider) |
| OLD | NEW |