Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(81)

Side by Side Diff: components/cronet/ios/cronet_bidirectional_stream.cc

Issue 2273403003: Moving gRPC support interfaces out of cronet and into a new component. (Closed)
Patch Set: Address comments Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "components/cronet/ios/cronet_bidirectional_stream.h"
6
7 #include <memory>
8 #include <string>
9 #include <vector>
10
11 #include "base/bind.h"
12 #include "base/location.h"
13 #include "base/logging.h"
14 #include "base/memory/ref_counted.h"
15 #include "base/strings/string_number_conversions.h"
16 #include "components/cronet/ios/cronet_environment.h"
17 #include "net/base/io_buffer.h"
18 #include "net/base/net_errors.h"
19 #include "net/base/request_priority.h"
20 #include "net/http/bidirectional_stream.h"
21 #include "net/http/bidirectional_stream_request_info.h"
22 #include "net/http/http_network_session.h"
23 #include "net/http/http_response_headers.h"
24 #include "net/http/http_status_code.h"
25 #include "net/http/http_transaction_factory.h"
26 #include "net/http/http_util.h"
27 #include "net/spdy/spdy_header_block.h"
28 #include "net/ssl/ssl_info.h"
29 #include "net/url_request/http_user_agent_settings.h"
30 #include "net/url_request/url_request_context.h"
31 #include "url/gurl.h"
32
33 namespace cronet {
34
35 CronetBidirectionalStream::WriteBuffers::WriteBuffers() {}
36
37 CronetBidirectionalStream::WriteBuffers::~WriteBuffers() {}
38
39 void CronetBidirectionalStream::WriteBuffers::Clear() {
40 write_buffer_list.clear();
41 write_buffer_len_list.clear();
42 }
43
44 void CronetBidirectionalStream::WriteBuffers::AppendBuffer(
45 const scoped_refptr<net::IOBuffer>& buffer,
46 int buffer_size) {
47 write_buffer_list.push_back(buffer);
48 write_buffer_len_list.push_back(buffer_size);
49 }
50
51 void CronetBidirectionalStream::WriteBuffers::MoveTo(WriteBuffers* target) {
52 std::move(write_buffer_list.begin(), write_buffer_list.end(),
53 std::back_inserter(target->write_buffer_list));
54 std::move(write_buffer_len_list.begin(), write_buffer_len_list.end(),
55 std::back_inserter(target->write_buffer_len_list));
56 Clear();
57 }
58
59 bool CronetBidirectionalStream::WriteBuffers::Empty() const {
60 return write_buffer_list.empty();
61 }
62
63 CronetBidirectionalStream::CronetBidirectionalStream(
64 CronetEnvironment* environment,
65 Delegate* delegate)
66 : read_state_(NOT_STARTED),
67 write_state_(NOT_STARTED),
68 write_end_of_stream_(false),
69 request_headers_sent_(false),
70 disable_auto_flush_(false),
71 delay_headers_until_flush_(false),
72 environment_(environment),
73 pending_write_data_(new WriteBuffers()),
74 flushing_write_data_(new WriteBuffers()),
75 sending_write_data_(new WriteBuffers()),
76 delegate_(delegate),
77 weak_factory_(this) {
78 weak_this_ = weak_factory_.GetWeakPtr();
79 }
80
81 CronetBidirectionalStream::~CronetBidirectionalStream() {
82 DCHECK(environment_->IsOnNetworkThread());
83 }
84
85 int CronetBidirectionalStream::Start(const char* url,
86 int priority,
87 const char* method,
88 const net::HttpRequestHeaders& headers,
89 bool end_of_stream) {
90 // Prepare request info here to be able to return the error.
91 std::unique_ptr<net::BidirectionalStreamRequestInfo> request_info(
92 new net::BidirectionalStreamRequestInfo());
93 request_info->url = GURL(url);
94 request_info->priority = static_cast<net::RequestPriority>(priority);
95 // Http method is a token, just as header name.
96 request_info->method = method;
97 if (!net::HttpUtil::IsValidHeaderName(request_info->method))
98 return -1;
99 request_info->extra_headers.CopyFrom(headers);
100 request_info->end_stream_on_headers = end_of_stream;
101 write_end_of_stream_ = end_of_stream;
102 DCHECK(environment_);
103 environment_->PostToNetworkThread(
104 FROM_HERE, base::Bind(&CronetBidirectionalStream::StartOnNetworkThread,
105 weak_this_, base::Passed(&request_info)));
106 return 0;
107 }
108
109 bool CronetBidirectionalStream::ReadData(char* buffer, int capacity) {
110 if (!buffer)
111 return false;
112 scoped_refptr<net::WrappedIOBuffer> read_buffer(
113 new net::WrappedIOBuffer(buffer));
114
115 environment_->PostToNetworkThread(
116 FROM_HERE, base::Bind(&CronetBidirectionalStream::ReadDataOnNetworkThread,
117 weak_this_, read_buffer, capacity));
118 return true;
119 }
120
121 bool CronetBidirectionalStream::WriteData(const char* buffer,
122 int count,
123 bool end_of_stream) {
124 if (!buffer)
125 return false;
126
127 scoped_refptr<net::WrappedIOBuffer> write_buffer(
128 new net::WrappedIOBuffer(buffer));
129
130 environment_->PostToNetworkThread(
131 FROM_HERE,
132 base::Bind(&CronetBidirectionalStream::WriteDataOnNetworkThread,
133 weak_this_, write_buffer, count, end_of_stream));
134 return true;
135 }
136
137 void CronetBidirectionalStream::Flush() {
138 environment_->PostToNetworkThread(
139 FROM_HERE,
140 base::Bind(&CronetBidirectionalStream::FlushOnNetworkThread, weak_this_));
141 }
142
143 void CronetBidirectionalStream::Cancel() {
144 environment_->PostToNetworkThread(
145 FROM_HERE, base::Bind(&CronetBidirectionalStream::CancelOnNetworkThread,
146 weak_this_));
147 }
148
149 void CronetBidirectionalStream::Destroy() {
150 // Destroy could be called from any thread, including network thread (if
151 // posting task to executor throws an exception), but is posted, so |this|
152 // is valid until calling task is complete.
153 environment_->PostToNetworkThread(
154 FROM_HERE, base::Bind(&CronetBidirectionalStream::DestroyOnNetworkThread,
155 base::Unretained(this)));
156 }
157
158 void CronetBidirectionalStream::OnStreamReady(bool request_headers_sent) {
159 DCHECK(environment_->IsOnNetworkThread());
160 DCHECK_EQ(STARTED, write_state_);
161 if (!bidi_stream_)
162 return;
163 request_headers_sent_ = request_headers_sent;
164 write_state_ = WAITING_FOR_FLUSH;
165 if (write_end_of_stream_) {
166 if (!request_headers_sent) {
167 // If there is no data to write, then just send headers explicitly.
168 bidi_stream_->SendRequestHeaders();
169 request_headers_sent_ = true;
170 }
171 write_state_ = WRITING_DONE;
172 }
173 delegate_->OnStreamReady();
174 }
175
176 void CronetBidirectionalStream::OnHeadersReceived(
177 const net::SpdyHeaderBlock& response_headers) {
178 DCHECK(environment_->IsOnNetworkThread());
179 DCHECK_EQ(STARTED, read_state_);
180 if (!bidi_stream_)
181 return;
182 read_state_ = WAITING_FOR_READ;
183 // Get http status code from response headers.
184 int http_status_code = 0;
185 const auto http_status_header = response_headers.find(":status");
186 if (http_status_header != response_headers.end())
187 base::StringToInt(http_status_header->second, &http_status_code);
188 const char* protocol = "unknown";
189 switch (bidi_stream_->GetProtocol()) {
190 case net::kProtoHTTP2:
191 protocol = "h2";
192 break;
193 case net::kProtoQUIC:
194 protocol = "quic/1+spdy/3";
195 break;
196 default:
197 break;
198 }
199 delegate_->OnHeadersReceived(response_headers, protocol);
200 }
201
202 void CronetBidirectionalStream::OnDataRead(int bytes_read) {
203 DCHECK(environment_->IsOnNetworkThread());
204 DCHECK_EQ(READING, read_state_);
205 if (!bidi_stream_)
206 return;
207 read_state_ = WAITING_FOR_READ;
208 delegate_->OnDataRead(read_buffer_->data(), bytes_read);
209
210 // Free the read buffer.
211 read_buffer_ = nullptr;
212 if (bytes_read == 0)
213 read_state_ = READING_DONE;
214 MaybeOnSucceded();
215 }
216
217 void CronetBidirectionalStream::OnDataSent() {
218 DCHECK(environment_->IsOnNetworkThread());
219 if (!bidi_stream_)
220 return;
221 DCHECK_EQ(WRITING, write_state_);
222 write_state_ = WAITING_FOR_FLUSH;
223 for (const scoped_refptr<net::IOBuffer>& buffer :
224 sending_write_data_->buffers()) {
225 delegate_->OnDataSent(buffer->data());
226 }
227 sending_write_data_->Clear();
228 // Send data flushed while other data was sending.
229 if (!flushing_write_data_->Empty()) {
230 SendFlushingWriteData();
231 return;
232 }
233 if (write_end_of_stream_ && pending_write_data_->Empty()) {
234 write_state_ = WRITING_DONE;
235 MaybeOnSucceded();
236 }
237 }
238
239 void CronetBidirectionalStream::OnTrailersReceived(
240 const net::SpdyHeaderBlock& response_trailers) {
241 DCHECK(environment_->IsOnNetworkThread());
242 if (!bidi_stream_)
243 return;
244 delegate_->OnTrailersReceived(response_trailers);
245 }
246
247 void CronetBidirectionalStream::OnFailed(int error) {
248 DCHECK(environment_->IsOnNetworkThread());
249 if (!bidi_stream_ && read_state_ != NOT_STARTED)
250 return;
251 read_state_ = write_state_ = ERROR;
252 weak_factory_.InvalidateWeakPtrs();
253 // Delete underlying |bidi_stream_| asynchronously as it may still be used.
254 environment_->PostToNetworkThread(
255 FROM_HERE, base::Bind(&base::DeletePointer<net::BidirectionalStream>,
256 bidi_stream_.release()));
257 delegate_->OnFailed(error);
258 }
259
260 void CronetBidirectionalStream::StartOnNetworkThread(
261 std::unique_ptr<net::BidirectionalStreamRequestInfo> request_info) {
262 DCHECK(environment_->IsOnNetworkThread());
263 DCHECK(!bidi_stream_);
264 DCHECK(environment_->GetURLRequestContext());
265 request_info->extra_headers.SetHeaderIfMissing(
266 net::HttpRequestHeaders::kUserAgent, environment_->user_agent());
267 bidi_stream_.reset(new net::BidirectionalStream(
268 std::move(request_info), environment_->GetURLRequestContext()
269 ->http_transaction_factory()
270 ->GetSession(),
271 !delay_headers_until_flush_, this));
272 DCHECK(read_state_ == NOT_STARTED && write_state_ == NOT_STARTED);
273 read_state_ = write_state_ = STARTED;
274 }
275
276 void CronetBidirectionalStream::ReadDataOnNetworkThread(
277 scoped_refptr<net::WrappedIOBuffer> read_buffer,
278 int buffer_size) {
279 DCHECK(environment_->IsOnNetworkThread());
280 DCHECK(read_buffer);
281 DCHECK(!read_buffer_);
282 if (read_state_ != WAITING_FOR_READ) {
283 DLOG(ERROR) << "Unexpected Read Data in read_state " << read_state_;
284 // Invoke OnFailed unless it is already invoked.
285 if (read_state_ != ERROR)
286 OnFailed(net::ERR_UNEXPECTED);
287 return;
288 }
289 read_state_ = READING;
290 read_buffer_ = read_buffer;
291
292 int bytes_read = bidi_stream_->ReadData(read_buffer_.get(), buffer_size);
293 // If IO is pending, wait for the BidirectionalStream to call OnDataRead.
294 if (bytes_read == net::ERR_IO_PENDING)
295 return;
296
297 if (bytes_read < 0) {
298 OnFailed(bytes_read);
299 return;
300 }
301 OnDataRead(bytes_read);
302 }
303
304 void CronetBidirectionalStream::WriteDataOnNetworkThread(
305 scoped_refptr<net::WrappedIOBuffer> write_buffer,
306 int buffer_size,
307 bool end_of_stream) {
308 DCHECK(environment_->IsOnNetworkThread());
309 DCHECK(write_buffer);
310 DCHECK(!write_end_of_stream_);
311 if (!bidi_stream_ || write_end_of_stream_) {
312 DLOG(ERROR) << "Unexpected Flush Data in write_state " << write_state_;
313 // Invoke OnFailed unless it is already invoked.
314 if (write_state_ != ERROR)
315 OnFailed(net::ERR_UNEXPECTED);
316 return;
317 }
318 pending_write_data_->AppendBuffer(write_buffer, buffer_size);
319 write_end_of_stream_ = end_of_stream;
320 if (!disable_auto_flush_)
321 FlushOnNetworkThread();
322 }
323
324 void CronetBidirectionalStream::FlushOnNetworkThread() {
325 DCHECK(environment_->IsOnNetworkThread());
326 if (!bidi_stream_)
327 return;
328 // If there is no data to flush, may need to send headers.
329 if (pending_write_data_->Empty()) {
330 if (!request_headers_sent_) {
331 request_headers_sent_ = true;
332 bidi_stream_->SendRequestHeaders();
333 }
334 return;
335 }
336 // If request headers are not sent yet, they will be sent with the data.
337 if (!request_headers_sent_)
338 request_headers_sent_ = true;
339
340 // Move pending data to the flushing list.
341 pending_write_data_->MoveTo(flushing_write_data_.get());
342 DCHECK(pending_write_data_->Empty());
343 if (write_state_ != WRITING)
344 SendFlushingWriteData();
345 }
346
347 void CronetBidirectionalStream::SendFlushingWriteData() {
348 DCHECK(bidi_stream_);
349 // If previous send is not done, or there is nothing to flush, then exit.
350 if (write_state_ == WRITING || flushing_write_data_->Empty())
351 return;
352 DCHECK(sending_write_data_->Empty());
353 write_state_ = WRITING;
354 flushing_write_data_->MoveTo(sending_write_data_.get());
355 bidi_stream_->SendvData(sending_write_data_->buffers(),
356 sending_write_data_->lengths(),
357 write_end_of_stream_ && pending_write_data_->Empty());
358 }
359
360 void CronetBidirectionalStream::CancelOnNetworkThread() {
361 DCHECK(environment_->IsOnNetworkThread());
362 if (!bidi_stream_)
363 return;
364 read_state_ = write_state_ = CANCELED;
365 bidi_stream_.reset();
366 weak_factory_.InvalidateWeakPtrs();
367 delegate_->OnCanceled();
368 }
369
370 void CronetBidirectionalStream::DestroyOnNetworkThread() {
371 DCHECK(environment_->IsOnNetworkThread());
372 delete this;
373 }
374
375 void CronetBidirectionalStream::MaybeOnSucceded() {
376 DCHECK(environment_->IsOnNetworkThread());
377 if (!bidi_stream_)
378 return;
379 if (read_state_ == READING_DONE && write_state_ == WRITING_DONE) {
380 read_state_ = write_state_ = SUCCESS;
381 weak_factory_.InvalidateWeakPtrs();
382 // Delete underlying |bidi_stream_| asynchronously as it may still be used.
383 environment_->PostToNetworkThread(
384 FROM_HERE, base::Bind(&base::DeletePointer<net::BidirectionalStream>,
385 bidi_stream_.release()));
386 delegate_->OnSucceeded();
387 }
388 }
389
390 } // namespace cronet
OLDNEW
« no previous file with comments | « components/cronet/ios/cronet_bidirectional_stream.h ('k') | components/cronet/ios/cronet_c_for_grpc.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698