| OLD | NEW |
| 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. | 1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 #include "content/browser/byte_stream.h" | 5 #include "content/browser/byte_stream.h" |
| 6 | 6 |
| 7 #include "base/bind.h" | 7 #include "base/bind.h" |
| 8 #include "base/location.h" | 8 #include "base/location.h" |
| 9 #include "base/memory/ref_counted.h" | 9 #include "base/memory/ref_counted.h" |
| 10 #include "base/memory/weak_ptr.h" | 10 #include "base/memory/weak_ptr.h" |
| (...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 49 | 49 |
| 50 // Must be called before any operations are performed. | 50 // Must be called before any operations are performed. |
| 51 void SetPeer(ByteStreamReaderImpl* peer, | 51 void SetPeer(ByteStreamReaderImpl* peer, |
| 52 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | 52 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, |
| 53 scoped_refptr<LifetimeFlag> peer_lifetime_flag); | 53 scoped_refptr<LifetimeFlag> peer_lifetime_flag); |
| 54 | 54 |
| 55 // Overridden from ByteStreamWriter. | 55 // Overridden from ByteStreamWriter. |
| 56 virtual bool Write(scoped_refptr<net::IOBuffer> buffer, | 56 virtual bool Write(scoped_refptr<net::IOBuffer> buffer, |
| 57 size_t byte_count) OVERRIDE; | 57 size_t byte_count) OVERRIDE; |
| 58 virtual void Flush() OVERRIDE; | 58 virtual void Flush() OVERRIDE; |
| 59 virtual void Close(DownloadInterruptReason status) OVERRIDE; | 59 virtual void Close(int status) OVERRIDE; |
| 60 virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE; | 60 virtual void RegisterCallback(const base::Closure& source_callback) OVERRIDE; |
| 61 | 61 |
| 62 // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|. | 62 // PostTask target from |ByteStreamReaderImpl::MaybeUpdateInput|. |
| 63 static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag, | 63 static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag, |
| 64 ByteStreamWriterImpl* target, | 64 ByteStreamWriterImpl* target, |
| 65 size_t bytes_consumed); | 65 size_t bytes_consumed); |
| 66 | 66 |
| 67 private: | 67 private: |
| 68 // Called from UpdateWindow when object existence has been validated. | 68 // Called from UpdateWindow when object existence has been validated. |
| 69 void UpdateWindowInternal(size_t bytes_consumed); | 69 void UpdateWindowInternal(size_t bytes_consumed); |
| 70 | 70 |
| 71 void PostToPeer(bool complete, DownloadInterruptReason status); | 71 void PostToPeer(bool complete, int status); |
| 72 | 72 |
| 73 const size_t total_buffer_size_; | 73 const size_t total_buffer_size_; |
| 74 | 74 |
| 75 // All data objects in this class are only valid to access on | 75 // All data objects in this class are only valid to access on |
| 76 // this task runner except as otherwise noted. | 76 // this task runner except as otherwise noted. |
| 77 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; | 77 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; |
| 78 | 78 |
| 79 // True while this object is alive. | 79 // True while this object is alive. |
| 80 scoped_refptr<LifetimeFlag> my_lifetime_flag_; | 80 scoped_refptr<LifetimeFlag> my_lifetime_flag_; |
| 81 | 81 |
| (...skipping 25 matching lines...) Expand all Loading... |
| 107 virtual ~ByteStreamReaderImpl(); | 107 virtual ~ByteStreamReaderImpl(); |
| 108 | 108 |
| 109 // Must be called before any operations are performed. | 109 // Must be called before any operations are performed. |
| 110 void SetPeer(ByteStreamWriterImpl* peer, | 110 void SetPeer(ByteStreamWriterImpl* peer, |
| 111 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, | 111 scoped_refptr<base::SequencedTaskRunner> peer_task_runner, |
| 112 scoped_refptr<LifetimeFlag> peer_lifetime_flag); | 112 scoped_refptr<LifetimeFlag> peer_lifetime_flag); |
| 113 | 113 |
| 114 // Overridden from ByteStreamReader. | 114 // Overridden from ByteStreamReader. |
| 115 virtual StreamState Read(scoped_refptr<net::IOBuffer>* data, | 115 virtual StreamState Read(scoped_refptr<net::IOBuffer>* data, |
| 116 size_t* length) OVERRIDE; | 116 size_t* length) OVERRIDE; |
| 117 virtual DownloadInterruptReason GetStatus() const OVERRIDE; | 117 virtual int GetStatus() const OVERRIDE; |
| 118 virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE; | 118 virtual void RegisterCallback(const base::Closure& sink_callback) OVERRIDE; |
| 119 | 119 |
| 120 // PostTask target from |ByteStreamWriterImpl::MaybePostToPeer| and | 120 // PostTask target from |ByteStreamWriterImpl::Write| and |
| 121 // |ByteStreamWriterImpl::Close|. | 121 // |ByteStreamWriterImpl::Close|. |
| 122 // Receive data from our peer. | 122 // Receive data from our peer. |
| 123 // static because it may be called after the object it is targeting | 123 // static because it may be called after the object it is targeting |
| 124 // has been destroyed. It may not access |*target| | 124 // has been destroyed. It may not access |*target| |
| 125 // if |*object_lifetime_flag| is false. | 125 // if |*object_lifetime_flag| is false. |
| 126 static void TransferData( | 126 static void TransferData( |
| 127 scoped_refptr<LifetimeFlag> object_lifetime_flag, | 127 scoped_refptr<LifetimeFlag> object_lifetime_flag, |
| 128 ByteStreamReaderImpl* target, | 128 ByteStreamReaderImpl* target, |
| 129 scoped_ptr<ContentVector> transfer_buffer, | 129 scoped_ptr<ContentVector> transfer_buffer, |
| 130 size_t transfer_buffer_bytes, | 130 size_t transfer_buffer_bytes, |
| 131 bool source_complete, | 131 bool source_complete, |
| 132 DownloadInterruptReason status); | 132 int status); |
| 133 | 133 |
| 134 private: | 134 private: |
| 135 // Called from TransferData once object existence has been validated. | 135 // Called from TransferData once object existence has been validated. |
| 136 void TransferDataInternal( | 136 void TransferDataInternal( |
| 137 scoped_ptr<ContentVector> transfer_buffer, | 137 scoped_ptr<ContentVector> transfer_buffer, |
| 138 size_t transfer_buffer_bytes, | 138 size_t transfer_buffer_bytes, |
| 139 bool source_complete, | 139 bool source_complete, |
| 140 DownloadInterruptReason status); | 140 int status); |
| 141 | 141 |
| 142 void MaybeUpdateInput(); | 142 void MaybeUpdateInput(); |
| 143 | 143 |
| 144 const size_t total_buffer_size_; | 144 const size_t total_buffer_size_; |
| 145 | 145 |
| 146 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; | 146 scoped_refptr<base::SequencedTaskRunner> my_task_runner_; |
| 147 | 147 |
| 148 // True while this object is alive. | 148 // True while this object is alive. |
| 149 scoped_refptr<LifetimeFlag> my_lifetime_flag_; | 149 scoped_refptr<LifetimeFlag> my_lifetime_flag_; |
| 150 | 150 |
| 151 ContentVector available_contents_; | 151 ContentVector available_contents_; |
| 152 | 152 |
| 153 bool received_status_; | 153 bool received_status_; |
| 154 DownloadInterruptReason status_; | 154 int status_; |
| 155 | 155 |
| 156 base::Closure data_available_callback_; | 156 base::Closure data_available_callback_; |
| 157 | 157 |
| 158 // Time of last point at which data in stream transitioned from full | 158 // Time of last point at which data in stream transitioned from full |
| 159 // to non-full. Nulled when a callback is sent. | 159 // to non-full. Nulled when a callback is sent. |
| 160 base::Time last_non_full_time_; | 160 base::Time last_non_full_time_; |
| 161 | 161 |
| 162 // ** Peer information | 162 // ** Peer information |
| 163 | 163 |
| 164 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; | 164 scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; |
| (...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 204 | 204 |
| 205 bool ByteStreamWriterImpl::Write( | 205 bool ByteStreamWriterImpl::Write( |
| 206 scoped_refptr<net::IOBuffer> buffer, size_t byte_count) { | 206 scoped_refptr<net::IOBuffer> buffer, size_t byte_count) { |
| 207 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | 207 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| 208 | 208 |
| 209 input_contents_.push_back(std::make_pair(buffer, byte_count)); | 209 input_contents_.push_back(std::make_pair(buffer, byte_count)); |
| 210 input_contents_size_ += byte_count; | 210 input_contents_size_ += byte_count; |
| 211 | 211 |
| 212 // Arbitrarily, we buffer to a third of the total size before sending. | 212 // Arbitrarily, we buffer to a third of the total size before sending. |
| 213 if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending) | 213 if (input_contents_size_ > total_buffer_size_ / kFractionBufferBeforeSending) |
| 214 PostToPeer(false, DOWNLOAD_INTERRUPT_REASON_NONE); | 214 PostToPeer(false, 0); |
| 215 | 215 |
| 216 return (input_contents_size_ + output_size_used_ <= total_buffer_size_); | 216 return (input_contents_size_ + output_size_used_ <= total_buffer_size_); |
| 217 } | 217 } |
| 218 | 218 |
| 219 void ByteStreamWriterImpl::Flush() { | 219 void ByteStreamWriterImpl::Flush() { |
| 220 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | 220 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| 221 if (input_contents_size_ > 0) | 221 if (input_contents_size_ > 0) |
| 222 PostToPeer(false, DOWNLOAD_INTERRUPT_REASON_NONE); | 222 PostToPeer(false, 0); |
| 223 } | 223 } |
| 224 | 224 |
| 225 void ByteStreamWriterImpl::Close( | 225 void ByteStreamWriterImpl::Close(int status) { |
| 226 DownloadInterruptReason status) { | |
| 227 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | 226 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| 228 PostToPeer(true, status); | 227 PostToPeer(true, status); |
| 229 } | 228 } |
| 230 | 229 |
| 231 void ByteStreamWriterImpl::RegisterCallback( | 230 void ByteStreamWriterImpl::RegisterCallback( |
| 232 const base::Closure& source_callback) { | 231 const base::Closure& source_callback) { |
| 233 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | 232 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| 234 space_available_callback_ = source_callback; | 233 space_available_callback_ = source_callback; |
| 235 } | 234 } |
| 236 | 235 |
| (...skipping 15 matching lines...) Expand all Loading... |
| 252 // Callback if we were above the limit and we're now <= to it. | 251 // Callback if we were above the limit and we're now <= to it. |
| 253 size_t total_known_size_used = | 252 size_t total_known_size_used = |
| 254 input_contents_size_ + output_size_used_; | 253 input_contents_size_ + output_size_used_; |
| 255 | 254 |
| 256 if (total_known_size_used <= total_buffer_size_ && | 255 if (total_known_size_used <= total_buffer_size_ && |
| 257 (total_known_size_used + bytes_consumed > total_buffer_size_) && | 256 (total_known_size_used + bytes_consumed > total_buffer_size_) && |
| 258 !space_available_callback_.is_null()) | 257 !space_available_callback_.is_null()) |
| 259 space_available_callback_.Run(); | 258 space_available_callback_.Run(); |
| 260 } | 259 } |
| 261 | 260 |
| 262 void ByteStreamWriterImpl::PostToPeer( | 261 void ByteStreamWriterImpl::PostToPeer(bool complete, int status) { |
| 263 bool complete, DownloadInterruptReason status) { | |
| 264 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | 262 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| 265 // Valid contexts in which to call. | 263 // Valid contexts in which to call. |
| 266 DCHECK(complete || 0 != input_contents_size_); | 264 DCHECK(complete || 0 != input_contents_size_); |
| 267 | 265 |
| 268 scoped_ptr<ContentVector> transfer_buffer(new ContentVector); | 266 scoped_ptr<ContentVector> transfer_buffer(new ContentVector); |
| 269 size_t buffer_size = 0; | 267 size_t buffer_size = 0; |
| 270 if (0 != input_contents_size_) { | 268 if (0 != input_contents_size_) { |
| 271 transfer_buffer.reset(new ContentVector); | 269 transfer_buffer.reset(new ContentVector); |
| 272 transfer_buffer->swap(input_contents_); | 270 transfer_buffer->swap(input_contents_); |
| 273 buffer_size = input_contents_size_; | 271 buffer_size = input_contents_size_; |
| (...skipping 12 matching lines...) Expand all Loading... |
| 286 } | 284 } |
| 287 | 285 |
| 288 ByteStreamReaderImpl::ByteStreamReaderImpl( | 286 ByteStreamReaderImpl::ByteStreamReaderImpl( |
| 289 scoped_refptr<base::SequencedTaskRunner> task_runner, | 287 scoped_refptr<base::SequencedTaskRunner> task_runner, |
| 290 scoped_refptr<LifetimeFlag> lifetime_flag, | 288 scoped_refptr<LifetimeFlag> lifetime_flag, |
| 291 size_t buffer_size) | 289 size_t buffer_size) |
| 292 : total_buffer_size_(buffer_size), | 290 : total_buffer_size_(buffer_size), |
| 293 my_task_runner_(task_runner), | 291 my_task_runner_(task_runner), |
| 294 my_lifetime_flag_(lifetime_flag), | 292 my_lifetime_flag_(lifetime_flag), |
| 295 received_status_(false), | 293 received_status_(false), |
| 296 status_(DOWNLOAD_INTERRUPT_REASON_NONE), | 294 status_(0), |
| 297 unreported_consumed_bytes_(0), | 295 unreported_consumed_bytes_(0), |
| 298 peer_(NULL) { | 296 peer_(NULL) { |
| 299 DCHECK(my_lifetime_flag_.get()); | 297 DCHECK(my_lifetime_flag_.get()); |
| 300 my_lifetime_flag_->is_alive = true; | 298 my_lifetime_flag_->is_alive = true; |
| 301 } | 299 } |
| 302 | 300 |
| 303 ByteStreamReaderImpl::~ByteStreamReaderImpl() { | 301 ByteStreamReaderImpl::~ByteStreamReaderImpl() { |
| 304 my_lifetime_flag_->is_alive = false; | 302 my_lifetime_flag_->is_alive = false; |
| 305 } | 303 } |
| 306 | 304 |
| (...skipping 19 matching lines...) Expand all Loading... |
| 326 | 324 |
| 327 MaybeUpdateInput(); | 325 MaybeUpdateInput(); |
| 328 return STREAM_HAS_DATA; | 326 return STREAM_HAS_DATA; |
| 329 } | 327 } |
| 330 if (received_status_) { | 328 if (received_status_) { |
| 331 return STREAM_COMPLETE; | 329 return STREAM_COMPLETE; |
| 332 } | 330 } |
| 333 return STREAM_EMPTY; | 331 return STREAM_EMPTY; |
| 334 } | 332 } |
| 335 | 333 |
| 336 DownloadInterruptReason ByteStreamReaderImpl::GetStatus() const { | 334 int ByteStreamReaderImpl::GetStatus() const { |
| 337 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | 335 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| 338 DCHECK(received_status_); | 336 DCHECK(received_status_); |
| 339 return status_; | 337 return status_; |
| 340 } | 338 } |
| 341 | 339 |
| 342 void ByteStreamReaderImpl::RegisterCallback( | 340 void ByteStreamReaderImpl::RegisterCallback( |
| 343 const base::Closure& sink_callback) { | 341 const base::Closure& sink_callback) { |
| 344 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | 342 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| 345 | 343 |
| 346 data_available_callback_ = sink_callback; | 344 data_available_callback_ = sink_callback; |
| 347 } | 345 } |
| 348 | 346 |
| 349 // static | 347 // static |
| 350 void ByteStreamReaderImpl::TransferData( | 348 void ByteStreamReaderImpl::TransferData( |
| 351 scoped_refptr<LifetimeFlag> object_lifetime_flag, | 349 scoped_refptr<LifetimeFlag> object_lifetime_flag, |
| 352 ByteStreamReaderImpl* target, | 350 ByteStreamReaderImpl* target, |
| 353 scoped_ptr<ContentVector> transfer_buffer, | 351 scoped_ptr<ContentVector> transfer_buffer, |
| 354 size_t buffer_size, | 352 size_t buffer_size, |
| 355 bool source_complete, | 353 bool source_complete, |
| 356 DownloadInterruptReason status) { | 354 int status) { |
| 357 // If our target is no longer alive, do nothing. | 355 // If our target is no longer alive, do nothing. |
| 358 if (!object_lifetime_flag->is_alive) return; | 356 if (!object_lifetime_flag->is_alive) return; |
| 359 | 357 |
| 360 target->TransferDataInternal( | 358 target->TransferDataInternal( |
| 361 transfer_buffer.Pass(), buffer_size, source_complete, status); | 359 transfer_buffer.Pass(), buffer_size, source_complete, status); |
| 362 } | 360 } |
| 363 | 361 |
| 364 void ByteStreamReaderImpl::TransferDataInternal( | 362 void ByteStreamReaderImpl::TransferDataInternal( |
| 365 scoped_ptr<ContentVector> transfer_buffer, | 363 scoped_ptr<ContentVector> transfer_buffer, |
| 366 size_t buffer_size, | 364 size_t buffer_size, |
| 367 bool source_complete, | 365 bool source_complete, |
| 368 DownloadInterruptReason status) { | 366 int status) { |
| 369 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); | 367 DCHECK(my_task_runner_->RunsTasksOnCurrentThread()); |
| 370 | 368 |
| 371 bool was_empty = available_contents_.empty(); | 369 bool was_empty = available_contents_.empty(); |
| 372 | 370 |
| 373 if (transfer_buffer) { | 371 if (transfer_buffer) { |
| 374 available_contents_.insert(available_contents_.end(), | 372 available_contents_.insert(available_contents_.end(), |
| 375 transfer_buffer->begin(), | 373 transfer_buffer->begin(), |
| 376 transfer_buffer->end()); | 374 transfer_buffer->end()); |
| 377 } | 375 } |
| 378 | 376 |
| (...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 432 ByteStreamReaderImpl* out = new ByteStreamReaderImpl( | 430 ByteStreamReaderImpl* out = new ByteStreamReaderImpl( |
| 433 output_task_runner, output_flag, buffer_size); | 431 output_task_runner, output_flag, buffer_size); |
| 434 | 432 |
| 435 in->SetPeer(out, output_task_runner, output_flag); | 433 in->SetPeer(out, output_task_runner, output_flag); |
| 436 out->SetPeer(in, input_task_runner, input_flag); | 434 out->SetPeer(in, input_task_runner, input_flag); |
| 437 input->reset(in); | 435 input->reset(in); |
| 438 output->reset(out); | 436 output->reset(out); |
| 439 } | 437 } |
| 440 | 438 |
| 441 } // namespace content | 439 } // namespace content |
| OLD | NEW |