Index: blimp/net/grpc_engine_stream.cc |
diff --git a/blimp/net/grpc_engine_stream.cc b/blimp/net/grpc_engine_stream.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..a840aa05223bac27edede08bc50077b13e5cef9c |
--- /dev/null |
+++ b/blimp/net/grpc_engine_stream.cc |
@@ -0,0 +1,128 @@ |
+// Copyright 2016 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include <grpc++/channel.h> |
+#include <grpc++/security/credentials.h> |
+#include <grpc++/security/server_credentials.h> |
+#include <grpc++/server.h> |
+#include <grpc++/server_builder.h> |
+#include <grpc++/server_context.h> |
+#include <grpc/grpc.h> |
+ |
+#include <utility> |
+ |
+#include "base/bind_helpers.h" |
+#include "base/callback.h" |
+#include "blimp/common/logging.h" |
+#include "blimp/common/proto/helium_service.grpc.pb.h" |
+#include "blimp/common/public/session/assignment_options.h" |
+#include "blimp/net/grpc_connection.h" |
+#include "blimp/net/grpc_engine_stream.h" |
+ |
+namespace blimp { |
+ |
+// Data-only struct for handling cross-thread gRPC objects. |
+struct GrpcEngineStream::EngineSharedData : public GrpcStream::SharedData { |
+ public: |
+ // The gRPC service that provides the streaming, asynchronous API. |
+ HeliumService::AsyncService service; |
+ |
+ // The gRPC server that listens to a specified port/file-descriptor. |
+ std::unique_ptr<grpc::Server> server; |
+ |
+ // The context that is used by the service/server as well as the completion |
+ // queue. |
+ grpc::ServerContext context; |
+ |
+ // This is the actual asynchronous API used to stream bidirectional |
+ // |HeliumMessage|s to and from a client. |
+ std::unique_ptr<grpc::ServerAsyncReaderWriter<HeliumWrapper, HeliumWrapper>> |
+ stream; |
+ |
+ protected: |
+ ~EngineSharedData() override { |
+ DVLOG(3) << "Engine stream shared data is now destroyed."; |
+ }; |
+ friend class base::RefCountedThreadSafe<EngineSharedData>; |
+}; |
+ |
+GrpcEngineStream::GrpcEngineStream( |
+ const AssignmentOptions& assignment_options, |
+ const net::CompletionCallback& connection_callback) |
+ : GrpcStream(), assignment_options_(assignment_options) { |
+ // First start the gRPC server and add a completion queue. |
+ shared_data_ = scoped_refptr<EngineSharedData>(new EngineSharedData()); |
+ grpc::ServerBuilder builder; |
+ |
+ int selected_port = -1; |
+ builder.AddListeningPort(assignment_options.engine_endpoint.ToString(), |
+ grpc::InsecureServerCredentials(), &selected_port); |
+ builder.RegisterService(&shared_data_->service); |
+ |
+ std::unique_ptr<grpc::ServerCompletionQueue> completion_queue = |
+ builder.AddCompletionQueue(); |
+ |
+ // TODO(perumaal): Add file-descriptor support. See crbug.com/661366 |
+ shared_data_->server = builder.BuildAndStart(); |
+ |
+ // Update our actual listening engine ip-address/port number. |
+ assignment_options_.engine_endpoint = net::IPEndPoint( |
+ assignment_options_.engine_endpoint.address(), selected_port); |
+ |
+ // Now setup a completion queue thread that processes tasks (tags) from the |
+ // completion queue. |
+ shared_data_->stream = base::MakeUnique< |
+ grpc::ServerAsyncReaderWriter<HeliumWrapper, HeliumWrapper>>( |
+ &shared_data_->context); |
+ shared_data_->service.RequestStream( |
+ &shared_data_->context, shared_data_->stream.get(), |
+ completion_queue.get(), completion_queue.get(), |
+ reinterpret_cast<void*>(ConnectTag(connection_callback))); |
+ shared_data_->completion_queue = std::move(completion_queue); |
+ StartCompletionQueueThread(shared_data_); |
+ DVLOG(3) << "Starting engine stream @ " |
+ << assignment_options_.engine_endpoint.ToString(); |
+} |
+ |
+const AssignmentOptions& GrpcEngineStream::GetAssignmentOptions() const { |
+ return assignment_options_; |
+} |
+ |
+void GrpcEngineStream::SendMessage( |
+ std::unique_ptr<HeliumWrapper> helium_message, |
+ const Stream::SendMessageCallback& sent_cb) { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ DVLOG(3) << "Sending: " << helium_message->ByteSize() << " bytes"; |
+ DCHECK_GT(helium_message->serialized_helium_message().size(), 0U); |
+ |
+ shared_data_->stream->Write(*helium_message.get(), |
+ reinterpret_cast<void*>(WriteTag(sent_cb))); |
+} |
+ |
+void GrpcEngineStream::ReceiveMessage( |
+ const Stream::ReceiveMessageCallback& received_cb) { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ DVLOG(3) << "Receiving (waiting for callback)"; |
+ HeliumWrapper* received_msg = nullptr; |
+ void* tag = reinterpret_cast<void*>(ReadTag(received_cb, &received_msg)); |
+ DCHECK(received_msg != nullptr); |
+ shared_data_->stream->Read(received_msg, tag); |
+} |
+ |
+GrpcEngineStream::~GrpcEngineStream() { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ |
+ // The order here is important! Completion queue *must* be shutdown *after* |
+ // the server shutdown. The unit-tests verify this aspect by tearing down the |
+ // engine stream explicitly. |
+ DVLOG(3) << "Engine stream shutting down."; |
+ |
+ // It's important to not block the IO thread simply because the server is |
+ // shutting down. |
+ gpr_timespec deadline = gpr_time_from_millis(0, GPR_CLOCK_MONOTONIC); |
+ shared_data_->server->Shutdown(deadline); |
+ shared_data_->completion_queue->Shutdown(); |
+ DVLOG(3) << "Engine stream shut down finished."; |
+} |
+} // namespace blimp |