| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "google_apis/gcm/engine/connection_handler_impl.h" | 5 #include "google_apis/gcm/engine/connection_handler_impl.h" |
| 6 | 6 |
| 7 #include "base/message_loop/message_loop.h" | 7 #include "base/location.h" |
| 8 #include "base/thread_task_runner_handle.h" |
| 8 #include "google/protobuf/io/coded_stream.h" | 9 #include "google/protobuf/io/coded_stream.h" |
| 9 #include "google/protobuf/io/zero_copy_stream_impl_lite.h" | 10 #include "google/protobuf/io/zero_copy_stream_impl_lite.h" |
| 10 #include "google_apis/gcm/base/mcs_util.h" | 11 #include "google_apis/gcm/base/mcs_util.h" |
| 11 #include "google_apis/gcm/base/socket_stream.h" | 12 #include "google_apis/gcm/base/socket_stream.h" |
| 12 #include "google_apis/gcm/protocol/mcs.pb.h" | 13 #include "google_apis/gcm/protocol/mcs.pb.h" |
| 13 #include "net/base/net_errors.h" | 14 #include "net/base/net_errors.h" |
| 14 #include "net/socket/stream_socket.h" | 15 #include "net/socket/stream_socket.h" |
| 15 | 16 |
| 16 using namespace google::protobuf::io; | 17 using namespace google::protobuf::io; |
| 17 | 18 |
| (...skipping 104 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 122 CodedOutputStream coded_output_stream(output_stream_.get()); | 123 CodedOutputStream coded_output_stream(output_stream_.get()); |
| 123 coded_output_stream.WriteRaw(version_byte, 1); | 124 coded_output_stream.WriteRaw(version_byte, 1); |
| 124 coded_output_stream.WriteRaw(login_request_tag, 1); | 125 coded_output_stream.WriteRaw(login_request_tag, 1); |
| 125 coded_output_stream.WriteVarint32(login_request.ByteSize()); | 126 coded_output_stream.WriteVarint32(login_request.ByteSize()); |
| 126 login_request.SerializeToCodedStream(&coded_output_stream); | 127 login_request.SerializeToCodedStream(&coded_output_stream); |
| 127 } | 128 } |
| 128 | 129 |
| 129 if (output_stream_->Flush( | 130 if (output_stream_->Flush( |
| 130 base::Bind(&ConnectionHandlerImpl::OnMessageSent, | 131 base::Bind(&ConnectionHandlerImpl::OnMessageSent, |
| 131 weak_ptr_factory_.GetWeakPtr())) != net::ERR_IO_PENDING) { | 132 weak_ptr_factory_.GetWeakPtr())) != net::ERR_IO_PENDING) { |
| 132 base::MessageLoop::current()->PostTask( | 133 base::ThreadTaskRunnerHandle::Get()->PostTask( |
| 133 FROM_HERE, | 134 FROM_HERE, |
| 134 base::Bind(&ConnectionHandlerImpl::OnMessageSent, | 135 base::Bind(&ConnectionHandlerImpl::OnMessageSent, |
| 135 weak_ptr_factory_.GetWeakPtr())); | 136 weak_ptr_factory_.GetWeakPtr())); |
| 136 } | 137 } |
| 137 | 138 |
| 138 read_timeout_timer_.Start(FROM_HERE, | 139 read_timeout_timer_.Start(FROM_HERE, |
| 139 read_timeout_, | 140 read_timeout_, |
| 140 base::Bind(&ConnectionHandlerImpl::OnTimeout, | 141 base::Bind(&ConnectionHandlerImpl::OnTimeout, |
| 141 weak_ptr_factory_.GetWeakPtr())); | 142 weak_ptr_factory_.GetWeakPtr())); |
| 142 WaitForData(MCS_VERSION_TAG_AND_SIZE); | 143 WaitForData(MCS_VERSION_TAG_AND_SIZE); |
| (...skipping 109 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 252 connection_callback_.Run(last_error); | 253 connection_callback_.Run(last_error); |
| 253 return; | 254 return; |
| 254 } | 255 } |
| 255 | 256 |
| 256 // Check whether read is complete, or needs to be continued ( | 257 // Check whether read is complete, or needs to be continued ( |
| 257 // SocketInputStream::Refresh can finish without reading all the data). | 258 // SocketInputStream::Refresh can finish without reading all the data). |
| 258 if (input_stream_->UnreadByteCount() < min_bytes_needed) { | 259 if (input_stream_->UnreadByteCount() < min_bytes_needed) { |
| 259 DVLOG(1) << "Socket read finished prematurely. Waiting for " | 260 DVLOG(1) << "Socket read finished prematurely. Waiting for " |
| 260 << min_bytes_needed - input_stream_->UnreadByteCount() | 261 << min_bytes_needed - input_stream_->UnreadByteCount() |
| 261 << " more bytes."; | 262 << " more bytes."; |
| 262 base::MessageLoop::current()->PostTask( | 263 base::ThreadTaskRunnerHandle::Get()->PostTask( |
| 263 FROM_HERE, | 264 FROM_HERE, |
| 264 base::Bind(&ConnectionHandlerImpl::WaitForData, | 265 base::Bind(&ConnectionHandlerImpl::WaitForData, |
| 265 weak_ptr_factory_.GetWeakPtr(), | 266 weak_ptr_factory_.GetWeakPtr(), |
| 266 MCS_PROTO_BYTES)); | 267 MCS_PROTO_BYTES)); |
| 267 return; | 268 return; |
| 268 } | 269 } |
| 269 | 270 |
| 270 // Received enough bytes, process them. | 271 // Received enough bytes, process them. |
| 271 DVLOG(1) << "Processing MCS data: state == " << state; | 272 DVLOG(1) << "Processing MCS data: state == " << state; |
| 272 switch(state) { | 273 switch(state) { |
| (...skipping 94 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 367 OnGotMessageBytes(); | 368 OnGotMessageBytes(); |
| 368 } | 369 } |
| 369 | 370 |
| 370 void ConnectionHandlerImpl::OnGotMessageBytes() { | 371 void ConnectionHandlerImpl::OnGotMessageBytes() { |
| 371 read_timeout_timer_.Stop(); | 372 read_timeout_timer_.Stop(); |
| 372 scoped_ptr<google::protobuf::MessageLite> protobuf( | 373 scoped_ptr<google::protobuf::MessageLite> protobuf( |
| 373 BuildProtobufFromTag(message_tag_)); | 374 BuildProtobufFromTag(message_tag_)); |
| 374 // Messages with no content are valid; just use the default protobuf for | 375 // Messages with no content are valid; just use the default protobuf for |
| 375 // that tag. | 376 // that tag. |
| 376 if (protobuf.get() && message_size_ == 0) { | 377 if (protobuf.get() && message_size_ == 0) { |
| 377 base::MessageLoop::current()->PostTask( | 378 base::ThreadTaskRunnerHandle::Get()->PostTask( |
| 378 FROM_HERE, | 379 FROM_HERE, |
| 379 base::Bind(&ConnectionHandlerImpl::GetNextMessage, | 380 base::Bind(&ConnectionHandlerImpl::GetNextMessage, |
| 380 weak_ptr_factory_.GetWeakPtr())); | 381 weak_ptr_factory_.GetWeakPtr())); |
| 381 read_callback_.Run(protobuf.Pass()); | 382 read_callback_.Run(protobuf.Pass()); |
| 382 return; | 383 return; |
| 383 } | 384 } |
| 384 | 385 |
| 385 if (input_stream_->GetState() != SocketInputStream::READY) { | 386 if (input_stream_->GetState() != SocketInputStream::READY) { |
| 386 LOG(ERROR) << "Failed to extract protobuf bytes of type " | 387 LOG(ERROR) << "Failed to extract protobuf bytes of type " |
| 387 << static_cast<unsigned int>(message_tag_); | 388 << static_cast<unsigned int>(message_tag_); |
| (...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 437 read_timeout_timer_.Start(FROM_HERE, | 438 read_timeout_timer_.Start(FROM_HERE, |
| 438 read_timeout_, | 439 read_timeout_, |
| 439 base::Bind(&ConnectionHandlerImpl::OnTimeout, | 440 base::Bind(&ConnectionHandlerImpl::OnTimeout, |
| 440 weak_ptr_factory_.GetWeakPtr())); | 441 weak_ptr_factory_.GetWeakPtr())); |
| 441 WaitForData(MCS_PROTO_BYTES); | 442 WaitForData(MCS_PROTO_BYTES); |
| 442 return; | 443 return; |
| 443 } | 444 } |
| 444 } | 445 } |
| 445 | 446 |
| 446 input_stream_->RebuildBuffer(); | 447 input_stream_->RebuildBuffer(); |
| 447 base::MessageLoop::current()->PostTask( | 448 base::ThreadTaskRunnerHandle::Get()->PostTask( |
| 448 FROM_HERE, | 449 FROM_HERE, |
| 449 base::Bind(&ConnectionHandlerImpl::GetNextMessage, | 450 base::Bind(&ConnectionHandlerImpl::GetNextMessage, |
| 450 weak_ptr_factory_.GetWeakPtr())); | 451 weak_ptr_factory_.GetWeakPtr())); |
| 451 if (message_tag_ == kLoginResponseTag) { | 452 if (message_tag_ == kLoginResponseTag) { |
| 452 if (handshake_complete_) { | 453 if (handshake_complete_) { |
| 453 LOG(ERROR) << "Unexpected login response."; | 454 LOG(ERROR) << "Unexpected login response."; |
| 454 } else { | 455 } else { |
| 455 handshake_complete_ = true; | 456 handshake_complete_ = true; |
| 456 DVLOG(1) << "GCM Handshake complete."; | 457 DVLOG(1) << "GCM Handshake complete."; |
| 457 connection_callback_.Run(net::OK); | 458 connection_callback_.Run(net::OK); |
| (...skipping 18 matching lines...) Expand all Loading... |
| 476 message_tag_ = 0; | 477 message_tag_ = 0; |
| 477 message_size_ = 0; | 478 message_size_ = 0; |
| 478 size_packet_so_far_ = 0; | 479 size_packet_so_far_ = 0; |
| 479 payload_input_buffer_.clear(); | 480 payload_input_buffer_.clear(); |
| 480 input_stream_.reset(); | 481 input_stream_.reset(); |
| 481 output_stream_.reset(); | 482 output_stream_.reset(); |
| 482 weak_ptr_factory_.InvalidateWeakPtrs(); | 483 weak_ptr_factory_.InvalidateWeakPtrs(); |
| 483 } | 484 } |
| 484 | 485 |
| 485 } // namespace gcm | 486 } // namespace gcm |
| OLD | NEW |