Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(417)

Unified Diff: net/spdy/spdy_session.cc

Issue 10448083: Fix out of order SYN_STEAM frames. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Add copy constructor to fix win build Created 8 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « net/spdy/spdy_session.h ('k') | net/spdy/spdy_session_spdy2_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: net/spdy/spdy_session.cc
diff --git a/net/spdy/spdy_session.cc b/net/spdy/spdy_session.cc
index ad300ebbd9ac669d631fd2f12f21773d9efd2d91..ffa64651ad9197b6ad7874d07ae7a10db6d94fd6 100644
--- a/net/spdy/spdy_session.cc
+++ b/net/spdy/spdy_session.cc
@@ -180,6 +180,28 @@ bool g_enable_ping_based_connection_checking = true;
} // namespace
// static
+void SpdySession::SpdyIOBufferProducer::ActivateStream(
+ SpdySession* spdy_session,
+ SpdyStream* spdy_stream) {
+ spdy_session->ActivateStream(spdy_stream);
+}
+
+// static
+SpdyIOBuffer* SpdySession::SpdyIOBufferProducer::CreateIOBuffer(
+ SpdyFrame* frame,
+ RequestPriority priority,
+ SpdyStream* stream) {
+ size_t size = frame->length() + SpdyFrame::kHeaderSize;
+ DCHECK_GT(size, 0u);
+
+ // TODO(mbelshe): We have too much copying of data here.
+ IOBufferWithSize* buffer = new IOBufferWithSize(size);
+ memcpy(buffer->data(), frame->data(), size);
+
+ return new SpdyIOBuffer(buffer, size, priority, stream);
+}
+
+// static
void SpdySession::set_default_protocol(NextProto default_protocol) {
g_default_protocol = default_protocol;
}
@@ -355,6 +377,13 @@ bool SpdySession::VerifyDomainAuthentication(const std::string& domain) {
return !ssl_info.client_cert_sent && ssl_info.cert->VerifyNameMatch(domain);
}
+void SpdySession::SetStreamHasWriteAvailable(SpdyStream* stream,
+ SpdyIOBufferProducer* producer) {
+ write_queue_.push(producer);
+ stream_producers_[producer] = stream;
+ WriteSocketLater();
+}
+
int SpdySession::GetPushStream(
const GURL& url,
scoped_refptr<SpdyStream>* stream,
@@ -393,7 +422,8 @@ int SpdySession::CreateStream(
const BoundNetLog& stream_net_log,
const CompletionCallback& callback) {
if (!max_concurrent_streams_ ||
- active_streams_.size() < max_concurrent_streams_) {
+ (active_streams_.size() + created_streams_.size() <
+ max_concurrent_streams_)) {
return CreateStreamImpl(url, priority, spdy_stream, stream_net_log);
}
@@ -483,10 +513,7 @@ int SpdySession::CreateStreamImpl(
const std::string& path = url.PathForRequest();
- const SpdyStreamId stream_id = GetNewStreamId();
-
*spdy_stream = new SpdyStream(this,
- stream_id,
false,
stream_net_log);
const scoped_refptr<SpdyStream>& stream = *spdy_stream;
@@ -495,14 +522,13 @@ int SpdySession::CreateStreamImpl(
stream->set_path(path);
stream->set_send_window_size(initial_send_window_size_);
stream->set_recv_window_size(initial_recv_window_size_);
- ActivateStream(stream);
+ created_streams_.insert(stream);
UMA_HISTOGRAM_CUSTOM_COUNTS("Net.SpdyPriorityCount",
static_cast<int>(priority), 0, 10, 11);
// TODO(mbelshe): Optimize memory allocations
- DCHECK_EQ(active_streams_[stream_id].get(), stream.get());
return OK;
}
@@ -524,15 +550,13 @@ int SpdySession::GetProtocolVersion() const {
return buffered_spdy_framer_->protocol_version();
}
-int SpdySession::WriteSynStream(
+SpdySynStreamControlFrame* SpdySession::CreateSynStream(
SpdyStreamId stream_id,
RequestPriority priority,
uint8 credential_slot,
SpdyControlFlags flags,
const linked_ptr<SpdyHeaderBlock>& headers) {
- // Find our stream
- if (!IsStreamActive(stream_id))
- return ERR_INVALID_SPDY_STREAM;
+ CHECK(IsStreamActive(stream_id));
const scoped_refptr<SpdyStream>& stream = active_streams_[stream_id];
CHECK_EQ(stream->stream_id(), stream_id);
@@ -543,11 +567,7 @@ int SpdySession::WriteSynStream(
buffered_spdy_framer_->CreateSynStream(
stream_id, 0,
ConvertRequestPriorityToSpdyPriority(priority, GetProtocolVersion()),
- credential_slot, flags, false, headers.get()));
- // We enqueue all SYN_STREAM frames at the same priority to ensure
- // that we do not send them out-of-order.
- // http://crbug.com/111708
- QueueFrame(syn_frame.get(), HIGHEST, stream);
+ credential_slot, flags, true, headers.get()));
base::StatsCounter spdy_requests("spdy.requests");
spdy_requests.Increment();
@@ -559,14 +579,15 @@ int SpdySession::WriteSynStream(
base::Bind(&NetLogSpdySynCallback, headers.get(), flags, stream_id, 0));
}
- return ERR_IO_PENDING;
+ return syn_frame.release();
}
-int SpdySession::WriteCredentialFrame(const std::string& origin,
- SSLClientCertType type,
- const std::string& key,
- const std::string& cert,
- RequestPriority priority) {
+SpdyCredentialControlFrame* SpdySession::CreateCredentialFrame(
+ const std::string& origin,
+ SSLClientCertType type,
+ const std::string& key,
+ const std::string& cert,
+ RequestPriority priority) {
DCHECK(is_secure_);
unsigned char secret[32]; // 32 bytes from the spec
GetSSLClientSocket()->ExportKeyingMaterial("SPDY certificate proof",
@@ -608,24 +629,18 @@ int SpdySession::WriteCredentialFrame(const std::string& origin,
DCHECK(buffered_spdy_framer_.get());
scoped_ptr<SpdyCredentialControlFrame> credential_frame(
buffered_spdy_framer_->CreateCredentialFrame(credential));
- // We enqueue all SYN_STREAM frames at the same priority to ensure
- // that we do not send them out-of-order, which means that we need
- // to enqueue all CREDENTIAL frames at this priority to ensure that
- // they are sent *before* the SYN_STREAM that references them.
- // http://crbug.com/111708
- QueueFrame(credential_frame.get(), HIGHEST, NULL);
if (net_log().IsLoggingAllEvents()) {
net_log().AddEvent(
NetLog::TYPE_SPDY_SESSION_SEND_CREDENTIAL,
base::Bind(&NetLogSpdyCredentialCallback, credential.slot, &origin));
}
- return ERR_IO_PENDING;
+ return credential_frame.release();
}
-int SpdySession::WriteStreamData(SpdyStreamId stream_id,
- net::IOBuffer* data, int len,
- SpdyDataFlags flags) {
+SpdyDataFrame* SpdySession::CreateDataFrame(SpdyStreamId stream_id,
+ net::IOBuffer* data, int len,
+ SpdyDataFlags flags) {
// Find our stream
CHECK(IsStreamActive(stream_id));
scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
@@ -649,7 +664,7 @@ int SpdySession::WriteStreamData(SpdyStreamId stream_id,
net_log().AddEvent(
NetLog::TYPE_SPDY_SESSION_STALLED_ON_SEND_WINDOW,
NetLog::IntegerCallback("stream_id", stream_id));
- return ERR_IO_PENDING;
+ return NULL;
}
int new_len = std::min(len, stream->send_window_size());
if (new_len < len) {
@@ -674,18 +689,23 @@ int SpdySession::WriteStreamData(SpdyStreamId stream_id,
scoped_ptr<SpdyDataFrame> frame(
buffered_spdy_framer_->CreateDataFrame(
stream_id, data->data(), len, flags));
- QueueFrame(frame.get(), stream->priority(), stream);
- return ERR_IO_PENDING;
+ return frame.release();
}
void SpdySession::CloseStream(SpdyStreamId stream_id, int status) {
+ DCHECK_NE(0u, stream_id);
// TODO(mbelshe): We should send a RST_STREAM control frame here
// so that the server can cancel a large send.
DeleteStream(stream_id, status);
}
+void SpdySession::CloseCreatedStream(SpdyStream* stream, int status) {
+ DCHECK_EQ(0u, stream->stream_id());
+ created_streams_.erase(scoped_refptr<SpdyStream>(stream));
+}
+
void SpdySession::ResetStream(SpdyStreamId stream_id,
SpdyStatusCodes status,
const std::string& description) {
@@ -703,7 +723,7 @@ void SpdySession::ResetStream(SpdyStreamId stream_id,
scoped_refptr<SpdyStream> stream = active_streams_[stream_id];
priority = stream->priority();
}
- QueueFrame(rst_frame.get(), priority, NULL);
+ QueueFrame(rst_frame.release(), priority);
RecordProtocolErrorHistogram(
static_cast<SpdyProtocolErrorDetails>(status + STATUS_CODE_INVALID));
DeleteStream(stream_id, ERR_SPDY_PROTOCOL_ERROR);
@@ -891,46 +911,22 @@ void SpdySession::WriteSocket() {
// Loop sending frames until we've sent everything or until the write
// returns error (or ERR_IO_PENDING).
DCHECK(buffered_spdy_framer_.get());
- while (in_flight_write_.buffer() || !queue_.empty()) {
+ while (in_flight_write_.buffer() || !write_queue_.empty()) {
if (!in_flight_write_.buffer()) {
- // Grab the next SpdyFrame to send.
- SpdyIOBuffer next_buffer = queue_.top();
- queue_.pop();
-
- // We've deferred compression until just before we write it to the socket,
- // which is now. At this time, we don't compress our data frames.
- SpdyFrame uncompressed_frame(next_buffer.buffer()->data(), false);
- size_t size;
- if (buffered_spdy_framer_->IsCompressible(uncompressed_frame)) {
- DCHECK(uncompressed_frame.is_control_frame());
- const SpdyControlFrame* uncompressed_control_frame =
- reinterpret_cast<const SpdyControlFrame*>(&uncompressed_frame);
- scoped_ptr<SpdyFrame> compressed_frame(
- buffered_spdy_framer_->CompressControlFrame(
- *uncompressed_control_frame));
- if (!compressed_frame.get()) {
- RecordProtocolErrorHistogram(
- PROTOCOL_ERROR_SPDY_COMPRESSION_FAILURE);
- CloseSessionOnError(
- net::ERR_SPDY_PROTOCOL_ERROR, true, "SPDY Compression failure.");
- return;
- }
-
- size = compressed_frame->length() + SpdyFrame::kHeaderSize;
-
- DCHECK_GT(size, 0u);
-
- // TODO(mbelshe): We have too much copying of data here.
- IOBufferWithSize* buffer = new IOBufferWithSize(size);
- memcpy(buffer->data(), compressed_frame->data(), size);
-
- // Attempt to send the frame.
- in_flight_write_ = SpdyIOBuffer(buffer, size, HIGHEST,
- next_buffer.stream());
- } else {
- size = uncompressed_frame.length() + SpdyFrame::kHeaderSize;
- in_flight_write_ = next_buffer;
- }
+ // Grab the next SpdyBuffer to send.
+ scoped_ptr<SpdyIOBufferProducer> producer(write_queue_.top());
+ scoped_ptr<SpdyIOBuffer> buffer(producer->ProduceNextBuffer(this));
+ stream_producers_.erase(producer.get());
+ write_queue_.pop();
+ // It is possible that a stream had data to write, but a
+ // WINDOW_UPDATE frame has been received which made that
+ // stream no longer writable.
+ // TODO(rch): consider handling that case by removing the
+ // stream from the writable queue?
+ if (buffer == NULL)
+ continue;
+
+ in_flight_write_ = *buffer;
} else {
DCHECK(in_flight_write_.buffer()->BytesRemaining());
}
@@ -977,16 +973,33 @@ void SpdySession::CloseAllStreams(net::Error status) {
while (!active_streams_.empty()) {
ActiveStreamMap::iterator it = active_streams_.begin();
const scoped_refptr<SpdyStream>& stream = it->second;
- DCHECK(stream);
- std::string description = base::StringPrintf(
- "ABANDONED (stream_id=%d): ", stream->stream_id()) + stream->path();
- stream->LogStreamError(status, description);
+ LogAbandonedStream(stream, status);
DeleteStream(stream->stream_id(), status);
}
+ while (!created_streams_.empty()) {
+ CreatedStreamSet::iterator it = created_streams_.begin();
+ const scoped_refptr<SpdyStream>& stream = *it;
+ LogAbandonedStream(stream, status);
+ stream->OnClose(status);
+ created_streams_.erase(it);
+ }
+
// We also need to drain the queue.
- while (queue_.size())
- queue_.pop();
+ while (!write_queue_.empty()) {
+ SpdyIOBufferProducer* producer = write_queue_.top();
+ stream_producers_.erase(producer);
+ delete producer;
+ write_queue_.pop();
+ }
+}
+
+void SpdySession::LogAbandonedStream(const scoped_refptr<SpdyStream>& stream,
+ net::Error status) {
+ DCHECK(stream);
+ std::string description = base::StringPrintf(
+ "ABANDONED (stream_id=%d): ", stream->stream_id()) + stream->path();
+ stream->LogStreamError(status, description);
}
int SpdySession::GetNewStreamId() {
@@ -997,17 +1010,6 @@ int SpdySession::GetNewStreamId() {
return id;
}
-void SpdySession::QueueFrame(SpdyFrame* frame,
- RequestPriority priority,
- SpdyStream* stream) {
- int length = SpdyFrame::kHeaderSize + frame->length();
- IOBuffer* buffer = new IOBuffer(length);
- memcpy(buffer->data(), frame->data(), length);
- queue_.push(SpdyIOBuffer(buffer, length, priority, stream));
-
- WriteSocketLater();
-}
-
void SpdySession::CloseSessionOnError(net::Error err,
bool remove_from_pool,
const std::string& description) {
@@ -1097,7 +1099,41 @@ int SpdySession::GetLocalAddress(IPEndPoint* address) const {
return connection_->socket()->GetLocalAddress(address);
}
+class SimpleSpdyIOBufferProducer : public SpdySession::SpdyIOBufferProducer {
+ public:
+ SimpleSpdyIOBufferProducer(SpdyFrame* frame,
+ RequestPriority priority)
+ : frame_(frame),
+ priority_(priority) {
+ }
+
+ virtual RequestPriority GetPriority() const OVERRIDE {
+ return priority_;
+ }
+
+ virtual SpdyIOBuffer* ProduceNextBuffer(SpdySession* session) {
+ return SpdySession::SpdyIOBufferProducer::CreateIOBuffer(
+ frame_, priority_, NULL);
+ }
+
+ private:
+ SpdyFrame* frame_;
+ RequestPriority priority_;
+};
+
+void SpdySession::QueueFrame(SpdyFrame* frame,
+ RequestPriority priority) {
+ SimpleSpdyIOBufferProducer* producer =
+ new SimpleSpdyIOBufferProducer(frame, priority);
+ write_queue_.push(producer);
+ WriteSocketLater();
+}
+
void SpdySession::ActivateStream(SpdyStream* stream) {
+ if (stream->stream_id() == 0) {
+ stream->set_stream_id(GetNewStreamId());
+ created_streams_.erase(scoped_refptr<SpdyStream>(stream));
+ }
const SpdyStreamId id = stream->stream_id();
DCHECK(!IsStreamActive(id));
@@ -1125,6 +1161,19 @@ void SpdySession::DeleteStream(SpdyStreamId id, int status) {
if (it2 == active_streams_.end())
return;
+ // Possibly remove from the write queue.
+ WriteQueue old = write_queue_;
+ write_queue_ = WriteQueue();
+ while (!old.empty()) {
+ SpdyIOBufferProducer* producer = old.top();
+ StreamProducerMap::iterator it = stream_producers_.find(producer);
+ if (it == stream_producers_.end() || it->second->stream_id() != id)
+ write_queue_.push(producer);
+ else
+ delete producer;
+ old.pop();
+ }
+
// If this is an active stream, call the callback.
const scoped_refptr<SpdyStream> stream(it2->second);
active_streams_.erase(it2);
@@ -1354,8 +1403,8 @@ void SpdySession::OnSynStream(
return;
}
- scoped_refptr<SpdyStream> stream(
- new SpdyStream(this, stream_id, true, net_log_));
+ scoped_refptr<SpdyStream> stream(new SpdyStream(this, true, net_log_));
+ stream->set_stream_id(stream_id);
stream->set_path(gurl.PathForRequest());
stream->set_send_window_size(initial_send_window_size_);
@@ -1377,7 +1426,6 @@ void SpdySession::OnSynStream(
void SpdySession::OnSynReply(const SpdySynReplyControlFrame& frame,
const linked_ptr<SpdyHeaderBlock>& headers) {
SpdyStreamId stream_id = frame.stream_id();
-
if (net_log().IsLoggingAllEvents()) {
net_log().AddEvent(
NetLog::TYPE_SPDY_SESSION_SYN_REPLY,
@@ -1561,7 +1609,7 @@ void SpdySession::SendWindowUpdate(SpdyStreamId stream_id,
DCHECK(buffered_spdy_framer_.get());
scoped_ptr<SpdyWindowUpdateControlFrame> window_update_frame(
buffered_spdy_framer_->CreateWindowUpdate(stream_id, delta_window_size));
- QueueFrame(window_update_frame.get(), stream->priority(), NULL);
+ QueueFrame(window_update_frame.release(), stream->priority());
}
// Given a cwnd that we would have sent to the server, modify it based on the
@@ -1600,7 +1648,6 @@ void SpdySession::SendInitialSettings() {
settings_map[SETTINGS_INITIAL_WINDOW_SIZE] =
SettingsFlagsAndValue(SETTINGS_FLAG_NONE, initial_recv_window_size_);
}
- sent_settings_ = true;
SendSettings(settings_map);
}
@@ -1633,7 +1680,6 @@ void SpdySession::SendInitialSettings() {
HandleSetting(new_id, new_val);
}
- sent_settings_ = true;
SendSettings(settings_map_new);
}
@@ -1647,7 +1693,8 @@ void SpdySession::SendSettings(const SettingsMap& settings) {
DCHECK(buffered_spdy_framer_.get());
scoped_ptr<SpdySettingsControlFrame> settings_frame(
buffered_spdy_framer_->CreateSettings(settings));
- QueueFrame(settings_frame.get(), HIGHEST, NULL);
+ sent_settings_ = true;
+ QueueFrame(settings_frame.release(), HIGHEST);
}
void SpdySession::HandleSetting(uint32 id, uint32 value) {
@@ -1682,6 +1729,12 @@ void SpdySession::UpdateStreamsSendWindowSize(int32 delta_window_size) {
DCHECK(stream);
stream->AdjustSendWindowSize(delta_window_size);
}
+
+ CreatedStreamSet::iterator i;
+ for (i = created_streams_.begin(); i != created_streams_.end(); i++) {
+ const scoped_refptr<SpdyStream>& stream = *i;
+ stream->AdjustSendWindowSize(delta_window_size);
+ }
}
void SpdySession::SendPrefacePingIfNoneInFlight() {
@@ -1702,7 +1755,7 @@ void SpdySession::WritePingFrame(uint32 unique_id) {
DCHECK(buffered_spdy_framer_.get());
scoped_ptr<SpdyPingControlFrame> ping_frame(
buffered_spdy_framer_->CreatePingFrame(next_ping_id_));
- QueueFrame(ping_frame.get(), HIGHEST, NULL);
+ QueueFrame(ping_frame.release(), HIGHEST);
if (net_log().IsLoggingAllEvents()) {
net_log().AddEvent(
« no previous file with comments | « net/spdy/spdy_session.h ('k') | net/spdy/spdy_session_spdy2_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698