Index: sync/notifier/sync_invalidation_listener.cc |
diff --git a/sync/notifier/sync_invalidation_listener.cc b/sync/notifier/sync_invalidation_listener.cc |
index 9e8b2ad130dafd48e09cc7afe16d48a81354468f..264b8b27f27a6eef98ce78ce048edac0081e55c4 100644 |
--- a/sync/notifier/sync_invalidation_listener.cc |
+++ b/sync/notifier/sync_invalidation_listener.cc |
@@ -4,12 +4,13 @@ |
#include "sync/notifier/sync_invalidation_listener.h" |
-#include <string> |
#include <vector> |
+#include "base/bind.h" |
#include "base/callback.h" |
#include "base/compiler_specific.h" |
#include "base/logging.h" |
+#include "base/stl_util.h" |
#include "base/tracked_objects.h" |
#include "google/cacheinvalidation/include/invalidation-client.h" |
#include "google/cacheinvalidation/include/types.h" |
@@ -21,6 +22,14 @@ |
namespace { |
const char kApplicationName[] = "chrome-sync"; |
+const int kMaxDelayInSeconds = 600; |
+ |
+// TODO(dcheng): This is not testable yet. |
Pete Williamson
2012/09/14 18:28:22
How about a friend method that lets a test class c
dcheng
2012/09/14 19:08:49
I was actually thinking that we'd allow a TimeTick
|
+base::TimeDelta CalculateBackoffDelay(int retry_count) { |
+ // TODO(dcheng): This is not overflow safe. |
+ int delay = std::min(kMaxDelayInSeconds, (1 << retry_count) * 60); |
+ return base::TimeDelta::FromSeconds(delay); |
+} |
} // namespace |
@@ -30,7 +39,8 @@ SyncInvalidationListener::Delegate::~Delegate() {} |
SyncInvalidationListener::SyncInvalidationListener( |
scoped_ptr<notifier::PushClient> push_client) |
- : push_client_(push_client.get()), |
+ : weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), |
+ push_client_(push_client.get()), |
sync_system_resources_(push_client.Pass(), |
ALLOW_THIS_IN_INITIALIZER_LIST(this)), |
delegate_(NULL), |
@@ -52,7 +62,7 @@ void SyncInvalidationListener::Start( |
create_invalidation_client_callback, |
const std::string& client_id, const std::string& client_info, |
const std::string& state, |
- const InvalidationVersionMap& initial_max_invalidation_versions, |
+ const InvalidationStateMap& initial_invalidation_state_map, |
const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker, |
Delegate* delegate) { |
DCHECK(CalledOnValidThread()); |
@@ -66,16 +76,16 @@ void SyncInvalidationListener::Start( |
// update the in-memory cache, while reads just return the cached state. |
sync_system_resources_.storage()->SetInitialState(state); |
- max_invalidation_versions_ = initial_max_invalidation_versions; |
- if (max_invalidation_versions_.empty()) { |
+ invalidation_state_map_ = initial_invalidation_state_map; |
+ if (invalidation_state_map_.empty()) { |
DVLOG(2) << "No initial max invalidation versions for any id"; |
} else { |
- for (InvalidationVersionMap::const_iterator it = |
- max_invalidation_versions_.begin(); |
- it != max_invalidation_versions_.end(); ++it) { |
+ for (InvalidationStateMap::const_iterator it = |
+ invalidation_state_map_.begin(); |
+ it != invalidation_state_map_.end(); ++it) { |
DVLOG(2) << "Initial max invalidation version for " |
<< ObjectIdToString(it->first) << " is " |
- << it->second; |
+ << it->second.version; |
} |
} |
invalidation_state_tracker_ = invalidation_state_tracker; |
@@ -113,6 +123,31 @@ void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) { |
} |
} |
+void SyncInvalidationListener::Acknowledge(const invalidation::ObjectId& id, |
+ const AckHandle& ack_handle) { |
+ DCHECK(CalledOnValidThread()); |
+ InvalidationStateMap::iterator state_it = invalidation_state_map_.find(id); |
+ if (state_it == invalidation_state_map_.end()) |
+ return; |
+ state_it->second.current = ack_handle; |
+ if (state_it->second.expected.Equals(ack_handle)) { |
+ // If the received ack matches the expected ack, then we no longer need to |
+ // keep track of |id| since it is up-to-date. |
+ for (TimerQueue::iterator it = timer_queue_.begin(); |
+ it != timer_queue_.end(); ++it) { |
+ if (it->second.id == id) { |
+ timer_queue_.erase(it); |
+ if (timer_queue_.empty()) { |
+ timer_.Stop(); |
+ } |
+ return; |
+ } |
+ } |
+ // This shouldn't happen unless someone is acking multiple times. |
+ NOTREACHED(); |
+ } |
+} |
+ |
void SyncInvalidationListener::Ready( |
invalidation::InvalidationClient* client) { |
DCHECK(CalledOnValidThread()); |
@@ -140,17 +175,16 @@ void SyncInvalidationListener::Invalidate( |
// should drop invalidations for unregistered ids. We may also |
// have to filter it at a higher level, as invalidations for |
// newly-unregistered ids may already be in flight. |
- InvalidationVersionMap::const_iterator it = |
- max_invalidation_versions_.find(id); |
- if ((it != max_invalidation_versions_.end()) && |
- (invalidation.version() <= it->second)) { |
+ InvalidationStateMap::const_iterator it = invalidation_state_map_.find(id); |
+ if ((it != invalidation_state_map_.end()) && |
+ (invalidation.version() <= it->second.version)) { |
// Drop redundant invalidations. |
client->Acknowledge(ack_handle); |
return; |
} |
DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id) |
<< " to " << invalidation.version(); |
- max_invalidation_versions_[id] = invalidation.version(); |
+ invalidation_state_map_[id].version = invalidation.version(); |
invalidation_state_tracker_.Call( |
FROM_HERE, |
&InvalidationStateTracker::SetMaxVersion, |
@@ -161,12 +195,9 @@ void SyncInvalidationListener::Invalidate( |
if (invalidation.has_payload()) |
payload = invalidation.payload(); |
- ObjectIdStateMap id_state_map; |
- id_state_map[id].payload = payload; |
- EmitInvalidation(id_state_map); |
- // TODO(akalin): We should really acknowledge only after we get the |
- // updates from the sync server. (see http://crbug.com/78462). |
- client->Acknowledge(ack_handle); |
+ ObjectIdSet ids; |
+ ids.insert(id); |
+ PrepareInvalidation(ids, payload, client, ack_handle); |
} |
void SyncInvalidationListener::InvalidateUnknownVersion( |
@@ -177,12 +208,9 @@ void SyncInvalidationListener::InvalidateUnknownVersion( |
DCHECK_EQ(client, invalidation_client_.get()); |
DVLOG(1) << "InvalidateUnknownVersion"; |
- ObjectIdStateMap id_state_map; |
- id_state_map[object_id].payload = std::string(); |
- EmitInvalidation(id_state_map); |
- // TODO(akalin): We should really acknowledge only after we get the |
- // updates from the sync server. (see http://crbug.com/78462). |
- client->Acknowledge(ack_handle); |
+ ObjectIdSet ids; |
+ ids.insert(object_id); |
+ PrepareInvalidation(ids, std::string(), client, ack_handle); |
} |
// This should behave as if we got an invalidation with version |
@@ -194,21 +222,91 @@ void SyncInvalidationListener::InvalidateAll( |
DCHECK_EQ(client, invalidation_client_.get()); |
DVLOG(1) << "InvalidateAll"; |
- ObjectIdStateMap id_state_map; |
- for (ObjectIdSet::const_iterator it = registered_ids_.begin(); |
- it != registered_ids_.end(); ++it) { |
- id_state_map[*it].payload = std::string(); |
- } |
- EmitInvalidation(id_state_map); |
- // TODO(akalin): We should really acknowledge only after we get the |
- // updates from the sync server. (see http://crbug.com/76482). |
- client->Acknowledge(ack_handle); |
+ PrepareInvalidation(registered_ids_, std::string(), client, ack_handle); |
+} |
+ |
+void SyncInvalidationListener::PrepareInvalidation( |
+ const ObjectIdSet& ids, |
+ const std::string& payload, |
+ invalidation::InvalidationClient* client, |
+ const invalidation::AckHandle& ack_handle) { |
+ DCHECK(CalledOnValidThread()); |
+ |
+ scoped_ptr<AckHandleMap> local_ack_handles(new AckHandleMap); |
+ invalidation_state_tracker_.CallWithReply( |
+ FROM_HERE, |
+ &InvalidationStateTracker::GenerateAckHandles, |
+ ids, |
+ local_ack_handles.get(), |
+ base::Bind(&SyncInvalidationListener::EmitInvalidation, |
+ weak_ptr_factory_.GetWeakPtr(), |
+ ids, |
+ payload, |
+ base::Owned(local_ack_handles.release()), |
+ client, |
+ ack_handle)); |
} |
void SyncInvalidationListener::EmitInvalidation( |
- const ObjectIdStateMap& id_state_map) { |
+ const ObjectIdSet& ids, |
+ const std::string& payload, |
+ const AckHandleMap* const local_ack_handles, |
+ invalidation::InvalidationClient* client, |
+ const invalidation::AckHandle& ack_handle) { |
DCHECK(CalledOnValidThread()); |
+ ObjectIdStateMap id_state_map = ObjectIdSetToStateMap(ids, payload); |
+ // Erase any timer queue entries that correspond with an id in |ids|; a new |
+ // invalidation will reset the retry count. |
+ for (TimerQueue::iterator it = timer_queue_.begin(); |
+ it != timer_queue_.end(); ) { |
+ // We could be more clever here and reduce the complexity, since both |
+ // containers are already sorted. Since we don't expect timer_queue_ to be |
+ // very long in practice though, we just do things the simpler way. |
+ if (ContainsKey(ids, it->second.id)) { |
+ TimerQueue::iterator erase_it = it; |
+ ++it; |
+ timer_queue_.erase(erase_it); |
+ } else { |
+ ++it; |
+ } |
+ } |
+ const base::TimeTicks expiration_time = |
+ base::TimeTicks::Now() + CalculateBackoffDelay(0 /* retry_count */); |
+ for (AckHandleMap::const_iterator it = local_ack_handles->begin(); |
+ it != local_ack_handles->end(); ++it) { |
+ // Update in-memory copy of the invalidation state. |
+ // TODO(dcheng): Maybe add some debug checks that the in-memory copy here is |
+ // in sync with the copy in IST? |
+ invalidation_state_map_[it->first].expected = it->second; |
+ id_state_map[it->first].ack_handle = it->second; |
+ timer_queue_.insert(std::make_pair( |
+ expiration_time, QueueEntry(it->first, payload, 0 /* retry_count */))); |
+ } |
+ DCHECK(!timer_queue_.empty()); |
Pete Williamson
2012/09/14 18:28:22
Should we start a timer every time, or just start
dcheng
2012/09/14 19:08:49
I forgot to update the Start() call before uploadi
|
+ timer_.Start(FROM_HERE, base::TimeDelta::FromSeconds(1) /* should not be 1 */, |
+ this, &SyncInvalidationListener::RemitInvalidation); |
+ delegate_->OnInvalidate(id_state_map); |
+ client->Acknowledge(ack_handle); |
+} |
+ |
Pete Williamson
2012/09/14 18:28:22
A comment here would be helpful - does remit mean
dcheng
2012/09/14 19:08:49
I need to think of a better name for this function
|
+void SyncInvalidationListener::RemitInvalidation() { |
+ ObjectIdStateMap id_state_map; |
+ const base::TimeTicks now = base::TimeTicks::Now(); |
+ for (TimerQueue::iterator it = timer_queue_.begin(); |
+ it != timer_queue_.end() && it->first <= now; ) { |
Pete Williamson
2012/09/14 18:28:22
Is the queue sorted? If not, we might end up with
dcheng
2012/09/14 19:08:49
multimaps are sorted.
|
+ TimerQueue::iterator insert_it = timer_queue_.insert(std::make_pair( |
+ now + CalculateBackoffDelay(it->second.retry_count), it->second)); |
+ insert_it->second.retry_count++; |
Pete Williamson
2012/09/14 18:28:22
It looks like the first two calls will have a retr
dcheng
2012/09/14 19:08:49
Good catch. Fixed =)
|
+ InvalidationState state; |
+ state.ack_handle = invalidation_state_map_[it->second.id].expected; |
+ state.payload = it->second.payload; |
+ id_state_map.insert(std::make_pair(it->second.id, state)); |
+ TimerQueue::iterator erase_it = it; |
+ ++it; |
+ timer_queue_.erase(erase_it); |
+ } |
delegate_->OnInvalidate(id_state_map); |
+ |
} |
void SyncInvalidationListener::InformRegistrationStatus( |
@@ -294,6 +392,20 @@ void SyncInvalidationListener::DoRegistrationUpdate() { |
registration_manager_->UpdateRegisteredIds(registered_ids_); |
invalidation_state_tracker_.Call( |
FROM_HERE, &InvalidationStateTracker::Forget, unregistered_ids); |
+ for (ObjectIdSet::const_iterator it = unregistered_ids.begin(); |
+ it != unregistered_ids.end(); ++it) { |
+ invalidation_state_map_.erase(*it); |
+ } |
+ for (TimerQueue::iterator it = timer_queue_.begin(); |
+ it != timer_queue_.end(); ) { |
+ if (unregistered_ids.find(it->second.id) != unregistered_ids.end()) { |
+ TimerQueue::iterator erase_it = it; |
+ ++it; |
+ timer_queue_.erase(erase_it); |
+ } else { |
+ ++it; |
+ } |
+ } |
} |
void SyncInvalidationListener::StopForTest() { |
@@ -315,7 +427,7 @@ void SyncInvalidationListener::Stop() { |
delegate_ = NULL; |
invalidation_state_tracker_.Reset(); |
- max_invalidation_versions_.clear(); |
+ invalidation_state_map_.clear(); |
ticl_state_ = DEFAULT_INVALIDATION_ERROR; |
push_client_state_ = DEFAULT_INVALIDATION_ERROR; |
} |