| Index: sync/engine/non_blocking_type_processor.cc
|
| diff --git a/sync/engine/non_blocking_type_processor.cc b/sync/engine/non_blocking_type_processor.cc
|
| index 66a016b762d8ccd626c935aa8cc89bbe67a6f8c6..0aaead452cf5f34febe2c9b2b1a536fa8e6b3f0e 100644
|
| --- a/sync/engine/non_blocking_type_processor.cc
|
| +++ b/sync/engine/non_blocking_type_processor.cc
|
| @@ -4,9 +4,13 @@
|
|
|
| #include "sync/engine/non_blocking_type_processor.h"
|
|
|
| +#include "base/bind.h"
|
| +#include "base/location.h"
|
| #include "base/message_loop/message_loop_proxy.h"
|
| -#include "sync/engine/non_blocking_type_processor_core.h"
|
| +#include "sync/engine/model_thread_sync_entity.h"
|
| +#include "sync/engine/non_blocking_type_processor_core_interface.h"
|
| #include "sync/internal_api/public/sync_core_proxy.h"
|
| +#include "sync/syncable/syncable_util.h"
|
|
|
| namespace syncer {
|
|
|
| @@ -14,6 +18,7 @@ NonBlockingTypeProcessor::NonBlockingTypeProcessor(ModelType type)
|
| : type_(type),
|
| is_preferred_(false),
|
| is_connected_(false),
|
| + entities_deleter_(&entities_),
|
| weak_ptr_factory_for_ui_(this),
|
| weak_ptr_factory_for_sync_(this) {
|
| }
|
| @@ -40,9 +45,17 @@ void NonBlockingTypeProcessor::Enable(
|
| scoped_ptr<SyncCoreProxy> sync_core_proxy) {
|
| DCHECK(CalledOnValidThread());
|
| DVLOG(1) << "Asked to enable " << ModelTypeToString(type_);
|
| +
|
| is_preferred_ = true;
|
| +
|
| + // TODO(rlarocque): At some point, this should be loaded from storage.
|
| + data_type_state_.progress_marker.set_data_type_id(
|
| + GetSpecificsFieldNumberFromModelType(type_));
|
| + data_type_state_.next_client_id = 0;
|
| +
|
| sync_core_proxy_ = sync_core_proxy.Pass();
|
| sync_core_proxy_->ConnectTypeToCore(GetModelType(),
|
| + data_type_state_,
|
| weak_ptr_factory_for_sync_.GetWeakPtr());
|
| }
|
|
|
| @@ -63,8 +76,7 @@ void NonBlockingTypeProcessor::Disconnect() {
|
| }
|
|
|
| weak_ptr_factory_for_sync_.InvalidateWeakPtrs();
|
| - core_ = base::WeakPtr<NonBlockingTypeProcessorCore>();
|
| - sync_thread_ = scoped_refptr<base::SequencedTaskRunner>();
|
| + core_interface_.reset();
|
| }
|
|
|
| base::WeakPtr<NonBlockingTypeProcessor>
|
| @@ -74,13 +86,136 @@ NonBlockingTypeProcessor::AsWeakPtrForUI() {
|
| }
|
|
|
| void NonBlockingTypeProcessor::OnConnect(
|
| - base::WeakPtr<NonBlockingTypeProcessorCore> core,
|
| - scoped_refptr<base::SequencedTaskRunner> sync_thread) {
|
| + scoped_ptr<NonBlockingTypeProcessorCoreInterface> core_interface) {
|
| DCHECK(CalledOnValidThread());
|
| DVLOG(1) << "Successfully connected " << ModelTypeToString(type_);
|
| +
|
| is_connected_ = true;
|
| - core_ = core;
|
| - sync_thread_ = sync_thread;
|
| + core_interface_ = core_interface.Pass();
|
| +
|
| + FlushPendingCommitRequests();
|
| +}
|
| +
|
| +void NonBlockingTypeProcessor::Put(const std::string& client_tag,
|
| + const sync_pb::EntitySpecifics& specifics) {
|
| + DCHECK_EQ(type_, GetModelTypeFromSpecifics(specifics));
|
| +
|
| + const std::string client_tag_hash(
|
| + syncable::GenerateSyncableHash(type_, client_tag));
|
| +
|
| + EntityMap::iterator it = entities_.find(client_tag_hash);
|
| + if (it == entities_.end()) {
|
| + scoped_ptr<ModelThreadSyncEntity> entity(
|
| + ModelThreadSyncEntity::NewLocalItem(
|
| + client_tag, specifics, base::Time::Now()));
|
| + entities_.insert(std::make_pair(client_tag_hash, entity.release()));
|
| + } else {
|
| + ModelThreadSyncEntity* entity = it->second;
|
| + entity->MakeLocalChange(specifics);
|
| + }
|
| +
|
| + FlushPendingCommitRequests();
|
| +}
|
| +
|
| +void NonBlockingTypeProcessor::Delete(const std::string& client_tag) {
|
| + const std::string client_tag_hash(
|
| + syncable::GenerateSyncableHash(type_, client_tag));
|
| +
|
| + EntityMap::iterator it = entities_.find(client_tag_hash);
|
| + if (it == entities_.end()) {
|
| + // That's unusual, but not necessarily a bad thing.
|
| + // Missing is as good as deleted as far as the model is concerned.
|
| + DLOG(WARNING) << "Attempted to delete missing item."
|
| + << " client tag: " << client_tag;
|
| + } else {
|
| + ModelThreadSyncEntity* entity = it->second;
|
| + entity->Delete();
|
| + }
|
| +
|
| + FlushPendingCommitRequests();
|
| +}
|
| +
|
| +void NonBlockingTypeProcessor::FlushPendingCommitRequests() {
|
| + CommitRequestDataList commit_requests;
|
| +
|
| + // Don't bother sending anything if there's no one to send to.
|
| + if (!IsConnected())
|
| + return;
|
| +
|
| + // TODO(rlarocque): Do something smarter than iterate here.
|
| + for (EntityMap::iterator it = entities_.begin(); it != entities_.end();
|
| + ++it) {
|
| + if (it->second->RequiresCommitRequest()) {
|
| + CommitRequestData request;
|
| + it->second->InitializeCommitRequestData(&request);
|
| + commit_requests.push_back(request);
|
| + it->second->SetCommitRequestInProgress();
|
| + }
|
| + }
|
| +
|
| + if (!commit_requests.empty())
|
| + core_interface_->RequestCommits(commit_requests);
|
| +}
|
| +
|
| +void NonBlockingTypeProcessor::OnCommitCompletion(
|
| + const DataTypeState& type_state,
|
| + const CommitResponseDataList& response_list) {
|
| + data_type_state_ = type_state;
|
| +
|
| + for (CommitResponseDataList::const_iterator list_it = response_list.begin();
|
| + list_it != response_list.end();
|
| + ++list_it) {
|
| + const CommitResponseData& response_data = *list_it;
|
| + const std::string& client_tag_hash = response_data.client_tag_hash;
|
| +
|
| + EntityMap::iterator it = entities_.find(client_tag_hash);
|
| + if (it == entities_.end()) {
|
| + NOTREACHED() << "Received commit response for missing item."
|
| + << " type: " << type_ << " client_tag: " << client_tag_hash;
|
| + return;
|
| + } else {
|
| + it->second->ReceiveCommitResponse(response_data.id,
|
| + response_data.sequence_number,
|
| + response_data.response_version);
|
| + }
|
| + }
|
| +}
|
| +
|
| +void NonBlockingTypeProcessor::OnUpdateReceived(
|
| + const DataTypeState& data_type_state,
|
| + const UpdateResponseDataList& response_list) {
|
| + data_type_state_ = data_type_state;
|
| +
|
| + for (UpdateResponseDataList::const_iterator list_it = response_list.begin();
|
| + list_it != response_list.end();
|
| + ++list_it) {
|
| + const UpdateResponseData& response_data = *list_it;
|
| + const std::string& client_tag_hash = response_data.client_tag_hash;
|
| +
|
| + EntityMap::iterator it = entities_.find(client_tag_hash);
|
| + if (it == entities_.end()) {
|
| + scoped_ptr<ModelThreadSyncEntity> entity =
|
| + ModelThreadSyncEntity::FromServerUpdate(
|
| + response_data.id,
|
| + response_data.client_tag_hash,
|
| + response_data.non_unique_name,
|
| + response_data.response_version,
|
| + response_data.specifics,
|
| + response_data.deleted,
|
| + response_data.ctime,
|
| + response_data.mtime);
|
| + entities_.insert(std::make_pair(client_tag_hash, entity.release()));
|
| + } else {
|
| + ModelThreadSyncEntity* entity = it->second;
|
| + entity->ApplyUpdateFromServer(response_data.response_version,
|
| + response_data.deleted,
|
| + response_data.specifics,
|
| + response_data.mtime);
|
| + // TODO: Do something special when conflicts are detected.
|
| + }
|
| + }
|
| +
|
| + // TODO: Inform the model of the new or updated data.
|
| }
|
|
|
| } // namespace syncer
|
|
|