OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "google_apis/gcm/engine/mcs_client.h" | 5 #include "google_apis/gcm/engine/mcs_client.h" |
6 | 6 |
7 #include "base/basictypes.h" | 7 #include "base/basictypes.h" |
8 #include "base/message_loop/message_loop.h" | 8 #include "base/message_loop/message_loop.h" |
9 #include "base/strings/string_number_conversions.h" | 9 #include "base/strings/string_number_conversions.h" |
| 10 #include "base/time/clock.h" |
| 11 #include "base/time/time.h" |
10 #include "google_apis/gcm/base/mcs_util.h" | 12 #include "google_apis/gcm/base/mcs_util.h" |
11 #include "google_apis/gcm/base/socket_stream.h" | 13 #include "google_apis/gcm/base/socket_stream.h" |
12 #include "google_apis/gcm/engine/connection_factory.h" | 14 #include "google_apis/gcm/engine/connection_factory.h" |
13 #include "google_apis/gcm/engine/rmq_store.h" | 15 #include "google_apis/gcm/engine/rmq_store.h" |
14 | 16 |
15 using namespace google::protobuf::io; | 17 using namespace google::protobuf::io; |
16 | 18 |
17 namespace gcm { | 19 namespace gcm { |
18 | 20 |
19 namespace { | 21 namespace { |
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
76 | 78 |
77 // The protobuf of the message itself. | 79 // The protobuf of the message itself. |
78 MCSProto protobuf; | 80 MCSProto protobuf; |
79 }; | 81 }; |
80 | 82 |
81 ReliablePacketInfo::ReliablePacketInfo() | 83 ReliablePacketInfo::ReliablePacketInfo() |
82 : stream_id(0), tag(0) { | 84 : stream_id(0), tag(0) { |
83 } | 85 } |
84 ReliablePacketInfo::~ReliablePacketInfo() {} | 86 ReliablePacketInfo::~ReliablePacketInfo() {} |
85 | 87 |
86 MCSClient::MCSClient(ConnectionFactory* connection_factory, RMQStore* rmq_store) | 88 MCSClient::MCSClient(base::Clock* clock, |
87 : state_(UNINITIALIZED), | 89 ConnectionFactory* connection_factory, |
| 90 RMQStore* rmq_store) |
| 91 : clock_(clock), |
| 92 state_(UNINITIALIZED), |
88 android_id_(0), | 93 android_id_(0), |
89 security_token_(0), | 94 security_token_(0), |
90 connection_factory_(connection_factory), | 95 connection_factory_(connection_factory), |
91 connection_handler_(NULL), | 96 connection_handler_(NULL), |
92 last_device_to_server_stream_id_received_(0), | 97 last_device_to_server_stream_id_received_(0), |
93 last_server_to_device_stream_id_received_(0), | 98 last_server_to_device_stream_id_received_(0), |
94 stream_id_out_(0), | 99 stream_id_out_(0), |
95 stream_id_in_(0), | 100 stream_id_in_(0), |
96 rmq_store_(rmq_store), | 101 rmq_store_(rmq_store), |
97 weak_ptr_factory_(this) { | 102 weak_ptr_factory_(this) { |
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
142 | 147 |
143 DVLOG(1) << "RMQ Load finished with " << load_result.incoming_messages.size() | 148 DVLOG(1) << "RMQ Load finished with " << load_result.incoming_messages.size() |
144 << " incoming acks pending and " | 149 << " incoming acks pending and " |
145 << load_result.outgoing_messages.size() | 150 << load_result.outgoing_messages.size() |
146 << " outgoing messages pending."; | 151 << " outgoing messages pending."; |
147 | 152 |
148 restored_unackeds_server_ids_ = load_result.incoming_messages; | 153 restored_unackeds_server_ids_ = load_result.incoming_messages; |
149 | 154 |
150 // First go through and order the outgoing messages by recency. | 155 // First go through and order the outgoing messages by recency. |
151 std::map<uint64, google::protobuf::MessageLite*> ordered_messages; | 156 std::map<uint64, google::protobuf::MessageLite*> ordered_messages; |
| 157 std::vector<PersistentId> expired_ttl_ids; |
152 for (std::map<PersistentId, google::protobuf::MessageLite*>::const_iterator | 158 for (std::map<PersistentId, google::protobuf::MessageLite*>::const_iterator |
153 iter = load_result.outgoing_messages.begin(); | 159 iter = load_result.outgoing_messages.begin(); |
154 iter != load_result.outgoing_messages.end(); ++iter) { | 160 iter != load_result.outgoing_messages.end(); ++iter) { |
155 uint64 timestamp = 0; | 161 uint64 timestamp = 0; |
156 if (!base::StringToUint64(iter->first, ×tamp)) { | 162 if (!base::StringToUint64(iter->first, ×tamp)) { |
157 LOG(ERROR) << "Invalid restored message."; | 163 LOG(ERROR) << "Invalid restored message."; |
158 return; | 164 return; |
159 } | 165 } |
| 166 |
| 167 // Check if the TTL has expired for this message. |
| 168 if (HasTTLExpired(*iter->second, clock_)) { |
| 169 expired_ttl_ids.push_back(iter->first); |
| 170 message_sent_callback_.Run("TTL expired for " + iter->first); |
| 171 delete iter->second; |
| 172 continue; |
| 173 } |
| 174 |
160 ordered_messages[timestamp] = iter->second; | 175 ordered_messages[timestamp] = iter->second; |
161 } | 176 } |
162 | 177 |
| 178 if (!expired_ttl_ids.empty()) { |
| 179 rmq_store_->RemoveOutgoingMessages( |
| 180 expired_ttl_ids, |
| 181 base::Bind(&MCSClient::OnRMQUpdateFinished, |
| 182 weak_ptr_factory_.GetWeakPtr())); |
| 183 } |
| 184 |
163 // Now go through and add the outgoing messages to the send queue in their | 185 // Now go through and add the outgoing messages to the send queue in their |
164 // appropriate order (oldest at front, most recent at back). | 186 // appropriate order (oldest at front, most recent at back). |
165 for (std::map<uint64, google::protobuf::MessageLite*>::const_iterator | 187 for (std::map<uint64, google::protobuf::MessageLite*>::const_iterator |
166 iter = ordered_messages.begin(); | 188 iter = ordered_messages.begin(); |
167 iter != ordered_messages.end(); ++iter) { | 189 iter != ordered_messages.end(); ++iter) { |
168 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); | 190 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); |
169 packet_info->protobuf.reset(iter->second); | 191 packet_info->protobuf.reset(iter->second); |
170 packet_info->persistent_id = base::Uint64ToString(iter->first); | 192 packet_info->persistent_id = base::Uint64ToString(iter->first); |
171 to_send_.push_back(make_linked_ptr(packet_info)); | 193 to_send_.push_back(make_linked_ptr(packet_info)); |
172 } | 194 } |
(...skipping 14 matching lines...) Expand all Loading... |
187 android_id_, | 209 android_id_, |
188 security_token_, | 210 security_token_, |
189 base::Bind(&MCSClient::OnRMQUpdateFinished, | 211 base::Bind(&MCSClient::OnRMQUpdateFinished, |
190 weak_ptr_factory_.GetWeakPtr())); | 212 weak_ptr_factory_.GetWeakPtr())); |
191 } | 213 } |
192 | 214 |
193 state_ = CONNECTING; | 215 state_ = CONNECTING; |
194 connection_factory_->Connect(); | 216 connection_factory_->Connect(); |
195 } | 217 } |
196 | 218 |
197 void MCSClient::SendMessage(const MCSMessage& message, bool use_rmq) { | 219 void MCSClient::SendMessage(const MCSMessage& message) { |
198 DCHECK_EQ(state_, CONNECTED); | 220 int ttl = GetTTL(message.GetProtobuf()); |
| 221 DCHECK_GE(ttl, 0); |
199 if (to_send_.size() > kMaxSendQueueSize) { | 222 if (to_send_.size() > kMaxSendQueueSize) { |
200 base::MessageLoop::current()->PostTask( | 223 message_sent_callback_.Run("Message queue full."); |
201 FROM_HERE, | |
202 base::Bind(message_sent_callback_, "Message queue full.")); | |
203 return; | 224 return; |
204 } | 225 } |
205 if (message.size() > kMaxMessageBytes) { | 226 if (message.size() > kMaxMessageBytes) { |
206 base::MessageLoop::current()->PostTask( | 227 message_sent_callback_.Run("Message too large."); |
207 FROM_HERE, | |
208 base::Bind(message_sent_callback_, "Message too large.")); | |
209 return; | 228 return; |
210 } | 229 } |
211 | 230 |
212 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); | 231 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); |
213 packet_info->protobuf = message.CloneProtobuf(); | 232 packet_info->protobuf = message.CloneProtobuf(); |
214 | 233 |
215 if (use_rmq) { | 234 if (ttl > 0) { |
216 PersistentId persistent_id = GetNextPersistentId(); | 235 PersistentId persistent_id = GetNextPersistentId(); |
217 DVLOG(1) << "Setting persistent id to " << persistent_id; | 236 DVLOG(1) << "Setting persistent id to " << persistent_id; |
218 packet_info->persistent_id = persistent_id; | 237 packet_info->persistent_id = persistent_id; |
219 SetPersistentId(persistent_id, | 238 SetPersistentId(persistent_id, |
220 packet_info->protobuf.get()); | 239 packet_info->protobuf.get()); |
221 rmq_store_->AddOutgoingMessage(persistent_id, | 240 rmq_store_->AddOutgoingMessage(persistent_id, |
222 MCSMessage(message.tag(), | 241 MCSMessage(message.tag(), |
223 *(packet_info->protobuf)), | 242 *(packet_info->protobuf)), |
224 base::Bind(&MCSClient::OnRMQUpdateFinished, | 243 base::Bind(&MCSClient::OnRMQUpdateFinished, |
225 weak_ptr_factory_.GetWeakPtr())); | 244 weak_ptr_factory_.GetWeakPtr())); |
226 } else { | 245 } else if (!connection_factory_->IsEndpointReachable()) { |
227 // Check that there is an active connection to the endpoint. | 246 DVLOG(1) << "No active connection, dropping message."; |
228 if (!connection_handler_->CanSendMessage()) { | 247 message_sent_callback_.Run("TTL expired"); |
229 base::MessageLoop::current()->PostTask( | 248 return; |
230 FROM_HERE, | |
231 base::Bind(message_sent_callback_, "Unable to reach endpoint")); | |
232 return; | |
233 } | |
234 } | 249 } |
235 to_send_.push_back(make_linked_ptr(packet_info)); | 250 to_send_.push_back(make_linked_ptr(packet_info)); |
236 MaybeSendMessage(); | 251 MaybeSendMessage(); |
237 } | 252 } |
238 | 253 |
239 void MCSClient::Destroy() { | 254 void MCSClient::Destroy() { |
240 rmq_store_->Destroy(base::Bind(&MCSClient::OnRMQUpdateFinished, | 255 rmq_store_->Destroy(base::Bind(&MCSClient::OnRMQUpdateFinished, |
241 weak_ptr_factory_.GetWeakPtr())); | 256 weak_ptr_factory_.GetWeakPtr())); |
242 } | 257 } |
243 | 258 |
244 void MCSClient::ResetStateAndBuildLoginRequest( | 259 void MCSClient::ResetStateAndBuildLoginRequest( |
245 mcs_proto::LoginRequest* request) { | 260 mcs_proto::LoginRequest* request) { |
246 DCHECK(android_id_); | 261 DCHECK(android_id_); |
247 DCHECK(security_token_); | 262 DCHECK(security_token_); |
248 stream_id_in_ = 0; | 263 stream_id_in_ = 0; |
249 stream_id_out_ = 1; | 264 stream_id_out_ = 1; |
250 last_device_to_server_stream_id_received_ = 0; | 265 last_device_to_server_stream_id_received_ = 0; |
251 last_server_to_device_stream_id_received_ = 0; | 266 last_server_to_device_stream_id_received_ = 0; |
252 | 267 |
253 heartbeat_manager_.Stop(); | 268 heartbeat_manager_.Stop(); |
254 | 269 |
255 // TODO(zea): expire all messages older than their TTL. | |
256 | |
257 // Add any pending acknowledgments to the list of ids. | 270 // Add any pending acknowledgments to the list of ids. |
258 for (StreamIdToPersistentIdMap::const_iterator iter = | 271 for (StreamIdToPersistentIdMap::const_iterator iter = |
259 unacked_server_ids_.begin(); | 272 unacked_server_ids_.begin(); |
260 iter != unacked_server_ids_.end(); ++iter) { | 273 iter != unacked_server_ids_.end(); ++iter) { |
261 restored_unackeds_server_ids_.push_back(iter->second); | 274 restored_unackeds_server_ids_.push_back(iter->second); |
262 } | 275 } |
263 unacked_server_ids_.clear(); | 276 unacked_server_ids_.clear(); |
264 | 277 |
265 // Any acknowledged server ids which have not been confirmed by the server | 278 // Any acknowledged server ids which have not been confirmed by the server |
266 // are treated like unacknowledged ids. | 279 // are treated like unacknowledged ids. |
(...skipping 16 matching lines...) Expand all Loading... |
283 acked_server_ids_[stream_id_out_] = restored_unackeds_server_ids_; | 296 acked_server_ids_[stream_id_out_] = restored_unackeds_server_ids_; |
284 restored_unackeds_server_ids_.clear(); | 297 restored_unackeds_server_ids_.clear(); |
285 | 298 |
286 // Push all unacknowledged messages to front of send queue. No need to save | 299 // Push all unacknowledged messages to front of send queue. No need to save |
287 // to RMQ, as all messages that reach this point should already have been | 300 // to RMQ, as all messages that reach this point should already have been |
288 // saved as necessary. | 301 // saved as necessary. |
289 while (!to_resend_.empty()) { | 302 while (!to_resend_.empty()) { |
290 to_send_.push_front(to_resend_.back()); | 303 to_send_.push_front(to_resend_.back()); |
291 to_resend_.pop_back(); | 304 to_resend_.pop_back(); |
292 } | 305 } |
| 306 |
| 307 // Drop all TTL == 0 or expired TTL messages from the queue. |
| 308 std::deque<MCSPacketInternal> new_to_send; |
| 309 std::vector<PersistentId> expired_ttl_ids; |
| 310 while (!to_send_.empty()) { |
| 311 MCSPacketInternal packet = to_send_.front(); |
| 312 to_send_.pop_front(); |
| 313 if (GetTTL(*packet->protobuf) > 0 && |
| 314 !HasTTLExpired(*packet->protobuf, clock_)) { |
| 315 new_to_send.push_back(packet); |
| 316 } else { |
| 317 // If the TTL was 0 there is no persistent id, so no need to remove the |
| 318 // message from the persistent store. |
| 319 if (!packet->persistent_id.empty()) |
| 320 expired_ttl_ids.push_back(packet->persistent_id); |
| 321 message_sent_callback_.Run("TTL expired"); |
| 322 } |
| 323 } |
| 324 |
| 325 if (!expired_ttl_ids.empty()) { |
| 326 DVLOG(1) << "Connection reset, " << expired_ttl_ids.size() |
| 327 << " messages expired."; |
| 328 rmq_store_->RemoveOutgoingMessages( |
| 329 expired_ttl_ids, |
| 330 base::Bind(&MCSClient::OnRMQUpdateFinished, |
| 331 weak_ptr_factory_.GetWeakPtr())); |
| 332 } |
| 333 |
| 334 to_send_.swap(new_to_send); |
| 335 |
293 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size() | 336 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size() |
294 << " incoming acks pending, and " << to_send_.size() | 337 << " incoming acks pending, and " << to_send_.size() |
295 << " pending outgoing messages."; | 338 << " pending outgoing messages."; |
296 | 339 |
297 state_ = CONNECTING; | 340 state_ = CONNECTING; |
298 } | 341 } |
299 | 342 |
300 void MCSClient::SendHeartbeat() { | 343 void MCSClient::SendHeartbeat() { |
301 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()), | 344 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing())); |
302 false); | |
303 } | 345 } |
304 | 346 |
305 void MCSClient::OnRMQUpdateFinished(bool success) { | 347 void MCSClient::OnRMQUpdateFinished(bool success) { |
306 LOG_IF(ERROR, !success) << "RMQ Update failed!"; | 348 LOG_IF(ERROR, !success) << "RMQ Update failed!"; |
307 // TODO(zea): Rebuild the store from scratch in case of persistence failure? | 349 // TODO(zea): Rebuild the store from scratch in case of persistence failure? |
308 } | 350 } |
309 | 351 |
310 void MCSClient::MaybeSendMessage() { | 352 void MCSClient::MaybeSendMessage() { |
311 if (to_send_.empty()) | 353 if (to_send_.empty()) |
312 return; | 354 return; |
313 | 355 |
314 if (!connection_handler_->CanSendMessage()) | 356 // If the connection has been reset, do nothing. On reconnection |
| 357 // MaybeSendMessage will be automatically invoked again. |
| 358 // TODO(zea): consider doing TTL expiration at connection reset time, rather |
| 359 // than reconnect time. |
| 360 if (!connection_factory_->IsEndpointReachable()) |
315 return; | 361 return; |
316 | 362 |
317 // TODO(zea): drop messages older than their TTL. | |
318 | |
319 DVLOG(1) << "Pending output message found, sending."; | |
320 MCSPacketInternal packet = to_send_.front(); | 363 MCSPacketInternal packet = to_send_.front(); |
321 to_send_.pop_front(); | 364 to_send_.pop_front(); |
| 365 if (HasTTLExpired(*packet->protobuf, clock_)) { |
| 366 DCHECK(!packet->persistent_id.empty()); |
| 367 DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; |
| 368 message_sent_callback_.Run("TTL expired for " + packet->persistent_id); |
| 369 rmq_store_->RemoveOutgoingMessage( |
| 370 packet->persistent_id, |
| 371 base::Bind(&MCSClient::OnRMQUpdateFinished, |
| 372 weak_ptr_factory_.GetWeakPtr())); |
| 373 base::MessageLoop::current()->PostTask( |
| 374 FROM_HERE, |
| 375 base::Bind(&MCSClient::MaybeSendMessage, |
| 376 weak_ptr_factory_.GetWeakPtr())); |
| 377 return; |
| 378 } |
| 379 DVLOG(1) << "Pending output message found, sending."; |
322 if (!packet->persistent_id.empty()) | 380 if (!packet->persistent_id.empty()) |
323 to_resend_.push_back(packet); | 381 to_resend_.push_back(packet); |
324 SendPacketToWire(packet.get()); | 382 SendPacketToWire(packet.get()); |
325 } | 383 } |
326 | 384 |
327 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) { | 385 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) { |
328 packet_info->stream_id = ++stream_id_out_; | 386 packet_info->stream_id = ++stream_id_out_; |
329 DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName(); | 387 DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName(); |
330 | 388 |
331 // Set the proper last received stream id to acknowledge received server | 389 // Set the proper last received stream id to acknowledge received server |
(...skipping 23 matching lines...) Expand all Loading... |
355 | 413 |
356 void MCSClient::HandleMCSDataMesssage( | 414 void MCSClient::HandleMCSDataMesssage( |
357 scoped_ptr<google::protobuf::MessageLite> protobuf) { | 415 scoped_ptr<google::protobuf::MessageLite> protobuf) { |
358 mcs_proto::DataMessageStanza* data_message = | 416 mcs_proto::DataMessageStanza* data_message = |
359 reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get()); | 417 reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get()); |
360 // TODO(zea): implement a proper status manager rather than hardcoding these | 418 // TODO(zea): implement a proper status manager rather than hardcoding these |
361 // values. | 419 // values. |
362 scoped_ptr<mcs_proto::DataMessageStanza> response( | 420 scoped_ptr<mcs_proto::DataMessageStanza> response( |
363 new mcs_proto::DataMessageStanza()); | 421 new mcs_proto::DataMessageStanza()); |
364 response->set_from(kGCMFromField); | 422 response->set_from(kGCMFromField); |
| 423 response->set_sent(clock_->Now().ToInternalValue() / |
| 424 base::Time::kMicrosecondsPerSecond); |
| 425 response->set_ttl(0); |
365 bool send = false; | 426 bool send = false; |
366 for (int i = 0; i < data_message->app_data_size(); ++i) { | 427 for (int i = 0; i < data_message->app_data_size(); ++i) { |
367 const mcs_proto::AppData& app_data = data_message->app_data(i); | 428 const mcs_proto::AppData& app_data = data_message->app_data(i); |
368 if (app_data.key() == kIdleNotification) { | 429 if (app_data.key() == kIdleNotification) { |
369 // Tell the MCS server the client is not idle. | 430 // Tell the MCS server the client is not idle. |
370 send = true; | 431 send = true; |
371 mcs_proto::AppData data; | 432 mcs_proto::AppData data; |
372 data.set_key(kIdleNotification); | 433 data.set_key(kIdleNotification); |
373 data.set_value("false"); | 434 data.set_value("false"); |
374 response->add_app_data()->CopyFrom(data); | 435 response->add_app_data()->CopyFrom(data); |
375 response->set_category(kMCSCategory); | 436 response->set_category(kMCSCategory); |
376 } | 437 } |
377 } | 438 } |
378 | 439 |
379 if (send) { | 440 if (send) { |
380 SendMessage( | 441 SendMessage( |
381 MCSMessage(kDataMessageStanzaTag, | 442 MCSMessage(kDataMessageStanzaTag, |
382 response.PassAs<const google::protobuf::MessageLite>()), | 443 response.PassAs<const google::protobuf::MessageLite>())); |
383 false); | |
384 } | 444 } |
385 } | 445 } |
386 | 446 |
387 void MCSClient::HandlePacketFromWire( | 447 void MCSClient::HandlePacketFromWire( |
388 scoped_ptr<google::protobuf::MessageLite> protobuf) { | 448 scoped_ptr<google::protobuf::MessageLite> protobuf) { |
389 if (!protobuf.get()) | 449 if (!protobuf.get()) |
390 return; | 450 return; |
391 uint8 tag = GetMCSProtoTag(*protobuf); | 451 uint8 tag = GetMCSProtoTag(*protobuf); |
392 PersistentId persistent_id = GetPersistentId(*protobuf); | 452 PersistentId persistent_id = GetPersistentId(*protobuf); |
393 StreamId last_stream_id_received = GetLastStreamIdReceived(*protobuf); | 453 StreamId last_stream_id_received = GetLastStreamIdReceived(*protobuf); |
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
427 DVLOG(1) << "Received message of type " << protobuf->GetTypeName() | 487 DVLOG(1) << "Received message of type " << protobuf->GetTypeName() |
428 << " with persistent id " | 488 << " with persistent id " |
429 << (persistent_id.empty() ? "NULL" : persistent_id) | 489 << (persistent_id.empty() ? "NULL" : persistent_id) |
430 << ", stream id " << stream_id_in_ << " and last stream id received " | 490 << ", stream id " << stream_id_in_ << " and last stream id received " |
431 << last_stream_id_received; | 491 << last_stream_id_received; |
432 | 492 |
433 if (unacked_server_ids_.size() > 0 && | 493 if (unacked_server_ids_.size() > 0 && |
434 unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) { | 494 unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) { |
435 SendMessage(MCSMessage(kIqStanzaTag, | 495 SendMessage(MCSMessage(kIqStanzaTag, |
436 BuildStreamAck(). | 496 BuildStreamAck(). |
437 PassAs<const google::protobuf::MessageLite>()), | 497 PassAs<const google::protobuf::MessageLite>())); |
438 false); | |
439 } | 498 } |
440 | 499 |
441 // The connection is alive, treat this message as a heartbeat ack. | 500 // The connection is alive, treat this message as a heartbeat ack. |
442 heartbeat_manager_.OnHeartbeatAcked(); | 501 heartbeat_manager_.OnHeartbeatAcked(); |
443 | 502 |
444 switch (tag) { | 503 switch (tag) { |
445 case kLoginResponseTag: { | 504 case kLoginResponseTag: { |
446 DCHECK_EQ(CONNECTING, state_); | 505 DCHECK_EQ(CONNECTING, state_); |
447 mcs_proto::LoginResponse* login_response = | 506 mcs_proto::LoginResponse* login_response = |
448 reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get()); | 507 reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get()); |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
486 base::Bind(&MCSClient::SendHeartbeat, | 545 base::Bind(&MCSClient::SendHeartbeat, |
487 weak_ptr_factory_.GetWeakPtr()), | 546 weak_ptr_factory_.GetWeakPtr()), |
488 base::Bind(&MCSClient::OnConnectionResetByHeartbeat, | 547 base::Bind(&MCSClient::OnConnectionResetByHeartbeat, |
489 weak_ptr_factory_.GetWeakPtr())); | 548 weak_ptr_factory_.GetWeakPtr())); |
490 return; | 549 return; |
491 } | 550 } |
492 case kHeartbeatPingTag: | 551 case kHeartbeatPingTag: |
493 DCHECK_GE(stream_id_in_, 1U); | 552 DCHECK_GE(stream_id_in_, 1U); |
494 DVLOG(1) << "Received heartbeat ping, sending ack."; | 553 DVLOG(1) << "Received heartbeat ping, sending ack."; |
495 SendMessage( | 554 SendMessage( |
496 MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()), false); | 555 MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck())); |
497 return; | 556 return; |
498 case kHeartbeatAckTag: | 557 case kHeartbeatAckTag: |
499 DCHECK_GE(stream_id_in_, 1U); | 558 DCHECK_GE(stream_id_in_, 1U); |
500 DVLOG(1) << "Received heartbeat ack."; | 559 DVLOG(1) << "Received heartbeat ack."; |
501 // Do nothing else, all messages act as heartbeat acks. | 560 // Do nothing else, all messages act as heartbeat acks. |
502 return; | 561 return; |
503 case kCloseTag: | 562 case kCloseTag: |
504 LOG(ERROR) << "Received close command, resetting connection."; | 563 LOG(ERROR) << "Received close command, resetting connection."; |
505 state_ = LOADED; | 564 state_ = LOADED; |
506 connection_factory_->SignalConnectionReset(); | 565 connection_factory_->SignalConnectionReset(); |
(...skipping 147 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
654 | 713 |
655 MCSClient::PersistentId MCSClient::GetNextPersistentId() { | 714 MCSClient::PersistentId MCSClient::GetNextPersistentId() { |
656 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue()); | 715 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue()); |
657 } | 716 } |
658 | 717 |
659 void MCSClient::OnConnectionResetByHeartbeat() { | 718 void MCSClient::OnConnectionResetByHeartbeat() { |
660 connection_factory_->SignalConnectionReset(); | 719 connection_factory_->SignalConnectionReset(); |
661 } | 720 } |
662 | 721 |
663 } // namespace gcm | 722 } // namespace gcm |
OLD | NEW |