| Index: content/browser/download/download_file_impl.cc
|
| diff --git a/content/browser/download/download_file_impl.cc b/content/browser/download/download_file_impl.cc
|
| index 80b5d1fb96a47c4e616debbbaad822013346cdb1..5c063f48101e59878fee2f89253edd13612493b6 100644
|
| --- a/content/browser/download/download_file_impl.cc
|
| +++ b/content/browser/download/download_file_impl.cc
|
| @@ -9,6 +9,7 @@
|
|
|
| #include "base/bind.h"
|
| #include "base/files/file_util.h"
|
| +#include "base/memory/ptr_util.h"
|
| #include "base/strings/stringprintf.h"
|
| #include "base/time/time.h"
|
| #include "base/values.h"
|
| @@ -40,11 +41,27 @@ const int kInitialRenameRetryDelayMs = 200;
|
| // Number of times a failing rename is retried before giving up.
|
| const int kMaxRenameRetries = 3;
|
|
|
| +DownloadFileImpl::SourceStream::SourceStream(int64_t offset,
|
| + int64_t bytes_written)
|
| + : offset_(offset), bytes_written_(bytes_written), finished_(false) {}
|
| +
|
| +DownloadFileImpl::SourceStream::~SourceStream() = default;
|
| +
|
| +void DownloadFileImpl::SourceStream::SetByteStream(
|
| + std::unique_ptr<ByteStreamReader> stream_reader) {
|
| + stream_reader_ = std::move(stream_reader);
|
| +}
|
| +
|
| +void DownloadFileImpl::SourceStream::OnWriteBytesToDisk(int64_t bytes_write) {
|
| + bytes_written_ += bytes_write;
|
| +}
|
| +
|
| DownloadFileImpl::DownloadFileImpl(
|
| std::unique_ptr<DownloadSaveInfo> save_info,
|
| const base::FilePath& default_download_directory,
|
| - std::unique_ptr<ByteStreamReader> stream,
|
| + std::unique_ptr<ByteStreamReader> stream_reader,
|
| const net::NetLogWithSource& download_item_net_log,
|
| + bool is_sparse_file,
|
| base::WeakPtr<DownloadDestinationObserver> observer)
|
| : net_log_(
|
| net::NetLogWithSource::Make(download_item_net_log.net_log(),
|
| @@ -52,10 +69,15 @@ DownloadFileImpl::DownloadFileImpl(
|
| file_(net_log_),
|
| save_info_(std::move(save_info)),
|
| default_download_directory_(default_download_directory),
|
| - stream_reader_(std::move(stream)),
|
| + is_sparse_file_(is_sparse_file),
|
| bytes_seen_(0),
|
| observer_(observer),
|
| weak_factory_(this) {
|
| + source_streams_[save_info_->offset] =
|
| + base::MakeUnique<SourceStream>(save_info_->offset, 0);
|
| + DCHECK(source_streams_.size() == static_cast<size_t>(1));
|
| + source_streams_.begin()->second->SetByteStream(std::move(stream_reader));
|
| +
|
| download_item_net_log.AddEvent(
|
| net::NetLogEventType::DOWNLOAD_FILE_CREATED,
|
| net_log_.source().ToEventParametersCallback());
|
| @@ -74,48 +96,64 @@ void DownloadFileImpl::Initialize(const InitializeCallback& callback) {
|
|
|
| update_timer_.reset(new base::RepeatingTimer());
|
| DownloadInterruptReason result =
|
| - file_.Initialize(save_info_->file_path,
|
| - default_download_directory_,
|
| - std::move(save_info_->file),
|
| - save_info_->offset,
|
| + file_.Initialize(save_info_->file_path, default_download_directory_,
|
| + std::move(save_info_->file), save_info_->offset,
|
| save_info_->hash_of_partial_file,
|
| - std::move(save_info_->hash_state),
|
| - false);
|
| + std::move(save_info_->hash_state), is_sparse_file_);
|
| if (result != DOWNLOAD_INTERRUPT_REASON_NONE) {
|
| BrowserThread::PostTask(
|
| BrowserThread::UI, FROM_HERE, base::Bind(callback, result));
|
| return;
|
| }
|
|
|
| - stream_reader_->RegisterCallback(
|
| - base::Bind(&DownloadFileImpl::StreamActive, weak_factory_.GetWeakPtr()));
|
| -
|
| download_start_ = base::TimeTicks::Now();
|
|
|
| // Primarily to make reset to zero in restart visible to owner.
|
| SendUpdate();
|
|
|
| // Initial pull from the straw.
|
| - StreamActive();
|
| + for (auto& source_stream : source_streams_)
|
| + RegisterAndActivateStream(source_stream.second.get());
|
|
|
| BrowserThread::PostTask(
|
| BrowserThread::UI, FROM_HERE, base::Bind(
|
| callback, DOWNLOAD_INTERRUPT_REASON_NONE));
|
| }
|
|
|
| +void DownloadFileImpl::AddByteStream(
|
| + std::unique_ptr<ByteStreamReader> stream_reader,
|
| + int64_t offset) {
|
| + // |source_streams_| is not thread safe, must be modified on the same thread.
|
| + DCHECK(thread_checker_.CalledOnValidThread());
|
| +
|
| + // The |source_streams_| must have an existing entry for the stream reader.
|
| + auto current_source_stream = source_streams_.find(offset);
|
| + DCHECK(current_source_stream != source_streams_.end());
|
| + SourceStream* stream = current_source_stream->second.get();
|
| + stream->SetByteStream(std::move(stream_reader));
|
| +
|
| + // Start to pull data from the stream.
|
| + BrowserThread::PostTask(
|
| + BrowserThread::FILE, FROM_HERE,
|
| + base::Bind(&DownloadFileImpl::RegisterAndActivateStream,
|
| + weak_factory_.GetWeakPtr(), stream));
|
| +}
|
| +
|
| DownloadInterruptReason DownloadFileImpl::AppendDataToFile(
|
| const char* data, size_t data_len) {
|
| DCHECK_CURRENTLY_ON(BrowserThread::FILE);
|
| -
|
| - if (!update_timer_->IsRunning()) {
|
| - update_timer_->Start(FROM_HERE,
|
| - base::TimeDelta::FromMilliseconds(kUpdatePeriodMs),
|
| - this, &DownloadFileImpl::SendUpdate);
|
| - }
|
| - rate_estimator_.Increment(data_len);
|
| + WillWriteToDisk(data_len);
|
| return file_.AppendDataToFile(data, data_len);
|
| }
|
|
|
| +DownloadInterruptReason DownloadFileImpl::WriteDataToFile(int64_t offset,
|
| + const char* data,
|
| + size_t data_len) {
|
| + DCHECK_CURRENTLY_ON(BrowserThread::FILE);
|
| + WillWriteToDisk(data_len);
|
| + return file_.WriteDataToFile(offset, data, data_len);
|
| +}
|
| +
|
| void DownloadFileImpl::RenameAndUniquify(
|
| const base::FilePath& full_path,
|
| const RenameCompletionCallback& callback) {
|
| @@ -213,7 +251,11 @@ void DownloadFileImpl::RenameWithRetryInternal(
|
| SendUpdate();
|
|
|
| // Null out callback so that we don't do any more stream processing.
|
| - stream_reader_->RegisterCallback(base::Closure());
|
| + for (auto& stream : source_streams_) {
|
| + ByteStreamReader* stream_reader = stream.second->stream_reader();
|
| + if (stream_reader)
|
| + stream_reader->RegisterCallback(base::Closure());
|
| + }
|
|
|
| new_path.clear();
|
| }
|
| @@ -240,7 +282,8 @@ bool DownloadFileImpl::InProgress() const {
|
| return file_.in_progress();
|
| }
|
|
|
| -void DownloadFileImpl::StreamActive() {
|
| +void DownloadFileImpl::StreamActive(SourceStream* source_stream) {
|
| + DCHECK(source_stream->stream_reader());
|
| base::TimeTicks start(base::TimeTicks::Now());
|
| base::TimeTicks now;
|
| scoped_refptr<net::IOBuffer> incoming_data;
|
| @@ -254,7 +297,8 @@ void DownloadFileImpl::StreamActive() {
|
|
|
| // Take care of any file local activity required.
|
| do {
|
| - state = stream_reader_->Read(&incoming_data, &incoming_data_size);
|
| + state = source_stream->stream_reader()->Read(&incoming_data,
|
| + &incoming_data_size);
|
|
|
| switch (state) {
|
| case ByteStreamReader::STREAM_EMPTY:
|
| @@ -263,24 +307,26 @@ void DownloadFileImpl::StreamActive() {
|
| {
|
| ++num_buffers;
|
| base::TimeTicks write_start(base::TimeTicks::Now());
|
| - reason = AppendDataToFile(
|
| - incoming_data.get()->data(), incoming_data_size);
|
| + if (is_sparse_file_) {
|
| + reason = WriteDataToFile(
|
| + source_stream->offset() + source_stream->bytes_written(),
|
| + incoming_data.get()->data(), incoming_data_size);
|
| + } else {
|
| + reason = AppendDataToFile(incoming_data.get()->data(),
|
| + incoming_data_size);
|
| + }
|
| disk_writes_time_ += (base::TimeTicks::Now() - write_start);
|
| bytes_seen_ += incoming_data_size;
|
| total_incoming_data_size += incoming_data_size;
|
| + if (reason == DOWNLOAD_INTERRUPT_REASON_NONE)
|
| + source_stream->OnWriteBytesToDisk(incoming_data_size);
|
| }
|
| break;
|
| case ByteStreamReader::STREAM_COMPLETE:
|
| {
|
| reason = static_cast<DownloadInterruptReason>(
|
| - stream_reader_->GetStatus());
|
| + source_stream->stream_reader()->GetStatus());
|
| SendUpdate();
|
| - base::TimeTicks close_start(base::TimeTicks::Now());
|
| - base::TimeTicks now(base::TimeTicks::Now());
|
| - disk_writes_time_ += (now - close_start);
|
| - RecordFileBandwidth(
|
| - bytes_seen_, disk_writes_time_, now - download_start_);
|
| - update_timer_.reset();
|
| }
|
| break;
|
| default:
|
| @@ -297,8 +343,8 @@ void DownloadFileImpl::StreamActive() {
|
| now - start > delta) {
|
| BrowserThread::PostTask(
|
| BrowserThread::FILE, FROM_HERE,
|
| - base::Bind(&DownloadFileImpl::StreamActive,
|
| - weak_factory_.GetWeakPtr()));
|
| + base::Bind(&DownloadFileImpl::StreamActive, weak_factory_.GetWeakPtr(),
|
| + source_stream));
|
| }
|
|
|
| if (total_incoming_data_size)
|
| @@ -308,34 +354,50 @@ void DownloadFileImpl::StreamActive() {
|
|
|
| // Take care of communication with our observer.
|
| if (reason != DOWNLOAD_INTERRUPT_REASON_NONE) {
|
| + if (state == ByteStreamReader::STREAM_COMPLETE) {
|
| + RecordFileBandwidth(bytes_seen_, disk_writes_time_,
|
| + base::TimeTicks::Now() - download_start_);
|
| + }
|
| // Error case for both upstream source and file write.
|
| // Shut down processing and signal an error to our observer.
|
| // Our observer will clean us up.
|
| - stream_reader_->RegisterCallback(base::Closure());
|
| + source_stream->stream_reader()->RegisterCallback(base::Closure());
|
| weak_factory_.InvalidateWeakPtrs();
|
| SendUpdate(); // Make info up to date before error.
|
| std::unique_ptr<crypto::SecureHash> hash_state = file_.Finish();
|
| BrowserThread::PostTask(
|
| - BrowserThread::UI,
|
| - FROM_HERE,
|
| - base::Bind(&DownloadDestinationObserver::DestinationError,
|
| - observer_,
|
| - reason,
|
| - file_.bytes_so_far(),
|
| - base::Passed(&hash_state)));
|
| + BrowserThread::UI, FROM_HERE,
|
| + base::Bind(&DownloadDestinationObserver::DestinationError, observer_,
|
| + reason, TotalBytesReceived(), base::Passed(&hash_state)));
|
| } else if (state == ByteStreamReader::STREAM_COMPLETE) {
|
| - // Signal successful completion and shut down processing.
|
| - stream_reader_->RegisterCallback(base::Closure());
|
| - weak_factory_.InvalidateWeakPtrs();
|
| + // Signal successful completion of the current stream.
|
| + source_stream->stream_reader()->RegisterCallback(base::Closure());
|
| + source_stream->set_finished(true);
|
| +
|
| + // Inform observers.
|
| SendUpdate();
|
| - std::unique_ptr<crypto::SecureHash> hash_state = file_.Finish();
|
| - BrowserThread::PostTask(
|
| - BrowserThread::UI,
|
| - FROM_HERE,
|
| - base::Bind(&DownloadDestinationObserver::DestinationCompleted,
|
| - observer_,
|
| - file_.bytes_so_far(),
|
| - base::Passed(&hash_state)));
|
| +
|
| + bool all_stream_complete = true;
|
| + for (auto& stream : source_streams_) {
|
| + if (!stream.second->is_finished()) {
|
| + all_stream_complete = false;
|
| + break;
|
| + }
|
| + }
|
| +
|
| + // All the stream reader are completed, shut down file IO processing.
|
| + if (all_stream_complete) {
|
| + RecordFileBandwidth(bytes_seen_, disk_writes_time_,
|
| + base::TimeTicks::Now() - download_start_);
|
| + weak_factory_.InvalidateWeakPtrs();
|
| + std::unique_ptr<crypto::SecureHash> hash_state = file_.Finish();
|
| + update_timer_.reset();
|
| + BrowserThread::PostTask(
|
| + BrowserThread::UI, FROM_HERE,
|
| + base::Bind(&DownloadDestinationObserver::DestinationCompleted,
|
| + observer_, TotalBytesReceived(),
|
| + base::Passed(&hash_state)));
|
| + }
|
| }
|
| if (net_log_.IsCapturing()) {
|
| net_log_.AddEvent(net::NetLogEventType::DOWNLOAD_STREAM_DRAINED,
|
| @@ -344,14 +406,44 @@ void DownloadFileImpl::StreamActive() {
|
| }
|
| }
|
|
|
| +void DownloadFileImpl::RegisterAndActivateStream(SourceStream* source_stream) {
|
| + DCHECK_CURRENTLY_ON(BrowserThread::FILE);
|
| + ByteStreamReader* stream_reader = source_stream->stream_reader();
|
| + if (stream_reader) {
|
| + source_stream->stream_reader()->RegisterCallback(
|
| + base::Bind(&DownloadFileImpl::StreamActive, weak_factory_.GetWeakPtr(),
|
| + source_stream));
|
| + StreamActive(source_stream);
|
| + }
|
| +}
|
| +
|
| +int64_t DownloadFileImpl::TotalBytesReceived() const {
|
| + if (!is_sparse_file_)
|
| + return file_.bytes_so_far();
|
| +
|
| + // Accumulate all valid bytes from all streams.
|
| + int64_t total_received = 0;
|
| + for (auto& stream : source_streams_)
|
| + total_received += stream.second->bytes_written();
|
| +
|
| + return total_received;
|
| +}
|
| +
|
| void DownloadFileImpl::SendUpdate() {
|
| + // TODO(xingliu): Update slice info to observer to update history db.
|
| BrowserThread::PostTask(
|
| - BrowserThread::UI,
|
| - FROM_HERE,
|
| - base::Bind(&DownloadDestinationObserver::DestinationUpdate,
|
| - observer_,
|
| - file_.bytes_so_far(),
|
| - rate_estimator_.GetCountPerSecond()));
|
| + BrowserThread::UI, FROM_HERE,
|
| + base::Bind(&DownloadDestinationObserver::DestinationUpdate, observer_,
|
| + TotalBytesReceived(), rate_estimator_.GetCountPerSecond()));
|
| +}
|
| +
|
| +void DownloadFileImpl::WillWriteToDisk(size_t data_len) {
|
| + if (!update_timer_->IsRunning()) {
|
| + update_timer_->Start(FROM_HERE,
|
| + base::TimeDelta::FromMilliseconds(kUpdatePeriodMs),
|
| + this, &DownloadFileImpl::SendUpdate);
|
| + }
|
| + rate_estimator_.Increment(data_len);
|
| }
|
|
|
| DownloadFileImpl::RenameParameters::RenameParameters(
|
|
|