Chromium Code Reviews| Index: content/browser/download/download_file_impl.cc |
| diff --git a/content/browser/download/download_file_impl.cc b/content/browser/download/download_file_impl.cc |
| index 80b5d1fb96a47c4e616debbbaad822013346cdb1..222ebc72d10e8f12ebf60a219148f8f357b1f32a 100644 |
| --- a/content/browser/download/download_file_impl.cc |
| +++ b/content/browser/download/download_file_impl.cc |
| @@ -9,6 +9,7 @@ |
| #include "base/bind.h" |
| #include "base/files/file_util.h" |
| +#include "base/memory/ptr_util.h" |
| #include "base/strings/stringprintf.h" |
| #include "base/time/time.h" |
| #include "base/values.h" |
| @@ -40,11 +41,27 @@ const int kInitialRenameRetryDelayMs = 200; |
| // Number of times a failing rename is retried before giving up. |
| const int kMaxRenameRetries = 3; |
| +DownloadFileImpl::SourceStream::SourceStream(int64_t offset, |
| + int64_t bytes_received) |
| + : offset_(offset), bytes_received_(bytes_received), finished_(false) {} |
| + |
| +DownloadFileImpl::SourceStream::~SourceStream() = default; |
| + |
| +void DownloadFileImpl::SourceStream::SetByteStream( |
| + std::unique_ptr<ByteStreamReader> stream_reader) { |
| + stream_reader_ = std::move(stream_reader); |
| +} |
| + |
| +void DownloadFileImpl::SourceStream::OnWriteBytesToDisk(int64_t bytes_write) { |
| + bytes_received_ += bytes_write; |
| +} |
| + |
| DownloadFileImpl::DownloadFileImpl( |
| std::unique_ptr<DownloadSaveInfo> save_info, |
| const base::FilePath& default_download_directory, |
| - std::unique_ptr<ByteStreamReader> stream, |
| + std::unique_ptr<ByteStreamReader> stream_reader, |
| const net::NetLogWithSource& download_item_net_log, |
| + bool is_sparse_file, |
| base::WeakPtr<DownloadDestinationObserver> observer) |
| : net_log_( |
| net::NetLogWithSource::Make(download_item_net_log.net_log(), |
| @@ -52,10 +69,15 @@ DownloadFileImpl::DownloadFileImpl( |
| file_(net_log_), |
| save_info_(std::move(save_info)), |
| default_download_directory_(default_download_directory), |
| - stream_reader_(std::move(stream)), |
| + is_sparse_file_(is_sparse_file), |
| bytes_seen_(0), |
| observer_(observer), |
| weak_factory_(this) { |
| + source_streams_[save_info_->offset] = |
| + base::MakeUnique<SourceStream>(save_info_->offset, 0); |
| + DCHECK(source_streams_.size() == static_cast<size_t>(1)); |
| + source_streams_.begin()->second->SetByteStream(std::move(stream_reader)); |
| + |
| download_item_net_log.AddEvent( |
| net::NetLogEventType::DOWNLOAD_FILE_CREATED, |
| net_log_.source().ToEventParametersCallback()); |
| @@ -74,48 +96,65 @@ void DownloadFileImpl::Initialize(const InitializeCallback& callback) { |
| update_timer_.reset(new base::RepeatingTimer()); |
| DownloadInterruptReason result = |
| - file_.Initialize(save_info_->file_path, |
| - default_download_directory_, |
| - std::move(save_info_->file), |
| - save_info_->offset, |
| + file_.Initialize(save_info_->file_path, default_download_directory_, |
| + std::move(save_info_->file), save_info_->offset, |
| save_info_->hash_of_partial_file, |
| - std::move(save_info_->hash_state), |
| - false); |
| + std::move(save_info_->hash_state), is_sparse_file_); |
| if (result != DOWNLOAD_INTERRUPT_REASON_NONE) { |
| BrowserThread::PostTask( |
| BrowserThread::UI, FROM_HERE, base::Bind(callback, result)); |
| return; |
| } |
| - stream_reader_->RegisterCallback( |
| - base::Bind(&DownloadFileImpl::StreamActive, weak_factory_.GetWeakPtr())); |
| - |
| download_start_ = base::TimeTicks::Now(); |
| // Primarily to make reset to zero in restart visible to owner. |
| SendUpdate(); |
| // Initial pull from the straw. |
| - StreamActive(); |
| + for (auto& source_stream : source_streams_) { |
|
qinmin
2017/02/27 18:55:29
no {} needed
xingliu
2017/02/28 00:57:06
Done.
|
| + RegisterAndActivateStream(source_stream.second.get()); |
| + } |
| BrowserThread::PostTask( |
| BrowserThread::UI, FROM_HERE, base::Bind( |
| callback, DOWNLOAD_INTERRUPT_REASON_NONE)); |
| } |
| +void DownloadFileImpl::AddByteStream( |
| + std::unique_ptr<ByteStreamReader> stream_reader, |
| + int64_t offset) { |
| + // |source_streams_| is not thread safe, must be modified on the same thread. |
| + DCHECK(thread_checker_.CalledOnValidThread()); |
| + |
| + // The |source_streams_| must have an existing entry for the stream reader. |
| + auto current_source_stream = source_streams_.find(offset); |
| + DCHECK(current_source_stream != source_streams_.end()); |
| + SourceStream* stream = current_source_stream->second.get(); |
| + stream->SetByteStream(std::move(stream_reader)); |
| + |
| + // Start to pull data from the stream. |
| + BrowserThread::PostTask( |
| + BrowserThread::FILE, FROM_HERE, |
| + base::Bind(&DownloadFileImpl::RegisterAndActivateStream, |
| + weak_factory_.GetWeakPtr(), stream)); |
| +} |
| + |
| DownloadInterruptReason DownloadFileImpl::AppendDataToFile( |
| const char* data, size_t data_len) { |
| DCHECK_CURRENTLY_ON(BrowserThread::FILE); |
| - |
| - if (!update_timer_->IsRunning()) { |
| - update_timer_->Start(FROM_HERE, |
| - base::TimeDelta::FromMilliseconds(kUpdatePeriodMs), |
| - this, &DownloadFileImpl::SendUpdate); |
| - } |
| - rate_estimator_.Increment(data_len); |
| + WillWriteToDisk(data_len); |
| return file_.AppendDataToFile(data, data_len); |
| } |
| +DownloadInterruptReason DownloadFileImpl::WriteDataToFile(int64_t offset, |
| + const char* data, |
| + size_t data_len) { |
| + DCHECK_CURRENTLY_ON(BrowserThread::FILE); |
| + WillWriteToDisk(data_len); |
| + return file_.WriteDataToFile(offset, data, data_len); |
| +} |
| + |
| void DownloadFileImpl::RenameAndUniquify( |
| const base::FilePath& full_path, |
| const RenameCompletionCallback& callback) { |
| @@ -213,7 +252,11 @@ void DownloadFileImpl::RenameWithRetryInternal( |
| SendUpdate(); |
| // Null out callback so that we don't do any more stream processing. |
| - stream_reader_->RegisterCallback(base::Closure()); |
| + for (auto& stream : source_streams_) { |
| + ByteStreamReader* stream_reader = stream.second->stream_reader(); |
| + if (stream_reader) |
|
qinmin
2017/02/27 18:55:29
no {}
xingliu
2017/02/28 00:57:06
Done. Kept {} here since there are multiple lines.
|
| + stream_reader->RegisterCallback(base::Closure()); |
| + } |
| new_path.clear(); |
| } |
| @@ -240,7 +283,8 @@ bool DownloadFileImpl::InProgress() const { |
| return file_.in_progress(); |
| } |
| -void DownloadFileImpl::StreamActive() { |
| +void DownloadFileImpl::StreamActive(SourceStream* source_stream) { |
| + DCHECK(source_stream->stream_reader()); |
| base::TimeTicks start(base::TimeTicks::Now()); |
| base::TimeTicks now; |
| scoped_refptr<net::IOBuffer> incoming_data; |
| @@ -254,7 +298,8 @@ void DownloadFileImpl::StreamActive() { |
| // Take care of any file local activity required. |
| do { |
| - state = stream_reader_->Read(&incoming_data, &incoming_data_size); |
| + state = source_stream->stream_reader()->Read(&incoming_data, |
| + &incoming_data_size); |
| switch (state) { |
| case ByteStreamReader::STREAM_EMPTY: |
| @@ -263,24 +308,26 @@ void DownloadFileImpl::StreamActive() { |
| { |
| ++num_buffers; |
| base::TimeTicks write_start(base::TimeTicks::Now()); |
| - reason = AppendDataToFile( |
| - incoming_data.get()->data(), incoming_data_size); |
| + if (is_sparse_file_) { |
| + reason = WriteDataToFile( |
| + source_stream->offset() + source_stream->bytes_received(), |
| + incoming_data.get()->data(), incoming_data_size); |
| + } else { |
| + reason = AppendDataToFile(incoming_data.get()->data(), |
| + incoming_data_size); |
| + } |
| disk_writes_time_ += (base::TimeTicks::Now() - write_start); |
| bytes_seen_ += incoming_data_size; |
| total_incoming_data_size += incoming_data_size; |
| + if (reason == DOWNLOAD_INTERRUPT_REASON_NONE) |
| + source_stream->OnWriteBytesToDisk(incoming_data_size); |
| } |
| break; |
| case ByteStreamReader::STREAM_COMPLETE: |
| { |
| - reason = static_cast<DownloadInterruptReason>( |
| - stream_reader_->GetStatus()); |
| - SendUpdate(); |
| - base::TimeTicks close_start(base::TimeTicks::Now()); |
| - base::TimeTicks now(base::TimeTicks::Now()); |
| - disk_writes_time_ += (now - close_start); |
| - RecordFileBandwidth( |
| - bytes_seen_, disk_writes_time_, now - download_start_); |
| - update_timer_.reset(); |
| + reason = static_cast<DownloadInterruptReason>( |
|
qinmin
2017/02/27 18:55:29
fix the indentations
xingliu
2017/02/28 00:57:06
Done.
|
| + source_stream->stream_reader()->GetStatus()); |
| + SendUpdate(); |
| } |
| break; |
| default: |
| @@ -297,8 +344,8 @@ void DownloadFileImpl::StreamActive() { |
| now - start > delta) { |
| BrowserThread::PostTask( |
| BrowserThread::FILE, FROM_HERE, |
| - base::Bind(&DownloadFileImpl::StreamActive, |
| - weak_factory_.GetWeakPtr())); |
| + base::Bind(&DownloadFileImpl::StreamActive, weak_factory_.GetWeakPtr(), |
| + source_stream)); |
| } |
| if (total_incoming_data_size) |
| @@ -308,34 +355,50 @@ void DownloadFileImpl::StreamActive() { |
| // Take care of communication with our observer. |
| if (reason != DOWNLOAD_INTERRUPT_REASON_NONE) { |
| + if (state == ByteStreamReader::STREAM_COMPLETE) { |
| + RecordFileBandwidth(bytes_seen_, disk_writes_time_, |
| + base::TimeTicks::Now() - download_start_); |
| + } |
| // Error case for both upstream source and file write. |
| // Shut down processing and signal an error to our observer. |
| // Our observer will clean us up. |
| - stream_reader_->RegisterCallback(base::Closure()); |
| + source_stream->stream_reader()->RegisterCallback(base::Closure()); |
| weak_factory_.InvalidateWeakPtrs(); |
| SendUpdate(); // Make info up to date before error. |
| std::unique_ptr<crypto::SecureHash> hash_state = file_.Finish(); |
| BrowserThread::PostTask( |
| - BrowserThread::UI, |
| - FROM_HERE, |
| - base::Bind(&DownloadDestinationObserver::DestinationError, |
| - observer_, |
| - reason, |
| - file_.bytes_so_far(), |
| - base::Passed(&hash_state))); |
| + BrowserThread::UI, FROM_HERE, |
| + base::Bind(&DownloadDestinationObserver::DestinationError, observer_, |
| + reason, TotalBytesReceived(), base::Passed(&hash_state))); |
| } else if (state == ByteStreamReader::STREAM_COMPLETE) { |
| - // Signal successful completion and shut down processing. |
| - stream_reader_->RegisterCallback(base::Closure()); |
| - weak_factory_.InvalidateWeakPtrs(); |
| + // Signal successful completion of the current stream. |
| + source_stream->stream_reader()->RegisterCallback(base::Closure()); |
| + source_stream->set_finished(true); |
| + |
| + // Inform observers. |
| SendUpdate(); |
| - std::unique_ptr<crypto::SecureHash> hash_state = file_.Finish(); |
| - BrowserThread::PostTask( |
| - BrowserThread::UI, |
| - FROM_HERE, |
| - base::Bind(&DownloadDestinationObserver::DestinationCompleted, |
| - observer_, |
| - file_.bytes_so_far(), |
| - base::Passed(&hash_state))); |
| + |
| + bool all_stream_complete = true; |
| + for (auto& stream : source_streams_) { |
| + if (!stream.second->is_finished()) { |
| + all_stream_complete = false; |
| + break; |
| + } |
| + } |
| + |
| + // All the stream reader are completed, shut down file IO processing. |
| + if (all_stream_complete) { |
| + RecordFileBandwidth(bytes_seen_, disk_writes_time_, |
| + base::TimeTicks::Now() - download_start_); |
| + weak_factory_.InvalidateWeakPtrs(); |
| + std::unique_ptr<crypto::SecureHash> hash_state = file_.Finish(); |
| + update_timer_.reset(); |
| + BrowserThread::PostTask( |
| + BrowserThread::UI, FROM_HERE, |
| + base::Bind(&DownloadDestinationObserver::DestinationCompleted, |
| + observer_, TotalBytesReceived(), |
| + base::Passed(&hash_state))); |
| + } |
| } |
| if (net_log_.IsCapturing()) { |
| net_log_.AddEvent(net::NetLogEventType::DOWNLOAD_STREAM_DRAINED, |
| @@ -344,14 +407,45 @@ void DownloadFileImpl::StreamActive() { |
| } |
| } |
| +void DownloadFileImpl::RegisterAndActivateStream(SourceStream* source_stream) { |
| + DCHECK_CURRENTLY_ON(BrowserThread::FILE); |
| + ByteStreamReader* stream_reader = source_stream->stream_reader(); |
| + if (stream_reader) { |
| + source_stream->stream_reader()->RegisterCallback( |
| + base::Bind(&DownloadFileImpl::StreamActive, weak_factory_.GetWeakPtr(), |
| + source_stream)); |
| + StreamActive(source_stream); |
| + } |
| +} |
| + |
| +int64_t DownloadFileImpl::TotalBytesReceived() const { |
| + if (!is_sparse_file_) |
| + return file_.bytes_so_far(); |
| + |
| + // Accumulate all valid bytes from all streams. |
| + int64_t total_received = 0; |
| + for (auto& stream : source_streams_) { |
|
qinmin
2017/02/27 18:55:29
no {} meeded
xingliu
2017/02/28 00:57:06
Done.
|
| + total_received += stream.second->bytes_received(); |
| + } |
| + |
| + return total_received; |
| +} |
| + |
| void DownloadFileImpl::SendUpdate() { |
| + // TODO(xingliu): Update slice info to observer to update history db. |
| BrowserThread::PostTask( |
| - BrowserThread::UI, |
| - FROM_HERE, |
| - base::Bind(&DownloadDestinationObserver::DestinationUpdate, |
| - observer_, |
| - file_.bytes_so_far(), |
| - rate_estimator_.GetCountPerSecond())); |
| + BrowserThread::UI, FROM_HERE, |
| + base::Bind(&DownloadDestinationObserver::DestinationUpdate, observer_, |
| + TotalBytesReceived(), rate_estimator_.GetCountPerSecond())); |
| +} |
| + |
| +void DownloadFileImpl::WillWriteToDisk(size_t data_len) { |
| + if (!update_timer_->IsRunning()) { |
| + update_timer_->Start(FROM_HERE, |
| + base::TimeDelta::FromMilliseconds(kUpdatePeriodMs), |
| + this, &DownloadFileImpl::SendUpdate); |
| + } |
| + rate_estimator_.Increment(data_len); |
| } |
| DownloadFileImpl::RenameParameters::RenameParameters( |