Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(672)

Unified Diff: google_apis/gcm/engine/mcs_client.cc

Issue 148293002: [GCM] Add basic collapse key support for upstream (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Address comments Created 6 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « google_apis/gcm/engine/mcs_client.h ('k') | google_apis/gcm/engine/mcs_client_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: google_apis/gcm/engine/mcs_client.cc
diff --git a/google_apis/gcm/engine/mcs_client.cc b/google_apis/gcm/engine/mcs_client.cc
index 0a0bfc92731d919fb3190cb2a7c04262673b426b..2f4edf188fb0d1e3aa04e10d47fc8d841e5bf9a7 100644
--- a/google_apis/gcm/engine/mcs_client.cc
+++ b/google_apis/gcm/engine/mcs_client.cc
@@ -63,6 +63,47 @@ bool BuildPersistentIdListFromProto(const google::protobuf::string& bytes,
} // namespace
+class CollapseKey {
+ public:
+ explicit CollapseKey(const mcs_proto::DataMessageStanza& message);
+ ~CollapseKey();
+
+ // Comparison operator for use in maps.
+ bool operator<(const CollapseKey& right) const;
+
+ // Whether the message had a valid collapse key.
+ bool IsValid() const;
+
+ std::string token() const { return token_; }
+ std::string app_id() const { return app_id_; }
+ int64 device_user_id() const { return device_user_id_; }
+
+ private:
+ const std::string token_;
+ const std::string app_id_;
+ const int64 device_user_id_;
+};
+
+CollapseKey::CollapseKey(const mcs_proto::DataMessageStanza& message)
+ : token_(message.token()),
+ app_id_(message.category()),
+ device_user_id_(message.device_user_id()) {}
+
+CollapseKey::~CollapseKey() {}
+
+bool CollapseKey::IsValid() const {
+ // Device user id is optional, but the application id and token are not.
+ return !token_.empty() && !app_id_.empty();
+}
+
+bool CollapseKey::operator<(const CollapseKey& right) const {
+ if (device_user_id_ != right.device_user_id())
+ return device_user_id_ < right.device_user_id();
+ if (app_id_ != right.app_id())
+ return app_id_ < right.app_id();
+ return token_ < right.token();
+}
+
struct ReliablePacketInfo {
ReliablePacketInfo();
~ReliablePacketInfo();
@@ -189,6 +230,15 @@ void MCSClient::Initialize(
packet_info->tag = GetMCSProtoTag(*iter->second);
packet_info->persistent_id = base::Uint64ToString(iter->first);
to_send_.push_back(make_linked_ptr(packet_info));
+
+ if (packet_info->tag == kDataMessageStanzaTag) {
+ mcs_proto::DataMessageStanza* data_message =
+ reinterpret_cast<mcs_proto::DataMessageStanza*>(
+ packet_info->protobuf.get());
+ CollapseKey collapse_key(*data_message);
+ if (collapse_key.IsValid())
+ collapse_key_map_[collapse_key] = packet_info;
+ }
}
}
@@ -227,21 +277,47 @@ void MCSClient::SendMessage(const MCSMessage& message) {
packet_info->protobuf = message.CloneProtobuf();
if (ttl > 0) {
- PersistentId persistent_id = GetNextPersistentId();
- DVLOG(1) << "Setting persistent id to " << persistent_id;
- packet_info->persistent_id = persistent_id;
- SetPersistentId(persistent_id,
- packet_info->protobuf.get());
- if (!gcm_store_->AddOutgoingMessage(
- persistent_id,
- MCSMessage(message.tag(),
- *(packet_info->protobuf)),
- base::Bind(&MCSClient::OnGCMUpdateFinished,
- weak_ptr_factory_.GetWeakPtr()))) {
- NotifyMessageSendStatus(message.GetProtobuf(),
- APP_QUEUE_SIZE_LIMIT_REACHED);
+ DCHECK_EQ(message.tag(), kDataMessageStanzaTag);
+
+ // First check if this message should replace a pending message with the
+ // same collapse key.
+ mcs_proto::DataMessageStanza* data_message =
+ reinterpret_cast<mcs_proto::DataMessageStanza*>(
+ packet_info->protobuf.get());
+ CollapseKey collapse_key(*data_message);
+ if (collapse_key.IsValid() && collapse_key_map_.count(collapse_key) > 0) {
+ ReliablePacketInfo* original_packet = collapse_key_map_[collapse_key];
+ DVLOG(1) << "Found matching collapse key, Reusing persistent id of "
+ << original_packet->persistent_id;
+ original_packet->protobuf = packet_info->protobuf.Pass();
+ SetPersistentId(original_packet->persistent_id,
+ original_packet->protobuf.get());
+ gcm_store_->OverwriteOutgoingMessage(
+ original_packet->persistent_id,
+ message,
+ base::Bind(&MCSClient::OnGCMUpdateFinished,
+ weak_ptr_factory_.GetWeakPtr()));
+
+ // The message is already queued, return.
return;
+ } else {
+ PersistentId persistent_id = GetNextPersistentId();
+ DVLOG(1) << "Setting persistent id to " << persistent_id;
+ packet_info->persistent_id = persistent_id;
+ SetPersistentId(persistent_id, packet_info->protobuf.get());
+ if (!gcm_store_->AddOutgoingMessage(
+ persistent_id,
+ MCSMessage(message.tag(), *(packet_info->protobuf)),
+ base::Bind(&MCSClient::OnGCMUpdateFinished,
+ weak_ptr_factory_.GetWeakPtr()))) {
+ NotifyMessageSendStatus(message.GetProtobuf(),
+ APP_QUEUE_SIZE_LIMIT_REACHED);
+ return;
+ }
}
+
+ if (collapse_key.IsValid())
+ collapse_key_map_[collapse_key] = packet_info.get();
} else if (!connection_factory_->IsEndpointReachable()) {
DVLOG(1) << "No active connection, dropping message.";
NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL);
@@ -314,8 +390,7 @@ void MCSClient::ResetStateAndBuildLoginRequest(
std::deque<MCSPacketInternal> new_to_send;
std::vector<PersistentId> expired_ttl_ids;
while (!to_send_.empty()) {
- MCSPacketInternal packet = to_send_.front();
- to_send_.pop_front();
+ MCSPacketInternal packet = PopMessageForSend();
if (GetTTL(*packet->protobuf) > 0 &&
!HasTTLExpired(*packet->protobuf, clock_)) {
new_to_send.push_back(packet);
@@ -367,8 +442,7 @@ void MCSClient::MaybeSendMessage() {
if (!connection_factory_->IsEndpointReachable())
return;
- MCSPacketInternal packet = to_send_.front();
- to_send_.pop_front();
+ MCSPacketInternal packet = PopMessageForSend();
if (HasTTLExpired(*packet->protobuf, clock_)) {
DCHECK(!packet->persistent_id.empty());
DVLOG(1) << "Dropping expired message " << packet->persistent_id << ".";
@@ -678,7 +752,7 @@ void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) {
// queue (typically when a StreamAck confirms messages as part of a login
// response).
for (; iter != id_list.end() && !to_send_.empty(); ++iter) {
- const MCSPacketInternal& outgoing_packet = to_send_.front();
+ const MCSPacketInternal& outgoing_packet = PopMessageForSend();
DCHECK_EQ(outgoing_packet->persistent_id, *iter);
NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
@@ -686,8 +760,6 @@ void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) {
// acknowledged.
StreamId device_stream_id = outgoing_packet->stream_id;
HandleServerConfirmedReceipt(device_stream_id);
-
- to_send_.pop_front();
}
DCHECK(iter == id_list.end());
@@ -755,4 +827,19 @@ void MCSClient::SetGCMStoreForTesting(GCMStore* gcm_store) {
gcm_store_ = gcm_store;
}
+MCSClient::MCSPacketInternal MCSClient::PopMessageForSend() {
+ MCSPacketInternal packet = to_send_.front();
+ to_send_.pop_front();
+
+ if (packet->tag == kDataMessageStanzaTag) {
+ mcs_proto::DataMessageStanza* data_message =
+ reinterpret_cast<mcs_proto::DataMessageStanza*>(packet->protobuf.get());
+ CollapseKey collapse_key(*data_message);
+ if (collapse_key.IsValid())
+ collapse_key_map_.erase(collapse_key);
+ }
+
+ return packet;
+}
+
} // namespace gcm
« no previous file with comments | « google_apis/gcm/engine/mcs_client.h ('k') | google_apis/gcm/engine/mcs_client_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698