OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "google_apis/gcm/engine/mcs_client.h" | 5 #include "google_apis/gcm/engine/mcs_client.h" |
6 | 6 |
7 #include "base/basictypes.h" | 7 #include "base/basictypes.h" |
8 #include "base/message_loop/message_loop.h" | 8 #include "base/message_loop/message_loop.h" |
9 #include "base/strings/string_number_conversions.h" | 9 #include "base/strings/string_number_conversions.h" |
10 #include "base/time/clock.h" | 10 #include "base/time/clock.h" |
11 #include "base/time/time.h" | 11 #include "base/time/time.h" |
12 #include "google_apis/gcm/base/mcs_util.h" | 12 #include "google_apis/gcm/base/mcs_util.h" |
13 #include "google_apis/gcm/base/socket_stream.h" | 13 #include "google_apis/gcm/base/socket_stream.h" |
14 #include "google_apis/gcm/engine/connection_factory.h" | 14 #include "google_apis/gcm/engine/connection_factory.h" |
15 #include "google_apis/gcm/engine/rmq_store.h" | |
16 | 15 |
17 using namespace google::protobuf::io; | 16 using namespace google::protobuf::io; |
18 | 17 |
19 namespace gcm { | 18 namespace gcm { |
20 | 19 |
21 namespace { | 20 namespace { |
22 | 21 |
23 typedef scoped_ptr<google::protobuf::MessageLite> MCSProto; | 22 typedef scoped_ptr<google::protobuf::MessageLite> MCSProto; |
24 | 23 |
25 // The category of messages intended for the GCM client itself from MCS. | 24 // The category of messages intended for the GCM client itself from MCS. |
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
80 MCSProto protobuf; | 79 MCSProto protobuf; |
81 }; | 80 }; |
82 | 81 |
83 ReliablePacketInfo::ReliablePacketInfo() | 82 ReliablePacketInfo::ReliablePacketInfo() |
84 : stream_id(0), tag(0) { | 83 : stream_id(0), tag(0) { |
85 } | 84 } |
86 ReliablePacketInfo::~ReliablePacketInfo() {} | 85 ReliablePacketInfo::~ReliablePacketInfo() {} |
87 | 86 |
88 MCSClient::MCSClient(base::Clock* clock, | 87 MCSClient::MCSClient(base::Clock* clock, |
89 ConnectionFactory* connection_factory, | 88 ConnectionFactory* connection_factory, |
90 RMQStore* rmq_store) | 89 GCMStore* gcm_store) |
91 : clock_(clock), | 90 : clock_(clock), |
92 state_(UNINITIALIZED), | 91 state_(UNINITIALIZED), |
93 android_id_(0), | 92 android_id_(0), |
94 security_token_(0), | 93 security_token_(0), |
95 connection_factory_(connection_factory), | 94 connection_factory_(connection_factory), |
96 connection_handler_(NULL), | 95 connection_handler_(NULL), |
97 last_device_to_server_stream_id_received_(0), | 96 last_device_to_server_stream_id_received_(0), |
98 last_server_to_device_stream_id_received_(0), | 97 last_server_to_device_stream_id_received_(0), |
99 stream_id_out_(0), | 98 stream_id_out_(0), |
100 stream_id_in_(0), | 99 stream_id_in_(0), |
101 rmq_store_(rmq_store), | 100 gcm_store_(gcm_store), |
102 weak_ptr_factory_(this) { | 101 weak_ptr_factory_(this) { |
103 } | 102 } |
104 | 103 |
105 MCSClient::~MCSClient() { | 104 MCSClient::~MCSClient() { |
106 } | 105 } |
107 | 106 |
108 void MCSClient::Initialize( | 107 void MCSClient::Initialize( |
109 const InitializationCompleteCallback& initialization_callback, | 108 const InitializationCompleteCallback& initialization_callback, |
110 const OnMessageReceivedCallback& message_received_callback, | 109 const OnMessageReceivedCallback& message_received_callback, |
111 const OnMessageSentCallback& message_sent_callback, | 110 const OnMessageSentCallback& message_sent_callback, |
112 const RMQStore::LoadResult& load_result) { | 111 const GCMStore::LoadResult& load_result) { |
113 DCHECK_EQ(state_, UNINITIALIZED); | 112 DCHECK_EQ(state_, UNINITIALIZED); |
114 initialization_callback_ = initialization_callback; | 113 initialization_callback_ = initialization_callback; |
115 message_received_callback_ = message_received_callback; | 114 message_received_callback_ = message_received_callback; |
116 message_sent_callback_ = message_sent_callback; | 115 message_sent_callback_ = message_sent_callback; |
117 | 116 |
118 connection_factory_->Initialize( | 117 connection_factory_->Initialize( |
119 base::Bind(&MCSClient::ResetStateAndBuildLoginRequest, | 118 base::Bind(&MCSClient::ResetStateAndBuildLoginRequest, |
120 weak_ptr_factory_.GetWeakPtr()), | 119 weak_ptr_factory_.GetWeakPtr()), |
121 base::Bind(&MCSClient::HandlePacketFromWire, | 120 base::Bind(&MCSClient::HandlePacketFromWire, |
122 weak_ptr_factory_.GetWeakPtr()), | 121 weak_ptr_factory_.GetWeakPtr()), |
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
169 expired_ttl_ids.push_back(iter->first); | 168 expired_ttl_ids.push_back(iter->first); |
170 message_sent_callback_.Run("TTL expired for " + iter->first); | 169 message_sent_callback_.Run("TTL expired for " + iter->first); |
171 delete iter->second; | 170 delete iter->second; |
172 continue; | 171 continue; |
173 } | 172 } |
174 | 173 |
175 ordered_messages[timestamp] = iter->second; | 174 ordered_messages[timestamp] = iter->second; |
176 } | 175 } |
177 | 176 |
178 if (!expired_ttl_ids.empty()) { | 177 if (!expired_ttl_ids.empty()) { |
179 rmq_store_->RemoveOutgoingMessages( | 178 gcm_store_->RemoveOutgoingMessages( |
180 expired_ttl_ids, | 179 expired_ttl_ids, |
181 base::Bind(&MCSClient::OnRMQUpdateFinished, | 180 base::Bind(&MCSClient::OnGCMUpdateFinished, |
182 weak_ptr_factory_.GetWeakPtr())); | 181 weak_ptr_factory_.GetWeakPtr())); |
183 } | 182 } |
184 | 183 |
185 // Now go through and add the outgoing messages to the send queue in their | 184 // Now go through and add the outgoing messages to the send queue in their |
186 // appropriate order (oldest at front, most recent at back). | 185 // appropriate order (oldest at front, most recent at back). |
187 for (std::map<uint64, google::protobuf::MessageLite*>::const_iterator | 186 for (std::map<uint64, google::protobuf::MessageLite*>::const_iterator |
188 iter = ordered_messages.begin(); | 187 iter = ordered_messages.begin(); |
189 iter != ordered_messages.end(); ++iter) { | 188 iter != ordered_messages.end(); ++iter) { |
190 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); | 189 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); |
191 packet_info->protobuf.reset(iter->second); | 190 packet_info->protobuf.reset(iter->second); |
192 packet_info->persistent_id = base::Uint64ToString(iter->first); | 191 packet_info->persistent_id = base::Uint64ToString(iter->first); |
193 to_send_.push_back(make_linked_ptr(packet_info)); | 192 to_send_.push_back(make_linked_ptr(packet_info)); |
194 } | 193 } |
195 | 194 |
196 // TODO(fgorski): that is likely the only place where the initialization | 195 // TODO(fgorski): that is likely the only place where the initialization |
197 // callback could be used. | 196 // callback could be used. |
198 initialization_callback_.Run(true, android_id_, security_token_); | 197 initialization_callback_.Run(true, android_id_, security_token_); |
199 } | 198 } |
200 | 199 |
201 void MCSClient::Login(uint64 android_id, uint64 security_token) { | 200 void MCSClient::Login(uint64 android_id, uint64 security_token) { |
202 if (android_id != android_id_ && security_token != security_token_) { | 201 if (android_id != android_id_ && security_token != security_token_) { |
203 DCHECK(android_id); | 202 DCHECK(android_id); |
204 DCHECK(security_token); | 203 DCHECK(security_token); |
205 DCHECK(restored_unackeds_server_ids_.empty()); | 204 DCHECK(restored_unackeds_server_ids_.empty()); |
206 android_id_ = android_id; | 205 android_id_ = android_id; |
207 security_token_ = security_token; | 206 security_token_ = security_token; |
208 rmq_store_->SetDeviceCredentials( | 207 gcm_store_->SetDeviceCredentials( |
209 android_id_, | 208 android_id_, |
210 security_token_, | 209 security_token_, |
211 base::Bind(&MCSClient::OnRMQUpdateFinished, | 210 base::Bind(&MCSClient::OnGCMUpdateFinished, |
212 weak_ptr_factory_.GetWeakPtr())); | 211 weak_ptr_factory_.GetWeakPtr())); |
213 } | 212 } |
214 | 213 |
215 state_ = CONNECTING; | 214 state_ = CONNECTING; |
216 connection_factory_->Connect(); | 215 connection_factory_->Connect(); |
217 } | 216 } |
218 | 217 |
219 void MCSClient::SendMessage(const MCSMessage& message) { | 218 void MCSClient::SendMessage(const MCSMessage& message) { |
220 int ttl = GetTTL(message.GetProtobuf()); | 219 int ttl = GetTTL(message.GetProtobuf()); |
221 DCHECK_GE(ttl, 0); | 220 DCHECK_GE(ttl, 0); |
222 if (to_send_.size() > kMaxSendQueueSize) { | 221 if (to_send_.size() > kMaxSendQueueSize) { |
223 message_sent_callback_.Run("Message queue full."); | 222 message_sent_callback_.Run("Message queue full."); |
224 return; | 223 return; |
225 } | 224 } |
226 if (message.size() > kMaxMessageBytes) { | 225 if (message.size() > kMaxMessageBytes) { |
227 message_sent_callback_.Run("Message too large."); | 226 message_sent_callback_.Run("Message too large."); |
228 return; | 227 return; |
229 } | 228 } |
230 | 229 |
231 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); | 230 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); |
232 packet_info->protobuf = message.CloneProtobuf(); | 231 packet_info->protobuf = message.CloneProtobuf(); |
233 | 232 |
234 if (ttl > 0) { | 233 if (ttl > 0) { |
235 PersistentId persistent_id = GetNextPersistentId(); | 234 PersistentId persistent_id = GetNextPersistentId(); |
236 DVLOG(1) << "Setting persistent id to " << persistent_id; | 235 DVLOG(1) << "Setting persistent id to " << persistent_id; |
237 packet_info->persistent_id = persistent_id; | 236 packet_info->persistent_id = persistent_id; |
238 SetPersistentId(persistent_id, | 237 SetPersistentId(persistent_id, |
239 packet_info->protobuf.get()); | 238 packet_info->protobuf.get()); |
240 rmq_store_->AddOutgoingMessage(persistent_id, | 239 gcm_store_->AddOutgoingMessage(persistent_id, |
241 MCSMessage(message.tag(), | 240 MCSMessage(message.tag(), |
242 *(packet_info->protobuf)), | 241 *(packet_info->protobuf)), |
243 base::Bind(&MCSClient::OnRMQUpdateFinished, | 242 base::Bind(&MCSClient::OnGCMUpdateFinished, |
244 weak_ptr_factory_.GetWeakPtr())); | 243 weak_ptr_factory_.GetWeakPtr())); |
245 } else if (!connection_factory_->IsEndpointReachable()) { | 244 } else if (!connection_factory_->IsEndpointReachable()) { |
246 DVLOG(1) << "No active connection, dropping message."; | 245 DVLOG(1) << "No active connection, dropping message."; |
247 message_sent_callback_.Run("TTL expired"); | 246 message_sent_callback_.Run("TTL expired"); |
248 return; | 247 return; |
249 } | 248 } |
250 to_send_.push_back(make_linked_ptr(packet_info)); | 249 to_send_.push_back(make_linked_ptr(packet_info)); |
251 MaybeSendMessage(); | 250 MaybeSendMessage(); |
252 } | 251 } |
253 | 252 |
254 void MCSClient::Destroy() { | 253 void MCSClient::Destroy() { |
255 rmq_store_->Destroy(base::Bind(&MCSClient::OnRMQUpdateFinished, | 254 gcm_store_->Destroy(base::Bind(&MCSClient::OnGCMUpdateFinished, |
256 weak_ptr_factory_.GetWeakPtr())); | 255 weak_ptr_factory_.GetWeakPtr())); |
257 } | 256 } |
258 | 257 |
259 void MCSClient::ResetStateAndBuildLoginRequest( | 258 void MCSClient::ResetStateAndBuildLoginRequest( |
260 mcs_proto::LoginRequest* request) { | 259 mcs_proto::LoginRequest* request) { |
261 DCHECK(android_id_); | 260 DCHECK(android_id_); |
262 DCHECK(security_token_); | 261 DCHECK(security_token_); |
263 stream_id_in_ = 0; | 262 stream_id_in_ = 0; |
264 stream_id_out_ = 1; | 263 stream_id_out_ = 1; |
265 last_device_to_server_stream_id_received_ = 0; | 264 last_device_to_server_stream_id_received_ = 0; |
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
318 // message from the persistent store. | 317 // message from the persistent store. |
319 if (!packet->persistent_id.empty()) | 318 if (!packet->persistent_id.empty()) |
320 expired_ttl_ids.push_back(packet->persistent_id); | 319 expired_ttl_ids.push_back(packet->persistent_id); |
321 message_sent_callback_.Run("TTL expired"); | 320 message_sent_callback_.Run("TTL expired"); |
322 } | 321 } |
323 } | 322 } |
324 | 323 |
325 if (!expired_ttl_ids.empty()) { | 324 if (!expired_ttl_ids.empty()) { |
326 DVLOG(1) << "Connection reset, " << expired_ttl_ids.size() | 325 DVLOG(1) << "Connection reset, " << expired_ttl_ids.size() |
327 << " messages expired."; | 326 << " messages expired."; |
328 rmq_store_->RemoveOutgoingMessages( | 327 gcm_store_->RemoveOutgoingMessages( |
329 expired_ttl_ids, | 328 expired_ttl_ids, |
330 base::Bind(&MCSClient::OnRMQUpdateFinished, | 329 base::Bind(&MCSClient::OnGCMUpdateFinished, |
331 weak_ptr_factory_.GetWeakPtr())); | 330 weak_ptr_factory_.GetWeakPtr())); |
332 } | 331 } |
333 | 332 |
334 to_send_.swap(new_to_send); | 333 to_send_.swap(new_to_send); |
335 | 334 |
336 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size() | 335 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size() |
337 << " incoming acks pending, and " << to_send_.size() | 336 << " incoming acks pending, and " << to_send_.size() |
338 << " pending outgoing messages."; | 337 << " pending outgoing messages."; |
339 | 338 |
340 state_ = CONNECTING; | 339 state_ = CONNECTING; |
341 } | 340 } |
342 | 341 |
343 void MCSClient::SendHeartbeat() { | 342 void MCSClient::SendHeartbeat() { |
344 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing())); | 343 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing())); |
345 } | 344 } |
346 | 345 |
347 void MCSClient::OnRMQUpdateFinished(bool success) { | 346 void MCSClient::OnGCMUpdateFinished(bool success) { |
348 LOG_IF(ERROR, !success) << "RMQ Update failed!"; | 347 LOG_IF(ERROR, !success) << "GCM Update failed!"; |
349 // TODO(zea): Rebuild the store from scratch in case of persistence failure? | 348 // TODO(zea): Rebuild the store from scratch in case of persistence failure? |
350 } | 349 } |
351 | 350 |
352 void MCSClient::MaybeSendMessage() { | 351 void MCSClient::MaybeSendMessage() { |
353 if (to_send_.empty()) | 352 if (to_send_.empty()) |
354 return; | 353 return; |
355 | 354 |
356 // If the connection has been reset, do nothing. On reconnection | 355 // If the connection has been reset, do nothing. On reconnection |
357 // MaybeSendMessage will be automatically invoked again. | 356 // MaybeSendMessage will be automatically invoked again. |
358 // TODO(zea): consider doing TTL expiration at connection reset time, rather | 357 // TODO(zea): consider doing TTL expiration at connection reset time, rather |
359 // than reconnect time. | 358 // than reconnect time. |
360 if (!connection_factory_->IsEndpointReachable()) | 359 if (!connection_factory_->IsEndpointReachable()) |
361 return; | 360 return; |
362 | 361 |
363 MCSPacketInternal packet = to_send_.front(); | 362 MCSPacketInternal packet = to_send_.front(); |
364 to_send_.pop_front(); | 363 to_send_.pop_front(); |
365 if (HasTTLExpired(*packet->protobuf, clock_)) { | 364 if (HasTTLExpired(*packet->protobuf, clock_)) { |
366 DCHECK(!packet->persistent_id.empty()); | 365 DCHECK(!packet->persistent_id.empty()); |
367 DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; | 366 DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; |
368 message_sent_callback_.Run("TTL expired for " + packet->persistent_id); | 367 message_sent_callback_.Run("TTL expired for " + packet->persistent_id); |
369 rmq_store_->RemoveOutgoingMessage( | 368 gcm_store_->RemoveOutgoingMessage( |
370 packet->persistent_id, | 369 packet->persistent_id, |
371 base::Bind(&MCSClient::OnRMQUpdateFinished, | 370 base::Bind(&MCSClient::OnGCMUpdateFinished, |
372 weak_ptr_factory_.GetWeakPtr())); | 371 weak_ptr_factory_.GetWeakPtr())); |
373 base::MessageLoop::current()->PostTask( | 372 base::MessageLoop::current()->PostTask( |
374 FROM_HERE, | 373 FROM_HERE, |
375 base::Bind(&MCSClient::MaybeSendMessage, | 374 base::Bind(&MCSClient::MaybeSendMessage, |
376 weak_ptr_factory_.GetWeakPtr())); | 375 weak_ptr_factory_.GetWeakPtr())); |
377 return; | 376 return; |
378 } | 377 } |
379 DVLOG(1) << "Pending output message found, sending."; | 378 DVLOG(1) << "Pending output message found, sending."; |
380 if (!packet->persistent_id.empty()) | 379 if (!packet->persistent_id.empty()) |
381 to_resend_.push_back(packet); | 380 to_resend_.push_back(packet); |
(...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
472 } | 471 } |
473 for (StreamIdList::iterator iter = acked_stream_ids_to_remove.begin(); | 472 for (StreamIdList::iterator iter = acked_stream_ids_to_remove.begin(); |
474 iter != acked_stream_ids_to_remove.end(); ++iter) { | 473 iter != acked_stream_ids_to_remove.end(); ++iter) { |
475 acked_server_ids_.erase(*iter); | 474 acked_server_ids_.erase(*iter); |
476 } | 475 } |
477 } | 476 } |
478 | 477 |
479 ++stream_id_in_; | 478 ++stream_id_in_; |
480 if (!persistent_id.empty()) { | 479 if (!persistent_id.empty()) { |
481 unacked_server_ids_[stream_id_in_] = persistent_id; | 480 unacked_server_ids_[stream_id_in_] = persistent_id; |
482 rmq_store_->AddIncomingMessage(persistent_id, | 481 gcm_store_->AddIncomingMessage(persistent_id, |
483 base::Bind(&MCSClient::OnRMQUpdateFinished, | 482 base::Bind(&MCSClient::OnGCMUpdateFinished, |
484 weak_ptr_factory_.GetWeakPtr())); | 483 weak_ptr_factory_.GetWeakPtr())); |
485 } | 484 } |
486 | 485 |
487 DVLOG(1) << "Received message of type " << protobuf->GetTypeName() | 486 DVLOG(1) << "Received message of type " << protobuf->GetTypeName() |
488 << " with persistent id " | 487 << " with persistent id " |
489 << (persistent_id.empty() ? "NULL" : persistent_id) | 488 << (persistent_id.empty() ? "NULL" : persistent_id) |
490 << ", stream id " << stream_id_in_ << " and last stream id received " | 489 << ", stream id " << stream_id_in_ << " and last stream id received " |
491 << last_stream_id_received; | 490 << last_stream_id_received; |
492 | 491 |
493 if (unacked_server_ids_.size() > 0 && | 492 if (unacked_server_ids_.size() > 0 && |
(...skipping 126 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
620 to_resend_.front()->stream_id <= last_stream_id_received) { | 619 to_resend_.front()->stream_id <= last_stream_id_received) { |
621 const MCSPacketInternal& outgoing_packet = to_resend_.front(); | 620 const MCSPacketInternal& outgoing_packet = to_resend_.front(); |
622 acked_outgoing_persistent_ids.push_back(outgoing_packet->persistent_id); | 621 acked_outgoing_persistent_ids.push_back(outgoing_packet->persistent_id); |
623 acked_outgoing_stream_ids.push_back(outgoing_packet->stream_id); | 622 acked_outgoing_stream_ids.push_back(outgoing_packet->stream_id); |
624 to_resend_.pop_front(); | 623 to_resend_.pop_front(); |
625 } | 624 } |
626 | 625 |
627 DVLOG(1) << "Server acked " << acked_outgoing_persistent_ids.size() | 626 DVLOG(1) << "Server acked " << acked_outgoing_persistent_ids.size() |
628 << " outgoing messages, " << to_resend_.size() | 627 << " outgoing messages, " << to_resend_.size() |
629 << " remaining unacked"; | 628 << " remaining unacked"; |
630 rmq_store_->RemoveOutgoingMessages( | 629 gcm_store_->RemoveOutgoingMessages( |
631 acked_outgoing_persistent_ids, | 630 acked_outgoing_persistent_ids, |
632 base::Bind(&MCSClient::OnRMQUpdateFinished, | 631 base::Bind(&MCSClient::OnGCMUpdateFinished, |
633 weak_ptr_factory_.GetWeakPtr())); | 632 weak_ptr_factory_.GetWeakPtr())); |
634 | 633 |
635 HandleServerConfirmedReceipt(last_stream_id_received); | 634 HandleServerConfirmedReceipt(last_stream_id_received); |
636 } | 635 } |
637 | 636 |
638 void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) { | 637 void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) { |
639 // First check the to_resend_ queue. Acknowledgments should always happen | 638 // First check the to_resend_ queue. Acknowledgments should always happen |
640 // in the order they were sent, so if messages are present they should match | 639 // in the order they were sent, so if messages are present they should match |
641 // the acknowledge list. | 640 // the acknowledge list. |
642 PersistentIdList::const_iterator iter = id_list.begin(); | 641 PersistentIdList::const_iterator iter = id_list.begin(); |
(...skipping 21 matching lines...) Expand all Loading... |
664 StreamId device_stream_id = outgoing_packet->stream_id; | 663 StreamId device_stream_id = outgoing_packet->stream_id; |
665 HandleServerConfirmedReceipt(device_stream_id); | 664 HandleServerConfirmedReceipt(device_stream_id); |
666 | 665 |
667 to_send_.pop_front(); | 666 to_send_.pop_front(); |
668 } | 667 } |
669 | 668 |
670 DCHECK(iter == id_list.end()); | 669 DCHECK(iter == id_list.end()); |
671 | 670 |
672 DVLOG(1) << "Server acked " << id_list.size() | 671 DVLOG(1) << "Server acked " << id_list.size() |
673 << " messages, " << to_resend_.size() << " remaining unacked."; | 672 << " messages, " << to_resend_.size() << " remaining unacked."; |
674 rmq_store_->RemoveOutgoingMessages( | 673 gcm_store_->RemoveOutgoingMessages( |
675 id_list, | 674 id_list, |
676 base::Bind(&MCSClient::OnRMQUpdateFinished, | 675 base::Bind(&MCSClient::OnGCMUpdateFinished, |
677 weak_ptr_factory_.GetWeakPtr())); | 676 weak_ptr_factory_.GetWeakPtr())); |
678 | 677 |
679 // Resend any remaining outgoing messages, as they were not received by the | 678 // Resend any remaining outgoing messages, as they were not received by the |
680 // server. | 679 // server. |
681 DVLOG(1) << "Resending " << to_resend_.size() << " messages."; | 680 DVLOG(1) << "Resending " << to_resend_.size() << " messages."; |
682 while (!to_resend_.empty()) { | 681 while (!to_resend_.empty()) { |
683 to_send_.push_front(to_resend_.back()); | 682 to_send_.push_front(to_resend_.back()); |
684 to_resend_.pop_back(); | 683 to_resend_.pop_back(); |
685 } | 684 } |
686 } | 685 } |
(...skipping 11 matching lines...) Expand all Loading... |
698 iter != acked_server_ids_.end() && | 697 iter != acked_server_ids_.end() && |
699 iter->first <= device_stream_id;) { | 698 iter->first <= device_stream_id;) { |
700 acked_incoming_ids.insert(acked_incoming_ids.end(), | 699 acked_incoming_ids.insert(acked_incoming_ids.end(), |
701 iter->second.begin(), | 700 iter->second.begin(), |
702 iter->second.end()); | 701 iter->second.end()); |
703 acked_server_ids_.erase(iter++); | 702 acked_server_ids_.erase(iter++); |
704 } | 703 } |
705 | 704 |
706 DVLOG(1) << "Server confirmed receipt of " << acked_incoming_ids.size() | 705 DVLOG(1) << "Server confirmed receipt of " << acked_incoming_ids.size() |
707 << " acknowledged server messages."; | 706 << " acknowledged server messages."; |
708 rmq_store_->RemoveIncomingMessages( | 707 gcm_store_->RemoveIncomingMessages( |
709 acked_incoming_ids, | 708 acked_incoming_ids, |
710 base::Bind(&MCSClient::OnRMQUpdateFinished, | 709 base::Bind(&MCSClient::OnGCMUpdateFinished, |
711 weak_ptr_factory_.GetWeakPtr())); | 710 weak_ptr_factory_.GetWeakPtr())); |
712 } | 711 } |
713 | 712 |
714 MCSClient::PersistentId MCSClient::GetNextPersistentId() { | 713 MCSClient::PersistentId MCSClient::GetNextPersistentId() { |
715 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue()); | 714 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue()); |
716 } | 715 } |
717 | 716 |
718 void MCSClient::OnConnectionResetByHeartbeat() { | 717 void MCSClient::OnConnectionResetByHeartbeat() { |
719 connection_factory_->SignalConnectionReset(); | 718 connection_factory_->SignalConnectionReset(); |
720 } | 719 } |
721 | 720 |
722 } // namespace gcm | 721 } // namespace gcm |
OLD | NEW |