Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(149)

Side by Side Diff: sync/notifier/sync_invalidation_listener.cc

Issue 56113003: Implement new invalidations ack tracking system (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Review fixes Created 7 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "sync/notifier/sync_invalidation_listener.h" 5 #include "sync/notifier/sync_invalidation_listener.h"
6 6
7 #include <vector> 7 #include <vector>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/callback.h" 10 #include "base/callback.h"
11 #include "base/compiler_specific.h" 11 #include "base/compiler_specific.h"
12 #include "base/logging.h" 12 #include "base/logging.h"
13 #include "base/tracked_objects.h" 13 #include "base/tracked_objects.h"
14 #include "google/cacheinvalidation/include/invalidation-client.h" 14 #include "google/cacheinvalidation/include/invalidation-client.h"
15 #include "google/cacheinvalidation/include/types.h" 15 #include "google/cacheinvalidation/include/types.h"
16 #include "google/cacheinvalidation/types.pb.h" 16 #include "google/cacheinvalidation/types.pb.h"
17 #include "jingle/notifier/listener/push_client.h" 17 #include "jingle/notifier/listener/push_client.h"
18 #include "sync/notifier/invalidation_util.h" 18 #include "sync/notifier/invalidation_util.h"
19 #include "sync/notifier/object_id_invalidation_map.h"
19 #include "sync/notifier/registration_manager.h" 20 #include "sync/notifier/registration_manager.h"
20 21
21 namespace { 22 namespace {
22 23
23 const char kApplicationName[] = "chrome-sync"; 24 const char kApplicationName[] = "chrome-sync";
24 25
25 static const int64 kUnknownVersion = -1;
26
27 } // namespace 26 } // namespace
28 27
29 namespace syncer { 28 namespace syncer {
30 29
31 SyncInvalidationListener::Delegate::~Delegate() {} 30 SyncInvalidationListener::Delegate::~Delegate() {}
32 31
33 SyncInvalidationListener::SyncInvalidationListener( 32 SyncInvalidationListener::SyncInvalidationListener(
34 base::TickClock* tick_clock,
35 scoped_ptr<notifier::PushClient> push_client) 33 scoped_ptr<notifier::PushClient> push_client)
36 : ack_tracker_(tick_clock, this), 34 : push_client_(push_client.get()),
akalin 2013/11/21 04:31:36 Should not have .get(). Otherwise, you'd have two
rlarocque 2013/11/21 20:09:27 This hasn't changed in a long time. I think this
37 push_client_(push_client.get()),
38 sync_system_resources_(push_client.Pass(), this), 35 sync_system_resources_(push_client.Pass(), this),
39 delegate_(NULL), 36 delegate_(NULL),
40 ticl_state_(DEFAULT_INVALIDATION_ERROR), 37 ticl_state_(DEFAULT_INVALIDATION_ERROR),
41 push_client_state_(DEFAULT_INVALIDATION_ERROR), 38 push_client_state_(DEFAULT_INVALIDATION_ERROR),
42 weak_ptr_factory_(this) { 39 weak_ptr_factory_(this) {
43 DCHECK(CalledOnValidThread()); 40 DCHECK(CalledOnValidThread());
44 push_client_->AddObserver(this); 41 push_client_->AddObserver(this);
45 } 42 }
46 43
47 SyncInvalidationListener::~SyncInvalidationListener() { 44 SyncInvalidationListener::~SyncInvalidationListener() {
48 DCHECK(CalledOnValidThread()); 45 DCHECK(CalledOnValidThread());
49 push_client_->RemoveObserver(this); 46 push_client_->RemoveObserver(this);
50 Stop(); 47 Stop();
51 DCHECK(!delegate_); 48 DCHECK(!delegate_);
52 } 49 }
53 50
54 void SyncInvalidationListener::Start( 51 void SyncInvalidationListener::Start(
55 const CreateInvalidationClientCallback& 52 const CreateInvalidationClientCallback&
56 create_invalidation_client_callback, 53 create_invalidation_client_callback,
57 const std::string& client_id, const std::string& client_info, 54 const std::string& client_id, const std::string& client_info,
58 const std::string& invalidation_bootstrap_data, 55 const std::string& invalidation_bootstrap_data,
59 const InvalidationStateMap& initial_invalidation_state_map, 56 const UnackedInvalidationsMap& initial_unacked_invalidations,
60 const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker, 57 const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker,
61 Delegate* delegate) { 58 Delegate* delegate) {
62 DCHECK(CalledOnValidThread()); 59 DCHECK(CalledOnValidThread());
63 Stop(); 60 Stop();
64 61
65 sync_system_resources_.set_platform(client_info); 62 sync_system_resources_.set_platform(client_info);
66 sync_system_resources_.Start(); 63 sync_system_resources_.Start();
67 64
68 // The Storage resource is implemented as a write-through cache. We populate 65 // The Storage resource is implemented as a write-through cache. We populate
69 // it with the initial state on startup, so subsequent writes go to disk and 66 // it with the initial state on startup, so subsequent writes go to disk and
70 // update the in-memory cache, while reads just return the cached state. 67 // update the in-memory cache, while reads just return the cached state.
71 sync_system_resources_.storage()->SetInitialState( 68 sync_system_resources_.storage()->SetInitialState(
72 invalidation_bootstrap_data); 69 invalidation_bootstrap_data);
73 70
74 invalidation_state_map_ = initial_invalidation_state_map; 71 unacked_invalidations_map_ = initial_unacked_invalidations;
75 if (invalidation_state_map_.empty()) {
76 DVLOG(2) << "No initial max invalidation versions for any id";
77 } else {
78 for (InvalidationStateMap::const_iterator it =
79 invalidation_state_map_.begin();
80 it != invalidation_state_map_.end(); ++it) {
81 DVLOG(2) << "Initial max invalidation version for "
82 << ObjectIdToString(it->first) << " is "
83 << it->second.version;
84 }
85 }
86 invalidation_state_tracker_ = invalidation_state_tracker; 72 invalidation_state_tracker_ = invalidation_state_tracker;
87 DCHECK(invalidation_state_tracker_.IsInitialized()); 73 DCHECK(invalidation_state_tracker_.IsInitialized());
88 74
89 DCHECK(!delegate_); 75 DCHECK(!delegate_);
90 DCHECK(delegate); 76 DCHECK(delegate);
91 delegate_ = delegate; 77 delegate_ = delegate;
92 78
93 #if defined(OS_IOS) 79 #if defined(OS_IOS)
94 int client_type = ipc::invalidation::ClientType::CHROME_SYNC_IOS; 80 int client_type = ipc::invalidation::ClientType::CHROME_SYNC_IOS;
95 #else 81 #else
96 int client_type = ipc::invalidation::ClientType::CHROME_SYNC; 82 int client_type = ipc::invalidation::ClientType::CHROME_SYNC;
97 #endif 83 #endif
98 invalidation_client_.reset( 84 invalidation_client_.reset(
99 create_invalidation_client_callback.Run( 85 create_invalidation_client_callback.Run(
100 &sync_system_resources_, client_type, client_id, 86 &sync_system_resources_, client_type, client_id,
101 kApplicationName, this)); 87 kApplicationName, this));
102 invalidation_client_->Start(); 88 invalidation_client_->Start();
103 89
104 registration_manager_.reset( 90 registration_manager_.reset(
105 new RegistrationManager(invalidation_client_.get())); 91 new RegistrationManager(invalidation_client_.get()));
106
107 // Set up reminders for any invalidations that have not been locally
108 // acknowledged.
109 ObjectIdSet unacknowledged_ids;
110 for (InvalidationStateMap::const_iterator it =
111 invalidation_state_map_.begin();
112 it != invalidation_state_map_.end(); ++it) {
113 if (it->second.expected.Equals(it->second.current))
114 continue;
115 unacknowledged_ids.insert(it->first);
116 }
117 if (!unacknowledged_ids.empty())
118 ack_tracker_.Track(unacknowledged_ids);
119 } 92 }
120 93
121 void SyncInvalidationListener::UpdateCredentials( 94 void SyncInvalidationListener::UpdateCredentials(
122 const std::string& email, const std::string& token) { 95 const std::string& email, const std::string& token) {
123 DCHECK(CalledOnValidThread()); 96 DCHECK(CalledOnValidThread());
124 sync_system_resources_.network()->UpdateCredentials(email, token); 97 sync_system_resources_.network()->UpdateCredentials(email, token);
125 } 98 }
126 99
127 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) { 100 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) {
128 DCHECK(CalledOnValidThread()); 101 DCHECK(CalledOnValidThread());
129 registered_ids_ = ids; 102 registered_ids_ = ids;
130 // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a 103 // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a
131 // working XMPP connection (as observed by us), so check it instead 104 // working XMPP connection (as observed by us), so check it instead
132 // of GetState() (see http://crbug.com/139424). 105 // of GetState() (see http://crbug.com/139424).
133 if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_) { 106 if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_) {
134 DoRegistrationUpdate(); 107 DoRegistrationUpdate();
135 } 108 }
136 } 109 }
137 110
138 void SyncInvalidationListener::Acknowledge(const invalidation::ObjectId& id,
139 const AckHandle& ack_handle) {
140 DCHECK(CalledOnValidThread());
141 InvalidationStateMap::iterator state_it = invalidation_state_map_.find(id);
142 if (state_it == invalidation_state_map_.end())
143 return;
144 invalidation_state_tracker_.Call(
145 FROM_HERE,
146 &InvalidationStateTracker::Acknowledge,
147 id,
148 ack_handle);
149 state_it->second.current = ack_handle;
150 if (state_it->second.expected.Equals(ack_handle)) {
151 // If the received ack matches the expected ack, then we no longer need to
152 // keep track of |id| since it is up-to-date.
153 ObjectIdSet ids;
154 ids.insert(id);
155 ack_tracker_.Ack(ids);
156 }
157 }
158
159 void SyncInvalidationListener::Ready( 111 void SyncInvalidationListener::Ready(
160 invalidation::InvalidationClient* client) { 112 invalidation::InvalidationClient* client) {
161 DCHECK(CalledOnValidThread()); 113 DCHECK(CalledOnValidThread());
162 DCHECK_EQ(client, invalidation_client_.get()); 114 DCHECK_EQ(client, invalidation_client_.get());
163 ticl_state_ = INVALIDATIONS_ENABLED; 115 ticl_state_ = INVALIDATIONS_ENABLED;
164 EmitStateChange(); 116 EmitStateChange();
165 DoRegistrationUpdate(); 117 DoRegistrationUpdate();
166 } 118 }
167 119
168 void SyncInvalidationListener::Invalidate( 120 void SyncInvalidationListener::Invalidate(
169 invalidation::InvalidationClient* client, 121 invalidation::InvalidationClient* client,
170 const invalidation::Invalidation& invalidation, 122 const invalidation::Invalidation& invalidation,
171 const invalidation::AckHandle& ack_handle) { 123 const invalidation::AckHandle& ack_handle) {
172 DCHECK(CalledOnValidThread()); 124 DCHECK(CalledOnValidThread());
173 DCHECK_EQ(client, invalidation_client_.get()); 125 DCHECK_EQ(client, invalidation_client_.get());
174 DVLOG(1) << "Invalidate: " << InvalidationToString(invalidation); 126 client->Acknowledge(ack_handle);
175 127
176 const invalidation::ObjectId& id = invalidation.object_id(); 128 const invalidation::ObjectId& id = invalidation.object_id();
177 129
178 // The invalidation API spec allows for the possibility of redundant
179 // invalidations, so keep track of the max versions and drop
180 // invalidations with old versions.
181 //
182 // TODO(akalin): Now that we keep track of registered ids, we
183 // should drop invalidations for unregistered ids. We may also
184 // have to filter it at a higher level, as invalidations for
185 // newly-unregistered ids may already be in flight.
186 InvalidationStateMap::const_iterator it = invalidation_state_map_.find(id);
187 if ((it != invalidation_state_map_.end()) &&
188 (invalidation.version() <= it->second.version)) {
189 // Drop redundant invalidations.
190 client->Acknowledge(ack_handle);
191 return;
192 }
193
194 std::string payload; 130 std::string payload;
195 // payload() CHECK()'s has_payload(), so we must check it ourselves first. 131 // payload() CHECK()'s has_payload(), so we must check it ourselves first.
196 if (invalidation.has_payload()) 132 if (invalidation.has_payload())
197 payload = invalidation.payload(); 133 payload = invalidation.payload();
198 134
199 DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id) 135 DVLOG(2) << "Received invalidation with version " << invalidation.version()
200 << " to " << invalidation.version(); 136 << " for " << ObjectIdToString(id);
201 invalidation_state_map_[id].version = invalidation.version();
202 invalidation_state_map_[id].payload = payload;
203 invalidation_state_tracker_.Call(
204 FROM_HERE,
205 &InvalidationStateTracker::SetMaxVersionAndPayload,
206 id, invalidation.version(), payload);
207 137
208 ObjectIdSet ids; 138 ObjectIdInvalidationMap invalidations;
209 ids.insert(id); 139 Invalidation inv = Invalidation::Init(id, invalidation.version(), payload);
210 PrepareInvalidation(ids, invalidation.version(), payload, client, ack_handle); 140 inv.set_ack_handler(GetThisAsAckHandler());
141 invalidations.Insert(inv);
142
143 DispatchInvalidations(invalidations);
211 } 144 }
212 145
213 void SyncInvalidationListener::InvalidateUnknownVersion( 146 void SyncInvalidationListener::InvalidateUnknownVersion(
214 invalidation::InvalidationClient* client, 147 invalidation::InvalidationClient* client,
215 const invalidation::ObjectId& object_id, 148 const invalidation::ObjectId& object_id,
216 const invalidation::AckHandle& ack_handle) { 149 const invalidation::AckHandle& ack_handle) {
217 DCHECK(CalledOnValidThread()); 150 DCHECK(CalledOnValidThread());
218 DCHECK_EQ(client, invalidation_client_.get()); 151 DCHECK_EQ(client, invalidation_client_.get());
219 DVLOG(1) << "InvalidateUnknownVersion"; 152 DVLOG(1) << "InvalidateUnknownVersion";
153 client->Acknowledge(ack_handle);
220 154
221 ObjectIdSet ids; 155 ObjectIdInvalidationMap invalidations;
222 ids.insert(object_id); 156 Invalidation unknown_version = Invalidation::InitUnknownVersion(object_id);
223 PrepareInvalidation( 157 unknown_version.set_ack_handler(GetThisAsAckHandler());
224 ids, 158 invalidations.Insert(unknown_version);
225 kUnknownVersion, 159
226 std::string(), 160 DispatchInvalidations(invalidations);
227 client,
228 ack_handle);
229 } 161 }
230 162
231 // This should behave as if we got an invalidation with version 163 // This should behave as if we got an invalidation with version
232 // UNKNOWN_OBJECT_VERSION for all known data types. 164 // UNKNOWN_OBJECT_VERSION for all known data types.
233 void SyncInvalidationListener::InvalidateAll( 165 void SyncInvalidationListener::InvalidateAll(
234 invalidation::InvalidationClient* client, 166 invalidation::InvalidationClient* client,
235 const invalidation::AckHandle& ack_handle) { 167 const invalidation::AckHandle& ack_handle) {
236 DCHECK(CalledOnValidThread()); 168 DCHECK(CalledOnValidThread());
237 DCHECK_EQ(client, invalidation_client_.get()); 169 DCHECK_EQ(client, invalidation_client_.get());
238 DVLOG(1) << "InvalidateAll"; 170 DVLOG(1) << "InvalidateAll";
171 client->Acknowledge(ack_handle);
239 172
240 PrepareInvalidation( 173 ObjectIdInvalidationMap invalidations;
241 registered_ids_, 174 for (ObjectIdSet::iterator it = registered_ids_.begin();
242 kUnknownVersion, 175 it != registered_ids_.end(); ++it) {
243 std::string(), 176 Invalidation unknown_version = Invalidation::InitUnknownVersion(*it);
244 client, 177 unknown_version.set_ack_handler(GetThisAsAckHandler());
245 ack_handle); 178 invalidations.Insert(unknown_version);
179 }
180
181 DispatchInvalidations(invalidations);
246 } 182 }
247 183
248 void SyncInvalidationListener::PrepareInvalidation( 184 // If a handler is registered, emit right away. Otherwise, save it for later.
249 const ObjectIdSet& ids, 185 void SyncInvalidationListener::DispatchInvalidations(
250 int64 version, 186 const ObjectIdInvalidationMap& invalidations) {
251 const std::string& payload,
252 invalidation::InvalidationClient* client,
253 const invalidation::AckHandle& ack_handle) {
254 DCHECK(CalledOnValidThread()); 187 DCHECK(CalledOnValidThread());
255 188
256 // A server invalidation resets the local retry count. 189 ObjectIdInvalidationMap to_save = invalidations;
257 ack_tracker_.Ack(ids); 190 ObjectIdInvalidationMap to_emit =
191 invalidations.GetSubsetWithObjectIds(registered_ids_);
192
193 SaveInvalidations(to_save);
194 EmitInvalidations(to_emit);
195 }
196
197 void SyncInvalidationListener::SaveInvalidations(
198 const ObjectIdInvalidationMap& to_save) {
199 ObjectIdSet objects_to_save = to_save.GetObjectIds();
200 for (ObjectIdSet::const_iterator it = objects_to_save.begin();
201 it != objects_to_save.end(); ++it) {
202 UnackedInvalidationsMap::iterator lookup =
203 unacked_invalidations_map_.find(*it);
204 if (lookup == unacked_invalidations_map_.end()) {
205 lookup = unacked_invalidations_map_.insert(
206 std::make_pair(*it, UnackedInvalidationSet(*it))).first;
207 }
208 lookup->second.AddSet(to_save.ForObject(*it));
209 }
210
258 invalidation_state_tracker_.Call( 211 invalidation_state_tracker_.Call(
259 FROM_HERE, 212 FROM_HERE,
260 &InvalidationStateTracker::GenerateAckHandles, 213 &InvalidationStateTracker::SetSavedInvalidations,
261 ids, 214 unacked_invalidations_map_);
262 base::MessageLoopProxy::current(),
263 base::Bind(&SyncInvalidationListener::EmitInvalidation,
264 weak_ptr_factory_.GetWeakPtr(),
265 ids,
266 version,
267 payload,
268 client,
269 ack_handle));
270 } 215 }
271 216
272 void SyncInvalidationListener::EmitInvalidation( 217 void SyncInvalidationListener::EmitInvalidations(
273 const ObjectIdSet& ids, 218 const ObjectIdInvalidationMap& to_emit) {
274 int64 version, 219 DVLOG(2) << "Emitting invalidations: " << to_emit.ToString();
275 const std::string& payload, 220 delegate_->OnInvalidate(to_emit);
276 invalidation::InvalidationClient* client,
277 const invalidation::AckHandle& ack_handle,
278 const AckHandleMap& local_ack_handles) {
279 DCHECK(CalledOnValidThread());
280
281 ObjectIdInvalidationMap invalidation_map;
282 for (AckHandleMap::const_iterator it = local_ack_handles.begin();
283 it != local_ack_handles.end(); ++it) {
284 // Update in-memory copy of the invalidation state.
285 invalidation_state_map_[it->first].expected = it->second;
286
287 if (version == kUnknownVersion) {
288 Invalidation inv = Invalidation::InitUnknownVersion(it->first);
289 inv.set_ack_handle(it->second);
290 invalidation_map.Insert(inv);
291 } else {
292 Invalidation inv = Invalidation::Init(it->first, version, payload);
293 inv.set_ack_handle(it->second);
294 invalidation_map.Insert(inv);
295 }
296 }
297 ack_tracker_.Track(ids);
298 delegate_->OnInvalidate(invalidation_map);
299 client->Acknowledge(ack_handle);
300 }
301
302 void SyncInvalidationListener::OnTimeout(const ObjectIdSet& ids) {
303 ObjectIdInvalidationMap invalidation_map;
304 for (ObjectIdSet::const_iterator it = ids.begin(); it != ids.end(); ++it) {
305 if (invalidation_state_map_[*it].version == kUnknownVersion) {
306 Invalidation inv = Invalidation::InitUnknownVersion(*it);
307 inv.set_ack_handle(invalidation_state_map_[*it].expected);
308 invalidation_map.Insert(inv);
309 } else {
310 Invalidation inv = Invalidation::Init(
311 *it,
312 invalidation_state_map_[*it].version,
313 invalidation_state_map_[*it].payload);
314 inv.set_ack_handle(invalidation_state_map_[*it].expected);
315 invalidation_map.Insert(inv);
316 }
317 }
318 delegate_->OnInvalidate(invalidation_map);
319 } 221 }
320 222
321 void SyncInvalidationListener::InformRegistrationStatus( 223 void SyncInvalidationListener::InformRegistrationStatus(
322 invalidation::InvalidationClient* client, 224 invalidation::InvalidationClient* client,
323 const invalidation::ObjectId& object_id, 225 const invalidation::ObjectId& object_id,
324 InvalidationListener::RegistrationState new_state) { 226 InvalidationListener::RegistrationState new_state) {
325 DCHECK(CalledOnValidThread()); 227 DCHECK(CalledOnValidThread());
326 DCHECK_EQ(client, invalidation_client_.get()); 228 DCHECK_EQ(client, invalidation_client_.get());
327 DVLOG(1) << "InformRegistrationStatus: " 229 DVLOG(1) << "InformRegistrationStatus: "
328 << ObjectIdToString(object_id) << " " << new_state; 230 << ObjectIdToString(object_id) << " " << new_state;
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after
381 << error_info.error_message() 283 << error_info.error_message()
382 << " (transient = " << error_info.is_transient() << ")"; 284 << " (transient = " << error_info.is_transient() << ")";
383 if (error_info.error_reason() == invalidation::ErrorReason::AUTH_FAILURE) { 285 if (error_info.error_reason() == invalidation::ErrorReason::AUTH_FAILURE) {
384 ticl_state_ = INVALIDATION_CREDENTIALS_REJECTED; 286 ticl_state_ = INVALIDATION_CREDENTIALS_REJECTED;
385 } else { 287 } else {
386 ticl_state_ = TRANSIENT_INVALIDATION_ERROR; 288 ticl_state_ = TRANSIENT_INVALIDATION_ERROR;
387 } 289 }
388 EmitStateChange(); 290 EmitStateChange();
389 } 291 }
390 292
293 void SyncInvalidationListener::Acknowledge(
294 const invalidation::ObjectId& id,
295 const syncer::AckHandle& handle) {
296 UnackedInvalidationsMap::iterator lookup =
297 unacked_invalidations_map_.find(id);
298 if (lookup == unacked_invalidations_map_.end()) {
299 DLOG(WARNING) << "Received acknowledgement for untracked object ID";
300 return;
301 }
302 lookup->second.Acknowledge(handle);
303 invalidation_state_tracker_.Call(
304 FROM_HERE,
305 &InvalidationStateTracker::SetSavedInvalidations,
306 unacked_invalidations_map_);
307 }
308
309 void SyncInvalidationListener::Drop(
310 const invalidation::ObjectId& id,
311 const syncer::AckHandle& handle) {
312 UnackedInvalidationsMap::iterator lookup =
313 unacked_invalidations_map_.find(id);
314 if (lookup == unacked_invalidations_map_.end()) {
315 DLOG(WARNING) << "Received drop for untracked object ID";
316 return;
317 }
318 lookup->second.Drop(handle);
319 invalidation_state_tracker_.Call(
320 FROM_HERE,
321 &InvalidationStateTracker::SetSavedInvalidations,
322 unacked_invalidations_map_);
323 }
324
391 void SyncInvalidationListener::WriteState(const std::string& state) { 325 void SyncInvalidationListener::WriteState(const std::string& state) {
392 DCHECK(CalledOnValidThread()); 326 DCHECK(CalledOnValidThread());
393 DVLOG(1) << "WriteState"; 327 DVLOG(1) << "WriteState";
394 invalidation_state_tracker_.Call( 328 invalidation_state_tracker_.Call(
395 FROM_HERE, &InvalidationStateTracker::SetBootstrapData, state); 329 FROM_HERE, &InvalidationStateTracker::SetBootstrapData, state);
396 } 330 }
397 331
398 void SyncInvalidationListener::DoRegistrationUpdate() { 332 void SyncInvalidationListener::DoRegistrationUpdate() {
399 DCHECK(CalledOnValidThread()); 333 DCHECK(CalledOnValidThread());
400 const ObjectIdSet& unregistered_ids = 334 const ObjectIdSet& unregistered_ids =
401 registration_manager_->UpdateRegisteredIds(registered_ids_); 335 registration_manager_->UpdateRegisteredIds(registered_ids_);
402 for (ObjectIdSet::const_iterator it = unregistered_ids.begin(); 336 for (ObjectIdSet::iterator it = unregistered_ids.begin();
403 it != unregistered_ids.end(); ++it) { 337 it != unregistered_ids.end(); ++it) {
404 invalidation_state_map_.erase(*it); 338 unacked_invalidations_map_.erase(*it);
405 } 339 }
406 invalidation_state_tracker_.Call( 340 invalidation_state_tracker_.Call(
407 FROM_HERE, &InvalidationStateTracker::Forget, unregistered_ids); 341 FROM_HERE,
408 ack_tracker_.Ack(unregistered_ids); 342 &InvalidationStateTracker::SetSavedInvalidations,
343 unacked_invalidations_map_);
344
345 ObjectIdInvalidationMap object_id_invalidation_map;
346 for (UnackedInvalidationsMap::iterator map_it =
347 unacked_invalidations_map_.begin();
348 map_it != unacked_invalidations_map_.end(); ++map_it) {
349 if (registered_ids_.find(map_it->first) == registered_ids_.end()) {
350 continue;
351 }
352 map_it->second.ExportInvalidations(
353 GetThisAsAckHandler(),
354 &object_id_invalidation_map);
355 }
356 EmitInvalidations(object_id_invalidation_map);
409 } 357 }
410 358
411 void SyncInvalidationListener::StopForTest() { 359 void SyncInvalidationListener::StopForTest() {
412 DCHECK(CalledOnValidThread()); 360 DCHECK(CalledOnValidThread());
413 Stop(); 361 Stop();
414 } 362 }
415 363
416 InvalidationStateMap SyncInvalidationListener::GetStateMapForTest() const {
417 DCHECK(CalledOnValidThread());
418 return invalidation_state_map_;
419 }
420
421 AckTracker* SyncInvalidationListener::GetAckTrackerForTest() {
422 return &ack_tracker_;
423 }
424
425 void SyncInvalidationListener::Stop() { 364 void SyncInvalidationListener::Stop() {
426 DCHECK(CalledOnValidThread()); 365 DCHECK(CalledOnValidThread());
427 if (!invalidation_client_) { 366 if (!invalidation_client_) {
428 return; 367 return;
429 } 368 }
430 369
431 ack_tracker_.Clear();
432
433 registration_manager_.reset(); 370 registration_manager_.reset();
434 sync_system_resources_.Stop(); 371 sync_system_resources_.Stop();
435 invalidation_client_->Stop(); 372 invalidation_client_->Stop();
436 373
437 invalidation_client_.reset(); 374 invalidation_client_.reset();
438 delegate_ = NULL; 375 delegate_ = NULL;
439 376
440 invalidation_state_tracker_.Reset();
441 invalidation_state_map_.clear();
442 ticl_state_ = DEFAULT_INVALIDATION_ERROR; 377 ticl_state_ = DEFAULT_INVALIDATION_ERROR;
443 push_client_state_ = DEFAULT_INVALIDATION_ERROR; 378 push_client_state_ = DEFAULT_INVALIDATION_ERROR;
444 } 379 }
445 380
446 InvalidatorState SyncInvalidationListener::GetState() const { 381 InvalidatorState SyncInvalidationListener::GetState() const {
447 DCHECK(CalledOnValidThread()); 382 DCHECK(CalledOnValidThread());
448 if (ticl_state_ == INVALIDATION_CREDENTIALS_REJECTED || 383 if (ticl_state_ == INVALIDATION_CREDENTIALS_REJECTED ||
449 push_client_state_ == INVALIDATION_CREDENTIALS_REJECTED) { 384 push_client_state_ == INVALIDATION_CREDENTIALS_REJECTED) {
450 // If either the ticl or the push client rejected our credentials, 385 // If either the ticl or the push client rejected our credentials,
451 // return INVALIDATION_CREDENTIALS_REJECTED. 386 // return INVALIDATION_CREDENTIALS_REJECTED.
452 return INVALIDATION_CREDENTIALS_REJECTED; 387 return INVALIDATION_CREDENTIALS_REJECTED;
453 } 388 }
454 if (ticl_state_ == INVALIDATIONS_ENABLED && 389 if (ticl_state_ == INVALIDATIONS_ENABLED &&
455 push_client_state_ == INVALIDATIONS_ENABLED) { 390 push_client_state_ == INVALIDATIONS_ENABLED) {
456 // If the ticl is ready and the push client notifications are 391 // If the ticl is ready and the push client notifications are
457 // enabled, return INVALIDATIONS_ENABLED. 392 // enabled, return INVALIDATIONS_ENABLED.
458 return INVALIDATIONS_ENABLED; 393 return INVALIDATIONS_ENABLED;
459 } 394 }
460 // Otherwise, we have a transient error. 395 // Otherwise, we have a transient error.
461 return TRANSIENT_INVALIDATION_ERROR; 396 return TRANSIENT_INVALIDATION_ERROR;
462 } 397 }
463 398
464 void SyncInvalidationListener::EmitStateChange() { 399 void SyncInvalidationListener::EmitStateChange() {
465 DCHECK(CalledOnValidThread()); 400 DCHECK(CalledOnValidThread());
466 delegate_->OnInvalidatorStateChange(GetState()); 401 delegate_->OnInvalidatorStateChange(GetState());
467 } 402 }
468 403
404 WeakHandle<AckHandler> SyncInvalidationListener::GetThisAsAckHandler() {
405 DCHECK(CalledOnValidThread());
406 return WeakHandle<AckHandler>(weak_ptr_factory_.GetWeakPtr());
407 }
408
469 void SyncInvalidationListener::OnNotificationsEnabled() { 409 void SyncInvalidationListener::OnNotificationsEnabled() {
470 DCHECK(CalledOnValidThread()); 410 DCHECK(CalledOnValidThread());
471 push_client_state_ = INVALIDATIONS_ENABLED; 411 push_client_state_ = INVALIDATIONS_ENABLED;
472 EmitStateChange(); 412 EmitStateChange();
473 } 413 }
474 414
475 void SyncInvalidationListener::OnNotificationsDisabled( 415 void SyncInvalidationListener::OnNotificationsDisabled(
476 notifier::NotificationsDisabledReason reason) { 416 notifier::NotificationsDisabledReason reason) {
477 DCHECK(CalledOnValidThread()); 417 DCHECK(CalledOnValidThread());
478 push_client_state_ = FromNotifierReason(reason); 418 push_client_state_ = FromNotifierReason(reason);
479 EmitStateChange(); 419 EmitStateChange();
480 } 420 }
481 421
482 void SyncInvalidationListener::OnIncomingNotification( 422 void SyncInvalidationListener::OnIncomingNotification(
483 const notifier::Notification& notification) { 423 const notifier::Notification& notification) {
484 DCHECK(CalledOnValidThread()); 424 DCHECK(CalledOnValidThread());
485 // Do nothing, since this is already handled by |invalidation_client_|. 425 // Do nothing, since this is already handled by |invalidation_client_|.
486 } 426 }
487 427
488 } // namespace syncer 428 } // namespace syncer
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698