Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(47)

Unified Diff: blimp/net/grpc_client_stream.cc

Issue 2462183002: GRPC Stream implementation of HeliumStream
Patch Set: Fixed a few minor comments Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: blimp/net/grpc_client_stream.cc
diff --git a/blimp/net/grpc_client_stream.cc b/blimp/net/grpc_client_stream.cc
new file mode 100644
index 0000000000000000000000000000000000000000..8c952395fa76d3986de86f40cd727a74afe7446d
--- /dev/null
+++ b/blimp/net/grpc_client_stream.cc
@@ -0,0 +1,94 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include <grpc++/channel.h>
+#include <grpc++/client_context.h>
+#include <grpc++/create_channel.h>
+#include <grpc++/security/credentials.h>
+#include <grpc/grpc.h>
+
+#include "base/bind_helpers.h"
+#include "blimp/net/grpc_client_stream.h"
+
+namespace blimp {
+
+// Data-only struct for handling cross-thread gRPC objects.
+struct GrpcClientStream::ClientSharedData : public GrpcStream::SharedData {
+ public:
+ // This stub implements the gRPC service. We use this to strema data to
Garrett Casto 2016/11/11 00:15:53 Nit: strema -> stream.
+ // and from the server (engine).
+ std::unique_ptr<HeliumService::Stub> stub;
+
+ // This is the specific Stream API exposed as part of our gRPC service.
+ std::unique_ptr<grpc::ClientAsyncReaderWriter<HeliumWrapper, HeliumWrapper>>
+ stream;
+
+ // This is the context bound to both the stub and the stream. This must be
+ // destroyed only after the stream is no longer in use.
+ grpc::ClientContext context;
+
+ protected:
+ ~ClientSharedData() override {
+ // The destructor can be called from either the IO thread or the completion
+ // queue thread.
+ DVLOG(3) << "Client stream shared data is now destroyed.";
+ };
+ friend class base::RefCountedThreadSafe<ClientSharedData>;
+};
+
+// Sets up the gRPC client stream and connects to the specified assignment
+// options.
+GrpcClientStream::GrpcClientStream(
+ const AssignmentOptions& assignment_options,
+ const net::CompletionCallback& connection_callback)
+ : GrpcStream(),
+ assignment_options_(assignment_options),
+ shared_data_(scoped_refptr<ClientSharedData>(new ClientSharedData())) {
+ // First establish a stream to the engine (asynchronously establishing a
+ // connection as well).
+ shared_data_->completion_queue = base::MakeUnique<grpc::CompletionQueue>();
+ // TODO(perumaal): Add file-descriptor support. See crbug.com/661366
+ shared_data_->stub = HeliumService::NewStub(
+ grpc::CreateChannel(assignment_options.engine_endpoint.ToString(),
+ grpc::InsecureChannelCredentials()));
+ shared_data_->stream = shared_data_->stub->AsyncStream(
+ &shared_data_->context, shared_data_->completion_queue.get(),
+ ConnectTag(connection_callback));
+
+ // Now start the completion queue passing on a shared ownership of the
+ // |shared_data_| until one of the two threads are killed.
+ StartCompletionQueueThread(shared_data_);
+
+ DVLOG(3) << "Starting client stream @ "
+ << assignment_options.engine_endpoint.ToString();
+}
+
+void GrpcClientStream::SendMessage(
+ std::unique_ptr<HeliumWrapper> helium_message,
+ const Stream::SendMessageCallback& sent_cb) {
+ DCHECK(thread_checker_.CalledOnValidThread());
+ DVLOG(3) << "Sending: " << helium_message->ByteSize() << " bytes";
+ DCHECK_GT(helium_message->serialized_helium_message().size(), 0U);
+ shared_data_->stream->Write(*helium_message.get(),
+ reinterpret_cast<void*>(WriteTag(sent_cb)));
+}
+
+void GrpcClientStream::ReceiveMessage(
+ const Stream::ReceiveMessageCallback& received_cb) {
+ DCHECK(thread_checker_.CalledOnValidThread());
+ HeliumWrapper* received_msg = nullptr;
+ auto tag = ReadTag(received_cb, &received_msg);
+ DVLOG(3) << "Receiving (waiting for callback)";
+ DCHECK(received_msg != nullptr);
+ shared_data_->stream->Read(received_msg, reinterpret_cast<void*>(tag));
+}
+
+// The destructor for the client stream should be called from the IO thread.
+GrpcClientStream::~GrpcClientStream() {
+ DCHECK(thread_checker_.CalledOnValidThread());
+ DVLOG(3) << "Destroying GrpcClientStream";
+ shared_data_->completion_queue->Shutdown();
+}
+
+} // namespace blimp

Powered by Google App Engine
This is Rietveld 408576698