From bde743ed25166a0b320ae157bfb1d68064f531c9 Mon Sep 17 00:00:00 2001 From: Gil Date: Tue, 3 Oct 2017 08:55:22 -0700 Subject: Release 4.3.0 (#327) Initial release of Firestore at 0.8.0 Bump FirebaseCommunity to 0.1.3 --- Firestore/Source/Local/FSTLevelDBMutationQueue.mm | 637 ++++++++++++++++++++++ 1 file changed, 637 insertions(+) create mode 100644 Firestore/Source/Local/FSTLevelDBMutationQueue.mm (limited to 'Firestore/Source/Local/FSTLevelDBMutationQueue.mm') diff --git a/Firestore/Source/Local/FSTLevelDBMutationQueue.mm b/Firestore/Source/Local/FSTLevelDBMutationQueue.mm new file mode 100644 index 0000000..d57a15d --- /dev/null +++ b/Firestore/Source/Local/FSTLevelDBMutationQueue.mm @@ -0,0 +1,637 @@ +/* + * Copyright 2017 Google + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#import "FSTLevelDBMutationQueue.h" + +#include +#include +#include +#include + +#import "Mutation.pbobjc.h" +#import "FSTUser.h" +#import "FSTQuery.h" +#import "FSTLevelDB.h" +#import "FSTLevelDBKey.h" +#import "FSTLocalSerializer.h" +#import "FSTWriteGroup.h" +#import "FSTDocumentKey.h" +#import "FSTMutation.h" +#import "FSTMutationBatch.h" +#import "FSTPath.h" +#import "FSTAssert.h" + +#include "ordered_code.h" +#include "string_util.h" + +NS_ASSUME_NONNULL_BEGIN + +using Firestore::OrderedCode; +using Firestore::StringView; +using leveldb::DB; +using leveldb::Iterator; +using leveldb::ReadOptions; +using leveldb::Slice; +using leveldb::Status; +using leveldb::WriteBatch; +using leveldb::WriteOptions; + +@interface FSTLevelDBMutationQueue () + +- (instancetype)initWithUserID:(NSString *)userID + db:(std::shared_ptr)db + serializer:(FSTLocalSerializer *)serializer NS_DESIGNATED_INITIALIZER; + +/** The normalized userID (e.g. nil UID => @"" userID) used in our LevelDB keys. */ +@property(nonatomic, strong, readonly) NSString *userID; + +/** + * Next value to use when assigning sequential IDs to each mutation batch. + * + * NOTE: There can only be one FSTLevelDBMutationQueue for a given db at a time, hence it is safe + * to track nextBatchID as an instance-level property. Should we ever relax this constraint we'll + * need to revisit this. + */ +@property(nonatomic, assign) FSTBatchID nextBatchID; + +/** A write-through cache copy of the metadata describing the current queue. */ +@property(nonatomic, strong, nullable) FSTPBMutationQueue *metadata; + +@property(nonatomic, strong, readonly) FSTLocalSerializer *serializer; + +@end + +/** + * Returns a standard set of read options. + * + * For now this is paranoid, but perhaps disable that in production builds. + */ +static ReadOptions StandardReadOptions() { + ReadOptions options; + options.verify_checksums = true; + return options; +} + +@implementation FSTLevelDBMutationQueue { + // The DB pointer is shared with all cooperating LevelDB-related objects. + std::shared_ptr _db; +} + ++ (instancetype)mutationQueueWithUser:(FSTUser *)user + db:(std::shared_ptr)db + serializer:(FSTLocalSerializer *)serializer { + FSTAssert(![user.UID isEqual:@""], @"UserID must not be an empty string."); + NSString *userID = user.isUnauthenticated ? @"" : user.UID; + + return [[FSTLevelDBMutationQueue alloc] initWithUserID:userID db:db serializer:serializer]; +} + +- (instancetype)initWithUserID:(NSString *)userID + db:(std::shared_ptr)db + serializer:(FSTLocalSerializer *)serializer { + if (self = [super init]) { + _userID = userID; + _db = db; + _serializer = serializer; + } + return self; +} + +- (void)startWithGroup:(FSTWriteGroup *)group { + FSTBatchID nextBatchID = [FSTLevelDBMutationQueue loadNextBatchIDFromDB:_db]; + + // On restart, nextBatchId may end up lower than lastAcknowledgedBatchId since it's computed from + // the queue contents, and there may be no mutations in the queue. In this case, we need to reset + // lastAcknowledgedBatchId (which is safe since the queue must be empty). + std::string key = [self keyForCurrentMutationQueue]; + FSTPBMutationQueue *metadata = [self metadataForKey:key]; + if (!metadata) { + metadata = [FSTPBMutationQueue message]; + + // proto3's default value for lastAcknowledgedBatchId is zero, but that would consider the first + // entry in the queue to be acknowledged without that acknowledgement actually happening. + metadata.lastAcknowledgedBatchId = kFSTBatchIDUnknown; + } else { + FSTBatchID lastAcked = metadata.lastAcknowledgedBatchId; + if (lastAcked >= nextBatchID) { + FSTAssert([self isEmpty], @"Reset nextBatchID is only possible when the queue is empty"); + lastAcked = kFSTBatchIDUnknown; + + metadata.lastAcknowledgedBatchId = lastAcked; + [group setMessage:metadata forKey:[self keyForCurrentMutationQueue]]; + } + } + + self.nextBatchID = nextBatchID; + self.metadata = metadata; +} + +- (void)shutdown { + _db.reset(); +} + ++ (FSTBatchID)loadNextBatchIDFromDB:(std::shared_ptr)db { + std::unique_ptr it(db->NewIterator(StandardReadOptions())); + + auto tableKey = [FSTLevelDBMutationKey keyPrefix]; + + FSTLevelDBMutationKey *rowKey = [[FSTLevelDBMutationKey alloc] init]; + FSTBatchID maxBatchID = kFSTBatchIDUnknown; + + BOOL moreUserIDs = NO; + std::string nextUserID; + + it->Seek(tableKey); + if (it->Valid() && [rowKey decodeKey:it->key()]) { + moreUserIDs = YES; + nextUserID = rowKey.userID; + } + + // This loop assumes that nextUserId contains the next username at the start of the iteration. + while (moreUserIDs) { + // Compute the first key after the last mutation for nextUserID. + auto userEnd = [FSTLevelDBMutationKey keyPrefixWithUserID:nextUserID]; + userEnd = Firestore::PrefixSuccessor(userEnd); + + // Seek to that key with the intent of finding the boundary between nextUserID's mutations + // and the one after that (if any). + it->Seek(userEnd); + + // At this point there are three possible cases to handle differently. Each case must prepare + // the next iteration (by assigning to nextUserID or setting moreUserIDs = NO) and seek the + // iterator to the last row in the current user's mutation sequence. + if (!it->Valid()) { + // The iterator is past the last row altogether (there are no additional userIDs and now + // rows in any table after mutations). The last row will have the highest batchID. + moreUserIDs = NO; + it->SeekToLast(); + + } else if ([rowKey decodeKey:it->key()]) { + // The iterator is valid and the key decoded successfully so the next user was just decoded. + nextUserID = rowKey.userID; + it->Prev(); + + } else { + // The iterator is past the end of the mutations table but there are other rows. + moreUserIDs = NO; + it->Prev(); + } + + // In all the cases above there was at least one row for the current user and each case has + // set things up such that iterator points to it. + if (![rowKey decodeKey:it->key()]) { + FSTFail(@"There should have been a key previous to %s", userEnd.c_str()); + } + + if (rowKey.batchID > maxBatchID) { + maxBatchID = rowKey.batchID; + } + } + + return maxBatchID + 1; +} + +- (BOOL)isEmpty { + std::string userKey = [FSTLevelDBMutationKey keyPrefixWithUserID:self.userID]; + + std::unique_ptr it(_db->NewIterator(StandardReadOptions())); + it->Seek(userKey); + + BOOL empty = YES; + if (it->Valid() && it->key().starts_with(userKey)) { + empty = NO; + } + + Status status = it->status(); + if (!status.ok()) { + FSTFail(@"isEmpty failed with status: %s", status.ToString().c_str()); + } + + return empty; +} + +- (FSTBatchID)highestAcknowledgedBatchID { + return self.metadata.lastAcknowledgedBatchId; +} + +- (void)acknowledgeBatch:(FSTMutationBatch *)batch + streamToken:(nullable NSData *)streamToken + group:(FSTWriteGroup *)group { + FSTBatchID batchID = batch.batchID; + FSTAssert(batchID > self.highestAcknowledgedBatchID, + @"Mutation batchIDs must be acknowledged in order"); + + FSTPBMutationQueue *metadata = self.metadata; + metadata.lastAcknowledgedBatchId = batchID; + metadata.lastStreamToken = streamToken; + + [group setMessage:metadata forKey:[self keyForCurrentMutationQueue]]; +} + +- (nullable NSData *)lastStreamToken { + return self.metadata.lastStreamToken; +} + +- (void)setLastStreamToken:(nullable NSData *)streamToken group:(FSTWriteGroup *)group { + FSTPBMutationQueue *metadata = self.metadata; + metadata.lastStreamToken = streamToken; + + [group setMessage:metadata forKey:[self keyForCurrentMutationQueue]]; +} + +- (std::string)keyForCurrentMutationQueue { + return [FSTLevelDBMutationQueueKey keyWithUserID:self.userID]; +} + +- (nullable FSTPBMutationQueue *)metadataForKey:(const std::string &)key { + std::string value; + Status status = _db->Get(StandardReadOptions(), key, &value); + if (status.ok()) { + return [self parsedMetadata:value]; + } else if (status.IsNotFound()) { + return nil; + } else { + FSTFail(@"metadataForKey: failed loading key %s with status: %s", key.c_str(), + status.ToString().c_str()); + } +} + +- (FSTMutationBatch *)addMutationBatchWithWriteTime:(FSTTimestamp *)localWriteTime + mutations:(NSArray *)mutations + group:(FSTWriteGroup *)group { + FSTBatchID batchID = self.nextBatchID; + self.nextBatchID += 1; + + FSTMutationBatch *batch = [[FSTMutationBatch alloc] initWithBatchID:batchID + localWriteTime:localWriteTime + mutations:mutations]; + std::string key = [self mutationKeyForBatch:batch]; + [group setMessage:[self.serializer encodedMutationBatch:batch] forKey:key]; + + NSString *userID = self.userID; + + // Store an empty value in the index which is equivalent to serializing a GPBEmpty message. In the + // future if we wanted to store some other kind of value here, we can parse these empty values as + // with some other protocol buffer (and the parser will see all default values). + std::string emptyBuffer; + + for (FSTMutation *mutation in mutations) { + key = [FSTLevelDBDocumentMutationKey keyWithUserID:userID + documentKey:mutation.key + batchID:batchID]; + [group setData:emptyBuffer forKey:key]; + } + + return batch; +} + +- (nullable FSTMutationBatch *)lookupMutationBatch:(FSTBatchID)batchID { + std::string key = [self mutationKeyForBatchID:batchID]; + + std::string value; + Status status = _db->Get(StandardReadOptions(), key, &value); + if (!status.ok()) { + if (status.IsNotFound()) { + return nil; + } + FSTFail(@"Lookup mutation batch (%@, %d) failed with status: %s", self.userID, batchID, + status.ToString().c_str()); + } + + return [self decodedMutationBatch:value]; +} + +- (nullable FSTMutationBatch *)nextMutationBatchAfterBatchID:(FSTBatchID)batchID { + std::string key = [self mutationKeyForBatchID:batchID + 1]; + std::unique_ptr it(_db->NewIterator(StandardReadOptions())); + it->Seek(key); + + Status status = it->status(); + if (!status.ok()) { + FSTFail(@"Seek to mutation batch (%@, %d) failed with status: %s", self.userID, batchID, + status.ToString().c_str()); + } + + FSTLevelDBMutationKey *rowKey = [[FSTLevelDBMutationKey alloc] init]; + if (!it->Valid() || ![rowKey decodeKey:it->key()]) { + // Past the last row in the DB or out of the mutations table + return nil; + } + + if (rowKey.userID != [self.userID UTF8String]) { + // Jumped past the last mutation for this user + return nil; + } + + FSTAssert(rowKey.batchID > batchID, @"Should have found mutation after %d", batchID); + return [self decodedMutationBatch:it->value()]; +} + +- (NSArray *)allMutationBatchesThroughBatchID:(FSTBatchID)batchID { + std::string userKey = [FSTLevelDBMutationKey keyPrefixWithUserID:self.userID]; + const char *userID = [self.userID UTF8String]; + + std::unique_ptr it(_db->NewIterator(StandardReadOptions())); + it->Seek(userKey); + + NSMutableArray *result = [NSMutableArray array]; + FSTLevelDBMutationKey *rowKey = [[FSTLevelDBMutationKey alloc] init]; + for (; it->Valid() && [rowKey decodeKey:it->key()]; it->Next()) { + if (rowKey.userID != userID) { + // End of this user's mutations + break; + } else if (rowKey.batchID > batchID) { + // This mutation is past what we're looking for + break; + } + + [result addObject:[self decodedMutationBatch:it->value()]]; + } + + Status status = it->status(); + if (!status.ok()) { + FSTFail(@"Find all mutations through mutation batch (%@, %d) failed with status: %s", + self.userID, batchID, status.ToString().c_str()); + } + + return result; +} + +- (NSArray *)allMutationBatchesAffectingDocumentKey: + (FSTDocumentKey *)documentKey { + NSString *userID = self.userID; + + // Scan the document-mutation index starting with a prefix starting with the given documentKey. + std::string indexPrefix = + [FSTLevelDBDocumentMutationKey keyPrefixWithUserID:self.userID resourcePath:documentKey.path]; + std::unique_ptr indexIterator(_db->NewIterator(StandardReadOptions())); + indexIterator->Seek(indexPrefix); + + // Simultaneously scan the mutation queue. This works because each (key, batchID) pair is unique + // and ordered, so when scanning a table prefixed by exactly key, all the batchIDs encountered + // will be unique and in order. + std::string mutationsPrefix = [FSTLevelDBMutationKey keyPrefixWithUserID:userID]; + std::unique_ptr mutationIterator(_db->NewIterator(StandardReadOptions())); + + NSMutableArray *result = [NSMutableArray array]; + FSTLevelDBDocumentMutationKey *rowKey = [[FSTLevelDBDocumentMutationKey alloc] init]; + for (; indexIterator->Valid(); indexIterator->Next()) { + Slice indexKey = indexIterator->key(); + + // Only consider rows matching exactly the specific key of interest. Note that because we order + // by path first, and we order terminators before path separators, we'll encounter all the + // index rows for documentKey contiguously. In particular, all the rows for documentKey will + // occur before any rows for documents nested in a subcollection beneath documentKey so we can + // stop as soon as we hit any such row. + if (!indexKey.starts_with(indexPrefix) || ![rowKey decodeKey:indexKey] || + ![rowKey.documentKey isEqualToKey:documentKey]) { + break; + } + + // Each row is a unique combination of key and batchID, so this foreign key reference can + // only occur once. + std::string mutationKey = [FSTLevelDBMutationKey keyWithUserID:userID batchID:rowKey.batchID]; + mutationIterator->Seek(mutationKey); + if (!mutationIterator->Valid() || mutationIterator->key() != mutationKey) { + NSString *foundKeyDescription = @"the end of the table"; + if (mutationIterator->Valid()) { + foundKeyDescription = [FSTLevelDBKey descriptionForKey:mutationIterator->key()]; + } + FSTFail(@"Dangling document-mutation reference found: " + @"%@ points to %@; seeking there found %@", + [FSTLevelDBKey descriptionForKey:indexKey], + [FSTLevelDBKey descriptionForKey:mutationKey], foundKeyDescription); + } + + [result addObject:[self decodedMutationBatch:mutationIterator->value()]]; + } + return result; +} + +- (NSArray *)allMutationBatchesAffectingQuery:(FSTQuery *)query { + FSTAssert(![query isDocumentQuery], @"Document queries shouldn't go down this path"); + NSString *userID = self.userID; + + FSTResourcePath *queryPath = query.path; + int immediateChildrenPathLength = queryPath.length + 1; + + // TODO(mcg): Actually implement a single-collection query + // + // This is actually executing an ancestor query, traversing the whole subtree below the + // collection which can be horrifically inefficient for some structures. The right way to + // solve this is to implement the full value index, but that's not in the cards in the near + // future so this is the best we can do for the moment. + // + // Since we don't yet index the actual properties in the mutations, our current approach is to + // just return all mutation batches that affect documents in the collection being queried. + // + // Unlike allMutationBatchesAffectingDocumentKey, this iteration will scan the document-mutation + // index for more than a single document so the associated batchIDs will be neither necessarily + // unique nor in order. This means an efficient simultaneous scan isn't possible. + std::string indexPrefix = + [FSTLevelDBDocumentMutationKey keyPrefixWithUserID:self.userID resourcePath:queryPath]; + std::unique_ptr indexIterator(_db->NewIterator(StandardReadOptions())); + indexIterator->Seek(indexPrefix); + + NSMutableArray *result = [NSMutableArray array]; + FSTLevelDBDocumentMutationKey *rowKey = [[FSTLevelDBDocumentMutationKey alloc] init]; + + // Collect up unique batchIDs encountered during a scan of the index. Use a set to + // accumulate batch IDs so they can be traversed in order in a scan of the main table. + // + // This method is faster than performing lookups of the keys with _db->Get and keeping a hash of + // batchIDs that have already been looked up. The performance difference is minor for small + // numbers of keys but > 30% faster for larger numbers of keys. + std::set uniqueBatchIds; + for (; indexIterator->Valid(); indexIterator->Next()) { + Slice indexKey = indexIterator->key(); + + if (!indexKey.starts_with(indexPrefix) || ![rowKey decodeKey:indexKey]) { + break; + } + + // Rows with document keys more than one segment longer than the query path can't be matches. + // For example, a query on 'rooms' can't match the document /rooms/abc/messages/xyx. + // TODO(mcg): we'll need a different scanner when we implement ancestor queries. + if (rowKey.documentKey.path.length != immediateChildrenPathLength) { + continue; + } + + uniqueBatchIds.insert(rowKey.batchID); + } + + // Given an ordered set of unique batchIDs perform a skipping scan over the main table to find + // the mutation batches. + std::unique_ptr mutationIterator(_db->NewIterator(StandardReadOptions())); + + for (FSTBatchID batchID : uniqueBatchIds) { + std::string mutationKey = [FSTLevelDBMutationKey keyWithUserID:userID batchID:batchID]; + mutationIterator->Seek(mutationKey); + if (!mutationIterator->Valid() || mutationIterator->key() != mutationKey) { + NSString *foundKeyDescription = @"the end of the table"; + if (mutationIterator->Valid()) { + foundKeyDescription = [FSTLevelDBKey descriptionForKey:mutationIterator->key()]; + } + FSTFail(@"Dangling document-mutation reference found: " + @"Missing batch %@; seeking there found %@", + [FSTLevelDBKey descriptionForKey:mutationKey], foundKeyDescription); + } + + [result addObject:[self decodedMutationBatch:mutationIterator->value()]]; + } + return result; +} + +- (NSArray *)allMutationBatches { + std::string userKey = [FSTLevelDBMutationKey keyPrefixWithUserID:self.userID]; + + std::unique_ptr it(_db->NewIterator(StandardReadOptions())); + it->Seek(userKey); + + NSMutableArray *result = [NSMutableArray array]; + for (; it->Valid() && it->key().starts_with(userKey); it->Next()) { + [result addObject:[self decodedMutationBatch:it->value()]]; + } + + Status status = it->status(); + if (!status.ok()) { + FSTFail(@"Find all mutation batches failed with status: %s", status.ToString().c_str()); + } + + return result; +} + +- (void)removeMutationBatches:(NSArray *)batches group:(FSTWriteGroup *)group { + NSString *userID = self.userID; + id garbageCollector = self.garbageCollector; + + std::unique_ptr checkIterator(_db->NewIterator(StandardReadOptions())); + + for (FSTMutationBatch *batch in batches) { + FSTBatchID batchID = batch.batchID; + std::string key = [FSTLevelDBMutationKey keyWithUserID:userID batchID:batchID]; + + // As a sanity check, verify that the mutation batch exists before deleting it. + checkIterator->Seek(key); + FSTAssert(checkIterator->Valid(), @"Mutation batch %@ did not exist", + [FSTLevelDBKey descriptionForKey:key]); + + FSTAssert(key == checkIterator->key(), @"Mutation batch %@ not found; found %@", + [FSTLevelDBKey descriptionForKey:key], + [FSTLevelDBKey descriptionForKey:checkIterator->key()]); + + [group removeMessageForKey:key]; + + for (FSTMutation *mutation in batch.mutations) { + key = [FSTLevelDBDocumentMutationKey keyWithUserID:userID + documentKey:mutation.key + batchID:batchID]; + [group removeMessageForKey:key]; + [garbageCollector addPotentialGarbageKey:mutation.key]; + } + } +} + +- (void)performConsistencyCheck { + if (![self isEmpty]) { + return; + } + + // Verify that there are no entries in the document-mutation index if the queue is empty. + std::string indexPrefix = [FSTLevelDBDocumentMutationKey keyPrefixWithUserID:self.userID]; + std::unique_ptr indexIterator(_db->NewIterator(StandardReadOptions())); + indexIterator->Seek(indexPrefix); + + NSMutableArray *danglingMutationReferences = [NSMutableArray array]; + + for (; indexIterator->Valid(); indexIterator->Next()) { + Slice indexKey = indexIterator->key(); + + // Only consider rows matching this index prefix for the current user. + if (!indexKey.starts_with(indexPrefix)) { + break; + } + + [danglingMutationReferences addObject:[FSTLevelDBKey descriptionForKey:indexKey]]; + } + + FSTAssert(danglingMutationReferences.count == 0, + @"Document leak -- detected dangling mutation references when queue " + @"is empty. Dangling keys: %@", + danglingMutationReferences); +} + +- (std::string)mutationKeyForBatch:(FSTMutationBatch *)batch { + return [FSTLevelDBMutationKey keyWithUserID:self.userID batchID:batch.batchID]; +} + +- (std::string)mutationKeyForBatchID:(FSTBatchID)batchID { + return [FSTLevelDBMutationKey keyWithUserID:self.userID batchID:batchID]; +} + +/** Parses the MutationQueue metadata from the given LevelDB row contents. */ +- (FSTPBMutationQueue *)parsedMetadata:(Slice)slice { + NSData *data = + [[NSData alloc] initWithBytesNoCopy:(void *)slice.data() length:slice.size() freeWhenDone:NO]; + + NSError *error; + FSTPBMutationQueue *proto = [FSTPBMutationQueue parseFromData:data error:&error]; + if (!proto) { + FSTFail(@"FSTPBMutationQueue failed to parse: %@", error); + } + + return proto; +} + +- (FSTMutationBatch *)decodedMutationBatch:(Slice)slice { + NSData *data = + [[NSData alloc] initWithBytesNoCopy:(void *)slice.data() length:slice.size() freeWhenDone:NO]; + + NSError *error; + FSTPBWriteBatch *proto = [FSTPBWriteBatch parseFromData:data error:&error]; + if (!proto) { + FSTFail(@"FSTPBMutationBatch failed to parse: %@", error); + } + + return [self.serializer decodedMutationBatch:proto]; +} + +#pragma mark - FSTGarbageSource implementation + +- (BOOL)containsKey:(FSTDocumentKey *)documentKey { + std::string indexPrefix = + [FSTLevelDBDocumentMutationKey keyPrefixWithUserID:self.userID resourcePath:documentKey.path]; + std::unique_ptr indexIterator(_db->NewIterator(StandardReadOptions())); + indexIterator->Seek(indexPrefix); + + if (indexIterator->Valid()) { + FSTLevelDBDocumentMutationKey *rowKey = [[FSTLevelDBDocumentMutationKey alloc] init]; + Slice iteratorKey = indexIterator->key(); + + // Check both that the key prefix matches and that the decoded document key is exactly the key + // we're looking for. + if (iteratorKey.starts_with(indexPrefix) && [rowKey decodeKey:iteratorKey] && + [rowKey.documentKey isEqualToKey:documentKey]) { + return YES; + } + } + + return NO; +} + +@end + +NS_ASSUME_NONNULL_END -- cgit v1.2.3