aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/objective-c
diff options
context:
space:
mode:
Diffstat (limited to 'src/objective-c')
-rw-r--r--src/objective-c/GRPCClient/GRPCCall.m41
-rw-r--r--src/objective-c/RxLibrary/GRXBufferedPipe.h9
-rw-r--r--src/objective-c/RxLibrary/GRXForwardingWriter.h10
-rw-r--r--src/objective-c/RxLibrary/GRXForwardingWriter.m6
-rw-r--r--src/objective-c/RxLibrary/GRXImmediateWriter.h13
-rw-r--r--src/objective-c/RxLibrary/GRXWriter.h91
6 files changed, 96 insertions, 74 deletions
diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m
index 5f7d74bca8..0f4c811ce4 100644
--- a/src/objective-c/GRPCClient/GRPCCall.m
+++ b/src/objective-c/GRPCClient/GRPCCall.m
@@ -74,11 +74,20 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
// all. This wrapper over our actual writeable ensures thread-safety and
// correct ordering.
GRXConcurrentWriteable *_responseWriteable;
+
+ // The network thread wants the requestWriter to resume (when the server is ready for more input),
+ // or to stop (on errors), concurrently with user threads that want to start it, pause it or stop
+ // it. Because a writer isn't thread-safe, we'll synchronize those operations on it.
+ // We don't use a dispatch queue for that purpose, because the writer can call writeValue: or
+ // writesFinishedWithError: on this GRPCCall as part of those operations. We want to be able to
+ // pause the writer immediately on writeValue:, so we need our locking to be recursive.
GRXWriter *_requestWriter;
// To create a retain cycle when a call is started, up until it finishes. See
- // |startWithWriteable:| and |finishWithError:|.
- GRPCCall *_self;
+ // |startWithWriteable:| and |finishWithError:|. This saves users from having to retain a
+ // reference to the call object if all they're interested in is the handler being executed when
+ // the response arrives.
+ GRPCCall *_retainSelf;
NSMutableDictionary *_requestMetadata;
NSMutableDictionary *_responseMetadata;
@@ -136,11 +145,12 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
- (void)finishWithError:(NSError *)errorOrNil {
// If the call isn't retained anywhere else, it can be deallocated now.
- _self = nil;
+ _retainSelf = nil;
// If there were still request messages coming, stop them.
- _requestWriter.state = GRXWriterStateFinished;
- _requestWriter = nil;
+ @synchronized(_requestWriter) {
+ _requestWriter.state = GRXWriterStateFinished;
+ }
if (errorOrNil) {
[_responseWriteable cancelWithError:errorOrNil];
@@ -240,12 +250,14 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
// Resume the request writer.
GRPCCall *strongSelf = weakSelf;
if (strongSelf) {
- strongSelf->_requestWriter.state = GRXWriterStateStarted;
+ @synchronized(strongSelf->_requestWriter) {
+ strongSelf->_requestWriter.state = GRXWriterStateStarted;
+ }
}
};
- [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMessage alloc]
- initWithMessage:message
- handler:resumingHandler]] errorHandler:errorHandler];
+ [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMessage alloc] initWithMessage:message
+ handler:resumingHandler]]
+ errorHandler:errorHandler];
}
- (void)writeValue:(id)value {
@@ -253,7 +265,9 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
// Pause the input and only resume it when the C layer notifies us that writes
// can proceed.
- _requestWriter.state = GRXWriterStatePaused;
+ @synchronized(_requestWriter) {
+ _requestWriter.state = GRXWriterStatePaused;
+ }
__weak GRPCCall *weakSelf = self;
dispatch_async(_callQueue, ^{
@@ -273,7 +287,6 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
}
- (void)writesFinishedWithError:(NSError *)errorOrNil {
- _requestWriter = nil;
if (errorOrNil) {
[self cancel];
} else {
@@ -327,7 +340,9 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
}
}];
// Now that the RPC has been initiated, request writes can start.
- [_requestWriter startWithWriteable:self];
+ @synchronized(_requestWriter) {
+ [_requestWriter startWithWriteable:self];
+ }
}
#pragma mark GRXWriter implementation
@@ -338,7 +353,7 @@ NSString * const kGRPCStatusMetadataKey = @"io.grpc.StatusMetadataKey";
// before being autoreleased).
// Care is taken not to retain self strongly in any of the blocks used in this implementation, so
// that the life of the instance is determined by this retain cycle.
- _self = self;
+ _retainSelf = self;
_responseWriteable = [[GRXConcurrentWriteable alloc] initWithWriteable:writeable];
[self sendHeaders:_requestMetadata];
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.h b/src/objective-c/RxLibrary/GRXBufferedPipe.h
index b6296e1ed7..ca94ce275f 100644
--- a/src/objective-c/RxLibrary/GRXBufferedPipe.h
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.h
@@ -36,13 +36,11 @@
#import "GRXWriteable.h"
#import "GRXWriter.h"
-// A buffered pipe is a Writeable that also acts as a Writer (to whichever other writeable is passed
-// to -startWithWriteable:).
+// A buffered pipe is a Writer that also acts as a Writeable.
// Once it is started, whatever values are written into it (via -writeValue:) will be propagated
// immediately, unless flow control prevents it.
// If it is throttled and keeps receiving values, as well as if it receives values before being
-// started, it will buffer them and propagate them in order as soon as its state becomes
-// GRXWriterStateStarted.
+// started, it will buffer them and propagate them in order as soon as its state becomes Started.
// If it receives an error (via -writesFinishedWithError:), it will drop any buffered values and
// propagate the error immediately.
//
@@ -51,6 +49,9 @@
// pipe will keep buffering all data written to it, your application could run out of memory and
// crash. If you want to react to flow control signals to prevent that, instead of using this class
// you can implement an object that conforms to GRXWriter.
+//
+// Thread-safety:
+// The methods of an object of this class should not be called concurrently from different threads.
@interface GRXBufferedPipe : GRXWriter<GRXWriteable>
// Convenience constructor.
diff --git a/src/objective-c/RxLibrary/GRXForwardingWriter.h b/src/objective-c/RxLibrary/GRXForwardingWriter.h
index d004333d2b..f310832284 100644
--- a/src/objective-c/RxLibrary/GRXForwardingWriter.h
+++ b/src/objective-c/RxLibrary/GRXForwardingWriter.h
@@ -33,11 +33,17 @@
#import "GRXWriter.h"
-// A "proxy" class that simply forwards values, completion, and errors from its
-// input writer to its writeable.
+// A "proxy" class that simply forwards values, completion, and errors from its input writer to its
+// writeable.
// It is useful as a superclass for pipes that act as a transformation of their
// input writer, and for classes that represent objects with input and
// output sequences of values, like an RPC.
+//
+// Thread-safety:
+// All messages sent to this object need to be serialized. When it is started, the writer it wraps
+// is started in the same thread. Manual state changes are propagated to the wrapped writer in the
+// same thread too. Importantly, all messages the wrapped writer sends to its writeable need to be
+// serialized with any message sent to this object.
@interface GRXForwardingWriter : GRXWriter
- (instancetype)initWithWriter:(GRXWriter *)writer NS_DESIGNATED_INITIALIZER;
@end
diff --git a/src/objective-c/RxLibrary/GRXForwardingWriter.m b/src/objective-c/RxLibrary/GRXForwardingWriter.m
index 2342f51ab3..a72be9ace2 100644
--- a/src/objective-c/RxLibrary/GRXForwardingWriter.m
+++ b/src/objective-c/RxLibrary/GRXForwardingWriter.m
@@ -48,7 +48,11 @@
// Designated initializer
- (instancetype)initWithWriter:(GRXWriter *)writer {
if (!writer) {
- [NSException raise:NSInvalidArgumentException format:@"writer can't be nil."];
+ return nil;
+ }
+ if (writer.state != GRXWriterStateNotStarted) {
+ [NSException raise:NSInvalidArgumentException
+ format:@"The writer argument must not have already started."];
}
if ((self = [super init])) {
_writer = writer;
diff --git a/src/objective-c/RxLibrary/GRXImmediateWriter.h b/src/objective-c/RxLibrary/GRXImmediateWriter.h
index b171f0c760..3fcc259434 100644
--- a/src/objective-c/RxLibrary/GRXImmediateWriter.h
+++ b/src/objective-c/RxLibrary/GRXImmediateWriter.h
@@ -36,10 +36,17 @@
#import "GRXWriter.h"
// Utility to construct GRXWriter instances from values that are immediately available when
-// required. The returned writers all support pausing and early termination.
+// required.
//
-// Unless the writeable callback pauses them or stops them early, these writers will do all their
-// interactions with the writeable before the start method returns.
+// Thread-safety:
+//
+// An object of this class shouldn't be messaged concurrently by more than one thread. It will start
+// messaging the writeable before |startWithWriteable:| returns, in the same thread. That is the
+// only place where the writer can be paused or stopped prematurely.
+//
+// If a paused writer of this class is resumed, it will start messaging the writeable, in the same
+// thread, before |setState:| returns. Because the object can't be legally accessed concurrently,
+// that's the only place where it can be paused again (or stopped).
@interface GRXImmediateWriter : GRXWriter
// Returns a writer that pulls values from the passed NSEnumerator instance and pushes them to
diff --git a/src/objective-c/RxLibrary/GRXWriter.h b/src/objective-c/RxLibrary/GRXWriter.h
index 5d6e1a472a..b1c994aa38 100644
--- a/src/objective-c/RxLibrary/GRXWriter.h
+++ b/src/objective-c/RxLibrary/GRXWriter.h
@@ -35,84 +35,73 @@
#import "GRXWriteable.h"
+// States of a writer.
typedef NS_ENUM(NSInteger, GRXWriterState) {
- // The writer has not yet been given a writeable to which it can push its
- // values. To have an writer transition to the Started state, send it a
- // startWithWriteable: message.
+ // The writer has not yet been given a writeable to which it can push its values. To have a writer
+ // transition to the Started state, send it a startWithWriteable: message.
//
- // An writer's state cannot be manually set to this value.
+ // A writer's state cannot be manually set to this value.
GRXWriterStateNotStarted,
// The writer might push values to the writeable at any moment.
GRXWriterStateStarted,
- // The writer is temporarily paused, and won't send any more values to the
- // writeable unless its state is set back to Started. The writer might still
- // transition to the Finished state at any moment, and is allowed to send
- // writesFinishedWithError: to its writeable.
- //
- // Not all implementations of writer have to support pausing, and thus
- // trying to set an writer's state to this value might have no effect.
+ // The writer is temporarily paused, and won't send any more values to the writeable unless its
+ // state is set back to Started. The writer might still transition to the Finished state at any
+ // moment, and is allowed to send writesFinishedWithError: to its writeable.
GRXWriterStatePaused,
// The writer has released its writeable and won't interact with it anymore.
//
- // One seldomly wants to set an writer's state to this value, as its
- // writeable isn't notified with a writesFinishedWithError: message. Instead, sending
- // finishWithError: to the writer will make it notify the writeable and then
- // transition to this state.
+ // One seldomly wants to set a writer's state to this value, as its writeable isn't notified with
+ // a writesFinishedWithError: message. Instead, sending finishWithError: to the writer will make
+ // it notify the writeable and then transition to this state.
GRXWriterStateFinished
};
-// An object that conforms to this protocol can produce, on demand, a sequence
-// of values. The sequence may be produced asynchronously, and it may consist of
-// any number of elements, including none or an infinite number.
+// An GRXWriter object can produce, on demand, a sequence of values. The sequence may be produced
+// asynchronously, and it may consist of any number of elements, including none or an infinite
+// number.
+//
+// GRXWriter is the active dual of NSEnumerator. The difference between them is thus whether the
+// object plays an active or passive role during usage: A user of NSEnumerator pulls values off it,
+// and passes the values to a writeable. A user of GRXWriter, though, just gives it a writeable, and
+// the GRXWriter instance pushes values to the writeable. This makes this protocol suitable to
+// represent a sequence of future values, as well as collections with internal iteration.
//
-// GRXWriter is the active dual of NSEnumerator. The difference between them
-// is thus whether the object plays an active or passive role during usage: A
-// user of NSEnumerator pulls values off it, and passes the values to a writeable.
-// A user of GRXWriter, though, just gives it a writeable, and the
-// GRXWriter instance pushes values to the writeable. This makes this protocol
-// suitable to represent a sequence of future values, as well as collections
-// with internal iteration.
+// An instance of GRXWriter can start producing values after a writeable is passed to it. It can
+// also be commanded to finish the sequence immediately (with an optional error). Finally, it can be
+// asked to pause, and resumed later. All GRXWriter objects support pausing and early termination.
//
-// An instance of GRXWriter can start producing values after a writeable is
-// passed to it. It can also be commanded to finish the sequence immediately
-// (with an optional error). Finally, it can be asked to pause, but the
-// conforming instance is not required to oblige.
+// Thread-safety:
//
-// Unless otherwise indicated by a conforming class, no messages should be sent
-// concurrently to a GRXWriter. I.e., conforming classes aren't required to
-// be thread-safe.
+// State transitions take immediate effect if the object is used from a single thread. Subclasses
+// might offer stronger guarantees.
+//
+// Unless otherwise indicated by a conforming subclass, no messages should be sent concurrently to a
+// GRXWriter. I.e., conforming classes aren't required to be thread-safe.
@interface GRXWriter : NSObject
-// This property can be used to query the current state of the writer, which
-// determines how it might currently use its writeable. Some state transitions can
-// be triggered by setting this property to the corresponding value, and that's
-// useful for advanced use cases like pausing an writer. For more details,
-// see the documentation of the enum.
+// This property can be used to query the current state of the writer, which determines how it might
+// currently use its writeable. Some state transitions can be triggered by setting this property to
+// the corresponding value, and that's useful for advanced use cases like pausing an writer. For
+// more details, see the documentation of the enum further down.
@property(nonatomic) GRXWriterState state;
-// Start sending messages to the writeable. Messages may be sent before the method
-// returns, or they may be sent later in the future. See GRXWriteable.h for the
-// different messages a writeable can receive.
+// Transition to the Started state, and start sending messages to the writeable (a reference to it
+// is retained). Messages to the writeable may be sent before the method returns, or they may be
+// sent later in the future. See GRXWriteable.h for the different messages a writeable can receive.
//
-// If this writer draws its values from an external source (e.g. from the
-// filesystem or from a server), calling this method will commonly trigger side
-// effects (like network connections).
+// If this writer draws its values from an external source (e.g. from the filesystem or from a
+// server), calling this method will commonly trigger side effects (like network connections).
//
// This method might only be called on writers in the NotStarted state.
- (void)startWithWriteable:(id<GRXWriteable>)writeable;
-// Send writesFinishedWithError:errorOrNil immediately to the writeable, and don't send
-// any more messages to it.
-//
-// This method might only be called on writers in the Started or Paused
-// state.
+// Send writesFinishedWithError:errorOrNil to the writeable. Then release the reference to it and
+// transition to the Finished state.
//
-// TODO(jcanizales): Consider adding some guarantee about the immediacy of that
-// stopping. I know I've relied on it in part of the code that uses this, but
-// can't remember the details in the presence of concurrency.
+// This method might only be called on writers in the Started or Paused state.
- (void)finishWithError:(NSError *)errorOrNil;
@end