OLD | NEW |
---|---|
1 // Copyright 2016 The Chromium Authors. All rights reserved. | 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "net/quic/bidirectional_stream_quic_impl.h" | 5 #include "net/quic/bidirectional_stream_quic_impl.h" |
6 | 6 |
7 #include "base/bind.h" | 7 #include "base/bind.h" |
8 #include "base/location.h" | 8 #include "base/location.h" |
9 #include "base/logging.h" | 9 #include "base/logging.h" |
10 #include "base/timer/timer.h" | 10 #include "base/timer/timer.h" |
(...skipping 57 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
68 weak_factory_.GetWeakPtr())); | 68 weak_factory_.GetWeakPtr())); |
69 if (rv == OK) { | 69 if (rv == OK) { |
70 OnStreamReady(rv); | 70 OnStreamReady(rv); |
71 } else if (!was_handshake_confirmed_) { | 71 } else if (!was_handshake_confirmed_) { |
72 NotifyError(ERR_QUIC_HANDSHAKE_FAILED); | 72 NotifyError(ERR_QUIC_HANDSHAKE_FAILED); |
73 } | 73 } |
74 } | 74 } |
75 | 75 |
76 void BidirectionalStreamQuicImpl::SendRequestHeaders() { | 76 void BidirectionalStreamQuicImpl::SendRequestHeaders() { |
77 DCHECK(!has_sent_headers_); | 77 DCHECK(!has_sent_headers_); |
78 DCHECK(stream_); | 78 if (!stream_) { |
79 LOG(ERROR) | |
80 << "Trying to send request headers after stream has been destroyed."; | |
81 base::ThreadTaskRunnerHandle::Get()->PostTask( | |
82 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::NotifyError, | |
83 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED)); | |
84 return; | |
85 } | |
79 | 86 |
80 SpdyHeaderBlock headers; | 87 SpdyHeaderBlock headers; |
81 HttpRequestInfo http_request_info; | 88 HttpRequestInfo http_request_info; |
82 http_request_info.url = request_info_->url; | 89 http_request_info.url = request_info_->url; |
83 http_request_info.method = request_info_->method; | 90 http_request_info.method = request_info_->method; |
84 http_request_info.extra_headers = request_info_->extra_headers; | 91 http_request_info.extra_headers = request_info_->extra_headers; |
85 | 92 |
86 CreateSpdyHeadersFromHttpRequest(http_request_info, | 93 CreateSpdyHeadersFromHttpRequest(http_request_info, |
87 http_request_info.extra_headers, HTTP2, true, | 94 http_request_info.extra_headers, HTTP2, true, |
88 &headers); | 95 &headers); |
(...skipping 23 matching lines...) Expand all Loading... | |
112 // Read will complete asynchronously and Delegate::OnReadCompleted will be | 119 // Read will complete asynchronously and Delegate::OnReadCompleted will be |
113 // called upon completion. | 120 // called upon completion. |
114 read_buffer_ = buffer; | 121 read_buffer_ = buffer; |
115 read_buffer_len_ = buffer_len; | 122 read_buffer_len_ = buffer_len; |
116 return ERR_IO_PENDING; | 123 return ERR_IO_PENDING; |
117 } | 124 } |
118 | 125 |
119 void BidirectionalStreamQuicImpl::SendData(const scoped_refptr<IOBuffer>& data, | 126 void BidirectionalStreamQuicImpl::SendData(const scoped_refptr<IOBuffer>& data, |
120 int length, | 127 int length, |
121 bool end_stream) { | 128 bool end_stream) { |
122 DCHECK(stream_); | |
123 DCHECK(length > 0 || (length == 0 && end_stream)); | 129 DCHECK(length > 0 || (length == 0 && end_stream)); |
130 if (!stream_) { | |
131 LOG(ERROR) << "Trying to send data after stream has been destroyed."; | |
132 base::ThreadTaskRunnerHandle::Get()->PostTask( | |
133 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::NotifyError, | |
134 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED)); | |
135 return; | |
136 } | |
124 | 137 |
125 std::unique_ptr<QuicConnection::ScopedPacketBundler> bundler; | 138 std::unique_ptr<QuicConnection::ScopedPacketBundler> bundler; |
126 if (!has_sent_headers_) { | 139 if (!has_sent_headers_) { |
127 DCHECK(!send_request_headers_automatically_); | 140 DCHECK(!send_request_headers_automatically_); |
128 // Creates a bundler only if there are headers to be sent along with the | 141 // Creates a bundler only if there are headers to be sent along with the |
129 // single data buffer. | 142 // single data buffer. |
130 bundler.reset(new QuicConnection::ScopedPacketBundler( | 143 bundler.reset(new QuicConnection::ScopedPacketBundler( |
131 session_->connection(), QuicConnection::SEND_ACK_IF_PENDING)); | 144 session_->connection(), QuicConnection::SEND_ACK_IF_PENDING)); |
132 SendRequestHeaders(); | 145 SendRequestHeaders(); |
133 } | 146 } |
134 | 147 |
135 base::StringPiece string_data(data->data(), length); | 148 base::StringPiece string_data(data->data(), length); |
136 int rv = stream_->WriteStreamData( | 149 int rv = stream_->WriteStreamData( |
137 string_data, end_stream, | 150 string_data, end_stream, |
138 base::Bind(&BidirectionalStreamQuicImpl::OnSendDataComplete, | 151 base::Bind(&BidirectionalStreamQuicImpl::OnSendDataComplete, |
139 weak_factory_.GetWeakPtr())); | 152 weak_factory_.GetWeakPtr())); |
140 DCHECK(rv == OK || rv == ERR_IO_PENDING); | 153 DCHECK(rv == OK || rv == ERR_IO_PENDING); |
141 if (rv == OK) { | 154 if (rv == OK) { |
142 base::ThreadTaskRunnerHandle::Get()->PostTask( | 155 base::ThreadTaskRunnerHandle::Get()->PostTask( |
143 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::OnSendDataComplete, | 156 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::OnSendDataComplete, |
144 weak_factory_.GetWeakPtr(), OK)); | 157 weak_factory_.GetWeakPtr(), OK)); |
145 } | 158 } |
146 } | 159 } |
147 | 160 |
148 void BidirectionalStreamQuicImpl::SendvData( | 161 void BidirectionalStreamQuicImpl::SendvData( |
149 const std::vector<scoped_refptr<IOBuffer>>& buffers, | 162 const std::vector<scoped_refptr<IOBuffer>>& buffers, |
150 const std::vector<int>& lengths, | 163 const std::vector<int>& lengths, |
151 bool end_stream) { | 164 bool end_stream) { |
152 DCHECK(stream_); | |
153 DCHECK_EQ(buffers.size(), lengths.size()); | 165 DCHECK_EQ(buffers.size(), lengths.size()); |
154 | 166 |
167 if (!stream_) { | |
168 LOG(ERROR) << "Trying to send data after stream has been destroyed."; | |
mef
2016/06/03 20:03:19
I think logging is fine, but I'm not sure that pos
xunjieli
2016/06/03 20:20:53
If stream is destroyed gracefully because of a net
| |
169 base::ThreadTaskRunnerHandle::Get()->PostTask( | |
170 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::NotifyError, | |
171 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED)); | |
172 return; | |
173 } | |
174 | |
155 QuicConnection::ScopedPacketBundler bundler( | 175 QuicConnection::ScopedPacketBundler bundler( |
156 session_->connection(), QuicConnection::SEND_ACK_IF_PENDING); | 176 session_->connection(), QuicConnection::SEND_ACK_IF_PENDING); |
157 if (!has_sent_headers_) { | 177 if (!has_sent_headers_) { |
158 DCHECK(!send_request_headers_automatically_); | 178 DCHECK(!send_request_headers_automatically_); |
159 SendRequestHeaders(); | 179 SendRequestHeaders(); |
160 } | 180 } |
161 | 181 |
162 int rv = stream_->WritevStreamData( | 182 int rv = stream_->WritevStreamData( |
163 buffers, lengths, end_stream, | 183 buffers, lengths, end_stream, |
164 base::Bind(&BidirectionalStreamQuicImpl::OnSendDataComplete, | 184 base::Bind(&BidirectionalStreamQuicImpl::OnSendDataComplete, |
165 weak_factory_.GetWeakPtr())); | 185 weak_factory_.GetWeakPtr())); |
166 | 186 |
167 DCHECK(rv == OK || rv == ERR_IO_PENDING); | 187 DCHECK(rv == OK || rv == ERR_IO_PENDING); |
168 if (rv == OK) { | 188 if (rv == OK) { |
169 base::ThreadTaskRunnerHandle::Get()->PostTask( | 189 base::ThreadTaskRunnerHandle::Get()->PostTask( |
170 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::OnSendDataComplete, | 190 FROM_HERE, base::Bind(&BidirectionalStreamQuicImpl::OnSendDataComplete, |
171 weak_factory_.GetWeakPtr(), OK)); | 191 weak_factory_.GetWeakPtr(), OK)); |
172 } | 192 } |
173 } | 193 } |
174 | 194 |
175 void BidirectionalStreamQuicImpl::Cancel() { | 195 void BidirectionalStreamQuicImpl::Cancel() { |
176 if (stream_) { | 196 if (!stream_) |
177 stream_->SetDelegate(nullptr); | 197 return; |
178 stream_->Reset(QUIC_STREAM_CANCELLED); | 198 stream_->SetDelegate(nullptr); |
179 ResetStream(); | 199 stream_->Reset(QUIC_STREAM_CANCELLED); |
180 } | 200 delegate_ = nullptr; |
201 // Cancel any pending callbacks. | |
202 weak_factory_.InvalidateWeakPtrs(); | |
mef
2016/06/03 20:03:19
should this be part of ResetStream()?
xunjieli
2016/06/03 20:20:53
Great question! If OnClose(OK) is invoked, we do R
| |
203 ResetStream(); | |
181 } | 204 } |
182 | 205 |
183 NextProto BidirectionalStreamQuicImpl::GetProtocol() const { | 206 NextProto BidirectionalStreamQuicImpl::GetProtocol() const { |
184 return negotiated_protocol_; | 207 return negotiated_protocol_; |
185 } | 208 } |
186 | 209 |
187 int64_t BidirectionalStreamQuicImpl::GetTotalReceivedBytes() const { | 210 int64_t BidirectionalStreamQuicImpl::GetTotalReceivedBytes() const { |
188 if (stream_) | 211 if (stream_) |
189 return headers_bytes_received_ + stream_->stream_bytes_read(); | 212 return headers_bytes_received_ + stream_->stream_bytes_read(); |
190 return headers_bytes_received_ + closed_stream_received_bytes_; | 213 return headers_bytes_received_ + closed_stream_received_bytes_; |
191 } | 214 } |
192 | 215 |
193 int64_t BidirectionalStreamQuicImpl::GetTotalSentBytes() const { | 216 int64_t BidirectionalStreamQuicImpl::GetTotalSentBytes() const { |
194 if (stream_) | 217 if (stream_) |
195 return headers_bytes_sent_ + stream_->stream_bytes_written(); | 218 return headers_bytes_sent_ + stream_->stream_bytes_written(); |
196 return headers_bytes_sent_ + closed_stream_sent_bytes_; | 219 return headers_bytes_sent_ + closed_stream_sent_bytes_; |
197 } | 220 } |
198 | 221 |
199 void BidirectionalStreamQuicImpl::OnHeadersAvailable( | 222 void BidirectionalStreamQuicImpl::OnHeadersAvailable( |
200 const SpdyHeaderBlock& headers, | 223 const SpdyHeaderBlock& headers, |
201 size_t frame_len) { | 224 size_t frame_len) { |
202 headers_bytes_received_ += frame_len; | 225 headers_bytes_received_ += frame_len; |
203 negotiated_protocol_ = kProtoQUIC1SPDY3; | 226 negotiated_protocol_ = kProtoQUIC1SPDY3; |
204 if (!has_received_headers_) { | 227 if (!has_received_headers_) { |
205 has_received_headers_ = true; | 228 has_received_headers_ = true; |
206 delegate_->OnHeadersReceived(headers); | 229 if (delegate_) |
230 delegate_->OnHeadersReceived(headers); | |
207 } else { | 231 } else { |
208 if (stream_->IsDoneReading()) { | 232 if (stream_->IsDoneReading()) { |
209 // If the write side is closed, OnFinRead() will call | 233 // If the write side is closed, OnFinRead() will call |
210 // BidirectionalStreamQuicImpl::OnClose(). | 234 // BidirectionalStreamQuicImpl::OnClose(). |
211 stream_->OnFinRead(); | 235 stream_->OnFinRead(); |
212 } | 236 } |
213 delegate_->OnTrailersReceived(headers); | 237 if (delegate_) |
238 delegate_->OnTrailersReceived(headers); | |
214 } | 239 } |
215 } | 240 } |
216 | 241 |
217 void BidirectionalStreamQuicImpl::OnDataAvailable() { | 242 void BidirectionalStreamQuicImpl::OnDataAvailable() { |
218 // Return early if ReadData has not been called. | 243 // Return early if ReadData has not been called. |
219 if (!read_buffer_) | 244 if (!read_buffer_) |
220 return; | 245 return; |
221 | 246 |
222 CHECK(read_buffer_); | 247 CHECK(read_buffer_); |
223 CHECK_NE(0, read_buffer_len_); | 248 CHECK_NE(0, read_buffer_len_); |
224 int rv = ReadData(read_buffer_.get(), read_buffer_len_); | 249 int rv = ReadData(read_buffer_.get(), read_buffer_len_); |
225 if (rv == ERR_IO_PENDING) { | 250 if (rv == ERR_IO_PENDING) { |
226 // Spurrious notification. Wait for the next one. | 251 // Spurrious notification. Wait for the next one. |
227 return; | 252 return; |
228 } | 253 } |
229 read_buffer_ = nullptr; | 254 read_buffer_ = nullptr; |
230 read_buffer_len_ = 0; | 255 read_buffer_len_ = 0; |
231 delegate_->OnDataRead(rv); | 256 if (delegate_) |
257 delegate_->OnDataRead(rv); | |
232 } | 258 } |
233 | 259 |
234 void BidirectionalStreamQuicImpl::OnClose(QuicErrorCode error) { | 260 void BidirectionalStreamQuicImpl::OnClose(QuicErrorCode error) { |
235 DCHECK(stream_); | 261 DCHECK(stream_); |
262 | |
236 if (error == QUIC_NO_ERROR && | 263 if (error == QUIC_NO_ERROR && |
237 stream_->stream_error() == QUIC_STREAM_NO_ERROR) { | 264 stream_->stream_error() == QUIC_STREAM_NO_ERROR) { |
238 ResetStream(); | 265 ResetStream(); |
239 return; | 266 return; |
240 } | 267 } |
241 ResetStream(); | |
242 NotifyError(was_handshake_confirmed_ ? ERR_QUIC_PROTOCOL_ERROR | 268 NotifyError(was_handshake_confirmed_ ? ERR_QUIC_PROTOCOL_ERROR |
243 : ERR_QUIC_HANDSHAKE_FAILED); | 269 : ERR_QUIC_HANDSHAKE_FAILED); |
244 } | 270 } |
245 | 271 |
246 void BidirectionalStreamQuicImpl::OnError(int error) { | 272 void BidirectionalStreamQuicImpl::OnError(int error) { |
247 NotifyError(error); | 273 NotifyError(error); |
248 } | 274 } |
249 | 275 |
250 bool BidirectionalStreamQuicImpl::HasSendHeadersComplete() { | 276 bool BidirectionalStreamQuicImpl::HasSendHeadersComplete() { |
251 return has_sent_headers_; | 277 return has_sent_headers_; |
(...skipping 12 matching lines...) Expand all Loading... | |
264 } | 290 } |
265 | 291 |
266 void BidirectionalStreamQuicImpl::OnStreamReady(int rv) { | 292 void BidirectionalStreamQuicImpl::OnStreamReady(int rv) { |
267 DCHECK_NE(ERR_IO_PENDING, rv); | 293 DCHECK_NE(ERR_IO_PENDING, rv); |
268 DCHECK(rv == OK || !stream_); | 294 DCHECK(rv == OK || !stream_); |
269 if (rv == OK) { | 295 if (rv == OK) { |
270 stream_->SetDelegate(this); | 296 stream_->SetDelegate(this); |
271 if (send_request_headers_automatically_) { | 297 if (send_request_headers_automatically_) { |
272 SendRequestHeaders(); | 298 SendRequestHeaders(); |
273 } | 299 } |
274 delegate_->OnStreamReady(has_sent_headers_); | 300 if (delegate_) |
301 delegate_->OnStreamReady(has_sent_headers_); | |
275 } else { | 302 } else { |
276 NotifyError(rv); | 303 NotifyError(rv); |
277 } | 304 } |
278 } | 305 } |
279 | 306 |
280 void BidirectionalStreamQuicImpl::OnSendDataComplete(int rv) { | 307 void BidirectionalStreamQuicImpl::OnSendDataComplete(int rv) { |
281 DCHECK(rv == OK || !stream_); | 308 DCHECK(rv == OK || !stream_); |
282 if (rv == OK) { | 309 if (rv == OK) { |
283 delegate_->OnDataSent(); | 310 if (delegate_) |
311 delegate_->OnDataSent(); | |
284 } else { | 312 } else { |
285 NotifyError(rv); | 313 NotifyError(rv); |
286 } | 314 } |
287 } | 315 } |
288 | 316 |
289 void BidirectionalStreamQuicImpl::NotifyError(int error) { | 317 void BidirectionalStreamQuicImpl::NotifyError(int error) { |
290 DCHECK_NE(OK, error); | 318 DCHECK_NE(OK, error); |
291 DCHECK_NE(ERR_IO_PENDING, error); | 319 DCHECK_NE(ERR_IO_PENDING, error); |
292 | 320 |
321 if (!delegate_) | |
322 return; | |
293 response_status_ = error; | 323 response_status_ = error; |
294 ResetStream(); | 324 ResetStream(); |
295 delegate_->OnFailed(error); | 325 BidirectionalStreamImpl::Delegate* delegate = delegate_; |
326 delegate_ = nullptr; | |
327 // Cancel any pending callback. | |
328 weak_factory_.InvalidateWeakPtrs(); | |
mef
2016/06/03 20:03:19
If we invalidate any pending callbacks here, why d
xunjieli
2016/06/03 20:20:53
Great question! InvalidateWeakPtrs() only invalida
| |
329 delegate->OnFailed(error); | |
296 } | 330 } |
297 | 331 |
298 void BidirectionalStreamQuicImpl::ResetStream() { | 332 void BidirectionalStreamQuicImpl::ResetStream() { |
299 if (!stream_) | 333 if (!stream_) |
300 return; | 334 return; |
301 closed_stream_received_bytes_ = stream_->stream_bytes_read(); | 335 closed_stream_received_bytes_ = stream_->stream_bytes_read(); |
302 closed_stream_sent_bytes_ = stream_->stream_bytes_written(); | 336 closed_stream_sent_bytes_ = stream_->stream_bytes_written(); |
303 stream_->SetDelegate(nullptr); | 337 stream_->SetDelegate(nullptr); |
304 stream_ = nullptr; | 338 stream_ = nullptr; |
305 } | 339 } |
306 | 340 |
307 } // namespace net | 341 } // namespace net |
OLD | NEW |