Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(265)

Unified Diff: net/quic/chromium/quic_chromium_client_stream.cc

Issue 2908243002: Remove QuicChromiumClientStream::Delegate in favor of async methods. (Closed)
Patch Set: Rebase Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: net/quic/chromium/quic_chromium_client_stream.cc
diff --git a/net/quic/chromium/quic_chromium_client_stream.cc b/net/quic/chromium/quic_chromium_client_stream.cc
index 0014b40ee101d7408e522d25357722a4b40f5214..1bdb92d6fa71ee3faa022a0bf185e52988f93297 100644
--- a/net/quic/chromium/quic_chromium_client_stream.cc
+++ b/net/quic/chromium/quic_chromium_client_stream.cc
@@ -20,13 +20,30 @@
#include "net/quic/core/spdy_utils.h"
namespace net {
+namespace {
+// Sets a boolean to a value, and restores it to the previous value once
+// the saver goes out of scope.
+class ScopedBoolSaver {
+ public:
+ ScopedBoolSaver(bool* var, bool new_val) : var_(var), old_val_(*var) {
+ *var_ = new_val;
+ }
+
+ ~ScopedBoolSaver() { *var_ = old_val_; }
-QuicChromiumClientStream::Handle::Handle(QuicChromiumClientStream* stream,
- Delegate* delegate)
+ private:
+ bool* var_;
+ bool old_val_;
+};
+} // namespace
+
+QuicChromiumClientStream::Handle::Handle(QuicChromiumClientStream* stream)
: stream_(stream),
- delegate_(delegate),
+ may_invoke_callbacks_(true),
read_headers_buffer_(nullptr),
- read_body_buffer_len_(0) {
+ read_body_buffer_len_(0),
+ net_error_(ERR_UNEXPECTED),
+ weak_factory_(this) {
SaveState();
}
@@ -39,10 +56,6 @@ QuicChromiumClientStream::Handle::~Handle() {
}
}
-void QuicChromiumClientStream::Handle::ClearDelegate() {
- delegate_ = nullptr;
-}
-
void QuicChromiumClientStream::Handle::OnInitialHeadersAvailable() {
if (!read_headers_callback_)
return; // Wait for ReadInitialHeaders to be called.
@@ -51,7 +64,7 @@ void QuicChromiumClientStream::Handle::OnInitialHeadersAvailable() {
if (!stream_->DeliverInitialHeaders(read_headers_buffer_, &rv))
rv = ERR_QUIC_PROTOCOL_ERROR;
- ResetAndReturn(&read_headers_callback_).Run(rv);
+ ResetAndRun(&read_headers_callback_, rv);
}
void QuicChromiumClientStream::Handle::OnTrailingHeadersAvailable() {
@@ -62,7 +75,7 @@ void QuicChromiumClientStream::Handle::OnTrailingHeadersAvailable() {
if (!stream_->DeliverTrailingHeaders(read_headers_buffer_, &rv))
rv = ERR_QUIC_PROTOCOL_ERROR;
- ResetAndReturn(&read_headers_callback_).Run(rv);
+ ResetAndRun(&read_headers_callback_, rv);
}
void QuicChromiumClientStream::Handle::OnDataAvailable() {
@@ -75,50 +88,70 @@ void QuicChromiumClientStream::Handle::OnDataAvailable() {
read_body_buffer_ = nullptr;
read_body_buffer_len_ = 0;
- ResetAndReturn(&read_body_callback_).Run(rv);
+ ResetAndRun(&read_body_callback_, rv);
}
void QuicChromiumClientStream::Handle::OnCanWrite() {
if (!write_callback_)
return;
- base::ResetAndReturn(&write_callback_).Run(OK);
+ ResetAndRun(&write_callback_, OK);
}
void QuicChromiumClientStream::Handle::OnClose() {
- if (stream_)
- SaveState();
- stream_ = nullptr;
- if (delegate_) {
- auto* delegate = delegate_;
- delegate_ = nullptr;
- delegate->OnClose();
+ if (net_error_ == ERR_UNEXPECTED) {
+ if (stream_error() == QUIC_STREAM_NO_ERROR &&
+ connection_error() == QUIC_NO_ERROR && fin_sent() && fin_received()) {
+ net_error_ = ERR_CONNECTION_CLOSED;
+ } else {
+ net_error_ = ERR_QUIC_PROTOCOL_ERROR;
+ }
}
+ OnError(net_error_);
}
void QuicChromiumClientStream::Handle::OnError(int error) {
xunjieli 2017/06/01 15:30:33 Why is this named "OnError"? It's called when the
Ryan Hamilton 2017/06/01 23:20:29 As a public method, it's called from QuicChromiumC
+ net_error_ = error;
if (stream_)
SaveState();
stream_ = nullptr;
- if (delegate_) {
- auto* delegate = delegate_;
- delegate_ = nullptr;
- delegate->OnError(error);
+
+ // Post a task to invoke the callbacks to ensure that there is no reentrancy.
+ // A ScopedPacketBundler might cause an error which closes the stream under
+ // the call stack of the owner of the handle.
+ base::ThreadTaskRunnerHandle::Get()->PostTask(
+ FROM_HERE,
+ base::Bind(&QuicChromiumClientStream::Handle::InvokeCallbacksOnError,
+ weak_factory_.GetWeakPtr(), error));
+}
+
+void QuicChromiumClientStream::Handle::InvokeCallbacksOnError(int error) {
xunjieli 2017/06/01 15:30:33 nit: maybe s/InvokeCallbacksOnError/InvokeCallback
Ryan Hamilton 2017/06/01 23:20:29 Done.
+ // Invoking a callback may cause |this| to be deleted. If this happens, no
+ // more callbacks should be invoked. Guard against this by holding a WeakPtr
+ // to |this| and ensuring it's still valid.
+ auto guard(weak_factory_.GetWeakPtr());
+ for (auto* callback :
+ {&read_headers_callback_, &read_body_callback_, &write_callback_}) {
+ if (*callback)
xunjieli 2017/06/01 15:30:33 should this be if (!callback->is_null())?
Ryan Hamilton 2017/06/01 23:20:29 Callback has a bool operator() which is the same a
xunjieli 2017/06/02 12:43:41 Acknowledged.
+ ResetAndRun(callback, error);
+ if (!guard.get())
+ return;
}
}
int QuicChromiumClientStream::Handle::ReadInitialHeaders(
SpdyHeaderBlock* header_block,
const CompletionCallback& callback) {
+ ScopedBoolSaver saver(&may_invoke_callbacks_, false);
if (!stream_)
- return ERR_CONNECTION_CLOSED;
+ return net_error_;
int frame_len = 0;
if (stream_->DeliverInitialHeaders(header_block, &frame_len))
return frame_len;
read_headers_buffer_ = header_block;
- read_headers_callback_ = callback;
+ SetCallback(callback, &read_headers_callback_);
return ERR_IO_PENDING;
}
@@ -126,14 +159,18 @@ int QuicChromiumClientStream::Handle::ReadBody(
IOBuffer* buffer,
int buffer_len,
const CompletionCallback& callback) {
+ ScopedBoolSaver saver(&may_invoke_callbacks_, false);
+ if (IsDoneReading())
+ return OK;
+
if (!stream_)
- return ERR_CONNECTION_CLOSED;
+ return net_error_;
int rv = stream_->Read(buffer, buffer_len);
if (rv != ERR_IO_PENDING)
return rv;
- read_body_callback_ = callback;
+ SetCallback(callback, &read_body_callback_);
read_body_buffer_ = buffer;
read_body_buffer_len_ = buffer_len;
return ERR_IO_PENDING;
@@ -142,40 +179,42 @@ int QuicChromiumClientStream::Handle::ReadBody(
int QuicChromiumClientStream::Handle::ReadTrailingHeaders(
SpdyHeaderBlock* header_block,
const CompletionCallback& callback) {
+ ScopedBoolSaver saver(&may_invoke_callbacks_, false);
if (!stream_)
- return ERR_CONNECTION_CLOSED;
+ return net_error_;
int frame_len = 0;
if (stream_->DeliverTrailingHeaders(header_block, &frame_len))
return frame_len;
read_headers_buffer_ = header_block;
- read_headers_callback_ = callback;
+ SetCallback(callback, &read_headers_callback_);
return ERR_IO_PENDING;
}
-size_t QuicChromiumClientStream::Handle::WriteHeaders(
+int QuicChromiumClientStream::Handle::WriteHeaders(
SpdyHeaderBlock header_block,
bool fin,
QuicReferenceCountedPointer<QuicAckListenerInterface>
ack_notifier_delegate) {
if (!stream_)
return 0;
- return stream_->WriteHeaders(std::move(header_block), fin,
- ack_notifier_delegate);
+ return HandleIOComplete(stream_->WriteHeaders(std::move(header_block), fin,
xunjieli 2017/06/01 15:30:33 Sorry, I didn't make myself clear last time. I thi
Ryan Hamilton 2017/06/01 23:20:29 well, that's only a bug if the size_t is larger th
xunjieli 2017/06/02 12:43:41 Acknowledged.
+ ack_notifier_delegate));
}
int QuicChromiumClientStream::Handle::WriteStreamData(
base::StringPiece data,
bool fin,
const CompletionCallback& callback) {
+ ScopedBoolSaver saver(&may_invoke_callbacks_, false);
if (!stream_)
- return ERR_CONNECTION_CLOSED;
+ return net_error_;
if (stream_->WriteStreamData(data, fin))
- return OK;
+ return HandleIOComplete(OK);
- write_callback_ = callback;
+ SetCallback(callback, &write_callback_);
return ERR_IO_PENDING;
}
@@ -184,23 +223,25 @@ int QuicChromiumClientStream::Handle::WritevStreamData(
const std::vector<int>& lengths,
bool fin,
const CompletionCallback& callback) {
+ ScopedBoolSaver saver(&may_invoke_callbacks_, false);
if (!stream_)
- return ERR_CONNECTION_CLOSED;
+ return net_error_;
if (stream_->WritevStreamData(buffers, lengths, fin))
- return OK;
+ return HandleIOComplete(OK);
- write_callback_ = callback;
+ SetCallback(callback, &write_callback_);
return ERR_IO_PENDING;
}
int QuicChromiumClientStream::Handle::Read(IOBuffer* buf, int buf_len) {
if (!stream_)
- return ERR_CONNECTION_CLOSED;
+ return net_error_;
return stream_->Read(buf, buf_len);
}
void QuicChromiumClientStream::Handle::OnFinRead() {
+ read_headers_callback_.Reset();
if (stream_)
stream_->OnFinRead();
}
@@ -300,11 +341,6 @@ bool QuicChromiumClientStream::Handle::can_migrate() {
return stream_->can_migrate();
}
-QuicChromiumClientStream::Delegate*
-QuicChromiumClientStream::Handle::GetDelegate() {
- return delegate_;
-}
-
void QuicChromiumClientStream::Handle::SaveState() {
DCHECK(stream_);
fin_sent_ = stream_->fin_sent();
@@ -320,6 +356,37 @@ void QuicChromiumClientStream::Handle::SaveState() {
priority_ = stream_->priority();
}
+void QuicChromiumClientStream::Handle::SetCallback(
+ CompletionCallback new_callback,
xunjieli 2017/06/01 15:30:33 const CompletionCallback& ?
Ryan Hamilton 2017/06/01 23:20:29 Good point! Done.
+ CompletionCallback* callback) {
+ // TODO(rch): Convert this to a DCHECK once we ensure the API is stable and
+ // bug free.
+ CHECK(!may_invoke_callbacks_);
+ *callback = new_callback;
+}
+
+void QuicChromiumClientStream::Handle::ResetAndRun(CompletionCallback* callback,
+ int rv) {
+ // TODO(rch): Convert this to a DCHECK once we ensure the API is stable and
+ // bug free.
+ CHECK(may_invoke_callbacks_);
+ ResetAndReturn(callback).Run(rv);
+}
+
+int QuicChromiumClientStream::Handle::HandleIOComplete(int rv) {
+ // If |stream_| is still valid the stream has not been closed. If the stream
+ // has not been closed, then just return |rv|.
+ if (rv < 0 || stream_)
+ return rv;
+
+ if (stream_error_ == QUIC_STREAM_NO_ERROR &&
+ connection_error_ == QUIC_NO_ERROR && fin_sent_ && fin_received_) {
+ return rv;
+ }
+
+ return net_error_;
+}
+
QuicChromiumClientStream::QuicChromiumClientStream(
QuicStreamId id,
QuicClientSessionBase* session,
@@ -479,11 +546,10 @@ bool QuicChromiumClientStream::WritevStreamData(
}
std::unique_ptr<QuicChromiumClientStream::Handle>
-QuicChromiumClientStream::CreateHandle(
- QuicChromiumClientStream::Delegate* delegate) {
+QuicChromiumClientStream::CreateHandle() {
DCHECK(!handle_);
auto handle = std::unique_ptr<QuicChromiumClientStream::Handle>(
- new QuicChromiumClientStream::Handle(this, delegate));
+ new QuicChromiumClientStream::Handle(this));
handle_ = handle.get();
// Should this perhaps be via PostTask to make reasoning simpler?
@@ -552,7 +618,7 @@ void QuicChromiumClientStream::NotifyHandleOfTrailingHeadersAvailable() {
return;
DCHECK(headers_delivered_);
- // Post an async task to notify delegate of the FIN flag.
+ // Post an async task to notify handle of the FIN flag.
NotifyHandleOfDataAvailableLater();
handle_->OnTrailingHeadersAvailable();
}

Powered by Google App Engine
This is Rietveld 408576698