Index: net/spdy/spdy_session.cc |
diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc |
index 21fcd132594801850efd8615595f7b34ced9d4e6..ad3c378fcf2751b1dd6097262ebb98c7ecb5200b 100644 |
--- a/net/spdy/spdy_session.cc |
+++ b/net/spdy/spdy_session.cc |
@@ -271,28 +271,6 @@ void SpdyStreamRequest::Reset() { |
callback_.Reset(); |
} |
-// static |
-void SpdySession::SpdyIOBufferProducer::ActivateStream( |
- SpdySession* spdy_session, |
- SpdyStream* spdy_stream) { |
- spdy_session->ActivateStream(spdy_stream); |
-} |
- |
-// static |
-SpdyIOBuffer* SpdySession::SpdyIOBufferProducer::CreateIOBuffer( |
- SpdyFrame* frame, |
- RequestPriority priority, |
- SpdyStream* stream) { |
- size_t size = frame->size(); |
- DCHECK_GT(size, 0u); |
- |
- // TODO(mbelshe): We have too much copying of data here. |
- IOBufferWithSize* buffer = new IOBufferWithSize(size); |
- memcpy(buffer->data(), frame->data(), size); |
- |
- return new SpdyIOBuffer(buffer, size, priority, stream); |
-} |
- |
SpdySession::SpdySession(const HostPortProxyPair& host_port_proxy_pair, |
SpdySessionPool* spdy_session_pool, |
HttpServerProperties* http_server_properties, |
@@ -389,7 +367,7 @@ SpdySession::~SpdySession() { |
DCHECK_EQ(0u, num_active_streams()); |
DCHECK_EQ(0u, num_unclaimed_pushed_streams()); |
- for (int i = NUM_PRIORITIES - 1; i >= MINIMUM_PRIORITY; --i) { |
+ for (int i = 0; i < NUM_PRIORITIES; ++i) { |
DCHECK(pending_create_stream_queues_[i].empty()); |
} |
DCHECK(pending_stream_request_completions_.empty()); |
@@ -485,11 +463,10 @@ bool SpdySession::VerifyDomainAuthentication(const std::string& domain) { |
ssl_info.cert->VerifyNameMatch(domain); |
} |
-void SpdySession::SetStreamHasWriteAvailable(SpdyStream* stream, |
- SpdyIOBufferProducer* producer) { |
- write_queue_.push(producer); |
- stream_producers_[producer] = stream; |
- WriteSocketLater(); |
+void SpdySession::SetStreamHasWriteAvailable( |
+ SpdyStream* stream, |
+ scoped_ptr<SpdyFrameProducer> producer) { |
+ QueueFrameProducerForWriting(producer.Pass(), stream->priority()); |
} |
int SpdySession::GetPushStream( |
@@ -849,7 +826,7 @@ void SpdySession::ResetStream(SpdyStreamId stream_id, |
scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; |
priority = stream->priority(); |
} |
- QueueFrame(rst_frame.release(), priority); |
+ QueueSessionFrameForWriting(rst_frame.Pass(), priority); |
RecordProtocolErrorHistogram( |
static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID)); |
DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR); |
@@ -970,56 +947,53 @@ int SpdySession::DoReadComplete(int result) { |
void SpdySession::OnWriteComplete(int result) { |
DCHECK(write_pending_); |
- DCHECK(in_flight_write_.size()); |
+ DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0); |
last_activity_time_ = base::TimeTicks::Now(); |
write_pending_ = false; |
- scoped_refptr<SpdyStream> stream = in_flight_write_.stream(); |
- |
- if (result >= 0) { |
- // It should not be possible to have written more bytes than our |
- // in_flight_write_. |
- DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining()); |
- |
- in_flight_write_.buffer()->DidConsume(result); |
- |
- // We only notify the stream when we've fully written the pending frame. |
- if (!in_flight_write_.buffer()->BytesRemaining()) { |
- if (stream) { |
- // Report the number of bytes written to the caller, but exclude the |
- // frame size overhead. NOTE: if this frame was compressed the |
- // reported bytes written is the compressed size, not the original |
- // size. |
- if (result > 0) { |
- result = in_flight_write_.buffer()->size(); |
- DCHECK_GE(result, |
- static_cast<int>( |
- buffered_spdy_framer_->GetControlFrameHeaderSize())); |
- result -= buffered_spdy_framer_->GetControlFrameHeaderSize(); |
- } |
- |
- // It is possible that the stream was cancelled while we were writing |
- // to the socket. |
- if (!stream->cancelled()) |
- stream->OnWriteComplete(result); |
- } |
+ if (result < 0) { |
+ in_flight_write_.Release(); |
+ CloseSessionOnError(static_cast<net::Error>(result), true, "Write error"); |
+ return; |
+ } |
- // Cleanup the write which just completed. |
- in_flight_write_.release(); |
- } |
+ // It should not be possible to have written more bytes than our |
+ // in_flight_write_. |
+ DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining()); |
- // Write more data. We're already in a continuation, so we can |
- // go ahead and write it immediately (without going back to the |
- // message loop). |
- WriteSocketLater(); |
- } else { |
- in_flight_write_.release(); |
+ in_flight_write_.buffer()->DidConsume(result); |
- // The stream is now errored. Close it down. |
- CloseSessionOnError( |
- static_cast<net::Error>(result), true, "The stream has errored."); |
+ // We only notify the stream when we've fully written the pending frame. |
+ if (in_flight_write_.buffer()->BytesRemaining() == 0) { |
+ DCHECK_GT(result, 0); |
+ |
+ scoped_refptr<SpdyStream> stream = in_flight_write_.stream(); |
+ |
+ // It is possible that the stream was cancelled while we were writing |
+ // to the socket. |
+ if (stream && !stream->cancelled()) { |
+ // Report the number of bytes written to the caller, but exclude the |
+ // frame size overhead. NOTE: if this frame was compressed the |
+ // reported bytes written is the compressed size, not the original |
+ // size. |
+ result = in_flight_write_.buffer()->size(); |
+ DCHECK_GE(result, |
+ static_cast<int>( |
+ buffered_spdy_framer_->GetControlFrameHeaderSize())); |
+ result -= buffered_spdy_framer_->GetControlFrameHeaderSize(); |
+ |
+ stream->OnWriteComplete(result); |
+ } |
+ |
+ // Cleanup the write which just completed. |
+ in_flight_write_.Release(); |
} |
+ |
+ // Write more data. We're already in a continuation, so we can go |
+ // ahead and write it immediately (without going back to the message |
+ // loop). |
+ WriteSocketLater(); |
} |
void SpdySession::WriteSocketLater() { |
@@ -1052,24 +1026,36 @@ void SpdySession::WriteSocket() { |
// Loop sending frames until we've sent everything or until the write |
// returns error (or ERR_IO_PENDING). |
DCHECK(buffered_spdy_framer_.get()); |
- while (in_flight_write_.buffer() || !write_queue_.empty()) { |
- if (!in_flight_write_.buffer()) { |
- // Grab the next SpdyBuffer to send. |
- scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top()); |
- write_queue_.pop(); |
- scoped_ptr<SpdyIOBuffer> buffer(producer->ProduceNextBuffer(this)); |
- stream_producers_.erase(producer.get()); |
+ while (true) { |
+ if (in_flight_write_.buffer()) { |
+ DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0); |
+ } else { |
+ // Grab the next frame to send. |
+ scoped_ptr<SpdyFrameProducer> producer = PopNextFrameProducerToWrite(); |
+ if (!producer) |
+ break; |
+ |
+ scoped_refptr<SpdyStream> stream = producer->GetStream(); |
// It is possible that a stream had data to write, but a |
// WINDOW_UPDATE frame has been received which made that |
// stream no longer writable. |
// TODO(rch): consider handling that case by removing the |
// stream from the writable queue? |
- if (buffer == NULL) |
+ if (stream.get() && stream->cancelled()) |
continue; |
- in_flight_write_ = *buffer; |
- } else { |
- DCHECK(in_flight_write_.buffer()->BytesRemaining()); |
+ if (stream.get() && stream->stream_id() == 0) |
+ ActivateStream(stream); |
+ |
+ scoped_ptr<SpdyFrame> frame = producer->ProduceFrame(); |
+ DCHECK(frame); |
+ DCHECK_GT(frame->size(), 0u); |
+ |
+ // TODO(mbelshe): We have too much copying of data here. |
+ scoped_refptr<IOBufferWithSize> buffer = |
+ new IOBufferWithSize(frame->size()); |
+ memcpy(buffer->data(), frame->data(), frame->size()); |
+ in_flight_write_ = SpdyIOBuffer(buffer, frame->size(), stream); |
} |
write_pending_ = true; |
@@ -1127,12 +1113,7 @@ void SpdySession::CloseAllStreams(net::Error status) { |
stream->OnClose(status); |
} |
- // We also need to drain the queue. |
- while (!write_queue_.empty()) { |
- scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top()); |
- write_queue_.pop(); |
- stream_producers_.erase(producer.get()); |
- } |
+ ClearWriteQueue(); |
} |
void SpdySession::LogAbandonedStream(const scoped_refptr<SpdyStream>& stream, |
@@ -1255,36 +1236,75 @@ int SpdySession::GetLocalAddress(IPEndPoint* address) const { |
return connection_->socket()->GetLocalAddress(address); |
} |
-class SimpleSpdyIOBufferProducer : public SpdySession::SpdyIOBufferProducer { |
+// A simple wrapper around a SpdyFrame associated with the session. |
+class SessionFrameProducer : public SpdyFrameProducer { |
public: |
- SimpleSpdyIOBufferProducer(SpdyFrame* frame, |
- RequestPriority priority) |
- : frame_(frame), |
- priority_(priority) { |
- } |
+ SessionFrameProducer(scoped_ptr<SpdyFrame> frame) : frame_(frame.Pass()) {} |
- virtual RequestPriority GetPriority() const OVERRIDE { |
- return priority_; |
+ virtual ~SessionFrameProducer() {} |
+ |
+ virtual SpdyStream* GetStream() OVERRIDE { |
+ return NULL; |
} |
- virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) OVERRIDE { |
- return SpdySession::SpdyIOBufferProducer::CreateIOBuffer( |
- frame_.get(), priority_, NULL); |
+ virtual scoped_ptr<SpdyFrame> ProduceFrame() OVERRIDE { |
+ DCHECK(frame_); |
+ return frame_.Pass(); |
} |
private: |
scoped_ptr<SpdyFrame> frame_; |
- RequestPriority priority_; |
}; |
-void SpdySession::QueueFrame(SpdyFrame* frame, |
- RequestPriority priority) { |
- SimpleSpdyIOBufferProducer* producer = |
- new SimpleSpdyIOBufferProducer(frame, priority); |
- write_queue_.push(producer); |
+void SpdySession::QueueSessionFrameForWriting(scoped_ptr<SpdyFrame> frame, |
+ RequestPriority priority) { |
+ QueueFrameProducerForWriting( |
+ scoped_ptr<SpdyFrameProducer>(new SessionFrameProducer(frame.Pass())), |
+ priority); |
+} |
+ |
+void SpdySession::QueueFrameProducerForWriting( |
+ scoped_ptr<SpdyFrameProducer> producer, |
+ RequestPriority priority) { |
+ write_queue_[priority].push_back(producer.release()); |
WriteSocketLater(); |
} |
+scoped_ptr<SpdyFrameProducer> SpdySession::PopNextFrameProducerToWrite() { |
+ for (int i = NUM_PRIORITIES - 1; i >= 0; --i) { |
+ std::deque<SpdyFrameProducer*>* queue = &write_queue_[i]; |
+ if (!queue->empty()) { |
+ scoped_ptr<SpdyFrameProducer> producer(queue->front()); |
+ queue->pop_front(); |
+ return producer.Pass(); |
+ } |
+ } |
+ return scoped_ptr<SpdyFrameProducer>(); |
+} |
+ |
+void SpdySession::RemoveStreamFromWriteQueue( |
+ const scoped_refptr<SpdyStream>& stream) { |
+ std::deque<SpdyFrameProducer*> old; |
+ std::deque<SpdyFrameProducer*>* queue = |
+ &write_queue_[stream->priority()]; |
+ old.swap(*queue); |
+ |
+ while (!old.empty()) { |
+ scoped_ptr<SpdyFrameProducer> producer(old.front()); |
+ old.pop_front(); |
+ scoped_refptr<SpdyStream> producer_stream = producer->GetStream(); |
+ if (!producer_stream || producer_stream != stream) { |
+ queue->push_back(producer.release()); |
+ } |
+ } |
+} |
+ |
+void SpdySession::ClearWriteQueue() { |
+ for (int i = 0; i < NUM_PRIORITIES; ++i) { |
+ STLDeleteElements(&write_queue_[i]); |
+ } |
+} |
+ |
void SpdySession::ActivateStream(SpdyStream* stream) { |
if (stream->stream_id() == 0) { |
stream->set_stream_id(GetNewStreamId()); |
@@ -1301,8 +1321,7 @@ void SpdySession::DeleteStream(SpdyStreamId id, int status) { |
// the stream in the unclaimed_pushed_streams_ list. However, if |
// the stream is errored out, clean it up entirely. |
if (status != OK) { |
- PushedStreamMap::iterator it; |
- for (it = unclaimed_pushed_streams_.begin(); |
+ for (PushedStreamMap::iterator it = unclaimed_pushed_streams_.begin(); |
it != unclaimed_pushed_streams_.end(); ++it) { |
scoped_refptr<SpdyStream> curr = it->second.first; |
if (id == curr->stream_id()) { |
@@ -1313,29 +1332,17 @@ void SpdySession::DeleteStream(SpdyStreamId id, int status) { |
} |
// The stream might have been deleted. |
- ActiveStreamMap::iterator it2 = active_streams_.find(id); |
- if (it2 == active_streams_.end()) |
+ ActiveStreamMap::iterator it = active_streams_.find(id); |
+ if (it == active_streams_.end()) |
return; |
- // Possibly remove from the write queue. |
- WriteQueue old = write_queue_; |
- write_queue_ = WriteQueue(); |
- while (!old.empty()) { |
- scoped_ptr<SpdyIOBufferProducer> producer(old.top()); |
- old.pop(); |
- StreamProducerMap::iterator it = stream_producers_.find(producer.get()); |
- if (it == stream_producers_.end() || it->second->stream_id() != id) { |
- write_queue_.push(producer.release()); |
- } else { |
- stream_producers_.erase(producer.get()); |
- producer.reset(NULL); |
- } |
- } |
+ const scoped_refptr<SpdyStream> stream(it->second); |
+ active_streams_.erase(it); |
+ DCHECK(stream); |
+ |
+ RemoveStreamFromWriteQueue(stream); |
// If this is an active stream, call the callback. |
- const scoped_refptr<SpdyStream> stream(it2->second); |
- active_streams_.erase(it2); |
- DCHECK(stream); |
stream->OnClose(status); |
ProcessPendingStreamRequests(); |
} |
@@ -1919,7 +1926,7 @@ void SpdySession::SendSettings(const SettingsMap& settings) { |
scoped_ptr<SpdyFrame> settings_frame( |
buffered_spdy_framer_->CreateSettings(settings)); |
sent_settings_ = true; |
- QueueFrame(settings_frame.release(), HIGHEST); |
+ QueueSessionFrameForWriting(settings_frame.Pass(), HIGHEST); |
} |
void SpdySession::HandleSetting(uint32 id, uint32 value) { |
@@ -2007,14 +2014,14 @@ void SpdySession::SendWindowUpdateFrame(SpdyStreamId stream_id, |
DCHECK(buffered_spdy_framer_.get()); |
scoped_ptr<SpdyFrame> window_update_frame( |
buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size)); |
- QueueFrame(window_update_frame.release(), priority); |
+ QueueSessionFrameForWriting(window_update_frame.Pass(), priority); |
} |
void SpdySession::WritePingFrame(uint32 unique_id) { |
DCHECK(buffered_spdy_framer_.get()); |
scoped_ptr<SpdyFrame> ping_frame( |
buffered_spdy_framer_->CreatePingFrame(unique_id)); |
- QueueFrame(ping_frame.release(), HIGHEST); |
+ QueueSessionFrameForWriting(ping_frame.Pass(), HIGHEST); |
if (net_log().IsLoggingAllEvents()) { |
net_log().AddEvent( |