Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(824)

Unified Diff: content/browser/download/byte_stream.cc

Issue 10244001: Creation of ByteStream class. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Finished and polished rewrite. Created 8 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: content/browser/download/byte_stream.cc
diff --git a/content/browser/download/byte_stream.cc b/content/browser/download/byte_stream.cc
new file mode 100644
index 0000000000000000000000000000000000000000..540dcca96e707db73dadef2e6b7f80f4cf34d23e
--- /dev/null
+++ b/content/browser/download/byte_stream.cc
@@ -0,0 +1,473 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "content/browser/download/byte_stream.h"
+
+#include "base/bind.h"
+#include "base/location.h"
+#include "base/memory/weak_ptr.h"
+#include "base/memory/ref_counted.h"
+#include "base/sequenced_task_runner.h"
+
+namespace {
+
+typedef std::deque<std::pair<scoped_refptr<net::IOBuffer>,size_t> >
+ContentVector;
+
+class ByteStreamOutputImpl;
+
+// A poor man's weak pointer; a RefCountedThreadSafe boolean that can be
+// cleared in an object destructor and accessed to check for object
+// existence. We can't use weak pointers because they're tightly tied to
+// threads rather than task runners.
+struct LifetimeFlag : public base::RefCountedThreadSafe<LifetimeFlag> {
+ public:
+ LifetimeFlag() : is_alive_(true) { }
+ bool is_alive_;
+ protected:
+ friend class base::RefCountedThreadSafe<LifetimeFlag>;
+ virtual ~LifetimeFlag() { }
+};
+
+// For both ByteStreamInputImpl and ByteStreamOutputImpl, Construction and
+// SetPeer may happen anywhere; all other operations on each class must
+// happen in the context of their SequencedTaskRunner.
+class ByteStreamInputImpl : public content::ByteStreamInput {
+ public:
+ ByteStreamInputImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
+ scoped_refptr<LifetimeFlag> lifetime_flag,
+ size_t buffer_size);
+ virtual ~ByteStreamInputImpl();
+
+ // Must be called before any operations are performed.
+ void SetPeer(ByteStreamOutputImpl* peer,
+ scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
+ scoped_refptr<LifetimeFlag> peer_lifetime_flag);
+
+ // Overridden from ByteStreamInput.
+ virtual bool AddData(scoped_refptr<net::IOBuffer> buffer,
+ size_t byte_count) OVERRIDE;
+ virtual void SourceComplete(content::DownloadInterruptReason status) OVERRIDE;
+ virtual void RegisterCallback(base::Closure source_callback) OVERRIDE;
+
+ // PostTask target from |ByteStreamOutputImpl::MaybeUpdateInput|.
+ static void UpdateWindow(scoped_refptr<LifetimeFlag> lifetime_flag,
+ ByteStreamInputImpl* target,
+ size_t bytes_consumed);
+
+ private:
+ void MaybePostToPeer();
+
+ const size_t total_buffer_size_;
+
+ // All data objects in this class are only valid to access on
+ // this task runner except as otherwise noted.
+ scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
+
+ // True while this object is alive.
+ scoped_refptr<LifetimeFlag> my_lifetime_flag_;
+
+ base::Closure space_available_callback_;
+ ContentVector input_contents_;
+ size_t input_contents_size_;
+
+ // Time of last point at which data in stream transitioned from zero
+ // to non-zero. Nulled when a callback is sent.
+ base::Time last_non_empty_time_;
+
+ // ** Peer information.
+
+ scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
+
+ // How much we've sent to the output that for flow control purposes we
+ // must assume hasn't been read yet.
+ size_t output_size_used_;
+
+ // Only valid to access on peer_task_runner_.
+ scoped_refptr<LifetimeFlag> peer_lifetime_flag_;
+
+ // Only valid to access on peer_task_runner_ if
+ // |*peer_lifetime_flag_ == true|
+ ByteStreamOutputImpl* peer_;
+};
+
+class ByteStreamOutputImpl : public content::ByteStreamOutput {
+ public:
+ ByteStreamOutputImpl(scoped_refptr<base::SequencedTaskRunner> task_runner,
+ scoped_refptr<LifetimeFlag> lifetime_flag,
+ size_t buffer_size);
+ virtual ~ByteStreamOutputImpl();
+
+ // Must be called before any operations are performed.
+ void SetPeer(ByteStreamInputImpl* peer,
+ scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
+ scoped_refptr<LifetimeFlag> peer_lifetime_flag);
+
+ // Overridden from ByteStreamOutput.
+ virtual StreamState GetData(scoped_refptr<net::IOBuffer>* data,
+ size_t* length) OVERRIDE;
+ virtual content::DownloadInterruptReason GetSourceResult() const OVERRIDE;
+ virtual void RegisterCallback(base::Closure sink_callback) OVERRIDE;
+ virtual size_t NumSourceCallbacks() const OVERRIDE;
+ virtual size_t NumSinkCallbacks() const OVERRIDE;
+ virtual size_t BytesRead() const OVERRIDE;
+ virtual size_t BuffersRead() const OVERRIDE;
+ virtual base::TimeDelta TotalSourceTriggerWaitTime() const OVERRIDE;
+ virtual base::TimeDelta TotalSinkTriggerWaitTime() const OVERRIDE;
+
+ // PostTask target from |ByteStreamInputImpl::MaybePostToPeer| and
+ // |ByteStreamInputImpl::SourceComplete|.
+ // Receive data from our peer.
+ // static because it may be called after the object it is targeting
+ // has been destroyed. It may not access |*target|
+ // if |*object_lifetime_flag| is false.
+ static void TransferData(
+ scoped_refptr<LifetimeFlag> object_lifetime_flag,
+ ByteStreamOutputImpl* target,
+ scoped_ptr<ContentVector> xfer_buffer,
+ size_t xfer_buffer_bytes,
+ bool source_complete,
+ content::DownloadInterruptReason status,
+ base::TimeDelta additional_sink_trigger_wait_time);
+
+ private:
+ void MaybeUpdateInput();
+
+ const size_t total_buffer_size_;
+
+ scoped_refptr<base::SequencedTaskRunner> my_task_runner_;
+
+ // True while this object is alive.
+ scoped_refptr<LifetimeFlag> my_lifetime_flag_;
+
+ ContentVector available_contents_;
+ size_t available_contents_size_;
+
+ bool received_status_;
+ content::DownloadInterruptReason status_;
+
+ base::Closure data_available_callback_;
+
+ // Time of last point at which data in stream transitioned from full
+ // to non-full. Nulled when a callback is sent.
+ base::Time last_non_full_time_;
+
+ // ** Peer information
+
+ scoped_refptr<base::SequencedTaskRunner> peer_task_runner_;
+
+ // How much has been removed from this class that we haven't told
+ // the input about yet.
+ size_t unreported_consumed_bytes_;
+
+ // Only valid to access on peer_task_runner_.
+ scoped_refptr<LifetimeFlag> peer_lifetime_flag_;
+
+ // Only valid to access on peer_task_runner_ if
+ // |*peer_lifetime_flag_ == true|
+ ByteStreamInputImpl* peer_;
+
+ // ** Stream statistics.
+ size_t bytes_read_;
+ size_t buffers_read_;
+ size_t sink_callbacks_triggered_;
+ size_t source_callbacks_triggered_;
+ base::TimeDelta total_sink_trigger_wait_time_;
+ base::TimeDelta total_source_trigger_wait_time_;
+};
+
+ByteStreamInputImpl::ByteStreamInputImpl(
+ scoped_refptr<base::SequencedTaskRunner> task_runner,
+ scoped_refptr<LifetimeFlag> lifetime_flag,
+ size_t buffer_size)
+ : total_buffer_size_(buffer_size),
+ my_task_runner_(task_runner),
+ my_lifetime_flag_(lifetime_flag),
+ input_contents_size_(0),
+ output_size_used_(0),
+ peer_(NULL) {
+ DCHECK(my_lifetime_flag_.get());
+ my_lifetime_flag_->is_alive_ = true;
+}
+
+ByteStreamInputImpl::~ByteStreamInputImpl() {
+ my_lifetime_flag_->is_alive_ = false;
+}
+
+void ByteStreamInputImpl::SetPeer(
+ ByteStreamOutputImpl* peer,
+ scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
+ scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
+ peer_ = peer;
+ peer_task_runner_ = peer_task_runner;
+ peer_lifetime_flag_ = peer_lifetime_flag;
+}
+
+bool ByteStreamInputImpl::AddData(
+ scoped_refptr<net::IOBuffer> buffer, size_t byte_count) {
+ if (input_contents_size_ == 0 && byte_count != 0)
+ last_non_empty_time_ = base::Time::Now();
+
+ input_contents_.push_back(std::make_pair(buffer, byte_count));
+ input_contents_size_ += byte_count;
+
+ MaybePostToPeer();
+
+ return (input_contents_size_ + output_size_used_ <= total_buffer_size_);
+}
+
+void ByteStreamInputImpl::SourceComplete(
+ content::DownloadInterruptReason status) {
+ // Make sure to flush everything we have with the source notification.
+ scoped_ptr<ContentVector> xfer_buffer;
+ size_t buffer_size = 0;
+ base::TimeDelta additional_source_wait_time; // Constructs to 0.
+ if (0 != input_contents_size_) {
+ xfer_buffer.reset(new ContentVector);
+ xfer_buffer->swap(input_contents_);
+ buffer_size = input_contents_size_;
+ output_size_used_ += input_contents_size_;
+ input_contents_size_ = 0;
+ if (!last_non_empty_time_.is_null()) {
+ additional_source_wait_time = base::Time::Now() - last_non_empty_time_;
+ last_non_empty_time_ = base::Time();
+ }
+ }
+ peer_task_runner_->PostTask(
+ FROM_HERE, base::Bind(
+ &ByteStreamOutputImpl::TransferData,
+ peer_lifetime_flag_,
+ peer_,
+ base::Passed(xfer_buffer.Pass()),
+ buffer_size,
+ true /* Source complete. */,
+ status,
+ additional_source_wait_time));
+}
+
+void ByteStreamInputImpl::RegisterCallback(
+ base::Closure source_callback) {
+ space_available_callback_ = source_callback;
+}
+
+// static
+void ByteStreamInputImpl::UpdateWindow(
+ scoped_refptr<LifetimeFlag> lifetime_flag, ByteStreamInputImpl* target,
+ size_t bytes_consumed) {
+
+ // If the target object isn't alive anymore, we do nothing.
+ if (!lifetime_flag->is_alive_) return;
+
+ DCHECK_GE(target->output_size_used_, bytes_consumed);
+ target->output_size_used_ -= bytes_consumed;
+
+ if (!target->space_available_callback_.is_null())
+ target->space_available_callback_.Run();
+}
+
+// Decide whether or not we've bufferred enough for a transfer.
+// For right now "enough" will be "anything".
+void ByteStreamInputImpl::MaybePostToPeer() {
+ // Arbitrarily, we buffer to a third of the total size before sending.
+ if (input_contents_size_ > total_buffer_size_ / 3) {
+ scoped_ptr<ContentVector> xfer_buffer(new ContentVector);
+ xfer_buffer->swap(input_contents_);
+ size_t buffer_size = input_contents_size_;
+ output_size_used_ += input_contents_size_;
+ input_contents_size_ = 0;
+
+ base::TimeDelta additional_source_wait_time; // Constructs to 0.
+ if (!last_non_empty_time_.is_null()) {
+ additional_source_wait_time = base::Time::Now() - last_non_empty_time_;
+ last_non_empty_time_ = base::Time();
+ }
+
+ peer_task_runner_->PostTask(
+ FROM_HERE, base::Bind(
+ &ByteStreamOutputImpl::TransferData,
+ peer_lifetime_flag_,
+ peer_,
+ base::Passed(xfer_buffer.Pass()),
+ buffer_size,
+ false /* Source not complete. */,
+ content::DOWNLOAD_INTERRUPT_REASON_NONE,
+ additional_source_wait_time));
+ }
+}
+
+ByteStreamOutputImpl::ByteStreamOutputImpl(
+ scoped_refptr<base::SequencedTaskRunner> task_runner,
+ scoped_refptr<LifetimeFlag> lifetime_flag,
+ size_t buffer_size)
+ : total_buffer_size_(buffer_size),
+ my_task_runner_(task_runner),
+ my_lifetime_flag_(lifetime_flag),
+ available_contents_size_(0),
+ received_status_(false),
+ status_(content::DOWNLOAD_INTERRUPT_REASON_NONE),
+ unreported_consumed_bytes_(0),
+ peer_(NULL),
+ bytes_read_(0),
+ buffers_read_(0),
+ sink_callbacks_triggered_(0),
+ source_callbacks_triggered_(0) {
+ DCHECK(my_lifetime_flag_.get());
+ my_lifetime_flag_->is_alive_ = true;
+}
+
+ByteStreamOutputImpl::~ByteStreamOutputImpl() {
+ my_lifetime_flag_->is_alive_ = false;
+}
+
+void ByteStreamOutputImpl::SetPeer(
+ ByteStreamInputImpl* peer,
+ scoped_refptr<base::SequencedTaskRunner> peer_task_runner,
+ scoped_refptr<LifetimeFlag> peer_lifetime_flag) {
+ peer_ = peer;
+ peer_task_runner_ = peer_task_runner;
+ peer_lifetime_flag_ = peer_lifetime_flag;
+}
+
+ByteStreamOutputImpl::StreamState
+ByteStreamOutputImpl::GetData(scoped_refptr<net::IOBuffer>* data,
+ size_t* length) {
+ if (available_contents_.size()) {
+ *data = available_contents_.front().first;
+ *length = available_contents_.front().second;
+ available_contents_.pop_front();
+ DCHECK_GE(available_contents_size_, *length);
+ available_contents_size_ -= *length;
+ unreported_consumed_bytes_ += *length;
+ bytes_read_ += *length;
+ buffers_read_ ++;
+ if (available_contents_size_ <= total_buffer_size_ &&
+ available_contents_size_ + *length > total_buffer_size_) {
+ last_non_full_time_ = base::Time::Now();
+ }
+
+ MaybeUpdateInput();
+ return STREAM_HAS_DATA;
+ }
+ if (received_status_) {
+ return STREAM_COMPLETE;
+ }
+ return STREAM_EMPTY;
+}
+
+content::DownloadInterruptReason
+ByteStreamOutputImpl::GetSourceResult() const {
+ DCHECK(received_status_);
+ return status_;
+}
+
+void ByteStreamOutputImpl::RegisterCallback(base::Closure sink_callback) {
+ data_available_callback_ = sink_callback;
+}
+
+size_t ByteStreamOutputImpl::NumSourceCallbacks() const {
+ return source_callbacks_triggered_;
+}
+
+size_t ByteStreamOutputImpl::NumSinkCallbacks() const {
+ return sink_callbacks_triggered_;
+}
+
+size_t ByteStreamOutputImpl::BytesRead() const {
+ return bytes_read_;
+}
+
+size_t ByteStreamOutputImpl::BuffersRead() const {
+ return buffers_read_;
+}
+
+base::TimeDelta ByteStreamOutputImpl::TotalSourceTriggerWaitTime() const {
+ return total_source_trigger_wait_time_;
+}
+
+base::TimeDelta ByteStreamOutputImpl::TotalSinkTriggerWaitTime() const {
+ return total_sink_trigger_wait_time_;
+}
+
+// static
+void ByteStreamOutputImpl::TransferData(
+ scoped_refptr<LifetimeFlag> object_lifetime_flag,
+ ByteStreamOutputImpl* target,
+ scoped_ptr<ContentVector> xfer_buffer,
+ size_t buffer_size,
+ bool source_complete,
+ content::DownloadInterruptReason status,
+ base::TimeDelta additional_sink_trigger_wait_time) {
+ // If our target is no longer alive, do nothing.
+ if (!object_lifetime_flag->is_alive_) return;
+
+ if (xfer_buffer.get()) {
+ target->available_contents_.insert(target->available_contents_.end(),
+ xfer_buffer->begin(),
+ xfer_buffer->end());
+ target->available_contents_size_ += buffer_size;
+ }
+
+ if (source_complete) {
+ target->received_status_ = true;
+ target->status_ = status;
+ }
+
+ target->sink_callbacks_triggered_++;
+ target->total_sink_trigger_wait_time_ += additional_sink_trigger_wait_time;
+
+ if (!target->data_available_callback_.is_null())
+ target->data_available_callback_.Run();
+}
+
+// Decide whether or not to send the input a window update.
+// Currently we do that whenever we've got unreported consumption
+// greater than 1/3 of total size.
+void ByteStreamOutputImpl::MaybeUpdateInput() {
+ if (unreported_consumed_bytes_ > total_buffer_size_ / 3) {
+ source_callbacks_triggered_++;
+ if (!last_non_full_time_.is_null()) {
+ total_source_trigger_wait_time_ +=
+ base::Time::Now() - last_non_full_time_;
+ last_non_full_time_ = base::Time();
+ }
+ peer_task_runner_->PostTask(
+ FROM_HERE, base::Bind(
+ &ByteStreamInputImpl::UpdateWindow,
+ peer_lifetime_flag_,
+ peer_,
+ unreported_consumed_bytes_));
+ unreported_consumed_bytes_ = 0;
+ }
+}
+
+} // namespace
+
+namespace content {
+
+ByteStreamOutput::~ByteStreamOutput() { }
+
+ByteStreamInput::~ByteStreamInput() { }
+
+void CreateByteStream(
+ scoped_ptr<ByteStreamInput>* input,
+ scoped_ptr<ByteStreamOutput>* output,
+ scoped_refptr<base::SequencedTaskRunner> input_task_runner,
+ scoped_refptr<base::SequencedTaskRunner> output_task_runner,
+ size_t buffer_size) {
+ scoped_refptr<LifetimeFlag> input_flag(new LifetimeFlag());
+ scoped_refptr<LifetimeFlag> output_flag(new LifetimeFlag());
+
+ ByteStreamInputImpl* in = new ByteStreamInputImpl(
+ input_task_runner, input_flag, buffer_size);
+ ByteStreamOutputImpl* out = new ByteStreamOutputImpl(
+ output_task_runner, output_flag, buffer_size);
+
+ in->SetPeer(out, output_task_runner, output_flag);
+ out->SetPeer(in, input_task_runner, input_flag);
+ input->reset(in);
+ output->reset(out);
+ return;
+}
+
+} // namespace content

Powered by Google App Engine
This is Rietveld 408576698