OLD | NEW |
| (Empty) |
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "net/spdy/spdy_stream.h" | |
6 | |
#include <algorithm>
#include <limits>
#include <memory>
#include <utility>
10 | |
11 #include "base/bind.h" | |
12 #include "base/compiler_specific.h" | |
13 #include "base/location.h" | |
14 #include "base/logging.h" | |
15 #include "base/metrics/histogram_macros.h" | |
16 #include "base/single_thread_task_runner.h" | |
17 #include "base/strings/string_number_conversions.h" | |
18 #include "base/strings/string_util.h" | |
19 #include "base/threading/thread_task_runner_handle.h" | |
20 #include "base/values.h" | |
21 #include "net/log/net_log.h" | |
22 #include "net/log/net_log_capture_mode.h" | |
23 #include "net/log/net_log_event_type.h" | |
24 #include "net/spdy/platform/api/spdy_estimate_memory_usage.h" | |
25 #include "net/spdy/platform/api/spdy_string_piece.h" | |
26 #include "net/spdy/platform/api/spdy_string_utils.h" | |
27 #include "net/spdy/spdy_buffer_producer.h" | |
28 #include "net/spdy/spdy_http_utils.h" | |
29 #include "net/spdy/spdy_session.h" | |
30 | |
31 namespace net { | |
32 | |
33 namespace { | |
34 | |
35 std::unique_ptr<base::Value> NetLogSpdyStreamErrorCallback( | |
36 SpdyStreamId stream_id, | |
37 int status, | |
38 const SpdyString* description, | |
39 NetLogCaptureMode /* capture_mode */) { | |
40 std::unique_ptr<base::DictionaryValue> dict(new base::DictionaryValue()); | |
41 dict->SetInteger("stream_id", static_cast<int>(stream_id)); | |
42 dict->SetInteger("status", status); | |
43 dict->SetString("description", *description); | |
44 return std::move(dict); | |
45 } | |
46 | |
47 std::unique_ptr<base::Value> NetLogSpdyStreamWindowUpdateCallback( | |
48 SpdyStreamId stream_id, | |
49 int32_t delta, | |
50 int32_t window_size, | |
51 NetLogCaptureMode /* capture_mode */) { | |
52 std::unique_ptr<base::DictionaryValue> dict(new base::DictionaryValue()); | |
53 dict->SetInteger("stream_id", stream_id); | |
54 dict->SetInteger("delta", delta); | |
55 dict->SetInteger("window_size", window_size); | |
56 return std::move(dict); | |
57 } | |
58 | |
59 bool ContainsUppercaseAscii(SpdyStringPiece str) { | |
60 return std::any_of(str.begin(), str.end(), base::IsAsciiUpper<char>); | |
61 } | |
62 | |
63 } // namespace | |
64 | |
65 // A wrapper around a stream that calls into ProduceHeadersFrame(). | |
66 class SpdyStream::HeadersBufferProducer : public SpdyBufferProducer { | |
67 public: | |
68 explicit HeadersBufferProducer(const base::WeakPtr<SpdyStream>& stream) | |
69 : stream_(stream) { | |
70 DCHECK(stream_.get()); | |
71 } | |
72 | |
73 ~HeadersBufferProducer() override {} | |
74 | |
75 std::unique_ptr<SpdyBuffer> ProduceBuffer() override { | |
76 if (!stream_.get()) { | |
77 NOTREACHED(); | |
78 return std::unique_ptr<SpdyBuffer>(); | |
79 } | |
80 DCHECK_GT(stream_->stream_id(), 0u); | |
81 return std::unique_ptr<SpdyBuffer>( | |
82 new SpdyBuffer(stream_->ProduceHeadersFrame())); | |
83 } | |
84 size_t EstimateMemoryUsage() const override { return 0; } | |
85 | |
86 private: | |
87 const base::WeakPtr<SpdyStream> stream_; | |
88 }; | |
89 | |
// Constructs a stream that is not yet registered with |session|:
// |stream_id_| stays 0 until the session assigns an id, and io_state_ starts
// at STATE_IDLE. Both flow-control windows are seeded from the arguments;
// |priority| must lie within [MINIMUM_PRIORITY, MAXIMUM_PRIORITY].
SpdyStream::SpdyStream(SpdyStreamType type,
                       const base::WeakPtr<SpdySession>& session,
                       const GURL& url,
                       RequestPriority priority,
                       int32_t initial_send_window_size,
                       int32_t max_recv_window_size,
                       const NetLogWithSource& net_log)
    : type_(type),
      stream_id_(0),
      url_(url),
      priority_(priority),
      send_stalled_by_flow_control_(false),
      send_window_size_(initial_send_window_size),
      max_recv_window_size_(max_recv_window_size),
      recv_window_size_(max_recv_window_size),
      unacked_recv_window_bytes_(0),
      session_(session),
      delegate_(NULL),
      request_headers_valid_(false),
      pending_send_status_(MORE_DATA_TO_SEND),
      request_time_(base::Time::Now()),
      response_state_(READY_FOR_HEADERS),
      io_state_(STATE_IDLE),
      response_status_(OK),
      net_log_(net_log),
      raw_received_bytes_(0),
      raw_sent_bytes_(0),
      send_bytes_(0),
      recv_bytes_(0),
      write_handler_guard_(false),
      weak_ptr_factory_(this) {
  // Only these three stream types are supported.
  CHECK(type_ == SPDY_BIDIRECTIONAL_STREAM ||
        type_ == SPDY_REQUEST_RESPONSE_STREAM ||
        type_ == SPDY_PUSH_STREAM);
  CHECK_GE(priority_, MINIMUM_PRIORITY);
  CHECK_LE(priority_, MAXIMUM_PRIORITY);
}
127 | |
SpdyStream::~SpdyStream() {
  // Destruction while a delegate write callback is on the stack would mean a
  // delegate deleted us despite the "must not destroy |this|" contract in
  // OnFrameWriteComplete().
  CHECK(!write_handler_guard_);
  UpdateHistograms();
}
132 | |
// Attaches the (single, mandatory) delegate. For a push stream that has
// already received headers/data, schedules an asynchronous replay so the new
// delegate observes the buffered events.
void SpdyStream::SetDelegate(Delegate* delegate) {
  CHECK(!delegate_);
  CHECK(delegate);
  delegate_ = delegate;

  // A delegate may only be attached before the stream is in flight, or to an
  // unclaimed/reserved push stream.
  CHECK(io_state_ == STATE_IDLE ||
        io_state_ == STATE_HALF_CLOSED_LOCAL_UNCLAIMED ||
        io_state_ == STATE_RESERVED_REMOTE);

  if (io_state_ == STATE_HALF_CLOSED_LOCAL_UNCLAIMED) {
    DCHECK_EQ(type_, SPDY_PUSH_STREAM);
    // Replay asynchronously so the delegate is not re-entered from here.
    base::ThreadTaskRunnerHandle::Get()->PostTask(
        FROM_HERE, base::Bind(&SpdyStream::PushedStreamReplay, GetWeakPtr()));
  }
}
148 | |
// Replays buffered response headers and data to a delegate that claimed a
// push stream after those events arrived. Delegate callbacks may delete
// |this| at several points, which is tracked via |weak_this|.
void SpdyStream::PushedStreamReplay() {
  DCHECK_EQ(type_, SPDY_PUSH_STREAM);
  DCHECK_NE(stream_id_, 0u);
  // Server-pushed stream ids are even.
  CHECK_EQ(stream_id_ % 2, 0u);

  CHECK_EQ(io_state_, STATE_HALF_CLOSED_LOCAL_UNCLAIMED);
  io_state_ = STATE_HALF_CLOSED_LOCAL;

  // The delegate methods called below may delete |this|, so use
  // |weak_this| to detect that.
  base::WeakPtr<SpdyStream> weak_this = GetWeakPtr();

  CHECK(delegate_);
  delegate_->OnHeadersReceived(response_headers_);

  // OnHeadersReceived() may have closed |this|.
  if (!weak_this)
    return;

  while (!pending_recv_data_.empty()) {
    // Take ownership of the first element of |pending_recv_data_|.
    std::unique_ptr<SpdyBuffer> buffer = std::move(pending_recv_data_.at(0));
    pending_recv_data_.erase(pending_recv_data_.begin());

    // A null buffer was queued as the end-of-stream marker.
    bool eof = (buffer == NULL);

    CHECK(delegate_);
    delegate_->OnDataReceived(std::move(buffer));

    // OnDataReceived() may have closed |this|.
    if (!weak_this)
      return;

    if (eof) {
      DCHECK(pending_recv_data_.empty());
      // Deletes |this|; the DCHECK below confirms it.
      session_->CloseActiveStream(stream_id_, OK);
      DCHECK(!weak_this);
      // |pending_recv_data_| is invalid at this point.
      break;
    }
  }
}
191 | |
// Serializes the pending request headers into a HEADERS frame. Called by
// HeadersBufferProducer once the session dequeues the write; consumes
// |request_headers_| and records |send_time_|.
std::unique_ptr<SpdySerializedFrame> SpdyStream::ProduceHeadersFrame() {
  CHECK_EQ(io_state_, STATE_IDLE);
  CHECK(request_headers_valid_);
  CHECK_GT(stream_id_, 0u);

  // Set FIN when no request body will follow the headers.
  SpdyControlFlags flags =
      (pending_send_status_ == NO_MORE_DATA_TO_SEND) ?
      CONTROL_FLAG_FIN : CONTROL_FLAG_NONE;
  std::unique_ptr<SpdySerializedFrame> frame(session_->CreateHeaders(
      stream_id_, priority_, flags, std::move(request_headers_),
      delegate_->source_dependency()));
  // |request_headers_| has been moved from; mark it invalid.
  request_headers_valid_ = false;
  send_time_ = base::TimeTicks::Now();
  return frame;
}
207 | |
208 void SpdyStream::DetachDelegate() { | |
209 DCHECK(!IsClosed()); | |
210 delegate_ = NULL; | |
211 Cancel(); | |
212 } | |
213 | |
// Applies a (possibly negative) adjustment to the send window, e.g. when
// the peer changes SETTINGS_INITIAL_WINDOW_SIZE. May unstall sending.
void SpdyStream::AdjustSendWindowSize(int32_t delta_window_size) {
  if (IsClosed())
    return;

  // Check for wraparound.
  if (send_window_size_ > 0) {
    DCHECK_LE(delta_window_size,
              std::numeric_limits<int32_t>::max() - send_window_size_);
  }
  if (send_window_size_ < 0) {
    DCHECK_GE(delta_window_size,
              std::numeric_limits<int32_t>::min() - send_window_size_);
  }
  send_window_size_ += delta_window_size;
  PossiblyResumeIfSendStalled();
}
230 | |
// Consume callback installed on outgoing DATA buffers; restores send-window
// credit for bytes that were discarded rather than written.
void SpdyStream::OnWriteBufferConsumed(
    size_t frame_payload_size,
    size_t consume_size,
    SpdyBuffer::ConsumeSource consume_source) {
  if (consume_source == SpdyBuffer::DISCARD) {
    // If we're discarding a frame or part of it, increase the send
    // window by the number of discarded bytes. (Although if we're
    // discarding part of a frame, it's probably because of a write
    // error and we'll be tearing down the stream soon.)
    size_t remaining_payload_bytes = std::min(consume_size, frame_payload_size);
    DCHECK_GT(remaining_payload_bytes, 0u);
    IncreaseSendWindowSize(static_cast<int32_t>(remaining_payload_bytes));
  }
  // For consumed bytes, the send window is increased when we receive
  // a WINDOW_UPDATE frame.
}
247 | |
// Handles a WINDOW_UPDATE from the peer (or discarded-write credit). Resets
// the stream on int32 overflow, per HTTP/2 flow-control rules; otherwise may
// unstall a send blocked on the window.
void SpdyStream::IncreaseSendWindowSize(int32_t delta_window_size) {
  DCHECK_GE(delta_window_size, 1);

  // Ignore late WINDOW_UPDATEs.
  if (IsClosed())
    return;

  if (send_window_size_ > 0) {
    // Check for overflow.
    int32_t max_delta_window_size =
        std::numeric_limits<int32_t>::max() - send_window_size_;
    if (delta_window_size > max_delta_window_size) {
      SpdyString desc = SpdyStringPrintf(
          "Received WINDOW_UPDATE [delta: %d] for stream %d overflows "
          "send_window_size_ [current: %d]",
          delta_window_size, stream_id_, send_window_size_);
      // Overflowing the window is a peer protocol violation.
      session_->ResetStream(stream_id_, ERROR_CODE_FLOW_CONTROL_ERROR, desc);
      return;
    }
  }

  send_window_size_ += delta_window_size;

  net_log_.AddEvent(
      NetLogEventType::HTTP2_STREAM_UPDATE_SEND_WINDOW,
      base::Bind(&NetLogSpdyStreamWindowUpdateCallback, stream_id_,
                 delta_window_size, send_window_size_));

  PossiblyResumeIfSendStalled();
}
278 | |
// Debits the send window for a DATA frame payload about to be written.
// Callers guarantee the window already covers |delta_window_size|.
void SpdyStream::DecreaseSendWindowSize(int32_t delta_window_size) {
  if (IsClosed())
    return;

  // We only call this method when sending a frame. Therefore,
  // |delta_window_size| should be within the valid frame size range.
  DCHECK_GE(delta_window_size, 1);
  DCHECK_LE(delta_window_size, kMaxSpdyFrameChunkSize);

  // |send_window_size_| should have been at least |delta_window_size| for
  // this call to happen.
  DCHECK_GE(send_window_size_, delta_window_size);

  send_window_size_ -= delta_window_size;

  net_log_.AddEvent(
      NetLogEventType::HTTP2_STREAM_UPDATE_SEND_WINDOW,
      base::Bind(&NetLogSpdyStreamWindowUpdateCallback, stream_id_,
                 -delta_window_size, send_window_size_));
}
299 | |
300 void SpdyStream::OnReadBufferConsumed( | |
301 size_t consume_size, | |
302 SpdyBuffer::ConsumeSource consume_source) { | |
303 DCHECK_GE(consume_size, 1u); | |
304 DCHECK_LE(consume_size, | |
305 static_cast<size_t>(std::numeric_limits<int32_t>::max())); | |
306 IncreaseRecvWindowSize(static_cast<int32_t>(consume_size)); | |
307 } | |
308 | |
// Credits the receive window after the delegate consumes data, and sends a
// WINDOW_UPDATE once more than half the maximum window is unacknowledged.
void SpdyStream::IncreaseRecvWindowSize(int32_t delta_window_size) {
  // By the time a read is processed by the delegate, this stream may
  // already be inactive.
  if (!session_->IsStreamActive(stream_id_))
    return;

  DCHECK_GE(unacked_recv_window_bytes_, 0);
  DCHECK_GE(recv_window_size_, unacked_recv_window_bytes_);
  DCHECK_GE(delta_window_size, 1);
  // Check for overflow.
  DCHECK_LE(delta_window_size,
            std::numeric_limits<int32_t>::max() - recv_window_size_);

  recv_window_size_ += delta_window_size;
  net_log_.AddEvent(
      NetLogEventType::HTTP2_STREAM_UPDATE_RECV_WINDOW,
      base::Bind(&NetLogSpdyStreamWindowUpdateCallback, stream_id_,
                 delta_window_size, recv_window_size_));

  // Batch WINDOW_UPDATEs: only notify the peer when enough bytes have
  // accumulated, to avoid a frame per read.
  unacked_recv_window_bytes_ += delta_window_size;
  if (unacked_recv_window_bytes_ > max_recv_window_size_ / 2) {
    session_->SendStreamWindowUpdate(
        stream_id_, static_cast<uint32_t>(unacked_recv_window_bytes_));
    unacked_recv_window_bytes_ = 0;
  }
}
335 | |
// Debits the receive window for incoming DATA/padding bytes. Resets the
// stream if the peer exceeds the window it was advertised.
void SpdyStream::DecreaseRecvWindowSize(int32_t delta_window_size) {
  DCHECK(session_->IsStreamActive(stream_id_));
  DCHECK_GE(delta_window_size, 1);

  // The receiving window size as the peer knows it is
  // |recv_window_size_ - unacked_recv_window_bytes_|, if more data are sent by
  // the peer, that means that the receive window is not being respected.
  if (delta_window_size > recv_window_size_ - unacked_recv_window_bytes_) {
    // May delete |this|, hence callers snapshot a WeakPtr first.
    session_->ResetStream(
        stream_id_, ERROR_CODE_FLOW_CONTROL_ERROR,
        "delta_window_size is " + base::IntToString(delta_window_size) +
            " in DecreaseRecvWindowSize, which is larger than the receive " +
            "window size of " + base::IntToString(recv_window_size_));
    return;
  }

  recv_window_size_ -= delta_window_size;
  net_log_.AddEvent(
      NetLogEventType::HTTP2_STREAM_UPDATE_RECV_WINDOW,
      base::Bind(&NetLogSpdyStreamWindowUpdateCallback, stream_id_,
                 -delta_window_size, recv_window_size_));
}
358 | |
359 int SpdyStream::GetPeerAddress(IPEndPoint* address) const { | |
360 return session_->GetPeerAddress(address); | |
361 } | |
362 | |
363 int SpdyStream::GetLocalAddress(IPEndPoint* address) const { | |
364 return session_->GetLocalAddress(address); | |
365 } | |
366 | |
367 bool SpdyStream::WasEverUsed() const { | |
368 return session_->WasEverUsed(); | |
369 } | |
370 | |
371 base::Time SpdyStream::GetRequestTime() const { | |
372 return request_time_; | |
373 } | |
374 | |
375 void SpdyStream::SetRequestTime(base::Time t) { | |
376 request_time_ = t; | |
377 } | |
378 | |
// Processes a header block from the peer. The first block is the response
// headers (informational 1xx blocks are skipped), the second is trailers,
// and any further block is a protocol error. Several branches reset the
// stream via the session on violations.
void SpdyStream::OnHeadersReceived(const SpdyHeaderBlock& response_headers,
                                   base::Time response_time,
                                   base::TimeTicks recv_first_byte_time) {
  switch (response_state_) {
    case READY_FOR_HEADERS:
      // No header block has been received yet.
      DCHECK(response_headers_.empty());

      {
        // A :status pseudo-header is mandatory and must parse as an int.
        SpdyHeaderBlock::const_iterator it = response_headers.find(":status");
        if (it == response_headers.end()) {
          const SpdyString error("Response headers do not include :status.");
          LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
          session_->ResetStream(stream_id_, ERROR_CODE_PROTOCOL_ERROR, error);
          return;
        }

        int status;
        if (!StringToInt(it->second, &status)) {
          const SpdyString error("Cannot parse :status.");
          LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
          session_->ResetStream(stream_id_, ERROR_CODE_PROTOCOL_ERROR, error);
          return;
        }

        // Ignore informational headers.
        // TODO(bnc): Add support for 103 Early Hints, https://crbug.com/671310.
        if (status / 100 == 1) {
          return;
        }
      }

      response_state_ = READY_FOR_DATA_OR_TRAILERS;

      switch (type_) {
        case SPDY_BIDIRECTIONAL_STREAM:
        case SPDY_REQUEST_RESPONSE_STREAM:
          // A bidirectional stream or a request/response stream is ready for
          // the response headers only after request headers are sent.
          if (io_state_ == STATE_IDLE) {
            const SpdyString error("Response received before request sent.");
            LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
            session_->ResetStream(stream_id_, ERROR_CODE_PROTOCOL_ERROR, error);
            return;
          }
          break;

        case SPDY_PUSH_STREAM:
          // Push streams transition to a locally half-closed state upon
          // headers. We must continue to buffer data while waiting for a call
          // to SetDelegate() (which may not ever happen).
          DCHECK_EQ(io_state_, STATE_RESERVED_REMOTE);
          if (!delegate_) {
            io_state_ = STATE_HALF_CLOSED_LOCAL_UNCLAIMED;
          } else {
            io_state_ = STATE_HALF_CLOSED_LOCAL;
          }
          break;
      }

      DCHECK_NE(io_state_, STATE_IDLE);

      response_time_ = response_time;
      recv_first_byte_time_ = recv_first_byte_time;
      // Validates and stores the headers; notifies the delegate if attached.
      SaveResponseHeaders(response_headers);

      break;

    case READY_FOR_DATA_OR_TRAILERS:
      // Second header block is trailers.
      if (type_ == SPDY_PUSH_STREAM) {
        const SpdyString error("Trailers not supported for push stream.");
        LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
        session_->ResetStream(stream_id_, ERROR_CODE_PROTOCOL_ERROR, error);
        return;
      }

      response_state_ = TRAILERS_RECEIVED;
      delegate_->OnTrailers(response_headers);
      break;

    case TRAILERS_RECEIVED:
      // No further header blocks are allowed after trailers.
      const SpdyString error("Header block received after trailers.");
      LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
      session_->ResetStream(stream_id_, ERROR_CODE_PROTOCOL_ERROR, error);
      break;
  }
}
468 | |
// Records the request headers carried by a PUSH_PROMISE and moves the stream
// into the reserved (remote) state. No delegate exists yet at this point.
void SpdyStream::OnPushPromiseHeadersReceived(SpdyHeaderBlock headers) {
  CHECK(!request_headers_valid_);
  CHECK_EQ(io_state_, STATE_IDLE);
  CHECK_EQ(type_, SPDY_PUSH_STREAM);
  DCHECK(!delegate_);

  io_state_ = STATE_RESERVED_REMOTE;
  request_headers_ = std::move(headers);
  request_headers_valid_ = true;
  // Cache the URL derived from the promised request headers for matching.
  url_from_header_block_ = GetUrlFromHeaderBlock(request_headers_);
}
480 | |
// Handles an incoming DATA frame payload; a null |buffer| signals the end of
// the stream (FIN). Buffers data for unclaimed push streams, enforces the
// headers-before-data and no-data-after-trailers rules, and forwards to the
// delegate otherwise. Several calls below may delete |this|.
void SpdyStream::OnDataReceived(std::unique_ptr<SpdyBuffer> buffer) {
  DCHECK(session_->IsStreamActive(stream_id_));

  if (response_state_ == READY_FOR_HEADERS) {
    const SpdyString error("DATA received before headers.");
    LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
    session_->ResetStream(stream_id_, ERROR_CODE_PROTOCOL_ERROR, error);
    return;
  }

  // A FIN (null buffer) after trailers is still legal; payload is not.
  if (response_state_ == TRAILERS_RECEIVED && buffer) {
    const SpdyString error("DATA received after trailers.");
    LogStreamError(ERR_SPDY_PROTOCOL_ERROR, error);
    session_->ResetStream(stream_id_, ERROR_CODE_PROTOCOL_ERROR, error);
    return;
  }

  // Track our bandwidth.
  recv_bytes_ += buffer ? buffer->GetRemainingSize() : 0;
  recv_last_byte_time_ = base::TimeTicks::Now();

  // If we're still buffering data for a push stream, we will do the check for
  // data received with incomplete headers in PushedStreamReplay().
  if (io_state_ == STATE_HALF_CLOSED_LOCAL_UNCLAIMED) {
    DCHECK_EQ(type_, SPDY_PUSH_STREAM);
    // It should be valid for this to happen in the server push case.
    // We'll return received data when delegate gets attached to the stream.
    if (buffer) {
      pending_recv_data_.push_back(std::move(buffer));
    } else {
      // A null entry marks end-of-stream for the eventual replay.
      pending_recv_data_.push_back(NULL);
      // Note: we leave the stream open in the session until the stream
      // is claimed.
    }
    return;
  }

  CHECK(!IsClosed());

  if (!buffer) {
    // FIN received: half-close the remote side, or fully close if the local
    // side already finished sending.
    if (io_state_ == STATE_OPEN) {
      io_state_ = STATE_HALF_CLOSED_REMOTE;
    } else if (io_state_ == STATE_HALF_CLOSED_LOCAL) {
      io_state_ = STATE_CLOSED;
      // Deletes |this|.
      session_->CloseActiveStream(stream_id_, OK);
    } else {
      NOTREACHED() << io_state_;
    }
    return;
  }

  size_t length = buffer->GetRemainingSize();
  DCHECK_LE(length, session_->GetDataFrameMaximumPayload());
  base::WeakPtr<SpdyStream> weak_this = GetWeakPtr();
  // May close the stream.
  DecreaseRecvWindowSize(static_cast<int32_t>(length));
  if (!weak_this)
    return;
  // Restore receive-window credit once the delegate consumes the buffer.
  buffer->AddConsumeCallback(
      base::Bind(&SpdyStream::OnReadBufferConsumed, GetWeakPtr()));

  // May close |this|.
  delegate_->OnDataReceived(std::move(buffer));
}
546 | |
// Accounts for DATA frame padding: padding counts against flow control when
// received and is acknowledged immediately since it is discarded.
void SpdyStream::OnPaddingConsumed(size_t len) {
  // Decrease window size because padding bytes are received.
  // Increase window size because padding bytes are consumed (by discarding).
  // Net result: |unacked_recv_window_bytes_| increases by |len|,
  // |recv_window_size_| does not change.
  base::WeakPtr<SpdyStream> weak_this = GetWeakPtr();
  // May close the stream.
  DecreaseRecvWindowSize(static_cast<int32_t>(len));
  if (!weak_this)
    return;
  IncreaseRecvWindowSize(static_cast<int32_t>(len));
}
559 | |
// Called by the session when a frame for this stream has been written.
// Advances the I/O state machine, notifies the delegate (which must not
// delete |this|; enforced with |write_handler_guard_|), and closes the
// stream if both sides are now done.
void SpdyStream::OnFrameWriteComplete(SpdyFrameType frame_type,
                                      size_t frame_size) {
  // PRIORITY writes are allowed at any time and do not trigger a state update.
  if (frame_type == SpdyFrameType::PRIORITY) {
    return;
  }

  // Push streams never write HEADERS/DATA locally.
  DCHECK_NE(type_, SPDY_PUSH_STREAM);
  CHECK(frame_type == SpdyFrameType::HEADERS ||
        frame_type == SpdyFrameType::DATA)
      << frame_type;

  int result = (frame_type == SpdyFrameType::HEADERS) ? OnHeadersSent()
                                                      : OnDataSent(frame_size);
  if (result == ERR_IO_PENDING) {
    // The write operation hasn't completed yet.
    return;
  }

  if (pending_send_status_ == NO_MORE_DATA_TO_SEND) {
    // Local side is done sending: half-close, or fully close if the remote
    // side already finished.
    if (io_state_ == STATE_OPEN) {
      io_state_ = STATE_HALF_CLOSED_LOCAL;
    } else if (io_state_ == STATE_HALF_CLOSED_REMOTE) {
      io_state_ = STATE_CLOSED;
    } else {
      NOTREACHED() << io_state_;
    }
  }
  // Notify delegate of write completion. Must not destroy |this|.
  CHECK(delegate_);
  {
    base::WeakPtr<SpdyStream> weak_this = GetWeakPtr();
    write_handler_guard_ = true;
    if (frame_type == SpdyFrameType::HEADERS) {
      delegate_->OnHeadersSent();
    } else {
      delegate_->OnDataSent();
    }
    // The CHECK fires if the delegate violated the no-destroy contract.
    CHECK(weak_this);
    write_handler_guard_ = false;
  }

  if (io_state_ == STATE_CLOSED) {
    // Deletes |this|.
    session_->CloseActiveStream(stream_id_, OK);
  }
}
607 | |
608 int SpdyStream::OnHeadersSent() { | |
609 CHECK_EQ(io_state_, STATE_IDLE); | |
610 CHECK_NE(stream_id_, 0u); | |
611 | |
612 io_state_ = STATE_OPEN; | |
613 return OK; | |
614 } | |
615 | |
616 int SpdyStream::OnDataSent(size_t frame_size) { | |
617 CHECK(io_state_ == STATE_OPEN || | |
618 io_state_ == STATE_HALF_CLOSED_REMOTE) << io_state_; | |
619 | |
620 size_t frame_payload_size = frame_size - session_->GetDataFrameMinimumSize(); | |
621 | |
622 CHECK_GE(frame_size, session_->GetDataFrameMinimumSize()); | |
623 CHECK_LE(frame_payload_size, session_->GetDataFrameMaximumPayload()); | |
624 | |
625 send_bytes_ += frame_payload_size; | |
626 | |
627 // If more data is available to send, dispatch it and | |
628 // return that the write operation is still ongoing. | |
629 pending_send_data_->DidConsume(frame_payload_size); | |
630 if (pending_send_data_->BytesRemaining() > 0) { | |
631 QueueNextDataFrame(); | |
632 return ERR_IO_PENDING; | |
633 } else { | |
634 pending_send_data_ = NULL; | |
635 return OK; | |
636 } | |
637 } | |
638 | |
// Emits an HTTP2_STREAM_ERROR NetLog event. |description| is passed to the
// callback by pointer; this assumes AddEvent() runs the callback before
// returning — NOTE(review): confirm against NetLog semantics.
void SpdyStream::LogStreamError(int status, const SpdyString& description) {
  net_log_.AddEvent(NetLogEventType::HTTP2_STREAM_ERROR,
                    base::Bind(&NetLogSpdyStreamErrorCallback, stream_id_,
                               status, &description));
}
644 | |
// Final notification before the session destroys the stream. Normalizes the
// status, informs the delegate exactly once, and clears |stream_id_| last so
// the delegate's OnClose() can still read it.
void SpdyStream::OnClose(int status) {
  // In most cases, the stream should already be CLOSED. The exception is when a
  // SpdySession is shutting down while the stream is in an intermediate state.
  io_state_ = STATE_CLOSED;
  if (status == ERR_SPDY_RST_STREAM_NO_ERROR_RECEIVED) {
    // A NO_ERROR reset before any response headers is still a protocol
    // error; after headers it is treated as a clean close.
    if (response_state_ == READY_FOR_HEADERS) {
      status = ERR_SPDY_PROTOCOL_ERROR;
    } else {
      status = OK;
    }
  }
  response_status_ = status;
  // Detach the delegate before calling it, so re-entrant calls see no
  // delegate attached.
  Delegate* delegate = delegate_;
  delegate_ = NULL;
  if (delegate)
    delegate->OnClose(status);
  // Unset |stream_id_| last so that the delegate can look it up.
  stream_id_ = 0;
}
664 | |
// Aborts the stream with RST_STREAM(CANCEL) if it is already registered with
// the session, or drops it from the created-streams set otherwise. Deletes
// |this| in either case.
void SpdyStream::Cancel() {
  // We may be called again from a delegate's OnClose().
  if (io_state_ == STATE_CLOSED)
    return;

  if (stream_id_ != 0) {
    session_->ResetStream(stream_id_, ERROR_CODE_CANCEL, SpdyString());
  } else {
    session_->CloseCreatedStream(GetWeakPtr(), ERROR_CODE_CANCEL);
  }
  // |this| is invalid at this point.
}
677 | |
// Cleanly closes the stream (status OK) via the session; unlike Cancel(),
// no RST_STREAM is implied here. Deletes |this|.
void SpdyStream::Close() {
  // We may be called again from a delegate's OnClose().
  if (io_state_ == STATE_CLOSED)
    return;

  if (stream_id_ != 0) {
    session_->CloseActiveStream(stream_id_, OK);
  } else {
    session_->CloseCreatedStream(GetWeakPtr(), OK);
  }
  // |this| is invalid at this point.
}
690 | |
691 base::WeakPtr<SpdyStream> SpdyStream::GetWeakPtr() { | |
692 return weak_ptr_factory_.GetWeakPtr(); | |
693 } | |
694 | |
// Queues the request HEADERS frame for writing. |send_status| indicates
// whether a request body will follow (controls the FIN flag set later in
// ProduceHeadersFrame()). Always returns ERR_IO_PENDING; completion is
// reported via Delegate::OnHeadersSent().
int SpdyStream::SendRequestHeaders(SpdyHeaderBlock request_headers,
                                   SpdySendStatus send_status) {
  CHECK_NE(type_, SPDY_PUSH_STREAM);
  CHECK_EQ(pending_send_status_, MORE_DATA_TO_SEND);
  CHECK(!request_headers_valid_);
  CHECK(!pending_send_data_.get());
  CHECK_EQ(io_state_, STATE_IDLE);
  request_headers_ = std::move(request_headers);
  request_headers_valid_ = true;
  url_from_header_block_ = GetUrlFromHeaderBlock(request_headers_);
  pending_send_status_ = send_status;
  // Serialization is deferred to HeadersBufferProducer at write time.
  session_->EnqueueStreamWrite(GetWeakPtr(), SpdyFrameType::HEADERS,
                               std::unique_ptr<SpdyBufferProducer>(
                                   new HeadersBufferProducer(GetWeakPtr())));
  return ERR_IO_PENDING;
}
711 | |
// Starts sending |length| bytes of request body from |data|. Only one send
// may be in flight at a time; |send_status| says whether this is the final
// chunk (FIN). Completion is reported via Delegate::OnDataSent().
void SpdyStream::SendData(IOBuffer* data,
                          int length,
                          SpdySendStatus send_status) {
  CHECK_NE(type_, SPDY_PUSH_STREAM);
  CHECK_EQ(pending_send_status_, MORE_DATA_TO_SEND);
  CHECK(io_state_ == STATE_OPEN ||
        io_state_ == STATE_HALF_CLOSED_REMOTE) << io_state_;
  CHECK(!pending_send_data_.get());
  pending_send_data_ = new DrainableIOBuffer(data, length);
  pending_send_status_ = send_status;
  QueueNextDataFrame();
}
724 | |
725 bool SpdyStream::GetSSLInfo(SSLInfo* ssl_info) const { | |
726 return session_->GetSSLInfo(ssl_info); | |
727 } | |
728 | |
729 bool SpdyStream::WasAlpnNegotiated() const { | |
730 return session_->WasAlpnNegotiated(); | |
731 } | |
732 | |
733 NextProto SpdyStream::GetNegotiatedProtocol() const { | |
734 return session_->GetNegotiatedProtocol(); | |
735 } | |
736 | |
// Resumes a flow-control-stalled send once both the stream window and the
// session window have room again.
void SpdyStream::PossiblyResumeIfSendStalled() {
  // A locally closed stream has nothing left to send.
  if (IsLocallyClosed()) {
    return;
  }
  if (send_stalled_by_flow_control_ && !session_->IsSendStalled() &&
      send_window_size_ > 0) {
    net_log_.AddEvent(NetLogEventType::HTTP2_STREAM_FLOW_CONTROL_UNSTALLED,
                      NetLog::IntCallback("stream_id", stream_id_));
    send_stalled_by_flow_control_ = false;
    QueueNextDataFrame();
  }
}
749 | |
750 bool SpdyStream::IsClosed() const { | |
751 return io_state_ == STATE_CLOSED; | |
752 } | |
753 | |
754 bool SpdyStream::IsLocallyClosed() const { | |
755 return io_state_ == STATE_HALF_CLOSED_LOCAL_UNCLAIMED || | |
756 io_state_ == STATE_HALF_CLOSED_LOCAL || | |
757 io_state_ == STATE_CLOSED; | |
758 } | |
759 | |
760 bool SpdyStream::IsIdle() const { | |
761 return io_state_ == STATE_IDLE; | |
762 } | |
763 | |
764 bool SpdyStream::IsOpen() const { | |
765 return io_state_ == STATE_OPEN; | |
766 } | |
767 | |
768 bool SpdyStream::IsReservedRemote() const { | |
769 return io_state_ == STATE_RESERVED_REMOTE; | |
770 } | |
771 | |
772 void SpdyStream::AddRawReceivedBytes(size_t received_bytes) { | |
773 raw_received_bytes_ += received_bytes; | |
774 } | |
775 | |
776 void SpdyStream::AddRawSentBytes(size_t sent_bytes) { | |
777 raw_sent_bytes_ += sent_bytes; | |
778 } | |
779 | |
// Fills in load timing from the session; fails before the stream has an id.
// For push streams, also records push start/end times.
bool SpdyStream::GetLoadTimingInfo(LoadTimingInfo* load_timing_info) const {
  if (stream_id_ == 0)
    return false;
  bool result = session_->GetLoadTimingInfo(stream_id_, load_timing_info);
  if (type_ == SPDY_PUSH_STREAM) {
    load_timing_info->push_start = recv_first_byte_time_;
    // The push is fully received when the stream is closed, or when the
    // buffered data ends with the null end-of-stream marker.
    bool done_receiving = IsClosed() || (!pending_recv_data_.empty() &&
                                         !pending_recv_data_.back());
    if (done_receiving)
      load_timing_info->push_end = recv_last_byte_time_;
  }
  return result;
}
793 | |
// Rough per-stream heap footprint for memory-dump instrumentation.
size_t SpdyStream::EstimateMemoryUsage() const {
  // TODO(xunjieli): https://crbug.com/669108. Estimate |pending_send_data_|
  // once scoped_refptr support is in.
  return SpdyEstimateMemoryUsage(url_) +
         SpdyEstimateMemoryUsage(request_headers_) +
         SpdyEstimateMemoryUsage(url_from_header_block_) +
         SpdyEstimateMemoryUsage(pending_recv_data_) +
         SpdyEstimateMemoryUsage(response_headers_);
}
803 | |
// Records per-stream latency and byte-count UMA histograms at destruction.
// Skipped unless enough timing data was collected for meaningful metrics.
void SpdyStream::UpdateHistograms() {
  // We need at least the receive timers to be filled in, as otherwise
  // metrics can be bogus.
  if (recv_first_byte_time_.is_null() || recv_last_byte_time_.is_null())
    return;

  base::TimeTicks effective_send_time;
  if (type_ == SPDY_PUSH_STREAM) {
    // Push streams shouldn't have |send_time_| filled in.
    DCHECK(send_time_.is_null());
    effective_send_time = recv_first_byte_time_;
  } else {
    // For non-push streams, we also need |send_time_| to be filled
    // in.
    if (send_time_.is_null())
      return;
    effective_send_time = send_time_;
  }

  UMA_HISTOGRAM_TIMES("Net.SpdyStreamTimeToFirstByte",
                      recv_first_byte_time_ - effective_send_time);
  UMA_HISTOGRAM_TIMES("Net.SpdyStreamDownloadTime",
                      recv_last_byte_time_ - recv_first_byte_time_);
  UMA_HISTOGRAM_TIMES("Net.SpdyStreamTime",
                      recv_last_byte_time_ - effective_send_time);

  UMA_HISTOGRAM_COUNTS("Net.SpdySendBytes", send_bytes_);
  UMA_HISTOGRAM_COUNTS("Net.SpdyRecvBytes", recv_bytes_);
}
833 | |
// Slices the next chunk of |pending_send_data_| into a DATA frame, debits
// the send window for its payload, and queues it with the session. If the
// session cannot produce a buffer now (flow-control stalled), this returns
// and PossiblyResumeIfSendStalled() will retry later.
void SpdyStream::QueueNextDataFrame() {
  // Until the request has been completely sent, we cannot be sure
  // that our stream_id is correct.
  CHECK(io_state_ == STATE_OPEN ||
        io_state_ == STATE_HALF_CLOSED_REMOTE) << io_state_;
  CHECK_GT(stream_id_, 0u);
  CHECK(pending_send_data_.get());
  // Only the final frame may have a length of 0.
  if (pending_send_status_ == NO_MORE_DATA_TO_SEND) {
    CHECK_GE(pending_send_data_->BytesRemaining(), 0);
  } else {
    CHECK_GT(pending_send_data_->BytesRemaining(), 0);
  }

  // FIN goes on the last frame of the request body.
  SpdyDataFlags flags =
      (pending_send_status_ == NO_MORE_DATA_TO_SEND) ?
      DATA_FLAG_FIN : DATA_FLAG_NONE;
  std::unique_ptr<SpdyBuffer> data_buffer(
      session_->CreateDataBuffer(stream_id_, pending_send_data_.get(),
                                 pending_send_data_->BytesRemaining(), flags));
  // We'll get called again by PossiblyResumeIfSendStalled().
  if (!data_buffer)
    return;

  DCHECK_GE(data_buffer->GetRemainingSize(),
            session_->GetDataFrameMinimumSize());
  size_t payload_size =
      data_buffer->GetRemainingSize() - session_->GetDataFrameMinimumSize();
  DCHECK_LE(payload_size, session_->GetDataFrameMaximumPayload());

  // Send window size is based on payload size, so nothing to do if this is
  // just a FIN with no payload.
  if (payload_size != 0) {
    DecreaseSendWindowSize(static_cast<int32_t>(payload_size));
    // This currently isn't strictly needed, since write frames are
    // discarded only if the stream is about to be closed. But have it
    // here anyway just in case this changes.
    data_buffer->AddConsumeCallback(base::Bind(
        &SpdyStream::OnWriteBufferConsumed, GetWeakPtr(), payload_size));
  }

  session_->EnqueueStreamWrite(
      GetWeakPtr(), SpdyFrameType::DATA,
      std::unique_ptr<SpdyBufferProducer>(
          new SimpleBufferProducer(std::move(data_buffer))));
}
880 | |
// Validates and stores the response header block, then notifies the
// delegate if one is attached. Rejects (via stream reset) headers that are
// forbidden in HTTP/2: transfer-encoding and any uppercase header name.
void SpdyStream::SaveResponseHeaders(const SpdyHeaderBlock& response_headers) {
  DCHECK(response_headers_.empty());
  if (response_headers.find("transfer-encoding") != response_headers.end()) {
    session_->ResetStream(stream_id_, ERROR_CODE_PROTOCOL_ERROR,
                          "Received transfer-encoding header");
    return;
  }

  for (SpdyHeaderBlock::const_iterator it = response_headers.begin();
       it != response_headers.end(); ++it) {
    // Disallow uppercase headers.
    if (ContainsUppercaseAscii(it->first)) {
      session_->ResetStream(
          stream_id_, ERROR_CODE_PROTOCOL_ERROR,
          "Upper case characters in header: " + it->first.as_string());
      return;
    }

    response_headers_.insert(*it);
  }

  // If delegate is not yet attached, OnHeadersReceived() will be called after
  // the delegate gets attached to the stream.
  if (delegate_)
    delegate_->OnHeadersReceived(response_headers_);
}
907 | |
908 #define STATE_CASE(s) \ | |
909 case s: \ | |
910 description = SpdyStringPrintf("%s (0x%08X)", #s, s); \ | |
911 break | |
912 | |
913 SpdyString SpdyStream::DescribeState(State state) { | |
914 SpdyString description; | |
915 switch (state) { | |
916 STATE_CASE(STATE_IDLE); | |
917 STATE_CASE(STATE_OPEN); | |
918 STATE_CASE(STATE_HALF_CLOSED_LOCAL_UNCLAIMED); | |
919 STATE_CASE(STATE_HALF_CLOSED_LOCAL); | |
920 STATE_CASE(STATE_CLOSED); | |
921 default: | |
922 description = SpdyStringPrintf("Unknown state 0x%08X (%u)", state, state); | |
923 break; | |
924 } | |
925 return description; | |
926 } | |
927 | |
928 #undef STATE_CASE | |
929 | |
930 } // namespace net | |
OLD | NEW |