Index: sync/internal_api/shared_model_type_processor.cc |
diff --git a/sync/internal_api/shared_model_type_processor.cc b/sync/internal_api/shared_model_type_processor.cc |
deleted file mode 100644 |
index 8b046240dabaa0a8df7a2e3c8b452994ae202256..0000000000000000000000000000000000000000 |
--- a/sync/internal_api/shared_model_type_processor.cc |
+++ /dev/null |
@@ -1,690 +0,0 @@ |
-// Copyright 2014 The Chromium Authors. All rights reserved. |
-// Use of this source code is governed by a BSD-style license that can be |
-// found in the LICENSE file. |
- |
-#include "sync/internal_api/public/shared_model_type_processor.h" |
- |
-#include <utility> |
-#include <vector> |
- |
-#include "base/bind.h" |
-#include "base/location.h" |
-#include "base/memory/ptr_util.h" |
-#include "base/metrics/histogram.h" |
-#include "base/threading/thread_task_runner_handle.h" |
-#include "sync/engine/commit_queue.h" |
-#include "sync/internal_api/public/activation_context.h" |
-#include "sync/internal_api/public/processor_entity_tracker.h" |
-#include "sync/syncable/syncable_util.h" |
- |
-namespace syncer_v2 { |
- |
-namespace { |
- |
-class ModelTypeProcessorProxy : public ModelTypeProcessor { |
- public: |
- ModelTypeProcessorProxy( |
- const base::WeakPtr<ModelTypeProcessor>& processor, |
- const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner); |
- ~ModelTypeProcessorProxy() override; |
- |
- void ConnectSync(std::unique_ptr<CommitQueue> worker) override; |
- void DisconnectSync() override; |
- void OnCommitCompleted(const sync_pb::DataTypeState& type_state, |
- const CommitResponseDataList& response_list) override; |
- void OnUpdateReceived(const sync_pb::DataTypeState& type_state, |
- const UpdateResponseDataList& updates) override; |
- |
- private: |
- base::WeakPtr<ModelTypeProcessor> processor_; |
- scoped_refptr<base::SequencedTaskRunner> processor_task_runner_; |
-}; |
- |
-ModelTypeProcessorProxy::ModelTypeProcessorProxy( |
- const base::WeakPtr<ModelTypeProcessor>& processor, |
- const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner) |
- : processor_(processor), processor_task_runner_(processor_task_runner) {} |
- |
-ModelTypeProcessorProxy::~ModelTypeProcessorProxy() {} |
- |
-void ModelTypeProcessorProxy::ConnectSync(std::unique_ptr<CommitQueue> worker) { |
- processor_task_runner_->PostTask( |
- FROM_HERE, base::Bind(&ModelTypeProcessor::ConnectSync, processor_, |
- base::Passed(std::move(worker)))); |
-} |
- |
-void ModelTypeProcessorProxy::DisconnectSync() { |
- processor_task_runner_->PostTask( |
- FROM_HERE, base::Bind(&ModelTypeProcessor::DisconnectSync, processor_)); |
-} |
- |
-void ModelTypeProcessorProxy::OnCommitCompleted( |
- const sync_pb::DataTypeState& type_state, |
- const CommitResponseDataList& response_list) { |
- processor_task_runner_->PostTask( |
- FROM_HERE, base::Bind(&ModelTypeProcessor::OnCommitCompleted, processor_, |
- type_state, response_list)); |
-} |
- |
-void ModelTypeProcessorProxy::OnUpdateReceived( |
- const sync_pb::DataTypeState& type_state, |
- const UpdateResponseDataList& updates) { |
- processor_task_runner_->PostTask( |
- FROM_HERE, base::Bind(&ModelTypeProcessor::OnUpdateReceived, processor_, |
- type_state, updates)); |
-} |
- |
-} // namespace |
- |
-SharedModelTypeProcessor::SharedModelTypeProcessor(syncer::ModelType type, |
- ModelTypeService* service) |
- : type_(type), |
- is_metadata_loaded_(false), |
- is_initial_pending_data_loaded_(false), |
- service_(service), |
- error_handler_(nullptr), |
- weak_ptr_factory_(this) { |
- DCHECK(service); |
-} |
- |
-SharedModelTypeProcessor::~SharedModelTypeProcessor() {} |
- |
-// static |
-std::unique_ptr<ModelTypeChangeProcessor> |
-SharedModelTypeProcessor::CreateAsChangeProcessor(syncer::ModelType type, |
- ModelTypeService* service) { |
- return std::unique_ptr<ModelTypeChangeProcessor>( |
- new SharedModelTypeProcessor(type, service)); |
-} |
- |
-void SharedModelTypeProcessor::OnSyncStarting( |
- syncer::DataTypeErrorHandler* error_handler, |
- const StartCallback& start_callback) { |
- DCHECK(CalledOnValidThread()); |
- DCHECK(start_callback_.is_null()); |
- DCHECK(!IsConnected()); |
- DCHECK(error_handler); |
- DVLOG(1) << "Sync is starting for " << ModelTypeToString(type_); |
- |
- error_handler_ = error_handler; |
- start_callback_ = start_callback; |
- ConnectIfReady(); |
-} |
- |
-void SharedModelTypeProcessor::OnMetadataLoaded( |
- syncer::SyncError error, |
- std::unique_ptr<MetadataBatch> batch) { |
- DCHECK(CalledOnValidThread()); |
- DCHECK(entities_.empty()); |
- DCHECK(!is_metadata_loaded_); |
- DCHECK(!IsConnected()); |
- |
- is_metadata_loaded_ = true; |
- // Flip this flag here to cover all cases where we don't need to load data. |
- is_initial_pending_data_loaded_ = true; |
- |
- if (error.IsSet()) { |
- start_error_ = error; |
- ConnectIfReady(); |
- return; |
- } |
- |
- if (batch->GetDataTypeState().initial_sync_done()) { |
- EntityMetadataMap metadata_map(batch->TakeAllMetadata()); |
- std::vector<std::string> entities_to_commit; |
- |
- for (auto it = metadata_map.begin(); it != metadata_map.end(); it++) { |
- std::unique_ptr<ProcessorEntityTracker> entity = |
- ProcessorEntityTracker::CreateFromMetadata(it->first, &it->second); |
- if (entity->RequiresCommitData()) { |
- entities_to_commit.push_back(entity->client_tag()); |
- } |
- entities_[entity->metadata().client_tag_hash()] = std::move(entity); |
- } |
- data_type_state_ = batch->GetDataTypeState(); |
- if (!entities_to_commit.empty()) { |
- is_initial_pending_data_loaded_ = false; |
- service_->GetData( |
- entities_to_commit, |
- base::Bind(&SharedModelTypeProcessor::OnInitialPendingDataLoaded, |
- weak_ptr_factory_.GetWeakPtr())); |
- } |
- } else { |
- // First time syncing; initialize metadata. |
- data_type_state_.mutable_progress_marker()->set_data_type_id( |
- GetSpecificsFieldNumberFromModelType(type_)); |
- } |
- |
- ConnectIfReady(); |
-} |
- |
-void SharedModelTypeProcessor::ConnectIfReady() { |
- DCHECK(CalledOnValidThread()); |
- if (!is_metadata_loaded_ || !is_initial_pending_data_loaded_ || |
- start_callback_.is_null()) { |
- return; |
- } |
- |
- std::unique_ptr<ActivationContext> activation_context; |
- |
- if (!start_error_.IsSet()) { |
- activation_context = base::WrapUnique(new ActivationContext); |
- activation_context->data_type_state = data_type_state_; |
- activation_context->type_processor = base::WrapUnique( |
- new ModelTypeProcessorProxy(weak_ptr_factory_.GetWeakPtr(), |
- base::ThreadTaskRunnerHandle::Get())); |
- } |
- |
- start_callback_.Run(start_error_, std::move(activation_context)); |
- start_callback_.Reset(); |
-} |
- |
-bool SharedModelTypeProcessor::IsAllowingChanges() const { |
- return is_metadata_loaded_; |
-} |
- |
-bool SharedModelTypeProcessor::IsConnected() const { |
- DCHECK(CalledOnValidThread()); |
- return !!worker_; |
-} |
- |
-void SharedModelTypeProcessor::DisableSync() { |
- DCHECK(CalledOnValidThread()); |
- std::unique_ptr<MetadataChangeList> change_list = |
- service_->CreateMetadataChangeList(); |
- for (auto it = entities_.begin(); it != entities_.end(); ++it) { |
- change_list->ClearMetadata(it->second->client_tag()); |
- } |
- change_list->ClearDataTypeState(); |
- // Nothing to do if this fails, so just ignore the error it might return. |
- service_->ApplySyncChanges(std::move(change_list), EntityChangeList()); |
-} |
- |
-syncer::SyncError SharedModelTypeProcessor::CreateAndUploadError( |
- const tracked_objects::Location& location, |
- const std::string& message) { |
- if (error_handler_) { |
- return error_handler_->CreateAndUploadError(location, message, type_); |
- } else { |
- return syncer::SyncError(location, syncer::SyncError::DATATYPE_ERROR, |
- message, type_); |
- } |
-} |
- |
-void SharedModelTypeProcessor::ConnectSync( |
- std::unique_ptr<CommitQueue> worker) { |
- DCHECK(CalledOnValidThread()); |
- DVLOG(1) << "Successfully connected " << ModelTypeToString(type_); |
- |
- worker_ = std::move(worker); |
- |
- FlushPendingCommitRequests(); |
-} |
- |
-void SharedModelTypeProcessor::DisconnectSync() { |
- DCHECK(CalledOnValidThread()); |
- DCHECK(IsConnected()); |
- |
- DVLOG(1) << "Disconnecting sync for " << ModelTypeToString(type_); |
- weak_ptr_factory_.InvalidateWeakPtrs(); |
- worker_.reset(); |
- |
- for (auto it = entities_.begin(); it != entities_.end(); ++it) { |
- it->second->ClearTransientSyncState(); |
- } |
-} |
- |
-void SharedModelTypeProcessor::Put(const std::string& tag, |
- std::unique_ptr<EntityData> data, |
- MetadataChangeList* metadata_change_list) { |
- DCHECK(IsAllowingChanges()); |
- DCHECK(data.get()); |
- DCHECK(!data->is_deleted()); |
- DCHECK(!data->non_unique_name.empty()); |
- DCHECK_EQ(type_, syncer::GetModelTypeFromSpecifics(data->specifics)); |
- |
- if (!data_type_state_.initial_sync_done()) { |
- // Ignore changes before the initial sync is done. |
- return; |
- } |
- |
- // Fill in some data. |
- data->client_tag_hash = GetHashForTag(tag); |
- if (data->modification_time.is_null()) { |
- data->modification_time = base::Time::Now(); |
- } |
- |
- ProcessorEntityTracker* entity = GetEntityForTagHash(data->client_tag_hash); |
- |
- if (entity == nullptr) { |
- // The service is creating a new entity. |
- if (data->creation_time.is_null()) { |
- data->creation_time = data->modification_time; |
- } |
- entity = CreateEntity(tag, *data); |
- } else if (entity->MatchesData(*data)) { |
- // Ignore changes that don't actually change anything. |
- return; |
- } |
- |
- entity->MakeLocalChange(std::move(data)); |
- metadata_change_list->UpdateMetadata(tag, entity->metadata()); |
- |
- FlushPendingCommitRequests(); |
-} |
- |
-void SharedModelTypeProcessor::Delete( |
- const std::string& tag, |
- MetadataChangeList* metadata_change_list) { |
- DCHECK(IsAllowingChanges()); |
- |
- if (!data_type_state_.initial_sync_done()) { |
- // Ignore changes before the initial sync is done. |
- return; |
- } |
- |
- ProcessorEntityTracker* entity = GetEntityForTag(tag); |
- if (entity == nullptr) { |
- // That's unusual, but not necessarily a bad thing. |
- // Missing is as good as deleted as far as the model is concerned. |
- DLOG(WARNING) << "Attempted to delete missing item." |
- << " client tag: " << tag; |
- return; |
- } |
- |
- entity->Delete(); |
- |
- metadata_change_list->UpdateMetadata(tag, entity->metadata()); |
- FlushPendingCommitRequests(); |
-} |
- |
-void SharedModelTypeProcessor::FlushPendingCommitRequests() { |
- CommitRequestDataList commit_requests; |
- |
- // Don't bother sending anything if there's no one to send to. |
- if (!IsConnected()) |
- return; |
- |
- // Don't send anything if the type is not ready to handle commits. |
- if (!data_type_state_.initial_sync_done()) |
- return; |
- |
- // TODO(rlarocque): Do something smarter than iterate here. |
- for (auto it = entities_.begin(); it != entities_.end(); ++it) { |
- ProcessorEntityTracker* entity = it->second.get(); |
- if (entity->RequiresCommitRequest() && !entity->RequiresCommitData()) { |
- CommitRequestData request; |
- entity->InitializeCommitRequestData(&request); |
- commit_requests.push_back(request); |
- } |
- } |
- |
- if (!commit_requests.empty()) |
- worker_->EnqueueForCommit(commit_requests); |
-} |
- |
-void SharedModelTypeProcessor::OnCommitCompleted( |
- const sync_pb::DataTypeState& type_state, |
- const CommitResponseDataList& response_list) { |
- std::unique_ptr<MetadataChangeList> change_list = |
- service_->CreateMetadataChangeList(); |
- |
- data_type_state_ = type_state; |
- change_list->UpdateDataTypeState(data_type_state_); |
- |
- for (const CommitResponseData& data : response_list) { |
- ProcessorEntityTracker* entity = GetEntityForTagHash(data.client_tag_hash); |
- if (entity == nullptr) { |
- NOTREACHED() << "Received commit response for missing item." |
- << " type: " << type_ |
- << " client_tag_hash: " << data.client_tag_hash; |
- continue; |
- } |
- |
- entity->ReceiveCommitResponse(data); |
- |
- if (entity->CanClearMetadata()) { |
- change_list->ClearMetadata(entity->client_tag()); |
- entities_.erase(entity->metadata().client_tag_hash()); |
- } else { |
- change_list->UpdateMetadata(entity->client_tag(), entity->metadata()); |
- } |
- } |
- |
- syncer::SyncError error = |
- service_->ApplySyncChanges(std::move(change_list), EntityChangeList()); |
- if (error.IsSet()) { |
- error_handler_->OnSingleDataTypeUnrecoverableError(error); |
- } |
-} |
- |
-void SharedModelTypeProcessor::OnUpdateReceived( |
- const sync_pb::DataTypeState& data_type_state, |
- const UpdateResponseDataList& updates) { |
- if (!data_type_state_.initial_sync_done()) { |
- OnInitialUpdateReceived(data_type_state, updates); |
- return; |
- } |
- |
- std::unique_ptr<MetadataChangeList> metadata_changes = |
- service_->CreateMetadataChangeList(); |
- EntityChangeList entity_changes; |
- |
- metadata_changes->UpdateDataTypeState(data_type_state); |
- bool got_new_encryption_requirements = |
- data_type_state_.encryption_key_name() != |
- data_type_state.encryption_key_name(); |
- data_type_state_ = data_type_state; |
- |
- // If new encryption requirements come from the server, the entities that are |
- // in |updates| will be recorded here so they can be ignored during the |
- // re-encryption phase at the end. |
- std::unordered_set<std::string> already_updated; |
- |
- for (const UpdateResponseData& update : updates) { |
- ProcessorEntityTracker* entity = ProcessUpdate(update, &entity_changes); |
- |
- if (!entity) { |
- // The update should be ignored. |
- continue; |
- } |
- |
- if (entity->CanClearMetadata()) { |
- metadata_changes->ClearMetadata(entity->client_tag()); |
- entities_.erase(entity->metadata().client_tag_hash()); |
- } else { |
- metadata_changes->UpdateMetadata(entity->client_tag(), |
- entity->metadata()); |
- } |
- |
- if (got_new_encryption_requirements) { |
- already_updated.insert(entity->client_tag()); |
- } |
- } |
- |
- if (got_new_encryption_requirements) { |
- RecommitAllForEncryption(already_updated, metadata_changes.get()); |
- } |
- |
- // Inform the service of the new or updated data. |
- syncer::SyncError error = |
- service_->ApplySyncChanges(std::move(metadata_changes), entity_changes); |
- |
- if (error.IsSet()) { |
- error_handler_->OnSingleDataTypeUnrecoverableError(error); |
- } else { |
- // There may be new reasons to commit by the time this function is done. |
- FlushPendingCommitRequests(); |
- } |
-} |
- |
-ProcessorEntityTracker* SharedModelTypeProcessor::ProcessUpdate( |
- const UpdateResponseData& update, |
- EntityChangeList* entity_changes) { |
- const EntityData& data = update.entity.value(); |
- const std::string& client_tag_hash = data.client_tag_hash; |
- ProcessorEntityTracker* entity = GetEntityForTagHash(client_tag_hash); |
- if (entity == nullptr) { |
- if (data.is_deleted()) { |
- DLOG(WARNING) << "Received remote delete for a non-existing item." |
- << " client_tag_hash: " << client_tag_hash; |
- return nullptr; |
- } |
- |
- entity = CreateEntity(data); |
- entity_changes->push_back( |
- EntityChange::CreateAdd(entity->client_tag(), update.entity)); |
- entity->RecordAcceptedUpdate(update); |
- } else if (entity->UpdateIsReflection(update.response_version)) { |
- // Seen this update before; just ignore it. |
- return nullptr; |
- } else if (entity->IsUnsynced()) { |
- ConflictResolution::Type resolution_type = |
- ResolveConflict(update, entity, entity_changes); |
- UMA_HISTOGRAM_ENUMERATION("Sync.ResolveConflict", resolution_type, |
- ConflictResolution::TYPE_SIZE); |
- } else if (data.is_deleted()) { |
- // The entity was deleted; inform the service. Note that the local data |
- // can never be deleted at this point because it would have either been |
- // acked (the add case) or pending (the conflict case). |
- DCHECK(!entity->metadata().is_deleted()); |
- entity_changes->push_back(EntityChange::CreateDelete(entity->client_tag())); |
- entity->RecordAcceptedUpdate(update); |
- } else if (!entity->MatchesData(data)) { |
- // Specifics have changed, so update the service. |
- entity_changes->push_back( |
- EntityChange::CreateUpdate(entity->client_tag(), update.entity)); |
- entity->RecordAcceptedUpdate(update); |
- } else { |
- // No data change; still record that the update was received. |
- entity->RecordAcceptedUpdate(update); |
- } |
- |
- // If the received entity has out of date encryption, we schedule another |
- // commit to fix it. |
- if (data_type_state_.encryption_key_name() != update.encryption_key_name) { |
- DVLOG(2) << ModelTypeToString(type_) << ": Requesting re-encrypt commit " |
- << update.encryption_key_name << " -> " |
- << data_type_state_.encryption_key_name(); |
- |
- entity->IncrementSequenceNumber(); |
- if (entity->RequiresCommitData()) { |
- // If there is no pending commit data, then either this update wasn't |
- // in conflict or the remote data won; either way the remote data is |
- // the right data to re-queue for commit. |
- entity->CacheCommitData(update.entity); |
- } |
- } |
- |
- return entity; |
-} |
- |
-ConflictResolution::Type SharedModelTypeProcessor::ResolveConflict( |
- const UpdateResponseData& update, |
- ProcessorEntityTracker* entity, |
- EntityChangeList* changes) { |
- const EntityData& remote_data = update.entity.value(); |
- |
- ConflictResolution::Type resolution_type = ConflictResolution::TYPE_SIZE; |
- std::unique_ptr<EntityData> new_data; |
- |
- // Determine the type of resolution. |
- if (entity->MatchesData(remote_data)) { |
- // The changes are identical so there isn't a real conflict. |
- resolution_type = ConflictResolution::CHANGES_MATCH; |
- } else if (entity->RequiresCommitData() || |
- entity->MatchesBaseData(entity->commit_data().value())) { |
- // If commit data needs to be loaded at this point, it can only be due to a |
- // re-encryption request. If the commit data matches the base data, it also |
- // must be a re-encryption request. Either way there's no real local change |
- // and the remote data should win. |
- resolution_type = ConflictResolution::IGNORE_LOCAL_ENCRYPTION; |
- } else if (entity->MatchesBaseData(remote_data)) { |
- // The remote data isn't actually changing from the last remote data that |
- // was seen, so it must have been a re-encryption and can be ignored. |
- resolution_type = ConflictResolution::IGNORE_REMOTE_ENCRYPTION; |
- } else { |
- // There's a real data conflict here; let the service resolve it. |
- ConflictResolution resolution = |
- service_->ResolveConflict(entity->commit_data().value(), remote_data); |
- resolution_type = resolution.type(); |
- new_data = resolution.ExtractData(); |
- } |
- |
- // Apply the resolution. |
- switch (resolution_type) { |
- case ConflictResolution::CHANGES_MATCH: |
- // Record the update and squash the pending commit. |
- entity->RecordForcedUpdate(update); |
- break; |
- case ConflictResolution::USE_LOCAL: |
- case ConflictResolution::IGNORE_REMOTE_ENCRYPTION: |
- // Record that we received the update from the server but leave the |
- // pending commit intact. |
- entity->RecordIgnoredUpdate(update); |
- break; |
- case ConflictResolution::USE_REMOTE: |
- case ConflictResolution::IGNORE_LOCAL_ENCRYPTION: |
- // Squash the pending commit. |
- entity->RecordForcedUpdate(update); |
- // Update client data to match server. |
- changes->push_back( |
- EntityChange::CreateUpdate(entity->client_tag(), update.entity)); |
- break; |
- case ConflictResolution::USE_NEW: |
- // Record that we received the update. |
- entity->RecordIgnoredUpdate(update); |
- // Make a new pending commit to update the server. |
- entity->MakeLocalChange(std::move(new_data)); |
- // Update the client with the new entity. |
- changes->push_back(EntityChange::CreateUpdate(entity->client_tag(), |
- entity->commit_data())); |
- break; |
- case ConflictResolution::TYPE_SIZE: |
- NOTREACHED(); |
- break; |
- } |
- DCHECK(!new_data); |
- |
- return resolution_type; |
-} |
- |
-void SharedModelTypeProcessor::RecommitAllForEncryption( |
- std::unordered_set<std::string> already_updated, |
- MetadataChangeList* metadata_changes) { |
- ModelTypeService::ClientTagList entities_needing_data; |
- |
- for (auto it = entities_.begin(); it != entities_.end(); ++it) { |
- ProcessorEntityTracker* entity = it->second.get(); |
- if (already_updated.find(entity->client_tag()) != already_updated.end()) { |
- continue; |
- } |
- entity->IncrementSequenceNumber(); |
- if (entity->RequiresCommitData()) { |
- entities_needing_data.push_back(entity->client_tag()); |
- } |
- metadata_changes->UpdateMetadata(entity->client_tag(), entity->metadata()); |
- } |
- |
- if (!entities_needing_data.empty()) { |
- service_->GetData( |
- entities_needing_data, |
- base::Bind(&SharedModelTypeProcessor::OnDataLoadedForReEncryption, |
- weak_ptr_factory_.GetWeakPtr())); |
- } |
-} |
- |
-void SharedModelTypeProcessor::OnInitialUpdateReceived( |
- const sync_pb::DataTypeState& data_type_state, |
- const UpdateResponseDataList& updates) { |
- DCHECK(entities_.empty()); |
- // Ensure that initial sync was not already done and that the worker |
- // correctly marked initial sync as done for this update. |
- DCHECK(!data_type_state_.initial_sync_done()); |
- DCHECK(data_type_state.initial_sync_done()); |
- |
- std::unique_ptr<MetadataChangeList> metadata_changes = |
- service_->CreateMetadataChangeList(); |
- EntityDataMap data_map; |
- |
- data_type_state_ = data_type_state; |
- metadata_changes->UpdateDataTypeState(data_type_state_); |
- |
- for (const UpdateResponseData& update : updates) { |
- ProcessorEntityTracker* entity = CreateEntity(update.entity.value()); |
- const std::string& tag = entity->client_tag(); |
- entity->RecordAcceptedUpdate(update); |
- metadata_changes->UpdateMetadata(tag, entity->metadata()); |
- data_map[tag] = update.entity; |
- } |
- |
- // Let the service handle associating and merging the data. |
- syncer::SyncError error = |
- service_->MergeSyncData(std::move(metadata_changes), data_map); |
- |
- if (error.IsSet()) { |
- error_handler_->OnSingleDataTypeUnrecoverableError(error); |
- } else { |
- // We may have new reasons to commit by the time this function is done. |
- FlushPendingCommitRequests(); |
- } |
-} |
- |
-void SharedModelTypeProcessor::OnInitialPendingDataLoaded( |
- syncer::SyncError error, |
- std::unique_ptr<DataBatch> data_batch) { |
- DCHECK(!is_initial_pending_data_loaded_); |
- |
- if (error.IsSet()) { |
- start_error_ = error; |
- } else { |
- ConsumeDataBatch(std::move(data_batch)); |
- } |
- |
- is_initial_pending_data_loaded_ = true; |
- ConnectIfReady(); |
-} |
- |
-void SharedModelTypeProcessor::OnDataLoadedForReEncryption( |
- syncer::SyncError error, |
- std::unique_ptr<DataBatch> data_batch) { |
- DCHECK(is_initial_pending_data_loaded_); |
- |
- if (error.IsSet()) { |
- error_handler_->OnSingleDataTypeUnrecoverableError(error); |
- return; |
- } |
- |
- ConsumeDataBatch(std::move(data_batch)); |
- FlushPendingCommitRequests(); |
-} |
- |
-void SharedModelTypeProcessor::ConsumeDataBatch( |
- std::unique_ptr<DataBatch> data_batch) { |
- while (data_batch->HasNext()) { |
- TagAndData data = data_batch->Next(); |
- ProcessorEntityTracker* entity = GetEntityForTag(data.first); |
- // If the entity wasn't deleted or updated with new commit. |
- if (entity != nullptr && entity->RequiresCommitData()) { |
- entity->CacheCommitData(data.second.get()); |
- } |
- } |
-} |
- |
-std::string SharedModelTypeProcessor::GetHashForTag(const std::string& tag) { |
- return syncer::syncable::GenerateSyncableHash(type_, tag); |
-} |
- |
-ProcessorEntityTracker* SharedModelTypeProcessor::GetEntityForTag( |
- const std::string& tag) { |
- return GetEntityForTagHash(GetHashForTag(tag)); |
-} |
- |
-ProcessorEntityTracker* SharedModelTypeProcessor::GetEntityForTagHash( |
- const std::string& tag_hash) { |
- auto it = entities_.find(tag_hash); |
- return it != entities_.end() ? it->second.get() : nullptr; |
-} |
- |
-ProcessorEntityTracker* SharedModelTypeProcessor::CreateEntity( |
- const std::string& tag, |
- const EntityData& data) { |
- DCHECK(entities_.find(data.client_tag_hash) == entities_.end()); |
- std::unique_ptr<ProcessorEntityTracker> entity = |
- ProcessorEntityTracker::CreateNew(tag, data.client_tag_hash, data.id, |
- data.creation_time); |
- ProcessorEntityTracker* entity_ptr = entity.get(); |
- entities_[data.client_tag_hash] = std::move(entity); |
- return entity_ptr; |
-} |
- |
-ProcessorEntityTracker* SharedModelTypeProcessor::CreateEntity( |
- const EntityData& data) { |
- // Let the service define |client_tag| based on the entity data. |
- const std::string tag = service_->GetClientTag(data); |
- // This constraint may be relaxed in the future. |
- DCHECK_EQ(data.client_tag_hash, GetHashForTag(tag)); |
- return CreateEntity(tag, data); |
-} |
- |
-} // namespace syncer_v2 |