| Index: sync/engine/non_blocking_type_processor_core.cc
|
| diff --git a/sync/engine/non_blocking_type_processor_core.cc b/sync/engine/non_blocking_type_processor_core.cc
|
| index 236c9c5cdddeb1dbfd2ae55e774f6e2bfd8c5b77..b803f36a6c395d438caa43104dcf18a588532e9c 100644
|
| --- a/sync/engine/non_blocking_type_processor_core.cc
|
| +++ b/sync/engine/non_blocking_type_processor_core.cc
|
| @@ -4,20 +4,29 @@
|
|
|
| #include "sync/engine/non_blocking_type_processor_core.h"
|
|
|
| +#include "base/bind.h"
|
| +#include "base/format_macros.h"
|
| #include "base/logging.h"
|
| +#include "base/strings/stringprintf.h"
|
| #include "sync/engine/commit_contribution.h"
|
| +#include "sync/engine/non_blocking_type_commit_contribution.h"
|
| +#include "sync/engine/non_blocking_type_processor.h"
|
| +#include "sync/engine/non_blocking_type_processor_core_interface.h"
|
| +#include "sync/engine/sync_thread_sync_entity.h"
|
| +#include "sync/syncable/syncable_util.h"
|
| +#include "sync/util/time.h"
|
|
|
| namespace syncer {
|
|
|
| NonBlockingTypeProcessorCore::NonBlockingTypeProcessorCore(
|
| - ModelType type,
|
| - scoped_refptr<base::SequencedTaskRunner> processor_task_runner,
|
| - base::WeakPtr<NonBlockingTypeProcessor> processor)
|
| + ModelType type,
|
| + DataTypeState initial_state,
|
| + scoped_ptr<NonBlockingTypeProcessorInterface> processor_interface)
|
| : type_(type),
|
| - processor_task_runner_(processor_task_runner),
|
| - processor_(processor),
|
| + data_type_state_(initial_state),
|
| + processor_interface_(processor_interface.Pass()),
|
| + entities_deleter_(&entities_),
|
| weak_ptr_factory_(this) {
|
| - progress_marker_.set_data_type_id(GetSpecificsFieldNumberFromModelType(type));
|
| }
|
|
|
| NonBlockingTypeProcessorCore::~NonBlockingTypeProcessorCore() {
|
| @@ -32,16 +41,13 @@ ModelType NonBlockingTypeProcessorCore::GetModelType() const {
|
| void NonBlockingTypeProcessorCore::GetDownloadProgress(
|
| sync_pb::DataTypeProgressMarker* progress_marker) const {
|
| DCHECK(CalledOnValidThread());
|
| - // TODO(rlarocque): Implement this properly. crbug.com/351005.
|
| - DVLOG(1) << "Getting progress for: " << ModelTypeToString(type_);
|
| - *progress_marker = progress_marker_;
|
| + *progress_marker = data_type_state_.progress_marker;
|
| }
|
|
|
| void NonBlockingTypeProcessorCore::GetDataTypeContext(
|
| sync_pb::DataTypeContext* context) const {
|
| - // TODO(rlarocque): Implement this properly. crbug.com/351005.
|
| - DVLOG(1) << "Getting context for: " << ModelTypeToString(type_);
|
| - context->Clear();
|
| + DCHECK(CalledOnValidThread());
|
| + context->CopyFrom(data_type_state_.type_context);
|
| }
|
|
|
| SyncerError NonBlockingTypeProcessorCore::ProcessGetUpdatesResponse(
|
| @@ -50,22 +56,68 @@ SyncerError NonBlockingTypeProcessorCore::ProcessGetUpdatesResponse(
|
| const SyncEntityList& applicable_updates,
|
| sessions::StatusController* status) {
|
| DCHECK(CalledOnValidThread());
|
| - // TODO(rlarocque): Implement this properly. crbug.com/351005.
|
| - DVLOG(1) << "Processing updates response for: " << ModelTypeToString(type_);
|
| - progress_marker_ = progress_marker;
|
| +
|
| + // TODO(rlarocque): Handle data type context conflicts.
|
| + data_type_state_.type_context = mutated_context;
|
| + data_type_state_.progress_marker = progress_marker;
|
| +
|
| + UpdateResponseDataList response_datas;
|
| +
|
| + for (SyncEntityList::const_iterator update_it = applicable_updates.begin();
|
| + update_it != applicable_updates.end();
|
| + ++update_it) {
|
| + const sync_pb::SyncEntity* update_entity = *update_it;
|
| + if (!update_entity->server_defined_unique_tag().empty()) {
|
| + // We can't commit an item unless we know its parent ID. This is where
|
| + // we learn that ID and remember it forever.
|
| + DCHECK_EQ(ModelTypeToRootTag(type_),
|
| + update_entity->server_defined_unique_tag());
|
| + if (!data_type_state_.type_root_id.empty()) {
|
| + DCHECK_EQ(data_type_state_.type_root_id, update_entity->id_string());
|
| + }
|
| + data_type_state_.type_root_id = update_entity->id_string();
|
| + } else {
|
| + // More normal updates are handled here.
|
| + const std::string& client_tag_hash =
|
| + update_entity->client_defined_unique_tag();
|
| + DCHECK(!client_tag_hash.empty());
|
| + EntityMap::const_iterator map_it = entities_.find(client_tag_hash);
|
| + if (map_it == entities_.end()) {
|
| + SyncThreadSyncEntity* entity =
|
| + SyncThreadSyncEntity::FromServerUpdate(update_entity->id_string(),
|
| + client_tag_hash,
|
| + update_entity->version());
|
| + entities_.insert(std::make_pair(client_tag_hash, entity));
|
| + } else {
|
| + SyncThreadSyncEntity* entity = map_it->second;
|
| + entity->ReceiveUpdate(update_entity->version());
|
| + }
|
| +
|
| + // Prepare the message for the model thread.
|
| + UpdateResponseData response_data;
|
| + response_data.id = update_entity->id_string();
|
| + response_data.client_tag_hash = client_tag_hash;
|
| + response_data.response_version = update_entity->version();
|
| + response_data.ctime = ProtoTimeToTime(update_entity->ctime());
|
| + response_data.mtime = ProtoTimeToTime(update_entity->mtime());
|
| + response_data.deleted = update_entity->deleted();
|
| + response_data.specifics = update_entity->specifics();
|
| +
|
| + response_datas.push_back(response_data);
|
| + }
|
| + }
|
| +
|
| + // That's all the processing we needed to do here. Now we forward these
|
| + // updates to the model thread so it can do the rest.
|
| + processor_interface_->ReceiveUpdateResponse(data_type_state_, response_datas);
|
| +
|
| return SYNCER_OK;
|
| }
|
|
|
| void NonBlockingTypeProcessorCore::ApplyUpdates(
|
| sessions::StatusController* status) {
|
| DCHECK(CalledOnValidThread());
|
| - // TODO(rlarocque): Implement this properly. crbug.com/351005.
|
| - DVLOG(1) << "Applying updates for: " << ModelTypeToString(type_);
|
| -}
|
| -
|
| -void NonBlockingTypeProcessorCore::RequestCommits(
|
| - const CommitRequestDataList& request_list) {
|
| - // TODO(rlarocque): Implement this. crbug.com/351005.
|
| + // Do nothing. Non-blocking types have no apply step.
|
| }
|
|
|
| void NonBlockingTypeProcessorCore::PassiveApplyUpdates(
|
| @@ -75,13 +127,111 @@ void NonBlockingTypeProcessorCore::PassiveApplyUpdates(
|
| << "ModelType is: " << ModelTypeToString(type_);
|
| }
|
|
|
| +void NonBlockingTypeProcessorCore::RequestCommits(
|
| + const CommitRequestDataList& list) {
|
| + DCHECK(CalledOnValidThread());
|
| + for (CommitRequestDataList::const_iterator it = list.begin();
|
| + it != list.end();
|
| + ++it) {
|
| + RequestCommit(*it);
|
| + }
|
| +}
|
| +
|
| // CommitContributor implementation.
|
| scoped_ptr<CommitContribution>
|
| NonBlockingTypeProcessorCore::GetContribution(size_t max_entries) {
|
| DCHECK(CalledOnValidThread());
|
| - // TODO(rlarocque): Implement this properly. crbug.com/351005.
|
| - DVLOG(1) << "Getting commit contribution for: " << ModelTypeToString(type_);
|
| - return scoped_ptr<CommitContribution>();
|
| +
|
| + size_t space_remaining = max_entries;
|
| + std::vector<int64> sequence_numbers;
|
| + google::protobuf::RepeatedPtrField<sync_pb::SyncEntity> commit_entities;
|
| +
|
| + if (!CanCommitItems())
|
| + return scoped_ptr<CommitContribution>();
|
| +
|
| + // TODO(rlarocque): Avoid iterating here.
|
| + for (EntityMap::const_iterator it = entities_.begin();
|
| + it != entities_.end() && space_remaining > 0;
|
| + ++it) {
|
| + SyncThreadSyncEntity* entity = it->second;
|
| + if (entity->IsCommitPending()) {
|
| + sync_pb::SyncEntity* commit_entity = commit_entities.Add();
|
| + int64 sequence_number;
|
| +
|
| + entity->PrepareCommitProto(commit_entity, &sequence_number);
|
| + ContributeToCommitEntity(commit_entity);
|
| + sequence_numbers.push_back(sequence_number);
|
| +
|
| + space_remaining--;
|
| + }
|
| + }
|
| +
|
| + if (commit_entities.size() == 0)
|
| + return scoped_ptr<CommitContribution>();
|
| +
|
| + return scoped_ptr<CommitContribution>(new NonBlockingTypeCommitContribution(
|
| + this, data_type_state_.type_context, commit_entities, sequence_numbers));
|
| +}
|
| +
|
| +void NonBlockingTypeProcessorCore::RequestCommit(
|
| + const CommitRequestData& request) {
|
| + DCHECK_EQ(type_, GetModelTypeFromSpecifics(request.specifics));
|
| +
|
| + EntityMap::iterator map_it = entities_.find(request.client_tag_hash);
|
| + if (map_it == entities_.end()) {
|
| + SyncThreadSyncEntity* entity =
|
| + SyncThreadSyncEntity::FromCommitRequest(request.id,
|
| + request.client_tag_hash,
|
| + request.sequence_number,
|
| + request.base_version,
|
| + request.ctime,
|
| + request.mtime,
|
| + request.non_unique_name,
|
| + request.deleted,
|
| + request.specifics);
|
| + entities_.insert(std::make_pair(request.client_tag_hash, entity));
|
| + } else {
|
| + SyncThreadSyncEntity* entity = map_it->second;
|
| + entity->RequestCommit(request.id,
|
| + request.client_tag_hash,
|
| + request.sequence_number,
|
| + request.base_version,
|
| + request.ctime,
|
| + request.mtime,
|
| + request.non_unique_name,
|
| + request.deleted,
|
| + request.specifics);
|
| + }
|
| +
|
| + // TODO: Nudge SyncScheduler.
|
| +}
|
| +
|
| +void NonBlockingTypeProcessorCore::ProcessCommitResponses(
|
| + const CommitResponseDataList& response_list) {
|
| + for (CommitResponseDataList::const_iterator response_it =
|
| + response_list.begin();
|
| + response_it != response_list.end();
|
| + ++response_it) {
|
| + const std::string client_tag_hash = response_it->client_tag_hash;
|
| + EntityMap::iterator map_it = entities_.find(client_tag_hash);
|
| +
|
| + // There's no way we could have committed an entry we know nothing about.
|
| + if (map_it == entities_.end()) {
|
| + NOTREACHED() << "Received commit response for item unknown to us."
|
| + << " Model type: " << ModelTypeToString(type_)
|
| + << " ID: " << response_it->id;
|
| + continue;
|
| + }
|
| +
|
| + SyncThreadSyncEntity* entity = map_it->second;
|
| + entity->ReceiveCommitResponse(response_it->response_version,
|
| + response_it->sequence_number);
|
| + }
|
| +
|
| + // Send the responses back to the model thread. It needs to know which
|
| + // items have been successfully committed so it can save that information in
|
| + // permanent storage.
|
| + processor_interface_->ReceiveCommitResponse(data_type_state_, response_list);
|
| }
|
|
|
| base::WeakPtr<NonBlockingTypeProcessorCore>
|
| @@ -89,4 +239,25 @@ NonBlockingTypeProcessorCore::AsWeakPtr() {
|
| return weak_ptr_factory_.GetWeakPtr();
|
| }
|
|
|
| +bool NonBlockingTypeProcessorCore::CanCommitItems() const {
|
| + // We can't commit anything until we know the type's parent node.
|
| + // We'll get it in the first update response.
|
| + return !data_type_state_.type_root_id.empty();
|
| +}
|
| +
|
| +void NonBlockingTypeProcessorCore::ContributeToCommitEntity(
|
| + sync_pb::SyncEntity* sync_entity) {
|
| + // Initial commits need our help to generate a client ID.
|
| + if (!sync_entity->has_id_string()) {
|
| + DCHECK_EQ(0, sync_entity->version());
|
| + const int64 id = data_type_state_.next_client_id;
|
| + data_type_state_.next_client_id++;
|
| + sync_entity->set_id_string(
|
| + base::StringPrintf("%s-%" PRId64, ModelTypeToString(type_), id));
|
| + }
|
| +
|
| + // We're always responsible for the parent ID.
|
| + sync_entity->set_parent_id_string(data_type_state_.type_root_id);
|
| +}
|
| +
|
| } // namespace syncer
|
|
|