Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(501)

Side by Side Diff: blimp/net/grpc_stream.cc

Issue 2462183002: GRPC Stream implementation of HeliumStream
Patch Set: Address gcasto comments Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « blimp/net/grpc_stream.h ('k') | blimp/net/grpc_stream_unittest.cc » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "blimp/net/grpc_stream.h"
6
7 #include <memory>
8 #include <utility>
9
10 #include "base/bind_helpers.h"
11 #include "base/callback.h"
12 #include "base/callback_helpers.h"
13 #include "base/command_line.h"
14 #include "base/memory/ptr_util.h"
15 #include "base/memory/ref_counted.h"
16 #include "base/synchronization/lock.h"
17 #include "base/threading/thread.h"
18 #include "base/threading/thread_restrictions.h"
19 #include "base/threading/thread_task_runner_handle.h"
20
21 namespace blimp {
22
23 // GrpcStream::SharedData code. It's intended to be used as a simple
24 // data-structure.
25 GrpcStream::SharedData::SharedData()
26 : callback_task_runner(base::ThreadTaskRunnerHandle::Get()),
27 event_count(0),
28 grpc_thread(new base::Thread("GrpcThread")) {}
29
30 GrpcStream::SharedData::~SharedData() {
31 // When the shared data is being destructed, the gRPC thread object is being
32 // explicitly relinquished beforehand so as to not have any circular
33 // references.
34 DCHECK_EQ(nullptr, grpc_thread);
35
36 // The thread checker is purely for verification inside the completion queue
37 // thread.
38 DCHECK_EQ(nullptr, grpc_thread_checker.get());
39 }
40
41 // A wrapper class around a gRPC tag targeted for use by the completion queue.
42 struct GrpcTag {
43 public:
44 GrpcTag();
45 ~GrpcTag();
46
47 // This method calls the correct callback based on the given tag-type.
48 void ApplyCallback(bool ok);
49
50 GrpcStream::TagType tag_type;
51
52 // For initial connection.
53 net::CompletionCallback connection_cb;
54
55 // For sending.
56 Stream::SendMessageCallback sent_cb;
57
58 // For receiving.
59 std::unique_ptr<HeliumWrapper> received_msg;
60 Stream::ReceiveMessageCallback received_cb;
61
62 base::ThreadChecker thread_checker;
63
64 friend class GrpcStream;
65 };
66
67 GrpcTag::GrpcTag() : tag_type(GrpcStream::TagType::UNKNOWN) {
68 DVLOG(3) << "Created Tag " << this;
69 }
70
71 // Applies the callback (on the IO thread).
72 void GrpcTag::ApplyCallback(bool ok) {
73 DCHECK(thread_checker.CalledOnValidThread());
74 Result status_code =
75 (ok ? helium::Result::SUCCESS : helium::Result::ERR_PROTOCOL_ERROR);
76 DVLOG(3) << "Processing GrpcTag (on IO thread): "
77 << static_cast<int>(tag_type)
78 << " status_code = " << static_cast<int>(status_code);
79 switch (tag_type) {
80 case GrpcStream::TagType::CONNECT:
81 DVLOG(1) << "gRPC Stream is now setup.";
82 if (!connection_cb.is_null()) {
83 base::ResetAndReturn(&connection_cb).Run(status_code);
84 }
85 break;
86 case GrpcStream::TagType::READ:
87 DVLOG(1) << "Received Helium Message = "
88 << "; size = " << received_msg->ByteSize();
89
90 base::ResetAndReturn(&received_cb)
91 .Run(std::move(received_msg), status_code);
92 break;
93 case GrpcStream::TagType::WRITE:
94 // TODO(perumaal): Apply correct helium results here.
95 base::ResetAndReturn(&sent_cb).Run(status_code);
96 break;
97 default:
98 LOG(FATAL) << "GrpcTag has invalid tag type : "
99 << static_cast<int>(tag_type) << "; this = " << this;
100 }
101 }
102
103 GrpcTag::~GrpcTag() {
104 DVLOG(3) << "Deleting tag " << (reinterpret_cast<void*>(this));
105 }
106
107 // GrpcStream class.
108 GrpcStream::GrpcStream() : weak_factory_(this) {}
109
110 GrpcStream::~GrpcStream() {}
111
112 // Various ways of creating tags. Note that the tag is an opaque structure for
113 // the users of GrpcStream including the sub-classes so all the implementation
114 // is self-contained in the cc file without exposing the details. All the tags
115 // must be created on the same thread as the |GrpcStream| - i.e. IO thread.
116
117 // Tag used during initial connection setup.
118 GrpcTag* GrpcStream::ConnectTag(
119 const net::CompletionCallback& connection_callback) {
120 DCHECK(thread_checker_.CalledOnValidThread());
121 GrpcTag* tag = new GrpcTag();
122 tag->tag_type = GrpcStream::TagType::CONNECT;
123 tag->connection_cb = connection_callback;
124 return tag;
125 }
126
127 // Tag that delivers a read-message from the completion queue.
128 GrpcTag* GrpcStream::ReadTag(const Stream::ReceiveMessageCallback& received_cb,
129 HeliumWrapper** received_msg) {
130 DCHECK(thread_checker_.CalledOnValidThread());
131 GrpcTag* tag = new GrpcTag();
132 tag->tag_type = GrpcStream::TagType::READ;
133 tag->received_msg = base::MakeUnique<HeliumWrapper>();
134 tag->received_cb = received_cb;
135 *received_msg = tag->received_msg.get();
136 return tag;
137 }
138
139 // Tag that indicates when a message has been sent (i.e. in the completion
140 // queue).
141 GrpcTag* GrpcStream::WriteTag(const Stream::SendMessageCallback& sent_cb) {
142 DCHECK(thread_checker_.CalledOnValidThread());
143 GrpcTag* tag = new GrpcTag();
144 tag->tag_type = GrpcStream::TagType::WRITE;
145 tag->sent_cb = sent_cb;
146 return tag;
147 }
148
149 // Starts the completion queue with the provided |shared_data| that is owned by
150 // both the completion queue thread and the IO thread.
151 void GrpcStream::StartCompletionQueueThread(
152 scoped_refptr<SharedData> shared_data) {
153 if (shared_data->grpc_thread == nullptr) {
154 LOG(FATAL) << "Completion queue thread can only be started exactly once.";
155 return;
156 }
157
158 base::Thread::Options options;
159 options.message_loop_type = base::MessageLoop::TYPE_IO;
160 shared_data->grpc_thread->StartWithOptions(options);
161 shared_data->grpc_thread->task_runner()->PostTask(
162 FROM_HERE,
163 base::Bind(&GrpcStream::CompletionQueueThread,
164 base::Passed(std::move(shared_data)), TagType::UNKNOWN));
165 }
166
167 // Ensures that the given "tag-type to tag-type transition" is OK.
168 /* static */
169 GrpcStream::TagType GrpcStream::CheckTagType(
170 GrpcStream::TagType current_tag_type,
171 GrpcStream::TagType previous_tag_type) {
172 switch (current_tag_type) {
173 case TagType::CONNECT:
174 DCHECK_EQ(TagType::UNKNOWN, previous_tag_type);
175 break;
176 case TagType::WRITE:
177 case TagType::READ:
178 DCHECK(previous_tag_type != TagType::UNKNOWN);
179 break;
180 default:
181 LOG(FATAL) << "Unrecognized tag-type transition; current: "
182 << static_cast<int>(current_tag_type)
183 << "; previous: " << static_cast<int>(previous_tag_type);
184 break;
185 }
186 return current_tag_type;
187 }
188
189 // This is the core Completion queue thread that processes gRPC tasks and
190 // delivers callbacks in the right callback thread (i.e. IO thread). This will
191 // continue scheduling further tasks until (a) either the completion queue is no
192 // longer active or (b) the completion queue encounters a problem (i.e. it's
193 // shutting down).
194 /* static */
195 void GrpcStream::CompletionQueueThread(scoped_refptr<SharedData> shared_data,
196 TagType previous_tag_type) {
197 base::ThreadRestrictions::SetIOAllowed(true);
198
199 if (shared_data->grpc_thread_checker == nullptr) {
200 shared_data->grpc_thread_checker = base::MakeUnique<base::ThreadChecker>();
201 }
202
203 // Create once in the completion queue thread and ensure we always get called
204 // into the same thread until the completion queue is shutdown.
205 DCHECK(shared_data->grpc_thread_checker->CalledOnValidThread());
206 bool ok = false;
207 void* tag = nullptr;
208 DVLOG(3) << "Waiting for next event (grpc stream; count so far = "
209 << shared_data->event_count << ").";
210 ++shared_data->event_count;
211 scoped_refptr<base::SingleThreadTaskRunner> task_runner =
212 base::ThreadTaskRunnerHandle::Get();
213
214 // This is a blocking call. Waits until (a) either a tag is available for
215 // processing or (b) when the completion queue is being shutdown.
216 if (shared_data->completion_queue->Next(reinterpret_cast<void**>(&tag),
217 &ok)) {
218 if (tag != nullptr) {
219 std::unique_ptr<GrpcTag> grpc_tag =
220 base::WrapUnique<GrpcTag>(reinterpret_cast<GrpcTag*>(tag));
221 if (shared_data->callback_task_runner != nullptr) {
222 DVLOG(3) << "Triggering GrpcTag (from gRPC thread): "
223 << static_cast<int>(grpc_tag->tag_type)
224 << "; Result: " << (ok ? "OK" : "ERROR");
225 previous_tag_type =
226 GrpcStream::CheckTagType(grpc_tag->tag_type, previous_tag_type);
227 shared_data->callback_task_runner->PostTask(
228 FROM_HERE,
229 base::Bind(&GrpcTag::ApplyCallback, std::move(grpc_tag), ok));
230 } else {
231 LOG(ERROR) << "Unable to trigger gRPC callbacks.";
232 }
233 }
234
235 if (!ok) {
236 DVLOG(3) << "Tag " << tag
237 << " had problems. Continuing to process remaining tags.";
238 }
239 task_runner->PostTask(
240 FROM_HERE,
241 base::Bind(&GrpcStream::CompletionQueueThread,
242 base::Passed(std::move(shared_data)), previous_tag_type));
243 return;
244 } else {
245 LOG(ERROR) << "Completion queue thread has no more events.";
246 }
247
248 // We reach here once the completion queue has shutdown or when there is an
249 // unrecoverable error. Destroy the thread and the shared_data now.
250
251 // Ensure that the |shared_data| thread checker is destructed in the same
252 // thread as it was created in.
253 shared_data->grpc_thread_checker = nullptr;
254
255 // TODO(perumaal): Figure out how to clean up the thread cleanly.
256 shared_data->grpc_thread = nullptr;
257 }
258
259 } // namespace blimp
OLDNEW
« no previous file with comments | « blimp/net/grpc_stream.h ('k') | blimp/net/grpc_stream_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698