OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "google_apis/gcm/engine/mcs_client.h" | 5 #include "google_apis/gcm/engine/mcs_client.h" |
6 | 6 |
7 #include "base/basictypes.h" | 7 #include "base/basictypes.h" |
8 #include "base/message_loop/message_loop.h" | 8 #include "base/message_loop/message_loop.h" |
9 #include "base/strings/string_number_conversions.h" | 9 #include "base/strings/string_number_conversions.h" |
10 #include "google_apis/gcm/base/mcs_util.h" | 10 #include "google_apis/gcm/base/mcs_util.h" |
11 #include "google_apis/gcm/base/socket_stream.h" | 11 #include "google_apis/gcm/base/socket_stream.h" |
12 #include "google_apis/gcm/engine/connection_factory.h" | 12 #include "google_apis/gcm/engine/connection_factory.h" |
13 #include "google_apis/gcm/engine/rmq_store.h" | 13 #include "google_apis/gcm/engine/rmq_store.h" |
14 | 14 |
15 using namespace google::protobuf::io; | 15 using namespace google::protobuf::io; |
16 | 16 |
17 namespace gcm { | 17 namespace gcm { |
18 | 18 |
19 namespace { | 19 namespace { |
20 | 20 |
21 typedef scoped_ptr<google::protobuf::MessageLite> MCSProto; | 21 typedef scoped_ptr<google::protobuf::MessageLite> MCSProto; |
22 | 22 |
23 // TODO(zea): get these values from MCS settings. | |
24 const int64 kHeartbeatDefaultSeconds = 60 * 15; // 15 minutes. | |
25 | |
26 // The category of messages intended for the GCM client itself from MCS. | 23 // The category of messages intended for the GCM client itself from MCS. |
27 const char kMCSCategory[] = "com.google.android.gsf.gtalkservice"; | 24 const char kMCSCategory[] = "com.google.android.gsf.gtalkservice"; |
28 | 25 |
29 // The from field for messages originating in the GCM client. | 26 // The from field for messages originating in the GCM client. |
30 const char kGCMFromField[] = "gcm@android.com"; | 27 const char kGCMFromField[] = "gcm@android.com"; |
31 | 28 |
32 // MCS status message types. | 29 // MCS status message types. |
| 30 // TODO(zea): handle these at the GCMClient layer. |
33 const char kIdleNotification[] = "IdleNotification"; | 31 const char kIdleNotification[] = "IdleNotification"; |
34 // TODO(zea): consume the following message types: | |
35 // const char kAlwaysShowOnIdle[] = "ShowAwayOnIdle"; | 32 // const char kAlwaysShowOnIdle[] = "ShowAwayOnIdle"; |
36 // const char kPowerNotification[] = "PowerNotification"; | 33 // const char kPowerNotification[] = "PowerNotification"; |
37 // const char kDataActiveNotification[] = "DataActiveNotification"; | 34 // const char kDataActiveNotification[] = "DataActiveNotification"; |
38 | 35 |
39 // The number of unacked messages to allow before sending a stream ack. | 36 // The number of unacked messages to allow before sending a stream ack. |
40 // Applies to both incoming and outgoing messages. | 37 // Applies to both incoming and outgoing messages. |
41 // TODO(zea): make this server configurable. | 38 // TODO(zea): make this server configurable. |
42 const int kUnackedMessageBeforeStreamAck = 10; | 39 const int kUnackedMessageBeforeStreamAck = 10; |
43 | 40 |
44 // The global maximum number of pending messages to have in the send queue. | 41 // The global maximum number of pending messages to have in the send queue. |
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
93 : state_(UNINITIALIZED), | 90 : state_(UNINITIALIZED), |
94 android_id_(0), | 91 android_id_(0), |
95 security_token_(0), | 92 security_token_(0), |
96 connection_factory_(connection_factory), | 93 connection_factory_(connection_factory), |
97 connection_handler_(NULL), | 94 connection_handler_(NULL), |
98 last_device_to_server_stream_id_received_(0), | 95 last_device_to_server_stream_id_received_(0), |
99 last_server_to_device_stream_id_received_(0), | 96 last_server_to_device_stream_id_received_(0), |
100 stream_id_out_(0), | 97 stream_id_out_(0), |
101 stream_id_in_(0), | 98 stream_id_in_(0), |
102 rmq_store_(rmq_path, blocking_task_runner), | 99 rmq_store_(rmq_path, blocking_task_runner), |
103 heartbeat_interval_( | |
104 base::TimeDelta::FromSeconds(kHeartbeatDefaultSeconds)), | |
105 heartbeat_timer_(true, true), | |
106 blocking_task_runner_(blocking_task_runner), | 100 blocking_task_runner_(blocking_task_runner), |
107 weak_ptr_factory_(this) { | 101 weak_ptr_factory_(this) { |
108 } | 102 } |
109 | 103 |
110 MCSClient::~MCSClient() { | 104 MCSClient::~MCSClient() { |
111 } | 105 } |
112 | 106 |
113 void MCSClient::Initialize( | 107 void MCSClient::Initialize( |
114 const InitializationCompleteCallback& initialization_callback, | 108 const InitializationCompleteCallback& initialization_callback, |
115 const OnMessageReceivedCallback& message_received_callback, | 109 const OnMessageReceivedCallback& message_received_callback, |
(...skipping 11 matching lines...) Expand all Loading... |
127 base::Bind(&MCSClient::ResetStateAndBuildLoginRequest, | 121 base::Bind(&MCSClient::ResetStateAndBuildLoginRequest, |
128 weak_ptr_factory_.GetWeakPtr()), | 122 weak_ptr_factory_.GetWeakPtr()), |
129 base::Bind(&MCSClient::HandlePacketFromWire, | 123 base::Bind(&MCSClient::HandlePacketFromWire, |
130 weak_ptr_factory_.GetWeakPtr()), | 124 weak_ptr_factory_.GetWeakPtr()), |
131 base::Bind(&MCSClient::MaybeSendMessage, | 125 base::Bind(&MCSClient::MaybeSendMessage, |
132 weak_ptr_factory_.GetWeakPtr())); | 126 weak_ptr_factory_.GetWeakPtr())); |
133 connection_handler_ = connection_factory_->GetConnectionHandler(); | 127 connection_handler_ = connection_factory_->GetConnectionHandler(); |
134 } | 128 } |
135 | 129 |
136 void MCSClient::Login(uint64 android_id, uint64 security_token) { | 130 void MCSClient::Login(uint64 android_id, uint64 security_token) { |
137 DCHECK_EQ(state_, LOADED); | |
138 if (android_id != android_id_ && security_token != security_token_) { | 131 if (android_id != android_id_ && security_token != security_token_) { |
139 DCHECK(android_id); | 132 DCHECK(android_id); |
140 DCHECK(security_token); | 133 DCHECK(security_token); |
141 DCHECK(restored_unackeds_server_ids_.empty()); | 134 DCHECK(restored_unackeds_server_ids_.empty()); |
142 android_id_ = android_id; | 135 android_id_ = android_id; |
143 security_token_ = security_token; | 136 security_token_ = security_token; |
144 rmq_store_.SetDeviceCredentials(android_id_, | 137 rmq_store_.SetDeviceCredentials(android_id_, |
145 security_token_, | 138 security_token_, |
146 base::Bind(&MCSClient::OnRMQUpdateFinished, | 139 base::Bind(&MCSClient::OnRMQUpdateFinished, |
147 weak_ptr_factory_.GetWeakPtr())); | 140 weak_ptr_factory_.GetWeakPtr())); |
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
200 | 193 |
201 void MCSClient::ResetStateAndBuildLoginRequest( | 194 void MCSClient::ResetStateAndBuildLoginRequest( |
202 mcs_proto::LoginRequest* request) { | 195 mcs_proto::LoginRequest* request) { |
203 DCHECK(android_id_); | 196 DCHECK(android_id_); |
204 DCHECK(security_token_); | 197 DCHECK(security_token_); |
205 stream_id_in_ = 0; | 198 stream_id_in_ = 0; |
206 stream_id_out_ = 1; | 199 stream_id_out_ = 1; |
207 last_device_to_server_stream_id_received_ = 0; | 200 last_device_to_server_stream_id_received_ = 0; |
208 last_server_to_device_stream_id_received_ = 0; | 201 last_server_to_device_stream_id_received_ = 0; |
209 | 202 |
| 203 heartbeat_manager_.Stop(); |
| 204 |
210 // TODO(zea): expire all messages older than their TTL. | 205 // TODO(zea): expire all messages older than their TTL. |
211 | 206 |
212 // Add any pending acknowledgments to the list of ids. | 207 // Add any pending acknowledgments to the list of ids. |
213 for (StreamIdToPersistentIdMap::const_iterator iter = | 208 for (StreamIdToPersistentIdMap::const_iterator iter = |
214 unacked_server_ids_.begin(); | 209 unacked_server_ids_.begin(); |
215 iter != unacked_server_ids_.end(); ++iter) { | 210 iter != unacked_server_ids_.end(); ++iter) { |
216 restored_unackeds_server_ids_.push_back(iter->second); | 211 restored_unackeds_server_ids_.push_back(iter->second); |
217 } | 212 } |
218 unacked_server_ids_.clear(); | 213 unacked_server_ids_.clear(); |
219 | 214 |
(...skipping 22 matching lines...) Expand all Loading... |
242 // to RMQ, as all messages that reach this point should already have been | 237 // to RMQ, as all messages that reach this point should already have been |
243 // saved as necessary. | 238 // saved as necessary. |
244 while (!to_resend_.empty()) { | 239 while (!to_resend_.empty()) { |
245 to_send_.push_front(to_resend_.back()); | 240 to_send_.push_front(to_resend_.back()); |
246 to_resend_.pop_back(); | 241 to_resend_.pop_back(); |
247 } | 242 } |
248 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size() | 243 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size() |
249 << " incoming acks pending, and " << to_send_.size() | 244 << " incoming acks pending, and " << to_send_.size() |
250 << " pending outgoing messages."; | 245 << " pending outgoing messages."; |
251 | 246 |
252 heartbeat_timer_.Stop(); | |
253 | |
254 state_ = CONNECTING; | 247 state_ = CONNECTING; |
255 } | 248 } |
256 | 249 |
257 void MCSClient::SendHeartbeat() { | 250 void MCSClient::SendHeartbeat() { |
258 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()), | 251 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()), |
259 false); | 252 false); |
260 } | 253 } |
261 | 254 |
262 void MCSClient::OnRMQLoadFinished(const RMQStore::LoadResult& result) { | 255 void MCSClient::OnRMQLoadFinished(const RMQStore::LoadResult& result) { |
263 if (!result.success) { | 256 if (!result.success) { |
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
327 | 320 |
328 DVLOG(1) << "Pending output message found, sending."; | 321 DVLOG(1) << "Pending output message found, sending."; |
329 MCSPacketInternal packet = to_send_.front(); | 322 MCSPacketInternal packet = to_send_.front(); |
330 to_send_.pop_front(); | 323 to_send_.pop_front(); |
331 if (!packet->persistent_id.empty()) | 324 if (!packet->persistent_id.empty()) |
332 to_resend_.push_back(packet); | 325 to_resend_.push_back(packet); |
333 SendPacketToWire(packet.get()); | 326 SendPacketToWire(packet.get()); |
334 } | 327 } |
335 | 328 |
336 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) { | 329 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) { |
337 // Reset the heartbeat interval. | |
338 heartbeat_timer_.Reset(); | |
339 packet_info->stream_id = ++stream_id_out_; | 330 packet_info->stream_id = ++stream_id_out_; |
340 DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName(); | 331 DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName(); |
341 | 332 |
342 // Set the proper last received stream id to acknowledge received server | 333 // Set the proper last received stream id to acknowledge received server |
343 // packets. | 334 // packets. |
344 DVLOG(1) << "Setting last stream id received to " | 335 DVLOG(1) << "Setting last stream id received to " |
345 << stream_id_in_; | 336 << stream_id_in_; |
346 SetLastStreamIdReceived(stream_id_in_, | 337 SetLastStreamIdReceived(stream_id_in_, |
347 packet_info->protobuf.get()); | 338 packet_info->protobuf.get()); |
348 if (stream_id_in_ != last_server_to_device_stream_id_received_) { | 339 if (stream_id_in_ != last_server_to_device_stream_id_received_) { |
(...skipping 93 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
442 << last_stream_id_received; | 433 << last_stream_id_received; |
443 | 434 |
444 if (unacked_server_ids_.size() > 0 && | 435 if (unacked_server_ids_.size() > 0 && |
445 unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) { | 436 unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) { |
446 SendMessage(MCSMessage(kIqStanzaTag, | 437 SendMessage(MCSMessage(kIqStanzaTag, |
447 BuildStreamAck(). | 438 BuildStreamAck(). |
448 PassAs<const google::protobuf::MessageLite>()), | 439 PassAs<const google::protobuf::MessageLite>()), |
449 false); | 440 false); |
450 } | 441 } |
451 | 442 |
| 443 // The connection is alive, treat this message as a heartbeat ack. |
| 444 heartbeat_manager_.OnHeartbeatAcked(); |
| 445 |
452 switch (tag) { | 446 switch (tag) { |
453 case kLoginResponseTag: { | 447 case kLoginResponseTag: { |
454 DCHECK_EQ(CONNECTING, state_); | 448 DCHECK_EQ(CONNECTING, state_); |
455 mcs_proto::LoginResponse* login_response = | 449 mcs_proto::LoginResponse* login_response = |
456 reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get()); | 450 reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get()); |
457 DVLOG(1) << "Received login response:"; | 451 DVLOG(1) << "Received login response:"; |
458 DVLOG(1) << " Id: " << login_response->id(); | 452 DVLOG(1) << " Id: " << login_response->id(); |
459 DVLOG(1) << " Timestamp: " << login_response->server_timestamp(); | 453 DVLOG(1) << " Timestamp: " << login_response->server_timestamp(); |
460 if (login_response->has_error()) { | 454 if (login_response->has_error() && login_response->error().code() != 0) { |
461 state_ = UNINITIALIZED; | 455 state_ = UNINITIALIZED; |
462 DVLOG(1) << " Error code: " << login_response->error().code(); | 456 DVLOG(1) << " Error code: " << login_response->error().code(); |
463 DVLOG(1) << " Error message: " << login_response->error().message(); | 457 DVLOG(1) << " Error message: " << login_response->error().message(); |
464 initialization_callback_.Run(false, 0, 0); | 458 initialization_callback_.Run(false, 0, 0); |
465 return; | 459 return; |
466 } | 460 } |
467 | 461 |
| 462 if (login_response->has_heartbeat_config()) { |
| 463 heartbeat_manager_.UpdateHeartbeatConfig( |
| 464 login_response->heartbeat_config()); |
| 465 } |
| 466 |
468 state_ = CONNECTED; | 467 state_ = CONNECTED; |
469 stream_id_in_ = 1; // To account for the login response. | 468 stream_id_in_ = 1; // To account for the login response. |
470 DCHECK_EQ(1U, stream_id_out_); | 469 DCHECK_EQ(1U, stream_id_out_); |
471 | 470 |
472 // Pass the login response on up. | 471 // Pass the login response on up. |
473 base::MessageLoop::current()->PostTask( | 472 base::MessageLoop::current()->PostTask( |
474 FROM_HERE, | 473 FROM_HERE, |
475 base::Bind(message_received_callback_, | 474 base::Bind(message_received_callback_, |
476 MCSMessage(tag, | 475 MCSMessage(tag, |
477 protobuf.PassAs< | 476 protobuf.PassAs< |
478 const google::protobuf::MessageLite>()))); | 477 const google::protobuf::MessageLite>()))); |
479 | 478 |
480 // If there are pending messages, attempt to send one. | 479 // If there are pending messages, attempt to send one. |
481 if (!to_send_.empty()) { | 480 if (!to_send_.empty()) { |
482 base::MessageLoop::current()->PostTask( | 481 base::MessageLoop::current()->PostTask( |
483 FROM_HERE, | 482 FROM_HERE, |
484 base::Bind(&MCSClient::MaybeSendMessage, | 483 base::Bind(&MCSClient::MaybeSendMessage, |
485 weak_ptr_factory_.GetWeakPtr())); | 484 weak_ptr_factory_.GetWeakPtr())); |
486 } | 485 } |
487 | 486 |
488 heartbeat_timer_.Start(FROM_HERE, | 487 heartbeat_manager_.Start( |
489 heartbeat_interval_, | 488 base::Bind(&MCSClient::SendHeartbeat, |
490 base::Bind(&MCSClient::SendHeartbeat, | 489 weak_ptr_factory_.GetWeakPtr()), |
491 weak_ptr_factory_.GetWeakPtr())); | 490 base::Bind(&MCSClient::OnConnectionResetByHeartbeat, |
| 491 weak_ptr_factory_.GetWeakPtr())); |
492 return; | 492 return; |
493 } | 493 } |
494 case kHeartbeatPingTag: | 494 case kHeartbeatPingTag: |
495 DCHECK_GE(stream_id_in_, 1U); | 495 DCHECK_GE(stream_id_in_, 1U); |
496 DVLOG(1) << "Received heartbeat ping, sending ack."; | 496 DVLOG(1) << "Received heartbeat ping, sending ack."; |
497 SendMessage( | 497 SendMessage( |
498 MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()), false); | 498 MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()), false); |
499 return; | 499 return; |
500 case kHeartbeatAckTag: | 500 case kHeartbeatAckTag: |
501 DCHECK_GE(stream_id_in_, 1U); | 501 DCHECK_GE(stream_id_in_, 1U); |
502 DVLOG(1) << "Received heartbeat ack."; | 502 DVLOG(1) << "Received heartbeat ack."; |
503 // TODO(zea): add logic to reconnect if no ack received within a certain | 503 // Do nothing else, all messages act as heartbeat acks. |
504 // timeout (with backoff). | |
505 return; | 504 return; |
506 case kCloseTag: | 505 case kCloseTag: |
507 LOG(ERROR) << "Received close command, resetting connection."; | 506 LOG(ERROR) << "Received close command, resetting connection."; |
508 state_ = LOADED; | 507 state_ = LOADED; |
509 connection_factory_->SignalConnectionReset(); | 508 connection_factory_->SignalConnectionReset(); |
510 return; | 509 return; |
511 case kIqStanzaTag: { | 510 case kIqStanzaTag: { |
512 DCHECK_GE(stream_id_in_, 1U); | 511 DCHECK_GE(stream_id_in_, 1U); |
513 mcs_proto::IqStanza* iq_stanza = | 512 mcs_proto::IqStanza* iq_stanza = |
514 reinterpret_cast<mcs_proto::IqStanza*>(protobuf.get()); | 513 reinterpret_cast<mcs_proto::IqStanza*>(protobuf.get()); |
(...skipping 134 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
649 << " acknowledged server messages."; | 648 << " acknowledged server messages."; |
650 rmq_store_.RemoveIncomingMessages(acked_incoming_ids, | 649 rmq_store_.RemoveIncomingMessages(acked_incoming_ids, |
651 base::Bind(&MCSClient::OnRMQUpdateFinished, | 650 base::Bind(&MCSClient::OnRMQUpdateFinished, |
652 weak_ptr_factory_.GetWeakPtr())); | 651 weak_ptr_factory_.GetWeakPtr())); |
653 } | 652 } |
654 | 653 |
655 MCSClient::PersistentId MCSClient::GetNextPersistentId() { | 654 MCSClient::PersistentId MCSClient::GetNextPersistentId() { |
656 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue()); | 655 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue()); |
657 } | 656 } |
658 | 657 |
| 658 void MCSClient::OnConnectionResetByHeartbeat() { |
| 659 connection_factory_->SignalConnectionReset(); |
| 660 } |
| 661 |
659 } // namespace gcm | 662 } // namespace gcm |
OLD | NEW |