Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "net/spdy/bidirectional_spdy_stream.h" | |
| 6 | |
| 7 #include "base/memory/scoped_ptr.h" | |
| 8 #include "base/time/time.h" | |
| 9 #include "net/base/request_priority.h" | |
| 10 #include "net/spdy/spdy_buffer.h" | |
| 11 #include "net/spdy/spdy_header_block.h" | |
| 12 #include "net/spdy/spdy_http_utils.h" | |
| 13 #include "net/spdy/spdy_stream.h" | |
| 14 | |
| 15 namespace net { | |
| 16 | |
| 17 const base::TimeDelta kBufferTime = base::TimeDelta::FromMilliseconds(1); | |
| 18 | |
| 19 BidirectionalSpdyStream::BidirectionalSpdyStream( | |
| 20 const base::WeakPtr<SpdySession>& spdy_session) | |
| 21 : spdy_session_(spdy_session), | |
| 22 stream_closed_(false), | |
| 23 closed_stream_status_(ERR_FAILED), | |
| 24 buffered_read_callback_pending_(false), | |
| 25 more_read_data_pending_(false), | |
| 26 weak_factory_(this) {} | |
| 27 | |
| 28 BidirectionalSpdyStream::~BidirectionalSpdyStream() { | |
| 29 if (stream_.get()) { | |
| 30 stream_->DetachDelegate(); | |
| 31 DCHECK(!stream_.get()); | |
| 32 } | |
| 33 } | |
| 34 | |
| 35 void BidirectionalSpdyStream::Start(const HttpRequestInfo* request_info, | |
| 36 RequestPriority priority, | |
| 37 const BoundNetLog& net_log, | |
| 38 BidirectionalStream::Delegate* delegate) { | |
| 39 delegate_ = delegate; | |
| 40 DCHECK(!stream_); | |
| 41 if (!spdy_session_) | |
| 42 delegate_->OnFailed(ERR_CONNECTION_CLOSED); | |
| 43 | |
| 44 request_info_ = request_info; | |
| 45 | |
| 46 int rv = stream_request_.StartRequest( | |
| 47 SPDY_REQUEST_RESPONSE_STREAM, spdy_session_, request_info_->url, priority, | |
| 48 net_log, base::Bind(&BidirectionalSpdyStream::OnStreamInitialized, | |
| 49 weak_factory_.GetWeakPtr())); | |
| 50 if (rv != ERR_IO_PENDING) | |
| 51 OnStreamInitialized(rv); | |
| 52 } | |
| 53 | |
| 54 int BidirectionalSpdyStream::ReadData(IOBuffer* buf, int buf_len) { | |
| 55 if (stream_.get()) | |
| 56 CHECK(!stream_->IsIdle()); | |
| 57 | |
| 58 CHECK(buf); | |
| 59 CHECK(buf_len); | |
| 60 if (!stream_closed_) | |
| 61 CHECK(stream_); | |
| 62 | |
| 63 // If there is data buffered, complete the IO immediately. | |
| 64 if (!data_queue_.IsEmpty()) { | |
| 65 return data_queue_.Dequeue(buf->data(), buf_len); | |
| 66 } else if (stream_closed_) { | |
| 67 return closed_stream_status_; | |
| 68 } | |
| 69 user_buffer_ = buf; | |
| 70 user_buffer_len_ = buf_len; | |
| 71 return ERR_IO_PENDING; | |
| 72 } | |
| 73 | |
| 74 void BidirectionalSpdyStream::SendData(IOBuffer* data, | |
| 75 int length, | |
| 76 bool end_stream) { | |
| 77 stream_->SendData(data, length, | |
| 78 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND); | |
| 79 } | |
| 80 | |
| 81 void BidirectionalSpdyStream::OnRequestHeadersSent() { | |
| 82 delegate_->OnRequestHeadersSent(); | |
| 83 } | |
| 84 | |
| 85 SpdyResponseHeadersStatus BidirectionalSpdyStream::OnResponseHeadersUpdated( | |
| 86 const SpdyHeaderBlock& response_headers) { | |
| 87 delegate_->OnHeaders(response_headers); | |
| 88 return RESPONSE_HEADERS_ARE_COMPLETE; | |
| 89 } | |
| 90 | |
| 91 void BidirectionalSpdyStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { | |
| 92 DCHECK(stream_); | |
| 93 data_queue_.Enqueue(buffer.Pass()); | |
| 94 if (user_buffer_.get()) { | |
| 95 // Handing small chunks of data to the caller creates measurable overhead. | |
| 96 // So buffer data in short time-spans and send a single read notification. | |
| 97 ScheduleBufferedReadCallback(); | |
| 98 } | |
| 99 } | |
| 100 | |
| 101 void BidirectionalSpdyStream::OnDataSent() { | |
| 102 delegate_->OnDataSent(); | |
| 103 } | |
| 104 | |
| 105 void BidirectionalSpdyStream::OnTrailers(const SpdyHeaderBlock& trailers) { | |
| 106 delegate_->OnTrailers(trailers); | |
| 107 } | |
| 108 | |
| 109 void BidirectionalSpdyStream::OnClose(int status) { | |
| 110 if (stream_.get()) { | |
| 111 stream_closed_ = true; | |
| 112 closed_stream_status_ = status; | |
| 113 } | |
| 114 | |
| 115 stream_.reset(); | |
| 116 // Complete remaining buffered read. | |
|
mef
2015/10/07 23:44:56
what if there is no pending read?
xunjieli
2015/10/19 21:07:46
Done. Good catch! I added null check to handle thi
| |
| 117 if (status == OK) { | |
| 118 DoBufferedReadCallback(); | |
| 119 return; | |
| 120 } | |
| 121 | |
| 122 delegate_->OnClose(status); | |
| 123 } | |
| 124 | |
| 125 void BidirectionalSpdyStream::SendRequestHeaders() { | |
| 126 stream_->SetDelegate(this); | |
| 127 scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock); | |
| 128 CreateSpdyHeadersFromHttpRequest(*request_info_, request_info_->extra_headers, | |
| 129 stream_->GetProtocolVersion(), true, | |
| 130 headers.get()); | |
| 131 bool end_stream = (request_info_->method == "GET"); | |
| 132 stream_->SendRequestHeaders( | |
| 133 headers.Pass(), end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND); | |
| 134 } | |
| 135 | |
| 136 void BidirectionalSpdyStream::OnStreamInitialized(int rv) { | |
| 137 DCHECK_NE(ERR_IO_PENDING, rv); | |
| 138 if (rv == OK) { | |
| 139 stream_ = stream_request_.ReleaseStream(); | |
| 140 SendRequestHeaders(); | |
| 141 return; | |
| 142 } | |
| 143 delegate_->OnFailed(rv); | |
| 144 } | |
| 145 | |
| 146 void BidirectionalSpdyStream::ScheduleBufferedReadCallback() { | |
| 147 // If there is already a scheduled DoBufferedReadCallback, don't issue | |
| 148 // another one. Mark that we have received more data and return. | |
| 149 if (buffered_read_callback_pending_) { | |
| 150 more_read_data_pending_ = true; | |
| 151 return; | |
| 152 } | |
| 153 | |
| 154 more_read_data_pending_ = false; | |
| 155 buffered_read_callback_pending_ = true; | |
| 156 base::ThreadTaskRunnerHandle::Get()->PostDelayedTask( | |
| 157 FROM_HERE, base::Bind(&BidirectionalSpdyStream::DoBufferedReadCallback, | |
| 158 weak_factory_.GetWeakPtr()), | |
| 159 kBufferTime); | |
| 160 } | |
| 161 | |
| 162 void BidirectionalSpdyStream::DoBufferedReadCallback() { | |
| 163 buffered_read_callback_pending_ = false; | |
| 164 // If the stream errored out, do not complete the read. | |
| 165 if (!stream_ && !stream_closed_) | |
| 166 return; | |
| 167 if (stream_closed_ && closed_stream_status_ != OK) | |
| 168 return; | |
| 169 | |
| 170 // When |more_read_data_pending_| is true, it means that more data has arrived | |
| 171 // since started waiting. Wait a little longer and continue to buffer. | |
| 172 if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) { | |
| 173 ScheduleBufferedReadCallback(); | |
| 174 return; | |
| 175 } | |
| 176 | |
| 177 if (!user_buffer_.get()) | |
| 178 return; | |
| 179 | |
| 180 int rv = ReadData(user_buffer_.get(), user_buffer_len_); | |
| 181 DCHECK_NE(ERR_IO_PENDING, rv); | |
| 182 delegate_->OnReadCompleted(rv); | |
| 183 if (data_queue_.IsEmpty() && stream_closed_) | |
| 184 delegate_->OnClose(closed_stream_status_); | |
| 185 } | |
| 186 | |
| 187 bool BidirectionalSpdyStream::ShouldWaitForMoreBufferedData() const { | |
| 188 if (stream_closed_) | |
| 189 return false; | |
| 190 DCHECK_GT(user_buffer_len_, 0); | |
| 191 return data_queue_.GetTotalSize() < static_cast<size_t>(user_buffer_len_); | |
| 192 } | |
| 193 | |
| 194 } // namespace net | |
| OLD | NEW |