Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "google_apis/gcm/engine/mcs_client.h" | 5 #include "google_apis/gcm/engine/mcs_client.h" |
| 6 | 6 |
| 7 #include "base/basictypes.h" | 7 #include "base/basictypes.h" |
| 8 #include "base/message_loop/message_loop.h" | 8 #include "base/message_loop/message_loop.h" |
| 9 #include "base/metrics/histogram.h" | 9 #include "base/metrics/histogram.h" |
| 10 #include "base/strings/string_number_conversions.h" | 10 #include "base/strings/string_number_conversions.h" |
| (...skipping 208 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 219 if (message.size() > kMaxMessageBytes) { | 219 if (message.size() > kMaxMessageBytes) { |
| 220 NotifyMessageSendStatus(message.GetProtobuf(), MESSAGE_TOO_LARGE); | 220 NotifyMessageSendStatus(message.GetProtobuf(), MESSAGE_TOO_LARGE); |
| 221 return; | 221 return; |
| 222 } | 222 } |
| 223 | 223 |
| 224 scoped_ptr<ReliablePacketInfo> packet_info(new ReliablePacketInfo()); | 224 scoped_ptr<ReliablePacketInfo> packet_info(new ReliablePacketInfo()); |
| 225 packet_info->tag = message.tag(); | 225 packet_info->tag = message.tag(); |
| 226 packet_info->protobuf = message.CloneProtobuf(); | 226 packet_info->protobuf = message.CloneProtobuf(); |
| 227 | 227 |
| 228 if (ttl > 0) { | 228 if (ttl > 0) { |
| 229 PersistentId persistent_id = GetNextPersistentId(); | 229 DCHECK_EQ(message.tag(), kDataMessageStanzaTag); |
| 230 DVLOG(1) << "Setting persistent id to " << persistent_id; | 230 |
| 231 packet_info->persistent_id = persistent_id; | 231 // First check if this message should replace a pending message with the |
| 232 SetPersistentId(persistent_id, | 232 // same collapsed key. |
| 233 packet_info->protobuf.get()); | 233 mcs_proto::DataMessageStanza* data_message = |
| 234 if (!gcm_store_->AddOutgoingMessage( | 234 reinterpret_cast<mcs_proto::DataMessageStanza*>( |
| 235 persistent_id, | 235 packet_info->protobuf.get()); |
| 236 MCSMessage(message.tag(), | 236 std::string app_id = data_message->from(); |
|
fgorski
2014/01/27 17:39:38
same here
Nicolas Zea
2014/01/31 11:58:43
Done.
| |
| 237 *(packet_info->protobuf)), | 237 DCHECK(!app_id.empty()); |
| 238 base::Bind(&MCSClient::OnGCMUpdateFinished, | 238 std::string token = data_message->token(); |
| 239 weak_ptr_factory_.GetWeakPtr()))) { | 239 if (!token.empty() && |
| 240 NotifyMessageSendStatus(message.GetProtobuf(), | 240 collapse_key_map_.count(app_id) != 0 && |
| 241 APP_QUEUE_SIZE_LIMIT_REACHED); | 241 collapse_key_map_[app_id]->count(token) != 0) { |
| 242 ReliablePacketInfo* original_packet = | |
| 243 (*collapse_key_map_[app_id])[data_message->token()]; | |
| 244 DVLOG(1) << "Found matching collapse key, Reusing persistent id of " | |
| 245 << original_packet->persistent_id; | |
|
fgorski
2014/01/27 17:39:38
Is it possible that you are assigning a persistent
Nicolas Zea
2014/01/28 08:15:35
SendMessage is only invoked for new messages, not
fgorski
2014/01/28 18:10:27
Does that mean we can fill RMQ for the application
Nicolas Zea
2014/01/31 11:58:43
Currently yes, if all the messages manage to send,
| |
| 246 original_packet->protobuf = packet_info->protobuf.Pass(); | |
| 247 SetPersistentId(original_packet->persistent_id, | |
| 248 original_packet->protobuf.get()); | |
| 249 gcm_store_->OverwriteOutgoingMessage( | |
| 250 original_packet->persistent_id, | |
| 251 message, | |
| 252 base::Bind(&MCSClient::OnGCMUpdateFinished, | |
| 253 weak_ptr_factory_.GetWeakPtr())); | |
| 254 | |
| 255 // The message is already queued, return. | |
| 242 return; | 256 return; |
| 257 } else { | |
| 258 PersistentId persistent_id = GetNextPersistentId(); | |
| 259 DVLOG(1) << "Setting persistent id to " << persistent_id; | |
| 260 packet_info->persistent_id = persistent_id; | |
| 261 SetPersistentId(persistent_id, | |
| 262 packet_info->protobuf.get()); | |
| 263 if (!gcm_store_->AddOutgoingMessage( | |
| 264 persistent_id, | |
| 265 MCSMessage(message.tag(), | |
| 266 *(packet_info->protobuf)), | |
| 267 base::Bind(&MCSClient::OnGCMUpdateFinished, | |
| 268 weak_ptr_factory_.GetWeakPtr()))) { | |
| 269 NotifyMessageSendStatus(message.GetProtobuf(), | |
| 270 APP_QUEUE_SIZE_LIMIT_REACHED); | |
| 271 return; | |
| 272 } | |
| 243 } | 273 } |
| 274 | |
| 275 if (!token.empty()) { | |
| 276 if (!collapse_key_map_[app_id].get()) | |
| 277 collapse_key_map_[app_id] = make_linked_ptr(new TokenMap()); | |
| 278 (*collapse_key_map_[app_id])[token] = packet_info.get(); | |
| 279 } | |
| 280 | |
| 244 } else if (!connection_factory_->IsEndpointReachable()) { | 281 } else if (!connection_factory_->IsEndpointReachable()) { |
| 245 DVLOG(1) << "No active connection, dropping message."; | 282 DVLOG(1) << "No active connection, dropping message."; |
| 246 NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL); | 283 NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL); |
| 247 return; | 284 return; |
| 248 } | 285 } |
| 249 to_send_.push_back(make_linked_ptr(packet_info.release())); | 286 to_send_.push_back(make_linked_ptr(packet_info.release())); |
| 250 MaybeSendMessage(); | 287 MaybeSendMessage(); |
| 251 } | 288 } |
| 252 | 289 |
| 253 void MCSClient::Destroy() { | 290 void MCSClient::Destroy() { |
| (...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 355 | 392 |
| 356 // If the connection has been reset, do nothing. On reconnection | 393 // If the connection has been reset, do nothing. On reconnection |
| 357 // MaybeSendMessage will be automatically invoked again. | 394 // MaybeSendMessage will be automatically invoked again. |
| 358 // TODO(zea): consider doing TTL expiration at connection reset time, rather | 395 // TODO(zea): consider doing TTL expiration at connection reset time, rather |
| 359 // than reconnect time. | 396 // than reconnect time. |
| 360 if (!connection_factory_->IsEndpointReachable()) | 397 if (!connection_factory_->IsEndpointReachable()) |
| 361 return; | 398 return; |
| 362 | 399 |
| 363 MCSPacketInternal packet = to_send_.front(); | 400 MCSPacketInternal packet = to_send_.front(); |
| 364 to_send_.pop_front(); | 401 to_send_.pop_front(); |
| 402 if (packet->tag == kDataMessageStanzaTag) { | |
| 403 mcs_proto::DataMessageStanza* data_message = | |
| 404 reinterpret_cast<mcs_proto::DataMessageStanza*>( | |
| 405 packet->protobuf.get()); | |
| 406 std::string app_id = data_message->from(); | |
|
fgorski
2014/01/27 17:39:38
ditto: category
Nicolas Zea
2014/01/31 11:58:43
Done.
| |
| 407 std::string token = data_message->token(); | |
| 408 if (!token.empty()) | |
| 409 collapse_key_map_[app_id]->erase(token); | |
| 410 } | |
| 365 if (HasTTLExpired(*packet->protobuf, clock_)) { | 411 if (HasTTLExpired(*packet->protobuf, clock_)) { |
| 366 DCHECK(!packet->persistent_id.empty()); | 412 DCHECK(!packet->persistent_id.empty()); |
| 367 DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; | 413 DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; |
| 368 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED); | 414 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED); |
| 369 gcm_store_->RemoveOutgoingMessage( | 415 gcm_store_->RemoveOutgoingMessage( |
| 370 packet->persistent_id, | 416 packet->persistent_id, |
| 371 base::Bind(&MCSClient::OnGCMUpdateFinished, | 417 base::Bind(&MCSClient::OnGCMUpdateFinished, |
| 372 weak_ptr_factory_.GetWeakPtr())); | 418 weak_ptr_factory_.GetWeakPtr())); |
| 373 base::MessageLoop::current()->PostTask( | 419 base::MessageLoop::current()->PostTask( |
| 374 FROM_HERE, | 420 FROM_HERE, |
| (...skipping 363 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 738 const mcs_proto::DataMessageStanza* data_message_stanza = | 784 const mcs_proto::DataMessageStanza* data_message_stanza = |
| 739 reinterpret_cast<const mcs_proto::DataMessageStanza*>(&protobuf); | 785 reinterpret_cast<const mcs_proto::DataMessageStanza*>(&protobuf); |
| 740 message_sent_callback_.Run( | 786 message_sent_callback_.Run( |
| 741 data_message_stanza->device_user_id(), | 787 data_message_stanza->device_user_id(), |
| 742 data_message_stanza->category(), | 788 data_message_stanza->category(), |
| 743 data_message_stanza->id(), | 789 data_message_stanza->id(), |
| 744 status); | 790 status); |
| 745 } | 791 } |
| 746 | 792 |
| 747 } // namespace gcm | 793 } // namespace gcm |
| OLD | NEW |