Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(329)

Side by Side Diff: net/quic/chromium/bidirectional_stream_quic_impl.cc

Issue 2877063002: Add an async ReadBody method to QuicChromiumClientStream::Handle (Closed)
Patch Set: format Created 3 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/quic/chromium/bidirectional_stream_quic_impl.h" 5 #include "net/quic/chromium/bidirectional_stream_quic_impl.h"
6 6
7 #include <utility> 7 #include <utility>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/location.h" 10 #include "base/location.h"
(...skipping 94 matching lines...) Expand 10 before | Expand all | Expand 10 after
105 } 105 }
106 106
107 int BidirectionalStreamQuicImpl::ReadData(IOBuffer* buffer, int buffer_len) { 107 int BidirectionalStreamQuicImpl::ReadData(IOBuffer* buffer, int buffer_len) {
108 DCHECK(buffer); 108 DCHECK(buffer);
109 DCHECK(buffer_len); 109 DCHECK(buffer_len);
110 110
111 if (!stream_) { 111 if (!stream_) {
112 // If the stream is already closed, there is no body to read. 112 // If the stream is already closed, there is no body to read.
113 return response_status_; 113 return response_status_;
114 } 114 }
115 int rv = stream_->Read(buffer, buffer_len); 115 int rv = stream_->ReadBody(
116 if (rv != ERR_IO_PENDING) { 116 buffer, buffer_len,
117 if (stream_->IsDoneReading()) { 117 base::Bind(&BidirectionalStreamQuicImpl::OnReadDataComplete,
118 // If the write side is closed, OnFinRead() will call 118 weak_factory_.GetWeakPtr()));
119 // BidirectionalStreamQuicImpl::OnClose(). 119 if (rv == ERR_IO_PENDING) {
120 stream_->OnFinRead(); 120 read_buffer_ = buffer;
121 } 121 read_buffer_len_ = buffer_len;
122 return ERR_IO_PENDING;
123 }
124
125 if (rv < 0)
122 return rv; 126 return rv;
127
128 if (stream_->IsDoneReading()) {
129 // If the write side is closed, OnFinRead() will call
130 // BidirectionalStreamQuicImpl::OnClose().
131 stream_->OnFinRead();
123 } 132 }
124 // Read will complete asynchronously and Delegate::OnReadCompleted will be 133 return rv;
125 // called upon completion.
126 read_buffer_ = buffer;
127 read_buffer_len_ = buffer_len;
128 return ERR_IO_PENDING;
129 } 134 }
130 135
131 void BidirectionalStreamQuicImpl::SendData(const scoped_refptr<IOBuffer>& data, 136 void BidirectionalStreamQuicImpl::SendData(const scoped_refptr<IOBuffer>& data,
132 int length, 137 int length,
133 bool end_stream) { 138 bool end_stream) {
134 DCHECK(length > 0 || (length == 0 && end_stream)); 139 DCHECK(length > 0 || (length == 0 && end_stream));
135 if (!stream_) { 140 if (!stream_) {
136 LOG(ERROR) << "Trying to send data after stream has been destroyed."; 141 LOG(ERROR) << "Trying to send data after stream has been destroyed.";
137 base::ThreadTaskRunnerHandle::Get()->PostTask( 142 base::ThreadTaskRunnerHandle::Get()->PostTask(
138 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::NotifyError, 143 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::NotifyError,
(...skipping 90 matching lines...) Expand 10 before | Expand all | Expand 10 after
229 234
230 void BidirectionalStreamQuicImpl::OnTrailingHeadersAvailable( 235 void BidirectionalStreamQuicImpl::OnTrailingHeadersAvailable(
231 const SpdyHeaderBlock& headers, 236 const SpdyHeaderBlock& headers,
232 size_t frame_len) { 237 size_t frame_len) {
233 headers_bytes_received_ += frame_len; 238 headers_bytes_received_ += frame_len;
234 if (delegate_) 239 if (delegate_)
235 delegate_->OnTrailersReceived(headers); 240 delegate_->OnTrailersReceived(headers);
236 // |this| can be destroyed after this point. 241 // |this| can be destroyed after this point.
237 } 242 }
238 243
239 void BidirectionalStreamQuicImpl::OnDataAvailable() {
240 // Return early if ReadData has not been called.
241 if (!read_buffer_)
242 return;
243
244 int rv = ReadData(read_buffer_.get(), read_buffer_len_);
245 if (rv == ERR_IO_PENDING) {
246 // Spurrious notification. Wait for the next one.
247 return;
248 }
249 read_buffer_ = nullptr;
250 read_buffer_len_ = 0;
251 if (delegate_)
252 delegate_->OnDataRead(rv);
253 }
254
255 void BidirectionalStreamQuicImpl::OnClose() { 244 void BidirectionalStreamQuicImpl::OnClose() {
256 DCHECK(stream_); 245 DCHECK(stream_);
257 246
258 if (stream_->connection_error() != QUIC_NO_ERROR || 247 if (stream_->connection_error() != QUIC_NO_ERROR ||
259 stream_->stream_error() != QUIC_STREAM_NO_ERROR) { 248 stream_->stream_error() != QUIC_STREAM_NO_ERROR) {
260 NotifyError(session_->IsCryptoHandshakeConfirmed() 249 NotifyError(session_->IsCryptoHandshakeConfirmed()
261 ? ERR_QUIC_PROTOCOL_ERROR 250 ? ERR_QUIC_PROTOCOL_ERROR
262 : ERR_QUIC_HANDSHAKE_FAILED); 251 : ERR_QUIC_HANDSHAKE_FAILED);
263 return; 252 return;
264 } 253 }
(...skipping 51 matching lines...) Expand 10 before | Expand all | Expand 10 after
316 return; 305 return;
317 } 306 }
318 307
319 headers_bytes_received_ += rv; 308 headers_bytes_received_ += rv;
320 negotiated_protocol_ = kProtoQUIC; 309 negotiated_protocol_ = kProtoQUIC;
321 connect_timing_ = session_->GetConnectTiming(); 310 connect_timing_ = session_->GetConnectTiming();
322 if (delegate_) 311 if (delegate_)
323 delegate_->OnHeadersReceived(initial_headers_); 312 delegate_->OnHeadersReceived(initial_headers_);
324 } 313 }
325 314
315 void BidirectionalStreamQuicImpl::OnReadDataComplete(int rv) {
316 DCHECK_GE(rv, 0);
317 read_buffer_ = nullptr;
318 read_buffer_len_ = 0;
319
320 if (stream_->IsDoneReading()) {
321 // If the write side is closed, OnFinRead() will call
322 // BidirectionalStreamQuicImpl::OnClose().
323 stream_->OnFinRead();
324 }
325
326 if (delegate_)
327 delegate_->OnDataRead(rv);
328 }
329
326 void BidirectionalStreamQuicImpl::NotifyError(int error) { 330 void BidirectionalStreamQuicImpl::NotifyError(int error) {
327 DCHECK_NE(OK, error); 331 DCHECK_NE(OK, error);
328 DCHECK_NE(ERR_IO_PENDING, error); 332 DCHECK_NE(ERR_IO_PENDING, error);
329 333
330 ResetStream(); 334 ResetStream();
331 if (delegate_) { 335 if (delegate_) {
332 response_status_ = error; 336 response_status_ = error;
333 BidirectionalStreamImpl::Delegate* delegate = delegate_; 337 BidirectionalStreamImpl::Delegate* delegate = delegate_;
334 delegate_ = nullptr; 338 delegate_ = nullptr;
335 // Cancel any pending callback. 339 // Cancel any pending callback.
(...skipping 15 matching lines...) Expand all
351 if (!stream_) 355 if (!stream_)
352 return; 356 return;
353 closed_stream_received_bytes_ = stream_->stream_bytes_read(); 357 closed_stream_received_bytes_ = stream_->stream_bytes_read();
354 closed_stream_sent_bytes_ = stream_->stream_bytes_written(); 358 closed_stream_sent_bytes_ = stream_->stream_bytes_written();
355 closed_is_first_stream_ = stream_->IsFirstStream(); 359 closed_is_first_stream_ = stream_->IsFirstStream();
356 stream_->ClearDelegate(); 360 stream_->ClearDelegate();
357 stream_ = nullptr; 361 stream_ = nullptr;
358 } 362 }
359 363
360 } // namespace net 364 } // namespace net
OLDNEW
« no previous file with comments | « net/quic/chromium/bidirectional_stream_quic_impl.h ('k') | net/quic/chromium/bidirectional_stream_quic_impl_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698