diff options
Diffstat (limited to 'src/objective-c/RxLibrary')
-rw-r--r-- | src/objective-c/RxLibrary/GRXBufferedPipe.h | 59 | ||||
-rw-r--r-- | src/objective-c/RxLibrary/GRXBufferedPipe.m | 146 | ||||
-rw-r--r-- | src/objective-c/RxLibrary/GRXImmediateWriter.m | 4 | ||||
-rw-r--r-- | src/objective-c/RxLibrary/GRXWriteable.h | 8 | ||||
-rw-r--r-- | src/objective-c/RxLibrary/GRXWriteable.m | 4 | ||||
-rw-r--r-- | src/objective-c/RxLibrary/GRXWriter.h | 6 | ||||
-rw-r--r-- | src/objective-c/RxLibrary/GRXWriter.m | 8 | ||||
-rw-r--r-- | src/objective-c/RxLibrary/transformations/GRXMappingWriter.m | 4 |
8 files changed, 221 insertions, 18 deletions
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.h b/src/objective-c/RxLibrary/GRXBufferedPipe.h new file mode 100644 index 0000000000..5e876a73bf --- /dev/null +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.h @@ -0,0 +1,59 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#import <Foundation/Foundation.h> + +#import "GRXWriteable.h" +#import "GRXWriter.h" + +// A buffered pipe is a Writeable that also acts as a Writer (to whichever other writeable is passed +// to -startWithWriteable:). +// Once it is started, whatever values are written into it (via -writeValue:) will be propagated +// immediately, unless flow control prevents it. +// If it is throttled and keeps receiving values, as well as if it receives values before being +// started, it will buffer them and propagate them in order as soon as its state becomes +// GRXWriterStateStarted. +// If it receives an error (via -writesFinishedWithError:), it will drop any buffered values and +// propagate the error immediately. +// +// Beware that a pipe of this type can't prevent receiving more values when it is paused (for +// example if used to write data to a congested network connection). Because in such situations the +// pipe will keep buffering all data written to it, your application could run out of memory and +// crash. If you want to react to flow control signals to prevent that, instead of using this class +// you can implement an object that conforms to GRXWriter. +@interface GRXBufferedPipe : NSObject<GRXWriteable, GRXWriter> + +// Convenience constructor. ++ (instancetype)pipe; + +@end diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m new file mode 100644 index 0000000000..4820c84af0 --- /dev/null +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m @@ -0,0 +1,146 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#import "GRXBufferedPipe.h" + +@implementation GRXBufferedPipe { + id<GRXWriteable> _writeable; + NSMutableArray *_queue; + BOOL _inputIsFinished; + NSError *_errorOrNil; +} + +@synthesize state = _state; + ++ (instancetype)pipe { + return [[self alloc] init]; +} + +- (instancetype)init { + if (self = [super init]) { + _queue = [NSMutableArray array]; + _state = GRXWriterStateNotStarted; + } + return self; +} + +- (id)popValue { + id value = _queue[0]; + [_queue removeObjectAtIndex:0]; + return value; +} + +- (void)writeBufferUntilPausedOrStopped { + while (_state == GRXWriterStateStarted && _queue.count > 0) { + [_writeable writeValue:[self popValue]]; + } + if (_inputIsFinished && _queue.count == 0) { + // Our writer finished normally while we were paused or not-started-yet. + [self finishWithError:_errorOrNil]; + } +} + +#pragma mark GRXWriteable implementation + +// Returns whether events can be simply propagated to the other end of the pipe. +- (BOOL)shouldFastForward { + return _state == GRXWriterStateStarted && _queue.count == 0; +} + +- (void)writeValue:(id)value { + if (self.shouldFastForward) { + // Skip the queue. + [_writeable writeValue:value]; + } else { + // Even if we're paused and with enqueued values, we can't excert back-pressure to our writer. + // So just buffer the new value. + // We need a copy, so that it doesn't mutate before it's written at the other end of the pipe. + if ([value respondsToSelector:@selector(copy)]) { + value = [value copy]; + } + [_queue addObject:value]; + } +} + +- (void)writesFinishedWithError:(NSError *)errorOrNil { + _inputIsFinished = YES; + _errorOrNil = errorOrNil; + if (errorOrNil || self.shouldFastForward) { + // No need to write pending values. + [self finishWithError:_errorOrNil]; + } +} + +#pragma mark GRXWriter implementation + +- (void)setState:(GRXWriterState)newState { + // Manual transitions are only allowed from the started or paused states. + if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) { + return; + } + + switch (newState) { + case GRXWriterStateFinished: + _state = newState; + _queue = nil; + // Per GRXWriter's contract, setting the state to Finished manually means one doesn't wish the + // writeable to be messaged anymore. + _writeable = nil; + return; + case GRXWriterStatePaused: + _state = newState; + return; + case GRXWriterStateStarted: + if (_state == GRXWriterStatePaused) { + _state = newState; + [self writeBufferUntilPausedOrStopped]; + } + return; + case GRXWriterStateNotStarted: + return; + } +} + +- (void)startWithWriteable:(id<GRXWriteable>)writeable { + _state = GRXWriterStateStarted; + _writeable = writeable; + [self writeBufferUntilPausedOrStopped]; +} + +- (void)finishWithError:(NSError *)errorOrNil { + id<GRXWriteable> writeable = _writeable; + self.state = GRXWriterStateFinished; + [writeable writesFinishedWithError:errorOrNil]; +} + +@end diff --git a/src/objective-c/RxLibrary/GRXImmediateWriter.m b/src/objective-c/RxLibrary/GRXImmediateWriter.m index 7468af557f..0b4979872e 100644 --- a/src/objective-c/RxLibrary/GRXImmediateWriter.m +++ b/src/objective-c/RxLibrary/GRXImmediateWriter.m @@ -109,7 +109,7 @@ - (void)writeUntilPausedOrStopped { id value; while (value = [_enumerator nextObject]) { - [_writeable didReceiveValue:value]; + [_writeable writeValue:value]; // If the writeable has a reference to us, it might change our state to paused or finished. if (_state == GRXWriterStatePaused || _state == GRXWriterStateFinished) { return; @@ -130,7 +130,7 @@ _errorOrNil = nil; id<GRXWriteable> writeable = _writeable; _writeable = nil; - [writeable didFinishWithError:errorOrNil]; + [writeable writesFinishedWithError:errorOrNil]; } - (void)setState:(GRXWriterState)newState { diff --git a/src/objective-c/RxLibrary/GRXWriteable.h b/src/objective-c/RxLibrary/GRXWriteable.h index 6f6ea142e0..216de30735 100644 --- a/src/objective-c/RxLibrary/GRXWriteable.h +++ b/src/objective-c/RxLibrary/GRXWriteable.h @@ -38,14 +38,12 @@ @protocol GRXWriteable <NSObject> // Push the next value of the sequence to the receiving object. -// TODO(jcanizales): Name it enumerator:(id<GRXEnumerator>) didProduceValue:(id)? -- (void)didReceiveValue:(id)value; +- (void)writeValue:(id)value; // Signal that the sequence is completed, or that an error ocurred. After this -// message is sent to the instance, neither it nor didReceiveValue: may be +// message is sent to the instance, neither it nor writeValue: may be // called again. -// TODO(jcanizales): enumerator:(id<GRXEnumerator>) didFinishWithError:(NSError*)? -- (void)didFinishWithError:(NSError *)errorOrNil; +- (void)writesFinishedWithError:(NSError *)errorOrNil; @end typedef void (^GRXValueHandler)(id value); diff --git a/src/objective-c/RxLibrary/GRXWriteable.m b/src/objective-c/RxLibrary/GRXWriteable.m index 7000a078d1..63f7c3e7f3 100644 --- a/src/objective-c/RxLibrary/GRXWriteable.m +++ b/src/objective-c/RxLibrary/GRXWriteable.m @@ -76,13 +76,13 @@ return self; } -- (void)didReceiveValue:(id)value { +- (void)writeValue:(id)value { if (_valueHandler) { _valueHandler(value); } } -- (void)didFinishWithError:(NSError *)errorOrNil { +- (void)writesFinishedWithError:(NSError *)errorOrNil { if (_completionHandler) { _completionHandler(errorOrNil); } diff --git a/src/objective-c/RxLibrary/GRXWriter.h b/src/objective-c/RxLibrary/GRXWriter.h index 68c294f007..dcf44e3143 100644 --- a/src/objective-c/RxLibrary/GRXWriter.h +++ b/src/objective-c/RxLibrary/GRXWriter.h @@ -50,7 +50,7 @@ typedef NS_ENUM(NSInteger, GRXWriterState) { // The writer is temporarily paused, and won't send any more values to the // writeable unless its state is set back to Started. The writer might still // transition to the Finished state at any moment, and is allowed to send - // didFinishWithError: to its writeable. + // writesFinishedWithError: to its writeable. // // Not all implementations of writer have to support pausing, and thus // trying to set an writer's state to this value might have no effect. @@ -59,7 +59,7 @@ typedef NS_ENUM(NSInteger, GRXWriterState) { // The writer has released its writeable and won't interact with it anymore. // // One seldomly wants to set an writer's state to this value, as its - // writeable isn't notified with a didFinishWithError: message. Instead, sending + // writeable isn't notified with a writesFinishedWithError: message. Instead, sending // finishWithError: to the writer will make it notify the writeable and then // transition to this state. GRXWriterStateFinished @@ -105,7 +105,7 @@ typedef NS_ENUM(NSInteger, GRXWriterState) { // This method might only be called on writers in the NotStarted state. - (void)startWithWriteable:(id<GRXWriteable>)writeable; -// Send didFinishWithError:errorOrNil immediately to the writeable, and don't send +// Send writesFinishedWithError:errorOrNil immediately to the writeable, and don't send // any more messages to it. // // This method might only be called on writers in the Started or Paused diff --git a/src/objective-c/RxLibrary/GRXWriter.m b/src/objective-c/RxLibrary/GRXWriter.m index b48a44f3a7..cc14383560 100644 --- a/src/objective-c/RxLibrary/GRXWriter.m +++ b/src/objective-c/RxLibrary/GRXWriter.m @@ -62,7 +62,7 @@ - (void)finishOutputWithError:(NSError *)errorOrNil { id<GRXWriteable> writeable = _writeable; _writeable = nil; - [writeable didFinishWithError:errorOrNil]; + [writeable writesFinishedWithError:errorOrNil]; } // This is used to stop the input writer. It nillifies our reference to it @@ -75,11 +75,11 @@ #pragma mark GRXWriteable implementation -- (void)didReceiveValue:(id)value { - [_writeable didReceiveValue:value]; +- (void)writeValue:(id)value { + [_writeable writeValue:value]; } -- (void)didFinishWithError:(NSError *)errorOrNil { +- (void)writesFinishedWithError:(NSError *)errorOrNil { _writer = nil; [self finishOutputWithError:errorOrNil]; } diff --git a/src/objective-c/RxLibrary/transformations/GRXMappingWriter.m b/src/objective-c/RxLibrary/transformations/GRXMappingWriter.m index 8a41c819a6..2cdfea1b67 100644 --- a/src/objective-c/RxLibrary/transformations/GRXMappingWriter.m +++ b/src/objective-c/RxLibrary/transformations/GRXMappingWriter.m @@ -57,7 +57,7 @@ static id (^kIdentity)(id value) = ^id(id value) { } // Override -- (void)didReceiveValue:(id)value { - [super didReceiveValue:_map(value)]; +- (void)writeValue:(id)value { + [super writeValue:_map(value)]; } @end |