Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(70)

Side by Side Diff: sync/sessions/model_type_registry.cc

Issue 1325453003: [Sync] rename USS processor / worker interfaces (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "sync/sessions/model_type_registry.h" 5 #include "sync/sessions/model_type_registry.h"
6 6
7 #include "base/bind.h" 7 #include "base/bind.h"
8 #include "base/observer_list.h" 8 #include "base/observer_list.h"
9 #include "base/thread_task_runner_handle.h" 9 #include "base/thread_task_runner_handle.h"
10 #include "sync/engine/commit_queue.h"
11 #include "sync/engine/commit_queue_impl.h"
10 #include "sync/engine/directory_commit_contributor.h" 12 #include "sync/engine/directory_commit_contributor.h"
11 #include "sync/engine/directory_update_handler.h" 13 #include "sync/engine/directory_update_handler.h"
12 #include "sync/engine/model_type_sync_proxy.h" 14 #include "sync/engine/model_type_processor.h"
13 #include "sync/engine/model_type_sync_proxy_impl.h" 15 #include "sync/engine/model_type_processor_impl.h"
14 #include "sync/engine/model_type_sync_worker.h"
15 #include "sync/engine/model_type_sync_worker_impl.h"
16 #include "sync/internal_api/public/non_blocking_sync_common.h" 16 #include "sync/internal_api/public/non_blocking_sync_common.h"
17 #include "sync/sessions/directory_type_debug_info_emitter.h" 17 #include "sync/sessions/directory_type_debug_info_emitter.h"
18 #include "sync/util/cryptographer.h" 18 #include "sync/util/cryptographer.h"
19 19
20 namespace syncer { 20 namespace syncer {
21 21
22 namespace { 22 namespace {
23 23
24 class ModelTypeSyncProxyWrapper : public syncer_v2::ModelTypeSyncProxy { 24 class ModelTypeProcessorWrapper : public syncer_v2::ModelTypeProcessor {
25 public: 25 public:
26 ModelTypeSyncProxyWrapper( 26 ModelTypeProcessorWrapper(
27 const base::WeakPtr<syncer_v2::ModelTypeSyncProxyImpl>& proxy, 27 const base::WeakPtr<syncer_v2::ModelTypeProcessorImpl>& proxy,
28 const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner); 28 const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner);
29 ~ModelTypeSyncProxyWrapper() override; 29 ~ModelTypeProcessorWrapper() override;
30 30
31 void OnCommitCompleted( 31 void OnCommitCompleted(
32 const syncer_v2::DataTypeState& type_state, 32 const syncer_v2::DataTypeState& type_state,
33 const syncer_v2::CommitResponseDataList& response_list) override; 33 const syncer_v2::CommitResponseDataList& response_list) override;
34 void OnUpdateReceived( 34 void OnUpdateReceived(
35 const syncer_v2::DataTypeState& type_state, 35 const syncer_v2::DataTypeState& type_state,
36 const syncer_v2::UpdateResponseDataList& response_list, 36 const syncer_v2::UpdateResponseDataList& response_list,
37 const syncer_v2::UpdateResponseDataList& pending_updates) override; 37 const syncer_v2::UpdateResponseDataList& pending_updates) override;
38 38
39 private: 39 private:
40 base::WeakPtr<syncer_v2::ModelTypeSyncProxyImpl> processor_; 40 base::WeakPtr<syncer_v2::ModelTypeProcessorImpl> processor_;
41 scoped_refptr<base::SequencedTaskRunner> processor_task_runner_; 41 scoped_refptr<base::SequencedTaskRunner> processor_task_runner_;
42 }; 42 };
43 43
44 ModelTypeSyncProxyWrapper::ModelTypeSyncProxyWrapper( 44 ModelTypeProcessorWrapper::ModelTypeProcessorWrapper(
45 const base::WeakPtr<syncer_v2::ModelTypeSyncProxyImpl>& proxy, 45 const base::WeakPtr<syncer_v2::ModelTypeProcessorImpl>& proxy,
46 const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner) 46 const scoped_refptr<base::SequencedTaskRunner>& processor_task_runner)
47 : processor_(proxy), processor_task_runner_(processor_task_runner) {} 47 : processor_(proxy), processor_task_runner_(processor_task_runner) {}
48 48
49 ModelTypeSyncProxyWrapper::~ModelTypeSyncProxyWrapper() { 49 ModelTypeProcessorWrapper::~ModelTypeProcessorWrapper() {
50 } 50 }
51 51
52 void ModelTypeSyncProxyWrapper::OnCommitCompleted( 52 void ModelTypeProcessorWrapper::OnCommitCompleted(
53 const syncer_v2::DataTypeState& type_state, 53 const syncer_v2::DataTypeState& type_state,
54 const syncer_v2::CommitResponseDataList& response_list) { 54 const syncer_v2::CommitResponseDataList& response_list) {
55 processor_task_runner_->PostTask( 55 processor_task_runner_->PostTask(
56 FROM_HERE, 56 FROM_HERE,
57 base::Bind(&syncer_v2::ModelTypeSyncProxyImpl::OnCommitCompleted, 57 base::Bind(&syncer_v2::ModelTypeProcessorImpl::OnCommitCompleted,
58 processor_, type_state, response_list)); 58 processor_, type_state, response_list));
59 } 59 }
60 60
61 void ModelTypeSyncProxyWrapper::OnUpdateReceived( 61 void ModelTypeProcessorWrapper::OnUpdateReceived(
62 const syncer_v2::DataTypeState& type_state, 62 const syncer_v2::DataTypeState& type_state,
63 const syncer_v2::UpdateResponseDataList& response_list, 63 const syncer_v2::UpdateResponseDataList& response_list,
64 const syncer_v2::UpdateResponseDataList& pending_updates) { 64 const syncer_v2::UpdateResponseDataList& pending_updates) {
65 processor_task_runner_->PostTask( 65 processor_task_runner_->PostTask(
66 FROM_HERE, 66 FROM_HERE,
67 base::Bind(&syncer_v2::ModelTypeSyncProxyImpl::OnUpdateReceived, 67 base::Bind(&syncer_v2::ModelTypeProcessorImpl::OnUpdateReceived,
68 processor_, type_state, response_list, pending_updates)); 68 processor_, type_state, response_list, pending_updates));
69 } 69 }
70 70
71 class ModelTypeSyncWorkerWrapper : public syncer_v2::ModelTypeSyncWorker { 71 class CommitQueueWrapper : public syncer_v2::CommitQueue {
72 public: 72 public:
73 ModelTypeSyncWorkerWrapper( 73 CommitQueueWrapper(
74 const base::WeakPtr<syncer_v2::ModelTypeSyncWorkerImpl>& worker, 74 const base::WeakPtr<syncer_v2::CommitQueueImpl>& worker,
75 const scoped_refptr<base::SequencedTaskRunner>& sync_thread); 75 const scoped_refptr<base::SequencedTaskRunner>& sync_thread);
76 ~ModelTypeSyncWorkerWrapper() override; 76 ~CommitQueueWrapper() override;
77 77
78 void EnqueueForCommit(const syncer_v2::CommitRequestDataList& list) override; 78 void EnqueueForCommit(const syncer_v2::CommitRequestDataList& list) override;
79 79
80 private: 80 private:
81 base::WeakPtr<syncer_v2::ModelTypeSyncWorkerImpl> worker_; 81 base::WeakPtr<syncer_v2::CommitQueueImpl> worker_;
82 scoped_refptr<base::SequencedTaskRunner> sync_thread_; 82 scoped_refptr<base::SequencedTaskRunner> sync_thread_;
83 }; 83 };
84 84
85 ModelTypeSyncWorkerWrapper::ModelTypeSyncWorkerWrapper( 85 CommitQueueWrapper::CommitQueueWrapper(
86 const base::WeakPtr<syncer_v2::ModelTypeSyncWorkerImpl>& worker, 86 const base::WeakPtr<syncer_v2::CommitQueueImpl>& worker,
87 const scoped_refptr<base::SequencedTaskRunner>& sync_thread) 87 const scoped_refptr<base::SequencedTaskRunner>& sync_thread)
88 : worker_(worker), sync_thread_(sync_thread) {} 88 : worker_(worker), sync_thread_(sync_thread) {}
89 89
90 ModelTypeSyncWorkerWrapper::~ModelTypeSyncWorkerWrapper() { 90 CommitQueueWrapper::~CommitQueueWrapper() {
91 } 91 }
92 92
93 void ModelTypeSyncWorkerWrapper::EnqueueForCommit( 93 void CommitQueueWrapper::EnqueueForCommit(
94 const syncer_v2::CommitRequestDataList& list) { 94 const syncer_v2::CommitRequestDataList& list) {
95 sync_thread_->PostTask( 95 sync_thread_->PostTask(
96 FROM_HERE, 96 FROM_HERE,
97 base::Bind(&syncer_v2::ModelTypeSyncWorkerImpl::EnqueueForCommit, worker_, 97 base::Bind(&syncer_v2::CommitQueueImpl::EnqueueForCommit, worker_,
98 list)); 98 list));
99 } 99 }
100 100
101 } // namespace 101 } // namespace
102 102
103 ModelTypeRegistry::ModelTypeRegistry( 103 ModelTypeRegistry::ModelTypeRegistry(
104 const std::vector<scoped_refptr<ModelSafeWorker> >& workers, 104 const std::vector<scoped_refptr<ModelSafeWorker> >& workers,
105 syncable::Directory* directory, 105 syncable::Directory* directory,
106 NudgeHandler* nudge_handler) 106 NudgeHandler* nudge_handler)
107 : directory_(directory), 107 : directory_(directory),
(...skipping 71 matching lines...) Expand 10 before | Expand all | Expand 10 after
179 enabled_directory_types_ = GetRoutingInfoTypes(routing_info); 179 enabled_directory_types_ = GetRoutingInfoTypes(routing_info);
180 DCHECK(Intersection(GetEnabledDirectoryTypes(), 180 DCHECK(Intersection(GetEnabledDirectoryTypes(),
181 GetEnabledNonBlockingTypes()).Empty()); 181 GetEnabledNonBlockingTypes()).Empty());
182 } 182 }
183 183
184 void ModelTypeRegistry::ConnectSyncTypeToWorker( 184 void ModelTypeRegistry::ConnectSyncTypeToWorker(
185 ModelType type, 185 ModelType type,
186 const syncer_v2::DataTypeState& data_type_state, 186 const syncer_v2::DataTypeState& data_type_state,
187 const syncer_v2::UpdateResponseDataList& saved_pending_updates, 187 const syncer_v2::UpdateResponseDataList& saved_pending_updates,
188 const scoped_refptr<base::SequencedTaskRunner>& type_task_runner, 188 const scoped_refptr<base::SequencedTaskRunner>& type_task_runner,
189 const base::WeakPtr<syncer_v2::ModelTypeSyncProxyImpl>& proxy_impl) { 189 const base::WeakPtr<syncer_v2::ModelTypeProcessorImpl>& proxy_impl) {
190 DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type); 190 DVLOG(1) << "Enabling an off-thread sync type: " << ModelTypeToString(type);
191 191
192 // Initialize Worker -> Proxy communication channel. 192 // Initialize Worker -> Proxy communication channel.
193 scoped_ptr<syncer_v2::ModelTypeSyncProxy> proxy( 193 scoped_ptr<syncer_v2::ModelTypeProcessor> proxy(
194 new ModelTypeSyncProxyWrapper(proxy_impl, type_task_runner)); 194 new ModelTypeProcessorWrapper(proxy_impl, type_task_runner));
195 scoped_ptr<Cryptographer> cryptographer_copy; 195 scoped_ptr<Cryptographer> cryptographer_copy;
196 if (encrypted_types_.Has(type)) 196 if (encrypted_types_.Has(type))
197 cryptographer_copy.reset(new Cryptographer(*cryptographer_)); 197 cryptographer_copy.reset(new Cryptographer(*cryptographer_));
198 198
199 scoped_ptr<syncer_v2::ModelTypeSyncWorkerImpl> worker( 199 scoped_ptr<syncer_v2::CommitQueueImpl> worker(
200 new syncer_v2::ModelTypeSyncWorkerImpl( 200 new syncer_v2::CommitQueueImpl(
201 type, data_type_state, saved_pending_updates, 201 type, data_type_state, saved_pending_updates,
202 cryptographer_copy.Pass(), nudge_handler_, proxy.Pass())); 202 cryptographer_copy.Pass(), nudge_handler_, proxy.Pass()));
203 203
204 // Initialize Proxy -> Worker communication channel. 204 // Initialize Proxy -> Worker communication channel.
205 scoped_ptr<syncer_v2::ModelTypeSyncWorker> wrapped_worker( 205 scoped_ptr<syncer_v2::CommitQueue> wrapped_worker(
206 new ModelTypeSyncWorkerWrapper(worker->AsWeakPtr(), 206 new CommitQueueWrapper(worker->AsWeakPtr(),
207 scoped_refptr<base::SequencedTaskRunner>( 207 scoped_refptr<base::SequencedTaskRunner>(
208 base::ThreadTaskRunnerHandle::Get()))); 208 base::ThreadTaskRunnerHandle::Get())));
209 type_task_runner->PostTask( 209 type_task_runner->PostTask(
210 FROM_HERE, base::Bind(&syncer_v2::ModelTypeSyncProxyImpl::OnConnect, 210 FROM_HERE, base::Bind(&syncer_v2::ModelTypeProcessorImpl::OnConnect,
211 proxy_impl, base::Passed(&wrapped_worker))); 211 proxy_impl, base::Passed(&wrapped_worker)));
212 212
213 DCHECK(update_handler_map_.find(type) == update_handler_map_.end()); 213 DCHECK(update_handler_map_.find(type) == update_handler_map_.end());
214 DCHECK(commit_contributor_map_.find(type) == commit_contributor_map_.end()); 214 DCHECK(commit_contributor_map_.find(type) == commit_contributor_map_.end());
215 215
216 update_handler_map_.insert(std::make_pair(type, worker.get())); 216 update_handler_map_.insert(std::make_pair(type, worker.get()));
217 commit_contributor_map_.insert(std::make_pair(type, worker.get())); 217 commit_contributor_map_.insert(std::make_pair(type, worker.get()));
218 218
219 // The container takes ownership. 219 // The container takes ownership.
220 model_type_sync_workers_.push_back(worker.Pass()); 220 commit_queues_.push_back(worker.Pass());
221 221
222 DCHECK(Intersection(GetEnabledDirectoryTypes(), 222 DCHECK(Intersection(GetEnabledDirectoryTypes(),
223 GetEnabledNonBlockingTypes()).Empty()); 223 GetEnabledNonBlockingTypes()).Empty());
224 } 224 }
225 225
226 void ModelTypeRegistry::DisconnectSyncWorker(ModelType type) { 226 void ModelTypeRegistry::DisconnectSyncWorker(ModelType type) {
227 DVLOG(1) << "Disabling an off-thread sync type: " << ModelTypeToString(type); 227 DVLOG(1) << "Disabling an off-thread sync type: " << ModelTypeToString(type);
228 DCHECK(update_handler_map_.find(type) != update_handler_map_.end()); 228 DCHECK(update_handler_map_.find(type) != update_handler_map_.end());
229 DCHECK(commit_contributor_map_.find(type) != commit_contributor_map_.end()); 229 DCHECK(commit_contributor_map_.find(type) != commit_contributor_map_.end());
230 230
231 size_t updaters_erased = update_handler_map_.erase(type); 231 size_t updaters_erased = update_handler_map_.erase(type);
232 size_t committers_erased = commit_contributor_map_.erase(type); 232 size_t committers_erased = commit_contributor_map_.erase(type);
233 233
234 DCHECK_EQ(1U, updaters_erased); 234 DCHECK_EQ(1U, updaters_erased);
235 DCHECK_EQ(1U, committers_erased); 235 DCHECK_EQ(1U, committers_erased);
236 236
237 // Remove from the ScopedVector, deleting the worker in the process. 237 // Remove from the ScopedVector, deleting the worker in the process.
238 for (ScopedVector<syncer_v2::ModelTypeSyncWorkerImpl>::iterator it = 238 for (ScopedVector<syncer_v2::CommitQueueImpl>::iterator it =
239 model_type_sync_workers_.begin(); 239 commit_queues_.begin();
240 it != model_type_sync_workers_.end(); ++it) { 240 it != commit_queues_.end(); ++it) {
241 if ((*it)->GetModelType() == type) { 241 if ((*it)->GetModelType() == type) {
242 model_type_sync_workers_.erase(it); 242 commit_queues_.erase(it);
243 break; 243 break;
244 } 244 }
245 } 245 }
246 } 246 }
247 247
248 ModelTypeSet ModelTypeRegistry::GetEnabledTypes() const { 248 ModelTypeSet ModelTypeRegistry::GetEnabledTypes() const {
249 return Union(GetEnabledDirectoryTypes(), GetEnabledNonBlockingTypes()); 249 return Union(GetEnabledDirectoryTypes(), GetEnabledNonBlockingTypes());
250 } 250 }
251 251
252 UpdateHandlerMap* ModelTypeRegistry::update_handler_map() { 252 UpdateHandlerMap* ModelTypeRegistry::update_handler_map() {
(...skipping 73 matching lines...) Expand 10 before | Expand all | Expand 10 after
326 326
327 void ModelTypeRegistry::OnLocalSetPassphraseEncryption( 327 void ModelTypeRegistry::OnLocalSetPassphraseEncryption(
328 const SyncEncryptionHandler::NigoriState& nigori_state) { 328 const SyncEncryptionHandler::NigoriState& nigori_state) {
329 } 329 }
330 330
331 ModelTypeSet ModelTypeRegistry::GetEnabledDirectoryTypes() const { 331 ModelTypeSet ModelTypeRegistry::GetEnabledDirectoryTypes() const {
332 return enabled_directory_types_; 332 return enabled_directory_types_;
333 } 333 }
334 334
335 void ModelTypeRegistry::OnEncryptionStateChanged() { 335 void ModelTypeRegistry::OnEncryptionStateChanged() {
336 for (ScopedVector<syncer_v2::ModelTypeSyncWorkerImpl>::iterator it = 336 for (ScopedVector<syncer_v2::CommitQueueImpl>::iterator it =
337 model_type_sync_workers_.begin(); 337 commit_queues_.begin();
338 it != model_type_sync_workers_.end(); ++it) { 338 it != commit_queues_.end(); ++it) {
339 if (encrypted_types_.Has((*it)->GetModelType())) { 339 if (encrypted_types_.Has((*it)->GetModelType())) {
340 (*it)->UpdateCryptographer( 340 (*it)->UpdateCryptographer(
341 make_scoped_ptr(new Cryptographer(*cryptographer_))); 341 make_scoped_ptr(new Cryptographer(*cryptographer_)));
342 } 342 }
343 } 343 }
344 } 344 }
345 345
346 ModelTypeSet ModelTypeRegistry::GetEnabledNonBlockingTypes() const { 346 ModelTypeSet ModelTypeRegistry::GetEnabledNonBlockingTypes() const {
347 ModelTypeSet enabled_off_thread_types; 347 ModelTypeSet enabled_off_thread_types;
348 for (ScopedVector<syncer_v2::ModelTypeSyncWorkerImpl>::const_iterator it = 348 for (ScopedVector<syncer_v2::CommitQueueImpl>::const_iterator it =
349 model_type_sync_workers_.begin(); 349 commit_queues_.begin();
350 it != model_type_sync_workers_.end(); ++it) { 350 it != commit_queues_.end(); ++it) {
351 enabled_off_thread_types.Put((*it)->GetModelType()); 351 enabled_off_thread_types.Put((*it)->GetModelType());
352 } 352 }
353 return enabled_off_thread_types; 353 return enabled_off_thread_types;
354 } 354 }
355 355
356 } // namespace syncer 356 } // namespace syncer
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698