OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "google_apis/gcm/engine/mcs_client.h" | 5 #include "google_apis/gcm/engine/mcs_client.h" |
6 | 6 |
| 7 #include <stddef.h> |
| 8 |
7 #include <set> | 9 #include <set> |
8 | 10 |
9 #include "base/basictypes.h" | |
10 #include "base/bind.h" | 11 #include "base/bind.h" |
11 #include "base/location.h" | 12 #include "base/location.h" |
12 #include "base/metrics/histogram.h" | 13 #include "base/metrics/histogram.h" |
13 #include "base/strings/string_number_conversions.h" | 14 #include "base/strings/string_number_conversions.h" |
14 #include "base/thread_task_runner_handle.h" | 15 #include "base/thread_task_runner_handle.h" |
15 #include "base/time/clock.h" | 16 #include "base/time/clock.h" |
16 #include "base/time/time.h" | 17 #include "base/time/time.h" |
17 #include "base/timer/timer.h" | 18 #include "base/timer/timer.h" |
18 #include "google_apis/gcm/base/mcs_util.h" | 19 #include "google_apis/gcm/base/mcs_util.h" |
19 #include "google_apis/gcm/base/socket_stream.h" | 20 #include "google_apis/gcm/base/socket_stream.h" |
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
79 ~CollapseKey(); | 80 ~CollapseKey(); |
80 | 81 |
81 // Comparison operator for use in maps. | 82 // Comparison operator for use in maps. |
82 bool operator<(const CollapseKey& right) const; | 83 bool operator<(const CollapseKey& right) const; |
83 | 84 |
84 // Whether the message had a valid collapse key. | 85 // Whether the message had a valid collapse key. |
85 bool IsValid() const; | 86 bool IsValid() const; |
86 | 87 |
87 std::string token() const { return token_; } | 88 std::string token() const { return token_; } |
88 std::string app_id() const { return app_id_; } | 89 std::string app_id() const { return app_id_; } |
89 int64 device_user_id() const { return device_user_id_; } | 90 int64_t device_user_id() const { return device_user_id_; } |
90 | 91 |
91 private: | 92 private: |
92 const std::string token_; | 93 const std::string token_; |
93 const std::string app_id_; | 94 const std::string app_id_; |
94 const int64 device_user_id_; | 95 const int64_t device_user_id_; |
95 }; | 96 }; |
96 | 97 |
97 CollapseKey::CollapseKey(const mcs_proto::DataMessageStanza& message) | 98 CollapseKey::CollapseKey(const mcs_proto::DataMessageStanza& message) |
98 : token_(message.token()), | 99 : token_(message.token()), |
99 app_id_(message.category()), | 100 app_id_(message.category()), |
100 device_user_id_(message.device_user_id()) {} | 101 device_user_id_(message.device_user_id()) {} |
101 | 102 |
102 CollapseKey::~CollapseKey() {} | 103 CollapseKey::~CollapseKey() {} |
103 | 104 |
104 bool CollapseKey::IsValid() const { | 105 bool CollapseKey::IsValid() const { |
105 // Device user id is optional, but the application id and token are not. | 106 // Device user id is optional, but the application id and token are not. |
106 return !token_.empty() && !app_id_.empty(); | 107 return !token_.empty() && !app_id_.empty(); |
107 } | 108 } |
108 | 109 |
109 bool CollapseKey::operator<(const CollapseKey& right) const { | 110 bool CollapseKey::operator<(const CollapseKey& right) const { |
110 if (device_user_id_ != right.device_user_id()) | 111 if (device_user_id_ != right.device_user_id()) |
111 return device_user_id_ < right.device_user_id(); | 112 return device_user_id_ < right.device_user_id(); |
112 if (app_id_ != right.app_id()) | 113 if (app_id_ != right.app_id()) |
113 return app_id_ < right.app_id(); | 114 return app_id_ < right.app_id(); |
114 return token_ < right.token(); | 115 return token_ < right.token(); |
115 } | 116 } |
116 | 117 |
117 struct ReliablePacketInfo { | 118 struct ReliablePacketInfo { |
118 ReliablePacketInfo(); | 119 ReliablePacketInfo(); |
119 ~ReliablePacketInfo(); | 120 ~ReliablePacketInfo(); |
120 | 121 |
121 // The stream id with which the message was sent. | 122 // The stream id with which the message was sent. |
122 uint32 stream_id; | 123 uint32_t stream_id; |
123 | 124 |
124 // If reliable delivery was requested, the persistent id of the message. | 125 // If reliable delivery was requested, the persistent id of the message. |
125 std::string persistent_id; | 126 std::string persistent_id; |
126 | 127 |
127 // The type of message itself (for easier lookup). | 128 // The type of message itself (for easier lookup). |
128 uint8 tag; | 129 uint8_t tag; |
129 | 130 |
130 // The protobuf of the message itself. | 131 // The protobuf of the message itself. |
131 MCSProto protobuf; | 132 MCSProto protobuf; |
132 }; | 133 }; |
133 | 134 |
134 ReliablePacketInfo::ReliablePacketInfo() | 135 ReliablePacketInfo::ReliablePacketInfo() |
135 : stream_id(0), tag(0) { | 136 : stream_id(0), tag(0) { |
136 } | 137 } |
137 ReliablePacketInfo::~ReliablePacketInfo() {} | 138 ReliablePacketInfo::~ReliablePacketInfo() {} |
138 | 139 |
(...skipping 81 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
220 << " is non-zero."; | 221 << " is non-zero."; |
221 | 222 |
222 DVLOG(1) << "RMQ Load finished with " << load_result->incoming_messages.size() | 223 DVLOG(1) << "RMQ Load finished with " << load_result->incoming_messages.size() |
223 << " incoming acks pending and " | 224 << " incoming acks pending and " |
224 << load_result->outgoing_messages.size() | 225 << load_result->outgoing_messages.size() |
225 << " outgoing messages pending."; | 226 << " outgoing messages pending."; |
226 | 227 |
227 restored_unackeds_server_ids_ = load_result->incoming_messages; | 228 restored_unackeds_server_ids_ = load_result->incoming_messages; |
228 | 229 |
229 // First go through and order the outgoing messages by recency. | 230 // First go through and order the outgoing messages by recency. |
230 std::map<uint64, google::protobuf::MessageLite*> ordered_messages; | 231 std::map<uint64_t, google::protobuf::MessageLite*> ordered_messages; |
231 std::vector<PersistentId> expired_ttl_ids; | 232 std::vector<PersistentId> expired_ttl_ids; |
232 for (GCMStore::OutgoingMessageMap::iterator iter = | 233 for (GCMStore::OutgoingMessageMap::iterator iter = |
233 load_result->outgoing_messages.begin(); | 234 load_result->outgoing_messages.begin(); |
234 iter != load_result->outgoing_messages.end(); ++iter) { | 235 iter != load_result->outgoing_messages.end(); ++iter) { |
235 uint64 timestamp = 0; | 236 uint64_t timestamp = 0; |
236 if (!base::StringToUint64(iter->first, ×tamp)) { | 237 if (!base::StringToUint64(iter->first, ×tamp)) { |
237 LOG(ERROR) << "Invalid restored message."; | 238 LOG(ERROR) << "Invalid restored message."; |
238 // TODO(fgorski): Error: data unreadable | 239 // TODO(fgorski): Error: data unreadable |
239 mcs_error_callback_.Run(); | 240 mcs_error_callback_.Run(); |
240 return; | 241 return; |
241 } | 242 } |
242 | 243 |
243 // Check if the TTL has expired for this message. | 244 // Check if the TTL has expired for this message. |
244 if (HasTTLExpired(*iter->second, clock_)) { | 245 if (HasTTLExpired(*iter->second, clock_)) { |
245 expired_ttl_ids.push_back(iter->first); | 246 expired_ttl_ids.push_back(iter->first); |
246 NotifyMessageSendStatus(*iter->second, TTL_EXCEEDED); | 247 NotifyMessageSendStatus(*iter->second, TTL_EXCEEDED); |
247 continue; | 248 continue; |
248 } | 249 } |
249 | 250 |
250 ordered_messages[timestamp] = iter->second.release(); | 251 ordered_messages[timestamp] = iter->second.release(); |
251 } | 252 } |
252 | 253 |
253 if (!expired_ttl_ids.empty()) { | 254 if (!expired_ttl_ids.empty()) { |
254 gcm_store_->RemoveOutgoingMessages( | 255 gcm_store_->RemoveOutgoingMessages( |
255 expired_ttl_ids, | 256 expired_ttl_ids, |
256 base::Bind(&MCSClient::OnGCMUpdateFinished, | 257 base::Bind(&MCSClient::OnGCMUpdateFinished, |
257 weak_ptr_factory_.GetWeakPtr())); | 258 weak_ptr_factory_.GetWeakPtr())); |
258 } | 259 } |
259 | 260 |
260 // Now go through and add the outgoing messages to the send queue in their | 261 // Now go through and add the outgoing messages to the send queue in their |
261 // appropriate order (oldest at front, most recent at back). | 262 // appropriate order (oldest at front, most recent at back). |
262 for (std::map<uint64, google::protobuf::MessageLite*>::iterator | 263 for (std::map<uint64_t, google::protobuf::MessageLite*>::iterator iter = |
263 iter = ordered_messages.begin(); | 264 ordered_messages.begin(); |
264 iter != ordered_messages.end(); ++iter) { | 265 iter != ordered_messages.end(); ++iter) { |
265 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); | 266 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); |
266 packet_info->protobuf.reset(iter->second); | 267 packet_info->protobuf.reset(iter->second); |
267 packet_info->tag = GetMCSProtoTag(*iter->second); | 268 packet_info->tag = GetMCSProtoTag(*iter->second); |
268 packet_info->persistent_id = base::Uint64ToString(iter->first); | 269 packet_info->persistent_id = base::Uint64ToString(iter->first); |
269 to_send_.push_back(make_linked_ptr(packet_info)); | 270 to_send_.push_back(make_linked_ptr(packet_info)); |
270 | 271 |
271 if (packet_info->tag == kDataMessageStanzaTag) { | 272 if (packet_info->tag == kDataMessageStanzaTag) { |
272 mcs_proto::DataMessageStanza* data_message = | 273 mcs_proto::DataMessageStanza* data_message = |
273 reinterpret_cast<mcs_proto::DataMessageStanza*>( | 274 reinterpret_cast<mcs_proto::DataMessageStanza*>( |
274 packet_info->protobuf.get()); | 275 packet_info->protobuf.get()); |
275 CollapseKey collapse_key(*data_message); | 276 CollapseKey collapse_key(*data_message); |
276 if (collapse_key.IsValid()) | 277 if (collapse_key.IsValid()) |
277 collapse_key_map_[collapse_key] = packet_info; | 278 collapse_key_map_[collapse_key] = packet_info; |
278 } | 279 } |
279 } | 280 } |
280 | 281 |
281 // Establish if there is any custom client interval persisted from the last | 282 // Establish if there is any custom client interval persisted from the last |
282 // run and set it on the heartbeat manager. | 283 // run and set it on the heartbeat manager. |
283 custom_heartbeat_intervals_.swap(load_result->heartbeat_intervals); | 284 custom_heartbeat_intervals_.swap(load_result->heartbeat_intervals); |
284 int min_interval_ms = GetMinHeartbeatIntervalMs(); | 285 int min_interval_ms = GetMinHeartbeatIntervalMs(); |
285 heartbeat_manager_.SetClientHeartbeatIntervalMs(min_interval_ms); | 286 heartbeat_manager_.SetClientHeartbeatIntervalMs(min_interval_ms); |
286 } | 287 } |
287 | 288 |
288 void MCSClient::Login(uint64 android_id, uint64 security_token) { | 289 void MCSClient::Login(uint64_t android_id, uint64_t security_token) { |
289 DCHECK_EQ(state_, LOADED); | 290 DCHECK_EQ(state_, LOADED); |
290 DCHECK(android_id_ == 0 || android_id_ == android_id); | 291 DCHECK(android_id_ == 0 || android_id_ == android_id); |
291 DCHECK(security_token_ == 0 || security_token_ == security_token); | 292 DCHECK(security_token_ == 0 || security_token_ == security_token); |
292 | 293 |
293 if (android_id != android_id_ && security_token != security_token_) { | 294 if (android_id != android_id_ && security_token != security_token_) { |
294 DCHECK(android_id); | 295 DCHECK(android_id); |
295 DCHECK(security_token); | 296 DCHECK(security_token); |
296 android_id_ = android_id; | 297 android_id_ = android_id; |
297 security_token_ = security_token; | 298 security_token_ = security_token; |
298 } | 299 } |
(...skipping 260 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
559 | 560 |
560 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) { | 561 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) { |
561 packet_info->stream_id = ++stream_id_out_; | 562 packet_info->stream_id = ++stream_id_out_; |
562 DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName(); | 563 DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName(); |
563 | 564 |
564 // Set the queued time as necessary. | 565 // Set the queued time as necessary. |
565 if (packet_info->tag == kDataMessageStanzaTag) { | 566 if (packet_info->tag == kDataMessageStanzaTag) { |
566 mcs_proto::DataMessageStanza* data_message = | 567 mcs_proto::DataMessageStanza* data_message = |
567 reinterpret_cast<mcs_proto::DataMessageStanza*>( | 568 reinterpret_cast<mcs_proto::DataMessageStanza*>( |
568 packet_info->protobuf.get()); | 569 packet_info->protobuf.get()); |
569 uint64 sent = data_message->sent(); | 570 uint64_t sent = data_message->sent(); |
570 DCHECK_GT(sent, 0U); | 571 DCHECK_GT(sent, 0U); |
571 int queued = (clock_->Now().ToInternalValue() / | 572 int queued = (clock_->Now().ToInternalValue() / |
572 base::Time::kMicrosecondsPerSecond) - sent; | 573 base::Time::kMicrosecondsPerSecond) - sent; |
573 DVLOG(1) << "Message was queued for " << queued << " seconds."; | 574 DVLOG(1) << "Message was queued for " << queued << " seconds."; |
574 data_message->set_queued(queued); | 575 data_message->set_queued(queued); |
575 recorder_->RecordDataSentToWire( | 576 recorder_->RecordDataSentToWire( |
576 data_message->category(), | 577 data_message->category(), |
577 data_message->to(), | 578 data_message->to(), |
578 data_message->id(), | 579 data_message->id(), |
579 queued); | 580 queued); |
(...skipping 52 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
632 | 633 |
633 if (send) { | 634 if (send) { |
634 SendMessage(MCSMessage(kDataMessageStanzaTag, response.Pass())); | 635 SendMessage(MCSMessage(kDataMessageStanzaTag, response.Pass())); |
635 } | 636 } |
636 } | 637 } |
637 | 638 |
638 void MCSClient::HandlePacketFromWire( | 639 void MCSClient::HandlePacketFromWire( |
639 scoped_ptr<google::protobuf::MessageLite> protobuf) { | 640 scoped_ptr<google::protobuf::MessageLite> protobuf) { |
640 if (!protobuf.get()) | 641 if (!protobuf.get()) |
641 return; | 642 return; |
642 uint8 tag = GetMCSProtoTag(*protobuf); | 643 uint8_t tag = GetMCSProtoTag(*protobuf); |
643 PersistentId persistent_id = GetPersistentId(*protobuf); | 644 PersistentId persistent_id = GetPersistentId(*protobuf); |
644 StreamId last_stream_id_received = GetLastStreamIdReceived(*protobuf); | 645 StreamId last_stream_id_received = GetLastStreamIdReceived(*protobuf); |
645 | 646 |
646 if (last_stream_id_received != 0) { | 647 if (last_stream_id_received != 0) { |
647 last_device_to_server_stream_id_received_ = last_stream_id_received; | 648 last_device_to_server_stream_id_received_ = last_stream_id_received; |
648 | 649 |
649 // Process device to server messages that have now been acknowledged by the | 650 // Process device to server messages that have now been acknowledged by the |
650 // server. Because messages are stored in order, just pop off all that have | 651 // server. Because messages are stored in order, just pop off all that have |
651 // a stream id lower than server's last received stream id. | 652 // a stream id lower than server's last received stream id. |
652 HandleStreamAck(last_stream_id_received); | 653 HandleStreamAck(last_stream_id_received); |
(...skipping 311 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
964 reinterpret_cast<mcs_proto::DataMessageStanza*>(packet->protobuf.get()); | 965 reinterpret_cast<mcs_proto::DataMessageStanza*>(packet->protobuf.get()); |
965 CollapseKey collapse_key(*data_message); | 966 CollapseKey collapse_key(*data_message); |
966 if (collapse_key.IsValid()) | 967 if (collapse_key.IsValid()) |
967 collapse_key_map_.erase(collapse_key); | 968 collapse_key_map_.erase(collapse_key); |
968 } | 969 } |
969 | 970 |
970 return packet; | 971 return packet; |
971 } | 972 } |
972 | 973 |
973 } // namespace gcm | 974 } // namespace gcm |
OLD | NEW |