diff options
Diffstat (limited to 'src/objective-c')
-rw-r--r-- | src/objective-c/GRPCClient/GRPCCall.m | 41 | ||||
-rw-r--r-- | src/objective-c/RxLibrary/GRXBufferedPipe.h | 9 | ||||
-rw-r--r-- | src/objective-c/RxLibrary/GRXForwardingWriter.h | 10 | ||||
-rw-r--r-- | src/objective-c/RxLibrary/GRXForwardingWriter.m | 6 | ||||
-rw-r--r-- | src/objective-c/RxLibrary/GRXImmediateWriter.h | 13 | ||||
-rw-r--r-- | src/objective-c/RxLibrary/GRXWriter.h | 91 |
6 files changed, 96 insertions, 74 deletions
diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index 5f7d74bca8..0f4c811ce4 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -74,11 +74,20 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey"; // all. This wrapper over our actual writeable ensures thread-safety and // correct ordering. GRXConcurrentWriteable *_responseWriteable; + + // The network thread wants the requestWriter to resume (when the server is ready for more input), + // or to stop (on errors), concurrently with user threads that want to start it, pause it or stop + // it. Because a writer isn't thread-safe, we'll synchronize those operations on it. + // We don't use a dispatch queue for that purpose, because the writer can call writeValue: or + // writesFinishedWithError: on this GRPCCall as part of those operations. We want to be able to + // pause the writer immediately on writeValue:, so we need our locking to be recursive. GRXWriter *_requestWriter; // To create a retain cycle when a call is started, up until it finishes. See - // |startWithWriteable:| and |finishWithError:|. - GRPCCall *_self; + // |startWithWriteable:| and |finishWithError:|. This saves users from having to retain a + // reference to the call object if all they're interested in is the handler being executed when + // the response arrives. + GRPCCall *_retainSelf; NSMutableDictionary *_requestMetadata; NSMutableDictionary *_responseMetadata; @@ -136,11 +145,12 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey"; - (void)finishWithError:(NSError *)errorOrNil { // If the call isn't retained anywhere else, it can be deallocated now. - _self = nil; + _retainSelf = nil; // If there were still request messages coming, stop them. - _requestWriter.state = GRXWriterStateFinished; - _requestWriter = nil; + @synchronized(_requestWriter) { + _requestWriter.state = GRXWriterStateFinished; + } if (errorOrNil) { [_responseWriteable cancelWithError:errorOrNil]; @@ -240,12 +250,14 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey"; // Resume the request writer. GRPCCall *strongSelf = weakSelf; if (strongSelf) { - strongSelf->_requestWriter.state = GRXWriterStateStarted; + @synchronized(strongSelf->_requestWriter) { + strongSelf->_requestWriter.state = GRXWriterStateStarted; + } } }; - [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMessage alloc] - initWithMessage:message - handler:resumingHandler]] errorHandler:errorHandler]; + [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMessage alloc] initWithMessage:message + handler:resumingHandler]] + errorHandler:errorHandler]; } - (void)writeValue:(id)value { @@ -253,7 +265,9 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey"; // Pause the input and only resume it when the C layer notifies us that writes // can proceed. - _requestWriter.state = GRXWriterStatePaused; + @synchronized(_requestWriter) { + _requestWriter.state = GRXWriterStatePaused; + } __weak GRPCCall *weakSelf = self; dispatch_async(_callQueue, ^{ @@ -273,7 +287,6 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey"; } - (void)writesFinishedWithError:(NSError *)errorOrNil { - _requestWriter = nil; if (errorOrNil) { [self cancel]; } else { @@ -327,7 +340,9 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey"; } }]; // Now that the RPC has been initiated, request writes can start. - [_requestWriter startWithWriteable:self]; + @synchronized(_requestWriter) { + [_requestWriter startWithWriteable:self]; + } } #pragma mark GRXWriter implementation @@ -338,7 +353,7 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey"; // before being autoreleased). // Care is taken not to retain self strongly in any of the blocks used in this implementation, so // that the life of the instance is determined by this retain cycle. - _self = self; + _retainSelf = self; _responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable]; [self sendHeaders:_requestMetadata]; diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.h b/src/objective-c/RxLibrary/GRXBufferedPipe.h index b6296e1ed7..ca94ce275f 100644 --- a/src/objective-c/RxLibrary/GRXBufferedPipe.h +++ b/src/objective-c/RxLibrary/GRXBufferedPipe.h @@ -36,13 +36,11 @@ #import "GRXWriteable.h" #import "GRXWriter.h" -// A buffered pipe is a Writeable that also acts as a Writer (to whichever other writeable is passed -// to -startWithWriteable:). +// A buffered pipe is a Writer that also acts as a Writeable. // Once it is started, whatever values are written into it (via -writeValue:) will be propagated // immediately, unless flow control prevents it. // If it is throttled and keeps receiving values, as well as if it receives values before being -// started, it will buffer them and propagate them in order as soon as its state becomes -// GRXWriterStateStarted. +// started, it will buffer them and propagate them in order as soon as its state becomes Started. // If it receives an error (via -writesFinishedWithError:), it will drop any buffered values and // propagate the error immediately. // @@ -51,6 +49,9 @@ // pipe will keep buffering all data written to it, your application could run out of memory and // crash. If you want to react to flow control signals to prevent that, instead of using this class // you can implement an object that conforms to GRXWriter. +// +// Thread-safety: +// The methods of an object of this class should not be called concurrently from different threads. @interface GRXBufferedPipe : GRXWriter<GRXWriteable> // Convenience constructor. diff --git a/src/objective-c/RxLibrary/GRXForwardingWriter.h b/src/objective-c/RxLibrary/GRXForwardingWriter.h index d004333d2b..f310832284 100644 --- a/src/objective-c/RxLibrary/GRXForwardingWriter.h +++ b/src/objective-c/RxLibrary/GRXForwardingWriter.h @@ -33,11 +33,17 @@ #import "GRXWriter.h" -// A "proxy" class that simply forwards values, completion, and errors from its -// input writer to its writeable. +// A "proxy" class that simply forwards values, completion, and errors from its input writer to its +// writeable. // It is useful as a superclass for pipes that act as a transformation of their // input writer, and for classes that represent objects with input and // output sequences of values, like an RPC. +// +// Thread-safety: +// All messages sent to this object need to be serialized. When it is started, the writer it wraps +// is started in the same thread. Manual state changes are propagated to the wrapped writer in the +// same thread too. Importantly, all messages the wrapped writer sends to its writeable need to be +// serialized with any message sent to this object. @interface GRXForwardingWriter : GRXWriter - (instancetype)initWithWriter:(GRXWriter *)writer NS_DESIGNATED_INITIALIZER; @end diff --git a/src/objective-c/RxLibrary/GRXForwardingWriter.m b/src/objective-c/RxLibrary/GRXForwardingWriter.m index 2342f51ab3..a72be9ace2 100644 --- a/src/objective-c/RxLibrary/GRXForwardingWriter.m +++ b/src/objective-c/RxLibrary/GRXForwardingWriter.m @@ -48,7 +48,11 @@ // Designated initializer - (instancetype)initWithWriter:(GRXWriter *)writer { if (!writer) { - [NSException raise:NSInvalidArgumentException format:@"writer can't be nil."]; + return nil; + } + if (writer.state != GRXWriterStateNotStarted) { + [NSException raise:NSInvalidArgumentException + format:@"The writer argument must not have already started."]; } if ((self = [super init])) { _writer = writer; diff --git a/src/objective-c/RxLibrary/GRXImmediateWriter.h b/src/objective-c/RxLibrary/GRXImmediateWriter.h index b171f0c760..3fcc259434 100644 --- a/src/objective-c/RxLibrary/GRXImmediateWriter.h +++ b/src/objective-c/RxLibrary/GRXImmediateWriter.h @@ -36,10 +36,17 @@ #import "GRXWriter.h" // Utility to construct GRXWriter instances from values that are immediately available when -// required. The returned writers all support pausing and early termination. +// required. // -// Unless the writeable callback pauses them or stops them early, these writers will do all their -// interactions with the writeable before the start method returns. +// Thread-safety: +// +// An object of this class shouldn't be messaged concurrently by more than one thread. It will start +// messaging the writeable before |startWithWriteable:| returns, in the same thread. That is the +// only place where the writer can be paused or stopped prematurely. +// +// If a paused writer of this class is resumed, it will start messaging the writeable, in the same +// thread, before |setState:| returns. Because the object can't be legally accessed concurrently, +// that's the only place where it can be paused again (or stopped). @interface GRXImmediateWriter : GRXWriter // Returns a writer that pulls values from the passed NSEnumerator instance and pushes them to diff --git a/src/objective-c/RxLibrary/GRXWriter.h b/src/objective-c/RxLibrary/GRXWriter.h index 5d6e1a472a..b1c994aa38 100644 --- a/src/objective-c/RxLibrary/GRXWriter.h +++ b/src/objective-c/RxLibrary/GRXWriter.h @@ -35,84 +35,73 @@ #import "GRXWriteable.h" +// States of a writer. typedef NS_ENUM(NSInteger, GRXWriterState) { - // The writer has not yet been given a writeable to which it can push its - // values. To have an writer transition to the Started state, send it a - // startWithWriteable: message. + // The writer has not yet been given a writeable to which it can push its values. To have a writer + // transition to the Started state, send it a startWithWriteable: message. // - // An writer's state cannot be manually set to this value. + // A writer's state cannot be manually set to this value. GRXWriterStateNotStarted, // The writer might push values to the writeable at any moment. GRXWriterStateStarted, - // The writer is temporarily paused, and won't send any more values to the - // writeable unless its state is set back to Started. The writer might still - // transition to the Finished state at any moment, and is allowed to send - // writesFinishedWithError: to its writeable. - // - // Not all implementations of writer have to support pausing, and thus - // trying to set an writer's state to this value might have no effect. + // The writer is temporarily paused, and won't send any more values to the writeable unless its + // state is set back to Started. The writer might still transition to the Finished state at any + // moment, and is allowed to send writesFinishedWithError: to its writeable. GRXWriterStatePaused, // The writer has released its writeable and won't interact with it anymore. // - // One seldomly wants to set an writer's state to this value, as its - // writeable isn't notified with a writesFinishedWithError: message. Instead, sending - // finishWithError: to the writer will make it notify the writeable and then - // transition to this state. + // One seldomly wants to set a writer's state to this value, as its writeable isn't notified with + // a writesFinishedWithError: message. Instead, sending finishWithError: to the writer will make + // it notify the writeable and then transition to this state. GRXWriterStateFinished }; -// An object that conforms to this protocol can produce, on demand, a sequence -// of values. The sequence may be produced asynchronously, and it may consist of -// any number of elements, including none or an infinite number. +// An GRXWriter object can produce, on demand, a sequence of values. The sequence may be produced +// asynchronously, and it may consist of any number of elements, including none or an infinite +// number. +// +// GRXWriter is the active dual of NSEnumerator. The difference between them is thus whether the +// object plays an active or passive role during usage: A user of NSEnumerator pulls values off it, +// and passes the values to a writeable. A user of GRXWriter, though, just gives it a writeable, and +// the GRXWriter instance pushes values to the writeable. This makes this protocol suitable to +// represent a sequence of future values, as well as collections with internal iteration. // -// GRXWriter is the active dual of NSEnumerator. The difference between them -// is thus whether the object plays an active or passive role during usage: A -// user of NSEnumerator pulls values off it, and passes the values to a writeable. -// A user of GRXWriter, though, just gives it a writeable, and the -// GRXWriter instance pushes values to the writeable. This makes this protocol -// suitable to represent a sequence of future values, as well as collections -// with internal iteration. +// An instance of GRXWriter can start producing values after a writeable is passed to it. It can +// also be commanded to finish the sequence immediately (with an optional error). Finally, it can be +// asked to pause, and resumed later. All GRXWriter objects support pausing and early termination. // -// An instance of GRXWriter can start producing values after a writeable is -// passed to it. It can also be commanded to finish the sequence immediately -// (with an optional error). Finally, it can be asked to pause, but the -// conforming instance is not required to oblige. +// Thread-safety: // -// Unless otherwise indicated by a conforming class, no messages should be sent -// concurrently to a GRXWriter. I.e., conforming classes aren't required to -// be thread-safe. +// State transitions take immediate effect if the object is used from a single thread. Subclasses +// might offer stronger guarantees. +// +// Unless otherwise indicated by a conforming subclass, no messages should be sent concurrently to a +// GRXWriter. I.e., conforming classes aren't required to be thread-safe. @interface GRXWriter : NSObject -// This property can be used to query the current state of the writer, which -// determines how it might currently use its writeable. Some state transitions can -// be triggered by setting this property to the corresponding value, and that's -// useful for advanced use cases like pausing an writer. For more details, -// see the documentation of the enum. +// This property can be used to query the current state of the writer, which determines how it might +// currently use its writeable. Some state transitions can be triggered by setting this property to +// the corresponding value, and that's useful for advanced use cases like pausing an writer. For +// more details, see the documentation of the enum further down. @property(nonatomic) GRXWriterState state; -// Start sending messages to the writeable. Messages may be sent before the method -// returns, or they may be sent later in the future. See GRXWriteable.h for the -// different messages a writeable can receive. +// Transition to the Started state, and start sending messages to the writeable (a reference to it +// is retained). Messages to the writeable may be sent before the method returns, or they may be +// sent later in the future. See GRXWriteable.h for the different messages a writeable can receive. // -// If this writer draws its values from an external source (e.g. from the -// filesystem or from a server), calling this method will commonly trigger side -// effects (like network connections). +// If this writer draws its values from an external source (e.g. from the filesystem or from a +// server), calling this method will commonly trigger side effects (like network connections). // // This method might only be called on writers in the NotStarted state. - (void)startWithWriteable:(id<GRXWriteable>)writeable; -// Send writesFinishedWithError:errorOrNil immediately to the writeable, and don't send -// any more messages to it. -// -// This method might only be called on writers in the Started or Paused -// state. +// Send writesFinishedWithError:errorOrNil to the writeable. Then release the reference to it and +// transition to the Finished state. // -// TODO(jcanizales): Consider adding some guarantee about the immediacy of that -// stopping. I know I've relied on it in part of the code that uses this, but -// can't remember the details in the presence of concurrency. +// This method might only be called on writers in the Started or Paused state. - (void)finishWithError:(NSError *)errorOrNil; @end |