Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(58)

Side by Side Diff: sync/engine/download.cc

Issue 38803003: sync: Implement per-type update processing (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Clean up and add comments Created 7 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "sync/engine/download.h" 5 #include "sync/engine/download.h"
6 6
7 #include <string> 7 #include <string>
8 8
9 #include "base/command_line.h" 9 #include "base/command_line.h"
10 #include "sync/engine/process_updates_command.h" 10 #include "sync/engine/process_updates_util.h"
11 #include "sync/engine/store_timestamps_command.h" 11 #include "sync/engine/sync_directory_update_handler.h"
12 #include "sync/engine/syncer.h" 12 #include "sync/engine/syncer.h"
13 #include "sync/engine/syncer_proto_util.h" 13 #include "sync/engine/syncer_proto_util.h"
14 #include "sync/sessions/nudge_tracker.h" 14 #include "sync/sessions/nudge_tracker.h"
15 #include "sync/syncable/directory.h" 15 #include "sync/syncable/directory.h"
16 #include "sync/syncable/nigori_handler.h" 16 #include "sync/syncable/nigori_handler.h"
17 #include "sync/syncable/syncable_read_transaction.h" 17 #include "sync/syncable/syncable_read_transaction.h"
18 18
19 using sync_pb::DebugInfo; 19 using sync_pb::DebugInfo;
20 20
21 namespace syncer { 21 namespace syncer {
22 22
23 using sessions::StatusController; 23 using sessions::StatusController;
24 using sessions::SyncSession; 24 using sessions::SyncSession;
25 using sessions::SyncSessionContext; 25 using sessions::SyncSessionContext;
26 using std::string; 26 using std::string;
27 27
28 namespace { 28 namespace {
29 29
30 typedef std::map<ModelType, const sync_pb::DataTypeProgressMarker*>
31 TypeProgressMarkerMap;
32
30 SyncerError HandleGetEncryptionKeyResponse( 33 SyncerError HandleGetEncryptionKeyResponse(
31 const sync_pb::ClientToServerResponse& update_response, 34 const sync_pb::ClientToServerResponse& update_response,
32 syncable::Directory* dir) { 35 syncable::Directory* dir) {
33 bool success = false; 36 bool success = false;
34 if (update_response.get_updates().encryption_keys_size() == 0) { 37 if (update_response.get_updates().encryption_keys_size() == 0) {
35 LOG(ERROR) << "Failed to receive encryption key from server."; 38 LOG(ERROR) << "Failed to receive encryption key from server.";
36 return SERVER_RESPONSE_VALIDATION_FAILED; 39 return SERVER_RESPONSE_VALIDATION_FAILED;
37 } 40 }
38 syncable::ReadTransaction trans(FROM_HERE, dir); 41 syncable::ReadTransaction trans(FROM_HERE, dir);
39 syncable::NigoriHandler* nigori_handler = dir->GetNigoriHandler(); 42 syncable::NigoriHandler* nigori_handler = dir->GetNigoriHandler();
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after
128 for (ModelTypeSet::Iterator it = request_types.First(); 131 for (ModelTypeSet::Iterator it = request_types.First();
129 it.Good(); it.Inc()) { 132 it.Good(); it.Inc()) {
130 if (ProxyTypes().Has(it.Get())) 133 if (ProxyTypes().Has(it.Get()))
131 continue; 134 continue;
132 sync_pb::DataTypeProgressMarker* progress_marker = 135 sync_pb::DataTypeProgressMarker* progress_marker =
133 get_updates->add_from_progress_marker(); 136 get_updates->add_from_progress_marker();
134 dir->GetDownloadProgress(it.Get(), progress_marker); 137 dir->GetDownloadProgress(it.Get(), progress_marker);
135 } 138 }
136 } 139 }
137 140
141 // Builds a map of ModelTypes to pointers to progress markers in the given
142 // update response. The map is returned in the |marker_map| parameter.
143 void PartitionProgressMarkersByType(
144 const sync_pb::GetUpdatesResponse& gu_response,
145 ModelTypeSet request_types,
146 TypeProgressMarkerMap* marker_map) {
147 for (int i = 0; i < gu_response.new_progress_marker_size(); ++i) {
148 int field_number = gu_response.new_progress_marker(i).data_type_id();
149 ModelType model_type = GetModelTypeFromSpecificsFieldNumber(field_number);
150 if (!IsRealDataType(model_type)) {
151 DLOG(WARNING) << "Unknown field number " << field_number;
152 continue;
153 }
154 if (!request_types.Has(model_type)) {
155 DLOG(WARNING)
156 << "Skipping unexpected progress marker for non-enabled type "
157 << ModelTypeToString(model_type);
158 continue;
159 }
160 marker_map->insert(
161 std::make_pair(model_type, &gu_response.new_progress_marker(i)));
162 }
163 }
164
165 // Examines the contents of the GetUpdates response message and forwards
166 // relevant data to the UpdateHandlers for processing and persisting.
167 bool ProcessUpdateResponseMessage(
168 const sync_pb::GetUpdatesResponse& gu_response,
169 ModelTypeSet proto_request_types,
170 UpdateHandlerMap* handler_map,
171 StatusController* status) {
172 TypeSyncEntityMap updates_by_type;
173 PartitionUpdatesByType(gu_response, proto_request_types, &updates_by_type);
174 DCHECK_EQ(proto_request_types.Size(), updates_by_type.size());
175
176 TypeProgressMarkerMap progress_by_type;
177 PartitionProgressMarkersByType(gu_response,
178 proto_request_types,
179 &progress_by_type);
180 if (proto_request_types.Size() != progress_by_type.size()) {
181 NOTREACHED() << "Missing progress markers in GetUpdates response.";
182 return false;
183 }
184
185 // Iterate over these maps in parallel, processing updates for each type.
186 TypeProgressMarkerMap::iterator pm_it = progress_by_type.begin();
187 TypeSyncEntityMap::iterator up_it = updates_by_type.begin();
188 for ( ; pm_it != progress_by_type.end() && up_it != updates_by_type.end();
189 ++pm_it, ++up_it) {
190 DCHECK_EQ(pm_it->first, up_it->first);
191 ModelType type = pm_it->first;
192
193 UpdateHandlerMap::iterator uh_it = handler_map->find(type);
194
195 if (uh_it != handler_map->end()) {
196 uh_it->second->ProcessGetUpdatesResponse(*pm_it->second,
197 up_it->second,
198 status);
199 } else {
200 DLOG(WARNING)
201 << "Ignoring received updates of a type we can't handle. "
202 << "Type is: " << ModelTypeToString(type);
203 continue;
204 }
205 }
206 DCHECK(pm_it == progress_by_type.end() && up_it == updates_by_type.end());
207
208 return true;
209 }
210
138 } // namespace 211 } // namespace
139 212
140 void BuildNormalDownloadUpdates( 213 void BuildNormalDownloadUpdates(
141 SyncSession* session, 214 SyncSession* session,
142 bool create_mobile_bookmarks_folder, 215 bool create_mobile_bookmarks_folder,
143 ModelTypeSet request_types, 216 ModelTypeSet request_types,
144 const sessions::NudgeTracker& nudge_tracker, 217 const sessions::NudgeTracker& nudge_tracker,
145 sync_pb::ClientToServerMessage* client_to_server_message) { 218 sync_pb::ClientToServerMessage* client_to_server_message) {
146 InitDownloadUpdatesRequest( 219 InitDownloadUpdatesRequest(
147 session, 220 session,
(...skipping 79 matching lines...) Expand 10 before | Expand all | Expand 10 after
227 300
228 // Set legacy GetUpdatesMessage.GetUpdatesCallerInfo information. 301 // Set legacy GetUpdatesMessage.GetUpdatesCallerInfo information.
229 get_updates->mutable_caller_info()->set_source( 302 get_updates->mutable_caller_info()->set_source(
230 sync_pb::GetUpdatesCallerInfo::PERIODIC); 303 sync_pb::GetUpdatesCallerInfo::PERIODIC);
231 304
232 // Set the new and improved version of source, too. 305 // Set the new and improved version of source, too.
233 get_updates->set_get_updates_origin(sync_pb::SyncEnums::PERIODIC); 306 get_updates->set_get_updates_origin(sync_pb::SyncEnums::PERIODIC);
234 } 307 }
235 308
236 SyncerError ExecuteDownloadUpdates( 309 SyncerError ExecuteDownloadUpdates(
310 ModelTypeSet request_types,
237 SyncSession* session, 311 SyncSession* session,
238 sync_pb::ClientToServerMessage* msg) { 312 sync_pb::ClientToServerMessage* msg) {
239 sync_pb::ClientToServerResponse update_response; 313 sync_pb::ClientToServerResponse update_response;
240 StatusController* status = session->mutable_status_controller(); 314 StatusController* status = session->mutable_status_controller();
241 bool need_encryption_key = ShouldRequestEncryptionKey(session->context()); 315 bool need_encryption_key = ShouldRequestEncryptionKey(session->context());
242 316
243 SyncerError result = SyncerProtoUtil::PostClientToServerMessage( 317 SyncerError result = SyncerProtoUtil::PostClientToServerMessage(
244 msg, 318 msg,
245 &update_response, 319 &update_response,
246 session); 320 session);
247 321
248 DVLOG(2) << SyncerProtoUtil::ClientToServerResponseDebugString( 322 DVLOG(2) << SyncerProtoUtil::ClientToServerResponseDebugString(
249 update_response); 323 update_response);
250 324
251 if (result != SYNCER_OK) { 325 if (result != SYNCER_OK) {
252 status->mutable_updates_response()->Clear(); 326 status->mutable_updates_response()->Clear();
253 LOG(ERROR) << "PostClientToServerMessage() failed during GetUpdates"; 327 LOG(ERROR) << "PostClientToServerMessage() failed during GetUpdates";
254 } else { 328 return result;
255 status->mutable_updates_response()->CopyFrom(update_response);
256
257 DVLOG(1) << "GetUpdates "
258 << " returned " << update_response.get_updates().entries_size()
259 << " updates and indicated "
260 << update_response.get_updates().changes_remaining()
261 << " updates left on server.";
262
263 if (need_encryption_key ||
264 update_response.get_updates().encryption_keys_size() > 0) {
265 syncable::Directory* dir = session->context()->directory();
266 status->set_last_get_key_result(
267 HandleGetEncryptionKeyResponse(update_response, dir));
268 }
269 } 329 }
270 330
271 ProcessUpdatesCommand process_updates; 331 status->mutable_updates_response()->CopyFrom(update_response);
272 process_updates.Execute(session);
273 332
274 StoreTimestampsCommand store_timestamps; 333 DVLOG(1) << "GetUpdates "
275 store_timestamps.Execute(session); 334 << " returned " << update_response.get_updates().entries_size()
335 << " updates and indicated "
336 << update_response.get_updates().changes_remaining()
337 << " updates left on server.";
276 338
277 return result; 339 if (need_encryption_key ||
340 update_response.get_updates().encryption_keys_size() > 0) {
341 syncable::Directory* dir = session->context()->directory();
342 status->set_last_get_key_result(
343 HandleGetEncryptionKeyResponse(update_response, dir));
344 }
345
346 const sync_pb::GetUpdatesResponse& gu_response =
347 update_response.get_updates();
348 status->increment_num_updates_downloaded_by(gu_response.entries_size());
349 DCHECK(gu_response.has_changes_remaining());
350 status->set_num_server_changes_remaining(gu_response.changes_remaining());
351
352 const ModelTypeSet proto_request_types =
353 Intersection(request_types, ProtocolTypes());
354
355 if (!ProcessUpdateResponseMessage(gu_response,
356 proto_request_types,
357 session->context()->update_handler_map(),
358 status)) {
359 return SERVER_RESPONSE_VALIDATION_FAILED;
360 } else {
361 return result;
362 }
278 } 363 }
279 364
280 } // namespace syncer 365 } // namespace syncer
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698