Chromium Code Reviews| Index: net/spdy/spdy_session.cc |
| diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc |
| index 21fcd132594801850efd8615595f7b34ced9d4e6..eb20251aaf26a5b8a0b4542d7d61dd38aed8b3d2 100644 |
| --- a/net/spdy/spdy_session.cc |
| +++ b/net/spdy/spdy_session.cc |
| @@ -271,28 +271,6 @@ void SpdyStreamRequest::Reset() { |
| callback_.Reset(); |
| } |
| -// static |
| -void SpdySession::SpdyIOBufferProducer::ActivateStream( |
| - SpdySession* spdy_session, |
| - SpdyStream* spdy_stream) { |
| - spdy_session->ActivateStream(spdy_stream); |
| -} |
| - |
| -// static |
| -SpdyIOBuffer* SpdySession::SpdyIOBufferProducer::CreateIOBuffer( |
| - SpdyFrame* frame, |
| - RequestPriority priority, |
| - SpdyStream* stream) { |
| - size_t size = frame->size(); |
| - DCHECK_GT(size, 0u); |
| - |
| - // TODO(mbelshe): We have too much copying of data here. |
| - IOBufferWithSize* buffer = new IOBufferWithSize(size); |
| - memcpy(buffer->data(), frame->data(), size); |
| - |
| - return new SpdyIOBuffer(buffer, size, priority, stream); |
| -} |
| - |
| SpdySession::SpdySession(const HostPortProxyPair& host_port_proxy_pair, |
| SpdySessionPool* spdy_session_pool, |
| HttpServerProperties* http_server_properties, |
| @@ -389,7 +367,7 @@ SpdySession::~SpdySession() { |
| DCHECK_EQ(0u, num_active_streams()); |
| DCHECK_EQ(0u, num_unclaimed_pushed_streams()); |
| - for (int i = NUM_PRIORITIES - 1; i >= MINIMUM_PRIORITY; --i) { |
| + for (int i = 0; i < NUM_PRIORITIES; ++i) { |
| DCHECK(pending_create_stream_queues_[i].empty()); |
| } |
| DCHECK(pending_stream_request_completions_.empty()); |
| @@ -485,13 +463,6 @@ bool SpdySession::VerifyDomainAuthentication(const std::string& domain) { |
| ssl_info.cert->VerifyNameMatch(domain); |
| } |
| -void SpdySession::SetStreamHasWriteAvailable(SpdyStream* stream, |
| - SpdyIOBufferProducer* producer) { |
| - write_queue_.push(producer); |
| - stream_producers_[producer] = stream; |
| - WriteSocketLater(); |
| -} |
| - |
| int SpdySession::GetPushStream( |
| const GURL& url, |
| scoped_refptr<SpdyStream>* stream, |
| @@ -645,7 +616,13 @@ int SpdySession::GetProtocolVersion() const { |
| return buffered_spdy_framer_->protocol_version(); |
| } |
| -SpdyFrame* SpdySession::CreateSynStream( |
| +void SpdySession::EnqueueStreamWrite( |
| + SpdyStream* stream, |
| + scoped_ptr<SpdyFrameProducer> producer) { |
| + EnqueueWrite(stream->priority(), producer.Pass(), stream); |
| +} |
| + |
| +scoped_ptr<SpdyFrame> SpdySession::CreateSynStream( |
| SpdyStreamId stream_id, |
| RequestPriority priority, |
| uint8 credential_slot, |
| @@ -677,15 +654,16 @@ SpdyFrame* SpdySession::CreateSynStream( |
| stream_id, 0)); |
| } |
| - return syn_frame.release(); |
| + return syn_frame.Pass(); |
| } |
| -SpdyFrame* SpdySession::CreateCredentialFrame( |
| +int SpdySession::CreateCredentialFrame( |
| const std::string& origin, |
| SSLClientCertType type, |
| const std::string& key, |
| const std::string& cert, |
| - RequestPriority priority) { |
| + RequestPriority priority, |
| + scoped_ptr<SpdyFrame>* credential_frame) { |
| DCHECK(is_secure_); |
| SSLClientSocket* ssl_socket = GetSSLClientSocket(); |
| DCHECK(ssl_socket); |
| @@ -697,12 +675,13 @@ SpdyFrame* SpdySession::CreateCredentialFrame( |
| size_t slot = credential_state_.SetHasCredential(GURL(origin)); |
| int rv = SpdyCredentialBuilder::Build(tls_unique, type, key, cert, slot, |
| &credential); |
| - DCHECK_EQ(OK, rv); |
| - if (rv != OK) |
| - return NULL; |
| + if (rv != OK) { |
| + DCHECK_NE(rv, ERR_IO_PENDING); |
|
Ryan Hamilton
2013/03/28 15:51:04
if you want to save a line, you could write this a
akalin
2013/04/06 01:17:48
Done.
|
| + return rv; |
| + } |
| DCHECK(buffered_spdy_framer_.get()); |
| - scoped_ptr<SpdyFrame> credential_frame( |
| + credential_frame->reset( |
| buffered_spdy_framer_->CreateCredentialFrame(credential)); |
| if (net_log().IsLoggingAllEvents()) { |
| @@ -710,10 +689,10 @@ SpdyFrame* SpdySession::CreateCredentialFrame( |
| NetLog::TYPE_SPDY_SESSION_SEND_CREDENTIAL, |
| base::Bind(&NetLogSpdyCredentialCallback, credential.slot, &origin)); |
| } |
| - return credential_frame.release(); |
| + return OK; |
| } |
| -SpdyFrame* SpdySession::CreateHeadersFrame( |
| +scoped_ptr<SpdyFrame> SpdySession::CreateHeadersFrame( |
| SpdyStreamId stream_id, |
| const SpdyHeaderBlock& headers, |
| SpdyControlFlags flags) { |
| @@ -735,12 +714,14 @@ SpdyFrame* SpdySession::CreateHeadersFrame( |
| &headers, fin, /*unidirectional=*/false, |
| stream_id, 0)); |
| } |
| - return frame.release(); |
| + return frame.Pass(); |
| } |
| -SpdyFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id, |
| - net::IOBuffer* data, int len, |
| - SpdyDataFlags flags) { |
| +scoped_ptr<SpdyFrame> SpdySession::CreateDataFrame( |
| + SpdyStreamId stream_id, |
|
Ryan Hamilton
2013/03/28 15:51:04
I think you can move this to the previous line, an
akalin
2013/04/06 01:17:48
Done.
|
| + net::IOBuffer* data, |
| + int len, |
| + SpdyDataFlags flags) { |
| // Find our stream |
| CHECK(IsStreamActive(stream_id)); |
| scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; |
| @@ -748,7 +729,7 @@ SpdyFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id, |
| if (len < 0) { |
| NOTREACHED(); |
| - return NULL; |
| + return scoped_ptr<SpdyFrame>(); |
| } |
| if (len > kMaxSpdyFrameChunkSize) { |
| @@ -771,7 +752,7 @@ SpdyFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id, |
| net_log().AddEvent( |
| NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_STREAM_SEND_WINDOW, |
| NetLog::IntegerCallback("stream_id", stream_id)); |
| - return NULL; |
| + return scoped_ptr<SpdyFrame>(); |
| } |
| if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { |
| effective_window_size = |
| @@ -783,7 +764,7 @@ SpdyFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id, |
| net_log().AddEvent( |
| NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_SESSION_SEND_WINDOW, |
| NetLog::IntegerCallback("stream_id", stream_id)); |
| - return NULL; |
| + return scoped_ptr<SpdyFrame>(); |
| } |
| } |
| @@ -814,7 +795,7 @@ SpdyFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id, |
| buffered_spdy_framer_->CreateDataFrame( |
| stream_id, data->data(), static_cast<uint32>(len), flags)); |
| - return frame.release(); |
| + return frame.Pass(); |
| } |
| void SpdySession::CloseStream(SpdyStreamId stream_id, int status) { |
| @@ -849,7 +830,7 @@ void SpdySession::ResetStream(SpdyStreamId stream_id, |
| scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; |
| priority = stream->priority(); |
| } |
| - QueueFrame(rst_frame.release(), priority); |
| + EnqueueSessionWrite(priority, rst_frame.Pass()); |
| RecordProtocolErrorHistogram( |
| static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID)); |
| DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR); |
| @@ -970,56 +951,53 @@ int SpdySession::DoReadComplete(int result) { |
| void SpdySession::OnWriteComplete(int result) { |
| DCHECK(write_pending_); |
| - DCHECK(in_flight_write_.size()); |
| + DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0); |
| last_activity_time_ = base::TimeTicks::Now(); |
| write_pending_ = false; |
| - scoped_refptr<SpdyStream> stream = in_flight_write_.stream(); |
| - |
| - if (result >= 0) { |
| - // It should not be possible to have written more bytes than our |
| - // in_flight_write_. |
| - DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining()); |
| - |
| - in_flight_write_.buffer()->DidConsume(result); |
| - |
| - // We only notify the stream when we've fully written the pending frame. |
| - if (!in_flight_write_.buffer()->BytesRemaining()) { |
| - if (stream) { |
| - // Report the number of bytes written to the caller, but exclude the |
| - // frame size overhead. NOTE: if this frame was compressed the |
| - // reported bytes written is the compressed size, not the original |
| - // size. |
| - if (result > 0) { |
| - result = in_flight_write_.buffer()->size(); |
| - DCHECK_GE(result, |
| - static_cast<int>( |
| - buffered_spdy_framer_->GetControlFrameHeaderSize())); |
| - result -= buffered_spdy_framer_->GetControlFrameHeaderSize(); |
| - } |
| - |
| - // It is possible that the stream was cancelled while we were writing |
| - // to the socket. |
| - if (!stream->cancelled()) |
| - stream->OnWriteComplete(result); |
| - } |
| + if (result < 0) { |
| + in_flight_write_.Release(); |
| + CloseSessionOnError(static_cast<net::Error>(result), true, "Write error"); |
| + return; |
| + } |
| - // Cleanup the write which just completed. |
| - in_flight_write_.release(); |
| - } |
| + // It should not be possible to have written more bytes than our |
| + // in_flight_write_. |
| + DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining()); |
| - // Write more data. We're already in a continuation, so we can |
| - // go ahead and write it immediately (without going back to the |
| - // message loop). |
| - WriteSocketLater(); |
| - } else { |
| - in_flight_write_.release(); |
| + in_flight_write_.buffer()->DidConsume(result); |
| - // The stream is now errored. Close it down. |
| - CloseSessionOnError( |
| - static_cast<net::Error>(result), true, "The stream has errored."); |
| + // We only notify the stream when we've fully written the pending frame. |
| + if (in_flight_write_.buffer()->BytesRemaining() == 0) { |
| + DCHECK_GT(result, 0); |
| + |
| + scoped_refptr<SpdyStream> stream = in_flight_write_.stream(); |
| + |
| + // It is possible that the stream was cancelled while we were writing |
| + // to the socket. |
| + if (stream && !stream->cancelled()) { |
| + // Report the number of bytes written to the caller, but exclude the |
| + // frame size overhead. NOTE: if this frame was compressed the |
| + // reported bytes written is the compressed size, not the original |
| + // size. |
|
Ryan Hamilton
2013/03/28 15:51:04
I wonder if the caller actually cares about the nu
akalin
2013/04/06 01:17:48
Yeah, the callers do care about the number of byte
|
| + result = in_flight_write_.buffer()->size(); |
| + DCHECK_GE(result, |
| + static_cast<int>( |
| + buffered_spdy_framer_->GetControlFrameHeaderSize())); |
| + result -= buffered_spdy_framer_->GetControlFrameHeaderSize(); |
| + |
| + stream->OnWriteComplete(result); |
| + } |
| + |
| + // Cleanup the write which just completed. |
| + in_flight_write_.Release(); |
| } |
| + |
| + // Write more data. We're already in a continuation, so we can go |
| + // ahead and write it immediately (without going back to the message |
| + // loop). |
| + WriteSocketLater(); |
| } |
| void SpdySession::WriteSocketLater() { |
| @@ -1052,24 +1030,39 @@ void SpdySession::WriteSocket() { |
| // Loop sending frames until we've sent everything or until the write |
| // returns error (or ERR_IO_PENDING). |
| DCHECK(buffered_spdy_framer_.get()); |
| - while (in_flight_write_.buffer() || !write_queue_.empty()) { |
| - if (!in_flight_write_.buffer()) { |
| - // Grab the next SpdyBuffer to send. |
| - scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top()); |
| - write_queue_.pop(); |
| - scoped_ptr<SpdyIOBuffer> buffer(producer->ProduceNextBuffer(this)); |
| - stream_producers_.erase(producer.get()); |
| + while (true) { |
| + if (in_flight_write_.buffer()) { |
| + DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0); |
| + } else { |
| + // Grab the next frame to send. |
| + scoped_ptr<SpdyFrameProducer> producer; |
| + scoped_refptr<SpdyStream> stream; |
| + if (!write_queue_.Dequeue(&producer, &stream)) |
| + break; |
| + |
| // It is possible that a stream had data to write, but a |
| // WINDOW_UPDATE frame has been received which made that |
| // stream no longer writable. |
| // TODO(rch): consider handling that case by removing the |
| // stream from the writable queue? |
| - if (buffer == NULL) |
| + if (stream.get() && stream->cancelled()) |
| continue; |
| - in_flight_write_ = *buffer; |
| - } else { |
| - DCHECK(in_flight_write_.buffer()->BytesRemaining()); |
| + if (stream.get() && stream->stream_id() == 0) |
| + ActivateStream(stream); |
| + |
| + scoped_ptr<SpdyFrame> frame = producer->ProduceFrame(); |
| + if (!frame) { |
| + NOTREACHED(); |
| + continue; |
| + } |
| + DCHECK_GT(frame->size(), 0u); |
| + |
| + // TODO(mbelshe): We have too much copying of data here. |
| + scoped_refptr<IOBufferWithSize> buffer = |
| + new IOBufferWithSize(frame->size()); |
| + memcpy(buffer->data(), frame->data(), frame->size()); |
| + in_flight_write_ = SpdyIOBuffer(buffer, frame->size(), stream); |
| } |
| write_pending_ = true; |
| @@ -1127,12 +1120,7 @@ void SpdySession::CloseAllStreams(net::Error status) { |
| stream->OnClose(status); |
| } |
| - // We also need to drain the queue. |
| - while (!write_queue_.empty()) { |
| - scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top()); |
| - write_queue_.pop(); |
| - stream_producers_.erase(producer.get()); |
| - } |
| + write_queue_.Clear(); |
| } |
| void SpdySession::LogAbandonedStream(const scoped_refptr<SpdyStream>& stream, |
| @@ -1255,33 +1243,18 @@ int SpdySession::GetLocalAddress(IPEndPoint* address) const { |
| return connection_->socket()->GetLocalAddress(address); |
| } |
| -class SimpleSpdyIOBufferProducer : public SpdySession::SpdyIOBufferProducer { |
| - public: |
| - SimpleSpdyIOBufferProducer(SpdyFrame* frame, |
| - RequestPriority priority) |
| - : frame_(frame), |
| - priority_(priority) { |
| - } |
| - |
| - virtual RequestPriority GetPriority() const OVERRIDE { |
| - return priority_; |
| - } |
| - |
| - virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) OVERRIDE { |
| - return SpdySession::SpdyIOBufferProducer::CreateIOBuffer( |
| - frame_.get(), priority_, NULL); |
| - } |
| - |
| - private: |
| - scoped_ptr<SpdyFrame> frame_; |
| - RequestPriority priority_; |
| -}; |
| +void SpdySession::EnqueueSessionWrite(RequestPriority priority, |
| + scoped_ptr<SpdyFrame> frame) { |
| + EnqueueWrite( |
| + priority, |
| + scoped_ptr<SpdyFrameProducer>(new SimpleFrameProducer(frame.Pass())), |
| + NULL); |
| +} |
| -void SpdySession::QueueFrame(SpdyFrame* frame, |
| - RequestPriority priority) { |
| - SimpleSpdyIOBufferProducer* producer = |
| - new SimpleSpdyIOBufferProducer(frame, priority); |
| - write_queue_.push(producer); |
| +void SpdySession::EnqueueWrite(RequestPriority priority, |
| + scoped_ptr<SpdyFrameProducer> producer, |
| + const scoped_refptr<SpdyStream>& stream) { |
| + write_queue_.Enqueue(priority, producer.Pass(), stream); |
| WriteSocketLater(); |
| } |
| @@ -1301,8 +1274,7 @@ void SpdySession::DeleteStream(SpdyStreamId id, int status) { |
| // the stream in the unclaimed_pushed_streams_ list. However, if |
| // the stream is errored out, clean it up entirely. |
| if (status != OK) { |
| - PushedStreamMap::iterator it; |
| - for (it = unclaimed_pushed_streams_.begin(); |
| + for (PushedStreamMap::iterator it = unclaimed_pushed_streams_.begin(); |
| it != unclaimed_pushed_streams_.end(); ++it) { |
| scoped_refptr<SpdyStream> curr = it->second.first; |
| if (id == curr->stream_id()) { |
| @@ -1313,29 +1285,17 @@ void SpdySession::DeleteStream(SpdyStreamId id, int status) { |
| } |
| // The stream might have been deleted. |
| - ActiveStreamMap::iterator it2 = active_streams_.find(id); |
| - if (it2 == active_streams_.end()) |
| + ActiveStreamMap::iterator it = active_streams_.find(id); |
| + if (it == active_streams_.end()) |
| return; |
| - // Possibly remove from the write queue. |
| - WriteQueue old = write_queue_; |
| - write_queue_ = WriteQueue(); |
| - while (!old.empty()) { |
| - scoped_ptr<SpdyIOBufferProducer> producer(old.top()); |
| - old.pop(); |
| - StreamProducerMap::iterator it = stream_producers_.find(producer.get()); |
| - if (it == stream_producers_.end() || it->second->stream_id() != id) { |
| - write_queue_.push(producer.release()); |
| - } else { |
| - stream_producers_.erase(producer.get()); |
| - producer.reset(NULL); |
| - } |
| - } |
| + const scoped_refptr<SpdyStream> stream(it->second); |
| + active_streams_.erase(it); |
| + DCHECK(stream); |
| + |
| + write_queue_.RemovePendingWritesForStream(stream); |
| // If this is an active stream, call the callback. |
| - const scoped_refptr<SpdyStream> stream(it2->second); |
| - active_streams_.erase(it2); |
| - DCHECK(stream); |
| stream->OnClose(status); |
| ProcessPendingStreamRequests(); |
| } |
| @@ -1919,7 +1879,7 @@ void SpdySession::SendSettings(const SettingsMap& settings) { |
| scoped_ptr<SpdyFrame> settings_frame( |
| buffered_spdy_framer_->CreateSettings(settings)); |
| sent_settings_ = true; |
| - QueueFrame(settings_frame.release(), HIGHEST); |
| + EnqueueSessionWrite(HIGHEST, settings_frame.Pass()); |
| } |
| void SpdySession::HandleSetting(uint32 id, uint32 value) { |
| @@ -2007,14 +1967,14 @@ void SpdySession::SendWindowUpdateFrame(SpdyStreamId stream_id, |
| DCHECK(buffered_spdy_framer_.get()); |
| scoped_ptr<SpdyFrame> window_update_frame( |
| buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size)); |
| - QueueFrame(window_update_frame.release(), priority); |
| + EnqueueSessionWrite(priority, window_update_frame.Pass()); |
| } |
| void SpdySession::WritePingFrame(uint32 unique_id) { |
| DCHECK(buffered_spdy_framer_.get()); |
| scoped_ptr<SpdyFrame> ping_frame( |
| buffered_spdy_framer_->CreatePingFrame(unique_id)); |
| - QueueFrame(ping_frame.release(), HIGHEST); |
| + EnqueueSessionWrite(HIGHEST, ping_frame.Pass()); |
| if (net_log().IsLoggingAllEvents()) { |
| net_log().AddEvent( |