Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(571)

Side by Side Diff: google_apis/gcm/engine/mcs_client.cc

Issue 118133003: [GCM] Add heartbeat manager and reconnection logic due to heartbeat failure (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Unrevert fixes Created 6 years, 12 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « google_apis/gcm/engine/mcs_client.h ('k') | google_apis/gcm/gcm.gyp » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "google_apis/gcm/engine/mcs_client.h" 5 #include "google_apis/gcm/engine/mcs_client.h"
6 6
7 #include "base/basictypes.h" 7 #include "base/basictypes.h"
8 #include "base/message_loop/message_loop.h" 8 #include "base/message_loop/message_loop.h"
9 #include "base/strings/string_number_conversions.h" 9 #include "base/strings/string_number_conversions.h"
10 #include "google_apis/gcm/base/mcs_util.h" 10 #include "google_apis/gcm/base/mcs_util.h"
11 #include "google_apis/gcm/base/socket_stream.h" 11 #include "google_apis/gcm/base/socket_stream.h"
12 #include "google_apis/gcm/engine/connection_factory.h" 12 #include "google_apis/gcm/engine/connection_factory.h"
13 #include "google_apis/gcm/engine/rmq_store.h" 13 #include "google_apis/gcm/engine/rmq_store.h"
14 14
15 using namespace google::protobuf::io; 15 using namespace google::protobuf::io;
16 16
17 namespace gcm { 17 namespace gcm {
18 18
19 namespace { 19 namespace {
20 20
21 typedef scoped_ptr<google::protobuf::MessageLite> MCSProto; 21 typedef scoped_ptr<google::protobuf::MessageLite> MCSProto;
22 22
23 // TODO(zea): get these values from MCS settings.
24 const int64 kHeartbeatDefaultSeconds = 60 * 15; // 15 minutes.
25
26 // The category of messages intended for the GCM client itself from MCS. 23 // The category of messages intended for the GCM client itself from MCS.
27 const char kMCSCategory[] = "com.google.android.gsf.gtalkservice"; 24 const char kMCSCategory[] = "com.google.android.gsf.gtalkservice";
28 25
29 // The from field for messages originating in the GCM client. 26 // The from field for messages originating in the GCM client.
30 const char kGCMFromField[] = "gcm@android.com"; 27 const char kGCMFromField[] = "gcm@android.com";
31 28
32 // MCS status message types. 29 // MCS status message types.
30 // TODO(zea): handle these at the GCMClient layer.
33 const char kIdleNotification[] = "IdleNotification"; 31 const char kIdleNotification[] = "IdleNotification";
34 // TODO(zea): consume the following message types:
35 // const char kAlwaysShowOnIdle[] = "ShowAwayOnIdle"; 32 // const char kAlwaysShowOnIdle[] = "ShowAwayOnIdle";
36 // const char kPowerNotification[] = "PowerNotification"; 33 // const char kPowerNotification[] = "PowerNotification";
37 // const char kDataActiveNotification[] = "DataActiveNotification"; 34 // const char kDataActiveNotification[] = "DataActiveNotification";
38 35
39 // The number of unacked messages to allow before sending a stream ack. 36 // The number of unacked messages to allow before sending a stream ack.
40 // Applies to both incoming and outgoing messages. 37 // Applies to both incoming and outgoing messages.
41 // TODO(zea): make this server configurable. 38 // TODO(zea): make this server configurable.
42 const int kUnackedMessageBeforeStreamAck = 10; 39 const int kUnackedMessageBeforeStreamAck = 10;
43 40
44 // The global maximum number of pending messages to have in the send queue. 41 // The global maximum number of pending messages to have in the send queue.
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
90 : state_(UNINITIALIZED), 87 : state_(UNINITIALIZED),
91 android_id_(0), 88 android_id_(0),
92 security_token_(0), 89 security_token_(0),
93 connection_factory_(connection_factory), 90 connection_factory_(connection_factory),
94 connection_handler_(NULL), 91 connection_handler_(NULL),
95 last_device_to_server_stream_id_received_(0), 92 last_device_to_server_stream_id_received_(0),
96 last_server_to_device_stream_id_received_(0), 93 last_server_to_device_stream_id_received_(0),
97 stream_id_out_(0), 94 stream_id_out_(0),
98 stream_id_in_(0), 95 stream_id_in_(0),
99 rmq_store_(rmq_store), 96 rmq_store_(rmq_store),
100 heartbeat_interval_(
101 base::TimeDelta::FromSeconds(kHeartbeatDefaultSeconds)),
102 heartbeat_timer_(true, true),
103 weak_ptr_factory_(this) { 97 weak_ptr_factory_(this) {
104 } 98 }
105 99
106 MCSClient::~MCSClient() { 100 MCSClient::~MCSClient() {
107 } 101 }
108 102
109 void MCSClient::Initialize( 103 void MCSClient::Initialize(
110 const InitializationCompleteCallback& initialization_callback, 104 const InitializationCompleteCallback& initialization_callback,
111 const OnMessageReceivedCallback& message_received_callback, 105 const OnMessageReceivedCallback& message_received_callback,
112 const OnMessageSentCallback& message_sent_callback, 106 const OnMessageSentCallback& message_sent_callback,
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after
176 packet_info->persistent_id = base::Uint64ToString(iter->first); 170 packet_info->persistent_id = base::Uint64ToString(iter->first);
177 to_send_.push_back(make_linked_ptr(packet_info)); 171 to_send_.push_back(make_linked_ptr(packet_info));
178 } 172 }
179 173
180 // TODO(fgorski): that is likely the only place where the initialization 174 // TODO(fgorski): that is likely the only place where the initialization
181 // callback could be used. 175 // callback could be used.
182 initialization_callback_.Run(true, android_id_, security_token_); 176 initialization_callback_.Run(true, android_id_, security_token_);
183 } 177 }
184 178
185 void MCSClient::Login(uint64 android_id, uint64 security_token) { 179 void MCSClient::Login(uint64 android_id, uint64 security_token) {
186 DCHECK_EQ(state_, LOADED);
187 if (android_id != android_id_ && security_token != security_token_) { 180 if (android_id != android_id_ && security_token != security_token_) {
188 DCHECK(android_id); 181 DCHECK(android_id);
189 DCHECK(security_token); 182 DCHECK(security_token);
190 DCHECK(restored_unackeds_server_ids_.empty()); 183 DCHECK(restored_unackeds_server_ids_.empty());
191 android_id_ = android_id; 184 android_id_ = android_id;
192 security_token_ = security_token; 185 security_token_ = security_token;
193 rmq_store_->SetDeviceCredentials( 186 rmq_store_->SetDeviceCredentials(
194 android_id_, 187 android_id_,
195 security_token_, 188 security_token_,
196 base::Bind(&MCSClient::OnRMQUpdateFinished, 189 base::Bind(&MCSClient::OnRMQUpdateFinished,
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
250 243
251 void MCSClient::ResetStateAndBuildLoginRequest( 244 void MCSClient::ResetStateAndBuildLoginRequest(
252 mcs_proto::LoginRequest* request) { 245 mcs_proto::LoginRequest* request) {
253 DCHECK(android_id_); 246 DCHECK(android_id_);
254 DCHECK(security_token_); 247 DCHECK(security_token_);
255 stream_id_in_ = 0; 248 stream_id_in_ = 0;
256 stream_id_out_ = 1; 249 stream_id_out_ = 1;
257 last_device_to_server_stream_id_received_ = 0; 250 last_device_to_server_stream_id_received_ = 0;
258 last_server_to_device_stream_id_received_ = 0; 251 last_server_to_device_stream_id_received_ = 0;
259 252
253 heartbeat_manager_.Stop();
254
260 // TODO(zea): expire all messages older than their TTL. 255 // TODO(zea): expire all messages older than their TTL.
261 256
262 // Add any pending acknowledgments to the list of ids. 257 // Add any pending acknowledgments to the list of ids.
263 for (StreamIdToPersistentIdMap::const_iterator iter = 258 for (StreamIdToPersistentIdMap::const_iterator iter =
264 unacked_server_ids_.begin(); 259 unacked_server_ids_.begin();
265 iter != unacked_server_ids_.end(); ++iter) { 260 iter != unacked_server_ids_.end(); ++iter) {
266 restored_unackeds_server_ids_.push_back(iter->second); 261 restored_unackeds_server_ids_.push_back(iter->second);
267 } 262 }
268 unacked_server_ids_.clear(); 263 unacked_server_ids_.clear();
269 264
(...skipping 22 matching lines...) Expand all
292 // to RMQ, as all messages that reach this point should already have been 287 // to RMQ, as all messages that reach this point should already have been
293 // saved as necessary. 288 // saved as necessary.
294 while (!to_resend_.empty()) { 289 while (!to_resend_.empty()) {
295 to_send_.push_front(to_resend_.back()); 290 to_send_.push_front(to_resend_.back());
296 to_resend_.pop_back(); 291 to_resend_.pop_back();
297 } 292 }
298 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size() 293 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size()
299 << " incoming acks pending, and " << to_send_.size() 294 << " incoming acks pending, and " << to_send_.size()
300 << " pending outgoing messages."; 295 << " pending outgoing messages.";
301 296
302 heartbeat_timer_.Stop();
303
304 state_ = CONNECTING; 297 state_ = CONNECTING;
305 } 298 }
306 299
307 void MCSClient::SendHeartbeat() { 300 void MCSClient::SendHeartbeat() {
308 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()), 301 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()),
309 false); 302 false);
310 } 303 }
311 304
312 void MCSClient::OnRMQUpdateFinished(bool success) { 305 void MCSClient::OnRMQUpdateFinished(bool success) {
313 LOG_IF(ERROR, !success) << "RMQ Update failed!"; 306 LOG_IF(ERROR, !success) << "RMQ Update failed!";
(...skipping 11 matching lines...) Expand all
325 318
326 DVLOG(1) << "Pending output message found, sending."; 319 DVLOG(1) << "Pending output message found, sending.";
327 MCSPacketInternal packet = to_send_.front(); 320 MCSPacketInternal packet = to_send_.front();
328 to_send_.pop_front(); 321 to_send_.pop_front();
329 if (!packet->persistent_id.empty()) 322 if (!packet->persistent_id.empty())
330 to_resend_.push_back(packet); 323 to_resend_.push_back(packet);
331 SendPacketToWire(packet.get()); 324 SendPacketToWire(packet.get());
332 } 325 }
333 326
334 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) { 327 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) {
335 // Reset the heartbeat interval.
336 heartbeat_timer_.Reset();
337 packet_info->stream_id = ++stream_id_out_; 328 packet_info->stream_id = ++stream_id_out_;
338 DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName(); 329 DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName();
339 330
340 // Set the proper last received stream id to acknowledge received server 331 // Set the proper last received stream id to acknowledge received server
341 // packets. 332 // packets.
342 DVLOG(1) << "Setting last stream id received to " 333 DVLOG(1) << "Setting last stream id received to "
343 << stream_id_in_; 334 << stream_id_in_;
344 SetLastStreamIdReceived(stream_id_in_, 335 SetLastStreamIdReceived(stream_id_in_,
345 packet_info->protobuf.get()); 336 packet_info->protobuf.get());
346 if (stream_id_in_ != last_server_to_device_stream_id_received_) { 337 if (stream_id_in_ != last_server_to_device_stream_id_received_) {
(...skipping 93 matching lines...) Expand 10 before | Expand all | Expand 10 after
440 << last_stream_id_received; 431 << last_stream_id_received;
441 432
442 if (unacked_server_ids_.size() > 0 && 433 if (unacked_server_ids_.size() > 0 &&
443 unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) { 434 unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) {
444 SendMessage(MCSMessage(kIqStanzaTag, 435 SendMessage(MCSMessage(kIqStanzaTag,
445 BuildStreamAck(). 436 BuildStreamAck().
446 PassAs<const google::protobuf::MessageLite>()), 437 PassAs<const google::protobuf::MessageLite>()),
447 false); 438 false);
448 } 439 }
449 440
441 // The connection is alive, treat this message as a heartbeat ack.
442 heartbeat_manager_.OnHeartbeatAcked();
443
450 switch (tag) { 444 switch (tag) {
451 case kLoginResponseTag: { 445 case kLoginResponseTag: {
452 DCHECK_EQ(CONNECTING, state_); 446 DCHECK_EQ(CONNECTING, state_);
453 mcs_proto::LoginResponse* login_response = 447 mcs_proto::LoginResponse* login_response =
454 reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get()); 448 reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get());
455 DVLOG(1) << "Received login response:"; 449 DVLOG(1) << "Received login response:";
456 DVLOG(1) << " Id: " << login_response->id(); 450 DVLOG(1) << " Id: " << login_response->id();
457 DVLOG(1) << " Timestamp: " << login_response->server_timestamp(); 451 DVLOG(1) << " Timestamp: " << login_response->server_timestamp();
458 if (login_response->has_error()) { 452 if (login_response->has_error() && login_response->error().code() != 0) {
459 state_ = UNINITIALIZED; 453 state_ = UNINITIALIZED;
460 DVLOG(1) << " Error code: " << login_response->error().code(); 454 DVLOG(1) << " Error code: " << login_response->error().code();
461 DVLOG(1) << " Error message: " << login_response->error().message(); 455 DVLOG(1) << " Error message: " << login_response->error().message();
462 initialization_callback_.Run(false, 0, 0); 456 initialization_callback_.Run(false, 0, 0);
463 return; 457 return;
464 } 458 }
465 459
460 if (login_response->has_heartbeat_config()) {
461 heartbeat_manager_.UpdateHeartbeatConfig(
462 login_response->heartbeat_config());
463 }
464
466 state_ = CONNECTED; 465 state_ = CONNECTED;
467 stream_id_in_ = 1; // To account for the login response. 466 stream_id_in_ = 1; // To account for the login response.
468 DCHECK_EQ(1U, stream_id_out_); 467 DCHECK_EQ(1U, stream_id_out_);
469 468
470 // Pass the login response on up. 469 // Pass the login response on up.
471 base::MessageLoop::current()->PostTask( 470 base::MessageLoop::current()->PostTask(
472 FROM_HERE, 471 FROM_HERE,
473 base::Bind(message_received_callback_, 472 base::Bind(message_received_callback_,
474 MCSMessage(tag, 473 MCSMessage(tag,
475 protobuf.PassAs< 474 protobuf.PassAs<
476 const google::protobuf::MessageLite>()))); 475 const google::protobuf::MessageLite>())));
477 476
478 // If there are pending messages, attempt to send one. 477 // If there are pending messages, attempt to send one.
479 if (!to_send_.empty()) { 478 if (!to_send_.empty()) {
480 base::MessageLoop::current()->PostTask( 479 base::MessageLoop::current()->PostTask(
481 FROM_HERE, 480 FROM_HERE,
482 base::Bind(&MCSClient::MaybeSendMessage, 481 base::Bind(&MCSClient::MaybeSendMessage,
483 weak_ptr_factory_.GetWeakPtr())); 482 weak_ptr_factory_.GetWeakPtr()));
484 } 483 }
485 484
486 heartbeat_timer_.Start(FROM_HERE, 485 heartbeat_manager_.Start(
487 heartbeat_interval_, 486 base::Bind(&MCSClient::SendHeartbeat,
488 base::Bind(&MCSClient::SendHeartbeat, 487 weak_ptr_factory_.GetWeakPtr()),
489 weak_ptr_factory_.GetWeakPtr())); 488 base::Bind(&MCSClient::OnConnectionResetByHeartbeat,
489 weak_ptr_factory_.GetWeakPtr()));
490 return; 490 return;
491 } 491 }
492 case kHeartbeatPingTag: 492 case kHeartbeatPingTag:
493 DCHECK_GE(stream_id_in_, 1U); 493 DCHECK_GE(stream_id_in_, 1U);
494 DVLOG(1) << "Received heartbeat ping, sending ack."; 494 DVLOG(1) << "Received heartbeat ping, sending ack.";
495 SendMessage( 495 SendMessage(
496 MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()), false); 496 MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()), false);
497 return; 497 return;
498 case kHeartbeatAckTag: 498 case kHeartbeatAckTag:
499 DCHECK_GE(stream_id_in_, 1U); 499 DCHECK_GE(stream_id_in_, 1U);
500 DVLOG(1) << "Received heartbeat ack."; 500 DVLOG(1) << "Received heartbeat ack.";
501 // TODO(zea): add logic to reconnect if no ack received within a certain 501 // Do nothing else, all messages act as heartbeat acks.
502 // timeout (with backoff).
503 return; 502 return;
504 case kCloseTag: 503 case kCloseTag:
505 LOG(ERROR) << "Received close command, resetting connection."; 504 LOG(ERROR) << "Received close command, resetting connection.";
506 state_ = LOADED; 505 state_ = LOADED;
507 connection_factory_->SignalConnectionReset(); 506 connection_factory_->SignalConnectionReset();
508 return; 507 return;
509 case kIqStanzaTag: { 508 case kIqStanzaTag: {
510 DCHECK_GE(stream_id_in_, 1U); 509 DCHECK_GE(stream_id_in_, 1U);
511 mcs_proto::IqStanza* iq_stanza = 510 mcs_proto::IqStanza* iq_stanza =
512 reinterpret_cast<mcs_proto::IqStanza*>(protobuf.get()); 511 reinterpret_cast<mcs_proto::IqStanza*>(protobuf.get());
(...skipping 137 matching lines...) Expand 10 before | Expand all | Expand 10 after
650 rmq_store_->RemoveIncomingMessages( 649 rmq_store_->RemoveIncomingMessages(
651 acked_incoming_ids, 650 acked_incoming_ids,
652 base::Bind(&MCSClient::OnRMQUpdateFinished, 651 base::Bind(&MCSClient::OnRMQUpdateFinished,
653 weak_ptr_factory_.GetWeakPtr())); 652 weak_ptr_factory_.GetWeakPtr()));
654 } 653 }
655 654
656 MCSClient::PersistentId MCSClient::GetNextPersistentId() { 655 MCSClient::PersistentId MCSClient::GetNextPersistentId() {
657 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue()); 656 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue());
658 } 657 }
659 658
659 void MCSClient::OnConnectionResetByHeartbeat() {
660 connection_factory_->SignalConnectionReset();
661 }
662
660 } // namespace gcm 663 } // namespace gcm
OLDNEW
« no previous file with comments | « google_apis/gcm/engine/mcs_client.h ('k') | google_apis/gcm/gcm.gyp » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698