Index: google_apis/gcm/engine/mcs_client.cc |
diff --git a/google_apis/gcm/engine/mcs_client.cc b/google_apis/gcm/engine/mcs_client.cc |
index 9b639c3bbd035e40472b9e31d4d96472119ee862..0435fa037b3caaaf3014d0173bb8980395369e33 100644 |
--- a/google_apis/gcm/engine/mcs_client.cc |
+++ b/google_apis/gcm/engine/mcs_client.cc |
@@ -7,6 +7,8 @@ |
#include "base/basictypes.h" |
#include "base/message_loop/message_loop.h" |
#include "base/strings/string_number_conversions.h" |
+#include "base/time/clock.h" |
+#include "base/time/time.h" |
#include "google_apis/gcm/base/mcs_util.h" |
#include "google_apis/gcm/base/socket_stream.h" |
#include "google_apis/gcm/engine/connection_factory.h" |
@@ -83,8 +85,11 @@ ReliablePacketInfo::ReliablePacketInfo() |
} |
ReliablePacketInfo::~ReliablePacketInfo() {} |
-MCSClient::MCSClient(ConnectionFactory* connection_factory, RMQStore* rmq_store) |
- : state_(UNINITIALIZED), |
+MCSClient::MCSClient(base::Clock* clock, |
+ ConnectionFactory* connection_factory, |
+ RMQStore* rmq_store) |
+ : clock_(clock), |
+ state_(UNINITIALIZED), |
android_id_(0), |
security_token_(0), |
connection_factory_(connection_factory), |
@@ -149,6 +154,7 @@ void MCSClient::Initialize( |
// First go through and order the outgoing messages by recency. |
std::map<uint64, google::protobuf::MessageLite*> ordered_messages; |
+ std::vector<PersistentId> dropped_ids; |
for (std::map<PersistentId, google::protobuf::MessageLite*>::const_iterator |
iter = load_result.outgoing_messages.begin(); |
iter != load_result.outgoing_messages.end(); ++iter) { |
@@ -157,9 +163,25 @@ void MCSClient::Initialize( |
LOG(ERROR) << "Invalid restored message."; |
return; |
} |
+ |
+ // Check if the TTL has expired for this message. |
+ if (HasTTLExpired(*iter->second, clock_)) { |
+ dropped_ids.push_back(iter->first); |
+ message_sent_callback_.Run("TTL expired for " + iter->first); |
+ delete iter->second; |
+ continue; |
+ } |
+ |
ordered_messages[timestamp] = iter->second; |
} |
+ if (!dropped_ids.empty()) { |
+ rmq_store_->RemoveOutgoingMessages( |
+ dropped_ids, |
+ base::Bind(&MCSClient::OnRMQUpdateFinished, |
+ weak_ptr_factory_.GetWeakPtr())); |
+ } |
+ |
// Now go through and add the outgoing messages to the send queue in their |
// appropriate order (oldest at front, most recent at back). |
for (std::map<uint64, google::protobuf::MessageLite*>::const_iterator |
@@ -194,25 +216,22 @@ void MCSClient::Login(uint64 android_id, uint64 security_token) { |
connection_factory_->Connect(); |
} |
-void MCSClient::SendMessage(const MCSMessage& message, bool use_rmq) { |
- DCHECK_EQ(state_, CONNECTED); |
+void MCSClient::SendMessage(const MCSMessage& message) { |
+ int ttl = GetTTL(message.GetProtobuf()); |
+ DCHECK_GE(ttl, 0); |
if (to_send_.size() > kMaxSendQueueSize) { |
- base::MessageLoop::current()->PostTask( |
- FROM_HERE, |
- base::Bind(message_sent_callback_, "Message queue full.")); |
+ message_sent_callback_.Run("Message queue full."); |
return; |
} |
if (message.size() > kMaxMessageBytes) { |
- base::MessageLoop::current()->PostTask( |
- FROM_HERE, |
- base::Bind(message_sent_callback_, "Message too large.")); |
+ message_sent_callback_.Run("Message too large."); |
return; |
} |
ReliablePacketInfo* packet_info = new ReliablePacketInfo(); |
packet_info->protobuf = message.CloneProtobuf(); |
- if (use_rmq) { |
+ if (ttl > 0) { |
PersistentId persistent_id = GetNextPersistentId(); |
DVLOG(1) << "Setting persistent id to " << persistent_id; |
packet_info->persistent_id = persistent_id; |
@@ -223,14 +242,10 @@ void MCSClient::SendMessage(const MCSMessage& message, bool use_rmq) { |
*(packet_info->protobuf)), |
base::Bind(&MCSClient::OnRMQUpdateFinished, |
weak_ptr_factory_.GetWeakPtr())); |
- } else { |
- // Check that there is an active connection to the endpoint. |
- if (!connection_handler_->CanSendMessage()) { |
- base::MessageLoop::current()->PostTask( |
- FROM_HERE, |
- base::Bind(message_sent_callback_, "Unable to reach endpoint")); |
- return; |
- } |
+ } else if (!connection_factory_->IsEndpointReachable()) { |
+ DVLOG(1) << "No active connection, dropping message."; |
+ message_sent_callback_.Run("TTL expired"); |
+ return; |
} |
to_send_.push_back(make_linked_ptr(packet_info)); |
MaybeSendMessage(); |
@@ -252,8 +267,6 @@ void MCSClient::ResetStateAndBuildLoginRequest( |
heartbeat_manager_.Stop(); |
- // TODO(zea): expire all messages older than their TTL. |
- |
// Add any pending acknowledgments to the list of ids. |
for (StreamIdToPersistentIdMap::const_iterator iter = |
unacked_server_ids_.begin(); |
@@ -290,6 +303,34 @@ void MCSClient::ResetStateAndBuildLoginRequest( |
to_send_.push_front(to_resend_.back()); |
to_resend_.pop_back(); |
} |
+ |
+ // Drop all TTL == 0 or expired TTL messages from the queue. |
+ std::deque<MCSPacketInternal> new_to_send; |
+ std::vector<PersistentId> dropped_ids; |
+ while (!to_send_.empty()) { |
+ MCSPacketInternal packet = to_send_.front(); |
+ to_send_.pop_front(); |
+ if (GetTTL(*packet->protobuf) > 0 && |
+ !HasTTLExpired(*packet->protobuf, clock_)) { |
+ new_to_send.push_back(packet); |
+ } else { |
+ if (!packet->persistent_id.empty()) |
+ dropped_ids.push_back(packet->persistent_id); |
+ message_sent_callback_.Run("TTL expired"); |
+ } |
+ } |
+ |
+ if (!dropped_ids.empty()) { |
+ DVLOG(1) << "Connection reset, " << dropped_ids.size() |
+ << " messages expired."; |
+ rmq_store_->RemoveOutgoingMessages( |
+ dropped_ids, |
+ base::Bind(&MCSClient::OnRMQUpdateFinished, |
+ weak_ptr_factory_.GetWeakPtr())); |
+ } |
+ |
+ to_send_.swap(new_to_send); |
+ |
DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size() |
<< " incoming acks pending, and " << to_send_.size() |
<< " pending outgoing messages."; |
@@ -298,8 +339,7 @@ void MCSClient::ResetStateAndBuildLoginRequest( |
} |
void MCSClient::SendHeartbeat() { |
- SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()), |
- false); |
+ SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing())); |
} |
void MCSClient::OnRMQUpdateFinished(bool success) { |
@@ -311,14 +351,30 @@ void MCSClient::MaybeSendMessage() { |
if (to_send_.empty()) |
return; |
- if (!connection_handler_->CanSendMessage()) |
+ // If the connection has been reset, do nothing. On reconnection |
+ // MaybeSendMessage will be automatically invoked again. |
+ // TODO(zea): consider doing TTL expiration at connection reset time, rather |
+ // than reconnect time. |
+ if (!connection_factory_->IsEndpointReachable()) |
return; |
- // TODO(zea): drop messages older than their TTL. |
- |
- DVLOG(1) << "Pending output message found, sending."; |
MCSPacketInternal packet = to_send_.front(); |
to_send_.pop_front(); |
+ if (HasTTLExpired(*packet->protobuf, clock_)) { |
+ DCHECK(!packet->persistent_id.empty()); |
+ DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; |
+ message_sent_callback_.Run("TTL expired for " + packet->persistent_id); |
+ rmq_store_->RemoveOutgoingMessage( |
+ packet->persistent_id, |
+ base::Bind(&MCSClient::OnRMQUpdateFinished, |
+ weak_ptr_factory_.GetWeakPtr())); |
+ base::MessageLoop::current()->PostTask( |
+ FROM_HERE, |
+ base::Bind(&MCSClient::MaybeSendMessage, |
+ weak_ptr_factory_.GetWeakPtr())); |
+ return; |
+ } |
+ DVLOG(1) << "Pending output message found, sending."; |
if (!packet->persistent_id.empty()) |
to_resend_.push_back(packet); |
SendPacketToWire(packet.get()); |
@@ -379,8 +435,7 @@ void MCSClient::HandleMCSDataMesssage( |
if (send) { |
SendMessage( |
MCSMessage(kDataMessageStanzaTag, |
- response.PassAs<const google::protobuf::MessageLite>()), |
- false); |
+ response.PassAs<const google::protobuf::MessageLite>())); |
} |
} |
@@ -434,8 +489,7 @@ void MCSClient::HandlePacketFromWire( |
unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) { |
SendMessage(MCSMessage(kIqStanzaTag, |
BuildStreamAck(). |
- PassAs<const google::protobuf::MessageLite>()), |
- false); |
+ PassAs<const google::protobuf::MessageLite>())); |
} |
// The connection is alive, treat this message as a heartbeat ack. |
@@ -493,7 +547,7 @@ void MCSClient::HandlePacketFromWire( |
DCHECK_GE(stream_id_in_, 1U); |
DVLOG(1) << "Received heartbeat ping, sending ack."; |
SendMessage( |
- MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()), false); |
+ MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck())); |
return; |
case kHeartbeatAckTag: |
DCHECK_GE(stream_id_in_, 1U); |