Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(450)

Side by Side Diff: blimp/net/grpc_stream.cc

Issue 2462183002: GRPC Stream implementation of HeliumStream
Patch Set: Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« blimp/net/grpc_stream.h ('K') | « blimp/net/grpc_stream.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "blimp/net/grpc_stream.h"
6
7 #include "base/bind_helpers.h"
8 #include "base/callback.h"
9 #include "base/callback_helpers.h"
10 #include "base/command_line.h"
11 #include "base/memory/ptr_util.h"
12 #include "base/synchronization/lock.h"
13 #include "base/threading/thread.h"
14 #include "base/threading/thread_restrictions.h"
15 #include "base/threading/thread_task_runner_handle.h"
16
17 #include "content/public/browser/browser_thread.h"
18
19 namespace blimp {
20
21 GrpcStream::GrpcStream(const net::CompletionCallback& connection_callback)
22 : callback_task_runner_(base::ThreadTaskRunnerHandle::Get()),
23 connection_callback_(connection_callback),
24 grpc_thread_(base::MakeUnique<base::Thread>("GrpcThread")),
25 weak_factory_(this) {}
26
27 GrpcStream::GrpcTag* GrpcStream::GrpcConnectTag() {
28 return GrpcTag::Connect(connection_callback_);
29 }
30
31 GrpcStream::GrpcTag* GrpcStream::GrpcReadTag(
32 const HeliumMessageReceivedCb& received_cb) {
33 return GrpcTag::Read(received_cb);
34 }
35
36 GrpcStream::GrpcTag* GrpcStream::GrpcWriteTag(
37 const HeliumMessageSentCb& sent_cb) {
38 return GrpcTag::Write(sent_cb);
39 }
40
41 GrpcStream::GrpcTag* GrpcStream::GrpcWritesDoneTag() {
42 return GrpcTag::WritesDone();
43 }
44
45 void GrpcStream::StartCompletionQueueThread(
46 grpc::CompletionQueue* completion_q) {
47 if (grpc_thread_ == nullptr) {
48 LOG(FATAL) << "Completion queue thread can only be started exactly once.";
49 return;
50 }
51
52 base::Thread::Options options;
53 // options.joinable = false;
54 grpc_thread_->StartWithOptions(options);
55 grpc_thread_.release()->task_runner()->PostTask(
56 FROM_HERE, base::Bind(&GrpcStream::CompletionQueueThread, completion_q,
57 callback_task_runner_));
58 }
59
60 void GrpcStream::CompletionQueueThread(
61 grpc::CompletionQueue* completion_q,
62 scoped_refptr<base::SingleThreadTaskRunner> callback_task_runner) {
63 base::ThreadRestrictions::SetIOAllowed(true);
64
65 DVLOG(3) << "Starting Completion Queue thread.";
66 static int event_count = 0;
67 bool ok = false;
68 void* tag = nullptr;
69 DVLOG(3) << "Waiting for next event (grpc stream; count so far = "
70 << event_count << ").";
71 ++event_count;
72
73 if (completion_q->Next(reinterpret_cast<void**>(&tag), &ok)) {
74 if (tag != nullptr) {
75 auto grpc_tag = reinterpret_cast<GrpcTag*>(tag);
76 if (callback_task_runner != nullptr) {
77 DVLOG(3) << "Triggering GrpcTag (from gRPC thread): "
78 << static_cast<int>(grpc_tag->tag_type_)
79 << "; Result: " << (ok ? "OK" : "ERROR");
80 callback_task_runner->PostTask(
81 FROM_HERE,
82 base::Bind(&GrpcStream::GrpcTag::ApplyCallbackOnCbThread,
83 base::Passed(base::WrapUnique(grpc_tag)), ok));
84 } else {
85 LOG(ERROR) << "Unable to trigger gRPC callbacks.";
86 delete grpc_tag;
87 }
88 } else {
89 LOG(ERROR) << "Completion queue shutting down; tag = " << tag;
90 }
91
92 if (ok) {
93 auto task_runner = base::ThreadTaskRunnerHandle::Get();
94 if (task_runner != nullptr) {
95 task_runner->PostTask(FROM_HERE,
96 base::Bind(&GrpcStream::CompletionQueueThread,
97 completion_q, callback_task_runner));
98 }
99 } else {
100 LOG(INFO) << "Tag " << tag << " had problems. Exiting.";
101 }
102 } else {
103 LOG(ERROR) << "Completion queue thread has no more events.";
104 }
105 }
106
107 GrpcStream::~GrpcStream() {
108 callback_task_runner_ = nullptr;
109 }
110
111 GrpcStream::GrpcTag::GrpcTag() : tag_type_(TagType::UNKNOWN) {
112 DVLOG(3) << "Created Tag " << this;
113 }
114
115 GrpcStream::GrpcTag* GrpcStream::GrpcTag::Connect(
116 const net::CompletionCallback& connection_cb) {
117 auto tag = new GrpcStream::GrpcTag();
118 tag->tag_type_ = TagType::CONNECT;
119 tag->connection_cb_ = connection_cb;
120 return tag;
121 }
122
123 GrpcStream::GrpcTag* GrpcStream::GrpcTag::Write(
124 const GrpcStream::HeliumMessageSentCb& callback) {
125 auto tag = new GrpcStream::GrpcTag();
126 tag->tag_type_ = TagType::WRITE;
127 tag->sent_cb_ = callback;
128 return tag;
129 }
130
131 GrpcStream::GrpcTag* GrpcStream::GrpcTag::Read(
132 const GrpcStream::HeliumMessageReceivedCb& callback) {
133 auto tag = new GrpcStream::GrpcTag();
134 tag->tag_type_ = TagType::READ;
135 tag->received_msg_ = base::MakeUnique<HeliumWrapper>();
136 tag->received_cb_ = callback;
137 return tag;
138 }
139
140 GrpcStream::GrpcTag* GrpcStream::GrpcTag::WritesDone() {
141 auto tag = new GrpcStream::GrpcTag();
142 tag->tag_type_ = TagType::WRITES_DONE;
143 return tag;
144 }
145
146 HeliumWrapper* GrpcStream::GrpcTag::GetReceivedMsg() const {
147 CHECK_EQ(TagType::READ, tag_type_);
148 return received_msg_.get();
149 }
150
151 void GrpcStream::GrpcTag::ApplyCallbackOnCbThread(bool ok) {
152 auto status_code = (ok ? grpc::OK : -grpc::UNKNOWN);
153 DVLOG(3) << "Processing GrpcTag (on IO thread): "
154 << static_cast<int>(tag_type_)
155 << " status_code = " << static_cast<int>(status_code);
156 switch (tag_type_) {
157 case TagType::CONNECT:
158 DVLOG(1) << "gRPC Stream is now setup.";
159 if (!connection_cb_.is_null()) {
160 base::ResetAndReturn(&connection_cb_).Run(status_code);
161 }
162 break;
163 case TagType::READ:
164 DVLOG(1) << "Received Helium Message = "
165 << "; size = " << received_msg_->ByteSize();
166
167 base::ResetAndReturn(&received_cb_)
168 .Run(std::move(received_msg_), status_code);
169 break;
170 case TagType::WRITE:
171 // TODO(perumaal): Apply correct helium results here.
172 base::ResetAndReturn(&sent_cb_).Run(status_code);
173 break;
174 case TagType::WRITES_DONE:
175 break;
176 default:
Kevin M 2016/10/31 21:33:25 General point: don't use default, so that accident
177 LOG(FATAL) << "GrpcTag has invalid tag type : "
178 << static_cast<int>(tag_type_) << "; this = " << this;
179 }
180 }
181
182 GrpcStream::GrpcTag::~GrpcTag() {
183 DVLOG(3) << "Deleting tag " << ((void*)this);
184 }
185
186 } // namespace blimp
OLDNEW
« blimp/net/grpc_stream.h ('K') | « blimp/net/grpc_stream.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698