OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 #include "sync/notifier/sync_invalidation_listener.h" |
| 6 |
| 7 #include <vector> |
| 8 |
| 9 #include "base/bind.h" |
| 10 #include "base/callback.h" |
| 11 #include "base/compiler_specific.h" |
| 12 #include "base/logging.h" |
| 13 #include "base/tracked_objects.h" |
| 14 #include "google/cacheinvalidation/include/invalidation-client.h" |
| 15 #include "google/cacheinvalidation/include/types.h" |
| 16 #include "jingle/notifier/listener/push_client.h" |
| 17 #include "sync/notifier/invalidation_util.h" |
| 18 #include "sync/notifier/object_id_invalidation_map.h" |
| 19 #include "sync/notifier/registration_manager.h" |
| 20 |
| 21 namespace { |
| 22 |
| 23 const char kApplicationName[] = "chrome-sync"; |
| 24 |
| 25 } // namespace |
| 26 |
| 27 namespace syncer { |
| 28 |
| 29 SyncInvalidationListener::Delegate::~Delegate() {} |
| 30 |
| 31 SyncInvalidationListener::SyncInvalidationListener( |
| 32 scoped_ptr<SyncNetworkChannel> network_channel) |
| 33 : sync_network_channel_(network_channel.Pass()), |
| 34 sync_system_resources_(sync_network_channel_.get(), this), |
| 35 delegate_(NULL), |
| 36 ticl_state_(DEFAULT_INVALIDATION_ERROR), |
| 37 push_client_state_(DEFAULT_INVALIDATION_ERROR), |
| 38 weak_ptr_factory_(this) { |
| 39 DCHECK(CalledOnValidThread()); |
| 40 sync_network_channel_->AddObserver(this); |
| 41 } |
| 42 |
| 43 SyncInvalidationListener::~SyncInvalidationListener() { |
| 44 DCHECK(CalledOnValidThread()); |
| 45 sync_network_channel_->RemoveObserver(this); |
| 46 Stop(); |
| 47 DCHECK(!delegate_); |
| 48 } |
| 49 |
| 50 void SyncInvalidationListener::Start( |
| 51 const CreateInvalidationClientCallback& |
| 52 create_invalidation_client_callback, |
| 53 const std::string& client_id, const std::string& client_info, |
| 54 const std::string& invalidation_bootstrap_data, |
| 55 const UnackedInvalidationsMap& initial_unacked_invalidations, |
| 56 const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker, |
| 57 Delegate* delegate) { |
| 58 DCHECK(CalledOnValidThread()); |
| 59 Stop(); |
| 60 |
| 61 sync_system_resources_.set_platform(client_info); |
| 62 sync_system_resources_.Start(); |
| 63 |
| 64 // The Storage resource is implemented as a write-through cache. We populate |
| 65 // it with the initial state on startup, so subsequent writes go to disk and |
| 66 // update the in-memory cache, while reads just return the cached state. |
| 67 sync_system_resources_.storage()->SetInitialState( |
| 68 invalidation_bootstrap_data); |
| 69 |
| 70 unacked_invalidations_map_ = initial_unacked_invalidations; |
| 71 invalidation_state_tracker_ = invalidation_state_tracker; |
| 72 DCHECK(invalidation_state_tracker_.IsInitialized()); |
| 73 |
| 74 DCHECK(!delegate_); |
| 75 DCHECK(delegate); |
| 76 delegate_ = delegate; |
| 77 |
| 78 invalidation_client_.reset(create_invalidation_client_callback.Run( |
| 79 &sync_system_resources_, |
| 80 sync_network_channel_->GetInvalidationClientType(), |
| 81 client_id, |
| 82 kApplicationName, |
| 83 this)); |
| 84 invalidation_client_->Start(); |
| 85 |
| 86 registration_manager_.reset( |
| 87 new RegistrationManager(invalidation_client_.get())); |
| 88 } |
| 89 |
| 90 void SyncInvalidationListener::UpdateCredentials( |
| 91 const std::string& email, const std::string& token) { |
| 92 DCHECK(CalledOnValidThread()); |
| 93 sync_network_channel_->UpdateCredentials(email, token); |
| 94 } |
| 95 |
| 96 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) { |
| 97 DCHECK(CalledOnValidThread()); |
| 98 registered_ids_ = ids; |
| 99 // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a |
| 100 // working XMPP connection (as observed by us), so check it instead |
| 101 // of GetState() (see http://crbug.com/139424). |
| 102 if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_) { |
| 103 DoRegistrationUpdate(); |
| 104 } |
| 105 } |
| 106 |
| 107 void SyncInvalidationListener::Ready( |
| 108 invalidation::InvalidationClient* client) { |
| 109 DCHECK(CalledOnValidThread()); |
| 110 DCHECK_EQ(client, invalidation_client_.get()); |
| 111 ticl_state_ = INVALIDATIONS_ENABLED; |
| 112 EmitStateChange(); |
| 113 DoRegistrationUpdate(); |
| 114 } |
| 115 |
| 116 void SyncInvalidationListener::Invalidate( |
| 117 invalidation::InvalidationClient* client, |
| 118 const invalidation::Invalidation& invalidation, |
| 119 const invalidation::AckHandle& ack_handle) { |
| 120 DCHECK(CalledOnValidThread()); |
| 121 DCHECK_EQ(client, invalidation_client_.get()); |
| 122 client->Acknowledge(ack_handle); |
| 123 |
| 124 const invalidation::ObjectId& id = invalidation.object_id(); |
| 125 |
| 126 std::string payload; |
| 127 // payload() CHECK()'s has_payload(), so we must check it ourselves first. |
| 128 if (invalidation.has_payload()) |
| 129 payload = invalidation.payload(); |
| 130 |
| 131 DVLOG(2) << "Received invalidation with version " << invalidation.version() |
| 132 << " for " << ObjectIdToString(id); |
| 133 |
| 134 ObjectIdInvalidationMap invalidations; |
| 135 Invalidation inv = Invalidation::Init(id, invalidation.version(), payload); |
| 136 inv.set_ack_handler(GetThisAsAckHandler()); |
| 137 invalidations.Insert(inv); |
| 138 |
| 139 DispatchInvalidations(invalidations); |
| 140 } |
| 141 |
| 142 void SyncInvalidationListener::InvalidateUnknownVersion( |
| 143 invalidation::InvalidationClient* client, |
| 144 const invalidation::ObjectId& object_id, |
| 145 const invalidation::AckHandle& ack_handle) { |
| 146 DCHECK(CalledOnValidThread()); |
| 147 DCHECK_EQ(client, invalidation_client_.get()); |
| 148 DVLOG(1) << "InvalidateUnknownVersion"; |
| 149 client->Acknowledge(ack_handle); |
| 150 |
| 151 ObjectIdInvalidationMap invalidations; |
| 152 Invalidation unknown_version = Invalidation::InitUnknownVersion(object_id); |
| 153 unknown_version.set_ack_handler(GetThisAsAckHandler()); |
| 154 invalidations.Insert(unknown_version); |
| 155 |
| 156 DispatchInvalidations(invalidations); |
| 157 } |
| 158 |
| 159 // This should behave as if we got an invalidation with version |
| 160 // UNKNOWN_OBJECT_VERSION for all known data types. |
| 161 void SyncInvalidationListener::InvalidateAll( |
| 162 invalidation::InvalidationClient* client, |
| 163 const invalidation::AckHandle& ack_handle) { |
| 164 DCHECK(CalledOnValidThread()); |
| 165 DCHECK_EQ(client, invalidation_client_.get()); |
| 166 DVLOG(1) << "InvalidateAll"; |
| 167 client->Acknowledge(ack_handle); |
| 168 |
| 169 ObjectIdInvalidationMap invalidations; |
| 170 for (ObjectIdSet::iterator it = registered_ids_.begin(); |
| 171 it != registered_ids_.end(); ++it) { |
| 172 Invalidation unknown_version = Invalidation::InitUnknownVersion(*it); |
| 173 unknown_version.set_ack_handler(GetThisAsAckHandler()); |
| 174 invalidations.Insert(unknown_version); |
| 175 } |
| 176 |
| 177 DispatchInvalidations(invalidations); |
| 178 } |
| 179 |
| 180 // If a handler is registered, emit right away. Otherwise, save it for later. |
| 181 void SyncInvalidationListener::DispatchInvalidations( |
| 182 const ObjectIdInvalidationMap& invalidations) { |
| 183 DCHECK(CalledOnValidThread()); |
| 184 |
| 185 ObjectIdInvalidationMap to_save = invalidations; |
| 186 ObjectIdInvalidationMap to_emit = |
| 187 invalidations.GetSubsetWithObjectIds(registered_ids_); |
| 188 |
| 189 SaveInvalidations(to_save); |
| 190 EmitSavedInvalidations(to_emit); |
| 191 } |
| 192 |
| 193 void SyncInvalidationListener::SaveInvalidations( |
| 194 const ObjectIdInvalidationMap& to_save) { |
| 195 ObjectIdSet objects_to_save = to_save.GetObjectIds(); |
| 196 for (ObjectIdSet::const_iterator it = objects_to_save.begin(); |
| 197 it != objects_to_save.end(); ++it) { |
| 198 UnackedInvalidationsMap::iterator lookup = |
| 199 unacked_invalidations_map_.find(*it); |
| 200 if (lookup == unacked_invalidations_map_.end()) { |
| 201 lookup = unacked_invalidations_map_.insert( |
| 202 std::make_pair(*it, UnackedInvalidationSet(*it))).first; |
| 203 } |
| 204 lookup->second.AddSet(to_save.ForObject(*it)); |
| 205 } |
| 206 |
| 207 invalidation_state_tracker_.Call( |
| 208 FROM_HERE, |
| 209 &InvalidationStateTracker::SetSavedInvalidations, |
| 210 unacked_invalidations_map_); |
| 211 } |
| 212 |
| 213 void SyncInvalidationListener::EmitSavedInvalidations( |
| 214 const ObjectIdInvalidationMap& to_emit) { |
| 215 DVLOG(2) << "Emitting invalidations: " << to_emit.ToString(); |
| 216 delegate_->OnInvalidate(to_emit); |
| 217 } |
| 218 |
| 219 void SyncInvalidationListener::InformRegistrationStatus( |
| 220 invalidation::InvalidationClient* client, |
| 221 const invalidation::ObjectId& object_id, |
| 222 InvalidationListener::RegistrationState new_state) { |
| 223 DCHECK(CalledOnValidThread()); |
| 224 DCHECK_EQ(client, invalidation_client_.get()); |
| 225 DVLOG(1) << "InformRegistrationStatus: " |
| 226 << ObjectIdToString(object_id) << " " << new_state; |
| 227 |
| 228 if (new_state != InvalidationListener::REGISTERED) { |
| 229 // Let |registration_manager_| handle the registration backoff policy. |
| 230 registration_manager_->MarkRegistrationLost(object_id); |
| 231 } |
| 232 } |
| 233 |
| 234 void SyncInvalidationListener::InformRegistrationFailure( |
| 235 invalidation::InvalidationClient* client, |
| 236 const invalidation::ObjectId& object_id, |
| 237 bool is_transient, |
| 238 const std::string& error_message) { |
| 239 DCHECK(CalledOnValidThread()); |
| 240 DCHECK_EQ(client, invalidation_client_.get()); |
| 241 DVLOG(1) << "InformRegistrationFailure: " |
| 242 << ObjectIdToString(object_id) |
| 243 << "is_transient=" << is_transient |
| 244 << ", message=" << error_message; |
| 245 |
| 246 if (is_transient) { |
| 247 // We don't care about |unknown_hint|; we let |
| 248 // |registration_manager_| handle the registration backoff policy. |
| 249 registration_manager_->MarkRegistrationLost(object_id); |
| 250 } else { |
| 251 // Non-transient failures require an action to resolve. This could happen |
| 252 // because: |
| 253 // - the server doesn't yet recognize the data type, which could happen for |
| 254 // brand-new data types. |
| 255 // - the user has changed his password and hasn't updated it yet locally. |
| 256 // Either way, block future registration attempts for |object_id|. However, |
| 257 // we don't forget any saved invalidation state since we may use it once the |
| 258 // error is addressed. |
| 259 registration_manager_->DisableId(object_id); |
| 260 } |
| 261 } |
| 262 |
| 263 void SyncInvalidationListener::ReissueRegistrations( |
| 264 invalidation::InvalidationClient* client, |
| 265 const std::string& prefix, |
| 266 int prefix_length) { |
| 267 DCHECK(CalledOnValidThread()); |
| 268 DCHECK_EQ(client, invalidation_client_.get()); |
| 269 DVLOG(1) << "AllRegistrationsLost"; |
| 270 registration_manager_->MarkAllRegistrationsLost(); |
| 271 } |
| 272 |
| 273 void SyncInvalidationListener::InformError( |
| 274 invalidation::InvalidationClient* client, |
| 275 const invalidation::ErrorInfo& error_info) { |
| 276 DCHECK(CalledOnValidThread()); |
| 277 DCHECK_EQ(client, invalidation_client_.get()); |
| 278 LOG(ERROR) << "Ticl error " << error_info.error_reason() << ": " |
| 279 << error_info.error_message() |
| 280 << " (transient = " << error_info.is_transient() << ")"; |
| 281 if (error_info.error_reason() == invalidation::ErrorReason::AUTH_FAILURE) { |
| 282 ticl_state_ = INVALIDATION_CREDENTIALS_REJECTED; |
| 283 } else { |
| 284 ticl_state_ = TRANSIENT_INVALIDATION_ERROR; |
| 285 } |
| 286 EmitStateChange(); |
| 287 } |
| 288 |
| 289 void SyncInvalidationListener::Acknowledge( |
| 290 const invalidation::ObjectId& id, |
| 291 const syncer::AckHandle& handle) { |
| 292 UnackedInvalidationsMap::iterator lookup = |
| 293 unacked_invalidations_map_.find(id); |
| 294 if (lookup == unacked_invalidations_map_.end()) { |
| 295 DLOG(WARNING) << "Received acknowledgement for untracked object ID"; |
| 296 return; |
| 297 } |
| 298 lookup->second.Acknowledge(handle); |
| 299 invalidation_state_tracker_.Call( |
| 300 FROM_HERE, |
| 301 &InvalidationStateTracker::SetSavedInvalidations, |
| 302 unacked_invalidations_map_); |
| 303 } |
| 304 |
| 305 void SyncInvalidationListener::Drop( |
| 306 const invalidation::ObjectId& id, |
| 307 const syncer::AckHandle& handle) { |
| 308 UnackedInvalidationsMap::iterator lookup = |
| 309 unacked_invalidations_map_.find(id); |
| 310 if (lookup == unacked_invalidations_map_.end()) { |
| 311 DLOG(WARNING) << "Received drop for untracked object ID"; |
| 312 return; |
| 313 } |
| 314 lookup->second.Drop(handle); |
| 315 invalidation_state_tracker_.Call( |
| 316 FROM_HERE, |
| 317 &InvalidationStateTracker::SetSavedInvalidations, |
| 318 unacked_invalidations_map_); |
| 319 } |
| 320 |
| 321 void SyncInvalidationListener::WriteState(const std::string& state) { |
| 322 DCHECK(CalledOnValidThread()); |
| 323 DVLOG(1) << "WriteState"; |
| 324 invalidation_state_tracker_.Call( |
| 325 FROM_HERE, &InvalidationStateTracker::SetBootstrapData, state); |
| 326 } |
| 327 |
| 328 void SyncInvalidationListener::DoRegistrationUpdate() { |
| 329 DCHECK(CalledOnValidThread()); |
| 330 const ObjectIdSet& unregistered_ids = |
| 331 registration_manager_->UpdateRegisteredIds(registered_ids_); |
| 332 for (ObjectIdSet::iterator it = unregistered_ids.begin(); |
| 333 it != unregistered_ids.end(); ++it) { |
| 334 unacked_invalidations_map_.erase(*it); |
| 335 } |
| 336 invalidation_state_tracker_.Call( |
| 337 FROM_HERE, |
| 338 &InvalidationStateTracker::SetSavedInvalidations, |
| 339 unacked_invalidations_map_); |
| 340 |
| 341 ObjectIdInvalidationMap object_id_invalidation_map; |
| 342 for (UnackedInvalidationsMap::iterator map_it = |
| 343 unacked_invalidations_map_.begin(); |
| 344 map_it != unacked_invalidations_map_.end(); ++map_it) { |
| 345 if (registered_ids_.find(map_it->first) == registered_ids_.end()) { |
| 346 continue; |
| 347 } |
| 348 map_it->second.ExportInvalidations( |
| 349 GetThisAsAckHandler(), |
| 350 &object_id_invalidation_map); |
| 351 } |
| 352 |
| 353 // There's no need to run these through DispatchInvalidations(); they've |
| 354 // already been saved to storage (that's where we found them) so all we need |
| 355 // to do now is emit them. |
| 356 EmitSavedInvalidations(object_id_invalidation_map); |
| 357 } |
| 358 |
| 359 void SyncInvalidationListener::RequestDetailedStatus( |
| 360 base::Callback<void(const base::DictionaryValue&)> callback) const { |
| 361 DCHECK(CalledOnValidThread()); |
| 362 sync_network_channel_->RequestDetailedStatus(callback); |
| 363 callback.Run(*CollectDebugData()); |
| 364 } |
| 365 |
| 366 scoped_ptr<base::DictionaryValue> |
| 367 SyncInvalidationListener::CollectDebugData() const { |
| 368 scoped_ptr<base::DictionaryValue> return_value(new base::DictionaryValue()); |
| 369 return_value->SetString( |
| 370 "SyncInvalidationListener.PushClientState", |
| 371 std::string(InvalidatorStateToString(push_client_state_))); |
| 372 return_value->SetString("SyncInvalidationListener.TiclState", |
| 373 std::string(InvalidatorStateToString(ticl_state_))); |
| 374 scoped_ptr<base::DictionaryValue> unacked_map(new base::DictionaryValue()); |
| 375 for (UnackedInvalidationsMap::const_iterator it = |
| 376 unacked_invalidations_map_.begin(); |
| 377 it != unacked_invalidations_map_.end(); |
| 378 ++it) { |
| 379 unacked_map->Set((it->first).name(), (it->second).ToValue().release()); |
| 380 } |
| 381 return_value->Set("SyncInvalidationListener.UnackedInvalidationsMap", |
| 382 unacked_map.release()); |
| 383 return return_value.Pass(); |
| 384 } |
| 385 |
| 386 void SyncInvalidationListener::StopForTest() { |
| 387 DCHECK(CalledOnValidThread()); |
| 388 Stop(); |
| 389 } |
| 390 |
| 391 void SyncInvalidationListener::Stop() { |
| 392 DCHECK(CalledOnValidThread()); |
| 393 if (!invalidation_client_) { |
| 394 return; |
| 395 } |
| 396 |
| 397 registration_manager_.reset(); |
| 398 sync_system_resources_.Stop(); |
| 399 invalidation_client_->Stop(); |
| 400 |
| 401 invalidation_client_.reset(); |
| 402 delegate_ = NULL; |
| 403 |
| 404 ticl_state_ = DEFAULT_INVALIDATION_ERROR; |
| 405 push_client_state_ = DEFAULT_INVALIDATION_ERROR; |
| 406 } |
| 407 |
| 408 InvalidatorState SyncInvalidationListener::GetState() const { |
| 409 DCHECK(CalledOnValidThread()); |
| 410 if (ticl_state_ == INVALIDATION_CREDENTIALS_REJECTED || |
| 411 push_client_state_ == INVALIDATION_CREDENTIALS_REJECTED) { |
| 412 // If either the ticl or the push client rejected our credentials, |
| 413 // return INVALIDATION_CREDENTIALS_REJECTED. |
| 414 return INVALIDATION_CREDENTIALS_REJECTED; |
| 415 } |
| 416 if (ticl_state_ == INVALIDATIONS_ENABLED && |
| 417 push_client_state_ == INVALIDATIONS_ENABLED) { |
| 418 // If the ticl is ready and the push client notifications are |
| 419 // enabled, return INVALIDATIONS_ENABLED. |
| 420 return INVALIDATIONS_ENABLED; |
| 421 } |
| 422 // Otherwise, we have a transient error. |
| 423 return TRANSIENT_INVALIDATION_ERROR; |
| 424 } |
| 425 |
| 426 void SyncInvalidationListener::EmitStateChange() { |
| 427 DCHECK(CalledOnValidThread()); |
| 428 delegate_->OnInvalidatorStateChange(GetState()); |
| 429 } |
| 430 |
| 431 WeakHandle<AckHandler> SyncInvalidationListener::GetThisAsAckHandler() { |
| 432 DCHECK(CalledOnValidThread()); |
| 433 return WeakHandle<AckHandler>(weak_ptr_factory_.GetWeakPtr()); |
| 434 } |
| 435 |
| 436 void SyncInvalidationListener::OnNetworkChannelStateChanged( |
| 437 InvalidatorState invalidator_state) { |
| 438 DCHECK(CalledOnValidThread()); |
| 439 push_client_state_ = invalidator_state; |
| 440 EmitStateChange(); |
| 441 } |
| 442 |
| 443 } // namespace syncer |
OLD | NEW |