Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(65)

Side by Side Diff: components/sync/driver/shared_change_processor.cc

Issue 2593803002: Make sync's change processors sequence-affine. (Closed)
Patch Set: Created 3 years, 12 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "components/sync/driver/shared_change_processor.h" 5 #include "components/sync/driver/shared_change_processor.h"
6 6
7 #include <utility> 7 #include <utility>
8 8
9 #include "base/threading/sequenced_task_runner_handle.h"
9 #include "base/threading/thread_task_runner_handle.h" 10 #include "base/threading/thread_task_runner_handle.h"
gab 2016/12/20 20:11:29 rm?
fdoray 2016/12/20 20:34:39 Still needed to initialize |frontend_task_runner_|
10 #include "components/sync/base/data_type_histogram.h" 11 #include "components/sync/base/data_type_histogram.h"
11 #include "components/sync/driver/generic_change_processor.h" 12 #include "components/sync/driver/generic_change_processor.h"
12 #include "components/sync/driver/generic_change_processor_factory.h" 13 #include "components/sync/driver/generic_change_processor_factory.h"
13 #include "components/sync/driver/shared_change_processor_ref.h" 14 #include "components/sync/driver/shared_change_processor_ref.h"
14 #include "components/sync/driver/sync_client.h" 15 #include "components/sync/driver/sync_client.h"
15 #include "components/sync/model/sync_change.h" 16 #include "components/sync/model/sync_change.h"
16 #include "components/sync/model/syncable_service.h" 17 #include "components/sync/model/syncable_service.h"
17 18
18 using base::AutoLock; 19 using base::AutoLock;
19 20
20 namespace syncer { 21 namespace syncer {
21 22
22 class AttachmentService; 23 class AttachmentService;
23 24
24 SharedChangeProcessor::SharedChangeProcessor(ModelType type) 25 SharedChangeProcessor::SharedChangeProcessor(ModelType type)
25 : disconnected_(false), 26 : disconnected_(false),
26 type_(type), 27 type_(type),
27 frontend_task_runner_(base::ThreadTaskRunnerHandle::Get()), 28 frontend_task_runner_(base::ThreadTaskRunnerHandle::Get()),
28 generic_change_processor_(nullptr) { 29 generic_change_processor_(nullptr) {
29 DCHECK_NE(type_, UNSPECIFIED); 30 DCHECK_NE(type_, UNSPECIFIED);
30 } 31 }
31 32
32 SharedChangeProcessor::~SharedChangeProcessor() { 33 SharedChangeProcessor::~SharedChangeProcessor() {
33 // We can either be deleted when the DTC is destroyed (on UI 34 // We can either be deleted when the DTC is destroyed (on UI thread), or when
34 // thread), or when the SyncableService stops syncing (datatype 35 // the SyncableService stops syncing (on |backend_task_runner_|).
35 // thread). |generic_change_processor_|, if non-null, must be 36 // |generic_change_processor_|, if non-null, must be deleted on
36 // deleted on |backend_loop_|. 37 // |backend_task_runner_|.
37 if (backend_task_runner_.get()) { 38 if (backend_task_runner_.get()) {
38 if (backend_task_runner_->BelongsToCurrentThread()) { 39 if (backend_task_runner_->RunsTasksOnCurrentThread()) {
39 delete generic_change_processor_; 40 delete generic_change_processor_;
40 } else { 41 } else {
41 DCHECK(frontend_task_runner_->BelongsToCurrentThread()); 42 DCHECK(frontend_task_runner_->BelongsToCurrentThread());
42 if (!backend_task_runner_->DeleteSoon(FROM_HERE, 43 if (!backend_task_runner_->DeleteSoon(FROM_HERE,
43 generic_change_processor_)) { 44 generic_change_processor_)) {
44 NOTREACHED(); 45 NOTREACHED();
45 } 46 }
46 } 47 }
47 } else { 48 } else {
48 DCHECK(!generic_change_processor_); 49 DCHECK(!generic_change_processor_);
(...skipping 89 matching lines...) Expand 10 before | Expand all | Expand 10 after
138 } 139 }
139 140
140 base::WeakPtr<SyncableService> SharedChangeProcessor::Connect( 141 base::WeakPtr<SyncableService> SharedChangeProcessor::Connect(
141 SyncClient* sync_client, 142 SyncClient* sync_client,
142 GenericChangeProcessorFactory* processor_factory, 143 GenericChangeProcessorFactory* processor_factory,
143 UserShare* user_share, 144 UserShare* user_share,
144 std::unique_ptr<DataTypeErrorHandler> error_handler, 145 std::unique_ptr<DataTypeErrorHandler> error_handler,
145 const base::WeakPtr<SyncMergeResult>& merge_result) { 146 const base::WeakPtr<SyncMergeResult>& merge_result) {
146 DCHECK(sync_client); 147 DCHECK(sync_client);
147 DCHECK(error_handler); 148 DCHECK(error_handler);
148 backend_task_runner_ = base::ThreadTaskRunnerHandle::Get(); 149 backend_task_runner_ = base::SequencedTaskRunnerHandle::Get();
149 AutoLock lock(monitor_lock_); 150 AutoLock lock(monitor_lock_);
150 if (disconnected_) 151 if (disconnected_)
151 return base::WeakPtr<SyncableService>(); 152 return base::WeakPtr<SyncableService>();
152 error_handler_ = std::move(error_handler); 153 error_handler_ = std::move(error_handler);
153 base::WeakPtr<SyncableService> local_service = 154 base::WeakPtr<SyncableService> local_service =
154 sync_client->GetSyncableServiceForType(type_); 155 sync_client->GetSyncableServiceForType(type_);
155 if (!local_service.get()) { 156 if (!local_service.get()) {
156 LOG(WARNING) << "SyncableService destroyed before DTC was stopped."; 157 LOG(WARNING) << "SyncableService destroyed before DTC was stopped.";
157 disconnected_ = true; 158 disconnected_ = true;
158 return base::WeakPtr<SyncableService>(); 159 return base::WeakPtr<SyncableService>();
(...skipping 22 matching lines...) Expand all
181 error_handler_.reset(); 182 error_handler_.reset();
182 return was_connected; 183 return was_connected;
183 } 184 }
184 185
185 ChangeProcessor* SharedChangeProcessor::generic_change_processor() { 186 ChangeProcessor* SharedChangeProcessor::generic_change_processor() {
186 return generic_change_processor_; 187 return generic_change_processor_;
187 } 188 }
188 189
189 int SharedChangeProcessor::GetSyncCount() { 190 int SharedChangeProcessor::GetSyncCount() {
190 DCHECK(backend_task_runner_.get()); 191 DCHECK(backend_task_runner_.get());
191 DCHECK(backend_task_runner_->BelongsToCurrentThread()); 192 DCHECK(backend_task_runner_->RunsTasksOnCurrentThread());
192 AutoLock lock(monitor_lock_); 193 AutoLock lock(monitor_lock_);
193 if (disconnected_) { 194 if (disconnected_) {
194 LOG(ERROR) << "Change processor disconnected."; 195 LOG(ERROR) << "Change processor disconnected.";
195 return 0; 196 return 0;
196 } 197 }
197 return generic_change_processor_->GetSyncCount(); 198 return generic_change_processor_->GetSyncCount();
198 } 199 }
199 200
200 SyncError SharedChangeProcessor::ProcessSyncChanges( 201 SyncError SharedChangeProcessor::ProcessSyncChanges(
201 const tracked_objects::Location& from_here, 202 const tracked_objects::Location& from_here,
202 const SyncChangeList& list_of_changes) { 203 const SyncChangeList& list_of_changes) {
203 DCHECK(backend_task_runner_.get()); 204 DCHECK(backend_task_runner_.get());
204 DCHECK(backend_task_runner_->BelongsToCurrentThread()); 205 DCHECK(backend_task_runner_->RunsTasksOnCurrentThread());
205 AutoLock lock(monitor_lock_); 206 AutoLock lock(monitor_lock_);
206 if (disconnected_) { 207 if (disconnected_) {
207 // The DTC that disconnects us must ensure it posts a StopSyncing task. 208 // The DTC that disconnects us must ensure it posts a StopSyncing task.
208 // If we reach this, it means it just hasn't executed yet. 209 // If we reach this, it means it just hasn't executed yet.
209 SyncError error(FROM_HERE, SyncError::DATATYPE_ERROR, 210 SyncError error(FROM_HERE, SyncError::DATATYPE_ERROR,
210 "Change processor disconnected.", type_); 211 "Change processor disconnected.", type_);
211 return error; 212 return error;
212 } 213 }
213 return generic_change_processor_->ProcessSyncChanges(from_here, 214 return generic_change_processor_->ProcessSyncChanges(from_here,
214 list_of_changes); 215 list_of_changes);
215 } 216 }
216 217
217 SyncDataList SharedChangeProcessor::GetAllSyncData(ModelType type) const { 218 SyncDataList SharedChangeProcessor::GetAllSyncData(ModelType type) const {
218 SyncDataList data; 219 SyncDataList data;
219 GetAllSyncDataReturnError(type, &data); // Handles the disconnect case. 220 GetAllSyncDataReturnError(type, &data); // Handles the disconnect case.
220 return data; 221 return data;
221 } 222 }
222 223
223 SyncError SharedChangeProcessor::GetAllSyncDataReturnError( 224 SyncError SharedChangeProcessor::GetAllSyncDataReturnError(
224 ModelType type, 225 ModelType type,
225 SyncDataList* data) const { 226 SyncDataList* data) const {
226 DCHECK(backend_task_runner_.get()); 227 DCHECK(backend_task_runner_.get());
227 DCHECK(backend_task_runner_->BelongsToCurrentThread()); 228 DCHECK(backend_task_runner_->RunsTasksOnCurrentThread());
228 AutoLock lock(monitor_lock_); 229 AutoLock lock(monitor_lock_);
229 if (disconnected_) { 230 if (disconnected_) {
230 SyncError error(FROM_HERE, SyncError::DATATYPE_ERROR, 231 SyncError error(FROM_HERE, SyncError::DATATYPE_ERROR,
231 "Change processor disconnected.", type_); 232 "Change processor disconnected.", type_);
232 return error; 233 return error;
233 } 234 }
234 return generic_change_processor_->GetAllSyncDataReturnError(data); 235 return generic_change_processor_->GetAllSyncDataReturnError(data);
235 } 236 }
236 237
237 SyncError SharedChangeProcessor::UpdateDataTypeContext( 238 SyncError SharedChangeProcessor::UpdateDataTypeContext(
238 ModelType type, 239 ModelType type,
239 SyncChangeProcessor::ContextRefreshStatus refresh_status, 240 SyncChangeProcessor::ContextRefreshStatus refresh_status,
240 const std::string& context) { 241 const std::string& context) {
241 DCHECK(backend_task_runner_.get()); 242 DCHECK(backend_task_runner_.get());
242 DCHECK(backend_task_runner_->BelongsToCurrentThread()); 243 DCHECK(backend_task_runner_->RunsTasksOnCurrentThread());
243 AutoLock lock(monitor_lock_); 244 AutoLock lock(monitor_lock_);
244 if (disconnected_) { 245 if (disconnected_) {
245 SyncError error(FROM_HERE, SyncError::DATATYPE_ERROR, 246 SyncError error(FROM_HERE, SyncError::DATATYPE_ERROR,
246 "Change processor disconnected.", type_); 247 "Change processor disconnected.", type_);
247 return error; 248 return error;
248 } 249 }
249 return generic_change_processor_->UpdateDataTypeContext(type, refresh_status, 250 return generic_change_processor_->UpdateDataTypeContext(type, refresh_status,
250 context); 251 context);
251 } 252 }
252 253
253 void SharedChangeProcessor::AddLocalChangeObserver( 254 void SharedChangeProcessor::AddLocalChangeObserver(
254 LocalChangeObserver* observer) { 255 LocalChangeObserver* observer) {
255 DCHECK(backend_task_runner_.get()); 256 DCHECK(backend_task_runner_.get());
256 DCHECK(backend_task_runner_->BelongsToCurrentThread()); 257 DCHECK(backend_task_runner_->RunsTasksOnCurrentThread());
257 258
258 generic_change_processor_->AddLocalChangeObserver(observer); 259 generic_change_processor_->AddLocalChangeObserver(observer);
259 } 260 }
260 261
261 void SharedChangeProcessor::RemoveLocalChangeObserver( 262 void SharedChangeProcessor::RemoveLocalChangeObserver(
262 LocalChangeObserver* observer) { 263 LocalChangeObserver* observer) {
263 DCHECK(backend_task_runner_.get()); 264 DCHECK(backend_task_runner_.get());
264 DCHECK(backend_task_runner_->BelongsToCurrentThread()); 265 DCHECK(backend_task_runner_->RunsTasksOnCurrentThread());
265 266
266 generic_change_processor_->RemoveLocalChangeObserver(observer); 267 generic_change_processor_->RemoveLocalChangeObserver(observer);
267 } 268 }
268 269
269 bool SharedChangeProcessor::SyncModelHasUserCreatedNodes(bool* has_nodes) { 270 bool SharedChangeProcessor::SyncModelHasUserCreatedNodes(bool* has_nodes) {
270 DCHECK(backend_task_runner_.get()); 271 DCHECK(backend_task_runner_.get());
271 DCHECK(backend_task_runner_->BelongsToCurrentThread()); 272 DCHECK(backend_task_runner_->RunsTasksOnCurrentThread());
272 AutoLock lock(monitor_lock_); 273 AutoLock lock(monitor_lock_);
273 if (disconnected_) { 274 if (disconnected_) {
274 LOG(ERROR) << "Change processor disconnected."; 275 LOG(ERROR) << "Change processor disconnected.";
275 return false; 276 return false;
276 } 277 }
277 return generic_change_processor_->SyncModelHasUserCreatedNodes(has_nodes); 278 return generic_change_processor_->SyncModelHasUserCreatedNodes(has_nodes);
278 } 279 }
279 280
280 bool SharedChangeProcessor::CryptoReadyIfNecessary() { 281 bool SharedChangeProcessor::CryptoReadyIfNecessary() {
281 DCHECK(backend_task_runner_.get()); 282 DCHECK(backend_task_runner_.get());
282 DCHECK(backend_task_runner_->BelongsToCurrentThread()); 283 DCHECK(backend_task_runner_->RunsTasksOnCurrentThread());
283 AutoLock lock(monitor_lock_); 284 AutoLock lock(monitor_lock_);
284 if (disconnected_) { 285 if (disconnected_) {
285 LOG(ERROR) << "Change processor disconnected."; 286 LOG(ERROR) << "Change processor disconnected.";
286 return true; // Otherwise we get into infinite spin waiting. 287 return true; // Otherwise we get into infinite spin waiting.
287 } 288 }
288 return generic_change_processor_->CryptoReadyIfNecessary(); 289 return generic_change_processor_->CryptoReadyIfNecessary();
289 } 290 }
290 291
291 bool SharedChangeProcessor::GetDataTypeContext(std::string* context) const { 292 bool SharedChangeProcessor::GetDataTypeContext(std::string* context) const {
292 DCHECK(backend_task_runner_.get()); 293 DCHECK(backend_task_runner_.get());
293 DCHECK(backend_task_runner_->BelongsToCurrentThread()); 294 DCHECK(backend_task_runner_->RunsTasksOnCurrentThread());
294 AutoLock lock(monitor_lock_); 295 AutoLock lock(monitor_lock_);
295 if (disconnected_) { 296 if (disconnected_) {
296 LOG(ERROR) << "Change processor disconnected."; 297 LOG(ERROR) << "Change processor disconnected.";
297 return false; 298 return false;
298 } 299 }
299 return generic_change_processor_->GetDataTypeContext(context); 300 return generic_change_processor_->GetDataTypeContext(context);
300 } 301 }
301 302
302 SyncError SharedChangeProcessor::CreateAndUploadError( 303 SyncError SharedChangeProcessor::CreateAndUploadError(
303 const tracked_objects::Location& location, 304 const tracked_objects::Location& location,
(...skipping 13 matching lines...) Expand all
317 #undef PER_DATA_TYPE_MACRO 318 #undef PER_DATA_TYPE_MACRO
318 } 319 }
319 320
320 void SharedChangeProcessor::StopLocalService() { 321 void SharedChangeProcessor::StopLocalService() {
321 if (local_service_.get()) 322 if (local_service_.get())
322 local_service_->StopSyncing(type_); 323 local_service_->StopSyncing(type_);
323 local_service_.reset(); 324 local_service_.reset();
324 } 325 }
325 326
326 } // namespace syncer 327 } // namespace syncer
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698