Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(30)

Side by Side Diff: sync/sessions/model_type_registry.cc

Issue 299963002: sync: Implement NonBlockingTypeProcessorCore (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Unbreak some tests Created 6 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « sync/protocol/proto_value_conversions.cc ('k') | sync/sync_core.gypi » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "sync/sessions/model_type_registry.h" 5 #include "sync/sessions/model_type_registry.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/message_loop/message_loop_proxy.h" 8 #include "base/message_loop/message_loop_proxy.h"
9 #include "base/observer_list.h" 9 #include "base/observer_list.h"
10 #include "sync/engine/directory_commit_contributor.h" 10 #include "sync/engine/directory_commit_contributor.h"
11 #include "sync/engine/directory_update_handler.h" 11 #include "sync/engine/directory_update_handler.h"
12 #include "sync/engine/non_blocking_sync_common.h" 12 #include "sync/engine/non_blocking_sync_common.h"
13 #include "sync/engine/non_blocking_type_processor.h" 13 #include "sync/engine/non_blocking_type_processor.h"
14 #include "sync/engine/non_blocking_type_processor_core.h" 14 #include "sync/engine/non_blocking_type_processor_core.h"
15 #include "sync/engine/non_blocking_type_processor_core_interface.h" 15 #include "sync/engine/non_blocking_type_processor_core_interface.h"
16 #include "sync/engine/non_blocking_type_processor_interface.h"
16 #include "sync/sessions/directory_type_debug_info_emitter.h" 17 #include "sync/sessions/directory_type_debug_info_emitter.h"
17 18
18 namespace syncer { 19 namespace syncer {
19 20
20 namespace { 21 namespace {
21 22
23 class NonBlockingTypeProcessorWrapper
24 : public NonBlockingTypeProcessorInterface {
25 public:
26 NonBlockingTypeProcessorWrapper(
27 base::WeakPtr<NonBlockingTypeProcessor> processor,
28 scoped_refptr<base::SequencedTaskRunner> processor_task_runner);
29 virtual ~NonBlockingTypeProcessorWrapper();
30
31 virtual void ReceiveCommitResponse(
32 const DataTypeState& type_state,
33 const CommitResponseDataList& response_list) OVERRIDE;
34 virtual void ReceiveUpdateResponse(
35 const DataTypeState& type_state,
36 const UpdateResponseDataList& response_list) OVERRIDE;
37
38 private:
39 base::WeakPtr<NonBlockingTypeProcessor> processor_;
40 scoped_refptr<base::SequencedTaskRunner> processor_task_runner_;
41 };
42
43 NonBlockingTypeProcessorWrapper::NonBlockingTypeProcessorWrapper(
44 base::WeakPtr<NonBlockingTypeProcessor> processor,
45 scoped_refptr<base::SequencedTaskRunner> processor_task_runner)
46 : processor_(processor), processor_task_runner_(processor_task_runner) {
47 }
48
49 NonBlockingTypeProcessorWrapper::~NonBlockingTypeProcessorWrapper() {
50 }
51
52 void NonBlockingTypeProcessorWrapper::ReceiveCommitResponse(
53 const DataTypeState& type_state,
54 const CommitResponseDataList& response_list) {
55 processor_task_runner_->PostTask(
56 FROM_HERE,
57 base::Bind(&NonBlockingTypeProcessor::OnCommitCompletion,
58 processor_,
59 type_state,
60 response_list));
61 }
62
63 void NonBlockingTypeProcessorWrapper::ReceiveUpdateResponse(
64 const DataTypeState& type_state,
65 const UpdateResponseDataList& response_list) {
66 processor_task_runner_->PostTask(
67 FROM_HERE,
68 base::Bind(&NonBlockingTypeProcessor::OnUpdateReceived,
69 processor_,
70 type_state,
71 response_list));
72 }
73
22 class NonBlockingTypeProcessorCoreWrapper 74 class NonBlockingTypeProcessorCoreWrapper
23 : public NonBlockingTypeProcessorCoreInterface { 75 : public NonBlockingTypeProcessorCoreInterface {
24 public: 76 public:
25 NonBlockingTypeProcessorCoreWrapper( 77 NonBlockingTypeProcessorCoreWrapper(
26 base::WeakPtr<NonBlockingTypeProcessorCore> core, 78 base::WeakPtr<NonBlockingTypeProcessorCore> core,
27 scoped_refptr<base::SequencedTaskRunner> sync_thread); 79 scoped_refptr<base::SequencedTaskRunner> sync_thread);
28 virtual ~NonBlockingTypeProcessorCoreWrapper(); 80 virtual ~NonBlockingTypeProcessorCoreWrapper();
29 81
30 virtual void RequestCommits(const CommitRequestDataList& list) OVERRIDE; 82 virtual void RequestCommits(const CommitRequestDataList& list) OVERRIDE;
31 83
32 private: 84 private:
33 base::WeakPtr<NonBlockingTypeProcessorCore> core_; 85 base::WeakPtr<NonBlockingTypeProcessorCore> core_;
34 scoped_refptr<base::SequencedTaskRunner> sync_thread_; 86 scoped_refptr<base::SequencedTaskRunner> sync_thread_;
35 }; 87 };
36 88
37 NonBlockingTypeProcessorCoreWrapper::NonBlockingTypeProcessorCoreWrapper( 89 NonBlockingTypeProcessorCoreWrapper::NonBlockingTypeProcessorCoreWrapper(
38 base::WeakPtr<NonBlockingTypeProcessorCore> core, 90 base::WeakPtr<NonBlockingTypeProcessorCore> core,
39 scoped_refptr<base::SequencedTaskRunner> sync_thread) 91 scoped_refptr<base::SequencedTaskRunner> sync_thread)
40 : core_(core), sync_thread_(sync_thread) { 92 : core_(core), sync_thread_(sync_thread) {
41 } 93 }
42 94
43 NonBlockingTypeProcessorCoreWrapper::~NonBlockingTypeProcessorCoreWrapper() { 95 NonBlockingTypeProcessorCoreWrapper::~NonBlockingTypeProcessorCoreWrapper() {
44 } 96 }
45 97
46 void NonBlockingTypeProcessorCoreWrapper::RequestCommits( 98 void NonBlockingTypeProcessorCoreWrapper::RequestCommits(
47 const CommitRequestDataList& list) { 99 const CommitRequestDataList& list) {
48 sync_thread_->PostTask( 100 sync_thread_->PostTask(
49 FROM_HERE, 101 FROM_HERE,
50 base::Bind(&NonBlockingTypeProcessorCore::RequestCommits, core_, list)); 102 base::Bind(&NonBlockingTypeProcessorCore::EnqueueForCommit, core_, list));
51 } 103 }
52 104
53 } // namespace 105 } // namespace
54 106
55 ModelTypeRegistry::ModelTypeRegistry() : directory_(NULL) {} 107 ModelTypeRegistry::ModelTypeRegistry() : directory_(NULL) {}
56 108
57 ModelTypeRegistry::ModelTypeRegistry( 109 ModelTypeRegistry::ModelTypeRegistry(
58 const std::vector<scoped_refptr<ModelSafeWorker> >& workers, 110 const std::vector<scoped_refptr<ModelSafeWorker> >& workers,
59 syncable::Directory* directory) 111 syncable::Directory* directory)
60 : directory_(directory) { 112 : directory_(directory) {
(...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after
132 } 184 }
133 185
134 void ModelTypeRegistry::InitializeNonBlockingType( 186 void ModelTypeRegistry::InitializeNonBlockingType(
135 ModelType type, 187 ModelType type,
136 const DataTypeState& data_type_state, 188 const DataTypeState& data_type_state,
137 scoped_refptr<base::SequencedTaskRunner> type_task_runner, 189 scoped_refptr<base::SequencedTaskRunner> type_task_runner,
138 base::WeakPtr<NonBlockingTypeProcessor> processor) { 190 base::WeakPtr<NonBlockingTypeProcessor> processor) {
139 DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type); 191 DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type);
140 192
141 // Initialize CoreProcessor -> Processor communication channel. 193 // Initialize CoreProcessor -> Processor communication channel.
194 scoped_ptr<NonBlockingTypeProcessorInterface> processor_interface(
195 new NonBlockingTypeProcessorWrapper(processor, type_task_runner));
142 scoped_ptr<NonBlockingTypeProcessorCore> core( 196 scoped_ptr<NonBlockingTypeProcessorCore> core(
143 new NonBlockingTypeProcessorCore(type, type_task_runner, processor)); 197 new NonBlockingTypeProcessorCore(
144 198 type, data_type_state, processor_interface.Pass()));
145 // TODO(rlarocque): DataTypeState should be forwarded to core here.
146 199
147 // Initialize Processor -> CoreProcessor communication channel. 200 // Initialize Processor -> CoreProcessor communication channel.
148 scoped_ptr<NonBlockingTypeProcessorCoreInterface> core_interface( 201 scoped_ptr<NonBlockingTypeProcessorCoreInterface> core_interface(
149 new NonBlockingTypeProcessorCoreWrapper( 202 new NonBlockingTypeProcessorCoreWrapper(
150 core->AsWeakPtr(), 203 core->AsWeakPtr(),
151 scoped_refptr<base::SequencedTaskRunner>( 204 scoped_refptr<base::SequencedTaskRunner>(
152 base::MessageLoopProxy::current()))); 205 base::MessageLoopProxy::current())));
153 type_task_runner->PostTask(FROM_HERE, 206 type_task_runner->PostTask(FROM_HERE,
154 base::Bind(&NonBlockingTypeProcessor::OnConnect, 207 base::Bind(&NonBlockingTypeProcessor::OnConnect,
155 processor, 208 processor,
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after
241 ModelTypeSet enabled_off_thread_types; 294 ModelTypeSet enabled_off_thread_types;
242 for (ScopedVector<NonBlockingTypeProcessorCore>::const_iterator it = 295 for (ScopedVector<NonBlockingTypeProcessorCore>::const_iterator it =
243 non_blocking_type_processor_cores_.begin(); 296 non_blocking_type_processor_cores_.begin();
244 it != non_blocking_type_processor_cores_.end(); ++it) { 297 it != non_blocking_type_processor_cores_.end(); ++it) {
245 enabled_off_thread_types.Put((*it)->GetModelType()); 298 enabled_off_thread_types.Put((*it)->GetModelType());
246 } 299 }
247 return enabled_off_thread_types; 300 return enabled_off_thread_types;
248 } 301 }
249 302
250 } // namespace syncer 303 } // namespace syncer
OLDNEW
« no previous file with comments | « sync/protocol/proto_value_conversions.cc ('k') | sync/sync_core.gypi » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698