Index: net/spdy/spdy_stream.cc |
diff --git a/net/spdy/spdy_stream.cc b/net/spdy/spdy_stream.cc |
index e20fb8d73125baa10d1316d87830d6663f87fae9..4bf4e74ecbf484df853983b5e3c1706c8e489d0d 100644 |
--- a/net/spdy/spdy_stream.cc |
+++ b/net/spdy/spdy_stream.cc |
@@ -12,6 +12,7 @@ |
#include "base/values.h" |
#include "net/spdy/spdy_http_utils.h" |
#include "net/spdy/spdy_session.h" |
+#include "net/spdy/spdy_write_queue.h" |
namespace net { |
@@ -50,6 +51,57 @@ bool ContainsUpperAscii(const std::string& str) { |
} // namespace |
+// A wrapper around a stream that calls into ProduceSynStreamFrame(). |
+class SpdyStream::SynStreamFrameProducer : public SpdyFrameProducer { |
+ public: |
+ SynStreamFrameProducer(const base::WeakPtr<SpdyStream>& stream) |
+ : stream_(stream) { |
+ DCHECK(stream_); |
+ } |
+ |
+ virtual ~SynStreamFrameProducer() {} |
+ |
+ virtual scoped_ptr<SpdyFrame> ProduceFrame() OVERRIDE { |
+ if (!stream_) { |
+ NOTREACHED(); |
+ return scoped_ptr<SpdyFrame>(); |
+ } |
+ DCHECK_GT(stream_->stream_id(), 0u); |
+ return stream_->ProduceSynStreamFrame(); |
+ } |
+ |
+ private: |
+ const base::WeakPtr<SpdyStream> stream_; |
+}; |
+ |
+// A wrapper around a stream that calls into ProduceHeaderFrame() with |
+// a given header block. |
+class SpdyStream::HeaderFrameProducer : public SpdyFrameProducer { |
+ public: |
+ HeaderFrameProducer(const base::WeakPtr<SpdyStream>& stream, |
+ scoped_ptr<SpdyHeaderBlock> headers) |
+ : stream_(stream), |
+ headers_(headers.Pass()) { |
+ DCHECK(stream_); |
+ DCHECK(headers_); |
+ } |
+ |
+ virtual ~HeaderFrameProducer() {} |
+ |
+ virtual scoped_ptr<SpdyFrame> ProduceFrame() OVERRIDE { |
+ if (!stream_) { |
+ NOTREACHED(); |
+ return scoped_ptr<SpdyFrame>(); |
+ } |
+ DCHECK_GT(stream_->stream_id(), 0u); |
+ return stream_->ProduceHeaderFrame(headers_.Pass()); |
+ } |
+ |
+ private: |
+ const base::WeakPtr<SpdyStream> stream_; |
+ scoped_ptr<SpdyHeaderBlock> headers_; |
+}; |
+ |
SpdyStream::SpdyStream(SpdySession* session, |
const std::string& path, |
RequestPriority priority, |
@@ -83,99 +135,8 @@ SpdyStream::SpdyStream(SpdySession* session, |
domain_bound_cert_type_(CLIENT_CERT_INVALID_TYPE) { |
} |
-class SpdyStream::SpdyStreamIOBufferProducer |
- : public SpdySession::SpdyIOBufferProducer { |
- public: |
- SpdyStreamIOBufferProducer(SpdyStream* stream) : stream_(stream) {} |
- |
- // SpdyFrameProducer |
- virtual RequestPriority GetPriority() const OVERRIDE { |
- return stream_->priority(); |
- } |
- |
- virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) OVERRIDE { |
- if (stream_->cancelled()) |
- return NULL; |
- if (stream_->stream_id() == 0) |
- SpdySession::SpdyIOBufferProducer::ActivateStream(session, stream_); |
- frame_ = stream_->ProduceNextFrame(); |
- return frame_ == NULL ? NULL : |
- SpdySession::SpdyIOBufferProducer::CreateIOBuffer( |
- frame_.get(), GetPriority(), stream_); |
- } |
- |
- private: |
- scoped_refptr<SpdyStream> stream_; |
- scoped_ptr<SpdyFrame> frame_; |
-}; |
- |
-void SpdyStream::SetHasWriteAvailable() { |
- session_->SetStreamHasWriteAvailable(this, |
- new SpdyStreamIOBufferProducer(this)); |
-} |
- |
-scoped_ptr<SpdyFrame> SpdyStream::ProduceNextFrame() { |
- if (io_state_ == STATE_SEND_DOMAIN_BOUND_CERT_COMPLETE) { |
- CHECK(request_.get()); |
- CHECK_GT(stream_id_, 0u); |
- |
- std::string origin = GetUrl().GetOrigin().spec(); |
- DCHECK(origin[origin.length() - 1] == '/'); |
- origin.erase(origin.length() - 1); // Trim trailing slash. |
- scoped_ptr<SpdyFrame> frame(session_->CreateCredentialFrame( |
- origin, domain_bound_cert_type_, domain_bound_private_key_, |
- domain_bound_cert_, priority_)); |
- return frame.Pass(); |
- } else if (io_state_ == STATE_SEND_HEADERS_COMPLETE) { |
- CHECK(request_.get()); |
- CHECK_GT(stream_id_, 0u); |
- |
- SpdyControlFlags flags = |
- has_upload_data_ ? CONTROL_FLAG_NONE : CONTROL_FLAG_FIN; |
- scoped_ptr<SpdyFrame> frame(session_->CreateSynStream( |
- stream_id_, priority_, slot_, flags, *request_)); |
- send_time_ = base::TimeTicks::Now(); |
- return frame.Pass(); |
- } else { |
- CHECK(!cancelled()); |
- // We must need to write stream data. |
- // Until the headers have been completely sent, we can not be sure |
- // that our stream_id is correct. |
- DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE); |
- DCHECK_GT(stream_id_, 0u); |
- DCHECK(!pending_frames_.empty()); |
- |
- PendingFrame frame = pending_frames_.front(); |
- pending_frames_.pop_front(); |
- |
- waiting_completions_.push_back(frame.type); |
- |
- if (frame.type == TYPE_DATA) { |
- // Send queued data frame. |
- return scoped_ptr<SpdyFrame>(frame.data_frame); |
- } else { |
- DCHECK(frame.type == TYPE_HEADERS); |
- // Create actual HEADERS frame just in time because it depends on |
- // compression context and should not be reordered after the creation. |
- scoped_ptr<SpdyFrame> header_frame(session_->CreateHeadersFrame( |
- stream_id_, *frame.header_block, SpdyControlFlags())); |
- delete frame.header_block; |
- return header_frame.Pass(); |
- } |
- } |
- NOTREACHED(); |
-} |
- |
SpdyStream::~SpdyStream() { |
UpdateHistograms(); |
- while (!pending_frames_.empty()) { |
- PendingFrame frame = pending_frames_.back(); |
- pending_frames_.pop_back(); |
- if (frame.type == TYPE_DATA) |
- delete frame.data_frame; |
- else |
- delete frame.header_block; |
- } |
} |
void SpdyStream::SetDelegate(Delegate* delegate) { |
@@ -228,6 +189,35 @@ void SpdyStream::PushedStreamReplayData() { |
} |
} |
+scoped_ptr<SpdyFrame> SpdyStream::ProduceSynStreamFrame() { |
+ CHECK_EQ(io_state_, STATE_SEND_HEADERS_COMPLETE); |
+ CHECK(request_.get()); |
+ CHECK_GT(stream_id_, 0u); |
+ |
+ SpdyControlFlags flags = |
+ has_upload_data_ ? CONTROL_FLAG_NONE : CONTROL_FLAG_FIN; |
+ scoped_ptr<SpdyFrame> frame(session_->CreateSynStream( |
+ stream_id_, priority_, slot_, flags, *request_)); |
+ send_time_ = base::TimeTicks::Now(); |
+ return frame.Pass(); |
+} |
+ |
+scoped_ptr<SpdyFrame> SpdyStream::ProduceHeaderFrame( |
+ scoped_ptr<SpdyHeaderBlock> header_block) { |
+ CHECK(!cancelled()); |
+ // We must need to write stream data. |
+ // Until the headers have been completely sent, we can not be sure |
+ // that our stream_id is correct. |
+ DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE); |
+ DCHECK_GT(stream_id_, 0u); |
+ |
+ // Create actual HEADERS frame just in time because it depends on |
+ // compression context and should not be reordered after the creation. |
+ scoped_ptr<SpdyFrame> header_frame(session_->CreateHeadersFrame( |
+ stream_id_, *header_block, SpdyControlFlags())); |
+ return header_frame.Pass(); |
+} |
+ |
void SpdyStream::DetachDelegate() { |
delegate_ = NULL; |
if (!closed()) |
@@ -578,14 +568,13 @@ void SpdyStream::QueueHeaders(scoped_ptr<SpdyHeaderBlock> headers) { |
DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE); |
CHECK_GT(stream_id_, 0u); |
- PendingFrame frame; |
- frame.type = TYPE_HEADERS; |
- // |frame.header_block| is deleted by either ProduceNextFrame() or |
- // the destructor. |
- frame.header_block = headers.release(); |
- pending_frames_.push_back(frame); |
+ waiting_completions_.push_back(TYPE_HEADERS); |
- SetHasWriteAvailable(); |
+ session_->EnqueueStreamWrite( |
+ this, |
+ scoped_ptr<SpdyFrameProducer>( |
+ new HeaderFrameProducer( |
+ weak_ptr_factory_.GetWeakPtr(), headers.Pass()))); |
} |
void SpdyStream::QueueStreamData(IOBuffer* data, |
@@ -595,20 +584,19 @@ void SpdyStream::QueueStreamData(IOBuffer* data, |
// that our stream_id is correct. |
DCHECK_GT(io_state_, STATE_SEND_HEADERS_COMPLETE); |
CHECK_GT(stream_id_, 0u); |
+ CHECK(!cancelled()); |
scoped_ptr<SpdyFrame> data_frame(session_->CreateDataFrame( |
stream_id_, data, length, flags)); |
if (!data_frame) |
return; |
- PendingFrame frame; |
- frame.type = TYPE_DATA; |
- // |frame.data_frame| is either returned by ProduceNextFrame() or |
- // deleted in the destructor. |
- frame.data_frame = data_frame.release(); |
- pending_frames_.push_back(frame); |
+ waiting_completions_.push_back(TYPE_DATA); |
- SetHasWriteAvailable(); |
+ session_->EnqueueStreamWrite( |
+ this, |
+ scoped_ptr<SpdyFrameProducer>( |
+ new SimpleFrameProducer(data_frame.Pass()))); |
} |
bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info, |
@@ -693,7 +681,7 @@ int SpdyStream::DoLoop(int result) { |
break; |
// State machine 2: connection is established. |
// In STATE_OPEN, OnResponseReceived has already been called. |
- // OnDataReceived, OnClose and OnWriteCompelte can be called. |
+ // OnDataReceived, OnClose and OnWriteComplete can be called. |
// Only OnWriteComplete calls DoLoop((). |
// |
// For HTTP streams, no data is sent from the client while in the OPEN |
@@ -763,7 +751,37 @@ int SpdyStream::DoGetDomainBoundCertComplete(int result) { |
int SpdyStream::DoSendDomainBoundCert() { |
io_state_ = STATE_SEND_DOMAIN_BOUND_CERT_COMPLETE; |
CHECK(request_.get()); |
- SetHasWriteAvailable(); |
+ |
+ std::string origin = GetUrl().GetOrigin().spec(); |
+ DCHECK(origin[origin.length() - 1] == '/'); |
+ origin.erase(origin.length() - 1); // Trim trailing slash. |
+ scoped_ptr<SpdyFrame> frame; |
+ int rv = session_->CreateCredentialFrame( |
+ origin, domain_bound_cert_type_, domain_bound_private_key_, |
+ domain_bound_cert_, priority_, &frame); |
+ if (rv != OK) { |
+ DCHECK_NE(rv, ERR_IO_PENDING); |
+ return rv; |
+ } |
+ |
+ DCHECK(frame); |
+ // TODO(akalin): Fix a couple of race conditions: |
+ // |
+ // 1) Since this counts as a write for this stream, the stream will |
+ // be activated (and hence allocated a stream ID) before this frame |
+ // is sent, even though the ID should only be activated for the |
+ // SYN_STREAM frame. This can be solved by signalling to the session |
+ // when we're sending a SYN_STREAM frame, and have it only activate |
+ // the stream then. |
+ // |
+ // 2) Since this is decoupled from sending the SYN_STREAM frame, it |
+ // is possible that other domain-bound cert frames will clobber ours |
+ // before our SYN_STREAM frame gets sent. This can be solved by |
+ // immediately enqueueing the SYN_STREAM frame here and adjusting |
+ // the state machine appropriately. |
+ session_->EnqueueStreamWrite( |
+ this, |
+ scoped_ptr<SpdyFrameProducer>(new SimpleFrameProducer(frame.Pass()))); |
return ERR_IO_PENDING; |
} |
@@ -777,9 +795,12 @@ int SpdyStream::DoSendDomainBoundCertComplete(int result) { |
int SpdyStream::DoSendHeaders() { |
CHECK(!cancelled_); |
- |
- SetHasWriteAvailable(); |
io_state_ = STATE_SEND_HEADERS_COMPLETE; |
+ |
+ session_->EnqueueStreamWrite( |
+ this, |
+ scoped_ptr<SpdyFrameProducer>( |
+ new SynStreamFrameProducer(weak_ptr_factory_.GetWeakPtr()))); |
return ERR_IO_PENDING; |
} |