From 9060078609232082a989c7906e8d53b6c3b490b4 Mon Sep 17 00:00:00 2001 From: Greg Soltis Date: Thu, 15 Mar 2018 16:52:13 -0700 Subject: Implements transactions for leveldb (#881) * Start work on leveldb transactions * Style * Working API. Not plumbed in yet * Move files into correct place * Wrangling file locations and associations * Tests pass * Add some comments * style * Fix copyright * Rewrite iterator internals to handle deletion-while-iterating. Also add tests for same * Switch to strings instead of slices * Style * More style fixes * Response to feedback before updating docs * Style * Add comment * Initialize version_ * Satisfy the linter * Fix style * snake_case * More snakes * LevelDBTransaction -> LevelDbTransaction --- .../firestore/local/leveldb_transaction.cc | 211 +++++++++++++++++++++ .../firebase/firestore/local/leveldb_transaction.h | 204 ++++++++++++++++++++ 2 files changed, 415 insertions(+) create mode 100644 Firestore/core/src/firebase/firestore/local/leveldb_transaction.cc create mode 100644 Firestore/core/src/firebase/firestore/local/leveldb_transaction.h (limited to 'Firestore/core/src/firebase/firestore/local') diff --git a/Firestore/core/src/firebase/firestore/local/leveldb_transaction.cc b/Firestore/core/src/firebase/firestore/local/leveldb_transaction.cc new file mode 100644 index 0000000..af72716 --- /dev/null +++ b/Firestore/core/src/firebase/firestore/local/leveldb_transaction.cc @@ -0,0 +1,211 @@ +/* + * Copyright 2018 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "Firestore/core/src/firebase/firestore/local/leveldb_transaction.h" + +#include + +#include "Firestore/core/src/firebase/firestore/util/firebase_assert.h" + +using leveldb::DB; +using leveldb::ReadOptions; +using leveldb::Slice; +using leveldb::Status; +using leveldb::WriteBatch; +using leveldb::WriteOptions; + +namespace firebase { +namespace firestore { +namespace local { + +LevelDbTransaction::Iterator::Iterator(LevelDbTransaction* txn) + : db_iter_(txn->db_->NewIterator(txn->read_options_)), + last_version_(txn->version_), + txn_(txn), + mutations_iter_(txn->mutations_.begin()), + current_(), + is_mutation_(false), + // Iterator doesn't really point to anything yet, so is + // invalid + is_valid_(false) { +} + +void LevelDbTransaction::Iterator::UpdateCurrent() { + bool mutation_is_valid = mutations_iter_ != txn_->mutations_.end(); + is_valid_ = mutation_is_valid || db_iter_->Valid(); + + if (is_valid_) { + if (!mutation_is_valid) { + is_mutation_ = false; + } else if (!db_iter_->Valid()) { + is_mutation_ = true; + } else { + // Both iterators are valid. If the leveldb key is equal to or greater + // than the current mutation key, we are looking at a mutation next. It's + // either sooner in the iteration or directly shadowing the underlying + // committed value in leveldb. + is_mutation_ = db_iter_->key().compare(mutations_iter_->first) >= 0; + } + if (is_mutation_) { + current_ = *mutations_iter_; + } else { + current_ = {db_iter_->key().ToString(), db_iter_->value().ToString()}; + } + } +} + +void LevelDbTransaction::Iterator::Seek(const std::string& key) { + db_iter_->Seek(key); + for (; db_iter_->Valid() && IsDeleted(db_iter_->key()); db_iter_->Next()) { + } + mutations_iter_ = txn_->mutations_.lower_bound(key); + UpdateCurrent(); + last_version_ = txn_->version_; +} + +absl::string_view LevelDbTransaction::Iterator::key() { + FIREBASE_ASSERT_MESSAGE(Valid(), "key() called on invalid iterator"); + return current_.first; +} + +absl::string_view LevelDbTransaction::Iterator::value() { + FIREBASE_ASSERT_MESSAGE(Valid(), "value() called on invalid iterator"); + return current_.second; +} + +bool LevelDbTransaction::Iterator::IsDeleted(leveldb::Slice slice) { + return txn_->deletions_.find(slice.ToString()) != txn_->deletions_.end(); +} + +bool LevelDbTransaction::Iterator::SyncToTransaction() { + if (last_version_ < txn_->version_) { + // Intentionally copying here since Seek() may update current_. We need the + // copy to do the comparison below. + const std::string current_key = current_.first; + Seek(current_key); + // If we advanced, we don't need to advance again. + return is_valid_ && current_.first > current_key; + } else { + return false; + } +} + +void LevelDbTransaction::Iterator::AdvanceLDB() { + do { + db_iter_->Next(); + } while (db_iter_->Valid() && IsDeleted(db_iter_->key())); +} + +void LevelDbTransaction::Iterator::Next() { + FIREBASE_ASSERT_MESSAGE(Valid(), "Next() called on invalid iterator"); + bool advanced = SyncToTransaction(); + if (!advanced) { + if (is_mutation_) { + // A mutation might be shadowing leveldb. If so, advance both. + if (db_iter_->Valid() && db_iter_->key() == mutations_iter_->first) { + AdvanceLDB(); + } + ++mutations_iter_; + } else { + AdvanceLDB(); + } + UpdateCurrent(); + } +} + +bool LevelDbTransaction::Iterator::Valid() { + return is_valid_; +} + +LevelDbTransaction::LevelDbTransaction(DB* db, + const ReadOptions& read_options, + const WriteOptions& write_options) + : db_(db), + mutations_(), + deletions_(), + read_options_(read_options), + write_options_(write_options), + version_(0) { +} + +const ReadOptions& LevelDbTransaction::DefaultReadOptions() { + static ReadOptions options = ([]() { + ReadOptions read_options; + read_options.verify_checksums = true; + return read_options; + })(); + return options; +} + +const WriteOptions& LevelDbTransaction::DefaultWriteOptions() { + static WriteOptions options; + return options; +} + +void LevelDbTransaction::Put(const absl::string_view& key, + const absl::string_view& value) { + std::string key_string(key); + std::string value_string(value); + mutations_[key_string] = value_string; + deletions_.erase(key_string); + version_++; +} + +LevelDbTransaction::Iterator* LevelDbTransaction::NewIterator() { + return new LevelDbTransaction::Iterator(this); +} + +Status LevelDbTransaction::Get(const absl::string_view& key, + std::string* value) { + std::string key_string(key); + if (deletions_.find(key_string) != deletions_.end()) { + return Status::NotFound(key_string + " is not present in the transaction"); + } else { + Mutations::iterator iter(mutations_.find(key_string)); + if (iter != mutations_.end()) { + *value = iter->second; + return Status::OK(); + } else { + return db_->Get(read_options_, key_string, value); + } + } +} + +void LevelDbTransaction::Delete(const absl::string_view& key) { + std::string to_delete(key); + deletions_.insert(to_delete); + mutations_.erase(to_delete); + version_++; +} + +void LevelDbTransaction::Commit() { + WriteBatch batch; + for (auto it = deletions_.begin(); it != deletions_.end(); it++) { + batch.Delete(*it); + } + + for (auto it = mutations_.begin(); it != mutations_.end(); it++) { + batch.Put(it->first, it->second); + } + + Status status = db_->Write(write_options_, &batch); + FIREBASE_ASSERT_MESSAGE(status.ok(), "Failed to commit transaction: %s", + status.ToString().c_str()); +} + +} // namespace local +} // namespace firestore +} // namespace firebase diff --git a/Firestore/core/src/firebase/firestore/local/leveldb_transaction.h b/Firestore/core/src/firebase/firestore/local/leveldb_transaction.h new file mode 100644 index 0000000..015a6cb --- /dev/null +++ b/Firestore/core/src/firebase/firestore/local/leveldb_transaction.h @@ -0,0 +1,204 @@ +/* + * Copyright 2018 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_LOCAL_LEVELDB_TRANSACTION_H_ +#define FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_LOCAL_LEVELDB_TRANSACTION_H_ + +#include +#include + +#include +#include +#include +#include +#include +#include + +#if __OBJC__ +#import +#endif + +namespace firebase { +namespace firestore { +namespace local { + +/** + * LevelDBTransaction tracks pending changes to entries in leveldb, including + * deletions. It also provides an Iterator to traverse a merged view of pending + * changes and committed values. + */ +class LevelDbTransaction { + using Deletions = std::set; + using Mutations = std::map; + + public: + /** + * Iterator iterates over a merged view of pending changes from the + * transaction and any unchanged values in the underlying leveldb instance. + */ + class Iterator { + public: + explicit Iterator(LevelDbTransaction* txn); + + /** + * Returns true if this iterator points to an entry + */ + bool Valid(); + + /** + * Seeks this iterator to the first key equal to or greater than the given + * key + */ + void Seek(const std::string& key); + + /** + * Advances the iterator to the next entry + */ + void Next(); + + /** + * Returns the key of the current entry + */ + absl::string_view key(); + + /** + * Returns the value of the current entry + */ + absl::string_view value(); + + private: + /** + * Advances to the next non-deleted key in leveldb. + */ + void AdvanceLDB(); + + /** + * Returns true if the given slice matches a key present in the deletions_ + * set. + */ + bool IsDeleted(leveldb::Slice slice); + + /** + * Syncs with the underlying transaction. If the transaction has been + * updated, the mutation iterator may need to be reset. Returns true if this + * resulted in moving to a new underlying entry (i.e. the entry represented + * by current_ was deleted). + */ + bool SyncToTransaction(); + + /** + * Given the current state of the internal iterators, set is_valid_, + * is_mutation_, and current_. + */ + void UpdateCurrent(); + + std::unique_ptr db_iter_; + + // The last observed version of the underlying transaction + int32_t last_version_; + // The underlying transaction. + LevelDbTransaction* txn_; + Mutations::iterator mutations_iter_; + // We save the current key and value so that once an iterator is Valid(), it + // remains so at least until the next call to Seek() or Next(), even if the + // underlying data is deleted. + std::pair current_; + // True if current_ represents an entry in the mutations_ map, rather than + // committed data. + bool is_mutation_; + // True if the iterator pointed to a valid entry the last time Next() or + // Seek() was called. + bool is_valid_; + }; + + explicit LevelDbTransaction( + leveldb::DB* db, + const leveldb::ReadOptions& read_options = DefaultReadOptions(), + const leveldb::WriteOptions& write_options = DefaultWriteOptions()); + + LevelDbTransaction(const LevelDbTransaction& other) = delete; + + LevelDbTransaction& operator=(const LevelDbTransaction& other) = delete; + + /** + * Returns a default set of ReadOptions + */ + static const leveldb::ReadOptions& DefaultReadOptions(); + + /** + * Returns a default set of WriteOptions + */ + static const leveldb::WriteOptions& DefaultWriteOptions(); + + /** + * Remove the database entry (if any) for "key". It is not an error if "key" + * did not exist in the database. + */ + void Delete(const absl::string_view& key); + +#if __OBJC__ + /** + * Schedules the row identified by `key` to be set to the given protocol + * buffer message when this transaction commits. + */ + void Put(const absl::string_view& key, GPBMessage* message) { + NSData* data = [message data]; + std::string key_string(key); + mutations_[key_string] = std::string((const char*)data.bytes, data.length); + version_++; + } +#endif + + /** + * Schedules the row identified by `key` to be set to `value` when this + * transaction commits. + */ + void Put(const absl::string_view& key, const absl::string_view& value); + + /** + * Sets the contents of `value` to the latest known value for the given key, + * including any pending mutations and `Status::OK` is returned. If the key + * doesn't exist in leveldb, or it is scheduled for deletion in this + * transaction, `Status::NotFound` is returned. + */ + leveldb::Status Get(const absl::string_view& key, std::string* value); + + /** + * Returns a new Iterator over the pending changes in this transaction, merged + * with the existing values already in leveldb. + */ + Iterator* NewIterator(); + + /** + * Commits the transaction. All pending changes are written. The transaction + * should not be used after calling this method. + */ + void Commit(); + + private: + leveldb::DB* db_; + Mutations mutations_; + Deletions deletions_; + leveldb::ReadOptions read_options_; + leveldb::WriteOptions write_options_; + int32_t version_; +}; + +} // namespace local +} // namespace firestore +} // namespace firebase + +#endif // FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_LOCAL_LEVELDB_TRANSACTION_H_ -- cgit v1.2.3