Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(9)

Unified Diff: content/browser/download/download_file_impl.cc

Issue 2712713007: Make DownloadFileImpl handle multiple byte streams. (Closed)
Patch Set: Minor polish to avoid an extra empty read. Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « content/browser/download/download_file_impl.h ('k') | content/browser/download/download_file_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: content/browser/download/download_file_impl.cc
diff --git a/content/browser/download/download_file_impl.cc b/content/browser/download/download_file_impl.cc
index 80b5d1fb96a47c4e616debbbaad822013346cdb1..a5a5be3a409a8addf5f415390e756d416c6dd2d5 100644
--- a/content/browser/download/download_file_impl.cc
+++ b/content/browser/download/download_file_impl.cc
@@ -9,6 +9,7 @@
#include "base/bind.h"
#include "base/files/file_util.h"
+#include "base/memory/ptr_util.h"
#include "base/strings/stringprintf.h"
#include "base/time/time.h"
#include "base/values.h"
@@ -40,11 +41,26 @@ const int kInitialRenameRetryDelayMs = 200;
// Number of times a failing rename is retried before giving up.
const int kMaxRenameRetries = 3;
+DownloadFileImpl::SourceStream::SourceStream(int64_t offset, int64_t length)
+ : offset_(offset), length_(length), bytes_written_(0), finished_(false) {}
+
+DownloadFileImpl::SourceStream::~SourceStream() = default;
+
+void DownloadFileImpl::SourceStream::SetByteStream(
+ std::unique_ptr<ByteStreamReader> stream_reader) {
+ stream_reader_ = std::move(stream_reader);
+}
+
+void DownloadFileImpl::SourceStream::OnWriteBytesToDisk(int64_t bytes_write) {
+ bytes_written_ += bytes_write;
+}
+
DownloadFileImpl::DownloadFileImpl(
std::unique_ptr<DownloadSaveInfo> save_info,
const base::FilePath& default_download_directory,
- std::unique_ptr<ByteStreamReader> stream,
+ std::unique_ptr<ByteStreamReader> stream_reader,
const net::NetLogWithSource& download_item_net_log,
+ bool is_sparse_file,
base::WeakPtr<DownloadDestinationObserver> observer)
: net_log_(
net::NetLogWithSource::Make(download_item_net_log.net_log(),
@@ -52,10 +68,15 @@ DownloadFileImpl::DownloadFileImpl(
file_(net_log_),
save_info_(std::move(save_info)),
default_download_directory_(default_download_directory),
- stream_reader_(std::move(stream)),
+ is_sparse_file_(is_sparse_file),
bytes_seen_(0),
observer_(observer),
weak_factory_(this) {
+ source_streams_[save_info_->offset] =
+ base::MakeUnique<SourceStream>(save_info_->offset, 0);
qinmin 2017/03/02 17:21:34 nit: s/0/DownloadSaveInfo::kLengthFullContent/
xingliu 2017/03/02 19:21:20 Done.
+ DCHECK(source_streams_.size() == static_cast<size_t>(1));
+ source_streams_.begin()->second->SetByteStream(std::move(stream_reader));
+
download_item_net_log.AddEvent(
net::NetLogEventType::DOWNLOAD_FILE_CREATED,
net_log_.source().ToEventParametersCallback());
@@ -74,48 +95,64 @@ void DownloadFileImpl::Initialize(const InitializeCallback& callback) {
update_timer_.reset(new base::RepeatingTimer());
DownloadInterruptReason result =
- file_.Initialize(save_info_->file_path,
- default_download_directory_,
- std::move(save_info_->file),
- save_info_->offset,
+ file_.Initialize(save_info_->file_path, default_download_directory_,
+ std::move(save_info_->file), save_info_->offset,
save_info_->hash_of_partial_file,
- std::move(save_info_->hash_state),
- false);
+ std::move(save_info_->hash_state), is_sparse_file_);
if (result != DOWNLOAD_INTERRUPT_REASON_NONE) {
BrowserThread::PostTask(
BrowserThread::UI, FROM_HERE, base::Bind(callback, result));
return;
}
- stream_reader_->RegisterCallback(
- base::Bind(&DownloadFileImpl::StreamActive, weak_factory_.GetWeakPtr()));
-
download_start_ = base::TimeTicks::Now();
// Primarily to make reset to zero in restart visible to owner.
SendUpdate();
// Initial pull from the straw.
- StreamActive();
+ for (auto& source_stream : source_streams_)
+ RegisterAndActivateStream(source_stream.second.get());
BrowserThread::PostTask(
BrowserThread::UI, FROM_HERE, base::Bind(
callback, DOWNLOAD_INTERRUPT_REASON_NONE));
}
+void DownloadFileImpl::AddByteStream(
+ std::unique_ptr<ByteStreamReader> stream_reader,
+ int64_t offset) {
+ // |source_streams_| is not thread safe, must be modified on the same thread.
+ DCHECK(thread_checker_.CalledOnValidThread());
+
+ // The |source_streams_| must have an existing entry for the stream reader.
+ auto current_source_stream = source_streams_.find(offset);
+ DCHECK(current_source_stream != source_streams_.end());
+ SourceStream* stream = current_source_stream->second.get();
+ stream->SetByteStream(std::move(stream_reader));
+
+ // Start to pull data from the stream.
+ BrowserThread::PostTask(
+ BrowserThread::FILE, FROM_HERE,
+ base::Bind(&DownloadFileImpl::RegisterAndActivateStream,
+ weak_factory_.GetWeakPtr(), stream));
+}
+
DownloadInterruptReason DownloadFileImpl::AppendDataToFile(
const char* data, size_t data_len) {
DCHECK_CURRENTLY_ON(BrowserThread::FILE);
-
- if (!update_timer_->IsRunning()) {
- update_timer_->Start(FROM_HERE,
- base::TimeDelta::FromMilliseconds(kUpdatePeriodMs),
- this, &DownloadFileImpl::SendUpdate);
- }
- rate_estimator_.Increment(data_len);
+ WillWriteToDisk(data_len);
return file_.AppendDataToFile(data, data_len);
}
+DownloadInterruptReason DownloadFileImpl::WriteDataToFile(int64_t offset,
+ const char* data,
+ size_t data_len) {
+ DCHECK_CURRENTLY_ON(BrowserThread::FILE);
+ WillWriteToDisk(data_len);
+ return file_.WriteDataToFile(offset, data, data_len);
+}
+
void DownloadFileImpl::RenameAndUniquify(
const base::FilePath& full_path,
const RenameCompletionCallback& callback) {
@@ -213,7 +250,11 @@ void DownloadFileImpl::RenameWithRetryInternal(
SendUpdate();
// Null out callback so that we don't do any more stream processing.
- stream_reader_->RegisterCallback(base::Closure());
+ for (auto& stream : source_streams_) {
+ ByteStreamReader* stream_reader = stream.second->stream_reader();
+ if (stream_reader)
+ stream_reader->RegisterCallback(base::Closure());
+ }
new_path.clear();
}
@@ -240,13 +281,15 @@ bool DownloadFileImpl::InProgress() const {
return file_.in_progress();
}
-void DownloadFileImpl::StreamActive() {
+void DownloadFileImpl::StreamActive(SourceStream* source_stream) {
+ DCHECK(source_stream->stream_reader());
base::TimeTicks start(base::TimeTicks::Now());
base::TimeTicks now;
scoped_refptr<net::IOBuffer> incoming_data;
size_t incoming_data_size = 0;
size_t total_incoming_data_size = 0;
size_t num_buffers = 0;
+ bool should_terminate = false;
ByteStreamReader::StreamState state(ByteStreamReader::STREAM_EMPTY);
DownloadInterruptReason reason = DOWNLOAD_INTERRUPT_REASON_NONE;
base::TimeDelta delta(
@@ -254,7 +297,8 @@ void DownloadFileImpl::StreamActive() {
// Take care of any file local activity required.
do {
- state = stream_reader_->Read(&incoming_data, &incoming_data_size);
+ state = source_stream->stream_reader()->Read(&incoming_data,
+ &incoming_data_size);
switch (state) {
case ByteStreamReader::STREAM_EMPTY:
@@ -263,24 +307,36 @@ void DownloadFileImpl::StreamActive() {
{
++num_buffers;
base::TimeTicks write_start(base::TimeTicks::Now());
- reason = AppendDataToFile(
- incoming_data.get()->data(), incoming_data_size);
+ // Stop the stream if it writes more bytes than expected.
+ if (source_stream->length() > 0 &&
qinmin 2017/03/02 17:21:34 nit: s/> 0/!= DownloadSaveInfo::kLengthFullContent
xingliu 2017/03/02 19:21:20 Done.
+ source_stream->bytes_written() +
+ static_cast<int64_t>(incoming_data_size) >=
+ source_stream->length()) {
+ should_terminate = true;
+ incoming_data_size =
+ source_stream->length() - source_stream->bytes_written();
+ }
+
+ if (is_sparse_file_) {
+ reason = WriteDataToFile(
+ source_stream->offset() + source_stream->bytes_written(),
+ incoming_data.get()->data(), incoming_data_size);
+ } else {
+ reason = AppendDataToFile(incoming_data.get()->data(),
+ incoming_data_size);
+ }
disk_writes_time_ += (base::TimeTicks::Now() - write_start);
bytes_seen_ += incoming_data_size;
total_incoming_data_size += incoming_data_size;
+ if (reason == DOWNLOAD_INTERRUPT_REASON_NONE)
+ source_stream->OnWriteBytesToDisk(incoming_data_size);
}
break;
case ByteStreamReader::STREAM_COMPLETE:
{
reason = static_cast<DownloadInterruptReason>(
- stream_reader_->GetStatus());
+ source_stream->stream_reader()->GetStatus());
SendUpdate();
- base::TimeTicks close_start(base::TimeTicks::Now());
- base::TimeTicks now(base::TimeTicks::Now());
- disk_writes_time_ += (now - close_start);
- RecordFileBandwidth(
- bytes_seen_, disk_writes_time_, now - download_start_);
- update_timer_.reset();
}
break;
default:
@@ -289,16 +345,16 @@ void DownloadFileImpl::StreamActive() {
}
now = base::TimeTicks::Now();
} while (state == ByteStreamReader::STREAM_HAS_DATA &&
- reason == DOWNLOAD_INTERRUPT_REASON_NONE &&
- now - start <= delta);
+ reason == DOWNLOAD_INTERRUPT_REASON_NONE && now - start <= delta &&
+ !should_terminate);
// If we're stopping to yield the thread, post a task so we come back.
- if (state == ByteStreamReader::STREAM_HAS_DATA &&
- now - start > delta) {
+ if (state == ByteStreamReader::STREAM_HAS_DATA && now - start > delta &&
+ !should_terminate) {
BrowserThread::PostTask(
BrowserThread::FILE, FROM_HERE,
- base::Bind(&DownloadFileImpl::StreamActive,
- weak_factory_.GetWeakPtr()));
+ base::Bind(&DownloadFileImpl::StreamActive, weak_factory_.GetWeakPtr(),
+ source_stream));
}
if (total_incoming_data_size)
@@ -308,34 +364,50 @@ void DownloadFileImpl::StreamActive() {
// Take care of communication with our observer.
if (reason != DOWNLOAD_INTERRUPT_REASON_NONE) {
+ if (state == ByteStreamReader::STREAM_COMPLETE) {
qinmin 2017/03/02 00:12:23 do we need to do this if it is just one stream fai
xingliu 2017/03/02 01:18:35 Done, removed. The original logic also records ban
+ RecordFileBandwidth(bytes_seen_, disk_writes_time_,
+ base::TimeTicks::Now() - download_start_);
+ }
// Error case for both upstream source and file write.
// Shut down processing and signal an error to our observer.
// Our observer will clean us up.
- stream_reader_->RegisterCallback(base::Closure());
+ source_stream->stream_reader()->RegisterCallback(base::Closure());
weak_factory_.InvalidateWeakPtrs();
SendUpdate(); // Make info up to date before error.
std::unique_ptr<crypto::SecureHash> hash_state = file_.Finish();
BrowserThread::PostTask(
- BrowserThread::UI,
- FROM_HERE,
- base::Bind(&DownloadDestinationObserver::DestinationError,
- observer_,
- reason,
- file_.bytes_so_far(),
- base::Passed(&hash_state)));
- } else if (state == ByteStreamReader::STREAM_COMPLETE) {
- // Signal successful completion and shut down processing.
- stream_reader_->RegisterCallback(base::Closure());
- weak_factory_.InvalidateWeakPtrs();
+ BrowserThread::UI, FROM_HERE,
+ base::Bind(&DownloadDestinationObserver::DestinationError, observer_,
+ reason, TotalBytesReceived(), base::Passed(&hash_state)));
+ } else if (state == ByteStreamReader::STREAM_COMPLETE || should_terminate) {
+ // Signal successful completion or termination of the current stream.
+ source_stream->stream_reader()->RegisterCallback(base::Closure());
+ source_stream->set_finished(true);
+
+ // Inform observers.
SendUpdate();
- std::unique_ptr<crypto::SecureHash> hash_state = file_.Finish();
- BrowserThread::PostTask(
- BrowserThread::UI,
- FROM_HERE,
- base::Bind(&DownloadDestinationObserver::DestinationCompleted,
- observer_,
- file_.bytes_so_far(),
- base::Passed(&hash_state)));
+
+ bool all_stream_complete = true;
+ for (auto& stream : source_streams_) {
+ if (!stream.second->is_finished()) {
+ all_stream_complete = false;
+ break;
+ }
+ }
+
+ // All the stream reader are completed, shut down file IO processing.
+ if (all_stream_complete) {
+ RecordFileBandwidth(bytes_seen_, disk_writes_time_,
+ base::TimeTicks::Now() - download_start_);
+ weak_factory_.InvalidateWeakPtrs();
qinmin 2017/03/02 00:12:23 ditto: reset timer here?
xingliu 2017/03/02 01:18:35 Done, RecordFileBandwidth and update_timer_ is re
+ std::unique_ptr<crypto::SecureHash> hash_state = file_.Finish();
+ update_timer_.reset();
+ BrowserThread::PostTask(
+ BrowserThread::UI, FROM_HERE,
+ base::Bind(&DownloadDestinationObserver::DestinationCompleted,
+ observer_, TotalBytesReceived(),
+ base::Passed(&hash_state)));
+ }
}
if (net_log_.IsCapturing()) {
net_log_.AddEvent(net::NetLogEventType::DOWNLOAD_STREAM_DRAINED,
@@ -344,14 +416,37 @@ void DownloadFileImpl::StreamActive() {
}
}
+void DownloadFileImpl::RegisterAndActivateStream(SourceStream* source_stream) {
+ DCHECK_CURRENTLY_ON(BrowserThread::FILE);
+ ByteStreamReader* stream_reader = source_stream->stream_reader();
+ if (stream_reader) {
+ source_stream->stream_reader()->RegisterCallback(
qinmin 2017/03/02 00:12:23 just use stream_reader
xingliu 2017/03/02 01:18:35 Done.
+ base::Bind(&DownloadFileImpl::StreamActive, weak_factory_.GetWeakPtr(),
+ source_stream));
+ StreamActive(source_stream);
+ }
+}
+
+int64_t DownloadFileImpl::TotalBytesReceived() const {
+ // TODO(xingliu): Use slice info to figure out total bytes received.
+ return file_.bytes_so_far();
+}
+
void DownloadFileImpl::SendUpdate() {
+ // TODO(xingliu): Update slice info to observer to update history db.
BrowserThread::PostTask(
- BrowserThread::UI,
- FROM_HERE,
- base::Bind(&DownloadDestinationObserver::DestinationUpdate,
- observer_,
- file_.bytes_so_far(),
- rate_estimator_.GetCountPerSecond()));
+ BrowserThread::UI, FROM_HERE,
+ base::Bind(&DownloadDestinationObserver::DestinationUpdate, observer_,
+ TotalBytesReceived(), rate_estimator_.GetCountPerSecond()));
+}
+
+void DownloadFileImpl::WillWriteToDisk(size_t data_len) {
+ if (!update_timer_->IsRunning()) {
+ update_timer_->Start(FROM_HERE,
+ base::TimeDelta::FromMilliseconds(kUpdatePeriodMs),
+ this, &DownloadFileImpl::SendUpdate);
+ }
+ rate_estimator_.Increment(data_len);
}
DownloadFileImpl::RenameParameters::RenameParameters(
« no previous file with comments | « content/browser/download/download_file_impl.h ('k') | content/browser/download/download_file_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698