OLD | NEW |
---|---|
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "google_apis/gcm/engine/mcs_client.h" | 5 #include "google_apis/gcm/engine/mcs_client.h" |
6 | 6 |
7 #include "base/basictypes.h" | 7 #include "base/basictypes.h" |
8 #include "base/message_loop/message_loop.h" | 8 #include "base/message_loop/message_loop.h" |
9 #include "base/metrics/histogram.h" | 9 #include "base/metrics/histogram.h" |
10 #include "base/strings/string_number_conversions.h" | 10 #include "base/strings/string_number_conversions.h" |
(...skipping 208 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
219 if (message.size() > kMaxMessageBytes) { | 219 if (message.size() > kMaxMessageBytes) { |
220 NotifyMessageSendStatus(message.GetProtobuf(), MESSAGE_TOO_LARGE); | 220 NotifyMessageSendStatus(message.GetProtobuf(), MESSAGE_TOO_LARGE); |
221 return; | 221 return; |
222 } | 222 } |
223 | 223 |
224 scoped_ptr<ReliablePacketInfo> packet_info(new ReliablePacketInfo()); | 224 scoped_ptr<ReliablePacketInfo> packet_info(new ReliablePacketInfo()); |
225 packet_info->tag = message.tag(); | 225 packet_info->tag = message.tag(); |
226 packet_info->protobuf = message.CloneProtobuf(); | 226 packet_info->protobuf = message.CloneProtobuf(); |
227 | 227 |
228 if (ttl > 0) { | 228 if (ttl > 0) { |
229 PersistentId persistent_id = GetNextPersistentId(); | 229 DCHECK_EQ(message.tag(), kDataMessageStanzaTag); |
230 DVLOG(1) << "Setting persistent id to " << persistent_id; | 230 |
231 packet_info->persistent_id = persistent_id; | 231 // First check if this message should replace a pending message with the |
232 SetPersistentId(persistent_id, | 232 // same collapsed key. |
233 packet_info->protobuf.get()); | 233 mcs_proto::DataMessageStanza* data_message = |
234 if (!gcm_store_->AddOutgoingMessage( | 234 reinterpret_cast<mcs_proto::DataMessageStanza*>( |
235 persistent_id, | 235 packet_info->protobuf.get()); |
236 MCSMessage(message.tag(), | 236 std::string app_id = data_message->from(); |
fgorski
2014/01/27 17:39:38
same here
Nicolas Zea
2014/01/31 11:58:43
Done.
| |
237 *(packet_info->protobuf)), | 237 DCHECK(!app_id.empty()); |
238 base::Bind(&MCSClient::OnGCMUpdateFinished, | 238 std::string token = data_message->token(); |
239 weak_ptr_factory_.GetWeakPtr()))) { | 239 if (!token.empty() && |
240 NotifyMessageSendStatus(message.GetProtobuf(), | 240 collapse_key_map_.count(app_id) != 0 && |
241 APP_QUEUE_SIZE_LIMIT_REACHED); | 241 collapse_key_map_[app_id]->count(token) != 0) { |
242 ReliablePacketInfo* original_packet = | |
243 (*collapse_key_map_[app_id])[data_message->token()]; | |
244 DVLOG(1) << "Found matching collapse key, Reusing persistent id of " | |
245 << original_packet->persistent_id; | |
fgorski
2014/01/27 17:39:38
Is it possible that you are assigning a persistent
Nicolas Zea
2014/01/28 08:15:35
SendMessage is only invoked for new messages, not
fgorski
2014/01/28 18:10:27
Does that mean we can fill RMQ for the application
Nicolas Zea
2014/01/31 11:58:43
Currently yes, if all the messages manage to send,
| |
246 original_packet->protobuf = packet_info->protobuf.Pass(); | |
247 SetPersistentId(original_packet->persistent_id, | |
248 original_packet->protobuf.get()); | |
249 gcm_store_->OverwriteOutgoingMessage( | |
250 original_packet->persistent_id, | |
251 message, | |
252 base::Bind(&MCSClient::OnGCMUpdateFinished, | |
253 weak_ptr_factory_.GetWeakPtr())); | |
254 | |
255 // The message is already queued, return. | |
242 return; | 256 return; |
257 } else { | |
258 PersistentId persistent_id = GetNextPersistentId(); | |
259 DVLOG(1) << "Setting persistent id to " << persistent_id; | |
260 packet_info->persistent_id = persistent_id; | |
261 SetPersistentId(persistent_id, | |
262 packet_info->protobuf.get()); | |
263 if (!gcm_store_->AddOutgoingMessage( | |
264 persistent_id, | |
265 MCSMessage(message.tag(), | |
266 *(packet_info->protobuf)), | |
267 base::Bind(&MCSClient::OnGCMUpdateFinished, | |
268 weak_ptr_factory_.GetWeakPtr()))) { | |
269 NotifyMessageSendStatus(message.GetProtobuf(), | |
270 APP_QUEUE_SIZE_LIMIT_REACHED); | |
271 return; | |
272 } | |
243 } | 273 } |
274 | |
275 if (!token.empty()) { | |
276 if (!collapse_key_map_[app_id].get()) | |
277 collapse_key_map_[app_id] = make_linked_ptr(new TokenMap()); | |
278 (*collapse_key_map_[app_id])[token] = packet_info.get(); | |
279 } | |
280 | |
244 } else if (!connection_factory_->IsEndpointReachable()) { | 281 } else if (!connection_factory_->IsEndpointReachable()) { |
245 DVLOG(1) << "No active connection, dropping message."; | 282 DVLOG(1) << "No active connection, dropping message."; |
246 NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL); | 283 NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL); |
247 return; | 284 return; |
248 } | 285 } |
249 to_send_.push_back(make_linked_ptr(packet_info.release())); | 286 to_send_.push_back(make_linked_ptr(packet_info.release())); |
250 MaybeSendMessage(); | 287 MaybeSendMessage(); |
251 } | 288 } |
252 | 289 |
253 void MCSClient::Destroy() { | 290 void MCSClient::Destroy() { |
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
355 | 392 |
356 // If the connection has been reset, do nothing. On reconnection | 393 // If the connection has been reset, do nothing. On reconnection |
357 // MaybeSendMessage will be automatically invoked again. | 394 // MaybeSendMessage will be automatically invoked again. |
358 // TODO(zea): consider doing TTL expiration at connection reset time, rather | 395 // TODO(zea): consider doing TTL expiration at connection reset time, rather |
359 // than reconnect time. | 396 // than reconnect time. |
360 if (!connection_factory_->IsEndpointReachable()) | 397 if (!connection_factory_->IsEndpointReachable()) |
361 return; | 398 return; |
362 | 399 |
363 MCSPacketInternal packet = to_send_.front(); | 400 MCSPacketInternal packet = to_send_.front(); |
364 to_send_.pop_front(); | 401 to_send_.pop_front(); |
402 if (packet->tag == kDataMessageStanzaTag) { | |
403 mcs_proto::DataMessageStanza* data_message = | |
404 reinterpret_cast<mcs_proto::DataMessageStanza*>( | |
405 packet->protobuf.get()); | |
406 std::string app_id = data_message->from(); | |
fgorski
2014/01/27 17:39:38
ditto: category
Nicolas Zea
2014/01/31 11:58:43
Done.
| |
407 std::string token = data_message->token(); | |
408 if (!token.empty()) | |
409 collapse_key_map_[app_id]->erase(token); | |
410 } | |
365 if (HasTTLExpired(*packet->protobuf, clock_)) { | 411 if (HasTTLExpired(*packet->protobuf, clock_)) { |
366 DCHECK(!packet->persistent_id.empty()); | 412 DCHECK(!packet->persistent_id.empty()); |
367 DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; | 413 DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; |
368 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED); | 414 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED); |
369 gcm_store_->RemoveOutgoingMessage( | 415 gcm_store_->RemoveOutgoingMessage( |
370 packet->persistent_id, | 416 packet->persistent_id, |
371 base::Bind(&MCSClient::OnGCMUpdateFinished, | 417 base::Bind(&MCSClient::OnGCMUpdateFinished, |
372 weak_ptr_factory_.GetWeakPtr())); | 418 weak_ptr_factory_.GetWeakPtr())); |
373 base::MessageLoop::current()->PostTask( | 419 base::MessageLoop::current()->PostTask( |
374 FROM_HERE, | 420 FROM_HERE, |
(...skipping 363 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
738 const mcs_proto::DataMessageStanza* data_message_stanza = | 784 const mcs_proto::DataMessageStanza* data_message_stanza = |
739 reinterpret_cast<const mcs_proto::DataMessageStanza*>(&protobuf); | 785 reinterpret_cast<const mcs_proto::DataMessageStanza*>(&protobuf); |
740 message_sent_callback_.Run( | 786 message_sent_callback_.Run( |
741 data_message_stanza->device_user_id(), | 787 data_message_stanza->device_user_id(), |
742 data_message_stanza->category(), | 788 data_message_stanza->category(), |
743 data_message_stanza->id(), | 789 data_message_stanza->id(), |
744 status); | 790 status); |
745 } | 791 } |
746 | 792 |
747 } // namespace gcm | 793 } // namespace gcm |
OLD | NEW |