Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "sync/notifier/sync_invalidation_listener.h" | 5 #include "sync/notifier/sync_invalidation_listener.h" |
| 6 | 6 |
| 7 #include <string> | |
| 8 #include <vector> | 7 #include <vector> |
| 9 | 8 |
| 9 #include "base/bind.h" | |
| 10 #include "base/callback.h" | 10 #include "base/callback.h" |
| 11 #include "base/compiler_specific.h" | 11 #include "base/compiler_specific.h" |
| 12 #include "base/logging.h" | 12 #include "base/logging.h" |
| 13 #include "base/stl_util.h" | |
| 13 #include "base/tracked_objects.h" | 14 #include "base/tracked_objects.h" |
| 14 #include "google/cacheinvalidation/include/invalidation-client.h" | 15 #include "google/cacheinvalidation/include/invalidation-client.h" |
| 15 #include "google/cacheinvalidation/include/types.h" | 16 #include "google/cacheinvalidation/include/types.h" |
| 16 #include "google/cacheinvalidation/types.pb.h" | 17 #include "google/cacheinvalidation/types.pb.h" |
| 17 #include "jingle/notifier/listener/push_client.h" | 18 #include "jingle/notifier/listener/push_client.h" |
| 18 #include "sync/notifier/invalidation_util.h" | 19 #include "sync/notifier/invalidation_util.h" |
| 19 #include "sync/notifier/registration_manager.h" | 20 #include "sync/notifier/registration_manager.h" |
| 20 | 21 |
| 21 namespace { | 22 namespace { |
| 22 | 23 |
| 23 const char kApplicationName[] = "chrome-sync"; | 24 const char kApplicationName[] = "chrome-sync"; |
| 25 const int kMaxDelayInSeconds = 600; | |
| 26 | |
| 27 base::TimeDelta CalculateBackoffDelay(int retry_count) { | |
|
akalin
2012/10/19 13:27:16
almost certain there's already an exp. backoff cla
Jói
2012/10/19 13:29:49
net/base/backoff_entry.h
dcheng
2012/10/19 19:38:11
I like this much better and will update the code a
| |
| 28 int delay = kMaxDelayInSeconds; | |
| 29 // Lazy way to prevent overflow. | |
| 30 if (retry_count < 10) | |
| 31 delay = std::min(delay, (1 << retry_count) * 60); | |
| 32 return base::TimeDelta::FromSeconds(delay); | |
| 33 } | |
| 24 | 34 |
| 25 } // namespace | 35 } // namespace |
| 26 | 36 |
| 27 namespace syncer { | 37 namespace syncer { |
| 28 | 38 |
| 29 SyncInvalidationListener::Delegate::~Delegate() {} | 39 SyncInvalidationListener::Delegate::~Delegate() {} |
| 30 | 40 |
| 31 SyncInvalidationListener::SyncInvalidationListener( | 41 SyncInvalidationListener::SyncInvalidationListener( |
| 32 scoped_ptr<notifier::PushClient> push_client) | 42 scoped_ptr<notifier::PushClient> push_client) |
| 33 : push_client_(push_client.get()), | 43 : weak_ptr_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)), |
| 44 push_client_(push_client.get()), | |
| 34 sync_system_resources_(push_client.Pass(), | 45 sync_system_resources_(push_client.Pass(), |
| 35 ALLOW_THIS_IN_INITIALIZER_LIST(this)), | 46 ALLOW_THIS_IN_INITIALIZER_LIST(this)), |
| 36 delegate_(NULL), | 47 delegate_(NULL), |
| 37 ticl_state_(DEFAULT_INVALIDATION_ERROR), | 48 ticl_state_(DEFAULT_INVALIDATION_ERROR), |
| 38 push_client_state_(DEFAULT_INVALIDATION_ERROR) { | 49 push_client_state_(DEFAULT_INVALIDATION_ERROR) { |
| 39 DCHECK(CalledOnValidThread()); | 50 DCHECK(CalledOnValidThread()); |
| 40 push_client_->AddObserver(this); | 51 push_client_->AddObserver(this); |
| 41 } | 52 } |
| 42 | 53 |
| 43 SyncInvalidationListener::~SyncInvalidationListener() { | 54 SyncInvalidationListener::~SyncInvalidationListener() { |
| 44 DCHECK(CalledOnValidThread()); | 55 DCHECK(CalledOnValidThread()); |
| 45 push_client_->RemoveObserver(this); | 56 push_client_->RemoveObserver(this); |
| 46 Stop(); | 57 Stop(); |
| 47 DCHECK(!delegate_); | 58 DCHECK(!delegate_); |
| 48 } | 59 } |
| 49 | 60 |
| 50 void SyncInvalidationListener::Start( | 61 void SyncInvalidationListener::Start( |
| 51 const CreateInvalidationClientCallback& | 62 const CreateInvalidationClientCallback& |
| 52 create_invalidation_client_callback, | 63 create_invalidation_client_callback, |
| 53 const std::string& client_id, const std::string& client_info, | 64 const std::string& client_id, const std::string& client_info, |
| 54 const std::string& invalidation_bootstrap_data, | 65 const std::string& invalidation_bootstrap_data, |
| 55 const InvalidationVersionMap& initial_max_invalidation_versions, | 66 const InvalidationStateMap& initial_invalidation_state_map, |
| 56 const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker, | 67 const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker, |
| 57 Delegate* delegate) { | 68 Delegate* delegate) { |
| 58 DCHECK(CalledOnValidThread()); | 69 DCHECK(CalledOnValidThread()); |
| 59 Stop(); | 70 Stop(); |
| 60 | 71 |
| 61 sync_system_resources_.set_platform(client_info); | 72 sync_system_resources_.set_platform(client_info); |
| 62 sync_system_resources_.Start(); | 73 sync_system_resources_.Start(); |
| 63 | 74 |
| 64 // The Storage resource is implemented as a write-through cache. We populate | 75 // The Storage resource is implemented as a write-through cache. We populate |
| 65 // it with the initial state on startup, so subsequent writes go to disk and | 76 // it with the initial state on startup, so subsequent writes go to disk and |
| 66 // update the in-memory cache, while reads just return the cached state. | 77 // update the in-memory cache, while reads just return the cached state. |
| 67 sync_system_resources_.storage()->SetInitialState( | 78 sync_system_resources_.storage()->SetInitialState( |
| 68 invalidation_bootstrap_data); | 79 invalidation_bootstrap_data); |
| 69 | 80 |
| 70 max_invalidation_versions_ = initial_max_invalidation_versions; | 81 invalidation_state_map_ = initial_invalidation_state_map; |
| 71 if (max_invalidation_versions_.empty()) { | 82 if (invalidation_state_map_.empty()) { |
| 72 DVLOG(2) << "No initial max invalidation versions for any id"; | 83 DVLOG(2) << "No initial max invalidation versions for any id"; |
| 73 } else { | 84 } else { |
| 74 for (InvalidationVersionMap::const_iterator it = | 85 // Start the reminder timer if we have unacknowledged local invalidations. |
| 75 max_invalidation_versions_.begin(); | 86 for (InvalidationStateMap::const_iterator it = |
| 76 it != max_invalidation_versions_.end(); ++it) { | 87 invalidation_state_map_.begin(); |
| 88 it != invalidation_state_map_.end(); ++it) { | |
| 77 DVLOG(2) << "Initial max invalidation version for " | 89 DVLOG(2) << "Initial max invalidation version for " |
| 78 << ObjectIdToString(it->first) << " is " | 90 << ObjectIdToString(it->first) << " is " |
| 79 << it->second; | 91 << it->second.version; |
| 80 } | 92 } |
| 81 } | 93 } |
| 82 invalidation_state_tracker_ = invalidation_state_tracker; | 94 invalidation_state_tracker_ = invalidation_state_tracker; |
| 83 DCHECK(invalidation_state_tracker_.IsInitialized()); | 95 DCHECK(invalidation_state_tracker_.IsInitialized()); |
| 84 | 96 |
| 85 DCHECK(!delegate_); | 97 DCHECK(!delegate_); |
| 86 DCHECK(delegate); | 98 DCHECK(delegate); |
| 87 delegate_ = delegate; | 99 delegate_ = delegate; |
| 88 | 100 |
| 89 int client_type = ipc::invalidation::ClientType::CHROME_SYNC; | 101 int client_type = ipc::invalidation::ClientType::CHROME_SYNC; |
| 90 invalidation_client_.reset( | 102 invalidation_client_.reset( |
| 91 create_invalidation_client_callback.Run( | 103 create_invalidation_client_callback.Run( |
| 92 &sync_system_resources_, client_type, client_id, | 104 &sync_system_resources_, client_type, client_id, |
| 93 kApplicationName, this)); | 105 kApplicationName, this)); |
| 94 invalidation_client_->Start(); | 106 invalidation_client_->Start(); |
| 95 | 107 |
| 96 registration_manager_.reset( | 108 registration_manager_.reset( |
| 97 new RegistrationManager(invalidation_client_.get())); | 109 new RegistrationManager(invalidation_client_.get())); |
| 110 | |
| 111 // Start the reminder timer if we have unacknowledged local invalidations. | |
| 112 const base::TimeTicks now = base::TimeTicks::Now(); | |
| 113 const base::TimeTicks expiration_time = now + CalculateBackoffDelay(0); | |
| 114 for (InvalidationStateMap::const_iterator it = | |
| 115 invalidation_state_map_.begin(); | |
| 116 it != invalidation_state_map_.end(); ++it) { | |
| 117 if (it->second.expected.Equals(it->second.current)) | |
| 118 continue; | |
| 119 // TODO(dcheng): Save payload? | |
| 120 InsertId(expiration_time, | |
| 121 it->first, | |
| 122 std::string(), | |
| 123 0 /* retry_count */); | |
| 124 } | |
| 125 UpdateTimer(now); | |
| 98 } | 126 } |
| 99 | 127 |
| 100 void SyncInvalidationListener::UpdateCredentials( | 128 void SyncInvalidationListener::UpdateCredentials( |
| 101 const std::string& email, const std::string& token) { | 129 const std::string& email, const std::string& token) { |
| 102 DCHECK(CalledOnValidThread()); | 130 DCHECK(CalledOnValidThread()); |
| 103 sync_system_resources_.network()->UpdateCredentials(email, token); | 131 sync_system_resources_.network()->UpdateCredentials(email, token); |
| 104 } | 132 } |
| 105 | 133 |
| 106 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) { | 134 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) { |
| 107 DCHECK(CalledOnValidThread()); | 135 DCHECK(CalledOnValidThread()); |
| 108 registered_ids_ = ids; | 136 registered_ids_ = ids; |
| 109 // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a | 137 // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a |
| 110 // working XMPP connection (as observed by us), so check it instead | 138 // working XMPP connection (as observed by us), so check it instead |
| 111 // of GetState() (see http://crbug.com/139424). | 139 // of GetState() (see http://crbug.com/139424). |
| 112 if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_.get()) { | 140 if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_.get()) { |
| 113 DoRegistrationUpdate(); | 141 DoRegistrationUpdate(); |
| 114 } | 142 } |
| 115 } | 143 } |
| 116 | 144 |
| 145 void SyncInvalidationListener::Acknowledge(const invalidation::ObjectId& id, | |
| 146 const AckHandle& ack_handle) { | |
| 147 DCHECK(CalledOnValidThread()); | |
| 148 InvalidationStateMap::iterator state_it = invalidation_state_map_.find(id); | |
| 149 if (state_it == invalidation_state_map_.end()) | |
| 150 return; | |
| 151 invalidation_state_tracker_.Call( | |
| 152 FROM_HERE, | |
| 153 &InvalidationStateTracker::Acknowledge, | |
| 154 id, | |
| 155 ack_handle); | |
| 156 state_it->second.current = ack_handle; | |
| 157 if (state_it->second.expected.Equals(ack_handle)) { | |
| 158 // If the received ack matches the expected ack, then we no longer need to | |
| 159 // keep track of |id| since it is up-to-date. We use some heuristics to | |
| 160 // avoid unnecessarily resetting the timer, since this results in the timer | |
| 161 // having to post followup continuation tasks and also makes tests less | |
| 162 // deterministic. | |
| 163 bool update_timer = false; | |
| 164 // If we're removing the head of the queue, we may need to update the timer. | |
| 165 if (timer_queue_.begin()->second.id == id) { | |
| 166 TimerQueue::const_iterator it = timer_queue_.begin(); | |
| 167 ++it; | |
| 168 // But only if there are no other entries with the same expiration time. | |
| 169 update_timer = | |
| 170 timer_queue_.upper_bound(timer_queue_.begin()->first) == it; | |
| 171 } | |
| 172 RemoveId(id); | |
| 173 if (update_timer) | |
| 174 UpdateTimer(base::TimeTicks::Now()); | |
| 175 } | |
| 176 } | |
| 177 | |
| 117 void SyncInvalidationListener::Ready( | 178 void SyncInvalidationListener::Ready( |
| 118 invalidation::InvalidationClient* client) { | 179 invalidation::InvalidationClient* client) { |
| 119 DCHECK(CalledOnValidThread()); | 180 DCHECK(CalledOnValidThread()); |
| 120 DCHECK_EQ(client, invalidation_client_.get()); | 181 DCHECK_EQ(client, invalidation_client_.get()); |
| 121 ticl_state_ = INVALIDATIONS_ENABLED; | 182 ticl_state_ = INVALIDATIONS_ENABLED; |
| 122 EmitStateChange(); | 183 EmitStateChange(); |
| 123 DoRegistrationUpdate(); | 184 DoRegistrationUpdate(); |
| 124 } | 185 } |
| 125 | 186 |
| 126 void SyncInvalidationListener::Invalidate( | 187 void SyncInvalidationListener::Invalidate( |
| 127 invalidation::InvalidationClient* client, | 188 invalidation::InvalidationClient* client, |
| 128 const invalidation::Invalidation& invalidation, | 189 const invalidation::Invalidation& invalidation, |
| 129 const invalidation::AckHandle& ack_handle) { | 190 const invalidation::AckHandle& ack_handle) { |
| 130 DCHECK(CalledOnValidThread()); | 191 DCHECK(CalledOnValidThread()); |
| 131 DCHECK_EQ(client, invalidation_client_.get()); | 192 DCHECK_EQ(client, invalidation_client_.get()); |
| 132 DVLOG(1) << "Invalidate: " << InvalidationToString(invalidation); | 193 DVLOG(1) << "Invalidate: " << InvalidationToString(invalidation); |
| 133 | 194 |
| 134 const invalidation::ObjectId& id = invalidation.object_id(); | 195 const invalidation::ObjectId& id = invalidation.object_id(); |
| 135 | 196 |
| 136 // The invalidation API spec allows for the possibility of redundant | 197 // The invalidation API spec allows for the possibility of redundant |
| 137 // invalidations, so keep track of the max versions and drop | 198 // invalidations, so keep track of the max versions and drop |
| 138 // invalidations with old versions. | 199 // invalidations with old versions. |
| 139 // | 200 // |
| 140 // TODO(akalin): Now that we keep track of registered ids, we | 201 // TODO(akalin): Now that we keep track of registered ids, we |
| 141 // should drop invalidations for unregistered ids. We may also | 202 // should drop invalidations for unregistered ids. We may also |
| 142 // have to filter it at a higher level, as invalidations for | 203 // have to filter it at a higher level, as invalidations for |
| 143 // newly-unregistered ids may already be in flight. | 204 // newly-unregistered ids may already be in flight. |
| 144 InvalidationVersionMap::const_iterator it = | 205 InvalidationStateMap::const_iterator it = invalidation_state_map_.find(id); |
| 145 max_invalidation_versions_.find(id); | 206 if ((it != invalidation_state_map_.end()) && |
| 146 if ((it != max_invalidation_versions_.end()) && | 207 (invalidation.version() <= it->second.version)) { |
| 147 (invalidation.version() <= it->second)) { | |
| 148 // Drop redundant invalidations. | 208 // Drop redundant invalidations. |
| 149 client->Acknowledge(ack_handle); | 209 client->Acknowledge(ack_handle); |
| 150 return; | 210 return; |
| 151 } | 211 } |
| 152 DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id) | 212 DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id) |
| 153 << " to " << invalidation.version(); | 213 << " to " << invalidation.version(); |
| 154 max_invalidation_versions_[id] = invalidation.version(); | 214 invalidation_state_map_[id].version = invalidation.version(); |
| 155 invalidation_state_tracker_.Call( | 215 invalidation_state_tracker_.Call( |
| 156 FROM_HERE, | 216 FROM_HERE, |
| 157 &InvalidationStateTracker::SetMaxVersion, | 217 &InvalidationStateTracker::SetMaxVersion, |
|
akalin
2012/10/19 13:27:16
can we combine the SetMaxVersion and GenerateAckHa
dcheng
2012/10/19 19:38:11
The reason they are split up is because Invalidat
| |
| 158 id, invalidation.version()); | 218 id, invalidation.version()); |
| 159 | 219 |
| 160 std::string payload; | 220 std::string payload; |
| 161 // payload() CHECK()'s has_payload(), so we must check it ourselves first. | 221 // payload() CHECK()'s has_payload(), so we must check it ourselves first. |
| 162 if (invalidation.has_payload()) | 222 if (invalidation.has_payload()) |
| 163 payload = invalidation.payload(); | 223 payload = invalidation.payload(); |
| 164 | 224 |
| 165 ObjectIdInvalidationMap invalidation_map; | 225 ObjectIdSet ids; |
| 166 invalidation_map[id].payload = payload; | 226 ids.insert(id); |
| 167 EmitInvalidation(invalidation_map); | 227 PrepareInvalidation(ids, payload, client, ack_handle); |
| 168 // TODO(akalin): We should really acknowledge only after we get the | |
| 169 // updates from the sync server. (see http://crbug.com/78462). | |
| 170 client->Acknowledge(ack_handle); | |
| 171 } | 228 } |
| 172 | 229 |
| 173 void SyncInvalidationListener::InvalidateUnknownVersion( | 230 void SyncInvalidationListener::InvalidateUnknownVersion( |
| 174 invalidation::InvalidationClient* client, | 231 invalidation::InvalidationClient* client, |
| 175 const invalidation::ObjectId& object_id, | 232 const invalidation::ObjectId& object_id, |
| 176 const invalidation::AckHandle& ack_handle) { | 233 const invalidation::AckHandle& ack_handle) { |
| 177 DCHECK(CalledOnValidThread()); | 234 DCHECK(CalledOnValidThread()); |
| 178 DCHECK_EQ(client, invalidation_client_.get()); | 235 DCHECK_EQ(client, invalidation_client_.get()); |
| 179 DVLOG(1) << "InvalidateUnknownVersion"; | 236 DVLOG(1) << "InvalidateUnknownVersion"; |
| 180 | 237 |
| 181 ObjectIdInvalidationMap invalidation_map; | 238 ObjectIdSet ids; |
| 182 invalidation_map[object_id].payload = std::string(); | 239 ids.insert(object_id); |
| 183 EmitInvalidation(invalidation_map); | 240 PrepareInvalidation(ids, std::string(), client, ack_handle); |
| 184 // TODO(akalin): We should really acknowledge only after we get the | |
| 185 // updates from the sync server. (see http://crbug.com/78462). | |
| 186 client->Acknowledge(ack_handle); | |
| 187 } | 241 } |
| 188 | 242 |
| 189 // This should behave as if we got an invalidation with version | 243 // This should behave as if we got an invalidation with version |
| 190 // UNKNOWN_OBJECT_VERSION for all known data types. | 244 // UNKNOWN_OBJECT_VERSION for all known data types. |
| 191 void SyncInvalidationListener::InvalidateAll( | 245 void SyncInvalidationListener::InvalidateAll( |
| 192 invalidation::InvalidationClient* client, | 246 invalidation::InvalidationClient* client, |
| 193 const invalidation::AckHandle& ack_handle) { | 247 const invalidation::AckHandle& ack_handle) { |
| 194 DCHECK(CalledOnValidThread()); | 248 DCHECK(CalledOnValidThread()); |
| 195 DCHECK_EQ(client, invalidation_client_.get()); | 249 DCHECK_EQ(client, invalidation_client_.get()); |
| 196 DVLOG(1) << "InvalidateAll"; | 250 DVLOG(1) << "InvalidateAll"; |
| 197 | 251 |
| 198 const ObjectIdInvalidationMap& invalidation_map = | 252 PrepareInvalidation(registered_ids_, std::string(), client, ack_handle); |
| 199 ObjectIdSetToInvalidationMap(registered_ids_, std::string()); | 253 } |
| 200 EmitInvalidation(invalidation_map); | 254 |
| 201 // TODO(akalin): We should really acknowledge only after we get the | 255 void SyncInvalidationListener::PrepareInvalidation( |
| 202 // updates from the sync server. (see http://crbug.com/76482). | 256 const ObjectIdSet& ids, |
| 257 const std::string& payload, | |
| 258 invalidation::InvalidationClient* client, | |
| 259 const invalidation::AckHandle& ack_handle) { | |
| 260 DCHECK(CalledOnValidThread()); | |
| 261 | |
| 262 invalidation_state_tracker_.Call( | |
| 263 FROM_HERE, | |
| 264 &InvalidationStateTracker::GenerateAckHandles, | |
| 265 ids, | |
| 266 base::MessageLoopProxy::current(), | |
| 267 base::Bind(&SyncInvalidationListener::EmitInvalidation, | |
| 268 weak_ptr_factory_.GetWeakPtr(), | |
| 269 ids, | |
| 270 payload, | |
| 271 client, | |
| 272 ack_handle)); | |
| 273 } | |
| 274 | |
| 275 void SyncInvalidationListener::EmitInvalidation( | |
| 276 const ObjectIdSet& ids, | |
| 277 const std::string& payload, | |
| 278 invalidation::InvalidationClient* client, | |
| 279 const invalidation::AckHandle& ack_handle, | |
| 280 const AckHandleMap& local_ack_handles) { | |
| 281 DCHECK(CalledOnValidThread()); | |
| 282 ObjectIdInvalidationMap invalidation_map = | |
| 283 ObjectIdSetToInvalidationMap(ids, payload); | |
| 284 // Erase any timer queue entries that correspond with an id in |ids| since a | |
| 285 // new invalidation from the server resets the retry count. | |
| 286 RemoveIds(ids); | |
| 287 const base::TimeTicks now = base::TimeTicks::Now(); | |
| 288 const base::TimeTicks expiration_time = now + CalculateBackoffDelay(0); | |
| 289 for (AckHandleMap::const_iterator it = local_ack_handles.begin(); | |
| 290 it != local_ack_handles.end(); ++it) { | |
| 291 // Update in-memory copy of the invalidation state. | |
| 292 invalidation_state_map_[it->first].expected = it->second; | |
| 293 invalidation_map[it->first].ack_handle = it->second; | |
| 294 InsertId(expiration_time, it->first, payload, 0 /* retry_count */); | |
| 295 } | |
| 296 DCHECK(!timer_queue_.empty()); | |
| 297 UpdateTimer(now); | |
| 298 delegate_->OnInvalidate(invalidation_map); | |
| 203 client->Acknowledge(ack_handle); | 299 client->Acknowledge(ack_handle); |
| 204 } | 300 } |
| 205 | 301 |
| 206 void SyncInvalidationListener::EmitInvalidation( | 302 void SyncInvalidationListener::ResendUnacknowledgedInvalidations() { |
| 207 const ObjectIdInvalidationMap& invalidation_map) { | 303 ResendUnacknowledgedInvalidationsAt(base::TimeTicks::Now()); |
| 208 DCHECK(CalledOnValidThread()); | 304 } |
| 305 | |
| 306 void SyncInvalidationListener::ResendUnacknowledgedInvalidationsAt( | |
| 307 base::TimeTicks expiration_time) { | |
| 308 DCHECK(!timer_queue_.empty()) << "Timer not canceled but queue is empty!"; | |
| 309 | |
| 310 // This is slightly redundant since in non-test code, we always satisfy | |
| 311 // the condition now == expiration_time. Having this makes the tests a bit | |
| 312 // more sane though. | |
| 313 const base::TimeTicks now = base::TimeTicks::Now(); | |
| 314 TimerQueue::iterator end = timer_queue_.upper_bound(expiration_time); | |
| 315 // In theory, we can do in one loop since all the new insertions should be | |
| 316 // past |end|. To avoid potential infinite loops in case of a bug, we do it in | |
| 317 // two separate loops. | |
| 318 std::vector<QueueEntry> expired_entries; | |
| 319 for (TimerQueue::iterator it = timer_queue_.begin(); it != end; ) { | |
| 320 expired_entries.push_back(it->second); | |
| 321 TimerQueue::iterator erase_it = it; | |
| 322 ++it; | |
| 323 timer_queue_.erase(erase_it); | |
| 324 } | |
| 325 | |
| 326 ObjectIdInvalidationMap invalidation_map; | |
| 327 for (std::vector<QueueEntry>::const_iterator it = expired_entries.begin(); | |
| 328 it != expired_entries.end(); ++it) { | |
| 329 InsertId(now + CalculateBackoffDelay(it->retry_count + 1), | |
| 330 it->id, | |
| 331 it->payload, | |
| 332 it->retry_count + 1); | |
| 333 Invalidation invalidation; | |
| 334 invalidation.ack_handle = invalidation_state_map_[it->id].expected; | |
| 335 invalidation.payload = it->payload; | |
| 336 invalidation_map.insert(std::make_pair(it->id, invalidation)); | |
| 337 } | |
| 338 | |
| 209 delegate_->OnInvalidate(invalidation_map); | 339 delegate_->OnInvalidate(invalidation_map); |
| 340 UpdateTimer(now); | |
| 341 } | |
| 342 | |
| 343 void SyncInvalidationListener::InsertId(base::TimeTicks expiration_time, | |
| 344 const invalidation::ObjectId& id, | |
| 345 const std::string& payload, | |
| 346 int retry_count) { | |
| 347 timer_queue_.insert(std::make_pair(expiration_time, | |
| 348 QueueEntry(id, payload, retry_count))); | |
| 349 } | |
| 350 | |
| 351 void SyncInvalidationListener::RemoveId(const invalidation::ObjectId& id) { | |
| 352 for (TimerQueue::iterator it = timer_queue_.begin(); it != timer_queue_.end(); | |
| 353 ++it) { | |
| 354 if (it->second.id == id) { | |
| 355 timer_queue_.erase(it); | |
| 356 return; | |
| 357 } | |
| 358 } | |
| 359 // This shouldn't happen unless someone is acking multiple times. | |
| 360 NOTREACHED(); | |
| 361 } | |
| 362 | |
| 363 void SyncInvalidationListener::RemoveIds(const ObjectIdSet& ids) { | |
| 364 for (TimerQueue::iterator it = timer_queue_.begin(); | |
| 365 it != timer_queue_.end(); ) { | |
| 366 // We could be more clever here and reduce the complexity, since both | |
| 367 // containers are already sorted. Since we don't expect timer_queue_ to be | |
| 368 // very long in practice though, we just do things the simpler way. | |
| 369 if (ContainsKey(ids, it->second.id)) { | |
| 370 TimerQueue::iterator erase_it = it; | |
| 371 ++it; | |
| 372 timer_queue_.erase(erase_it); | |
| 373 } else { | |
| 374 ++it; | |
| 375 } | |
| 376 } | |
| 377 } | |
| 378 | |
| 379 void SyncInvalidationListener::UpdateTimer(base::TimeTicks now) { | |
| 380 if (timer_queue_.empty()) { | |
| 381 timer_.Stop(); | |
| 382 } else { | |
| 383 timer_.Start(FROM_HERE, | |
| 384 timer_queue_.begin()->first - now, | |
| 385 this, | |
| 386 &SyncInvalidationListener::ResendUnacknowledgedInvalidations); | |
| 387 } | |
| 210 } | 388 } |
| 211 | 389 |
| 212 void SyncInvalidationListener::InformRegistrationStatus( | 390 void SyncInvalidationListener::InformRegistrationStatus( |
| 213 invalidation::InvalidationClient* client, | 391 invalidation::InvalidationClient* client, |
| 214 const invalidation::ObjectId& object_id, | 392 const invalidation::ObjectId& object_id, |
| 215 InvalidationListener::RegistrationState new_state) { | 393 InvalidationListener::RegistrationState new_state) { |
| 216 DCHECK(CalledOnValidThread()); | 394 DCHECK(CalledOnValidThread()); |
| 217 DCHECK_EQ(client, invalidation_client_.get()); | 395 DCHECK_EQ(client, invalidation_client_.get()); |
| 218 DVLOG(1) << "InformRegistrationStatus: " | 396 DVLOG(1) << "InformRegistrationStatus: " |
| 219 << ObjectIdToString(object_id) << " " << new_state; | 397 << ObjectIdToString(object_id) << " " << new_state; |
| (...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 285 invalidation_state_tracker_.Call( | 463 invalidation_state_tracker_.Call( |
| 286 FROM_HERE, &InvalidationStateTracker::SetBootstrapData, state); | 464 FROM_HERE, &InvalidationStateTracker::SetBootstrapData, state); |
| 287 } | 465 } |
| 288 | 466 |
| 289 void SyncInvalidationListener::DoRegistrationUpdate() { | 467 void SyncInvalidationListener::DoRegistrationUpdate() { |
| 290 DCHECK(CalledOnValidThread()); | 468 DCHECK(CalledOnValidThread()); |
| 291 const ObjectIdSet& unregistered_ids = | 469 const ObjectIdSet& unregistered_ids = |
| 292 registration_manager_->UpdateRegisteredIds(registered_ids_); | 470 registration_manager_->UpdateRegisteredIds(registered_ids_); |
| 293 invalidation_state_tracker_.Call( | 471 invalidation_state_tracker_.Call( |
| 294 FROM_HERE, &InvalidationStateTracker::Forget, unregistered_ids); | 472 FROM_HERE, &InvalidationStateTracker::Forget, unregistered_ids); |
| 473 for (ObjectIdSet::const_iterator it = unregistered_ids.begin(); | |
| 474 it != unregistered_ids.end(); ++it) { | |
| 475 invalidation_state_map_.erase(*it); | |
| 476 } | |
| 477 for (TimerQueue::iterator it = timer_queue_.begin(); | |
| 478 it != timer_queue_.end(); ) { | |
| 479 if (unregistered_ids.find(it->second.id) != unregistered_ids.end()) { | |
| 480 TimerQueue::iterator erase_it = it; | |
| 481 ++it; | |
| 482 timer_queue_.erase(erase_it); | |
| 483 } else { | |
| 484 ++it; | |
| 485 } | |
| 486 } | |
| 487 } | |
| 488 | |
| 489 bool SyncInvalidationListener::TriggerNextTimeoutForTest( | |
| 490 base::TimeTicks* next_invalidation_time) { | |
| 491 if (timer_queue_.empty()) { | |
| 492 CHECK(!timer_.IsRunning()); | |
| 493 return false; | |
| 494 } | |
| 495 *next_invalidation_time = timer_queue_.begin()->first; | |
| 496 ResendUnacknowledgedInvalidationsAt(*next_invalidation_time); | |
| 497 return true; | |
| 295 } | 498 } |
| 296 | 499 |
| 297 void SyncInvalidationListener::StopForTest() { | 500 void SyncInvalidationListener::StopForTest() { |
| 298 DCHECK(CalledOnValidThread()); | 501 DCHECK(CalledOnValidThread()); |
| 299 Stop(); | 502 Stop(); |
| 300 } | 503 } |
| 301 | 504 |
| 302 void SyncInvalidationListener::Stop() { | 505 void SyncInvalidationListener::Stop() { |
| 303 DCHECK(CalledOnValidThread()); | 506 DCHECK(CalledOnValidThread()); |
| 304 if (!invalidation_client_.get()) { | 507 if (!invalidation_client_.get()) { |
| 305 return; | 508 return; |
| 306 } | 509 } |
| 307 | 510 |
| 511 timer_.Stop(); | |
| 512 timer_queue_.clear(); | |
| 513 | |
| 308 registration_manager_.reset(); | 514 registration_manager_.reset(); |
| 309 sync_system_resources_.Stop(); | 515 sync_system_resources_.Stop(); |
| 310 invalidation_client_->Stop(); | 516 invalidation_client_->Stop(); |
| 311 | 517 |
| 312 invalidation_client_.reset(); | 518 invalidation_client_.reset(); |
| 313 delegate_ = NULL; | 519 delegate_ = NULL; |
| 314 | 520 |
| 315 invalidation_state_tracker_.Reset(); | 521 invalidation_state_tracker_.Reset(); |
| 316 max_invalidation_versions_.clear(); | 522 invalidation_state_map_.clear(); |
| 317 ticl_state_ = DEFAULT_INVALIDATION_ERROR; | 523 ticl_state_ = DEFAULT_INVALIDATION_ERROR; |
| 318 push_client_state_ = DEFAULT_INVALIDATION_ERROR; | 524 push_client_state_ = DEFAULT_INVALIDATION_ERROR; |
| 319 } | 525 } |
| 320 | 526 |
| 321 InvalidatorState SyncInvalidationListener::GetState() const { | 527 InvalidatorState SyncInvalidationListener::GetState() const { |
| 322 DCHECK(CalledOnValidThread()); | 528 DCHECK(CalledOnValidThread()); |
| 323 if (ticl_state_ == INVALIDATION_CREDENTIALS_REJECTED || | 529 if (ticl_state_ == INVALIDATION_CREDENTIALS_REJECTED || |
| 324 push_client_state_ == INVALIDATION_CREDENTIALS_REJECTED) { | 530 push_client_state_ == INVALIDATION_CREDENTIALS_REJECTED) { |
| 325 // If either the ticl or the push client rejected our credentials, | 531 // If either the ticl or the push client rejected our credentials, |
| 326 // return INVALIDATION_CREDENTIALS_REJECTED. | 532 // return INVALIDATION_CREDENTIALS_REJECTED. |
| (...skipping 27 matching lines...) Expand all Loading... | |
| 354 EmitStateChange(); | 560 EmitStateChange(); |
| 355 } | 561 } |
| 356 | 562 |
| 357 void SyncInvalidationListener::OnIncomingNotification( | 563 void SyncInvalidationListener::OnIncomingNotification( |
| 358 const notifier::Notification& notification) { | 564 const notifier::Notification& notification) { |
| 359 DCHECK(CalledOnValidThread()); | 565 DCHECK(CalledOnValidThread()); |
| 360 // Do nothing, since this is already handled by |invalidation_client_|. | 566 // Do nothing, since this is already handled by |invalidation_client_|. |
| 361 } | 567 } |
| 362 | 568 |
| 363 } // namespace syncer | 569 } // namespace syncer |
| OLD | NEW |