Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(528)

Unified Diff: sync/notifier/sync_invalidation_listener.cc

Issue 56113003: Implement new invalidations ack tracking system (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Review fixes Created 7 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: sync/notifier/sync_invalidation_listener.cc
diff --git a/sync/notifier/sync_invalidation_listener.cc b/sync/notifier/sync_invalidation_listener.cc
index 763cc876bf0e954ca1e8a28c6f78c707215ce4ea..0f2252f1912c73be1da021ddba81224318f34b4c 100644
--- a/sync/notifier/sync_invalidation_listener.cc
+++ b/sync/notifier/sync_invalidation_listener.cc
@@ -16,14 +16,13 @@
#include "google/cacheinvalidation/types.pb.h"
#include "jingle/notifier/listener/push_client.h"
#include "sync/notifier/invalidation_util.h"
+#include "sync/notifier/object_id_invalidation_map.h"
#include "sync/notifier/registration_manager.h"
namespace {
const char kApplicationName[] = "chrome-sync";
-static const int64 kUnknownVersion = -1;
-
} // namespace
namespace syncer {
@@ -31,10 +30,8 @@ namespace syncer {
SyncInvalidationListener::Delegate::~Delegate() {}
SyncInvalidationListener::SyncInvalidationListener(
- base::TickClock* tick_clock,
scoped_ptr<notifier::PushClient> push_client)
- : ack_tracker_(tick_clock, this),
- push_client_(push_client.get()),
+ : push_client_(push_client.get()),
akalin 2013/11/21 04:31:36 Should not have .get(). Otherwise, you'd have two
rlarocque 2013/11/21 20:09:27 This hasn't changed in a long time. I think this
sync_system_resources_(push_client.Pass(), this),
delegate_(NULL),
ticl_state_(DEFAULT_INVALIDATION_ERROR),
@@ -56,7 +53,7 @@ void SyncInvalidationListener::Start(
create_invalidation_client_callback,
const std::string& client_id, const std::string& client_info,
const std::string& invalidation_bootstrap_data,
- const InvalidationStateMap& initial_invalidation_state_map,
+ const UnackedInvalidationsMap& initial_unacked_invalidations,
const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker,
Delegate* delegate) {
DCHECK(CalledOnValidThread());
@@ -71,18 +68,7 @@ void SyncInvalidationListener::Start(
sync_system_resources_.storage()->SetInitialState(
invalidation_bootstrap_data);
- invalidation_state_map_ = initial_invalidation_state_map;
- if (invalidation_state_map_.empty()) {
- DVLOG(2) << "No initial max invalidation versions for any id";
- } else {
- for (InvalidationStateMap::const_iterator it =
- invalidation_state_map_.begin();
- it != invalidation_state_map_.end(); ++it) {
- DVLOG(2) << "Initial max invalidation version for "
- << ObjectIdToString(it->first) << " is "
- << it->second.version;
- }
- }
+ unacked_invalidations_map_ = initial_unacked_invalidations;
invalidation_state_tracker_ = invalidation_state_tracker;
DCHECK(invalidation_state_tracker_.IsInitialized());
@@ -103,19 +89,6 @@ void SyncInvalidationListener::Start(
registration_manager_.reset(
new RegistrationManager(invalidation_client_.get()));
-
- // Set up reminders for any invalidations that have not been locally
- // acknowledged.
- ObjectIdSet unacknowledged_ids;
- for (InvalidationStateMap::const_iterator it =
- invalidation_state_map_.begin();
- it != invalidation_state_map_.end(); ++it) {
- if (it->second.expected.Equals(it->second.current))
- continue;
- unacknowledged_ids.insert(it->first);
- }
- if (!unacknowledged_ids.empty())
- ack_tracker_.Track(unacknowledged_ids);
}
void SyncInvalidationListener::UpdateCredentials(
@@ -135,27 +108,6 @@ void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) {
}
}
-void SyncInvalidationListener::Acknowledge(const invalidation::ObjectId& id,
- const AckHandle& ack_handle) {
- DCHECK(CalledOnValidThread());
- InvalidationStateMap::iterator state_it = invalidation_state_map_.find(id);
- if (state_it == invalidation_state_map_.end())
- return;
- invalidation_state_tracker_.Call(
- FROM_HERE,
- &InvalidationStateTracker::Acknowledge,
- id,
- ack_handle);
- state_it->second.current = ack_handle;
- if (state_it->second.expected.Equals(ack_handle)) {
- // If the received ack matches the expected ack, then we no longer need to
- // keep track of |id| since it is up-to-date.
- ObjectIdSet ids;
- ids.insert(id);
- ack_tracker_.Ack(ids);
- }
-}
-
void SyncInvalidationListener::Ready(
invalidation::InvalidationClient* client) {
DCHECK(CalledOnValidThread());
@@ -171,43 +123,24 @@ void SyncInvalidationListener::Invalidate(
const invalidation::AckHandle& ack_handle) {
DCHECK(CalledOnValidThread());
DCHECK_EQ(client, invalidation_client_.get());
- DVLOG(1) << "Invalidate: " << InvalidationToString(invalidation);
+ client->Acknowledge(ack_handle);
const invalidation::ObjectId& id = invalidation.object_id();
- // The invalidation API spec allows for the possibility of redundant
- // invalidations, so keep track of the max versions and drop
- // invalidations with old versions.
- //
- // TODO(akalin): Now that we keep track of registered ids, we
- // should drop invalidations for unregistered ids. We may also
- // have to filter it at a higher level, as invalidations for
- // newly-unregistered ids may already be in flight.
- InvalidationStateMap::const_iterator it = invalidation_state_map_.find(id);
- if ((it != invalidation_state_map_.end()) &&
- (invalidation.version() <= it->second.version)) {
- // Drop redundant invalidations.
- client->Acknowledge(ack_handle);
- return;
- }
-
std::string payload;
// payload() CHECK()'s has_payload(), so we must check it ourselves first.
if (invalidation.has_payload())
payload = invalidation.payload();
- DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id)
- << " to " << invalidation.version();
- invalidation_state_map_[id].version = invalidation.version();
- invalidation_state_map_[id].payload = payload;
- invalidation_state_tracker_.Call(
- FROM_HERE,
- &InvalidationStateTracker::SetMaxVersionAndPayload,
- id, invalidation.version(), payload);
+ DVLOG(2) << "Received invalidation with version " << invalidation.version()
+ << " for " << ObjectIdToString(id);
+
+ ObjectIdInvalidationMap invalidations;
+ Invalidation inv = Invalidation::Init(id, invalidation.version(), payload);
+ inv.set_ack_handler(GetThisAsAckHandler());
+ invalidations.Insert(inv);
- ObjectIdSet ids;
- ids.insert(id);
- PrepareInvalidation(ids, invalidation.version(), payload, client, ack_handle);
+ DispatchInvalidations(invalidations);
}
void SyncInvalidationListener::InvalidateUnknownVersion(
@@ -217,15 +150,14 @@ void SyncInvalidationListener::InvalidateUnknownVersion(
DCHECK(CalledOnValidThread());
DCHECK_EQ(client, invalidation_client_.get());
DVLOG(1) << "InvalidateUnknownVersion";
+ client->Acknowledge(ack_handle);
+
+ ObjectIdInvalidationMap invalidations;
+ Invalidation unknown_version = Invalidation::InitUnknownVersion(object_id);
+ unknown_version.set_ack_handler(GetThisAsAckHandler());
+ invalidations.Insert(unknown_version);
- ObjectIdSet ids;
- ids.insert(object_id);
- PrepareInvalidation(
- ids,
- kUnknownVersion,
- std::string(),
- client,
- ack_handle);
+ DispatchInvalidations(invalidations);
}
// This should behave as if we got an invalidation with version
@@ -236,86 +168,56 @@ void SyncInvalidationListener::InvalidateAll(
DCHECK(CalledOnValidThread());
DCHECK_EQ(client, invalidation_client_.get());
DVLOG(1) << "InvalidateAll";
+ client->Acknowledge(ack_handle);
+
+ ObjectIdInvalidationMap invalidations;
+ for (ObjectIdSet::iterator it = registered_ids_.begin();
+ it != registered_ids_.end(); ++it) {
+ Invalidation unknown_version = Invalidation::InitUnknownVersion(*it);
+ unknown_version.set_ack_handler(GetThisAsAckHandler());
+ invalidations.Insert(unknown_version);
+ }
- PrepareInvalidation(
- registered_ids_,
- kUnknownVersion,
- std::string(),
- client,
- ack_handle);
+ DispatchInvalidations(invalidations);
}
-void SyncInvalidationListener::PrepareInvalidation(
- const ObjectIdSet& ids,
- int64 version,
- const std::string& payload,
- invalidation::InvalidationClient* client,
- const invalidation::AckHandle& ack_handle) {
+// If a handler is registered, emit right away. Otherwise, save it for later.
+void SyncInvalidationListener::DispatchInvalidations(
+ const ObjectIdInvalidationMap& invalidations) {
DCHECK(CalledOnValidThread());
- // A server invalidation resets the local retry count.
- ack_tracker_.Ack(ids);
- invalidation_state_tracker_.Call(
- FROM_HERE,
- &InvalidationStateTracker::GenerateAckHandles,
- ids,
- base::MessageLoopProxy::current(),
- base::Bind(&SyncInvalidationListener::EmitInvalidation,
- weak_ptr_factory_.GetWeakPtr(),
- ids,
- version,
- payload,
- client,
- ack_handle));
-}
+ ObjectIdInvalidationMap to_save = invalidations;
+ ObjectIdInvalidationMap to_emit =
+ invalidations.GetSubsetWithObjectIds(registered_ids_);
-void SyncInvalidationListener::EmitInvalidation(
- const ObjectIdSet& ids,
- int64 version,
- const std::string& payload,
- invalidation::InvalidationClient* client,
- const invalidation::AckHandle& ack_handle,
- const AckHandleMap& local_ack_handles) {
- DCHECK(CalledOnValidThread());
+ SaveInvalidations(to_save);
+ EmitInvalidations(to_emit);
+}
- ObjectIdInvalidationMap invalidation_map;
- for (AckHandleMap::const_iterator it = local_ack_handles.begin();
- it != local_ack_handles.end(); ++it) {
- // Update in-memory copy of the invalidation state.
- invalidation_state_map_[it->first].expected = it->second;
-
- if (version == kUnknownVersion) {
- Invalidation inv = Invalidation::InitUnknownVersion(it->first);
- inv.set_ack_handle(it->second);
- invalidation_map.Insert(inv);
- } else {
- Invalidation inv = Invalidation::Init(it->first, version, payload);
- inv.set_ack_handle(it->second);
- invalidation_map.Insert(inv);
+void SyncInvalidationListener::SaveInvalidations(
+ const ObjectIdInvalidationMap& to_save) {
+ ObjectIdSet objects_to_save = to_save.GetObjectIds();
+ for (ObjectIdSet::const_iterator it = objects_to_save.begin();
+ it != objects_to_save.end(); ++it) {
+ UnackedInvalidationsMap::iterator lookup =
+ unacked_invalidations_map_.find(*it);
+ if (lookup == unacked_invalidations_map_.end()) {
+ lookup = unacked_invalidations_map_.insert(
+ std::make_pair(*it, UnackedInvalidationSet(*it))).first;
}
+ lookup->second.AddSet(to_save.ForObject(*it));
}
- ack_tracker_.Track(ids);
- delegate_->OnInvalidate(invalidation_map);
- client->Acknowledge(ack_handle);
+
+ invalidation_state_tracker_.Call(
+ FROM_HERE,
+ &InvalidationStateTracker::SetSavedInvalidations,
+ unacked_invalidations_map_);
}
-void SyncInvalidationListener::OnTimeout(const ObjectIdSet& ids) {
- ObjectIdInvalidationMap invalidation_map;
- for (ObjectIdSet::const_iterator it = ids.begin(); it != ids.end(); ++it) {
- if (invalidation_state_map_[*it].version == kUnknownVersion) {
- Invalidation inv = Invalidation::InitUnknownVersion(*it);
- inv.set_ack_handle(invalidation_state_map_[*it].expected);
- invalidation_map.Insert(inv);
- } else {
- Invalidation inv = Invalidation::Init(
- *it,
- invalidation_state_map_[*it].version,
- invalidation_state_map_[*it].payload);
- inv.set_ack_handle(invalidation_state_map_[*it].expected);
- invalidation_map.Insert(inv);
- }
- }
- delegate_->OnInvalidate(invalidation_map);
+void SyncInvalidationListener::EmitInvalidations(
+ const ObjectIdInvalidationMap& to_emit) {
+ DVLOG(2) << "Emitting invalidations: " << to_emit.ToString();
+ delegate_->OnInvalidate(to_emit);
}
void SyncInvalidationListener::InformRegistrationStatus(
@@ -388,6 +290,38 @@ void SyncInvalidationListener::InformError(
EmitStateChange();
}
+void SyncInvalidationListener::Acknowledge(
+ const invalidation::ObjectId& id,
+ const syncer::AckHandle& handle) {
+ UnackedInvalidationsMap::iterator lookup =
+ unacked_invalidations_map_.find(id);
+ if (lookup == unacked_invalidations_map_.end()) {
+ DLOG(WARNING) << "Received acknowledgement for untracked object ID";
+ return;
+ }
+ lookup->second.Acknowledge(handle);
+ invalidation_state_tracker_.Call(
+ FROM_HERE,
+ &InvalidationStateTracker::SetSavedInvalidations,
+ unacked_invalidations_map_);
+}
+
+void SyncInvalidationListener::Drop(
+ const invalidation::ObjectId& id,
+ const syncer::AckHandle& handle) {
+ UnackedInvalidationsMap::iterator lookup =
+ unacked_invalidations_map_.find(id);
+ if (lookup == unacked_invalidations_map_.end()) {
+ DLOG(WARNING) << "Received drop for untracked object ID";
+ return;
+ }
+ lookup->second.Drop(handle);
+ invalidation_state_tracker_.Call(
+ FROM_HERE,
+ &InvalidationStateTracker::SetSavedInvalidations,
+ unacked_invalidations_map_);
+}
+
void SyncInvalidationListener::WriteState(const std::string& state) {
DCHECK(CalledOnValidThread());
DVLOG(1) << "WriteState";
@@ -399,13 +333,27 @@ void SyncInvalidationListener::DoRegistrationUpdate() {
DCHECK(CalledOnValidThread());
const ObjectIdSet& unregistered_ids =
registration_manager_->UpdateRegisteredIds(registered_ids_);
- for (ObjectIdSet::const_iterator it = unregistered_ids.begin();
+ for (ObjectIdSet::iterator it = unregistered_ids.begin();
it != unregistered_ids.end(); ++it) {
- invalidation_state_map_.erase(*it);
+ unacked_invalidations_map_.erase(*it);
}
invalidation_state_tracker_.Call(
- FROM_HERE, &InvalidationStateTracker::Forget, unregistered_ids);
- ack_tracker_.Ack(unregistered_ids);
+ FROM_HERE,
+ &InvalidationStateTracker::SetSavedInvalidations,
+ unacked_invalidations_map_);
+
+ ObjectIdInvalidationMap object_id_invalidation_map;
+ for (UnackedInvalidationsMap::iterator map_it =
+ unacked_invalidations_map_.begin();
+ map_it != unacked_invalidations_map_.end(); ++map_it) {
+ if (registered_ids_.find(map_it->first) == registered_ids_.end()) {
+ continue;
+ }
+ map_it->second.ExportInvalidations(
+ GetThisAsAckHandler(),
+ &object_id_invalidation_map);
+ }
+ EmitInvalidations(object_id_invalidation_map);
}
void SyncInvalidationListener::StopForTest() {
@@ -413,23 +361,12 @@ void SyncInvalidationListener::StopForTest() {
Stop();
}
-InvalidationStateMap SyncInvalidationListener::GetStateMapForTest() const {
- DCHECK(CalledOnValidThread());
- return invalidation_state_map_;
-}
-
-AckTracker* SyncInvalidationListener::GetAckTrackerForTest() {
- return &ack_tracker_;
-}
-
void SyncInvalidationListener::Stop() {
DCHECK(CalledOnValidThread());
if (!invalidation_client_) {
return;
}
- ack_tracker_.Clear();
-
registration_manager_.reset();
sync_system_resources_.Stop();
invalidation_client_->Stop();
@@ -437,8 +374,6 @@ void SyncInvalidationListener::Stop() {
invalidation_client_.reset();
delegate_ = NULL;
- invalidation_state_tracker_.Reset();
- invalidation_state_map_.clear();
ticl_state_ = DEFAULT_INVALIDATION_ERROR;
push_client_state_ = DEFAULT_INVALIDATION_ERROR;
}
@@ -466,6 +401,11 @@ void SyncInvalidationListener::EmitStateChange() {
delegate_->OnInvalidatorStateChange(GetState());
}
+WeakHandle<AckHandler> SyncInvalidationListener::GetThisAsAckHandler() {
+ DCHECK(CalledOnValidThread());
+ return WeakHandle<AckHandler>(weak_ptr_factory_.GetWeakPtr());
+}
+
void SyncInvalidationListener::OnNotificationsEnabled() {
DCHECK(CalledOnValidThread());
push_client_state_ = INVALIDATIONS_ENABLED;

Powered by Google App Engine
This is Rietveld 408576698