Index: google_apis/gcm/engine/mcs_client.cc |
diff --git a/google_apis/gcm/engine/mcs_client.cc b/google_apis/gcm/engine/mcs_client.cc |
index ca134f06589c14d832c6473ecc1534f4264c7a09..a73b2bb432d579b56abf2ec4933e564b5c0b7195 100644 |
--- a/google_apis/gcm/engine/mcs_client.cc |
+++ b/google_apis/gcm/engine/mcs_client.cc |
@@ -63,6 +63,46 @@ bool BuildPersistentIdListFromProto(const google::protobuf::string& bytes, |
} // namespace |
+class CollapseKey { |
+ public: |
+ explicit CollapseKey(const mcs_proto::DataMessageStanza& message); |
+ ~CollapseKey(); |
+ |
+ // Comparison operator for use in maps. |
+ bool operator<(const CollapseKey& right) const; |
+ |
+ // Whether the message had a valid collapse key. |
+ bool IsValid() const; |
+ |
+ std::string token() const { return token_; } |
+ std::string app_id() const { return app_id_; } |
+ int64 device_user_id() const { return device_user_id_; } |
+ |
+ private: |
+ const std::string token_; |
+ const std::string app_id_; |
+ const int64 device_user_id_; |
+}; |
+ |
+CollapseKey::CollapseKey(const mcs_proto::DataMessageStanza& message) |
+ : token_(message.token()), |
+ app_id_(message.category()), |
+ device_user_id_(message.device_user_id()) {} |
+ |
+CollapseKey::~CollapseKey() {} |
+ |
+bool CollapseKey::IsValid() const { |
+ // Device user id is optional, but the application id and token are not. |
+ return !token_.empty() && !app_id_.empty(); |
+} |
+bool CollapseKey::operator<(const CollapseKey& right) const { |
jianli
2014/02/04 22:55:14
nit: empty line
Nicolas Zea
2014/02/12 23:35:46
Done.
|
+ if (device_user_id_ != right.device_user_id()) |
+ return device_user_id_ < right.device_user_id(); |
+ if (app_id_ != right.app_id()) |
+ return app_id_ < right.app_id(); |
+ return token_ < right.token(); |
+} |
+ |
struct ReliablePacketInfo { |
ReliablePacketInfo(); |
~ReliablePacketInfo(); |
@@ -227,21 +267,47 @@ void MCSClient::SendMessage(const MCSMessage& message) { |
packet_info->protobuf = message.CloneProtobuf(); |
if (ttl > 0) { |
- PersistentId persistent_id = GetNextPersistentId(); |
- DVLOG(1) << "Setting persistent id to " << persistent_id; |
- packet_info->persistent_id = persistent_id; |
- SetPersistentId(persistent_id, |
- packet_info->protobuf.get()); |
- if (!gcm_store_->AddOutgoingMessage( |
- persistent_id, |
- MCSMessage(message.tag(), |
- *(packet_info->protobuf)), |
- base::Bind(&MCSClient::OnGCMUpdateFinished, |
- weak_ptr_factory_.GetWeakPtr()))) { |
- NotifyMessageSendStatus(message.GetProtobuf(), |
- APP_QUEUE_SIZE_LIMIT_REACHED); |
+ DCHECK_EQ(message.tag(), kDataMessageStanzaTag); |
+ |
+ // First check if this message should replace a pending message with the |
+ // same collapsed key. |
jianli
2014/02/04 22:55:14
nit: collapse
Nicolas Zea
2014/02/12 23:35:46
Done.
|
+ mcs_proto::DataMessageStanza* data_message = |
+ reinterpret_cast<mcs_proto::DataMessageStanza*>( |
+ packet_info->protobuf.get()); |
+ CollapseKey collapse_key(*data_message); |
+ if (collapse_key.IsValid() && collapse_key_map_.count(collapse_key) > 0) { |
+ ReliablePacketInfo* original_packet = collapse_key_map_[collapse_key]; |
+ DVLOG(1) << "Found matching collapse key, Reusing persistent id of " |
+ << original_packet->persistent_id; |
+ original_packet->protobuf = packet_info->protobuf.Pass(); |
+ SetPersistentId(original_packet->persistent_id, |
+ original_packet->protobuf.get()); |
+ gcm_store_->OverwriteOutgoingMessage( |
+ original_packet->persistent_id, |
+ message, |
+ base::Bind(&MCSClient::OnGCMUpdateFinished, |
+ weak_ptr_factory_.GetWeakPtr())); |
+ |
+ // The message is already queued, return. |
return; |
+ } else { |
+ PersistentId persistent_id = GetNextPersistentId(); |
+ DVLOG(1) << "Setting persistent id to " << persistent_id; |
+ packet_info->persistent_id = persistent_id; |
+ SetPersistentId(persistent_id, packet_info->protobuf.get()); |
+ if (!gcm_store_->AddOutgoingMessage( |
+ persistent_id, |
+ MCSMessage(message.tag(), *(packet_info->protobuf)), |
+ base::Bind(&MCSClient::OnGCMUpdateFinished, |
+ weak_ptr_factory_.GetWeakPtr()))) { |
+ NotifyMessageSendStatus(message.GetProtobuf(), |
+ APP_QUEUE_SIZE_LIMIT_REACHED); |
+ return; |
+ } |
} |
+ |
+ if (collapse_key.IsValid()) |
+ collapse_key_map_[collapse_key] = packet_info.get(); |
} else if (!connection_factory_->IsEndpointReachable()) { |
DVLOG(1) << "No active connection, dropping message."; |
NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL); |
@@ -363,6 +429,13 @@ void MCSClient::MaybeSendMessage() { |
MCSPacketInternal packet = to_send_.front(); |
to_send_.pop_front(); |
+ if (packet->tag == kDataMessageStanzaTag) { |
+ mcs_proto::DataMessageStanza* data_message = |
+ reinterpret_cast<mcs_proto::DataMessageStanza*>(packet->protobuf.get()); |
+ CollapseKey collapse_key(*data_message); |
+ if (collapse_key.IsValid()) |
+ collapse_key_map_.erase(collapse_key); |
+ } |
if (HasTTLExpired(*packet->protobuf, clock_)) { |
DCHECK(!packet->persistent_id.empty()); |
DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; |