OLD | NEW |
| (Empty) |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "sync/notifier/sync_invalidation_listener.h" | |
6 | |
7 #include <vector> | |
8 | |
9 #include "base/bind.h" | |
10 #include "base/callback.h" | |
11 #include "base/compiler_specific.h" | |
12 #include "base/logging.h" | |
13 #include "base/tracked_objects.h" | |
14 #include "google/cacheinvalidation/include/invalidation-client.h" | |
15 #include "google/cacheinvalidation/include/types.h" | |
16 #include "jingle/notifier/listener/push_client.h" | |
17 #include "sync/notifier/invalidation_util.h" | |
18 #include "sync/notifier/object_id_invalidation_map.h" | |
19 #include "sync/notifier/registration_manager.h" | |
20 | |
21 namespace { | |
22 | |
23 const char kApplicationName[] = "chrome-sync"; | |
24 | |
25 } // namespace | |
26 | |
27 namespace syncer { | |
28 | |
29 SyncInvalidationListener::Delegate::~Delegate() {} | |
30 | |
31 SyncInvalidationListener::SyncInvalidationListener( | |
32 scoped_ptr<SyncNetworkChannel> network_channel) | |
33 : sync_network_channel_(network_channel.Pass()), | |
34 sync_system_resources_(sync_network_channel_.get(), this), | |
35 delegate_(NULL), | |
36 ticl_state_(DEFAULT_INVALIDATION_ERROR), | |
37 push_client_state_(DEFAULT_INVALIDATION_ERROR), | |
38 weak_ptr_factory_(this) { | |
39 DCHECK(CalledOnValidThread()); | |
40 sync_network_channel_->AddObserver(this); | |
41 } | |
42 | |
43 SyncInvalidationListener::~SyncInvalidationListener() { | |
44 DCHECK(CalledOnValidThread()); | |
45 sync_network_channel_->RemoveObserver(this); | |
46 Stop(); | |
47 DCHECK(!delegate_); | |
48 } | |
49 | |
50 void SyncInvalidationListener::Start( | |
51 const CreateInvalidationClientCallback& | |
52 create_invalidation_client_callback, | |
53 const std::string& client_id, const std::string& client_info, | |
54 const std::string& invalidation_bootstrap_data, | |
55 const UnackedInvalidationsMap& initial_unacked_invalidations, | |
56 const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker, | |
57 Delegate* delegate) { | |
58 DCHECK(CalledOnValidThread()); | |
59 Stop(); | |
60 | |
61 sync_system_resources_.set_platform(client_info); | |
62 sync_system_resources_.Start(); | |
63 | |
64 // The Storage resource is implemented as a write-through cache. We populate | |
65 // it with the initial state on startup, so subsequent writes go to disk and | |
66 // update the in-memory cache, while reads just return the cached state. | |
67 sync_system_resources_.storage()->SetInitialState( | |
68 invalidation_bootstrap_data); | |
69 | |
70 unacked_invalidations_map_ = initial_unacked_invalidations; | |
71 invalidation_state_tracker_ = invalidation_state_tracker; | |
72 DCHECK(invalidation_state_tracker_.IsInitialized()); | |
73 | |
74 DCHECK(!delegate_); | |
75 DCHECK(delegate); | |
76 delegate_ = delegate; | |
77 | |
78 invalidation_client_.reset(create_invalidation_client_callback.Run( | |
79 &sync_system_resources_, | |
80 sync_network_channel_->GetInvalidationClientType(), | |
81 client_id, | |
82 kApplicationName, | |
83 this)); | |
84 invalidation_client_->Start(); | |
85 | |
86 registration_manager_.reset( | |
87 new RegistrationManager(invalidation_client_.get())); | |
88 } | |
89 | |
90 void SyncInvalidationListener::UpdateCredentials( | |
91 const std::string& email, const std::string& token) { | |
92 DCHECK(CalledOnValidThread()); | |
93 sync_network_channel_->UpdateCredentials(email, token); | |
94 } | |
95 | |
96 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) { | |
97 DCHECK(CalledOnValidThread()); | |
98 registered_ids_ = ids; | |
99 // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a | |
100 // working XMPP connection (as observed by us), so check it instead | |
101 // of GetState() (see http://crbug.com/139424). | |
102 if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_) { | |
103 DoRegistrationUpdate(); | |
104 } | |
105 } | |
106 | |
107 void SyncInvalidationListener::Ready( | |
108 invalidation::InvalidationClient* client) { | |
109 DCHECK(CalledOnValidThread()); | |
110 DCHECK_EQ(client, invalidation_client_.get()); | |
111 ticl_state_ = INVALIDATIONS_ENABLED; | |
112 EmitStateChange(); | |
113 DoRegistrationUpdate(); | |
114 } | |
115 | |
116 void SyncInvalidationListener::Invalidate( | |
117 invalidation::InvalidationClient* client, | |
118 const invalidation::Invalidation& invalidation, | |
119 const invalidation::AckHandle& ack_handle) { | |
120 DCHECK(CalledOnValidThread()); | |
121 DCHECK_EQ(client, invalidation_client_.get()); | |
122 client->Acknowledge(ack_handle); | |
123 | |
124 const invalidation::ObjectId& id = invalidation.object_id(); | |
125 | |
126 std::string payload; | |
127 // payload() CHECK()'s has_payload(), so we must check it ourselves first. | |
128 if (invalidation.has_payload()) | |
129 payload = invalidation.payload(); | |
130 | |
131 DVLOG(2) << "Received invalidation with version " << invalidation.version() | |
132 << " for " << ObjectIdToString(id); | |
133 | |
134 ObjectIdInvalidationMap invalidations; | |
135 Invalidation inv = Invalidation::Init(id, invalidation.version(), payload); | |
136 inv.set_ack_handler(GetThisAsAckHandler()); | |
137 invalidations.Insert(inv); | |
138 | |
139 DispatchInvalidations(invalidations); | |
140 } | |
141 | |
142 void SyncInvalidationListener::InvalidateUnknownVersion( | |
143 invalidation::InvalidationClient* client, | |
144 const invalidation::ObjectId& object_id, | |
145 const invalidation::AckHandle& ack_handle) { | |
146 DCHECK(CalledOnValidThread()); | |
147 DCHECK_EQ(client, invalidation_client_.get()); | |
148 DVLOG(1) << "InvalidateUnknownVersion"; | |
149 client->Acknowledge(ack_handle); | |
150 | |
151 ObjectIdInvalidationMap invalidations; | |
152 Invalidation unknown_version = Invalidation::InitUnknownVersion(object_id); | |
153 unknown_version.set_ack_handler(GetThisAsAckHandler()); | |
154 invalidations.Insert(unknown_version); | |
155 | |
156 DispatchInvalidations(invalidations); | |
157 } | |
158 | |
159 // This should behave as if we got an invalidation with version | |
160 // UNKNOWN_OBJECT_VERSION for all known data types. | |
161 void SyncInvalidationListener::InvalidateAll( | |
162 invalidation::InvalidationClient* client, | |
163 const invalidation::AckHandle& ack_handle) { | |
164 DCHECK(CalledOnValidThread()); | |
165 DCHECK_EQ(client, invalidation_client_.get()); | |
166 DVLOG(1) << "InvalidateAll"; | |
167 client->Acknowledge(ack_handle); | |
168 | |
169 ObjectIdInvalidationMap invalidations; | |
170 for (ObjectIdSet::iterator it = registered_ids_.begin(); | |
171 it != registered_ids_.end(); ++it) { | |
172 Invalidation unknown_version = Invalidation::InitUnknownVersion(*it); | |
173 unknown_version.set_ack_handler(GetThisAsAckHandler()); | |
174 invalidations.Insert(unknown_version); | |
175 } | |
176 | |
177 DispatchInvalidations(invalidations); | |
178 } | |
179 | |
180 // If a handler is registered, emit right away. Otherwise, save it for later. | |
181 void SyncInvalidationListener::DispatchInvalidations( | |
182 const ObjectIdInvalidationMap& invalidations) { | |
183 DCHECK(CalledOnValidThread()); | |
184 | |
185 ObjectIdInvalidationMap to_save = invalidations; | |
186 ObjectIdInvalidationMap to_emit = | |
187 invalidations.GetSubsetWithObjectIds(registered_ids_); | |
188 | |
189 SaveInvalidations(to_save); | |
190 EmitSavedInvalidations(to_emit); | |
191 } | |
192 | |
193 void SyncInvalidationListener::SaveInvalidations( | |
194 const ObjectIdInvalidationMap& to_save) { | |
195 ObjectIdSet objects_to_save = to_save.GetObjectIds(); | |
196 for (ObjectIdSet::const_iterator it = objects_to_save.begin(); | |
197 it != objects_to_save.end(); ++it) { | |
198 UnackedInvalidationsMap::iterator lookup = | |
199 unacked_invalidations_map_.find(*it); | |
200 if (lookup == unacked_invalidations_map_.end()) { | |
201 lookup = unacked_invalidations_map_.insert( | |
202 std::make_pair(*it, UnackedInvalidationSet(*it))).first; | |
203 } | |
204 lookup->second.AddSet(to_save.ForObject(*it)); | |
205 } | |
206 | |
207 invalidation_state_tracker_.Call( | |
208 FROM_HERE, | |
209 &InvalidationStateTracker::SetSavedInvalidations, | |
210 unacked_invalidations_map_); | |
211 } | |
212 | |
213 void SyncInvalidationListener::EmitSavedInvalidations( | |
214 const ObjectIdInvalidationMap& to_emit) { | |
215 DVLOG(2) << "Emitting invalidations: " << to_emit.ToString(); | |
216 delegate_->OnInvalidate(to_emit); | |
217 } | |
218 | |
219 void SyncInvalidationListener::InformRegistrationStatus( | |
220 invalidation::InvalidationClient* client, | |
221 const invalidation::ObjectId& object_id, | |
222 InvalidationListener::RegistrationState new_state) { | |
223 DCHECK(CalledOnValidThread()); | |
224 DCHECK_EQ(client, invalidation_client_.get()); | |
225 DVLOG(1) << "InformRegistrationStatus: " | |
226 << ObjectIdToString(object_id) << " " << new_state; | |
227 | |
228 if (new_state != InvalidationListener::REGISTERED) { | |
229 // Let |registration_manager_| handle the registration backoff policy. | |
230 registration_manager_->MarkRegistrationLost(object_id); | |
231 } | |
232 } | |
233 | |
234 void SyncInvalidationListener::InformRegistrationFailure( | |
235 invalidation::InvalidationClient* client, | |
236 const invalidation::ObjectId& object_id, | |
237 bool is_transient, | |
238 const std::string& error_message) { | |
239 DCHECK(CalledOnValidThread()); | |
240 DCHECK_EQ(client, invalidation_client_.get()); | |
241 DVLOG(1) << "InformRegistrationFailure: " | |
242 << ObjectIdToString(object_id) | |
243 << "is_transient=" << is_transient | |
244 << ", message=" << error_message; | |
245 | |
246 if (is_transient) { | |
247 // We don't care about |unknown_hint|; we let | |
248 // |registration_manager_| handle the registration backoff policy. | |
249 registration_manager_->MarkRegistrationLost(object_id); | |
250 } else { | |
251 // Non-transient failures require an action to resolve. This could happen | |
252 // because: | |
253 // - the server doesn't yet recognize the data type, which could happen for | |
254 // brand-new data types. | |
255 // - the user has changed his password and hasn't updated it yet locally. | |
256 // Either way, block future registration attempts for |object_id|. However, | |
257 // we don't forget any saved invalidation state since we may use it once the | |
258 // error is addressed. | |
259 registration_manager_->DisableId(object_id); | |
260 } | |
261 } | |
262 | |
263 void SyncInvalidationListener::ReissueRegistrations( | |
264 invalidation::InvalidationClient* client, | |
265 const std::string& prefix, | |
266 int prefix_length) { | |
267 DCHECK(CalledOnValidThread()); | |
268 DCHECK_EQ(client, invalidation_client_.get()); | |
269 DVLOG(1) << "AllRegistrationsLost"; | |
270 registration_manager_->MarkAllRegistrationsLost(); | |
271 } | |
272 | |
273 void SyncInvalidationListener::InformError( | |
274 invalidation::InvalidationClient* client, | |
275 const invalidation::ErrorInfo& error_info) { | |
276 DCHECK(CalledOnValidThread()); | |
277 DCHECK_EQ(client, invalidation_client_.get()); | |
278 LOG(ERROR) << "Ticl error " << error_info.error_reason() << ": " | |
279 << error_info.error_message() | |
280 << " (transient = " << error_info.is_transient() << ")"; | |
281 if (error_info.error_reason() == invalidation::ErrorReason::AUTH_FAILURE) { | |
282 ticl_state_ = INVALIDATION_CREDENTIALS_REJECTED; | |
283 } else { | |
284 ticl_state_ = TRANSIENT_INVALIDATION_ERROR; | |
285 } | |
286 EmitStateChange(); | |
287 } | |
288 | |
289 void SyncInvalidationListener::Acknowledge( | |
290 const invalidation::ObjectId& id, | |
291 const syncer::AckHandle& handle) { | |
292 UnackedInvalidationsMap::iterator lookup = | |
293 unacked_invalidations_map_.find(id); | |
294 if (lookup == unacked_invalidations_map_.end()) { | |
295 DLOG(WARNING) << "Received acknowledgement for untracked object ID"; | |
296 return; | |
297 } | |
298 lookup->second.Acknowledge(handle); | |
299 invalidation_state_tracker_.Call( | |
300 FROM_HERE, | |
301 &InvalidationStateTracker::SetSavedInvalidations, | |
302 unacked_invalidations_map_); | |
303 } | |
304 | |
305 void SyncInvalidationListener::Drop( | |
306 const invalidation::ObjectId& id, | |
307 const syncer::AckHandle& handle) { | |
308 UnackedInvalidationsMap::iterator lookup = | |
309 unacked_invalidations_map_.find(id); | |
310 if (lookup == unacked_invalidations_map_.end()) { | |
311 DLOG(WARNING) << "Received drop for untracked object ID"; | |
312 return; | |
313 } | |
314 lookup->second.Drop(handle); | |
315 invalidation_state_tracker_.Call( | |
316 FROM_HERE, | |
317 &InvalidationStateTracker::SetSavedInvalidations, | |
318 unacked_invalidations_map_); | |
319 } | |
320 | |
321 void SyncInvalidationListener::WriteState(const std::string& state) { | |
322 DCHECK(CalledOnValidThread()); | |
323 DVLOG(1) << "WriteState"; | |
324 invalidation_state_tracker_.Call( | |
325 FROM_HERE, &InvalidationStateTracker::SetBootstrapData, state); | |
326 } | |
327 | |
328 void SyncInvalidationListener::DoRegistrationUpdate() { | |
329 DCHECK(CalledOnValidThread()); | |
330 const ObjectIdSet& unregistered_ids = | |
331 registration_manager_->UpdateRegisteredIds(registered_ids_); | |
332 for (ObjectIdSet::iterator it = unregistered_ids.begin(); | |
333 it != unregistered_ids.end(); ++it) { | |
334 unacked_invalidations_map_.erase(*it); | |
335 } | |
336 invalidation_state_tracker_.Call( | |
337 FROM_HERE, | |
338 &InvalidationStateTracker::SetSavedInvalidations, | |
339 unacked_invalidations_map_); | |
340 | |
341 ObjectIdInvalidationMap object_id_invalidation_map; | |
342 for (UnackedInvalidationsMap::iterator map_it = | |
343 unacked_invalidations_map_.begin(); | |
344 map_it != unacked_invalidations_map_.end(); ++map_it) { | |
345 if (registered_ids_.find(map_it->first) == registered_ids_.end()) { | |
346 continue; | |
347 } | |
348 map_it->second.ExportInvalidations( | |
349 GetThisAsAckHandler(), | |
350 &object_id_invalidation_map); | |
351 } | |
352 | |
353 // There's no need to run these through DispatchInvalidations(); they've | |
354 // already been saved to storage (that's where we found them) so all we need | |
355 // to do now is emit them. | |
356 EmitSavedInvalidations(object_id_invalidation_map); | |
357 } | |
358 | |
359 void SyncInvalidationListener::RequestDetailedStatus( | |
360 base::Callback<void(const base::DictionaryValue&)> callback) const { | |
361 DCHECK(CalledOnValidThread()); | |
362 sync_network_channel_->RequestDetailedStatus(callback); | |
363 callback.Run(*CollectDebugData()); | |
364 } | |
365 | |
366 scoped_ptr<base::DictionaryValue> | |
367 SyncInvalidationListener::CollectDebugData() const { | |
368 scoped_ptr<base::DictionaryValue> return_value(new base::DictionaryValue()); | |
369 return_value->SetString( | |
370 "SyncInvalidationListener.PushClientState", | |
371 std::string(InvalidatorStateToString(push_client_state_))); | |
372 return_value->SetString("SyncInvalidationListener.TiclState", | |
373 std::string(InvalidatorStateToString(ticl_state_))); | |
374 scoped_ptr<base::DictionaryValue> unacked_map(new base::DictionaryValue()); | |
375 for (UnackedInvalidationsMap::const_iterator it = | |
376 unacked_invalidations_map_.begin(); | |
377 it != unacked_invalidations_map_.end(); | |
378 ++it) { | |
379 unacked_map->Set((it->first).name(), (it->second).ToValue().release()); | |
380 } | |
381 return_value->Set("SyncInvalidationListener.UnackedInvalidationsMap", | |
382 unacked_map.release()); | |
383 return return_value.Pass(); | |
384 } | |
385 | |
386 void SyncInvalidationListener::StopForTest() { | |
387 DCHECK(CalledOnValidThread()); | |
388 Stop(); | |
389 } | |
390 | |
391 void SyncInvalidationListener::Stop() { | |
392 DCHECK(CalledOnValidThread()); | |
393 if (!invalidation_client_) { | |
394 return; | |
395 } | |
396 | |
397 registration_manager_.reset(); | |
398 sync_system_resources_.Stop(); | |
399 invalidation_client_->Stop(); | |
400 | |
401 invalidation_client_.reset(); | |
402 delegate_ = NULL; | |
403 | |
404 ticl_state_ = DEFAULT_INVALIDATION_ERROR; | |
405 push_client_state_ = DEFAULT_INVALIDATION_ERROR; | |
406 } | |
407 | |
408 InvalidatorState SyncInvalidationListener::GetState() const { | |
409 DCHECK(CalledOnValidThread()); | |
410 if (ticl_state_ == INVALIDATION_CREDENTIALS_REJECTED || | |
411 push_client_state_ == INVALIDATION_CREDENTIALS_REJECTED) { | |
412 // If either the ticl or the push client rejected our credentials, | |
413 // return INVALIDATION_CREDENTIALS_REJECTED. | |
414 return INVALIDATION_CREDENTIALS_REJECTED; | |
415 } | |
416 if (ticl_state_ == INVALIDATIONS_ENABLED && | |
417 push_client_state_ == INVALIDATIONS_ENABLED) { | |
418 // If the ticl is ready and the push client notifications are | |
419 // enabled, return INVALIDATIONS_ENABLED. | |
420 return INVALIDATIONS_ENABLED; | |
421 } | |
422 // Otherwise, we have a transient error. | |
423 return TRANSIENT_INVALIDATION_ERROR; | |
424 } | |
425 | |
426 void SyncInvalidationListener::EmitStateChange() { | |
427 DCHECK(CalledOnValidThread()); | |
428 delegate_->OnInvalidatorStateChange(GetState()); | |
429 } | |
430 | |
431 WeakHandle<AckHandler> SyncInvalidationListener::GetThisAsAckHandler() { | |
432 DCHECK(CalledOnValidThread()); | |
433 return WeakHandle<AckHandler>(weak_ptr_factory_.GetWeakPtr()); | |
434 } | |
435 | |
436 void SyncInvalidationListener::OnNetworkChannelStateChanged( | |
437 InvalidatorState invalidator_state) { | |
438 DCHECK(CalledOnValidThread()); | |
439 push_client_state_ = invalidator_state; | |
440 EmitStateChange(); | |
441 } | |
442 | |
443 } // namespace syncer | |
OLD | NEW |