| Index: sync/engine/non_blocking_type_processor_core.cc
|
| diff --git a/sync/engine/non_blocking_type_processor_core.cc b/sync/engine/non_blocking_type_processor_core.cc
|
| deleted file mode 100644
|
| index a14a8d003a1c41f62c1b438940e8e6f93d3dbc7e..0000000000000000000000000000000000000000
|
| --- a/sync/engine/non_blocking_type_processor_core.cc
|
| +++ /dev/null
|
| @@ -1,286 +0,0 @@
|
| -// Copyright 2014 The Chromium Authors. All rights reserved.
|
| -// Use of this source code is governed by a BSD-style license that can be
|
| -// found in the LICENSE file.
|
| -
|
| -#include "sync/engine/non_blocking_type_processor_core.h"
|
| -
|
| -#include "base/bind.h"
|
| -#include "base/format_macros.h"
|
| -#include "base/logging.h"
|
| -#include "base/strings/stringprintf.h"
|
| -#include "sync/engine/commit_contribution.h"
|
| -#include "sync/engine/non_blocking_type_commit_contribution.h"
|
| -#include "sync/engine/non_blocking_type_processor_interface.h"
|
| -#include "sync/engine/sync_thread_sync_entity.h"
|
| -#include "sync/syncable/syncable_util.h"
|
| -#include "sync/util/time.h"
|
| -
|
| -namespace syncer {
|
| -
|
| -NonBlockingTypeProcessorCore::NonBlockingTypeProcessorCore(
|
| - ModelType type,
|
| - const DataTypeState& initial_state,
|
| - scoped_ptr<NonBlockingTypeProcessorInterface> processor_interface)
|
| - : type_(type),
|
| - data_type_state_(initial_state),
|
| - processor_interface_(processor_interface.Pass()),
|
| - entities_deleter_(&entities_),
|
| - weak_ptr_factory_(this) {
|
| -}
|
| -
|
| -NonBlockingTypeProcessorCore::~NonBlockingTypeProcessorCore() {
|
| -}
|
| -
|
| -ModelType NonBlockingTypeProcessorCore::GetModelType() const {
|
| - DCHECK(CalledOnValidThread());
|
| - return type_;
|
| -}
|
| -
|
| -// UpdateHandler implementation.
|
| -void NonBlockingTypeProcessorCore::GetDownloadProgress(
|
| - sync_pb::DataTypeProgressMarker* progress_marker) const {
|
| - DCHECK(CalledOnValidThread());
|
| - progress_marker->CopyFrom(data_type_state_.progress_marker);
|
| -}
|
| -
|
| -void NonBlockingTypeProcessorCore::GetDataTypeContext(
|
| - sync_pb::DataTypeContext* context) const {
|
| - DCHECK(CalledOnValidThread());
|
| - context->CopyFrom(data_type_state_.type_context);
|
| -}
|
| -
|
| -SyncerError NonBlockingTypeProcessorCore::ProcessGetUpdatesResponse(
|
| - const sync_pb::DataTypeProgressMarker& progress_marker,
|
| - const sync_pb::DataTypeContext& mutated_context,
|
| - const SyncEntityList& applicable_updates,
|
| - sessions::StatusController* status) {
|
| - DCHECK(CalledOnValidThread());
|
| -
|
| - // TODO(rlarocque): Handle data type context conflicts.
|
| - data_type_state_.type_context = mutated_context;
|
| - data_type_state_.progress_marker = progress_marker;
|
| -
|
| - UpdateResponseDataList response_datas;
|
| -
|
| - for (SyncEntityList::const_iterator update_it = applicable_updates.begin();
|
| - update_it != applicable_updates.end();
|
| - ++update_it) {
|
| - const sync_pb::SyncEntity* update_entity = *update_it;
|
| - if (!update_entity->server_defined_unique_tag().empty()) {
|
| - // We can't commit an item unless we know its parent ID. This is where
|
| - // we learn that ID and remember it forever.
|
| - DCHECK_EQ(ModelTypeToRootTag(type_),
|
| - update_entity->server_defined_unique_tag());
|
| - if (!data_type_state_.type_root_id.empty()) {
|
| - DCHECK_EQ(data_type_state_.type_root_id, update_entity->id_string());
|
| - }
|
| - data_type_state_.type_root_id = update_entity->id_string();
|
| - } else {
|
| - // Normal updates are handled here.
|
| - const std::string& client_tag_hash =
|
| - update_entity->client_defined_unique_tag();
|
| - DCHECK(!client_tag_hash.empty());
|
| - EntityMap::const_iterator map_it = entities_.find(client_tag_hash);
|
| - if (map_it == entities_.end()) {
|
| - SyncThreadSyncEntity* entity =
|
| - SyncThreadSyncEntity::FromServerUpdate(update_entity->id_string(),
|
| - client_tag_hash,
|
| - update_entity->version());
|
| - entities_.insert(std::make_pair(client_tag_hash, entity));
|
| - } else {
|
| - SyncThreadSyncEntity* entity = map_it->second;
|
| - entity->ReceiveUpdate(update_entity->version());
|
| - }
|
| -
|
| - // Prepare the message for the model thread.
|
| - UpdateResponseData response_data;
|
| - response_data.id = update_entity->id_string();
|
| - response_data.client_tag_hash = client_tag_hash;
|
| - response_data.response_version = update_entity->version();
|
| - response_data.ctime = ProtoTimeToTime(update_entity->ctime());
|
| - response_data.mtime = ProtoTimeToTime(update_entity->mtime());
|
| - response_data.non_unique_name = update_entity->name();
|
| - response_data.deleted = update_entity->deleted();
|
| - response_data.specifics = update_entity->specifics();
|
| -
|
| - response_datas.push_back(response_data);
|
| - }
|
| - }
|
| -
|
| - // Forward these updates to the model thread so it can do the rest.
|
| - processor_interface_->ReceiveUpdateResponse(data_type_state_, response_datas);
|
| -
|
| - return SYNCER_OK;
|
| -}
|
| -
|
| -void NonBlockingTypeProcessorCore::ApplyUpdates(
|
| - sessions::StatusController* status) {
|
| - DCHECK(CalledOnValidThread());
|
| - // This function is called only when we've finished a download cycle, ie. we
|
| - // got a response with changes_remaining == 0. If this is our first download
|
| - // cycle, we should update our state so the NonBlockingTypeProcessor knows
|
| - // that it's safe to commit items now.
|
| - if (!data_type_state_.initial_sync_done) {
|
| - data_type_state_.initial_sync_done = true;
|
| -
|
| - UpdateResponseDataList empty_update_list;
|
| - processor_interface_->ReceiveUpdateResponse(data_type_state_,
|
| - empty_update_list);
|
| - }
|
| -}
|
| -
|
| -void NonBlockingTypeProcessorCore::PassiveApplyUpdates(
|
| - sessions::StatusController* status) {
|
| - NOTREACHED()
|
| - << "Non-blocking types should never apply updates on sync thread. "
|
| - << "ModelType is: " << ModelTypeToString(type_);
|
| -}
|
| -
|
| -void NonBlockingTypeProcessorCore::EnqueueForCommit(
|
| - const CommitRequestDataList& list) {
|
| - DCHECK(CalledOnValidThread());
|
| -
|
| - DCHECK(CanCommitItems())
|
| - << "Asked to commit items before type was initialized. "
|
| - << "ModelType is: " << ModelTypeToString(type_);
|
| -
|
| - for (CommitRequestDataList::const_iterator it = list.begin();
|
| - it != list.end();
|
| - ++it) {
|
| - StorePendingCommit(*it);
|
| - }
|
| -}
|
| -
|
| -// CommitContributor implementation.
|
| -scoped_ptr<CommitContribution>
|
| -NonBlockingTypeProcessorCore::GetContribution(size_t max_entries) {
|
| - DCHECK(CalledOnValidThread());
|
| -
|
| - size_t space_remaining = max_entries;
|
| - std::vector<int64> sequence_numbers;
|
| - google::protobuf::RepeatedPtrField<sync_pb::SyncEntity> commit_entities;
|
| -
|
| - if (!CanCommitItems())
|
| - return scoped_ptr<CommitContribution>();
|
| -
|
| - // TODO(rlarocque): Avoid iterating here.
|
| - for (EntityMap::const_iterator it = entities_.begin();
|
| - it != entities_.end() && space_remaining > 0;
|
| - ++it) {
|
| - SyncThreadSyncEntity* entity = it->second;
|
| - if (entity->IsCommitPending()) {
|
| - sync_pb::SyncEntity* commit_entity = commit_entities.Add();
|
| - int64 sequence_number = -1;
|
| -
|
| - entity->PrepareCommitProto(commit_entity, &sequence_number);
|
| - HelpInitializeCommitEntity(commit_entity);
|
| - sequence_numbers.push_back(sequence_number);
|
| -
|
| - space_remaining--;
|
| - }
|
| - }
|
| -
|
| - if (commit_entities.size() == 0)
|
| - return scoped_ptr<CommitContribution>();
|
| -
|
| - return scoped_ptr<CommitContribution>(new NonBlockingTypeCommitContribution(
|
| - data_type_state_.type_context, commit_entities, sequence_numbers, this));
|
| -}
|
| -
|
| -void NonBlockingTypeProcessorCore::StorePendingCommit(
|
| - const CommitRequestData& request) {
|
| - if (!request.deleted) {
|
| - DCHECK_EQ(type_, GetModelTypeFromSpecifics(request.specifics));
|
| - }
|
| -
|
| - EntityMap::iterator map_it = entities_.find(request.client_tag_hash);
|
| - if (map_it == entities_.end()) {
|
| - SyncThreadSyncEntity* entity =
|
| - SyncThreadSyncEntity::FromCommitRequest(request.id,
|
| - request.client_tag_hash,
|
| - request.sequence_number,
|
| - request.base_version,
|
| - request.ctime,
|
| - request.mtime,
|
| - request.non_unique_name,
|
| - request.deleted,
|
| - request.specifics);
|
| - entities_.insert(std::make_pair(request.client_tag_hash, entity));
|
| - } else {
|
| - SyncThreadSyncEntity* entity = map_it->second;
|
| - entity->RequestCommit(request.id,
|
| - request.client_tag_hash,
|
| - request.sequence_number,
|
| - request.base_version,
|
| - request.ctime,
|
| - request.mtime,
|
| - request.non_unique_name,
|
| - request.deleted,
|
| - request.specifics);
|
| - }
|
| -
|
| - // TODO: Nudge SyncScheduler.
|
| -}
|
| -
|
| -void NonBlockingTypeProcessorCore::OnCommitResponse(
|
| - const CommitResponseDataList& response_list) {
|
| - for (CommitResponseDataList::const_iterator response_it =
|
| - response_list.begin();
|
| - response_it != response_list.end();
|
| - ++response_it) {
|
| - const std::string client_tag_hash = response_it->client_tag_hash;
|
| - EntityMap::iterator map_it = entities_.find(client_tag_hash);
|
| -
|
| - // There's no way we could have committed an entry we know nothing about.
|
| - if (map_it == entities_.end()) {
|
| - NOTREACHED() << "Received commit response for item unknown to us."
|
| - << " Model type: " << ModelTypeToString(type_)
|
| - << " ID: " << response_it->id;
|
| - continue;
|
| - }
|
| -
|
| - SyncThreadSyncEntity* entity = map_it->second;
|
| - entity->ReceiveCommitResponse(response_it->id,
|
| - response_it->response_version,
|
| - response_it->sequence_number);
|
| - }
|
| -
|
| - // Send the responses back to the model thread. It needs to know which
|
| - // items have been successfully committed so it can save that information in
|
| - // permanent storage.
|
| - processor_interface_->ReceiveCommitResponse(data_type_state_, response_list);
|
| -}
|
| -
|
| -base::WeakPtr<NonBlockingTypeProcessorCore>
|
| -NonBlockingTypeProcessorCore::AsWeakPtr() {
|
| - return weak_ptr_factory_.GetWeakPtr();
|
| -}
|
| -
|
| -bool NonBlockingTypeProcessorCore::CanCommitItems() const {
|
| - // We can't commit anything until we know the type's parent node.
|
| - // We'll get it in the first update response.
|
| - return !data_type_state_.type_root_id.empty() &&
|
| - data_type_state_.initial_sync_done;
|
| -}
|
| -
|
| -void NonBlockingTypeProcessorCore::HelpInitializeCommitEntity(
|
| - sync_pb::SyncEntity* sync_entity) {
|
| - // Initial commits need our help to generate a client ID.
|
| - if (!sync_entity->has_id_string()) {
|
| - DCHECK_EQ(kUncommittedVersion, sync_entity->version());
|
| - const int64 id = data_type_state_.next_client_id++;
|
| - sync_entity->set_id_string(
|
| - base::StringPrintf("%s-%" PRId64, ModelTypeToString(type_), id));
|
| - }
|
| -
|
| - // Always include enough specifics to identify the type. Do this even in
|
| - // deletion requests, where the specifics are otherwise invalid.
|
| - if (!sync_entity->has_specifics()) {
|
| - AddDefaultFieldValue(type_, sync_entity->mutable_specifics());
|
| - }
|
| -
|
| - // We're always responsible for the parent ID.
|
| - sync_entity->set_parent_id_string(data_type_state_.type_root_id);
|
| -}
|
| -
|
| -} // namespace syncer
|
|
|