Index: google_apis/gcm/engine/connection_handler_impl.cc |
diff --git a/google_apis/gcm/engine/connection_handler.cc b/google_apis/gcm/engine/connection_handler_impl.cc |
similarity index 86% |
copy from google_apis/gcm/engine/connection_handler.cc |
copy to google_apis/gcm/engine/connection_handler_impl.cc |
index b4eb602c7f46f9f3b46e242bc3c1e2780184cb57..aff0dfd365180da5cd19dbcc2e0f6230b4453cc6 100644 |
--- a/google_apis/gcm/engine/connection_handler.cc |
+++ b/google_apis/gcm/engine/connection_handler_impl.cc |
@@ -2,12 +2,13 @@ |
// Use of this source code is governed by a BSD-style license that can be |
// found in the LICENSE file. |
-#include "google_apis/gcm/engine/connection_handler.h" |
+#include "google_apis/gcm/engine/connection_handler_impl.h" |
#include "base/message_loop/message_loop.h" |
#include "google/protobuf/io/coded_stream.h" |
#include "google_apis/gcm/base/mcs_util.h" |
#include "google_apis/gcm/base/socket_stream.h" |
+#include "google_apis/gcm/protocol/mcs.pb.h" |
#include "net/base/net_errors.h" |
#include "net/socket/stream_socket.h" |
@@ -26,30 +27,35 @@ const int kSizePacketLenMin = 1; |
const int kSizePacketLenMax = 2; |
// The current MCS protocol version. |
+// TODO(zea): bump to 41 once the server supports it. |
const int kMCSVersion = 38; |
} // namespace |
-ConnectionHandler::ConnectionHandler(base::TimeDelta read_timeout) |
+ConnectionHandlerImpl::ConnectionHandlerImpl( |
+ base::TimeDelta read_timeout, |
+ const ProtoReceivedCallback& read_callback, |
+ const ProtoSentCallback& write_callback, |
+ const ConnectionChangedCallback& connection_callback) |
: read_timeout_(read_timeout), |
handshake_complete_(false), |
message_tag_(0), |
message_size_(0), |
+ read_callback_(read_callback), |
+ write_callback_(write_callback), |
+ connection_callback_(connection_callback), |
weak_ptr_factory_(this) { |
} |
-ConnectionHandler::~ConnectionHandler() { |
+ConnectionHandlerImpl::~ConnectionHandlerImpl() { |
} |
-void ConnectionHandler::Init( |
- scoped_ptr<net::StreamSocket> socket, |
- const google::protobuf::MessageLite& login_request, |
- const ProtoReceivedCallback& read_callback, |
- const ProtoSentCallback& write_callback, |
- const ConnectionChangedCallback& connection_callback) { |
- DCHECK(!read_callback.is_null()); |
- DCHECK(!write_callback.is_null()); |
- DCHECK(!connection_callback.is_null()); |
+void ConnectionHandlerImpl::Init( |
+ const mcs_proto::LoginRequest& login_request, |
+ scoped_ptr<net::StreamSocket> socket) { |
+ DCHECK(!read_callback_.is_null()); |
+ DCHECK(!write_callback_.is_null()); |
+ DCHECK(!connection_callback_.is_null()); |
// Invalidate any previously outstanding reads. |
weak_ptr_factory_.InvalidateWeakPtrs(); |
@@ -60,19 +66,16 @@ void ConnectionHandler::Init( |
socket_ = socket.Pass(); |
input_stream_.reset(new SocketInputStream(socket_.get())); |
output_stream_.reset(new SocketOutputStream(socket_.get())); |
- read_callback_ = read_callback; |
- write_callback_ = write_callback; |
- connection_callback_ = connection_callback; |
Login(login_request); |
} |
-bool ConnectionHandler::CanSendMessage() const { |
+bool ConnectionHandlerImpl::CanSendMessage() const { |
return handshake_complete_ && output_stream_.get() && |
output_stream_->GetState() == SocketOutputStream::EMPTY; |
} |
-void ConnectionHandler::SendMessage( |
+void ConnectionHandlerImpl::SendMessage( |
const google::protobuf::MessageLite& message) { |
DCHECK_EQ(output_stream_->GetState(), SocketOutputStream::EMPTY); |
DCHECK(handshake_complete_); |
@@ -88,13 +91,13 @@ void ConnectionHandler::SendMessage( |
} |
if (output_stream_->Flush( |
- base::Bind(&ConnectionHandler::OnMessageSent, |
+ base::Bind(&ConnectionHandlerImpl::OnMessageSent, |
weak_ptr_factory_.GetWeakPtr())) != net::ERR_IO_PENDING) { |
OnMessageSent(); |
} |
} |
-void ConnectionHandler::Login( |
+void ConnectionHandlerImpl::Login( |
const google::protobuf::MessageLite& login_request) { |
DCHECK_EQ(output_stream_->GetState(), SocketOutputStream::EMPTY); |
@@ -109,22 +112,22 @@ void ConnectionHandler::Login( |
} |
if (output_stream_->Flush( |
- base::Bind(&ConnectionHandler::OnMessageSent, |
+ base::Bind(&ConnectionHandlerImpl::OnMessageSent, |
weak_ptr_factory_.GetWeakPtr())) != net::ERR_IO_PENDING) { |
base::MessageLoop::current()->PostTask( |
FROM_HERE, |
- base::Bind(&ConnectionHandler::OnMessageSent, |
+ base::Bind(&ConnectionHandlerImpl::OnMessageSent, |
weak_ptr_factory_.GetWeakPtr())); |
} |
read_timeout_timer_.Start(FROM_HERE, |
read_timeout_, |
- base::Bind(&ConnectionHandler::OnTimeout, |
+ base::Bind(&ConnectionHandlerImpl::OnTimeout, |
weak_ptr_factory_.GetWeakPtr())); |
WaitForData(MCS_VERSION_TAG_AND_SIZE); |
} |
-void ConnectionHandler::OnMessageSent() { |
+void ConnectionHandlerImpl::OnMessageSent() { |
if (!output_stream_.get()) { |
// The connection has already been closed. Just return. |
DCHECK(!input_stream_.get()); |
@@ -145,7 +148,7 @@ void ConnectionHandler::OnMessageSent() { |
write_callback_.Run(); |
} |
-void ConnectionHandler::GetNextMessage() { |
+void ConnectionHandlerImpl::GetNextMessage() { |
DCHECK(SocketInputStream::EMPTY == input_stream_->GetState() || |
SocketInputStream::READY == input_stream_->GetState()); |
message_tag_ = 0; |
@@ -154,7 +157,7 @@ void ConnectionHandler::GetNextMessage() { |
WaitForData(MCS_TAG_AND_SIZE); |
} |
-void ConnectionHandler::WaitForData(ProcessingState state) { |
+void ConnectionHandlerImpl::WaitForData(ProcessingState state) { |
DVLOG(1) << "Waiting for MCS data: state == " << state; |
if (!input_stream_) { |
@@ -210,7 +213,7 @@ void ConnectionHandler::WaitForData(ProcessingState state) { |
int byte_count = input_stream_->UnreadByteCount(); |
if (min_bytes_needed - byte_count > 0 && |
input_stream_->Refresh( |
- base::Bind(&ConnectionHandler::WaitForData, |
+ base::Bind(&ConnectionHandlerImpl::WaitForData, |
weak_ptr_factory_.GetWeakPtr(), |
state), |
max_bytes_needed - byte_count) == net::ERR_IO_PENDING) { |
@@ -249,7 +252,7 @@ void ConnectionHandler::WaitForData(ProcessingState state) { |
} |
} |
-void ConnectionHandler::OnGotVersion() { |
+void ConnectionHandlerImpl::OnGotVersion() { |
uint8 version = 0; |
{ |
CodedInputStream coded_input_stream(input_stream_.get()); |
@@ -267,7 +270,7 @@ void ConnectionHandler::OnGotVersion() { |
OnGotMessageTag(); |
} |
-void ConnectionHandler::OnGotMessageTag() { |
+void ConnectionHandlerImpl::OnGotMessageTag() { |
if (input_stream_->GetState() != SocketInputStream::READY) { |
LOG(ERROR) << "Failed to receive protobuf tag."; |
read_callback_.Run(scoped_ptr<google::protobuf::MessageLite>()); |
@@ -285,13 +288,13 @@ void ConnectionHandler::OnGotMessageTag() { |
if (!read_timeout_timer_.IsRunning()) { |
read_timeout_timer_.Start(FROM_HERE, |
read_timeout_, |
- base::Bind(&ConnectionHandler::OnTimeout, |
+ base::Bind(&ConnectionHandlerImpl::OnTimeout, |
weak_ptr_factory_.GetWeakPtr())); |
} |
OnGotMessageSize(); |
} |
-void ConnectionHandler::OnGotMessageSize() { |
+void ConnectionHandlerImpl::OnGotMessageSize() { |
if (input_stream_->GetState() != SocketInputStream::READY) { |
LOG(ERROR) << "Failed to receive message size."; |
read_callback_.Run(scoped_ptr<google::protobuf::MessageLite>()); |
@@ -330,7 +333,7 @@ void ConnectionHandler::OnGotMessageSize() { |
OnGotMessageBytes(); |
} |
-void ConnectionHandler::OnGotMessageBytes() { |
+void ConnectionHandlerImpl::OnGotMessageBytes() { |
read_timeout_timer_.Stop(); |
scoped_ptr<google::protobuf::MessageLite> protobuf( |
BuildProtobufFromTag(message_tag_)); |
@@ -339,7 +342,7 @@ void ConnectionHandler::OnGotMessageBytes() { |
if (protobuf.get() && message_size_ == 0) { |
base::MessageLoop::current()->PostTask( |
FROM_HERE, |
- base::Bind(&ConnectionHandler::GetNextMessage, |
+ base::Bind(&ConnectionHandlerImpl::GetNextMessage, |
weak_ptr_factory_.GetWeakPtr())); |
read_callback_.Run(protobuf.Pass()); |
return; |
@@ -368,7 +371,7 @@ void ConnectionHandler::OnGotMessageBytes() { |
input_stream_->RebuildBuffer(); |
base::MessageLoop::current()->PostTask( |
FROM_HERE, |
- base::Bind(&ConnectionHandler::GetNextMessage, |
+ base::Bind(&ConnectionHandlerImpl::GetNextMessage, |
weak_ptr_factory_.GetWeakPtr())); |
if (message_tag_ == kLoginResponseTag) { |
if (handshake_complete_) { |
@@ -381,13 +384,13 @@ void ConnectionHandler::OnGotMessageBytes() { |
read_callback_.Run(protobuf.Pass()); |
} |
-void ConnectionHandler::OnTimeout() { |
+void ConnectionHandlerImpl::OnTimeout() { |
LOG(ERROR) << "Timed out waiting for GCM Protocol buffer."; |
CloseConnection(); |
connection_callback_.Run(net::ERR_TIMED_OUT); |
} |
-void ConnectionHandler::CloseConnection() { |
+void ConnectionHandlerImpl::CloseConnection() { |
DVLOG(1) << "Closing connection."; |
read_callback_.Reset(); |
write_callback_.Reset(); |