Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(154)

Side by Side Diff: net/spdy/bidirectional_stream_spdy_impl.cc

Issue 2832973003: Split net/spdy into core and chromium subdirectories. (Closed)
Patch Set: Fix some more build rules. Created 3 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/spdy/bidirectional_stream_spdy_impl.h"
6
7 #include <utility>
8
9 #include "base/bind.h"
10 #include "base/location.h"
11 #include "base/logging.h"
12 #include "base/threading/thread_task_runner_handle.h"
13 #include "base/time/time.h"
14 #include "base/timer/timer.h"
15 #include "net/http/bidirectional_stream_request_info.h"
16 #include "net/spdy/spdy_buffer.h"
17 #include "net/spdy/spdy_header_block.h"
18 #include "net/spdy/spdy_http_utils.h"
19 #include "net/spdy/spdy_stream.h"
20
21 namespace net {
22
23 namespace {
24
25 // Time to wait in millisecond to notify |delegate_| of data received.
26 // Handing small chunks of data to the caller creates measurable overhead.
27 // So buffer data in short time-spans and send a single read notification.
28 const int kBufferTimeMs = 1;
29
30 } // namespace
31
32 BidirectionalStreamSpdyImpl::BidirectionalStreamSpdyImpl(
33 const base::WeakPtr<SpdySession>& spdy_session,
34 NetLogSource source_dependency)
35 : spdy_session_(spdy_session),
36 request_info_(nullptr),
37 delegate_(nullptr),
38 source_dependency_(source_dependency),
39 negotiated_protocol_(kProtoUnknown),
40 more_read_data_pending_(false),
41 read_buffer_len_(0),
42 written_end_of_stream_(false),
43 write_pending_(false),
44 stream_closed_(false),
45 closed_stream_status_(ERR_FAILED),
46 closed_stream_received_bytes_(0),
47 closed_stream_sent_bytes_(0),
48 closed_has_load_timing_info_(false),
49 weak_factory_(this) {}
50
51 BidirectionalStreamSpdyImpl::~BidirectionalStreamSpdyImpl() {
52 // Sends a RST to the remote if the stream is destroyed before it completes.
53 ResetStream();
54 }
55
56 void BidirectionalStreamSpdyImpl::Start(
57 const BidirectionalStreamRequestInfo* request_info,
58 const NetLogWithSource& net_log,
59 bool /*send_request_headers_automatically*/,
60 BidirectionalStreamImpl::Delegate* delegate,
61 std::unique_ptr<base::Timer> timer) {
62 DCHECK(!stream_);
63 DCHECK(timer);
64
65 delegate_ = delegate;
66 timer_ = std::move(timer);
67
68 if (!spdy_session_) {
69 base::ThreadTaskRunnerHandle::Get()->PostTask(
70 FROM_HERE,
71 base::Bind(&BidirectionalStreamSpdyImpl::NotifyError,
72 weak_factory_.GetWeakPtr(), ERR_CONNECTION_CLOSED));
73 return;
74 }
75
76 request_info_ = request_info;
77
78 int rv = stream_request_.StartRequest(
79 SPDY_BIDIRECTIONAL_STREAM, spdy_session_, request_info_->url,
80 request_info_->priority, net_log,
81 base::Bind(&BidirectionalStreamSpdyImpl::OnStreamInitialized,
82 weak_factory_.GetWeakPtr()));
83 if (rv != ERR_IO_PENDING)
84 OnStreamInitialized(rv);
85 }
86
87 void BidirectionalStreamSpdyImpl::SendRequestHeaders() {
88 // Request headers will be sent automatically.
89 NOTREACHED();
90 }
91
92 int BidirectionalStreamSpdyImpl::ReadData(IOBuffer* buf, int buf_len) {
93 if (stream_)
94 DCHECK(!stream_->IsIdle());
95
96 DCHECK(buf);
97 DCHECK(buf_len);
98 DCHECK(!timer_->IsRunning()) << "There should be only one ReadData in flight";
99
100 // If there is data buffered, complete the IO immediately.
101 if (!read_data_queue_.IsEmpty()) {
102 return read_data_queue_.Dequeue(buf->data(), buf_len);
103 } else if (stream_closed_) {
104 return closed_stream_status_;
105 }
106 // Read will complete asynchronously and Delegate::OnReadCompleted will be
107 // called upon completion.
108 read_buffer_ = buf;
109 read_buffer_len_ = buf_len;
110 return ERR_IO_PENDING;
111 }
112
113 void BidirectionalStreamSpdyImpl::SendData(const scoped_refptr<IOBuffer>& data,
114 int length,
115 bool end_stream) {
116 DCHECK(length > 0 || (length == 0 && end_stream));
117 DCHECK(!write_pending_);
118
119 if (written_end_of_stream_) {
120 LOG(ERROR) << "Writing after end of stream is written.";
121 base::ThreadTaskRunnerHandle::Get()->PostTask(
122 FROM_HERE, base::Bind(&BidirectionalStreamSpdyImpl::NotifyError,
123 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED));
124 return;
125 }
126
127 write_pending_ = true;
128 written_end_of_stream_ = end_stream;
129 if (MaybeHandleStreamClosedInSendData())
130 return;
131
132 DCHECK(!stream_closed_);
133 stream_->SendData(data.get(), length,
134 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND);
135 }
136
137 void BidirectionalStreamSpdyImpl::SendvData(
138 const std::vector<scoped_refptr<IOBuffer>>& buffers,
139 const std::vector<int>& lengths,
140 bool end_stream) {
141 DCHECK_EQ(buffers.size(), lengths.size());
142 DCHECK(!write_pending_);
143
144 if (written_end_of_stream_) {
145 LOG(ERROR) << "Writing after end of stream is written.";
146 base::ThreadTaskRunnerHandle::Get()->PostTask(
147 FROM_HERE, base::Bind(&BidirectionalStreamSpdyImpl::NotifyError,
148 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED));
149 return;
150 }
151
152 write_pending_ = true;
153 written_end_of_stream_ = end_stream;
154 if (MaybeHandleStreamClosedInSendData())
155 return;
156
157 DCHECK(!stream_closed_);
158 int total_len = 0;
159 for (int len : lengths) {
160 total_len += len;
161 }
162
163 pending_combined_buffer_ = new net::IOBuffer(total_len);
164 int len = 0;
165 // TODO(xunjieli): Get rid of extra copy. Coalesce headers and data frames.
166 for (size_t i = 0; i < buffers.size(); ++i) {
167 memcpy(pending_combined_buffer_->data() + len, buffers[i]->data(),
168 lengths[i]);
169 len += lengths[i];
170 }
171 stream_->SendData(pending_combined_buffer_.get(), total_len,
172 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND);
173 }
174
175 NextProto BidirectionalStreamSpdyImpl::GetProtocol() const {
176 return negotiated_protocol_;
177 }
178
179 int64_t BidirectionalStreamSpdyImpl::GetTotalReceivedBytes() const {
180 if (stream_closed_)
181 return closed_stream_received_bytes_;
182
183 if (!stream_)
184 return 0;
185
186 return stream_->raw_received_bytes();
187 }
188
189 int64_t BidirectionalStreamSpdyImpl::GetTotalSentBytes() const {
190 if (stream_closed_)
191 return closed_stream_sent_bytes_;
192
193 if (!stream_)
194 return 0;
195
196 return stream_->raw_sent_bytes();
197 }
198
199 bool BidirectionalStreamSpdyImpl::GetLoadTimingInfo(
200 LoadTimingInfo* load_timing_info) const {
201 if (stream_closed_) {
202 if (!closed_has_load_timing_info_)
203 return false;
204 *load_timing_info = closed_load_timing_info_;
205 return true;
206 }
207
208 // If |stream_| isn't created or has ID 0, return false. This is to match
209 // the implementation in SpdyHttpStream.
210 if (!stream_ || stream_->stream_id() == 0)
211 return false;
212
213 return stream_->GetLoadTimingInfo(load_timing_info);
214 }
215
216 void BidirectionalStreamSpdyImpl::OnHeadersSent() {
217 DCHECK(stream_);
218
219 negotiated_protocol_ = kProtoHTTP2;
220 if (delegate_)
221 delegate_->OnStreamReady(/*request_headers_sent=*/true);
222 }
223
224 void BidirectionalStreamSpdyImpl::OnHeadersReceived(
225 const SpdyHeaderBlock& response_headers) {
226 DCHECK(stream_);
227
228 if (delegate_)
229 delegate_->OnHeadersReceived(response_headers);
230 }
231
232 void BidirectionalStreamSpdyImpl::OnDataReceived(
233 std::unique_ptr<SpdyBuffer> buffer) {
234 DCHECK(stream_);
235 DCHECK(!stream_closed_);
236
237 // If |buffer| is null, BidirectionalStreamSpdyImpl::OnClose will be invoked
238 // by SpdyStream to indicate the end of stream.
239 if (!buffer)
240 return;
241
242 // When buffer is consumed, SpdyStream::OnReadBufferConsumed will adjust
243 // recv window size accordingly.
244 read_data_queue_.Enqueue(std::move(buffer));
245 if (read_buffer_) {
246 // Handing small chunks of data to the caller creates measurable overhead.
247 // So buffer data in short time-spans and send a single read notification.
248 ScheduleBufferedRead();
249 }
250 }
251
252 void BidirectionalStreamSpdyImpl::OnDataSent() {
253 DCHECK(write_pending_);
254
255 pending_combined_buffer_ = nullptr;
256 write_pending_ = false;
257
258 if (delegate_)
259 delegate_->OnDataSent();
260 }
261
262 void BidirectionalStreamSpdyImpl::OnTrailers(const SpdyHeaderBlock& trailers) {
263 DCHECK(stream_);
264 DCHECK(!stream_closed_);
265
266 if (delegate_)
267 delegate_->OnTrailersReceived(trailers);
268 }
269
270 void BidirectionalStreamSpdyImpl::OnClose(int status) {
271 DCHECK(stream_);
272
273 stream_closed_ = true;
274 closed_stream_status_ = status;
275 closed_stream_received_bytes_ = stream_->raw_received_bytes();
276 closed_stream_sent_bytes_ = stream_->raw_sent_bytes();
277 closed_has_load_timing_info_ =
278 stream_->GetLoadTimingInfo(&closed_load_timing_info_);
279
280 if (status != OK) {
281 NotifyError(status);
282 return;
283 }
284 ResetStream();
285 // Complete any remaining read, as all data has been buffered.
286 // If user has not called ReadData (i.e |read_buffer_| is nullptr), this will
287 // do nothing.
288 timer_->Stop();
289
290 // |this| might get destroyed after calling into |delegate_| in
291 // DoBufferedRead().
292 auto weak_this = weak_factory_.GetWeakPtr();
293 DoBufferedRead();
294 if (weak_this.get() && write_pending_)
295 OnDataSent();
296 }
297
298 NetLogSource BidirectionalStreamSpdyImpl::source_dependency() const {
299 return source_dependency_;
300 }
301
302 int BidirectionalStreamSpdyImpl::SendRequestHeadersHelper() {
303 SpdyHeaderBlock headers;
304 HttpRequestInfo http_request_info;
305 http_request_info.url = request_info_->url;
306 http_request_info.method = request_info_->method;
307 http_request_info.extra_headers = request_info_->extra_headers;
308
309 CreateSpdyHeadersFromHttpRequest(
310 http_request_info, http_request_info.extra_headers, true, &headers);
311 written_end_of_stream_ = request_info_->end_stream_on_headers;
312 return stream_->SendRequestHeaders(std::move(headers),
313 request_info_->end_stream_on_headers
314 ? NO_MORE_DATA_TO_SEND
315 : MORE_DATA_TO_SEND);
316 }
317
318 void BidirectionalStreamSpdyImpl::OnStreamInitialized(int rv) {
319 DCHECK_NE(ERR_IO_PENDING, rv);
320 if (rv == OK) {
321 stream_ = stream_request_.ReleaseStream();
322 stream_->SetDelegate(this);
323 rv = SendRequestHeadersHelper();
324 if (rv == OK) {
325 OnHeadersSent();
326 return;
327 } else if (rv == ERR_IO_PENDING) {
328 return;
329 }
330 }
331 NotifyError(rv);
332 }
333
334 void BidirectionalStreamSpdyImpl::NotifyError(int rv) {
335 ResetStream();
336 write_pending_ = false;
337 if (delegate_) {
338 BidirectionalStreamImpl::Delegate* delegate = delegate_;
339 delegate_ = nullptr;
340 // Cancel any pending callback.
341 weak_factory_.InvalidateWeakPtrs();
342 delegate->OnFailed(rv);
343 // |this| can be null when returned from delegate.
344 }
345 }
346
347 void BidirectionalStreamSpdyImpl::ResetStream() {
348 if (!stream_)
349 return;
350 if (!stream_->IsClosed()) {
351 // This sends a RST to the remote.
352 stream_->DetachDelegate();
353 DCHECK(!stream_);
354 } else {
355 // Stream is already closed, so it is not legal to call DetachDelegate.
356 stream_.reset();
357 }
358 }
359
360 void BidirectionalStreamSpdyImpl::ScheduleBufferedRead() {
361 // If there is already a scheduled DoBufferedRead, don't issue
362 // another one. Mark that we have received more data and return.
363 if (timer_->IsRunning()) {
364 more_read_data_pending_ = true;
365 return;
366 }
367
368 more_read_data_pending_ = false;
369 timer_->Start(FROM_HERE, base::TimeDelta::FromMilliseconds(kBufferTimeMs),
370 base::Bind(&BidirectionalStreamSpdyImpl::DoBufferedRead,
371 weak_factory_.GetWeakPtr()));
372 }
373
374 void BidirectionalStreamSpdyImpl::DoBufferedRead() {
375 DCHECK(!timer_->IsRunning());
376 // Check to see that the stream has not errored out.
377 DCHECK(stream_ || stream_closed_);
378 DCHECK(!stream_closed_ || closed_stream_status_ == OK);
379
380 // When |more_read_data_pending_| is true, it means that more data has arrived
381 // since started waiting. Wait a little longer and continue to buffer.
382 if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) {
383 ScheduleBufferedRead();
384 return;
385 }
386
387 int rv = 0;
388 if (read_buffer_) {
389 rv = ReadData(read_buffer_.get(), read_buffer_len_);
390 DCHECK_NE(ERR_IO_PENDING, rv);
391 read_buffer_ = nullptr;
392 read_buffer_len_ = 0;
393 if (delegate_)
394 delegate_->OnDataRead(rv);
395 }
396 }
397
398 bool BidirectionalStreamSpdyImpl::ShouldWaitForMoreBufferedData() const {
399 if (stream_closed_)
400 return false;
401 DCHECK_GT(read_buffer_len_, 0);
402 return read_data_queue_.GetTotalSize() <
403 static_cast<size_t>(read_buffer_len_);
404 }
405
406 bool BidirectionalStreamSpdyImpl::MaybeHandleStreamClosedInSendData() {
407 if (stream_)
408 return false;
409 // If |stream_| is closed without an error before client half closes,
410 // blackhole any pending write data. crbug.com/650438.
411 if (stream_closed_ && closed_stream_status_ == OK) {
412 base::ThreadTaskRunnerHandle::Get()->PostTask(
413 FROM_HERE, base::Bind(&BidirectionalStreamSpdyImpl::OnDataSent,
414 weak_factory_.GetWeakPtr()));
415 return true;
416 }
417 LOG(ERROR) << "Trying to send data after stream has been destroyed.";
418 base::ThreadTaskRunnerHandle::Get()->PostTask(
419 FROM_HERE, base::Bind(&BidirectionalStreamSpdyImpl::NotifyError,
420 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED));
421 return true;
422 }
423
424 } // namespace net
OLDNEW
« no previous file with comments | « net/spdy/bidirectional_stream_spdy_impl.h ('k') | net/spdy/bidirectional_stream_spdy_impl_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698