Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(496)

Unified Diff: sync/notifier/sync_invalidation_listener.cc

Issue 10911084: Implement Invalidator::Acknowledge (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Restart test + more cleanup Created 8 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: sync/notifier/sync_invalidation_listener.cc
diff --git a/sync/notifier/sync_invalidation_listener.cc b/sync/notifier/sync_invalidation_listener.cc
index ab70527f610096d47db0283c3acdaa90bbb22bd8..ae5acc9141582a70ac3118876f66b9f295516108 100644
--- a/sync/notifier/sync_invalidation_listener.cc
+++ b/sync/notifier/sync_invalidation_listener.cc
@@ -4,12 +4,13 @@
#include "sync/notifier/sync_invalidation_listener.h"
-#include <string>
#include <vector>
+#include "base/bind.h"
#include "base/callback.h"
#include "base/compiler_specific.h"
#include "base/logging.h"
+#include "base/stl_util.h"
#include "base/tracked_objects.h"
#include "google/cacheinvalidation/include/invalidation-client.h"
#include "google/cacheinvalidation/include/types.h"
@@ -21,6 +22,15 @@
namespace {
const char kApplicationName[] = "chrome-sync";
+const int kMaxDelayInSeconds = 600;
+
+base::TimeDelta CalculateBackoffDelay(int retry_count) {
akalin 2012/10/19 13:27:16 almost certain there's already an exp. backoff cla
Jói 2012/10/19 13:29:49 net/base/backoff_entry.h
dcheng 2012/10/19 19:38:11 I like this much better and will update the code a
+ int delay = kMaxDelayInSeconds;
+ // Lazy way to prevent overflow.
+ if (retry_count < 10)
+ delay = std::min(delay, (1 << retry_count) * 60);
+ return base::TimeDelta::FromSeconds(delay);
+}
} // namespace
@@ -30,7 +40,8 @@ SyncInvalidationListener::Delegate::~Delegate() {}
SyncInvalidationListener::SyncInvalidationListener(
scoped_ptr<notifier::PushClient> push_client)
- : push_client_(push_client.get()),
+ : weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
+ push_client_(push_client.get()),
sync_system_resources_(push_client.Pass(),
ALLOW_THIS_IN_INITIALIZER_LIST(this)),
delegate_(NULL),
@@ -52,7 +63,7 @@ void SyncInvalidationListener::Start(
create_invalidation_client_callback,
const std::string& client_id, const std::string& client_info,
const std::string& invalidation_bootstrap_data,
- const InvalidationVersionMap& initial_max_invalidation_versions,
+ const InvalidationStateMap& initial_invalidation_state_map,
const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker,
Delegate* delegate) {
DCHECK(CalledOnValidThread());
@@ -67,16 +78,17 @@ void SyncInvalidationListener::Start(
sync_system_resources_.storage()->SetInitialState(
invalidation_bootstrap_data);
- max_invalidation_versions_ = initial_max_invalidation_versions;
- if (max_invalidation_versions_.empty()) {
+ invalidation_state_map_ = initial_invalidation_state_map;
+ if (invalidation_state_map_.empty()) {
DVLOG(2) << "No initial max invalidation versions for any id";
} else {
- for (InvalidationVersionMap::const_iterator it =
- max_invalidation_versions_.begin();
- it != max_invalidation_versions_.end(); ++it) {
+ // Start the reminder timer if we have unacknowledged local invalidations.
+ for (InvalidationStateMap::const_iterator it =
+ invalidation_state_map_.begin();
+ it != invalidation_state_map_.end(); ++it) {
DVLOG(2) << "Initial max invalidation version for "
<< ObjectIdToString(it->first) << " is "
- << it->second;
+ << it->second.version;
}
}
invalidation_state_tracker_ = invalidation_state_tracker;
@@ -95,6 +107,22 @@ void SyncInvalidationListener::Start(
registration_manager_.reset(
new RegistrationManager(invalidation_client_.get()));
+
+ // Start the reminder timer if we have unacknowledged local invalidations.
+ const base::TimeTicks now = base::TimeTicks::Now();
+ const base::TimeTicks expiration_time = now + CalculateBackoffDelay(0);
+ for (InvalidationStateMap::const_iterator it =
+ invalidation_state_map_.begin();
+ it != invalidation_state_map_.end(); ++it) {
+ if (it->second.expected.Equals(it->second.current))
+ continue;
+ // TODO(dcheng): Save payload?
+ InsertId(expiration_time,
+ it->first,
+ std::string(),
+ 0 /* retry_count */);
+ }
+ UpdateTimer(now);
}
void SyncInvalidationListener::UpdateCredentials(
@@ -114,6 +142,39 @@ void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) {
}
}
+void SyncInvalidationListener::Acknowledge(const invalidation::ObjectId& id,
+ const AckHandle& ack_handle) {
+ DCHECK(CalledOnValidThread());
+ InvalidationStateMap::iterator state_it = invalidation_state_map_.find(id);
+ if (state_it == invalidation_state_map_.end())
+ return;
+ invalidation_state_tracker_.Call(
+ FROM_HERE,
+ &InvalidationStateTracker::Acknowledge,
+ id,
+ ack_handle);
+ state_it->second.current = ack_handle;
+ if (state_it->second.expected.Equals(ack_handle)) {
+ // If the received ack matches the expected ack, then we no longer need to
+ // keep track of |id| since it is up-to-date. We use some heuristics to
+ // avoid unnecessarily resetting the timer, since this results in the timer
+ // having to post followup continuation tasks and also makes tests less
+ // deterministic.
+ bool update_timer = false;
+ // If we're removing the head of the queue, we may need to update the timer.
+ if (timer_queue_.begin()->second.id == id) {
+ TimerQueue::const_iterator it = timer_queue_.begin();
+ ++it;
+ // But only if there are no other entries with the same expiration time.
+ update_timer =
+ timer_queue_.upper_bound(timer_queue_.begin()->first) == it;
+ }
+ RemoveId(id);
+ if (update_timer)
+ UpdateTimer(base::TimeTicks::Now());
+ }
+}
+
void SyncInvalidationListener::Ready(
invalidation::InvalidationClient* client) {
DCHECK(CalledOnValidThread());
@@ -141,17 +202,16 @@ void SyncInvalidationListener::Invalidate(
// should drop invalidations for unregistered ids. We may also
// have to filter it at a higher level, as invalidations for
// newly-unregistered ids may already be in flight.
- InvalidationVersionMap::const_iterator it =
- max_invalidation_versions_.find(id);
- if ((it != max_invalidation_versions_.end()) &&
- (invalidation.version() <= it->second)) {
+ InvalidationStateMap::const_iterator it = invalidation_state_map_.find(id);
+ if ((it != invalidation_state_map_.end()) &&
+ (invalidation.version() <= it->second.version)) {
// Drop redundant invalidations.
client->Acknowledge(ack_handle);
return;
}
DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id)
<< " to " << invalidation.version();
- max_invalidation_versions_[id] = invalidation.version();
+ invalidation_state_map_[id].version = invalidation.version();
invalidation_state_tracker_.Call(
FROM_HERE,
&InvalidationStateTracker::SetMaxVersion,
akalin 2012/10/19 13:27:16 can we combine the SetMaxVersion and GenerateAckHa
dcheng 2012/10/19 19:38:11 The reason they are split up is because Invalidat
@@ -162,12 +222,9 @@ void SyncInvalidationListener::Invalidate(
if (invalidation.has_payload())
payload = invalidation.payload();
- ObjectIdInvalidationMap invalidation_map;
- invalidation_map[id].payload = payload;
- EmitInvalidation(invalidation_map);
- // TODO(akalin): We should really acknowledge only after we get the
- // updates from the sync server. (see http://crbug.com/78462).
- client->Acknowledge(ack_handle);
+ ObjectIdSet ids;
+ ids.insert(id);
+ PrepareInvalidation(ids, payload, client, ack_handle);
}
void SyncInvalidationListener::InvalidateUnknownVersion(
@@ -178,12 +235,9 @@ void SyncInvalidationListener::InvalidateUnknownVersion(
DCHECK_EQ(client, invalidation_client_.get());
DVLOG(1) << "InvalidateUnknownVersion";
- ObjectIdInvalidationMap invalidation_map;
- invalidation_map[object_id].payload = std::string();
- EmitInvalidation(invalidation_map);
- // TODO(akalin): We should really acknowledge only after we get the
- // updates from the sync server. (see http://crbug.com/78462).
- client->Acknowledge(ack_handle);
+ ObjectIdSet ids;
+ ids.insert(object_id);
+ PrepareInvalidation(ids, std::string(), client, ack_handle);
}
// This should behave as if we got an invalidation with version
@@ -195,18 +249,142 @@ void SyncInvalidationListener::InvalidateAll(
DCHECK_EQ(client, invalidation_client_.get());
DVLOG(1) << "InvalidateAll";
- const ObjectIdInvalidationMap& invalidation_map =
- ObjectIdSetToInvalidationMap(registered_ids_, std::string());
- EmitInvalidation(invalidation_map);
- // TODO(akalin): We should really acknowledge only after we get the
- // updates from the sync server. (see http://crbug.com/76482).
- client->Acknowledge(ack_handle);
+ PrepareInvalidation(registered_ids_, std::string(), client, ack_handle);
+}
+
+void SyncInvalidationListener::PrepareInvalidation(
+ const ObjectIdSet& ids,
+ const std::string& payload,
+ invalidation::InvalidationClient* client,
+ const invalidation::AckHandle& ack_handle) {
+ DCHECK(CalledOnValidThread());
+
+ invalidation_state_tracker_.Call(
+ FROM_HERE,
+ &InvalidationStateTracker::GenerateAckHandles,
+ ids,
+ base::MessageLoopProxy::current(),
+ base::Bind(&SyncInvalidationListener::EmitInvalidation,
+ weak_ptr_factory_.GetWeakPtr(),
+ ids,
+ payload,
+ client,
+ ack_handle));
}
void SyncInvalidationListener::EmitInvalidation(
- const ObjectIdInvalidationMap& invalidation_map) {
+ const ObjectIdSet& ids,
+ const std::string& payload,
+ invalidation::InvalidationClient* client,
+ const invalidation::AckHandle& ack_handle,
+ const AckHandleMap& local_ack_handles) {
DCHECK(CalledOnValidThread());
+ ObjectIdInvalidationMap invalidation_map =
+ ObjectIdSetToInvalidationMap(ids, payload);
+ // Erase any timer queue entries that correspond with an id in |ids| since a
+ // new invalidation from the server resets the retry count.
+ RemoveIds(ids);
+ const base::TimeTicks now = base::TimeTicks::Now();
+ const base::TimeTicks expiration_time = now + CalculateBackoffDelay(0);
+ for (AckHandleMap::const_iterator it = local_ack_handles.begin();
+ it != local_ack_handles.end(); ++it) {
+ // Update in-memory copy of the invalidation state.
+ invalidation_state_map_[it->first].expected = it->second;
+ invalidation_map[it->first].ack_handle = it->second;
+ InsertId(expiration_time, it->first, payload, 0 /* retry_count */);
+ }
+ DCHECK(!timer_queue_.empty());
+ UpdateTimer(now);
+ delegate_->OnInvalidate(invalidation_map);
+ client->Acknowledge(ack_handle);
+}
+
+void SyncInvalidationListener::ResendUnacknowledgedInvalidations() {
+ ResendUnacknowledgedInvalidationsAt(base::TimeTicks::Now());
+}
+
+void SyncInvalidationListener::ResendUnacknowledgedInvalidationsAt(
+ base::TimeTicks expiration_time) {
+ DCHECK(!timer_queue_.empty()) << "Timer not canceled but queue is empty!";
+
+ // This is slightly redundant since in non-test code, we always satisfy
+ // the condition now == expiration_time. Having this makes the tests a bit
+ // more sane though.
+ const base::TimeTicks now = base::TimeTicks::Now();
+ TimerQueue::iterator end = timer_queue_.upper_bound(expiration_time);
+ // In theory, we can do in one loop since all the new insertions should be
+ // past |end|. To avoid potential infinite loops in case of a bug, we do it in
+ // two separate loops.
+ std::vector<QueueEntry> expired_entries;
+ for (TimerQueue::iterator it = timer_queue_.begin(); it != end; ) {
+ expired_entries.push_back(it->second);
+ TimerQueue::iterator erase_it = it;
+ ++it;
+ timer_queue_.erase(erase_it);
+ }
+
+ ObjectIdInvalidationMap invalidation_map;
+ for (std::vector<QueueEntry>::const_iterator it = expired_entries.begin();
+ it != expired_entries.end(); ++it) {
+ InsertId(now + CalculateBackoffDelay(it->retry_count + 1),
+ it->id,
+ it->payload,
+ it->retry_count + 1);
+ Invalidation invalidation;
+ invalidation.ack_handle = invalidation_state_map_[it->id].expected;
+ invalidation.payload = it->payload;
+ invalidation_map.insert(std::make_pair(it->id, invalidation));
+ }
+
delegate_->OnInvalidate(invalidation_map);
+ UpdateTimer(now);
+}
+
+void SyncInvalidationListener::InsertId(base::TimeTicks expiration_time,
+ const invalidation::ObjectId& id,
+ const std::string& payload,
+ int retry_count) {
+ timer_queue_.insert(std::make_pair(expiration_time,
+ QueueEntry(id, payload, retry_count)));
+}
+
+void SyncInvalidationListener::RemoveId(const invalidation::ObjectId& id) {
+ for (TimerQueue::iterator it = timer_queue_.begin(); it != timer_queue_.end();
+ ++it) {
+ if (it->second.id == id) {
+ timer_queue_.erase(it);
+ return;
+ }
+ }
+ // This shouldn't happen unless someone is acking multiple times.
+ NOTREACHED();
+}
+
+void SyncInvalidationListener::RemoveIds(const ObjectIdSet& ids) {
+ for (TimerQueue::iterator it = timer_queue_.begin();
+ it != timer_queue_.end(); ) {
+ // We could be more clever here and reduce the complexity, since both
+ // containers are already sorted. Since we don't expect timer_queue_ to be
+ // very long in practice though, we just do things the simpler way.
+ if (ContainsKey(ids, it->second.id)) {
+ TimerQueue::iterator erase_it = it;
+ ++it;
+ timer_queue_.erase(erase_it);
+ } else {
+ ++it;
+ }
+ }
+}
+
+void SyncInvalidationListener::UpdateTimer(base::TimeTicks now) {
+ if (timer_queue_.empty()) {
+ timer_.Stop();
+ } else {
+ timer_.Start(FROM_HERE,
+ timer_queue_.begin()->first - now,
+ this,
+ &SyncInvalidationListener::ResendUnacknowledgedInvalidations);
+ }
}
void SyncInvalidationListener::InformRegistrationStatus(
@@ -292,6 +470,31 @@ void SyncInvalidationListener::DoRegistrationUpdate() {
registration_manager_->UpdateRegisteredIds(registered_ids_);
invalidation_state_tracker_.Call(
FROM_HERE, &InvalidationStateTracker::Forget, unregistered_ids);
+ for (ObjectIdSet::const_iterator it = unregistered_ids.begin();
+ it != unregistered_ids.end(); ++it) {
+ invalidation_state_map_.erase(*it);
+ }
+ for (TimerQueue::iterator it = timer_queue_.begin();
+ it != timer_queue_.end(); ) {
+ if (unregistered_ids.find(it->second.id) != unregistered_ids.end()) {
+ TimerQueue::iterator erase_it = it;
+ ++it;
+ timer_queue_.erase(erase_it);
+ } else {
+ ++it;
+ }
+ }
+}
+
+bool SyncInvalidationListener::TriggerNextTimeoutForTest(
+ base::TimeTicks* next_invalidation_time) {
+ if (timer_queue_.empty()) {
+ CHECK(!timer_.IsRunning());
+ return false;
+ }
+ *next_invalidation_time = timer_queue_.begin()->first;
+ ResendUnacknowledgedInvalidationsAt(*next_invalidation_time);
+ return true;
}
void SyncInvalidationListener::StopForTest() {
@@ -305,6 +508,9 @@ void SyncInvalidationListener::Stop() {
return;
}
+ timer_.Stop();
+ timer_queue_.clear();
+
registration_manager_.reset();
sync_system_resources_.Stop();
invalidation_client_->Stop();
@@ -313,7 +519,7 @@ void SyncInvalidationListener::Stop() {
delegate_ = NULL;
invalidation_state_tracker_.Reset();
- max_invalidation_versions_.clear();
+ invalidation_state_map_.clear();
ticl_state_ = DEFAULT_INVALIDATION_ERROR;
push_client_state_ = DEFAULT_INVALIDATION_ERROR;
}

Powered by Google App Engine
This is Rietveld 408576698