Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "sync/notifier/sync_invalidation_listener.h" | 5 #include "sync/notifier/sync_invalidation_listener.h" |
| 6 | 6 |
| 7 #include <string> | |
| 8 #include <vector> | 7 #include <vector> |
| 9 | 8 |
| 9 #include "base/bind.h" | |
| 10 #include "base/callback.h" | 10 #include "base/callback.h" |
| 11 #include "base/compiler_specific.h" | 11 #include "base/compiler_specific.h" |
| 12 #include "base/logging.h" | 12 #include "base/logging.h" |
| 13 #include "base/tracked_objects.h" | 13 #include "base/tracked_objects.h" |
| 14 #include "google/cacheinvalidation/include/invalidation-client.h" | 14 #include "google/cacheinvalidation/include/invalidation-client.h" |
| 15 #include "google/cacheinvalidation/include/types.h" | 15 #include "google/cacheinvalidation/include/types.h" |
| 16 #include "google/cacheinvalidation/types.pb.h" | 16 #include "google/cacheinvalidation/types.pb.h" |
| 17 #include "jingle/notifier/listener/push_client.h" | 17 #include "jingle/notifier/listener/push_client.h" |
| 18 #include "sync/notifier/invalidation_util.h" | 18 #include "sync/notifier/invalidation_util.h" |
| 19 #include "sync/notifier/registration_manager.h" | 19 #include "sync/notifier/registration_manager.h" |
| 20 | 20 |
| 21 namespace { | 21 namespace { |
| 22 | 22 |
| 23 const char kApplicationName[] = "chrome-sync"; | 23 const char kApplicationName[] = "chrome-sync"; |
| 24 | 24 |
| 25 } // namespace | 25 } // namespace |
| 26 | 26 |
| 27 namespace syncer { | 27 namespace syncer { |
| 28 | 28 |
| 29 SyncInvalidationListener::Delegate::~Delegate() {} | 29 SyncInvalidationListener::Delegate::~Delegate() {} |
| 30 | 30 |
| 31 SyncInvalidationListener::SyncInvalidationListener( | 31 SyncInvalidationListener::SyncInvalidationListener( |
| 32 scoped_ptr<notifier::PushClient> push_client) | 32 scoped_ptr<notifier::PushClient> push_client) |
| 33 : push_client_(push_client.get()), | 33 : weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), |
| 34 ack_tracker_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), | |
| 35 push_client_(push_client.get()), | |
| 34 sync_system_resources_(push_client.Pass(), | 36 sync_system_resources_(push_client.Pass(), |
| 35 ALLOW_THIS_IN_INITIALIZER_LIST(this)), | 37 ALLOW_THIS_IN_INITIALIZER_LIST(this)), |
| 36 delegate_(NULL), | 38 delegate_(NULL), |
| 37 ticl_state_(DEFAULT_INVALIDATION_ERROR), | 39 ticl_state_(DEFAULT_INVALIDATION_ERROR), |
| 38 push_client_state_(DEFAULT_INVALIDATION_ERROR) { | 40 push_client_state_(DEFAULT_INVALIDATION_ERROR) { |
| 39 DCHECK(CalledOnValidThread()); | 41 DCHECK(CalledOnValidThread()); |
| 40 push_client_->AddObserver(this); | 42 push_client_->AddObserver(this); |
| 41 } | 43 } |
| 42 | 44 |
| 43 SyncInvalidationListener::~SyncInvalidationListener() { | 45 SyncInvalidationListener::~SyncInvalidationListener() { |
| (...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 98 | 100 |
| 99 // TODO(rlarocque): This call exists as part of an effort to move the | 101 // TODO(rlarocque): This call exists as part of an effort to move the |
| 100 // invalidator's ID out of sync. It writes the provided (sync-managed) ID to | 102 // invalidator's ID out of sync. It writes the provided (sync-managed) ID to |
| 101 // storage that lives on the UI thread. Once this has been in place for a | 103 // storage that lives on the UI thread. Once this has been in place for a |
| 102 // milestone or two, we can remove it and start looking for invalidator client | 104 // milestone or two, we can remove it and start looking for invalidator client |
| 103 // IDs exclusively in the InvalidationStateTracker. See crbug.com/124142. | 105 // IDs exclusively in the InvalidationStateTracker. See crbug.com/124142. |
| 104 invalidation_state_tracker_.Call( | 106 invalidation_state_tracker_.Call( |
| 105 FROM_HERE, | 107 FROM_HERE, |
| 106 &InvalidationStateTracker::SetInvalidatorClientId, | 108 &InvalidationStateTracker::SetInvalidatorClientId, |
| 107 client_id); | 109 client_id); |
| 110 | |
| 111 // Set up reminders for any invalidations that have not been locally | |
| 112 // acknowledged. | |
| 113 ObjectIdSet unacknowledged_ids; | |
| 114 for (InvalidationStateMap::const_iterator it = | |
| 115 invalidation_state_map_.begin(); | |
| 116 it != invalidation_state_map_.end(); ++it) { | |
| 117 if (it->second.expected.Equals(it->second.current)) | |
| 118 continue; | |
| 119 unacknowledged_ids.insert(it->first); | |
| 120 } | |
| 121 if (!unacknowledged_ids.empty()) | |
| 122 ack_tracker_.Track(unacknowledged_ids); | |
| 108 } | 123 } |
| 109 | 124 |
| 110 void SyncInvalidationListener::UpdateCredentials( | 125 void SyncInvalidationListener::UpdateCredentials( |
| 111 const std::string& email, const std::string& token) { | 126 const std::string& email, const std::string& token) { |
| 112 DCHECK(CalledOnValidThread()); | 127 DCHECK(CalledOnValidThread()); |
| 113 sync_system_resources_.network()->UpdateCredentials(email, token); | 128 sync_system_resources_.network()->UpdateCredentials(email, token); |
| 114 } | 129 } |
| 115 | 130 |
| 116 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) { | 131 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) { |
| 117 DCHECK(CalledOnValidThread()); | 132 DCHECK(CalledOnValidThread()); |
| 118 registered_ids_ = ids; | 133 registered_ids_ = ids; |
| 119 // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a | 134 // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a |
| 120 // working XMPP connection (as observed by us), so check it instead | 135 // working XMPP connection (as observed by us), so check it instead |
| 121 // of GetState() (see http://crbug.com/139424). | 136 // of GetState() (see http://crbug.com/139424). |
| 122 if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_.get()) { | 137 if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_.get()) { |
| 123 DoRegistrationUpdate(); | 138 DoRegistrationUpdate(); |
| 124 } | 139 } |
| 125 } | 140 } |
| 126 | 141 |
| 142 void SyncInvalidationListener::Acknowledge(const invalidation::ObjectId& id, | |
| 143 const AckHandle& ack_handle) { | |
| 144 DCHECK(CalledOnValidThread()); | |
| 145 InvalidationStateMap::iterator state_it = invalidation_state_map_.find(id); | |
| 146 if (state_it == invalidation_state_map_.end()) | |
| 147 return; | |
| 148 invalidation_state_tracker_.Call( | |
| 149 FROM_HERE, | |
| 150 &InvalidationStateTracker::Acknowledge, | |
| 151 id, | |
| 152 ack_handle); | |
| 153 state_it->second.current = ack_handle; | |
| 154 if (state_it->second.expected.Equals(ack_handle)) { | |
| 155 // If the received ack matches the expected ack, then we no longer need to | |
| 156 // keep track of |id| since it is up-to-date. | |
|
Pete Williamson
2013/02/16 01:41:06
This comment confuses me a bit, we say that we don
dcheng
2013/02/22 02:51:30
Right, AckTracker has a Track() and an Ack() funct
| |
| 157 ObjectIdSet ids; | |
| 158 ids.insert(id); | |
| 159 ack_tracker_.Ack(ids); | |
| 160 } | |
| 161 } | |
| 162 | |
| 127 void SyncInvalidationListener::Ready( | 163 void SyncInvalidationListener::Ready( |
| 128 invalidation::InvalidationClient* client) { | 164 invalidation::InvalidationClient* client) { |
| 129 DCHECK(CalledOnValidThread()); | 165 DCHECK(CalledOnValidThread()); |
| 130 DCHECK_EQ(client, invalidation_client_.get()); | 166 DCHECK_EQ(client, invalidation_client_.get()); |
| 131 ticl_state_ = INVALIDATIONS_ENABLED; | 167 ticl_state_ = INVALIDATIONS_ENABLED; |
| 132 EmitStateChange(); | 168 EmitStateChange(); |
| 133 DoRegistrationUpdate(); | 169 DoRegistrationUpdate(); |
| 134 } | 170 } |
| 135 | 171 |
| 136 void SyncInvalidationListener::Invalidate( | 172 void SyncInvalidationListener::Invalidate( |
| (...skipping 23 matching lines...) Expand all Loading... | |
| 160 } | 196 } |
| 161 | 197 |
| 162 std::string payload; | 198 std::string payload; |
| 163 // payload() CHECK()'s has_payload(), so we must check it ourselves first. | 199 // payload() CHECK()'s has_payload(), so we must check it ourselves first. |
| 164 if (invalidation.has_payload()) | 200 if (invalidation.has_payload()) |
| 165 payload = invalidation.payload(); | 201 payload = invalidation.payload(); |
| 166 | 202 |
| 167 DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id) | 203 DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id) |
| 168 << " to " << invalidation.version(); | 204 << " to " << invalidation.version(); |
| 169 invalidation_state_map_[id].version = invalidation.version(); | 205 invalidation_state_map_[id].version = invalidation.version(); |
| 206 invalidation_state_map_[id].payload = payload; | |
| 170 invalidation_state_tracker_.Call( | 207 invalidation_state_tracker_.Call( |
| 171 FROM_HERE, | 208 FROM_HERE, |
| 172 &InvalidationStateTracker::SetMaxVersionAndPayload, | 209 &InvalidationStateTracker::SetMaxVersionAndPayload, |
| 173 id, invalidation.version(), payload); | 210 id, invalidation.version(), payload); |
| 174 | 211 |
| 175 ObjectIdInvalidationMap invalidation_map; | 212 ObjectIdSet ids; |
| 176 invalidation_map[id].payload = payload; | 213 ids.insert(id); |
| 177 EmitInvalidation(invalidation_map); | 214 PrepareInvalidation(ids, payload, client, ack_handle); |
| 178 // TODO(akalin): We should really acknowledge only after we get the | |
| 179 // updates from the sync server. (see http://crbug.com/78462). | |
| 180 client->Acknowledge(ack_handle); | |
| 181 } | 215 } |
| 182 | 216 |
| 183 void SyncInvalidationListener::InvalidateUnknownVersion( | 217 void SyncInvalidationListener::InvalidateUnknownVersion( |
| 184 invalidation::InvalidationClient* client, | 218 invalidation::InvalidationClient* client, |
| 185 const invalidation::ObjectId& object_id, | 219 const invalidation::ObjectId& object_id, |
| 186 const invalidation::AckHandle& ack_handle) { | 220 const invalidation::AckHandle& ack_handle) { |
| 187 DCHECK(CalledOnValidThread()); | 221 DCHECK(CalledOnValidThread()); |
| 188 DCHECK_EQ(client, invalidation_client_.get()); | 222 DCHECK_EQ(client, invalidation_client_.get()); |
| 189 DVLOG(1) << "InvalidateUnknownVersion"; | 223 DVLOG(1) << "InvalidateUnknownVersion"; |
| 190 | 224 |
| 191 ObjectIdInvalidationMap invalidation_map; | 225 ObjectIdSet ids; |
| 192 invalidation_map[object_id].payload = std::string(); | 226 ids.insert(object_id); |
| 193 EmitInvalidation(invalidation_map); | 227 PrepareInvalidation(ids, std::string(), client, ack_handle); |
| 194 // TODO(akalin): We should really acknowledge only after we get the | |
| 195 // updates from the sync server. (see http://crbug.com/78462). | |
| 196 client->Acknowledge(ack_handle); | |
| 197 } | 228 } |
| 198 | 229 |
| 199 // This should behave as if we got an invalidation with version | 230 // This should behave as if we got an invalidation with version |
| 200 // UNKNOWN_OBJECT_VERSION for all known data types. | 231 // UNKNOWN_OBJECT_VERSION for all known data types. |
| 201 void SyncInvalidationListener::InvalidateAll( | 232 void SyncInvalidationListener::InvalidateAll( |
| 202 invalidation::InvalidationClient* client, | 233 invalidation::InvalidationClient* client, |
| 203 const invalidation::AckHandle& ack_handle) { | 234 const invalidation::AckHandle& ack_handle) { |
| 204 DCHECK(CalledOnValidThread()); | 235 DCHECK(CalledOnValidThread()); |
| 205 DCHECK_EQ(client, invalidation_client_.get()); | 236 DCHECK_EQ(client, invalidation_client_.get()); |
| 206 DVLOG(1) << "InvalidateAll"; | 237 DVLOG(1) << "InvalidateAll"; |
| 207 | 238 |
| 208 const ObjectIdInvalidationMap& invalidation_map = | 239 PrepareInvalidation(registered_ids_, std::string(), client, ack_handle); |
| 209 ObjectIdSetToInvalidationMap(registered_ids_, std::string()); | 240 } |
| 210 EmitInvalidation(invalidation_map); | 241 |
| 211 // TODO(akalin): We should really acknowledge only after we get the | 242 void SyncInvalidationListener::PrepareInvalidation( |
| 212 // updates from the sync server. (see http://crbug.com/76482). | 243 const ObjectIdSet& ids, |
| 244 const std::string& payload, | |
| 245 invalidation::InvalidationClient* client, | |
| 246 const invalidation::AckHandle& ack_handle) { | |
| 247 DCHECK(CalledOnValidThread()); | |
| 248 | |
| 249 // A server invalidation resets the local retry count. | |
| 250 ack_tracker_.Ack(ids); | |
| 251 invalidation_state_tracker_.Call( | |
| 252 FROM_HERE, | |
| 253 &InvalidationStateTracker::GenerateAckHandles, | |
| 254 ids, | |
| 255 base::MessageLoopProxy::current(), | |
| 256 base::Bind(&SyncInvalidationListener::EmitInvalidation, | |
| 257 weak_ptr_factory_.GetWeakPtr(), | |
| 258 ids, | |
| 259 payload, | |
| 260 client, | |
| 261 ack_handle)); | |
| 262 } | |
| 263 | |
| 264 void SyncInvalidationListener::EmitInvalidation( | |
| 265 const ObjectIdSet& ids, | |
| 266 const std::string& payload, | |
| 267 invalidation::InvalidationClient* client, | |
| 268 const invalidation::AckHandle& ack_handle, | |
| 269 const AckHandleMap& local_ack_handles) { | |
| 270 DCHECK(CalledOnValidThread()); | |
| 271 ObjectIdInvalidationMap invalidation_map = | |
| 272 ObjectIdSetToInvalidationMap(ids, payload); | |
| 273 for (AckHandleMap::const_iterator it = local_ack_handles.begin(); | |
| 274 it != local_ack_handles.end(); ++it) { | |
| 275 // Update in-memory copy of the invalidation state. | |
| 276 invalidation_state_map_[it->first].expected = it->second; | |
| 277 invalidation_map[it->first].ack_handle = it->second; | |
| 278 } | |
| 279 ack_tracker_.Track(ids); | |
| 280 delegate_->OnInvalidate(invalidation_map); | |
| 213 client->Acknowledge(ack_handle); | 281 client->Acknowledge(ack_handle); |
| 214 } | 282 } |
| 215 | 283 |
| 216 void SyncInvalidationListener::EmitInvalidation( | 284 void SyncInvalidationListener::OnTimeout(const ObjectIdSet& ids) { |
|
Pete Williamson
2013/02/16 01:41:06
A comment on the "OnTimeout" function might be nic
dcheng
2013/02/22 02:51:30
Hmm. The delegate interface describes what this ca
| |
| 217 const ObjectIdInvalidationMap& invalidation_map) { | 285 ObjectIdInvalidationMap invalidation_map; |
| 218 DCHECK(CalledOnValidThread()); | 286 for (ObjectIdSet::const_iterator it = ids.begin(); it != ids.end(); ++it) { |
| 287 Invalidation invalidation; | |
| 288 invalidation.ack_handle = invalidation_state_map_[*it].expected; | |
| 289 invalidation.payload = invalidation_state_map_[*it].payload; | |
| 290 invalidation_map.insert(std::make_pair(*it, invalidation)); | |
| 291 } | |
| 292 | |
| 219 delegate_->OnInvalidate(invalidation_map); | 293 delegate_->OnInvalidate(invalidation_map); |
| 220 } | 294 } |
| 221 | 295 |
| 222 void SyncInvalidationListener::InformRegistrationStatus( | 296 void SyncInvalidationListener::InformRegistrationStatus( |
| 223 invalidation::InvalidationClient* client, | 297 invalidation::InvalidationClient* client, |
| 224 const invalidation::ObjectId& object_id, | 298 const invalidation::ObjectId& object_id, |
| 225 InvalidationListener::RegistrationState new_state) { | 299 InvalidationListener::RegistrationState new_state) { |
| 226 DCHECK(CalledOnValidThread()); | 300 DCHECK(CalledOnValidThread()); |
| 227 DCHECK_EQ(client, invalidation_client_.get()); | 301 DCHECK_EQ(client, invalidation_client_.get()); |
| 228 DVLOG(1) << "InformRegistrationStatus: " | 302 DVLOG(1) << "InformRegistrationStatus: " |
| (...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 299 void SyncInvalidationListener::DoRegistrationUpdate() { | 373 void SyncInvalidationListener::DoRegistrationUpdate() { |
| 300 DCHECK(CalledOnValidThread()); | 374 DCHECK(CalledOnValidThread()); |
| 301 const ObjectIdSet& unregistered_ids = | 375 const ObjectIdSet& unregistered_ids = |
| 302 registration_manager_->UpdateRegisteredIds(registered_ids_); | 376 registration_manager_->UpdateRegisteredIds(registered_ids_); |
| 303 for (ObjectIdSet::const_iterator it = unregistered_ids.begin(); | 377 for (ObjectIdSet::const_iterator it = unregistered_ids.begin(); |
| 304 it != unregistered_ids.end(); ++it) { | 378 it != unregistered_ids.end(); ++it) { |
| 305 invalidation_state_map_.erase(*it); | 379 invalidation_state_map_.erase(*it); |
| 306 } | 380 } |
| 307 invalidation_state_tracker_.Call( | 381 invalidation_state_tracker_.Call( |
| 308 FROM_HERE, &InvalidationStateTracker::Forget, unregistered_ids); | 382 FROM_HERE, &InvalidationStateTracker::Forget, unregistered_ids); |
| 383 ack_tracker_.Ack(unregistered_ids); | |
| 309 } | 384 } |
| 310 | 385 |
| 311 void SyncInvalidationListener::StopForTest() { | 386 void SyncInvalidationListener::StopForTest() { |
| 312 DCHECK(CalledOnValidThread()); | 387 DCHECK(CalledOnValidThread()); |
| 313 Stop(); | 388 Stop(); |
| 314 } | 389 } |
| 315 | 390 |
| 316 InvalidationStateMap SyncInvalidationListener::GetStateMapForTest() const { | 391 InvalidationStateMap SyncInvalidationListener::GetStateMapForTest() const { |
| 317 DCHECK(CalledOnValidThread()); | 392 DCHECK(CalledOnValidThread()); |
| 318 return invalidation_state_map_; | 393 return invalidation_state_map_; |
| 319 } | 394 } |
| 320 | 395 |
| 396 AckTracker* SyncInvalidationListener::GetAckTrackerForTest() { | |
| 397 return &ack_tracker_; | |
| 398 } | |
| 399 | |
| 321 void SyncInvalidationListener::Stop() { | 400 void SyncInvalidationListener::Stop() { |
| 322 DCHECK(CalledOnValidThread()); | 401 DCHECK(CalledOnValidThread()); |
| 323 if (!invalidation_client_.get()) { | 402 if (!invalidation_client_.get()) { |
| 324 return; | 403 return; |
| 325 } | 404 } |
| 326 | 405 |
| 406 ack_tracker_.Clear(); | |
| 407 | |
| 327 registration_manager_.reset(); | 408 registration_manager_.reset(); |
| 328 sync_system_resources_.Stop(); | 409 sync_system_resources_.Stop(); |
| 329 invalidation_client_->Stop(); | 410 invalidation_client_->Stop(); |
| 330 | 411 |
| 331 invalidation_client_.reset(); | 412 invalidation_client_.reset(); |
| 332 delegate_ = NULL; | 413 delegate_ = NULL; |
| 333 | 414 |
| 334 invalidation_state_tracker_.Reset(); | 415 invalidation_state_tracker_.Reset(); |
| 335 invalidation_state_map_.clear(); | 416 invalidation_state_map_.clear(); |
| 336 ticl_state_ = DEFAULT_INVALIDATION_ERROR; | 417 ticl_state_ = DEFAULT_INVALIDATION_ERROR; |
| (...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 373 EmitStateChange(); | 454 EmitStateChange(); |
| 374 } | 455 } |
| 375 | 456 |
| 376 void SyncInvalidationListener::OnIncomingNotification( | 457 void SyncInvalidationListener::OnIncomingNotification( |
| 377 const notifier::Notification& notification) { | 458 const notifier::Notification& notification) { |
| 378 DCHECK(CalledOnValidThread()); | 459 DCHECK(CalledOnValidThread()); |
| 379 // Do nothing, since this is already handled by |invalidation_client_|. | 460 // Do nothing, since this is already handled by |invalidation_client_|. |
| 380 } | 461 } |
| 381 | 462 |
| 382 } // namespace syncer | 463 } // namespace syncer |
| OLD | NEW |