Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(110)

Side by Side Diff: google_apis/gcm/engine/mcs_client.cc

Issue 148293002: [GCM] Add basic collapse key support for upstream (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: fix Created 6 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "google_apis/gcm/engine/mcs_client.h" 5 #include "google_apis/gcm/engine/mcs_client.h"
6 6
7 #include "base/basictypes.h" 7 #include "base/basictypes.h"
8 #include "base/message_loop/message_loop.h" 8 #include "base/message_loop/message_loop.h"
9 #include "base/metrics/histogram.h" 9 #include "base/metrics/histogram.h"
10 #include "base/strings/string_number_conversions.h" 10 #include "base/strings/string_number_conversions.h"
(...skipping 208 matching lines...) Expand 10 before | Expand all | Expand 10 after
219 if (message.size() > kMaxMessageBytes) { 219 if (message.size() > kMaxMessageBytes) {
220 NotifyMessageSendStatus(message.GetProtobuf(), MESSAGE_TOO_LARGE); 220 NotifyMessageSendStatus(message.GetProtobuf(), MESSAGE_TOO_LARGE);
221 return; 221 return;
222 } 222 }
223 223
224 scoped_ptr<ReliablePacketInfo> packet_info(new ReliablePacketInfo()); 224 scoped_ptr<ReliablePacketInfo> packet_info(new ReliablePacketInfo());
225 packet_info->tag = message.tag(); 225 packet_info->tag = message.tag();
226 packet_info->protobuf = message.CloneProtobuf(); 226 packet_info->protobuf = message.CloneProtobuf();
227 227
228 if (ttl > 0) { 228 if (ttl > 0) {
229 PersistentId persistent_id = GetNextPersistentId(); 229 DCHECK_EQ(message.tag(), kDataMessageStanzaTag);
230 DVLOG(1) << "Setting persistent id to " << persistent_id; 230
231 packet_info->persistent_id = persistent_id; 231 // First check if this message should replace a pending message with the
232 SetPersistentId(persistent_id, 232 // same collapsed key.
233 packet_info->protobuf.get()); 233 mcs_proto::DataMessageStanza* data_message =
234 if (!gcm_store_->AddOutgoingMessage( 234 reinterpret_cast<mcs_proto::DataMessageStanza*>(
235 persistent_id, 235 packet_info->protobuf.get());
236 MCSMessage(message.tag(), 236 std::string app_id = data_message->from();
fgorski 2014/01/27 17:39:38 same here
Nicolas Zea 2014/01/31 11:58:43 Done.
237 *(packet_info->protobuf)), 237 DCHECK(!app_id.empty());
238 base::Bind(&MCSClient::OnGCMUpdateFinished, 238 std::string token = data_message->token();
239 weak_ptr_factory_.GetWeakPtr()))) { 239 if (!token.empty() &&
240 NotifyMessageSendStatus(message.GetProtobuf(), 240 collapse_key_map_.count(app_id) != 0 &&
241 APP_QUEUE_SIZE_LIMIT_REACHED); 241 collapse_key_map_[app_id]->count(token) != 0) {
242 ReliablePacketInfo* original_packet =
243 (*collapse_key_map_[app_id])[data_message->token()];
244 DVLOG(1) << "Found matching collapse key, Reusing persistent id of "
245 << original_packet->persistent_id;
fgorski 2014/01/27 17:39:38 Is it possible that you are assigning a persistent
Nicolas Zea 2014/01/28 08:15:35 SendMessage is only invoked for new messages, not
fgorski 2014/01/28 18:10:27 Does that mean we can fill RMQ for the application
Nicolas Zea 2014/01/31 11:58:43 Currently yes, if all the messages manage to send,
246 original_packet->protobuf = packet_info->protobuf.Pass();
247 SetPersistentId(original_packet->persistent_id,
248 original_packet->protobuf.get());
249 gcm_store_->OverwriteOutgoingMessage(
250 original_packet->persistent_id,
251 message,
252 base::Bind(&MCSClient::OnGCMUpdateFinished,
253 weak_ptr_factory_.GetWeakPtr()));
254
255 // The message is already queued, return.
242 return; 256 return;
257 } else {
258 PersistentId persistent_id = GetNextPersistentId();
259 DVLOG(1) << "Setting persistent id to " << persistent_id;
260 packet_info->persistent_id = persistent_id;
261 SetPersistentId(persistent_id,
262 packet_info->protobuf.get());
263 if (!gcm_store_->AddOutgoingMessage(
264 persistent_id,
265 MCSMessage(message.tag(),
266 *(packet_info->protobuf)),
267 base::Bind(&MCSClient::OnGCMUpdateFinished,
268 weak_ptr_factory_.GetWeakPtr()))) {
269 NotifyMessageSendStatus(message.GetProtobuf(),
270 APP_QUEUE_SIZE_LIMIT_REACHED);
271 return;
272 }
243 } 273 }
274
275 if (!token.empty()) {
276 if (!collapse_key_map_[app_id].get())
277 collapse_key_map_[app_id] = make_linked_ptr(new TokenMap());
278 (*collapse_key_map_[app_id])[token] = packet_info.get();
279 }
280
244 } else if (!connection_factory_->IsEndpointReachable()) { 281 } else if (!connection_factory_->IsEndpointReachable()) {
245 DVLOG(1) << "No active connection, dropping message."; 282 DVLOG(1) << "No active connection, dropping message.";
246 NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL); 283 NotifyMessageSendStatus(message.GetProtobuf(), NO_CONNECTION_ON_ZERO_TTL);
247 return; 284 return;
248 } 285 }
249 to_send_.push_back(make_linked_ptr(packet_info.release())); 286 to_send_.push_back(make_linked_ptr(packet_info.release()));
250 MaybeSendMessage(); 287 MaybeSendMessage();
251 } 288 }
252 289
253 void MCSClient::Destroy() { 290 void MCSClient::Destroy() {
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after
355 392
356 // If the connection has been reset, do nothing. On reconnection 393 // If the connection has been reset, do nothing. On reconnection
357 // MaybeSendMessage will be automatically invoked again. 394 // MaybeSendMessage will be automatically invoked again.
358 // TODO(zea): consider doing TTL expiration at connection reset time, rather 395 // TODO(zea): consider doing TTL expiration at connection reset time, rather
359 // than reconnect time. 396 // than reconnect time.
360 if (!connection_factory_->IsEndpointReachable()) 397 if (!connection_factory_->IsEndpointReachable())
361 return; 398 return;
362 399
363 MCSPacketInternal packet = to_send_.front(); 400 MCSPacketInternal packet = to_send_.front();
364 to_send_.pop_front(); 401 to_send_.pop_front();
402 if (packet->tag == kDataMessageStanzaTag) {
403 mcs_proto::DataMessageStanza* data_message =
404 reinterpret_cast<mcs_proto::DataMessageStanza*>(
405 packet->protobuf.get());
406 std::string app_id = data_message->from();
fgorski 2014/01/27 17:39:38 ditto: category
Nicolas Zea 2014/01/31 11:58:43 Done.
407 std::string token = data_message->token();
408 if (!token.empty())
409 collapse_key_map_[app_id]->erase(token);
410 }
365 if (HasTTLExpired(*packet->protobuf, clock_)) { 411 if (HasTTLExpired(*packet->protobuf, clock_)) {
366 DCHECK(!packet->persistent_id.empty()); 412 DCHECK(!packet->persistent_id.empty());
367 DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; 413 DVLOG(1) << "Dropping expired message " << packet->persistent_id << ".";
368 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED); 414 NotifyMessageSendStatus(*packet->protobuf, TTL_EXCEEDED);
369 gcm_store_->RemoveOutgoingMessage( 415 gcm_store_->RemoveOutgoingMessage(
370 packet->persistent_id, 416 packet->persistent_id,
371 base::Bind(&MCSClient::OnGCMUpdateFinished, 417 base::Bind(&MCSClient::OnGCMUpdateFinished,
372 weak_ptr_factory_.GetWeakPtr())); 418 weak_ptr_factory_.GetWeakPtr()));
373 base::MessageLoop::current()->PostTask( 419 base::MessageLoop::current()->PostTask(
374 FROM_HERE, 420 FROM_HERE,
(...skipping 363 matching lines...) Expand 10 before | Expand all | Expand 10 after
738 const mcs_proto::DataMessageStanza* data_message_stanza = 784 const mcs_proto::DataMessageStanza* data_message_stanza =
739 reinterpret_cast<const mcs_proto::DataMessageStanza*>(&protobuf); 785 reinterpret_cast<const mcs_proto::DataMessageStanza*>(&protobuf);
740 message_sent_callback_.Run( 786 message_sent_callback_.Run(
741 data_message_stanza->device_user_id(), 787 data_message_stanza->device_user_id(),
742 data_message_stanza->category(), 788 data_message_stanza->category(),
743 data_message_stanza->id(), 789 data_message_stanza->id(),
744 status); 790 status);
745 } 791 }
746 792
747 } // namespace gcm 793 } // namespace gcm
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698