Index: google_apis/gcm/engine/mcs_client.cc |
diff --git a/google_apis/gcm/engine/mcs_client.cc b/google_apis/gcm/engine/mcs_client.cc |
index 0a0bfc92731d919fb3190cb2a7c04262673b426b..2f4edf188fb0d1e3aa04e10d47fc8d841e5bf9a7 100644 |
--- a/google_apis/gcm/engine/mcs_client.cc |
+++ b/google_apis/gcm/engine/mcs_client.cc |
@@ -63,6 +63,47 @@ bool BuildPersistentIdListFromProto(const google::protobuf::string& bytes, |
} // namespace |
+class CollapseKey { |
+ public: |
+ explicit CollapseKey(const mcs_proto::DataMessageStanza& message); |
+ ~CollapseKey(); |
+ |
+ // Comparison operator for use in maps. |
+ bool operator<(const CollapseKey& right) const; |
+ |
+ // Whether the message had a valid collapse key. |
+ bool IsValid() const; |
+ |
+ std::string token() const { return token_; } |
+ std::string app_id() const { return app_id_; } |
+ int64 device_user_id() const { return device_user_id_; } |
+ |
+ private: |
+ const std::string token_; |
+ const std::string app_id_; |
+ const int64 device_user_id_; |
+}; |
+ |
+CollapseKey::CollapseKey(const mcs_proto::DataMessageStanza& message) |
+ : token_(message.token()), |
+ app_id_(message.category()), |
+ device_user_id_(message.device_user_id()) {} |
+ |
+CollapseKey::~CollapseKey() {} |
+ |
+bool CollapseKey::IsValid() const { |
+ // Device user id is optional, but the application id and token are not. |
+ return !token_.empty() && !app_id_.empty(); |
+} |
+ |
+bool CollapseKey::operator<(const CollapseKey& right) const { |
+ if (device_user_id_ != right.device_user_id()) |
+ return device_user_id_ < right.device_user_id(); |
+ if (app_id_ != right.app_id()) |
+ return app_id_ < right.app_id(); |
+ return token_ < right.token(); |
+} |
+ |
struct ReliablePacketInfo { |
ReliablePacketInfo(); |
~ReliablePacketInfo(); |
@@ -189,6 +230,15 @@ void MCSClient::Initialize( |
packet_info->tag = GetMCSProtoTag(*iter->second); |
packet_info->persistent_id = base::Uint64ToString(iter->first); |
to_send_.push_back(make_linked_ptr(packet_info)); |
+ |
+ if (packet_info->tag == kDataMessageStanzaTag) { |
+ mcs_proto::DataMessageStanza* data_message = |
+ reinterpret_cast<mcs_proto::DataMessageStanza*>( |
+ packet_info->protobuf.get()); |
+ CollapseKey collapse_key(*data_message); |
+ if (collapse_key.IsValid()) |
+ collapse_key_map_[collapse_key] = packet_info; |
+ } |
} |
} |
@@ -227,21 +277,47 @@ void MCSClient::SendMessage(const MCSMessage& message) { |
packet_info->protobuf = message.CloneProtobuf(); |
if (ttl > 0) { |
- PersistentId persistent_id = GetNextPersistentId(); |
- DVLOG(1) << "Setting persistent id to " << persistent_id; |
- packet_info->persistent_id = persistent_id; |
- SetPersistentId(persistent_id, |
- packet_info->protobuf.get()); |
- if (!gcm_store_->AddOutgoingMessage( |
- persistent_id, |
- MCSMessage(message.tag(), |
- *(packet_info->protobuf)), |
- base::Bind(&MCSClient::OnGCMUpdateFinished, |
- weak_ptr_factory_.GetWeakPtr()))) { |
- NotifyMessageSendStatus(message.GetProtobuf(), |
- APP_QUEUE_SIZE_LIMIT_REACHED); |
+ DCHECK_EQ(message.tag(), kDataMessageStanzaTag); |
+ |
+ // First check if this message should replace a pending message with the |
+ // same collapse key. |
+ mcs_proto::DataMessageStanza* data_message = |
+ reinterpret_cast<mcs_proto::DataMessageStanza*>( |
+ packet_info->protobuf.get()); |
+ CollapseKey collapse_key(*data_message); |
+ if (collapse_key.IsValid() && collapse_key_map_.count(collapse_key) > 0) { |
+ ReliablePacketInfo* original_packet = collapse_key_map_[collapse_key]; |
+ DVLOG(1) << "Found matching collapse key, Reusing persistent id of " |
+ << original_packet->persistent_id; |
+ original_packet->protobuf = packet_info->protobuf.Pass(); |
+ SetPersistentId(original_packet->persistent_id, |
+ original_packet->protobuf.get()); |
+ gcm_store_->OverwriteOutgoingMessage( |
+ original_packet->persistent_id, |
+ message, |
+ base::Bind(&MCSClient::OnGCMUpdateFinished, |
+ weak_ptr_factory_.GetWeakPtr())); |
+ |
+ // The message is already queued, return. |
return; |
+ } else { |
+ PersistentId persistent_id = GetNextPersistentId(); |
+ DVLOG(1) << "Setting persistent id to " << persistent_id; |
+ packet_info->persistent_id = persistent_id; |
+ SetPersistentId(persistent_id, packet_info->protobuf.get()); |
+ if (!gcm_store_->AddOutgoingMessage( |
+ persistent_id, |
+ MCSMessage(message.tag(), *(packet_info->protobuf)), |
+ base::Bind(&MCSClient::OnGCMUpdateFinished, |
+ weak_ptr_factory_.GetWeakPtr()))) { |
+ NotifyMessageSendStatus(message.GetProtobuf(), |
+ APP_QUEUE_SIZE_LIMIT_REACHED); |
+ return; |
+ } |
} |
+ |
+ if (collapse_key.IsValid()) |
+ collapse_key_map_[collapse_key] = packet_info.get(); |
} else if (!connection_factory_->IsEndpointReachable()) { |
DVLOG(1) << "No active connection, dropping message."; |
NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL); |
@@ -314,8 +390,7 @@ void MCSClient::ResetStateAndBuildLoginRequest( |
std::deque<MCSPacketInternal> new_to_send; |
std::vector<PersistentId> expired_ttl_ids; |
while (!to_send_.empty()) { |
- MCSPacketInternal packet = to_send_.front(); |
- to_send_.pop_front(); |
+ MCSPacketInternal packet = PopMessageForSend(); |
if (GetTTL(*packet->protobuf) > 0 && |
!HasTTLExpired(*packet->protobuf, clock_)) { |
new_to_send.push_back(packet); |
@@ -367,8 +442,7 @@ void MCSClient::MaybeSendMessage() { |
if (!connection_factory_->IsEndpointReachable()) |
return; |
- MCSPacketInternal packet = to_send_.front(); |
- to_send_.pop_front(); |
+ MCSPacketInternal packet = PopMessageForSend(); |
if (HasTTLExpired(*packet->protobuf, clock_)) { |
DCHECK(!packet->persistent_id.empty()); |
DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; |
@@ -678,7 +752,7 @@ void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) { |
// queue (typically when a StreamAck confirms messages as part of a login |
// response). |
for (; iter != id_list.end() && !to_send_.empty(); ++iter) { |
- const MCSPacketInternal& outgoing_packet = to_send_.front(); |
+ const MCSPacketInternal& outgoing_packet = PopMessageForSend(); |
DCHECK_EQ(outgoing_packet->persistent_id, *iter); |
NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT); |
@@ -686,8 +760,6 @@ void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) { |
// acknowledged. |
StreamId device_stream_id = outgoing_packet->stream_id; |
HandleServerConfirmedReceipt(device_stream_id); |
- |
- to_send_.pop_front(); |
} |
DCHECK(iter == id_list.end()); |
@@ -755,4 +827,19 @@ void MCSClient::SetGCMStoreForTesting(GCMStore* gcm_store) { |
gcm_store_ = gcm_store; |
} |
+MCSClient::MCSPacketInternal MCSClient::PopMessageForSend() { |
+ MCSPacketInternal packet = to_send_.front(); |
+ to_send_.pop_front(); |
+ |
+ if (packet->tag == kDataMessageStanzaTag) { |
+ mcs_proto::DataMessageStanza* data_message = |
+ reinterpret_cast<mcs_proto::DataMessageStanza*>(packet->protobuf.get()); |
+ CollapseKey collapse_key(*data_message); |
+ if (collapse_key.IsValid()) |
+ collapse_key_map_.erase(collapse_key); |
+ } |
+ |
+ return packet; |
+} |
+ |
} // namespace gcm |