| Index: google_apis/gcm/engine/connection_handler.cc
|
| diff --git a/google_apis/gcm/engine/connection_handler.cc b/google_apis/gcm/engine/connection_handler.cc
|
| index b4eb602c7f46f9f3b46e242bc3c1e2780184cb57..bc9b65859797d8f49704ec77ae25d9d0b1e0856c 100644
|
| --- a/google_apis/gcm/engine/connection_handler.cc
|
| +++ b/google_apis/gcm/engine/connection_handler.cc
|
| @@ -4,398 +4,12 @@
|
|
|
| #include "google_apis/gcm/engine/connection_handler.h"
|
|
|
| -#include "base/message_loop/message_loop.h"
|
| -#include "google/protobuf/io/coded_stream.h"
|
| -#include "google_apis/gcm/base/mcs_util.h"
|
| -#include "google_apis/gcm/base/socket_stream.h"
|
| -#include "net/base/net_errors.h"
|
| -#include "net/socket/stream_socket.h"
|
| -
|
| -using namespace google::protobuf::io;
|
| -
|
| namespace gcm {
|
|
|
| -namespace {
|
| -
|
| -// # of bytes a MCS version packet consumes.
|
| -const int kVersionPacketLen = 1;
|
| -// # of bytes a tag packet consumes.
|
| -const int kTagPacketLen = 1;
|
| -// Max # of bytes a length packet consumes.
|
| -const int kSizePacketLenMin = 1;
|
| -const int kSizePacketLenMax = 2;
|
| -
|
| -// The current MCS protocol version.
|
| -const int kMCSVersion = 38;
|
| -
|
| -} // namespace
|
| -
|
| -ConnectionHandler::ConnectionHandler(base::TimeDelta read_timeout)
|
| - : read_timeout_(read_timeout),
|
| - handshake_complete_(false),
|
| - message_tag_(0),
|
| - message_size_(0),
|
| - weak_ptr_factory_(this) {
|
| +ConnectionHandler::ConnectionHandler() {
|
| }
|
|
|
| ConnectionHandler::~ConnectionHandler() {
|
| }
|
|
|
| -void ConnectionHandler::Init(
|
| - scoped_ptr<net::StreamSocket> socket,
|
| - const google::protobuf::MessageLite& login_request,
|
| - const ProtoReceivedCallback& read_callback,
|
| - const ProtoSentCallback& write_callback,
|
| - const ConnectionChangedCallback& connection_callback) {
|
| - DCHECK(!read_callback.is_null());
|
| - DCHECK(!write_callback.is_null());
|
| - DCHECK(!connection_callback.is_null());
|
| -
|
| - // Invalidate any previously outstanding reads.
|
| - weak_ptr_factory_.InvalidateWeakPtrs();
|
| -
|
| - handshake_complete_ = false;
|
| - message_tag_ = 0;
|
| - message_size_ = 0;
|
| - socket_ = socket.Pass();
|
| - input_stream_.reset(new SocketInputStream(socket_.get()));
|
| - output_stream_.reset(new SocketOutputStream(socket_.get()));
|
| - read_callback_ = read_callback;
|
| - write_callback_ = write_callback;
|
| - connection_callback_ = connection_callback;
|
| -
|
| - Login(login_request);
|
| -}
|
| -
|
| -bool ConnectionHandler::CanSendMessage() const {
|
| - return handshake_complete_ && output_stream_.get() &&
|
| - output_stream_->GetState() == SocketOutputStream::EMPTY;
|
| -}
|
| -
|
| -void ConnectionHandler::SendMessage(
|
| - const google::protobuf::MessageLite& message) {
|
| - DCHECK_EQ(output_stream_->GetState(), SocketOutputStream::EMPTY);
|
| - DCHECK(handshake_complete_);
|
| -
|
| - {
|
| - CodedOutputStream coded_output_stream(output_stream_.get());
|
| - DVLOG(1) << "Writing proto of size " << message.ByteSize();
|
| - int tag = GetMCSProtoTag(message);
|
| - DCHECK_NE(tag, -1);
|
| - coded_output_stream.WriteRaw(&tag, 1);
|
| - coded_output_stream.WriteVarint32(message.ByteSize());
|
| - message.SerializeToCodedStream(&coded_output_stream);
|
| - }
|
| -
|
| - if (output_stream_->Flush(
|
| - base::Bind(&ConnectionHandler::OnMessageSent,
|
| - weak_ptr_factory_.GetWeakPtr())) != net::ERR_IO_PENDING) {
|
| - OnMessageSent();
|
| - }
|
| -}
|
| -
|
| -void ConnectionHandler::Login(
|
| - const google::protobuf::MessageLite& login_request) {
|
| - DCHECK_EQ(output_stream_->GetState(), SocketOutputStream::EMPTY);
|
| -
|
| - const char version_byte[1] = {kMCSVersion};
|
| - const char login_request_tag[1] = {kLoginRequestTag};
|
| - {
|
| - CodedOutputStream coded_output_stream(output_stream_.get());
|
| - coded_output_stream.WriteRaw(version_byte, 1);
|
| - coded_output_stream.WriteRaw(login_request_tag, 1);
|
| - coded_output_stream.WriteVarint32(login_request.ByteSize());
|
| - login_request.SerializeToCodedStream(&coded_output_stream);
|
| - }
|
| -
|
| - if (output_stream_->Flush(
|
| - base::Bind(&ConnectionHandler::OnMessageSent,
|
| - weak_ptr_factory_.GetWeakPtr())) != net::ERR_IO_PENDING) {
|
| - base::MessageLoop::current()->PostTask(
|
| - FROM_HERE,
|
| - base::Bind(&ConnectionHandler::OnMessageSent,
|
| - weak_ptr_factory_.GetWeakPtr()));
|
| - }
|
| -
|
| - read_timeout_timer_.Start(FROM_HERE,
|
| - read_timeout_,
|
| - base::Bind(&ConnectionHandler::OnTimeout,
|
| - weak_ptr_factory_.GetWeakPtr()));
|
| - WaitForData(MCS_VERSION_TAG_AND_SIZE);
|
| -}
|
| -
|
| -void ConnectionHandler::OnMessageSent() {
|
| - if (!output_stream_.get()) {
|
| - // The connection has already been closed. Just return.
|
| - DCHECK(!input_stream_.get());
|
| - DCHECK(!read_timeout_timer_.IsRunning());
|
| - return;
|
| - }
|
| -
|
| - if (output_stream_->GetState() != SocketOutputStream::EMPTY) {
|
| - int last_error = output_stream_->last_error();
|
| - CloseConnection();
|
| - // If the socket stream had an error, plumb it up, else plumb up FAILED.
|
| - if (last_error == net::OK)
|
| - last_error = net::ERR_FAILED;
|
| - connection_callback_.Run(last_error);
|
| - return;
|
| - }
|
| -
|
| - write_callback_.Run();
|
| -}
|
| -
|
| -void ConnectionHandler::GetNextMessage() {
|
| - DCHECK(SocketInputStream::EMPTY == input_stream_->GetState() ||
|
| - SocketInputStream::READY == input_stream_->GetState());
|
| - message_tag_ = 0;
|
| - message_size_ = 0;
|
| -
|
| - WaitForData(MCS_TAG_AND_SIZE);
|
| -}
|
| -
|
| -void ConnectionHandler::WaitForData(ProcessingState state) {
|
| - DVLOG(1) << "Waiting for MCS data: state == " << state;
|
| -
|
| - if (!input_stream_) {
|
| - // The connection has already been closed. Just return.
|
| - DCHECK(!output_stream_.get());
|
| - DCHECK(!read_timeout_timer_.IsRunning());
|
| - return;
|
| - }
|
| -
|
| - if (input_stream_->GetState() != SocketInputStream::EMPTY &&
|
| - input_stream_->GetState() != SocketInputStream::READY) {
|
| - // An error occurred.
|
| - int last_error = output_stream_->last_error();
|
| - CloseConnection();
|
| - // If the socket stream had an error, plumb it up, else plumb up FAILED.
|
| - if (last_error == net::OK)
|
| - last_error = net::ERR_FAILED;
|
| - connection_callback_.Run(last_error);
|
| - return;
|
| - }
|
| -
|
| - // Used to determine whether a Socket::Read is necessary.
|
| - int min_bytes_needed = 0;
|
| - // Used to limit the size of the Socket::Read.
|
| - int max_bytes_needed = 0;
|
| -
|
| - switch(state) {
|
| - case MCS_VERSION_TAG_AND_SIZE:
|
| - min_bytes_needed = kVersionPacketLen + kTagPacketLen + kSizePacketLenMin;
|
| - max_bytes_needed = kVersionPacketLen + kTagPacketLen + kSizePacketLenMax;
|
| - break;
|
| - case MCS_TAG_AND_SIZE:
|
| - min_bytes_needed = kTagPacketLen + kSizePacketLenMin;
|
| - max_bytes_needed = kTagPacketLen + kSizePacketLenMax;
|
| - break;
|
| - case MCS_FULL_SIZE:
|
| - // If in this state, the minimum size packet length must already have been
|
| - // insufficient, so set both to the max length.
|
| - min_bytes_needed = kSizePacketLenMax;
|
| - max_bytes_needed = kSizePacketLenMax;
|
| - break;
|
| - case MCS_PROTO_BYTES:
|
| - read_timeout_timer_.Reset();
|
| - // No variability in the message size, set both to the same.
|
| - min_bytes_needed = message_size_;
|
| - max_bytes_needed = message_size_;
|
| - break;
|
| - default:
|
| - NOTREACHED();
|
| - }
|
| - DCHECK_GE(max_bytes_needed, min_bytes_needed);
|
| -
|
| - int byte_count = input_stream_->UnreadByteCount();
|
| - if (min_bytes_needed - byte_count > 0 &&
|
| - input_stream_->Refresh(
|
| - base::Bind(&ConnectionHandler::WaitForData,
|
| - weak_ptr_factory_.GetWeakPtr(),
|
| - state),
|
| - max_bytes_needed - byte_count) == net::ERR_IO_PENDING) {
|
| - return;
|
| - }
|
| -
|
| - // Check for refresh errors.
|
| - if (input_stream_->GetState() != SocketInputStream::READY) {
|
| - // An error occurred.
|
| - int last_error = output_stream_->last_error();
|
| - CloseConnection();
|
| - // If the socket stream had an error, plumb it up, else plumb up FAILED.
|
| - if (last_error == net::OK)
|
| - last_error = net::ERR_FAILED;
|
| - connection_callback_.Run(last_error);
|
| - return;
|
| - }
|
| -
|
| - // Received enough bytes, process them.
|
| - DVLOG(1) << "Processing MCS data: state == " << state;
|
| - switch(state) {
|
| - case MCS_VERSION_TAG_AND_SIZE:
|
| - OnGotVersion();
|
| - break;
|
| - case MCS_TAG_AND_SIZE:
|
| - OnGotMessageTag();
|
| - break;
|
| - case MCS_FULL_SIZE:
|
| - OnGotMessageSize();
|
| - break;
|
| - case MCS_PROTO_BYTES:
|
| - OnGotMessageBytes();
|
| - break;
|
| - default:
|
| - NOTREACHED();
|
| - }
|
| -}
|
| -
|
| -void ConnectionHandler::OnGotVersion() {
|
| - uint8 version = 0;
|
| - {
|
| - CodedInputStream coded_input_stream(input_stream_.get());
|
| - coded_input_stream.ReadRaw(&version, 1);
|
| - }
|
| - if (version < kMCSVersion) {
|
| - LOG(ERROR) << "Invalid GCM version response: " << static_cast<int>(version);
|
| - connection_callback_.Run(net::ERR_FAILED);
|
| - return;
|
| - }
|
| -
|
| - input_stream_->RebuildBuffer();
|
| -
|
| - // Process the LoginResponse message tag.
|
| - OnGotMessageTag();
|
| -}
|
| -
|
| -void ConnectionHandler::OnGotMessageTag() {
|
| - if (input_stream_->GetState() != SocketInputStream::READY) {
|
| - LOG(ERROR) << "Failed to receive protobuf tag.";
|
| - read_callback_.Run(scoped_ptr<google::protobuf::MessageLite>());
|
| - return;
|
| - }
|
| -
|
| - {
|
| - CodedInputStream coded_input_stream(input_stream_.get());
|
| - coded_input_stream.ReadRaw(&message_tag_, 1);
|
| - }
|
| -
|
| - DVLOG(1) << "Received proto of type "
|
| - << static_cast<unsigned int>(message_tag_);
|
| -
|
| - if (!read_timeout_timer_.IsRunning()) {
|
| - read_timeout_timer_.Start(FROM_HERE,
|
| - read_timeout_,
|
| - base::Bind(&ConnectionHandler::OnTimeout,
|
| - weak_ptr_factory_.GetWeakPtr()));
|
| - }
|
| - OnGotMessageSize();
|
| -}
|
| -
|
| -void ConnectionHandler::OnGotMessageSize() {
|
| - if (input_stream_->GetState() != SocketInputStream::READY) {
|
| - LOG(ERROR) << "Failed to receive message size.";
|
| - read_callback_.Run(scoped_ptr<google::protobuf::MessageLite>());
|
| - return;
|
| - }
|
| -
|
| - bool need_another_byte = false;
|
| - int prev_byte_count = input_stream_->ByteCount();
|
| - {
|
| - CodedInputStream coded_input_stream(input_stream_.get());
|
| - if (!coded_input_stream.ReadVarint32(&message_size_))
|
| - need_another_byte = true;
|
| - }
|
| -
|
| - if (need_another_byte) {
|
| - DVLOG(1) << "Expecting another message size byte.";
|
| - if (prev_byte_count >= kSizePacketLenMax) {
|
| - // Already had enough bytes, something else went wrong.
|
| - LOG(ERROR) << "Failed to process message size.";
|
| - read_callback_.Run(scoped_ptr<google::protobuf::MessageLite>());
|
| - return;
|
| - }
|
| - // Back up by the amount read (should always be 1 byte).
|
| - int bytes_read = prev_byte_count - input_stream_->ByteCount();
|
| - DCHECK_EQ(bytes_read, 1);
|
| - input_stream_->BackUp(bytes_read);
|
| - WaitForData(MCS_FULL_SIZE);
|
| - return;
|
| - }
|
| -
|
| - DVLOG(1) << "Proto size: " << message_size_;
|
| -
|
| - if (message_size_ > 0)
|
| - WaitForData(MCS_PROTO_BYTES);
|
| - else
|
| - OnGotMessageBytes();
|
| -}
|
| -
|
| -void ConnectionHandler::OnGotMessageBytes() {
|
| - read_timeout_timer_.Stop();
|
| - scoped_ptr<google::protobuf::MessageLite> protobuf(
|
| - BuildProtobufFromTag(message_tag_));
|
| - // Messages with no content are valid; just use the default protobuf for
|
| - // that tag.
|
| - if (protobuf.get() && message_size_ == 0) {
|
| - base::MessageLoop::current()->PostTask(
|
| - FROM_HERE,
|
| - base::Bind(&ConnectionHandler::GetNextMessage,
|
| - weak_ptr_factory_.GetWeakPtr()));
|
| - read_callback_.Run(protobuf.Pass());
|
| - return;
|
| - }
|
| -
|
| - if (!protobuf.get() ||
|
| - input_stream_->GetState() != SocketInputStream::READY) {
|
| - LOG(ERROR) << "Failed to extract protobuf bytes of type "
|
| - << static_cast<unsigned int>(message_tag_);
|
| - protobuf.reset(); // Return a null pointer to denote an error.
|
| - read_callback_.Run(protobuf.Pass());
|
| - return;
|
| - }
|
| -
|
| - {
|
| - CodedInputStream coded_input_stream(input_stream_.get());
|
| - if (!protobuf->ParsePartialFromCodedStream(&coded_input_stream)) {
|
| - NOTREACHED() << "Unable to parse GCM message of type "
|
| - << static_cast<unsigned int>(message_tag_);
|
| - protobuf.reset(); // Return a null pointer to denote an error.
|
| - read_callback_.Run(protobuf.Pass());
|
| - return;
|
| - }
|
| - }
|
| -
|
| - input_stream_->RebuildBuffer();
|
| - base::MessageLoop::current()->PostTask(
|
| - FROM_HERE,
|
| - base::Bind(&ConnectionHandler::GetNextMessage,
|
| - weak_ptr_factory_.GetWeakPtr()));
|
| - if (message_tag_ == kLoginResponseTag) {
|
| - if (handshake_complete_) {
|
| - LOG(ERROR) << "Unexpected login response.";
|
| - } else {
|
| - handshake_complete_ = true;
|
| - DVLOG(1) << "GCM Handshake complete.";
|
| - }
|
| - }
|
| - read_callback_.Run(protobuf.Pass());
|
| -}
|
| -
|
| -void ConnectionHandler::OnTimeout() {
|
| - LOG(ERROR) << "Timed out waiting for GCM Protocol buffer.";
|
| - CloseConnection();
|
| - connection_callback_.Run(net::ERR_TIMED_OUT);
|
| -}
|
| -
|
| -void ConnectionHandler::CloseConnection() {
|
| - DVLOG(1) << "Closing connection.";
|
| - read_callback_.Reset();
|
| - write_callback_.Reset();
|
| - read_timeout_timer_.Stop();
|
| - socket_->Disconnect();
|
| - input_stream_.reset();
|
| - output_stream_.reset();
|
| - weak_ptr_factory_.InvalidateWeakPtrs();
|
| -}
|
| -
|
| } // namespace gcm
|
|
|