Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(647)

Unified Diff: google_apis/gcm/engine/mcs_client.cc

Issue 148293002: [GCM] Add basic collapse key support for upstream (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: fix Created 6 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: google_apis/gcm/engine/mcs_client.cc
diff --git a/google_apis/gcm/engine/mcs_client.cc b/google_apis/gcm/engine/mcs_client.cc
index 5e6490ac2d3ec497ec20d680d267bb15f042ab52..f0e2911770ff73d9a25c4e8984032958b80b1d3b 100644
--- a/google_apis/gcm/engine/mcs_client.cc
+++ b/google_apis/gcm/engine/mcs_client.cc
@@ -226,21 +226,58 @@ void MCSClient::SendMessage(const MCSMessage& message) {
packet_info->protobuf = message.CloneProtobuf();
if (ttl > 0) {
- PersistentId persistent_id = GetNextPersistentId();
- DVLOG(1) << "Setting persistent id to " << persistent_id;
- packet_info->persistent_id = persistent_id;
- SetPersistentId(persistent_id,
- packet_info->protobuf.get());
- if (!gcm_store_->AddOutgoingMessage(
- persistent_id,
- MCSMessage(message.tag(),
- *(packet_info->protobuf)),
- base::Bind(&MCSClient::OnGCMUpdateFinished,
- weak_ptr_factory_.GetWeakPtr()))) {
- NotifyMessageSendStatus(message.GetProtobuf(),
- APP_QUEUE_SIZE_LIMIT_REACHED);
+ DCHECK_EQ(message.tag(), kDataMessageStanzaTag);
+
+ // First check if this message should replace a pending message with the
+ // same collapsed key.
+ mcs_proto::DataMessageStanza* data_message =
+ reinterpret_cast<mcs_proto::DataMessageStanza*>(
+ packet_info->protobuf.get());
+ std::string app_id = data_message->from();
fgorski 2014/01/27 17:39:38 same here
Nicolas Zea 2014/01/31 11:58:43 Done.
+ DCHECK(!app_id.empty());
+ std::string token = data_message->token();
+ if (!token.empty() &&
+ collapse_key_map_.count(app_id) != 0 &&
+ collapse_key_map_[app_id]->count(token) != 0) {
+ ReliablePacketInfo* original_packet =
+ (*collapse_key_map_[app_id])[data_message->token()];
+ DVLOG(1) << "Found matching collapse key, Reusing persistent id of "
+ << original_packet->persistent_id;
fgorski 2014/01/27 17:39:38 Is it possible that you are assigning a persistent
Nicolas Zea 2014/01/28 08:15:35 SendMessage is only invoked for new messages, not
fgorski 2014/01/28 18:10:27 Does that mean we can fill RMQ for the application
Nicolas Zea 2014/01/31 11:58:43 Currently yes, if all the messages manage to send,
+ original_packet->protobuf = packet_info->protobuf.Pass();
+ SetPersistentId(original_packet->persistent_id,
+ original_packet->protobuf.get());
+ gcm_store_->OverwriteOutgoingMessage(
+ original_packet->persistent_id,
+ message,
+ base::Bind(&MCSClient::OnGCMUpdateFinished,
+ weak_ptr_factory_.GetWeakPtr()));
+
+ // The message is already queued, return.
return;
+ } else {
+ PersistentId persistent_id = GetNextPersistentId();
+ DVLOG(1) << "Setting persistent id to " << persistent_id;
+ packet_info->persistent_id = persistent_id;
+ SetPersistentId(persistent_id,
+ packet_info->protobuf.get());
+ if (!gcm_store_->AddOutgoingMessage(
+ persistent_id,
+ MCSMessage(message.tag(),
+ *(packet_info->protobuf)),
+ base::Bind(&MCSClient::OnGCMUpdateFinished,
+ weak_ptr_factory_.GetWeakPtr()))) {
+ NotifyMessageSendStatus(message.GetProtobuf(),
+ APP_QUEUE_SIZE_LIMIT_REACHED);
+ return;
+ }
+ }
+
+ if (!token.empty()) {
+ if (!collapse_key_map_[app_id].get())
+ collapse_key_map_[app_id] = make_linked_ptr(new TokenMap());
+ (*collapse_key_map_[app_id])[token] = packet_info.get();
}
+
} else if (!connection_factory_->IsEndpointReachable()) {
DVLOG(1) << "No active connection, dropping message.";
NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL);
@@ -362,6 +399,15 @@ void MCSClient::MaybeSendMessage() {
MCSPacketInternal packet = to_send_.front();
to_send_.pop_front();
+ if (packet->tag == kDataMessageStanzaTag) {
+ mcs_proto::DataMessageStanza* data_message =
+ reinterpret_cast<mcs_proto::DataMessageStanza*>(
+ packet->protobuf.get());
+ std::string app_id = data_message->from();
fgorski 2014/01/27 17:39:38 ditto: category
Nicolas Zea 2014/01/31 11:58:43 Done.
+ std::string token = data_message->token();
+ if (!token.empty())
+ collapse_key_map_[app_id]->erase(token);
+ }
if (HasTTLExpired(*packet->protobuf, clock_)) {
DCHECK(!packet->persistent_id.empty());
DVLOG(1) << "Dropping expired message " << packet->persistent_id << ".";

Powered by Google App Engine
This is Rietveld 408576698