Chromium Code Reviews| Index: sync/notifier/sync_invalidation_listener.cc |
| diff --git a/sync/notifier/sync_invalidation_listener.cc b/sync/notifier/sync_invalidation_listener.cc |
| index ab70527f610096d47db0283c3acdaa90bbb22bd8..ae5acc9141582a70ac3118876f66b9f295516108 100644 |
| --- a/sync/notifier/sync_invalidation_listener.cc |
| +++ b/sync/notifier/sync_invalidation_listener.cc |
| @@ -4,12 +4,13 @@ |
| #include "sync/notifier/sync_invalidation_listener.h" |
| -#include <string> |
| #include <vector> |
| +#include "base/bind.h" |
| #include "base/callback.h" |
| #include "base/compiler_specific.h" |
| #include "base/logging.h" |
| +#include "base/stl_util.h" |
| #include "base/tracked_objects.h" |
| #include "google/cacheinvalidation/include/invalidation-client.h" |
| #include "google/cacheinvalidation/include/types.h" |
| @@ -21,6 +22,15 @@ |
| namespace { |
| const char kApplicationName[] = "chrome-sync"; |
| +const int kMaxDelayInSeconds = 600; |
| + |
| +base::TimeDelta CalculateBackoffDelay(int retry_count) { |
|
akalin
2012/10/19 13:27:16
almost certain there's already an exp. backoff cla
Jói
2012/10/19 13:29:49
net/base/backoff_entry.h
dcheng
2012/10/19 19:38:11
I like this much better and will update the code a
|
| + int delay = kMaxDelayInSeconds; |
| + // Lazy way to prevent overflow. |
| + if (retry_count < 10) |
| + delay = std::min(delay, (1 << retry_count) * 60); |
| + return base::TimeDelta::FromSeconds(delay); |
| +} |
| } // namespace |
| @@ -30,7 +40,8 @@ SyncInvalidationListener::Delegate::~Delegate() {} |
| SyncInvalidationListener::SyncInvalidationListener( |
| scoped_ptr<notifier::PushClient> push_client) |
| - : push_client_(push_client.get()), |
| + : weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), |
| + push_client_(push_client.get()), |
| sync_system_resources_(push_client.Pass(), |
| ALLOW_THIS_IN_INITIALIZER_LIST(this)), |
| delegate_(NULL), |
| @@ -52,7 +63,7 @@ void SyncInvalidationListener::Start( |
| create_invalidation_client_callback, |
| const std::string& client_id, const std::string& client_info, |
| const std::string& invalidation_bootstrap_data, |
| - const InvalidationVersionMap& initial_max_invalidation_versions, |
| + const InvalidationStateMap& initial_invalidation_state_map, |
| const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker, |
| Delegate* delegate) { |
| DCHECK(CalledOnValidThread()); |
| @@ -67,16 +78,17 @@ void SyncInvalidationListener::Start( |
| sync_system_resources_.storage()->SetInitialState( |
| invalidation_bootstrap_data); |
| - max_invalidation_versions_ = initial_max_invalidation_versions; |
| - if (max_invalidation_versions_.empty()) { |
| + invalidation_state_map_ = initial_invalidation_state_map; |
| + if (invalidation_state_map_.empty()) { |
| DVLOG(2) << "No initial max invalidation versions for any id"; |
| } else { |
| - for (InvalidationVersionMap::const_iterator it = |
| - max_invalidation_versions_.begin(); |
| - it != max_invalidation_versions_.end(); ++it) { |
| + // Start the reminder timer if we have unacknowledged local invalidations. |
| + for (InvalidationStateMap::const_iterator it = |
| + invalidation_state_map_.begin(); |
| + it != invalidation_state_map_.end(); ++it) { |
| DVLOG(2) << "Initial max invalidation version for " |
| << ObjectIdToString(it->first) << " is " |
| - << it->second; |
| + << it->second.version; |
| } |
| } |
| invalidation_state_tracker_ = invalidation_state_tracker; |
| @@ -95,6 +107,22 @@ void SyncInvalidationListener::Start( |
| registration_manager_.reset( |
| new RegistrationManager(invalidation_client_.get())); |
| + |
| + // Start the reminder timer if we have unacknowledged local invalidations. |
| + const base::TimeTicks now = base::TimeTicks::Now(); |
| + const base::TimeTicks expiration_time = now + CalculateBackoffDelay(0); |
| + for (InvalidationStateMap::const_iterator it = |
| + invalidation_state_map_.begin(); |
| + it != invalidation_state_map_.end(); ++it) { |
| + if (it->second.expected.Equals(it->second.current)) |
| + continue; |
| + // TODO(dcheng): Save payload? |
| + InsertId(expiration_time, |
| + it->first, |
| + std::string(), |
| + 0 /* retry_count */); |
| + } |
| + UpdateTimer(now); |
| } |
| void SyncInvalidationListener::UpdateCredentials( |
| @@ -114,6 +142,39 @@ void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) { |
| } |
| } |
| +void SyncInvalidationListener::Acknowledge(const invalidation::ObjectId& id, |
| + const AckHandle& ack_handle) { |
| + DCHECK(CalledOnValidThread()); |
| + InvalidationStateMap::iterator state_it = invalidation_state_map_.find(id); |
| + if (state_it == invalidation_state_map_.end()) |
| + return; |
| + invalidation_state_tracker_.Call( |
| + FROM_HERE, |
| + &InvalidationStateTracker::Acknowledge, |
| + id, |
| + ack_handle); |
| + state_it->second.current = ack_handle; |
| + if (state_it->second.expected.Equals(ack_handle)) { |
| + // If the received ack matches the expected ack, then we no longer need to |
| + // keep track of |id| since it is up-to-date. We use some heuristics to |
| + // avoid unnecessarily resetting the timer, since this results in the timer |
| + // having to post followup continuation tasks and also makes tests less |
| + // deterministic. |
| + bool update_timer = false; |
| + // If we're removing the head of the queue, we may need to update the timer. |
| + if (timer_queue_.begin()->second.id == id) { |
| + TimerQueue::const_iterator it = timer_queue_.begin(); |
| + ++it; |
| + // But only if there are no other entries with the same expiration time. |
| + update_timer = |
| + timer_queue_.upper_bound(timer_queue_.begin()->first) == it; |
| + } |
| + RemoveId(id); |
| + if (update_timer) |
| + UpdateTimer(base::TimeTicks::Now()); |
| + } |
| +} |
| + |
| void SyncInvalidationListener::Ready( |
| invalidation::InvalidationClient* client) { |
| DCHECK(CalledOnValidThread()); |
| @@ -141,17 +202,16 @@ void SyncInvalidationListener::Invalidate( |
| // should drop invalidations for unregistered ids. We may also |
| // have to filter it at a higher level, as invalidations for |
| // newly-unregistered ids may already be in flight. |
| - InvalidationVersionMap::const_iterator it = |
| - max_invalidation_versions_.find(id); |
| - if ((it != max_invalidation_versions_.end()) && |
| - (invalidation.version() <= it->second)) { |
| + InvalidationStateMap::const_iterator it = invalidation_state_map_.find(id); |
| + if ((it != invalidation_state_map_.end()) && |
| + (invalidation.version() <= it->second.version)) { |
| // Drop redundant invalidations. |
| client->Acknowledge(ack_handle); |
| return; |
| } |
| DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id) |
| << " to " << invalidation.version(); |
| - max_invalidation_versions_[id] = invalidation.version(); |
| + invalidation_state_map_[id].version = invalidation.version(); |
| invalidation_state_tracker_.Call( |
| FROM_HERE, |
| &InvalidationStateTracker::SetMaxVersion, |
|
akalin
2012/10/19 13:27:16
can we combine the SetMaxVersion and GenerateAckHa
dcheng
2012/10/19 19:38:11
The reason they are split up is because Invalidat
|
| @@ -162,12 +222,9 @@ void SyncInvalidationListener::Invalidate( |
| if (invalidation.has_payload()) |
| payload = invalidation.payload(); |
| - ObjectIdInvalidationMap invalidation_map; |
| - invalidation_map[id].payload = payload; |
| - EmitInvalidation(invalidation_map); |
| - // TODO(akalin): We should really acknowledge only after we get the |
| - // updates from the sync server. (see http://crbug.com/78462). |
| - client->Acknowledge(ack_handle); |
| + ObjectIdSet ids; |
| + ids.insert(id); |
| + PrepareInvalidation(ids, payload, client, ack_handle); |
| } |
| void SyncInvalidationListener::InvalidateUnknownVersion( |
| @@ -178,12 +235,9 @@ void SyncInvalidationListener::InvalidateUnknownVersion( |
| DCHECK_EQ(client, invalidation_client_.get()); |
| DVLOG(1) << "InvalidateUnknownVersion"; |
| - ObjectIdInvalidationMap invalidation_map; |
| - invalidation_map[object_id].payload = std::string(); |
| - EmitInvalidation(invalidation_map); |
| - // TODO(akalin): We should really acknowledge only after we get the |
| - // updates from the sync server. (see http://crbug.com/78462). |
| - client->Acknowledge(ack_handle); |
| + ObjectIdSet ids; |
| + ids.insert(object_id); |
| + PrepareInvalidation(ids, std::string(), client, ack_handle); |
| } |
| // This should behave as if we got an invalidation with version |
| @@ -195,18 +249,142 @@ void SyncInvalidationListener::InvalidateAll( |
| DCHECK_EQ(client, invalidation_client_.get()); |
| DVLOG(1) << "InvalidateAll"; |
| - const ObjectIdInvalidationMap& invalidation_map = |
| - ObjectIdSetToInvalidationMap(registered_ids_, std::string()); |
| - EmitInvalidation(invalidation_map); |
| - // TODO(akalin): We should really acknowledge only after we get the |
| - // updates from the sync server. (see http://crbug.com/76482). |
| - client->Acknowledge(ack_handle); |
| + PrepareInvalidation(registered_ids_, std::string(), client, ack_handle); |
| +} |
| + |
| +void SyncInvalidationListener::PrepareInvalidation( |
| + const ObjectIdSet& ids, |
| + const std::string& payload, |
| + invalidation::InvalidationClient* client, |
| + const invalidation::AckHandle& ack_handle) { |
| + DCHECK(CalledOnValidThread()); |
| + |
| + invalidation_state_tracker_.Call( |
| + FROM_HERE, |
| + &InvalidationStateTracker::GenerateAckHandles, |
| + ids, |
| + base::MessageLoopProxy::current(), |
| + base::Bind(&SyncInvalidationListener::EmitInvalidation, |
| + weak_ptr_factory_.GetWeakPtr(), |
| + ids, |
| + payload, |
| + client, |
| + ack_handle)); |
| } |
| void SyncInvalidationListener::EmitInvalidation( |
| - const ObjectIdInvalidationMap& invalidation_map) { |
| + const ObjectIdSet& ids, |
| + const std::string& payload, |
| + invalidation::InvalidationClient* client, |
| + const invalidation::AckHandle& ack_handle, |
| + const AckHandleMap& local_ack_handles) { |
| DCHECK(CalledOnValidThread()); |
| + ObjectIdInvalidationMap invalidation_map = |
| + ObjectIdSetToInvalidationMap(ids, payload); |
| + // Erase any timer queue entries that correspond with an id in |ids| since a |
| + // new invalidation from the server resets the retry count. |
| + RemoveIds(ids); |
| + const base::TimeTicks now = base::TimeTicks::Now(); |
| + const base::TimeTicks expiration_time = now + CalculateBackoffDelay(0); |
| + for (AckHandleMap::const_iterator it = local_ack_handles.begin(); |
| + it != local_ack_handles.end(); ++it) { |
| + // Update in-memory copy of the invalidation state. |
| + invalidation_state_map_[it->first].expected = it->second; |
| + invalidation_map[it->first].ack_handle = it->second; |
| + InsertId(expiration_time, it->first, payload, 0 /* retry_count */); |
| + } |
| + DCHECK(!timer_queue_.empty()); |
| + UpdateTimer(now); |
| + delegate_->OnInvalidate(invalidation_map); |
| + client->Acknowledge(ack_handle); |
| +} |
| + |
| +void SyncInvalidationListener::ResendUnacknowledgedInvalidations() { |
| + ResendUnacknowledgedInvalidationsAt(base::TimeTicks::Now()); |
| +} |
| + |
| +void SyncInvalidationListener::ResendUnacknowledgedInvalidationsAt( |
| + base::TimeTicks expiration_time) { |
| + DCHECK(!timer_queue_.empty()) << "Timer not canceled but queue is empty!"; |
| + |
| + // This is slightly redundant since in non-test code, we always satisfy |
| + // the condition now == expiration_time. Having this makes the tests a bit |
| + // more sane though. |
| + const base::TimeTicks now = base::TimeTicks::Now(); |
| + TimerQueue::iterator end = timer_queue_.upper_bound(expiration_time); |
| + // In theory, we can do in one loop since all the new insertions should be |
| + // past |end|. To avoid potential infinite loops in case of a bug, we do it in |
| + // two separate loops. |
| + std::vector<QueueEntry> expired_entries; |
| + for (TimerQueue::iterator it = timer_queue_.begin(); it != end; ) { |
| + expired_entries.push_back(it->second); |
| + TimerQueue::iterator erase_it = it; |
| + ++it; |
| + timer_queue_.erase(erase_it); |
| + } |
| + |
| + ObjectIdInvalidationMap invalidation_map; |
| + for (std::vector<QueueEntry>::const_iterator it = expired_entries.begin(); |
| + it != expired_entries.end(); ++it) { |
| + InsertId(now + CalculateBackoffDelay(it->retry_count + 1), |
| + it->id, |
| + it->payload, |
| + it->retry_count + 1); |
| + Invalidation invalidation; |
| + invalidation.ack_handle = invalidation_state_map_[it->id].expected; |
| + invalidation.payload = it->payload; |
| + invalidation_map.insert(std::make_pair(it->id, invalidation)); |
| + } |
| + |
| delegate_->OnInvalidate(invalidation_map); |
| + UpdateTimer(now); |
| +} |
| + |
| +void SyncInvalidationListener::InsertId(base::TimeTicks expiration_time, |
| + const invalidation::ObjectId& id, |
| + const std::string& payload, |
| + int retry_count) { |
| + timer_queue_.insert(std::make_pair(expiration_time, |
| + QueueEntry(id, payload, retry_count))); |
| +} |
| + |
| +void SyncInvalidationListener::RemoveId(const invalidation::ObjectId& id) { |
| + for (TimerQueue::iterator it = timer_queue_.begin(); it != timer_queue_.end(); |
| + ++it) { |
| + if (it->second.id == id) { |
| + timer_queue_.erase(it); |
| + return; |
| + } |
| + } |
| + // This shouldn't happen unless someone is acking multiple times. |
| + NOTREACHED(); |
| +} |
| + |
| +void SyncInvalidationListener::RemoveIds(const ObjectIdSet& ids) { |
| + for (TimerQueue::iterator it = timer_queue_.begin(); |
| + it != timer_queue_.end(); ) { |
| + // We could be more clever here and reduce the complexity, since both |
| + // containers are already sorted. Since we don't expect timer_queue_ to be |
| + // very long in practice though, we just do things the simpler way. |
| + if (ContainsKey(ids, it->second.id)) { |
| + TimerQueue::iterator erase_it = it; |
| + ++it; |
| + timer_queue_.erase(erase_it); |
| + } else { |
| + ++it; |
| + } |
| + } |
| +} |
| + |
| +void SyncInvalidationListener::UpdateTimer(base::TimeTicks now) { |
| + if (timer_queue_.empty()) { |
| + timer_.Stop(); |
| + } else { |
| + timer_.Start(FROM_HERE, |
| + timer_queue_.begin()->first - now, |
| + this, |
| + &SyncInvalidationListener::ResendUnacknowledgedInvalidations); |
| + } |
| } |
| void SyncInvalidationListener::InformRegistrationStatus( |
| @@ -292,6 +470,31 @@ void SyncInvalidationListener::DoRegistrationUpdate() { |
| registration_manager_->UpdateRegisteredIds(registered_ids_); |
| invalidation_state_tracker_.Call( |
| FROM_HERE, &InvalidationStateTracker::Forget, unregistered_ids); |
| + for (ObjectIdSet::const_iterator it = unregistered_ids.begin(); |
| + it != unregistered_ids.end(); ++it) { |
| + invalidation_state_map_.erase(*it); |
| + } |
| + for (TimerQueue::iterator it = timer_queue_.begin(); |
| + it != timer_queue_.end(); ) { |
| + if (unregistered_ids.find(it->second.id) != unregistered_ids.end()) { |
| + TimerQueue::iterator erase_it = it; |
| + ++it; |
| + timer_queue_.erase(erase_it); |
| + } else { |
| + ++it; |
| + } |
| + } |
| +} |
| + |
| +bool SyncInvalidationListener::TriggerNextTimeoutForTest( |
| + base::TimeTicks* next_invalidation_time) { |
| + if (timer_queue_.empty()) { |
| + CHECK(!timer_.IsRunning()); |
| + return false; |
| + } |
| + *next_invalidation_time = timer_queue_.begin()->first; |
| + ResendUnacknowledgedInvalidationsAt(*next_invalidation_time); |
| + return true; |
| } |
| void SyncInvalidationListener::StopForTest() { |
| @@ -305,6 +508,9 @@ void SyncInvalidationListener::Stop() { |
| return; |
| } |
| + timer_.Stop(); |
| + timer_queue_.clear(); |
| + |
| registration_manager_.reset(); |
| sync_system_resources_.Stop(); |
| invalidation_client_->Stop(); |
| @@ -313,7 +519,7 @@ void SyncInvalidationListener::Stop() { |
| delegate_ = NULL; |
| invalidation_state_tracker_.Reset(); |
| - max_invalidation_versions_.clear(); |
| + invalidation_state_map_.clear(); |
| ticl_state_ = DEFAULT_INVALIDATION_ERROR; |
| push_client_state_ = DEFAULT_INVALIDATION_ERROR; |
| } |