Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(208)

Unified Diff: net/spdy/spdy_session.cc

Issue 13009012: [SPDY] Refactor SpdySession's write queue (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Fix use-after-free (crbug.com/230259) Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « net/spdy/spdy_session.h ('k') | net/spdy/spdy_session_spdy2_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: net/spdy/spdy_session.cc
diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc
index 9943190e72dd5117fcde19b242da7d05b02941c6..e34c16f78eb427c8ce48c17e306531cc4d96ca53 100644
--- a/net/spdy/spdy_session.cc
+++ b/net/spdy/spdy_session.cc
@@ -32,6 +32,7 @@
#include "net/http/http_server_properties.h"
#include "net/spdy/spdy_credential_builder.h"
#include "net/spdy/spdy_frame_builder.h"
+#include "net/spdy/spdy_frame_producer.h"
#include "net/spdy/spdy_http_utils.h"
#include "net/spdy/spdy_protocol.h"
#include "net/spdy/spdy_session_pool.h"
@@ -286,28 +287,6 @@ void SpdyStreamRequest::Reset() {
callback_.Reset();
}
-// static
-void SpdySession::SpdyIOBufferProducer::ActivateStream(
- SpdySession* spdy_session,
- SpdyStream* spdy_stream) {
- spdy_session->ActivateStream(spdy_stream);
-}
-
-// static
-SpdyIOBuffer* SpdySession::SpdyIOBufferProducer::CreateIOBuffer(
- SpdyFrame* frame,
- RequestPriority priority,
- SpdyStream* stream) {
- size_t size = frame->size();
- DCHECK_GT(size, 0u);
-
- // TODO(mbelshe): We have too much copying of data here.
- IOBufferWithSize* buffer = new IOBufferWithSize(size);
- memcpy(buffer->data(), frame->data(), size);
-
- return new SpdyIOBuffer(buffer, size, priority, stream);
-}
-
SpdySession::SpdySession(const HostPortProxyPair& host_port_proxy_pair,
SpdySessionPool* spdy_session_pool,
HttpServerProperties* http_server_properties,
@@ -404,7 +383,7 @@ SpdySession::~SpdySession() {
DCHECK_EQ(0u, num_active_streams());
DCHECK_EQ(0u, num_unclaimed_pushed_streams());
- for (int i = NUM_PRIORITIES - 1; i >= MINIMUM_PRIORITY; --i) {
+ for (int i = 0; i < NUM_PRIORITIES; ++i) {
DCHECK(pending_create_stream_queues_[i].empty());
}
DCHECK(pending_stream_request_completions_.empty());
@@ -499,13 +478,6 @@ bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
ssl_info.cert->VerifyNameMatch(domain);
}
-void SpdySession::SetStreamHasWriteAvailable(SpdyStream* stream,
- SpdyIOBufferProducer* producer) {
- write_queue_.push(producer);
- stream_producers_[producer] = stream;
- WriteSocketLater();
-}
-
int SpdySession::GetPushStream(
const GURL& url,
scoped_refptr<SpdyStream>* stream,
@@ -659,7 +631,13 @@ int SpdySession::GetProtocolVersion() const {
return buffered_spdy_framer_->protocol_version();
}
-SpdyFrame* SpdySession::CreateSynStream(
+void SpdySession::EnqueueStreamWrite(
+ SpdyStream* stream,
+ scoped_ptr<SpdyFrameProducer> producer) {
+ EnqueueWrite(stream->priority(), producer.Pass(), stream);
+}
+
+scoped_ptr<SpdyFrame> SpdySession::CreateSynStream(
SpdyStreamId stream_id,
RequestPriority priority,
uint8 credential_slot,
@@ -691,15 +669,16 @@ SpdyFrame* SpdySession::CreateSynStream(
stream_id, 0));
}
- return syn_frame.release();
+ return syn_frame.Pass();
}
-SpdyFrame* SpdySession::CreateCredentialFrame(
+int SpdySession::CreateCredentialFrame(
const std::string& origin,
SSLClientCertType type,
const std::string& key,
const std::string& cert,
- RequestPriority priority) {
+ RequestPriority priority,
+ scoped_ptr<SpdyFrame>* credential_frame) {
DCHECK(is_secure_);
SSLClientSocket* ssl_socket = GetSSLClientSocket();
DCHECK(ssl_socket);
@@ -711,12 +690,12 @@ SpdyFrame* SpdySession::CreateCredentialFrame(
size_t slot = credential_state_.SetHasCredential(GURL(origin));
int rv = SpdyCredentialBuilder::Build(tls_unique, type, key, cert, slot,
&credential);
- DCHECK_EQ(OK, rv);
+ DCHECK_NE(rv, ERR_IO_PENDING);
if (rv != OK)
- return NULL;
+ return rv;
DCHECK(buffered_spdy_framer_.get());
- scoped_ptr<SpdyFrame> credential_frame(
+ credential_frame->reset(
buffered_spdy_framer_->CreateCredentialFrame(credential));
if (net_log().IsLoggingAllEvents()) {
@@ -724,10 +703,10 @@ SpdyFrame* SpdySession::CreateCredentialFrame(
NetLog::TYPE_SPDY_SESSION_SEND_CREDENTIAL,
base::Bind(&NetLogSpdyCredentialCallback, credential.slot, &origin));
}
- return credential_frame.release();
+ return OK;
}
-SpdyFrame* SpdySession::CreateHeadersFrame(
+scoped_ptr<SpdyFrame> SpdySession::CreateHeadersFrame(
SpdyStreamId stream_id,
const SpdyHeaderBlock& headers,
SpdyControlFlags flags) {
@@ -749,12 +728,13 @@ SpdyFrame* SpdySession::CreateHeadersFrame(
&headers, fin, /*unidirectional=*/false,
stream_id, 0));
}
- return frame.release();
+ return frame.Pass();
}
-SpdyFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id,
- net::IOBuffer* data, int len,
- SpdyDataFlags flags) {
+scoped_ptr<SpdyFrame> SpdySession::CreateDataFrame(SpdyStreamId stream_id,
+ net::IOBuffer* data,
+ int len,
+ SpdyDataFlags flags) {
// Find our stream
CHECK(IsStreamActive(stream_id));
scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
@@ -762,7 +742,7 @@ SpdyFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id,
if (len < 0) {
NOTREACHED();
- return NULL;
+ return scoped_ptr<SpdyFrame>();
}
if (len > kMaxSpdyFrameChunkSize) {
@@ -785,7 +765,7 @@ SpdyFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id,
net_log().AddEvent(
NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_STREAM_SEND_WINDOW,
NetLog::IntegerCallback("stream_id", stream_id));
- return NULL;
+ return scoped_ptr<SpdyFrame>();
}
if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
effective_window_size =
@@ -797,7 +777,7 @@ SpdyFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id,
net_log().AddEvent(
NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_SESSION_SEND_WINDOW,
NetLog::IntegerCallback("stream_id", stream_id));
- return NULL;
+ return scoped_ptr<SpdyFrame>();
}
}
@@ -828,7 +808,7 @@ SpdyFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id,
buffered_spdy_framer_->CreateDataFrame(
stream_id, data->data(), static_cast<uint32>(len), flags));
- return frame.release();
+ return frame.Pass();
}
void SpdySession::CloseStream(SpdyStreamId stream_id, int status) {
@@ -863,7 +843,7 @@ void SpdySession::ResetStream(SpdyStreamId stream_id,
scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
priority = stream->priority();
}
- QueueFrame(rst_frame.release(), priority);
+ EnqueueSessionWrite(priority, rst_frame.Pass());
RecordProtocolErrorHistogram(
static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID));
DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR);
@@ -930,57 +910,59 @@ void SpdySession::OnReadComplete(int bytes_read) {
}
void SpdySession::OnWriteComplete(int result) {
+ // Releasing the in-flight write can have a side-effect of dropping
+ // the last reference to |this|. Hold a reference through this
+ // function.
+ scoped_refptr<SpdySession> self(this);
+
DCHECK(write_pending_);
- DCHECK(in_flight_write_.size());
+ DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0);
last_activity_time_ = base::TimeTicks::Now();
write_pending_ = false;
- scoped_refptr<SpdyStream> stream = in_flight_write_.stream();
-
- if (result >= 0) {
- // It should not be possible to have written more bytes than our
- // in_flight_write_.
- DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining());
-
- in_flight_write_.buffer()->DidConsume(result);
-
- // We only notify the stream when we've fully written the pending frame.
- if (!in_flight_write_.buffer()->BytesRemaining()) {
- if (stream) {
- // Report the number of bytes written to the caller, but exclude the
- // frame size overhead. NOTE: if this frame was compressed the
- // reported bytes written is the compressed size, not the original
- // size.
- if (result > 0) {
- result = in_flight_write_.buffer()->size();
- DCHECK_GE(result,
- static_cast<int>(
- buffered_spdy_framer_->GetControlFrameHeaderSize()));
- result -= buffered_spdy_framer_->GetControlFrameHeaderSize();
- }
-
- // It is possible that the stream was cancelled while we were writing
- // to the socket.
- if (!stream->cancelled())
- stream->OnWriteComplete(result);
- }
+ if (result < 0) {
+ in_flight_write_.Release();
+ CloseSessionOnError(static_cast<net::Error>(result), true, "Write error");
+ return;
+ }
- // Cleanup the write which just completed.
- in_flight_write_.release();
- }
+ // It should not be possible to have written more bytes than our
+ // in_flight_write_.
+ DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining());
- // Write more data. We're already in a continuation, so we can
- // go ahead and write it immediately (without going back to the
- // message loop).
- WriteSocketLater();
- } else {
- in_flight_write_.release();
+ in_flight_write_.buffer()->DidConsume(result);
- // The stream is now errored. Close it down.
- CloseSessionOnError(
- static_cast<net::Error>(result), true, "The stream has errored.");
+ // We only notify the stream when we've fully written the pending frame.
+ if (in_flight_write_.buffer()->BytesRemaining() == 0) {
+ DCHECK_GT(result, 0);
+
+ scoped_refptr<SpdyStream> stream = in_flight_write_.stream();
+
+ // It is possible that the stream was cancelled while we were writing
+ // to the socket.
+ if (stream && !stream->cancelled()) {
+ // Report the number of bytes written to the caller, but exclude the
+ // frame size overhead. NOTE: if this frame was compressed the
+ // reported bytes written is the compressed size, not the original
+ // size.
+ result = in_flight_write_.buffer()->size();
+ DCHECK_GE(result,
+ static_cast<int>(
+ buffered_spdy_framer_->GetControlFrameHeaderSize()));
+ result -= buffered_spdy_framer_->GetControlFrameHeaderSize();
+
+ stream->OnWriteComplete(result);
+ }
+
+ // Cleanup the write which just completed.
+ in_flight_write_.Release();
}
+
+ // Write more data. We're already in a continuation, so we can go
+ // ahead and write it immediately (without going back to the message
+ // loop).
+ WriteSocketLater();
}
net::Error SpdySession::ReadSocket() {
@@ -997,7 +979,7 @@ net::Error SpdySession::ReadSocket() {
int bytes_read = connection_->socket()->Read(
read_buffer_.get(),
kReadBufferSize,
- base::Bind(&SpdySession::OnReadComplete, base::Unretained(this)));
+ base::Bind(&SpdySession::OnReadComplete, weak_factory_.GetWeakPtr()));
switch (bytes_read) {
case 0:
// Socket is closed!
@@ -1051,24 +1033,39 @@ void SpdySession::WriteSocket() {
// Loop sending frames until we've sent everything or until the write
// returns error (or ERR_IO_PENDING).
DCHECK(buffered_spdy_framer_.get());
- while (in_flight_write_.buffer() || !write_queue_.empty()) {
- if (!in_flight_write_.buffer()) {
- // Grab the next SpdyBuffer to send.
- scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top());
- write_queue_.pop();
- scoped_ptr<SpdyIOBuffer> buffer(producer->ProduceNextBuffer(this));
- stream_producers_.erase(producer.get());
+ while (true) {
+ if (in_flight_write_.buffer()) {
+ DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0);
+ } else {
+ // Grab the next frame to send.
+ scoped_ptr<SpdyFrameProducer> producer;
+ scoped_refptr<SpdyStream> stream;
+ if (!write_queue_.Dequeue(&producer, &stream))
+ break;
+
// It is possible that a stream had data to write, but a
// WINDOW_UPDATE frame has been received which made that
// stream no longer writable.
// TODO(rch): consider handling that case by removing the
// stream from the writable queue?
- if (buffer == NULL)
+ if (stream.get() && stream->cancelled())
continue;
- in_flight_write_ = *buffer;
- } else {
- DCHECK(in_flight_write_.buffer()->BytesRemaining());
+ if (stream.get() && stream->stream_id() == 0)
+ ActivateStream(stream);
+
+ scoped_ptr<SpdyFrame> frame = producer->ProduceFrame();
+ if (!frame) {
+ NOTREACHED();
+ continue;
+ }
+ DCHECK_GT(frame->size(), 0u);
+
+ // TODO(mbelshe): We have too much copying of data here.
+ scoped_refptr<IOBufferWithSize> buffer =
+ new IOBufferWithSize(frame->size());
+ memcpy(buffer->data(), frame->data(), frame->size());
+ in_flight_write_ = SpdyIOBuffer(buffer, frame->size(), stream);
}
write_pending_ = true;
@@ -1126,12 +1123,7 @@ void SpdySession::CloseAllStreams(net::Error status) {
stream->OnClose(status);
}
- // We also need to drain the queue.
- while (!write_queue_.empty()) {
- scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top());
- write_queue_.pop();
- stream_producers_.erase(producer.get());
- }
+ write_queue_.Clear();
}
void SpdySession::LogAbandonedStream(const scoped_refptr<SpdyStream>& stream,
@@ -1257,33 +1249,18 @@ int SpdySession::GetLocalAddress(IPEndPoint* address) const {
return rv;
}
-class SimpleSpdyIOBufferProducer : public SpdySession::SpdyIOBufferProducer {
- public:
- SimpleSpdyIOBufferProducer(SpdyFrame* frame,
- RequestPriority priority)
- : frame_(frame),
- priority_(priority) {
- }
-
- virtual RequestPriority GetPriority() const OVERRIDE {
- return priority_;
- }
-
- virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) OVERRIDE {
- return SpdySession::SpdyIOBufferProducer::CreateIOBuffer(
- frame_.get(), priority_, NULL);
- }
-
- private:
- scoped_ptr<SpdyFrame> frame_;
- RequestPriority priority_;
-};
+void SpdySession::EnqueueSessionWrite(RequestPriority priority,
+ scoped_ptr<SpdyFrame> frame) {
+ EnqueueWrite(
+ priority,
+ scoped_ptr<SpdyFrameProducer>(new SimpleFrameProducer(frame.Pass())),
+ NULL);
+}
-void SpdySession::QueueFrame(SpdyFrame* frame,
- RequestPriority priority) {
- SimpleSpdyIOBufferProducer* producer =
- new SimpleSpdyIOBufferProducer(frame, priority);
- write_queue_.push(producer);
+void SpdySession::EnqueueWrite(RequestPriority priority,
+ scoped_ptr<SpdyFrameProducer> producer,
+ const scoped_refptr<SpdyStream>& stream) {
+ write_queue_.Enqueue(priority, producer.Pass(), stream);
WriteSocketLater();
}
@@ -1303,8 +1280,7 @@ void SpdySession::DeleteStream(SpdyStreamId id, int status) {
// the stream in the unclaimed_pushed_streams_ list. However, if
// the stream is errored out, clean it up entirely.
if (status != OK) {
- PushedStreamMap::iterator it;
- for (it = unclaimed_pushed_streams_.begin();
+ for (PushedStreamMap::iterator it = unclaimed_pushed_streams_.begin();
it != unclaimed_pushed_streams_.end(); ++it) {
scoped_refptr<SpdyStream> curr = it->second.first;
if (id == curr->stream_id()) {
@@ -1315,29 +1291,17 @@ void SpdySession::DeleteStream(SpdyStreamId id, int status) {
}
// The stream might have been deleted.
- ActiveStreamMap::iterator it2 = active_streams_.find(id);
- if (it2 == active_streams_.end())
+ ActiveStreamMap::iterator it = active_streams_.find(id);
+ if (it == active_streams_.end())
return;
- // Possibly remove from the write queue.
- WriteQueue old = write_queue_;
- write_queue_ = WriteQueue();
- while (!old.empty()) {
- scoped_ptr<SpdyIOBufferProducer> producer(old.top());
- old.pop();
- StreamProducerMap::iterator it = stream_producers_.find(producer.get());
- if (it == stream_producers_.end() || it->second->stream_id() != id) {
- write_queue_.push(producer.release());
- } else {
- stream_producers_.erase(producer.get());
- producer.reset(NULL);
- }
- }
+ const scoped_refptr<SpdyStream> stream(it->second);
+ active_streams_.erase(it);
+ DCHECK(stream);
+
+ write_queue_.RemovePendingWritesForStream(stream);
// If this is an active stream, call the callback.
- const scoped_refptr<SpdyStream> stream(it2->second);
- active_streams_.erase(it2);
- DCHECK(stream);
stream->OnClose(status);
ProcessPendingStreamRequests();
}
@@ -1921,7 +1885,7 @@ void SpdySession::SendSettings(const SettingsMap& settings) {
scoped_ptr<SpdyFrame> settings_frame(
buffered_spdy_framer_->CreateSettings(settings));
sent_settings_ = true;
- QueueFrame(settings_frame.release(), HIGHEST);
+ EnqueueSessionWrite(HIGHEST, settings_frame.Pass());
}
void SpdySession::HandleSetting(uint32 id, uint32 value) {
@@ -2009,14 +1973,14 @@ void SpdySession::SendWindowUpdateFrame(SpdyStreamId stream_id,
DCHECK(buffered_spdy_framer_.get());
scoped_ptr<SpdyFrame> window_update_frame(
buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size));
- QueueFrame(window_update_frame.release(), priority);
+ EnqueueSessionWrite(priority, window_update_frame.Pass());
}
void SpdySession::WritePingFrame(uint32 unique_id) {
DCHECK(buffered_spdy_framer_.get());
scoped_ptr<SpdyFrame> ping_frame(
buffered_spdy_framer_->CreatePingFrame(unique_id));
- QueueFrame(ping_frame.release(), HIGHEST);
+ EnqueueSessionWrite(HIGHEST, ping_frame.Pass());
if (net_log().IsLoggingAllEvents()) {
net_log().AddEvent(
« no previous file with comments | « net/spdy/spdy_session.h ('k') | net/spdy/spdy_session_spdy2_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698