Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(47)

Unified Diff: components/leveldb/leveldb_file_thread.cc

Issue 1825413003: leveldb_service: Attempt to fix deadlock on shutdown. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: typo Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « components/leveldb/leveldb_file_thread.h ('k') | components/leveldb/leveldb_service_unittest.cc » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: components/leveldb/leveldb_file_thread.cc
diff --git a/components/leveldb/leveldb_file_thread.cc b/components/leveldb/leveldb_file_thread.cc
index 7bc9774bc4a0e616ca8feeb4e333e7313ac903e0..efad40e36d86df8c1a5a51066908a9e7c675042f 100644
--- a/components/leveldb/leveldb_file_thread.cc
+++ b/components/leveldb/leveldb_file_thread.cc
@@ -4,6 +4,8 @@
#include "components/leveldb/leveldb_file_thread.h"
+#include <set>
+
#include "base/bind.h"
#include "mojo/message_pump/message_pump_mojo.h"
#include "mojo/platform_handle/platform_handle_functions.h"
@@ -19,13 +21,21 @@ struct LevelDBFileThread::OpaqueLock {
};
struct LevelDBFileThread::OpaqueDir {
- OpaqueDir(mojo::InterfacePtrInfo<filesystem::Directory> directory_info) {
+ explicit OpaqueDir(
+ mojo::InterfacePtrInfo<filesystem::Directory> directory_info) {
directory.Bind(std::move(directory_info));
}
filesystem::DirectoryPtr directory;
};
+struct LevelDBFileThread::WaitableEventDependencies {
+ WaitableEventDependencies() {}
+ ~WaitableEventDependencies() {}
+ std::set<filesystem::DirectoryPtr*> directories;
+ std::set<filesystem::FilePtr*> files;
+};
+
LevelDBFileThread::LevelDBFileThread()
: base::Thread(kLevelDBFileThreadName),
outstanding_opaque_dirs_(0) {
@@ -257,12 +267,64 @@ LevelDBFileThread::~LevelDBFileThread() {
DCHECK_EQ(0, outstanding_opaque_dirs_);
}
+bool LevelDBFileThread::RegisterDirAndWaitableEvent(
+ OpaqueDir* dir,
+ base::WaitableEvent* done_event) {
+ if (!dir->directory.is_bound()) {
+ // The directory went out of scope between the PostTask on the other thread
+ // and now.
+ done_event->Signal();
+ return true;
+ }
+
+ waitable_event_dependencies_[done_event].directories.insert(&dir->directory);
+ return false;
+}
+
+void LevelDBFileThread::CompleteWaitableEvent(base::WaitableEvent* done_event) {
+ // Clean up the dependencies that we no longer care about.
+ waitable_event_dependencies_.erase(done_event);
+ done_event->Signal();
+}
+
+void LevelDBFileThread::OnConnectionError() {
+ // One of our interface ptrs has become unbound. Signal the event which has
+ // it as a dependency.
+ auto it = waitable_event_dependencies_.begin();
+ while (it != waitable_event_dependencies_.end()) {
+ bool unbound_ptr_found = false;
+ for (const auto* dir : it->second.directories) {
+ if (!dir->is_bound()) {
+ unbound_ptr_found = true;
+ break;
+ }
+ }
+
+ if (!unbound_ptr_found) {
+ for (const auto* file : it->second.files) {
+ if (!file->is_bound()) {
+ unbound_ptr_found = true;
+ break;
+ }
+ }
+ }
+
+ if (unbound_ptr_found) {
+ base::WaitableEvent* e = it->first;
+ it = waitable_event_dependencies_.erase(it);
+ e->Signal();
+ } else {
+ ++it;
+ }
+ }
+}
+
void LevelDBFileThread::OnSimpleComplete(base::WaitableEvent* done_event,
filesystem::FileError* out_error,
filesystem::FileError in_error) {
DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
*out_error = in_error;
- done_event->Signal();
+ CompleteWaitableEvent(done_event);
}
void LevelDBFileThread::RegisterDirectoryImpl(
@@ -274,6 +336,11 @@ void LevelDBFileThread::RegisterDirectoryImpl(
// Take the Directory pipe and bind it on this thread.
*out_dir = new OpaqueDir(std::move(directory_info));
outstanding_opaque_dirs_++;
+
+ // Register the connection error handler for the resulting DirectoryPtr
+ (*out_dir)->directory.set_connection_error_handler(
+ base::Bind(&LevelDBFileThread::OnConnectionError, this));
+
done_event->Signal();
}
@@ -283,7 +350,7 @@ void LevelDBFileThread::UnregisterDirectoryImpl(
// Only delete the directories on the thread that owns them.
delete dir;
outstanding_opaque_dirs_--;
- done_event->Signal();
+ CompleteWaitableEvent(done_event);
}
void LevelDBFileThread::OpenFileHandleImpl(OpaqueDir* dir,
@@ -293,6 +360,9 @@ void LevelDBFileThread::OpenFileHandleImpl(OpaqueDir* dir,
base::File* out_file) {
DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
+ if (RegisterDirAndWaitableEvent(dir, done_event))
+ return;
+
dir->directory->OpenFileHandle(
mojo::String::From(name), open_flags,
base::Bind(&LevelDBFileThread::OnOpenFileHandleComplete, this, done_event,
@@ -321,7 +391,7 @@ void LevelDBFileThread::OnOpenFileHandleComplete(
}
}
- done_event->Signal();
+ CompleteWaitableEvent(done_event);
}
void LevelDBFileThread::SyncDirectoryImpl(OpaqueDir* dir,
@@ -330,6 +400,9 @@ void LevelDBFileThread::SyncDirectoryImpl(OpaqueDir* dir,
filesystem::FileError* out_error) {
DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
+ if (RegisterDirAndWaitableEvent(dir, done_event))
+ return;
+
// Step one: open the directory |name| from the toplevel directory.
scoped_ptr<filesystem::DirectoryPtr> target(new filesystem::DirectoryPtr);
@@ -349,10 +422,16 @@ void LevelDBFileThread::OnSyncDirctoryOpened(
if (in_error != filesystem::FileError::OK) {
*out_error = in_error;
- done_event->Signal();
+ CompleteWaitableEvent(done_event);
return;
}
+ // Add a dependency between the new directory we opened and the current
+ // waitable event.
+ dir->set_connection_error_handler(
+ base::Bind(&LevelDBFileThread::OnConnectionError, this));
+ waitable_event_dependencies_[done_event].directories.insert(dir.get());
+
// We move the object into the bind before we call. Copy to the stack.
filesystem::DirectoryPtr* local = dir.get();
(*local)->Flush(base::Bind(&LevelDBFileThread::OnSyncDirectoryComplete, this,
@@ -366,7 +445,7 @@ void LevelDBFileThread::OnSyncDirectoryComplete(
filesystem::FileError in_error) {
DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
*out_error = in_error;
- done_event->Signal();
+ CompleteWaitableEvent(done_event);
}
void LevelDBFileThread::FileExistsImpl(OpaqueDir* dir,
@@ -374,6 +453,10 @@ void LevelDBFileThread::FileExistsImpl(OpaqueDir* dir,
base::WaitableEvent* done_event,
bool* exists) {
DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
+
+ if (RegisterDirAndWaitableEvent(dir, done_event))
+ return;
+
dir->directory->Exists(
mojo::String::From(name),
base::Bind(&LevelDBFileThread::OnFileExistsComplete, this,
@@ -386,7 +469,7 @@ void LevelDBFileThread::OnFileExistsComplete(base::WaitableEvent* done_event,
bool in_exists) {
DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
*exists = in_exists;
- done_event->Signal();
+ CompleteWaitableEvent(done_event);
}
void LevelDBFileThread::GetChildrenImpl(OpaqueDir* dir,
@@ -396,6 +479,9 @@ void LevelDBFileThread::GetChildrenImpl(OpaqueDir* dir,
filesystem::FileError* out_error) {
DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
+ if (RegisterDirAndWaitableEvent(dir, done_event))
+ return;
+
// Step one: open the directory |name| from the toplevel directory.
scoped_ptr<filesystem::DirectoryPtr> target(new filesystem::DirectoryPtr);
mojo::InterfaceRequest<filesystem::Directory> proxy = GetProxy(target.get());
@@ -415,10 +501,16 @@ void LevelDBFileThread::OnGetChildrenOpened(
if (in_error != filesystem::FileError::OK) {
*out_error = in_error;
- done_event->Signal();
+ CompleteWaitableEvent(done_event);
return;
}
+ // Add a dependency between the new directory we opened and the current
+ // waitable event.
+ dir->set_connection_error_handler(
+ base::Bind(&LevelDBFileThread::OnConnectionError, this));
+ waitable_event_dependencies_[done_event].directories.insert(dir.get());
+
// We move the object into the bind before we call. Copy to the stack.
filesystem::DirectoryPtr* local = dir.get();
(*local)->Read(base::Bind(&LevelDBFileThread::OnGetChildrenComplete, this,
@@ -441,7 +533,7 @@ void LevelDBFileThread::OnGetChildrenComplete(
}
*out_error = in_error;
- done_event->Signal();
+ CompleteWaitableEvent(done_event);
}
void LevelDBFileThread::DeleteImpl(OpaqueDir* dir,
@@ -451,6 +543,9 @@ void LevelDBFileThread::DeleteImpl(OpaqueDir* dir,
filesystem::FileError* out_error) {
DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
+ if (RegisterDirAndWaitableEvent(dir, done_event))
+ return;
+
dir->directory->Delete(mojo::String::From(name), delete_flags,
base::Bind(&LevelDBFileThread::OnSimpleComplete, this,
done_event, out_error));
@@ -462,6 +557,9 @@ void LevelDBFileThread::CreateDirImpl(OpaqueDir* dir,
filesystem::FileError* out_error) {
DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
+ if (RegisterDirAndWaitableEvent(dir, done_event))
+ return;
+
dir->directory->OpenDirectory(
name, nullptr,
filesystem::kFlagRead | filesystem::kFlagWrite | filesystem::kFlagCreate,
@@ -475,6 +573,10 @@ void LevelDBFileThread::GetFileSizeImpl(OpaqueDir* dir,
base::WaitableEvent* done_event,
filesystem::FileError* out_error) {
DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
+
+ if (RegisterDirAndWaitableEvent(dir, done_event))
+ return;
+
dir->directory->StatFile(
path, base::Bind(&LevelDBFileThread::OnGetFileSizeImpl, this, file_size,
done_event, out_error));
@@ -490,7 +592,7 @@ void LevelDBFileThread::OnGetFileSizeImpl(
if (file_info)
*file_size = file_info->size;
*out_error = in_error;
- done_event->Signal();
+ CompleteWaitableEvent(done_event);
}
void LevelDBFileThread::RenameFileImpl(OpaqueDir* dir,
@@ -499,6 +601,10 @@ void LevelDBFileThread::RenameFileImpl(OpaqueDir* dir,
base::WaitableEvent* done_event,
filesystem::FileError* out_error) {
DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
+
+ if (RegisterDirAndWaitableEvent(dir, done_event))
+ return;
+
dir->directory->Rename(mojo::String::From(old_path),
mojo::String::From(new_path),
base::Bind(&LevelDBFileThread::OnSimpleComplete, this,
@@ -512,6 +618,9 @@ void LevelDBFileThread::LockFileImpl(OpaqueDir* dir,
OpaqueLock** out_lock) {
DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
+ if (RegisterDirAndWaitableEvent(dir, done_event))
+ return;
+
// Since a lock is associated with a file descriptor, we need to open and
// have a persistent file on the other side of the connection.
scoped_ptr<filesystem::FilePtr> target(new filesystem::FilePtr);
@@ -534,10 +643,17 @@ void LevelDBFileThread::OnOpenLockFileComplete(
if (in_error != filesystem::FileError::OK) {
*out_error = in_error;
- done_event->Signal();
+ CompleteWaitableEvent(done_event);
return;
}
+ // Add a dependency between the new file we opened and the current waitable
+ // event. (This dependency will get cleared in OnLockFileComplete if we
+ // complete this call safely.)
+ file->set_connection_error_handler(
+ base::Bind(&LevelDBFileThread::OnConnectionError, this));
+ waitable_event_dependencies_[done_event].files.insert(file.get());
+
filesystem::FilePtr* local = file.get();
(*local)->Lock(base::Bind(&LevelDBFileThread::OnLockFileComplete, this,
base::Passed(&file), done_event, out_error,
@@ -559,7 +675,7 @@ void LevelDBFileThread::OnLockFileComplete(scoped_ptr<filesystem::FilePtr> file,
*out_lock = l;
}
- done_event->Signal();
+ CompleteWaitableEvent(done_event);
}
void LevelDBFileThread::UnlockFileImpl(scoped_ptr<OpaqueLock> lock,
@@ -581,7 +697,7 @@ void LevelDBFileThread::OnUnlockFileCompleted(scoped_ptr<OpaqueLock> lock,
// get destructed on its own here.
DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
*out_error = in_error;
- done_event->Signal();
+ CompleteWaitableEvent(done_event);
}
void LevelDBFileThread::Init() {
« no previous file with comments | « components/leveldb/leveldb_file_thread.h ('k') | components/leveldb/leveldb_service_unittest.cc » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698