| Index: sync/notifier/sync_invalidation_listener.cc
|
| diff --git a/sync/notifier/sync_invalidation_listener.cc b/sync/notifier/sync_invalidation_listener.cc
|
| index 9e8b2ad130dafd48e09cc7afe16d48a81354468f..e221d5f31e1df3c90b9c0a700c2b3b131947654a 100644
|
| --- a/sync/notifier/sync_invalidation_listener.cc
|
| +++ b/sync/notifier/sync_invalidation_listener.cc
|
| @@ -4,12 +4,13 @@
|
|
|
| #include "sync/notifier/sync_invalidation_listener.h"
|
|
|
| -#include <string>
|
| #include <vector>
|
|
|
| +#include "base/bind.h"
|
| #include "base/callback.h"
|
| #include "base/compiler_specific.h"
|
| #include "base/logging.h"
|
| +#include "base/stl_util.h"
|
| #include "base/tracked_objects.h"
|
| #include "google/cacheinvalidation/include/invalidation-client.h"
|
| #include "google/cacheinvalidation/include/types.h"
|
| @@ -21,6 +22,14 @@
|
| namespace {
|
|
|
| const char kApplicationName[] = "chrome-sync";
|
| +const int kMaxDelayInSeconds = 600;
|
| +
|
| +// TODO(dcheng): This is not testable yet.
|
| +base::TimeDelta CalculateBackoffDelay(int retry_count) {
|
| + // TODO(dcheng): This is not overflow safe.
|
| + int delay = std::min(kMaxDelayInSeconds, (1 << retry_count) * 60);
|
| + return base::TimeDelta::FromSeconds(delay);
|
| +}
|
|
|
| } // namespace
|
|
|
| @@ -30,7 +39,8 @@ SyncInvalidationListener::Delegate::~Delegate() {}
|
|
|
| SyncInvalidationListener::SyncInvalidationListener(
|
| scoped_ptr<notifier::PushClient> push_client)
|
| - : push_client_(push_client.get()),
|
| + : weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
|
| + push_client_(push_client.get()),
|
| sync_system_resources_(push_client.Pass(),
|
| ALLOW_THIS_IN_INITIALIZER_LIST(this)),
|
| delegate_(NULL),
|
| @@ -52,7 +62,7 @@ void SyncInvalidationListener::Start(
|
| create_invalidation_client_callback,
|
| const std::string& client_id, const std::string& client_info,
|
| const std::string& state,
|
| - const InvalidationVersionMap& initial_max_invalidation_versions,
|
| + const InvalidationStateMap& initial_invalidation_state_map,
|
| const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker,
|
| Delegate* delegate) {
|
| DCHECK(CalledOnValidThread());
|
| @@ -66,16 +76,16 @@ void SyncInvalidationListener::Start(
|
| // update the in-memory cache, while reads just return the cached state.
|
| sync_system_resources_.storage()->SetInitialState(state);
|
|
|
| - max_invalidation_versions_ = initial_max_invalidation_versions;
|
| - if (max_invalidation_versions_.empty()) {
|
| + invalidation_state_map_ = initial_invalidation_state_map;
|
| + if (invalidation_state_map_.empty()) {
|
| DVLOG(2) << "No initial max invalidation versions for any id";
|
| } else {
|
| - for (InvalidationVersionMap::const_iterator it =
|
| - max_invalidation_versions_.begin();
|
| - it != max_invalidation_versions_.end(); ++it) {
|
| + for (InvalidationStateMap::const_iterator it =
|
| + invalidation_state_map_.begin();
|
| + it != invalidation_state_map_.end(); ++it) {
|
| DVLOG(2) << "Initial max invalidation version for "
|
| << ObjectIdToString(it->first) << " is "
|
| - << it->second;
|
| + << it->second.version;
|
| }
|
| }
|
| invalidation_state_tracker_ = invalidation_state_tracker;
|
| @@ -113,6 +123,31 @@ void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) {
|
| }
|
| }
|
|
|
| +void SyncInvalidationListener::Acknowledge(const invalidation::ObjectId& id,
|
| + const AckHandle& ack_handle) {
|
| + DCHECK(CalledOnValidThread());
|
| + InvalidationStateMap::iterator state_it = invalidation_state_map_.find(id);
|
| + if (state_it == invalidation_state_map_.end())
|
| + return;
|
| + state_it->second.current = ack_handle;
|
| + if (state_it->second.expected.Equals(ack_handle)) {
|
| + // If the received ack matches the expected ack, then we no longer need to
|
| + // keep track of |id| since it is up-to-date.
|
| + for (TimerQueue::iterator it = timer_queue_.begin();
|
| + it != timer_queue_.end(); ++it) {
|
| + if (it->second.id == id) {
|
| + timer_queue_.erase(it);
|
| + if (timer_queue_.empty()) {
|
| + timer_.Stop();
|
| + }
|
| + return;
|
| + }
|
| + }
|
| + // This shouldn't happen unless someone is acking multiple times.
|
| + NOTREACHED();
|
| + }
|
| +}
|
| +
|
| void SyncInvalidationListener::Ready(
|
| invalidation::InvalidationClient* client) {
|
| DCHECK(CalledOnValidThread());
|
| @@ -140,17 +175,16 @@ void SyncInvalidationListener::Invalidate(
|
| // should drop invalidations for unregistered ids. We may also
|
| // have to filter it at a higher level, as invalidations for
|
| // newly-unregistered ids may already be in flight.
|
| - InvalidationVersionMap::const_iterator it =
|
| - max_invalidation_versions_.find(id);
|
| - if ((it != max_invalidation_versions_.end()) &&
|
| - (invalidation.version() <= it->second)) {
|
| + InvalidationStateMap::const_iterator it = invalidation_state_map_.find(id);
|
| + if ((it != invalidation_state_map_.end()) &&
|
| + (invalidation.version() <= it->second.version)) {
|
| // Drop redundant invalidations.
|
| client->Acknowledge(ack_handle);
|
| return;
|
| }
|
| DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id)
|
| << " to " << invalidation.version();
|
| - max_invalidation_versions_[id] = invalidation.version();
|
| + invalidation_state_map_[id].version = invalidation.version();
|
| invalidation_state_tracker_.Call(
|
| FROM_HERE,
|
| &InvalidationStateTracker::SetMaxVersion,
|
| @@ -161,12 +195,9 @@ void SyncInvalidationListener::Invalidate(
|
| if (invalidation.has_payload())
|
| payload = invalidation.payload();
|
|
|
| - ObjectIdStateMap id_state_map;
|
| - id_state_map[id].payload = payload;
|
| - EmitInvalidation(id_state_map);
|
| - // TODO(akalin): We should really acknowledge only after we get the
|
| - // updates from the sync server. (see http://crbug.com/78462).
|
| - client->Acknowledge(ack_handle);
|
| + ObjectIdSet ids;
|
| + ids.insert(id);
|
| + PrepareInvalidation(ids, payload, client, ack_handle);
|
| }
|
|
|
| void SyncInvalidationListener::InvalidateUnknownVersion(
|
| @@ -177,12 +208,9 @@ void SyncInvalidationListener::InvalidateUnknownVersion(
|
| DCHECK_EQ(client, invalidation_client_.get());
|
| DVLOG(1) << "InvalidateUnknownVersion";
|
|
|
| - ObjectIdStateMap id_state_map;
|
| - id_state_map[object_id].payload = std::string();
|
| - EmitInvalidation(id_state_map);
|
| - // TODO(akalin): We should really acknowledge only after we get the
|
| - // updates from the sync server. (see http://crbug.com/78462).
|
| - client->Acknowledge(ack_handle);
|
| + ObjectIdSet ids;
|
| + ids.insert(object_id);
|
| + PrepareInvalidation(ids, std::string(), client, ack_handle);
|
| }
|
|
|
| // This should behave as if we got an invalidation with version
|
| @@ -194,21 +222,91 @@ void SyncInvalidationListener::InvalidateAll(
|
| DCHECK_EQ(client, invalidation_client_.get());
|
| DVLOG(1) << "InvalidateAll";
|
|
|
| - ObjectIdStateMap id_state_map;
|
| - for (ObjectIdSet::const_iterator it = registered_ids_.begin();
|
| - it != registered_ids_.end(); ++it) {
|
| - id_state_map[*it].payload = std::string();
|
| - }
|
| - EmitInvalidation(id_state_map);
|
| - // TODO(akalin): We should really acknowledge only after we get the
|
| - // updates from the sync server. (see http://crbug.com/76482).
|
| - client->Acknowledge(ack_handle);
|
| + PrepareInvalidation(registered_ids_, std::string(), client, ack_handle);
|
| +}
|
| +
|
| +void SyncInvalidationListener::PrepareInvalidation(
|
| + const ObjectIdSet& ids,
|
| + const std::string& payload,
|
| + invalidation::InvalidationClient* client,
|
| + const invalidation::AckHandle& ack_handle) {
|
| + DCHECK(CalledOnValidThread());
|
| +
|
| + scoped_ptr<AckHandleMap> local_ack_handles(new AckHandleMap);
|
| + invalidation_state_tracker_.CallWithReply(
|
| + FROM_HERE,
|
| + &InvalidationStateTracker::GenerateAckHandles,
|
| + ids,
|
| + local_ack_handles.get(),
|
| + base::Bind(&SyncInvalidationListener::EmitInvalidation,
|
| + weak_ptr_factory_.GetWeakPtr(),
|
| + ids,
|
| + payload,
|
| + base::Owned(local_ack_handles.release()),
|
| + client,
|
| + ack_handle));
|
| }
|
|
|
| void SyncInvalidationListener::EmitInvalidation(
|
| - const ObjectIdStateMap& id_state_map) {
|
| + const ObjectIdSet& ids,
|
| + const std::string& payload,
|
| + const AckHandleMap* const local_ack_handles,
|
| + invalidation::InvalidationClient* client,
|
| + const invalidation::AckHandle& ack_handle) {
|
| DCHECK(CalledOnValidThread());
|
| + ObjectIdStateMap id_state_map = ObjectIdSetToStateMap(ids, payload);
|
| + // Erase any timer queue entries that correspond with an id in |ids|; a new
|
| + // invalidation will reset the retry count.
|
| + for (TimerQueue::iterator it = timer_queue_.begin();
|
| + it != timer_queue_.end(); ) {
|
| + // We could be more clever here and reduce the complexity, since both
|
| + // containers are already sorted. Since we don't expect timer_queue_ to be
|
| + // very long in practice though, we just do things the simpler way.
|
| + if (ContainsKey(ids, it->second.id)) {
|
| + TimerQueue::iterator erase_it = it;
|
| + ++it;
|
| + timer_queue_.erase(erase_it);
|
| + } else {
|
| + ++it;
|
| + }
|
| + }
|
| + const base::TimeTicks expiration_time =
|
| + base::TimeTicks::Now() + CalculateBackoffDelay(0 /* retry_count */);
|
| + for (AckHandleMap::const_iterator it = local_ack_handles->begin();
|
| + it != local_ack_handles->end(); ++it) {
|
| + // Update in-memory copy of the invalidation state.
|
| + // TODO(dcheng): Maybe add some debug checks that the in-memory copy here is
|
| + // in sync with the copy in IST?
|
| + invalidation_state_map_[it->first].expected = it->second;
|
| + id_state_map[it->first].ack_handle = it->second;
|
| + timer_queue_.insert(std::make_pair(
|
| + expiration_time, QueueEntry(it->first, payload, 0 /* retry_count */)));
|
| + }
|
| + DCHECK(!timer_queue_.empty());
|
| + timer_.Start(FROM_HERE, base::TimeDelta::FromSeconds(1) /* should not be 1 */,
|
| + this, &SyncInvalidationListener::RemitInvalidation);
|
| delegate_->OnInvalidate(id_state_map);
|
| + client->Acknowledge(ack_handle);
|
| +}
|
| +
|
| +void SyncInvalidationListener::RemitInvalidation() {
|
| + ObjectIdStateMap id_state_map;
|
| + const base::TimeTicks now = base::TimeTicks::Now();
|
| + for (TimerQueue::iterator it = timer_queue_.begin();
|
| + it != timer_queue_.end() && it->first <= now; ) {
|
| + TimerQueue::iterator insert_it = timer_queue_.insert(std::make_pair(
|
| + now + CalculateBackoffDelay(it->second.retry_count), it->second));
|
| + insert_it->second.retry_count++;
|
| + InvalidationState state;
|
| + state.ack_handle = invalidation_state_map_[it->second.id].expected;
|
| + state.payload = it->second.payload;
|
| + id_state_map.insert(std::make_pair(it->second.id, state));
|
| + TimerQueue::iterator erase_it = it;
|
| + ++it;
|
| + timer_queue_.erase(erase_it);
|
| + }
|
| + delegate_->OnInvalidate(id_state_map);
|
| +
|
| }
|
|
|
| void SyncInvalidationListener::InformRegistrationStatus(
|
| @@ -294,6 +392,10 @@ void SyncInvalidationListener::DoRegistrationUpdate() {
|
| registration_manager_->UpdateRegisteredIds(registered_ids_);
|
| invalidation_state_tracker_.Call(
|
| FROM_HERE, &InvalidationStateTracker::Forget, unregistered_ids);
|
| + for (ObjectIdSet::const_iterator it = unregistered_ids.begin();
|
| + it != unregistered_ids.end(); ++it) {
|
| + invalidation_state_map_.erase(*it);
|
| + }
|
| }
|
|
|
| void SyncInvalidationListener::StopForTest() {
|
| @@ -315,7 +417,7 @@ void SyncInvalidationListener::Stop() {
|
| delegate_ = NULL;
|
|
|
| invalidation_state_tracker_.Reset();
|
| - max_invalidation_versions_.clear();
|
| + invalidation_state_map_.clear();
|
| ticl_state_ = DEFAULT_INVALIDATION_ERROR;
|
| push_client_state_ = DEFAULT_INVALIDATION_ERROR;
|
| }
|
|
|