Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "google_apis/gcm/engine/mcs_client.h" | 5 #include "google_apis/gcm/engine/mcs_client.h" |
| 6 | 6 |
| 7 #include "base/basictypes.h" | 7 #include "base/basictypes.h" |
| 8 #include "base/message_loop/message_loop.h" | 8 #include "base/message_loop/message_loop.h" |
| 9 #include "base/strings/string_number_conversions.h" | 9 #include "base/strings/string_number_conversions.h" |
| 10 #include "base/time/clock.h" | |
| 11 #include "base/time/time.h" | |
| 10 #include "google_apis/gcm/base/mcs_util.h" | 12 #include "google_apis/gcm/base/mcs_util.h" |
| 11 #include "google_apis/gcm/base/socket_stream.h" | 13 #include "google_apis/gcm/base/socket_stream.h" |
| 12 #include "google_apis/gcm/engine/connection_factory.h" | 14 #include "google_apis/gcm/engine/connection_factory.h" |
| 13 #include "google_apis/gcm/engine/rmq_store.h" | 15 #include "google_apis/gcm/engine/rmq_store.h" |
| 14 | 16 |
| 15 using namespace google::protobuf::io; | 17 using namespace google::protobuf::io; |
| 16 | 18 |
| 17 namespace gcm { | 19 namespace gcm { |
| 18 | 20 |
| 19 namespace { | 21 namespace { |
| (...skipping 17 matching lines...) Expand all Loading... | |
| 37 // Applies to both incoming and outgoing messages. | 39 // Applies to both incoming and outgoing messages. |
| 38 // TODO(zea): make this server configurable. | 40 // TODO(zea): make this server configurable. |
| 39 const int kUnackedMessageBeforeStreamAck = 10; | 41 const int kUnackedMessageBeforeStreamAck = 10; |
| 40 | 42 |
| 41 // The global maximum number of pending messages to have in the send queue. | 43 // The global maximum number of pending messages to have in the send queue. |
| 42 const size_t kMaxSendQueueSize = 10 * 1024; | 44 const size_t kMaxSendQueueSize = 10 * 1024; |
| 43 | 45 |
| 44 // The maximum message size that can be sent to the server. | 46 // The maximum message size that can be sent to the server. |
| 45 const int kMaxMessageBytes = 4 * 1024; // 4KB, like the server. | 47 const int kMaxMessageBytes = 4 * 1024; // 4KB, like the server. |
| 46 | 48 |
| 49 // Maximum amount of time to save an unsent outgoing message for. | |
| 50 const int kMaxTTLSeconds = 4 * 7 * 24 * 60 * 60; // 4 weeks. | |
| 51 | |
| 47 // Helper for converting a proto persistent id list to a vector of strings. | 52 // Helper for converting a proto persistent id list to a vector of strings. |
| 48 bool BuildPersistentIdListFromProto(const google::protobuf::string& bytes, | 53 bool BuildPersistentIdListFromProto(const google::protobuf::string& bytes, |
| 49 std::vector<std::string>* id_list) { | 54 std::vector<std::string>* id_list) { |
| 50 mcs_proto::SelectiveAck selective_ack; | 55 mcs_proto::SelectiveAck selective_ack; |
| 51 if (!selective_ack.ParseFromString(bytes)) | 56 if (!selective_ack.ParseFromString(bytes)) |
| 52 return false; | 57 return false; |
| 53 std::vector<std::string> new_list; | 58 std::vector<std::string> new_list; |
| 54 for (int i = 0; i < selective_ack.id_size(); ++i) { | 59 for (int i = 0; i < selective_ack.id_size(); ++i) { |
| 55 DCHECK(!selective_ack.id(i).empty()); | 60 DCHECK(!selective_ack.id(i).empty()); |
| 56 new_list.push_back(selective_ack.id(i)); | 61 new_list.push_back(selective_ack.id(i)); |
| (...skipping 19 matching lines...) Expand all Loading... | |
| 76 | 81 |
| 77 // The protobuf of the message itself. | 82 // The protobuf of the message itself. |
| 78 MCSProto protobuf; | 83 MCSProto protobuf; |
| 79 }; | 84 }; |
| 80 | 85 |
| 81 ReliablePacketInfo::ReliablePacketInfo() | 86 ReliablePacketInfo::ReliablePacketInfo() |
| 82 : stream_id(0), tag(0) { | 87 : stream_id(0), tag(0) { |
| 83 } | 88 } |
| 84 ReliablePacketInfo::~ReliablePacketInfo() {} | 89 ReliablePacketInfo::~ReliablePacketInfo() {} |
| 85 | 90 |
| 86 MCSClient::MCSClient(ConnectionFactory* connection_factory, RMQStore* rmq_store) | 91 MCSClient::MCSClient(base::Clock* clock, |
| 87 : state_(UNINITIALIZED), | 92 ConnectionFactory* connection_factory, |
| 93 RMQStore* rmq_store) | |
| 94 : clock_(clock), | |
| 95 state_(UNINITIALIZED), | |
| 88 android_id_(0), | 96 android_id_(0), |
| 89 security_token_(0), | 97 security_token_(0), |
| 90 connection_factory_(connection_factory), | 98 connection_factory_(connection_factory), |
| 91 connection_handler_(NULL), | 99 connection_handler_(NULL), |
| 92 last_device_to_server_stream_id_received_(0), | 100 last_device_to_server_stream_id_received_(0), |
| 93 last_server_to_device_stream_id_received_(0), | 101 last_server_to_device_stream_id_received_(0), |
| 94 stream_id_out_(0), | 102 stream_id_out_(0), |
| 95 stream_id_in_(0), | 103 stream_id_in_(0), |
| 96 rmq_store_(rmq_store), | 104 rmq_store_(rmq_store), |
| 97 weak_ptr_factory_(this) { | 105 weak_ptr_factory_(this) { |
| (...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 142 | 150 |
| 143 DVLOG(1) << "RMQ Load finished with " << load_result.incoming_messages.size() | 151 DVLOG(1) << "RMQ Load finished with " << load_result.incoming_messages.size() |
| 144 << " incoming acks pending and " | 152 << " incoming acks pending and " |
| 145 << load_result.outgoing_messages.size() | 153 << load_result.outgoing_messages.size() |
| 146 << " outgoing messages pending."; | 154 << " outgoing messages pending."; |
| 147 | 155 |
| 148 restored_unackeds_server_ids_ = load_result.incoming_messages; | 156 restored_unackeds_server_ids_ = load_result.incoming_messages; |
| 149 | 157 |
| 150 // First go through and order the outgoing messages by recency. | 158 // First go through and order the outgoing messages by recency. |
| 151 std::map<uint64, google::protobuf::MessageLite*> ordered_messages; | 159 std::map<uint64, google::protobuf::MessageLite*> ordered_messages; |
| 160 std::vector<PersistentId> dropped_ids; | |
| 152 for (std::map<PersistentId, google::protobuf::MessageLite*>::const_iterator | 161 for (std::map<PersistentId, google::protobuf::MessageLite*>::const_iterator |
| 153 iter = load_result.outgoing_messages.begin(); | 162 iter = load_result.outgoing_messages.begin(); |
| 154 iter != load_result.outgoing_messages.end(); ++iter) { | 163 iter != load_result.outgoing_messages.end(); ++iter) { |
| 155 uint64 timestamp = 0; | 164 uint64 timestamp = 0; |
| 156 if (!base::StringToUint64(iter->first, ×tamp)) { | 165 if (!base::StringToUint64(iter->first, ×tamp)) { |
| 157 LOG(ERROR) << "Invalid restored message."; | 166 LOG(ERROR) << "Invalid restored message."; |
| 158 return; | 167 return; |
| 159 } | 168 } |
| 169 | |
| 170 // Check if the TTL has expired for this message. | |
| 171 if (HasTTLExpired(*iter->second, clock_)) { | |
| 172 dropped_ids.push_back(iter->first); | |
| 173 message_sent_callback_.Run("TTL expired for " + iter->first); | |
| 174 delete iter->second; | |
| 175 continue; | |
| 176 } | |
| 177 | |
| 160 ordered_messages[timestamp] = iter->second; | 178 ordered_messages[timestamp] = iter->second; |
| 161 } | 179 } |
| 162 | 180 |
| 181 if (!dropped_ids.empty()) { | |
| 182 rmq_store_->RemoveOutgoingMessages( | |
| 183 dropped_ids, | |
| 184 base::Bind(&MCSClient::OnRMQUpdateFinished, | |
| 185 weak_ptr_factory_.GetWeakPtr())); | |
| 186 } | |
| 187 | |
| 163 // Now go through and add the outgoing messages to the send queue in their | 188 // Now go through and add the outgoing messages to the send queue in their |
| 164 // appropriate order (oldest at front, most recent at back). | 189 // appropriate order (oldest at front, most recent at back). |
| 165 for (std::map<uint64, google::protobuf::MessageLite*>::const_iterator | 190 for (std::map<uint64, google::protobuf::MessageLite*>::const_iterator |
| 166 iter = ordered_messages.begin(); | 191 iter = ordered_messages.begin(); |
| 167 iter != ordered_messages.end(); ++iter) { | 192 iter != ordered_messages.end(); ++iter) { |
| 168 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); | 193 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); |
| 169 packet_info->protobuf.reset(iter->second); | 194 packet_info->protobuf.reset(iter->second); |
| 170 packet_info->persistent_id = base::Uint64ToString(iter->first); | 195 packet_info->persistent_id = base::Uint64ToString(iter->first); |
| 171 to_send_.push_back(make_linked_ptr(packet_info)); | 196 to_send_.push_back(make_linked_ptr(packet_info)); |
| 172 } | 197 } |
| (...skipping 14 matching lines...) Expand all Loading... | |
| 187 android_id_, | 212 android_id_, |
| 188 security_token_, | 213 security_token_, |
| 189 base::Bind(&MCSClient::OnRMQUpdateFinished, | 214 base::Bind(&MCSClient::OnRMQUpdateFinished, |
| 190 weak_ptr_factory_.GetWeakPtr())); | 215 weak_ptr_factory_.GetWeakPtr())); |
| 191 } | 216 } |
| 192 | 217 |
| 193 state_ = CONNECTING; | 218 state_ = CONNECTING; |
| 194 connection_factory_->Connect(); | 219 connection_factory_->Connect(); |
| 195 } | 220 } |
| 196 | 221 |
| 197 void MCSClient::SendMessage(const MCSMessage& message, bool use_rmq) { | 222 void MCSClient::SendMessage(const MCSMessage& message) { |
| 198 DCHECK_EQ(state_, CONNECTED); | 223 int ttl = GetTTL(message.GetProtobuf()); |
| 224 DCHECK_GE(ttl, 0); | |
| 225 DCHECK_LE(ttl, kMaxTTLSeconds); | |
| 199 if (to_send_.size() > kMaxSendQueueSize) { | 226 if (to_send_.size() > kMaxSendQueueSize) { |
| 200 base::MessageLoop::current()->PostTask( | 227 message_sent_callback_.Run("Message queue full."); |
| 201 FROM_HERE, | |
| 202 base::Bind(message_sent_callback_, "Message queue full.")); | |
| 203 return; | 228 return; |
| 204 } | 229 } |
| 205 if (message.size() > kMaxMessageBytes) { | 230 if (message.size() > kMaxMessageBytes) { |
| 206 base::MessageLoop::current()->PostTask( | 231 message_sent_callback_.Run("Message too large."); |
| 207 FROM_HERE, | |
| 208 base::Bind(message_sent_callback_, "Message too large.")); | |
| 209 return; | 232 return; |
| 210 } | 233 } |
| 211 | 234 |
| 212 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); | 235 ReliablePacketInfo* packet_info = new ReliablePacketInfo(); |
| 213 packet_info->protobuf = message.CloneProtobuf(); | 236 packet_info->protobuf = message.CloneProtobuf(); |
| 214 | 237 |
| 215 if (use_rmq) { | 238 if (ttl > 0) { |
| 216 PersistentId persistent_id = GetNextPersistentId(); | 239 PersistentId persistent_id = GetNextPersistentId(); |
| 217 DVLOG(1) << "Setting persistent id to " << persistent_id; | 240 DVLOG(1) << "Setting persistent id to " << persistent_id; |
| 218 packet_info->persistent_id = persistent_id; | 241 packet_info->persistent_id = persistent_id; |
| 219 SetPersistentId(persistent_id, | 242 SetPersistentId(persistent_id, |
| 220 packet_info->protobuf.get()); | 243 packet_info->protobuf.get()); |
| 221 rmq_store_->AddOutgoingMessage(persistent_id, | 244 rmq_store_->AddOutgoingMessage(persistent_id, |
| 222 MCSMessage(message.tag(), | 245 MCSMessage(message.tag(), |
| 223 *(packet_info->protobuf)), | 246 *(packet_info->protobuf)), |
| 224 base::Bind(&MCSClient::OnRMQUpdateFinished, | 247 base::Bind(&MCSClient::OnRMQUpdateFinished, |
| 225 weak_ptr_factory_.GetWeakPtr())); | 248 weak_ptr_factory_.GetWeakPtr())); |
| 226 } else { | 249 } else if (!connection_factory_->IsEndpointReachable()) { |
| 227 // Check that there is an active connection to the endpoint. | 250 DVLOG(1) << "No active connection, dropping message."; |
| 228 if (!connection_handler_->CanSendMessage()) { | 251 message_sent_callback_.Run("TTL expired"); |
| 229 base::MessageLoop::current()->PostTask( | 252 return; |
| 230 FROM_HERE, | |
| 231 base::Bind(message_sent_callback_, "Unable to reach endpoint")); | |
| 232 return; | |
| 233 } | |
| 234 } | 253 } |
| 235 to_send_.push_back(make_linked_ptr(packet_info)); | 254 to_send_.push_back(make_linked_ptr(packet_info)); |
| 236 MaybeSendMessage(); | 255 MaybeSendMessage(); |
| 237 } | 256 } |
| 238 | 257 |
| 239 void MCSClient::Destroy() { | 258 void MCSClient::Destroy() { |
| 240 rmq_store_->Destroy(base::Bind(&MCSClient::OnRMQUpdateFinished, | 259 rmq_store_->Destroy(base::Bind(&MCSClient::OnRMQUpdateFinished, |
| 241 weak_ptr_factory_.GetWeakPtr())); | 260 weak_ptr_factory_.GetWeakPtr())); |
| 242 } | 261 } |
| 243 | 262 |
| 244 void MCSClient::ResetStateAndBuildLoginRequest( | 263 void MCSClient::ResetStateAndBuildLoginRequest( |
| 245 mcs_proto::LoginRequest* request) { | 264 mcs_proto::LoginRequest* request) { |
| 246 DCHECK(android_id_); | 265 DCHECK(android_id_); |
| 247 DCHECK(security_token_); | 266 DCHECK(security_token_); |
| 248 stream_id_in_ = 0; | 267 stream_id_in_ = 0; |
| 249 stream_id_out_ = 1; | 268 stream_id_out_ = 1; |
| 250 last_device_to_server_stream_id_received_ = 0; | 269 last_device_to_server_stream_id_received_ = 0; |
| 251 last_server_to_device_stream_id_received_ = 0; | 270 last_server_to_device_stream_id_received_ = 0; |
| 252 | 271 |
| 253 heartbeat_manager_.Stop(); | 272 heartbeat_manager_.Stop(); |
| 254 | 273 |
| 255 // TODO(zea): expire all messages older than their TTL. | |
| 256 | |
| 257 // Add any pending acknowledgments to the list of ids. | 274 // Add any pending acknowledgments to the list of ids. |
| 258 for (StreamIdToPersistentIdMap::const_iterator iter = | 275 for (StreamIdToPersistentIdMap::const_iterator iter = |
| 259 unacked_server_ids_.begin(); | 276 unacked_server_ids_.begin(); |
| 260 iter != unacked_server_ids_.end(); ++iter) { | 277 iter != unacked_server_ids_.end(); ++iter) { |
| 261 restored_unackeds_server_ids_.push_back(iter->second); | 278 restored_unackeds_server_ids_.push_back(iter->second); |
| 262 } | 279 } |
| 263 unacked_server_ids_.clear(); | 280 unacked_server_ids_.clear(); |
| 264 | 281 |
| 265 // Any acknowledged server ids which have not been confirmed by the server | 282 // Any acknowledged server ids which have not been confirmed by the server |
| 266 // are treated like unacknowledged ids. | 283 // are treated like unacknowledged ids. |
| (...skipping 16 matching lines...) Expand all Loading... | |
| 283 acked_server_ids_[stream_id_out_] = restored_unackeds_server_ids_; | 300 acked_server_ids_[stream_id_out_] = restored_unackeds_server_ids_; |
| 284 restored_unackeds_server_ids_.clear(); | 301 restored_unackeds_server_ids_.clear(); |
| 285 | 302 |
| 286 // Push all unacknowledged messages to front of send queue. No need to save | 303 // Push all unacknowledged messages to front of send queue. No need to save |
| 287 // to RMQ, as all messages that reach this point should already have been | 304 // to RMQ, as all messages that reach this point should already have been |
| 288 // saved as necessary. | 305 // saved as necessary. |
| 289 while (!to_resend_.empty()) { | 306 while (!to_resend_.empty()) { |
| 290 to_send_.push_front(to_resend_.back()); | 307 to_send_.push_front(to_resend_.back()); |
| 291 to_resend_.pop_back(); | 308 to_resend_.pop_back(); |
| 292 } | 309 } |
| 310 | |
| 311 // Drop all TTL == 0 or expired TTL messages from the queue. | |
| 312 std::deque<MCSPacketInternal> new_to_send; | |
| 313 std::vector<PersistentId> dropped_ids; | |
| 314 while (!to_send_.empty()) { | |
| 315 MCSPacketInternal packet = to_send_.front(); | |
| 316 to_send_.pop_front(); | |
| 317 if (GetTTL(*packet->protobuf) > 0 && | |
| 318 !HasTTLExpired(*packet->protobuf, clock_)) { | |
| 319 new_to_send.push_back(packet); | |
| 320 } else { | |
| 321 if (!packet->persistent_id.empty()) | |
| 322 dropped_ids.push_back(packet->persistent_id); | |
|
jianli
2014/01/02 18:52:58
If TTL is 0, we do not give a persisten ID as in S
Nicolas Zea
2014/01/02 21:39:08
Right, but those ids weren't stored in the RMQStor
jianli
2014/01/02 21:51:31
But the comment at line 311 says: Drop all TTL ==
Nicolas Zea
2014/01/02 21:56:13
We are handling the case by removing those message
| |
| 323 message_sent_callback_.Run("TTL expired"); | |
| 324 } | |
| 325 } | |
| 326 | |
| 327 if (!dropped_ids.empty()) { | |
| 328 DVLOG(1) << "Connection reset, " << dropped_ids.size() | |
| 329 << " messages expired."; | |
| 330 rmq_store_->RemoveOutgoingMessages( | |
| 331 dropped_ids, | |
| 332 base::Bind(&MCSClient::OnRMQUpdateFinished, | |
| 333 weak_ptr_factory_.GetWeakPtr())); | |
| 334 } | |
| 335 | |
| 336 to_send_.swap(new_to_send); | |
| 337 | |
| 293 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size() | 338 DVLOG(1) << "Resetting state, with " << request->received_persistent_id_size() |
| 294 << " incoming acks pending, and " << to_send_.size() | 339 << " incoming acks pending, and " << to_send_.size() |
| 295 << " pending outgoing messages."; | 340 << " pending outgoing messages."; |
| 296 | 341 |
| 297 state_ = CONNECTING; | 342 state_ = CONNECTING; |
| 298 } | 343 } |
| 299 | 344 |
| 300 void MCSClient::SendHeartbeat() { | 345 void MCSClient::SendHeartbeat() { |
| 301 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing()), | 346 SendMessage(MCSMessage(kHeartbeatPingTag, mcs_proto::HeartbeatPing())); |
| 302 false); | |
| 303 } | 347 } |
| 304 | 348 |
| 305 void MCSClient::OnRMQUpdateFinished(bool success) { | 349 void MCSClient::OnRMQUpdateFinished(bool success) { |
| 306 LOG_IF(ERROR, !success) << "RMQ Update failed!"; | 350 LOG_IF(ERROR, !success) << "RMQ Update failed!"; |
| 307 // TODO(zea): Rebuild the store from scratch in case of persistence failure? | 351 // TODO(zea): Rebuild the store from scratch in case of persistence failure? |
| 308 } | 352 } |
| 309 | 353 |
| 310 void MCSClient::MaybeSendMessage() { | 354 void MCSClient::MaybeSendMessage() { |
| 311 if (to_send_.empty()) | 355 if (to_send_.empty()) |
| 312 return; | 356 return; |
| 313 | 357 |
| 314 if (!connection_handler_->CanSendMessage()) | 358 // If the connection has been reset, do nothing. On reconnection |
| 359 // MaybeSendMessage will be automatically invoked again. | |
| 360 // TODO(zea): consider doing TTL expiration at connection reset time, rather | |
| 361 // than reconnect time. | |
| 362 if (!connection_factory_->IsEndpointReachable()) | |
| 315 return; | 363 return; |
| 316 | 364 |
| 317 // TODO(zea): drop messages older than their TTL. | |
| 318 | |
| 319 DVLOG(1) << "Pending output message found, sending."; | |
| 320 MCSPacketInternal packet = to_send_.front(); | 365 MCSPacketInternal packet = to_send_.front(); |
| 321 to_send_.pop_front(); | 366 to_send_.pop_front(); |
| 367 if (HasTTLExpired(*packet->protobuf, clock_)) { | |
| 368 DCHECK(packet->persistent_id.empty()); | |
| 369 DVLOG(1) << "Dropping expired message " << packet->persistent_id << "."; | |
| 370 message_sent_callback_.Run("TTL expired for " + packet->persistent_id); | |
| 371 rmq_store_->RemoveOutgoingMessage( | |
| 372 packet->persistent_id, | |
| 373 base::Bind(&MCSClient::OnRMQUpdateFinished, | |
| 374 weak_ptr_factory_.GetWeakPtr())); | |
| 375 base::MessageLoop::current()->PostTask( | |
| 376 FROM_HERE, | |
| 377 base::Bind(&MCSClient::MaybeSendMessage, | |
| 378 weak_ptr_factory_.GetWeakPtr())); | |
| 379 return; | |
| 380 } | |
| 381 DVLOG(1) << "Pending output message found, sending."; | |
| 322 if (!packet->persistent_id.empty()) | 382 if (!packet->persistent_id.empty()) |
| 323 to_resend_.push_back(packet); | 383 to_resend_.push_back(packet); |
| 324 SendPacketToWire(packet.get()); | 384 SendPacketToWire(packet.get()); |
| 325 } | 385 } |
| 326 | 386 |
| 327 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) { | 387 void MCSClient::SendPacketToWire(ReliablePacketInfo* packet_info) { |
| 328 packet_info->stream_id = ++stream_id_out_; | 388 packet_info->stream_id = ++stream_id_out_; |
| 329 DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName(); | 389 DVLOG(1) << "Sending packet of type " << packet_info->protobuf->GetTypeName(); |
| 330 | 390 |
| 331 // Set the proper last received stream id to acknowledge received server | 391 // Set the proper last received stream id to acknowledge received server |
| (...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 372 data.set_key(kIdleNotification); | 432 data.set_key(kIdleNotification); |
| 373 data.set_value("false"); | 433 data.set_value("false"); |
| 374 response->add_app_data()->CopyFrom(data); | 434 response->add_app_data()->CopyFrom(data); |
| 375 response->set_category(kMCSCategory); | 435 response->set_category(kMCSCategory); |
| 376 } | 436 } |
| 377 } | 437 } |
| 378 | 438 |
| 379 if (send) { | 439 if (send) { |
| 380 SendMessage( | 440 SendMessage( |
| 381 MCSMessage(kDataMessageStanzaTag, | 441 MCSMessage(kDataMessageStanzaTag, |
| 382 response.PassAs<const google::protobuf::MessageLite>()), | 442 response.PassAs<const google::protobuf::MessageLite>())); |
| 383 false); | |
| 384 } | 443 } |
| 385 } | 444 } |
| 386 | 445 |
| 387 void MCSClient::HandlePacketFromWire( | 446 void MCSClient::HandlePacketFromWire( |
| 388 scoped_ptr<google::protobuf::MessageLite> protobuf) { | 447 scoped_ptr<google::protobuf::MessageLite> protobuf) { |
| 389 if (!protobuf.get()) | 448 if (!protobuf.get()) |
| 390 return; | 449 return; |
| 391 uint8 tag = GetMCSProtoTag(*protobuf); | 450 uint8 tag = GetMCSProtoTag(*protobuf); |
| 392 PersistentId persistent_id = GetPersistentId(*protobuf); | 451 PersistentId persistent_id = GetPersistentId(*protobuf); |
| 393 StreamId last_stream_id_received = GetLastStreamIdReceived(*protobuf); | 452 StreamId last_stream_id_received = GetLastStreamIdReceived(*protobuf); |
| (...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 427 DVLOG(1) << "Received message of type " << protobuf->GetTypeName() | 486 DVLOG(1) << "Received message of type " << protobuf->GetTypeName() |
| 428 << " with persistent id " | 487 << " with persistent id " |
| 429 << (persistent_id.empty() ? "NULL" : persistent_id) | 488 << (persistent_id.empty() ? "NULL" : persistent_id) |
| 430 << ", stream id " << stream_id_in_ << " and last stream id received " | 489 << ", stream id " << stream_id_in_ << " and last stream id received " |
| 431 << last_stream_id_received; | 490 << last_stream_id_received; |
| 432 | 491 |
| 433 if (unacked_server_ids_.size() > 0 && | 492 if (unacked_server_ids_.size() > 0 && |
| 434 unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) { | 493 unacked_server_ids_.size() % kUnackedMessageBeforeStreamAck == 0) { |
| 435 SendMessage(MCSMessage(kIqStanzaTag, | 494 SendMessage(MCSMessage(kIqStanzaTag, |
| 436 BuildStreamAck(). | 495 BuildStreamAck(). |
| 437 PassAs<const google::protobuf::MessageLite>()), | 496 PassAs<const google::protobuf::MessageLite>())); |
| 438 false); | |
| 439 } | 497 } |
| 440 | 498 |
| 441 // The connection is alive, treat this message as a heartbeat ack. | 499 // The connection is alive, treat this message as a heartbeat ack. |
| 442 heartbeat_manager_.OnHeartbeatAcked(); | 500 heartbeat_manager_.OnHeartbeatAcked(); |
| 443 | 501 |
| 444 switch (tag) { | 502 switch (tag) { |
| 445 case kLoginResponseTag: { | 503 case kLoginResponseTag: { |
| 446 DCHECK_EQ(CONNECTING, state_); | 504 DCHECK_EQ(CONNECTING, state_); |
| 447 mcs_proto::LoginResponse* login_response = | 505 mcs_proto::LoginResponse* login_response = |
| 448 reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get()); | 506 reinterpret_cast<mcs_proto::LoginResponse*>(protobuf.get()); |
| (...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 486 base::Bind(&MCSClient::SendHeartbeat, | 544 base::Bind(&MCSClient::SendHeartbeat, |
| 487 weak_ptr_factory_.GetWeakPtr()), | 545 weak_ptr_factory_.GetWeakPtr()), |
| 488 base::Bind(&MCSClient::OnConnectionResetByHeartbeat, | 546 base::Bind(&MCSClient::OnConnectionResetByHeartbeat, |
| 489 weak_ptr_factory_.GetWeakPtr())); | 547 weak_ptr_factory_.GetWeakPtr())); |
| 490 return; | 548 return; |
| 491 } | 549 } |
| 492 case kHeartbeatPingTag: | 550 case kHeartbeatPingTag: |
| 493 DCHECK_GE(stream_id_in_, 1U); | 551 DCHECK_GE(stream_id_in_, 1U); |
| 494 DVLOG(1) << "Received heartbeat ping, sending ack."; | 552 DVLOG(1) << "Received heartbeat ping, sending ack."; |
| 495 SendMessage( | 553 SendMessage( |
| 496 MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck()), false); | 554 MCSMessage(kHeartbeatAckTag, mcs_proto::HeartbeatAck())); |
| 497 return; | 555 return; |
| 498 case kHeartbeatAckTag: | 556 case kHeartbeatAckTag: |
| 499 DCHECK_GE(stream_id_in_, 1U); | 557 DCHECK_GE(stream_id_in_, 1U); |
| 500 DVLOG(1) << "Received heartbeat ack."; | 558 DVLOG(1) << "Received heartbeat ack."; |
| 501 // Do nothing else, all messages act as heartbeat acks. | 559 // Do nothing else, all messages act as heartbeat acks. |
| 502 return; | 560 return; |
| 503 case kCloseTag: | 561 case kCloseTag: |
| 504 LOG(ERROR) << "Received close command, resetting connection."; | 562 LOG(ERROR) << "Received close command, resetting connection."; |
| 505 state_ = LOADED; | 563 state_ = LOADED; |
| 506 connection_factory_->SignalConnectionReset(); | 564 connection_factory_->SignalConnectionReset(); |
| (...skipping 147 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 654 | 712 |
| 655 MCSClient::PersistentId MCSClient::GetNextPersistentId() { | 713 MCSClient::PersistentId MCSClient::GetNextPersistentId() { |
| 656 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue()); | 714 return base::Uint64ToString(base::TimeTicks::Now().ToInternalValue()); |
| 657 } | 715 } |
| 658 | 716 |
| 659 void MCSClient::OnConnectionResetByHeartbeat() { | 717 void MCSClient::OnConnectionResetByHeartbeat() { |
| 660 connection_factory_->SignalConnectionReset(); | 718 connection_factory_->SignalConnectionReset(); |
| 661 } | 719 } |
| 662 | 720 |
| 663 } // namespace gcm | 721 } // namespace gcm |
| OLD | NEW |