OLD | NEW |
---|---|
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "sync/notifier/sync_invalidation_listener.h" | 5 #include "sync/notifier/sync_invalidation_listener.h" |
6 | 6 |
7 #include <vector> | 7 #include <vector> |
8 | 8 |
9 #include "base/bind.h" | 9 #include "base/bind.h" |
10 #include "base/callback.h" | 10 #include "base/callback.h" |
11 #include "base/compiler_specific.h" | 11 #include "base/compiler_specific.h" |
12 #include "base/logging.h" | 12 #include "base/logging.h" |
13 #include "base/tracked_objects.h" | 13 #include "base/tracked_objects.h" |
14 #include "google/cacheinvalidation/include/invalidation-client.h" | 14 #include "google/cacheinvalidation/include/invalidation-client.h" |
15 #include "google/cacheinvalidation/include/types.h" | 15 #include "google/cacheinvalidation/include/types.h" |
16 #include "google/cacheinvalidation/types.pb.h" | 16 #include "google/cacheinvalidation/types.pb.h" |
17 #include "jingle/notifier/listener/push_client.h" | 17 #include "jingle/notifier/listener/push_client.h" |
18 #include "sync/notifier/invalidation_util.h" | 18 #include "sync/notifier/invalidation_util.h" |
19 #include "sync/notifier/object_id_invalidation_map.h" | |
19 #include "sync/notifier/registration_manager.h" | 20 #include "sync/notifier/registration_manager.h" |
20 | 21 |
21 namespace { | 22 namespace { |
22 | 23 |
23 const char kApplicationName[] = "chrome-sync"; | 24 const char kApplicationName[] = "chrome-sync"; |
24 | 25 |
25 static const int64 kUnknownVersion = -1; | |
26 | |
27 } // namespace | 26 } // namespace |
28 | 27 |
29 namespace syncer { | 28 namespace syncer { |
30 | 29 |
31 SyncInvalidationListener::Delegate::~Delegate() {} | 30 SyncInvalidationListener::Delegate::~Delegate() {} |
32 | 31 |
33 SyncInvalidationListener::SyncInvalidationListener( | 32 SyncInvalidationListener::SyncInvalidationListener( |
34 base::TickClock* tick_clock, | |
35 scoped_ptr<notifier::PushClient> push_client) | 33 scoped_ptr<notifier::PushClient> push_client) |
36 : ack_tracker_(tick_clock, this), | 34 : push_client_(push_client.get()), |
akalin
2013/11/21 04:31:36
Should not have .get(). Otherwise, you'd have two
rlarocque
2013/11/21 20:09:27
This hasn't changed in a long time. I think this
| |
37 push_client_(push_client.get()), | |
38 sync_system_resources_(push_client.Pass(), this), | 35 sync_system_resources_(push_client.Pass(), this), |
39 delegate_(NULL), | 36 delegate_(NULL), |
40 ticl_state_(DEFAULT_INVALIDATION_ERROR), | 37 ticl_state_(DEFAULT_INVALIDATION_ERROR), |
41 push_client_state_(DEFAULT_INVALIDATION_ERROR), | 38 push_client_state_(DEFAULT_INVALIDATION_ERROR), |
42 weak_ptr_factory_(this) { | 39 weak_ptr_factory_(this) { |
43 DCHECK(CalledOnValidThread()); | 40 DCHECK(CalledOnValidThread()); |
44 push_client_->AddObserver(this); | 41 push_client_->AddObserver(this); |
45 } | 42 } |
46 | 43 |
47 SyncInvalidationListener::~SyncInvalidationListener() { | 44 SyncInvalidationListener::~SyncInvalidationListener() { |
48 DCHECK(CalledOnValidThread()); | 45 DCHECK(CalledOnValidThread()); |
49 push_client_->RemoveObserver(this); | 46 push_client_->RemoveObserver(this); |
50 Stop(); | 47 Stop(); |
51 DCHECK(!delegate_); | 48 DCHECK(!delegate_); |
52 } | 49 } |
53 | 50 |
54 void SyncInvalidationListener::Start( | 51 void SyncInvalidationListener::Start( |
55 const CreateInvalidationClientCallback& | 52 const CreateInvalidationClientCallback& |
56 create_invalidation_client_callback, | 53 create_invalidation_client_callback, |
57 const std::string& client_id, const std::string& client_info, | 54 const std::string& client_id, const std::string& client_info, |
58 const std::string& invalidation_bootstrap_data, | 55 const std::string& invalidation_bootstrap_data, |
59 const InvalidationStateMap& initial_invalidation_state_map, | 56 const UnackedInvalidationsMap& initial_unacked_invalidations, |
60 const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker, | 57 const WeakHandle<InvalidationStateTracker>& invalidation_state_tracker, |
61 Delegate* delegate) { | 58 Delegate* delegate) { |
62 DCHECK(CalledOnValidThread()); | 59 DCHECK(CalledOnValidThread()); |
63 Stop(); | 60 Stop(); |
64 | 61 |
65 sync_system_resources_.set_platform(client_info); | 62 sync_system_resources_.set_platform(client_info); |
66 sync_system_resources_.Start(); | 63 sync_system_resources_.Start(); |
67 | 64 |
68 // The Storage resource is implemented as a write-through cache. We populate | 65 // The Storage resource is implemented as a write-through cache. We populate |
69 // it with the initial state on startup, so subsequent writes go to disk and | 66 // it with the initial state on startup, so subsequent writes go to disk and |
70 // update the in-memory cache, while reads just return the cached state. | 67 // update the in-memory cache, while reads just return the cached state. |
71 sync_system_resources_.storage()->SetInitialState( | 68 sync_system_resources_.storage()->SetInitialState( |
72 invalidation_bootstrap_data); | 69 invalidation_bootstrap_data); |
73 | 70 |
74 invalidation_state_map_ = initial_invalidation_state_map; | 71 unacked_invalidations_map_ = initial_unacked_invalidations; |
75 if (invalidation_state_map_.empty()) { | |
76 DVLOG(2) << "No initial max invalidation versions for any id"; | |
77 } else { | |
78 for (InvalidationStateMap::const_iterator it = | |
79 invalidation_state_map_.begin(); | |
80 it != invalidation_state_map_.end(); ++it) { | |
81 DVLOG(2) << "Initial max invalidation version for " | |
82 << ObjectIdToString(it->first) << " is " | |
83 << it->second.version; | |
84 } | |
85 } | |
86 invalidation_state_tracker_ = invalidation_state_tracker; | 72 invalidation_state_tracker_ = invalidation_state_tracker; |
87 DCHECK(invalidation_state_tracker_.IsInitialized()); | 73 DCHECK(invalidation_state_tracker_.IsInitialized()); |
88 | 74 |
89 DCHECK(!delegate_); | 75 DCHECK(!delegate_); |
90 DCHECK(delegate); | 76 DCHECK(delegate); |
91 delegate_ = delegate; | 77 delegate_ = delegate; |
92 | 78 |
93 #if defined(OS_IOS) | 79 #if defined(OS_IOS) |
94 int client_type = ipc::invalidation::ClientType::CHROME_SYNC_IOS; | 80 int client_type = ipc::invalidation::ClientType::CHROME_SYNC_IOS; |
95 #else | 81 #else |
96 int client_type = ipc::invalidation::ClientType::CHROME_SYNC; | 82 int client_type = ipc::invalidation::ClientType::CHROME_SYNC; |
97 #endif | 83 #endif |
98 invalidation_client_.reset( | 84 invalidation_client_.reset( |
99 create_invalidation_client_callback.Run( | 85 create_invalidation_client_callback.Run( |
100 &sync_system_resources_, client_type, client_id, | 86 &sync_system_resources_, client_type, client_id, |
101 kApplicationName, this)); | 87 kApplicationName, this)); |
102 invalidation_client_->Start(); | 88 invalidation_client_->Start(); |
103 | 89 |
104 registration_manager_.reset( | 90 registration_manager_.reset( |
105 new RegistrationManager(invalidation_client_.get())); | 91 new RegistrationManager(invalidation_client_.get())); |
106 | |
107 // Set up reminders for any invalidations that have not been locally | |
108 // acknowledged. | |
109 ObjectIdSet unacknowledged_ids; | |
110 for (InvalidationStateMap::const_iterator it = | |
111 invalidation_state_map_.begin(); | |
112 it != invalidation_state_map_.end(); ++it) { | |
113 if (it->second.expected.Equals(it->second.current)) | |
114 continue; | |
115 unacknowledged_ids.insert(it->first); | |
116 } | |
117 if (!unacknowledged_ids.empty()) | |
118 ack_tracker_.Track(unacknowledged_ids); | |
119 } | 92 } |
120 | 93 |
121 void SyncInvalidationListener::UpdateCredentials( | 94 void SyncInvalidationListener::UpdateCredentials( |
122 const std::string& email, const std::string& token) { | 95 const std::string& email, const std::string& token) { |
123 DCHECK(CalledOnValidThread()); | 96 DCHECK(CalledOnValidThread()); |
124 sync_system_resources_.network()->UpdateCredentials(email, token); | 97 sync_system_resources_.network()->UpdateCredentials(email, token); |
125 } | 98 } |
126 | 99 |
127 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) { | 100 void SyncInvalidationListener::UpdateRegisteredIds(const ObjectIdSet& ids) { |
128 DCHECK(CalledOnValidThread()); | 101 DCHECK(CalledOnValidThread()); |
129 registered_ids_ = ids; | 102 registered_ids_ = ids; |
130 // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a | 103 // |ticl_state_| can go to INVALIDATIONS_ENABLED even without a |
131 // working XMPP connection (as observed by us), so check it instead | 104 // working XMPP connection (as observed by us), so check it instead |
132 // of GetState() (see http://crbug.com/139424). | 105 // of GetState() (see http://crbug.com/139424). |
133 if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_) { | 106 if (ticl_state_ == INVALIDATIONS_ENABLED && registration_manager_) { |
134 DoRegistrationUpdate(); | 107 DoRegistrationUpdate(); |
135 } | 108 } |
136 } | 109 } |
137 | 110 |
138 void SyncInvalidationListener::Acknowledge(const invalidation::ObjectId& id, | |
139 const AckHandle& ack_handle) { | |
140 DCHECK(CalledOnValidThread()); | |
141 InvalidationStateMap::iterator state_it = invalidation_state_map_.find(id); | |
142 if (state_it == invalidation_state_map_.end()) | |
143 return; | |
144 invalidation_state_tracker_.Call( | |
145 FROM_HERE, | |
146 &InvalidationStateTracker::Acknowledge, | |
147 id, | |
148 ack_handle); | |
149 state_it->second.current = ack_handle; | |
150 if (state_it->second.expected.Equals(ack_handle)) { | |
151 // If the received ack matches the expected ack, then we no longer need to | |
152 // keep track of |id| since it is up-to-date. | |
153 ObjectIdSet ids; | |
154 ids.insert(id); | |
155 ack_tracker_.Ack(ids); | |
156 } | |
157 } | |
158 | |
159 void SyncInvalidationListener::Ready( | 111 void SyncInvalidationListener::Ready( |
160 invalidation::InvalidationClient* client) { | 112 invalidation::InvalidationClient* client) { |
161 DCHECK(CalledOnValidThread()); | 113 DCHECK(CalledOnValidThread()); |
162 DCHECK_EQ(client, invalidation_client_.get()); | 114 DCHECK_EQ(client, invalidation_client_.get()); |
163 ticl_state_ = INVALIDATIONS_ENABLED; | 115 ticl_state_ = INVALIDATIONS_ENABLED; |
164 EmitStateChange(); | 116 EmitStateChange(); |
165 DoRegistrationUpdate(); | 117 DoRegistrationUpdate(); |
166 } | 118 } |
167 | 119 |
168 void SyncInvalidationListener::Invalidate( | 120 void SyncInvalidationListener::Invalidate( |
169 invalidation::InvalidationClient* client, | 121 invalidation::InvalidationClient* client, |
170 const invalidation::Invalidation& invalidation, | 122 const invalidation::Invalidation& invalidation, |
171 const invalidation::AckHandle& ack_handle) { | 123 const invalidation::AckHandle& ack_handle) { |
172 DCHECK(CalledOnValidThread()); | 124 DCHECK(CalledOnValidThread()); |
173 DCHECK_EQ(client, invalidation_client_.get()); | 125 DCHECK_EQ(client, invalidation_client_.get()); |
174 DVLOG(1) << "Invalidate: " << InvalidationToString(invalidation); | 126 client->Acknowledge(ack_handle); |
175 | 127 |
176 const invalidation::ObjectId& id = invalidation.object_id(); | 128 const invalidation::ObjectId& id = invalidation.object_id(); |
177 | 129 |
178 // The invalidation API spec allows for the possibility of redundant | |
179 // invalidations, so keep track of the max versions and drop | |
180 // invalidations with old versions. | |
181 // | |
182 // TODO(akalin): Now that we keep track of registered ids, we | |
183 // should drop invalidations for unregistered ids. We may also | |
184 // have to filter it at a higher level, as invalidations for | |
185 // newly-unregistered ids may already be in flight. | |
186 InvalidationStateMap::const_iterator it = invalidation_state_map_.find(id); | |
187 if ((it != invalidation_state_map_.end()) && | |
188 (invalidation.version() <= it->second.version)) { | |
189 // Drop redundant invalidations. | |
190 client->Acknowledge(ack_handle); | |
191 return; | |
192 } | |
193 | |
194 std::string payload; | 130 std::string payload; |
195 // payload() CHECK()'s has_payload(), so we must check it ourselves first. | 131 // payload() CHECK()'s has_payload(), so we must check it ourselves first. |
196 if (invalidation.has_payload()) | 132 if (invalidation.has_payload()) |
197 payload = invalidation.payload(); | 133 payload = invalidation.payload(); |
198 | 134 |
199 DVLOG(2) << "Setting max invalidation version for " << ObjectIdToString(id) | 135 DVLOG(2) << "Received invalidation with version " << invalidation.version() |
200 << " to " << invalidation.version(); | 136 << " for " << ObjectIdToString(id); |
201 invalidation_state_map_[id].version = invalidation.version(); | |
202 invalidation_state_map_[id].payload = payload; | |
203 invalidation_state_tracker_.Call( | |
204 FROM_HERE, | |
205 &InvalidationStateTracker::SetMaxVersionAndPayload, | |
206 id, invalidation.version(), payload); | |
207 | 137 |
208 ObjectIdSet ids; | 138 ObjectIdInvalidationMap invalidations; |
209 ids.insert(id); | 139 Invalidation inv = Invalidation::Init(id, invalidation.version(), payload); |
210 PrepareInvalidation(ids, invalidation.version(), payload, client, ack_handle); | 140 inv.set_ack_handler(GetThisAsAckHandler()); |
141 invalidations.Insert(inv); | |
142 | |
143 DispatchInvalidations(invalidations); | |
211 } | 144 } |
212 | 145 |
213 void SyncInvalidationListener::InvalidateUnknownVersion( | 146 void SyncInvalidationListener::InvalidateUnknownVersion( |
214 invalidation::InvalidationClient* client, | 147 invalidation::InvalidationClient* client, |
215 const invalidation::ObjectId& object_id, | 148 const invalidation::ObjectId& object_id, |
216 const invalidation::AckHandle& ack_handle) { | 149 const invalidation::AckHandle& ack_handle) { |
217 DCHECK(CalledOnValidThread()); | 150 DCHECK(CalledOnValidThread()); |
218 DCHECK_EQ(client, invalidation_client_.get()); | 151 DCHECK_EQ(client, invalidation_client_.get()); |
219 DVLOG(1) << "InvalidateUnknownVersion"; | 152 DVLOG(1) << "InvalidateUnknownVersion"; |
153 client->Acknowledge(ack_handle); | |
220 | 154 |
221 ObjectIdSet ids; | 155 ObjectIdInvalidationMap invalidations; |
222 ids.insert(object_id); | 156 Invalidation unknown_version = Invalidation::InitUnknownVersion(object_id); |
223 PrepareInvalidation( | 157 unknown_version.set_ack_handler(GetThisAsAckHandler()); |
224 ids, | 158 invalidations.Insert(unknown_version); |
225 kUnknownVersion, | 159 |
226 std::string(), | 160 DispatchInvalidations(invalidations); |
227 client, | |
228 ack_handle); | |
229 } | 161 } |
230 | 162 |
231 // This should behave as if we got an invalidation with version | 163 // This should behave as if we got an invalidation with version |
232 // UNKNOWN_OBJECT_VERSION for all known data types. | 164 // UNKNOWN_OBJECT_VERSION for all known data types. |
233 void SyncInvalidationListener::InvalidateAll( | 165 void SyncInvalidationListener::InvalidateAll( |
234 invalidation::InvalidationClient* client, | 166 invalidation::InvalidationClient* client, |
235 const invalidation::AckHandle& ack_handle) { | 167 const invalidation::AckHandle& ack_handle) { |
236 DCHECK(CalledOnValidThread()); | 168 DCHECK(CalledOnValidThread()); |
237 DCHECK_EQ(client, invalidation_client_.get()); | 169 DCHECK_EQ(client, invalidation_client_.get()); |
238 DVLOG(1) << "InvalidateAll"; | 170 DVLOG(1) << "InvalidateAll"; |
171 client->Acknowledge(ack_handle); | |
239 | 172 |
240 PrepareInvalidation( | 173 ObjectIdInvalidationMap invalidations; |
241 registered_ids_, | 174 for (ObjectIdSet::iterator it = registered_ids_.begin(); |
242 kUnknownVersion, | 175 it != registered_ids_.end(); ++it) { |
243 std::string(), | 176 Invalidation unknown_version = Invalidation::InitUnknownVersion(*it); |
244 client, | 177 unknown_version.set_ack_handler(GetThisAsAckHandler()); |
245 ack_handle); | 178 invalidations.Insert(unknown_version); |
179 } | |
180 | |
181 DispatchInvalidations(invalidations); | |
246 } | 182 } |
247 | 183 |
248 void SyncInvalidationListener::PrepareInvalidation( | 184 // If a handler is registered, emit right away. Otherwise, save it for later. |
249 const ObjectIdSet& ids, | 185 void SyncInvalidationListener::DispatchInvalidations( |
250 int64 version, | 186 const ObjectIdInvalidationMap& invalidations) { |
251 const std::string& payload, | |
252 invalidation::InvalidationClient* client, | |
253 const invalidation::AckHandle& ack_handle) { | |
254 DCHECK(CalledOnValidThread()); | 187 DCHECK(CalledOnValidThread()); |
255 | 188 |
256 // A server invalidation resets the local retry count. | 189 ObjectIdInvalidationMap to_save = invalidations; |
257 ack_tracker_.Ack(ids); | 190 ObjectIdInvalidationMap to_emit = |
191 invalidations.GetSubsetWithObjectIds(registered_ids_); | |
192 | |
193 SaveInvalidations(to_save); | |
194 EmitInvalidations(to_emit); | |
195 } | |
196 | |
197 void SyncInvalidationListener::SaveInvalidations( | |
198 const ObjectIdInvalidationMap& to_save) { | |
199 ObjectIdSet objects_to_save = to_save.GetObjectIds(); | |
200 for (ObjectIdSet::const_iterator it = objects_to_save.begin(); | |
201 it != objects_to_save.end(); ++it) { | |
202 UnackedInvalidationsMap::iterator lookup = | |
203 unacked_invalidations_map_.find(*it); | |
204 if (lookup == unacked_invalidations_map_.end()) { | |
205 lookup = unacked_invalidations_map_.insert( | |
206 std::make_pair(*it, UnackedInvalidationSet(*it))).first; | |
207 } | |
208 lookup->second.AddSet(to_save.ForObject(*it)); | |
209 } | |
210 | |
258 invalidation_state_tracker_.Call( | 211 invalidation_state_tracker_.Call( |
259 FROM_HERE, | 212 FROM_HERE, |
260 &InvalidationStateTracker::GenerateAckHandles, | 213 &InvalidationStateTracker::SetSavedInvalidations, |
261 ids, | 214 unacked_invalidations_map_); |
262 base::MessageLoopProxy::current(), | |
263 base::Bind(&SyncInvalidationListener::EmitInvalidation, | |
264 weak_ptr_factory_.GetWeakPtr(), | |
265 ids, | |
266 version, | |
267 payload, | |
268 client, | |
269 ack_handle)); | |
270 } | 215 } |
271 | 216 |
272 void SyncInvalidationListener::EmitInvalidation( | 217 void SyncInvalidationListener::EmitInvalidations( |
273 const ObjectIdSet& ids, | 218 const ObjectIdInvalidationMap& to_emit) { |
274 int64 version, | 219 DVLOG(2) << "Emitting invalidations: " << to_emit.ToString(); |
275 const std::string& payload, | 220 delegate_->OnInvalidate(to_emit); |
276 invalidation::InvalidationClient* client, | |
277 const invalidation::AckHandle& ack_handle, | |
278 const AckHandleMap& local_ack_handles) { | |
279 DCHECK(CalledOnValidThread()); | |
280 | |
281 ObjectIdInvalidationMap invalidation_map; | |
282 for (AckHandleMap::const_iterator it = local_ack_handles.begin(); | |
283 it != local_ack_handles.end(); ++it) { | |
284 // Update in-memory copy of the invalidation state. | |
285 invalidation_state_map_[it->first].expected = it->second; | |
286 | |
287 if (version == kUnknownVersion) { | |
288 Invalidation inv = Invalidation::InitUnknownVersion(it->first); | |
289 inv.set_ack_handle(it->second); | |
290 invalidation_map.Insert(inv); | |
291 } else { | |
292 Invalidation inv = Invalidation::Init(it->first, version, payload); | |
293 inv.set_ack_handle(it->second); | |
294 invalidation_map.Insert(inv); | |
295 } | |
296 } | |
297 ack_tracker_.Track(ids); | |
298 delegate_->OnInvalidate(invalidation_map); | |
299 client->Acknowledge(ack_handle); | |
300 } | |
301 | |
302 void SyncInvalidationListener::OnTimeout(const ObjectIdSet& ids) { | |
303 ObjectIdInvalidationMap invalidation_map; | |
304 for (ObjectIdSet::const_iterator it = ids.begin(); it != ids.end(); ++it) { | |
305 if (invalidation_state_map_[*it].version == kUnknownVersion) { | |
306 Invalidation inv = Invalidation::InitUnknownVersion(*it); | |
307 inv.set_ack_handle(invalidation_state_map_[*it].expected); | |
308 invalidation_map.Insert(inv); | |
309 } else { | |
310 Invalidation inv = Invalidation::Init( | |
311 *it, | |
312 invalidation_state_map_[*it].version, | |
313 invalidation_state_map_[*it].payload); | |
314 inv.set_ack_handle(invalidation_state_map_[*it].expected); | |
315 invalidation_map.Insert(inv); | |
316 } | |
317 } | |
318 delegate_->OnInvalidate(invalidation_map); | |
319 } | 221 } |
320 | 222 |
321 void SyncInvalidationListener::InformRegistrationStatus( | 223 void SyncInvalidationListener::InformRegistrationStatus( |
322 invalidation::InvalidationClient* client, | 224 invalidation::InvalidationClient* client, |
323 const invalidation::ObjectId& object_id, | 225 const invalidation::ObjectId& object_id, |
324 InvalidationListener::RegistrationState new_state) { | 226 InvalidationListener::RegistrationState new_state) { |
325 DCHECK(CalledOnValidThread()); | 227 DCHECK(CalledOnValidThread()); |
326 DCHECK_EQ(client, invalidation_client_.get()); | 228 DCHECK_EQ(client, invalidation_client_.get()); |
327 DVLOG(1) << "InformRegistrationStatus: " | 229 DVLOG(1) << "InformRegistrationStatus: " |
328 << ObjectIdToString(object_id) << " " << new_state; | 230 << ObjectIdToString(object_id) << " " << new_state; |
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
381 << error_info.error_message() | 283 << error_info.error_message() |
382 << " (transient = " << error_info.is_transient() << ")"; | 284 << " (transient = " << error_info.is_transient() << ")"; |
383 if (error_info.error_reason() == invalidation::ErrorReason::AUTH_FAILURE) { | 285 if (error_info.error_reason() == invalidation::ErrorReason::AUTH_FAILURE) { |
384 ticl_state_ = INVALIDATION_CREDENTIALS_REJECTED; | 286 ticl_state_ = INVALIDATION_CREDENTIALS_REJECTED; |
385 } else { | 287 } else { |
386 ticl_state_ = TRANSIENT_INVALIDATION_ERROR; | 288 ticl_state_ = TRANSIENT_INVALIDATION_ERROR; |
387 } | 289 } |
388 EmitStateChange(); | 290 EmitStateChange(); |
389 } | 291 } |
390 | 292 |
293 void SyncInvalidationListener::Acknowledge( | |
294 const invalidation::ObjectId& id, | |
295 const syncer::AckHandle& handle) { | |
296 UnackedInvalidationsMap::iterator lookup = | |
297 unacked_invalidations_map_.find(id); | |
298 if (lookup == unacked_invalidations_map_.end()) { | |
299 DLOG(WARNING) << "Received acknowledgement for untracked object ID"; | |
300 return; | |
301 } | |
302 lookup->second.Acknowledge(handle); | |
303 invalidation_state_tracker_.Call( | |
304 FROM_HERE, | |
305 &InvalidationStateTracker::SetSavedInvalidations, | |
306 unacked_invalidations_map_); | |
307 } | |
308 | |
309 void SyncInvalidationListener::Drop( | |
310 const invalidation::ObjectId& id, | |
311 const syncer::AckHandle& handle) { | |
312 UnackedInvalidationsMap::iterator lookup = | |
313 unacked_invalidations_map_.find(id); | |
314 if (lookup == unacked_invalidations_map_.end()) { | |
315 DLOG(WARNING) << "Received drop for untracked object ID"; | |
316 return; | |
317 } | |
318 lookup->second.Drop(handle); | |
319 invalidation_state_tracker_.Call( | |
320 FROM_HERE, | |
321 &InvalidationStateTracker::SetSavedInvalidations, | |
322 unacked_invalidations_map_); | |
323 } | |
324 | |
391 void SyncInvalidationListener::WriteState(const std::string& state) { | 325 void SyncInvalidationListener::WriteState(const std::string& state) { |
392 DCHECK(CalledOnValidThread()); | 326 DCHECK(CalledOnValidThread()); |
393 DVLOG(1) << "WriteState"; | 327 DVLOG(1) << "WriteState"; |
394 invalidation_state_tracker_.Call( | 328 invalidation_state_tracker_.Call( |
395 FROM_HERE, &InvalidationStateTracker::SetBootstrapData, state); | 329 FROM_HERE, &InvalidationStateTracker::SetBootstrapData, state); |
396 } | 330 } |
397 | 331 |
398 void SyncInvalidationListener::DoRegistrationUpdate() { | 332 void SyncInvalidationListener::DoRegistrationUpdate() { |
399 DCHECK(CalledOnValidThread()); | 333 DCHECK(CalledOnValidThread()); |
400 const ObjectIdSet& unregistered_ids = | 334 const ObjectIdSet& unregistered_ids = |
401 registration_manager_->UpdateRegisteredIds(registered_ids_); | 335 registration_manager_->UpdateRegisteredIds(registered_ids_); |
402 for (ObjectIdSet::const_iterator it = unregistered_ids.begin(); | 336 for (ObjectIdSet::iterator it = unregistered_ids.begin(); |
403 it != unregistered_ids.end(); ++it) { | 337 it != unregistered_ids.end(); ++it) { |
404 invalidation_state_map_.erase(*it); | 338 unacked_invalidations_map_.erase(*it); |
405 } | 339 } |
406 invalidation_state_tracker_.Call( | 340 invalidation_state_tracker_.Call( |
407 FROM_HERE, &InvalidationStateTracker::Forget, unregistered_ids); | 341 FROM_HERE, |
408 ack_tracker_.Ack(unregistered_ids); | 342 &InvalidationStateTracker::SetSavedInvalidations, |
343 unacked_invalidations_map_); | |
344 | |
345 ObjectIdInvalidationMap object_id_invalidation_map; | |
346 for (UnackedInvalidationsMap::iterator map_it = | |
347 unacked_invalidations_map_.begin(); | |
348 map_it != unacked_invalidations_map_.end(); ++map_it) { | |
349 if (registered_ids_.find(map_it->first) == registered_ids_.end()) { | |
350 continue; | |
351 } | |
352 map_it->second.ExportInvalidations( | |
353 GetThisAsAckHandler(), | |
354 &object_id_invalidation_map); | |
355 } | |
356 EmitInvalidations(object_id_invalidation_map); | |
409 } | 357 } |
410 | 358 |
411 void SyncInvalidationListener::StopForTest() { | 359 void SyncInvalidationListener::StopForTest() { |
412 DCHECK(CalledOnValidThread()); | 360 DCHECK(CalledOnValidThread()); |
413 Stop(); | 361 Stop(); |
414 } | 362 } |
415 | 363 |
416 InvalidationStateMap SyncInvalidationListener::GetStateMapForTest() const { | |
417 DCHECK(CalledOnValidThread()); | |
418 return invalidation_state_map_; | |
419 } | |
420 | |
421 AckTracker* SyncInvalidationListener::GetAckTrackerForTest() { | |
422 return &ack_tracker_; | |
423 } | |
424 | |
425 void SyncInvalidationListener::Stop() { | 364 void SyncInvalidationListener::Stop() { |
426 DCHECK(CalledOnValidThread()); | 365 DCHECK(CalledOnValidThread()); |
427 if (!invalidation_client_) { | 366 if (!invalidation_client_) { |
428 return; | 367 return; |
429 } | 368 } |
430 | 369 |
431 ack_tracker_.Clear(); | |
432 | |
433 registration_manager_.reset(); | 370 registration_manager_.reset(); |
434 sync_system_resources_.Stop(); | 371 sync_system_resources_.Stop(); |
435 invalidation_client_->Stop(); | 372 invalidation_client_->Stop(); |
436 | 373 |
437 invalidation_client_.reset(); | 374 invalidation_client_.reset(); |
438 delegate_ = NULL; | 375 delegate_ = NULL; |
439 | 376 |
440 invalidation_state_tracker_.Reset(); | |
441 invalidation_state_map_.clear(); | |
442 ticl_state_ = DEFAULT_INVALIDATION_ERROR; | 377 ticl_state_ = DEFAULT_INVALIDATION_ERROR; |
443 push_client_state_ = DEFAULT_INVALIDATION_ERROR; | 378 push_client_state_ = DEFAULT_INVALIDATION_ERROR; |
444 } | 379 } |
445 | 380 |
446 InvalidatorState SyncInvalidationListener::GetState() const { | 381 InvalidatorState SyncInvalidationListener::GetState() const { |
447 DCHECK(CalledOnValidThread()); | 382 DCHECK(CalledOnValidThread()); |
448 if (ticl_state_ == INVALIDATION_CREDENTIALS_REJECTED || | 383 if (ticl_state_ == INVALIDATION_CREDENTIALS_REJECTED || |
449 push_client_state_ == INVALIDATION_CREDENTIALS_REJECTED) { | 384 push_client_state_ == INVALIDATION_CREDENTIALS_REJECTED) { |
450 // If either the ticl or the push client rejected our credentials, | 385 // If either the ticl or the push client rejected our credentials, |
451 // return INVALIDATION_CREDENTIALS_REJECTED. | 386 // return INVALIDATION_CREDENTIALS_REJECTED. |
452 return INVALIDATION_CREDENTIALS_REJECTED; | 387 return INVALIDATION_CREDENTIALS_REJECTED; |
453 } | 388 } |
454 if (ticl_state_ == INVALIDATIONS_ENABLED && | 389 if (ticl_state_ == INVALIDATIONS_ENABLED && |
455 push_client_state_ == INVALIDATIONS_ENABLED) { | 390 push_client_state_ == INVALIDATIONS_ENABLED) { |
456 // If the ticl is ready and the push client notifications are | 391 // If the ticl is ready and the push client notifications are |
457 // enabled, return INVALIDATIONS_ENABLED. | 392 // enabled, return INVALIDATIONS_ENABLED. |
458 return INVALIDATIONS_ENABLED; | 393 return INVALIDATIONS_ENABLED; |
459 } | 394 } |
460 // Otherwise, we have a transient error. | 395 // Otherwise, we have a transient error. |
461 return TRANSIENT_INVALIDATION_ERROR; | 396 return TRANSIENT_INVALIDATION_ERROR; |
462 } | 397 } |
463 | 398 |
464 void SyncInvalidationListener::EmitStateChange() { | 399 void SyncInvalidationListener::EmitStateChange() { |
465 DCHECK(CalledOnValidThread()); | 400 DCHECK(CalledOnValidThread()); |
466 delegate_->OnInvalidatorStateChange(GetState()); | 401 delegate_->OnInvalidatorStateChange(GetState()); |
467 } | 402 } |
468 | 403 |
404 WeakHandle<AckHandler> SyncInvalidationListener::GetThisAsAckHandler() { | |
405 DCHECK(CalledOnValidThread()); | |
406 return WeakHandle<AckHandler>(weak_ptr_factory_.GetWeakPtr()); | |
407 } | |
408 | |
469 void SyncInvalidationListener::OnNotificationsEnabled() { | 409 void SyncInvalidationListener::OnNotificationsEnabled() { |
470 DCHECK(CalledOnValidThread()); | 410 DCHECK(CalledOnValidThread()); |
471 push_client_state_ = INVALIDATIONS_ENABLED; | 411 push_client_state_ = INVALIDATIONS_ENABLED; |
472 EmitStateChange(); | 412 EmitStateChange(); |
473 } | 413 } |
474 | 414 |
475 void SyncInvalidationListener::OnNotificationsDisabled( | 415 void SyncInvalidationListener::OnNotificationsDisabled( |
476 notifier::NotificationsDisabledReason reason) { | 416 notifier::NotificationsDisabledReason reason) { |
477 DCHECK(CalledOnValidThread()); | 417 DCHECK(CalledOnValidThread()); |
478 push_client_state_ = FromNotifierReason(reason); | 418 push_client_state_ = FromNotifierReason(reason); |
479 EmitStateChange(); | 419 EmitStateChange(); |
480 } | 420 } |
481 | 421 |
482 void SyncInvalidationListener::OnIncomingNotification( | 422 void SyncInvalidationListener::OnIncomingNotification( |
483 const notifier::Notification& notification) { | 423 const notifier::Notification& notification) { |
484 DCHECK(CalledOnValidThread()); | 424 DCHECK(CalledOnValidThread()); |
485 // Do nothing, since this is already handled by |invalidation_client_|. | 425 // Do nothing, since this is already handled by |invalidation_client_|. |
486 } | 426 } |
487 | 427 |
488 } // namespace syncer | 428 } // namespace syncer |
OLD | NEW |