Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(683)

Unified Diff: net/spdy/spdy_session.cc

Issue 10448083: Fix out of order SYN_STEAM frames. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Cleanup Created 8 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: net/spdy/spdy_session.cc
diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc
index 050d35beae9204adfd1771ccc7bf7730a7288c49..def2a78a1e08bb536ab67f51590d1baa97e3c10c 100644
--- a/net/spdy/spdy_session.cc
+++ b/net/spdy/spdy_session.cc
@@ -473,6 +473,11 @@ bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
return !ssl_info.client_cert_sent && ssl_info.cert->VerifyNameMatch(domain);
}
+void SpdySession::SetStreamHasWriteAvailable(SpdyStream* stream) {
+ write_queue_.push(stream);
+ WriteSocketLater();
+}
+
int SpdySession::GetPushStream(
const GURL& url,
scoped_refptr<SpdyStream>* stream,
@@ -511,7 +516,8 @@ int SpdySession::CreateStream(
const BoundNetLog& stream_net_log,
const CompletionCallback& callback) {
if (!max_concurrent_streams_ ||
- active_streams_.size() < max_concurrent_streams_) {
+ (active_streams_.size() + created_streams_.size()
+ < max_concurrent_streams_)) {
return CreateStreamImpl(url, priority, spdy_stream, stream_net_log);
}
@@ -601,10 +607,7 @@ int SpdySession::CreateStreamImpl(
const std::string& path = url.PathForRequest();
- const SpdyStreamId stream_id = GetNewStreamId();
-
*spdy_stream = new SpdyStream(this,
- stream_id,
false,
stream_net_log);
const scoped_refptr<SpdyStream>& stream = *spdy_stream;
@@ -613,14 +616,13 @@ int SpdySession::CreateStreamImpl(
stream->set_path(path);
stream->set_send_window_size(initial_send_window_size_);
stream->set_recv_window_size(initial_recv_window_size_);
- ActivateStream(stream);
+ created_streams_.insert(stream);
UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyPriorityCount",
static_cast<int>(priority), 0, 10, 11);
// TODO(mbelshe): Optimize memory allocations
- DCHECK_EQ(active_streams_[stream_id].get(), stream.get());
return OK;
}
@@ -642,15 +644,13 @@ int SpdySession::GetProtocolVersion() const {
return buffered_spdy_framer_->protocol_version();
}
-int SpdySession::WriteSynStream(
+SpdySynStreamControlFrame* SpdySession::CreateSynStream(
SpdyStreamId stream_id,
RequestPriority priority,
uint8 credential_slot,
SpdyControlFlags flags,
const linked_ptr<SpdyHeaderBlock>& headers) {
- // Find our stream
- if (!IsStreamActive(stream_id))
- return ERR_INVALID_SPDY_STREAM;
+ DCHECK(IsStreamActive(stream_id));
const scoped_refptr<SpdyStream>& stream = active_streams_[stream_id];
CHECK_EQ(stream->stream_id(), stream_id);
@@ -661,11 +661,7 @@ int SpdySession::WriteSynStream(
buffered_spdy_framer_->CreateSynStream(
stream_id, 0,
ConvertRequestPriorityToSpdyPriority(priority, GetProtocolVersion()),
- credential_slot, flags, false, headers.get()));
- // We enqueue all SYN_STREAM frames at the same priority to ensure
- // that we do not send them out-of-order.
- // http://crbug.com/111708
- QueueFrame(syn_frame.get(), HIGHEST, stream);
+ credential_slot, flags, true, headers.get()));
base::StatsCounter spdy_requests("spdy.requests");
spdy_requests.Increment();
@@ -678,14 +674,15 @@ int SpdySession::WriteSynStream(
new NetLogSpdySynParameter(headers, flags, stream_id, 0)));
}
- return ERR_IO_PENDING;
+ return syn_frame.release();
}
-int SpdySession::WriteCredentialFrame(const std::string& origin,
- SSLClientCertType type,
- const std::string& key,
- const std::string& cert,
- RequestPriority priority) {
+SpdyCredentialControlFrame* SpdySession::CreateCredentialFrame(
+ const std::string& origin,
+ SSLClientCertType type,
+ const std::string& key,
+ const std::string& cert,
+ RequestPriority priority) {
DCHECK(is_secure_);
unsigned char secret[32]; // 32 bytes from the spec
GetSSLClientSocket()->ExportKeyingMaterial("SPDY certificate proof",
@@ -727,12 +724,6 @@ int SpdySession::WriteCredentialFrame(const std::string& origin,
DCHECK(buffered_spdy_framer_.get());
scoped_ptr<SpdyCredentialControlFrame> credential_frame(
buffered_spdy_framer_->CreateCredentialFrame(credential));
- // We enqueue all SYN_STREAM frames at the same priority to ensure
- // that we do not send them out-of-order, which means that we need
- // to enqueue all CREDENTIAL frames at this priority to ensure that
- // they are sent *before* the SYN_STREAM that references them.
- // http://crbug.com/111708
- QueueFrame(credential_frame.get(), HIGHEST, NULL);
if (net_log().IsLoggingAllEvents()) {
net_log().AddEvent(
@@ -741,12 +732,12 @@ int SpdySession::WriteCredentialFrame(const std::string& origin,
new NetLogSpdyCredentialParameter(credential.slot,
origin)));
}
- return ERR_IO_PENDING;
+ return credential_frame.release();
}
-int SpdySession::WriteStreamData(SpdyStreamId stream_id,
- net::IOBuffer* data, int len,
- SpdyDataFlags flags) {
+SpdyDataFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id,
+ net::IOBuffer* data, int len,
+ SpdyDataFlags flags) {
// Find our stream
CHECK(IsStreamActive(stream_id));
scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
@@ -771,7 +762,7 @@ int SpdySession::WriteStreamData(SpdyStreamId stream_id,
NetLog::TYPE_SPDY_SESSION_STALLED_ON_SEND_WINDOW,
make_scoped_refptr(
new NetLogIntegerParameter("stream_id", stream_id)));
- return ERR_IO_PENDING;
+ return NULL;
}
int new_len = std::min(len, stream->send_window_size());
if (new_len < len) {
@@ -796,18 +787,23 @@ int SpdySession::WriteStreamData(SpdyStreamId stream_id,
scoped_ptr<SpdyDataFrame> frame(
buffered_spdy_framer_->CreateDataFrame(
stream_id, data->data(), len, flags));
- QueueFrame(frame.get(), stream->priority(), stream);
- return ERR_IO_PENDING;
+ return frame.release();
}
void SpdySession::CloseStream(SpdyStreamId stream_id, int status) {
+ DCHECK_NE(0u, stream_id);
// TODO(mbelshe): We should send a RST_STREAM control frame here
// so that the server can cancel a large send.
DeleteStream(stream_id, status);
}
+void SpdySession::CloseCreatedStream(SpdyStream* stream, int status) {
+ DCHECK_EQ(0u, stream->stream_id());
+ created_streams_.erase(scoped_refptr<SpdyStream>(stream));
+}
+
void SpdySession::ResetStream(SpdyStreamId stream_id,
SpdyStatusCodes status,
const std::string& description) {
@@ -826,7 +822,7 @@ void SpdySession::ResetStream(SpdyStreamId stream_id,
scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
priority = stream->priority();
}
- QueueFrame(rst_frame.get(), priority, NULL);
+ QueueFrame(rst_frame.release(), priority);
RecordProtocolErrorHistogram(
static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID));
DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR);
@@ -1014,44 +1010,36 @@ void SpdySession::WriteSocket() {
// Loop sending frames until we've sent everything or until the write
// returns error (or ERR_IO_PENDING).
DCHECK(buffered_spdy_framer_.get());
- while (in_flight_write_.buffer() || !queue_.empty()) {
+ while (in_flight_write_.buffer() || !write_queue_.empty()) {
if (!in_flight_write_.buffer()) {
// Grab the next SpdyFrame to send.
- SpdyIOBuffer next_buffer = queue_.top();
- queue_.pop();
-
- // We've deferred compression until just before we write it to the socket,
- // which is now. At this time, we don't compress our data frames.
- SpdyFrame uncompressed_frame(next_buffer.buffer()->data(), false);
- size_t size;
- if (buffered_spdy_framer_->IsCompressible(uncompressed_frame)) {
- DCHECK(uncompressed_frame.is_control_frame());
- scoped_ptr<SpdyFrame> compressed_frame(
- buffered_spdy_framer_->CompressControlFrame(
- reinterpret_cast<const SpdyControlFrame&>(uncompressed_frame)));
- if (!compressed_frame.get()) {
- RecordProtocolErrorHistogram(
- PROTOCOL_ERROR_SPDY_COMPRESSION_FAILURE);
- CloseSessionOnError(
- net::ERR_SPDY_PROTOCOL_ERROR, true, "SPDY Compression failure.");
- return;
- }
-
- size = compressed_frame->length() + SpdyFrame::kHeaderSize;
-
- DCHECK_GT(size, 0u);
-
- // TODO(mbelshe): We have too much copying of data here.
- IOBufferWithSize* buffer = new IOBufferWithSize(size);
- memcpy(buffer->data(), compressed_frame->data(), size);
-
- // Attempt to send the frame.
- in_flight_write_ = SpdyIOBuffer(buffer, size, HIGHEST,
- next_buffer.stream());
- } else {
- size = uncompressed_frame.length() + SpdyFrame::kHeaderSize;
- in_flight_write_ = next_buffer;
+ SpdyFrameProducer* producer = write_queue_.top();
+ write_queue_.pop();
+ SpdyStream* stream = producer->GetSpdyStream();
+ if (stream != NULL && stream->stream_id() == 0) {
+ stream->set_stream_id(GetNewStreamId());
+ created_streams_.erase(scoped_refptr<SpdyStream>(stream));
+ ActivateStream(stream);
+ DCHECK_EQ(active_streams_[stream->stream_id()].get(), stream);
}
+ SpdyFrame* frame = producer->ProduceNextFrame();
+ if (write_queue_.size() > 0)
+ // It is possible that a stream had data to write, but a
+ // WINDOW_UPDATE frame has been received which made that
+ // stream no longer writable.
+ // TODO(rch): consider handling that case by removing the
+ // stream from the writable queue?
+ if (frame == NULL)
+ continue;
+ size_t size = frame->length() + SpdyFrame::kHeaderSize;
+ DCHECK_GT(size, 0u);
+
+ // TODO(mbelshe): We have too much copying of data here.
+ IOBufferWithSize* buffer = new IOBufferWithSize(size);
+ memcpy(buffer->data(), frame->data(), size);
+
+ in_flight_write_ = SpdyIOBuffer(buffer, size, producer->GetPriority(),
+ stream);
} else {
DCHECK(in_flight_write_.buffer()->BytesRemaining());
}
@@ -1105,9 +1093,20 @@ void SpdySession::CloseAllStreams(net::Error status) {
DeleteStream(stream->stream_id(), status);
}
+ while (!created_streams_.empty()) {
+ CreatedStreamSet::iterator it = created_streams_.begin();
+ const scoped_refptr<SpdyStream>& stream = *it;
+ std::string description = base::StringPrintf(
+ "ABANDONED (stream_id=%d): ", stream->stream_id()) + stream->path();
+ stream->LogStreamError(status, description);
+ if (stream)
+ stream->OnClose(status);
+ created_streams_.erase(it);
+ }
+
// We also need to drain the queue.
- while (queue_.size())
- queue_.pop();
+ while (!write_queue_.empty())
+ write_queue_.pop();
}
int SpdySession::GetNewStreamId() {
@@ -1118,17 +1117,6 @@ int SpdySession::GetNewStreamId() {
return id;
}
-void SpdySession::QueueFrame(SpdyFrame* frame,
- RequestPriority priority,
- SpdyStream* stream) {
- int length = SpdyFrame::kHeaderSize + frame->length();
- IOBuffer* buffer = new IOBuffer(length);
- memcpy(buffer->data(), frame->data(), length);
- queue_.push(SpdyIOBuffer(buffer, length, priority, stream));
-
- WriteSocketLater();
-}
-
void SpdySession::CloseSessionOnError(net::Error err,
bool remove_from_pool,
const std::string& description) {
@@ -1219,6 +1207,35 @@ int SpdySession::GetLocalAddress(IPEndPoint* address) const {
return connection_->socket()->GetLocalAddress(address);
}
+class SimpleSpdyFrameProducer : public SpdyFrameProducer {
+ public:
+ SimpleSpdyFrameProducer(SpdyFrame* frame,
+ RequestPriority priority)
+ : frame_(frame),
+ priority_(priority) {
+ }
+ virtual RequestPriority GetPriority() const OVERRIDE {
+ return priority_;
+ }
+ virtual SpdyFrame* ProduceNextFrame() OVERRIDE {
+ return frame_;
+ }
+ virtual SpdyStream* GetSpdyStream() const OVERRIDE {
+ return NULL;
+ }
+ private:
+ SpdyFrame* frame_;
+ RequestPriority priority_;
+};
+
+void SpdySession::QueueFrame(SpdyFrame* frame,
+ RequestPriority priority) {
+ SimpleSpdyFrameProducer* producer
+ = new SimpleSpdyFrameProducer(frame, priority);
+ write_queue_.push(producer);
+ WriteSocketLater();
+}
+
void SpdySession::ActivateStream(SpdyStream* stream) {
const SpdyStreamId id = stream->stream_id();
DCHECK(!IsStreamActive(id));
@@ -1247,6 +1264,17 @@ void SpdySession::DeleteStream(SpdyStreamId id, int status) {
if (it2 == active_streams_.end())
return;
+ // Possibly remove from the write queue
+ WriteQueue old = write_queue_;
+ write_queue_ = WriteQueue();
+ while (!old.empty()) {
+ SpdyFrameProducer* producer = old.top();
+ if (producer->GetSpdyStream() == NULL ||
+ producer->GetSpdyStream()->stream_id() != id)
+ write_queue_.push(producer);
+ old.pop();
+ }
+
// If this is an active stream, call the callback.
const scoped_refptr<SpdyStream> stream(it2->second);
active_streams_.erase(it2);
@@ -1463,8 +1491,8 @@ void SpdySession::OnSynStream(
return;
}
- scoped_refptr<SpdyStream> stream(
- new SpdyStream(this, stream_id, true, net_log_));
+ scoped_refptr<SpdyStream> stream(new SpdyStream(this, true, net_log_));
+ stream->set_stream_id(stream_id);
stream->set_path(gurl.PathForRequest());
stream->set_send_window_size(initial_send_window_size_);
@@ -1486,7 +1514,6 @@ void SpdySession::OnSynStream(
void SpdySession::OnSynReply(const SpdySynReplyControlFrame& frame,
const linked_ptr<SpdyHeaderBlock>& headers) {
SpdyStreamId stream_id = frame.stream_id();
-
if (net_log().IsLoggingAllEvents()) {
net_log().AddEvent(
NetLog::TYPE_SPDY_SESSION_SYN_REPLY,
@@ -1671,7 +1698,7 @@ void SpdySession::SendWindowUpdate(SpdyStreamId stream_id,
DCHECK(buffered_spdy_framer_.get());
scoped_ptr<SpdyWindowUpdateControlFrame> window_update_frame(
buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size));
- QueueFrame(window_update_frame.get(), stream->priority(), NULL);
+ QueueFrame(window_update_frame.release(), stream->priority());
}
// Given a cwnd that we would have sent to the server, modify it based on the
@@ -1733,7 +1760,7 @@ void SpdySession::SendSettings() {
scoped_ptr<SpdySettingsControlFrame> settings_frame(
buffered_spdy_framer_->CreateSettings(settings_map_new));
sent_settings_ = true;
- QueueFrame(settings_frame.get(), HIGHEST, NULL);
+ QueueFrame(settings_frame.release(), HIGHEST);
}
void SpdySession::HandleSetting(uint32 id, uint32 value) {
@@ -1770,6 +1797,12 @@ void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) {
DCHECK(stream);
stream->AdjustSendWindowSize(delta_window_size);
}
+
+ CreatedStreamSet::iterator i;
+ for (i = created_streams_.begin(); i != created_streams_.end(); i++) {
+ const scoped_refptr<SpdyStream>& stream = *i;
+ stream->AdjustSendWindowSize(delta_window_size);
+ }
}
void SpdySession::SendPrefacePingIfNoneInFlight() {
@@ -1790,7 +1823,7 @@ void SpdySession::WritePingFrame(uint32 unique_id) {
DCHECK(buffered_spdy_framer_.get());
scoped_ptr<SpdyPingControlFrame> ping_frame(
buffered_spdy_framer_->CreatePingFrame(next_ping_id_));
- QueueFrame(ping_frame.get(), HIGHEST, NULL);
+ QueueFrame(ping_frame.release(), HIGHEST);
if (net_log().IsLoggingAllEvents()) {
net_log().AddEvent(

Powered by Google App Engine
This is Rietveld 408576698