diff options
author | Jorge Canizales <jcanizales@google.com> | 2015-02-17 18:23:58 -0800 |
---|---|---|
committer | Jorge Canizales <jcanizales@google.com> | 2015-02-17 18:23:58 -0800 |
commit | 5e0efd95f785bc3a82fa2b7b67b2442625653efa (patch) | |
tree | 479cf5b81282a4f7df98664fd85ddf1b4139d5e6 /src/objective-c/GRPCClient | |
parent | 30697c9be2ff01e9f33e0934b58877fc3d11f516 (diff) |
Imports code of the generic gRPC client library.
Diffstat (limited to 'src/objective-c/GRPCClient')
19 files changed, 909 insertions, 0 deletions
diff --git a/src/objective-c/GRPCClient/GRPCCall.h b/src/objective-c/GRPCClient/GRPCCall.h new file mode 100644 index 0000000000..db138fd1ee --- /dev/null +++ b/src/objective-c/GRPCClient/GRPCCall.h @@ -0,0 +1,57 @@ +#import <Foundation/Foundation.h> +#import <RxLibrary/GRXWriter.h> + +@class GRPCMethodName; + +@class GRPCCall; + +// The gRPC protocol is an RPC protocol on top of HTTP2. +// +// While the most common type of RPC receives only one request message and +// returns only one response message, the protocol also supports RPCs that +// return multiple individual messages in a streaming fashion, RPCs that +// accept a stream of request messages, or RPCs with both streaming requests +// and responses. +// +// Conceptually, each gRPC call consists of a bidirectional stream of binary +// messages, with RPCs of the "non-streaming type" sending only one message in +// the corresponding direction (the protocol doesn't make any distinction). +// +// Each RPC uses a different HTTP2 stream, and thus multiple simultaneous RPCs +// can be multiplexed transparently on the same TCP connection. +@interface GRPCCall : NSObject<GRXWriter> + +// These HTTP2 headers will be passed to the server as part of this call. Each +// HTTP2 header is a name-value pair with string names and either string or binary values. +// The passed dictionary has to use NSString keys, corresponding to the header names. The +// value associated to each can be a NSString object or a NSData object. E.g.: +// +// call.requestMetadata = @{ +// @"Authorization": @"Bearer ...", +// @"SomeBinaryHeader": someData +// }; +// +// After the call is started, modifying this won't have any effect. +@property(nonatomic, readwrite) NSMutableDictionary *requestMetadata; + +// This isn't populated until the first event is delivered to the handler. +@property(atomic, readonly) NSDictionary *responseMetadata; + +// The request writer has to write NSData objects into the provided Writeable. The server will +// receive each of those separately and in order. +// A gRPC call might not complete until the request writer finishes. On the other hand, the +// request finishing doesn't necessarily make the call to finish, as the server might continue +// sending messages to the response side of the call indefinitely (depending on the semantics of +// the specific remote method called). +// To finish a call right away, invoke cancel. +- (instancetype)initWithHost:(NSString *)host + method:(GRPCMethodName *)method + requestsWriter:(id<GRXWriter>)requestsWriter NS_DESIGNATED_INITIALIZER; + +// Finishes the request side of this call, notifies the server that the RPC +// should be cancelled, and finishes the response side of the call with an error +// of code CANCELED. +- (void)cancel; + +// TODO(jcanizales): Let specify a deadline. As a category of GRXWriter? +@end diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m new file mode 100644 index 0000000000..b9248be5db --- /dev/null +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -0,0 +1,373 @@ +#import "GRPCCall.h" + +#include <grpc.h> +#include <support/time.h> + +#import "GRPCMethodName.h" +#import "private/GRPCChannel.h" +#import "private/GRPCCompletionQueue.h" +#import "private/GRPCDelegateWrapper.h" +#import "private/GRPCMethodName+HTTP2Encoding.h" +#import "private/NSData+GRPC.h" +#import "private/NSDictionary+GRPC.h" +#import "private/NSError+GRPC.h" + +// A grpc_call_error represents a precondition failure when invoking the +// grpc_call_* functions. If one ever happens, it's a bug in this library. +// +// TODO(jcanizales): Can an application shut down gracefully when a thread other +// than the main one throws an exception? +static void AssertNoErrorInCall(grpc_call_error error) { + if (error != GRPC_CALL_OK) { + @throw [NSException exceptionWithName:NSInternalInconsistencyException + reason:@"Precondition of grpc_call_* not met." + userInfo:nil]; + } +} + +@interface GRPCCall () <GRXWriteable> +// Makes it readwrite. +@property(atomic, strong) NSDictionary *responseMetadata; +@end + +// The following methods of a C gRPC call object aren't reentrant, and thus +// calls to them must be serialized: +// - add_metadata +// - invoke +// - start_write +// - writes_done +// - start_read +// - destroy +// The first four are called as part of responding to client commands, but +// start_read we want to call as soon as we're notified that the RPC was +// successfully established (which happens concurrently in the network queue). +// Serialization is achieved by using a private serial queue to operate the +// call object. +// Because add_metadata and invoke are called and return successfully before +// any of the other methods is called, they don't need to use the queue. +// +// Furthermore, start_write and writes_done can only be called after the +// WRITE_ACCEPTED event for any previous write is received. This is achieved by +// pausing the requests writer immediately every time it writes a value, and +// resuming it again when WRITE_ACCEPTED is received. +// +// Similarly, start_read can only be called after the READ event for any +// previous read is received. This is easier to enforce, as we're writing the +// received messages into the writeable: start_read is enqueued once upon receiving +// the CLIENT_METADATA_READ event, and then once after receiving each READ +// event. +@implementation GRPCCall { + dispatch_queue_t _callQueue; + + grpc_call *_gRPCCall; + dispatch_once_t _callAlreadyInvoked; + + GRPCChannel *_channel; + GRPCCompletionQueue *_completionQueue; + + // The C gRPC library has less guarantees on the ordering of events than we + // do. Particularly, in the face of errors, there's no ordering guarantee at + // all. This wrapper over our actual writeable ensures thread-safety and + // correct ordering. + GRPCDelegateWrapper *_responseWriteable; + id<GRXWriter> _requestWriter; +} + +@synthesize state = _state; + +- (instancetype)init { + return [self initWithHost:nil method:nil requestsWriter:nil]; +} + +// Designated initializer +- (instancetype)initWithHost:(NSString *)host + method:(GRPCMethodName *)method + requestsWriter:(id<GRXWriter>)requestWriter { + if (!host || !method) { + [NSException raise:NSInvalidArgumentException format:@"Neither host nor method can be nil."]; + } + // TODO(jcanizales): Throw if the requestWriter was already started. + if ((self = [super init])) { + static dispatch_once_t initialization; + dispatch_once(&initialization, ^{ + grpc_init(); + }); + + _completionQueue = [GRPCCompletionQueue completionQueue]; + + _channel = [GRPCChannel channelToHost:host]; + _gRPCCall = grpc_channel_create_call_old(_channel.unmanagedChannel, + method.HTTP2Path.UTF8String, + host.UTF8String, + gpr_inf_future); + + // Serial queue to invoke the non-reentrant methods of the grpc_call object. + _callQueue = dispatch_queue_create("org.grpc.call", NULL); + + _requestWriter = requestWriter; + } + return self; +} + +#pragma mark Finish + +- (void)finishWithError:(NSError *)errorOrNil { + _requestWriter.state = GRXWriterStateFinished; + _requestWriter = nil; + if (errorOrNil) { + [_responseWriteable cancelWithError:errorOrNil]; + } else { + [_responseWriteable enqueueSuccessfulCompletion]; + } +} + +- (void)cancelCall { + // Can be called from any thread, any number of times. + AssertNoErrorInCall(grpc_call_cancel(_gRPCCall)); +} + +- (void)cancel { + [self finishWithError:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeCancelled + userInfo:nil]]; + [self cancelCall]; +} + +- (void)dealloc { + grpc_call *gRPCCall = _gRPCCall; + dispatch_async(_callQueue, ^{ + grpc_call_destroy(gRPCCall); + }); +} + +#pragma mark Read messages + +// Only called from the call queue. +// The handler will be called from the network queue. +- (void)startReadWithHandler:(GRPCEventHandler)handler { + AssertNoErrorInCall(grpc_call_start_read_old(_gRPCCall, (__bridge_retained void *)handler)); +} + +// Called initially from the network queue once response headers are received, +// then "recursively" from the responseWriteable queue after each response from the +// server has been written. +// If the call is currently paused, this is a noop. Restarting the call will invoke this +// method. +// TODO(jcanizales): Rename to readResponseIfNotPaused. +- (void)startNextRead { + if (self.state == GRXWriterStatePaused) { + return; + } + __weak GRPCCall *weakSelf = self; + __weak GRPCDelegateWrapper *weakWriteable = _responseWriteable; + + dispatch_async(_callQueue, ^{ + [weakSelf startReadWithHandler:^(grpc_event *event) { + if (!event->data.read) { + // No more responses from the server. + return; + } + NSData *data = [NSData grpc_dataWithByteBuffer:event->data.read]; + if (!data) { + // The app doesn't have enough memory to hold the server response. We + // don't want to throw, because the app shouldn't crash for a behavior + // that's on the hands of any server to have. Instead we finish and ask + // the server to cancel. + // + // TODO(jcanizales): No canonical code is appropriate for this situation + // (because it's just a client problem). Use another domain and an + // appropriately-documented code. + [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeInternal + userInfo:nil]]; + [weakSelf cancelCall]; + return; + } + [weakWriteable enqueueMessage:data completionHandler:^{ + [weakSelf startNextRead]; + }]; + }]; + }); +} + +#pragma mark Send headers + +- (void)addHeaderWithName:(NSString *)name binaryValue:(NSData *)value { + grpc_metadata metadata; + // Safe to discard const qualifiers; we're not going to modify the contents. + metadata.key = (char *)name.UTF8String; + metadata.value = (char *)value.bytes; + metadata.value_length = value.length; + grpc_call_add_metadata_old(_gRPCCall, &metadata, 0); +} + +- (void)addHeaderWithName:(NSString *)name ASCIIValue:(NSString *)value { + grpc_metadata metadata; + // Safe to discard const qualifiers; we're not going to modify the contents. + metadata.key = (char *)name.UTF8String; + metadata.value = (char *)value.UTF8String; + // The trailing \0 isn't encoded in HTTP2. + metadata.value_length = value.length; + grpc_call_add_metadata_old(_gRPCCall, &metadata, 0); +} + +// TODO(jcanizales): Rename to commitHeaders. +- (void)sendHeaders:(NSDictionary *)metadata { + for (NSString *name in metadata) { + id value = metadata[name]; + if ([value isKindOfClass:[NSData class]]) { + [self addHeaderWithName:name binaryValue:value]; + } else if ([value isKindOfClass:[NSString class]]) { + [self addHeaderWithName:name ASCIIValue:value]; + } + } +} + +#pragma mark GRXWriteable implementation + +// Only called from the call queue. The error handler will be called from the +// network queue if the write didn't succeed. +- (void)writeMessage:(NSData *)message withErrorHandler:(void (^)())errorHandler { + + __weak GRPCCall *weakSelf = self; + GRPCEventHandler resumingHandler = ^(grpc_event *event) { + if (event->data.write_accepted != GRPC_OP_OK) { + errorHandler(); + } + // Resume the request writer (even in the case of error). + // TODO(jcanizales): No need to do it in the case of errors anymore? + GRPCCall *strongSelf = weakSelf; + if (strongSelf) { + strongSelf->_requestWriter.state = GRXWriterStateStarted; + } + }; + + grpc_byte_buffer *buffer = message.grpc_byteBuffer; + AssertNoErrorInCall(grpc_call_start_write_old(_gRPCCall, + buffer, + (__bridge_retained void *)resumingHandler, + 0)); + grpc_byte_buffer_destroy(buffer); +} + +- (void)didReceiveValue:(id)value { + // TODO(jcanizales): Throw/assert if value isn't NSData. + + // Pause the input and only resume it when the C layer notifies us that writes + // can proceed. + _requestWriter.state = GRXWriterStatePaused; + + __weak GRPCCall *weakSelf = self; + dispatch_async(_callQueue, ^{ + [weakSelf writeMessage:value withErrorHandler:^{ + [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeInternal + userInfo:nil]]; + }]; + }); +} + +// Only called from the call queue. The error handler will be called from the +// network queue if the requests stream couldn't be closed successfully. +- (void)finishRequestWithErrorHandler:(void (^)())errorHandler { + GRPCEventHandler handler = ^(grpc_event *event) { + if (event->data.finish_accepted != GRPC_OP_OK) { + errorHandler(); + } + }; + AssertNoErrorInCall(grpc_call_writes_done_old(_gRPCCall, (__bridge_retained void *)handler)); +} + +- (void)didFinishWithError:(NSError *)errorOrNil { + if (errorOrNil) { + [self cancel]; + } else { + __weak GRPCCall *weakSelf = self; + dispatch_async(_callQueue, ^{ + [weakSelf finishRequestWithErrorHandler:^{ + [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain + code:GRPCErrorCodeInternal + userInfo:nil]]; + }]; + }); + } +} + +#pragma mark Invoke + +// Both handlers will eventually be called, from the network queue. Writes can start immediately +// after this. +// The first one (metadataHandler), when the response headers are received. +// The second one (completionHandler), whenever the RPC finishes for any reason. +- (void)invokeCallWithMetadataHandler:(GRPCEventHandler)metadataHandler + completionHandler:(GRPCEventHandler)completionHandler { + AssertNoErrorInCall(grpc_call_invoke_old(_gRPCCall, + _completionQueue.unmanagedQueue, + (__bridge_retained void *)metadataHandler, + (__bridge_retained void *)completionHandler, + 0)); +} + +- (void)invokeCall { + __weak GRPCCall *weakSelf = self; + [self invokeCallWithMetadataHandler:^(grpc_event *event) { + // Response metadata received. + // TODO(jcanizales): Name the type of event->data.client_metadata_read + // in the C library so one can actually pass the object to a method. + grpc_metadata *entries = event->data.client_metadata_read.elements; + size_t count = event->data.client_metadata_read.count; + GRPCCall *strongSelf = weakSelf; + if (strongSelf) { + strongSelf.responseMetadata = [NSDictionary grpc_dictionaryFromMetadata:entries + count:count]; + [strongSelf startNextRead]; + } + } completionHandler:^(grpc_event *event) { + // TODO(jcanizales): Merge HTTP2 trailers into response metadata. + [weakSelf finishWithError:[NSError grpc_errorFromStatus:&event->data.finished]]; + }]; + // Now that the RPC has been initiated, request writes can start. + [_requestWriter startWithWriteable:self]; +} + +#pragma mark GRXWriter implementation + +- (void)startWithWriteable:(id<GRXWriteable>)writeable { + // The following produces a retain cycle self:_responseWriteable:self, which is only + // broken when didFinishWithError: is sent to the wrapped writeable. + // Care is taken not to retain self strongly in any of the blocks used in + // the implementation of GRPCCall, so that the life of the instance is + // determined by this retain cycle. + _responseWriteable = [[GRPCDelegateWrapper alloc] initWithWriteable:writeable writer:self]; + [self sendHeaders:_requestMetadata]; + [self invokeCall]; +} + +- (void)setState:(GRXWriterState)newState { + // Manual transitions are only allowed from the started or paused states. + if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) { + return; + } + + switch (newState) { + case GRXWriterStateFinished: + _state = newState; + // Per GRXWriter's contract, setting the state to Finished manually + // means one doesn't wish the writeable to be messaged anymore. + [_responseWriteable cancelSilently]; + _responseWriteable = nil; + return; + case GRXWriterStatePaused: + _state = newState; + return; + case GRXWriterStateStarted: + if (_state == GRXWriterStatePaused) { + _state = newState; + [self startNextRead]; + } + return; + case GRXWriterStateNotStarted: + return; + } +} +@end diff --git a/src/objective-c/GRPCClient/GRPCMethodName.h b/src/objective-c/GRPCClient/GRPCMethodName.h new file mode 100644 index 0000000000..4fb86d2099 --- /dev/null +++ b/src/objective-c/GRPCClient/GRPCMethodName.h @@ -0,0 +1,15 @@ +#import <Foundation/Foundation.h> + +// See the README file for an introduction to this library. + +// A fully-qualified gRPC method name. Full qualification is needed because a gRPC endpoint can +// implement multiple interfaces. +// TODO(jcanizales): Is this proto-specific, or actual part of gRPC? If the former, move one layer up. +@interface GRPCMethodName : NSObject +@property(nonatomic, readonly) NSString *package; +@property(nonatomic, readonly) NSString *interface; +@property(nonatomic, readonly) NSString *method; +- (instancetype)initWithPackage:(NSString *)package + interface:(NSString *)interface + method:(NSString *)method; +@end diff --git a/src/objective-c/GRPCClient/GRPCMethodName.m b/src/objective-c/GRPCClient/GRPCMethodName.m new file mode 100644 index 0000000000..be9fd4b85b --- /dev/null +++ b/src/objective-c/GRPCClient/GRPCMethodName.m @@ -0,0 +1,14 @@ +#import "GRPCMethodName.h" + +@implementation GRPCMethodName +- (instancetype)initWithPackage:(NSString *)package + interface:(NSString *)interface + method:(NSString *)method { + if ((self = [super init])) { + _package = [package copy]; + _interface = [interface copy]; + _method = [method copy]; + } + return self; +} +@end diff --git a/src/objective-c/GRPCClient/README.md b/src/objective-c/GRPCClient/README.md new file mode 100644 index 0000000000..9b87f0316c --- /dev/null +++ b/src/objective-c/GRPCClient/README.md @@ -0,0 +1,4 @@ +This is a generic gRPC client for Objective-C on iOS. + +If you're trying to get started with the library or with gRPC, you should first +read GRPCCall.h. diff --git a/src/objective-c/GRPCClient/private/GRPCChannel.h b/src/objective-c/GRPCClient/private/GRPCChannel.h new file mode 100644 index 0000000000..8772acc12f --- /dev/null +++ b/src/objective-c/GRPCClient/private/GRPCChannel.h @@ -0,0 +1,17 @@ +#import <Foundation/Foundation.h> + +struct grpc_channel; + +// Each separate instance of this class represents at least one TCP +// connection to the provided host. To create a grpc_call, pass the +// value of the unmanagedChannel property to grpc_channel_create_call. +// Release this object when the call is finished. +@interface GRPCChannel : NSObject +@property(nonatomic, readonly) struct grpc_channel *unmanagedChannel; + +// Convenience constructor to allow for reuse of connections. ++ (instancetype)channelToHost:(NSString *)host; + +// Designated initializer +- (instancetype)initWithHost:(NSString *)host; +@end diff --git a/src/objective-c/GRPCClient/private/GRPCChannel.m b/src/objective-c/GRPCClient/private/GRPCChannel.m new file mode 100644 index 0000000000..af4a01ee05 --- /dev/null +++ b/src/objective-c/GRPCClient/private/GRPCChannel.m @@ -0,0 +1,32 @@ +#import "GRPCChannel.h" + +#import <grpc.h> + +@implementation GRPCChannel + ++ (instancetype)channelToHost:(NSString *)host { + // TODO(jcanizales): Reuse channels. + return [[self alloc] initWithHost:host]; +} + +- (instancetype)init { + return [self initWithHost:nil]; +} + +// Designated initializer +- (instancetype)initWithHost:(NSString *)host { + if (!host) { + [NSException raise:NSInvalidArgumentException format:@"Host can't be nil."]; + } + if ((self = [super init])) { + _unmanagedChannel = grpc_channel_create(host.UTF8String, NULL); + } + return self; +} + +- (void)dealloc { + // TODO(jcanizales): Be sure to add a test with a server that closes the connection prematurely, + // as in the past that made this call to crash. + grpc_channel_destroy(_unmanagedChannel); +} +@end diff --git a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.h b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.h new file mode 100644 index 0000000000..503df94dd4 --- /dev/null +++ b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.h @@ -0,0 +1,21 @@ +#import <Foundation/Foundation.h> + +struct grpc_completion_queue; +struct grpc_event; + +typedef void(^GRPCEventHandler)(struct grpc_event *event); + +// This class lets one more easily use grpc_completion_queue. To use it, pass +// the value of the unmanagedQueue property of an instance of this class to +// grpc_call_start_invoke. Then for every grpc_call_* method that accepts a tag, +// you can pass a block of type GRPCEventHandler (remembering to cast it using +// __bridge_retained). The block is guaranteed to eventually be called, by a +// concurrent queue, and then released. Each such block is passed a pointer to +// the grpc_event that carried it (in event->tag). +// Release the GRPCCompletionQueue object only after you are not going to pass +// any more blocks to the grpc_call that's using it. +@interface GRPCCompletionQueue : NSObject +@property(nonatomic, readonly) struct grpc_completion_queue *unmanagedQueue; + ++ (instancetype)completionQueue; +@end diff --git a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m new file mode 100644 index 0000000000..d2508daec4 --- /dev/null +++ b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m @@ -0,0 +1,73 @@ +#import "GRPCCompletionQueue.h" + +#import <grpc.h> + +@implementation GRPCCompletionQueue + ++ (instancetype)completionQueue { + // TODO(jcanizales): Reuse completion queues to consume only one thread, + // instead of one per call. + return [[self alloc] init]; +} + +- (instancetype)init { + if ((self = [super init])) { + _unmanagedQueue = grpc_completion_queue_create(); + + // This is for the following block to capture the pointer by value (instead + // of retaining self and doing self->_unmanagedQueue). This is essential + // because the block doesn't end until after grpc_completion_queue_shutdown + // is called, and we only want that to happen after nobody's using the queue + // anymore (i.e. on self dealloc). So the block would never end if it + // retained self. + grpc_completion_queue *unmanagedQueue = _unmanagedQueue; + + // Start a loop on a concurrent queue to read events from the completion + // queue and dispatch each. + static dispatch_once_t initialization; + static dispatch_queue_t gDefaultConcurrentQueue; + dispatch_once(&initialization, ^{ + gDefaultConcurrentQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0); + }); + dispatch_async(gDefaultConcurrentQueue, ^{ + while (YES) { + // The following call blocks until an event is available. + grpc_event *event = grpc_completion_queue_next(unmanagedQueue, gpr_inf_future); + switch (event->type) { + case GRPC_WRITE_ACCEPTED: + case GRPC_FINISH_ACCEPTED: + case GRPC_CLIENT_METADATA_READ: + case GRPC_READ: + case GRPC_FINISHED: + if (event->tag) { + GRPCEventHandler handler = (__bridge_transfer GRPCEventHandler) event->tag; + handler(event); + } + grpc_event_finish(event); + continue; + case GRPC_QUEUE_SHUTDOWN: + grpc_completion_queue_destroy(unmanagedQueue); + grpc_event_finish(event); + return; + case GRPC_SERVER_RPC_NEW: + NSAssert(NO, @"C gRPC library produced a server-only event."); + continue; + } + // This means the C gRPC library produced an event that wasn't known + // when this library was written. To preserve evolvability, ignore the + // unknown event on release builds. + NSAssert(NO, @"C gRPC library produced an unknown event."); + }; + }); + } + return self; +} + +- (void)dealloc { + // This makes the completion queue produce a GRPC_QUEUE_SHUTDOWN event *after* + // all other pending events are flushed. What this means is all the blocks + // passed to the gRPC C library as void* are eventually called, even if some + // are called after self is dealloc'd. + grpc_completion_queue_shutdown(_unmanagedQueue); +} +@end diff --git a/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.h b/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.h new file mode 100644 index 0000000000..70a07f817f --- /dev/null +++ b/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.h @@ -0,0 +1,48 @@ +#import <Foundation/Foundation.h> + +@protocol GRXWriteable; +@protocol GRXWriter; + +// This is a thread-safe wrapper over a GRXWriteable instance. It lets one +// enqueue calls to a GRXWriteable instance for the main thread, guaranteeing +// that didFinishWithError: is the last message sent to it (no matter what +// messages are sent to the wrapper, in what order, nor from which thread). It +// also guarantees that, if cancelWithError: is called from the main thread +// (e.g. by the app cancelling the writes), no further messages are sent to the +// writeable except didFinishWithError:. +// +// TODO(jcanizales): Let the user specify another queue for the writeable +// callbacks. +// TODO(jcanizales): Rename to GRXWriteableWrapper and move to the Rx library. +@interface GRPCDelegateWrapper : NSObject + +// The GRXWriteable passed is the wrapped writeable. +// Both the GRXWriter instance and the GRXWriteable instance are retained until +// didFinishWithError: is sent to the writeable, and released after that. +// This is used to create a retain cycle that keeps both objects alive until the +// writing is explicitly finished. +- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable writer:(id<GRXWriter>)writer + NS_DESIGNATED_INITIALIZER; + +// Enqueues didReceiveValue: to be sent to the writeable in the main thread. +// The passed handler is invoked from the main thread after didReceiveValue: +// returns. +- (void)enqueueMessage:(NSData *)message completionHandler:(void (^)())handler; + +// Enqueues didFinishWithError:nil to be sent to the writeable in the main +// thread. After that message is sent to the writeable, all other methods of +// this object are effectively noops. +- (void)enqueueSuccessfulCompletion; + +// If the writeable has not yet received a didFinishWithError: message, this +// will enqueue one to be sent to it in the main thread, and cancel all other +// pending messages to the writeable enqueued by this object (both past and +// future). +// The error argument cannot be nil. +- (void)cancelWithError:(NSError *)error; + +// Cancels all pending messages to the writeable enqueued by this object (both +// past and future). Because the writeable won't receive didFinishWithError:, +// this also releases the writeable and the writer. +- (void)cancelSilently; +@end diff --git a/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.m b/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.m new file mode 100644 index 0000000000..7c64850d7e --- /dev/null +++ b/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.m @@ -0,0 +1,87 @@ +#import "GRPCDelegateWrapper.h" + +#import <net/grpc/objc/RxLibrary/GRXWriteable.h> + +@interface GRPCDelegateWrapper () +// These are atomic so that cancellation can nillify them from any thread. +@property(atomic, strong) id<GRXWriteable> writeable; +@property(atomic, strong) id<GRXWriter> writer; +@end + +@implementation GRPCDelegateWrapper { + dispatch_queue_t _writeableQueue; + // This ensures that didFinishWithError: is only sent once to the writeable. + dispatch_once_t _alreadyFinished; +} + +- (instancetype)init { + return [self initWithWriteable:nil writer:nil]; +} + +// Designated initializer +- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable writer:(id<GRXWriter>)writer { + if (self = [super init]) { + _writeableQueue = dispatch_get_main_queue(); + _writeable = writeable; + _writer = writer; + } + return self; +} + +- (void)enqueueMessage:(NSData *)message completionHandler:(void (^)())handler { + dispatch_async(_writeableQueue, ^{ + // We're racing a possible cancellation performed by another thread. To turn + // all already-enqueued messages into noops, cancellation nillifies the + // writeable property. If we get it before it's nil, we won + // the race. + id<GRXWriteable> writeable = self.writeable; + if (writeable) { + [writeable didReceiveValue:message]; + handler(); + } + }); +} + +- (void)enqueueSuccessfulCompletion { + dispatch_async(_writeableQueue, ^{ + dispatch_once(&_alreadyFinished, ^{ + // Cancellation is now impossible. None of the other three blocks can run + // concurrently with this one. + [self.writeable didFinishWithError:nil]; + // Break the retain cycle with writer, and skip any possible message to the + // wrapped writeable enqueued after this one. + self.writeable = nil; + self.writer = nil; + }); + }); +} + +- (void)cancelWithError:(NSError *)error { + NSAssert(error, @"For a successful completion, use enqueueSuccessfulCompletion."); + dispatch_once(&_alreadyFinished, ^{ + // Skip any of the still-enqueued messages to the wrapped writeable. We use + // the atomic setter to nillify writer and writeable because we might be + // running concurrently with the blocks in _writeableQueue, and assignment + // with ARC isn't atomic. + id<GRXWriteable> writeable = self.writeable; + self.writeable = nil; + + dispatch_async(_writeableQueue, ^{ + [writeable didFinishWithError:error]; + // Break the retain cycle with writer. + self.writer = nil; + }); + }); +} + +- (void)cancelSilently { + dispatch_once(&_alreadyFinished, ^{ + // Skip any of the still-enqueued messages to the wrapped writeable. We use + // the atomic setter to nillify writer and writeable because we might be + // running concurrently with the blocks in _writeableQueue, and assignment + // with ARC isn't atomic. + self.writeable = nil; + self.writer = nil; + }); +} +@end diff --git a/src/objective-c/GRPCClient/private/GRPCMethodName+HTTP2Encoding.h b/src/objective-c/GRPCClient/private/GRPCMethodName+HTTP2Encoding.h new file mode 100644 index 0000000000..504c669f92 --- /dev/null +++ b/src/objective-c/GRPCClient/private/GRPCMethodName+HTTP2Encoding.h @@ -0,0 +1,7 @@ +#import <Foundation/Foundation.h> + +#import "GRPCMethodName.h" + +@interface GRPCMethodName (HTTP2Encoding) +- (NSString *)HTTP2Path; +@end diff --git a/src/objective-c/GRPCClient/private/GRPCMethodName+HTTP2Encoding.m b/src/objective-c/GRPCClient/private/GRPCMethodName+HTTP2Encoding.m new file mode 100644 index 0000000000..2e9fe8d60b --- /dev/null +++ b/src/objective-c/GRPCClient/private/GRPCMethodName+HTTP2Encoding.m @@ -0,0 +1,11 @@ +#import "GRPCMethodName+HTTP2Encoding.h" + +@implementation GRPCMethodName (HTTP2Encoding) +- (NSString *)HTTP2Path { + if (self.package) { + return [NSString stringWithFormat:@"/%@.%@/%@", self.package, self.interface, self.method]; + } else { + return [NSString stringWithFormat:@"/%@/%@", self.interface, self.method]; + } +} +@end diff --git a/src/objective-c/GRPCClient/private/NSData+GRPC.h b/src/objective-c/GRPCClient/private/NSData+GRPC.h new file mode 100644 index 0000000000..8cb7b76ebc --- /dev/null +++ b/src/objective-c/GRPCClient/private/NSData+GRPC.h @@ -0,0 +1,8 @@ +#import <Foundation/Foundation.h> + +struct grpc_byte_buffer; + +@interface NSData (GRPC) ++ (instancetype)grpc_dataWithByteBuffer:(struct grpc_byte_buffer *)buffer; +- (struct grpc_byte_buffer *)grpc_byteBuffer; +@end diff --git a/src/objective-c/GRPCClient/private/NSData+GRPC.m b/src/objective-c/GRPCClient/private/NSData+GRPC.m new file mode 100644 index 0000000000..47f7a07d7a --- /dev/null +++ b/src/objective-c/GRPCClient/private/NSData+GRPC.m @@ -0,0 +1,53 @@ +#import "NSData+GRPC.h" + +#include <byte_buffer.h> +#include <string.h> + +// TODO(jcanizales): Move these two incantations to the C library. + +static void CopyByteBufferToCharArray(grpc_byte_buffer *buffer, char *array) { + size_t offset = 0; + grpc_byte_buffer_reader *reader = grpc_byte_buffer_reader_create(buffer); + gpr_slice next; + while (grpc_byte_buffer_reader_next(reader, &next) != 0){ + memcpy(array + offset, GPR_SLICE_START_PTR(next), (size_t) GPR_SLICE_LENGTH(next)); + offset += GPR_SLICE_LENGTH(next); + gpr_slice_unref(next); + } + grpc_byte_buffer_reader_destroy(reader); +} + +static grpc_byte_buffer *CopyCharArrayToNewByteBuffer(const char *array, size_t length) { + gpr_slice slice = gpr_slice_from_copied_buffer(array, length); + grpc_byte_buffer *buffer = grpc_byte_buffer_create(&slice, 1); + gpr_slice_unref(slice); + return buffer; +} + +@implementation NSData (GRPC) ++ (instancetype)grpc_dataWithByteBuffer:(grpc_byte_buffer *)buffer { + NSUInteger length = grpc_byte_buffer_length(buffer); + char *array = malloc(length * sizeof(*array)); + if (!array) { + // TODO(jcanizales): grpc_byte_buffer is reference-counted, so we can + // prevent this memory problem by implementing a subclass of NSData + // that wraps the grpc_byte_buffer. Then enumerateByteRangesUsingBlock: + // can be implemented using a grpc_byte_buffer_reader. + return nil; + } + CopyByteBufferToCharArray(buffer, array); + return [self dataWithBytesNoCopy:array length:length freeWhenDone:YES]; +} + +- (grpc_byte_buffer *)grpc_byteBuffer { + // Some implementations of NSData, as well as grpc_byte_buffer, support O(1) + // appending of byte arrays by not using internally a single contiguous memory + // block for representation. + // The following implementation is thus not optimal, sometimes requiring two + // copies (one by self.bytes and another by gpr_slice_from_copied_buffer). + // If it turns out to be an issue, we can use enumerateByteRangesUsingblock: + // to create an array of gpr_slice objects to pass to grpc_byte_buffer_create. + // That would make it do exactly one copy, always. + return CopyCharArrayToNewByteBuffer((const char *)self.bytes, (size_t)self.length); +} +@end diff --git a/src/objective-c/GRPCClient/private/NSDictionary+GRPC.h b/src/objective-c/GRPCClient/private/NSDictionary+GRPC.h new file mode 100644 index 0000000000..b717b108e4 --- /dev/null +++ b/src/objective-c/GRPCClient/private/NSDictionary+GRPC.h @@ -0,0 +1,7 @@ +#import <Foundation/Foundation.h> + +struct grpc_metadata; + +@interface NSDictionary (GRPC) ++ (instancetype)grpc_dictionaryFromMetadata:(struct grpc_metadata *)entries count:(size_t)count; +@end diff --git a/src/objective-c/GRPCClient/private/NSDictionary+GRPC.m b/src/objective-c/GRPCClient/private/NSDictionary+GRPC.m new file mode 100644 index 0000000000..a24396d3a9 --- /dev/null +++ b/src/objective-c/GRPCClient/private/NSDictionary+GRPC.m @@ -0,0 +1,23 @@ +#import "NSDictionary+GRPC.h" + +#include <grpc.h> + +@implementation NSDictionary (GRPC) ++ (instancetype)grpc_dictionaryFromMetadata:(grpc_metadata *)entries count:(size_t)count { + NSMutableDictionary *metadata = [NSMutableDictionary dictionaryWithCapacity:count]; + for (grpc_metadata *entry = entries; entry < entries + count; entry++) { + // TODO(jcanizales): Verify in a C library test that it's converting header names to lower case automatically. + NSString *name = [NSString stringWithUTF8String:entry->key]; + if (!name) { + continue; + } + if (!metadata[name]) { + metadata[name] = [NSMutableArray array]; + } + // TODO(jcanizales): Should we use a non-copy constructor? + [metadata[name] addObject:[NSData dataWithBytes:entry->value + length:entry->value_length]]; + } + return metadata; +} +@end diff --git a/src/objective-c/GRPCClient/private/NSError+GRPC.h b/src/objective-c/GRPCClient/private/NSError+GRPC.h new file mode 100644 index 0000000000..949d1dd819 --- /dev/null +++ b/src/objective-c/GRPCClient/private/NSError+GRPC.h @@ -0,0 +1,41 @@ +#import <Foundation/Foundation.h> + +// TODO(jcanizales): Make the domain string public. +extern NSString *const kGRPCErrorDomain; + +// TODO(jcanizales): Make this public and document each code. +typedef NS_ENUM(NSInteger, GRPCErrorCode) { + GRPCErrorCodeCancelled = 1, + GRPCErrorCodeUnknown = 2, + GRPCErrorCodeInvalidArgument = 3, + GRPCErrorCodeDeadlineExceeded = 4, + GRPCErrorCodeNotFound = 5, + GRPCErrorCodeAlreadyExists = 6, + GRPCErrorCodePermissionDenied = 7, + GRPCErrorCodeUnauthenticated = 16, + GRPCErrorCodeResourceExhausted = 8, + GRPCErrorCodeFailedPrecondition = 9, + GRPCErrorCodeAborted = 10, + GRPCErrorCodeOutOfRange = 11, + GRPCErrorCodeUnimplemented = 12, + GRPCErrorCodeInternal = 13, + GRPCErrorCodeUnavailable = 14, + GRPCErrorCodeDataLoss = 15 +}; + +// TODO(jcanizales): This is conflating trailing metadata with Status details. Fix it once there's +// a decision on how to codify Status. +#include <grpc/status.h> +struct grpc_metadata; +struct grpc_status { + grpc_status_code status; + const char *details; + size_t metadata_count; + struct grpc_metadata *metadata_elements; +}; + +@interface NSError (GRPC) +// Returns nil if the status is OK. Otherwise, a NSError whose code is one of +// GRPCErrorCode and whose domain is kGRPCErrorDomain. ++ (instancetype)grpc_errorFromStatus:(struct grpc_status *)status; +@end diff --git a/src/objective-c/GRPCClient/private/NSError+GRPC.m b/src/objective-c/GRPCClient/private/NSError+GRPC.m new file mode 100644 index 0000000000..73ce112f15 --- /dev/null +++ b/src/objective-c/GRPCClient/private/NSError+GRPC.m @@ -0,0 +1,18 @@ +#import "NSError+GRPC.h" + +#include <grpc.h> + +NSString *const kGRPCErrorDomain = @"org.grpc"; + +@implementation NSError (GRPC) ++ (instancetype)grpc_errorFromStatus:(struct grpc_status *)status { + if (status->status == GRPC_STATUS_OK) { + return nil; + } + NSString *message = + [NSString stringWithFormat:@"Code=%i Message='%s'", status->status, status->details]; + return [NSError errorWithDomain:kGRPCErrorDomain + code:status->status + userInfo:@{NSLocalizedDescriptionKey: message}]; +} +@end |