Chromium Code Reviews| Index: components/leveldb_proto/core/proto_database_impl.h |
| diff --git a/components/leveldb_proto/core/proto_database_impl.h b/components/leveldb_proto/core/proto_database_impl.h |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..b47e738562a0b8301bca49d4e41b9272b5c10f2b |
| --- /dev/null |
| +++ b/components/leveldb_proto/core/proto_database_impl.h |
| @@ -0,0 +1,312 @@ |
| +// Copyright 2014 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +#ifndef COMPONENTS_LEVELDB_PROTO_CORE_PROTO_DATABASE_IMPL_H_ |
| +#define COMPONENTS_LEVELDB_PROTO_CORE_PROTO_DATABASE_IMPL_H_ |
| + |
| +#include <string> |
| +#include <vector> |
| + |
| +#include "base/bind.h" |
| +#include "base/file_util.h" |
| +#include "base/files/file_path.h" |
| +#include "base/memory/scoped_ptr.h" |
| +#include "base/message_loop/message_loop.h" |
| +#include "base/sequenced_task_runner.h" |
| +#include "base/strings/string_util.h" |
| +#include "base/threading/sequenced_worker_pool.h" |
| +#include "base/threading/thread_checker.h" |
| +#include "base/threading/thread_collision_warner.h" |
| +#include "components/leveldb_proto/core/proto_database.h" |
| +#include "third_party/leveldatabase/src/include/leveldb/db.h" |
| +#include "third_party/leveldatabase/src/include/leveldb/iterator.h" |
| +#include "third_party/leveldatabase/src/include/leveldb/options.h" |
| +#include "third_party/leveldatabase/src/include/leveldb/slice.h" |
| +#include "third_party/leveldatabase/src/include/leveldb/status.h" |
| +#include "third_party/leveldatabase/src/include/leveldb/write_batch.h" |
| + |
| +namespace base { |
| +class SequencedTaskRunner; |
|
cjhopman
2014/06/13 19:41:05
Don't need these forward declarations anymore.
Mathieu
2014/06/13 22:07:07
Done.
|
| +class MessageLoop; |
| +} |
| + |
| +namespace leveldb { |
| +class DB; |
|
cjhopman
2014/06/13 19:41:05
Don't need this one either.
Mathieu
2014/06/13 22:07:07
Done.
|
| +} |
| + |
| +namespace leveldb_proto { |
| + |
| +// When the ProtoDatabaseImpl instance is deleted, in-progress asynchronous |
| +// operations will be completed and the corresponding callbacks will be called. |
|
cjhopman
2014/06/13 19:41:05
Maybe point out that construction/calls/destructio
Mathieu
2014/06/13 22:07:07
Done.
|
| +template <typename T> |
| +class ProtoDatabaseImpl : public ProtoDatabase<T> { |
| + public: |
| + // The underlying database. Calls to this type may be blocking. |
| + class Database { |
|
cjhopman
2014/06/13 19:41:05
I think that the SerializeAsString() and ParseFrom
Mathieu
2014/06/13 22:07:07
Gotcha. The Database no longer depends on proto. T
|
| + public: |
| + virtual bool Init(const base::FilePath& database_dir) = 0; |
| + virtual bool Save( |
| + const typename ProtoDatabase<T>::KeyEntryVector& entries_to_save, |
| + const std::vector<std::string>& keys_to_remove) = 0; |
| + virtual bool Load(std::vector<T>* entries) = 0; |
| + virtual ~Database() {} |
| + }; |
| + |
| + // Once constructed, function calls and destruction should all occur on the |
| + // same thread (not necessarily the same as the constructor). |
| + class LevelDB : public Database { |
| + public: |
| + LevelDB(); |
| + virtual ~LevelDB(); |
| + virtual bool Init(const base::FilePath& database_dir) OVERRIDE; |
| + virtual bool Save( |
| + const typename ProtoDatabase<T>::KeyEntryVector& entries_to_save, |
| + const std::vector<std::string>& keys_to_remove) OVERRIDE; |
| + virtual bool Load(std::vector<T>* entries) OVERRIDE; |
| + |
| + private: |
| + DFAKE_MUTEX(thread_checker_); |
| + scoped_ptr<leveldb::DB> db_; |
| + }; |
| + |
| + explicit ProtoDatabaseImpl( |
| + scoped_refptr<base::SequencedTaskRunner> task_runner); |
| + |
| + virtual ~ProtoDatabaseImpl(); |
| + |
| + // ProtoDatabase implementation. |
| + virtual void Init(const base::FilePath& database_dir, |
|
cjhopman
2014/06/13 19:41:05
I don't think I like that this Init() is exposed t
Mathieu
2014/06/13 22:07:07
Ack. Added a TODO with you as point of contact.
|
| + typename ProtoDatabase<T>::InitCallback callback) OVERRIDE; |
| + virtual void UpdateEntries( |
| + scoped_ptr<typename ProtoDatabase<T>::KeyEntryVector> entries_to_save, |
| + scoped_ptr<std::vector<std::string> > keys_to_remove, |
| + typename ProtoDatabase<T>::UpdateCallback callback) OVERRIDE; |
| + virtual void LoadEntries( |
| + typename ProtoDatabase<T>::LoadCallback callback) OVERRIDE; |
| + |
| + // Allow callers to provide their own Database implementation. |
| + void InitWithDatabase(scoped_ptr<Database> database, |
| + const base::FilePath& database_dir, |
| + typename ProtoDatabase<T>::InitCallback callback); |
| + |
| + private: |
| + base::ThreadChecker thread_checker_; |
| + |
| + // Used to run blocking tasks in-order. |
| + scoped_refptr<base::SequencedTaskRunner> task_runner_; |
| + |
| + scoped_ptr<Database> db_; |
| + |
| + DISALLOW_COPY_AND_ASSIGN(ProtoDatabaseImpl); |
| +}; |
| + |
| +using base::MessageLoop; |
|
cjhopman
2014/06/13 19:41:05
Remove these usings
Mathieu
2014/06/13 22:07:07
Done.
|
| +using base::SequencedTaskRunner; |
| + |
| +template <typename T> |
| +ProtoDatabaseImpl<T>::LevelDB::LevelDB() {} |
| + |
| +template <typename T> |
| +ProtoDatabaseImpl<T>::LevelDB::~LevelDB() { |
| + DFAKE_SCOPED_LOCK(thread_checker_); |
| +} |
| + |
| +template <typename T> |
| +bool ProtoDatabaseImpl<T>::LevelDB::Init(const base::FilePath& database_dir) { |
| + DFAKE_SCOPED_LOCK(thread_checker_); |
| + |
| + leveldb::Options options; |
| + options.create_if_missing = true; |
| + options.max_open_files = 0; // Use minimum. |
| + |
| + std::string path = database_dir.AsUTF8Unsafe(); |
| + |
| + leveldb::DB* db = NULL; |
| + leveldb::Status status = leveldb::DB::Open(options, path, &db); |
| + if (status.IsCorruption()) { |
| + base::DeleteFile(database_dir, true); |
| + status = leveldb::DB::Open(options, path, &db); |
| + } |
| + |
| + if (status.ok()) { |
| + CHECK(db); |
| + db_.reset(db); |
| + return true; |
| + } |
| + |
| + LOG(WARNING) << "Unable to open " << database_dir.value() << ": " |
| + << status.ToString(); |
| + return false; |
| +} |
| + |
| +template <typename T> |
| +bool ProtoDatabaseImpl<T>::LevelDB::Save( |
| + const typename ProtoDatabase<T>::KeyEntryVector& entries_to_save, |
| + const std::vector<std::string>& keys_to_remove) { |
| + DFAKE_SCOPED_LOCK(thread_checker_); |
| + |
| + leveldb::WriteBatch updates; |
| + for (typename ProtoDatabase<T>::KeyEntryVector::const_iterator it = |
| + entries_to_save.begin(); |
| + it != entries_to_save.end(); ++it) { |
| + updates.Put(leveldb::Slice(it->first), |
| + leveldb::Slice(it->second.SerializeAsString())); |
| + } |
| + for (std::vector<std::string>::const_iterator it = keys_to_remove.begin(); |
| + it != keys_to_remove.end(); ++it) { |
| + updates.Delete(leveldb::Slice(*it)); |
| + } |
| + |
| + leveldb::WriteOptions options; |
| + options.sync = true; |
| + leveldb::Status status = db_->Write(options, &updates); |
| + if (status.ok()) return true; |
| + |
| + DLOG(WARNING) << "Failed writing leveldb_proto entries: " |
| + << status.ToString(); |
| + return false; |
| +} |
| + |
| +template <typename T> |
| +bool ProtoDatabaseImpl<T>::LevelDB::Load(std::vector<T>* entries) { |
| + DFAKE_SCOPED_LOCK(thread_checker_); |
| + |
| + leveldb::ReadOptions options; |
| + scoped_ptr<leveldb::Iterator> db_iterator(db_->NewIterator(options)); |
| + for (db_iterator->SeekToFirst(); db_iterator->Valid(); db_iterator->Next()) { |
| + leveldb::Slice value_slice = db_iterator->value(); |
| + |
| + T entry; |
| + if (!entry.ParseFromArray(value_slice.data(), value_slice.size())) { |
| + DLOG(WARNING) << "Unable to parse leveldb_proto entry " |
| + << db_iterator->key().ToString(); |
| + // TODO(cjhopman): Decide what to do about un-parseable entries. |
| + } |
| + entries->push_back(entry); |
| + } |
| + return true; |
| +} |
| + |
| +namespace { |
| + |
| +template <typename T> |
| +void RunInitCallback(typename ProtoDatabase<T>::InitCallback callback, |
| + const bool* success) { |
| + callback.Run(*success); |
| +} |
| + |
| +template <typename T> |
| +void RunUpdateCallback(typename ProtoDatabase<T>::UpdateCallback callback, |
| + const bool* success) { |
| + callback.Run(*success); |
| +} |
| + |
| +template <typename T> |
| +void RunLoadCallback(typename ProtoDatabase<T>::LoadCallback callback, |
| + const bool* success, scoped_ptr<std::vector<T> > entries) { |
| + callback.Run(*success, entries.Pass()); |
| +} |
| + |
| +template <typename T> |
| +void InitFromTaskRunner(typename ProtoDatabaseImpl<T>::Database* database, |
| + const base::FilePath& database_dir, bool* success) { |
| + DCHECK(success); |
| + |
| + // TODO(cjhopman): Histogram for database size. |
| + *success = database->Init(database_dir); |
| +} |
| + |
| +template <typename T> |
| +void UpdateEntriesFromTaskRunner( |
| + typename ProtoDatabaseImpl<T>::Database* database, |
| + scoped_ptr<typename ProtoDatabase<T>::KeyEntryVector> entries_to_save, |
| + scoped_ptr<std::vector<std::string> > keys_to_remove, bool* success) { |
| + DCHECK(success); |
| + *success = database->Save(*entries_to_save, *keys_to_remove); |
| +} |
| + |
| +template <typename T> |
| +void LoadEntriesFromTaskRunner( |
| + typename ProtoDatabaseImpl<T>::Database* database, std::vector<T>* entries, |
| + bool* success) { |
| + DCHECK(success); |
| + DCHECK(entries); |
| + |
| + entries->clear(); |
| + *success = database->Load(entries); |
| +} |
| + |
| +} // namespace |
| + |
| +template <typename T> |
| +ProtoDatabaseImpl<T>::ProtoDatabaseImpl( |
| + scoped_refptr<base::SequencedTaskRunner> task_runner) |
| + : task_runner_(task_runner) {} |
| + |
| +template <typename T> |
| +ProtoDatabaseImpl<T>::~ProtoDatabaseImpl() { |
| + DCHECK(thread_checker_.CalledOnValidThread()); |
| + if (!task_runner_->DeleteSoon(FROM_HERE, db_.release())) { |
| + DLOG(WARNING) << "DOM distiller database will not be deleted."; |
| + } |
| +} |
| + |
| +template <typename T> |
| +void ProtoDatabaseImpl<T>::Init( |
| + const base::FilePath& database_dir, |
| + typename ProtoDatabase<T>::InitCallback callback) { |
| + DCHECK(thread_checker_.CalledOnValidThread()); |
| + InitWithDatabase(scoped_ptr<Database>(new LevelDB()), database_dir, callback); |
| +} |
| + |
| +template <typename T> |
| +void ProtoDatabaseImpl<T>::InitWithDatabase( |
| + scoped_ptr<Database> database, const base::FilePath& database_dir, |
| + typename ProtoDatabase<T>::InitCallback callback) { |
| + DCHECK(thread_checker_.CalledOnValidThread()); |
| + DCHECK(!db_); |
| + DCHECK(database); |
| + db_.reset(database.release()); |
| + bool* success = new bool(false); |
| + task_runner_->PostTaskAndReply( |
| + FROM_HERE, base::Bind(InitFromTaskRunner<T>, base::Unretained(db_.get()), |
| + database_dir, success), |
| + base::Bind(RunInitCallback<T>, callback, base::Owned(success))); |
| +} |
| + |
| +template <typename T> |
| +void ProtoDatabaseImpl<T>::UpdateEntries( |
| + scoped_ptr<typename ProtoDatabase<T>::KeyEntryVector> entries_to_save, |
| + scoped_ptr<std::vector<std::string> > keys_to_remove, |
| + typename ProtoDatabase<T>::UpdateCallback callback) { |
| + DCHECK(thread_checker_.CalledOnValidThread()); |
| + bool* success = new bool(false); |
| + task_runner_->PostTaskAndReply( |
| + FROM_HERE, |
| + base::Bind(UpdateEntriesFromTaskRunner<T>, base::Unretained(db_.get()), |
| + base::Passed(&entries_to_save), base::Passed(&keys_to_remove), |
| + success), |
| + base::Bind(RunUpdateCallback<T>, callback, base::Owned(success))); |
| +} |
| + |
| +template <typename T> |
| +void ProtoDatabaseImpl<T>::LoadEntries( |
| + typename ProtoDatabase<T>::LoadCallback callback) { |
| + DCHECK(thread_checker_.CalledOnValidThread()); |
| + bool* success = new bool(false); |
| + |
| + scoped_ptr<std::vector<T> > entries(new std::vector<T>()); |
| + // Get this pointer before entries is base::Passed() so we can use it below. |
| + std::vector<T>* entries_ptr = entries.get(); |
| + |
| + task_runner_->PostTaskAndReply( |
| + FROM_HERE, base::Bind(LoadEntriesFromTaskRunner<T>, |
| + base::Unretained(db_.get()), entries_ptr, success), |
| + base::Bind(RunLoadCallback<T>, callback, base::Owned(success), |
| + base::Passed(&entries))); |
| +} |
| + |
| +} // namespace leveldb_proto |
| + |
| +#endif // COMPONENTS_LEVELDB_PROTO_CORE_PROTO_DATABASE_IMPL_H_ |