Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(215)

Unified Diff: google_apis/gcm/engine/connection_handler_impl.cc

Issue 54743007: [GCM] Add connection factory for creating MCS connections (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Fix compile Created 7 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: google_apis/gcm/engine/connection_handler_impl.cc
diff --git a/google_apis/gcm/engine/connection_handler.cc b/google_apis/gcm/engine/connection_handler_impl.cc
similarity index 86%
copy from google_apis/gcm/engine/connection_handler.cc
copy to google_apis/gcm/engine/connection_handler_impl.cc
index b4eb602c7f46f9f3b46e242bc3c1e2780184cb57..aff0dfd365180da5cd19dbcc2e0f6230b4453cc6 100644
--- a/google_apis/gcm/engine/connection_handler.cc
+++ b/google_apis/gcm/engine/connection_handler_impl.cc
@@ -2,12 +2,13 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
-#include "google_apis/gcm/engine/connection_handler.h"
+#include "google_apis/gcm/engine/connection_handler_impl.h"
#include "base/message_loop/message_loop.h"
#include "google/protobuf/io/coded_stream.h"
#include "google_apis/gcm/base/mcs_util.h"
#include "google_apis/gcm/base/socket_stream.h"
+#include "google_apis/gcm/protocol/mcs.pb.h"
#include "net/base/net_errors.h"
#include "net/socket/stream_socket.h"
@@ -26,30 +27,35 @@ const int kSizePacketLenMin = 1;
const int kSizePacketLenMax = 2;
// The current MCS protocol version.
+// TODO(zea): bump to 41 once the server supports it.
const int kMCSVersion = 38;
} // namespace
-ConnectionHandler::ConnectionHandler(base::TimeDelta read_timeout)
+ConnectionHandlerImpl::ConnectionHandlerImpl(
+ base::TimeDelta read_timeout,
+ const ProtoReceivedCallback& read_callback,
+ const ProtoSentCallback& write_callback,
+ const ConnectionChangedCallback& connection_callback)
: read_timeout_(read_timeout),
handshake_complete_(false),
message_tag_(0),
message_size_(0),
+ read_callback_(read_callback),
+ write_callback_(write_callback),
+ connection_callback_(connection_callback),
weak_ptr_factory_(this) {
}
-ConnectionHandler::~ConnectionHandler() {
+ConnectionHandlerImpl::~ConnectionHandlerImpl() {
}
-void ConnectionHandler::Init(
- scoped_ptr<net::StreamSocket> socket,
- const google::protobuf::MessageLite& login_request,
- const ProtoReceivedCallback& read_callback,
- const ProtoSentCallback& write_callback,
- const ConnectionChangedCallback& connection_callback) {
- DCHECK(!read_callback.is_null());
- DCHECK(!write_callback.is_null());
- DCHECK(!connection_callback.is_null());
+void ConnectionHandlerImpl::Init(
+ const mcs_proto::LoginRequest& login_request,
+ scoped_ptr<net::StreamSocket> socket) {
+ DCHECK(!read_callback_.is_null());
+ DCHECK(!write_callback_.is_null());
+ DCHECK(!connection_callback_.is_null());
// Invalidate any previously outstanding reads.
weak_ptr_factory_.InvalidateWeakPtrs();
@@ -60,19 +66,16 @@ void ConnectionHandler::Init(
socket_ = socket.Pass();
input_stream_.reset(new SocketInputStream(socket_.get()));
output_stream_.reset(new SocketOutputStream(socket_.get()));
- read_callback_ = read_callback;
- write_callback_ = write_callback;
- connection_callback_ = connection_callback;
Login(login_request);
}
-bool ConnectionHandler::CanSendMessage() const {
+bool ConnectionHandlerImpl::CanSendMessage() const {
return handshake_complete_ && output_stream_.get() &&
output_stream_->GetState() == SocketOutputStream::EMPTY;
}
-void ConnectionHandler::SendMessage(
+void ConnectionHandlerImpl::SendMessage(
const google::protobuf::MessageLite& message) {
DCHECK_EQ(output_stream_->GetState(), SocketOutputStream::EMPTY);
DCHECK(handshake_complete_);
@@ -88,13 +91,13 @@ void ConnectionHandler::SendMessage(
}
if (output_stream_->Flush(
- base::Bind(&ConnectionHandler::OnMessageSent,
+ base::Bind(&ConnectionHandlerImpl::OnMessageSent,
weak_ptr_factory_.GetWeakPtr())) != net::ERR_IO_PENDING) {
OnMessageSent();
}
}
-void ConnectionHandler::Login(
+void ConnectionHandlerImpl::Login(
const google::protobuf::MessageLite& login_request) {
DCHECK_EQ(output_stream_->GetState(), SocketOutputStream::EMPTY);
@@ -109,22 +112,22 @@ void ConnectionHandler::Login(
}
if (output_stream_->Flush(
- base::Bind(&ConnectionHandler::OnMessageSent,
+ base::Bind(&ConnectionHandlerImpl::OnMessageSent,
weak_ptr_factory_.GetWeakPtr())) != net::ERR_IO_PENDING) {
base::MessageLoop::current()->PostTask(
FROM_HERE,
- base::Bind(&ConnectionHandler::OnMessageSent,
+ base::Bind(&ConnectionHandlerImpl::OnMessageSent,
weak_ptr_factory_.GetWeakPtr()));
}
read_timeout_timer_.Start(FROM_HERE,
read_timeout_,
- base::Bind(&ConnectionHandler::OnTimeout,
+ base::Bind(&ConnectionHandlerImpl::OnTimeout,
weak_ptr_factory_.GetWeakPtr()));
WaitForData(MCS_VERSION_TAG_AND_SIZE);
}
-void ConnectionHandler::OnMessageSent() {
+void ConnectionHandlerImpl::OnMessageSent() {
if (!output_stream_.get()) {
// The connection has already been closed. Just return.
DCHECK(!input_stream_.get());
@@ -145,7 +148,7 @@ void ConnectionHandler::OnMessageSent() {
write_callback_.Run();
}
-void ConnectionHandler::GetNextMessage() {
+void ConnectionHandlerImpl::GetNextMessage() {
DCHECK(SocketInputStream::EMPTY == input_stream_->GetState() ||
SocketInputStream::READY == input_stream_->GetState());
message_tag_ = 0;
@@ -154,7 +157,7 @@ void ConnectionHandler::GetNextMessage() {
WaitForData(MCS_TAG_AND_SIZE);
}
-void ConnectionHandler::WaitForData(ProcessingState state) {
+void ConnectionHandlerImpl::WaitForData(ProcessingState state) {
DVLOG(1) << "Waiting for MCS data: state == " << state;
if (!input_stream_) {
@@ -210,7 +213,7 @@ void ConnectionHandler::WaitForData(ProcessingState state) {
int byte_count = input_stream_->UnreadByteCount();
if (min_bytes_needed - byte_count > 0 &&
input_stream_->Refresh(
- base::Bind(&ConnectionHandler::WaitForData,
+ base::Bind(&ConnectionHandlerImpl::WaitForData,
weak_ptr_factory_.GetWeakPtr(),
state),
max_bytes_needed - byte_count) == net::ERR_IO_PENDING) {
@@ -249,7 +252,7 @@ void ConnectionHandler::WaitForData(ProcessingState state) {
}
}
-void ConnectionHandler::OnGotVersion() {
+void ConnectionHandlerImpl::OnGotVersion() {
uint8 version = 0;
{
CodedInputStream coded_input_stream(input_stream_.get());
@@ -267,7 +270,7 @@ void ConnectionHandler::OnGotVersion() {
OnGotMessageTag();
}
-void ConnectionHandler::OnGotMessageTag() {
+void ConnectionHandlerImpl::OnGotMessageTag() {
if (input_stream_->GetState() != SocketInputStream::READY) {
LOG(ERROR) << "Failed to receive protobuf tag.";
read_callback_.Run(scoped_ptr<google::protobuf::MessageLite>());
@@ -285,13 +288,13 @@ void ConnectionHandler::OnGotMessageTag() {
if (!read_timeout_timer_.IsRunning()) {
read_timeout_timer_.Start(FROM_HERE,
read_timeout_,
- base::Bind(&ConnectionHandler::OnTimeout,
+ base::Bind(&ConnectionHandlerImpl::OnTimeout,
weak_ptr_factory_.GetWeakPtr()));
}
OnGotMessageSize();
}
-void ConnectionHandler::OnGotMessageSize() {
+void ConnectionHandlerImpl::OnGotMessageSize() {
if (input_stream_->GetState() != SocketInputStream::READY) {
LOG(ERROR) << "Failed to receive message size.";
read_callback_.Run(scoped_ptr<google::protobuf::MessageLite>());
@@ -330,7 +333,7 @@ void ConnectionHandler::OnGotMessageSize() {
OnGotMessageBytes();
}
-void ConnectionHandler::OnGotMessageBytes() {
+void ConnectionHandlerImpl::OnGotMessageBytes() {
read_timeout_timer_.Stop();
scoped_ptr<google::protobuf::MessageLite> protobuf(
BuildProtobufFromTag(message_tag_));
@@ -339,7 +342,7 @@ void ConnectionHandler::OnGotMessageBytes() {
if (protobuf.get() && message_size_ == 0) {
base::MessageLoop::current()->PostTask(
FROM_HERE,
- base::Bind(&ConnectionHandler::GetNextMessage,
+ base::Bind(&ConnectionHandlerImpl::GetNextMessage,
weak_ptr_factory_.GetWeakPtr()));
read_callback_.Run(protobuf.Pass());
return;
@@ -368,7 +371,7 @@ void ConnectionHandler::OnGotMessageBytes() {
input_stream_->RebuildBuffer();
base::MessageLoop::current()->PostTask(
FROM_HERE,
- base::Bind(&ConnectionHandler::GetNextMessage,
+ base::Bind(&ConnectionHandlerImpl::GetNextMessage,
weak_ptr_factory_.GetWeakPtr()));
if (message_tag_ == kLoginResponseTag) {
if (handshake_complete_) {
@@ -381,13 +384,13 @@ void ConnectionHandler::OnGotMessageBytes() {
read_callback_.Run(protobuf.Pass());
}
-void ConnectionHandler::OnTimeout() {
+void ConnectionHandlerImpl::OnTimeout() {
LOG(ERROR) << "Timed out waiting for GCM Protocol buffer.";
CloseConnection();
connection_callback_.Run(net::ERR_TIMED_OUT);
}
-void ConnectionHandler::CloseConnection() {
+void ConnectionHandlerImpl::CloseConnection() {
DVLOG(1) << "Closing connection.";
read_callback_.Reset();
write_callback_.Reset();
« no previous file with comments | « google_apis/gcm/engine/connection_handler_impl.h ('k') | google_apis/gcm/engine/connection_handler_impl_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698