| Index: sync/sessions/model_type_registry.cc
|
| diff --git a/sync/sessions/model_type_registry.cc b/sync/sessions/model_type_registry.cc
|
| index 9ceac741f8a1ceeede70a98bd8f94309e3543f13..b8ab74b0f8fd4218c003ec48c3b72ca776193b53 100644
|
| --- a/sync/sessions/model_type_registry.cc
|
| +++ b/sync/sessions/model_type_registry.cc
|
| @@ -9,24 +9,23 @@
|
| #include "base/observer_list.h"
|
| #include "sync/engine/directory_commit_contributor.h"
|
| #include "sync/engine/directory_update_handler.h"
|
| +#include "sync/engine/model_type_sync_proxy.h"
|
| +#include "sync/engine/model_type_sync_proxy_impl.h"
|
| +#include "sync/engine/model_type_sync_worker.h"
|
| +#include "sync/engine/model_type_sync_worker_impl.h"
|
| #include "sync/engine/non_blocking_sync_common.h"
|
| -#include "sync/engine/non_blocking_type_processor.h"
|
| -#include "sync/engine/non_blocking_type_processor_core.h"
|
| -#include "sync/engine/non_blocking_type_processor_core_interface.h"
|
| -#include "sync/engine/non_blocking_type_processor_interface.h"
|
| #include "sync/sessions/directory_type_debug_info_emitter.h"
|
|
|
| namespace syncer {
|
|
|
| namespace {
|
|
|
| -class NonBlockingTypeProcessorWrapper
|
| - : public NonBlockingTypeProcessorInterface {
|
| +class ModelTypeSyncProxyWrapper : public ModelTypeSyncProxy {
|
| public:
|
| - NonBlockingTypeProcessorWrapper(
|
| - base::WeakPtr<NonBlockingTypeProcessor> processor,
|
| - scoped_refptr<base::SequencedTaskRunner> processor_task_runner);
|
| - virtual ~NonBlockingTypeProcessorWrapper();
|
| + ModelTypeSyncProxyWrapper(
|
| + const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy,
|
| + const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner);
|
| + virtual ~ModelTypeSyncProxyWrapper();
|
|
|
| virtual void ReceiveCommitResponse(
|
| const DataTypeState& type_state,
|
| @@ -36,70 +35,69 @@ class NonBlockingTypeProcessorWrapper
|
| const UpdateResponseDataList& response_list) OVERRIDE;
|
|
|
| private:
|
| - base::WeakPtr<NonBlockingTypeProcessor> processor_;
|
| + base::WeakPtr<ModelTypeSyncProxyImpl> processor_;
|
| scoped_refptr<base::SequencedTaskRunner> processor_task_runner_;
|
| };
|
|
|
| -NonBlockingTypeProcessorWrapper::NonBlockingTypeProcessorWrapper(
|
| - base::WeakPtr<NonBlockingTypeProcessor> processor,
|
| - scoped_refptr<base::SequencedTaskRunner> processor_task_runner)
|
| - : processor_(processor), processor_task_runner_(processor_task_runner) {
|
| +ModelTypeSyncProxyWrapper::ModelTypeSyncProxyWrapper(
|
| + const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy,
|
| + const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner)
|
| + : processor_(proxy), processor_task_runner_(processor_task_runner) {
|
| }
|
|
|
| -NonBlockingTypeProcessorWrapper::~NonBlockingTypeProcessorWrapper() {
|
| +ModelTypeSyncProxyWrapper::~ModelTypeSyncProxyWrapper() {
|
| }
|
|
|
| -void NonBlockingTypeProcessorWrapper::ReceiveCommitResponse(
|
| +void ModelTypeSyncProxyWrapper::ReceiveCommitResponse(
|
| const DataTypeState& type_state,
|
| const CommitResponseDataList& response_list) {
|
| processor_task_runner_->PostTask(
|
| FROM_HERE,
|
| - base::Bind(&NonBlockingTypeProcessor::OnCommitCompletion,
|
| + base::Bind(&ModelTypeSyncProxyImpl::OnCommitCompletion,
|
| processor_,
|
| type_state,
|
| response_list));
|
| }
|
|
|
| -void NonBlockingTypeProcessorWrapper::ReceiveUpdateResponse(
|
| +void ModelTypeSyncProxyWrapper::ReceiveUpdateResponse(
|
| const DataTypeState& type_state,
|
| const UpdateResponseDataList& response_list) {
|
| processor_task_runner_->PostTask(
|
| FROM_HERE,
|
| - base::Bind(&NonBlockingTypeProcessor::OnUpdateReceived,
|
| + base::Bind(&ModelTypeSyncProxyImpl::OnUpdateReceived,
|
| processor_,
|
| type_state,
|
| response_list));
|
| }
|
|
|
| -class NonBlockingTypeProcessorCoreWrapper
|
| - : public NonBlockingTypeProcessorCoreInterface {
|
| +class ModelTypeSyncWorkerWrapper : public ModelTypeSyncWorker {
|
| public:
|
| - NonBlockingTypeProcessorCoreWrapper(
|
| - base::WeakPtr<NonBlockingTypeProcessorCore> core,
|
| - scoped_refptr<base::SequencedTaskRunner> sync_thread);
|
| - virtual ~NonBlockingTypeProcessorCoreWrapper();
|
| + ModelTypeSyncWorkerWrapper(
|
| + const base::WeakPtr<ModelTypeSyncWorkerImpl>& worker,
|
| + const scoped_refptr<base::SequencedTaskRunner>& sync_thread);
|
| + virtual ~ModelTypeSyncWorkerWrapper();
|
|
|
| virtual void RequestCommits(const CommitRequestDataList& list) OVERRIDE;
|
|
|
| private:
|
| - base::WeakPtr<NonBlockingTypeProcessorCore> core_;
|
| + base::WeakPtr<ModelTypeSyncWorkerImpl> worker_;
|
| scoped_refptr<base::SequencedTaskRunner> sync_thread_;
|
| };
|
|
|
| -NonBlockingTypeProcessorCoreWrapper::NonBlockingTypeProcessorCoreWrapper(
|
| - base::WeakPtr<NonBlockingTypeProcessorCore> core,
|
| - scoped_refptr<base::SequencedTaskRunner> sync_thread)
|
| - : core_(core), sync_thread_(sync_thread) {
|
| +ModelTypeSyncWorkerWrapper::ModelTypeSyncWorkerWrapper(
|
| + const base::WeakPtr<ModelTypeSyncWorkerImpl>& worker,
|
| + const scoped_refptr<base::SequencedTaskRunner>& sync_thread)
|
| + : worker_(worker), sync_thread_(sync_thread) {
|
| }
|
|
|
| -NonBlockingTypeProcessorCoreWrapper::~NonBlockingTypeProcessorCoreWrapper() {
|
| +ModelTypeSyncWorkerWrapper::~ModelTypeSyncWorkerWrapper() {
|
| }
|
|
|
| -void NonBlockingTypeProcessorCoreWrapper::RequestCommits(
|
| +void ModelTypeSyncWorkerWrapper::RequestCommits(
|
| const CommitRequestDataList& list) {
|
| sync_thread_->PostTask(
|
| FROM_HERE,
|
| - base::Bind(&NonBlockingTypeProcessorCore::EnqueueForCommit, core_, list));
|
| + base::Bind(&ModelTypeSyncWorkerImpl::EnqueueForCommit, worker_, list));
|
| }
|
|
|
| } // namespace
|
| @@ -186,36 +184,34 @@ void ModelTypeRegistry::SetEnabledDirectoryTypes(
|
| void ModelTypeRegistry::InitializeNonBlockingType(
|
| ModelType type,
|
| const DataTypeState& data_type_state,
|
| - scoped_refptr<base::SequencedTaskRunner> type_task_runner,
|
| - base::WeakPtr<NonBlockingTypeProcessor> processor) {
|
| + const scoped_refptr<base::SequencedTaskRunner>& type_task_runner,
|
| + const base::WeakPtr<ModelTypeSyncProxyImpl>& proxy_impl) {
|
| DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type);
|
|
|
| - // Initialize CoreProcessor -> Processor communication channel.
|
| - scoped_ptr<NonBlockingTypeProcessorInterface> processor_interface(
|
| - new NonBlockingTypeProcessorWrapper(processor, type_task_runner));
|
| - scoped_ptr<NonBlockingTypeProcessorCore> core(
|
| - new NonBlockingTypeProcessorCore(
|
| - type, data_type_state, processor_interface.Pass()));
|
| -
|
| - // Initialize Processor -> CoreProcessor communication channel.
|
| - scoped_ptr<NonBlockingTypeProcessorCoreInterface> core_interface(
|
| - new NonBlockingTypeProcessorCoreWrapper(
|
| - core->AsWeakPtr(),
|
| - scoped_refptr<base::SequencedTaskRunner>(
|
| - base::MessageLoopProxy::current())));
|
| + // Initialize Worker -> Proxy communication channel.
|
| + scoped_ptr<ModelTypeSyncProxy> proxy(
|
| + new ModelTypeSyncProxyWrapper(proxy_impl, type_task_runner));
|
| + scoped_ptr<ModelTypeSyncWorkerImpl> worker(
|
| + new ModelTypeSyncWorkerImpl(type, data_type_state, proxy.Pass()));
|
| +
|
| + // Initialize Proxy -> Worker communication channel.
|
| + scoped_ptr<ModelTypeSyncWorker> wrapped_worker(
|
| + new ModelTypeSyncWorkerWrapper(worker->AsWeakPtr(),
|
| + scoped_refptr<base::SequencedTaskRunner>(
|
| + base::MessageLoopProxy::current())));
|
| type_task_runner->PostTask(FROM_HERE,
|
| - base::Bind(&NonBlockingTypeProcessor::OnConnect,
|
| - processor,
|
| - base::Passed(&core_interface)));
|
| + base::Bind(&ModelTypeSyncProxyImpl::OnConnect,
|
| + proxy_impl,
|
| + base::Passed(&wrapped_worker)));
|
|
|
| DCHECK(update_handler_map_.find(type) == update_handler_map_.end());
|
| DCHECK(commit_contributor_map_.find(type) == commit_contributor_map_.end());
|
|
|
| - update_handler_map_.insert(std::make_pair(type, core.get()));
|
| - commit_contributor_map_.insert(std::make_pair(type, core.get()));
|
| + update_handler_map_.insert(std::make_pair(type, worker.get()));
|
| + commit_contributor_map_.insert(std::make_pair(type, worker.get()));
|
|
|
| // The container takes ownership.
|
| - non_blocking_type_processor_cores_.push_back(core.release());
|
| + model_type_sync_workers_.push_back(worker.release());
|
|
|
| DCHECK(Intersection(GetEnabledDirectoryTypes(),
|
| GetEnabledNonBlockingTypes()).Empty());
|
| @@ -232,12 +228,13 @@ void ModelTypeRegistry::RemoveNonBlockingType(ModelType type) {
|
| DCHECK_EQ(1U, updaters_erased);
|
| DCHECK_EQ(1U, committers_erased);
|
|
|
| - // Remove from the ScopedVector, deleting the core in the process.
|
| - for (ScopedVector<NonBlockingTypeProcessorCore>::iterator it =
|
| - non_blocking_type_processor_cores_.begin();
|
| - it != non_blocking_type_processor_cores_.end(); ++it) {
|
| + // Remove from the ScopedVector, deleting the worker in the process.
|
| + for (ScopedVector<ModelTypeSyncWorkerImpl>::iterator it =
|
| + model_type_sync_workers_.begin();
|
| + it != model_type_sync_workers_.end();
|
| + ++it) {
|
| if ((*it)->GetModelType() == type) {
|
| - non_blocking_type_processor_cores_.erase(it);
|
| + model_type_sync_workers_.erase(it);
|
| break;
|
| }
|
| }
|
| @@ -292,9 +289,10 @@ ModelTypeSet ModelTypeRegistry::GetEnabledDirectoryTypes() const {
|
|
|
| ModelTypeSet ModelTypeRegistry::GetEnabledNonBlockingTypes() const {
|
| ModelTypeSet enabled_off_thread_types;
|
| - for (ScopedVector<NonBlockingTypeProcessorCore>::const_iterator it =
|
| - non_blocking_type_processor_cores_.begin();
|
| - it != non_blocking_type_processor_cores_.end(); ++it) {
|
| + for (ScopedVector<ModelTypeSyncWorkerImpl>::const_iterator it =
|
| + model_type_sync_workers_.begin();
|
| + it != model_type_sync_workers_.end();
|
| + ++it) {
|
| enabled_off_thread_types.Put((*it)->GetModelType());
|
| }
|
| return enabled_off_thread_types;
|
|
|