Index: net/spdy/spdy_session.cc |
diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc |
index 9943190e72dd5117fcde19b242da7d05b02941c6..e34c16f78eb427c8ce48c17e306531cc4d96ca53 100644 |
--- a/net/spdy/spdy_session.cc |
+++ b/net/spdy/spdy_session.cc |
@@ -32,6 +32,7 @@ |
#include "net/http/http_server_properties.h" |
#include "net/spdy/spdy_credential_builder.h" |
#include "net/spdy/spdy_frame_builder.h" |
+#include "net/spdy/spdy_frame_producer.h" |
#include "net/spdy/spdy_http_utils.h" |
#include "net/spdy/spdy_protocol.h" |
#include "net/spdy/spdy_session_pool.h" |
@@ -286,28 +287,6 @@ void SpdyStreamRequest::Reset() { |
callback_.Reset(); |
} |
-// static |
-void SpdySession::SpdyIOBufferProducer::ActivateStream( |
- SpdySession* spdy_session, |
- SpdyStream* spdy_stream) { |
- spdy_session->ActivateStream(spdy_stream); |
-} |
- |
-// static |
-SpdyIOBuffer* SpdySession::SpdyIOBufferProducer::CreateIOBuffer( |
- SpdyFrame* frame, |
- RequestPriority priority, |
- SpdyStream* stream) { |
- size_t size = frame->size(); |
- DCHECK_GT(size, 0u); |
- |
- // TODO(mbelshe): We have too much copying of data here. |
- IOBufferWithSize* buffer = new IOBufferWithSize(size); |
- memcpy(buffer->data(), frame->data(), size); |
- |
- return new SpdyIOBuffer(buffer, size, priority, stream); |
-} |
- |
SpdySession::SpdySession(const HostPortProxyPair& host_port_proxy_pair, |
SpdySessionPool* spdy_session_pool, |
HttpServerProperties* http_server_properties, |
@@ -404,7 +383,7 @@ SpdySession::~SpdySession() { |
DCHECK_EQ(0u, num_active_streams()); |
DCHECK_EQ(0u, num_unclaimed_pushed_streams()); |
- for (int i = NUM_PRIORITIES - 1; i >= MINIMUM_PRIORITY; --i) { |
+ for (int i = 0; i < NUM_PRIORITIES; ++i) { |
DCHECK(pending_create_stream_queues_[i].empty()); |
} |
DCHECK(pending_stream_request_completions_.empty()); |
@@ -499,13 +478,6 @@ bool SpdySession::VerifyDomainAuthentication(const std::string& domain) { |
ssl_info.cert->VerifyNameMatch(domain); |
} |
-void SpdySession::SetStreamHasWriteAvailable(SpdyStream* stream, |
- SpdyIOBufferProducer* producer) { |
- write_queue_.push(producer); |
- stream_producers_[producer] = stream; |
- WriteSocketLater(); |
-} |
- |
int SpdySession::GetPushStream( |
const GURL& url, |
scoped_refptr<SpdyStream>* stream, |
@@ -659,7 +631,13 @@ int SpdySession::GetProtocolVersion() const { |
return buffered_spdy_framer_->protocol_version(); |
} |
-SpdyFrame* SpdySession::CreateSynStream( |
+void SpdySession::EnqueueStreamWrite( |
+ SpdyStream* stream, |
+ scoped_ptr<SpdyFrameProducer> producer) { |
+ EnqueueWrite(stream->priority(), producer.Pass(), stream); |
+} |
+ |
+scoped_ptr<SpdyFrame> SpdySession::CreateSynStream( |
SpdyStreamId stream_id, |
RequestPriority priority, |
uint8 credential_slot, |
@@ -691,15 +669,16 @@ SpdyFrame* SpdySession::CreateSynStream( |
stream_id, 0)); |
} |
- return syn_frame.release(); |
+ return syn_frame.Pass(); |
} |
-SpdyFrame* SpdySession::CreateCredentialFrame( |
+int SpdySession::CreateCredentialFrame( |
const std::string& origin, |
SSLClientCertType type, |
const std::string& key, |
const std::string& cert, |
- RequestPriority priority) { |
+ RequestPriority priority, |
+ scoped_ptr<SpdyFrame>* credential_frame) { |
DCHECK(is_secure_); |
SSLClientSocket* ssl_socket = GetSSLClientSocket(); |
DCHECK(ssl_socket); |
@@ -711,12 +690,12 @@ SpdyFrame* SpdySession::CreateCredentialFrame( |
size_t slot = credential_state_.SetHasCredential(GURL(origin)); |
int rv = SpdyCredentialBuilder::Build(tls_unique, type, key, cert, slot, |
&credential); |
- DCHECK_EQ(OK, rv); |
+ DCHECK_NE(rv, ERR_IO_PENDING); |
if (rv != OK) |
- return NULL; |
+ return rv; |
DCHECK(buffered_spdy_framer_.get()); |
- scoped_ptr<SpdyFrame> credential_frame( |
+ credential_frame->reset( |
buffered_spdy_framer_->CreateCredentialFrame(credential)); |
if (net_log().IsLoggingAllEvents()) { |
@@ -724,10 +703,10 @@ SpdyFrame* SpdySession::CreateCredentialFrame( |
NetLog::TYPE_SPDY_SESSION_SEND_CREDENTIAL, |
base::Bind(&NetLogSpdyCredentialCallback, credential.slot, &origin)); |
} |
- return credential_frame.release(); |
+ return OK; |
} |
-SpdyFrame* SpdySession::CreateHeadersFrame( |
+scoped_ptr<SpdyFrame> SpdySession::CreateHeadersFrame( |
SpdyStreamId stream_id, |
const SpdyHeaderBlock& headers, |
SpdyControlFlags flags) { |
@@ -749,12 +728,13 @@ SpdyFrame* SpdySession::CreateHeadersFrame( |
&headers, fin, /*unidirectional=*/false, |
stream_id, 0)); |
} |
- return frame.release(); |
+ return frame.Pass(); |
} |
-SpdyFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id, |
- net::IOBuffer* data, int len, |
- SpdyDataFlags flags) { |
+scoped_ptr<SpdyFrame> SpdySession::CreateDataFrame(SpdyStreamId stream_id, |
+ net::IOBuffer* data, |
+ int len, |
+ SpdyDataFlags flags) { |
// Find our stream |
CHECK(IsStreamActive(stream_id)); |
scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; |
@@ -762,7 +742,7 @@ SpdyFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id, |
if (len < 0) { |
NOTREACHED(); |
- return NULL; |
+ return scoped_ptr<SpdyFrame>(); |
} |
if (len > kMaxSpdyFrameChunkSize) { |
@@ -785,7 +765,7 @@ SpdyFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id, |
net_log().AddEvent( |
NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_STREAM_SEND_WINDOW, |
NetLog::IntegerCallback("stream_id", stream_id)); |
- return NULL; |
+ return scoped_ptr<SpdyFrame>(); |
} |
if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { |
effective_window_size = |
@@ -797,7 +777,7 @@ SpdyFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id, |
net_log().AddEvent( |
NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_SESSION_SEND_WINDOW, |
NetLog::IntegerCallback("stream_id", stream_id)); |
- return NULL; |
+ return scoped_ptr<SpdyFrame>(); |
} |
} |
@@ -828,7 +808,7 @@ SpdyFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id, |
buffered_spdy_framer_->CreateDataFrame( |
stream_id, data->data(), static_cast<uint32>(len), flags)); |
- return frame.release(); |
+ return frame.Pass(); |
} |
void SpdySession::CloseStream(SpdyStreamId stream_id, int status) { |
@@ -863,7 +843,7 @@ void SpdySession::ResetStream(SpdyStreamId stream_id, |
scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; |
priority = stream->priority(); |
} |
- QueueFrame(rst_frame.release(), priority); |
+ EnqueueSessionWrite(priority, rst_frame.Pass()); |
RecordProtocolErrorHistogram( |
static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID)); |
DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR); |
@@ -930,57 +910,59 @@ void SpdySession::OnReadComplete(int bytes_read) { |
} |
void SpdySession::OnWriteComplete(int result) { |
+ // Releasing the in-flight write can have a side-effect of dropping |
+ // the last reference to |this|. Hold a reference through this |
+ // function. |
+ scoped_refptr<SpdySession> self(this); |
+ |
DCHECK(write_pending_); |
- DCHECK(in_flight_write_.size()); |
+ DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0); |
last_activity_time_ = base::TimeTicks::Now(); |
write_pending_ = false; |
- scoped_refptr<SpdyStream> stream = in_flight_write_.stream(); |
- |
- if (result >= 0) { |
- // It should not be possible to have written more bytes than our |
- // in_flight_write_. |
- DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining()); |
- |
- in_flight_write_.buffer()->DidConsume(result); |
- |
- // We only notify the stream when we've fully written the pending frame. |
- if (!in_flight_write_.buffer()->BytesRemaining()) { |
- if (stream) { |
- // Report the number of bytes written to the caller, but exclude the |
- // frame size overhead. NOTE: if this frame was compressed the |
- // reported bytes written is the compressed size, not the original |
- // size. |
- if (result > 0) { |
- result = in_flight_write_.buffer()->size(); |
- DCHECK_GE(result, |
- static_cast<int>( |
- buffered_spdy_framer_->GetControlFrameHeaderSize())); |
- result -= buffered_spdy_framer_->GetControlFrameHeaderSize(); |
- } |
- |
- // It is possible that the stream was cancelled while we were writing |
- // to the socket. |
- if (!stream->cancelled()) |
- stream->OnWriteComplete(result); |
- } |
+ if (result < 0) { |
+ in_flight_write_.Release(); |
+ CloseSessionOnError(static_cast<net::Error>(result), true, "Write error"); |
+ return; |
+ } |
- // Cleanup the write which just completed. |
- in_flight_write_.release(); |
- } |
+ // It should not be possible to have written more bytes than our |
+ // in_flight_write_. |
+ DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining()); |
- // Write more data. We're already in a continuation, so we can |
- // go ahead and write it immediately (without going back to the |
- // message loop). |
- WriteSocketLater(); |
- } else { |
- in_flight_write_.release(); |
+ in_flight_write_.buffer()->DidConsume(result); |
- // The stream is now errored. Close it down. |
- CloseSessionOnError( |
- static_cast<net::Error>(result), true, "The stream has errored."); |
+ // We only notify the stream when we've fully written the pending frame. |
+ if (in_flight_write_.buffer()->BytesRemaining() == 0) { |
+ DCHECK_GT(result, 0); |
+ |
+ scoped_refptr<SpdyStream> stream = in_flight_write_.stream(); |
+ |
+ // It is possible that the stream was cancelled while we were writing |
+ // to the socket. |
+ if (stream && !stream->cancelled()) { |
+ // Report the number of bytes written to the caller, but exclude the |
+ // frame size overhead. NOTE: if this frame was compressed the |
+ // reported bytes written is the compressed size, not the original |
+ // size. |
+ result = in_flight_write_.buffer()->size(); |
+ DCHECK_GE(result, |
+ static_cast<int>( |
+ buffered_spdy_framer_->GetControlFrameHeaderSize())); |
+ result -= buffered_spdy_framer_->GetControlFrameHeaderSize(); |
+ |
+ stream->OnWriteComplete(result); |
+ } |
+ |
+ // Cleanup the write which just completed. |
+ in_flight_write_.Release(); |
} |
+ |
+ // Write more data. We're already in a continuation, so we can go |
+ // ahead and write it immediately (without going back to the message |
+ // loop). |
+ WriteSocketLater(); |
} |
net::Error SpdySession::ReadSocket() { |
@@ -997,7 +979,7 @@ net::Error SpdySession::ReadSocket() { |
int bytes_read = connection_->socket()->Read( |
read_buffer_.get(), |
kReadBufferSize, |
- base::Bind(&SpdySession::OnReadComplete, base::Unretained(this))); |
+ base::Bind(&SpdySession::OnReadComplete, weak_factory_.GetWeakPtr())); |
switch (bytes_read) { |
case 0: |
// Socket is closed! |
@@ -1051,24 +1033,39 @@ void SpdySession::WriteSocket() { |
// Loop sending frames until we've sent everything or until the write |
// returns error (or ERR_IO_PENDING). |
DCHECK(buffered_spdy_framer_.get()); |
- while (in_flight_write_.buffer() || !write_queue_.empty()) { |
- if (!in_flight_write_.buffer()) { |
- // Grab the next SpdyBuffer to send. |
- scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top()); |
- write_queue_.pop(); |
- scoped_ptr<SpdyIOBuffer> buffer(producer->ProduceNextBuffer(this)); |
- stream_producers_.erase(producer.get()); |
+ while (true) { |
+ if (in_flight_write_.buffer()) { |
+ DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0); |
+ } else { |
+ // Grab the next frame to send. |
+ scoped_ptr<SpdyFrameProducer> producer; |
+ scoped_refptr<SpdyStream> stream; |
+ if (!write_queue_.Dequeue(&producer, &stream)) |
+ break; |
+ |
// It is possible that a stream had data to write, but a |
// WINDOW_UPDATE frame has been received which made that |
// stream no longer writable. |
// TODO(rch): consider handling that case by removing the |
// stream from the writable queue? |
- if (buffer == NULL) |
+ if (stream.get() && stream->cancelled()) |
continue; |
- in_flight_write_ = *buffer; |
- } else { |
- DCHECK(in_flight_write_.buffer()->BytesRemaining()); |
+ if (stream.get() && stream->stream_id() == 0) |
+ ActivateStream(stream); |
+ |
+ scoped_ptr<SpdyFrame> frame = producer->ProduceFrame(); |
+ if (!frame) { |
+ NOTREACHED(); |
+ continue; |
+ } |
+ DCHECK_GT(frame->size(), 0u); |
+ |
+ // TODO(mbelshe): We have too much copying of data here. |
+ scoped_refptr<IOBufferWithSize> buffer = |
+ new IOBufferWithSize(frame->size()); |
+ memcpy(buffer->data(), frame->data(), frame->size()); |
+ in_flight_write_ = SpdyIOBuffer(buffer, frame->size(), stream); |
} |
write_pending_ = true; |
@@ -1126,12 +1123,7 @@ void SpdySession::CloseAllStreams(net::Error status) { |
stream->OnClose(status); |
} |
- // We also need to drain the queue. |
- while (!write_queue_.empty()) { |
- scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top()); |
- write_queue_.pop(); |
- stream_producers_.erase(producer.get()); |
- } |
+ write_queue_.Clear(); |
} |
void SpdySession::LogAbandonedStream(const scoped_refptr<SpdyStream>& stream, |
@@ -1257,33 +1249,18 @@ int SpdySession::GetLocalAddress(IPEndPoint* address) const { |
return rv; |
} |
-class SimpleSpdyIOBufferProducer : public SpdySession::SpdyIOBufferProducer { |
- public: |
- SimpleSpdyIOBufferProducer(SpdyFrame* frame, |
- RequestPriority priority) |
- : frame_(frame), |
- priority_(priority) { |
- } |
- |
- virtual RequestPriority GetPriority() const OVERRIDE { |
- return priority_; |
- } |
- |
- virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) OVERRIDE { |
- return SpdySession::SpdyIOBufferProducer::CreateIOBuffer( |
- frame_.get(), priority_, NULL); |
- } |
- |
- private: |
- scoped_ptr<SpdyFrame> frame_; |
- RequestPriority priority_; |
-}; |
+void SpdySession::EnqueueSessionWrite(RequestPriority priority, |
+ scoped_ptr<SpdyFrame> frame) { |
+ EnqueueWrite( |
+ priority, |
+ scoped_ptr<SpdyFrameProducer>(new SimpleFrameProducer(frame.Pass())), |
+ NULL); |
+} |
-void SpdySession::QueueFrame(SpdyFrame* frame, |
- RequestPriority priority) { |
- SimpleSpdyIOBufferProducer* producer = |
- new SimpleSpdyIOBufferProducer(frame, priority); |
- write_queue_.push(producer); |
+void SpdySession::EnqueueWrite(RequestPriority priority, |
+ scoped_ptr<SpdyFrameProducer> producer, |
+ const scoped_refptr<SpdyStream>& stream) { |
+ write_queue_.Enqueue(priority, producer.Pass(), stream); |
WriteSocketLater(); |
} |
@@ -1303,8 +1280,7 @@ void SpdySession::DeleteStream(SpdyStreamId id, int status) { |
// the stream in the unclaimed_pushed_streams_ list. However, if |
// the stream is errored out, clean it up entirely. |
if (status != OK) { |
- PushedStreamMap::iterator it; |
- for (it = unclaimed_pushed_streams_.begin(); |
+ for (PushedStreamMap::iterator it = unclaimed_pushed_streams_.begin(); |
it != unclaimed_pushed_streams_.end(); ++it) { |
scoped_refptr<SpdyStream> curr = it->second.first; |
if (id == curr->stream_id()) { |
@@ -1315,29 +1291,17 @@ void SpdySession::DeleteStream(SpdyStreamId id, int status) { |
} |
// The stream might have been deleted. |
- ActiveStreamMap::iterator it2 = active_streams_.find(id); |
- if (it2 == active_streams_.end()) |
+ ActiveStreamMap::iterator it = active_streams_.find(id); |
+ if (it == active_streams_.end()) |
return; |
- // Possibly remove from the write queue. |
- WriteQueue old = write_queue_; |
- write_queue_ = WriteQueue(); |
- while (!old.empty()) { |
- scoped_ptr<SpdyIOBufferProducer> producer(old.top()); |
- old.pop(); |
- StreamProducerMap::iterator it = stream_producers_.find(producer.get()); |
- if (it == stream_producers_.end() || it->second->stream_id() != id) { |
- write_queue_.push(producer.release()); |
- } else { |
- stream_producers_.erase(producer.get()); |
- producer.reset(NULL); |
- } |
- } |
+ const scoped_refptr<SpdyStream> stream(it->second); |
+ active_streams_.erase(it); |
+ DCHECK(stream); |
+ |
+ write_queue_.RemovePendingWritesForStream(stream); |
// If this is an active stream, call the callback. |
- const scoped_refptr<SpdyStream> stream(it2->second); |
- active_streams_.erase(it2); |
- DCHECK(stream); |
stream->OnClose(status); |
ProcessPendingStreamRequests(); |
} |
@@ -1921,7 +1885,7 @@ void SpdySession::SendSettings(const SettingsMap& settings) { |
scoped_ptr<SpdyFrame> settings_frame( |
buffered_spdy_framer_->CreateSettings(settings)); |
sent_settings_ = true; |
- QueueFrame(settings_frame.release(), HIGHEST); |
+ EnqueueSessionWrite(HIGHEST, settings_frame.Pass()); |
} |
void SpdySession::HandleSetting(uint32 id, uint32 value) { |
@@ -2009,14 +1973,14 @@ void SpdySession::SendWindowUpdateFrame(SpdyStreamId stream_id, |
DCHECK(buffered_spdy_framer_.get()); |
scoped_ptr<SpdyFrame> window_update_frame( |
buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size)); |
- QueueFrame(window_update_frame.release(), priority); |
+ EnqueueSessionWrite(priority, window_update_frame.Pass()); |
} |
void SpdySession::WritePingFrame(uint32 unique_id) { |
DCHECK(buffered_spdy_framer_.get()); |
scoped_ptr<SpdyFrame> ping_frame( |
buffered_spdy_framer_->CreatePingFrame(unique_id)); |
- QueueFrame(ping_frame.release(), HIGHEST); |
+ EnqueueSessionWrite(HIGHEST, ping_frame.Pass()); |
if (net_log().IsLoggingAllEvents()) { |
net_log().AddEvent( |