Chromium Code Reviews| Index: net/spdy/spdy_session.cc |
| diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc |
| index d273291dfa424a34cbbc0f0d4c7074dcdeb2bd45..a1e89912e87a2b259a13e7d88bb986d87da87ae4 100644 |
| --- a/net/spdy/spdy_session.cc |
| +++ b/net/spdy/spdy_session.cc |
| @@ -180,6 +180,28 @@ bool g_enable_ping_based_connection_checking = true; |
| } // namespace |
| // static |
| +void SpdySession::SpdyIOBufferProducer::ActivateStream( |
| + SpdySession* spdy_session, |
| + SpdyStream* spdy_stream) { |
| + spdy_session->ActivateStream(spdy_stream); |
| +} |
| + |
| +// static |
| +SpdyIOBuffer* SpdySession::SpdyIOBufferProducer::CreateIOBuffer( |
| + SpdyFrame* frame, |
| + RequestPriority priority, |
| + SpdyStream* stream) { |
| + size_t size = frame->length() + SpdyFrame::kHeaderSize; |
| + DCHECK_GT(size, 0u); |
| + |
| + // TODO(mbelshe): We have too much copying of data here. |
| + IOBufferWithSize* buffer = new IOBufferWithSize(size); |
| + memcpy(buffer->data(), frame->data(), size); |
| + |
| + return new SpdyIOBuffer(buffer, size, priority, stream); |
| +} |
| + |
| +// static |
| void SpdySession::set_default_protocol(NextProto default_protocol) { |
| g_default_protocol = default_protocol; |
| } |
| @@ -355,6 +377,13 @@ bool SpdySession::VerifyDomainAuthentication(const std::string& domain) { |
| return !ssl_info.client_cert_sent && ssl_info.cert->VerifyNameMatch(domain); |
| } |
| +void SpdySession::SetStreamHasWriteAvailable(SpdyStream* stream, |
| + SpdyIOBufferProducer* producer) { |
| + write_queue_.push(producer); |
| + stream_producers_[producer] = stream; |
| + WriteSocketLater(); |
| +} |
| + |
| int SpdySession::GetPushStream( |
| const GURL& url, |
| scoped_refptr<SpdyStream>* stream, |
| @@ -393,7 +422,8 @@ int SpdySession::CreateStream( |
| const BoundNetLog& stream_net_log, |
| const CompletionCallback& callback) { |
| if (!max_concurrent_streams_ || |
| - active_streams_.size() < max_concurrent_streams_) { |
| + (active_streams_.size() + created_streams_.size() |
| + < max_concurrent_streams_)) { |
| return CreateStreamImpl(url, priority, spdy_stream, stream_net_log); |
|
ramant (doing other things)
2012/06/26 23:30:25
nit: "When expressions are wrapped, the operator s
Ryan Hamilton
2012/06/27 16:58:28
Done.
|
| } |
| @@ -483,10 +513,7 @@ int SpdySession::CreateStreamImpl( |
| const std::string& path = url.PathForRequest(); |
| - const SpdyStreamId stream_id = GetNewStreamId(); |
| - |
| *spdy_stream = new SpdyStream(this, |
| - stream_id, |
| false, |
| stream_net_log); |
| const scoped_refptr<SpdyStream>& stream = *spdy_stream; |
| @@ -495,14 +522,13 @@ int SpdySession::CreateStreamImpl( |
| stream->set_path(path); |
| stream->set_send_window_size(initial_send_window_size_); |
| stream->set_recv_window_size(initial_recv_window_size_); |
| - ActivateStream(stream); |
| + created_streams_.insert(stream); |
| UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyPriorityCount", |
| static_cast<int>(priority), 0, 10, 11); |
| // TODO(mbelshe): Optimize memory allocations |
| - DCHECK_EQ(active_streams_[stream_id].get(), stream.get()); |
| return OK; |
| } |
| @@ -524,15 +550,13 @@ int SpdySession::GetProtocolVersion() const { |
| return buffered_spdy_framer_->protocol_version(); |
| } |
| -int SpdySession::WriteSynStream( |
| +SpdySynStreamControlFrame* SpdySession::CreateSynStream( |
| SpdyStreamId stream_id, |
| RequestPriority priority, |
| uint8 credential_slot, |
| SpdyControlFlags flags, |
| const linked_ptr<SpdyHeaderBlock>& headers) { |
| - // Find our stream |
| - if (!IsStreamActive(stream_id)) |
| - return ERR_INVALID_SPDY_STREAM; |
| + CHECK(IsStreamActive(stream_id)); |
| const scoped_refptr<SpdyStream>& stream = active_streams_[stream_id]; |
| CHECK_EQ(stream->stream_id(), stream_id); |
| @@ -543,11 +567,7 @@ int SpdySession::WriteSynStream( |
| buffered_spdy_framer_->CreateSynStream( |
| stream_id, 0, |
| ConvertRequestPriorityToSpdyPriority(priority, GetProtocolVersion()), |
| - credential_slot, flags, false, headers.get())); |
| - // We enqueue all SYN_STREAM frames at the same priority to ensure |
| - // that we do not send them out-of-order. |
| - // http://crbug.com/111708 |
| - QueueFrame(syn_frame.get(), HIGHEST, stream); |
| + credential_slot, flags, true, headers.get())); |
| base::StatsCounter spdy_requests("spdy.requests"); |
| spdy_requests.Increment(); |
| @@ -559,14 +579,15 @@ int SpdySession::WriteSynStream( |
| base::Bind(&NetLogSpdySynCallback, headers.get(), flags, stream_id, 0)); |
| } |
| - return ERR_IO_PENDING; |
| + return syn_frame.release(); |
| } |
| -int SpdySession::WriteCredentialFrame(const std::string& origin, |
| - SSLClientCertType type, |
| - const std::string& key, |
| - const std::string& cert, |
| - RequestPriority priority) { |
| +SpdyCredentialControlFrame* SpdySession::CreateCredentialFrame( |
| + const std::string& origin, |
| + SSLClientCertType type, |
| + const std::string& key, |
| + const std::string& cert, |
| + RequestPriority priority) { |
| DCHECK(is_secure_); |
| unsigned char secret[32]; // 32 bytes from the spec |
| GetSSLClientSocket()->ExportKeyingMaterial("SPDY certificate proof", |
| @@ -608,24 +629,18 @@ int SpdySession::WriteCredentialFrame(const std::string& origin, |
| DCHECK(buffered_spdy_framer_.get()); |
| scoped_ptr<SpdyCredentialControlFrame> credential_frame( |
| buffered_spdy_framer_->CreateCredentialFrame(credential)); |
| - // We enqueue all SYN_STREAM frames at the same priority to ensure |
| - // that we do not send them out-of-order, which means that we need |
| - // to enqueue all CREDENTIAL frames at this priority to ensure that |
| - // they are sent *before* the SYN_STREAM that references them. |
| - // http://crbug.com/111708 |
| - QueueFrame(credential_frame.get(), HIGHEST, NULL); |
| if (net_log().IsLoggingAllEvents()) { |
| net_log().AddEvent( |
| NetLog::TYPE_SPDY_SESSION_SEND_CREDENTIAL, |
| base::Bind(&NetLogSpdyCredentialCallback, credential.slot, &origin)); |
| } |
| - return ERR_IO_PENDING; |
| + return credential_frame.release(); |
| } |
| -int SpdySession::WriteStreamData(SpdyStreamId stream_id, |
| - net::IOBuffer* data, int len, |
| - SpdyDataFlags flags) { |
| +SpdyDataFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id, |
| + net::IOBuffer* data, int len, |
| + SpdyDataFlags flags) { |
| // Find our stream |
| CHECK(IsStreamActive(stream_id)); |
| scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; |
| @@ -649,7 +664,7 @@ int SpdySession::WriteStreamData(SpdyStreamId stream_id, |
| net_log().AddEvent( |
| NetLog::TYPE_SPDY_SESSION_STALLED_ON_SEND_WINDOW, |
| NetLog::IntegerCallback("stream_id", stream_id)); |
| - return ERR_IO_PENDING; |
| + return NULL; |
| } |
| int new_len = std::min(len, stream->send_window_size()); |
| if (new_len < len) { |
| @@ -674,18 +689,23 @@ int SpdySession::WriteStreamData(SpdyStreamId stream_id, |
| scoped_ptr<SpdyDataFrame> frame( |
| buffered_spdy_framer_->CreateDataFrame( |
| stream_id, data->data(), len, flags)); |
| - QueueFrame(frame.get(), stream->priority(), stream); |
| - return ERR_IO_PENDING; |
| + return frame.release(); |
| } |
| void SpdySession::CloseStream(SpdyStreamId stream_id, int status) { |
| + DCHECK_NE(0u, stream_id); |
| // TODO(mbelshe): We should send a RST_STREAM control frame here |
| // so that the server can cancel a large send. |
| DeleteStream(stream_id, status); |
| } |
| +void SpdySession::CloseCreatedStream(SpdyStream* stream, int status) { |
| + DCHECK_EQ(0u, stream->stream_id()); |
| + created_streams_.erase(scoped_refptr<SpdyStream>(stream)); |
| +} |
| + |
| void SpdySession::ResetStream(SpdyStreamId stream_id, |
| SpdyStatusCodes status, |
| const std::string& description) { |
| @@ -703,7 +723,7 @@ void SpdySession::ResetStream(SpdyStreamId stream_id, |
| scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; |
| priority = stream->priority(); |
| } |
| - QueueFrame(rst_frame.get(), priority, NULL); |
| + QueueFrame(rst_frame.release(), priority); |
| RecordProtocolErrorHistogram( |
| static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID)); |
| DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR); |
| @@ -891,55 +911,22 @@ void SpdySession::WriteSocket() { |
| // Loop sending frames until we've sent everything or until the write |
| // returns error (or ERR_IO_PENDING). |
| DCHECK(buffered_spdy_framer_.get()); |
| - while (in_flight_write_.buffer() || !queue_.empty()) { |
| + while (in_flight_write_.buffer() || !write_queue_.empty()) { |
| if (!in_flight_write_.buffer()) { |
| - // Grab the next SpdyFrame to send. |
| - SpdyIOBuffer next_buffer = queue_.top(); |
| - queue_.pop(); |
| - |
| - // We've deferred compression until just before we write it to the socket, |
| - // which is now. At this time, we don't compress our data frames. |
| - SpdyFrame uncompressed_frame(next_buffer.buffer()->data(), false); |
| - size_t size; |
| - if (buffered_spdy_framer_->IsCompressible(uncompressed_frame)) { |
| - DCHECK(uncompressed_frame.is_control_frame()); |
| - const SpdyControlFrame* uncompressed_control_frame = |
| - reinterpret_cast<const SpdyControlFrame*>(&uncompressed_frame); |
| - scoped_ptr<SpdyFrame> compressed_frame( |
| - buffered_spdy_framer_->CompressControlFrame( |
| - *uncompressed_control_frame)); |
| - if (!compressed_frame.get()) { |
| - RecordProtocolErrorHistogram( |
| - PROTOCOL_ERROR_SPDY_COMPRESSION_FAILURE); |
| - CloseSessionOnError( |
| - net::ERR_SPDY_PROTOCOL_ERROR, true, "SPDY Compression failure."); |
| - return; |
| - } |
| - |
| - size = compressed_frame->length() + SpdyFrame::kHeaderSize; |
| - |
| - DCHECK_GT(size, 0u); |
| - |
| - if (uncompressed_control_frame->type() == SYN_STREAM) { |
| - int uncompressed_size = uncompressed_control_frame->length(); |
| - int compressed_size = compressed_frame->length(); |
| - // Make sure we avoid early decimal truncation. |
| - int compression_pct = 100 - (100* compressed_size)/uncompressed_size; |
| - UMA_HISTOGRAM_PERCENTAGE("Net.SpdySynStreamCompressionPercentage", |
| - compression_pct); |
| - } |
| - |
| - // TODO(mbelshe): We have too much copying of data here. |
| - IOBufferWithSize* buffer = new IOBufferWithSize(size); |
| - memcpy(buffer->data(), compressed_frame->data(), size); |
| - |
| - // Attempt to send the frame. |
| - in_flight_write_ = SpdyIOBuffer(buffer, size, HIGHEST, |
| - next_buffer.stream()); |
| - } else { |
| - size = uncompressed_frame.length() + SpdyFrame::kHeaderSize; |
| - in_flight_write_ = next_buffer; |
| - } |
| + // Grab the next SpdyBuffer to send. |
| + scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top()); |
| + scoped_ptr<SpdyIOBuffer> buffer(producer->ProduceNextBuffer(this)); |
| + stream_producers_.erase(producer.get()); |
| + write_queue_.pop(); |
| + // It is possible that a stream had data to write, but a |
| + // WINDOW_UPDATE frame has been received which made that |
| + // stream no longer writable. |
| + // TODO(rch): consider handling that case by removing the |
| + // stream from the writable queue? |
| + if (buffer == NULL) |
| + continue; |
| + |
| + in_flight_write_ = *buffer; |
| } else { |
| DCHECK(in_flight_write_.buffer()->BytesRemaining()); |
| } |
| @@ -993,9 +980,23 @@ void SpdySession::CloseAllStreams(net::Error status) { |
| DeleteStream(stream->stream_id(), status); |
| } |
| + while (!created_streams_.empty()) { |
| + CreatedStreamSet::iterator it = created_streams_.begin(); |
| + const scoped_refptr<SpdyStream>& stream = *it; |
| + std::string description = base::StringPrintf( |
| + "ABANDONED (stream_id=%d): ", stream->stream_id()) + stream->path(); |
| + stream->LogStreamError(status, description); |
| + stream->OnClose(status); |
|
ramant (doing other things)
2012/06/26 23:30:25
nit: consider sharing of code between lines 976-97
Ryan Hamilton
2012/06/27 16:58:28
Done.
|
| + created_streams_.erase(it); |
| + } |
| + |
| // We also need to drain the queue. |
| - while (queue_.size()) |
| - queue_.pop(); |
| + while (!write_queue_.empty()) { |
| + SpdyIOBufferProducer* producer = write_queue_.top(); |
| + stream_producers_.erase(producer); |
| + delete producer; |
| + write_queue_.pop(); |
| + } |
| } |
| int SpdySession::GetNewStreamId() { |
| @@ -1006,17 +1007,6 @@ int SpdySession::GetNewStreamId() { |
| return id; |
| } |
| -void SpdySession::QueueFrame(SpdyFrame* frame, |
| - RequestPriority priority, |
| - SpdyStream* stream) { |
| - int length = SpdyFrame::kHeaderSize + frame->length(); |
| - IOBuffer* buffer = new IOBuffer(length); |
| - memcpy(buffer->data(), frame->data(), length); |
| - queue_.push(SpdyIOBuffer(buffer, length, priority, stream)); |
| - |
| - WriteSocketLater(); |
| -} |
| - |
| void SpdySession::CloseSessionOnError(net::Error err, |
| bool remove_from_pool, |
| const std::string& description) { |
| @@ -1106,7 +1096,41 @@ int SpdySession::GetLocalAddress(IPEndPoint* address) const { |
| return connection_->socket()->GetLocalAddress(address); |
| } |
| +class SimpleSpdyIOBufferProducer : public SpdySession::SpdyIOBufferProducer { |
| + public: |
| + SimpleSpdyIOBufferProducer(SpdyFrame* frame, |
| + RequestPriority priority) |
| + : frame_(frame), |
|
ramant (doing other things)
2012/06/26 23:30:25
nit: indent "RequestPriority ..."
Ryan Hamilton
2012/06/27 16:58:28
Done.
|
| + priority_(priority) { |
| + } |
| + |
| + virtual RequestPriority GetPriority() const OVERRIDE { |
| + return priority_; |
| + } |
| + |
| + virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) { |
| + return SpdySession::SpdyIOBufferProducer::CreateIOBuffer( |
| + frame_, priority_, NULL); |
| + } |
| + |
| + private: |
| + SpdyFrame* frame_; |
| + RequestPriority priority_; |
| +}; |
| + |
| +void SpdySession::QueueFrame(SpdyFrame* frame, |
| + RequestPriority priority) { |
| + SimpleSpdyIOBufferProducer* producer |
| + = new SimpleSpdyIOBufferProducer(frame, priority); |
| + write_queue_.push(producer); |
|
ramant (doing other things)
2012/06/26 23:30:25
nit: " = " should be in the previous line.
Ryan Hamilton
2012/06/27 16:58:28
Done.
Ryan Hamilton
2012/06/27 16:58:28
Done.
|
| + WriteSocketLater(); |
| +} |
| + |
| void SpdySession::ActivateStream(SpdyStream* stream) { |
| + if (stream->stream_id() == 0) { |
| + stream->set_stream_id(GetNewStreamId()); |
| + created_streams_.erase(scoped_refptr<SpdyStream>(stream)); |
| + } |
| const SpdyStreamId id = stream->stream_id(); |
| DCHECK(!IsStreamActive(id)); |
| @@ -1134,6 +1158,19 @@ void SpdySession::DeleteStream(SpdyStreamId id, int status) { |
| if (it2 == active_streams_.end()) |
| return; |
| + // Possibly remove from the write queue. |
| + WriteQueue old = write_queue_; |
| + write_queue_ = WriteQueue(); |
| + while (!old.empty()) { |
| + SpdyIOBufferProducer* producer = old.top(); |
| + StreamProducerMap::iterator it = stream_producers_.find(producer); |
| + if (it == stream_producers_.end() || it->second->stream_id() != id) |
| + write_queue_.push(producer); |
| + else |
| + delete producer; |
| + old.pop(); |
| + } |
| + |
| // If this is an active stream, call the callback. |
| const scoped_refptr<SpdyStream> stream(it2->second); |
| active_streams_.erase(it2); |
| @@ -1249,6 +1286,20 @@ void SpdySession::OnSetting(SpdySettingsIds id, |
| id, static_cast<SpdySettingsFlags>(flags), value)); |
| } |
| +void SpdySession::OnControlFrameCompressed( |
| + const SpdyControlFrame& uncompressed_frame, |
| + const SpdyControlFrame& compressed_frame) { |
| + if (uncompressed_frame.type() == SYN_STREAM) { |
| + int uncompressed_size = uncompressed_frame.length(); |
| + int compressed_size = compressed_frame.length(); |
| + // Make sure we avoid early decimal truncation. |
| + int compression_pct = 100 - (100* compressed_size)/uncompressed_size; |
| + UMA_HISTOGRAM_PERCENTAGE("Net.SpdySynStreamCompressionPercentage", |
| + compression_pct); |
| + } |
| +} |
| + |
| + |
| bool SpdySession::Respond(const SpdyHeaderBlock& headers, |
| const scoped_refptr<SpdyStream> stream) { |
| int rv = OK; |
| @@ -1349,8 +1400,8 @@ void SpdySession::OnSynStream( |
| return; |
| } |
| - scoped_refptr<SpdyStream> stream( |
| - new SpdyStream(this, stream_id, true, net_log_)); |
| + scoped_refptr<SpdyStream> stream(new SpdyStream(this, true, net_log_)); |
| + stream->set_stream_id(stream_id); |
| stream->set_path(gurl.PathForRequest()); |
| stream->set_send_window_size(initial_send_window_size_); |
| @@ -1372,7 +1423,6 @@ void SpdySession::OnSynStream( |
| void SpdySession::OnSynReply(const SpdySynReplyControlFrame& frame, |
| const linked_ptr<SpdyHeaderBlock>& headers) { |
| SpdyStreamId stream_id = frame.stream_id(); |
| - |
| if (net_log().IsLoggingAllEvents()) { |
| net_log().AddEvent( |
| NetLog::TYPE_SPDY_SESSION_SYN_REPLY, |
| @@ -1556,7 +1606,7 @@ void SpdySession::SendWindowUpdate(SpdyStreamId stream_id, |
| DCHECK(buffered_spdy_framer_.get()); |
| scoped_ptr<SpdyWindowUpdateControlFrame> window_update_frame( |
| buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size)); |
| - QueueFrame(window_update_frame.get(), stream->priority(), NULL); |
| + QueueFrame(window_update_frame.release(), stream->priority()); |
| } |
| // Given a cwnd that we would have sent to the server, modify it based on the |
| @@ -1642,7 +1692,8 @@ void SpdySession::SendSettings(const SettingsMap& settings) { |
| DCHECK(buffered_spdy_framer_.get()); |
| scoped_ptr<SpdySettingsControlFrame> settings_frame( |
| buffered_spdy_framer_->CreateSettings(settings)); |
| - QueueFrame(settings_frame.get(), HIGHEST, NULL); |
| + sent_settings_ = true; |
| + QueueFrame(settings_frame.release(), HIGHEST); |
| } |
| void SpdySession::HandleSetting(uint32 id, uint32 value) { |
| @@ -1677,6 +1728,12 @@ void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) { |
| DCHECK(stream); |
| stream->AdjustSendWindowSize(delta_window_size); |
| } |
| + |
| + CreatedStreamSet::iterator i; |
| + for (i = created_streams_.begin(); i != created_streams_.end(); i++) { |
| + const scoped_refptr<SpdyStream>& stream = *i; |
| + stream->AdjustSendWindowSize(delta_window_size); |
| + } |
| } |
| void SpdySession::SendPrefacePingIfNoneInFlight() { |
| @@ -1697,7 +1754,7 @@ void SpdySession::WritePingFrame(uint32 unique_id) { |
| DCHECK(buffered_spdy_framer_.get()); |
| scoped_ptr<SpdyPingControlFrame> ping_frame( |
| buffered_spdy_framer_->CreatePingFrame(next_ping_id_)); |
| - QueueFrame(ping_frame.get(), HIGHEST, NULL); |
| + QueueFrame(ping_frame.release(), HIGHEST); |
| if (net_log().IsLoggingAllEvents()) { |
| net_log().AddEvent( |