Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(10)

Unified Diff: sync/notifier/sync_invalidation_listener.cc

Issue 23754021: Invalidation trickles mega-patch (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 7 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « sync/notifier/sync_invalidation_listener.h ('k') | sync/notifier/sync_invalidation_listener_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: sync/notifier/sync_invalidation_listener.cc
diff --git a/sync/notifier/sync_invalidation_listener.cc b/sync/notifier/sync_invalidation_listener.cc
index ed07f2070b7fdc4398758deacd2565c5d69f27ad..32d7f97b4b8e909a577059e14b2c6712c1065a2c 100644
--- a/sync/notifier/sync_invalidation_listener.cc
+++ b/sync/notifier/sync_invalidation_listener.cc
@@ -16,6 +16,7 @@
#include "google/cacheinvalidation/types.pb.h"
#include "jingle/notifier/listener/push_client.h"
#include "sync/notifier/invalidation_util.h"
+#include "sync/notifier/object_id_invalidation_map.h"
#include "sync/notifier/registration_manager.h"
namespace {
@@ -26,13 +27,14 @@ const char kApplicationName[] = "chrome-sync";
namespace syncer {
+// static
+const size_t SyncInvalidationListener::kMaxBufferedInvalidations = 5;
+
SyncInvalidationListener::Delegate::~Delegate() {}
SyncInvalidationListener::SyncInvalidationListener(
- base::TickClock* tick_clock,
scoped_ptr<notifier::PushClient> push_client)
: weak_ptr_factory_(this),
- ack_tracker_(tick_clock, this),
push_client_(push_client.get()),
sync_system_resources_(push_client.Pass(), this),
delegate_(NULL),
@@ -54,7 +56,7 @@ void SyncInvalidationListener::Start(
create_invalidation_client_callback,
const std::string& client_id, const std::string& client_info,
const std::string& invalidation_bootstrap_data,
- const InvalidationStateMap& initial_invalidation_state_map,
+ const UnackedInvalidationStorageMap& initial_unacked_invalidations,
const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker,
Delegate* delegate) {
DCHECK(CalledOnValidThread());
@@ -69,18 +71,7 @@ void SyncInvalidationListener::Start(
sync_system_resources_.storage()->SetInitialState(
invalidation_bootstrap_data);
- invalidation_state_map_ = initial_invalidation_state_map;
- if (invalidation_state_map_.empty()) {
- DVLOG(2) << "No initial max invalidation versions for any id";
- } else {
- for (InvalidationStateMap::const_iterator it =
- invalidation_state_map_.begin();
- it != invalidation_state_map_.end(); ++it) {
- DVLOG(2) << "Initial max invalidation version for "
- << ObjectIdToString(it->first) << " is "
- << it->second.version;
- }
- }
+ unacked_invalidations_map_ = initial_unacked_invalidations;
invalidation_state_tracker_ = invalidation_state_tracker;
DCHECK(invalidation_state_tracker_.IsInitialized());
@@ -101,19 +92,6 @@ void SyncInvalidationListener::Start(
registration_manager_.reset(
new RegistrationManager(invalidation_client_.get()));
-
- // Set up reminders for any invalidations that have not been locally
- // acknowledged.
- ObjectIdSet unacknowledged_ids;
- for (InvalidationStateMap::const_iterator it =
- invalidation_state_map_.begin();
- it != invalidation_state_map_.end(); ++it) {
- if (it->second.expected.Equals(it->second.current))
- continue;
- unacknowledged_ids.insert(it->first);
- }
- if (!unacknowledged_ids.empty())
- ack_tracker_.Track(unacknowledged_ids);
}
void SyncInvalidationListener::UpdateCredentials(
@@ -133,27 +111,6 @@ void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) {
}
}
-void SyncInvalidationListener::Acknowledge(const invalidation::ObjectId& id,
- const AckHandle& ack_handle) {
- DCHECK(CalledOnValidThread());
- InvalidationStateMap::iterator state_it = invalidation_state_map_.find(id);
- if (state_it == invalidation_state_map_.end())
- return;
- invalidation_state_tracker_.Call(
- FROM_HERE,
- &InvalidationStateTracker::Acknowledge,
- id,
- ack_handle);
- state_it->second.current = ack_handle;
- if (state_it->second.expected.Equals(ack_handle)) {
- // If the received ack matches the expected ack, then we no longer need to
- // keep track of |id| since it is up-to-date.
- ObjectIdSet ids;
- ids.insert(id);
- ack_tracker_.Ack(ids);
- }
-}
-
void SyncInvalidationListener::Ready(
invalidation::InvalidationClient* client) {
DCHECK(CalledOnValidThread());
@@ -169,43 +126,24 @@ void SyncInvalidationListener::Invalidate(
const invalidation::AckHandle& ack_handle) {
DCHECK(CalledOnValidThread());
DCHECK_EQ(client, invalidation_client_.get());
- DVLOG(1) << "Invalidate: " << InvalidationToString(invalidation);
+ client->Acknowledge(ack_handle);
const invalidation::ObjectId& id = invalidation.object_id();
- // The invalidation API spec allows for the possibility of redundant
- // invalidations, so keep track of the max versions and drop
- // invalidations with old versions.
- //
- // TODO(akalin): Now that we keep track of registered ids, we
- // should drop invalidations for unregistered ids. We may also
- // have to filter it at a higher level, as invalidations for
- // newly-unregistered ids may already be in flight.
- InvalidationStateMap::const_iterator it = invalidation_state_map_.find(id);
- if ((it != invalidation_state_map_.end()) &&
- (invalidation.version() <= it->second.version)) {
- // Drop redundant invalidations.
- client->Acknowledge(ack_handle);
- return;
- }
-
std::string payload;
// payload() CHECK()'s has_payload(), so we must check it ourselves first.
if (invalidation.has_payload())
payload = invalidation.payload();
- DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id)
- << " to " << invalidation.version();
- invalidation_state_map_[id].version = invalidation.version();
- invalidation_state_map_[id].payload = payload;
- invalidation_state_tracker_.Call(
- FROM_HERE,
- &InvalidationStateTracker::SetMaxVersionAndPayload,
- id, invalidation.version(), payload);
+ DVLOG(2) << "Received invalidation with version " << invalidation.version()
+ << " for " << ObjectIdToString(id);
+
+ ObjectIdInvalidationMap invalidations;
+ Invalidation inv = Invalidation::Init(id, invalidation.version(), payload);
+ inv.SetAckHandler(GetThisAsAckHandler());
+ invalidations.Insert(inv);
- ObjectIdSet ids;
- ids.insert(id);
- PrepareInvalidation(ids, invalidation.version(), payload, client, ack_handle);
+ DispatchInvalidations(invalidations);
}
void SyncInvalidationListener::InvalidateUnknownVersion(
@@ -215,15 +153,14 @@ void SyncInvalidationListener::InvalidateUnknownVersion(
DCHECK(CalledOnValidThread());
DCHECK_EQ(client, invalidation_client_.get());
DVLOG(1) << "InvalidateUnknownVersion";
+ client->Acknowledge(ack_handle);
+
+ ObjectIdInvalidationMap invalidations;
+ Invalidation unknown_version = Invalidation::InitUnknownVersion(object_id);
+ unknown_version.SetAckHandler(GetThisAsAckHandler());
+ invalidations.Insert(unknown_version);
- ObjectIdSet ids;
- ids.insert(object_id);
- PrepareInvalidation(
- ids,
- Invalidation::kUnknownVersion,
- std::string(),
- client,
- ack_handle);
+ DispatchInvalidations(invalidations);
}
// This should behave as if we got an invalidation with version
@@ -234,71 +171,55 @@ void SyncInvalidationListener::InvalidateAll(
DCHECK(CalledOnValidThread());
DCHECK_EQ(client, invalidation_client_.get());
DVLOG(1) << "InvalidateAll";
+ client->Acknowledge(ack_handle);
+
+ ObjectIdInvalidationMap invalidations;
+ for (ObjectIdSet::iterator it = registered_ids_.begin();
+ it != registered_ids_.end(); ++it) {
+ Invalidation unknown_version = Invalidation::InitUnknownVersion(*it);
+ unknown_version.SetAckHandler(GetThisAsAckHandler());
+ invalidations.Insert(unknown_version);
+ }
- PrepareInvalidation(
- registered_ids_,
- Invalidation::kUnknownVersion,
- std::string(),
- client,
- ack_handle);
+ DispatchInvalidations(invalidations);
}
-void SyncInvalidationListener::PrepareInvalidation(
- const ObjectIdSet& ids,
- int64 version,
- const std::string& payload,
- invalidation::InvalidationClient* client,
- const invalidation::AckHandle& ack_handle) {
+// If a handler is registered, emit right away. Otherwise, save it for later.
+void SyncInvalidationListener::DispatchInvalidations(
+ const ObjectIdInvalidationMap& invalidations) {
DCHECK(CalledOnValidThread());
- // A server invalidation resets the local retry count.
- ack_tracker_.Ack(ids);
- invalidation_state_tracker_.Call(
- FROM_HERE,
- &InvalidationStateTracker::GenerateAckHandles,
- ids,
- base::MessageLoopProxy::current(),
- base::Bind(&SyncInvalidationListener::EmitInvalidation,
- weak_ptr_factory_.GetWeakPtr(),
- ids,
- version,
- payload,
- client,
- ack_handle));
-}
+ ObjectIdInvalidationMap to_save = invalidations;
+ ObjectIdInvalidationMap to_emit = invalidations.WithObjects(registered_ids_);
-void SyncInvalidationListener::EmitInvalidation(
- const ObjectIdSet& ids,
- int64 version,
- const std::string& payload,
- invalidation::InvalidationClient* client,
- const invalidation::AckHandle& ack_handle,
- const AckHandleMap& local_ack_handles) {
- DCHECK(CalledOnValidThread());
- ObjectIdInvalidationMap invalidation_map =
- ObjectIdSetToInvalidationMap(ids, version, payload);
- for (AckHandleMap::const_iterator it = local_ack_handles.begin();
- it != local_ack_handles.end(); ++it) {
- // Update in-memory copy of the invalidation state.
- invalidation_state_map_[it->first].expected = it->second;
- invalidation_map[it->first].ack_handle = it->second;
- }
- ack_tracker_.Track(ids);
- delegate_->OnInvalidate(invalidation_map);
- client->Acknowledge(ack_handle);
+ SaveInvalidations(to_save);
+ EmitInvalidations(to_emit);
}
-void SyncInvalidationListener::OnTimeout(const ObjectIdSet& ids) {
- ObjectIdInvalidationMap invalidation_map;
- for (ObjectIdSet::const_iterator it = ids.begin(); it != ids.end(); ++it) {
- Invalidation invalidation;
- invalidation.ack_handle = invalidation_state_map_[*it].expected;
- invalidation.version = invalidation_state_map_[*it].version;
- invalidation.payload = invalidation_state_map_[*it].payload;
- invalidation_map.insert(std::make_pair(*it, invalidation));
+void SyncInvalidationListener::SaveInvalidations(
+ const ObjectIdInvalidationMap& to_save) {
+ ObjectIdSet objects_to_save = to_save.GetObjectIds();
+ for (ObjectIdSet::iterator it = objects_to_save.begin();
+ it != objects_to_save.end(); ++it) {
+ UnackedInvalidationStorageMap::iterator lookup =
+ unacked_invalidations_map_.find(*it);
+ if (lookup == unacked_invalidations_map_.end()) {
+ lookup = unacked_invalidations_map_.insert(
+ std::make_pair(*it, UnackedInvalidationStorage(*it))).first;
+ }
+ lookup->second.RecordInvalidations(to_save.ForObject(*it));
}
- delegate_->OnInvalidate(invalidation_map);
+ invalidation_state_tracker_.Call(
+ FROM_HERE,
+ &InvalidationStateTracker::SetSavedInvalidations,
+ unacked_invalidations_map_);
+}
+
+void SyncInvalidationListener::EmitInvalidations(
+ const ObjectIdInvalidationMap& to_emit) {
+ DVLOG(2) << "Emitting invalidations: " << to_emit.ToString();
+ delegate_->OnInvalidate(to_emit);
}
void SyncInvalidationListener::InformRegistrationStatus(
@@ -371,6 +292,38 @@ void SyncInvalidationListener::InformError(
EmitStateChange();
}
+void SyncInvalidationListener::Acknowledge(
+ const invalidation::ObjectId& id,
+ const syncer::AckHandle& handle) {
+ UnackedInvalidationStorageMap::iterator lookup =
+ unacked_invalidations_map_.find(id);
+ if (lookup == unacked_invalidations_map_.end()) {
+ DLOG(WARNING) << "Received acknowledgement for untracked object ID";
+ return;
+ }
+ lookup->second.Acknowledge(handle);
+ invalidation_state_tracker_.Call(
+ FROM_HERE,
+ &InvalidationStateTracker::SetSavedInvalidations,
+ unacked_invalidations_map_);
+}
+
+void SyncInvalidationListener::Drop(
+ const invalidation::ObjectId& id,
+ const syncer::AckHandle& handle) {
+ UnackedInvalidationStorageMap::iterator lookup =
+ unacked_invalidations_map_.find(id);
+ if (lookup == unacked_invalidations_map_.end()) {
+ DLOG(WARNING) << "Received drop for untracked object ID";
+ return;
+ }
+ lookup->second.Drop(handle);
+ invalidation_state_tracker_.Call(
+ FROM_HERE,
+ &InvalidationStateTracker::SetSavedInvalidations,
+ unacked_invalidations_map_);
+}
+
void SyncInvalidationListener::WriteState(const std::string& state) {
DCHECK(CalledOnValidThread());
DVLOG(1) << "WriteState";
@@ -382,13 +335,27 @@ void SyncInvalidationListener::DoRegistrationUpdate() {
DCHECK(CalledOnValidThread());
const ObjectIdSet& unregistered_ids =
registration_manager_->UpdateRegisteredIds(registered_ids_);
- for (ObjectIdSet::const_iterator it = unregistered_ids.begin();
+ for (ObjectIdSet::iterator it = unregistered_ids.begin();
it != unregistered_ids.end(); ++it) {
- invalidation_state_map_.erase(*it);
+ unacked_invalidations_map_.erase(*it);
}
invalidation_state_tracker_.Call(
- FROM_HERE, &InvalidationStateTracker::Forget, unregistered_ids);
- ack_tracker_.Ack(unregistered_ids);
+ FROM_HERE,
+ &InvalidationStateTracker::SetSavedInvalidations,
+ unacked_invalidations_map_);
+
+ ObjectIdInvalidationMap object_id_invalidation_map;
+ for (UnackedInvalidationStorageMap::iterator map_it =
+ unacked_invalidations_map_.begin();
+ map_it != unacked_invalidations_map_.end(); ++map_it) {
+ if (registered_ids_.find(map_it->first) == registered_ids_.end()) {
+ continue;
+ }
+ map_it->second.UnpackRecordedInvalidations(
+ &object_id_invalidation_map,
+ GetThisAsAckHandler());
+ }
+ EmitInvalidations(object_id_invalidation_map);
}
void SyncInvalidationListener::StopForTest() {
@@ -396,23 +363,12 @@ void SyncInvalidationListener::StopForTest() {
Stop();
}
-InvalidationStateMap SyncInvalidationListener::GetStateMapForTest() const {
- DCHECK(CalledOnValidThread());
- return invalidation_state_map_;
-}
-
-AckTracker* SyncInvalidationListener::GetAckTrackerForTest() {
- return &ack_tracker_;
-}
-
void SyncInvalidationListener::Stop() {
DCHECK(CalledOnValidThread());
if (!invalidation_client_) {
return;
}
- ack_tracker_.Clear();
-
registration_manager_.reset();
sync_system_resources_.Stop();
invalidation_client_->Stop();
@@ -420,8 +376,6 @@ void SyncInvalidationListener::Stop() {
invalidation_client_.reset();
delegate_ = NULL;
- invalidation_state_tracker_.Reset();
- invalidation_state_map_.clear();
ticl_state_ = DEFAULT_INVALIDATION_ERROR;
push_client_state_ = DEFAULT_INVALIDATION_ERROR;
}
@@ -449,6 +403,11 @@ void SyncInvalidationListener::EmitStateChange() {
delegate_->OnInvalidatorStateChange(GetState());
}
+WeakHandle<AckHandler> SyncInvalidationListener::GetThisAsAckHandler() {
+ DCHECK(CalledOnValidThread());
+ return WeakHandle<AckHandler>(weak_ptr_factory_.GetWeakPtr());
+}
+
void SyncInvalidationListener::OnNotificationsEnabled() {
DCHECK(CalledOnValidThread());
push_client_state_ = INVALIDATIONS_ENABLED;
« no previous file with comments | « sync/notifier/sync_invalidation_listener.h ('k') | sync/notifier/sync_invalidation_listener_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698