Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(57)

Side by Side Diff: google_apis/gcm/engine/mcs_client.cc

Issue 148293002: [GCM] Add basic collapse key support for upstream (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Address comments Created 6 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "google_apis/gcm/engine/mcs_client.h" 5 #include "google_apis/gcm/engine/mcs_client.h"
6 6
7 #include "base/basictypes.h" 7 #include "base/basictypes.h"
8 #include "base/message_loop/message_loop.h" 8 #include "base/message_loop/message_loop.h"
9 #include "base/metrics/histogram.h" 9 #include "base/metrics/histogram.h"
10 #include "base/strings/string_number_conversions.h" 10 #include "base/strings/string_number_conversions.h"
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
56 for (int i = 0; i < selective_ack.id_size(); ++i) { 56 for (int i = 0; i < selective_ack.id_size(); ++i) {
57 DCHECK(!selective_ack.id(i).empty()); 57 DCHECK(!selective_ack.id(i).empty());
58 new_list.push_back(selective_ack.id(i)); 58 new_list.push_back(selective_ack.id(i));
59 } 59 }
60 id_list->swap(new_list); 60 id_list->swap(new_list);
61 return true; 61 return true;
62 } 62 }
63 63
64 } // namespace 64 } // namespace
65 65
66 class CollapseKey {
67 public:
68 explicit CollapseKey(const mcs_proto::DataMessageStanza& message);
69 ~CollapseKey();
70
71 // Comparison operator for use in maps.
72 bool operator<(const CollapseKey& right) const;
73
74 // Whether the message had a valid collapse key.
75 bool IsValid() const;
76
77 std::string token() const { return token_; }
78 std::string app_id() const { return app_id_; }
79 int64 device_user_id() const { return device_user_id_; }
80
81 private:
82 const std::string token_;
83 const std::string app_id_;
84 const int64 device_user_id_;
85 };
86
87 CollapseKey::CollapseKey(const mcs_proto::DataMessageStanza& message)
88 : token_(message.token()),
89 app_id_(message.category()),
90 device_user_id_(message.device_user_id()) {}
91
92 CollapseKey::~CollapseKey() {}
93
94 bool CollapseKey::IsValid() const {
95 // Device user id is optional, but the application id and token are not.
96 return !token_.empty() && !app_id_.empty();
97 }
98 bool CollapseKey::operator<(const CollapseKey& right) const {
jianli 2014/02/04 22:55:14 nit: empty line
Nicolas Zea 2014/02/12 23:35:46 Done.
99 if (device_user_id_ != right.device_user_id())
100 return device_user_id_ < right.device_user_id();
101 if (app_id_ != right.app_id())
102 return app_id_ < right.app_id();
103 return token_ < right.token();
104 }
105
66 struct ReliablePacketInfo { 106 struct ReliablePacketInfo {
67 ReliablePacketInfo(); 107 ReliablePacketInfo();
68 ~ReliablePacketInfo(); 108 ~ReliablePacketInfo();
69 109
70 // The stream id with which the message was sent. 110 // The stream id with which the message was sent.
71 uint32 stream_id; 111 uint32 stream_id;
72 112
73 // If reliable delivery was requested, the persistent id of the message. 113 // If reliable delivery was requested, the persistent id of the message.
74 std::string persistent_id; 114 std::string persistent_id;
75 115
(...skipping 144 matching lines...) Expand 10 before | Expand all | Expand 10 after
220 if (message.size() > kMaxMessageBytes) { 260 if (message.size() > kMaxMessageBytes) {
221 NotifyMessageSendStatus(message.GetProtobuf(), MESSAGE_TOO_LARGE); 261 NotifyMessageSendStatus(message.GetProtobuf(), MESSAGE_TOO_LARGE);
222 return; 262 return;
223 } 263 }
224 264
225 scoped_ptr<ReliablePacketInfo> packet_info(new ReliablePacketInfo()); 265 scoped_ptr<ReliablePacketInfo> packet_info(new ReliablePacketInfo());
226 packet_info->tag = message.tag(); 266 packet_info->tag = message.tag();
227 packet_info->protobuf = message.CloneProtobuf(); 267 packet_info->protobuf = message.CloneProtobuf();
228 268
229 if (ttl > 0) { 269 if (ttl > 0) {
230 PersistentId persistent_id = GetNextPersistentId(); 270 DCHECK_EQ(message.tag(), kDataMessageStanzaTag);
231 DVLOG(1) << "Setting persistent id to " << persistent_id; 271
232 packet_info->persistent_id = persistent_id; 272 // First check if this message should replace a pending message with the
233 SetPersistentId(persistent_id, 273 // same collapsed key.
jianli 2014/02/04 22:55:14 nit: collapse
Nicolas Zea 2014/02/12 23:35:46 Done.
234 packet_info->protobuf.get()); 274 mcs_proto::DataMessageStanza* data_message =
235 if (!gcm_store_->AddOutgoingMessage( 275 reinterpret_cast<mcs_proto::DataMessageStanza*>(
236 persistent_id, 276 packet_info->protobuf.get());
237 MCSMessage(message.tag(), 277 CollapseKey collapse_key(*data_message);
238 *(packet_info->protobuf)), 278 if (collapse_key.IsValid() && collapse_key_map_.count(collapse_key) > 0) {
239 base::Bind(&MCSClient::OnGCMUpdateFinished, 279 ReliablePacketInfo* original_packet = collapse_key_map_[collapse_key];
240 weak_ptr_factory_.GetWeakPtr()))) { 280 DVLOG(1) << "Found matching collapse key, Reusing persistent id of "
241 NotifyMessageSendStatus(message.GetProtobuf(), 281 << original_packet->persistent_id;
242 APP_QUEUE_SIZE_LIMIT_REACHED); 282 original_packet->protobuf = packet_info->protobuf.Pass();
283 SetPersistentId(original_packet->persistent_id,
284 original_packet->protobuf.get());
285 gcm_store_->OverwriteOutgoingMessage(
286 original_packet->persistent_id,
287 message,
288 base::Bind(&MCSClient::OnGCMUpdateFinished,
289 weak_ptr_factory_.GetWeakPtr()));
290
291 // The message is already queued, return.
243 return; 292 return;
293 } else {
294 PersistentId persistent_id = GetNextPersistentId();
295 DVLOG(1) << "Setting persistent id to " << persistent_id;
296 packet_info->persistent_id = persistent_id;
297 SetPersistentId(persistent_id, packet_info->protobuf.get());
298 if (!gcm_store_->AddOutgoingMessage(
299 persistent_id,
300 MCSMessage(message.tag(), *(packet_info->protobuf)),
301 base::Bind(&MCSClient::OnGCMUpdateFinished,
302 weak_ptr_factory_.GetWeakPtr()))) {
303 NotifyMessageSendStatus(message.GetProtobuf(),
304 APP_QUEUE_SIZE_LIMIT_REACHED);
305 return;
306 }
244 } 307 }
308
309 if (collapse_key.IsValid())
310 collapse_key_map_[collapse_key] = packet_info.get();
245 } else if (!connection_factory_->IsEndpointReachable()) { 311 } else if (!connection_factory_->IsEndpointReachable()) {
246 DVLOG(1) << "No active connection, dropping message."; 312 DVLOG(1) << "No active connection, dropping message.";
247 NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL); 313 NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL);
248 return; 314 return;
249 } 315 }
250 to_send_.push_back(make_linked_ptr(packet_info.release())); 316 to_send_.push_back(make_linked_ptr(packet_info.release()));
251 MaybeSendMessage(); 317 MaybeSendMessage();
252 } 318 }
253 319
254 void MCSClient::Destroy() { 320 void MCSClient::Destroy() {
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after
356 422
357 // If the connection has been reset, do nothing. On reconnection 423 // If the connection has been reset, do nothing. On reconnection
358 // MaybeSendMessage will be automatically invoked again. 424 // MaybeSendMessage will be automatically invoked again.
359 // TODO(zea): consider doing TTL expiration at connection reset time, rather 425 // TODO(zea): consider doing TTL expiration at connection reset time, rather
360 // than reconnect time. 426 // than reconnect time.
361 if (!connection_factory_->IsEndpointReachable()) 427 if (!connection_factory_->IsEndpointReachable())
362 return; 428 return;
363 429
364 MCSPacketInternal packet = to_send_.front(); 430 MCSPacketInternal packet = to_send_.front();
365 to_send_.pop_front(); 431 to_send_.pop_front();
432 if (packet->tag == kDataMessageStanzaTag) {
433 mcs_proto::DataMessageStanza* data_message =
434 reinterpret_cast<mcs_proto::DataMessageStanza*>(packet->protobuf.get());
435 CollapseKey collapse_key(*data_message);
436 if (collapse_key.IsValid())
437 collapse_key_map_.erase(collapse_key);
438 }
366 if (HasTTLExpired(*packet->protobuf, clock_)) { 439 if (HasTTLExpired(*packet->protobuf, clock_)) {
367 DCHECK(!packet->persistent_id.empty()); 440 DCHECK(!packet->persistent_id.empty());
368 DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; 441 DVLOG(1) << "Dropping expired message " << packet->persistent_id << ".";
369 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED); 442 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED);
370 gcm_store_->RemoveOutgoingMessage( 443 gcm_store_->RemoveOutgoingMessage(
371 packet->persistent_id, 444 packet->persistent_id,
372 base::Bind(&MCSClient::OnGCMUpdateFinished, 445 base::Bind(&MCSClient::OnGCMUpdateFinished,
373 weak_ptr_factory_.GetWeakPtr())); 446 weak_ptr_factory_.GetWeakPtr()));
374 base::MessageLoop::current()->PostTask( 447 base::MessageLoop::current()->PostTask(
375 FROM_HERE, 448 FROM_HERE,
(...skipping 367 matching lines...) Expand 10 before | Expand all | Expand 10 after
743 data_message_stanza->category(), 816 data_message_stanza->category(),
744 data_message_stanza->id(), 817 data_message_stanza->id(),
745 status); 818 status);
746 } 819 }
747 820
748 void MCSClient::SetGCMStoreForTesting(GCMStore* gcm_store) { 821 void MCSClient::SetGCMStoreForTesting(GCMStore* gcm_store) {
749 gcm_store_ = gcm_store; 822 gcm_store_ = gcm_store;
750 } 823 }
751 824
752 } // namespace gcm 825 } // namespace gcm
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698