| Index: google_apis/gcm/engine/mcs_client.cc
|
| diff --git a/google_apis/gcm/engine/mcs_client.cc b/google_apis/gcm/engine/mcs_client.cc
|
| index 0a0bfc92731d919fb3190cb2a7c04262673b426b..2f4edf188fb0d1e3aa04e10d47fc8d841e5bf9a7 100644
|
| --- a/google_apis/gcm/engine/mcs_client.cc
|
| +++ b/google_apis/gcm/engine/mcs_client.cc
|
| @@ -63,6 +63,47 @@ bool BuildPersistentIdListFromProto(const google::protobuf::string& bytes,
|
|
|
| } // namespace
|
|
|
| +class CollapseKey {
|
| + public:
|
| + explicit CollapseKey(const mcs_proto::DataMessageStanza& message);
|
| + ~CollapseKey();
|
| +
|
| + // Comparison operator for use in maps.
|
| + bool operator<(const CollapseKey& right) const;
|
| +
|
| + // Whether the message had a valid collapse key.
|
| + bool IsValid() const;
|
| +
|
| + std::string token() const { return token_; }
|
| + std::string app_id() const { return app_id_; }
|
| + int64 device_user_id() const { return device_user_id_; }
|
| +
|
| + private:
|
| + const std::string token_;
|
| + const std::string app_id_;
|
| + const int64 device_user_id_;
|
| +};
|
| +
|
| +CollapseKey::CollapseKey(const mcs_proto::DataMessageStanza& message)
|
| + : token_(message.token()),
|
| + app_id_(message.category()),
|
| + device_user_id_(message.device_user_id()) {}
|
| +
|
| +CollapseKey::~CollapseKey() {}
|
| +
|
| +bool CollapseKey::IsValid() const {
|
| + // Device user id is optional, but the application id and token are not.
|
| + return !token_.empty() && !app_id_.empty();
|
| +}
|
| +
|
| +bool CollapseKey::operator<(const CollapseKey& right) const {
|
| + if (device_user_id_ != right.device_user_id())
|
| + return device_user_id_ < right.device_user_id();
|
| + if (app_id_ != right.app_id())
|
| + return app_id_ < right.app_id();
|
| + return token_ < right.token();
|
| +}
|
| +
|
| struct ReliablePacketInfo {
|
| ReliablePacketInfo();
|
| ~ReliablePacketInfo();
|
| @@ -189,6 +230,15 @@ void MCSClient::Initialize(
|
| packet_info->tag = GetMCSProtoTag(*iter->second);
|
| packet_info->persistent_id = base::Uint64ToString(iter->first);
|
| to_send_.push_back(make_linked_ptr(packet_info));
|
| +
|
| + if (packet_info->tag == kDataMessageStanzaTag) {
|
| + mcs_proto::DataMessageStanza* data_message =
|
| + reinterpret_cast<mcs_proto::DataMessageStanza*>(
|
| + packet_info->protobuf.get());
|
| + CollapseKey collapse_key(*data_message);
|
| + if (collapse_key.IsValid())
|
| + collapse_key_map_[collapse_key] = packet_info;
|
| + }
|
| }
|
| }
|
|
|
| @@ -227,21 +277,47 @@ void MCSClient::SendMessage(const MCSMessage& message) {
|
| packet_info->protobuf = message.CloneProtobuf();
|
|
|
| if (ttl > 0) {
|
| - PersistentId persistent_id = GetNextPersistentId();
|
| - DVLOG(1) << "Setting persistent id to " << persistent_id;
|
| - packet_info->persistent_id = persistent_id;
|
| - SetPersistentId(persistent_id,
|
| - packet_info->protobuf.get());
|
| - if (!gcm_store_->AddOutgoingMessage(
|
| - persistent_id,
|
| - MCSMessage(message.tag(),
|
| - *(packet_info->protobuf)),
|
| - base::Bind(&MCSClient::OnGCMUpdateFinished,
|
| - weak_ptr_factory_.GetWeakPtr()))) {
|
| - NotifyMessageSendStatus(message.GetProtobuf(),
|
| - APP_QUEUE_SIZE_LIMIT_REACHED);
|
| + DCHECK_EQ(message.tag(), kDataMessageStanzaTag);
|
| +
|
| + // First check if this message should replace a pending message with the
|
| + // same collapse key.
|
| + mcs_proto::DataMessageStanza* data_message =
|
| + reinterpret_cast<mcs_proto::DataMessageStanza*>(
|
| + packet_info->protobuf.get());
|
| + CollapseKey collapse_key(*data_message);
|
| + if (collapse_key.IsValid() && collapse_key_map_.count(collapse_key) > 0) {
|
| + ReliablePacketInfo* original_packet = collapse_key_map_[collapse_key];
|
| + DVLOG(1) << "Found matching collapse key, Reusing persistent id of "
|
| + << original_packet->persistent_id;
|
| + original_packet->protobuf = packet_info->protobuf.Pass();
|
| + SetPersistentId(original_packet->persistent_id,
|
| + original_packet->protobuf.get());
|
| + gcm_store_->OverwriteOutgoingMessage(
|
| + original_packet->persistent_id,
|
| + message,
|
| + base::Bind(&MCSClient::OnGCMUpdateFinished,
|
| + weak_ptr_factory_.GetWeakPtr()));
|
| +
|
| + // The message is already queued, return.
|
| return;
|
| + } else {
|
| + PersistentId persistent_id = GetNextPersistentId();
|
| + DVLOG(1) << "Setting persistent id to " << persistent_id;
|
| + packet_info->persistent_id = persistent_id;
|
| + SetPersistentId(persistent_id, packet_info->protobuf.get());
|
| + if (!gcm_store_->AddOutgoingMessage(
|
| + persistent_id,
|
| + MCSMessage(message.tag(), *(packet_info->protobuf)),
|
| + base::Bind(&MCSClient::OnGCMUpdateFinished,
|
| + weak_ptr_factory_.GetWeakPtr()))) {
|
| + NotifyMessageSendStatus(message.GetProtobuf(),
|
| + APP_QUEUE_SIZE_LIMIT_REACHED);
|
| + return;
|
| + }
|
| }
|
| +
|
| + if (collapse_key.IsValid())
|
| + collapse_key_map_[collapse_key] = packet_info.get();
|
| } else if (!connection_factory_->IsEndpointReachable()) {
|
| DVLOG(1) << "No active connection, dropping message.";
|
| NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL);
|
| @@ -314,8 +390,7 @@ void MCSClient::ResetStateAndBuildLoginRequest(
|
| std::deque<MCSPacketInternal> new_to_send;
|
| std::vector<PersistentId> expired_ttl_ids;
|
| while (!to_send_.empty()) {
|
| - MCSPacketInternal packet = to_send_.front();
|
| - to_send_.pop_front();
|
| + MCSPacketInternal packet = PopMessageForSend();
|
| if (GetTTL(*packet->protobuf) > 0 &&
|
| !HasTTLExpired(*packet->protobuf, clock_)) {
|
| new_to_send.push_back(packet);
|
| @@ -367,8 +442,7 @@ void MCSClient::MaybeSendMessage() {
|
| if (!connection_factory_->IsEndpointReachable())
|
| return;
|
|
|
| - MCSPacketInternal packet = to_send_.front();
|
| - to_send_.pop_front();
|
| + MCSPacketInternal packet = PopMessageForSend();
|
| if (HasTTLExpired(*packet->protobuf, clock_)) {
|
| DCHECK(!packet->persistent_id.empty());
|
| DVLOG(1) << "Dropping expired message " << packet->persistent_id << ".";
|
| @@ -678,7 +752,7 @@ void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) {
|
| // queue (typically when a StreamAck confirms messages as part of a login
|
| // response).
|
| for (; iter != id_list.end() && !to_send_.empty(); ++iter) {
|
| - const MCSPacketInternal& outgoing_packet = to_send_.front();
|
| + const MCSPacketInternal& outgoing_packet = PopMessageForSend();
|
| DCHECK_EQ(outgoing_packet->persistent_id, *iter);
|
| NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
|
|
|
| @@ -686,8 +760,6 @@ void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) {
|
| // acknowledged.
|
| StreamId device_stream_id = outgoing_packet->stream_id;
|
| HandleServerConfirmedReceipt(device_stream_id);
|
| -
|
| - to_send_.pop_front();
|
| }
|
|
|
| DCHECK(iter == id_list.end());
|
| @@ -755,4 +827,19 @@ void MCSClient::SetGCMStoreForTesting(GCMStore* gcm_store) {
|
| gcm_store_ = gcm_store;
|
| }
|
|
|
| +MCSClient::MCSPacketInternal MCSClient::PopMessageForSend() {
|
| + MCSPacketInternal packet = to_send_.front();
|
| + to_send_.pop_front();
|
| +
|
| + if (packet->tag == kDataMessageStanzaTag) {
|
| + mcs_proto::DataMessageStanza* data_message =
|
| + reinterpret_cast<mcs_proto::DataMessageStanza*>(packet->protobuf.get());
|
| + CollapseKey collapse_key(*data_message);
|
| + if (collapse_key.IsValid())
|
| + collapse_key_map_.erase(collapse_key);
|
| + }
|
| +
|
| + return packet;
|
| +}
|
| +
|
| } // namespace gcm
|
|
|