OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "net/spdy/bidirectional_stream_spdy_impl.h" | 5 #include "net/spdy/bidirectional_stream_spdy_impl.h" |
6 | 6 |
7 #include "base/bind.h" | 7 #include "base/bind.h" |
8 #include "base/location.h" | 8 #include "base/location.h" |
9 #include "base/logging.h" | 9 #include "base/logging.h" |
10 #include "base/time/time.h" | 10 #include "base/time/time.h" |
(...skipping 23 matching lines...) Expand all Loading... |
34 negotiated_protocol_(kProtoUnknown), | 34 negotiated_protocol_(kProtoUnknown), |
35 more_read_data_pending_(false), | 35 more_read_data_pending_(false), |
36 read_buffer_len_(0), | 36 read_buffer_len_(0), |
37 stream_closed_(false), | 37 stream_closed_(false), |
38 closed_stream_status_(ERR_FAILED), | 38 closed_stream_status_(ERR_FAILED), |
39 closed_stream_received_bytes_(0), | 39 closed_stream_received_bytes_(0), |
40 closed_stream_sent_bytes_(0), | 40 closed_stream_sent_bytes_(0), |
41 weak_factory_(this) {} | 41 weak_factory_(this) {} |
42 | 42 |
43 BidirectionalStreamSpdyImpl::~BidirectionalStreamSpdyImpl() { | 43 BidirectionalStreamSpdyImpl::~BidirectionalStreamSpdyImpl() { |
44 if (stream_) { | 44 Cancel(); |
45 stream_->DetachDelegate(); | |
46 DCHECK(!stream_); | |
47 } | |
48 } | 45 } |
49 | 46 |
50 void BidirectionalStreamSpdyImpl::Start( | 47 void BidirectionalStreamSpdyImpl::Start( |
51 const BidirectionalStreamRequestInfo* request_info, | 48 const BidirectionalStreamRequestInfo* request_info, |
52 const BoundNetLog& net_log, | 49 const BoundNetLog& net_log, |
53 bool /*send_request_headers_automatically*/, | 50 bool /*send_request_headers_automatically*/, |
54 BidirectionalStreamImpl::Delegate* delegate, | 51 BidirectionalStreamImpl::Delegate* delegate, |
55 std::unique_ptr<base::Timer> timer) { | 52 std::unique_ptr<base::Timer> timer) { |
56 DCHECK(!stream_); | 53 DCHECK(!stream_); |
57 DCHECK(timer); | 54 DCHECK(timer); |
58 | 55 |
59 delegate_ = delegate; | 56 delegate_ = delegate; |
60 timer_ = std::move(timer); | 57 timer_ = std::move(timer); |
61 | 58 |
62 if (!spdy_session_) { | 59 if (!spdy_session_) { |
63 delegate_->OnFailed(ERR_CONNECTION_CLOSED); | 60 base::ThreadTaskRunnerHandle::Get()->PostTask( |
| 61 FROM_HERE, |
| 62 base::Bind(&BidirectionalStreamSpdyImpl::NotifyError, |
| 63 weak_factory_.GetWeakPtr(), ERR_CONNECTION_CLOSED)); |
64 return; | 64 return; |
65 } | 65 } |
66 | 66 |
67 request_info_ = request_info; | 67 request_info_ = request_info; |
68 | 68 |
69 int rv = stream_request_.StartRequest( | 69 int rv = stream_request_.StartRequest( |
70 SPDY_BIDIRECTIONAL_STREAM, spdy_session_, request_info_->url, | 70 SPDY_BIDIRECTIONAL_STREAM, spdy_session_, request_info_->url, |
71 request_info_->priority, net_log, | 71 request_info_->priority, net_log, |
72 base::Bind(&BidirectionalStreamSpdyImpl::OnStreamInitialized, | 72 base::Bind(&BidirectionalStreamSpdyImpl::OnStreamInitialized, |
73 weak_factory_.GetWeakPtr())); | 73 weak_factory_.GetWeakPtr())); |
(...skipping 23 matching lines...) Expand all Loading... |
97 // Read will complete asynchronously and Delegate::OnReadCompleted will be | 97 // Read will complete asynchronously and Delegate::OnReadCompleted will be |
98 // called upon completion. | 98 // called upon completion. |
99 read_buffer_ = buf; | 99 read_buffer_ = buf; |
100 read_buffer_len_ = buf_len; | 100 read_buffer_len_ = buf_len; |
101 return ERR_IO_PENDING; | 101 return ERR_IO_PENDING; |
102 } | 102 } |
103 | 103 |
104 void BidirectionalStreamSpdyImpl::SendData(const scoped_refptr<IOBuffer>& data, | 104 void BidirectionalStreamSpdyImpl::SendData(const scoped_refptr<IOBuffer>& data, |
105 int length, | 105 int length, |
106 bool end_stream) { | 106 bool end_stream) { |
| 107 DCHECK(length > 0 || (length == 0 && end_stream)); |
| 108 |
| 109 if (!stream_) { |
| 110 LOG(ERROR) << "Trying to send data after stream has been destroyed."; |
| 111 base::ThreadTaskRunnerHandle::Get()->PostTask( |
| 112 FROM_HERE, base::Bind(&BidirectionalStreamSpdyImpl::NotifyError, |
| 113 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED)); |
| 114 return; |
| 115 } |
| 116 |
107 DCHECK(!stream_closed_); | 117 DCHECK(!stream_closed_); |
108 DCHECK(stream_); | |
109 | |
110 stream_->SendData(data.get(), length, | 118 stream_->SendData(data.get(), length, |
111 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND); | 119 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND); |
112 } | 120 } |
113 | 121 |
114 void BidirectionalStreamSpdyImpl::SendvData( | 122 void BidirectionalStreamSpdyImpl::SendvData( |
115 const std::vector<scoped_refptr<IOBuffer>>& buffers, | 123 const std::vector<scoped_refptr<IOBuffer>>& buffers, |
116 const std::vector<int>& lengths, | 124 const std::vector<int>& lengths, |
117 bool end_stream) { | 125 bool end_stream) { |
118 DCHECK(!stream_closed_); | |
119 DCHECK(stream_); | |
120 DCHECK_EQ(buffers.size(), lengths.size()); | 126 DCHECK_EQ(buffers.size(), lengths.size()); |
121 | 127 |
| 128 if (!stream_) { |
| 129 LOG(ERROR) << "Trying to send data after stream has been destroyed."; |
| 130 base::ThreadTaskRunnerHandle::Get()->PostTask( |
| 131 FROM_HERE, base::Bind(&BidirectionalStreamSpdyImpl::NotifyError, |
| 132 weak_factory_.GetWeakPtr(), ERR_UNEXPECTED)); |
| 133 return; |
| 134 } |
| 135 |
| 136 DCHECK(!stream_closed_); |
122 int total_len = 0; | 137 int total_len = 0; |
123 for (int len : lengths) { | 138 for (int len : lengths) { |
124 total_len += len; | 139 total_len += len; |
125 } | 140 } |
126 | 141 |
127 pending_combined_buffer_ = new net::IOBuffer(total_len); | 142 pending_combined_buffer_ = new net::IOBuffer(total_len); |
128 int len = 0; | 143 int len = 0; |
129 // TODO(xunjieli): Get rid of extra copy. Coalesce headers and data frames. | 144 // TODO(xunjieli): Get rid of extra copy. Coalesce headers and data frames. |
130 for (size_t i = 0; i < buffers.size(); ++i) { | 145 for (size_t i = 0; i < buffers.size(); ++i) { |
131 memcpy(pending_combined_buffer_->data() + len, buffers[i]->data(), | 146 memcpy(pending_combined_buffer_->data() + len, buffers[i]->data(), |
132 lengths[i]); | 147 lengths[i]); |
133 len += lengths[i]; | 148 len += lengths[i]; |
134 } | 149 } |
135 stream_->SendData(pending_combined_buffer_.get(), total_len, | 150 stream_->SendData(pending_combined_buffer_.get(), total_len, |
136 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND); | 151 end_stream ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND); |
137 } | 152 } |
138 | 153 |
139 void BidirectionalStreamSpdyImpl::Cancel() { | 154 void BidirectionalStreamSpdyImpl::Cancel() { |
140 if (!stream_) | 155 if (delegate_) { |
141 return; | 156 delegate_ = nullptr; |
142 // Cancels the stream and detaches the delegate so it doesn't get called back. | 157 // Cancel any pending callback. |
143 stream_->DetachDelegate(); | 158 weak_factory_.InvalidateWeakPtrs(); |
144 DCHECK(!stream_); | 159 } |
| 160 ResetStream(); |
145 } | 161 } |
146 | 162 |
147 NextProto BidirectionalStreamSpdyImpl::GetProtocol() const { | 163 NextProto BidirectionalStreamSpdyImpl::GetProtocol() const { |
148 return negotiated_protocol_; | 164 return negotiated_protocol_; |
149 } | 165 } |
150 | 166 |
151 int64_t BidirectionalStreamSpdyImpl::GetTotalReceivedBytes() const { | 167 int64_t BidirectionalStreamSpdyImpl::GetTotalReceivedBytes() const { |
152 if (stream_closed_) | 168 if (stream_closed_) |
153 return closed_stream_received_bytes_; | 169 return closed_stream_received_bytes_; |
154 | 170 |
(...skipping 10 matching lines...) Expand all Loading... |
165 if (!stream_) | 181 if (!stream_) |
166 return 0; | 182 return 0; |
167 | 183 |
168 return stream_->raw_sent_bytes(); | 184 return stream_->raw_sent_bytes(); |
169 } | 185 } |
170 | 186 |
171 void BidirectionalStreamSpdyImpl::OnRequestHeadersSent() { | 187 void BidirectionalStreamSpdyImpl::OnRequestHeadersSent() { |
172 DCHECK(stream_); | 188 DCHECK(stream_); |
173 | 189 |
174 negotiated_protocol_ = stream_->GetProtocol(); | 190 negotiated_protocol_ = stream_->GetProtocol(); |
175 delegate_->OnStreamReady(/*request_headers_sent=*/true); | 191 if (delegate_) |
| 192 delegate_->OnStreamReady(/*request_headers_sent=*/true); |
176 } | 193 } |
177 | 194 |
178 SpdyResponseHeadersStatus BidirectionalStreamSpdyImpl::OnResponseHeadersUpdated( | 195 SpdyResponseHeadersStatus BidirectionalStreamSpdyImpl::OnResponseHeadersUpdated( |
179 const SpdyHeaderBlock& response_headers) { | 196 const SpdyHeaderBlock& response_headers) { |
180 DCHECK(stream_); | 197 DCHECK(stream_); |
181 | 198 |
182 delegate_->OnHeadersReceived(response_headers); | 199 if (delegate_) |
| 200 delegate_->OnHeadersReceived(response_headers); |
| 201 |
183 return RESPONSE_HEADERS_ARE_COMPLETE; | 202 return RESPONSE_HEADERS_ARE_COMPLETE; |
184 } | 203 } |
185 | 204 |
186 void BidirectionalStreamSpdyImpl::OnDataReceived( | 205 void BidirectionalStreamSpdyImpl::OnDataReceived( |
187 std::unique_ptr<SpdyBuffer> buffer) { | 206 std::unique_ptr<SpdyBuffer> buffer) { |
188 DCHECK(stream_); | 207 DCHECK(stream_); |
189 DCHECK(!stream_closed_); | 208 DCHECK(!stream_closed_); |
190 | 209 |
191 // If |buffer| is null, BidirectionalStreamSpdyImpl::OnClose will be invoked | 210 // If |buffer| is null, BidirectionalStreamSpdyImpl::OnClose will be invoked |
192 // by SpdyStream to indicate the end of stream. | 211 // by SpdyStream to indicate the end of stream. |
193 if (!buffer) | 212 if (!buffer) |
194 return; | 213 return; |
195 | 214 |
196 // When buffer is consumed, SpdyStream::OnReadBufferConsumed will adjust | 215 // When buffer is consumed, SpdyStream::OnReadBufferConsumed will adjust |
197 // recv window size accordingly. | 216 // recv window size accordingly. |
198 read_data_queue_.Enqueue(std::move(buffer)); | 217 read_data_queue_.Enqueue(std::move(buffer)); |
199 if (read_buffer_) { | 218 if (read_buffer_) { |
200 // Handing small chunks of data to the caller creates measurable overhead. | 219 // Handing small chunks of data to the caller creates measurable overhead. |
201 // So buffer data in short time-spans and send a single read notification. | 220 // So buffer data in short time-spans and send a single read notification. |
202 ScheduleBufferedRead(); | 221 ScheduleBufferedRead(); |
203 } | 222 } |
204 } | 223 } |
205 | 224 |
206 void BidirectionalStreamSpdyImpl::OnDataSent() { | 225 void BidirectionalStreamSpdyImpl::OnDataSent() { |
207 DCHECK(stream_); | 226 DCHECK(stream_); |
208 DCHECK(!stream_closed_); | 227 DCHECK(!stream_closed_); |
209 | 228 |
210 pending_combined_buffer_ = nullptr; | 229 pending_combined_buffer_ = nullptr; |
211 delegate_->OnDataSent(); | 230 if (delegate_) |
| 231 delegate_->OnDataSent(); |
212 } | 232 } |
213 | 233 |
214 void BidirectionalStreamSpdyImpl::OnTrailers(const SpdyHeaderBlock& trailers) { | 234 void BidirectionalStreamSpdyImpl::OnTrailers(const SpdyHeaderBlock& trailers) { |
215 DCHECK(stream_); | 235 DCHECK(stream_); |
216 DCHECK(!stream_closed_); | 236 DCHECK(!stream_closed_); |
217 | 237 |
218 delegate_->OnTrailersReceived(trailers); | 238 if (delegate_) |
| 239 delegate_->OnTrailersReceived(trailers); |
219 } | 240 } |
220 | 241 |
221 void BidirectionalStreamSpdyImpl::OnClose(int status) { | 242 void BidirectionalStreamSpdyImpl::OnClose(int status) { |
222 DCHECK(stream_); | 243 DCHECK(stream_); |
223 | 244 |
224 stream_closed_ = true; | 245 stream_closed_ = true; |
225 closed_stream_status_ = status; | 246 closed_stream_status_ = status; |
226 closed_stream_received_bytes_ = stream_->raw_received_bytes(); | 247 closed_stream_received_bytes_ = stream_->raw_received_bytes(); |
227 closed_stream_sent_bytes_ = stream_->raw_sent_bytes(); | 248 closed_stream_sent_bytes_ = stream_->raw_sent_bytes(); |
228 stream_.reset(); | |
229 | 249 |
230 if (status != OK) { | 250 if (status != OK) { |
231 delegate_->OnFailed(status); | 251 NotifyError(status); |
232 return; | 252 return; |
233 } | 253 } |
| 254 ResetStream(); |
234 // Complete any remaining read, as all data has been buffered. | 255 // Complete any remaining read, as all data has been buffered. |
235 // If user has not called ReadData (i.e |read_buffer_| is nullptr), this will | 256 // If user has not called ReadData (i.e |read_buffer_| is nullptr), this will |
236 // do nothing. | 257 // do nothing. |
237 timer_->Stop(); | 258 timer_->Stop(); |
238 DoBufferedRead(); | 259 DoBufferedRead(); |
239 } | 260 } |
240 | 261 |
241 int BidirectionalStreamSpdyImpl::SendRequestHeadersHelper() { | 262 int BidirectionalStreamSpdyImpl::SendRequestHeadersHelper() { |
242 std::unique_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock); | 263 std::unique_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock); |
243 HttpRequestInfo http_request_info; | 264 HttpRequestInfo http_request_info; |
(...skipping 16 matching lines...) Expand all Loading... |
260 stream_ = stream_request_.ReleaseStream(); | 281 stream_ = stream_request_.ReleaseStream(); |
261 stream_->SetDelegate(this); | 282 stream_->SetDelegate(this); |
262 rv = SendRequestHeadersHelper(); | 283 rv = SendRequestHeadersHelper(); |
263 if (rv == OK) { | 284 if (rv == OK) { |
264 OnRequestHeadersSent(); | 285 OnRequestHeadersSent(); |
265 return; | 286 return; |
266 } else if (rv == ERR_IO_PENDING) { | 287 } else if (rv == ERR_IO_PENDING) { |
267 return; | 288 return; |
268 } | 289 } |
269 } | 290 } |
270 delegate_->OnFailed(rv); | 291 NotifyError(rv); |
| 292 } |
| 293 |
| 294 void BidirectionalStreamSpdyImpl::NotifyError(int rv) { |
| 295 ResetStream(); |
| 296 if (delegate_) { |
| 297 BidirectionalStreamImpl::Delegate* delegate = delegate_; |
| 298 delegate_ = nullptr; |
| 299 // Cancel any pending callback. |
| 300 weak_factory_.InvalidateWeakPtrs(); |
| 301 delegate->OnFailed(rv); |
| 302 // |this| can be null when returned from delegate. |
| 303 } |
| 304 } |
| 305 |
| 306 void BidirectionalStreamSpdyImpl::ResetStream() { |
| 307 if (!stream_) |
| 308 return; |
| 309 if (!stream_->IsClosed()) { |
| 310 // This sends a RST to the remote. |
| 311 stream_->DetachDelegate(); |
| 312 DCHECK(!stream_); |
| 313 } else { |
| 314 // Stream is already closed, so it is not legal to call DetachDelegate. |
| 315 stream_.reset(); |
| 316 } |
271 } | 317 } |
272 | 318 |
273 void BidirectionalStreamSpdyImpl::ScheduleBufferedRead() { | 319 void BidirectionalStreamSpdyImpl::ScheduleBufferedRead() { |
274 // If there is already a scheduled DoBufferedRead, don't issue | 320 // If there is already a scheduled DoBufferedRead, don't issue |
275 // another one. Mark that we have received more data and return. | 321 // another one. Mark that we have received more data and return. |
276 if (timer_->IsRunning()) { | 322 if (timer_->IsRunning()) { |
277 more_read_data_pending_ = true; | 323 more_read_data_pending_ = true; |
278 return; | 324 return; |
279 } | 325 } |
280 | 326 |
(...skipping 15 matching lines...) Expand all Loading... |
296 ScheduleBufferedRead(); | 342 ScheduleBufferedRead(); |
297 return; | 343 return; |
298 } | 344 } |
299 | 345 |
300 int rv = 0; | 346 int rv = 0; |
301 if (read_buffer_) { | 347 if (read_buffer_) { |
302 rv = ReadData(read_buffer_.get(), read_buffer_len_); | 348 rv = ReadData(read_buffer_.get(), read_buffer_len_); |
303 DCHECK_NE(ERR_IO_PENDING, rv); | 349 DCHECK_NE(ERR_IO_PENDING, rv); |
304 read_buffer_ = nullptr; | 350 read_buffer_ = nullptr; |
305 read_buffer_len_ = 0; | 351 read_buffer_len_ = 0; |
306 delegate_->OnDataRead(rv); | 352 if (delegate_) |
| 353 delegate_->OnDataRead(rv); |
307 } | 354 } |
308 } | 355 } |
309 | 356 |
310 bool BidirectionalStreamSpdyImpl::ShouldWaitForMoreBufferedData() const { | 357 bool BidirectionalStreamSpdyImpl::ShouldWaitForMoreBufferedData() const { |
311 if (stream_closed_) | 358 if (stream_closed_) |
312 return false; | 359 return false; |
313 DCHECK_GT(read_buffer_len_, 0); | 360 DCHECK_GT(read_buffer_len_, 0); |
314 return read_data_queue_.GetTotalSize() < | 361 return read_data_queue_.GetTotalSize() < |
315 static_cast<size_t>(read_buffer_len_); | 362 static_cast<size_t>(read_buffer_len_); |
316 } | 363 } |
317 | 364 |
318 } // namespace net | 365 } // namespace net |
OLD | NEW |