Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(38)

Unified Diff: content/browser/download/download_file_impl.cc

Issue 2712713007: Make DownloadFileImpl handle multiple byte streams. (Closed)
Patch Set: Remove the AppendDataToFile call, fix the browsertest. Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « content/browser/download/download_file_impl.h ('k') | content/browser/download/download_file_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: content/browser/download/download_file_impl.cc
diff --git a/content/browser/download/download_file_impl.cc b/content/browser/download/download_file_impl.cc
index 5d9ac7997b08e668e65b4c863a14029ee510afbe..de810fed95897b341fd4743dc3c06663e33c3d31 100644
--- a/content/browser/download/download_file_impl.cc
+++ b/content/browser/download/download_file_impl.cc
@@ -9,6 +9,7 @@
#include "base/bind.h"
#include "base/files/file_util.h"
+#include "base/memory/ptr_util.h"
#include "base/strings/stringprintf.h"
#include "base/time/time.h"
#include "base/values.h"
@@ -40,11 +41,26 @@ const int kInitialRenameRetryDelayMs = 200;
// Number of times a failing rename is retried before giving up.
const int kMaxRenameRetries = 3;
+DownloadFileImpl::SourceStream::SourceStream(int64_t offset, int64_t length)
+ : offset_(offset), length_(length), bytes_written_(0), finished_(false) {}
+
+DownloadFileImpl::SourceStream::~SourceStream() = default;
+
+void DownloadFileImpl::SourceStream::SetByteStream(
+ std::unique_ptr<ByteStreamReader> stream_reader) {
+ stream_reader_ = std::move(stream_reader);
+}
+
+void DownloadFileImpl::SourceStream::OnWriteBytesToDisk(int64_t bytes_write) {
+ bytes_written_ += bytes_write;
+}
+
DownloadFileImpl::DownloadFileImpl(
std::unique_ptr<DownloadSaveInfo> save_info,
const base::FilePath& default_download_directory,
- std::unique_ptr<ByteStreamReader> stream,
+ std::unique_ptr<ByteStreamReader> stream_reader,
const net::NetLogWithSource& download_item_net_log,
+ bool is_sparse_file,
base::WeakPtr<DownloadDestinationObserver> observer)
: net_log_(
net::NetLogWithSource::Make(download_item_net_log.net_log(),
@@ -52,10 +68,15 @@ DownloadFileImpl::DownloadFileImpl(
file_(net_log_),
save_info_(std::move(save_info)),
default_download_directory_(default_download_directory),
- stream_reader_(std::move(stream)),
+ is_sparse_file_(is_sparse_file),
bytes_seen_(0),
observer_(observer),
weak_factory_(this) {
+ source_streams_[save_info_->offset] = base::MakeUnique<SourceStream>(
+ save_info_->offset, DownloadSaveInfo::kLengthFullContent);
+ DCHECK(source_streams_.size() == static_cast<size_t>(1));
+ source_streams_[save_info_->offset]->SetByteStream(std::move(stream_reader));
+
download_item_net_log.AddEvent(
net::NetLogEventType::DOWNLOAD_FILE_CREATED,
net_log_.source().ToEventParametersCallback());
@@ -74,46 +95,50 @@ void DownloadFileImpl::Initialize(const InitializeCallback& callback) {
update_timer_.reset(new base::RepeatingTimer());
DownloadInterruptReason result =
- file_.Initialize(save_info_->file_path,
- default_download_directory_,
- std::move(save_info_->file),
- save_info_->offset,
+ file_.Initialize(save_info_->file_path, default_download_directory_,
+ std::move(save_info_->file), save_info_->offset,
save_info_->hash_of_partial_file,
- std::move(save_info_->hash_state),
- false);
+ std::move(save_info_->hash_state), is_sparse_file_);
if (result != DOWNLOAD_INTERRUPT_REASON_NONE) {
BrowserThread::PostTask(
BrowserThread::UI, FROM_HERE, base::Bind(callback, result));
return;
}
- stream_reader_->RegisterCallback(
- base::Bind(&DownloadFileImpl::StreamActive, weak_factory_.GetWeakPtr()));
-
download_start_ = base::TimeTicks::Now();
// Primarily to make reset to zero in restart visible to owner.
SendUpdate();
// Initial pull from the straw.
- StreamActive();
+ for (auto& source_stream : source_streams_)
+ RegisterAndActivateStream(source_stream.second.get());
BrowserThread::PostTask(
BrowserThread::UI, FROM_HERE, base::Bind(
callback, DOWNLOAD_INTERRUPT_REASON_NONE));
}
-DownloadInterruptReason DownloadFileImpl::AppendDataToFile(
- const char* data, size_t data_len) {
+void DownloadFileImpl::AddByteStream(
+ std::unique_ptr<ByteStreamReader> stream_reader,
+ int64_t offset) {
DCHECK_CURRENTLY_ON(BrowserThread::FILE);
- if (!update_timer_->IsRunning()) {
- update_timer_->Start(FROM_HERE,
- base::TimeDelta::FromMilliseconds(kUpdatePeriodMs),
- this, &DownloadFileImpl::SendUpdate);
- }
- rate_estimator_.Increment(data_len);
- return file_.AppendDataToFile(data, data_len);
+ // The |source_streams_| must have an existing entry for the stream reader.
+ auto current_source_stream = source_streams_.find(offset);
+ DCHECK(current_source_stream != source_streams_.end());
+ SourceStream* stream = current_source_stream->second.get();
+ stream->SetByteStream(std::move(stream_reader));
+
+ RegisterAndActivateStream(stream);
+}
+
+DownloadInterruptReason DownloadFileImpl::WriteDataToFile(int64_t offset,
+ const char* data,
+ size_t data_len) {
+ DCHECK_CURRENTLY_ON(BrowserThread::FILE);
+ WillWriteToDisk(data_len);
+ return file_.WriteDataToFile(offset, data, data_len);
}
void DownloadFileImpl::RenameAndUniquify(
@@ -213,7 +238,11 @@ void DownloadFileImpl::RenameWithRetryInternal(
SendUpdate();
// Null out callback so that we don't do any more stream processing.
- stream_reader_->RegisterCallback(base::Closure());
+ for (auto& stream : source_streams_) {
+ ByteStreamReader* stream_reader = stream.second->stream_reader();
+ if (stream_reader)
+ stream_reader->RegisterCallback(base::Closure());
+ }
new_path.clear();
}
@@ -240,13 +269,15 @@ bool DownloadFileImpl::InProgress() const {
return file_.in_progress();
}
-void DownloadFileImpl::StreamActive() {
+void DownloadFileImpl::StreamActive(SourceStream* source_stream) {
+ DCHECK(source_stream->stream_reader());
base::TimeTicks start(base::TimeTicks::Now());
base::TimeTicks now;
scoped_refptr<net::IOBuffer> incoming_data;
size_t incoming_data_size = 0;
size_t total_incoming_data_size = 0;
size_t num_buffers = 0;
+ bool should_terminate = false;
ByteStreamReader::StreamState state(ByteStreamReader::STREAM_EMPTY);
DownloadInterruptReason reason = DOWNLOAD_INTERRUPT_REASON_NONE;
base::TimeDelta delta(
@@ -254,7 +285,8 @@ void DownloadFileImpl::StreamActive() {
// Take care of any file local activity required.
do {
- state = stream_reader_->Read(&incoming_data, &incoming_data_size);
+ state = source_stream->stream_reader()->Read(&incoming_data,
+ &incoming_data_size);
switch (state) {
case ByteStreamReader::STREAM_EMPTY:
@@ -263,24 +295,30 @@ void DownloadFileImpl::StreamActive() {
{
++num_buffers;
base::TimeTicks write_start(base::TimeTicks::Now());
- reason = AppendDataToFile(
+ // Stop the stream if it writes more bytes than expected.
+ if (source_stream->length() != DownloadSaveInfo::kLengthFullContent &&
+ source_stream->bytes_written() +
+ static_cast<int64_t>(incoming_data_size) >=
+ source_stream->length()) {
+ should_terminate = true;
+ incoming_data_size =
+ source_stream->length() - source_stream->bytes_written();
+ }
+ reason = WriteDataToFile(
+ source_stream->offset() + source_stream->bytes_written(),
incoming_data.get()->data(), incoming_data_size);
disk_writes_time_ += (base::TimeTicks::Now() - write_start);
bytes_seen_ += incoming_data_size;
total_incoming_data_size += incoming_data_size;
+ if (reason == DOWNLOAD_INTERRUPT_REASON_NONE)
+ source_stream->OnWriteBytesToDisk(incoming_data_size);
}
break;
case ByteStreamReader::STREAM_COMPLETE:
{
reason = static_cast<DownloadInterruptReason>(
- stream_reader_->GetStatus());
+ source_stream->stream_reader()->GetStatus());
SendUpdate();
- base::TimeTicks close_start(base::TimeTicks::Now());
- base::TimeTicks now(base::TimeTicks::Now());
- disk_writes_time_ += (now - close_start);
- RecordFileBandwidth(
- bytes_seen_, disk_writes_time_, now - download_start_);
- update_timer_.reset();
}
break;
default:
@@ -289,16 +327,16 @@ void DownloadFileImpl::StreamActive() {
}
now = base::TimeTicks::Now();
} while (state == ByteStreamReader::STREAM_HAS_DATA &&
- reason == DOWNLOAD_INTERRUPT_REASON_NONE &&
- now - start <= delta);
+ reason == DOWNLOAD_INTERRUPT_REASON_NONE && now - start <= delta &&
+ !should_terminate);
// If we're stopping to yield the thread, post a task so we come back.
- if (state == ByteStreamReader::STREAM_HAS_DATA &&
- now - start > delta) {
+ if (state == ByteStreamReader::STREAM_HAS_DATA && now - start > delta &&
+ !should_terminate) {
BrowserThread::PostTask(
BrowserThread::FILE, FROM_HERE,
- base::Bind(&DownloadFileImpl::StreamActive,
- weak_factory_.GetWeakPtr()));
+ base::Bind(&DownloadFileImpl::StreamActive, weak_factory_.GetWeakPtr(),
+ source_stream));
}
if (total_incoming_data_size)
@@ -311,31 +349,43 @@ void DownloadFileImpl::StreamActive() {
// Error case for both upstream source and file write.
// Shut down processing and signal an error to our observer.
// Our observer will clean us up.
- stream_reader_->RegisterCallback(base::Closure());
+ source_stream->stream_reader()->RegisterCallback(base::Closure());
weak_factory_.InvalidateWeakPtrs();
SendUpdate(); // Make info up to date before error.
std::unique_ptr<crypto::SecureHash> hash_state = file_.Finish();
BrowserThread::PostTask(
- BrowserThread::UI,
- FROM_HERE,
- base::Bind(&DownloadDestinationObserver::DestinationError,
- observer_,
- reason,
- file_.bytes_so_far(),
- base::Passed(&hash_state)));
- } else if (state == ByteStreamReader::STREAM_COMPLETE) {
- // Signal successful completion and shut down processing.
- stream_reader_->RegisterCallback(base::Closure());
- weak_factory_.InvalidateWeakPtrs();
+ BrowserThread::UI, FROM_HERE,
+ base::Bind(&DownloadDestinationObserver::DestinationError, observer_,
+ reason, TotalBytesReceived(), base::Passed(&hash_state)));
+ } else if (state == ByteStreamReader::STREAM_COMPLETE || should_terminate) {
+ // Signal successful completion or termination of the current stream.
+ source_stream->stream_reader()->RegisterCallback(base::Closure());
+ source_stream->set_finished(true);
+
+ // Inform observers.
SendUpdate();
- std::unique_ptr<crypto::SecureHash> hash_state = file_.Finish();
- BrowserThread::PostTask(
- BrowserThread::UI,
- FROM_HERE,
- base::Bind(&DownloadDestinationObserver::DestinationCompleted,
- observer_,
- file_.bytes_so_far(),
- base::Passed(&hash_state)));
+
+ bool all_stream_complete = true;
+ for (auto& stream : source_streams_) {
+ if (!stream.second->is_finished()) {
+ all_stream_complete = false;
+ break;
+ }
+ }
+
+ // All the stream reader are completed, shut down file IO processing.
+ if (all_stream_complete) {
+ RecordFileBandwidth(bytes_seen_, disk_writes_time_,
+ base::TimeTicks::Now() - download_start_);
+ weak_factory_.InvalidateWeakPtrs();
+ std::unique_ptr<crypto::SecureHash> hash_state = file_.Finish();
+ update_timer_.reset();
+ BrowserThread::PostTask(
+ BrowserThread::UI, FROM_HERE,
+ base::Bind(&DownloadDestinationObserver::DestinationCompleted,
+ observer_, TotalBytesReceived(),
+ base::Passed(&hash_state)));
+ }
}
if (net_log_.IsCapturing()) {
net_log_.AddEvent(net::NetLogEventType::DOWNLOAD_STREAM_DRAINED,
@@ -344,17 +394,40 @@ void DownloadFileImpl::StreamActive() {
}
}
+void DownloadFileImpl::RegisterAndActivateStream(SourceStream* source_stream) {
+ DCHECK_CURRENTLY_ON(BrowserThread::FILE);
+ ByteStreamReader* stream_reader = source_stream->stream_reader();
+ if (stream_reader) {
+ stream_reader->RegisterCallback(base::Bind(&DownloadFileImpl::StreamActive,
+ weak_factory_.GetWeakPtr(),
+ source_stream));
+ StreamActive(source_stream);
+ }
+}
+
+int64_t DownloadFileImpl::TotalBytesReceived() const {
+ // TODO(xingliu): Use slice info to figure out total bytes received.
+ return file_.bytes_so_far();
+}
+
void DownloadFileImpl::SendUpdate() {
+ // TODO(xingliu): Update slice info to observer to update history db.
BrowserThread::PostTask(
- BrowserThread::UI,
- FROM_HERE,
- base::Bind(&DownloadDestinationObserver::DestinationUpdate,
- observer_,
- file_.bytes_so_far(),
- rate_estimator_.GetCountPerSecond(),
+ BrowserThread::UI, FROM_HERE,
+ base::Bind(&DownloadDestinationObserver::DestinationUpdate, observer_,
+ TotalBytesReceived(), rate_estimator_.GetCountPerSecond(),
std::vector<DownloadItem::ReceivedSlice>()));
}
+void DownloadFileImpl::WillWriteToDisk(size_t data_len) {
+ if (!update_timer_->IsRunning()) {
+ update_timer_->Start(FROM_HERE,
+ base::TimeDelta::FromMilliseconds(kUpdatePeriodMs),
+ this, &DownloadFileImpl::SendUpdate);
+ }
+ rate_estimator_.Increment(data_len);
+}
+
DownloadFileImpl::RenameParameters::RenameParameters(
RenameOption option,
const base::FilePath& new_path,
« no previous file with comments | « content/browser/download/download_file_impl.h ('k') | content/browser/download/download_file_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698