Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(985)

Unified Diff: blimp/net/grpc_connection.cc

Issue 2462183002: GRPC Stream implementation of HeliumStream
Patch Set: Address gcasto comments Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « blimp/net/grpc_connection.h ('k') | blimp/net/grpc_connection_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: blimp/net/grpc_connection.cc
diff --git a/blimp/net/grpc_connection.cc b/blimp/net/grpc_connection.cc
new file mode 100644
index 0000000000000000000000000000000000000000..00a005269e46c935b2df551e2fa671c9229e8611
--- /dev/null
+++ b/blimp/net/grpc_connection.cc
@@ -0,0 +1,174 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <utility>
+
+#include "base/callback.h"
+#include "base/callback_helpers.h"
+#include "base/macros.h"
+#include "base/memory/ptr_util.h"
+#include "base/threading/thread_task_runner_handle.h"
+#include "blimp/common/logging.h"
+#include "blimp/common/proto/blimp_message.pb.h"
+#include "blimp/net/blimp_message_processor.h"
+#include "blimp/net/connection_error_observer.h"
+#include "blimp/net/grpc_connection.h"
+#include "blimp/net/grpc_stream.h"
+#include "net/base/net_errors.h"
+
+namespace blimp {
+
+int ConvertHeliumResult(helium::Result result) {
+ return (result == helium::Result::SUCCESS ? 0 : -1);
+}
+
+// NOTE: This entire class/file is going away soon.
+// This set of classes is a thin wrapper around the new HeliumStream-like
+// GrpcStream.
+
+// Forwards a |BlimpMessage| from the client to the Engine via gRPC after
+// converting it to a |HeliumWrapper| first.
+class GrpcOutgoingForwarder : public BlimpMessageProcessor {
+ public:
+ GrpcOutgoingForwarder(GrpcStream* stream,
+ ConnectionErrorObserver* error_observer)
+ : stream_(stream) {}
+ void ProcessMessage(std::unique_ptr<BlimpMessage> message,
+ const net::CompletionCallback& callback) override;
+
+ private:
+ static void SendMessageCb(const base::Callback<void(int)>& callback,
+ helium::Result result) {
+ callback.Run(ConvertHeliumResult(result));
+ }
+
+ GrpcStream* stream_;
+};
+
+void GrpcOutgoingForwarder::ProcessMessage(
+ std::unique_ptr<BlimpMessage> message,
+ const net::CompletionCallback& callback) {
+ DVLOG(3) << "Sending message: " << *message
+ << "; type = " << message->message_id();
+
+ if (stream_ != nullptr) {
+ std::unique_ptr<HeliumWrapper> helium_message =
+ base::MakeUnique<HeliumWrapper>();
+
+ if (message->SerializeToString(
+ helium_message->mutable_serialized_helium_message())) {
+ stream_->SendMessage(
+ std::move(helium_message),
+ base::Bind(&GrpcOutgoingForwarder::SendMessageCb, callback));
+ } else {
+ LOG(FATAL) << "Unable to serialize message " << *message;
+ }
+ }
+}
+
+// Callback for the gRPC layer that has a new |HeliumWrapper| message to forward
+// to the client's 'incoming message processor' that accepts |BlimpMessage|.
+class GrpcIncomingForwarder {
+ public:
+ GrpcIncomingForwarder(GrpcStream* stream,
+ ConnectionErrorObserver* error_observer)
+ : stream_(stream),
+ error_observer_(error_observer),
+ incoming_msg_processor_(nullptr),
+ incoming_forwarder_(this) {}
+
+ void set_incoming_message_processor(BlimpMessageProcessor* processor) {
+ bool is_receive_message_setup = (incoming_msg_processor_ != nullptr);
+ incoming_msg_processor_ = processor;
+
+ if (!is_receive_message_setup) {
+ TriggerReceiveMessage();
+ }
+ }
+
+ private:
+ void TriggerReceiveMessage();
+ void ReceiveMessage();
+ void OnMessageReceived(std::unique_ptr<HeliumWrapper> helium_message,
+ helium::Result result);
+ void ProcessResult(int result);
+
+ GrpcStream* stream_;
+ ConnectionErrorObserver* error_observer_;
+ BlimpMessageProcessor* incoming_msg_processor_;
+ base::WeakPtrFactory<GrpcIncomingForwarder> incoming_forwarder_;
+};
+
+void GrpcIncomingForwarder::ProcessResult(int result) {
+ // No-op.
+}
+
+void GrpcIncomingForwarder::TriggerReceiveMessage() {
+ base::ThreadTaskRunnerHandle::Get()->PostTask(
+ FROM_HERE, base::Bind(&GrpcIncomingForwarder::ReceiveMessage,
+ incoming_forwarder_.GetWeakPtr()));
+}
+
+void GrpcIncomingForwarder::ReceiveMessage() {
+ if (stream_ != nullptr) {
+ stream_->ReceiveMessage(
+ base::Bind(&GrpcIncomingForwarder::OnMessageReceived,
+ incoming_forwarder_.GetWeakPtr()));
+ }
+}
+
+void GrpcIncomingForwarder::OnMessageReceived(
+ std::unique_ptr<HeliumWrapper> helium_message, helium::Result result) {
+ int result_code = ConvertHeliumResult(result);
+ if (result != helium::Result::SUCCESS) {
+ DVLOG(3) << "Received message with error " << result;
+ error_observer_->OnConnectionError(result_code);
+ return;
+ }
+
+ DVLOG(3) << "Processing message : " << helium_message->GetTypeName();
+ std::unique_ptr<BlimpMessage> blimp_message =
+ base::MakeUnique<BlimpMessage>();
+
+ if (blimp_message->ParseFromString(
+ helium_message->serialized_helium_message())) {
+ if (incoming_msg_processor_ != nullptr) {
+ DVLOG(3) << "Parsed BlimpMessage: " << *blimp_message;
+ incoming_msg_processor_->ProcessMessage(
+ std::move(blimp_message),
+ base::Bind(&GrpcIncomingForwarder::ProcessResult,
+ incoming_forwarder_.GetWeakPtr()));
+ TriggerReceiveMessage();
+ }
+ } else {
+ LOG(FATAL) << "Unable to deserialize message.";
+ }
+}
+
+// We rely on the gRPC and our //net code to agree on the 'success' code alone.
+// Again, this is interim while we use the current //net as well as gRPC but
+// will go away once we fully move to gRPC alone with strongly-typed results.
+static_assert(static_cast<int>(grpc::OK) == static_cast<int>(net::OK),
+ "Mismatch in error code definitions.");
+
+GrpcConnection::GrpcConnection(std::unique_ptr<GrpcStream> stream)
+ : stream_(std::move(stream)),
+ outgoing_forwarder_(
+ base::MakeUnique<GrpcOutgoingForwarder>(stream_.get(), this)),
+ incoming_forwarder_(
+ base::MakeUnique<GrpcIncomingForwarder>(stream_.get(), this)) {}
+
+BlimpMessageProcessor* GrpcConnection::GetOutgoingMessageProcessor() {
+ return outgoing_forwarder_.get();
+}
+
+void GrpcConnection::SetIncomingMessageProcessor(
+ BlimpMessageProcessor* processor) {
+ DVLOG(3) << "Setting incoming message processor " << processor;
+ incoming_forwarder_->set_incoming_message_processor(processor);
+}
+
+GrpcConnection::~GrpcConnection() { DVLOG(1) << "Ending connection."; }
+
+} // namespace blimp
« no previous file with comments | « blimp/net/grpc_connection.h ('k') | blimp/net/grpc_connection_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698