OLD | NEW |
1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "sync/sessions/model_type_registry.h" | 5 #include "sync/sessions/model_type_registry.h" |
6 | 6 |
7 #include "base/bind.h" | 7 #include "base/bind.h" |
8 #include "base/message_loop/message_loop_proxy.h" | 8 #include "base/message_loop/message_loop_proxy.h" |
9 #include "base/observer_list.h" | 9 #include "base/observer_list.h" |
10 #include "sync/engine/directory_commit_contributor.h" | 10 #include "sync/engine/directory_commit_contributor.h" |
11 #include "sync/engine/directory_update_handler.h" | 11 #include "sync/engine/directory_update_handler.h" |
12 #include "sync/engine/non_blocking_sync_common.h" | 12 #include "sync/engine/non_blocking_sync_common.h" |
13 #include "sync/engine/non_blocking_type_processor.h" | 13 #include "sync/engine/non_blocking_type_processor.h" |
14 #include "sync/engine/non_blocking_type_processor_core.h" | 14 #include "sync/engine/non_blocking_type_processor_core.h" |
15 #include "sync/engine/non_blocking_type_processor_core_interface.h" | 15 #include "sync/engine/non_blocking_type_processor_core_interface.h" |
| 16 #include "sync/engine/non_blocking_type_processor_interface.h" |
16 #include "sync/sessions/directory_type_debug_info_emitter.h" | 17 #include "sync/sessions/directory_type_debug_info_emitter.h" |
17 | 18 |
18 namespace syncer { | 19 namespace syncer { |
19 | 20 |
20 namespace { | 21 namespace { |
21 | 22 |
| 23 class NonBlockingTypeProcessorWrapper |
| 24 : public NonBlockingTypeProcessorInterface { |
| 25 public: |
| 26 NonBlockingTypeProcessorWrapper( |
| 27 base::WeakPtr<NonBlockingTypeProcessor> processor, |
| 28 scoped_refptr<base::SequencedTaskRunner> processor_task_runner); |
| 29 virtual ~NonBlockingTypeProcessorWrapper(); |
| 30 |
| 31 virtual void ReceiveCommitResponse( |
| 32 const DataTypeState& type_state, |
| 33 const CommitResponseDataList& response_list) OVERRIDE; |
| 34 virtual void ReceiveUpdateResponse( |
| 35 const DataTypeState& type_state, |
| 36 const UpdateResponseDataList& response_list) OVERRIDE; |
| 37 |
| 38 private: |
| 39 base::WeakPtr<NonBlockingTypeProcessor> processor_; |
| 40 scoped_refptr<base::SequencedTaskRunner> processor_task_runner_; |
| 41 }; |
| 42 |
| 43 NonBlockingTypeProcessorWrapper::NonBlockingTypeProcessorWrapper( |
| 44 base::WeakPtr<NonBlockingTypeProcessor> processor, |
| 45 scoped_refptr<base::SequencedTaskRunner> processor_task_runner) |
| 46 : processor_(processor), processor_task_runner_(processor_task_runner) { |
| 47 } |
| 48 |
| 49 NonBlockingTypeProcessorWrapper::~NonBlockingTypeProcessorWrapper() { |
| 50 } |
| 51 |
| 52 void NonBlockingTypeProcessorWrapper::ReceiveCommitResponse( |
| 53 const DataTypeState& type_state, |
| 54 const CommitResponseDataList& response_list) { |
| 55 processor_task_runner_->PostTask( |
| 56 FROM_HERE, |
| 57 base::Bind(&NonBlockingTypeProcessor::OnCommitCompletion, |
| 58 processor_, |
| 59 type_state, |
| 60 response_list)); |
| 61 } |
| 62 |
| 63 void NonBlockingTypeProcessorWrapper::ReceiveUpdateResponse( |
| 64 const DataTypeState& type_state, |
| 65 const UpdateResponseDataList& response_list) { |
| 66 processor_task_runner_->PostTask( |
| 67 FROM_HERE, |
| 68 base::Bind(&NonBlockingTypeProcessor::OnUpdateReceived, |
| 69 processor_, |
| 70 type_state, |
| 71 response_list)); |
| 72 } |
| 73 |
22 class NonBlockingTypeProcessorCoreWrapper | 74 class NonBlockingTypeProcessorCoreWrapper |
23 : public NonBlockingTypeProcessorCoreInterface { | 75 : public NonBlockingTypeProcessorCoreInterface { |
24 public: | 76 public: |
25 NonBlockingTypeProcessorCoreWrapper( | 77 NonBlockingTypeProcessorCoreWrapper( |
26 base::WeakPtr<NonBlockingTypeProcessorCore> core, | 78 base::WeakPtr<NonBlockingTypeProcessorCore> core, |
27 scoped_refptr<base::SequencedTaskRunner> sync_thread); | 79 scoped_refptr<base::SequencedTaskRunner> sync_thread); |
28 virtual ~NonBlockingTypeProcessorCoreWrapper(); | 80 virtual ~NonBlockingTypeProcessorCoreWrapper(); |
29 | 81 |
30 virtual void RequestCommits(const CommitRequestDataList& list) OVERRIDE; | 82 virtual void RequestCommits(const CommitRequestDataList& list) OVERRIDE; |
31 | 83 |
32 private: | 84 private: |
33 base::WeakPtr<NonBlockingTypeProcessorCore> core_; | 85 base::WeakPtr<NonBlockingTypeProcessorCore> core_; |
34 scoped_refptr<base::SequencedTaskRunner> sync_thread_; | 86 scoped_refptr<base::SequencedTaskRunner> sync_thread_; |
35 }; | 87 }; |
36 | 88 |
37 NonBlockingTypeProcessorCoreWrapper::NonBlockingTypeProcessorCoreWrapper( | 89 NonBlockingTypeProcessorCoreWrapper::NonBlockingTypeProcessorCoreWrapper( |
38 base::WeakPtr<NonBlockingTypeProcessorCore> core, | 90 base::WeakPtr<NonBlockingTypeProcessorCore> core, |
39 scoped_refptr<base::SequencedTaskRunner> sync_thread) | 91 scoped_refptr<base::SequencedTaskRunner> sync_thread) |
40 : core_(core), sync_thread_(sync_thread) { | 92 : core_(core), sync_thread_(sync_thread) { |
41 } | 93 } |
42 | 94 |
43 NonBlockingTypeProcessorCoreWrapper::~NonBlockingTypeProcessorCoreWrapper() { | 95 NonBlockingTypeProcessorCoreWrapper::~NonBlockingTypeProcessorCoreWrapper() { |
44 } | 96 } |
45 | 97 |
46 void NonBlockingTypeProcessorCoreWrapper::RequestCommits( | 98 void NonBlockingTypeProcessorCoreWrapper::RequestCommits( |
47 const CommitRequestDataList& list) { | 99 const CommitRequestDataList& list) { |
48 sync_thread_->PostTask( | 100 sync_thread_->PostTask( |
49 FROM_HERE, | 101 FROM_HERE, |
50 base::Bind(&NonBlockingTypeProcessorCore::RequestCommits, core_, list)); | 102 base::Bind(&NonBlockingTypeProcessorCore::EnqueueForCommit, core_, list)); |
51 } | 103 } |
52 | 104 |
53 } // namespace | 105 } // namespace |
54 | 106 |
55 ModelTypeRegistry::ModelTypeRegistry() : directory_(NULL) {} | 107 ModelTypeRegistry::ModelTypeRegistry() : directory_(NULL) {} |
56 | 108 |
57 ModelTypeRegistry::ModelTypeRegistry( | 109 ModelTypeRegistry::ModelTypeRegistry( |
58 const std::vector<scoped_refptr<ModelSafeWorker> >& workers, | 110 const std::vector<scoped_refptr<ModelSafeWorker> >& workers, |
59 syncable::Directory* directory) | 111 syncable::Directory* directory) |
60 : directory_(directory) { | 112 : directory_(directory) { |
(...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
132 } | 184 } |
133 | 185 |
134 void ModelTypeRegistry::InitializeNonBlockingType( | 186 void ModelTypeRegistry::InitializeNonBlockingType( |
135 ModelType type, | 187 ModelType type, |
136 const DataTypeState& data_type_state, | 188 const DataTypeState& data_type_state, |
137 scoped_refptr<base::SequencedTaskRunner> type_task_runner, | 189 scoped_refptr<base::SequencedTaskRunner> type_task_runner, |
138 base::WeakPtr<NonBlockingTypeProcessor> processor) { | 190 base::WeakPtr<NonBlockingTypeProcessor> processor) { |
139 DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type); | 191 DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type); |
140 | 192 |
141 // Initialize CoreProcessor -> Processor communication channel. | 193 // Initialize CoreProcessor -> Processor communication channel. |
| 194 scoped_ptr<NonBlockingTypeProcessorInterface> processor_interface( |
| 195 new NonBlockingTypeProcessorWrapper(processor, type_task_runner)); |
142 scoped_ptr<NonBlockingTypeProcessorCore> core( | 196 scoped_ptr<NonBlockingTypeProcessorCore> core( |
143 new NonBlockingTypeProcessorCore(type, type_task_runner, processor)); | 197 new NonBlockingTypeProcessorCore( |
144 | 198 type, data_type_state, processor_interface.Pass())); |
145 // TODO(rlarocque): DataTypeState should be forwarded to core here. | |
146 | 199 |
147 // Initialize Processor -> CoreProcessor communication channel. | 200 // Initialize Processor -> CoreProcessor communication channel. |
148 scoped_ptr<NonBlockingTypeProcessorCoreInterface> core_interface( | 201 scoped_ptr<NonBlockingTypeProcessorCoreInterface> core_interface( |
149 new NonBlockingTypeProcessorCoreWrapper( | 202 new NonBlockingTypeProcessorCoreWrapper( |
150 core->AsWeakPtr(), | 203 core->AsWeakPtr(), |
151 scoped_refptr<base::SequencedTaskRunner>( | 204 scoped_refptr<base::SequencedTaskRunner>( |
152 base::MessageLoopProxy::current()))); | 205 base::MessageLoopProxy::current()))); |
153 type_task_runner->PostTask(FROM_HERE, | 206 type_task_runner->PostTask(FROM_HERE, |
154 base::Bind(&NonBlockingTypeProcessor::OnConnect, | 207 base::Bind(&NonBlockingTypeProcessor::OnConnect, |
155 processor, | 208 processor, |
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
241 ModelTypeSet enabled_off_thread_types; | 294 ModelTypeSet enabled_off_thread_types; |
242 for (ScopedVector<NonBlockingTypeProcessorCore>::const_iterator it = | 295 for (ScopedVector<NonBlockingTypeProcessorCore>::const_iterator it = |
243 non_blocking_type_processor_cores_.begin(); | 296 non_blocking_type_processor_cores_.begin(); |
244 it != non_blocking_type_processor_cores_.end(); ++it) { | 297 it != non_blocking_type_processor_cores_.end(); ++it) { |
245 enabled_off_thread_types.Put((*it)->GetModelType()); | 298 enabled_off_thread_types.Put((*it)->GetModelType()); |
246 } | 299 } |
247 return enabled_off_thread_types; | 300 return enabled_off_thread_types; |
248 } | 301 } |
249 | 302 |
250 } // namespace syncer | 303 } // namespace syncer |
OLD | NEW |