| Index: net/spdy/spdy_session.cc
|
| diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc
|
| index ad300ebbd9ac669d631fd2f12f21773d9efd2d91..ffa64651ad9197b6ad7874d07ae7a10db6d94fd6 100644
|
| --- a/net/spdy/spdy_session.cc
|
| +++ b/net/spdy/spdy_session.cc
|
| @@ -180,6 +180,28 @@ bool g_enable_ping_based_connection_checking = true;
|
| } // namespace
|
|
|
| // static
|
| +void SpdySession::SpdyIOBufferProducer::ActivateStream(
|
| + SpdySession* spdy_session,
|
| + SpdyStream* spdy_stream) {
|
| + spdy_session->ActivateStream(spdy_stream);
|
| +}
|
| +
|
| +// static
|
| +SpdyIOBuffer* SpdySession::SpdyIOBufferProducer::CreateIOBuffer(
|
| + SpdyFrame* frame,
|
| + RequestPriority priority,
|
| + SpdyStream* stream) {
|
| + size_t size = frame->length() + SpdyFrame::kHeaderSize;
|
| + DCHECK_GT(size, 0u);
|
| +
|
| + // TODO(mbelshe): We have too much copying of data here.
|
| + IOBufferWithSize* buffer = new IOBufferWithSize(size);
|
| + memcpy(buffer->data(), frame->data(), size);
|
| +
|
| + return new SpdyIOBuffer(buffer, size, priority, stream);
|
| +}
|
| +
|
| +// static
|
| void SpdySession::set_default_protocol(NextProto default_protocol) {
|
| g_default_protocol = default_protocol;
|
| }
|
| @@ -355,6 +377,13 @@ bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
|
| return !ssl_info.client_cert_sent && ssl_info.cert->VerifyNameMatch(domain);
|
| }
|
|
|
| +void SpdySession::SetStreamHasWriteAvailable(SpdyStream* stream,
|
| + SpdyIOBufferProducer* producer) {
|
| + write_queue_.push(producer);
|
| + stream_producers_[producer] = stream;
|
| + WriteSocketLater();
|
| +}
|
| +
|
| int SpdySession::GetPushStream(
|
| const GURL& url,
|
| scoped_refptr<SpdyStream>* stream,
|
| @@ -393,7 +422,8 @@ int SpdySession::CreateStream(
|
| const BoundNetLog& stream_net_log,
|
| const CompletionCallback& callback) {
|
| if (!max_concurrent_streams_ ||
|
| - active_streams_.size() < max_concurrent_streams_) {
|
| + (active_streams_.size() + created_streams_.size() <
|
| + max_concurrent_streams_)) {
|
| return CreateStreamImpl(url, priority, spdy_stream, stream_net_log);
|
| }
|
|
|
| @@ -483,10 +513,7 @@ int SpdySession::CreateStreamImpl(
|
|
|
| const std::string& path = url.PathForRequest();
|
|
|
| - const SpdyStreamId stream_id = GetNewStreamId();
|
| -
|
| *spdy_stream = new SpdyStream(this,
|
| - stream_id,
|
| false,
|
| stream_net_log);
|
| const scoped_refptr<SpdyStream>& stream = *spdy_stream;
|
| @@ -495,14 +522,13 @@ int SpdySession::CreateStreamImpl(
|
| stream->set_path(path);
|
| stream->set_send_window_size(initial_send_window_size_);
|
| stream->set_recv_window_size(initial_recv_window_size_);
|
| - ActivateStream(stream);
|
| + created_streams_.insert(stream);
|
|
|
| UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyPriorityCount",
|
| static_cast<int>(priority), 0, 10, 11);
|
|
|
| // TODO(mbelshe): Optimize memory allocations
|
|
|
| - DCHECK_EQ(active_streams_[stream_id].get(), stream.get());
|
| return OK;
|
| }
|
|
|
| @@ -524,15 +550,13 @@ int SpdySession::GetProtocolVersion() const {
|
| return buffered_spdy_framer_->protocol_version();
|
| }
|
|
|
| -int SpdySession::WriteSynStream(
|
| +SpdySynStreamControlFrame* SpdySession::CreateSynStream(
|
| SpdyStreamId stream_id,
|
| RequestPriority priority,
|
| uint8 credential_slot,
|
| SpdyControlFlags flags,
|
| const linked_ptr<SpdyHeaderBlock>& headers) {
|
| - // Find our stream
|
| - if (!IsStreamActive(stream_id))
|
| - return ERR_INVALID_SPDY_STREAM;
|
| + CHECK(IsStreamActive(stream_id));
|
| const scoped_refptr<SpdyStream>& stream = active_streams_[stream_id];
|
| CHECK_EQ(stream->stream_id(), stream_id);
|
|
|
| @@ -543,11 +567,7 @@ int SpdySession::WriteSynStream(
|
| buffered_spdy_framer_->CreateSynStream(
|
| stream_id, 0,
|
| ConvertRequestPriorityToSpdyPriority(priority, GetProtocolVersion()),
|
| - credential_slot, flags, false, headers.get()));
|
| - // We enqueue all SYN_STREAM frames at the same priority to ensure
|
| - // that we do not send them out-of-order.
|
| - // http://crbug.com/111708
|
| - QueueFrame(syn_frame.get(), HIGHEST, stream);
|
| + credential_slot, flags, true, headers.get()));
|
|
|
| base::StatsCounter spdy_requests("spdy.requests");
|
| spdy_requests.Increment();
|
| @@ -559,14 +579,15 @@ int SpdySession::WriteSynStream(
|
| base::Bind(&NetLogSpdySynCallback, headers.get(), flags, stream_id, 0));
|
| }
|
|
|
| - return ERR_IO_PENDING;
|
| + return syn_frame.release();
|
| }
|
|
|
| -int SpdySession::WriteCredentialFrame(const std::string& origin,
|
| - SSLClientCertType type,
|
| - const std::string& key,
|
| - const std::string& cert,
|
| - RequestPriority priority) {
|
| +SpdyCredentialControlFrame* SpdySession::CreateCredentialFrame(
|
| + const std::string& origin,
|
| + SSLClientCertType type,
|
| + const std::string& key,
|
| + const std::string& cert,
|
| + RequestPriority priority) {
|
| DCHECK(is_secure_);
|
| unsigned char secret[32]; // 32 bytes from the spec
|
| GetSSLClientSocket()->ExportKeyingMaterial("SPDY certificate proof",
|
| @@ -608,24 +629,18 @@ int SpdySession::WriteCredentialFrame(const std::string& origin,
|
| DCHECK(buffered_spdy_framer_.get());
|
| scoped_ptr<SpdyCredentialControlFrame> credential_frame(
|
| buffered_spdy_framer_->CreateCredentialFrame(credential));
|
| - // We enqueue all SYN_STREAM frames at the same priority to ensure
|
| - // that we do not send them out-of-order, which means that we need
|
| - // to enqueue all CREDENTIAL frames at this priority to ensure that
|
| - // they are sent *before* the SYN_STREAM that references them.
|
| - // http://crbug.com/111708
|
| - QueueFrame(credential_frame.get(), HIGHEST, NULL);
|
|
|
| if (net_log().IsLoggingAllEvents()) {
|
| net_log().AddEvent(
|
| NetLog::TYPE_SPDY_SESSION_SEND_CREDENTIAL,
|
| base::Bind(&NetLogSpdyCredentialCallback, credential.slot, &origin));
|
| }
|
| - return ERR_IO_PENDING;
|
| + return credential_frame.release();
|
| }
|
|
|
| -int SpdySession::WriteStreamData(SpdyStreamId stream_id,
|
| - net::IOBuffer* data, int len,
|
| - SpdyDataFlags flags) {
|
| +SpdyDataFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id,
|
| + net::IOBuffer* data, int len,
|
| + SpdyDataFlags flags) {
|
| // Find our stream
|
| CHECK(IsStreamActive(stream_id));
|
| scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
|
| @@ -649,7 +664,7 @@ int SpdySession::WriteStreamData(SpdyStreamId stream_id,
|
| net_log().AddEvent(
|
| NetLog::TYPE_SPDY_SESSION_STALLED_ON_SEND_WINDOW,
|
| NetLog::IntegerCallback("stream_id", stream_id));
|
| - return ERR_IO_PENDING;
|
| + return NULL;
|
| }
|
| int new_len = std::min(len, stream->send_window_size());
|
| if (new_len < len) {
|
| @@ -674,18 +689,23 @@ int SpdySession::WriteStreamData(SpdyStreamId stream_id,
|
| scoped_ptr<SpdyDataFrame> frame(
|
| buffered_spdy_framer_->CreateDataFrame(
|
| stream_id, data->data(), len, flags));
|
| - QueueFrame(frame.get(), stream->priority(), stream);
|
|
|
| - return ERR_IO_PENDING;
|
| + return frame.release();
|
| }
|
|
|
| void SpdySession::CloseStream(SpdyStreamId stream_id, int status) {
|
| + DCHECK_NE(0u, stream_id);
|
| // TODO(mbelshe): We should send a RST_STREAM control frame here
|
| // so that the server can cancel a large send.
|
|
|
| DeleteStream(stream_id, status);
|
| }
|
|
|
| +void SpdySession::CloseCreatedStream(SpdyStream* stream, int status) {
|
| + DCHECK_EQ(0u, stream->stream_id());
|
| + created_streams_.erase(scoped_refptr<SpdyStream>(stream));
|
| +}
|
| +
|
| void SpdySession::ResetStream(SpdyStreamId stream_id,
|
| SpdyStatusCodes status,
|
| const std::string& description) {
|
| @@ -703,7 +723,7 @@ void SpdySession::ResetStream(SpdyStreamId stream_id,
|
| scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
|
| priority = stream->priority();
|
| }
|
| - QueueFrame(rst_frame.get(), priority, NULL);
|
| + QueueFrame(rst_frame.release(), priority);
|
| RecordProtocolErrorHistogram(
|
| static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID));
|
| DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR);
|
| @@ -891,46 +911,22 @@ void SpdySession::WriteSocket() {
|
| // Loop sending frames until we've sent everything or until the write
|
| // returns error (or ERR_IO_PENDING).
|
| DCHECK(buffered_spdy_framer_.get());
|
| - while (in_flight_write_.buffer() || !queue_.empty()) {
|
| + while (in_flight_write_.buffer() || !write_queue_.empty()) {
|
| if (!in_flight_write_.buffer()) {
|
| - // Grab the next SpdyFrame to send.
|
| - SpdyIOBuffer next_buffer = queue_.top();
|
| - queue_.pop();
|
| -
|
| - // We've deferred compression until just before we write it to the socket,
|
| - // which is now. At this time, we don't compress our data frames.
|
| - SpdyFrame uncompressed_frame(next_buffer.buffer()->data(), false);
|
| - size_t size;
|
| - if (buffered_spdy_framer_->IsCompressible(uncompressed_frame)) {
|
| - DCHECK(uncompressed_frame.is_control_frame());
|
| - const SpdyControlFrame* uncompressed_control_frame =
|
| - reinterpret_cast<const SpdyControlFrame*>(&uncompressed_frame);
|
| - scoped_ptr<SpdyFrame> compressed_frame(
|
| - buffered_spdy_framer_->CompressControlFrame(
|
| - *uncompressed_control_frame));
|
| - if (!compressed_frame.get()) {
|
| - RecordProtocolErrorHistogram(
|
| - PROTOCOL_ERROR_SPDY_COMPRESSION_FAILURE);
|
| - CloseSessionOnError(
|
| - net::ERR_SPDY_PROTOCOL_ERROR, true, "SPDY Compression failure.");
|
| - return;
|
| - }
|
| -
|
| - size = compressed_frame->length() + SpdyFrame::kHeaderSize;
|
| -
|
| - DCHECK_GT(size, 0u);
|
| -
|
| - // TODO(mbelshe): We have too much copying of data here.
|
| - IOBufferWithSize* buffer = new IOBufferWithSize(size);
|
| - memcpy(buffer->data(), compressed_frame->data(), size);
|
| -
|
| - // Attempt to send the frame.
|
| - in_flight_write_ = SpdyIOBuffer(buffer, size, HIGHEST,
|
| - next_buffer.stream());
|
| - } else {
|
| - size = uncompressed_frame.length() + SpdyFrame::kHeaderSize;
|
| - in_flight_write_ = next_buffer;
|
| - }
|
| + // Grab the next SpdyBuffer to send.
|
| + scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top());
|
| + scoped_ptr<SpdyIOBuffer> buffer(producer->ProduceNextBuffer(this));
|
| + stream_producers_.erase(producer.get());
|
| + write_queue_.pop();
|
| + // It is possible that a stream had data to write, but a
|
| + // WINDOW_UPDATE frame has been received which made that
|
| + // stream no longer writable.
|
| + // TODO(rch): consider handling that case by removing the
|
| + // stream from the writable queue?
|
| + if (buffer == NULL)
|
| + continue;
|
| +
|
| + in_flight_write_ = *buffer;
|
| } else {
|
| DCHECK(in_flight_write_.buffer()->BytesRemaining());
|
| }
|
| @@ -977,16 +973,33 @@ void SpdySession::CloseAllStreams(net::Error status) {
|
| while (!active_streams_.empty()) {
|
| ActiveStreamMap::iterator it = active_streams_.begin();
|
| const scoped_refptr<SpdyStream>& stream = it->second;
|
| - DCHECK(stream);
|
| - std::string description = base::StringPrintf(
|
| - "ABANDONED (stream_id=%d): ", stream->stream_id()) + stream->path();
|
| - stream->LogStreamError(status, description);
|
| + LogAbandonedStream(stream, status);
|
| DeleteStream(stream->stream_id(), status);
|
| }
|
|
|
| + while (!created_streams_.empty()) {
|
| + CreatedStreamSet::iterator it = created_streams_.begin();
|
| + const scoped_refptr<SpdyStream>& stream = *it;
|
| + LogAbandonedStream(stream, status);
|
| + stream->OnClose(status);
|
| + created_streams_.erase(it);
|
| + }
|
| +
|
| // We also need to drain the queue.
|
| - while (queue_.size())
|
| - queue_.pop();
|
| + while (!write_queue_.empty()) {
|
| + SpdyIOBufferProducer* producer = write_queue_.top();
|
| + stream_producers_.erase(producer);
|
| + delete producer;
|
| + write_queue_.pop();
|
| + }
|
| +}
|
| +
|
| +void SpdySession::LogAbandonedStream(const scoped_refptr<SpdyStream>& stream,
|
| + net::Error status) {
|
| + DCHECK(stream);
|
| + std::string description = base::StringPrintf(
|
| + "ABANDONED (stream_id=%d): ", stream->stream_id()) + stream->path();
|
| + stream->LogStreamError(status, description);
|
| }
|
|
|
| int SpdySession::GetNewStreamId() {
|
| @@ -997,17 +1010,6 @@ int SpdySession::GetNewStreamId() {
|
| return id;
|
| }
|
|
|
| -void SpdySession::QueueFrame(SpdyFrame* frame,
|
| - RequestPriority priority,
|
| - SpdyStream* stream) {
|
| - int length = SpdyFrame::kHeaderSize + frame->length();
|
| - IOBuffer* buffer = new IOBuffer(length);
|
| - memcpy(buffer->data(), frame->data(), length);
|
| - queue_.push(SpdyIOBuffer(buffer, length, priority, stream));
|
| -
|
| - WriteSocketLater();
|
| -}
|
| -
|
| void SpdySession::CloseSessionOnError(net::Error err,
|
| bool remove_from_pool,
|
| const std::string& description) {
|
| @@ -1097,7 +1099,41 @@ int SpdySession::GetLocalAddress(IPEndPoint* address) const {
|
| return connection_->socket()->GetLocalAddress(address);
|
| }
|
|
|
| +class SimpleSpdyIOBufferProducer : public SpdySession::SpdyIOBufferProducer {
|
| + public:
|
| + SimpleSpdyIOBufferProducer(SpdyFrame* frame,
|
| + RequestPriority priority)
|
| + : frame_(frame),
|
| + priority_(priority) {
|
| + }
|
| +
|
| + virtual RequestPriority GetPriority() const OVERRIDE {
|
| + return priority_;
|
| + }
|
| +
|
| + virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) {
|
| + return SpdySession::SpdyIOBufferProducer::CreateIOBuffer(
|
| + frame_, priority_, NULL);
|
| + }
|
| +
|
| + private:
|
| + SpdyFrame* frame_;
|
| + RequestPriority priority_;
|
| +};
|
| +
|
| +void SpdySession::QueueFrame(SpdyFrame* frame,
|
| + RequestPriority priority) {
|
| + SimpleSpdyIOBufferProducer* producer =
|
| + new SimpleSpdyIOBufferProducer(frame, priority);
|
| + write_queue_.push(producer);
|
| + WriteSocketLater();
|
| +}
|
| +
|
| void SpdySession::ActivateStream(SpdyStream* stream) {
|
| + if (stream->stream_id() == 0) {
|
| + stream->set_stream_id(GetNewStreamId());
|
| + created_streams_.erase(scoped_refptr<SpdyStream>(stream));
|
| + }
|
| const SpdyStreamId id = stream->stream_id();
|
| DCHECK(!IsStreamActive(id));
|
|
|
| @@ -1125,6 +1161,19 @@ void SpdySession::DeleteStream(SpdyStreamId id, int status) {
|
| if (it2 == active_streams_.end())
|
| return;
|
|
|
| + // Possibly remove from the write queue.
|
| + WriteQueue old = write_queue_;
|
| + write_queue_ = WriteQueue();
|
| + while (!old.empty()) {
|
| + SpdyIOBufferProducer* producer = old.top();
|
| + StreamProducerMap::iterator it = stream_producers_.find(producer);
|
| + if (it == stream_producers_.end() || it->second->stream_id() != id)
|
| + write_queue_.push(producer);
|
| + else
|
| + delete producer;
|
| + old.pop();
|
| + }
|
| +
|
| // If this is an active stream, call the callback.
|
| const scoped_refptr<SpdyStream> stream(it2->second);
|
| active_streams_.erase(it2);
|
| @@ -1354,8 +1403,8 @@ void SpdySession::OnSynStream(
|
| return;
|
| }
|
|
|
| - scoped_refptr<SpdyStream> stream(
|
| - new SpdyStream(this, stream_id, true, net_log_));
|
| + scoped_refptr<SpdyStream> stream(new SpdyStream(this, true, net_log_));
|
| + stream->set_stream_id(stream_id);
|
|
|
| stream->set_path(gurl.PathForRequest());
|
| stream->set_send_window_size(initial_send_window_size_);
|
| @@ -1377,7 +1426,6 @@ void SpdySession::OnSynStream(
|
| void SpdySession::OnSynReply(const SpdySynReplyControlFrame& frame,
|
| const linked_ptr<SpdyHeaderBlock>& headers) {
|
| SpdyStreamId stream_id = frame.stream_id();
|
| -
|
| if (net_log().IsLoggingAllEvents()) {
|
| net_log().AddEvent(
|
| NetLog::TYPE_SPDY_SESSION_SYN_REPLY,
|
| @@ -1561,7 +1609,7 @@ void SpdySession::SendWindowUpdate(SpdyStreamId stream_id,
|
| DCHECK(buffered_spdy_framer_.get());
|
| scoped_ptr<SpdyWindowUpdateControlFrame> window_update_frame(
|
| buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size));
|
| - QueueFrame(window_update_frame.get(), stream->priority(), NULL);
|
| + QueueFrame(window_update_frame.release(), stream->priority());
|
| }
|
|
|
| // Given a cwnd that we would have sent to the server, modify it based on the
|
| @@ -1600,7 +1648,6 @@ void SpdySession::SendInitialSettings() {
|
| settings_map[SETTINGS_INITIAL_WINDOW_SIZE] =
|
| SettingsFlagsAndValue(SETTINGS_FLAG_NONE, initial_recv_window_size_);
|
| }
|
| - sent_settings_ = true;
|
| SendSettings(settings_map);
|
| }
|
|
|
| @@ -1633,7 +1680,6 @@ void SpdySession::SendInitialSettings() {
|
| HandleSetting(new_id, new_val);
|
| }
|
|
|
| - sent_settings_ = true;
|
| SendSettings(settings_map_new);
|
| }
|
|
|
| @@ -1647,7 +1693,8 @@ void SpdySession::SendSettings(const SettingsMap& settings) {
|
| DCHECK(buffered_spdy_framer_.get());
|
| scoped_ptr<SpdySettingsControlFrame> settings_frame(
|
| buffered_spdy_framer_->CreateSettings(settings));
|
| - QueueFrame(settings_frame.get(), HIGHEST, NULL);
|
| + sent_settings_ = true;
|
| + QueueFrame(settings_frame.release(), HIGHEST);
|
| }
|
|
|
| void SpdySession::HandleSetting(uint32 id, uint32 value) {
|
| @@ -1682,6 +1729,12 @@ void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) {
|
| DCHECK(stream);
|
| stream->AdjustSendWindowSize(delta_window_size);
|
| }
|
| +
|
| + CreatedStreamSet::iterator i;
|
| + for (i = created_streams_.begin(); i != created_streams_.end(); i++) {
|
| + const scoped_refptr<SpdyStream>& stream = *i;
|
| + stream->AdjustSendWindowSize(delta_window_size);
|
| + }
|
| }
|
|
|
| void SpdySession::SendPrefacePingIfNoneInFlight() {
|
| @@ -1702,7 +1755,7 @@ void SpdySession::WritePingFrame(uint32 unique_id) {
|
| DCHECK(buffered_spdy_framer_.get());
|
| scoped_ptr<SpdyPingControlFrame> ping_frame(
|
| buffered_spdy_framer_->CreatePingFrame(next_ping_id_));
|
| - QueueFrame(ping_frame.get(), HIGHEST, NULL);
|
| + QueueFrame(ping_frame.release(), HIGHEST);
|
|
|
| if (net_log().IsLoggingAllEvents()) {
|
| net_log().AddEvent(
|
|
|