Chromium Code Reviews| Index: sync/notifier/sync_invalidation_listener.cc |
| diff --git a/sync/notifier/sync_invalidation_listener.cc b/sync/notifier/sync_invalidation_listener.cc |
| index 763cc876bf0e954ca1e8a28c6f78c707215ce4ea..0f2252f1912c73be1da021ddba81224318f34b4c 100644 |
| --- a/sync/notifier/sync_invalidation_listener.cc |
| +++ b/sync/notifier/sync_invalidation_listener.cc |
| @@ -16,14 +16,13 @@ |
| #include "google/cacheinvalidation/types.pb.h" |
| #include "jingle/notifier/listener/push_client.h" |
| #include "sync/notifier/invalidation_util.h" |
| +#include "sync/notifier/object_id_invalidation_map.h" |
| #include "sync/notifier/registration_manager.h" |
| namespace { |
| const char kApplicationName[] = "chrome-sync"; |
| -static const int64 kUnknownVersion = -1; |
| - |
| } // namespace |
| namespace syncer { |
| @@ -31,10 +30,8 @@ namespace syncer { |
| SyncInvalidationListener::Delegate::~Delegate() {} |
| SyncInvalidationListener::SyncInvalidationListener( |
| - base::TickClock* tick_clock, |
| scoped_ptr<notifier::PushClient> push_client) |
| - : ack_tracker_(tick_clock, this), |
| - push_client_(push_client.get()), |
| + : push_client_(push_client.get()), |
|
akalin
2013/11/21 04:31:36
Should not have .get(). Otherwise, you'd have two
rlarocque
2013/11/21 20:09:27
This hasn't changed in a long time. I think this
|
| sync_system_resources_(push_client.Pass(), this), |
| delegate_(NULL), |
| ticl_state_(DEFAULT_INVALIDATION_ERROR), |
| @@ -56,7 +53,7 @@ void SyncInvalidationListener::Start( |
| create_invalidation_client_callback, |
| const std::string& client_id, const std::string& client_info, |
| const std::string& invalidation_bootstrap_data, |
| - const InvalidationStateMap& initial_invalidation_state_map, |
| + const UnackedInvalidationsMap& initial_unacked_invalidations, |
| const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker, |
| Delegate* delegate) { |
| DCHECK(CalledOnValidThread()); |
| @@ -71,18 +68,7 @@ void SyncInvalidationListener::Start( |
| sync_system_resources_.storage()->SetInitialState( |
| invalidation_bootstrap_data); |
| - invalidation_state_map_ = initial_invalidation_state_map; |
| - if (invalidation_state_map_.empty()) { |
| - DVLOG(2) << "No initial max invalidation versions for any id"; |
| - } else { |
| - for (InvalidationStateMap::const_iterator it = |
| - invalidation_state_map_.begin(); |
| - it != invalidation_state_map_.end(); ++it) { |
| - DVLOG(2) << "Initial max invalidation version for " |
| - << ObjectIdToString(it->first) << " is " |
| - << it->second.version; |
| - } |
| - } |
| + unacked_invalidations_map_ = initial_unacked_invalidations; |
| invalidation_state_tracker_ = invalidation_state_tracker; |
| DCHECK(invalidation_state_tracker_.IsInitialized()); |
| @@ -103,19 +89,6 @@ void SyncInvalidationListener::Start( |
| registration_manager_.reset( |
| new RegistrationManager(invalidation_client_.get())); |
| - |
| - // Set up reminders for any invalidations that have not been locally |
| - // acknowledged. |
| - ObjectIdSet unacknowledged_ids; |
| - for (InvalidationStateMap::const_iterator it = |
| - invalidation_state_map_.begin(); |
| - it != invalidation_state_map_.end(); ++it) { |
| - if (it->second.expected.Equals(it->second.current)) |
| - continue; |
| - unacknowledged_ids.insert(it->first); |
| - } |
| - if (!unacknowledged_ids.empty()) |
| - ack_tracker_.Track(unacknowledged_ids); |
| } |
| void SyncInvalidationListener::UpdateCredentials( |
| @@ -135,27 +108,6 @@ void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) { |
| } |
| } |
| -void SyncInvalidationListener::Acknowledge(const invalidation::ObjectId& id, |
| - const AckHandle& ack_handle) { |
| - DCHECK(CalledOnValidThread()); |
| - InvalidationStateMap::iterator state_it = invalidation_state_map_.find(id); |
| - if (state_it == invalidation_state_map_.end()) |
| - return; |
| - invalidation_state_tracker_.Call( |
| - FROM_HERE, |
| - &InvalidationStateTracker::Acknowledge, |
| - id, |
| - ack_handle); |
| - state_it->second.current = ack_handle; |
| - if (state_it->second.expected.Equals(ack_handle)) { |
| - // If the received ack matches the expected ack, then we no longer need to |
| - // keep track of |id| since it is up-to-date. |
| - ObjectIdSet ids; |
| - ids.insert(id); |
| - ack_tracker_.Ack(ids); |
| - } |
| -} |
| - |
| void SyncInvalidationListener::Ready( |
| invalidation::InvalidationClient* client) { |
| DCHECK(CalledOnValidThread()); |
| @@ -171,43 +123,24 @@ void SyncInvalidationListener::Invalidate( |
| const invalidation::AckHandle& ack_handle) { |
| DCHECK(CalledOnValidThread()); |
| DCHECK_EQ(client, invalidation_client_.get()); |
| - DVLOG(1) << "Invalidate: " << InvalidationToString(invalidation); |
| + client->Acknowledge(ack_handle); |
| const invalidation::ObjectId& id = invalidation.object_id(); |
| - // The invalidation API spec allows for the possibility of redundant |
| - // invalidations, so keep track of the max versions and drop |
| - // invalidations with old versions. |
| - // |
| - // TODO(akalin): Now that we keep track of registered ids, we |
| - // should drop invalidations for unregistered ids. We may also |
| - // have to filter it at a higher level, as invalidations for |
| - // newly-unregistered ids may already be in flight. |
| - InvalidationStateMap::const_iterator it = invalidation_state_map_.find(id); |
| - if ((it != invalidation_state_map_.end()) && |
| - (invalidation.version() <= it->second.version)) { |
| - // Drop redundant invalidations. |
| - client->Acknowledge(ack_handle); |
| - return; |
| - } |
| - |
| std::string payload; |
| // payload() CHECK()'s has_payload(), so we must check it ourselves first. |
| if (invalidation.has_payload()) |
| payload = invalidation.payload(); |
| - DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id) |
| - << " to " << invalidation.version(); |
| - invalidation_state_map_[id].version = invalidation.version(); |
| - invalidation_state_map_[id].payload = payload; |
| - invalidation_state_tracker_.Call( |
| - FROM_HERE, |
| - &InvalidationStateTracker::SetMaxVersionAndPayload, |
| - id, invalidation.version(), payload); |
| + DVLOG(2) << "Received invalidation with version " << invalidation.version() |
| + << " for " << ObjectIdToString(id); |
| + |
| + ObjectIdInvalidationMap invalidations; |
| + Invalidation inv = Invalidation::Init(id, invalidation.version(), payload); |
| + inv.set_ack_handler(GetThisAsAckHandler()); |
| + invalidations.Insert(inv); |
| - ObjectIdSet ids; |
| - ids.insert(id); |
| - PrepareInvalidation(ids, invalidation.version(), payload, client, ack_handle); |
| + DispatchInvalidations(invalidations); |
| } |
| void SyncInvalidationListener::InvalidateUnknownVersion( |
| @@ -217,15 +150,14 @@ void SyncInvalidationListener::InvalidateUnknownVersion( |
| DCHECK(CalledOnValidThread()); |
| DCHECK_EQ(client, invalidation_client_.get()); |
| DVLOG(1) << "InvalidateUnknownVersion"; |
| + client->Acknowledge(ack_handle); |
| + |
| + ObjectIdInvalidationMap invalidations; |
| + Invalidation unknown_version = Invalidation::InitUnknownVersion(object_id); |
| + unknown_version.set_ack_handler(GetThisAsAckHandler()); |
| + invalidations.Insert(unknown_version); |
| - ObjectIdSet ids; |
| - ids.insert(object_id); |
| - PrepareInvalidation( |
| - ids, |
| - kUnknownVersion, |
| - std::string(), |
| - client, |
| - ack_handle); |
| + DispatchInvalidations(invalidations); |
| } |
| // This should behave as if we got an invalidation with version |
| @@ -236,86 +168,56 @@ void SyncInvalidationListener::InvalidateAll( |
| DCHECK(CalledOnValidThread()); |
| DCHECK_EQ(client, invalidation_client_.get()); |
| DVLOG(1) << "InvalidateAll"; |
| + client->Acknowledge(ack_handle); |
| + |
| + ObjectIdInvalidationMap invalidations; |
| + for (ObjectIdSet::iterator it = registered_ids_.begin(); |
| + it != registered_ids_.end(); ++it) { |
| + Invalidation unknown_version = Invalidation::InitUnknownVersion(*it); |
| + unknown_version.set_ack_handler(GetThisAsAckHandler()); |
| + invalidations.Insert(unknown_version); |
| + } |
| - PrepareInvalidation( |
| - registered_ids_, |
| - kUnknownVersion, |
| - std::string(), |
| - client, |
| - ack_handle); |
| + DispatchInvalidations(invalidations); |
| } |
| -void SyncInvalidationListener::PrepareInvalidation( |
| - const ObjectIdSet& ids, |
| - int64 version, |
| - const std::string& payload, |
| - invalidation::InvalidationClient* client, |
| - const invalidation::AckHandle& ack_handle) { |
| +// If a handler is registered, emit right away. Otherwise, save it for later. |
| +void SyncInvalidationListener::DispatchInvalidations( |
| + const ObjectIdInvalidationMap& invalidations) { |
| DCHECK(CalledOnValidThread()); |
| - // A server invalidation resets the local retry count. |
| - ack_tracker_.Ack(ids); |
| - invalidation_state_tracker_.Call( |
| - FROM_HERE, |
| - &InvalidationStateTracker::GenerateAckHandles, |
| - ids, |
| - base::MessageLoopProxy::current(), |
| - base::Bind(&SyncInvalidationListener::EmitInvalidation, |
| - weak_ptr_factory_.GetWeakPtr(), |
| - ids, |
| - version, |
| - payload, |
| - client, |
| - ack_handle)); |
| -} |
| + ObjectIdInvalidationMap to_save = invalidations; |
| + ObjectIdInvalidationMap to_emit = |
| + invalidations.GetSubsetWithObjectIds(registered_ids_); |
| -void SyncInvalidationListener::EmitInvalidation( |
| - const ObjectIdSet& ids, |
| - int64 version, |
| - const std::string& payload, |
| - invalidation::InvalidationClient* client, |
| - const invalidation::AckHandle& ack_handle, |
| - const AckHandleMap& local_ack_handles) { |
| - DCHECK(CalledOnValidThread()); |
| + SaveInvalidations(to_save); |
| + EmitInvalidations(to_emit); |
| +} |
| - ObjectIdInvalidationMap invalidation_map; |
| - for (AckHandleMap::const_iterator it = local_ack_handles.begin(); |
| - it != local_ack_handles.end(); ++it) { |
| - // Update in-memory copy of the invalidation state. |
| - invalidation_state_map_[it->first].expected = it->second; |
| - |
| - if (version == kUnknownVersion) { |
| - Invalidation inv = Invalidation::InitUnknownVersion(it->first); |
| - inv.set_ack_handle(it->second); |
| - invalidation_map.Insert(inv); |
| - } else { |
| - Invalidation inv = Invalidation::Init(it->first, version, payload); |
| - inv.set_ack_handle(it->second); |
| - invalidation_map.Insert(inv); |
| +void SyncInvalidationListener::SaveInvalidations( |
| + const ObjectIdInvalidationMap& to_save) { |
| + ObjectIdSet objects_to_save = to_save.GetObjectIds(); |
| + for (ObjectIdSet::const_iterator it = objects_to_save.begin(); |
| + it != objects_to_save.end(); ++it) { |
| + UnackedInvalidationsMap::iterator lookup = |
| + unacked_invalidations_map_.find(*it); |
| + if (lookup == unacked_invalidations_map_.end()) { |
| + lookup = unacked_invalidations_map_.insert( |
| + std::make_pair(*it, UnackedInvalidationSet(*it))).first; |
| } |
| + lookup->second.AddSet(to_save.ForObject(*it)); |
| } |
| - ack_tracker_.Track(ids); |
| - delegate_->OnInvalidate(invalidation_map); |
| - client->Acknowledge(ack_handle); |
| + |
| + invalidation_state_tracker_.Call( |
| + FROM_HERE, |
| + &InvalidationStateTracker::SetSavedInvalidations, |
| + unacked_invalidations_map_); |
| } |
| -void SyncInvalidationListener::OnTimeout(const ObjectIdSet& ids) { |
| - ObjectIdInvalidationMap invalidation_map; |
| - for (ObjectIdSet::const_iterator it = ids.begin(); it != ids.end(); ++it) { |
| - if (invalidation_state_map_[*it].version == kUnknownVersion) { |
| - Invalidation inv = Invalidation::InitUnknownVersion(*it); |
| - inv.set_ack_handle(invalidation_state_map_[*it].expected); |
| - invalidation_map.Insert(inv); |
| - } else { |
| - Invalidation inv = Invalidation::Init( |
| - *it, |
| - invalidation_state_map_[*it].version, |
| - invalidation_state_map_[*it].payload); |
| - inv.set_ack_handle(invalidation_state_map_[*it].expected); |
| - invalidation_map.Insert(inv); |
| - } |
| - } |
| - delegate_->OnInvalidate(invalidation_map); |
| +void SyncInvalidationListener::EmitInvalidations( |
| + const ObjectIdInvalidationMap& to_emit) { |
| + DVLOG(2) << "Emitting invalidations: " << to_emit.ToString(); |
| + delegate_->OnInvalidate(to_emit); |
| } |
| void SyncInvalidationListener::InformRegistrationStatus( |
| @@ -388,6 +290,38 @@ void SyncInvalidationListener::InformError( |
| EmitStateChange(); |
| } |
| +void SyncInvalidationListener::Acknowledge( |
| + const invalidation::ObjectId& id, |
| + const syncer::AckHandle& handle) { |
| + UnackedInvalidationsMap::iterator lookup = |
| + unacked_invalidations_map_.find(id); |
| + if (lookup == unacked_invalidations_map_.end()) { |
| + DLOG(WARNING) << "Received acknowledgement for untracked object ID"; |
| + return; |
| + } |
| + lookup->second.Acknowledge(handle); |
| + invalidation_state_tracker_.Call( |
| + FROM_HERE, |
| + &InvalidationStateTracker::SetSavedInvalidations, |
| + unacked_invalidations_map_); |
| +} |
| + |
| +void SyncInvalidationListener::Drop( |
| + const invalidation::ObjectId& id, |
| + const syncer::AckHandle& handle) { |
| + UnackedInvalidationsMap::iterator lookup = |
| + unacked_invalidations_map_.find(id); |
| + if (lookup == unacked_invalidations_map_.end()) { |
| + DLOG(WARNING) << "Received drop for untracked object ID"; |
| + return; |
| + } |
| + lookup->second.Drop(handle); |
| + invalidation_state_tracker_.Call( |
| + FROM_HERE, |
| + &InvalidationStateTracker::SetSavedInvalidations, |
| + unacked_invalidations_map_); |
| +} |
| + |
| void SyncInvalidationListener::WriteState(const std::string& state) { |
| DCHECK(CalledOnValidThread()); |
| DVLOG(1) << "WriteState"; |
| @@ -399,13 +333,27 @@ void SyncInvalidationListener::DoRegistrationUpdate() { |
| DCHECK(CalledOnValidThread()); |
| const ObjectIdSet& unregistered_ids = |
| registration_manager_->UpdateRegisteredIds(registered_ids_); |
| - for (ObjectIdSet::const_iterator it = unregistered_ids.begin(); |
| + for (ObjectIdSet::iterator it = unregistered_ids.begin(); |
| it != unregistered_ids.end(); ++it) { |
| - invalidation_state_map_.erase(*it); |
| + unacked_invalidations_map_.erase(*it); |
| } |
| invalidation_state_tracker_.Call( |
| - FROM_HERE, &InvalidationStateTracker::Forget, unregistered_ids); |
| - ack_tracker_.Ack(unregistered_ids); |
| + FROM_HERE, |
| + &InvalidationStateTracker::SetSavedInvalidations, |
| + unacked_invalidations_map_); |
| + |
| + ObjectIdInvalidationMap object_id_invalidation_map; |
| + for (UnackedInvalidationsMap::iterator map_it = |
| + unacked_invalidations_map_.begin(); |
| + map_it != unacked_invalidations_map_.end(); ++map_it) { |
| + if (registered_ids_.find(map_it->first) == registered_ids_.end()) { |
| + continue; |
| + } |
| + map_it->second.ExportInvalidations( |
| + GetThisAsAckHandler(), |
| + &object_id_invalidation_map); |
| + } |
| + EmitInvalidations(object_id_invalidation_map); |
| } |
| void SyncInvalidationListener::StopForTest() { |
| @@ -413,23 +361,12 @@ void SyncInvalidationListener::StopForTest() { |
| Stop(); |
| } |
| -InvalidationStateMap SyncInvalidationListener::GetStateMapForTest() const { |
| - DCHECK(CalledOnValidThread()); |
| - return invalidation_state_map_; |
| -} |
| - |
| -AckTracker* SyncInvalidationListener::GetAckTrackerForTest() { |
| - return &ack_tracker_; |
| -} |
| - |
| void SyncInvalidationListener::Stop() { |
| DCHECK(CalledOnValidThread()); |
| if (!invalidation_client_) { |
| return; |
| } |
| - ack_tracker_.Clear(); |
| - |
| registration_manager_.reset(); |
| sync_system_resources_.Stop(); |
| invalidation_client_->Stop(); |
| @@ -437,8 +374,6 @@ void SyncInvalidationListener::Stop() { |
| invalidation_client_.reset(); |
| delegate_ = NULL; |
| - invalidation_state_tracker_.Reset(); |
| - invalidation_state_map_.clear(); |
| ticl_state_ = DEFAULT_INVALIDATION_ERROR; |
| push_client_state_ = DEFAULT_INVALIDATION_ERROR; |
| } |
| @@ -466,6 +401,11 @@ void SyncInvalidationListener::EmitStateChange() { |
| delegate_->OnInvalidatorStateChange(GetState()); |
| } |
| +WeakHandle<AckHandler> SyncInvalidationListener::GetThisAsAckHandler() { |
| + DCHECK(CalledOnValidThread()); |
| + return WeakHandle<AckHandler>(weak_ptr_factory_.GetWeakPtr()); |
| +} |
| + |
| void SyncInvalidationListener::OnNotificationsEnabled() { |
| DCHECK(CalledOnValidThread()); |
| push_client_state_ = INVALIDATIONS_ENABLED; |