| Index: components/leveldb/leveldb_file_thread.cc
|
| diff --git a/components/leveldb/leveldb_file_thread.cc b/components/leveldb/leveldb_file_thread.cc
|
| index 7bc9774bc4a0e616ca8feeb4e333e7313ac903e0..efad40e36d86df8c1a5a51066908a9e7c675042f 100644
|
| --- a/components/leveldb/leveldb_file_thread.cc
|
| +++ b/components/leveldb/leveldb_file_thread.cc
|
| @@ -4,6 +4,8 @@
|
|
|
| #include "components/leveldb/leveldb_file_thread.h"
|
|
|
| +#include <set>
|
| +
|
| #include "base/bind.h"
|
| #include "mojo/message_pump/message_pump_mojo.h"
|
| #include "mojo/platform_handle/platform_handle_functions.h"
|
| @@ -19,13 +21,21 @@ struct LevelDBFileThread::OpaqueLock {
|
| };
|
|
|
| struct LevelDBFileThread::OpaqueDir {
|
| - OpaqueDir(mojo::InterfacePtrInfo<filesystem::Directory> directory_info) {
|
| + explicit OpaqueDir(
|
| + mojo::InterfacePtrInfo<filesystem::Directory> directory_info) {
|
| directory.Bind(std::move(directory_info));
|
| }
|
|
|
| filesystem::DirectoryPtr directory;
|
| };
|
|
|
| +struct LevelDBFileThread::WaitableEventDependencies {
|
| + WaitableEventDependencies() {}
|
| + ~WaitableEventDependencies() {}
|
| + std::set<filesystem::DirectoryPtr*> directories;
|
| + std::set<filesystem::FilePtr*> files;
|
| +};
|
| +
|
| LevelDBFileThread::LevelDBFileThread()
|
| : base::Thread(kLevelDBFileThreadName),
|
| outstanding_opaque_dirs_(0) {
|
| @@ -257,12 +267,64 @@ LevelDBFileThread::~LevelDBFileThread() {
|
| DCHECK_EQ(0, outstanding_opaque_dirs_);
|
| }
|
|
|
| +bool LevelDBFileThread::RegisterDirAndWaitableEvent(
|
| + OpaqueDir* dir,
|
| + base::WaitableEvent* done_event) {
|
| + if (!dir->directory.is_bound()) {
|
| + // The directory went out of scope between the PostTask on the other thread
|
| + // and now.
|
| + done_event->Signal();
|
| + return true;
|
| + }
|
| +
|
| + waitable_event_dependencies_[done_event].directories.insert(&dir->directory);
|
| + return false;
|
| +}
|
| +
|
| +void LevelDBFileThread::CompleteWaitableEvent(base::WaitableEvent* done_event) {
|
| + // Clean up the dependencies that we no longer care about.
|
| + waitable_event_dependencies_.erase(done_event);
|
| + done_event->Signal();
|
| +}
|
| +
|
| +void LevelDBFileThread::OnConnectionError() {
|
| + // One of our interface ptrs has become unbound. Signal the event which has
|
| + // it as a dependency.
|
| + auto it = waitable_event_dependencies_.begin();
|
| + while (it != waitable_event_dependencies_.end()) {
|
| + bool unbound_ptr_found = false;
|
| + for (const auto* dir : it->second.directories) {
|
| + if (!dir->is_bound()) {
|
| + unbound_ptr_found = true;
|
| + break;
|
| + }
|
| + }
|
| +
|
| + if (!unbound_ptr_found) {
|
| + for (const auto* file : it->second.files) {
|
| + if (!file->is_bound()) {
|
| + unbound_ptr_found = true;
|
| + break;
|
| + }
|
| + }
|
| + }
|
| +
|
| + if (unbound_ptr_found) {
|
| + base::WaitableEvent* e = it->first;
|
| + it = waitable_event_dependencies_.erase(it);
|
| + e->Signal();
|
| + } else {
|
| + ++it;
|
| + }
|
| + }
|
| +}
|
| +
|
| void LevelDBFileThread::OnSimpleComplete(base::WaitableEvent* done_event,
|
| filesystem::FileError* out_error,
|
| filesystem::FileError in_error) {
|
| DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
|
| *out_error = in_error;
|
| - done_event->Signal();
|
| + CompleteWaitableEvent(done_event);
|
| }
|
|
|
| void LevelDBFileThread::RegisterDirectoryImpl(
|
| @@ -274,6 +336,11 @@ void LevelDBFileThread::RegisterDirectoryImpl(
|
| // Take the Directory pipe and bind it on this thread.
|
| *out_dir = new OpaqueDir(std::move(directory_info));
|
| outstanding_opaque_dirs_++;
|
| +
|
| + // Register the connection error handler for the resulting DirectoryPtr
|
| + (*out_dir)->directory.set_connection_error_handler(
|
| + base::Bind(&LevelDBFileThread::OnConnectionError, this));
|
| +
|
| done_event->Signal();
|
| }
|
|
|
| @@ -283,7 +350,7 @@ void LevelDBFileThread::UnregisterDirectoryImpl(
|
| // Only delete the directories on the thread that owns them.
|
| delete dir;
|
| outstanding_opaque_dirs_--;
|
| - done_event->Signal();
|
| + CompleteWaitableEvent(done_event);
|
| }
|
|
|
| void LevelDBFileThread::OpenFileHandleImpl(OpaqueDir* dir,
|
| @@ -293,6 +360,9 @@ void LevelDBFileThread::OpenFileHandleImpl(OpaqueDir* dir,
|
| base::File* out_file) {
|
| DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
|
|
|
| + if (RegisterDirAndWaitableEvent(dir, done_event))
|
| + return;
|
| +
|
| dir->directory->OpenFileHandle(
|
| mojo::String::From(name), open_flags,
|
| base::Bind(&LevelDBFileThread::OnOpenFileHandleComplete, this, done_event,
|
| @@ -321,7 +391,7 @@ void LevelDBFileThread::OnOpenFileHandleComplete(
|
| }
|
| }
|
|
|
| - done_event->Signal();
|
| + CompleteWaitableEvent(done_event);
|
| }
|
|
|
| void LevelDBFileThread::SyncDirectoryImpl(OpaqueDir* dir,
|
| @@ -330,6 +400,9 @@ void LevelDBFileThread::SyncDirectoryImpl(OpaqueDir* dir,
|
| filesystem::FileError* out_error) {
|
| DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
|
|
|
| + if (RegisterDirAndWaitableEvent(dir, done_event))
|
| + return;
|
| +
|
| // Step one: open the directory |name| from the toplevel directory.
|
| scoped_ptr<filesystem::DirectoryPtr> target(new filesystem::DirectoryPtr);
|
|
|
| @@ -349,10 +422,16 @@ void LevelDBFileThread::OnSyncDirctoryOpened(
|
|
|
| if (in_error != filesystem::FileError::OK) {
|
| *out_error = in_error;
|
| - done_event->Signal();
|
| + CompleteWaitableEvent(done_event);
|
| return;
|
| }
|
|
|
| + // Add a dependency between the new directory we opened and the current
|
| + // waitable event.
|
| + dir->set_connection_error_handler(
|
| + base::Bind(&LevelDBFileThread::OnConnectionError, this));
|
| + waitable_event_dependencies_[done_event].directories.insert(dir.get());
|
| +
|
| // We move the object into the bind before we call. Copy to the stack.
|
| filesystem::DirectoryPtr* local = dir.get();
|
| (*local)->Flush(base::Bind(&LevelDBFileThread::OnSyncDirectoryComplete, this,
|
| @@ -366,7 +445,7 @@ void LevelDBFileThread::OnSyncDirectoryComplete(
|
| filesystem::FileError in_error) {
|
| DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
|
| *out_error = in_error;
|
| - done_event->Signal();
|
| + CompleteWaitableEvent(done_event);
|
| }
|
|
|
| void LevelDBFileThread::FileExistsImpl(OpaqueDir* dir,
|
| @@ -374,6 +453,10 @@ void LevelDBFileThread::FileExistsImpl(OpaqueDir* dir,
|
| base::WaitableEvent* done_event,
|
| bool* exists) {
|
| DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
|
| +
|
| + if (RegisterDirAndWaitableEvent(dir, done_event))
|
| + return;
|
| +
|
| dir->directory->Exists(
|
| mojo::String::From(name),
|
| base::Bind(&LevelDBFileThread::OnFileExistsComplete, this,
|
| @@ -386,7 +469,7 @@ void LevelDBFileThread::OnFileExistsComplete(base::WaitableEvent* done_event,
|
| bool in_exists) {
|
| DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
|
| *exists = in_exists;
|
| - done_event->Signal();
|
| + CompleteWaitableEvent(done_event);
|
| }
|
|
|
| void LevelDBFileThread::GetChildrenImpl(OpaqueDir* dir,
|
| @@ -396,6 +479,9 @@ void LevelDBFileThread::GetChildrenImpl(OpaqueDir* dir,
|
| filesystem::FileError* out_error) {
|
| DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
|
|
|
| + if (RegisterDirAndWaitableEvent(dir, done_event))
|
| + return;
|
| +
|
| // Step one: open the directory |name| from the toplevel directory.
|
| scoped_ptr<filesystem::DirectoryPtr> target(new filesystem::DirectoryPtr);
|
| mojo::InterfaceRequest<filesystem::Directory> proxy = GetProxy(target.get());
|
| @@ -415,10 +501,16 @@ void LevelDBFileThread::OnGetChildrenOpened(
|
|
|
| if (in_error != filesystem::FileError::OK) {
|
| *out_error = in_error;
|
| - done_event->Signal();
|
| + CompleteWaitableEvent(done_event);
|
| return;
|
| }
|
|
|
| + // Add a dependency between the new directory we opened and the current
|
| + // waitable event.
|
| + dir->set_connection_error_handler(
|
| + base::Bind(&LevelDBFileThread::OnConnectionError, this));
|
| + waitable_event_dependencies_[done_event].directories.insert(dir.get());
|
| +
|
| // We move the object into the bind before we call. Copy to the stack.
|
| filesystem::DirectoryPtr* local = dir.get();
|
| (*local)->Read(base::Bind(&LevelDBFileThread::OnGetChildrenComplete, this,
|
| @@ -441,7 +533,7 @@ void LevelDBFileThread::OnGetChildrenComplete(
|
| }
|
|
|
| *out_error = in_error;
|
| - done_event->Signal();
|
| + CompleteWaitableEvent(done_event);
|
| }
|
|
|
| void LevelDBFileThread::DeleteImpl(OpaqueDir* dir,
|
| @@ -451,6 +543,9 @@ void LevelDBFileThread::DeleteImpl(OpaqueDir* dir,
|
| filesystem::FileError* out_error) {
|
| DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
|
|
|
| + if (RegisterDirAndWaitableEvent(dir, done_event))
|
| + return;
|
| +
|
| dir->directory->Delete(mojo::String::From(name), delete_flags,
|
| base::Bind(&LevelDBFileThread::OnSimpleComplete, this,
|
| done_event, out_error));
|
| @@ -462,6 +557,9 @@ void LevelDBFileThread::CreateDirImpl(OpaqueDir* dir,
|
| filesystem::FileError* out_error) {
|
| DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
|
|
|
| + if (RegisterDirAndWaitableEvent(dir, done_event))
|
| + return;
|
| +
|
| dir->directory->OpenDirectory(
|
| name, nullptr,
|
| filesystem::kFlagRead | filesystem::kFlagWrite | filesystem::kFlagCreate,
|
| @@ -475,6 +573,10 @@ void LevelDBFileThread::GetFileSizeImpl(OpaqueDir* dir,
|
| base::WaitableEvent* done_event,
|
| filesystem::FileError* out_error) {
|
| DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
|
| +
|
| + if (RegisterDirAndWaitableEvent(dir, done_event))
|
| + return;
|
| +
|
| dir->directory->StatFile(
|
| path, base::Bind(&LevelDBFileThread::OnGetFileSizeImpl, this, file_size,
|
| done_event, out_error));
|
| @@ -490,7 +592,7 @@ void LevelDBFileThread::OnGetFileSizeImpl(
|
| if (file_info)
|
| *file_size = file_info->size;
|
| *out_error = in_error;
|
| - done_event->Signal();
|
| + CompleteWaitableEvent(done_event);
|
| }
|
|
|
| void LevelDBFileThread::RenameFileImpl(OpaqueDir* dir,
|
| @@ -499,6 +601,10 @@ void LevelDBFileThread::RenameFileImpl(OpaqueDir* dir,
|
| base::WaitableEvent* done_event,
|
| filesystem::FileError* out_error) {
|
| DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
|
| +
|
| + if (RegisterDirAndWaitableEvent(dir, done_event))
|
| + return;
|
| +
|
| dir->directory->Rename(mojo::String::From(old_path),
|
| mojo::String::From(new_path),
|
| base::Bind(&LevelDBFileThread::OnSimpleComplete, this,
|
| @@ -512,6 +618,9 @@ void LevelDBFileThread::LockFileImpl(OpaqueDir* dir,
|
| OpaqueLock** out_lock) {
|
| DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
|
|
|
| + if (RegisterDirAndWaitableEvent(dir, done_event))
|
| + return;
|
| +
|
| // Since a lock is associated with a file descriptor, we need to open and
|
| // have a persistent file on the other side of the connection.
|
| scoped_ptr<filesystem::FilePtr> target(new filesystem::FilePtr);
|
| @@ -534,10 +643,17 @@ void LevelDBFileThread::OnOpenLockFileComplete(
|
|
|
| if (in_error != filesystem::FileError::OK) {
|
| *out_error = in_error;
|
| - done_event->Signal();
|
| + CompleteWaitableEvent(done_event);
|
| return;
|
| }
|
|
|
| + // Add a dependency between the new file we opened and the current waitable
|
| + // event. (This dependency will get cleared in OnLockFileComplete if we
|
| + // complete this call safely.)
|
| + file->set_connection_error_handler(
|
| + base::Bind(&LevelDBFileThread::OnConnectionError, this));
|
| + waitable_event_dependencies_[done_event].files.insert(file.get());
|
| +
|
| filesystem::FilePtr* local = file.get();
|
| (*local)->Lock(base::Bind(&LevelDBFileThread::OnLockFileComplete, this,
|
| base::Passed(&file), done_event, out_error,
|
| @@ -559,7 +675,7 @@ void LevelDBFileThread::OnLockFileComplete(scoped_ptr<filesystem::FilePtr> file,
|
| *out_lock = l;
|
| }
|
|
|
| - done_event->Signal();
|
| + CompleteWaitableEvent(done_event);
|
| }
|
|
|
| void LevelDBFileThread::UnlockFileImpl(scoped_ptr<OpaqueLock> lock,
|
| @@ -581,7 +697,7 @@ void LevelDBFileThread::OnUnlockFileCompleted(scoped_ptr<OpaqueLock> lock,
|
| // get destructed on its own here.
|
| DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
|
| *out_error = in_error;
|
| - done_event->Signal();
|
| + CompleteWaitableEvent(done_event);
|
| }
|
|
|
| void LevelDBFileThread::Init() {
|
|
|