| OLD | NEW |
| (Empty) |
| 1 # Copyright 2013 The Chromium Authors. All rights reserved. | |
| 2 # Use of this source code is governed by a BSD-style license that can be | |
| 3 # found in the LICENSE file. | |
| 4 | |
| 5 import time | |
| 6 import traceback | |
| 7 | |
| 8 from app_yaml_helper import AppYamlHelper | |
| 9 from appengine_wrappers import IsDeadlineExceededError, logservice, taskqueue | |
| 10 from branch_utility import BranchUtility | |
| 11 from compiled_file_system import CompiledFileSystem | |
| 12 from custom_logger import CustomLogger | |
| 13 from data_source_registry import CreateDataSources | |
| 14 from environment import GetAppVersion | |
| 15 from gcs_file_system_provider import CloudStorageFileSystemProvider | |
| 16 from github_file_system_provider import GithubFileSystemProvider | |
| 17 from host_file_system_provider import HostFileSystemProvider | |
| 18 from object_store_creator import ObjectStoreCreator | |
| 19 from refresh_tracker import RefreshTracker | |
| 20 from render_refresher import RenderRefresher | |
| 21 from server_instance import ServerInstance | |
| 22 from servlet import Servlet, Request, Response | |
| 23 from timer import Timer | |
| 24 | |
| 25 | |
# Logger used throughout this module under the 'cron' name.
# NOTE(review): comments in CronServlet.Get say this logger flushes on every
# use; that behaviour lives in CustomLogger and is not visible here.
_log = CustomLogger('cron')
| 27 | |
| 28 | |
class CronServlet(Servlet):
  '''Servlet which runs a cron job.

  Each run pins a ServerInstance to a concrete commit, purges the default
  taskqueue, and enqueues one '/_refresh/<task>' task per refresh path
  reported by the data sources, the content providers and the platform
  bundle.
  '''
  def __init__(self, request, delegate_for_test=None):
    '''Wraps |request|. |delegate_for_test| replaces the default Delegate when
    given; any falsy value falls back to a fresh CronServlet.Delegate.
    '''
    Servlet.__init__(self, request)
    self._delegate = delegate_for_test or CronServlet.Delegate()

  class Delegate(object):
    '''CronServlet's runtime dependencies. Override for testing.
    '''
    def CreateBranchUtility(self, object_store_creator):
      '''Returns a BranchUtility built from |object_store_creator|.
      '''
      return BranchUtility.Create(object_store_creator)

    def CreateHostFileSystemProvider(self,
                                     object_store_creator,
                                     pinned_commit=None):
      '''Returns a HostFileSystemProvider, forwarding |pinned_commit|.
      '''
      return HostFileSystemProvider(object_store_creator,
                                    pinned_commit=pinned_commit)

    def CreateGithubFileSystemProvider(self, object_store_creator):
      '''Returns a GithubFileSystemProvider for |object_store_creator|.
      '''
      return GithubFileSystemProvider(object_store_creator)

    def CreateGCSFileSystemProvider(self, object_store_creator):
      '''Returns a CloudStorageFileSystemProvider for |object_store_creator|.
      '''
      return CloudStorageFileSystemProvider(object_store_creator)

    def GetAppVersion(self):
      '''Returns the version reported by environment.GetAppVersion().
      '''
      return GetAppVersion()

  def Get(self):
    '''Runs the cron job and always returns a Response.

    Returns whatever _GetImpl returns, or Response.InternalError('Failure')
    if anything at all was raised while running it.
    '''
    # Refreshes may time out, and if they do we need to make sure to flush the
    # logs before the process gets killed (Python gives us a couple of
    # seconds).
    #
    # So, manually flush logs at the end of the cron run. However, sometimes
    # even that isn't enough, which is why in this file we use _log and
    # make it flush the log every time it's used.
    logservice.AUTOFLUSH_ENABLED = False
    try:
      return self._GetImpl()
    except BaseException:
      # BaseException rather than Exception so that nothing escapes the
      # top-level handler without being logged.
      _log.error('Caught top-level exception! %s', traceback.format_exc())
      # Report the failure explicitly instead of falling off the end of the
      # method and handing the caller None.
      return Response.InternalError('Failure')
    finally:
      logservice.flush()

  def _GetImpl(self):
    '''Purges the default taskqueue and enqueues one refresh task per path.

    Returns Response.Ok('Success') once every task has been queued. Any
    exception raised while collecting or queueing tasks is logged and
    re-raised to the caller.
    '''
    # Cron strategy:
    #
    # Collect all DataSources, the PlatformBundle, the ContentProviders, and
    # any other statically rendered contents (e.g. examples content),
    # and spin up taskqueue tasks which will refresh any cached data relevant
    # to these assets.
    #
    # TODO(rockot/kalman): At the moment examples are not actually refreshed
    # because they're too slow.

    _log.info('starting')

    server_instance = self._GetSafeServerInstance()
    master_fs = server_instance.host_file_system_provider.GetMaster()
    # An explicit 'commit' request argument overrides the master file system's
    # own commit ID.
    if 'commit' in self._request.arguments:
      master_commit = self._request.arguments['commit']
    else:
      master_commit = master_fs.GetCommitID().Get()

    # This is the guy that would be responsible for refreshing the cache of
    # examples. Here for posterity, hopefully it will be added to the targets
    # below someday.
    render_refresher = RenderRefresher(server_instance, self._request)

    # Used to register a new refresh cycle keyed on |master_commit|.
    refresh_tracker = RefreshTracker(server_instance.object_store_creator)

    # Get the default taskqueue
    queue = taskqueue.Queue()

    # GAE documentation specifies that it's bad to add tasks to a queue
    # within one second of purging. We wait 2 seconds, because we like
    # to go the extra mile.
    queue.purge()
    time.sleep(2)

    success = True
    try:
      data_sources = CreateDataSources(server_instance)
      # list() keeps the concatenation valid even if items() is not a list.
      targets = (list(data_sources.items()) +
                 [('content_providers', server_instance.content_providers),
                  ('platform_bundle', server_instance.platform_bundle)])
      title = 'initializing %s parallel targets' % len(targets)
      _log.info(title)
      timer = Timer()
      tasks = []
      for name, target in targets:
        # Task names look like '<target name>/<path>'; strip('/') tidies up
        # the result when |path| is empty or already slash-delimited.
        tasks.extend(('%s/%s' % (name, path)).strip('/')
                     for path in target.GetRefreshPaths())

      # Start a new refresh cycle. In order to detect the completion of a full
      # cache refresh, the RefreshServlet (which handles individual refresh
      # tasks) will mark each task complete and check the set of completed
      # tasks against the set registered here.
      refresh_tracker.StartRefresh(master_commit, tasks).Get()
      for task in tasks:
        queue.add(taskqueue.Task(url='/_refresh/%s' % task,
                                 params={'commit': master_commit}))

      _log.info('%s took %s' % (title, timer.Stop().FormatElapsed()))
    except:
      # This should never actually happen (each cron step does its own
      # conservative error checking), so re-raise no matter what it is.
      _log.error('uncaught error: %s' % traceback.format_exc())
      success = False
      raise
    finally:
      # Log only. Returning from a finally clause would silently discard the
      # exception re-raised above.
      _log.info('finished (%s)', 'success' if success else 'FAILED')
    return Response.Ok('Success')

  def _GetSafeServerInstance(self):
    '''Returns a ServerInstance with a host file system at a safe commit,
    meaning the last commit that the current running version of the server
    existed.
    '''
    # Read the version once so the check, the lookup and the log all agree.
    app_version = self._delegate.GetAppVersion()

    # IMPORTANT: Get a ServerInstance pinned to the most recent commit, not
    # HEAD. These cron jobs take a while and run very frequently such that
    # there is usually one running at any given time, and eventually a file
    # that we're dealing with will change underneath it, putting the server in
    # an undefined state.
    server_instance_near_head = self._CreateServerInstance(
        self._GetMostRecentCommit())

    app_yaml_handler = AppYamlHelper(
        server_instance_near_head.object_store_creator,
        server_instance_near_head.host_file_system_provider)

    if app_yaml_handler.IsUpToDate(app_version):
      return server_instance_near_head

    # The version in app.yaml is greater than the currently running app's.
    # The safe version is the one before it changed.
    safe_revision = app_yaml_handler.GetFirstRevisionGreaterThan(
        app_version) - 1

    _log.info('app version %s is out of date, safe is %s',
        app_version, safe_revision)

    return self._CreateServerInstance(safe_revision)

  def _GetMostRecentCommit(self):
    '''Gets the commit of the most recent patch submitted to the host file
    system. This is similar to HEAD but it's a concrete commit so won't
    change as the cron runs.
    '''
    # An unpinned (None) instance is created only to ask its master file
    # system for its current commit ID.
    head_fs = (
        self._CreateServerInstance(None).host_file_system_provider.GetMaster())
    return head_fs.GetCommitID().Get()

  def _CreateServerInstance(self, commit):
    '''Creates a ServerInstance pinned to |commit|, or HEAD if None.
    NOTE: If passed None it's likely that during the cron run patches will be
    submitted at HEAD, which may change data underneath the cron run.
    '''
    # Every collaborator is built from the same ObjectStoreCreator, and all of
    # them come from the delegate so that tests can substitute fakes.
    object_store_creator = ObjectStoreCreator(start_empty=True)
    branch_utility = self._delegate.CreateBranchUtility(object_store_creator)
    host_file_system_provider = self._delegate.CreateHostFileSystemProvider(
        object_store_creator, pinned_commit=commit)
    github_file_system_provider = self._delegate.CreateGithubFileSystemProvider(
        object_store_creator)
    gcs_file_system_provider = self._delegate.CreateGCSFileSystemProvider(
        object_store_creator)
    return ServerInstance(object_store_creator,
                          CompiledFileSystem.Factory(object_store_creator),
                          branch_utility,
                          host_file_system_provider,
                          github_file_system_provider,
                          gcs_file_system_provider)
| OLD | NEW |