| Index: content/browser/download/download_file_impl.cc
|
| diff --git a/content/browser/download/download_file_impl.cc b/content/browser/download/download_file_impl.cc
|
| index 5d9ac7997b08e668e65b4c863a14029ee510afbe..de810fed95897b341fd4743dc3c06663e33c3d31 100644
|
| --- a/content/browser/download/download_file_impl.cc
|
| +++ b/content/browser/download/download_file_impl.cc
|
| @@ -9,6 +9,7 @@
|
|
|
| #include "base/bind.h"
|
| #include "base/files/file_util.h"
|
| +#include "base/memory/ptr_util.h"
|
| #include "base/strings/stringprintf.h"
|
| #include "base/time/time.h"
|
| #include "base/values.h"
|
| @@ -40,11 +41,26 @@ const int kInitialRenameRetryDelayMs = 200;
|
| // Number of times a failing rename is retried before giving up.
|
| const int kMaxRenameRetries = 3;
|
|
|
| +DownloadFileImpl::SourceStream::SourceStream(int64_t offset, int64_t length)
|
| + : offset_(offset), length_(length), bytes_written_(0), finished_(false) {}
|
| +
|
| +DownloadFileImpl::SourceStream::~SourceStream() = default;
|
| +
|
| +void DownloadFileImpl::SourceStream::SetByteStream(
|
| + std::unique_ptr<ByteStreamReader> stream_reader) {
|
| + stream_reader_ = std::move(stream_reader);
|
| +}
|
| +
|
| +void DownloadFileImpl::SourceStream::OnWriteBytesToDisk(int64_t bytes_write) {
|
| + bytes_written_ += bytes_write;
|
| +}
|
| +
|
| DownloadFileImpl::DownloadFileImpl(
|
| std::unique_ptr<DownloadSaveInfo> save_info,
|
| const base::FilePath& default_download_directory,
|
| - std::unique_ptr<ByteStreamReader> stream,
|
| + std::unique_ptr<ByteStreamReader> stream_reader,
|
| const net::NetLogWithSource& download_item_net_log,
|
| + bool is_sparse_file,
|
| base::WeakPtr<DownloadDestinationObserver> observer)
|
| : net_log_(
|
| net::NetLogWithSource::Make(download_item_net_log.net_log(),
|
| @@ -52,10 +68,15 @@ DownloadFileImpl::DownloadFileImpl(
|
| file_(net_log_),
|
| save_info_(std::move(save_info)),
|
| default_download_directory_(default_download_directory),
|
| - stream_reader_(std::move(stream)),
|
| + is_sparse_file_(is_sparse_file),
|
| bytes_seen_(0),
|
| observer_(observer),
|
| weak_factory_(this) {
|
| + source_streams_[save_info_->offset] = base::MakeUnique<SourceStream>(
|
| + save_info_->offset, DownloadSaveInfo::kLengthFullContent);
|
| + DCHECK(source_streams_.size() == static_cast<size_t>(1));
|
| + source_streams_[save_info_->offset]->SetByteStream(std::move(stream_reader));
|
| +
|
| download_item_net_log.AddEvent(
|
| net::NetLogEventType::DOWNLOAD_FILE_CREATED,
|
| net_log_.source().ToEventParametersCallback());
|
| @@ -74,46 +95,50 @@ void DownloadFileImpl::Initialize(const InitializeCallback& callback) {
|
|
|
| update_timer_.reset(new base::RepeatingTimer());
|
| DownloadInterruptReason result =
|
| - file_.Initialize(save_info_->file_path,
|
| - default_download_directory_,
|
| - std::move(save_info_->file),
|
| - save_info_->offset,
|
| + file_.Initialize(save_info_->file_path, default_download_directory_,
|
| + std::move(save_info_->file), save_info_->offset,
|
| save_info_->hash_of_partial_file,
|
| - std::move(save_info_->hash_state),
|
| - false);
|
| + std::move(save_info_->hash_state), is_sparse_file_);
|
| if (result != DOWNLOAD_INTERRUPT_REASON_NONE) {
|
| BrowserThread::PostTask(
|
| BrowserThread::UI, FROM_HERE, base::Bind(callback, result));
|
| return;
|
| }
|
|
|
| - stream_reader_->RegisterCallback(
|
| - base::Bind(&DownloadFileImpl::StreamActive, weak_factory_.GetWeakPtr()));
|
| -
|
| download_start_ = base::TimeTicks::Now();
|
|
|
| // Primarily to make reset to zero in restart visible to owner.
|
| SendUpdate();
|
|
|
| // Initial pull from the straw.
|
| - StreamActive();
|
| + for (auto& source_stream : source_streams_)
|
| + RegisterAndActivateStream(source_stream.second.get());
|
|
|
| BrowserThread::PostTask(
|
| BrowserThread::UI, FROM_HERE, base::Bind(
|
| callback, DOWNLOAD_INTERRUPT_REASON_NONE));
|
| }
|
|
|
| -DownloadInterruptReason DownloadFileImpl::AppendDataToFile(
|
| - const char* data, size_t data_len) {
|
| +void DownloadFileImpl::AddByteStream(
|
| + std::unique_ptr<ByteStreamReader> stream_reader,
|
| + int64_t offset) {
|
| DCHECK_CURRENTLY_ON(BrowserThread::FILE);
|
|
|
| - if (!update_timer_->IsRunning()) {
|
| - update_timer_->Start(FROM_HERE,
|
| - base::TimeDelta::FromMilliseconds(kUpdatePeriodMs),
|
| - this, &DownloadFileImpl::SendUpdate);
|
| - }
|
| - rate_estimator_.Increment(data_len);
|
| - return file_.AppendDataToFile(data, data_len);
|
| + // The |source_streams_| must have an existing entry for the stream reader.
|
| + auto current_source_stream = source_streams_.find(offset);
|
| + DCHECK(current_source_stream != source_streams_.end());
|
| + SourceStream* stream = current_source_stream->second.get();
|
| + stream->SetByteStream(std::move(stream_reader));
|
| +
|
| + RegisterAndActivateStream(stream);
|
| +}
|
| +
|
| +DownloadInterruptReason DownloadFileImpl::WriteDataToFile(int64_t offset,
|
| + const char* data,
|
| + size_t data_len) {
|
| + DCHECK_CURRENTLY_ON(BrowserThread::FILE);
|
| + WillWriteToDisk(data_len);
|
| + return file_.WriteDataToFile(offset, data, data_len);
|
| }
|
|
|
| void DownloadFileImpl::RenameAndUniquify(
|
| @@ -213,7 +238,11 @@ void DownloadFileImpl::RenameWithRetryInternal(
|
| SendUpdate();
|
|
|
| // Null out callback so that we don't do any more stream processing.
|
| - stream_reader_->RegisterCallback(base::Closure());
|
| + for (auto& stream : source_streams_) {
|
| + ByteStreamReader* stream_reader = stream.second->stream_reader();
|
| + if (stream_reader)
|
| + stream_reader->RegisterCallback(base::Closure());
|
| + }
|
|
|
| new_path.clear();
|
| }
|
| @@ -240,13 +269,15 @@ bool DownloadFileImpl::InProgress() const {
|
| return file_.in_progress();
|
| }
|
|
|
| -void DownloadFileImpl::StreamActive() {
|
| +void DownloadFileImpl::StreamActive(SourceStream* source_stream) {
|
| + DCHECK(source_stream->stream_reader());
|
| base::TimeTicks start(base::TimeTicks::Now());
|
| base::TimeTicks now;
|
| scoped_refptr<net::IOBuffer> incoming_data;
|
| size_t incoming_data_size = 0;
|
| size_t total_incoming_data_size = 0;
|
| size_t num_buffers = 0;
|
| + bool should_terminate = false;
|
| ByteStreamReader::StreamState state(ByteStreamReader::STREAM_EMPTY);
|
| DownloadInterruptReason reason = DOWNLOAD_INTERRUPT_REASON_NONE;
|
| base::TimeDelta delta(
|
| @@ -254,7 +285,8 @@ void DownloadFileImpl::StreamActive() {
|
|
|
| // Take care of any file local activity required.
|
| do {
|
| - state = stream_reader_->Read(&incoming_data, &incoming_data_size);
|
| + state = source_stream->stream_reader()->Read(&incoming_data,
|
| + &incoming_data_size);
|
|
|
| switch (state) {
|
| case ByteStreamReader::STREAM_EMPTY:
|
| @@ -263,24 +295,30 @@ void DownloadFileImpl::StreamActive() {
|
| {
|
| ++num_buffers;
|
| base::TimeTicks write_start(base::TimeTicks::Now());
|
| - reason = AppendDataToFile(
|
| + // Stop the stream if it writes more bytes than expected.
|
| + if (source_stream->length() != DownloadSaveInfo::kLengthFullContent &&
|
| + source_stream->bytes_written() +
|
| + static_cast<int64_t>(incoming_data_size) >=
|
| + source_stream->length()) {
|
| + should_terminate = true;
|
| + incoming_data_size =
|
| + source_stream->length() - source_stream->bytes_written();
|
| + }
|
| + reason = WriteDataToFile(
|
| + source_stream->offset() + source_stream->bytes_written(),
|
| incoming_data.get()->data(), incoming_data_size);
|
| disk_writes_time_ += (base::TimeTicks::Now() - write_start);
|
| bytes_seen_ += incoming_data_size;
|
| total_incoming_data_size += incoming_data_size;
|
| + if (reason == DOWNLOAD_INTERRUPT_REASON_NONE)
|
| + source_stream->OnWriteBytesToDisk(incoming_data_size);
|
| }
|
| break;
|
| case ByteStreamReader::STREAM_COMPLETE:
|
| {
|
| reason = static_cast<DownloadInterruptReason>(
|
| - stream_reader_->GetStatus());
|
| + source_stream->stream_reader()->GetStatus());
|
| SendUpdate();
|
| - base::TimeTicks close_start(base::TimeTicks::Now());
|
| - base::TimeTicks now(base::TimeTicks::Now());
|
| - disk_writes_time_ += (now - close_start);
|
| - RecordFileBandwidth(
|
| - bytes_seen_, disk_writes_time_, now - download_start_);
|
| - update_timer_.reset();
|
| }
|
| break;
|
| default:
|
| @@ -289,16 +327,16 @@ void DownloadFileImpl::StreamActive() {
|
| }
|
| now = base::TimeTicks::Now();
|
| } while (state == ByteStreamReader::STREAM_HAS_DATA &&
|
| - reason == DOWNLOAD_INTERRUPT_REASON_NONE &&
|
| - now - start <= delta);
|
| + reason == DOWNLOAD_INTERRUPT_REASON_NONE && now - start <= delta &&
|
| + !should_terminate);
|
|
|
| // If we're stopping to yield the thread, post a task so we come back.
|
| - if (state == ByteStreamReader::STREAM_HAS_DATA &&
|
| - now - start > delta) {
|
| + if (state == ByteStreamReader::STREAM_HAS_DATA && now - start > delta &&
|
| + !should_terminate) {
|
| BrowserThread::PostTask(
|
| BrowserThread::FILE, FROM_HERE,
|
| - base::Bind(&DownloadFileImpl::StreamActive,
|
| - weak_factory_.GetWeakPtr()));
|
| + base::Bind(&DownloadFileImpl::StreamActive, weak_factory_.GetWeakPtr(),
|
| + source_stream));
|
| }
|
|
|
| if (total_incoming_data_size)
|
| @@ -311,31 +349,43 @@ void DownloadFileImpl::StreamActive() {
|
| // Error case for both upstream source and file write.
|
| // Shut down processing and signal an error to our observer.
|
| // Our observer will clean us up.
|
| - stream_reader_->RegisterCallback(base::Closure());
|
| + source_stream->stream_reader()->RegisterCallback(base::Closure());
|
| weak_factory_.InvalidateWeakPtrs();
|
| SendUpdate(); // Make info up to date before error.
|
| std::unique_ptr<crypto::SecureHash> hash_state = file_.Finish();
|
| BrowserThread::PostTask(
|
| - BrowserThread::UI,
|
| - FROM_HERE,
|
| - base::Bind(&DownloadDestinationObserver::DestinationError,
|
| - observer_,
|
| - reason,
|
| - file_.bytes_so_far(),
|
| - base::Passed(&hash_state)));
|
| - } else if (state == ByteStreamReader::STREAM_COMPLETE) {
|
| - // Signal successful completion and shut down processing.
|
| - stream_reader_->RegisterCallback(base::Closure());
|
| - weak_factory_.InvalidateWeakPtrs();
|
| + BrowserThread::UI, FROM_HERE,
|
| + base::Bind(&DownloadDestinationObserver::DestinationError, observer_,
|
| + reason, TotalBytesReceived(), base::Passed(&hash_state)));
|
| + } else if (state == ByteStreamReader::STREAM_COMPLETE || should_terminate) {
|
| + // Signal successful completion or termination of the current stream.
|
| + source_stream->stream_reader()->RegisterCallback(base::Closure());
|
| + source_stream->set_finished(true);
|
| +
|
| + // Inform observers.
|
| SendUpdate();
|
| - std::unique_ptr<crypto::SecureHash> hash_state = file_.Finish();
|
| - BrowserThread::PostTask(
|
| - BrowserThread::UI,
|
| - FROM_HERE,
|
| - base::Bind(&DownloadDestinationObserver::DestinationCompleted,
|
| - observer_,
|
| - file_.bytes_so_far(),
|
| - base::Passed(&hash_state)));
|
| +
|
| + bool all_stream_complete = true;
|
| + for (auto& stream : source_streams_) {
|
| + if (!stream.second->is_finished()) {
|
| + all_stream_complete = false;
|
| + break;
|
| + }
|
| + }
|
| +
|
| + // All the stream reader are completed, shut down file IO processing.
|
| + if (all_stream_complete) {
|
| + RecordFileBandwidth(bytes_seen_, disk_writes_time_,
|
| + base::TimeTicks::Now() - download_start_);
|
| + weak_factory_.InvalidateWeakPtrs();
|
| + std::unique_ptr<crypto::SecureHash> hash_state = file_.Finish();
|
| + update_timer_.reset();
|
| + BrowserThread::PostTask(
|
| + BrowserThread::UI, FROM_HERE,
|
| + base::Bind(&DownloadDestinationObserver::DestinationCompleted,
|
| + observer_, TotalBytesReceived(),
|
| + base::Passed(&hash_state)));
|
| + }
|
| }
|
| if (net_log_.IsCapturing()) {
|
| net_log_.AddEvent(net::NetLogEventType::DOWNLOAD_STREAM_DRAINED,
|
| @@ -344,17 +394,40 @@ void DownloadFileImpl::StreamActive() {
|
| }
|
| }
|
|
|
| +void DownloadFileImpl::RegisterAndActivateStream(SourceStream* source_stream) {
|
| + DCHECK_CURRENTLY_ON(BrowserThread::FILE);
|
| + ByteStreamReader* stream_reader = source_stream->stream_reader();
|
| + if (stream_reader) {
|
| + stream_reader->RegisterCallback(base::Bind(&DownloadFileImpl::StreamActive,
|
| + weak_factory_.GetWeakPtr(),
|
| + source_stream));
|
| + StreamActive(source_stream);
|
| + }
|
| +}
|
| +
|
| +int64_t DownloadFileImpl::TotalBytesReceived() const {
|
| + // TODO(xingliu): Use slice info to figure out total bytes received.
|
| + return file_.bytes_so_far();
|
| +}
|
| +
|
| void DownloadFileImpl::SendUpdate() {
|
| + // TODO(xingliu): Update slice info to observer to update history db.
|
| BrowserThread::PostTask(
|
| - BrowserThread::UI,
|
| - FROM_HERE,
|
| - base::Bind(&DownloadDestinationObserver::DestinationUpdate,
|
| - observer_,
|
| - file_.bytes_so_far(),
|
| - rate_estimator_.GetCountPerSecond(),
|
| + BrowserThread::UI, FROM_HERE,
|
| + base::Bind(&DownloadDestinationObserver::DestinationUpdate, observer_,
|
| + TotalBytesReceived(), rate_estimator_.GetCountPerSecond(),
|
| std::vector<DownloadItem::ReceivedSlice>()));
|
| }
|
|
|
| +void DownloadFileImpl::WillWriteToDisk(size_t data_len) {
|
| + if (!update_timer_->IsRunning()) {
|
| + update_timer_->Start(FROM_HERE,
|
| + base::TimeDelta::FromMilliseconds(kUpdatePeriodMs),
|
| + this, &DownloadFileImpl::SendUpdate);
|
| + }
|
| + rate_estimator_.Increment(data_len);
|
| +}
|
| +
|
| DownloadFileImpl::RenameParameters::RenameParameters(
|
| RenameOption option,
|
| const base::FilePath& new_path,
|
|
|