| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "google_apis/gcm/engine/mcs_client.h" | 5 #include "google_apis/gcm/engine/mcs_client.h" |
| 6 | 6 |
| 7 #include "base/basictypes.h" | 7 #include "base/basictypes.h" |
| 8 #include "base/message_loop/message_loop.h" | 8 #include "base/message_loop/message_loop.h" |
| 9 #include "base/strings/string_number_conversions.h" | 9 #include "base/strings/string_number_conversions.h" |
| 10 #include "google_apis/gcm/base/mcs_util.h" | 10 #include "google_apis/gcm/base/mcs_util.h" |
| 11 #include "google_apis/gcm/base/socket_stream.h" | 11 #include "google_apis/gcm/base/socket_stream.h" |
| 12 #include "google_apis/gcm/engine/connection_factory.h" | 12 #include "google_apis/gcm/engine/connection_factory.h" |
| 13 #include "google_apis/gcm/engine/rmq_store.h" | |
| 14 | 13 |
| 15 using namespace google::protobuf::io; | 14 using namespace google::protobuf::io; |
| 16 | 15 |
| 17 namespace gcm { | 16 namespace gcm { |
| 18 | 17 |
| 19 namespace { | 18 namespace { |
| 20 | 19 |
| 21 typedef scoped_ptr<google::protobuf::MessageLite> MCSProto; | 20 typedef scoped_ptr<google::protobuf::MessageLite> MCSProto; |
| 22 | 21 |
| 23 // TODO(zea): get these values from MCS settings. | 22 // TODO(zea): get these values from MCS settings. |
| (...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 79 | 78 |
| 80 // The protobuf of the message itself. | 79 // The protobuf of the message itself. |
| 81 MCSProto protobuf; | 80 MCSProto protobuf; |
| 82 }; | 81 }; |
| 83 | 82 |
| 84 ReliablePacketInfo::ReliablePacketInfo() | 83 ReliablePacketInfo::ReliablePacketInfo() |
| 85 : stream_id(0), tag(0) { | 84 : stream_id(0), tag(0) { |
| 86 } | 85 } |
| 87 ReliablePacketInfo::~ReliablePacketInfo() {} | 86 ReliablePacketInfo::~ReliablePacketInfo() {} |
| 88 | 87 |
| 89 MCSClient::MCSClient(ConnectionFactory* connection_factory, RMQStore* rmq_store) | 88 MCSClient::MCSClient(ConnectionFactory* connection_factory, GCMStore* gcm_store) |
| 90 : state_(UNINITIALIZED), | 89 : state_(UNINITIALIZED), |
| 91 android_id_(0), | 90 android_id_(0), |
| 92 security_token_(0), | 91 security_token_(0), |
| 93 connection_factory_(connection_factory), | 92 connection_factory_(connection_factory), |
| 94 connection_handler_(NULL), | 93 connection_handler_(NULL), |
| 95 last_device_to_server_stream_id_received_(0), | 94 last_device_to_server_stream_id_received_(0), |
| 96 last_server_to_device_stream_id_received_(0), | 95 last_server_to_device_stream_id_received_(0), |
| 97 stream_id_out_(0), | 96 stream_id_out_(0), |
| 98 stream_id_in_(0), | 97 stream_id_in_(0), |
| 99 rmq_store_(rmq_store), | 98 gcm_store_(gcm_store), |
| 100 heartbeat_interval_( | 99 heartbeat_interval_( |
| 101 base::TimeDelta::FromSeconds(kHeartbeatDefaultSeconds)), | 100 base::TimeDelta::FromSeconds(kHeartbeatDefaultSeconds)), |
| 102 heartbeat_timer_(true, true), | 101 heartbeat_timer_(true, true), |
| 103 weak_ptr_factory_(this) { | 102 weak_ptr_factory_(this) {} |
| 104 } | |
| 105 | 103 |
| 106 MCSClient::~MCSClient() { | 104 MCSClient::~MCSClient() { |
| 107 } | 105 } |
| 108 | 106 |
| 109 void MCSClient::Initialize( | 107 void MCSClient::Initialize( |
| 110 const InitializationCompleteCallback& initialization_callback, | 108 const InitializationCompleteCallback& initialization_callback, |
| 111 const OnMessageReceivedCallback& message_received_callback, | 109 const OnMessageReceivedCallback& message_received_callback, |
| 112 const OnMessageSentCallback& message_sent_callback, | 110 const OnMessageSentCallback& message_sent_callback, |
| 113 const RMQStore::LoadResult& load_result) { | 111 const GCMStore::LoadResult& load_result) { |
| 114 DCHECK_EQ(state_, UNINITIALIZED); | 112 DCHECK_EQ(state_, UNINITIALIZED); |
| 115 initialization_callback_ = initialization_callback; | 113 initialization_callback_ = initialization_callback; |
| 116 message_received_callback_ = message_received_callback; | 114 message_received_callback_ = message_received_callback; |
| 117 message_sent_callback_ = message_sent_callback; | 115 message_sent_callback_ = message_sent_callback; |
| 118 | 116 |
| 119 connection_factory_->Initialize( | 117 connection_factory_->Initialize( |
| 120 base::Bind(&MCSClient::ResetStateAndBuildLoginRequest, | 118 base::Bind(&MCSClient::ResetStateAndBuildLoginRequest, |
| 121 weak_ptr_factory_.GetWeakPtr()), | 119 weak_ptr_factory_.GetWeakPtr()), |
| 122 base::Bind(&MCSClient::HandlePacketFromWire, | 120 base::Bind(&MCSClient::HandlePacketFromWire, |
| 123 weak_ptr_factory_.GetWeakPtr()), | 121 weak_ptr_factory_.GetWeakPtr()), |
| (...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 183 } | 181 } |
| 184 | 182 |
| 185 void MCSClient::Login(uint64 android_id, uint64 security_token) { | 183 void MCSClient::Login(uint64 android_id, uint64 security_token) { |
| 186 DCHECK_EQ(state_, LOADED); | 184 DCHECK_EQ(state_, LOADED); |
| 187 if (android_id != android_id_ && security_token != security_token_) { | 185 if (android_id != android_id_ && security_token != security_token_) { |
| 188 DCHECK(android_id); | 186 DCHECK(android_id); |
| 189 DCHECK(security_token); | 187 DCHECK(security_token); |
| 190 DCHECK(restored_unackeds_server_ids_.empty()); | 188 DCHECK(restored_unackeds_server_ids_.empty()); |
| 191 android_id_ = android_id; | 189 android_id_ = android_id; |
| 192 security_token_ = security_token; | 190 security_token_ = security_token; |
| 193 rmq_store_->SetDeviceCredentials( | 191 gcm_store_->SetDeviceCredentials( |
| 194 android_id_, | 192 android_id_, |
| 195 security_token_, | 193 security_token_, |
| 196 base::Bind(&MCSClient::OnRMQUpdateFinished, | 194 base::Bind(&MCSClient::OnGCMUpdateFinished, |
| 197 weak_ptr_factory_.GetWeakPtr())); | 195 weak_ptr_factory_.GetWeakPtr())); |
| 198 } | 196 } |
| 199 | 197 |
| 200 state_ = CONNECTING; | 198 state_ = CONNECTING; |
| 201 connection_factory_->Connect(); | 199 connection_factory_->Connect(); |
| 202 } | 200 } |
| 203 | 201 |
| 204 void MCSClient::SendMessage(const MCSMessage& message, bool use_rmq) { | 202 void MCSClient::SendMessage(const MCSMessage& message, bool use_rmq) { |
| 205 DCHECK_EQ(state_, CONNECTED); | 203 DCHECK_EQ(state_, CONNECTED); |
| 206 if (to_send_.size() > kMaxSendQueueSize) { | 204 if (to_send_.size() > kMaxSendQueueSize) { |
| (...skipping 11 matching lines...) Expand all Loading... |
| 218 | 216 |
| 219 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); | 217 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); |
| 220 packet_info->protobuf = message.CloneProtobuf(); | 218 packet_info->protobuf = message.CloneProtobuf(); |
| 221 | 219 |
| 222 if (use_rmq) { | 220 if (use_rmq) { |
| 223 PersistentId persistent_id = GetNextPersistentId(); | 221 PersistentId persistent_id = GetNextPersistentId(); |
| 224 DVLOG(1) << "Setting persistent id to " << persistent_id; | 222 DVLOG(1) << "Setting persistent id to " << persistent_id; |
| 225 packet_info->persistent_id = persistent_id; | 223 packet_info->persistent_id = persistent_id; |
| 226 SetPersistentId(persistent_id, | 224 SetPersistentId(persistent_id, |
| 227 packet_info->protobuf.get()); | 225 packet_info->protobuf.get()); |
| 228 rmq_store_->AddOutgoingMessage(persistent_id, | 226 gcm_store_->AddOutgoingMessage( |
| 229 MCSMessage(message.tag(), | 227 persistent_id, |
| 230 *(packet_info->protobuf)), | 228 MCSMessage(message.tag(), *(packet_info->protobuf)), |
| 231 base::Bind(&MCSClient::OnRMQUpdateFinished, | 229 base::Bind(&MCSClient::OnGCMUpdateFinished, |
| 232 weak_ptr_factory_.GetWeakPtr())); | 230 weak_ptr_factory_.GetWeakPtr())); |
| 233 } else { | 231 } else { |
| 234 // Check that there is an active connection to the endpoint. | 232 // Check that there is an active connection to the endpoint. |
| 235 if (!connection_handler_->CanSendMessage()) { | 233 if (!connection_handler_->CanSendMessage()) { |
| 236 base::MessageLoop::current()->PostTask( | 234 base::MessageLoop::current()->PostTask( |
| 237 FROM_HERE, | 235 FROM_HERE, |
| 238 base::Bind(message_sent_callback_, "Unable to reach endpoint")); | 236 base::Bind(message_sent_callback_, "Unable to reach endpoint")); |
| 239 return; | 237 return; |
| 240 } | 238 } |
| 241 } | 239 } |
| 242 to_send_.push_back(make_linked_ptr(packet_info)); | 240 to_send_.push_back(make_linked_ptr(packet_info)); |
| 243 MaybeSendMessage(); | 241 MaybeSendMessage(); |
| 244 } | 242 } |
| 245 | 243 |
| 246 void MCSClient::Destroy() { | 244 void MCSClient::Destroy() { |
| 247 rmq_store_->Destroy(base::Bind(&MCSClient::OnRMQUpdateFinished, | 245 gcm_store_->Destroy(base::Bind(&MCSClient::OnGCMUpdateFinished, |
| 248 weak_ptr_factory_.GetWeakPtr())); | 246 weak_ptr_factory_.GetWeakPtr())); |
| 249 } | 247 } |
| 250 | 248 |
| 251 void MCSClient::ResetStateAndBuildLoginRequest( | 249 void MCSClient::ResetStateAndBuildLoginRequest( |
| 252 mcs_proto::LoginRequest* request) { | 250 mcs_proto::LoginRequest* request) { |
| 253 DCHECK(android_id_); | 251 DCHECK(android_id_); |
| 254 DCHECK(security_token_); | 252 DCHECK(security_token_); |
| 255 stream_id_in_ = 0; | 253 stream_id_in_ = 0; |
| 256 stream_id_out_ = 1; | 254 stream_id_out_ = 1; |
| 257 last_device_to_server_stream_id_received_ = 0; | 255 last_device_to_server_stream_id_received_ = 0; |
| (...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 302 heartbeat_timer_.Stop(); | 300 heartbeat_timer_.Stop(); |
| 303 | 301 |
| 304 state_ = CONNECTING; | 302 state_ = CONNECTING; |
| 305 } | 303 } |
| 306 | 304 |
| 307 void MCSClient::SendHeartbeat() { | 305 void MCSClient::SendHeartbeat() { |
| 308 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()), | 306 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()), |
| 309 false); | 307 false); |
| 310 } | 308 } |
| 311 | 309 |
| 312 void MCSClient::OnRMQUpdateFinished(bool success) { | 310 void MCSClient::OnGCMUpdateFinished(bool success) { |
| 313 LOG_IF(ERROR, !success) << "RMQ Update failed!"; | 311 LOG_IF(ERROR, !success) << "RMQ Update failed!"; |
| 314 // TODO(zea): Rebuild the store from scratch in case of persistence failure? | 312 // TODO(zea): Rebuild the store from scratch in case of persistence failure? |
| 315 } | 313 } |
| 316 | 314 |
| 317 void MCSClient::MaybeSendMessage() { | 315 void MCSClient::MaybeSendMessage() { |
| 318 if (to_send_.empty()) | 316 if (to_send_.empty()) |
| 319 return; | 317 return; |
| 320 | 318 |
| 321 if (!connection_handler_->CanSendMessage()) | 319 if (!connection_handler_->CanSendMessage()) |
| 322 return; | 320 return; |
| (...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 421 } | 419 } |
| 422 for (StreamIdList::iterator iter = acked_stream_ids_to_remove.begin(); | 420 for (StreamIdList::iterator iter = acked_stream_ids_to_remove.begin(); |
| 423 iter != acked_stream_ids_to_remove.end(); ++iter) { | 421 iter != acked_stream_ids_to_remove.end(); ++iter) { |
| 424 acked_server_ids_.erase(*iter); | 422 acked_server_ids_.erase(*iter); |
| 425 } | 423 } |
| 426 } | 424 } |
| 427 | 425 |
| 428 ++stream_id_in_; | 426 ++stream_id_in_; |
| 429 if (!persistent_id.empty()) { | 427 if (!persistent_id.empty()) { |
| 430 unacked_server_ids_[stream_id_in_] = persistent_id; | 428 unacked_server_ids_[stream_id_in_] = persistent_id; |
| 431 rmq_store_->AddIncomingMessage(persistent_id, | 429 gcm_store_->AddIncomingMessage(persistent_id, |
| 432 base::Bind(&MCSClient::OnRMQUpdateFinished, | 430 base::Bind(&MCSClient::OnGCMUpdateFinished, |
| 433 weak_ptr_factory_.GetWeakPtr())); | 431 weak_ptr_factory_.GetWeakPtr())); |
| 434 } | 432 } |
| 435 | 433 |
| 436 DVLOG(1) << "Received message of type " << protobuf->GetTypeName() | 434 DVLOG(1) << "Received message of type " << protobuf->GetTypeName() |
| 437 << " with persistent id " | 435 << " with persistent id " |
| 438 << (persistent_id.empty() ? "NULL" : persistent_id) | 436 << (persistent_id.empty() ? "NULL" : persistent_id) |
| 439 << ", stream id " << stream_id_in_ << " and last stream id received " | 437 << ", stream id " << stream_id_in_ << " and last stream id received " |
| 440 << last_stream_id_received; | 438 << last_stream_id_received; |
| 441 | 439 |
| 442 if (unacked_server_ids_.size() > 0 && | 440 if (unacked_server_ids_.size() > 0 && |
| (...skipping 119 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 562 to_resend_.front()->stream_id <= last_stream_id_received) { | 560 to_resend_.front()->stream_id <= last_stream_id_received) { |
| 563 const MCSPacketInternal& outgoing_packet = to_resend_.front(); | 561 const MCSPacketInternal& outgoing_packet = to_resend_.front(); |
| 564 acked_outgoing_persistent_ids.push_back(outgoing_packet->persistent_id); | 562 acked_outgoing_persistent_ids.push_back(outgoing_packet->persistent_id); |
| 565 acked_outgoing_stream_ids.push_back(outgoing_packet->stream_id); | 563 acked_outgoing_stream_ids.push_back(outgoing_packet->stream_id); |
| 566 to_resend_.pop_front(); | 564 to_resend_.pop_front(); |
| 567 } | 565 } |
| 568 | 566 |
| 569 DVLOG(1) << "Server acked " << acked_outgoing_persistent_ids.size() | 567 DVLOG(1) << "Server acked " << acked_outgoing_persistent_ids.size() |
| 570 << " outgoing messages, " << to_resend_.size() | 568 << " outgoing messages, " << to_resend_.size() |
| 571 << " remaining unacked"; | 569 << " remaining unacked"; |
| 572 rmq_store_->RemoveOutgoingMessages( | 570 gcm_store_->RemoveOutgoingMessages( |
| 573 acked_outgoing_persistent_ids, | 571 acked_outgoing_persistent_ids, |
| 574 base::Bind(&MCSClient::OnRMQUpdateFinished, | 572 base::Bind(&MCSClient::OnGCMUpdateFinished, |
| 575 weak_ptr_factory_.GetWeakPtr())); | 573 weak_ptr_factory_.GetWeakPtr())); |
| 576 | 574 |
| 577 HandleServerConfirmedReceipt(last_stream_id_received); | 575 HandleServerConfirmedReceipt(last_stream_id_received); |
| 578 } | 576 } |
| 579 | 577 |
| 580 void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) { | 578 void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) { |
| 581 // First check the to_resend_ queue. Acknowledgments should always happen | 579 // First check the to_resend_ queue. Acknowledgments should always happen |
| 582 // in the order they were sent, so if messages are present they should match | 580 // in the order they were sent, so if messages are present they should match |
| 583 // the acknowledge list. | 581 // the acknowledge list. |
| 584 PersistentIdList::const_iterator iter = id_list.begin(); | 582 PersistentIdList::const_iterator iter = id_list.begin(); |
| (...skipping 21 matching lines...) Expand all Loading... |
| 606 StreamId device_stream_id = outgoing_packet->stream_id; | 604 StreamId device_stream_id = outgoing_packet->stream_id; |
| 607 HandleServerConfirmedReceipt(device_stream_id); | 605 HandleServerConfirmedReceipt(device_stream_id); |
| 608 | 606 |
| 609 to_send_.pop_front(); | 607 to_send_.pop_front(); |
| 610 } | 608 } |
| 611 | 609 |
| 612 DCHECK(iter == id_list.end()); | 610 DCHECK(iter == id_list.end()); |
| 613 | 611 |
| 614 DVLOG(1) << "Server acked " << id_list.size() | 612 DVLOG(1) << "Server acked " << id_list.size() |
| 615 << " messages, " << to_resend_.size() << " remaining unacked."; | 613 << " messages, " << to_resend_.size() << " remaining unacked."; |
| 616 rmq_store_->RemoveOutgoingMessages( | 614 gcm_store_->RemoveOutgoingMessages( |
| 617 id_list, | 615 id_list, |
| 618 base::Bind(&MCSClient::OnRMQUpdateFinished, | 616 base::Bind(&MCSClient::OnGCMUpdateFinished, |
| 619 weak_ptr_factory_.GetWeakPtr())); | 617 weak_ptr_factory_.GetWeakPtr())); |
| 620 | 618 |
| 621 // Resend any remaining outgoing messages, as they were not received by the | 619 // Resend any remaining outgoing messages, as they were not received by the |
| 622 // server. | 620 // server. |
| 623 DVLOG(1) << "Resending " << to_resend_.size() << " messages."; | 621 DVLOG(1) << "Resending " << to_resend_.size() << " messages."; |
| 624 while (!to_resend_.empty()) { | 622 while (!to_resend_.empty()) { |
| 625 to_send_.push_front(to_resend_.back()); | 623 to_send_.push_front(to_resend_.back()); |
| 626 to_resend_.pop_back(); | 624 to_resend_.pop_back(); |
| 627 } | 625 } |
| 628 } | 626 } |
| (...skipping 11 matching lines...) Expand all Loading... |
| 640 iter != acked_server_ids_.end() && | 638 iter != acked_server_ids_.end() && |
| 641 iter->first <= device_stream_id;) { | 639 iter->first <= device_stream_id;) { |
| 642 acked_incoming_ids.insert(acked_incoming_ids.end(), | 640 acked_incoming_ids.insert(acked_incoming_ids.end(), |
| 643 iter->second.begin(), | 641 iter->second.begin(), |
| 644 iter->second.end()); | 642 iter->second.end()); |
| 645 acked_server_ids_.erase(iter++); | 643 acked_server_ids_.erase(iter++); |
| 646 } | 644 } |
| 647 | 645 |
| 648 DVLOG(1) << "Server confirmed receipt of " << acked_incoming_ids.size() | 646 DVLOG(1) << "Server confirmed receipt of " << acked_incoming_ids.size() |
| 649 << " acknowledged server messages."; | 647 << " acknowledged server messages."; |
| 650 rmq_store_->RemoveIncomingMessages( | 648 gcm_store_->RemoveIncomingMessages( |
| 651 acked_incoming_ids, | 649 acked_incoming_ids, |
| 652 base::Bind(&MCSClient::OnRMQUpdateFinished, | 650 base::Bind(&MCSClient::OnGCMUpdateFinished, |
| 653 weak_ptr_factory_.GetWeakPtr())); | 651 weak_ptr_factory_.GetWeakPtr())); |
| 654 } | 652 } |
| 655 | 653 |
| 656 MCSClient::PersistentId MCSClient::GetNextPersistentId() { | 654 MCSClient::PersistentId MCSClient::GetNextPersistentId() { |
| 657 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue()); | 655 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue()); |
| 658 } | 656 } |
| 659 | 657 |
| 660 } // namespace gcm | 658 } // namespace gcm |
| OLD | NEW |