Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(78)

Side by Side Diff: sync/notifier/sync_invalidation_listener.cc

Issue 10911084: Implement Invalidator::Acknowledge (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Restart test + more cleanup Created 8 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "sync/notifier/sync_invalidation_listener.h" 5 #include "sync/notifier/sync_invalidation_listener.h"
6 6
7 #include <string>
8 #include <vector> 7 #include <vector>
9 8
9 #include "base/bind.h"
10 #include "base/callback.h" 10 #include "base/callback.h"
11 #include "base/compiler_specific.h" 11 #include "base/compiler_specific.h"
12 #include "base/logging.h" 12 #include "base/logging.h"
13 #include "base/stl_util.h"
13 #include "base/tracked_objects.h" 14 #include "base/tracked_objects.h"
14 #include "google/cacheinvalidation/include/invalidation-client.h" 15 #include "google/cacheinvalidation/include/invalidation-client.h"
15 #include "google/cacheinvalidation/include/types.h" 16 #include "google/cacheinvalidation/include/types.h"
16 #include "google/cacheinvalidation/types.pb.h" 17 #include "google/cacheinvalidation/types.pb.h"
17 #include "jingle/notifier/listener/push_client.h" 18 #include "jingle/notifier/listener/push_client.h"
18 #include "sync/notifier/invalidation_util.h" 19 #include "sync/notifier/invalidation_util.h"
19 #include "sync/notifier/registration_manager.h" 20 #include "sync/notifier/registration_manager.h"
20 21
21 namespace { 22 namespace {
22 23
23 const char kApplicationName[] = "chrome-sync"; 24 const char kApplicationName[] = "chrome-sync";
25 const int kMaxDelayInSeconds = 600;
26
27 base::TimeDelta CalculateBackoffDelay(int retry_count) {
akalin 2012/10/19 13:27:16 almost certain there's already an exp. backoff cla
Jói 2012/10/19 13:29:49 net/base/backoff_entry.h
dcheng 2012/10/19 19:38:11 I like this much better and will update the code a
28 int delay = kMaxDelayInSeconds;
29 // Lazy way to prevent overflow.
30 if (retry_count < 10)
31 delay = std::min(delay, (1 << retry_count) * 60);
32 return base::TimeDelta::FromSeconds(delay);
33 }
24 34
25 } // namespace 35 } // namespace
26 36
27 namespace syncer { 37 namespace syncer {
28 38
29 SyncInvalidationListener::Delegate::~Delegate() {} 39 SyncInvalidationListener::Delegate::~Delegate() {}
30 40
31 SyncInvalidationListener::SyncInvalidationListener( 41 SyncInvalidationListener::SyncInvalidationListener(
32 scoped_ptr<notifier::PushClient> push_client) 42 scoped_ptr<notifier::PushClient> push_client)
33 : push_client_(push_client.get()), 43 : weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
44 push_client_(push_client.get()),
34 sync_system_resources_(push_client.Pass(), 45 sync_system_resources_(push_client.Pass(),
35 ALLOW_THIS_IN_INITIALIZER_LIST(this)), 46 ALLOW_THIS_IN_INITIALIZER_LIST(this)),
36 delegate_(NULL), 47 delegate_(NULL),
37 ticl_state_(DEFAULT_INVALIDATION_ERROR), 48 ticl_state_(DEFAULT_INVALIDATION_ERROR),
38 push_client_state_(DEFAULT_INVALIDATION_ERROR) { 49 push_client_state_(DEFAULT_INVALIDATION_ERROR) {
39 DCHECK(CalledOnValidThread()); 50 DCHECK(CalledOnValidThread());
40 push_client_->AddObserver(this); 51 push_client_->AddObserver(this);
41 } 52 }
42 53
43 SyncInvalidationListener::~SyncInvalidationListener() { 54 SyncInvalidationListener::~SyncInvalidationListener() {
44 DCHECK(CalledOnValidThread()); 55 DCHECK(CalledOnValidThread());
45 push_client_->RemoveObserver(this); 56 push_client_->RemoveObserver(this);
46 Stop(); 57 Stop();
47 DCHECK(!delegate_); 58 DCHECK(!delegate_);
48 } 59 }
49 60
50 void SyncInvalidationListener::Start( 61 void SyncInvalidationListener::Start(
51 const CreateInvalidationClientCallback& 62 const CreateInvalidationClientCallback&
52 create_invalidation_client_callback, 63 create_invalidation_client_callback,
53 const std::string& client_id, const std::string& client_info, 64 const std::string& client_id, const std::string& client_info,
54 const std::string& invalidation_bootstrap_data, 65 const std::string& invalidation_bootstrap_data,
55 const InvalidationVersionMap& initial_max_invalidation_versions, 66 const InvalidationStateMap& initial_invalidation_state_map,
56 const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker, 67 const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker,
57 Delegate* delegate) { 68 Delegate* delegate) {
58 DCHECK(CalledOnValidThread()); 69 DCHECK(CalledOnValidThread());
59 Stop(); 70 Stop();
60 71
61 sync_system_resources_.set_platform(client_info); 72 sync_system_resources_.set_platform(client_info);
62 sync_system_resources_.Start(); 73 sync_system_resources_.Start();
63 74
64 // The Storage resource is implemented as a write-through cache. We populate 75 // The Storage resource is implemented as a write-through cache. We populate
65 // it with the initial state on startup, so subsequent writes go to disk and 76 // it with the initial state on startup, so subsequent writes go to disk and
66 // update the in-memory cache, while reads just return the cached state. 77 // update the in-memory cache, while reads just return the cached state.
67 sync_system_resources_.storage()->SetInitialState( 78 sync_system_resources_.storage()->SetInitialState(
68 invalidation_bootstrap_data); 79 invalidation_bootstrap_data);
69 80
70 max_invalidation_versions_ = initial_max_invalidation_versions; 81 invalidation_state_map_ = initial_invalidation_state_map;
71 if (max_invalidation_versions_.empty()) { 82 if (invalidation_state_map_.empty()) {
72 DVLOG(2) << "No initial max invalidation versions for any id"; 83 DVLOG(2) << "No initial max invalidation versions for any id";
73 } else { 84 } else {
74 for (InvalidationVersionMap::const_iterator it = 85 // Start the reminder timer if we have unacknowledged local invalidations.
75 max_invalidation_versions_.begin(); 86 for (InvalidationStateMap::const_iterator it =
76 it != max_invalidation_versions_.end(); ++it) { 87 invalidation_state_map_.begin();
88 it != invalidation_state_map_.end(); ++it) {
77 DVLOG(2) << "Initial max invalidation version for " 89 DVLOG(2) << "Initial max invalidation version for "
78 << ObjectIdToString(it->first) << " is " 90 << ObjectIdToString(it->first) << " is "
79 << it->second; 91 << it->second.version;
80 } 92 }
81 } 93 }
82 invalidation_state_tracker_ = invalidation_state_tracker; 94 invalidation_state_tracker_ = invalidation_state_tracker;
83 DCHECK(invalidation_state_tracker_.IsInitialized()); 95 DCHECK(invalidation_state_tracker_.IsInitialized());
84 96
85 DCHECK(!delegate_); 97 DCHECK(!delegate_);
86 DCHECK(delegate); 98 DCHECK(delegate);
87 delegate_ = delegate; 99 delegate_ = delegate;
88 100
89 int client_type = ipc::invalidation::ClientType::CHROME_SYNC; 101 int client_type = ipc::invalidation::ClientType::CHROME_SYNC;
90 invalidation_client_.reset( 102 invalidation_client_.reset(
91 create_invalidation_client_callback.Run( 103 create_invalidation_client_callback.Run(
92 &sync_system_resources_, client_type, client_id, 104 &sync_system_resources_, client_type, client_id,
93 kApplicationName, this)); 105 kApplicationName, this));
94 invalidation_client_->Start(); 106 invalidation_client_->Start();
95 107
96 registration_manager_.reset( 108 registration_manager_.reset(
97 new RegistrationManager(invalidation_client_.get())); 109 new RegistrationManager(invalidation_client_.get()));
110
111 // Start the reminder timer if we have unacknowledged local invalidations.
112 const base::TimeTicks now = base::TimeTicks::Now();
113 const base::TimeTicks expiration_time = now + CalculateBackoffDelay(0);
114 for (InvalidationStateMap::const_iterator it =
115 invalidation_state_map_.begin();
116 it != invalidation_state_map_.end(); ++it) {
117 if (it->second.expected.Equals(it->second.current))
118 continue;
119 // TODO(dcheng): Save payload?
120 InsertId(expiration_time,
121 it->first,
122 std::string(),
123 0 /* retry_count */);
124 }
125 UpdateTimer(now);
98 } 126 }
99 127
100 void SyncInvalidationListener::UpdateCredentials( 128 void SyncInvalidationListener::UpdateCredentials(
101 const std::string& email, const std::string& token) { 129 const std::string& email, const std::string& token) {
102 DCHECK(CalledOnValidThread()); 130 DCHECK(CalledOnValidThread());
103 sync_system_resources_.network()->UpdateCredentials(email, token); 131 sync_system_resources_.network()->UpdateCredentials(email, token);
104 } 132 }
105 133
106 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) { 134 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) {
107 DCHECK(CalledOnValidThread()); 135 DCHECK(CalledOnValidThread());
108 registered_ids_ = ids; 136 registered_ids_ = ids;
109 // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a 137 // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a
110 // working XMPP connection (as observed by us), so check it instead 138 // working XMPP connection (as observed by us), so check it instead
111 // of GetState() (see http://crbug.com/139424). 139 // of GetState() (see http://crbug.com/139424).
112 if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_.get()) { 140 if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_.get()) {
113 DoRegistrationUpdate(); 141 DoRegistrationUpdate();
114 } 142 }
115 } 143 }
116 144
145 void SyncInvalidationListener::Acknowledge(const invalidation::ObjectId& id,
146 const AckHandle& ack_handle) {
147 DCHECK(CalledOnValidThread());
148 InvalidationStateMap::iterator state_it = invalidation_state_map_.find(id);
149 if (state_it == invalidation_state_map_.end())
150 return;
151 invalidation_state_tracker_.Call(
152 FROM_HERE,
153 &InvalidationStateTracker::Acknowledge,
154 id,
155 ack_handle);
156 state_it->second.current = ack_handle;
157 if (state_it->second.expected.Equals(ack_handle)) {
158 // If the received ack matches the expected ack, then we no longer need to
159 // keep track of |id| since it is up-to-date. We use some heuristics to
160 // avoid unnecessarily resetting the timer, since this results in the timer
161 // having to post followup continuation tasks and also makes tests less
162 // deterministic.
163 bool update_timer = false;
164 // If we're removing the head of the queue, we may need to update the timer.
165 if (timer_queue_.begin()->second.id == id) {
166 TimerQueue::const_iterator it = timer_queue_.begin();
167 ++it;
168 // But only if there are no other entries with the same expiration time.
169 update_timer =
170 timer_queue_.upper_bound(timer_queue_.begin()->first) == it;
171 }
172 RemoveId(id);
173 if (update_timer)
174 UpdateTimer(base::TimeTicks::Now());
175 }
176 }
177
117 void SyncInvalidationListener::Ready( 178 void SyncInvalidationListener::Ready(
118 invalidation::InvalidationClient* client) { 179 invalidation::InvalidationClient* client) {
119 DCHECK(CalledOnValidThread()); 180 DCHECK(CalledOnValidThread());
120 DCHECK_EQ(client, invalidation_client_.get()); 181 DCHECK_EQ(client, invalidation_client_.get());
121 ticl_state_ = INVALIDATIONS_ENABLED; 182 ticl_state_ = INVALIDATIONS_ENABLED;
122 EmitStateChange(); 183 EmitStateChange();
123 DoRegistrationUpdate(); 184 DoRegistrationUpdate();
124 } 185 }
125 186
126 void SyncInvalidationListener::Invalidate( 187 void SyncInvalidationListener::Invalidate(
127 invalidation::InvalidationClient* client, 188 invalidation::InvalidationClient* client,
128 const invalidation::Invalidation& invalidation, 189 const invalidation::Invalidation& invalidation,
129 const invalidation::AckHandle& ack_handle) { 190 const invalidation::AckHandle& ack_handle) {
130 DCHECK(CalledOnValidThread()); 191 DCHECK(CalledOnValidThread());
131 DCHECK_EQ(client, invalidation_client_.get()); 192 DCHECK_EQ(client, invalidation_client_.get());
132 DVLOG(1) << "Invalidate: " << InvalidationToString(invalidation); 193 DVLOG(1) << "Invalidate: " << InvalidationToString(invalidation);
133 194
134 const invalidation::ObjectId& id = invalidation.object_id(); 195 const invalidation::ObjectId& id = invalidation.object_id();
135 196
136 // The invalidation API spec allows for the possibility of redundant 197 // The invalidation API spec allows for the possibility of redundant
137 // invalidations, so keep track of the max versions and drop 198 // invalidations, so keep track of the max versions and drop
138 // invalidations with old versions. 199 // invalidations with old versions.
139 // 200 //
140 // TODO(akalin): Now that we keep track of registered ids, we 201 // TODO(akalin): Now that we keep track of registered ids, we
141 // should drop invalidations for unregistered ids. We may also 202 // should drop invalidations for unregistered ids. We may also
142 // have to filter it at a higher level, as invalidations for 203 // have to filter it at a higher level, as invalidations for
143 // newly-unregistered ids may already be in flight. 204 // newly-unregistered ids may already be in flight.
144 InvalidationVersionMap::const_iterator it = 205 InvalidationStateMap::const_iterator it = invalidation_state_map_.find(id);
145 max_invalidation_versions_.find(id); 206 if ((it != invalidation_state_map_.end()) &&
146 if ((it != max_invalidation_versions_.end()) && 207 (invalidation.version() <= it->second.version)) {
147 (invalidation.version() <= it->second)) {
148 // Drop redundant invalidations. 208 // Drop redundant invalidations.
149 client->Acknowledge(ack_handle); 209 client->Acknowledge(ack_handle);
150 return; 210 return;
151 } 211 }
152 DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id) 212 DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id)
153 << " to " << invalidation.version(); 213 << " to " << invalidation.version();
154 max_invalidation_versions_[id] = invalidation.version(); 214 invalidation_state_map_[id].version = invalidation.version();
155 invalidation_state_tracker_.Call( 215 invalidation_state_tracker_.Call(
156 FROM_HERE, 216 FROM_HERE,
157 &InvalidationStateTracker::SetMaxVersion, 217 &InvalidationStateTracker::SetMaxVersion,
akalin 2012/10/19 13:27:16 can we combine the SetMaxVersion and GenerateAckHa
dcheng 2012/10/19 19:38:11 The reason they are split up is because Invalidat
158 id, invalidation.version()); 218 id, invalidation.version());
159 219
160 std::string payload; 220 std::string payload;
161 // payload() CHECK()'s has_payload(), so we must check it ourselves first. 221 // payload() CHECK()'s has_payload(), so we must check it ourselves first.
162 if (invalidation.has_payload()) 222 if (invalidation.has_payload())
163 payload = invalidation.payload(); 223 payload = invalidation.payload();
164 224
165 ObjectIdInvalidationMap invalidation_map; 225 ObjectIdSet ids;
166 invalidation_map[id].payload = payload; 226 ids.insert(id);
167 EmitInvalidation(invalidation_map); 227 PrepareInvalidation(ids, payload, client, ack_handle);
168 // TODO(akalin): We should really acknowledge only after we get the
169 // updates from the sync server. (see http://crbug.com/78462).
170 client->Acknowledge(ack_handle);
171 } 228 }
172 229
173 void SyncInvalidationListener::InvalidateUnknownVersion( 230 void SyncInvalidationListener::InvalidateUnknownVersion(
174 invalidation::InvalidationClient* client, 231 invalidation::InvalidationClient* client,
175 const invalidation::ObjectId& object_id, 232 const invalidation::ObjectId& object_id,
176 const invalidation::AckHandle& ack_handle) { 233 const invalidation::AckHandle& ack_handle) {
177 DCHECK(CalledOnValidThread()); 234 DCHECK(CalledOnValidThread());
178 DCHECK_EQ(client, invalidation_client_.get()); 235 DCHECK_EQ(client, invalidation_client_.get());
179 DVLOG(1) << "InvalidateUnknownVersion"; 236 DVLOG(1) << "InvalidateUnknownVersion";
180 237
181 ObjectIdInvalidationMap invalidation_map; 238 ObjectIdSet ids;
182 invalidation_map[object_id].payload = std::string(); 239 ids.insert(object_id);
183 EmitInvalidation(invalidation_map); 240 PrepareInvalidation(ids, std::string(), client, ack_handle);
184 // TODO(akalin): We should really acknowledge only after we get the
185 // updates from the sync server. (see http://crbug.com/78462).
186 client->Acknowledge(ack_handle);
187 } 241 }
188 242
189 // This should behave as if we got an invalidation with version 243 // This should behave as if we got an invalidation with version
190 // UNKNOWN_OBJECT_VERSION for all known data types. 244 // UNKNOWN_OBJECT_VERSION for all known data types.
191 void SyncInvalidationListener::InvalidateAll( 245 void SyncInvalidationListener::InvalidateAll(
192 invalidation::InvalidationClient* client, 246 invalidation::InvalidationClient* client,
193 const invalidation::AckHandle& ack_handle) { 247 const invalidation::AckHandle& ack_handle) {
194 DCHECK(CalledOnValidThread()); 248 DCHECK(CalledOnValidThread());
195 DCHECK_EQ(client, invalidation_client_.get()); 249 DCHECK_EQ(client, invalidation_client_.get());
196 DVLOG(1) << "InvalidateAll"; 250 DVLOG(1) << "InvalidateAll";
197 251
198 const ObjectIdInvalidationMap& invalidation_map = 252 PrepareInvalidation(registered_ids_, std::string(), client, ack_handle);
199 ObjectIdSetToInvalidationMap(registered_ids_, std::string()); 253 }
200 EmitInvalidation(invalidation_map); 254
201 // TODO(akalin): We should really acknowledge only after we get the 255 void SyncInvalidationListener::PrepareInvalidation(
202 // updates from the sync server. (see http://crbug.com/76482). 256 const ObjectIdSet& ids,
257 const std::string& payload,
258 invalidation::InvalidationClient* client,
259 const invalidation::AckHandle& ack_handle) {
260 DCHECK(CalledOnValidThread());
261
262 invalidation_state_tracker_.Call(
263 FROM_HERE,
264 &InvalidationStateTracker::GenerateAckHandles,
265 ids,
266 base::MessageLoopProxy::current(),
267 base::Bind(&SyncInvalidationListener::EmitInvalidation,
268 weak_ptr_factory_.GetWeakPtr(),
269 ids,
270 payload,
271 client,
272 ack_handle));
273 }
274
275 void SyncInvalidationListener::EmitInvalidation(
276 const ObjectIdSet& ids,
277 const std::string& payload,
278 invalidation::InvalidationClient* client,
279 const invalidation::AckHandle& ack_handle,
280 const AckHandleMap& local_ack_handles) {
281 DCHECK(CalledOnValidThread());
282 ObjectIdInvalidationMap invalidation_map =
283 ObjectIdSetToInvalidationMap(ids, payload);
284 // Erase any timer queue entries that correspond with an id in |ids| since a
285 // new invalidation from the server resets the retry count.
286 RemoveIds(ids);
287 const base::TimeTicks now = base::TimeTicks::Now();
288 const base::TimeTicks expiration_time = now + CalculateBackoffDelay(0);
289 for (AckHandleMap::const_iterator it = local_ack_handles.begin();
290 it != local_ack_handles.end(); ++it) {
291 // Update in-memory copy of the invalidation state.
292 invalidation_state_map_[it->first].expected = it->second;
293 invalidation_map[it->first].ack_handle = it->second;
294 InsertId(expiration_time, it->first, payload, 0 /* retry_count */);
295 }
296 DCHECK(!timer_queue_.empty());
297 UpdateTimer(now);
298 delegate_->OnInvalidate(invalidation_map);
203 client->Acknowledge(ack_handle); 299 client->Acknowledge(ack_handle);
204 } 300 }
205 301
206 void SyncInvalidationListener::EmitInvalidation( 302 void SyncInvalidationListener::ResendUnacknowledgedInvalidations() {
207 const ObjectIdInvalidationMap& invalidation_map) { 303 ResendUnacknowledgedInvalidationsAt(base::TimeTicks::Now());
208 DCHECK(CalledOnValidThread()); 304 }
305
306 void SyncInvalidationListener::ResendUnacknowledgedInvalidationsAt(
307 base::TimeTicks expiration_time) {
308 DCHECK(!timer_queue_.empty()) << "Timer not canceled but queue is empty!";
309
310 // This is slightly redundant since in non-test code, we always satisfy
311 // the condition now == expiration_time. Having this makes the tests a bit
312 // more sane though.
313 const base::TimeTicks now = base::TimeTicks::Now();
314 TimerQueue::iterator end = timer_queue_.upper_bound(expiration_time);
315 // In theory, we can do in one loop since all the new insertions should be
316 // past |end|. To avoid potential infinite loops in case of a bug, we do it in
317 // two separate loops.
318 std::vector<QueueEntry> expired_entries;
319 for (TimerQueue::iterator it = timer_queue_.begin(); it != end; ) {
320 expired_entries.push_back(it->second);
321 TimerQueue::iterator erase_it = it;
322 ++it;
323 timer_queue_.erase(erase_it);
324 }
325
326 ObjectIdInvalidationMap invalidation_map;
327 for (std::vector<QueueEntry>::const_iterator it = expired_entries.begin();
328 it != expired_entries.end(); ++it) {
329 InsertId(now + CalculateBackoffDelay(it->retry_count + 1),
330 it->id,
331 it->payload,
332 it->retry_count + 1);
333 Invalidation invalidation;
334 invalidation.ack_handle = invalidation_state_map_[it->id].expected;
335 invalidation.payload = it->payload;
336 invalidation_map.insert(std::make_pair(it->id, invalidation));
337 }
338
209 delegate_->OnInvalidate(invalidation_map); 339 delegate_->OnInvalidate(invalidation_map);
340 UpdateTimer(now);
341 }
342
343 void SyncInvalidationListener::InsertId(base::TimeTicks expiration_time,
344 const invalidation::ObjectId& id,
345 const std::string& payload,
346 int retry_count) {
347 timer_queue_.insert(std::make_pair(expiration_time,
348 QueueEntry(id, payload, retry_count)));
349 }
350
351 void SyncInvalidationListener::RemoveId(const invalidation::ObjectId& id) {
352 for (TimerQueue::iterator it = timer_queue_.begin(); it != timer_queue_.end();
353 ++it) {
354 if (it->second.id == id) {
355 timer_queue_.erase(it);
356 return;
357 }
358 }
359 // This shouldn't happen unless someone is acking multiple times.
360 NOTREACHED();
361 }
362
363 void SyncInvalidationListener::RemoveIds(const ObjectIdSet& ids) {
364 for (TimerQueue::iterator it = timer_queue_.begin();
365 it != timer_queue_.end(); ) {
366 // We could be more clever here and reduce the complexity, since both
367 // containers are already sorted. Since we don't expect timer_queue_ to be
368 // very long in practice though, we just do things the simpler way.
369 if (ContainsKey(ids, it->second.id)) {
370 TimerQueue::iterator erase_it = it;
371 ++it;
372 timer_queue_.erase(erase_it);
373 } else {
374 ++it;
375 }
376 }
377 }
378
379 void SyncInvalidationListener::UpdateTimer(base::TimeTicks now) {
380 if (timer_queue_.empty()) {
381 timer_.Stop();
382 } else {
383 timer_.Start(FROM_HERE,
384 timer_queue_.begin()->first - now,
385 this,
386 &SyncInvalidationListener::ResendUnacknowledgedInvalidations);
387 }
210 } 388 }
211 389
212 void SyncInvalidationListener::InformRegistrationStatus( 390 void SyncInvalidationListener::InformRegistrationStatus(
213 invalidation::InvalidationClient* client, 391 invalidation::InvalidationClient* client,
214 const invalidation::ObjectId& object_id, 392 const invalidation::ObjectId& object_id,
215 InvalidationListener::RegistrationState new_state) { 393 InvalidationListener::RegistrationState new_state) {
216 DCHECK(CalledOnValidThread()); 394 DCHECK(CalledOnValidThread());
217 DCHECK_EQ(client, invalidation_client_.get()); 395 DCHECK_EQ(client, invalidation_client_.get());
218 DVLOG(1) << "InformRegistrationStatus: " 396 DVLOG(1) << "InformRegistrationStatus: "
219 << ObjectIdToString(object_id) << " " << new_state; 397 << ObjectIdToString(object_id) << " " << new_state;
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after
285 invalidation_state_tracker_.Call( 463 invalidation_state_tracker_.Call(
286 FROM_HERE, &InvalidationStateTracker::SetBootstrapData, state); 464 FROM_HERE, &InvalidationStateTracker::SetBootstrapData, state);
287 } 465 }
288 466
289 void SyncInvalidationListener::DoRegistrationUpdate() { 467 void SyncInvalidationListener::DoRegistrationUpdate() {
290 DCHECK(CalledOnValidThread()); 468 DCHECK(CalledOnValidThread());
291 const ObjectIdSet& unregistered_ids = 469 const ObjectIdSet& unregistered_ids =
292 registration_manager_->UpdateRegisteredIds(registered_ids_); 470 registration_manager_->UpdateRegisteredIds(registered_ids_);
293 invalidation_state_tracker_.Call( 471 invalidation_state_tracker_.Call(
294 FROM_HERE, &InvalidationStateTracker::Forget, unregistered_ids); 472 FROM_HERE, &InvalidationStateTracker::Forget, unregistered_ids);
473 for (ObjectIdSet::const_iterator it = unregistered_ids.begin();
474 it != unregistered_ids.end(); ++it) {
475 invalidation_state_map_.erase(*it);
476 }
477 for (TimerQueue::iterator it = timer_queue_.begin();
478 it != timer_queue_.end(); ) {
479 if (unregistered_ids.find(it->second.id) != unregistered_ids.end()) {
480 TimerQueue::iterator erase_it = it;
481 ++it;
482 timer_queue_.erase(erase_it);
483 } else {
484 ++it;
485 }
486 }
487 }
488
489 bool SyncInvalidationListener::TriggerNextTimeoutForTest(
490 base::TimeTicks* next_invalidation_time) {
491 if (timer_queue_.empty()) {
492 CHECK(!timer_.IsRunning());
493 return false;
494 }
495 *next_invalidation_time = timer_queue_.begin()->first;
496 ResendUnacknowledgedInvalidationsAt(*next_invalidation_time);
497 return true;
295 } 498 }
296 499
297 void SyncInvalidationListener::StopForTest() { 500 void SyncInvalidationListener::StopForTest() {
298 DCHECK(CalledOnValidThread()); 501 DCHECK(CalledOnValidThread());
299 Stop(); 502 Stop();
300 } 503 }
301 504
302 void SyncInvalidationListener::Stop() { 505 void SyncInvalidationListener::Stop() {
303 DCHECK(CalledOnValidThread()); 506 DCHECK(CalledOnValidThread());
304 if (!invalidation_client_.get()) { 507 if (!invalidation_client_.get()) {
305 return; 508 return;
306 } 509 }
307 510
511 timer_.Stop();
512 timer_queue_.clear();
513
308 registration_manager_.reset(); 514 registration_manager_.reset();
309 sync_system_resources_.Stop(); 515 sync_system_resources_.Stop();
310 invalidation_client_->Stop(); 516 invalidation_client_->Stop();
311 517
312 invalidation_client_.reset(); 518 invalidation_client_.reset();
313 delegate_ = NULL; 519 delegate_ = NULL;
314 520
315 invalidation_state_tracker_.Reset(); 521 invalidation_state_tracker_.Reset();
316 max_invalidation_versions_.clear(); 522 invalidation_state_map_.clear();
317 ticl_state_ = DEFAULT_INVALIDATION_ERROR; 523 ticl_state_ = DEFAULT_INVALIDATION_ERROR;
318 push_client_state_ = DEFAULT_INVALIDATION_ERROR; 524 push_client_state_ = DEFAULT_INVALIDATION_ERROR;
319 } 525 }
320 526
321 InvalidatorState SyncInvalidationListener::GetState() const { 527 InvalidatorState SyncInvalidationListener::GetState() const {
322 DCHECK(CalledOnValidThread()); 528 DCHECK(CalledOnValidThread());
323 if (ticl_state_ == INVALIDATION_CREDENTIALS_REJECTED || 529 if (ticl_state_ == INVALIDATION_CREDENTIALS_REJECTED ||
324 push_client_state_ == INVALIDATION_CREDENTIALS_REJECTED) { 530 push_client_state_ == INVALIDATION_CREDENTIALS_REJECTED) {
325 // If either the ticl or the push client rejected our credentials, 531 // If either the ticl or the push client rejected our credentials,
326 // return INVALIDATION_CREDENTIALS_REJECTED. 532 // return INVALIDATION_CREDENTIALS_REJECTED.
(...skipping 27 matching lines...) Expand all
354 EmitStateChange(); 560 EmitStateChange();
355 } 561 }
356 562
357 void SyncInvalidationListener::OnIncomingNotification( 563 void SyncInvalidationListener::OnIncomingNotification(
358 const notifier::Notification& notification) { 564 const notifier::Notification& notification) {
359 DCHECK(CalledOnValidThread()); 565 DCHECK(CalledOnValidThread());
360 // Do nothing, since this is already handled by |invalidation_client_|. 566 // Do nothing, since this is already handled by |invalidation_client_|.
361 } 567 }
362 568
363 } // namespace syncer 569 } // namespace syncer
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698