| OLD | NEW |
| (Empty) |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 #include "net/spdy/bidirectional_stream_spdy_impl.h" | |
| 6 | |
| 7 #include <utility> | |
| 8 | |
| 9 #include "base/bind.h" | |
| 10 #include "base/location.h" | |
| 11 #include "base/logging.h" | |
| 12 #include "base/threading/thread_task_runner_handle.h" | |
| 13 #include "base/time/time.h" | |
| 14 #include "base/timer/timer.h" | |
| 15 #include "net/http/bidirectional_stream_request_info.h" | |
| 16 #include "net/spdy/spdy_buffer.h" | |
| 17 #include "net/spdy/spdy_header_block.h" | |
| 18 #include "net/spdy/spdy_http_utils.h" | |
| 19 #include "net/spdy/spdy_stream.h" | |
| 20 | |
| 21 namespace net { | |
| 22 | |
| 23 namespace { | |
| 24 | |
| 25 // Time to wait in millisecond to notify |delegate_| of data received. | |
| 26 // Handing small chunks of data to the caller creates measurable overhead. | |
| 27 // So buffer data in short time-spans and send a single read notification. | |
| 28 const int kBufferTimeMs = 1; | |
| 29 | |
| 30 } // namespace | |
| 31 | |
| 32 BidirectionalStreamSpdyImpl::BidirectionalStreamSpdyImpl( | |
| 33 const base::WeakPtr<SpdySession>& spdy_session, | |
| 34 NetLogSource source_dependency) | |
| 35 : spdy_session_(spdy_session), | |
| 36 request_info_(nullptr), | |
| 37 delegate_(nullptr), | |
| 38 source_dependency_(source_dependency), | |
| 39 negotiated_protocol_(kProtoUnknown), | |
| 40 more_read_data_pending_(false), | |
| 41 read_buffer_len_(0), | |
| 42 written_end_of_stream_(false), | |
| 43 write_pending_(false), | |
| 44 stream_closed_(false), | |
| 45 closed_stream_status_(ERR_FAILED), | |
| 46 closed_stream_received_bytes_(0), | |
| 47 closed_stream_sent_bytes_(0), | |
| 48 closed_has_load_timing_info_(false), | |
| 49 weak_factory_(this) {} | |
| 50 | |
| 51 BidirectionalStreamSpdyImpl::~BidirectionalStreamSpdyImpl() { | |
| 52 // Sends a RST to the remote if the stream is destroyed before it completes. | |
| 53 ResetStream(); | |
| 54 } | |
| 55 | |
| 56 void BidirectionalStreamSpdyImpl::Start( | |
| 57 const BidirectionalStreamRequestInfo* request_info, | |
| 58 const NetLogWithSource& net_log, | |
| 59 bool /*send_request_headers_automatically*/, | |
| 60 BidirectionalStreamImpl::Delegate* delegate, | |
| 61 std::unique_ptr<base::Timer> timer) { | |
| 62 DCHECK(!stream_); | |
| 63 DCHECK(timer); | |
| 64 | |
| 65 delegate_ = delegate; | |
| 66 timer_ = std::move(timer); | |
| 67 | |
| 68 if (!spdy_session_) { | |
| 69 base::ThreadTaskRunnerHandle::Get()->PostTask( | |
| 70 FROM_HERE, | |
| 71 base::Bind(&BidirectionalStreamSpdyImpl::NotifyError, | |
| 72 weak_factory_.GetWeakPtr(), ERR_CONNECTION_CLOSED)); | |
| 73 return; | |
| 74 } | |
| 75 | |
| 76 request_info_ = request_info; | |
| 77 | |
| 78 int rv = stream_request_.StartRequest( | |
| 79 SPDY_BIDIRECTIONAL_STREAM, spdy_session_, request_info_->url, | |
| 80 request_info_->priority, net_log, | |
| 81 base::Bind(&BidirectionalStreamSpdyImpl::OnStreamInitialized, | |
| 82 weak_factory_.GetWeakPtr())); | |
| 83 if (rv != ERR_IO_PENDING) | |
| 84 OnStreamInitialized(rv); | |
| 85 } | |
| 86 | |
| 87 void BidirectionalStreamSpdyImpl::SendRequestHeaders() { | |
| 88 // Request headers will be sent automatically. | |
| 89 NOTREACHED(); | |
| 90 } | |
| 91 | |
| 92 int BidirectionalStreamSpdyImpl::ReadData(IOBuffer* buf, int buf_len) { | |
| 93 if (stream_) | |
| 94 DCHECK(!stream_->IsIdle()); | |
| 95 | |
| 96 DCHECK(buf); | |
| 97 DCHECK(buf_len); | |
| 98 DCHECK(!timer_->IsRunning()) << "There should be only one ReadData in flight"; | |
| 99 | |
| 100 // If there is data buffered, complete the IO immediately. | |
| 101 if (!read_data_queue_.IsEmpty()) { | |
| 102 return read_data_queue_.Dequeue(buf->data(), buf_len); | |
| 103 } else if (stream_closed_) { | |
| 104 return closed_stream_status_; | |
| 105 } | |
| 106 // Read will complete asynchronously and Delegate::OnReadCompleted will be | |
| 107 // called upon completion. | |
| 108 read_buffer_ = buf; | |
| 109 read_buffer_len_ = buf_len; | |
| 110 return ERR_IO_PENDING; | |
| 111 } | |
| 112 | |
| 113 void BidirectionalStreamSpdyImpl::SendData(const scoped_refptr<IOBuffer>& data, | |
| 114 int length, | |
| 115 bool end_stream) { | |
| 116 DCHECK(length > 0 || (length == 0 && end_stream)); | |
| 117 DCHECK(!write_pending_); | |
| 118 | |
| 119 if (written_end_of_stream_) { | |
| 120 LOG(ERROR) << "Writing after end of stream is written."; | |
| 121 base::ThreadTaskRunnerHandle::Get()->PostTask( | |
| 122 FROM_HERE, base::Bind(&BidirectionalStreamSpdyImpl::NotifyError, | |
| 123 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED)); | |
| 124 return; | |
| 125 } | |
| 126 | |
| 127 write_pending_ = true; | |
| 128 written_end_of_stream_ = end_stream; | |
| 129 if (MaybeHandleStreamClosedInSendData()) | |
| 130 return; | |
| 131 | |
| 132 DCHECK(!stream_closed_); | |
| 133 stream_->SendData(data.get(), length, | |
| 134 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND); | |
| 135 } | |
| 136 | |
| 137 void BidirectionalStreamSpdyImpl::SendvData( | |
| 138 const std::vector<scoped_refptr<IOBuffer>>& buffers, | |
| 139 const std::vector<int>& lengths, | |
| 140 bool end_stream) { | |
| 141 DCHECK_EQ(buffers.size(), lengths.size()); | |
| 142 DCHECK(!write_pending_); | |
| 143 | |
| 144 if (written_end_of_stream_) { | |
| 145 LOG(ERROR) << "Writing after end of stream is written."; | |
| 146 base::ThreadTaskRunnerHandle::Get()->PostTask( | |
| 147 FROM_HERE, base::Bind(&BidirectionalStreamSpdyImpl::NotifyError, | |
| 148 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED)); | |
| 149 return; | |
| 150 } | |
| 151 | |
| 152 write_pending_ = true; | |
| 153 written_end_of_stream_ = end_stream; | |
| 154 if (MaybeHandleStreamClosedInSendData()) | |
| 155 return; | |
| 156 | |
| 157 DCHECK(!stream_closed_); | |
| 158 int total_len = 0; | |
| 159 for (int len : lengths) { | |
| 160 total_len += len; | |
| 161 } | |
| 162 | |
| 163 pending_combined_buffer_ = new net::IOBuffer(total_len); | |
| 164 int len = 0; | |
| 165 // TODO(xunjieli): Get rid of extra copy. Coalesce headers and data frames. | |
| 166 for (size_t i = 0; i < buffers.size(); ++i) { | |
| 167 memcpy(pending_combined_buffer_->data() + len, buffers[i]->data(), | |
| 168 lengths[i]); | |
| 169 len += lengths[i]; | |
| 170 } | |
| 171 stream_->SendData(pending_combined_buffer_.get(), total_len, | |
| 172 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND); | |
| 173 } | |
| 174 | |
| 175 NextProto BidirectionalStreamSpdyImpl::GetProtocol() const { | |
| 176 return negotiated_protocol_; | |
| 177 } | |
| 178 | |
| 179 int64_t BidirectionalStreamSpdyImpl::GetTotalReceivedBytes() const { | |
| 180 if (stream_closed_) | |
| 181 return closed_stream_received_bytes_; | |
| 182 | |
| 183 if (!stream_) | |
| 184 return 0; | |
| 185 | |
| 186 return stream_->raw_received_bytes(); | |
| 187 } | |
| 188 | |
| 189 int64_t BidirectionalStreamSpdyImpl::GetTotalSentBytes() const { | |
| 190 if (stream_closed_) | |
| 191 return closed_stream_sent_bytes_; | |
| 192 | |
| 193 if (!stream_) | |
| 194 return 0; | |
| 195 | |
| 196 return stream_->raw_sent_bytes(); | |
| 197 } | |
| 198 | |
| 199 bool BidirectionalStreamSpdyImpl::GetLoadTimingInfo( | |
| 200 LoadTimingInfo* load_timing_info) const { | |
| 201 if (stream_closed_) { | |
| 202 if (!closed_has_load_timing_info_) | |
| 203 return false; | |
| 204 *load_timing_info = closed_load_timing_info_; | |
| 205 return true; | |
| 206 } | |
| 207 | |
| 208 // If |stream_| isn't created or has ID 0, return false. This is to match | |
| 209 // the implementation in SpdyHttpStream. | |
| 210 if (!stream_ || stream_->stream_id() == 0) | |
| 211 return false; | |
| 212 | |
| 213 return stream_->GetLoadTimingInfo(load_timing_info); | |
| 214 } | |
| 215 | |
| 216 void BidirectionalStreamSpdyImpl::OnHeadersSent() { | |
| 217 DCHECK(stream_); | |
| 218 | |
| 219 negotiated_protocol_ = kProtoHTTP2; | |
| 220 if (delegate_) | |
| 221 delegate_->OnStreamReady(/*request_headers_sent=*/true); | |
| 222 } | |
| 223 | |
| 224 void BidirectionalStreamSpdyImpl::OnHeadersReceived( | |
| 225 const SpdyHeaderBlock& response_headers) { | |
| 226 DCHECK(stream_); | |
| 227 | |
| 228 if (delegate_) | |
| 229 delegate_->OnHeadersReceived(response_headers); | |
| 230 } | |
| 231 | |
| 232 void BidirectionalStreamSpdyImpl::OnDataReceived( | |
| 233 std::unique_ptr<SpdyBuffer> buffer) { | |
| 234 DCHECK(stream_); | |
| 235 DCHECK(!stream_closed_); | |
| 236 | |
| 237 // If |buffer| is null, BidirectionalStreamSpdyImpl::OnClose will be invoked | |
| 238 // by SpdyStream to indicate the end of stream. | |
| 239 if (!buffer) | |
| 240 return; | |
| 241 | |
| 242 // When buffer is consumed, SpdyStream::OnReadBufferConsumed will adjust | |
| 243 // recv window size accordingly. | |
| 244 read_data_queue_.Enqueue(std::move(buffer)); | |
| 245 if (read_buffer_) { | |
| 246 // Handing small chunks of data to the caller creates measurable overhead. | |
| 247 // So buffer data in short time-spans and send a single read notification. | |
| 248 ScheduleBufferedRead(); | |
| 249 } | |
| 250 } | |
| 251 | |
| 252 void BidirectionalStreamSpdyImpl::OnDataSent() { | |
| 253 DCHECK(write_pending_); | |
| 254 | |
| 255 pending_combined_buffer_ = nullptr; | |
| 256 write_pending_ = false; | |
| 257 | |
| 258 if (delegate_) | |
| 259 delegate_->OnDataSent(); | |
| 260 } | |
| 261 | |
| 262 void BidirectionalStreamSpdyImpl::OnTrailers(const SpdyHeaderBlock& trailers) { | |
| 263 DCHECK(stream_); | |
| 264 DCHECK(!stream_closed_); | |
| 265 | |
| 266 if (delegate_) | |
| 267 delegate_->OnTrailersReceived(trailers); | |
| 268 } | |
| 269 | |
| 270 void BidirectionalStreamSpdyImpl::OnClose(int status) { | |
| 271 DCHECK(stream_); | |
| 272 | |
| 273 stream_closed_ = true; | |
| 274 closed_stream_status_ = status; | |
| 275 closed_stream_received_bytes_ = stream_->raw_received_bytes(); | |
| 276 closed_stream_sent_bytes_ = stream_->raw_sent_bytes(); | |
| 277 closed_has_load_timing_info_ = | |
| 278 stream_->GetLoadTimingInfo(&closed_load_timing_info_); | |
| 279 | |
| 280 if (status != OK) { | |
| 281 NotifyError(status); | |
| 282 return; | |
| 283 } | |
| 284 ResetStream(); | |
| 285 // Complete any remaining read, as all data has been buffered. | |
| 286 // If user has not called ReadData (i.e |read_buffer_| is nullptr), this will | |
| 287 // do nothing. | |
| 288 timer_->Stop(); | |
| 289 | |
| 290 // |this| might get destroyed after calling into |delegate_| in | |
| 291 // DoBufferedRead(). | |
| 292 auto weak_this = weak_factory_.GetWeakPtr(); | |
| 293 DoBufferedRead(); | |
| 294 if (weak_this.get() && write_pending_) | |
| 295 OnDataSent(); | |
| 296 } | |
| 297 | |
| 298 NetLogSource BidirectionalStreamSpdyImpl::source_dependency() const { | |
| 299 return source_dependency_; | |
| 300 } | |
| 301 | |
| 302 int BidirectionalStreamSpdyImpl::SendRequestHeadersHelper() { | |
| 303 SpdyHeaderBlock headers; | |
| 304 HttpRequestInfo http_request_info; | |
| 305 http_request_info.url = request_info_->url; | |
| 306 http_request_info.method = request_info_->method; | |
| 307 http_request_info.extra_headers = request_info_->extra_headers; | |
| 308 | |
| 309 CreateSpdyHeadersFromHttpRequest( | |
| 310 http_request_info, http_request_info.extra_headers, true, &headers); | |
| 311 written_end_of_stream_ = request_info_->end_stream_on_headers; | |
| 312 return stream_->SendRequestHeaders(std::move(headers), | |
| 313 request_info_->end_stream_on_headers | |
| 314 ? NO_MORE_DATA_TO_SEND | |
| 315 : MORE_DATA_TO_SEND); | |
| 316 } | |
| 317 | |
| 318 void BidirectionalStreamSpdyImpl::OnStreamInitialized(int rv) { | |
| 319 DCHECK_NE(ERR_IO_PENDING, rv); | |
| 320 if (rv == OK) { | |
| 321 stream_ = stream_request_.ReleaseStream(); | |
| 322 stream_->SetDelegate(this); | |
| 323 rv = SendRequestHeadersHelper(); | |
| 324 if (rv == OK) { | |
| 325 OnHeadersSent(); | |
| 326 return; | |
| 327 } else if (rv == ERR_IO_PENDING) { | |
| 328 return; | |
| 329 } | |
| 330 } | |
| 331 NotifyError(rv); | |
| 332 } | |
| 333 | |
| 334 void BidirectionalStreamSpdyImpl::NotifyError(int rv) { | |
| 335 ResetStream(); | |
| 336 write_pending_ = false; | |
| 337 if (delegate_) { | |
| 338 BidirectionalStreamImpl::Delegate* delegate = delegate_; | |
| 339 delegate_ = nullptr; | |
| 340 // Cancel any pending callback. | |
| 341 weak_factory_.InvalidateWeakPtrs(); | |
| 342 delegate->OnFailed(rv); | |
| 343 // |this| can be null when returned from delegate. | |
| 344 } | |
| 345 } | |
| 346 | |
| 347 void BidirectionalStreamSpdyImpl::ResetStream() { | |
| 348 if (!stream_) | |
| 349 return; | |
| 350 if (!stream_->IsClosed()) { | |
| 351 // This sends a RST to the remote. | |
| 352 stream_->DetachDelegate(); | |
| 353 DCHECK(!stream_); | |
| 354 } else { | |
| 355 // Stream is already closed, so it is not legal to call DetachDelegate. | |
| 356 stream_.reset(); | |
| 357 } | |
| 358 } | |
| 359 | |
| 360 void BidirectionalStreamSpdyImpl::ScheduleBufferedRead() { | |
| 361 // If there is already a scheduled DoBufferedRead, don't issue | |
| 362 // another one. Mark that we have received more data and return. | |
| 363 if (timer_->IsRunning()) { | |
| 364 more_read_data_pending_ = true; | |
| 365 return; | |
| 366 } | |
| 367 | |
| 368 more_read_data_pending_ = false; | |
| 369 timer_->Start(FROM_HERE, base::TimeDelta::FromMilliseconds(kBufferTimeMs), | |
| 370 base::Bind(&BidirectionalStreamSpdyImpl::DoBufferedRead, | |
| 371 weak_factory_.GetWeakPtr())); | |
| 372 } | |
| 373 | |
| 374 void BidirectionalStreamSpdyImpl::DoBufferedRead() { | |
| 375 DCHECK(!timer_->IsRunning()); | |
| 376 // Check to see that the stream has not errored out. | |
| 377 DCHECK(stream_ || stream_closed_); | |
| 378 DCHECK(!stream_closed_ || closed_stream_status_ == OK); | |
| 379 | |
| 380 // When |more_read_data_pending_| is true, it means that more data has arrived | |
| 381 // since started waiting. Wait a little longer and continue to buffer. | |
| 382 if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) { | |
| 383 ScheduleBufferedRead(); | |
| 384 return; | |
| 385 } | |
| 386 | |
| 387 int rv = 0; | |
| 388 if (read_buffer_) { | |
| 389 rv = ReadData(read_buffer_.get(), read_buffer_len_); | |
| 390 DCHECK_NE(ERR_IO_PENDING, rv); | |
| 391 read_buffer_ = nullptr; | |
| 392 read_buffer_len_ = 0; | |
| 393 if (delegate_) | |
| 394 delegate_->OnDataRead(rv); | |
| 395 } | |
| 396 } | |
| 397 | |
| 398 bool BidirectionalStreamSpdyImpl::ShouldWaitForMoreBufferedData() const { | |
| 399 if (stream_closed_) | |
| 400 return false; | |
| 401 DCHECK_GT(read_buffer_len_, 0); | |
| 402 return read_data_queue_.GetTotalSize() < | |
| 403 static_cast<size_t>(read_buffer_len_); | |
| 404 } | |
| 405 | |
| 406 bool BidirectionalStreamSpdyImpl::MaybeHandleStreamClosedInSendData() { | |
| 407 if (stream_) | |
| 408 return false; | |
| 409 // If |stream_| is closed without an error before client half closes, | |
| 410 // blackhole any pending write data. crbug.com/650438. | |
| 411 if (stream_closed_ && closed_stream_status_ == OK) { | |
| 412 base::ThreadTaskRunnerHandle::Get()->PostTask( | |
| 413 FROM_HERE, base::Bind(&BidirectionalStreamSpdyImpl::OnDataSent, | |
| 414 weak_factory_.GetWeakPtr())); | |
| 415 return true; | |
| 416 } | |
| 417 LOG(ERROR) << "Trying to send data after stream has been destroyed."; | |
| 418 base::ThreadTaskRunnerHandle::Get()->PostTask( | |
| 419 FROM_HERE, base::Bind(&BidirectionalStreamSpdyImpl::NotifyError, | |
| 420 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED)); | |
| 421 return true; | |
| 422 } | |
| 423 | |
| 424 } // namespace net | |
| OLD | NEW |