Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(63)

Side by Side Diff: net/quic/chromium/bidirectional_stream_quic_impl.cc

Issue 2877063002: Add an async ReadBody method to QuicChromiumClientStream::Handle (Closed)
Patch Set: Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/quic/chromium/bidirectional_stream_quic_impl.h" 5 #include "net/quic/chromium/bidirectional_stream_quic_impl.h"
6 6
7 #include <utility> 7 #include <utility>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/location.h" 10 #include "base/location.h"
(...skipping 94 matching lines...) Expand 10 before | Expand all | Expand 10 after
105 } 105 }
106 106
107 int BidirectionalStreamQuicImpl::ReadData(IOBuffer* buffer, int buffer_len) { 107 int BidirectionalStreamQuicImpl::ReadData(IOBuffer* buffer, int buffer_len) {
108 DCHECK(buffer); 108 DCHECK(buffer);
109 DCHECK(buffer_len); 109 DCHECK(buffer_len);
110 110
111 if (!stream_) { 111 if (!stream_) {
112 // If the stream is already closed, there is no body to read. 112 // If the stream is already closed, there is no body to read.
113 return response_status_; 113 return response_status_;
114 } 114 }
115 int rv = stream_->Read(buffer, buffer_len); 115 int rv = stream_->ReadBody(
116 if (rv != ERR_IO_PENDING) { 116 buffer, buffer_len,
117 if (stream_->IsDoneReading()) { 117 base::Bind(&BidirectionalStreamQuicImpl::OnReadDataComplete,
118 // If the write side is closed, OnFinRead() will call 118 weak_factory_.GetWeakPtr()));
119 // BidirectionalStreamQuicImpl::OnClose(). 119 if (rv == ERR_IO_PENDING) {
120 stream_->OnFinRead(); 120 // Read will complete asynchronously and Delegate::OnReadCompleted will be
121 } 121 // called upon completion.
xunjieli 2017/05/12 15:22:22 nit: Could you remove this old comment (line 120 t
Ryan Hamilton 2017/05/12 17:26:24 Great! Done.
122 read_buffer_ = buffer;
123 read_buffer_len_ = buffer_len;
124 return ERR_IO_PENDING;
125 }
126
127 if (rv < 0)
122 return rv; 128 return rv;
129
130 if (stream_->IsDoneReading()) {
131 // If the write side is closed, OnFinRead() will call
132 // BidirectionalStreamQuicImpl::OnClose().
133 stream_->OnFinRead();
123 } 134 }
124 // Read will complete asynchronously and Delegate::OnReadCompleted will be 135 return rv;
125 // called upon completion.
126 read_buffer_ = buffer;
127 read_buffer_len_ = buffer_len;
128 return ERR_IO_PENDING;
129 } 136 }
130 137
131 void BidirectionalStreamQuicImpl::SendData(const scoped_refptr<IOBuffer>& data, 138 void BidirectionalStreamQuicImpl::SendData(const scoped_refptr<IOBuffer>& data,
132 int length, 139 int length,
133 bool end_stream) { 140 bool end_stream) {
134 DCHECK(length > 0 || (length == 0 && end_stream)); 141 DCHECK(length > 0 || (length == 0 && end_stream));
135 if (!stream_) { 142 if (!stream_) {
136 LOG(ERROR) << "Trying to send data after stream has been destroyed."; 143 LOG(ERROR) << "Trying to send data after stream has been destroyed.";
137 base::ThreadTaskRunnerHandle::Get()->PostTask( 144 base::ThreadTaskRunnerHandle::Get()->PostTask(
138 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::NotifyError, 145 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::NotifyError,
(...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after
229 236
230 void BidirectionalStreamQuicImpl::OnTrailingHeadersAvailable( 237 void BidirectionalStreamQuicImpl::OnTrailingHeadersAvailable(
231 const SpdyHeaderBlock& headers, 238 const SpdyHeaderBlock& headers,
232 size_t frame_len) { 239 size_t frame_len) {
233 headers_bytes_received_ += frame_len; 240 headers_bytes_received_ += frame_len;
234 if (delegate_) 241 if (delegate_)
235 delegate_->OnTrailersReceived(headers); 242 delegate_->OnTrailersReceived(headers);
236 // |this| can be destroyed after this point. 243 // |this| can be destroyed after this point.
237 } 244 }
238 245
239 void BidirectionalStreamQuicImpl::OnDataAvailable() {
240 // Return early if ReadData has not been called.
241 if (!read_buffer_)
242 return;
243
244 int rv = ReadData(read_buffer_.get(), read_buffer_len_);
245 if (rv == ERR_IO_PENDING) {
246 // Spurrious notification. Wait for the next one.
247 return;
248 }
249 read_buffer_ = nullptr;
250 read_buffer_len_ = 0;
251 if (delegate_)
252 delegate_->OnDataRead(rv);
253 }
254
255 void BidirectionalStreamQuicImpl::OnClose() { 246 void BidirectionalStreamQuicImpl::OnClose() {
256 DCHECK(stream_); 247 DCHECK(stream_);
257 248
258 if (stream_->connection_error() != QUIC_NO_ERROR || 249 if (stream_->connection_error() != QUIC_NO_ERROR ||
259 stream_->stream_error() != QUIC_STREAM_NO_ERROR) { 250 stream_->stream_error() != QUIC_STREAM_NO_ERROR) {
260 NotifyError(session_->IsCryptoHandshakeConfirmed() 251 NotifyError(session_->IsCryptoHandshakeConfirmed()
261 ? ERR_QUIC_PROTOCOL_ERROR 252 ? ERR_QUIC_PROTOCOL_ERROR
262 : ERR_QUIC_HANDSHAKE_FAILED); 253 : ERR_QUIC_HANDSHAKE_FAILED);
263 return; 254 return;
264 } 255 }
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
316 return; 307 return;
317 } 308 }
318 309
319 headers_bytes_received_ += rv; 310 headers_bytes_received_ += rv;
320 negotiated_protocol_ = kProtoQUIC; 311 negotiated_protocol_ = kProtoQUIC;
321 connect_timing_ = session_->GetConnectTiming(); 312 connect_timing_ = session_->GetConnectTiming();
322 if (delegate_) 313 if (delegate_)
323 delegate_->OnHeadersReceived(initial_headers_); 314 delegate_->OnHeadersReceived(initial_headers_);
324 } 315 }
325 316
317 void BidirectionalStreamQuicImpl::OnReadDataComplete(int rv) {
318 DCHECK_GE(rv, 0);
319 read_buffer_ = nullptr;
320 read_buffer_len_ = 0;
321 if (delegate_)
322 delegate_->OnDataRead(rv);
323 }
324
326 void BidirectionalStreamQuicImpl::NotifyError(int error) { 325 void BidirectionalStreamQuicImpl::NotifyError(int error) {
327 DCHECK_NE(OK, error); 326 DCHECK_NE(OK, error);
328 DCHECK_NE(ERR_IO_PENDING, error); 327 DCHECK_NE(ERR_IO_PENDING, error);
329 328
330 ResetStream(); 329 ResetStream();
331 if (delegate_) { 330 if (delegate_) {
332 response_status_ = error; 331 response_status_ = error;
333 BidirectionalStreamImpl::Delegate* delegate = delegate_; 332 BidirectionalStreamImpl::Delegate* delegate = delegate_;
334 delegate_ = nullptr; 333 delegate_ = nullptr;
335 // Cancel any pending callback. 334 // Cancel any pending callback.
(...skipping 15 matching lines...) Expand all
351 if (!stream_) 350 if (!stream_)
352 return; 351 return;
353 closed_stream_received_bytes_ = stream_->stream_bytes_read(); 352 closed_stream_received_bytes_ = stream_->stream_bytes_read();
354 closed_stream_sent_bytes_ = stream_->stream_bytes_written(); 353 closed_stream_sent_bytes_ = stream_->stream_bytes_written();
355 closed_is_first_stream_ = stream_->IsFirstStream(); 354 closed_is_first_stream_ = stream_->IsFirstStream();
356 stream_->ClearDelegate(); 355 stream_->ClearDelegate();
357 stream_ = nullptr; 356 stream_ = nullptr;
358 } 357 }
359 358
360 } // namespace net 359 } // namespace net
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698