Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(589)

Unified Diff: net/spdy/spdy_session.cc

Issue 13990005: [SPDY] Replace SpdyIOBuffer with new SpdyBuffer class (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: net/spdy/spdy_session.cc
diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc
index 46a92ee7a6a7842563ca5d839225dafacd801042..c80f5236f2555ec236cb48e7c84f08efb4bddf99 100644
--- a/net/spdy/spdy_session.cc
+++ b/net/spdy/spdy_session.cc
@@ -30,9 +30,9 @@
#include "net/cert/asn1_util.h"
#include "net/http/http_network_session.h"
#include "net/http/http_server_properties.h"
+#include "net/spdy/spdy_buffer_producer.h"
#include "net/spdy/spdy_credential_builder.h"
#include "net/spdy/spdy_frame_builder.h"
-#include "net/spdy/spdy_frame_producer.h"
#include "net/spdy/spdy_http_utils.h"
#include "net/spdy/spdy_protocol.h"
#include "net/spdy/spdy_session_pool.h"
@@ -312,6 +312,7 @@ SpdySession::SpdySession(const HostPortProxyPair& host_port_proxy_pair,
stream_hi_water_mark_(kFirstStreamId),
write_pending_(false),
in_flight_write_frame_type_(DATA),
+ in_flight_write_frame_size_(0),
delayed_write_pending_(false),
is_secure_(false),
certificate_error_code_(OK),
@@ -635,7 +636,7 @@ int SpdySession::GetProtocolVersion() const {
void SpdySession::EnqueueStreamWrite(
SpdyStream* stream,
SpdyFrameType frame_type,
- scoped_ptr<SpdyFrameProducer> producer) {
+ scoped_ptr<SpdyBufferProducer> producer) {
DCHECK(frame_type == HEADERS ||
frame_type == DATA ||
frame_type == CREDENTIAL ||
@@ -737,18 +738,18 @@ scoped_ptr<SpdyFrame> SpdySession::CreateHeadersFrame(
return frame.Pass();
}
-scoped_ptr<SpdyFrame> SpdySession::CreateDataFrame(SpdyStreamId stream_id,
- net::IOBuffer* data,
- int len,
- SpdyDataFlags flags) {
- // Find our stream
+scoped_ptr<SpdyBuffer> SpdySession::CreateDataBuffer(SpdyStreamId stream_id,
+ net::IOBuffer* data,
+ int len,
+ SpdyDataFlags flags) {
+ // Find our stream.
CHECK(IsStreamActive(stream_id));
scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
CHECK_EQ(stream->stream_id(), stream_id);
if (len < 0) {
NOTREACHED();
- return scoped_ptr<SpdyFrame>();
+ return scoped_ptr<SpdyBuffer>();
}
if (len > kMaxSpdyFrameChunkSize) {
@@ -771,7 +772,7 @@ scoped_ptr<SpdyFrame> SpdySession::CreateDataFrame(SpdyStreamId stream_id,
net_log().AddEvent(
NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_STREAM_SEND_WINDOW,
NetLog::IntegerCallback("stream_id", stream_id));
- return scoped_ptr<SpdyFrame>();
+ return scoped_ptr<SpdyBuffer>();
}
if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION) {
effective_window_size =
@@ -783,7 +784,7 @@ scoped_ptr<SpdyFrame> SpdySession::CreateDataFrame(SpdyStreamId stream_id,
net_log().AddEvent(
NetLog::TYPE_SPDY_SESSION_STREAM_STALLED_ON_SESSION_SEND_WINDOW,
NetLog::IntegerCallback("stream_id", stream_id));
- return scoped_ptr<SpdyFrame>();
+ return scoped_ptr<SpdyBuffer>();
}
}
@@ -814,7 +815,7 @@ scoped_ptr<SpdyFrame> SpdySession::CreateDataFrame(SpdyStreamId stream_id,
buffered_spdy_framer_->CreateDataFrame(
stream_id, data->data(), static_cast<uint32>(len), flags));
- return frame.Pass();
+ return scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass()));
}
void SpdySession::CloseStream(SpdyStreamId stream_id, int status) {
@@ -922,42 +923,45 @@ void SpdySession::OnWriteComplete(int result) {
scoped_refptr<SpdySession> self(this);
DCHECK(write_pending_);
- DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0);
+ DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
last_activity_time_ = base::TimeTicks::Now();
write_pending_ = false;
if (result < 0) {
- in_flight_write_.Release();
+ in_flight_write_.reset();
in_flight_write_frame_type_ = DATA;
- CloseSessionOnError(static_cast<net::Error>(result), true, "Write error");
+ in_flight_write_frame_size_ = 0;
+ in_flight_write_stream_ = NULL;
+ CloseSessionOnError(static_cast<Error>(result), true, "Write error");
return;
}
// It should not be possible to have written more bytes than our
// in_flight_write_.
- DCHECK_LE(result, in_flight_write_.buffer()->BytesRemaining());
-
- in_flight_write_.buffer()->DidConsume(result);
-
- // We only notify the stream when we've fully written the pending frame.
- if (in_flight_write_.buffer()->BytesRemaining() == 0) {
- DCHECK_GT(result, 0);
-
- scoped_refptr<SpdyStream> stream = in_flight_write_.stream();
+ DCHECK_LE(static_cast<size_t>(result),
+ in_flight_write_->GetRemainingSize());
+
+ if (result > 0) {
+ in_flight_write_->Consume(static_cast<size_t>(result));
+
+ // We only notify the stream when we've fully written the pending frame.
+ if (in_flight_write_->GetRemainingSize() == 0) {
+ // It is possible that the stream was cancelled while we were
+ // writing to the socket.
+ if (in_flight_write_stream_ && !in_flight_write_stream_->cancelled()) {
+ DCHECK_GT(in_flight_write_frame_size_, 0u);
+ in_flight_write_stream_->OnFrameWriteComplete(
+ in_flight_write_frame_type_,
+ in_flight_write_frame_size_);
+ }
- // It is possible that the stream was cancelled while we were writing
- // to the socket.
- if (stream && !stream->cancelled()) {
- DCHECK_GT(in_flight_write_.buffer()->size(), 0);
- stream->OnFrameWriteComplete(
- in_flight_write_frame_type_,
- static_cast<size_t>(in_flight_write_.buffer()->size()));
+ // Cleanup the write which just completed.
+ in_flight_write_.reset();
+ in_flight_write_frame_type_ = DATA;
+ in_flight_write_frame_size_ = 0;
+ in_flight_write_stream_ = NULL;
}
-
- // Cleanup the write which just completed.
- in_flight_write_.Release();
- in_flight_write_frame_type_ = DATA;
}
// Write more data. We're already in a continuation, so we can go
@@ -1035,12 +1039,12 @@ void SpdySession::WriteSocket() {
// returns error (or ERR_IO_PENDING).
DCHECK(buffered_spdy_framer_.get());
while (true) {
- if (in_flight_write_.buffer()) {
- DCHECK_GT(in_flight_write_.buffer()->BytesRemaining(), 0);
+ if (in_flight_write_) {
+ DCHECK_GT(in_flight_write_->GetRemainingSize(), 0u);
} else {
// Grab the next frame to send.
SpdyFrameType frame_type = DATA;
- scoped_ptr<SpdyFrameProducer> producer;
+ scoped_ptr<SpdyBufferProducer> producer;
scoped_refptr<SpdyStream> stream;
if (!write_queue_.Dequeue(&frame_type, &producer, &stream))
break;
@@ -1064,25 +1068,22 @@ void SpdySession::WriteSocket() {
}
}
- scoped_ptr<SpdyFrame> frame = producer->ProduceFrame();
- if (!frame) {
+ in_flight_write_ = producer->ProduceBuffer();
+ if (!in_flight_write_) {
NOTREACHED();
continue;
}
- DCHECK_GT(frame->size(), 0u);
-
- // TODO(mbelshe): We have too much copying of data here.
- scoped_refptr<IOBufferWithSize> buffer =
- new IOBufferWithSize(frame->size());
- memcpy(buffer->data(), frame->data(), frame->size());
- in_flight_write_ = SpdyIOBuffer(buffer, frame->size(), stream);
in_flight_write_frame_type_ = frame_type;
+ in_flight_write_frame_size_ = in_flight_write_->GetRemainingSize();
+ DCHECK_GE(in_flight_write_frame_size_,
+ buffered_spdy_framer_->GetFrameMinimumSize());
+ in_flight_write_stream_ = stream;
}
write_pending_ = true;
int rv = connection_->socket()->Write(
- in_flight_write_.buffer(),
- in_flight_write_.buffer()->BytesRemaining(),
+ in_flight_write_->GetIOBufferForRemainingData(),
+ in_flight_write_->GetRemainingSize(),
base::Bind(&SpdySession::OnWriteComplete, weak_factory_.GetWeakPtr()));
if (rv == net::ERR_IO_PENDING)
break;
@@ -1269,13 +1270,15 @@ void SpdySession::EnqueueSessionWrite(RequestPriority priority,
frame_type == PING);
EnqueueWrite(
priority, frame_type,
- scoped_ptr<SpdyFrameProducer>(new SimpleFrameProducer(frame.Pass())),
+ scoped_ptr<SpdyBufferProducer>(
+ new SimpleBufferProducer(
+ scoped_ptr<SpdyBuffer>(new SpdyBuffer(frame.Pass())))),
NULL);
}
void SpdySession::EnqueueWrite(RequestPriority priority,
SpdyFrameType frame_type,
- scoped_ptr<SpdyFrameProducer> producer,
+ scoped_ptr<SpdyBufferProducer> producer,
const scoped_refptr<SpdyStream>& stream) {
write_queue_.Enqueue(priority, frame_type, producer.Pass(), stream);
WriteSocketLater();
@@ -1393,16 +1396,24 @@ void SpdySession::OnStreamFrameData(SpdyStreamId stream_id,
base::Bind(&NetLogSpdyDataCallback, stream_id, len, fin));
}
+ ActiveStreamMap::const_iterator it = active_streams_.find(stream_id);
+
// By the time data comes in, the stream may already be inactive.
- if (!IsStreamActive(stream_id))
+ if (it == active_streams_.end())
return;
// Only decrease the window size for data for active streams.
if (flow_control_state_ == FLOW_CONTROL_STREAM_AND_SESSION && len > 0)
DecreaseRecvWindowSize(static_cast<int32>(len));
- scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
- stream->OnDataReceived(data, len);
+ scoped_ptr<SpdyBuffer> buffer;
+ if (data) {
+ DCHECK_GT(len, 0u);
+ buffer.reset(new SpdyBuffer(data, len));
+ } else {
+ DCHECK_EQ(len, 0u);
+ }
+ it->second->OnDataReceived(buffer.Pass());
}
void SpdySession::OnSetting(SpdySettingsIds id,
@@ -1671,7 +1682,7 @@ void SpdySession::OnRstStream(SpdyStreamId stream_id,
CHECK(!stream->cancelled());
if (status == 0) {
- stream->OnDataReceived(NULL, 0);
+ stream->OnDataReceived(scoped_ptr<SpdyBuffer>());
} else if (status == RST_STREAM_REFUSED_STREAM) {
DeleteStream(stream_id, ERR_SPDY_SERVER_REFUSED_STREAM);
} else {

Powered by Google App Engine
This is Rietveld 408576698