| Index: blimp/net/grpc_client_stream.cc
|
| diff --git a/blimp/net/grpc_client_stream.cc b/blimp/net/grpc_client_stream.cc
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..18c8667712e53230c0b9055bdfc2588ccce1621d
|
| --- /dev/null
|
| +++ b/blimp/net/grpc_client_stream.cc
|
| @@ -0,0 +1,94 @@
|
| +// Copyright 2016 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +#include <grpc++/channel.h>
|
| +#include <grpc++/client_context.h>
|
| +#include <grpc++/create_channel.h>
|
| +#include <grpc++/security/credentials.h>
|
| +#include <grpc/grpc.h>
|
| +
|
| +#include "base/bind_helpers.h"
|
| +#include "blimp/net/grpc_client_stream.h"
|
| +
|
| +namespace blimp {
|
| +
|
| +// Data-only struct for handling cross-thread gRPC objects.
|
| +struct GrpcClientStream::ClientSharedData : public GrpcStream::SharedData {
|
| + public:
|
| + // This stub implements the gRPC service. We use this to stream data to and
|
| + // from the server (engine).
|
| + std::unique_ptr<HeliumService::Stub> stub;
|
| +
|
| + // This is the specific Stream API exposed as part of our gRPC service.
|
| + std::unique_ptr<grpc::ClientAsyncReaderWriter<HeliumWrapper, HeliumWrapper>>
|
| + stream;
|
| +
|
| + // This is the context bound to both the stub and the stream. This must be
|
| + // destroyed only after the stream is no longer in use.
|
| + grpc::ClientContext context;
|
| +
|
| + protected:
|
| + ~ClientSharedData() override {
|
| + // The destructor can be called from either the IO thread or the completion
|
| + // queue thread.
|
| + DVLOG(3) << "Client stream shared data is now destroyed.";
|
| + };
|
| + friend class base::RefCountedThreadSafe<ClientSharedData>;
|
| +};
|
| +
|
| +// Sets up the gRPC client stream and connects to the specified assignment
|
| +// options.
|
| +GrpcClientStream::GrpcClientStream(
|
| + const AssignmentOptions& assignment_options,
|
| + const net::CompletionCallback& connection_callback)
|
| + : GrpcStream(),
|
| + assignment_options_(assignment_options),
|
| + shared_data_(scoped_refptr<ClientSharedData>(new ClientSharedData())) {
|
| + // First establish a stream to the engine (asynchronously establishing a
|
| + // connection as well).
|
| + shared_data_->completion_queue = base::MakeUnique<grpc::CompletionQueue>();
|
| + // TODO(perumaal): Add file-descriptor support. See crbug.com/661366
|
| + shared_data_->stub = HeliumService::NewStub(
|
| + grpc::CreateChannel(assignment_options.engine_endpoint.ToString(),
|
| + grpc::InsecureChannelCredentials()));
|
| + shared_data_->stream = shared_data_->stub->AsyncStream(
|
| + &shared_data_->context, shared_data_->completion_queue.get(),
|
| + ConnectTag(connection_callback));
|
| +
|
| + // Now start the completion queue passing on a shared ownership of the
|
| + // |shared_data_| until one of the two threads are killed.
|
| + StartCompletionQueueThread(shared_data_);
|
| +
|
| + DVLOG(3) << "Starting client stream @ "
|
| + << assignment_options.engine_endpoint.ToString();
|
| +}
|
| +
|
| +void GrpcClientStream::SendMessage(
|
| + std::unique_ptr<HeliumWrapper> helium_message,
|
| + const Stream::SendMessageCallback& sent_cb) {
|
| + DCHECK(thread_checker_.CalledOnValidThread());
|
| + DVLOG(3) << "Sending: " << helium_message->ByteSize() << " bytes";
|
| + DCHECK_GT(helium_message->serialized_helium_message().size(), 0U);
|
| + shared_data_->stream->Write(*helium_message.get(),
|
| + reinterpret_cast<void*>(WriteTag(sent_cb)));
|
| +}
|
| +
|
| +void GrpcClientStream::ReceiveMessage(
|
| + const Stream::ReceiveMessageCallback& received_cb) {
|
| + DCHECK(thread_checker_.CalledOnValidThread());
|
| + HeliumWrapper* received_msg = nullptr;
|
| + auto tag = ReadTag(received_cb, &received_msg);
|
| + DVLOG(3) << "Receiving (waiting for callback)";
|
| + DCHECK(received_msg != nullptr);
|
| + shared_data_->stream->Read(received_msg, reinterpret_cast<void*>(tag));
|
| +}
|
| +
|
| +// The destructor for the client stream should be called from the IO thread.
|
| +GrpcClientStream::~GrpcClientStream() {
|
| + DCHECK(thread_checker_.CalledOnValidThread());
|
| + DVLOG(3) << "Destroying GrpcClientStream";
|
| + shared_data_->completion_queue->Shutdown();
|
| +}
|
| +
|
| +} // namespace blimp
|
|
|