Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(367)

Side by Side Diff: google_apis/gcm/engine/mcs_client.cc

Issue 121743002: Renaming RMQStore to GCMStore and breaking out its interface (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Adding a GCM_EXPORT directive to GCMStoreImpl Created 6 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « google_apis/gcm/engine/mcs_client.h ('k') | google_apis/gcm/engine/mcs_client_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "google_apis/gcm/engine/mcs_client.h" 5 #include "google_apis/gcm/engine/mcs_client.h"
6 6
7 #include "base/basictypes.h" 7 #include "base/basictypes.h"
8 #include "base/message_loop/message_loop.h" 8 #include "base/message_loop/message_loop.h"
9 #include "base/strings/string_number_conversions.h" 9 #include "base/strings/string_number_conversions.h"
10 #include "base/time/clock.h" 10 #include "base/time/clock.h"
11 #include "base/time/time.h" 11 #include "base/time/time.h"
12 #include "google_apis/gcm/base/mcs_util.h" 12 #include "google_apis/gcm/base/mcs_util.h"
13 #include "google_apis/gcm/base/socket_stream.h" 13 #include "google_apis/gcm/base/socket_stream.h"
14 #include "google_apis/gcm/engine/connection_factory.h" 14 #include "google_apis/gcm/engine/connection_factory.h"
15 #include "google_apis/gcm/engine/rmq_store.h"
16 15
17 using namespace google::protobuf::io; 16 using namespace google::protobuf::io;
18 17
19 namespace gcm { 18 namespace gcm {
20 19
21 namespace { 20 namespace {
22 21
23 typedef scoped_ptr<google::protobuf::MessageLite> MCSProto; 22 typedef scoped_ptr<google::protobuf::MessageLite> MCSProto;
24 23
25 // The category of messages intended for the GCM client itself from MCS. 24 // The category of messages intended for the GCM client itself from MCS.
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
80 MCSProto protobuf; 79 MCSProto protobuf;
81 }; 80 };
82 81
83 ReliablePacketInfo::ReliablePacketInfo() 82 ReliablePacketInfo::ReliablePacketInfo()
84 : stream_id(0), tag(0) { 83 : stream_id(0), tag(0) {
85 } 84 }
86 ReliablePacketInfo::~ReliablePacketInfo() {} 85 ReliablePacketInfo::~ReliablePacketInfo() {}
87 86
88 MCSClient::MCSClient(base::Clock* clock, 87 MCSClient::MCSClient(base::Clock* clock,
89 ConnectionFactory* connection_factory, 88 ConnectionFactory* connection_factory,
90 RMQStore* rmq_store) 89 GCMStore* gcm_store)
91 : clock_(clock), 90 : clock_(clock),
92 state_(UNINITIALIZED), 91 state_(UNINITIALIZED),
93 android_id_(0), 92 android_id_(0),
94 security_token_(0), 93 security_token_(0),
95 connection_factory_(connection_factory), 94 connection_factory_(connection_factory),
96 connection_handler_(NULL), 95 connection_handler_(NULL),
97 last_device_to_server_stream_id_received_(0), 96 last_device_to_server_stream_id_received_(0),
98 last_server_to_device_stream_id_received_(0), 97 last_server_to_device_stream_id_received_(0),
99 stream_id_out_(0), 98 stream_id_out_(0),
100 stream_id_in_(0), 99 stream_id_in_(0),
101 rmq_store_(rmq_store), 100 gcm_store_(gcm_store),
102 weak_ptr_factory_(this) { 101 weak_ptr_factory_(this) {
103 } 102 }
104 103
105 MCSClient::~MCSClient() { 104 MCSClient::~MCSClient() {
106 } 105 }
107 106
108 void MCSClient::Initialize( 107 void MCSClient::Initialize(
109 const InitializationCompleteCallback& initialization_callback, 108 const InitializationCompleteCallback& initialization_callback,
110 const OnMessageReceivedCallback& message_received_callback, 109 const OnMessageReceivedCallback& message_received_callback,
111 const OnMessageSentCallback& message_sent_callback, 110 const OnMessageSentCallback& message_sent_callback,
112 const RMQStore::LoadResult& load_result) { 111 const GCMStore::LoadResult& load_result) {
113 DCHECK_EQ(state_, UNINITIALIZED); 112 DCHECK_EQ(state_, UNINITIALIZED);
114 initialization_callback_ = initialization_callback; 113 initialization_callback_ = initialization_callback;
115 message_received_callback_ = message_received_callback; 114 message_received_callback_ = message_received_callback;
116 message_sent_callback_ = message_sent_callback; 115 message_sent_callback_ = message_sent_callback;
117 116
118 connection_factory_->Initialize( 117 connection_factory_->Initialize(
119 base::Bind(&MCSClient::ResetStateAndBuildLoginRequest, 118 base::Bind(&MCSClient::ResetStateAndBuildLoginRequest,
120 weak_ptr_factory_.GetWeakPtr()), 119 weak_ptr_factory_.GetWeakPtr()),
121 base::Bind(&MCSClient::HandlePacketFromWire, 120 base::Bind(&MCSClient::HandlePacketFromWire,
122 weak_ptr_factory_.GetWeakPtr()), 121 weak_ptr_factory_.GetWeakPtr()),
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after
169 expired_ttl_ids.push_back(iter->first); 168 expired_ttl_ids.push_back(iter->first);
170 message_sent_callback_.Run("TTL expired for " + iter->first); 169 message_sent_callback_.Run("TTL expired for " + iter->first);
171 delete iter->second; 170 delete iter->second;
172 continue; 171 continue;
173 } 172 }
174 173
175 ordered_messages[timestamp] = iter->second; 174 ordered_messages[timestamp] = iter->second;
176 } 175 }
177 176
178 if (!expired_ttl_ids.empty()) { 177 if (!expired_ttl_ids.empty()) {
179 rmq_store_->RemoveOutgoingMessages( 178 gcm_store_->RemoveOutgoingMessages(
180 expired_ttl_ids, 179 expired_ttl_ids,
181 base::Bind(&MCSClient::OnRMQUpdateFinished, 180 base::Bind(&MCSClient::OnGCMUpdateFinished,
182 weak_ptr_factory_.GetWeakPtr())); 181 weak_ptr_factory_.GetWeakPtr()));
183 } 182 }
184 183
185 // Now go through and add the outgoing messages to the send queue in their 184 // Now go through and add the outgoing messages to the send queue in their
186 // appropriate order (oldest at front, most recent at back). 185 // appropriate order (oldest at front, most recent at back).
187 for (std::map<uint64, google::protobuf::MessageLite*>::const_iterator 186 for (std::map<uint64, google::protobuf::MessageLite*>::const_iterator
188 iter = ordered_messages.begin(); 187 iter = ordered_messages.begin();
189 iter != ordered_messages.end(); ++iter) { 188 iter != ordered_messages.end(); ++iter) {
190 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); 189 ReliablePacketInfo* packet_info = new ReliablePacketInfo();
191 packet_info->protobuf.reset(iter->second); 190 packet_info->protobuf.reset(iter->second);
192 packet_info->persistent_id = base::Uint64ToString(iter->first); 191 packet_info->persistent_id = base::Uint64ToString(iter->first);
193 to_send_.push_back(make_linked_ptr(packet_info)); 192 to_send_.push_back(make_linked_ptr(packet_info));
194 } 193 }
195 194
196 // TODO(fgorski): that is likely the only place where the initialization 195 // TODO(fgorski): that is likely the only place where the initialization
197 // callback could be used. 196 // callback could be used.
198 initialization_callback_.Run(true, android_id_, security_token_); 197 initialization_callback_.Run(true, android_id_, security_token_);
199 } 198 }
200 199
201 void MCSClient::Login(uint64 android_id, uint64 security_token) { 200 void MCSClient::Login(uint64 android_id, uint64 security_token) {
202 if (android_id != android_id_ && security_token != security_token_) { 201 if (android_id != android_id_ && security_token != security_token_) {
203 DCHECK(android_id); 202 DCHECK(android_id);
204 DCHECK(security_token); 203 DCHECK(security_token);
205 DCHECK(restored_unackeds_server_ids_.empty()); 204 DCHECK(restored_unackeds_server_ids_.empty());
206 android_id_ = android_id; 205 android_id_ = android_id;
207 security_token_ = security_token; 206 security_token_ = security_token;
208 rmq_store_->SetDeviceCredentials( 207 gcm_store_->SetDeviceCredentials(
209 android_id_, 208 android_id_,
210 security_token_, 209 security_token_,
211 base::Bind(&MCSClient::OnRMQUpdateFinished, 210 base::Bind(&MCSClient::OnGCMUpdateFinished,
212 weak_ptr_factory_.GetWeakPtr())); 211 weak_ptr_factory_.GetWeakPtr()));
213 } 212 }
214 213
215 state_ = CONNECTING; 214 state_ = CONNECTING;
216 connection_factory_->Connect(); 215 connection_factory_->Connect();
217 } 216 }
218 217
219 void MCSClient::SendMessage(const MCSMessage& message) { 218 void MCSClient::SendMessage(const MCSMessage& message) {
220 int ttl = GetTTL(message.GetProtobuf()); 219 int ttl = GetTTL(message.GetProtobuf());
221 DCHECK_GE(ttl, 0); 220 DCHECK_GE(ttl, 0);
222 if (to_send_.size() > kMaxSendQueueSize) { 221 if (to_send_.size() > kMaxSendQueueSize) {
223 message_sent_callback_.Run("Message queue full."); 222 message_sent_callback_.Run("Message queue full.");
224 return; 223 return;
225 } 224 }
226 if (message.size() > kMaxMessageBytes) { 225 if (message.size() > kMaxMessageBytes) {
227 message_sent_callback_.Run("Message too large."); 226 message_sent_callback_.Run("Message too large.");
228 return; 227 return;
229 } 228 }
230 229
231 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); 230 ReliablePacketInfo* packet_info = new ReliablePacketInfo();
232 packet_info->protobuf = message.CloneProtobuf(); 231 packet_info->protobuf = message.CloneProtobuf();
233 232
234 if (ttl > 0) { 233 if (ttl > 0) {
235 PersistentId persistent_id = GetNextPersistentId(); 234 PersistentId persistent_id = GetNextPersistentId();
236 DVLOG(1) << "Setting persistent id to " << persistent_id; 235 DVLOG(1) << "Setting persistent id to " << persistent_id;
237 packet_info->persistent_id = persistent_id; 236 packet_info->persistent_id = persistent_id;
238 SetPersistentId(persistent_id, 237 SetPersistentId(persistent_id,
239 packet_info->protobuf.get()); 238 packet_info->protobuf.get());
240 rmq_store_->AddOutgoingMessage(persistent_id, 239 gcm_store_->AddOutgoingMessage(persistent_id,
241 MCSMessage(message.tag(), 240 MCSMessage(message.tag(),
242 *(packet_info->protobuf)), 241 *(packet_info->protobuf)),
243 base::Bind(&MCSClient::OnRMQUpdateFinished, 242 base::Bind(&MCSClient::OnGCMUpdateFinished,
244 weak_ptr_factory_.GetWeakPtr())); 243 weak_ptr_factory_.GetWeakPtr()));
245 } else if (!connection_factory_->IsEndpointReachable()) { 244 } else if (!connection_factory_->IsEndpointReachable()) {
246 DVLOG(1) << "No active connection, dropping message."; 245 DVLOG(1) << "No active connection, dropping message.";
247 message_sent_callback_.Run("TTL expired"); 246 message_sent_callback_.Run("TTL expired");
248 return; 247 return;
249 } 248 }
250 to_send_.push_back(make_linked_ptr(packet_info)); 249 to_send_.push_back(make_linked_ptr(packet_info));
251 MaybeSendMessage(); 250 MaybeSendMessage();
252 } 251 }
253 252
254 void MCSClient::Destroy() { 253 void MCSClient::Destroy() {
255 rmq_store_->Destroy(base::Bind(&MCSClient::OnRMQUpdateFinished, 254 gcm_store_->Destroy(base::Bind(&MCSClient::OnGCMUpdateFinished,
256 weak_ptr_factory_.GetWeakPtr())); 255 weak_ptr_factory_.GetWeakPtr()));
257 } 256 }
258 257
259 void MCSClient::ResetStateAndBuildLoginRequest( 258 void MCSClient::ResetStateAndBuildLoginRequest(
260 mcs_proto::LoginRequest* request) { 259 mcs_proto::LoginRequest* request) {
261 DCHECK(android_id_); 260 DCHECK(android_id_);
262 DCHECK(security_token_); 261 DCHECK(security_token_);
263 stream_id_in_ = 0; 262 stream_id_in_ = 0;
264 stream_id_out_ = 1; 263 stream_id_out_ = 1;
265 last_device_to_server_stream_id_received_ = 0; 264 last_device_to_server_stream_id_received_ = 0;
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after
318 // message from the persistent store. 317 // message from the persistent store.
319 if (!packet->persistent_id.empty()) 318 if (!packet->persistent_id.empty())
320 expired_ttl_ids.push_back(packet->persistent_id); 319 expired_ttl_ids.push_back(packet->persistent_id);
321 message_sent_callback_.Run("TTL expired"); 320 message_sent_callback_.Run("TTL expired");
322 } 321 }
323 } 322 }
324 323
325 if (!expired_ttl_ids.empty()) { 324 if (!expired_ttl_ids.empty()) {
326 DVLOG(1) << "Connection reset, " << expired_ttl_ids.size() 325 DVLOG(1) << "Connection reset, " << expired_ttl_ids.size()
327 << " messages expired."; 326 << " messages expired.";
328 rmq_store_->RemoveOutgoingMessages( 327 gcm_store_->RemoveOutgoingMessages(
329 expired_ttl_ids, 328 expired_ttl_ids,
330 base::Bind(&MCSClient::OnRMQUpdateFinished, 329 base::Bind(&MCSClient::OnGCMUpdateFinished,
331 weak_ptr_factory_.GetWeakPtr())); 330 weak_ptr_factory_.GetWeakPtr()));
332 } 331 }
333 332
334 to_send_.swap(new_to_send); 333 to_send_.swap(new_to_send);
335 334
336 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size() 335 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size()
337 << " incoming acks pending, and " << to_send_.size() 336 << " incoming acks pending, and " << to_send_.size()
338 << " pending outgoing messages."; 337 << " pending outgoing messages.";
339 338
340 state_ = CONNECTING; 339 state_ = CONNECTING;
341 } 340 }
342 341
343 void MCSClient::SendHeartbeat() { 342 void MCSClient::SendHeartbeat() {
344 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing())); 343 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()));
345 } 344 }
346 345
347 void MCSClient::OnRMQUpdateFinished(bool success) { 346 void MCSClient::OnGCMUpdateFinished(bool success) {
348 LOG_IF(ERROR, !success) << "RMQ Update failed!"; 347 LOG_IF(ERROR, !success) << "GCM Update failed!";
349 // TODO(zea): Rebuild the store from scratch in case of persistence failure? 348 // TODO(zea): Rebuild the store from scratch in case of persistence failure?
350 } 349 }
351 350
352 void MCSClient::MaybeSendMessage() { 351 void MCSClient::MaybeSendMessage() {
353 if (to_send_.empty()) 352 if (to_send_.empty())
354 return; 353 return;
355 354
356 // If the connection has been reset, do nothing. On reconnection 355 // If the connection has been reset, do nothing. On reconnection
357 // MaybeSendMessage will be automatically invoked again. 356 // MaybeSendMessage will be automatically invoked again.
358 // TODO(zea): consider doing TTL expiration at connection reset time, rather 357 // TODO(zea): consider doing TTL expiration at connection reset time, rather
359 // than reconnect time. 358 // than reconnect time.
360 if (!connection_factory_->IsEndpointReachable()) 359 if (!connection_factory_->IsEndpointReachable())
361 return; 360 return;
362 361
363 MCSPacketInternal packet = to_send_.front(); 362 MCSPacketInternal packet = to_send_.front();
364 to_send_.pop_front(); 363 to_send_.pop_front();
365 if (HasTTLExpired(*packet->protobuf, clock_)) { 364 if (HasTTLExpired(*packet->protobuf, clock_)) {
366 DCHECK(!packet->persistent_id.empty()); 365 DCHECK(!packet->persistent_id.empty());
367 DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; 366 DVLOG(1) << "Dropping expired message " << packet->persistent_id << ".";
368 message_sent_callback_.Run("TTL expired for " + packet->persistent_id); 367 message_sent_callback_.Run("TTL expired for " + packet->persistent_id);
369 rmq_store_->RemoveOutgoingMessage( 368 gcm_store_->RemoveOutgoingMessage(
370 packet->persistent_id, 369 packet->persistent_id,
371 base::Bind(&MCSClient::OnRMQUpdateFinished, 370 base::Bind(&MCSClient::OnGCMUpdateFinished,
372 weak_ptr_factory_.GetWeakPtr())); 371 weak_ptr_factory_.GetWeakPtr()));
373 base::MessageLoop::current()->PostTask( 372 base::MessageLoop::current()->PostTask(
374 FROM_HERE, 373 FROM_HERE,
375 base::Bind(&MCSClient::MaybeSendMessage, 374 base::Bind(&MCSClient::MaybeSendMessage,
376 weak_ptr_factory_.GetWeakPtr())); 375 weak_ptr_factory_.GetWeakPtr()));
377 return; 376 return;
378 } 377 }
379 DVLOG(1) << "Pending output message found, sending."; 378 DVLOG(1) << "Pending output message found, sending.";
380 if (!packet->persistent_id.empty()) 379 if (!packet->persistent_id.empty())
381 to_resend_.push_back(packet); 380 to_resend_.push_back(packet);
(...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after
472 } 471 }
473 for (StreamIdList::iterator iter = acked_stream_ids_to_remove.begin(); 472 for (StreamIdList::iterator iter = acked_stream_ids_to_remove.begin();
474 iter != acked_stream_ids_to_remove.end(); ++iter) { 473 iter != acked_stream_ids_to_remove.end(); ++iter) {
475 acked_server_ids_.erase(*iter); 474 acked_server_ids_.erase(*iter);
476 } 475 }
477 } 476 }
478 477
479 ++stream_id_in_; 478 ++stream_id_in_;
480 if (!persistent_id.empty()) { 479 if (!persistent_id.empty()) {
481 unacked_server_ids_[stream_id_in_] = persistent_id; 480 unacked_server_ids_[stream_id_in_] = persistent_id;
482 rmq_store_->AddIncomingMessage(persistent_id, 481 gcm_store_->AddIncomingMessage(persistent_id,
483 base::Bind(&MCSClient::OnRMQUpdateFinished, 482 base::Bind(&MCSClient::OnGCMUpdateFinished,
484 weak_ptr_factory_.GetWeakPtr())); 483 weak_ptr_factory_.GetWeakPtr()));
485 } 484 }
486 485
487 DVLOG(1) << "Received message of type " << protobuf->GetTypeName() 486 DVLOG(1) << "Received message of type " << protobuf->GetTypeName()
488 << " with persistent id " 487 << " with persistent id "
489 << (persistent_id.empty() ? "NULL" : persistent_id) 488 << (persistent_id.empty() ? "NULL" : persistent_id)
490 << ", stream id " << stream_id_in_ << " and last stream id received " 489 << ", stream id " << stream_id_in_ << " and last stream id received "
491 << last_stream_id_received; 490 << last_stream_id_received;
492 491
493 if (unacked_server_ids_.size() > 0 && 492 if (unacked_server_ids_.size() > 0 &&
(...skipping 126 matching lines...) Expand 10 before | Expand all | Expand 10 after
620 to_resend_.front()->stream_id <= last_stream_id_received) { 619 to_resend_.front()->stream_id <= last_stream_id_received) {
621 const MCSPacketInternal& outgoing_packet = to_resend_.front(); 620 const MCSPacketInternal& outgoing_packet = to_resend_.front();
622 acked_outgoing_persistent_ids.push_back(outgoing_packet->persistent_id); 621 acked_outgoing_persistent_ids.push_back(outgoing_packet->persistent_id);
623 acked_outgoing_stream_ids.push_back(outgoing_packet->stream_id); 622 acked_outgoing_stream_ids.push_back(outgoing_packet->stream_id);
624 to_resend_.pop_front(); 623 to_resend_.pop_front();
625 } 624 }
626 625
627 DVLOG(1) << "Server acked " << acked_outgoing_persistent_ids.size() 626 DVLOG(1) << "Server acked " << acked_outgoing_persistent_ids.size()
628 << " outgoing messages, " << to_resend_.size() 627 << " outgoing messages, " << to_resend_.size()
629 << " remaining unacked"; 628 << " remaining unacked";
630 rmq_store_->RemoveOutgoingMessages( 629 gcm_store_->RemoveOutgoingMessages(
631 acked_outgoing_persistent_ids, 630 acked_outgoing_persistent_ids,
632 base::Bind(&MCSClient::OnRMQUpdateFinished, 631 base::Bind(&MCSClient::OnGCMUpdateFinished,
633 weak_ptr_factory_.GetWeakPtr())); 632 weak_ptr_factory_.GetWeakPtr()));
634 633
635 HandleServerConfirmedReceipt(last_stream_id_received); 634 HandleServerConfirmedReceipt(last_stream_id_received);
636 } 635 }
637 636
638 void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) { 637 void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) {
639 // First check the to_resend_ queue. Acknowledgments should always happen 638 // First check the to_resend_ queue. Acknowledgments should always happen
640 // in the order they were sent, so if messages are present they should match 639 // in the order they were sent, so if messages are present they should match
641 // the acknowledge list. 640 // the acknowledge list.
642 PersistentIdList::const_iterator iter = id_list.begin(); 641 PersistentIdList::const_iterator iter = id_list.begin();
(...skipping 21 matching lines...) Expand all
664 StreamId device_stream_id = outgoing_packet->stream_id; 663 StreamId device_stream_id = outgoing_packet->stream_id;
665 HandleServerConfirmedReceipt(device_stream_id); 664 HandleServerConfirmedReceipt(device_stream_id);
666 665
667 to_send_.pop_front(); 666 to_send_.pop_front();
668 } 667 }
669 668
670 DCHECK(iter == id_list.end()); 669 DCHECK(iter == id_list.end());
671 670
672 DVLOG(1) << "Server acked " << id_list.size() 671 DVLOG(1) << "Server acked " << id_list.size()
673 << " messages, " << to_resend_.size() << " remaining unacked."; 672 << " messages, " << to_resend_.size() << " remaining unacked.";
674 rmq_store_->RemoveOutgoingMessages( 673 gcm_store_->RemoveOutgoingMessages(
675 id_list, 674 id_list,
676 base::Bind(&MCSClient::OnRMQUpdateFinished, 675 base::Bind(&MCSClient::OnGCMUpdateFinished,
677 weak_ptr_factory_.GetWeakPtr())); 676 weak_ptr_factory_.GetWeakPtr()));
678 677
679 // Resend any remaining outgoing messages, as they were not received by the 678 // Resend any remaining outgoing messages, as they were not received by the
680 // server. 679 // server.
681 DVLOG(1) << "Resending " << to_resend_.size() << " messages."; 680 DVLOG(1) << "Resending " << to_resend_.size() << " messages.";
682 while (!to_resend_.empty()) { 681 while (!to_resend_.empty()) {
683 to_send_.push_front(to_resend_.back()); 682 to_send_.push_front(to_resend_.back());
684 to_resend_.pop_back(); 683 to_resend_.pop_back();
685 } 684 }
686 } 685 }
(...skipping 11 matching lines...) Expand all
698 iter != acked_server_ids_.end() && 697 iter != acked_server_ids_.end() &&
699 iter->first <= device_stream_id;) { 698 iter->first <= device_stream_id;) {
700 acked_incoming_ids.insert(acked_incoming_ids.end(), 699 acked_incoming_ids.insert(acked_incoming_ids.end(),
701 iter->second.begin(), 700 iter->second.begin(),
702 iter->second.end()); 701 iter->second.end());
703 acked_server_ids_.erase(iter++); 702 acked_server_ids_.erase(iter++);
704 } 703 }
705 704
706 DVLOG(1) << "Server confirmed receipt of " << acked_incoming_ids.size() 705 DVLOG(1) << "Server confirmed receipt of " << acked_incoming_ids.size()
707 << " acknowledged server messages."; 706 << " acknowledged server messages.";
708 rmq_store_->RemoveIncomingMessages( 707 gcm_store_->RemoveIncomingMessages(
709 acked_incoming_ids, 708 acked_incoming_ids,
710 base::Bind(&MCSClient::OnRMQUpdateFinished, 709 base::Bind(&MCSClient::OnGCMUpdateFinished,
711 weak_ptr_factory_.GetWeakPtr())); 710 weak_ptr_factory_.GetWeakPtr()));
712 } 711 }
713 712
714 MCSClient::PersistentId MCSClient::GetNextPersistentId() { 713 MCSClient::PersistentId MCSClient::GetNextPersistentId() {
715 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue()); 714 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue());
716 } 715 }
717 716
718 void MCSClient::OnConnectionResetByHeartbeat() { 717 void MCSClient::OnConnectionResetByHeartbeat() {
719 connection_factory_->SignalConnectionReset(); 718 connection_factory_->SignalConnectionReset();
720 } 719 }
721 720
722 } // namespace gcm 721 } // namespace gcm
OLDNEW
« no previous file with comments | « google_apis/gcm/engine/mcs_client.h ('k') | google_apis/gcm/engine/mcs_client_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698