| Index: sync/notifier/non_blocking_invalidator.cc
|
| diff --git a/sync/notifier/non_blocking_invalidator.cc b/sync/notifier/non_blocking_invalidator.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..df40477d9bd490cebaca0edc0c36091b3a836cc5
|
| --- /dev/null
|
| +++ b/sync/notifier/non_blocking_invalidator.cc
|
| @@ -0,0 +1,367 @@
|
| +// Copyright (c) 2012 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "sync/notifier/non_blocking_invalidator.h"
|
| +
|
| +#include <cstddef>
|
| +
|
| +#include "base/location.h"
|
| +#include "base/logging.h"
|
| +#include "base/memory/scoped_ptr.h"
|
| +#include "base/single_thread_task_runner.h"
|
| +#include "base/thread_task_runner_handle.h"
|
| +#include "base/threading/thread.h"
|
| +#include "jingle/notifier/listener/push_client.h"
|
| +#include "sync/internal_api/public/util/weak_handle.h"
|
| +#include "sync/notifier/gcm_network_channel_delegate.h"
|
| +#include "sync/notifier/invalidation_handler.h"
|
| +#include "sync/notifier/invalidation_notifier.h"
|
| +#include "sync/notifier/object_id_invalidation_map.h"
|
| +#include "sync/notifier/sync_system_resources.h"
|
| +
|
| +namespace syncer {
|
| +
|
| +struct NonBlockingInvalidator::InitializeOptions {
|
| + InitializeOptions(
|
| + NetworkChannelCreator network_channel_creator,
|
| + const std::string& invalidator_client_id,
|
| + const UnackedInvalidationsMap& saved_invalidations,
|
| + const std::string& invalidation_bootstrap_data,
|
| + const WeakHandle<InvalidationStateTracker>&
|
| + invalidation_state_tracker,
|
| + const std::string& client_info,
|
| + scoped_refptr<net::URLRequestContextGetter> request_context_getter)
|
| + : network_channel_creator(network_channel_creator),
|
| + invalidator_client_id(invalidator_client_id),
|
| + saved_invalidations(saved_invalidations),
|
| + invalidation_bootstrap_data(invalidation_bootstrap_data),
|
| + invalidation_state_tracker(invalidation_state_tracker),
|
| + client_info(client_info),
|
| + request_context_getter(request_context_getter) {
|
| + }
|
| +
|
| + NetworkChannelCreator network_channel_creator;
|
| + std::string invalidator_client_id;
|
| + UnackedInvalidationsMap saved_invalidations;
|
| + std::string invalidation_bootstrap_data;
|
| + WeakHandle<InvalidationStateTracker> invalidation_state_tracker;
|
| + std::string client_info;
|
| + scoped_refptr<net::URLRequestContextGetter> request_context_getter;
|
| +};
|
| +
|
| +namespace {
|
| +// This class provides a wrapper for a logging class in order to receive
|
| +// callbacks across threads, without having to worry about owner threads.
|
| +class CallbackProxy {
|
| + public:
|
| + explicit CallbackProxy(
|
| + base::Callback<void(const base::DictionaryValue&)> callback);
|
| + ~CallbackProxy();
|
| +
|
| + void Run(const base::DictionaryValue& value);
|
| +
|
| + private:
|
| + static void DoRun(base::Callback<void(const base::DictionaryValue&)> callback,
|
| + scoped_ptr<base::DictionaryValue> value);
|
| +
|
| + base::Callback<void(const base::DictionaryValue&)> callback_;
|
| + scoped_refptr<base::SingleThreadTaskRunner> running_thread_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(CallbackProxy);
|
| +};
|
| +
|
| +CallbackProxy::CallbackProxy(
|
| + base::Callback<void(const base::DictionaryValue&)> callback)
|
| + : callback_(callback),
|
| + running_thread_(base::ThreadTaskRunnerHandle::Get()) {}
|
| +
|
| +CallbackProxy::~CallbackProxy() {}
|
| +
|
| +void CallbackProxy::DoRun(
|
| + base::Callback<void(const base::DictionaryValue&)> callback,
|
| + scoped_ptr<base::DictionaryValue> value) {
|
| + callback.Run(*value);
|
| +}
|
| +
|
| +void CallbackProxy::Run(const base::DictionaryValue& value) {
|
| + scoped_ptr<base::DictionaryValue> copied(value.DeepCopy());
|
| + running_thread_->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&CallbackProxy::DoRun, callback_, base::Passed(&copied)));
|
| +}
|
| +}
|
| +
|
| +class NonBlockingInvalidator::Core
|
| + : public base::RefCountedThreadSafe<NonBlockingInvalidator::Core>,
|
| + // InvalidationHandler to observe the InvalidationNotifier we create.
|
| + public InvalidationHandler {
|
| + public:
|
| + // Called on parent thread. |delegate_observer| should be initialized.
|
| + explicit Core(
|
| + const WeakHandle<NonBlockingInvalidator>& delegate_observer);
|
| +
|
| + // Helpers called on I/O thread.
|
| + void Initialize(
|
| + const NonBlockingInvalidator::InitializeOptions& initialize_options);
|
| + void Teardown();
|
| + void UpdateRegisteredIds(const ObjectIdSet& ids);
|
| + void UpdateCredentials(const std::string& email, const std::string& token);
|
| + void RequestDetailedStatus(
|
| + base::Callback<void(const base::DictionaryValue&)> callback) const;
|
| +
|
| + // InvalidationHandler implementation (all called on I/O thread by
|
| + // InvalidationNotifier).
|
| + virtual void OnInvalidatorStateChange(InvalidatorState reason) OVERRIDE;
|
| + virtual void OnIncomingInvalidation(
|
| + const ObjectIdInvalidationMap& invalidation_map) OVERRIDE;
|
| + virtual std::string GetOwnerName() const OVERRIDE;
|
| +
|
| + private:
|
| + friend class
|
| + base::RefCountedThreadSafe<NonBlockingInvalidator::Core>;
|
| + // Called on parent or I/O thread.
|
| + virtual ~Core();
|
| +
|
| + // The variables below should be used only on the I/O thread.
|
| + const WeakHandle<NonBlockingInvalidator> delegate_observer_;
|
| + scoped_ptr<InvalidationNotifier> invalidation_notifier_;
|
| + scoped_refptr<base::SingleThreadTaskRunner> network_task_runner_;
|
| +
|
| + DISALLOW_COPY_AND_ASSIGN(Core);
|
| +};
|
| +
|
| +NonBlockingInvalidator::Core::Core(
|
| + const WeakHandle<NonBlockingInvalidator>& delegate_observer)
|
| + : delegate_observer_(delegate_observer) {
|
| + DCHECK(delegate_observer_.IsInitialized());
|
| +}
|
| +
|
| +NonBlockingInvalidator::Core::~Core() {
|
| +}
|
| +
|
| +void NonBlockingInvalidator::Core::Initialize(
|
| + const NonBlockingInvalidator::InitializeOptions& initialize_options) {
|
| + DCHECK(initialize_options.request_context_getter.get());
|
| + network_task_runner_ =
|
| + initialize_options.request_context_getter->GetNetworkTaskRunner();
|
| + DCHECK(network_task_runner_->BelongsToCurrentThread());
|
| + scoped_ptr<SyncNetworkChannel> network_channel =
|
| + initialize_options.network_channel_creator.Run();
|
| + invalidation_notifier_.reset(
|
| + new InvalidationNotifier(
|
| + network_channel.Pass(),
|
| + initialize_options.invalidator_client_id,
|
| + initialize_options.saved_invalidations,
|
| + initialize_options.invalidation_bootstrap_data,
|
| + initialize_options.invalidation_state_tracker,
|
| + initialize_options.client_info));
|
| + invalidation_notifier_->RegisterHandler(this);
|
| +}
|
| +
|
| +void NonBlockingInvalidator::Core::Teardown() {
|
| + DCHECK(network_task_runner_->BelongsToCurrentThread());
|
| + invalidation_notifier_->UnregisterHandler(this);
|
| + invalidation_notifier_.reset();
|
| + network_task_runner_ = NULL;
|
| +}
|
| +
|
| +void NonBlockingInvalidator::Core::UpdateRegisteredIds(const ObjectIdSet& ids) {
|
| + DCHECK(network_task_runner_->BelongsToCurrentThread());
|
| + invalidation_notifier_->UpdateRegisteredIds(this, ids);
|
| +}
|
| +
|
| +void NonBlockingInvalidator::Core::UpdateCredentials(const std::string& email,
|
| + const std::string& token) {
|
| + DCHECK(network_task_runner_->BelongsToCurrentThread());
|
| + invalidation_notifier_->UpdateCredentials(email, token);
|
| +}
|
| +
|
| +void NonBlockingInvalidator::Core::RequestDetailedStatus(
|
| + base::Callback<void(const base::DictionaryValue&)> callback) const {
|
| + DCHECK(network_task_runner_->BelongsToCurrentThread());
|
| + invalidation_notifier_->RequestDetailedStatus(callback);
|
| +}
|
| +
|
| +void NonBlockingInvalidator::Core::OnInvalidatorStateChange(
|
| + InvalidatorState reason) {
|
| + DCHECK(network_task_runner_->BelongsToCurrentThread());
|
| + delegate_observer_.Call(FROM_HERE,
|
| + &NonBlockingInvalidator::OnInvalidatorStateChange,
|
| + reason);
|
| +}
|
| +
|
| +void NonBlockingInvalidator::Core::OnIncomingInvalidation(
|
| + const ObjectIdInvalidationMap& invalidation_map) {
|
| + DCHECK(network_task_runner_->BelongsToCurrentThread());
|
| + delegate_observer_.Call(FROM_HERE,
|
| + &NonBlockingInvalidator::OnIncomingInvalidation,
|
| + invalidation_map);
|
| +}
|
| +
|
| +std::string NonBlockingInvalidator::Core::GetOwnerName() const {
|
| + return "Sync";
|
| +}
|
| +
|
| +NonBlockingInvalidator::NonBlockingInvalidator(
|
| + NetworkChannelCreator network_channel_creator,
|
| + const std::string& invalidator_client_id,
|
| + const UnackedInvalidationsMap& saved_invalidations,
|
| + const std::string& invalidation_bootstrap_data,
|
| + InvalidationStateTracker* invalidation_state_tracker,
|
| + const std::string& client_info,
|
| + const scoped_refptr<net::URLRequestContextGetter>& request_context_getter)
|
| + : invalidation_state_tracker_(invalidation_state_tracker),
|
| + parent_task_runner_(base::ThreadTaskRunnerHandle::Get()),
|
| + network_task_runner_(request_context_getter->GetNetworkTaskRunner()),
|
| + weak_ptr_factory_(this) {
|
| + core_ = new Core(MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()));
|
| +
|
| + InitializeOptions initialize_options(
|
| + network_channel_creator,
|
| + invalidator_client_id,
|
| + saved_invalidations,
|
| + invalidation_bootstrap_data,
|
| + MakeWeakHandle(weak_ptr_factory_.GetWeakPtr()),
|
| + client_info,
|
| + request_context_getter);
|
| +
|
| + if (!network_task_runner_->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(
|
| + &NonBlockingInvalidator::Core::Initialize,
|
| + core_.get(),
|
| + initialize_options))) {
|
| + NOTREACHED();
|
| + }
|
| +}
|
| +
|
| +NonBlockingInvalidator::~NonBlockingInvalidator() {
|
| + DCHECK(parent_task_runner_->BelongsToCurrentThread());
|
| + if (!network_task_runner_->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&NonBlockingInvalidator::Core::Teardown,
|
| + core_.get()))) {
|
| + DVLOG(1) << "Network thread stopped before invalidator is destroyed.";
|
| + }
|
| +}
|
| +
|
| +void NonBlockingInvalidator::RegisterHandler(InvalidationHandler* handler) {
|
| + DCHECK(parent_task_runner_->BelongsToCurrentThread());
|
| + registrar_.RegisterHandler(handler);
|
| +}
|
| +
|
| +void NonBlockingInvalidator::UpdateRegisteredIds(InvalidationHandler* handler,
|
| + const ObjectIdSet& ids) {
|
| + DCHECK(parent_task_runner_->BelongsToCurrentThread());
|
| + registrar_.UpdateRegisteredIds(handler, ids);
|
| + if (!network_task_runner_->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(
|
| + &NonBlockingInvalidator::Core::UpdateRegisteredIds,
|
| + core_.get(),
|
| + registrar_.GetAllRegisteredIds()))) {
|
| + NOTREACHED();
|
| + }
|
| +}
|
| +
|
| +void NonBlockingInvalidator::UnregisterHandler(InvalidationHandler* handler) {
|
| + DCHECK(parent_task_runner_->BelongsToCurrentThread());
|
| + registrar_.UnregisterHandler(handler);
|
| +}
|
| +
|
| +InvalidatorState NonBlockingInvalidator::GetInvalidatorState() const {
|
| + DCHECK(parent_task_runner_->BelongsToCurrentThread());
|
| + return registrar_.GetInvalidatorState();
|
| +}
|
| +
|
| +void NonBlockingInvalidator::UpdateCredentials(const std::string& email,
|
| + const std::string& token) {
|
| + DCHECK(parent_task_runner_->BelongsToCurrentThread());
|
| + if (!network_task_runner_->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&NonBlockingInvalidator::Core::UpdateCredentials,
|
| + core_.get(), email, token))) {
|
| + NOTREACHED();
|
| + }
|
| +}
|
| +
|
| +void NonBlockingInvalidator::RequestDetailedStatus(
|
| + base::Callback<void(const base::DictionaryValue&)> callback) const {
|
| + DCHECK(parent_task_runner_->BelongsToCurrentThread());
|
| + base::Callback<void(const base::DictionaryValue&)> proxy_callback =
|
| + base::Bind(&CallbackProxy::Run, base::Owned(new CallbackProxy(callback)));
|
| + if (!network_task_runner_->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&NonBlockingInvalidator::Core::RequestDetailedStatus,
|
| + core_.get(),
|
| + proxy_callback))) {
|
| + NOTREACHED();
|
| + }
|
| +}
|
| +
|
| +NetworkChannelCreator
|
| + NonBlockingInvalidator::MakePushClientChannelCreator(
|
| + const notifier::NotifierOptions& notifier_options) {
|
| + return base::Bind(SyncNetworkChannel::CreatePushClientChannel,
|
| + notifier_options);
|
| +}
|
| +
|
| +NetworkChannelCreator NonBlockingInvalidator::MakeGCMNetworkChannelCreator(
|
| + scoped_refptr<net::URLRequestContextGetter> request_context_getter,
|
| + scoped_ptr<GCMNetworkChannelDelegate> delegate) {
|
| + return base::Bind(&SyncNetworkChannel::CreateGCMNetworkChannel,
|
| + request_context_getter,
|
| + base::Passed(&delegate));
|
| +}
|
| +
|
| +void NonBlockingInvalidator::ClearAndSetNewClientId(const std::string& data) {
|
| + DCHECK(parent_task_runner_->BelongsToCurrentThread());
|
| + invalidation_state_tracker_->ClearAndSetNewClientId(data);
|
| +}
|
| +
|
| +std::string NonBlockingInvalidator::GetInvalidatorClientId() const {
|
| + DCHECK(parent_task_runner_->BelongsToCurrentThread());
|
| + return invalidation_state_tracker_->GetInvalidatorClientId();
|
| +}
|
| +
|
| +void NonBlockingInvalidator::SetBootstrapData(const std::string& data) {
|
| + DCHECK(parent_task_runner_->BelongsToCurrentThread());
|
| + invalidation_state_tracker_->SetBootstrapData(data);
|
| +}
|
| +
|
| +std::string NonBlockingInvalidator::GetBootstrapData() const {
|
| + DCHECK(parent_task_runner_->BelongsToCurrentThread());
|
| + return invalidation_state_tracker_->GetBootstrapData();
|
| +}
|
| +
|
| +void NonBlockingInvalidator::SetSavedInvalidations(
|
| + const UnackedInvalidationsMap& states) {
|
| + DCHECK(parent_task_runner_->BelongsToCurrentThread());
|
| + invalidation_state_tracker_->SetSavedInvalidations(states);
|
| +}
|
| +
|
| +UnackedInvalidationsMap NonBlockingInvalidator::GetSavedInvalidations() const {
|
| + DCHECK(parent_task_runner_->BelongsToCurrentThread());
|
| + return invalidation_state_tracker_->GetSavedInvalidations();
|
| +}
|
| +
|
| +void NonBlockingInvalidator::Clear() {
|
| + DCHECK(parent_task_runner_->BelongsToCurrentThread());
|
| + invalidation_state_tracker_->Clear();
|
| +}
|
| +
|
| +void NonBlockingInvalidator::OnInvalidatorStateChange(InvalidatorState state) {
|
| + DCHECK(parent_task_runner_->BelongsToCurrentThread());
|
| + registrar_.UpdateInvalidatorState(state);
|
| +}
|
| +
|
| +void NonBlockingInvalidator::OnIncomingInvalidation(
|
| + const ObjectIdInvalidationMap& invalidation_map) {
|
| + DCHECK(parent_task_runner_->BelongsToCurrentThread());
|
| + registrar_.DispatchInvalidationsToHandlers(invalidation_map);
|
| +}
|
| +
|
| +std::string NonBlockingInvalidator::GetOwnerName() const { return "Sync"; }
|
| +
|
| +} // namespace syncer
|
|
|