OLD | NEW |
---|---|
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "sync/notifier/sync_invalidation_listener.h" | 5 #include "sync/notifier/sync_invalidation_listener.h" |
6 | 6 |
7 #include <string> | |
8 #include <vector> | 7 #include <vector> |
9 | 8 |
9 #include "base/bind.h" | |
10 #include "base/callback.h" | 10 #include "base/callback.h" |
11 #include "base/compiler_specific.h" | 11 #include "base/compiler_specific.h" |
12 #include "base/logging.h" | 12 #include "base/logging.h" |
13 #include "base/tracked_objects.h" | 13 #include "base/tracked_objects.h" |
14 #include "google/cacheinvalidation/include/invalidation-client.h" | 14 #include "google/cacheinvalidation/include/invalidation-client.h" |
15 #include "google/cacheinvalidation/include/types.h" | 15 #include "google/cacheinvalidation/include/types.h" |
16 #include "google/cacheinvalidation/types.pb.h" | 16 #include "google/cacheinvalidation/types.pb.h" |
17 #include "jingle/notifier/listener/push_client.h" | 17 #include "jingle/notifier/listener/push_client.h" |
18 #include "sync/notifier/invalidation_util.h" | 18 #include "sync/notifier/invalidation_util.h" |
19 #include "sync/notifier/registration_manager.h" | 19 #include "sync/notifier/registration_manager.h" |
20 | 20 |
21 namespace { | 21 namespace { |
22 | 22 |
23 const char kApplicationName[] = "chrome-sync"; | 23 const char kApplicationName[] = "chrome-sync"; |
24 | 24 |
25 } // namespace | 25 } // namespace |
26 | 26 |
27 namespace syncer { | 27 namespace syncer { |
28 | 28 |
29 SyncInvalidationListener::Delegate::~Delegate() {} | 29 SyncInvalidationListener::Delegate::~Delegate() {} |
30 | 30 |
31 SyncInvalidationListener::SyncInvalidationListener( | 31 SyncInvalidationListener::SyncInvalidationListener( |
32 scoped_ptr<notifier::PushClient> push_client) | 32 scoped_ptr<notifier::PushClient> push_client) |
33 : push_client_(push_client.get()), | 33 : weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), |
34 ack_tracker_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), | |
35 push_client_(push_client.get()), | |
34 sync_system_resources_(push_client.Pass(), | 36 sync_system_resources_(push_client.Pass(), |
35 ALLOW_THIS_IN_INITIALIZER_LIST(this)), | 37 ALLOW_THIS_IN_INITIALIZER_LIST(this)), |
36 delegate_(NULL), | 38 delegate_(NULL), |
37 ticl_state_(DEFAULT_INVALIDATION_ERROR), | 39 ticl_state_(DEFAULT_INVALIDATION_ERROR), |
38 push_client_state_(DEFAULT_INVALIDATION_ERROR) { | 40 push_client_state_(DEFAULT_INVALIDATION_ERROR) { |
39 DCHECK(CalledOnValidThread()); | 41 DCHECK(CalledOnValidThread()); |
40 push_client_->AddObserver(this); | 42 push_client_->AddObserver(this); |
41 } | 43 } |
42 | 44 |
43 SyncInvalidationListener::~SyncInvalidationListener() { | 45 SyncInvalidationListener::~SyncInvalidationListener() { |
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
98 | 100 |
99 // TODO(rlarocque): This call exists as part of an effort to move the | 101 // TODO(rlarocque): This call exists as part of an effort to move the |
100 // invalidator's ID out of sync. It writes the provided (sync-managed) ID to | 102 // invalidator's ID out of sync. It writes the provided (sync-managed) ID to |
101 // storage that lives on the UI thread. Once this has been in place for a | 103 // storage that lives on the UI thread. Once this has been in place for a |
102 // milestone or two, we can remove it and start looking for invalidator client | 104 // milestone or two, we can remove it and start looking for invalidator client |
103 // IDs exclusively in the InvalidationStateTracker. See crbug.com/124142. | 105 // IDs exclusively in the InvalidationStateTracker. See crbug.com/124142. |
104 invalidation_state_tracker_.Call( | 106 invalidation_state_tracker_.Call( |
105 FROM_HERE, | 107 FROM_HERE, |
106 &InvalidationStateTracker::SetInvalidatorClientId, | 108 &InvalidationStateTracker::SetInvalidatorClientId, |
107 client_id); | 109 client_id); |
110 | |
111 // Set up reminders for any invalidations that have not been locally | |
112 // acknowledged. | |
113 ObjectIdSet unacknowledged_ids; | |
114 for (InvalidationStateMap::const_iterator it = | |
115 invalidation_state_map_.begin(); | |
116 it != invalidation_state_map_.end(); ++it) { | |
117 if (it->second.expected.Equals(it->second.current)) | |
118 continue; | |
119 unacknowledged_ids.insert(it->first); | |
120 } | |
121 if (!unacknowledged_ids.empty()) | |
122 ack_tracker_.Track(unacknowledged_ids); | |
108 } | 123 } |
109 | 124 |
110 void SyncInvalidationListener::UpdateCredentials( | 125 void SyncInvalidationListener::UpdateCredentials( |
111 const std::string& email, const std::string& token) { | 126 const std::string& email, const std::string& token) { |
112 DCHECK(CalledOnValidThread()); | 127 DCHECK(CalledOnValidThread()); |
113 sync_system_resources_.network()->UpdateCredentials(email, token); | 128 sync_system_resources_.network()->UpdateCredentials(email, token); |
114 } | 129 } |
115 | 130 |
116 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) { | 131 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) { |
117 DCHECK(CalledOnValidThread()); | 132 DCHECK(CalledOnValidThread()); |
118 registered_ids_ = ids; | 133 registered_ids_ = ids; |
119 // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a | 134 // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a |
120 // working XMPP connection (as observed by us), so check it instead | 135 // working XMPP connection (as observed by us), so check it instead |
121 // of GetState() (see http://crbug.com/139424). | 136 // of GetState() (see http://crbug.com/139424). |
122 if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_.get()) { | 137 if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_.get()) { |
123 DoRegistrationUpdate(); | 138 DoRegistrationUpdate(); |
124 } | 139 } |
125 } | 140 } |
126 | 141 |
142 void SyncInvalidationListener::Acknowledge(const invalidation::ObjectId& id, | |
143 const AckHandle& ack_handle) { | |
144 DCHECK(CalledOnValidThread()); | |
145 InvalidationStateMap::iterator state_it = invalidation_state_map_.find(id); | |
146 if (state_it == invalidation_state_map_.end()) | |
147 return; | |
148 invalidation_state_tracker_.Call( | |
149 FROM_HERE, | |
150 &InvalidationStateTracker::Acknowledge, | |
151 id, | |
152 ack_handle); | |
153 state_it->second.current = ack_handle; | |
154 if (state_it->second.expected.Equals(ack_handle)) { | |
155 // If the received ack matches the expected ack, then we no longer need to | |
156 // keep track of |id| since it is up-to-date. | |
Pete Williamson
2013/02/16 01:41:06
This comment confuses me a bit, we say that we don
dcheng
2013/02/22 02:51:30
Right, AckTracker has a Track() and an Ack() funct
| |
157 ObjectIdSet ids; | |
158 ids.insert(id); | |
159 ack_tracker_.Ack(ids); | |
160 } | |
161 } | |
162 | |
127 void SyncInvalidationListener::Ready( | 163 void SyncInvalidationListener::Ready( |
128 invalidation::InvalidationClient* client) { | 164 invalidation::InvalidationClient* client) { |
129 DCHECK(CalledOnValidThread()); | 165 DCHECK(CalledOnValidThread()); |
130 DCHECK_EQ(client, invalidation_client_.get()); | 166 DCHECK_EQ(client, invalidation_client_.get()); |
131 ticl_state_ = INVALIDATIONS_ENABLED; | 167 ticl_state_ = INVALIDATIONS_ENABLED; |
132 EmitStateChange(); | 168 EmitStateChange(); |
133 DoRegistrationUpdate(); | 169 DoRegistrationUpdate(); |
134 } | 170 } |
135 | 171 |
136 void SyncInvalidationListener::Invalidate( | 172 void SyncInvalidationListener::Invalidate( |
(...skipping 23 matching lines...) Expand all Loading... | |
160 } | 196 } |
161 | 197 |
162 std::string payload; | 198 std::string payload; |
163 // payload() CHECK()'s has_payload(), so we must check it ourselves first. | 199 // payload() CHECK()'s has_payload(), so we must check it ourselves first. |
164 if (invalidation.has_payload()) | 200 if (invalidation.has_payload()) |
165 payload = invalidation.payload(); | 201 payload = invalidation.payload(); |
166 | 202 |
167 DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id) | 203 DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id) |
168 << " to " << invalidation.version(); | 204 << " to " << invalidation.version(); |
169 invalidation_state_map_[id].version = invalidation.version(); | 205 invalidation_state_map_[id].version = invalidation.version(); |
206 invalidation_state_map_[id].payload = payload; | |
170 invalidation_state_tracker_.Call( | 207 invalidation_state_tracker_.Call( |
171 FROM_HERE, | 208 FROM_HERE, |
172 &InvalidationStateTracker::SetMaxVersionAndPayload, | 209 &InvalidationStateTracker::SetMaxVersionAndPayload, |
173 id, invalidation.version(), payload); | 210 id, invalidation.version(), payload); |
174 | 211 |
175 ObjectIdInvalidationMap invalidation_map; | 212 ObjectIdSet ids; |
176 invalidation_map[id].payload = payload; | 213 ids.insert(id); |
177 EmitInvalidation(invalidation_map); | 214 PrepareInvalidation(ids, payload, client, ack_handle); |
178 // TODO(akalin): We should really acknowledge only after we get the | |
179 // updates from the sync server. (see http://crbug.com/78462). | |
180 client->Acknowledge(ack_handle); | |
181 } | 215 } |
182 | 216 |
183 void SyncInvalidationListener::InvalidateUnknownVersion( | 217 void SyncInvalidationListener::InvalidateUnknownVersion( |
184 invalidation::InvalidationClient* client, | 218 invalidation::InvalidationClient* client, |
185 const invalidation::ObjectId& object_id, | 219 const invalidation::ObjectId& object_id, |
186 const invalidation::AckHandle& ack_handle) { | 220 const invalidation::AckHandle& ack_handle) { |
187 DCHECK(CalledOnValidThread()); | 221 DCHECK(CalledOnValidThread()); |
188 DCHECK_EQ(client, invalidation_client_.get()); | 222 DCHECK_EQ(client, invalidation_client_.get()); |
189 DVLOG(1) << "InvalidateUnknownVersion"; | 223 DVLOG(1) << "InvalidateUnknownVersion"; |
190 | 224 |
191 ObjectIdInvalidationMap invalidation_map; | 225 ObjectIdSet ids; |
192 invalidation_map[object_id].payload = std::string(); | 226 ids.insert(object_id); |
193 EmitInvalidation(invalidation_map); | 227 PrepareInvalidation(ids, std::string(), client, ack_handle); |
194 // TODO(akalin): We should really acknowledge only after we get the | |
195 // updates from the sync server. (see http://crbug.com/78462). | |
196 client->Acknowledge(ack_handle); | |
197 } | 228 } |
198 | 229 |
199 // This should behave as if we got an invalidation with version | 230 // This should behave as if we got an invalidation with version |
200 // UNKNOWN_OBJECT_VERSION for all known data types. | 231 // UNKNOWN_OBJECT_VERSION for all known data types. |
201 void SyncInvalidationListener::InvalidateAll( | 232 void SyncInvalidationListener::InvalidateAll( |
202 invalidation::InvalidationClient* client, | 233 invalidation::InvalidationClient* client, |
203 const invalidation::AckHandle& ack_handle) { | 234 const invalidation::AckHandle& ack_handle) { |
204 DCHECK(CalledOnValidThread()); | 235 DCHECK(CalledOnValidThread()); |
205 DCHECK_EQ(client, invalidation_client_.get()); | 236 DCHECK_EQ(client, invalidation_client_.get()); |
206 DVLOG(1) << "InvalidateAll"; | 237 DVLOG(1) << "InvalidateAll"; |
207 | 238 |
208 const ObjectIdInvalidationMap& invalidation_map = | 239 PrepareInvalidation(registered_ids_, std::string(), client, ack_handle); |
209 ObjectIdSetToInvalidationMap(registered_ids_, std::string()); | 240 } |
210 EmitInvalidation(invalidation_map); | 241 |
211 // TODO(akalin): We should really acknowledge only after we get the | 242 void SyncInvalidationListener::PrepareInvalidation( |
212 // updates from the sync server. (see http://crbug.com/76482). | 243 const ObjectIdSet& ids, |
244 const std::string& payload, | |
245 invalidation::InvalidationClient* client, | |
246 const invalidation::AckHandle& ack_handle) { | |
247 DCHECK(CalledOnValidThread()); | |
248 | |
249 // A server invalidation resets the local retry count. | |
250 ack_tracker_.Ack(ids); | |
251 invalidation_state_tracker_.Call( | |
252 FROM_HERE, | |
253 &InvalidationStateTracker::GenerateAckHandles, | |
254 ids, | |
255 base::MessageLoopProxy::current(), | |
256 base::Bind(&SyncInvalidationListener::EmitInvalidation, | |
257 weak_ptr_factory_.GetWeakPtr(), | |
258 ids, | |
259 payload, | |
260 client, | |
261 ack_handle)); | |
262 } | |
263 | |
264 void SyncInvalidationListener::EmitInvalidation( | |
265 const ObjectIdSet& ids, | |
266 const std::string& payload, | |
267 invalidation::InvalidationClient* client, | |
268 const invalidation::AckHandle& ack_handle, | |
269 const AckHandleMap& local_ack_handles) { | |
270 DCHECK(CalledOnValidThread()); | |
271 ObjectIdInvalidationMap invalidation_map = | |
272 ObjectIdSetToInvalidationMap(ids, payload); | |
273 for (AckHandleMap::const_iterator it = local_ack_handles.begin(); | |
274 it != local_ack_handles.end(); ++it) { | |
275 // Update in-memory copy of the invalidation state. | |
276 invalidation_state_map_[it->first].expected = it->second; | |
277 invalidation_map[it->first].ack_handle = it->second; | |
278 } | |
279 ack_tracker_.Track(ids); | |
280 delegate_->OnInvalidate(invalidation_map); | |
213 client->Acknowledge(ack_handle); | 281 client->Acknowledge(ack_handle); |
214 } | 282 } |
215 | 283 |
216 void SyncInvalidationListener::EmitInvalidation( | 284 void SyncInvalidationListener::OnTimeout(const ObjectIdSet& ids) { |
Pete Williamson
2013/02/16 01:41:06
A comment on the "OnTimeout" function might be nic
dcheng
2013/02/22 02:51:30
Hmm. The delegate interface describes what this ca
| |
217 const ObjectIdInvalidationMap& invalidation_map) { | 285 ObjectIdInvalidationMap invalidation_map; |
218 DCHECK(CalledOnValidThread()); | 286 for (ObjectIdSet::const_iterator it = ids.begin(); it != ids.end(); ++it) { |
287 Invalidation invalidation; | |
288 invalidation.ack_handle = invalidation_state_map_[*it].expected; | |
289 invalidation.payload = invalidation_state_map_[*it].payload; | |
290 invalidation_map.insert(std::make_pair(*it, invalidation)); | |
291 } | |
292 | |
219 delegate_->OnInvalidate(invalidation_map); | 293 delegate_->OnInvalidate(invalidation_map); |
220 } | 294 } |
221 | 295 |
222 void SyncInvalidationListener::InformRegistrationStatus( | 296 void SyncInvalidationListener::InformRegistrationStatus( |
223 invalidation::InvalidationClient* client, | 297 invalidation::InvalidationClient* client, |
224 const invalidation::ObjectId& object_id, | 298 const invalidation::ObjectId& object_id, |
225 InvalidationListener::RegistrationState new_state) { | 299 InvalidationListener::RegistrationState new_state) { |
226 DCHECK(CalledOnValidThread()); | 300 DCHECK(CalledOnValidThread()); |
227 DCHECK_EQ(client, invalidation_client_.get()); | 301 DCHECK_EQ(client, invalidation_client_.get()); |
228 DVLOG(1) << "InformRegistrationStatus: " | 302 DVLOG(1) << "InformRegistrationStatus: " |
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
299 void SyncInvalidationListener::DoRegistrationUpdate() { | 373 void SyncInvalidationListener::DoRegistrationUpdate() { |
300 DCHECK(CalledOnValidThread()); | 374 DCHECK(CalledOnValidThread()); |
301 const ObjectIdSet& unregistered_ids = | 375 const ObjectIdSet& unregistered_ids = |
302 registration_manager_->UpdateRegisteredIds(registered_ids_); | 376 registration_manager_->UpdateRegisteredIds(registered_ids_); |
303 for (ObjectIdSet::const_iterator it = unregistered_ids.begin(); | 377 for (ObjectIdSet::const_iterator it = unregistered_ids.begin(); |
304 it != unregistered_ids.end(); ++it) { | 378 it != unregistered_ids.end(); ++it) { |
305 invalidation_state_map_.erase(*it); | 379 invalidation_state_map_.erase(*it); |
306 } | 380 } |
307 invalidation_state_tracker_.Call( | 381 invalidation_state_tracker_.Call( |
308 FROM_HERE, &InvalidationStateTracker::Forget, unregistered_ids); | 382 FROM_HERE, &InvalidationStateTracker::Forget, unregistered_ids); |
383 ack_tracker_.Ack(unregistered_ids); | |
309 } | 384 } |
310 | 385 |
311 void SyncInvalidationListener::StopForTest() { | 386 void SyncInvalidationListener::StopForTest() { |
312 DCHECK(CalledOnValidThread()); | 387 DCHECK(CalledOnValidThread()); |
313 Stop(); | 388 Stop(); |
314 } | 389 } |
315 | 390 |
316 InvalidationStateMap SyncInvalidationListener::GetStateMapForTest() const { | 391 InvalidationStateMap SyncInvalidationListener::GetStateMapForTest() const { |
317 DCHECK(CalledOnValidThread()); | 392 DCHECK(CalledOnValidThread()); |
318 return invalidation_state_map_; | 393 return invalidation_state_map_; |
319 } | 394 } |
320 | 395 |
396 AckTracker* SyncInvalidationListener::GetAckTrackerForTest() { | |
397 return &ack_tracker_; | |
398 } | |
399 | |
321 void SyncInvalidationListener::Stop() { | 400 void SyncInvalidationListener::Stop() { |
322 DCHECK(CalledOnValidThread()); | 401 DCHECK(CalledOnValidThread()); |
323 if (!invalidation_client_.get()) { | 402 if (!invalidation_client_.get()) { |
324 return; | 403 return; |
325 } | 404 } |
326 | 405 |
406 ack_tracker_.Clear(); | |
407 | |
327 registration_manager_.reset(); | 408 registration_manager_.reset(); |
328 sync_system_resources_.Stop(); | 409 sync_system_resources_.Stop(); |
329 invalidation_client_->Stop(); | 410 invalidation_client_->Stop(); |
330 | 411 |
331 invalidation_client_.reset(); | 412 invalidation_client_.reset(); |
332 delegate_ = NULL; | 413 delegate_ = NULL; |
333 | 414 |
334 invalidation_state_tracker_.Reset(); | 415 invalidation_state_tracker_.Reset(); |
335 invalidation_state_map_.clear(); | 416 invalidation_state_map_.clear(); |
336 ticl_state_ = DEFAULT_INVALIDATION_ERROR; | 417 ticl_state_ = DEFAULT_INVALIDATION_ERROR; |
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
373 EmitStateChange(); | 454 EmitStateChange(); |
374 } | 455 } |
375 | 456 |
376 void SyncInvalidationListener::OnIncomingNotification( | 457 void SyncInvalidationListener::OnIncomingNotification( |
377 const notifier::Notification& notification) { | 458 const notifier::Notification& notification) { |
378 DCHECK(CalledOnValidThread()); | 459 DCHECK(CalledOnValidThread()); |
379 // Do nothing, since this is already handled by |invalidation_client_|. | 460 // Do nothing, since this is already handled by |invalidation_client_|. |
380 } | 461 } |
381 | 462 |
382 } // namespace syncer | 463 } // namespace syncer |
OLD | NEW |