Index: net/quic/chromium/quic_chromium_client_stream.cc |
diff --git a/net/quic/chromium/quic_chromium_client_stream.cc b/net/quic/chromium/quic_chromium_client_stream.cc |
index 0014b40ee101d7408e522d25357722a4b40f5214..029cd4d87a041444f3502b720fcb7ea326fb9564 100644 |
--- a/net/quic/chromium/quic_chromium_client_stream.cc |
+++ b/net/quic/chromium/quic_chromium_client_stream.cc |
@@ -20,13 +20,30 @@ |
#include "net/quic/core/spdy_utils.h" |
namespace net { |
+namespace { |
+// Sets a boolean to a value, and restores it to the previous value once |
+// the saver goes out of scope. |
+class ScopedBoolSaver { |
+ public: |
+ ScopedBoolSaver(bool* var, bool new_val) : var_(var), old_val_(*var) { |
+ *var_ = new_val; |
+ } |
+ |
+ ~ScopedBoolSaver() { *var_ = old_val_; } |
-QuicChromiumClientStream::Handle::Handle(QuicChromiumClientStream* stream, |
- Delegate* delegate) |
+ private: |
+ bool* var_; |
+ bool old_val_; |
+}; |
+} // namespace |
+ |
+QuicChromiumClientStream::Handle::Handle(QuicChromiumClientStream* stream) |
: stream_(stream), |
- delegate_(delegate), |
+ may_invoke_callbacks_(true), |
read_headers_buffer_(nullptr), |
- read_body_buffer_len_(0) { |
+ read_body_buffer_len_(0), |
+ net_error_(ERR_UNEXPECTED), |
+ weak_factory_(this) { |
SaveState(); |
} |
@@ -39,10 +56,6 @@ QuicChromiumClientStream::Handle::~Handle() { |
} |
} |
-void QuicChromiumClientStream::Handle::ClearDelegate() { |
- delegate_ = nullptr; |
-} |
- |
void QuicChromiumClientStream::Handle::OnInitialHeadersAvailable() { |
if (!read_headers_callback_) |
return; // Wait for ReadInitialHeaders to be called. |
@@ -51,7 +64,7 @@ void QuicChromiumClientStream::Handle::OnInitialHeadersAvailable() { |
if (!stream_->DeliverInitialHeaders(read_headers_buffer_, &rv)) |
rv = ERR_QUIC_PROTOCOL_ERROR; |
- ResetAndReturn(&read_headers_callback_).Run(rv); |
+ ResetAndRun(&read_headers_callback_, rv); |
} |
void QuicChromiumClientStream::Handle::OnTrailingHeadersAvailable() { |
@@ -62,7 +75,7 @@ void QuicChromiumClientStream::Handle::OnTrailingHeadersAvailable() { |
if (!stream_->DeliverTrailingHeaders(read_headers_buffer_, &rv)) |
rv = ERR_QUIC_PROTOCOL_ERROR; |
- ResetAndReturn(&read_headers_callback_).Run(rv); |
+ ResetAndRun(&read_headers_callback_, rv); |
} |
void QuicChromiumClientStream::Handle::OnDataAvailable() { |
@@ -75,50 +88,70 @@ void QuicChromiumClientStream::Handle::OnDataAvailable() { |
read_body_buffer_ = nullptr; |
read_body_buffer_len_ = 0; |
- ResetAndReturn(&read_body_callback_).Run(rv); |
+ ResetAndRun(&read_body_callback_, rv); |
} |
void QuicChromiumClientStream::Handle::OnCanWrite() { |
if (!write_callback_) |
return; |
- base::ResetAndReturn(&write_callback_).Run(OK); |
+ ResetAndRun(&write_callback_, OK); |
} |
void QuicChromiumClientStream::Handle::OnClose() { |
- if (stream_) |
- SaveState(); |
- stream_ = nullptr; |
- if (delegate_) { |
- auto* delegate = delegate_; |
- delegate_ = nullptr; |
- delegate->OnClose(); |
+ if (net_error_ == ERR_UNEXPECTED) { |
+ if (stream_error() == QUIC_STREAM_NO_ERROR && |
+ connection_error() == QUIC_NO_ERROR && fin_sent() && fin_received()) { |
+ net_error_ = ERR_CONNECTION_CLOSED; |
+ } else { |
+ net_error_ = ERR_QUIC_PROTOCOL_ERROR; |
+ } |
} |
+ OnError(net_error_); |
} |
void QuicChromiumClientStream::Handle::OnError(int error) { |
+ net_error_ = error; |
if (stream_) |
SaveState(); |
stream_ = nullptr; |
- if (delegate_) { |
- auto* delegate = delegate_; |
- delegate_ = nullptr; |
- delegate->OnError(error); |
+ |
+ // Post a task to invoke the callbacks to ensure that there is no reentrancy. |
+ // A ScopedPacketBundler might cause an error which closes the stream under |
+ // the call stack of the owner of the handle. |
+ base::ThreadTaskRunnerHandle::Get()->PostTask( |
+ FROM_HERE, |
+ base::Bind(&QuicChromiumClientStream::Handle::InvokeCallbacksOnClose, |
+ weak_factory_.GetWeakPtr(), error)); |
+} |
+ |
+void QuicChromiumClientStream::Handle::InvokeCallbacksOnClose(int error) { |
+ // Invoking a callback may cause |this| to be deleted. If this happens, no |
+ // more callbacks should be invoked. Guard against this by holding a WeakPtr |
+ // to |this| and ensuring it's still valid. |
+ auto guard(weak_factory_.GetWeakPtr()); |
+ for (auto* callback : |
+ {&read_headers_callback_, &read_body_callback_, &write_callback_}) { |
+ if (*callback) |
+ ResetAndRun(callback, error); |
+ if (!guard.get()) |
+ return; |
} |
} |
int QuicChromiumClientStream::Handle::ReadInitialHeaders( |
SpdyHeaderBlock* header_block, |
const CompletionCallback& callback) { |
+ ScopedBoolSaver saver(&may_invoke_callbacks_, false); |
if (!stream_) |
- return ERR_CONNECTION_CLOSED; |
+ return net_error_; |
int frame_len = 0; |
if (stream_->DeliverInitialHeaders(header_block, &frame_len)) |
return frame_len; |
read_headers_buffer_ = header_block; |
- read_headers_callback_ = callback; |
+ SetCallback(callback, &read_headers_callback_); |
return ERR_IO_PENDING; |
} |
@@ -126,14 +159,18 @@ int QuicChromiumClientStream::Handle::ReadBody( |
IOBuffer* buffer, |
int buffer_len, |
const CompletionCallback& callback) { |
+ ScopedBoolSaver saver(&may_invoke_callbacks_, false); |
+ if (IsDoneReading()) |
+ return OK; |
+ |
if (!stream_) |
- return ERR_CONNECTION_CLOSED; |
+ return net_error_; |
int rv = stream_->Read(buffer, buffer_len); |
if (rv != ERR_IO_PENDING) |
return rv; |
- read_body_callback_ = callback; |
+ SetCallback(callback, &read_body_callback_); |
read_body_buffer_ = buffer; |
read_body_buffer_len_ = buffer_len; |
return ERR_IO_PENDING; |
@@ -142,40 +179,42 @@ int QuicChromiumClientStream::Handle::ReadBody( |
int QuicChromiumClientStream::Handle::ReadTrailingHeaders( |
SpdyHeaderBlock* header_block, |
const CompletionCallback& callback) { |
+ ScopedBoolSaver saver(&may_invoke_callbacks_, false); |
if (!stream_) |
- return ERR_CONNECTION_CLOSED; |
+ return net_error_; |
int frame_len = 0; |
if (stream_->DeliverTrailingHeaders(header_block, &frame_len)) |
return frame_len; |
read_headers_buffer_ = header_block; |
- read_headers_callback_ = callback; |
+ SetCallback(callback, &read_headers_callback_); |
return ERR_IO_PENDING; |
} |
-size_t QuicChromiumClientStream::Handle::WriteHeaders( |
+int QuicChromiumClientStream::Handle::WriteHeaders( |
SpdyHeaderBlock header_block, |
bool fin, |
QuicReferenceCountedPointer<QuicAckListenerInterface> |
ack_notifier_delegate) { |
if (!stream_) |
return 0; |
- return stream_->WriteHeaders(std::move(header_block), fin, |
- ack_notifier_delegate); |
+ return HandleIOComplete(stream_->WriteHeaders(std::move(header_block), fin, |
+ ack_notifier_delegate)); |
} |
int QuicChromiumClientStream::Handle::WriteStreamData( |
base::StringPiece data, |
bool fin, |
const CompletionCallback& callback) { |
+ ScopedBoolSaver saver(&may_invoke_callbacks_, false); |
if (!stream_) |
- return ERR_CONNECTION_CLOSED; |
+ return net_error_; |
if (stream_->WriteStreamData(data, fin)) |
- return OK; |
+ return HandleIOComplete(OK); |
- write_callback_ = callback; |
+ SetCallback(callback, &write_callback_); |
return ERR_IO_PENDING; |
} |
@@ -184,23 +223,25 @@ int QuicChromiumClientStream::Handle::WritevStreamData( |
const std::vector<int>& lengths, |
bool fin, |
const CompletionCallback& callback) { |
+ ScopedBoolSaver saver(&may_invoke_callbacks_, false); |
if (!stream_) |
- return ERR_CONNECTION_CLOSED; |
+ return net_error_; |
if (stream_->WritevStreamData(buffers, lengths, fin)) |
- return OK; |
+ return HandleIOComplete(OK); |
- write_callback_ = callback; |
+ SetCallback(callback, &write_callback_); |
return ERR_IO_PENDING; |
} |
int QuicChromiumClientStream::Handle::Read(IOBuffer* buf, int buf_len) { |
if (!stream_) |
- return ERR_CONNECTION_CLOSED; |
+ return net_error_; |
return stream_->Read(buf, buf_len); |
} |
void QuicChromiumClientStream::Handle::OnFinRead() { |
+ read_headers_callback_.Reset(); |
if (stream_) |
stream_->OnFinRead(); |
} |
@@ -300,11 +341,6 @@ bool QuicChromiumClientStream::Handle::can_migrate() { |
return stream_->can_migrate(); |
} |
-QuicChromiumClientStream::Delegate* |
-QuicChromiumClientStream::Handle::GetDelegate() { |
- return delegate_; |
-} |
- |
void QuicChromiumClientStream::Handle::SaveState() { |
DCHECK(stream_); |
fin_sent_ = stream_->fin_sent(); |
@@ -320,6 +356,37 @@ void QuicChromiumClientStream::Handle::SaveState() { |
priority_ = stream_->priority(); |
} |
+void QuicChromiumClientStream::Handle::SetCallback( |
+ const CompletionCallback& new_callback, |
+ CompletionCallback* callback) { |
+ // TODO(rch): Convert this to a DCHECK once we ensure the API is stable and |
+ // bug free. |
+ CHECK(!may_invoke_callbacks_); |
+ *callback = new_callback; |
+} |
+ |
+void QuicChromiumClientStream::Handle::ResetAndRun(CompletionCallback* callback, |
+ int rv) { |
+ // TODO(rch): Convert this to a DCHECK once we ensure the API is stable and |
+ // bug free. |
+ CHECK(may_invoke_callbacks_); |
+ ResetAndReturn(callback).Run(rv); |
+} |
+ |
+int QuicChromiumClientStream::Handle::HandleIOComplete(int rv) { |
+ // If |stream_| is still valid the stream has not been closed. If the stream |
+ // has not been closed, then just return |rv|. |
+ if (rv < 0 || stream_) |
+ return rv; |
+ |
+ if (stream_error_ == QUIC_STREAM_NO_ERROR && |
+ connection_error_ == QUIC_NO_ERROR && fin_sent_ && fin_received_) { |
+ return rv; |
+ } |
+ |
+ return net_error_; |
+} |
+ |
QuicChromiumClientStream::QuicChromiumClientStream( |
QuicStreamId id, |
QuicClientSessionBase* session, |
@@ -479,11 +546,10 @@ bool QuicChromiumClientStream::WritevStreamData( |
} |
std::unique_ptr<QuicChromiumClientStream::Handle> |
-QuicChromiumClientStream::CreateHandle( |
- QuicChromiumClientStream::Delegate* delegate) { |
+QuicChromiumClientStream::CreateHandle() { |
DCHECK(!handle_); |
auto handle = std::unique_ptr<QuicChromiumClientStream::Handle>( |
- new QuicChromiumClientStream::Handle(this, delegate)); |
+ new QuicChromiumClientStream::Handle(this)); |
handle_ = handle.get(); |
// Should this perhaps be via PostTask to make reasoning simpler? |
@@ -552,7 +618,7 @@ void QuicChromiumClientStream::NotifyHandleOfTrailingHeadersAvailable() { |
return; |
DCHECK(headers_delivered_); |
- // Post an async task to notify delegate of the FIN flag. |
+ // Post an async task to notify handle of the FIN flag. |
NotifyHandleOfDataAvailableLater(); |
handle_->OnTrailingHeadersAvailable(); |
} |