Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1736)

Unified Diff: net/spdy/spdy_session.cc

Issue 13009012: [SPDY] Refactor SpdySession's write queue (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 7 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: net/spdy/spdy_session.cc
diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc
index 21fcd132594801850efd8615595f7b34ced9d4e6..ad3c378fcf2751b1dd6097262ebb98c7ecb5200b 100644
--- a/net/spdy/spdy_session.cc
+++ b/net/spdy/spdy_session.cc
@@ -271,28 +271,6 @@ void SpdyStreamRequest::Reset() {
callback_.Reset();
}
-// static
-void SpdySession::SpdyIOBufferProducer::ActivateStream(
- SpdySession* spdy_session,
- SpdyStream* spdy_stream) {
- spdy_session->ActivateStream(spdy_stream);
-}
-
-// static
-SpdyIOBuffer* SpdySession::SpdyIOBufferProducer::CreateIOBuffer(
- SpdyFrame* frame,
- RequestPriority priority,
- SpdyStream* stream) {
- size_t size = frame->size();
- DCHECK_GT(size, 0u);
-
- // TODO(mbelshe): We have too much copying of data here.
- IOBufferWithSize* buffer = new IOBufferWithSize(size);
- memcpy(buffer->data(), frame->data(), size);
-
- return new SpdyIOBuffer(buffer, size, priority, stream);
-}
-
SpdySession::SpdySession(const HostPortProxyPair& host_port_proxy_pair,
SpdySessionPool* spdy_session_pool,
HttpServerProperties* http_server_properties,
@@ -389,7 +367,7 @@ SpdySession::~SpdySession() {
DCHECK_EQ(0u, num_active_streams());
DCHECK_EQ(0u, num_unclaimed_pushed_streams());
- for (int i = NUM_PRIORITIES - 1; i >= MINIMUM_PRIORITY; --i) {
+ for (int i = 0; i < NUM_PRIORITIES; ++i) {
DCHECK(pending_create_stream_queues_[i].empty());
}
DCHECK(pending_stream_request_completions_.empty());
@@ -485,11 +463,10 @@ bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
ssl_info.cert->VerifyNameMatch(domain);
}
-void SpdySession::SetStreamHasWriteAvailable(SpdyStream* stream,
- SpdyIOBufferProducer* producer) {
- write_queue_.push(producer);
- stream_producers_[producer] = stream;
- WriteSocketLater();
+void SpdySession::SetStreamHasWriteAvailable(
+ SpdyStream* stream,
+ scoped_ptr<SpdyFrameProducer> producer) {
+ QueueFrameProducerForWriting(producer.Pass(), stream->priority());
}
int SpdySession::GetPushStream(
@@ -849,7 +826,7 @@ void SpdySession::ResetStream(SpdyStreamId stream_id,
scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
priority = stream->priority();
}
- QueueFrame(rst_frame.release(), priority);
+ QueueSessionFrameForWriting(rst_frame.Pass(), priority);
RecordProtocolErrorHistogram(
static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID));
DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR);
@@ -970,56 +947,53 @@ int SpdySession::DoReadComplete(int result) {
void SpdySession::OnWriteComplete(int result) {
DCHECK(write_pending_);
- DCHECK(in_flight_write_.size());
+ DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0);
last_activity_time_ = base::TimeTicks::Now();
write_pending_ = false;
- scoped_refptr<SpdyStream> stream = in_flight_write_.stream();
-
- if (result >= 0) {
- // It should not be possible to have written more bytes than our
- // in_flight_write_.
- DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining());
-
- in_flight_write_.buffer()->DidConsume(result);
-
- // We only notify the stream when we've fully written the pending frame.
- if (!in_flight_write_.buffer()->BytesRemaining()) {
- if (stream) {
- // Report the number of bytes written to the caller, but exclude the
- // frame size overhead. NOTE: if this frame was compressed the
- // reported bytes written is the compressed size, not the original
- // size.
- if (result > 0) {
- result = in_flight_write_.buffer()->size();
- DCHECK_GE(result,
- static_cast<int>(
- buffered_spdy_framer_->GetControlFrameHeaderSize()));
- result -= buffered_spdy_framer_->GetControlFrameHeaderSize();
- }
-
- // It is possible that the stream was cancelled while we were writing
- // to the socket.
- if (!stream->cancelled())
- stream->OnWriteComplete(result);
- }
+ if (result < 0) {
+ in_flight_write_.Release();
+ CloseSessionOnError(static_cast<net::Error>(result), true, "Write error");
+ return;
+ }
- // Cleanup the write which just completed.
- in_flight_write_.release();
- }
+ // It should not be possible to have written more bytes than our
+ // in_flight_write_.
+ DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining());
- // Write more data. We're already in a continuation, so we can
- // go ahead and write it immediately (without going back to the
- // message loop).
- WriteSocketLater();
- } else {
- in_flight_write_.release();
+ in_flight_write_.buffer()->DidConsume(result);
- // The stream is now errored. Close it down.
- CloseSessionOnError(
- static_cast<net::Error>(result), true, "The stream has errored.");
+ // We only notify the stream when we've fully written the pending frame.
+ if (in_flight_write_.buffer()->BytesRemaining() == 0) {
+ DCHECK_GT(result, 0);
+
+ scoped_refptr<SpdyStream> stream = in_flight_write_.stream();
+
+ // It is possible that the stream was cancelled while we were writing
+ // to the socket.
+ if (stream && !stream->cancelled()) {
+ // Report the number of bytes written to the caller, but exclude the
+ // frame size overhead. NOTE: if this frame was compressed the
+ // reported bytes written is the compressed size, not the original
+ // size.
+ result = in_flight_write_.buffer()->size();
+ DCHECK_GE(result,
+ static_cast<int>(
+ buffered_spdy_framer_->GetControlFrameHeaderSize()));
+ result -= buffered_spdy_framer_->GetControlFrameHeaderSize();
+
+ stream->OnWriteComplete(result);
+ }
+
+ // Cleanup the write which just completed.
+ in_flight_write_.Release();
}
+
+ // Write more data. We're already in a continuation, so we can go
+ // ahead and write it immediately (without going back to the message
+ // loop).
+ WriteSocketLater();
}
void SpdySession::WriteSocketLater() {
@@ -1052,24 +1026,36 @@ void SpdySession::WriteSocket() {
// Loop sending frames until we've sent everything or until the write
// returns error (or ERR_IO_PENDING).
DCHECK(buffered_spdy_framer_.get());
- while (in_flight_write_.buffer() || !write_queue_.empty()) {
- if (!in_flight_write_.buffer()) {
- // Grab the next SpdyBuffer to send.
- scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top());
- write_queue_.pop();
- scoped_ptr<SpdyIOBuffer> buffer(producer->ProduceNextBuffer(this));
- stream_producers_.erase(producer.get());
+ while (true) {
+ if (in_flight_write_.buffer()) {
+ DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0);
+ } else {
+ // Grab the next frame to send.
+ scoped_ptr<SpdyFrameProducer> producer = PopNextFrameProducerToWrite();
+ if (!producer)
+ break;
+
+ scoped_refptr<SpdyStream> stream = producer->GetStream();
// It is possible that a stream had data to write, but a
// WINDOW_UPDATE frame has been received which made that
// stream no longer writable.
// TODO(rch): consider handling that case by removing the
// stream from the writable queue?
- if (buffer == NULL)
+ if (stream.get() && stream->cancelled())
continue;
- in_flight_write_ = *buffer;
- } else {
- DCHECK(in_flight_write_.buffer()->BytesRemaining());
+ if (stream.get() && stream->stream_id() == 0)
+ ActivateStream(stream);
+
+ scoped_ptr<SpdyFrame> frame = producer->ProduceFrame();
+ DCHECK(frame);
+ DCHECK_GT(frame->size(), 0u);
+
+ // TODO(mbelshe): We have too much copying of data here.
+ scoped_refptr<IOBufferWithSize> buffer =
+ new IOBufferWithSize(frame->size());
+ memcpy(buffer->data(), frame->data(), frame->size());
+ in_flight_write_ = SpdyIOBuffer(buffer, frame->size(), stream);
}
write_pending_ = true;
@@ -1127,12 +1113,7 @@ void SpdySession::CloseAllStreams(net::Error status) {
stream->OnClose(status);
}
- // We also need to drain the queue.
- while (!write_queue_.empty()) {
- scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top());
- write_queue_.pop();
- stream_producers_.erase(producer.get());
- }
+ ClearWriteQueue();
}
void SpdySession::LogAbandonedStream(const scoped_refptr<SpdyStream>& stream,
@@ -1255,36 +1236,75 @@ int SpdySession::GetLocalAddress(IPEndPoint* address) const {
return connection_->socket()->GetLocalAddress(address);
}
-class SimpleSpdyIOBufferProducer : public SpdySession::SpdyIOBufferProducer {
+// A simple wrapper around a SpdyFrame associated with the session.
+class SessionFrameProducer : public SpdyFrameProducer {
public:
- SimpleSpdyIOBufferProducer(SpdyFrame* frame,
- RequestPriority priority)
- : frame_(frame),
- priority_(priority) {
- }
+ SessionFrameProducer(scoped_ptr<SpdyFrame> frame) : frame_(frame.Pass()) {}
- virtual RequestPriority GetPriority() const OVERRIDE {
- return priority_;
+ virtual ~SessionFrameProducer() {}
+
+ virtual SpdyStream* GetStream() OVERRIDE {
+ return NULL;
}
- virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) OVERRIDE {
- return SpdySession::SpdyIOBufferProducer::CreateIOBuffer(
- frame_.get(), priority_, NULL);
+ virtual scoped_ptr<SpdyFrame> ProduceFrame() OVERRIDE {
+ DCHECK(frame_);
+ return frame_.Pass();
}
private:
scoped_ptr<SpdyFrame> frame_;
- RequestPriority priority_;
};
-void SpdySession::QueueFrame(SpdyFrame* frame,
- RequestPriority priority) {
- SimpleSpdyIOBufferProducer* producer =
- new SimpleSpdyIOBufferProducer(frame, priority);
- write_queue_.push(producer);
+void SpdySession::QueueSessionFrameForWriting(scoped_ptr<SpdyFrame> frame,
+ RequestPriority priority) {
+ QueueFrameProducerForWriting(
+ scoped_ptr<SpdyFrameProducer>(new SessionFrameProducer(frame.Pass())),
+ priority);
+}
+
+void SpdySession::QueueFrameProducerForWriting(
+ scoped_ptr<SpdyFrameProducer> producer,
+ RequestPriority priority) {
+ write_queue_[priority].push_back(producer.release());
WriteSocketLater();
}
+scoped_ptr<SpdyFrameProducer> SpdySession::PopNextFrameProducerToWrite() {
+ for (int i = NUM_PRIORITIES - 1; i >= 0; --i) {
+ std::deque<SpdyFrameProducer*>* queue = &write_queue_[i];
+ if (!queue->empty()) {
+ scoped_ptr<SpdyFrameProducer> producer(queue->front());
+ queue->pop_front();
+ return producer.Pass();
+ }
+ }
+ return scoped_ptr<SpdyFrameProducer>();
+}
+
+void SpdySession::RemoveStreamFromWriteQueue(
+ const scoped_refptr<SpdyStream>& stream) {
+ std::deque<SpdyFrameProducer*> old;
+ std::deque<SpdyFrameProducer*>* queue =
+ &write_queue_[stream->priority()];
+ old.swap(*queue);
+
+ while (!old.empty()) {
+ scoped_ptr<SpdyFrameProducer> producer(old.front());
+ old.pop_front();
+ scoped_refptr<SpdyStream> producer_stream = producer->GetStream();
+ if (!producer_stream || producer_stream != stream) {
+ queue->push_back(producer.release());
+ }
+ }
+}
+
+void SpdySession::ClearWriteQueue() {
+ for (int i = 0; i < NUM_PRIORITIES; ++i) {
+ STLDeleteElements(&write_queue_[i]);
+ }
+}
+
void SpdySession::ActivateStream(SpdyStream* stream) {
if (stream->stream_id() == 0) {
stream->set_stream_id(GetNewStreamId());
@@ -1301,8 +1321,7 @@ void SpdySession::DeleteStream(SpdyStreamId id, int status) {
// the stream in the unclaimed_pushed_streams_ list. However, if
// the stream is errored out, clean it up entirely.
if (status != OK) {
- PushedStreamMap::iterator it;
- for (it = unclaimed_pushed_streams_.begin();
+ for (PushedStreamMap::iterator it = unclaimed_pushed_streams_.begin();
it != unclaimed_pushed_streams_.end(); ++it) {
scoped_refptr<SpdyStream> curr = it->second.first;
if (id == curr->stream_id()) {
@@ -1313,29 +1332,17 @@ void SpdySession::DeleteStream(SpdyStreamId id, int status) {
}
// The stream might have been deleted.
- ActiveStreamMap::iterator it2 = active_streams_.find(id);
- if (it2 == active_streams_.end())
+ ActiveStreamMap::iterator it = active_streams_.find(id);
+ if (it == active_streams_.end())
return;
- // Possibly remove from the write queue.
- WriteQueue old = write_queue_;
- write_queue_ = WriteQueue();
- while (!old.empty()) {
- scoped_ptr<SpdyIOBufferProducer> producer(old.top());
- old.pop();
- StreamProducerMap::iterator it = stream_producers_.find(producer.get());
- if (it == stream_producers_.end() || it->second->stream_id() != id) {
- write_queue_.push(producer.release());
- } else {
- stream_producers_.erase(producer.get());
- producer.reset(NULL);
- }
- }
+ const scoped_refptr<SpdyStream> stream(it->second);
+ active_streams_.erase(it);
+ DCHECK(stream);
+
+ RemoveStreamFromWriteQueue(stream);
// If this is an active stream, call the callback.
- const scoped_refptr<SpdyStream> stream(it2->second);
- active_streams_.erase(it2);
- DCHECK(stream);
stream->OnClose(status);
ProcessPendingStreamRequests();
}
@@ -1919,7 +1926,7 @@ void SpdySession::SendSettings(const SettingsMap& settings) {
scoped_ptr<SpdyFrame> settings_frame(
buffered_spdy_framer_->CreateSettings(settings));
sent_settings_ = true;
- QueueFrame(settings_frame.release(), HIGHEST);
+ QueueSessionFrameForWriting(settings_frame.Pass(), HIGHEST);
}
void SpdySession::HandleSetting(uint32 id, uint32 value) {
@@ -2007,14 +2014,14 @@ void SpdySession::SendWindowUpdateFrame(SpdyStreamId stream_id,
DCHECK(buffered_spdy_framer_.get());
scoped_ptr<SpdyFrame> window_update_frame(
buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size));
- QueueFrame(window_update_frame.release(), priority);
+ QueueSessionFrameForWriting(window_update_frame.Pass(), priority);
}
void SpdySession::WritePingFrame(uint32 unique_id) {
DCHECK(buffered_spdy_framer_.get());
scoped_ptr<SpdyFrame> ping_frame(
buffered_spdy_framer_->CreatePingFrame(unique_id));
- QueueFrame(ping_frame.release(), HIGHEST);
+ QueueSessionFrameForWriting(ping_frame.Pass(), HIGHEST);
if (net_log().IsLoggingAllEvents()) {
net_log().AddEvent(

Powered by Google App Engine
This is Rietveld 408576698