aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/objective-c/RxLibrary
diff options
context:
space:
mode:
Diffstat (limited to 'src/objective-c/RxLibrary')
-rw-r--r--src/objective-c/RxLibrary/GRXBufferedPipe.h3
-rw-r--r--src/objective-c/RxLibrary/GRXBufferedPipe.m21
-rw-r--r--src/objective-c/RxLibrary/GRXConcurrentWriteable.h21
-rw-r--r--src/objective-c/RxLibrary/GRXConcurrentWriteable.m102
-rw-r--r--src/objective-c/RxLibrary/GRXForwardingWriter.h6
-rw-r--r--src/objective-c/RxLibrary/GRXForwardingWriter.m55
-rw-r--r--src/objective-c/RxLibrary/GRXImmediateSingleWriter.h2
-rw-r--r--src/objective-c/RxLibrary/GRXImmediateSingleWriter.m30
-rw-r--r--src/objective-c/RxLibrary/GRXWriter.h4
9 files changed, 130 insertions, 114 deletions
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.h b/src/objective-c/RxLibrary/GRXBufferedPipe.h
index a871ea895a..ae08cc315b 100644
--- a/src/objective-c/RxLibrary/GRXBufferedPipe.h
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.h
@@ -36,8 +36,7 @@
* crash. If you want to react to flow control signals to prevent that, instead of using this class
* you can implement an object that conforms to GRXWriter.
*
- * Thread-safety:
- * The methods of an object of this class should not be called concurrently from different threads.
+ * Thread-safety: the methods of this class are thread-safe.
*/
@interface GRXBufferedPipe : GRXWriter<GRXWriteable>
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m
index 546d46cba3..74e2f03da6 100644
--- a/src/objective-c/RxLibrary/GRXBufferedPipe.m
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m
@@ -51,16 +51,22 @@
// We need a copy, so that it doesn't mutate before it's written at the other end of the pipe.
value = [value copy];
}
- __weak GRXBufferedPipe *weakSelf = self;
dispatch_async(_writeQueue, ^(void) {
- [weakSelf.writeable writeValue:value];
+ @synchronized(self) {
+ if (self->_state == GRXWriterStateFinished) {
+ return;
+ }
+ [self.writeable writeValue:value];
+ }
});
}
- (void)writesFinishedWithError:(NSError *)errorOrNil {
- __weak GRXBufferedPipe *weakSelf = self;
dispatch_async(_writeQueue, ^{
- [weakSelf finishWithError:errorOrNil];
+ if (self->_state == GRXWriterStateFinished) {
+ return;
+ }
+ [self finishWithError:errorOrNil];
});
}
@@ -100,14 +106,15 @@
}
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
- self.writeable = writeable;
- _state = GRXWriterStateStarted;
+ @synchronized(self) {
+ self.writeable = writeable;
+ _state = GRXWriterStateStarted;
+ }
dispatch_resume(_writeQueue);
}
- (void)finishWithError:(NSError *)errorOrNil {
[self.writeable writesFinishedWithError:errorOrNil];
- self.state = GRXWriterStateFinished;
}
- (void)dealloc {
diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.h b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h
index abb831e6fb..5beca9d41e 100644
--- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.h
+++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h
@@ -23,10 +23,10 @@
/**
* This is a thread-safe wrapper over a GRXWriteable instance. It lets one enqueue calls to a
- * GRXWriteable instance for the main thread, guaranteeing that writesFinishedWithError: is the last
- * message sent to it (no matter what messages are sent to the wrapper, in what order, nor from
- * which thread). It also guarantees that, if cancelWithError: is called from the main thread (e.g.
- * by the app cancelling the writes), no further messages are sent to the writeable except
+ * GRXWriteable instance for the thread user provided, guaranteeing that writesFinishedWithError: is
+ * the last message sent to it (no matter what messages are sent to the wrapper, in what order, nor
+ * from which thread). It also guarantees that, if cancelWithError: is called (e.g. by the app
+ * cancelling the writes), no further messages are sent to the writeable except
* writesFinishedWithError:.
*
* TODO(jcanizales): Let the user specify another queue for the writeable callbacks.
@@ -43,21 +43,22 @@
- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable;
/**
- * Enqueues writeValue: to be sent to the writeable in the main thread.
- * The passed handler is invoked from the main thread after writeValue: returns.
+ * Enqueues writeValue: to be sent to the writeable from the designated dispatch queue.
+ * The passed handler is invoked from designated dispatch queue after writeValue: returns.
*/
- (void)enqueueValue:(id)value completionHandler:(void (^)(void))handler;
/**
- * Enqueues writesFinishedWithError:nil to be sent to the writeable in the main thread. After that
- * message is sent to the writeable, all other methods of this object are effectively noops.
+ * Enqueues writesFinishedWithError:nil to be sent to the writeable in the designated dispatch
+ * queue. After that message is sent to the writeable, all other methods of this object are
+ * effectively noops.
*/
- (void)enqueueSuccessfulCompletion;
/**
* If the writeable has not yet received a writesFinishedWithError: message, this will enqueue one
- * to be sent to it in the main thread, and cancel all other pending messages to the writeable
- * enqueued by this object (both past and future).
+ * to be sent to it in the designated dispatch queue, and cancel all other pending messages to the
+ * writeable enqueued by this object (both past and future).
* The error argument cannot be nil.
*/
- (void)cancelWithError:(NSError *)error;
diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m
index 81ccc3fbce..115195463d 100644
--- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m
+++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m
@@ -27,8 +27,15 @@
@implementation GRXConcurrentWriteable {
dispatch_queue_t _writeableQueue;
- // This ensures that writesFinishedWithError: is only sent once to the writeable.
+
+ // This ivar ensures that writesFinishedWithError: is only sent once to the writeable. Protected
+ // by _writeableQueue.
BOOL _alreadyFinished;
+
+ // This ivar ensures that a cancelWithError: call prevents further values to be sent to
+ // self.writeable. It must support manipulation outside of _writeableQueue and thus needs to be
+ // protected by self lock.
+ BOOL _cancelled;
}
- (instancetype)init {
@@ -41,6 +48,8 @@
if (self = [super init]) {
_writeableQueue = queue;
_writeable = writeable;
+ _alreadyFinished = NO;
+ _cancelled = NO;
}
return self;
}
@@ -51,78 +60,63 @@
- (void)enqueueValue:(id)value completionHandler:(void (^)(void))handler {
dispatch_async(_writeableQueue, ^{
- // We're racing a possible cancellation performed by another thread. To turn all already-
- // enqueued messages into noops, cancellation nillifies the writeable property. If we get it
- // before it's nil, we won the race.
- id<GRXWriteable> writeable = self.writeable;
- if (writeable) {
- [writeable writeValue:value];
- handler();
+ if (self->_alreadyFinished) {
+ return;
+ }
+
+ @synchronized(self) {
+ if (self->_cancelled) {
+ return;
+ }
}
+
+ [self.writeable writeValue:value];
+ handler();
});
}
- (void)enqueueSuccessfulCompletion {
- __weak typeof(self) weakSelf = self;
dispatch_async(_writeableQueue, ^{
- typeof(self) strongSelf = weakSelf;
- if (strongSelf) {
- BOOL finished = NO;
- @synchronized(strongSelf) {
- if (!strongSelf->_alreadyFinished) {
- strongSelf->_alreadyFinished = YES;
- } else {
- finished = YES;
- }
- }
- if (!finished) {
- // Cancellation is now impossible. None of the other three blocks can run concurrently with
- // this one.
- [strongSelf.writeable writesFinishedWithError:nil];
- // Skip any possible message to the wrapped writeable enqueued after this one.
- strongSelf.writeable = nil;
+ if (self->_alreadyFinished) {
+ return;
+ }
+ @synchronized(self) {
+ if (self->_cancelled) {
+ return;
}
}
+ [self.writeable writesFinishedWithError:nil];
+
+ // Skip any possible message to the wrapped writeable enqueued after this one.
+ self->_alreadyFinished = YES;
+ self.writeable = nil;
});
}
- (void)cancelWithError:(NSError *)error {
- NSAssert(error, @"For a successful completion, use enqueueSuccessfulCompletion.");
- BOOL finished = NO;
+ NSAssert(error != nil, @"For a successful completion, use enqueueSuccessfulCompletion.");
@synchronized(self) {
- if (!_alreadyFinished) {
- _alreadyFinished = YES;
- } else {
- finished = YES;
- }
+ self->_cancelled = YES;
}
- if (!finished) {
- // Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to
- // nillify writeable because we might be running concurrently with the blocks in
- // _writeableQueue, and assignment with ARC isn't atomic.
- id<GRXWriteable> writeable = self.writeable;
- self.writeable = nil;
+ dispatch_async(_writeableQueue, ^{
+ if (self->_alreadyFinished) {
+ // a cancel or a successful completion is already issued
+ return;
+ }
+ [self.writeable writesFinishedWithError:error];
- dispatch_async(_writeableQueue, ^{
- [writeable writesFinishedWithError:error];
- });
- }
+ // Skip any possible message to the wrapped writeable enqueued after this one.
+ self->_alreadyFinished = YES;
+ self.writeable = nil;
+ });
}
- (void)cancelSilently {
- BOOL finished = NO;
- @synchronized(self) {
- if (!_alreadyFinished) {
- _alreadyFinished = YES;
- } else {
- finished = YES;
+ dispatch_async(_writeableQueue, ^{
+ if (self->_alreadyFinished) {
+ return;
}
- }
- if (!finished) {
- // Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to
- // nillify writeable because we might be running concurrently with the blocks in
- // _writeableQueue, and assignment with ARC isn't atomic.
self.writeable = nil;
- }
+ });
}
@end
diff --git a/src/objective-c/RxLibrary/GRXForwardingWriter.h b/src/objective-c/RxLibrary/GRXForwardingWriter.h
index 3814ff8831..00366b6416 100644
--- a/src/objective-c/RxLibrary/GRXForwardingWriter.h
+++ b/src/objective-c/RxLibrary/GRXForwardingWriter.h
@@ -25,11 +25,7 @@
* input writer, and for classes that represent objects with input and
* output sequences of values, like an RPC.
*
- * Thread-safety:
- * All messages sent to this object need to be serialized. When it is started, the writer it wraps
- * is started in the same thread. Manual state changes are propagated to the wrapped writer in the
- * same thread too. Importantly, all messages the wrapped writer sends to its writeable need to be
- * serialized with any message sent to this object.
+ * Thread-safety: the methods of this class are thread safe.
*/
@interface GRXForwardingWriter : GRXWriter
- (instancetype)initWithWriter:(GRXWriter *)writer NS_DESIGNATED_INITIALIZER;
diff --git a/src/objective-c/RxLibrary/GRXForwardingWriter.m b/src/objective-c/RxLibrary/GRXForwardingWriter.m
index 3e522ef24e..27ac0acdff 100644
--- a/src/objective-c/RxLibrary/GRXForwardingWriter.m
+++ b/src/objective-c/RxLibrary/GRXForwardingWriter.m
@@ -54,48 +54,65 @@
[writeable writesFinishedWithError:errorOrNil];
}
-// This is used to stop the input writer. It nillifies our reference to it
-// to release it.
-- (void)finishInput {
- GRXWriter *writer = _writer;
- _writer = nil;
- writer.state = GRXWriterStateFinished;
-}
-
#pragma mark GRXWriteable implementation
- (void)writeValue:(id)value {
- [_writeable writeValue:value];
+ @synchronized(self) {
+ [_writeable writeValue:value];
+ }
}
- (void)writesFinishedWithError:(NSError *)errorOrNil {
- _writer = nil;
- [self finishOutputWithError:errorOrNil];
+ @synchronized(self) {
+ _writer = nil;
+ [self finishOutputWithError:errorOrNil];
+ }
}
#pragma mark GRXWriter implementation
- (GRXWriterState)state {
- return _writer ? _writer.state : GRXWriterStateFinished;
+ GRXWriter *copiedWriter;
+ @synchronized(self) {
+ copiedWriter = _writer;
+ }
+ return copiedWriter ? copiedWriter.state : GRXWriterStateFinished;
}
- (void)setState:(GRXWriterState)state {
+ GRXWriter *copiedWriter = nil;
if (state == GRXWriterStateFinished) {
- _writeable = nil;
- [self finishInput];
+ @synchronized(self) {
+ _writeable = nil;
+ copiedWriter = _writer;
+ _writer = nil;
+ }
+ copiedWriter.state = GRXWriterStateFinished;
} else {
- _writer.state = state;
+ @synchronized(self) {
+ copiedWriter = _writer;
+ }
+ copiedWriter.state = state;
}
}
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
- _writeable = writeable;
- [_writer startWithWriteable:self];
+ GRXWriter *copiedWriter = nil;
+ @synchronized(self) {
+ _writeable = writeable;
+ copiedWriter = _writer;
+ }
+ [copiedWriter startWithWriteable:self];
}
- (void)finishWithError:(NSError *)errorOrNil {
- [self finishOutputWithError:errorOrNil];
- [self finishInput];
+ GRXWriter *copiedWriter = nil;
+ @synchronized(self) {
+ [self finishOutputWithError:errorOrNil];
+ copiedWriter = _writer;
+ _writer = nil;
+ }
+ copiedWriter.state = GRXWriterStateFinished;
}
@end
diff --git a/src/objective-c/RxLibrary/GRXImmediateSingleWriter.h b/src/objective-c/RxLibrary/GRXImmediateSingleWriter.h
index 601abdc6b9..2fa38b3dce 100644
--- a/src/objective-c/RxLibrary/GRXImmediateSingleWriter.h
+++ b/src/objective-c/RxLibrary/GRXImmediateSingleWriter.h
@@ -23,6 +23,8 @@
/**
* Utility to construct GRXWriter instances from values that are immediately available when
* required.
+ *
+ * Thread safety: the methods of this class are thread safe.
*/
@interface GRXImmediateSingleWriter : GRXImmediateWriter
diff --git a/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m b/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m
index 3126ae4bd1..079c11b94f 100644
--- a/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m
+++ b/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m
@@ -20,7 +20,6 @@
@implementation GRXImmediateSingleWriter {
id _value;
- id<GRXWriteable> _writeable;
}
@synthesize state = _state;
@@ -38,17 +37,16 @@
}
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
- _state = GRXWriterStateStarted;
- _writeable = writeable;
- [writeable writeValue:_value];
- [self finish];
-}
-
-- (void)finish {
- _state = GRXWriterStateFinished;
- _value = nil;
- id<GRXWriteable> writeable = _writeable;
- _writeable = nil;
+ id copiedValue = nil;
+ @synchronized(self) {
+ if (_state != GRXWriterStateNotStarted) {
+ return;
+ }
+ copiedValue = _value;
+ _value = nil;
+ _state = GRXWriterStateFinished;
+ }
+ [writeable writeValue:copiedValue];
[writeable writesFinishedWithError:nil];
}
@@ -65,9 +63,11 @@
// the original \a map function returns a new Writer of another type. So we
// need to override this function here.
- (GRXWriter *)map:(id (^)(id))map {
- // Since _value is available when creating the object, we can simply
- // apply the map and store the output.
- _value = map(_value);
+ @synchronized(self) {
+ // Since _value is available when creating the object, we can simply
+ // apply the map and store the output.
+ _value = map(_value);
+ }
return self;
}
diff --git a/src/objective-c/RxLibrary/GRXWriter.h b/src/objective-c/RxLibrary/GRXWriter.h
index 5d99583a92..df4c80c28d 100644
--- a/src/objective-c/RxLibrary/GRXWriter.h
+++ b/src/objective-c/RxLibrary/GRXWriter.h
@@ -80,9 +80,9 @@ typedef NS_ENUM(NSInteger, GRXWriterState) {
* This property can be used to query the current state of the writer, which determines how it might
* currently use its writeable. Some state transitions can be triggered by setting this property to
* the corresponding value, and that's useful for advanced use cases like pausing an writer. For
- * more details, see the documentation of the enum further down.
+ * more details, see the documentation of the enum further down. The property is thread safe.
*/
-@property(nonatomic) GRXWriterState state;
+@property GRXWriterState state;
/**
* Transition to the Started state, and start sending messages to the writeable (a reference to it