Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(220)

Side by Side Diff: blimp/net/grpc_stream.cc

Issue 2462183002: GRPC Stream implementation of HeliumStream
Patch Set: Implement helium::Stream Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« blimp/net/DEPS ('K') | « blimp/net/grpc_stream.h ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "blimp/net/grpc_stream.h"
6
7 #include "base/bind_helpers.h"
8 #include "base/callback.h"
9 #include "base/callback_helpers.h"
10 #include "base/command_line.h"
11 #include "base/memory/ptr_util.h"
12 #include "base/memory/ref_counted.h"
13 #include "base/synchronization/lock.h"
14 #include "base/threading/thread.h"
15 #include "base/threading/thread_restrictions.h"
16 #include "base/threading/thread_task_runner_handle.h"
17
18 #include "content/public/browser/browser_thread.h"
19
20 namespace blimp {
21
22 GrpcStream::GrpcStream(const net::CompletionCallback& connection_callback)
23 : callback_task_runner_(base::ThreadTaskRunnerHandle::Get()),
24 connection_callback_(connection_callback),
25 grpc_thread_(base::MakeUnique<base::Thread>("GrpcThread")),
26 weak_factory_(this) {}
27
28 GrpcStream::GrpcTag* GrpcStream::GrpcConnectTag() {
29 return GrpcTag::Connect(connection_callback_);
30 }
31
32 GrpcStream::GrpcTag* GrpcStream::GrpcReadTag(
33 const Stream::ReceiveMessageCallback& received_cb) {
34 return GrpcTag::Read(received_cb);
35 }
36
37 GrpcStream::GrpcTag* GrpcStream::GrpcWriteTag(
38 const Stream::SendMessageCallback& sent_cb) {
39 return GrpcTag::Write(sent_cb);
40 }
41
42 GrpcStream::GrpcTag* GrpcStream::GrpcWritesDoneTag() {
43 return GrpcTag::WritesDone();
44 }
45
46 void GrpcStream::StartCompletionQueueThread(
47 grpc::CompletionQueue* completion_q) {
48 if (grpc_thread_ == nullptr) {
49 LOG(FATAL) << "Completion queue thread can only be started exactly once.";
50 return;
51 }
52
53 base::Thread::Options options;
54 grpc_thread_->StartWithOptions(options);
55 grpc_thread_.release()->task_runner()->PostTask(
56 FROM_HERE, base::Bind(&GrpcStream::CompletionQueueThread, completion_q,
57 callback_task_runner_));
58 }
59
60 void GrpcStream::CompletionQueueThread(
61 grpc::CompletionQueue* completion_q,
62 scoped_refptr<base::SingleThreadTaskRunner> callback_task_runner) {
63 base::ThreadRestrictions::SetIOAllowed(true);
64
65 DVLOG(3) << "Starting Completion Queue thread.";
66 static int event_count = 0;
67 bool ok = false;
68 void* tag = nullptr;
69 DVLOG(3) << "Waiting for next event (grpc stream; count so far = "
70 << event_count << ").";
71 ++event_count;
72
73 if (completion_q->Next(reinterpret_cast<void**>(&tag), &ok)) {
74 if (tag != nullptr) {
75 GrpcTag* grpc_tag = reinterpret_cast<GrpcTag*>(tag);
76 if (callback_task_runner != nullptr) {
77 DVLOG(3) << "Triggering GrpcTag (from gRPC thread): "
78 << static_cast<int>(grpc_tag->tag_type_)
79 << "; Result: " << (ok ? "OK" : "ERROR");
80 callback_task_runner->PostTask(
81 FROM_HERE,
82 base::Bind(&GrpcStream::GrpcTag::ApplyCallbackOnCbThread,
83 base::Passed(base::WrapUnique(grpc_tag)), ok));
84 } else {
85 LOG(ERROR) << "Unable to trigger gRPC callbacks.";
86 delete grpc_tag;
87 }
88 } else {
89 LOG(ERROR) << "Completion queue shutting down; tag = " << tag;
90 }
91
92 if (ok) {
93 scoped_refptr<base::SingleThreadTaskRunner> task_runner =
94 base::ThreadTaskRunnerHandle::Get();
95 if (task_runner != nullptr) {
96 task_runner->PostTask(FROM_HERE,
97 base::Bind(&GrpcStream::CompletionQueueThread,
98 completion_q, callback_task_runner));
99 }
100 } else {
101 LOG(INFO) << "Tag " << tag << " had problems. Exiting.";
102 }
103 } else {
104 LOG(ERROR) << "Completion queue thread has no more events.";
105 }
106 }
107
108 GrpcStream::~GrpcStream() { callback_task_runner_ = nullptr; }
109
110 GrpcStream::GrpcTag::GrpcTag() : tag_type_(TagType::UNKNOWN) {
111 DVLOG(3) << "Created Tag " << this;
112 }
113
114 GrpcStream::GrpcTag* GrpcStream::GrpcTag::Connect(
115 const net::CompletionCallback& connection_cb) {
116 GrpcTag* tag = new GrpcStream::GrpcTag();
117 tag->tag_type_ = TagType::CONNECT;
118 tag->connection_cb_ = connection_cb;
119 return tag;
120 }
121
122 GrpcStream::GrpcTag* GrpcStream::GrpcTag::Write(
123 const Stream::SendMessageCallback& callback) {
124 GrpcTag* tag = new GrpcStream::GrpcTag();
125 tag->tag_type_ = TagType::WRITE;
126 tag->sent_cb_ = callback;
127 return tag;
128 }
129
130 GrpcStream::GrpcTag* GrpcStream::GrpcTag::Read(
131 const Stream::ReceiveMessageCallback& callback) {
132 GrpcTag* tag = new GrpcStream::GrpcTag();
133 tag->tag_type_ = TagType::READ;
134 tag->received_msg_ = base::MakeUnique<HeliumWrapper>();
135 tag->received_cb_ = callback;
136 return tag;
137 }
138
139 GrpcStream::GrpcTag* GrpcStream::GrpcTag::WritesDone() {
140 GrpcTag* tag = new GrpcStream::GrpcTag();
141 tag->tag_type_ = TagType::WRITES_DONE;
142 return tag;
143 }
144
145 HeliumWrapper* GrpcStream::GrpcTag::GetReceivedMsg() const {
146 CHECK_EQ(TagType::READ, tag_type_);
147 return received_msg_.get();
148 }
149
150 void GrpcStream::GrpcTag::ApplyCallbackOnCbThread(bool ok) {
151 Result status_code =
152 (ok ? helium::Result::SUCCESS : helium::Result::ERR_PROTOCOL_ERROR);
153 DVLOG(3) << "Processing GrpcTag (on IO thread): "
154 << static_cast<int>(tag_type_)
155 << " status_code = " << static_cast<int>(status_code);
156 switch (tag_type_) {
157 case TagType::CONNECT:
158 DVLOG(1) << "gRPC Stream is now setup.";
159 if (!connection_cb_.is_null()) {
160 base::ResetAndReturn(&connection_cb_).Run(status_code);
161 }
162 break;
163 case TagType::READ:
164 DVLOG(1) << "Received Helium Message = "
165 << "; size = " << received_msg_->ByteSize();
166
167 base::ResetAndReturn(&received_cb_)
168 .Run(std::move(received_msg_), status_code);
169 break;
170 case TagType::WRITE:
171 // TODO(perumaal): Apply correct helium results here.
172 base::ResetAndReturn(&sent_cb_).Run(status_code);
173 break;
174 case TagType::WRITES_DONE:
175 break;
176 default:
177 LOG(FATAL) << "GrpcTag has invalid tag type : "
178 << static_cast<int>(tag_type_) << "; this = " << this;
179 }
180 }
181
182 GrpcStream::GrpcTag::~GrpcTag() {
183 DVLOG(3) << "Deleting tag " << ((void*)this);
184 }
185
186 } // namespace blimp
OLDNEW
« blimp/net/DEPS ('K') | « blimp/net/grpc_stream.h ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698