| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "google_apis/gcm/engine/mcs_client.h" | 5 #include "google_apis/gcm/engine/mcs_client.h" |
| 6 | 6 |
| 7 #include "base/basictypes.h" | 7 #include "base/basictypes.h" |
| 8 #include "base/message_loop/message_loop.h" | 8 #include "base/message_loop/message_loop.h" |
| 9 #include "base/strings/string_number_conversions.h" | 9 #include "base/strings/string_number_conversions.h" |
| 10 #include "base/time/clock.h" |
| 11 #include "base/time/time.h" |
| 10 #include "google_apis/gcm/base/mcs_util.h" | 12 #include "google_apis/gcm/base/mcs_util.h" |
| 11 #include "google_apis/gcm/base/socket_stream.h" | 13 #include "google_apis/gcm/base/socket_stream.h" |
| 12 #include "google_apis/gcm/engine/connection_factory.h" | 14 #include "google_apis/gcm/engine/connection_factory.h" |
| 13 #include "google_apis/gcm/engine/rmq_store.h" | 15 #include "google_apis/gcm/engine/rmq_store.h" |
| 14 | 16 |
| 15 using namespace google::protobuf::io; | 17 using namespace google::protobuf::io; |
| 16 | 18 |
| 17 namespace gcm { | 19 namespace gcm { |
| 18 | 20 |
| 19 namespace { | 21 namespace { |
| (...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 76 | 78 |
| 77 // The protobuf of the message itself. | 79 // The protobuf of the message itself. |
| 78 MCSProto protobuf; | 80 MCSProto protobuf; |
| 79 }; | 81 }; |
| 80 | 82 |
| 81 ReliablePacketInfo::ReliablePacketInfo() | 83 ReliablePacketInfo::ReliablePacketInfo() |
| 82 : stream_id(0), tag(0) { | 84 : stream_id(0), tag(0) { |
| 83 } | 85 } |
| 84 ReliablePacketInfo::~ReliablePacketInfo() {} | 86 ReliablePacketInfo::~ReliablePacketInfo() {} |
| 85 | 87 |
| 86 MCSClient::MCSClient(ConnectionFactory* connection_factory, RMQStore* rmq_store) | 88 MCSClient::MCSClient(base::Clock* clock, |
| 87 : state_(UNINITIALIZED), | 89 ConnectionFactory* connection_factory, |
| 90 RMQStore* rmq_store) |
| 91 : clock_(clock), |
| 92 state_(UNINITIALIZED), |
| 88 android_id_(0), | 93 android_id_(0), |
| 89 security_token_(0), | 94 security_token_(0), |
| 90 connection_factory_(connection_factory), | 95 connection_factory_(connection_factory), |
| 91 connection_handler_(NULL), | 96 connection_handler_(NULL), |
| 92 last_device_to_server_stream_id_received_(0), | 97 last_device_to_server_stream_id_received_(0), |
| 93 last_server_to_device_stream_id_received_(0), | 98 last_server_to_device_stream_id_received_(0), |
| 94 stream_id_out_(0), | 99 stream_id_out_(0), |
| 95 stream_id_in_(0), | 100 stream_id_in_(0), |
| 96 rmq_store_(rmq_store), | 101 rmq_store_(rmq_store), |
| 97 weak_ptr_factory_(this) { | 102 weak_ptr_factory_(this) { |
| (...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 142 | 147 |
| 143 DVLOG(1) << "RMQ Load finished with " << load_result.incoming_messages.size() | 148 DVLOG(1) << "RMQ Load finished with " << load_result.incoming_messages.size() |
| 144 << " incoming acks pending and " | 149 << " incoming acks pending and " |
| 145 << load_result.outgoing_messages.size() | 150 << load_result.outgoing_messages.size() |
| 146 << " outgoing messages pending."; | 151 << " outgoing messages pending."; |
| 147 | 152 |
| 148 restored_unackeds_server_ids_ = load_result.incoming_messages; | 153 restored_unackeds_server_ids_ = load_result.incoming_messages; |
| 149 | 154 |
| 150 // First go through and order the outgoing messages by recency. | 155 // First go through and order the outgoing messages by recency. |
| 151 std::map<uint64, google::protobuf::MessageLite*> ordered_messages; | 156 std::map<uint64, google::protobuf::MessageLite*> ordered_messages; |
| 157 std::vector<PersistentId> expired_ttl_ids; |
| 152 for (std::map<PersistentId, google::protobuf::MessageLite*>::const_iterator | 158 for (std::map<PersistentId, google::protobuf::MessageLite*>::const_iterator |
| 153 iter = load_result.outgoing_messages.begin(); | 159 iter = load_result.outgoing_messages.begin(); |
| 154 iter != load_result.outgoing_messages.end(); ++iter) { | 160 iter != load_result.outgoing_messages.end(); ++iter) { |
| 155 uint64 timestamp = 0; | 161 uint64 timestamp = 0; |
| 156 if (!base::StringToUint64(iter->first, ×tamp)) { | 162 if (!base::StringToUint64(iter->first, ×tamp)) { |
| 157 LOG(ERROR) << "Invalid restored message."; | 163 LOG(ERROR) << "Invalid restored message."; |
| 158 return; | 164 return; |
| 159 } | 165 } |
| 166 |
| 167 // Check if the TTL has expired for this message. |
| 168 if (HasTTLExpired(*iter->second, clock_)) { |
| 169 expired_ttl_ids.push_back(iter->first); |
| 170 message_sent_callback_.Run("TTL expired for " + iter->first); |
| 171 delete iter->second; |
| 172 continue; |
| 173 } |
| 174 |
| 160 ordered_messages[timestamp] = iter->second; | 175 ordered_messages[timestamp] = iter->second; |
| 161 } | 176 } |
| 162 | 177 |
| 178 if (!expired_ttl_ids.empty()) { |
| 179 rmq_store_->RemoveOutgoingMessages( |
| 180 expired_ttl_ids, |
| 181 base::Bind(&MCSClient::OnRMQUpdateFinished, |
| 182 weak_ptr_factory_.GetWeakPtr())); |
| 183 } |
| 184 |
| 163 // Now go through and add the outgoing messages to the send queue in their | 185 // Now go through and add the outgoing messages to the send queue in their |
| 164 // appropriate order (oldest at front, most recent at back). | 186 // appropriate order (oldest at front, most recent at back). |
| 165 for (std::map<uint64, google::protobuf::MessageLite*>::const_iterator | 187 for (std::map<uint64, google::protobuf::MessageLite*>::const_iterator |
| 166 iter = ordered_messages.begin(); | 188 iter = ordered_messages.begin(); |
| 167 iter != ordered_messages.end(); ++iter) { | 189 iter != ordered_messages.end(); ++iter) { |
| 168 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); | 190 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); |
| 169 packet_info->protobuf.reset(iter->second); | 191 packet_info->protobuf.reset(iter->second); |
| 170 packet_info->persistent_id = base::Uint64ToString(iter->first); | 192 packet_info->persistent_id = base::Uint64ToString(iter->first); |
| 171 to_send_.push_back(make_linked_ptr(packet_info)); | 193 to_send_.push_back(make_linked_ptr(packet_info)); |
| 172 } | 194 } |
| (...skipping 14 matching lines...) Expand all Loading... |
| 187 android_id_, | 209 android_id_, |
| 188 security_token_, | 210 security_token_, |
| 189 base::Bind(&MCSClient::OnRMQUpdateFinished, | 211 base::Bind(&MCSClient::OnRMQUpdateFinished, |
| 190 weak_ptr_factory_.GetWeakPtr())); | 212 weak_ptr_factory_.GetWeakPtr())); |
| 191 } | 213 } |
| 192 | 214 |
| 193 state_ = CONNECTING; | 215 state_ = CONNECTING; |
| 194 connection_factory_->Connect(); | 216 connection_factory_->Connect(); |
| 195 } | 217 } |
| 196 | 218 |
| 197 void MCSClient::SendMessage(const MCSMessage& message, bool use_rmq) { | 219 void MCSClient::SendMessage(const MCSMessage& message) { |
| 198 DCHECK_EQ(state_, CONNECTED); | 220 int ttl = GetTTL(message.GetProtobuf()); |
| 221 DCHECK_GE(ttl, 0); |
| 199 if (to_send_.size() > kMaxSendQueueSize) { | 222 if (to_send_.size() > kMaxSendQueueSize) { |
| 200 base::MessageLoop::current()->PostTask( | 223 message_sent_callback_.Run("Message queue full."); |
| 201 FROM_HERE, | |
| 202 base::Bind(message_sent_callback_, "Message queue full.")); | |
| 203 return; | 224 return; |
| 204 } | 225 } |
| 205 if (message.size() > kMaxMessageBytes) { | 226 if (message.size() > kMaxMessageBytes) { |
| 206 base::MessageLoop::current()->PostTask( | 227 message_sent_callback_.Run("Message too large."); |
| 207 FROM_HERE, | |
| 208 base::Bind(message_sent_callback_, "Message too large.")); | |
| 209 return; | 228 return; |
| 210 } | 229 } |
| 211 | 230 |
| 212 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); | 231 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); |
| 213 packet_info->protobuf = message.CloneProtobuf(); | 232 packet_info->protobuf = message.CloneProtobuf(); |
| 214 | 233 |
| 215 if (use_rmq) { | 234 if (ttl > 0) { |
| 216 PersistentId persistent_id = GetNextPersistentId(); | 235 PersistentId persistent_id = GetNextPersistentId(); |
| 217 DVLOG(1) << "Setting persistent id to " << persistent_id; | 236 DVLOG(1) << "Setting persistent id to " << persistent_id; |
| 218 packet_info->persistent_id = persistent_id; | 237 packet_info->persistent_id = persistent_id; |
| 219 SetPersistentId(persistent_id, | 238 SetPersistentId(persistent_id, |
| 220 packet_info->protobuf.get()); | 239 packet_info->protobuf.get()); |
| 221 rmq_store_->AddOutgoingMessage(persistent_id, | 240 rmq_store_->AddOutgoingMessage(persistent_id, |
| 222 MCSMessage(message.tag(), | 241 MCSMessage(message.tag(), |
| 223 *(packet_info->protobuf)), | 242 *(packet_info->protobuf)), |
| 224 base::Bind(&MCSClient::OnRMQUpdateFinished, | 243 base::Bind(&MCSClient::OnRMQUpdateFinished, |
| 225 weak_ptr_factory_.GetWeakPtr())); | 244 weak_ptr_factory_.GetWeakPtr())); |
| 226 } else { | 245 } else if (!connection_factory_->IsEndpointReachable()) { |
| 227 // Check that there is an active connection to the endpoint. | 246 DVLOG(1) << "No active connection, dropping message."; |
| 228 if (!connection_handler_->CanSendMessage()) { | 247 message_sent_callback_.Run("TTL expired"); |
| 229 base::MessageLoop::current()->PostTask( | 248 return; |
| 230 FROM_HERE, | |
| 231 base::Bind(message_sent_callback_, "Unable to reach endpoint")); | |
| 232 return; | |
| 233 } | |
| 234 } | 249 } |
| 235 to_send_.push_back(make_linked_ptr(packet_info)); | 250 to_send_.push_back(make_linked_ptr(packet_info)); |
| 236 MaybeSendMessage(); | 251 MaybeSendMessage(); |
| 237 } | 252 } |
| 238 | 253 |
| 239 void MCSClient::Destroy() { | 254 void MCSClient::Destroy() { |
| 240 rmq_store_->Destroy(base::Bind(&MCSClient::OnRMQUpdateFinished, | 255 rmq_store_->Destroy(base::Bind(&MCSClient::OnRMQUpdateFinished, |
| 241 weak_ptr_factory_.GetWeakPtr())); | 256 weak_ptr_factory_.GetWeakPtr())); |
| 242 } | 257 } |
| 243 | 258 |
| 244 void MCSClient::ResetStateAndBuildLoginRequest( | 259 void MCSClient::ResetStateAndBuildLoginRequest( |
| 245 mcs_proto::LoginRequest* request) { | 260 mcs_proto::LoginRequest* request) { |
| 246 DCHECK(android_id_); | 261 DCHECK(android_id_); |
| 247 DCHECK(security_token_); | 262 DCHECK(security_token_); |
| 248 stream_id_in_ = 0; | 263 stream_id_in_ = 0; |
| 249 stream_id_out_ = 1; | 264 stream_id_out_ = 1; |
| 250 last_device_to_server_stream_id_received_ = 0; | 265 last_device_to_server_stream_id_received_ = 0; |
| 251 last_server_to_device_stream_id_received_ = 0; | 266 last_server_to_device_stream_id_received_ = 0; |
| 252 | 267 |
| 253 heartbeat_manager_.Stop(); | 268 heartbeat_manager_.Stop(); |
| 254 | 269 |
| 255 // TODO(zea): expire all messages older than their TTL. | |
| 256 | |
| 257 // Add any pending acknowledgments to the list of ids. | 270 // Add any pending acknowledgments to the list of ids. |
| 258 for (StreamIdToPersistentIdMap::const_iterator iter = | 271 for (StreamIdToPersistentIdMap::const_iterator iter = |
| 259 unacked_server_ids_.begin(); | 272 unacked_server_ids_.begin(); |
| 260 iter != unacked_server_ids_.end(); ++iter) { | 273 iter != unacked_server_ids_.end(); ++iter) { |
| 261 restored_unackeds_server_ids_.push_back(iter->second); | 274 restored_unackeds_server_ids_.push_back(iter->second); |
| 262 } | 275 } |
| 263 unacked_server_ids_.clear(); | 276 unacked_server_ids_.clear(); |
| 264 | 277 |
| 265 // Any acknowledged server ids which have not been confirmed by the server | 278 // Any acknowledged server ids which have not been confirmed by the server |
| 266 // are treated like unacknowledged ids. | 279 // are treated like unacknowledged ids. |
| (...skipping 16 matching lines...) Expand all Loading... |
| 283 acked_server_ids_[stream_id_out_] = restored_unackeds_server_ids_; | 296 acked_server_ids_[stream_id_out_] = restored_unackeds_server_ids_; |
| 284 restored_unackeds_server_ids_.clear(); | 297 restored_unackeds_server_ids_.clear(); |
| 285 | 298 |
| 286 // Push all unacknowledged messages to front of send queue. No need to save | 299 // Push all unacknowledged messages to front of send queue. No need to save |
| 287 // to RMQ, as all messages that reach this point should already have been | 300 // to RMQ, as all messages that reach this point should already have been |
| 288 // saved as necessary. | 301 // saved as necessary. |
| 289 while (!to_resend_.empty()) { | 302 while (!to_resend_.empty()) { |
| 290 to_send_.push_front(to_resend_.back()); | 303 to_send_.push_front(to_resend_.back()); |
| 291 to_resend_.pop_back(); | 304 to_resend_.pop_back(); |
| 292 } | 305 } |
| 306 |
| 307 // Drop all TTL == 0 or expired TTL messages from the queue. |
| 308 std::deque<MCSPacketInternal> new_to_send; |
| 309 std::vector<PersistentId> expired_ttl_ids; |
| 310 while (!to_send_.empty()) { |
| 311 MCSPacketInternal packet = to_send_.front(); |
| 312 to_send_.pop_front(); |
| 313 if (GetTTL(*packet->protobuf) > 0 && |
| 314 !HasTTLExpired(*packet->protobuf, clock_)) { |
| 315 new_to_send.push_back(packet); |
| 316 } else { |
| 317 // If the TTL was 0 there is no persistent id, so no need to remove the |
| 318 // message from the persistent store. |
| 319 if (!packet->persistent_id.empty()) |
| 320 expired_ttl_ids.push_back(packet->persistent_id); |
| 321 message_sent_callback_.Run("TTL expired"); |
| 322 } |
| 323 } |
| 324 |
| 325 if (!expired_ttl_ids.empty()) { |
| 326 DVLOG(1) << "Connection reset, " << expired_ttl_ids.size() |
| 327 << " messages expired."; |
| 328 rmq_store_->RemoveOutgoingMessages( |
| 329 expired_ttl_ids, |
| 330 base::Bind(&MCSClient::OnRMQUpdateFinished, |
| 331 weak_ptr_factory_.GetWeakPtr())); |
| 332 } |
| 333 |
| 334 to_send_.swap(new_to_send); |
| 335 |
| 293 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size() | 336 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size() |
| 294 << " incoming acks pending, and " << to_send_.size() | 337 << " incoming acks pending, and " << to_send_.size() |
| 295 << " pending outgoing messages."; | 338 << " pending outgoing messages."; |
| 296 | 339 |
| 297 state_ = CONNECTING; | 340 state_ = CONNECTING; |
| 298 } | 341 } |
| 299 | 342 |
| 300 void MCSClient::SendHeartbeat() { | 343 void MCSClient::SendHeartbeat() { |
| 301 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()), | 344 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing())); |
| 302 false); | |
| 303 } | 345 } |
| 304 | 346 |
| 305 void MCSClient::OnRMQUpdateFinished(bool success) { | 347 void MCSClient::OnRMQUpdateFinished(bool success) { |
| 306 LOG_IF(ERROR, !success) << "RMQ Update failed!"; | 348 LOG_IF(ERROR, !success) << "RMQ Update failed!"; |
| 307 // TODO(zea): Rebuild the store from scratch in case of persistence failure? | 349 // TODO(zea): Rebuild the store from scratch in case of persistence failure? |
| 308 } | 350 } |
| 309 | 351 |
| 310 void MCSClient::MaybeSendMessage() { | 352 void MCSClient::MaybeSendMessage() { |
| 311 if (to_send_.empty()) | 353 if (to_send_.empty()) |
| 312 return; | 354 return; |
| 313 | 355 |
| 314 if (!connection_handler_->CanSendMessage()) | 356 // If the connection has been reset, do nothing. On reconnection |
| 357 // MaybeSendMessage will be automatically invoked again. |
| 358 // TODO(zea): consider doing TTL expiration at connection reset time, rather |
| 359 // than reconnect time. |
| 360 if (!connection_factory_->IsEndpointReachable()) |
| 315 return; | 361 return; |
| 316 | 362 |
| 317 // TODO(zea): drop messages older than their TTL. | |
| 318 | |
| 319 DVLOG(1) << "Pending output message found, sending."; | |
| 320 MCSPacketInternal packet = to_send_.front(); | 363 MCSPacketInternal packet = to_send_.front(); |
| 321 to_send_.pop_front(); | 364 to_send_.pop_front(); |
| 365 if (HasTTLExpired(*packet->protobuf, clock_)) { |
| 366 DCHECK(!packet->persistent_id.empty()); |
| 367 DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; |
| 368 message_sent_callback_.Run("TTL expired for " + packet->persistent_id); |
| 369 rmq_store_->RemoveOutgoingMessage( |
| 370 packet->persistent_id, |
| 371 base::Bind(&MCSClient::OnRMQUpdateFinished, |
| 372 weak_ptr_factory_.GetWeakPtr())); |
| 373 base::MessageLoop::current()->PostTask( |
| 374 FROM_HERE, |
| 375 base::Bind(&MCSClient::MaybeSendMessage, |
| 376 weak_ptr_factory_.GetWeakPtr())); |
| 377 return; |
| 378 } |
| 379 DVLOG(1) << "Pending output message found, sending."; |
| 322 if (!packet->persistent_id.empty()) | 380 if (!packet->persistent_id.empty()) |
| 323 to_resend_.push_back(packet); | 381 to_resend_.push_back(packet); |
| 324 SendPacketToWire(packet.get()); | 382 SendPacketToWire(packet.get()); |
| 325 } | 383 } |
| 326 | 384 |
| 327 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) { | 385 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) { |
| 328 packet_info->stream_id = ++stream_id_out_; | 386 packet_info->stream_id = ++stream_id_out_; |
| 329 DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName(); | 387 DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName(); |
| 330 | 388 |
| 331 // Set the proper last received stream id to acknowledge received server | 389 // Set the proper last received stream id to acknowledge received server |
| (...skipping 23 matching lines...) Expand all Loading... |
| 355 | 413 |
| 356 void MCSClient::HandleMCSDataMesssage( | 414 void MCSClient::HandleMCSDataMesssage( |
| 357 scoped_ptr<google::protobuf::MessageLite> protobuf) { | 415 scoped_ptr<google::protobuf::MessageLite> protobuf) { |
| 358 mcs_proto::DataMessageStanza* data_message = | 416 mcs_proto::DataMessageStanza* data_message = |
| 359 reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get()); | 417 reinterpret_cast<mcs_proto::DataMessageStanza*>(protobuf.get()); |
| 360 // TODO(zea): implement a proper status manager rather than hardcoding these | 418 // TODO(zea): implement a proper status manager rather than hardcoding these |
| 361 // values. | 419 // values. |
| 362 scoped_ptr<mcs_proto::DataMessageStanza> response( | 420 scoped_ptr<mcs_proto::DataMessageStanza> response( |
| 363 new mcs_proto::DataMessageStanza()); | 421 new mcs_proto::DataMessageStanza()); |
| 364 response->set_from(kGCMFromField); | 422 response->set_from(kGCMFromField); |
| 423 response->set_sent(clock_->Now().ToInternalValue() / |
| 424 base::Time::kMicrosecondsPerSecond); |
| 425 response->set_ttl(0); |
| 365 bool send = false; | 426 bool send = false; |
| 366 for (int i = 0; i < data_message->app_data_size(); ++i) { | 427 for (int i = 0; i < data_message->app_data_size(); ++i) { |
| 367 const mcs_proto::AppData& app_data = data_message->app_data(i); | 428 const mcs_proto::AppData& app_data = data_message->app_data(i); |
| 368 if (app_data.key() == kIdleNotification) { | 429 if (app_data.key() == kIdleNotification) { |
| 369 // Tell the MCS server the client is not idle. | 430 // Tell the MCS server the client is not idle. |
| 370 send = true; | 431 send = true; |
| 371 mcs_proto::AppData data; | 432 mcs_proto::AppData data; |
| 372 data.set_key(kIdleNotification); | 433 data.set_key(kIdleNotification); |
| 373 data.set_value("false"); | 434 data.set_value("false"); |
| 374 response->add_app_data()->CopyFrom(data); | 435 response->add_app_data()->CopyFrom(data); |
| 375 response->set_category(kMCSCategory); | 436 response->set_category(kMCSCategory); |
| 376 } | 437 } |
| 377 } | 438 } |
| 378 | 439 |
| 379 if (send) { | 440 if (send) { |
| 380 SendMessage( | 441 SendMessage( |
| 381 MCSMessage(kDataMessageStanzaTag, | 442 MCSMessage(kDataMessageStanzaTag, |
| 382 response.PassAs<const google::protobuf::MessageLite>()), | 443 response.PassAs<const google::protobuf::MessageLite>())); |
| 383 false); | |
| 384 } | 444 } |
| 385 } | 445 } |
| 386 | 446 |
| 387 void MCSClient::HandlePacketFromWire( | 447 void MCSClient::HandlePacketFromWire( |
| 388 scoped_ptr<google::protobuf::MessageLite> protobuf) { | 448 scoped_ptr<google::protobuf::MessageLite> protobuf) { |
| 389 if (!protobuf.get()) | 449 if (!protobuf.get()) |
| 390 return; | 450 return; |
| 391 uint8 tag = GetMCSProtoTag(*protobuf); | 451 uint8 tag = GetMCSProtoTag(*protobuf); |
| 392 PersistentId persistent_id = GetPersistentId(*protobuf); | 452 PersistentId persistent_id = GetPersistentId(*protobuf); |
| 393 StreamId last_stream_id_received = GetLastStreamIdReceived(*protobuf); | 453 StreamId last_stream_id_received = GetLastStreamIdReceived(*protobuf); |
| (...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 427 DVLOG(1) << "Received message of type " << protobuf->GetTypeName() | 487 DVLOG(1) << "Received message of type " << protobuf->GetTypeName() |
| 428 << " with persistent id " | 488 << " with persistent id " |
| 429 << (persistent_id.empty() ? "NULL" : persistent_id) | 489 << (persistent_id.empty() ? "NULL" : persistent_id) |
| 430 << ", stream id " << stream_id_in_ << " and last stream id received " | 490 << ", stream id " << stream_id_in_ << " and last stream id received " |
| 431 << last_stream_id_received; | 491 << last_stream_id_received; |
| 432 | 492 |
| 433 if (unacked_server_ids_.size() > 0 && | 493 if (unacked_server_ids_.size() > 0 && |
| 434 unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) { | 494 unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) { |
| 435 SendMessage(MCSMessage(kIqStanzaTag, | 495 SendMessage(MCSMessage(kIqStanzaTag, |
| 436 BuildStreamAck(). | 496 BuildStreamAck(). |
| 437 PassAs<const google::protobuf::MessageLite>()), | 497 PassAs<const google::protobuf::MessageLite>())); |
| 438 false); | |
| 439 } | 498 } |
| 440 | 499 |
| 441 // The connection is alive, treat this message as a heartbeat ack. | 500 // The connection is alive, treat this message as a heartbeat ack. |
| 442 heartbeat_manager_.OnHeartbeatAcked(); | 501 heartbeat_manager_.OnHeartbeatAcked(); |
| 443 | 502 |
| 444 switch (tag) { | 503 switch (tag) { |
| 445 case kLoginResponseTag: { | 504 case kLoginResponseTag: { |
| 446 DCHECK_EQ(CONNECTING, state_); | 505 DCHECK_EQ(CONNECTING, state_); |
| 447 mcs_proto::LoginResponse* login_response = | 506 mcs_proto::LoginResponse* login_response = |
| 448 reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get()); | 507 reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get()); |
| (...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 486 base::Bind(&MCSClient::SendHeartbeat, | 545 base::Bind(&MCSClient::SendHeartbeat, |
| 487 weak_ptr_factory_.GetWeakPtr()), | 546 weak_ptr_factory_.GetWeakPtr()), |
| 488 base::Bind(&MCSClient::OnConnectionResetByHeartbeat, | 547 base::Bind(&MCSClient::OnConnectionResetByHeartbeat, |
| 489 weak_ptr_factory_.GetWeakPtr())); | 548 weak_ptr_factory_.GetWeakPtr())); |
| 490 return; | 549 return; |
| 491 } | 550 } |
| 492 case kHeartbeatPingTag: | 551 case kHeartbeatPingTag: |
| 493 DCHECK_GE(stream_id_in_, 1U); | 552 DCHECK_GE(stream_id_in_, 1U); |
| 494 DVLOG(1) << "Received heartbeat ping, sending ack."; | 553 DVLOG(1) << "Received heartbeat ping, sending ack."; |
| 495 SendMessage( | 554 SendMessage( |
| 496 MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()), false); | 555 MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck())); |
| 497 return; | 556 return; |
| 498 case kHeartbeatAckTag: | 557 case kHeartbeatAckTag: |
| 499 DCHECK_GE(stream_id_in_, 1U); | 558 DCHECK_GE(stream_id_in_, 1U); |
| 500 DVLOG(1) << "Received heartbeat ack."; | 559 DVLOG(1) << "Received heartbeat ack."; |
| 501 // Do nothing else, all messages act as heartbeat acks. | 560 // Do nothing else, all messages act as heartbeat acks. |
| 502 return; | 561 return; |
| 503 case kCloseTag: | 562 case kCloseTag: |
| 504 LOG(ERROR) << "Received close command, resetting connection."; | 563 LOG(ERROR) << "Received close command, resetting connection."; |
| 505 state_ = LOADED; | 564 state_ = LOADED; |
| 506 connection_factory_->SignalConnectionReset(); | 565 connection_factory_->SignalConnectionReset(); |
| (...skipping 147 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 654 | 713 |
| 655 MCSClient::PersistentId MCSClient::GetNextPersistentId() { | 714 MCSClient::PersistentId MCSClient::GetNextPersistentId() { |
| 656 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue()); | 715 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue()); |
| 657 } | 716 } |
| 658 | 717 |
| 659 void MCSClient::OnConnectionResetByHeartbeat() { | 718 void MCSClient::OnConnectionResetByHeartbeat() { |
| 660 connection_factory_->SignalConnectionReset(); | 719 connection_factory_->SignalConnectionReset(); |
| 661 } | 720 } |
| 662 | 721 |
| 663 } // namespace gcm | 722 } // namespace gcm |
| OLD | NEW |