Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(772)

Unified Diff: sync/engine/non_blocking_type_processor.cc

Issue 280983002: Implement sync in the NonBlockingTypeProcessor (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Add some comments Created 6 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: sync/engine/non_blocking_type_processor.cc
diff --git a/sync/engine/non_blocking_type_processor.cc b/sync/engine/non_blocking_type_processor.cc
index 66a016b762d8ccd626c935aa8cc89bbe67a6f8c6..152551940d7bae90760eabb50a33d51646ab5faa 100644
--- a/sync/engine/non_blocking_type_processor.cc
+++ b/sync/engine/non_blocking_type_processor.cc
@@ -4,9 +4,13 @@
#include "sync/engine/non_blocking_type_processor.h"
+#include "base/bind.h"
+#include "base/location.h"
#include "base/message_loop/message_loop_proxy.h"
-#include "sync/engine/non_blocking_type_processor_core.h"
+#include "sync/engine/model_thread_sync_entity.h"
+#include "sync/engine/non_blocking_type_processor_core_interface.h"
#include "sync/internal_api/public/sync_core_proxy.h"
+#include "sync/syncable/syncable_util.h"
namespace syncer {
@@ -14,6 +18,7 @@ NonBlockingTypeProcessor::NonBlockingTypeProcessor(ModelType type)
: type_(type),
is_preferred_(false),
is_connected_(false),
+ entities_deleter_(&entities_),
weak_ptr_factory_for_ui_(this),
weak_ptr_factory_for_sync_(this) {
}
@@ -40,9 +45,17 @@ void NonBlockingTypeProcessor::Enable(
scoped_ptr<SyncCoreProxy> sync_core_proxy) {
DCHECK(CalledOnValidThread());
DVLOG(1) << "Asked to enable " << ModelTypeToString(type_);
+
is_preferred_ = true;
+
+ // TODO(rlarocque): At some point, this should be loaded from storage.
+ data_type_state_.progress_marker.set_data_type_id(
+ GetSpecificsFieldNumberFromModelType(type_));
+ data_type_state_.next_id = 0;
+
sync_core_proxy_ = sync_core_proxy.Pass();
sync_core_proxy_->ConnectTypeToCore(GetModelType(),
+ data_type_state_,
weak_ptr_factory_for_sync_.GetWeakPtr());
}
@@ -63,8 +76,7 @@ void NonBlockingTypeProcessor::Disconnect() {
}
weak_ptr_factory_for_sync_.InvalidateWeakPtrs();
- core_ = base::WeakPtr<NonBlockingTypeProcessorCore>();
- sync_thread_ = scoped_refptr<base::SequencedTaskRunner>();
+ core_interface_.reset();
}
base::WeakPtr<NonBlockingTypeProcessor>
@@ -74,13 +86,134 @@ NonBlockingTypeProcessor::AsWeakPtrForUI() {
}
void NonBlockingTypeProcessor::OnConnect(
- base::WeakPtr<NonBlockingTypeProcessorCore> core,
- scoped_refptr<base::SequencedTaskRunner> sync_thread) {
+ scoped_ptr<NonBlockingTypeProcessorCoreInterface> core_interface) {
DCHECK(CalledOnValidThread());
DVLOG(1) << "Successfully connected " << ModelTypeToString(type_);
+
is_connected_ = true;
- core_ = core;
- sync_thread_ = sync_thread;
+ core_interface_ = core_interface.Pass();
+
+ FlushPendingCommitRequests();
+}
+
+void NonBlockingTypeProcessor::Put(const std::string& client_tag,
+ const sync_pb::EntitySpecifics& specifics) {
+ DCHECK_EQ(type_, GetModelTypeFromSpecifics(specifics));
+
+ const std::string client_tag_hash(
+ syncable::GenerateSyncableHash(type_, client_tag));
+
+ EntityMap::iterator it = entities_.find(client_tag_hash);
+ if (it == entities_.end()) {
+ ModelThreadSyncEntity* entity = ModelThreadSyncEntity::NewLocalItem(
Nicolas Zea 2014/05/19 23:54:15 this should probably be using scoped_ptr
rlarocque 2014/05/20 01:39:47 Done.
+ client_tag, specifics, base::Time::Now());
+ entities_.insert(std::make_pair(client_tag_hash, entity));
+ } else {
+ ModelThreadSyncEntity* entity = it->second;
+ entity->MakeLocalChange(client_tag, specifics);
+ }
+
+ FlushPendingCommitRequests();
+}
+
+void NonBlockingTypeProcessor::Delete(const std::string& client_tag) {
+ const std::string client_tag_hash(
+ syncable::GenerateSyncableHash(type_, client_tag));
+
+ EntityMap::iterator it = entities_.find(client_tag_hash);
+ if (it == entities_.end()) {
+ // That's unusual, but not necessarily a bad thing.
+ // Missing is as good as deleted as far as the model is concerned.
+ DLOG(WARNING) << "Attempted to delete missing item."
Nicolas Zea 2014/05/19 23:54:15 Given our discussion about these constraints being
rlarocque 2014/05/20 01:39:47 Agreed. I consider the Put/Delete interface to be
+ << " client tag: " << client_tag;
+ } else {
+ ModelThreadSyncEntity* entity = it->second;
+ entity->Delete(client_tag);
+ }
+
+ FlushPendingCommitRequests();
+}
+
+void NonBlockingTypeProcessor::FlushPendingCommitRequests() {
+ CommitRequestDataList commit_requests;
+
+ // Don't bother sending anything if there's no one to send to.
+ if (!IsConnected())
+ return;
+
+ // TODO(rlarocque): Do something smarter than iterate here.
+ for (EntityMap::iterator it = entities_.begin(); it != entities_.end();
+ ++it) {
+ if (it->second->RequiresCommitRequest()) {
+ CommitRequestData request;
+ it->second->InitializeCommitRequestData(&request);
+ commit_requests.push_back(request);
+ it->second->SetCommitRequestInProgress();
+ }
+ }
+
+ if (!commit_requests.empty())
+ core_interface_->RequestCommits(commit_requests);
+}
+
+void NonBlockingTypeProcessor::ReceiveCommitResponse(
+ const DataTypeState& type_state,
+ const CommitResponseDataList& response_list) {
+ data_type_state_ = type_state;
+
+ for (CommitResponseDataList::const_iterator list_it = response_list.begin();
+ list_it != response_list.end();
+ ++list_it) {
+ const CommitResponseData& response_data = *list_it;
+ const std::string& client_tag_hash = response_data.client_tag_hash;
+
+ EntityMap::iterator it = entities_.find(client_tag_hash);
+ if (it == entities_.end()) {
+ NOTREACHED() << "Received commit response for missing item."
+ << " type: " << type_ << " client_tag: " << client_tag_hash;
+ return;
+ } else {
+ it->second->ReceiveCommitResponse(response_data.id,
+ response_data.sequence_number,
+ response_data.response_version);
+ }
+ }
+}
+
+void NonBlockingTypeProcessor::ReceiveUpdateResponse(
+ const DataTypeState& data_type_state,
+ const UpdateResponseDataList& response_list) {
+ data_type_state_ = data_type_state;
+
+ for (UpdateResponseDataList::const_iterator list_it = response_list.begin();
+ list_it != response_list.end();
+ ++list_it) {
+ const UpdateResponseData& response_data = *list_it;
+ const std::string& client_tag_hash = response_data.client_tag_hash;
+
+ EntityMap::iterator it = entities_.find(client_tag_hash);
+ if (it == entities_.end()) {
+ ModelThreadSyncEntity* entity = ModelThreadSyncEntity::FromServerUpdate(
+ response_data.id,
+ response_data.client_tag_hash,
+ response_data.non_unique_name,
+ response_data.response_version,
+ response_data.specifics,
+ response_data.deleted,
+ response_data.ctime,
+ response_data.mtime);
+ entities_.insert(std::make_pair(client_tag_hash, entity));
+ } else {
+ ModelThreadSyncEntity* entity = it->second;
+ entity->ApplyUpdateFromServer(response_data.response_version,
+ response_data.deleted,
+ response_data.specifics,
+ response_data.mtime);
+ // TODO: Do something special when conflicts are detected.
+ }
+ }
+
+ // TODO: Inform the model of the new or updated data.
}
} // namespace syncer

Powered by Google App Engine
This is Rietveld 408576698