Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(145)

Side by Side Diff: google_apis/gcm/engine/mcs_client.cc

Issue 148293002: [GCM] Add basic collapse key support for upstream (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Address comments Created 6 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « google_apis/gcm/engine/mcs_client.h ('k') | google_apis/gcm/engine/mcs_client_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "google_apis/gcm/engine/mcs_client.h" 5 #include "google_apis/gcm/engine/mcs_client.h"
6 6
7 #include "base/basictypes.h" 7 #include "base/basictypes.h"
8 #include "base/message_loop/message_loop.h" 8 #include "base/message_loop/message_loop.h"
9 #include "base/metrics/histogram.h" 9 #include "base/metrics/histogram.h"
10 #include "base/strings/string_number_conversions.h" 10 #include "base/strings/string_number_conversions.h"
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
56 for (int i = 0; i < selective_ack.id_size(); ++i) { 56 for (int i = 0; i < selective_ack.id_size(); ++i) {
57 DCHECK(!selective_ack.id(i).empty()); 57 DCHECK(!selective_ack.id(i).empty());
58 new_list.push_back(selective_ack.id(i)); 58 new_list.push_back(selective_ack.id(i));
59 } 59 }
60 id_list->swap(new_list); 60 id_list->swap(new_list);
61 return true; 61 return true;
62 } 62 }
63 63
64 } // namespace 64 } // namespace
65 65
66 class CollapseKey {
67 public:
68 explicit CollapseKey(const mcs_proto::DataMessageStanza& message);
69 ~CollapseKey();
70
71 // Comparison operator for use in maps.
72 bool operator<(const CollapseKey& right) const;
73
74 // Whether the message had a valid collapse key.
75 bool IsValid() const;
76
77 std::string token() const { return token_; }
78 std::string app_id() const { return app_id_; }
79 int64 device_user_id() const { return device_user_id_; }
80
81 private:
82 const std::string token_;
83 const std::string app_id_;
84 const int64 device_user_id_;
85 };
86
87 CollapseKey::CollapseKey(const mcs_proto::DataMessageStanza& message)
88 : token_(message.token()),
89 app_id_(message.category()),
90 device_user_id_(message.device_user_id()) {}
91
92 CollapseKey::~CollapseKey() {}
93
94 bool CollapseKey::IsValid() const {
95 // Device user id is optional, but the application id and token are not.
96 return !token_.empty() && !app_id_.empty();
97 }
98
99 bool CollapseKey::operator<(const CollapseKey& right) const {
100 if (device_user_id_ != right.device_user_id())
101 return device_user_id_ < right.device_user_id();
102 if (app_id_ != right.app_id())
103 return app_id_ < right.app_id();
104 return token_ < right.token();
105 }
106
66 struct ReliablePacketInfo { 107 struct ReliablePacketInfo {
67 ReliablePacketInfo(); 108 ReliablePacketInfo();
68 ~ReliablePacketInfo(); 109 ~ReliablePacketInfo();
69 110
70 // The stream id with which the message was sent. 111 // The stream id with which the message was sent.
71 uint32 stream_id; 112 uint32 stream_id;
72 113
73 // If reliable delivery was requested, the persistent id of the message. 114 // If reliable delivery was requested, the persistent id of the message.
74 std::string persistent_id; 115 std::string persistent_id;
75 116
(...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after
182 // Now go through and add the outgoing messages to the send queue in their 223 // Now go through and add the outgoing messages to the send queue in their
183 // appropriate order (oldest at front, most recent at back). 224 // appropriate order (oldest at front, most recent at back).
184 for (std::map<uint64, google::protobuf::MessageLite*>::iterator 225 for (std::map<uint64, google::protobuf::MessageLite*>::iterator
185 iter = ordered_messages.begin(); 226 iter = ordered_messages.begin();
186 iter != ordered_messages.end(); ++iter) { 227 iter != ordered_messages.end(); ++iter) {
187 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); 228 ReliablePacketInfo* packet_info = new ReliablePacketInfo();
188 packet_info->protobuf.reset(iter->second); 229 packet_info->protobuf.reset(iter->second);
189 packet_info->tag = GetMCSProtoTag(*iter->second); 230 packet_info->tag = GetMCSProtoTag(*iter->second);
190 packet_info->persistent_id = base::Uint64ToString(iter->first); 231 packet_info->persistent_id = base::Uint64ToString(iter->first);
191 to_send_.push_back(make_linked_ptr(packet_info)); 232 to_send_.push_back(make_linked_ptr(packet_info));
233
234 if (packet_info->tag == kDataMessageStanzaTag) {
235 mcs_proto::DataMessageStanza* data_message =
236 reinterpret_cast<mcs_proto::DataMessageStanza*>(
237 packet_info->protobuf.get());
238 CollapseKey collapse_key(*data_message);
239 if (collapse_key.IsValid())
240 collapse_key_map_[collapse_key] = packet_info;
241 }
192 } 242 }
193 } 243 }
194 244
195 void MCSClient::Login(uint64 android_id, uint64 security_token) { 245 void MCSClient::Login(uint64 android_id, uint64 security_token) {
196 DCHECK_EQ(state_, LOADED); 246 DCHECK_EQ(state_, LOADED);
197 DCHECK(android_id_ == 0 || android_id_ == android_id); 247 DCHECK(android_id_ == 0 || android_id_ == android_id);
198 DCHECK(security_token_ == 0 || security_token_ == security_token); 248 DCHECK(security_token_ == 0 || security_token_ == security_token);
199 249
200 if (android_id != android_id_ && security_token != security_token_) { 250 if (android_id != android_id_ && security_token != security_token_) {
201 DCHECK(android_id); 251 DCHECK(android_id);
(...skipping 18 matching lines...) Expand all
220 if (message.size() > kMaxMessageBytes) { 270 if (message.size() > kMaxMessageBytes) {
221 NotifyMessageSendStatus(message.GetProtobuf(), MESSAGE_TOO_LARGE); 271 NotifyMessageSendStatus(message.GetProtobuf(), MESSAGE_TOO_LARGE);
222 return; 272 return;
223 } 273 }
224 274
225 scoped_ptr<ReliablePacketInfo> packet_info(new ReliablePacketInfo()); 275 scoped_ptr<ReliablePacketInfo> packet_info(new ReliablePacketInfo());
226 packet_info->tag = message.tag(); 276 packet_info->tag = message.tag();
227 packet_info->protobuf = message.CloneProtobuf(); 277 packet_info->protobuf = message.CloneProtobuf();
228 278
229 if (ttl > 0) { 279 if (ttl > 0) {
230 PersistentId persistent_id = GetNextPersistentId(); 280 DCHECK_EQ(message.tag(), kDataMessageStanzaTag);
231 DVLOG(1) << "Setting persistent id to " << persistent_id; 281
232 packet_info->persistent_id = persistent_id; 282 // First check if this message should replace a pending message with the
233 SetPersistentId(persistent_id, 283 // same collapse key.
234 packet_info->protobuf.get()); 284 mcs_proto::DataMessageStanza* data_message =
235 if (!gcm_store_->AddOutgoingMessage( 285 reinterpret_cast<mcs_proto::DataMessageStanza*>(
236 persistent_id, 286 packet_info->protobuf.get());
237 MCSMessage(message.tag(), 287 CollapseKey collapse_key(*data_message);
238 *(packet_info->protobuf)), 288 if (collapse_key.IsValid() && collapse_key_map_.count(collapse_key) > 0) {
239 base::Bind(&MCSClient::OnGCMUpdateFinished, 289 ReliablePacketInfo* original_packet = collapse_key_map_[collapse_key];
240 weak_ptr_factory_.GetWeakPtr()))) { 290 DVLOG(1) << "Found matching collapse key, Reusing persistent id of "
241 NotifyMessageSendStatus(message.GetProtobuf(), 291 << original_packet->persistent_id;
242 APP_QUEUE_SIZE_LIMIT_REACHED); 292 original_packet->protobuf = packet_info->protobuf.Pass();
293 SetPersistentId(original_packet->persistent_id,
294 original_packet->protobuf.get());
295 gcm_store_->OverwriteOutgoingMessage(
296 original_packet->persistent_id,
297 message,
298 base::Bind(&MCSClient::OnGCMUpdateFinished,
299 weak_ptr_factory_.GetWeakPtr()));
300
301 // The message is already queued, return.
243 return; 302 return;
303 } else {
304 PersistentId persistent_id = GetNextPersistentId();
305 DVLOG(1) << "Setting persistent id to " << persistent_id;
306 packet_info->persistent_id = persistent_id;
307 SetPersistentId(persistent_id, packet_info->protobuf.get());
308 if (!gcm_store_->AddOutgoingMessage(
309 persistent_id,
310 MCSMessage(message.tag(), *(packet_info->protobuf)),
311 base::Bind(&MCSClient::OnGCMUpdateFinished,
312 weak_ptr_factory_.GetWeakPtr()))) {
313 NotifyMessageSendStatus(message.GetProtobuf(),
314 APP_QUEUE_SIZE_LIMIT_REACHED);
315 return;
316 }
244 } 317 }
318
319 if (collapse_key.IsValid())
320 collapse_key_map_[collapse_key] = packet_info.get();
245 } else if (!connection_factory_->IsEndpointReachable()) { 321 } else if (!connection_factory_->IsEndpointReachable()) {
246 DVLOG(1) << "No active connection, dropping message."; 322 DVLOG(1) << "No active connection, dropping message.";
247 NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL); 323 NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL);
248 return; 324 return;
249 } 325 }
250 326
251 to_send_.push_back(make_linked_ptr(packet_info.release())); 327 to_send_.push_back(make_linked_ptr(packet_info.release()));
252 328
253 // Notify that the messages has been succsfully queued for sending. 329 // Notify that the messages has been succsfully queued for sending.
254 // TODO(jianli): We should report QUEUED after writing to GCM store succeeds. 330 // TODO(jianli): We should report QUEUED after writing to GCM store succeeds.
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after
307 // saved as necessary. 383 // saved as necessary.
308 while (!to_resend_.empty()) { 384 while (!to_resend_.empty()) {
309 to_send_.push_front(to_resend_.back()); 385 to_send_.push_front(to_resend_.back());
310 to_resend_.pop_back(); 386 to_resend_.pop_back();
311 } 387 }
312 388
313 // Drop all TTL == 0 or expired TTL messages from the queue. 389 // Drop all TTL == 0 or expired TTL messages from the queue.
314 std::deque<MCSPacketInternal> new_to_send; 390 std::deque<MCSPacketInternal> new_to_send;
315 std::vector<PersistentId> expired_ttl_ids; 391 std::vector<PersistentId> expired_ttl_ids;
316 while (!to_send_.empty()) { 392 while (!to_send_.empty()) {
317 MCSPacketInternal packet = to_send_.front(); 393 MCSPacketInternal packet = PopMessageForSend();
318 to_send_.pop_front();
319 if (GetTTL(*packet->protobuf) > 0 && 394 if (GetTTL(*packet->protobuf) > 0 &&
320 !HasTTLExpired(*packet->protobuf, clock_)) { 395 !HasTTLExpired(*packet->protobuf, clock_)) {
321 new_to_send.push_back(packet); 396 new_to_send.push_back(packet);
322 } else { 397 } else {
323 // If the TTL was 0 there is no persistent id, so no need to remove the 398 // If the TTL was 0 there is no persistent id, so no need to remove the
324 // message from the persistent store. 399 // message from the persistent store.
325 if (!packet->persistent_id.empty()) 400 if (!packet->persistent_id.empty())
326 expired_ttl_ids.push_back(packet->persistent_id); 401 expired_ttl_ids.push_back(packet->persistent_id);
327 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED); 402 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED);
328 } 403 }
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
360 if (to_send_.empty()) 435 if (to_send_.empty())
361 return; 436 return;
362 437
363 // If the connection has been reset, do nothing. On reconnection 438 // If the connection has been reset, do nothing. On reconnection
364 // MaybeSendMessage will be automatically invoked again. 439 // MaybeSendMessage will be automatically invoked again.
365 // TODO(zea): consider doing TTL expiration at connection reset time, rather 440 // TODO(zea): consider doing TTL expiration at connection reset time, rather
366 // than reconnect time. 441 // than reconnect time.
367 if (!connection_factory_->IsEndpointReachable()) 442 if (!connection_factory_->IsEndpointReachable())
368 return; 443 return;
369 444
370 MCSPacketInternal packet = to_send_.front(); 445 MCSPacketInternal packet = PopMessageForSend();
371 to_send_.pop_front();
372 if (HasTTLExpired(*packet->protobuf, clock_)) { 446 if (HasTTLExpired(*packet->protobuf, clock_)) {
373 DCHECK(!packet->persistent_id.empty()); 447 DCHECK(!packet->persistent_id.empty());
374 DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; 448 DVLOG(1) << "Dropping expired message " << packet->persistent_id << ".";
375 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED); 449 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED);
376 gcm_store_->RemoveOutgoingMessage( 450 gcm_store_->RemoveOutgoingMessage(
377 packet->persistent_id, 451 packet->persistent_id,
378 base::Bind(&MCSClient::OnGCMUpdateFinished, 452 base::Bind(&MCSClient::OnGCMUpdateFinished,
379 weak_ptr_factory_.GetWeakPtr())); 453 weak_ptr_factory_.GetWeakPtr()));
380 base::MessageLoop::current()->PostTask( 454 base::MessageLoop::current()->PostTask(
381 FROM_HERE, 455 FROM_HERE,
(...skipping 289 matching lines...) Expand 10 before | Expand all | Expand 10 after
671 StreamId device_stream_id = outgoing_packet->stream_id; 745 StreamId device_stream_id = outgoing_packet->stream_id;
672 HandleServerConfirmedReceipt(device_stream_id); 746 HandleServerConfirmedReceipt(device_stream_id);
673 747
674 to_resend_.pop_front(); 748 to_resend_.pop_front();
675 } 749 }
676 750
677 // If the acknowledged ids aren't all there, they might be in the to_send_ 751 // If the acknowledged ids aren't all there, they might be in the to_send_
678 // queue (typically when a StreamAck confirms messages as part of a login 752 // queue (typically when a StreamAck confirms messages as part of a login
679 // response). 753 // response).
680 for (; iter != id_list.end() && !to_send_.empty(); ++iter) { 754 for (; iter != id_list.end() && !to_send_.empty(); ++iter) {
681 const MCSPacketInternal& outgoing_packet = to_send_.front(); 755 const MCSPacketInternal& outgoing_packet = PopMessageForSend();
682 DCHECK_EQ(outgoing_packet->persistent_id, *iter); 756 DCHECK_EQ(outgoing_packet->persistent_id, *iter);
683 NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT); 757 NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT);
684 758
685 // No need to re-acknowledge any server messages this message already 759 // No need to re-acknowledge any server messages this message already
686 // acknowledged. 760 // acknowledged.
687 StreamId device_stream_id = outgoing_packet->stream_id; 761 StreamId device_stream_id = outgoing_packet->stream_id;
688 HandleServerConfirmedReceipt(device_stream_id); 762 HandleServerConfirmedReceipt(device_stream_id);
689
690 to_send_.pop_front();
691 } 763 }
692 764
693 DCHECK(iter == id_list.end()); 765 DCHECK(iter == id_list.end());
694 766
695 DVLOG(1) << "Server acked " << id_list.size() 767 DVLOG(1) << "Server acked " << id_list.size()
696 << " messages, " << to_resend_.size() << " remaining unacked."; 768 << " messages, " << to_resend_.size() << " remaining unacked.";
697 gcm_store_->RemoveOutgoingMessages( 769 gcm_store_->RemoveOutgoingMessages(
698 id_list, 770 id_list,
699 base::Bind(&MCSClient::OnGCMUpdateFinished, 771 base::Bind(&MCSClient::OnGCMUpdateFinished,
700 weak_ptr_factory_.GetWeakPtr())); 772 weak_ptr_factory_.GetWeakPtr()));
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
748 data_message_stanza->device_user_id(), 820 data_message_stanza->device_user_id(),
749 data_message_stanza->category(), 821 data_message_stanza->category(),
750 data_message_stanza->id(), 822 data_message_stanza->id(),
751 status); 823 status);
752 } 824 }
753 825
754 void MCSClient::SetGCMStoreForTesting(GCMStore* gcm_store) { 826 void MCSClient::SetGCMStoreForTesting(GCMStore* gcm_store) {
755 gcm_store_ = gcm_store; 827 gcm_store_ = gcm_store;
756 } 828 }
757 829
830 MCSClient::MCSPacketInternal MCSClient::PopMessageForSend() {
831 MCSPacketInternal packet = to_send_.front();
832 to_send_.pop_front();
833
834 if (packet->tag == kDataMessageStanzaTag) {
835 mcs_proto::DataMessageStanza* data_message =
836 reinterpret_cast<mcs_proto::DataMessageStanza*>(packet->protobuf.get());
837 CollapseKey collapse_key(*data_message);
838 if (collapse_key.IsValid())
839 collapse_key_map_.erase(collapse_key);
840 }
841
842 return packet;
843 }
844
758 } // namespace gcm 845 } // namespace gcm
OLDNEW
« no previous file with comments | « google_apis/gcm/engine/mcs_client.h ('k') | google_apis/gcm/engine/mcs_client_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698