Index: sync/notifier/non_blocking_invalidator.cc |
diff --git a/sync/notifier/non_blocking_invalidator.cc b/sync/notifier/non_blocking_invalidator.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..df40477d9bd490cebaca0edc0c36091b3a836cc5 |
--- /dev/null |
+++ b/sync/notifier/non_blocking_invalidator.cc |
@@ -0,0 +1,367 @@ |
+// Copyright (c) 2012 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "sync/notifier/non_blocking_invalidator.h" |
+ |
+#include <cstddef> |
+ |
+#include "base/location.h" |
+#include "base/logging.h" |
+#include "base/memory/scoped_ptr.h" |
+#include "base/single_thread_task_runner.h" |
+#include "base/thread_task_runner_handle.h" |
+#include "base/threading/thread.h" |
+#include "jingle/notifier/listener/push_client.h" |
+#include "sync/internal_api/public/util/weak_handle.h" |
+#include "sync/notifier/gcm_network_channel_delegate.h" |
+#include "sync/notifier/invalidation_handler.h" |
+#include "sync/notifier/invalidation_notifier.h" |
+#include "sync/notifier/object_id_invalidation_map.h" |
+#include "sync/notifier/sync_system_resources.h" |
+ |
+namespace syncer { |
+ |
+struct NonBlockingInvalidator::InitializeOptions { |
+ InitializeOptions( |
+ NetworkChannelCreator network_channel_creator, |
+ const std::string& invalidator_client_id, |
+ const UnackedInvalidationsMap& saved_invalidations, |
+ const std::string& invalidation_bootstrap_data, |
+ const WeakHandle<InvalidationStateTracker>& |
+ invalidation_state_tracker, |
+ const std::string& client_info, |
+ scoped_refptr<net::URLRequestContextGetter> request_context_getter) |
+ : network_channel_creator(network_channel_creator), |
+ invalidator_client_id(invalidator_client_id), |
+ saved_invalidations(saved_invalidations), |
+ invalidation_bootstrap_data(invalidation_bootstrap_data), |
+ invalidation_state_tracker(invalidation_state_tracker), |
+ client_info(client_info), |
+ request_context_getter(request_context_getter) { |
+ } |
+ |
+ NetworkChannelCreator network_channel_creator; |
+ std::string invalidator_client_id; |
+ UnackedInvalidationsMap saved_invalidations; |
+ std::string invalidation_bootstrap_data; |
+ WeakHandle<InvalidationStateTracker> invalidation_state_tracker; |
+ std::string client_info; |
+ scoped_refptr<net::URLRequestContextGetter> request_context_getter; |
+}; |
+ |
+namespace { |
+// This class provides a wrapper for a logging class in order to receive |
+// callbacks across threads, without having to worry about owner threads. |
+class CallbackProxy { |
+ public: |
+ explicit CallbackProxy( |
+ base::Callback<void(const base::DictionaryValue&)> callback); |
+ ~CallbackProxy(); |
+ |
+ void Run(const base::DictionaryValue& value); |
+ |
+ private: |
+ static void DoRun(base::Callback<void(const base::DictionaryValue&)> callback, |
+ scoped_ptr<base::DictionaryValue> value); |
+ |
+ base::Callback<void(const base::DictionaryValue&)> callback_; |
+ scoped_refptr<base::SingleThreadTaskRunner> running_thread_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(CallbackProxy); |
+}; |
+ |
+CallbackProxy::CallbackProxy( |
+ base::Callback<void(const base::DictionaryValue&)> callback) |
+ : callback_(callback), |
+ running_thread_(base::ThreadTaskRunnerHandle::Get()) {} |
+ |
+CallbackProxy::~CallbackProxy() {} |
+ |
+void CallbackProxy::DoRun( |
+ base::Callback<void(const base::DictionaryValue&)> callback, |
+ scoped_ptr<base::DictionaryValue> value) { |
+ callback.Run(*value); |
+} |
+ |
+void CallbackProxy::Run(const base::DictionaryValue& value) { |
+ scoped_ptr<base::DictionaryValue> copied(value.DeepCopy()); |
+ running_thread_->PostTask( |
+ FROM_HERE, |
+ base::Bind(&CallbackProxy::DoRun, callback_, base::Passed(&copied))); |
+} |
+} |
+ |
+class NonBlockingInvalidator::Core |
+ : public base::RefCountedThreadSafe<NonBlockingInvalidator::Core>, |
+ // InvalidationHandler to observe the InvalidationNotifier we create. |
+ public InvalidationHandler { |
+ public: |
+ // Called on parent thread. |delegate_observer| should be initialized. |
+ explicit Core( |
+ const WeakHandle<NonBlockingInvalidator>& delegate_observer); |
+ |
+ // Helpers called on I/O thread. |
+ void Initialize( |
+ const NonBlockingInvalidator::InitializeOptions& initialize_options); |
+ void Teardown(); |
+ void UpdateRegisteredIds(const ObjectIdSet& ids); |
+ void UpdateCredentials(const std::string& email, const std::string& token); |
+ void RequestDetailedStatus( |
+ base::Callback<void(const base::DictionaryValue&)> callback) const; |
+ |
+ // InvalidationHandler implementation (all called on I/O thread by |
+ // InvalidationNotifier). |
+ virtual void OnInvalidatorStateChange(InvalidatorState reason) OVERRIDE; |
+ virtual void OnIncomingInvalidation( |
+ const ObjectIdInvalidationMap& invalidation_map) OVERRIDE; |
+ virtual std::string GetOwnerName() const OVERRIDE; |
+ |
+ private: |
+ friend class |
+ base::RefCountedThreadSafe<NonBlockingInvalidator::Core>; |
+ // Called on parent or I/O thread. |
+ virtual ~Core(); |
+ |
+ // The variables below should be used only on the I/O thread. |
+ const WeakHandle<NonBlockingInvalidator> delegate_observer_; |
+ scoped_ptr<InvalidationNotifier> invalidation_notifier_; |
+ scoped_refptr<base::SingleThreadTaskRunner> network_task_runner_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(Core); |
+}; |
+ |
+NonBlockingInvalidator::Core::Core( |
+ const WeakHandle<NonBlockingInvalidator>& delegate_observer) |
+ : delegate_observer_(delegate_observer) { |
+ DCHECK(delegate_observer_.IsInitialized()); |
+} |
+ |
+NonBlockingInvalidator::Core::~Core() { |
+} |
+ |
+void NonBlockingInvalidator::Core::Initialize( |
+ const NonBlockingInvalidator::InitializeOptions& initialize_options) { |
+ DCHECK(initialize_options.request_context_getter.get()); |
+ network_task_runner_ = |
+ initialize_options.request_context_getter->GetNetworkTaskRunner(); |
+ DCHECK(network_task_runner_->BelongsToCurrentThread()); |
+ scoped_ptr<SyncNetworkChannel> network_channel = |
+ initialize_options.network_channel_creator.Run(); |
+ invalidation_notifier_.reset( |
+ new InvalidationNotifier( |
+ network_channel.Pass(), |
+ initialize_options.invalidator_client_id, |
+ initialize_options.saved_invalidations, |
+ initialize_options.invalidation_bootstrap_data, |
+ initialize_options.invalidation_state_tracker, |
+ initialize_options.client_info)); |
+ invalidation_notifier_->RegisterHandler(this); |
+} |
+ |
+void NonBlockingInvalidator::Core::Teardown() { |
+ DCHECK(network_task_runner_->BelongsToCurrentThread()); |
+ invalidation_notifier_->UnregisterHandler(this); |
+ invalidation_notifier_.reset(); |
+ network_task_runner_ = NULL; |
+} |
+ |
+void NonBlockingInvalidator::Core::UpdateRegisteredIds(const ObjectIdSet& ids) { |
+ DCHECK(network_task_runner_->BelongsToCurrentThread()); |
+ invalidation_notifier_->UpdateRegisteredIds(this, ids); |
+} |
+ |
+void NonBlockingInvalidator::Core::UpdateCredentials(const std::string& email, |
+ const std::string& token) { |
+ DCHECK(network_task_runner_->BelongsToCurrentThread()); |
+ invalidation_notifier_->UpdateCredentials(email, token); |
+} |
+ |
+void NonBlockingInvalidator::Core::RequestDetailedStatus( |
+ base::Callback<void(const base::DictionaryValue&)> callback) const { |
+ DCHECK(network_task_runner_->BelongsToCurrentThread()); |
+ invalidation_notifier_->RequestDetailedStatus(callback); |
+} |
+ |
+void NonBlockingInvalidator::Core::OnInvalidatorStateChange( |
+ InvalidatorState reason) { |
+ DCHECK(network_task_runner_->BelongsToCurrentThread()); |
+ delegate_observer_.Call(FROM_HERE, |
+ &NonBlockingInvalidator::OnInvalidatorStateChange, |
+ reason); |
+} |
+ |
+void NonBlockingInvalidator::Core::OnIncomingInvalidation( |
+ const ObjectIdInvalidationMap& invalidation_map) { |
+ DCHECK(network_task_runner_->BelongsToCurrentThread()); |
+ delegate_observer_.Call(FROM_HERE, |
+ &NonBlockingInvalidator::OnIncomingInvalidation, |
+ invalidation_map); |
+} |
+ |
+std::string NonBlockingInvalidator::Core::GetOwnerName() const { |
+ return "Sync"; |
+} |
+ |
+NonBlockingInvalidator::NonBlockingInvalidator( |
+ NetworkChannelCreator network_channel_creator, |
+ const std::string& invalidator_client_id, |
+ const UnackedInvalidationsMap& saved_invalidations, |
+ const std::string& invalidation_bootstrap_data, |
+ InvalidationStateTracker* invalidation_state_tracker, |
+ const std::string& client_info, |
+ const scoped_refptr<net::URLRequestContextGetter>& request_context_getter) |
+ : invalidation_state_tracker_(invalidation_state_tracker), |
+ parent_task_runner_(base::ThreadTaskRunnerHandle::Get()), |
+ network_task_runner_(request_context_getter->GetNetworkTaskRunner()), |
+ weak_ptr_factory_(this) { |
+ core_ = new Core(MakeWeakHandle(weak_ptr_factory_.GetWeakPtr())); |
+ |
+ InitializeOptions initialize_options( |
+ network_channel_creator, |
+ invalidator_client_id, |
+ saved_invalidations, |
+ invalidation_bootstrap_data, |
+ MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()), |
+ client_info, |
+ request_context_getter); |
+ |
+ if (!network_task_runner_->PostTask( |
+ FROM_HERE, |
+ base::Bind( |
+ &NonBlockingInvalidator::Core::Initialize, |
+ core_.get(), |
+ initialize_options))) { |
+ NOTREACHED(); |
+ } |
+} |
+ |
+NonBlockingInvalidator::~NonBlockingInvalidator() { |
+ DCHECK(parent_task_runner_->BelongsToCurrentThread()); |
+ if (!network_task_runner_->PostTask( |
+ FROM_HERE, |
+ base::Bind(&NonBlockingInvalidator::Core::Teardown, |
+ core_.get()))) { |
+ DVLOG(1) << "Network thread stopped before invalidator is destroyed."; |
+ } |
+} |
+ |
+void NonBlockingInvalidator::RegisterHandler(InvalidationHandler* handler) { |
+ DCHECK(parent_task_runner_->BelongsToCurrentThread()); |
+ registrar_.RegisterHandler(handler); |
+} |
+ |
+void NonBlockingInvalidator::UpdateRegisteredIds(InvalidationHandler* handler, |
+ const ObjectIdSet& ids) { |
+ DCHECK(parent_task_runner_->BelongsToCurrentThread()); |
+ registrar_.UpdateRegisteredIds(handler, ids); |
+ if (!network_task_runner_->PostTask( |
+ FROM_HERE, |
+ base::Bind( |
+ &NonBlockingInvalidator::Core::UpdateRegisteredIds, |
+ core_.get(), |
+ registrar_.GetAllRegisteredIds()))) { |
+ NOTREACHED(); |
+ } |
+} |
+ |
+void NonBlockingInvalidator::UnregisterHandler(InvalidationHandler* handler) { |
+ DCHECK(parent_task_runner_->BelongsToCurrentThread()); |
+ registrar_.UnregisterHandler(handler); |
+} |
+ |
+InvalidatorState NonBlockingInvalidator::GetInvalidatorState() const { |
+ DCHECK(parent_task_runner_->BelongsToCurrentThread()); |
+ return registrar_.GetInvalidatorState(); |
+} |
+ |
+void NonBlockingInvalidator::UpdateCredentials(const std::string& email, |
+ const std::string& token) { |
+ DCHECK(parent_task_runner_->BelongsToCurrentThread()); |
+ if (!network_task_runner_->PostTask( |
+ FROM_HERE, |
+ base::Bind(&NonBlockingInvalidator::Core::UpdateCredentials, |
+ core_.get(), email, token))) { |
+ NOTREACHED(); |
+ } |
+} |
+ |
+void NonBlockingInvalidator::RequestDetailedStatus( |
+ base::Callback<void(const base::DictionaryValue&)> callback) const { |
+ DCHECK(parent_task_runner_->BelongsToCurrentThread()); |
+ base::Callback<void(const base::DictionaryValue&)> proxy_callback = |
+ base::Bind(&CallbackProxy::Run, base::Owned(new CallbackProxy(callback))); |
+ if (!network_task_runner_->PostTask( |
+ FROM_HERE, |
+ base::Bind(&NonBlockingInvalidator::Core::RequestDetailedStatus, |
+ core_.get(), |
+ proxy_callback))) { |
+ NOTREACHED(); |
+ } |
+} |
+ |
+NetworkChannelCreator |
+ NonBlockingInvalidator::MakePushClientChannelCreator( |
+ const notifier::NotifierOptions& notifier_options) { |
+ return base::Bind(SyncNetworkChannel::CreatePushClientChannel, |
+ notifier_options); |
+} |
+ |
+NetworkChannelCreator NonBlockingInvalidator::MakeGCMNetworkChannelCreator( |
+ scoped_refptr<net::URLRequestContextGetter> request_context_getter, |
+ scoped_ptr<GCMNetworkChannelDelegate> delegate) { |
+ return base::Bind(&SyncNetworkChannel::CreateGCMNetworkChannel, |
+ request_context_getter, |
+ base::Passed(&delegate)); |
+} |
+ |
+void NonBlockingInvalidator::ClearAndSetNewClientId(const std::string& data) { |
+ DCHECK(parent_task_runner_->BelongsToCurrentThread()); |
+ invalidation_state_tracker_->ClearAndSetNewClientId(data); |
+} |
+ |
+std::string NonBlockingInvalidator::GetInvalidatorClientId() const { |
+ DCHECK(parent_task_runner_->BelongsToCurrentThread()); |
+ return invalidation_state_tracker_->GetInvalidatorClientId(); |
+} |
+ |
+void NonBlockingInvalidator::SetBootstrapData(const std::string& data) { |
+ DCHECK(parent_task_runner_->BelongsToCurrentThread()); |
+ invalidation_state_tracker_->SetBootstrapData(data); |
+} |
+ |
+std::string NonBlockingInvalidator::GetBootstrapData() const { |
+ DCHECK(parent_task_runner_->BelongsToCurrentThread()); |
+ return invalidation_state_tracker_->GetBootstrapData(); |
+} |
+ |
+void NonBlockingInvalidator::SetSavedInvalidations( |
+ const UnackedInvalidationsMap& states) { |
+ DCHECK(parent_task_runner_->BelongsToCurrentThread()); |
+ invalidation_state_tracker_->SetSavedInvalidations(states); |
+} |
+ |
+UnackedInvalidationsMap NonBlockingInvalidator::GetSavedInvalidations() const { |
+ DCHECK(parent_task_runner_->BelongsToCurrentThread()); |
+ return invalidation_state_tracker_->GetSavedInvalidations(); |
+} |
+ |
+void NonBlockingInvalidator::Clear() { |
+ DCHECK(parent_task_runner_->BelongsToCurrentThread()); |
+ invalidation_state_tracker_->Clear(); |
+} |
+ |
+void NonBlockingInvalidator::OnInvalidatorStateChange(InvalidatorState state) { |
+ DCHECK(parent_task_runner_->BelongsToCurrentThread()); |
+ registrar_.UpdateInvalidatorState(state); |
+} |
+ |
+void NonBlockingInvalidator::OnIncomingInvalidation( |
+ const ObjectIdInvalidationMap& invalidation_map) { |
+ DCHECK(parent_task_runner_->BelongsToCurrentThread()); |
+ registrar_.DispatchInvalidationsToHandlers(invalidation_map); |
+} |
+ |
+std::string NonBlockingInvalidator::GetOwnerName() const { return "Sync"; } |
+ |
+} // namespace syncer |