OLD | NEW |
| (Empty) |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "net/spdy/bidirectional_stream_spdy_impl.h" | |
6 | |
7 #include <utility> | |
8 | |
9 #include "base/bind.h" | |
10 #include "base/location.h" | |
11 #include "base/logging.h" | |
12 #include "base/threading/thread_task_runner_handle.h" | |
13 #include "base/time/time.h" | |
14 #include "base/timer/timer.h" | |
15 #include "net/http/bidirectional_stream_request_info.h" | |
16 #include "net/spdy/spdy_buffer.h" | |
17 #include "net/spdy/spdy_header_block.h" | |
18 #include "net/spdy/spdy_http_utils.h" | |
19 #include "net/spdy/spdy_stream.h" | |
20 | |
21 namespace net { | |
22 | |
23 namespace { | |
24 | |
25 // Time to wait in millisecond to notify |delegate_| of data received. | |
26 // Handing small chunks of data to the caller creates measurable overhead. | |
27 // So buffer data in short time-spans and send a single read notification. | |
28 const int kBufferTimeMs = 1; | |
29 | |
30 } // namespace | |
31 | |
32 BidirectionalStreamSpdyImpl::BidirectionalStreamSpdyImpl( | |
33 const base::WeakPtr<SpdySession>& spdy_session, | |
34 NetLogSource source_dependency) | |
35 : spdy_session_(spdy_session), | |
36 request_info_(nullptr), | |
37 delegate_(nullptr), | |
38 source_dependency_(source_dependency), | |
39 negotiated_protocol_(kProtoUnknown), | |
40 more_read_data_pending_(false), | |
41 read_buffer_len_(0), | |
42 written_end_of_stream_(false), | |
43 write_pending_(false), | |
44 stream_closed_(false), | |
45 closed_stream_status_(ERR_FAILED), | |
46 closed_stream_received_bytes_(0), | |
47 closed_stream_sent_bytes_(0), | |
48 closed_has_load_timing_info_(false), | |
49 weak_factory_(this) {} | |
50 | |
51 BidirectionalStreamSpdyImpl::~BidirectionalStreamSpdyImpl() { | |
52 // Sends a RST to the remote if the stream is destroyed before it completes. | |
53 ResetStream(); | |
54 } | |
55 | |
56 void BidirectionalStreamSpdyImpl::Start( | |
57 const BidirectionalStreamRequestInfo* request_info, | |
58 const NetLogWithSource& net_log, | |
59 bool /*send_request_headers_automatically*/, | |
60 BidirectionalStreamImpl::Delegate* delegate, | |
61 std::unique_ptr<base::Timer> timer) { | |
62 DCHECK(!stream_); | |
63 DCHECK(timer); | |
64 | |
65 delegate_ = delegate; | |
66 timer_ = std::move(timer); | |
67 | |
68 if (!spdy_session_) { | |
69 base::ThreadTaskRunnerHandle::Get()->PostTask( | |
70 FROM_HERE, | |
71 base::Bind(&BidirectionalStreamSpdyImpl::NotifyError, | |
72 weak_factory_.GetWeakPtr(), ERR_CONNECTION_CLOSED)); | |
73 return; | |
74 } | |
75 | |
76 request_info_ = request_info; | |
77 | |
78 int rv = stream_request_.StartRequest( | |
79 SPDY_BIDIRECTIONAL_STREAM, spdy_session_, request_info_->url, | |
80 request_info_->priority, net_log, | |
81 base::Bind(&BidirectionalStreamSpdyImpl::OnStreamInitialized, | |
82 weak_factory_.GetWeakPtr())); | |
83 if (rv != ERR_IO_PENDING) | |
84 OnStreamInitialized(rv); | |
85 } | |
86 | |
87 void BidirectionalStreamSpdyImpl::SendRequestHeaders() { | |
88 // Request headers will be sent automatically. | |
89 NOTREACHED(); | |
90 } | |
91 | |
92 int BidirectionalStreamSpdyImpl::ReadData(IOBuffer* buf, int buf_len) { | |
93 if (stream_) | |
94 DCHECK(!stream_->IsIdle()); | |
95 | |
96 DCHECK(buf); | |
97 DCHECK(buf_len); | |
98 DCHECK(!timer_->IsRunning()) << "There should be only one ReadData in flight"; | |
99 | |
100 // If there is data buffered, complete the IO immediately. | |
101 if (!read_data_queue_.IsEmpty()) { | |
102 return read_data_queue_.Dequeue(buf->data(), buf_len); | |
103 } else if (stream_closed_) { | |
104 return closed_stream_status_; | |
105 } | |
106 // Read will complete asynchronously and Delegate::OnReadCompleted will be | |
107 // called upon completion. | |
108 read_buffer_ = buf; | |
109 read_buffer_len_ = buf_len; | |
110 return ERR_IO_PENDING; | |
111 } | |
112 | |
113 void BidirectionalStreamSpdyImpl::SendData(const scoped_refptr<IOBuffer>& data, | |
114 int length, | |
115 bool end_stream) { | |
116 DCHECK(length > 0 || (length == 0 && end_stream)); | |
117 DCHECK(!write_pending_); | |
118 | |
119 if (written_end_of_stream_) { | |
120 LOG(ERROR) << "Writing after end of stream is written."; | |
121 base::ThreadTaskRunnerHandle::Get()->PostTask( | |
122 FROM_HERE, base::Bind(&BidirectionalStreamSpdyImpl::NotifyError, | |
123 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED)); | |
124 return; | |
125 } | |
126 | |
127 write_pending_ = true; | |
128 written_end_of_stream_ = end_stream; | |
129 if (MaybeHandleStreamClosedInSendData()) | |
130 return; | |
131 | |
132 DCHECK(!stream_closed_); | |
133 stream_->SendData(data.get(), length, | |
134 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND); | |
135 } | |
136 | |
137 void BidirectionalStreamSpdyImpl::SendvData( | |
138 const std::vector<scoped_refptr<IOBuffer>>& buffers, | |
139 const std::vector<int>& lengths, | |
140 bool end_stream) { | |
141 DCHECK_EQ(buffers.size(), lengths.size()); | |
142 DCHECK(!write_pending_); | |
143 | |
144 if (written_end_of_stream_) { | |
145 LOG(ERROR) << "Writing after end of stream is written."; | |
146 base::ThreadTaskRunnerHandle::Get()->PostTask( | |
147 FROM_HERE, base::Bind(&BidirectionalStreamSpdyImpl::NotifyError, | |
148 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED)); | |
149 return; | |
150 } | |
151 | |
152 write_pending_ = true; | |
153 written_end_of_stream_ = end_stream; | |
154 if (MaybeHandleStreamClosedInSendData()) | |
155 return; | |
156 | |
157 DCHECK(!stream_closed_); | |
158 int total_len = 0; | |
159 for (int len : lengths) { | |
160 total_len += len; | |
161 } | |
162 | |
163 pending_combined_buffer_ = new net::IOBuffer(total_len); | |
164 int len = 0; | |
165 // TODO(xunjieli): Get rid of extra copy. Coalesce headers and data frames. | |
166 for (size_t i = 0; i < buffers.size(); ++i) { | |
167 memcpy(pending_combined_buffer_->data() + len, buffers[i]->data(), | |
168 lengths[i]); | |
169 len += lengths[i]; | |
170 } | |
171 stream_->SendData(pending_combined_buffer_.get(), total_len, | |
172 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND); | |
173 } | |
174 | |
175 NextProto BidirectionalStreamSpdyImpl::GetProtocol() const { | |
176 return negotiated_protocol_; | |
177 } | |
178 | |
179 int64_t BidirectionalStreamSpdyImpl::GetTotalReceivedBytes() const { | |
180 if (stream_closed_) | |
181 return closed_stream_received_bytes_; | |
182 | |
183 if (!stream_) | |
184 return 0; | |
185 | |
186 return stream_->raw_received_bytes(); | |
187 } | |
188 | |
189 int64_t BidirectionalStreamSpdyImpl::GetTotalSentBytes() const { | |
190 if (stream_closed_) | |
191 return closed_stream_sent_bytes_; | |
192 | |
193 if (!stream_) | |
194 return 0; | |
195 | |
196 return stream_->raw_sent_bytes(); | |
197 } | |
198 | |
199 bool BidirectionalStreamSpdyImpl::GetLoadTimingInfo( | |
200 LoadTimingInfo* load_timing_info) const { | |
201 if (stream_closed_) { | |
202 if (!closed_has_load_timing_info_) | |
203 return false; | |
204 *load_timing_info = closed_load_timing_info_; | |
205 return true; | |
206 } | |
207 | |
208 // If |stream_| isn't created or has ID 0, return false. This is to match | |
209 // the implementation in SpdyHttpStream. | |
210 if (!stream_ || stream_->stream_id() == 0) | |
211 return false; | |
212 | |
213 return stream_->GetLoadTimingInfo(load_timing_info); | |
214 } | |
215 | |
216 void BidirectionalStreamSpdyImpl::OnHeadersSent() { | |
217 DCHECK(stream_); | |
218 | |
219 negotiated_protocol_ = kProtoHTTP2; | |
220 if (delegate_) | |
221 delegate_->OnStreamReady(/*request_headers_sent=*/true); | |
222 } | |
223 | |
224 void BidirectionalStreamSpdyImpl::OnHeadersReceived( | |
225 const SpdyHeaderBlock& response_headers) { | |
226 DCHECK(stream_); | |
227 | |
228 if (delegate_) | |
229 delegate_->OnHeadersReceived(response_headers); | |
230 } | |
231 | |
232 void BidirectionalStreamSpdyImpl::OnDataReceived( | |
233 std::unique_ptr<SpdyBuffer> buffer) { | |
234 DCHECK(stream_); | |
235 DCHECK(!stream_closed_); | |
236 | |
237 // If |buffer| is null, BidirectionalStreamSpdyImpl::OnClose will be invoked | |
238 // by SpdyStream to indicate the end of stream. | |
239 if (!buffer) | |
240 return; | |
241 | |
242 // When buffer is consumed, SpdyStream::OnReadBufferConsumed will adjust | |
243 // recv window size accordingly. | |
244 read_data_queue_.Enqueue(std::move(buffer)); | |
245 if (read_buffer_) { | |
246 // Handing small chunks of data to the caller creates measurable overhead. | |
247 // So buffer data in short time-spans and send a single read notification. | |
248 ScheduleBufferedRead(); | |
249 } | |
250 } | |
251 | |
252 void BidirectionalStreamSpdyImpl::OnDataSent() { | |
253 DCHECK(write_pending_); | |
254 | |
255 pending_combined_buffer_ = nullptr; | |
256 write_pending_ = false; | |
257 | |
258 if (delegate_) | |
259 delegate_->OnDataSent(); | |
260 } | |
261 | |
262 void BidirectionalStreamSpdyImpl::OnTrailers(const SpdyHeaderBlock& trailers) { | |
263 DCHECK(stream_); | |
264 DCHECK(!stream_closed_); | |
265 | |
266 if (delegate_) | |
267 delegate_->OnTrailersReceived(trailers); | |
268 } | |
269 | |
270 void BidirectionalStreamSpdyImpl::OnClose(int status) { | |
271 DCHECK(stream_); | |
272 | |
273 stream_closed_ = true; | |
274 closed_stream_status_ = status; | |
275 closed_stream_received_bytes_ = stream_->raw_received_bytes(); | |
276 closed_stream_sent_bytes_ = stream_->raw_sent_bytes(); | |
277 closed_has_load_timing_info_ = | |
278 stream_->GetLoadTimingInfo(&closed_load_timing_info_); | |
279 | |
280 if (status != OK) { | |
281 NotifyError(status); | |
282 return; | |
283 } | |
284 ResetStream(); | |
285 // Complete any remaining read, as all data has been buffered. | |
286 // If user has not called ReadData (i.e |read_buffer_| is nullptr), this will | |
287 // do nothing. | |
288 timer_->Stop(); | |
289 | |
290 // |this| might get destroyed after calling into |delegate_| in | |
291 // DoBufferedRead(). | |
292 auto weak_this = weak_factory_.GetWeakPtr(); | |
293 DoBufferedRead(); | |
294 if (weak_this.get() && write_pending_) | |
295 OnDataSent(); | |
296 } | |
297 | |
298 NetLogSource BidirectionalStreamSpdyImpl::source_dependency() const { | |
299 return source_dependency_; | |
300 } | |
301 | |
302 int BidirectionalStreamSpdyImpl::SendRequestHeadersHelper() { | |
303 SpdyHeaderBlock headers; | |
304 HttpRequestInfo http_request_info; | |
305 http_request_info.url = request_info_->url; | |
306 http_request_info.method = request_info_->method; | |
307 http_request_info.extra_headers = request_info_->extra_headers; | |
308 | |
309 CreateSpdyHeadersFromHttpRequest( | |
310 http_request_info, http_request_info.extra_headers, true, &headers); | |
311 written_end_of_stream_ = request_info_->end_stream_on_headers; | |
312 return stream_->SendRequestHeaders(std::move(headers), | |
313 request_info_->end_stream_on_headers | |
314 ? NO_MORE_DATA_TO_SEND | |
315 : MORE_DATA_TO_SEND); | |
316 } | |
317 | |
318 void BidirectionalStreamSpdyImpl::OnStreamInitialized(int rv) { | |
319 DCHECK_NE(ERR_IO_PENDING, rv); | |
320 if (rv == OK) { | |
321 stream_ = stream_request_.ReleaseStream(); | |
322 stream_->SetDelegate(this); | |
323 rv = SendRequestHeadersHelper(); | |
324 if (rv == OK) { | |
325 OnHeadersSent(); | |
326 return; | |
327 } else if (rv == ERR_IO_PENDING) { | |
328 return; | |
329 } | |
330 } | |
331 NotifyError(rv); | |
332 } | |
333 | |
334 void BidirectionalStreamSpdyImpl::NotifyError(int rv) { | |
335 ResetStream(); | |
336 write_pending_ = false; | |
337 if (delegate_) { | |
338 BidirectionalStreamImpl::Delegate* delegate = delegate_; | |
339 delegate_ = nullptr; | |
340 // Cancel any pending callback. | |
341 weak_factory_.InvalidateWeakPtrs(); | |
342 delegate->OnFailed(rv); | |
343 // |this| can be null when returned from delegate. | |
344 } | |
345 } | |
346 | |
347 void BidirectionalStreamSpdyImpl::ResetStream() { | |
348 if (!stream_) | |
349 return; | |
350 if (!stream_->IsClosed()) { | |
351 // This sends a RST to the remote. | |
352 stream_->DetachDelegate(); | |
353 DCHECK(!stream_); | |
354 } else { | |
355 // Stream is already closed, so it is not legal to call DetachDelegate. | |
356 stream_.reset(); | |
357 } | |
358 } | |
359 | |
360 void BidirectionalStreamSpdyImpl::ScheduleBufferedRead() { | |
361 // If there is already a scheduled DoBufferedRead, don't issue | |
362 // another one. Mark that we have received more data and return. | |
363 if (timer_->IsRunning()) { | |
364 more_read_data_pending_ = true; | |
365 return; | |
366 } | |
367 | |
368 more_read_data_pending_ = false; | |
369 timer_->Start(FROM_HERE, base::TimeDelta::FromMilliseconds(kBufferTimeMs), | |
370 base::Bind(&BidirectionalStreamSpdyImpl::DoBufferedRead, | |
371 weak_factory_.GetWeakPtr())); | |
372 } | |
373 | |
374 void BidirectionalStreamSpdyImpl::DoBufferedRead() { | |
375 DCHECK(!timer_->IsRunning()); | |
376 // Check to see that the stream has not errored out. | |
377 DCHECK(stream_ || stream_closed_); | |
378 DCHECK(!stream_closed_ || closed_stream_status_ == OK); | |
379 | |
380 // When |more_read_data_pending_| is true, it means that more data has arrived | |
381 // since started waiting. Wait a little longer and continue to buffer. | |
382 if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) { | |
383 ScheduleBufferedRead(); | |
384 return; | |
385 } | |
386 | |
387 int rv = 0; | |
388 if (read_buffer_) { | |
389 rv = ReadData(read_buffer_.get(), read_buffer_len_); | |
390 DCHECK_NE(ERR_IO_PENDING, rv); | |
391 read_buffer_ = nullptr; | |
392 read_buffer_len_ = 0; | |
393 if (delegate_) | |
394 delegate_->OnDataRead(rv); | |
395 } | |
396 } | |
397 | |
398 bool BidirectionalStreamSpdyImpl::ShouldWaitForMoreBufferedData() const { | |
399 if (stream_closed_) | |
400 return false; | |
401 DCHECK_GT(read_buffer_len_, 0); | |
402 return read_data_queue_.GetTotalSize() < | |
403 static_cast<size_t>(read_buffer_len_); | |
404 } | |
405 | |
406 bool BidirectionalStreamSpdyImpl::MaybeHandleStreamClosedInSendData() { | |
407 if (stream_) | |
408 return false; | |
409 // If |stream_| is closed without an error before client half closes, | |
410 // blackhole any pending write data. crbug.com/650438. | |
411 if (stream_closed_ && closed_stream_status_ == OK) { | |
412 base::ThreadTaskRunnerHandle::Get()->PostTask( | |
413 FROM_HERE, base::Bind(&BidirectionalStreamSpdyImpl::OnDataSent, | |
414 weak_factory_.GetWeakPtr())); | |
415 return true; | |
416 } | |
417 LOG(ERROR) << "Trying to send data after stream has been destroyed."; | |
418 base::ThreadTaskRunnerHandle::Get()->PostTask( | |
419 FROM_HERE, base::Bind(&BidirectionalStreamSpdyImpl::NotifyError, | |
420 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED)); | |
421 return true; | |
422 } | |
423 | |
424 } // namespace net | |
OLD | NEW |