| OLD | NEW |
| 1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "google_apis/gcm/engine/connection_handler.h" | 5 #include "google_apis/gcm/engine/connection_handler_impl.h" |
| 6 | 6 |
| 7 #include "base/message_loop/message_loop.h" | 7 #include "base/message_loop/message_loop.h" |
| 8 #include "google/protobuf/io/coded_stream.h" | 8 #include "google/protobuf/io/coded_stream.h" |
| 9 #include "google_apis/gcm/base/mcs_util.h" | 9 #include "google_apis/gcm/base/mcs_util.h" |
| 10 #include "google_apis/gcm/base/socket_stream.h" | 10 #include "google_apis/gcm/base/socket_stream.h" |
| 11 #include "google_apis/gcm/protocol/mcs.pb.h" |
| 11 #include "net/base/net_errors.h" | 12 #include "net/base/net_errors.h" |
| 12 #include "net/socket/stream_socket.h" | 13 #include "net/socket/stream_socket.h" |
| 13 | 14 |
| 14 using namespace google::protobuf::io; | 15 using namespace google::protobuf::io; |
| 15 | 16 |
| 16 namespace gcm { | 17 namespace gcm { |
| 17 | 18 |
| 18 namespace { | 19 namespace { |
| 19 | 20 |
| 20 // # of bytes a MCS version packet consumes. | 21 // # of bytes a MCS version packet consumes. |
| 21 const int kVersionPacketLen = 1; | 22 const int kVersionPacketLen = 1; |
| 22 // # of bytes a tag packet consumes. | 23 // # of bytes a tag packet consumes. |
| 23 const int kTagPacketLen = 1; | 24 const int kTagPacketLen = 1; |
| 24 // Max # of bytes a length packet consumes. | 25 // Max # of bytes a length packet consumes. |
| 25 const int kSizePacketLenMin = 1; | 26 const int kSizePacketLenMin = 1; |
| 26 const int kSizePacketLenMax = 2; | 27 const int kSizePacketLenMax = 2; |
| 27 | 28 |
| 28 // The current MCS protocol version. | 29 // The current MCS protocol version. |
| 30 // TODO(zea): bump to 41 once the server supports it. |
| 29 const int kMCSVersion = 38; | 31 const int kMCSVersion = 38; |
| 30 | 32 |
| 31 } // namespace | 33 } // namespace |
| 32 | 34 |
| 33 ConnectionHandler::ConnectionHandler(base::TimeDelta read_timeout) | 35 ConnectionHandlerImpl::ConnectionHandlerImpl( |
| 36 base::TimeDelta read_timeout, |
| 37 const ProtoReceivedCallback& read_callback, |
| 38 const ProtoSentCallback& write_callback, |
| 39 const ConnectionChangedCallback& connection_callback) |
| 34 : read_timeout_(read_timeout), | 40 : read_timeout_(read_timeout), |
| 35 handshake_complete_(false), | 41 handshake_complete_(false), |
| 36 message_tag_(0), | 42 message_tag_(0), |
| 37 message_size_(0), | 43 message_size_(0), |
| 44 read_callback_(read_callback), |
| 45 write_callback_(write_callback), |
| 46 connection_callback_(connection_callback), |
| 38 weak_ptr_factory_(this) { | 47 weak_ptr_factory_(this) { |
| 39 } | 48 } |
| 40 | 49 |
| 41 ConnectionHandler::~ConnectionHandler() { | 50 ConnectionHandlerImpl::~ConnectionHandlerImpl() { |
| 42 } | 51 } |
| 43 | 52 |
| 44 void ConnectionHandler::Init( | 53 void ConnectionHandlerImpl::Init( |
| 45 scoped_ptr<net::StreamSocket> socket, | 54 const mcs_proto::LoginRequest& login_request, |
| 46 const google::protobuf::MessageLite& login_request, | 55 scoped_ptr<net::StreamSocket> socket) { |
| 47 const ProtoReceivedCallback& read_callback, | 56 DCHECK(!read_callback_.is_null()); |
| 48 const ProtoSentCallback& write_callback, | 57 DCHECK(!write_callback_.is_null()); |
| 49 const ConnectionChangedCallback& connection_callback) { | 58 DCHECK(!connection_callback_.is_null()); |
| 50 DCHECK(!read_callback.is_null()); | |
| 51 DCHECK(!write_callback.is_null()); | |
| 52 DCHECK(!connection_callback.is_null()); | |
| 53 | 59 |
| 54 // Invalidate any previously outstanding reads. | 60 // Invalidate any previously outstanding reads. |
| 55 weak_ptr_factory_.InvalidateWeakPtrs(); | 61 weak_ptr_factory_.InvalidateWeakPtrs(); |
| 56 | 62 |
| 57 handshake_complete_ = false; | 63 handshake_complete_ = false; |
| 58 message_tag_ = 0; | 64 message_tag_ = 0; |
| 59 message_size_ = 0; | 65 message_size_ = 0; |
| 60 socket_ = socket.Pass(); | 66 socket_ = socket.Pass(); |
| 61 input_stream_.reset(new SocketInputStream(socket_.get())); | 67 input_stream_.reset(new SocketInputStream(socket_.get())); |
| 62 output_stream_.reset(new SocketOutputStream(socket_.get())); | 68 output_stream_.reset(new SocketOutputStream(socket_.get())); |
| 63 read_callback_ = read_callback; | |
| 64 write_callback_ = write_callback; | |
| 65 connection_callback_ = connection_callback; | |
| 66 | 69 |
| 67 Login(login_request); | 70 Login(login_request); |
| 68 } | 71 } |
| 69 | 72 |
| 70 bool ConnectionHandler::CanSendMessage() const { | 73 bool ConnectionHandlerImpl::CanSendMessage() const { |
| 71 return handshake_complete_ && output_stream_.get() && | 74 return handshake_complete_ && output_stream_.get() && |
| 72 output_stream_->GetState() == SocketOutputStream::EMPTY; | 75 output_stream_->GetState() == SocketOutputStream::EMPTY; |
| 73 } | 76 } |
| 74 | 77 |
| 75 void ConnectionHandler::SendMessage( | 78 void ConnectionHandlerImpl::SendMessage( |
| 76 const google::protobuf::MessageLite& message) { | 79 const google::protobuf::MessageLite& message) { |
| 77 DCHECK_EQ(output_stream_->GetState(), SocketOutputStream::EMPTY); | 80 DCHECK_EQ(output_stream_->GetState(), SocketOutputStream::EMPTY); |
| 78 DCHECK(handshake_complete_); | 81 DCHECK(handshake_complete_); |
| 79 | 82 |
| 80 { | 83 { |
| 81 CodedOutputStream coded_output_stream(output_stream_.get()); | 84 CodedOutputStream coded_output_stream(output_stream_.get()); |
| 82 DVLOG(1) << "Writing proto of size " << message.ByteSize(); | 85 DVLOG(1) << "Writing proto of size " << message.ByteSize(); |
| 83 int tag = GetMCSProtoTag(message); | 86 int tag = GetMCSProtoTag(message); |
| 84 DCHECK_NE(tag, -1); | 87 DCHECK_NE(tag, -1); |
| 85 coded_output_stream.WriteRaw(&tag, 1); | 88 coded_output_stream.WriteRaw(&tag, 1); |
| 86 coded_output_stream.WriteVarint32(message.ByteSize()); | 89 coded_output_stream.WriteVarint32(message.ByteSize()); |
| 87 message.SerializeToCodedStream(&coded_output_stream); | 90 message.SerializeToCodedStream(&coded_output_stream); |
| 88 } | 91 } |
| 89 | 92 |
| 90 if (output_stream_->Flush( | 93 if (output_stream_->Flush( |
| 91 base::Bind(&ConnectionHandler::OnMessageSent, | 94 base::Bind(&ConnectionHandlerImpl::OnMessageSent, |
| 92 weak_ptr_factory_.GetWeakPtr())) != net::ERR_IO_PENDING) { | 95 weak_ptr_factory_.GetWeakPtr())) != net::ERR_IO_PENDING) { |
| 93 OnMessageSent(); | 96 OnMessageSent(); |
| 94 } | 97 } |
| 95 } | 98 } |
| 96 | 99 |
| 97 void ConnectionHandler::Login( | 100 void ConnectionHandlerImpl::Login( |
| 98 const google::protobuf::MessageLite& login_request) { | 101 const google::protobuf::MessageLite& login_request) { |
| 99 DCHECK_EQ(output_stream_->GetState(), SocketOutputStream::EMPTY); | 102 DCHECK_EQ(output_stream_->GetState(), SocketOutputStream::EMPTY); |
| 100 | 103 |
| 101 const char version_byte[1] = {kMCSVersion}; | 104 const char version_byte[1] = {kMCSVersion}; |
| 102 const char login_request_tag[1] = {kLoginRequestTag}; | 105 const char login_request_tag[1] = {kLoginRequestTag}; |
| 103 { | 106 { |
| 104 CodedOutputStream coded_output_stream(output_stream_.get()); | 107 CodedOutputStream coded_output_stream(output_stream_.get()); |
| 105 coded_output_stream.WriteRaw(version_byte, 1); | 108 coded_output_stream.WriteRaw(version_byte, 1); |
| 106 coded_output_stream.WriteRaw(login_request_tag, 1); | 109 coded_output_stream.WriteRaw(login_request_tag, 1); |
| 107 coded_output_stream.WriteVarint32(login_request.ByteSize()); | 110 coded_output_stream.WriteVarint32(login_request.ByteSize()); |
| 108 login_request.SerializeToCodedStream(&coded_output_stream); | 111 login_request.SerializeToCodedStream(&coded_output_stream); |
| 109 } | 112 } |
| 110 | 113 |
| 111 if (output_stream_->Flush( | 114 if (output_stream_->Flush( |
| 112 base::Bind(&ConnectionHandler::OnMessageSent, | 115 base::Bind(&ConnectionHandlerImpl::OnMessageSent, |
| 113 weak_ptr_factory_.GetWeakPtr())) != net::ERR_IO_PENDING) { | 116 weak_ptr_factory_.GetWeakPtr())) != net::ERR_IO_PENDING) { |
| 114 base::MessageLoop::current()->PostTask( | 117 base::MessageLoop::current()->PostTask( |
| 115 FROM_HERE, | 118 FROM_HERE, |
| 116 base::Bind(&ConnectionHandler::OnMessageSent, | 119 base::Bind(&ConnectionHandlerImpl::OnMessageSent, |
| 117 weak_ptr_factory_.GetWeakPtr())); | 120 weak_ptr_factory_.GetWeakPtr())); |
| 118 } | 121 } |
| 119 | 122 |
| 120 read_timeout_timer_.Start(FROM_HERE, | 123 read_timeout_timer_.Start(FROM_HERE, |
| 121 read_timeout_, | 124 read_timeout_, |
| 122 base::Bind(&ConnectionHandler::OnTimeout, | 125 base::Bind(&ConnectionHandlerImpl::OnTimeout, |
| 123 weak_ptr_factory_.GetWeakPtr())); | 126 weak_ptr_factory_.GetWeakPtr())); |
| 124 WaitForData(MCS_VERSION_TAG_AND_SIZE); | 127 WaitForData(MCS_VERSION_TAG_AND_SIZE); |
| 125 } | 128 } |
| 126 | 129 |
| 127 void ConnectionHandler::OnMessageSent() { | 130 void ConnectionHandlerImpl::OnMessageSent() { |
| 128 if (!output_stream_.get()) { | 131 if (!output_stream_.get()) { |
| 129 // The connection has already been closed. Just return. | 132 // The connection has already been closed. Just return. |
| 130 DCHECK(!input_stream_.get()); | 133 DCHECK(!input_stream_.get()); |
| 131 DCHECK(!read_timeout_timer_.IsRunning()); | 134 DCHECK(!read_timeout_timer_.IsRunning()); |
| 132 return; | 135 return; |
| 133 } | 136 } |
| 134 | 137 |
| 135 if (output_stream_->GetState() != SocketOutputStream::EMPTY) { | 138 if (output_stream_->GetState() != SocketOutputStream::EMPTY) { |
| 136 int last_error = output_stream_->last_error(); | 139 int last_error = output_stream_->last_error(); |
| 137 CloseConnection(); | 140 CloseConnection(); |
| 138 // If the socket stream had an error, plumb it up, else plumb up FAILED. | 141 // If the socket stream had an error, plumb it up, else plumb up FAILED. |
| 139 if (last_error == net::OK) | 142 if (last_error == net::OK) |
| 140 last_error = net::ERR_FAILED; | 143 last_error = net::ERR_FAILED; |
| 141 connection_callback_.Run(last_error); | 144 connection_callback_.Run(last_error); |
| 142 return; | 145 return; |
| 143 } | 146 } |
| 144 | 147 |
| 145 write_callback_.Run(); | 148 write_callback_.Run(); |
| 146 } | 149 } |
| 147 | 150 |
| 148 void ConnectionHandler::GetNextMessage() { | 151 void ConnectionHandlerImpl::GetNextMessage() { |
| 149 DCHECK(SocketInputStream::EMPTY == input_stream_->GetState() || | 152 DCHECK(SocketInputStream::EMPTY == input_stream_->GetState() || |
| 150 SocketInputStream::READY == input_stream_->GetState()); | 153 SocketInputStream::READY == input_stream_->GetState()); |
| 151 message_tag_ = 0; | 154 message_tag_ = 0; |
| 152 message_size_ = 0; | 155 message_size_ = 0; |
| 153 | 156 |
| 154 WaitForData(MCS_TAG_AND_SIZE); | 157 WaitForData(MCS_TAG_AND_SIZE); |
| 155 } | 158 } |
| 156 | 159 |
| 157 void ConnectionHandler::WaitForData(ProcessingState state) { | 160 void ConnectionHandlerImpl::WaitForData(ProcessingState state) { |
| 158 DVLOG(1) << "Waiting for MCS data: state == " << state; | 161 DVLOG(1) << "Waiting for MCS data: state == " << state; |
| 159 | 162 |
| 160 if (!input_stream_) { | 163 if (!input_stream_) { |
| 161 // The connection has already been closed. Just return. | 164 // The connection has already been closed. Just return. |
| 162 DCHECK(!output_stream_.get()); | 165 DCHECK(!output_stream_.get()); |
| 163 DCHECK(!read_timeout_timer_.IsRunning()); | 166 DCHECK(!read_timeout_timer_.IsRunning()); |
| 164 return; | 167 return; |
| 165 } | 168 } |
| 166 | 169 |
| 167 if (input_stream_->GetState() != SocketInputStream::EMPTY && | 170 if (input_stream_->GetState() != SocketInputStream::EMPTY && |
| (...skipping 35 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 203 max_bytes_needed = message_size_; | 206 max_bytes_needed = message_size_; |
| 204 break; | 207 break; |
| 205 default: | 208 default: |
| 206 NOTREACHED(); | 209 NOTREACHED(); |
| 207 } | 210 } |
| 208 DCHECK_GE(max_bytes_needed, min_bytes_needed); | 211 DCHECK_GE(max_bytes_needed, min_bytes_needed); |
| 209 | 212 |
| 210 int byte_count = input_stream_->UnreadByteCount(); | 213 int byte_count = input_stream_->UnreadByteCount(); |
| 211 if (min_bytes_needed - byte_count > 0 && | 214 if (min_bytes_needed - byte_count > 0 && |
| 212 input_stream_->Refresh( | 215 input_stream_->Refresh( |
| 213 base::Bind(&ConnectionHandler::WaitForData, | 216 base::Bind(&ConnectionHandlerImpl::WaitForData, |
| 214 weak_ptr_factory_.GetWeakPtr(), | 217 weak_ptr_factory_.GetWeakPtr(), |
| 215 state), | 218 state), |
| 216 max_bytes_needed - byte_count) == net::ERR_IO_PENDING) { | 219 max_bytes_needed - byte_count) == net::ERR_IO_PENDING) { |
| 217 return; | 220 return; |
| 218 } | 221 } |
| 219 | 222 |
| 220 // Check for refresh errors. | 223 // Check for refresh errors. |
| 221 if (input_stream_->GetState() != SocketInputStream::READY) { | 224 if (input_stream_->GetState() != SocketInputStream::READY) { |
| 222 // An error occurred. | 225 // An error occurred. |
| 223 int last_error = output_stream_->last_error(); | 226 int last_error = output_stream_->last_error(); |
| (...skipping 18 matching lines...) Expand all Loading... |
| 242 OnGotMessageSize(); | 245 OnGotMessageSize(); |
| 243 break; | 246 break; |
| 244 case MCS_PROTO_BYTES: | 247 case MCS_PROTO_BYTES: |
| 245 OnGotMessageBytes(); | 248 OnGotMessageBytes(); |
| 246 break; | 249 break; |
| 247 default: | 250 default: |
| 248 NOTREACHED(); | 251 NOTREACHED(); |
| 249 } | 252 } |
| 250 } | 253 } |
| 251 | 254 |
| 252 void ConnectionHandler::OnGotVersion() { | 255 void ConnectionHandlerImpl::OnGotVersion() { |
| 253 uint8 version = 0; | 256 uint8 version = 0; |
| 254 { | 257 { |
| 255 CodedInputStream coded_input_stream(input_stream_.get()); | 258 CodedInputStream coded_input_stream(input_stream_.get()); |
| 256 coded_input_stream.ReadRaw(&version, 1); | 259 coded_input_stream.ReadRaw(&version, 1); |
| 257 } | 260 } |
| 258 if (version < kMCSVersion) { | 261 if (version < kMCSVersion) { |
| 259 LOG(ERROR) << "Invalid GCM version response: " << static_cast<int>(version); | 262 LOG(ERROR) << "Invalid GCM version response: " << static_cast<int>(version); |
| 260 connection_callback_.Run(net::ERR_FAILED); | 263 connection_callback_.Run(net::ERR_FAILED); |
| 261 return; | 264 return; |
| 262 } | 265 } |
| 263 | 266 |
| 264 input_stream_->RebuildBuffer(); | 267 input_stream_->RebuildBuffer(); |
| 265 | 268 |
| 266 // Process the LoginResponse message tag. | 269 // Process the LoginResponse message tag. |
| 267 OnGotMessageTag(); | 270 OnGotMessageTag(); |
| 268 } | 271 } |
| 269 | 272 |
| 270 void ConnectionHandler::OnGotMessageTag() { | 273 void ConnectionHandlerImpl::OnGotMessageTag() { |
| 271 if (input_stream_->GetState() != SocketInputStream::READY) { | 274 if (input_stream_->GetState() != SocketInputStream::READY) { |
| 272 LOG(ERROR) << "Failed to receive protobuf tag."; | 275 LOG(ERROR) << "Failed to receive protobuf tag."; |
| 273 read_callback_.Run(scoped_ptr<google::protobuf::MessageLite>()); | 276 read_callback_.Run(scoped_ptr<google::protobuf::MessageLite>()); |
| 274 return; | 277 return; |
| 275 } | 278 } |
| 276 | 279 |
| 277 { | 280 { |
| 278 CodedInputStream coded_input_stream(input_stream_.get()); | 281 CodedInputStream coded_input_stream(input_stream_.get()); |
| 279 coded_input_stream.ReadRaw(&message_tag_, 1); | 282 coded_input_stream.ReadRaw(&message_tag_, 1); |
| 280 } | 283 } |
| 281 | 284 |
| 282 DVLOG(1) << "Received proto of type " | 285 DVLOG(1) << "Received proto of type " |
| 283 << static_cast<unsigned int>(message_tag_); | 286 << static_cast<unsigned int>(message_tag_); |
| 284 | 287 |
| 285 if (!read_timeout_timer_.IsRunning()) { | 288 if (!read_timeout_timer_.IsRunning()) { |
| 286 read_timeout_timer_.Start(FROM_HERE, | 289 read_timeout_timer_.Start(FROM_HERE, |
| 287 read_timeout_, | 290 read_timeout_, |
| 288 base::Bind(&ConnectionHandler::OnTimeout, | 291 base::Bind(&ConnectionHandlerImpl::OnTimeout, |
| 289 weak_ptr_factory_.GetWeakPtr())); | 292 weak_ptr_factory_.GetWeakPtr())); |
| 290 } | 293 } |
| 291 OnGotMessageSize(); | 294 OnGotMessageSize(); |
| 292 } | 295 } |
| 293 | 296 |
| 294 void ConnectionHandler::OnGotMessageSize() { | 297 void ConnectionHandlerImpl::OnGotMessageSize() { |
| 295 if (input_stream_->GetState() != SocketInputStream::READY) { | 298 if (input_stream_->GetState() != SocketInputStream::READY) { |
| 296 LOG(ERROR) << "Failed to receive message size."; | 299 LOG(ERROR) << "Failed to receive message size."; |
| 297 read_callback_.Run(scoped_ptr<google::protobuf::MessageLite>()); | 300 read_callback_.Run(scoped_ptr<google::protobuf::MessageLite>()); |
| 298 return; | 301 return; |
| 299 } | 302 } |
| 300 | 303 |
| 301 bool need_another_byte = false; | 304 bool need_another_byte = false; |
| 302 int prev_byte_count = input_stream_->ByteCount(); | 305 int prev_byte_count = input_stream_->ByteCount(); |
| 303 { | 306 { |
| 304 CodedInputStream coded_input_stream(input_stream_.get()); | 307 CodedInputStream coded_input_stream(input_stream_.get()); |
| (...skipping 18 matching lines...) Expand all Loading... |
| 323 } | 326 } |
| 324 | 327 |
| 325 DVLOG(1) << "Proto size: " << message_size_; | 328 DVLOG(1) << "Proto size: " << message_size_; |
| 326 | 329 |
| 327 if (message_size_ > 0) | 330 if (message_size_ > 0) |
| 328 WaitForData(MCS_PROTO_BYTES); | 331 WaitForData(MCS_PROTO_BYTES); |
| 329 else | 332 else |
| 330 OnGotMessageBytes(); | 333 OnGotMessageBytes(); |
| 331 } | 334 } |
| 332 | 335 |
| 333 void ConnectionHandler::OnGotMessageBytes() { | 336 void ConnectionHandlerImpl::OnGotMessageBytes() { |
| 334 read_timeout_timer_.Stop(); | 337 read_timeout_timer_.Stop(); |
| 335 scoped_ptr<google::protobuf::MessageLite> protobuf( | 338 scoped_ptr<google::protobuf::MessageLite> protobuf( |
| 336 BuildProtobufFromTag(message_tag_)); | 339 BuildProtobufFromTag(message_tag_)); |
| 337 // Messages with no content are valid; just use the default protobuf for | 340 // Messages with no content are valid; just use the default protobuf for |
| 338 // that tag. | 341 // that tag. |
| 339 if (protobuf.get() && message_size_ == 0) { | 342 if (protobuf.get() && message_size_ == 0) { |
| 340 base::MessageLoop::current()->PostTask( | 343 base::MessageLoop::current()->PostTask( |
| 341 FROM_HERE, | 344 FROM_HERE, |
| 342 base::Bind(&ConnectionHandler::GetNextMessage, | 345 base::Bind(&ConnectionHandlerImpl::GetNextMessage, |
| 343 weak_ptr_factory_.GetWeakPtr())); | 346 weak_ptr_factory_.GetWeakPtr())); |
| 344 read_callback_.Run(protobuf.Pass()); | 347 read_callback_.Run(protobuf.Pass()); |
| 345 return; | 348 return; |
| 346 } | 349 } |
| 347 | 350 |
| 348 if (!protobuf.get() || | 351 if (!protobuf.get() || |
| 349 input_stream_->GetState() != SocketInputStream::READY) { | 352 input_stream_->GetState() != SocketInputStream::READY) { |
| 350 LOG(ERROR) << "Failed to extract protobuf bytes of type " | 353 LOG(ERROR) << "Failed to extract protobuf bytes of type " |
| 351 << static_cast<unsigned int>(message_tag_); | 354 << static_cast<unsigned int>(message_tag_); |
| 352 protobuf.reset(); // Return a null pointer to denote an error. | 355 protobuf.reset(); // Return a null pointer to denote an error. |
| 353 read_callback_.Run(protobuf.Pass()); | 356 read_callback_.Run(protobuf.Pass()); |
| 354 return; | 357 return; |
| 355 } | 358 } |
| 356 | 359 |
| 357 { | 360 { |
| 358 CodedInputStream coded_input_stream(input_stream_.get()); | 361 CodedInputStream coded_input_stream(input_stream_.get()); |
| 359 if (!protobuf->ParsePartialFromCodedStream(&coded_input_stream)) { | 362 if (!protobuf->ParsePartialFromCodedStream(&coded_input_stream)) { |
| 360 NOTREACHED() << "Unable to parse GCM message of type " | 363 NOTREACHED() << "Unable to parse GCM message of type " |
| 361 << static_cast<unsigned int>(message_tag_); | 364 << static_cast<unsigned int>(message_tag_); |
| 362 protobuf.reset(); // Return a null pointer to denote an error. | 365 protobuf.reset(); // Return a null pointer to denote an error. |
| 363 read_callback_.Run(protobuf.Pass()); | 366 read_callback_.Run(protobuf.Pass()); |
| 364 return; | 367 return; |
| 365 } | 368 } |
| 366 } | 369 } |
| 367 | 370 |
| 368 input_stream_->RebuildBuffer(); | 371 input_stream_->RebuildBuffer(); |
| 369 base::MessageLoop::current()->PostTask( | 372 base::MessageLoop::current()->PostTask( |
| 370 FROM_HERE, | 373 FROM_HERE, |
| 371 base::Bind(&ConnectionHandler::GetNextMessage, | 374 base::Bind(&ConnectionHandlerImpl::GetNextMessage, |
| 372 weak_ptr_factory_.GetWeakPtr())); | 375 weak_ptr_factory_.GetWeakPtr())); |
| 373 if (message_tag_ == kLoginResponseTag) { | 376 if (message_tag_ == kLoginResponseTag) { |
| 374 if (handshake_complete_) { | 377 if (handshake_complete_) { |
| 375 LOG(ERROR) << "Unexpected login response."; | 378 LOG(ERROR) << "Unexpected login response."; |
| 376 } else { | 379 } else { |
| 377 handshake_complete_ = true; | 380 handshake_complete_ = true; |
| 378 DVLOG(1) << "GCM Handshake complete."; | 381 DVLOG(1) << "GCM Handshake complete."; |
| 379 } | 382 } |
| 380 } | 383 } |
| 381 read_callback_.Run(protobuf.Pass()); | 384 read_callback_.Run(protobuf.Pass()); |
| 382 } | 385 } |
| 383 | 386 |
| 384 void ConnectionHandler::OnTimeout() { | 387 void ConnectionHandlerImpl::OnTimeout() { |
| 385 LOG(ERROR) << "Timed out waiting for GCM Protocol buffer."; | 388 LOG(ERROR) << "Timed out waiting for GCM Protocol buffer."; |
| 386 CloseConnection(); | 389 CloseConnection(); |
| 387 connection_callback_.Run(net::ERR_TIMED_OUT); | 390 connection_callback_.Run(net::ERR_TIMED_OUT); |
| 388 } | 391 } |
| 389 | 392 |
| 390 void ConnectionHandler::CloseConnection() { | 393 void ConnectionHandlerImpl::CloseConnection() { |
| 391 DVLOG(1) << "Closing connection."; | 394 DVLOG(1) << "Closing connection."; |
| 392 read_callback_.Reset(); | 395 read_callback_.Reset(); |
| 393 write_callback_.Reset(); | 396 write_callback_.Reset(); |
| 394 read_timeout_timer_.Stop(); | 397 read_timeout_timer_.Stop(); |
| 395 socket_->Disconnect(); | 398 socket_->Disconnect(); |
| 396 input_stream_.reset(); | 399 input_stream_.reset(); |
| 397 output_stream_.reset(); | 400 output_stream_.reset(); |
| 398 weak_ptr_factory_.InvalidateWeakPtrs(); | 401 weak_ptr_factory_.InvalidateWeakPtrs(); |
| 399 } | 402 } |
| 400 | 403 |
| 401 } // namespace gcm | 404 } // namespace gcm |
| OLD | NEW |