Index: trunk/src/net/spdy/spdy_stream.cc |
=================================================================== |
--- trunk/src/net/spdy/spdy_stream.cc (revision 194561) |
+++ trunk/src/net/spdy/spdy_stream.cc (working copy) |
@@ -12,7 +12,7 @@ |
#include "base/message_loop.h" |
#include "base/stringprintf.h" |
#include "base/values.h" |
-#include "net/spdy/spdy_buffer_producer.h" |
+#include "net/spdy/spdy_frame_producer.h" |
#include "net/spdy/spdy_http_utils.h" |
#include "net/spdy/spdy_session.h" |
@@ -54,23 +54,22 @@ |
} // namespace |
// A wrapper around a stream that calls into ProduceSynStreamFrame(). |
-class SpdyStream::SynStreamBufferProducer : public SpdyBufferProducer { |
+class SpdyStream::SynStreamFrameProducer : public SpdyFrameProducer { |
public: |
- SynStreamBufferProducer(const base::WeakPtr<SpdyStream>& stream) |
+ SynStreamFrameProducer(const base::WeakPtr<SpdyStream>& stream) |
: stream_(stream) { |
DCHECK(stream_); |
} |
- virtual ~SynStreamBufferProducer() {} |
+ virtual ~SynStreamFrameProducer() {} |
- virtual scoped_ptr<SpdyBuffer> ProduceBuffer() OVERRIDE { |
+ virtual scoped_ptr<SpdyFrame> ProduceFrame() OVERRIDE { |
if (!stream_) { |
NOTREACHED(); |
- return scoped_ptr<SpdyBuffer>(); |
+ return scoped_ptr<SpdyFrame>(); |
} |
DCHECK_GT(stream_->stream_id(), 0u); |
- return scoped_ptr<SpdyBuffer>( |
- new SpdyBuffer(stream_->ProduceSynStreamFrame())); |
+ return stream_->ProduceSynStreamFrame(); |
} |
private: |
@@ -79,9 +78,9 @@ |
// A wrapper around a stream that calls into ProduceHeaderFrame() with |
// a given header block. |
-class SpdyStream::HeaderBufferProducer : public SpdyBufferProducer { |
+class SpdyStream::HeaderFrameProducer : public SpdyFrameProducer { |
public: |
- HeaderBufferProducer(const base::WeakPtr<SpdyStream>& stream, |
+ HeaderFrameProducer(const base::WeakPtr<SpdyStream>& stream, |
scoped_ptr<SpdyHeaderBlock> headers) |
: stream_(stream), |
headers_(headers.Pass()) { |
@@ -89,16 +88,15 @@ |
DCHECK(headers_); |
} |
- virtual ~HeaderBufferProducer() {} |
+ virtual ~HeaderFrameProducer() {} |
- virtual scoped_ptr<SpdyBuffer> ProduceBuffer() OVERRIDE { |
+ virtual scoped_ptr<SpdyFrame> ProduceFrame() OVERRIDE { |
if (!stream_) { |
NOTREACHED(); |
- return scoped_ptr<SpdyBuffer>(); |
+ return scoped_ptr<SpdyFrame>(); |
} |
DCHECK_GT(stream_->stream_id(), 0u); |
- return scoped_ptr<SpdyBuffer>( |
- new SpdyBuffer(stream_->ProduceHeaderFrame(headers_.Pass()))); |
+ return stream_->ProduceHeaderFrame(headers_.Pass()); |
} |
private: |
@@ -177,17 +175,17 @@ |
return; |
} |
- std::vector<SpdyBuffer*> buffers; |
- pending_buffers_.release(&buffers); |
+ std::vector<scoped_refptr<IOBufferWithSize> > buffers; |
+ buffers.swap(pending_buffers_); |
for (size_t i = 0; i < buffers.size(); ++i) { |
// It is always possible that a callback to the delegate results in |
// the delegate no longer being available. |
if (!delegate_) |
break; |
if (buffers[i]) { |
- delegate_->OnDataReceived(scoped_ptr<SpdyBuffer>(buffers[i])); |
+ delegate_->OnDataReceived(buffers[i]->data(), buffers[i]->size()); |
} else { |
- delegate_->OnDataReceived(scoped_ptr<SpdyBuffer>()); |
+ delegate_->OnDataReceived(NULL, 0); |
session_->CloseStream(stream_id_, net::OK); |
// Note: |this| may be deleted after calling CloseStream. |
DCHECK_EQ(buffers.size() - 1, i); |
@@ -452,8 +450,9 @@ |
return rv; |
} |
-void SpdyStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { |
+void SpdyStream::OnDataReceived(const char* data, size_t length) { |
DCHECK(session_->IsStreamActive(stream_id_)); |
+ DCHECK_LT(length, 1u << 24); |
// If we don't have a response, then the SYN_REPLY did not come through. |
// We cannot pass data up to the caller unless the reply headers have been |
// received. |
@@ -466,8 +465,10 @@ |
if (!delegate_ || continue_buffering_data_) { |
// It should be valid for this to happen in the server push case. |
// We'll return received data when delegate gets attached to the stream. |
- if (buffer) { |
- pending_buffers_.push_back(buffer.release()); |
+ if (length > 0) { |
+ IOBufferWithSize* buf = new IOBufferWithSize(length); |
+ memcpy(buf->data(), data, length); |
+ pending_buffers_.push_back(make_scoped_refptr(buf)); |
} else { |
pending_buffers_.push_back(NULL); |
metrics_.StopStream(); |
@@ -479,15 +480,14 @@ |
CHECK(!closed()); |
- if (!buffer) { |
+ // A zero-length read means that the stream is being closed. |
+ if (length == 0) { |
metrics_.StopStream(); |
session_->CloseStream(stream_id_, net::OK); |
// Note: |this| may be deleted after calling CloseStream. |
return; |
} |
- size_t length = buffer->GetRemainingSize(); |
- DCHECK_LE(length, session_->GetDataFrameMaximumPayload()); |
if (session_->flow_control_state() >= SpdySession::FLOW_CONTROL_STREAM) |
DecreaseRecvWindowSize(static_cast<int32>(length)); |
@@ -496,7 +496,7 @@ |
recv_bytes_ += length; |
recv_last_byte_time_ = base::TimeTicks::Now(); |
- if (delegate_->OnDataReceived(buffer.Pass()) != net::OK) { |
+ if (delegate_->OnDataReceived(data, length) != net::OK) { |
// |delegate_| rejected the data. |
LogStreamError(ERR_SPDY_PROTOCOL_ERROR, "Delegate rejected the data"); |
session_->CloseStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR); |
@@ -580,8 +580,8 @@ |
session_->EnqueueStreamWrite( |
this, HEADERS, |
- scoped_ptr<SpdyBufferProducer>( |
- new HeaderBufferProducer( |
+ scoped_ptr<SpdyFrameProducer>( |
+ new HeaderFrameProducer( |
weak_ptr_factory_.GetWeakPtr(), headers.Pass()))); |
} |
@@ -594,16 +594,15 @@ |
CHECK_GT(stream_id_, 0u); |
CHECK(!cancelled()); |
- scoped_ptr<SpdyBuffer> data_buffer(session_->CreateDataBuffer( |
+ scoped_ptr<SpdyFrame> data_frame(session_->CreateDataFrame( |
stream_id_, data, length, flags)); |
- // We'll get called again by PossiblyResumeIfSendStalled(). |
- if (!data_buffer) |
+ if (!data_frame) |
return; |
session_->EnqueueStreamWrite( |
this, DATA, |
- scoped_ptr<SpdyBufferProducer>( |
- new SimpleBufferProducer(data_buffer.Pass()))); |
+ scoped_ptr<SpdyFrameProducer>( |
+ new SimpleFrameProducer(data_frame.Pass()))); |
} |
bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info, |
@@ -785,9 +784,7 @@ |
// the state machine appropriately. |
session_->EnqueueStreamWrite( |
this, CREDENTIAL, |
- scoped_ptr<SpdyBufferProducer>( |
- new SimpleBufferProducer( |
- scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass()))))); |
+ scoped_ptr<SpdyFrameProducer>(new SimpleFrameProducer(frame.Pass()))); |
return ERR_IO_PENDING; |
} |
@@ -803,8 +800,8 @@ |
session_->EnqueueStreamWrite( |
this, SYN_STREAM, |
- scoped_ptr<SpdyBufferProducer>( |
- new SynStreamBufferProducer(weak_ptr_factory_.GetWeakPtr()))); |
+ scoped_ptr<SpdyFrameProducer>( |
+ new SynStreamFrameProducer(weak_ptr_factory_.GetWeakPtr()))); |
return ERR_IO_PENDING; |
} |