/* * * Copyright 2015 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #import "GRXConcurrentWriteable.h" #import @interface GRXConcurrentWriteable () // This is atomic so that cancellation can nillify it from any thread. @property(atomic, strong) id writeable; @end @implementation GRXConcurrentWriteable { dispatch_queue_t _writeableQueue; // This ivar ensures that writesFinishedWithError: is only sent once to the writeable. Protected // by _writeableQueue. BOOL _alreadyFinished; // This ivar ensures that a cancelWithError: call prevents further values to be sent to // self.writeable. It must support manipulation outside of _writeableQueue and thus needs to be // protected by self lock. BOOL _cancelled; } - (instancetype)init { return [self initWithWriteable:nil]; } // Designated initializer - (instancetype)initWithWriteable:(id)writeable dispatchQueue:(dispatch_queue_t)queue { if (self = [super init]) { _writeableQueue = queue; _writeable = writeable; _alreadyFinished = NO; _cancelled = NO; } return self; } - (instancetype)initWithWriteable:(id)writeable { return [self initWithWriteable:writeable dispatchQueue:dispatch_get_main_queue()]; } - (void)enqueueValue:(id)value completionHandler:(void (^)(void))handler { dispatch_async(_writeableQueue, ^{ if (self->_alreadyFinished) { return; } @synchronized(self) { if (self->_cancelled) { return; } } [self.writeable writeValue:value]; handler(); }); } - (void)enqueueSuccessfulCompletion { dispatch_async(_writeableQueue, ^{ if (self->_alreadyFinished) { return; } @synchronized(self) { if (self->_cancelled) { return; } } [self.writeable writesFinishedWithError:nil]; // Skip any possible message to the wrapped writeable enqueued after this one. self->_alreadyFinished = YES; self.writeable = nil; }); } - (void)cancelWithError:(NSError *)error { NSAssert(error != nil, @"For a successful completion, use enqueueSuccessfulCompletion."); @synchronized(self) { self->_cancelled = YES; } dispatch_async(_writeableQueue, ^{ if (self->_alreadyFinished) { // a cancel or a successful completion is already issued return; } [self.writeable writesFinishedWithError:error]; // Skip any possible message to the wrapped writeable enqueued after this one. self->_alreadyFinished = YES; self.writeable = nil; }); } - (void)cancelSilently { dispatch_async(_writeableQueue, ^{ if (self->_alreadyFinished) { return; } self.writeable = nil; }); } @end