Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "sync/notifier/sync_invalidation_listener.h" | 5 #include "sync/notifier/sync_invalidation_listener.h" |
| 6 | 6 |
| 7 #include <vector> | 7 #include <vector> |
| 8 | 8 |
| 9 #include "base/bind.h" | 9 #include "base/bind.h" |
| 10 #include "base/callback.h" | 10 #include "base/callback.h" |
| 11 #include "base/compiler_specific.h" | 11 #include "base/compiler_specific.h" |
| 12 #include "base/logging.h" | 12 #include "base/logging.h" |
| 13 #include "base/tracked_objects.h" | 13 #include "base/tracked_objects.h" |
| 14 #include "google/cacheinvalidation/include/invalidation-client.h" | 14 #include "google/cacheinvalidation/include/invalidation-client.h" |
| 15 #include "google/cacheinvalidation/include/types.h" | 15 #include "google/cacheinvalidation/include/types.h" |
| 16 #include "google/cacheinvalidation/types.pb.h" | 16 #include "google/cacheinvalidation/types.pb.h" |
| 17 #include "jingle/notifier/listener/push_client.h" | 17 #include "jingle/notifier/listener/push_client.h" |
| 18 #include "sync/notifier/invalidation_util.h" | 18 #include "sync/notifier/invalidation_util.h" |
| 19 #include "sync/notifier/object_id_invalidation_map.h" | |
| 19 #include "sync/notifier/registration_manager.h" | 20 #include "sync/notifier/registration_manager.h" |
| 20 | 21 |
| 21 namespace { | 22 namespace { |
| 22 | 23 |
| 23 const char kApplicationName[] = "chrome-sync"; | 24 const char kApplicationName[] = "chrome-sync"; |
| 24 | 25 |
| 25 static const int64 kUnknownVersion = -1; | |
| 26 | |
| 27 } // namespace | 26 } // namespace |
| 28 | 27 |
| 29 namespace syncer { | 28 namespace syncer { |
| 30 | 29 |
| 31 SyncInvalidationListener::Delegate::~Delegate() {} | 30 SyncInvalidationListener::Delegate::~Delegate() {} |
| 32 | 31 |
| 33 SyncInvalidationListener::SyncInvalidationListener( | 32 SyncInvalidationListener::SyncInvalidationListener( |
| 34 base::TickClock* tick_clock, | |
| 35 scoped_ptr<notifier::PushClient> push_client) | 33 scoped_ptr<notifier::PushClient> push_client) |
| 36 : ack_tracker_(tick_clock, this), | 34 : push_client_(push_client.get()), |
| 37 push_client_(push_client.get()), | |
| 38 sync_system_resources_(push_client.Pass(), this), | 35 sync_system_resources_(push_client.Pass(), this), |
| 39 delegate_(NULL), | 36 delegate_(NULL), |
| 40 ticl_state_(DEFAULT_INVALIDATION_ERROR), | 37 ticl_state_(DEFAULT_INVALIDATION_ERROR), |
| 41 push_client_state_(DEFAULT_INVALIDATION_ERROR), | 38 push_client_state_(DEFAULT_INVALIDATION_ERROR), |
| 42 weak_ptr_factory_(this) { | 39 weak_ptr_factory_(this) { |
| 43 DCHECK(CalledOnValidThread()); | 40 DCHECK(CalledOnValidThread()); |
| 44 push_client_->AddObserver(this); | 41 push_client_->AddObserver(this); |
| 45 } | 42 } |
| 46 | 43 |
| 47 SyncInvalidationListener::~SyncInvalidationListener() { | 44 SyncInvalidationListener::~SyncInvalidationListener() { |
| 48 DCHECK(CalledOnValidThread()); | 45 DCHECK(CalledOnValidThread()); |
| 49 push_client_->RemoveObserver(this); | 46 push_client_->RemoveObserver(this); |
| 50 Stop(); | 47 Stop(); |
| 51 DCHECK(!delegate_); | 48 DCHECK(!delegate_); |
| 52 } | 49 } |
| 53 | 50 |
| 54 void SyncInvalidationListener::Start( | 51 void SyncInvalidationListener::Start( |
| 55 const CreateInvalidationClientCallback& | 52 const CreateInvalidationClientCallback& |
| 56 create_invalidation_client_callback, | 53 create_invalidation_client_callback, |
| 57 const std::string& client_id, const std::string& client_info, | 54 const std::string& client_id, const std::string& client_info, |
| 58 const std::string& invalidation_bootstrap_data, | 55 const std::string& invalidation_bootstrap_data, |
| 59 const InvalidationStateMap& initial_invalidation_state_map, | 56 const UnackedInvalidationsMap& initial_unacked_invalidations, |
| 60 const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker, | 57 const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker, |
| 61 Delegate* delegate) { | 58 Delegate* delegate) { |
| 62 DCHECK(CalledOnValidThread()); | 59 DCHECK(CalledOnValidThread()); |
| 63 Stop(); | 60 Stop(); |
| 64 | 61 |
| 65 sync_system_resources_.set_platform(client_info); | 62 sync_system_resources_.set_platform(client_info); |
| 66 sync_system_resources_.Start(); | 63 sync_system_resources_.Start(); |
| 67 | 64 |
| 68 // The Storage resource is implemented as a write-through cache. We populate | 65 // The Storage resource is implemented as a write-through cache. We populate |
| 69 // it with the initial state on startup, so subsequent writes go to disk and | 66 // it with the initial state on startup, so subsequent writes go to disk and |
| 70 // update the in-memory cache, while reads just return the cached state. | 67 // update the in-memory cache, while reads just return the cached state. |
| 71 sync_system_resources_.storage()->SetInitialState( | 68 sync_system_resources_.storage()->SetInitialState( |
| 72 invalidation_bootstrap_data); | 69 invalidation_bootstrap_data); |
| 73 | 70 |
| 74 invalidation_state_map_ = initial_invalidation_state_map; | 71 unacked_invalidations_map_ = initial_unacked_invalidations; |
| 75 if (invalidation_state_map_.empty()) { | |
| 76 DVLOG(2) << "No initial max invalidation versions for any id"; | |
| 77 } else { | |
| 78 for (InvalidationStateMap::const_iterator it = | |
| 79 invalidation_state_map_.begin(); | |
| 80 it != invalidation_state_map_.end(); ++it) { | |
| 81 DVLOG(2) << "Initial max invalidation version for " | |
| 82 << ObjectIdToString(it->first) << " is " | |
| 83 << it->second.version; | |
| 84 } | |
| 85 } | |
| 86 invalidation_state_tracker_ = invalidation_state_tracker; | 72 invalidation_state_tracker_ = invalidation_state_tracker; |
| 87 DCHECK(invalidation_state_tracker_.IsInitialized()); | 73 DCHECK(invalidation_state_tracker_.IsInitialized()); |
| 88 | 74 |
| 89 DCHECK(!delegate_); | 75 DCHECK(!delegate_); |
| 90 DCHECK(delegate); | 76 DCHECK(delegate); |
| 91 delegate_ = delegate; | 77 delegate_ = delegate; |
| 92 | 78 |
| 93 #if defined(OS_IOS) | 79 #if defined(OS_IOS) |
| 94 int client_type = ipc::invalidation::ClientType::CHROME_SYNC_IOS; | 80 int client_type = ipc::invalidation::ClientType::CHROME_SYNC_IOS; |
| 95 #else | 81 #else |
| 96 int client_type = ipc::invalidation::ClientType::CHROME_SYNC; | 82 int client_type = ipc::invalidation::ClientType::CHROME_SYNC; |
| 97 #endif | 83 #endif |
| 98 invalidation_client_.reset( | 84 invalidation_client_.reset( |
| 99 create_invalidation_client_callback.Run( | 85 create_invalidation_client_callback.Run( |
| 100 &sync_system_resources_, client_type, client_id, | 86 &sync_system_resources_, client_type, client_id, |
| 101 kApplicationName, this)); | 87 kApplicationName, this)); |
| 102 invalidation_client_->Start(); | 88 invalidation_client_->Start(); |
| 103 | 89 |
| 104 registration_manager_.reset( | 90 registration_manager_.reset( |
| 105 new RegistrationManager(invalidation_client_.get())); | 91 new RegistrationManager(invalidation_client_.get())); |
| 106 | |
| 107 // Set up reminders for any invalidations that have not been locally | |
| 108 // acknowledged. | |
| 109 ObjectIdSet unacknowledged_ids; | |
| 110 for (InvalidationStateMap::const_iterator it = | |
| 111 invalidation_state_map_.begin(); | |
| 112 it != invalidation_state_map_.end(); ++it) { | |
| 113 if (it->second.expected.Equals(it->second.current)) | |
| 114 continue; | |
| 115 unacknowledged_ids.insert(it->first); | |
| 116 } | |
| 117 if (!unacknowledged_ids.empty()) | |
| 118 ack_tracker_.Track(unacknowledged_ids); | |
| 119 } | 92 } |
| 120 | 93 |
| 121 void SyncInvalidationListener::UpdateCredentials( | 94 void SyncInvalidationListener::UpdateCredentials( |
| 122 const std::string& email, const std::string& token) { | 95 const std::string& email, const std::string& token) { |
| 123 DCHECK(CalledOnValidThread()); | 96 DCHECK(CalledOnValidThread()); |
| 124 sync_system_resources_.network()->UpdateCredentials(email, token); | 97 sync_system_resources_.network()->UpdateCredentials(email, token); |
| 125 } | 98 } |
| 126 | 99 |
| 127 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) { | 100 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) { |
| 128 DCHECK(CalledOnValidThread()); | 101 DCHECK(CalledOnValidThread()); |
| 129 registered_ids_ = ids; | 102 registered_ids_ = ids; |
| 130 // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a | 103 // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a |
| 131 // working XMPP connection (as observed by us), so check it instead | 104 // working XMPP connection (as observed by us), so check it instead |
| 132 // of GetState() (see http://crbug.com/139424). | 105 // of GetState() (see http://crbug.com/139424). |
| 133 if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_) { | 106 if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_) { |
| 134 DoRegistrationUpdate(); | 107 DoRegistrationUpdate(); |
| 135 } | 108 } |
| 136 } | 109 } |
| 137 | 110 |
| 138 void SyncInvalidationListener::Acknowledge(const invalidation::ObjectId& id, | |
| 139 const AckHandle& ack_handle) { | |
| 140 DCHECK(CalledOnValidThread()); | |
| 141 InvalidationStateMap::iterator state_it = invalidation_state_map_.find(id); | |
| 142 if (state_it == invalidation_state_map_.end()) | |
| 143 return; | |
| 144 invalidation_state_tracker_.Call( | |
| 145 FROM_HERE, | |
| 146 &InvalidationStateTracker::Acknowledge, | |
| 147 id, | |
| 148 ack_handle); | |
| 149 state_it->second.current = ack_handle; | |
| 150 if (state_it->second.expected.Equals(ack_handle)) { | |
| 151 // If the received ack matches the expected ack, then we no longer need to | |
| 152 // keep track of |id| since it is up-to-date. | |
| 153 ObjectIdSet ids; | |
| 154 ids.insert(id); | |
| 155 ack_tracker_.Ack(ids); | |
| 156 } | |
| 157 } | |
| 158 | |
| 159 void SyncInvalidationListener::Ready( | 111 void SyncInvalidationListener::Ready( |
| 160 invalidation::InvalidationClient* client) { | 112 invalidation::InvalidationClient* client) { |
| 161 DCHECK(CalledOnValidThread()); | 113 DCHECK(CalledOnValidThread()); |
| 162 DCHECK_EQ(client, invalidation_client_.get()); | 114 DCHECK_EQ(client, invalidation_client_.get()); |
| 163 ticl_state_ = INVALIDATIONS_ENABLED; | 115 ticl_state_ = INVALIDATIONS_ENABLED; |
| 164 EmitStateChange(); | 116 EmitStateChange(); |
| 165 DoRegistrationUpdate(); | 117 DoRegistrationUpdate(); |
| 166 } | 118 } |
| 167 | 119 |
| 168 void SyncInvalidationListener::Invalidate( | 120 void SyncInvalidationListener::Invalidate( |
| 169 invalidation::InvalidationClient* client, | 121 invalidation::InvalidationClient* client, |
| 170 const invalidation::Invalidation& invalidation, | 122 const invalidation::Invalidation& invalidation, |
| 171 const invalidation::AckHandle& ack_handle) { | 123 const invalidation::AckHandle& ack_handle) { |
| 172 DCHECK(CalledOnValidThread()); | 124 DCHECK(CalledOnValidThread()); |
| 173 DCHECK_EQ(client, invalidation_client_.get()); | 125 DCHECK_EQ(client, invalidation_client_.get()); |
| 174 DVLOG(1) << "Invalidate: " << InvalidationToString(invalidation); | 126 client->Acknowledge(ack_handle); |
| 175 | 127 |
| 176 const invalidation::ObjectId& id = invalidation.object_id(); | 128 const invalidation::ObjectId& id = invalidation.object_id(); |
| 177 | 129 |
| 178 // The invalidation API spec allows for the possibility of redundant | |
| 179 // invalidations, so keep track of the max versions and drop | |
| 180 // invalidations with old versions. | |
| 181 // | |
| 182 // TODO(akalin): Now that we keep track of registered ids, we | |
| 183 // should drop invalidations for unregistered ids. We may also | |
| 184 // have to filter it at a higher level, as invalidations for | |
| 185 // newly-unregistered ids may already be in flight. | |
| 186 InvalidationStateMap::const_iterator it = invalidation_state_map_.find(id); | |
| 187 if ((it != invalidation_state_map_.end()) && | |
| 188 (invalidation.version() <= it->second.version)) { | |
| 189 // Drop redundant invalidations. | |
| 190 client->Acknowledge(ack_handle); | |
| 191 return; | |
| 192 } | |
| 193 | |
| 194 std::string payload; | 130 std::string payload; |
| 195 // payload() CHECK()'s has_payload(), so we must check it ourselves first. | 131 // payload() CHECK()'s has_payload(), so we must check it ourselves first. |
| 196 if (invalidation.has_payload()) | 132 if (invalidation.has_payload()) |
| 197 payload = invalidation.payload(); | 133 payload = invalidation.payload(); |
| 198 | 134 |
| 199 DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id) | 135 DVLOG(2) << "Received invalidation with version " << invalidation.version() |
| 200 << " to " << invalidation.version(); | 136 << " for " << ObjectIdToString(id); |
| 201 invalidation_state_map_[id].version = invalidation.version(); | |
| 202 invalidation_state_map_[id].payload = payload; | |
| 203 invalidation_state_tracker_.Call( | |
| 204 FROM_HERE, | |
| 205 &InvalidationStateTracker::SetMaxVersionAndPayload, | |
| 206 id, invalidation.version(), payload); | |
| 207 | 137 |
| 208 ObjectIdSet ids; | 138 ObjectIdInvalidationMap invalidations; |
| 209 ids.insert(id); | 139 Invalidation inv = Invalidation::Init(id, invalidation.version(), payload); |
| 210 PrepareInvalidation(ids, invalidation.version(), payload, client, ack_handle); | 140 inv.set_ack_handler(GetThisAsAckHandler()); |
| 141 invalidations.Insert(inv); | |
| 142 | |
| 143 DispatchInvalidations(invalidations); | |
| 211 } | 144 } |
| 212 | 145 |
| 213 void SyncInvalidationListener::InvalidateUnknownVersion( | 146 void SyncInvalidationListener::InvalidateUnknownVersion( |
| 214 invalidation::InvalidationClient* client, | 147 invalidation::InvalidationClient* client, |
| 215 const invalidation::ObjectId& object_id, | 148 const invalidation::ObjectId& object_id, |
| 216 const invalidation::AckHandle& ack_handle) { | 149 const invalidation::AckHandle& ack_handle) { |
| 217 DCHECK(CalledOnValidThread()); | 150 DCHECK(CalledOnValidThread()); |
| 218 DCHECK_EQ(client, invalidation_client_.get()); | 151 DCHECK_EQ(client, invalidation_client_.get()); |
| 219 DVLOG(1) << "InvalidateUnknownVersion"; | 152 DVLOG(1) << "InvalidateUnknownVersion"; |
| 153 client->Acknowledge(ack_handle); | |
| 220 | 154 |
| 221 ObjectIdSet ids; | 155 ObjectIdInvalidationMap invalidations; |
| 222 ids.insert(object_id); | 156 Invalidation unknown_version = Invalidation::InitUnknownVersion(object_id); |
| 223 PrepareInvalidation( | 157 unknown_version.set_ack_handler(GetThisAsAckHandler()); |
| 224 ids, | 158 invalidations.Insert(unknown_version); |
| 225 kUnknownVersion, | 159 |
| 226 std::string(), | 160 DispatchInvalidations(invalidations); |
| 227 client, | |
| 228 ack_handle); | |
| 229 } | 161 } |
| 230 | 162 |
| 231 // This should behave as if we got an invalidation with version | 163 // This should behave as if we got an invalidation with version |
| 232 // UNKNOWN_OBJECT_VERSION for all known data types. | 164 // UNKNOWN_OBJECT_VERSION for all known data types. |
| 233 void SyncInvalidationListener::InvalidateAll( | 165 void SyncInvalidationListener::InvalidateAll( |
| 234 invalidation::InvalidationClient* client, | 166 invalidation::InvalidationClient* client, |
| 235 const invalidation::AckHandle& ack_handle) { | 167 const invalidation::AckHandle& ack_handle) { |
| 236 DCHECK(CalledOnValidThread()); | 168 DCHECK(CalledOnValidThread()); |
| 237 DCHECK_EQ(client, invalidation_client_.get()); | 169 DCHECK_EQ(client, invalidation_client_.get()); |
| 238 DVLOG(1) << "InvalidateAll"; | 170 DVLOG(1) << "InvalidateAll"; |
| 171 client->Acknowledge(ack_handle); | |
| 239 | 172 |
| 240 PrepareInvalidation( | 173 ObjectIdInvalidationMap invalidations; |
| 241 registered_ids_, | 174 for (ObjectIdSet::iterator it = registered_ids_.begin(); |
| 242 kUnknownVersion, | 175 it != registered_ids_.end(); ++it) { |
| 243 std::string(), | 176 Invalidation unknown_version = Invalidation::InitUnknownVersion(*it); |
| 244 client, | 177 unknown_version.set_ack_handler(GetThisAsAckHandler()); |
| 245 ack_handle); | 178 invalidations.Insert(unknown_version); |
| 179 } | |
| 180 | |
| 181 DispatchInvalidations(invalidations); | |
| 246 } | 182 } |
| 247 | 183 |
| 248 void SyncInvalidationListener::PrepareInvalidation( | 184 // If a handler is registered, emit right away. Otherwise, save it for later. |
| 249 const ObjectIdSet& ids, | 185 void SyncInvalidationListener::DispatchInvalidations( |
| 250 int64 version, | 186 const ObjectIdInvalidationMap& invalidations) { |
| 251 const std::string& payload, | |
| 252 invalidation::InvalidationClient* client, | |
| 253 const invalidation::AckHandle& ack_handle) { | |
| 254 DCHECK(CalledOnValidThread()); | 187 DCHECK(CalledOnValidThread()); |
| 255 | 188 |
| 256 // A server invalidation resets the local retry count. | 189 ObjectIdInvalidationMap to_save = invalidations; |
| 257 ack_tracker_.Ack(ids); | 190 ObjectIdInvalidationMap to_emit = |
| 191 invalidations.GetSubsetWithObjectIds(registered_ids_); | |
| 192 | |
| 193 SaveInvalidations(to_save); | |
| 194 EmitInvalidations(to_emit); | |
| 195 } | |
| 196 | |
| 197 void SyncInvalidationListener::SaveInvalidations( | |
| 198 const ObjectIdInvalidationMap& to_save) { | |
| 199 ObjectIdSet objects_to_save = to_save.GetObjectIds(); | |
| 200 for (ObjectIdSet::iterator it = objects_to_save.begin(); | |
|
tim (not reviewing)
2013/11/18 21:59:05
const_iterator?
rlarocque
2013/11/19 00:35:49
Done.
| |
| 201 it != objects_to_save.end(); ++it) { | |
| 202 UnackedInvalidationsMap::iterator lookup = | |
| 203 unacked_invalidations_map_.find(*it); | |
| 204 if (lookup == unacked_invalidations_map_.end()) { | |
| 205 lookup = unacked_invalidations_map_.insert( | |
| 206 std::make_pair(*it, UnackedInvalidationSet(*it))).first; | |
| 207 } | |
| 208 lookup->second.AddSet(to_save.ForObject(*it)); | |
| 209 } | |
| 210 | |
| 258 invalidation_state_tracker_.Call( | 211 invalidation_state_tracker_.Call( |
|
rlarocque
2013/11/21 20:09:27
What happens if this call doesn't make it through
tim (not reviewing)
2013/11/22 01:24:54
The server won't ping periodicially within the 10s
rlarocque
2013/11/22 21:03:32
Yes, it will send the same invalidation repeatedly
| |
| 259 FROM_HERE, | 212 FROM_HERE, |
| 260 &InvalidationStateTracker::GenerateAckHandles, | 213 &InvalidationStateTracker::SetSavedInvalidations, |
| 261 ids, | 214 unacked_invalidations_map_); |
| 262 base::MessageLoopProxy::current(), | |
| 263 base::Bind(&SyncInvalidationListener::EmitInvalidation, | |
| 264 weak_ptr_factory_.GetWeakPtr(), | |
| 265 ids, | |
| 266 version, | |
| 267 payload, | |
| 268 client, | |
| 269 ack_handle)); | |
| 270 } | 215 } |
| 271 | 216 |
| 272 void SyncInvalidationListener::EmitInvalidation( | 217 void SyncInvalidationListener::EmitInvalidations( |
| 273 const ObjectIdSet& ids, | 218 const ObjectIdInvalidationMap& to_emit) { |
|
tim (not reviewing)
2013/11/20 18:27:04
|to_emit| is a subset of unacked_invalidations_map
rlarocque
2013/11/21 20:09:27
Yep, they should be. For every invalidation we em
| |
| 274 int64 version, | 219 DVLOG(2) << "Emitting invalidations: " << to_emit.ToString(); |
| 275 const std::string& payload, | 220 delegate_->OnInvalidate(to_emit); |
| 276 invalidation::InvalidationClient* client, | |
| 277 const invalidation::AckHandle& ack_handle, | |
| 278 const AckHandleMap& local_ack_handles) { | |
| 279 DCHECK(CalledOnValidThread()); | |
| 280 | |
| 281 ObjectIdInvalidationMap invalidation_map; | |
| 282 for (AckHandleMap::const_iterator it = local_ack_handles.begin(); | |
| 283 it != local_ack_handles.end(); ++it) { | |
| 284 // Update in-memory copy of the invalidation state. | |
| 285 invalidation_state_map_[it->first].expected = it->second; | |
| 286 | |
| 287 if (version == kUnknownVersion) { | |
| 288 Invalidation inv = Invalidation::InitUnknownVersion(it->first); | |
| 289 inv.set_ack_handle(it->second); | |
| 290 invalidation_map.Insert(inv); | |
| 291 } else { | |
| 292 Invalidation inv = Invalidation::Init(it->first, version, payload); | |
| 293 inv.set_ack_handle(it->second); | |
| 294 invalidation_map.Insert(inv); | |
| 295 } | |
| 296 } | |
| 297 ack_tracker_.Track(ids); | |
| 298 delegate_->OnInvalidate(invalidation_map); | |
| 299 client->Acknowledge(ack_handle); | |
| 300 } | |
| 301 | |
| 302 void SyncInvalidationListener::OnTimeout(const ObjectIdSet& ids) { | |
| 303 ObjectIdInvalidationMap invalidation_map; | |
| 304 for (ObjectIdSet::const_iterator it = ids.begin(); it != ids.end(); ++it) { | |
| 305 if (invalidation_state_map_[*it].version == kUnknownVersion) { | |
| 306 Invalidation inv = Invalidation::InitUnknownVersion(*it); | |
| 307 inv.set_ack_handle(invalidation_state_map_[*it].expected); | |
| 308 invalidation_map.Insert(inv); | |
| 309 } else { | |
| 310 Invalidation inv = Invalidation::Init( | |
| 311 *it, | |
| 312 invalidation_state_map_[*it].version, | |
| 313 invalidation_state_map_[*it].payload); | |
| 314 inv.set_ack_handle(invalidation_state_map_[*it].expected); | |
| 315 invalidation_map.Insert(inv); | |
| 316 } | |
| 317 } | |
| 318 delegate_->OnInvalidate(invalidation_map); | |
| 319 } | 221 } |
| 320 | 222 |
| 321 void SyncInvalidationListener::InformRegistrationStatus( | 223 void SyncInvalidationListener::InformRegistrationStatus( |
| 322 invalidation::InvalidationClient* client, | 224 invalidation::InvalidationClient* client, |
| 323 const invalidation::ObjectId& object_id, | 225 const invalidation::ObjectId& object_id, |
| 324 InvalidationListener::RegistrationState new_state) { | 226 InvalidationListener::RegistrationState new_state) { |
| 325 DCHECK(CalledOnValidThread()); | 227 DCHECK(CalledOnValidThread()); |
| 326 DCHECK_EQ(client, invalidation_client_.get()); | 228 DCHECK_EQ(client, invalidation_client_.get()); |
| 327 DVLOG(1) << "InformRegistrationStatus: " | 229 DVLOG(1) << "InformRegistrationStatus: " |
| 328 << ObjectIdToString(object_id) << " " << new_state; | 230 << ObjectIdToString(object_id) << " " << new_state; |
| (...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 381 << error_info.error_message() | 283 << error_info.error_message() |
| 382 << " (transient = " << error_info.is_transient() << ")"; | 284 << " (transient = " << error_info.is_transient() << ")"; |
| 383 if (error_info.error_reason() == invalidation::ErrorReason::AUTH_FAILURE) { | 285 if (error_info.error_reason() == invalidation::ErrorReason::AUTH_FAILURE) { |
| 384 ticl_state_ = INVALIDATION_CREDENTIALS_REJECTED; | 286 ticl_state_ = INVALIDATION_CREDENTIALS_REJECTED; |
| 385 } else { | 287 } else { |
| 386 ticl_state_ = TRANSIENT_INVALIDATION_ERROR; | 288 ticl_state_ = TRANSIENT_INVALIDATION_ERROR; |
| 387 } | 289 } |
| 388 EmitStateChange(); | 290 EmitStateChange(); |
| 389 } | 291 } |
| 390 | 292 |
| 293 void SyncInvalidationListener::Acknowledge( | |
| 294 const invalidation::ObjectId& id, | |
| 295 const syncer::AckHandle& handle) { | |
| 296 UnackedInvalidationsMap::iterator lookup = | |
| 297 unacked_invalidations_map_.find(id); | |
| 298 if (lookup == unacked_invalidations_map_.end()) { | |
| 299 DLOG(WARNING) << "Received acknowledgement for untracked object ID"; | |
| 300 return; | |
| 301 } | |
| 302 lookup->second.Acknowledge(handle); | |
| 303 invalidation_state_tracker_.Call( | |
| 304 FROM_HERE, | |
| 305 &InvalidationStateTracker::SetSavedInvalidations, | |
| 306 unacked_invalidations_map_); | |
| 307 } | |
| 308 | |
| 309 void SyncInvalidationListener::Drop( | |
| 310 const invalidation::ObjectId& id, | |
| 311 const syncer::AckHandle& handle) { | |
| 312 UnackedInvalidationsMap::iterator lookup = | |
| 313 unacked_invalidations_map_.find(id); | |
| 314 if (lookup == unacked_invalidations_map_.end()) { | |
| 315 DLOG(WARNING) << "Received drop for untracked object ID"; | |
| 316 return; | |
| 317 } | |
| 318 lookup->second.Drop(handle); | |
| 319 invalidation_state_tracker_.Call( | |
|
tim (not reviewing)
2013/11/20 18:27:04
What happens if this doesn't make it through befor
rlarocque
2013/11/21 20:09:27
In this case it's no big deal.
We'll fail to pe
| |
| 320 FROM_HERE, | |
| 321 &InvalidationStateTracker::SetSavedInvalidations, | |
| 322 unacked_invalidations_map_); | |
| 323 } | |
| 324 | |
| 391 void SyncInvalidationListener::WriteState(const std::string& state) { | 325 void SyncInvalidationListener::WriteState(const std::string& state) { |
| 392 DCHECK(CalledOnValidThread()); | 326 DCHECK(CalledOnValidThread()); |
| 393 DVLOG(1) << "WriteState"; | 327 DVLOG(1) << "WriteState"; |
| 394 invalidation_state_tracker_.Call( | 328 invalidation_state_tracker_.Call( |
| 395 FROM_HERE, &InvalidationStateTracker::SetBootstrapData, state); | 329 FROM_HERE, &InvalidationStateTracker::SetBootstrapData, state); |
| 396 } | 330 } |
| 397 | 331 |
| 398 void SyncInvalidationListener::DoRegistrationUpdate() { | 332 void SyncInvalidationListener::DoRegistrationUpdate() { |
| 399 DCHECK(CalledOnValidThread()); | 333 DCHECK(CalledOnValidThread()); |
| 400 const ObjectIdSet& unregistered_ids = | 334 const ObjectIdSet& unregistered_ids = |
| 401 registration_manager_->UpdateRegisteredIds(registered_ids_); | 335 registration_manager_->UpdateRegisteredIds(registered_ids_); |
| 402 for (ObjectIdSet::const_iterator it = unregistered_ids.begin(); | 336 for (ObjectIdSet::iterator it = unregistered_ids.begin(); |
| 403 it != unregistered_ids.end(); ++it) { | 337 it != unregistered_ids.end(); ++it) { |
| 404 invalidation_state_map_.erase(*it); | 338 unacked_invalidations_map_.erase(*it); |
| 405 } | 339 } |
| 406 invalidation_state_tracker_.Call( | 340 invalidation_state_tracker_.Call( |
| 407 FROM_HERE, &InvalidationStateTracker::Forget, unregistered_ids); | 341 FROM_HERE, |
| 408 ack_tracker_.Ack(unregistered_ids); | 342 &InvalidationStateTracker::SetSavedInvalidations, |
| 343 unacked_invalidations_map_); | |
| 344 | |
| 345 ObjectIdInvalidationMap object_id_invalidation_map; | |
| 346 for (UnackedInvalidationsMap::iterator map_it = | |
| 347 unacked_invalidations_map_.begin(); | |
| 348 map_it != unacked_invalidations_map_.end(); ++map_it) { | |
| 349 if (registered_ids_.find(map_it->first) == registered_ids_.end()) { | |
| 350 continue; | |
| 351 } | |
| 352 map_it->second.ExportInvalidations( | |
| 353 GetThisAsAckHandler(), | |
| 354 &object_id_invalidation_map); | |
| 355 } | |
| 356 EmitInvalidations(object_id_invalidation_map); | |
|
tim (not reviewing)
2013/11/20 18:27:04
Add a comment before this line that we need only E
tim (not reviewing)
2013/11/20 18:27:44
s/never/rarely
rlarocque
2013/11/21 20:09:27
You're right. The rename makes sense.
I've added
| |
| 409 } | 357 } |
| 410 | 358 |
| 411 void SyncInvalidationListener::StopForTest() { | 359 void SyncInvalidationListener::StopForTest() { |
| 412 DCHECK(CalledOnValidThread()); | 360 DCHECK(CalledOnValidThread()); |
| 413 Stop(); | 361 Stop(); |
| 414 } | 362 } |
| 415 | 363 |
| 416 InvalidationStateMap SyncInvalidationListener::GetStateMapForTest() const { | |
| 417 DCHECK(CalledOnValidThread()); | |
| 418 return invalidation_state_map_; | |
| 419 } | |
| 420 | |
| 421 AckTracker* SyncInvalidationListener::GetAckTrackerForTest() { | |
| 422 return &ack_tracker_; | |
| 423 } | |
| 424 | |
| 425 void SyncInvalidationListener::Stop() { | 364 void SyncInvalidationListener::Stop() { |
| 426 DCHECK(CalledOnValidThread()); | 365 DCHECK(CalledOnValidThread()); |
| 427 if (!invalidation_client_) { | 366 if (!invalidation_client_) { |
| 428 return; | 367 return; |
| 429 } | 368 } |
| 430 | 369 |
| 431 ack_tracker_.Clear(); | |
| 432 | |
| 433 registration_manager_.reset(); | 370 registration_manager_.reset(); |
| 434 sync_system_resources_.Stop(); | 371 sync_system_resources_.Stop(); |
| 435 invalidation_client_->Stop(); | 372 invalidation_client_->Stop(); |
| 436 | 373 |
| 437 invalidation_client_.reset(); | 374 invalidation_client_.reset(); |
| 438 delegate_ = NULL; | 375 delegate_ = NULL; |
| 439 | 376 |
| 440 invalidation_state_tracker_.Reset(); | |
| 441 invalidation_state_map_.clear(); | |
| 442 ticl_state_ = DEFAULT_INVALIDATION_ERROR; | 377 ticl_state_ = DEFAULT_INVALIDATION_ERROR; |
| 443 push_client_state_ = DEFAULT_INVALIDATION_ERROR; | 378 push_client_state_ = DEFAULT_INVALIDATION_ERROR; |
| 444 } | 379 } |
| 445 | 380 |
| 446 InvalidatorState SyncInvalidationListener::GetState() const { | 381 InvalidatorState SyncInvalidationListener::GetState() const { |
| 447 DCHECK(CalledOnValidThread()); | 382 DCHECK(CalledOnValidThread()); |
| 448 if (ticl_state_ == INVALIDATION_CREDENTIALS_REJECTED || | 383 if (ticl_state_ == INVALIDATION_CREDENTIALS_REJECTED || |
| 449 push_client_state_ == INVALIDATION_CREDENTIALS_REJECTED) { | 384 push_client_state_ == INVALIDATION_CREDENTIALS_REJECTED) { |
| 450 // If either the ticl or the push client rejected our credentials, | 385 // If either the ticl or the push client rejected our credentials, |
| 451 // return INVALIDATION_CREDENTIALS_REJECTED. | 386 // return INVALIDATION_CREDENTIALS_REJECTED. |
| 452 return INVALIDATION_CREDENTIALS_REJECTED; | 387 return INVALIDATION_CREDENTIALS_REJECTED; |
| 453 } | 388 } |
| 454 if (ticl_state_ == INVALIDATIONS_ENABLED && | 389 if (ticl_state_ == INVALIDATIONS_ENABLED && |
| 455 push_client_state_ == INVALIDATIONS_ENABLED) { | 390 push_client_state_ == INVALIDATIONS_ENABLED) { |
| 456 // If the ticl is ready and the push client notifications are | 391 // If the ticl is ready and the push client notifications are |
| 457 // enabled, return INVALIDATIONS_ENABLED. | 392 // enabled, return INVALIDATIONS_ENABLED. |
| 458 return INVALIDATIONS_ENABLED; | 393 return INVALIDATIONS_ENABLED; |
| 459 } | 394 } |
| 460 // Otherwise, we have a transient error. | 395 // Otherwise, we have a transient error. |
| 461 return TRANSIENT_INVALIDATION_ERROR; | 396 return TRANSIENT_INVALIDATION_ERROR; |
| 462 } | 397 } |
| 463 | 398 |
| 464 void SyncInvalidationListener::EmitStateChange() { | 399 void SyncInvalidationListener::EmitStateChange() { |
| 465 DCHECK(CalledOnValidThread()); | 400 DCHECK(CalledOnValidThread()); |
| 466 delegate_->OnInvalidatorStateChange(GetState()); | 401 delegate_->OnInvalidatorStateChange(GetState()); |
| 467 } | 402 } |
| 468 | 403 |
| 404 WeakHandle<AckHandler> SyncInvalidationListener::GetThisAsAckHandler() { | |
| 405 DCHECK(CalledOnValidThread()); | |
| 406 return WeakHandle<AckHandler>(weak_ptr_factory_.GetWeakPtr()); | |
| 407 } | |
| 408 | |
| 469 void SyncInvalidationListener::OnNotificationsEnabled() { | 409 void SyncInvalidationListener::OnNotificationsEnabled() { |
| 470 DCHECK(CalledOnValidThread()); | 410 DCHECK(CalledOnValidThread()); |
| 471 push_client_state_ = INVALIDATIONS_ENABLED; | 411 push_client_state_ = INVALIDATIONS_ENABLED; |
| 472 EmitStateChange(); | 412 EmitStateChange(); |
| 473 } | 413 } |
| 474 | 414 |
| 475 void SyncInvalidationListener::OnNotificationsDisabled( | 415 void SyncInvalidationListener::OnNotificationsDisabled( |
| 476 notifier::NotificationsDisabledReason reason) { | 416 notifier::NotificationsDisabledReason reason) { |
| 477 DCHECK(CalledOnValidThread()); | 417 DCHECK(CalledOnValidThread()); |
| 478 push_client_state_ = FromNotifierReason(reason); | 418 push_client_state_ = FromNotifierReason(reason); |
| 479 EmitStateChange(); | 419 EmitStateChange(); |
| 480 } | 420 } |
| 481 | 421 |
| 482 void SyncInvalidationListener::OnIncomingNotification( | 422 void SyncInvalidationListener::OnIncomingNotification( |
| 483 const notifier::Notification& notification) { | 423 const notifier::Notification& notification) { |
| 484 DCHECK(CalledOnValidThread()); | 424 DCHECK(CalledOnValidThread()); |
| 485 // Do nothing, since this is already handled by |invalidation_client_|. | 425 // Do nothing, since this is already handled by |invalidation_client_|. |
| 486 } | 426 } |
| 487 | 427 |
| 488 } // namespace syncer | 428 } // namespace syncer |
| OLD | NEW |