aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/objective-c/RxLibrary
diff options
context:
space:
mode:
Diffstat (limited to 'src/objective-c/RxLibrary')
-rw-r--r--src/objective-c/RxLibrary/GRXBufferedPipe.m21
-rw-r--r--src/objective-c/RxLibrary/GRXConcurrentWriteable.h4
-rw-r--r--src/objective-c/RxLibrary/GRXConcurrentWriteable.m76
-rw-r--r--src/objective-c/RxLibrary/GRXForwardingWriter.m48
-rw-r--r--src/objective-c/RxLibrary/GRXImmediateSingleWriter.m30
-rw-r--r--src/objective-c/RxLibrary/GRXWriter.h4
6 files changed, 84 insertions, 99 deletions
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m
index 546d46cba3..d0064a5cfa 100644
--- a/src/objective-c/RxLibrary/GRXBufferedPipe.m
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m
@@ -51,16 +51,22 @@
// We need a copy, so that it doesn't mutate before it's written at the other end of the pipe.
value = [value copy];
}
- __weak GRXBufferedPipe *weakSelf = self;
dispatch_async(_writeQueue, ^(void) {
- [weakSelf.writeable writeValue:value];
+ @synchronized (self) {
+ if (self->_state == GRXWriterStateFinished) {
+ return;
+ }
+ [self.writeable writeValue:value];
+ }
});
}
- (void)writesFinishedWithError:(NSError *)errorOrNil {
- __weak GRXBufferedPipe *weakSelf = self;
dispatch_async(_writeQueue, ^{
- [weakSelf finishWithError:errorOrNil];
+ if (self->_state == GRXWriterStateFinished) {
+ return;
+ }
+ [self finishWithError:errorOrNil];
});
}
@@ -100,14 +106,15 @@
}
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
- self.writeable = writeable;
- _state = GRXWriterStateStarted;
+ @synchronized (self) {
+ self.writeable = writeable;
+ _state = GRXWriterStateStarted;
+ }
dispatch_resume(_writeQueue);
}
- (void)finishWithError:(NSError *)errorOrNil {
[self.writeable writesFinishedWithError:errorOrNil];
- self.state = GRXWriterStateFinished;
}
- (void)dealloc {
diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.h b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h
index abb831e6fb..606f5dea95 100644
--- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.h
+++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.h
@@ -23,9 +23,9 @@
/**
* This is a thread-safe wrapper over a GRXWriteable instance. It lets one enqueue calls to a
- * GRXWriteable instance for the main thread, guaranteeing that writesFinishedWithError: is the last
+ * GRXWriteable instance for the thread user provided, guaranteeing that writesFinishedWithError: is the last
* message sent to it (no matter what messages are sent to the wrapper, in what order, nor from
- * which thread). It also guarantees that, if cancelWithError: is called from the main thread (e.g.
+ * which thread). It also guarantees that, if cancelWithError: is called (e.g.
* by the app cancelling the writes), no further messages are sent to the writeable except
* writesFinishedWithError:.
*
diff --git a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m
index 81ccc3fbce..229d592f48 100644
--- a/src/objective-c/RxLibrary/GRXConcurrentWriteable.m
+++ b/src/objective-c/RxLibrary/GRXConcurrentWriteable.m
@@ -41,6 +41,7 @@
if (self = [super init]) {
_writeableQueue = queue;
_writeable = writeable;
+ _alreadyFinished = NO;
}
return self;
}
@@ -51,78 +52,43 @@
- (void)enqueueValue:(id)value completionHandler:(void (^)(void))handler {
dispatch_async(_writeableQueue, ^{
- // We're racing a possible cancellation performed by another thread. To turn all already-
- // enqueued messages into noops, cancellation nillifies the writeable property. If we get it
- // before it's nil, we won the race.
- id<GRXWriteable> writeable = self.writeable;
- if (writeable) {
- [writeable writeValue:value];
- handler();
+ if (self->_alreadyFinished) {
+ return;
}
+
+ [self.writeable writeValue:value];
+ handler();
});
}
- (void)enqueueSuccessfulCompletion {
- __weak typeof(self) weakSelf = self;
dispatch_async(_writeableQueue, ^{
- typeof(self) strongSelf = weakSelf;
- if (strongSelf) {
- BOOL finished = NO;
- @synchronized(strongSelf) {
- if (!strongSelf->_alreadyFinished) {
- strongSelf->_alreadyFinished = YES;
- } else {
- finished = YES;
- }
- }
- if (!finished) {
- // Cancellation is now impossible. None of the other three blocks can run concurrently with
- // this one.
- [strongSelf.writeable writesFinishedWithError:nil];
- // Skip any possible message to the wrapped writeable enqueued after this one.
- strongSelf.writeable = nil;
- }
+ if (self->_alreadyFinished) {
+ return;
}
+ [self.writeable writesFinishedWithError:nil];
+ // Skip any possible message to the wrapped writeable enqueued after this one.
+ self.writeable = nil;
});
}
- (void)cancelWithError:(NSError *)error {
- NSAssert(error, @"For a successful completion, use enqueueSuccessfulCompletion.");
- BOOL finished = NO;
- @synchronized(self) {
- if (!_alreadyFinished) {
- _alreadyFinished = YES;
- } else {
- finished = YES;
+ NSAssert(error != nil, @"For a successful completion, use enqueueSuccessfulCompletion.");
+ dispatch_async(_writeableQueue, ^{
+ if (self->_alreadyFinished) {
+ return;
}
- }
- if (!finished) {
- // Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to
- // nillify writeable because we might be running concurrently with the blocks in
- // _writeableQueue, and assignment with ARC isn't atomic.
- id<GRXWriteable> writeable = self.writeable;
+ [self.writeable writesFinishedWithError:error];
self.writeable = nil;
-
- dispatch_async(_writeableQueue, ^{
- [writeable writesFinishedWithError:error];
- });
- }
+ });
}
- (void)cancelSilently {
- BOOL finished = NO;
- @synchronized(self) {
- if (!_alreadyFinished) {
- _alreadyFinished = YES;
- } else {
- finished = YES;
+ dispatch_async(_writeableQueue, ^{
+ if (self->_alreadyFinished) {
+ return;
}
- }
- if (!finished) {
- // Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to
- // nillify writeable because we might be running concurrently with the blocks in
- // _writeableQueue, and assignment with ARC isn't atomic.
self.writeable = nil;
- }
+ });
}
@end
diff --git a/src/objective-c/RxLibrary/GRXForwardingWriter.m b/src/objective-c/RxLibrary/GRXForwardingWriter.m
index 3e522ef24e..376c196b4f 100644
--- a/src/objective-c/RxLibrary/GRXForwardingWriter.m
+++ b/src/objective-c/RxLibrary/GRXForwardingWriter.m
@@ -54,23 +54,19 @@
[writeable writesFinishedWithError:errorOrNil];
}
-// This is used to stop the input writer. It nillifies our reference to it
-// to release it.
-- (void)finishInput {
- GRXWriter *writer = _writer;
- _writer = nil;
- writer.state = GRXWriterStateFinished;
-}
-
#pragma mark GRXWriteable implementation
- (void)writeValue:(id)value {
- [_writeable writeValue:value];
+ @synchronized (self) {
+ [_writeable writeValue:value];
+ }
}
- (void)writesFinishedWithError:(NSError *)errorOrNil {
- _writer = nil;
- [self finishOutputWithError:errorOrNil];
+ @synchronized (self) {
+ _writer = nil;
+ [self finishOutputWithError:errorOrNil];
+ }
}
#pragma mark GRXWriter implementation
@@ -80,22 +76,38 @@
}
- (void)setState:(GRXWriterState)state {
+ GRXWriter *copiedWriter = nil;
if (state == GRXWriterStateFinished) {
- _writeable = nil;
- [self finishInput];
+ @synchronized (self) {
+ _writeable = nil;
+ copiedWriter = _writer;
+ _writer = nil;
+ }
+ copiedWriter.state = GRXWriterStateFinished;
} else {
- _writer.state = state;
+ @synchronized (self) {
+ copiedWriter = _writer;
+ }
+ copiedWriter.state = state;
}
}
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
- _writeable = writeable;
- [_writer startWithWriteable:self];
+ GRXWriter *copiedWriter = nil;
+ @synchronized (self) {
+ _writeable = writeable;
+ copiedWriter = _writer;
+ }
+ [copiedWriter startWithWriteable:self];
}
- (void)finishWithError:(NSError *)errorOrNil {
- [self finishOutputWithError:errorOrNil];
- [self finishInput];
+ GRXWriter *copiedWriter = nil;
+ @synchronized (self) {
+ [self finishOutputWithError:errorOrNil];
+ copiedWriter = _writer;
+ }
+ copiedWriter.state = GRXWriterStateFinished;
}
@end
diff --git a/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m b/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m
index 3126ae4bd1..eadad6c3b6 100644
--- a/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m
+++ b/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m
@@ -20,7 +20,6 @@
@implementation GRXImmediateSingleWriter {
id _value;
- id<GRXWriteable> _writeable;
}
@synthesize state = _state;
@@ -38,17 +37,16 @@
}
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
- _state = GRXWriterStateStarted;
- _writeable = writeable;
- [writeable writeValue:_value];
- [self finish];
-}
-
-- (void)finish {
- _state = GRXWriterStateFinished;
- _value = nil;
- id<GRXWriteable> writeable = _writeable;
- _writeable = nil;
+ id copiedValue = nil;
+ @synchronized (self) {
+ if (_state != GRXWriterStateNotStarted) {
+ return;
+ }
+ copiedValue = _value;
+ _value = nil;
+ _state = GRXWriterStateFinished;
+ }
+ [writeable writeValue:copiedValue];
[writeable writesFinishedWithError:nil];
}
@@ -65,9 +63,11 @@
// the original \a map function returns a new Writer of another type. So we
// need to override this function here.
- (GRXWriter *)map:(id (^)(id))map {
- // Since _value is available when creating the object, we can simply
- // apply the map and store the output.
- _value = map(_value);
+ @synchronized (self) {
+ // Since _value is available when creating the object, we can simply
+ // apply the map and store the output.
+ _value = map(_value);
+ }
return self;
}
diff --git a/src/objective-c/RxLibrary/GRXWriter.h b/src/objective-c/RxLibrary/GRXWriter.h
index 5d99583a92..ac1f7b9c4c 100644
--- a/src/objective-c/RxLibrary/GRXWriter.h
+++ b/src/objective-c/RxLibrary/GRXWriter.h
@@ -80,9 +80,9 @@ typedef NS_ENUM(NSInteger, GRXWriterState) {
* This property can be used to query the current state of the writer, which determines how it might
* currently use its writeable. Some state transitions can be triggered by setting this property to
* the corresponding value, and that's useful for advanced use cases like pausing an writer. For
- * more details, see the documentation of the enum further down.
+ * more details, see the documentation of the enum further down. The property is thread safe.
*/
-@property(nonatomic) GRXWriterState state;
+@property(atomic) GRXWriterState state;
/**
* Transition to the Started state, and start sending messages to the writeable (a reference to it