Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(214)

Unified Diff: google_apis/gcm/engine/connection_handler.cc

Issue 54743007: [GCM] Add connection factory for creating MCS connections (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Fix compile Created 7 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « google_apis/gcm/engine/connection_handler.h ('k') | google_apis/gcm/engine/connection_handler_impl.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: google_apis/gcm/engine/connection_handler.cc
diff --git a/google_apis/gcm/engine/connection_handler.cc b/google_apis/gcm/engine/connection_handler.cc
index b4eb602c7f46f9f3b46e242bc3c1e2780184cb57..bc9b65859797d8f49704ec77ae25d9d0b1e0856c 100644
--- a/google_apis/gcm/engine/connection_handler.cc
+++ b/google_apis/gcm/engine/connection_handler.cc
@@ -4,398 +4,12 @@
#include "google_apis/gcm/engine/connection_handler.h"
-#include "base/message_loop/message_loop.h"
-#include "google/protobuf/io/coded_stream.h"
-#include "google_apis/gcm/base/mcs_util.h"
-#include "google_apis/gcm/base/socket_stream.h"
-#include "net/base/net_errors.h"
-#include "net/socket/stream_socket.h"
-
-using namespace google::protobuf::io;
-
namespace gcm {
-namespace {
-
-// # of bytes a MCS version packet consumes.
-const int kVersionPacketLen = 1;
-// # of bytes a tag packet consumes.
-const int kTagPacketLen = 1;
-// Max # of bytes a length packet consumes.
-const int kSizePacketLenMin = 1;
-const int kSizePacketLenMax = 2;
-
-// The current MCS protocol version.
-const int kMCSVersion = 38;
-
-} // namespace
-
-ConnectionHandler::ConnectionHandler(base::TimeDelta read_timeout)
- : read_timeout_(read_timeout),
- handshake_complete_(false),
- message_tag_(0),
- message_size_(0),
- weak_ptr_factory_(this) {
+ConnectionHandler::ConnectionHandler() {
}
ConnectionHandler::~ConnectionHandler() {
}
-void ConnectionHandler::Init(
- scoped_ptr<net::StreamSocket> socket,
- const google::protobuf::MessageLite& login_request,
- const ProtoReceivedCallback& read_callback,
- const ProtoSentCallback& write_callback,
- const ConnectionChangedCallback& connection_callback) {
- DCHECK(!read_callback.is_null());
- DCHECK(!write_callback.is_null());
- DCHECK(!connection_callback.is_null());
-
- // Invalidate any previously outstanding reads.
- weak_ptr_factory_.InvalidateWeakPtrs();
-
- handshake_complete_ = false;
- message_tag_ = 0;
- message_size_ = 0;
- socket_ = socket.Pass();
- input_stream_.reset(new SocketInputStream(socket_.get()));
- output_stream_.reset(new SocketOutputStream(socket_.get()));
- read_callback_ = read_callback;
- write_callback_ = write_callback;
- connection_callback_ = connection_callback;
-
- Login(login_request);
-}
-
-bool ConnectionHandler::CanSendMessage() const {
- return handshake_complete_ && output_stream_.get() &&
- output_stream_->GetState() == SocketOutputStream::EMPTY;
-}
-
-void ConnectionHandler::SendMessage(
- const google::protobuf::MessageLite& message) {
- DCHECK_EQ(output_stream_->GetState(), SocketOutputStream::EMPTY);
- DCHECK(handshake_complete_);
-
- {
- CodedOutputStream coded_output_stream(output_stream_.get());
- DVLOG(1) << "Writing proto of size " << message.ByteSize();
- int tag = GetMCSProtoTag(message);
- DCHECK_NE(tag, -1);
- coded_output_stream.WriteRaw(&tag, 1);
- coded_output_stream.WriteVarint32(message.ByteSize());
- message.SerializeToCodedStream(&coded_output_stream);
- }
-
- if (output_stream_->Flush(
- base::Bind(&ConnectionHandler::OnMessageSent,
- weak_ptr_factory_.GetWeakPtr())) != net::ERR_IO_PENDING) {
- OnMessageSent();
- }
-}
-
-void ConnectionHandler::Login(
- const google::protobuf::MessageLite& login_request) {
- DCHECK_EQ(output_stream_->GetState(), SocketOutputStream::EMPTY);
-
- const char version_byte[1] = {kMCSVersion};
- const char login_request_tag[1] = {kLoginRequestTag};
- {
- CodedOutputStream coded_output_stream(output_stream_.get());
- coded_output_stream.WriteRaw(version_byte, 1);
- coded_output_stream.WriteRaw(login_request_tag, 1);
- coded_output_stream.WriteVarint32(login_request.ByteSize());
- login_request.SerializeToCodedStream(&coded_output_stream);
- }
-
- if (output_stream_->Flush(
- base::Bind(&ConnectionHandler::OnMessageSent,
- weak_ptr_factory_.GetWeakPtr())) != net::ERR_IO_PENDING) {
- base::MessageLoop::current()->PostTask(
- FROM_HERE,
- base::Bind(&ConnectionHandler::OnMessageSent,
- weak_ptr_factory_.GetWeakPtr()));
- }
-
- read_timeout_timer_.Start(FROM_HERE,
- read_timeout_,
- base::Bind(&ConnectionHandler::OnTimeout,
- weak_ptr_factory_.GetWeakPtr()));
- WaitForData(MCS_VERSION_TAG_AND_SIZE);
-}
-
-void ConnectionHandler::OnMessageSent() {
- if (!output_stream_.get()) {
- // The connection has already been closed. Just return.
- DCHECK(!input_stream_.get());
- DCHECK(!read_timeout_timer_.IsRunning());
- return;
- }
-
- if (output_stream_->GetState() != SocketOutputStream::EMPTY) {
- int last_error = output_stream_->last_error();
- CloseConnection();
- // If the socket stream had an error, plumb it up, else plumb up FAILED.
- if (last_error == net::OK)
- last_error = net::ERR_FAILED;
- connection_callback_.Run(last_error);
- return;
- }
-
- write_callback_.Run();
-}
-
-void ConnectionHandler::GetNextMessage() {
- DCHECK(SocketInputStream::EMPTY == input_stream_->GetState() ||
- SocketInputStream::READY == input_stream_->GetState());
- message_tag_ = 0;
- message_size_ = 0;
-
- WaitForData(MCS_TAG_AND_SIZE);
-}
-
-void ConnectionHandler::WaitForData(ProcessingState state) {
- DVLOG(1) << "Waiting for MCS data: state == " << state;
-
- if (!input_stream_) {
- // The connection has already been closed. Just return.
- DCHECK(!output_stream_.get());
- DCHECK(!read_timeout_timer_.IsRunning());
- return;
- }
-
- if (input_stream_->GetState() != SocketInputStream::EMPTY &&
- input_stream_->GetState() != SocketInputStream::READY) {
- // An error occurred.
- int last_error = output_stream_->last_error();
- CloseConnection();
- // If the socket stream had an error, plumb it up, else plumb up FAILED.
- if (last_error == net::OK)
- last_error = net::ERR_FAILED;
- connection_callback_.Run(last_error);
- return;
- }
-
- // Used to determine whether a Socket::Read is necessary.
- int min_bytes_needed = 0;
- // Used to limit the size of the Socket::Read.
- int max_bytes_needed = 0;
-
- switch(state) {
- case MCS_VERSION_TAG_AND_SIZE:
- min_bytes_needed = kVersionPacketLen + kTagPacketLen + kSizePacketLenMin;
- max_bytes_needed = kVersionPacketLen + kTagPacketLen + kSizePacketLenMax;
- break;
- case MCS_TAG_AND_SIZE:
- min_bytes_needed = kTagPacketLen + kSizePacketLenMin;
- max_bytes_needed = kTagPacketLen + kSizePacketLenMax;
- break;
- case MCS_FULL_SIZE:
- // If in this state, the minimum size packet length must already have been
- // insufficient, so set both to the max length.
- min_bytes_needed = kSizePacketLenMax;
- max_bytes_needed = kSizePacketLenMax;
- break;
- case MCS_PROTO_BYTES:
- read_timeout_timer_.Reset();
- // No variability in the message size, set both to the same.
- min_bytes_needed = message_size_;
- max_bytes_needed = message_size_;
- break;
- default:
- NOTREACHED();
- }
- DCHECK_GE(max_bytes_needed, min_bytes_needed);
-
- int byte_count = input_stream_->UnreadByteCount();
- if (min_bytes_needed - byte_count > 0 &&
- input_stream_->Refresh(
- base::Bind(&ConnectionHandler::WaitForData,
- weak_ptr_factory_.GetWeakPtr(),
- state),
- max_bytes_needed - byte_count) == net::ERR_IO_PENDING) {
- return;
- }
-
- // Check for refresh errors.
- if (input_stream_->GetState() != SocketInputStream::READY) {
- // An error occurred.
- int last_error = output_stream_->last_error();
- CloseConnection();
- // If the socket stream had an error, plumb it up, else plumb up FAILED.
- if (last_error == net::OK)
- last_error = net::ERR_FAILED;
- connection_callback_.Run(last_error);
- return;
- }
-
- // Received enough bytes, process them.
- DVLOG(1) << "Processing MCS data: state == " << state;
- switch(state) {
- case MCS_VERSION_TAG_AND_SIZE:
- OnGotVersion();
- break;
- case MCS_TAG_AND_SIZE:
- OnGotMessageTag();
- break;
- case MCS_FULL_SIZE:
- OnGotMessageSize();
- break;
- case MCS_PROTO_BYTES:
- OnGotMessageBytes();
- break;
- default:
- NOTREACHED();
- }
-}
-
-void ConnectionHandler::OnGotVersion() {
- uint8 version = 0;
- {
- CodedInputStream coded_input_stream(input_stream_.get());
- coded_input_stream.ReadRaw(&version, 1);
- }
- if (version < kMCSVersion) {
- LOG(ERROR) << "Invalid GCM version response: " << static_cast<int>(version);
- connection_callback_.Run(net::ERR_FAILED);
- return;
- }
-
- input_stream_->RebuildBuffer();
-
- // Process the LoginResponse message tag.
- OnGotMessageTag();
-}
-
-void ConnectionHandler::OnGotMessageTag() {
- if (input_stream_->GetState() != SocketInputStream::READY) {
- LOG(ERROR) << "Failed to receive protobuf tag.";
- read_callback_.Run(scoped_ptr<google::protobuf::MessageLite>());
- return;
- }
-
- {
- CodedInputStream coded_input_stream(input_stream_.get());
- coded_input_stream.ReadRaw(&message_tag_, 1);
- }
-
- DVLOG(1) << "Received proto of type "
- << static_cast<unsigned int>(message_tag_);
-
- if (!read_timeout_timer_.IsRunning()) {
- read_timeout_timer_.Start(FROM_HERE,
- read_timeout_,
- base::Bind(&ConnectionHandler::OnTimeout,
- weak_ptr_factory_.GetWeakPtr()));
- }
- OnGotMessageSize();
-}
-
-void ConnectionHandler::OnGotMessageSize() {
- if (input_stream_->GetState() != SocketInputStream::READY) {
- LOG(ERROR) << "Failed to receive message size.";
- read_callback_.Run(scoped_ptr<google::protobuf::MessageLite>());
- return;
- }
-
- bool need_another_byte = false;
- int prev_byte_count = input_stream_->ByteCount();
- {
- CodedInputStream coded_input_stream(input_stream_.get());
- if (!coded_input_stream.ReadVarint32(&message_size_))
- need_another_byte = true;
- }
-
- if (need_another_byte) {
- DVLOG(1) << "Expecting another message size byte.";
- if (prev_byte_count >= kSizePacketLenMax) {
- // Already had enough bytes, something else went wrong.
- LOG(ERROR) << "Failed to process message size.";
- read_callback_.Run(scoped_ptr<google::protobuf::MessageLite>());
- return;
- }
- // Back up by the amount read (should always be 1 byte).
- int bytes_read = prev_byte_count - input_stream_->ByteCount();
- DCHECK_EQ(bytes_read, 1);
- input_stream_->BackUp(bytes_read);
- WaitForData(MCS_FULL_SIZE);
- return;
- }
-
- DVLOG(1) << "Proto size: " << message_size_;
-
- if (message_size_ > 0)
- WaitForData(MCS_PROTO_BYTES);
- else
- OnGotMessageBytes();
-}
-
-void ConnectionHandler::OnGotMessageBytes() {
- read_timeout_timer_.Stop();
- scoped_ptr<google::protobuf::MessageLite> protobuf(
- BuildProtobufFromTag(message_tag_));
- // Messages with no content are valid; just use the default protobuf for
- // that tag.
- if (protobuf.get() && message_size_ == 0) {
- base::MessageLoop::current()->PostTask(
- FROM_HERE,
- base::Bind(&ConnectionHandler::GetNextMessage,
- weak_ptr_factory_.GetWeakPtr()));
- read_callback_.Run(protobuf.Pass());
- return;
- }
-
- if (!protobuf.get() ||
- input_stream_->GetState() != SocketInputStream::READY) {
- LOG(ERROR) << "Failed to extract protobuf bytes of type "
- << static_cast<unsigned int>(message_tag_);
- protobuf.reset(); // Return a null pointer to denote an error.
- read_callback_.Run(protobuf.Pass());
- return;
- }
-
- {
- CodedInputStream coded_input_stream(input_stream_.get());
- if (!protobuf->ParsePartialFromCodedStream(&coded_input_stream)) {
- NOTREACHED() << "Unable to parse GCM message of type "
- << static_cast<unsigned int>(message_tag_);
- protobuf.reset(); // Return a null pointer to denote an error.
- read_callback_.Run(protobuf.Pass());
- return;
- }
- }
-
- input_stream_->RebuildBuffer();
- base::MessageLoop::current()->PostTask(
- FROM_HERE,
- base::Bind(&ConnectionHandler::GetNextMessage,
- weak_ptr_factory_.GetWeakPtr()));
- if (message_tag_ == kLoginResponseTag) {
- if (handshake_complete_) {
- LOG(ERROR) << "Unexpected login response.";
- } else {
- handshake_complete_ = true;
- DVLOG(1) << "GCM Handshake complete.";
- }
- }
- read_callback_.Run(protobuf.Pass());
-}
-
-void ConnectionHandler::OnTimeout() {
- LOG(ERROR) << "Timed out waiting for GCM Protocol buffer.";
- CloseConnection();
- connection_callback_.Run(net::ERR_TIMED_OUT);
-}
-
-void ConnectionHandler::CloseConnection() {
- DVLOG(1) << "Closing connection.";
- read_callback_.Reset();
- write_callback_.Reset();
- read_timeout_timer_.Stop();
- socket_->Disconnect();
- input_stream_.reset();
- output_stream_.reset();
- weak_ptr_factory_.InvalidateWeakPtrs();
-}
-
} // namespace gcm
« no previous file with comments | « google_apis/gcm/engine/connection_handler.h ('k') | google_apis/gcm/engine/connection_handler_impl.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698