OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2016 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "blimp/net/grpc_stream.h" | |
6 | |
7 #include "base/bind_helpers.h" | |
8 #include "base/callback.h" | |
9 #include "base/callback_helpers.h" | |
10 #include "base/command_line.h" | |
11 #include "base/memory/ptr_util.h" | |
12 #include "base/synchronization/lock.h" | |
13 #include "base/threading/thread.h" | |
14 #include "base/threading/thread_restrictions.h" | |
15 #include "base/threading/thread_task_runner_handle.h" | |
16 | |
17 #include "content/public/browser/browser_thread.h" | |
18 | |
19 namespace blimp { | |
20 | |
21 GrpcStream::GrpcStream(const net::CompletionCallback& connection_callback) | |
22 : callback_task_runner_(base::ThreadTaskRunnerHandle::Get()), | |
23 connection_callback_(connection_callback), | |
24 grpc_thread_(base::MakeUnique<base::Thread>("GrpcThread")), | |
25 weak_factory_(this) {} | |
26 | |
27 GrpcStream::GrpcTag* GrpcStream::GrpcConnectTag() { | |
28 return GrpcTag::Connect(connection_callback_); | |
29 } | |
30 | |
31 GrpcStream::GrpcTag* GrpcStream::GrpcReadTag( | |
32 const HeliumMessageReceivedCb& received_cb) { | |
33 return GrpcTag::Read(received_cb); | |
34 } | |
35 | |
36 GrpcStream::GrpcTag* GrpcStream::GrpcWriteTag( | |
37 const HeliumMessageSentCb& sent_cb) { | |
38 return GrpcTag::Write(sent_cb); | |
39 } | |
40 | |
41 GrpcStream::GrpcTag* GrpcStream::GrpcWritesDoneTag() { | |
42 return GrpcTag::WritesDone(); | |
43 } | |
44 | |
45 void GrpcStream::StartCompletionQueueThread( | |
46 grpc::CompletionQueue* completion_q) { | |
47 if (grpc_thread_ == nullptr) { | |
48 LOG(FATAL) << "Completion queue thread can only be started exactly once."; | |
49 return; | |
50 } | |
51 | |
52 base::Thread::Options options; | |
53 // options.joinable = false; | |
54 grpc_thread_->StartWithOptions(options); | |
55 grpc_thread_.release()->task_runner()->PostTask( | |
56 FROM_HERE, base::Bind(&GrpcStream::CompletionQueueThread, completion_q, | |
57 callback_task_runner_)); | |
58 } | |
59 | |
60 void GrpcStream::CompletionQueueThread( | |
61 grpc::CompletionQueue* completion_q, | |
62 scoped_refptr<base::SingleThreadTaskRunner> callback_task_runner) { | |
63 base::ThreadRestrictions::SetIOAllowed(true); | |
64 | |
65 DVLOG(3) << "Starting Completion Queue thread."; | |
66 static int event_count = 0; | |
67 bool ok = false; | |
68 void* tag = nullptr; | |
69 DVLOG(3) << "Waiting for next event (grpc stream; count so far = " | |
70 << event_count << ")."; | |
71 ++event_count; | |
72 | |
73 if (completion_q->Next(reinterpret_cast<void**>(&tag), &ok)) { | |
74 if (tag != nullptr) { | |
75 auto grpc_tag = reinterpret_cast<GrpcTag*>(tag); | |
76 if (callback_task_runner != nullptr) { | |
77 DVLOG(3) << "Triggering GrpcTag (from gRPC thread): " | |
78 << static_cast<int>(grpc_tag->tag_type_) | |
79 << "; Result: " << (ok ? "OK" : "ERROR"); | |
80 callback_task_runner->PostTask( | |
81 FROM_HERE, | |
82 base::Bind(&GrpcStream::GrpcTag::ApplyCallbackOnCbThread, | |
83 base::Passed(base::WrapUnique(grpc_tag)), ok)); | |
84 } else { | |
85 LOG(ERROR) << "Unable to trigger gRPC callbacks."; | |
86 delete grpc_tag; | |
87 } | |
88 } else { | |
89 LOG(ERROR) << "Completion queue shutting down; tag = " << tag; | |
90 } | |
91 | |
92 if (ok) { | |
93 auto task_runner = base::ThreadTaskRunnerHandle::Get(); | |
94 if (task_runner != nullptr) { | |
95 task_runner->PostTask(FROM_HERE, | |
96 base::Bind(&GrpcStream::CompletionQueueThread, | |
97 completion_q, callback_task_runner)); | |
98 } | |
99 } else { | |
100 LOG(INFO) << "Tag " << tag << " had problems. Exiting."; | |
101 } | |
102 } else { | |
103 LOG(ERROR) << "Completion queue thread has no more events."; | |
104 } | |
105 } | |
106 | |
107 GrpcStream::~GrpcStream() { | |
108 callback_task_runner_ = nullptr; | |
109 } | |
110 | |
111 GrpcStream::GrpcTag::GrpcTag() : tag_type_(TagType::UNKNOWN) { | |
112 DVLOG(3) << "Created Tag " << this; | |
113 } | |
114 | |
115 GrpcStream::GrpcTag* GrpcStream::GrpcTag::Connect( | |
116 const net::CompletionCallback& connection_cb) { | |
117 auto tag = new GrpcStream::GrpcTag(); | |
118 tag->tag_type_ = TagType::CONNECT; | |
119 tag->connection_cb_ = connection_cb; | |
120 return tag; | |
121 } | |
122 | |
123 GrpcStream::GrpcTag* GrpcStream::GrpcTag::Write( | |
124 const GrpcStream::HeliumMessageSentCb& callback) { | |
125 auto tag = new GrpcStream::GrpcTag(); | |
126 tag->tag_type_ = TagType::WRITE; | |
127 tag->sent_cb_ = callback; | |
128 return tag; | |
129 } | |
130 | |
131 GrpcStream::GrpcTag* GrpcStream::GrpcTag::Read( | |
132 const GrpcStream::HeliumMessageReceivedCb& callback) { | |
133 auto tag = new GrpcStream::GrpcTag(); | |
134 tag->tag_type_ = TagType::READ; | |
135 tag->received_msg_ = base::MakeUnique<HeliumWrapper>(); | |
136 tag->received_cb_ = callback; | |
137 return tag; | |
138 } | |
139 | |
140 GrpcStream::GrpcTag* GrpcStream::GrpcTag::WritesDone() { | |
141 auto tag = new GrpcStream::GrpcTag(); | |
142 tag->tag_type_ = TagType::WRITES_DONE; | |
143 return tag; | |
144 } | |
145 | |
146 HeliumWrapper* GrpcStream::GrpcTag::GetReceivedMsg() const { | |
147 CHECK_EQ(TagType::READ, tag_type_); | |
148 return received_msg_.get(); | |
149 } | |
150 | |
151 void GrpcStream::GrpcTag::ApplyCallbackOnCbThread(bool ok) { | |
152 auto status_code = (ok ? grpc::OK : -grpc::UNKNOWN); | |
153 DVLOG(3) << "Processing GrpcTag (on IO thread): " | |
154 << static_cast<int>(tag_type_) | |
155 << " status_code = " << static_cast<int>(status_code); | |
156 switch (tag_type_) { | |
157 case TagType::CONNECT: | |
158 DVLOG(1) << "gRPC Stream is now setup."; | |
159 if (!connection_cb_.is_null()) { | |
160 base::ResetAndReturn(&connection_cb_).Run(status_code); | |
161 } | |
162 break; | |
163 case TagType::READ: | |
164 DVLOG(1) << "Received Helium Message = " | |
165 << "; size = " << received_msg_->ByteSize(); | |
166 | |
167 base::ResetAndReturn(&received_cb_) | |
168 .Run(std::move(received_msg_), status_code); | |
169 break; | |
170 case TagType::WRITE: | |
171 // TODO(perumaal): Apply correct helium results here. | |
172 base::ResetAndReturn(&sent_cb_).Run(status_code); | |
173 break; | |
174 case TagType::WRITES_DONE: | |
175 break; | |
176 default: | |
Kevin M
2016/10/31 21:33:25
General point: don't use default, so that accident
| |
177 LOG(FATAL) << "GrpcTag has invalid tag type : " | |
178 << static_cast<int>(tag_type_) << "; this = " << this; | |
179 } | |
180 } | |
181 | |
182 GrpcStream::GrpcTag::~GrpcTag() { | |
183 DVLOG(3) << "Deleting tag " << ((void*)this); | |
184 } | |
185 | |
186 } // namespace blimp | |
OLD | NEW |