OLD | NEW |
| (Empty) |
1 # Copyright 2013 The Chromium Authors. All rights reserved. | |
2 # Use of this source code is governed by a BSD-style license that can be | |
3 # found in the LICENSE file. | |
4 | |
5 import time | |
6 import traceback | |
7 | |
8 from app_yaml_helper import AppYamlHelper | |
9 from appengine_wrappers import IsDeadlineExceededError, logservice, taskqueue | |
10 from branch_utility import BranchUtility | |
11 from compiled_file_system import CompiledFileSystem | |
12 from custom_logger import CustomLogger | |
13 from data_source_registry import CreateDataSources | |
14 from environment import GetAppVersion | |
15 from gcs_file_system_provider import CloudStorageFileSystemProvider | |
16 from github_file_system_provider import GithubFileSystemProvider | |
17 from host_file_system_provider import HostFileSystemProvider | |
18 from object_store_creator import ObjectStoreCreator | |
19 from refresh_tracker import RefreshTracker | |
20 from render_refresher import RenderRefresher | |
21 from server_instance import ServerInstance | |
22 from servlet import Servlet, Request, Response | |
23 from timer import Timer | |
24 | |
25 | |
26 _log = CustomLogger('cron') | |
27 | |
28 | |
29 class CronServlet(Servlet): | |
30 '''Servlet which runs a cron job. | |
31 ''' | |
32 def __init__(self, request, delegate_for_test=None): | |
33 Servlet.__init__(self, request) | |
34 self._delegate = delegate_for_test or CronServlet.Delegate() | |
35 | |
36 class Delegate(object): | |
37 '''CronServlet's runtime dependencies. Override for testing. | |
38 ''' | |
39 def CreateBranchUtility(self, object_store_creator): | |
40 return BranchUtility.Create(object_store_creator) | |
41 | |
42 def CreateHostFileSystemProvider(self, | |
43 object_store_creator, | |
44 pinned_commit=None): | |
45 return HostFileSystemProvider(object_store_creator, | |
46 pinned_commit=pinned_commit) | |
47 | |
48 def CreateGithubFileSystemProvider(self, object_store_creator): | |
49 return GithubFileSystemProvider(object_store_creator) | |
50 | |
51 def CreateGCSFileSystemProvider(self, object_store_creator): | |
52 return CloudStorageFileSystemProvider(object_store_creator) | |
53 | |
54 def GetAppVersion(self): | |
55 return GetAppVersion() | |
56 | |
57 def Get(self): | |
58 # Refreshes may time out, and if they do we need to make sure to flush the | |
59 # logs before the process gets killed (Python gives us a couple of | |
60 # seconds). | |
61 # | |
62 # So, manually flush logs at the end of the cron run. However, sometimes | |
63 # even that isn't enough, which is why in this file we use _log and | |
64 # make it flush the log every time its used. | |
65 logservice.AUTOFLUSH_ENABLED = False | |
66 try: | |
67 return self._GetImpl() | |
68 except BaseException: | |
69 _log.error('Caught top-level exception! %s', traceback.format_exc()) | |
70 finally: | |
71 logservice.flush() | |
72 | |
73 def _GetImpl(self): | |
74 # Cron strategy: | |
75 # | |
76 # Collect all DataSources, the PlatformBundle, the ContentProviders, and | |
77 # any other statically renderered contents (e.g. examples content), | |
78 # and spin up taskqueue tasks which will refresh any cached data relevant | |
79 # to these assets. | |
80 # | |
81 # TODO(rockot/kalman): At the moment examples are not actually refreshed | |
82 # because they're too slow. | |
83 | |
84 _log.info('starting') | |
85 | |
86 server_instance = self._GetSafeServerInstance() | |
87 master_fs = server_instance.host_file_system_provider.GetMaster() | |
88 if 'commit' in self._request.arguments: | |
89 master_commit = self._request.arguments['commit'] | |
90 else: | |
91 master_commit = master_fs.GetCommitID().Get() | |
92 | |
93 # This is the guy that would be responsible for refreshing the cache of | |
94 # examples. Here for posterity, hopefully it will be added to the targets | |
95 # below someday. | |
96 render_refresher = RenderRefresher(server_instance, self._request) | |
97 | |
98 # Used to register a new refresh cycle keyed on |master_commit|. | |
99 refresh_tracker = RefreshTracker(server_instance.object_store_creator) | |
100 | |
101 # Get the default taskqueue | |
102 queue = taskqueue.Queue() | |
103 | |
104 # GAE documentation specifies that it's bad to add tasks to a queue | |
105 # within one second of purging. We wait 2 seconds, because we like | |
106 # to go the extra mile. | |
107 queue.purge() | |
108 time.sleep(2) | |
109 | |
110 success = True | |
111 try: | |
112 data_sources = CreateDataSources(server_instance) | |
113 targets = (data_sources.items() + | |
114 [('content_providers', server_instance.content_providers), | |
115 ('platform_bundle', server_instance.platform_bundle)]) | |
116 title = 'initializing %s parallel targets' % len(targets) | |
117 _log.info(title) | |
118 timer = Timer() | |
119 tasks = [] | |
120 for name, target in targets: | |
121 refresh_paths = target.GetRefreshPaths() | |
122 tasks += [('%s/%s' % (name, path)).strip('/') for path in refresh_paths] | |
123 | |
124 # Start a new refresh cycle. In order to detect the completion of a full | |
125 # cache refresh, the RefreshServlet (which handles individual refresh | |
126 # tasks) will mark each task complete and check the set of completed tasks | |
127 # against the set registered here. | |
128 refresh_tracker.StartRefresh(master_commit, tasks).Get() | |
129 for task in tasks: | |
130 queue.add(taskqueue.Task(url='/_refresh/%s' % task, | |
131 params={'commit': master_commit})) | |
132 | |
133 _log.info('%s took %s' % (title, timer.Stop().FormatElapsed())) | |
134 except: | |
135 # This should never actually happen (each cron step does its own | |
136 # conservative error checking), so re-raise no matter what it is. | |
137 _log.error('uncaught error: %s' % traceback.format_exc()) | |
138 success = False | |
139 raise | |
140 finally: | |
141 _log.info('finished (%s)', 'success' if success else 'FAILED') | |
142 return (Response.Ok('Success') if success else | |
143 Response.InternalError('Failure')) | |
144 | |
145 def _GetSafeServerInstance(self): | |
146 '''Returns a ServerInstance with a host file system at a safe commit, | |
147 meaning the last commit that the current running version of the server | |
148 existed. | |
149 ''' | |
150 delegate = self._delegate | |
151 | |
152 # IMPORTANT: Get a ServerInstance pinned to the most recent commit, not | |
153 # HEAD. These cron jobs take a while and run very frequently such that | |
154 # there is usually one running at any given time, and eventually a file | |
155 # that we're dealing with will change underneath it, putting the server in | |
156 # an undefined state. | |
157 server_instance_near_head = self._CreateServerInstance( | |
158 self._GetMostRecentCommit()) | |
159 | |
160 app_yaml_handler = AppYamlHelper( | |
161 server_instance_near_head.object_store_creator, | |
162 server_instance_near_head.host_file_system_provider) | |
163 | |
164 if app_yaml_handler.IsUpToDate(delegate.GetAppVersion()): | |
165 return server_instance_near_head | |
166 | |
167 # The version in app.yaml is greater than the currently running app's. | |
168 # The safe version is the one before it changed. | |
169 safe_revision = app_yaml_handler.GetFirstRevisionGreaterThan( | |
170 delegate.GetAppVersion()) - 1 | |
171 | |
172 _log.info('app version %s is out of date, safe is %s', | |
173 delegate.GetAppVersion(), safe_revision) | |
174 | |
175 return self._CreateServerInstance(safe_revision) | |
176 | |
177 def _GetMostRecentCommit(self): | |
178 '''Gets the commit of the most recent patch submitted to the host file | |
179 system. This is similar to HEAD but it's a concrete commit so won't | |
180 change as the cron runs. | |
181 ''' | |
182 head_fs = ( | |
183 self._CreateServerInstance(None).host_file_system_provider.GetMaster()) | |
184 return head_fs.GetCommitID().Get() | |
185 | |
186 def _CreateServerInstance(self, commit): | |
187 '''Creates a ServerInstance pinned to |commit|, or HEAD if None. | |
188 NOTE: If passed None it's likely that during the cron run patches will be | |
189 submitted at HEAD, which may change data underneath the cron run. | |
190 ''' | |
191 object_store_creator = ObjectStoreCreator(start_empty=True) | |
192 branch_utility = self._delegate.CreateBranchUtility(object_store_creator) | |
193 host_file_system_provider = self._delegate.CreateHostFileSystemProvider( | |
194 object_store_creator, pinned_commit=commit) | |
195 github_file_system_provider = self._delegate.CreateGithubFileSystemProvider( | |
196 object_store_creator) | |
197 gcs_file_system_provider = self._delegate.CreateGCSFileSystemProvider( | |
198 object_store_creator) | |
199 return ServerInstance(object_store_creator, | |
200 CompiledFileSystem.Factory(object_store_creator), | |
201 branch_utility, | |
202 host_file_system_provider, | |
203 github_file_system_provider, | |
204 gcs_file_system_provider) | |
OLD | NEW |