| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "google_apis/gcm/engine/mcs_client.h" | 5 #include "google_apis/gcm/engine/mcs_client.h" |
| 6 | 6 |
| 7 #include "base/basictypes.h" | 7 #include "base/basictypes.h" |
| 8 #include "base/message_loop/message_loop.h" | 8 #include "base/message_loop/message_loop.h" |
| 9 #include "base/metrics/histogram.h" | 9 #include "base/metrics/histogram.h" |
| 10 #include "base/strings/string_number_conversions.h" | 10 #include "base/strings/string_number_conversions.h" |
| (...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 56 for (int i = 0; i < selective_ack.id_size(); ++i) { | 56 for (int i = 0; i < selective_ack.id_size(); ++i) { |
| 57 DCHECK(!selective_ack.id(i).empty()); | 57 DCHECK(!selective_ack.id(i).empty()); |
| 58 new_list.push_back(selective_ack.id(i)); | 58 new_list.push_back(selective_ack.id(i)); |
| 59 } | 59 } |
| 60 id_list->swap(new_list); | 60 id_list->swap(new_list); |
| 61 return true; | 61 return true; |
| 62 } | 62 } |
| 63 | 63 |
| 64 } // namespace | 64 } // namespace |
| 65 | 65 |
| 66 class CollapseKey { |
| 67 public: |
| 68 explicit CollapseKey(const mcs_proto::DataMessageStanza& message); |
| 69 ~CollapseKey(); |
| 70 |
| 71 // Comparison operator for use in maps. |
| 72 bool operator<(const CollapseKey& right) const; |
| 73 |
| 74 // Whether the message had a valid collapse key. |
| 75 bool IsValid() const; |
| 76 |
| 77 std::string token() const { return token_; } |
| 78 std::string app_id() const { return app_id_; } |
| 79 int64 device_user_id() const { return device_user_id_; } |
| 80 |
| 81 private: |
| 82 const std::string token_; |
| 83 const std::string app_id_; |
| 84 const int64 device_user_id_; |
| 85 }; |
| 86 |
| 87 CollapseKey::CollapseKey(const mcs_proto::DataMessageStanza& message) |
| 88 : token_(message.token()), |
| 89 app_id_(message.category()), |
| 90 device_user_id_(message.device_user_id()) {} |
| 91 |
| 92 CollapseKey::~CollapseKey() {} |
| 93 |
| 94 bool CollapseKey::IsValid() const { |
| 95 // Device user id is optional, but the application id and token are not. |
| 96 return !token_.empty() && !app_id_.empty(); |
| 97 } |
| 98 |
| 99 bool CollapseKey::operator<(const CollapseKey& right) const { |
| 100 if (device_user_id_ != right.device_user_id()) |
| 101 return device_user_id_ < right.device_user_id(); |
| 102 if (app_id_ != right.app_id()) |
| 103 return app_id_ < right.app_id(); |
| 104 return token_ < right.token(); |
| 105 } |
| 106 |
| 66 struct ReliablePacketInfo { | 107 struct ReliablePacketInfo { |
| 67 ReliablePacketInfo(); | 108 ReliablePacketInfo(); |
| 68 ~ReliablePacketInfo(); | 109 ~ReliablePacketInfo(); |
| 69 | 110 |
| 70 // The stream id with which the message was sent. | 111 // The stream id with which the message was sent. |
| 71 uint32 stream_id; | 112 uint32 stream_id; |
| 72 | 113 |
| 73 // If reliable delivery was requested, the persistent id of the message. | 114 // If reliable delivery was requested, the persistent id of the message. |
| 74 std::string persistent_id; | 115 std::string persistent_id; |
| 75 | 116 |
| (...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 182 // Now go through and add the outgoing messages to the send queue in their | 223 // Now go through and add the outgoing messages to the send queue in their |
| 183 // appropriate order (oldest at front, most recent at back). | 224 // appropriate order (oldest at front, most recent at back). |
| 184 for (std::map<uint64, google::protobuf::MessageLite*>::iterator | 225 for (std::map<uint64, google::protobuf::MessageLite*>::iterator |
| 185 iter = ordered_messages.begin(); | 226 iter = ordered_messages.begin(); |
| 186 iter != ordered_messages.end(); ++iter) { | 227 iter != ordered_messages.end(); ++iter) { |
| 187 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); | 228 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); |
| 188 packet_info->protobuf.reset(iter->second); | 229 packet_info->protobuf.reset(iter->second); |
| 189 packet_info->tag = GetMCSProtoTag(*iter->second); | 230 packet_info->tag = GetMCSProtoTag(*iter->second); |
| 190 packet_info->persistent_id = base::Uint64ToString(iter->first); | 231 packet_info->persistent_id = base::Uint64ToString(iter->first); |
| 191 to_send_.push_back(make_linked_ptr(packet_info)); | 232 to_send_.push_back(make_linked_ptr(packet_info)); |
| 233 |
| 234 if (packet_info->tag == kDataMessageStanzaTag) { |
| 235 mcs_proto::DataMessageStanza* data_message = |
| 236 reinterpret_cast<mcs_proto::DataMessageStanza*>( |
| 237 packet_info->protobuf.get()); |
| 238 CollapseKey collapse_key(*data_message); |
| 239 if (collapse_key.IsValid()) |
| 240 collapse_key_map_[collapse_key] = packet_info; |
| 241 } |
| 192 } | 242 } |
| 193 } | 243 } |
| 194 | 244 |
| 195 void MCSClient::Login(uint64 android_id, uint64 security_token) { | 245 void MCSClient::Login(uint64 android_id, uint64 security_token) { |
| 196 DCHECK_EQ(state_, LOADED); | 246 DCHECK_EQ(state_, LOADED); |
| 197 DCHECK(android_id_ == 0 || android_id_ == android_id); | 247 DCHECK(android_id_ == 0 || android_id_ == android_id); |
| 198 DCHECK(security_token_ == 0 || security_token_ == security_token); | 248 DCHECK(security_token_ == 0 || security_token_ == security_token); |
| 199 | 249 |
| 200 if (android_id != android_id_ && security_token != security_token_) { | 250 if (android_id != android_id_ && security_token != security_token_) { |
| 201 DCHECK(android_id); | 251 DCHECK(android_id); |
| (...skipping 18 matching lines...) Expand all Loading... |
| 220 if (message.size() > kMaxMessageBytes) { | 270 if (message.size() > kMaxMessageBytes) { |
| 221 NotifyMessageSendStatus(message.GetProtobuf(), MESSAGE_TOO_LARGE); | 271 NotifyMessageSendStatus(message.GetProtobuf(), MESSAGE_TOO_LARGE); |
| 222 return; | 272 return; |
| 223 } | 273 } |
| 224 | 274 |
| 225 scoped_ptr<ReliablePacketInfo> packet_info(new ReliablePacketInfo()); | 275 scoped_ptr<ReliablePacketInfo> packet_info(new ReliablePacketInfo()); |
| 226 packet_info->tag = message.tag(); | 276 packet_info->tag = message.tag(); |
| 227 packet_info->protobuf = message.CloneProtobuf(); | 277 packet_info->protobuf = message.CloneProtobuf(); |
| 228 | 278 |
| 229 if (ttl > 0) { | 279 if (ttl > 0) { |
| 230 PersistentId persistent_id = GetNextPersistentId(); | 280 DCHECK_EQ(message.tag(), kDataMessageStanzaTag); |
| 231 DVLOG(1) << "Setting persistent id to " << persistent_id; | 281 |
| 232 packet_info->persistent_id = persistent_id; | 282 // First check if this message should replace a pending message with the |
| 233 SetPersistentId(persistent_id, | 283 // same collapse key. |
| 234 packet_info->protobuf.get()); | 284 mcs_proto::DataMessageStanza* data_message = |
| 235 if (!gcm_store_->AddOutgoingMessage( | 285 reinterpret_cast<mcs_proto::DataMessageStanza*>( |
| 236 persistent_id, | 286 packet_info->protobuf.get()); |
| 237 MCSMessage(message.tag(), | 287 CollapseKey collapse_key(*data_message); |
| 238 *(packet_info->protobuf)), | 288 if (collapse_key.IsValid() && collapse_key_map_.count(collapse_key) > 0) { |
| 239 base::Bind(&MCSClient::OnGCMUpdateFinished, | 289 ReliablePacketInfo* original_packet = collapse_key_map_[collapse_key]; |
| 240 weak_ptr_factory_.GetWeakPtr()))) { | 290 DVLOG(1) << "Found matching collapse key, Reusing persistent id of " |
| 241 NotifyMessageSendStatus(message.GetProtobuf(), | 291 << original_packet->persistent_id; |
| 242 APP_QUEUE_SIZE_LIMIT_REACHED); | 292 original_packet->protobuf = packet_info->protobuf.Pass(); |
| 293 SetPersistentId(original_packet->persistent_id, |
| 294 original_packet->protobuf.get()); |
| 295 gcm_store_->OverwriteOutgoingMessage( |
| 296 original_packet->persistent_id, |
| 297 message, |
| 298 base::Bind(&MCSClient::OnGCMUpdateFinished, |
| 299 weak_ptr_factory_.GetWeakPtr())); |
| 300 |
| 301 // The message is already queued, return. |
| 243 return; | 302 return; |
| 303 } else { |
| 304 PersistentId persistent_id = GetNextPersistentId(); |
| 305 DVLOG(1) << "Setting persistent id to " << persistent_id; |
| 306 packet_info->persistent_id = persistent_id; |
| 307 SetPersistentId(persistent_id, packet_info->protobuf.get()); |
| 308 if (!gcm_store_->AddOutgoingMessage( |
| 309 persistent_id, |
| 310 MCSMessage(message.tag(), *(packet_info->protobuf)), |
| 311 base::Bind(&MCSClient::OnGCMUpdateFinished, |
| 312 weak_ptr_factory_.GetWeakPtr()))) { |
| 313 NotifyMessageSendStatus(message.GetProtobuf(), |
| 314 APP_QUEUE_SIZE_LIMIT_REACHED); |
| 315 return; |
| 316 } |
| 244 } | 317 } |
| 318 |
| 319 if (collapse_key.IsValid()) |
| 320 collapse_key_map_[collapse_key] = packet_info.get(); |
| 245 } else if (!connection_factory_->IsEndpointReachable()) { | 321 } else if (!connection_factory_->IsEndpointReachable()) { |
| 246 DVLOG(1) << "No active connection, dropping message."; | 322 DVLOG(1) << "No active connection, dropping message."; |
| 247 NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL); | 323 NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL); |
| 248 return; | 324 return; |
| 249 } | 325 } |
| 250 | 326 |
| 251 to_send_.push_back(make_linked_ptr(packet_info.release())); | 327 to_send_.push_back(make_linked_ptr(packet_info.release())); |
| 252 | 328 |
| 253 // Notify that the messages has been succsfully queued for sending. | 329 // Notify that the messages has been succsfully queued for sending. |
| 254 // TODO(jianli): We should report QUEUED after writing to GCM store succeeds. | 330 // TODO(jianli): We should report QUEUED after writing to GCM store succeeds. |
| (...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 307 // saved as necessary. | 383 // saved as necessary. |
| 308 while (!to_resend_.empty()) { | 384 while (!to_resend_.empty()) { |
| 309 to_send_.push_front(to_resend_.back()); | 385 to_send_.push_front(to_resend_.back()); |
| 310 to_resend_.pop_back(); | 386 to_resend_.pop_back(); |
| 311 } | 387 } |
| 312 | 388 |
| 313 // Drop all TTL == 0 or expired TTL messages from the queue. | 389 // Drop all TTL == 0 or expired TTL messages from the queue. |
| 314 std::deque<MCSPacketInternal> new_to_send; | 390 std::deque<MCSPacketInternal> new_to_send; |
| 315 std::vector<PersistentId> expired_ttl_ids; | 391 std::vector<PersistentId> expired_ttl_ids; |
| 316 while (!to_send_.empty()) { | 392 while (!to_send_.empty()) { |
| 317 MCSPacketInternal packet = to_send_.front(); | 393 MCSPacketInternal packet = PopMessageForSend(); |
| 318 to_send_.pop_front(); | |
| 319 if (GetTTL(*packet->protobuf) > 0 && | 394 if (GetTTL(*packet->protobuf) > 0 && |
| 320 !HasTTLExpired(*packet->protobuf, clock_)) { | 395 !HasTTLExpired(*packet->protobuf, clock_)) { |
| 321 new_to_send.push_back(packet); | 396 new_to_send.push_back(packet); |
| 322 } else { | 397 } else { |
| 323 // If the TTL was 0 there is no persistent id, so no need to remove the | 398 // If the TTL was 0 there is no persistent id, so no need to remove the |
| 324 // message from the persistent store. | 399 // message from the persistent store. |
| 325 if (!packet->persistent_id.empty()) | 400 if (!packet->persistent_id.empty()) |
| 326 expired_ttl_ids.push_back(packet->persistent_id); | 401 expired_ttl_ids.push_back(packet->persistent_id); |
| 327 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED); | 402 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED); |
| 328 } | 403 } |
| (...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 360 if (to_send_.empty()) | 435 if (to_send_.empty()) |
| 361 return; | 436 return; |
| 362 | 437 |
| 363 // If the connection has been reset, do nothing. On reconnection | 438 // If the connection has been reset, do nothing. On reconnection |
| 364 // MaybeSendMessage will be automatically invoked again. | 439 // MaybeSendMessage will be automatically invoked again. |
| 365 // TODO(zea): consider doing TTL expiration at connection reset time, rather | 440 // TODO(zea): consider doing TTL expiration at connection reset time, rather |
| 366 // than reconnect time. | 441 // than reconnect time. |
| 367 if (!connection_factory_->IsEndpointReachable()) | 442 if (!connection_factory_->IsEndpointReachable()) |
| 368 return; | 443 return; |
| 369 | 444 |
| 370 MCSPacketInternal packet = to_send_.front(); | 445 MCSPacketInternal packet = PopMessageForSend(); |
| 371 to_send_.pop_front(); | |
| 372 if (HasTTLExpired(*packet->protobuf, clock_)) { | 446 if (HasTTLExpired(*packet->protobuf, clock_)) { |
| 373 DCHECK(!packet->persistent_id.empty()); | 447 DCHECK(!packet->persistent_id.empty()); |
| 374 DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; | 448 DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; |
| 375 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED); | 449 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED); |
| 376 gcm_store_->RemoveOutgoingMessage( | 450 gcm_store_->RemoveOutgoingMessage( |
| 377 packet->persistent_id, | 451 packet->persistent_id, |
| 378 base::Bind(&MCSClient::OnGCMUpdateFinished, | 452 base::Bind(&MCSClient::OnGCMUpdateFinished, |
| 379 weak_ptr_factory_.GetWeakPtr())); | 453 weak_ptr_factory_.GetWeakPtr())); |
| 380 base::MessageLoop::current()->PostTask( | 454 base::MessageLoop::current()->PostTask( |
| 381 FROM_HERE, | 455 FROM_HERE, |
| (...skipping 289 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 671 StreamId device_stream_id = outgoing_packet->stream_id; | 745 StreamId device_stream_id = outgoing_packet->stream_id; |
| 672 HandleServerConfirmedReceipt(device_stream_id); | 746 HandleServerConfirmedReceipt(device_stream_id); |
| 673 | 747 |
| 674 to_resend_.pop_front(); | 748 to_resend_.pop_front(); |
| 675 } | 749 } |
| 676 | 750 |
| 677 // If the acknowledged ids aren't all there, they might be in the to_send_ | 751 // If the acknowledged ids aren't all there, they might be in the to_send_ |
| 678 // queue (typically when a StreamAck confirms messages as part of a login | 752 // queue (typically when a StreamAck confirms messages as part of a login |
| 679 // response). | 753 // response). |
| 680 for (; iter != id_list.end() && !to_send_.empty(); ++iter) { | 754 for (; iter != id_list.end() && !to_send_.empty(); ++iter) { |
| 681 const MCSPacketInternal& outgoing_packet = to_send_.front(); | 755 const MCSPacketInternal& outgoing_packet = PopMessageForSend(); |
| 682 DCHECK_EQ(outgoing_packet->persistent_id, *iter); | 756 DCHECK_EQ(outgoing_packet->persistent_id, *iter); |
| 683 NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT); | 757 NotifyMessageSendStatus(*outgoing_packet->protobuf, SENT); |
| 684 | 758 |
| 685 // No need to re-acknowledge any server messages this message already | 759 // No need to re-acknowledge any server messages this message already |
| 686 // acknowledged. | 760 // acknowledged. |
| 687 StreamId device_stream_id = outgoing_packet->stream_id; | 761 StreamId device_stream_id = outgoing_packet->stream_id; |
| 688 HandleServerConfirmedReceipt(device_stream_id); | 762 HandleServerConfirmedReceipt(device_stream_id); |
| 689 | |
| 690 to_send_.pop_front(); | |
| 691 } | 763 } |
| 692 | 764 |
| 693 DCHECK(iter == id_list.end()); | 765 DCHECK(iter == id_list.end()); |
| 694 | 766 |
| 695 DVLOG(1) << "Server acked " << id_list.size() | 767 DVLOG(1) << "Server acked " << id_list.size() |
| 696 << " messages, " << to_resend_.size() << " remaining unacked."; | 768 << " messages, " << to_resend_.size() << " remaining unacked."; |
| 697 gcm_store_->RemoveOutgoingMessages( | 769 gcm_store_->RemoveOutgoingMessages( |
| 698 id_list, | 770 id_list, |
| 699 base::Bind(&MCSClient::OnGCMUpdateFinished, | 771 base::Bind(&MCSClient::OnGCMUpdateFinished, |
| 700 weak_ptr_factory_.GetWeakPtr())); | 772 weak_ptr_factory_.GetWeakPtr())); |
| (...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 748 data_message_stanza->device_user_id(), | 820 data_message_stanza->device_user_id(), |
| 749 data_message_stanza->category(), | 821 data_message_stanza->category(), |
| 750 data_message_stanza->id(), | 822 data_message_stanza->id(), |
| 751 status); | 823 status); |
| 752 } | 824 } |
| 753 | 825 |
| 754 void MCSClient::SetGCMStoreForTesting(GCMStore* gcm_store) { | 826 void MCSClient::SetGCMStoreForTesting(GCMStore* gcm_store) { |
| 755 gcm_store_ = gcm_store; | 827 gcm_store_ = gcm_store; |
| 756 } | 828 } |
| 757 | 829 |
| 830 MCSClient::MCSPacketInternal MCSClient::PopMessageForSend() { |
| 831 MCSPacketInternal packet = to_send_.front(); |
| 832 to_send_.pop_front(); |
| 833 |
| 834 if (packet->tag == kDataMessageStanzaTag) { |
| 835 mcs_proto::DataMessageStanza* data_message = |
| 836 reinterpret_cast<mcs_proto::DataMessageStanza*>(packet->protobuf.get()); |
| 837 CollapseKey collapse_key(*data_message); |
| 838 if (collapse_key.IsValid()) |
| 839 collapse_key_map_.erase(collapse_key); |
| 840 } |
| 841 |
| 842 return packet; |
| 843 } |
| 844 |
| 758 } // namespace gcm | 845 } // namespace gcm |
| OLD | NEW |