Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(209)

Unified Diff: components/leveldb/leveldb_mojo_proxy.cc

Issue 1839823002: mojo leveldb: Remove the created file thread. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: General patch cleanup. Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: components/leveldb/leveldb_mojo_proxy.cc
diff --git a/components/leveldb/leveldb_mojo_proxy.cc b/components/leveldb/leveldb_mojo_proxy.cc
new file mode 100644
index 0000000000000000000000000000000000000000..1e36f261577dfdb9783a56bcd001faba685defd6
--- /dev/null
+++ b/components/leveldb/leveldb_mojo_proxy.cc
@@ -0,0 +1,472 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "components/leveldb/leveldb_mojo_proxy.h"
+
+#include <set>
+
+#include "base/bind.h"
+#include "mojo/message_pump/message_pump_mojo.h"
+#include "mojo/platform_handle/platform_handle_functions.h"
+#include "mojo/public/cpp/bindings/interface_request.h"
+
+namespace leveldb {
+
+struct LevelDBMojoProxy::OpaqueLock {
+ filesystem::FilePtr lock_file;
+};
+
+struct LevelDBMojoProxy::OpaqueDir {
+ explicit OpaqueDir(
+ mojo::InterfacePtrInfo<filesystem::Directory> directory_info) {
+ directory.Bind(std::move(directory_info));
+ }
+
+ filesystem::DirectoryPtr directory;
+};
+
+LevelDBMojoProxy::LevelDBMojoProxy(
+ scoped_refptr<base::SingleThreadTaskRunner> task_runner)
+ : task_runner_(std::move(task_runner)), outstanding_opaque_dirs_(0) {}
+
+LevelDBMojoProxy::OpaqueDir* LevelDBMojoProxy::RegisterDirectory(
+ filesystem::DirectoryPtr directory) {
+ OpaqueDir* out_dir = nullptr;
+
+ if (task_runner_->BelongsToCurrentThread()) {
+ // If our task runner is the current thread, we switch to synchronous mode
+ // instead of posting tasks with WaitableEvents.
+ RegisterDirectoryImpl(directory.PassInterface(), nullptr, &out_dir);
+ } else {
+ // This proxies to the other thread, which proxies to mojo. Only on the
+ // reply from mojo do we return from this.
jam 2016/03/29 05:58:00 nit: no point in repeating this comment everywhere
+ base::WaitableEvent done_event(false, false);
+ task_runner_->PostTask(
+ FROM_HERE, base::Bind(&LevelDBMojoProxy::RegisterDirectoryImpl, this,
+ base::Passed(directory.PassInterface()),
+ &done_event, &out_dir));
+ done_event.Wait();
+ }
+
+ return out_dir;
+}
+
+void LevelDBMojoProxy::UnregisterDirectory(OpaqueDir* dir) {
+ if (task_runner_->BelongsToCurrentThread()) {
+ UnregisterDirectoryImpl(dir, nullptr);
+ } else {
+ // This proxies to the other thread, which proxies to mojo. Only on the
+ // reply from mojo do we return from this.
+ base::WaitableEvent done_event(false, false);
+ task_runner_->PostTask(
+ FROM_HERE, base::Bind(&LevelDBMojoProxy::UnregisterDirectoryImpl, this,
+ dir, &done_event));
+ done_event.Wait();
jam 2016/03/29 05:58:00 since this code block is duplicated in all methods
Elliot Glaysher 2016/03/29 17:33:29 So how does the WaitableEvent block the current me
+ }
+}
+
+base::File LevelDBMojoProxy::OpenFileHandle(OpaqueDir* dir,
+ const std::string& name,
+ uint32_t open_flags) {
+ base::File file;
+
+ if (task_runner_->BelongsToCurrentThread()) {
+ OpenFileHandleImpl(dir, name, open_flags, nullptr, &file);
+ } else {
+ // This proxies to the other thread, which proxies to mojo. Only on the
+ // reply from mojo do we return from this.
+ base::WaitableEvent done_event(false, false);
+ task_runner_->PostTask(
+ FROM_HERE, base::Bind(&LevelDBMojoProxy::OpenFileHandleImpl, this, dir,
+ name, open_flags, &done_event, &file));
+ done_event.Wait();
+ }
+
+ return file;
+}
+
+filesystem::FileError LevelDBMojoProxy::SyncDirectory(OpaqueDir* dir,
+ const std::string& name) {
+ filesystem::FileError error = filesystem::FileError::FAILED;
+
+ if (task_runner_->BelongsToCurrentThread()) {
+ SyncDirectoryImpl(dir, name, nullptr, &error);
+ } else {
+ // This proxies to the other thread, which proxies to mojo. Only on the
+ // reply from mojo do we return from this.
+ base::WaitableEvent done_event(false, false);
+ task_runner_->PostTask(
+ FROM_HERE, base::Bind(&LevelDBMojoProxy::SyncDirectoryImpl, this, dir,
+ name, &done_event, &error));
+ done_event.Wait();
+ }
+
+ return error;
+}
+
+bool LevelDBMojoProxy::FileExists(OpaqueDir* dir, const std::string& name) {
+ bool exists = false;
+
+ if (task_runner_->BelongsToCurrentThread()) {
+ FileExistsImpl(dir, name, nullptr, &exists);
+ } else {
+ // This proxies to the other thread, which proxies to mojo. Only on the
+ // reply from mojo do we return from this.
+ base::WaitableEvent done_event(false, false);
+ task_runner_->PostTask(
+ FROM_HERE, base::Bind(&LevelDBMojoProxy::FileExistsImpl, this, dir,
+ name, &done_event, &exists));
+ done_event.Wait();
+ }
+
+ return exists;
+}
+
+filesystem::FileError LevelDBMojoProxy::GetChildren(
+ OpaqueDir* dir,
+ const std::string& path,
+ std::vector<std::string>* result) {
+ filesystem::FileError error = filesystem::FileError::FAILED;
+
+ if (task_runner_->BelongsToCurrentThread()) {
+ GetChildrenImpl(dir, path, result, nullptr, &error);
+ } else {
+ // This proxies to the other thread, which proxies to mojo. Only on the
+ // reply from mojo do we return from this.
+ base::WaitableEvent done_event(false, false);
+ task_runner_->PostTask(
+ FROM_HERE, base::Bind(&LevelDBMojoProxy::GetChildrenImpl, this, dir,
+ path, result, &done_event, &error));
+ done_event.Wait();
+ }
+
+ return error;
+}
+
+filesystem::FileError LevelDBMojoProxy::Delete(OpaqueDir* dir,
+ const std::string& path,
+ uint32_t delete_flags) {
+ filesystem::FileError error = filesystem::FileError::FAILED;
+
+ if (task_runner_->BelongsToCurrentThread()) {
+ DeleteImpl(dir, path, delete_flags, nullptr, &error);
+ } else {
+ // This proxies to the other thread, which proxies to mojo. Only on the
+ // reply from mojo do we return from this.
+ base::WaitableEvent done_event(false, false);
+ task_runner_->PostTask(
+ FROM_HERE, base::Bind(&LevelDBMojoProxy::DeleteImpl, this, dir, path,
+ delete_flags, &done_event, &error));
+ done_event.Wait();
+ }
+
+ return error;
+}
+
+filesystem::FileError LevelDBMojoProxy::CreateDir(OpaqueDir* dir,
+ const std::string& path) {
+ filesystem::FileError error = filesystem::FileError::FAILED;
+
+ if (task_runner_->BelongsToCurrentThread()) {
+ CreateDirImpl(dir, path, nullptr, &error);
+ } else {
+ // This proxies to the other thread, which proxies to mojo. Only on the
+ // reply from mojo do we return from this.
+ base::WaitableEvent done_event(false, false);
+ task_runner_->PostTask(
+ FROM_HERE, base::Bind(&LevelDBMojoProxy::CreateDirImpl, this, dir, path,
+ &done_event, &error));
+ done_event.Wait();
+ }
+
+ return error;
+}
+
+filesystem::FileError LevelDBMojoProxy::GetFileSize(OpaqueDir* dir,
+ const std::string& path,
+ uint64_t* file_size) {
+ filesystem::FileError error = filesystem::FileError::FAILED;
+
+ if (task_runner_->BelongsToCurrentThread()) {
+ GetFileSizeImpl(dir, path, file_size, nullptr, &error);
+ } else {
+ // This proxies to the other thread, which proxies to mojo. Only on the
+ // reply from mojo do we return from this.
+ base::WaitableEvent done_event(false, false);
+ task_runner_->PostTask(
+ FROM_HERE, base::Bind(&LevelDBMojoProxy::GetFileSizeImpl, this, dir,
+ path, file_size, &done_event, &error));
+ done_event.Wait();
+ }
+
+ return error;
+}
+
+filesystem::FileError LevelDBMojoProxy::RenameFile(
+ OpaqueDir* dir,
+ const std::string& old_path,
+ const std::string& new_path) {
+ filesystem::FileError error = filesystem::FileError::FAILED;
+
+ if (task_runner_->BelongsToCurrentThread()) {
+ RenameFileImpl(dir, old_path, new_path, nullptr, &error);
+ } else {
+ // This proxies to the other thread, which proxies to mojo. Only on the
+ // reply from mojo do we return from this.
+ base::WaitableEvent done_event(false, false);
+ task_runner_->PostTask(
+ FROM_HERE, base::Bind(&LevelDBMojoProxy::RenameFileImpl, this, dir,
+ old_path, new_path, &done_event, &error));
+ done_event.Wait();
+ }
+
+ return error;
+}
+
+std::pair<filesystem::FileError, LevelDBMojoProxy::OpaqueLock*>
+LevelDBMojoProxy::LockFile(OpaqueDir* dir, const std::string& path) {
+ filesystem::FileError error = filesystem::FileError::FAILED;
+ OpaqueLock* out_lock = nullptr;
+
+ if (task_runner_->BelongsToCurrentThread()) {
+ LockFileImpl(dir, path, nullptr, &error, &out_lock);
+ } else {
+ // This proxies to the other thread, which proxies to mojo. Only on the
+ // reply from mojo do we return from this.
+ base::WaitableEvent done_event(false, false);
+ task_runner_->PostTask(
+ FROM_HERE, base::Bind(&LevelDBMojoProxy::LockFileImpl, this, dir, path,
+ &done_event, &error, &out_lock));
+ done_event.Wait();
+ }
+
+ return std::make_pair(error, out_lock);
+}
+
+filesystem::FileError LevelDBMojoProxy::UnlockFile(OpaqueLock* lock) {
+ // Take ownership of the incoming lock so it gets destroyed whatever happens.
+ scoped_ptr<OpaqueLock> scoped_lock(lock);
+ filesystem::FileError error = filesystem::FileError::FAILED;
+
+ if (task_runner_->BelongsToCurrentThread()) {
+ UnlockFileImpl(std::move(scoped_lock), nullptr, &error);
+ } else {
+ // This proxies to the other thread, which proxies to mojo. Only on the
+ // reply from mojo do we return from this.
+ base::WaitableEvent done_event(false, false);
+ task_runner_->PostTask(
+ FROM_HERE, base::Bind(&LevelDBMojoProxy::UnlockFileImpl, this,
+ base::Passed(&scoped_lock), &done_event, &error));
+ done_event.Wait();
+ }
+
+ return error;
+}
+
+LevelDBMojoProxy::~LevelDBMojoProxy() {
+ DCHECK_EQ(0, outstanding_opaque_dirs_);
+}
+
+void LevelDBMojoProxy::SignalIfNeeded(base::WaitableEvent* done_event) {
+ if (done_event)
+ done_event->Signal();
+}
+
+void LevelDBMojoProxy::RegisterDirectoryImpl(
+ mojo::InterfacePtrInfo<filesystem::Directory> directory_info,
+ base::WaitableEvent* done_event,
+ OpaqueDir** out_dir) {
+ // Take the Directory pipe and bind it on this thread.
+ *out_dir = new OpaqueDir(std::move(directory_info));
+ outstanding_opaque_dirs_++;
+ SignalIfNeeded(done_event);
+}
+
+void LevelDBMojoProxy::UnregisterDirectoryImpl(
+ OpaqueDir* dir,
+ base::WaitableEvent* done_event) {
+ // Only delete the directories on the thread that owns them.
+ delete dir;
+ outstanding_opaque_dirs_--;
+ SignalIfNeeded(done_event);
+}
+
+void LevelDBMojoProxy::OpenFileHandleImpl(OpaqueDir* dir,
+ std::string name,
+ uint32_t open_flags,
+ base::WaitableEvent* done_event,
+ base::File* output_file) {
+ mojo::ScopedHandle handle;
+ filesystem::FileError error = filesystem::FileError::FAILED;
+ bool completed = dir->directory->OpenFileHandle(mojo::String::From(name),
+ open_flags, &error, &handle);
+ DCHECK(completed);
+
+ if (error != filesystem::FileError::OK) {
+ *output_file = base::File(static_cast<base::File::Error>(error));
+ } else {
+ MojoPlatformHandle platform_handle;
+ MojoResult extract_result =
+ MojoExtractPlatformHandle(handle.release().value(), &platform_handle);
+
+ if (extract_result == MOJO_RESULT_OK) {
+ *output_file = base::File(platform_handle);
+ } else {
+ NOTREACHED();
+ *output_file = base::File(base::File::Error::FILE_ERROR_FAILED);
+ }
+ }
+
+ SignalIfNeeded(done_event);
+}
+
+void LevelDBMojoProxy::SyncDirectoryImpl(OpaqueDir* dir,
+ std::string name,
+ base::WaitableEvent* done_event,
+ filesystem::FileError* out_error) {
+ filesystem::DirectoryPtr target;
+ bool completed = dir->directory->OpenDirectory(
+ name, GetProxy(&target), filesystem::kFlagRead | filesystem::kFlagWrite,
+ out_error);
+ DCHECK(completed);
+
+ if (*out_error != filesystem::FileError::OK) {
+ SignalIfNeeded(done_event);
+ return;
+ }
+
+ completed = target->Flush(out_error);
+ DCHECK(completed);
+ SignalIfNeeded(done_event);
+}
+
+void LevelDBMojoProxy::FileExistsImpl(OpaqueDir* dir,
+ std::string name,
+ base::WaitableEvent* done_event,
+ bool* exists) {
+ filesystem::FileError error = filesystem::FileError::FAILED;
+ bool completed =
+ dir->directory->Exists(mojo::String::From(name), &error, exists);
+ DCHECK(completed);
+ SignalIfNeeded(done_event);
+}
+
+void LevelDBMojoProxy::GetChildrenImpl(OpaqueDir* dir,
+ std::string name,
+ std::vector<std::string>* out_contents,
+ base::WaitableEvent* done_event,
+ filesystem::FileError* out_error) {
+ // Step one: open the directory |name| from the toplevel directory.
+ filesystem::DirectoryPtr target;
+ filesystem::DirectoryRequest proxy = GetProxy(&target);
+ bool completed = dir->directory->OpenDirectory(
+ name, std::move(proxy), filesystem::kFlagRead | filesystem::kFlagWrite,
+ out_error);
+ DCHECK(completed);
+
+ if (*out_error != filesystem::FileError::OK) {
+ SignalIfNeeded(done_event);
+ return;
+ }
+
+ mojo::Array<filesystem::DirectoryEntryPtr> directory_contents;
+ completed = target->Read(out_error, &directory_contents);
+ DCHECK(completed);
+
+ if (!directory_contents.is_null()) {
+ for (size_t i = 0; i < directory_contents.size(); ++i)
+ out_contents->push_back(directory_contents[i]->name.To<std::string>());
+ }
+
+ SignalIfNeeded(done_event);
+}
+
+void LevelDBMojoProxy::DeleteImpl(OpaqueDir* dir,
+ std::string name,
+ uint32_t delete_flags,
+ base::WaitableEvent* done_event,
+ filesystem::FileError* out_error) {
+ bool completed =
+ dir->directory->Delete(mojo::String::From(name), delete_flags, out_error);
+ DCHECK(completed);
+ SignalIfNeeded(done_event);
+}
+
+void LevelDBMojoProxy::CreateDirImpl(OpaqueDir* dir,
+ std::string name,
+ base::WaitableEvent* done_event,
+ filesystem::FileError* out_error) {
+ bool completed = dir->directory->OpenDirectory(
+ name, nullptr,
+ filesystem::kFlagRead | filesystem::kFlagWrite | filesystem::kFlagCreate,
+ out_error);
+ DCHECK(completed);
+ SignalIfNeeded(done_event);
+}
+
+void LevelDBMojoProxy::GetFileSizeImpl(OpaqueDir* dir,
+ const std::string& path,
+ uint64_t* file_size,
+ base::WaitableEvent* done_event,
+ filesystem::FileError* out_error) {
+ filesystem::FileInformationPtr info;
+ bool completed = dir->directory->StatFile(path, out_error, &info);
+ DCHECK(completed);
+ if (info)
+ *file_size = info->size;
+ SignalIfNeeded(done_event);
+}
+
+void LevelDBMojoProxy::RenameFileImpl(OpaqueDir* dir,
+ const std::string& old_path,
+ const std::string& new_path,
+ base::WaitableEvent* done_event,
+ filesystem::FileError* out_error) {
+ bool completed = dir->directory->Rename(
+ mojo::String::From(old_path), mojo::String::From(new_path), out_error);
+ DCHECK(completed);
+ SignalIfNeeded(done_event);
+}
+
+void LevelDBMojoProxy::LockFileImpl(OpaqueDir* dir,
+ const std::string& path,
+ base::WaitableEvent* done_event,
+ filesystem::FileError* out_error,
+ OpaqueLock** out_lock) {
+ // Since a lock is associated with a file descriptor, we need to open and
+ // have a persistent file on the other side of the connection.
+ filesystem::FilePtr target;
+ filesystem::FileRequest proxy = GetProxy(&target);
+ bool completed = dir->directory->OpenFile(
+ mojo::String::From(path), std::move(proxy),
+ filesystem::kFlagOpenAlways | filesystem::kFlagRead |
+ filesystem::kFlagWrite,
+ out_error);
+ DCHECK(completed);
+
+ if (*out_error != filesystem::FileError::OK) {
+ SignalIfNeeded(done_event);
+ return;
+ }
+
+ completed = target->Lock(out_error);
+ DCHECK(completed);
+
+ if (*out_error == filesystem::FileError::OK) {
+ OpaqueLock* l = new OpaqueLock;
+ l->lock_file = std::move(target);
+ *out_lock = l;
+ }
+
+ SignalIfNeeded(done_event);
+}
+
+void LevelDBMojoProxy::UnlockFileImpl(scoped_ptr<OpaqueLock> lock,
+ base::WaitableEvent* done_event,
+ filesystem::FileError* out_error) {
+ lock->lock_file->Unlock(out_error);
+ SignalIfNeeded(done_event);
+}
+
+} // namespace leveldb

Powered by Google App Engine
This is Rietveld 408576698