Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(45)

Unified Diff: content/browser/download/download_file_impl.cc

Issue 10392111: Use ByteStream in downloads system to decouple source and sink. (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/src
Patch Set: Sync'd to LKGR. Created 8 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « content/browser/download/download_file_impl.h ('k') | content/browser/download/download_file_manager.h » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: content/browser/download/download_file_impl.cc
diff --git a/content/browser/download/download_file_impl.cc b/content/browser/download/download_file_impl.cc
index ae8d4ec4395ddb88e5b5c1875f0c9b247055c5c5..ee588b8ed33da305641c15d46ac132de958703fe 100644
--- a/content/browser/download/download_file_impl.cc
+++ b/content/browser/download/download_file_impl.cc
@@ -6,20 +6,30 @@
#include <string>
+#include "base/bind.h"
#include "base/file_util.h"
+#include "base/message_loop_proxy.h"
+#include "base/time.h"
+#include "content/browser/download/byte_stream.h"
#include "content/browser/download/download_create_info.h"
+#include "content/browser/download/download_interrupt_reasons_impl.h"
+#include "content/browser/download/download_net_log_parameters.h"
#include "content/browser/power_save_blocker.h"
#include "content/public/browser/browser_thread.h"
#include "content/public/browser/download_manager.h"
+#include "content/browser/download/download_stats.h"
+#include "net/base/io_buffer.h"
using content::BrowserThread;
using content::DownloadId;
using content::DownloadManager;
const int kUpdatePeriodMs = 500;
+const int kMaxTimeBlockingFileThreadMs = 1000;
DownloadFileImpl::DownloadFileImpl(
const DownloadCreateInfo* info,
+ scoped_ptr<content::ByteStreamReader> stream,
DownloadRequestHandleInterface* request_handle,
DownloadManager* download_manager,
bool calculate_hash,
@@ -33,21 +43,35 @@ DownloadFileImpl::DownloadFileImpl(
info->save_info.hash_state,
info->save_info.file_stream,
bound_net_log),
+ stream_reader_(stream.Pass()),
id_(info->download_id),
request_handle_(request_handle),
download_manager_(download_manager),
+ bytes_seen_(0),
+ bound_net_log_(bound_net_log),
+ weak_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
power_save_blocker_(power_save_blocker.Pass()) {
DCHECK(BrowserThread::CurrentlyOn(BrowserThread::FILE));
}
DownloadFileImpl::~DownloadFileImpl() {
- DCHECK(BrowserThread::CurrentlyOn(BrowserThread::FILE));
}
// BaseFile delegated functions.
net::Error DownloadFileImpl::Initialize() {
update_timer_.reset(new base::RepeatingTimer<DownloadFileImpl>());
- return file_.Initialize();
+ net::Error result = file_.Initialize();
+ if (result != net::OK)
+ return result;
+
+ stream_reader_->RegisterCallback(
+ base::Bind(&DownloadFileImpl::StreamActive, weak_factory_.GetWeakPtr()));
+
+ download_start_ = base::TimeTicks::Now();
+ // Initial pull from the straw.
+ StreamActive();
+
+ return result;
}
net::Error DownloadFileImpl::AppendDataToFile(const char* data,
@@ -72,11 +96,6 @@ void DownloadFileImpl::Cancel() {
file_.Cancel();
}
-void DownloadFileImpl::Finish() {
- file_.Finish();
- update_timer_.reset();
-}
-
void DownloadFileImpl::AnnotateWithSourceInformation() {
file_.AnnotateWithSourceInformation();
}
@@ -135,6 +154,114 @@ std::string DownloadFileImpl::DebugString() const {
file_.DebugString().c_str());
}
+void DownloadFileImpl::StreamActive() {
+ base::TimeTicks start(base::TimeTicks::Now());
+ base::TimeTicks now;
+ scoped_refptr<net::IOBuffer> incoming_data;
+ size_t incoming_data_size = 0;
+ size_t total_incoming_data_size = 0;
+ size_t num_buffers = 0;
+ content::ByteStreamReader::StreamState state(
+ content::ByteStreamReader::STREAM_EMPTY);
+ content::DownloadInterruptReason reason =
+ content::DOWNLOAD_INTERRUPT_REASON_NONE;
+ base::TimeDelta delta(
+ base::TimeDelta::FromMilliseconds(kMaxTimeBlockingFileThreadMs));
+
+ // Take care of any file local activity required.
+ do {
+ state = stream_reader_->Read(&incoming_data, &incoming_data_size);
+
+ net::Error result = net::OK;
+ switch (state) {
+ case content::ByteStreamReader::STREAM_EMPTY:
+ break;
+ case content::ByteStreamReader::STREAM_HAS_DATA:
+ {
+ ++num_buffers;
+ base::TimeTicks write_start(base::TimeTicks::Now());
+ result = AppendDataToFile(
+ incoming_data.get()->data(), incoming_data_size);
+ disk_writes_time_ += (base::TimeTicks::Now() - write_start);
+ total_incoming_data_size += incoming_data_size;
+ reason = content::ConvertNetErrorToInterruptReason(
+ result, content::DOWNLOAD_INTERRUPT_FROM_DISK);
+ }
+ break;
+ case content::ByteStreamReader::STREAM_COMPLETE:
+ {
+ reason = stream_reader_->GetStatus();
+ SendUpdate();
+ base::TimeTicks close_start(base::TimeTicks::Now());
+ file_.Finish();
+ base::TimeTicks now(base::TimeTicks::Now());
+ disk_writes_time_ += (now - close_start);
+ download_stats::RecordFileBandwidth(
+ bytes_seen_, disk_writes_time_, now - download_start_);
+ update_timer_.reset();
+ }
+ break;
+ default:
+ NOTREACHED();
+ break;
+ }
+ now = base::TimeTicks::Now();
+ } while (state == content::ByteStreamReader::STREAM_HAS_DATA &&
+ reason == content::DOWNLOAD_INTERRUPT_REASON_NONE &&
+ now - start <= delta);
+
+ // If we're stopping to yield the thread, post a task so we come back.
+ if (state == content::ByteStreamReader::STREAM_HAS_DATA &&
+ now - start > delta) {
+ BrowserThread::PostTask(
+ BrowserThread::FILE, FROM_HERE,
+ base::Bind(&DownloadFileImpl::StreamActive,
+ weak_factory_.GetWeakPtr()));
+ }
+
+ if (total_incoming_data_size)
+ download_stats::RecordFileThreadReceiveBuffers(num_buffers);
+ bytes_seen_ += total_incoming_data_size;
+
+ download_stats::RecordContiguousWriteTime(now - start);
+
+ // Take care of communication with our controller.
+ if (reason != content::DOWNLOAD_INTERRUPT_REASON_NONE) {
+ // Error case for both upstream source and file write.
+ // Shut down processing and signal an error to our controller.
+ // Our controller will clean us up.
+ stream_reader_->RegisterCallback(base::Closure());
+ weak_factory_.InvalidateWeakPtrs();
+ if (download_manager_.get()) {
+ BrowserThread::PostTask(
+ BrowserThread::UI, FROM_HERE,
+ base::Bind(&DownloadManager::OnDownloadInterrupted,
+ download_manager_, id_.local(),
+ BytesSoFar(), GetHashState(), reason));
+ }
+ } else if (state == content::ByteStreamReader::STREAM_COMPLETE) {
+ // Signal successful completion and shut down processing.
+ stream_reader_->RegisterCallback(base::Closure());
+ weak_factory_.InvalidateWeakPtrs();
+ if (download_manager_.get()) {
+ std::string hash;
+ if (!GetHash(&hash) || file_.IsEmptyHash(hash))
+ hash.clear();
+ BrowserThread::PostTask(
+ BrowserThread::UI, FROM_HERE,
+ base::Bind(&DownloadManager::OnResponseCompleted,
+ download_manager_, id_.local(),
+ BytesSoFar(), hash));
+ }
+ }
+ if (bound_net_log_.IsLoggingAllEvents()) {
+ bound_net_log_.AddEvent(
+ net::NetLog::TYPE_DOWNLOAD_STREAM_DRAINED,
+ make_scoped_refptr(new download_net_logs::FileStreamDrainedParameters(
+ total_incoming_data_size, num_buffers)));
+ }
+}
+
void DownloadFileImpl::SendUpdate() {
if (download_manager_.get()) {
BrowserThread::PostTask(
« no previous file with comments | « content/browser/download/download_file_impl.h ('k') | content/browser/download/download_file_manager.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698