OLD | NEW |
| (Empty) |
1 # Copyright 2014 The Chromium Authors. All rights reserved. | |
2 # Use of this source code is governed by a BSD-style license that can be | |
3 # found in the LICENSE file. | |
4 | |
5 import traceback | |
6 | |
7 from app_yaml_helper import AppYamlHelper | |
8 from appengine_wrappers import IsDeadlineExceededError, logservice, taskqueue | |
9 from branch_utility import BranchUtility | |
10 from commit_tracker import CommitTracker | |
11 from compiled_file_system import CompiledFileSystem | |
12 from custom_logger import CustomLogger | |
13 from data_source_registry import CreateDataSource | |
14 from environment import GetAppVersion | |
15 from file_system import IsFileSystemThrottledError | |
16 from future import Future | |
17 from gcs_file_system_provider import CloudStorageFileSystemProvider | |
18 from github_file_system_provider import GithubFileSystemProvider | |
19 from host_file_system_provider import HostFileSystemProvider | |
20 from object_store_creator import ObjectStoreCreator | |
21 from refresh_tracker import RefreshTracker | |
22 from server_instance import ServerInstance | |
23 from servlet import Servlet, Request, Response | |
24 from timer import Timer, TimerClosure | |
25 | |
26 | |
27 _log = CustomLogger('refresh') | |
28 | |
29 | |
30 class RefreshServlet(Servlet): | |
31 '''Servlet which refreshes a single data source. | |
32 ''' | |
33 def __init__(self, request, delegate_for_test=None): | |
34 Servlet.__init__(self, request) | |
35 self._delegate = delegate_for_test or RefreshServlet.Delegate() | |
36 | |
37 class Delegate(object): | |
38 '''RefreshServlet's runtime dependencies. Override for testing. | |
39 ''' | |
40 def CreateBranchUtility(self, object_store_creator): | |
41 return BranchUtility.Create(object_store_creator) | |
42 | |
43 def CreateHostFileSystemProvider(self, | |
44 object_store_creator, | |
45 pinned_commit=None): | |
46 return HostFileSystemProvider(object_store_creator, | |
47 pinned_commit=pinned_commit) | |
48 | |
49 def CreateGithubFileSystemProvider(self, object_store_creator): | |
50 return GithubFileSystemProvider(object_store_creator) | |
51 | |
52 def CreateGCSFileSystemProvider(self, object_store_creator): | |
53 return CloudStorageFileSystemProvider(object_store_creator) | |
54 | |
55 def GetAppVersion(self): | |
56 return GetAppVersion() | |
57 | |
58 def Get(self): | |
59 # Manually flush logs at the end of the run. However, sometimes | |
60 # even that isn't enough, which is why in this file we use the | |
61 # custom logger and make it flush the log every time its used. | |
62 logservice.AUTOFLUSH_ENABLED = False | |
63 try: | |
64 return self._GetImpl() | |
65 except BaseException: | |
66 _log.error('Caught top-level exception! %s', traceback.format_exc()) | |
67 finally: | |
68 logservice.flush() | |
69 | |
70 def _GetImpl(self): | |
71 path = self._request.path.strip('/') | |
72 parts = self._request.path.split('/', 1) | |
73 source_name = parts[0] | |
74 if len(parts) == 2: | |
75 source_path = parts[1] | |
76 else: | |
77 source_path = None | |
78 | |
79 _log.info('starting refresh of %s DataSource %s' % | |
80 (source_name, '' if source_path is None else '[%s]' % source_path)) | |
81 | |
82 if 'commit' in self._request.arguments: | |
83 commit = self._request.arguments['commit'] | |
84 else: | |
85 _log.warning('No commit given; refreshing from master. ' | |
86 'This is probably NOT what you want.') | |
87 commit = None | |
88 | |
89 server_instance = self._CreateServerInstance(commit) | |
90 commit_tracker = CommitTracker(server_instance.object_store_creator) | |
91 refresh_tracker = RefreshTracker(server_instance.object_store_creator) | |
92 | |
93 # If no commit was given, use the ID of the last cached master commit. | |
94 # This allows sources external to the chromium repository to be updated | |
95 # independently from individual refresh cycles. | |
96 if commit is None: | |
97 commit = commit_tracker.Get('master').Get() | |
98 | |
99 success = True | |
100 try: | |
101 if source_name == 'platform_bundle': | |
102 data_source = server_instance.platform_bundle | |
103 elif source_name == 'content_providers': | |
104 data_source = server_instance.content_providers | |
105 else: | |
106 data_source = CreateDataSource(source_name, server_instance) | |
107 | |
108 class_name = data_source.__class__.__name__ | |
109 refresh_future = data_source.Refresh(source_path) | |
110 assert isinstance(refresh_future, Future), ( | |
111 '%s.Refresh() did not return a Future' % class_name) | |
112 timer = Timer() | |
113 try: | |
114 refresh_future.Get() | |
115 | |
116 # Mark this (commit, task) pair as completed and then see if this | |
117 # concludes the full cache refresh. The list of tasks required to | |
118 # complete a cache refresh is registered (and keyed on commit ID) by the | |
119 # CronServlet before kicking off all the refresh tasks. | |
120 (refresh_tracker.MarkTaskComplete(commit, path) | |
121 .Then(lambda _: refresh_tracker.GetRefreshComplete(commit)) | |
122 .Then(lambda is_complete: | |
123 commit_tracker.Set('master', commit) if is_complete else None) | |
124 .Get()) | |
125 except Exception as e: | |
126 _log.error('%s: error %s' % (class_name, traceback.format_exc())) | |
127 success = False | |
128 if IsFileSystemThrottledError(e): | |
129 return Response.ThrottledError('Throttled') | |
130 raise | |
131 finally: | |
132 _log.info('Refreshing %s took %s' % | |
133 (class_name, timer.Stop().FormatElapsed())) | |
134 | |
135 except: | |
136 success = False | |
137 # This should never actually happen. | |
138 _log.error('uncaught error: %s' % traceback.format_exc()) | |
139 raise | |
140 finally: | |
141 _log.info('finished (%s)', 'success' if success else 'FAILED') | |
142 return (Response.Ok('Success') if success else | |
143 Response.InternalError('Failure')) | |
144 | |
145 def _CreateServerInstance(self, commit): | |
146 '''Creates a ServerInstance pinned to |commit|, or HEAD if None. | |
147 NOTE: If passed None it's likely that during the cron run patches will be | |
148 submitted at HEAD, which may change data underneath the cron run. | |
149 ''' | |
150 object_store_creator = ObjectStoreCreator(start_empty=True) | |
151 branch_utility = self._delegate.CreateBranchUtility(object_store_creator) | |
152 host_file_system_provider = self._delegate.CreateHostFileSystemProvider( | |
153 object_store_creator, pinned_commit=commit) | |
154 github_file_system_provider = self._delegate.CreateGithubFileSystemProvider( | |
155 object_store_creator) | |
156 gcs_file_system_provider = self._delegate.CreateGCSFileSystemProvider( | |
157 object_store_creator) | |
158 return ServerInstance(object_store_creator, | |
159 CompiledFileSystem.Factory(object_store_creator), | |
160 branch_utility, | |
161 host_file_system_provider, | |
162 github_file_system_provider, | |
163 gcs_file_system_provider) | |
OLD | NEW |