Index: components/leveldb/leveldb_mojo_proxy.cc |
diff --git a/components/leveldb/leveldb_mojo_proxy.cc b/components/leveldb/leveldb_mojo_proxy.cc |
new file mode 100644 |
index 0000000000000000000000000000000000000000..1e36f261577dfdb9783a56bcd001faba685defd6 |
--- /dev/null |
+++ b/components/leveldb/leveldb_mojo_proxy.cc |
@@ -0,0 +1,472 @@ |
+// Copyright 2016 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#include "components/leveldb/leveldb_mojo_proxy.h" |
+ |
+#include <set> |
+ |
+#include "base/bind.h" |
+#include "mojo/message_pump/message_pump_mojo.h" |
+#include "mojo/platform_handle/platform_handle_functions.h" |
+#include "mojo/public/cpp/bindings/interface_request.h" |
+ |
+namespace leveldb { |
+ |
+struct LevelDBMojoProxy::OpaqueLock { |
+ filesystem::FilePtr lock_file; |
+}; |
+ |
+struct LevelDBMojoProxy::OpaqueDir { |
+ explicit OpaqueDir( |
+ mojo::InterfacePtrInfo<filesystem::Directory> directory_info) { |
+ directory.Bind(std::move(directory_info)); |
+ } |
+ |
+ filesystem::DirectoryPtr directory; |
+}; |
+ |
+LevelDBMojoProxy::LevelDBMojoProxy( |
+ scoped_refptr<base::SingleThreadTaskRunner> task_runner) |
+ : task_runner_(std::move(task_runner)), outstanding_opaque_dirs_(0) {} |
+ |
+LevelDBMojoProxy::OpaqueDir* LevelDBMojoProxy::RegisterDirectory( |
+ filesystem::DirectoryPtr directory) { |
+ OpaqueDir* out_dir = nullptr; |
+ |
+ if (task_runner_->BelongsToCurrentThread()) { |
+ // If our task runner is the current thread, we switch to synchronous mode |
+ // instead of posting tasks with WaitableEvents. |
+ RegisterDirectoryImpl(directory.PassInterface(), nullptr, &out_dir); |
+ } else { |
+ // This proxies to the other thread, which proxies to mojo. Only on the |
+ // reply from mojo do we return from this. |
jam
2016/03/29 05:58:00
nit: no point in repeating this comment everywhere
|
+ base::WaitableEvent done_event(false, false); |
+ task_runner_->PostTask( |
+ FROM_HERE, base::Bind(&LevelDBMojoProxy::RegisterDirectoryImpl, this, |
+ base::Passed(directory.PassInterface()), |
+ &done_event, &out_dir)); |
+ done_event.Wait(); |
+ } |
+ |
+ return out_dir; |
+} |
+ |
+void LevelDBMojoProxy::UnregisterDirectory(OpaqueDir* dir) { |
+ if (task_runner_->BelongsToCurrentThread()) { |
+ UnregisterDirectoryImpl(dir, nullptr); |
+ } else { |
+ // This proxies to the other thread, which proxies to mojo. Only on the |
+ // reply from mojo do we return from this. |
+ base::WaitableEvent done_event(false, false); |
+ task_runner_->PostTask( |
+ FROM_HERE, base::Bind(&LevelDBMojoProxy::UnregisterDirectoryImpl, this, |
+ dir, &done_event)); |
+ done_event.Wait(); |
jam
2016/03/29 05:58:00
since this code block is duplicated in all methods
Elliot Glaysher
2016/03/29 17:33:29
So how does the WaitableEvent block the current me
|
+ } |
+} |
+ |
+base::File LevelDBMojoProxy::OpenFileHandle(OpaqueDir* dir, |
+ const std::string& name, |
+ uint32_t open_flags) { |
+ base::File file; |
+ |
+ if (task_runner_->BelongsToCurrentThread()) { |
+ OpenFileHandleImpl(dir, name, open_flags, nullptr, &file); |
+ } else { |
+ // This proxies to the other thread, which proxies to mojo. Only on the |
+ // reply from mojo do we return from this. |
+ base::WaitableEvent done_event(false, false); |
+ task_runner_->PostTask( |
+ FROM_HERE, base::Bind(&LevelDBMojoProxy::OpenFileHandleImpl, this, dir, |
+ name, open_flags, &done_event, &file)); |
+ done_event.Wait(); |
+ } |
+ |
+ return file; |
+} |
+ |
+filesystem::FileError LevelDBMojoProxy::SyncDirectory(OpaqueDir* dir, |
+ const std::string& name) { |
+ filesystem::FileError error = filesystem::FileError::FAILED; |
+ |
+ if (task_runner_->BelongsToCurrentThread()) { |
+ SyncDirectoryImpl(dir, name, nullptr, &error); |
+ } else { |
+ // This proxies to the other thread, which proxies to mojo. Only on the |
+ // reply from mojo do we return from this. |
+ base::WaitableEvent done_event(false, false); |
+ task_runner_->PostTask( |
+ FROM_HERE, base::Bind(&LevelDBMojoProxy::SyncDirectoryImpl, this, dir, |
+ name, &done_event, &error)); |
+ done_event.Wait(); |
+ } |
+ |
+ return error; |
+} |
+ |
+bool LevelDBMojoProxy::FileExists(OpaqueDir* dir, const std::string& name) { |
+ bool exists = false; |
+ |
+ if (task_runner_->BelongsToCurrentThread()) { |
+ FileExistsImpl(dir, name, nullptr, &exists); |
+ } else { |
+ // This proxies to the other thread, which proxies to mojo. Only on the |
+ // reply from mojo do we return from this. |
+ base::WaitableEvent done_event(false, false); |
+ task_runner_->PostTask( |
+ FROM_HERE, base::Bind(&LevelDBMojoProxy::FileExistsImpl, this, dir, |
+ name, &done_event, &exists)); |
+ done_event.Wait(); |
+ } |
+ |
+ return exists; |
+} |
+ |
+filesystem::FileError LevelDBMojoProxy::GetChildren( |
+ OpaqueDir* dir, |
+ const std::string& path, |
+ std::vector<std::string>* result) { |
+ filesystem::FileError error = filesystem::FileError::FAILED; |
+ |
+ if (task_runner_->BelongsToCurrentThread()) { |
+ GetChildrenImpl(dir, path, result, nullptr, &error); |
+ } else { |
+ // This proxies to the other thread, which proxies to mojo. Only on the |
+ // reply from mojo do we return from this. |
+ base::WaitableEvent done_event(false, false); |
+ task_runner_->PostTask( |
+ FROM_HERE, base::Bind(&LevelDBMojoProxy::GetChildrenImpl, this, dir, |
+ path, result, &done_event, &error)); |
+ done_event.Wait(); |
+ } |
+ |
+ return error; |
+} |
+ |
+filesystem::FileError LevelDBMojoProxy::Delete(OpaqueDir* dir, |
+ const std::string& path, |
+ uint32_t delete_flags) { |
+ filesystem::FileError error = filesystem::FileError::FAILED; |
+ |
+ if (task_runner_->BelongsToCurrentThread()) { |
+ DeleteImpl(dir, path, delete_flags, nullptr, &error); |
+ } else { |
+ // This proxies to the other thread, which proxies to mojo. Only on the |
+ // reply from mojo do we return from this. |
+ base::WaitableEvent done_event(false, false); |
+ task_runner_->PostTask( |
+ FROM_HERE, base::Bind(&LevelDBMojoProxy::DeleteImpl, this, dir, path, |
+ delete_flags, &done_event, &error)); |
+ done_event.Wait(); |
+ } |
+ |
+ return error; |
+} |
+ |
+filesystem::FileError LevelDBMojoProxy::CreateDir(OpaqueDir* dir, |
+ const std::string& path) { |
+ filesystem::FileError error = filesystem::FileError::FAILED; |
+ |
+ if (task_runner_->BelongsToCurrentThread()) { |
+ CreateDirImpl(dir, path, nullptr, &error); |
+ } else { |
+ // This proxies to the other thread, which proxies to mojo. Only on the |
+ // reply from mojo do we return from this. |
+ base::WaitableEvent done_event(false, false); |
+ task_runner_->PostTask( |
+ FROM_HERE, base::Bind(&LevelDBMojoProxy::CreateDirImpl, this, dir, path, |
+ &done_event, &error)); |
+ done_event.Wait(); |
+ } |
+ |
+ return error; |
+} |
+ |
+filesystem::FileError LevelDBMojoProxy::GetFileSize(OpaqueDir* dir, |
+ const std::string& path, |
+ uint64_t* file_size) { |
+ filesystem::FileError error = filesystem::FileError::FAILED; |
+ |
+ if (task_runner_->BelongsToCurrentThread()) { |
+ GetFileSizeImpl(dir, path, file_size, nullptr, &error); |
+ } else { |
+ // This proxies to the other thread, which proxies to mojo. Only on the |
+ // reply from mojo do we return from this. |
+ base::WaitableEvent done_event(false, false); |
+ task_runner_->PostTask( |
+ FROM_HERE, base::Bind(&LevelDBMojoProxy::GetFileSizeImpl, this, dir, |
+ path, file_size, &done_event, &error)); |
+ done_event.Wait(); |
+ } |
+ |
+ return error; |
+} |
+ |
+filesystem::FileError LevelDBMojoProxy::RenameFile( |
+ OpaqueDir* dir, |
+ const std::string& old_path, |
+ const std::string& new_path) { |
+ filesystem::FileError error = filesystem::FileError::FAILED; |
+ |
+ if (task_runner_->BelongsToCurrentThread()) { |
+ RenameFileImpl(dir, old_path, new_path, nullptr, &error); |
+ } else { |
+ // This proxies to the other thread, which proxies to mojo. Only on the |
+ // reply from mojo do we return from this. |
+ base::WaitableEvent done_event(false, false); |
+ task_runner_->PostTask( |
+ FROM_HERE, base::Bind(&LevelDBMojoProxy::RenameFileImpl, this, dir, |
+ old_path, new_path, &done_event, &error)); |
+ done_event.Wait(); |
+ } |
+ |
+ return error; |
+} |
+ |
+std::pair<filesystem::FileError, LevelDBMojoProxy::OpaqueLock*> |
+LevelDBMojoProxy::LockFile(OpaqueDir* dir, const std::string& path) { |
+ filesystem::FileError error = filesystem::FileError::FAILED; |
+ OpaqueLock* out_lock = nullptr; |
+ |
+ if (task_runner_->BelongsToCurrentThread()) { |
+ LockFileImpl(dir, path, nullptr, &error, &out_lock); |
+ } else { |
+ // This proxies to the other thread, which proxies to mojo. Only on the |
+ // reply from mojo do we return from this. |
+ base::WaitableEvent done_event(false, false); |
+ task_runner_->PostTask( |
+ FROM_HERE, base::Bind(&LevelDBMojoProxy::LockFileImpl, this, dir, path, |
+ &done_event, &error, &out_lock)); |
+ done_event.Wait(); |
+ } |
+ |
+ return std::make_pair(error, out_lock); |
+} |
+ |
+filesystem::FileError LevelDBMojoProxy::UnlockFile(OpaqueLock* lock) { |
+ // Take ownership of the incoming lock so it gets destroyed whatever happens. |
+ scoped_ptr<OpaqueLock> scoped_lock(lock); |
+ filesystem::FileError error = filesystem::FileError::FAILED; |
+ |
+ if (task_runner_->BelongsToCurrentThread()) { |
+ UnlockFileImpl(std::move(scoped_lock), nullptr, &error); |
+ } else { |
+ // This proxies to the other thread, which proxies to mojo. Only on the |
+ // reply from mojo do we return from this. |
+ base::WaitableEvent done_event(false, false); |
+ task_runner_->PostTask( |
+ FROM_HERE, base::Bind(&LevelDBMojoProxy::UnlockFileImpl, this, |
+ base::Passed(&scoped_lock), &done_event, &error)); |
+ done_event.Wait(); |
+ } |
+ |
+ return error; |
+} |
+ |
+LevelDBMojoProxy::~LevelDBMojoProxy() { |
+ DCHECK_EQ(0, outstanding_opaque_dirs_); |
+} |
+ |
+void LevelDBMojoProxy::SignalIfNeeded(base::WaitableEvent* done_event) { |
+ if (done_event) |
+ done_event->Signal(); |
+} |
+ |
+void LevelDBMojoProxy::RegisterDirectoryImpl( |
+ mojo::InterfacePtrInfo<filesystem::Directory> directory_info, |
+ base::WaitableEvent* done_event, |
+ OpaqueDir** out_dir) { |
+ // Take the Directory pipe and bind it on this thread. |
+ *out_dir = new OpaqueDir(std::move(directory_info)); |
+ outstanding_opaque_dirs_++; |
+ SignalIfNeeded(done_event); |
+} |
+ |
+void LevelDBMojoProxy::UnregisterDirectoryImpl( |
+ OpaqueDir* dir, |
+ base::WaitableEvent* done_event) { |
+ // Only delete the directories on the thread that owns them. |
+ delete dir; |
+ outstanding_opaque_dirs_--; |
+ SignalIfNeeded(done_event); |
+} |
+ |
+void LevelDBMojoProxy::OpenFileHandleImpl(OpaqueDir* dir, |
+ std::string name, |
+ uint32_t open_flags, |
+ base::WaitableEvent* done_event, |
+ base::File* output_file) { |
+ mojo::ScopedHandle handle; |
+ filesystem::FileError error = filesystem::FileError::FAILED; |
+ bool completed = dir->directory->OpenFileHandle(mojo::String::From(name), |
+ open_flags, &error, &handle); |
+ DCHECK(completed); |
+ |
+ if (error != filesystem::FileError::OK) { |
+ *output_file = base::File(static_cast<base::File::Error>(error)); |
+ } else { |
+ MojoPlatformHandle platform_handle; |
+ MojoResult extract_result = |
+ MojoExtractPlatformHandle(handle.release().value(), &platform_handle); |
+ |
+ if (extract_result == MOJO_RESULT_OK) { |
+ *output_file = base::File(platform_handle); |
+ } else { |
+ NOTREACHED(); |
+ *output_file = base::File(base::File::Error::FILE_ERROR_FAILED); |
+ } |
+ } |
+ |
+ SignalIfNeeded(done_event); |
+} |
+ |
+void LevelDBMojoProxy::SyncDirectoryImpl(OpaqueDir* dir, |
+ std::string name, |
+ base::WaitableEvent* done_event, |
+ filesystem::FileError* out_error) { |
+ filesystem::DirectoryPtr target; |
+ bool completed = dir->directory->OpenDirectory( |
+ name, GetProxy(&target), filesystem::kFlagRead | filesystem::kFlagWrite, |
+ out_error); |
+ DCHECK(completed); |
+ |
+ if (*out_error != filesystem::FileError::OK) { |
+ SignalIfNeeded(done_event); |
+ return; |
+ } |
+ |
+ completed = target->Flush(out_error); |
+ DCHECK(completed); |
+ SignalIfNeeded(done_event); |
+} |
+ |
+void LevelDBMojoProxy::FileExistsImpl(OpaqueDir* dir, |
+ std::string name, |
+ base::WaitableEvent* done_event, |
+ bool* exists) { |
+ filesystem::FileError error = filesystem::FileError::FAILED; |
+ bool completed = |
+ dir->directory->Exists(mojo::String::From(name), &error, exists); |
+ DCHECK(completed); |
+ SignalIfNeeded(done_event); |
+} |
+ |
+void LevelDBMojoProxy::GetChildrenImpl(OpaqueDir* dir, |
+ std::string name, |
+ std::vector<std::string>* out_contents, |
+ base::WaitableEvent* done_event, |
+ filesystem::FileError* out_error) { |
+ // Step one: open the directory |name| from the toplevel directory. |
+ filesystem::DirectoryPtr target; |
+ filesystem::DirectoryRequest proxy = GetProxy(&target); |
+ bool completed = dir->directory->OpenDirectory( |
+ name, std::move(proxy), filesystem::kFlagRead | filesystem::kFlagWrite, |
+ out_error); |
+ DCHECK(completed); |
+ |
+ if (*out_error != filesystem::FileError::OK) { |
+ SignalIfNeeded(done_event); |
+ return; |
+ } |
+ |
+ mojo::Array<filesystem::DirectoryEntryPtr> directory_contents; |
+ completed = target->Read(out_error, &directory_contents); |
+ DCHECK(completed); |
+ |
+ if (!directory_contents.is_null()) { |
+ for (size_t i = 0; i < directory_contents.size(); ++i) |
+ out_contents->push_back(directory_contents[i]->name.To<std::string>()); |
+ } |
+ |
+ SignalIfNeeded(done_event); |
+} |
+ |
+void LevelDBMojoProxy::DeleteImpl(OpaqueDir* dir, |
+ std::string name, |
+ uint32_t delete_flags, |
+ base::WaitableEvent* done_event, |
+ filesystem::FileError* out_error) { |
+ bool completed = |
+ dir->directory->Delete(mojo::String::From(name), delete_flags, out_error); |
+ DCHECK(completed); |
+ SignalIfNeeded(done_event); |
+} |
+ |
+void LevelDBMojoProxy::CreateDirImpl(OpaqueDir* dir, |
+ std::string name, |
+ base::WaitableEvent* done_event, |
+ filesystem::FileError* out_error) { |
+ bool completed = dir->directory->OpenDirectory( |
+ name, nullptr, |
+ filesystem::kFlagRead | filesystem::kFlagWrite | filesystem::kFlagCreate, |
+ out_error); |
+ DCHECK(completed); |
+ SignalIfNeeded(done_event); |
+} |
+ |
+void LevelDBMojoProxy::GetFileSizeImpl(OpaqueDir* dir, |
+ const std::string& path, |
+ uint64_t* file_size, |
+ base::WaitableEvent* done_event, |
+ filesystem::FileError* out_error) { |
+ filesystem::FileInformationPtr info; |
+ bool completed = dir->directory->StatFile(path, out_error, &info); |
+ DCHECK(completed); |
+ if (info) |
+ *file_size = info->size; |
+ SignalIfNeeded(done_event); |
+} |
+ |
+void LevelDBMojoProxy::RenameFileImpl(OpaqueDir* dir, |
+ const std::string& old_path, |
+ const std::string& new_path, |
+ base::WaitableEvent* done_event, |
+ filesystem::FileError* out_error) { |
+ bool completed = dir->directory->Rename( |
+ mojo::String::From(old_path), mojo::String::From(new_path), out_error); |
+ DCHECK(completed); |
+ SignalIfNeeded(done_event); |
+} |
+ |
+void LevelDBMojoProxy::LockFileImpl(OpaqueDir* dir, |
+ const std::string& path, |
+ base::WaitableEvent* done_event, |
+ filesystem::FileError* out_error, |
+ OpaqueLock** out_lock) { |
+ // Since a lock is associated with a file descriptor, we need to open and |
+ // have a persistent file on the other side of the connection. |
+ filesystem::FilePtr target; |
+ filesystem::FileRequest proxy = GetProxy(&target); |
+ bool completed = dir->directory->OpenFile( |
+ mojo::String::From(path), std::move(proxy), |
+ filesystem::kFlagOpenAlways | filesystem::kFlagRead | |
+ filesystem::kFlagWrite, |
+ out_error); |
+ DCHECK(completed); |
+ |
+ if (*out_error != filesystem::FileError::OK) { |
+ SignalIfNeeded(done_event); |
+ return; |
+ } |
+ |
+ completed = target->Lock(out_error); |
+ DCHECK(completed); |
+ |
+ if (*out_error == filesystem::FileError::OK) { |
+ OpaqueLock* l = new OpaqueLock; |
+ l->lock_file = std::move(target); |
+ *out_lock = l; |
+ } |
+ |
+ SignalIfNeeded(done_event); |
+} |
+ |
+void LevelDBMojoProxy::UnlockFileImpl(scoped_ptr<OpaqueLock> lock, |
+ base::WaitableEvent* done_event, |
+ filesystem::FileError* out_error) { |
+ lock->lock_file->Unlock(out_error); |
+ SignalIfNeeded(done_event); |
+} |
+ |
+} // namespace leveldb |