Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1)

Side by Side Diff: sync/notifier/sync_invalidation_listener.cc

Issue 10911084: Implement Invalidator::Acknowledge (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: . Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "sync/notifier/sync_invalidation_listener.h" 5 #include "sync/notifier/sync_invalidation_listener.h"
6 6
7 #include <string>
8 #include <vector> 7 #include <vector>
9 8
9 #include "base/bind.h"
10 #include "base/callback.h" 10 #include "base/callback.h"
11 #include "base/compiler_specific.h" 11 #include "base/compiler_specific.h"
12 #include "base/logging.h" 12 #include "base/logging.h"
13 #include "base/tracked_objects.h" 13 #include "base/tracked_objects.h"
14 #include "google/cacheinvalidation/include/invalidation-client.h" 14 #include "google/cacheinvalidation/include/invalidation-client.h"
15 #include "google/cacheinvalidation/include/types.h" 15 #include "google/cacheinvalidation/include/types.h"
16 #include "google/cacheinvalidation/types.pb.h" 16 #include "google/cacheinvalidation/types.pb.h"
17 #include "jingle/notifier/listener/push_client.h" 17 #include "jingle/notifier/listener/push_client.h"
18 #include "sync/notifier/invalidation_util.h" 18 #include "sync/notifier/invalidation_util.h"
19 #include "sync/notifier/registration_manager.h" 19 #include "sync/notifier/registration_manager.h"
20 20
21 namespace { 21 namespace {
22 22
23 const char kApplicationName[] = "chrome-sync"; 23 const char kApplicationName[] = "chrome-sync";
24 24
25 } // namespace 25 } // namespace
26 26
27 namespace syncer { 27 namespace syncer {
28 28
29 SyncInvalidationListener::Delegate::~Delegate() {} 29 SyncInvalidationListener::Delegate::~Delegate() {}
30 30
31 SyncInvalidationListener::SyncInvalidationListener( 31 SyncInvalidationListener::SyncInvalidationListener(
32 scoped_ptr<notifier::PushClient> push_client) 32 scoped_ptr<notifier::PushClient> push_client)
33 : push_client_(push_client.get()), 33 : weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
34 ack_tracker_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
35 push_client_(push_client.get()),
34 sync_system_resources_(push_client.Pass(), 36 sync_system_resources_(push_client.Pass(),
35 ALLOW_THIS_IN_INITIALIZER_LIST(this)), 37 ALLOW_THIS_IN_INITIALIZER_LIST(this)),
36 delegate_(NULL), 38 delegate_(NULL),
37 ticl_state_(DEFAULT_INVALIDATION_ERROR), 39 ticl_state_(DEFAULT_INVALIDATION_ERROR),
38 push_client_state_(DEFAULT_INVALIDATION_ERROR) { 40 push_client_state_(DEFAULT_INVALIDATION_ERROR) {
39 DCHECK(CalledOnValidThread()); 41 DCHECK(CalledOnValidThread());
40 push_client_->AddObserver(this); 42 push_client_->AddObserver(this);
41 } 43 }
42 44
43 SyncInvalidationListener::~SyncInvalidationListener() { 45 SyncInvalidationListener::~SyncInvalidationListener() {
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
98 100
99 // TODO(rlarocque): This call exists as part of an effort to move the 101 // TODO(rlarocque): This call exists as part of an effort to move the
100 // invalidator's ID out of sync. It writes the provided (sync-managed) ID to 102 // invalidator's ID out of sync. It writes the provided (sync-managed) ID to
101 // storage that lives on the UI thread. Once this has been in place for a 103 // storage that lives on the UI thread. Once this has been in place for a
102 // milestone or two, we can remove it and start looking for invalidator client 104 // milestone or two, we can remove it and start looking for invalidator client
103 // IDs exclusively in the InvalidationStateTracker. See crbug.com/124142. 105 // IDs exclusively in the InvalidationStateTracker. See crbug.com/124142.
104 invalidation_state_tracker_.Call( 106 invalidation_state_tracker_.Call(
105 FROM_HERE, 107 FROM_HERE,
106 &InvalidationStateTracker::SetInvalidatorClientId, 108 &InvalidationStateTracker::SetInvalidatorClientId,
107 client_id); 109 client_id);
110
111 // Set up reminders for any invalidations that have not been locally
112 // acknowledged.
113 ObjectIdSet unacknowledged_ids;
114 for (InvalidationStateMap::const_iterator it =
115 invalidation_state_map_.begin();
116 it != invalidation_state_map_.end(); ++it) {
117 if (it->second.expected.Equals(it->second.current))
118 continue;
119 unacknowledged_ids.insert(it->first);
120 }
121 if (!unacknowledged_ids.empty())
122 ack_tracker_.Track(unacknowledged_ids);
108 } 123 }
109 124
110 void SyncInvalidationListener::UpdateCredentials( 125 void SyncInvalidationListener::UpdateCredentials(
111 const std::string& email, const std::string& token) { 126 const std::string& email, const std::string& token) {
112 DCHECK(CalledOnValidThread()); 127 DCHECK(CalledOnValidThread());
113 sync_system_resources_.network()->UpdateCredentials(email, token); 128 sync_system_resources_.network()->UpdateCredentials(email, token);
114 } 129 }
115 130
116 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) { 131 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) {
117 DCHECK(CalledOnValidThread()); 132 DCHECK(CalledOnValidThread());
118 registered_ids_ = ids; 133 registered_ids_ = ids;
119 // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a 134 // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a
120 // working XMPP connection (as observed by us), so check it instead 135 // working XMPP connection (as observed by us), so check it instead
121 // of GetState() (see http://crbug.com/139424). 136 // of GetState() (see http://crbug.com/139424).
122 if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_.get()) { 137 if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_.get()) {
123 DoRegistrationUpdate(); 138 DoRegistrationUpdate();
124 } 139 }
125 } 140 }
126 141
142 void SyncInvalidationListener::Acknowledge(const invalidation::ObjectId& id,
143 const AckHandle& ack_handle) {
144 DCHECK(CalledOnValidThread());
145 InvalidationStateMap::iterator state_it = invalidation_state_map_.find(id);
146 if (state_it == invalidation_state_map_.end())
147 return;
148 invalidation_state_tracker_.Call(
149 FROM_HERE,
150 &InvalidationStateTracker::Acknowledge,
151 id,
152 ack_handle);
153 state_it->second.current = ack_handle;
154 if (state_it->second.expected.Equals(ack_handle)) {
155 // If the received ack matches the expected ack, then we no longer need to
156 // keep track of |id| since it is up-to-date.
Pete Williamson 2013/02/16 01:41:06 This comment confuses me a bit, we say that we don
dcheng 2013/02/22 02:51:30 Right, AckTracker has a Track() and an Ack() funct
157 ObjectIdSet ids;
158 ids.insert(id);
159 ack_tracker_.Ack(ids);
160 }
161 }
162
127 void SyncInvalidationListener::Ready( 163 void SyncInvalidationListener::Ready(
128 invalidation::InvalidationClient* client) { 164 invalidation::InvalidationClient* client) {
129 DCHECK(CalledOnValidThread()); 165 DCHECK(CalledOnValidThread());
130 DCHECK_EQ(client, invalidation_client_.get()); 166 DCHECK_EQ(client, invalidation_client_.get());
131 ticl_state_ = INVALIDATIONS_ENABLED; 167 ticl_state_ = INVALIDATIONS_ENABLED;
132 EmitStateChange(); 168 EmitStateChange();
133 DoRegistrationUpdate(); 169 DoRegistrationUpdate();
134 } 170 }
135 171
136 void SyncInvalidationListener::Invalidate( 172 void SyncInvalidationListener::Invalidate(
(...skipping 23 matching lines...) Expand all
160 } 196 }
161 197
162 std::string payload; 198 std::string payload;
163 // payload() CHECK()'s has_payload(), so we must check it ourselves first. 199 // payload() CHECK()'s has_payload(), so we must check it ourselves first.
164 if (invalidation.has_payload()) 200 if (invalidation.has_payload())
165 payload = invalidation.payload(); 201 payload = invalidation.payload();
166 202
167 DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id) 203 DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id)
168 << " to " << invalidation.version(); 204 << " to " << invalidation.version();
169 invalidation_state_map_[id].version = invalidation.version(); 205 invalidation_state_map_[id].version = invalidation.version();
206 invalidation_state_map_[id].payload = payload;
170 invalidation_state_tracker_.Call( 207 invalidation_state_tracker_.Call(
171 FROM_HERE, 208 FROM_HERE,
172 &InvalidationStateTracker::SetMaxVersionAndPayload, 209 &InvalidationStateTracker::SetMaxVersionAndPayload,
173 id, invalidation.version(), payload); 210 id, invalidation.version(), payload);
174 211
175 ObjectIdInvalidationMap invalidation_map; 212 ObjectIdSet ids;
176 invalidation_map[id].payload = payload; 213 ids.insert(id);
177 EmitInvalidation(invalidation_map); 214 PrepareInvalidation(ids, payload, client, ack_handle);
178 // TODO(akalin): We should really acknowledge only after we get the
179 // updates from the sync server. (see http://crbug.com/78462).
180 client->Acknowledge(ack_handle);
181 } 215 }
182 216
183 void SyncInvalidationListener::InvalidateUnknownVersion( 217 void SyncInvalidationListener::InvalidateUnknownVersion(
184 invalidation::InvalidationClient* client, 218 invalidation::InvalidationClient* client,
185 const invalidation::ObjectId& object_id, 219 const invalidation::ObjectId& object_id,
186 const invalidation::AckHandle& ack_handle) { 220 const invalidation::AckHandle& ack_handle) {
187 DCHECK(CalledOnValidThread()); 221 DCHECK(CalledOnValidThread());
188 DCHECK_EQ(client, invalidation_client_.get()); 222 DCHECK_EQ(client, invalidation_client_.get());
189 DVLOG(1) << "InvalidateUnknownVersion"; 223 DVLOG(1) << "InvalidateUnknownVersion";
190 224
191 ObjectIdInvalidationMap invalidation_map; 225 ObjectIdSet ids;
192 invalidation_map[object_id].payload = std::string(); 226 ids.insert(object_id);
193 EmitInvalidation(invalidation_map); 227 PrepareInvalidation(ids, std::string(), client, ack_handle);
194 // TODO(akalin): We should really acknowledge only after we get the
195 // updates from the sync server. (see http://crbug.com/78462).
196 client->Acknowledge(ack_handle);
197 } 228 }
198 229
199 // This should behave as if we got an invalidation with version 230 // This should behave as if we got an invalidation with version
200 // UNKNOWN_OBJECT_VERSION for all known data types. 231 // UNKNOWN_OBJECT_VERSION for all known data types.
201 void SyncInvalidationListener::InvalidateAll( 232 void SyncInvalidationListener::InvalidateAll(
202 invalidation::InvalidationClient* client, 233 invalidation::InvalidationClient* client,
203 const invalidation::AckHandle& ack_handle) { 234 const invalidation::AckHandle& ack_handle) {
204 DCHECK(CalledOnValidThread()); 235 DCHECK(CalledOnValidThread());
205 DCHECK_EQ(client, invalidation_client_.get()); 236 DCHECK_EQ(client, invalidation_client_.get());
206 DVLOG(1) << "InvalidateAll"; 237 DVLOG(1) << "InvalidateAll";
207 238
208 const ObjectIdInvalidationMap& invalidation_map = 239 PrepareInvalidation(registered_ids_, std::string(), client, ack_handle);
209 ObjectIdSetToInvalidationMap(registered_ids_, std::string()); 240 }
210 EmitInvalidation(invalidation_map); 241
211 // TODO(akalin): We should really acknowledge only after we get the 242 void SyncInvalidationListener::PrepareInvalidation(
212 // updates from the sync server. (see http://crbug.com/76482). 243 const ObjectIdSet& ids,
244 const std::string& payload,
245 invalidation::InvalidationClient* client,
246 const invalidation::AckHandle& ack_handle) {
247 DCHECK(CalledOnValidThread());
248
249 // A server invalidation resets the local retry count.
250 ack_tracker_.Ack(ids);
251 invalidation_state_tracker_.Call(
252 FROM_HERE,
253 &InvalidationStateTracker::GenerateAckHandles,
254 ids,
255 base::MessageLoopProxy::current(),
256 base::Bind(&SyncInvalidationListener::EmitInvalidation,
257 weak_ptr_factory_.GetWeakPtr(),
258 ids,
259 payload,
260 client,
261 ack_handle));
262 }
263
264 void SyncInvalidationListener::EmitInvalidation(
265 const ObjectIdSet& ids,
266 const std::string& payload,
267 invalidation::InvalidationClient* client,
268 const invalidation::AckHandle& ack_handle,
269 const AckHandleMap& local_ack_handles) {
270 DCHECK(CalledOnValidThread());
271 ObjectIdInvalidationMap invalidation_map =
272 ObjectIdSetToInvalidationMap(ids, payload);
273 for (AckHandleMap::const_iterator it = local_ack_handles.begin();
274 it != local_ack_handles.end(); ++it) {
275 // Update in-memory copy of the invalidation state.
276 invalidation_state_map_[it->first].expected = it->second;
277 invalidation_map[it->first].ack_handle = it->second;
278 }
279 ack_tracker_.Track(ids);
280 delegate_->OnInvalidate(invalidation_map);
213 client->Acknowledge(ack_handle); 281 client->Acknowledge(ack_handle);
214 } 282 }
215 283
216 void SyncInvalidationListener::EmitInvalidation( 284 void SyncInvalidationListener::OnTimeout(const ObjectIdSet& ids) {
Pete Williamson 2013/02/16 01:41:06 A comment on the "OnTimeout" function might be nic
dcheng 2013/02/22 02:51:30 Hmm. The delegate interface describes what this ca
217 const ObjectIdInvalidationMap& invalidation_map) { 285 ObjectIdInvalidationMap invalidation_map;
218 DCHECK(CalledOnValidThread()); 286 for (ObjectIdSet::const_iterator it = ids.begin(); it != ids.end(); ++it) {
287 Invalidation invalidation;
288 invalidation.ack_handle = invalidation_state_map_[*it].expected;
289 invalidation.payload = invalidation_state_map_[*it].payload;
290 invalidation_map.insert(std::make_pair(*it, invalidation));
291 }
292
219 delegate_->OnInvalidate(invalidation_map); 293 delegate_->OnInvalidate(invalidation_map);
220 } 294 }
221 295
222 void SyncInvalidationListener::InformRegistrationStatus( 296 void SyncInvalidationListener::InformRegistrationStatus(
223 invalidation::InvalidationClient* client, 297 invalidation::InvalidationClient* client,
224 const invalidation::ObjectId& object_id, 298 const invalidation::ObjectId& object_id,
225 InvalidationListener::RegistrationState new_state) { 299 InvalidationListener::RegistrationState new_state) {
226 DCHECK(CalledOnValidThread()); 300 DCHECK(CalledOnValidThread());
227 DCHECK_EQ(client, invalidation_client_.get()); 301 DCHECK_EQ(client, invalidation_client_.get());
228 DVLOG(1) << "InformRegistrationStatus: " 302 DVLOG(1) << "InformRegistrationStatus: "
(...skipping 70 matching lines...) Expand 10 before | Expand all | Expand 10 after
299 void SyncInvalidationListener::DoRegistrationUpdate() { 373 void SyncInvalidationListener::DoRegistrationUpdate() {
300 DCHECK(CalledOnValidThread()); 374 DCHECK(CalledOnValidThread());
301 const ObjectIdSet& unregistered_ids = 375 const ObjectIdSet& unregistered_ids =
302 registration_manager_->UpdateRegisteredIds(registered_ids_); 376 registration_manager_->UpdateRegisteredIds(registered_ids_);
303 for (ObjectIdSet::const_iterator it = unregistered_ids.begin(); 377 for (ObjectIdSet::const_iterator it = unregistered_ids.begin();
304 it != unregistered_ids.end(); ++it) { 378 it != unregistered_ids.end(); ++it) {
305 invalidation_state_map_.erase(*it); 379 invalidation_state_map_.erase(*it);
306 } 380 }
307 invalidation_state_tracker_.Call( 381 invalidation_state_tracker_.Call(
308 FROM_HERE, &InvalidationStateTracker::Forget, unregistered_ids); 382 FROM_HERE, &InvalidationStateTracker::Forget, unregistered_ids);
383 ack_tracker_.Ack(unregistered_ids);
309 } 384 }
310 385
311 void SyncInvalidationListener::StopForTest() { 386 void SyncInvalidationListener::StopForTest() {
312 DCHECK(CalledOnValidThread()); 387 DCHECK(CalledOnValidThread());
313 Stop(); 388 Stop();
314 } 389 }
315 390
316 InvalidationStateMap SyncInvalidationListener::GetStateMapForTest() const { 391 InvalidationStateMap SyncInvalidationListener::GetStateMapForTest() const {
317 DCHECK(CalledOnValidThread()); 392 DCHECK(CalledOnValidThread());
318 return invalidation_state_map_; 393 return invalidation_state_map_;
319 } 394 }
320 395
396 AckTracker* SyncInvalidationListener::GetAckTrackerForTest() {
397 return &ack_tracker_;
398 }
399
321 void SyncInvalidationListener::Stop() { 400 void SyncInvalidationListener::Stop() {
322 DCHECK(CalledOnValidThread()); 401 DCHECK(CalledOnValidThread());
323 if (!invalidation_client_.get()) { 402 if (!invalidation_client_.get()) {
324 return; 403 return;
325 } 404 }
326 405
406 ack_tracker_.Clear();
407
327 registration_manager_.reset(); 408 registration_manager_.reset();
328 sync_system_resources_.Stop(); 409 sync_system_resources_.Stop();
329 invalidation_client_->Stop(); 410 invalidation_client_->Stop();
330 411
331 invalidation_client_.reset(); 412 invalidation_client_.reset();
332 delegate_ = NULL; 413 delegate_ = NULL;
333 414
334 invalidation_state_tracker_.Reset(); 415 invalidation_state_tracker_.Reset();
335 invalidation_state_map_.clear(); 416 invalidation_state_map_.clear();
336 ticl_state_ = DEFAULT_INVALIDATION_ERROR; 417 ticl_state_ = DEFAULT_INVALIDATION_ERROR;
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after
373 EmitStateChange(); 454 EmitStateChange();
374 } 455 }
375 456
376 void SyncInvalidationListener::OnIncomingNotification( 457 void SyncInvalidationListener::OnIncomingNotification(
377 const notifier::Notification& notification) { 458 const notifier::Notification& notification) {
378 DCHECK(CalledOnValidThread()); 459 DCHECK(CalledOnValidThread());
379 // Do nothing, since this is already handled by |invalidation_client_|. 460 // Do nothing, since this is already handled by |invalidation_client_|.
380 } 461 }
381 462
382 } // namespace syncer 463 } // namespace syncer
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698