Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2014 The Chromium Authors. All rights reserved. | 1 // Copyright 2014 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "components/sync/driver/shared_change_processor.h" | 5 #include "components/sync/driver/shared_change_processor.h" |
| 6 | 6 |
| 7 #include <utility> | 7 #include <utility> |
| 8 | 8 |
| 9 #include "base/threading/sequenced_task_runner_handle.h" | |
| 9 #include "base/threading/thread_task_runner_handle.h" | 10 #include "base/threading/thread_task_runner_handle.h" |
|
gab
2016/12/20 20:11:29
rm?
fdoray
2016/12/20 20:34:39
Still needed to initialize |frontend_task_runner_|
| |
| 10 #include "components/sync/base/data_type_histogram.h" | 11 #include "components/sync/base/data_type_histogram.h" |
| 11 #include "components/sync/driver/generic_change_processor.h" | 12 #include "components/sync/driver/generic_change_processor.h" |
| 12 #include "components/sync/driver/generic_change_processor_factory.h" | 13 #include "components/sync/driver/generic_change_processor_factory.h" |
| 13 #include "components/sync/driver/shared_change_processor_ref.h" | 14 #include "components/sync/driver/shared_change_processor_ref.h" |
| 14 #include "components/sync/driver/sync_client.h" | 15 #include "components/sync/driver/sync_client.h" |
| 15 #include "components/sync/model/sync_change.h" | 16 #include "components/sync/model/sync_change.h" |
| 16 #include "components/sync/model/syncable_service.h" | 17 #include "components/sync/model/syncable_service.h" |
| 17 | 18 |
| 18 using base::AutoLock; | 19 using base::AutoLock; |
| 19 | 20 |
| 20 namespace syncer { | 21 namespace syncer { |
| 21 | 22 |
| 22 class AttachmentService; | 23 class AttachmentService; |
| 23 | 24 |
| 24 SharedChangeProcessor::SharedChangeProcessor(ModelType type) | 25 SharedChangeProcessor::SharedChangeProcessor(ModelType type) |
| 25 : disconnected_(false), | 26 : disconnected_(false), |
| 26 type_(type), | 27 type_(type), |
| 27 frontend_task_runner_(base::ThreadTaskRunnerHandle::Get()), | 28 frontend_task_runner_(base::ThreadTaskRunnerHandle::Get()), |
| 28 generic_change_processor_(nullptr) { | 29 generic_change_processor_(nullptr) { |
| 29 DCHECK_NE(type_, UNSPECIFIED); | 30 DCHECK_NE(type_, UNSPECIFIED); |
| 30 } | 31 } |
| 31 | 32 |
| 32 SharedChangeProcessor::~SharedChangeProcessor() { | 33 SharedChangeProcessor::~SharedChangeProcessor() { |
| 33 // We can either be deleted when the DTC is destroyed (on UI | 34 // We can either be deleted when the DTC is destroyed (on UI thread), or when |
| 34 // thread), or when the SyncableService stops syncing (datatype | 35 // the SyncableService stops syncing (on |backend_task_runner_|). |
| 35 // thread). |generic_change_processor_|, if non-null, must be | 36 // |generic_change_processor_|, if non-null, must be deleted on |
| 36 // deleted on |backend_loop_|. | 37 // |backend_task_runner_|. |
| 37 if (backend_task_runner_.get()) { | 38 if (backend_task_runner_.get()) { |
| 38 if (backend_task_runner_->BelongsToCurrentThread()) { | 39 if (backend_task_runner_->RunsTasksOnCurrentThread()) { |
| 39 delete generic_change_processor_; | 40 delete generic_change_processor_; |
| 40 } else { | 41 } else { |
| 41 DCHECK(frontend_task_runner_->BelongsToCurrentThread()); | 42 DCHECK(frontend_task_runner_->BelongsToCurrentThread()); |
| 42 if (!backend_task_runner_->DeleteSoon(FROM_HERE, | 43 if (!backend_task_runner_->DeleteSoon(FROM_HERE, |
| 43 generic_change_processor_)) { | 44 generic_change_processor_)) { |
| 44 NOTREACHED(); | 45 NOTREACHED(); |
| 45 } | 46 } |
| 46 } | 47 } |
| 47 } else { | 48 } else { |
| 48 DCHECK(!generic_change_processor_); | 49 DCHECK(!generic_change_processor_); |
| (...skipping 89 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 138 } | 139 } |
| 139 | 140 |
| 140 base::WeakPtr<SyncableService> SharedChangeProcessor::Connect( | 141 base::WeakPtr<SyncableService> SharedChangeProcessor::Connect( |
| 141 SyncClient* sync_client, | 142 SyncClient* sync_client, |
| 142 GenericChangeProcessorFactory* processor_factory, | 143 GenericChangeProcessorFactory* processor_factory, |
| 143 UserShare* user_share, | 144 UserShare* user_share, |
| 144 std::unique_ptr<DataTypeErrorHandler> error_handler, | 145 std::unique_ptr<DataTypeErrorHandler> error_handler, |
| 145 const base::WeakPtr<SyncMergeResult>& merge_result) { | 146 const base::WeakPtr<SyncMergeResult>& merge_result) { |
| 146 DCHECK(sync_client); | 147 DCHECK(sync_client); |
| 147 DCHECK(error_handler); | 148 DCHECK(error_handler); |
| 148 backend_task_runner_ = base::ThreadTaskRunnerHandle::Get(); | 149 backend_task_runner_ = base::SequencedTaskRunnerHandle::Get(); |
| 149 AutoLock lock(monitor_lock_); | 150 AutoLock lock(monitor_lock_); |
| 150 if (disconnected_) | 151 if (disconnected_) |
| 151 return base::WeakPtr<SyncableService>(); | 152 return base::WeakPtr<SyncableService>(); |
| 152 error_handler_ = std::move(error_handler); | 153 error_handler_ = std::move(error_handler); |
| 153 base::WeakPtr<SyncableService> local_service = | 154 base::WeakPtr<SyncableService> local_service = |
| 154 sync_client->GetSyncableServiceForType(type_); | 155 sync_client->GetSyncableServiceForType(type_); |
| 155 if (!local_service.get()) { | 156 if (!local_service.get()) { |
| 156 LOG(WARNING) << "SyncableService destroyed before DTC was stopped."; | 157 LOG(WARNING) << "SyncableService destroyed before DTC was stopped."; |
| 157 disconnected_ = true; | 158 disconnected_ = true; |
| 158 return base::WeakPtr<SyncableService>(); | 159 return base::WeakPtr<SyncableService>(); |
| (...skipping 22 matching lines...) Expand all Loading... | |
| 181 error_handler_.reset(); | 182 error_handler_.reset(); |
| 182 return was_connected; | 183 return was_connected; |
| 183 } | 184 } |
| 184 | 185 |
| 185 ChangeProcessor* SharedChangeProcessor::generic_change_processor() { | 186 ChangeProcessor* SharedChangeProcessor::generic_change_processor() { |
| 186 return generic_change_processor_; | 187 return generic_change_processor_; |
| 187 } | 188 } |
| 188 | 189 |
| 189 int SharedChangeProcessor::GetSyncCount() { | 190 int SharedChangeProcessor::GetSyncCount() { |
| 190 DCHECK(backend_task_runner_.get()); | 191 DCHECK(backend_task_runner_.get()); |
| 191 DCHECK(backend_task_runner_->BelongsToCurrentThread()); | 192 DCHECK(backend_task_runner_->RunsTasksOnCurrentThread()); |
| 192 AutoLock lock(monitor_lock_); | 193 AutoLock lock(monitor_lock_); |
| 193 if (disconnected_) { | 194 if (disconnected_) { |
| 194 LOG(ERROR) << "Change processor disconnected."; | 195 LOG(ERROR) << "Change processor disconnected."; |
| 195 return 0; | 196 return 0; |
| 196 } | 197 } |
| 197 return generic_change_processor_->GetSyncCount(); | 198 return generic_change_processor_->GetSyncCount(); |
| 198 } | 199 } |
| 199 | 200 |
| 200 SyncError SharedChangeProcessor::ProcessSyncChanges( | 201 SyncError SharedChangeProcessor::ProcessSyncChanges( |
| 201 const tracked_objects::Location& from_here, | 202 const tracked_objects::Location& from_here, |
| 202 const SyncChangeList& list_of_changes) { | 203 const SyncChangeList& list_of_changes) { |
| 203 DCHECK(backend_task_runner_.get()); | 204 DCHECK(backend_task_runner_.get()); |
| 204 DCHECK(backend_task_runner_->BelongsToCurrentThread()); | 205 DCHECK(backend_task_runner_->RunsTasksOnCurrentThread()); |
| 205 AutoLock lock(monitor_lock_); | 206 AutoLock lock(monitor_lock_); |
| 206 if (disconnected_) { | 207 if (disconnected_) { |
| 207 // The DTC that disconnects us must ensure it posts a StopSyncing task. | 208 // The DTC that disconnects us must ensure it posts a StopSyncing task. |
| 208 // If we reach this, it means it just hasn't executed yet. | 209 // If we reach this, it means it just hasn't executed yet. |
| 209 SyncError error(FROM_HERE, SyncError::DATATYPE_ERROR, | 210 SyncError error(FROM_HERE, SyncError::DATATYPE_ERROR, |
| 210 "Change processor disconnected.", type_); | 211 "Change processor disconnected.", type_); |
| 211 return error; | 212 return error; |
| 212 } | 213 } |
| 213 return generic_change_processor_->ProcessSyncChanges(from_here, | 214 return generic_change_processor_->ProcessSyncChanges(from_here, |
| 214 list_of_changes); | 215 list_of_changes); |
| 215 } | 216 } |
| 216 | 217 |
| 217 SyncDataList SharedChangeProcessor::GetAllSyncData(ModelType type) const { | 218 SyncDataList SharedChangeProcessor::GetAllSyncData(ModelType type) const { |
| 218 SyncDataList data; | 219 SyncDataList data; |
| 219 GetAllSyncDataReturnError(type, &data); // Handles the disconnect case. | 220 GetAllSyncDataReturnError(type, &data); // Handles the disconnect case. |
| 220 return data; | 221 return data; |
| 221 } | 222 } |
| 222 | 223 |
| 223 SyncError SharedChangeProcessor::GetAllSyncDataReturnError( | 224 SyncError SharedChangeProcessor::GetAllSyncDataReturnError( |
| 224 ModelType type, | 225 ModelType type, |
| 225 SyncDataList* data) const { | 226 SyncDataList* data) const { |
| 226 DCHECK(backend_task_runner_.get()); | 227 DCHECK(backend_task_runner_.get()); |
| 227 DCHECK(backend_task_runner_->BelongsToCurrentThread()); | 228 DCHECK(backend_task_runner_->RunsTasksOnCurrentThread()); |
| 228 AutoLock lock(monitor_lock_); | 229 AutoLock lock(monitor_lock_); |
| 229 if (disconnected_) { | 230 if (disconnected_) { |
| 230 SyncError error(FROM_HERE, SyncError::DATATYPE_ERROR, | 231 SyncError error(FROM_HERE, SyncError::DATATYPE_ERROR, |
| 231 "Change processor disconnected.", type_); | 232 "Change processor disconnected.", type_); |
| 232 return error; | 233 return error; |
| 233 } | 234 } |
| 234 return generic_change_processor_->GetAllSyncDataReturnError(data); | 235 return generic_change_processor_->GetAllSyncDataReturnError(data); |
| 235 } | 236 } |
| 236 | 237 |
| 237 SyncError SharedChangeProcessor::UpdateDataTypeContext( | 238 SyncError SharedChangeProcessor::UpdateDataTypeContext( |
| 238 ModelType type, | 239 ModelType type, |
| 239 SyncChangeProcessor::ContextRefreshStatus refresh_status, | 240 SyncChangeProcessor::ContextRefreshStatus refresh_status, |
| 240 const std::string& context) { | 241 const std::string& context) { |
| 241 DCHECK(backend_task_runner_.get()); | 242 DCHECK(backend_task_runner_.get()); |
| 242 DCHECK(backend_task_runner_->BelongsToCurrentThread()); | 243 DCHECK(backend_task_runner_->RunsTasksOnCurrentThread()); |
| 243 AutoLock lock(monitor_lock_); | 244 AutoLock lock(monitor_lock_); |
| 244 if (disconnected_) { | 245 if (disconnected_) { |
| 245 SyncError error(FROM_HERE, SyncError::DATATYPE_ERROR, | 246 SyncError error(FROM_HERE, SyncError::DATATYPE_ERROR, |
| 246 "Change processor disconnected.", type_); | 247 "Change processor disconnected.", type_); |
| 247 return error; | 248 return error; |
| 248 } | 249 } |
| 249 return generic_change_processor_->UpdateDataTypeContext(type, refresh_status, | 250 return generic_change_processor_->UpdateDataTypeContext(type, refresh_status, |
| 250 context); | 251 context); |
| 251 } | 252 } |
| 252 | 253 |
| 253 void SharedChangeProcessor::AddLocalChangeObserver( | 254 void SharedChangeProcessor::AddLocalChangeObserver( |
| 254 LocalChangeObserver* observer) { | 255 LocalChangeObserver* observer) { |
| 255 DCHECK(backend_task_runner_.get()); | 256 DCHECK(backend_task_runner_.get()); |
| 256 DCHECK(backend_task_runner_->BelongsToCurrentThread()); | 257 DCHECK(backend_task_runner_->RunsTasksOnCurrentThread()); |
| 257 | 258 |
| 258 generic_change_processor_->AddLocalChangeObserver(observer); | 259 generic_change_processor_->AddLocalChangeObserver(observer); |
| 259 } | 260 } |
| 260 | 261 |
| 261 void SharedChangeProcessor::RemoveLocalChangeObserver( | 262 void SharedChangeProcessor::RemoveLocalChangeObserver( |
| 262 LocalChangeObserver* observer) { | 263 LocalChangeObserver* observer) { |
| 263 DCHECK(backend_task_runner_.get()); | 264 DCHECK(backend_task_runner_.get()); |
| 264 DCHECK(backend_task_runner_->BelongsToCurrentThread()); | 265 DCHECK(backend_task_runner_->RunsTasksOnCurrentThread()); |
| 265 | 266 |
| 266 generic_change_processor_->RemoveLocalChangeObserver(observer); | 267 generic_change_processor_->RemoveLocalChangeObserver(observer); |
| 267 } | 268 } |
| 268 | 269 |
| 269 bool SharedChangeProcessor::SyncModelHasUserCreatedNodes(bool* has_nodes) { | 270 bool SharedChangeProcessor::SyncModelHasUserCreatedNodes(bool* has_nodes) { |
| 270 DCHECK(backend_task_runner_.get()); | 271 DCHECK(backend_task_runner_.get()); |
| 271 DCHECK(backend_task_runner_->BelongsToCurrentThread()); | 272 DCHECK(backend_task_runner_->RunsTasksOnCurrentThread()); |
| 272 AutoLock lock(monitor_lock_); | 273 AutoLock lock(monitor_lock_); |
| 273 if (disconnected_) { | 274 if (disconnected_) { |
| 274 LOG(ERROR) << "Change processor disconnected."; | 275 LOG(ERROR) << "Change processor disconnected."; |
| 275 return false; | 276 return false; |
| 276 } | 277 } |
| 277 return generic_change_processor_->SyncModelHasUserCreatedNodes(has_nodes); | 278 return generic_change_processor_->SyncModelHasUserCreatedNodes(has_nodes); |
| 278 } | 279 } |
| 279 | 280 |
| 280 bool SharedChangeProcessor::CryptoReadyIfNecessary() { | 281 bool SharedChangeProcessor::CryptoReadyIfNecessary() { |
| 281 DCHECK(backend_task_runner_.get()); | 282 DCHECK(backend_task_runner_.get()); |
| 282 DCHECK(backend_task_runner_->BelongsToCurrentThread()); | 283 DCHECK(backend_task_runner_->RunsTasksOnCurrentThread()); |
| 283 AutoLock lock(monitor_lock_); | 284 AutoLock lock(monitor_lock_); |
| 284 if (disconnected_) { | 285 if (disconnected_) { |
| 285 LOG(ERROR) << "Change processor disconnected."; | 286 LOG(ERROR) << "Change processor disconnected."; |
| 286 return true; // Otherwise we get into infinite spin waiting. | 287 return true; // Otherwise we get into infinite spin waiting. |
| 287 } | 288 } |
| 288 return generic_change_processor_->CryptoReadyIfNecessary(); | 289 return generic_change_processor_->CryptoReadyIfNecessary(); |
| 289 } | 290 } |
| 290 | 291 |
| 291 bool SharedChangeProcessor::GetDataTypeContext(std::string* context) const { | 292 bool SharedChangeProcessor::GetDataTypeContext(std::string* context) const { |
| 292 DCHECK(backend_task_runner_.get()); | 293 DCHECK(backend_task_runner_.get()); |
| 293 DCHECK(backend_task_runner_->BelongsToCurrentThread()); | 294 DCHECK(backend_task_runner_->RunsTasksOnCurrentThread()); |
| 294 AutoLock lock(monitor_lock_); | 295 AutoLock lock(monitor_lock_); |
| 295 if (disconnected_) { | 296 if (disconnected_) { |
| 296 LOG(ERROR) << "Change processor disconnected."; | 297 LOG(ERROR) << "Change processor disconnected."; |
| 297 return false; | 298 return false; |
| 298 } | 299 } |
| 299 return generic_change_processor_->GetDataTypeContext(context); | 300 return generic_change_processor_->GetDataTypeContext(context); |
| 300 } | 301 } |
| 301 | 302 |
| 302 SyncError SharedChangeProcessor::CreateAndUploadError( | 303 SyncError SharedChangeProcessor::CreateAndUploadError( |
| 303 const tracked_objects::Location& location, | 304 const tracked_objects::Location& location, |
| (...skipping 13 matching lines...) Expand all Loading... | |
| 317 #undef PER_DATA_TYPE_MACRO | 318 #undef PER_DATA_TYPE_MACRO |
| 318 } | 319 } |
| 319 | 320 |
| 320 void SharedChangeProcessor::StopLocalService() { | 321 void SharedChangeProcessor::StopLocalService() { |
| 321 if (local_service_.get()) | 322 if (local_service_.get()) |
| 322 local_service_->StopSyncing(type_); | 323 local_service_->StopSyncing(type_); |
| 323 local_service_.reset(); | 324 local_service_.reset(); |
| 324 } | 325 } |
| 325 | 326 |
| 326 } // namespace syncer | 327 } // namespace syncer |
| OLD | NEW |