aboutsummaryrefslogtreecommitdiffhomepage
path: root/Firebase/Messaging/FIRMessagingRmqManager.m
diff options
context:
space:
mode:
authorGravatar Paul Beusterien <paulbeusterien@google.com>2017-05-15 12:27:07 -0700
committerGravatar Paul Beusterien <paulbeusterien@google.com>2017-05-15 12:27:07 -0700
commit98ba64449a632518bd2b86fe8d927f4a960d3ddc (patch)
tree131d9c4272fa6179fcda6c5a33fcb3b1bd57ad2e /Firebase/Messaging/FIRMessagingRmqManager.m
parent32461366c9e204a527ca05e6e9b9404a2454ac51 (diff)
Initial
Diffstat (limited to 'Firebase/Messaging/FIRMessagingRmqManager.m')
-rw-r--r--Firebase/Messaging/FIRMessagingRmqManager.m264
1 files changed, 264 insertions, 0 deletions
diff --git a/Firebase/Messaging/FIRMessagingRmqManager.m b/Firebase/Messaging/FIRMessagingRmqManager.m
new file mode 100644
index 0000000..de63a73
--- /dev/null
+++ b/Firebase/Messaging/FIRMessagingRmqManager.m
@@ -0,0 +1,264 @@
+/*
+ * Copyright 2017 Google
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#import "FIRMessagingRmqManager.h"
+
+#import "Protos/GtalkCore.pbobjc.h"
+#import "sqlite3.h"
+
+#import "FIRMessagingDefines.h"
+#import "FIRMessagingLogger.h"
+#import "FIRMessagingRmq2PersistentStore.h"
+#import "FIRMessagingUtilities.h"
+
+#ifndef _FIRMessagingRmqLogAndExit
+#define _FIRMessagingRmqLogAndExit(stmt, return_value) \
+do { \
+ [self logErrorAndFinalizeStatement:stmt]; \
+ return return_value; \
+} while(0)
+#endif
+
+static NSString *const kFCMRmqTag = @"FIRMessagingRmq:";
+
+@interface FIRMessagingRmqManager ()
+
+@property(nonatomic, readwrite, strong) FIRMessagingRmq2PersistentStore *rmq2Store;
+// map the category of an outgoing message with the number of messages for that category
+// should always have two keys -- the app, gcm
+@property(nonatomic, readwrite, strong) NSMutableDictionary *outstandingMessages;
+
+// Outgoing RMQ persistent id
+@property(nonatomic, readwrite, assign) int64_t rmqId;
+
+@end
+
+@implementation FIRMessagingRmqManager
+
+- (instancetype)initWithDatabaseName:(NSString *)databaseName {
+ self = [super init];
+ if (self) {
+ _FIRMessagingDevAssert([databaseName length] > 0, @"RMQ: Invalid rmq db name");
+ _rmq2Store = [[FIRMessagingRmq2PersistentStore alloc] initWithDatabaseName:databaseName];
+ _outstandingMessages = [NSMutableDictionary dictionaryWithCapacity:2];
+ _rmqId = -1;
+ }
+ return self;
+}
+
+- (void)loadRmqId {
+ if (self.rmqId >= 0) {
+ return; // already done
+ }
+
+ [self loadInitialOutgoingPersistentId];
+ if (self.outstandingMessages.count) {
+ FIRMessagingLoggerDebug(kFIRMessagingMessageCodeRmqManager000,
+ @"%@: outstanding categories %ld", kFCMRmqTag,
+ _FIRMessaging_UL(self.outstandingMessages.count));
+ }
+}
+
+/**
+ * Initialize the 'initial RMQ':
+ * - max ID of any message in the queue
+ * - if the queue is empty, stored value in separate DB.
+ *
+ * Stream acks will remove from RMQ, when we remove the highest message we keep track
+ * of its ID.
+ */
+- (void)loadInitialOutgoingPersistentId {
+
+ // we shouldn't always trust the lastRmqId stored in the LastRmqId table, because
+ // we only save to the LastRmqId table once in a while (after getting the lastRmqId sent
+ // by the server after reconnect, and after getting a rmq ack from the server). The
+ // rmq message with the highest rmq id tells the real story, so check against that first.
+
+ int64_t rmqId = [self queryHighestRmqId];
+ if (rmqId == 0) {
+ rmqId = [self querylastRmqId];
+ }
+ self.rmqId = rmqId + 1;
+}
+
+#pragma mark - Save
+
+/**
+ * Save a message to RMQ2. Will populate the rmq2 persistent ID.
+ */
+- (BOOL)saveRmqMessage:(GPBMessage *)message
+ error:(NSError **)error {
+ // send using rmq2manager
+ // the wire format of rmq2 id is a string. However, we keep it as a long internally
+ // in the database. So only convert the id to string when preparing for sending over
+ // the wire.
+ NSString *rmq2Id = FIRMessagingGetRmq2Id(message);
+ if (![rmq2Id length]) {
+ int64_t rmqId = [self nextRmqId];
+ rmq2Id = [NSString stringWithFormat:@"%lld", rmqId];
+ FIRMessagingSetRmq2Id(message, rmq2Id);
+ }
+ FIRMessagingProtoTag tag = FIRMessagingGetTagForProto(message);
+ return [self saveMessage:message withRmqId:[rmq2Id integerValue] tag:tag error:error];
+}
+
+- (BOOL)saveMessage:(GPBMessage *)message
+ withRmqId:(int64_t)rmqId
+ tag:(int8_t)tag
+ error:(NSError **)error {
+ NSData *data = [message data];
+ return [self.rmq2Store saveMessageWithRmqId:rmqId tag:tag data:data error:error];
+}
+
+/**
+ * This is called when we delete the largest outgoing message from queue.
+ */
+- (void)saveLastOutgoingRmqId:(int64_t)rmqID {
+ [self.rmq2Store updateLastOutgoingRmqId:rmqID];
+}
+
+- (BOOL)saveS2dMessageWithRmqId:(NSString *)rmqID {
+ return [self.rmq2Store saveUnackedS2dMessageWithRmqId:rmqID];
+}
+
+#pragma mark - Query
+
+- (int64_t)queryHighestRmqId {
+ return [self.rmq2Store queryHighestRmqId];
+}
+
+- (int64_t)querylastRmqId {
+ return [self.rmq2Store queryLastRmqId];
+}
+
+- (NSArray *)unackedS2dRmqIds {
+ return [self.rmq2Store unackedS2dRmqIds];
+}
+
+#pragma mark - FIRMessagingRMQScanner protocol
+
+/**
+ * We don't have a 'getMessages' method - it would require loading in memory
+ * the entire content body of all messages.
+ *
+ * Instead we iterate and call 'resend' for each message.
+ *
+ * This is called:
+ * - on connect MCS, to resend any outstanding messages
+ * - init
+ */
+- (void)scanWithRmqMessageHandler:(FIRMessagingRmqMessageHandler)rmqMessageHandler
+ dataMessageHandler:(FIRMessagingDataMessageHandler)dataMessageHandler {
+ // no need to scan database with no callbacks
+ if (rmqMessageHandler || dataMessageHandler) {
+ [self.rmq2Store scanOutgoingRmqMessagesWithHandler:^(int64_t rmqId, int8_t tag, NSData *data) {
+ if (rmqMessageHandler != nil) {
+ rmqMessageHandler(rmqId, tag, data);
+ }
+ if (dataMessageHandler != nil && kFIRMessagingProtoTagDataMessageStanza == tag) {
+ GPBMessage *proto =
+ [FIRMessagingGetClassForTag((FIRMessagingProtoTag)tag) parseFromData:data error:NULL];
+ GtalkDataMessageStanza *stanza = (GtalkDataMessageStanza *)proto;
+ dataMessageHandler(rmqId, stanza);
+ }
+ }];
+ }
+}
+
+#pragma mark - Remove
+
+- (void)ackReceivedForRmqId:(NSString *)rmqId {
+ // TODO: Optional book-keeping
+}
+
+- (int)removeRmqMessagesWithRmqId:(NSString *)rmqId {
+ return [self removeRmqMessagesWithRmqIds:@[rmqId]];
+}
+
+- (int)removeRmqMessagesWithRmqIds:(NSArray *)rmqIds {
+ if (![rmqIds count]) {
+ return 0;
+ }
+ for (NSString *rmqId in rmqIds) {
+ [self ackReceivedForRmqId:rmqId];
+ }
+ int64_t maxRmqId = -1;
+ for (NSString *rmqId in rmqIds) {
+ int64_t rmqIdValue = [rmqId longLongValue];
+ if (rmqIdValue > maxRmqId) {
+ maxRmqId = rmqIdValue;
+ }
+ }
+ maxRmqId++;
+ if (maxRmqId >= self.rmqId) {
+ [self saveLastOutgoingRmqId:maxRmqId];
+ }
+ return [self.rmq2Store deleteMessagesFromTable:kTableOutgoingRmqMessages withRmqIds:rmqIds];
+}
+
+- (void)removeS2dIds:(NSArray *)s2dIds {
+ [self.rmq2Store deleteMessagesFromTable:kTableS2DRmqIds withRmqIds:s2dIds];
+}
+
+#pragma mark - Sync Messages
+
+// TODO: RMQManager should also have a cache for all the sync messages
+// so we don't hit the DB each time.
+- (FIRMessagingPersistentSyncMessage *)querySyncMessageWithRmqID:(NSString *)rmqID {
+ return [self.rmq2Store querySyncMessageWithRmqID:rmqID];
+}
+
+- (BOOL)deleteSyncMessageWithRmqID:(NSString *)rmqID {
+ return [self.rmq2Store deleteSyncMessageWithRmqID:rmqID];
+}
+
+- (int)deleteExpiredOrFinishedSyncMessages:(NSError **)error {
+ return [self.rmq2Store deleteExpiredOrFinishedSyncMessages:error];
+}
+
+- (BOOL)saveSyncMessageWithRmqID:(NSString *)rmqID
+ expirationTime:(int64_t)expirationTime
+ apnsReceived:(BOOL)apnsReceived
+ mcsReceived:(BOOL)mcsReceived
+ error:(NSError *__autoreleasing *)error {
+ return [self.rmq2Store saveSyncMessageWithRmqID:rmqID
+ expirationTime:expirationTime
+ apnsReceived:apnsReceived
+ mcsReceived:mcsReceived
+ error:error];
+}
+
+- (BOOL)updateSyncMessageViaAPNSWithRmqID:(NSString *)rmqID error:(NSError **)error {
+ return [self.rmq2Store updateSyncMessageViaAPNSWithRmqID:rmqID error:error];
+}
+
+- (BOOL)updateSyncMessageViaMCSWithRmqID:(NSString *)rmqID error:(NSError **)error {
+ return [self.rmq2Store updateSyncMessageViaMCSWithRmqID:rmqID error:error];
+}
+
+#pragma mark - Testing
+
++ (void)removeDatabaseWithName:(NSString *)dbName {
+ [FIRMessagingRmq2PersistentStore removeDatabase:dbName];
+}
+
+#pragma mark - Private
+
+- (int64_t)nextRmqId {
+ return ++self.rmqId;
+}
+
+@end