Index: sync/engine/non_blocking_type_processor_core.cc |
diff --git a/sync/engine/non_blocking_type_processor_core.cc b/sync/engine/non_blocking_type_processor_core.cc |
index 236c9c5cdddeb1dbfd2ae55e774f6e2bfd8c5b77..a14a8d003a1c41f62c1b438940e8e6f93d3dbc7e 100644 |
--- a/sync/engine/non_blocking_type_processor_core.cc |
+++ b/sync/engine/non_blocking_type_processor_core.cc |
@@ -4,20 +4,28 @@ |
#include "sync/engine/non_blocking_type_processor_core.h" |
+#include "base/bind.h" |
+#include "base/format_macros.h" |
#include "base/logging.h" |
+#include "base/strings/stringprintf.h" |
#include "sync/engine/commit_contribution.h" |
+#include "sync/engine/non_blocking_type_commit_contribution.h" |
+#include "sync/engine/non_blocking_type_processor_interface.h" |
+#include "sync/engine/sync_thread_sync_entity.h" |
+#include "sync/syncable/syncable_util.h" |
+#include "sync/util/time.h" |
namespace syncer { |
NonBlockingTypeProcessorCore::NonBlockingTypeProcessorCore( |
- ModelType type, |
- scoped_refptr<base::SequencedTaskRunner> processor_task_runner, |
- base::WeakPtr<NonBlockingTypeProcessor> processor) |
+ ModelType type, |
+ const DataTypeState& initial_state, |
+ scoped_ptr<NonBlockingTypeProcessorInterface> processor_interface) |
: type_(type), |
- processor_task_runner_(processor_task_runner), |
- processor_(processor), |
+ data_type_state_(initial_state), |
+ processor_interface_(processor_interface.Pass()), |
+ entities_deleter_(&entities_), |
weak_ptr_factory_(this) { |
- progress_marker_.set_data_type_id(GetSpecificsFieldNumberFromModelType(type)); |
} |
NonBlockingTypeProcessorCore::~NonBlockingTypeProcessorCore() { |
@@ -32,16 +40,13 @@ ModelType NonBlockingTypeProcessorCore::GetModelType() const { |
void NonBlockingTypeProcessorCore::GetDownloadProgress( |
sync_pb::DataTypeProgressMarker* progress_marker) const { |
DCHECK(CalledOnValidThread()); |
- // TODO(rlarocque): Implement this properly. crbug.com/351005. |
- DVLOG(1) << "Getting progress for: " << ModelTypeToString(type_); |
- *progress_marker = progress_marker_; |
+ progress_marker->CopyFrom(data_type_state_.progress_marker); |
} |
void NonBlockingTypeProcessorCore::GetDataTypeContext( |
sync_pb::DataTypeContext* context) const { |
- // TODO(rlarocque): Implement this properly. crbug.com/351005. |
- DVLOG(1) << "Getting context for: " << ModelTypeToString(type_); |
- context->Clear(); |
+ DCHECK(CalledOnValidThread()); |
+ context->CopyFrom(data_type_state_.type_context); |
} |
SyncerError NonBlockingTypeProcessorCore::ProcessGetUpdatesResponse( |
@@ -50,22 +55,78 @@ SyncerError NonBlockingTypeProcessorCore::ProcessGetUpdatesResponse( |
const SyncEntityList& applicable_updates, |
sessions::StatusController* status) { |
DCHECK(CalledOnValidThread()); |
- // TODO(rlarocque): Implement this properly. crbug.com/351005. |
- DVLOG(1) << "Processing updates response for: " << ModelTypeToString(type_); |
- progress_marker_ = progress_marker; |
+ |
+ // TODO(rlarocque): Handle data type context conflicts. |
+ data_type_state_.type_context = mutated_context; |
+ data_type_state_.progress_marker = progress_marker; |
+ |
+ UpdateResponseDataList response_datas; |
+ |
+ for (SyncEntityList::const_iterator update_it = applicable_updates.begin(); |
+ update_it != applicable_updates.end(); |
+ ++update_it) { |
+ const sync_pb::SyncEntity* update_entity = *update_it; |
+ if (!update_entity->server_defined_unique_tag().empty()) { |
+ // We can't commit an item unless we know its parent ID. This is where |
+ // we learn that ID and remember it forever. |
+ DCHECK_EQ(ModelTypeToRootTag(type_), |
+ update_entity->server_defined_unique_tag()); |
+ if (!data_type_state_.type_root_id.empty()) { |
+ DCHECK_EQ(data_type_state_.type_root_id, update_entity->id_string()); |
+ } |
+ data_type_state_.type_root_id = update_entity->id_string(); |
+ } else { |
+ // Normal updates are handled here. |
+ const std::string& client_tag_hash = |
+ update_entity->client_defined_unique_tag(); |
+ DCHECK(!client_tag_hash.empty()); |
+ EntityMap::const_iterator map_it = entities_.find(client_tag_hash); |
+ if (map_it == entities_.end()) { |
+ SyncThreadSyncEntity* entity = |
+ SyncThreadSyncEntity::FromServerUpdate(update_entity->id_string(), |
+ client_tag_hash, |
+ update_entity->version()); |
+ entities_.insert(std::make_pair(client_tag_hash, entity)); |
+ } else { |
+ SyncThreadSyncEntity* entity = map_it->second; |
+ entity->ReceiveUpdate(update_entity->version()); |
+ } |
+ |
+ // Prepare the message for the model thread. |
+ UpdateResponseData response_data; |
+ response_data.id = update_entity->id_string(); |
+ response_data.client_tag_hash = client_tag_hash; |
+ response_data.response_version = update_entity->version(); |
+ response_data.ctime = ProtoTimeToTime(update_entity->ctime()); |
+ response_data.mtime = ProtoTimeToTime(update_entity->mtime()); |
+ response_data.non_unique_name = update_entity->name(); |
+ response_data.deleted = update_entity->deleted(); |
+ response_data.specifics = update_entity->specifics(); |
+ |
+ response_datas.push_back(response_data); |
+ } |
+ } |
+ |
+ // Forward these updates to the model thread so it can do the rest. |
+ processor_interface_->ReceiveUpdateResponse(data_type_state_, response_datas); |
+ |
return SYNCER_OK; |
} |
void NonBlockingTypeProcessorCore::ApplyUpdates( |
sessions::StatusController* status) { |
DCHECK(CalledOnValidThread()); |
- // TODO(rlarocque): Implement this properly. crbug.com/351005. |
- DVLOG(1) << "Applying updates for: " << ModelTypeToString(type_); |
-} |
+ // This function is called only when we've finished a download cycle, ie. we |
+ // got a response with changes_remaining == 0. If this is our first download |
+ // cycle, we should update our state so the NonBlockingTypeProcessor knows |
+ // that it's safe to commit items now. |
+ if (!data_type_state_.initial_sync_done) { |
+ data_type_state_.initial_sync_done = true; |
-void NonBlockingTypeProcessorCore::RequestCommits( |
- const CommitRequestDataList& request_list) { |
- // TODO(rlarocque): Implement this. crbug.com/351005. |
+ UpdateResponseDataList empty_update_list; |
+ processor_interface_->ReceiveUpdateResponse(data_type_state_, |
+ empty_update_list); |
+ } |
} |
void NonBlockingTypeProcessorCore::PassiveApplyUpdates( |
@@ -75,13 +136,119 @@ void NonBlockingTypeProcessorCore::PassiveApplyUpdates( |
<< "ModelType is: " << ModelTypeToString(type_); |
} |
+void NonBlockingTypeProcessorCore::EnqueueForCommit( |
+ const CommitRequestDataList& list) { |
+ DCHECK(CalledOnValidThread()); |
+ |
+ DCHECK(CanCommitItems()) |
+ << "Asked to commit items before type was initialized. " |
+ << "ModelType is: " << ModelTypeToString(type_); |
+ |
+ for (CommitRequestDataList::const_iterator it = list.begin(); |
+ it != list.end(); |
+ ++it) { |
+ StorePendingCommit(*it); |
+ } |
+} |
+ |
// CommitContributor implementation. |
scoped_ptr<CommitContribution> |
NonBlockingTypeProcessorCore::GetContribution(size_t max_entries) { |
DCHECK(CalledOnValidThread()); |
- // TODO(rlarocque): Implement this properly. crbug.com/351005. |
- DVLOG(1) << "Getting commit contribution for: " << ModelTypeToString(type_); |
- return scoped_ptr<CommitContribution>(); |
+ |
+ size_t space_remaining = max_entries; |
+ std::vector<int64> sequence_numbers; |
+ google::protobuf::RepeatedPtrField<sync_pb::SyncEntity> commit_entities; |
+ |
+ if (!CanCommitItems()) |
+ return scoped_ptr<CommitContribution>(); |
+ |
+ // TODO(rlarocque): Avoid iterating here. |
+ for (EntityMap::const_iterator it = entities_.begin(); |
+ it != entities_.end() && space_remaining > 0; |
+ ++it) { |
+ SyncThreadSyncEntity* entity = it->second; |
+ if (entity->IsCommitPending()) { |
+ sync_pb::SyncEntity* commit_entity = commit_entities.Add(); |
+ int64 sequence_number = -1; |
+ |
+ entity->PrepareCommitProto(commit_entity, &sequence_number); |
+ HelpInitializeCommitEntity(commit_entity); |
+ sequence_numbers.push_back(sequence_number); |
+ |
+ space_remaining--; |
+ } |
+ } |
+ |
+ if (commit_entities.size() == 0) |
+ return scoped_ptr<CommitContribution>(); |
+ |
+ return scoped_ptr<CommitContribution>(new NonBlockingTypeCommitContribution( |
+ data_type_state_.type_context, commit_entities, sequence_numbers, this)); |
+} |
+ |
+void NonBlockingTypeProcessorCore::StorePendingCommit( |
+ const CommitRequestData& request) { |
+ if (!request.deleted) { |
+ DCHECK_EQ(type_, GetModelTypeFromSpecifics(request.specifics)); |
+ } |
+ |
+ EntityMap::iterator map_it = entities_.find(request.client_tag_hash); |
+ if (map_it == entities_.end()) { |
+ SyncThreadSyncEntity* entity = |
+ SyncThreadSyncEntity::FromCommitRequest(request.id, |
+ request.client_tag_hash, |
+ request.sequence_number, |
+ request.base_version, |
+ request.ctime, |
+ request.mtime, |
+ request.non_unique_name, |
+ request.deleted, |
+ request.specifics); |
+ entities_.insert(std::make_pair(request.client_tag_hash, entity)); |
+ } else { |
+ SyncThreadSyncEntity* entity = map_it->second; |
+ entity->RequestCommit(request.id, |
+ request.client_tag_hash, |
+ request.sequence_number, |
+ request.base_version, |
+ request.ctime, |
+ request.mtime, |
+ request.non_unique_name, |
+ request.deleted, |
+ request.specifics); |
+ } |
+ |
+ // TODO: Nudge SyncScheduler. |
+} |
+ |
+void NonBlockingTypeProcessorCore::OnCommitResponse( |
+ const CommitResponseDataList& response_list) { |
+ for (CommitResponseDataList::const_iterator response_it = |
+ response_list.begin(); |
+ response_it != response_list.end(); |
+ ++response_it) { |
+ const std::string client_tag_hash = response_it->client_tag_hash; |
+ EntityMap::iterator map_it = entities_.find(client_tag_hash); |
+ |
+ // There's no way we could have committed an entry we know nothing about. |
+ if (map_it == entities_.end()) { |
+ NOTREACHED() << "Received commit response for item unknown to us." |
+ << " Model type: " << ModelTypeToString(type_) |
+ << " ID: " << response_it->id; |
+ continue; |
+ } |
+ |
+ SyncThreadSyncEntity* entity = map_it->second; |
+ entity->ReceiveCommitResponse(response_it->id, |
+ response_it->response_version, |
+ response_it->sequence_number); |
+ } |
+ |
+ // Send the responses back to the model thread. It needs to know which |
+ // items have been successfully committed so it can save that information in |
+ // permanent storage. |
+ processor_interface_->ReceiveCommitResponse(data_type_state_, response_list); |
} |
base::WeakPtr<NonBlockingTypeProcessorCore> |
@@ -89,4 +256,31 @@ NonBlockingTypeProcessorCore::AsWeakPtr() { |
return weak_ptr_factory_.GetWeakPtr(); |
} |
+bool NonBlockingTypeProcessorCore::CanCommitItems() const { |
+ // We can't commit anything until we know the type's parent node. |
+ // We'll get it in the first update response. |
+ return !data_type_state_.type_root_id.empty() && |
+ data_type_state_.initial_sync_done; |
+} |
+ |
+void NonBlockingTypeProcessorCore::HelpInitializeCommitEntity( |
+ sync_pb::SyncEntity* sync_entity) { |
+ // Initial commits need our help to generate a client ID. |
+ if (!sync_entity->has_id_string()) { |
+ DCHECK_EQ(kUncommittedVersion, sync_entity->version()); |
+ const int64 id = data_type_state_.next_client_id++; |
+ sync_entity->set_id_string( |
+ base::StringPrintf("%s-%" PRId64, ModelTypeToString(type_), id)); |
+ } |
+ |
+ // Always include enough specifics to identify the type. Do this even in |
+ // deletion requests, where the specifics are otherwise invalid. |
+ if (!sync_entity->has_specifics()) { |
+ AddDefaultFieldValue(type_, sync_entity->mutable_specifics()); |
+ } |
+ |
+ // We're always responsible for the parent ID. |
+ sync_entity->set_parent_id_string(data_type_state_.type_root_id); |
+} |
+ |
} // namespace syncer |