Chromium Code Reviews| Index: components/leveldb/leveldb_file_thread.cc |
| diff --git a/components/leveldb/leveldb_file_thread.cc b/components/leveldb/leveldb_file_thread.cc |
| index 7bc9774bc4a0e616ca8feeb4e333e7313ac903e0..698e975dace5a8d7e2628adbe2003d628b6f7880 100644 |
| --- a/components/leveldb/leveldb_file_thread.cc |
| +++ b/components/leveldb/leveldb_file_thread.cc |
| @@ -4,6 +4,8 @@ |
| #include "components/leveldb/leveldb_file_thread.h" |
| +#include <set> |
| + |
| #include "base/bind.h" |
| #include "mojo/message_pump/message_pump_mojo.h" |
| #include "mojo/platform_handle/platform_handle_functions.h" |
| @@ -19,13 +21,21 @@ struct LevelDBFileThread::OpaqueLock { |
| }; |
| struct LevelDBFileThread::OpaqueDir { |
| - OpaqueDir(mojo::InterfacePtrInfo<filesystem::Directory> directory_info) { |
| + explicit OpaqueDir( |
| + mojo::InterfacePtrInfo<filesystem::Directory> directory_info) { |
| directory.Bind(std::move(directory_info)); |
| } |
| filesystem::DirectoryPtr directory; |
| }; |
| +struct LevelDBFileThread::WaitableEventDependencies { |
| + WaitableEventDependencies() {} |
| + ~WaitableEventDependencies() {} |
| + std::set<filesystem::DirectoryPtr*> directories; |
| + std::set<filesystem::FilePtr*> files; |
| +}; |
| + |
| LevelDBFileThread::LevelDBFileThread() |
| : base::Thread(kLevelDBFileThreadName), |
| outstanding_opaque_dirs_(0) { |
| @@ -257,12 +267,64 @@ LevelDBFileThread::~LevelDBFileThread() { |
| DCHECK_EQ(0, outstanding_opaque_dirs_); |
| } |
| +bool LevelDBFileThread::RegisterDirAndWaitableEvent( |
| + OpaqueDir* dir, |
| + base::WaitableEvent* done_event) { |
| + if (!dir->directory.is_bound()) { |
| + // The directory went out of scope between the PostTask on the other thread |
| + // and now. |
| + done_event->Signal(); |
| + return true; |
| + } |
| + |
| + waitable_event_dependencies_[done_event].directories.insert(&dir->directory); |
| + return false; |
| +} |
| + |
| +void LevelDBFileThread::CompleteWaitableEvent(base::WaitableEvent* done_event) { |
| + // Clean up the dependencies that we no longer care about. |
| + waitable_event_dependencies_.erase(done_event); |
| + done_event->Signal(); |
| +} |
| + |
| +void LevelDBFileThread::OnConnectionError() { |
| + // One of our interface ptrs has become unbound. Signal the event which has |
| + // it as a dependency. |
| + auto it = waitable_event_dependencies_.begin(); |
| + while (it != waitable_event_dependencies_.end()) { |
| + bool unbound_ptr_found = false; |
| + for (const auto* dir : it->second.directories) { |
| + if (!dir->is_bound()) { |
| + unbound_ptr_found = true; |
| + break; |
| + } |
| + } |
| + |
| + if (!unbound_ptr_found) { |
| + for (const auto* file : it->second.files) { |
| + if (!file->is_bound()) { |
| + unbound_ptr_found = true; |
| + break; |
| + } |
| + } |
| + } |
| + |
| + if (unbound_ptr_found) { |
| + base::WaitableEvent* e = it->first; |
| + it = waitable_event_dependencies_.erase(it); |
| + e->Signal(); |
| + } else { |
| + ++it; |
| + } |
| + } |
| +} |
| + |
| void LevelDBFileThread::OnSimpleComplete(base::WaitableEvent* done_event, |
| filesystem::FileError* out_error, |
| filesystem::FileError in_error) { |
| DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); |
| *out_error = in_error; |
| - done_event->Signal(); |
| + CompleteWaitableEvent(done_event); |
| } |
| void LevelDBFileThread::RegisterDirectoryImpl( |
| @@ -274,6 +336,11 @@ void LevelDBFileThread::RegisterDirectoryImpl( |
| // Take the Directory pipe and bind it on this thread. |
| *out_dir = new OpaqueDir(std::move(directory_info)); |
| outstanding_opaque_dirs_++; |
| + |
| + // Register the connection error handler for the resulting DirectoryPtr |
| + (*out_dir)->directory.set_connection_error_handler( |
| + base::Bind(&LevelDBFileThread::OnConnectionError, this)); |
| + |
| done_event->Signal(); |
| } |
| @@ -283,7 +350,7 @@ void LevelDBFileThread::UnregisterDirectoryImpl( |
| // Only delete the directories on the thread that owns them. |
| delete dir; |
| outstanding_opaque_dirs_--; |
| - done_event->Signal(); |
| + CompleteWaitableEvent(done_event); |
| } |
| void LevelDBFileThread::OpenFileHandleImpl(OpaqueDir* dir, |
| @@ -293,6 +360,9 @@ void LevelDBFileThread::OpenFileHandleImpl(OpaqueDir* dir, |
| base::File* out_file) { |
| DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); |
| + if (RegisterDirAndWaitableEvent(dir, done_event)) |
| + return; |
| + |
| dir->directory->OpenFileHandle( |
| mojo::String::From(name), open_flags, |
| base::Bind(&LevelDBFileThread::OnOpenFileHandleComplete, this, done_event, |
| @@ -321,7 +391,7 @@ void LevelDBFileThread::OnOpenFileHandleComplete( |
| } |
| } |
| - done_event->Signal(); |
| + CompleteWaitableEvent(done_event); |
| } |
| void LevelDBFileThread::SyncDirectoryImpl(OpaqueDir* dir, |
| @@ -330,6 +400,9 @@ void LevelDBFileThread::SyncDirectoryImpl(OpaqueDir* dir, |
| filesystem::FileError* out_error) { |
| DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); |
| + if (RegisterDirAndWaitableEvent(dir, done_event)) |
| + return; |
| + |
| // Step one: open the directory |name| from the toplevel directory. |
| scoped_ptr<filesystem::DirectoryPtr> target(new filesystem::DirectoryPtr); |
| @@ -349,10 +422,16 @@ void LevelDBFileThread::OnSyncDirctoryOpened( |
| if (in_error != filesystem::FileError::OK) { |
| *out_error = in_error; |
| - done_event->Signal(); |
| + CompleteWaitableEvent(done_event); |
| return; |
| } |
| + // Add a dependency between the new directory we opened and the current |
| + // waitable event. |
| + dir->set_connection_error_handler( |
| + base::Bind(&LevelDBFileThread::OnConnectionError, this)); |
| + waitable_event_dependencies_[done_event].directories.insert(dir.get()); |
| + |
| // We move the object into the bind before we call. Copy to the stack. |
| filesystem::DirectoryPtr* local = dir.get(); |
| (*local)->Flush(base::Bind(&LevelDBFileThread::OnSyncDirectoryComplete, this, |
| @@ -366,7 +445,7 @@ void LevelDBFileThread::OnSyncDirectoryComplete( |
| filesystem::FileError in_error) { |
| DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); |
| *out_error = in_error; |
| - done_event->Signal(); |
| + CompleteWaitableEvent(done_event); |
| } |
| void LevelDBFileThread::FileExistsImpl(OpaqueDir* dir, |
| @@ -374,6 +453,10 @@ void LevelDBFileThread::FileExistsImpl(OpaqueDir* dir, |
| base::WaitableEvent* done_event, |
| bool* exists) { |
| DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); |
| + |
| + if (RegisterDirAndWaitableEvent(dir, done_event)) |
| + return; |
| + |
| dir->directory->Exists( |
| mojo::String::From(name), |
| base::Bind(&LevelDBFileThread::OnFileExistsComplete, this, |
| @@ -386,7 +469,7 @@ void LevelDBFileThread::OnFileExistsComplete(base::WaitableEvent* done_event, |
| bool in_exists) { |
| DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); |
| *exists = in_exists; |
| - done_event->Signal(); |
| + CompleteWaitableEvent(done_event); |
| } |
| void LevelDBFileThread::GetChildrenImpl(OpaqueDir* dir, |
| @@ -396,6 +479,9 @@ void LevelDBFileThread::GetChildrenImpl(OpaqueDir* dir, |
| filesystem::FileError* out_error) { |
| DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); |
| + if (RegisterDirAndWaitableEvent(dir, done_event)) |
| + return; |
| + |
| // Step one: open the directory |name| from the toplevel directory. |
| scoped_ptr<filesystem::DirectoryPtr> target(new filesystem::DirectoryPtr); |
| mojo::InterfaceRequest<filesystem::Directory> proxy = GetProxy(target.get()); |
| @@ -415,10 +501,16 @@ void LevelDBFileThread::OnGetChildrenOpened( |
| if (in_error != filesystem::FileError::OK) { |
| *out_error = in_error; |
| - done_event->Signal(); |
| + CompleteWaitableEvent(done_event); |
| return; |
| } |
| + // Add a dependency between the new directory we opened and the current |
| + // waitable event. |
| + dir->set_connection_error_handler( |
| + base::Bind(&LevelDBFileThread::OnConnectionError, this)); |
| + waitable_event_dependencies_[done_event].directories.insert(dir.get()); |
| + |
| // We move the object into the bind before we call. Copy to the stack. |
| filesystem::DirectoryPtr* local = dir.get(); |
| (*local)->Read(base::Bind(&LevelDBFileThread::OnGetChildrenComplete, this, |
| @@ -441,7 +533,7 @@ void LevelDBFileThread::OnGetChildrenComplete( |
| } |
| *out_error = in_error; |
| - done_event->Signal(); |
| + CompleteWaitableEvent(done_event); |
| } |
| void LevelDBFileThread::DeleteImpl(OpaqueDir* dir, |
| @@ -451,6 +543,9 @@ void LevelDBFileThread::DeleteImpl(OpaqueDir* dir, |
| filesystem::FileError* out_error) { |
| DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); |
| + if (RegisterDirAndWaitableEvent(dir, done_event)) |
| + return; |
| + |
| dir->directory->Delete(mojo::String::From(name), delete_flags, |
| base::Bind(&LevelDBFileThread::OnSimpleComplete, this, |
| done_event, out_error)); |
| @@ -462,6 +557,9 @@ void LevelDBFileThread::CreateDirImpl(OpaqueDir* dir, |
| filesystem::FileError* out_error) { |
| DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); |
| + if (RegisterDirAndWaitableEvent(dir, done_event)) |
| + return; |
| + |
| dir->directory->OpenDirectory( |
| name, nullptr, |
| filesystem::kFlagRead | filesystem::kFlagWrite | filesystem::kFlagCreate, |
| @@ -475,6 +573,10 @@ void LevelDBFileThread::GetFileSizeImpl(OpaqueDir* dir, |
| base::WaitableEvent* done_event, |
| filesystem::FileError* out_error) { |
| DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); |
| + |
| + if (RegisterDirAndWaitableEvent(dir, done_event)) |
| + return; |
| + |
| dir->directory->StatFile( |
| path, base::Bind(&LevelDBFileThread::OnGetFileSizeImpl, this, file_size, |
| done_event, out_error)); |
| @@ -490,7 +592,7 @@ void LevelDBFileThread::OnGetFileSizeImpl( |
| if (file_info) |
| *file_size = file_info->size; |
| *out_error = in_error; |
| - done_event->Signal(); |
| + CompleteWaitableEvent(done_event); |
| } |
| void LevelDBFileThread::RenameFileImpl(OpaqueDir* dir, |
| @@ -499,6 +601,10 @@ void LevelDBFileThread::RenameFileImpl(OpaqueDir* dir, |
| base::WaitableEvent* done_event, |
| filesystem::FileError* out_error) { |
| DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); |
| + |
| + if (RegisterDirAndWaitableEvent(dir, done_event)) |
| + return; |
| + |
| dir->directory->Rename(mojo::String::From(old_path), |
| mojo::String::From(new_path), |
| base::Bind(&LevelDBFileThread::OnSimpleComplete, this, |
| @@ -512,6 +618,9 @@ void LevelDBFileThread::LockFileImpl(OpaqueDir* dir, |
| OpaqueLock** out_lock) { |
| DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); |
| + if (RegisterDirAndWaitableEvent(dir, done_event)) |
| + return; |
| + |
| // Since a lock is associated with a file descriptor, we need to open and |
| // have a persistent file on the other side of the connection. |
| scoped_ptr<filesystem::FilePtr> target(new filesystem::FilePtr); |
| @@ -534,10 +643,17 @@ void LevelDBFileThread::OnOpenLockFileComplete( |
| if (in_error != filesystem::FileError::OK) { |
| *out_error = in_error; |
| - done_event->Signal(); |
| + CompleteWaitableEvent(done_event); |
| return; |
| } |
| + // Add a dependency between the new file we opened and the current waitable |
| + // event. (This dependency will get cleared in OnLockFileCompelte if we |
|
jam
2016/03/24 17:27:39
nit: OnLockFileComplete
|
| + // complete this call safely.) |
| + file->set_connection_error_handler( |
| + base::Bind(&LevelDBFileThread::OnConnectionError, this)); |
| + waitable_event_dependencies_[done_event].files.insert(file.get()); |
| + |
| filesystem::FilePtr* local = file.get(); |
| (*local)->Lock(base::Bind(&LevelDBFileThread::OnLockFileComplete, this, |
| base::Passed(&file), done_event, out_error, |
| @@ -559,7 +675,7 @@ void LevelDBFileThread::OnLockFileComplete(scoped_ptr<filesystem::FilePtr> file, |
| *out_lock = l; |
| } |
| - done_event->Signal(); |
| + CompleteWaitableEvent(done_event); |
| } |
| void LevelDBFileThread::UnlockFileImpl(scoped_ptr<OpaqueLock> lock, |
| @@ -581,7 +697,7 @@ void LevelDBFileThread::OnUnlockFileCompleted(scoped_ptr<OpaqueLock> lock, |
| // get destructed on its own here. |
| DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); |
| *out_error = in_error; |
| - done_event->Signal(); |
| + CompleteWaitableEvent(done_event); |
| } |
| void LevelDBFileThread::Init() { |