Index: google_apis/gcm/engine/mcs_client.cc |
diff --git a/google_apis/gcm/engine/mcs_client.cc b/google_apis/gcm/engine/mcs_client.cc |
index 5e6490ac2d3ec497ec20d680d267bb15f042ab52..f0e2911770ff73d9a25c4e8984032958b80b1d3b 100644 |
--- a/google_apis/gcm/engine/mcs_client.cc |
+++ b/google_apis/gcm/engine/mcs_client.cc |
@@ -226,21 +226,58 @@ void MCSClient::SendMessage(const MCSMessage& message) { |
packet_info->protobuf = message.CloneProtobuf(); |
if (ttl > 0) { |
- PersistentId persistent_id = GetNextPersistentId(); |
- DVLOG(1) << "Setting persistent id to " << persistent_id; |
- packet_info->persistent_id = persistent_id; |
- SetPersistentId(persistent_id, |
- packet_info->protobuf.get()); |
- if (!gcm_store_->AddOutgoingMessage( |
- persistent_id, |
- MCSMessage(message.tag(), |
- *(packet_info->protobuf)), |
- base::Bind(&MCSClient::OnGCMUpdateFinished, |
- weak_ptr_factory_.GetWeakPtr()))) { |
- NotifyMessageSendStatus(message.GetProtobuf(), |
- APP_QUEUE_SIZE_LIMIT_REACHED); |
+ DCHECK_EQ(message.tag(), kDataMessageStanzaTag); |
+ |
+ // First check if this message should replace a pending message with the |
+ // same collapsed key. |
+ mcs_proto::DataMessageStanza* data_message = |
+ reinterpret_cast<mcs_proto::DataMessageStanza*>( |
+ packet_info->protobuf.get()); |
+ std::string app_id = data_message->from(); |
fgorski
2014/01/27 17:39:38
same here
Nicolas Zea
2014/01/31 11:58:43
Done.
|
+ DCHECK(!app_id.empty()); |
+ std::string token = data_message->token(); |
+ if (!token.empty() && |
+ collapse_key_map_.count(app_id) != 0 && |
+ collapse_key_map_[app_id]->count(token) != 0) { |
+ ReliablePacketInfo* original_packet = |
+ (*collapse_key_map_[app_id])[data_message->token()]; |
+ DVLOG(1) << "Found matching collapse key, Reusing persistent id of " |
+ << original_packet->persistent_id; |
fgorski
2014/01/27 17:39:38
Is it possible that you are assigning a persistent
Nicolas Zea
2014/01/28 08:15:35
SendMessage is only invoked for new messages, not
fgorski
2014/01/28 18:10:27
Does that mean we can fill RMQ for the application
Nicolas Zea
2014/01/31 11:58:43
Currently yes, if all the messages manage to send,
|
+ original_packet->protobuf = packet_info->protobuf.Pass(); |
+ SetPersistentId(original_packet->persistent_id, |
+ original_packet->protobuf.get()); |
+ gcm_store_->OverwriteOutgoingMessage( |
+ original_packet->persistent_id, |
+ message, |
+ base::Bind(&MCSClient::OnGCMUpdateFinished, |
+ weak_ptr_factory_.GetWeakPtr())); |
+ |
+ // The message is already queued, return. |
return; |
+ } else { |
+ PersistentId persistent_id = GetNextPersistentId(); |
+ DVLOG(1) << "Setting persistent id to " << persistent_id; |
+ packet_info->persistent_id = persistent_id; |
+ SetPersistentId(persistent_id, |
+ packet_info->protobuf.get()); |
+ if (!gcm_store_->AddOutgoingMessage( |
+ persistent_id, |
+ MCSMessage(message.tag(), |
+ *(packet_info->protobuf)), |
+ base::Bind(&MCSClient::OnGCMUpdateFinished, |
+ weak_ptr_factory_.GetWeakPtr()))) { |
+ NotifyMessageSendStatus(message.GetProtobuf(), |
+ APP_QUEUE_SIZE_LIMIT_REACHED); |
+ return; |
+ } |
+ } |
+ |
+ if (!token.empty()) { |
+ if (!collapse_key_map_[app_id].get()) |
+ collapse_key_map_[app_id] = make_linked_ptr(new TokenMap()); |
+ (*collapse_key_map_[app_id])[token] = packet_info.get(); |
} |
+ |
} else if (!connection_factory_->IsEndpointReachable()) { |
DVLOG(1) << "No active connection, dropping message."; |
NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL); |
@@ -362,6 +399,15 @@ void MCSClient::MaybeSendMessage() { |
MCSPacketInternal packet = to_send_.front(); |
to_send_.pop_front(); |
+ if (packet->tag == kDataMessageStanzaTag) { |
+ mcs_proto::DataMessageStanza* data_message = |
+ reinterpret_cast<mcs_proto::DataMessageStanza*>( |
+ packet->protobuf.get()); |
+ std::string app_id = data_message->from(); |
fgorski
2014/01/27 17:39:38
ditto: category
Nicolas Zea
2014/01/31 11:58:43
Done.
|
+ std::string token = data_message->token(); |
+ if (!token.empty()) |
+ collapse_key_map_[app_id]->erase(token); |
+ } |
if (HasTTLExpired(*packet->protobuf, clock_)) { |
DCHECK(!packet->persistent_id.empty()); |
DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; |