Index: content/browser/download/byte_stream.cc |
diff --git a/content/browser/download/byte_stream.cc b/content/browser/download/byte_stream.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..540dcca96e707db73dadef2e6b7f80f4cf34d23e |
--- /dev/null |
+++ b/content/browser/download/byte_stream.cc |
@@ -0,0 +1,473 @@ |
+// Copyright (c) 2012 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "content/browser/download/byte_stream.h" |
+ |
+#include "base/bind.h" |
+#include "base/location.h" |
+#include "base/memory/weak_ptr.h" |
+#include "base/memory/ref_counted.h" |
+#include "base/sequenced_task_runner.h" |
+ |
+namespace { |
+ |
+typedef std::deque<std::pair<scoped_refptr<net::IOBuffer>,size_t> > |
+ContentVector; |
+ |
+class ByteStreamOutputImpl; |
+ |
+// A poor man's weak pointer; a RefCountedThreadSafe boolean that can be |
+// cleared in an object destructor and accessed to check for object |
+// existence. We can't use weak pointers because they're tightly tied to |
+// threads rather than task runners. |
+struct LifetimeFlag : public base::RefCountedThreadSafe<LifetimeFlag> { |
+ public: |
+ LifetimeFlag() : is_alive_(true) { } |
+ bool is_alive_; |
+ protected: |
+ friend class base::RefCountedThreadSafe<LifetimeFlag>; |
+ virtual ~LifetimeFlag() { } |
+}; |
+ |
+// For both ByteStreamInputImpl and ByteStreamOutputImpl, Construction and |
+// SetPeer may happen anywhere; all other operations on each class must |
+// happen in the context of their SequencedTaskRunner. |
+class ByteStreamInputImpl : public content::ByteStreamInput { |
+ public: |
+ ByteStreamInputImpl(scoped_refptr<base::SequencedTaskRunner> task_runner, |
+ scoped_refptr<LifetimeFlag> lifetime_flag, |
+ size_t buffer_size); |
+ virtual ~ByteStreamInputImpl(); |
+ |
+ // Must be called before any operations are performed. |
+ void SetPeer(ByteStreamOutputImpl* peer, |
+ scoped_refptr<base::SequencedTaskRunner> peer_task_runner, |
+ scoped_refptr<LifetimeFlag> peer_lifetime_flag); |
+ |
+ // Overridden from ByteStreamInput. |
+ virtual bool AddData(scoped_refptr<net::IOBuffer> buffer, |
+ size_t byte_count) OVERRIDE; |
+ virtual void SourceComplete(content::DownloadInterruptReason status) OVERRIDE; |
+ virtual void RegisterCallback(base::Closure source_callback) OVERRIDE; |
+ |
+ // PostTask target from |ByteStreamOutputImpl::MaybeUpdateInput|. |
+ static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag, |
+ ByteStreamInputImpl* target, |
+ size_t bytes_consumed); |
+ |
+ private: |
+ void MaybePostToPeer(); |
+ |
+ const size_t total_buffer_size_; |
+ |
+ // All data objects in this class are only valid to access on |
+ // this task runner except as otherwise noted. |
+ scoped_refptr<base::SequencedTaskRunner> my_task_runner_; |
+ |
+ // True while this object is alive. |
+ scoped_refptr<LifetimeFlag> my_lifetime_flag_; |
+ |
+ base::Closure space_available_callback_; |
+ ContentVector input_contents_; |
+ size_t input_contents_size_; |
+ |
+ // Time of last point at which data in stream transitioned from zero |
+ // to non-zero. Nulled when a callback is sent. |
+ base::Time last_non_empty_time_; |
+ |
+ // ** Peer information. |
+ |
+ scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; |
+ |
+ // How much we've sent to the output that for flow control purposes we |
+ // must assume hasn't been read yet. |
+ size_t output_size_used_; |
+ |
+ // Only valid to access on peer_task_runner_. |
+ scoped_refptr<LifetimeFlag> peer_lifetime_flag_; |
+ |
+ // Only valid to access on peer_task_runner_ if |
+ // |*peer_lifetime_flag_ == true| |
+ ByteStreamOutputImpl* peer_; |
+}; |
+ |
+class ByteStreamOutputImpl : public content::ByteStreamOutput { |
+ public: |
+ ByteStreamOutputImpl(scoped_refptr<base::SequencedTaskRunner> task_runner, |
+ scoped_refptr<LifetimeFlag> lifetime_flag, |
+ size_t buffer_size); |
+ virtual ~ByteStreamOutputImpl(); |
+ |
+ // Must be called before any operations are performed. |
+ void SetPeer(ByteStreamInputImpl* peer, |
+ scoped_refptr<base::SequencedTaskRunner> peer_task_runner, |
+ scoped_refptr<LifetimeFlag> peer_lifetime_flag); |
+ |
+ // Overridden from ByteStreamOutput. |
+ virtual StreamState GetData(scoped_refptr<net::IOBuffer>* data, |
+ size_t* length) OVERRIDE; |
+ virtual content::DownloadInterruptReason GetSourceResult() const OVERRIDE; |
+ virtual void RegisterCallback(base::Closure sink_callback) OVERRIDE; |
+ virtual size_t NumSourceCallbacks() const OVERRIDE; |
+ virtual size_t NumSinkCallbacks() const OVERRIDE; |
+ virtual size_t BytesRead() const OVERRIDE; |
+ virtual size_t BuffersRead() const OVERRIDE; |
+ virtual base::TimeDelta TotalSourceTriggerWaitTime() const OVERRIDE; |
+ virtual base::TimeDelta TotalSinkTriggerWaitTime() const OVERRIDE; |
+ |
+ // PostTask target from |ByteStreamInputImpl::MaybePostToPeer| and |
+ // |ByteStreamInputImpl::SourceComplete|. |
+ // Receive data from our peer. |
+ // static because it may be called after the object it is targeting |
+ // has been destroyed. It may not access |*target| |
+ // if |*object_lifetime_flag| is false. |
+ static void TransferData( |
+ scoped_refptr<LifetimeFlag> object_lifetime_flag, |
+ ByteStreamOutputImpl* target, |
+ scoped_ptr<ContentVector> xfer_buffer, |
+ size_t xfer_buffer_bytes, |
+ bool source_complete, |
+ content::DownloadInterruptReason status, |
+ base::TimeDelta additional_sink_trigger_wait_time); |
+ |
+ private: |
+ void MaybeUpdateInput(); |
+ |
+ const size_t total_buffer_size_; |
+ |
+ scoped_refptr<base::SequencedTaskRunner> my_task_runner_; |
+ |
+ // True while this object is alive. |
+ scoped_refptr<LifetimeFlag> my_lifetime_flag_; |
+ |
+ ContentVector available_contents_; |
+ size_t available_contents_size_; |
+ |
+ bool received_status_; |
+ content::DownloadInterruptReason status_; |
+ |
+ base::Closure data_available_callback_; |
+ |
+ // Time of last point at which data in stream transitioned from full |
+ // to non-full. Nulled when a callback is sent. |
+ base::Time last_non_full_time_; |
+ |
+ // ** Peer information |
+ |
+ scoped_refptr<base::SequencedTaskRunner> peer_task_runner_; |
+ |
+ // How much has been removed from this class that we haven't told |
+ // the input about yet. |
+ size_t unreported_consumed_bytes_; |
+ |
+ // Only valid to access on peer_task_runner_. |
+ scoped_refptr<LifetimeFlag> peer_lifetime_flag_; |
+ |
+ // Only valid to access on peer_task_runner_ if |
+ // |*peer_lifetime_flag_ == true| |
+ ByteStreamInputImpl* peer_; |
+ |
+ // ** Stream statistics. |
+ size_t bytes_read_; |
+ size_t buffers_read_; |
+ size_t sink_callbacks_triggered_; |
+ size_t source_callbacks_triggered_; |
+ base::TimeDelta total_sink_trigger_wait_time_; |
+ base::TimeDelta total_source_trigger_wait_time_; |
+}; |
+ |
+ByteStreamInputImpl::ByteStreamInputImpl( |
+ scoped_refptr<base::SequencedTaskRunner> task_runner, |
+ scoped_refptr<LifetimeFlag> lifetime_flag, |
+ size_t buffer_size) |
+ : total_buffer_size_(buffer_size), |
+ my_task_runner_(task_runner), |
+ my_lifetime_flag_(lifetime_flag), |
+ input_contents_size_(0), |
+ output_size_used_(0), |
+ peer_(NULL) { |
+ DCHECK(my_lifetime_flag_.get()); |
+ my_lifetime_flag_->is_alive_ = true; |
+} |
+ |
+ByteStreamInputImpl::~ByteStreamInputImpl() { |
+ my_lifetime_flag_->is_alive_ = false; |
+} |
+ |
+void ByteStreamInputImpl::SetPeer( |
+ ByteStreamOutputImpl* peer, |
+ scoped_refptr<base::SequencedTaskRunner> peer_task_runner, |
+ scoped_refptr<LifetimeFlag> peer_lifetime_flag) { |
+ peer_ = peer; |
+ peer_task_runner_ = peer_task_runner; |
+ peer_lifetime_flag_ = peer_lifetime_flag; |
+} |
+ |
+bool ByteStreamInputImpl::AddData( |
+ scoped_refptr<net::IOBuffer> buffer, size_t byte_count) { |
+ if (input_contents_size_ == 0 && byte_count != 0) |
+ last_non_empty_time_ = base::Time::Now(); |
+ |
+ input_contents_.push_back(std::make_pair(buffer, byte_count)); |
+ input_contents_size_ += byte_count; |
+ |
+ MaybePostToPeer(); |
+ |
+ return (input_contents_size_ + output_size_used_ <= total_buffer_size_); |
+} |
+ |
+void ByteStreamInputImpl::SourceComplete( |
+ content::DownloadInterruptReason status) { |
+ // Make sure to flush everything we have with the source notification. |
+ scoped_ptr<ContentVector> xfer_buffer; |
+ size_t buffer_size = 0; |
+ base::TimeDelta additional_source_wait_time; // Constructs to 0. |
+ if (0 != input_contents_size_) { |
+ xfer_buffer.reset(new ContentVector); |
+ xfer_buffer->swap(input_contents_); |
+ buffer_size = input_contents_size_; |
+ output_size_used_ += input_contents_size_; |
+ input_contents_size_ = 0; |
+ if (!last_non_empty_time_.is_null()) { |
+ additional_source_wait_time = base::Time::Now() - last_non_empty_time_; |
+ last_non_empty_time_ = base::Time(); |
+ } |
+ } |
+ peer_task_runner_->PostTask( |
+ FROM_HERE, base::Bind( |
+ &ByteStreamOutputImpl::TransferData, |
+ peer_lifetime_flag_, |
+ peer_, |
+ base::Passed(xfer_buffer.Pass()), |
+ buffer_size, |
+ true /* Source complete. */, |
+ status, |
+ additional_source_wait_time)); |
+} |
+ |
+void ByteStreamInputImpl::RegisterCallback( |
+ base::Closure source_callback) { |
+ space_available_callback_ = source_callback; |
+} |
+ |
+// static |
+void ByteStreamInputImpl::UpdateWindow( |
+ scoped_refptr<LifetimeFlag> lifetime_flag, ByteStreamInputImpl* target, |
+ size_t bytes_consumed) { |
+ |
+ // If the target object isn't alive anymore, we do nothing. |
+ if (!lifetime_flag->is_alive_) return; |
+ |
+ DCHECK_GE(target->output_size_used_, bytes_consumed); |
+ target->output_size_used_ -= bytes_consumed; |
+ |
+ if (!target->space_available_callback_.is_null()) |
+ target->space_available_callback_.Run(); |
+} |
+ |
+// Decide whether or not we've bufferred enough for a transfer. |
+// For right now "enough" will be "anything". |
+void ByteStreamInputImpl::MaybePostToPeer() { |
+ // Arbitrarily, we buffer to a third of the total size before sending. |
+ if (input_contents_size_ > total_buffer_size_ / 3) { |
+ scoped_ptr<ContentVector> xfer_buffer(new ContentVector); |
+ xfer_buffer->swap(input_contents_); |
+ size_t buffer_size = input_contents_size_; |
+ output_size_used_ += input_contents_size_; |
+ input_contents_size_ = 0; |
+ |
+ base::TimeDelta additional_source_wait_time; // Constructs to 0. |
+ if (!last_non_empty_time_.is_null()) { |
+ additional_source_wait_time = base::Time::Now() - last_non_empty_time_; |
+ last_non_empty_time_ = base::Time(); |
+ } |
+ |
+ peer_task_runner_->PostTask( |
+ FROM_HERE, base::Bind( |
+ &ByteStreamOutputImpl::TransferData, |
+ peer_lifetime_flag_, |
+ peer_, |
+ base::Passed(xfer_buffer.Pass()), |
+ buffer_size, |
+ false /* Source not complete. */, |
+ content::DOWNLOAD_INTERRUPT_REASON_NONE, |
+ additional_source_wait_time)); |
+ } |
+} |
+ |
+ByteStreamOutputImpl::ByteStreamOutputImpl( |
+ scoped_refptr<base::SequencedTaskRunner> task_runner, |
+ scoped_refptr<LifetimeFlag> lifetime_flag, |
+ size_t buffer_size) |
+ : total_buffer_size_(buffer_size), |
+ my_task_runner_(task_runner), |
+ my_lifetime_flag_(lifetime_flag), |
+ available_contents_size_(0), |
+ received_status_(false), |
+ status_(content::DOWNLOAD_INTERRUPT_REASON_NONE), |
+ unreported_consumed_bytes_(0), |
+ peer_(NULL), |
+ bytes_read_(0), |
+ buffers_read_(0), |
+ sink_callbacks_triggered_(0), |
+ source_callbacks_triggered_(0) { |
+ DCHECK(my_lifetime_flag_.get()); |
+ my_lifetime_flag_->is_alive_ = true; |
+} |
+ |
+ByteStreamOutputImpl::~ByteStreamOutputImpl() { |
+ my_lifetime_flag_->is_alive_ = false; |
+} |
+ |
+void ByteStreamOutputImpl::SetPeer( |
+ ByteStreamInputImpl* peer, |
+ scoped_refptr<base::SequencedTaskRunner> peer_task_runner, |
+ scoped_refptr<LifetimeFlag> peer_lifetime_flag) { |
+ peer_ = peer; |
+ peer_task_runner_ = peer_task_runner; |
+ peer_lifetime_flag_ = peer_lifetime_flag; |
+} |
+ |
+ByteStreamOutputImpl::StreamState |
+ByteStreamOutputImpl::GetData(scoped_refptr<net::IOBuffer>* data, |
+ size_t* length) { |
+ if (available_contents_.size()) { |
+ *data = available_contents_.front().first; |
+ *length = available_contents_.front().second; |
+ available_contents_.pop_front(); |
+ DCHECK_GE(available_contents_size_, *length); |
+ available_contents_size_ -= *length; |
+ unreported_consumed_bytes_ += *length; |
+ bytes_read_ += *length; |
+ buffers_read_ ++; |
+ if (available_contents_size_ <= total_buffer_size_ && |
+ available_contents_size_ + *length > total_buffer_size_) { |
+ last_non_full_time_ = base::Time::Now(); |
+ } |
+ |
+ MaybeUpdateInput(); |
+ return STREAM_HAS_DATA; |
+ } |
+ if (received_status_) { |
+ return STREAM_COMPLETE; |
+ } |
+ return STREAM_EMPTY; |
+} |
+ |
+content::DownloadInterruptReason |
+ByteStreamOutputImpl::GetSourceResult() const { |
+ DCHECK(received_status_); |
+ return status_; |
+} |
+ |
+void ByteStreamOutputImpl::RegisterCallback(base::Closure sink_callback) { |
+ data_available_callback_ = sink_callback; |
+} |
+ |
+size_t ByteStreamOutputImpl::NumSourceCallbacks() const { |
+ return source_callbacks_triggered_; |
+} |
+ |
+size_t ByteStreamOutputImpl::NumSinkCallbacks() const { |
+ return sink_callbacks_triggered_; |
+} |
+ |
+size_t ByteStreamOutputImpl::BytesRead() const { |
+ return bytes_read_; |
+} |
+ |
+size_t ByteStreamOutputImpl::BuffersRead() const { |
+ return buffers_read_; |
+} |
+ |
+base::TimeDelta ByteStreamOutputImpl::TotalSourceTriggerWaitTime() const { |
+ return total_source_trigger_wait_time_; |
+} |
+ |
+base::TimeDelta ByteStreamOutputImpl::TotalSinkTriggerWaitTime() const { |
+ return total_sink_trigger_wait_time_; |
+} |
+ |
+// static |
+void ByteStreamOutputImpl::TransferData( |
+ scoped_refptr<LifetimeFlag> object_lifetime_flag, |
+ ByteStreamOutputImpl* target, |
+ scoped_ptr<ContentVector> xfer_buffer, |
+ size_t buffer_size, |
+ bool source_complete, |
+ content::DownloadInterruptReason status, |
+ base::TimeDelta additional_sink_trigger_wait_time) { |
+ // If our target is no longer alive, do nothing. |
+ if (!object_lifetime_flag->is_alive_) return; |
+ |
+ if (xfer_buffer.get()) { |
+ target->available_contents_.insert(target->available_contents_.end(), |
+ xfer_buffer->begin(), |
+ xfer_buffer->end()); |
+ target->available_contents_size_ += buffer_size; |
+ } |
+ |
+ if (source_complete) { |
+ target->received_status_ = true; |
+ target->status_ = status; |
+ } |
+ |
+ target->sink_callbacks_triggered_++; |
+ target->total_sink_trigger_wait_time_ += additional_sink_trigger_wait_time; |
+ |
+ if (!target->data_available_callback_.is_null()) |
+ target->data_available_callback_.Run(); |
+} |
+ |
+// Decide whether or not to send the input a window update. |
+// Currently we do that whenever we've got unreported consumption |
+// greater than 1/3 of total size. |
+void ByteStreamOutputImpl::MaybeUpdateInput() { |
+ if (unreported_consumed_bytes_ > total_buffer_size_ / 3) { |
+ source_callbacks_triggered_++; |
+ if (!last_non_full_time_.is_null()) { |
+ total_source_trigger_wait_time_ += |
+ base::Time::Now() - last_non_full_time_; |
+ last_non_full_time_ = base::Time(); |
+ } |
+ peer_task_runner_->PostTask( |
+ FROM_HERE, base::Bind( |
+ &ByteStreamInputImpl::UpdateWindow, |
+ peer_lifetime_flag_, |
+ peer_, |
+ unreported_consumed_bytes_)); |
+ unreported_consumed_bytes_ = 0; |
+ } |
+} |
+ |
+} // namespace |
+ |
+namespace content { |
+ |
+ByteStreamOutput::~ByteStreamOutput() { } |
+ |
+ByteStreamInput::~ByteStreamInput() { } |
+ |
+void CreateByteStream( |
+ scoped_ptr<ByteStreamInput>* input, |
+ scoped_ptr<ByteStreamOutput>* output, |
+ scoped_refptr<base::SequencedTaskRunner> input_task_runner, |
+ scoped_refptr<base::SequencedTaskRunner> output_task_runner, |
+ size_t buffer_size) { |
+ scoped_refptr<LifetimeFlag> input_flag(new LifetimeFlag()); |
+ scoped_refptr<LifetimeFlag> output_flag(new LifetimeFlag()); |
+ |
+ ByteStreamInputImpl* in = new ByteStreamInputImpl( |
+ input_task_runner, input_flag, buffer_size); |
+ ByteStreamOutputImpl* out = new ByteStreamOutputImpl( |
+ output_task_runner, output_flag, buffer_size); |
+ |
+ in->SetPeer(out, output_task_runner, output_flag); |
+ out->SetPeer(in, input_task_runner, input_flag); |
+ input->reset(in); |
+ output->reset(out); |
+ return; |
+} |
+ |
+} // namespace content |