Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2794)

Unified Diff: blimp/net/grpc_engine_stream.cc

Issue 2462183002: GRPC Stream implementation of HeliumStream
Patch Set: Address gcasto comments Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « blimp/net/grpc_engine_stream.h ('k') | blimp/net/grpc_engine_transport.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: blimp/net/grpc_engine_stream.cc
diff --git a/blimp/net/grpc_engine_stream.cc b/blimp/net/grpc_engine_stream.cc
new file mode 100644
index 0000000000000000000000000000000000000000..a840aa05223bac27edede08bc50077b13e5cef9c
--- /dev/null
+++ b/blimp/net/grpc_engine_stream.cc
@@ -0,0 +1,128 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <grpc++/channel.h>
+#include <grpc++/security/credentials.h>
+#include <grpc++/security/server_credentials.h>
+#include <grpc++/server.h>
+#include <grpc++/server_builder.h>
+#include <grpc++/server_context.h>
+#include <grpc/grpc.h>
+
+#include <utility>
+
+#include "base/bind_helpers.h"
+#include "base/callback.h"
+#include "blimp/common/logging.h"
+#include "blimp/common/proto/helium_service.grpc.pb.h"
+#include "blimp/common/public/session/assignment_options.h"
+#include "blimp/net/grpc_connection.h"
+#include "blimp/net/grpc_engine_stream.h"
+
+namespace blimp {
+
+// Data-only struct for handling cross-thread gRPC objects.
+struct GrpcEngineStream::EngineSharedData : public GrpcStream::SharedData {
+ public:
+ // The gRPC service that provides the streaming, asynchronous API.
+ HeliumService::AsyncService service;
+
+ // The gRPC server that listens to a specified port/file-descriptor.
+ std::unique_ptr<grpc::Server> server;
+
+ // The context that is used by the service/server as well as the completion
+ // queue.
+ grpc::ServerContext context;
+
+ // This is the actual asynchronous API used to stream bidirectional
+ // |HeliumMessage|s to and from a client.
+ std::unique_ptr<grpc::ServerAsyncReaderWriter<HeliumWrapper, HeliumWrapper>>
+ stream;
+
+ protected:
+ ~EngineSharedData() override {
+ DVLOG(3) << "Engine stream shared data is now destroyed.";
+ };
+ friend class base::RefCountedThreadSafe<EngineSharedData>;
+};
+
+GrpcEngineStream::GrpcEngineStream(
+ const AssignmentOptions& assignment_options,
+ const net::CompletionCallback& connection_callback)
+ : GrpcStream(), assignment_options_(assignment_options) {
+ // First start the gRPC server and add a completion queue.
+ shared_data_ = scoped_refptr<EngineSharedData>(new EngineSharedData());
+ grpc::ServerBuilder builder;
+
+ int selected_port = -1;
+ builder.AddListeningPort(assignment_options.engine_endpoint.ToString(),
+ grpc::InsecureServerCredentials(), &selected_port);
+ builder.RegisterService(&shared_data_->service);
+
+ std::unique_ptr<grpc::ServerCompletionQueue> completion_queue =
+ builder.AddCompletionQueue();
+
+ // TODO(perumaal): Add file-descriptor support. See crbug.com/661366
+ shared_data_->server = builder.BuildAndStart();
+
+ // Update our actual listening engine ip-address/port number.
+ assignment_options_.engine_endpoint = net::IPEndPoint(
+ assignment_options_.engine_endpoint.address(), selected_port);
+
+ // Now setup a completion queue thread that processes tasks (tags) from the
+ // completion queue.
+ shared_data_->stream = base::MakeUnique<
+ grpc::ServerAsyncReaderWriter<HeliumWrapper, HeliumWrapper>>(
+ &shared_data_->context);
+ shared_data_->service.RequestStream(
+ &shared_data_->context, shared_data_->stream.get(),
+ completion_queue.get(), completion_queue.get(),
+ reinterpret_cast<void*>(ConnectTag(connection_callback)));
+ shared_data_->completion_queue = std::move(completion_queue);
+ StartCompletionQueueThread(shared_data_);
+ DVLOG(3) << "Starting engine stream @ "
+ << assignment_options_.engine_endpoint.ToString();
+}
+
+const AssignmentOptions& GrpcEngineStream::GetAssignmentOptions() const {
+ return assignment_options_;
+}
+
+void GrpcEngineStream::SendMessage(
+ std::unique_ptr<HeliumWrapper> helium_message,
+ const Stream::SendMessageCallback& sent_cb) {
+ DCHECK(thread_checker_.CalledOnValidThread());
+ DVLOG(3) << "Sending: " << helium_message->ByteSize() << " bytes";
+ DCHECK_GT(helium_message->serialized_helium_message().size(), 0U);
+
+ shared_data_->stream->Write(*helium_message.get(),
+ reinterpret_cast<void*>(WriteTag(sent_cb)));
+}
+
+void GrpcEngineStream::ReceiveMessage(
+ const Stream::ReceiveMessageCallback& received_cb) {
+ DCHECK(thread_checker_.CalledOnValidThread());
+ DVLOG(3) << "Receiving (waiting for callback)";
+ HeliumWrapper* received_msg = nullptr;
+ void* tag = reinterpret_cast<void*>(ReadTag(received_cb, &received_msg));
+ DCHECK(received_msg != nullptr);
+ shared_data_->stream->Read(received_msg, tag);
+}
+
+GrpcEngineStream::~GrpcEngineStream() {
+ DCHECK(thread_checker_.CalledOnValidThread());
+
+ // The order here is important! Completion queue *must* be shutdown *after*
+ // the server shutdown. The unit-tests verify this aspect by tearing down the
+ // engine stream explicitly.
+ DVLOG(3) << "Engine stream shutting down.";
+
+ // It's important to not block the IO thread simply because the server is
+ // shutting down.
+ gpr_timespec deadline = gpr_time_from_millis(0, GPR_CLOCK_MONOTONIC);
+ shared_data_->server->Shutdown(deadline);
+ shared_data_->completion_queue->Shutdown();
+ DVLOG(3) << "Engine stream shut down finished.";
+}
+} // namespace blimp
« no previous file with comments | « blimp/net/grpc_engine_stream.h ('k') | blimp/net/grpc_engine_transport.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698