Index: components/leveldb_proto/core/proto_database_impl.h |
diff --git a/components/leveldb_proto/core/proto_database_impl.h b/components/leveldb_proto/core/proto_database_impl.h |
new file mode 100644 |
index 0000000000000000000000000000000000000000..b47e738562a0b8301bca49d4e41b9272b5c10f2b |
--- /dev/null |
+++ b/components/leveldb_proto/core/proto_database_impl.h |
@@ -0,0 +1,312 @@ |
+// Copyright 2014 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+#ifndef COMPONENTS_LEVELDB_PROTO_CORE_PROTO_DATABASE_IMPL_H_ |
+#define COMPONENTS_LEVELDB_PROTO_CORE_PROTO_DATABASE_IMPL_H_ |
+ |
+#include <string> |
+#include <vector> |
+ |
+#include "base/bind.h" |
+#include "base/file_util.h" |
+#include "base/files/file_path.h" |
+#include "base/memory/scoped_ptr.h" |
+#include "base/message_loop/message_loop.h" |
+#include "base/sequenced_task_runner.h" |
+#include "base/strings/string_util.h" |
+#include "base/threading/sequenced_worker_pool.h" |
+#include "base/threading/thread_checker.h" |
+#include "base/threading/thread_collision_warner.h" |
+#include "components/leveldb_proto/core/proto_database.h" |
+#include "third_party/leveldatabase/src/include/leveldb/db.h" |
+#include "third_party/leveldatabase/src/include/leveldb/iterator.h" |
+#include "third_party/leveldatabase/src/include/leveldb/options.h" |
+#include "third_party/leveldatabase/src/include/leveldb/slice.h" |
+#include "third_party/leveldatabase/src/include/leveldb/status.h" |
+#include "third_party/leveldatabase/src/include/leveldb/write_batch.h" |
+ |
+namespace base { |
+class SequencedTaskRunner; |
cjhopman
2014/06/13 19:41:05
Don't need these forward declarations anymore.
Mathieu
2014/06/13 22:07:07
Done.
|
+class MessageLoop; |
+} |
+ |
+namespace leveldb { |
+class DB; |
cjhopman
2014/06/13 19:41:05
Don't need this one either.
Mathieu
2014/06/13 22:07:07
Done.
|
+} |
+ |
+namespace leveldb_proto { |
+ |
+// When the ProtoDatabaseImpl instance is deleted, in-progress asynchronous |
+// operations will be completed and the corresponding callbacks will be called. |
cjhopman
2014/06/13 19:41:05
Maybe point out that construction/calls/destructio
Mathieu
2014/06/13 22:07:07
Done.
|
+template <typename T> |
+class ProtoDatabaseImpl : public ProtoDatabase<T> { |
+ public: |
+ // The underlying database. Calls to this type may be blocking. |
+ class Database { |
cjhopman
2014/06/13 19:41:05
I think that the SerializeAsString() and ParseFrom
Mathieu
2014/06/13 22:07:07
Gotcha. The Database no longer depends on proto. T
|
+ public: |
+ virtual bool Init(const base::FilePath& database_dir) = 0; |
+ virtual bool Save( |
+ const typename ProtoDatabase<T>::KeyEntryVector& entries_to_save, |
+ const std::vector<std::string>& keys_to_remove) = 0; |
+ virtual bool Load(std::vector<T>* entries) = 0; |
+ virtual ~Database() {} |
+ }; |
+ |
+ // Once constructed, function calls and destruction should all occur on the |
+ // same thread (not necessarily the same as the constructor). |
+ class LevelDB : public Database { |
+ public: |
+ LevelDB(); |
+ virtual ~LevelDB(); |
+ virtual bool Init(const base::FilePath& database_dir) OVERRIDE; |
+ virtual bool Save( |
+ const typename ProtoDatabase<T>::KeyEntryVector& entries_to_save, |
+ const std::vector<std::string>& keys_to_remove) OVERRIDE; |
+ virtual bool Load(std::vector<T>* entries) OVERRIDE; |
+ |
+ private: |
+ DFAKE_MUTEX(thread_checker_); |
+ scoped_ptr<leveldb::DB> db_; |
+ }; |
+ |
+ explicit ProtoDatabaseImpl( |
+ scoped_refptr<base::SequencedTaskRunner> task_runner); |
+ |
+ virtual ~ProtoDatabaseImpl(); |
+ |
+ // ProtoDatabase implementation. |
+ virtual void Init(const base::FilePath& database_dir, |
cjhopman
2014/06/13 19:41:05
I don't think I like that this Init() is exposed t
Mathieu
2014/06/13 22:07:07
Ack. Added a TODO with you as point of contact.
|
+ typename ProtoDatabase<T>::InitCallback callback) OVERRIDE; |
+ virtual void UpdateEntries( |
+ scoped_ptr<typename ProtoDatabase<T>::KeyEntryVector> entries_to_save, |
+ scoped_ptr<std::vector<std::string> > keys_to_remove, |
+ typename ProtoDatabase<T>::UpdateCallback callback) OVERRIDE; |
+ virtual void LoadEntries( |
+ typename ProtoDatabase<T>::LoadCallback callback) OVERRIDE; |
+ |
+ // Allow callers to provide their own Database implementation. |
+ void InitWithDatabase(scoped_ptr<Database> database, |
+ const base::FilePath& database_dir, |
+ typename ProtoDatabase<T>::InitCallback callback); |
+ |
+ private: |
+ base::ThreadChecker thread_checker_; |
+ |
+ // Used to run blocking tasks in-order. |
+ scoped_refptr<base::SequencedTaskRunner> task_runner_; |
+ |
+ scoped_ptr<Database> db_; |
+ |
+ DISALLOW_COPY_AND_ASSIGN(ProtoDatabaseImpl); |
+}; |
+ |
+using base::MessageLoop; |
cjhopman
2014/06/13 19:41:05
Remove these usings
Mathieu
2014/06/13 22:07:07
Done.
|
+using base::SequencedTaskRunner; |
+ |
+template <typename T> |
+ProtoDatabaseImpl<T>::LevelDB::LevelDB() {} |
+ |
+template <typename T> |
+ProtoDatabaseImpl<T>::LevelDB::~LevelDB() { |
+ DFAKE_SCOPED_LOCK(thread_checker_); |
+} |
+ |
+template <typename T> |
+bool ProtoDatabaseImpl<T>::LevelDB::Init(const base::FilePath& database_dir) { |
+ DFAKE_SCOPED_LOCK(thread_checker_); |
+ |
+ leveldb::Options options; |
+ options.create_if_missing = true; |
+ options.max_open_files = 0; // Use minimum. |
+ |
+ std::string path = database_dir.AsUTF8Unsafe(); |
+ |
+ leveldb::DB* db = NULL; |
+ leveldb::Status status = leveldb::DB::Open(options, path, &db); |
+ if (status.IsCorruption()) { |
+ base::DeleteFile(database_dir, true); |
+ status = leveldb::DB::Open(options, path, &db); |
+ } |
+ |
+ if (status.ok()) { |
+ CHECK(db); |
+ db_.reset(db); |
+ return true; |
+ } |
+ |
+ LOG(WARNING) << "Unable to open " << database_dir.value() << ": " |
+ << status.ToString(); |
+ return false; |
+} |
+ |
+template <typename T> |
+bool ProtoDatabaseImpl<T>::LevelDB::Save( |
+ const typename ProtoDatabase<T>::KeyEntryVector& entries_to_save, |
+ const std::vector<std::string>& keys_to_remove) { |
+ DFAKE_SCOPED_LOCK(thread_checker_); |
+ |
+ leveldb::WriteBatch updates; |
+ for (typename ProtoDatabase<T>::KeyEntryVector::const_iterator it = |
+ entries_to_save.begin(); |
+ it != entries_to_save.end(); ++it) { |
+ updates.Put(leveldb::Slice(it->first), |
+ leveldb::Slice(it->second.SerializeAsString())); |
+ } |
+ for (std::vector<std::string>::const_iterator it = keys_to_remove.begin(); |
+ it != keys_to_remove.end(); ++it) { |
+ updates.Delete(leveldb::Slice(*it)); |
+ } |
+ |
+ leveldb::WriteOptions options; |
+ options.sync = true; |
+ leveldb::Status status = db_->Write(options, &updates); |
+ if (status.ok()) return true; |
+ |
+ DLOG(WARNING) << "Failed writing leveldb_proto entries: " |
+ << status.ToString(); |
+ return false; |
+} |
+ |
+template <typename T> |
+bool ProtoDatabaseImpl<T>::LevelDB::Load(std::vector<T>* entries) { |
+ DFAKE_SCOPED_LOCK(thread_checker_); |
+ |
+ leveldb::ReadOptions options; |
+ scoped_ptr<leveldb::Iterator> db_iterator(db_->NewIterator(options)); |
+ for (db_iterator->SeekToFirst(); db_iterator->Valid(); db_iterator->Next()) { |
+ leveldb::Slice value_slice = db_iterator->value(); |
+ |
+ T entry; |
+ if (!entry.ParseFromArray(value_slice.data(), value_slice.size())) { |
+ DLOG(WARNING) << "Unable to parse leveldb_proto entry " |
+ << db_iterator->key().ToString(); |
+ // TODO(cjhopman): Decide what to do about un-parseable entries. |
+ } |
+ entries->push_back(entry); |
+ } |
+ return true; |
+} |
+ |
+namespace { |
+ |
+template <typename T> |
+void RunInitCallback(typename ProtoDatabase<T>::InitCallback callback, |
+ const bool* success) { |
+ callback.Run(*success); |
+} |
+ |
+template <typename T> |
+void RunUpdateCallback(typename ProtoDatabase<T>::UpdateCallback callback, |
+ const bool* success) { |
+ callback.Run(*success); |
+} |
+ |
+template <typename T> |
+void RunLoadCallback(typename ProtoDatabase<T>::LoadCallback callback, |
+ const bool* success, scoped_ptr<std::vector<T> > entries) { |
+ callback.Run(*success, entries.Pass()); |
+} |
+ |
+template <typename T> |
+void InitFromTaskRunner(typename ProtoDatabaseImpl<T>::Database* database, |
+ const base::FilePath& database_dir, bool* success) { |
+ DCHECK(success); |
+ |
+ // TODO(cjhopman): Histogram for database size. |
+ *success = database->Init(database_dir); |
+} |
+ |
+template <typename T> |
+void UpdateEntriesFromTaskRunner( |
+ typename ProtoDatabaseImpl<T>::Database* database, |
+ scoped_ptr<typename ProtoDatabase<T>::KeyEntryVector> entries_to_save, |
+ scoped_ptr<std::vector<std::string> > keys_to_remove, bool* success) { |
+ DCHECK(success); |
+ *success = database->Save(*entries_to_save, *keys_to_remove); |
+} |
+ |
+template <typename T> |
+void LoadEntriesFromTaskRunner( |
+ typename ProtoDatabaseImpl<T>::Database* database, std::vector<T>* entries, |
+ bool* success) { |
+ DCHECK(success); |
+ DCHECK(entries); |
+ |
+ entries->clear(); |
+ *success = database->Load(entries); |
+} |
+ |
+} // namespace |
+ |
+template <typename T> |
+ProtoDatabaseImpl<T>::ProtoDatabaseImpl( |
+ scoped_refptr<base::SequencedTaskRunner> task_runner) |
+ : task_runner_(task_runner) {} |
+ |
+template <typename T> |
+ProtoDatabaseImpl<T>::~ProtoDatabaseImpl() { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ if (!task_runner_->DeleteSoon(FROM_HERE, db_.release())) { |
+ DLOG(WARNING) << "DOM distiller database will not be deleted."; |
+ } |
+} |
+ |
+template <typename T> |
+void ProtoDatabaseImpl<T>::Init( |
+ const base::FilePath& database_dir, |
+ typename ProtoDatabase<T>::InitCallback callback) { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ InitWithDatabase(scoped_ptr<Database>(new LevelDB()), database_dir, callback); |
+} |
+ |
+template <typename T> |
+void ProtoDatabaseImpl<T>::InitWithDatabase( |
+ scoped_ptr<Database> database, const base::FilePath& database_dir, |
+ typename ProtoDatabase<T>::InitCallback callback) { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ DCHECK(!db_); |
+ DCHECK(database); |
+ db_.reset(database.release()); |
+ bool* success = new bool(false); |
+ task_runner_->PostTaskAndReply( |
+ FROM_HERE, base::Bind(InitFromTaskRunner<T>, base::Unretained(db_.get()), |
+ database_dir, success), |
+ base::Bind(RunInitCallback<T>, callback, base::Owned(success))); |
+} |
+ |
+template <typename T> |
+void ProtoDatabaseImpl<T>::UpdateEntries( |
+ scoped_ptr<typename ProtoDatabase<T>::KeyEntryVector> entries_to_save, |
+ scoped_ptr<std::vector<std::string> > keys_to_remove, |
+ typename ProtoDatabase<T>::UpdateCallback callback) { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ bool* success = new bool(false); |
+ task_runner_->PostTaskAndReply( |
+ FROM_HERE, |
+ base::Bind(UpdateEntriesFromTaskRunner<T>, base::Unretained(db_.get()), |
+ base::Passed(&entries_to_save), base::Passed(&keys_to_remove), |
+ success), |
+ base::Bind(RunUpdateCallback<T>, callback, base::Owned(success))); |
+} |
+ |
+template <typename T> |
+void ProtoDatabaseImpl<T>::LoadEntries( |
+ typename ProtoDatabase<T>::LoadCallback callback) { |
+ DCHECK(thread_checker_.CalledOnValidThread()); |
+ bool* success = new bool(false); |
+ |
+ scoped_ptr<std::vector<T> > entries(new std::vector<T>()); |
+ // Get this pointer before entries is base::Passed() so we can use it below. |
+ std::vector<T>* entries_ptr = entries.get(); |
+ |
+ task_runner_->PostTaskAndReply( |
+ FROM_HERE, base::Bind(LoadEntriesFromTaskRunner<T>, |
+ base::Unretained(db_.get()), entries_ptr, success), |
+ base::Bind(RunLoadCallback<T>, callback, base::Owned(success), |
+ base::Passed(&entries))); |
+} |
+ |
+} // namespace leveldb_proto |
+ |
+#endif // COMPONENTS_LEVELDB_PROTO_CORE_PROTO_DATABASE_IMPL_H_ |