| Index: blimp/net/grpc_engine_stream.cc
|
| diff --git a/blimp/net/grpc_engine_stream.cc b/blimp/net/grpc_engine_stream.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..a840aa05223bac27edede08bc50077b13e5cef9c
|
| --- /dev/null
|
| +++ b/blimp/net/grpc_engine_stream.cc
|
| @@ -0,0 +1,128 @@
|
| +// Copyright 2016 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include <grpc++/channel.h>
|
| +#include <grpc++/security/credentials.h>
|
| +#include <grpc++/security/server_credentials.h>
|
| +#include <grpc++/server.h>
|
| +#include <grpc++/server_builder.h>
|
| +#include <grpc++/server_context.h>
|
| +#include <grpc/grpc.h>
|
| +
|
| +#include <utility>
|
| +
|
| +#include "base/bind_helpers.h"
|
| +#include "base/callback.h"
|
| +#include "blimp/common/logging.h"
|
| +#include "blimp/common/proto/helium_service.grpc.pb.h"
|
| +#include "blimp/common/public/session/assignment_options.h"
|
| +#include "blimp/net/grpc_connection.h"
|
| +#include "blimp/net/grpc_engine_stream.h"
|
| +
|
| +namespace blimp {
|
| +
|
| +// Data-only struct for handling cross-thread gRPC objects.
|
| +struct GrpcEngineStream::EngineSharedData : public GrpcStream::SharedData {
|
| + public:
|
| + // The gRPC service that provides the streaming, asynchronous API.
|
| + HeliumService::AsyncService service;
|
| +
|
| + // The gRPC server that listens to a specified port/file-descriptor.
|
| + std::unique_ptr<grpc::Server> server;
|
| +
|
| + // The context that is used by the service/server as well as the completion
|
| + // queue.
|
| + grpc::ServerContext context;
|
| +
|
| + // This is the actual asynchronous API used to stream bidirectional
|
| + // |HeliumMessage|s to and from a client.
|
| + std::unique_ptr<grpc::ServerAsyncReaderWriter<HeliumWrapper, HeliumWrapper>>
|
| + stream;
|
| +
|
| + protected:
|
| + ~EngineSharedData() override {
|
| + DVLOG(3) << "Engine stream shared data is now destroyed.";
|
| + };
|
| + friend class base::RefCountedThreadSafe<EngineSharedData>;
|
| +};
|
| +
|
| +GrpcEngineStream::GrpcEngineStream(
|
| + const AssignmentOptions& assignment_options,
|
| + const net::CompletionCallback& connection_callback)
|
| + : GrpcStream(), assignment_options_(assignment_options) {
|
| + // First start the gRPC server and add a completion queue.
|
| + shared_data_ = scoped_refptr<EngineSharedData>(new EngineSharedData());
|
| + grpc::ServerBuilder builder;
|
| +
|
| + int selected_port = -1;
|
| + builder.AddListeningPort(assignment_options.engine_endpoint.ToString(),
|
| + grpc::InsecureServerCredentials(), &selected_port);
|
| + builder.RegisterService(&shared_data_->service);
|
| +
|
| + std::unique_ptr<grpc::ServerCompletionQueue> completion_queue =
|
| + builder.AddCompletionQueue();
|
| +
|
| + // TODO(perumaal): Add file-descriptor support. See crbug.com/661366
|
| + shared_data_->server = builder.BuildAndStart();
|
| +
|
| + // Update our actual listening engine ip-address/port number.
|
| + assignment_options_.engine_endpoint = net::IPEndPoint(
|
| + assignment_options_.engine_endpoint.address(), selected_port);
|
| +
|
| + // Now setup a completion queue thread that processes tasks (tags) from the
|
| + // completion queue.
|
| + shared_data_->stream = base::MakeUnique<
|
| + grpc::ServerAsyncReaderWriter<HeliumWrapper, HeliumWrapper>>(
|
| + &shared_data_->context);
|
| + shared_data_->service.RequestStream(
|
| + &shared_data_->context, shared_data_->stream.get(),
|
| + completion_queue.get(), completion_queue.get(),
|
| + reinterpret_cast<void*>(ConnectTag(connection_callback)));
|
| + shared_data_->completion_queue = std::move(completion_queue);
|
| + StartCompletionQueueThread(shared_data_);
|
| + DVLOG(3) << "Starting engine stream @ "
|
| + << assignment_options_.engine_endpoint.ToString();
|
| +}
|
| +
|
| +const AssignmentOptions& GrpcEngineStream::GetAssignmentOptions() const {
|
| + return assignment_options_;
|
| +}
|
| +
|
| +void GrpcEngineStream::SendMessage(
|
| + std::unique_ptr<HeliumWrapper> helium_message,
|
| + const Stream::SendMessageCallback& sent_cb) {
|
| + DCHECK(thread_checker_.CalledOnValidThread());
|
| + DVLOG(3) << "Sending: " << helium_message->ByteSize() << " bytes";
|
| + DCHECK_GT(helium_message->serialized_helium_message().size(), 0U);
|
| +
|
| + shared_data_->stream->Write(*helium_message.get(),
|
| + reinterpret_cast<void*>(WriteTag(sent_cb)));
|
| +}
|
| +
|
| +void GrpcEngineStream::ReceiveMessage(
|
| + const Stream::ReceiveMessageCallback& received_cb) {
|
| + DCHECK(thread_checker_.CalledOnValidThread());
|
| + DVLOG(3) << "Receiving (waiting for callback)";
|
| + HeliumWrapper* received_msg = nullptr;
|
| + void* tag = reinterpret_cast<void*>(ReadTag(received_cb, &received_msg));
|
| + DCHECK(received_msg != nullptr);
|
| + shared_data_->stream->Read(received_msg, tag);
|
| +}
|
| +
|
| +GrpcEngineStream::~GrpcEngineStream() {
|
| + DCHECK(thread_checker_.CalledOnValidThread());
|
| +
|
| + // The order here is important! Completion queue *must* be shutdown *after*
|
| + // the server shutdown. The unit-tests verify this aspect by tearing down the
|
| + // engine stream explicitly.
|
| + DVLOG(3) << "Engine stream shutting down.";
|
| +
|
| + // It's important to not block the IO thread simply because the server is
|
| + // shutting down.
|
| + gpr_timespec deadline = gpr_time_from_millis(0, GPR_CLOCK_MONOTONIC);
|
| + shared_data_->server->Shutdown(deadline);
|
| + shared_data_->completion_queue->Shutdown();
|
| + DVLOG(3) << "Engine stream shut down finished.";
|
| +}
|
| +} // namespace blimp
|
|
|