Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(13)

Side by Side Diff: blimp/net/grpc_engine_stream.cc

Issue 2462183002: GRPC Stream implementation of HeliumStream
Patch Set: Address gcasto comments Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « blimp/net/grpc_engine_stream.h ('k') | blimp/net/grpc_engine_transport.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include <grpc++/channel.h>
6 #include <grpc++/security/credentials.h>
7 #include <grpc++/security/server_credentials.h>
8 #include <grpc++/server.h>
9 #include <grpc++/server_builder.h>
10 #include <grpc++/server_context.h>
11 #include <grpc/grpc.h>
12
13 #include <utility>
14
15 #include "base/bind_helpers.h"
16 #include "base/callback.h"
17 #include "blimp/common/logging.h"
18 #include "blimp/common/proto/helium_service.grpc.pb.h"
19 #include "blimp/common/public/session/assignment_options.h"
20 #include "blimp/net/grpc_connection.h"
21 #include "blimp/net/grpc_engine_stream.h"
22
23 namespace blimp {
24
25 // Data-only struct for handling cross-thread gRPC objects.
26 struct GrpcEngineStream::EngineSharedData : public GrpcStream::SharedData {
27 public:
28 // The gRPC service that provides the streaming, asynchronous API.
29 HeliumService::AsyncService service;
30
31 // The gRPC server that listens to a specified port/file-descriptor.
32 std::unique_ptr<grpc::Server> server;
33
34 // The context that is used by the service/server as well as the completion
35 // queue.
36 grpc::ServerContext context;
37
38 // This is the actual asynchronous API used to stream bidirectional
39 // |HeliumMessage|s to and from a client.
40 std::unique_ptr<grpc::ServerAsyncReaderWriter<HeliumWrapper, HeliumWrapper>>
41 stream;
42
43 protected:
44 ~EngineSharedData() override {
45 DVLOG(3) << "Engine stream shared data is now destroyed.";
46 };
47 friend class base::RefCountedThreadSafe<EngineSharedData>;
48 };
49
50 GrpcEngineStream::GrpcEngineStream(
51 const AssignmentOptions& assignment_options,
52 const net::CompletionCallback& connection_callback)
53 : GrpcStream(), assignment_options_(assignment_options) {
54 // First start the gRPC server and add a completion queue.
55 shared_data_ = scoped_refptr<EngineSharedData>(new EngineSharedData());
56 grpc::ServerBuilder builder;
57
58 int selected_port = -1;
59 builder.AddListeningPort(assignment_options.engine_endpoint.ToString(),
60 grpc::InsecureServerCredentials(), &selected_port);
61 builder.RegisterService(&shared_data_->service);
62
63 std::unique_ptr<grpc::ServerCompletionQueue> completion_queue =
64 builder.AddCompletionQueue();
65
66 // TODO(perumaal): Add file-descriptor support. See crbug.com/661366
67 shared_data_->server = builder.BuildAndStart();
68
69 // Update our actual listening engine ip-address/port number.
70 assignment_options_.engine_endpoint = net::IPEndPoint(
71 assignment_options_.engine_endpoint.address(), selected_port);
72
73 // Now setup a completion queue thread that processes tasks (tags) from the
74 // completion queue.
75 shared_data_->stream = base::MakeUnique<
76 grpc::ServerAsyncReaderWriter<HeliumWrapper, HeliumWrapper>>(
77 &shared_data_->context);
78 shared_data_->service.RequestStream(
79 &shared_data_->context, shared_data_->stream.get(),
80 completion_queue.get(), completion_queue.get(),
81 reinterpret_cast<void*>(ConnectTag(connection_callback)));
82 shared_data_->completion_queue = std::move(completion_queue);
83 StartCompletionQueueThread(shared_data_);
84 DVLOG(3) << "Starting engine stream @ "
85 << assignment_options_.engine_endpoint.ToString();
86 }
87
88 const AssignmentOptions& GrpcEngineStream::GetAssignmentOptions() const {
89 return assignment_options_;
90 }
91
92 void GrpcEngineStream::SendMessage(
93 std::unique_ptr<HeliumWrapper> helium_message,
94 const Stream::SendMessageCallback& sent_cb) {
95 DCHECK(thread_checker_.CalledOnValidThread());
96 DVLOG(3) << "Sending: " << helium_message->ByteSize() << " bytes";
97 DCHECK_GT(helium_message->serialized_helium_message().size(), 0U);
98
99 shared_data_->stream->Write(*helium_message.get(),
100 reinterpret_cast<void*>(WriteTag(sent_cb)));
101 }
102
103 void GrpcEngineStream::ReceiveMessage(
104 const Stream::ReceiveMessageCallback& received_cb) {
105 DCHECK(thread_checker_.CalledOnValidThread());
106 DVLOG(3) << "Receiving (waiting for callback)";
107 HeliumWrapper* received_msg = nullptr;
108 void* tag = reinterpret_cast<void*>(ReadTag(received_cb, &received_msg));
109 DCHECK(received_msg != nullptr);
110 shared_data_->stream->Read(received_msg, tag);
111 }
112
113 GrpcEngineStream::~GrpcEngineStream() {
114 DCHECK(thread_checker_.CalledOnValidThread());
115
116 // The order here is important! Completion queue *must* be shutdown *after*
117 // the server shutdown. The unit-tests verify this aspect by tearing down the
118 // engine stream explicitly.
119 DVLOG(3) << "Engine stream shutting down.";
120
121 // It's important to not block the IO thread simply because the server is
122 // shutting down.
123 gpr_timespec deadline = gpr_time_from_millis(0, GPR_CLOCK_MONOTONIC);
124 shared_data_->server->Shutdown(deadline);
125 shared_data_->completion_queue->Shutdown();
126 DVLOG(3) << "Engine stream shut down finished.";
127 }
128 } // namespace blimp
OLDNEW
« no previous file with comments | « blimp/net/grpc_engine_stream.h ('k') | blimp/net/grpc_engine_transport.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698