Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1536)

Side by Side Diff: components/sync/driver/generic_change_processor.cc

Issue 2593803002: Make sync's change processors sequence-affine. (Closed)
Patch Set: CR Created 3 years, 12 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 The Chromium Authors. All rights reserved. 1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "components/sync/driver/generic_change_processor.h" 5 #include "components/sync/driver/generic_change_processor.h"
6 6
7 #include <stddef.h> 7 #include <stddef.h>
8 8
9 #include <algorithm> 9 #include <algorithm>
10 #include <utility> 10 #include <utility>
11 11
12 #include "base/location.h" 12 #include "base/location.h"
13 #include "base/memory/ptr_util.h" 13 #include "base/memory/ptr_util.h"
14 #include "base/strings/string_number_conversions.h" 14 #include "base/strings/string_number_conversions.h"
15 #include "base/strings/utf_string_conversions.h" 15 #include "base/strings/utf_string_conversions.h"
16 #include "base/threading/thread_task_runner_handle.h" 16 #include "base/threading/sequenced_task_runner_handle.h"
17 #include "components/sync/base/unrecoverable_error_handler.h" 17 #include "components/sync/base/unrecoverable_error_handler.h"
18 #include "components/sync/driver/sync_api_component_factory.h" 18 #include "components/sync/driver/sync_api_component_factory.h"
19 #include "components/sync/driver/sync_client.h" 19 #include "components/sync/driver/sync_client.h"
20 #include "components/sync/model/local_change_observer.h" 20 #include "components/sync/model/local_change_observer.h"
21 #include "components/sync/model/sync_change.h" 21 #include "components/sync/model/sync_change.h"
22 #include "components/sync/model/sync_error.h" 22 #include "components/sync/model/sync_error.h"
23 #include "components/sync/model/syncable_service.h" 23 #include "components/sync/model/syncable_service.h"
24 #include "components/sync/syncable/base_node.h" 24 #include "components/sync/syncable/base_node.h"
25 #include "components/sync/syncable/change_record.h" 25 #include "components/sync/syncable/change_record.h"
26 #include "components/sync/syncable/entry.h" // TODO(tim): Bug 123674. 26 #include "components/sync/syncable/entry.h" // TODO(tim): Bug 123674.
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after
112 const base::WeakPtr<SyncMergeResult>& merge_result, 112 const base::WeakPtr<SyncMergeResult>& merge_result,
113 UserShare* user_share, 113 UserShare* user_share,
114 SyncClient* sync_client, 114 SyncClient* sync_client,
115 std::unique_ptr<AttachmentStoreForSync> attachment_store) 115 std::unique_ptr<AttachmentStoreForSync> attachment_store)
116 : ChangeProcessor(std::move(error_handler)), 116 : ChangeProcessor(std::move(error_handler)),
117 type_(type), 117 type_(type),
118 local_service_(local_service), 118 local_service_(local_service),
119 merge_result_(merge_result), 119 merge_result_(merge_result),
120 share_handle_(user_share), 120 share_handle_(user_share),
121 weak_ptr_factory_(this) { 121 weak_ptr_factory_(this) {
122 DCHECK(CalledOnValidThread()); 122 DCHECK(sequence_checker_.CalledOnValidSequence());
123 DCHECK_NE(type_, UNSPECIFIED); 123 DCHECK_NE(type_, UNSPECIFIED);
124 if (attachment_store) { 124 if (attachment_store) {
125 std::string store_birthday; 125 std::string store_birthday;
126 { 126 {
127 ReadTransaction trans(FROM_HERE, share_handle()); 127 ReadTransaction trans(FROM_HERE, share_handle());
128 store_birthday = trans.GetStoreBirthday(); 128 store_birthday = trans.GetStoreBirthday();
129 } 129 }
130 attachment_service_ = 130 attachment_service_ =
131 sync_client->GetSyncApiComponentFactory()->CreateAttachmentService( 131 sync_client->GetSyncApiComponentFactory()->CreateAttachmentService(
132 std::move(attachment_store), *user_share, store_birthday, type, 132 std::move(attachment_store), *user_share, store_birthday, type,
133 this); 133 this);
134 attachment_service_weak_ptr_factory_ = 134 attachment_service_weak_ptr_factory_ =
135 base::MakeUnique<base::WeakPtrFactory<AttachmentService>>( 135 base::MakeUnique<base::WeakPtrFactory<AttachmentService>>(
136 attachment_service_.get()); 136 attachment_service_.get());
137 attachment_service_proxy_ = AttachmentServiceProxy( 137 attachment_service_proxy_ = AttachmentServiceProxy(
138 base::ThreadTaskRunnerHandle::Get(), 138 base::SequencedTaskRunnerHandle::Get(),
139 attachment_service_weak_ptr_factory_->GetWeakPtr()); 139 attachment_service_weak_ptr_factory_->GetWeakPtr());
140 UploadAllAttachmentsNotOnServer(); 140 UploadAllAttachmentsNotOnServer();
141 } else { 141 } else {
142 attachment_service_proxy_ = 142 attachment_service_proxy_ =
143 AttachmentServiceProxy(base::ThreadTaskRunnerHandle::Get(), 143 AttachmentServiceProxy(base::SequencedTaskRunnerHandle::Get(),
144 base::WeakPtr<AttachmentService>()); 144 base::WeakPtr<AttachmentService>());
145 } 145 }
146 } 146 }
147 147
148 GenericChangeProcessor::~GenericChangeProcessor() { 148 GenericChangeProcessor::~GenericChangeProcessor() {
149 DCHECK(CalledOnValidThread()); 149 DCHECK(sequence_checker_.CalledOnValidSequence());
150 } 150 }
151 151
152 void GenericChangeProcessor::ApplyChangesFromSyncModel( 152 void GenericChangeProcessor::ApplyChangesFromSyncModel(
153 const BaseTransaction* trans, 153 const BaseTransaction* trans,
154 int64_t model_version, 154 int64_t model_version,
155 const ImmutableChangeRecordList& changes) { 155 const ImmutableChangeRecordList& changes) {
156 DCHECK(CalledOnValidThread()); 156 DCHECK(sequence_checker_.CalledOnValidSequence());
157 DCHECK(syncer_changes_.empty()); 157 DCHECK(syncer_changes_.empty());
158 for (ChangeRecordList::const_iterator it = changes.Get().begin(); 158 for (ChangeRecordList::const_iterator it = changes.Get().begin();
159 it != changes.Get().end(); ++it) { 159 it != changes.Get().end(); ++it) {
160 if (it->action == ChangeRecord::ACTION_DELETE) { 160 if (it->action == ChangeRecord::ACTION_DELETE) {
161 std::unique_ptr<sync_pb::EntitySpecifics> specifics; 161 std::unique_ptr<sync_pb::EntitySpecifics> specifics;
162 if (it->specifics.has_password()) { 162 if (it->specifics.has_password()) {
163 DCHECK(it->extra.get()); 163 DCHECK(it->extra.get());
164 specifics = base::MakeUnique<sync_pb::EntitySpecifics>(it->specifics); 164 specifics = base::MakeUnique<sync_pb::EntitySpecifics>(it->specifics);
165 specifics->mutable_password() 165 specifics->mutable_password()
166 ->mutable_client_only_encrypted_data() 166 ->mutable_client_only_encrypted_data()
(...skipping 20 matching lines...) Expand all
187 return; 187 return;
188 } 188 }
189 syncer_changes_.push_back(SyncChange( 189 syncer_changes_.push_back(SyncChange(
190 FROM_HERE, action, 190 FROM_HERE, action,
191 BuildRemoteSyncData(it->id, read_node, attachment_service_proxy_))); 191 BuildRemoteSyncData(it->id, read_node, attachment_service_proxy_)));
192 } 192 }
193 } 193 }
194 } 194 }
195 195
196 void GenericChangeProcessor::CommitChangesFromSyncModel() { 196 void GenericChangeProcessor::CommitChangesFromSyncModel() {
197 DCHECK(CalledOnValidThread()); 197 DCHECK(sequence_checker_.CalledOnValidSequence());
198 if (syncer_changes_.empty()) 198 if (syncer_changes_.empty())
199 return; 199 return;
200 if (!local_service_.get()) { 200 if (!local_service_.get()) {
201 ModelType type = syncer_changes_[0].sync_data().GetDataType(); 201 ModelType type = syncer_changes_[0].sync_data().GetDataType();
202 SyncError error(FROM_HERE, SyncError::DATATYPE_ERROR, 202 SyncError error(FROM_HERE, SyncError::DATATYPE_ERROR,
203 "Local service destroyed.", type); 203 "Local service destroyed.", type);
204 error_handler()->OnUnrecoverableError(error); 204 error_handler()->OnUnrecoverableError(error);
205 return; 205 return;
206 } 206 }
207 SyncError error = 207 SyncError error =
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
251 } 251 }
252 252
253 void GenericChangeProcessor::OnAttachmentUploaded( 253 void GenericChangeProcessor::OnAttachmentUploaded(
254 const AttachmentId& attachment_id) { 254 const AttachmentId& attachment_id) {
255 WriteTransaction trans(FROM_HERE, share_handle()); 255 WriteTransaction trans(FROM_HERE, share_handle());
256 trans.UpdateEntriesMarkAttachmentAsOnServer(attachment_id); 256 trans.UpdateEntriesMarkAttachmentAsOnServer(attachment_id);
257 } 257 }
258 258
259 SyncError GenericChangeProcessor::GetAllSyncDataReturnError( 259 SyncError GenericChangeProcessor::GetAllSyncDataReturnError(
260 SyncDataList* current_sync_data) const { 260 SyncDataList* current_sync_data) const {
261 DCHECK(CalledOnValidThread()); 261 DCHECK(sequence_checker_.CalledOnValidSequence());
262 std::string type_name = ModelTypeToString(type_); 262 std::string type_name = ModelTypeToString(type_);
263 ReadTransaction trans(FROM_HERE, share_handle()); 263 ReadTransaction trans(FROM_HERE, share_handle());
264 ReadNode root(&trans); 264 ReadNode root(&trans);
265 if (root.InitTypeRoot(type_) != BaseNode::INIT_OK) { 265 if (root.InitTypeRoot(type_) != BaseNode::INIT_OK) {
266 SyncError error(FROM_HERE, SyncError::DATATYPE_ERROR, 266 SyncError error(FROM_HERE, SyncError::DATATYPE_ERROR,
267 "Server did not create the top-level " + type_name + 267 "Server did not create the top-level " + type_name +
268 " node. We might be running against an out-of-" 268 " node. We might be running against an out-of-"
269 "date server.", 269 "date server.",
270 type_); 270 type_);
271 return error; 271 return error;
(...skipping 145 matching lines...) Expand 10 before | Expand all | Expand 10 after
417 if (IsActOnceDataType(type)) 417 if (IsActOnceDataType(type))
418 node->Drop(); 418 node->Drop();
419 else 419 else
420 node->Tombstone(); 420 node->Tombstone();
421 return SyncError(); 421 return SyncError();
422 } 422 }
423 423
424 SyncError GenericChangeProcessor::ProcessSyncChanges( 424 SyncError GenericChangeProcessor::ProcessSyncChanges(
425 const tracked_objects::Location& from_here, 425 const tracked_objects::Location& from_here,
426 const SyncChangeList& list_of_changes) { 426 const SyncChangeList& list_of_changes) {
427 DCHECK(CalledOnValidThread()); 427 DCHECK(sequence_checker_.CalledOnValidSequence());
428 428
429 if (list_of_changes.empty()) { 429 if (list_of_changes.empty()) {
430 // No work. Exit without entering WriteTransaction. 430 // No work. Exit without entering WriteTransaction.
431 return SyncError(); 431 return SyncError();
432 } 432 }
433 433
434 // Keep track of brand new attachments so we can persist them on this device 434 // Keep track of brand new attachments so we can persist them on this device
435 // and upload them to the server. 435 // and upload them to the server.
436 AttachmentIdSet new_attachments; 436 AttachmentIdSet new_attachments;
437 437
(...skipping 199 matching lines...) Expand 10 before | Expand all | Expand 10 after
637 if (merge_result_.get()) { 637 if (merge_result_.get()) {
638 merge_result_->set_num_items_modified(merge_result_->num_items_modified() + 638 merge_result_->set_num_items_modified(merge_result_->num_items_modified() +
639 1); 639 1);
640 } 640 }
641 // TODO(sync): Support updating other parts of the sync node (title, 641 // TODO(sync): Support updating other parts of the sync node (title,
642 // successor, parent, etc.). 642 // successor, parent, etc.).
643 return SyncError(); 643 return SyncError();
644 } 644 }
645 645
646 bool GenericChangeProcessor::SyncModelHasUserCreatedNodes(bool* has_nodes) { 646 bool GenericChangeProcessor::SyncModelHasUserCreatedNodes(bool* has_nodes) {
647 DCHECK(CalledOnValidThread()); 647 DCHECK(sequence_checker_.CalledOnValidSequence());
648 DCHECK(has_nodes); 648 DCHECK(has_nodes);
649 std::string type_name = ModelTypeToString(type_); 649 std::string type_name = ModelTypeToString(type_);
650 std::string err_str = 650 std::string err_str =
651 "Server did not create the top-level " + type_name + 651 "Server did not create the top-level " + type_name +
652 " node. We might be running against an out-of-date server."; 652 " node. We might be running against an out-of-date server.";
653 *has_nodes = false; 653 *has_nodes = false;
654 ReadTransaction trans(FROM_HERE, share_handle()); 654 ReadTransaction trans(FROM_HERE, share_handle());
655 ReadNode type_root_node(&trans); 655 ReadNode type_root_node(&trans);
656 if (type_root_node.InitTypeRoot(type_) != BaseNode::INIT_OK) { 656 if (type_root_node.InitTypeRoot(type_) != BaseNode::INIT_OK) {
657 LOG(ERROR) << err_str; 657 LOG(ERROR) << err_str;
658 return false; 658 return false;
659 } 659 }
660 660
661 // The sync model has user created nodes if the type's root node has any 661 // The sync model has user created nodes if the type's root node has any
662 // children. 662 // children.
663 *has_nodes = type_root_node.HasChildren(); 663 *has_nodes = type_root_node.HasChildren();
664 return true; 664 return true;
665 } 665 }
666 666
667 bool GenericChangeProcessor::CryptoReadyIfNecessary() { 667 bool GenericChangeProcessor::CryptoReadyIfNecessary() {
668 DCHECK(CalledOnValidThread()); 668 DCHECK(sequence_checker_.CalledOnValidSequence());
669 // We only access the cryptographer while holding a transaction. 669 // We only access the cryptographer while holding a transaction.
670 ReadTransaction trans(FROM_HERE, share_handle()); 670 ReadTransaction trans(FROM_HERE, share_handle());
671 const ModelTypeSet encrypted_types = trans.GetEncryptedTypes(); 671 const ModelTypeSet encrypted_types = trans.GetEncryptedTypes();
672 return !encrypted_types.Has(type_) || trans.GetCryptographer()->is_ready(); 672 return !encrypted_types.Has(type_) || trans.GetCryptographer()->is_ready();
673 } 673 }
674 674
675 void GenericChangeProcessor::StartImpl() {} 675 void GenericChangeProcessor::StartImpl() {}
676 676
677 UserShare* GenericChangeProcessor::share_handle() const { 677 UserShare* GenericChangeProcessor::share_handle() const {
678 DCHECK(CalledOnValidThread()); 678 DCHECK(sequence_checker_.CalledOnValidSequence());
679 return share_handle_; 679 return share_handle_;
680 } 680 }
681 681
682 void GenericChangeProcessor::UploadAllAttachmentsNotOnServer() { 682 void GenericChangeProcessor::UploadAllAttachmentsNotOnServer() {
683 DCHECK(CalledOnValidThread()); 683 DCHECK(sequence_checker_.CalledOnValidSequence());
684 DCHECK(attachment_service_.get()); 684 DCHECK(attachment_service_.get());
685 AttachmentIdList ids; 685 AttachmentIdList ids;
686 { 686 {
687 ReadTransaction trans(FROM_HERE, share_handle()); 687 ReadTransaction trans(FROM_HERE, share_handle());
688 trans.GetAttachmentIdsToUpload(type_, &ids); 688 trans.GetAttachmentIdsToUpload(type_, &ids);
689 } 689 }
690 if (!ids.empty()) { 690 if (!ids.empty()) {
691 attachment_service_->UploadAttachments(ids); 691 attachment_service_->UploadAttachments(ids);
692 } 692 }
693 } 693 }
694 694
695 void GenericChangeProcessor::NotifyLocalChangeObservers( 695 void GenericChangeProcessor::NotifyLocalChangeObservers(
696 const syncable::Entry* current_entry, 696 const syncable::Entry* current_entry,
697 const SyncChange& change) { 697 const SyncChange& change) {
698 for (auto& observer : local_change_observers_) 698 for (auto& observer : local_change_observers_)
699 observer.OnLocalChange(current_entry, change); 699 observer.OnLocalChange(current_entry, change);
700 } 700 }
701 701
702 std::unique_ptr<AttachmentService> 702 std::unique_ptr<AttachmentService>
703 GenericChangeProcessor::GetAttachmentService() const { 703 GenericChangeProcessor::GetAttachmentService() const {
704 return std::unique_ptr<AttachmentService>( 704 return std::unique_ptr<AttachmentService>(
705 new AttachmentServiceProxy(attachment_service_proxy_)); 705 new AttachmentServiceProxy(attachment_service_proxy_));
706 } 706 }
707 707
708 } // namespace syncer 708 } // namespace syncer
OLDNEW
« no previous file with comments | « components/sync/driver/generic_change_processor.h ('k') | components/sync/driver/shared_change_processor.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698