| Index: sync/notifier/sync_invalidation_listener.cc
|
| diff --git a/sync/notifier/sync_invalidation_listener.cc b/sync/notifier/sync_invalidation_listener.cc
|
| index 4e57bb85ed5a2096072ab572573d4d7cf01e431b..28b37bc07873bf829ad9342ac1cefc2ba75006b4 100644
|
| --- a/sync/notifier/sync_invalidation_listener.cc
|
| +++ b/sync/notifier/sync_invalidation_listener.cc
|
| @@ -4,9 +4,9 @@
|
|
|
| #include "sync/notifier/sync_invalidation_listener.h"
|
|
|
| -#include <string>
|
| #include <vector>
|
|
|
| +#include "base/bind.h"
|
| #include "base/callback.h"
|
| #include "base/compiler_specific.h"
|
| #include "base/logging.h"
|
| @@ -29,8 +29,11 @@ namespace syncer {
|
| SyncInvalidationListener::Delegate::~Delegate() {}
|
|
|
| SyncInvalidationListener::SyncInvalidationListener(
|
| + base::TickClock* tick_clock,
|
| scoped_ptr<notifier::PushClient> push_client)
|
| - : push_client_(push_client.get()),
|
| + : weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
|
| + ack_tracker_(tick_clock, ALLOW_THIS_IN_INITIALIZER_LIST(this)),
|
| + push_client_(push_client.get()),
|
| sync_system_resources_(push_client.Pass(),
|
| ALLOW_THIS_IN_INITIALIZER_LIST(this)),
|
| delegate_(NULL),
|
| @@ -105,6 +108,19 @@ void SyncInvalidationListener::Start(
|
| FROM_HERE,
|
| &InvalidationStateTracker::SetInvalidatorClientId,
|
| client_id);
|
| +
|
| + // Set up reminders for any invalidations that have not been locally
|
| + // acknowledged.
|
| + ObjectIdSet unacknowledged_ids;
|
| + for (InvalidationStateMap::const_iterator it =
|
| + invalidation_state_map_.begin();
|
| + it != invalidation_state_map_.end(); ++it) {
|
| + if (it->second.expected.Equals(it->second.current))
|
| + continue;
|
| + unacknowledged_ids.insert(it->first);
|
| + }
|
| + if (!unacknowledged_ids.empty())
|
| + ack_tracker_.Track(unacknowledged_ids);
|
| }
|
|
|
| void SyncInvalidationListener::UpdateCredentials(
|
| @@ -124,6 +140,27 @@ void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) {
|
| }
|
| }
|
|
|
| +void SyncInvalidationListener::Acknowledge(const invalidation::ObjectId& id,
|
| + const AckHandle& ack_handle) {
|
| + DCHECK(CalledOnValidThread());
|
| + InvalidationStateMap::iterator state_it = invalidation_state_map_.find(id);
|
| + if (state_it == invalidation_state_map_.end())
|
| + return;
|
| + invalidation_state_tracker_.Call(
|
| + FROM_HERE,
|
| + &InvalidationStateTracker::Acknowledge,
|
| + id,
|
| + ack_handle);
|
| + state_it->second.current = ack_handle;
|
| + if (state_it->second.expected.Equals(ack_handle)) {
|
| + // If the received ack matches the expected ack, then we no longer need to
|
| + // keep track of |id| since it is up-to-date.
|
| + ObjectIdSet ids;
|
| + ids.insert(id);
|
| + ack_tracker_.Ack(ids);
|
| + }
|
| +}
|
| +
|
| void SyncInvalidationListener::Ready(
|
| invalidation::InvalidationClient* client) {
|
| DCHECK(CalledOnValidThread());
|
| @@ -167,17 +204,15 @@ void SyncInvalidationListener::Invalidate(
|
| DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id)
|
| << " to " << invalidation.version();
|
| invalidation_state_map_[id].version = invalidation.version();
|
| + invalidation_state_map_[id].payload = payload;
|
| invalidation_state_tracker_.Call(
|
| FROM_HERE,
|
| &InvalidationStateTracker::SetMaxVersionAndPayload,
|
| id, invalidation.version(), payload);
|
|
|
| - ObjectIdInvalidationMap invalidation_map;
|
| - invalidation_map[id].payload = payload;
|
| - EmitInvalidation(invalidation_map);
|
| - // TODO(akalin): We should really acknowledge only after we get the
|
| - // updates from the sync server. (see http://crbug.com/78462).
|
| - client->Acknowledge(ack_handle);
|
| + ObjectIdSet ids;
|
| + ids.insert(id);
|
| + PrepareInvalidation(ids, payload, client, ack_handle);
|
| }
|
|
|
| void SyncInvalidationListener::InvalidateUnknownVersion(
|
| @@ -188,12 +223,9 @@ void SyncInvalidationListener::InvalidateUnknownVersion(
|
| DCHECK_EQ(client, invalidation_client_.get());
|
| DVLOG(1) << "InvalidateUnknownVersion";
|
|
|
| - ObjectIdInvalidationMap invalidation_map;
|
| - invalidation_map[object_id].payload = std::string();
|
| - EmitInvalidation(invalidation_map);
|
| - // TODO(akalin): We should really acknowledge only after we get the
|
| - // updates from the sync server. (see http://crbug.com/78462).
|
| - client->Acknowledge(ack_handle);
|
| + ObjectIdSet ids;
|
| + ids.insert(object_id);
|
| + PrepareInvalidation(ids, std::string(), client, ack_handle);
|
| }
|
|
|
| // This should behave as if we got an invalidation with version
|
| @@ -205,17 +237,60 @@ void SyncInvalidationListener::InvalidateAll(
|
| DCHECK_EQ(client, invalidation_client_.get());
|
| DVLOG(1) << "InvalidateAll";
|
|
|
| - const ObjectIdInvalidationMap& invalidation_map =
|
| - ObjectIdSetToInvalidationMap(registered_ids_, std::string());
|
| - EmitInvalidation(invalidation_map);
|
| - // TODO(akalin): We should really acknowledge only after we get the
|
| - // updates from the sync server. (see http://crbug.com/76482).
|
| - client->Acknowledge(ack_handle);
|
| + PrepareInvalidation(registered_ids_, std::string(), client, ack_handle);
|
| +}
|
| +
|
| +void SyncInvalidationListener::PrepareInvalidation(
|
| + const ObjectIdSet& ids,
|
| + const std::string& payload,
|
| + invalidation::InvalidationClient* client,
|
| + const invalidation::AckHandle& ack_handle) {
|
| + DCHECK(CalledOnValidThread());
|
| +
|
| + // A server invalidation resets the local retry count.
|
| + ack_tracker_.Ack(ids);
|
| + invalidation_state_tracker_.Call(
|
| + FROM_HERE,
|
| + &InvalidationStateTracker::GenerateAckHandles,
|
| + ids,
|
| + base::MessageLoopProxy::current(),
|
| + base::Bind(&SyncInvalidationListener::EmitInvalidation,
|
| + weak_ptr_factory_.GetWeakPtr(),
|
| + ids,
|
| + payload,
|
| + client,
|
| + ack_handle));
|
| }
|
|
|
| void SyncInvalidationListener::EmitInvalidation(
|
| - const ObjectIdInvalidationMap& invalidation_map) {
|
| + const ObjectIdSet& ids,
|
| + const std::string& payload,
|
| + invalidation::InvalidationClient* client,
|
| + const invalidation::AckHandle& ack_handle,
|
| + const AckHandleMap& local_ack_handles) {
|
| DCHECK(CalledOnValidThread());
|
| + ObjectIdInvalidationMap invalidation_map =
|
| + ObjectIdSetToInvalidationMap(ids, payload);
|
| + for (AckHandleMap::const_iterator it = local_ack_handles.begin();
|
| + it != local_ack_handles.end(); ++it) {
|
| + // Update in-memory copy of the invalidation state.
|
| + invalidation_state_map_[it->first].expected = it->second;
|
| + invalidation_map[it->first].ack_handle = it->second;
|
| + }
|
| + ack_tracker_.Track(ids);
|
| + delegate_->OnInvalidate(invalidation_map);
|
| + client->Acknowledge(ack_handle);
|
| +}
|
| +
|
| +void SyncInvalidationListener::OnTimeout(const ObjectIdSet& ids) {
|
| + ObjectIdInvalidationMap invalidation_map;
|
| + for (ObjectIdSet::const_iterator it = ids.begin(); it != ids.end(); ++it) {
|
| + Invalidation invalidation;
|
| + invalidation.ack_handle = invalidation_state_map_[*it].expected;
|
| + invalidation.payload = invalidation_state_map_[*it].payload;
|
| + invalidation_map.insert(std::make_pair(*it, invalidation));
|
| + }
|
| +
|
| delegate_->OnInvalidate(invalidation_map);
|
| }
|
|
|
| @@ -306,6 +381,7 @@ void SyncInvalidationListener::DoRegistrationUpdate() {
|
| }
|
| invalidation_state_tracker_.Call(
|
| FROM_HERE, &InvalidationStateTracker::Forget, unregistered_ids);
|
| + ack_tracker_.Ack(unregistered_ids);
|
| }
|
|
|
| void SyncInvalidationListener::StopForTest() {
|
| @@ -318,12 +394,18 @@ InvalidationStateMap SyncInvalidationListener::GetStateMapForTest() const {
|
| return invalidation_state_map_;
|
| }
|
|
|
| +AckTracker* SyncInvalidationListener::GetAckTrackerForTest() {
|
| + return &ack_tracker_;
|
| +}
|
| +
|
| void SyncInvalidationListener::Stop() {
|
| DCHECK(CalledOnValidThread());
|
| if (!invalidation_client_.get()) {
|
| return;
|
| }
|
|
|
| + ack_tracker_.Clear();
|
| +
|
| registration_manager_.reset();
|
| sync_system_resources_.Stop();
|
| invalidation_client_->Stop();
|
|
|