Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(219)

Side by Side Diff: net/quic/chromium/bidirectional_stream_quic_impl.cc

Issue 2908243002: Remove QuicChromiumClientStream::Delegate in favor of async methods. (Closed)
Patch Set: Rebase Created 3 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/quic/chromium/bidirectional_stream_quic_impl.h" 5 #include "net/quic/chromium/bidirectional_stream_quic_impl.h"
6 6
7 #include <utility> 7 #include <utility>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/location.h" 10 #include "base/location.h"
(...skipping 10 matching lines...) Expand all
21 namespace net { 21 namespace net {
22 22
23 BidirectionalStreamQuicImpl::BidirectionalStreamQuicImpl( 23 BidirectionalStreamQuicImpl::BidirectionalStreamQuicImpl(
24 std::unique_ptr<QuicChromiumClientSession::Handle> session) 24 std::unique_ptr<QuicChromiumClientSession::Handle> session)
25 : session_(std::move(session)), 25 : session_(std::move(session)),
26 stream_(nullptr), 26 stream_(nullptr),
27 request_info_(nullptr), 27 request_info_(nullptr),
28 delegate_(nullptr), 28 delegate_(nullptr),
29 response_status_(OK), 29 response_status_(OK),
30 negotiated_protocol_(kProtoUnknown), 30 negotiated_protocol_(kProtoUnknown),
31 expect_trailers_(true),
31 read_buffer_len_(0), 32 read_buffer_len_(0),
32 headers_bytes_received_(0), 33 headers_bytes_received_(0),
33 headers_bytes_sent_(0), 34 headers_bytes_sent_(0),
34 closed_stream_received_bytes_(0), 35 closed_stream_received_bytes_(0),
35 closed_stream_sent_bytes_(0), 36 closed_stream_sent_bytes_(0),
36 closed_is_first_stream_(false), 37 closed_is_first_stream_(false),
37 has_sent_headers_(false), 38 has_sent_headers_(false),
38 send_request_headers_automatically_(true), 39 send_request_headers_automatically_(true),
39 weak_factory_(this) {} 40 weak_factory_(this) {}
40 41
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
79 80
80 OnStreamReady(rv); 81 OnStreamReady(rv);
81 } 82 }
82 83
83 void BidirectionalStreamQuicImpl::SendRequestHeaders() { 84 void BidirectionalStreamQuicImpl::SendRequestHeaders() {
84 WriteHeaders(); 85 WriteHeaders();
85 } 86 }
86 87
87 bool BidirectionalStreamQuicImpl::WriteHeaders() { 88 bool BidirectionalStreamQuicImpl::WriteHeaders() {
88 DCHECK(!has_sent_headers_); 89 DCHECK(!has_sent_headers_);
89 if (!stream_) { 90 if (!stream_->IsOpen()) {
90 LOG(ERROR) 91 LOG(ERROR)
91 << "Trying to send request headers after stream has been destroyed."; 92 << "Trying to send request headers after stream has been closed.";
92 base::ThreadTaskRunnerHandle::Get()->PostTask( 93 base::ThreadTaskRunnerHandle::Get()->PostTask(
93 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::NotifyError, 94 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::NotifyError,
94 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED)); 95 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED));
95 return false; 96 return false;
96 } 97 }
97 98
98 SpdyHeaderBlock headers; 99 SpdyHeaderBlock headers;
99 HttpRequestInfo http_request_info; 100 HttpRequestInfo http_request_info;
100 http_request_info.url = request_info_->url; 101 http_request_info.url = request_info_->url;
101 http_request_info.method = request_info_->method; 102 http_request_info.method = request_info_->method;
102 http_request_info.extra_headers = request_info_->extra_headers; 103 http_request_info.extra_headers = request_info_->extra_headers;
103 104
104 CreateSpdyHeadersFromHttpRequest( 105 CreateSpdyHeadersFromHttpRequest(
105 http_request_info, http_request_info.extra_headers, true, &headers); 106 http_request_info, http_request_info.extra_headers, true, &headers);
106 // Sending the request might result in |this| being deleted. 107 int rv = stream_->WriteHeaders(std::move(headers),
107 auto guard = weak_factory_.GetWeakPtr(); 108 request_info_->end_stream_on_headers, nullptr);
108 size_t headers_bytes_sent = stream_->WriteHeaders( 109 if (rv < 0) {
109 std::move(headers), request_info_->end_stream_on_headers, nullptr); 110 NotifyError(rv);
xunjieli 2017/06/01 15:30:33 When WriteHeaders() is called synchronously in Sen
Ryan Hamilton 2017/06/01 23:20:29 Done. (I didn't do that in the previous patch set
110 if (!guard.get())
111 return false; 111 return false;
112 112 }
113 headers_bytes_sent_ += headers_bytes_sent; 113 headers_bytes_sent_ += rv;
114 has_sent_headers_ = true; 114 has_sent_headers_ = true;
115 return true; 115 return true;
116 } 116 }
117 117
118 int BidirectionalStreamQuicImpl::ReadData(IOBuffer* buffer, int buffer_len) { 118 int BidirectionalStreamQuicImpl::ReadData(IOBuffer* buffer, int buffer_len) {
119 DCHECK(buffer); 119 DCHECK(buffer);
120 DCHECK(buffer_len); 120 DCHECK(buffer_len);
121 121
122 if (!stream_) {
123 // If the stream is already closed, there is no body to read.
124 return response_status_;
125 }
126 int rv = stream_->ReadBody( 122 int rv = stream_->ReadBody(
127 buffer, buffer_len, 123 buffer, buffer_len,
128 base::Bind(&BidirectionalStreamQuicImpl::OnReadDataComplete, 124 base::Bind(&BidirectionalStreamQuicImpl::OnReadDataComplete,
129 weak_factory_.GetWeakPtr())); 125 weak_factory_.GetWeakPtr()));
130 if (rv == ERR_IO_PENDING) { 126 if (rv == ERR_IO_PENDING) {
131 read_buffer_ = buffer; 127 read_buffer_ = buffer;
132 read_buffer_len_ = buffer_len; 128 read_buffer_len_ = buffer_len;
133 return ERR_IO_PENDING; 129 return ERR_IO_PENDING;
134 } 130 }
135 131
136 if (rv < 0) 132 if (rv < 0)
137 return rv; 133 return rv;
138 134
139 if (stream_->IsDoneReading()) { 135 if (stream_->IsDoneReading()) {
140 // If the write side is closed, OnFinRead() will call 136 // If the write side is closed, OnFinRead() will call
141 // BidirectionalStreamQuicImpl::OnClose(). 137 // BidirectionalStreamQuicImpl::OnClose().
138 expect_trailers_ = false;
142 stream_->OnFinRead(); 139 stream_->OnFinRead();
143 } 140 }
144 return rv; 141 return rv;
145 } 142 }
146 143
147 void BidirectionalStreamQuicImpl::SendData(const scoped_refptr<IOBuffer>& data, 144 void BidirectionalStreamQuicImpl::SendData(const scoped_refptr<IOBuffer>& data,
148 int length, 145 int length,
149 bool end_stream) { 146 bool end_stream) {
150 DCHECK(length > 0 || (length == 0 && end_stream)); 147 DCHECK(length > 0 || (length == 0 && end_stream));
151 if (!stream_) { 148 if (!stream_->IsOpen()) {
152 LOG(ERROR) << "Trying to send data after stream has been destroyed."; 149 LOG(ERROR) << "Trying to send data after stream has been closed.";
153 base::ThreadTaskRunnerHandle::Get()->PostTask( 150 base::ThreadTaskRunnerHandle::Get()->PostTask(
154 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::NotifyError, 151 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::NotifyError,
155 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED)); 152 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED));
156 return; 153 return;
157 } 154 }
158 155
159 std::unique_ptr<QuicConnection::ScopedPacketBundler> bundler; 156 std::unique_ptr<QuicConnection::ScopedPacketBundler> bundler;
160 if (!has_sent_headers_) { 157 if (!has_sent_headers_) {
161 DCHECK(!send_request_headers_automatically_); 158 DCHECK(!send_request_headers_automatically_);
162 // Creates a bundler only if there are headers to be sent along with the 159 // Creates a bundler only if there are headers to be sent along with the
163 // single data buffer. 160 // single data buffer.
164 bundler = 161 bundler =
165 session_->CreatePacketBundler(QuicConnection::SEND_ACK_IF_PENDING); 162 session_->CreatePacketBundler(QuicConnection::SEND_ACK_IF_PENDING);
166 // Sending the request might result in |this| being deleted. 163 // Sending the request might result in |this| being deleted.
167 if (!WriteHeaders()) 164 if (!WriteHeaders())
168 return; 165 return;
169 } 166 }
170 167
171 QuicStringPiece string_data(data->data(), length); 168 QuicStringPiece string_data(data->data(), length);
172 int rv = stream_->WriteStreamData( 169 int rv = stream_->WriteStreamData(
173 string_data, end_stream, 170 string_data, end_stream,
174 base::Bind(&BidirectionalStreamQuicImpl::OnSendDataComplete, 171 base::Bind(&BidirectionalStreamQuicImpl::OnSendDataComplete,
175 weak_factory_.GetWeakPtr())); 172 weak_factory_.GetWeakPtr()));
176 DCHECK(rv == OK || rv == ERR_IO_PENDING); 173 if (rv != ERR_IO_PENDING) {
177 if (rv == OK) {
178 base::ThreadTaskRunnerHandle::Get()->PostTask( 174 base::ThreadTaskRunnerHandle::Get()->PostTask(
179 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::OnSendDataComplete, 175 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::OnSendDataComplete,
180 weak_factory_.GetWeakPtr(), OK)); 176 weak_factory_.GetWeakPtr(), rv));
181 } 177 }
182 } 178 }
183 179
184 void BidirectionalStreamQuicImpl::SendvData( 180 void BidirectionalStreamQuicImpl::SendvData(
185 const std::vector<scoped_refptr<IOBuffer>>& buffers, 181 const std::vector<scoped_refptr<IOBuffer>>& buffers,
186 const std::vector<int>& lengths, 182 const std::vector<int>& lengths,
187 bool end_stream) { 183 bool end_stream) {
188 DCHECK_EQ(buffers.size(), lengths.size()); 184 DCHECK_EQ(buffers.size(), lengths.size());
189 185
190 if (!stream_) { 186 if (!stream_->IsOpen()) {
191 LOG(ERROR) << "Trying to send data after stream has been destroyed."; 187 LOG(ERROR) << "Trying to send data after stream has been closed.";
192 base::ThreadTaskRunnerHandle::Get()->PostTask( 188 base::ThreadTaskRunnerHandle::Get()->PostTask(
193 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::NotifyError, 189 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::NotifyError,
194 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED)); 190 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED));
195 return; 191 return;
196 } 192 }
197 193
198 std::unique_ptr<QuicConnection::ScopedPacketBundler> bundler( 194 std::unique_ptr<QuicConnection::ScopedPacketBundler> bundler(
199 session_->CreatePacketBundler(QuicConnection::SEND_ACK_IF_PENDING)); 195 session_->CreatePacketBundler(QuicConnection::SEND_ACK_IF_PENDING));
200 if (!has_sent_headers_) { 196 if (!has_sent_headers_) {
201 DCHECK(!send_request_headers_automatically_); 197 DCHECK(!send_request_headers_automatically_);
202 // Sending the request might result in |this| being deleted. 198 // Sending the request might result in |this| being deleted.
203 if (!WriteHeaders()) 199 if (!WriteHeaders())
204 return; 200 return;
205 } 201 }
206 202
207 int rv = stream_->WritevStreamData( 203 int rv = stream_->WritevStreamData(
208 buffers, lengths, end_stream, 204 buffers, lengths, end_stream,
209 base::Bind(&BidirectionalStreamQuicImpl::OnSendDataComplete, 205 base::Bind(&BidirectionalStreamQuicImpl::OnSendDataComplete,
210 weak_factory_.GetWeakPtr())); 206 weak_factory_.GetWeakPtr()));
211 207
212 DCHECK(rv == OK || rv == ERR_IO_PENDING); 208 if (rv != ERR_IO_PENDING) {
213 if (rv == OK) {
214 base::ThreadTaskRunnerHandle::Get()->PostTask( 209 base::ThreadTaskRunnerHandle::Get()->PostTask(
215 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::OnSendDataComplete, 210 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::OnSendDataComplete,
216 weak_factory_.GetWeakPtr(), OK)); 211 weak_factory_.GetWeakPtr(), rv));
217 } 212 }
218 } 213 }
219 214
220 NextProto BidirectionalStreamQuicImpl::GetProtocol() const { 215 NextProto BidirectionalStreamQuicImpl::GetProtocol() const {
221 return negotiated_protocol_; 216 return negotiated_protocol_;
222 } 217 }
223 218
224 int64_t BidirectionalStreamQuicImpl::GetTotalReceivedBytes() const { 219 int64_t BidirectionalStreamQuicImpl::GetTotalReceivedBytes() const {
225 if (stream_) 220 if (stream_)
226 return headers_bytes_received_ + stream_->stream_bytes_read(); 221 return headers_bytes_received_ + stream_->stream_bytes_read();
(...skipping 13 matching lines...) Expand all
240 is_first_stream = stream_->IsFirstStream(); 235 is_first_stream = stream_->IsFirstStream();
241 if (is_first_stream) { 236 if (is_first_stream) {
242 load_timing_info->socket_reused = false; 237 load_timing_info->socket_reused = false;
243 load_timing_info->connect_timing = connect_timing_; 238 load_timing_info->connect_timing = connect_timing_;
244 } else { 239 } else {
245 load_timing_info->socket_reused = true; 240 load_timing_info->socket_reused = true;
246 } 241 }
247 return true; 242 return true;
248 } 243 }
249 244
250 void BidirectionalStreamQuicImpl::OnClose() {
251 DCHECK(stream_);
252
253 if (stream_->connection_error() != QUIC_NO_ERROR ||
254 stream_->stream_error() != QUIC_STREAM_NO_ERROR) {
255 NotifyError(session_->IsCryptoHandshakeConfirmed()
256 ? ERR_QUIC_PROTOCOL_ERROR
257 : ERR_QUIC_HANDSHAKE_FAILED);
258 return;
259 }
260
261 if (!stream_->fin_sent() || !stream_->fin_received()) {
262 // The connection must have been closed by the peer with QUIC_NO_ERROR,
263 // which is improper.
264 NotifyError(ERR_UNEXPECTED);
265 return;
266 }
267
268 // The connection was closed normally so there is no need to notify
269 // the delegate.
270 ResetStream();
271 }
272
273 void BidirectionalStreamQuicImpl::OnError(int error) {
274 NotifyError(error);
275 }
276
277 void BidirectionalStreamQuicImpl::OnStreamReady(int rv) { 245 void BidirectionalStreamQuicImpl::OnStreamReady(int rv) {
278 DCHECK_NE(ERR_IO_PENDING, rv); 246 DCHECK_NE(ERR_IO_PENDING, rv);
279 DCHECK(rv == OK || !stream_); 247 DCHECK(!stream_);
280 if (rv != OK) { 248 if (rv != OK) {
281 NotifyError(rv); 249 NotifyError(rv);
282 return; 250 return;
283 } 251 }
284 252
285 stream_ = session_->ReleaseStream(this); 253 stream_ = session_->ReleaseStream();
286 254
287 base::ThreadTaskRunnerHandle::Get()->PostTask( 255 base::ThreadTaskRunnerHandle::Get()->PostTask(
288 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::ReadInitialHeaders, 256 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::ReadInitialHeaders,
289 weak_factory_.GetWeakPtr())); 257 weak_factory_.GetWeakPtr()));
290 258
291 NotifyStreamReady(); 259 NotifyStreamReady();
292 } 260 }
293 261
294 void BidirectionalStreamQuicImpl::OnSendDataComplete(int rv) { 262 void BidirectionalStreamQuicImpl::OnSendDataComplete(int rv) {
295 DCHECK(rv == OK || !stream_); 263 DCHECK_NE(ERR_IO_PENDING, rv);
296 if (rv != 0) { 264 if (rv < OK) {
297 NotifyError(rv); 265 NotifyError(rv);
298 return; 266 return;
299 } 267 }
300 268
301 if (delegate_) 269 if (delegate_)
302 delegate_->OnDataSent(); 270 delegate_->OnDataSent();
303 } 271 }
304 272
305 void BidirectionalStreamQuicImpl::OnReadInitialHeadersComplete(int rv) { 273 void BidirectionalStreamQuicImpl::OnReadInitialHeadersComplete(int rv) {
306 DCHECK_NE(ERR_IO_PENDING, rv); 274 DCHECK_NE(ERR_IO_PENDING, rv);
(...skipping 28 matching lines...) Expand all
335 base::Bind(&BidirectionalStreamQuicImpl::OnReadTrailingHeadersComplete, 303 base::Bind(&BidirectionalStreamQuicImpl::OnReadTrailingHeadersComplete,
336 weak_factory_.GetWeakPtr())); 304 weak_factory_.GetWeakPtr()));
337 305
338 if (rv != ERR_IO_PENDING) 306 if (rv != ERR_IO_PENDING)
339 OnReadTrailingHeadersComplete(rv); 307 OnReadTrailingHeadersComplete(rv);
340 } 308 }
341 309
342 void BidirectionalStreamQuicImpl::OnReadTrailingHeadersComplete(int rv) { 310 void BidirectionalStreamQuicImpl::OnReadTrailingHeadersComplete(int rv) {
343 DCHECK_NE(ERR_IO_PENDING, rv); 311 DCHECK_NE(ERR_IO_PENDING, rv);
344 if (rv < 0) { 312 if (rv < 0) {
345 NotifyError(rv); 313 if (expect_trailers_)
xunjieli 2017/06/01 15:30:32 Is this needed? The second time NotifyError() is c
Ryan Hamilton 2017/06/01 23:20:29 Yes, I think it is. The usecase is that when a ser
xunjieli 2017/06/02 12:43:41 Thanks! If body is received with a Fin, OnFinRead(
Ryan Hamilton 2017/06/02 14:06:26 Oh, right! I forgot that I did that :) (I was stru
314 NotifyError(rv);
346 return; 315 return;
347 } 316 }
348 317
349 headers_bytes_received_ += rv; 318 headers_bytes_received_ += rv;
350 319
351 if (delegate_) 320 if (delegate_)
352 delegate_->OnTrailersReceived(trailing_headers_); 321 delegate_->OnTrailersReceived(trailing_headers_);
353 } 322 }
354 323
355 void BidirectionalStreamQuicImpl::OnReadDataComplete(int rv) { 324 void BidirectionalStreamQuicImpl::OnReadDataComplete(int rv) {
356 DCHECK_GE(rv, 0);
357 read_buffer_ = nullptr; 325 read_buffer_ = nullptr;
358 read_buffer_len_ = 0; 326 read_buffer_len_ = 0;
359 327
360 if (stream_->IsDoneReading()) { 328 if (stream_->IsDoneReading()) {
361 // If the write side is closed, OnFinRead() will call 329 // If the write side is closed, OnFinRead() will call
362 // BidirectionalStreamQuicImpl::OnClose(). 330 // BidirectionalStreamQuicImpl::OnClose().
363 stream_->OnFinRead(); 331 stream_->OnFinRead();
364 } 332 }
365 333
366 if (delegate_) 334 if (!delegate_)
335 return;
336
337 if (rv < 0)
338 NotifyError(rv);
339 else
367 delegate_->OnDataRead(rv); 340 delegate_->OnDataRead(rv);
368 } 341 }
369 342
370 void BidirectionalStreamQuicImpl::NotifyError(int error) { 343 void BidirectionalStreamQuicImpl::NotifyError(int error) {
371 DCHECK_NE(OK, error); 344 DCHECK_NE(OK, error);
372 DCHECK_NE(ERR_IO_PENDING, error); 345 DCHECK_NE(ERR_IO_PENDING, error);
373 346
374 ResetStream(); 347 ResetStream();
375 if (delegate_) { 348 if (delegate_) {
376 response_status_ = error; 349 response_status_ = error;
(...skipping 14 matching lines...) Expand all
391 if (delegate_) 364 if (delegate_)
392 delegate_->OnStreamReady(has_sent_headers_); 365 delegate_->OnStreamReady(has_sent_headers_);
393 } 366 }
394 367
395 void BidirectionalStreamQuicImpl::ResetStream() { 368 void BidirectionalStreamQuicImpl::ResetStream() {
396 if (!stream_) 369 if (!stream_)
397 return; 370 return;
398 closed_stream_received_bytes_ = stream_->stream_bytes_read(); 371 closed_stream_received_bytes_ = stream_->stream_bytes_read();
399 closed_stream_sent_bytes_ = stream_->stream_bytes_written(); 372 closed_stream_sent_bytes_ = stream_->stream_bytes_written();
400 closed_is_first_stream_ = stream_->IsFirstStream(); 373 closed_is_first_stream_ = stream_->IsFirstStream();
401 stream_->ClearDelegate();
402 stream_ = nullptr;
403 } 374 }
404 375
405 } // namespace net 376 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698