Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1311)

Unified Diff: google_apis/gcm/engine/mcs_client.cc

Issue 117513004: [GCM] Add TTL support (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Address comments Created 6 years, 12 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: google_apis/gcm/engine/mcs_client.cc
diff --git a/google_apis/gcm/engine/mcs_client.cc b/google_apis/gcm/engine/mcs_client.cc
index 9b639c3bbd035e40472b9e31d4d96472119ee862..0435fa037b3caaaf3014d0173bb8980395369e33 100644
--- a/google_apis/gcm/engine/mcs_client.cc
+++ b/google_apis/gcm/engine/mcs_client.cc
@@ -7,6 +7,8 @@
#include "base/basictypes.h"
#include "base/message_loop/message_loop.h"
#include "base/strings/string_number_conversions.h"
+#include "base/time/clock.h"
+#include "base/time/time.h"
#include "google_apis/gcm/base/mcs_util.h"
#include "google_apis/gcm/base/socket_stream.h"
#include "google_apis/gcm/engine/connection_factory.h"
@@ -83,8 +85,11 @@ ReliablePacketInfo::ReliablePacketInfo()
}
ReliablePacketInfo::~ReliablePacketInfo() {}
-MCSClient::MCSClient(ConnectionFactory* connection_factory, RMQStore* rmq_store)
- : state_(UNINITIALIZED),
+MCSClient::MCSClient(base::Clock* clock,
+ ConnectionFactory* connection_factory,
+ RMQStore* rmq_store)
+ : clock_(clock),
+ state_(UNINITIALIZED),
android_id_(0),
security_token_(0),
connection_factory_(connection_factory),
@@ -149,6 +154,7 @@ void MCSClient::Initialize(
// First go through and order the outgoing messages by recency.
std::map<uint64, google::protobuf::MessageLite*> ordered_messages;
+ std::vector<PersistentId> dropped_ids;
for (std::map<PersistentId, google::protobuf::MessageLite*>::const_iterator
iter = load_result.outgoing_messages.begin();
iter != load_result.outgoing_messages.end(); ++iter) {
@@ -157,9 +163,25 @@ void MCSClient::Initialize(
LOG(ERROR) << "Invalid restored message.";
return;
}
+
+ // Check if the TTL has expired for this message.
+ if (HasTTLExpired(*iter->second, clock_)) {
+ dropped_ids.push_back(iter->first);
+ message_sent_callback_.Run("TTL expired for " + iter->first);
+ delete iter->second;
+ continue;
+ }
+
ordered_messages[timestamp] = iter->second;
}
+ if (!dropped_ids.empty()) {
+ rmq_store_->RemoveOutgoingMessages(
+ dropped_ids,
+ base::Bind(&MCSClient::OnRMQUpdateFinished,
+ weak_ptr_factory_.GetWeakPtr()));
+ }
+
// Now go through and add the outgoing messages to the send queue in their
// appropriate order (oldest at front, most recent at back).
for (std::map<uint64, google::protobuf::MessageLite*>::const_iterator
@@ -194,25 +216,22 @@ void MCSClient::Login(uint64 android_id, uint64 security_token) {
connection_factory_->Connect();
}
-void MCSClient::SendMessage(const MCSMessage& message, bool use_rmq) {
- DCHECK_EQ(state_, CONNECTED);
+void MCSClient::SendMessage(const MCSMessage& message) {
+ int ttl = GetTTL(message.GetProtobuf());
+ DCHECK_GE(ttl, 0);
if (to_send_.size() > kMaxSendQueueSize) {
- base::MessageLoop::current()->PostTask(
- FROM_HERE,
- base::Bind(message_sent_callback_, "Message queue full."));
+ message_sent_callback_.Run("Message queue full.");
return;
}
if (message.size() > kMaxMessageBytes) {
- base::MessageLoop::current()->PostTask(
- FROM_HERE,
- base::Bind(message_sent_callback_, "Message too large."));
+ message_sent_callback_.Run("Message too large.");
return;
}
ReliablePacketInfo* packet_info = new ReliablePacketInfo();
packet_info->protobuf = message.CloneProtobuf();
- if (use_rmq) {
+ if (ttl > 0) {
PersistentId persistent_id = GetNextPersistentId();
DVLOG(1) << "Setting persistent id to " << persistent_id;
packet_info->persistent_id = persistent_id;
@@ -223,14 +242,10 @@ void MCSClient::SendMessage(const MCSMessage& message, bool use_rmq) {
*(packet_info->protobuf)),
base::Bind(&MCSClient::OnRMQUpdateFinished,
weak_ptr_factory_.GetWeakPtr()));
- } else {
- // Check that there is an active connection to the endpoint.
- if (!connection_handler_->CanSendMessage()) {
- base::MessageLoop::current()->PostTask(
- FROM_HERE,
- base::Bind(message_sent_callback_, "Unable to reach endpoint"));
- return;
- }
+ } else if (!connection_factory_->IsEndpointReachable()) {
+ DVLOG(1) << "No active connection, dropping message.";
+ message_sent_callback_.Run("TTL expired");
+ return;
}
to_send_.push_back(make_linked_ptr(packet_info));
MaybeSendMessage();
@@ -252,8 +267,6 @@ void MCSClient::ResetStateAndBuildLoginRequest(
heartbeat_manager_.Stop();
- // TODO(zea): expire all messages older than their TTL.
-
// Add any pending acknowledgments to the list of ids.
for (StreamIdToPersistentIdMap::const_iterator iter =
unacked_server_ids_.begin();
@@ -290,6 +303,34 @@ void MCSClient::ResetStateAndBuildLoginRequest(
to_send_.push_front(to_resend_.back());
to_resend_.pop_back();
}
+
+ // Drop all TTL == 0 or expired TTL messages from the queue.
+ std::deque<MCSPacketInternal> new_to_send;
+ std::vector<PersistentId> dropped_ids;
+ while (!to_send_.empty()) {
+ MCSPacketInternal packet = to_send_.front();
+ to_send_.pop_front();
+ if (GetTTL(*packet->protobuf) > 0 &&
+ !HasTTLExpired(*packet->protobuf, clock_)) {
+ new_to_send.push_back(packet);
+ } else {
+ if (!packet->persistent_id.empty())
+ dropped_ids.push_back(packet->persistent_id);
+ message_sent_callback_.Run("TTL expired");
+ }
+ }
+
+ if (!dropped_ids.empty()) {
+ DVLOG(1) << "Connection reset, " << dropped_ids.size()
+ << " messages expired.";
+ rmq_store_->RemoveOutgoingMessages(
+ dropped_ids,
+ base::Bind(&MCSClient::OnRMQUpdateFinished,
+ weak_ptr_factory_.GetWeakPtr()));
+ }
+
+ to_send_.swap(new_to_send);
+
DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size()
<< " incoming acks pending, and " << to_send_.size()
<< " pending outgoing messages.";
@@ -298,8 +339,7 @@ void MCSClient::ResetStateAndBuildLoginRequest(
}
void MCSClient::SendHeartbeat() {
- SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()),
- false);
+ SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()));
}
void MCSClient::OnRMQUpdateFinished(bool success) {
@@ -311,14 +351,30 @@ void MCSClient::MaybeSendMessage() {
if (to_send_.empty())
return;
- if (!connection_handler_->CanSendMessage())
+ // If the connection has been reset, do nothing. On reconnection
+ // MaybeSendMessage will be automatically invoked again.
+ // TODO(zea): consider doing TTL expiration at connection reset time, rather
+ // than reconnect time.
+ if (!connection_factory_->IsEndpointReachable())
return;
- // TODO(zea): drop messages older than their TTL.
-
- DVLOG(1) << "Pending output message found, sending.";
MCSPacketInternal packet = to_send_.front();
to_send_.pop_front();
+ if (HasTTLExpired(*packet->protobuf, clock_)) {
+ DCHECK(!packet->persistent_id.empty());
+ DVLOG(1) << "Dropping expired message " << packet->persistent_id << ".";
+ message_sent_callback_.Run("TTL expired for " + packet->persistent_id);
+ rmq_store_->RemoveOutgoingMessage(
+ packet->persistent_id,
+ base::Bind(&MCSClient::OnRMQUpdateFinished,
+ weak_ptr_factory_.GetWeakPtr()));
+ base::MessageLoop::current()->PostTask(
+ FROM_HERE,
+ base::Bind(&MCSClient::MaybeSendMessage,
+ weak_ptr_factory_.GetWeakPtr()));
+ return;
+ }
+ DVLOG(1) << "Pending output message found, sending.";
if (!packet->persistent_id.empty())
to_resend_.push_back(packet);
SendPacketToWire(packet.get());
@@ -379,8 +435,7 @@ void MCSClient::HandleMCSDataMesssage(
if (send) {
SendMessage(
MCSMessage(kDataMessageStanzaTag,
- response.PassAs<const google::protobuf::MessageLite>()),
- false);
+ response.PassAs<const google::protobuf::MessageLite>()));
}
}
@@ -434,8 +489,7 @@ void MCSClient::HandlePacketFromWire(
unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) {
SendMessage(MCSMessage(kIqStanzaTag,
BuildStreamAck().
- PassAs<const google::protobuf::MessageLite>()),
- false);
+ PassAs<const google::protobuf::MessageLite>()));
}
// The connection is alive, treat this message as a heartbeat ack.
@@ -493,7 +547,7 @@ void MCSClient::HandlePacketFromWire(
DCHECK_GE(stream_id_in_, 1U);
DVLOG(1) << "Received heartbeat ping, sending ack.";
SendMessage(
- MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()), false);
+ MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()));
return;
case kHeartbeatAckTag:
DCHECK_GE(stream_id_in_, 1U);

Powered by Google App Engine
This is Rietveld 408576698