Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(765)

Side by Side Diff: components/leveldb/leveldb_file_thread.cc

Issue 1825413003: leveldb_service: Attempt to fix deadlock on shutdown. (Closed) Base URL: https://chromium.googlesource.com/chromium/src.git@master
Patch Set: style/lint Created 4 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2016 The Chromium Authors. All rights reserved. 1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 #include "components/leveldb/leveldb_file_thread.h" 5 #include "components/leveldb/leveldb_file_thread.h"
6 6
7 #include <set>
8
7 #include "base/bind.h" 9 #include "base/bind.h"
8 #include "mojo/message_pump/message_pump_mojo.h" 10 #include "mojo/message_pump/message_pump_mojo.h"
9 #include "mojo/platform_handle/platform_handle_functions.h" 11 #include "mojo/platform_handle/platform_handle_functions.h"
10 12
11 namespace leveldb { 13 namespace leveldb {
12 14
13 namespace { 15 namespace {
14 const char kLevelDBFileThreadName[] = "LevelDB_File_Thread"; 16 const char kLevelDBFileThreadName[] = "LevelDB_File_Thread";
15 } // namespace 17 } // namespace
16 18
17 struct LevelDBFileThread::OpaqueLock { 19 struct LevelDBFileThread::OpaqueLock {
18 filesystem::FilePtr lock_file; 20 filesystem::FilePtr lock_file;
19 }; 21 };
20 22
21 struct LevelDBFileThread::OpaqueDir { 23 struct LevelDBFileThread::OpaqueDir {
22 OpaqueDir(mojo::InterfacePtrInfo<filesystem::Directory> directory_info) { 24 explicit OpaqueDir(
25 mojo::InterfacePtrInfo<filesystem::Directory> directory_info) {
23 directory.Bind(std::move(directory_info)); 26 directory.Bind(std::move(directory_info));
24 } 27 }
25 28
26 filesystem::DirectoryPtr directory; 29 filesystem::DirectoryPtr directory;
27 }; 30 };
28 31
32 struct LevelDBFileThread::WaitableEventDependencies {
33 WaitableEventDependencies() {}
34 ~WaitableEventDependencies() {}
35 std::set<filesystem::DirectoryPtr*> directories;
36 std::set<filesystem::FilePtr*> files;
37 };
38
29 LevelDBFileThread::LevelDBFileThread() 39 LevelDBFileThread::LevelDBFileThread()
30 : base::Thread(kLevelDBFileThreadName), 40 : base::Thread(kLevelDBFileThreadName),
31 outstanding_opaque_dirs_(0) { 41 outstanding_opaque_dirs_(0) {
32 base::Thread::Options options; 42 base::Thread::Options options;
33 options.message_pump_factory = 43 options.message_pump_factory =
34 base::Bind(&mojo::common::MessagePumpMojo::Create); 44 base::Bind(&mojo::common::MessagePumpMojo::Create);
35 StartWithOptions(options); 45 StartWithOptions(options);
36 } 46 }
37 47
38 LevelDBFileThread::OpaqueDir* LevelDBFileThread::RegisterDirectory( 48 LevelDBFileThread::OpaqueDir* LevelDBFileThread::RegisterDirectory(
(...skipping 211 matching lines...) Expand 10 before | Expand all | Expand 10 after
250 done_event.Wait(); 260 done_event.Wait();
251 261
252 return error; 262 return error;
253 } 263 }
254 264
255 LevelDBFileThread::~LevelDBFileThread() { 265 LevelDBFileThread::~LevelDBFileThread() {
256 Stop(); 266 Stop();
257 DCHECK_EQ(0, outstanding_opaque_dirs_); 267 DCHECK_EQ(0, outstanding_opaque_dirs_);
258 } 268 }
259 269
270 bool LevelDBFileThread::RegisterDirAndWaitableEvent(
271 OpaqueDir* dir,
272 base::WaitableEvent* done_event) {
273 if (!dir->directory.is_bound()) {
274 // The directory went out of scope between the PostTask on the other thread
275 // and now.
276 done_event->Signal();
277 return true;
278 }
279
280 waitable_event_dependencies_[done_event].directories.insert(&dir->directory);
281 return false;
282 }
283
284 void LevelDBFileThread::CompleteWaitableEvent(base::WaitableEvent* done_event) {
285 // Clean up the dependencies that we no longer care about.
286 waitable_event_dependencies_.erase(done_event);
287 done_event->Signal();
288 }
289
290 void LevelDBFileThread::OnConnectionError() {
291 // One of our interface ptrs has become unbound. Signal the event which has
292 // it as a dependency.
293 auto it = waitable_event_dependencies_.begin();
294 while (it != waitable_event_dependencies_.end()) {
295 bool unbound_ptr_found = false;
296 for (const auto* dir : it->second.directories) {
297 if (!dir->is_bound()) {
298 unbound_ptr_found = true;
299 break;
300 }
301 }
302
303 if (!unbound_ptr_found) {
304 for (const auto* file : it->second.files) {
305 if (!file->is_bound()) {
306 unbound_ptr_found = true;
307 break;
308 }
309 }
310 }
311
312 if (unbound_ptr_found) {
313 base::WaitableEvent* e = it->first;
314 it = waitable_event_dependencies_.erase(it);
315 e->Signal();
316 } else {
317 ++it;
318 }
319 }
320 }
321
260 void LevelDBFileThread::OnSimpleComplete(base::WaitableEvent* done_event, 322 void LevelDBFileThread::OnSimpleComplete(base::WaitableEvent* done_event,
261 filesystem::FileError* out_error, 323 filesystem::FileError* out_error,
262 filesystem::FileError in_error) { 324 filesystem::FileError in_error) {
263 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); 325 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
264 *out_error = in_error; 326 *out_error = in_error;
265 done_event->Signal(); 327 CompleteWaitableEvent(done_event);
266 } 328 }
267 329
268 void LevelDBFileThread::RegisterDirectoryImpl( 330 void LevelDBFileThread::RegisterDirectoryImpl(
269 base::WaitableEvent* done_event, 331 base::WaitableEvent* done_event,
270 mojo::InterfacePtrInfo<filesystem::Directory> directory_info, 332 mojo::InterfacePtrInfo<filesystem::Directory> directory_info,
271 OpaqueDir** out_dir) { 333 OpaqueDir** out_dir) {
272 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); 334 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
273 335
274 // Take the Directory pipe and bind it on this thread. 336 // Take the Directory pipe and bind it on this thread.
275 *out_dir = new OpaqueDir(std::move(directory_info)); 337 *out_dir = new OpaqueDir(std::move(directory_info));
276 outstanding_opaque_dirs_++; 338 outstanding_opaque_dirs_++;
339
340 // Register the connection error handler for the resulting DirectoryPtr
341 (*out_dir)->directory.set_connection_error_handler(
342 base::Bind(&LevelDBFileThread::OnConnectionError, this));
343
277 done_event->Signal(); 344 done_event->Signal();
278 } 345 }
279 346
280 void LevelDBFileThread::UnregisterDirectoryImpl( 347 void LevelDBFileThread::UnregisterDirectoryImpl(
281 base::WaitableEvent* done_event, 348 base::WaitableEvent* done_event,
282 OpaqueDir* dir) { 349 OpaqueDir* dir) {
283 // Only delete the directories on the thread that owns them. 350 // Only delete the directories on the thread that owns them.
284 delete dir; 351 delete dir;
285 outstanding_opaque_dirs_--; 352 outstanding_opaque_dirs_--;
286 done_event->Signal(); 353 CompleteWaitableEvent(done_event);
287 } 354 }
288 355
289 void LevelDBFileThread::OpenFileHandleImpl(OpaqueDir* dir, 356 void LevelDBFileThread::OpenFileHandleImpl(OpaqueDir* dir,
290 std::string name, 357 std::string name,
291 base::WaitableEvent* done_event, 358 base::WaitableEvent* done_event,
292 uint32_t open_flags, 359 uint32_t open_flags,
293 base::File* out_file) { 360 base::File* out_file) {
294 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); 361 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
295 362
363 if (RegisterDirAndWaitableEvent(dir, done_event))
364 return;
365
296 dir->directory->OpenFileHandle( 366 dir->directory->OpenFileHandle(
297 mojo::String::From(name), open_flags, 367 mojo::String::From(name), open_flags,
298 base::Bind(&LevelDBFileThread::OnOpenFileHandleComplete, this, done_event, 368 base::Bind(&LevelDBFileThread::OnOpenFileHandleComplete, this, done_event,
299 out_file)); 369 out_file));
300 } 370 }
301 371
302 void LevelDBFileThread::OnOpenFileHandleComplete( 372 void LevelDBFileThread::OnOpenFileHandleComplete(
303 base::WaitableEvent* done_event, 373 base::WaitableEvent* done_event,
304 base::File* output_file, 374 base::File* output_file,
305 filesystem::FileError err, 375 filesystem::FileError err,
306 mojo::ScopedHandle handle) { 376 mojo::ScopedHandle handle) {
307 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); 377 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
308 378
309 if (err != filesystem::FileError::OK) { 379 if (err != filesystem::FileError::OK) {
310 *output_file = base::File(static_cast<base::File::Error>(err)); 380 *output_file = base::File(static_cast<base::File::Error>(err));
311 } else { 381 } else {
312 MojoPlatformHandle platform_handle; 382 MojoPlatformHandle platform_handle;
313 MojoResult extract_result = 383 MojoResult extract_result =
314 MojoExtractPlatformHandle(handle.release().value(), &platform_handle); 384 MojoExtractPlatformHandle(handle.release().value(), &platform_handle);
315 385
316 if (extract_result == MOJO_RESULT_OK) { 386 if (extract_result == MOJO_RESULT_OK) {
317 *output_file = base::File(platform_handle); 387 *output_file = base::File(platform_handle);
318 } else { 388 } else {
319 NOTREACHED(); 389 NOTREACHED();
320 *output_file = base::File(base::File::Error::FILE_ERROR_FAILED); 390 *output_file = base::File(base::File::Error::FILE_ERROR_FAILED);
321 } 391 }
322 } 392 }
323 393
324 done_event->Signal(); 394 CompleteWaitableEvent(done_event);
325 } 395 }
326 396
327 void LevelDBFileThread::SyncDirectoryImpl(OpaqueDir* dir, 397 void LevelDBFileThread::SyncDirectoryImpl(OpaqueDir* dir,
328 std::string name, 398 std::string name,
329 base::WaitableEvent* done_event, 399 base::WaitableEvent* done_event,
330 filesystem::FileError* out_error) { 400 filesystem::FileError* out_error) {
331 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); 401 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
332 402
403 if (RegisterDirAndWaitableEvent(dir, done_event))
404 return;
405
333 // Step one: open the directory |name| from the toplevel directory. 406 // Step one: open the directory |name| from the toplevel directory.
334 scoped_ptr<filesystem::DirectoryPtr> target(new filesystem::DirectoryPtr); 407 scoped_ptr<filesystem::DirectoryPtr> target(new filesystem::DirectoryPtr);
335 408
336 dir->directory->OpenDirectory( 409 dir->directory->OpenDirectory(
337 name, GetProxy(target.get()), 410 name, GetProxy(target.get()),
338 filesystem::kFlagRead | filesystem::kFlagWrite, 411 filesystem::kFlagRead | filesystem::kFlagWrite,
339 base::Bind(&LevelDBFileThread::OnSyncDirctoryOpened, this, 412 base::Bind(&LevelDBFileThread::OnSyncDirctoryOpened, this,
340 base::Passed(&target), done_event, out_error)); 413 base::Passed(&target), done_event, out_error));
341 } 414 }
342 415
343 void LevelDBFileThread::OnSyncDirctoryOpened( 416 void LevelDBFileThread::OnSyncDirctoryOpened(
344 scoped_ptr<filesystem::DirectoryPtr> dir, 417 scoped_ptr<filesystem::DirectoryPtr> dir,
345 base::WaitableEvent* done_event, 418 base::WaitableEvent* done_event,
346 filesystem::FileError* out_error, 419 filesystem::FileError* out_error,
347 filesystem::FileError in_error) { 420 filesystem::FileError in_error) {
348 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); 421 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
349 422
350 if (in_error != filesystem::FileError::OK) { 423 if (in_error != filesystem::FileError::OK) {
351 *out_error = in_error; 424 *out_error = in_error;
352 done_event->Signal(); 425 CompleteWaitableEvent(done_event);
353 return; 426 return;
354 } 427 }
355 428
429 // Add a dependency between the new directory we opened and the current
430 // waitable event.
431 dir->set_connection_error_handler(
432 base::Bind(&LevelDBFileThread::OnConnectionError, this));
433 waitable_event_dependencies_[done_event].directories.insert(dir.get());
434
356 // We move the object into the bind before we call. Copy to the stack. 435 // We move the object into the bind before we call. Copy to the stack.
357 filesystem::DirectoryPtr* local = dir.get(); 436 filesystem::DirectoryPtr* local = dir.get();
358 (*local)->Flush(base::Bind(&LevelDBFileThread::OnSyncDirectoryComplete, this, 437 (*local)->Flush(base::Bind(&LevelDBFileThread::OnSyncDirectoryComplete, this,
359 base::Passed(&dir), done_event, out_error)); 438 base::Passed(&dir), done_event, out_error));
360 } 439 }
361 440
362 void LevelDBFileThread::OnSyncDirectoryComplete( 441 void LevelDBFileThread::OnSyncDirectoryComplete(
363 scoped_ptr<filesystem::DirectoryPtr> dir, 442 scoped_ptr<filesystem::DirectoryPtr> dir,
364 base::WaitableEvent* done_event, 443 base::WaitableEvent* done_event,
365 filesystem::FileError* out_error, 444 filesystem::FileError* out_error,
366 filesystem::FileError in_error) { 445 filesystem::FileError in_error) {
367 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); 446 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
368 *out_error = in_error; 447 *out_error = in_error;
369 done_event->Signal(); 448 CompleteWaitableEvent(done_event);
370 } 449 }
371 450
372 void LevelDBFileThread::FileExistsImpl(OpaqueDir* dir, 451 void LevelDBFileThread::FileExistsImpl(OpaqueDir* dir,
373 std::string name, 452 std::string name,
374 base::WaitableEvent* done_event, 453 base::WaitableEvent* done_event,
375 bool* exists) { 454 bool* exists) {
376 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); 455 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
456
457 if (RegisterDirAndWaitableEvent(dir, done_event))
458 return;
459
377 dir->directory->Exists( 460 dir->directory->Exists(
378 mojo::String::From(name), 461 mojo::String::From(name),
379 base::Bind(&LevelDBFileThread::OnFileExistsComplete, this, 462 base::Bind(&LevelDBFileThread::OnFileExistsComplete, this,
380 done_event, exists)); 463 done_event, exists));
381 } 464 }
382 465
383 void LevelDBFileThread::OnFileExistsComplete(base::WaitableEvent* done_event, 466 void LevelDBFileThread::OnFileExistsComplete(base::WaitableEvent* done_event,
384 bool* exists, 467 bool* exists,
385 filesystem::FileError err, 468 filesystem::FileError err,
386 bool in_exists) { 469 bool in_exists) {
387 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); 470 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
388 *exists = in_exists; 471 *exists = in_exists;
389 done_event->Signal(); 472 CompleteWaitableEvent(done_event);
390 } 473 }
391 474
392 void LevelDBFileThread::GetChildrenImpl(OpaqueDir* dir, 475 void LevelDBFileThread::GetChildrenImpl(OpaqueDir* dir,
393 std::string name, 476 std::string name,
394 std::vector<std::string>* contents, 477 std::vector<std::string>* contents,
395 base::WaitableEvent* done_event, 478 base::WaitableEvent* done_event,
396 filesystem::FileError* out_error) { 479 filesystem::FileError* out_error) {
397 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); 480 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
398 481
482 if (RegisterDirAndWaitableEvent(dir, done_event))
483 return;
484
399 // Step one: open the directory |name| from the toplevel directory. 485 // Step one: open the directory |name| from the toplevel directory.
400 scoped_ptr<filesystem::DirectoryPtr> target(new filesystem::DirectoryPtr); 486 scoped_ptr<filesystem::DirectoryPtr> target(new filesystem::DirectoryPtr);
401 mojo::InterfaceRequest<filesystem::Directory> proxy = GetProxy(target.get()); 487 mojo::InterfaceRequest<filesystem::Directory> proxy = GetProxy(target.get());
402 dir->directory->OpenDirectory( 488 dir->directory->OpenDirectory(
403 name, std::move(proxy), filesystem::kFlagRead | filesystem::kFlagWrite, 489 name, std::move(proxy), filesystem::kFlagRead | filesystem::kFlagWrite,
404 base::Bind(&LevelDBFileThread::OnGetChildrenOpened, this, 490 base::Bind(&LevelDBFileThread::OnGetChildrenOpened, this,
405 base::Passed(&target), contents, done_event, out_error)); 491 base::Passed(&target), contents, done_event, out_error));
406 } 492 }
407 493
408 void LevelDBFileThread::OnGetChildrenOpened( 494 void LevelDBFileThread::OnGetChildrenOpened(
409 scoped_ptr<filesystem::DirectoryPtr> dir, 495 scoped_ptr<filesystem::DirectoryPtr> dir,
410 std::vector<std::string>* contents, 496 std::vector<std::string>* contents,
411 base::WaitableEvent* done_event, 497 base::WaitableEvent* done_event,
412 filesystem::FileError* out_error, 498 filesystem::FileError* out_error,
413 filesystem::FileError in_error) { 499 filesystem::FileError in_error) {
414 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); 500 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
415 501
416 if (in_error != filesystem::FileError::OK) { 502 if (in_error != filesystem::FileError::OK) {
417 *out_error = in_error; 503 *out_error = in_error;
418 done_event->Signal(); 504 CompleteWaitableEvent(done_event);
419 return; 505 return;
420 } 506 }
421 507
508 // Add a dependency between the new directory we opened and the current
509 // waitable event.
510 dir->set_connection_error_handler(
511 base::Bind(&LevelDBFileThread::OnConnectionError, this));
512 waitable_event_dependencies_[done_event].directories.insert(dir.get());
513
422 // We move the object into the bind before we call. Copy to the stack. 514 // We move the object into the bind before we call. Copy to the stack.
423 filesystem::DirectoryPtr* local = dir.get(); 515 filesystem::DirectoryPtr* local = dir.get();
424 (*local)->Read(base::Bind(&LevelDBFileThread::OnGetChildrenComplete, this, 516 (*local)->Read(base::Bind(&LevelDBFileThread::OnGetChildrenComplete, this,
425 base::Passed(&dir), contents, done_event, 517 base::Passed(&dir), contents, done_event,
426 out_error)); 518 out_error));
427 } 519 }
428 520
429 void LevelDBFileThread::OnGetChildrenComplete( 521 void LevelDBFileThread::OnGetChildrenComplete(
430 scoped_ptr<filesystem::DirectoryPtr> dir, 522 scoped_ptr<filesystem::DirectoryPtr> dir,
431 std::vector<std::string>* out_contents, 523 std::vector<std::string>* out_contents,
432 base::WaitableEvent* done_event, 524 base::WaitableEvent* done_event,
433 filesystem::FileError* out_error, 525 filesystem::FileError* out_error,
434 filesystem::FileError in_error, 526 filesystem::FileError in_error,
435 mojo::Array<filesystem::DirectoryEntryPtr> directory_contents) { 527 mojo::Array<filesystem::DirectoryEntryPtr> directory_contents) {
436 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); 528 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
437 529
438 if (!directory_contents.is_null()) { 530 if (!directory_contents.is_null()) {
439 for (size_t i = 0; i < directory_contents.size(); ++i) 531 for (size_t i = 0; i < directory_contents.size(); ++i)
440 out_contents->push_back(directory_contents[i]->name.To<std::string>()); 532 out_contents->push_back(directory_contents[i]->name.To<std::string>());
441 } 533 }
442 534
443 *out_error = in_error; 535 *out_error = in_error;
444 done_event->Signal(); 536 CompleteWaitableEvent(done_event);
445 } 537 }
446 538
447 void LevelDBFileThread::DeleteImpl(OpaqueDir* dir, 539 void LevelDBFileThread::DeleteImpl(OpaqueDir* dir,
448 std::string name, 540 std::string name,
449 uint32_t delete_flags, 541 uint32_t delete_flags,
450 base::WaitableEvent* done_event, 542 base::WaitableEvent* done_event,
451 filesystem::FileError* out_error) { 543 filesystem::FileError* out_error) {
452 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); 544 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
453 545
546 if (RegisterDirAndWaitableEvent(dir, done_event))
547 return;
548
454 dir->directory->Delete(mojo::String::From(name), delete_flags, 549 dir->directory->Delete(mojo::String::From(name), delete_flags,
455 base::Bind(&LevelDBFileThread::OnSimpleComplete, this, 550 base::Bind(&LevelDBFileThread::OnSimpleComplete, this,
456 done_event, out_error)); 551 done_event, out_error));
457 } 552 }
458 553
459 void LevelDBFileThread::CreateDirImpl(OpaqueDir* dir, 554 void LevelDBFileThread::CreateDirImpl(OpaqueDir* dir,
460 std::string name, 555 std::string name,
461 base::WaitableEvent* done_event, 556 base::WaitableEvent* done_event,
462 filesystem::FileError* out_error) { 557 filesystem::FileError* out_error) {
463 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); 558 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
464 559
560 if (RegisterDirAndWaitableEvent(dir, done_event))
561 return;
562
465 dir->directory->OpenDirectory( 563 dir->directory->OpenDirectory(
466 name, nullptr, 564 name, nullptr,
467 filesystem::kFlagRead | filesystem::kFlagWrite | filesystem::kFlagCreate, 565 filesystem::kFlagRead | filesystem::kFlagWrite | filesystem::kFlagCreate,
468 base::Bind(&LevelDBFileThread::OnSimpleComplete, this, done_event, 566 base::Bind(&LevelDBFileThread::OnSimpleComplete, this, done_event,
469 out_error)); 567 out_error));
470 } 568 }
471 569
472 void LevelDBFileThread::GetFileSizeImpl(OpaqueDir* dir, 570 void LevelDBFileThread::GetFileSizeImpl(OpaqueDir* dir,
473 const std::string& path, 571 const std::string& path,
474 uint64_t* file_size, 572 uint64_t* file_size,
475 base::WaitableEvent* done_event, 573 base::WaitableEvent* done_event,
476 filesystem::FileError* out_error) { 574 filesystem::FileError* out_error) {
477 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); 575 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
576
577 if (RegisterDirAndWaitableEvent(dir, done_event))
578 return;
579
478 dir->directory->StatFile( 580 dir->directory->StatFile(
479 path, base::Bind(&LevelDBFileThread::OnGetFileSizeImpl, this, file_size, 581 path, base::Bind(&LevelDBFileThread::OnGetFileSizeImpl, this, file_size,
480 done_event, out_error)); 582 done_event, out_error));
481 } 583 }
482 584
483 void LevelDBFileThread::OnGetFileSizeImpl( 585 void LevelDBFileThread::OnGetFileSizeImpl(
484 uint64_t* file_size, 586 uint64_t* file_size,
485 base::WaitableEvent* done_event, 587 base::WaitableEvent* done_event,
486 filesystem::FileError* out_error, 588 filesystem::FileError* out_error,
487 filesystem::FileError in_error, 589 filesystem::FileError in_error,
488 filesystem::FileInformationPtr file_info) { 590 filesystem::FileInformationPtr file_info) {
489 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); 591 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
490 if (file_info) 592 if (file_info)
491 *file_size = file_info->size; 593 *file_size = file_info->size;
492 *out_error = in_error; 594 *out_error = in_error;
493 done_event->Signal(); 595 CompleteWaitableEvent(done_event);
494 } 596 }
495 597
496 void LevelDBFileThread::RenameFileImpl(OpaqueDir* dir, 598 void LevelDBFileThread::RenameFileImpl(OpaqueDir* dir,
497 const std::string& old_path, 599 const std::string& old_path,
498 const std::string& new_path, 600 const std::string& new_path,
499 base::WaitableEvent* done_event, 601 base::WaitableEvent* done_event,
500 filesystem::FileError* out_error) { 602 filesystem::FileError* out_error) {
501 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); 603 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
604
605 if (RegisterDirAndWaitableEvent(dir, done_event))
606 return;
607
502 dir->directory->Rename(mojo::String::From(old_path), 608 dir->directory->Rename(mojo::String::From(old_path),
503 mojo::String::From(new_path), 609 mojo::String::From(new_path),
504 base::Bind(&LevelDBFileThread::OnSimpleComplete, this, 610 base::Bind(&LevelDBFileThread::OnSimpleComplete, this,
505 done_event, out_error)); 611 done_event, out_error));
506 } 612 }
507 613
508 void LevelDBFileThread::LockFileImpl(OpaqueDir* dir, 614 void LevelDBFileThread::LockFileImpl(OpaqueDir* dir,
509 const std::string& path, 615 const std::string& path,
510 base::WaitableEvent* done_event, 616 base::WaitableEvent* done_event,
511 filesystem::FileError* out_error, 617 filesystem::FileError* out_error,
512 OpaqueLock** out_lock) { 618 OpaqueLock** out_lock) {
513 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); 619 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
514 620
621 if (RegisterDirAndWaitableEvent(dir, done_event))
622 return;
623
515 // Since a lock is associated with a file descriptor, we need to open and 624 // Since a lock is associated with a file descriptor, we need to open and
516 // have a persistent file on the other side of the connection. 625 // have a persistent file on the other side of the connection.
517 scoped_ptr<filesystem::FilePtr> target(new filesystem::FilePtr); 626 scoped_ptr<filesystem::FilePtr> target(new filesystem::FilePtr);
518 mojo::InterfaceRequest<filesystem::File> proxy = GetProxy(target.get()); 627 mojo::InterfaceRequest<filesystem::File> proxy = GetProxy(target.get());
519 dir->directory->OpenFile( 628 dir->directory->OpenFile(
520 mojo::String::From(path), std::move(proxy), 629 mojo::String::From(path), std::move(proxy),
521 filesystem::kFlagOpenAlways | filesystem::kFlagRead | 630 filesystem::kFlagOpenAlways | filesystem::kFlagRead |
522 filesystem::kFlagWrite, 631 filesystem::kFlagWrite,
523 base::Bind(&LevelDBFileThread::OnOpenLockFileComplete, this, 632 base::Bind(&LevelDBFileThread::OnOpenLockFileComplete, this,
524 base::Passed(&target), done_event, out_error, out_lock)); 633 base::Passed(&target), done_event, out_error, out_lock));
525 } 634 }
526 635
527 void LevelDBFileThread::OnOpenLockFileComplete( 636 void LevelDBFileThread::OnOpenLockFileComplete(
528 scoped_ptr<filesystem::FilePtr> file, 637 scoped_ptr<filesystem::FilePtr> file,
529 base::WaitableEvent* done_event, 638 base::WaitableEvent* done_event,
530 filesystem::FileError* out_error, 639 filesystem::FileError* out_error,
531 OpaqueLock** out_lock, 640 OpaqueLock** out_lock,
532 filesystem::FileError in_error) { 641 filesystem::FileError in_error) {
533 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); 642 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
534 643
535 if (in_error != filesystem::FileError::OK) { 644 if (in_error != filesystem::FileError::OK) {
536 *out_error = in_error; 645 *out_error = in_error;
537 done_event->Signal(); 646 CompleteWaitableEvent(done_event);
538 return; 647 return;
539 } 648 }
540 649
650 // Add a dependency between the new file we opened and the current waitable
651 // event. (This dependency will get cleared in OnLockFileCompelte if we
jam 2016/03/24 17:27:39 nit: OnLockFileComplete
652 // complete this call safely.)
653 file->set_connection_error_handler(
654 base::Bind(&LevelDBFileThread::OnConnectionError, this));
655 waitable_event_dependencies_[done_event].files.insert(file.get());
656
541 filesystem::FilePtr* local = file.get(); 657 filesystem::FilePtr* local = file.get();
542 (*local)->Lock(base::Bind(&LevelDBFileThread::OnLockFileComplete, this, 658 (*local)->Lock(base::Bind(&LevelDBFileThread::OnLockFileComplete, this,
543 base::Passed(&file), done_event, out_error, 659 base::Passed(&file), done_event, out_error,
544 out_lock)); 660 out_lock));
545 } 661 }
546 662
547 void LevelDBFileThread::OnLockFileComplete(scoped_ptr<filesystem::FilePtr> file, 663 void LevelDBFileThread::OnLockFileComplete(scoped_ptr<filesystem::FilePtr> file,
548 base::WaitableEvent* done_event, 664 base::WaitableEvent* done_event,
549 filesystem::FileError* out_error, 665 filesystem::FileError* out_error,
550 OpaqueLock** out_lock, 666 OpaqueLock** out_lock,
551 filesystem::FileError in_error) { 667 filesystem::FileError in_error) {
552 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); 668 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
553 669
554 *out_error = in_error; 670 *out_error = in_error;
555 671
556 if (in_error == filesystem::FileError::OK) { 672 if (in_error == filesystem::FileError::OK) {
557 OpaqueLock* l = new OpaqueLock; 673 OpaqueLock* l = new OpaqueLock;
558 l->lock_file = std::move(*file); 674 l->lock_file = std::move(*file);
559 *out_lock = l; 675 *out_lock = l;
560 } 676 }
561 677
562 done_event->Signal(); 678 CompleteWaitableEvent(done_event);
563 } 679 }
564 680
565 void LevelDBFileThread::UnlockFileImpl(scoped_ptr<OpaqueLock> lock, 681 void LevelDBFileThread::UnlockFileImpl(scoped_ptr<OpaqueLock> lock,
566 base::WaitableEvent* done_event, 682 base::WaitableEvent* done_event,
567 filesystem::FileError* out_error) { 683 filesystem::FileError* out_error) {
568 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); 684 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
569 685
570 OpaqueLock* local = lock.get(); 686 OpaqueLock* local = lock.get();
571 local->lock_file->Unlock(base::Bind(&LevelDBFileThread::OnUnlockFileCompleted, 687 local->lock_file->Unlock(base::Bind(&LevelDBFileThread::OnUnlockFileCompleted,
572 this, base::Passed(&lock), done_event, 688 this, base::Passed(&lock), done_event,
573 out_error)); 689 out_error));
574 } 690 }
575 691
576 void LevelDBFileThread::OnUnlockFileCompleted(scoped_ptr<OpaqueLock> lock, 692 void LevelDBFileThread::OnUnlockFileCompleted(scoped_ptr<OpaqueLock> lock,
577 base::WaitableEvent* done_event, 693 base::WaitableEvent* done_event,
578 filesystem::FileError* out_error, 694 filesystem::FileError* out_error,
579 filesystem::FileError in_error) { 695 filesystem::FileError in_error) {
580 // We're passed the OpauqeLock here for ownership reasons, but it's going to 696 // We're passed the OpauqeLock here for ownership reasons, but it's going to
581 // get destructed on its own here. 697 // get destructed on its own here.
582 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId()); 698 DCHECK_EQ(GetThreadId(), base::PlatformThread::CurrentId());
583 *out_error = in_error; 699 *out_error = in_error;
584 done_event->Signal(); 700 CompleteWaitableEvent(done_event);
585 } 701 }
586 702
587 void LevelDBFileThread::Init() { 703 void LevelDBFileThread::Init() {
588 } 704 }
589 705
590 void LevelDBFileThread::CleanUp() { 706 void LevelDBFileThread::CleanUp() {
591 } 707 }
592 708
593 } // namespace leveldb 709 } // namespace leveldb
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698