Index: trunk/src/net/spdy/spdy_session.cc |
=================================================================== |
--- trunk/src/net/spdy/spdy_session.cc (revision 194561) |
+++ trunk/src/net/spdy/spdy_session.cc (working copy) |
@@ -31,9 +31,9 @@ |
#include "net/cert/asn1_util.h" |
#include "net/http/http_network_session.h" |
#include "net/http/http_server_properties.h" |
-#include "net/spdy/spdy_buffer_producer.h" |
#include "net/spdy/spdy_credential_builder.h" |
#include "net/spdy/spdy_frame_builder.h" |
+#include "net/spdy/spdy_frame_producer.h" |
#include "net/spdy/spdy_http_utils.h" |
#include "net/spdy/spdy_protocol.h" |
#include "net/spdy/spdy_session_pool.h" |
@@ -312,7 +312,6 @@ |
stream_hi_water_mark_(kFirstStreamId), |
write_pending_(false), |
in_flight_write_frame_type_(DATA), |
- in_flight_write_frame_size_(0), |
delayed_write_pending_(false), |
is_secure_(false), |
certificate_error_code_(OK), |
@@ -637,7 +636,7 @@ |
void SpdySession::EnqueueStreamWrite( |
SpdyStream* stream, |
SpdyFrameType frame_type, |
- scoped_ptr<SpdyBufferProducer> producer) { |
+ scoped_ptr<SpdyFrameProducer> producer) { |
DCHECK(frame_type == HEADERS || |
frame_type == DATA || |
frame_type == CREDENTIAL || |
@@ -739,18 +738,18 @@ |
return frame.Pass(); |
} |
-scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id, |
- net::IOBuffer* data, |
- int len, |
- SpdyDataFlags flags) { |
- // Find our stream. |
+scoped_ptr<SpdyFrame> SpdySession::CreateDataFrame(SpdyStreamId stream_id, |
+ net::IOBuffer* data, |
+ int len, |
+ SpdyDataFlags flags) { |
+ // Find our stream |
CHECK(IsStreamActive(stream_id)); |
scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; |
CHECK_EQ(stream->stream_id(), stream_id); |
if (len < 0) { |
NOTREACHED(); |
- return scoped_ptr<SpdyBuffer>(); |
+ return scoped_ptr<SpdyFrame>(); |
} |
if (len > kMaxSpdyFrameChunkSize) { |
@@ -773,7 +772,7 @@ |
net_log().AddEvent( |
NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_STREAM_SEND_WINDOW, |
NetLog::IntegerCallback("stream_id", stream_id)); |
- return scoped_ptr<SpdyBuffer>(); |
+ return scoped_ptr<SpdyFrame>(); |
} |
if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) { |
effective_window_size = |
@@ -785,7 +784,7 @@ |
net_log().AddEvent( |
NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_SESSION_SEND_WINDOW, |
NetLog::IntegerCallback("stream_id", stream_id)); |
- return scoped_ptr<SpdyBuffer>(); |
+ return scoped_ptr<SpdyFrame>(); |
} |
} |
@@ -816,7 +815,7 @@ |
buffered_spdy_framer_->CreateDataFrame( |
stream_id, data->data(), static_cast<uint32>(len), flags)); |
- return scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())); |
+ return frame.Pass(); |
} |
void SpdySession::CloseStream(SpdyStreamId stream_id, int status) { |
@@ -983,45 +982,42 @@ |
scoped_refptr<SpdySession> self(this); |
DCHECK(write_pending_); |
- DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u); |
+ DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0); |
last_activity_time_ = base::TimeTicks::Now(); |
write_pending_ = false; |
if (result < 0) { |
- in_flight_write_.reset(); |
+ in_flight_write_.Release(); |
in_flight_write_frame_type_ = DATA; |
- in_flight_write_frame_size_ = 0; |
- in_flight_write_stream_ = NULL; |
- CloseSessionOnError(static_cast<Error>(result), true, "Write error"); |
+ CloseSessionOnError(static_cast<net::Error>(result), true, "Write error"); |
return; |
} |
// It should not be possible to have written more bytes than our |
// in_flight_write_. |
- DCHECK_LE(static_cast<size_t>(result), |
- in_flight_write_->GetRemainingSize()); |
+ DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining()); |
- if (result > 0) { |
- in_flight_write_->Consume(static_cast<size_t>(result)); |
+ in_flight_write_.buffer()->DidConsume(result); |
- // We only notify the stream when we've fully written the pending frame. |
- if (in_flight_write_->GetRemainingSize() == 0) { |
- // It is possible that the stream was cancelled while we were |
- // writing to the socket. |
- if (in_flight_write_stream_ && !in_flight_write_stream_->cancelled()) { |
- DCHECK_GT(in_flight_write_frame_size_, 0u); |
- in_flight_write_stream_->OnFrameWriteComplete( |
- in_flight_write_frame_type_, |
- in_flight_write_frame_size_); |
- } |
+ // We only notify the stream when we've fully written the pending frame. |
+ if (in_flight_write_.buffer()->BytesRemaining() == 0) { |
+ DCHECK_GT(result, 0); |
- // Cleanup the write which just completed. |
- in_flight_write_.reset(); |
- in_flight_write_frame_type_ = DATA; |
- in_flight_write_frame_size_ = 0; |
- in_flight_write_stream_ = NULL; |
+ scoped_refptr<SpdyStream> stream = in_flight_write_.stream(); |
+ |
+ // It is possible that the stream was cancelled while we were writing |
+ // to the socket. |
+ if (stream && !stream->cancelled()) { |
+ DCHECK_GT(in_flight_write_.buffer()->size(), 0); |
+ stream->OnFrameWriteComplete( |
+ in_flight_write_frame_type_, |
+ static_cast<size_t>(in_flight_write_.buffer()->size())); |
} |
+ |
+ // Cleanup the write which just completed. |
+ in_flight_write_.Release(); |
+ in_flight_write_frame_type_ = DATA; |
} |
// Write more data. We're already in a continuation, so we can go |
@@ -1061,12 +1057,12 @@ |
// returns error (or ERR_IO_PENDING). |
DCHECK(buffered_spdy_framer_.get()); |
while (true) { |
- if (in_flight_write_) { |
- DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u); |
+ if (in_flight_write_.buffer()) { |
+ DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0); |
} else { |
// Grab the next frame to send. |
SpdyFrameType frame_type = DATA; |
- scoped_ptr<SpdyBufferProducer> producer; |
+ scoped_ptr<SpdyFrameProducer> producer; |
scoped_refptr<SpdyStream> stream; |
if (!write_queue_.Dequeue(&frame_type, &producer, &stream)) |
break; |
@@ -1090,25 +1086,25 @@ |
} |
} |
- in_flight_write_ = producer->ProduceBuffer(); |
- if (!in_flight_write_) { |
+ scoped_ptr<SpdyFrame> frame = producer->ProduceFrame(); |
+ if (!frame) { |
NOTREACHED(); |
continue; |
} |
+ DCHECK_GT(frame->size(), 0u); |
+ |
+ // TODO(mbelshe): We have too much copying of data here. |
+ scoped_refptr<IOBufferWithSize> buffer = |
+ new IOBufferWithSize(frame->size()); |
+ memcpy(buffer->data(), frame->data(), frame->size()); |
+ in_flight_write_ = SpdyIOBuffer(buffer, frame->size(), stream); |
in_flight_write_frame_type_ = frame_type; |
- in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize(); |
- DCHECK_GE(in_flight_write_frame_size_, |
- buffered_spdy_framer_->GetFrameMinimumSize()); |
- in_flight_write_stream_ = stream; |
} |
write_pending_ = true; |
- // We keep |in_flight_write_| alive until OnWriteComplete(), so |
- // it's okay to use GetIOBufferForRemainingData() since the socket |
- // doesn't use the IOBuffer past OnWriteComplete(). |
int rv = connection_->socket()->Write( |
- in_flight_write_->GetIOBufferForRemainingData(), |
- in_flight_write_->GetRemainingSize(), |
+ in_flight_write_.buffer(), |
+ in_flight_write_.buffer()->BytesRemaining(), |
base::Bind(&SpdySession::OnWriteComplete, weak_factory_.GetWeakPtr())); |
if (rv == net::ERR_IO_PENDING) |
break; |
@@ -1298,15 +1294,13 @@ |
frame_type == PING); |
EnqueueWrite( |
priority, frame_type, |
- scoped_ptr<SpdyBufferProducer>( |
- new SimpleBufferProducer( |
- scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))), |
+ scoped_ptr<SpdyFrameProducer>(new SimpleFrameProducer(frame.Pass())), |
NULL); |
} |
void SpdySession::EnqueueWrite(RequestPriority priority, |
SpdyFrameType frame_type, |
- scoped_ptr<SpdyBufferProducer> producer, |
+ scoped_ptr<SpdyFrameProducer> producer, |
const scoped_refptr<SpdyStream>& stream) { |
write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream); |
WriteSocketLater(); |
@@ -1424,24 +1418,16 @@ |
base::Bind(&NetLogSpdyDataCallback, stream_id, len, fin)); |
} |
- ActiveStreamMap::const_iterator it = active_streams_.find(stream_id); |
- |
// By the time data comes in, the stream may already be inactive. |
- if (it == active_streams_.end()) |
+ if (!IsStreamActive(stream_id)) |
return; |
// Only decrease the window size for data for active streams. |
if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION && len > 0) |
DecreaseRecvWindowSize(static_cast<int32>(len)); |
- scoped_ptr<SpdyBuffer> buffer; |
- if (data) { |
- DCHECK_GT(len, 0u); |
- buffer.reset(new SpdyBuffer(data, len)); |
- } else { |
- DCHECK_EQ(len, 0u); |
- } |
- it->second->OnDataReceived(buffer.Pass()); |
+ scoped_refptr<SpdyStream> stream = active_streams_[stream_id]; |
+ stream->OnDataReceived(data, len); |
} |
void SpdySession::OnSetting(SpdySettingsIds id, |
@@ -1710,7 +1696,7 @@ |
CHECK(!stream->cancelled()); |
if (status == 0) { |
- stream->OnDataReceived(scoped_ptr<SpdyBuffer>()); |
+ stream->OnDataReceived(NULL, 0); |
} else if (status == RST_STREAM_REFUSED_STREAM) { |
DeleteStream(stream_id, ERR_SPDY_SERVER_REFUSED_STREAM); |
} else { |