| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "google_apis/gcm/engine/mcs_client.h" | 5 #include "google_apis/gcm/engine/mcs_client.h" |
| 6 | 6 |
| 7 #include <stddef.h> |
| 8 |
| 7 #include <set> | 9 #include <set> |
| 8 | 10 |
| 9 #include "base/basictypes.h" | |
| 10 #include "base/bind.h" | 11 #include "base/bind.h" |
| 11 #include "base/location.h" | 12 #include "base/location.h" |
| 12 #include "base/metrics/histogram.h" | 13 #include "base/metrics/histogram.h" |
| 13 #include "base/strings/string_number_conversions.h" | 14 #include "base/strings/string_number_conversions.h" |
| 14 #include "base/thread_task_runner_handle.h" | 15 #include "base/thread_task_runner_handle.h" |
| 15 #include "base/time/clock.h" | 16 #include "base/time/clock.h" |
| 16 #include "base/time/time.h" | 17 #include "base/time/time.h" |
| 17 #include "base/timer/timer.h" | 18 #include "base/timer/timer.h" |
| 18 #include "google_apis/gcm/base/mcs_util.h" | 19 #include "google_apis/gcm/base/mcs_util.h" |
| 19 #include "google_apis/gcm/base/socket_stream.h" | 20 #include "google_apis/gcm/base/socket_stream.h" |
| (...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 79 ~CollapseKey(); | 80 ~CollapseKey(); |
| 80 | 81 |
| 81 // Comparison operator for use in maps. | 82 // Comparison operator for use in maps. |
| 82 bool operator<(const CollapseKey& right) const; | 83 bool operator<(const CollapseKey& right) const; |
| 83 | 84 |
| 84 // Whether the message had a valid collapse key. | 85 // Whether the message had a valid collapse key. |
| 85 bool IsValid() const; | 86 bool IsValid() const; |
| 86 | 87 |
| 87 std::string token() const { return token_; } | 88 std::string token() const { return token_; } |
| 88 std::string app_id() const { return app_id_; } | 89 std::string app_id() const { return app_id_; } |
| 89 int64 device_user_id() const { return device_user_id_; } | 90 int64_t device_user_id() const { return device_user_id_; } |
| 90 | 91 |
| 91 private: | 92 private: |
| 92 const std::string token_; | 93 const std::string token_; |
| 93 const std::string app_id_; | 94 const std::string app_id_; |
| 94 const int64 device_user_id_; | 95 const int64_t device_user_id_; |
| 95 }; | 96 }; |
| 96 | 97 |
| 97 CollapseKey::CollapseKey(const mcs_proto::DataMessageStanza& message) | 98 CollapseKey::CollapseKey(const mcs_proto::DataMessageStanza& message) |
| 98 : token_(message.token()), | 99 : token_(message.token()), |
| 99 app_id_(message.category()), | 100 app_id_(message.category()), |
| 100 device_user_id_(message.device_user_id()) {} | 101 device_user_id_(message.device_user_id()) {} |
| 101 | 102 |
| 102 CollapseKey::~CollapseKey() {} | 103 CollapseKey::~CollapseKey() {} |
| 103 | 104 |
| 104 bool CollapseKey::IsValid() const { | 105 bool CollapseKey::IsValid() const { |
| 105 // Device user id is optional, but the application id and token are not. | 106 // Device user id is optional, but the application id and token are not. |
| 106 return !token_.empty() && !app_id_.empty(); | 107 return !token_.empty() && !app_id_.empty(); |
| 107 } | 108 } |
| 108 | 109 |
| 109 bool CollapseKey::operator<(const CollapseKey& right) const { | 110 bool CollapseKey::operator<(const CollapseKey& right) const { |
| 110 if (device_user_id_ != right.device_user_id()) | 111 if (device_user_id_ != right.device_user_id()) |
| 111 return device_user_id_ < right.device_user_id(); | 112 return device_user_id_ < right.device_user_id(); |
| 112 if (app_id_ != right.app_id()) | 113 if (app_id_ != right.app_id()) |
| 113 return app_id_ < right.app_id(); | 114 return app_id_ < right.app_id(); |
| 114 return token_ < right.token(); | 115 return token_ < right.token(); |
| 115 } | 116 } |
| 116 | 117 |
| 117 struct ReliablePacketInfo { | 118 struct ReliablePacketInfo { |
| 118 ReliablePacketInfo(); | 119 ReliablePacketInfo(); |
| 119 ~ReliablePacketInfo(); | 120 ~ReliablePacketInfo(); |
| 120 | 121 |
| 121 // The stream id with which the message was sent. | 122 // The stream id with which the message was sent. |
| 122 uint32 stream_id; | 123 uint32_t stream_id; |
| 123 | 124 |
| 124 // If reliable delivery was requested, the persistent id of the message. | 125 // If reliable delivery was requested, the persistent id of the message. |
| 125 std::string persistent_id; | 126 std::string persistent_id; |
| 126 | 127 |
| 127 // The type of message itself (for easier lookup). | 128 // The type of message itself (for easier lookup). |
| 128 uint8 tag; | 129 uint8_t tag; |
| 129 | 130 |
| 130 // The protobuf of the message itself. | 131 // The protobuf of the message itself. |
| 131 MCSProto protobuf; | 132 MCSProto protobuf; |
| 132 }; | 133 }; |
| 133 | 134 |
| 134 ReliablePacketInfo::ReliablePacketInfo() | 135 ReliablePacketInfo::ReliablePacketInfo() |
| 135 : stream_id(0), tag(0) { | 136 : stream_id(0), tag(0) { |
| 136 } | 137 } |
| 137 ReliablePacketInfo::~ReliablePacketInfo() {} | 138 ReliablePacketInfo::~ReliablePacketInfo() {} |
| 138 | 139 |
| (...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 220 << " is non-zero."; | 221 << " is non-zero."; |
| 221 | 222 |
| 222 DVLOG(1) << "RMQ Load finished with " << load_result->incoming_messages.size() | 223 DVLOG(1) << "RMQ Load finished with " << load_result->incoming_messages.size() |
| 223 << " incoming acks pending and " | 224 << " incoming acks pending and " |
| 224 << load_result->outgoing_messages.size() | 225 << load_result->outgoing_messages.size() |
| 225 << " outgoing messages pending."; | 226 << " outgoing messages pending."; |
| 226 | 227 |
| 227 restored_unackeds_server_ids_ = load_result->incoming_messages; | 228 restored_unackeds_server_ids_ = load_result->incoming_messages; |
| 228 | 229 |
| 229 // First go through and order the outgoing messages by recency. | 230 // First go through and order the outgoing messages by recency. |
| 230 std::map<uint64, google::protobuf::MessageLite*> ordered_messages; | 231 std::map<uint64_t, google::protobuf::MessageLite*> ordered_messages; |
| 231 std::vector<PersistentId> expired_ttl_ids; | 232 std::vector<PersistentId> expired_ttl_ids; |
| 232 for (GCMStore::OutgoingMessageMap::iterator iter = | 233 for (GCMStore::OutgoingMessageMap::iterator iter = |
| 233 load_result->outgoing_messages.begin(); | 234 load_result->outgoing_messages.begin(); |
| 234 iter != load_result->outgoing_messages.end(); ++iter) { | 235 iter != load_result->outgoing_messages.end(); ++iter) { |
| 235 uint64 timestamp = 0; | 236 uint64_t timestamp = 0; |
| 236 if (!base::StringToUint64(iter->first, ×tamp)) { | 237 if (!base::StringToUint64(iter->first, ×tamp)) { |
| 237 LOG(ERROR) << "Invalid restored message."; | 238 LOG(ERROR) << "Invalid restored message."; |
| 238 // TODO(fgorski): Error: data unreadable | 239 // TODO(fgorski): Error: data unreadable |
| 239 mcs_error_callback_.Run(); | 240 mcs_error_callback_.Run(); |
| 240 return; | 241 return; |
| 241 } | 242 } |
| 242 | 243 |
| 243 // Check if the TTL has expired for this message. | 244 // Check if the TTL has expired for this message. |
| 244 if (HasTTLExpired(*iter->second, clock_)) { | 245 if (HasTTLExpired(*iter->second, clock_)) { |
| 245 expired_ttl_ids.push_back(iter->first); | 246 expired_ttl_ids.push_back(iter->first); |
| 246 NotifyMessageSendStatus(*iter->second, TTL_EXCEEDED); | 247 NotifyMessageSendStatus(*iter->second, TTL_EXCEEDED); |
| 247 continue; | 248 continue; |
| 248 } | 249 } |
| 249 | 250 |
| 250 ordered_messages[timestamp] = iter->second.release(); | 251 ordered_messages[timestamp] = iter->second.release(); |
| 251 } | 252 } |
| 252 | 253 |
| 253 if (!expired_ttl_ids.empty()) { | 254 if (!expired_ttl_ids.empty()) { |
| 254 gcm_store_->RemoveOutgoingMessages( | 255 gcm_store_->RemoveOutgoingMessages( |
| 255 expired_ttl_ids, | 256 expired_ttl_ids, |
| 256 base::Bind(&MCSClient::OnGCMUpdateFinished, | 257 base::Bind(&MCSClient::OnGCMUpdateFinished, |
| 257 weak_ptr_factory_.GetWeakPtr())); | 258 weak_ptr_factory_.GetWeakPtr())); |
| 258 } | 259 } |
| 259 | 260 |
| 260 // Now go through and add the outgoing messages to the send queue in their | 261 // Now go through and add the outgoing messages to the send queue in their |
| 261 // appropriate order (oldest at front, most recent at back). | 262 // appropriate order (oldest at front, most recent at back). |
| 262 for (std::map<uint64, google::protobuf::MessageLite*>::iterator | 263 for (std::map<uint64_t, google::protobuf::MessageLite*>::iterator iter = |
| 263 iter = ordered_messages.begin(); | 264 ordered_messages.begin(); |
| 264 iter != ordered_messages.end(); ++iter) { | 265 iter != ordered_messages.end(); ++iter) { |
| 265 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); | 266 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); |
| 266 packet_info->protobuf.reset(iter->second); | 267 packet_info->protobuf.reset(iter->second); |
| 267 packet_info->tag = GetMCSProtoTag(*iter->second); | 268 packet_info->tag = GetMCSProtoTag(*iter->second); |
| 268 packet_info->persistent_id = base::Uint64ToString(iter->first); | 269 packet_info->persistent_id = base::Uint64ToString(iter->first); |
| 269 to_send_.push_back(make_linked_ptr(packet_info)); | 270 to_send_.push_back(make_linked_ptr(packet_info)); |
| 270 | 271 |
| 271 if (packet_info->tag == kDataMessageStanzaTag) { | 272 if (packet_info->tag == kDataMessageStanzaTag) { |
| 272 mcs_proto::DataMessageStanza* data_message = | 273 mcs_proto::DataMessageStanza* data_message = |
| 273 reinterpret_cast<mcs_proto::DataMessageStanza*>( | 274 reinterpret_cast<mcs_proto::DataMessageStanza*>( |
| 274 packet_info->protobuf.get()); | 275 packet_info->protobuf.get()); |
| 275 CollapseKey collapse_key(*data_message); | 276 CollapseKey collapse_key(*data_message); |
| 276 if (collapse_key.IsValid()) | 277 if (collapse_key.IsValid()) |
| 277 collapse_key_map_[collapse_key] = packet_info; | 278 collapse_key_map_[collapse_key] = packet_info; |
| 278 } | 279 } |
| 279 } | 280 } |
| 280 | 281 |
| 281 // Establish if there is any custom client interval persisted from the last | 282 // Establish if there is any custom client interval persisted from the last |
| 282 // run and set it on the heartbeat manager. | 283 // run and set it on the heartbeat manager. |
| 283 custom_heartbeat_intervals_.swap(load_result->heartbeat_intervals); | 284 custom_heartbeat_intervals_.swap(load_result->heartbeat_intervals); |
| 284 int min_interval_ms = GetMinHeartbeatIntervalMs(); | 285 int min_interval_ms = GetMinHeartbeatIntervalMs(); |
| 285 heartbeat_manager_.SetClientHeartbeatIntervalMs(min_interval_ms); | 286 heartbeat_manager_.SetClientHeartbeatIntervalMs(min_interval_ms); |
| 286 } | 287 } |
| 287 | 288 |
| 288 void MCSClient::Login(uint64 android_id, uint64 security_token) { | 289 void MCSClient::Login(uint64_t android_id, uint64_t security_token) { |
| 289 DCHECK_EQ(state_, LOADED); | 290 DCHECK_EQ(state_, LOADED); |
| 290 DCHECK(android_id_ == 0 || android_id_ == android_id); | 291 DCHECK(android_id_ == 0 || android_id_ == android_id); |
| 291 DCHECK(security_token_ == 0 || security_token_ == security_token); | 292 DCHECK(security_token_ == 0 || security_token_ == security_token); |
| 292 | 293 |
| 293 if (android_id != android_id_ && security_token != security_token_) { | 294 if (android_id != android_id_ && security_token != security_token_) { |
| 294 DCHECK(android_id); | 295 DCHECK(android_id); |
| 295 DCHECK(security_token); | 296 DCHECK(security_token); |
| 296 android_id_ = android_id; | 297 android_id_ = android_id; |
| 297 security_token_ = security_token; | 298 security_token_ = security_token; |
| 298 } | 299 } |
| (...skipping 260 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 559 | 560 |
| 560 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) { | 561 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) { |
| 561 packet_info->stream_id = ++stream_id_out_; | 562 packet_info->stream_id = ++stream_id_out_; |
| 562 DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName(); | 563 DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName(); |
| 563 | 564 |
| 564 // Set the queued time as necessary. | 565 // Set the queued time as necessary. |
| 565 if (packet_info->tag == kDataMessageStanzaTag) { | 566 if (packet_info->tag == kDataMessageStanzaTag) { |
| 566 mcs_proto::DataMessageStanza* data_message = | 567 mcs_proto::DataMessageStanza* data_message = |
| 567 reinterpret_cast<mcs_proto::DataMessageStanza*>( | 568 reinterpret_cast<mcs_proto::DataMessageStanza*>( |
| 568 packet_info->protobuf.get()); | 569 packet_info->protobuf.get()); |
| 569 uint64 sent = data_message->sent(); | 570 uint64_t sent = data_message->sent(); |
| 570 DCHECK_GT(sent, 0U); | 571 DCHECK_GT(sent, 0U); |
| 571 int queued = (clock_->Now().ToInternalValue() / | 572 int queued = (clock_->Now().ToInternalValue() / |
| 572 base::Time::kMicrosecondsPerSecond) - sent; | 573 base::Time::kMicrosecondsPerSecond) - sent; |
| 573 DVLOG(1) << "Message was queued for " << queued << " seconds."; | 574 DVLOG(1) << "Message was queued for " << queued << " seconds."; |
| 574 data_message->set_queued(queued); | 575 data_message->set_queued(queued); |
| 575 recorder_->RecordDataSentToWire( | 576 recorder_->RecordDataSentToWire( |
| 576 data_message->category(), | 577 data_message->category(), |
| 577 data_message->to(), | 578 data_message->to(), |
| 578 data_message->id(), | 579 data_message->id(), |
| 579 queued); | 580 queued); |
| (...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 632 | 633 |
| 633 if (send) { | 634 if (send) { |
| 634 SendMessage(MCSMessage(kDataMessageStanzaTag, response.Pass())); | 635 SendMessage(MCSMessage(kDataMessageStanzaTag, response.Pass())); |
| 635 } | 636 } |
| 636 } | 637 } |
| 637 | 638 |
| 638 void MCSClient::HandlePacketFromWire( | 639 void MCSClient::HandlePacketFromWire( |
| 639 scoped_ptr<google::protobuf::MessageLite> protobuf) { | 640 scoped_ptr<google::protobuf::MessageLite> protobuf) { |
| 640 if (!protobuf.get()) | 641 if (!protobuf.get()) |
| 641 return; | 642 return; |
| 642 uint8 tag = GetMCSProtoTag(*protobuf); | 643 uint8_t tag = GetMCSProtoTag(*protobuf); |
| 643 PersistentId persistent_id = GetPersistentId(*protobuf); | 644 PersistentId persistent_id = GetPersistentId(*protobuf); |
| 644 StreamId last_stream_id_received = GetLastStreamIdReceived(*protobuf); | 645 StreamId last_stream_id_received = GetLastStreamIdReceived(*protobuf); |
| 645 | 646 |
| 646 if (last_stream_id_received != 0) { | 647 if (last_stream_id_received != 0) { |
| 647 last_device_to_server_stream_id_received_ = last_stream_id_received; | 648 last_device_to_server_stream_id_received_ = last_stream_id_received; |
| 648 | 649 |
| 649 // Process device to server messages that have now been acknowledged by the | 650 // Process device to server messages that have now been acknowledged by the |
| 650 // server. Because messages are stored in order, just pop off all that have | 651 // server. Because messages are stored in order, just pop off all that have |
| 651 // a stream id lower than server's last received stream id. | 652 // a stream id lower than server's last received stream id. |
| 652 HandleStreamAck(last_stream_id_received); | 653 HandleStreamAck(last_stream_id_received); |
| (...skipping 311 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 964 reinterpret_cast<mcs_proto::DataMessageStanza*>(packet->protobuf.get()); | 965 reinterpret_cast<mcs_proto::DataMessageStanza*>(packet->protobuf.get()); |
| 965 CollapseKey collapse_key(*data_message); | 966 CollapseKey collapse_key(*data_message); |
| 966 if (collapse_key.IsValid()) | 967 if (collapse_key.IsValid()) |
| 967 collapse_key_map_.erase(collapse_key); | 968 collapse_key_map_.erase(collapse_key); |
| 968 } | 969 } |
| 969 | 970 |
| 970 return packet; | 971 return packet; |
| 971 } | 972 } |
| 972 | 973 |
| 973 } // namespace gcm | 974 } // namespace gcm |
| OLD | NEW |