OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "google_apis/gcm/engine/mcs_client.h" | 5 #include "google_apis/gcm/engine/mcs_client.h" |
6 | 6 |
7 #include "base/basictypes.h" | 7 #include "base/basictypes.h" |
8 #include "base/message_loop/message_loop.h" | 8 #include "base/message_loop/message_loop.h" |
9 #include "base/strings/string_number_conversions.h" | 9 #include "base/strings/string_number_conversions.h" |
10 #include "google_apis/gcm/base/mcs_util.h" | 10 #include "google_apis/gcm/base/mcs_util.h" |
11 #include "google_apis/gcm/base/socket_stream.h" | 11 #include "google_apis/gcm/base/socket_stream.h" |
12 #include "google_apis/gcm/engine/connection_factory.h" | 12 #include "google_apis/gcm/engine/connection_factory.h" |
13 #include "google_apis/gcm/engine/rmq_store.h" | 13 #include "google_apis/gcm/engine/rmq_store.h" |
14 | 14 |
15 using namespace google::protobuf::io; | 15 using namespace google::protobuf::io; |
16 | 16 |
17 namespace gcm { | 17 namespace gcm { |
18 | 18 |
19 namespace { | 19 namespace { |
20 | 20 |
21 typedef scoped_ptr<google::protobuf::MessageLite> MCSProto; | 21 typedef scoped_ptr<google::protobuf::MessageLite> MCSProto; |
22 | 22 |
23 // TODO(zea): get these values from MCS settings. | |
24 const int64 kHeartbeatDefaultSeconds = 60 * 15; // 15 minutes. | |
25 | |
26 // The category of messages intended for the GCM client itself from MCS. | 23 // The category of messages intended for the GCM client itself from MCS. |
27 const char kMCSCategory[] = "com.google.android.gsf.gtalkservice"; | 24 const char kMCSCategory[] = "com.google.android.gsf.gtalkservice"; |
28 | 25 |
29 // The from field for messages originating in the GCM client. | 26 // The from field for messages originating in the GCM client. |
30 const char kGCMFromField[] = "gcm@android.com"; | 27 const char kGCMFromField[] = "gcm@android.com"; |
31 | 28 |
32 // MCS status message types. | 29 // MCS status message types. |
| 30 // TODO(zea): handle these at the GCMClient layer. |
33 const char kIdleNotification[] = "IdleNotification"; | 31 const char kIdleNotification[] = "IdleNotification"; |
34 // TODO(zea): consume the following message types: | |
35 // const char kAlwaysShowOnIdle[] = "ShowAwayOnIdle"; | 32 // const char kAlwaysShowOnIdle[] = "ShowAwayOnIdle"; |
36 // const char kPowerNotification[] = "PowerNotification"; | 33 // const char kPowerNotification[] = "PowerNotification"; |
37 // const char kDataActiveNotification[] = "DataActiveNotification"; | 34 // const char kDataActiveNotification[] = "DataActiveNotification"; |
38 | 35 |
39 // The number of unacked messages to allow before sending a stream ack. | 36 // The number of unacked messages to allow before sending a stream ack. |
40 // Applies to both incoming and outgoing messages. | 37 // Applies to both incoming and outgoing messages. |
41 // TODO(zea): make this server configurable. | 38 // TODO(zea): make this server configurable. |
42 const int kUnackedMessageBeforeStreamAck = 10; | 39 const int kUnackedMessageBeforeStreamAck = 10; |
43 | 40 |
44 // The global maximum number of pending messages to have in the send queue. | 41 // The global maximum number of pending messages to have in the send queue. |
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
90 : state_(UNINITIALIZED), | 87 : state_(UNINITIALIZED), |
91 android_id_(0), | 88 android_id_(0), |
92 security_token_(0), | 89 security_token_(0), |
93 connection_factory_(connection_factory), | 90 connection_factory_(connection_factory), |
94 connection_handler_(NULL), | 91 connection_handler_(NULL), |
95 last_device_to_server_stream_id_received_(0), | 92 last_device_to_server_stream_id_received_(0), |
96 last_server_to_device_stream_id_received_(0), | 93 last_server_to_device_stream_id_received_(0), |
97 stream_id_out_(0), | 94 stream_id_out_(0), |
98 stream_id_in_(0), | 95 stream_id_in_(0), |
99 rmq_store_(rmq_store), | 96 rmq_store_(rmq_store), |
100 heartbeat_interval_( | |
101 base::TimeDelta::FromSeconds(kHeartbeatDefaultSeconds)), | |
102 heartbeat_timer_(true, true), | |
103 weak_ptr_factory_(this) { | 97 weak_ptr_factory_(this) { |
104 } | 98 } |
105 | 99 |
106 MCSClient::~MCSClient() { | 100 MCSClient::~MCSClient() { |
107 } | 101 } |
108 | 102 |
109 void MCSClient::Initialize( | 103 void MCSClient::Initialize( |
110 const InitializationCompleteCallback& initialization_callback, | 104 const InitializationCompleteCallback& initialization_callback, |
111 const OnMessageReceivedCallback& message_received_callback, | 105 const OnMessageReceivedCallback& message_received_callback, |
112 const OnMessageSentCallback& message_sent_callback, | 106 const OnMessageSentCallback& message_sent_callback, |
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
176 packet_info->persistent_id = base::Uint64ToString(iter->first); | 170 packet_info->persistent_id = base::Uint64ToString(iter->first); |
177 to_send_.push_back(make_linked_ptr(packet_info)); | 171 to_send_.push_back(make_linked_ptr(packet_info)); |
178 } | 172 } |
179 | 173 |
180 // TODO(fgorski): that is likely the only place where the initialization | 174 // TODO(fgorski): that is likely the only place where the initialization |
181 // callback could be used. | 175 // callback could be used. |
182 initialization_callback_.Run(true, android_id_, security_token_); | 176 initialization_callback_.Run(true, android_id_, security_token_); |
183 } | 177 } |
184 | 178 |
185 void MCSClient::Login(uint64 android_id, uint64 security_token) { | 179 void MCSClient::Login(uint64 android_id, uint64 security_token) { |
186 DCHECK_EQ(state_, LOADED); | |
187 if (android_id != android_id_ && security_token != security_token_) { | 180 if (android_id != android_id_ && security_token != security_token_) { |
188 DCHECK(android_id); | 181 DCHECK(android_id); |
189 DCHECK(security_token); | 182 DCHECK(security_token); |
190 DCHECK(restored_unackeds_server_ids_.empty()); | 183 DCHECK(restored_unackeds_server_ids_.empty()); |
191 android_id_ = android_id; | 184 android_id_ = android_id; |
192 security_token_ = security_token; | 185 security_token_ = security_token; |
193 rmq_store_->SetDeviceCredentials( | 186 rmq_store_->SetDeviceCredentials( |
194 android_id_, | 187 android_id_, |
195 security_token_, | 188 security_token_, |
196 base::Bind(&MCSClient::OnRMQUpdateFinished, | 189 base::Bind(&MCSClient::OnRMQUpdateFinished, |
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
250 | 243 |
251 void MCSClient::ResetStateAndBuildLoginRequest( | 244 void MCSClient::ResetStateAndBuildLoginRequest( |
252 mcs_proto::LoginRequest* request) { | 245 mcs_proto::LoginRequest* request) { |
253 DCHECK(android_id_); | 246 DCHECK(android_id_); |
254 DCHECK(security_token_); | 247 DCHECK(security_token_); |
255 stream_id_in_ = 0; | 248 stream_id_in_ = 0; |
256 stream_id_out_ = 1; | 249 stream_id_out_ = 1; |
257 last_device_to_server_stream_id_received_ = 0; | 250 last_device_to_server_stream_id_received_ = 0; |
258 last_server_to_device_stream_id_received_ = 0; | 251 last_server_to_device_stream_id_received_ = 0; |
259 | 252 |
| 253 heartbeat_manager_.Stop(); |
| 254 |
260 // TODO(zea): expire all messages older than their TTL. | 255 // TODO(zea): expire all messages older than their TTL. |
261 | 256 |
262 // Add any pending acknowledgments to the list of ids. | 257 // Add any pending acknowledgments to the list of ids. |
263 for (StreamIdToPersistentIdMap::const_iterator iter = | 258 for (StreamIdToPersistentIdMap::const_iterator iter = |
264 unacked_server_ids_.begin(); | 259 unacked_server_ids_.begin(); |
265 iter != unacked_server_ids_.end(); ++iter) { | 260 iter != unacked_server_ids_.end(); ++iter) { |
266 restored_unackeds_server_ids_.push_back(iter->second); | 261 restored_unackeds_server_ids_.push_back(iter->second); |
267 } | 262 } |
268 unacked_server_ids_.clear(); | 263 unacked_server_ids_.clear(); |
269 | 264 |
(...skipping 22 matching lines...) Expand all Loading... |
292 // to RMQ, as all messages that reach this point should already have been | 287 // to RMQ, as all messages that reach this point should already have been |
293 // saved as necessary. | 288 // saved as necessary. |
294 while (!to_resend_.empty()) { | 289 while (!to_resend_.empty()) { |
295 to_send_.push_front(to_resend_.back()); | 290 to_send_.push_front(to_resend_.back()); |
296 to_resend_.pop_back(); | 291 to_resend_.pop_back(); |
297 } | 292 } |
298 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size() | 293 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size() |
299 << " incoming acks pending, and " << to_send_.size() | 294 << " incoming acks pending, and " << to_send_.size() |
300 << " pending outgoing messages."; | 295 << " pending outgoing messages."; |
301 | 296 |
302 heartbeat_timer_.Stop(); | |
303 | |
304 state_ = CONNECTING; | 297 state_ = CONNECTING; |
305 } | 298 } |
306 | 299 |
307 void MCSClient::SendHeartbeat() { | 300 void MCSClient::SendHeartbeat() { |
308 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()), | 301 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()), |
309 false); | 302 false); |
310 } | 303 } |
311 | 304 |
312 void MCSClient::OnRMQUpdateFinished(bool success) { | 305 void MCSClient::OnRMQUpdateFinished(bool success) { |
313 LOG_IF(ERROR, !success) << "RMQ Update failed!"; | 306 LOG_IF(ERROR, !success) << "RMQ Update failed!"; |
(...skipping 11 matching lines...) Expand all Loading... |
325 | 318 |
326 DVLOG(1) << "Pending output message found, sending."; | 319 DVLOG(1) << "Pending output message found, sending."; |
327 MCSPacketInternal packet = to_send_.front(); | 320 MCSPacketInternal packet = to_send_.front(); |
328 to_send_.pop_front(); | 321 to_send_.pop_front(); |
329 if (!packet->persistent_id.empty()) | 322 if (!packet->persistent_id.empty()) |
330 to_resend_.push_back(packet); | 323 to_resend_.push_back(packet); |
331 SendPacketToWire(packet.get()); | 324 SendPacketToWire(packet.get()); |
332 } | 325 } |
333 | 326 |
334 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) { | 327 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) { |
335 // Reset the heartbeat interval. | |
336 heartbeat_timer_.Reset(); | |
337 packet_info->stream_id = ++stream_id_out_; | 328 packet_info->stream_id = ++stream_id_out_; |
338 DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName(); | 329 DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName(); |
339 | 330 |
340 // Set the proper last received stream id to acknowledge received server | 331 // Set the proper last received stream id to acknowledge received server |
341 // packets. | 332 // packets. |
342 DVLOG(1) << "Setting last stream id received to " | 333 DVLOG(1) << "Setting last stream id received to " |
343 << stream_id_in_; | 334 << stream_id_in_; |
344 SetLastStreamIdReceived(stream_id_in_, | 335 SetLastStreamIdReceived(stream_id_in_, |
345 packet_info->protobuf.get()); | 336 packet_info->protobuf.get()); |
346 if (stream_id_in_ != last_server_to_device_stream_id_received_) { | 337 if (stream_id_in_ != last_server_to_device_stream_id_received_) { |
(...skipping 93 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
440 << last_stream_id_received; | 431 << last_stream_id_received; |
441 | 432 |
442 if (unacked_server_ids_.size() > 0 && | 433 if (unacked_server_ids_.size() > 0 && |
443 unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) { | 434 unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) { |
444 SendMessage(MCSMessage(kIqStanzaTag, | 435 SendMessage(MCSMessage(kIqStanzaTag, |
445 BuildStreamAck(). | 436 BuildStreamAck(). |
446 PassAs<const google::protobuf::MessageLite>()), | 437 PassAs<const google::protobuf::MessageLite>()), |
447 false); | 438 false); |
448 } | 439 } |
449 | 440 |
| 441 // The connection is alive, treat this message as a heartbeat ack. |
| 442 heartbeat_manager_.OnHeartbeatAcked(); |
| 443 |
450 switch (tag) { | 444 switch (tag) { |
451 case kLoginResponseTag: { | 445 case kLoginResponseTag: { |
452 DCHECK_EQ(CONNECTING, state_); | 446 DCHECK_EQ(CONNECTING, state_); |
453 mcs_proto::LoginResponse* login_response = | 447 mcs_proto::LoginResponse* login_response = |
454 reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get()); | 448 reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get()); |
455 DVLOG(1) << "Received login response:"; | 449 DVLOG(1) << "Received login response:"; |
456 DVLOG(1) << " Id: " << login_response->id(); | 450 DVLOG(1) << " Id: " << login_response->id(); |
457 DVLOG(1) << " Timestamp: " << login_response->server_timestamp(); | 451 DVLOG(1) << " Timestamp: " << login_response->server_timestamp(); |
458 if (login_response->has_error()) { | 452 if (login_response->has_error() && login_response->error().code() != 0) { |
459 state_ = UNINITIALIZED; | 453 state_ = UNINITIALIZED; |
460 DVLOG(1) << " Error code: " << login_response->error().code(); | 454 DVLOG(1) << " Error code: " << login_response->error().code(); |
461 DVLOG(1) << " Error message: " << login_response->error().message(); | 455 DVLOG(1) << " Error message: " << login_response->error().message(); |
462 initialization_callback_.Run(false, 0, 0); | 456 initialization_callback_.Run(false, 0, 0); |
463 return; | 457 return; |
464 } | 458 } |
465 | 459 |
| 460 if (login_response->has_heartbeat_config()) { |
| 461 heartbeat_manager_.UpdateHeartbeatConfig( |
| 462 login_response->heartbeat_config()); |
| 463 } |
| 464 |
466 state_ = CONNECTED; | 465 state_ = CONNECTED; |
467 stream_id_in_ = 1; // To account for the login response. | 466 stream_id_in_ = 1; // To account for the login response. |
468 DCHECK_EQ(1U, stream_id_out_); | 467 DCHECK_EQ(1U, stream_id_out_); |
469 | 468 |
470 // Pass the login response on up. | 469 // Pass the login response on up. |
471 base::MessageLoop::current()->PostTask( | 470 base::MessageLoop::current()->PostTask( |
472 FROM_HERE, | 471 FROM_HERE, |
473 base::Bind(message_received_callback_, | 472 base::Bind(message_received_callback_, |
474 MCSMessage(tag, | 473 MCSMessage(tag, |
475 protobuf.PassAs< | 474 protobuf.PassAs< |
476 const google::protobuf::MessageLite>()))); | 475 const google::protobuf::MessageLite>()))); |
477 | 476 |
478 // If there are pending messages, attempt to send one. | 477 // If there are pending messages, attempt to send one. |
479 if (!to_send_.empty()) { | 478 if (!to_send_.empty()) { |
480 base::MessageLoop::current()->PostTask( | 479 base::MessageLoop::current()->PostTask( |
481 FROM_HERE, | 480 FROM_HERE, |
482 base::Bind(&MCSClient::MaybeSendMessage, | 481 base::Bind(&MCSClient::MaybeSendMessage, |
483 weak_ptr_factory_.GetWeakPtr())); | 482 weak_ptr_factory_.GetWeakPtr())); |
484 } | 483 } |
485 | 484 |
486 heartbeat_timer_.Start(FROM_HERE, | 485 heartbeat_manager_.Start( |
487 heartbeat_interval_, | 486 base::Bind(&MCSClient::SendHeartbeat, |
488 base::Bind(&MCSClient::SendHeartbeat, | 487 weak_ptr_factory_.GetWeakPtr()), |
489 weak_ptr_factory_.GetWeakPtr())); | 488 base::Bind(&MCSClient::OnConnectionResetByHeartbeat, |
| 489 weak_ptr_factory_.GetWeakPtr())); |
490 return; | 490 return; |
491 } | 491 } |
492 case kHeartbeatPingTag: | 492 case kHeartbeatPingTag: |
493 DCHECK_GE(stream_id_in_, 1U); | 493 DCHECK_GE(stream_id_in_, 1U); |
494 DVLOG(1) << "Received heartbeat ping, sending ack."; | 494 DVLOG(1) << "Received heartbeat ping, sending ack."; |
495 SendMessage( | 495 SendMessage( |
496 MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()), false); | 496 MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()), false); |
497 return; | 497 return; |
498 case kHeartbeatAckTag: | 498 case kHeartbeatAckTag: |
499 DCHECK_GE(stream_id_in_, 1U); | 499 DCHECK_GE(stream_id_in_, 1U); |
500 DVLOG(1) << "Received heartbeat ack."; | 500 DVLOG(1) << "Received heartbeat ack."; |
501 // TODO(zea): add logic to reconnect if no ack received within a certain | 501 // Do nothing else, all messages act as heartbeat acks. |
502 // timeout (with backoff). | |
503 return; | 502 return; |
504 case kCloseTag: | 503 case kCloseTag: |
505 LOG(ERROR) << "Received close command, resetting connection."; | 504 LOG(ERROR) << "Received close command, resetting connection."; |
506 state_ = LOADED; | 505 state_ = LOADED; |
507 connection_factory_->SignalConnectionReset(); | 506 connection_factory_->SignalConnectionReset(); |
508 return; | 507 return; |
509 case kIqStanzaTag: { | 508 case kIqStanzaTag: { |
510 DCHECK_GE(stream_id_in_, 1U); | 509 DCHECK_GE(stream_id_in_, 1U); |
511 mcs_proto::IqStanza* iq_stanza = | 510 mcs_proto::IqStanza* iq_stanza = |
512 reinterpret_cast<mcs_proto::IqStanza*>(protobuf.get()); | 511 reinterpret_cast<mcs_proto::IqStanza*>(protobuf.get()); |
(...skipping 137 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
650 rmq_store_->RemoveIncomingMessages( | 649 rmq_store_->RemoveIncomingMessages( |
651 acked_incoming_ids, | 650 acked_incoming_ids, |
652 base::Bind(&MCSClient::OnRMQUpdateFinished, | 651 base::Bind(&MCSClient::OnRMQUpdateFinished, |
653 weak_ptr_factory_.GetWeakPtr())); | 652 weak_ptr_factory_.GetWeakPtr())); |
654 } | 653 } |
655 | 654 |
656 MCSClient::PersistentId MCSClient::GetNextPersistentId() { | 655 MCSClient::PersistentId MCSClient::GetNextPersistentId() { |
657 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue()); | 656 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue()); |
658 } | 657 } |
659 | 658 |
| 659 void MCSClient::OnConnectionResetByHeartbeat() { |
| 660 connection_factory_->SignalConnectionReset(); |
| 661 } |
| 662 |
660 } // namespace gcm | 663 } // namespace gcm |
OLD | NEW |