| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "google_apis/gcm/engine/mcs_client.h" | 5 #include "google_apis/gcm/engine/mcs_client.h" |
| 6 | 6 |
| 7 #include "base/basictypes.h" | 7 #include "base/basictypes.h" |
| 8 #include "base/message_loop/message_loop.h" | 8 #include "base/message_loop/message_loop.h" |
| 9 #include "base/strings/string_number_conversions.h" | 9 #include "base/strings/string_number_conversions.h" |
| 10 #include "base/time/clock.h" | 10 #include "base/time/clock.h" |
| 11 #include "base/time/time.h" | 11 #include "base/time/time.h" |
| 12 #include "google_apis/gcm/base/mcs_util.h" | 12 #include "google_apis/gcm/base/mcs_util.h" |
| 13 #include "google_apis/gcm/base/socket_stream.h" | 13 #include "google_apis/gcm/base/socket_stream.h" |
| 14 #include "google_apis/gcm/engine/connection_factory.h" | 14 #include "google_apis/gcm/engine/connection_factory.h" |
| 15 #include "google_apis/gcm/engine/rmq_store.h" | |
| 16 | 15 |
| 17 using namespace google::protobuf::io; | 16 using namespace google::protobuf::io; |
| 18 | 17 |
| 19 namespace gcm { | 18 namespace gcm { |
| 20 | 19 |
| 21 namespace { | 20 namespace { |
| 22 | 21 |
| 23 typedef scoped_ptr<google::protobuf::MessageLite> MCSProto; | 22 typedef scoped_ptr<google::protobuf::MessageLite> MCSProto; |
| 24 | 23 |
| 25 // The category of messages intended for the GCM client itself from MCS. | 24 // The category of messages intended for the GCM client itself from MCS. |
| (...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 80 MCSProto protobuf; | 79 MCSProto protobuf; |
| 81 }; | 80 }; |
| 82 | 81 |
| 83 ReliablePacketInfo::ReliablePacketInfo() | 82 ReliablePacketInfo::ReliablePacketInfo() |
| 84 : stream_id(0), tag(0) { | 83 : stream_id(0), tag(0) { |
| 85 } | 84 } |
| 86 ReliablePacketInfo::~ReliablePacketInfo() {} | 85 ReliablePacketInfo::~ReliablePacketInfo() {} |
| 87 | 86 |
| 88 MCSClient::MCSClient(base::Clock* clock, | 87 MCSClient::MCSClient(base::Clock* clock, |
| 89 ConnectionFactory* connection_factory, | 88 ConnectionFactory* connection_factory, |
| 90 RMQStore* rmq_store) | 89 GCMStore* gcm_store) |
| 91 : clock_(clock), | 90 : clock_(clock), |
| 92 state_(UNINITIALIZED), | 91 state_(UNINITIALIZED), |
| 93 android_id_(0), | 92 android_id_(0), |
| 94 security_token_(0), | 93 security_token_(0), |
| 95 connection_factory_(connection_factory), | 94 connection_factory_(connection_factory), |
| 96 connection_handler_(NULL), | 95 connection_handler_(NULL), |
| 97 last_device_to_server_stream_id_received_(0), | 96 last_device_to_server_stream_id_received_(0), |
| 98 last_server_to_device_stream_id_received_(0), | 97 last_server_to_device_stream_id_received_(0), |
| 99 stream_id_out_(0), | 98 stream_id_out_(0), |
| 100 stream_id_in_(0), | 99 stream_id_in_(0), |
| 101 rmq_store_(rmq_store), | 100 gcm_store_(gcm_store), |
| 102 weak_ptr_factory_(this) { | 101 weak_ptr_factory_(this) { |
| 103 } | 102 } |
| 104 | 103 |
| 105 MCSClient::~MCSClient() { | 104 MCSClient::~MCSClient() { |
| 106 } | 105 } |
| 107 | 106 |
| 108 void MCSClient::Initialize( | 107 void MCSClient::Initialize( |
| 109 const InitializationCompleteCallback& initialization_callback, | 108 const InitializationCompleteCallback& initialization_callback, |
| 110 const OnMessageReceivedCallback& message_received_callback, | 109 const OnMessageReceivedCallback& message_received_callback, |
| 111 const OnMessageSentCallback& message_sent_callback, | 110 const OnMessageSentCallback& message_sent_callback, |
| 112 const RMQStore::LoadResult& load_result) { | 111 const GCMStore::LoadResult& load_result) { |
| 113 DCHECK_EQ(state_, UNINITIALIZED); | 112 DCHECK_EQ(state_, UNINITIALIZED); |
| 114 initialization_callback_ = initialization_callback; | 113 initialization_callback_ = initialization_callback; |
| 115 message_received_callback_ = message_received_callback; | 114 message_received_callback_ = message_received_callback; |
| 116 message_sent_callback_ = message_sent_callback; | 115 message_sent_callback_ = message_sent_callback; |
| 117 | 116 |
| 118 connection_factory_->Initialize( | 117 connection_factory_->Initialize( |
| 119 base::Bind(&MCSClient::ResetStateAndBuildLoginRequest, | 118 base::Bind(&MCSClient::ResetStateAndBuildLoginRequest, |
| 120 weak_ptr_factory_.GetWeakPtr()), | 119 weak_ptr_factory_.GetWeakPtr()), |
| 121 base::Bind(&MCSClient::HandlePacketFromWire, | 120 base::Bind(&MCSClient::HandlePacketFromWire, |
| 122 weak_ptr_factory_.GetWeakPtr()), | 121 weak_ptr_factory_.GetWeakPtr()), |
| (...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 169 expired_ttl_ids.push_back(iter->first); | 168 expired_ttl_ids.push_back(iter->first); |
| 170 message_sent_callback_.Run("TTL expired for " + iter->first); | 169 message_sent_callback_.Run("TTL expired for " + iter->first); |
| 171 delete iter->second; | 170 delete iter->second; |
| 172 continue; | 171 continue; |
| 173 } | 172 } |
| 174 | 173 |
| 175 ordered_messages[timestamp] = iter->second; | 174 ordered_messages[timestamp] = iter->second; |
| 176 } | 175 } |
| 177 | 176 |
| 178 if (!expired_ttl_ids.empty()) { | 177 if (!expired_ttl_ids.empty()) { |
| 179 rmq_store_->RemoveOutgoingMessages( | 178 gcm_store_->RemoveOutgoingMessages( |
| 180 expired_ttl_ids, | 179 expired_ttl_ids, |
| 181 base::Bind(&MCSClient::OnRMQUpdateFinished, | 180 base::Bind(&MCSClient::OnGCMUpdateFinished, |
| 182 weak_ptr_factory_.GetWeakPtr())); | 181 weak_ptr_factory_.GetWeakPtr())); |
| 183 } | 182 } |
| 184 | 183 |
| 185 // Now go through and add the outgoing messages to the send queue in their | 184 // Now go through and add the outgoing messages to the send queue in their |
| 186 // appropriate order (oldest at front, most recent at back). | 185 // appropriate order (oldest at front, most recent at back). |
| 187 for (std::map<uint64, google::protobuf::MessageLite*>::const_iterator | 186 for (std::map<uint64, google::protobuf::MessageLite*>::const_iterator |
| 188 iter = ordered_messages.begin(); | 187 iter = ordered_messages.begin(); |
| 189 iter != ordered_messages.end(); ++iter) { | 188 iter != ordered_messages.end(); ++iter) { |
| 190 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); | 189 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); |
| 191 packet_info->protobuf.reset(iter->second); | 190 packet_info->protobuf.reset(iter->second); |
| 192 packet_info->persistent_id = base::Uint64ToString(iter->first); | 191 packet_info->persistent_id = base::Uint64ToString(iter->first); |
| 193 to_send_.push_back(make_linked_ptr(packet_info)); | 192 to_send_.push_back(make_linked_ptr(packet_info)); |
| 194 } | 193 } |
| 195 | 194 |
| 196 // TODO(fgorski): that is likely the only place where the initialization | 195 // TODO(fgorski): that is likely the only place where the initialization |
| 197 // callback could be used. | 196 // callback could be used. |
| 198 initialization_callback_.Run(true, android_id_, security_token_); | 197 initialization_callback_.Run(true, android_id_, security_token_); |
| 199 } | 198 } |
| 200 | 199 |
| 201 void MCSClient::Login(uint64 android_id, uint64 security_token) { | 200 void MCSClient::Login(uint64 android_id, uint64 security_token) { |
| 202 if (android_id != android_id_ && security_token != security_token_) { | 201 if (android_id != android_id_ && security_token != security_token_) { |
| 203 DCHECK(android_id); | 202 DCHECK(android_id); |
| 204 DCHECK(security_token); | 203 DCHECK(security_token); |
| 205 DCHECK(restored_unackeds_server_ids_.empty()); | 204 DCHECK(restored_unackeds_server_ids_.empty()); |
| 206 android_id_ = android_id; | 205 android_id_ = android_id; |
| 207 security_token_ = security_token; | 206 security_token_ = security_token; |
| 208 rmq_store_->SetDeviceCredentials( | 207 gcm_store_->SetDeviceCredentials( |
| 209 android_id_, | 208 android_id_, |
| 210 security_token_, | 209 security_token_, |
| 211 base::Bind(&MCSClient::OnRMQUpdateFinished, | 210 base::Bind(&MCSClient::OnGCMUpdateFinished, |
| 212 weak_ptr_factory_.GetWeakPtr())); | 211 weak_ptr_factory_.GetWeakPtr())); |
| 213 } | 212 } |
| 214 | 213 |
| 215 state_ = CONNECTING; | 214 state_ = CONNECTING; |
| 216 connection_factory_->Connect(); | 215 connection_factory_->Connect(); |
| 217 } | 216 } |
| 218 | 217 |
| 219 void MCSClient::SendMessage(const MCSMessage& message) { | 218 void MCSClient::SendMessage(const MCSMessage& message) { |
| 220 int ttl = GetTTL(message.GetProtobuf()); | 219 int ttl = GetTTL(message.GetProtobuf()); |
| 221 DCHECK_GE(ttl, 0); | 220 DCHECK_GE(ttl, 0); |
| 222 if (to_send_.size() > kMaxSendQueueSize) { | 221 if (to_send_.size() > kMaxSendQueueSize) { |
| 223 message_sent_callback_.Run("Message queue full."); | 222 message_sent_callback_.Run("Message queue full."); |
| 224 return; | 223 return; |
| 225 } | 224 } |
| 226 if (message.size() > kMaxMessageBytes) { | 225 if (message.size() > kMaxMessageBytes) { |
| 227 message_sent_callback_.Run("Message too large."); | 226 message_sent_callback_.Run("Message too large."); |
| 228 return; | 227 return; |
| 229 } | 228 } |
| 230 | 229 |
| 231 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); | 230 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); |
| 232 packet_info->protobuf = message.CloneProtobuf(); | 231 packet_info->protobuf = message.CloneProtobuf(); |
| 233 | 232 |
| 234 if (ttl > 0) { | 233 if (ttl > 0) { |
| 235 PersistentId persistent_id = GetNextPersistentId(); | 234 PersistentId persistent_id = GetNextPersistentId(); |
| 236 DVLOG(1) << "Setting persistent id to " << persistent_id; | 235 DVLOG(1) << "Setting persistent id to " << persistent_id; |
| 237 packet_info->persistent_id = persistent_id; | 236 packet_info->persistent_id = persistent_id; |
| 238 SetPersistentId(persistent_id, | 237 SetPersistentId(persistent_id, |
| 239 packet_info->protobuf.get()); | 238 packet_info->protobuf.get()); |
| 240 rmq_store_->AddOutgoingMessage(persistent_id, | 239 gcm_store_->AddOutgoingMessage(persistent_id, |
| 241 MCSMessage(message.tag(), | 240 MCSMessage(message.tag(), |
| 242 *(packet_info->protobuf)), | 241 *(packet_info->protobuf)), |
| 243 base::Bind(&MCSClient::OnRMQUpdateFinished, | 242 base::Bind(&MCSClient::OnGCMUpdateFinished, |
| 244 weak_ptr_factory_.GetWeakPtr())); | 243 weak_ptr_factory_.GetWeakPtr())); |
| 245 } else if (!connection_factory_->IsEndpointReachable()) { | 244 } else if (!connection_factory_->IsEndpointReachable()) { |
| 246 DVLOG(1) << "No active connection, dropping message."; | 245 DVLOG(1) << "No active connection, dropping message."; |
| 247 message_sent_callback_.Run("TTL expired"); | 246 message_sent_callback_.Run("TTL expired"); |
| 248 return; | 247 return; |
| 249 } | 248 } |
| 250 to_send_.push_back(make_linked_ptr(packet_info)); | 249 to_send_.push_back(make_linked_ptr(packet_info)); |
| 251 MaybeSendMessage(); | 250 MaybeSendMessage(); |
| 252 } | 251 } |
| 253 | 252 |
| 254 void MCSClient::Destroy() { | 253 void MCSClient::Destroy() { |
| 255 rmq_store_->Destroy(base::Bind(&MCSClient::OnRMQUpdateFinished, | 254 gcm_store_->Destroy(base::Bind(&MCSClient::OnGCMUpdateFinished, |
| 256 weak_ptr_factory_.GetWeakPtr())); | 255 weak_ptr_factory_.GetWeakPtr())); |
| 257 } | 256 } |
| 258 | 257 |
| 259 void MCSClient::ResetStateAndBuildLoginRequest( | 258 void MCSClient::ResetStateAndBuildLoginRequest( |
| 260 mcs_proto::LoginRequest* request) { | 259 mcs_proto::LoginRequest* request) { |
| 261 DCHECK(android_id_); | 260 DCHECK(android_id_); |
| 262 DCHECK(security_token_); | 261 DCHECK(security_token_); |
| 263 stream_id_in_ = 0; | 262 stream_id_in_ = 0; |
| 264 stream_id_out_ = 1; | 263 stream_id_out_ = 1; |
| 265 last_device_to_server_stream_id_received_ = 0; | 264 last_device_to_server_stream_id_received_ = 0; |
| (...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 318 // message from the persistent store. | 317 // message from the persistent store. |
| 319 if (!packet->persistent_id.empty()) | 318 if (!packet->persistent_id.empty()) |
| 320 expired_ttl_ids.push_back(packet->persistent_id); | 319 expired_ttl_ids.push_back(packet->persistent_id); |
| 321 message_sent_callback_.Run("TTL expired"); | 320 message_sent_callback_.Run("TTL expired"); |
| 322 } | 321 } |
| 323 } | 322 } |
| 324 | 323 |
| 325 if (!expired_ttl_ids.empty()) { | 324 if (!expired_ttl_ids.empty()) { |
| 326 DVLOG(1) << "Connection reset, " << expired_ttl_ids.size() | 325 DVLOG(1) << "Connection reset, " << expired_ttl_ids.size() |
| 327 << " messages expired."; | 326 << " messages expired."; |
| 328 rmq_store_->RemoveOutgoingMessages( | 327 gcm_store_->RemoveOutgoingMessages( |
| 329 expired_ttl_ids, | 328 expired_ttl_ids, |
| 330 base::Bind(&MCSClient::OnRMQUpdateFinished, | 329 base::Bind(&MCSClient::OnGCMUpdateFinished, |
| 331 weak_ptr_factory_.GetWeakPtr())); | 330 weak_ptr_factory_.GetWeakPtr())); |
| 332 } | 331 } |
| 333 | 332 |
| 334 to_send_.swap(new_to_send); | 333 to_send_.swap(new_to_send); |
| 335 | 334 |
| 336 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size() | 335 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size() |
| 337 << " incoming acks pending, and " << to_send_.size() | 336 << " incoming acks pending, and " << to_send_.size() |
| 338 << " pending outgoing messages."; | 337 << " pending outgoing messages."; |
| 339 | 338 |
| 340 state_ = CONNECTING; | 339 state_ = CONNECTING; |
| 341 } | 340 } |
| 342 | 341 |
| 343 void MCSClient::SendHeartbeat() { | 342 void MCSClient::SendHeartbeat() { |
| 344 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing())); | 343 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing())); |
| 345 } | 344 } |
| 346 | 345 |
| 347 void MCSClient::OnRMQUpdateFinished(bool success) { | 346 void MCSClient::OnGCMUpdateFinished(bool success) { |
| 348 LOG_IF(ERROR, !success) << "RMQ Update failed!"; | 347 LOG_IF(ERROR, !success) << "GCM Update failed!"; |
| 349 // TODO(zea): Rebuild the store from scratch in case of persistence failure? | 348 // TODO(zea): Rebuild the store from scratch in case of persistence failure? |
| 350 } | 349 } |
| 351 | 350 |
| 352 void MCSClient::MaybeSendMessage() { | 351 void MCSClient::MaybeSendMessage() { |
| 353 if (to_send_.empty()) | 352 if (to_send_.empty()) |
| 354 return; | 353 return; |
| 355 | 354 |
| 356 // If the connection has been reset, do nothing. On reconnection | 355 // If the connection has been reset, do nothing. On reconnection |
| 357 // MaybeSendMessage will be automatically invoked again. | 356 // MaybeSendMessage will be automatically invoked again. |
| 358 // TODO(zea): consider doing TTL expiration at connection reset time, rather | 357 // TODO(zea): consider doing TTL expiration at connection reset time, rather |
| 359 // than reconnect time. | 358 // than reconnect time. |
| 360 if (!connection_factory_->IsEndpointReachable()) | 359 if (!connection_factory_->IsEndpointReachable()) |
| 361 return; | 360 return; |
| 362 | 361 |
| 363 MCSPacketInternal packet = to_send_.front(); | 362 MCSPacketInternal packet = to_send_.front(); |
| 364 to_send_.pop_front(); | 363 to_send_.pop_front(); |
| 365 if (HasTTLExpired(*packet->protobuf, clock_)) { | 364 if (HasTTLExpired(*packet->protobuf, clock_)) { |
| 366 DCHECK(!packet->persistent_id.empty()); | 365 DCHECK(!packet->persistent_id.empty()); |
| 367 DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; | 366 DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; |
| 368 message_sent_callback_.Run("TTL expired for " + packet->persistent_id); | 367 message_sent_callback_.Run("TTL expired for " + packet->persistent_id); |
| 369 rmq_store_->RemoveOutgoingMessage( | 368 gcm_store_->RemoveOutgoingMessage( |
| 370 packet->persistent_id, | 369 packet->persistent_id, |
| 371 base::Bind(&MCSClient::OnRMQUpdateFinished, | 370 base::Bind(&MCSClient::OnGCMUpdateFinished, |
| 372 weak_ptr_factory_.GetWeakPtr())); | 371 weak_ptr_factory_.GetWeakPtr())); |
| 373 base::MessageLoop::current()->PostTask( | 372 base::MessageLoop::current()->PostTask( |
| 374 FROM_HERE, | 373 FROM_HERE, |
| 375 base::Bind(&MCSClient::MaybeSendMessage, | 374 base::Bind(&MCSClient::MaybeSendMessage, |
| 376 weak_ptr_factory_.GetWeakPtr())); | 375 weak_ptr_factory_.GetWeakPtr())); |
| 377 return; | 376 return; |
| 378 } | 377 } |
| 379 DVLOG(1) << "Pending output message found, sending."; | 378 DVLOG(1) << "Pending output message found, sending."; |
| 380 if (!packet->persistent_id.empty()) | 379 if (!packet->persistent_id.empty()) |
| 381 to_resend_.push_back(packet); | 380 to_resend_.push_back(packet); |
| (...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 472 } | 471 } |
| 473 for (StreamIdList::iterator iter = acked_stream_ids_to_remove.begin(); | 472 for (StreamIdList::iterator iter = acked_stream_ids_to_remove.begin(); |
| 474 iter != acked_stream_ids_to_remove.end(); ++iter) { | 473 iter != acked_stream_ids_to_remove.end(); ++iter) { |
| 475 acked_server_ids_.erase(*iter); | 474 acked_server_ids_.erase(*iter); |
| 476 } | 475 } |
| 477 } | 476 } |
| 478 | 477 |
| 479 ++stream_id_in_; | 478 ++stream_id_in_; |
| 480 if (!persistent_id.empty()) { | 479 if (!persistent_id.empty()) { |
| 481 unacked_server_ids_[stream_id_in_] = persistent_id; | 480 unacked_server_ids_[stream_id_in_] = persistent_id; |
| 482 rmq_store_->AddIncomingMessage(persistent_id, | 481 gcm_store_->AddIncomingMessage(persistent_id, |
| 483 base::Bind(&MCSClient::OnRMQUpdateFinished, | 482 base::Bind(&MCSClient::OnGCMUpdateFinished, |
| 484 weak_ptr_factory_.GetWeakPtr())); | 483 weak_ptr_factory_.GetWeakPtr())); |
| 485 } | 484 } |
| 486 | 485 |
| 487 DVLOG(1) << "Received message of type " << protobuf->GetTypeName() | 486 DVLOG(1) << "Received message of type " << protobuf->GetTypeName() |
| 488 << " with persistent id " | 487 << " with persistent id " |
| 489 << (persistent_id.empty() ? "NULL" : persistent_id) | 488 << (persistent_id.empty() ? "NULL" : persistent_id) |
| 490 << ", stream id " << stream_id_in_ << " and last stream id received " | 489 << ", stream id " << stream_id_in_ << " and last stream id received " |
| 491 << last_stream_id_received; | 490 << last_stream_id_received; |
| 492 | 491 |
| 493 if (unacked_server_ids_.size() > 0 && | 492 if (unacked_server_ids_.size() > 0 && |
| (...skipping 126 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 620 to_resend_.front()->stream_id <= last_stream_id_received) { | 619 to_resend_.front()->stream_id <= last_stream_id_received) { |
| 621 const MCSPacketInternal& outgoing_packet = to_resend_.front(); | 620 const MCSPacketInternal& outgoing_packet = to_resend_.front(); |
| 622 acked_outgoing_persistent_ids.push_back(outgoing_packet->persistent_id); | 621 acked_outgoing_persistent_ids.push_back(outgoing_packet->persistent_id); |
| 623 acked_outgoing_stream_ids.push_back(outgoing_packet->stream_id); | 622 acked_outgoing_stream_ids.push_back(outgoing_packet->stream_id); |
| 624 to_resend_.pop_front(); | 623 to_resend_.pop_front(); |
| 625 } | 624 } |
| 626 | 625 |
| 627 DVLOG(1) << "Server acked " << acked_outgoing_persistent_ids.size() | 626 DVLOG(1) << "Server acked " << acked_outgoing_persistent_ids.size() |
| 628 << " outgoing messages, " << to_resend_.size() | 627 << " outgoing messages, " << to_resend_.size() |
| 629 << " remaining unacked"; | 628 << " remaining unacked"; |
| 630 rmq_store_->RemoveOutgoingMessages( | 629 gcm_store_->RemoveOutgoingMessages( |
| 631 acked_outgoing_persistent_ids, | 630 acked_outgoing_persistent_ids, |
| 632 base::Bind(&MCSClient::OnRMQUpdateFinished, | 631 base::Bind(&MCSClient::OnGCMUpdateFinished, |
| 633 weak_ptr_factory_.GetWeakPtr())); | 632 weak_ptr_factory_.GetWeakPtr())); |
| 634 | 633 |
| 635 HandleServerConfirmedReceipt(last_stream_id_received); | 634 HandleServerConfirmedReceipt(last_stream_id_received); |
| 636 } | 635 } |
| 637 | 636 |
| 638 void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) { | 637 void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) { |
| 639 // First check the to_resend_ queue. Acknowledgments should always happen | 638 // First check the to_resend_ queue. Acknowledgments should always happen |
| 640 // in the order they were sent, so if messages are present they should match | 639 // in the order they were sent, so if messages are present they should match |
| 641 // the acknowledge list. | 640 // the acknowledge list. |
| 642 PersistentIdList::const_iterator iter = id_list.begin(); | 641 PersistentIdList::const_iterator iter = id_list.begin(); |
| (...skipping 21 matching lines...) Expand all Loading... |
| 664 StreamId device_stream_id = outgoing_packet->stream_id; | 663 StreamId device_stream_id = outgoing_packet->stream_id; |
| 665 HandleServerConfirmedReceipt(device_stream_id); | 664 HandleServerConfirmedReceipt(device_stream_id); |
| 666 | 665 |
| 667 to_send_.pop_front(); | 666 to_send_.pop_front(); |
| 668 } | 667 } |
| 669 | 668 |
| 670 DCHECK(iter == id_list.end()); | 669 DCHECK(iter == id_list.end()); |
| 671 | 670 |
| 672 DVLOG(1) << "Server acked " << id_list.size() | 671 DVLOG(1) << "Server acked " << id_list.size() |
| 673 << " messages, " << to_resend_.size() << " remaining unacked."; | 672 << " messages, " << to_resend_.size() << " remaining unacked."; |
| 674 rmq_store_->RemoveOutgoingMessages( | 673 gcm_store_->RemoveOutgoingMessages( |
| 675 id_list, | 674 id_list, |
| 676 base::Bind(&MCSClient::OnRMQUpdateFinished, | 675 base::Bind(&MCSClient::OnGCMUpdateFinished, |
| 677 weak_ptr_factory_.GetWeakPtr())); | 676 weak_ptr_factory_.GetWeakPtr())); |
| 678 | 677 |
| 679 // Resend any remaining outgoing messages, as they were not received by the | 678 // Resend any remaining outgoing messages, as they were not received by the |
| 680 // server. | 679 // server. |
| 681 DVLOG(1) << "Resending " << to_resend_.size() << " messages."; | 680 DVLOG(1) << "Resending " << to_resend_.size() << " messages."; |
| 682 while (!to_resend_.empty()) { | 681 while (!to_resend_.empty()) { |
| 683 to_send_.push_front(to_resend_.back()); | 682 to_send_.push_front(to_resend_.back()); |
| 684 to_resend_.pop_back(); | 683 to_resend_.pop_back(); |
| 685 } | 684 } |
| 686 } | 685 } |
| (...skipping 11 matching lines...) Expand all Loading... |
| 698 iter != acked_server_ids_.end() && | 697 iter != acked_server_ids_.end() && |
| 699 iter->first <= device_stream_id;) { | 698 iter->first <= device_stream_id;) { |
| 700 acked_incoming_ids.insert(acked_incoming_ids.end(), | 699 acked_incoming_ids.insert(acked_incoming_ids.end(), |
| 701 iter->second.begin(), | 700 iter->second.begin(), |
| 702 iter->second.end()); | 701 iter->second.end()); |
| 703 acked_server_ids_.erase(iter++); | 702 acked_server_ids_.erase(iter++); |
| 704 } | 703 } |
| 705 | 704 |
| 706 DVLOG(1) << "Server confirmed receipt of " << acked_incoming_ids.size() | 705 DVLOG(1) << "Server confirmed receipt of " << acked_incoming_ids.size() |
| 707 << " acknowledged server messages."; | 706 << " acknowledged server messages."; |
| 708 rmq_store_->RemoveIncomingMessages( | 707 gcm_store_->RemoveIncomingMessages( |
| 709 acked_incoming_ids, | 708 acked_incoming_ids, |
| 710 base::Bind(&MCSClient::OnRMQUpdateFinished, | 709 base::Bind(&MCSClient::OnGCMUpdateFinished, |
| 711 weak_ptr_factory_.GetWeakPtr())); | 710 weak_ptr_factory_.GetWeakPtr())); |
| 712 } | 711 } |
| 713 | 712 |
| 714 MCSClient::PersistentId MCSClient::GetNextPersistentId() { | 713 MCSClient::PersistentId MCSClient::GetNextPersistentId() { |
| 715 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue()); | 714 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue()); |
| 716 } | 715 } |
| 717 | 716 |
| 718 void MCSClient::OnConnectionResetByHeartbeat() { | 717 void MCSClient::OnConnectionResetByHeartbeat() { |
| 719 connection_factory_->SignalConnectionReset(); | 718 connection_factory_->SignalConnectionReset(); |
| 720 } | 719 } |
| 721 | 720 |
| 722 } // namespace gcm | 721 } // namespace gcm |
| OLD | NEW |