| Index: net/spdy/spdy_session.cc
|
| diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc
|
| index 46a92ee7a6a7842563ca5d839225dafacd801042..c80f5236f2555ec236cb48e7c84f08efb4bddf99 100644
|
| --- a/net/spdy/spdy_session.cc
|
| +++ b/net/spdy/spdy_session.cc
|
| @@ -30,9 +30,9 @@
|
| #include "net/cert/asn1_util.h"
|
| #include "net/http/http_network_session.h"
|
| #include "net/http/http_server_properties.h"
|
| +#include "net/spdy/spdy_buffer_producer.h"
|
| #include "net/spdy/spdy_credential_builder.h"
|
| #include "net/spdy/spdy_frame_builder.h"
|
| -#include "net/spdy/spdy_frame_producer.h"
|
| #include "net/spdy/spdy_http_utils.h"
|
| #include "net/spdy/spdy_protocol.h"
|
| #include "net/spdy/spdy_session_pool.h"
|
| @@ -312,6 +312,7 @@ SpdySession::SpdySession(const HostPortProxyPair& host_port_proxy_pair,
|
| stream_hi_water_mark_(kFirstStreamId),
|
| write_pending_(false),
|
| in_flight_write_frame_type_(DATA),
|
| + in_flight_write_frame_size_(0),
|
| delayed_write_pending_(false),
|
| is_secure_(false),
|
| certificate_error_code_(OK),
|
| @@ -635,7 +636,7 @@ int SpdySession::GetProtocolVersion() const {
|
| void SpdySession::EnqueueStreamWrite(
|
| SpdyStream* stream,
|
| SpdyFrameType frame_type,
|
| - scoped_ptr<SpdyFrameProducer> producer) {
|
| + scoped_ptr<SpdyBufferProducer> producer) {
|
| DCHECK(frame_type == HEADERS ||
|
| frame_type == DATA ||
|
| frame_type == CREDENTIAL ||
|
| @@ -737,18 +738,18 @@ scoped_ptr<SpdyFrame> SpdySession::CreateHeadersFrame(
|
| return frame.Pass();
|
| }
|
|
|
| -scoped_ptr<SpdyFrame> SpdySession::CreateDataFrame(SpdyStreamId stream_id,
|
| - net::IOBuffer* data,
|
| - int len,
|
| - SpdyDataFlags flags) {
|
| - // Find our stream
|
| +scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id,
|
| + net::IOBuffer* data,
|
| + int len,
|
| + SpdyDataFlags flags) {
|
| + // Find our stream.
|
| CHECK(IsStreamActive(stream_id));
|
| scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
|
| CHECK_EQ(stream->stream_id(), stream_id);
|
|
|
| if (len < 0) {
|
| NOTREACHED();
|
| - return scoped_ptr<SpdyFrame>();
|
| + return scoped_ptr<SpdyBuffer>();
|
| }
|
|
|
| if (len > kMaxSpdyFrameChunkSize) {
|
| @@ -771,7 +772,7 @@ scoped_ptr<SpdyFrame> SpdySession::CreateDataFrame(SpdyStreamId stream_id,
|
| net_log().AddEvent(
|
| NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_STREAM_SEND_WINDOW,
|
| NetLog::IntegerCallback("stream_id", stream_id));
|
| - return scoped_ptr<SpdyFrame>();
|
| + return scoped_ptr<SpdyBuffer>();
|
| }
|
| if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
|
| effective_window_size =
|
| @@ -783,7 +784,7 @@ scoped_ptr<SpdyFrame> SpdySession::CreateDataFrame(SpdyStreamId stream_id,
|
| net_log().AddEvent(
|
| NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_SESSION_SEND_WINDOW,
|
| NetLog::IntegerCallback("stream_id", stream_id));
|
| - return scoped_ptr<SpdyFrame>();
|
| + return scoped_ptr<SpdyBuffer>();
|
| }
|
| }
|
|
|
| @@ -814,7 +815,7 @@ scoped_ptr<SpdyFrame> SpdySession::CreateDataFrame(SpdyStreamId stream_id,
|
| buffered_spdy_framer_->CreateDataFrame(
|
| stream_id, data->data(), static_cast<uint32>(len), flags));
|
|
|
| - return frame.Pass();
|
| + return scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass()));
|
| }
|
|
|
| void SpdySession::CloseStream(SpdyStreamId stream_id, int status) {
|
| @@ -922,42 +923,45 @@ void SpdySession::OnWriteComplete(int result) {
|
| scoped_refptr<SpdySession> self(this);
|
|
|
| DCHECK(write_pending_);
|
| - DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0);
|
| + DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
|
|
|
| last_activity_time_ = base::TimeTicks::Now();
|
| write_pending_ = false;
|
|
|
| if (result < 0) {
|
| - in_flight_write_.Release();
|
| + in_flight_write_.reset();
|
| in_flight_write_frame_type_ = DATA;
|
| - CloseSessionOnError(static_cast<net::Error>(result), true, "Write error");
|
| + in_flight_write_frame_size_ = 0;
|
| + in_flight_write_stream_ = NULL;
|
| + CloseSessionOnError(static_cast<Error>(result), true, "Write error");
|
| return;
|
| }
|
|
|
| // It should not be possible to have written more bytes than our
|
| // in_flight_write_.
|
| - DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining());
|
| -
|
| - in_flight_write_.buffer()->DidConsume(result);
|
| -
|
| - // We only notify the stream when we've fully written the pending frame.
|
| - if (in_flight_write_.buffer()->BytesRemaining() == 0) {
|
| - DCHECK_GT(result, 0);
|
| -
|
| - scoped_refptr<SpdyStream> stream = in_flight_write_.stream();
|
| + DCHECK_LE(static_cast<size_t>(result),
|
| + in_flight_write_->GetRemainingSize());
|
| +
|
| + if (result > 0) {
|
| + in_flight_write_->Consume(static_cast<size_t>(result));
|
| +
|
| + // We only notify the stream when we've fully written the pending frame.
|
| + if (in_flight_write_->GetRemainingSize() == 0) {
|
| + // It is possible that the stream was cancelled while we were
|
| + // writing to the socket.
|
| + if (in_flight_write_stream_ && !in_flight_write_stream_->cancelled()) {
|
| + DCHECK_GT(in_flight_write_frame_size_, 0u);
|
| + in_flight_write_stream_->OnFrameWriteComplete(
|
| + in_flight_write_frame_type_,
|
| + in_flight_write_frame_size_);
|
| + }
|
|
|
| - // It is possible that the stream was cancelled while we were writing
|
| - // to the socket.
|
| - if (stream && !stream->cancelled()) {
|
| - DCHECK_GT(in_flight_write_.buffer()->size(), 0);
|
| - stream->OnFrameWriteComplete(
|
| - in_flight_write_frame_type_,
|
| - static_cast<size_t>(in_flight_write_.buffer()->size()));
|
| + // Cleanup the write which just completed.
|
| + in_flight_write_.reset();
|
| + in_flight_write_frame_type_ = DATA;
|
| + in_flight_write_frame_size_ = 0;
|
| + in_flight_write_stream_ = NULL;
|
| }
|
| -
|
| - // Cleanup the write which just completed.
|
| - in_flight_write_.Release();
|
| - in_flight_write_frame_type_ = DATA;
|
| }
|
|
|
| // Write more data. We're already in a continuation, so we can go
|
| @@ -1035,12 +1039,12 @@ void SpdySession::WriteSocket() {
|
| // returns error (or ERR_IO_PENDING).
|
| DCHECK(buffered_spdy_framer_.get());
|
| while (true) {
|
| - if (in_flight_write_.buffer()) {
|
| - DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0);
|
| + if (in_flight_write_) {
|
| + DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
|
| } else {
|
| // Grab the next frame to send.
|
| SpdyFrameType frame_type = DATA;
|
| - scoped_ptr<SpdyFrameProducer> producer;
|
| + scoped_ptr<SpdyBufferProducer> producer;
|
| scoped_refptr<SpdyStream> stream;
|
| if (!write_queue_.Dequeue(&frame_type, &producer, &stream))
|
| break;
|
| @@ -1064,25 +1068,22 @@ void SpdySession::WriteSocket() {
|
| }
|
| }
|
|
|
| - scoped_ptr<SpdyFrame> frame = producer->ProduceFrame();
|
| - if (!frame) {
|
| + in_flight_write_ = producer->ProduceBuffer();
|
| + if (!in_flight_write_) {
|
| NOTREACHED();
|
| continue;
|
| }
|
| - DCHECK_GT(frame->size(), 0u);
|
| -
|
| - // TODO(mbelshe): We have too much copying of data here.
|
| - scoped_refptr<IOBufferWithSize> buffer =
|
| - new IOBufferWithSize(frame->size());
|
| - memcpy(buffer->data(), frame->data(), frame->size());
|
| - in_flight_write_ = SpdyIOBuffer(buffer, frame->size(), stream);
|
| in_flight_write_frame_type_ = frame_type;
|
| + in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize();
|
| + DCHECK_GE(in_flight_write_frame_size_,
|
| + buffered_spdy_framer_->GetFrameMinimumSize());
|
| + in_flight_write_stream_ = stream;
|
| }
|
|
|
| write_pending_ = true;
|
| int rv = connection_->socket()->Write(
|
| - in_flight_write_.buffer(),
|
| - in_flight_write_.buffer()->BytesRemaining(),
|
| + in_flight_write_->GetIOBufferForRemainingData(),
|
| + in_flight_write_->GetRemainingSize(),
|
| base::Bind(&SpdySession::OnWriteComplete, weak_factory_.GetWeakPtr()));
|
| if (rv == net::ERR_IO_PENDING)
|
| break;
|
| @@ -1269,13 +1270,15 @@ void SpdySession::EnqueueSessionWrite(RequestPriority priority,
|
| frame_type == PING);
|
| EnqueueWrite(
|
| priority, frame_type,
|
| - scoped_ptr<SpdyFrameProducer>(new SimpleFrameProducer(frame.Pass())),
|
| + scoped_ptr<SpdyBufferProducer>(
|
| + new SimpleBufferProducer(
|
| + scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))),
|
| NULL);
|
| }
|
|
|
| void SpdySession::EnqueueWrite(RequestPriority priority,
|
| SpdyFrameType frame_type,
|
| - scoped_ptr<SpdyFrameProducer> producer,
|
| + scoped_ptr<SpdyBufferProducer> producer,
|
| const scoped_refptr<SpdyStream>& stream) {
|
| write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream);
|
| WriteSocketLater();
|
| @@ -1393,16 +1396,24 @@ void SpdySession::OnStreamFrameData(SpdyStreamId stream_id,
|
| base::Bind(&NetLogSpdyDataCallback, stream_id, len, fin));
|
| }
|
|
|
| + ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
|
| +
|
| // By the time data comes in, the stream may already be inactive.
|
| - if (!IsStreamActive(stream_id))
|
| + if (it == active_streams_.end())
|
| return;
|
|
|
| // Only decrease the window size for data for active streams.
|
| if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION && len > 0)
|
| DecreaseRecvWindowSize(static_cast<int32>(len));
|
|
|
| - scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
|
| - stream->OnDataReceived(data, len);
|
| + scoped_ptr<SpdyBuffer> buffer;
|
| + if (data) {
|
| + DCHECK_GT(len, 0u);
|
| + buffer.reset(new SpdyBuffer(data, len));
|
| + } else {
|
| + DCHECK_EQ(len, 0u);
|
| + }
|
| + it->second->OnDataReceived(buffer.Pass());
|
| }
|
|
|
| void SpdySession::OnSetting(SpdySettingsIds id,
|
| @@ -1671,7 +1682,7 @@ void SpdySession::OnRstStream(SpdyStreamId stream_id,
|
| CHECK(!stream->cancelled());
|
|
|
| if (status == 0) {
|
| - stream->OnDataReceived(NULL, 0);
|
| + stream->OnDataReceived(scoped_ptr<SpdyBuffer>());
|
| } else if (status == RST_STREAM_REFUSED_STREAM) {
|
| DeleteStream(stream_id, ERR_SPDY_SERVER_REFUSED_STREAM);
|
| } else {
|
|
|