Index: net/spdy/spdy_session.cc |
diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc |
index 1b59ec7b2d46e669bba74a60b656b45bd8c2fa39..f7057319a671b05d12a722ae6f71467c0a072a18 100644 |
--- a/net/spdy/spdy_session.cc |
+++ b/net/spdy/spdy_session.cc |
@@ -31,9 +31,9 @@ |
#include "net/cert/asn1_util.h" |
#include "net/http/http_network_session.h" |
#include "net/http/http_server_properties.h" |
+#include "net/spdy/spdy_buffer_producer.h" |
#include "net/spdy/spdy_credential_builder.h" |
#include "net/spdy/spdy_frame_builder.h" |
-#include "net/spdy/spdy_frame_producer.h" |
#include "net/spdy/spdy_http_utils.h" |
#include "net/spdy/spdy_protocol.h" |
#include "net/spdy/spdy_session_pool.h" |
@@ -312,6 +312,7 @@ SpdySession::SpdySession(const HostPortProxyPair& host_port_proxy_pair, |
stream_hi_water_mark_(kFirstStreamId), |
write_pending_(false), |
in_flight_write_frame_type_(DATA), |
+ in_flight_write_frame_size_(0), |
delayed_write_pending_(false), |
is_secure_(false), |
certificate_error_code_(OK), |
@@ -636,7 +637,7 @@ int SpdySession::GetProtocolVersion() const { |
void SpdySession::EnqueueStreamWrite( |
SpdyStream* stream, |
SpdyFrameType frame_type, |
- scoped_ptr<SpdyFrameProducer> producer) { |
+ scoped_ptr<SpdyBufferProducer> producer) { |
DCHECK(frame_type == HEADERS || |
frame_type == DATA || |
frame_type == CREDENTIAL || |
@@ -738,18 +739,18 @@ scoped_ptr<SpdyFrame> SpdySession::CreateHeadersFrame( |
return frame.Pass(); |
} |
-scoped_ptr<SpdyFrame> SpdySession::CreateDataFrame(SpdyStreamId stream_id, |
- net::IOBuffer* data, |
- int len, |
- SpdyDataFlags flags) { |
- // Find our stream |
+scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id, |
+ net::IOBuffer* data, |
+ int len, |
+ SpdyDataFlags flags) { |
+ // Find our stream. |
CHECK(IsStreamActive(stream_id)); |
scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; |
CHECK_EQ(stream->stream_id(), stream_id); |
if (len < 0) { |
NOTREACHED(); |
- return scoped_ptr<SpdyFrame>(); |
+ return scoped_ptr<SpdyBuffer>(); |
} |
if (len > kMaxSpdyFrameChunkSize) { |
@@ -772,7 +773,7 @@ scoped_ptr<SpdyFrame> SpdySession::CreateDataFrame(SpdyStreamId stream_id, |
net_log().AddEvent( |
NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_STREAM_SEND_WINDOW, |
NetLog::IntegerCallback("stream_id", stream_id)); |
- return scoped_ptr<SpdyFrame>(); |
+ return scoped_ptr<SpdyBuffer>(); |
} |
if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { |
effective_window_size = |
@@ -784,7 +785,7 @@ scoped_ptr<SpdyFrame> SpdySession::CreateDataFrame(SpdyStreamId stream_id, |
net_log().AddEvent( |
NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_SESSION_SEND_WINDOW, |
NetLog::IntegerCallback("stream_id", stream_id)); |
- return scoped_ptr<SpdyFrame>(); |
+ return scoped_ptr<SpdyBuffer>(); |
} |
} |
@@ -815,7 +816,7 @@ scoped_ptr<SpdyFrame> SpdySession::CreateDataFrame(SpdyStreamId stream_id, |
buffered_spdy_framer_->CreateDataFrame( |
stream_id, data->data(), static_cast<uint32>(len), flags)); |
- return frame.Pass(); |
+ return scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())); |
} |
void SpdySession::CloseStream(SpdyStreamId stream_id, int status) { |
@@ -982,42 +983,45 @@ void SpdySession::OnWriteComplete(int result) { |
scoped_refptr<SpdySession> self(this); |
DCHECK(write_pending_); |
- DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0); |
+ DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u); |
last_activity_time_ = base::TimeTicks::Now(); |
write_pending_ = false; |
if (result < 0) { |
- in_flight_write_.Release(); |
+ in_flight_write_.reset(); |
in_flight_write_frame_type_ = DATA; |
- CloseSessionOnError(static_cast<net::Error>(result), true, "Write error"); |
+ in_flight_write_frame_size_ = 0; |
+ in_flight_write_stream_ = NULL; |
+ CloseSessionOnError(static_cast<Error>(result), true, "Write error"); |
return; |
} |
// It should not be possible to have written more bytes than our |
// in_flight_write_. |
- DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining()); |
- |
- in_flight_write_.buffer()->DidConsume(result); |
- |
- // We only notify the stream when we've fully written the pending frame. |
- if (in_flight_write_.buffer()->BytesRemaining() == 0) { |
- DCHECK_GT(result, 0); |
- |
- scoped_refptr<SpdyStream> stream = in_flight_write_.stream(); |
+ DCHECK_LE(static_cast<size_t>(result), |
+ in_flight_write_->GetRemainingSize()); |
+ |
+ if (result > 0) { |
+ in_flight_write_->Consume(static_cast<size_t>(result)); |
+ |
+ // We only notify the stream when we've fully written the pending frame. |
+ if (in_flight_write_->GetRemainingSize() == 0) { |
+ // It is possible that the stream was cancelled while we were |
+ // writing to the socket. |
+ if (in_flight_write_stream_ && !in_flight_write_stream_->cancelled()) { |
+ DCHECK_GT(in_flight_write_frame_size_, 0u); |
+ in_flight_write_stream_->OnFrameWriteComplete( |
+ in_flight_write_frame_type_, |
+ in_flight_write_frame_size_); |
+ } |
- // It is possible that the stream was cancelled while we were writing |
- // to the socket. |
- if (stream && !stream->cancelled()) { |
- DCHECK_GT(in_flight_write_.buffer()->size(), 0); |
- stream->OnFrameWriteComplete( |
- in_flight_write_frame_type_, |
- static_cast<size_t>(in_flight_write_.buffer()->size())); |
+ // Cleanup the write which just completed. |
+ in_flight_write_.reset(); |
+ in_flight_write_frame_type_ = DATA; |
+ in_flight_write_frame_size_ = 0; |
+ in_flight_write_stream_ = NULL; |
} |
- |
- // Cleanup the write which just completed. |
- in_flight_write_.Release(); |
- in_flight_write_frame_type_ = DATA; |
} |
// Write more data. We're already in a continuation, so we can go |
@@ -1057,12 +1061,12 @@ void SpdySession::WriteSocket() { |
// returns error (or ERR_IO_PENDING). |
DCHECK(buffered_spdy_framer_.get()); |
while (true) { |
- if (in_flight_write_.buffer()) { |
- DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0); |
+ if (in_flight_write_) { |
+ DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u); |
} else { |
// Grab the next frame to send. |
SpdyFrameType frame_type = DATA; |
- scoped_ptr<SpdyFrameProducer> producer; |
+ scoped_ptr<SpdyBufferProducer> producer; |
scoped_refptr<SpdyStream> stream; |
if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) |
break; |
@@ -1086,25 +1090,25 @@ void SpdySession::WriteSocket() { |
} |
} |
- scoped_ptr<SpdyFrame> frame = producer->ProduceFrame(); |
- if (!frame) { |
+ in_flight_write_ = producer->ProduceBuffer(); |
+ if (!in_flight_write_) { |
NOTREACHED(); |
continue; |
} |
- DCHECK_GT(frame->size(), 0u); |
- |
- // TODO(mbelshe): We have too much copying of data here. |
- scoped_refptr<IOBufferWithSize> buffer = |
- new IOBufferWithSize(frame->size()); |
- memcpy(buffer->data(), frame->data(), frame->size()); |
- in_flight_write_ = SpdyIOBuffer(buffer, frame->size(), stream); |
in_flight_write_frame_type_ = frame_type; |
+ in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize(); |
+ DCHECK_GE(in_flight_write_frame_size_, |
+ buffered_spdy_framer_->GetFrameMinimumSize()); |
+ in_flight_write_stream_ = stream; |
} |
write_pending_ = true; |
+ // We keep |in_flight_write_| alive until OnWriteComplete(), so |
+ // it's okay to use GetIOBufferForRemainingData() since the socket |
+ // doesn't use the IOBuffer past OnWriteComplete(). |
int rv = connection_->socket()->Write( |
- in_flight_write_.buffer(), |
- in_flight_write_.buffer()->BytesRemaining(), |
+ in_flight_write_->GetIOBufferForRemainingData(), |
+ in_flight_write_->GetRemainingSize(), |
base::Bind(&SpdySession::OnWriteComplete, weak_factory_.GetWeakPtr())); |
if (rv == net::ERR_IO_PENDING) |
break; |
@@ -1294,13 +1298,15 @@ void SpdySession::EnqueueSessionWrite(RequestPriority priority, |
frame_type == PING); |
EnqueueWrite( |
priority, frame_type, |
- scoped_ptr<SpdyFrameProducer>(new SimpleFrameProducer(frame.Pass())), |
+ scoped_ptr<SpdyBufferProducer>( |
+ new SimpleBufferProducer( |
+ scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))), |
NULL); |
} |
void SpdySession::EnqueueWrite(RequestPriority priority, |
SpdyFrameType frame_type, |
- scoped_ptr<SpdyFrameProducer> producer, |
+ scoped_ptr<SpdyBufferProducer> producer, |
const scoped_refptr<SpdyStream>& stream) { |
write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream); |
WriteSocketLater(); |
@@ -1418,16 +1424,24 @@ void SpdySession::OnStreamFrameData(SpdyStreamId stream_id, |
base::Bind(&NetLogSpdyDataCallback, stream_id, len, fin)); |
} |
+ ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); |
+ |
// By the time data comes in, the stream may already be inactive. |
- if (!IsStreamActive(stream_id)) |
+ if (it == active_streams_.end()) |
return; |
// Only decrease the window size for data for active streams. |
if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION && len > 0) |
DecreaseRecvWindowSize(static_cast<int32>(len)); |
- scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; |
- stream->OnDataReceived(data, len); |
+ scoped_ptr<SpdyBuffer> buffer; |
+ if (data) { |
+ DCHECK_GT(len, 0u); |
+ buffer.reset(new SpdyBuffer(data, len)); |
+ } else { |
+ DCHECK_EQ(len, 0u); |
+ } |
+ it->second->OnDataReceived(buffer.Pass()); |
} |
void SpdySession::OnSetting(SpdySettingsIds id, |
@@ -1696,7 +1710,7 @@ void SpdySession::OnRstStream(SpdyStreamId stream_id, |
CHECK(!stream->cancelled()); |
if (status == 0) { |
- stream->OnDataReceived(NULL, 0); |
+ stream->OnDataReceived(scoped_ptr<SpdyBuffer>()); |
} else if (status == RST_STREAM_REFUSED_STREAM) { |
DeleteStream(stream_id, ERR_SPDY_SERVER_REFUSED_STREAM); |
} else { |