Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(14)

Side by Side Diff: google_apis/gcm/engine/mcs_client.cc

Issue 117513004: [GCM] Add TTL support (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: fix probe tool Created 6 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « google_apis/gcm/engine/mcs_client.h ('k') | google_apis/gcm/engine/mcs_client_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "google_apis/gcm/engine/mcs_client.h" 5 #include "google_apis/gcm/engine/mcs_client.h"
6 6
7 #include "base/basictypes.h" 7 #include "base/basictypes.h"
8 #include "base/message_loop/message_loop.h" 8 #include "base/message_loop/message_loop.h"
9 #include "base/strings/string_number_conversions.h" 9 #include "base/strings/string_number_conversions.h"
10 #include "base/time/clock.h"
11 #include "base/time/time.h"
10 #include "google_apis/gcm/base/mcs_util.h" 12 #include "google_apis/gcm/base/mcs_util.h"
11 #include "google_apis/gcm/base/socket_stream.h" 13 #include "google_apis/gcm/base/socket_stream.h"
12 #include "google_apis/gcm/engine/connection_factory.h" 14 #include "google_apis/gcm/engine/connection_factory.h"
13 #include "google_apis/gcm/engine/rmq_store.h" 15 #include "google_apis/gcm/engine/rmq_store.h"
14 16
15 using namespace google::protobuf::io; 17 using namespace google::protobuf::io;
16 18
17 namespace gcm { 19 namespace gcm {
18 20
19 namespace { 21 namespace {
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
76 78
77 // The protobuf of the message itself. 79 // The protobuf of the message itself.
78 MCSProto protobuf; 80 MCSProto protobuf;
79 }; 81 };
80 82
81 ReliablePacketInfo::ReliablePacketInfo() 83 ReliablePacketInfo::ReliablePacketInfo()
82 : stream_id(0), tag(0) { 84 : stream_id(0), tag(0) {
83 } 85 }
84 ReliablePacketInfo::~ReliablePacketInfo() {} 86 ReliablePacketInfo::~ReliablePacketInfo() {}
85 87
86 MCSClient::MCSClient(ConnectionFactory* connection_factory, RMQStore* rmq_store) 88 MCSClient::MCSClient(base::Clock* clock,
87 : state_(UNINITIALIZED), 89 ConnectionFactory* connection_factory,
90 RMQStore* rmq_store)
91 : clock_(clock),
92 state_(UNINITIALIZED),
88 android_id_(0), 93 android_id_(0),
89 security_token_(0), 94 security_token_(0),
90 connection_factory_(connection_factory), 95 connection_factory_(connection_factory),
91 connection_handler_(NULL), 96 connection_handler_(NULL),
92 last_device_to_server_stream_id_received_(0), 97 last_device_to_server_stream_id_received_(0),
93 last_server_to_device_stream_id_received_(0), 98 last_server_to_device_stream_id_received_(0),
94 stream_id_out_(0), 99 stream_id_out_(0),
95 stream_id_in_(0), 100 stream_id_in_(0),
96 rmq_store_(rmq_store), 101 rmq_store_(rmq_store),
97 weak_ptr_factory_(this) { 102 weak_ptr_factory_(this) {
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after
142 147
143 DVLOG(1) << "RMQ Load finished with " << load_result.incoming_messages.size() 148 DVLOG(1) << "RMQ Load finished with " << load_result.incoming_messages.size()
144 << " incoming acks pending and " 149 << " incoming acks pending and "
145 << load_result.outgoing_messages.size() 150 << load_result.outgoing_messages.size()
146 << " outgoing messages pending."; 151 << " outgoing messages pending.";
147 152
148 restored_unackeds_server_ids_ = load_result.incoming_messages; 153 restored_unackeds_server_ids_ = load_result.incoming_messages;
149 154
150 // First go through and order the outgoing messages by recency. 155 // First go through and order the outgoing messages by recency.
151 std::map<uint64, google::protobuf::MessageLite*> ordered_messages; 156 std::map<uint64, google::protobuf::MessageLite*> ordered_messages;
157 std::vector<PersistentId> expired_ttl_ids;
152 for (std::map<PersistentId, google::protobuf::MessageLite*>::const_iterator 158 for (std::map<PersistentId, google::protobuf::MessageLite*>::const_iterator
153 iter = load_result.outgoing_messages.begin(); 159 iter = load_result.outgoing_messages.begin();
154 iter != load_result.outgoing_messages.end(); ++iter) { 160 iter != load_result.outgoing_messages.end(); ++iter) {
155 uint64 timestamp = 0; 161 uint64 timestamp = 0;
156 if (!base::StringToUint64(iter->first, &timestamp)) { 162 if (!base::StringToUint64(iter->first, &timestamp)) {
157 LOG(ERROR) << "Invalid restored message."; 163 LOG(ERROR) << "Invalid restored message.";
158 return; 164 return;
159 } 165 }
166
167 // Check if the TTL has expired for this message.
168 if (HasTTLExpired(*iter->second, clock_)) {
169 expired_ttl_ids.push_back(iter->first);
170 message_sent_callback_.Run("TTL expired for " + iter->first);
171 delete iter->second;
172 continue;
173 }
174
160 ordered_messages[timestamp] = iter->second; 175 ordered_messages[timestamp] = iter->second;
161 } 176 }
162 177
178 if (!expired_ttl_ids.empty()) {
179 rmq_store_->RemoveOutgoingMessages(
180 expired_ttl_ids,
181 base::Bind(&MCSClient::OnRMQUpdateFinished,
182 weak_ptr_factory_.GetWeakPtr()));
183 }
184
163 // Now go through and add the outgoing messages to the send queue in their 185 // Now go through and add the outgoing messages to the send queue in their
164 // appropriate order (oldest at front, most recent at back). 186 // appropriate order (oldest at front, most recent at back).
165 for (std::map<uint64, google::protobuf::MessageLite*>::const_iterator 187 for (std::map<uint64, google::protobuf::MessageLite*>::const_iterator
166 iter = ordered_messages.begin(); 188 iter = ordered_messages.begin();
167 iter != ordered_messages.end(); ++iter) { 189 iter != ordered_messages.end(); ++iter) {
168 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); 190 ReliablePacketInfo* packet_info = new ReliablePacketInfo();
169 packet_info->protobuf.reset(iter->second); 191 packet_info->protobuf.reset(iter->second);
170 packet_info->persistent_id = base::Uint64ToString(iter->first); 192 packet_info->persistent_id = base::Uint64ToString(iter->first);
171 to_send_.push_back(make_linked_ptr(packet_info)); 193 to_send_.push_back(make_linked_ptr(packet_info));
172 } 194 }
(...skipping 14 matching lines...) Expand all
187 android_id_, 209 android_id_,
188 security_token_, 210 security_token_,
189 base::Bind(&MCSClient::OnRMQUpdateFinished, 211 base::Bind(&MCSClient::OnRMQUpdateFinished,
190 weak_ptr_factory_.GetWeakPtr())); 212 weak_ptr_factory_.GetWeakPtr()));
191 } 213 }
192 214
193 state_ = CONNECTING; 215 state_ = CONNECTING;
194 connection_factory_->Connect(); 216 connection_factory_->Connect();
195 } 217 }
196 218
197 void MCSClient::SendMessage(const MCSMessage& message, bool use_rmq) { 219 void MCSClient::SendMessage(const MCSMessage& message) {
198 DCHECK_EQ(state_, CONNECTED); 220 int ttl = GetTTL(message.GetProtobuf());
221 DCHECK_GE(ttl, 0);
199 if (to_send_.size() > kMaxSendQueueSize) { 222 if (to_send_.size() > kMaxSendQueueSize) {
200 base::MessageLoop::current()->PostTask( 223 message_sent_callback_.Run("Message queue full.");
201 FROM_HERE,
202 base::Bind(message_sent_callback_, "Message queue full."));
203 return; 224 return;
204 } 225 }
205 if (message.size() > kMaxMessageBytes) { 226 if (message.size() > kMaxMessageBytes) {
206 base::MessageLoop::current()->PostTask( 227 message_sent_callback_.Run("Message too large.");
207 FROM_HERE,
208 base::Bind(message_sent_callback_, "Message too large."));
209 return; 228 return;
210 } 229 }
211 230
212 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); 231 ReliablePacketInfo* packet_info = new ReliablePacketInfo();
213 packet_info->protobuf = message.CloneProtobuf(); 232 packet_info->protobuf = message.CloneProtobuf();
214 233
215 if (use_rmq) { 234 if (ttl > 0) {
216 PersistentId persistent_id = GetNextPersistentId(); 235 PersistentId persistent_id = GetNextPersistentId();
217 DVLOG(1) << "Setting persistent id to " << persistent_id; 236 DVLOG(1) << "Setting persistent id to " << persistent_id;
218 packet_info->persistent_id = persistent_id; 237 packet_info->persistent_id = persistent_id;
219 SetPersistentId(persistent_id, 238 SetPersistentId(persistent_id,
220 packet_info->protobuf.get()); 239 packet_info->protobuf.get());
221 rmq_store_->AddOutgoingMessage(persistent_id, 240 rmq_store_->AddOutgoingMessage(persistent_id,
222 MCSMessage(message.tag(), 241 MCSMessage(message.tag(),
223 *(packet_info->protobuf)), 242 *(packet_info->protobuf)),
224 base::Bind(&MCSClient::OnRMQUpdateFinished, 243 base::Bind(&MCSClient::OnRMQUpdateFinished,
225 weak_ptr_factory_.GetWeakPtr())); 244 weak_ptr_factory_.GetWeakPtr()));
226 } else { 245 } else if (!connection_factory_->IsEndpointReachable()) {
227 // Check that there is an active connection to the endpoint. 246 DVLOG(1) << "No active connection, dropping message.";
228 if (!connection_handler_->CanSendMessage()) { 247 message_sent_callback_.Run("TTL expired");
229 base::MessageLoop::current()->PostTask( 248 return;
230 FROM_HERE,
231 base::Bind(message_sent_callback_, "Unable to reach endpoint"));
232 return;
233 }
234 } 249 }
235 to_send_.push_back(make_linked_ptr(packet_info)); 250 to_send_.push_back(make_linked_ptr(packet_info));
236 MaybeSendMessage(); 251 MaybeSendMessage();
237 } 252 }
238 253
239 void MCSClient::Destroy() { 254 void MCSClient::Destroy() {
240 rmq_store_->Destroy(base::Bind(&MCSClient::OnRMQUpdateFinished, 255 rmq_store_->Destroy(base::Bind(&MCSClient::OnRMQUpdateFinished,
241 weak_ptr_factory_.GetWeakPtr())); 256 weak_ptr_factory_.GetWeakPtr()));
242 } 257 }
243 258
244 void MCSClient::ResetStateAndBuildLoginRequest( 259 void MCSClient::ResetStateAndBuildLoginRequest(
245 mcs_proto::LoginRequest* request) { 260 mcs_proto::LoginRequest* request) {
246 DCHECK(android_id_); 261 DCHECK(android_id_);
247 DCHECK(security_token_); 262 DCHECK(security_token_);
248 stream_id_in_ = 0; 263 stream_id_in_ = 0;
249 stream_id_out_ = 1; 264 stream_id_out_ = 1;
250 last_device_to_server_stream_id_received_ = 0; 265 last_device_to_server_stream_id_received_ = 0;
251 last_server_to_device_stream_id_received_ = 0; 266 last_server_to_device_stream_id_received_ = 0;
252 267
253 heartbeat_manager_.Stop(); 268 heartbeat_manager_.Stop();
254 269
255 // TODO(zea): expire all messages older than their TTL.
256
257 // Add any pending acknowledgments to the list of ids. 270 // Add any pending acknowledgments to the list of ids.
258 for (StreamIdToPersistentIdMap::const_iterator iter = 271 for (StreamIdToPersistentIdMap::const_iterator iter =
259 unacked_server_ids_.begin(); 272 unacked_server_ids_.begin();
260 iter != unacked_server_ids_.end(); ++iter) { 273 iter != unacked_server_ids_.end(); ++iter) {
261 restored_unackeds_server_ids_.push_back(iter->second); 274 restored_unackeds_server_ids_.push_back(iter->second);
262 } 275 }
263 unacked_server_ids_.clear(); 276 unacked_server_ids_.clear();
264 277
265 // Any acknowledged server ids which have not been confirmed by the server 278 // Any acknowledged server ids which have not been confirmed by the server
266 // are treated like unacknowledged ids. 279 // are treated like unacknowledged ids.
(...skipping 16 matching lines...) Expand all
283 acked_server_ids_[stream_id_out_] = restored_unackeds_server_ids_; 296 acked_server_ids_[stream_id_out_] = restored_unackeds_server_ids_;
284 restored_unackeds_server_ids_.clear(); 297 restored_unackeds_server_ids_.clear();
285 298
286 // Push all unacknowledged messages to front of send queue. No need to save 299 // Push all unacknowledged messages to front of send queue. No need to save
287 // to RMQ, as all messages that reach this point should already have been 300 // to RMQ, as all messages that reach this point should already have been
288 // saved as necessary. 301 // saved as necessary.
289 while (!to_resend_.empty()) { 302 while (!to_resend_.empty()) {
290 to_send_.push_front(to_resend_.back()); 303 to_send_.push_front(to_resend_.back());
291 to_resend_.pop_back(); 304 to_resend_.pop_back();
292 } 305 }
306
307 // Drop all TTL == 0 or expired TTL messages from the queue.
308 std::deque<MCSPacketInternal> new_to_send;
309 std::vector<PersistentId> expired_ttl_ids;
310 while (!to_send_.empty()) {
311 MCSPacketInternal packet = to_send_.front();
312 to_send_.pop_front();
313 if (GetTTL(*packet->protobuf) > 0 &&
314 !HasTTLExpired(*packet->protobuf, clock_)) {
315 new_to_send.push_back(packet);
316 } else {
317 // If the TTL was 0 there is no persistent id, so no need to remove the
318 // message from the persistent store.
319 if (!packet->persistent_id.empty())
320 expired_ttl_ids.push_back(packet->persistent_id);
321 message_sent_callback_.Run("TTL expired");
322 }
323 }
324
325 if (!expired_ttl_ids.empty()) {
326 DVLOG(1) << "Connection reset, " << expired_ttl_ids.size()
327 << " messages expired.";
328 rmq_store_->RemoveOutgoingMessages(
329 expired_ttl_ids,
330 base::Bind(&MCSClient::OnRMQUpdateFinished,
331 weak_ptr_factory_.GetWeakPtr()));
332 }
333
334 to_send_.swap(new_to_send);
335
293 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size() 336 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size()
294 << " incoming acks pending, and " << to_send_.size() 337 << " incoming acks pending, and " << to_send_.size()
295 << " pending outgoing messages."; 338 << " pending outgoing messages.";
296 339
297 state_ = CONNECTING; 340 state_ = CONNECTING;
298 } 341 }
299 342
300 void MCSClient::SendHeartbeat() { 343 void MCSClient::SendHeartbeat() {
301 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()), 344 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()));
302 false);
303 } 345 }
304 346
305 void MCSClient::OnRMQUpdateFinished(bool success) { 347 void MCSClient::OnRMQUpdateFinished(bool success) {
306 LOG_IF(ERROR, !success) << "RMQ Update failed!"; 348 LOG_IF(ERROR, !success) << "RMQ Update failed!";
307 // TODO(zea): Rebuild the store from scratch in case of persistence failure? 349 // TODO(zea): Rebuild the store from scratch in case of persistence failure?
308 } 350 }
309 351
310 void MCSClient::MaybeSendMessage() { 352 void MCSClient::MaybeSendMessage() {
311 if (to_send_.empty()) 353 if (to_send_.empty())
312 return; 354 return;
313 355
314 if (!connection_handler_->CanSendMessage()) 356 // If the connection has been reset, do nothing. On reconnection
357 // MaybeSendMessage will be automatically invoked again.
358 // TODO(zea): consider doing TTL expiration at connection reset time, rather
359 // than reconnect time.
360 if (!connection_factory_->IsEndpointReachable())
315 return; 361 return;
316 362
317 // TODO(zea): drop messages older than their TTL.
318
319 DVLOG(1) << "Pending output message found, sending.";
320 MCSPacketInternal packet = to_send_.front(); 363 MCSPacketInternal packet = to_send_.front();
321 to_send_.pop_front(); 364 to_send_.pop_front();
365 if (HasTTLExpired(*packet->protobuf, clock_)) {
366 DCHECK(!packet->persistent_id.empty());
367 DVLOG(1) << "Dropping expired message " << packet->persistent_id << ".";
368 message_sent_callback_.Run("TTL expired for " + packet->persistent_id);
369 rmq_store_->RemoveOutgoingMessage(
370 packet->persistent_id,
371 base::Bind(&MCSClient::OnRMQUpdateFinished,
372 weak_ptr_factory_.GetWeakPtr()));
373 base::MessageLoop::current()->PostTask(
374 FROM_HERE,
375 base::Bind(&MCSClient::MaybeSendMessage,
376 weak_ptr_factory_.GetWeakPtr()));
377 return;
378 }
379 DVLOG(1) << "Pending output message found, sending.";
322 if (!packet->persistent_id.empty()) 380 if (!packet->persistent_id.empty())
323 to_resend_.push_back(packet); 381 to_resend_.push_back(packet);
324 SendPacketToWire(packet.get()); 382 SendPacketToWire(packet.get());
325 } 383 }
326 384
327 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) { 385 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) {
328 packet_info->stream_id = ++stream_id_out_; 386 packet_info->stream_id = ++stream_id_out_;
329 DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName(); 387 DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName();
330 388
331 // Set the proper last received stream id to acknowledge received server 389 // Set the proper last received stream id to acknowledge received server
(...skipping 23 matching lines...) Expand all
355 413
356 void MCSClient::HandleMCSDataMesssage( 414 void MCSClient::HandleMCSDataMesssage(
357 scoped_ptr<google::protobuf::MessageLite> protobuf) { 415 scoped_ptr<google::protobuf::MessageLite> protobuf) {
358 mcs_proto::DataMessageStanza* data_message = 416 mcs_proto::DataMessageStanza* data_message =
359 reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get()); 417 reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get());
360 // TODO(zea): implement a proper status manager rather than hardcoding these 418 // TODO(zea): implement a proper status manager rather than hardcoding these
361 // values. 419 // values.
362 scoped_ptr<mcs_proto::DataMessageStanza> response( 420 scoped_ptr<mcs_proto::DataMessageStanza> response(
363 new mcs_proto::DataMessageStanza()); 421 new mcs_proto::DataMessageStanza());
364 response->set_from(kGCMFromField); 422 response->set_from(kGCMFromField);
423 response->set_sent(clock_->Now().ToInternalValue() /
424 base::Time::kMicrosecondsPerSecond);
425 response->set_ttl(0);
365 bool send = false; 426 bool send = false;
366 for (int i = 0; i < data_message->app_data_size(); ++i) { 427 for (int i = 0; i < data_message->app_data_size(); ++i) {
367 const mcs_proto::AppData& app_data = data_message->app_data(i); 428 const mcs_proto::AppData& app_data = data_message->app_data(i);
368 if (app_data.key() == kIdleNotification) { 429 if (app_data.key() == kIdleNotification) {
369 // Tell the MCS server the client is not idle. 430 // Tell the MCS server the client is not idle.
370 send = true; 431 send = true;
371 mcs_proto::AppData data; 432 mcs_proto::AppData data;
372 data.set_key(kIdleNotification); 433 data.set_key(kIdleNotification);
373 data.set_value("false"); 434 data.set_value("false");
374 response->add_app_data()->CopyFrom(data); 435 response->add_app_data()->CopyFrom(data);
375 response->set_category(kMCSCategory); 436 response->set_category(kMCSCategory);
376 } 437 }
377 } 438 }
378 439
379 if (send) { 440 if (send) {
380 SendMessage( 441 SendMessage(
381 MCSMessage(kDataMessageStanzaTag, 442 MCSMessage(kDataMessageStanzaTag,
382 response.PassAs<const google::protobuf::MessageLite>()), 443 response.PassAs<const google::protobuf::MessageLite>()));
383 false);
384 } 444 }
385 } 445 }
386 446
387 void MCSClient::HandlePacketFromWire( 447 void MCSClient::HandlePacketFromWire(
388 scoped_ptr<google::protobuf::MessageLite> protobuf) { 448 scoped_ptr<google::protobuf::MessageLite> protobuf) {
389 if (!protobuf.get()) 449 if (!protobuf.get())
390 return; 450 return;
391 uint8 tag = GetMCSProtoTag(*protobuf); 451 uint8 tag = GetMCSProtoTag(*protobuf);
392 PersistentId persistent_id = GetPersistentId(*protobuf); 452 PersistentId persistent_id = GetPersistentId(*protobuf);
393 StreamId last_stream_id_received = GetLastStreamIdReceived(*protobuf); 453 StreamId last_stream_id_received = GetLastStreamIdReceived(*protobuf);
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
427 DVLOG(1) << "Received message of type " << protobuf->GetTypeName() 487 DVLOG(1) << "Received message of type " << protobuf->GetTypeName()
428 << " with persistent id " 488 << " with persistent id "
429 << (persistent_id.empty() ? "NULL" : persistent_id) 489 << (persistent_id.empty() ? "NULL" : persistent_id)
430 << ", stream id " << stream_id_in_ << " and last stream id received " 490 << ", stream id " << stream_id_in_ << " and last stream id received "
431 << last_stream_id_received; 491 << last_stream_id_received;
432 492
433 if (unacked_server_ids_.size() > 0 && 493 if (unacked_server_ids_.size() > 0 &&
434 unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) { 494 unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) {
435 SendMessage(MCSMessage(kIqStanzaTag, 495 SendMessage(MCSMessage(kIqStanzaTag,
436 BuildStreamAck(). 496 BuildStreamAck().
437 PassAs<const google::protobuf::MessageLite>()), 497 PassAs<const google::protobuf::MessageLite>()));
438 false);
439 } 498 }
440 499
441 // The connection is alive, treat this message as a heartbeat ack. 500 // The connection is alive, treat this message as a heartbeat ack.
442 heartbeat_manager_.OnHeartbeatAcked(); 501 heartbeat_manager_.OnHeartbeatAcked();
443 502
444 switch (tag) { 503 switch (tag) {
445 case kLoginResponseTag: { 504 case kLoginResponseTag: {
446 DCHECK_EQ(CONNECTING, state_); 505 DCHECK_EQ(CONNECTING, state_);
447 mcs_proto::LoginResponse* login_response = 506 mcs_proto::LoginResponse* login_response =
448 reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get()); 507 reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get());
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
486 base::Bind(&MCSClient::SendHeartbeat, 545 base::Bind(&MCSClient::SendHeartbeat,
487 weak_ptr_factory_.GetWeakPtr()), 546 weak_ptr_factory_.GetWeakPtr()),
488 base::Bind(&MCSClient::OnConnectionResetByHeartbeat, 547 base::Bind(&MCSClient::OnConnectionResetByHeartbeat,
489 weak_ptr_factory_.GetWeakPtr())); 548 weak_ptr_factory_.GetWeakPtr()));
490 return; 549 return;
491 } 550 }
492 case kHeartbeatPingTag: 551 case kHeartbeatPingTag:
493 DCHECK_GE(stream_id_in_, 1U); 552 DCHECK_GE(stream_id_in_, 1U);
494 DVLOG(1) << "Received heartbeat ping, sending ack."; 553 DVLOG(1) << "Received heartbeat ping, sending ack.";
495 SendMessage( 554 SendMessage(
496 MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()), false); 555 MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()));
497 return; 556 return;
498 case kHeartbeatAckTag: 557 case kHeartbeatAckTag:
499 DCHECK_GE(stream_id_in_, 1U); 558 DCHECK_GE(stream_id_in_, 1U);
500 DVLOG(1) << "Received heartbeat ack."; 559 DVLOG(1) << "Received heartbeat ack.";
501 // Do nothing else, all messages act as heartbeat acks. 560 // Do nothing else, all messages act as heartbeat acks.
502 return; 561 return;
503 case kCloseTag: 562 case kCloseTag:
504 LOG(ERROR) << "Received close command, resetting connection."; 563 LOG(ERROR) << "Received close command, resetting connection.";
505 state_ = LOADED; 564 state_ = LOADED;
506 connection_factory_->SignalConnectionReset(); 565 connection_factory_->SignalConnectionReset();
(...skipping 147 matching lines...) Expand 10 before | Expand all | Expand 10 after
654 713
655 MCSClient::PersistentId MCSClient::GetNextPersistentId() { 714 MCSClient::PersistentId MCSClient::GetNextPersistentId() {
656 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue()); 715 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue());
657 } 716 }
658 717
659 void MCSClient::OnConnectionResetByHeartbeat() { 718 void MCSClient::OnConnectionResetByHeartbeat() {
660 connection_factory_->SignalConnectionReset(); 719 connection_factory_->SignalConnectionReset();
661 } 720 }
662 721
663 } // namespace gcm 722 } // namespace gcm
OLDNEW
« no previous file with comments | « google_apis/gcm/engine/mcs_client.h ('k') | google_apis/gcm/engine/mcs_client_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698