Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(883)

Side by Side Diff: blimp/net/grpc_stream.cc

Issue 2462183002: GRPC Stream implementation of HeliumStream
Patch Set: Fixed a few minor comments Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "blimp/net/grpc_stream.h"
6
7 #include <memory>
8 #include <utility>
9
10 #include "base/bind_helpers.h"
11 #include "base/callback.h"
12 #include "base/callback_helpers.h"
13 #include "base/command_line.h"
14 #include "base/memory/ptr_util.h"
15 #include "base/memory/ref_counted.h"
16 #include "base/synchronization/lock.h"
17 #include "base/threading/thread.h"
18 #include "base/threading/thread_restrictions.h"
19 #include "base/threading/thread_task_runner_handle.h"
20
21 namespace blimp {
22
23 // GrpcStream::SharedData code. It's intended to be used a simple
24 // data-structure.
25 GrpcStream::SharedData::SharedData()
26 : grpc_thread(base::MakeUnique<base::Thread>("GrpcThread")),
27 callback_task_runner(base::ThreadTaskRunnerHandle::Get()),
28 event_count(0) {}
29
30 GrpcStream::SharedData::~SharedData() {
31 // When the shared data is being destructed, the gRPC thread object is being
32 // explicitly relinquished beforehand so as to not have any circular
33 // references.
34 DCHECK_EQ(nullptr, grpc_thread.get());
35
36 // The thread checker is purely for verification inside the completion queue
37 // thread.
38 DCHECK_EQ(nullptr, grpc_thread_checker.get());
39 }
40
41 // A wrapper class around a gRPC tag targeted for use by the completion queue.
42 struct GrpcTag {
43 public:
44 GrpcTag();
45 ~GrpcTag();
46
47 // This method calls the correct callback based on the given tag-type.
48 void ApplyCallback(bool ok);
49
50 GrpcStream::TagType tag_type;
51
52 // For initial connection.
53 net::CompletionCallback connection_cb;
54
55 // For sending.
56 Stream::SendMessageCallback sent_cb;
57
58 // For receiving.
59 std::unique_ptr<HeliumWrapper> received_msg;
60 Stream::ReceiveMessageCallback received_cb;
61
62 base::ThreadChecker thread_checker_;
63
64 friend class GrpcStream;
65 };
66
67 GrpcTag::GrpcTag() : tag_type(GrpcStream::TagType::UNKNOWN) {
68 DVLOG(3) << "Created Tag " << this;
69 }
70
71 // Applies the callback (on the IO thread).
72 void GrpcTag::ApplyCallback(bool ok) {
73 DCHECK(thread_checker_.CalledOnValidThread());
74 Result status_code =
75 (ok ? helium::Result::SUCCESS : helium::Result::ERR_PROTOCOL_ERROR);
76 DVLOG(3) << "Processing GrpcTag (on IO thread): "
77 << static_cast<int>(tag_type)
78 << " status_code = " << static_cast<int>(status_code);
79 switch (tag_type) {
80 case GrpcStream::TagType::CONNECT:
81 DVLOG(1) << "gRPC Stream is now setup.";
82 if (!connection_cb.is_null()) {
83 base::ResetAndReturn(&connection_cb).Run(status_code);
84 }
85 break;
86 case GrpcStream::TagType::READ:
87 DVLOG(1) << "Received Helium Message = "
88 << "; size = " << received_msg->ByteSize();
89
90 base::ResetAndReturn(&received_cb)
91 .Run(std::move(received_msg), status_code);
92 break;
93 case GrpcStream::TagType::WRITE:
94 // TODO(perumaal): Apply correct helium results here.
95 base::ResetAndReturn(&sent_cb).Run(status_code);
96 break;
97 default:
98 LOG(FATAL) << "GrpcTag has invalid tag type : "
99 << static_cast<int>(tag_type) << "; this = " << this;
100 }
101 }
102
103 GrpcTag::~GrpcTag() {
104 DVLOG(3) << "Deleting tag " << (reinterpret_cast<void*>(this));
105 DCHECK(thread_checker_.CalledOnValidThread());
106 }
107
108 // GrpcStream class.
109 GrpcStream::GrpcStream() : weak_factory_(this) {}
110
111 GrpcStream::~GrpcStream() {}
112
113 // Various ways of creating tags. Note that the tag is an opaque structure for
114 // the users of GrpcStream including the sub-classes so all the implementation
115 // is self-contained in the cc file without exposing the details. All the tags
116 // must be created on the same thread as the |GrpcStream| - i.e. IO thread.
117
118 // Tag used during initial connection setup.
119 GrpcTag* GrpcStream::ConnectTag(
120 const net::CompletionCallback& connection_callback) {
121 DCHECK(thread_checker_.CalledOnValidThread());
122 GrpcTag* tag = new GrpcTag();
123 tag->tag_type = GrpcStream::TagType::CONNECT;
124 tag->connection_cb = connection_callback;
125 return tag;
126 }
127
128 // Tag that delivers a read-message from the completion queue.
129 GrpcTag* GrpcStream::ReadTag(const Stream::ReceiveMessageCallback& received_cb,
130 HeliumWrapper** received_msg) {
131 DCHECK(thread_checker_.CalledOnValidThread());
132 GrpcTag* tag = new GrpcTag();
133 tag->tag_type = GrpcStream::TagType::READ;
134 tag->received_msg = base::MakeUnique<HeliumWrapper>();
135 tag->received_cb = received_cb;
136 *received_msg = tag->received_msg.get();
137 return tag;
138 }
139
140 // Tag that indicates when a message has been sent (i.e. in the completion
141 // queue).
142 GrpcTag* GrpcStream::WriteTag(const Stream::SendMessageCallback& sent_cb) {
143 DCHECK(thread_checker_.CalledOnValidThread());
144 GrpcTag* tag = new GrpcTag();
145 tag->tag_type = GrpcStream::TagType::WRITE;
146 tag->sent_cb = sent_cb;
147 return tag;
148 }
149
150 // Starts the completion queue with the provided |shared_data| that is owned by
151 // both the completion queue thread and the IO thread.
152 void GrpcStream::StartCompletionQueueThread(
153 scoped_refptr<SharedData> shared_data) {
154 if (shared_data->grpc_thread == nullptr) {
155 LOG(FATAL) << "Completion queue thread can only be started exactly once.";
156 return;
157 }
158
159 base::Thread::Options options;
160 shared_data->grpc_thread->StartWithOptions(options);
161 shared_data->grpc_thread.release()->task_runner()->PostTask(
162 FROM_HERE,
163 base::Bind(&GrpcStream::CompletionQueueThread,
164 base::Passed(std::move(shared_data)), TagType::UNKNOWN));
165 }
166
167 // Ensures that the given "tag-type to tag-type transition" is OK.
168 /* static */
169 GrpcStream::TagType GrpcStream::CheckTagType(
170 GrpcStream::TagType current_tag_type,
171 GrpcStream::TagType previous_tag_type) {
172 switch (current_tag_type) {
173 case TagType::CONNECT:
174 DCHECK_EQ(TagType::UNKNOWN, previous_tag_type);
175 break;
176 case TagType::WRITE:
177 case TagType::READ:
178 DCHECK(previous_tag_type != TagType::UNKNOWN);
179 break;
180 default:
181 LOG(FATAL) << "Unrecognized tag-type transition; current: "
182 << static_cast<int>(current_tag_type)
183 << "; previous: " << static_cast<int>(previous_tag_type);
184 break;
185 }
186 return current_tag_type;
187 }
188
189 // This is the core Completion queue thread that processes gRPC tasks and
190 // delivers callbacks in the right callback thread (i.e. IO thread). This will
191 // continue scheduling further tasks until (a) either the completion queue is no
192 // longer active or (b) the completion queue encounters a problem (i.e. it's
193 // shutting down).
194 /* static */
195 void GrpcStream::CompletionQueueThread(scoped_refptr<SharedData> shared_data,
196 TagType previous_tag_type) {
197 base::ThreadRestrictions::SetIOAllowed(true);
198
199 if (shared_data->grpc_thread_checker == nullptr) {
200 shared_data->grpc_thread_checker = base::MakeUnique<base::ThreadChecker>();
201 }
202
203 // Create once in the completion queue thread and ensure we always get called
204 // into the same thread until the completion queue is shutdown.
205 DCHECK(shared_data->grpc_thread_checker->CalledOnValidThread());
206 bool ok = false;
207 void* tag = nullptr;
208 DVLOG(3) << "Waiting for next event (grpc stream; count so far = "
209 << shared_data->event_count << ").";
210 ++shared_data->event_count;
211
212 // This is a blocking call. Waits until (a) either a tag is available for
213 // processing or (b) when the completion queue is being shutdown.
214 if (shared_data->completion_queue->Next(reinterpret_cast<void**>(&tag),
215 &ok)) {
216 if (tag != nullptr) {
217 std::unique_ptr<GrpcTag> grpc_tag =
218 base::WrapUnique<GrpcTag>(reinterpret_cast<GrpcTag*>(tag));
219 if (shared_data->callback_task_runner != nullptr) {
220 DVLOG(3) << "Triggering GrpcTag (from gRPC thread): "
221 << static_cast<int>(grpc_tag->tag_type)
222 << "; Result: " << (ok ? "OK" : "ERROR");
223 previous_tag_type =
224 GrpcStream::CheckTagType(grpc_tag->tag_type, previous_tag_type);
225 shared_data->callback_task_runner->PostTask(
226 FROM_HERE,
227 base::Bind(&GrpcTag::ApplyCallback, std::move(grpc_tag), ok));
228 } else {
229 LOG(ERROR) << "Unable to trigger gRPC callbacks.";
230 }
231 }
232
233 if (!ok) {
234 DVLOG(3) << "Tag " << tag
235 << " had problems. Continuing to process remaining tags.";
236 }
237 scoped_refptr<base::SingleThreadTaskRunner> task_runner =
238 base::ThreadTaskRunnerHandle::Get();
239 if (task_runner != nullptr) {
240 task_runner->PostTask(
241 FROM_HERE,
242 base::Bind(&GrpcStream::CompletionQueueThread,
243 base::Passed(std::move(shared_data)), previous_tag_type));
244 return;
245 }
246 } else {
247 LOG(ERROR) << "Completion queue thread has no more events.";
248 }
249
250 // We reach here once the completion queue has shutdown or when there is an
251 // unrecoverable error.
252
253 // Ensure that the |shared_data| thread checker is destructed in the same
254 // thread as it was created in.
255 shared_data->grpc_thread_checker = nullptr;
256 }
257
258 } // namespace blimp
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698