Chromium Code Reviews| Index: google_apis/gcm/engine/mcs_client.cc |
| diff --git a/google_apis/gcm/engine/mcs_client.cc b/google_apis/gcm/engine/mcs_client.cc |
| index ca134f06589c14d832c6473ecc1534f4264c7a09..a73b2bb432d579b56abf2ec4933e564b5c0b7195 100644 |
| --- a/google_apis/gcm/engine/mcs_client.cc |
| +++ b/google_apis/gcm/engine/mcs_client.cc |
| @@ -63,6 +63,46 @@ bool BuildPersistentIdListFromProto(const google::protobuf::string& bytes, |
| } // namespace |
| +class CollapseKey { |
| + public: |
| + explicit CollapseKey(const mcs_proto::DataMessageStanza& message); |
| + ~CollapseKey(); |
| + |
| + // Comparison operator for use in maps. |
| + bool operator<(const CollapseKey& right) const; |
| + |
| + // Whether the message had a valid collapse key. |
| + bool IsValid() const; |
| + |
| + std::string token() const { return token_; } |
| + std::string app_id() const { return app_id_; } |
| + int64 device_user_id() const { return device_user_id_; } |
| + |
| + private: |
| + const std::string token_; |
| + const std::string app_id_; |
| + const int64 device_user_id_; |
| +}; |
| + |
| +CollapseKey::CollapseKey(const mcs_proto::DataMessageStanza& message) |
| + : token_(message.token()), |
| + app_id_(message.category()), |
| + device_user_id_(message.device_user_id()) {} |
| + |
| +CollapseKey::~CollapseKey() {} |
| + |
| +bool CollapseKey::IsValid() const { |
| + // Device user id is optional, but the application id and token are not. |
| + return !token_.empty() && !app_id_.empty(); |
| +} |
| +bool CollapseKey::operator<(const CollapseKey& right) const { |
|
jianli
2014/02/04 22:55:14
nit: empty line
Nicolas Zea
2014/02/12 23:35:46
Done.
|
| + if (device_user_id_ != right.device_user_id()) |
| + return device_user_id_ < right.device_user_id(); |
| + if (app_id_ != right.app_id()) |
| + return app_id_ < right.app_id(); |
| + return token_ < right.token(); |
| +} |
| + |
| struct ReliablePacketInfo { |
| ReliablePacketInfo(); |
| ~ReliablePacketInfo(); |
| @@ -227,21 +267,47 @@ void MCSClient::SendMessage(const MCSMessage& message) { |
| packet_info->protobuf = message.CloneProtobuf(); |
| if (ttl > 0) { |
| - PersistentId persistent_id = GetNextPersistentId(); |
| - DVLOG(1) << "Setting persistent id to " << persistent_id; |
| - packet_info->persistent_id = persistent_id; |
| - SetPersistentId(persistent_id, |
| - packet_info->protobuf.get()); |
| - if (!gcm_store_->AddOutgoingMessage( |
| - persistent_id, |
| - MCSMessage(message.tag(), |
| - *(packet_info->protobuf)), |
| - base::Bind(&MCSClient::OnGCMUpdateFinished, |
| - weak_ptr_factory_.GetWeakPtr()))) { |
| - NotifyMessageSendStatus(message.GetProtobuf(), |
| - APP_QUEUE_SIZE_LIMIT_REACHED); |
| + DCHECK_EQ(message.tag(), kDataMessageStanzaTag); |
| + |
| + // First check if this message should replace a pending message with the |
| + // same collapsed key. |
|
jianli
2014/02/04 22:55:14
nit: collapse
Nicolas Zea
2014/02/12 23:35:46
Done.
|
| + mcs_proto::DataMessageStanza* data_message = |
| + reinterpret_cast<mcs_proto::DataMessageStanza*>( |
| + packet_info->protobuf.get()); |
| + CollapseKey collapse_key(*data_message); |
| + if (collapse_key.IsValid() && collapse_key_map_.count(collapse_key) > 0) { |
| + ReliablePacketInfo* original_packet = collapse_key_map_[collapse_key]; |
| + DVLOG(1) << "Found matching collapse key, Reusing persistent id of " |
| + << original_packet->persistent_id; |
| + original_packet->protobuf = packet_info->protobuf.Pass(); |
| + SetPersistentId(original_packet->persistent_id, |
| + original_packet->protobuf.get()); |
| + gcm_store_->OverwriteOutgoingMessage( |
| + original_packet->persistent_id, |
| + message, |
| + base::Bind(&MCSClient::OnGCMUpdateFinished, |
| + weak_ptr_factory_.GetWeakPtr())); |
| + |
| + // The message is already queued, return. |
| return; |
| + } else { |
| + PersistentId persistent_id = GetNextPersistentId(); |
| + DVLOG(1) << "Setting persistent id to " << persistent_id; |
| + packet_info->persistent_id = persistent_id; |
| + SetPersistentId(persistent_id, packet_info->protobuf.get()); |
| + if (!gcm_store_->AddOutgoingMessage( |
| + persistent_id, |
| + MCSMessage(message.tag(), *(packet_info->protobuf)), |
| + base::Bind(&MCSClient::OnGCMUpdateFinished, |
| + weak_ptr_factory_.GetWeakPtr()))) { |
| + NotifyMessageSendStatus(message.GetProtobuf(), |
| + APP_QUEUE_SIZE_LIMIT_REACHED); |
| + return; |
| + } |
| } |
| + |
| + if (collapse_key.IsValid()) |
| + collapse_key_map_[collapse_key] = packet_info.get(); |
| } else if (!connection_factory_->IsEndpointReachable()) { |
| DVLOG(1) << "No active connection, dropping message."; |
| NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL); |
| @@ -363,6 +429,13 @@ void MCSClient::MaybeSendMessage() { |
| MCSPacketInternal packet = to_send_.front(); |
| to_send_.pop_front(); |
| + if (packet->tag == kDataMessageStanzaTag) { |
| + mcs_proto::DataMessageStanza* data_message = |
| + reinterpret_cast<mcs_proto::DataMessageStanza*>(packet->protobuf.get()); |
| + CollapseKey collapse_key(*data_message); |
| + if (collapse_key.IsValid()) |
| + collapse_key_map_.erase(collapse_key); |
| + } |
| if (HasTTLExpired(*packet->protobuf, clock_)) { |
| DCHECK(!packet->persistent_id.empty()); |
| DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; |