aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/objective-c/GRPCClient/GRPCCall.m
diff options
context:
space:
mode:
Diffstat (limited to 'src/objective-c/GRPCClient/GRPCCall.m')
-rw-r--r--src/objective-c/GRPCClient/GRPCCall.m373
1 files changed, 373 insertions, 0 deletions
diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m
new file mode 100644
index 0000000000..b9248be5db
--- /dev/null
+++ b/src/objective-c/GRPCClient/GRPCCall.m
@@ -0,0 +1,373 @@
+#import "GRPCCall.h"
+
+#include <grpc.h>
+#include <support/time.h>
+
+#import "GRPCMethodName.h"
+#import "private/GRPCChannel.h"
+#import "private/GRPCCompletionQueue.h"
+#import "private/GRPCDelegateWrapper.h"
+#import "private/GRPCMethodName+HTTP2Encoding.h"
+#import "private/NSData+GRPC.h"
+#import "private/NSDictionary+GRPC.h"
+#import "private/NSError+GRPC.h"
+
+// A grpc_call_error represents a precondition failure when invoking the
+// grpc_call_* functions. If one ever happens, it's a bug in this library.
+//
+// TODO(jcanizales): Can an application shut down gracefully when a thread other
+// than the main one throws an exception?
+static void AssertNoErrorInCall(grpc_call_error error) {
+ if (error != GRPC_CALL_OK) {
+ @throw [NSException exceptionWithName:NSInternalInconsistencyException
+ reason:@"Precondition of grpc_call_* not met."
+ userInfo:nil];
+ }
+}
+
+@interface GRPCCall () <GRXWriteable>
+// Makes it readwrite.
+@property(atomic, strong) NSDictionary *responseMetadata;
+@end
+
+// The following methods of a C gRPC call object aren't reentrant, and thus
+// calls to them must be serialized:
+// - add_metadata
+// - invoke
+// - start_write
+// - writes_done
+// - start_read
+// - destroy
+// The first four are called as part of responding to client commands, but
+// start_read we want to call as soon as we're notified that the RPC was
+// successfully established (which happens concurrently in the network queue).
+// Serialization is achieved by using a private serial queue to operate the
+// call object.
+// Because add_metadata and invoke are called and return successfully before
+// any of the other methods is called, they don't need to use the queue.
+//
+// Furthermore, start_write and writes_done can only be called after the
+// WRITE_ACCEPTED event for any previous write is received. This is achieved by
+// pausing the requests writer immediately every time it writes a value, and
+// resuming it again when WRITE_ACCEPTED is received.
+//
+// Similarly, start_read can only be called after the READ event for any
+// previous read is received. This is easier to enforce, as we're writing the
+// received messages into the writeable: start_read is enqueued once upon receiving
+// the CLIENT_METADATA_READ event, and then once after receiving each READ
+// event.
+@implementation GRPCCall {
+ dispatch_queue_t _callQueue;
+
+ grpc_call *_gRPCCall;
+ dispatch_once_t _callAlreadyInvoked;
+
+ GRPCChannel *_channel;
+ GRPCCompletionQueue *_completionQueue;
+
+ // The C gRPC library has less guarantees on the ordering of events than we
+ // do. Particularly, in the face of errors, there's no ordering guarantee at
+ // all. This wrapper over our actual writeable ensures thread-safety and
+ // correct ordering.
+ GRPCDelegateWrapper *_responseWriteable;
+ id<GRXWriter> _requestWriter;
+}
+
+@synthesize state = _state;
+
+- (instancetype)init {
+ return [self initWithHost:nil method:nil requestsWriter:nil];
+}
+
+// Designated initializer
+- (instancetype)initWithHost:(NSString *)host
+ method:(GRPCMethodName *)method
+ requestsWriter:(id<GRXWriter>)requestWriter {
+ if (!host || !method) {
+ [NSException raise:NSInvalidArgumentException format:@"Neither host nor method can be nil."];
+ }
+ // TODO(jcanizales): Throw if the requestWriter was already started.
+ if ((self = [super init])) {
+ static dispatch_once_t initialization;
+ dispatch_once(&initialization, ^{
+ grpc_init();
+ });
+
+ _completionQueue = [GRPCCompletionQueue completionQueue];
+
+ _channel = [GRPCChannel channelToHost:host];
+ _gRPCCall = grpc_channel_create_call_old(_channel.unmanagedChannel,
+ method.HTTP2Path.UTF8String,
+ host.UTF8String,
+ gpr_inf_future);
+
+ // Serial queue to invoke the non-reentrant methods of the grpc_call object.
+ _callQueue = dispatch_queue_create("org.grpc.call", NULL);
+
+ _requestWriter = requestWriter;
+ }
+ return self;
+}
+
+#pragma mark Finish
+
+- (void)finishWithError:(NSError *)errorOrNil {
+ _requestWriter.state = GRXWriterStateFinished;
+ _requestWriter = nil;
+ if (errorOrNil) {
+ [_responseWriteable cancelWithError:errorOrNil];
+ } else {
+ [_responseWriteable enqueueSuccessfulCompletion];
+ }
+}
+
+- (void)cancelCall {
+ // Can be called from any thread, any number of times.
+ AssertNoErrorInCall(grpc_call_cancel(_gRPCCall));
+}
+
+- (void)cancel {
+ [self finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeCancelled
+ userInfo:nil]];
+ [self cancelCall];
+}
+
+- (void)dealloc {
+ grpc_call *gRPCCall = _gRPCCall;
+ dispatch_async(_callQueue, ^{
+ grpc_call_destroy(gRPCCall);
+ });
+}
+
+#pragma mark Read messages
+
+// Only called from the call queue.
+// The handler will be called from the network queue.
+- (void)startReadWithHandler:(GRPCEventHandler)handler {
+ AssertNoErrorInCall(grpc_call_start_read_old(_gRPCCall, (__bridge_retained void *)handler));
+}
+
+// Called initially from the network queue once response headers are received,
+// then "recursively" from the responseWriteable queue after each response from the
+// server has been written.
+// If the call is currently paused, this is a noop. Restarting the call will invoke this
+// method.
+// TODO(jcanizales): Rename to readResponseIfNotPaused.
+- (void)startNextRead {
+ if (self.state == GRXWriterStatePaused) {
+ return;
+ }
+ __weak GRPCCall *weakSelf = self;
+ __weak GRPCDelegateWrapper *weakWriteable = _responseWriteable;
+
+ dispatch_async(_callQueue, ^{
+ [weakSelf startReadWithHandler:^(grpc_event *event) {
+ if (!event->data.read) {
+ // No more responses from the server.
+ return;
+ }
+ NSData *data = [NSData grpc_dataWithByteBuffer:event->data.read];
+ if (!data) {
+ // The app doesn't have enough memory to hold the server response. We
+ // don't want to throw, because the app shouldn't crash for a behavior
+ // that's on the hands of any server to have. Instead we finish and ask
+ // the server to cancel.
+ //
+ // TODO(jcanizales): No canonical code is appropriate for this situation
+ // (because it's just a client problem). Use another domain and an
+ // appropriately-documented code.
+ [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeInternal
+ userInfo:nil]];
+ [weakSelf cancelCall];
+ return;
+ }
+ [weakWriteable enqueueMessage:data completionHandler:^{
+ [weakSelf startNextRead];
+ }];
+ }];
+ });
+}
+
+#pragma mark Send headers
+
+- (void)addHeaderWithName:(NSString *)name binaryValue:(NSData *)value {
+ grpc_metadata metadata;
+ // Safe to discard const qualifiers; we're not going to modify the contents.
+ metadata.key = (char *)name.UTF8String;
+ metadata.value = (char *)value.bytes;
+ metadata.value_length = value.length;
+ grpc_call_add_metadata_old(_gRPCCall, &metadata, 0);
+}
+
+- (void)addHeaderWithName:(NSString *)name ASCIIValue:(NSString *)value {
+ grpc_metadata metadata;
+ // Safe to discard const qualifiers; we're not going to modify the contents.
+ metadata.key = (char *)name.UTF8String;
+ metadata.value = (char *)value.UTF8String;
+ // The trailing \0 isn't encoded in HTTP2.
+ metadata.value_length = value.length;
+ grpc_call_add_metadata_old(_gRPCCall, &metadata, 0);
+}
+
+// TODO(jcanizales): Rename to commitHeaders.
+- (void)sendHeaders:(NSDictionary *)metadata {
+ for (NSString *name in metadata) {
+ id value = metadata[name];
+ if ([value isKindOfClass:[NSData class]]) {
+ [self addHeaderWithName:name binaryValue:value];
+ } else if ([value isKindOfClass:[NSString class]]) {
+ [self addHeaderWithName:name ASCIIValue:value];
+ }
+ }
+}
+
+#pragma mark GRXWriteable implementation
+
+// Only called from the call queue. The error handler will be called from the
+// network queue if the write didn't succeed.
+- (void)writeMessage:(NSData *)message withErrorHandler:(void (^)())errorHandler {
+
+ __weak GRPCCall *weakSelf = self;
+ GRPCEventHandler resumingHandler = ^(grpc_event *event) {
+ if (event->data.write_accepted != GRPC_OP_OK) {
+ errorHandler();
+ }
+ // Resume the request writer (even in the case of error).
+ // TODO(jcanizales): No need to do it in the case of errors anymore?
+ GRPCCall *strongSelf = weakSelf;
+ if (strongSelf) {
+ strongSelf->_requestWriter.state = GRXWriterStateStarted;
+ }
+ };
+
+ grpc_byte_buffer *buffer = message.grpc_byteBuffer;
+ AssertNoErrorInCall(grpc_call_start_write_old(_gRPCCall,
+ buffer,
+ (__bridge_retained void *)resumingHandler,
+ 0));
+ grpc_byte_buffer_destroy(buffer);
+}
+
+- (void)didReceiveValue:(id)value {
+ // TODO(jcanizales): Throw/assert if value isn't NSData.
+
+ // Pause the input and only resume it when the C layer notifies us that writes
+ // can proceed.
+ _requestWriter.state = GRXWriterStatePaused;
+
+ __weak GRPCCall *weakSelf = self;
+ dispatch_async(_callQueue, ^{
+ [weakSelf writeMessage:value withErrorHandler:^{
+ [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeInternal
+ userInfo:nil]];
+ }];
+ });
+}
+
+// Only called from the call queue. The error handler will be called from the
+// network queue if the requests stream couldn't be closed successfully.
+- (void)finishRequestWithErrorHandler:(void (^)())errorHandler {
+ GRPCEventHandler handler = ^(grpc_event *event) {
+ if (event->data.finish_accepted != GRPC_OP_OK) {
+ errorHandler();
+ }
+ };
+ AssertNoErrorInCall(grpc_call_writes_done_old(_gRPCCall, (__bridge_retained void *)handler));
+}
+
+- (void)didFinishWithError:(NSError *)errorOrNil {
+ if (errorOrNil) {
+ [self cancel];
+ } else {
+ __weak GRPCCall *weakSelf = self;
+ dispatch_async(_callQueue, ^{
+ [weakSelf finishRequestWithErrorHandler:^{
+ [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeInternal
+ userInfo:nil]];
+ }];
+ });
+ }
+}
+
+#pragma mark Invoke
+
+// Both handlers will eventually be called, from the network queue. Writes can start immediately
+// after this.
+// The first one (metadataHandler), when the response headers are received.
+// The second one (completionHandler), whenever the RPC finishes for any reason.
+- (void)invokeCallWithMetadataHandler:(GRPCEventHandler)metadataHandler
+ completionHandler:(GRPCEventHandler)completionHandler {
+ AssertNoErrorInCall(grpc_call_invoke_old(_gRPCCall,
+ _completionQueue.unmanagedQueue,
+ (__bridge_retained void *)metadataHandler,
+ (__bridge_retained void *)completionHandler,
+ 0));
+}
+
+- (void)invokeCall {
+ __weak GRPCCall *weakSelf = self;
+ [self invokeCallWithMetadataHandler:^(grpc_event *event) {
+ // Response metadata received.
+ // TODO(jcanizales): Name the type of event->data.client_metadata_read
+ // in the C library so one can actually pass the object to a method.
+ grpc_metadata *entries = event->data.client_metadata_read.elements;
+ size_t count = event->data.client_metadata_read.count;
+ GRPCCall *strongSelf = weakSelf;
+ if (strongSelf) {
+ strongSelf.responseMetadata = [NSDictionary grpc_dictionaryFromMetadata:entries
+ count:count];
+ [strongSelf startNextRead];
+ }
+ } completionHandler:^(grpc_event *event) {
+ // TODO(jcanizales): Merge HTTP2 trailers into response metadata.
+ [weakSelf finishWithError:[NSError grpc_errorFromStatus:&event->data.finished]];
+ }];
+ // Now that the RPC has been initiated, request writes can start.
+ [_requestWriter startWithWriteable:self];
+}
+
+#pragma mark GRXWriter implementation
+
+- (void)startWithWriteable:(id<GRXWriteable>)writeable {
+ // The following produces a retain cycle self:_responseWriteable:self, which is only
+ // broken when didFinishWithError: is sent to the wrapped writeable.
+ // Care is taken not to retain self strongly in any of the blocks used in
+ // the implementation of GRPCCall, so that the life of the instance is
+ // determined by this retain cycle.
+ _responseWriteable = [[GRPCDelegateWrapper alloc] initWithWriteable:writeable writer:self];
+ [self sendHeaders:_requestMetadata];
+ [self invokeCall];
+}
+
+- (void)setState:(GRXWriterState)newState {
+ // Manual transitions are only allowed from the started or paused states.
+ if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
+ return;
+ }
+
+ switch (newState) {
+ case GRXWriterStateFinished:
+ _state = newState;
+ // Per GRXWriter's contract, setting the state to Finished manually
+ // means one doesn't wish the writeable to be messaged anymore.
+ [_responseWriteable cancelSilently];
+ _responseWriteable = nil;
+ return;
+ case GRXWriterStatePaused:
+ _state = newState;
+ return;
+ case GRXWriterStateStarted:
+ if (_state == GRXWriterStatePaused) {
+ _state = newState;
+ [self startNextRead];
+ }
+ return;
+ case GRXWriterStateNotStarted:
+ return;
+ }
+}
+@end