Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(25)

Side by Side Diff: components/grpc_support/bidirectional_stream.cc

Issue 2487863003: Revert of Revert "Revert of Moving gRPC support interfaces out of cronet and into a new component. (patchset … (Closed)
Patch Set: Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "components/grpc_support/bidirectional_stream.h"
6
7 #include <memory>
8 #include <string>
9 #include <vector>
10
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/logging.h"
14 #include "base/memory/ref_counted.h"
15 #include "base/strings/string_number_conversions.h"
16 #include "net/base/io_buffer.h"
17 #include "net/base/net_errors.h"
18 #include "net/base/request_priority.h"
19 #include "net/http/bidirectional_stream.h"
20 #include "net/http/bidirectional_stream_request_info.h"
21 #include "net/http/http_network_session.h"
22 #include "net/http/http_response_headers.h"
23 #include "net/http/http_status_code.h"
24 #include "net/http/http_transaction_factory.h"
25 #include "net/http/http_util.h"
26 #include "net/spdy/spdy_header_block.h"
27 #include "net/ssl/ssl_info.h"
28 #include "net/url_request/http_user_agent_settings.h"
29 #include "net/url_request/url_request_context.h"
30 #include "net/url_request/url_request_context_getter.h"
31 #include "url/gurl.h"
32
33 namespace grpc_support {
34
35 BidirectionalStream::WriteBuffers::WriteBuffers() {}
36
37 BidirectionalStream::WriteBuffers::~WriteBuffers() {}
38
39 void BidirectionalStream::WriteBuffers::Clear() {
40 write_buffer_list.clear();
41 write_buffer_len_list.clear();
42 }
43
44 void BidirectionalStream::WriteBuffers::AppendBuffer(
45 const scoped_refptr<net::IOBuffer>& buffer,
46 int buffer_size) {
47 write_buffer_list.push_back(buffer);
48 write_buffer_len_list.push_back(buffer_size);
49 }
50
51 void BidirectionalStream::WriteBuffers::MoveTo(WriteBuffers* target) {
52 std::move(write_buffer_list.begin(), write_buffer_list.end(),
53 std::back_inserter(target->write_buffer_list));
54 std::move(write_buffer_len_list.begin(), write_buffer_len_list.end(),
55 std::back_inserter(target->write_buffer_len_list));
56 Clear();
57 }
58
59 bool BidirectionalStream::WriteBuffers::Empty() const {
60 return write_buffer_list.empty();
61 }
62
63 BidirectionalStream::BidirectionalStream(
64 net::URLRequestContextGetter* request_context_getter,
65 Delegate* delegate)
66 : read_state_(NOT_STARTED),
67 write_state_(NOT_STARTED),
68 write_end_of_stream_(false),
69 request_headers_sent_(false),
70 disable_auto_flush_(false),
71 delay_headers_until_flush_(false),
72 request_context_getter_(request_context_getter),
73 pending_write_data_(new WriteBuffers()),
74 flushing_write_data_(new WriteBuffers()),
75 sending_write_data_(new WriteBuffers()),
76 delegate_(delegate),
77 weak_factory_(this) {
78 weak_this_ = weak_factory_.GetWeakPtr();
79 }
80
81 BidirectionalStream::~BidirectionalStream() {
82 DCHECK(IsOnNetworkThread());
83 }
84
85 int BidirectionalStream::Start(const char* url,
86 int priority,
87 const char* method,
88 const net::HttpRequestHeaders& headers,
89 bool end_of_stream) {
90 // Prepare request info here to be able to return the error.
91 std::unique_ptr<net::BidirectionalStreamRequestInfo> request_info(
92 new net::BidirectionalStreamRequestInfo());
93 request_info->url = GURL(url);
94 request_info->priority = static_cast<net::RequestPriority>(priority);
95 // Http method is a token, just as header name.
96 request_info->method = method;
97 if (!net::HttpUtil::IsValidHeaderName(request_info->method))
98 return -1;
99 request_info->extra_headers.CopyFrom(headers);
100 request_info->end_stream_on_headers = end_of_stream;
101 write_end_of_stream_ = end_of_stream;
102 PostToNetworkThread(FROM_HERE,
103 base::Bind(&BidirectionalStream::StartOnNetworkThread,
104 weak_this_, base::Passed(&request_info)));
105 return 0;
106 }
107
108 bool BidirectionalStream::ReadData(char* buffer, int capacity) {
109 if (!buffer)
110 return false;
111 scoped_refptr<net::WrappedIOBuffer> read_buffer(
112 new net::WrappedIOBuffer(buffer));
113
114 PostToNetworkThread(FROM_HERE,
115 base::Bind(&BidirectionalStream::ReadDataOnNetworkThread,
116 weak_this_, read_buffer, capacity));
117 return true;
118 }
119
120 bool BidirectionalStream::WriteData(const char* buffer,
121 int count,
122 bool end_of_stream) {
123 if (!buffer)
124 return false;
125
126 scoped_refptr<net::WrappedIOBuffer> write_buffer(
127 new net::WrappedIOBuffer(buffer));
128
129 PostToNetworkThread(
130 FROM_HERE, base::Bind(&BidirectionalStream::WriteDataOnNetworkThread,
131 weak_this_, write_buffer, count, end_of_stream));
132 return true;
133 }
134
135 void BidirectionalStream::Flush() {
136 PostToNetworkThread(
137 FROM_HERE,
138 base::Bind(&BidirectionalStream::FlushOnNetworkThread, weak_this_));
139 }
140
141 void BidirectionalStream::Cancel() {
142 PostToNetworkThread(
143 FROM_HERE,
144 base::Bind(&BidirectionalStream::CancelOnNetworkThread, weak_this_));
145 }
146
147 void BidirectionalStream::Destroy() {
148 // Destroy could be called from any thread, including network thread (if
149 // posting task to executor throws an exception), but is posted, so |this|
150 // is valid until calling task is complete.
151 PostToNetworkThread(FROM_HERE,
152 base::Bind(&BidirectionalStream::DestroyOnNetworkThread,
153 base::Unretained(this)));
154 }
155
156 void BidirectionalStream::OnStreamReady(bool request_headers_sent) {
157 DCHECK(IsOnNetworkThread());
158 DCHECK_EQ(STARTED, write_state_);
159 if (!bidi_stream_)
160 return;
161 request_headers_sent_ = request_headers_sent;
162 write_state_ = WAITING_FOR_FLUSH;
163 if (write_end_of_stream_) {
164 if (!request_headers_sent) {
165 // If there is no data to write, then just send headers explicitly.
166 bidi_stream_->SendRequestHeaders();
167 request_headers_sent_ = true;
168 }
169 write_state_ = WRITING_DONE;
170 }
171 delegate_->OnStreamReady();
172 }
173
174 void BidirectionalStream::OnHeadersReceived(
175 const net::SpdyHeaderBlock& response_headers) {
176 DCHECK(IsOnNetworkThread());
177 DCHECK_EQ(STARTED, read_state_);
178 if (!bidi_stream_)
179 return;
180 read_state_ = WAITING_FOR_READ;
181 // Get http status code from response headers.
182 int http_status_code = 0;
183 const auto http_status_header = response_headers.find(":status");
184 if (http_status_header != response_headers.end())
185 base::StringToInt(http_status_header->second, &http_status_code);
186 const char* protocol = "unknown";
187 switch (bidi_stream_->GetProtocol()) {
188 case net::kProtoHTTP2:
189 protocol = "h2";
190 break;
191 case net::kProtoQUIC:
192 protocol = "quic/1+spdy/3";
193 break;
194 default:
195 break;
196 }
197 delegate_->OnHeadersReceived(response_headers, protocol);
198 }
199
200 void BidirectionalStream::OnDataRead(int bytes_read) {
201 DCHECK(IsOnNetworkThread());
202 DCHECK_EQ(READING, read_state_);
203 if (!bidi_stream_)
204 return;
205 read_state_ = WAITING_FOR_READ;
206 delegate_->OnDataRead(read_buffer_->data(), bytes_read);
207
208 // Free the read buffer.
209 read_buffer_ = nullptr;
210 if (bytes_read == 0)
211 read_state_ = READING_DONE;
212 MaybeOnSucceded();
213 }
214
215 void BidirectionalStream::OnDataSent() {
216 DCHECK(IsOnNetworkThread());
217 if (!bidi_stream_)
218 return;
219 DCHECK_EQ(WRITING, write_state_);
220 write_state_ = WAITING_FOR_FLUSH;
221 for (const scoped_refptr<net::IOBuffer>& buffer :
222 sending_write_data_->buffers()) {
223 delegate_->OnDataSent(buffer->data());
224 }
225 sending_write_data_->Clear();
226 // Send data flushed while other data was sending.
227 if (!flushing_write_data_->Empty()) {
228 SendFlushingWriteData();
229 return;
230 }
231 if (write_end_of_stream_ && pending_write_data_->Empty()) {
232 write_state_ = WRITING_DONE;
233 MaybeOnSucceded();
234 }
235 }
236
237 void BidirectionalStream::OnTrailersReceived(
238 const net::SpdyHeaderBlock& response_trailers) {
239 DCHECK(IsOnNetworkThread());
240 if (!bidi_stream_)
241 return;
242 delegate_->OnTrailersReceived(response_trailers);
243 }
244
245 void BidirectionalStream::OnFailed(int error) {
246 DCHECK(IsOnNetworkThread());
247 if (!bidi_stream_ && read_state_ != NOT_STARTED)
248 return;
249 read_state_ = write_state_ = ERR;
250 weak_factory_.InvalidateWeakPtrs();
251 // Delete underlying |bidi_stream_| asynchronously as it may still be used.
252 PostToNetworkThread(FROM_HERE,
253 base::Bind(&base::DeletePointer<net::BidirectionalStream>,
254 bidi_stream_.release()));
255 delegate_->OnFailed(error);
256 }
257
258 void BidirectionalStream::StartOnNetworkThread(
259 std::unique_ptr<net::BidirectionalStreamRequestInfo> request_info) {
260 DCHECK(IsOnNetworkThread());
261 DCHECK(!bidi_stream_);
262 DCHECK(request_context_getter_->GetURLRequestContext());
263 net::URLRequestContext* request_context =
264 request_context_getter_->GetURLRequestContext();
265 request_info->extra_headers.SetHeaderIfMissing(
266 net::HttpRequestHeaders::kUserAgent,
267 request_context->http_user_agent_settings()->GetUserAgent());
268 bidi_stream_.reset(new net::BidirectionalStream(
269 std::move(request_info),
270 request_context->http_transaction_factory()->GetSession(),
271 !delay_headers_until_flush_, this));
272 DCHECK(read_state_ == NOT_STARTED && write_state_ == NOT_STARTED);
273 read_state_ = write_state_ = STARTED;
274 }
275
276 void BidirectionalStream::ReadDataOnNetworkThread(
277 scoped_refptr<net::WrappedIOBuffer> read_buffer,
278 int buffer_size) {
279 DCHECK(IsOnNetworkThread());
280 DCHECK(read_buffer);
281 DCHECK(!read_buffer_);
282 if (read_state_ != WAITING_FOR_READ) {
283 DLOG(ERROR) << "Unexpected Read Data in read_state " << read_state_;
284 // Invoke OnFailed unless it is already invoked.
285 if (read_state_ != ERR)
286 OnFailed(net::ERR_UNEXPECTED);
287 return;
288 }
289 read_state_ = READING;
290 read_buffer_ = read_buffer;
291
292 int bytes_read = bidi_stream_->ReadData(read_buffer_.get(), buffer_size);
293 // If IO is pending, wait for the BidirectionalStream to call OnDataRead.
294 if (bytes_read == net::ERR_IO_PENDING)
295 return;
296
297 if (bytes_read < 0) {
298 OnFailed(bytes_read);
299 return;
300 }
301 OnDataRead(bytes_read);
302 }
303
304 void BidirectionalStream::WriteDataOnNetworkThread(
305 scoped_refptr<net::WrappedIOBuffer> write_buffer,
306 int buffer_size,
307 bool end_of_stream) {
308 DCHECK(IsOnNetworkThread());
309 DCHECK(write_buffer);
310 DCHECK(!write_end_of_stream_);
311 if (!bidi_stream_ || write_end_of_stream_) {
312 DLOG(ERROR) << "Unexpected Flush Data in write_state " << write_state_;
313 // Invoke OnFailed unless it is already invoked.
314 if (write_state_ != ERR)
315 OnFailed(net::ERR_UNEXPECTED);
316 return;
317 }
318 pending_write_data_->AppendBuffer(write_buffer, buffer_size);
319 write_end_of_stream_ = end_of_stream;
320 if (!disable_auto_flush_)
321 FlushOnNetworkThread();
322 }
323
324 void BidirectionalStream::FlushOnNetworkThread() {
325 DCHECK(IsOnNetworkThread());
326 if (!bidi_stream_)
327 return;
328 // If there is no data to flush, may need to send headers.
329 if (pending_write_data_->Empty()) {
330 if (!request_headers_sent_) {
331 request_headers_sent_ = true;
332 bidi_stream_->SendRequestHeaders();
333 }
334 return;
335 }
336 // If request headers are not sent yet, they will be sent with the data.
337 if (!request_headers_sent_)
338 request_headers_sent_ = true;
339
340 // Move pending data to the flushing list.
341 pending_write_data_->MoveTo(flushing_write_data_.get());
342 DCHECK(pending_write_data_->Empty());
343 if (write_state_ != WRITING)
344 SendFlushingWriteData();
345 }
346
347 void BidirectionalStream::SendFlushingWriteData() {
348 DCHECK(bidi_stream_);
349 // If previous send is not done, or there is nothing to flush, then exit.
350 if (write_state_ == WRITING || flushing_write_data_->Empty())
351 return;
352 DCHECK(sending_write_data_->Empty());
353 write_state_ = WRITING;
354 flushing_write_data_->MoveTo(sending_write_data_.get());
355 bidi_stream_->SendvData(sending_write_data_->buffers(),
356 sending_write_data_->lengths(),
357 write_end_of_stream_ && pending_write_data_->Empty());
358 }
359
360 void BidirectionalStream::CancelOnNetworkThread() {
361 DCHECK(IsOnNetworkThread());
362 if (!bidi_stream_)
363 return;
364 read_state_ = write_state_ = CANCELED;
365 bidi_stream_.reset();
366 weak_factory_.InvalidateWeakPtrs();
367 delegate_->OnCanceled();
368 }
369
370 void BidirectionalStream::DestroyOnNetworkThread() {
371 DCHECK(IsOnNetworkThread());
372 delete this;
373 }
374
375 void BidirectionalStream::MaybeOnSucceded() {
376 DCHECK(IsOnNetworkThread());
377 if (!bidi_stream_)
378 return;
379 if (read_state_ == READING_DONE && write_state_ == WRITING_DONE) {
380 read_state_ = write_state_ = SUCCESS;
381 weak_factory_.InvalidateWeakPtrs();
382 // Delete underlying |bidi_stream_| asynchronously as it may still be used.
383 PostToNetworkThread(
384 FROM_HERE, base::Bind(&base::DeletePointer<net::BidirectionalStream>,
385 bidi_stream_.release()));
386 delegate_->OnSucceeded();
387 }
388 }
389
390 bool BidirectionalStream::IsOnNetworkThread() {
391 return request_context_getter_->GetNetworkTaskRunner()
392 ->BelongsToCurrentThread();
393 }
394
395 void BidirectionalStream::PostToNetworkThread(
396 const tracked_objects::Location& from_here,
397 const base::Closure& task) {
398 request_context_getter_->GetNetworkTaskRunner()->PostTask(from_here, task);
399 }
400
401 } // namespace cronet
OLDNEW
« no previous file with comments | « components/grpc_support/bidirectional_stream.h ('k') | components/grpc_support/bidirectional_stream_c.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698