OLD | NEW |
---|---|
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "sync/notifier/sync_invalidation_listener.h" | 5 #include "sync/notifier/sync_invalidation_listener.h" |
6 | 6 |
7 #include <string> | |
8 #include <vector> | 7 #include <vector> |
9 | 8 |
9 #include "base/bind.h" | |
10 #include "base/callback.h" | 10 #include "base/callback.h" |
11 #include "base/compiler_specific.h" | 11 #include "base/compiler_specific.h" |
12 #include "base/logging.h" | 12 #include "base/logging.h" |
13 #include "base/stl_util.h" | |
13 #include "base/tracked_objects.h" | 14 #include "base/tracked_objects.h" |
14 #include "google/cacheinvalidation/include/invalidation-client.h" | 15 #include "google/cacheinvalidation/include/invalidation-client.h" |
15 #include "google/cacheinvalidation/include/types.h" | 16 #include "google/cacheinvalidation/include/types.h" |
16 #include "google/cacheinvalidation/types.pb.h" | 17 #include "google/cacheinvalidation/types.pb.h" |
17 #include "jingle/notifier/listener/push_client.h" | 18 #include "jingle/notifier/listener/push_client.h" |
18 #include "sync/notifier/invalidation_util.h" | 19 #include "sync/notifier/invalidation_util.h" |
19 #include "sync/notifier/registration_manager.h" | 20 #include "sync/notifier/registration_manager.h" |
20 | 21 |
21 namespace { | 22 namespace { |
22 | 23 |
23 const char kApplicationName[] = "chrome-sync"; | 24 const char kApplicationName[] = "chrome-sync"; |
25 const int kMaxDelayInSeconds = 600; | |
26 | |
27 base::TimeDelta CalculateBackoffDelay(int retry_count) { | |
akalin
2012/10/19 13:27:16
almost certain there's already an exp. backoff cla
Jói
2012/10/19 13:29:49
net/base/backoff_entry.h
dcheng
2012/10/19 19:38:11
I like this much better and will update the code a
| |
28 int delay = kMaxDelayInSeconds; | |
29 // Lazy way to prevent overflow. | |
30 if (retry_count < 10) | |
31 delay = std::min(delay, (1 << retry_count) * 60); | |
32 return base::TimeDelta::FromSeconds(delay); | |
33 } | |
24 | 34 |
25 } // namespace | 35 } // namespace |
26 | 36 |
27 namespace syncer { | 37 namespace syncer { |
28 | 38 |
29 SyncInvalidationListener::Delegate::~Delegate() {} | 39 SyncInvalidationListener::Delegate::~Delegate() {} |
30 | 40 |
31 SyncInvalidationListener::SyncInvalidationListener( | 41 SyncInvalidationListener::SyncInvalidationListener( |
32 scoped_ptr<notifier::PushClient> push_client) | 42 scoped_ptr<notifier::PushClient> push_client) |
33 : push_client_(push_client.get()), | 43 : weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), |
44 push_client_(push_client.get()), | |
34 sync_system_resources_(push_client.Pass(), | 45 sync_system_resources_(push_client.Pass(), |
35 ALLOW_THIS_IN_INITIALIZER_LIST(this)), | 46 ALLOW_THIS_IN_INITIALIZER_LIST(this)), |
36 delegate_(NULL), | 47 delegate_(NULL), |
37 ticl_state_(DEFAULT_INVALIDATION_ERROR), | 48 ticl_state_(DEFAULT_INVALIDATION_ERROR), |
38 push_client_state_(DEFAULT_INVALIDATION_ERROR) { | 49 push_client_state_(DEFAULT_INVALIDATION_ERROR) { |
39 DCHECK(CalledOnValidThread()); | 50 DCHECK(CalledOnValidThread()); |
40 push_client_->AddObserver(this); | 51 push_client_->AddObserver(this); |
41 } | 52 } |
42 | 53 |
43 SyncInvalidationListener::~SyncInvalidationListener() { | 54 SyncInvalidationListener::~SyncInvalidationListener() { |
44 DCHECK(CalledOnValidThread()); | 55 DCHECK(CalledOnValidThread()); |
45 push_client_->RemoveObserver(this); | 56 push_client_->RemoveObserver(this); |
46 Stop(); | 57 Stop(); |
47 DCHECK(!delegate_); | 58 DCHECK(!delegate_); |
48 } | 59 } |
49 | 60 |
50 void SyncInvalidationListener::Start( | 61 void SyncInvalidationListener::Start( |
51 const CreateInvalidationClientCallback& | 62 const CreateInvalidationClientCallback& |
52 create_invalidation_client_callback, | 63 create_invalidation_client_callback, |
53 const std::string& client_id, const std::string& client_info, | 64 const std::string& client_id, const std::string& client_info, |
54 const std::string& invalidation_bootstrap_data, | 65 const std::string& invalidation_bootstrap_data, |
55 const InvalidationVersionMap& initial_max_invalidation_versions, | 66 const InvalidationStateMap& initial_invalidation_state_map, |
56 const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker, | 67 const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker, |
57 Delegate* delegate) { | 68 Delegate* delegate) { |
58 DCHECK(CalledOnValidThread()); | 69 DCHECK(CalledOnValidThread()); |
59 Stop(); | 70 Stop(); |
60 | 71 |
61 sync_system_resources_.set_platform(client_info); | 72 sync_system_resources_.set_platform(client_info); |
62 sync_system_resources_.Start(); | 73 sync_system_resources_.Start(); |
63 | 74 |
64 // The Storage resource is implemented as a write-through cache. We populate | 75 // The Storage resource is implemented as a write-through cache. We populate |
65 // it with the initial state on startup, so subsequent writes go to disk and | 76 // it with the initial state on startup, so subsequent writes go to disk and |
66 // update the in-memory cache, while reads just return the cached state. | 77 // update the in-memory cache, while reads just return the cached state. |
67 sync_system_resources_.storage()->SetInitialState( | 78 sync_system_resources_.storage()->SetInitialState( |
68 invalidation_bootstrap_data); | 79 invalidation_bootstrap_data); |
69 | 80 |
70 max_invalidation_versions_ = initial_max_invalidation_versions; | 81 invalidation_state_map_ = initial_invalidation_state_map; |
71 if (max_invalidation_versions_.empty()) { | 82 if (invalidation_state_map_.empty()) { |
72 DVLOG(2) << "No initial max invalidation versions for any id"; | 83 DVLOG(2) << "No initial max invalidation versions for any id"; |
73 } else { | 84 } else { |
74 for (InvalidationVersionMap::const_iterator it = | 85 // Start the reminder timer if we have unacknowledged local invalidations. |
75 max_invalidation_versions_.begin(); | 86 for (InvalidationStateMap::const_iterator it = |
76 it != max_invalidation_versions_.end(); ++it) { | 87 invalidation_state_map_.begin(); |
88 it != invalidation_state_map_.end(); ++it) { | |
77 DVLOG(2) << "Initial max invalidation version for " | 89 DVLOG(2) << "Initial max invalidation version for " |
78 << ObjectIdToString(it->first) << " is " | 90 << ObjectIdToString(it->first) << " is " |
79 << it->second; | 91 << it->second.version; |
80 } | 92 } |
81 } | 93 } |
82 invalidation_state_tracker_ = invalidation_state_tracker; | 94 invalidation_state_tracker_ = invalidation_state_tracker; |
83 DCHECK(invalidation_state_tracker_.IsInitialized()); | 95 DCHECK(invalidation_state_tracker_.IsInitialized()); |
84 | 96 |
85 DCHECK(!delegate_); | 97 DCHECK(!delegate_); |
86 DCHECK(delegate); | 98 DCHECK(delegate); |
87 delegate_ = delegate; | 99 delegate_ = delegate; |
88 | 100 |
89 int client_type = ipc::invalidation::ClientType::CHROME_SYNC; | 101 int client_type = ipc::invalidation::ClientType::CHROME_SYNC; |
90 invalidation_client_.reset( | 102 invalidation_client_.reset( |
91 create_invalidation_client_callback.Run( | 103 create_invalidation_client_callback.Run( |
92 &sync_system_resources_, client_type, client_id, | 104 &sync_system_resources_, client_type, client_id, |
93 kApplicationName, this)); | 105 kApplicationName, this)); |
94 invalidation_client_->Start(); | 106 invalidation_client_->Start(); |
95 | 107 |
96 registration_manager_.reset( | 108 registration_manager_.reset( |
97 new RegistrationManager(invalidation_client_.get())); | 109 new RegistrationManager(invalidation_client_.get())); |
110 | |
111 // Start the reminder timer if we have unacknowledged local invalidations. | |
112 const base::TimeTicks now = base::TimeTicks::Now(); | |
113 const base::TimeTicks expiration_time = now + CalculateBackoffDelay(0); | |
114 for (InvalidationStateMap::const_iterator it = | |
115 invalidation_state_map_.begin(); | |
116 it != invalidation_state_map_.end(); ++it) { | |
117 if (it->second.expected.Equals(it->second.current)) | |
118 continue; | |
119 // TODO(dcheng): Save payload? | |
120 InsertId(expiration_time, | |
121 it->first, | |
122 std::string(), | |
123 0 /* retry_count */); | |
124 } | |
125 UpdateTimer(now); | |
98 } | 126 } |
99 | 127 |
100 void SyncInvalidationListener::UpdateCredentials( | 128 void SyncInvalidationListener::UpdateCredentials( |
101 const std::string& email, const std::string& token) { | 129 const std::string& email, const std::string& token) { |
102 DCHECK(CalledOnValidThread()); | 130 DCHECK(CalledOnValidThread()); |
103 sync_system_resources_.network()->UpdateCredentials(email, token); | 131 sync_system_resources_.network()->UpdateCredentials(email, token); |
104 } | 132 } |
105 | 133 |
106 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) { | 134 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) { |
107 DCHECK(CalledOnValidThread()); | 135 DCHECK(CalledOnValidThread()); |
108 registered_ids_ = ids; | 136 registered_ids_ = ids; |
109 // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a | 137 // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a |
110 // working XMPP connection (as observed by us), so check it instead | 138 // working XMPP connection (as observed by us), so check it instead |
111 // of GetState() (see http://crbug.com/139424). | 139 // of GetState() (see http://crbug.com/139424). |
112 if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_.get()) { | 140 if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_.get()) { |
113 DoRegistrationUpdate(); | 141 DoRegistrationUpdate(); |
114 } | 142 } |
115 } | 143 } |
116 | 144 |
145 void SyncInvalidationListener::Acknowledge(const invalidation::ObjectId& id, | |
146 const AckHandle& ack_handle) { | |
147 DCHECK(CalledOnValidThread()); | |
148 InvalidationStateMap::iterator state_it = invalidation_state_map_.find(id); | |
149 if (state_it == invalidation_state_map_.end()) | |
150 return; | |
151 invalidation_state_tracker_.Call( | |
152 FROM_HERE, | |
153 &InvalidationStateTracker::Acknowledge, | |
154 id, | |
155 ack_handle); | |
156 state_it->second.current = ack_handle; | |
157 if (state_it->second.expected.Equals(ack_handle)) { | |
158 // If the received ack matches the expected ack, then we no longer need to | |
159 // keep track of |id| since it is up-to-date. We use some heuristics to | |
160 // avoid unnecessarily resetting the timer, since this results in the timer | |
161 // having to post followup continuation tasks and also makes tests less | |
162 // deterministic. | |
163 bool update_timer = false; | |
164 // If we're removing the head of the queue, we may need to update the timer. | |
165 if (timer_queue_.begin()->second.id == id) { | |
166 TimerQueue::const_iterator it = timer_queue_.begin(); | |
167 ++it; | |
168 // But only if there are no other entries with the same expiration time. | |
169 update_timer = | |
170 timer_queue_.upper_bound(timer_queue_.begin()->first) == it; | |
171 } | |
172 RemoveId(id); | |
173 if (update_timer) | |
174 UpdateTimer(base::TimeTicks::Now()); | |
175 } | |
176 } | |
177 | |
117 void SyncInvalidationListener::Ready( | 178 void SyncInvalidationListener::Ready( |
118 invalidation::InvalidationClient* client) { | 179 invalidation::InvalidationClient* client) { |
119 DCHECK(CalledOnValidThread()); | 180 DCHECK(CalledOnValidThread()); |
120 DCHECK_EQ(client, invalidation_client_.get()); | 181 DCHECK_EQ(client, invalidation_client_.get()); |
121 ticl_state_ = INVALIDATIONS_ENABLED; | 182 ticl_state_ = INVALIDATIONS_ENABLED; |
122 EmitStateChange(); | 183 EmitStateChange(); |
123 DoRegistrationUpdate(); | 184 DoRegistrationUpdate(); |
124 } | 185 } |
125 | 186 |
126 void SyncInvalidationListener::Invalidate( | 187 void SyncInvalidationListener::Invalidate( |
127 invalidation::InvalidationClient* client, | 188 invalidation::InvalidationClient* client, |
128 const invalidation::Invalidation& invalidation, | 189 const invalidation::Invalidation& invalidation, |
129 const invalidation::AckHandle& ack_handle) { | 190 const invalidation::AckHandle& ack_handle) { |
130 DCHECK(CalledOnValidThread()); | 191 DCHECK(CalledOnValidThread()); |
131 DCHECK_EQ(client, invalidation_client_.get()); | 192 DCHECK_EQ(client, invalidation_client_.get()); |
132 DVLOG(1) << "Invalidate: " << InvalidationToString(invalidation); | 193 DVLOG(1) << "Invalidate: " << InvalidationToString(invalidation); |
133 | 194 |
134 const invalidation::ObjectId& id = invalidation.object_id(); | 195 const invalidation::ObjectId& id = invalidation.object_id(); |
135 | 196 |
136 // The invalidation API spec allows for the possibility of redundant | 197 // The invalidation API spec allows for the possibility of redundant |
137 // invalidations, so keep track of the max versions and drop | 198 // invalidations, so keep track of the max versions and drop |
138 // invalidations with old versions. | 199 // invalidations with old versions. |
139 // | 200 // |
140 // TODO(akalin): Now that we keep track of registered ids, we | 201 // TODO(akalin): Now that we keep track of registered ids, we |
141 // should drop invalidations for unregistered ids. We may also | 202 // should drop invalidations for unregistered ids. We may also |
142 // have to filter it at a higher level, as invalidations for | 203 // have to filter it at a higher level, as invalidations for |
143 // newly-unregistered ids may already be in flight. | 204 // newly-unregistered ids may already be in flight. |
144 InvalidationVersionMap::const_iterator it = | 205 InvalidationStateMap::const_iterator it = invalidation_state_map_.find(id); |
145 max_invalidation_versions_.find(id); | 206 if ((it != invalidation_state_map_.end()) && |
146 if ((it != max_invalidation_versions_.end()) && | 207 (invalidation.version() <= it->second.version)) { |
147 (invalidation.version() <= it->second)) { | |
148 // Drop redundant invalidations. | 208 // Drop redundant invalidations. |
149 client->Acknowledge(ack_handle); | 209 client->Acknowledge(ack_handle); |
150 return; | 210 return; |
151 } | 211 } |
152 DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id) | 212 DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id) |
153 << " to " << invalidation.version(); | 213 << " to " << invalidation.version(); |
154 max_invalidation_versions_[id] = invalidation.version(); | 214 invalidation_state_map_[id].version = invalidation.version(); |
155 invalidation_state_tracker_.Call( | 215 invalidation_state_tracker_.Call( |
156 FROM_HERE, | 216 FROM_HERE, |
157 &InvalidationStateTracker::SetMaxVersion, | 217 &InvalidationStateTracker::SetMaxVersion, |
akalin
2012/10/19 13:27:16
can we combine the SetMaxVersion and GenerateAckHa
dcheng
2012/10/19 19:38:11
The reason they are split up is because Invalidat
| |
158 id, invalidation.version()); | 218 id, invalidation.version()); |
159 | 219 |
160 std::string payload; | 220 std::string payload; |
161 // payload() CHECK()'s has_payload(), so we must check it ourselves first. | 221 // payload() CHECK()'s has_payload(), so we must check it ourselves first. |
162 if (invalidation.has_payload()) | 222 if (invalidation.has_payload()) |
163 payload = invalidation.payload(); | 223 payload = invalidation.payload(); |
164 | 224 |
165 ObjectIdInvalidationMap invalidation_map; | 225 ObjectIdSet ids; |
166 invalidation_map[id].payload = payload; | 226 ids.insert(id); |
167 EmitInvalidation(invalidation_map); | 227 PrepareInvalidation(ids, payload, client, ack_handle); |
168 // TODO(akalin): We should really acknowledge only after we get the | |
169 // updates from the sync server. (see http://crbug.com/78462). | |
170 client->Acknowledge(ack_handle); | |
171 } | 228 } |
172 | 229 |
173 void SyncInvalidationListener::InvalidateUnknownVersion( | 230 void SyncInvalidationListener::InvalidateUnknownVersion( |
174 invalidation::InvalidationClient* client, | 231 invalidation::InvalidationClient* client, |
175 const invalidation::ObjectId& object_id, | 232 const invalidation::ObjectId& object_id, |
176 const invalidation::AckHandle& ack_handle) { | 233 const invalidation::AckHandle& ack_handle) { |
177 DCHECK(CalledOnValidThread()); | 234 DCHECK(CalledOnValidThread()); |
178 DCHECK_EQ(client, invalidation_client_.get()); | 235 DCHECK_EQ(client, invalidation_client_.get()); |
179 DVLOG(1) << "InvalidateUnknownVersion"; | 236 DVLOG(1) << "InvalidateUnknownVersion"; |
180 | 237 |
181 ObjectIdInvalidationMap invalidation_map; | 238 ObjectIdSet ids; |
182 invalidation_map[object_id].payload = std::string(); | 239 ids.insert(object_id); |
183 EmitInvalidation(invalidation_map); | 240 PrepareInvalidation(ids, std::string(), client, ack_handle); |
184 // TODO(akalin): We should really acknowledge only after we get the | |
185 // updates from the sync server. (see http://crbug.com/78462). | |
186 client->Acknowledge(ack_handle); | |
187 } | 241 } |
188 | 242 |
189 // This should behave as if we got an invalidation with version | 243 // This should behave as if we got an invalidation with version |
190 // UNKNOWN_OBJECT_VERSION for all known data types. | 244 // UNKNOWN_OBJECT_VERSION for all known data types. |
191 void SyncInvalidationListener::InvalidateAll( | 245 void SyncInvalidationListener::InvalidateAll( |
192 invalidation::InvalidationClient* client, | 246 invalidation::InvalidationClient* client, |
193 const invalidation::AckHandle& ack_handle) { | 247 const invalidation::AckHandle& ack_handle) { |
194 DCHECK(CalledOnValidThread()); | 248 DCHECK(CalledOnValidThread()); |
195 DCHECK_EQ(client, invalidation_client_.get()); | 249 DCHECK_EQ(client, invalidation_client_.get()); |
196 DVLOG(1) << "InvalidateAll"; | 250 DVLOG(1) << "InvalidateAll"; |
197 | 251 |
198 const ObjectIdInvalidationMap& invalidation_map = | 252 PrepareInvalidation(registered_ids_, std::string(), client, ack_handle); |
199 ObjectIdSetToInvalidationMap(registered_ids_, std::string()); | 253 } |
200 EmitInvalidation(invalidation_map); | 254 |
201 // TODO(akalin): We should really acknowledge only after we get the | 255 void SyncInvalidationListener::PrepareInvalidation( |
202 // updates from the sync server. (see http://crbug.com/76482). | 256 const ObjectIdSet& ids, |
257 const std::string& payload, | |
258 invalidation::InvalidationClient* client, | |
259 const invalidation::AckHandle& ack_handle) { | |
260 DCHECK(CalledOnValidThread()); | |
261 | |
262 invalidation_state_tracker_.Call( | |
263 FROM_HERE, | |
264 &InvalidationStateTracker::GenerateAckHandles, | |
265 ids, | |
266 base::MessageLoopProxy::current(), | |
267 base::Bind(&SyncInvalidationListener::EmitInvalidation, | |
268 weak_ptr_factory_.GetWeakPtr(), | |
269 ids, | |
270 payload, | |
271 client, | |
272 ack_handle)); | |
273 } | |
274 | |
275 void SyncInvalidationListener::EmitInvalidation( | |
276 const ObjectIdSet& ids, | |
277 const std::string& payload, | |
278 invalidation::InvalidationClient* client, | |
279 const invalidation::AckHandle& ack_handle, | |
280 const AckHandleMap& local_ack_handles) { | |
281 DCHECK(CalledOnValidThread()); | |
282 ObjectIdInvalidationMap invalidation_map = | |
283 ObjectIdSetToInvalidationMap(ids, payload); | |
284 // Erase any timer queue entries that correspond with an id in |ids| since a | |
285 // new invalidation from the server resets the retry count. | |
286 RemoveIds(ids); | |
287 const base::TimeTicks now = base::TimeTicks::Now(); | |
288 const base::TimeTicks expiration_time = now + CalculateBackoffDelay(0); | |
289 for (AckHandleMap::const_iterator it = local_ack_handles.begin(); | |
290 it != local_ack_handles.end(); ++it) { | |
291 // Update in-memory copy of the invalidation state. | |
292 invalidation_state_map_[it->first].expected = it->second; | |
293 invalidation_map[it->first].ack_handle = it->second; | |
294 InsertId(expiration_time, it->first, payload, 0 /* retry_count */); | |
295 } | |
296 DCHECK(!timer_queue_.empty()); | |
297 UpdateTimer(now); | |
298 delegate_->OnInvalidate(invalidation_map); | |
203 client->Acknowledge(ack_handle); | 299 client->Acknowledge(ack_handle); |
204 } | 300 } |
205 | 301 |
206 void SyncInvalidationListener::EmitInvalidation( | 302 void SyncInvalidationListener::ResendUnacknowledgedInvalidations() { |
207 const ObjectIdInvalidationMap& invalidation_map) { | 303 ResendUnacknowledgedInvalidationsAt(base::TimeTicks::Now()); |
208 DCHECK(CalledOnValidThread()); | 304 } |
305 | |
306 void SyncInvalidationListener::ResendUnacknowledgedInvalidationsAt( | |
307 base::TimeTicks expiration_time) { | |
308 DCHECK(!timer_queue_.empty()) << "Timer not canceled but queue is empty!"; | |
309 | |
310 // This is slightly redundant since in non-test code, we always satisfy | |
311 // the condition now == expiration_time. Having this makes the tests a bit | |
312 // more sane though. | |
313 const base::TimeTicks now = base::TimeTicks::Now(); | |
314 TimerQueue::iterator end = timer_queue_.upper_bound(expiration_time); | |
315 // In theory, we can do in one loop since all the new insertions should be | |
316 // past |end|. To avoid potential infinite loops in case of a bug, we do it in | |
317 // two separate loops. | |
318 std::vector<QueueEntry> expired_entries; | |
319 for (TimerQueue::iterator it = timer_queue_.begin(); it != end; ) { | |
320 expired_entries.push_back(it->second); | |
321 TimerQueue::iterator erase_it = it; | |
322 ++it; | |
323 timer_queue_.erase(erase_it); | |
324 } | |
325 | |
326 ObjectIdInvalidationMap invalidation_map; | |
327 for (std::vector<QueueEntry>::const_iterator it = expired_entries.begin(); | |
328 it != expired_entries.end(); ++it) { | |
329 InsertId(now + CalculateBackoffDelay(it->retry_count + 1), | |
330 it->id, | |
331 it->payload, | |
332 it->retry_count + 1); | |
333 Invalidation invalidation; | |
334 invalidation.ack_handle = invalidation_state_map_[it->id].expected; | |
335 invalidation.payload = it->payload; | |
336 invalidation_map.insert(std::make_pair(it->id, invalidation)); | |
337 } | |
338 | |
209 delegate_->OnInvalidate(invalidation_map); | 339 delegate_->OnInvalidate(invalidation_map); |
340 UpdateTimer(now); | |
341 } | |
342 | |
343 void SyncInvalidationListener::InsertId(base::TimeTicks expiration_time, | |
344 const invalidation::ObjectId& id, | |
345 const std::string& payload, | |
346 int retry_count) { | |
347 timer_queue_.insert(std::make_pair(expiration_time, | |
348 QueueEntry(id, payload, retry_count))); | |
349 } | |
350 | |
351 void SyncInvalidationListener::RemoveId(const invalidation::ObjectId& id) { | |
352 for (TimerQueue::iterator it = timer_queue_.begin(); it != timer_queue_.end(); | |
353 ++it) { | |
354 if (it->second.id == id) { | |
355 timer_queue_.erase(it); | |
356 return; | |
357 } | |
358 } | |
359 // This shouldn't happen unless someone is acking multiple times. | |
360 NOTREACHED(); | |
361 } | |
362 | |
363 void SyncInvalidationListener::RemoveIds(const ObjectIdSet& ids) { | |
364 for (TimerQueue::iterator it = timer_queue_.begin(); | |
365 it != timer_queue_.end(); ) { | |
366 // We could be more clever here and reduce the complexity, since both | |
367 // containers are already sorted. Since we don't expect timer_queue_ to be | |
368 // very long in practice though, we just do things the simpler way. | |
369 if (ContainsKey(ids, it->second.id)) { | |
370 TimerQueue::iterator erase_it = it; | |
371 ++it; | |
372 timer_queue_.erase(erase_it); | |
373 } else { | |
374 ++it; | |
375 } | |
376 } | |
377 } | |
378 | |
379 void SyncInvalidationListener::UpdateTimer(base::TimeTicks now) { | |
380 if (timer_queue_.empty()) { | |
381 timer_.Stop(); | |
382 } else { | |
383 timer_.Start(FROM_HERE, | |
384 timer_queue_.begin()->first - now, | |
385 this, | |
386 &SyncInvalidationListener::ResendUnacknowledgedInvalidations); | |
387 } | |
210 } | 388 } |
211 | 389 |
212 void SyncInvalidationListener::InformRegistrationStatus( | 390 void SyncInvalidationListener::InformRegistrationStatus( |
213 invalidation::InvalidationClient* client, | 391 invalidation::InvalidationClient* client, |
214 const invalidation::ObjectId& object_id, | 392 const invalidation::ObjectId& object_id, |
215 InvalidationListener::RegistrationState new_state) { | 393 InvalidationListener::RegistrationState new_state) { |
216 DCHECK(CalledOnValidThread()); | 394 DCHECK(CalledOnValidThread()); |
217 DCHECK_EQ(client, invalidation_client_.get()); | 395 DCHECK_EQ(client, invalidation_client_.get()); |
218 DVLOG(1) << "InformRegistrationStatus: " | 396 DVLOG(1) << "InformRegistrationStatus: " |
219 << ObjectIdToString(object_id) << " " << new_state; | 397 << ObjectIdToString(object_id) << " " << new_state; |
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
285 invalidation_state_tracker_.Call( | 463 invalidation_state_tracker_.Call( |
286 FROM_HERE, &InvalidationStateTracker::SetBootstrapData, state); | 464 FROM_HERE, &InvalidationStateTracker::SetBootstrapData, state); |
287 } | 465 } |
288 | 466 |
289 void SyncInvalidationListener::DoRegistrationUpdate() { | 467 void SyncInvalidationListener::DoRegistrationUpdate() { |
290 DCHECK(CalledOnValidThread()); | 468 DCHECK(CalledOnValidThread()); |
291 const ObjectIdSet& unregistered_ids = | 469 const ObjectIdSet& unregistered_ids = |
292 registration_manager_->UpdateRegisteredIds(registered_ids_); | 470 registration_manager_->UpdateRegisteredIds(registered_ids_); |
293 invalidation_state_tracker_.Call( | 471 invalidation_state_tracker_.Call( |
294 FROM_HERE, &InvalidationStateTracker::Forget, unregistered_ids); | 472 FROM_HERE, &InvalidationStateTracker::Forget, unregistered_ids); |
473 for (ObjectIdSet::const_iterator it = unregistered_ids.begin(); | |
474 it != unregistered_ids.end(); ++it) { | |
475 invalidation_state_map_.erase(*it); | |
476 } | |
477 for (TimerQueue::iterator it = timer_queue_.begin(); | |
478 it != timer_queue_.end(); ) { | |
479 if (unregistered_ids.find(it->second.id) != unregistered_ids.end()) { | |
480 TimerQueue::iterator erase_it = it; | |
481 ++it; | |
482 timer_queue_.erase(erase_it); | |
483 } else { | |
484 ++it; | |
485 } | |
486 } | |
487 } | |
488 | |
489 bool SyncInvalidationListener::TriggerNextTimeoutForTest( | |
490 base::TimeTicks* next_invalidation_time) { | |
491 if (timer_queue_.empty()) { | |
492 CHECK(!timer_.IsRunning()); | |
493 return false; | |
494 } | |
495 *next_invalidation_time = timer_queue_.begin()->first; | |
496 ResendUnacknowledgedInvalidationsAt(*next_invalidation_time); | |
497 return true; | |
295 } | 498 } |
296 | 499 |
297 void SyncInvalidationListener::StopForTest() { | 500 void SyncInvalidationListener::StopForTest() { |
298 DCHECK(CalledOnValidThread()); | 501 DCHECK(CalledOnValidThread()); |
299 Stop(); | 502 Stop(); |
300 } | 503 } |
301 | 504 |
302 void SyncInvalidationListener::Stop() { | 505 void SyncInvalidationListener::Stop() { |
303 DCHECK(CalledOnValidThread()); | 506 DCHECK(CalledOnValidThread()); |
304 if (!invalidation_client_.get()) { | 507 if (!invalidation_client_.get()) { |
305 return; | 508 return; |
306 } | 509 } |
307 | 510 |
511 timer_.Stop(); | |
512 timer_queue_.clear(); | |
513 | |
308 registration_manager_.reset(); | 514 registration_manager_.reset(); |
309 sync_system_resources_.Stop(); | 515 sync_system_resources_.Stop(); |
310 invalidation_client_->Stop(); | 516 invalidation_client_->Stop(); |
311 | 517 |
312 invalidation_client_.reset(); | 518 invalidation_client_.reset(); |
313 delegate_ = NULL; | 519 delegate_ = NULL; |
314 | 520 |
315 invalidation_state_tracker_.Reset(); | 521 invalidation_state_tracker_.Reset(); |
316 max_invalidation_versions_.clear(); | 522 invalidation_state_map_.clear(); |
317 ticl_state_ = DEFAULT_INVALIDATION_ERROR; | 523 ticl_state_ = DEFAULT_INVALIDATION_ERROR; |
318 push_client_state_ = DEFAULT_INVALIDATION_ERROR; | 524 push_client_state_ = DEFAULT_INVALIDATION_ERROR; |
319 } | 525 } |
320 | 526 |
321 InvalidatorState SyncInvalidationListener::GetState() const { | 527 InvalidatorState SyncInvalidationListener::GetState() const { |
322 DCHECK(CalledOnValidThread()); | 528 DCHECK(CalledOnValidThread()); |
323 if (ticl_state_ == INVALIDATION_CREDENTIALS_REJECTED || | 529 if (ticl_state_ == INVALIDATION_CREDENTIALS_REJECTED || |
324 push_client_state_ == INVALIDATION_CREDENTIALS_REJECTED) { | 530 push_client_state_ == INVALIDATION_CREDENTIALS_REJECTED) { |
325 // If either the ticl or the push client rejected our credentials, | 531 // If either the ticl or the push client rejected our credentials, |
326 // return INVALIDATION_CREDENTIALS_REJECTED. | 532 // return INVALIDATION_CREDENTIALS_REJECTED. |
(...skipping 27 matching lines...) Expand all Loading... | |
354 EmitStateChange(); | 560 EmitStateChange(); |
355 } | 561 } |
356 | 562 |
357 void SyncInvalidationListener::OnIncomingNotification( | 563 void SyncInvalidationListener::OnIncomingNotification( |
358 const notifier::Notification& notification) { | 564 const notifier::Notification& notification) { |
359 DCHECK(CalledOnValidThread()); | 565 DCHECK(CalledOnValidThread()); |
360 // Do nothing, since this is already handled by |invalidation_client_|. | 566 // Do nothing, since this is already handled by |invalidation_client_|. |
361 } | 567 } |
362 | 568 |
363 } // namespace syncer | 569 } // namespace syncer |
OLD | NEW |