Chromium Code Reviews| Index: sync/notifier/sync_invalidation_listener.cc |
| diff --git a/sync/notifier/sync_invalidation_listener.cc b/sync/notifier/sync_invalidation_listener.cc |
| index 4e57bb85ed5a2096072ab572573d4d7cf01e431b..5ca0e30e45dfc50b167e322b8eaffb0c498007cc 100644 |
| --- a/sync/notifier/sync_invalidation_listener.cc |
| +++ b/sync/notifier/sync_invalidation_listener.cc |
| @@ -4,9 +4,9 @@ |
| #include "sync/notifier/sync_invalidation_listener.h" |
| -#include <string> |
| #include <vector> |
| +#include "base/bind.h" |
| #include "base/callback.h" |
| #include "base/compiler_specific.h" |
| #include "base/logging.h" |
| @@ -30,7 +30,9 @@ SyncInvalidationListener::Delegate::~Delegate() {} |
| SyncInvalidationListener::SyncInvalidationListener( |
| scoped_ptr<notifier::PushClient> push_client) |
| - : push_client_(push_client.get()), |
| + : weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), |
| + ack_tracker_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), |
| + push_client_(push_client.get()), |
| sync_system_resources_(push_client.Pass(), |
| ALLOW_THIS_IN_INITIALIZER_LIST(this)), |
| delegate_(NULL), |
| @@ -105,6 +107,19 @@ void SyncInvalidationListener::Start( |
| FROM_HERE, |
| &InvalidationStateTracker::SetInvalidatorClientId, |
| client_id); |
| + |
| + // Set up reminders for any invalidations that have not been locally |
| + // acknowledged. |
| + ObjectIdSet unacknowledged_ids; |
| + for (InvalidationStateMap::const_iterator it = |
| + invalidation_state_map_.begin(); |
| + it != invalidation_state_map_.end(); ++it) { |
| + if (it->second.expected.Equals(it->second.current)) |
| + continue; |
| + unacknowledged_ids.insert(it->first); |
| + } |
| + if (!unacknowledged_ids.empty()) |
| + ack_tracker_.Track(unacknowledged_ids); |
| } |
| void SyncInvalidationListener::UpdateCredentials( |
| @@ -124,6 +139,27 @@ void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) { |
| } |
| } |
| +void SyncInvalidationListener::Acknowledge(const invalidation::ObjectId& id, |
| + const AckHandle& ack_handle) { |
| + DCHECK(CalledOnValidThread()); |
| + InvalidationStateMap::iterator state_it = invalidation_state_map_.find(id); |
| + if (state_it == invalidation_state_map_.end()) |
| + return; |
| + invalidation_state_tracker_.Call( |
| + FROM_HERE, |
| + &InvalidationStateTracker::Acknowledge, |
| + id, |
| + ack_handle); |
| + state_it->second.current = ack_handle; |
| + if (state_it->second.expected.Equals(ack_handle)) { |
| + // If the received ack matches the expected ack, then we no longer need to |
| + // keep track of |id| since it is up-to-date. |
|
Pete Williamson
2013/02/16 01:41:06
This comment confuses me a bit, we say that we don
dcheng
2013/02/22 02:51:30
Right, AckTracker has a Track() and an Ack() funct
|
| + ObjectIdSet ids; |
| + ids.insert(id); |
| + ack_tracker_.Ack(ids); |
| + } |
| +} |
| + |
| void SyncInvalidationListener::Ready( |
| invalidation::InvalidationClient* client) { |
| DCHECK(CalledOnValidThread()); |
| @@ -167,17 +203,15 @@ void SyncInvalidationListener::Invalidate( |
| DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id) |
| << " to " << invalidation.version(); |
| invalidation_state_map_[id].version = invalidation.version(); |
| + invalidation_state_map_[id].payload = payload; |
| invalidation_state_tracker_.Call( |
| FROM_HERE, |
| &InvalidationStateTracker::SetMaxVersionAndPayload, |
| id, invalidation.version(), payload); |
| - ObjectIdInvalidationMap invalidation_map; |
| - invalidation_map[id].payload = payload; |
| - EmitInvalidation(invalidation_map); |
| - // TODO(akalin): We should really acknowledge only after we get the |
| - // updates from the sync server. (see http://crbug.com/78462). |
| - client->Acknowledge(ack_handle); |
| + ObjectIdSet ids; |
| + ids.insert(id); |
| + PrepareInvalidation(ids, payload, client, ack_handle); |
| } |
| void SyncInvalidationListener::InvalidateUnknownVersion( |
| @@ -188,12 +222,9 @@ void SyncInvalidationListener::InvalidateUnknownVersion( |
| DCHECK_EQ(client, invalidation_client_.get()); |
| DVLOG(1) << "InvalidateUnknownVersion"; |
| - ObjectIdInvalidationMap invalidation_map; |
| - invalidation_map[object_id].payload = std::string(); |
| - EmitInvalidation(invalidation_map); |
| - // TODO(akalin): We should really acknowledge only after we get the |
| - // updates from the sync server. (see http://crbug.com/78462). |
| - client->Acknowledge(ack_handle); |
| + ObjectIdSet ids; |
| + ids.insert(object_id); |
| + PrepareInvalidation(ids, std::string(), client, ack_handle); |
| } |
| // This should behave as if we got an invalidation with version |
| @@ -205,17 +236,60 @@ void SyncInvalidationListener::InvalidateAll( |
| DCHECK_EQ(client, invalidation_client_.get()); |
| DVLOG(1) << "InvalidateAll"; |
| - const ObjectIdInvalidationMap& invalidation_map = |
| - ObjectIdSetToInvalidationMap(registered_ids_, std::string()); |
| - EmitInvalidation(invalidation_map); |
| - // TODO(akalin): We should really acknowledge only after we get the |
| - // updates from the sync server. (see http://crbug.com/76482). |
| - client->Acknowledge(ack_handle); |
| + PrepareInvalidation(registered_ids_, std::string(), client, ack_handle); |
| +} |
| + |
| +void SyncInvalidationListener::PrepareInvalidation( |
| + const ObjectIdSet& ids, |
| + const std::string& payload, |
| + invalidation::InvalidationClient* client, |
| + const invalidation::AckHandle& ack_handle) { |
| + DCHECK(CalledOnValidThread()); |
| + |
| + // A server invalidation resets the local retry count. |
| + ack_tracker_.Ack(ids); |
| + invalidation_state_tracker_.Call( |
| + FROM_HERE, |
| + &InvalidationStateTracker::GenerateAckHandles, |
| + ids, |
| + base::MessageLoopProxy::current(), |
| + base::Bind(&SyncInvalidationListener::EmitInvalidation, |
| + weak_ptr_factory_.GetWeakPtr(), |
| + ids, |
| + payload, |
| + client, |
| + ack_handle)); |
| } |
| void SyncInvalidationListener::EmitInvalidation( |
| - const ObjectIdInvalidationMap& invalidation_map) { |
| + const ObjectIdSet& ids, |
| + const std::string& payload, |
| + invalidation::InvalidationClient* client, |
| + const invalidation::AckHandle& ack_handle, |
| + const AckHandleMap& local_ack_handles) { |
| DCHECK(CalledOnValidThread()); |
| + ObjectIdInvalidationMap invalidation_map = |
| + ObjectIdSetToInvalidationMap(ids, payload); |
| + for (AckHandleMap::const_iterator it = local_ack_handles.begin(); |
| + it != local_ack_handles.end(); ++it) { |
| + // Update in-memory copy of the invalidation state. |
| + invalidation_state_map_[it->first].expected = it->second; |
| + invalidation_map[it->first].ack_handle = it->second; |
| + } |
| + ack_tracker_.Track(ids); |
| + delegate_->OnInvalidate(invalidation_map); |
| + client->Acknowledge(ack_handle); |
| +} |
| + |
| +void SyncInvalidationListener::OnTimeout(const ObjectIdSet& ids) { |
|
Pete Williamson
2013/02/16 01:41:06
A comment on the "OnTimeout" function might be nic
dcheng
2013/02/22 02:51:30
Hmm. The delegate interface describes what this ca
|
| + ObjectIdInvalidationMap invalidation_map; |
| + for (ObjectIdSet::const_iterator it = ids.begin(); it != ids.end(); ++it) { |
| + Invalidation invalidation; |
| + invalidation.ack_handle = invalidation_state_map_[*it].expected; |
| + invalidation.payload = invalidation_state_map_[*it].payload; |
| + invalidation_map.insert(std::make_pair(*it, invalidation)); |
| + } |
| + |
| delegate_->OnInvalidate(invalidation_map); |
| } |
| @@ -306,6 +380,7 @@ void SyncInvalidationListener::DoRegistrationUpdate() { |
| } |
| invalidation_state_tracker_.Call( |
| FROM_HERE, &InvalidationStateTracker::Forget, unregistered_ids); |
| + ack_tracker_.Ack(unregistered_ids); |
| } |
| void SyncInvalidationListener::StopForTest() { |
| @@ -318,12 +393,18 @@ InvalidationStateMap SyncInvalidationListener::GetStateMapForTest() const { |
| return invalidation_state_map_; |
| } |
| +AckTracker* SyncInvalidationListener::GetAckTrackerForTest() { |
| + return &ack_tracker_; |
| +} |
| + |
| void SyncInvalidationListener::Stop() { |
| DCHECK(CalledOnValidThread()); |
| if (!invalidation_client_.get()) { |
| return; |
| } |
| + ack_tracker_.Clear(); |
| + |
| registration_manager_.reset(); |
| sync_system_resources_.Stop(); |
| invalidation_client_->Stop(); |