Chromium Code Reviews| Index: net/spdy/spdy_stream.cc |
| diff --git a/net/spdy/spdy_stream.cc b/net/spdy/spdy_stream.cc |
| index 190e1750c7eedc107ff2da1dfdd05860943adc7a..b441a76f7df1f1e241a5697621df176674a73312 100644 |
| --- a/net/spdy/spdy_stream.cc |
| +++ b/net/spdy/spdy_stream.cc |
| @@ -44,7 +44,7 @@ base::Value* NetLogSpdyStreamWindowUpdateCallback( |
| return dict; |
| } |
| -bool ContainsUpperAscii(const std::string& str) { |
| +bool ContainsUppercaseAscii(const std::string& str) { |
| for (std::string::const_iterator i(str.begin()); i != str.end(); ++i) { |
| if (*i >= 'A' && *i <= 'Z') { |
| return true; |
| @@ -89,7 +89,7 @@ SpdyStream::SpdyStream(SpdyStreamType type, |
| : type_(type), |
| weak_ptr_factory_(this), |
| in_do_loop_(false), |
| - continue_buffering_data_(true), |
| + continue_buffering_data_(type_ == SPDY_PUSH_STREAM), |
| stream_id_(0), |
| path_(path), |
| priority_(priority), |
| @@ -98,14 +98,12 @@ SpdyStream::SpdyStream(SpdyStreamType type, |
| send_window_size_(initial_send_window_size), |
| recv_window_size_(initial_recv_window_size), |
| unacked_recv_window_bytes_(0), |
| - response_received_(false), |
| session_(session), |
| delegate_(NULL), |
| send_status_( |
| (type_ == SPDY_PUSH_STREAM) ? |
| NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND), |
| request_time_(base::Time::Now()), |
| - response_(new SpdyHeaderBlock), |
| io_state_((type_ == SPDY_PUSH_STREAM) ? STATE_OPEN : STATE_NONE), |
| response_status_(OK), |
| net_log_(net_log), |
| @@ -125,35 +123,44 @@ SpdyStream::~SpdyStream() { |
| } |
| void SpdyStream::SetDelegate(Delegate* delegate) { |
| + Delegate* old_delegate = delegate_; |
|
Ryan Hamilton
2013/06/19 18:58:36
Is it "valid" to have more than one delegate in th
akalin
2013/06/21 21:13:25
Yeap, in the redirect case in SpdyProxyClientSocke
|
| CHECK(delegate); |
|
Ryan Hamilton
2013/06/19 18:58:36
nit: can you make this the first line
akalin
2013/06/21 21:13:25
Done.
|
| delegate_ = delegate; |
| - if (type_ == SPDY_PUSH_STREAM) { |
| - CHECK(response_received()); |
| + if (type_ == SPDY_PUSH_STREAM && !old_delegate) { |
| + DCHECK(continue_buffering_data_); |
| base::MessageLoop::current()->PostTask( |
| FROM_HERE, |
| base::Bind(&SpdyStream::PushedStreamReplayData, GetWeakPtr())); |
| - } else { |
| - continue_buffering_data_ = false; |
| } |
| } |
| +SpdyStream::Delegate* SpdyStream::GetDelegate() { |
| + return delegate_; |
| +} |
| + |
| void SpdyStream::PushedStreamReplayData() { |
| + DCHECK_EQ(type_, SPDY_PUSH_STREAM); |
| DCHECK_NE(stream_id_, 0u); |
| - |
| - if (!delegate_) |
| - return; |
| + DCHECK(continue_buffering_data_); |
| continue_buffering_data_ = false; |
| - // TODO(akalin): This call may delete this object. Figure out what |
| - // to do in that case. |
| - int rv = delegate_->OnResponseHeadersReceived(*response_, response_time_, OK); |
| + // The delegate methods called below may delete |this|, so use |
| + // |weak_this| to detect that. |
| + base::WeakPtr<SpdyStream> weak_this = GetWeakPtr(); |
| + |
| + CHECK(delegate_); |
| + CHECK(response_headers_); |
| + int rv = delegate_->OnResponseHeadersUpdated(*response_headers_); |
| + DCHECK(rv == OK || rv == ERR_INCOMPLETE_SPDY_HEADERS); |
| if (rv == ERR_INCOMPLETE_SPDY_HEADERS) { |
| - // We don't have complete headers. Assume we're waiting for another |
| - // HEADERS frame. Since we don't have headers, we had better not have |
| - // any pending data frames. |
| - if (pending_buffers_.size() != 0U) { |
| + // Since ERR_INCOMPLETE_SPDY_HEADERS was returned, we know that |
| + // we're not closed/deleted. Since we don't have complete headers, |
| + // assume we're waiting for another HEADERS frame, and we had |
| + // better not have any pending data frames. |
| + CHECK(weak_this); |
| + if (!pending_buffers_.empty()) { |
| LogStreamError(ERR_SPDY_PROTOCOL_ERROR, |
| "HEADERS incomplete headers, but pending data frames."); |
| session_->CloseActiveStream(stream_id_, ERR_SPDY_PROTOCOL_ERROR); |
| @@ -161,34 +168,40 @@ void SpdyStream::PushedStreamReplayData() { |
| return; |
| } |
| - std::vector<SpdyBuffer*> buffers; |
| - pending_buffers_.release(&buffers); |
| - for (size_t i = 0; i < buffers.size(); ++i) { |
| - // It is always possible that a callback to the delegate results in |
| - // the delegate no longer being available. |
| - if (!delegate_) |
| - break; |
| - if (buffers[i]) { |
| - delegate_->OnDataReceived(scoped_ptr<SpdyBuffer>(buffers[i])); |
| - } else { |
| - delegate_->OnDataReceived(scoped_ptr<SpdyBuffer>()); |
| + if (!weak_this) |
| + return; |
| + |
| + while (!pending_buffers_.empty()) { |
| + // Take ownership of the first element of |pending_buffers_|. |
| + scoped_ptr<SpdyBuffer> buffer(pending_buffers_.front()); |
| + pending_buffers_.weak_erase(pending_buffers_.begin()); |
| + |
| + bool eof = (buffer == NULL); |
| + |
| + CHECK(delegate_); |
| + delegate_->OnDataReceived(buffer.Pass()); |
| + |
| + if (!weak_this) |
| + return; |
|
Ryan Hamilton
2013/06/19 18:58:36
do we need to make this check every time we invoke
akalin
2013/06/21 21:13:25
Audited the other delegate methods and added comme
|
| + |
| + if (eof) { |
| + DCHECK(pending_buffers_.empty()); |
| session_->CloseActiveStream(stream_id_, OK); |
| - // Note: |this| may be deleted after calling CloseActiveStream. |
| - DCHECK_EQ(buffers.size() - 1, i); |
| + DCHECK(!weak_this); |
| } |
| } |
| } |
| scoped_ptr<SpdyFrame> SpdyStream::ProduceSynStreamFrame() { |
| CHECK_EQ(io_state_, STATE_SEND_REQUEST_HEADERS_COMPLETE); |
| - CHECK(request_); |
| + CHECK(request_headers_); |
| CHECK_GT(stream_id_, 0u); |
| SpdyControlFlags flags = |
| (send_status_ == NO_MORE_DATA_TO_SEND) ? |
| CONTROL_FLAG_FIN : CONTROL_FLAG_NONE; |
| scoped_ptr<SpdyFrame> frame(session_->CreateSynStream( |
| - stream_id_, priority_, slot_, flags, *request_)); |
| + stream_id_, priority_, slot_, flags, *request_headers_)); |
| send_time_ = base::TimeTicks::Now(); |
| return frame.Pass(); |
| } |
| @@ -200,6 +213,10 @@ void SpdyStream::DetachDelegate() { |
| Cancel(); |
| } |
| +bool SpdyStream::ReceivedInitialResponseHeaders() const { |
| + return response_headers_ != NULL; |
| +} |
| + |
| void SpdyStream::AdjustSendWindowSize(int32 delta_window_size) { |
| DCHECK_GE(session_->flow_control_state(), SpdySession::FLOW_CONTROL_STREAM); |
| @@ -372,17 +389,12 @@ void SpdyStream::SetRequestTime(base::Time t) { |
| request_time_ = t; |
| } |
| -int SpdyStream::OnResponseHeadersReceived(const SpdyHeaderBlock& response) { |
| - int rv = OK; |
| - |
| - metrics_.StartStream(); |
| - |
| - // TODO(akalin): This should be handled as a protocol error. |
| - DCHECK(response_->empty()); |
| - *response_ = response; // TODO(ukai): avoid copy. |
| - |
| - recv_first_byte_time_ = base::TimeTicks::Now(); |
| - response_time_ = base::Time::Now(); |
|
Ryan Hamilton
2013/06/19 18:58:36
I see that you pushed this logic up from the strea
akalin
2013/06/21 21:13:25
There was a TODO somewhere to record the response
|
| +int SpdyStream::OnInitialResponseHeadersReceived( |
| + const SpdyHeaderBlock& initial_response_headers, |
| + base::Time response_time, |
| + base::TimeTicks recv_first_byte_time) { |
| + // SpdySession guarantees that this is called at most once. |
| + CHECK(!response_headers_); |
| // Check to make sure that we don't receive the response headers |
| // before we're ready for it. |
| @@ -410,77 +422,19 @@ int SpdyStream::OnResponseHeadersReceived(const SpdyHeaderBlock& response) { |
| break; |
| } |
| - DCHECK_EQ(io_state_, STATE_OPEN); |
| - |
| - // TODO(akalin): Merge the code below with the code in OnHeaders(). |
| - |
| - // Append all the headers into the response header block. |
| - for (SpdyHeaderBlock::const_iterator it = response.begin(); |
| - it != response.end(); ++it) { |
| - // Disallow uppercase headers. |
| - if (ContainsUpperAscii(it->first)) { |
| - session_->ResetStream(stream_id_, priority_, RST_STREAM_PROTOCOL_ERROR, |
| - "Upper case characters in header: " + it->first); |
| - return ERR_SPDY_PROTOCOL_ERROR; |
| - } |
| - } |
| - |
| - if ((*response_).find("transfer-encoding") != (*response_).end()) { |
| - session_->ResetStream(stream_id_, priority_, RST_STREAM_PROTOCOL_ERROR, |
| - "Received transfer-encoding header"); |
| - return ERR_SPDY_PROTOCOL_ERROR; |
| - } |
| + metrics_.StartStream(); |
|
Ryan Hamilton
2013/06/19 18:58:36
I notice that this moved to here. This means that
akalin
2013/06/21 21:13:25
I think so. That just means that erroneous streams
|
| - if (delegate_) { |
| - // May delete this object. |
| - rv = delegate_->OnResponseHeadersReceived(*response_, response_time_, rv); |
| - } |
| - // If delegate_ is not yet attached, we'll call |
| - // OnResponseHeadersReceived after the delegate gets attached to the |
| - // stream. |
| + DCHECK_EQ(io_state_, STATE_OPEN); |
| - return rv; |
| + response_headers_.reset(new SpdyHeaderBlock()); |
| + response_time_ = response_time; |
| + recv_first_byte_time_ = recv_first_byte_time; |
| + return MergeWithResponseHeaders(initial_response_headers); |
| } |
| -int SpdyStream::OnHeaders(const SpdyHeaderBlock& headers) { |
| - DCHECK(!response_->empty()); |
| - |
| - // Append all the headers into the response header block. |
| - for (SpdyHeaderBlock::const_iterator it = headers.begin(); |
| - it != headers.end(); ++it) { |
| - // Disallow duplicate headers. This is just to be conservative. |
| - if ((*response_).find(it->first) != (*response_).end()) { |
| - LogStreamError(ERR_SPDY_PROTOCOL_ERROR, "HEADERS duplicate header"); |
| - response_status_ = ERR_SPDY_PROTOCOL_ERROR; |
| - return ERR_SPDY_PROTOCOL_ERROR; |
| - } |
| - |
| - // Disallow uppercase headers. |
| - if (ContainsUpperAscii(it->first)) { |
| - session_->ResetStream(stream_id_, priority_, RST_STREAM_PROTOCOL_ERROR, |
| - "Upper case characters in header: " + it->first); |
| - return ERR_SPDY_PROTOCOL_ERROR; |
| - } |
| - |
| - (*response_)[it->first] = it->second; |
| - } |
| - |
| - if ((*response_).find("transfer-encoding") != (*response_).end()) { |
| - session_->ResetStream(stream_id_, priority_, RST_STREAM_PROTOCOL_ERROR, |
| - "Received transfer-encoding header"); |
| - return ERR_SPDY_PROTOCOL_ERROR; |
| - } |
| - |
| - int rv = OK; |
| - if (delegate_) { |
| - // May delete this object. |
| - rv = delegate_->OnResponseHeadersReceived(*response_, response_time_, rv); |
| - // ERR_INCOMPLETE_SPDY_HEADERS means that we are waiting for more |
| - // headers before the response header block is complete. |
| - if (rv == ERR_INCOMPLETE_SPDY_HEADERS) |
| - rv = OK; |
| - } |
| - return rv; |
| +int SpdyStream::OnAdditionalResponseHeadersReceived( |
| + const SpdyHeaderBlock& additional_response_headers) { |
| + return MergeWithResponseHeaders(additional_response_headers); |
| } |
| void SpdyStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { |
| @@ -488,13 +442,14 @@ void SpdyStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { |
| // If we don't have a response, then the SYN_REPLY did not come through. |
| // We cannot pass data up to the caller unless the reply headers have been |
| // received. |
| - if (!response_received()) { |
| + if (!response_headers_) { |
| LogStreamError(ERR_SYN_REPLY_NOT_RECEIVED, "Didn't receive a response."); |
| session_->CloseActiveStream(stream_id_, ERR_SYN_REPLY_NOT_RECEIVED); |
| return; |
| } |
| if (!delegate_ || continue_buffering_data_) { |
| + DCHECK_EQ(type_, SPDY_PUSH_STREAM); |
| // It should be valid for this to happen in the server push case. |
| // We'll return received data when delegate gets attached to the stream. |
| if (buffer) { |
| @@ -591,14 +546,14 @@ void SpdyStream::Close() { |
| } |
| } |
| -int SpdyStream::SendRequestHeaders(scoped_ptr<SpdyHeaderBlock> headers, |
| +int SpdyStream::SendRequestHeaders(scoped_ptr<SpdyHeaderBlock> request_headers, |
| SpdySendStatus send_status) { |
| CHECK_NE(type_, SPDY_PUSH_STREAM); |
| CHECK_EQ(send_status_, MORE_DATA_TO_SEND); |
| - CHECK(!request_); |
| + CHECK(!request_headers_); |
| CHECK(!pending_send_data_.get()); |
| CHECK_EQ(io_state_, STATE_NONE); |
| - request_ = headers.Pass(); |
| + request_headers_ = request_headers.Pass(); |
| send_status_ = send_status; |
| io_state_ = STATE_GET_DOMAIN_BOUND_CERT; |
| return DoLoop(OK); |
| @@ -646,15 +601,15 @@ base::WeakPtr<SpdyStream> SpdyStream::GetWeakPtr() { |
| bool SpdyStream::HasUrl() const { |
| if (type_ == SPDY_PUSH_STREAM) |
| - return response_received(); |
| - return request_ != NULL; |
| + return response_headers_ != NULL; |
| + return request_headers_ != NULL; |
| } |
| GURL SpdyStream::GetUrl() const { |
| DCHECK(HasUrl()); |
| const SpdyHeaderBlock& headers = |
| - (type_ == SPDY_PUSH_STREAM) ? *response_ : *request_; |
| + (type_ == SPDY_PUSH_STREAM) ? *response_headers_ : *request_headers_; |
| return GetUrlFromHeaderBlock(headers, GetProtocolVersion(), |
| type_ == SPDY_PUSH_STREAM); |
| } |
| @@ -726,7 +681,7 @@ int SpdyStream::DoLoop(int result) { |
| } |
| int SpdyStream::DoGetDomainBoundCert() { |
| - CHECK(request_); |
| + CHECK(request_headers_); |
| DCHECK_NE(type_, SPDY_PUSH_STREAM); |
| GURL url = GetUrl(); |
| if (!session_->NeedsCredentials() || !url.SchemeIs("https")) { |
| @@ -766,7 +721,7 @@ int SpdyStream::DoGetDomainBoundCertComplete(int result) { |
| } |
| int SpdyStream::DoSendDomainBoundCert() { |
| - CHECK(request_); |
| + CHECK(request_headers_); |
| DCHECK_NE(type_, SPDY_PUSH_STREAM); |
| io_state_ = STATE_SEND_DOMAIN_BOUND_CERT_COMPLETE; |
| @@ -982,4 +937,54 @@ void SpdyStream::QueueNextDataFrame() { |
| new SimpleBufferProducer(data_buffer.Pass()))); |
| } |
| +int SpdyStream::MergeWithResponseHeaders( |
| + const SpdyHeaderBlock& new_response_headers) { |
| + DCHECK(response_headers_); |
| + |
| + if (new_response_headers.find("transfer-encoding") != |
| + new_response_headers.end()) { |
| + session_->ResetStream(stream_id_, priority_, RST_STREAM_PROTOCOL_ERROR, |
| + "Received transfer-encoding header"); |
| + return ERR_SPDY_PROTOCOL_ERROR; |
| + } |
| + |
| + for (SpdyHeaderBlock::const_iterator it = new_response_headers.begin(); |
| + it != new_response_headers.end(); ++it) { |
| + // Disallow uppercase headers. |
| + if (ContainsUppercaseAscii(it->first)) { |
| + session_->ResetStream(stream_id_, priority_, RST_STREAM_PROTOCOL_ERROR, |
| + "Upper case characters in header: " + it->first); |
| + return ERR_SPDY_PROTOCOL_ERROR; |
| + } |
| + |
| + SpdyHeaderBlock::iterator it2 = response_headers_->lower_bound(it->first); |
| + // Disallow duplicate headers. This is just to be conservative. |
| + if (it2 != response_headers_->end() && it2->first == it->first) { |
| + LogStreamError(ERR_SPDY_PROTOCOL_ERROR, "HEADERS duplicate header"); |
| + response_status_ = ERR_SPDY_PROTOCOL_ERROR; |
| + return ERR_SPDY_PROTOCOL_ERROR; |
| + } |
| + |
| + response_headers_->insert(it2, *it); |
| + } |
| + |
| + // If delegate_ is not yet attached, we'll call |
| + // OnResponseHeadersUpdated() after the delegate gets attached to |
| + // the stream. |
| + if (delegate_) { |
| + // The call to OnResponseHeadersUpdated() may delete this object |
| + // if OK is returned, so store |type_| in a local variable. |
| + SpdyStreamType type = type_; |
| + int rv = delegate_->OnResponseHeadersUpdated(*response_headers_); |
| + DCHECK(rv == OK || rv == ERR_INCOMPLETE_SPDY_HEADERS); |
|
Ryan Hamilton
2013/06/19 18:58:36
boy, this sure screams "this method should return
akalin
2013/06/21 21:13:25
Done.
|
| + // Incomplete headers are OK only for push streams. |
| + if (type == SPDY_PUSH_STREAM && rv == ERR_INCOMPLETE_SPDY_HEADERS) |
| + rv = OK; |
| + if (rv != OK) |
| + return rv; |
| + } |
| + |
| + return OK; |
| +} |
| + |
| } // namespace net |