OLD | NEW |
| (Empty) |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "sync/internal_api/public/shared_model_type_processor.h" | |
6 | |
7 #include <utility> | |
8 #include <vector> | |
9 | |
10 #include "base/bind.h" | |
11 #include "base/location.h" | |
12 #include "base/memory/ptr_util.h" | |
13 #include "base/metrics/histogram.h" | |
14 #include "base/threading/thread_task_runner_handle.h" | |
15 #include "sync/engine/commit_queue.h" | |
16 #include "sync/internal_api/public/activation_context.h" | |
17 #include "sync/internal_api/public/processor_entity_tracker.h" | |
18 #include "sync/syncable/syncable_util.h" | |
19 | |
20 namespace syncer_v2 { | |
21 | |
22 namespace { | |
23 | |
24 class ModelTypeProcessorProxy : public ModelTypeProcessor { | |
25 public: | |
26 ModelTypeProcessorProxy( | |
27 const base::WeakPtr<ModelTypeProcessor>& processor, | |
28 const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner); | |
29 ~ModelTypeProcessorProxy() override; | |
30 | |
31 void ConnectSync(std::unique_ptr<CommitQueue> worker) override; | |
32 void DisconnectSync() override; | |
33 void OnCommitCompleted(const sync_pb::DataTypeState& type_state, | |
34 const CommitResponseDataList& response_list) override; | |
35 void OnUpdateReceived(const sync_pb::DataTypeState& type_state, | |
36 const UpdateResponseDataList& updates) override; | |
37 | |
38 private: | |
39 base::WeakPtr<ModelTypeProcessor> processor_; | |
40 scoped_refptr<base::SequencedTaskRunner> processor_task_runner_; | |
41 }; | |
42 | |
43 ModelTypeProcessorProxy::ModelTypeProcessorProxy( | |
44 const base::WeakPtr<ModelTypeProcessor>& processor, | |
45 const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner) | |
46 : processor_(processor), processor_task_runner_(processor_task_runner) {} | |
47 | |
48 ModelTypeProcessorProxy::~ModelTypeProcessorProxy() {} | |
49 | |
50 void ModelTypeProcessorProxy::ConnectSync(std::unique_ptr<CommitQueue> worker) { | |
51 processor_task_runner_->PostTask( | |
52 FROM_HERE, base::Bind(&ModelTypeProcessor::ConnectSync, processor_, | |
53 base::Passed(std::move(worker)))); | |
54 } | |
55 | |
56 void ModelTypeProcessorProxy::DisconnectSync() { | |
57 processor_task_runner_->PostTask( | |
58 FROM_HERE, base::Bind(&ModelTypeProcessor::DisconnectSync, processor_)); | |
59 } | |
60 | |
61 void ModelTypeProcessorProxy::OnCommitCompleted( | |
62 const sync_pb::DataTypeState& type_state, | |
63 const CommitResponseDataList& response_list) { | |
64 processor_task_runner_->PostTask( | |
65 FROM_HERE, base::Bind(&ModelTypeProcessor::OnCommitCompleted, processor_, | |
66 type_state, response_list)); | |
67 } | |
68 | |
69 void ModelTypeProcessorProxy::OnUpdateReceived( | |
70 const sync_pb::DataTypeState& type_state, | |
71 const UpdateResponseDataList& updates) { | |
72 processor_task_runner_->PostTask( | |
73 FROM_HERE, base::Bind(&ModelTypeProcessor::OnUpdateReceived, processor_, | |
74 type_state, updates)); | |
75 } | |
76 | |
77 } // namespace | |
78 | |
79 SharedModelTypeProcessor::SharedModelTypeProcessor(syncer::ModelType type, | |
80 ModelTypeService* service) | |
81 : type_(type), | |
82 is_metadata_loaded_(false), | |
83 is_initial_pending_data_loaded_(false), | |
84 service_(service), | |
85 error_handler_(nullptr), | |
86 weak_ptr_factory_(this) { | |
87 DCHECK(service); | |
88 } | |
89 | |
90 SharedModelTypeProcessor::~SharedModelTypeProcessor() {} | |
91 | |
92 // static | |
93 std::unique_ptr<ModelTypeChangeProcessor> | |
94 SharedModelTypeProcessor::CreateAsChangeProcessor(syncer::ModelType type, | |
95 ModelTypeService* service) { | |
96 return std::unique_ptr<ModelTypeChangeProcessor>( | |
97 new SharedModelTypeProcessor(type, service)); | |
98 } | |
99 | |
100 void SharedModelTypeProcessor::OnSyncStarting( | |
101 syncer::DataTypeErrorHandler* error_handler, | |
102 const StartCallback& start_callback) { | |
103 DCHECK(CalledOnValidThread()); | |
104 DCHECK(start_callback_.is_null()); | |
105 DCHECK(!IsConnected()); | |
106 DCHECK(error_handler); | |
107 DVLOG(1) << "Sync is starting for " << ModelTypeToString(type_); | |
108 | |
109 error_handler_ = error_handler; | |
110 start_callback_ = start_callback; | |
111 ConnectIfReady(); | |
112 } | |
113 | |
114 void SharedModelTypeProcessor::OnMetadataLoaded( | |
115 syncer::SyncError error, | |
116 std::unique_ptr<MetadataBatch> batch) { | |
117 DCHECK(CalledOnValidThread()); | |
118 DCHECK(entities_.empty()); | |
119 DCHECK(!is_metadata_loaded_); | |
120 DCHECK(!IsConnected()); | |
121 | |
122 is_metadata_loaded_ = true; | |
123 // Flip this flag here to cover all cases where we don't need to load data. | |
124 is_initial_pending_data_loaded_ = true; | |
125 | |
126 if (error.IsSet()) { | |
127 start_error_ = error; | |
128 ConnectIfReady(); | |
129 return; | |
130 } | |
131 | |
132 if (batch->GetDataTypeState().initial_sync_done()) { | |
133 EntityMetadataMap metadata_map(batch->TakeAllMetadata()); | |
134 std::vector<std::string> entities_to_commit; | |
135 | |
136 for (auto it = metadata_map.begin(); it != metadata_map.end(); it++) { | |
137 std::unique_ptr<ProcessorEntityTracker> entity = | |
138 ProcessorEntityTracker::CreateFromMetadata(it->first, &it->second); | |
139 if (entity->RequiresCommitData()) { | |
140 entities_to_commit.push_back(entity->client_tag()); | |
141 } | |
142 entities_[entity->metadata().client_tag_hash()] = std::move(entity); | |
143 } | |
144 data_type_state_ = batch->GetDataTypeState(); | |
145 if (!entities_to_commit.empty()) { | |
146 is_initial_pending_data_loaded_ = false; | |
147 service_->GetData( | |
148 entities_to_commit, | |
149 base::Bind(&SharedModelTypeProcessor::OnInitialPendingDataLoaded, | |
150 weak_ptr_factory_.GetWeakPtr())); | |
151 } | |
152 } else { | |
153 // First time syncing; initialize metadata. | |
154 data_type_state_.mutable_progress_marker()->set_data_type_id( | |
155 GetSpecificsFieldNumberFromModelType(type_)); | |
156 } | |
157 | |
158 ConnectIfReady(); | |
159 } | |
160 | |
161 void SharedModelTypeProcessor::ConnectIfReady() { | |
162 DCHECK(CalledOnValidThread()); | |
163 if (!is_metadata_loaded_ || !is_initial_pending_data_loaded_ || | |
164 start_callback_.is_null()) { | |
165 return; | |
166 } | |
167 | |
168 std::unique_ptr<ActivationContext> activation_context; | |
169 | |
170 if (!start_error_.IsSet()) { | |
171 activation_context = base::WrapUnique(new ActivationContext); | |
172 activation_context->data_type_state = data_type_state_; | |
173 activation_context->type_processor = base::WrapUnique( | |
174 new ModelTypeProcessorProxy(weak_ptr_factory_.GetWeakPtr(), | |
175 base::ThreadTaskRunnerHandle::Get())); | |
176 } | |
177 | |
178 start_callback_.Run(start_error_, std::move(activation_context)); | |
179 start_callback_.Reset(); | |
180 } | |
181 | |
182 bool SharedModelTypeProcessor::IsAllowingChanges() const { | |
183 return is_metadata_loaded_; | |
184 } | |
185 | |
186 bool SharedModelTypeProcessor::IsConnected() const { | |
187 DCHECK(CalledOnValidThread()); | |
188 return !!worker_; | |
189 } | |
190 | |
191 void SharedModelTypeProcessor::DisableSync() { | |
192 DCHECK(CalledOnValidThread()); | |
193 std::unique_ptr<MetadataChangeList> change_list = | |
194 service_->CreateMetadataChangeList(); | |
195 for (auto it = entities_.begin(); it != entities_.end(); ++it) { | |
196 change_list->ClearMetadata(it->second->client_tag()); | |
197 } | |
198 change_list->ClearDataTypeState(); | |
199 // Nothing to do if this fails, so just ignore the error it might return. | |
200 service_->ApplySyncChanges(std::move(change_list), EntityChangeList()); | |
201 } | |
202 | |
203 syncer::SyncError SharedModelTypeProcessor::CreateAndUploadError( | |
204 const tracked_objects::Location& location, | |
205 const std::string& message) { | |
206 if (error_handler_) { | |
207 return error_handler_->CreateAndUploadError(location, message, type_); | |
208 } else { | |
209 return syncer::SyncError(location, syncer::SyncError::DATATYPE_ERROR, | |
210 message, type_); | |
211 } | |
212 } | |
213 | |
214 void SharedModelTypeProcessor::ConnectSync( | |
215 std::unique_ptr<CommitQueue> worker) { | |
216 DCHECK(CalledOnValidThread()); | |
217 DVLOG(1) << "Successfully connected " << ModelTypeToString(type_); | |
218 | |
219 worker_ = std::move(worker); | |
220 | |
221 FlushPendingCommitRequests(); | |
222 } | |
223 | |
224 void SharedModelTypeProcessor::DisconnectSync() { | |
225 DCHECK(CalledOnValidThread()); | |
226 DCHECK(IsConnected()); | |
227 | |
228 DVLOG(1) << "Disconnecting sync for " << ModelTypeToString(type_); | |
229 weak_ptr_factory_.InvalidateWeakPtrs(); | |
230 worker_.reset(); | |
231 | |
232 for (auto it = entities_.begin(); it != entities_.end(); ++it) { | |
233 it->second->ClearTransientSyncState(); | |
234 } | |
235 } | |
236 | |
237 void SharedModelTypeProcessor::Put(const std::string& tag, | |
238 std::unique_ptr<EntityData> data, | |
239 MetadataChangeList* metadata_change_list) { | |
240 DCHECK(IsAllowingChanges()); | |
241 DCHECK(data.get()); | |
242 DCHECK(!data->is_deleted()); | |
243 DCHECK(!data->non_unique_name.empty()); | |
244 DCHECK_EQ(type_, syncer::GetModelTypeFromSpecifics(data->specifics)); | |
245 | |
246 if (!data_type_state_.initial_sync_done()) { | |
247 // Ignore changes before the initial sync is done. | |
248 return; | |
249 } | |
250 | |
251 // Fill in some data. | |
252 data->client_tag_hash = GetHashForTag(tag); | |
253 if (data->modification_time.is_null()) { | |
254 data->modification_time = base::Time::Now(); | |
255 } | |
256 | |
257 ProcessorEntityTracker* entity = GetEntityForTagHash(data->client_tag_hash); | |
258 | |
259 if (entity == nullptr) { | |
260 // The service is creating a new entity. | |
261 if (data->creation_time.is_null()) { | |
262 data->creation_time = data->modification_time; | |
263 } | |
264 entity = CreateEntity(tag, *data); | |
265 } else if (entity->MatchesData(*data)) { | |
266 // Ignore changes that don't actually change anything. | |
267 return; | |
268 } | |
269 | |
270 entity->MakeLocalChange(std::move(data)); | |
271 metadata_change_list->UpdateMetadata(tag, entity->metadata()); | |
272 | |
273 FlushPendingCommitRequests(); | |
274 } | |
275 | |
276 void SharedModelTypeProcessor::Delete( | |
277 const std::string& tag, | |
278 MetadataChangeList* metadata_change_list) { | |
279 DCHECK(IsAllowingChanges()); | |
280 | |
281 if (!data_type_state_.initial_sync_done()) { | |
282 // Ignore changes before the initial sync is done. | |
283 return; | |
284 } | |
285 | |
286 ProcessorEntityTracker* entity = GetEntityForTag(tag); | |
287 if (entity == nullptr) { | |
288 // That's unusual, but not necessarily a bad thing. | |
289 // Missing is as good as deleted as far as the model is concerned. | |
290 DLOG(WARNING) << "Attempted to delete missing item." | |
291 << " client tag: " << tag; | |
292 return; | |
293 } | |
294 | |
295 entity->Delete(); | |
296 | |
297 metadata_change_list->UpdateMetadata(tag, entity->metadata()); | |
298 FlushPendingCommitRequests(); | |
299 } | |
300 | |
301 void SharedModelTypeProcessor::FlushPendingCommitRequests() { | |
302 CommitRequestDataList commit_requests; | |
303 | |
304 // Don't bother sending anything if there's no one to send to. | |
305 if (!IsConnected()) | |
306 return; | |
307 | |
308 // Don't send anything if the type is not ready to handle commits. | |
309 if (!data_type_state_.initial_sync_done()) | |
310 return; | |
311 | |
312 // TODO(rlarocque): Do something smarter than iterate here. | |
313 for (auto it = entities_.begin(); it != entities_.end(); ++it) { | |
314 ProcessorEntityTracker* entity = it->second.get(); | |
315 if (entity->RequiresCommitRequest() && !entity->RequiresCommitData()) { | |
316 CommitRequestData request; | |
317 entity->InitializeCommitRequestData(&request); | |
318 commit_requests.push_back(request); | |
319 } | |
320 } | |
321 | |
322 if (!commit_requests.empty()) | |
323 worker_->EnqueueForCommit(commit_requests); | |
324 } | |
325 | |
326 void SharedModelTypeProcessor::OnCommitCompleted( | |
327 const sync_pb::DataTypeState& type_state, | |
328 const CommitResponseDataList& response_list) { | |
329 std::unique_ptr<MetadataChangeList> change_list = | |
330 service_->CreateMetadataChangeList(); | |
331 | |
332 data_type_state_ = type_state; | |
333 change_list->UpdateDataTypeState(data_type_state_); | |
334 | |
335 for (const CommitResponseData& data : response_list) { | |
336 ProcessorEntityTracker* entity = GetEntityForTagHash(data.client_tag_hash); | |
337 if (entity == nullptr) { | |
338 NOTREACHED() << "Received commit response for missing item." | |
339 << " type: " << type_ | |
340 << " client_tag_hash: " << data.client_tag_hash; | |
341 continue; | |
342 } | |
343 | |
344 entity->ReceiveCommitResponse(data); | |
345 | |
346 if (entity->CanClearMetadata()) { | |
347 change_list->ClearMetadata(entity->client_tag()); | |
348 entities_.erase(entity->metadata().client_tag_hash()); | |
349 } else { | |
350 change_list->UpdateMetadata(entity->client_tag(), entity->metadata()); | |
351 } | |
352 } | |
353 | |
354 syncer::SyncError error = | |
355 service_->ApplySyncChanges(std::move(change_list), EntityChangeList()); | |
356 if (error.IsSet()) { | |
357 error_handler_->OnSingleDataTypeUnrecoverableError(error); | |
358 } | |
359 } | |
360 | |
361 void SharedModelTypeProcessor::OnUpdateReceived( | |
362 const sync_pb::DataTypeState& data_type_state, | |
363 const UpdateResponseDataList& updates) { | |
364 if (!data_type_state_.initial_sync_done()) { | |
365 OnInitialUpdateReceived(data_type_state, updates); | |
366 return; | |
367 } | |
368 | |
369 std::unique_ptr<MetadataChangeList> metadata_changes = | |
370 service_->CreateMetadataChangeList(); | |
371 EntityChangeList entity_changes; | |
372 | |
373 metadata_changes->UpdateDataTypeState(data_type_state); | |
374 bool got_new_encryption_requirements = | |
375 data_type_state_.encryption_key_name() != | |
376 data_type_state.encryption_key_name(); | |
377 data_type_state_ = data_type_state; | |
378 | |
379 // If new encryption requirements come from the server, the entities that are | |
380 // in |updates| will be recorded here so they can be ignored during the | |
381 // re-encryption phase at the end. | |
382 std::unordered_set<std::string> already_updated; | |
383 | |
384 for (const UpdateResponseData& update : updates) { | |
385 ProcessorEntityTracker* entity = ProcessUpdate(update, &entity_changes); | |
386 | |
387 if (!entity) { | |
388 // The update should be ignored. | |
389 continue; | |
390 } | |
391 | |
392 if (entity->CanClearMetadata()) { | |
393 metadata_changes->ClearMetadata(entity->client_tag()); | |
394 entities_.erase(entity->metadata().client_tag_hash()); | |
395 } else { | |
396 metadata_changes->UpdateMetadata(entity->client_tag(), | |
397 entity->metadata()); | |
398 } | |
399 | |
400 if (got_new_encryption_requirements) { | |
401 already_updated.insert(entity->client_tag()); | |
402 } | |
403 } | |
404 | |
405 if (got_new_encryption_requirements) { | |
406 RecommitAllForEncryption(already_updated, metadata_changes.get()); | |
407 } | |
408 | |
409 // Inform the service of the new or updated data. | |
410 syncer::SyncError error = | |
411 service_->ApplySyncChanges(std::move(metadata_changes), entity_changes); | |
412 | |
413 if (error.IsSet()) { | |
414 error_handler_->OnSingleDataTypeUnrecoverableError(error); | |
415 } else { | |
416 // There may be new reasons to commit by the time this function is done. | |
417 FlushPendingCommitRequests(); | |
418 } | |
419 } | |
420 | |
421 ProcessorEntityTracker* SharedModelTypeProcessor::ProcessUpdate( | |
422 const UpdateResponseData& update, | |
423 EntityChangeList* entity_changes) { | |
424 const EntityData& data = update.entity.value(); | |
425 const std::string& client_tag_hash = data.client_tag_hash; | |
426 ProcessorEntityTracker* entity = GetEntityForTagHash(client_tag_hash); | |
427 if (entity == nullptr) { | |
428 if (data.is_deleted()) { | |
429 DLOG(WARNING) << "Received remote delete for a non-existing item." | |
430 << " client_tag_hash: " << client_tag_hash; | |
431 return nullptr; | |
432 } | |
433 | |
434 entity = CreateEntity(data); | |
435 entity_changes->push_back( | |
436 EntityChange::CreateAdd(entity->client_tag(), update.entity)); | |
437 entity->RecordAcceptedUpdate(update); | |
438 } else if (entity->UpdateIsReflection(update.response_version)) { | |
439 // Seen this update before; just ignore it. | |
440 return nullptr; | |
441 } else if (entity->IsUnsynced()) { | |
442 ConflictResolution::Type resolution_type = | |
443 ResolveConflict(update, entity, entity_changes); | |
444 UMA_HISTOGRAM_ENUMERATION("Sync.ResolveConflict", resolution_type, | |
445 ConflictResolution::TYPE_SIZE); | |
446 } else if (data.is_deleted()) { | |
447 // The entity was deleted; inform the service. Note that the local data | |
448 // can never be deleted at this point because it would have either been | |
449 // acked (the add case) or pending (the conflict case). | |
450 DCHECK(!entity->metadata().is_deleted()); | |
451 entity_changes->push_back(EntityChange::CreateDelete(entity->client_tag())); | |
452 entity->RecordAcceptedUpdate(update); | |
453 } else if (!entity->MatchesData(data)) { | |
454 // Specifics have changed, so update the service. | |
455 entity_changes->push_back( | |
456 EntityChange::CreateUpdate(entity->client_tag(), update.entity)); | |
457 entity->RecordAcceptedUpdate(update); | |
458 } else { | |
459 // No data change; still record that the update was received. | |
460 entity->RecordAcceptedUpdate(update); | |
461 } | |
462 | |
463 // If the received entity has out of date encryption, we schedule another | |
464 // commit to fix it. | |
465 if (data_type_state_.encryption_key_name() != update.encryption_key_name) { | |
466 DVLOG(2) << ModelTypeToString(type_) << ": Requesting re-encrypt commit " | |
467 << update.encryption_key_name << " -> " | |
468 << data_type_state_.encryption_key_name(); | |
469 | |
470 entity->IncrementSequenceNumber(); | |
471 if (entity->RequiresCommitData()) { | |
472 // If there is no pending commit data, then either this update wasn't | |
473 // in conflict or the remote data won; either way the remote data is | |
474 // the right data to re-queue for commit. | |
475 entity->CacheCommitData(update.entity); | |
476 } | |
477 } | |
478 | |
479 return entity; | |
480 } | |
481 | |
482 ConflictResolution::Type SharedModelTypeProcessor::ResolveConflict( | |
483 const UpdateResponseData& update, | |
484 ProcessorEntityTracker* entity, | |
485 EntityChangeList* changes) { | |
486 const EntityData& remote_data = update.entity.value(); | |
487 | |
488 ConflictResolution::Type resolution_type = ConflictResolution::TYPE_SIZE; | |
489 std::unique_ptr<EntityData> new_data; | |
490 | |
491 // Determine the type of resolution. | |
492 if (entity->MatchesData(remote_data)) { | |
493 // The changes are identical so there isn't a real conflict. | |
494 resolution_type = ConflictResolution::CHANGES_MATCH; | |
495 } else if (entity->RequiresCommitData() || | |
496 entity->MatchesBaseData(entity->commit_data().value())) { | |
497 // If commit data needs to be loaded at this point, it can only be due to a | |
498 // re-encryption request. If the commit data matches the base data, it also | |
499 // must be a re-encryption request. Either way there's no real local change | |
500 // and the remote data should win. | |
501 resolution_type = ConflictResolution::IGNORE_LOCAL_ENCRYPTION; | |
502 } else if (entity->MatchesBaseData(remote_data)) { | |
503 // The remote data isn't actually changing from the last remote data that | |
504 // was seen, so it must have been a re-encryption and can be ignored. | |
505 resolution_type = ConflictResolution::IGNORE_REMOTE_ENCRYPTION; | |
506 } else { | |
507 // There's a real data conflict here; let the service resolve it. | |
508 ConflictResolution resolution = | |
509 service_->ResolveConflict(entity->commit_data().value(), remote_data); | |
510 resolution_type = resolution.type(); | |
511 new_data = resolution.ExtractData(); | |
512 } | |
513 | |
514 // Apply the resolution. | |
515 switch (resolution_type) { | |
516 case ConflictResolution::CHANGES_MATCH: | |
517 // Record the update and squash the pending commit. | |
518 entity->RecordForcedUpdate(update); | |
519 break; | |
520 case ConflictResolution::USE_LOCAL: | |
521 case ConflictResolution::IGNORE_REMOTE_ENCRYPTION: | |
522 // Record that we received the update from the server but leave the | |
523 // pending commit intact. | |
524 entity->RecordIgnoredUpdate(update); | |
525 break; | |
526 case ConflictResolution::USE_REMOTE: | |
527 case ConflictResolution::IGNORE_LOCAL_ENCRYPTION: | |
528 // Squash the pending commit. | |
529 entity->RecordForcedUpdate(update); | |
530 // Update client data to match server. | |
531 changes->push_back( | |
532 EntityChange::CreateUpdate(entity->client_tag(), update.entity)); | |
533 break; | |
534 case ConflictResolution::USE_NEW: | |
535 // Record that we received the update. | |
536 entity->RecordIgnoredUpdate(update); | |
537 // Make a new pending commit to update the server. | |
538 entity->MakeLocalChange(std::move(new_data)); | |
539 // Update the client with the new entity. | |
540 changes->push_back(EntityChange::CreateUpdate(entity->client_tag(), | |
541 entity->commit_data())); | |
542 break; | |
543 case ConflictResolution::TYPE_SIZE: | |
544 NOTREACHED(); | |
545 break; | |
546 } | |
547 DCHECK(!new_data); | |
548 | |
549 return resolution_type; | |
550 } | |
551 | |
552 void SharedModelTypeProcessor::RecommitAllForEncryption( | |
553 std::unordered_set<std::string> already_updated, | |
554 MetadataChangeList* metadata_changes) { | |
555 ModelTypeService::ClientTagList entities_needing_data; | |
556 | |
557 for (auto it = entities_.begin(); it != entities_.end(); ++it) { | |
558 ProcessorEntityTracker* entity = it->second.get(); | |
559 if (already_updated.find(entity->client_tag()) != already_updated.end()) { | |
560 continue; | |
561 } | |
562 entity->IncrementSequenceNumber(); | |
563 if (entity->RequiresCommitData()) { | |
564 entities_needing_data.push_back(entity->client_tag()); | |
565 } | |
566 metadata_changes->UpdateMetadata(entity->client_tag(), entity->metadata()); | |
567 } | |
568 | |
569 if (!entities_needing_data.empty()) { | |
570 service_->GetData( | |
571 entities_needing_data, | |
572 base::Bind(&SharedModelTypeProcessor::OnDataLoadedForReEncryption, | |
573 weak_ptr_factory_.GetWeakPtr())); | |
574 } | |
575 } | |
576 | |
577 void SharedModelTypeProcessor::OnInitialUpdateReceived( | |
578 const sync_pb::DataTypeState& data_type_state, | |
579 const UpdateResponseDataList& updates) { | |
580 DCHECK(entities_.empty()); | |
581 // Ensure that initial sync was not already done and that the worker | |
582 // correctly marked initial sync as done for this update. | |
583 DCHECK(!data_type_state_.initial_sync_done()); | |
584 DCHECK(data_type_state.initial_sync_done()); | |
585 | |
586 std::unique_ptr<MetadataChangeList> metadata_changes = | |
587 service_->CreateMetadataChangeList(); | |
588 EntityDataMap data_map; | |
589 | |
590 data_type_state_ = data_type_state; | |
591 metadata_changes->UpdateDataTypeState(data_type_state_); | |
592 | |
593 for (const UpdateResponseData& update : updates) { | |
594 ProcessorEntityTracker* entity = CreateEntity(update.entity.value()); | |
595 const std::string& tag = entity->client_tag(); | |
596 entity->RecordAcceptedUpdate(update); | |
597 metadata_changes->UpdateMetadata(tag, entity->metadata()); | |
598 data_map[tag] = update.entity; | |
599 } | |
600 | |
601 // Let the service handle associating and merging the data. | |
602 syncer::SyncError error = | |
603 service_->MergeSyncData(std::move(metadata_changes), data_map); | |
604 | |
605 if (error.IsSet()) { | |
606 error_handler_->OnSingleDataTypeUnrecoverableError(error); | |
607 } else { | |
608 // We may have new reasons to commit by the time this function is done. | |
609 FlushPendingCommitRequests(); | |
610 } | |
611 } | |
612 | |
613 void SharedModelTypeProcessor::OnInitialPendingDataLoaded( | |
614 syncer::SyncError error, | |
615 std::unique_ptr<DataBatch> data_batch) { | |
616 DCHECK(!is_initial_pending_data_loaded_); | |
617 | |
618 if (error.IsSet()) { | |
619 start_error_ = error; | |
620 } else { | |
621 ConsumeDataBatch(std::move(data_batch)); | |
622 } | |
623 | |
624 is_initial_pending_data_loaded_ = true; | |
625 ConnectIfReady(); | |
626 } | |
627 | |
628 void SharedModelTypeProcessor::OnDataLoadedForReEncryption( | |
629 syncer::SyncError error, | |
630 std::unique_ptr<DataBatch> data_batch) { | |
631 DCHECK(is_initial_pending_data_loaded_); | |
632 | |
633 if (error.IsSet()) { | |
634 error_handler_->OnSingleDataTypeUnrecoverableError(error); | |
635 return; | |
636 } | |
637 | |
638 ConsumeDataBatch(std::move(data_batch)); | |
639 FlushPendingCommitRequests(); | |
640 } | |
641 | |
642 void SharedModelTypeProcessor::ConsumeDataBatch( | |
643 std::unique_ptr<DataBatch> data_batch) { | |
644 while (data_batch->HasNext()) { | |
645 TagAndData data = data_batch->Next(); | |
646 ProcessorEntityTracker* entity = GetEntityForTag(data.first); | |
647 // If the entity wasn't deleted or updated with new commit. | |
648 if (entity != nullptr && entity->RequiresCommitData()) { | |
649 entity->CacheCommitData(data.second.get()); | |
650 } | |
651 } | |
652 } | |
653 | |
654 std::string SharedModelTypeProcessor::GetHashForTag(const std::string& tag) { | |
655 return syncer::syncable::GenerateSyncableHash(type_, tag); | |
656 } | |
657 | |
658 ProcessorEntityTracker* SharedModelTypeProcessor::GetEntityForTag( | |
659 const std::string& tag) { | |
660 return GetEntityForTagHash(GetHashForTag(tag)); | |
661 } | |
662 | |
663 ProcessorEntityTracker* SharedModelTypeProcessor::GetEntityForTagHash( | |
664 const std::string& tag_hash) { | |
665 auto it = entities_.find(tag_hash); | |
666 return it != entities_.end() ? it->second.get() : nullptr; | |
667 } | |
668 | |
669 ProcessorEntityTracker* SharedModelTypeProcessor::CreateEntity( | |
670 const std::string& tag, | |
671 const EntityData& data) { | |
672 DCHECK(entities_.find(data.client_tag_hash) == entities_.end()); | |
673 std::unique_ptr<ProcessorEntityTracker> entity = | |
674 ProcessorEntityTracker::CreateNew(tag, data.client_tag_hash, data.id, | |
675 data.creation_time); | |
676 ProcessorEntityTracker* entity_ptr = entity.get(); | |
677 entities_[data.client_tag_hash] = std::move(entity); | |
678 return entity_ptr; | |
679 } | |
680 | |
681 ProcessorEntityTracker* SharedModelTypeProcessor::CreateEntity( | |
682 const EntityData& data) { | |
683 // Let the service define |client_tag| based on the entity data. | |
684 const std::string tag = service_->GetClientTag(data); | |
685 // This constraint may be relaxed in the future. | |
686 DCHECK_EQ(data.client_tag_hash, GetHashForTag(tag)); | |
687 return CreateEntity(tag, data); | |
688 } | |
689 | |
690 } // namespace syncer_v2 | |
OLD | NEW |