OLD | NEW |
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "google_apis/gcm/engine/connection_handler_impl.h" | 5 #include "google_apis/gcm/engine/connection_handler_impl.h" |
6 | 6 |
7 #include "base/message_loop/message_loop.h" | 7 #include "base/location.h" |
| 8 #include "base/thread_task_runner_handle.h" |
8 #include "google/protobuf/io/coded_stream.h" | 9 #include "google/protobuf/io/coded_stream.h" |
9 #include "google/protobuf/io/zero_copy_stream_impl_lite.h" | 10 #include "google/protobuf/io/zero_copy_stream_impl_lite.h" |
10 #include "google_apis/gcm/base/mcs_util.h" | 11 #include "google_apis/gcm/base/mcs_util.h" |
11 #include "google_apis/gcm/base/socket_stream.h" | 12 #include "google_apis/gcm/base/socket_stream.h" |
12 #include "google_apis/gcm/protocol/mcs.pb.h" | 13 #include "google_apis/gcm/protocol/mcs.pb.h" |
13 #include "net/base/net_errors.h" | 14 #include "net/base/net_errors.h" |
14 #include "net/socket/stream_socket.h" | 15 #include "net/socket/stream_socket.h" |
15 | 16 |
16 using namespace google::protobuf::io; | 17 using namespace google::protobuf::io; |
17 | 18 |
(...skipping 104 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
122 CodedOutputStream coded_output_stream(output_stream_.get()); | 123 CodedOutputStream coded_output_stream(output_stream_.get()); |
123 coded_output_stream.WriteRaw(version_byte, 1); | 124 coded_output_stream.WriteRaw(version_byte, 1); |
124 coded_output_stream.WriteRaw(login_request_tag, 1); | 125 coded_output_stream.WriteRaw(login_request_tag, 1); |
125 coded_output_stream.WriteVarint32(login_request.ByteSize()); | 126 coded_output_stream.WriteVarint32(login_request.ByteSize()); |
126 login_request.SerializeToCodedStream(&coded_output_stream); | 127 login_request.SerializeToCodedStream(&coded_output_stream); |
127 } | 128 } |
128 | 129 |
129 if (output_stream_->Flush( | 130 if (output_stream_->Flush( |
130 base::Bind(&ConnectionHandlerImpl::OnMessageSent, | 131 base::Bind(&ConnectionHandlerImpl::OnMessageSent, |
131 weak_ptr_factory_.GetWeakPtr())) != net::ERR_IO_PENDING) { | 132 weak_ptr_factory_.GetWeakPtr())) != net::ERR_IO_PENDING) { |
132 base::MessageLoop::current()->PostTask( | 133 base::ThreadTaskRunnerHandle::Get()->PostTask( |
133 FROM_HERE, | 134 FROM_HERE, |
134 base::Bind(&ConnectionHandlerImpl::OnMessageSent, | 135 base::Bind(&ConnectionHandlerImpl::OnMessageSent, |
135 weak_ptr_factory_.GetWeakPtr())); | 136 weak_ptr_factory_.GetWeakPtr())); |
136 } | 137 } |
137 | 138 |
138 read_timeout_timer_.Start(FROM_HERE, | 139 read_timeout_timer_.Start(FROM_HERE, |
139 read_timeout_, | 140 read_timeout_, |
140 base::Bind(&ConnectionHandlerImpl::OnTimeout, | 141 base::Bind(&ConnectionHandlerImpl::OnTimeout, |
141 weak_ptr_factory_.GetWeakPtr())); | 142 weak_ptr_factory_.GetWeakPtr())); |
142 WaitForData(MCS_VERSION_TAG_AND_SIZE); | 143 WaitForData(MCS_VERSION_TAG_AND_SIZE); |
(...skipping 109 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
252 connection_callback_.Run(last_error); | 253 connection_callback_.Run(last_error); |
253 return; | 254 return; |
254 } | 255 } |
255 | 256 |
256 // Check whether read is complete, or needs to be continued ( | 257 // Check whether read is complete, or needs to be continued ( |
257 // SocketInputStream::Refresh can finish without reading all the data). | 258 // SocketInputStream::Refresh can finish without reading all the data). |
258 if (input_stream_->UnreadByteCount() < min_bytes_needed) { | 259 if (input_stream_->UnreadByteCount() < min_bytes_needed) { |
259 DVLOG(1) << "Socket read finished prematurely. Waiting for " | 260 DVLOG(1) << "Socket read finished prematurely. Waiting for " |
260 << min_bytes_needed - input_stream_->UnreadByteCount() | 261 << min_bytes_needed - input_stream_->UnreadByteCount() |
261 << " more bytes."; | 262 << " more bytes."; |
262 base::MessageLoop::current()->PostTask( | 263 base::ThreadTaskRunnerHandle::Get()->PostTask( |
263 FROM_HERE, | 264 FROM_HERE, |
264 base::Bind(&ConnectionHandlerImpl::WaitForData, | 265 base::Bind(&ConnectionHandlerImpl::WaitForData, |
265 weak_ptr_factory_.GetWeakPtr(), | 266 weak_ptr_factory_.GetWeakPtr(), |
266 MCS_PROTO_BYTES)); | 267 MCS_PROTO_BYTES)); |
267 return; | 268 return; |
268 } | 269 } |
269 | 270 |
270 // Received enough bytes, process them. | 271 // Received enough bytes, process them. |
271 DVLOG(1) << "Processing MCS data: state == " << state; | 272 DVLOG(1) << "Processing MCS data: state == " << state; |
272 switch(state) { | 273 switch(state) { |
(...skipping 94 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
367 OnGotMessageBytes(); | 368 OnGotMessageBytes(); |
368 } | 369 } |
369 | 370 |
370 void ConnectionHandlerImpl::OnGotMessageBytes() { | 371 void ConnectionHandlerImpl::OnGotMessageBytes() { |
371 read_timeout_timer_.Stop(); | 372 read_timeout_timer_.Stop(); |
372 scoped_ptr<google::protobuf::MessageLite> protobuf( | 373 scoped_ptr<google::protobuf::MessageLite> protobuf( |
373 BuildProtobufFromTag(message_tag_)); | 374 BuildProtobufFromTag(message_tag_)); |
374 // Messages with no content are valid; just use the default protobuf for | 375 // Messages with no content are valid; just use the default protobuf for |
375 // that tag. | 376 // that tag. |
376 if (protobuf.get() && message_size_ == 0) { | 377 if (protobuf.get() && message_size_ == 0) { |
377 base::MessageLoop::current()->PostTask( | 378 base::ThreadTaskRunnerHandle::Get()->PostTask( |
378 FROM_HERE, | 379 FROM_HERE, |
379 base::Bind(&ConnectionHandlerImpl::GetNextMessage, | 380 base::Bind(&ConnectionHandlerImpl::GetNextMessage, |
380 weak_ptr_factory_.GetWeakPtr())); | 381 weak_ptr_factory_.GetWeakPtr())); |
381 read_callback_.Run(protobuf.Pass()); | 382 read_callback_.Run(protobuf.Pass()); |
382 return; | 383 return; |
383 } | 384 } |
384 | 385 |
385 if (input_stream_->GetState() != SocketInputStream::READY) { | 386 if (input_stream_->GetState() != SocketInputStream::READY) { |
386 LOG(ERROR) << "Failed to extract protobuf bytes of type " | 387 LOG(ERROR) << "Failed to extract protobuf bytes of type " |
387 << static_cast<unsigned int>(message_tag_); | 388 << static_cast<unsigned int>(message_tag_); |
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
437 read_timeout_timer_.Start(FROM_HERE, | 438 read_timeout_timer_.Start(FROM_HERE, |
438 read_timeout_, | 439 read_timeout_, |
439 base::Bind(&ConnectionHandlerImpl::OnTimeout, | 440 base::Bind(&ConnectionHandlerImpl::OnTimeout, |
440 weak_ptr_factory_.GetWeakPtr())); | 441 weak_ptr_factory_.GetWeakPtr())); |
441 WaitForData(MCS_PROTO_BYTES); | 442 WaitForData(MCS_PROTO_BYTES); |
442 return; | 443 return; |
443 } | 444 } |
444 } | 445 } |
445 | 446 |
446 input_stream_->RebuildBuffer(); | 447 input_stream_->RebuildBuffer(); |
447 base::MessageLoop::current()->PostTask( | 448 base::ThreadTaskRunnerHandle::Get()->PostTask( |
448 FROM_HERE, | 449 FROM_HERE, |
449 base::Bind(&ConnectionHandlerImpl::GetNextMessage, | 450 base::Bind(&ConnectionHandlerImpl::GetNextMessage, |
450 weak_ptr_factory_.GetWeakPtr())); | 451 weak_ptr_factory_.GetWeakPtr())); |
451 if (message_tag_ == kLoginResponseTag) { | 452 if (message_tag_ == kLoginResponseTag) { |
452 if (handshake_complete_) { | 453 if (handshake_complete_) { |
453 LOG(ERROR) << "Unexpected login response."; | 454 LOG(ERROR) << "Unexpected login response."; |
454 } else { | 455 } else { |
455 handshake_complete_ = true; | 456 handshake_complete_ = true; |
456 DVLOG(1) << "GCM Handshake complete."; | 457 DVLOG(1) << "GCM Handshake complete."; |
457 connection_callback_.Run(net::OK); | 458 connection_callback_.Run(net::OK); |
(...skipping 18 matching lines...) Expand all Loading... |
476 message_tag_ = 0; | 477 message_tag_ = 0; |
477 message_size_ = 0; | 478 message_size_ = 0; |
478 size_packet_so_far_ = 0; | 479 size_packet_so_far_ = 0; |
479 payload_input_buffer_.clear(); | 480 payload_input_buffer_.clear(); |
480 input_stream_.reset(); | 481 input_stream_.reset(); |
481 output_stream_.reset(); | 482 output_stream_.reset(); |
482 weak_ptr_factory_.InvalidateWeakPtrs(); | 483 weak_ptr_factory_.InvalidateWeakPtrs(); |
483 } | 484 } |
484 | 485 |
485 } // namespace gcm | 486 } // namespace gcm |
OLD | NEW |