Index: sync/notifier/sync_invalidation_listener.cc |
diff --git a/sync/notifier/sync_invalidation_listener.cc b/sync/notifier/sync_invalidation_listener.cc |
index 763cc876bf0e954ca1e8a28c6f78c707215ce4ea..0f2252f1912c73be1da021ddba81224318f34b4c 100644 |
--- a/sync/notifier/sync_invalidation_listener.cc |
+++ b/sync/notifier/sync_invalidation_listener.cc |
@@ -16,14 +16,13 @@ |
#include "google/cacheinvalidation/types.pb.h" |
#include "jingle/notifier/listener/push_client.h" |
#include "sync/notifier/invalidation_util.h" |
+#include "sync/notifier/object_id_invalidation_map.h" |
#include "sync/notifier/registration_manager.h" |
namespace { |
const char kApplicationName[] = "chrome-sync"; |
-static const int64 kUnknownVersion = -1; |
- |
} // namespace |
namespace syncer { |
@@ -31,10 +30,8 @@ namespace syncer { |
SyncInvalidationListener::Delegate::~Delegate() {} |
SyncInvalidationListener::SyncInvalidationListener( |
- base::TickClock* tick_clock, |
scoped_ptr<notifier::PushClient> push_client) |
- : ack_tracker_(tick_clock, this), |
- push_client_(push_client.get()), |
+ : push_client_(push_client.get()), |
akalin
2013/11/21 04:31:36
Should not have .get(). Otherwise, you'd have two
rlarocque
2013/11/21 20:09:27
This hasn't changed in a long time. I think this
|
sync_system_resources_(push_client.Pass(), this), |
delegate_(NULL), |
ticl_state_(DEFAULT_INVALIDATION_ERROR), |
@@ -56,7 +53,7 @@ void SyncInvalidationListener::Start( |
create_invalidation_client_callback, |
const std::string& client_id, const std::string& client_info, |
const std::string& invalidation_bootstrap_data, |
- const InvalidationStateMap& initial_invalidation_state_map, |
+ const UnackedInvalidationsMap& initial_unacked_invalidations, |
const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker, |
Delegate* delegate) { |
DCHECK(CalledOnValidThread()); |
@@ -71,18 +68,7 @@ void SyncInvalidationListener::Start( |
sync_system_resources_.storage()->SetInitialState( |
invalidation_bootstrap_data); |
- invalidation_state_map_ = initial_invalidation_state_map; |
- if (invalidation_state_map_.empty()) { |
- DVLOG(2) << "No initial max invalidation versions for any id"; |
- } else { |
- for (InvalidationStateMap::const_iterator it = |
- invalidation_state_map_.begin(); |
- it != invalidation_state_map_.end(); ++it) { |
- DVLOG(2) << "Initial max invalidation version for " |
- << ObjectIdToString(it->first) << " is " |
- << it->second.version; |
- } |
- } |
+ unacked_invalidations_map_ = initial_unacked_invalidations; |
invalidation_state_tracker_ = invalidation_state_tracker; |
DCHECK(invalidation_state_tracker_.IsInitialized()); |
@@ -103,19 +89,6 @@ void SyncInvalidationListener::Start( |
registration_manager_.reset( |
new RegistrationManager(invalidation_client_.get())); |
- |
- // Set up reminders for any invalidations that have not been locally |
- // acknowledged. |
- ObjectIdSet unacknowledged_ids; |
- for (InvalidationStateMap::const_iterator it = |
- invalidation_state_map_.begin(); |
- it != invalidation_state_map_.end(); ++it) { |
- if (it->second.expected.Equals(it->second.current)) |
- continue; |
- unacknowledged_ids.insert(it->first); |
- } |
- if (!unacknowledged_ids.empty()) |
- ack_tracker_.Track(unacknowledged_ids); |
} |
void SyncInvalidationListener::UpdateCredentials( |
@@ -135,27 +108,6 @@ void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) { |
} |
} |
-void SyncInvalidationListener::Acknowledge(const invalidation::ObjectId& id, |
- const AckHandle& ack_handle) { |
- DCHECK(CalledOnValidThread()); |
- InvalidationStateMap::iterator state_it = invalidation_state_map_.find(id); |
- if (state_it == invalidation_state_map_.end()) |
- return; |
- invalidation_state_tracker_.Call( |
- FROM_HERE, |
- &InvalidationStateTracker::Acknowledge, |
- id, |
- ack_handle); |
- state_it->second.current = ack_handle; |
- if (state_it->second.expected.Equals(ack_handle)) { |
- // If the received ack matches the expected ack, then we no longer need to |
- // keep track of |id| since it is up-to-date. |
- ObjectIdSet ids; |
- ids.insert(id); |
- ack_tracker_.Ack(ids); |
- } |
-} |
- |
void SyncInvalidationListener::Ready( |
invalidation::InvalidationClient* client) { |
DCHECK(CalledOnValidThread()); |
@@ -171,43 +123,24 @@ void SyncInvalidationListener::Invalidate( |
const invalidation::AckHandle& ack_handle) { |
DCHECK(CalledOnValidThread()); |
DCHECK_EQ(client, invalidation_client_.get()); |
- DVLOG(1) << "Invalidate: " << InvalidationToString(invalidation); |
+ client->Acknowledge(ack_handle); |
const invalidation::ObjectId& id = invalidation.object_id(); |
- // The invalidation API spec allows for the possibility of redundant |
- // invalidations, so keep track of the max versions and drop |
- // invalidations with old versions. |
- // |
- // TODO(akalin): Now that we keep track of registered ids, we |
- // should drop invalidations for unregistered ids. We may also |
- // have to filter it at a higher level, as invalidations for |
- // newly-unregistered ids may already be in flight. |
- InvalidationStateMap::const_iterator it = invalidation_state_map_.find(id); |
- if ((it != invalidation_state_map_.end()) && |
- (invalidation.version() <= it->second.version)) { |
- // Drop redundant invalidations. |
- client->Acknowledge(ack_handle); |
- return; |
- } |
- |
std::string payload; |
// payload() CHECK()'s has_payload(), so we must check it ourselves first. |
if (invalidation.has_payload()) |
payload = invalidation.payload(); |
- DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id) |
- << " to " << invalidation.version(); |
- invalidation_state_map_[id].version = invalidation.version(); |
- invalidation_state_map_[id].payload = payload; |
- invalidation_state_tracker_.Call( |
- FROM_HERE, |
- &InvalidationStateTracker::SetMaxVersionAndPayload, |
- id, invalidation.version(), payload); |
+ DVLOG(2) << "Received invalidation with version " << invalidation.version() |
+ << " for " << ObjectIdToString(id); |
+ |
+ ObjectIdInvalidationMap invalidations; |
+ Invalidation inv = Invalidation::Init(id, invalidation.version(), payload); |
+ inv.set_ack_handler(GetThisAsAckHandler()); |
+ invalidations.Insert(inv); |
- ObjectIdSet ids; |
- ids.insert(id); |
- PrepareInvalidation(ids, invalidation.version(), payload, client, ack_handle); |
+ DispatchInvalidations(invalidations); |
} |
void SyncInvalidationListener::InvalidateUnknownVersion( |
@@ -217,15 +150,14 @@ void SyncInvalidationListener::InvalidateUnknownVersion( |
DCHECK(CalledOnValidThread()); |
DCHECK_EQ(client, invalidation_client_.get()); |
DVLOG(1) << "InvalidateUnknownVersion"; |
+ client->Acknowledge(ack_handle); |
+ |
+ ObjectIdInvalidationMap invalidations; |
+ Invalidation unknown_version = Invalidation::InitUnknownVersion(object_id); |
+ unknown_version.set_ack_handler(GetThisAsAckHandler()); |
+ invalidations.Insert(unknown_version); |
- ObjectIdSet ids; |
- ids.insert(object_id); |
- PrepareInvalidation( |
- ids, |
- kUnknownVersion, |
- std::string(), |
- client, |
- ack_handle); |
+ DispatchInvalidations(invalidations); |
} |
// This should behave as if we got an invalidation with version |
@@ -236,86 +168,56 @@ void SyncInvalidationListener::InvalidateAll( |
DCHECK(CalledOnValidThread()); |
DCHECK_EQ(client, invalidation_client_.get()); |
DVLOG(1) << "InvalidateAll"; |
+ client->Acknowledge(ack_handle); |
+ |
+ ObjectIdInvalidationMap invalidations; |
+ for (ObjectIdSet::iterator it = registered_ids_.begin(); |
+ it != registered_ids_.end(); ++it) { |
+ Invalidation unknown_version = Invalidation::InitUnknownVersion(*it); |
+ unknown_version.set_ack_handler(GetThisAsAckHandler()); |
+ invalidations.Insert(unknown_version); |
+ } |
- PrepareInvalidation( |
- registered_ids_, |
- kUnknownVersion, |
- std::string(), |
- client, |
- ack_handle); |
+ DispatchInvalidations(invalidations); |
} |
-void SyncInvalidationListener::PrepareInvalidation( |
- const ObjectIdSet& ids, |
- int64 version, |
- const std::string& payload, |
- invalidation::InvalidationClient* client, |
- const invalidation::AckHandle& ack_handle) { |
+// If a handler is registered, emit right away. Otherwise, save it for later. |
+void SyncInvalidationListener::DispatchInvalidations( |
+ const ObjectIdInvalidationMap& invalidations) { |
DCHECK(CalledOnValidThread()); |
- // A server invalidation resets the local retry count. |
- ack_tracker_.Ack(ids); |
- invalidation_state_tracker_.Call( |
- FROM_HERE, |
- &InvalidationStateTracker::GenerateAckHandles, |
- ids, |
- base::MessageLoopProxy::current(), |
- base::Bind(&SyncInvalidationListener::EmitInvalidation, |
- weak_ptr_factory_.GetWeakPtr(), |
- ids, |
- version, |
- payload, |
- client, |
- ack_handle)); |
-} |
+ ObjectIdInvalidationMap to_save = invalidations; |
+ ObjectIdInvalidationMap to_emit = |
+ invalidations.GetSubsetWithObjectIds(registered_ids_); |
-void SyncInvalidationListener::EmitInvalidation( |
- const ObjectIdSet& ids, |
- int64 version, |
- const std::string& payload, |
- invalidation::InvalidationClient* client, |
- const invalidation::AckHandle& ack_handle, |
- const AckHandleMap& local_ack_handles) { |
- DCHECK(CalledOnValidThread()); |
+ SaveInvalidations(to_save); |
+ EmitInvalidations(to_emit); |
+} |
- ObjectIdInvalidationMap invalidation_map; |
- for (AckHandleMap::const_iterator it = local_ack_handles.begin(); |
- it != local_ack_handles.end(); ++it) { |
- // Update in-memory copy of the invalidation state. |
- invalidation_state_map_[it->first].expected = it->second; |
- |
- if (version == kUnknownVersion) { |
- Invalidation inv = Invalidation::InitUnknownVersion(it->first); |
- inv.set_ack_handle(it->second); |
- invalidation_map.Insert(inv); |
- } else { |
- Invalidation inv = Invalidation::Init(it->first, version, payload); |
- inv.set_ack_handle(it->second); |
- invalidation_map.Insert(inv); |
+void SyncInvalidationListener::SaveInvalidations( |
+ const ObjectIdInvalidationMap& to_save) { |
+ ObjectIdSet objects_to_save = to_save.GetObjectIds(); |
+ for (ObjectIdSet::const_iterator it = objects_to_save.begin(); |
+ it != objects_to_save.end(); ++it) { |
+ UnackedInvalidationsMap::iterator lookup = |
+ unacked_invalidations_map_.find(*it); |
+ if (lookup == unacked_invalidations_map_.end()) { |
+ lookup = unacked_invalidations_map_.insert( |
+ std::make_pair(*it, UnackedInvalidationSet(*it))).first; |
} |
+ lookup->second.AddSet(to_save.ForObject(*it)); |
} |
- ack_tracker_.Track(ids); |
- delegate_->OnInvalidate(invalidation_map); |
- client->Acknowledge(ack_handle); |
+ |
+ invalidation_state_tracker_.Call( |
+ FROM_HERE, |
+ &InvalidationStateTracker::SetSavedInvalidations, |
+ unacked_invalidations_map_); |
} |
-void SyncInvalidationListener::OnTimeout(const ObjectIdSet& ids) { |
- ObjectIdInvalidationMap invalidation_map; |
- for (ObjectIdSet::const_iterator it = ids.begin(); it != ids.end(); ++it) { |
- if (invalidation_state_map_[*it].version == kUnknownVersion) { |
- Invalidation inv = Invalidation::InitUnknownVersion(*it); |
- inv.set_ack_handle(invalidation_state_map_[*it].expected); |
- invalidation_map.Insert(inv); |
- } else { |
- Invalidation inv = Invalidation::Init( |
- *it, |
- invalidation_state_map_[*it].version, |
- invalidation_state_map_[*it].payload); |
- inv.set_ack_handle(invalidation_state_map_[*it].expected); |
- invalidation_map.Insert(inv); |
- } |
- } |
- delegate_->OnInvalidate(invalidation_map); |
+void SyncInvalidationListener::EmitInvalidations( |
+ const ObjectIdInvalidationMap& to_emit) { |
+ DVLOG(2) << "Emitting invalidations: " << to_emit.ToString(); |
+ delegate_->OnInvalidate(to_emit); |
} |
void SyncInvalidationListener::InformRegistrationStatus( |
@@ -388,6 +290,38 @@ void SyncInvalidationListener::InformError( |
EmitStateChange(); |
} |
+void SyncInvalidationListener::Acknowledge( |
+ const invalidation::ObjectId& id, |
+ const syncer::AckHandle& handle) { |
+ UnackedInvalidationsMap::iterator lookup = |
+ unacked_invalidations_map_.find(id); |
+ if (lookup == unacked_invalidations_map_.end()) { |
+ DLOG(WARNING) << "Received acknowledgement for untracked object ID"; |
+ return; |
+ } |
+ lookup->second.Acknowledge(handle); |
+ invalidation_state_tracker_.Call( |
+ FROM_HERE, |
+ &InvalidationStateTracker::SetSavedInvalidations, |
+ unacked_invalidations_map_); |
+} |
+ |
+void SyncInvalidationListener::Drop( |
+ const invalidation::ObjectId& id, |
+ const syncer::AckHandle& handle) { |
+ UnackedInvalidationsMap::iterator lookup = |
+ unacked_invalidations_map_.find(id); |
+ if (lookup == unacked_invalidations_map_.end()) { |
+ DLOG(WARNING) << "Received drop for untracked object ID"; |
+ return; |
+ } |
+ lookup->second.Drop(handle); |
+ invalidation_state_tracker_.Call( |
+ FROM_HERE, |
+ &InvalidationStateTracker::SetSavedInvalidations, |
+ unacked_invalidations_map_); |
+} |
+ |
void SyncInvalidationListener::WriteState(const std::string& state) { |
DCHECK(CalledOnValidThread()); |
DVLOG(1) << "WriteState"; |
@@ -399,13 +333,27 @@ void SyncInvalidationListener::DoRegistrationUpdate() { |
DCHECK(CalledOnValidThread()); |
const ObjectIdSet& unregistered_ids = |
registration_manager_->UpdateRegisteredIds(registered_ids_); |
- for (ObjectIdSet::const_iterator it = unregistered_ids.begin(); |
+ for (ObjectIdSet::iterator it = unregistered_ids.begin(); |
it != unregistered_ids.end(); ++it) { |
- invalidation_state_map_.erase(*it); |
+ unacked_invalidations_map_.erase(*it); |
} |
invalidation_state_tracker_.Call( |
- FROM_HERE, &InvalidationStateTracker::Forget, unregistered_ids); |
- ack_tracker_.Ack(unregistered_ids); |
+ FROM_HERE, |
+ &InvalidationStateTracker::SetSavedInvalidations, |
+ unacked_invalidations_map_); |
+ |
+ ObjectIdInvalidationMap object_id_invalidation_map; |
+ for (UnackedInvalidationsMap::iterator map_it = |
+ unacked_invalidations_map_.begin(); |
+ map_it != unacked_invalidations_map_.end(); ++map_it) { |
+ if (registered_ids_.find(map_it->first) == registered_ids_.end()) { |
+ continue; |
+ } |
+ map_it->second.ExportInvalidations( |
+ GetThisAsAckHandler(), |
+ &object_id_invalidation_map); |
+ } |
+ EmitInvalidations(object_id_invalidation_map); |
} |
void SyncInvalidationListener::StopForTest() { |
@@ -413,23 +361,12 @@ void SyncInvalidationListener::StopForTest() { |
Stop(); |
} |
-InvalidationStateMap SyncInvalidationListener::GetStateMapForTest() const { |
- DCHECK(CalledOnValidThread()); |
- return invalidation_state_map_; |
-} |
- |
-AckTracker* SyncInvalidationListener::GetAckTrackerForTest() { |
- return &ack_tracker_; |
-} |
- |
void SyncInvalidationListener::Stop() { |
DCHECK(CalledOnValidThread()); |
if (!invalidation_client_) { |
return; |
} |
- ack_tracker_.Clear(); |
- |
registration_manager_.reset(); |
sync_system_resources_.Stop(); |
invalidation_client_->Stop(); |
@@ -437,8 +374,6 @@ void SyncInvalidationListener::Stop() { |
invalidation_client_.reset(); |
delegate_ = NULL; |
- invalidation_state_tracker_.Reset(); |
- invalidation_state_map_.clear(); |
ticl_state_ = DEFAULT_INVALIDATION_ERROR; |
push_client_state_ = DEFAULT_INVALIDATION_ERROR; |
} |
@@ -466,6 +401,11 @@ void SyncInvalidationListener::EmitStateChange() { |
delegate_->OnInvalidatorStateChange(GetState()); |
} |
+WeakHandle<AckHandler> SyncInvalidationListener::GetThisAsAckHandler() { |
+ DCHECK(CalledOnValidThread()); |
+ return WeakHandle<AckHandler>(weak_ptr_factory_.GetWeakPtr()); |
+} |
+ |
void SyncInvalidationListener::OnNotificationsEnabled() { |
DCHECK(CalledOnValidThread()); |
push_client_state_ = INVALIDATIONS_ENABLED; |