Chromium Code Reviews| Index: sync/engine/non_blocking_type_processor_core.cc |
| diff --git a/sync/engine/non_blocking_type_processor_core.cc b/sync/engine/non_blocking_type_processor_core.cc |
| index 236c9c5cdddeb1dbfd2ae55e774f6e2bfd8c5b77..8ddd9e7ab4189a8f0051e8ceaea19cd248aa0fff 100644 |
| --- a/sync/engine/non_blocking_type_processor_core.cc |
| +++ b/sync/engine/non_blocking_type_processor_core.cc |
| @@ -4,20 +4,29 @@ |
| #include "sync/engine/non_blocking_type_processor_core.h" |
| +#include "base/bind.h" |
| +#include "base/format_macros.h" |
| #include "base/logging.h" |
| +#include "base/strings/stringprintf.h" |
| #include "sync/engine/commit_contribution.h" |
| +#include "sync/engine/non_blocking_type_commit_contribution.h" |
| +#include "sync/engine/non_blocking_type_processor.h" |
|
Nicolas Zea
2014/05/28 23:56:15
Why is NonBlockingTypeProcessor needed, if we have
rlarocque
2014/05/29 20:54:52
Removed.
|
| +#include "sync/engine/non_blocking_type_processor_core_interface.h" |
| +#include "sync/engine/sync_thread_sync_entity.h" |
| +#include "sync/syncable/syncable_util.h" |
| +#include "sync/util/time.h" |
| namespace syncer { |
| NonBlockingTypeProcessorCore::NonBlockingTypeProcessorCore( |
| - ModelType type, |
| - scoped_refptr<base::SequencedTaskRunner> processor_task_runner, |
| - base::WeakPtr<NonBlockingTypeProcessor> processor) |
| + ModelType type, |
| + DataTypeState initial_state, |
| + scoped_ptr<NonBlockingTypeProcessorInterface> processor_interface) |
| : type_(type), |
| - processor_task_runner_(processor_task_runner), |
| - processor_(processor), |
| + data_type_state_(initial_state), |
| + processor_interface_(processor_interface.Pass()), |
| + entities_deleter_(&entities_), |
| weak_ptr_factory_(this) { |
| - progress_marker_.set_data_type_id(GetSpecificsFieldNumberFromModelType(type)); |
| } |
| NonBlockingTypeProcessorCore::~NonBlockingTypeProcessorCore() { |
| @@ -32,16 +41,13 @@ ModelType NonBlockingTypeProcessorCore::GetModelType() const { |
| void NonBlockingTypeProcessorCore::GetDownloadProgress( |
| sync_pb::DataTypeProgressMarker* progress_marker) const { |
| DCHECK(CalledOnValidThread()); |
| - // TODO(rlarocque): Implement this properly. crbug.com/351005. |
| - DVLOG(1) << "Getting progress for: " << ModelTypeToString(type_); |
| - *progress_marker = progress_marker_; |
| + *progress_marker = data_type_state_.progress_marker; |
|
Nicolas Zea
2014/05/28 23:56:15
Why the difference between the context and progres
rlarocque
2014/05/29 20:54:52
I believe there's no significant difference betwee
|
| } |
| void NonBlockingTypeProcessorCore::GetDataTypeContext( |
| sync_pb::DataTypeContext* context) const { |
| - // TODO(rlarocque): Implement this properly. crbug.com/351005. |
| - DVLOG(1) << "Getting context for: " << ModelTypeToString(type_); |
| - context->Clear(); |
| + DCHECK(CalledOnValidThread()); |
| + context->CopyFrom(data_type_state_.type_context); |
| } |
| SyncerError NonBlockingTypeProcessorCore::ProcessGetUpdatesResponse( |
| @@ -50,22 +56,68 @@ SyncerError NonBlockingTypeProcessorCore::ProcessGetUpdatesResponse( |
| const SyncEntityList& applicable_updates, |
| sessions::StatusController* status) { |
| DCHECK(CalledOnValidThread()); |
| - // TODO(rlarocque): Implement this properly. crbug.com/351005. |
| - DVLOG(1) << "Processing updates response for: " << ModelTypeToString(type_); |
| - progress_marker_ = progress_marker; |
| + |
| + // TODO(rlarocque): Handle data type context conflicts. |
| + data_type_state_.type_context = mutated_context; |
| + data_type_state_.progress_marker = progress_marker; |
| + |
| + UpdateResponseDataList response_datas; |
| + |
| + for (SyncEntityList::const_iterator update_it = applicable_updates.begin(); |
| + update_it != applicable_updates.end(); |
| + ++update_it) { |
| + const sync_pb::SyncEntity* update_entity = *update_it; |
| + if (!update_entity->server_defined_unique_tag().empty()) { |
| + // We can't commit an item unless we know its parent ID. This is where |
| + // we learn that ID and remember it forever. |
| + DCHECK_EQ(ModelTypeToRootTag(type_), |
| + update_entity->server_defined_unique_tag()); |
| + if (!data_type_state_.type_root_id.empty()) { |
| + DCHECK_EQ(data_type_state_.type_root_id, update_entity->id_string()); |
|
Nicolas Zea
2014/05/28 23:56:15
nit: DCHECK(..empty() || ..type_root_id == ...)
rlarocque
2014/05/29 20:54:52
I like it better as is. As currently written, it
|
| + } |
| + data_type_state_.type_root_id = update_entity->id_string(); |
| + } else { |
| + // More normal updates are handled here. |
|
Nicolas Zea
2014/05/28 23:56:15
nit: remove "More"
rlarocque
2014/05/29 20:54:52
Done.
|
| + const std::string& client_tag_hash = |
| + update_entity->client_defined_unique_tag(); |
| + DCHECK(!client_tag_hash.empty()); |
| + EntityMap::const_iterator map_it = entities_.find(client_tag_hash); |
| + if (map_it == entities_.end()) { |
| + SyncThreadSyncEntity* entity = |
| + SyncThreadSyncEntity::FromServerUpdate(update_entity->id_string(), |
| + client_tag_hash, |
| + update_entity->version()); |
| + entities_.insert(std::make_pair(client_tag_hash, entity)); |
|
Nicolas Zea
2014/05/28 23:56:15
When do we remove from the entities map?
rlarocque
2014/05/29 20:54:52
Never. The map grows but never shrinks. If this
Nicolas Zea
2014/06/02 20:27:17
Probably worth having a comment mentioning that we
rlarocque
2014/06/02 21:39:13
Added comment next to entities_'s definition notin
|
| + } else { |
| + SyncThreadSyncEntity* entity = map_it->second; |
| + entity->ReceiveUpdate(update_entity->version()); |
| + } |
| + |
| + // Prepare the message for the model thread. |
| + UpdateResponseData response_data; |
| + response_data.id = update_entity->id_string(); |
| + response_data.client_tag_hash = client_tag_hash; |
| + response_data.response_version = update_entity->version(); |
| + response_data.ctime = ProtoTimeToTime(update_entity->ctime()); |
| + response_data.mtime = ProtoTimeToTime(update_entity->mtime()); |
| + response_data.deleted = update_entity->deleted(); |
| + response_data.specifics = update_entity->specifics(); |
| + |
| + response_datas.push_back(response_data); |
| + } |
| + } |
| + |
| + // That's all the processing we needed to do here. Now we forward these |
| + // updates to the model thread so it can do the rest. |
|
Nicolas Zea
2014/05/28 23:56:15
nit: you can just say "Forward these updates.."
rlarocque
2014/05/29 20:54:52
Done.
|
| + processor_interface_->ReceiveUpdateResponse(data_type_state_, response_datas); |
| + |
| return SYNCER_OK; |
| } |
| void NonBlockingTypeProcessorCore::ApplyUpdates( |
| sessions::StatusController* status) { |
| DCHECK(CalledOnValidThread()); |
| - // TODO(rlarocque): Implement this properly. crbug.com/351005. |
| - DVLOG(1) << "Applying updates for: " << ModelTypeToString(type_); |
| -} |
| - |
| -void NonBlockingTypeProcessorCore::RequestCommits( |
| - const CommitRequestDataList& request_list) { |
| - // TODO(rlarocque): Implement this. crbug.com/351005. |
| + // Do nothing. Non-blocking types have no apply step. |
| } |
| void NonBlockingTypeProcessorCore::PassiveApplyUpdates( |
| @@ -75,13 +127,114 @@ void NonBlockingTypeProcessorCore::PassiveApplyUpdates( |
| << "ModelType is: " << ModelTypeToString(type_); |
| } |
| +void NonBlockingTypeProcessorCore::EnqueueForCommit( |
| + const CommitRequestDataList& list) { |
| + DCHECK(CalledOnValidThread()); |
| + for (CommitRequestDataList::const_iterator it = list.begin(); |
| + it != list.end(); |
| + ++it) { |
| + StorePendingCommit(*it); |
| + } |
| +} |
| + |
| // CommitContributor implementation. |
| scoped_ptr<CommitContribution> |
| NonBlockingTypeProcessorCore::GetContribution(size_t max_entries) { |
| DCHECK(CalledOnValidThread()); |
| - // TODO(rlarocque): Implement this properly. crbug.com/351005. |
| - DVLOG(1) << "Getting commit contribution for: " << ModelTypeToString(type_); |
| - return scoped_ptr<CommitContribution>(); |
| + |
| + size_t space_remaining = max_entries; |
| + std::vector<int64> sequence_numbers; |
| + google::protobuf::RepeatedPtrField<sync_pb::SyncEntity> commit_entities; |
| + |
| + if (!CanCommitItems()) |
| + return scoped_ptr<CommitContribution>(); |
| + |
| + // TODO(rlarocque): Avoid iterating here. |
| + for (EntityMap::const_iterator it = entities_.begin(); |
| + it != entities_.end() && space_remaining > 0; |
| + ++it) { |
| + SyncThreadSyncEntity* entity = it->second; |
| + if (entity->IsCommitPending()) { |
| + sync_pb::SyncEntity* commit_entity = commit_entities.Add(); |
| + int64 sequence_number; |
|
Nicolas Zea
2014/05/28 23:56:15
initialize to 0 (or -1 or something invalid)
rlarocque
2014/05/29 20:54:52
Done.
|
| + |
| + entity->PrepareCommitProto(commit_entity, &sequence_number); |
| + HelpInitializeCommitEntity(commit_entity); |
| + sequence_numbers.push_back(sequence_number); |
| + |
| + space_remaining--; |
| + } |
| + } |
| + |
| + if (commit_entities.size() == 0) |
| + return scoped_ptr<CommitContribution>(); |
| + |
| + return scoped_ptr<CommitContribution>(new NonBlockingTypeCommitContribution( |
| + this, data_type_state_.type_context, commit_entities, sequence_numbers)); |
| +} |
| + |
| +void NonBlockingTypeProcessorCore::StorePendingCommit( |
| + const CommitRequestData& request) { |
| + if (!request.deleted) { |
| + DCHECK_EQ(type_, GetModelTypeFromSpecifics(request.specifics)); |
|
Nicolas Zea
2014/05/28 23:56:15
don't we always want the model type now?
rlarocque
2014/05/29 20:54:52
True, but it doesn't have to be the caller (ie. Pr
|
| + } |
| + |
| + EntityMap::iterator map_it = entities_.find(request.client_tag_hash); |
| + if (map_it == entities_.end()) { |
| + SyncThreadSyncEntity* entity = |
| + SyncThreadSyncEntity::FromCommitRequest(request.id, |
| + request.client_tag_hash, |
| + request.sequence_number, |
| + request.base_version, |
| + request.ctime, |
| + request.mtime, |
| + request.non_unique_name, |
| + request.deleted, |
| + request.specifics); |
| + entities_.insert(std::make_pair(request.client_tag_hash, entity)); |
| + } else { |
| + SyncThreadSyncEntity* entity = map_it->second; |
| + entity->RequestCommit(request.id, |
| + request.client_tag_hash, |
| + request.sequence_number, |
| + request.base_version, |
| + request.ctime, |
| + request.mtime, |
| + request.non_unique_name, |
| + request.deleted, |
| + request.specifics); |
| + } |
| + |
| + // TODO: Nudge SyncScheduler. |
| +} |
| + |
| +void NonBlockingTypeProcessorCore::ProcessCommitResponses( |
| + const CommitResponseDataList& response_list) { |
| + for (CommitResponseDataList::const_iterator response_it = |
| + response_list.begin(); |
| + response_it != response_list.end(); |
| + ++response_it) { |
| + const std::string client_tag_hash = response_it->client_tag_hash; |
| + EntityMap::iterator map_it = entities_.find(client_tag_hash); |
| + |
| + // There's no way we could have committed an entry we know nothing about. |
| + if (map_it == entities_.end()) { |
| + NOTREACHED() << "Received commit response for item unknown to us." |
| + << " Model type: " << ModelTypeToString(type_) |
| + << " ID: " << response_it->id; |
| + continue; |
| + } |
| + |
| + SyncThreadSyncEntity* entity = map_it->second; |
| + entity->ReceiveCommitResponse(response_it->id, |
| + response_it->response_version, |
| + response_it->sequence_number); |
| + } |
| + |
| + // Send the responses back to the model thread. It needs to know which |
| + // items have been successfully committed so it can save that information in |
| + // permanent storage. |
| + processor_interface_->ReceiveCommitResponse(data_type_state_, response_list); |
| } |
| base::WeakPtr<NonBlockingTypeProcessorCore> |
| @@ -89,4 +242,31 @@ NonBlockingTypeProcessorCore::AsWeakPtr() { |
| return weak_ptr_factory_.GetWeakPtr(); |
| } |
| +bool NonBlockingTypeProcessorCore::CanCommitItems() const { |
| + // We can't commit anything until we know the type's parent node. |
| + // We'll get it in the first update response. |
|
Nicolas Zea
2014/05/28 23:56:15
Where will be the logic to determine if a particul
rlarocque
2014/05/29 20:54:52
Conflicts are an event, not a state. When a confl
|
| + return !data_type_state_.type_root_id.empty(); |
| +} |
| + |
| +void NonBlockingTypeProcessorCore::HelpInitializeCommitEntity( |
| + sync_pb::SyncEntity* sync_entity) { |
| + // Initial commits need our help to generate a client ID. |
| + if (!sync_entity->has_id_string()) { |
| + DCHECK_EQ(kUncommittedVersion, sync_entity->version()); |
| + const int64 id = data_type_state_.next_client_id; |
| + data_type_state_.next_client_id++; |
|
Nicolas Zea
2014/05/28 23:56:16
nit: can combine with previous line
rlarocque
2014/05/29 20:54:52
Done.
|
| + sync_entity->set_id_string( |
| + base::StringPrintf("%s-%" PRId64, ModelTypeToString(type_), id)); |
| + } |
| + |
| + // Always include enough specifics to identify the type. Do this even in |
| + // deletion requests, where the specifics are otherwise invalid. |
| + if (!sync_entity->has_specifics()) { |
| + AddDefaultFieldValue(type_, sync_entity->mutable_specifics()); |
| + } |
| + |
| + // We're always responsible for the parent ID. |
| + sync_entity->set_parent_id_string(data_type_state_.type_root_id); |
| +} |
| + |
| } // namespace syncer |