OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #include "net/spdy/bidirectional_spdy_stream.h" | |
6 | |
7 #include "base/memory/scoped_ptr.h" | |
8 #include "base/time/time.h" | |
9 #include "net/base/request_priority.h" | |
10 #include "net/spdy/spdy_buffer.h" | |
11 #include "net/spdy/spdy_header_block.h" | |
12 #include "net/spdy/spdy_http_utils.h" | |
13 #include "net/spdy/spdy_stream.h" | |
14 | |
15 namespace net { | |
16 | |
17 const base::TimeDelta kBufferTime = base::TimeDelta::FromMilliseconds(1); | |
18 | |
19 BidirectionalSpdyStream::BidirectionalSpdyStream( | |
20 const base::WeakPtr<SpdySession>& spdy_session) | |
21 : spdy_session_(spdy_session), | |
22 stream_closed_(false), | |
23 closed_stream_status_(ERR_FAILED), | |
24 buffered_read_callback_pending_(false), | |
25 more_read_data_pending_(false), | |
26 weak_factory_(this) {} | |
27 | |
28 BidirectionalSpdyStream::~BidirectionalSpdyStream() { | |
29 if (stream_.get()) { | |
30 stream_->DetachDelegate(); | |
31 DCHECK(!stream_.get()); | |
32 } | |
33 } | |
34 | |
35 void BidirectionalSpdyStream::Start(const HttpRequestInfo* request_info, | |
36 RequestPriority priority, | |
37 const BoundNetLog& net_log, | |
38 BidirectionalStream::Delegate* delegate) { | |
39 delegate_ = delegate; | |
40 DCHECK(!stream_); | |
41 if (!spdy_session_) | |
42 delegate_->OnFailed(ERR_CONNECTION_CLOSED); | |
43 | |
44 request_info_ = request_info; | |
45 | |
46 int rv = stream_request_.StartRequest( | |
47 SPDY_REQUEST_RESPONSE_STREAM, spdy_session_, request_info_->url, priority, | |
48 net_log, base::Bind(&BidirectionalSpdyStream::OnStreamInitialized, | |
49 weak_factory_.GetWeakPtr())); | |
50 if (rv != ERR_IO_PENDING) | |
51 OnStreamInitialized(rv); | |
52 } | |
53 | |
54 int BidirectionalSpdyStream::ReadData(IOBuffer* buf, int buf_len) { | |
55 if (stream_.get()) | |
56 CHECK(!stream_->IsIdle()); | |
57 | |
58 CHECK(buf); | |
59 CHECK(buf_len); | |
60 if (!stream_closed_) | |
61 CHECK(stream_); | |
62 | |
63 // If there is data buffered, complete the IO immediately. | |
64 if (!data_queue_.IsEmpty()) { | |
65 return data_queue_.Dequeue(buf->data(), buf_len); | |
66 } else if (stream_closed_) { | |
67 return closed_stream_status_; | |
68 } | |
69 user_buffer_ = buf; | |
70 user_buffer_len_ = buf_len; | |
71 return ERR_IO_PENDING; | |
72 } | |
73 | |
74 void BidirectionalSpdyStream::SendData(IOBuffer* data, | |
75 int length, | |
76 bool end_stream) { | |
77 stream_->SendData(data, length, | |
78 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND); | |
79 } | |
80 | |
81 void BidirectionalSpdyStream::OnRequestHeadersSent() { | |
82 delegate_->OnRequestHeadersSent(); | |
83 } | |
84 | |
85 SpdyResponseHeadersStatus BidirectionalSpdyStream::OnResponseHeadersUpdated( | |
86 const SpdyHeaderBlock& response_headers) { | |
87 delegate_->OnHeaders(response_headers); | |
88 return RESPONSE_HEADERS_ARE_COMPLETE; | |
89 } | |
90 | |
91 void BidirectionalSpdyStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) { | |
92 DCHECK(stream_); | |
93 data_queue_.Enqueue(buffer.Pass()); | |
94 if (user_buffer_.get()) { | |
95 // Handing small chunks of data to the caller creates measurable overhead. | |
96 // So buffer data in short time-spans and send a single read notification. | |
97 ScheduleBufferedReadCallback(); | |
98 } | |
99 } | |
100 | |
101 void BidirectionalSpdyStream::OnDataSent() { | |
102 delegate_->OnDataSent(); | |
103 } | |
104 | |
105 void BidirectionalSpdyStream::OnTrailers(const SpdyHeaderBlock& trailers) { | |
106 delegate_->OnTrailers(trailers); | |
107 } | |
108 | |
109 void BidirectionalSpdyStream::OnClose(int status) { | |
110 if (stream_.get()) { | |
111 stream_closed_ = true; | |
112 closed_stream_status_ = status; | |
113 } | |
114 | |
115 stream_.reset(); | |
116 // Complete remaining buffered read. | |
mef
2015/10/07 23:44:56
what if there is no pending read?
xunjieli
2015/10/19 21:07:46
Done. Good catch! I added null check to handle thi
| |
117 if (status == OK) { | |
118 DoBufferedReadCallback(); | |
119 return; | |
120 } | |
121 | |
122 delegate_->OnClose(status); | |
123 } | |
124 | |
125 void BidirectionalSpdyStream::SendRequestHeaders() { | |
126 stream_->SetDelegate(this); | |
127 scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock); | |
128 CreateSpdyHeadersFromHttpRequest(*request_info_, request_info_->extra_headers, | |
129 stream_->GetProtocolVersion(), true, | |
130 headers.get()); | |
131 bool end_stream = (request_info_->method == "GET"); | |
132 stream_->SendRequestHeaders( | |
133 headers.Pass(), end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND); | |
134 } | |
135 | |
136 void BidirectionalSpdyStream::OnStreamInitialized(int rv) { | |
137 DCHECK_NE(ERR_IO_PENDING, rv); | |
138 if (rv == OK) { | |
139 stream_ = stream_request_.ReleaseStream(); | |
140 SendRequestHeaders(); | |
141 return; | |
142 } | |
143 delegate_->OnFailed(rv); | |
144 } | |
145 | |
146 void BidirectionalSpdyStream::ScheduleBufferedReadCallback() { | |
147 // If there is already a scheduled DoBufferedReadCallback, don't issue | |
148 // another one. Mark that we have received more data and return. | |
149 if (buffered_read_callback_pending_) { | |
150 more_read_data_pending_ = true; | |
151 return; | |
152 } | |
153 | |
154 more_read_data_pending_ = false; | |
155 buffered_read_callback_pending_ = true; | |
156 base::ThreadTaskRunnerHandle::Get()->PostDelayedTask( | |
157 FROM_HERE, base::Bind(&BidirectionalSpdyStream::DoBufferedReadCallback, | |
158 weak_factory_.GetWeakPtr()), | |
159 kBufferTime); | |
160 } | |
161 | |
162 void BidirectionalSpdyStream::DoBufferedReadCallback() { | |
163 buffered_read_callback_pending_ = false; | |
164 // If the stream errored out, do not complete the read. | |
165 if (!stream_ && !stream_closed_) | |
166 return; | |
167 if (stream_closed_ && closed_stream_status_ != OK) | |
168 return; | |
169 | |
170 // When |more_read_data_pending_| is true, it means that more data has arrived | |
171 // since started waiting. Wait a little longer and continue to buffer. | |
172 if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) { | |
173 ScheduleBufferedReadCallback(); | |
174 return; | |
175 } | |
176 | |
177 if (!user_buffer_.get()) | |
178 return; | |
179 | |
180 int rv = ReadData(user_buffer_.get(), user_buffer_len_); | |
181 DCHECK_NE(ERR_IO_PENDING, rv); | |
182 delegate_->OnReadCompleted(rv); | |
183 if (data_queue_.IsEmpty() && stream_closed_) | |
184 delegate_->OnClose(closed_stream_status_); | |
185 } | |
186 | |
187 bool BidirectionalSpdyStream::ShouldWaitForMoreBufferedData() const { | |
188 if (stream_closed_) | |
189 return false; | |
190 DCHECK_GT(user_buffer_len_, 0); | |
191 return data_queue_.GetTotalSize() < static_cast<size_t>(user_buffer_len_); | |
192 } | |
193 | |
194 } // namespace net | |
OLD | NEW |