From f77ec68a8862bd03b430deff48022ffb179172b0 Mon Sep 17 00:00:00 2001 From: Greg Soltis Date: Mon, 26 Mar 2018 11:57:34 -0700 Subject: Switch LevelDB MutationQueue and RemoteDocumentCache to use transactions (#968) * Start work on leveldb transactions * Style * Working API. Not plumbed in yet * Move files into correct place * Wrangling file locations and associations * Tests pass * Add some comments * style * Fix copyright * Rewrite iterator internals to handle deletion-while-iterating. Also add tests for same * Switch to strings instead of slices * Style * More style fixes * Start switching writegroup over * Swap out write group tracking for transaction usage * Style * Response to feedback before updating docs * Style * Add comment * Initialize version_ * Satisfy the linter * Start switching writegroup over * Swap out write group tracking for transaction usage * Style * Checkpoint before implementing BatchDescription * Style * Initial plumbing for leveldb local parts * Add model::BatchId * Port leveldb_key.{h,cc} * Add string StartsWith * Add leveldb_key_test.cc to the project * Revert back to using leveldb::Slice for read/describe These operations universally operate on keys obtained from leveldb so it's actually unhelpful to force all the callers to make absl::string_views from them. * Everything passing * Drop unused function * Style * STart work on reads * Swap reads in queryCache to use transactions * Fix up tests of querycache * Drop commented out code * Cleanup * Style * Fix up for passing tests * style * Renaming * Style * Start work on ToString for transactions * Add ToString() method to LevelDbTransaction * Style * lint * Fix includes, drop runTransaction * current_transaction -> currentTransaction * LevelDbTransaction::NewIterator now returns a unique_ptr * Style * Revert addition of util::StartsWith * Add log line * Style * Add log line * Style * Add debug log line for commits, drop unused BatchDescription * STart work on reads * Swap reads in queryCache to use transactions * Start on remote documents * Transition mutation queue and remote documents to use transactions * Style * Make everything pass * Make everything pass * Make it compile * Style * Style * Revert name change, use DefaultReadOptions() * Style * Handle iterators returning bad statuses --- Firestore/Source/Local/FSTLevelDBMutationQueue.mm | 119 ++++++++-------------- 1 file changed, 43 insertions(+), 76 deletions(-) (limited to 'Firestore/Source/Local/FSTLevelDBMutationQueue.mm') diff --git a/Firestore/Source/Local/FSTLevelDBMutationQueue.mm b/Firestore/Source/Local/FSTLevelDBMutationQueue.mm index 221d56f..6d4bd29 100644 --- a/Firestore/Source/Local/FSTLevelDBMutationQueue.mm +++ b/Firestore/Source/Local/FSTLevelDBMutationQueue.mm @@ -16,11 +16,13 @@ #import "Firestore/Source/Local/FSTLevelDBMutationQueue.h" -#include -#include #include #include +#include +#include +#include + #import "Firestore/Protos/objc/firestore/local/Mutation.pbobjc.h" #import "Firestore/Source/Core/FSTQuery.h" #import "Firestore/Source/Local/FSTLevelDB.h" @@ -32,6 +34,7 @@ #import "Firestore/Source/Util/FSTAssert.h" #include "Firestore/core/src/firebase/firestore/auth/user.h" +#include "Firestore/core/src/firebase/firestore/local/leveldb_transaction.h" #include "Firestore/core/src/firebase/firestore/model/document_key.h" #include "Firestore/core/src/firebase/firestore/model/resource_path.h" #include "Firestore/core/src/firebase/firestore/util/string_apple.h" @@ -40,6 +43,7 @@ NS_ASSUME_NONNULL_BEGIN namespace util = firebase::firestore::util; +using firebase::firestore::local::LevelDbTransaction; using Firestore::StringView; using firebase::firestore::auth::User; using firebase::firestore::model::DocumentKey; @@ -55,7 +59,7 @@ using leveldb::WriteOptions; @interface FSTLevelDBMutationQueue () - (instancetype)initWithUserID:(NSString *)userID - db:(std::shared_ptr)db + db:(FSTLevelDB *)db serializer:(FSTLocalSerializer *)serializer NS_DESIGNATED_INITIALIZER; /** The normalized userID (e.g. nil UID => @"" userID) used in our LevelDB keys. */ @@ -77,24 +81,12 @@ using leveldb::WriteOptions; @end -/** - * Returns a standard set of read options. - * - * For now this is paranoid, but perhaps disable that in production builds. - */ -static ReadOptions StandardReadOptions() { - ReadOptions options; - options.verify_checksums = true; - return options; -} - @implementation FSTLevelDBMutationQueue { - // The DB pointer is shared with all cooperating LevelDB-related objects. - std::shared_ptr _db; + FSTLevelDB *_db; } + (instancetype)mutationQueueWithUser:(const User &)user - db:(std::shared_ptr)db + db:(FSTLevelDB *)db serializer:(FSTLocalSerializer *)serializer { NSString *userID = user.is_authenticated() ? util::WrapNSString(user.uid()) : @""; @@ -102,7 +94,7 @@ static ReadOptions StandardReadOptions() { } - (instancetype)initWithUserID:(NSString *)userID - db:(std::shared_ptr)db + db:(FSTLevelDB *)db serializer:(FSTLocalSerializer *)serializer { if (self = [super init]) { _userID = [userID copy]; @@ -113,7 +105,7 @@ static ReadOptions StandardReadOptions() { } - (void)startWithGroup:(FSTWriteGroup *)group { - FSTBatchID nextBatchID = [FSTLevelDBMutationQueue loadNextBatchIDFromDB:_db]; + FSTBatchID nextBatchID = [FSTLevelDBMutationQueue loadNextBatchIDFromDB:_db.ptr]; // On restart, nextBatchId may end up lower than lastAcknowledgedBatchId since it's computed from // the queue contents, and there may be no mutations in the queue. In this case, we need to reset @@ -142,11 +134,12 @@ static ReadOptions StandardReadOptions() { } - (void)shutdown { - _db.reset(); } + (FSTBatchID)loadNextBatchIDFromDB:(std::shared_ptr)db { - std::unique_ptr it(db->NewIterator(StandardReadOptions())); + // TODO(gsoltis): implement Prev() and SeekToLast() on LevelDbTransaction::Iterator, then port + // this to a transaction. + std::unique_ptr it(db->NewIterator(LevelDbTransaction::DefaultReadOptions())); auto tableKey = [FSTLevelDBMutationKey keyPrefix]; @@ -209,19 +202,14 @@ static ReadOptions StandardReadOptions() { - (BOOL)isEmpty { std::string userKey = [FSTLevelDBMutationKey keyPrefixWithUserID:self.userID]; - std::unique_ptr it(_db->NewIterator(StandardReadOptions())); + auto it = _db.currentTransaction->NewIterator(); it->Seek(userKey); BOOL empty = YES; - if (it->Valid() && it->key().starts_with(userKey)) { + if (it->Valid() && absl::StartsWith(it->key(), userKey)) { empty = NO; } - Status status = it->status(); - if (!status.ok()) { - FSTFail(@"isEmpty failed with status: %s", status.ToString().c_str()); - } - return empty; } @@ -260,7 +248,7 @@ static ReadOptions StandardReadOptions() { - (nullable FSTPBMutationQueue *)metadataForKey:(const std::string &)key { std::string value; - Status status = _db->Get(StandardReadOptions(), key, &value); + Status status = _db.currentTransaction->Get(key, &value); if (status.ok()) { return [self parsedMetadata:value]; } else if (status.IsNotFound()) { @@ -304,7 +292,7 @@ static ReadOptions StandardReadOptions() { std::string key = [self mutationKeyForBatchID:batchID]; std::string value; - Status status = _db->Get(StandardReadOptions(), key, &value); + Status status = _db.currentTransaction->Get(key, &value); if (!status.ok()) { if (status.IsNotFound()) { return nil; @@ -323,15 +311,9 @@ static ReadOptions StandardReadOptions() { FSTBatchID nextBatchID = MAX(batchID, self.metadata.lastAcknowledgedBatchId) + 1; std::string key = [self mutationKeyForBatchID:nextBatchID]; - std::unique_ptr it(_db->NewIterator(StandardReadOptions())); + auto it = _db.currentTransaction->NewIterator(); it->Seek(key); - Status status = it->status(); - if (!status.ok()) { - FSTFail(@"Seek to mutation batch (%@, %d) failed with status: %s", self.userID, batchID, - status.ToString().c_str()); - } - FSTLevelDBMutationKey *rowKey = [[FSTLevelDBMutationKey alloc] init]; if (!it->Valid() || ![rowKey decodeKey:it->key()]) { // Past the last row in the DB or out of the mutations table @@ -351,7 +333,7 @@ static ReadOptions StandardReadOptions() { std::string userKey = [FSTLevelDBMutationKey keyPrefixWithUserID:self.userID]; const char *userID = [self.userID UTF8String]; - std::unique_ptr it(_db->NewIterator(StandardReadOptions())); + auto it = _db.currentTransaction->NewIterator(); it->Seek(userKey); NSMutableArray *result = [NSMutableArray array]; @@ -368,12 +350,6 @@ static ReadOptions StandardReadOptions() { [result addObject:[self decodedMutationBatch:it->value()]]; } - Status status = it->status(); - if (!status.ok()) { - FSTFail(@"Find all mutations through mutation batch (%@, %d) failed with status: %s", - self.userID, batchID, status.ToString().c_str()); - } - return result; } @@ -384,26 +360,25 @@ static ReadOptions StandardReadOptions() { // Scan the document-mutation index starting with a prefix starting with the given documentKey. std::string indexPrefix = [FSTLevelDBDocumentMutationKey keyPrefixWithUserID:self.userID resourcePath:documentKey.path()]; - std::unique_ptr indexIterator(_db->NewIterator(StandardReadOptions())); + auto indexIterator = _db.currentTransaction->NewIterator(); indexIterator->Seek(indexPrefix); // Simultaneously scan the mutation queue. This works because each (key, batchID) pair is unique // and ordered, so when scanning a table prefixed by exactly key, all the batchIDs encountered // will be unique and in order. std::string mutationsPrefix = [FSTLevelDBMutationKey keyPrefixWithUserID:userID]; - std::unique_ptr mutationIterator(_db->NewIterator(StandardReadOptions())); + auto mutationIterator = _db.currentTransaction->NewIterator(); NSMutableArray *result = [NSMutableArray array]; FSTLevelDBDocumentMutationKey *rowKey = [[FSTLevelDBDocumentMutationKey alloc] init]; for (; indexIterator->Valid(); indexIterator->Next()) { - Slice indexKey = indexIterator->key(); - // Only consider rows matching exactly the specific key of interest. Note that because we order // by path first, and we order terminators before path separators, we'll encounter all the // index rows for documentKey contiguously. In particular, all the rows for documentKey will // occur before any rows for documents nested in a subcollection beneath documentKey so we can // stop as soon as we hit any such row. - if (!indexKey.starts_with(indexPrefix) || ![rowKey decodeKey:indexKey] || + if (!absl::StartsWith(indexIterator->key(), indexPrefix) || + ![rowKey decodeKey:indexIterator->key()] || DocumentKey{rowKey.documentKey} != documentKey) { break; } @@ -420,8 +395,8 @@ static ReadOptions StandardReadOptions() { FSTFail( @"Dangling document-mutation reference found: " @"%@ points to %@; seeking there found %@", - [FSTLevelDBKey descriptionForKey:indexKey], [FSTLevelDBKey descriptionForKey:mutationKey], - foundKeyDescription); + [FSTLevelDBKey descriptionForKey:indexIterator->key()], + [FSTLevelDBKey descriptionForKey:mutationKey], foundKeyDescription); } [result addObject:[self decodedMutationBatch:mutationIterator->value()]]; @@ -451,7 +426,7 @@ static ReadOptions StandardReadOptions() { // unique nor in order. This means an efficient simultaneous scan isn't possible. std::string indexPrefix = [FSTLevelDBDocumentMutationKey keyPrefixWithUserID:self.userID resourcePath:queryPath]; - std::unique_ptr indexIterator(_db->NewIterator(StandardReadOptions())); + auto indexIterator = _db.currentTransaction->NewIterator(); indexIterator->Seek(indexPrefix); NSMutableArray *result = [NSMutableArray array]; @@ -465,9 +440,8 @@ static ReadOptions StandardReadOptions() { // numbers of keys but > 30% faster for larger numbers of keys. std::set uniqueBatchIds; for (; indexIterator->Valid(); indexIterator->Next()) { - Slice indexKey = indexIterator->key(); - - if (!indexKey.starts_with(indexPrefix) || ![rowKey decodeKey:indexKey]) { + if (!absl::StartsWith(indexIterator->key(), indexPrefix) || + ![rowKey decodeKey:indexIterator->key()]) { break; } @@ -483,7 +457,7 @@ static ReadOptions StandardReadOptions() { // Given an ordered set of unique batchIDs perform a skipping scan over the main table to find // the mutation batches. - std::unique_ptr mutationIterator(_db->NewIterator(StandardReadOptions())); + auto mutationIterator = _db.currentTransaction->NewIterator(); for (FSTBatchID batchID : uniqueBatchIds) { std::string mutationKey = [FSTLevelDBMutationKey keyWithUserID:userID batchID:batchID]; @@ -507,19 +481,14 @@ static ReadOptions StandardReadOptions() { - (NSArray *)allMutationBatches { std::string userKey = [FSTLevelDBMutationKey keyPrefixWithUserID:self.userID]; - std::unique_ptr it(_db->NewIterator(StandardReadOptions())); + auto it = _db.currentTransaction->NewIterator(); it->Seek(userKey); NSMutableArray *result = [NSMutableArray array]; - for (; it->Valid() && it->key().starts_with(userKey); it->Next()) { + for (; it->Valid() && absl::StartsWith(it->key(), userKey); it->Next()) { [result addObject:[self decodedMutationBatch:it->value()]]; } - Status status = it->status(); - if (!status.ok()) { - FSTFail(@"Find all mutation batches failed with status: %s", status.ToString().c_str()); - } - return result; } @@ -527,7 +496,7 @@ static ReadOptions StandardReadOptions() { NSString *userID = self.userID; id garbageCollector = self.garbageCollector; - std::unique_ptr checkIterator(_db->NewIterator(StandardReadOptions())); + auto checkIterator = _db.currentTransaction->NewIterator(); for (FSTMutationBatch *batch in batches) { FSTBatchID batchID = batch.batchID; @@ -561,20 +530,18 @@ static ReadOptions StandardReadOptions() { // Verify that there are no entries in the document-mutation index if the queue is empty. std::string indexPrefix = [FSTLevelDBDocumentMutationKey keyPrefixWithUserID:self.userID]; - std::unique_ptr indexIterator(_db->NewIterator(StandardReadOptions())); + auto indexIterator = _db.currentTransaction->NewIterator(); indexIterator->Seek(indexPrefix); NSMutableArray *danglingMutationReferences = [NSMutableArray array]; for (; indexIterator->Valid(); indexIterator->Next()) { - Slice indexKey = indexIterator->key(); - // Only consider rows matching this index prefix for the current user. - if (!indexKey.starts_with(indexPrefix)) { + if (!absl::StartsWith(indexIterator->key(), indexPrefix)) { break; } - [danglingMutationReferences addObject:[FSTLevelDBKey descriptionForKey:indexKey]]; + [danglingMutationReferences addObject:[FSTLevelDBKey descriptionForKey:indexIterator->key()]]; } FSTAssert(danglingMutationReferences.count == 0, @@ -605,9 +572,10 @@ static ReadOptions StandardReadOptions() { return proto; } -- (FSTMutationBatch *)decodedMutationBatch:(Slice)slice { - NSData *data = - [[NSData alloc] initWithBytesNoCopy:(void *)slice.data() length:slice.size() freeWhenDone:NO]; +- (FSTMutationBatch *)decodedMutationBatch:(absl::string_view)encoded { + NSData *data = [[NSData alloc] initWithBytesNoCopy:(void *)encoded.data() + length:encoded.size() + freeWhenDone:NO]; NSError *error; FSTPBWriteBatch *proto = [FSTPBWriteBatch parseFromData:data error:&error]; @@ -623,17 +591,16 @@ static ReadOptions StandardReadOptions() { - (BOOL)containsKey:(const DocumentKey &)documentKey { std::string indexPrefix = [FSTLevelDBDocumentMutationKey keyPrefixWithUserID:self.userID resourcePath:documentKey.path()]; - std::unique_ptr indexIterator(_db->NewIterator(StandardReadOptions())); + auto indexIterator = _db.currentTransaction->NewIterator(); indexIterator->Seek(indexPrefix); if (indexIterator->Valid()) { FSTLevelDBDocumentMutationKey *rowKey = [[FSTLevelDBDocumentMutationKey alloc] init]; - Slice iteratorKey = indexIterator->key(); // Check both that the key prefix matches and that the decoded document key is exactly the key // we're looking for. - if (iteratorKey.starts_with(indexPrefix) && [rowKey decodeKey:iteratorKey] && - DocumentKey{rowKey.documentKey} == documentKey) { + if (absl::StartsWith(indexIterator->key(), indexPrefix) && + [rowKey decodeKey:indexIterator->key()] && DocumentKey{rowKey.documentKey} == documentKey) { return YES; } } -- cgit v1.2.3