Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(34)

Side by Side Diff: google_apis/gcm/engine/connection_handler_impl.cc

Issue 1232193002: Replace MessageLoop::current() with ThreadTaskRunnerHandle::Get() in GCM (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2013 The Chromium Authors. All rights reserved. 1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "google_apis/gcm/engine/connection_handler_impl.h" 5 #include "google_apis/gcm/engine/connection_handler_impl.h"
6 6
7 #include "base/message_loop/message_loop.h" 7 #include "base/location.h"
8 #include "base/thread_task_runner_handle.h"
8 #include "google/protobuf/io/coded_stream.h" 9 #include "google/protobuf/io/coded_stream.h"
9 #include "google/protobuf/io/zero_copy_stream_impl_lite.h" 10 #include "google/protobuf/io/zero_copy_stream_impl_lite.h"
10 #include "google_apis/gcm/base/mcs_util.h" 11 #include "google_apis/gcm/base/mcs_util.h"
11 #include "google_apis/gcm/base/socket_stream.h" 12 #include "google_apis/gcm/base/socket_stream.h"
12 #include "google_apis/gcm/protocol/mcs.pb.h" 13 #include "google_apis/gcm/protocol/mcs.pb.h"
13 #include "net/base/net_errors.h" 14 #include "net/base/net_errors.h"
14 #include "net/socket/stream_socket.h" 15 #include "net/socket/stream_socket.h"
15 16
16 using namespace google::protobuf::io; 17 using namespace google::protobuf::io;
17 18
(...skipping 104 matching lines...) Expand 10 before | Expand all | Expand 10 after
122 CodedOutputStream coded_output_stream(output_stream_.get()); 123 CodedOutputStream coded_output_stream(output_stream_.get());
123 coded_output_stream.WriteRaw(version_byte, 1); 124 coded_output_stream.WriteRaw(version_byte, 1);
124 coded_output_stream.WriteRaw(login_request_tag, 1); 125 coded_output_stream.WriteRaw(login_request_tag, 1);
125 coded_output_stream.WriteVarint32(login_request.ByteSize()); 126 coded_output_stream.WriteVarint32(login_request.ByteSize());
126 login_request.SerializeToCodedStream(&coded_output_stream); 127 login_request.SerializeToCodedStream(&coded_output_stream);
127 } 128 }
128 129
129 if (output_stream_->Flush( 130 if (output_stream_->Flush(
130 base::Bind(&ConnectionHandlerImpl::OnMessageSent, 131 base::Bind(&ConnectionHandlerImpl::OnMessageSent,
131 weak_ptr_factory_.GetWeakPtr())) != net::ERR_IO_PENDING) { 132 weak_ptr_factory_.GetWeakPtr())) != net::ERR_IO_PENDING) {
132 base::MessageLoop::current()->PostTask( 133 base::ThreadTaskRunnerHandle::Get()->PostTask(
133 FROM_HERE, 134 FROM_HERE,
134 base::Bind(&ConnectionHandlerImpl::OnMessageSent, 135 base::Bind(&ConnectionHandlerImpl::OnMessageSent,
135 weak_ptr_factory_.GetWeakPtr())); 136 weak_ptr_factory_.GetWeakPtr()));
136 } 137 }
137 138
138 read_timeout_timer_.Start(FROM_HERE, 139 read_timeout_timer_.Start(FROM_HERE,
139 read_timeout_, 140 read_timeout_,
140 base::Bind(&ConnectionHandlerImpl::OnTimeout, 141 base::Bind(&ConnectionHandlerImpl::OnTimeout,
141 weak_ptr_factory_.GetWeakPtr())); 142 weak_ptr_factory_.GetWeakPtr()));
142 WaitForData(MCS_VERSION_TAG_AND_SIZE); 143 WaitForData(MCS_VERSION_TAG_AND_SIZE);
(...skipping 109 matching lines...) Expand 10 before | Expand all | Expand 10 after
252 connection_callback_.Run(last_error); 253 connection_callback_.Run(last_error);
253 return; 254 return;
254 } 255 }
255 256
256 // Check whether read is complete, or needs to be continued ( 257 // Check whether read is complete, or needs to be continued (
257 // SocketInputStream::Refresh can finish without reading all the data). 258 // SocketInputStream::Refresh can finish without reading all the data).
258 if (input_stream_->UnreadByteCount() < min_bytes_needed) { 259 if (input_stream_->UnreadByteCount() < min_bytes_needed) {
259 DVLOG(1) << "Socket read finished prematurely. Waiting for " 260 DVLOG(1) << "Socket read finished prematurely. Waiting for "
260 << min_bytes_needed - input_stream_->UnreadByteCount() 261 << min_bytes_needed - input_stream_->UnreadByteCount()
261 << " more bytes."; 262 << " more bytes.";
262 base::MessageLoop::current()->PostTask( 263 base::ThreadTaskRunnerHandle::Get()->PostTask(
263 FROM_HERE, 264 FROM_HERE,
264 base::Bind(&ConnectionHandlerImpl::WaitForData, 265 base::Bind(&ConnectionHandlerImpl::WaitForData,
265 weak_ptr_factory_.GetWeakPtr(), 266 weak_ptr_factory_.GetWeakPtr(),
266 MCS_PROTO_BYTES)); 267 MCS_PROTO_BYTES));
267 return; 268 return;
268 } 269 }
269 270
270 // Received enough bytes, process them. 271 // Received enough bytes, process them.
271 DVLOG(1) << "Processing MCS data: state == " << state; 272 DVLOG(1) << "Processing MCS data: state == " << state;
272 switch(state) { 273 switch(state) {
(...skipping 94 matching lines...) Expand 10 before | Expand all | Expand 10 after
367 OnGotMessageBytes(); 368 OnGotMessageBytes();
368 } 369 }
369 370
370 void ConnectionHandlerImpl::OnGotMessageBytes() { 371 void ConnectionHandlerImpl::OnGotMessageBytes() {
371 read_timeout_timer_.Stop(); 372 read_timeout_timer_.Stop();
372 scoped_ptr<google::protobuf::MessageLite> protobuf( 373 scoped_ptr<google::protobuf::MessageLite> protobuf(
373 BuildProtobufFromTag(message_tag_)); 374 BuildProtobufFromTag(message_tag_));
374 // Messages with no content are valid; just use the default protobuf for 375 // Messages with no content are valid; just use the default protobuf for
375 // that tag. 376 // that tag.
376 if (protobuf.get() && message_size_ == 0) { 377 if (protobuf.get() && message_size_ == 0) {
377 base::MessageLoop::current()->PostTask( 378 base::ThreadTaskRunnerHandle::Get()->PostTask(
378 FROM_HERE, 379 FROM_HERE,
379 base::Bind(&ConnectionHandlerImpl::GetNextMessage, 380 base::Bind(&ConnectionHandlerImpl::GetNextMessage,
380 weak_ptr_factory_.GetWeakPtr())); 381 weak_ptr_factory_.GetWeakPtr()));
381 read_callback_.Run(protobuf.Pass()); 382 read_callback_.Run(protobuf.Pass());
382 return; 383 return;
383 } 384 }
384 385
385 if (input_stream_->GetState() != SocketInputStream::READY) { 386 if (input_stream_->GetState() != SocketInputStream::READY) {
386 LOG(ERROR) << "Failed to extract protobuf bytes of type " 387 LOG(ERROR) << "Failed to extract protobuf bytes of type "
387 << static_cast<unsigned int>(message_tag_); 388 << static_cast<unsigned int>(message_tag_);
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after
437 read_timeout_timer_.Start(FROM_HERE, 438 read_timeout_timer_.Start(FROM_HERE,
438 read_timeout_, 439 read_timeout_,
439 base::Bind(&ConnectionHandlerImpl::OnTimeout, 440 base::Bind(&ConnectionHandlerImpl::OnTimeout,
440 weak_ptr_factory_.GetWeakPtr())); 441 weak_ptr_factory_.GetWeakPtr()));
441 WaitForData(MCS_PROTO_BYTES); 442 WaitForData(MCS_PROTO_BYTES);
442 return; 443 return;
443 } 444 }
444 } 445 }
445 446
446 input_stream_->RebuildBuffer(); 447 input_stream_->RebuildBuffer();
447 base::MessageLoop::current()->PostTask( 448 base::ThreadTaskRunnerHandle::Get()->PostTask(
448 FROM_HERE, 449 FROM_HERE,
449 base::Bind(&ConnectionHandlerImpl::GetNextMessage, 450 base::Bind(&ConnectionHandlerImpl::GetNextMessage,
450 weak_ptr_factory_.GetWeakPtr())); 451 weak_ptr_factory_.GetWeakPtr()));
451 if (message_tag_ == kLoginResponseTag) { 452 if (message_tag_ == kLoginResponseTag) {
452 if (handshake_complete_) { 453 if (handshake_complete_) {
453 LOG(ERROR) << "Unexpected login response."; 454 LOG(ERROR) << "Unexpected login response.";
454 } else { 455 } else {
455 handshake_complete_ = true; 456 handshake_complete_ = true;
456 DVLOG(1) << "GCM Handshake complete."; 457 DVLOG(1) << "GCM Handshake complete.";
457 connection_callback_.Run(net::OK); 458 connection_callback_.Run(net::OK);
(...skipping 18 matching lines...) Expand all
476 message_tag_ = 0; 477 message_tag_ = 0;
477 message_size_ = 0; 478 message_size_ = 0;
478 size_packet_so_far_ = 0; 479 size_packet_so_far_ = 0;
479 payload_input_buffer_.clear(); 480 payload_input_buffer_.clear();
480 input_stream_.reset(); 481 input_stream_.reset();
481 output_stream_.reset(); 482 output_stream_.reset();
482 weak_ptr_factory_.InvalidateWeakPtrs(); 483 weak_ptr_factory_.InvalidateWeakPtrs();
483 } 484 }
484 485
485 } // namespace gcm 486 } // namespace gcm
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698