Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(22)

Side by Side Diff: blimp/net/grpc_client_stream.cc

Issue 2462183002: GRPC Stream implementation of HeliumStream
Patch Set: Fixed a few minor comments Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <grpc++/channel.h>
6 #include <grpc++/client_context.h>
7 #include <grpc++/create_channel.h>
8 #include <grpc++/security/credentials.h>
9 #include <grpc/grpc.h>
10
11 #include "base/bind_helpers.h"
12 #include "blimp/net/grpc_client_stream.h"
13
14 namespace blimp {
15
16 // Data-only struct for handling cross-thread gRPC objects.
17 struct GrpcClientStream::ClientSharedData : public GrpcStream::SharedData {
18 public:
19 // This stub implements the gRPC service. We use this to strema data to
Garrett Casto 2016/11/11 00:15:53 Nit: strema -> stream.
20 // and from the server (engine).
21 std::unique_ptr<HeliumService::Stub> stub;
22
23 // This is the specific Stream API exposed as part of our gRPC service.
24 std::unique_ptr<grpc::ClientAsyncReaderWriter<HeliumWrapper, HeliumWrapper>>
25 stream;
26
27 // This is the context bound to both the stub and the stream. This must be
28 // destroyed only after the stream is no longer in use.
29 grpc::ClientContext context;
30
31 protected:
32 ~ClientSharedData() override {
33 // The destructor can be called from either the IO thread or the completion
34 // queue thread.
35 DVLOG(3) << "Client stream shared data is now destroyed.";
36 };
37 friend class base::RefCountedThreadSafe<ClientSharedData>;
38 };
39
40 // Sets up the gRPC client stream and connects to the specified assignment
41 // options.
42 GrpcClientStream::GrpcClientStream(
43 const AssignmentOptions& assignment_options,
44 const net::CompletionCallback& connection_callback)
45 : GrpcStream(),
46 assignment_options_(assignment_options),
47 shared_data_(scoped_refptr<ClientSharedData>(new ClientSharedData())) {
48 // First establish a stream to the engine (asynchronously establishing a
49 // connection as well).
50 shared_data_->completion_queue = base::MakeUnique<grpc::CompletionQueue>();
51 // TODO(perumaal): Add file-descriptor support. See crbug.com/661366
52 shared_data_->stub = HeliumService::NewStub(
53 grpc::CreateChannel(assignment_options.engine_endpoint.ToString(),
54 grpc::InsecureChannelCredentials()));
55 shared_data_->stream = shared_data_->stub->AsyncStream(
56 &shared_data_->context, shared_data_->completion_queue.get(),
57 ConnectTag(connection_callback));
58
59 // Now start the completion queue passing on a shared ownership of the
60 // |shared_data_| until one of the two threads are killed.
61 StartCompletionQueueThread(shared_data_);
62
63 DVLOG(3) << "Starting client stream @ "
64 << assignment_options.engine_endpoint.ToString();
65 }
66
67 void GrpcClientStream::SendMessage(
68 std::unique_ptr<HeliumWrapper> helium_message,
69 const Stream::SendMessageCallback& sent_cb) {
70 DCHECK(thread_checker_.CalledOnValidThread());
71 DVLOG(3) << "Sending: " << helium_message->ByteSize() << " bytes";
72 DCHECK_GT(helium_message->serialized_helium_message().size(), 0U);
73 shared_data_->stream->Write(*helium_message.get(),
74 reinterpret_cast<void*>(WriteTag(sent_cb)));
75 }
76
77 void GrpcClientStream::ReceiveMessage(
78 const Stream::ReceiveMessageCallback& received_cb) {
79 DCHECK(thread_checker_.CalledOnValidThread());
80 HeliumWrapper* received_msg = nullptr;
81 auto tag = ReadTag(received_cb, &received_msg);
82 DVLOG(3) << "Receiving (waiting for callback)";
83 DCHECK(received_msg != nullptr);
84 shared_data_->stream->Read(received_msg, reinterpret_cast<void*>(tag));
85 }
86
87 // The destructor for the client stream should be called from the IO thread.
88 GrpcClientStream::~GrpcClientStream() {
89 DCHECK(thread_checker_.CalledOnValidThread());
90 DVLOG(3) << "Destroying GrpcClientStream";
91 shared_data_->completion_queue->Shutdown();
92 }
93
94 } // namespace blimp
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698