Chromium Code Reviews| Index: net/quic/chromium/quic_chromium_client_stream.cc |
| diff --git a/net/quic/chromium/quic_chromium_client_stream.cc b/net/quic/chromium/quic_chromium_client_stream.cc |
| index 0014b40ee101d7408e522d25357722a4b40f5214..286ee9439e6176675998f84e6627caf9ea249e43 100644 |
| --- a/net/quic/chromium/quic_chromium_client_stream.cc |
| +++ b/net/quic/chromium/quic_chromium_client_stream.cc |
| @@ -20,13 +20,30 @@ |
| #include "net/quic/core/spdy_utils.h" |
| namespace net { |
| +namespace { |
| +// Sets a boolean to a value, and restores it to the previous value once |
| +// the saver goes out of scope. |
| +class ScopedBoolSaver { |
| + public: |
| + ScopedBoolSaver(bool* var, bool new_val) : var_(var), old_val_(*var) { |
| + *var_ = new_val; |
| + } |
| + |
| + ~ScopedBoolSaver() { *var_ = old_val_; } |
| -QuicChromiumClientStream::Handle::Handle(QuicChromiumClientStream* stream, |
| - Delegate* delegate) |
| + private: |
| + bool* var_; |
| + bool old_val_; |
| +}; |
| +} // namespace |
| + |
| +QuicChromiumClientStream::Handle::Handle(QuicChromiumClientStream* stream) |
| : stream_(stream), |
| - delegate_(delegate), |
| + may_invoke_callbacks_(true), |
| read_headers_buffer_(nullptr), |
| - read_body_buffer_len_(0) { |
| + read_body_buffer_len_(0), |
| + net_error_(ERR_UNEXPECTED), |
| + weak_factory_(this) { |
| SaveState(); |
| } |
| @@ -39,10 +56,6 @@ QuicChromiumClientStream::Handle::~Handle() { |
| } |
| } |
| -void QuicChromiumClientStream::Handle::ClearDelegate() { |
| - delegate_ = nullptr; |
| -} |
| - |
| void QuicChromiumClientStream::Handle::OnInitialHeadersAvailable() { |
| if (!read_headers_callback_) |
| return; // Wait for ReadInitialHeaders to be called. |
| @@ -51,7 +64,7 @@ void QuicChromiumClientStream::Handle::OnInitialHeadersAvailable() { |
| if (!stream_->DeliverInitialHeaders(read_headers_buffer_, &rv)) |
| rv = ERR_QUIC_PROTOCOL_ERROR; |
| - ResetAndReturn(&read_headers_callback_).Run(rv); |
| + ResetAndRun(&read_headers_callback_, rv); |
| } |
| void QuicChromiumClientStream::Handle::OnTrailingHeadersAvailable() { |
| @@ -62,7 +75,7 @@ void QuicChromiumClientStream::Handle::OnTrailingHeadersAvailable() { |
| if (!stream_->DeliverTrailingHeaders(read_headers_buffer_, &rv)) |
| rv = ERR_QUIC_PROTOCOL_ERROR; |
| - ResetAndReturn(&read_headers_callback_).Run(rv); |
| + ResetAndRun(&read_headers_callback_, rv); |
| } |
| void QuicChromiumClientStream::Handle::OnDataAvailable() { |
| @@ -75,50 +88,70 @@ void QuicChromiumClientStream::Handle::OnDataAvailable() { |
| read_body_buffer_ = nullptr; |
| read_body_buffer_len_ = 0; |
| - ResetAndReturn(&read_body_callback_).Run(rv); |
| + ResetAndRun(&read_body_callback_, rv); |
| } |
| void QuicChromiumClientStream::Handle::OnCanWrite() { |
| if (!write_callback_) |
| return; |
| - base::ResetAndReturn(&write_callback_).Run(OK); |
| + ResetAndRun(&write_callback_, OK); |
| } |
| void QuicChromiumClientStream::Handle::OnClose() { |
| - if (stream_) |
| - SaveState(); |
| - stream_ = nullptr; |
| - if (delegate_) { |
| - auto* delegate = delegate_; |
| - delegate_ = nullptr; |
| - delegate->OnClose(); |
| + if (net_error_ == ERR_UNEXPECTED) { |
| + if (stream_error() == QUIC_STREAM_NO_ERROR && |
| + connection_error() == QUIC_NO_ERROR && fin_sent() && fin_received()) { |
| + net_error_ = ERR_CONNECTION_CLOSED; |
| + } else { |
| + net_error_ = ERR_QUIC_PROTOCOL_ERROR; |
| + } |
| } |
| + OnError(net_error_); |
| } |
| void QuicChromiumClientStream::Handle::OnError(int error) { |
| + net_error_ = error; |
| if (stream_) |
| SaveState(); |
| stream_ = nullptr; |
| - if (delegate_) { |
| - auto* delegate = delegate_; |
| - delegate_ = nullptr; |
| - delegate->OnError(error); |
| + |
| + // Post a task to invoke the callbacks to ensure that there is no reentrancy. |
| + // A ScopedPacketBundler might cause an error which closes the stream under |
| + // the call stack of the owner of the handle. |
| + base::ThreadTaskRunnerHandle::Get()->PostTask( |
|
xunjieli
2017/05/31 00:25:22
Is this PostTask necessary?
The callback shouldn'
Ryan Hamilton
2017/05/31 02:49:56
The problem is not that the caller might call into
xunjieli
2017/06/01 15:30:32
Acknowledged. You are totally right. I missed the
|
| + FROM_HERE, |
| + base::Bind(&QuicChromiumClientStream::Handle::InvokeCallbacksOnError, |
| + weak_factory_.GetWeakPtr(), error)); |
| +} |
| + |
| +void QuicChromiumClientStream::Handle::InvokeCallbacksOnError(int error) { |
| + // Invoking a callback may cause |this| to be deleted. If this happens, no |
| + // more callbacks should be invoked. Guard against this by holding a WeakPtr |
| + // to |this| and ensuring it's still valid. |
| + auto guard(weak_factory_.GetWeakPtr()); |
| + for (auto* callback : |
| + {&read_headers_callback_, &read_body_callback_, &write_callback_}) { |
| + if (*callback) |
| + ResetAndRun(callback, error); |
| + if (!guard.get()) |
| + return; |
| } |
| } |
| int QuicChromiumClientStream::Handle::ReadInitialHeaders( |
| SpdyHeaderBlock* header_block, |
| const CompletionCallback& callback) { |
| + ScopedBoolSaver saver(&may_invoke_callbacks_, false); |
| if (!stream_) |
| - return ERR_CONNECTION_CLOSED; |
| + return net_error_; |
| int frame_len = 0; |
| if (stream_->DeliverInitialHeaders(header_block, &frame_len)) |
| return frame_len; |
| read_headers_buffer_ = header_block; |
| - read_headers_callback_ = callback; |
| + SetCallback(callback, &read_headers_callback_); |
| return ERR_IO_PENDING; |
| } |
| @@ -126,14 +159,18 @@ int QuicChromiumClientStream::Handle::ReadBody( |
| IOBuffer* buffer, |
| int buffer_len, |
| const CompletionCallback& callback) { |
| + ScopedBoolSaver saver(&may_invoke_callbacks_, false); |
| + if (IsDoneReading()) |
| + return OK; |
| + |
| if (!stream_) |
| - return ERR_CONNECTION_CLOSED; |
| + return net_error_; |
| int rv = stream_->Read(buffer, buffer_len); |
| if (rv != ERR_IO_PENDING) |
| return rv; |
| - read_body_callback_ = callback; |
| + SetCallback(callback, &read_body_callback_); |
| read_body_buffer_ = buffer; |
| read_body_buffer_len_ = buffer_len; |
| return ERR_IO_PENDING; |
| @@ -142,40 +179,42 @@ int QuicChromiumClientStream::Handle::ReadBody( |
| int QuicChromiumClientStream::Handle::ReadTrailingHeaders( |
| SpdyHeaderBlock* header_block, |
| const CompletionCallback& callback) { |
| + ScopedBoolSaver saver(&may_invoke_callbacks_, false); |
| if (!stream_) |
| - return ERR_CONNECTION_CLOSED; |
| + return net_error_; |
| int frame_len = 0; |
| if (stream_->DeliverTrailingHeaders(header_block, &frame_len)) |
| return frame_len; |
| read_headers_buffer_ = header_block; |
| - read_headers_callback_ = callback; |
| + SetCallback(callback, &read_headers_callback_); |
| return ERR_IO_PENDING; |
| } |
| -size_t QuicChromiumClientStream::Handle::WriteHeaders( |
| +int QuicChromiumClientStream::Handle::WriteHeaders( |
| SpdyHeaderBlock header_block, |
| bool fin, |
| QuicReferenceCountedPointer<QuicAckListenerInterface> |
| ack_notifier_delegate) { |
| if (!stream_) |
| return 0; |
| - return stream_->WriteHeaders(std::move(header_block), fin, |
| - ack_notifier_delegate); |
| + return HandleIOComplete(stream_->WriteHeaders(std::move(header_block), fin, |
|
xunjieli
2017/05/31 00:25:22
Doesn't stream_->WriteHeaders() return a size_t ?
Ryan Hamilton
2017/05/31 02:49:56
What should we check?
|
| + ack_notifier_delegate)); |
| } |
| int QuicChromiumClientStream::Handle::WriteStreamData( |
| base::StringPiece data, |
| bool fin, |
| const CompletionCallback& callback) { |
| + ScopedBoolSaver saver(&may_invoke_callbacks_, false); |
| if (!stream_) |
| - return ERR_CONNECTION_CLOSED; |
| + return net_error_; |
| if (stream_->WriteStreamData(data, fin)) |
| - return OK; |
| + return HandleIOComplete(OK); |
| - write_callback_ = callback; |
| + SetCallback(callback, &write_callback_); |
| return ERR_IO_PENDING; |
| } |
| @@ -184,23 +223,25 @@ int QuicChromiumClientStream::Handle::WritevStreamData( |
| const std::vector<int>& lengths, |
| bool fin, |
| const CompletionCallback& callback) { |
| + ScopedBoolSaver saver(&may_invoke_callbacks_, false); |
| if (!stream_) |
| - return ERR_CONNECTION_CLOSED; |
| + return net_error_; |
| if (stream_->WritevStreamData(buffers, lengths, fin)) |
| - return OK; |
| + return HandleIOComplete(OK); |
| - write_callback_ = callback; |
| + SetCallback(callback, &write_callback_); |
| return ERR_IO_PENDING; |
| } |
| int QuicChromiumClientStream::Handle::Read(IOBuffer* buf, int buf_len) { |
| if (!stream_) |
| - return ERR_CONNECTION_CLOSED; |
| + return net_error_; |
| return stream_->Read(buf, buf_len); |
| } |
| void QuicChromiumClientStream::Handle::OnFinRead() { |
| + read_headers_callback_.Reset(); |
|
xunjieli
2017/05/31 00:25:22
Should we also reset the read trailers callback an
Ryan Hamilton
2017/05/31 02:49:56
There is no read trailers callback, only read_head
xunjieli
2017/06/01 15:30:32
Got it. Thanks for the details!
|
| if (stream_) |
| stream_->OnFinRead(); |
| } |
| @@ -300,11 +341,6 @@ bool QuicChromiumClientStream::Handle::can_migrate() { |
| return stream_->can_migrate(); |
| } |
| -QuicChromiumClientStream::Delegate* |
| -QuicChromiumClientStream::Handle::GetDelegate() { |
| - return delegate_; |
| -} |
| - |
| void QuicChromiumClientStream::Handle::SaveState() { |
| DCHECK(stream_); |
| fin_sent_ = stream_->fin_sent(); |
| @@ -320,6 +356,31 @@ void QuicChromiumClientStream::Handle::SaveState() { |
| priority_ = stream_->priority(); |
| } |
| +void QuicChromiumClientStream::Handle::SetCallback( |
| + CompletionCallback new_callback, |
| + CompletionCallback* callback) { |
| + CHECK(!may_invoke_callbacks_); |
|
xunjieli
2017/05/31 00:25:22
Should we add a TODO to convert this CHECK to DCHE
Ryan Hamilton
2017/05/31 02:49:56
Done.
|
| + *callback = new_callback; |
| +} |
| + |
| +void QuicChromiumClientStream::Handle::ResetAndRun(CompletionCallback* callback, |
| + int rv) { |
| + CHECK(may_invoke_callbacks_); |
| + ResetAndReturn(callback).Run(rv); |
| +} |
| + |
| +int QuicChromiumClientStream::Handle::HandleIOComplete(int rv) { |
| + if (rv < 0 || stream_) |
|
xunjieli
2017/05/31 00:25:22
Why do we need the |stream_| check? Could you add
Ryan Hamilton
2017/05/31 02:49:56
If |stream_| is still valid the stream has not bee
|
| + return rv; |
| + |
| + if (stream_error_ == QUIC_STREAM_NO_ERROR && |
| + connection_error_ == QUIC_NO_ERROR && fin_sent_ && fin_received_) { |
| + return rv; |
| + } |
| + |
| + return net_error_; |
| +} |
| + |
| QuicChromiumClientStream::QuicChromiumClientStream( |
| QuicStreamId id, |
| QuicClientSessionBase* session, |
| @@ -479,11 +540,10 @@ bool QuicChromiumClientStream::WritevStreamData( |
| } |
| std::unique_ptr<QuicChromiumClientStream::Handle> |
| -QuicChromiumClientStream::CreateHandle( |
| - QuicChromiumClientStream::Delegate* delegate) { |
| +QuicChromiumClientStream::CreateHandle() { |
| DCHECK(!handle_); |
| auto handle = std::unique_ptr<QuicChromiumClientStream::Handle>( |
| - new QuicChromiumClientStream::Handle(this, delegate)); |
| + new QuicChromiumClientStream::Handle(this)); |
| handle_ = handle.get(); |
| // Should this perhaps be via PostTask to make reasoning simpler? |
| @@ -552,7 +612,7 @@ void QuicChromiumClientStream::NotifyHandleOfTrailingHeadersAvailable() { |
| return; |
| DCHECK(headers_delivered_); |
| - // Post an async task to notify delegate of the FIN flag. |
| + // Post an async task to notify handle of the FIN flag. |
| NotifyHandleOfDataAvailableLater(); |
| handle_->OnTrailingHeadersAvailable(); |
| } |