OLD | NEW |
---|---|
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 #include "content/browser/byte_stream.h" | 5 #include "content/browser/byte_stream.h" |
6 | 6 |
7 #include <deque> | 7 #include <deque> |
8 #include <set> | 8 #include <set> |
9 #include <utility> | 9 #include <utility> |
10 | 10 |
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
54 void SetPeer(ByteStreamReaderImpl* peer, | 54 void SetPeer(ByteStreamReaderImpl* peer, |
55 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | 55 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, |
56 scoped_refptr<LifetimeFlag> peer_lifetime_flag); | 56 scoped_refptr<LifetimeFlag> peer_lifetime_flag); |
57 | 57 |
58 // Overridden from ByteStreamWriter. | 58 // Overridden from ByteStreamWriter. |
59 virtual bool Write(scoped_refptr<net::IOBuffer> buffer, | 59 virtual bool Write(scoped_refptr<net::IOBuffer> buffer, |
60 size_t byte_count) OVERRIDE; | 60 size_t byte_count) OVERRIDE; |
61 virtual void Flush() OVERRIDE; | 61 virtual void Flush() OVERRIDE; |
62 virtual void Close(int status) OVERRIDE; | 62 virtual void Close(int status) OVERRIDE; |
63 virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE; | 63 virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE; |
64 virtual size_t GetTotalBufferedBytes() const OVERRIDE; | |
64 | 65 |
65 // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|. | 66 // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|. |
66 static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag, | 67 static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag, |
67 ByteStreamWriterImpl* target, | 68 ByteStreamWriterImpl* target, |
68 size_t bytes_consumed); | 69 size_t bytes_consumed); |
69 | 70 |
70 private: | 71 private: |
71 // Called from UpdateWindow when object existence has been validated. | 72 // Called from UpdateWindow when object existence has been validated. |
72 void UpdateWindowInternal(size_t bytes_consumed); | 73 void UpdateWindowInternal(size_t bytes_consumed); |
73 | 74 |
(...skipping 112 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
186 my_task_runner_(task_runner), | 187 my_task_runner_(task_runner), |
187 my_lifetime_flag_(lifetime_flag), | 188 my_lifetime_flag_(lifetime_flag), |
188 input_contents_size_(0), | 189 input_contents_size_(0), |
189 output_size_used_(0), | 190 output_size_used_(0), |
190 peer_(NULL) { | 191 peer_(NULL) { |
191 DCHECK(my_lifetime_flag_.get()); | 192 DCHECK(my_lifetime_flag_.get()); |
192 my_lifetime_flag_->is_alive = true; | 193 my_lifetime_flag_->is_alive = true; |
193 } | 194 } |
194 | 195 |
195 ByteStreamWriterImpl::~ByteStreamWriterImpl() { | 196 ByteStreamWriterImpl::~ByteStreamWriterImpl() { |
197 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
tyoshino (SeeGerritForStatus)
2013/08/16 11:14:09
reverted these changes. download code crashes if I
| |
196 my_lifetime_flag_->is_alive = false; | 198 my_lifetime_flag_->is_alive = false; |
197 } | 199 } |
198 | 200 |
199 void ByteStreamWriterImpl::SetPeer( | 201 void ByteStreamWriterImpl::SetPeer( |
200 ByteStreamReaderImpl* peer, | 202 ByteStreamReaderImpl* peer, |
201 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | 203 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, |
202 scoped_refptr<LifetimeFlag> peer_lifetime_flag) { | 204 scoped_refptr<LifetimeFlag> peer_lifetime_flag) { |
203 peer_ = peer; | 205 peer_ = peer; |
204 peer_task_runner_ = peer_task_runner; | 206 peer_task_runner_ = peer_task_runner; |
205 peer_lifetime_flag_ = peer_lifetime_flag; | 207 peer_lifetime_flag_ = peer_lifetime_flag; |
206 } | 208 } |
207 | 209 |
208 bool ByteStreamWriterImpl::Write( | 210 bool ByteStreamWriterImpl::Write( |
209 scoped_refptr<net::IOBuffer> buffer, size_t byte_count) { | 211 scoped_refptr<net::IOBuffer> buffer, size_t byte_count) { |
210 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | 212 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
211 | 213 |
214 // Check overflow. | |
215 size_t space_limit = std::numeric_limits<size_t>::max() - | |
216 output_size_used_ - input_contents_size_; | |
217 if (byte_count > space_limit) { | |
218 // TODO(tyoshino): Tell the user that Write() failed. | |
219 // Ignore input. | |
220 return false; | |
221 } | |
222 | |
212 input_contents_.push_back(std::make_pair(buffer, byte_count)); | 223 input_contents_.push_back(std::make_pair(buffer, byte_count)); |
213 input_contents_size_ += byte_count; | 224 input_contents_size_ += byte_count; |
214 | 225 |
215 // Arbitrarily, we buffer to a third of the total size before sending. | 226 // Arbitrarily, we buffer to a third of the total size before sending. |
216 if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending) | 227 if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending) |
217 PostToPeer(false, 0); | 228 PostToPeer(false, 0); |
218 | 229 |
219 return (input_contents_size_ + output_size_used_ <= total_buffer_size_); | 230 return GetTotalBufferedBytes() <= total_buffer_size_; |
220 } | 231 } |
221 | 232 |
222 void ByteStreamWriterImpl::Flush() { | 233 void ByteStreamWriterImpl::Flush() { |
223 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | 234 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
224 if (input_contents_size_ > 0) | 235 if (input_contents_size_ > 0) |
225 PostToPeer(false, 0); | 236 PostToPeer(false, 0); |
226 } | 237 } |
227 | 238 |
228 void ByteStreamWriterImpl::Close(int status) { | 239 void ByteStreamWriterImpl::Close(int status) { |
229 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | 240 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
230 PostToPeer(true, status); | 241 PostToPeer(true, status); |
231 } | 242 } |
232 | 243 |
233 void ByteStreamWriterImpl::RegisterCallback( | 244 void ByteStreamWriterImpl::RegisterCallback( |
234 const base::Closure& source_callback) { | 245 const base::Closure& source_callback) { |
235 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | 246 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
236 space_available_callback_ = source_callback; | 247 space_available_callback_ = source_callback; |
237 } | 248 } |
238 | 249 |
250 size_t ByteStreamWriterImpl::GetTotalBufferedBytes() const { | |
251 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
252 // This sum doesn't overflow since Write() fails if this sum is going to | |
253 // overflow. | |
254 return input_contents_size_ + output_size_used_; | |
255 } | |
256 | |
239 // static | 257 // static |
240 void ByteStreamWriterImpl::UpdateWindow( | 258 void ByteStreamWriterImpl::UpdateWindow( |
241 scoped_refptr<LifetimeFlag> lifetime_flag, ByteStreamWriterImpl* target, | 259 scoped_refptr<LifetimeFlag> lifetime_flag, ByteStreamWriterImpl* target, |
242 size_t bytes_consumed) { | 260 size_t bytes_consumed) { |
243 // If the target object isn't alive anymore, we do nothing. | 261 // If the target object isn't alive anymore, we do nothing. |
244 if (!lifetime_flag->is_alive) return; | 262 if (!lifetime_flag->is_alive) return; |
245 | 263 |
246 target->UpdateWindowInternal(bytes_consumed); | 264 target->UpdateWindowInternal(bytes_consumed); |
247 } | 265 } |
248 | 266 |
249 void ByteStreamWriterImpl::UpdateWindowInternal(size_t bytes_consumed) { | 267 void ByteStreamWriterImpl::UpdateWindowInternal(size_t bytes_consumed) { |
250 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | 268 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
269 | |
270 bool was_above_limit = | |
271 input_contents_size_ + output_size_used_ > total_buffer_size_; | |
272 | |
251 DCHECK_GE(output_size_used_, bytes_consumed); | 273 DCHECK_GE(output_size_used_, bytes_consumed); |
252 output_size_used_ -= bytes_consumed; | 274 output_size_used_ -= bytes_consumed; |
253 | 275 |
254 // Callback if we were above the limit and we're now <= to it. | 276 // Callback if we were above the limit and we're now <= to it. |
255 size_t total_known_size_used = | 277 bool no_longer_above_limit = |
256 input_contents_size_ + output_size_used_; | 278 input_contents_size_ + output_size_used_ <= total_buffer_size_; |
257 | 279 |
258 if (total_known_size_used <= total_buffer_size_ && | 280 if (no_longer_above_limit && was_above_limit && |
259 (total_known_size_used + bytes_consumed > total_buffer_size_) && | |
260 !space_available_callback_.is_null()) | 281 !space_available_callback_.is_null()) |
261 space_available_callback_.Run(); | 282 space_available_callback_.Run(); |
262 } | 283 } |
263 | 284 |
264 void ByteStreamWriterImpl::PostToPeer(bool complete, int status) { | 285 void ByteStreamWriterImpl::PostToPeer(bool complete, int status) { |
265 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | 286 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
266 // Valid contexts in which to call. | 287 // Valid contexts in which to call. |
267 DCHECK(complete || 0 != input_contents_size_); | 288 DCHECK(complete || 0 != input_contents_size_); |
268 | 289 |
269 scoped_ptr<ContentVector> transfer_buffer; | 290 scoped_ptr<ContentVector> transfer_buffer; |
(...skipping 25 matching lines...) Expand all Loading... | |
295 my_lifetime_flag_(lifetime_flag), | 316 my_lifetime_flag_(lifetime_flag), |
296 received_status_(false), | 317 received_status_(false), |
297 status_(0), | 318 status_(0), |
298 unreported_consumed_bytes_(0), | 319 unreported_consumed_bytes_(0), |
299 peer_(NULL) { | 320 peer_(NULL) { |
300 DCHECK(my_lifetime_flag_.get()); | 321 DCHECK(my_lifetime_flag_.get()); |
301 my_lifetime_flag_->is_alive = true; | 322 my_lifetime_flag_->is_alive = true; |
302 } | 323 } |
303 | 324 |
304 ByteStreamReaderImpl::~ByteStreamReaderImpl() { | 325 ByteStreamReaderImpl::~ByteStreamReaderImpl() { |
326 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | |
305 my_lifetime_flag_->is_alive = false; | 327 my_lifetime_flag_->is_alive = false; |
306 } | 328 } |
307 | 329 |
308 void ByteStreamReaderImpl::SetPeer( | 330 void ByteStreamReaderImpl::SetPeer( |
309 ByteStreamWriterImpl* peer, | 331 ByteStreamWriterImpl* peer, |
310 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | 332 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, |
311 scoped_refptr<LifetimeFlag> peer_lifetime_flag) { | 333 scoped_refptr<LifetimeFlag> peer_lifetime_flag) { |
312 peer_ = peer; | 334 peer_ = peer; |
313 peer_task_runner_ = peer_task_runner; | 335 peer_task_runner_ = peer_task_runner; |
314 peer_lifetime_flag_ = peer_lifetime_flag; | 336 peer_lifetime_flag_ = peer_lifetime_flag; |
(...skipping 117 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
432 ByteStreamReaderImpl* out = new ByteStreamReaderImpl( | 454 ByteStreamReaderImpl* out = new ByteStreamReaderImpl( |
433 output_task_runner, output_flag, buffer_size); | 455 output_task_runner, output_flag, buffer_size); |
434 | 456 |
435 in->SetPeer(out, output_task_runner, output_flag); | 457 in->SetPeer(out, output_task_runner, output_flag); |
436 out->SetPeer(in, input_task_runner, input_flag); | 458 out->SetPeer(in, input_task_runner, input_flag); |
437 input->reset(in); | 459 input->reset(in); |
438 output->reset(out); | 460 output->reset(out); |
439 } | 461 } |
440 | 462 |
441 } // namespace content | 463 } // namespace content |
OLD | NEW |