Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(356)

Unified Diff: sync/notifier/non_blocking_invalidator.cc

Issue 308413002: Revert of Move some sync/notifier to components/invalidation (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 6 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sync/notifier/non_blocking_invalidator.h ('k') | sync/notifier/non_blocking_invalidator_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sync/notifier/non_blocking_invalidator.cc
diff --git a/sync/notifier/non_blocking_invalidator.cc b/sync/notifier/non_blocking_invalidator.cc
new file mode 100644
index 0000000000000000000000000000000000000000..df40477d9bd490cebaca0edc0c36091b3a836cc5
--- /dev/null
+++ b/sync/notifier/non_blocking_invalidator.cc
@@ -0,0 +1,367 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "sync/notifier/non_blocking_invalidator.h"
+
+#include <cstddef>
+
+#include "base/location.h"
+#include "base/logging.h"
+#include "base/memory/scoped_ptr.h"
+#include "base/single_thread_task_runner.h"
+#include "base/thread_task_runner_handle.h"
+#include "base/threading/thread.h"
+#include "jingle/notifier/listener/push_client.h"
+#include "sync/internal_api/public/util/weak_handle.h"
+#include "sync/notifier/gcm_network_channel_delegate.h"
+#include "sync/notifier/invalidation_handler.h"
+#include "sync/notifier/invalidation_notifier.h"
+#include "sync/notifier/object_id_invalidation_map.h"
+#include "sync/notifier/sync_system_resources.h"
+
+namespace syncer {
+
+struct NonBlockingInvalidator::InitializeOptions {
+ InitializeOptions(
+ NetworkChannelCreator network_channel_creator,
+ const std::string& invalidator_client_id,
+ const UnackedInvalidationsMap& saved_invalidations,
+ const std::string& invalidation_bootstrap_data,
+ const WeakHandle<InvalidationStateTracker>&
+ invalidation_state_tracker,
+ const std::string& client_info,
+ scoped_refptr<net::URLRequestContextGetter> request_context_getter)
+ : network_channel_creator(network_channel_creator),
+ invalidator_client_id(invalidator_client_id),
+ saved_invalidations(saved_invalidations),
+ invalidation_bootstrap_data(invalidation_bootstrap_data),
+ invalidation_state_tracker(invalidation_state_tracker),
+ client_info(client_info),
+ request_context_getter(request_context_getter) {
+ }
+
+ NetworkChannelCreator network_channel_creator;
+ std::string invalidator_client_id;
+ UnackedInvalidationsMap saved_invalidations;
+ std::string invalidation_bootstrap_data;
+ WeakHandle<InvalidationStateTracker> invalidation_state_tracker;
+ std::string client_info;
+ scoped_refptr<net::URLRequestContextGetter> request_context_getter;
+};
+
+namespace {
+// This class provides a wrapper for a logging class in order to receive
+// callbacks across threads, without having to worry about owner threads.
+class CallbackProxy {
+ public:
+ explicit CallbackProxy(
+ base::Callback<void(const base::DictionaryValue&)> callback);
+ ~CallbackProxy();
+
+ void Run(const base::DictionaryValue& value);
+
+ private:
+ static void DoRun(base::Callback<void(const base::DictionaryValue&)> callback,
+ scoped_ptr<base::DictionaryValue> value);
+
+ base::Callback<void(const base::DictionaryValue&)> callback_;
+ scoped_refptr<base::SingleThreadTaskRunner> running_thread_;
+
+ DISALLOW_COPY_AND_ASSIGN(CallbackProxy);
+};
+
+CallbackProxy::CallbackProxy(
+ base::Callback<void(const base::DictionaryValue&)> callback)
+ : callback_(callback),
+ running_thread_(base::ThreadTaskRunnerHandle::Get()) {}
+
+CallbackProxy::~CallbackProxy() {}
+
+void CallbackProxy::DoRun(
+ base::Callback<void(const base::DictionaryValue&)> callback,
+ scoped_ptr<base::DictionaryValue> value) {
+ callback.Run(*value);
+}
+
+void CallbackProxy::Run(const base::DictionaryValue& value) {
+ scoped_ptr<base::DictionaryValue> copied(value.DeepCopy());
+ running_thread_->PostTask(
+ FROM_HERE,
+ base::Bind(&CallbackProxy::DoRun, callback_, base::Passed(&copied)));
+}
+}
+
+class NonBlockingInvalidator::Core
+ : public base::RefCountedThreadSafe<NonBlockingInvalidator::Core>,
+ // InvalidationHandler to observe the InvalidationNotifier we create.
+ public InvalidationHandler {
+ public:
+ // Called on parent thread. |delegate_observer| should be initialized.
+ explicit Core(
+ const WeakHandle<NonBlockingInvalidator>& delegate_observer);
+
+ // Helpers called on I/O thread.
+ void Initialize(
+ const NonBlockingInvalidator::InitializeOptions& initialize_options);
+ void Teardown();
+ void UpdateRegisteredIds(const ObjectIdSet& ids);
+ void UpdateCredentials(const std::string& email, const std::string& token);
+ void RequestDetailedStatus(
+ base::Callback<void(const base::DictionaryValue&)> callback) const;
+
+ // InvalidationHandler implementation (all called on I/O thread by
+ // InvalidationNotifier).
+ virtual void OnInvalidatorStateChange(InvalidatorState reason) OVERRIDE;
+ virtual void OnIncomingInvalidation(
+ const ObjectIdInvalidationMap& invalidation_map) OVERRIDE;
+ virtual std::string GetOwnerName() const OVERRIDE;
+
+ private:
+ friend class
+ base::RefCountedThreadSafe<NonBlockingInvalidator::Core>;
+ // Called on parent or I/O thread.
+ virtual ~Core();
+
+ // The variables below should be used only on the I/O thread.
+ const WeakHandle<NonBlockingInvalidator> delegate_observer_;
+ scoped_ptr<InvalidationNotifier> invalidation_notifier_;
+ scoped_refptr<base::SingleThreadTaskRunner> network_task_runner_;
+
+ DISALLOW_COPY_AND_ASSIGN(Core);
+};
+
+NonBlockingInvalidator::Core::Core(
+ const WeakHandle<NonBlockingInvalidator>& delegate_observer)
+ : delegate_observer_(delegate_observer) {
+ DCHECK(delegate_observer_.IsInitialized());
+}
+
+NonBlockingInvalidator::Core::~Core() {
+}
+
+void NonBlockingInvalidator::Core::Initialize(
+ const NonBlockingInvalidator::InitializeOptions& initialize_options) {
+ DCHECK(initialize_options.request_context_getter.get());
+ network_task_runner_ =
+ initialize_options.request_context_getter->GetNetworkTaskRunner();
+ DCHECK(network_task_runner_->BelongsToCurrentThread());
+ scoped_ptr<SyncNetworkChannel> network_channel =
+ initialize_options.network_channel_creator.Run();
+ invalidation_notifier_.reset(
+ new InvalidationNotifier(
+ network_channel.Pass(),
+ initialize_options.invalidator_client_id,
+ initialize_options.saved_invalidations,
+ initialize_options.invalidation_bootstrap_data,
+ initialize_options.invalidation_state_tracker,
+ initialize_options.client_info));
+ invalidation_notifier_->RegisterHandler(this);
+}
+
+void NonBlockingInvalidator::Core::Teardown() {
+ DCHECK(network_task_runner_->BelongsToCurrentThread());
+ invalidation_notifier_->UnregisterHandler(this);
+ invalidation_notifier_.reset();
+ network_task_runner_ = NULL;
+}
+
+void NonBlockingInvalidator::Core::UpdateRegisteredIds(const ObjectIdSet& ids) {
+ DCHECK(network_task_runner_->BelongsToCurrentThread());
+ invalidation_notifier_->UpdateRegisteredIds(this, ids);
+}
+
+void NonBlockingInvalidator::Core::UpdateCredentials(const std::string& email,
+ const std::string& token) {
+ DCHECK(network_task_runner_->BelongsToCurrentThread());
+ invalidation_notifier_->UpdateCredentials(email, token);
+}
+
+void NonBlockingInvalidator::Core::RequestDetailedStatus(
+ base::Callback<void(const base::DictionaryValue&)> callback) const {
+ DCHECK(network_task_runner_->BelongsToCurrentThread());
+ invalidation_notifier_->RequestDetailedStatus(callback);
+}
+
+void NonBlockingInvalidator::Core::OnInvalidatorStateChange(
+ InvalidatorState reason) {
+ DCHECK(network_task_runner_->BelongsToCurrentThread());
+ delegate_observer_.Call(FROM_HERE,
+ &NonBlockingInvalidator::OnInvalidatorStateChange,
+ reason);
+}
+
+void NonBlockingInvalidator::Core::OnIncomingInvalidation(
+ const ObjectIdInvalidationMap& invalidation_map) {
+ DCHECK(network_task_runner_->BelongsToCurrentThread());
+ delegate_observer_.Call(FROM_HERE,
+ &NonBlockingInvalidator::OnIncomingInvalidation,
+ invalidation_map);
+}
+
+std::string NonBlockingInvalidator::Core::GetOwnerName() const {
+ return "Sync";
+}
+
+NonBlockingInvalidator::NonBlockingInvalidator(
+ NetworkChannelCreator network_channel_creator,
+ const std::string& invalidator_client_id,
+ const UnackedInvalidationsMap& saved_invalidations,
+ const std::string& invalidation_bootstrap_data,
+ InvalidationStateTracker* invalidation_state_tracker,
+ const std::string& client_info,
+ const scoped_refptr<net::URLRequestContextGetter>& request_context_getter)
+ : invalidation_state_tracker_(invalidation_state_tracker),
+ parent_task_runner_(base::ThreadTaskRunnerHandle::Get()),
+ network_task_runner_(request_context_getter->GetNetworkTaskRunner()),
+ weak_ptr_factory_(this) {
+ core_ = new Core(MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()));
+
+ InitializeOptions initialize_options(
+ network_channel_creator,
+ invalidator_client_id,
+ saved_invalidations,
+ invalidation_bootstrap_data,
+ MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()),
+ client_info,
+ request_context_getter);
+
+ if (!network_task_runner_->PostTask(
+ FROM_HERE,
+ base::Bind(
+ &NonBlockingInvalidator::Core::Initialize,
+ core_.get(),
+ initialize_options))) {
+ NOTREACHED();
+ }
+}
+
+NonBlockingInvalidator::~NonBlockingInvalidator() {
+ DCHECK(parent_task_runner_->BelongsToCurrentThread());
+ if (!network_task_runner_->PostTask(
+ FROM_HERE,
+ base::Bind(&NonBlockingInvalidator::Core::Teardown,
+ core_.get()))) {
+ DVLOG(1) << "Network thread stopped before invalidator is destroyed.";
+ }
+}
+
+void NonBlockingInvalidator::RegisterHandler(InvalidationHandler* handler) {
+ DCHECK(parent_task_runner_->BelongsToCurrentThread());
+ registrar_.RegisterHandler(handler);
+}
+
+void NonBlockingInvalidator::UpdateRegisteredIds(InvalidationHandler* handler,
+ const ObjectIdSet& ids) {
+ DCHECK(parent_task_runner_->BelongsToCurrentThread());
+ registrar_.UpdateRegisteredIds(handler, ids);
+ if (!network_task_runner_->PostTask(
+ FROM_HERE,
+ base::Bind(
+ &NonBlockingInvalidator::Core::UpdateRegisteredIds,
+ core_.get(),
+ registrar_.GetAllRegisteredIds()))) {
+ NOTREACHED();
+ }
+}
+
+void NonBlockingInvalidator::UnregisterHandler(InvalidationHandler* handler) {
+ DCHECK(parent_task_runner_->BelongsToCurrentThread());
+ registrar_.UnregisterHandler(handler);
+}
+
+InvalidatorState NonBlockingInvalidator::GetInvalidatorState() const {
+ DCHECK(parent_task_runner_->BelongsToCurrentThread());
+ return registrar_.GetInvalidatorState();
+}
+
+void NonBlockingInvalidator::UpdateCredentials(const std::string& email,
+ const std::string& token) {
+ DCHECK(parent_task_runner_->BelongsToCurrentThread());
+ if (!network_task_runner_->PostTask(
+ FROM_HERE,
+ base::Bind(&NonBlockingInvalidator::Core::UpdateCredentials,
+ core_.get(), email, token))) {
+ NOTREACHED();
+ }
+}
+
+void NonBlockingInvalidator::RequestDetailedStatus(
+ base::Callback<void(const base::DictionaryValue&)> callback) const {
+ DCHECK(parent_task_runner_->BelongsToCurrentThread());
+ base::Callback<void(const base::DictionaryValue&)> proxy_callback =
+ base::Bind(&CallbackProxy::Run, base::Owned(new CallbackProxy(callback)));
+ if (!network_task_runner_->PostTask(
+ FROM_HERE,
+ base::Bind(&NonBlockingInvalidator::Core::RequestDetailedStatus,
+ core_.get(),
+ proxy_callback))) {
+ NOTREACHED();
+ }
+}
+
+NetworkChannelCreator
+ NonBlockingInvalidator::MakePushClientChannelCreator(
+ const notifier::NotifierOptions& notifier_options) {
+ return base::Bind(SyncNetworkChannel::CreatePushClientChannel,
+ notifier_options);
+}
+
+NetworkChannelCreator NonBlockingInvalidator::MakeGCMNetworkChannelCreator(
+ scoped_refptr<net::URLRequestContextGetter> request_context_getter,
+ scoped_ptr<GCMNetworkChannelDelegate> delegate) {
+ return base::Bind(&SyncNetworkChannel::CreateGCMNetworkChannel,
+ request_context_getter,
+ base::Passed(&delegate));
+}
+
+void NonBlockingInvalidator::ClearAndSetNewClientId(const std::string& data) {
+ DCHECK(parent_task_runner_->BelongsToCurrentThread());
+ invalidation_state_tracker_->ClearAndSetNewClientId(data);
+}
+
+std::string NonBlockingInvalidator::GetInvalidatorClientId() const {
+ DCHECK(parent_task_runner_->BelongsToCurrentThread());
+ return invalidation_state_tracker_->GetInvalidatorClientId();
+}
+
+void NonBlockingInvalidator::SetBootstrapData(const std::string& data) {
+ DCHECK(parent_task_runner_->BelongsToCurrentThread());
+ invalidation_state_tracker_->SetBootstrapData(data);
+}
+
+std::string NonBlockingInvalidator::GetBootstrapData() const {
+ DCHECK(parent_task_runner_->BelongsToCurrentThread());
+ return invalidation_state_tracker_->GetBootstrapData();
+}
+
+void NonBlockingInvalidator::SetSavedInvalidations(
+ const UnackedInvalidationsMap& states) {
+ DCHECK(parent_task_runner_->BelongsToCurrentThread());
+ invalidation_state_tracker_->SetSavedInvalidations(states);
+}
+
+UnackedInvalidationsMap NonBlockingInvalidator::GetSavedInvalidations() const {
+ DCHECK(parent_task_runner_->BelongsToCurrentThread());
+ return invalidation_state_tracker_->GetSavedInvalidations();
+}
+
+void NonBlockingInvalidator::Clear() {
+ DCHECK(parent_task_runner_->BelongsToCurrentThread());
+ invalidation_state_tracker_->Clear();
+}
+
+void NonBlockingInvalidator::OnInvalidatorStateChange(InvalidatorState state) {
+ DCHECK(parent_task_runner_->BelongsToCurrentThread());
+ registrar_.UpdateInvalidatorState(state);
+}
+
+void NonBlockingInvalidator::OnIncomingInvalidation(
+ const ObjectIdInvalidationMap& invalidation_map) {
+ DCHECK(parent_task_runner_->BelongsToCurrentThread());
+ registrar_.DispatchInvalidationsToHandlers(invalidation_map);
+}
+
+std::string NonBlockingInvalidator::GetOwnerName() const { return "Sync"; }
+
+} // namespace syncer
« no previous file with comments | « sync/notifier/non_blocking_invalidator.h ('k') | sync/notifier/non_blocking_invalidator_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698