| Index: google_apis/gcm/engine/mcs_client.cc
|
| diff --git a/google_apis/gcm/engine/mcs_client.cc b/google_apis/gcm/engine/mcs_client.cc
|
| index 9b639c3bbd035e40472b9e31d4d96472119ee862..56d3901c189e8cfd96ef578647d1d468e1fa3ad0 100644
|
| --- a/google_apis/gcm/engine/mcs_client.cc
|
| +++ b/google_apis/gcm/engine/mcs_client.cc
|
| @@ -7,6 +7,8 @@
|
| #include "base/basictypes.h"
|
| #include "base/message_loop/message_loop.h"
|
| #include "base/strings/string_number_conversions.h"
|
| +#include "base/time/clock.h"
|
| +#include "base/time/time.h"
|
| #include "google_apis/gcm/base/mcs_util.h"
|
| #include "google_apis/gcm/base/socket_stream.h"
|
| #include "google_apis/gcm/engine/connection_factory.h"
|
| @@ -83,8 +85,11 @@ ReliablePacketInfo::ReliablePacketInfo()
|
| }
|
| ReliablePacketInfo::~ReliablePacketInfo() {}
|
|
|
| -MCSClient::MCSClient(ConnectionFactory* connection_factory, RMQStore* rmq_store)
|
| - : state_(UNINITIALIZED),
|
| +MCSClient::MCSClient(base::Clock* clock,
|
| + ConnectionFactory* connection_factory,
|
| + RMQStore* rmq_store)
|
| + : clock_(clock),
|
| + state_(UNINITIALIZED),
|
| android_id_(0),
|
| security_token_(0),
|
| connection_factory_(connection_factory),
|
| @@ -149,6 +154,7 @@ void MCSClient::Initialize(
|
|
|
| // First go through and order the outgoing messages by recency.
|
| std::map<uint64, google::protobuf::MessageLite*> ordered_messages;
|
| + std::vector<PersistentId> expired_ttl_ids;
|
| for (std::map<PersistentId, google::protobuf::MessageLite*>::const_iterator
|
| iter = load_result.outgoing_messages.begin();
|
| iter != load_result.outgoing_messages.end(); ++iter) {
|
| @@ -157,9 +163,25 @@ void MCSClient::Initialize(
|
| LOG(ERROR) << "Invalid restored message.";
|
| return;
|
| }
|
| +
|
| + // Check if the TTL has expired for this message.
|
| + if (HasTTLExpired(*iter->second, clock_)) {
|
| + expired_ttl_ids.push_back(iter->first);
|
| + message_sent_callback_.Run("TTL expired for " + iter->first);
|
| + delete iter->second;
|
| + continue;
|
| + }
|
| +
|
| ordered_messages[timestamp] = iter->second;
|
| }
|
|
|
| + if (!expired_ttl_ids.empty()) {
|
| + rmq_store_->RemoveOutgoingMessages(
|
| + expired_ttl_ids,
|
| + base::Bind(&MCSClient::OnRMQUpdateFinished,
|
| + weak_ptr_factory_.GetWeakPtr()));
|
| + }
|
| +
|
| // Now go through and add the outgoing messages to the send queue in their
|
| // appropriate order (oldest at front, most recent at back).
|
| for (std::map<uint64, google::protobuf::MessageLite*>::const_iterator
|
| @@ -194,25 +216,22 @@ void MCSClient::Login(uint64 android_id, uint64 security_token) {
|
| connection_factory_->Connect();
|
| }
|
|
|
| -void MCSClient::SendMessage(const MCSMessage& message, bool use_rmq) {
|
| - DCHECK_EQ(state_, CONNECTED);
|
| +void MCSClient::SendMessage(const MCSMessage& message) {
|
| + int ttl = GetTTL(message.GetProtobuf());
|
| + DCHECK_GE(ttl, 0);
|
| if (to_send_.size() > kMaxSendQueueSize) {
|
| - base::MessageLoop::current()->PostTask(
|
| - FROM_HERE,
|
| - base::Bind(message_sent_callback_, "Message queue full."));
|
| + message_sent_callback_.Run("Message queue full.");
|
| return;
|
| }
|
| if (message.size() > kMaxMessageBytes) {
|
| - base::MessageLoop::current()->PostTask(
|
| - FROM_HERE,
|
| - base::Bind(message_sent_callback_, "Message too large."));
|
| + message_sent_callback_.Run("Message too large.");
|
| return;
|
| }
|
|
|
| ReliablePacketInfo* packet_info = new ReliablePacketInfo();
|
| packet_info->protobuf = message.CloneProtobuf();
|
|
|
| - if (use_rmq) {
|
| + if (ttl > 0) {
|
| PersistentId persistent_id = GetNextPersistentId();
|
| DVLOG(1) << "Setting persistent id to " << persistent_id;
|
| packet_info->persistent_id = persistent_id;
|
| @@ -223,14 +242,10 @@ void MCSClient::SendMessage(const MCSMessage& message, bool use_rmq) {
|
| *(packet_info->protobuf)),
|
| base::Bind(&MCSClient::OnRMQUpdateFinished,
|
| weak_ptr_factory_.GetWeakPtr()));
|
| - } else {
|
| - // Check that there is an active connection to the endpoint.
|
| - if (!connection_handler_->CanSendMessage()) {
|
| - base::MessageLoop::current()->PostTask(
|
| - FROM_HERE,
|
| - base::Bind(message_sent_callback_, "Unable to reach endpoint"));
|
| - return;
|
| - }
|
| + } else if (!connection_factory_->IsEndpointReachable()) {
|
| + DVLOG(1) << "No active connection, dropping message.";
|
| + message_sent_callback_.Run("TTL expired");
|
| + return;
|
| }
|
| to_send_.push_back(make_linked_ptr(packet_info));
|
| MaybeSendMessage();
|
| @@ -252,8 +267,6 @@ void MCSClient::ResetStateAndBuildLoginRequest(
|
|
|
| heartbeat_manager_.Stop();
|
|
|
| - // TODO(zea): expire all messages older than their TTL.
|
| -
|
| // Add any pending acknowledgments to the list of ids.
|
| for (StreamIdToPersistentIdMap::const_iterator iter =
|
| unacked_server_ids_.begin();
|
| @@ -290,6 +303,36 @@ void MCSClient::ResetStateAndBuildLoginRequest(
|
| to_send_.push_front(to_resend_.back());
|
| to_resend_.pop_back();
|
| }
|
| +
|
| + // Drop all TTL == 0 or expired TTL messages from the queue.
|
| + std::deque<MCSPacketInternal> new_to_send;
|
| + std::vector<PersistentId> expired_ttl_ids;
|
| + while (!to_send_.empty()) {
|
| + MCSPacketInternal packet = to_send_.front();
|
| + to_send_.pop_front();
|
| + if (GetTTL(*packet->protobuf) > 0 &&
|
| + !HasTTLExpired(*packet->protobuf, clock_)) {
|
| + new_to_send.push_back(packet);
|
| + } else {
|
| + // If the TTL was 0 there is no persistent id, so no need to remove the
|
| + // message from the persistent store.
|
| + if (!packet->persistent_id.empty())
|
| + expired_ttl_ids.push_back(packet->persistent_id);
|
| + message_sent_callback_.Run("TTL expired");
|
| + }
|
| + }
|
| +
|
| + if (!expired_ttl_ids.empty()) {
|
| + DVLOG(1) << "Connection reset, " << expired_ttl_ids.size()
|
| + << " messages expired.";
|
| + rmq_store_->RemoveOutgoingMessages(
|
| + expired_ttl_ids,
|
| + base::Bind(&MCSClient::OnRMQUpdateFinished,
|
| + weak_ptr_factory_.GetWeakPtr()));
|
| + }
|
| +
|
| + to_send_.swap(new_to_send);
|
| +
|
| DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size()
|
| << " incoming acks pending, and " << to_send_.size()
|
| << " pending outgoing messages.";
|
| @@ -298,8 +341,7 @@ void MCSClient::ResetStateAndBuildLoginRequest(
|
| }
|
|
|
| void MCSClient::SendHeartbeat() {
|
| - SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()),
|
| - false);
|
| + SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()));
|
| }
|
|
|
| void MCSClient::OnRMQUpdateFinished(bool success) {
|
| @@ -311,14 +353,30 @@ void MCSClient::MaybeSendMessage() {
|
| if (to_send_.empty())
|
| return;
|
|
|
| - if (!connection_handler_->CanSendMessage())
|
| + // If the connection has been reset, do nothing. On reconnection
|
| + // MaybeSendMessage will be automatically invoked again.
|
| + // TODO(zea): consider doing TTL expiration at connection reset time, rather
|
| + // than reconnect time.
|
| + if (!connection_factory_->IsEndpointReachable())
|
| return;
|
|
|
| - // TODO(zea): drop messages older than their TTL.
|
| -
|
| - DVLOG(1) << "Pending output message found, sending.";
|
| MCSPacketInternal packet = to_send_.front();
|
| to_send_.pop_front();
|
| + if (HasTTLExpired(*packet->protobuf, clock_)) {
|
| + DCHECK(!packet->persistent_id.empty());
|
| + DVLOG(1) << "Dropping expired message " << packet->persistent_id << ".";
|
| + message_sent_callback_.Run("TTL expired for " + packet->persistent_id);
|
| + rmq_store_->RemoveOutgoingMessage(
|
| + packet->persistent_id,
|
| + base::Bind(&MCSClient::OnRMQUpdateFinished,
|
| + weak_ptr_factory_.GetWeakPtr()));
|
| + base::MessageLoop::current()->PostTask(
|
| + FROM_HERE,
|
| + base::Bind(&MCSClient::MaybeSendMessage,
|
| + weak_ptr_factory_.GetWeakPtr()));
|
| + return;
|
| + }
|
| + DVLOG(1) << "Pending output message found, sending.";
|
| if (!packet->persistent_id.empty())
|
| to_resend_.push_back(packet);
|
| SendPacketToWire(packet.get());
|
| @@ -362,6 +420,9 @@ void MCSClient::HandleMCSDataMesssage(
|
| scoped_ptr<mcs_proto::DataMessageStanza> response(
|
| new mcs_proto::DataMessageStanza());
|
| response->set_from(kGCMFromField);
|
| + response->set_sent(clock_->Now().ToInternalValue() /
|
| + base::Time::kMicrosecondsPerSecond);
|
| + response->set_ttl(0);
|
| bool send = false;
|
| for (int i = 0; i < data_message->app_data_size(); ++i) {
|
| const mcs_proto::AppData& app_data = data_message->app_data(i);
|
| @@ -379,8 +440,7 @@ void MCSClient::HandleMCSDataMesssage(
|
| if (send) {
|
| SendMessage(
|
| MCSMessage(kDataMessageStanzaTag,
|
| - response.PassAs<const google::protobuf::MessageLite>()),
|
| - false);
|
| + response.PassAs<const google::protobuf::MessageLite>()));
|
| }
|
| }
|
|
|
| @@ -434,8 +494,7 @@ void MCSClient::HandlePacketFromWire(
|
| unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) {
|
| SendMessage(MCSMessage(kIqStanzaTag,
|
| BuildStreamAck().
|
| - PassAs<const google::protobuf::MessageLite>()),
|
| - false);
|
| + PassAs<const google::protobuf::MessageLite>()));
|
| }
|
|
|
| // The connection is alive, treat this message as a heartbeat ack.
|
| @@ -493,7 +552,7 @@ void MCSClient::HandlePacketFromWire(
|
| DCHECK_GE(stream_id_in_, 1U);
|
| DVLOG(1) << "Received heartbeat ping, sending ack.";
|
| SendMessage(
|
| - MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()), false);
|
| + MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()));
|
| return;
|
| case kHeartbeatAckTag:
|
| DCHECK_GE(stream_id_in_, 1U);
|
|
|