| Index: net/spdy/spdy_session.cc
|
| diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc
|
| index 21fcd132594801850efd8615595f7b34ced9d4e6..ad3c378fcf2751b1dd6097262ebb98c7ecb5200b 100644
|
| --- a/net/spdy/spdy_session.cc
|
| +++ b/net/spdy/spdy_session.cc
|
| @@ -271,28 +271,6 @@ void SpdyStreamRequest::Reset() {
|
| callback_.Reset();
|
| }
|
|
|
| -// static
|
| -void SpdySession::SpdyIOBufferProducer::ActivateStream(
|
| - SpdySession* spdy_session,
|
| - SpdyStream* spdy_stream) {
|
| - spdy_session->ActivateStream(spdy_stream);
|
| -}
|
| -
|
| -// static
|
| -SpdyIOBuffer* SpdySession::SpdyIOBufferProducer::CreateIOBuffer(
|
| - SpdyFrame* frame,
|
| - RequestPriority priority,
|
| - SpdyStream* stream) {
|
| - size_t size = frame->size();
|
| - DCHECK_GT(size, 0u);
|
| -
|
| - // TODO(mbelshe): We have too much copying of data here.
|
| - IOBufferWithSize* buffer = new IOBufferWithSize(size);
|
| - memcpy(buffer->data(), frame->data(), size);
|
| -
|
| - return new SpdyIOBuffer(buffer, size, priority, stream);
|
| -}
|
| -
|
| SpdySession::SpdySession(const HostPortProxyPair& host_port_proxy_pair,
|
| SpdySessionPool* spdy_session_pool,
|
| HttpServerProperties* http_server_properties,
|
| @@ -389,7 +367,7 @@ SpdySession::~SpdySession() {
|
| DCHECK_EQ(0u, num_active_streams());
|
| DCHECK_EQ(0u, num_unclaimed_pushed_streams());
|
|
|
| - for (int i = NUM_PRIORITIES - 1; i >= MINIMUM_PRIORITY; --i) {
|
| + for (int i = 0; i < NUM_PRIORITIES; ++i) {
|
| DCHECK(pending_create_stream_queues_[i].empty());
|
| }
|
| DCHECK(pending_stream_request_completions_.empty());
|
| @@ -485,11 +463,10 @@ bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
|
| ssl_info.cert->VerifyNameMatch(domain);
|
| }
|
|
|
| -void SpdySession::SetStreamHasWriteAvailable(SpdyStream* stream,
|
| - SpdyIOBufferProducer* producer) {
|
| - write_queue_.push(producer);
|
| - stream_producers_[producer] = stream;
|
| - WriteSocketLater();
|
| +void SpdySession::SetStreamHasWriteAvailable(
|
| + SpdyStream* stream,
|
| + scoped_ptr<SpdyFrameProducer> producer) {
|
| + QueueFrameProducerForWriting(producer.Pass(), stream->priority());
|
| }
|
|
|
| int SpdySession::GetPushStream(
|
| @@ -849,7 +826,7 @@ void SpdySession::ResetStream(SpdyStreamId stream_id,
|
| scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
|
| priority = stream->priority();
|
| }
|
| - QueueFrame(rst_frame.release(), priority);
|
| + QueueSessionFrameForWriting(rst_frame.Pass(), priority);
|
| RecordProtocolErrorHistogram(
|
| static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID));
|
| DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR);
|
| @@ -970,56 +947,53 @@ int SpdySession::DoReadComplete(int result) {
|
|
|
| void SpdySession::OnWriteComplete(int result) {
|
| DCHECK(write_pending_);
|
| - DCHECK(in_flight_write_.size());
|
| + DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0);
|
|
|
| last_activity_time_ = base::TimeTicks::Now();
|
| write_pending_ = false;
|
|
|
| - scoped_refptr<SpdyStream> stream = in_flight_write_.stream();
|
| -
|
| - if (result >= 0) {
|
| - // It should not be possible to have written more bytes than our
|
| - // in_flight_write_.
|
| - DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining());
|
| -
|
| - in_flight_write_.buffer()->DidConsume(result);
|
| -
|
| - // We only notify the stream when we've fully written the pending frame.
|
| - if (!in_flight_write_.buffer()->BytesRemaining()) {
|
| - if (stream) {
|
| - // Report the number of bytes written to the caller, but exclude the
|
| - // frame size overhead. NOTE: if this frame was compressed the
|
| - // reported bytes written is the compressed size, not the original
|
| - // size.
|
| - if (result > 0) {
|
| - result = in_flight_write_.buffer()->size();
|
| - DCHECK_GE(result,
|
| - static_cast<int>(
|
| - buffered_spdy_framer_->GetControlFrameHeaderSize()));
|
| - result -= buffered_spdy_framer_->GetControlFrameHeaderSize();
|
| - }
|
| -
|
| - // It is possible that the stream was cancelled while we were writing
|
| - // to the socket.
|
| - if (!stream->cancelled())
|
| - stream->OnWriteComplete(result);
|
| - }
|
| + if (result < 0) {
|
| + in_flight_write_.Release();
|
| + CloseSessionOnError(static_cast<net::Error>(result), true, "Write error");
|
| + return;
|
| + }
|
|
|
| - // Cleanup the write which just completed.
|
| - in_flight_write_.release();
|
| - }
|
| + // It should not be possible to have written more bytes than our
|
| + // in_flight_write_.
|
| + DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining());
|
|
|
| - // Write more data. We're already in a continuation, so we can
|
| - // go ahead and write it immediately (without going back to the
|
| - // message loop).
|
| - WriteSocketLater();
|
| - } else {
|
| - in_flight_write_.release();
|
| + in_flight_write_.buffer()->DidConsume(result);
|
|
|
| - // The stream is now errored. Close it down.
|
| - CloseSessionOnError(
|
| - static_cast<net::Error>(result), true, "The stream has errored.");
|
| + // We only notify the stream when we've fully written the pending frame.
|
| + if (in_flight_write_.buffer()->BytesRemaining() == 0) {
|
| + DCHECK_GT(result, 0);
|
| +
|
| + scoped_refptr<SpdyStream> stream = in_flight_write_.stream();
|
| +
|
| + // It is possible that the stream was cancelled while we were writing
|
| + // to the socket.
|
| + if (stream && !stream->cancelled()) {
|
| + // Report the number of bytes written to the caller, but exclude the
|
| + // frame size overhead. NOTE: if this frame was compressed the
|
| + // reported bytes written is the compressed size, not the original
|
| + // size.
|
| + result = in_flight_write_.buffer()->size();
|
| + DCHECK_GE(result,
|
| + static_cast<int>(
|
| + buffered_spdy_framer_->GetControlFrameHeaderSize()));
|
| + result -= buffered_spdy_framer_->GetControlFrameHeaderSize();
|
| +
|
| + stream->OnWriteComplete(result);
|
| + }
|
| +
|
| + // Cleanup the write which just completed.
|
| + in_flight_write_.Release();
|
| }
|
| +
|
| + // Write more data. We're already in a continuation, so we can go
|
| + // ahead and write it immediately (without going back to the message
|
| + // loop).
|
| + WriteSocketLater();
|
| }
|
|
|
| void SpdySession::WriteSocketLater() {
|
| @@ -1052,24 +1026,36 @@ void SpdySession::WriteSocket() {
|
| // Loop sending frames until we've sent everything or until the write
|
| // returns error (or ERR_IO_PENDING).
|
| DCHECK(buffered_spdy_framer_.get());
|
| - while (in_flight_write_.buffer() || !write_queue_.empty()) {
|
| - if (!in_flight_write_.buffer()) {
|
| - // Grab the next SpdyBuffer to send.
|
| - scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top());
|
| - write_queue_.pop();
|
| - scoped_ptr<SpdyIOBuffer> buffer(producer->ProduceNextBuffer(this));
|
| - stream_producers_.erase(producer.get());
|
| + while (true) {
|
| + if (in_flight_write_.buffer()) {
|
| + DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0);
|
| + } else {
|
| + // Grab the next frame to send.
|
| + scoped_ptr<SpdyFrameProducer> producer = PopNextFrameProducerToWrite();
|
| + if (!producer)
|
| + break;
|
| +
|
| + scoped_refptr<SpdyStream> stream = producer->GetStream();
|
| // It is possible that a stream had data to write, but a
|
| // WINDOW_UPDATE frame has been received which made that
|
| // stream no longer writable.
|
| // TODO(rch): consider handling that case by removing the
|
| // stream from the writable queue?
|
| - if (buffer == NULL)
|
| + if (stream.get() && stream->cancelled())
|
| continue;
|
|
|
| - in_flight_write_ = *buffer;
|
| - } else {
|
| - DCHECK(in_flight_write_.buffer()->BytesRemaining());
|
| + if (stream.get() && stream->stream_id() == 0)
|
| + ActivateStream(stream);
|
| +
|
| + scoped_ptr<SpdyFrame> frame = producer->ProduceFrame();
|
| + DCHECK(frame);
|
| + DCHECK_GT(frame->size(), 0u);
|
| +
|
| + // TODO(mbelshe): We have too much copying of data here.
|
| + scoped_refptr<IOBufferWithSize> buffer =
|
| + new IOBufferWithSize(frame->size());
|
| + memcpy(buffer->data(), frame->data(), frame->size());
|
| + in_flight_write_ = SpdyIOBuffer(buffer, frame->size(), stream);
|
| }
|
|
|
| write_pending_ = true;
|
| @@ -1127,12 +1113,7 @@ void SpdySession::CloseAllStreams(net::Error status) {
|
| stream->OnClose(status);
|
| }
|
|
|
| - // We also need to drain the queue.
|
| - while (!write_queue_.empty()) {
|
| - scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top());
|
| - write_queue_.pop();
|
| - stream_producers_.erase(producer.get());
|
| - }
|
| + ClearWriteQueue();
|
| }
|
|
|
| void SpdySession::LogAbandonedStream(const scoped_refptr<SpdyStream>& stream,
|
| @@ -1255,36 +1236,75 @@ int SpdySession::GetLocalAddress(IPEndPoint* address) const {
|
| return connection_->socket()->GetLocalAddress(address);
|
| }
|
|
|
| -class SimpleSpdyIOBufferProducer : public SpdySession::SpdyIOBufferProducer {
|
| +// A simple wrapper around a SpdyFrame associated with the session.
|
| +class SessionFrameProducer : public SpdyFrameProducer {
|
| public:
|
| - SimpleSpdyIOBufferProducer(SpdyFrame* frame,
|
| - RequestPriority priority)
|
| - : frame_(frame),
|
| - priority_(priority) {
|
| - }
|
| + SessionFrameProducer(scoped_ptr<SpdyFrame> frame) : frame_(frame.Pass()) {}
|
|
|
| - virtual RequestPriority GetPriority() const OVERRIDE {
|
| - return priority_;
|
| + virtual ~SessionFrameProducer() {}
|
| +
|
| + virtual SpdyStream* GetStream() OVERRIDE {
|
| + return NULL;
|
| }
|
|
|
| - virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) OVERRIDE {
|
| - return SpdySession::SpdyIOBufferProducer::CreateIOBuffer(
|
| - frame_.get(), priority_, NULL);
|
| + virtual scoped_ptr<SpdyFrame> ProduceFrame() OVERRIDE {
|
| + DCHECK(frame_);
|
| + return frame_.Pass();
|
| }
|
|
|
| private:
|
| scoped_ptr<SpdyFrame> frame_;
|
| - RequestPriority priority_;
|
| };
|
|
|
| -void SpdySession::QueueFrame(SpdyFrame* frame,
|
| - RequestPriority priority) {
|
| - SimpleSpdyIOBufferProducer* producer =
|
| - new SimpleSpdyIOBufferProducer(frame, priority);
|
| - write_queue_.push(producer);
|
| +void SpdySession::QueueSessionFrameForWriting(scoped_ptr<SpdyFrame> frame,
|
| + RequestPriority priority) {
|
| + QueueFrameProducerForWriting(
|
| + scoped_ptr<SpdyFrameProducer>(new SessionFrameProducer(frame.Pass())),
|
| + priority);
|
| +}
|
| +
|
| +void SpdySession::QueueFrameProducerForWriting(
|
| + scoped_ptr<SpdyFrameProducer> producer,
|
| + RequestPriority priority) {
|
| + write_queue_[priority].push_back(producer.release());
|
| WriteSocketLater();
|
| }
|
|
|
| +scoped_ptr<SpdyFrameProducer> SpdySession::PopNextFrameProducerToWrite() {
|
| + for (int i = NUM_PRIORITIES - 1; i >= 0; --i) {
|
| + std::deque<SpdyFrameProducer*>* queue = &write_queue_[i];
|
| + if (!queue->empty()) {
|
| + scoped_ptr<SpdyFrameProducer> producer(queue->front());
|
| + queue->pop_front();
|
| + return producer.Pass();
|
| + }
|
| + }
|
| + return scoped_ptr<SpdyFrameProducer>();
|
| +}
|
| +
|
| +void SpdySession::RemoveStreamFromWriteQueue(
|
| + const scoped_refptr<SpdyStream>& stream) {
|
| + std::deque<SpdyFrameProducer*> old;
|
| + std::deque<SpdyFrameProducer*>* queue =
|
| + &write_queue_[stream->priority()];
|
| + old.swap(*queue);
|
| +
|
| + while (!old.empty()) {
|
| + scoped_ptr<SpdyFrameProducer> producer(old.front());
|
| + old.pop_front();
|
| + scoped_refptr<SpdyStream> producer_stream = producer->GetStream();
|
| + if (!producer_stream || producer_stream != stream) {
|
| + queue->push_back(producer.release());
|
| + }
|
| + }
|
| +}
|
| +
|
| +void SpdySession::ClearWriteQueue() {
|
| + for (int i = 0; i < NUM_PRIORITIES; ++i) {
|
| + STLDeleteElements(&write_queue_[i]);
|
| + }
|
| +}
|
| +
|
| void SpdySession::ActivateStream(SpdyStream* stream) {
|
| if (stream->stream_id() == 0) {
|
| stream->set_stream_id(GetNewStreamId());
|
| @@ -1301,8 +1321,7 @@ void SpdySession::DeleteStream(SpdyStreamId id, int status) {
|
| // the stream in the unclaimed_pushed_streams_ list. However, if
|
| // the stream is errored out, clean it up entirely.
|
| if (status != OK) {
|
| - PushedStreamMap::iterator it;
|
| - for (it = unclaimed_pushed_streams_.begin();
|
| + for (PushedStreamMap::iterator it = unclaimed_pushed_streams_.begin();
|
| it != unclaimed_pushed_streams_.end(); ++it) {
|
| scoped_refptr<SpdyStream> curr = it->second.first;
|
| if (id == curr->stream_id()) {
|
| @@ -1313,29 +1332,17 @@ void SpdySession::DeleteStream(SpdyStreamId id, int status) {
|
| }
|
|
|
| // The stream might have been deleted.
|
| - ActiveStreamMap::iterator it2 = active_streams_.find(id);
|
| - if (it2 == active_streams_.end())
|
| + ActiveStreamMap::iterator it = active_streams_.find(id);
|
| + if (it == active_streams_.end())
|
| return;
|
|
|
| - // Possibly remove from the write queue.
|
| - WriteQueue old = write_queue_;
|
| - write_queue_ = WriteQueue();
|
| - while (!old.empty()) {
|
| - scoped_ptr<SpdyIOBufferProducer> producer(old.top());
|
| - old.pop();
|
| - StreamProducerMap::iterator it = stream_producers_.find(producer.get());
|
| - if (it == stream_producers_.end() || it->second->stream_id() != id) {
|
| - write_queue_.push(producer.release());
|
| - } else {
|
| - stream_producers_.erase(producer.get());
|
| - producer.reset(NULL);
|
| - }
|
| - }
|
| + const scoped_refptr<SpdyStream> stream(it->second);
|
| + active_streams_.erase(it);
|
| + DCHECK(stream);
|
| +
|
| + RemoveStreamFromWriteQueue(stream);
|
|
|
| // If this is an active stream, call the callback.
|
| - const scoped_refptr<SpdyStream> stream(it2->second);
|
| - active_streams_.erase(it2);
|
| - DCHECK(stream);
|
| stream->OnClose(status);
|
| ProcessPendingStreamRequests();
|
| }
|
| @@ -1919,7 +1926,7 @@ void SpdySession::SendSettings(const SettingsMap& settings) {
|
| scoped_ptr<SpdyFrame> settings_frame(
|
| buffered_spdy_framer_->CreateSettings(settings));
|
| sent_settings_ = true;
|
| - QueueFrame(settings_frame.release(), HIGHEST);
|
| + QueueSessionFrameForWriting(settings_frame.Pass(), HIGHEST);
|
| }
|
|
|
| void SpdySession::HandleSetting(uint32 id, uint32 value) {
|
| @@ -2007,14 +2014,14 @@ void SpdySession::SendWindowUpdateFrame(SpdyStreamId stream_id,
|
| DCHECK(buffered_spdy_framer_.get());
|
| scoped_ptr<SpdyFrame> window_update_frame(
|
| buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size));
|
| - QueueFrame(window_update_frame.release(), priority);
|
| + QueueSessionFrameForWriting(window_update_frame.Pass(), priority);
|
| }
|
|
|
| void SpdySession::WritePingFrame(uint32 unique_id) {
|
| DCHECK(buffered_spdy_framer_.get());
|
| scoped_ptr<SpdyFrame> ping_frame(
|
| buffered_spdy_framer_->CreatePingFrame(unique_id));
|
| - QueueFrame(ping_frame.release(), HIGHEST);
|
| + QueueSessionFrameForWriting(ping_frame.Pass(), HIGHEST);
|
|
|
| if (net_log().IsLoggingAllEvents()) {
|
| net_log().AddEvent(
|
|
|