Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(299)

Unified Diff: content/browser/download/download_file_impl.cc

Issue 2712713007: Make DownloadFileImpl handle multiple byte streams. (Closed)
Patch Set: Export the new class for linking on windows. Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: content/browser/download/download_file_impl.cc
diff --git a/content/browser/download/download_file_impl.cc b/content/browser/download/download_file_impl.cc
index 80b5d1fb96a47c4e616debbbaad822013346cdb1..222ebc72d10e8f12ebf60a219148f8f357b1f32a 100644
--- a/content/browser/download/download_file_impl.cc
+++ b/content/browser/download/download_file_impl.cc
@@ -9,6 +9,7 @@
#include "base/bind.h"
#include "base/files/file_util.h"
+#include "base/memory/ptr_util.h"
#include "base/strings/stringprintf.h"
#include "base/time/time.h"
#include "base/values.h"
@@ -40,11 +41,27 @@ const int kInitialRenameRetryDelayMs = 200;
// Number of times a failing rename is retried before giving up.
const int kMaxRenameRetries = 3;
+DownloadFileImpl::SourceStream::SourceStream(int64_t offset,
+ int64_t bytes_received)
+ : offset_(offset), bytes_received_(bytes_received), finished_(false) {}
+
+DownloadFileImpl::SourceStream::~SourceStream() = default;
+
+void DownloadFileImpl::SourceStream::SetByteStream(
+ std::unique_ptr<ByteStreamReader> stream_reader) {
+ stream_reader_ = std::move(stream_reader);
+}
+
+void DownloadFileImpl::SourceStream::OnWriteBytesToDisk(int64_t bytes_write) {
+ bytes_received_ += bytes_write;
+}
+
DownloadFileImpl::DownloadFileImpl(
std::unique_ptr<DownloadSaveInfo> save_info,
const base::FilePath& default_download_directory,
- std::unique_ptr<ByteStreamReader> stream,
+ std::unique_ptr<ByteStreamReader> stream_reader,
const net::NetLogWithSource& download_item_net_log,
+ bool is_sparse_file,
base::WeakPtr<DownloadDestinationObserver> observer)
: net_log_(
net::NetLogWithSource::Make(download_item_net_log.net_log(),
@@ -52,10 +69,15 @@ DownloadFileImpl::DownloadFileImpl(
file_(net_log_),
save_info_(std::move(save_info)),
default_download_directory_(default_download_directory),
- stream_reader_(std::move(stream)),
+ is_sparse_file_(is_sparse_file),
bytes_seen_(0),
observer_(observer),
weak_factory_(this) {
+ source_streams_[save_info_->offset] =
+ base::MakeUnique<SourceStream>(save_info_->offset, 0);
+ DCHECK(source_streams_.size() == static_cast<size_t>(1));
+ source_streams_.begin()->second->SetByteStream(std::move(stream_reader));
+
download_item_net_log.AddEvent(
net::NetLogEventType::DOWNLOAD_FILE_CREATED,
net_log_.source().ToEventParametersCallback());
@@ -74,48 +96,65 @@ void DownloadFileImpl::Initialize(const InitializeCallback& callback) {
update_timer_.reset(new base::RepeatingTimer());
DownloadInterruptReason result =
- file_.Initialize(save_info_->file_path,
- default_download_directory_,
- std::move(save_info_->file),
- save_info_->offset,
+ file_.Initialize(save_info_->file_path, default_download_directory_,
+ std::move(save_info_->file), save_info_->offset,
save_info_->hash_of_partial_file,
- std::move(save_info_->hash_state),
- false);
+ std::move(save_info_->hash_state), is_sparse_file_);
if (result != DOWNLOAD_INTERRUPT_REASON_NONE) {
BrowserThread::PostTask(
BrowserThread::UI, FROM_HERE, base::Bind(callback, result));
return;
}
- stream_reader_->RegisterCallback(
- base::Bind(&DownloadFileImpl::StreamActive, weak_factory_.GetWeakPtr()));
-
download_start_ = base::TimeTicks::Now();
// Primarily to make reset to zero in restart visible to owner.
SendUpdate();
// Initial pull from the straw.
- StreamActive();
+ for (auto& source_stream : source_streams_) {
qinmin 2017/02/27 18:55:29 no {} needed
xingliu 2017/02/28 00:57:06 Done.
+ RegisterAndActivateStream(source_stream.second.get());
+ }
BrowserThread::PostTask(
BrowserThread::UI, FROM_HERE, base::Bind(
callback, DOWNLOAD_INTERRUPT_REASON_NONE));
}
+void DownloadFileImpl::AddByteStream(
+ std::unique_ptr<ByteStreamReader> stream_reader,
+ int64_t offset) {
+ // |source_streams_| is not thread safe, must be modified on the same thread.
+ DCHECK(thread_checker_.CalledOnValidThread());
+
+ // The |source_streams_| must have an existing entry for the stream reader.
+ auto current_source_stream = source_streams_.find(offset);
+ DCHECK(current_source_stream != source_streams_.end());
+ SourceStream* stream = current_source_stream->second.get();
+ stream->SetByteStream(std::move(stream_reader));
+
+ // Start to pull data from the stream.
+ BrowserThread::PostTask(
+ BrowserThread::FILE, FROM_HERE,
+ base::Bind(&DownloadFileImpl::RegisterAndActivateStream,
+ weak_factory_.GetWeakPtr(), stream));
+}
+
DownloadInterruptReason DownloadFileImpl::AppendDataToFile(
const char* data, size_t data_len) {
DCHECK_CURRENTLY_ON(BrowserThread::FILE);
-
- if (!update_timer_->IsRunning()) {
- update_timer_->Start(FROM_HERE,
- base::TimeDelta::FromMilliseconds(kUpdatePeriodMs),
- this, &DownloadFileImpl::SendUpdate);
- }
- rate_estimator_.Increment(data_len);
+ WillWriteToDisk(data_len);
return file_.AppendDataToFile(data, data_len);
}
+DownloadInterruptReason DownloadFileImpl::WriteDataToFile(int64_t offset,
+ const char* data,
+ size_t data_len) {
+ DCHECK_CURRENTLY_ON(BrowserThread::FILE);
+ WillWriteToDisk(data_len);
+ return file_.WriteDataToFile(offset, data, data_len);
+}
+
void DownloadFileImpl::RenameAndUniquify(
const base::FilePath& full_path,
const RenameCompletionCallback& callback) {
@@ -213,7 +252,11 @@ void DownloadFileImpl::RenameWithRetryInternal(
SendUpdate();
// Null out callback so that we don't do any more stream processing.
- stream_reader_->RegisterCallback(base::Closure());
+ for (auto& stream : source_streams_) {
+ ByteStreamReader* stream_reader = stream.second->stream_reader();
+ if (stream_reader)
qinmin 2017/02/27 18:55:29 no {}
xingliu 2017/02/28 00:57:06 Done. Kept {} here since there are multiple lines.
+ stream_reader->RegisterCallback(base::Closure());
+ }
new_path.clear();
}
@@ -240,7 +283,8 @@ bool DownloadFileImpl::InProgress() const {
return file_.in_progress();
}
-void DownloadFileImpl::StreamActive() {
+void DownloadFileImpl::StreamActive(SourceStream* source_stream) {
+ DCHECK(source_stream->stream_reader());
base::TimeTicks start(base::TimeTicks::Now());
base::TimeTicks now;
scoped_refptr<net::IOBuffer> incoming_data;
@@ -254,7 +298,8 @@ void DownloadFileImpl::StreamActive() {
// Take care of any file local activity required.
do {
- state = stream_reader_->Read(&incoming_data, &incoming_data_size);
+ state = source_stream->stream_reader()->Read(&incoming_data,
+ &incoming_data_size);
switch (state) {
case ByteStreamReader::STREAM_EMPTY:
@@ -263,24 +308,26 @@ void DownloadFileImpl::StreamActive() {
{
++num_buffers;
base::TimeTicks write_start(base::TimeTicks::Now());
- reason = AppendDataToFile(
- incoming_data.get()->data(), incoming_data_size);
+ if (is_sparse_file_) {
+ reason = WriteDataToFile(
+ source_stream->offset() + source_stream->bytes_received(),
+ incoming_data.get()->data(), incoming_data_size);
+ } else {
+ reason = AppendDataToFile(incoming_data.get()->data(),
+ incoming_data_size);
+ }
disk_writes_time_ += (base::TimeTicks::Now() - write_start);
bytes_seen_ += incoming_data_size;
total_incoming_data_size += incoming_data_size;
+ if (reason == DOWNLOAD_INTERRUPT_REASON_NONE)
+ source_stream->OnWriteBytesToDisk(incoming_data_size);
}
break;
case ByteStreamReader::STREAM_COMPLETE:
{
- reason = static_cast<DownloadInterruptReason>(
- stream_reader_->GetStatus());
- SendUpdate();
- base::TimeTicks close_start(base::TimeTicks::Now());
- base::TimeTicks now(base::TimeTicks::Now());
- disk_writes_time_ += (now - close_start);
- RecordFileBandwidth(
- bytes_seen_, disk_writes_time_, now - download_start_);
- update_timer_.reset();
+ reason = static_cast<DownloadInterruptReason>(
qinmin 2017/02/27 18:55:29 fix the indentations
xingliu 2017/02/28 00:57:06 Done.
+ source_stream->stream_reader()->GetStatus());
+ SendUpdate();
}
break;
default:
@@ -297,8 +344,8 @@ void DownloadFileImpl::StreamActive() {
now - start > delta) {
BrowserThread::PostTask(
BrowserThread::FILE, FROM_HERE,
- base::Bind(&DownloadFileImpl::StreamActive,
- weak_factory_.GetWeakPtr()));
+ base::Bind(&DownloadFileImpl::StreamActive, weak_factory_.GetWeakPtr(),
+ source_stream));
}
if (total_incoming_data_size)
@@ -308,34 +355,50 @@ void DownloadFileImpl::StreamActive() {
// Take care of communication with our observer.
if (reason != DOWNLOAD_INTERRUPT_REASON_NONE) {
+ if (state == ByteStreamReader::STREAM_COMPLETE) {
+ RecordFileBandwidth(bytes_seen_, disk_writes_time_,
+ base::TimeTicks::Now() - download_start_);
+ }
// Error case for both upstream source and file write.
// Shut down processing and signal an error to our observer.
// Our observer will clean us up.
- stream_reader_->RegisterCallback(base::Closure());
+ source_stream->stream_reader()->RegisterCallback(base::Closure());
weak_factory_.InvalidateWeakPtrs();
SendUpdate(); // Make info up to date before error.
std::unique_ptr<crypto::SecureHash> hash_state = file_.Finish();
BrowserThread::PostTask(
- BrowserThread::UI,
- FROM_HERE,
- base::Bind(&DownloadDestinationObserver::DestinationError,
- observer_,
- reason,
- file_.bytes_so_far(),
- base::Passed(&hash_state)));
+ BrowserThread::UI, FROM_HERE,
+ base::Bind(&DownloadDestinationObserver::DestinationError, observer_,
+ reason, TotalBytesReceived(), base::Passed(&hash_state)));
} else if (state == ByteStreamReader::STREAM_COMPLETE) {
- // Signal successful completion and shut down processing.
- stream_reader_->RegisterCallback(base::Closure());
- weak_factory_.InvalidateWeakPtrs();
+ // Signal successful completion of the current stream.
+ source_stream->stream_reader()->RegisterCallback(base::Closure());
+ source_stream->set_finished(true);
+
+ // Inform observers.
SendUpdate();
- std::unique_ptr<crypto::SecureHash> hash_state = file_.Finish();
- BrowserThread::PostTask(
- BrowserThread::UI,
- FROM_HERE,
- base::Bind(&DownloadDestinationObserver::DestinationCompleted,
- observer_,
- file_.bytes_so_far(),
- base::Passed(&hash_state)));
+
+ bool all_stream_complete = true;
+ for (auto& stream : source_streams_) {
+ if (!stream.second->is_finished()) {
+ all_stream_complete = false;
+ break;
+ }
+ }
+
+ // All the stream reader are completed, shut down file IO processing.
+ if (all_stream_complete) {
+ RecordFileBandwidth(bytes_seen_, disk_writes_time_,
+ base::TimeTicks::Now() - download_start_);
+ weak_factory_.InvalidateWeakPtrs();
+ std::unique_ptr<crypto::SecureHash> hash_state = file_.Finish();
+ update_timer_.reset();
+ BrowserThread::PostTask(
+ BrowserThread::UI, FROM_HERE,
+ base::Bind(&DownloadDestinationObserver::DestinationCompleted,
+ observer_, TotalBytesReceived(),
+ base::Passed(&hash_state)));
+ }
}
if (net_log_.IsCapturing()) {
net_log_.AddEvent(net::NetLogEventType::DOWNLOAD_STREAM_DRAINED,
@@ -344,14 +407,45 @@ void DownloadFileImpl::StreamActive() {
}
}
+void DownloadFileImpl::RegisterAndActivateStream(SourceStream* source_stream) {
+ DCHECK_CURRENTLY_ON(BrowserThread::FILE);
+ ByteStreamReader* stream_reader = source_stream->stream_reader();
+ if (stream_reader) {
+ source_stream->stream_reader()->RegisterCallback(
+ base::Bind(&DownloadFileImpl::StreamActive, weak_factory_.GetWeakPtr(),
+ source_stream));
+ StreamActive(source_stream);
+ }
+}
+
+int64_t DownloadFileImpl::TotalBytesReceived() const {
+ if (!is_sparse_file_)
+ return file_.bytes_so_far();
+
+ // Accumulate all valid bytes from all streams.
+ int64_t total_received = 0;
+ for (auto& stream : source_streams_) {
qinmin 2017/02/27 18:55:29 no {} meeded
xingliu 2017/02/28 00:57:06 Done.
+ total_received += stream.second->bytes_received();
+ }
+
+ return total_received;
+}
+
void DownloadFileImpl::SendUpdate() {
+ // TODO(xingliu): Update slice info to observer to update history db.
BrowserThread::PostTask(
- BrowserThread::UI,
- FROM_HERE,
- base::Bind(&DownloadDestinationObserver::DestinationUpdate,
- observer_,
- file_.bytes_so_far(),
- rate_estimator_.GetCountPerSecond()));
+ BrowserThread::UI, FROM_HERE,
+ base::Bind(&DownloadDestinationObserver::DestinationUpdate, observer_,
+ TotalBytesReceived(), rate_estimator_.GetCountPerSecond()));
+}
+
+void DownloadFileImpl::WillWriteToDisk(size_t data_len) {
+ if (!update_timer_->IsRunning()) {
+ update_timer_->Start(FROM_HERE,
+ base::TimeDelta::FromMilliseconds(kUpdatePeriodMs),
+ this, &DownloadFileImpl::SendUpdate);
+ }
+ rate_estimator_.Increment(data_len);
}
DownloadFileImpl::RenameParameters::RenameParameters(

Powered by Google App Engine
This is Rietveld 408576698