Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(237)

Side by Side Diff: net/spdy/spdy_stream.cc

Issue 2832973003: Split net/spdy into core and chromium subdirectories. (Closed)
Patch Set: Fix some more build rules. Created 3 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « net/spdy/spdy_stream.h ('k') | net/spdy/spdy_stream_test_util.h » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "net/spdy/spdy_stream.h"
6
7 #include <algorithm>
8 #include <limits>
9 #include <utility>
10
11 #include "base/bind.h"
12 #include "base/compiler_specific.h"
13 #include "base/location.h"
14 #include "base/logging.h"
15 #include "base/metrics/histogram_macros.h"
16 #include "base/single_thread_task_runner.h"
17 #include "base/strings/string_number_conversions.h"
18 #include "base/strings/string_util.h"
19 #include "base/threading/thread_task_runner_handle.h"
20 #include "base/values.h"
21 #include "net/log/net_log.h"
22 #include "net/log/net_log_capture_mode.h"
23 #include "net/log/net_log_event_type.h"
24 #include "net/spdy/platform/api/spdy_estimate_memory_usage.h"
25 #include "net/spdy/platform/api/spdy_string_piece.h"
26 #include "net/spdy/platform/api/spdy_string_utils.h"
27 #include "net/spdy/spdy_buffer_producer.h"
28 #include "net/spdy/spdy_http_utils.h"
29 #include "net/spdy/spdy_session.h"
30
31 namespace net {
32
33 namespace {
34
35 std::unique_ptr<base::Value> NetLogSpdyStreamErrorCallback(
36 SpdyStreamId stream_id,
37 int status,
38 const SpdyString* description,
39 NetLogCaptureMode /* capture_mode */) {
40 std::unique_ptr<base::DictionaryValue> dict(new base::DictionaryValue());
41 dict->SetInteger("stream_id", static_cast<int>(stream_id));
42 dict->SetInteger("status", status);
43 dict->SetString("description", *description);
44 return std::move(dict);
45 }
46
47 std::unique_ptr<base::Value> NetLogSpdyStreamWindowUpdateCallback(
48 SpdyStreamId stream_id,
49 int32_t delta,
50 int32_t window_size,
51 NetLogCaptureMode /* capture_mode */) {
52 std::unique_ptr<base::DictionaryValue> dict(new base::DictionaryValue());
53 dict->SetInteger("stream_id", stream_id);
54 dict->SetInteger("delta", delta);
55 dict->SetInteger("window_size", window_size);
56 return std::move(dict);
57 }
58
59 bool ContainsUppercaseAscii(SpdyStringPiece str) {
60 return std::any_of(str.begin(), str.end(), base::IsAsciiUpper<char>);
61 }
62
63 } // namespace
64
65 // A wrapper around a stream that calls into ProduceHeadersFrame().
66 class SpdyStream::HeadersBufferProducer : public SpdyBufferProducer {
67 public:
68 explicit HeadersBufferProducer(const base::WeakPtr<SpdyStream>& stream)
69 : stream_(stream) {
70 DCHECK(stream_.get());
71 }
72
73 ~HeadersBufferProducer() override {}
74
75 std::unique_ptr<SpdyBuffer> ProduceBuffer() override {
76 if (!stream_.get()) {
77 NOTREACHED();
78 return std::unique_ptr<SpdyBuffer>();
79 }
80 DCHECK_GT(stream_->stream_id(), 0u);
81 return std::unique_ptr<SpdyBuffer>(
82 new SpdyBuffer(stream_->ProduceHeadersFrame()));
83 }
84 size_t EstimateMemoryUsage() const override { return 0; }
85
86 private:
87 const base::WeakPtr<SpdyStream> stream_;
88 };
89
90 SpdyStream::SpdyStream(SpdyStreamType type,
91 const base::WeakPtr<SpdySession>& session,
92 const GURL& url,
93 RequestPriority priority,
94 int32_t initial_send_window_size,
95 int32_t max_recv_window_size,
96 const NetLogWithSource& net_log)
97 : type_(type),
98 stream_id_(0),
99 url_(url),
100 priority_(priority),
101 send_stalled_by_flow_control_(false),
102 send_window_size_(initial_send_window_size),
103 max_recv_window_size_(max_recv_window_size),
104 recv_window_size_(max_recv_window_size),
105 unacked_recv_window_bytes_(0),
106 session_(session),
107 delegate_(NULL),
108 request_headers_valid_(false),
109 pending_send_status_(MORE_DATA_TO_SEND),
110 request_time_(base::Time::Now()),
111 response_state_(READY_FOR_HEADERS),
112 io_state_(STATE_IDLE),
113 response_status_(OK),
114 net_log_(net_log),
115 raw_received_bytes_(0),
116 raw_sent_bytes_(0),
117 send_bytes_(0),
118 recv_bytes_(0),
119 write_handler_guard_(false),
120 weak_ptr_factory_(this) {
121 CHECK(type_ == SPDY_BIDIRECTIONAL_STREAM ||
122 type_ == SPDY_REQUEST_RESPONSE_STREAM ||
123 type_ == SPDY_PUSH_STREAM);
124 CHECK_GE(priority_, MINIMUM_PRIORITY);
125 CHECK_LE(priority_, MAXIMUM_PRIORITY);
126 }
127
128 SpdyStream::~SpdyStream() {
129 CHECK(!write_handler_guard_);
130 UpdateHistograms();
131 }
132
133 void SpdyStream::SetDelegate(Delegate* delegate) {
134 CHECK(!delegate_);
135 CHECK(delegate);
136 delegate_ = delegate;
137
138 CHECK(io_state_ == STATE_IDLE ||
139 io_state_ == STATE_HALF_CLOSED_LOCAL_UNCLAIMED ||
140 io_state_ == STATE_RESERVED_REMOTE);
141
142 if (io_state_ == STATE_HALF_CLOSED_LOCAL_UNCLAIMED) {
143 DCHECK_EQ(type_, SPDY_PUSH_STREAM);
144 base::ThreadTaskRunnerHandle::Get()->PostTask(
145 FROM_HERE, base::Bind(&SpdyStream::PushedStreamReplay, GetWeakPtr()));
146 }
147 }
148
149 void SpdyStream::PushedStreamReplay() {
150 DCHECK_EQ(type_, SPDY_PUSH_STREAM);
151 DCHECK_NE(stream_id_, 0u);
152 CHECK_EQ(stream_id_ % 2, 0u);
153
154 CHECK_EQ(io_state_, STATE_HALF_CLOSED_LOCAL_UNCLAIMED);
155 io_state_ = STATE_HALF_CLOSED_LOCAL;
156
157 // The delegate methods called below may delete |this|, so use
158 // |weak_this| to detect that.
159 base::WeakPtr<SpdyStream> weak_this = GetWeakPtr();
160
161 CHECK(delegate_);
162 delegate_->OnHeadersReceived(response_headers_);
163
164 // OnHeadersReceived() may have closed |this|.
165 if (!weak_this)
166 return;
167
168 while (!pending_recv_data_.empty()) {
169 // Take ownership of the first element of |pending_recv_data_|.
170 std::unique_ptr<SpdyBuffer> buffer = std::move(pending_recv_data_.at(0));
171 pending_recv_data_.erase(pending_recv_data_.begin());
172
173 bool eof = (buffer == NULL);
174
175 CHECK(delegate_);
176 delegate_->OnDataReceived(std::move(buffer));
177
178 // OnDataReceived() may have closed |this|.
179 if (!weak_this)
180 return;
181
182 if (eof) {
183 DCHECK(pending_recv_data_.empty());
184 session_->CloseActiveStream(stream_id_, OK);
185 DCHECK(!weak_this);
186 // |pending_recv_data_| is invalid at this point.
187 break;
188 }
189 }
190 }
191
192 std::unique_ptr<SpdySerializedFrame> SpdyStream::ProduceHeadersFrame() {
193 CHECK_EQ(io_state_, STATE_IDLE);
194 CHECK(request_headers_valid_);
195 CHECK_GT(stream_id_, 0u);
196
197 SpdyControlFlags flags =
198 (pending_send_status_ == NO_MORE_DATA_TO_SEND) ?
199 CONTROL_FLAG_FIN : CONTROL_FLAG_NONE;
200 std::unique_ptr<SpdySerializedFrame> frame(session_->CreateHeaders(
201 stream_id_, priority_, flags, std::move(request_headers_),
202 delegate_->source_dependency()));
203 request_headers_valid_ = false;
204 send_time_ = base::TimeTicks::Now();
205 return frame;
206 }
207
208 void SpdyStream::DetachDelegate() {
209 DCHECK(!IsClosed());
210 delegate_ = NULL;
211 Cancel();
212 }
213
214 void SpdyStream::AdjustSendWindowSize(int32_t delta_window_size) {
215 if (IsClosed())
216 return;
217
218 // Check for wraparound.
219 if (send_window_size_ > 0) {
220 DCHECK_LE(delta_window_size,
221 std::numeric_limits<int32_t>::max() - send_window_size_);
222 }
223 if (send_window_size_ < 0) {
224 DCHECK_GE(delta_window_size,
225 std::numeric_limits<int32_t>::min() - send_window_size_);
226 }
227 send_window_size_ += delta_window_size;
228 PossiblyResumeIfSendStalled();
229 }
230
231 void SpdyStream::OnWriteBufferConsumed(
232 size_t frame_payload_size,
233 size_t consume_size,
234 SpdyBuffer::ConsumeSource consume_source) {
235 if (consume_source == SpdyBuffer::DISCARD) {
236 // If we're discarding a frame or part of it, increase the send
237 // window by the number of discarded bytes. (Although if we're
238 // discarding part of a frame, it's probably because of a write
239 // error and we'll be tearing down the stream soon.)
240 size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size);
241 DCHECK_GT(remaining_payload_bytes, 0u);
242 IncreaseSendWindowSize(static_cast<int32_t>(remaining_payload_bytes));
243 }
244 // For consumed bytes, the send window is increased when we receive
245 // a WINDOW_UPDATE frame.
246 }
247
248 void SpdyStream::IncreaseSendWindowSize(int32_t delta_window_size) {
249 DCHECK_GE(delta_window_size, 1);
250
251 // Ignore late WINDOW_UPDATEs.
252 if (IsClosed())
253 return;
254
255 if (send_window_size_ > 0) {
256 // Check for overflow.
257 int32_t max_delta_window_size =
258 std::numeric_limits<int32_t>::max() - send_window_size_;
259 if (delta_window_size > max_delta_window_size) {
260 SpdyString desc = SpdyStringPrintf(
261 "Received WINDOW_UPDATE [delta: %d] for stream %d overflows "
262 "send_window_size_ [current: %d]",
263 delta_window_size, stream_id_, send_window_size_);
264 session_->ResetStream(stream_id_, ERROR_CODE_FLOW_CONTROL_ERROR, desc);
265 return;
266 }
267 }
268
269 send_window_size_ += delta_window_size;
270
271 net_log_.AddEvent(
272 NetLogEventType::HTTP2_STREAM_UPDATE_SEND_WINDOW,
273 base::Bind(&NetLogSpdyStreamWindowUpdateCallback, stream_id_,
274 delta_window_size, send_window_size_));
275
276 PossiblyResumeIfSendStalled();
277 }
278
279 void SpdyStream::DecreaseSendWindowSize(int32_t delta_window_size) {
280 if (IsClosed())
281 return;
282
283 // We only call this method when sending a frame. Therefore,
284 // |delta_window_size| should be within the valid frame size range.
285 DCHECK_GE(delta_window_size, 1);
286 DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize);
287
288 // |send_window_size_| should have been at least |delta_window_size| for
289 // this call to happen.
290 DCHECK_GE(send_window_size_, delta_window_size);
291
292 send_window_size_ -= delta_window_size;
293
294 net_log_.AddEvent(
295 NetLogEventType::HTTP2_STREAM_UPDATE_SEND_WINDOW,
296 base::Bind(&NetLogSpdyStreamWindowUpdateCallback, stream_id_,
297 -delta_window_size, send_window_size_));
298 }
299
300 void SpdyStream::OnReadBufferConsumed(
301 size_t consume_size,
302 SpdyBuffer::ConsumeSource consume_source) {
303 DCHECK_GE(consume_size, 1u);
304 DCHECK_LE(consume_size,
305 static_cast<size_t>(std::numeric_limits<int32_t>::max()));
306 IncreaseRecvWindowSize(static_cast<int32_t>(consume_size));
307 }
308
309 void SpdyStream::IncreaseRecvWindowSize(int32_t delta_window_size) {
310 // By the time a read is processed by the delegate, this stream may
311 // already be inactive.
312 if (!session_->IsStreamActive(stream_id_))
313 return;
314
315 DCHECK_GE(unacked_recv_window_bytes_, 0);
316 DCHECK_GE(recv_window_size_, unacked_recv_window_bytes_);
317 DCHECK_GE(delta_window_size, 1);
318 // Check for overflow.
319 DCHECK_LE(delta_window_size,
320 std::numeric_limits<int32_t>::max() - recv_window_size_);
321
322 recv_window_size_ += delta_window_size;
323 net_log_.AddEvent(
324 NetLogEventType::HTTP2_STREAM_UPDATE_RECV_WINDOW,
325 base::Bind(&NetLogSpdyStreamWindowUpdateCallback, stream_id_,
326 delta_window_size, recv_window_size_));
327
328 unacked_recv_window_bytes_ += delta_window_size;
329 if (unacked_recv_window_bytes_ > max_recv_window_size_ / 2) {
330 session_->SendStreamWindowUpdate(
331 stream_id_, static_cast<uint32_t>(unacked_recv_window_bytes_));
332 unacked_recv_window_bytes_ = 0;
333 }
334 }
335
336 void SpdyStream::DecreaseRecvWindowSize(int32_t delta_window_size) {
337 DCHECK(session_->IsStreamActive(stream_id_));
338 DCHECK_GE(delta_window_size, 1);
339
340 // The receiving window size as the peer knows it is
341 // |recv_window_size_ - unacked_recv_window_bytes_|, if more data are sent by
342 // the peer, that means that the receive window is not being respected.
343 if (delta_window_size > recv_window_size_ - unacked_recv_window_bytes_) {
344 session_->ResetStream(
345 stream_id_, ERROR_CODE_FLOW_CONTROL_ERROR,
346 "delta_window_size is " + base::IntToString(delta_window_size) +
347 " in DecreaseRecvWindowSize, which is larger than the receive " +
348 "window size of " + base::IntToString(recv_window_size_));
349 return;
350 }
351
352 recv_window_size_ -= delta_window_size;
353 net_log_.AddEvent(
354 NetLogEventType::HTTP2_STREAM_UPDATE_RECV_WINDOW,
355 base::Bind(&NetLogSpdyStreamWindowUpdateCallback, stream_id_,
356 -delta_window_size, recv_window_size_));
357 }
358
359 int SpdyStream::GetPeerAddress(IPEndPoint* address) const {
360 return session_->GetPeerAddress(address);
361 }
362
363 int SpdyStream::GetLocalAddress(IPEndPoint* address) const {
364 return session_->GetLocalAddress(address);
365 }
366
367 bool SpdyStream::WasEverUsed() const {
368 return session_->WasEverUsed();
369 }
370
371 base::Time SpdyStream::GetRequestTime() const {
372 return request_time_;
373 }
374
375 void SpdyStream::SetRequestTime(base::Time t) {
376 request_time_ = t;
377 }
378
379 void SpdyStream::OnHeadersReceived(const SpdyHeaderBlock& response_headers,
380 base::Time response_time,
381 base::TimeTicks recv_first_byte_time) {
382 switch (response_state_) {
383 case READY_FOR_HEADERS:
384 // No header block has been received yet.
385 DCHECK(response_headers_.empty());
386
387 {
388 SpdyHeaderBlock::const_iterator it = response_headers.find(":status");
389 if (it == response_headers.end()) {
390 const SpdyString error("Response headers do not include :status.");
391 LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
392 session_->ResetStream(stream_id_, ERROR_CODE_PROTOCOL_ERROR, error);
393 return;
394 }
395
396 int status;
397 if (!StringToInt(it->second, &status)) {
398 const SpdyString error("Cannot parse :status.");
399 LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
400 session_->ResetStream(stream_id_, ERROR_CODE_PROTOCOL_ERROR, error);
401 return;
402 }
403
404 // Ignore informational headers.
405 // TODO(bnc): Add support for 103 Early Hints, https://crbug.com/671310.
406 if (status / 100 == 1) {
407 return;
408 }
409 }
410
411 response_state_ = READY_FOR_DATA_OR_TRAILERS;
412
413 switch (type_) {
414 case SPDY_BIDIRECTIONAL_STREAM:
415 case SPDY_REQUEST_RESPONSE_STREAM:
416 // A bidirectional stream or a request/response stream is ready for
417 // the response headers only after request headers are sent.
418 if (io_state_ == STATE_IDLE) {
419 const SpdyString error("Response received before request sent.");
420 LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
421 session_->ResetStream(stream_id_, ERROR_CODE_PROTOCOL_ERROR, error);
422 return;
423 }
424 break;
425
426 case SPDY_PUSH_STREAM:
427 // Push streams transition to a locally half-closed state upon
428 // headers. We must continue to buffer data while waiting for a call
429 // to SetDelegate() (which may not ever happen).
430 DCHECK_EQ(io_state_, STATE_RESERVED_REMOTE);
431 if (!delegate_) {
432 io_state_ = STATE_HALF_CLOSED_LOCAL_UNCLAIMED;
433 } else {
434 io_state_ = STATE_HALF_CLOSED_LOCAL;
435 }
436 break;
437 }
438
439 DCHECK_NE(io_state_, STATE_IDLE);
440
441 response_time_ = response_time;
442 recv_first_byte_time_ = recv_first_byte_time;
443 SaveResponseHeaders(response_headers);
444
445 break;
446
447 case READY_FOR_DATA_OR_TRAILERS:
448 // Second header block is trailers.
449 if (type_ == SPDY_PUSH_STREAM) {
450 const SpdyString error("Trailers not supported for push stream.");
451 LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
452 session_->ResetStream(stream_id_, ERROR_CODE_PROTOCOL_ERROR, error);
453 return;
454 }
455
456 response_state_ = TRAILERS_RECEIVED;
457 delegate_->OnTrailers(response_headers);
458 break;
459
460 case TRAILERS_RECEIVED:
461 // No further header blocks are allowed after trailers.
462 const SpdyString error("Header block received after trailers.");
463 LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
464 session_->ResetStream(stream_id_, ERROR_CODE_PROTOCOL_ERROR, error);
465 break;
466 }
467 }
468
469 void SpdyStream::OnPushPromiseHeadersReceived(SpdyHeaderBlock headers) {
470 CHECK(!request_headers_valid_);
471 CHECK_EQ(io_state_, STATE_IDLE);
472 CHECK_EQ(type_, SPDY_PUSH_STREAM);
473 DCHECK(!delegate_);
474
475 io_state_ = STATE_RESERVED_REMOTE;
476 request_headers_ = std::move(headers);
477 request_headers_valid_ = true;
478 url_from_header_block_ = GetUrlFromHeaderBlock(request_headers_);
479 }
480
481 void SpdyStream::OnDataReceived(std::unique_ptr<SpdyBuffer> buffer) {
482 DCHECK(session_->IsStreamActive(stream_id_));
483
484 if (response_state_ == READY_FOR_HEADERS) {
485 const SpdyString error("DATA received before headers.");
486 LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
487 session_->ResetStream(stream_id_, ERROR_CODE_PROTOCOL_ERROR, error);
488 return;
489 }
490
491 if (response_state_ == TRAILERS_RECEIVED && buffer) {
492 const SpdyString error("DATA received after trailers.");
493 LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
494 session_->ResetStream(stream_id_, ERROR_CODE_PROTOCOL_ERROR, error);
495 return;
496 }
497
498 // Track our bandwidth.
499 recv_bytes_ += buffer ? buffer->GetRemainingSize() : 0;
500 recv_last_byte_time_ = base::TimeTicks::Now();
501
502 // If we're still buffering data for a push stream, we will do the check for
503 // data received with incomplete headers in PushedStreamReplay().
504 if (io_state_ == STATE_HALF_CLOSED_LOCAL_UNCLAIMED) {
505 DCHECK_EQ(type_, SPDY_PUSH_STREAM);
506 // It should be valid for this to happen in the server push case.
507 // We'll return received data when delegate gets attached to the stream.
508 if (buffer) {
509 pending_recv_data_.push_back(std::move(buffer));
510 } else {
511 pending_recv_data_.push_back(NULL);
512 // Note: we leave the stream open in the session until the stream
513 // is claimed.
514 }
515 return;
516 }
517
518 CHECK(!IsClosed());
519
520 if (!buffer) {
521 if (io_state_ == STATE_OPEN) {
522 io_state_ = STATE_HALF_CLOSED_REMOTE;
523 } else if (io_state_ == STATE_HALF_CLOSED_LOCAL) {
524 io_state_ = STATE_CLOSED;
525 // Deletes |this|.
526 session_->CloseActiveStream(stream_id_, OK);
527 } else {
528 NOTREACHED() << io_state_;
529 }
530 return;
531 }
532
533 size_t length = buffer->GetRemainingSize();
534 DCHECK_LE(length, session_->GetDataFrameMaximumPayload());
535 base::WeakPtr<SpdyStream> weak_this = GetWeakPtr();
536 // May close the stream.
537 DecreaseRecvWindowSize(static_cast<int32_t>(length));
538 if (!weak_this)
539 return;
540 buffer->AddConsumeCallback(
541 base::Bind(&SpdyStream::OnReadBufferConsumed, GetWeakPtr()));
542
543 // May close |this|.
544 delegate_->OnDataReceived(std::move(buffer));
545 }
546
547 void SpdyStream::OnPaddingConsumed(size_t len) {
548 // Decrease window size because padding bytes are received.
549 // Increase window size because padding bytes are consumed (by discarding).
550 // Net result: |unacked_recv_window_bytes_| increases by |len|,
551 // |recv_window_size_| does not change.
552 base::WeakPtr<SpdyStream> weak_this = GetWeakPtr();
553 // May close the stream.
554 DecreaseRecvWindowSize(static_cast<int32_t>(len));
555 if (!weak_this)
556 return;
557 IncreaseRecvWindowSize(static_cast<int32_t>(len));
558 }
559
560 void SpdyStream::OnFrameWriteComplete(SpdyFrameType frame_type,
561 size_t frame_size) {
562 // PRIORITY writes are allowed at any time and do not trigger a state update.
563 if (frame_type == SpdyFrameType::PRIORITY) {
564 return;
565 }
566
567 DCHECK_NE(type_, SPDY_PUSH_STREAM);
568 CHECK(frame_type == SpdyFrameType::HEADERS ||
569 frame_type == SpdyFrameType::DATA)
570 << frame_type;
571
572 int result = (frame_type == SpdyFrameType::HEADERS) ? OnHeadersSent()
573 : OnDataSent(frame_size);
574 if (result == ERR_IO_PENDING) {
575 // The write operation hasn't completed yet.
576 return;
577 }
578
579 if (pending_send_status_ == NO_MORE_DATA_TO_SEND) {
580 if (io_state_ == STATE_OPEN) {
581 io_state_ = STATE_HALF_CLOSED_LOCAL;
582 } else if (io_state_ == STATE_HALF_CLOSED_REMOTE) {
583 io_state_ = STATE_CLOSED;
584 } else {
585 NOTREACHED() << io_state_;
586 }
587 }
588 // Notify delegate of write completion. Must not destroy |this|.
589 CHECK(delegate_);
590 {
591 base::WeakPtr<SpdyStream> weak_this = GetWeakPtr();
592 write_handler_guard_ = true;
593 if (frame_type == SpdyFrameType::HEADERS) {
594 delegate_->OnHeadersSent();
595 } else {
596 delegate_->OnDataSent();
597 }
598 CHECK(weak_this);
599 write_handler_guard_ = false;
600 }
601
602 if (io_state_ == STATE_CLOSED) {
603 // Deletes |this|.
604 session_->CloseActiveStream(stream_id_, OK);
605 }
606 }
607
608 int SpdyStream::OnHeadersSent() {
609 CHECK_EQ(io_state_, STATE_IDLE);
610 CHECK_NE(stream_id_, 0u);
611
612 io_state_ = STATE_OPEN;
613 return OK;
614 }
615
616 int SpdyStream::OnDataSent(size_t frame_size) {
617 CHECK(io_state_ == STATE_OPEN ||
618 io_state_ == STATE_HALF_CLOSED_REMOTE) << io_state_;
619
620 size_t frame_payload_size = frame_size - session_->GetDataFrameMinimumSize();
621
622 CHECK_GE(frame_size, session_->GetDataFrameMinimumSize());
623 CHECK_LE(frame_payload_size, session_->GetDataFrameMaximumPayload());
624
625 send_bytes_ += frame_payload_size;
626
627 // If more data is available to send, dispatch it and
628 // return that the write operation is still ongoing.
629 pending_send_data_->DidConsume(frame_payload_size);
630 if (pending_send_data_->BytesRemaining() > 0) {
631 QueueNextDataFrame();
632 return ERR_IO_PENDING;
633 } else {
634 pending_send_data_ = NULL;
635 return OK;
636 }
637 }
638
639 void SpdyStream::LogStreamError(int status, const SpdyString& description) {
640 net_log_.AddEvent(NetLogEventType::HTTP2_STREAM_ERROR,
641 base::Bind(&NetLogSpdyStreamErrorCallback, stream_id_,
642 status, &description));
643 }
644
645 void SpdyStream::OnClose(int status) {
646 // In most cases, the stream should already be CLOSED. The exception is when a
647 // SpdySession is shutting down while the stream is in an intermediate state.
648 io_state_ = STATE_CLOSED;
649 if (status == ERR_SPDY_RST_STREAM_NO_ERROR_RECEIVED) {
650 if (response_state_ == READY_FOR_HEADERS) {
651 status = ERR_SPDY_PROTOCOL_ERROR;
652 } else {
653 status = OK;
654 }
655 }
656 response_status_ = status;
657 Delegate* delegate = delegate_;
658 delegate_ = NULL;
659 if (delegate)
660 delegate->OnClose(status);
661 // Unset |stream_id_| last so that the delegate can look it up.
662 stream_id_ = 0;
663 }
664
665 void SpdyStream::Cancel() {
666 // We may be called again from a delegate's OnClose().
667 if (io_state_ == STATE_CLOSED)
668 return;
669
670 if (stream_id_ != 0) {
671 session_->ResetStream(stream_id_, ERROR_CODE_CANCEL, SpdyString());
672 } else {
673 session_->CloseCreatedStream(GetWeakPtr(), ERROR_CODE_CANCEL);
674 }
675 // |this| is invalid at this point.
676 }
677
678 void SpdyStream::Close() {
679 // We may be called again from a delegate's OnClose().
680 if (io_state_ == STATE_CLOSED)
681 return;
682
683 if (stream_id_ != 0) {
684 session_->CloseActiveStream(stream_id_, OK);
685 } else {
686 session_->CloseCreatedStream(GetWeakPtr(), OK);
687 }
688 // |this| is invalid at this point.
689 }
690
691 base::WeakPtr<SpdyStream> SpdyStream::GetWeakPtr() {
692 return weak_ptr_factory_.GetWeakPtr();
693 }
694
695 int SpdyStream::SendRequestHeaders(SpdyHeaderBlock request_headers,
696 SpdySendStatus send_status) {
697 CHECK_NE(type_, SPDY_PUSH_STREAM);
698 CHECK_EQ(pending_send_status_, MORE_DATA_TO_SEND);
699 CHECK(!request_headers_valid_);
700 CHECK(!pending_send_data_.get());
701 CHECK_EQ(io_state_, STATE_IDLE);
702 request_headers_ = std::move(request_headers);
703 request_headers_valid_ = true;
704 url_from_header_block_ = GetUrlFromHeaderBlock(request_headers_);
705 pending_send_status_ = send_status;
706 session_->EnqueueStreamWrite(GetWeakPtr(), SpdyFrameType::HEADERS,
707 std::unique_ptr<SpdyBufferProducer>(
708 new HeadersBufferProducer(GetWeakPtr())));
709 return ERR_IO_PENDING;
710 }
711
712 void SpdyStream::SendData(IOBuffer* data,
713 int length,
714 SpdySendStatus send_status) {
715 CHECK_NE(type_, SPDY_PUSH_STREAM);
716 CHECK_EQ(pending_send_status_, MORE_DATA_TO_SEND);
717 CHECK(io_state_ == STATE_OPEN ||
718 io_state_ == STATE_HALF_CLOSED_REMOTE) << io_state_;
719 CHECK(!pending_send_data_.get());
720 pending_send_data_ = new DrainableIOBuffer(data, length);
721 pending_send_status_ = send_status;
722 QueueNextDataFrame();
723 }
724
725 bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info) const {
726 return session_->GetSSLInfo(ssl_info);
727 }
728
729 bool SpdyStream::WasAlpnNegotiated() const {
730 return session_->WasAlpnNegotiated();
731 }
732
733 NextProto SpdyStream::GetNegotiatedProtocol() const {
734 return session_->GetNegotiatedProtocol();
735 }
736
737 void SpdyStream::PossiblyResumeIfSendStalled() {
738 if (IsLocallyClosed()) {
739 return;
740 }
741 if (send_stalled_by_flow_control_ && !session_->IsSendStalled() &&
742 send_window_size_ > 0) {
743 net_log_.AddEvent(NetLogEventType::HTTP2_STREAM_FLOW_CONTROL_UNSTALLED,
744 NetLog::IntCallback("stream_id", stream_id_));
745 send_stalled_by_flow_control_ = false;
746 QueueNextDataFrame();
747 }
748 }
749
750 bool SpdyStream::IsClosed() const {
751 return io_state_ == STATE_CLOSED;
752 }
753
754 bool SpdyStream::IsLocallyClosed() const {
755 return io_state_ == STATE_HALF_CLOSED_LOCAL_UNCLAIMED ||
756 io_state_ == STATE_HALF_CLOSED_LOCAL ||
757 io_state_ == STATE_CLOSED;
758 }
759
760 bool SpdyStream::IsIdle() const {
761 return io_state_ == STATE_IDLE;
762 }
763
764 bool SpdyStream::IsOpen() const {
765 return io_state_ == STATE_OPEN;
766 }
767
768 bool SpdyStream::IsReservedRemote() const {
769 return io_state_ == STATE_RESERVED_REMOTE;
770 }
771
772 void SpdyStream::AddRawReceivedBytes(size_t received_bytes) {
773 raw_received_bytes_ += received_bytes;
774 }
775
776 void SpdyStream::AddRawSentBytes(size_t sent_bytes) {
777 raw_sent_bytes_ += sent_bytes;
778 }
779
780 bool SpdyStream::GetLoadTimingInfo(LoadTimingInfo* load_timing_info) const {
781 if (stream_id_ == 0)
782 return false;
783 bool result = session_->GetLoadTimingInfo(stream_id_, load_timing_info);
784 if (type_ == SPDY_PUSH_STREAM) {
785 load_timing_info->push_start = recv_first_byte_time_;
786 bool done_receiving = IsClosed() || (!pending_recv_data_.empty() &&
787 !pending_recv_data_.back());
788 if (done_receiving)
789 load_timing_info->push_end = recv_last_byte_time_;
790 }
791 return result;
792 }
793
794 size_t SpdyStream::EstimateMemoryUsage() const {
795 // TODO(xunjieli): https://crbug.com/669108. Estimate |pending_send_data_|
796 // once scoped_refptr support is in.
797 return SpdyEstimateMemoryUsage(url_) +
798 SpdyEstimateMemoryUsage(request_headers_) +
799 SpdyEstimateMemoryUsage(url_from_header_block_) +
800 SpdyEstimateMemoryUsage(pending_recv_data_) +
801 SpdyEstimateMemoryUsage(response_headers_);
802 }
803
804 void SpdyStream::UpdateHistograms() {
805 // We need at least the receive timers to be filled in, as otherwise
806 // metrics can be bogus.
807 if (recv_first_byte_time_.is_null() || recv_last_byte_time_.is_null())
808 return;
809
810 base::TimeTicks effective_send_time;
811 if (type_ == SPDY_PUSH_STREAM) {
812 // Push streams shouldn't have |send_time_| filled in.
813 DCHECK(send_time_.is_null());
814 effective_send_time = recv_first_byte_time_;
815 } else {
816 // For non-push streams, we also need |send_time_| to be filled
817 // in.
818 if (send_time_.is_null())
819 return;
820 effective_send_time = send_time_;
821 }
822
823 UMA_HISTOGRAM_TIMES("Net.SpdyStreamTimeToFirstByte",
824 recv_first_byte_time_ - effective_send_time);
825 UMA_HISTOGRAM_TIMES("Net.SpdyStreamDownloadTime",
826 recv_last_byte_time_ - recv_first_byte_time_);
827 UMA_HISTOGRAM_TIMES("Net.SpdyStreamTime",
828 recv_last_byte_time_ - effective_send_time);
829
830 UMA_HISTOGRAM_COUNTS("Net.SpdySendBytes", send_bytes_);
831 UMA_HISTOGRAM_COUNTS("Net.SpdyRecvBytes", recv_bytes_);
832 }
833
834 void SpdyStream::QueueNextDataFrame() {
835 // Until the request has been completely sent, we cannot be sure
836 // that our stream_id is correct.
837 CHECK(io_state_ == STATE_OPEN ||
838 io_state_ == STATE_HALF_CLOSED_REMOTE) << io_state_;
839 CHECK_GT(stream_id_, 0u);
840 CHECK(pending_send_data_.get());
841 // Only the final fame may have a length of 0.
842 if (pending_send_status_ == NO_MORE_DATA_TO_SEND) {
843 CHECK_GE(pending_send_data_->BytesRemaining(), 0);
844 } else {
845 CHECK_GT(pending_send_data_->BytesRemaining(), 0);
846 }
847
848 SpdyDataFlags flags =
849 (pending_send_status_ == NO_MORE_DATA_TO_SEND) ?
850 DATA_FLAG_FIN : DATA_FLAG_NONE;
851 std::unique_ptr<SpdyBuffer> data_buffer(
852 session_->CreateDataBuffer(stream_id_, pending_send_data_.get(),
853 pending_send_data_->BytesRemaining(), flags));
854 // We'll get called again by PossiblyResumeIfSendStalled().
855 if (!data_buffer)
856 return;
857
858 DCHECK_GE(data_buffer->GetRemainingSize(),
859 session_->GetDataFrameMinimumSize());
860 size_t payload_size =
861 data_buffer->GetRemainingSize() - session_->GetDataFrameMinimumSize();
862 DCHECK_LE(payload_size, session_->GetDataFrameMaximumPayload());
863
864 // Send window size is based on payload size, so nothing to do if this is
865 // just a FIN with no payload.
866 if (payload_size != 0) {
867 DecreaseSendWindowSize(static_cast<int32_t>(payload_size));
868 // This currently isn't strictly needed, since write frames are
869 // discarded only if the stream is about to be closed. But have it
870 // here anyway just in case this changes.
871 data_buffer->AddConsumeCallback(base::Bind(
872 &SpdyStream::OnWriteBufferConsumed, GetWeakPtr(), payload_size));
873 }
874
875 session_->EnqueueStreamWrite(
876 GetWeakPtr(), SpdyFrameType::DATA,
877 std::unique_ptr<SpdyBufferProducer>(
878 new SimpleBufferProducer(std::move(data_buffer))));
879 }
880
881 void SpdyStream::SaveResponseHeaders(const SpdyHeaderBlock& response_headers) {
882 DCHECK(response_headers_.empty());
883 if (response_headers.find("transfer-encoding") != response_headers.end()) {
884 session_->ResetStream(stream_id_, ERROR_CODE_PROTOCOL_ERROR,
885 "Received transfer-encoding header");
886 return;
887 }
888
889 for (SpdyHeaderBlock::const_iterator it = response_headers.begin();
890 it != response_headers.end(); ++it) {
891 // Disallow uppercase headers.
892 if (ContainsUppercaseAscii(it->first)) {
893 session_->ResetStream(
894 stream_id_, ERROR_CODE_PROTOCOL_ERROR,
895 "Upper case characters in header: " + it->first.as_string());
896 return;
897 }
898
899 response_headers_.insert(*it);
900 }
901
902 // If delegate is not yet attached, OnHeadersReceived() will be called after
903 // the delegate gets attached to the stream.
904 if (delegate_)
905 delegate_->OnHeadersReceived(response_headers_);
906 }
907
908 #define STATE_CASE(s) \
909 case s: \
910 description = SpdyStringPrintf("%s (0x%08X)", #s, s); \
911 break
912
913 SpdyString SpdyStream::DescribeState(State state) {
914 SpdyString description;
915 switch (state) {
916 STATE_CASE(STATE_IDLE);
917 STATE_CASE(STATE_OPEN);
918 STATE_CASE(STATE_HALF_CLOSED_LOCAL_UNCLAIMED);
919 STATE_CASE(STATE_HALF_CLOSED_LOCAL);
920 STATE_CASE(STATE_CLOSED);
921 default:
922 description = SpdyStringPrintf("Unknown state 0x%08X (%u)", state, state);
923 break;
924 }
925 return description;
926 }
927
928 #undef STATE_CASE
929
930 } // namespace net
OLDNEW
« no previous file with comments | « net/spdy/spdy_stream.h ('k') | net/spdy/spdy_stream_test_util.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698