Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "google_apis/gcm/engine/mcs_client.h" | 5 #include "google_apis/gcm/engine/mcs_client.h" |
| 6 | 6 |
| 7 #include "base/basictypes.h" | 7 #include "base/basictypes.h" |
| 8 #include "base/message_loop/message_loop.h" | 8 #include "base/message_loop/message_loop.h" |
| 9 #include "base/strings/string_number_conversions.h" | 9 #include "base/strings/string_number_conversions.h" |
| 10 #include "google_apis/gcm/base/mcs_util.h" | 10 #include "google_apis/gcm/base/mcs_util.h" |
| 11 #include "google_apis/gcm/base/socket_stream.h" | 11 #include "google_apis/gcm/base/socket_stream.h" |
| 12 #include "google_apis/gcm/engine/connection_factory.h" | 12 #include "google_apis/gcm/engine/connection_factory.h" |
| 13 #include "google_apis/gcm/engine/rmq_store.h" | 13 #include "google_apis/gcm/engine/rmq_store.h" |
| 14 | 14 |
| 15 using namespace google::protobuf::io; | 15 using namespace google::protobuf::io; |
| 16 | 16 |
| 17 namespace gcm { | 17 namespace gcm { |
| 18 | 18 |
| 19 namespace { | 19 namespace { |
| 20 | 20 |
| 21 typedef scoped_ptr<google::protobuf::MessageLite> MCSProto; | 21 typedef scoped_ptr<google::protobuf::MessageLite> MCSProto; |
| 22 | 22 |
| 23 // TODO(zea): get these values from MCS settings. | |
| 24 const int64 kHeartbeatDefaultSeconds = 60 * 15; // 15 minutes. | |
| 25 | |
| 26 // The category of messages intended for the GCM client itself from MCS. | 23 // The category of messages intended for the GCM client itself from MCS. |
| 27 const char kMCSCategory[] = "com.google.android.gsf.gtalkservice"; | 24 const char kMCSCategory[] = "com.google.android.gsf.gtalkservice"; |
| 28 | 25 |
| 29 // The from field for messages originating in the GCM client. | 26 // The from field for messages originating in the GCM client. |
| 30 const char kGCMFromField[] = "gcm@android.com"; | 27 const char kGCMFromField[] = "gcm@android.com"; |
| 31 | 28 |
| 32 // MCS status message types. | 29 // MCS status message types. |
| 30 // TODO(zea): handle these at the GCMClient layer. | |
|
fgorski
2013/12/19 23:55:13
What exactly do you mean?
Nicolas Zea
2013/12/20 22:52:56
System status settings like these should be handle
| |
| 33 const char kIdleNotification[] = "IdleNotification"; | 31 const char kIdleNotification[] = "IdleNotification"; |
| 34 // TODO(zea): consume the following message types: | |
| 35 // const char kAlwaysShowOnIdle[] = "ShowAwayOnIdle"; | 32 // const char kAlwaysShowOnIdle[] = "ShowAwayOnIdle"; |
| 36 // const char kPowerNotification[] = "PowerNotification"; | 33 // const char kPowerNotification[] = "PowerNotification"; |
| 37 // const char kDataActiveNotification[] = "DataActiveNotification"; | 34 // const char kDataActiveNotification[] = "DataActiveNotification"; |
| 38 | 35 |
| 39 // The number of unacked messages to allow before sending a stream ack. | 36 // The number of unacked messages to allow before sending a stream ack. |
| 40 // Applies to both incoming and outgoing messages. | 37 // Applies to both incoming and outgoing messages. |
| 41 // TODO(zea): make this server configurable. | 38 // TODO(zea): make this server configurable. |
| 42 const int kUnackedMessageBeforeStreamAck = 10; | 39 const int kUnackedMessageBeforeStreamAck = 10; |
| 43 | 40 |
| 44 // The global maximum number of pending messages to have in the send queue. | 41 // The global maximum number of pending messages to have in the send queue. |
| (...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 93 : state_(UNINITIALIZED), | 90 : state_(UNINITIALIZED), |
| 94 android_id_(0), | 91 android_id_(0), |
| 95 security_token_(0), | 92 security_token_(0), |
| 96 connection_factory_(connection_factory), | 93 connection_factory_(connection_factory), |
| 97 connection_handler_(NULL), | 94 connection_handler_(NULL), |
| 98 last_device_to_server_stream_id_received_(0), | 95 last_device_to_server_stream_id_received_(0), |
| 99 last_server_to_device_stream_id_received_(0), | 96 last_server_to_device_stream_id_received_(0), |
| 100 stream_id_out_(0), | 97 stream_id_out_(0), |
| 101 stream_id_in_(0), | 98 stream_id_in_(0), |
| 102 rmq_store_(rmq_path, blocking_task_runner), | 99 rmq_store_(rmq_path, blocking_task_runner), |
| 103 heartbeat_interval_( | |
| 104 base::TimeDelta::FromSeconds(kHeartbeatDefaultSeconds)), | |
| 105 heartbeat_timer_(true, true), | |
| 106 blocking_task_runner_(blocking_task_runner), | 100 blocking_task_runner_(blocking_task_runner), |
| 107 weak_ptr_factory_(this) { | 101 weak_ptr_factory_(this) { |
| 108 } | 102 } |
| 109 | 103 |
| 110 MCSClient::~MCSClient() { | 104 MCSClient::~MCSClient() { |
| 111 } | 105 } |
| 112 | 106 |
| 113 void MCSClient::Initialize( | 107 void MCSClient::Initialize( |
| 114 const InitializationCompleteCallback& initialization_callback, | 108 const InitializationCompleteCallback& initialization_callback, |
| 115 const OnMessageReceivedCallback& message_received_callback, | 109 const OnMessageReceivedCallback& message_received_callback, |
| (...skipping 11 matching lines...) Expand all Loading... | |
| 127 base::Bind(&MCSClient::ResetStateAndBuildLoginRequest, | 121 base::Bind(&MCSClient::ResetStateAndBuildLoginRequest, |
| 128 weak_ptr_factory_.GetWeakPtr()), | 122 weak_ptr_factory_.GetWeakPtr()), |
| 129 base::Bind(&MCSClient::HandlePacketFromWire, | 123 base::Bind(&MCSClient::HandlePacketFromWire, |
| 130 weak_ptr_factory_.GetWeakPtr()), | 124 weak_ptr_factory_.GetWeakPtr()), |
| 131 base::Bind(&MCSClient::MaybeSendMessage, | 125 base::Bind(&MCSClient::MaybeSendMessage, |
| 132 weak_ptr_factory_.GetWeakPtr())); | 126 weak_ptr_factory_.GetWeakPtr())); |
| 133 connection_handler_ = connection_factory_->GetConnectionHandler(); | 127 connection_handler_ = connection_factory_->GetConnectionHandler(); |
| 134 } | 128 } |
| 135 | 129 |
| 136 void MCSClient::Login(uint64 android_id, uint64 security_token) { | 130 void MCSClient::Login(uint64 android_id, uint64 security_token) { |
| 137 DCHECK_EQ(state_, LOADED); | |
| 138 if (android_id != android_id_ && security_token != security_token_) { | 131 if (android_id != android_id_ && security_token != security_token_) { |
| 139 DCHECK(android_id); | 132 DCHECK(android_id); |
| 140 DCHECK(security_token); | 133 DCHECK(security_token); |
| 141 DCHECK(restored_unackeds_server_ids_.empty()); | 134 DCHECK(restored_unackeds_server_ids_.empty()); |
| 142 android_id_ = android_id; | 135 android_id_ = android_id; |
| 143 security_token_ = security_token; | 136 security_token_ = security_token; |
| 144 rmq_store_.SetDeviceCredentials(android_id_, | 137 rmq_store_.SetDeviceCredentials(android_id_, |
| 145 security_token_, | 138 security_token_, |
| 146 base::Bind(&MCSClient::OnRMQUpdateFinished, | 139 base::Bind(&MCSClient::OnRMQUpdateFinished, |
| 147 weak_ptr_factory_.GetWeakPtr())); | 140 weak_ptr_factory_.GetWeakPtr())); |
| (...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 200 | 193 |
| 201 void MCSClient::ResetStateAndBuildLoginRequest( | 194 void MCSClient::ResetStateAndBuildLoginRequest( |
| 202 mcs_proto::LoginRequest* request) { | 195 mcs_proto::LoginRequest* request) { |
| 203 DCHECK(android_id_); | 196 DCHECK(android_id_); |
| 204 DCHECK(security_token_); | 197 DCHECK(security_token_); |
| 205 stream_id_in_ = 0; | 198 stream_id_in_ = 0; |
| 206 stream_id_out_ = 1; | 199 stream_id_out_ = 1; |
| 207 last_device_to_server_stream_id_received_ = 0; | 200 last_device_to_server_stream_id_received_ = 0; |
| 208 last_server_to_device_stream_id_received_ = 0; | 201 last_server_to_device_stream_id_received_ = 0; |
| 209 | 202 |
| 203 heartbeat_manager_.Stop(); | |
| 204 | |
| 210 // TODO(zea): expire all messages older than their TTL. | 205 // TODO(zea): expire all messages older than their TTL. |
| 211 | 206 |
| 212 // Add any pending acknowledgments to the list of ids. | 207 // Add any pending acknowledgments to the list of ids. |
| 213 for (StreamIdToPersistentIdMap::const_iterator iter = | 208 for (StreamIdToPersistentIdMap::const_iterator iter = |
| 214 unacked_server_ids_.begin(); | 209 unacked_server_ids_.begin(); |
| 215 iter != unacked_server_ids_.end(); ++iter) { | 210 iter != unacked_server_ids_.end(); ++iter) { |
| 216 restored_unackeds_server_ids_.push_back(iter->second); | 211 restored_unackeds_server_ids_.push_back(iter->second); |
| 217 } | 212 } |
| 218 unacked_server_ids_.clear(); | 213 unacked_server_ids_.clear(); |
| 219 | 214 |
| (...skipping 22 matching lines...) Expand all Loading... | |
| 242 // to RMQ, as all messages that reach this point should already have been | 237 // to RMQ, as all messages that reach this point should already have been |
| 243 // saved as necessary. | 238 // saved as necessary. |
| 244 while (!to_resend_.empty()) { | 239 while (!to_resend_.empty()) { |
| 245 to_send_.push_front(to_resend_.back()); | 240 to_send_.push_front(to_resend_.back()); |
| 246 to_resend_.pop_back(); | 241 to_resend_.pop_back(); |
| 247 } | 242 } |
| 248 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size() | 243 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size() |
| 249 << " incoming acks pending, and " << to_send_.size() | 244 << " incoming acks pending, and " << to_send_.size() |
| 250 << " pending outgoing messages."; | 245 << " pending outgoing messages."; |
| 251 | 246 |
| 252 heartbeat_timer_.Stop(); | |
| 253 | |
| 254 state_ = CONNECTING; | 247 state_ = CONNECTING; |
| 255 } | 248 } |
| 256 | 249 |
| 257 void MCSClient::SendHeartbeat() { | 250 void MCSClient::SendHeartbeat() { |
| 258 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()), | 251 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()), |
| 259 false); | 252 false); |
| 260 } | 253 } |
| 261 | 254 |
| 262 void MCSClient::OnRMQLoadFinished(const RMQStore::LoadResult& result) { | 255 void MCSClient::OnRMQLoadFinished(const RMQStore::LoadResult& result) { |
| 263 if (!result.success) { | 256 if (!result.success) { |
| (...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 327 | 320 |
| 328 DVLOG(1) << "Pending output message found, sending."; | 321 DVLOG(1) << "Pending output message found, sending."; |
| 329 MCSPacketInternal packet = to_send_.front(); | 322 MCSPacketInternal packet = to_send_.front(); |
| 330 to_send_.pop_front(); | 323 to_send_.pop_front(); |
| 331 if (!packet->persistent_id.empty()) | 324 if (!packet->persistent_id.empty()) |
| 332 to_resend_.push_back(packet); | 325 to_resend_.push_back(packet); |
| 333 SendPacketToWire(packet.get()); | 326 SendPacketToWire(packet.get()); |
| 334 } | 327 } |
| 335 | 328 |
| 336 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) { | 329 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) { |
| 337 // Reset the heartbeat interval. | |
| 338 heartbeat_timer_.Reset(); | |
| 339 packet_info->stream_id = ++stream_id_out_; | 330 packet_info->stream_id = ++stream_id_out_; |
| 340 DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName(); | 331 DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName(); |
| 341 | 332 |
| 342 // Set the proper last received stream id to acknowledge received server | 333 // Set the proper last received stream id to acknowledge received server |
| 343 // packets. | 334 // packets. |
| 344 DVLOG(1) << "Setting last stream id received to " | 335 DVLOG(1) << "Setting last stream id received to " |
| 345 << stream_id_in_; | 336 << stream_id_in_; |
| 346 SetLastStreamIdReceived(stream_id_in_, | 337 SetLastStreamIdReceived(stream_id_in_, |
| 347 packet_info->protobuf.get()); | 338 packet_info->protobuf.get()); |
| 348 if (stream_id_in_ != last_server_to_device_stream_id_received_) { | 339 if (stream_id_in_ != last_server_to_device_stream_id_received_) { |
| (...skipping 93 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 442 << last_stream_id_received; | 433 << last_stream_id_received; |
| 443 | 434 |
| 444 if (unacked_server_ids_.size() > 0 && | 435 if (unacked_server_ids_.size() > 0 && |
| 445 unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) { | 436 unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) { |
| 446 SendMessage(MCSMessage(kIqStanzaTag, | 437 SendMessage(MCSMessage(kIqStanzaTag, |
| 447 BuildStreamAck(). | 438 BuildStreamAck(). |
| 448 PassAs<const google::protobuf::MessageLite>()), | 439 PassAs<const google::protobuf::MessageLite>()), |
| 449 false); | 440 false); |
| 450 } | 441 } |
| 451 | 442 |
| 443 // The connection is alive, treat this message as a heartbeat ack. | |
| 444 heartbeat_manager_.OnHeartbeatAcked(); | |
|
fgorski
2013/12/19 23:55:13
Will that work if the manager hasn't been started
Nicolas Zea
2013/12/20 22:52:56
Yep, OnHeartbeatAcked does nothing if not running.
| |
| 445 | |
| 452 switch (tag) { | 446 switch (tag) { |
| 453 case kLoginResponseTag: { | 447 case kLoginResponseTag: { |
| 454 DCHECK_EQ(CONNECTING, state_); | 448 DCHECK_EQ(CONNECTING, state_); |
| 455 mcs_proto::LoginResponse* login_response = | 449 mcs_proto::LoginResponse* login_response = |
| 456 reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get()); | 450 reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get()); |
| 457 DVLOG(1) << "Received login response:"; | 451 DVLOG(1) << "Received login response:"; |
| 458 DVLOG(1) << " Id: " << login_response->id(); | 452 DVLOG(1) << " Id: " << login_response->id(); |
| 459 DVLOG(1) << " Timestamp: " << login_response->server_timestamp(); | 453 DVLOG(1) << " Timestamp: " << login_response->server_timestamp(); |
| 460 if (login_response->has_error()) { | 454 if (login_response->has_error() && login_response->error().code() != 0) { |
| 461 state_ = UNINITIALIZED; | 455 state_ = UNINITIALIZED; |
| 462 DVLOG(1) << " Error code: " << login_response->error().code(); | 456 DVLOG(1) << " Error code: " << login_response->error().code(); |
| 463 DVLOG(1) << " Error message: " << login_response->error().message(); | 457 DVLOG(1) << " Error message: " << login_response->error().message(); |
| 464 initialization_callback_.Run(false, 0, 0); | 458 initialization_callback_.Run(false, 0, 0); |
| 465 return; | 459 return; |
| 466 } | 460 } |
| 467 | 461 |
| 462 if (login_response->has_heartbeat_config()) { | |
| 463 heartbeat_manager_.UpdateHeartbeatConfig( | |
| 464 login_response->heartbeat_config()); | |
| 465 } | |
| 466 | |
| 468 state_ = CONNECTED; | 467 state_ = CONNECTED; |
| 469 stream_id_in_ = 1; // To account for the login response. | 468 stream_id_in_ = 1; // To account for the login response. |
| 470 DCHECK_EQ(1U, stream_id_out_); | 469 DCHECK_EQ(1U, stream_id_out_); |
| 471 | 470 |
| 472 // Pass the login response on up. | 471 // Pass the login response on up. |
| 473 base::MessageLoop::current()->PostTask( | 472 base::MessageLoop::current()->PostTask( |
| 474 FROM_HERE, | 473 FROM_HERE, |
| 475 base::Bind(message_received_callback_, | 474 base::Bind(message_received_callback_, |
| 476 MCSMessage(tag, | 475 MCSMessage(tag, |
| 477 protobuf.PassAs< | 476 protobuf.PassAs< |
| 478 const google::protobuf::MessageLite>()))); | 477 const google::protobuf::MessageLite>()))); |
| 479 | 478 |
| 480 // If there are pending messages, attempt to send one. | 479 // If there are pending messages, attempt to send one. |
| 481 if (!to_send_.empty()) { | 480 if (!to_send_.empty()) { |
| 482 base::MessageLoop::current()->PostTask( | 481 base::MessageLoop::current()->PostTask( |
| 483 FROM_HERE, | 482 FROM_HERE, |
| 484 base::Bind(&MCSClient::MaybeSendMessage, | 483 base::Bind(&MCSClient::MaybeSendMessage, |
| 485 weak_ptr_factory_.GetWeakPtr())); | 484 weak_ptr_factory_.GetWeakPtr())); |
| 486 } | 485 } |
| 487 | 486 |
| 488 heartbeat_timer_.Start(FROM_HERE, | 487 heartbeat_manager_.Start( |
| 489 heartbeat_interval_, | 488 base::Bind(&MCSClient::SendHeartbeat, |
| 490 base::Bind(&MCSClient::SendHeartbeat, | 489 weak_ptr_factory_.GetWeakPtr()), |
| 491 weak_ptr_factory_.GetWeakPtr())); | 490 base::Bind(&MCSClient::OnConnectionResetByHeartbeat, |
| 491 weak_ptr_factory_.GetWeakPtr())); | |
| 492 return; | 492 return; |
| 493 } | 493 } |
| 494 case kHeartbeatPingTag: | 494 case kHeartbeatPingTag: |
| 495 DCHECK_GE(stream_id_in_, 1U); | 495 DCHECK_GE(stream_id_in_, 1U); |
| 496 DVLOG(1) << "Received heartbeat ping, sending ack."; | 496 DVLOG(1) << "Received heartbeat ping, sending ack."; |
| 497 SendMessage( | 497 SendMessage( |
| 498 MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()), false); | 498 MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()), false); |
| 499 return; | 499 return; |
| 500 case kHeartbeatAckTag: | 500 case kHeartbeatAckTag: |
| 501 DCHECK_GE(stream_id_in_, 1U); | 501 DCHECK_GE(stream_id_in_, 1U); |
| 502 DVLOG(1) << "Received heartbeat ack."; | 502 DVLOG(1) << "Received heartbeat ack."; |
| 503 // TODO(zea): add logic to reconnect if no ack received within a certain | 503 // Do nothing else, all messages act as heartbeat acks. |
| 504 // timeout (with backoff). | |
| 505 return; | 504 return; |
| 506 case kCloseTag: | 505 case kCloseTag: |
| 507 LOG(ERROR) << "Received close command, resetting connection."; | 506 LOG(ERROR) << "Received close command, resetting connection."; |
| 508 state_ = LOADED; | 507 state_ = LOADED; |
| 509 connection_factory_->SignalConnectionReset(); | 508 connection_factory_->SignalConnectionReset(); |
| 510 return; | 509 return; |
| 511 case kIqStanzaTag: { | 510 case kIqStanzaTag: { |
| 512 DCHECK_GE(stream_id_in_, 1U); | 511 DCHECK_GE(stream_id_in_, 1U); |
| 513 mcs_proto::IqStanza* iq_stanza = | 512 mcs_proto::IqStanza* iq_stanza = |
| 514 reinterpret_cast<mcs_proto::IqStanza*>(protobuf.get()); | 513 reinterpret_cast<mcs_proto::IqStanza*>(protobuf.get()); |
| (...skipping 134 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 649 << " acknowledged server messages."; | 648 << " acknowledged server messages."; |
| 650 rmq_store_.RemoveIncomingMessages(acked_incoming_ids, | 649 rmq_store_.RemoveIncomingMessages(acked_incoming_ids, |
| 651 base::Bind(&MCSClient::OnRMQUpdateFinished, | 650 base::Bind(&MCSClient::OnRMQUpdateFinished, |
| 652 weak_ptr_factory_.GetWeakPtr())); | 651 weak_ptr_factory_.GetWeakPtr())); |
| 653 } | 652 } |
| 654 | 653 |
| 655 MCSClient::PersistentId MCSClient::GetNextPersistentId() { | 654 MCSClient::PersistentId MCSClient::GetNextPersistentId() { |
| 656 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue()); | 655 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue()); |
| 657 } | 656 } |
| 658 | 657 |
| 658 void MCSClient::OnConnectionResetByHeartbeat() { | |
| 659 connection_factory_->SignalConnectionReset(); | |
| 660 } | |
| 661 | |
| 659 } // namespace gcm | 662 } // namespace gcm |
| OLD | NEW |