| Index: google_apis/gcm/engine/connection_handler_impl.cc
|
| diff --git a/google_apis/gcm/engine/connection_handler.cc b/google_apis/gcm/engine/connection_handler_impl.cc
|
| similarity index 86%
|
| copy from google_apis/gcm/engine/connection_handler.cc
|
| copy to google_apis/gcm/engine/connection_handler_impl.cc
|
| index b4eb602c7f46f9f3b46e242bc3c1e2780184cb57..033b37e744d1aea45c408e6c968390037aa6277f 100644
|
| --- a/google_apis/gcm/engine/connection_handler.cc
|
| +++ b/google_apis/gcm/engine/connection_handler_impl.cc
|
| @@ -2,7 +2,7 @@
|
| // Use of this source code is governed by a BSD-style license that can be
|
| // found in the LICENSE file.
|
|
|
| -#include "google_apis/gcm/engine/connection_handler.h"
|
| +#include "google_apis/gcm/engine/connection_handler_impl.h"
|
|
|
| #include "base/message_loop/message_loop.h"
|
| #include "google/protobuf/io/coded_stream.h"
|
| @@ -26,30 +26,35 @@ const int kSizePacketLenMin = 1;
|
| const int kSizePacketLenMax = 2;
|
|
|
| // The current MCS protocol version.
|
| +// TODO(zea): bump to 41 once the server supports it.
|
| const int kMCSVersion = 38;
|
|
|
| } // namespace
|
|
|
| -ConnectionHandler::ConnectionHandler(base::TimeDelta read_timeout)
|
| +ConnectionHandlerImpl::ConnectionHandlerImpl(
|
| + base::TimeDelta read_timeout,
|
| + const ProtoReceivedCallback& read_callback,
|
| + const ProtoSentCallback& write_callback,
|
| + const ConnectionChangedCallback& connection_callback)
|
| : read_timeout_(read_timeout),
|
| handshake_complete_(false),
|
| message_tag_(0),
|
| message_size_(0),
|
| + read_callback_(read_callback),
|
| + write_callback_(write_callback),
|
| + connection_callback_(connection_callback),
|
| weak_ptr_factory_(this) {
|
| }
|
|
|
| -ConnectionHandler::~ConnectionHandler() {
|
| +ConnectionHandlerImpl::~ConnectionHandlerImpl() {
|
| }
|
|
|
| -void ConnectionHandler::Init(
|
| - scoped_ptr<net::StreamSocket> socket,
|
| - const google::protobuf::MessageLite& login_request,
|
| - const ProtoReceivedCallback& read_callback,
|
| - const ProtoSentCallback& write_callback,
|
| - const ConnectionChangedCallback& connection_callback) {
|
| - DCHECK(!read_callback.is_null());
|
| - DCHECK(!write_callback.is_null());
|
| - DCHECK(!connection_callback.is_null());
|
| +void ConnectionHandlerImpl::Init(
|
| + const mcs_proto::LoginRequest& login_request,
|
| + scoped_ptr<net::StreamSocket> socket) {
|
| + DCHECK(!read_callback_.is_null());
|
| + DCHECK(!write_callback_.is_null());
|
| + DCHECK(!connection_callback_.is_null());
|
|
|
| // Invalidate any previously outstanding reads.
|
| weak_ptr_factory_.InvalidateWeakPtrs();
|
| @@ -60,19 +65,16 @@ void ConnectionHandler::Init(
|
| socket_ = socket.Pass();
|
| input_stream_.reset(new SocketInputStream(socket_.get()));
|
| output_stream_.reset(new SocketOutputStream(socket_.get()));
|
| - read_callback_ = read_callback;
|
| - write_callback_ = write_callback;
|
| - connection_callback_ = connection_callback;
|
|
|
| Login(login_request);
|
| }
|
|
|
| -bool ConnectionHandler::CanSendMessage() const {
|
| +bool ConnectionHandlerImpl::CanSendMessage() const {
|
| return handshake_complete_ && output_stream_.get() &&
|
| output_stream_->GetState() == SocketOutputStream::EMPTY;
|
| }
|
|
|
| -void ConnectionHandler::SendMessage(
|
| +void ConnectionHandlerImpl::SendMessage(
|
| const google::protobuf::MessageLite& message) {
|
| DCHECK_EQ(output_stream_->GetState(), SocketOutputStream::EMPTY);
|
| DCHECK(handshake_complete_);
|
| @@ -88,13 +90,13 @@ void ConnectionHandler::SendMessage(
|
| }
|
|
|
| if (output_stream_->Flush(
|
| - base::Bind(&ConnectionHandler::OnMessageSent,
|
| + base::Bind(&ConnectionHandlerImpl::OnMessageSent,
|
| weak_ptr_factory_.GetWeakPtr())) != net::ERR_IO_PENDING) {
|
| OnMessageSent();
|
| }
|
| }
|
|
|
| -void ConnectionHandler::Login(
|
| +void ConnectionHandlerImpl::Login(
|
| const google::protobuf::MessageLite& login_request) {
|
| DCHECK_EQ(output_stream_->GetState(), SocketOutputStream::EMPTY);
|
|
|
| @@ -109,22 +111,22 @@ void ConnectionHandler::Login(
|
| }
|
|
|
| if (output_stream_->Flush(
|
| - base::Bind(&ConnectionHandler::OnMessageSent,
|
| + base::Bind(&ConnectionHandlerImpl::OnMessageSent,
|
| weak_ptr_factory_.GetWeakPtr())) != net::ERR_IO_PENDING) {
|
| base::MessageLoop::current()->PostTask(
|
| FROM_HERE,
|
| - base::Bind(&ConnectionHandler::OnMessageSent,
|
| + base::Bind(&ConnectionHandlerImpl::OnMessageSent,
|
| weak_ptr_factory_.GetWeakPtr()));
|
| }
|
|
|
| read_timeout_timer_.Start(FROM_HERE,
|
| read_timeout_,
|
| - base::Bind(&ConnectionHandler::OnTimeout,
|
| + base::Bind(&ConnectionHandlerImpl::OnTimeout,
|
| weak_ptr_factory_.GetWeakPtr()));
|
| WaitForData(MCS_VERSION_TAG_AND_SIZE);
|
| }
|
|
|
| -void ConnectionHandler::OnMessageSent() {
|
| +void ConnectionHandlerImpl::OnMessageSent() {
|
| if (!output_stream_.get()) {
|
| // The connection has already been closed. Just return.
|
| DCHECK(!input_stream_.get());
|
| @@ -145,7 +147,7 @@ void ConnectionHandler::OnMessageSent() {
|
| write_callback_.Run();
|
| }
|
|
|
| -void ConnectionHandler::GetNextMessage() {
|
| +void ConnectionHandlerImpl::GetNextMessage() {
|
| DCHECK(SocketInputStream::EMPTY == input_stream_->GetState() ||
|
| SocketInputStream::READY == input_stream_->GetState());
|
| message_tag_ = 0;
|
| @@ -154,7 +156,7 @@ void ConnectionHandler::GetNextMessage() {
|
| WaitForData(MCS_TAG_AND_SIZE);
|
| }
|
|
|
| -void ConnectionHandler::WaitForData(ProcessingState state) {
|
| +void ConnectionHandlerImpl::WaitForData(ProcessingState state) {
|
| DVLOG(1) << "Waiting for MCS data: state == " << state;
|
|
|
| if (!input_stream_) {
|
| @@ -210,7 +212,7 @@ void ConnectionHandler::WaitForData(ProcessingState state) {
|
| int byte_count = input_stream_->UnreadByteCount();
|
| if (min_bytes_needed - byte_count > 0 &&
|
| input_stream_->Refresh(
|
| - base::Bind(&ConnectionHandler::WaitForData,
|
| + base::Bind(&ConnectionHandlerImpl::WaitForData,
|
| weak_ptr_factory_.GetWeakPtr(),
|
| state),
|
| max_bytes_needed - byte_count) == net::ERR_IO_PENDING) {
|
| @@ -249,7 +251,7 @@ void ConnectionHandler::WaitForData(ProcessingState state) {
|
| }
|
| }
|
|
|
| -void ConnectionHandler::OnGotVersion() {
|
| +void ConnectionHandlerImpl::OnGotVersion() {
|
| uint8 version = 0;
|
| {
|
| CodedInputStream coded_input_stream(input_stream_.get());
|
| @@ -267,7 +269,7 @@ void ConnectionHandler::OnGotVersion() {
|
| OnGotMessageTag();
|
| }
|
|
|
| -void ConnectionHandler::OnGotMessageTag() {
|
| +void ConnectionHandlerImpl::OnGotMessageTag() {
|
| if (input_stream_->GetState() != SocketInputStream::READY) {
|
| LOG(ERROR) << "Failed to receive protobuf tag.";
|
| read_callback_.Run(scoped_ptr<google::protobuf::MessageLite>());
|
| @@ -285,13 +287,13 @@ void ConnectionHandler::OnGotMessageTag() {
|
| if (!read_timeout_timer_.IsRunning()) {
|
| read_timeout_timer_.Start(FROM_HERE,
|
| read_timeout_,
|
| - base::Bind(&ConnectionHandler::OnTimeout,
|
| + base::Bind(&ConnectionHandlerImpl::OnTimeout,
|
| weak_ptr_factory_.GetWeakPtr()));
|
| }
|
| OnGotMessageSize();
|
| }
|
|
|
| -void ConnectionHandler::OnGotMessageSize() {
|
| +void ConnectionHandlerImpl::OnGotMessageSize() {
|
| if (input_stream_->GetState() != SocketInputStream::READY) {
|
| LOG(ERROR) << "Failed to receive message size.";
|
| read_callback_.Run(scoped_ptr<google::protobuf::MessageLite>());
|
| @@ -330,7 +332,7 @@ void ConnectionHandler::OnGotMessageSize() {
|
| OnGotMessageBytes();
|
| }
|
|
|
| -void ConnectionHandler::OnGotMessageBytes() {
|
| +void ConnectionHandlerImpl::OnGotMessageBytes() {
|
| read_timeout_timer_.Stop();
|
| scoped_ptr<google::protobuf::MessageLite> protobuf(
|
| BuildProtobufFromTag(message_tag_));
|
| @@ -339,7 +341,7 @@ void ConnectionHandler::OnGotMessageBytes() {
|
| if (protobuf.get() && message_size_ == 0) {
|
| base::MessageLoop::current()->PostTask(
|
| FROM_HERE,
|
| - base::Bind(&ConnectionHandler::GetNextMessage,
|
| + base::Bind(&ConnectionHandlerImpl::GetNextMessage,
|
| weak_ptr_factory_.GetWeakPtr()));
|
| read_callback_.Run(protobuf.Pass());
|
| return;
|
| @@ -368,7 +370,7 @@ void ConnectionHandler::OnGotMessageBytes() {
|
| input_stream_->RebuildBuffer();
|
| base::MessageLoop::current()->PostTask(
|
| FROM_HERE,
|
| - base::Bind(&ConnectionHandler::GetNextMessage,
|
| + base::Bind(&ConnectionHandlerImpl::GetNextMessage,
|
| weak_ptr_factory_.GetWeakPtr()));
|
| if (message_tag_ == kLoginResponseTag) {
|
| if (handshake_complete_) {
|
| @@ -381,13 +383,13 @@ void ConnectionHandler::OnGotMessageBytes() {
|
| read_callback_.Run(protobuf.Pass());
|
| }
|
|
|
| -void ConnectionHandler::OnTimeout() {
|
| +void ConnectionHandlerImpl::OnTimeout() {
|
| LOG(ERROR) << "Timed out waiting for GCM Protocol buffer.";
|
| CloseConnection();
|
| connection_callback_.Run(net::ERR_TIMED_OUT);
|
| }
|
|
|
| -void ConnectionHandler::CloseConnection() {
|
| +void ConnectionHandlerImpl::CloseConnection() {
|
| DVLOG(1) << "Closing connection.";
|
| read_callback_.Reset();
|
| write_callback_.Reset();
|
|
|