Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(84)

Unified Diff: content/browser/download/byte_stream.cc

Issue 10074001: Initial implementation of the ByteStream refactor. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Checkpoint and merge to LKGR. Created 8 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « content/browser/download/byte_stream.h ('k') | content/browser/download/byte_stream_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: content/browser/download/byte_stream.cc
diff --git a/content/browser/download/byte_stream.cc b/content/browser/download/byte_stream.cc
new file mode 100644
index 0000000000000000000000000000000000000000..319902c6612f1389cac2a6bc292284bfbc489688
--- /dev/null
+++ b/content/browser/download/byte_stream.cc
@@ -0,0 +1,206 @@
+// Copyright (c) 2012 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "content/browser/download/byte_stream.h"
+
+#include "base/bind.h"
+#include "base/location.h"
+
+namespace content {
+
+ByteStream::ByteStream()
+ : buffer_size_(kDefaultBufferSize),
+ data_size_(0),
+ is_complete_(false),
+ source_status_(DOWNLOAD_INTERRUPT_REASON_NONE),
+ empty_percentage_(0),
+ full_percentage_(0),
+ num_source_callbacks_(0),
+ num_sink_callbacks_(0),
+ bytes_read_(0),
+ buffers_read_(0){ }
+
+void ByteStream::SetBufferSize(size_t buffer_size) {
+ buffer_size_ = buffer_size;
+}
+
+bool ByteStream::AddData(scoped_refptr<net::IOBuffer> buffer,
+ size_t byte_count) {
+ base::Time now(base::Time::Now());
+ base::AutoLock auto_lock(lock_);
+
+ if (data_size_ == 0 && byte_count > 0)
+ last_non_empty_time_ = now;
+
+ if (data_size_ <= (full_percentage_ * buffer_size_) / 100 &&
+ data_size_ + byte_count > (full_percentage_ * buffer_size_) / 100) {
+ // We only want to track time from empty to next signal; anything
+ // beyond that is the sink's problem.
+ if (!last_non_empty_time_.is_null()) {
+ sink_trigger_wait_time_ += (now - last_non_empty_time_);
+ last_non_empty_time_ = base::Time();
+ }
+ if (sink_task_runner_.get() != NULL) {
+ // Nothing that actually touches the data on this object can be
+ // executed until we drop the lock, so it's ok to
+ // dispatch this before we actually add the data.
+ sink_task_runner_->PostTask(
+ FROM_HERE, base::Bind(&content::ByteStream::RunSinkCallback,
+ this, sink_task_runner_));
+ num_sink_callbacks_++;
+ }
+ }
+
+ // Take manual (not enforced by compiler via scoped_*) ownership of data.
+ contents_.push_back(std::make_pair(buffer, byte_count));
+ data_size_ += byte_count;
+ return (data_size_ <= buffer_size_);
+}
+
+void ByteStream::SourceComplete(DownloadInterruptReason status) {
+ base::AutoLock auto_lock(lock_);
+
+ is_complete_ = true;
+ source_status_ = status;
+ // If data_size_ is over our full percentage, a callback has already
+ // been posted.
+ if (data_size_ <= (full_percentage_ * buffer_size_) / 100 &&
+ sink_task_runner_.get() != NULL) {
+ sink_task_runner_->PostTask(
+ FROM_HERE, base::Bind(&content::ByteStream::RunSinkCallback,
+ this, sink_task_runner_));
+ }
+}
+
+bool ByteStream::IsFull() const {
+ base::AutoLock auto_lock(lock_);
+
+ return (data_size_ > buffer_size_);
+}
+
+void ByteStream::RegisterSourceCallback(
+ scoped_refptr<base::TaskRunner> source_task_runner,
+ ByteStreamCallback source_callback,
+ int empty_percentage) {
+ DCHECK_LE(0, empty_percentage);
+ DCHECK_GE(100, empty_percentage);
+
+ base::AutoLock auto_lock(lock_);
+
+ source_task_runner_ = source_task_runner;
+ source_callback_ = source_callback;
+ empty_percentage_ = empty_percentage;
+}
+
+ByteStream::StreamState ByteStream::GetData(scoped_refptr<net::IOBuffer>* data,
+ size_t* length) {
+ base::Time now(base::Time::Now());
+ base::AutoLock auto_lock(lock_);
+
+ if (contents_.empty()) {
+ if (is_complete_)
+ return STREAM_COMPLETE;
+ return STREAM_EMPTY;
+ }
+
+ if (data_size_ >= buffer_size_ &&
+ data_size_ - contents_.front().second < buffer_size_)
+ last_non_full_time_ = now;
+
+ size_t byte_boundary = (buffer_size_ * (100 - empty_percentage_)) / 100;
+ if (data_size_ > byte_boundary &&
+ data_size_ - contents_.front().second <= byte_boundary) {
+ if (!last_non_full_time_.is_null()) {
+ source_trigger_wait_time_ += (now - last_non_full_time_);
+ last_non_full_time_ = base::Time();
+ }
+ if (source_task_runner_.get() != NULL) {
+ // Nothing that actually touches the data on this object can be
+ // executed until we drop the lock, so it's ok to
+ // dispatch this before we actually take the data.
+ source_task_runner_->PostTask(FROM_HERE, base::Bind(
+ &content::ByteStream::RunSourceCallback, this, source_task_runner_));
+ num_source_callbacks_++;
+ }
+ }
+
+ *data = contents_.front().first;
+ *length = contents_.front().second;
+ contents_.pop_front();
+ data_size_ -= *length;
+ bytes_read_ += *length;
+ buffers_read_++;
+ return STREAM_HAS_DATA;
+}
+
+DownloadInterruptReason ByteStream::GetSourceResult () const {
+ base::AutoLock auto_lock(lock_);
+
+ DCHECK(is_complete_);
+
+ return source_status_;
+}
+
+void ByteStream::RegisterSinkCallback(
+ scoped_refptr<base::TaskRunner> sink_task_runner,
+ ByteStreamCallback sink_callback,
+ int full_percentage) {
+ DCHECK_LE(0, full_percentage);
+ DCHECK_GE(100, full_percentage);
+
+ base::AutoLock auto_lock(lock_);
+
+ sink_task_runner_ = sink_task_runner;
+ sink_callback_ = sink_callback;
+ full_percentage_ = full_percentage;
+}
+
+ByteStream::~ByteStream() {
+}
+
+void ByteStream::RunSourceCallback(
+ scoped_refptr<base::TaskRunner> target_runner) {
+ base::Closure callback;
+ {
+ base::AutoLock auto_lock(lock_);
+
+ // If the target_runner has been updated, that implies an access to
+ // the class, so we can drop this request on the floor. (The
+ // alternative would be to forward it on to the new task runner.)
+ if (target_runner.get() != source_task_runner_.get())
+ return;
+
+ if (source_callback_.is_null())
+ return;
+
+ callback = source_callback_;
+ }
+
+ // Run unlocked to allow caller to call back into us.
+ callback.Run();
+}
+
+void ByteStream::RunSinkCallback(
+ scoped_refptr<base::TaskRunner> target_runner) {
+ base::Closure callback;
+ {
+ base::AutoLock auto_lock(lock_);
+
+ // If the target_runner has been updated, that implies an access to
+ // the class, so we can drop this request on the floor. (The
+ // alternative would be to forward it on to the new task runner.)
+ if (target_runner.get() != sink_task_runner_.get())
+ return;
+
+ if (sink_callback_.is_null())
+ return;
+
+ callback = sink_callback_;
+ }
+
+ // Run unlocked to allow caller to call back into us.
+ callback.Run();
+}
+
+} // namespace content
« no previous file with comments | « content/browser/download/byte_stream.h ('k') | content/browser/download/byte_stream_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698