| Index: net/spdy/spdy_session.cc
|
| diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc
|
| index 9943190e72dd5117fcde19b242da7d05b02941c6..e34c16f78eb427c8ce48c17e306531cc4d96ca53 100644
|
| --- a/net/spdy/spdy_session.cc
|
| +++ b/net/spdy/spdy_session.cc
|
| @@ -32,6 +32,7 @@
|
| #include "net/http/http_server_properties.h"
|
| #include "net/spdy/spdy_credential_builder.h"
|
| #include "net/spdy/spdy_frame_builder.h"
|
| +#include "net/spdy/spdy_frame_producer.h"
|
| #include "net/spdy/spdy_http_utils.h"
|
| #include "net/spdy/spdy_protocol.h"
|
| #include "net/spdy/spdy_session_pool.h"
|
| @@ -286,28 +287,6 @@ void SpdyStreamRequest::Reset() {
|
| callback_.Reset();
|
| }
|
|
|
| -// static
|
| -void SpdySession::SpdyIOBufferProducer::ActivateStream(
|
| - SpdySession* spdy_session,
|
| - SpdyStream* spdy_stream) {
|
| - spdy_session->ActivateStream(spdy_stream);
|
| -}
|
| -
|
| -// static
|
| -SpdyIOBuffer* SpdySession::SpdyIOBufferProducer::CreateIOBuffer(
|
| - SpdyFrame* frame,
|
| - RequestPriority priority,
|
| - SpdyStream* stream) {
|
| - size_t size = frame->size();
|
| - DCHECK_GT(size, 0u);
|
| -
|
| - // TODO(mbelshe): We have too much copying of data here.
|
| - IOBufferWithSize* buffer = new IOBufferWithSize(size);
|
| - memcpy(buffer->data(), frame->data(), size);
|
| -
|
| - return new SpdyIOBuffer(buffer, size, priority, stream);
|
| -}
|
| -
|
| SpdySession::SpdySession(const HostPortProxyPair& host_port_proxy_pair,
|
| SpdySessionPool* spdy_session_pool,
|
| HttpServerProperties* http_server_properties,
|
| @@ -404,7 +383,7 @@ SpdySession::~SpdySession() {
|
| DCHECK_EQ(0u, num_active_streams());
|
| DCHECK_EQ(0u, num_unclaimed_pushed_streams());
|
|
|
| - for (int i = NUM_PRIORITIES - 1; i >= MINIMUM_PRIORITY; --i) {
|
| + for (int i = 0; i < NUM_PRIORITIES; ++i) {
|
| DCHECK(pending_create_stream_queues_[i].empty());
|
| }
|
| DCHECK(pending_stream_request_completions_.empty());
|
| @@ -499,13 +478,6 @@ bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
|
| ssl_info.cert->VerifyNameMatch(domain);
|
| }
|
|
|
| -void SpdySession::SetStreamHasWriteAvailable(SpdyStream* stream,
|
| - SpdyIOBufferProducer* producer) {
|
| - write_queue_.push(producer);
|
| - stream_producers_[producer] = stream;
|
| - WriteSocketLater();
|
| -}
|
| -
|
| int SpdySession::GetPushStream(
|
| const GURL& url,
|
| scoped_refptr<SpdyStream>* stream,
|
| @@ -659,7 +631,13 @@ int SpdySession::GetProtocolVersion() const {
|
| return buffered_spdy_framer_->protocol_version();
|
| }
|
|
|
| -SpdyFrame* SpdySession::CreateSynStream(
|
| +void SpdySession::EnqueueStreamWrite(
|
| + SpdyStream* stream,
|
| + scoped_ptr<SpdyFrameProducer> producer) {
|
| + EnqueueWrite(stream->priority(), producer.Pass(), stream);
|
| +}
|
| +
|
| +scoped_ptr<SpdyFrame> SpdySession::CreateSynStream(
|
| SpdyStreamId stream_id,
|
| RequestPriority priority,
|
| uint8 credential_slot,
|
| @@ -691,15 +669,16 @@ SpdyFrame* SpdySession::CreateSynStream(
|
| stream_id, 0));
|
| }
|
|
|
| - return syn_frame.release();
|
| + return syn_frame.Pass();
|
| }
|
|
|
| -SpdyFrame* SpdySession::CreateCredentialFrame(
|
| +int SpdySession::CreateCredentialFrame(
|
| const std::string& origin,
|
| SSLClientCertType type,
|
| const std::string& key,
|
| const std::string& cert,
|
| - RequestPriority priority) {
|
| + RequestPriority priority,
|
| + scoped_ptr<SpdyFrame>* credential_frame) {
|
| DCHECK(is_secure_);
|
| SSLClientSocket* ssl_socket = GetSSLClientSocket();
|
| DCHECK(ssl_socket);
|
| @@ -711,12 +690,12 @@ SpdyFrame* SpdySession::CreateCredentialFrame(
|
| size_t slot = credential_state_.SetHasCredential(GURL(origin));
|
| int rv = SpdyCredentialBuilder::Build(tls_unique, type, key, cert, slot,
|
| &credential);
|
| - DCHECK_EQ(OK, rv);
|
| + DCHECK_NE(rv, ERR_IO_PENDING);
|
| if (rv != OK)
|
| - return NULL;
|
| + return rv;
|
|
|
| DCHECK(buffered_spdy_framer_.get());
|
| - scoped_ptr<SpdyFrame> credential_frame(
|
| + credential_frame->reset(
|
| buffered_spdy_framer_->CreateCredentialFrame(credential));
|
|
|
| if (net_log().IsLoggingAllEvents()) {
|
| @@ -724,10 +703,10 @@ SpdyFrame* SpdySession::CreateCredentialFrame(
|
| NetLog::TYPE_SPDY_SESSION_SEND_CREDENTIAL,
|
| base::Bind(&NetLogSpdyCredentialCallback, credential.slot, &origin));
|
| }
|
| - return credential_frame.release();
|
| + return OK;
|
| }
|
|
|
| -SpdyFrame* SpdySession::CreateHeadersFrame(
|
| +scoped_ptr<SpdyFrame> SpdySession::CreateHeadersFrame(
|
| SpdyStreamId stream_id,
|
| const SpdyHeaderBlock& headers,
|
| SpdyControlFlags flags) {
|
| @@ -749,12 +728,13 @@ SpdyFrame* SpdySession::CreateHeadersFrame(
|
| &headers, fin, /*unidirectional=*/false,
|
| stream_id, 0));
|
| }
|
| - return frame.release();
|
| + return frame.Pass();
|
| }
|
|
|
| -SpdyFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id,
|
| - net::IOBuffer* data, int len,
|
| - SpdyDataFlags flags) {
|
| +scoped_ptr<SpdyFrame> SpdySession::CreateDataFrame(SpdyStreamId stream_id,
|
| + net::IOBuffer* data,
|
| + int len,
|
| + SpdyDataFlags flags) {
|
| // Find our stream
|
| CHECK(IsStreamActive(stream_id));
|
| scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
|
| @@ -762,7 +742,7 @@ SpdyFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id,
|
|
|
| if (len < 0) {
|
| NOTREACHED();
|
| - return NULL;
|
| + return scoped_ptr<SpdyFrame>();
|
| }
|
|
|
| if (len > kMaxSpdyFrameChunkSize) {
|
| @@ -785,7 +765,7 @@ SpdyFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id,
|
| net_log().AddEvent(
|
| NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_STREAM_SEND_WINDOW,
|
| NetLog::IntegerCallback("stream_id", stream_id));
|
| - return NULL;
|
| + return scoped_ptr<SpdyFrame>();
|
| }
|
| if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
|
| effective_window_size =
|
| @@ -797,7 +777,7 @@ SpdyFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id,
|
| net_log().AddEvent(
|
| NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_SESSION_SEND_WINDOW,
|
| NetLog::IntegerCallback("stream_id", stream_id));
|
| - return NULL;
|
| + return scoped_ptr<SpdyFrame>();
|
| }
|
| }
|
|
|
| @@ -828,7 +808,7 @@ SpdyFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id,
|
| buffered_spdy_framer_->CreateDataFrame(
|
| stream_id, data->data(), static_cast<uint32>(len), flags));
|
|
|
| - return frame.release();
|
| + return frame.Pass();
|
| }
|
|
|
| void SpdySession::CloseStream(SpdyStreamId stream_id, int status) {
|
| @@ -863,7 +843,7 @@ void SpdySession::ResetStream(SpdyStreamId stream_id,
|
| scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
|
| priority = stream->priority();
|
| }
|
| - QueueFrame(rst_frame.release(), priority);
|
| + EnqueueSessionWrite(priority, rst_frame.Pass());
|
| RecordProtocolErrorHistogram(
|
| static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID));
|
| DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR);
|
| @@ -930,57 +910,59 @@ void SpdySession::OnReadComplete(int bytes_read) {
|
| }
|
|
|
| void SpdySession::OnWriteComplete(int result) {
|
| + // Releasing the in-flight write can have a side-effect of dropping
|
| + // the last reference to |this|. Hold a reference through this
|
| + // function.
|
| + scoped_refptr<SpdySession> self(this);
|
| +
|
| DCHECK(write_pending_);
|
| - DCHECK(in_flight_write_.size());
|
| + DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0);
|
|
|
| last_activity_time_ = base::TimeTicks::Now();
|
| write_pending_ = false;
|
|
|
| - scoped_refptr<SpdyStream> stream = in_flight_write_.stream();
|
| -
|
| - if (result >= 0) {
|
| - // It should not be possible to have written more bytes than our
|
| - // in_flight_write_.
|
| - DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining());
|
| -
|
| - in_flight_write_.buffer()->DidConsume(result);
|
| -
|
| - // We only notify the stream when we've fully written the pending frame.
|
| - if (!in_flight_write_.buffer()->BytesRemaining()) {
|
| - if (stream) {
|
| - // Report the number of bytes written to the caller, but exclude the
|
| - // frame size overhead. NOTE: if this frame was compressed the
|
| - // reported bytes written is the compressed size, not the original
|
| - // size.
|
| - if (result > 0) {
|
| - result = in_flight_write_.buffer()->size();
|
| - DCHECK_GE(result,
|
| - static_cast<int>(
|
| - buffered_spdy_framer_->GetControlFrameHeaderSize()));
|
| - result -= buffered_spdy_framer_->GetControlFrameHeaderSize();
|
| - }
|
| -
|
| - // It is possible that the stream was cancelled while we were writing
|
| - // to the socket.
|
| - if (!stream->cancelled())
|
| - stream->OnWriteComplete(result);
|
| - }
|
| + if (result < 0) {
|
| + in_flight_write_.Release();
|
| + CloseSessionOnError(static_cast<net::Error>(result), true, "Write error");
|
| + return;
|
| + }
|
|
|
| - // Cleanup the write which just completed.
|
| - in_flight_write_.release();
|
| - }
|
| + // It should not be possible to have written more bytes than our
|
| + // in_flight_write_.
|
| + DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining());
|
|
|
| - // Write more data. We're already in a continuation, so we can
|
| - // go ahead and write it immediately (without going back to the
|
| - // message loop).
|
| - WriteSocketLater();
|
| - } else {
|
| - in_flight_write_.release();
|
| + in_flight_write_.buffer()->DidConsume(result);
|
|
|
| - // The stream is now errored. Close it down.
|
| - CloseSessionOnError(
|
| - static_cast<net::Error>(result), true, "The stream has errored.");
|
| + // We only notify the stream when we've fully written the pending frame.
|
| + if (in_flight_write_.buffer()->BytesRemaining() == 0) {
|
| + DCHECK_GT(result, 0);
|
| +
|
| + scoped_refptr<SpdyStream> stream = in_flight_write_.stream();
|
| +
|
| + // It is possible that the stream was cancelled while we were writing
|
| + // to the socket.
|
| + if (stream && !stream->cancelled()) {
|
| + // Report the number of bytes written to the caller, but exclude the
|
| + // frame size overhead. NOTE: if this frame was compressed the
|
| + // reported bytes written is the compressed size, not the original
|
| + // size.
|
| + result = in_flight_write_.buffer()->size();
|
| + DCHECK_GE(result,
|
| + static_cast<int>(
|
| + buffered_spdy_framer_->GetControlFrameHeaderSize()));
|
| + result -= buffered_spdy_framer_->GetControlFrameHeaderSize();
|
| +
|
| + stream->OnWriteComplete(result);
|
| + }
|
| +
|
| + // Cleanup the write which just completed.
|
| + in_flight_write_.Release();
|
| }
|
| +
|
| + // Write more data. We're already in a continuation, so we can go
|
| + // ahead and write it immediately (without going back to the message
|
| + // loop).
|
| + WriteSocketLater();
|
| }
|
|
|
| net::Error SpdySession::ReadSocket() {
|
| @@ -997,7 +979,7 @@ net::Error SpdySession::ReadSocket() {
|
| int bytes_read = connection_->socket()->Read(
|
| read_buffer_.get(),
|
| kReadBufferSize,
|
| - base::Bind(&SpdySession::OnReadComplete, base::Unretained(this)));
|
| + base::Bind(&SpdySession::OnReadComplete, weak_factory_.GetWeakPtr()));
|
| switch (bytes_read) {
|
| case 0:
|
| // Socket is closed!
|
| @@ -1051,24 +1033,39 @@ void SpdySession::WriteSocket() {
|
| // Loop sending frames until we've sent everything or until the write
|
| // returns error (or ERR_IO_PENDING).
|
| DCHECK(buffered_spdy_framer_.get());
|
| - while (in_flight_write_.buffer() || !write_queue_.empty()) {
|
| - if (!in_flight_write_.buffer()) {
|
| - // Grab the next SpdyBuffer to send.
|
| - scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top());
|
| - write_queue_.pop();
|
| - scoped_ptr<SpdyIOBuffer> buffer(producer->ProduceNextBuffer(this));
|
| - stream_producers_.erase(producer.get());
|
| + while (true) {
|
| + if (in_flight_write_.buffer()) {
|
| + DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0);
|
| + } else {
|
| + // Grab the next frame to send.
|
| + scoped_ptr<SpdyFrameProducer> producer;
|
| + scoped_refptr<SpdyStream> stream;
|
| + if (!write_queue_.Dequeue(&producer, &stream))
|
| + break;
|
| +
|
| // It is possible that a stream had data to write, but a
|
| // WINDOW_UPDATE frame has been received which made that
|
| // stream no longer writable.
|
| // TODO(rch): consider handling that case by removing the
|
| // stream from the writable queue?
|
| - if (buffer == NULL)
|
| + if (stream.get() && stream->cancelled())
|
| continue;
|
|
|
| - in_flight_write_ = *buffer;
|
| - } else {
|
| - DCHECK(in_flight_write_.buffer()->BytesRemaining());
|
| + if (stream.get() && stream->stream_id() == 0)
|
| + ActivateStream(stream);
|
| +
|
| + scoped_ptr<SpdyFrame> frame = producer->ProduceFrame();
|
| + if (!frame) {
|
| + NOTREACHED();
|
| + continue;
|
| + }
|
| + DCHECK_GT(frame->size(), 0u);
|
| +
|
| + // TODO(mbelshe): We have too much copying of data here.
|
| + scoped_refptr<IOBufferWithSize> buffer =
|
| + new IOBufferWithSize(frame->size());
|
| + memcpy(buffer->data(), frame->data(), frame->size());
|
| + in_flight_write_ = SpdyIOBuffer(buffer, frame->size(), stream);
|
| }
|
|
|
| write_pending_ = true;
|
| @@ -1126,12 +1123,7 @@ void SpdySession::CloseAllStreams(net::Error status) {
|
| stream->OnClose(status);
|
| }
|
|
|
| - // We also need to drain the queue.
|
| - while (!write_queue_.empty()) {
|
| - scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top());
|
| - write_queue_.pop();
|
| - stream_producers_.erase(producer.get());
|
| - }
|
| + write_queue_.Clear();
|
| }
|
|
|
| void SpdySession::LogAbandonedStream(const scoped_refptr<SpdyStream>& stream,
|
| @@ -1257,33 +1249,18 @@ int SpdySession::GetLocalAddress(IPEndPoint* address) const {
|
| return rv;
|
| }
|
|
|
| -class SimpleSpdyIOBufferProducer : public SpdySession::SpdyIOBufferProducer {
|
| - public:
|
| - SimpleSpdyIOBufferProducer(SpdyFrame* frame,
|
| - RequestPriority priority)
|
| - : frame_(frame),
|
| - priority_(priority) {
|
| - }
|
| -
|
| - virtual RequestPriority GetPriority() const OVERRIDE {
|
| - return priority_;
|
| - }
|
| -
|
| - virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) OVERRIDE {
|
| - return SpdySession::SpdyIOBufferProducer::CreateIOBuffer(
|
| - frame_.get(), priority_, NULL);
|
| - }
|
| -
|
| - private:
|
| - scoped_ptr<SpdyFrame> frame_;
|
| - RequestPriority priority_;
|
| -};
|
| +void SpdySession::EnqueueSessionWrite(RequestPriority priority,
|
| + scoped_ptr<SpdyFrame> frame) {
|
| + EnqueueWrite(
|
| + priority,
|
| + scoped_ptr<SpdyFrameProducer>(new SimpleFrameProducer(frame.Pass())),
|
| + NULL);
|
| +}
|
|
|
| -void SpdySession::QueueFrame(SpdyFrame* frame,
|
| - RequestPriority priority) {
|
| - SimpleSpdyIOBufferProducer* producer =
|
| - new SimpleSpdyIOBufferProducer(frame, priority);
|
| - write_queue_.push(producer);
|
| +void SpdySession::EnqueueWrite(RequestPriority priority,
|
| + scoped_ptr<SpdyFrameProducer> producer,
|
| + const scoped_refptr<SpdyStream>& stream) {
|
| + write_queue_.Enqueue(priority, producer.Pass(), stream);
|
| WriteSocketLater();
|
| }
|
|
|
| @@ -1303,8 +1280,7 @@ void SpdySession::DeleteStream(SpdyStreamId id, int status) {
|
| // the stream in the unclaimed_pushed_streams_ list. However, if
|
| // the stream is errored out, clean it up entirely.
|
| if (status != OK) {
|
| - PushedStreamMap::iterator it;
|
| - for (it = unclaimed_pushed_streams_.begin();
|
| + for (PushedStreamMap::iterator it = unclaimed_pushed_streams_.begin();
|
| it != unclaimed_pushed_streams_.end(); ++it) {
|
| scoped_refptr<SpdyStream> curr = it->second.first;
|
| if (id == curr->stream_id()) {
|
| @@ -1315,29 +1291,17 @@ void SpdySession::DeleteStream(SpdyStreamId id, int status) {
|
| }
|
|
|
| // The stream might have been deleted.
|
| - ActiveStreamMap::iterator it2 = active_streams_.find(id);
|
| - if (it2 == active_streams_.end())
|
| + ActiveStreamMap::iterator it = active_streams_.find(id);
|
| + if (it == active_streams_.end())
|
| return;
|
|
|
| - // Possibly remove from the write queue.
|
| - WriteQueue old = write_queue_;
|
| - write_queue_ = WriteQueue();
|
| - while (!old.empty()) {
|
| - scoped_ptr<SpdyIOBufferProducer> producer(old.top());
|
| - old.pop();
|
| - StreamProducerMap::iterator it = stream_producers_.find(producer.get());
|
| - if (it == stream_producers_.end() || it->second->stream_id() != id) {
|
| - write_queue_.push(producer.release());
|
| - } else {
|
| - stream_producers_.erase(producer.get());
|
| - producer.reset(NULL);
|
| - }
|
| - }
|
| + const scoped_refptr<SpdyStream> stream(it->second);
|
| + active_streams_.erase(it);
|
| + DCHECK(stream);
|
| +
|
| + write_queue_.RemovePendingWritesForStream(stream);
|
|
|
| // If this is an active stream, call the callback.
|
| - const scoped_refptr<SpdyStream> stream(it2->second);
|
| - active_streams_.erase(it2);
|
| - DCHECK(stream);
|
| stream->OnClose(status);
|
| ProcessPendingStreamRequests();
|
| }
|
| @@ -1921,7 +1885,7 @@ void SpdySession::SendSettings(const SettingsMap& settings) {
|
| scoped_ptr<SpdyFrame> settings_frame(
|
| buffered_spdy_framer_->CreateSettings(settings));
|
| sent_settings_ = true;
|
| - QueueFrame(settings_frame.release(), HIGHEST);
|
| + EnqueueSessionWrite(HIGHEST, settings_frame.Pass());
|
| }
|
|
|
| void SpdySession::HandleSetting(uint32 id, uint32 value) {
|
| @@ -2009,14 +1973,14 @@ void SpdySession::SendWindowUpdateFrame(SpdyStreamId stream_id,
|
| DCHECK(buffered_spdy_framer_.get());
|
| scoped_ptr<SpdyFrame> window_update_frame(
|
| buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size));
|
| - QueueFrame(window_update_frame.release(), priority);
|
| + EnqueueSessionWrite(priority, window_update_frame.Pass());
|
| }
|
|
|
| void SpdySession::WritePingFrame(uint32 unique_id) {
|
| DCHECK(buffered_spdy_framer_.get());
|
| scoped_ptr<SpdyFrame> ping_frame(
|
| buffered_spdy_framer_->CreatePingFrame(unique_id));
|
| - QueueFrame(ping_frame.release(), HIGHEST);
|
| + EnqueueSessionWrite(HIGHEST, ping_frame.Pass());
|
|
|
| if (net_log().IsLoggingAllEvents()) {
|
| net_log().AddEvent(
|
|
|