Chromium Code Reviews| Index: blimp/net/grpc_client_stream.cc |
| diff --git a/blimp/net/grpc_client_stream.cc b/blimp/net/grpc_client_stream.cc |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..8c952395fa76d3986de86f40cd727a74afe7446d |
| --- /dev/null |
| +++ b/blimp/net/grpc_client_stream.cc |
| @@ -0,0 +1,94 @@ |
| +// Copyright 2016 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include <grpc++/channel.h> |
| +#include <grpc++/client_context.h> |
| +#include <grpc++/create_channel.h> |
| +#include <grpc++/security/credentials.h> |
| +#include <grpc/grpc.h> |
| + |
| +#include "base/bind_helpers.h" |
| +#include "blimp/net/grpc_client_stream.h" |
| + |
| +namespace blimp { |
| + |
| +// Data-only struct for handling cross-thread gRPC objects. |
| +struct GrpcClientStream::ClientSharedData : public GrpcStream::SharedData { |
| + public: |
| + // This stub implements the gRPC service. We use this to strema data to |
|
Garrett Casto
2016/11/11 00:15:53
Nit: strema -> stream.
|
| + // and from the server (engine). |
| + std::unique_ptr<HeliumService::Stub> stub; |
| + |
| + // This is the specific Stream API exposed as part of our gRPC service. |
| + std::unique_ptr<grpc::ClientAsyncReaderWriter<HeliumWrapper, HeliumWrapper>> |
| + stream; |
| + |
| + // This is the context bound to both the stub and the stream. This must be |
| + // destroyed only after the stream is no longer in use. |
| + grpc::ClientContext context; |
| + |
| + protected: |
| + ~ClientSharedData() override { |
| + // The destructor can be called from either the IO thread or the completion |
| + // queue thread. |
| + DVLOG(3) << "Client stream shared data is now destroyed."; |
| + }; |
| + friend class base::RefCountedThreadSafe<ClientSharedData>; |
| +}; |
| + |
| +// Sets up the gRPC client stream and connects to the specified assignment |
| +// options. |
| +GrpcClientStream::GrpcClientStream( |
| + const AssignmentOptions& assignment_options, |
| + const net::CompletionCallback& connection_callback) |
| + : GrpcStream(), |
| + assignment_options_(assignment_options), |
| + shared_data_(scoped_refptr<ClientSharedData>(new ClientSharedData())) { |
| + // First establish a stream to the engine (asynchronously establishing a |
| + // connection as well). |
| + shared_data_->completion_queue = base::MakeUnique<grpc::CompletionQueue>(); |
| + // TODO(perumaal): Add file-descriptor support. See crbug.com/661366 |
| + shared_data_->stub = HeliumService::NewStub( |
| + grpc::CreateChannel(assignment_options.engine_endpoint.ToString(), |
| + grpc::InsecureChannelCredentials())); |
| + shared_data_->stream = shared_data_->stub->AsyncStream( |
| + &shared_data_->context, shared_data_->completion_queue.get(), |
| + ConnectTag(connection_callback)); |
| + |
| + // Now start the completion queue passing on a shared ownership of the |
| + // |shared_data_| until one of the two threads are killed. |
| + StartCompletionQueueThread(shared_data_); |
| + |
| + DVLOG(3) << "Starting client stream @ " |
| + << assignment_options.engine_endpoint.ToString(); |
| +} |
| + |
| +void GrpcClientStream::SendMessage( |
| + std::unique_ptr<HeliumWrapper> helium_message, |
| + const Stream::SendMessageCallback& sent_cb) { |
| + DCHECK(thread_checker_.CalledOnValidThread()); |
| + DVLOG(3) << "Sending: " << helium_message->ByteSize() << " bytes"; |
| + DCHECK_GT(helium_message->serialized_helium_message().size(), 0U); |
| + shared_data_->stream->Write(*helium_message.get(), |
| + reinterpret_cast<void*>(WriteTag(sent_cb))); |
| +} |
| + |
| +void GrpcClientStream::ReceiveMessage( |
| + const Stream::ReceiveMessageCallback& received_cb) { |
| + DCHECK(thread_checker_.CalledOnValidThread()); |
| + HeliumWrapper* received_msg = nullptr; |
| + auto tag = ReadTag(received_cb, &received_msg); |
| + DVLOG(3) << "Receiving (waiting for callback)"; |
| + DCHECK(received_msg != nullptr); |
| + shared_data_->stream->Read(received_msg, reinterpret_cast<void*>(tag)); |
| +} |
| + |
| +// The destructor for the client stream should be called from the IO thread. |
| +GrpcClientStream::~GrpcClientStream() { |
| + DCHECK(thread_checker_.CalledOnValidThread()); |
| + DVLOG(3) << "Destroying GrpcClientStream"; |
| + shared_data_->completion_queue->Shutdown(); |
| +} |
| + |
| +} // namespace blimp |