Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include <grpc++/channel.h> | |
| 6 #include <grpc++/client_context.h> | |
| 7 #include <grpc++/create_channel.h> | |
| 8 #include <grpc++/security/credentials.h> | |
| 9 #include <grpc/grpc.h> | |
| 10 | |
| 11 #include "base/bind_helpers.h" | |
| 12 #include "blimp/net/grpc_client_stream.h" | |
| 13 | |
| 14 namespace blimp { | |
| 15 | |
| 16 // Data-only struct for handling cross-thread gRPC objects. | |
| 17 struct GrpcClientStream::ClientSharedData : public GrpcStream::SharedData { | |
| 18 public: | |
| 19 // This stub implements the gRPC service. We use this to strema data to | |
|
Garrett Casto
2016/11/11 00:15:53
Nit: strema -> stream.
| |
| 20 // and from the server (engine). | |
| 21 std::unique_ptr<HeliumService::Stub> stub; | |
| 22 | |
| 23 // This is the specific Stream API exposed as part of our gRPC service. | |
| 24 std::unique_ptr<grpc::ClientAsyncReaderWriter<HeliumWrapper, HeliumWrapper>> | |
| 25 stream; | |
| 26 | |
| 27 // This is the context bound to both the stub and the stream. This must be | |
| 28 // destroyed only after the stream is no longer in use. | |
| 29 grpc::ClientContext context; | |
| 30 | |
| 31 protected: | |
| 32 ~ClientSharedData() override { | |
| 33 // The destructor can be called from either the IO thread or the completion | |
| 34 // queue thread. | |
| 35 DVLOG(3) << "Client stream shared data is now destroyed."; | |
| 36 }; | |
| 37 friend class base::RefCountedThreadSafe<ClientSharedData>; | |
| 38 }; | |
| 39 | |
| 40 // Sets up the gRPC client stream and connects to the specified assignment | |
| 41 // options. | |
| 42 GrpcClientStream::GrpcClientStream( | |
| 43 const AssignmentOptions& assignment_options, | |
| 44 const net::CompletionCallback& connection_callback) | |
| 45 : GrpcStream(), | |
| 46 assignment_options_(assignment_options), | |
| 47 shared_data_(scoped_refptr<ClientSharedData>(new ClientSharedData())) { | |
| 48 // First establish a stream to the engine (asynchronously establishing a | |
| 49 // connection as well). | |
| 50 shared_data_->completion_queue = base::MakeUnique<grpc::CompletionQueue>(); | |
| 51 // TODO(perumaal): Add file-descriptor support. See crbug.com/661366 | |
| 52 shared_data_->stub = HeliumService::NewStub( | |
| 53 grpc::CreateChannel(assignment_options.engine_endpoint.ToString(), | |
| 54 grpc::InsecureChannelCredentials())); | |
| 55 shared_data_->stream = shared_data_->stub->AsyncStream( | |
| 56 &shared_data_->context, shared_data_->completion_queue.get(), | |
| 57 ConnectTag(connection_callback)); | |
| 58 | |
| 59 // Now start the completion queue passing on a shared ownership of the | |
| 60 // |shared_data_| until one of the two threads are killed. | |
| 61 StartCompletionQueueThread(shared_data_); | |
| 62 | |
| 63 DVLOG(3) << "Starting client stream @ " | |
| 64 << assignment_options.engine_endpoint.ToString(); | |
| 65 } | |
| 66 | |
| 67 void GrpcClientStream::SendMessage( | |
| 68 std::unique_ptr<HeliumWrapper> helium_message, | |
| 69 const Stream::SendMessageCallback& sent_cb) { | |
| 70 DCHECK(thread_checker_.CalledOnValidThread()); | |
| 71 DVLOG(3) << "Sending: " << helium_message->ByteSize() << " bytes"; | |
| 72 DCHECK_GT(helium_message->serialized_helium_message().size(), 0U); | |
| 73 shared_data_->stream->Write(*helium_message.get(), | |
| 74 reinterpret_cast<void*>(WriteTag(sent_cb))); | |
| 75 } | |
| 76 | |
| 77 void GrpcClientStream::ReceiveMessage( | |
| 78 const Stream::ReceiveMessageCallback& received_cb) { | |
| 79 DCHECK(thread_checker_.CalledOnValidThread()); | |
| 80 HeliumWrapper* received_msg = nullptr; | |
| 81 auto tag = ReadTag(received_cb, &received_msg); | |
| 82 DVLOG(3) << "Receiving (waiting for callback)"; | |
| 83 DCHECK(received_msg != nullptr); | |
| 84 shared_data_->stream->Read(received_msg, reinterpret_cast<void*>(tag)); | |
| 85 } | |
| 86 | |
| 87 // The destructor for the client stream should be called from the IO thread. | |
| 88 GrpcClientStream::~GrpcClientStream() { | |
| 89 DCHECK(thread_checker_.CalledOnValidThread()); | |
| 90 DVLOG(3) << "Destroying GrpcClientStream"; | |
| 91 shared_data_->completion_queue->Shutdown(); | |
| 92 } | |
| 93 | |
| 94 } // namespace blimp | |
| OLD | NEW |