OLD | NEW |
(Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 #include <grpc++/channel.h> |
| 6 #include <grpc++/client_context.h> |
| 7 #include <grpc++/create_channel.h> |
| 8 #include <grpc++/security/credentials.h> |
| 9 #include <grpc/grpc.h> |
| 10 |
| 11 #include "base/bind_helpers.h" |
| 12 #include "blimp/net/grpc_client_stream.h" |
| 13 |
| 14 namespace blimp { |
| 15 |
| 16 // Data-only struct for handling cross-thread gRPC objects. |
| 17 struct GrpcClientStream::ClientSharedData : public GrpcStream::SharedData { |
| 18 public: |
| 19 // This stub implements the gRPC service. We use this to stream data to and |
| 20 // from the server (engine). |
| 21 std::unique_ptr<HeliumService::Stub> stub; |
| 22 |
| 23 // This is the specific Stream API exposed as part of our gRPC service. |
| 24 std::unique_ptr<grpc::ClientAsyncReaderWriter<HeliumWrapper, HeliumWrapper>> |
| 25 stream; |
| 26 |
| 27 // This is the context bound to both the stub and the stream. This must be |
| 28 // destroyed only after the stream is no longer in use. |
| 29 grpc::ClientContext context; |
| 30 |
| 31 protected: |
| 32 ~ClientSharedData() override { |
| 33 // The destructor can be called from either the IO thread or the completion |
| 34 // queue thread. |
| 35 DVLOG(3) << "Client stream shared data is now destroyed."; |
| 36 }; |
| 37 friend class base::RefCountedThreadSafe<ClientSharedData>; |
| 38 }; |
| 39 |
| 40 // Sets up the gRPC client stream and connects to the specified assignment |
| 41 // options. |
| 42 GrpcClientStream::GrpcClientStream( |
| 43 const AssignmentOptions& assignment_options, |
| 44 const net::CompletionCallback& connection_callback) |
| 45 : GrpcStream(), |
| 46 assignment_options_(assignment_options), |
| 47 shared_data_(scoped_refptr<ClientSharedData>(new ClientSharedData())) { |
| 48 // First establish a stream to the engine (asynchronously establishing a |
| 49 // connection as well). |
| 50 shared_data_->completion_queue = base::MakeUnique<grpc::CompletionQueue>(); |
| 51 // TODO(perumaal): Add file-descriptor support. See crbug.com/661366 |
| 52 shared_data_->stub = HeliumService::NewStub( |
| 53 grpc::CreateChannel(assignment_options.engine_endpoint.ToString(), |
| 54 grpc::InsecureChannelCredentials())); |
| 55 shared_data_->stream = shared_data_->stub->AsyncStream( |
| 56 &shared_data_->context, shared_data_->completion_queue.get(), |
| 57 ConnectTag(connection_callback)); |
| 58 |
| 59 // Now start the completion queue passing on a shared ownership of the |
| 60 // |shared_data_| until one of the two threads are killed. |
| 61 StartCompletionQueueThread(shared_data_); |
| 62 |
| 63 DVLOG(3) << "Starting client stream @ " |
| 64 << assignment_options.engine_endpoint.ToString(); |
| 65 } |
| 66 |
| 67 void GrpcClientStream::SendMessage( |
| 68 std::unique_ptr<HeliumWrapper> helium_message, |
| 69 const Stream::SendMessageCallback& sent_cb) { |
| 70 DCHECK(thread_checker_.CalledOnValidThread()); |
| 71 DVLOG(3) << "Sending: " << helium_message->ByteSize() << " bytes"; |
| 72 DCHECK_GT(helium_message->serialized_helium_message().size(), 0U); |
| 73 shared_data_->stream->Write(*helium_message.get(), |
| 74 reinterpret_cast<void*>(WriteTag(sent_cb))); |
| 75 } |
| 76 |
| 77 void GrpcClientStream::ReceiveMessage( |
| 78 const Stream::ReceiveMessageCallback& received_cb) { |
| 79 DCHECK(thread_checker_.CalledOnValidThread()); |
| 80 HeliumWrapper* received_msg = nullptr; |
| 81 auto tag = ReadTag(received_cb, &received_msg); |
| 82 DVLOG(3) << "Receiving (waiting for callback)"; |
| 83 DCHECK(received_msg != nullptr); |
| 84 shared_data_->stream->Read(received_msg, reinterpret_cast<void*>(tag)); |
| 85 } |
| 86 |
| 87 // The destructor for the client stream should be called from the IO thread. |
| 88 GrpcClientStream::~GrpcClientStream() { |
| 89 DCHECK(thread_checker_.CalledOnValidThread()); |
| 90 DVLOG(3) << "Destroying GrpcClientStream"; |
| 91 shared_data_->completion_queue->Shutdown(); |
| 92 } |
| 93 |
| 94 } // namespace blimp |
OLD | NEW |