From 9060078609232082a989c7906e8d53b6c3b490b4 Mon Sep 17 00:00:00 2001 From: Greg Soltis Date: Thu, 15 Mar 2018 16:52:13 -0700 Subject: Implements transactions for leveldb (#881) * Start work on leveldb transactions * Style * Working API. Not plumbed in yet * Move files into correct place * Wrangling file locations and associations * Tests pass * Add some comments * style * Fix copyright * Rewrite iterator internals to handle deletion-while-iterating. Also add tests for same * Switch to strings instead of slices * Style * More style fixes * Response to feedback before updating docs * Style * Add comment * Initialize version_ * Satisfy the linter * Fix style * snake_case * More snakes * LevelDBTransaction -> LevelDbTransaction --- .../firestore/local/leveldb_transaction.cc | 211 +++++++++++++++++++++ 1 file changed, 211 insertions(+) create mode 100644 Firestore/core/src/firebase/firestore/local/leveldb_transaction.cc (limited to 'Firestore/core/src/firebase/firestore/local/leveldb_transaction.cc') diff --git a/Firestore/core/src/firebase/firestore/local/leveldb_transaction.cc b/Firestore/core/src/firebase/firestore/local/leveldb_transaction.cc new file mode 100644 index 0000000..af72716 --- /dev/null +++ b/Firestore/core/src/firebase/firestore/local/leveldb_transaction.cc @@ -0,0 +1,211 @@ +/* + * Copyright 2018 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "Firestore/core/src/firebase/firestore/local/leveldb_transaction.h" + +#include + +#include "Firestore/core/src/firebase/firestore/util/firebase_assert.h" + +using leveldb::DB; +using leveldb::ReadOptions; +using leveldb::Slice; +using leveldb::Status; +using leveldb::WriteBatch; +using leveldb::WriteOptions; + +namespace firebase { +namespace firestore { +namespace local { + +LevelDbTransaction::Iterator::Iterator(LevelDbTransaction* txn) + : db_iter_(txn->db_->NewIterator(txn->read_options_)), + last_version_(txn->version_), + txn_(txn), + mutations_iter_(txn->mutations_.begin()), + current_(), + is_mutation_(false), + // Iterator doesn't really point to anything yet, so is + // invalid + is_valid_(false) { +} + +void LevelDbTransaction::Iterator::UpdateCurrent() { + bool mutation_is_valid = mutations_iter_ != txn_->mutations_.end(); + is_valid_ = mutation_is_valid || db_iter_->Valid(); + + if (is_valid_) { + if (!mutation_is_valid) { + is_mutation_ = false; + } else if (!db_iter_->Valid()) { + is_mutation_ = true; + } else { + // Both iterators are valid. If the leveldb key is equal to or greater + // than the current mutation key, we are looking at a mutation next. It's + // either sooner in the iteration or directly shadowing the underlying + // committed value in leveldb. + is_mutation_ = db_iter_->key().compare(mutations_iter_->first) >= 0; + } + if (is_mutation_) { + current_ = *mutations_iter_; + } else { + current_ = {db_iter_->key().ToString(), db_iter_->value().ToString()}; + } + } +} + +void LevelDbTransaction::Iterator::Seek(const std::string& key) { + db_iter_->Seek(key); + for (; db_iter_->Valid() && IsDeleted(db_iter_->key()); db_iter_->Next()) { + } + mutations_iter_ = txn_->mutations_.lower_bound(key); + UpdateCurrent(); + last_version_ = txn_->version_; +} + +absl::string_view LevelDbTransaction::Iterator::key() { + FIREBASE_ASSERT_MESSAGE(Valid(), "key() called on invalid iterator"); + return current_.first; +} + +absl::string_view LevelDbTransaction::Iterator::value() { + FIREBASE_ASSERT_MESSAGE(Valid(), "value() called on invalid iterator"); + return current_.second; +} + +bool LevelDbTransaction::Iterator::IsDeleted(leveldb::Slice slice) { + return txn_->deletions_.find(slice.ToString()) != txn_->deletions_.end(); +} + +bool LevelDbTransaction::Iterator::SyncToTransaction() { + if (last_version_ < txn_->version_) { + // Intentionally copying here since Seek() may update current_. We need the + // copy to do the comparison below. + const std::string current_key = current_.first; + Seek(current_key); + // If we advanced, we don't need to advance again. + return is_valid_ && current_.first > current_key; + } else { + return false; + } +} + +void LevelDbTransaction::Iterator::AdvanceLDB() { + do { + db_iter_->Next(); + } while (db_iter_->Valid() && IsDeleted(db_iter_->key())); +} + +void LevelDbTransaction::Iterator::Next() { + FIREBASE_ASSERT_MESSAGE(Valid(), "Next() called on invalid iterator"); + bool advanced = SyncToTransaction(); + if (!advanced) { + if (is_mutation_) { + // A mutation might be shadowing leveldb. If so, advance both. + if (db_iter_->Valid() && db_iter_->key() == mutations_iter_->first) { + AdvanceLDB(); + } + ++mutations_iter_; + } else { + AdvanceLDB(); + } + UpdateCurrent(); + } +} + +bool LevelDbTransaction::Iterator::Valid() { + return is_valid_; +} + +LevelDbTransaction::LevelDbTransaction(DB* db, + const ReadOptions& read_options, + const WriteOptions& write_options) + : db_(db), + mutations_(), + deletions_(), + read_options_(read_options), + write_options_(write_options), + version_(0) { +} + +const ReadOptions& LevelDbTransaction::DefaultReadOptions() { + static ReadOptions options = ([]() { + ReadOptions read_options; + read_options.verify_checksums = true; + return read_options; + })(); + return options; +} + +const WriteOptions& LevelDbTransaction::DefaultWriteOptions() { + static WriteOptions options; + return options; +} + +void LevelDbTransaction::Put(const absl::string_view& key, + const absl::string_view& value) { + std::string key_string(key); + std::string value_string(value); + mutations_[key_string] = value_string; + deletions_.erase(key_string); + version_++; +} + +LevelDbTransaction::Iterator* LevelDbTransaction::NewIterator() { + return new LevelDbTransaction::Iterator(this); +} + +Status LevelDbTransaction::Get(const absl::string_view& key, + std::string* value) { + std::string key_string(key); + if (deletions_.find(key_string) != deletions_.end()) { + return Status::NotFound(key_string + " is not present in the transaction"); + } else { + Mutations::iterator iter(mutations_.find(key_string)); + if (iter != mutations_.end()) { + *value = iter->second; + return Status::OK(); + } else { + return db_->Get(read_options_, key_string, value); + } + } +} + +void LevelDbTransaction::Delete(const absl::string_view& key) { + std::string to_delete(key); + deletions_.insert(to_delete); + mutations_.erase(to_delete); + version_++; +} + +void LevelDbTransaction::Commit() { + WriteBatch batch; + for (auto it = deletions_.begin(); it != deletions_.end(); it++) { + batch.Delete(*it); + } + + for (auto it = mutations_.begin(); it != mutations_.end(); it++) { + batch.Put(it->first, it->second); + } + + Status status = db_->Write(write_options_, &batch); + FIREBASE_ASSERT_MESSAGE(status.ok(), "Failed to commit transaction: %s", + status.ToString().c_str()); +} + +} // namespace local +} // namespace firestore +} // namespace firebase -- cgit v1.2.3