| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "net/spdy/bidirectional_stream_spdy_impl.h" | 5 #include "net/spdy/bidirectional_stream_spdy_impl.h" |
| 6 | 6 |
| 7 #include "base/bind.h" | 7 #include "base/bind.h" |
| 8 #include "base/location.h" | 8 #include "base/location.h" |
| 9 #include "base/logging.h" | 9 #include "base/logging.h" |
| 10 #include "base/time/time.h" | 10 #include "base/time/time.h" |
| (...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 53 bool /*send_request_headers_automatically*/, | 53 bool /*send_request_headers_automatically*/, |
| 54 BidirectionalStreamImpl::Delegate* delegate, | 54 BidirectionalStreamImpl::Delegate* delegate, |
| 55 std::unique_ptr<base::Timer> timer) { | 55 std::unique_ptr<base::Timer> timer) { |
| 56 DCHECK(!stream_); | 56 DCHECK(!stream_); |
| 57 DCHECK(timer); | 57 DCHECK(timer); |
| 58 | 58 |
| 59 delegate_ = delegate; | 59 delegate_ = delegate; |
| 60 timer_ = std::move(timer); | 60 timer_ = std::move(timer); |
| 61 | 61 |
| 62 if (!spdy_session_) { | 62 if (!spdy_session_) { |
| 63 delegate_->OnFailed(ERR_CONNECTION_CLOSED); | 63 base::ThreadTaskRunnerHandle::Get()->PostTask( |
| 64 FROM_HERE, |
| 65 base::Bind(&BidirectionalStreamSpdyImpl::NotifyError, |
| 66 weak_factory_.GetWeakPtr(), ERR_CONNECTION_CLOSED)); |
| 64 return; | 67 return; |
| 65 } | 68 } |
| 66 | 69 |
| 67 request_info_ = request_info; | 70 request_info_ = request_info; |
| 68 | 71 |
| 69 int rv = stream_request_.StartRequest( | 72 int rv = stream_request_.StartRequest( |
| 70 SPDY_BIDIRECTIONAL_STREAM, spdy_session_, request_info_->url, | 73 SPDY_BIDIRECTIONAL_STREAM, spdy_session_, request_info_->url, |
| 71 request_info_->priority, net_log, | 74 request_info_->priority, net_log, |
| 72 base::Bind(&BidirectionalStreamSpdyImpl::OnStreamInitialized, | 75 base::Bind(&BidirectionalStreamSpdyImpl::OnStreamInitialized, |
| 73 weak_factory_.GetWeakPtr())); | 76 weak_factory_.GetWeakPtr())); |
| (...skipping 23 matching lines...) Expand all Loading... |
| 97 // Read will complete asynchronously and Delegate::OnReadCompleted will be | 100 // Read will complete asynchronously and Delegate::OnReadCompleted will be |
| 98 // called upon completion. | 101 // called upon completion. |
| 99 read_buffer_ = buf; | 102 read_buffer_ = buf; |
| 100 read_buffer_len_ = buf_len; | 103 read_buffer_len_ = buf_len; |
| 101 return ERR_IO_PENDING; | 104 return ERR_IO_PENDING; |
| 102 } | 105 } |
| 103 | 106 |
| 104 void BidirectionalStreamSpdyImpl::SendData(const scoped_refptr<IOBuffer>& data, | 107 void BidirectionalStreamSpdyImpl::SendData(const scoped_refptr<IOBuffer>& data, |
| 105 int length, | 108 int length, |
| 106 bool end_stream) { | 109 bool end_stream) { |
| 107 DCHECK(!stream_closed_); | 110 DCHECK(length > 0 || (length == 0 && end_stream)); |
| 108 DCHECK(stream_); | 111 |
| 112 if (!stream_) { |
| 113 LOG(ERROR) << "Trying to send data after stream has been destroyed."; |
| 114 base::ThreadTaskRunnerHandle::Get()->PostTask( |
| 115 FROM_HERE, base::Bind(&BidirectionalStreamSpdyImpl::NotifyError, |
| 116 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED)); |
| 117 return; |
| 118 } |
| 109 | 119 |
| 110 stream_->SendData(data.get(), length, | 120 stream_->SendData(data.get(), length, |
| 111 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND); | 121 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND); |
| 112 } | 122 } |
| 113 | 123 |
| 114 void BidirectionalStreamSpdyImpl::SendvData( | 124 void BidirectionalStreamSpdyImpl::SendvData( |
| 115 const std::vector<scoped_refptr<IOBuffer>>& buffers, | 125 const std::vector<scoped_refptr<IOBuffer>>& buffers, |
| 116 const std::vector<int>& lengths, | 126 const std::vector<int>& lengths, |
| 117 bool end_stream) { | 127 bool end_stream) { |
| 118 DCHECK(!stream_closed_); | |
| 119 DCHECK(stream_); | |
| 120 DCHECK_EQ(buffers.size(), lengths.size()); | 128 DCHECK_EQ(buffers.size(), lengths.size()); |
| 121 | 129 |
| 130 if (!stream_) { |
| 131 LOG(ERROR) << "Trying to send data after stream has been destroyed."; |
| 132 base::ThreadTaskRunnerHandle::Get()->PostTask( |
| 133 FROM_HERE, base::Bind(&BidirectionalStreamSpdyImpl::NotifyError, |
| 134 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED)); |
| 135 return; |
| 136 } |
| 137 |
| 122 int total_len = 0; | 138 int total_len = 0; |
| 123 for (int len : lengths) { | 139 for (int len : lengths) { |
| 124 total_len += len; | 140 total_len += len; |
| 125 } | 141 } |
| 126 | 142 |
| 127 pending_combined_buffer_ = new net::IOBuffer(total_len); | 143 pending_combined_buffer_ = new net::IOBuffer(total_len); |
| 128 int len = 0; | 144 int len = 0; |
| 129 // TODO(xunjieli): Get rid of extra copy. Coalesce headers and data frames. | 145 // TODO(xunjieli): Get rid of extra copy. Coalesce headers and data frames. |
| 130 for (size_t i = 0; i < buffers.size(); ++i) { | 146 for (size_t i = 0; i < buffers.size(); ++i) { |
| 131 memcpy(pending_combined_buffer_->data() + len, buffers[i]->data(), | 147 memcpy(pending_combined_buffer_->data() + len, buffers[i]->data(), |
| 132 lengths[i]); | 148 lengths[i]); |
| 133 len += lengths[i]; | 149 len += lengths[i]; |
| 134 } | 150 } |
| 135 stream_->SendData(pending_combined_buffer_.get(), total_len, | 151 stream_->SendData(pending_combined_buffer_.get(), total_len, |
| 136 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND); | 152 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND); |
| 137 } | 153 } |
| 138 | 154 |
| 139 void BidirectionalStreamSpdyImpl::Cancel() { | 155 void BidirectionalStreamSpdyImpl::Cancel() { |
| 140 if (!stream_) | 156 if (!stream_) |
| 141 return; | 157 return; |
| 142 // Cancels the stream and detaches the delegate so it doesn't get called back. | 158 // Cancels the stream and detaches the delegate so it doesn't get called back. |
| 143 stream_->DetachDelegate(); | 159 stream_->DetachDelegate(); |
| 144 DCHECK(!stream_); | 160 DCHECK(!stream_); |
| 161 delegate_ = nullptr; |
| 162 // Cancel any pending callback. |
| 163 weak_factory_.InvalidateWeakPtrs(); |
| 145 } | 164 } |
| 146 | 165 |
| 147 NextProto BidirectionalStreamSpdyImpl::GetProtocol() const { | 166 NextProto BidirectionalStreamSpdyImpl::GetProtocol() const { |
| 148 return negotiated_protocol_; | 167 return negotiated_protocol_; |
| 149 } | 168 } |
| 150 | 169 |
| 151 int64_t BidirectionalStreamSpdyImpl::GetTotalReceivedBytes() const { | 170 int64_t BidirectionalStreamSpdyImpl::GetTotalReceivedBytes() const { |
| 152 if (stream_closed_) | 171 if (stream_closed_) |
| 153 return closed_stream_received_bytes_; | 172 return closed_stream_received_bytes_; |
| 154 | 173 |
| (...skipping 10 matching lines...) Expand all Loading... |
| 165 if (!stream_) | 184 if (!stream_) |
| 166 return 0; | 185 return 0; |
| 167 | 186 |
| 168 return stream_->raw_sent_bytes(); | 187 return stream_->raw_sent_bytes(); |
| 169 } | 188 } |
| 170 | 189 |
| 171 void BidirectionalStreamSpdyImpl::OnRequestHeadersSent() { | 190 void BidirectionalStreamSpdyImpl::OnRequestHeadersSent() { |
| 172 DCHECK(stream_); | 191 DCHECK(stream_); |
| 173 | 192 |
| 174 negotiated_protocol_ = stream_->GetProtocol(); | 193 negotiated_protocol_ = stream_->GetProtocol(); |
| 175 delegate_->OnStreamReady(/*request_headers_sent=*/true); | 194 if (delegate_) |
| 195 delegate_->OnStreamReady(/*request_headers_sent=*/true); |
| 176 } | 196 } |
| 177 | 197 |
| 178 SpdyResponseHeadersStatus BidirectionalStreamSpdyImpl::OnResponseHeadersUpdated( | 198 SpdyResponseHeadersStatus BidirectionalStreamSpdyImpl::OnResponseHeadersUpdated( |
| 179 const SpdyHeaderBlock& response_headers) { | 199 const SpdyHeaderBlock& response_headers) { |
| 180 DCHECK(stream_); | 200 DCHECK(stream_); |
| 181 | 201 |
| 182 delegate_->OnHeadersReceived(response_headers); | 202 if (delegate_) |
| 203 delegate_->OnHeadersReceived(response_headers); |
| 204 |
| 183 return RESPONSE_HEADERS_ARE_COMPLETE; | 205 return RESPONSE_HEADERS_ARE_COMPLETE; |
| 184 } | 206 } |
| 185 | 207 |
| 186 void BidirectionalStreamSpdyImpl::OnDataReceived( | 208 void BidirectionalStreamSpdyImpl::OnDataReceived( |
| 187 std::unique_ptr<SpdyBuffer> buffer) { | 209 std::unique_ptr<SpdyBuffer> buffer) { |
| 188 DCHECK(stream_); | 210 DCHECK(stream_); |
| 189 DCHECK(!stream_closed_); | 211 DCHECK(!stream_closed_); |
| 190 | 212 |
| 191 // If |buffer| is null, BidirectionalStreamSpdyImpl::OnClose will be invoked | 213 // If |buffer| is null, BidirectionalStreamSpdyImpl::OnClose will be invoked |
| 192 // by SpdyStream to indicate the end of stream. | 214 // by SpdyStream to indicate the end of stream. |
| 193 if (!buffer) | 215 if (!buffer) |
| 194 return; | 216 return; |
| 195 | 217 |
| 196 // When buffer is consumed, SpdyStream::OnReadBufferConsumed will adjust | 218 // When buffer is consumed, SpdyStream::OnReadBufferConsumed will adjust |
| 197 // recv window size accordingly. | 219 // recv window size accordingly. |
| 198 read_data_queue_.Enqueue(std::move(buffer)); | 220 read_data_queue_.Enqueue(std::move(buffer)); |
| 199 if (read_buffer_) { | 221 if (read_buffer_) { |
| 200 // Handing small chunks of data to the caller creates measurable overhead. | 222 // Handing small chunks of data to the caller creates measurable overhead. |
| 201 // So buffer data in short time-spans and send a single read notification. | 223 // So buffer data in short time-spans and send a single read notification. |
| 202 ScheduleBufferedRead(); | 224 ScheduleBufferedRead(); |
| 203 } | 225 } |
| 204 } | 226 } |
| 205 | 227 |
| 206 void BidirectionalStreamSpdyImpl::OnDataSent() { | 228 void BidirectionalStreamSpdyImpl::OnDataSent() { |
| 207 DCHECK(stream_); | 229 DCHECK(stream_); |
| 208 DCHECK(!stream_closed_); | 230 DCHECK(!stream_closed_); |
| 209 | 231 |
| 210 pending_combined_buffer_ = nullptr; | 232 pending_combined_buffer_ = nullptr; |
| 211 delegate_->OnDataSent(); | 233 if (delegate_) |
| 234 delegate_->OnDataSent(); |
| 212 } | 235 } |
| 213 | 236 |
| 214 void BidirectionalStreamSpdyImpl::OnTrailers(const SpdyHeaderBlock& trailers) { | 237 void BidirectionalStreamSpdyImpl::OnTrailers(const SpdyHeaderBlock& trailers) { |
| 215 DCHECK(stream_); | 238 DCHECK(stream_); |
| 216 DCHECK(!stream_closed_); | 239 DCHECK(!stream_closed_); |
| 217 | 240 |
| 218 delegate_->OnTrailersReceived(trailers); | 241 if (delegate_) |
| 242 delegate_->OnTrailersReceived(trailers); |
| 219 } | 243 } |
| 220 | 244 |
| 221 void BidirectionalStreamSpdyImpl::OnClose(int status) { | 245 void BidirectionalStreamSpdyImpl::OnClose(int status) { |
| 222 DCHECK(stream_); | 246 DCHECK(stream_); |
| 223 | 247 |
| 224 stream_closed_ = true; | 248 stream_closed_ = true; |
| 225 closed_stream_status_ = status; | 249 closed_stream_status_ = status; |
| 226 closed_stream_received_bytes_ = stream_->raw_received_bytes(); | 250 closed_stream_received_bytes_ = stream_->raw_received_bytes(); |
| 227 closed_stream_sent_bytes_ = stream_->raw_sent_bytes(); | 251 closed_stream_sent_bytes_ = stream_->raw_sent_bytes(); |
| 228 stream_.reset(); | |
| 229 | 252 |
| 230 if (status != OK) { | 253 if (status != OK) { |
| 231 delegate_->OnFailed(status); | 254 NotifyError(status); |
| 232 return; | 255 return; |
| 233 } | 256 } |
| 257 stream_.reset(); |
| 234 // Complete any remaining read, as all data has been buffered. | 258 // Complete any remaining read, as all data has been buffered. |
| 235 // If user has not called ReadData (i.e |read_buffer_| is nullptr), this will | 259 // If user has not called ReadData (i.e |read_buffer_| is nullptr), this will |
| 236 // do nothing. | 260 // do nothing. |
| 237 timer_->Stop(); | 261 timer_->Stop(); |
| 238 DoBufferedRead(); | 262 DoBufferedRead(); |
| 239 } | 263 } |
| 240 | 264 |
| 241 int BidirectionalStreamSpdyImpl::SendRequestHeadersHelper() { | 265 int BidirectionalStreamSpdyImpl::SendRequestHeadersHelper() { |
| 242 std::unique_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock); | 266 std::unique_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock); |
| 243 HttpRequestInfo http_request_info; | 267 HttpRequestInfo http_request_info; |
| (...skipping 16 matching lines...) Expand all Loading... |
| 260 stream_ = stream_request_.ReleaseStream(); | 284 stream_ = stream_request_.ReleaseStream(); |
| 261 stream_->SetDelegate(this); | 285 stream_->SetDelegate(this); |
| 262 rv = SendRequestHeadersHelper(); | 286 rv = SendRequestHeadersHelper(); |
| 263 if (rv == OK) { | 287 if (rv == OK) { |
| 264 OnRequestHeadersSent(); | 288 OnRequestHeadersSent(); |
| 265 return; | 289 return; |
| 266 } else if (rv == ERR_IO_PENDING) { | 290 } else if (rv == ERR_IO_PENDING) { |
| 267 return; | 291 return; |
| 268 } | 292 } |
| 269 } | 293 } |
| 270 delegate_->OnFailed(rv); | 294 NotifyError(rv); |
| 295 } |
| 296 |
| 297 void BidirectionalStreamSpdyImpl::NotifyError(int rv) { |
| 298 stream_.reset(); |
| 299 if (!delegate_) |
| 300 return; |
| 301 BidirectionalStreamImpl::Delegate* delegate = delegate_; |
| 302 delegate_ = nullptr; |
| 303 // Cancel any pending callback. |
| 304 weak_factory_.InvalidateWeakPtrs(); |
| 305 delegate->OnFailed(rv); |
| 271 } | 306 } |
| 272 | 307 |
| 273 void BidirectionalStreamSpdyImpl::ScheduleBufferedRead() { | 308 void BidirectionalStreamSpdyImpl::ScheduleBufferedRead() { |
| 274 // If there is already a scheduled DoBufferedRead, don't issue | 309 // If there is already a scheduled DoBufferedRead, don't issue |
| 275 // another one. Mark that we have received more data and return. | 310 // another one. Mark that we have received more data and return. |
| 276 if (timer_->IsRunning()) { | 311 if (timer_->IsRunning()) { |
| 277 more_read_data_pending_ = true; | 312 more_read_data_pending_ = true; |
| 278 return; | 313 return; |
| 279 } | 314 } |
| 280 | 315 |
| (...skipping 15 matching lines...) Expand all Loading... |
| 296 ScheduleBufferedRead(); | 331 ScheduleBufferedRead(); |
| 297 return; | 332 return; |
| 298 } | 333 } |
| 299 | 334 |
| 300 int rv = 0; | 335 int rv = 0; |
| 301 if (read_buffer_) { | 336 if (read_buffer_) { |
| 302 rv = ReadData(read_buffer_.get(), read_buffer_len_); | 337 rv = ReadData(read_buffer_.get(), read_buffer_len_); |
| 303 DCHECK_NE(ERR_IO_PENDING, rv); | 338 DCHECK_NE(ERR_IO_PENDING, rv); |
| 304 read_buffer_ = nullptr; | 339 read_buffer_ = nullptr; |
| 305 read_buffer_len_ = 0; | 340 read_buffer_len_ = 0; |
| 306 delegate_->OnDataRead(rv); | 341 if (delegate_) |
| 342 delegate_->OnDataRead(rv); |
| 307 } | 343 } |
| 308 } | 344 } |
| 309 | 345 |
| 310 bool BidirectionalStreamSpdyImpl::ShouldWaitForMoreBufferedData() const { | 346 bool BidirectionalStreamSpdyImpl::ShouldWaitForMoreBufferedData() const { |
| 311 if (stream_closed_) | 347 if (stream_closed_) |
| 312 return false; | 348 return false; |
| 313 DCHECK_GT(read_buffer_len_, 0); | 349 DCHECK_GT(read_buffer_len_, 0); |
| 314 return read_data_queue_.GetTotalSize() < | 350 return read_data_queue_.GetTotalSize() < |
| 315 static_cast<size_t>(read_buffer_len_); | 351 static_cast<size_t>(read_buffer_len_); |
| 316 } | 352 } |
| 317 | 353 |
| 318 } // namespace net | 354 } // namespace net |
| OLD | NEW |