| Index: blimp/net/grpc_connection.cc
|
| diff --git a/blimp/net/grpc_connection.cc b/blimp/net/grpc_connection.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..0095cc801c8f669bfcafd47eef8d106fdda74468
|
| --- /dev/null
|
| +++ b/blimp/net/grpc_connection.cc
|
| @@ -0,0 +1,166 @@
|
| +// Copyright 2016 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include "blimp/net/grpc_connection.h"
|
| +
|
| +#include "blimp/common/proto/blimp_message.pb.h"
|
| +
|
| +#include "base/callback.h"
|
| +#include "base/callback_helpers.h"
|
| +#include "base/macros.h"
|
| +#include "base/memory/ptr_util.h"
|
| +#include "base/threading/thread_task_runner_handle.h"
|
| +#include "blimp/common/logging.h"
|
| +#include "blimp/net/grpc_stream.h"
|
| +#include "content/public/browser/browser_thread.h"
|
| +#include "net/base/net_errors.h"
|
| +#include "blimp/net/connection_error_observer.h"
|
| +
|
| +namespace blimp {
|
| +
|
| +// NOTE: This entire class/file is going away soon.
|
| +// This set of classes is a thin wrapper around the new HeliumStream-like
|
| +// GrpcStream.
|
| +
|
| +// Forwards a |BlimpMessage| from the client to the Engine via gRPC after
|
| +// converting it to a |HeliumWrapper| first.
|
| +class GrpcOutgoingForwarder : public BlimpMessageProcessor {
|
| + public:
|
| + GrpcOutgoingForwarder(GrpcStream* stream,
|
| + ConnectionErrorObserver* error_observer)
|
| + : stream_(stream) {}
|
| + void ProcessMessage(std::unique_ptr<BlimpMessage> message,
|
| + const net::CompletionCallback& callback) override;
|
| +
|
| + private:
|
| + GrpcStream* stream_;
|
| +};
|
| +
|
| +void GrpcOutgoingForwarder::ProcessMessage(
|
| + std::unique_ptr<BlimpMessage> message,
|
| + const net::CompletionCallback& callback) {
|
| + DVLOG(3) << "Sending message: " << *message
|
| + << "; type = " << message->message_id();
|
| + // DCHECK(message->has_message_id());
|
| + if (stream_ != nullptr) {
|
| + auto helium_message = base::MakeUnique<HeliumWrapper>();
|
| +
|
| + if (message->SerializeToString(
|
| + helium_message->mutable_serialized_helium_message())) {
|
| + stream_->SendMessage(std::move(helium_message), callback);
|
| + } else {
|
| + LOG(FATAL) << "Unable to serialize message " << *message;
|
| + }
|
| + }
|
| +};
|
| +
|
| +// Callback for the gRPC layer that has a new |HeliumWrapper| message to forward
|
| +// to the client's 'incoming message processor' that accepts |BlimpMessage|.
|
| +class GrpcIncomingForwarder {
|
| + public:
|
| + GrpcIncomingForwarder(GrpcStream* stream,
|
| + ConnectionErrorObserver* error_observer)
|
| + : stream_(stream),
|
| + error_observer_(error_observer),
|
| + incoming_msg_processor_(nullptr),
|
| + incoming_forwarder_(this) {}
|
| +
|
| + void set_incoming_message_processor(BlimpMessageProcessor* processor) {
|
| + auto is_receive_message_setup = (incoming_msg_processor_ != nullptr);
|
| + incoming_msg_processor_ = processor;
|
| +
|
| + if (!is_receive_message_setup) {
|
| + TriggerReceiveMessage();
|
| + }
|
| + }
|
| +
|
| + private:
|
| + void TriggerReceiveMessage();
|
| +
|
| + void ReceiveMessage();
|
| +
|
| + void OnMessageReceived(std::unique_ptr<HeliumWrapper> helium_message,
|
| + int result);
|
| +
|
| + void ProcessResult(int result);
|
| +
|
| + GrpcStream* stream_;
|
| + ConnectionErrorObserver* error_observer_;
|
| + BlimpMessageProcessor* incoming_msg_processor_;
|
| + base::WeakPtrFactory<GrpcIncomingForwarder> incoming_forwarder_;
|
| +};
|
| +
|
| +void GrpcIncomingForwarder::ProcessResult(int result) {
|
| + // No-op.
|
| +}
|
| +
|
| +void GrpcIncomingForwarder::TriggerReceiveMessage() {
|
| + base::ThreadTaskRunnerHandle::Get()->PostTask(
|
| + FROM_HERE, base::Bind(&GrpcIncomingForwarder::ReceiveMessage,
|
| + incoming_forwarder_.GetWeakPtr()));
|
| +}
|
| +
|
| +void GrpcIncomingForwarder::ReceiveMessage() {
|
| + if (stream_ != nullptr) {
|
| + stream_->ReceiveMessage(
|
| + base::Bind(&GrpcIncomingForwarder::OnMessageReceived,
|
| + incoming_forwarder_.GetWeakPtr()));
|
| + }
|
| +}
|
| +
|
| +void GrpcIncomingForwarder::OnMessageReceived(
|
| + std::unique_ptr<HeliumWrapper> helium_message,
|
| + int result) {
|
| + if (result != grpc::OK) {
|
| + DVLOG(3) << "Received message with error " << result;
|
| + error_observer_->OnConnectionError(result);
|
| + return;
|
| + }
|
| +
|
| + DVLOG(3) << "Processing message : " << helium_message->GetTypeName();
|
| + auto blimp_message = base::MakeUnique<BlimpMessage>();
|
| +
|
| + if (blimp_message->ParseFromString(
|
| + helium_message->serialized_helium_message())) {
|
| + if (incoming_msg_processor_ != nullptr) {
|
| + DVLOG(3) << "Parsed BlimpMessage: " << *blimp_message;
|
| + incoming_msg_processor_->ProcessMessage(
|
| + std::move(blimp_message),
|
| + base::Bind(&GrpcIncomingForwarder::ProcessResult,
|
| + incoming_forwarder_.GetWeakPtr()));
|
| + TriggerReceiveMessage();
|
| + }
|
| + } else {
|
| + LOG(FATAL) << "Unable to deserialize message.";
|
| + }
|
| +}
|
| +
|
| +// We rely on the gRPC and our //net code to agree on the 'success' code alone.
|
| +// Again, this is interim while we use the current //net as well as gRPC but
|
| +// will go away once we fully move to gRPC alone with strongly-typed results.
|
| +static_assert(static_cast<int>(grpc::OK) == static_cast<int>(net::OK),
|
| + "Mismatch in error code definitions.");
|
| +
|
| +GrpcConnection::GrpcConnection(std::unique_ptr<GrpcStream> stream)
|
| + : stream_(std::move(stream)),
|
| + outgoing_forwarder_(
|
| + base::MakeUnique<GrpcOutgoingForwarder>(stream_.get(), this)),
|
| + incoming_forwarder_(
|
| + base::MakeUnique<GrpcIncomingForwarder>(stream_.get(), this)) {}
|
| +
|
| +BlimpMessageProcessor* GrpcConnection::GetOutgoingMessageProcessor() {
|
| + return outgoing_forwarder_.get();
|
| +}
|
| +
|
| +void GrpcConnection::SetIncomingMessageProcessor(
|
| + BlimpMessageProcessor* processor) {
|
| + DVLOG(3) << "Setting incoming message processor " << processor;
|
| + incoming_forwarder_->set_incoming_message_processor(processor);
|
| +}
|
| +
|
| +GrpcConnection::~GrpcConnection() {
|
| + DVLOG(1) << "Ending connection.";
|
| +}
|
| +
|
| +} // namespace blimp
|
|
|