Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "net/spdy/bidirectional_spdy_stream.h" | |
| 6 | |
| 7 #include "base/bind.h" | |
| 8 #include "base/memory/scoped_ptr.h" | |
| 9 #include "base/time/time.h" | |
| 10 #include "net/base/request_priority.h" | |
| 11 #include "net/spdy/spdy_buffer.h" | |
| 12 #include "net/spdy/spdy_header_block.h" | |
| 13 #include "net/spdy/spdy_http_utils.h" | |
| 14 #include "net/spdy/spdy_stream.h" | |
| 15 | |
| 16 namespace net { | |
| 17 | |
| 18 const base::TimeDelta kBufferTime = base::TimeDelta::FromMilliseconds(1); | |
| 19 | |
| 20 BidirectionalSpdyStream::BidirectionalSpdyStream( | |
| 21 const base::WeakPtr<SpdySession>& spdy_session) | |
| 22 : spdy_session_(spdy_session), | |
| 23 stream_closed_(false), | |
| 24 closed_stream_status_(ERR_FAILED), | |
| 25 buffered_read_callback_pending_(false), | |
| 26 more_read_data_pending_(false), | |
| 27 weak_factory_(this) {} | |
| 28 | |
| 29 BidirectionalSpdyStream::~BidirectionalSpdyStream() { | |
| 30 if (stream_.get()) { | |
| 31 stream_->DetachDelegate(); | |
| 32 DCHECK(!stream_.get()); | |
| 33 } | |
| 34 } | |
| 35 | |
| 36 void BidirectionalSpdyStream::Start(const HttpRequestInfo* request_info, | |
| 37 RequestPriority priority, | |
| 38 const BoundNetLog& net_log, | |
| 39 BidirectionalStream::Delegate* delegate) { | |
| 40 delegate_ = delegate; | |
| 41 DCHECK(!stream_); | |
| 42 if (!spdy_session_) | |
| 43 delegate_->OnFailed(ERR_CONNECTION_CLOSED); | |
| 44 | |
| 45 request_info_ = request_info; | |
| 46 | |
| 47 int rv = stream_request_.StartRequest( | |
| 48 SPDY_REQUEST_RESPONSE_STREAM, spdy_session_, request_info_->url, priority, | |
| 49 net_log, base::Bind(&BidirectionalSpdyStream::OnStreamInitialized, | |
| 50 weak_factory_.GetWeakPtr())); | |
| 51 if (rv != ERR_IO_PENDING) | |
| 52 OnStreamInitialized(rv); | |
| 53 } | |
| 54 | |
| 55 int BidirectionalSpdyStream::ReadData(IOBuffer* buf, int buf_len) { | |
| 56 if (stream_.get()) | |
| 57 CHECK(!stream_->IsIdle()); | |
| 58 | |
| 59 CHECK(buf); | |
| 60 CHECK(buf_len); | |
| 61 if (!stream_closed_) | |
| 62 CHECK(stream_); | |
| 63 | |
| 64 // If there is data buffered, complete the IO immediately. | |
| 65 if (!data_queue_.IsEmpty()) { | |
| 66 return data_queue_.Dequeue(buf->data(), buf_len); | |
| 67 } else if (stream_closed_) { | |
| 68 return closed_stream_status_; | |
| 69 } | |
| 70 user_buffer_ = buf; | |
| 71 user_buffer_len_ = buf_len; | |
| 72 return ERR_IO_PENDING; | |
| 73 } | |
| 74 | |
| 75 void BidirectionalSpdyStream::SendData(IOBuffer* data, | |
| 76 int length, | |
| 77 bool end_stream) { | |
| 78 stream_->SendData(data, length, | |
| 79 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND); | |
| 80 } | |
| 81 | |
| 82 void BidirectionalSpdyStream::Cancel() { | |
| 83 if (!stream_) | |
| 84 return; | |
| 85 stream_->Cancel(); | |
| 86 DCHECK(!stream_); | |
| 87 } | |
| 88 | |
| 89 void BidirectionalSpdyStream::OnRequestHeadersSent() { | |
| 90 delegate_->OnRequestHeadersSent(); | |
| 91 } | |
| 92 | |
| 93 SpdyResponseHeadersStatus BidirectionalSpdyStream::OnResponseHeadersUpdated( | |
| 94 const SpdyHeaderBlock& response_headers) { | |
| 95 delegate_->OnHeaders(response_headers); | |
| 96 return RESPONSE_HEADERS_ARE_COMPLETE; | |
| 97 } | |
| 98 | |
| 99 void BidirectionalSpdyStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { | |
| 100 DCHECK(stream_); | |
| 101 data_queue_.Enqueue(buffer.Pass()); | |
| 102 if (user_buffer_) { | |
| 103 // Handing small chunks of data to the caller creates measurable overhead. | |
| 104 // So buffer data in short time-spans and send a single read notification. | |
| 105 ScheduleBufferedRead(); | |
| 106 } | |
| 107 } | |
| 108 | |
| 109 void BidirectionalSpdyStream::OnDataSent() { | |
| 110 delegate_->OnDataSent(); | |
| 111 } | |
| 112 | |
| 113 void BidirectionalSpdyStream::OnTrailers(const SpdyHeaderBlock& trailers) { | |
| 114 delegate_->OnTrailers(trailers); | |
| 115 } | |
| 116 | |
| 117 void BidirectionalSpdyStream::OnClose(int status) { | |
| 118 DCHECK(stream_); | |
| 119 | |
| 120 stream_closed_ = true; | |
| 121 closed_stream_status_ = status; | |
| 122 stream_.reset(); | |
| 123 | |
| 124 // Complete remaining buffered read. | |
| 125 if (status == OK && user_buffer_) { | |
| 126 DoBufferedRead(); | |
| 127 return; | |
| 128 } | |
| 129 | |
| 130 delegate_->OnClose(status); | |
| 131 } | |
| 132 | |
| 133 void BidirectionalSpdyStream::SendRequestHeaders() { | |
| 134 stream_->SetDelegate(this); | |
| 135 scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock); | |
| 136 CreateSpdyHeadersFromHttpRequest(*request_info_, request_info_->extra_headers, | |
| 137 stream_->GetProtocolVersion(), true, | |
| 138 headers.get()); | |
| 139 bool end_stream = (request_info_->method == "GET"); | |
| 140 stream_->SendRequestHeaders( | |
| 141 headers.Pass(), end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND); | |
| 142 } | |
| 143 | |
| 144 void BidirectionalSpdyStream::OnStreamInitialized(int rv) { | |
| 145 DCHECK_NE(ERR_IO_PENDING, rv); | |
| 146 if (rv == OK) { | |
| 147 stream_ = stream_request_.ReleaseStream(); | |
| 148 SendRequestHeaders(); | |
| 149 return; | |
| 150 } | |
| 151 delegate_->OnFailed(rv); | |
| 152 } | |
| 153 | |
| 154 void BidirectionalSpdyStream::ScheduleBufferedRead() { | |
| 155 // If there is already a scheduled DoBufferedRead, don't issue | |
| 156 // another one. Mark that we have received more data and return. | |
| 157 if (buffered_read_callback_pending_) { | |
| 158 more_read_data_pending_ = true; | |
| 159 return; | |
| 160 } | |
| 161 | |
| 162 more_read_data_pending_ = false; | |
| 163 buffered_read_callback_pending_ = true; | |
| 164 base::ThreadTaskRunnerHandle::Get()->PostDelayedTask( | |
| 165 FROM_HERE, base::Bind(&BidirectionalSpdyStream::DoBufferedRead, | |
| 166 weak_factory_.GetWeakPtr()), | |
| 167 kBufferTime); | |
| 168 } | |
| 169 | |
| 170 void BidirectionalSpdyStream::DoBufferedRead() { | |
| 171 buffered_read_callback_pending_ = false; | |
| 172 // If the stream errored out, do not complete the read. | |
| 173 if (!stream_ && !stream_closed_) | |
| 174 return; | |
| 175 if (stream_closed_ && closed_stream_status_ != OK) | |
| 176 return; | |
| 177 | |
| 178 // When |more_read_data_pending_| is true, it means that more data has arrived | |
| 179 // since started waiting. Wait a little longer and continue to buffer. | |
| 180 if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) { | |
| 181 ScheduleBufferedRead(); | |
| 182 return; | |
| 183 } | |
| 184 | |
| 185 int rv = 0; | |
| 186 if (user_buffer_.get()) { | |
| 187 rv = ReadData(user_buffer_.get(), user_buffer_len_); | |
| 188 DCHECK_NE(ERR_IO_PENDING, rv); | |
| 189 user_buffer_ = nullptr; | |
| 190 user_buffer_len_ = 0; | |
| 191 delegate_->OnReadCompleted(rv); | |
| 192 | |
| 193 // If all data is read, and BidirectionalSpdyStream::onClose is invoked | |
| 194 // previously, let the delegate know about the onClose event. | |
| 195 if (data_queue_.IsEmpty() && stream_closed_) { | |
| 196 DCHECK_EQ(OK, closed_stream_status_); | |
| 197 delegate_->OnClose(OK); | |
|
mef
2015/10/20 21:56:35
should we change this to 'noMoreData' flag?
xunjieli
2015/10/21 19:35:36
Talked on the Misha's CL. I misunderstood the comm
| |
| 198 } | |
| 199 } | |
| 200 } | |
| 201 | |
| 202 bool BidirectionalSpdyStream::ShouldWaitForMoreBufferedData() const { | |
| 203 if (stream_closed_) | |
| 204 return false; | |
| 205 DCHECK_GT(user_buffer_len_, 0); | |
| 206 return data_queue_.GetTotalSize() < static_cast<size_t>(user_buffer_len_); | |
| 207 } | |
| 208 | |
| 209 } // namespace net | |
| OLD | NEW |