Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(534)

Unified Diff: storage/browser/blob/blob_registry_impl.cc

Issue 2892953006: WIP POC blob transport over mojo
Patch Set: pass mojo blobs over ipc Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « storage/browser/blob/blob_registry_impl.h ('k') | third_party/WebKit/Source/platform/BUILD.gn » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: storage/browser/blob/blob_registry_impl.cc
diff --git a/storage/browser/blob/blob_registry_impl.cc b/storage/browser/blob/blob_registry_impl.cc
new file mode 100644
index 0000000000000000000000000000000000000000..c29a674313a45f3af38298a1dfcb429ad10a0b16
--- /dev/null
+++ b/storage/browser/blob/blob_registry_impl.cc
@@ -0,0 +1,377 @@
+// Copyright 2017 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "storage/browser/blob/blob_registry_impl.h"
+
+#include "storage/browser/blob/blob_data_builder.h"
+#include "storage/browser/blob/blob_impl.h"
+#include "storage/browser/blob/blob_storage_context.h"
+
+namespace storage {
+
+namespace {
+
+using MemoryStrategy = BlobMemoryController::Strategy;
+
+bool CalculateBlobMemorySize(const std::vector<mojom::DataElementPtr>& elements,
+ size_t* shortcut_bytes,
+ uint64_t* total_bytes) {
+ DCHECK(shortcut_bytes);
+ DCHECK(total_bytes);
+
+ base::CheckedNumeric<uint64_t> total_size_checked = 0;
+ base::CheckedNumeric<size_t> shortcut_size_checked = 0;
+ for (const auto& e : elements) {
+ if (e->is_bytes()) {
+ total_size_checked += e->get_bytes().size();
+ shortcut_size_checked += e->get_bytes().size();
+ } else if (e->is_large_bytes()) {
+ total_size_checked += e->get_large_bytes()->length;
+ } else {
+ continue;
+ }
+ if (!total_size_checked.IsValid() || !shortcut_size_checked.IsValid())
+ return false;
+ }
+ *shortcut_bytes = shortcut_size_checked.ValueOrDie();
+ *total_bytes = total_size_checked.ValueOrDie();
+ return true;
+}
+
+class ByteStreamReceiver {
+ public:
+ ByteStreamReceiver(mojo::ScopedDataPipeConsumerHandle pipe,
+ BlobDataBuilder* builder,
+ size_t item_index,
+ size_t expected_length,
+ base::OnceCallback<void(bool success)> done_callback)
+ : builder_(builder),
+ item_index_(item_index),
+ expected_length_(expected_length),
+ pipe_(std::move(pipe)),
+ watcher_(FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC),
+ done_callback_(std::move(done_callback)) {
+ watcher_.Watch(
+ pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
+ base::Bind(&ByteStreamReceiver::OnReadable, base::Unretained(this)));
+ }
+
+ ~ByteStreamReceiver() {
+ std::move(done_callback_).Run(offset_ == expected_length_);
+ LOG(INFO) << "Read " << offset_ << " out of " << expected_length_
+ << " bytes";
+ }
+
+ void OnReadable(MojoResult result) {
+ if (result == MOJO_RESULT_CANCELLED ||
+ result == MOJO_RESULT_FAILED_PRECONDITION) {
+ LOG(INFO) << "Done reading, closing connection: " << result << " for "
+ << builder_->uuid();
+ delete this;
+ return;
+ }
+ DCHECK_EQ(result, MOJO_RESULT_OK);
+
+ // TODO(mek): Use two-phase reads to directly read into BlobDataBuilder
+ while (true) {
+ uint32_t num_bytes = 0;
+ MojoResult query_result = mojo::ReadDataRaw(
+ pipe_.get(), nullptr, &num_bytes, MOJO_READ_DATA_FLAG_QUERY);
+ if (query_result == MOJO_RESULT_SHOULD_WAIT)
+ break;
+ DCHECK_EQ(query_result, MOJO_RESULT_OK);
+
+ LOG(INFO) << "Have " << num_bytes << " to read";
+ std::vector<char> data(num_bytes);
+ query_result = mojo::ReadDataRaw(pipe_.get(), data.data(), &num_bytes,
+ MOJO_READ_DATA_FLAG_ALL_OR_NONE);
+ if (query_result == MOJO_RESULT_SHOULD_WAIT)
+ break;
+ DCHECK_EQ(query_result, MOJO_RESULT_OK);
+ bool result = builder_->PopulateFutureData(item_index_, data.data(),
+ offset_, num_bytes);
+ offset_ += num_bytes;
+ DCHECK(result);
+ }
+ }
+
+ private:
+ BlobDataBuilder* builder_;
+ size_t item_index_;
+ size_t expected_length_;
+ mojo::ScopedDataPipeConsumerHandle pipe_;
+ mojo::SimpleWatcher watcher_;
+ size_t offset_ = 0;
+ base::OnceCallback<void(bool success)> done_callback_;
+};
+
+} // namespace
+
+BlobRegistryImpl::BlobRegistryImpl(BlobStorageContext* context)
+ : context_(context), weak_ptr_factory_(this) {}
+
+BlobRegistryImpl::~BlobRegistryImpl() {
+ LOG(INFO) << "Destroying blob registry";
+}
+
+void BlobRegistryImpl::Bind(const service_manager::BindSourceInfo& source_info,
+ storage::mojom::BlobRegistryRequest request) {
+ bindings_.AddBinding(this, std::move(request));
+}
+
+void BlobRegistryImpl::Register(mojom::BlobRequest blob,
+ const std::string& uuid,
+ const std::string& content_type,
+ const std::string& content_disposition,
+ std::vector<mojom::DataElementPtr> elements,
+ RegisterCallback callback) {
+ RegisterWithBlobUUIDs(std::move(blob), uuid, content_type,
+ content_disposition, std::move(elements),
+ std::vector<std::string>(), std::move(callback), "");
+}
+
+void BlobRegistryImpl::RegisterWithBlobUUIDs(
+ mojom::BlobRequest blob,
+ const std::string& uuid,
+ const std::string& content_type,
+ const std::string& content_disposition,
+ std::vector<mojom::DataElementPtr> elements,
+ std::vector<std::string> blob_uuids,
+ RegisterCallback callback,
+ const std::string& next_blob) {
+ if (!next_blob.empty())
+ blob_uuids.push_back(next_blob);
+ while (blob_uuids.size() < elements.size() &&
+ !elements[blob_uuids.size()]->is_blob())
+ blob_uuids.push_back("");
+ if (blob_uuids.size() < elements.size()) {
+ // next element is a blob, request its UUID
+ elements[blob_uuids.size()]->get_blob()->blob->InternalGetUUID(
+ base::BindOnce(&BlobRegistryImpl::RegisterWithBlobUUIDs,
+ weak_ptr_factory_.GetWeakPtr(), std::move(blob), uuid,
+ content_type, content_disposition, std::move(elements),
+ std::move(blob_uuids), std::move(callback)));
+ return;
+ }
+
+ LOG(INFO) << "Register " << uuid << " contains " << elements.size()
+ << " elements";
+ if (uuid.empty() || context_->registry().HasEntry(uuid)) {
+ LOG(INFO) << "Invalid UUID";
+ std::move(callback).Run();
+ return;
+ }
+
+ // TODO(mek): can read files/filesystem checks
+
+ // TODO(mek): Validate that our referenced blobs aren't us
+
+ uint64_t transport_memory_size = 0;
+ size_t shortcut_size = 0;
+ if (!CalculateBlobMemorySize(elements, &shortcut_size,
+ &transport_memory_size)) {
+ LOG(INFO) << "Can't calculate size";
+ std::unique_ptr<BlobDataHandle> handle =
+ context_->AddBrokenBlob(uuid, content_type, content_disposition,
+ BlobStatus::ERR_INVALID_CONSTRUCTION_ARGUMENTS);
+ new BlobImpl(std::move(handle), std::move(blob));
+ std::move(callback).Run();
+ return;
+ }
+
+ const BlobMemoryController& memory_controller = context_->memory_controller();
+ MemoryStrategy memory_strategy =
+ memory_controller.DetermineStrategy(shortcut_size, transport_memory_size);
+
+ auto builderPtr = base::MakeUnique<BlobDataBuilder>(uuid);
+ BlobDataBuilder& builder = *builderPtr;
+ builder.set_content_type(content_type);
+ builder.set_content_disposition(content_disposition);
+ int idx = 0;
+ std::list<std::pair<size_t, mojom::BytesProviderPtr>> future_data;
+ std::list<size_t> file_sizes;
+ for (const auto& e : elements) {
+ if (e->is_bytes()) {
+ LOG(INFO) << " Bytes: " << e->get_bytes().size();
+ builder.AppendData(reinterpret_cast<const char*>(e->get_bytes().data()),
+ e->get_bytes().size());
+ } else if (e->is_large_bytes()) {
+ LOG(INFO) << " Future Bytes: " << e->get_large_bytes()->length;
+ if (memory_strategy == MemoryStrategy::FILE) {
+ future_data.push_back(std::make_pair(
+ builder.AppendFutureFile(0, e->get_large_bytes()->length,
+ future_data.size()),
+ std::move(e->get_large_bytes()->data)));
+ } else {
+ future_data.push_back(std::make_pair(
+ builder.AppendFutureData(e->get_large_bytes()->length),
+ std::move(e->get_large_bytes()->data)));
+ }
+ file_sizes.push_back(e->get_large_bytes()->length);
+ } else if (e->is_file()) {
+ LOG(INFO) << " File: " << e->get_file()->length << " "
+ << e->get_file()->path;
+ builder.AppendFile(base::FilePath::FromUTF8Unsafe(e->get_file()->path),
+ e->get_file()->offset, e->get_file()->length,
+ e->get_file()->expected_modification_time);
+ } else if (e->is_file_filesystem()) {
+ LOG(INFO) << " Filesystem: " << e->get_file_filesystem()->length;
+ builder.AppendFileSystemFile(
+ e->get_file_filesystem()->url, e->get_file_filesystem()->offset,
+ e->get_file_filesystem()->length,
+ e->get_file_filesystem()->expected_modification_time);
+ } else if (e->is_blob()) {
+ LOG(INFO) << " Blob: " << e->get_blob()->length;
+ std::string ref_uuid = blob_uuids[idx];
+ builder.AppendBlob(ref_uuid, e->get_blob()->offset,
+ e->get_blob()->length);
+ } else {
+ NOTREACHED();
+ }
+ idx++;
+ }
+
+ switch (memory_strategy) {
+ case MemoryStrategy::TOO_LARGE: {
+ LOG(INFO) << "Blob too large";
+ std::unique_ptr<BlobDataHandle> handle =
+ context_->AddBrokenBlob(uuid, content_type, content_disposition,
+ BlobStatus::ERR_OUT_OF_MEMORY);
+ new BlobImpl(std::move(handle), std::move(blob));
+ std::move(callback).Run();
+ return;
+ }
+ case MemoryStrategy::NONE_NEEDED: {
+ DCHECK(future_data.empty());
+ std::unique_ptr<BlobDataHandle> handle = context_->BuildBlob(
+ builder, BlobStorageContext::TransportAllowedCallback());
+ new BlobImpl(std::move(handle), std::move(blob));
+ std::move(callback).Run();
+ return;
+ }
+ case MemoryStrategy::IPC:
+ // No separate IPC strategy in mojo for now.
+ case MemoryStrategy::SHARED_MEMORY: {
+ // DCHECK(!future_data.empty());
+ LOG(INFO) << " Future as stream";
+ std::unique_ptr<BlobDataHandle> handle = context_->BuildBlob(
+ builder, base::Bind(&BlobRegistryImpl::OnReadyForDataStreams,
+ weak_ptr_factory_.GetWeakPtr(), uuid,
+ base::Passed(std::move(builderPtr)),
+ base::Passed(std::move(future_data)),
+ base::Passed(std::move(file_sizes))));
+ new BlobImpl(std::move(handle), std::move(blob));
+ std::move(callback).Run();
+ return;
+ }
+ case MemoryStrategy::FILE: {
+ // DCHECK(!future_data.empty());
+ LOG(INFO) << " Future as files";
+ std::unique_ptr<BlobDataHandle> handle = context_->BuildBlob(
+ builder, base::Bind(&BlobRegistryImpl::OnReadyForFileTransport,
+ weak_ptr_factory_.GetWeakPtr(), uuid,
+ base::Passed(std::move(builderPtr)),
+ base::Passed(std::move(future_data)),
+ base::Passed(std::move(file_sizes))));
+ new BlobImpl(std::move(handle), std::move(blob));
+ std::move(callback).Run();
+ return;
+ }
+ }
+ NOTREACHED();
+}
+
+void BlobRegistryImpl::DeprecatedGetBlob(const std::string& uuid,
+ mojom::BlobRequest blob) {
+ LOG(INFO) << "Get " << uuid;
+ new BlobImpl(context_->GetBlobDataFromUUID(uuid), std::move(blob));
+}
+
+void BlobRegistryImpl::OnReadyForDataStreams(
+ const std::string& uuid,
+ std::unique_ptr<BlobDataBuilder> builder,
+ std::list<std::pair<size_t, mojom::BytesProviderPtr>> future_data,
+ std::list<size_t> file_sizes,
+ BlobStatus status,
+ std::vector<BlobMemoryController::FileCreationInfo> file_info) {
+ if (future_data.empty()) {
+ OnDataStreamDone(uuid);
+ return;
+ }
+
+ LOG(INFO) << "Ready for datastreams: " << int(status) << " for " << uuid;
+ size_t index = future_data.begin()->first;
+ size_t size = *file_sizes.begin();
+ mojom::BytesProviderPtr ptr = std::move(future_data.begin()->second);
+ future_data.pop_front();
+ file_sizes.pop_front();
+ mojo::DataPipe pipe;
+ ptr->RequestAsStream(std::move(pipe.producer_handle));
+ BlobDataBuilder* b = builder.get();
+ new ByteStreamReceiver(
+ std::move(pipe.consumer_handle), b, index, size,
+ base::BindOnce(
+ &BlobRegistryImpl::OnReadDatastream, weak_ptr_factory_.GetWeakPtr(),
+ uuid,
+ base::BindOnce(&BlobRegistryImpl::OnReadyForDataStreams,
+ weak_ptr_factory_.GetWeakPtr(), uuid,
+ std::move(builder), std::move(future_data),
+ std::move(file_sizes), status, std::move(file_info))));
+}
+
+void BlobRegistryImpl::OnReadDatastream(const std::string& uuid,
+ base::OnceClosure next_read_callback,
+ bool success) {
+ if (!success) {
+ context_->CancelBuildingBlob(uuid, BlobStatus::ERR_SOURCE_DIED_IN_TRANSIT);
+ return;
+ }
+ std::move(next_read_callback).Run();
+}
+
+void BlobRegistryImpl::OnReadyForFileTransport(
+ const std::string& uuid,
+ std::unique_ptr<BlobDataBuilder> builder,
+ std::list<std::pair<size_t, mojom::BytesProviderPtr>> future_data,
+ std::list<size_t> file_sizes,
+ BlobStatus status,
+ std::vector<BlobMemoryController::FileCreationInfo> file_info) {
+ LOG(INFO) << "Ready for files for " << uuid << " (" << int(status) << ")";
+ LOG(INFO) << " Files: " << file_info.size()
+ << ", data blocks: " << future_data.size();
+ BlobDataBuilder* b = builder.release();
+ auto size_it = file_sizes.begin();
+ auto info_it = file_info.begin();
+ for (auto& p : future_data) {
+ LOG(INFO) << " Requesting data as file";
+ mojom::BytesProviderPtr ptr = std::move(p.second);
+ ptr->RequestAsFile(
+ 0, *size_it, std::move(info_it->file), 0,
+ base::BindOnce(&BlobRegistryImpl::OnFileDone,
+ weak_ptr_factory_.GetWeakPtr(), uuid, b, p.first,
+ std::move(ptr), info_it->file_reference));
+ ++size_it;
+ ++info_it;
+ }
+}
+
+void BlobRegistryImpl::OnDataStreamDone(const std::string& uuid) {
+ if (context_->registry().HasEntry(uuid))
+ context_->NotifyTransportComplete(uuid);
+}
+
+void BlobRegistryImpl::OnFileDone(
+ const std::string& uuid,
+ BlobDataBuilder* builder,
+ size_t item_index,
+ mojom::BytesProviderPtr ptr,
+ const scoped_refptr<ShareableFileReference>& file_reference,
+ base::Optional<base::Time> time_file_modified) {
+ LOG(INFO) << "File done for " << uuid << " " << *time_file_modified;
+ builder->PopulateFutureFile(item_index, file_reference, *time_file_modified);
+ if (context_->registry().HasEntry(uuid))
+ context_->NotifyTransportComplete(uuid);
+}
+
+} // namespace storage
« no previous file with comments | « storage/browser/blob/blob_registry_impl.h ('k') | third_party/WebKit/Source/platform/BUILD.gn » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698