OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "google_apis/gcm/engine/mcs_client.h" | 5 #include "google_apis/gcm/engine/mcs_client.h" |
6 | 6 |
7 #include "base/basictypes.h" | 7 #include "base/basictypes.h" |
8 #include "base/message_loop/message_loop.h" | 8 #include "base/message_loop/message_loop.h" |
9 #include "base/metrics/histogram.h" | 9 #include "base/metrics/histogram.h" |
10 #include "base/strings/string_number_conversions.h" | 10 #include "base/strings/string_number_conversions.h" |
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
56 for (int i = 0; i < selective_ack.id_size(); ++i) { | 56 for (int i = 0; i < selective_ack.id_size(); ++i) { |
57 DCHECK(!selective_ack.id(i).empty()); | 57 DCHECK(!selective_ack.id(i).empty()); |
58 new_list.push_back(selective_ack.id(i)); | 58 new_list.push_back(selective_ack.id(i)); |
59 } | 59 } |
60 id_list->swap(new_list); | 60 id_list->swap(new_list); |
61 return true; | 61 return true; |
62 } | 62 } |
63 | 63 |
64 } // namespace | 64 } // namespace |
65 | 65 |
| 66 class CollapseKey { |
| 67 public: |
| 68 explicit CollapseKey(const mcs_proto::DataMessageStanza& message); |
| 69 ~CollapseKey(); |
| 70 |
| 71 // Comparison operator for use in maps. |
| 72 bool operator<(const CollapseKey& right) const; |
| 73 |
| 74 // Whether the message had a valid collapse key. |
| 75 bool IsValid() const; |
| 76 |
| 77 std::string token() const { return token_; } |
| 78 std::string app_id() const { return app_id_; } |
| 79 int64 device_user_id() const { return device_user_id_; } |
| 80 |
| 81 private: |
| 82 const std::string token_; |
| 83 const std::string app_id_; |
| 84 const int64 device_user_id_; |
| 85 }; |
| 86 |
| 87 CollapseKey::CollapseKey(const mcs_proto::DataMessageStanza& message) |
| 88 : token_(message.token()), |
| 89 app_id_(message.category()), |
| 90 device_user_id_(message.device_user_id()) {} |
| 91 |
| 92 CollapseKey::~CollapseKey() {} |
| 93 |
| 94 bool CollapseKey::IsValid() const { |
| 95 // Device user id is optional, but the application id and token are not. |
| 96 return !token_.empty() && !app_id_.empty(); |
| 97 } |
| 98 |
| 99 bool CollapseKey::operator<(const CollapseKey& right) const { |
| 100 if (device_user_id_ != right.device_user_id()) |
| 101 return device_user_id_ < right.device_user_id(); |
| 102 if (app_id_ != right.app_id()) |
| 103 return app_id_ < right.app_id(); |
| 104 return token_ < right.token(); |
| 105 } |
| 106 |
66 struct ReliablePacketInfo { | 107 struct ReliablePacketInfo { |
67 ReliablePacketInfo(); | 108 ReliablePacketInfo(); |
68 ~ReliablePacketInfo(); | 109 ~ReliablePacketInfo(); |
69 | 110 |
70 // The stream id with which the message was sent. | 111 // The stream id with which the message was sent. |
71 uint32 stream_id; | 112 uint32 stream_id; |
72 | 113 |
73 // If reliable delivery was requested, the persistent id of the message. | 114 // If reliable delivery was requested, the persistent id of the message. |
74 std::string persistent_id; | 115 std::string persistent_id; |
75 | 116 |
(...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
182 // Now go through and add the outgoing messages to the send queue in their | 223 // Now go through and add the outgoing messages to the send queue in their |
183 // appropriate order (oldest at front, most recent at back). | 224 // appropriate order (oldest at front, most recent at back). |
184 for (std::map<uint64, google::protobuf::MessageLite*>::iterator | 225 for (std::map<uint64, google::protobuf::MessageLite*>::iterator |
185 iter = ordered_messages.begin(); | 226 iter = ordered_messages.begin(); |
186 iter != ordered_messages.end(); ++iter) { | 227 iter != ordered_messages.end(); ++iter) { |
187 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); | 228 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); |
188 packet_info->protobuf.reset(iter->second); | 229 packet_info->protobuf.reset(iter->second); |
189 packet_info->tag = GetMCSProtoTag(*iter->second); | 230 packet_info->tag = GetMCSProtoTag(*iter->second); |
190 packet_info->persistent_id = base::Uint64ToString(iter->first); | 231 packet_info->persistent_id = base::Uint64ToString(iter->first); |
191 to_send_.push_back(make_linked_ptr(packet_info)); | 232 to_send_.push_back(make_linked_ptr(packet_info)); |
| 233 |
| 234 if (packet_info->tag == kDataMessageStanzaTag) { |
| 235 mcs_proto::DataMessageStanza* data_message = |
| 236 reinterpret_cast<mcs_proto::DataMessageStanza*>( |
| 237 packet_info->protobuf.get()); |
| 238 CollapseKey collapse_key(*data_message); |
| 239 if (collapse_key.IsValid()) |
| 240 collapse_key_map_[collapse_key] = packet_info; |
| 241 } |
192 } | 242 } |
193 } | 243 } |
194 | 244 |
195 void MCSClient::Login(uint64 android_id, uint64 security_token) { | 245 void MCSClient::Login(uint64 android_id, uint64 security_token) { |
196 DCHECK_EQ(state_, LOADED); | 246 DCHECK_EQ(state_, LOADED); |
197 DCHECK(android_id_ == 0 || android_id_ == android_id); | 247 DCHECK(android_id_ == 0 || android_id_ == android_id); |
198 DCHECK(security_token_ == 0 || security_token_ == security_token); | 248 DCHECK(security_token_ == 0 || security_token_ == security_token); |
199 | 249 |
200 if (android_id != android_id_ && security_token != security_token_) { | 250 if (android_id != android_id_ && security_token != security_token_) { |
201 DCHECK(android_id); | 251 DCHECK(android_id); |
(...skipping 18 matching lines...) Expand all Loading... |
220 if (message.size() > kMaxMessageBytes) { | 270 if (message.size() > kMaxMessageBytes) { |
221 NotifyMessageSendStatus(message.GetProtobuf(), MESSAGE_TOO_LARGE); | 271 NotifyMessageSendStatus(message.GetProtobuf(), MESSAGE_TOO_LARGE); |
222 return; | 272 return; |
223 } | 273 } |
224 | 274 |
225 scoped_ptr<ReliablePacketInfo> packet_info(new ReliablePacketInfo()); | 275 scoped_ptr<ReliablePacketInfo> packet_info(new ReliablePacketInfo()); |
226 packet_info->tag = message.tag(); | 276 packet_info->tag = message.tag(); |
227 packet_info->protobuf = message.CloneProtobuf(); | 277 packet_info->protobuf = message.CloneProtobuf(); |
228 | 278 |
229 if (ttl > 0) { | 279 if (ttl > 0) { |
230 PersistentId persistent_id = GetNextPersistentId(); | 280 DCHECK_EQ(message.tag(), kDataMessageStanzaTag); |
231 DVLOG(1) << "Setting persistent id to " << persistent_id; | 281 |
232 packet_info->persistent_id = persistent_id; | 282 // First check if this message should replace a pending message with the |
233 SetPersistentId(persistent_id, | 283 // same collapse key. |
234 packet_info->protobuf.get()); | 284 mcs_proto::DataMessageStanza* data_message = |
235 if (!gcm_store_->AddOutgoingMessage( | 285 reinterpret_cast<mcs_proto::DataMessageStanza*>( |
236 persistent_id, | 286 packet_info->protobuf.get()); |
237 MCSMessage(message.tag(), | 287 CollapseKey collapse_key(*data_message); |
238 *(packet_info->protobuf)), | 288 if (collapse_key.IsValid() && collapse_key_map_.count(collapse_key) > 0) { |
239 base::Bind(&MCSClient::OnGCMUpdateFinished, | 289 ReliablePacketInfo* original_packet = collapse_key_map_[collapse_key]; |
240 weak_ptr_factory_.GetWeakPtr()))) { | 290 DVLOG(1) << "Found matching collapse key, Reusing persistent id of " |
241 NotifyMessageSendStatus(message.GetProtobuf(), | 291 << original_packet->persistent_id; |
242 APP_QUEUE_SIZE_LIMIT_REACHED); | 292 original_packet->protobuf = packet_info->protobuf.Pass(); |
| 293 SetPersistentId(original_packet->persistent_id, |
| 294 original_packet->protobuf.get()); |
| 295 gcm_store_->OverwriteOutgoingMessage( |
| 296 original_packet->persistent_id, |
| 297 message, |
| 298 base::Bind(&MCSClient::OnGCMUpdateFinished, |
| 299 weak_ptr_factory_.GetWeakPtr())); |
| 300 |
| 301 // The message is already queued, return. |
243 return; | 302 return; |
| 303 } else { |
| 304 PersistentId persistent_id = GetNextPersistentId(); |
| 305 DVLOG(1) << "Setting persistent id to " << persistent_id; |
| 306 packet_info->persistent_id = persistent_id; |
| 307 SetPersistentId(persistent_id, packet_info->protobuf.get()); |
| 308 if (!gcm_store_->AddOutgoingMessage( |
| 309 persistent_id, |
| 310 MCSMessage(message.tag(), *(packet_info->protobuf)), |
| 311 base::Bind(&MCSClient::OnGCMUpdateFinished, |
| 312 weak_ptr_factory_.GetWeakPtr()))) { |
| 313 NotifyMessageSendStatus(message.GetProtobuf(), |
| 314 APP_QUEUE_SIZE_LIMIT_REACHED); |
| 315 return; |
| 316 } |
244 } | 317 } |
| 318 |
| 319 if (collapse_key.IsValid()) |
| 320 collapse_key_map_[collapse_key] = packet_info.get(); |
245 } else if (!connection_factory_->IsEndpointReachable()) { | 321 } else if (!connection_factory_->IsEndpointReachable()) { |
246 DVLOG(1) << "No active connection, dropping message."; | 322 DVLOG(1) << "No active connection, dropping message."; |
247 NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL); | 323 NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL); |
248 return; | 324 return; |
249 } | 325 } |
250 | 326 |
251 to_send_.push_back(make_linked_ptr(packet_info.release())); | 327 to_send_.push_back(make_linked_ptr(packet_info.release())); |
252 | 328 |
253 // Notify that the messages has been succsfully queued for sending. | 329 // Notify that the messages has been succsfully queued for sending. |
254 // TODO(jianli): We should report QUEUED after writing to GCM store succeeds. | 330 // TODO(jianli): We should report QUEUED after writing to GCM store succeeds. |
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
307 // saved as necessary. | 383 // saved as necessary. |
308 while (!to_resend_.empty()) { | 384 while (!to_resend_.empty()) { |
309 to_send_.push_front(to_resend_.back()); | 385 to_send_.push_front(to_resend_.back()); |
310 to_resend_.pop_back(); | 386 to_resend_.pop_back(); |
311 } | 387 } |
312 | 388 |
313 // Drop all TTL == 0 or expired TTL messages from the queue. | 389 // Drop all TTL == 0 or expired TTL messages from the queue. |
314 std::deque<MCSPacketInternal> new_to_send; | 390 std::deque<MCSPacketInternal> new_to_send; |
315 std::vector<PersistentId> expired_ttl_ids; | 391 std::vector<PersistentId> expired_ttl_ids; |
316 while (!to_send_.empty()) { | 392 while (!to_send_.empty()) { |
317 MCSPacketInternal packet = to_send_.front(); | 393 MCSPacketInternal packet = PopMessageForSend(); |
318 to_send_.pop_front(); | |
319 if (GetTTL(*packet->protobuf) > 0 && | 394 if (GetTTL(*packet->protobuf) > 0 && |
320 !HasTTLExpired(*packet->protobuf, clock_)) { | 395 !HasTTLExpired(*packet->protobuf, clock_)) { |
321 new_to_send.push_back(packet); | 396 new_to_send.push_back(packet); |
322 } else { | 397 } else { |
323 // If the TTL was 0 there is no persistent id, so no need to remove the | 398 // If the TTL was 0 there is no persistent id, so no need to remove the |
324 // message from the persistent store. | 399 // message from the persistent store. |
325 if (!packet->persistent_id.empty()) | 400 if (!packet->persistent_id.empty()) |
326 expired_ttl_ids.push_back(packet->persistent_id); | 401 expired_ttl_ids.push_back(packet->persistent_id); |
327 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED); | 402 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED); |
328 } | 403 } |
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
360 if (to_send_.empty()) | 435 if (to_send_.empty()) |
361 return; | 436 return; |
362 | 437 |
363 // If the connection has been reset, do nothing. On reconnection | 438 // If the connection has been reset, do nothing. On reconnection |
364 // MaybeSendMessage will be automatically invoked again. | 439 // MaybeSendMessage will be automatically invoked again. |
365 // TODO(zea): consider doing TTL expiration at connection reset time, rather | 440 // TODO(zea): consider doing TTL expiration at connection reset time, rather |
366 // than reconnect time. | 441 // than reconnect time. |
367 if (!connection_factory_->IsEndpointReachable()) | 442 if (!connection_factory_->IsEndpointReachable()) |
368 return; | 443 return; |
369 | 444 |
370 MCSPacketInternal packet = to_send_.front(); | 445 MCSPacketInternal packet = PopMessageForSend(); |
371 to_send_.pop_front(); | |
372 if (HasTTLExpired(*packet->protobuf, clock_)) { | 446 if (HasTTLExpired(*packet->protobuf, clock_)) { |
373 DCHECK(!packet->persistent_id.empty()); | 447 DCHECK(!packet->persistent_id.empty()); |
374 DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; | 448 DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; |
375 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED); | 449 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED); |
376 gcm_store_->RemoveOutgoingMessage( | 450 gcm_store_->RemoveOutgoingMessage( |
377 packet->persistent_id, | 451 packet->persistent_id, |
378 base::Bind(&MCSClient::OnGCMUpdateFinished, | 452 base::Bind(&MCSClient::OnGCMUpdateFinished, |
379 weak_ptr_factory_.GetWeakPtr())); | 453 weak_ptr_factory_.GetWeakPtr())); |
380 base::MessageLoop::current()->PostTask( | 454 base::MessageLoop::current()->PostTask( |
381 FROM_HERE, | 455 FROM_HERE, |
(...skipping 289 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
671 StreamId device_stream_id = outgoing_packet->stream_id; | 745 StreamId device_stream_id = outgoing_packet->stream_id; |
672 HandleServerConfirmedReceipt(device_stream_id); | 746 HandleServerConfirmedReceipt(device_stream_id); |
673 | 747 |
674 to_resend_.pop_front(); | 748 to_resend_.pop_front(); |
675 } | 749 } |
676 | 750 |
677 // If the acknowledged ids aren't all there, they might be in the to_send_ | 751 // If the acknowledged ids aren't all there, they might be in the to_send_ |
678 // queue (typically when a StreamAck confirms messages as part of a login | 752 // queue (typically when a StreamAck confirms messages as part of a login |
679 // response). | 753 // response). |
680 for (; iter != id_list.end() && !to_send_.empty(); ++iter) { | 754 for (; iter != id_list.end() && !to_send_.empty(); ++iter) { |
681 const MCSPacketInternal& outgoing_packet = to_send_.front(); | 755 const MCSPacketInternal& outgoing_packet = PopMessageForSend(); |
682 DCHECK_EQ(outgoing_packet->persistent_id, *iter); | 756 DCHECK_EQ(outgoing_packet->persistent_id, *iter); |
683 NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT); | 757 NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT); |
684 | 758 |
685 // No need to re-acknowledge any server messages this message already | 759 // No need to re-acknowledge any server messages this message already |
686 // acknowledged. | 760 // acknowledged. |
687 StreamId device_stream_id = outgoing_packet->stream_id; | 761 StreamId device_stream_id = outgoing_packet->stream_id; |
688 HandleServerConfirmedReceipt(device_stream_id); | 762 HandleServerConfirmedReceipt(device_stream_id); |
689 | |
690 to_send_.pop_front(); | |
691 } | 763 } |
692 | 764 |
693 DCHECK(iter == id_list.end()); | 765 DCHECK(iter == id_list.end()); |
694 | 766 |
695 DVLOG(1) << "Server acked " << id_list.size() | 767 DVLOG(1) << "Server acked " << id_list.size() |
696 << " messages, " << to_resend_.size() << " remaining unacked."; | 768 << " messages, " << to_resend_.size() << " remaining unacked."; |
697 gcm_store_->RemoveOutgoingMessages( | 769 gcm_store_->RemoveOutgoingMessages( |
698 id_list, | 770 id_list, |
699 base::Bind(&MCSClient::OnGCMUpdateFinished, | 771 base::Bind(&MCSClient::OnGCMUpdateFinished, |
700 weak_ptr_factory_.GetWeakPtr())); | 772 weak_ptr_factory_.GetWeakPtr())); |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
748 data_message_stanza->device_user_id(), | 820 data_message_stanza->device_user_id(), |
749 data_message_stanza->category(), | 821 data_message_stanza->category(), |
750 data_message_stanza->id(), | 822 data_message_stanza->id(), |
751 status); | 823 status); |
752 } | 824 } |
753 | 825 |
754 void MCSClient::SetGCMStoreForTesting(GCMStore* gcm_store) { | 826 void MCSClient::SetGCMStoreForTesting(GCMStore* gcm_store) { |
755 gcm_store_ = gcm_store; | 827 gcm_store_ = gcm_store; |
756 } | 828 } |
757 | 829 |
| 830 MCSClient::MCSPacketInternal MCSClient::PopMessageForSend() { |
| 831 MCSPacketInternal packet = to_send_.front(); |
| 832 to_send_.pop_front(); |
| 833 |
| 834 if (packet->tag == kDataMessageStanzaTag) { |
| 835 mcs_proto::DataMessageStanza* data_message = |
| 836 reinterpret_cast<mcs_proto::DataMessageStanza*>(packet->protobuf.get()); |
| 837 CollapseKey collapse_key(*data_message); |
| 838 if (collapse_key.IsValid()) |
| 839 collapse_key_map_.erase(collapse_key); |
| 840 } |
| 841 |
| 842 return packet; |
| 843 } |
| 844 |
758 } // namespace gcm | 845 } // namespace gcm |
OLD | NEW |