Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(241)

Side by Side Diff: components/sync/core_impl/sync_manager_impl.cc

Issue 2413313004: [Sync] Move the last things out of core/. (Closed)
Patch Set: Address comments. Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "components/sync/core_impl/sync_manager_impl.h"
6
7 #include <stddef.h>
8
9 #include <utility>
10
11 #include "base/base64.h"
12 #include "base/bind.h"
13 #include "base/callback.h"
14 #include "base/compiler_specific.h"
15 #include "base/json/json_writer.h"
16 #include "base/memory/ptr_util.h"
17 #include "base/memory/ref_counted.h"
18 #include "base/metrics/histogram_macros.h"
19 #include "base/observer_list.h"
20 #include "base/strings/string_number_conversions.h"
21 #include "base/threading/thread_task_runner_handle.h"
22 #include "base/values.h"
23 #include "components/sync/base/cancelation_signal.h"
24 #include "components/sync/base/experiments.h"
25 #include "components/sync/base/invalidation_interface.h"
26 #include "components/sync/base/model_type.h"
27 #include "components/sync/core/configure_reason.h"
28 #include "components/sync/core/internal_components_factory.h"
29 #include "components/sync/core_impl/model_type_connector_proxy.h"
30 #include "components/sync/core_impl/syncapi_internal.h"
31 #include "components/sync/engine/net/http_post_provider_factory.h"
32 #include "components/sync/engine/polling_constants.h"
33 #include "components/sync/engine_impl/cycle/directory_type_debug_info_emitter.h"
34 #include "components/sync/engine_impl/net/sync_server_connection_manager.h"
35 #include "components/sync/engine_impl/sync_scheduler.h"
36 #include "components/sync/engine_impl/syncer_types.h"
37 #include "components/sync/protocol/proto_value_conversions.h"
38 #include "components/sync/protocol/sync.pb.h"
39 #include "components/sync/syncable/base_node.h"
40 #include "components/sync/syncable/directory.h"
41 #include "components/sync/syncable/entry.h"
42 #include "components/sync/syncable/in_memory_directory_backing_store.h"
43 #include "components/sync/syncable/on_disk_directory_backing_store.h"
44 #include "components/sync/syncable/read_node.h"
45 #include "components/sync/syncable/read_transaction.h"
46 #include "components/sync/syncable/write_node.h"
47 #include "components/sync/syncable/write_transaction.h"
48
49 using base::TimeDelta;
50 using sync_pb::GetUpdatesCallerInfo;
51
52 class GURL;
53
54 namespace syncer {
55
56 using syncable::ImmutableWriteTransactionInfo;
57 using syncable::SPECIFICS;
58 using syncable::UNIQUE_POSITION;
59
60 namespace {
61
62 GetUpdatesCallerInfo::GetUpdatesSource GetSourceFromReason(
63 ConfigureReason reason) {
64 switch (reason) {
65 case CONFIGURE_REASON_RECONFIGURATION:
66 return GetUpdatesCallerInfo::RECONFIGURATION;
67 case CONFIGURE_REASON_MIGRATION:
68 return GetUpdatesCallerInfo::MIGRATION;
69 case CONFIGURE_REASON_NEW_CLIENT:
70 return GetUpdatesCallerInfo::NEW_CLIENT;
71 case CONFIGURE_REASON_NEWLY_ENABLED_DATA_TYPE:
72 case CONFIGURE_REASON_CRYPTO:
73 case CONFIGURE_REASON_CATCH_UP:
74 return GetUpdatesCallerInfo::NEWLY_SUPPORTED_DATATYPE;
75 case CONFIGURE_REASON_PROGRAMMATIC:
76 return GetUpdatesCallerInfo::PROGRAMMATIC;
77 case CONFIGURE_REASON_UNKNOWN:
78 NOTREACHED();
79 }
80 return GetUpdatesCallerInfo::UNKNOWN;
81 }
82
83 } // namespace
84
85 SyncManagerImpl::SyncManagerImpl(const std::string& name)
86 : name_(name),
87 change_delegate_(NULL),
88 initialized_(false),
89 observing_network_connectivity_changes_(false),
90 weak_ptr_factory_(this) {
91 // Pre-fill |notification_info_map_|.
92 for (int i = FIRST_REAL_MODEL_TYPE; i < MODEL_TYPE_COUNT; ++i) {
93 notification_info_map_.insert(
94 std::make_pair(ModelTypeFromInt(i), NotificationInfo()));
95 }
96 }
97
98 SyncManagerImpl::~SyncManagerImpl() {
99 DCHECK(thread_checker_.CalledOnValidThread());
100 CHECK(!initialized_);
101 }
102
103 SyncManagerImpl::NotificationInfo::NotificationInfo() : total_count(0) {}
104 SyncManagerImpl::NotificationInfo::~NotificationInfo() {}
105
106 base::DictionaryValue* SyncManagerImpl::NotificationInfo::ToValue() const {
107 base::DictionaryValue* value = new base::DictionaryValue();
108 value->SetInteger("totalCount", total_count);
109 value->SetString("payload", payload);
110 return value;
111 }
112
113 bool SyncManagerImpl::VisiblePositionsDiffer(
114 const syncable::EntryKernelMutation& mutation) const {
115 const syncable::EntryKernel& a = mutation.original;
116 const syncable::EntryKernel& b = mutation.mutated;
117 if (!b.ShouldMaintainPosition())
118 return false;
119 if (!a.ref(UNIQUE_POSITION).Equals(b.ref(UNIQUE_POSITION)))
120 return true;
121 if (a.ref(syncable::PARENT_ID) != b.ref(syncable::PARENT_ID))
122 return true;
123 return false;
124 }
125
126 bool SyncManagerImpl::VisiblePropertiesDiffer(
127 const syncable::EntryKernelMutation& mutation,
128 Cryptographer* cryptographer) const {
129 const syncable::EntryKernel& a = mutation.original;
130 const syncable::EntryKernel& b = mutation.mutated;
131 const sync_pb::EntitySpecifics& a_specifics = a.ref(SPECIFICS);
132 const sync_pb::EntitySpecifics& b_specifics = b.ref(SPECIFICS);
133 DCHECK_EQ(GetModelTypeFromSpecifics(a_specifics),
134 GetModelTypeFromSpecifics(b_specifics));
135 ModelType model_type = GetModelTypeFromSpecifics(b_specifics);
136 // Suppress updates to items that aren't tracked by any browser model.
137 if (model_type < FIRST_REAL_MODEL_TYPE ||
138 !a.ref(syncable::UNIQUE_SERVER_TAG).empty()) {
139 return false;
140 }
141 if (a.ref(syncable::IS_DIR) != b.ref(syncable::IS_DIR))
142 return true;
143 if (!AreSpecificsEqual(cryptographer, a.ref(syncable::SPECIFICS),
144 b.ref(syncable::SPECIFICS))) {
145 return true;
146 }
147 if (!AreAttachmentMetadataEqual(a.ref(syncable::ATTACHMENT_METADATA),
148 b.ref(syncable::ATTACHMENT_METADATA))) {
149 return true;
150 }
151 // We only care if the name has changed if neither specifics is encrypted
152 // (encrypted nodes blow away the NON_UNIQUE_NAME).
153 if (!a_specifics.has_encrypted() && !b_specifics.has_encrypted() &&
154 a.ref(syncable::NON_UNIQUE_NAME) != b.ref(syncable::NON_UNIQUE_NAME))
155 return true;
156 if (VisiblePositionsDiffer(mutation))
157 return true;
158 return false;
159 }
160
161 ModelTypeSet SyncManagerImpl::InitialSyncEndedTypes() {
162 DCHECK(initialized_);
163 return model_type_registry_->GetInitialSyncEndedTypes();
164 }
165
166 ModelTypeSet SyncManagerImpl::GetTypesWithEmptyProgressMarkerToken(
167 ModelTypeSet types) {
168 ModelTypeSet result;
169 for (ModelTypeSet::Iterator i = types.First(); i.Good(); i.Inc()) {
170 sync_pb::DataTypeProgressMarker marker;
171 directory()->GetDownloadProgress(i.Get(), &marker);
172
173 if (marker.token().empty())
174 result.Put(i.Get());
175 }
176 return result;
177 }
178
179 void SyncManagerImpl::ConfigureSyncer(
180 ConfigureReason reason,
181 ModelTypeSet to_download,
182 ModelTypeSet to_purge,
183 ModelTypeSet to_journal,
184 ModelTypeSet to_unapply,
185 const ModelSafeRoutingInfo& new_routing_info,
186 const base::Closure& ready_task,
187 const base::Closure& retry_task) {
188 DCHECK(thread_checker_.CalledOnValidThread());
189 DCHECK(!ready_task.is_null());
190 DCHECK(initialized_);
191
192 DVLOG(1) << "Configuring -"
193 << "\n\t"
194 << "current types: "
195 << ModelTypeSetToString(GetRoutingInfoTypes(new_routing_info))
196 << "\n\t"
197 << "types to download: " << ModelTypeSetToString(to_download)
198 << "\n\t"
199 << "types to purge: " << ModelTypeSetToString(to_purge) << "\n\t"
200 << "types to journal: " << ModelTypeSetToString(to_journal) << "\n\t"
201 << "types to unapply: " << ModelTypeSetToString(to_unapply);
202 if (!PurgeDisabledTypes(to_purge, to_journal, to_unapply)) {
203 // We failed to cleanup the types. Invoke the ready task without actually
204 // configuring any types. The caller should detect this as a configuration
205 // failure and act appropriately.
206 ready_task.Run();
207 return;
208 }
209
210 ConfigurationParams params(GetSourceFromReason(reason), to_download,
211 new_routing_info, ready_task, retry_task);
212
213 scheduler_->Start(SyncScheduler::CONFIGURATION_MODE, base::Time());
214 scheduler_->ScheduleConfiguration(params);
215 }
216
217 void SyncManagerImpl::Init(InitArgs* args) {
218 CHECK(!initialized_);
219 DCHECK(thread_checker_.CalledOnValidThread());
220 DCHECK(args->post_factory.get());
221 DCHECK(!args->credentials.account_id.empty());
222 DCHECK(!args->credentials.sync_token.empty());
223 DCHECK(!args->credentials.scope_set.empty());
224 DCHECK(args->cancelation_signal);
225 DVLOG(1) << "SyncManager starting Init...";
226
227 weak_handle_this_ = MakeWeakHandle(weak_ptr_factory_.GetWeakPtr());
228
229 change_delegate_ = args->change_delegate;
230
231 AddObserver(&js_sync_manager_observer_);
232 SetJsEventHandler(args->event_handler);
233
234 AddObserver(&debug_info_event_listener_);
235
236 database_path_ = args->database_location.Append(
237 syncable::Directory::kSyncDatabaseFilename);
238 report_unrecoverable_error_function_ =
239 args->report_unrecoverable_error_function;
240
241 allstatus_.SetHasKeystoreKey(
242 !args->restored_keystore_key_for_bootstrapping.empty());
243 sync_encryption_handler_.reset(new SyncEncryptionHandlerImpl(
244 &share_, args->encryptor, args->restored_key_for_bootstrapping,
245 args->restored_keystore_key_for_bootstrapping));
246 sync_encryption_handler_->AddObserver(this);
247 sync_encryption_handler_->AddObserver(&debug_info_event_listener_);
248 sync_encryption_handler_->AddObserver(&js_sync_encryption_handler_observer_);
249
250 base::FilePath absolute_db_path = database_path_;
251 DCHECK(absolute_db_path.IsAbsolute());
252
253 std::unique_ptr<syncable::DirectoryBackingStore> backing_store =
254 args->internal_components_factory->BuildDirectoryBackingStore(
255 InternalComponentsFactory::STORAGE_ON_DISK,
256 args->credentials.account_id, absolute_db_path);
257
258 DCHECK(backing_store.get());
259 share_.directory.reset(new syncable::Directory(
260 backing_store.release(), args->unrecoverable_error_handler,
261 report_unrecoverable_error_function_, sync_encryption_handler_.get(),
262 sync_encryption_handler_->GetCryptographerUnsafe()));
263 share_.sync_credentials = args->credentials;
264
265 // UserShare is accessible to a lot of code that doesn't need access to the
266 // sync token so clear sync_token from the UserShare.
267 share_.sync_credentials.sync_token = "";
268
269 DVLOG(1) << "Username: " << args->credentials.email;
270 DVLOG(1) << "AccountId: " << args->credentials.account_id;
271 if (!OpenDirectory(args->credentials.account_id)) {
272 NotifyInitializationFailure();
273 LOG(ERROR) << "Sync manager initialization failed!";
274 return;
275 }
276
277 // Now that we have opened the Directory we can restore any previously saved
278 // nigori specifics.
279 if (args->saved_nigori_state) {
280 sync_encryption_handler_->RestoreNigori(*args->saved_nigori_state);
281 args->saved_nigori_state.reset();
282 }
283
284 connection_manager_.reset(new SyncServerConnectionManager(
285 args->service_url.host() + args->service_url.path(),
286 args->service_url.EffectiveIntPort(),
287 args->service_url.SchemeIsCryptographic(), args->post_factory.release(),
288 args->cancelation_signal));
289 connection_manager_->set_client_id(directory()->cache_guid());
290 connection_manager_->AddListener(this);
291
292 std::string sync_id = directory()->cache_guid();
293
294 DVLOG(1) << "Setting sync client ID: " << sync_id;
295 allstatus_.SetSyncId(sync_id);
296 DVLOG(1) << "Setting invalidator client ID: " << args->invalidator_client_id;
297 allstatus_.SetInvalidatorClientId(args->invalidator_client_id);
298
299 model_type_registry_.reset(
300 new ModelTypeRegistry(args->workers, directory(), this));
301 sync_encryption_handler_->AddObserver(model_type_registry_.get());
302
303 // Build a SyncCycleContext and store the worker in it.
304 DVLOG(1) << "Sync is bringing up SyncCycleContext.";
305 std::vector<SyncEngineEventListener*> listeners;
306 listeners.push_back(&allstatus_);
307 listeners.push_back(this);
308 cycle_context_ = args->internal_components_factory->BuildContext(
309 connection_manager_.get(), directory(), args->extensions_activity,
310 listeners, &debug_info_event_listener_, model_type_registry_.get(),
311 args->invalidator_client_id);
312 scheduler_ = args->internal_components_factory->BuildScheduler(
313 name_, cycle_context_.get(), args->cancelation_signal);
314
315 scheduler_->Start(SyncScheduler::CONFIGURATION_MODE, base::Time());
316
317 initialized_ = true;
318
319 net::NetworkChangeNotifier::AddIPAddressObserver(this);
320 net::NetworkChangeNotifier::AddConnectionTypeObserver(this);
321 observing_network_connectivity_changes_ = true;
322
323 UpdateCredentials(args->credentials);
324
325 NotifyInitializationSuccess();
326 }
327
328 void SyncManagerImpl::NotifyInitializationSuccess() {
329 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
330 OnInitializationComplete(
331 MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()),
332 MakeWeakHandle(debug_info_event_listener_.GetWeakPtr()),
333 true, InitialSyncEndedTypes()));
334 }
335
336 void SyncManagerImpl::NotifyInitializationFailure() {
337 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
338 OnInitializationComplete(
339 MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()),
340 MakeWeakHandle(debug_info_event_listener_.GetWeakPtr()),
341 false, ModelTypeSet()));
342 }
343
344 void SyncManagerImpl::OnPassphraseRequired(
345 PassphraseRequiredReason reason,
346 const sync_pb::EncryptedData& pending_keys) {
347 // Does nothing.
348 }
349
350 void SyncManagerImpl::OnPassphraseAccepted() {
351 // Does nothing.
352 }
353
354 void SyncManagerImpl::OnBootstrapTokenUpdated(
355 const std::string& bootstrap_token,
356 BootstrapTokenType type) {
357 if (type == KEYSTORE_BOOTSTRAP_TOKEN)
358 allstatus_.SetHasKeystoreKey(true);
359 }
360
361 void SyncManagerImpl::OnEncryptedTypesChanged(ModelTypeSet encrypted_types,
362 bool encrypt_everything) {
363 allstatus_.SetEncryptedTypes(encrypted_types);
364 }
365
366 void SyncManagerImpl::OnEncryptionComplete() {
367 // Does nothing.
368 }
369
370 void SyncManagerImpl::OnCryptographerStateChanged(
371 Cryptographer* cryptographer) {
372 allstatus_.SetCryptographerReady(cryptographer->is_ready());
373 allstatus_.SetCryptoHasPendingKeys(cryptographer->has_pending_keys());
374 allstatus_.SetKeystoreMigrationTime(
375 sync_encryption_handler_->migration_time());
376 }
377
378 void SyncManagerImpl::OnPassphraseTypeChanged(
379 PassphraseType type,
380 base::Time explicit_passphrase_time) {
381 allstatus_.SetPassphraseType(type);
382 allstatus_.SetKeystoreMigrationTime(
383 sync_encryption_handler_->migration_time());
384 }
385
386 void SyncManagerImpl::OnLocalSetPassphraseEncryption(
387 const SyncEncryptionHandler::NigoriState& nigori_state) {}
388
389 void SyncManagerImpl::StartSyncingNormally(
390 const ModelSafeRoutingInfo& routing_info,
391 base::Time last_poll_time) {
392 // Start the sync scheduler.
393 // TODO(sync): We always want the newest set of routes when we switch back
394 // to normal mode. Figure out how to enforce set_routing_info is always
395 // appropriately set and that it's only modified when switching to normal
396 // mode.
397 DCHECK(thread_checker_.CalledOnValidThread());
398 cycle_context_->SetRoutingInfo(routing_info);
399 scheduler_->Start(SyncScheduler::NORMAL_MODE, last_poll_time);
400 }
401
402 syncable::Directory* SyncManagerImpl::directory() {
403 return share_.directory.get();
404 }
405
406 const SyncScheduler* SyncManagerImpl::scheduler() const {
407 return scheduler_.get();
408 }
409
410 bool SyncManagerImpl::GetHasInvalidAuthTokenForTest() const {
411 return connection_manager_->HasInvalidAuthToken();
412 }
413
414 bool SyncManagerImpl::OpenDirectory(const std::string& username) {
415 DCHECK(!initialized_) << "Should only happen once";
416
417 // Set before Open().
418 change_observer_ = MakeWeakHandle(js_mutation_event_observer_.AsWeakPtr());
419 WeakHandle<syncable::TransactionObserver> transaction_observer(
420 MakeWeakHandle(js_mutation_event_observer_.AsWeakPtr()));
421
422 syncable::DirOpenResult open_result = syncable::NOT_INITIALIZED;
423 open_result = directory()->Open(username, this, transaction_observer);
424 if (open_result != syncable::OPENED) {
425 LOG(ERROR) << "Could not open share for:" << username;
426 return false;
427 }
428
429 // Unapplied datatypes (those that do not have initial sync ended set) get
430 // re-downloaded during any configuration. But, it's possible for a datatype
431 // to have a progress marker but not have initial sync ended yet, making
432 // it a candidate for migration. This is a problem, as the DataTypeManager
433 // does not support a migration while it's already in the middle of a
434 // configuration. As a result, any partially synced datatype can stall the
435 // DTM, waiting for the configuration to complete, which it never will due
436 // to the migration error. In addition, a partially synced nigori will
437 // trigger the migration logic before the backend is initialized, resulting
438 // in crashes. We therefore detect and purge any partially synced types as
439 // part of initialization.
440 if (!PurgePartiallySyncedTypes())
441 return false;
442
443 return true;
444 }
445
446 bool SyncManagerImpl::PurgePartiallySyncedTypes() {
447 ModelTypeSet partially_synced_types = ModelTypeSet::All();
448 partially_synced_types.RemoveAll(directory()->InitialSyncEndedTypes());
449 partially_synced_types.RemoveAll(
450 GetTypesWithEmptyProgressMarkerToken(ModelTypeSet::All()));
451
452 DVLOG(1) << "Purging partially synced types "
453 << ModelTypeSetToString(partially_synced_types);
454 UMA_HISTOGRAM_COUNTS("Sync.PartiallySyncedTypes",
455 partially_synced_types.Size());
456 if (partially_synced_types.Empty())
457 return true;
458 return directory()->PurgeEntriesWithTypeIn(partially_synced_types,
459 ModelTypeSet(), ModelTypeSet());
460 }
461
462 bool SyncManagerImpl::PurgeDisabledTypes(ModelTypeSet to_purge,
463 ModelTypeSet to_journal,
464 ModelTypeSet to_unapply) {
465 if (to_purge.Empty())
466 return true;
467 DVLOG(1) << "Purging disabled types " << ModelTypeSetToString(to_purge);
468 DCHECK(to_purge.HasAll(to_journal));
469 DCHECK(to_purge.HasAll(to_unapply));
470 return directory()->PurgeEntriesWithTypeIn(to_purge, to_journal, to_unapply);
471 }
472
473 void SyncManagerImpl::UpdateCredentials(const SyncCredentials& credentials) {
474 DCHECK(thread_checker_.CalledOnValidThread());
475 DCHECK(initialized_);
476 DCHECK(!credentials.account_id.empty());
477 DCHECK(!credentials.sync_token.empty());
478 DCHECK(!credentials.scope_set.empty());
479 cycle_context_->set_account_name(credentials.email);
480
481 observing_network_connectivity_changes_ = true;
482 if (!connection_manager_->SetAuthToken(credentials.sync_token))
483 return; // Auth token is known to be invalid, so exit early.
484
485 scheduler_->OnCredentialsUpdated();
486
487 // TODO(zea): pass the credential age to the debug info event listener.
488 }
489
490 void SyncManagerImpl::AddObserver(SyncManager::Observer* observer) {
491 DCHECK(thread_checker_.CalledOnValidThread());
492 observers_.AddObserver(observer);
493 }
494
495 void SyncManagerImpl::RemoveObserver(SyncManager::Observer* observer) {
496 DCHECK(thread_checker_.CalledOnValidThread());
497 observers_.RemoveObserver(observer);
498 }
499
500 void SyncManagerImpl::ShutdownOnSyncThread(ShutdownReason reason) {
501 DCHECK(thread_checker_.CalledOnValidThread());
502
503 // Prevent any in-flight method calls from running. Also
504 // invalidates |weak_handle_this_| and |change_observer_|.
505 weak_ptr_factory_.InvalidateWeakPtrs();
506 js_mutation_event_observer_.InvalidateWeakPtrs();
507
508 scheduler_.reset();
509 cycle_context_.reset();
510
511 if (model_type_registry_)
512 sync_encryption_handler_->RemoveObserver(model_type_registry_.get());
513
514 model_type_registry_.reset();
515
516 if (sync_encryption_handler_) {
517 sync_encryption_handler_->RemoveObserver(&debug_info_event_listener_);
518 sync_encryption_handler_->RemoveObserver(this);
519 }
520
521 SetJsEventHandler(WeakHandle<JsEventHandler>());
522 RemoveObserver(&js_sync_manager_observer_);
523
524 RemoveObserver(&debug_info_event_listener_);
525
526 // |connection_manager_| may end up being NULL here in tests (in synchronous
527 // initialization mode).
528 //
529 // TODO(akalin): Fix this behavior.
530 if (connection_manager_)
531 connection_manager_->RemoveListener(this);
532 connection_manager_.reset();
533
534 net::NetworkChangeNotifier::RemoveIPAddressObserver(this);
535 net::NetworkChangeNotifier::RemoveConnectionTypeObserver(this);
536 observing_network_connectivity_changes_ = false;
537
538 if (initialized_ && directory()) {
539 directory()->SaveChanges();
540 }
541
542 share_.directory.reset();
543
544 change_delegate_ = NULL;
545
546 initialized_ = false;
547
548 // We reset these here, since only now we know they will not be
549 // accessed from other threads (since we shut down everything).
550 change_observer_.Reset();
551 weak_handle_this_.Reset();
552 }
553
554 void SyncManagerImpl::OnIPAddressChanged() {
555 if (!observing_network_connectivity_changes_) {
556 DVLOG(1) << "IP address change dropped.";
557 return;
558 }
559 DVLOG(1) << "IP address change detected.";
560 OnNetworkConnectivityChangedImpl();
561 }
562
563 void SyncManagerImpl::OnConnectionTypeChanged(
564 net::NetworkChangeNotifier::ConnectionType) {
565 if (!observing_network_connectivity_changes_) {
566 DVLOG(1) << "Connection type change dropped.";
567 return;
568 }
569 DVLOG(1) << "Connection type change detected.";
570 OnNetworkConnectivityChangedImpl();
571 }
572
573 void SyncManagerImpl::OnNetworkConnectivityChangedImpl() {
574 DCHECK(thread_checker_.CalledOnValidThread());
575 scheduler_->OnConnectionStatusChange();
576 }
577
578 void SyncManagerImpl::OnServerConnectionEvent(
579 const ServerConnectionEvent& event) {
580 DCHECK(thread_checker_.CalledOnValidThread());
581 if (event.connection_code == HttpResponse::SERVER_CONNECTION_OK) {
582 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
583 OnConnectionStatusChange(CONNECTION_OK));
584 }
585
586 if (event.connection_code == HttpResponse::SYNC_AUTH_ERROR) {
587 observing_network_connectivity_changes_ = false;
588 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
589 OnConnectionStatusChange(CONNECTION_AUTH_ERROR));
590 }
591
592 if (event.connection_code == HttpResponse::SYNC_SERVER_ERROR) {
593 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
594 OnConnectionStatusChange(CONNECTION_SERVER_ERROR));
595 }
596 }
597
598 void SyncManagerImpl::HandleTransactionCompleteChangeEvent(
599 ModelTypeSet models_with_changes) {
600 // This notification happens immediately after the transaction mutex is
601 // released. This allows work to be performed without blocking other threads
602 // from acquiring a transaction.
603 if (!change_delegate_)
604 return;
605
606 // Call commit.
607 for (ModelTypeSet::Iterator it = models_with_changes.First(); it.Good();
608 it.Inc()) {
609 change_delegate_->OnChangesComplete(it.Get());
610 change_observer_.Call(
611 FROM_HERE, &SyncManager::ChangeObserver::OnChangesComplete, it.Get());
612 }
613 }
614
615 ModelTypeSet SyncManagerImpl::HandleTransactionEndingChangeEvent(
616 const ImmutableWriteTransactionInfo& write_transaction_info,
617 syncable::BaseTransaction* trans) {
618 // This notification happens immediately before a syncable WriteTransaction
619 // falls out of scope. It happens while the channel mutex is still held,
620 // and while the transaction mutex is held, so it cannot be re-entrant.
621 if (!change_delegate_ || change_records_.empty())
622 return ModelTypeSet();
623
624 // This will continue the WriteTransaction using a read only wrapper.
625 // This is the last chance for read to occur in the WriteTransaction
626 // that's closing. This special ReadTransaction will not close the
627 // underlying transaction.
628 ReadTransaction read_trans(GetUserShare(), trans);
629
630 ModelTypeSet models_with_changes;
631 for (ChangeRecordMap::const_iterator it = change_records_.begin();
632 it != change_records_.end(); ++it) {
633 DCHECK(!it->second.Get().empty());
634 ModelType type = ModelTypeFromInt(it->first);
635 change_delegate_->OnChangesApplied(
636 type, trans->directory()->GetTransactionVersion(type), &read_trans,
637 it->second);
638 change_observer_.Call(FROM_HERE,
639 &SyncManager::ChangeObserver::OnChangesApplied, type,
640 write_transaction_info.Get().id, it->second);
641 models_with_changes.Put(type);
642 }
643 change_records_.clear();
644 return models_with_changes;
645 }
646
647 void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncApi(
648 const ImmutableWriteTransactionInfo& write_transaction_info,
649 syncable::BaseTransaction* trans,
650 std::vector<int64_t>* entries_changed) {
651 // We have been notified about a user action changing a sync model.
652 LOG_IF(WARNING, !change_records_.empty())
653 << "CALCULATE_CHANGES called with unapplied old changes.";
654
655 // The mutated model type, or UNSPECIFIED if nothing was mutated.
656 ModelTypeSet mutated_model_types;
657
658 const syncable::ImmutableEntryKernelMutationMap& mutations =
659 write_transaction_info.Get().mutations;
660 for (syncable::EntryKernelMutationMap::const_iterator it =
661 mutations.Get().begin();
662 it != mutations.Get().end(); ++it) {
663 if (!it->second.mutated.ref(syncable::IS_UNSYNCED)) {
664 continue;
665 }
666
667 ModelType model_type =
668 GetModelTypeFromSpecifics(it->second.mutated.ref(SPECIFICS));
669 if (model_type < FIRST_REAL_MODEL_TYPE) {
670 NOTREACHED() << "Permanent or underspecified item changed via syncapi.";
671 continue;
672 }
673
674 // Found real mutation.
675 if (model_type != UNSPECIFIED) {
676 mutated_model_types.Put(model_type);
677 entries_changed->push_back(it->second.mutated.ref(syncable::META_HANDLE));
678 }
679 }
680
681 // Nudge if necessary.
682 if (!mutated_model_types.Empty()) {
683 if (weak_handle_this_.IsInitialized()) {
684 weak_handle_this_.Call(FROM_HERE,
685 &SyncManagerImpl::RequestNudgeForDataTypes,
686 FROM_HERE, mutated_model_types);
687 } else {
688 NOTREACHED();
689 }
690 }
691 }
692
693 void SyncManagerImpl::SetExtraChangeRecordData(
694 int64_t id,
695 ModelType type,
696 ChangeReorderBuffer* buffer,
697 Cryptographer* cryptographer,
698 const syncable::EntryKernel& original,
699 bool existed_before,
700 bool exists_now) {
701 // If this is a deletion and the datatype was encrypted, we need to decrypt it
702 // and attach it to the buffer.
703 if (!exists_now && existed_before) {
704 sync_pb::EntitySpecifics original_specifics(original.ref(SPECIFICS));
705 if (type == PASSWORDS) {
706 // Passwords must use their own legacy ExtraPasswordChangeRecordData.
707 std::unique_ptr<sync_pb::PasswordSpecificsData> data(
708 DecryptPasswordSpecifics(original_specifics, cryptographer));
709 if (!data) {
710 NOTREACHED();
711 return;
712 }
713 buffer->SetExtraDataForId(id, new ExtraPasswordChangeRecordData(*data));
714 } else if (original_specifics.has_encrypted()) {
715 // All other datatypes can just create a new unencrypted specifics and
716 // attach it.
717 const sync_pb::EncryptedData& encrypted = original_specifics.encrypted();
718 if (!cryptographer->Decrypt(encrypted, &original_specifics)) {
719 NOTREACHED();
720 return;
721 }
722 }
723 buffer->SetSpecificsForId(id, original_specifics);
724 }
725 }
726
727 void SyncManagerImpl::HandleCalculateChangesChangeEventFromSyncer(
728 const ImmutableWriteTransactionInfo& write_transaction_info,
729 syncable::BaseTransaction* trans,
730 std::vector<int64_t>* entries_changed) {
731 // We only expect one notification per sync step, so change_buffers_ should
732 // contain no pending entries.
733 LOG_IF(WARNING, !change_records_.empty())
734 << "CALCULATE_CHANGES called with unapplied old changes.";
735
736 ChangeReorderBuffer change_buffers[MODEL_TYPE_COUNT];
737
738 Cryptographer* crypto = directory()->GetCryptographer(trans);
739 const syncable::ImmutableEntryKernelMutationMap& mutations =
740 write_transaction_info.Get().mutations;
741 for (syncable::EntryKernelMutationMap::const_iterator it =
742 mutations.Get().begin();
743 it != mutations.Get().end(); ++it) {
744 bool existed_before = !it->second.original.ref(syncable::IS_DEL);
745 bool exists_now = !it->second.mutated.ref(syncable::IS_DEL);
746
747 // Omit items that aren't associated with a model.
748 ModelType type =
749 GetModelTypeFromSpecifics(it->second.mutated.ref(SPECIFICS));
750 if (type < FIRST_REAL_MODEL_TYPE)
751 continue;
752
753 int64_t handle = it->first;
754 if (exists_now && !existed_before)
755 change_buffers[type].PushAddedItem(handle);
756 else if (!exists_now && existed_before)
757 change_buffers[type].PushDeletedItem(handle);
758 else if (exists_now && existed_before &&
759 VisiblePropertiesDiffer(it->second, crypto))
760 change_buffers[type].PushUpdatedItem(handle);
761
762 SetExtraChangeRecordData(handle, type, &change_buffers[type], crypto,
763 it->second.original, existed_before, exists_now);
764 }
765
766 ReadTransaction read_trans(GetUserShare(), trans);
767 for (int i = FIRST_REAL_MODEL_TYPE; i < MODEL_TYPE_COUNT; ++i) {
768 if (!change_buffers[i].IsEmpty()) {
769 if (change_buffers[i].GetAllChangesInTreeOrder(&read_trans,
770 &(change_records_[i]))) {
771 for (size_t j = 0; j < change_records_[i].Get().size(); ++j)
772 entries_changed->push_back((change_records_[i].Get())[j].id);
773 }
774 if (change_records_[i].Get().empty())
775 change_records_.erase(i);
776 }
777 }
778 }
779
780 void SyncManagerImpl::RequestNudgeForDataTypes(
781 const tracked_objects::Location& nudge_location,
782 ModelTypeSet types) {
783 debug_info_event_listener_.OnNudgeFromDatatype(types.First().Get());
784
785 scheduler_->ScheduleLocalNudge(types, nudge_location);
786 }
787
788 void SyncManagerImpl::NudgeForInitialDownload(ModelType type) {
789 DCHECK(thread_checker_.CalledOnValidThread());
790 scheduler_->ScheduleInitialSyncNudge(type);
791 }
792
793 void SyncManagerImpl::NudgeForCommit(ModelType type) {
794 DCHECK(thread_checker_.CalledOnValidThread());
795 RequestNudgeForDataTypes(FROM_HERE, ModelTypeSet(type));
796 }
797
798 void SyncManagerImpl::NudgeForRefresh(ModelType type) {
799 DCHECK(thread_checker_.CalledOnValidThread());
800 RefreshTypes(ModelTypeSet(type));
801 }
802
803 void SyncManagerImpl::OnSyncCycleEvent(const SyncCycleEvent& event) {
804 DCHECK(thread_checker_.CalledOnValidThread());
805 // Only send an event if this is due to a cycle ending and this cycle
806 // concludes a canonical "sync" process; that is, based on what is known
807 // locally we are "all happy" and up to date. There may be new changes on
808 // the server, but we'll get them on a subsequent sync.
809 //
810 // Notifications are sent at the end of every sync cycle, regardless of
811 // whether we should sync again.
812 if (event.what_happened == SyncCycleEvent::SYNC_CYCLE_ENDED) {
813 if (!initialized_) {
814 DVLOG(1) << "OnSyncCycleCompleted not sent because sync api is not "
815 << "initialized";
816 return;
817 }
818
819 DVLOG(1) << "Sending OnSyncCycleCompleted";
820 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
821 OnSyncCycleCompleted(event.snapshot));
822 }
823 }
824
825 void SyncManagerImpl::OnActionableError(const SyncProtocolError& error) {
826 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
827 OnActionableError(error));
828 }
829
830 void SyncManagerImpl::OnRetryTimeChanged(base::Time) {}
831
832 void SyncManagerImpl::OnThrottledTypesChanged(ModelTypeSet) {}
833
834 void SyncManagerImpl::OnMigrationRequested(ModelTypeSet types) {
835 FOR_EACH_OBSERVER(SyncManager::Observer, observers_,
836 OnMigrationRequested(types));
837 }
838
839 void SyncManagerImpl::OnProtocolEvent(const ProtocolEvent& event) {
840 protocol_event_buffer_.RecordProtocolEvent(event);
841 FOR_EACH_OBSERVER(SyncManager::Observer, observers_, OnProtocolEvent(event));
842 }
843
844 void SyncManagerImpl::SetJsEventHandler(
845 const WeakHandle<JsEventHandler>& event_handler) {
846 js_sync_manager_observer_.SetJsEventHandler(event_handler);
847 js_mutation_event_observer_.SetJsEventHandler(event_handler);
848 js_sync_encryption_handler_observer_.SetJsEventHandler(event_handler);
849 }
850
851 void SyncManagerImpl::SetInvalidatorEnabled(bool invalidator_enabled) {
852 DCHECK(thread_checker_.CalledOnValidThread());
853
854 DVLOG(1) << "Invalidator enabled state is now: " << invalidator_enabled;
855 allstatus_.SetNotificationsEnabled(invalidator_enabled);
856 scheduler_->SetNotificationsEnabled(invalidator_enabled);
857 }
858
859 void SyncManagerImpl::OnIncomingInvalidation(
860 ModelType type,
861 std::unique_ptr<InvalidationInterface> invalidation) {
862 DCHECK(thread_checker_.CalledOnValidThread());
863
864 allstatus_.IncrementNotificationsReceived();
865 scheduler_->ScheduleInvalidationNudge(type, std::move(invalidation),
866 FROM_HERE);
867 }
868
869 void SyncManagerImpl::RefreshTypes(ModelTypeSet types) {
870 DCHECK(thread_checker_.CalledOnValidThread());
871 if (types.Empty()) {
872 LOG(WARNING) << "Sync received refresh request with no types specified.";
873 } else {
874 scheduler_->ScheduleLocalRefreshRequest(types, FROM_HERE);
875 }
876 }
877
878 SyncStatus SyncManagerImpl::GetDetailedStatus() const {
879 return allstatus_.status();
880 }
881
882 void SyncManagerImpl::SaveChanges() {
883 directory()->SaveChanges();
884 }
885
886 UserShare* SyncManagerImpl::GetUserShare() {
887 DCHECK(initialized_);
888 return &share_;
889 }
890
891 std::unique_ptr<ModelTypeConnector>
892 SyncManagerImpl::GetModelTypeConnectorProxy() {
893 DCHECK(initialized_);
894 return base::MakeUnique<ModelTypeConnectorProxy>(
895 base::ThreadTaskRunnerHandle::Get(), model_type_registry_->AsWeakPtr());
896 }
897
898 const std::string SyncManagerImpl::cache_guid() {
899 DCHECK(initialized_);
900 return directory()->cache_guid();
901 }
902
903 bool SyncManagerImpl::ReceivedExperiment(Experiments* experiments) {
904 ReadTransaction trans(FROM_HERE, GetUserShare());
905 ReadNode nigori_node(&trans);
906 if (nigori_node.InitTypeRoot(NIGORI) != BaseNode::INIT_OK) {
907 DVLOG(1) << "Couldn't find Nigori node.";
908 return false;
909 }
910 bool found_experiment = false;
911
912 ReadNode favicon_sync_node(&trans);
913 if (favicon_sync_node.InitByClientTagLookup(EXPERIMENTS, kFaviconSyncTag) ==
914 BaseNode::INIT_OK) {
915 experiments->favicon_sync_limit =
916 favicon_sync_node.GetExperimentsSpecifics()
917 .favicon_sync()
918 .favicon_sync_limit();
919 found_experiment = true;
920 }
921
922 ReadNode pre_commit_update_avoidance_node(&trans);
923 if (pre_commit_update_avoidance_node.InitByClientTagLookup(
924 EXPERIMENTS, kPreCommitUpdateAvoidanceTag) == BaseNode::INIT_OK) {
925 cycle_context_->set_server_enabled_pre_commit_update_avoidance(
926 pre_commit_update_avoidance_node.GetExperimentsSpecifics()
927 .pre_commit_update_avoidance()
928 .enabled());
929 // We don't bother setting found_experiment. The frontend doesn't need to
930 // know about this.
931 }
932
933 ReadNode gcm_invalidations_node(&trans);
934 if (gcm_invalidations_node.InitByClientTagLookup(
935 EXPERIMENTS, kGCMInvalidationsTag) == BaseNode::INIT_OK) {
936 const sync_pb::GcmInvalidationsFlags& gcm_invalidations =
937 gcm_invalidations_node.GetExperimentsSpecifics().gcm_invalidations();
938 if (gcm_invalidations.has_enabled()) {
939 experiments->gcm_invalidations_enabled = gcm_invalidations.enabled();
940 found_experiment = true;
941 }
942 }
943
944 return found_experiment;
945 }
946
947 bool SyncManagerImpl::HasUnsyncedItems() {
948 ReadTransaction trans(FROM_HERE, GetUserShare());
949 return (trans.GetWrappedTrans()->directory()->unsynced_entity_count() != 0);
950 }
951
952 SyncEncryptionHandler* SyncManagerImpl::GetEncryptionHandler() {
953 return sync_encryption_handler_.get();
954 }
955
956 std::vector<std::unique_ptr<ProtocolEvent>>
957 SyncManagerImpl::GetBufferedProtocolEvents() {
958 return protocol_event_buffer_.GetBufferedProtocolEvents();
959 }
960
961 void SyncManagerImpl::RegisterDirectoryTypeDebugInfoObserver(
962 TypeDebugInfoObserver* observer) {
963 model_type_registry_->RegisterDirectoryTypeDebugInfoObserver(observer);
964 }
965
966 void SyncManagerImpl::UnregisterDirectoryTypeDebugInfoObserver(
967 TypeDebugInfoObserver* observer) {
968 model_type_registry_->UnregisterDirectoryTypeDebugInfoObserver(observer);
969 }
970
971 bool SyncManagerImpl::HasDirectoryTypeDebugInfoObserver(
972 TypeDebugInfoObserver* observer) {
973 return model_type_registry_->HasDirectoryTypeDebugInfoObserver(observer);
974 }
975
976 void SyncManagerImpl::RequestEmitDebugInfo() {
977 model_type_registry_->RequestEmitDebugInfo();
978 }
979
980 void SyncManagerImpl::ClearServerData(const ClearServerDataCallback& callback) {
981 DCHECK(thread_checker_.CalledOnValidThread());
982 scheduler_->Start(SyncScheduler::CLEAR_SERVER_DATA_MODE, base::Time());
983 ClearParams params(callback);
984 scheduler_->ScheduleClearServerData(params);
985 }
986
987 void SyncManagerImpl::OnCookieJarChanged(bool account_mismatch,
988 bool empty_jar) {
989 DCHECK(thread_checker_.CalledOnValidThread());
990 cycle_context_->set_cookie_jar_mismatch(account_mismatch);
991 cycle_context_->set_cookie_jar_empty(empty_jar);
992 }
993
994 } // namespace syncer
OLDNEW
« no previous file with comments | « components/sync/core_impl/sync_manager_impl.h ('k') | components/sync/core_impl/sync_manager_impl_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698