Index: blimp/net/grpc_client_stream.cc |
diff --git a/blimp/net/grpc_client_stream.cc b/blimp/net/grpc_client_stream.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..8c952395fa76d3986de86f40cd727a74afe7446d |
--- /dev/null |
+++ b/blimp/net/grpc_client_stream.cc |
@@ -0,0 +1,94 @@ |
+// Copyright 2016 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include <grpc++/channel.h> |
+#include <grpc++/client_context.h> |
+#include <grpc++/create_channel.h> |
+#include <grpc++/security/credentials.h> |
+#include <grpc/grpc.h> |
+ |
+#include "base/bind_helpers.h" |
+#include "blimp/net/grpc_client_stream.h" |
+ |
+namespace blimp { |
+ |
+// Data-only struct for handling cross-thread gRPC objects. |
+struct GrpcClientStream::ClientSharedData : public GrpcStream::SharedData { |
+ public: |
+ // This stub implements the gRPC service. We use this to strema data to |
Garrett Casto
2016/11/11 00:15:53
Nit: strema -> stream.
|
+ // and from the server (engine). |
+ std::unique_ptr<HeliumService::Stub> stub; |
+ |
+ // This is the specific Stream API exposed as part of our gRPC service. |
+ std::unique_ptr<grpc::ClientAsyncReaderWriter<HeliumWrapper, HeliumWrapper>> |
+ stream; |
+ |
+ // This is the context bound to both the stub and the stream. This must be |
+ // destroyed only after the stream is no longer in use. |
+ grpc::ClientContext context; |
+ |
+ protected: |
+ ~ClientSharedData() override { |
+ // The destructor can be called from either the IO thread or the completion |
+ // queue thread. |
+ DVLOG(3) << "Client stream shared data is now destroyed."; |
+ }; |
+ friend class base::RefCountedThreadSafe<ClientSharedData>; |
+}; |
+ |
+// Sets up the gRPC client stream and connects to the specified assignment |
+// options. |
+GrpcClientStream::GrpcClientStream( |
+ const AssignmentOptions& assignment_options, |
+ const net::CompletionCallback& connection_callback) |
+ : GrpcStream(), |
+ assignment_options_(assignment_options), |
+ shared_data_(scoped_refptr<ClientSharedData>(new ClientSharedData())) { |
+ // First establish a stream to the engine (asynchronously establishing a |
+ // connection as well). |
+ shared_data_->completion_queue = base::MakeUnique<grpc::CompletionQueue>(); |
+ // TODO(perumaal): Add file-descriptor support. See crbug.com/661366 |
+ shared_data_->stub = HeliumService::NewStub( |
+ grpc::CreateChannel(assignment_options.engine_endpoint.ToString(), |
+ grpc::InsecureChannelCredentials())); |
+ shared_data_->stream = shared_data_->stub->AsyncStream( |
+ &shared_data_->context, shared_data_->completion_queue.get(), |
+ ConnectTag(connection_callback)); |
+ |
+ // Now start the completion queue passing on a shared ownership of the |
+ // |shared_data_| until one of the two threads are killed. |
+ StartCompletionQueueThread(shared_data_); |
+ |
+ DVLOG(3) << "Starting client stream @ " |
+ << assignment_options.engine_endpoint.ToString(); |
+} |
+ |
+void GrpcClientStream::SendMessage( |
+ std::unique_ptr<HeliumWrapper> helium_message, |
+ const Stream::SendMessageCallback& sent_cb) { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ DVLOG(3) << "Sending: " << helium_message->ByteSize() << " bytes"; |
+ DCHECK_GT(helium_message->serialized_helium_message().size(), 0U); |
+ shared_data_->stream->Write(*helium_message.get(), |
+ reinterpret_cast<void*>(WriteTag(sent_cb))); |
+} |
+ |
+void GrpcClientStream::ReceiveMessage( |
+ const Stream::ReceiveMessageCallback& received_cb) { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ HeliumWrapper* received_msg = nullptr; |
+ auto tag = ReadTag(received_cb, &received_msg); |
+ DVLOG(3) << "Receiving (waiting for callback)"; |
+ DCHECK(received_msg != nullptr); |
+ shared_data_->stream->Read(received_msg, reinterpret_cast<void*>(tag)); |
+} |
+ |
+// The destructor for the client stream should be called from the IO thread. |
+GrpcClientStream::~GrpcClientStream() { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ DVLOG(3) << "Destroying GrpcClientStream"; |
+ shared_data_->completion_queue->Shutdown(); |
+} |
+ |
+} // namespace blimp |