OLD | NEW |
---|---|
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "google_apis/gcm/engine/mcs_client.h" | 5 #include "google_apis/gcm/engine/mcs_client.h" |
6 | 6 |
7 #include "base/basictypes.h" | 7 #include "base/basictypes.h" |
8 #include "base/message_loop/message_loop.h" | 8 #include "base/message_loop/message_loop.h" |
9 #include "base/metrics/histogram.h" | 9 #include "base/metrics/histogram.h" |
10 #include "base/strings/string_number_conversions.h" | 10 #include "base/strings/string_number_conversions.h" |
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
56 for (int i = 0; i < selective_ack.id_size(); ++i) { | 56 for (int i = 0; i < selective_ack.id_size(); ++i) { |
57 DCHECK(!selective_ack.id(i).empty()); | 57 DCHECK(!selective_ack.id(i).empty()); |
58 new_list.push_back(selective_ack.id(i)); | 58 new_list.push_back(selective_ack.id(i)); |
59 } | 59 } |
60 id_list->swap(new_list); | 60 id_list->swap(new_list); |
61 return true; | 61 return true; |
62 } | 62 } |
63 | 63 |
64 } // namespace | 64 } // namespace |
65 | 65 |
66 class CollapseKey { | |
67 public: | |
68 explicit CollapseKey(const mcs_proto::DataMessageStanza& message); | |
69 ~CollapseKey(); | |
70 | |
71 // Comparison operator for use in maps. | |
72 bool operator<(const CollapseKey& right) const; | |
73 | |
74 // Whether the message had a valid collapse key. | |
75 bool IsValid() const; | |
76 | |
77 std::string token() const { return token_; } | |
78 std::string app_id() const { return app_id_; } | |
79 int64 device_user_id() const { return device_user_id_; } | |
80 | |
81 private: | |
82 const std::string token_; | |
83 const std::string app_id_; | |
84 const int64 device_user_id_; | |
85 }; | |
86 | |
87 CollapseKey::CollapseKey(const mcs_proto::DataMessageStanza& message) | |
88 : token_(message.token()), | |
89 app_id_(message.category()), | |
90 device_user_id_(message.device_user_id()) {} | |
91 | |
92 CollapseKey::~CollapseKey() {} | |
93 | |
94 bool CollapseKey::IsValid() const { | |
95 // Device user id is optional, but the application id and token are not. | |
96 return !token_.empty() && !app_id_.empty(); | |
97 } | |
98 bool CollapseKey::operator<(const CollapseKey& right) const { | |
jianli
2014/02/04 22:55:14
nit: empty line
Nicolas Zea
2014/02/12 23:35:46
Done.
| |
99 if (device_user_id_ != right.device_user_id()) | |
100 return device_user_id_ < right.device_user_id(); | |
101 if (app_id_ != right.app_id()) | |
102 return app_id_ < right.app_id(); | |
103 return token_ < right.token(); | |
104 } | |
105 | |
66 struct ReliablePacketInfo { | 106 struct ReliablePacketInfo { |
67 ReliablePacketInfo(); | 107 ReliablePacketInfo(); |
68 ~ReliablePacketInfo(); | 108 ~ReliablePacketInfo(); |
69 | 109 |
70 // The stream id with which the message was sent. | 110 // The stream id with which the message was sent. |
71 uint32 stream_id; | 111 uint32 stream_id; |
72 | 112 |
73 // If reliable delivery was requested, the persistent id of the message. | 113 // If reliable delivery was requested, the persistent id of the message. |
74 std::string persistent_id; | 114 std::string persistent_id; |
75 | 115 |
(...skipping 144 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
220 if (message.size() > kMaxMessageBytes) { | 260 if (message.size() > kMaxMessageBytes) { |
221 NotifyMessageSendStatus(message.GetProtobuf(), MESSAGE_TOO_LARGE); | 261 NotifyMessageSendStatus(message.GetProtobuf(), MESSAGE_TOO_LARGE); |
222 return; | 262 return; |
223 } | 263 } |
224 | 264 |
225 scoped_ptr<ReliablePacketInfo> packet_info(new ReliablePacketInfo()); | 265 scoped_ptr<ReliablePacketInfo> packet_info(new ReliablePacketInfo()); |
226 packet_info->tag = message.tag(); | 266 packet_info->tag = message.tag(); |
227 packet_info->protobuf = message.CloneProtobuf(); | 267 packet_info->protobuf = message.CloneProtobuf(); |
228 | 268 |
229 if (ttl > 0) { | 269 if (ttl > 0) { |
230 PersistentId persistent_id = GetNextPersistentId(); | 270 DCHECK_EQ(message.tag(), kDataMessageStanzaTag); |
231 DVLOG(1) << "Setting persistent id to " << persistent_id; | 271 |
232 packet_info->persistent_id = persistent_id; | 272 // First check if this message should replace a pending message with the |
233 SetPersistentId(persistent_id, | 273 // same collapsed key. |
jianli
2014/02/04 22:55:14
nit: collapse
Nicolas Zea
2014/02/12 23:35:46
Done.
| |
234 packet_info->protobuf.get()); | 274 mcs_proto::DataMessageStanza* data_message = |
235 if (!gcm_store_->AddOutgoingMessage( | 275 reinterpret_cast<mcs_proto::DataMessageStanza*>( |
236 persistent_id, | 276 packet_info->protobuf.get()); |
237 MCSMessage(message.tag(), | 277 CollapseKey collapse_key(*data_message); |
238 *(packet_info->protobuf)), | 278 if (collapse_key.IsValid() && collapse_key_map_.count(collapse_key) > 0) { |
239 base::Bind(&MCSClient::OnGCMUpdateFinished, | 279 ReliablePacketInfo* original_packet = collapse_key_map_[collapse_key]; |
240 weak_ptr_factory_.GetWeakPtr()))) { | 280 DVLOG(1) << "Found matching collapse key, Reusing persistent id of " |
241 NotifyMessageSendStatus(message.GetProtobuf(), | 281 << original_packet->persistent_id; |
242 APP_QUEUE_SIZE_LIMIT_REACHED); | 282 original_packet->protobuf = packet_info->protobuf.Pass(); |
283 SetPersistentId(original_packet->persistent_id, | |
284 original_packet->protobuf.get()); | |
285 gcm_store_->OverwriteOutgoingMessage( | |
286 original_packet->persistent_id, | |
287 message, | |
288 base::Bind(&MCSClient::OnGCMUpdateFinished, | |
289 weak_ptr_factory_.GetWeakPtr())); | |
290 | |
291 // The message is already queued, return. | |
243 return; | 292 return; |
293 } else { | |
294 PersistentId persistent_id = GetNextPersistentId(); | |
295 DVLOG(1) << "Setting persistent id to " << persistent_id; | |
296 packet_info->persistent_id = persistent_id; | |
297 SetPersistentId(persistent_id, packet_info->protobuf.get()); | |
298 if (!gcm_store_->AddOutgoingMessage( | |
299 persistent_id, | |
300 MCSMessage(message.tag(), *(packet_info->protobuf)), | |
301 base::Bind(&MCSClient::OnGCMUpdateFinished, | |
302 weak_ptr_factory_.GetWeakPtr()))) { | |
303 NotifyMessageSendStatus(message.GetProtobuf(), | |
304 APP_QUEUE_SIZE_LIMIT_REACHED); | |
305 return; | |
306 } | |
244 } | 307 } |
308 | |
309 if (collapse_key.IsValid()) | |
310 collapse_key_map_[collapse_key] = packet_info.get(); | |
245 } else if (!connection_factory_->IsEndpointReachable()) { | 311 } else if (!connection_factory_->IsEndpointReachable()) { |
246 DVLOG(1) << "No active connection, dropping message."; | 312 DVLOG(1) << "No active connection, dropping message."; |
247 NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL); | 313 NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL); |
248 return; | 314 return; |
249 } | 315 } |
250 to_send_.push_back(make_linked_ptr(packet_info.release())); | 316 to_send_.push_back(make_linked_ptr(packet_info.release())); |
251 MaybeSendMessage(); | 317 MaybeSendMessage(); |
252 } | 318 } |
253 | 319 |
254 void MCSClient::Destroy() { | 320 void MCSClient::Destroy() { |
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
356 | 422 |
357 // If the connection has been reset, do nothing. On reconnection | 423 // If the connection has been reset, do nothing. On reconnection |
358 // MaybeSendMessage will be automatically invoked again. | 424 // MaybeSendMessage will be automatically invoked again. |
359 // TODO(zea): consider doing TTL expiration at connection reset time, rather | 425 // TODO(zea): consider doing TTL expiration at connection reset time, rather |
360 // than reconnect time. | 426 // than reconnect time. |
361 if (!connection_factory_->IsEndpointReachable()) | 427 if (!connection_factory_->IsEndpointReachable()) |
362 return; | 428 return; |
363 | 429 |
364 MCSPacketInternal packet = to_send_.front(); | 430 MCSPacketInternal packet = to_send_.front(); |
365 to_send_.pop_front(); | 431 to_send_.pop_front(); |
432 if (packet->tag == kDataMessageStanzaTag) { | |
433 mcs_proto::DataMessageStanza* data_message = | |
434 reinterpret_cast<mcs_proto::DataMessageStanza*>(packet->protobuf.get()); | |
435 CollapseKey collapse_key(*data_message); | |
436 if (collapse_key.IsValid()) | |
437 collapse_key_map_.erase(collapse_key); | |
438 } | |
366 if (HasTTLExpired(*packet->protobuf, clock_)) { | 439 if (HasTTLExpired(*packet->protobuf, clock_)) { |
367 DCHECK(!packet->persistent_id.empty()); | 440 DCHECK(!packet->persistent_id.empty()); |
368 DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; | 441 DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; |
369 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED); | 442 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED); |
370 gcm_store_->RemoveOutgoingMessage( | 443 gcm_store_->RemoveOutgoingMessage( |
371 packet->persistent_id, | 444 packet->persistent_id, |
372 base::Bind(&MCSClient::OnGCMUpdateFinished, | 445 base::Bind(&MCSClient::OnGCMUpdateFinished, |
373 weak_ptr_factory_.GetWeakPtr())); | 446 weak_ptr_factory_.GetWeakPtr())); |
374 base::MessageLoop::current()->PostTask( | 447 base::MessageLoop::current()->PostTask( |
375 FROM_HERE, | 448 FROM_HERE, |
(...skipping 367 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
743 data_message_stanza->category(), | 816 data_message_stanza->category(), |
744 data_message_stanza->id(), | 817 data_message_stanza->id(), |
745 status); | 818 status); |
746 } | 819 } |
747 | 820 |
748 void MCSClient::SetGCMStoreForTesting(GCMStore* gcm_store) { | 821 void MCSClient::SetGCMStoreForTesting(GCMStore* gcm_store) { |
749 gcm_store_ = gcm_store; | 822 gcm_store_ = gcm_store; |
750 } | 823 } |
751 | 824 |
752 } // namespace gcm | 825 } // namespace gcm |
OLD | NEW |