Chromium Code Reviews| Index: components/leveldb/leveldb_mojo_proxy.cc |
| diff --git a/components/leveldb/leveldb_mojo_proxy.cc b/components/leveldb/leveldb_mojo_proxy.cc |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..1e36f261577dfdb9783a56bcd001faba685defd6 |
| --- /dev/null |
| +++ b/components/leveldb/leveldb_mojo_proxy.cc |
| @@ -0,0 +1,472 @@ |
| +// Copyright 2016 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#include "components/leveldb/leveldb_mojo_proxy.h" |
| + |
| +#include <set> |
| + |
| +#include "base/bind.h" |
| +#include "mojo/message_pump/message_pump_mojo.h" |
| +#include "mojo/platform_handle/platform_handle_functions.h" |
| +#include "mojo/public/cpp/bindings/interface_request.h" |
| + |
| +namespace leveldb { |
| + |
| +struct LevelDBMojoProxy::OpaqueLock { |
| + filesystem::FilePtr lock_file; |
| +}; |
| + |
| +struct LevelDBMojoProxy::OpaqueDir { |
| + explicit OpaqueDir( |
| + mojo::InterfacePtrInfo<filesystem::Directory> directory_info) { |
| + directory.Bind(std::move(directory_info)); |
| + } |
| + |
| + filesystem::DirectoryPtr directory; |
| +}; |
| + |
| +LevelDBMojoProxy::LevelDBMojoProxy( |
| + scoped_refptr<base::SingleThreadTaskRunner> task_runner) |
| + : task_runner_(std::move(task_runner)), outstanding_opaque_dirs_(0) {} |
| + |
| +LevelDBMojoProxy::OpaqueDir* LevelDBMojoProxy::RegisterDirectory( |
| + filesystem::DirectoryPtr directory) { |
| + OpaqueDir* out_dir = nullptr; |
| + |
| + if (task_runner_->BelongsToCurrentThread()) { |
| + // If our task runner is the current thread, we switch to synchronous mode |
| + // instead of posting tasks with WaitableEvents. |
| + RegisterDirectoryImpl(directory.PassInterface(), nullptr, &out_dir); |
| + } else { |
| + // This proxies to the other thread, which proxies to mojo. Only on the |
| + // reply from mojo do we return from this. |
|
jam
2016/03/29 05:58:00
nit: no point in repeating this comment everywhere
|
| + base::WaitableEvent done_event(false, false); |
| + task_runner_->PostTask( |
| + FROM_HERE, base::Bind(&LevelDBMojoProxy::RegisterDirectoryImpl, this, |
| + base::Passed(directory.PassInterface()), |
| + &done_event, &out_dir)); |
| + done_event.Wait(); |
| + } |
| + |
| + return out_dir; |
| +} |
| + |
| +void LevelDBMojoProxy::UnregisterDirectory(OpaqueDir* dir) { |
| + if (task_runner_->BelongsToCurrentThread()) { |
| + UnregisterDirectoryImpl(dir, nullptr); |
| + } else { |
| + // This proxies to the other thread, which proxies to mojo. Only on the |
| + // reply from mojo do we return from this. |
| + base::WaitableEvent done_event(false, false); |
| + task_runner_->PostTask( |
| + FROM_HERE, base::Bind(&LevelDBMojoProxy::UnregisterDirectoryImpl, this, |
| + dir, &done_event)); |
| + done_event.Wait(); |
|
jam
2016/03/29 05:58:00
since this code block is duplicated in all methods
Elliot Glaysher
2016/03/29 17:33:29
So how does the WaitableEvent block the current me
|
| + } |
| +} |
| + |
| +base::File LevelDBMojoProxy::OpenFileHandle(OpaqueDir* dir, |
| + const std::string& name, |
| + uint32_t open_flags) { |
| + base::File file; |
| + |
| + if (task_runner_->BelongsToCurrentThread()) { |
| + OpenFileHandleImpl(dir, name, open_flags, nullptr, &file); |
| + } else { |
| + // This proxies to the other thread, which proxies to mojo. Only on the |
| + // reply from mojo do we return from this. |
| + base::WaitableEvent done_event(false, false); |
| + task_runner_->PostTask( |
| + FROM_HERE, base::Bind(&LevelDBMojoProxy::OpenFileHandleImpl, this, dir, |
| + name, open_flags, &done_event, &file)); |
| + done_event.Wait(); |
| + } |
| + |
| + return file; |
| +} |
| + |
| +filesystem::FileError LevelDBMojoProxy::SyncDirectory(OpaqueDir* dir, |
| + const std::string& name) { |
| + filesystem::FileError error = filesystem::FileError::FAILED; |
| + |
| + if (task_runner_->BelongsToCurrentThread()) { |
| + SyncDirectoryImpl(dir, name, nullptr, &error); |
| + } else { |
| + // This proxies to the other thread, which proxies to mojo. Only on the |
| + // reply from mojo do we return from this. |
| + base::WaitableEvent done_event(false, false); |
| + task_runner_->PostTask( |
| + FROM_HERE, base::Bind(&LevelDBMojoProxy::SyncDirectoryImpl, this, dir, |
| + name, &done_event, &error)); |
| + done_event.Wait(); |
| + } |
| + |
| + return error; |
| +} |
| + |
| +bool LevelDBMojoProxy::FileExists(OpaqueDir* dir, const std::string& name) { |
| + bool exists = false; |
| + |
| + if (task_runner_->BelongsToCurrentThread()) { |
| + FileExistsImpl(dir, name, nullptr, &exists); |
| + } else { |
| + // This proxies to the other thread, which proxies to mojo. Only on the |
| + // reply from mojo do we return from this. |
| + base::WaitableEvent done_event(false, false); |
| + task_runner_->PostTask( |
| + FROM_HERE, base::Bind(&LevelDBMojoProxy::FileExistsImpl, this, dir, |
| + name, &done_event, &exists)); |
| + done_event.Wait(); |
| + } |
| + |
| + return exists; |
| +} |
| + |
| +filesystem::FileError LevelDBMojoProxy::GetChildren( |
| + OpaqueDir* dir, |
| + const std::string& path, |
| + std::vector<std::string>* result) { |
| + filesystem::FileError error = filesystem::FileError::FAILED; |
| + |
| + if (task_runner_->BelongsToCurrentThread()) { |
| + GetChildrenImpl(dir, path, result, nullptr, &error); |
| + } else { |
| + // This proxies to the other thread, which proxies to mojo. Only on the |
| + // reply from mojo do we return from this. |
| + base::WaitableEvent done_event(false, false); |
| + task_runner_->PostTask( |
| + FROM_HERE, base::Bind(&LevelDBMojoProxy::GetChildrenImpl, this, dir, |
| + path, result, &done_event, &error)); |
| + done_event.Wait(); |
| + } |
| + |
| + return error; |
| +} |
| + |
| +filesystem::FileError LevelDBMojoProxy::Delete(OpaqueDir* dir, |
| + const std::string& path, |
| + uint32_t delete_flags) { |
| + filesystem::FileError error = filesystem::FileError::FAILED; |
| + |
| + if (task_runner_->BelongsToCurrentThread()) { |
| + DeleteImpl(dir, path, delete_flags, nullptr, &error); |
| + } else { |
| + // This proxies to the other thread, which proxies to mojo. Only on the |
| + // reply from mojo do we return from this. |
| + base::WaitableEvent done_event(false, false); |
| + task_runner_->PostTask( |
| + FROM_HERE, base::Bind(&LevelDBMojoProxy::DeleteImpl, this, dir, path, |
| + delete_flags, &done_event, &error)); |
| + done_event.Wait(); |
| + } |
| + |
| + return error; |
| +} |
| + |
| +filesystem::FileError LevelDBMojoProxy::CreateDir(OpaqueDir* dir, |
| + const std::string& path) { |
| + filesystem::FileError error = filesystem::FileError::FAILED; |
| + |
| + if (task_runner_->BelongsToCurrentThread()) { |
| + CreateDirImpl(dir, path, nullptr, &error); |
| + } else { |
| + // This proxies to the other thread, which proxies to mojo. Only on the |
| + // reply from mojo do we return from this. |
| + base::WaitableEvent done_event(false, false); |
| + task_runner_->PostTask( |
| + FROM_HERE, base::Bind(&LevelDBMojoProxy::CreateDirImpl, this, dir, path, |
| + &done_event, &error)); |
| + done_event.Wait(); |
| + } |
| + |
| + return error; |
| +} |
| + |
| +filesystem::FileError LevelDBMojoProxy::GetFileSize(OpaqueDir* dir, |
| + const std::string& path, |
| + uint64_t* file_size) { |
| + filesystem::FileError error = filesystem::FileError::FAILED; |
| + |
| + if (task_runner_->BelongsToCurrentThread()) { |
| + GetFileSizeImpl(dir, path, file_size, nullptr, &error); |
| + } else { |
| + // This proxies to the other thread, which proxies to mojo. Only on the |
| + // reply from mojo do we return from this. |
| + base::WaitableEvent done_event(false, false); |
| + task_runner_->PostTask( |
| + FROM_HERE, base::Bind(&LevelDBMojoProxy::GetFileSizeImpl, this, dir, |
| + path, file_size, &done_event, &error)); |
| + done_event.Wait(); |
| + } |
| + |
| + return error; |
| +} |
| + |
| +filesystem::FileError LevelDBMojoProxy::RenameFile( |
| + OpaqueDir* dir, |
| + const std::string& old_path, |
| + const std::string& new_path) { |
| + filesystem::FileError error = filesystem::FileError::FAILED; |
| + |
| + if (task_runner_->BelongsToCurrentThread()) { |
| + RenameFileImpl(dir, old_path, new_path, nullptr, &error); |
| + } else { |
| + // This proxies to the other thread, which proxies to mojo. Only on the |
| + // reply from mojo do we return from this. |
| + base::WaitableEvent done_event(false, false); |
| + task_runner_->PostTask( |
| + FROM_HERE, base::Bind(&LevelDBMojoProxy::RenameFileImpl, this, dir, |
| + old_path, new_path, &done_event, &error)); |
| + done_event.Wait(); |
| + } |
| + |
| + return error; |
| +} |
| + |
| +std::pair<filesystem::FileError, LevelDBMojoProxy::OpaqueLock*> |
| +LevelDBMojoProxy::LockFile(OpaqueDir* dir, const std::string& path) { |
| + filesystem::FileError error = filesystem::FileError::FAILED; |
| + OpaqueLock* out_lock = nullptr; |
| + |
| + if (task_runner_->BelongsToCurrentThread()) { |
| + LockFileImpl(dir, path, nullptr, &error, &out_lock); |
| + } else { |
| + // This proxies to the other thread, which proxies to mojo. Only on the |
| + // reply from mojo do we return from this. |
| + base::WaitableEvent done_event(false, false); |
| + task_runner_->PostTask( |
| + FROM_HERE, base::Bind(&LevelDBMojoProxy::LockFileImpl, this, dir, path, |
| + &done_event, &error, &out_lock)); |
| + done_event.Wait(); |
| + } |
| + |
| + return std::make_pair(error, out_lock); |
| +} |
| + |
| +filesystem::FileError LevelDBMojoProxy::UnlockFile(OpaqueLock* lock) { |
| + // Take ownership of the incoming lock so it gets destroyed whatever happens. |
| + scoped_ptr<OpaqueLock> scoped_lock(lock); |
| + filesystem::FileError error = filesystem::FileError::FAILED; |
| + |
| + if (task_runner_->BelongsToCurrentThread()) { |
| + UnlockFileImpl(std::move(scoped_lock), nullptr, &error); |
| + } else { |
| + // This proxies to the other thread, which proxies to mojo. Only on the |
| + // reply from mojo do we return from this. |
| + base::WaitableEvent done_event(false, false); |
| + task_runner_->PostTask( |
| + FROM_HERE, base::Bind(&LevelDBMojoProxy::UnlockFileImpl, this, |
| + base::Passed(&scoped_lock), &done_event, &error)); |
| + done_event.Wait(); |
| + } |
| + |
| + return error; |
| +} |
| + |
| +LevelDBMojoProxy::~LevelDBMojoProxy() { |
| + DCHECK_EQ(0, outstanding_opaque_dirs_); |
| +} |
| + |
| +void LevelDBMojoProxy::SignalIfNeeded(base::WaitableEvent* done_event) { |
| + if (done_event) |
| + done_event->Signal(); |
| +} |
| + |
| +void LevelDBMojoProxy::RegisterDirectoryImpl( |
| + mojo::InterfacePtrInfo<filesystem::Directory> directory_info, |
| + base::WaitableEvent* done_event, |
| + OpaqueDir** out_dir) { |
| + // Take the Directory pipe and bind it on this thread. |
| + *out_dir = new OpaqueDir(std::move(directory_info)); |
| + outstanding_opaque_dirs_++; |
| + SignalIfNeeded(done_event); |
| +} |
| + |
| +void LevelDBMojoProxy::UnregisterDirectoryImpl( |
| + OpaqueDir* dir, |
| + base::WaitableEvent* done_event) { |
| + // Only delete the directories on the thread that owns them. |
| + delete dir; |
| + outstanding_opaque_dirs_--; |
| + SignalIfNeeded(done_event); |
| +} |
| + |
| +void LevelDBMojoProxy::OpenFileHandleImpl(OpaqueDir* dir, |
| + std::string name, |
| + uint32_t open_flags, |
| + base::WaitableEvent* done_event, |
| + base::File* output_file) { |
| + mojo::ScopedHandle handle; |
| + filesystem::FileError error = filesystem::FileError::FAILED; |
| + bool completed = dir->directory->OpenFileHandle(mojo::String::From(name), |
| + open_flags, &error, &handle); |
| + DCHECK(completed); |
| + |
| + if (error != filesystem::FileError::OK) { |
| + *output_file = base::File(static_cast<base::File::Error>(error)); |
| + } else { |
| + MojoPlatformHandle platform_handle; |
| + MojoResult extract_result = |
| + MojoExtractPlatformHandle(handle.release().value(), &platform_handle); |
| + |
| + if (extract_result == MOJO_RESULT_OK) { |
| + *output_file = base::File(platform_handle); |
| + } else { |
| + NOTREACHED(); |
| + *output_file = base::File(base::File::Error::FILE_ERROR_FAILED); |
| + } |
| + } |
| + |
| + SignalIfNeeded(done_event); |
| +} |
| + |
| +void LevelDBMojoProxy::SyncDirectoryImpl(OpaqueDir* dir, |
| + std::string name, |
| + base::WaitableEvent* done_event, |
| + filesystem::FileError* out_error) { |
| + filesystem::DirectoryPtr target; |
| + bool completed = dir->directory->OpenDirectory( |
| + name, GetProxy(&target), filesystem::kFlagRead | filesystem::kFlagWrite, |
| + out_error); |
| + DCHECK(completed); |
| + |
| + if (*out_error != filesystem::FileError::OK) { |
| + SignalIfNeeded(done_event); |
| + return; |
| + } |
| + |
| + completed = target->Flush(out_error); |
| + DCHECK(completed); |
| + SignalIfNeeded(done_event); |
| +} |
| + |
| +void LevelDBMojoProxy::FileExistsImpl(OpaqueDir* dir, |
| + std::string name, |
| + base::WaitableEvent* done_event, |
| + bool* exists) { |
| + filesystem::FileError error = filesystem::FileError::FAILED; |
| + bool completed = |
| + dir->directory->Exists(mojo::String::From(name), &error, exists); |
| + DCHECK(completed); |
| + SignalIfNeeded(done_event); |
| +} |
| + |
| +void LevelDBMojoProxy::GetChildrenImpl(OpaqueDir* dir, |
| + std::string name, |
| + std::vector<std::string>* out_contents, |
| + base::WaitableEvent* done_event, |
| + filesystem::FileError* out_error) { |
| + // Step one: open the directory |name| from the toplevel directory. |
| + filesystem::DirectoryPtr target; |
| + filesystem::DirectoryRequest proxy = GetProxy(&target); |
| + bool completed = dir->directory->OpenDirectory( |
| + name, std::move(proxy), filesystem::kFlagRead | filesystem::kFlagWrite, |
| + out_error); |
| + DCHECK(completed); |
| + |
| + if (*out_error != filesystem::FileError::OK) { |
| + SignalIfNeeded(done_event); |
| + return; |
| + } |
| + |
| + mojo::Array<filesystem::DirectoryEntryPtr> directory_contents; |
| + completed = target->Read(out_error, &directory_contents); |
| + DCHECK(completed); |
| + |
| + if (!directory_contents.is_null()) { |
| + for (size_t i = 0; i < directory_contents.size(); ++i) |
| + out_contents->push_back(directory_contents[i]->name.To<std::string>()); |
| + } |
| + |
| + SignalIfNeeded(done_event); |
| +} |
| + |
| +void LevelDBMojoProxy::DeleteImpl(OpaqueDir* dir, |
| + std::string name, |
| + uint32_t delete_flags, |
| + base::WaitableEvent* done_event, |
| + filesystem::FileError* out_error) { |
| + bool completed = |
| + dir->directory->Delete(mojo::String::From(name), delete_flags, out_error); |
| + DCHECK(completed); |
| + SignalIfNeeded(done_event); |
| +} |
| + |
| +void LevelDBMojoProxy::CreateDirImpl(OpaqueDir* dir, |
| + std::string name, |
| + base::WaitableEvent* done_event, |
| + filesystem::FileError* out_error) { |
| + bool completed = dir->directory->OpenDirectory( |
| + name, nullptr, |
| + filesystem::kFlagRead | filesystem::kFlagWrite | filesystem::kFlagCreate, |
| + out_error); |
| + DCHECK(completed); |
| + SignalIfNeeded(done_event); |
| +} |
| + |
| +void LevelDBMojoProxy::GetFileSizeImpl(OpaqueDir* dir, |
| + const std::string& path, |
| + uint64_t* file_size, |
| + base::WaitableEvent* done_event, |
| + filesystem::FileError* out_error) { |
| + filesystem::FileInformationPtr info; |
| + bool completed = dir->directory->StatFile(path, out_error, &info); |
| + DCHECK(completed); |
| + if (info) |
| + *file_size = info->size; |
| + SignalIfNeeded(done_event); |
| +} |
| + |
| +void LevelDBMojoProxy::RenameFileImpl(OpaqueDir* dir, |
| + const std::string& old_path, |
| + const std::string& new_path, |
| + base::WaitableEvent* done_event, |
| + filesystem::FileError* out_error) { |
| + bool completed = dir->directory->Rename( |
| + mojo::String::From(old_path), mojo::String::From(new_path), out_error); |
| + DCHECK(completed); |
| + SignalIfNeeded(done_event); |
| +} |
| + |
| +void LevelDBMojoProxy::LockFileImpl(OpaqueDir* dir, |
| + const std::string& path, |
| + base::WaitableEvent* done_event, |
| + filesystem::FileError* out_error, |
| + OpaqueLock** out_lock) { |
| + // Since a lock is associated with a file descriptor, we need to open and |
| + // have a persistent file on the other side of the connection. |
| + filesystem::FilePtr target; |
| + filesystem::FileRequest proxy = GetProxy(&target); |
| + bool completed = dir->directory->OpenFile( |
| + mojo::String::From(path), std::move(proxy), |
| + filesystem::kFlagOpenAlways | filesystem::kFlagRead | |
| + filesystem::kFlagWrite, |
| + out_error); |
| + DCHECK(completed); |
| + |
| + if (*out_error != filesystem::FileError::OK) { |
| + SignalIfNeeded(done_event); |
| + return; |
| + } |
| + |
| + completed = target->Lock(out_error); |
| + DCHECK(completed); |
| + |
| + if (*out_error == filesystem::FileError::OK) { |
| + OpaqueLock* l = new OpaqueLock; |
| + l->lock_file = std::move(target); |
| + *out_lock = l; |
| + } |
| + |
| + SignalIfNeeded(done_event); |
| +} |
| + |
| +void LevelDBMojoProxy::UnlockFileImpl(scoped_ptr<OpaqueLock> lock, |
| + base::WaitableEvent* done_event, |
| + filesystem::FileError* out_error) { |
| + lock->lock_file->Unlock(out_error); |
| + SignalIfNeeded(done_event); |
| +} |
| + |
| +} // namespace leveldb |