Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(466)

Side by Side Diff: net/quic/chromium/bidirectional_stream_quic_impl.cc

Issue 2908243002: Remove QuicChromiumClientStream::Delegate in favor of async methods. (Closed)
Patch Set: No expect_trailers_ Created 3 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/quic/chromium/bidirectional_stream_quic_impl.h" 5 #include "net/quic/chromium/bidirectional_stream_quic_impl.h"
6 6
7 #include <utility> 7 #include <utility>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/location.h" 10 #include "base/location.h"
(...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after
95 return; 95 return;
96 } 96 }
97 97
98 base::ThreadTaskRunnerHandle::Get()->PostTask( 98 base::ThreadTaskRunnerHandle::Get()->PostTask(
99 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::OnStreamReady, 99 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::OnStreamReady,
100 weak_factory_.GetWeakPtr(), rv)); 100 weak_factory_.GetWeakPtr(), rv));
101 } 101 }
102 102
103 void BidirectionalStreamQuicImpl::SendRequestHeaders() { 103 void BidirectionalStreamQuicImpl::SendRequestHeaders() {
104 ScopedBoolSaver saver(&may_invoke_callbacks_, false); 104 ScopedBoolSaver saver(&may_invoke_callbacks_, false);
105 // If this fails, a task will have been posted to notify the delegate 105 int rv = WriteHeaders();
106 // asynchronously. 106 if (rv < 0) {
107 WriteHeaders(); 107 base::ThreadTaskRunnerHandle::Get()->PostTask(
108 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::NotifyError,
109 weak_factory_.GetWeakPtr(), rv));
110 }
108 } 111 }
109 112
110 bool BidirectionalStreamQuicImpl::WriteHeaders() { 113 int BidirectionalStreamQuicImpl::WriteHeaders() {
111 DCHECK(!has_sent_headers_); 114 DCHECK(!has_sent_headers_);
112 if (!stream_) {
113 LOG(ERROR)
114 << "Trying to send request headers after stream has been destroyed.";
115 base::ThreadTaskRunnerHandle::Get()->PostTask(
116 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::NotifyError,
117 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED));
118 return false;
119 }
120 115
121 SpdyHeaderBlock headers; 116 SpdyHeaderBlock headers;
122 HttpRequestInfo http_request_info; 117 HttpRequestInfo http_request_info;
123 http_request_info.url = request_info_->url; 118 http_request_info.url = request_info_->url;
124 http_request_info.method = request_info_->method; 119 http_request_info.method = request_info_->method;
125 http_request_info.extra_headers = request_info_->extra_headers; 120 http_request_info.extra_headers = request_info_->extra_headers;
126 121
127 CreateSpdyHeadersFromHttpRequest( 122 CreateSpdyHeadersFromHttpRequest(
128 http_request_info, http_request_info.extra_headers, true, &headers); 123 http_request_info, http_request_info.extra_headers, true, &headers);
129 // Sending the request might result in the stream being closed via OnClose 124 int rv = stream_->WriteHeaders(std::move(headers),
130 // which will post a task to notify the delegate asynchronously. 125 request_info_->end_stream_on_headers, nullptr);
131 // TODO(rch): Clean up this interface when OnClose and OnError are removed. 126 if (rv >= 0) {
132 size_t headers_bytes_sent = stream_->WriteHeaders( 127 headers_bytes_sent_ += rv;
133 std::move(headers), request_info_->end_stream_on_headers, nullptr); 128 has_sent_headers_ = true;
134 if (!stream_) 129 }
135 return false; 130 return rv;
136
137 headers_bytes_sent_ += headers_bytes_sent;
138 has_sent_headers_ = true;
139 return true;
140 } 131 }
141 132
142 int BidirectionalStreamQuicImpl::ReadData(IOBuffer* buffer, int buffer_len) { 133 int BidirectionalStreamQuicImpl::ReadData(IOBuffer* buffer, int buffer_len) {
143 ScopedBoolSaver saver(&may_invoke_callbacks_, false); 134 ScopedBoolSaver saver(&may_invoke_callbacks_, false);
144 DCHECK(buffer); 135 DCHECK(buffer);
145 DCHECK(buffer_len); 136 DCHECK(buffer_len);
146 137
147 if (!stream_) {
148 // If the stream is already closed, there is no body to read.
149 return response_status_;
150 }
151 int rv = stream_->ReadBody( 138 int rv = stream_->ReadBody(
152 buffer, buffer_len, 139 buffer, buffer_len,
153 base::Bind(&BidirectionalStreamQuicImpl::OnReadDataComplete, 140 base::Bind(&BidirectionalStreamQuicImpl::OnReadDataComplete,
154 weak_factory_.GetWeakPtr())); 141 weak_factory_.GetWeakPtr()));
155 if (rv == ERR_IO_PENDING) { 142 if (rv == ERR_IO_PENDING) {
156 read_buffer_ = buffer; 143 read_buffer_ = buffer;
157 read_buffer_len_ = buffer_len; 144 read_buffer_len_ = buffer_len;
158 return ERR_IO_PENDING; 145 return ERR_IO_PENDING;
159 } 146 }
160 147
161 if (rv < 0) 148 if (rv < 0)
162 return rv; 149 return rv;
163 150
164 if (stream_->IsDoneReading()) { 151 // If the write side is closed, OnFinRead() will call
165 // If the write side is closed, OnFinRead() will call 152 // BidirectionalStreamQuicImpl::OnClose().
166 // BidirectionalStreamQuicImpl::OnClose(). 153 if (stream_->IsDoneReading())
167 stream_->OnFinRead(); 154 stream_->OnFinRead();
168 } 155
169 return rv; 156 return rv;
170 } 157 }
171 158
172 void BidirectionalStreamQuicImpl::SendvData( 159 void BidirectionalStreamQuicImpl::SendvData(
173 const std::vector<scoped_refptr<IOBuffer>>& buffers, 160 const std::vector<scoped_refptr<IOBuffer>>& buffers,
174 const std::vector<int>& lengths, 161 const std::vector<int>& lengths,
175 bool end_stream) { 162 bool end_stream) {
176 ScopedBoolSaver saver(&may_invoke_callbacks_, false); 163 ScopedBoolSaver saver(&may_invoke_callbacks_, false);
177 DCHECK_EQ(buffers.size(), lengths.size()); 164 DCHECK_EQ(buffers.size(), lengths.size());
178 165
179 if (!stream_) { 166 if (!stream_->IsOpen()) {
180 LOG(ERROR) << "Trying to send data after stream has been destroyed."; 167 LOG(ERROR) << "Trying to send data after stream has been closed.";
181 base::ThreadTaskRunnerHandle::Get()->PostTask( 168 base::ThreadTaskRunnerHandle::Get()->PostTask(
182 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::NotifyError, 169 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::NotifyError,
183 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED)); 170 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED));
184 return; 171 return;
185 } 172 }
186 173
187 std::unique_ptr<QuicConnection::ScopedPacketBundler> bundler( 174 std::unique_ptr<QuicConnection::ScopedPacketBundler> bundler(
188 session_->CreatePacketBundler(QuicConnection::SEND_ACK_IF_PENDING)); 175 session_->CreatePacketBundler(QuicConnection::SEND_ACK_IF_PENDING));
189 if (!has_sent_headers_) { 176 if (!has_sent_headers_) {
190 DCHECK(!send_request_headers_automatically_); 177 DCHECK(!send_request_headers_automatically_);
191 // Sending the request might result in the stream being closed. 178 int rv = WriteHeaders();
192 if (!WriteHeaders()) 179 if (rv < 0) {
180 base::ThreadTaskRunnerHandle::Get()->PostTask(
181 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::NotifyError,
182 weak_factory_.GetWeakPtr(), rv));
193 return; 183 return;
184 }
194 } 185 }
195 186
196 int rv = stream_->WritevStreamData( 187 int rv = stream_->WritevStreamData(
197 buffers, lengths, end_stream, 188 buffers, lengths, end_stream,
198 base::Bind(&BidirectionalStreamQuicImpl::OnSendDataComplete, 189 base::Bind(&BidirectionalStreamQuicImpl::OnSendDataComplete,
199 weak_factory_.GetWeakPtr())); 190 weak_factory_.GetWeakPtr()));
200 191
201 DCHECK(rv == OK || rv == ERR_IO_PENDING); 192 if (rv != ERR_IO_PENDING) {
202 if (rv == OK) {
203 base::ThreadTaskRunnerHandle::Get()->PostTask( 193 base::ThreadTaskRunnerHandle::Get()->PostTask(
204 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::OnSendDataComplete, 194 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::OnSendDataComplete,
205 weak_factory_.GetWeakPtr(), OK)); 195 weak_factory_.GetWeakPtr(), rv));
206 } 196 }
207 } 197 }
208 198
209 NextProto BidirectionalStreamQuicImpl::GetProtocol() const { 199 NextProto BidirectionalStreamQuicImpl::GetProtocol() const {
210 return negotiated_protocol_; 200 return negotiated_protocol_;
211 } 201 }
212 202
213 int64_t BidirectionalStreamQuicImpl::GetTotalReceivedBytes() const { 203 int64_t BidirectionalStreamQuicImpl::GetTotalReceivedBytes() const {
214 if (stream_) 204 if (stream_)
215 return headers_bytes_received_ + stream_->stream_bytes_read(); 205 return headers_bytes_received_ + stream_->stream_bytes_read();
(...skipping 13 matching lines...) Expand all
229 is_first_stream = stream_->IsFirstStream(); 219 is_first_stream = stream_->IsFirstStream();
230 if (is_first_stream) { 220 if (is_first_stream) {
231 load_timing_info->socket_reused = false; 221 load_timing_info->socket_reused = false;
232 load_timing_info->connect_timing = connect_timing_; 222 load_timing_info->connect_timing = connect_timing_;
233 } else { 223 } else {
234 load_timing_info->socket_reused = true; 224 load_timing_info->socket_reused = true;
235 } 225 }
236 return true; 226 return true;
237 } 227 }
238 228
239 void BidirectionalStreamQuicImpl::OnClose() {
240 DCHECK(stream_);
241
242 if (stream_->connection_error() != QUIC_NO_ERROR ||
243 stream_->stream_error() != QUIC_STREAM_NO_ERROR) {
244 OnError(session_->IsCryptoHandshakeConfirmed() ? ERR_QUIC_PROTOCOL_ERROR
245 : ERR_QUIC_HANDSHAKE_FAILED);
246 return;
247 }
248
249 if (!stream_->fin_sent() || !stream_->fin_received()) {
250 // The connection must have been closed by the peer with QUIC_NO_ERROR,
251 // which is improper.
252 OnError(ERR_UNEXPECTED);
253 return;
254 }
255
256 // The connection was closed normally so there is no need to notify
257 // the delegate.
258 ResetStream();
259 }
260
261 void BidirectionalStreamQuicImpl::OnError(int error) {
262 // Avoid reentrancy by notifying the delegate asynchronously.
263 NotifyErrorImpl(error, /*notify_delegate_later*/ true);
264 }
265
266 void BidirectionalStreamQuicImpl::OnStreamReady(int rv) { 229 void BidirectionalStreamQuicImpl::OnStreamReady(int rv) {
267 DCHECK_NE(ERR_IO_PENDING, rv); 230 DCHECK_NE(ERR_IO_PENDING, rv);
268 DCHECK(rv == OK || !stream_); 231 DCHECK(!stream_);
269 if (rv != OK) { 232 if (rv != OK) {
270 NotifyError(rv); 233 NotifyError(rv);
271 return; 234 return;
272 } 235 }
273 236
274 stream_ = session_->ReleaseStream(this); 237 stream_ = session_->ReleaseStream();
275 238
276 base::ThreadTaskRunnerHandle::Get()->PostTask( 239 base::ThreadTaskRunnerHandle::Get()->PostTask(
277 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::ReadInitialHeaders, 240 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::ReadInitialHeaders,
278 weak_factory_.GetWeakPtr())); 241 weak_factory_.GetWeakPtr()));
279 242
280 NotifyStreamReady(); 243 NotifyStreamReady();
281 } 244 }
282 245
283 void BidirectionalStreamQuicImpl::OnSendDataComplete(int rv) { 246 void BidirectionalStreamQuicImpl::OnSendDataComplete(int rv) {
284 CHECK(may_invoke_callbacks_); 247 CHECK(may_invoke_callbacks_);
285 DCHECK(rv == OK || !stream_); 248 DCHECK_NE(ERR_IO_PENDING, rv);
286 if (rv != 0) { 249 if (rv < 0) {
287 NotifyError(rv); 250 NotifyError(rv);
288 return; 251 return;
289 } 252 }
290 253
291 if (delegate_) 254 if (delegate_)
292 delegate_->OnDataSent(); 255 delegate_->OnDataSent();
293 } 256 }
294 257
295 void BidirectionalStreamQuicImpl::OnReadInitialHeadersComplete(int rv) { 258 void BidirectionalStreamQuicImpl::OnReadInitialHeadersComplete(int rv) {
296 CHECK(may_invoke_callbacks_); 259 CHECK(may_invoke_callbacks_);
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after
339 } 302 }
340 303
341 headers_bytes_received_ += rv; 304 headers_bytes_received_ += rv;
342 305
343 if (delegate_) 306 if (delegate_)
344 delegate_->OnTrailersReceived(trailing_headers_); 307 delegate_->OnTrailersReceived(trailing_headers_);
345 } 308 }
346 309
347 void BidirectionalStreamQuicImpl::OnReadDataComplete(int rv) { 310 void BidirectionalStreamQuicImpl::OnReadDataComplete(int rv) {
348 CHECK(may_invoke_callbacks_); 311 CHECK(may_invoke_callbacks_);
349 DCHECK_GE(rv, 0); 312
350 read_buffer_ = nullptr; 313 read_buffer_ = nullptr;
351 read_buffer_len_ = 0; 314 read_buffer_len_ = 0;
352 315
353 if (stream_->IsDoneReading()) { 316 // If the write side is closed, OnFinRead() will call
354 // If the write side is closed, OnFinRead() will call 317 // BidirectionalStreamQuicImpl::OnClose().
355 // BidirectionalStreamQuicImpl::OnClose(). 318 if (stream_->IsDoneReading())
356 stream_->OnFinRead(); 319 stream_->OnFinRead();
357 }
358 320
359 if (delegate_) 321 if (!delegate_)
322 return;
323
324 if (rv < 0)
325 NotifyError(rv);
326 else
360 delegate_->OnDataRead(rv); 327 delegate_->OnDataRead(rv);
361 } 328 }
362 329
363 void BidirectionalStreamQuicImpl::NotifyError(int error) { 330 void BidirectionalStreamQuicImpl::NotifyError(int error) {
364 NotifyErrorImpl(error, /*notify_delegate_later*/ false); 331 NotifyErrorImpl(error, /*notify_delegate_later*/ false);
365 } 332 }
366 333
367 void BidirectionalStreamQuicImpl::NotifyErrorImpl(int error, 334 void BidirectionalStreamQuicImpl::NotifyErrorImpl(int error,
368 bool notify_delegate_later) { 335 bool notify_delegate_later) {
369 DCHECK_NE(OK, error); 336 DCHECK_NE(OK, error);
(...skipping 20 matching lines...) Expand all
390 void BidirectionalStreamQuicImpl::NotifyFailure( 357 void BidirectionalStreamQuicImpl::NotifyFailure(
391 BidirectionalStreamImpl::Delegate* delegate, 358 BidirectionalStreamImpl::Delegate* delegate,
392 int error) { 359 int error) {
393 CHECK(may_invoke_callbacks_); 360 CHECK(may_invoke_callbacks_);
394 delegate->OnFailed(error); 361 delegate->OnFailed(error);
395 // |this| might be destroyed at this point. 362 // |this| might be destroyed at this point.
396 } 363 }
397 364
398 void BidirectionalStreamQuicImpl::NotifyStreamReady() { 365 void BidirectionalStreamQuicImpl::NotifyStreamReady() {
399 CHECK(may_invoke_callbacks_); 366 CHECK(may_invoke_callbacks_);
400 // Sending the request might result in the stream being closed. 367 if (send_request_headers_automatically_) {
401 if (send_request_headers_automatically_ && !WriteHeaders()) 368 int rv = WriteHeaders();
402 return; 369 if (rv < 0) {
370 base::ThreadTaskRunnerHandle::Get()->PostTask(
371 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::NotifyError,
372 weak_factory_.GetWeakPtr(), rv));
373 return;
374 }
375 }
403 376
404 if (delegate_) 377 if (delegate_)
405 delegate_->OnStreamReady(has_sent_headers_); 378 delegate_->OnStreamReady(has_sent_headers_);
406 } 379 }
407 380
408 void BidirectionalStreamQuicImpl::ResetStream() { 381 void BidirectionalStreamQuicImpl::ResetStream() {
409 if (!stream_) 382 if (!stream_)
410 return; 383 return;
411 closed_stream_received_bytes_ = stream_->stream_bytes_read(); 384 closed_stream_received_bytes_ = stream_->stream_bytes_read();
412 closed_stream_sent_bytes_ = stream_->stream_bytes_written(); 385 closed_stream_sent_bytes_ = stream_->stream_bytes_written();
413 closed_is_first_stream_ = stream_->IsFirstStream(); 386 closed_is_first_stream_ = stream_->IsFirstStream();
414 stream_->ClearDelegate();
415 stream_ = nullptr;
416 } 387 }
417 388
418 } // namespace net 389 } // namespace net
OLDNEW
« no previous file with comments | « net/quic/chromium/bidirectional_stream_quic_impl.h ('k') | net/quic/chromium/bidirectional_stream_quic_impl_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698