OLD | NEW |
---|---|
1 // Copyright 2013 The Chromium Authors. All rights reserved. | 1 // Copyright 2013 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "google_apis/gcm/engine/connection_handler_impl.h" | 5 #include "google_apis/gcm/engine/connection_handler_impl.h" |
6 | 6 |
7 #include "base/message_loop/message_loop.h" | 7 #include "base/message_loop/message_loop.h" |
8 #include "google/protobuf/io/coded_stream.h" | 8 #include "google/protobuf/io/coded_stream.h" |
9 #include "google_apis/gcm/base/mcs_util.h" | 9 #include "google_apis/gcm/base/mcs_util.h" |
10 #include "google_apis/gcm/base/socket_stream.h" | 10 #include "google_apis/gcm/base/socket_stream.h" |
11 #include "google_apis/gcm/protocol/mcs.pb.h" | 11 #include "google_apis/gcm/protocol/mcs.pb.h" |
12 #include "net/base/net_errors.h" | 12 #include "net/base/net_errors.h" |
13 #include "net/socket/stream_socket.h" | 13 #include "net/socket/stream_socket.h" |
14 | 14 |
15 using namespace google::protobuf::io; | 15 using namespace google::protobuf::io; |
16 | 16 |
17 namespace gcm { | 17 namespace gcm { |
18 | 18 |
19 namespace { | 19 namespace { |
20 | 20 |
21 // # of bytes a MCS version packet consumes. | 21 // # of bytes a MCS version packet consumes. |
22 const int kVersionPacketLen = 1; | 22 const int kVersionPacketLen = 1; |
23 // # of bytes a tag packet consumes. | 23 // # of bytes a tag packet consumes. |
24 const int kTagPacketLen = 1; | 24 const int kTagPacketLen = 1; |
25 // Max # of bytes a length packet consumes. | 25 // Max # of bytes a length packet consumes. A Varint32 can consume up to 5 bytes |
26 // (the MSB in each byte is reserved for denoting whether more bytes follow). | |
27 // But, the protocol only allows for 4KiB payloads, and the socket stream buffer | |
28 // is only of size 8KiB. As such we should never need more than 2 bytes (max | |
29 // value of 16KiB). Anything higher than that will result in an error, either | |
30 // because the socket stream buffer overflowed or too many bytes were required | |
31 // in the size packet. | |
26 const int kSizePacketLenMin = 1; | 32 const int kSizePacketLenMin = 1; |
27 const int kSizePacketLenMax = 2; | 33 const int kSizePacketLenMax = 2; |
28 | 34 |
29 // The current MCS protocol version. | 35 // The current MCS protocol version. |
30 const int kMCSVersion = 41; | 36 const int kMCSVersion = 41; |
31 | 37 |
32 } // namespace | 38 } // namespace |
33 | 39 |
34 ConnectionHandlerImpl::ConnectionHandlerImpl( | 40 ConnectionHandlerImpl::ConnectionHandlerImpl( |
35 base::TimeDelta read_timeout, | 41 base::TimeDelta read_timeout, |
(...skipping 278 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
314 } | 320 } |
315 | 321 |
316 void ConnectionHandlerImpl::OnGotMessageSize() { | 322 void ConnectionHandlerImpl::OnGotMessageSize() { |
317 if (input_stream_->GetState() != SocketInputStream::READY) { | 323 if (input_stream_->GetState() != SocketInputStream::READY) { |
318 LOG(ERROR) << "Failed to receive message size."; | 324 LOG(ERROR) << "Failed to receive message size."; |
319 read_callback_.Run(scoped_ptr<google::protobuf::MessageLite>()); | 325 read_callback_.Run(scoped_ptr<google::protobuf::MessageLite>()); |
320 return; | 326 return; |
321 } | 327 } |
322 | 328 |
323 bool need_another_byte = false; | 329 bool need_another_byte = false; |
324 int prev_byte_count = input_stream_->ByteCount(); | 330 int prev_byte_count = input_stream_->UnreadByteCount(); |
fgorski
2014/10/20 22:11:54
please explain why you are switching to UnreadByte
Nicolas Zea
2014/10/21 00:02:54
I've updated the commit comment to explain why, bu
| |
325 { | 331 { |
326 CodedInputStream coded_input_stream(input_stream_.get()); | 332 CodedInputStream coded_input_stream(input_stream_.get()); |
327 if (!coded_input_stream.ReadVarint32(&message_size_)) | 333 if (!coded_input_stream.ReadVarint32(&message_size_)) |
328 need_another_byte = true; | 334 need_another_byte = true; |
329 } | 335 } |
330 | 336 |
331 if (need_another_byte) { | 337 if (need_another_byte) { |
332 DVLOG(1) << "Expecting another message size byte."; | 338 DVLOG(1) << "Expecting another message size byte."; |
333 if (prev_byte_count >= kSizePacketLenMax) { | 339 if (prev_byte_count >= kSizePacketLenMax) { |
334 // Already had enough bytes, something else went wrong. | 340 // Already had enough bytes, something else went wrong. |
335 LOG(ERROR) << "Failed to process message size."; | 341 LOG(ERROR) << "Failed to process message size, too many bytes needed."; |
336 read_callback_.Run(scoped_ptr<google::protobuf::MessageLite>()); | 342 connection_callback_.Run(net::ERR_FILE_TOO_BIG); |
337 return; | 343 return; |
338 } | 344 } |
339 // Back up by the amount read (should always be 1 byte). | 345 // Back up by the amount read (should always be 1 byte). |
340 int bytes_read = prev_byte_count - input_stream_->ByteCount(); | 346 int bytes_read = prev_byte_count - input_stream_->UnreadByteCount(); |
341 DCHECK_EQ(bytes_read, 1); | 347 DCHECK_EQ(bytes_read, 1); |
342 input_stream_->BackUp(bytes_read); | 348 input_stream_->BackUp(bytes_read); |
343 WaitForData(MCS_FULL_SIZE); | 349 WaitForData(MCS_FULL_SIZE); |
344 return; | 350 return; |
345 } | 351 } |
346 | 352 |
347 DVLOG(1) << "Proto size: " << message_size_; | 353 DVLOG(1) << "Proto size: " << message_size_; |
348 | 354 |
349 if (message_size_ > 0) | 355 if (message_size_ > 0) |
350 WaitForData(MCS_PROTO_BYTES); | 356 WaitForData(MCS_PROTO_BYTES); |
351 else | 357 else |
352 OnGotMessageBytes(); | 358 OnGotMessageBytes(); |
353 } | 359 } |
354 | 360 |
355 void ConnectionHandlerImpl::OnGotMessageBytes() { | 361 void ConnectionHandlerImpl::OnGotMessageBytes() { |
356 read_timeout_timer_.Stop(); | 362 read_timeout_timer_.Stop(); |
357 scoped_ptr<google::protobuf::MessageLite> protobuf( | 363 scoped_ptr<google::protobuf::MessageLite> protobuf( |
358 BuildProtobufFromTag(message_tag_)); | 364 BuildProtobufFromTag(message_tag_)); |
359 // Messages with no content are valid; just use the default protobuf for | 365 // Messages with no content are valid; just use the default protobuf for |
360 // that tag. | 366 // that tag. |
361 if (protobuf.get() && message_size_ == 0) { | 367 if (protobuf.get() && message_size_ == 0) { |
362 base::MessageLoop::current()->PostTask( | 368 base::MessageLoop::current()->PostTask( |
363 FROM_HERE, | 369 FROM_HERE, |
364 base::Bind(&ConnectionHandlerImpl::GetNextMessage, | 370 base::Bind(&ConnectionHandlerImpl::GetNextMessage, |
365 weak_ptr_factory_.GetWeakPtr())); | 371 weak_ptr_factory_.GetWeakPtr())); |
366 read_callback_.Run(protobuf.Pass()); | 372 read_callback_.Run(protobuf.Pass()); |
367 return; | 373 return; |
368 } | 374 } |
369 | 375 |
370 if (!protobuf.get() || | 376 if (input_stream_->GetState() != SocketInputStream::READY) { |
371 input_stream_->GetState() != SocketInputStream::READY) { | |
372 LOG(ERROR) << "Failed to extract protobuf bytes of type " | 377 LOG(ERROR) << "Failed to extract protobuf bytes of type " |
373 << static_cast<unsigned int>(message_tag_); | 378 << static_cast<unsigned int>(message_tag_); |
374 // Reset the connection. | 379 // Reset the connection. |
375 connection_callback_.Run(net::ERR_FAILED); | 380 connection_callback_.Run(net::ERR_FAILED); |
376 return; | 381 return; |
377 } | 382 } |
378 | 383 |
379 { | 384 if (protobuf.get()) { |
fgorski
2014/10/20 22:11:54
nit: did you consider putting all of the failure c
Nicolas Zea
2014/10/21 00:02:54
Done.
| |
380 CodedInputStream coded_input_stream(input_stream_.get()); | 385 CodedInputStream coded_input_stream(input_stream_.get()); |
381 if (!protobuf->ParsePartialFromCodedStream(&coded_input_stream)) { | 386 if (!protobuf->ParsePartialFromCodedStream(&coded_input_stream)) { |
382 LOG(ERROR) << "Unable to parse GCM message of type " | 387 LOG(ERROR) << "Unable to parse GCM message of type " |
383 << static_cast<unsigned int>(message_tag_); | 388 << static_cast<unsigned int>(message_tag_); |
384 // Reset the connection. | 389 // Reset the connection. |
385 connection_callback_.Run(net::ERR_FAILED); | 390 connection_callback_.Run(net::ERR_FAILED); |
386 return; | 391 return; |
387 } | 392 } |
393 } else { | |
394 LOG(ERROR) << "Received message of invalid type " | |
395 << static_cast<unsigned int>(message_tag_); | |
396 connection_callback_.Run(net::ERR_INVALID_ARGUMENT); | |
397 return; | |
388 } | 398 } |
389 | 399 |
390 input_stream_->RebuildBuffer(); | 400 input_stream_->RebuildBuffer(); |
391 base::MessageLoop::current()->PostTask( | 401 base::MessageLoop::current()->PostTask( |
392 FROM_HERE, | 402 FROM_HERE, |
393 base::Bind(&ConnectionHandlerImpl::GetNextMessage, | 403 base::Bind(&ConnectionHandlerImpl::GetNextMessage, |
394 weak_ptr_factory_.GetWeakPtr())); | 404 weak_ptr_factory_.GetWeakPtr())); |
395 if (message_tag_ == kLoginResponseTag) { | 405 if (message_tag_ == kLoginResponseTag) { |
396 if (handshake_complete_) { | 406 if (handshake_complete_) { |
397 LOG(ERROR) << "Unexpected login response."; | 407 LOG(ERROR) << "Unexpected login response."; |
(...skipping 20 matching lines...) Expand all Loading... | |
418 socket_ = NULL; | 428 socket_ = NULL; |
419 handshake_complete_ = false; | 429 handshake_complete_ = false; |
420 message_tag_ = 0; | 430 message_tag_ = 0; |
421 message_size_ = 0; | 431 message_size_ = 0; |
422 input_stream_.reset(); | 432 input_stream_.reset(); |
423 output_stream_.reset(); | 433 output_stream_.reset(); |
424 weak_ptr_factory_.InvalidateWeakPtrs(); | 434 weak_ptr_factory_.InvalidateWeakPtrs(); |
425 } | 435 } |
426 | 436 |
427 } // namespace gcm | 437 } // namespace gcm |
OLD | NEW |