Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "google_apis/gcm/engine/mcs_client.h" | 5 #include "google_apis/gcm/engine/mcs_client.h" |
| 6 | 6 |
| 7 #include "base/basictypes.h" | 7 #include "base/basictypes.h" |
| 8 #include "base/message_loop/message_loop.h" | 8 #include "base/message_loop/message_loop.h" |
| 9 #include "base/metrics/histogram.h" | 9 #include "base/metrics/histogram.h" |
| 10 #include "base/strings/string_number_conversions.h" | 10 #include "base/strings/string_number_conversions.h" |
| (...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 56 for (int i = 0; i < selective_ack.id_size(); ++i) { | 56 for (int i = 0; i < selective_ack.id_size(); ++i) { |
| 57 DCHECK(!selective_ack.id(i).empty()); | 57 DCHECK(!selective_ack.id(i).empty()); |
| 58 new_list.push_back(selective_ack.id(i)); | 58 new_list.push_back(selective_ack.id(i)); |
| 59 } | 59 } |
| 60 id_list->swap(new_list); | 60 id_list->swap(new_list); |
| 61 return true; | 61 return true; |
| 62 } | 62 } |
| 63 | 63 |
| 64 } // namespace | 64 } // namespace |
| 65 | 65 |
| 66 class CollapseKey { | |
| 67 public: | |
| 68 explicit CollapseKey(const mcs_proto::DataMessageStanza& message); | |
| 69 ~CollapseKey(); | |
| 70 | |
| 71 // Comparison operator for use in maps. | |
| 72 bool operator<(const CollapseKey& right) const; | |
| 73 | |
| 74 // Whether the message had a valid collapse key. | |
| 75 bool IsValid() const; | |
| 76 | |
| 77 std::string token() const { return token_; } | |
| 78 std::string app_id() const { return app_id_; } | |
| 79 int64 device_user_id() const { return device_user_id_; } | |
| 80 | |
| 81 private: | |
| 82 const std::string token_; | |
| 83 const std::string app_id_; | |
| 84 const int64 device_user_id_; | |
| 85 }; | |
| 86 | |
| 87 CollapseKey::CollapseKey(const mcs_proto::DataMessageStanza& message) | |
| 88 : token_(message.token()), | |
| 89 app_id_(message.category()), | |
| 90 device_user_id_(message.device_user_id()) {} | |
| 91 | |
| 92 CollapseKey::~CollapseKey() {} | |
| 93 | |
| 94 bool CollapseKey::IsValid() const { | |
| 95 // Device user id is optional, but the application id and token are not. | |
| 96 return !token_.empty() && !app_id_.empty(); | |
| 97 } | |
| 98 bool CollapseKey::operator<(const CollapseKey& right) const { | |
|
jianli
2014/02/04 22:55:14
nit: empty line
Nicolas Zea
2014/02/12 23:35:46
Done.
| |
| 99 if (device_user_id_ != right.device_user_id()) | |
| 100 return device_user_id_ < right.device_user_id(); | |
| 101 if (app_id_ != right.app_id()) | |
| 102 return app_id_ < right.app_id(); | |
| 103 return token_ < right.token(); | |
| 104 } | |
| 105 | |
| 66 struct ReliablePacketInfo { | 106 struct ReliablePacketInfo { |
| 67 ReliablePacketInfo(); | 107 ReliablePacketInfo(); |
| 68 ~ReliablePacketInfo(); | 108 ~ReliablePacketInfo(); |
| 69 | 109 |
| 70 // The stream id with which the message was sent. | 110 // The stream id with which the message was sent. |
| 71 uint32 stream_id; | 111 uint32 stream_id; |
| 72 | 112 |
| 73 // If reliable delivery was requested, the persistent id of the message. | 113 // If reliable delivery was requested, the persistent id of the message. |
| 74 std::string persistent_id; | 114 std::string persistent_id; |
| 75 | 115 |
| (...skipping 144 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 220 if (message.size() > kMaxMessageBytes) { | 260 if (message.size() > kMaxMessageBytes) { |
| 221 NotifyMessageSendStatus(message.GetProtobuf(), MESSAGE_TOO_LARGE); | 261 NotifyMessageSendStatus(message.GetProtobuf(), MESSAGE_TOO_LARGE); |
| 222 return; | 262 return; |
| 223 } | 263 } |
| 224 | 264 |
| 225 scoped_ptr<ReliablePacketInfo> packet_info(new ReliablePacketInfo()); | 265 scoped_ptr<ReliablePacketInfo> packet_info(new ReliablePacketInfo()); |
| 226 packet_info->tag = message.tag(); | 266 packet_info->tag = message.tag(); |
| 227 packet_info->protobuf = message.CloneProtobuf(); | 267 packet_info->protobuf = message.CloneProtobuf(); |
| 228 | 268 |
| 229 if (ttl > 0) { | 269 if (ttl > 0) { |
| 230 PersistentId persistent_id = GetNextPersistentId(); | 270 DCHECK_EQ(message.tag(), kDataMessageStanzaTag); |
| 231 DVLOG(1) << "Setting persistent id to " << persistent_id; | 271 |
| 232 packet_info->persistent_id = persistent_id; | 272 // First check if this message should replace a pending message with the |
| 233 SetPersistentId(persistent_id, | 273 // same collapsed key. |
|
jianli
2014/02/04 22:55:14
nit: collapse
Nicolas Zea
2014/02/12 23:35:46
Done.
| |
| 234 packet_info->protobuf.get()); | 274 mcs_proto::DataMessageStanza* data_message = |
| 235 if (!gcm_store_->AddOutgoingMessage( | 275 reinterpret_cast<mcs_proto::DataMessageStanza*>( |
| 236 persistent_id, | 276 packet_info->protobuf.get()); |
| 237 MCSMessage(message.tag(), | 277 CollapseKey collapse_key(*data_message); |
| 238 *(packet_info->protobuf)), | 278 if (collapse_key.IsValid() && collapse_key_map_.count(collapse_key) > 0) { |
| 239 base::Bind(&MCSClient::OnGCMUpdateFinished, | 279 ReliablePacketInfo* original_packet = collapse_key_map_[collapse_key]; |
| 240 weak_ptr_factory_.GetWeakPtr()))) { | 280 DVLOG(1) << "Found matching collapse key, Reusing persistent id of " |
| 241 NotifyMessageSendStatus(message.GetProtobuf(), | 281 << original_packet->persistent_id; |
| 242 APP_QUEUE_SIZE_LIMIT_REACHED); | 282 original_packet->protobuf = packet_info->protobuf.Pass(); |
| 283 SetPersistentId(original_packet->persistent_id, | |
| 284 original_packet->protobuf.get()); | |
| 285 gcm_store_->OverwriteOutgoingMessage( | |
| 286 original_packet->persistent_id, | |
| 287 message, | |
| 288 base::Bind(&MCSClient::OnGCMUpdateFinished, | |
| 289 weak_ptr_factory_.GetWeakPtr())); | |
| 290 | |
| 291 // The message is already queued, return. | |
| 243 return; | 292 return; |
| 293 } else { | |
| 294 PersistentId persistent_id = GetNextPersistentId(); | |
| 295 DVLOG(1) << "Setting persistent id to " << persistent_id; | |
| 296 packet_info->persistent_id = persistent_id; | |
| 297 SetPersistentId(persistent_id, packet_info->protobuf.get()); | |
| 298 if (!gcm_store_->AddOutgoingMessage( | |
| 299 persistent_id, | |
| 300 MCSMessage(message.tag(), *(packet_info->protobuf)), | |
| 301 base::Bind(&MCSClient::OnGCMUpdateFinished, | |
| 302 weak_ptr_factory_.GetWeakPtr()))) { | |
| 303 NotifyMessageSendStatus(message.GetProtobuf(), | |
| 304 APP_QUEUE_SIZE_LIMIT_REACHED); | |
| 305 return; | |
| 306 } | |
| 244 } | 307 } |
| 308 | |
| 309 if (collapse_key.IsValid()) | |
| 310 collapse_key_map_[collapse_key] = packet_info.get(); | |
| 245 } else if (!connection_factory_->IsEndpointReachable()) { | 311 } else if (!connection_factory_->IsEndpointReachable()) { |
| 246 DVLOG(1) << "No active connection, dropping message."; | 312 DVLOG(1) << "No active connection, dropping message."; |
| 247 NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL); | 313 NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL); |
| 248 return; | 314 return; |
| 249 } | 315 } |
| 250 to_send_.push_back(make_linked_ptr(packet_info.release())); | 316 to_send_.push_back(make_linked_ptr(packet_info.release())); |
| 251 MaybeSendMessage(); | 317 MaybeSendMessage(); |
| 252 } | 318 } |
| 253 | 319 |
| 254 void MCSClient::Destroy() { | 320 void MCSClient::Destroy() { |
| (...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 356 | 422 |
| 357 // If the connection has been reset, do nothing. On reconnection | 423 // If the connection has been reset, do nothing. On reconnection |
| 358 // MaybeSendMessage will be automatically invoked again. | 424 // MaybeSendMessage will be automatically invoked again. |
| 359 // TODO(zea): consider doing TTL expiration at connection reset time, rather | 425 // TODO(zea): consider doing TTL expiration at connection reset time, rather |
| 360 // than reconnect time. | 426 // than reconnect time. |
| 361 if (!connection_factory_->IsEndpointReachable()) | 427 if (!connection_factory_->IsEndpointReachable()) |
| 362 return; | 428 return; |
| 363 | 429 |
| 364 MCSPacketInternal packet = to_send_.front(); | 430 MCSPacketInternal packet = to_send_.front(); |
| 365 to_send_.pop_front(); | 431 to_send_.pop_front(); |
| 432 if (packet->tag == kDataMessageStanzaTag) { | |
| 433 mcs_proto::DataMessageStanza* data_message = | |
| 434 reinterpret_cast<mcs_proto::DataMessageStanza*>(packet->protobuf.get()); | |
| 435 CollapseKey collapse_key(*data_message); | |
| 436 if (collapse_key.IsValid()) | |
| 437 collapse_key_map_.erase(collapse_key); | |
| 438 } | |
| 366 if (HasTTLExpired(*packet->protobuf, clock_)) { | 439 if (HasTTLExpired(*packet->protobuf, clock_)) { |
| 367 DCHECK(!packet->persistent_id.empty()); | 440 DCHECK(!packet->persistent_id.empty()); |
| 368 DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; | 441 DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; |
| 369 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED); | 442 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED); |
| 370 gcm_store_->RemoveOutgoingMessage( | 443 gcm_store_->RemoveOutgoingMessage( |
| 371 packet->persistent_id, | 444 packet->persistent_id, |
| 372 base::Bind(&MCSClient::OnGCMUpdateFinished, | 445 base::Bind(&MCSClient::OnGCMUpdateFinished, |
| 373 weak_ptr_factory_.GetWeakPtr())); | 446 weak_ptr_factory_.GetWeakPtr())); |
| 374 base::MessageLoop::current()->PostTask( | 447 base::MessageLoop::current()->PostTask( |
| 375 FROM_HERE, | 448 FROM_HERE, |
| (...skipping 367 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 743 data_message_stanza->category(), | 816 data_message_stanza->category(), |
| 744 data_message_stanza->id(), | 817 data_message_stanza->id(), |
| 745 status); | 818 status); |
| 746 } | 819 } |
| 747 | 820 |
| 748 void MCSClient::SetGCMStoreForTesting(GCMStore* gcm_store) { | 821 void MCSClient::SetGCMStoreForTesting(GCMStore* gcm_store) { |
| 749 gcm_store_ = gcm_store; | 822 gcm_store_ = gcm_store; |
| 750 } | 823 } |
| 751 | 824 |
| 752 } // namespace gcm | 825 } // namespace gcm |
| OLD | NEW |