Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(73)

Side by Side Diff: google_apis/gcm/engine/mcs_client.cc

Issue 118133003: [GCM] Add heartbeat manager and reconnection logic due to heartbeat failure (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 7 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "google_apis/gcm/engine/mcs_client.h" 5 #include "google_apis/gcm/engine/mcs_client.h"
6 6
7 #include "base/basictypes.h" 7 #include "base/basictypes.h"
8 #include "base/message_loop/message_loop.h" 8 #include "base/message_loop/message_loop.h"
9 #include "base/strings/string_number_conversions.h" 9 #include "base/strings/string_number_conversions.h"
10 #include "google_apis/gcm/base/mcs_util.h" 10 #include "google_apis/gcm/base/mcs_util.h"
11 #include "google_apis/gcm/base/socket_stream.h" 11 #include "google_apis/gcm/base/socket_stream.h"
12 #include "google_apis/gcm/engine/connection_factory.h" 12 #include "google_apis/gcm/engine/connection_factory.h"
13 #include "google_apis/gcm/engine/rmq_store.h" 13 #include "google_apis/gcm/engine/rmq_store.h"
14 14
15 using namespace google::protobuf::io; 15 using namespace google::protobuf::io;
16 16
17 namespace gcm { 17 namespace gcm {
18 18
19 namespace { 19 namespace {
20 20
21 typedef scoped_ptr<google::protobuf::MessageLite> MCSProto; 21 typedef scoped_ptr<google::protobuf::MessageLite> MCSProto;
22 22
23 // TODO(zea): get these values from MCS settings.
24 const int64 kHeartbeatDefaultSeconds = 60 * 15; // 15 minutes.
25
26 // The category of messages intended for the GCM client itself from MCS. 23 // The category of messages intended for the GCM client itself from MCS.
27 const char kMCSCategory[] = "com.google.android.gsf.gtalkservice"; 24 const char kMCSCategory[] = "com.google.android.gsf.gtalkservice";
28 25
29 // The from field for messages originating in the GCM client. 26 // The from field for messages originating in the GCM client.
30 const char kGCMFromField[] = "gcm@android.com"; 27 const char kGCMFromField[] = "gcm@android.com";
31 28
32 // MCS status message types. 29 // MCS status message types.
30 // TODO(zea): handle these at the GCMClient layer.
fgorski 2013/12/19 23:55:13 What exactly do you mean?
Nicolas Zea 2013/12/20 22:52:56 System status settings like these should be handle
33 const char kIdleNotification[] = "IdleNotification"; 31 const char kIdleNotification[] = "IdleNotification";
34 // TODO(zea): consume the following message types:
35 // const char kAlwaysShowOnIdle[] = "ShowAwayOnIdle"; 32 // const char kAlwaysShowOnIdle[] = "ShowAwayOnIdle";
36 // const char kPowerNotification[] = "PowerNotification"; 33 // const char kPowerNotification[] = "PowerNotification";
37 // const char kDataActiveNotification[] = "DataActiveNotification"; 34 // const char kDataActiveNotification[] = "DataActiveNotification";
38 35
39 // The number of unacked messages to allow before sending a stream ack. 36 // The number of unacked messages to allow before sending a stream ack.
40 // Applies to both incoming and outgoing messages. 37 // Applies to both incoming and outgoing messages.
41 // TODO(zea): make this server configurable. 38 // TODO(zea): make this server configurable.
42 const int kUnackedMessageBeforeStreamAck = 10; 39 const int kUnackedMessageBeforeStreamAck = 10;
43 40
44 // The global maximum number of pending messages to have in the send queue. 41 // The global maximum number of pending messages to have in the send queue.
(...skipping 48 matching lines...) Expand 10 before | Expand all | Expand 10 after
93 : state_(UNINITIALIZED), 90 : state_(UNINITIALIZED),
94 android_id_(0), 91 android_id_(0),
95 security_token_(0), 92 security_token_(0),
96 connection_factory_(connection_factory), 93 connection_factory_(connection_factory),
97 connection_handler_(NULL), 94 connection_handler_(NULL),
98 last_device_to_server_stream_id_received_(0), 95 last_device_to_server_stream_id_received_(0),
99 last_server_to_device_stream_id_received_(0), 96 last_server_to_device_stream_id_received_(0),
100 stream_id_out_(0), 97 stream_id_out_(0),
101 stream_id_in_(0), 98 stream_id_in_(0),
102 rmq_store_(rmq_path, blocking_task_runner), 99 rmq_store_(rmq_path, blocking_task_runner),
103 heartbeat_interval_(
104 base::TimeDelta::FromSeconds(kHeartbeatDefaultSeconds)),
105 heartbeat_timer_(true, true),
106 blocking_task_runner_(blocking_task_runner), 100 blocking_task_runner_(blocking_task_runner),
107 weak_ptr_factory_(this) { 101 weak_ptr_factory_(this) {
108 } 102 }
109 103
110 MCSClient::~MCSClient() { 104 MCSClient::~MCSClient() {
111 } 105 }
112 106
113 void MCSClient::Initialize( 107 void MCSClient::Initialize(
114 const InitializationCompleteCallback& initialization_callback, 108 const InitializationCompleteCallback& initialization_callback,
115 const OnMessageReceivedCallback& message_received_callback, 109 const OnMessageReceivedCallback& message_received_callback,
(...skipping 11 matching lines...) Expand all
127 base::Bind(&MCSClient::ResetStateAndBuildLoginRequest, 121 base::Bind(&MCSClient::ResetStateAndBuildLoginRequest,
128 weak_ptr_factory_.GetWeakPtr()), 122 weak_ptr_factory_.GetWeakPtr()),
129 base::Bind(&MCSClient::HandlePacketFromWire, 123 base::Bind(&MCSClient::HandlePacketFromWire,
130 weak_ptr_factory_.GetWeakPtr()), 124 weak_ptr_factory_.GetWeakPtr()),
131 base::Bind(&MCSClient::MaybeSendMessage, 125 base::Bind(&MCSClient::MaybeSendMessage,
132 weak_ptr_factory_.GetWeakPtr())); 126 weak_ptr_factory_.GetWeakPtr()));
133 connection_handler_ = connection_factory_->GetConnectionHandler(); 127 connection_handler_ = connection_factory_->GetConnectionHandler();
134 } 128 }
135 129
136 void MCSClient::Login(uint64 android_id, uint64 security_token) { 130 void MCSClient::Login(uint64 android_id, uint64 security_token) {
137 DCHECK_EQ(state_, LOADED);
138 if (android_id != android_id_ && security_token != security_token_) { 131 if (android_id != android_id_ && security_token != security_token_) {
139 DCHECK(android_id); 132 DCHECK(android_id);
140 DCHECK(security_token); 133 DCHECK(security_token);
141 DCHECK(restored_unackeds_server_ids_.empty()); 134 DCHECK(restored_unackeds_server_ids_.empty());
142 android_id_ = android_id; 135 android_id_ = android_id;
143 security_token_ = security_token; 136 security_token_ = security_token;
144 rmq_store_.SetDeviceCredentials(android_id_, 137 rmq_store_.SetDeviceCredentials(android_id_,
145 security_token_, 138 security_token_,
146 base::Bind(&MCSClient::OnRMQUpdateFinished, 139 base::Bind(&MCSClient::OnRMQUpdateFinished,
147 weak_ptr_factory_.GetWeakPtr())); 140 weak_ptr_factory_.GetWeakPtr()));
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after
200 193
201 void MCSClient::ResetStateAndBuildLoginRequest( 194 void MCSClient::ResetStateAndBuildLoginRequest(
202 mcs_proto::LoginRequest* request) { 195 mcs_proto::LoginRequest* request) {
203 DCHECK(android_id_); 196 DCHECK(android_id_);
204 DCHECK(security_token_); 197 DCHECK(security_token_);
205 stream_id_in_ = 0; 198 stream_id_in_ = 0;
206 stream_id_out_ = 1; 199 stream_id_out_ = 1;
207 last_device_to_server_stream_id_received_ = 0; 200 last_device_to_server_stream_id_received_ = 0;
208 last_server_to_device_stream_id_received_ = 0; 201 last_server_to_device_stream_id_received_ = 0;
209 202
203 heartbeat_manager_.Stop();
204
210 // TODO(zea): expire all messages older than their TTL. 205 // TODO(zea): expire all messages older than their TTL.
211 206
212 // Add any pending acknowledgments to the list of ids. 207 // Add any pending acknowledgments to the list of ids.
213 for (StreamIdToPersistentIdMap::const_iterator iter = 208 for (StreamIdToPersistentIdMap::const_iterator iter =
214 unacked_server_ids_.begin(); 209 unacked_server_ids_.begin();
215 iter != unacked_server_ids_.end(); ++iter) { 210 iter != unacked_server_ids_.end(); ++iter) {
216 restored_unackeds_server_ids_.push_back(iter->second); 211 restored_unackeds_server_ids_.push_back(iter->second);
217 } 212 }
218 unacked_server_ids_.clear(); 213 unacked_server_ids_.clear();
219 214
(...skipping 22 matching lines...) Expand all
242 // to RMQ, as all messages that reach this point should already have been 237 // to RMQ, as all messages that reach this point should already have been
243 // saved as necessary. 238 // saved as necessary.
244 while (!to_resend_.empty()) { 239 while (!to_resend_.empty()) {
245 to_send_.push_front(to_resend_.back()); 240 to_send_.push_front(to_resend_.back());
246 to_resend_.pop_back(); 241 to_resend_.pop_back();
247 } 242 }
248 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size() 243 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size()
249 << " incoming acks pending, and " << to_send_.size() 244 << " incoming acks pending, and " << to_send_.size()
250 << " pending outgoing messages."; 245 << " pending outgoing messages.";
251 246
252 heartbeat_timer_.Stop();
253
254 state_ = CONNECTING; 247 state_ = CONNECTING;
255 } 248 }
256 249
257 void MCSClient::SendHeartbeat() { 250 void MCSClient::SendHeartbeat() {
258 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()), 251 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()),
259 false); 252 false);
260 } 253 }
261 254
262 void MCSClient::OnRMQLoadFinished(const RMQStore::LoadResult& result) { 255 void MCSClient::OnRMQLoadFinished(const RMQStore::LoadResult& result) {
263 if (!result.success) { 256 if (!result.success) {
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after
327 320
328 DVLOG(1) << "Pending output message found, sending."; 321 DVLOG(1) << "Pending output message found, sending.";
329 MCSPacketInternal packet = to_send_.front(); 322 MCSPacketInternal packet = to_send_.front();
330 to_send_.pop_front(); 323 to_send_.pop_front();
331 if (!packet->persistent_id.empty()) 324 if (!packet->persistent_id.empty())
332 to_resend_.push_back(packet); 325 to_resend_.push_back(packet);
333 SendPacketToWire(packet.get()); 326 SendPacketToWire(packet.get());
334 } 327 }
335 328
336 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) { 329 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) {
337 // Reset the heartbeat interval.
338 heartbeat_timer_.Reset();
339 packet_info->stream_id = ++stream_id_out_; 330 packet_info->stream_id = ++stream_id_out_;
340 DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName(); 331 DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName();
341 332
342 // Set the proper last received stream id to acknowledge received server 333 // Set the proper last received stream id to acknowledge received server
343 // packets. 334 // packets.
344 DVLOG(1) << "Setting last stream id received to " 335 DVLOG(1) << "Setting last stream id received to "
345 << stream_id_in_; 336 << stream_id_in_;
346 SetLastStreamIdReceived(stream_id_in_, 337 SetLastStreamIdReceived(stream_id_in_,
347 packet_info->protobuf.get()); 338 packet_info->protobuf.get());
348 if (stream_id_in_ != last_server_to_device_stream_id_received_) { 339 if (stream_id_in_ != last_server_to_device_stream_id_received_) {
(...skipping 93 matching lines...) Expand 10 before | Expand all | Expand 10 after
442 << last_stream_id_received; 433 << last_stream_id_received;
443 434
444 if (unacked_server_ids_.size() > 0 && 435 if (unacked_server_ids_.size() > 0 &&
445 unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) { 436 unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) {
446 SendMessage(MCSMessage(kIqStanzaTag, 437 SendMessage(MCSMessage(kIqStanzaTag,
447 BuildStreamAck(). 438 BuildStreamAck().
448 PassAs<const google::protobuf::MessageLite>()), 439 PassAs<const google::protobuf::MessageLite>()),
449 false); 440 false);
450 } 441 }
451 442
443 // The connection is alive, treat this message as a heartbeat ack.
444 heartbeat_manager_.OnHeartbeatAcked();
fgorski 2013/12/19 23:55:13 Will that work if the manager hasn't been started
Nicolas Zea 2013/12/20 22:52:56 Yep, OnHeartbeatAcked does nothing if not running.
445
452 switch (tag) { 446 switch (tag) {
453 case kLoginResponseTag: { 447 case kLoginResponseTag: {
454 DCHECK_EQ(CONNECTING, state_); 448 DCHECK_EQ(CONNECTING, state_);
455 mcs_proto::LoginResponse* login_response = 449 mcs_proto::LoginResponse* login_response =
456 reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get()); 450 reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get());
457 DVLOG(1) << "Received login response:"; 451 DVLOG(1) << "Received login response:";
458 DVLOG(1) << " Id: " << login_response->id(); 452 DVLOG(1) << " Id: " << login_response->id();
459 DVLOG(1) << " Timestamp: " << login_response->server_timestamp(); 453 DVLOG(1) << " Timestamp: " << login_response->server_timestamp();
460 if (login_response->has_error()) { 454 if (login_response->has_error() && login_response->error().code() != 0) {
461 state_ = UNINITIALIZED; 455 state_ = UNINITIALIZED;
462 DVLOG(1) << " Error code: " << login_response->error().code(); 456 DVLOG(1) << " Error code: " << login_response->error().code();
463 DVLOG(1) << " Error message: " << login_response->error().message(); 457 DVLOG(1) << " Error message: " << login_response->error().message();
464 initialization_callback_.Run(false, 0, 0); 458 initialization_callback_.Run(false, 0, 0);
465 return; 459 return;
466 } 460 }
467 461
462 if (login_response->has_heartbeat_config()) {
463 heartbeat_manager_.UpdateHeartbeatConfig(
464 login_response->heartbeat_config());
465 }
466
468 state_ = CONNECTED; 467 state_ = CONNECTED;
469 stream_id_in_ = 1; // To account for the login response. 468 stream_id_in_ = 1; // To account for the login response.
470 DCHECK_EQ(1U, stream_id_out_); 469 DCHECK_EQ(1U, stream_id_out_);
471 470
472 // Pass the login response on up. 471 // Pass the login response on up.
473 base::MessageLoop::current()->PostTask( 472 base::MessageLoop::current()->PostTask(
474 FROM_HERE, 473 FROM_HERE,
475 base::Bind(message_received_callback_, 474 base::Bind(message_received_callback_,
476 MCSMessage(tag, 475 MCSMessage(tag,
477 protobuf.PassAs< 476 protobuf.PassAs<
478 const google::protobuf::MessageLite>()))); 477 const google::protobuf::MessageLite>())));
479 478
480 // If there are pending messages, attempt to send one. 479 // If there are pending messages, attempt to send one.
481 if (!to_send_.empty()) { 480 if (!to_send_.empty()) {
482 base::MessageLoop::current()->PostTask( 481 base::MessageLoop::current()->PostTask(
483 FROM_HERE, 482 FROM_HERE,
484 base::Bind(&MCSClient::MaybeSendMessage, 483 base::Bind(&MCSClient::MaybeSendMessage,
485 weak_ptr_factory_.GetWeakPtr())); 484 weak_ptr_factory_.GetWeakPtr()));
486 } 485 }
487 486
488 heartbeat_timer_.Start(FROM_HERE, 487 heartbeat_manager_.Start(
489 heartbeat_interval_, 488 base::Bind(&MCSClient::SendHeartbeat,
490 base::Bind(&MCSClient::SendHeartbeat, 489 weak_ptr_factory_.GetWeakPtr()),
491 weak_ptr_factory_.GetWeakPtr())); 490 base::Bind(&MCSClient::OnConnectionResetByHeartbeat,
491 weak_ptr_factory_.GetWeakPtr()));
492 return; 492 return;
493 } 493 }
494 case kHeartbeatPingTag: 494 case kHeartbeatPingTag:
495 DCHECK_GE(stream_id_in_, 1U); 495 DCHECK_GE(stream_id_in_, 1U);
496 DVLOG(1) << "Received heartbeat ping, sending ack."; 496 DVLOG(1) << "Received heartbeat ping, sending ack.";
497 SendMessage( 497 SendMessage(
498 MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()), false); 498 MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()), false);
499 return; 499 return;
500 case kHeartbeatAckTag: 500 case kHeartbeatAckTag:
501 DCHECK_GE(stream_id_in_, 1U); 501 DCHECK_GE(stream_id_in_, 1U);
502 DVLOG(1) << "Received heartbeat ack."; 502 DVLOG(1) << "Received heartbeat ack.";
503 // TODO(zea): add logic to reconnect if no ack received within a certain 503 // Do nothing else, all messages act as heartbeat acks.
504 // timeout (with backoff).
505 return; 504 return;
506 case kCloseTag: 505 case kCloseTag:
507 LOG(ERROR) << "Received close command, resetting connection."; 506 LOG(ERROR) << "Received close command, resetting connection.";
508 state_ = LOADED; 507 state_ = LOADED;
509 connection_factory_->SignalConnectionReset(); 508 connection_factory_->SignalConnectionReset();
510 return; 509 return;
511 case kIqStanzaTag: { 510 case kIqStanzaTag: {
512 DCHECK_GE(stream_id_in_, 1U); 511 DCHECK_GE(stream_id_in_, 1U);
513 mcs_proto::IqStanza* iq_stanza = 512 mcs_proto::IqStanza* iq_stanza =
514 reinterpret_cast<mcs_proto::IqStanza*>(protobuf.get()); 513 reinterpret_cast<mcs_proto::IqStanza*>(protobuf.get());
(...skipping 134 matching lines...) Expand 10 before | Expand all | Expand 10 after
649 << " acknowledged server messages."; 648 << " acknowledged server messages.";
650 rmq_store_.RemoveIncomingMessages(acked_incoming_ids, 649 rmq_store_.RemoveIncomingMessages(acked_incoming_ids,
651 base::Bind(&MCSClient::OnRMQUpdateFinished, 650 base::Bind(&MCSClient::OnRMQUpdateFinished,
652 weak_ptr_factory_.GetWeakPtr())); 651 weak_ptr_factory_.GetWeakPtr()));
653 } 652 }
654 653
655 MCSClient::PersistentId MCSClient::GetNextPersistentId() { 654 MCSClient::PersistentId MCSClient::GetNextPersistentId() {
656 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue()); 655 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue());
657 } 656 }
658 657
658 void MCSClient::OnConnectionResetByHeartbeat() {
659 connection_factory_->SignalConnectionReset();
660 }
661
659 } // namespace gcm 662 } // namespace gcm
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698