Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1183)

Side by Side Diff: google_apis/gcm/engine/mcs_client.cc

Issue 117513004: [GCM] Add TTL support (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Self review Created 6 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "google_apis/gcm/engine/mcs_client.h" 5 #include "google_apis/gcm/engine/mcs_client.h"
6 6
7 #include "base/basictypes.h" 7 #include "base/basictypes.h"
8 #include "base/message_loop/message_loop.h" 8 #include "base/message_loop/message_loop.h"
9 #include "base/strings/string_number_conversions.h" 9 #include "base/strings/string_number_conversions.h"
10 #include "base/time/clock.h"
11 #include "base/time/time.h"
10 #include "google_apis/gcm/base/mcs_util.h" 12 #include "google_apis/gcm/base/mcs_util.h"
11 #include "google_apis/gcm/base/socket_stream.h" 13 #include "google_apis/gcm/base/socket_stream.h"
12 #include "google_apis/gcm/engine/connection_factory.h" 14 #include "google_apis/gcm/engine/connection_factory.h"
13 #include "google_apis/gcm/engine/rmq_store.h" 15 #include "google_apis/gcm/engine/rmq_store.h"
14 16
15 using namespace google::protobuf::io; 17 using namespace google::protobuf::io;
16 18
17 namespace gcm { 19 namespace gcm {
18 20
19 namespace { 21 namespace {
(...skipping 17 matching lines...) Expand all
37 // Applies to both incoming and outgoing messages. 39 // Applies to both incoming and outgoing messages.
38 // TODO(zea): make this server configurable. 40 // TODO(zea): make this server configurable.
39 const int kUnackedMessageBeforeStreamAck = 10; 41 const int kUnackedMessageBeforeStreamAck = 10;
40 42
41 // The global maximum number of pending messages to have in the send queue. 43 // The global maximum number of pending messages to have in the send queue.
42 const size_t kMaxSendQueueSize = 10 * 1024; 44 const size_t kMaxSendQueueSize = 10 * 1024;
43 45
44 // The maximum message size that can be sent to the server. 46 // The maximum message size that can be sent to the server.
45 const int kMaxMessageBytes = 4 * 1024; // 4KB, like the server. 47 const int kMaxMessageBytes = 4 * 1024; // 4KB, like the server.
46 48
49 // Maximum amount of time to save an unsent outgoing message for.
50 const int kMaxTTLSeconds = 4 * 7 * 24 * 60 * 60; // 4 weeks.
51
47 // Helper for converting a proto persistent id list to a vector of strings. 52 // Helper for converting a proto persistent id list to a vector of strings.
48 bool BuildPersistentIdListFromProto(const google::protobuf::string& bytes, 53 bool BuildPersistentIdListFromProto(const google::protobuf::string& bytes,
49 std::vector<std::string>* id_list) { 54 std::vector<std::string>* id_list) {
50 mcs_proto::SelectiveAck selective_ack; 55 mcs_proto::SelectiveAck selective_ack;
51 if (!selective_ack.ParseFromString(bytes)) 56 if (!selective_ack.ParseFromString(bytes))
52 return false; 57 return false;
53 std::vector<std::string> new_list; 58 std::vector<std::string> new_list;
54 for (int i = 0; i < selective_ack.id_size(); ++i) { 59 for (int i = 0; i < selective_ack.id_size(); ++i) {
55 DCHECK(!selective_ack.id(i).empty()); 60 DCHECK(!selective_ack.id(i).empty());
56 new_list.push_back(selective_ack.id(i)); 61 new_list.push_back(selective_ack.id(i));
(...skipping 19 matching lines...) Expand all
76 81
77 // The protobuf of the message itself. 82 // The protobuf of the message itself.
78 MCSProto protobuf; 83 MCSProto protobuf;
79 }; 84 };
80 85
81 ReliablePacketInfo::ReliablePacketInfo() 86 ReliablePacketInfo::ReliablePacketInfo()
82 : stream_id(0), tag(0) { 87 : stream_id(0), tag(0) {
83 } 88 }
84 ReliablePacketInfo::~ReliablePacketInfo() {} 89 ReliablePacketInfo::~ReliablePacketInfo() {}
85 90
86 MCSClient::MCSClient(ConnectionFactory* connection_factory, RMQStore* rmq_store) 91 MCSClient::MCSClient(base::Clock* clock,
87 : state_(UNINITIALIZED), 92 ConnectionFactory* connection_factory,
93 RMQStore* rmq_store)
94 : clock_(clock),
95 state_(UNINITIALIZED),
88 android_id_(0), 96 android_id_(0),
89 security_token_(0), 97 security_token_(0),
90 connection_factory_(connection_factory), 98 connection_factory_(connection_factory),
91 connection_handler_(NULL), 99 connection_handler_(NULL),
92 last_device_to_server_stream_id_received_(0), 100 last_device_to_server_stream_id_received_(0),
93 last_server_to_device_stream_id_received_(0), 101 last_server_to_device_stream_id_received_(0),
94 stream_id_out_(0), 102 stream_id_out_(0),
95 stream_id_in_(0), 103 stream_id_in_(0),
96 rmq_store_(rmq_store), 104 rmq_store_(rmq_store),
97 weak_ptr_factory_(this) { 105 weak_ptr_factory_(this) {
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after
142 150
143 DVLOG(1) << "RMQ Load finished with " << load_result.incoming_messages.size() 151 DVLOG(1) << "RMQ Load finished with " << load_result.incoming_messages.size()
144 << " incoming acks pending and " 152 << " incoming acks pending and "
145 << load_result.outgoing_messages.size() 153 << load_result.outgoing_messages.size()
146 << " outgoing messages pending."; 154 << " outgoing messages pending.";
147 155
148 restored_unackeds_server_ids_ = load_result.incoming_messages; 156 restored_unackeds_server_ids_ = load_result.incoming_messages;
149 157
150 // First go through and order the outgoing messages by recency. 158 // First go through and order the outgoing messages by recency.
151 std::map<uint64, google::protobuf::MessageLite*> ordered_messages; 159 std::map<uint64, google::protobuf::MessageLite*> ordered_messages;
160 std::vector<PersistentId> dropped_ids;
152 for (std::map<PersistentId, google::protobuf::MessageLite*>::const_iterator 161 for (std::map<PersistentId, google::protobuf::MessageLite*>::const_iterator
153 iter = load_result.outgoing_messages.begin(); 162 iter = load_result.outgoing_messages.begin();
154 iter != load_result.outgoing_messages.end(); ++iter) { 163 iter != load_result.outgoing_messages.end(); ++iter) {
155 uint64 timestamp = 0; 164 uint64 timestamp = 0;
156 if (!base::StringToUint64(iter->first, &timestamp)) { 165 if (!base::StringToUint64(iter->first, &timestamp)) {
157 LOG(ERROR) << "Invalid restored message."; 166 LOG(ERROR) << "Invalid restored message.";
158 return; 167 return;
159 } 168 }
169
170 // Check if the TTL has expired for this message.
171 if (HasTTLExpired(*iter->second, clock_)) {
172 dropped_ids.push_back(iter->first);
173 message_sent_callback_.Run("TTL expired for " + iter->first);
174 delete iter->second;
175 continue;
176 }
177
160 ordered_messages[timestamp] = iter->second; 178 ordered_messages[timestamp] = iter->second;
161 } 179 }
162 180
181 if (!dropped_ids.empty()) {
182 rmq_store_->RemoveOutgoingMessages(
183 dropped_ids,
184 base::Bind(&MCSClient::OnRMQUpdateFinished,
185 weak_ptr_factory_.GetWeakPtr()));
186 }
187
163 // Now go through and add the outgoing messages to the send queue in their 188 // Now go through and add the outgoing messages to the send queue in their
164 // appropriate order (oldest at front, most recent at back). 189 // appropriate order (oldest at front, most recent at back).
165 for (std::map<uint64, google::protobuf::MessageLite*>::const_iterator 190 for (std::map<uint64, google::protobuf::MessageLite*>::const_iterator
166 iter = ordered_messages.begin(); 191 iter = ordered_messages.begin();
167 iter != ordered_messages.end(); ++iter) { 192 iter != ordered_messages.end(); ++iter) {
168 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); 193 ReliablePacketInfo* packet_info = new ReliablePacketInfo();
169 packet_info->protobuf.reset(iter->second); 194 packet_info->protobuf.reset(iter->second);
170 packet_info->persistent_id = base::Uint64ToString(iter->first); 195 packet_info->persistent_id = base::Uint64ToString(iter->first);
171 to_send_.push_back(make_linked_ptr(packet_info)); 196 to_send_.push_back(make_linked_ptr(packet_info));
172 } 197 }
(...skipping 14 matching lines...) Expand all
187 android_id_, 212 android_id_,
188 security_token_, 213 security_token_,
189 base::Bind(&MCSClient::OnRMQUpdateFinished, 214 base::Bind(&MCSClient::OnRMQUpdateFinished,
190 weak_ptr_factory_.GetWeakPtr())); 215 weak_ptr_factory_.GetWeakPtr()));
191 } 216 }
192 217
193 state_ = CONNECTING; 218 state_ = CONNECTING;
194 connection_factory_->Connect(); 219 connection_factory_->Connect();
195 } 220 }
196 221
197 void MCSClient::SendMessage(const MCSMessage& message, bool use_rmq) { 222 void MCSClient::SendMessage(const MCSMessage& message) {
198 DCHECK_EQ(state_, CONNECTED); 223 int ttl = GetTTL(message.GetProtobuf());
224 DCHECK_GE(ttl, 0);
225 DCHECK_LT(ttl, kMaxTTLSeconds);
fgorski 2013/12/28 01:15:08 DCHECK_LE
Nicolas Zea 2013/12/30 21:46:19 Done.
199 if (to_send_.size() > kMaxSendQueueSize) { 226 if (to_send_.size() > kMaxSendQueueSize) {
200 base::MessageLoop::current()->PostTask( 227 message_sent_callback_.Run("Message queue full.");
201 FROM_HERE,
202 base::Bind(message_sent_callback_, "Message queue full."));
203 return; 228 return;
204 } 229 }
205 if (message.size() > kMaxMessageBytes) { 230 if (message.size() > kMaxMessageBytes) {
206 base::MessageLoop::current()->PostTask( 231 message_sent_callback_.Run("Message too large.");
207 FROM_HERE,
208 base::Bind(message_sent_callback_, "Message too large."));
209 return; 232 return;
210 } 233 }
211 234
212 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); 235 ReliablePacketInfo* packet_info = new ReliablePacketInfo();
213 packet_info->protobuf = message.CloneProtobuf(); 236 packet_info->protobuf = message.CloneProtobuf();
214 237
215 if (use_rmq) { 238 if (ttl > 0) {
216 PersistentId persistent_id = GetNextPersistentId(); 239 PersistentId persistent_id = GetNextPersistentId();
217 DVLOG(1) << "Setting persistent id to " << persistent_id; 240 DVLOG(1) << "Setting persistent id to " << persistent_id;
218 packet_info->persistent_id = persistent_id; 241 packet_info->persistent_id = persistent_id;
219 SetPersistentId(persistent_id, 242 SetPersistentId(persistent_id,
220 packet_info->protobuf.get()); 243 packet_info->protobuf.get());
221 rmq_store_->AddOutgoingMessage(persistent_id, 244 rmq_store_->AddOutgoingMessage(persistent_id,
222 MCSMessage(message.tag(), 245 MCSMessage(message.tag(),
223 *(packet_info->protobuf)), 246 *(packet_info->protobuf)),
224 base::Bind(&MCSClient::OnRMQUpdateFinished, 247 base::Bind(&MCSClient::OnRMQUpdateFinished,
225 weak_ptr_factory_.GetWeakPtr())); 248 weak_ptr_factory_.GetWeakPtr()));
226 } else { 249 } else if (!connection_factory_->IsEndpointReachable()) {
227 // Check that there is an active connection to the endpoint. 250 DVLOG(1) << "No active connection, dropping message.";
228 if (!connection_handler_->CanSendMessage()) { 251 message_sent_callback_.Run("TTL expired");
229 base::MessageLoop::current()->PostTask( 252 return;
230 FROM_HERE,
231 base::Bind(message_sent_callback_, "Unable to reach endpoint"));
232 return;
233 }
234 } 253 }
235 to_send_.push_back(make_linked_ptr(packet_info)); 254 to_send_.push_back(make_linked_ptr(packet_info));
236 MaybeSendMessage(); 255 MaybeSendMessage();
237 } 256 }
238 257
239 void MCSClient::Destroy() { 258 void MCSClient::Destroy() {
240 rmq_store_->Destroy(base::Bind(&MCSClient::OnRMQUpdateFinished, 259 rmq_store_->Destroy(base::Bind(&MCSClient::OnRMQUpdateFinished,
241 weak_ptr_factory_.GetWeakPtr())); 260 weak_ptr_factory_.GetWeakPtr()));
242 } 261 }
243 262
244 void MCSClient::ResetStateAndBuildLoginRequest( 263 void MCSClient::ResetStateAndBuildLoginRequest(
245 mcs_proto::LoginRequest* request) { 264 mcs_proto::LoginRequest* request) {
246 DCHECK(android_id_); 265 DCHECK(android_id_);
247 DCHECK(security_token_); 266 DCHECK(security_token_);
248 stream_id_in_ = 0; 267 stream_id_in_ = 0;
249 stream_id_out_ = 1; 268 stream_id_out_ = 1;
250 last_device_to_server_stream_id_received_ = 0; 269 last_device_to_server_stream_id_received_ = 0;
251 last_server_to_device_stream_id_received_ = 0; 270 last_server_to_device_stream_id_received_ = 0;
252 271
253 heartbeat_manager_.Stop(); 272 heartbeat_manager_.Stop();
254 273
255 // TODO(zea): expire all messages older than their TTL.
256
257 // Add any pending acknowledgments to the list of ids. 274 // Add any pending acknowledgments to the list of ids.
258 for (StreamIdToPersistentIdMap::const_iterator iter = 275 for (StreamIdToPersistentIdMap::const_iterator iter =
259 unacked_server_ids_.begin(); 276 unacked_server_ids_.begin();
260 iter != unacked_server_ids_.end(); ++iter) { 277 iter != unacked_server_ids_.end(); ++iter) {
261 restored_unackeds_server_ids_.push_back(iter->second); 278 restored_unackeds_server_ids_.push_back(iter->second);
262 } 279 }
263 unacked_server_ids_.clear(); 280 unacked_server_ids_.clear();
264 281
265 // Any acknowledged server ids which have not been confirmed by the server 282 // Any acknowledged server ids which have not been confirmed by the server
266 // are treated like unacknowledged ids. 283 // are treated like unacknowledged ids.
(...skipping 16 matching lines...) Expand all
283 acked_server_ids_[stream_id_out_] = restored_unackeds_server_ids_; 300 acked_server_ids_[stream_id_out_] = restored_unackeds_server_ids_;
284 restored_unackeds_server_ids_.clear(); 301 restored_unackeds_server_ids_.clear();
285 302
286 // Push all unacknowledged messages to front of send queue. No need to save 303 // Push all unacknowledged messages to front of send queue. No need to save
287 // to RMQ, as all messages that reach this point should already have been 304 // to RMQ, as all messages that reach this point should already have been
288 // saved as necessary. 305 // saved as necessary.
289 while (!to_resend_.empty()) { 306 while (!to_resend_.empty()) {
290 to_send_.push_front(to_resend_.back()); 307 to_send_.push_front(to_resend_.back());
291 to_resend_.pop_back(); 308 to_resend_.pop_back();
292 } 309 }
310
311 // Drop all TTL == 0 or expired TTL messages from the queue.
312 std::deque<MCSPacketInternal> new_to_send;
313 std::vector<PersistentId> dropped_ids;
314 while (!to_send_.empty()) {
315 MCSPacketInternal packet = to_send_.front();
316 to_send_.pop_front();
317 if (GetTTL(*packet->protobuf) > 0 &&
318 !HasTTLExpired(*packet->protobuf, clock_)) {
319 new_to_send.push_back(packet);
320 } else {
321 if (!packet->persistent_id.empty())
322 dropped_ids.push_back(packet->persistent_id);
323 message_sent_callback_.Run("TTL expired for " + packet->persistent_id);
fgorski 2013/12/28 01:15:08 This message is going to be a bit pointless when p
Nicolas Zea 2013/12/30 21:46:19 Yeah, all of these messages are pointless right no
324 }
325 }
326
327 if (!dropped_ids.empty()) {
328 DVLOG(1) << "Connection reset, " << dropped_ids.size()
329 << " messages expired.";
330 rmq_store_->RemoveOutgoingMessages(
331 dropped_ids,
332 base::Bind(&MCSClient::OnRMQUpdateFinished,
333 weak_ptr_factory_.GetWeakPtr()));
334 }
335
336 to_send_.swap(new_to_send);
337
293 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size() 338 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size()
294 << " incoming acks pending, and " << to_send_.size() 339 << " incoming acks pending, and " << to_send_.size()
295 << " pending outgoing messages."; 340 << " pending outgoing messages.";
296 341
297 state_ = CONNECTING; 342 state_ = CONNECTING;
298 } 343 }
299 344
300 void MCSClient::SendHeartbeat() { 345 void MCSClient::SendHeartbeat() {
301 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()), 346 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()));
302 false);
303 } 347 }
304 348
305 void MCSClient::OnRMQUpdateFinished(bool success) { 349 void MCSClient::OnRMQUpdateFinished(bool success) {
306 LOG_IF(ERROR, !success) << "RMQ Update failed!"; 350 LOG_IF(ERROR, !success) << "RMQ Update failed!";
307 // TODO(zea): Rebuild the store from scratch in case of persistence failure? 351 // TODO(zea): Rebuild the store from scratch in case of persistence failure?
308 } 352 }
309 353
310 void MCSClient::MaybeSendMessage() { 354 void MCSClient::MaybeSendMessage() {
311 if (to_send_.empty()) 355 if (to_send_.empty())
312 return; 356 return;
313 357
314 if (!connection_handler_->CanSendMessage()) 358 // If the connection has been reset, do nothing. On reconnection
359 // MaybeSendMessage will be automatically invoked again.
360 // TODO(zea): consider doing TTL expiration at connection reset time, rather
361 // than reconnect time.
362 if (!connection_factory_->IsEndpointReachable())
315 return; 363 return;
316 364
317 // TODO(zea): drop messages older than their TTL.
318
319 DVLOG(1) << "Pending output message found, sending.";
320 MCSPacketInternal packet = to_send_.front(); 365 MCSPacketInternal packet = to_send_.front();
321 to_send_.pop_front(); 366 to_send_.pop_front();
367 if (HasTTLExpired(*packet->protobuf, clock_)) {
368 DCHECK(packet->persistent_id.empty());
369 DVLOG(1) << "Dropping expired message " << packet->persistent_id << ".";
370 message_sent_callback_.Run("TTL expired for " + packet->persistent_id);
371 rmq_store_->RemoveOutgoingMessage(
372 packet->persistent_id,
373 base::Bind(&MCSClient::OnRMQUpdateFinished,
374 weak_ptr_factory_.GetWeakPtr()));
375 base::MessageLoop::current()->PostTask(
376 FROM_HERE,
377 base::Bind(&MCSClient::MaybeSendMessage,
378 weak_ptr_factory_.GetWeakPtr()));
379 return;
380 }
381 DVLOG(1) << "Pending output message found, sending.";
322 if (!packet->persistent_id.empty()) 382 if (!packet->persistent_id.empty())
323 to_resend_.push_back(packet); 383 to_resend_.push_back(packet);
324 SendPacketToWire(packet.get()); 384 SendPacketToWire(packet.get());
325 } 385 }
326 386
327 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) { 387 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) {
328 packet_info->stream_id = ++stream_id_out_; 388 packet_info->stream_id = ++stream_id_out_;
329 DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName(); 389 DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName();
330 390
331 // Set the proper last received stream id to acknowledge received server 391 // Set the proper last received stream id to acknowledge received server
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after
372 data.set_key(kIdleNotification); 432 data.set_key(kIdleNotification);
373 data.set_value("false"); 433 data.set_value("false");
374 response->add_app_data()->CopyFrom(data); 434 response->add_app_data()->CopyFrom(data);
375 response->set_category(kMCSCategory); 435 response->set_category(kMCSCategory);
376 } 436 }
377 } 437 }
378 438
379 if (send) { 439 if (send) {
380 SendMessage( 440 SendMessage(
381 MCSMessage(kDataMessageStanzaTag, 441 MCSMessage(kDataMessageStanzaTag,
382 response.PassAs<const google::protobuf::MessageLite>()), 442 response.PassAs<const google::protobuf::MessageLite>()));
383 false);
384 } 443 }
385 } 444 }
386 445
387 void MCSClient::HandlePacketFromWire( 446 void MCSClient::HandlePacketFromWire(
388 scoped_ptr<google::protobuf::MessageLite> protobuf) { 447 scoped_ptr<google::protobuf::MessageLite> protobuf) {
389 if (!protobuf.get()) 448 if (!protobuf.get())
390 return; 449 return;
391 uint8 tag = GetMCSProtoTag(*protobuf); 450 uint8 tag = GetMCSProtoTag(*protobuf);
392 PersistentId persistent_id = GetPersistentId(*protobuf); 451 PersistentId persistent_id = GetPersistentId(*protobuf);
393 StreamId last_stream_id_received = GetLastStreamIdReceived(*protobuf); 452 StreamId last_stream_id_received = GetLastStreamIdReceived(*protobuf);
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
427 DVLOG(1) << "Received message of type " << protobuf->GetTypeName() 486 DVLOG(1) << "Received message of type " << protobuf->GetTypeName()
428 << " with persistent id " 487 << " with persistent id "
429 << (persistent_id.empty() ? "NULL" : persistent_id) 488 << (persistent_id.empty() ? "NULL" : persistent_id)
430 << ", stream id " << stream_id_in_ << " and last stream id received " 489 << ", stream id " << stream_id_in_ << " and last stream id received "
431 << last_stream_id_received; 490 << last_stream_id_received;
432 491
433 if (unacked_server_ids_.size() > 0 && 492 if (unacked_server_ids_.size() > 0 &&
434 unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) { 493 unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) {
435 SendMessage(MCSMessage(kIqStanzaTag, 494 SendMessage(MCSMessage(kIqStanzaTag,
436 BuildStreamAck(). 495 BuildStreamAck().
437 PassAs<const google::protobuf::MessageLite>()), 496 PassAs<const google::protobuf::MessageLite>()));
438 false);
439 } 497 }
440 498
441 // The connection is alive, treat this message as a heartbeat ack. 499 // The connection is alive, treat this message as a heartbeat ack.
442 heartbeat_manager_.OnHeartbeatAcked(); 500 heartbeat_manager_.OnHeartbeatAcked();
443 501
444 switch (tag) { 502 switch (tag) {
445 case kLoginResponseTag: { 503 case kLoginResponseTag: {
446 DCHECK_EQ(CONNECTING, state_); 504 DCHECK_EQ(CONNECTING, state_);
447 mcs_proto::LoginResponse* login_response = 505 mcs_proto::LoginResponse* login_response =
448 reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get()); 506 reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get());
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
486 base::Bind(&MCSClient::SendHeartbeat, 544 base::Bind(&MCSClient::SendHeartbeat,
487 weak_ptr_factory_.GetWeakPtr()), 545 weak_ptr_factory_.GetWeakPtr()),
488 base::Bind(&MCSClient::OnConnectionResetByHeartbeat, 546 base::Bind(&MCSClient::OnConnectionResetByHeartbeat,
489 weak_ptr_factory_.GetWeakPtr())); 547 weak_ptr_factory_.GetWeakPtr()));
490 return; 548 return;
491 } 549 }
492 case kHeartbeatPingTag: 550 case kHeartbeatPingTag:
493 DCHECK_GE(stream_id_in_, 1U); 551 DCHECK_GE(stream_id_in_, 1U);
494 DVLOG(1) << "Received heartbeat ping, sending ack."; 552 DVLOG(1) << "Received heartbeat ping, sending ack.";
495 SendMessage( 553 SendMessage(
496 MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()), false); 554 MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()));
497 return; 555 return;
498 case kHeartbeatAckTag: 556 case kHeartbeatAckTag:
499 DCHECK_GE(stream_id_in_, 1U); 557 DCHECK_GE(stream_id_in_, 1U);
500 DVLOG(1) << "Received heartbeat ack."; 558 DVLOG(1) << "Received heartbeat ack.";
501 // Do nothing else, all messages act as heartbeat acks. 559 // Do nothing else, all messages act as heartbeat acks.
502 return; 560 return;
503 case kCloseTag: 561 case kCloseTag:
504 LOG(ERROR) << "Received close command, resetting connection."; 562 LOG(ERROR) << "Received close command, resetting connection.";
505 state_ = LOADED; 563 state_ = LOADED;
506 connection_factory_->SignalConnectionReset(); 564 connection_factory_->SignalConnectionReset();
(...skipping 147 matching lines...) Expand 10 before | Expand all | Expand 10 after
654 712
655 MCSClient::PersistentId MCSClient::GetNextPersistentId() { 713 MCSClient::PersistentId MCSClient::GetNextPersistentId() {
656 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue()); 714 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue());
657 } 715 }
658 716
659 void MCSClient::OnConnectionResetByHeartbeat() { 717 void MCSClient::OnConnectionResetByHeartbeat() {
660 connection_factory_->SignalConnectionReset(); 718 connection_factory_->SignalConnectionReset();
661 } 719 }
662 720
663 } // namespace gcm 721 } // namespace gcm
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698