Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(584)

Unified Diff: third_party/WebKit/Source/platform/blob/BlobBytesProvider.cpp

Issue 2892953006: WIP POC blob transport over mojo
Patch Set: pass mojo blobs over ipc Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/WebKit/Source/platform/blob/BlobBytesProvider.cpp
diff --git a/third_party/WebKit/Source/platform/blob/BlobBytesProvider.cpp b/third_party/WebKit/Source/platform/blob/BlobBytesProvider.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..9cebec56988606c3d70d5af632a817457fda3502
--- /dev/null
+++ b/third_party/WebKit/Source/platform/blob/BlobBytesProvider.cpp
@@ -0,0 +1,162 @@
+// Copyright 2017 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "platform/blob/BlobBytesProvider.h"
+
+#include "platform/wtf/Functional.h"
+#include "public/platform/Platform.h"
+
+namespace blink {
+
+namespace {
+
+class BlobBytesStreamer {
+ public:
+ BlobBytesStreamer(Vector<RefPtr<RawData>> data,
+ mojo::ScopedDataPipeProducerHandle pipe)
+ : data_(std::move(data)),
+ pipe_(std::move(pipe)),
+ watcher_(FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::AUTOMATIC) {
+ watcher_.Watch(pipe_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
+ ConvertToBaseCallback(WTF::Bind(
+ &BlobBytesStreamer::OnWritable, WTF::Unretained(this))));
+ }
+
+ void OnWritable(MojoResult result) {
+ if (result == MOJO_RESULT_CANCELLED ||
+ result == MOJO_RESULT_FAILED_PRECONDITION) {
+ LOG(INFO) << "Deleting, current item: " << current_item_
+ << ", current pos: " << current_item_offset_;
+ delete this;
+ return;
+ }
+ DCHECK_EQ(result, MOJO_RESULT_OK);
+
+ while (true) {
+ uint32_t num_bytes =
+ data_[current_item_]->length() - current_item_offset_;
+ MojoResult write_result = mojo::WriteDataRaw(
+ pipe_.get(), data_[current_item_]->data() + current_item_offset_,
+ &num_bytes, MOJO_WRITE_DATA_FLAG_NONE);
+ if (write_result == MOJO_RESULT_OK) {
+ LOG(INFO) << "Wrote " << num_bytes;
+ current_item_offset_ += num_bytes;
+ if (current_item_offset_ >= data_[current_item_]->length()) {
+ current_item_++;
+ if (current_item_ >= int(data_.size())) {
+ // Send all items completely, done.
+ LOG(INFO) << "Done writing, deleting self";
+ delete this;
+ return;
+ }
+ }
+ } else if (write_result == MOJO_RESULT_SHOULD_WAIT) {
+ break;
+ } else {
+ // Something went wrong.
+ delete this;
+ return;
+ }
+ }
+ }
+
+ private:
+ int current_item_ = 0;
+ size_t current_item_offset_ = 0;
+ Vector<RefPtr<RawData>> data_;
+
+ mojo::ScopedDataPipeProducerHandle pipe_;
+ mojo::SimpleWatcher watcher_;
+};
+
+Time TimeFromDouble(double dt) {
+ if (dt == 0 || std::isnan(dt))
+ return Time();
+ return Time::FromSeconds(base::Time::kTimeTToMicrosecondsOffset / 1000000 +
+ dt);
+}
+
+} // namespace
+
+BlobBytesProvider::BlobBytesProvider(RefPtr<RawData> data) {
+ Platform::Current()->IncProcessRefCount();
+ data_.push_back(std::move(data));
+}
+
+BlobBytesProvider::~BlobBytesProvider() {
+ Platform::Current()->DecProcessRefCount();
+ LOG(INFO) << "Destroying bytes provider for " << uuid;
+}
+
+void BlobBytesProvider::AppendData(RefPtr<RawData> data) {
+ data_.push_back(std::move(data));
+}
+
+void BlobBytesProvider::RequestAsStream(
+ mojo::ScopedDataPipeProducerHandle pipe) {
+ LOG(INFO) << "Data being requested as stream for " << uuid;
+ // Will self delete when done.
+ new BlobBytesStreamer(data_, std::move(pipe));
+}
+
+void BlobBytesProvider::RequestAsFile(uint64_t source_offset,
+ uint64_t source_size,
+ base::File file,
+ uint64_t file_offset,
+ RequestAsFileCallback callback) {
+ LOG(INFO) << "Data being requested as files" << uuid;
+ // TODO(mek): Do on correct thread
+ if (!file.IsValid()) {
+ std::move(callback).Run(WTF::nullopt);
+ return;
+ }
+
+ int64_t seek_distance =
+ file.Seek(base::File::FROM_BEGIN, SafeCast<int64_t>(file_offset));
+ bool seek_failed = seek_distance < 0;
+ // histogram Storage.Blob.RendererFileSeekFailed
+ if (seek_failed) {
+ std::move(callback).Run(WTF::nullopt);
+ return;
+ }
+
+ // TODO(mek): More efficient way to find beginning.
+ uint64_t offset = 0;
+ for (const RefPtr<RawData>& data : data_) {
+ if (offset + data->length() > source_offset) {
+ uint64_t data_offset = source_offset - offset;
+ uint64_t data_size = std::min(data->length() - data_offset,
+ source_size - offset - source_offset);
+ size_t written = 0;
+ while (written < data_size) {
+ size_t writing_size = data_size - written;
+ int actual_written =
+ file.WriteAtCurrentPos(data->data() + data_offset + written,
+ static_cast<int>(writing_size));
+ bool write_failed = actual_written < 0;
+ // histogram Storage.Blob.RendererFileWriteFailed
+ if (write_failed) {
+ std::move(callback).Run(WTF::nullopt);
+ return;
+ }
+ written += actual_written;
+ }
+ }
+ offset += data->length();
+ }
+
+ if (!file.Flush()) {
+ std::move(callback).Run(WTF::nullopt);
+ return;
+ }
+ base::File::Info info;
+ if (!file.GetInfo(&info)) {
+ std::move(callback).Run(WTF::nullopt);
+ return;
+ }
+ LOG(INFO) << "About to call final callback " << info.last_modified;
+ std::move(callback).Run(TimeFromDouble(info.last_modified.ToDoubleT()));
+}
+
+} // namespace blink
« no previous file with comments | « third_party/WebKit/Source/platform/blob/BlobBytesProvider.h ('k') | third_party/WebKit/Source/platform/blob/BlobData.h » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698