Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2)

Side by Side Diff: net/spdy/bidirectional_stream_spdy_impl.cc

Issue 2462463002: Make net::BidirectionalStream handle RST_STREAM_NO_ERROR (Closed)
Patch Set: Fix tests Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/spdy/bidirectional_stream_spdy_impl.h" 5 #include "net/spdy/bidirectional_stream_spdy_impl.h"
6 6
7 #include <utility>
8
7 #include "base/bind.h" 9 #include "base/bind.h"
8 #include "base/location.h" 10 #include "base/location.h"
9 #include "base/logging.h" 11 #include "base/logging.h"
10 #include "base/time/time.h" 12 #include "base/time/time.h"
11 #include "base/timer/timer.h" 13 #include "base/timer/timer.h"
12 #include "net/http/bidirectional_stream_request_info.h" 14 #include "net/http/bidirectional_stream_request_info.h"
13 #include "net/spdy/spdy_buffer.h" 15 #include "net/spdy/spdy_buffer.h"
14 #include "net/spdy/spdy_header_block.h" 16 #include "net/spdy/spdy_header_block.h"
15 #include "net/spdy/spdy_http_utils.h" 17 #include "net/spdy/spdy_http_utils.h"
16 #include "net/spdy/spdy_stream.h" 18 #include "net/spdy/spdy_stream.h"
(...skipping 10 matching lines...) Expand all
27 } // namespace 29 } // namespace
28 30
29 BidirectionalStreamSpdyImpl::BidirectionalStreamSpdyImpl( 31 BidirectionalStreamSpdyImpl::BidirectionalStreamSpdyImpl(
30 const base::WeakPtr<SpdySession>& spdy_session) 32 const base::WeakPtr<SpdySession>& spdy_session)
31 : spdy_session_(spdy_session), 33 : spdy_session_(spdy_session),
32 request_info_(nullptr), 34 request_info_(nullptr),
33 delegate_(nullptr), 35 delegate_(nullptr),
34 negotiated_protocol_(kProtoUnknown), 36 negotiated_protocol_(kProtoUnknown),
35 more_read_data_pending_(false), 37 more_read_data_pending_(false),
36 read_buffer_len_(0), 38 read_buffer_len_(0),
39 written_end_of_stream_(false),
40 write_pending_(false),
37 stream_closed_(false), 41 stream_closed_(false),
38 closed_stream_status_(ERR_FAILED), 42 closed_stream_status_(ERR_FAILED),
39 closed_stream_received_bytes_(0), 43 closed_stream_received_bytes_(0),
40 closed_stream_sent_bytes_(0), 44 closed_stream_sent_bytes_(0),
41 closed_has_load_timing_info_(false), 45 closed_has_load_timing_info_(false),
42 weak_factory_(this) {} 46 weak_factory_(this) {}
43 47
44 BidirectionalStreamSpdyImpl::~BidirectionalStreamSpdyImpl() { 48 BidirectionalStreamSpdyImpl::~BidirectionalStreamSpdyImpl() {
45 // Sends a RST to the remote if the stream is destroyed before it completes. 49 // Sends a RST to the remote if the stream is destroyed before it completes.
46 ResetStream(); 50 ResetStream();
51 DCHECK(!write_pending_);
47 } 52 }
48 53
49 void BidirectionalStreamSpdyImpl::Start( 54 void BidirectionalStreamSpdyImpl::Start(
50 const BidirectionalStreamRequestInfo* request_info, 55 const BidirectionalStreamRequestInfo* request_info,
51 const NetLogWithSource& net_log, 56 const NetLogWithSource& net_log,
52 bool /*send_request_headers_automatically*/, 57 bool /*send_request_headers_automatically*/,
53 BidirectionalStreamImpl::Delegate* delegate, 58 BidirectionalStreamImpl::Delegate* delegate,
54 std::unique_ptr<base::Timer> timer) { 59 std::unique_ptr<base::Timer> timer) {
55 DCHECK(!stream_); 60 DCHECK(!stream_);
56 DCHECK(timer); 61 DCHECK(timer);
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
100 // called upon completion. 105 // called upon completion.
101 read_buffer_ = buf; 106 read_buffer_ = buf;
102 read_buffer_len_ = buf_len; 107 read_buffer_len_ = buf_len;
103 return ERR_IO_PENDING; 108 return ERR_IO_PENDING;
104 } 109 }
105 110
106 void BidirectionalStreamSpdyImpl::SendData(const scoped_refptr<IOBuffer>& data, 111 void BidirectionalStreamSpdyImpl::SendData(const scoped_refptr<IOBuffer>& data,
107 int length, 112 int length,
108 bool end_stream) { 113 bool end_stream) {
109 DCHECK(length > 0 || (length == 0 && end_stream)); 114 DCHECK(length > 0 || (length == 0 && end_stream));
115 DCHECK(!write_pending_);
110 116
111 if (!stream_) { 117 if (written_end_of_stream_) {
112 LOG(ERROR) << "Trying to send data after stream has been destroyed."; 118 LOG(ERROR) << "Writing after end of stream is written.";
113 base::ThreadTaskRunnerHandle::Get()->PostTask( 119 base::ThreadTaskRunnerHandle::Get()->PostTask(
114 FROM_HERE, base::Bind(&BidirectionalStreamSpdyImpl::NotifyError, 120 FROM_HERE, base::Bind(&BidirectionalStreamSpdyImpl::NotifyError,
115 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED)); 121 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED));
116 return; 122 return;
117 } 123 }
118 124
125 write_pending_ = true;
126 written_end_of_stream_ = end_stream;
127 if (MaybeHandleStreamClosedInSendData())
128 return;
129
119 DCHECK(!stream_closed_); 130 DCHECK(!stream_closed_);
120 stream_->SendData(data.get(), length, 131 stream_->SendData(data.get(), length,
121 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND); 132 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND);
122 } 133 }
123 134
124 void BidirectionalStreamSpdyImpl::SendvData( 135 void BidirectionalStreamSpdyImpl::SendvData(
125 const std::vector<scoped_refptr<IOBuffer>>& buffers, 136 const std::vector<scoped_refptr<IOBuffer>>& buffers,
126 const std::vector<int>& lengths, 137 const std::vector<int>& lengths,
127 bool end_stream) { 138 bool end_stream) {
128 DCHECK_EQ(buffers.size(), lengths.size()); 139 DCHECK_EQ(buffers.size(), lengths.size());
140 DCHECK(!write_pending_);
129 141
130 if (!stream_) { 142 if (written_end_of_stream_) {
131 LOG(ERROR) << "Trying to send data after stream has been destroyed."; 143 LOG(ERROR) << "Writing after end of stream is written.";
132 base::ThreadTaskRunnerHandle::Get()->PostTask( 144 base::ThreadTaskRunnerHandle::Get()->PostTask(
133 FROM_HERE, base::Bind(&BidirectionalStreamSpdyImpl::NotifyError, 145 FROM_HERE, base::Bind(&BidirectionalStreamSpdyImpl::NotifyError,
134 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED)); 146 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED));
135 return; 147 return;
136 } 148 }
137 149
150 write_pending_ = true;
151 written_end_of_stream_ = end_stream;
152 if (MaybeHandleStreamClosedInSendData())
153 return;
154
138 DCHECK(!stream_closed_); 155 DCHECK(!stream_closed_);
139 int total_len = 0; 156 int total_len = 0;
140 for (int len : lengths) { 157 for (int len : lengths) {
141 total_len += len; 158 total_len += len;
142 } 159 }
143 160
144 pending_combined_buffer_ = new net::IOBuffer(total_len); 161 pending_combined_buffer_ = new net::IOBuffer(total_len);
145 int len = 0; 162 int len = 0;
146 // TODO(xunjieli): Get rid of extra copy. Coalesce headers and data frames. 163 // TODO(xunjieli): Get rid of extra copy. Coalesce headers and data frames.
147 for (size_t i = 0; i < buffers.size(); ++i) { 164 for (size_t i = 0; i < buffers.size(); ++i) {
(...skipping 78 matching lines...) Expand 10 before | Expand all | Expand 10 after
226 // recv window size accordingly. 243 // recv window size accordingly.
227 read_data_queue_.Enqueue(std::move(buffer)); 244 read_data_queue_.Enqueue(std::move(buffer));
228 if (read_buffer_) { 245 if (read_buffer_) {
229 // Handing small chunks of data to the caller creates measurable overhead. 246 // Handing small chunks of data to the caller creates measurable overhead.
230 // So buffer data in short time-spans and send a single read notification. 247 // So buffer data in short time-spans and send a single read notification.
231 ScheduleBufferedRead(); 248 ScheduleBufferedRead();
232 } 249 }
233 } 250 }
234 251
235 void BidirectionalStreamSpdyImpl::OnDataSent() { 252 void BidirectionalStreamSpdyImpl::OnDataSent() {
236 DCHECK(stream_); 253 DCHECK(write_pending_);
237 DCHECK(!stream_closed_);
238 254
239 pending_combined_buffer_ = nullptr; 255 pending_combined_buffer_ = nullptr;
256 write_pending_ = false;
257
240 if (delegate_) 258 if (delegate_)
241 delegate_->OnDataSent(); 259 delegate_->OnDataSent();
242 } 260 }
243 261
244 void BidirectionalStreamSpdyImpl::OnTrailers(const SpdyHeaderBlock& trailers) { 262 void BidirectionalStreamSpdyImpl::OnTrailers(const SpdyHeaderBlock& trailers) {
245 DCHECK(stream_); 263 DCHECK(stream_);
246 DCHECK(!stream_closed_); 264 DCHECK(!stream_closed_);
247 265
248 if (delegate_) 266 if (delegate_)
249 delegate_->OnTrailersReceived(trailers); 267 delegate_->OnTrailersReceived(trailers);
(...skipping 11 matching lines...) Expand all
261 279
262 if (status != OK) { 280 if (status != OK) {
263 NotifyError(status); 281 NotifyError(status);
264 return; 282 return;
265 } 283 }
266 ResetStream(); 284 ResetStream();
267 // Complete any remaining read, as all data has been buffered. 285 // Complete any remaining read, as all data has been buffered.
268 // If user has not called ReadData (i.e |read_buffer_| is nullptr), this will 286 // If user has not called ReadData (i.e |read_buffer_| is nullptr), this will
269 // do nothing. 287 // do nothing.
270 timer_->Stop(); 288 timer_->Stop();
289
290 // |this| might get destroyed after calling into |delegate_| in
291 // DoBufferedRead().
292 auto weak_this = weak_factory_.GetWeakPtr();
271 DoBufferedRead(); 293 DoBufferedRead();
294 if (weak_this.get() && write_pending_)
295 OnDataSent();
272 } 296 }
273 297
274 int BidirectionalStreamSpdyImpl::SendRequestHeadersHelper() { 298 int BidirectionalStreamSpdyImpl::SendRequestHeadersHelper() {
275 SpdyHeaderBlock headers; 299 SpdyHeaderBlock headers;
276 HttpRequestInfo http_request_info; 300 HttpRequestInfo http_request_info;
277 http_request_info.url = request_info_->url; 301 http_request_info.url = request_info_->url;
278 http_request_info.method = request_info_->method; 302 http_request_info.method = request_info_->method;
279 http_request_info.extra_headers = request_info_->extra_headers; 303 http_request_info.extra_headers = request_info_->extra_headers;
280 304
281 CreateSpdyHeadersFromHttpRequest( 305 CreateSpdyHeadersFromHttpRequest(
282 http_request_info, http_request_info.extra_headers, true, &headers); 306 http_request_info, http_request_info.extra_headers, true, &headers);
307 written_end_of_stream_ = request_info_->end_stream_on_headers;
283 return stream_->SendRequestHeaders(std::move(headers), 308 return stream_->SendRequestHeaders(std::move(headers),
284 request_info_->end_stream_on_headers 309 request_info_->end_stream_on_headers
285 ? NO_MORE_DATA_TO_SEND 310 ? NO_MORE_DATA_TO_SEND
286 : MORE_DATA_TO_SEND); 311 : MORE_DATA_TO_SEND);
287 } 312 }
288 313
289 void BidirectionalStreamSpdyImpl::OnStreamInitialized(int rv) { 314 void BidirectionalStreamSpdyImpl::OnStreamInitialized(int rv) {
290 DCHECK_NE(ERR_IO_PENDING, rv); 315 DCHECK_NE(ERR_IO_PENDING, rv);
291 if (rv == OK) { 316 if (rv == OK) {
292 stream_ = stream_request_.ReleaseStream(); 317 stream_ = stream_request_.ReleaseStream();
293 stream_->SetDelegate(this); 318 stream_->SetDelegate(this);
294 rv = SendRequestHeadersHelper(); 319 rv = SendRequestHeadersHelper();
295 if (rv == OK) { 320 if (rv == OK) {
296 OnRequestHeadersSent(); 321 OnRequestHeadersSent();
297 return; 322 return;
298 } else if (rv == ERR_IO_PENDING) { 323 } else if (rv == ERR_IO_PENDING) {
299 return; 324 return;
300 } 325 }
301 } 326 }
302 NotifyError(rv); 327 NotifyError(rv);
303 } 328 }
304 329
305 void BidirectionalStreamSpdyImpl::NotifyError(int rv) { 330 void BidirectionalStreamSpdyImpl::NotifyError(int rv) {
306 ResetStream(); 331 ResetStream();
332 write_pending_ = false;
307 if (delegate_) { 333 if (delegate_) {
308 BidirectionalStreamImpl::Delegate* delegate = delegate_; 334 BidirectionalStreamImpl::Delegate* delegate = delegate_;
309 delegate_ = nullptr; 335 delegate_ = nullptr;
310 // Cancel any pending callback. 336 // Cancel any pending callback.
311 weak_factory_.InvalidateWeakPtrs(); 337 weak_factory_.InvalidateWeakPtrs();
312 delegate->OnFailed(rv); 338 delegate->OnFailed(rv);
313 // |this| can be null when returned from delegate. 339 // |this| can be null when returned from delegate.
314 } 340 }
315 } 341 }
316 342
(...skipping 49 matching lines...) Expand 10 before | Expand all | Expand 10 after
366 } 392 }
367 393
368 bool BidirectionalStreamSpdyImpl::ShouldWaitForMoreBufferedData() const { 394 bool BidirectionalStreamSpdyImpl::ShouldWaitForMoreBufferedData() const {
369 if (stream_closed_) 395 if (stream_closed_)
370 return false; 396 return false;
371 DCHECK_GT(read_buffer_len_, 0); 397 DCHECK_GT(read_buffer_len_, 0);
372 return read_data_queue_.GetTotalSize() < 398 return read_data_queue_.GetTotalSize() <
373 static_cast<size_t>(read_buffer_len_); 399 static_cast<size_t>(read_buffer_len_);
374 } 400 }
375 401
402 bool BidirectionalStreamSpdyImpl::MaybeHandleStreamClosedInSendData() {
403 if (stream_)
404 return false;
405 // If |stream_| is closed without an error before client half closes,
406 // blackhole any pending write data. crbug.com/650438.
407 if (stream_closed_ && closed_stream_status_ == OK) {
408 base::ThreadTaskRunnerHandle::Get()->PostTask(
409 FROM_HERE, base::Bind(&BidirectionalStreamSpdyImpl::OnDataSent,
410 weak_factory_.GetWeakPtr()));
411 return true;
412 }
413 LOG(ERROR) << "Trying to send data after stream has been destroyed.";
414 base::ThreadTaskRunnerHandle::Get()->PostTask(
415 FROM_HERE, base::Bind(&BidirectionalStreamSpdyImpl::NotifyError,
416 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED));
417 return true;
418 }
419
376 } // namespace net 420 } // namespace net
OLDNEW
« no previous file with comments | « net/spdy/bidirectional_stream_spdy_impl.h ('k') | net/spdy/bidirectional_stream_spdy_impl_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698