Chromium Code Reviews| Index: google_apis/gcm/engine/mcs_client.cc |
| diff --git a/google_apis/gcm/engine/mcs_client.cc b/google_apis/gcm/engine/mcs_client.cc |
| index 9b639c3bbd035e40472b9e31d4d96472119ee862..f418f6a5b51e9bf8ee11b581a8d08e1f715417b7 100644 |
| --- a/google_apis/gcm/engine/mcs_client.cc |
| +++ b/google_apis/gcm/engine/mcs_client.cc |
| @@ -7,6 +7,8 @@ |
| #include "base/basictypes.h" |
| #include "base/message_loop/message_loop.h" |
| #include "base/strings/string_number_conversions.h" |
| +#include "base/time/clock.h" |
| +#include "base/time/time.h" |
| #include "google_apis/gcm/base/mcs_util.h" |
| #include "google_apis/gcm/base/socket_stream.h" |
| #include "google_apis/gcm/engine/connection_factory.h" |
| @@ -44,6 +46,9 @@ const size_t kMaxSendQueueSize = 10 * 1024; |
| // The maximum message size that can be sent to the server. |
| const int kMaxMessageBytes = 4 * 1024; // 4KB, like the server. |
| +// Maximum amount of time to save an unsent outgoing message for. |
| +const int kMaxTTLSeconds = 4 * 7 * 24 * 60 * 60; // 4 weeks. |
| + |
| // Helper for converting a proto persistent id list to a vector of strings. |
| bool BuildPersistentIdListFromProto(const google::protobuf::string& bytes, |
| std::vector<std::string>* id_list) { |
| @@ -83,8 +88,11 @@ ReliablePacketInfo::ReliablePacketInfo() |
| } |
| ReliablePacketInfo::~ReliablePacketInfo() {} |
| -MCSClient::MCSClient(ConnectionFactory* connection_factory, RMQStore* rmq_store) |
| - : state_(UNINITIALIZED), |
| +MCSClient::MCSClient(base::Clock* clock, |
| + ConnectionFactory* connection_factory, |
| + RMQStore* rmq_store) |
| + : clock_(clock), |
| + state_(UNINITIALIZED), |
| android_id_(0), |
| security_token_(0), |
| connection_factory_(connection_factory), |
| @@ -149,6 +157,7 @@ void MCSClient::Initialize( |
| // First go through and order the outgoing messages by recency. |
| std::map<uint64, google::protobuf::MessageLite*> ordered_messages; |
| + std::vector<PersistentId> dropped_ids; |
| for (std::map<PersistentId, google::protobuf::MessageLite*>::const_iterator |
| iter = load_result.outgoing_messages.begin(); |
| iter != load_result.outgoing_messages.end(); ++iter) { |
| @@ -157,9 +166,25 @@ void MCSClient::Initialize( |
| LOG(ERROR) << "Invalid restored message."; |
| return; |
| } |
| + |
| + // Check if the TTL has expired for this message. |
| + if (HasTTLExpired(*iter->second, clock_)) { |
| + dropped_ids.push_back(iter->first); |
| + message_sent_callback_.Run("TTL expired for " + iter->first); |
| + delete iter->second; |
| + continue; |
| + } |
| + |
| ordered_messages[timestamp] = iter->second; |
| } |
| + if (!dropped_ids.empty()) { |
| + rmq_store_->RemoveOutgoingMessages( |
| + dropped_ids, |
| + base::Bind(&MCSClient::OnRMQUpdateFinished, |
| + weak_ptr_factory_.GetWeakPtr())); |
| + } |
| + |
| // Now go through and add the outgoing messages to the send queue in their |
| // appropriate order (oldest at front, most recent at back). |
| for (std::map<uint64, google::protobuf::MessageLite*>::const_iterator |
| @@ -194,25 +219,23 @@ void MCSClient::Login(uint64 android_id, uint64 security_token) { |
| connection_factory_->Connect(); |
| } |
| -void MCSClient::SendMessage(const MCSMessage& message, bool use_rmq) { |
| - DCHECK_EQ(state_, CONNECTED); |
| +void MCSClient::SendMessage(const MCSMessage& message) { |
| + int ttl = GetTTL(message.GetProtobuf()); |
| + DCHECK_GE(ttl, 0); |
| + DCHECK_LT(ttl, kMaxTTLSeconds); |
|
fgorski
2013/12/28 01:15:08
DCHECK_LE
Nicolas Zea
2013/12/30 21:46:19
Done.
|
| if (to_send_.size() > kMaxSendQueueSize) { |
| - base::MessageLoop::current()->PostTask( |
| - FROM_HERE, |
| - base::Bind(message_sent_callback_, "Message queue full.")); |
| + message_sent_callback_.Run("Message queue full."); |
| return; |
| } |
| if (message.size() > kMaxMessageBytes) { |
| - base::MessageLoop::current()->PostTask( |
| - FROM_HERE, |
| - base::Bind(message_sent_callback_, "Message too large.")); |
| + message_sent_callback_.Run("Message too large."); |
| return; |
| } |
| ReliablePacketInfo* packet_info = new ReliablePacketInfo(); |
| packet_info->protobuf = message.CloneProtobuf(); |
| - if (use_rmq) { |
| + if (ttl > 0) { |
| PersistentId persistent_id = GetNextPersistentId(); |
| DVLOG(1) << "Setting persistent id to " << persistent_id; |
| packet_info->persistent_id = persistent_id; |
| @@ -223,14 +246,10 @@ void MCSClient::SendMessage(const MCSMessage& message, bool use_rmq) { |
| *(packet_info->protobuf)), |
| base::Bind(&MCSClient::OnRMQUpdateFinished, |
| weak_ptr_factory_.GetWeakPtr())); |
| - } else { |
| - // Check that there is an active connection to the endpoint. |
| - if (!connection_handler_->CanSendMessage()) { |
| - base::MessageLoop::current()->PostTask( |
| - FROM_HERE, |
| - base::Bind(message_sent_callback_, "Unable to reach endpoint")); |
| - return; |
| - } |
| + } else if (!connection_factory_->IsEndpointReachable()) { |
| + DVLOG(1) << "No active connection, dropping message."; |
| + message_sent_callback_.Run("TTL expired"); |
| + return; |
| } |
| to_send_.push_back(make_linked_ptr(packet_info)); |
| MaybeSendMessage(); |
| @@ -252,8 +271,6 @@ void MCSClient::ResetStateAndBuildLoginRequest( |
| heartbeat_manager_.Stop(); |
| - // TODO(zea): expire all messages older than their TTL. |
| - |
| // Add any pending acknowledgments to the list of ids. |
| for (StreamIdToPersistentIdMap::const_iterator iter = |
| unacked_server_ids_.begin(); |
| @@ -290,6 +307,34 @@ void MCSClient::ResetStateAndBuildLoginRequest( |
| to_send_.push_front(to_resend_.back()); |
| to_resend_.pop_back(); |
| } |
| + |
| + // Drop all TTL == 0 or expired TTL messages from the queue. |
| + std::deque<MCSPacketInternal> new_to_send; |
| + std::vector<PersistentId> dropped_ids; |
| + while (!to_send_.empty()) { |
| + MCSPacketInternal packet = to_send_.front(); |
| + to_send_.pop_front(); |
| + if (GetTTL(*packet->protobuf) > 0 && |
| + !HasTTLExpired(*packet->protobuf, clock_)) { |
| + new_to_send.push_back(packet); |
| + } else { |
| + if (!packet->persistent_id.empty()) |
| + dropped_ids.push_back(packet->persistent_id); |
| + message_sent_callback_.Run("TTL expired for " + packet->persistent_id); |
|
fgorski
2013/12/28 01:15:08
This message is going to be a bit pointless when p
Nicolas Zea
2013/12/30 21:46:19
Yeah, all of these messages are pointless right no
|
| + } |
| + } |
| + |
| + if (!dropped_ids.empty()) { |
| + DVLOG(1) << "Connection reset, " << dropped_ids.size() |
| + << " messages expired."; |
| + rmq_store_->RemoveOutgoingMessages( |
| + dropped_ids, |
| + base::Bind(&MCSClient::OnRMQUpdateFinished, |
| + weak_ptr_factory_.GetWeakPtr())); |
| + } |
| + |
| + to_send_.swap(new_to_send); |
| + |
| DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size() |
| << " incoming acks pending, and " << to_send_.size() |
| << " pending outgoing messages."; |
| @@ -298,8 +343,7 @@ void MCSClient::ResetStateAndBuildLoginRequest( |
| } |
| void MCSClient::SendHeartbeat() { |
| - SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()), |
| - false); |
| + SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing())); |
| } |
| void MCSClient::OnRMQUpdateFinished(bool success) { |
| @@ -311,14 +355,30 @@ void MCSClient::MaybeSendMessage() { |
| if (to_send_.empty()) |
| return; |
| - if (!connection_handler_->CanSendMessage()) |
| + // If the connection has been reset, do nothing. On reconnection |
| + // MaybeSendMessage will be automatically invoked again. |
| + // TODO(zea): consider doing TTL expiration at connection reset time, rather |
| + // than reconnect time. |
| + if (!connection_factory_->IsEndpointReachable()) |
| return; |
| - // TODO(zea): drop messages older than their TTL. |
| - |
| - DVLOG(1) << "Pending output message found, sending."; |
| MCSPacketInternal packet = to_send_.front(); |
| to_send_.pop_front(); |
| + if (HasTTLExpired(*packet->protobuf, clock_)) { |
| + DCHECK(packet->persistent_id.empty()); |
| + DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; |
| + message_sent_callback_.Run("TTL expired for " + packet->persistent_id); |
| + rmq_store_->RemoveOutgoingMessage( |
| + packet->persistent_id, |
| + base::Bind(&MCSClient::OnRMQUpdateFinished, |
| + weak_ptr_factory_.GetWeakPtr())); |
| + base::MessageLoop::current()->PostTask( |
| + FROM_HERE, |
| + base::Bind(&MCSClient::MaybeSendMessage, |
| + weak_ptr_factory_.GetWeakPtr())); |
| + return; |
| + } |
| + DVLOG(1) << "Pending output message found, sending."; |
| if (!packet->persistent_id.empty()) |
| to_resend_.push_back(packet); |
| SendPacketToWire(packet.get()); |
| @@ -379,8 +439,7 @@ void MCSClient::HandleMCSDataMesssage( |
| if (send) { |
| SendMessage( |
| MCSMessage(kDataMessageStanzaTag, |
| - response.PassAs<const google::protobuf::MessageLite>()), |
| - false); |
| + response.PassAs<const google::protobuf::MessageLite>())); |
| } |
| } |
| @@ -434,8 +493,7 @@ void MCSClient::HandlePacketFromWire( |
| unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) { |
| SendMessage(MCSMessage(kIqStanzaTag, |
| BuildStreamAck(). |
| - PassAs<const google::protobuf::MessageLite>()), |
| - false); |
| + PassAs<const google::protobuf::MessageLite>())); |
| } |
| // The connection is alive, treat this message as a heartbeat ack. |
| @@ -493,7 +551,7 @@ void MCSClient::HandlePacketFromWire( |
| DCHECK_GE(stream_id_in_, 1U); |
| DVLOG(1) << "Received heartbeat ping, sending ack."; |
| SendMessage( |
| - MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()), false); |
| + MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck())); |
| return; |
| case kHeartbeatAckTag: |
| DCHECK_GE(stream_id_in_, 1U); |