Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(193)

Side by Side Diff: net/quic/chromium/bidirectional_stream_quic_impl.cc

Issue 2920873002: Ensure that BidirectionalStreamQuicImpl does not synchronously invoke callbacks (Closed)
Patch Set: Rebase Created 3 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "net/quic/chromium/bidirectional_stream_quic_impl.h" 5 #include "net/quic/chromium/bidirectional_stream_quic_impl.h"
6 6
7 #include <utility> 7 #include <utility>
8 8
9 #include "base/bind.h" 9 #include "base/bind.h"
10 #include "base/location.h" 10 #include "base/location.h"
11 #include "base/logging.h" 11 #include "base/logging.h"
12 #include "base/threading/thread_task_runner_handle.h" 12 #include "base/threading/thread_task_runner_handle.h"
13 #include "base/timer/timer.h" 13 #include "base/timer/timer.h"
14 #include "net/http/bidirectional_stream_request_info.h" 14 #include "net/http/bidirectional_stream_request_info.h"
15 #include "net/quic/core/quic_connection.h" 15 #include "net/quic/core/quic_connection.h"
16 #include "net/quic/platform/api/quic_string_piece.h" 16 #include "net/quic/platform/api/quic_string_piece.h"
17 #include "net/socket/next_proto.h" 17 #include "net/socket/next_proto.h"
18 #include "net/spdy/chromium/spdy_http_utils.h" 18 #include "net/spdy/chromium/spdy_http_utils.h"
19 #include "net/spdy/core/spdy_header_block.h" 19 #include "net/spdy/core/spdy_header_block.h"
20 20
21 namespace net { 21 namespace net {
22 namespace {
23 // Sets a boolean to a value, and restores it to the previous value once
24 // the saver goes out of scope.
25 class ScopedBoolSaver {
26 public:
27 ScopedBoolSaver(bool* var, bool new_val) : var_(var), old_val_(*var) {
28 *var_ = new_val;
29 }
30
31 ~ScopedBoolSaver() { *var_ = old_val_; }
32
33 private:
34 bool* var_;
35 bool old_val_;
36 };
37 } // namespace
22 38
23 BidirectionalStreamQuicImpl::BidirectionalStreamQuicImpl( 39 BidirectionalStreamQuicImpl::BidirectionalStreamQuicImpl(
24 std::unique_ptr<QuicChromiumClientSession::Handle> session) 40 std::unique_ptr<QuicChromiumClientSession::Handle> session)
25 : session_(std::move(session)), 41 : session_(std::move(session)),
26 stream_(nullptr), 42 stream_(nullptr),
27 request_info_(nullptr), 43 request_info_(nullptr),
28 delegate_(nullptr), 44 delegate_(nullptr),
29 response_status_(OK), 45 response_status_(OK),
30 negotiated_protocol_(kProtoUnknown), 46 negotiated_protocol_(kProtoUnknown),
31 read_buffer_len_(0), 47 read_buffer_len_(0),
32 headers_bytes_received_(0), 48 headers_bytes_received_(0),
33 headers_bytes_sent_(0), 49 headers_bytes_sent_(0),
34 closed_stream_received_bytes_(0), 50 closed_stream_received_bytes_(0),
35 closed_stream_sent_bytes_(0), 51 closed_stream_sent_bytes_(0),
36 closed_is_first_stream_(false), 52 closed_is_first_stream_(false),
37 has_sent_headers_(false), 53 has_sent_headers_(false),
38 send_request_headers_automatically_(true), 54 send_request_headers_automatically_(true),
55 may_invoke_callbacks_(true),
39 weak_factory_(this) {} 56 weak_factory_(this) {}
40 57
41 BidirectionalStreamQuicImpl::~BidirectionalStreamQuicImpl() { 58 BidirectionalStreamQuicImpl::~BidirectionalStreamQuicImpl() {
42 if (stream_) { 59 if (stream_) {
43 delegate_ = nullptr; 60 delegate_ = nullptr;
44 stream_->Reset(QUIC_STREAM_CANCELLED); 61 stream_->Reset(QUIC_STREAM_CANCELLED);
45 } 62 }
46 } 63 }
47 64
48 void BidirectionalStreamQuicImpl::Start( 65 void BidirectionalStreamQuicImpl::Start(
49 const BidirectionalStreamRequestInfo* request_info, 66 const BidirectionalStreamRequestInfo* request_info,
50 const NetLogWithSource& net_log, 67 const NetLogWithSource& net_log,
51 bool send_request_headers_automatically, 68 bool send_request_headers_automatically,
52 BidirectionalStreamImpl::Delegate* delegate, 69 BidirectionalStreamImpl::Delegate* delegate,
53 std::unique_ptr<base::Timer> /* timer */) { 70 std::unique_ptr<base::Timer> /* timer */) {
71 ScopedBoolSaver saver(&may_invoke_callbacks_, false);
54 DCHECK(!stream_); 72 DCHECK(!stream_);
55 CHECK(delegate); 73 CHECK(delegate);
56 DLOG_IF(WARNING, !session_->IsConnected()) 74 DLOG_IF(WARNING, !session_->IsConnected())
57 << "Trying to start request headers after session has been closed."; 75 << "Trying to start request headers after session has been closed.";
58 76
59 send_request_headers_automatically_ = send_request_headers_automatically; 77 send_request_headers_automatically_ = send_request_headers_automatically;
60 delegate_ = delegate; 78 delegate_ = delegate;
61 request_info_ = request_info; 79 request_info_ = request_info;
62 80
63 int rv = session_->RequestStream( 81 int rv = session_->RequestStream(
64 request_info_->method == "POST", 82 request_info_->method == "POST",
65 base::Bind(&BidirectionalStreamQuicImpl::OnStreamReady, 83 base::Bind(&BidirectionalStreamQuicImpl::OnStreamReady,
66 weak_factory_.GetWeakPtr())); 84 weak_factory_.GetWeakPtr()));
67 if (rv == ERR_IO_PENDING) 85 if (rv == ERR_IO_PENDING)
68 return; 86 return;
69 87
70 if (rv != OK) { 88 if (rv != OK) {
71 base::ThreadTaskRunnerHandle::Get()->PostTask( 89 base::ThreadTaskRunnerHandle::Get()->PostTask(
72 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::NotifyError, 90 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::NotifyError,
73 weak_factory_.GetWeakPtr(), 91 weak_factory_.GetWeakPtr(),
74 session_->IsCryptoHandshakeConfirmed() 92 session_->IsCryptoHandshakeConfirmed()
75 ? rv 93 ? rv
76 : ERR_QUIC_HANDSHAKE_FAILED)); 94 : ERR_QUIC_HANDSHAKE_FAILED));
77 return; 95 return;
78 } 96 }
79 97
80 OnStreamReady(rv); 98 base::ThreadTaskRunnerHandle::Get()->PostTask(
99 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::OnStreamReady,
100 weak_factory_.GetWeakPtr(), rv));
81 } 101 }
82 102
83 void BidirectionalStreamQuicImpl::SendRequestHeaders() { 103 void BidirectionalStreamQuicImpl::SendRequestHeaders() {
104 ScopedBoolSaver saver(&may_invoke_callbacks_, false);
105 // If this fails, a task will have been posted to notify the delegate
106 // asynchronously.
84 WriteHeaders(); 107 WriteHeaders();
85 } 108 }
86 109
87 bool BidirectionalStreamQuicImpl::WriteHeaders() { 110 bool BidirectionalStreamQuicImpl::WriteHeaders() {
88 DCHECK(!has_sent_headers_); 111 DCHECK(!has_sent_headers_);
89 if (!stream_) { 112 if (!stream_) {
90 LOG(ERROR) 113 LOG(ERROR)
91 << "Trying to send request headers after stream has been destroyed."; 114 << "Trying to send request headers after stream has been destroyed.";
92 base::ThreadTaskRunnerHandle::Get()->PostTask( 115 base::ThreadTaskRunnerHandle::Get()->PostTask(
93 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::NotifyError, 116 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::NotifyError,
(...skipping 16 matching lines...) Expand all
110 std::move(headers), request_info_->end_stream_on_headers, nullptr); 133 std::move(headers), request_info_->end_stream_on_headers, nullptr);
111 if (!stream_) 134 if (!stream_)
112 return false; 135 return false;
113 136
114 headers_bytes_sent_ += headers_bytes_sent; 137 headers_bytes_sent_ += headers_bytes_sent;
115 has_sent_headers_ = true; 138 has_sent_headers_ = true;
116 return true; 139 return true;
117 } 140 }
118 141
119 int BidirectionalStreamQuicImpl::ReadData(IOBuffer* buffer, int buffer_len) { 142 int BidirectionalStreamQuicImpl::ReadData(IOBuffer* buffer, int buffer_len) {
143 ScopedBoolSaver saver(&may_invoke_callbacks_, false);
120 DCHECK(buffer); 144 DCHECK(buffer);
121 DCHECK(buffer_len); 145 DCHECK(buffer_len);
122 146
123 if (!stream_) { 147 if (!stream_) {
124 // If the stream is already closed, there is no body to read. 148 // If the stream is already closed, there is no body to read.
125 return response_status_; 149 return response_status_;
126 } 150 }
127 int rv = stream_->ReadBody( 151 int rv = stream_->ReadBody(
128 buffer, buffer_len, 152 buffer, buffer_len,
129 base::Bind(&BidirectionalStreamQuicImpl::OnReadDataComplete, 153 base::Bind(&BidirectionalStreamQuicImpl::OnReadDataComplete,
(...skipping 12 matching lines...) Expand all
142 // BidirectionalStreamQuicImpl::OnClose(). 166 // BidirectionalStreamQuicImpl::OnClose().
143 stream_->OnFinRead(); 167 stream_->OnFinRead();
144 } 168 }
145 return rv; 169 return rv;
146 } 170 }
147 171
148 void BidirectionalStreamQuicImpl::SendvData( 172 void BidirectionalStreamQuicImpl::SendvData(
149 const std::vector<scoped_refptr<IOBuffer>>& buffers, 173 const std::vector<scoped_refptr<IOBuffer>>& buffers,
150 const std::vector<int>& lengths, 174 const std::vector<int>& lengths,
151 bool end_stream) { 175 bool end_stream) {
176 ScopedBoolSaver saver(&may_invoke_callbacks_, false);
152 DCHECK_EQ(buffers.size(), lengths.size()); 177 DCHECK_EQ(buffers.size(), lengths.size());
153 178
154 if (!stream_) { 179 if (!stream_) {
155 LOG(ERROR) << "Trying to send data after stream has been destroyed."; 180 LOG(ERROR) << "Trying to send data after stream has been destroyed.";
156 base::ThreadTaskRunnerHandle::Get()->PostTask( 181 base::ThreadTaskRunnerHandle::Get()->PostTask(
157 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::NotifyError, 182 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::NotifyError,
158 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED)); 183 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED));
159 return; 184 return;
160 } 185 }
161 186
(...skipping 87 matching lines...) Expand 10 before | Expand all | Expand 10 after
249 stream_ = session_->ReleaseStream(this); 274 stream_ = session_->ReleaseStream(this);
250 275
251 base::ThreadTaskRunnerHandle::Get()->PostTask( 276 base::ThreadTaskRunnerHandle::Get()->PostTask(
252 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::ReadInitialHeaders, 277 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::ReadInitialHeaders,
253 weak_factory_.GetWeakPtr())); 278 weak_factory_.GetWeakPtr()));
254 279
255 NotifyStreamReady(); 280 NotifyStreamReady();
256 } 281 }
257 282
258 void BidirectionalStreamQuicImpl::OnSendDataComplete(int rv) { 283 void BidirectionalStreamQuicImpl::OnSendDataComplete(int rv) {
284 CHECK(may_invoke_callbacks_);
259 DCHECK(rv == OK || !stream_); 285 DCHECK(rv == OK || !stream_);
260 if (rv != 0) { 286 if (rv != 0) {
261 NotifyError(rv); 287 NotifyError(rv);
262 return; 288 return;
263 } 289 }
264 290
265 if (delegate_) 291 if (delegate_)
266 delegate_->OnDataSent(); 292 delegate_->OnDataSent();
267 } 293 }
268 294
269 void BidirectionalStreamQuicImpl::OnReadInitialHeadersComplete(int rv) { 295 void BidirectionalStreamQuicImpl::OnReadInitialHeadersComplete(int rv) {
296 CHECK(may_invoke_callbacks_);
270 DCHECK_NE(ERR_IO_PENDING, rv); 297 DCHECK_NE(ERR_IO_PENDING, rv);
271 if (rv < 0) { 298 if (rv < 0) {
272 NotifyError(rv); 299 NotifyError(rv);
273 return; 300 return;
274 } 301 }
275 302
276 headers_bytes_received_ += rv; 303 headers_bytes_received_ += rv;
277 negotiated_protocol_ = kProtoQUIC; 304 negotiated_protocol_ = kProtoQUIC;
278 connect_timing_ = session_->GetConnectTiming(); 305 connect_timing_ = session_->GetConnectTiming();
279 base::ThreadTaskRunnerHandle::Get()->PostTask( 306 base::ThreadTaskRunnerHandle::Get()->PostTask(
(...skipping 17 matching lines...) Expand all
297 int rv = stream_->ReadTrailingHeaders( 324 int rv = stream_->ReadTrailingHeaders(
298 &trailing_headers_, 325 &trailing_headers_,
299 base::Bind(&BidirectionalStreamQuicImpl::OnReadTrailingHeadersComplete, 326 base::Bind(&BidirectionalStreamQuicImpl::OnReadTrailingHeadersComplete,
300 weak_factory_.GetWeakPtr())); 327 weak_factory_.GetWeakPtr()));
301 328
302 if (rv != ERR_IO_PENDING) 329 if (rv != ERR_IO_PENDING)
303 OnReadTrailingHeadersComplete(rv); 330 OnReadTrailingHeadersComplete(rv);
304 } 331 }
305 332
306 void BidirectionalStreamQuicImpl::OnReadTrailingHeadersComplete(int rv) { 333 void BidirectionalStreamQuicImpl::OnReadTrailingHeadersComplete(int rv) {
334 CHECK(may_invoke_callbacks_);
307 DCHECK_NE(ERR_IO_PENDING, rv); 335 DCHECK_NE(ERR_IO_PENDING, rv);
308 if (rv < 0) { 336 if (rv < 0) {
309 NotifyError(rv); 337 NotifyError(rv);
310 return; 338 return;
311 } 339 }
312 340
313 headers_bytes_received_ += rv; 341 headers_bytes_received_ += rv;
314 342
315 if (delegate_) 343 if (delegate_)
316 delegate_->OnTrailersReceived(trailing_headers_); 344 delegate_->OnTrailersReceived(trailing_headers_);
317 } 345 }
318 346
319 void BidirectionalStreamQuicImpl::OnReadDataComplete(int rv) { 347 void BidirectionalStreamQuicImpl::OnReadDataComplete(int rv) {
348 CHECK(may_invoke_callbacks_);
320 DCHECK_GE(rv, 0); 349 DCHECK_GE(rv, 0);
321 read_buffer_ = nullptr; 350 read_buffer_ = nullptr;
322 read_buffer_len_ = 0; 351 read_buffer_len_ = 0;
323 352
324 if (stream_->IsDoneReading()) { 353 if (stream_->IsDoneReading()) {
325 // If the write side is closed, OnFinRead() will call 354 // If the write side is closed, OnFinRead() will call
326 // BidirectionalStreamQuicImpl::OnClose(). 355 // BidirectionalStreamQuicImpl::OnClose().
327 stream_->OnFinRead(); 356 stream_->OnFinRead();
328 } 357 }
329 358
(...skipping 24 matching lines...) Expand all
354 } else { 383 } else {
355 NotifyFailure(delegate, error); 384 NotifyFailure(delegate, error);
356 // |this| might be destroyed at this point. 385 // |this| might be destroyed at this point.
357 } 386 }
358 } 387 }
359 } 388 }
360 389
361 void BidirectionalStreamQuicImpl::NotifyFailure( 390 void BidirectionalStreamQuicImpl::NotifyFailure(
362 BidirectionalStreamImpl::Delegate* delegate, 391 BidirectionalStreamImpl::Delegate* delegate,
363 int error) { 392 int error) {
393 CHECK(may_invoke_callbacks_);
364 delegate->OnFailed(error); 394 delegate->OnFailed(error);
365 // |this| might be destroyed at this point. 395 // |this| might be destroyed at this point.
366 } 396 }
367 397
368 void BidirectionalStreamQuicImpl::NotifyStreamReady() { 398 void BidirectionalStreamQuicImpl::NotifyStreamReady() {
399 CHECK(may_invoke_callbacks_);
369 // Sending the request might result in the stream being closed. 400 // Sending the request might result in the stream being closed.
370 if (send_request_headers_automatically_ && !WriteHeaders()) 401 if (send_request_headers_automatically_ && !WriteHeaders())
371 return; 402 return;
372 403
373 if (delegate_) 404 if (delegate_)
374 delegate_->OnStreamReady(has_sent_headers_); 405 delegate_->OnStreamReady(has_sent_headers_);
375 } 406 }
376 407
377 void BidirectionalStreamQuicImpl::ResetStream() { 408 void BidirectionalStreamQuicImpl::ResetStream() {
378 if (!stream_) 409 if (!stream_)
379 return; 410 return;
380 closed_stream_received_bytes_ = stream_->stream_bytes_read(); 411 closed_stream_received_bytes_ = stream_->stream_bytes_read();
381 closed_stream_sent_bytes_ = stream_->stream_bytes_written(); 412 closed_stream_sent_bytes_ = stream_->stream_bytes_written();
382 closed_is_first_stream_ = stream_->IsFirstStream(); 413 closed_is_first_stream_ = stream_->IsFirstStream();
383 stream_->ClearDelegate(); 414 stream_->ClearDelegate();
384 stream_ = nullptr; 415 stream_ = nullptr;
385 } 416 }
386 417
387 } // namespace net 418 } // namespace net
OLDNEW
« no previous file with comments | « net/quic/chromium/bidirectional_stream_quic_impl.h ('k') | net/quic/chromium/bidirectional_stream_quic_impl_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698