Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(150)

Side by Side Diff: google_apis/gcm/engine/mcs_client.cc

Issue 121743002: Renaming RMQStore to GCMStore and breaking out its interface (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Updates to documentation and test method naming Created 6 years, 12 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "google_apis/gcm/engine/mcs_client.h" 5 #include "google_apis/gcm/engine/mcs_client.h"
6 6
7 #include "base/basictypes.h" 7 #include "base/basictypes.h"
8 #include "base/message_loop/message_loop.h" 8 #include "base/message_loop/message_loop.h"
9 #include "base/strings/string_number_conversions.h" 9 #include "base/strings/string_number_conversions.h"
10 #include "google_apis/gcm/base/mcs_util.h" 10 #include "google_apis/gcm/base/mcs_util.h"
11 #include "google_apis/gcm/base/socket_stream.h" 11 #include "google_apis/gcm/base/socket_stream.h"
12 #include "google_apis/gcm/engine/connection_factory.h" 12 #include "google_apis/gcm/engine/connection_factory.h"
13 #include "google_apis/gcm/engine/rmq_store.h"
14 13
15 using namespace google::protobuf::io; 14 using namespace google::protobuf::io;
16 15
17 namespace gcm { 16 namespace gcm {
18 17
19 namespace { 18 namespace {
20 19
21 typedef scoped_ptr<google::protobuf::MessageLite> MCSProto; 20 typedef scoped_ptr<google::protobuf::MessageLite> MCSProto;
22 21
23 // TODO(zea): get these values from MCS settings. 22 // TODO(zea): get these values from MCS settings.
(...skipping 55 matching lines...) Expand 10 before | Expand all | Expand 10 after
79 78
80 // The protobuf of the message itself. 79 // The protobuf of the message itself.
81 MCSProto protobuf; 80 MCSProto protobuf;
82 }; 81 };
83 82
84 ReliablePacketInfo::ReliablePacketInfo() 83 ReliablePacketInfo::ReliablePacketInfo()
85 : stream_id(0), tag(0) { 84 : stream_id(0), tag(0) {
86 } 85 }
87 ReliablePacketInfo::~ReliablePacketInfo() {} 86 ReliablePacketInfo::~ReliablePacketInfo() {}
88 87
89 MCSClient::MCSClient(ConnectionFactory* connection_factory, RMQStore* rmq_store) 88 MCSClient::MCSClient(ConnectionFactory* connection_factory, GCMStore* gcm_store)
90 : state_(UNINITIALIZED), 89 : state_(UNINITIALIZED),
91 android_id_(0), 90 android_id_(0),
92 security_token_(0), 91 security_token_(0),
93 connection_factory_(connection_factory), 92 connection_factory_(connection_factory),
94 connection_handler_(NULL), 93 connection_handler_(NULL),
95 last_device_to_server_stream_id_received_(0), 94 last_device_to_server_stream_id_received_(0),
96 last_server_to_device_stream_id_received_(0), 95 last_server_to_device_stream_id_received_(0),
97 stream_id_out_(0), 96 stream_id_out_(0),
98 stream_id_in_(0), 97 stream_id_in_(0),
99 rmq_store_(rmq_store), 98 gcm_store_(gcm_store),
100 heartbeat_interval_( 99 heartbeat_interval_(
101 base::TimeDelta::FromSeconds(kHeartbeatDefaultSeconds)), 100 base::TimeDelta::FromSeconds(kHeartbeatDefaultSeconds)),
102 heartbeat_timer_(true, true), 101 heartbeat_timer_(true, true),
103 weak_ptr_factory_(this) { 102 weak_ptr_factory_(this) {}
104 }
105 103
106 MCSClient::~MCSClient() { 104 MCSClient::~MCSClient() {
107 } 105 }
108 106
109 void MCSClient::Initialize( 107 void MCSClient::Initialize(
110 const InitializationCompleteCallback& initialization_callback, 108 const InitializationCompleteCallback& initialization_callback,
111 const OnMessageReceivedCallback& message_received_callback, 109 const OnMessageReceivedCallback& message_received_callback,
112 const OnMessageSentCallback& message_sent_callback, 110 const OnMessageSentCallback& message_sent_callback,
113 const RMQStore::LoadResult& load_result) { 111 const GCMStore::LoadResult& load_result) {
114 DCHECK_EQ(state_, UNINITIALIZED); 112 DCHECK_EQ(state_, UNINITIALIZED);
115 initialization_callback_ = initialization_callback; 113 initialization_callback_ = initialization_callback;
116 message_received_callback_ = message_received_callback; 114 message_received_callback_ = message_received_callback;
117 message_sent_callback_ = message_sent_callback; 115 message_sent_callback_ = message_sent_callback;
118 116
119 connection_factory_->Initialize( 117 connection_factory_->Initialize(
120 base::Bind(&MCSClient::ResetStateAndBuildLoginRequest, 118 base::Bind(&MCSClient::ResetStateAndBuildLoginRequest,
121 weak_ptr_factory_.GetWeakPtr()), 119 weak_ptr_factory_.GetWeakPtr()),
122 base::Bind(&MCSClient::HandlePacketFromWire, 120 base::Bind(&MCSClient::HandlePacketFromWire,
123 weak_ptr_factory_.GetWeakPtr()), 121 weak_ptr_factory_.GetWeakPtr()),
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after
183 } 181 }
184 182
185 void MCSClient::Login(uint64 android_id, uint64 security_token) { 183 void MCSClient::Login(uint64 android_id, uint64 security_token) {
186 DCHECK_EQ(state_, LOADED); 184 DCHECK_EQ(state_, LOADED);
187 if (android_id != android_id_ && security_token != security_token_) { 185 if (android_id != android_id_ && security_token != security_token_) {
188 DCHECK(android_id); 186 DCHECK(android_id);
189 DCHECK(security_token); 187 DCHECK(security_token);
190 DCHECK(restored_unackeds_server_ids_.empty()); 188 DCHECK(restored_unackeds_server_ids_.empty());
191 android_id_ = android_id; 189 android_id_ = android_id;
192 security_token_ = security_token; 190 security_token_ = security_token;
193 rmq_store_->SetDeviceCredentials( 191 gcm_store_->SetDeviceCredentials(
194 android_id_, 192 android_id_,
195 security_token_, 193 security_token_,
196 base::Bind(&MCSClient::OnRMQUpdateFinished, 194 base::Bind(&MCSClient::OnGCMUpdateFinished,
197 weak_ptr_factory_.GetWeakPtr())); 195 weak_ptr_factory_.GetWeakPtr()));
198 } 196 }
199 197
200 state_ = CONNECTING; 198 state_ = CONNECTING;
201 connection_factory_->Connect(); 199 connection_factory_->Connect();
202 } 200 }
203 201
204 void MCSClient::SendMessage(const MCSMessage& message, bool use_rmq) { 202 void MCSClient::SendMessage(const MCSMessage& message, bool use_rmq) {
205 DCHECK_EQ(state_, CONNECTED); 203 DCHECK_EQ(state_, CONNECTED);
206 if (to_send_.size() > kMaxSendQueueSize) { 204 if (to_send_.size() > kMaxSendQueueSize) {
(...skipping 11 matching lines...) Expand all
218 216
219 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); 217 ReliablePacketInfo* packet_info = new ReliablePacketInfo();
220 packet_info->protobuf = message.CloneProtobuf(); 218 packet_info->protobuf = message.CloneProtobuf();
221 219
222 if (use_rmq) { 220 if (use_rmq) {
223 PersistentId persistent_id = GetNextPersistentId(); 221 PersistentId persistent_id = GetNextPersistentId();
224 DVLOG(1) << "Setting persistent id to " << persistent_id; 222 DVLOG(1) << "Setting persistent id to " << persistent_id;
225 packet_info->persistent_id = persistent_id; 223 packet_info->persistent_id = persistent_id;
226 SetPersistentId(persistent_id, 224 SetPersistentId(persistent_id,
227 packet_info->protobuf.get()); 225 packet_info->protobuf.get());
228 rmq_store_->AddOutgoingMessage(persistent_id, 226 gcm_store_->AddOutgoingMessage(
229 MCSMessage(message.tag(), 227 persistent_id,
230 *(packet_info->protobuf)), 228 MCSMessage(message.tag(), *(packet_info->protobuf)),
231 base::Bind(&MCSClient::OnRMQUpdateFinished, 229 base::Bind(&MCSClient::OnGCMUpdateFinished,
232 weak_ptr_factory_.GetWeakPtr())); 230 weak_ptr_factory_.GetWeakPtr()));
233 } else { 231 } else {
234 // Check that there is an active connection to the endpoint. 232 // Check that there is an active connection to the endpoint.
235 if (!connection_handler_->CanSendMessage()) { 233 if (!connection_handler_->CanSendMessage()) {
236 base::MessageLoop::current()->PostTask( 234 base::MessageLoop::current()->PostTask(
237 FROM_HERE, 235 FROM_HERE,
238 base::Bind(message_sent_callback_, "Unable to reach endpoint")); 236 base::Bind(message_sent_callback_, "Unable to reach endpoint"));
239 return; 237 return;
240 } 238 }
241 } 239 }
242 to_send_.push_back(make_linked_ptr(packet_info)); 240 to_send_.push_back(make_linked_ptr(packet_info));
243 MaybeSendMessage(); 241 MaybeSendMessage();
244 } 242 }
245 243
246 void MCSClient::Destroy() { 244 void MCSClient::Destroy() {
247 rmq_store_->Destroy(base::Bind(&MCSClient::OnRMQUpdateFinished, 245 gcm_store_->Destroy(base::Bind(&MCSClient::OnGCMUpdateFinished,
248 weak_ptr_factory_.GetWeakPtr())); 246 weak_ptr_factory_.GetWeakPtr()));
249 } 247 }
250 248
251 void MCSClient::ResetStateAndBuildLoginRequest( 249 void MCSClient::ResetStateAndBuildLoginRequest(
252 mcs_proto::LoginRequest* request) { 250 mcs_proto::LoginRequest* request) {
253 DCHECK(android_id_); 251 DCHECK(android_id_);
254 DCHECK(security_token_); 252 DCHECK(security_token_);
255 stream_id_in_ = 0; 253 stream_id_in_ = 0;
256 stream_id_out_ = 1; 254 stream_id_out_ = 1;
257 last_device_to_server_stream_id_received_ = 0; 255 last_device_to_server_stream_id_received_ = 0;
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after
302 heartbeat_timer_.Stop(); 300 heartbeat_timer_.Stop();
303 301
304 state_ = CONNECTING; 302 state_ = CONNECTING;
305 } 303 }
306 304
307 void MCSClient::SendHeartbeat() { 305 void MCSClient::SendHeartbeat() {
308 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()), 306 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()),
309 false); 307 false);
310 } 308 }
311 309
312 void MCSClient::OnRMQUpdateFinished(bool success) { 310 void MCSClient::OnGCMUpdateFinished(bool success) {
313 LOG_IF(ERROR, !success) << "RMQ Update failed!"; 311 LOG_IF(ERROR, !success) << "RMQ Update failed!";
314 // TODO(zea): Rebuild the store from scratch in case of persistence failure? 312 // TODO(zea): Rebuild the store from scratch in case of persistence failure?
315 } 313 }
316 314
317 void MCSClient::MaybeSendMessage() { 315 void MCSClient::MaybeSendMessage() {
318 if (to_send_.empty()) 316 if (to_send_.empty())
319 return; 317 return;
320 318
321 if (!connection_handler_->CanSendMessage()) 319 if (!connection_handler_->CanSendMessage())
322 return; 320 return;
(...skipping 98 matching lines...) Expand 10 before | Expand all | Expand 10 after
421 } 419 }
422 for (StreamIdList::iterator iter = acked_stream_ids_to_remove.begin(); 420 for (StreamIdList::iterator iter = acked_stream_ids_to_remove.begin();
423 iter != acked_stream_ids_to_remove.end(); ++iter) { 421 iter != acked_stream_ids_to_remove.end(); ++iter) {
424 acked_server_ids_.erase(*iter); 422 acked_server_ids_.erase(*iter);
425 } 423 }
426 } 424 }
427 425
428 ++stream_id_in_; 426 ++stream_id_in_;
429 if (!persistent_id.empty()) { 427 if (!persistent_id.empty()) {
430 unacked_server_ids_[stream_id_in_] = persistent_id; 428 unacked_server_ids_[stream_id_in_] = persistent_id;
431 rmq_store_->AddIncomingMessage(persistent_id, 429 gcm_store_->AddIncomingMessage(persistent_id,
432 base::Bind(&MCSClient::OnRMQUpdateFinished, 430 base::Bind(&MCSClient::OnGCMUpdateFinished,
433 weak_ptr_factory_.GetWeakPtr())); 431 weak_ptr_factory_.GetWeakPtr()));
434 } 432 }
435 433
436 DVLOG(1) << "Received message of type " << protobuf->GetTypeName() 434 DVLOG(1) << "Received message of type " << protobuf->GetTypeName()
437 << " with persistent id " 435 << " with persistent id "
438 << (persistent_id.empty() ? "NULL" : persistent_id) 436 << (persistent_id.empty() ? "NULL" : persistent_id)
439 << ", stream id " << stream_id_in_ << " and last stream id received " 437 << ", stream id " << stream_id_in_ << " and last stream id received "
440 << last_stream_id_received; 438 << last_stream_id_received;
441 439
442 if (unacked_server_ids_.size() > 0 && 440 if (unacked_server_ids_.size() > 0 &&
(...skipping 119 matching lines...) Expand 10 before | Expand all | Expand 10 after
562 to_resend_.front()->stream_id <= last_stream_id_received) { 560 to_resend_.front()->stream_id <= last_stream_id_received) {
563 const MCSPacketInternal& outgoing_packet = to_resend_.front(); 561 const MCSPacketInternal& outgoing_packet = to_resend_.front();
564 acked_outgoing_persistent_ids.push_back(outgoing_packet->persistent_id); 562 acked_outgoing_persistent_ids.push_back(outgoing_packet->persistent_id);
565 acked_outgoing_stream_ids.push_back(outgoing_packet->stream_id); 563 acked_outgoing_stream_ids.push_back(outgoing_packet->stream_id);
566 to_resend_.pop_front(); 564 to_resend_.pop_front();
567 } 565 }
568 566
569 DVLOG(1) << "Server acked " << acked_outgoing_persistent_ids.size() 567 DVLOG(1) << "Server acked " << acked_outgoing_persistent_ids.size()
570 << " outgoing messages, " << to_resend_.size() 568 << " outgoing messages, " << to_resend_.size()
571 << " remaining unacked"; 569 << " remaining unacked";
572 rmq_store_->RemoveOutgoingMessages( 570 gcm_store_->RemoveOutgoingMessages(
573 acked_outgoing_persistent_ids, 571 acked_outgoing_persistent_ids,
574 base::Bind(&MCSClient::OnRMQUpdateFinished, 572 base::Bind(&MCSClient::OnGCMUpdateFinished,
575 weak_ptr_factory_.GetWeakPtr())); 573 weak_ptr_factory_.GetWeakPtr()));
576 574
577 HandleServerConfirmedReceipt(last_stream_id_received); 575 HandleServerConfirmedReceipt(last_stream_id_received);
578 } 576 }
579 577
580 void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) { 578 void MCSClient::HandleSelectiveAck(const PersistentIdList& id_list) {
581 // First check the to_resend_ queue. Acknowledgments should always happen 579 // First check the to_resend_ queue. Acknowledgments should always happen
582 // in the order they were sent, so if messages are present they should match 580 // in the order they were sent, so if messages are present they should match
583 // the acknowledge list. 581 // the acknowledge list.
584 PersistentIdList::const_iterator iter = id_list.begin(); 582 PersistentIdList::const_iterator iter = id_list.begin();
(...skipping 21 matching lines...) Expand all
606 StreamId device_stream_id = outgoing_packet->stream_id; 604 StreamId device_stream_id = outgoing_packet->stream_id;
607 HandleServerConfirmedReceipt(device_stream_id); 605 HandleServerConfirmedReceipt(device_stream_id);
608 606
609 to_send_.pop_front(); 607 to_send_.pop_front();
610 } 608 }
611 609
612 DCHECK(iter == id_list.end()); 610 DCHECK(iter == id_list.end());
613 611
614 DVLOG(1) << "Server acked " << id_list.size() 612 DVLOG(1) << "Server acked " << id_list.size()
615 << " messages, " << to_resend_.size() << " remaining unacked."; 613 << " messages, " << to_resend_.size() << " remaining unacked.";
616 rmq_store_->RemoveOutgoingMessages( 614 gcm_store_->RemoveOutgoingMessages(
617 id_list, 615 id_list,
618 base::Bind(&MCSClient::OnRMQUpdateFinished, 616 base::Bind(&MCSClient::OnGCMUpdateFinished,
619 weak_ptr_factory_.GetWeakPtr())); 617 weak_ptr_factory_.GetWeakPtr()));
620 618
621 // Resend any remaining outgoing messages, as they were not received by the 619 // Resend any remaining outgoing messages, as they were not received by the
622 // server. 620 // server.
623 DVLOG(1) << "Resending " << to_resend_.size() << " messages."; 621 DVLOG(1) << "Resending " << to_resend_.size() << " messages.";
624 while (!to_resend_.empty()) { 622 while (!to_resend_.empty()) {
625 to_send_.push_front(to_resend_.back()); 623 to_send_.push_front(to_resend_.back());
626 to_resend_.pop_back(); 624 to_resend_.pop_back();
627 } 625 }
628 } 626 }
(...skipping 11 matching lines...) Expand all
640 iter != acked_server_ids_.end() && 638 iter != acked_server_ids_.end() &&
641 iter->first <= device_stream_id;) { 639 iter->first <= device_stream_id;) {
642 acked_incoming_ids.insert(acked_incoming_ids.end(), 640 acked_incoming_ids.insert(acked_incoming_ids.end(),
643 iter->second.begin(), 641 iter->second.begin(),
644 iter->second.end()); 642 iter->second.end());
645 acked_server_ids_.erase(iter++); 643 acked_server_ids_.erase(iter++);
646 } 644 }
647 645
648 DVLOG(1) << "Server confirmed receipt of " << acked_incoming_ids.size() 646 DVLOG(1) << "Server confirmed receipt of " << acked_incoming_ids.size()
649 << " acknowledged server messages."; 647 << " acknowledged server messages.";
650 rmq_store_->RemoveIncomingMessages( 648 gcm_store_->RemoveIncomingMessages(
651 acked_incoming_ids, 649 acked_incoming_ids,
652 base::Bind(&MCSClient::OnRMQUpdateFinished, 650 base::Bind(&MCSClient::OnGCMUpdateFinished,
653 weak_ptr_factory_.GetWeakPtr())); 651 weak_ptr_factory_.GetWeakPtr()));
654 } 652 }
655 653
656 MCSClient::PersistentId MCSClient::GetNextPersistentId() { 654 MCSClient::PersistentId MCSClient::GetNextPersistentId() {
657 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue()); 655 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue());
658 } 656 }
659 657
660 } // namespace gcm 658 } // namespace gcm
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698