aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/objective-c/GRPCClient
diff options
context:
space:
mode:
authorGravatar Jorge Canizales <jcanizales@google.com>2015-02-17 18:23:58 -0800
committerGravatar Jorge Canizales <jcanizales@google.com>2015-02-17 18:23:58 -0800
commit5e0efd95f785bc3a82fa2b7b67b2442625653efa (patch)
tree479cf5b81282a4f7df98664fd85ddf1b4139d5e6 /src/objective-c/GRPCClient
parent30697c9be2ff01e9f33e0934b58877fc3d11f516 (diff)
Imports code of the generic gRPC client library.
Diffstat (limited to 'src/objective-c/GRPCClient')
-rw-r--r--src/objective-c/GRPCClient/GRPCCall.h57
-rw-r--r--src/objective-c/GRPCClient/GRPCCall.m373
-rw-r--r--src/objective-c/GRPCClient/GRPCMethodName.h15
-rw-r--r--src/objective-c/GRPCClient/GRPCMethodName.m14
-rw-r--r--src/objective-c/GRPCClient/README.md4
-rw-r--r--src/objective-c/GRPCClient/private/GRPCChannel.h17
-rw-r--r--src/objective-c/GRPCClient/private/GRPCChannel.m32
-rw-r--r--src/objective-c/GRPCClient/private/GRPCCompletionQueue.h21
-rw-r--r--src/objective-c/GRPCClient/private/GRPCCompletionQueue.m73
-rw-r--r--src/objective-c/GRPCClient/private/GRPCDelegateWrapper.h48
-rw-r--r--src/objective-c/GRPCClient/private/GRPCDelegateWrapper.m87
-rw-r--r--src/objective-c/GRPCClient/private/GRPCMethodName+HTTP2Encoding.h7
-rw-r--r--src/objective-c/GRPCClient/private/GRPCMethodName+HTTP2Encoding.m11
-rw-r--r--src/objective-c/GRPCClient/private/NSData+GRPC.h8
-rw-r--r--src/objective-c/GRPCClient/private/NSData+GRPC.m53
-rw-r--r--src/objective-c/GRPCClient/private/NSDictionary+GRPC.h7
-rw-r--r--src/objective-c/GRPCClient/private/NSDictionary+GRPC.m23
-rw-r--r--src/objective-c/GRPCClient/private/NSError+GRPC.h41
-rw-r--r--src/objective-c/GRPCClient/private/NSError+GRPC.m18
19 files changed, 909 insertions, 0 deletions
diff --git a/src/objective-c/GRPCClient/GRPCCall.h b/src/objective-c/GRPCClient/GRPCCall.h
new file mode 100644
index 0000000000..db138fd1ee
--- /dev/null
+++ b/src/objective-c/GRPCClient/GRPCCall.h
@@ -0,0 +1,57 @@
+#import <Foundation/Foundation.h>
+#import <RxLibrary/GRXWriter.h>
+
+@class GRPCMethodName;
+
+@class GRPCCall;
+
+// The gRPC protocol is an RPC protocol on top of HTTP2.
+//
+// While the most common type of RPC receives only one request message and
+// returns only one response message, the protocol also supports RPCs that
+// return multiple individual messages in a streaming fashion, RPCs that
+// accept a stream of request messages, or RPCs with both streaming requests
+// and responses.
+//
+// Conceptually, each gRPC call consists of a bidirectional stream of binary
+// messages, with RPCs of the "non-streaming type" sending only one message in
+// the corresponding direction (the protocol doesn't make any distinction).
+//
+// Each RPC uses a different HTTP2 stream, and thus multiple simultaneous RPCs
+// can be multiplexed transparently on the same TCP connection.
+@interface GRPCCall : NSObject<GRXWriter>
+
+// These HTTP2 headers will be passed to the server as part of this call. Each
+// HTTP2 header is a name-value pair with string names and either string or binary values.
+// The passed dictionary has to use NSString keys, corresponding to the header names. The
+// value associated to each can be a NSString object or a NSData object. E.g.:
+//
+// call.requestMetadata = @{
+// @"Authorization": @"Bearer ...",
+// @"SomeBinaryHeader": someData
+// };
+//
+// After the call is started, modifying this won't have any effect.
+@property(nonatomic, readwrite) NSMutableDictionary *requestMetadata;
+
+// This isn't populated until the first event is delivered to the handler.
+@property(atomic, readonly) NSDictionary *responseMetadata;
+
+// The request writer has to write NSData objects into the provided Writeable. The server will
+// receive each of those separately and in order.
+// A gRPC call might not complete until the request writer finishes. On the other hand, the
+// request finishing doesn't necessarily make the call to finish, as the server might continue
+// sending messages to the response side of the call indefinitely (depending on the semantics of
+// the specific remote method called).
+// To finish a call right away, invoke cancel.
+- (instancetype)initWithHost:(NSString *)host
+ method:(GRPCMethodName *)method
+ requestsWriter:(id<GRXWriter>)requestsWriter NS_DESIGNATED_INITIALIZER;
+
+// Finishes the request side of this call, notifies the server that the RPC
+// should be cancelled, and finishes the response side of the call with an error
+// of code CANCELED.
+- (void)cancel;
+
+// TODO(jcanizales): Let specify a deadline. As a category of GRXWriter?
+@end
diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m
new file mode 100644
index 0000000000..b9248be5db
--- /dev/null
+++ b/src/objective-c/GRPCClient/GRPCCall.m
@@ -0,0 +1,373 @@
+#import "GRPCCall.h"
+
+#include <grpc.h>
+#include <support/time.h>
+
+#import "GRPCMethodName.h"
+#import "private/GRPCChannel.h"
+#import "private/GRPCCompletionQueue.h"
+#import "private/GRPCDelegateWrapper.h"
+#import "private/GRPCMethodName+HTTP2Encoding.h"
+#import "private/NSData+GRPC.h"
+#import "private/NSDictionary+GRPC.h"
+#import "private/NSError+GRPC.h"
+
+// A grpc_call_error represents a precondition failure when invoking the
+// grpc_call_* functions. If one ever happens, it's a bug in this library.
+//
+// TODO(jcanizales): Can an application shut down gracefully when a thread other
+// than the main one throws an exception?
+static void AssertNoErrorInCall(grpc_call_error error) {
+ if (error != GRPC_CALL_OK) {
+ @throw [NSException exceptionWithName:NSInternalInconsistencyException
+ reason:@"Precondition of grpc_call_* not met."
+ userInfo:nil];
+ }
+}
+
+@interface GRPCCall () <GRXWriteable>
+// Makes it readwrite.
+@property(atomic, strong) NSDictionary *responseMetadata;
+@end
+
+// The following methods of a C gRPC call object aren't reentrant, and thus
+// calls to them must be serialized:
+// - add_metadata
+// - invoke
+// - start_write
+// - writes_done
+// - start_read
+// - destroy
+// The first four are called as part of responding to client commands, but
+// start_read we want to call as soon as we're notified that the RPC was
+// successfully established (which happens concurrently in the network queue).
+// Serialization is achieved by using a private serial queue to operate the
+// call object.
+// Because add_metadata and invoke are called and return successfully before
+// any of the other methods is called, they don't need to use the queue.
+//
+// Furthermore, start_write and writes_done can only be called after the
+// WRITE_ACCEPTED event for any previous write is received. This is achieved by
+// pausing the requests writer immediately every time it writes a value, and
+// resuming it again when WRITE_ACCEPTED is received.
+//
+// Similarly, start_read can only be called after the READ event for any
+// previous read is received. This is easier to enforce, as we're writing the
+// received messages into the writeable: start_read is enqueued once upon receiving
+// the CLIENT_METADATA_READ event, and then once after receiving each READ
+// event.
+@implementation GRPCCall {
+ dispatch_queue_t _callQueue;
+
+ grpc_call *_gRPCCall;
+ dispatch_once_t _callAlreadyInvoked;
+
+ GRPCChannel *_channel;
+ GRPCCompletionQueue *_completionQueue;
+
+ // The C gRPC library has less guarantees on the ordering of events than we
+ // do. Particularly, in the face of errors, there's no ordering guarantee at
+ // all. This wrapper over our actual writeable ensures thread-safety and
+ // correct ordering.
+ GRPCDelegateWrapper *_responseWriteable;
+ id<GRXWriter> _requestWriter;
+}
+
+@synthesize state = _state;
+
+- (instancetype)init {
+ return [self initWithHost:nil method:nil requestsWriter:nil];
+}
+
+// Designated initializer
+- (instancetype)initWithHost:(NSString *)host
+ method:(GRPCMethodName *)method
+ requestsWriter:(id<GRXWriter>)requestWriter {
+ if (!host || !method) {
+ [NSException raise:NSInvalidArgumentException format:@"Neither host nor method can be nil."];
+ }
+ // TODO(jcanizales): Throw if the requestWriter was already started.
+ if ((self = [super init])) {
+ static dispatch_once_t initialization;
+ dispatch_once(&initialization, ^{
+ grpc_init();
+ });
+
+ _completionQueue = [GRPCCompletionQueue completionQueue];
+
+ _channel = [GRPCChannel channelToHost:host];
+ _gRPCCall = grpc_channel_create_call_old(_channel.unmanagedChannel,
+ method.HTTP2Path.UTF8String,
+ host.UTF8String,
+ gpr_inf_future);
+
+ // Serial queue to invoke the non-reentrant methods of the grpc_call object.
+ _callQueue = dispatch_queue_create("org.grpc.call", NULL);
+
+ _requestWriter = requestWriter;
+ }
+ return self;
+}
+
+#pragma mark Finish
+
+- (void)finishWithError:(NSError *)errorOrNil {
+ _requestWriter.state = GRXWriterStateFinished;
+ _requestWriter = nil;
+ if (errorOrNil) {
+ [_responseWriteable cancelWithError:errorOrNil];
+ } else {
+ [_responseWriteable enqueueSuccessfulCompletion];
+ }
+}
+
+- (void)cancelCall {
+ // Can be called from any thread, any number of times.
+ AssertNoErrorInCall(grpc_call_cancel(_gRPCCall));
+}
+
+- (void)cancel {
+ [self finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeCancelled
+ userInfo:nil]];
+ [self cancelCall];
+}
+
+- (void)dealloc {
+ grpc_call *gRPCCall = _gRPCCall;
+ dispatch_async(_callQueue, ^{
+ grpc_call_destroy(gRPCCall);
+ });
+}
+
+#pragma mark Read messages
+
+// Only called from the call queue.
+// The handler will be called from the network queue.
+- (void)startReadWithHandler:(GRPCEventHandler)handler {
+ AssertNoErrorInCall(grpc_call_start_read_old(_gRPCCall, (__bridge_retained void *)handler));
+}
+
+// Called initially from the network queue once response headers are received,
+// then "recursively" from the responseWriteable queue after each response from the
+// server has been written.
+// If the call is currently paused, this is a noop. Restarting the call will invoke this
+// method.
+// TODO(jcanizales): Rename to readResponseIfNotPaused.
+- (void)startNextRead {
+ if (self.state == GRXWriterStatePaused) {
+ return;
+ }
+ __weak GRPCCall *weakSelf = self;
+ __weak GRPCDelegateWrapper *weakWriteable = _responseWriteable;
+
+ dispatch_async(_callQueue, ^{
+ [weakSelf startReadWithHandler:^(grpc_event *event) {
+ if (!event->data.read) {
+ // No more responses from the server.
+ return;
+ }
+ NSData *data = [NSData grpc_dataWithByteBuffer:event->data.read];
+ if (!data) {
+ // The app doesn't have enough memory to hold the server response. We
+ // don't want to throw, because the app shouldn't crash for a behavior
+ // that's on the hands of any server to have. Instead we finish and ask
+ // the server to cancel.
+ //
+ // TODO(jcanizales): No canonical code is appropriate for this situation
+ // (because it's just a client problem). Use another domain and an
+ // appropriately-documented code.
+ [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeInternal
+ userInfo:nil]];
+ [weakSelf cancelCall];
+ return;
+ }
+ [weakWriteable enqueueMessage:data completionHandler:^{
+ [weakSelf startNextRead];
+ }];
+ }];
+ });
+}
+
+#pragma mark Send headers
+
+- (void)addHeaderWithName:(NSString *)name binaryValue:(NSData *)value {
+ grpc_metadata metadata;
+ // Safe to discard const qualifiers; we're not going to modify the contents.
+ metadata.key = (char *)name.UTF8String;
+ metadata.value = (char *)value.bytes;
+ metadata.value_length = value.length;
+ grpc_call_add_metadata_old(_gRPCCall, &metadata, 0);
+}
+
+- (void)addHeaderWithName:(NSString *)name ASCIIValue:(NSString *)value {
+ grpc_metadata metadata;
+ // Safe to discard const qualifiers; we're not going to modify the contents.
+ metadata.key = (char *)name.UTF8String;
+ metadata.value = (char *)value.UTF8String;
+ // The trailing \0 isn't encoded in HTTP2.
+ metadata.value_length = value.length;
+ grpc_call_add_metadata_old(_gRPCCall, &metadata, 0);
+}
+
+// TODO(jcanizales): Rename to commitHeaders.
+- (void)sendHeaders:(NSDictionary *)metadata {
+ for (NSString *name in metadata) {
+ id value = metadata[name];
+ if ([value isKindOfClass:[NSData class]]) {
+ [self addHeaderWithName:name binaryValue:value];
+ } else if ([value isKindOfClass:[NSString class]]) {
+ [self addHeaderWithName:name ASCIIValue:value];
+ }
+ }
+}
+
+#pragma mark GRXWriteable implementation
+
+// Only called from the call queue. The error handler will be called from the
+// network queue if the write didn't succeed.
+- (void)writeMessage:(NSData *)message withErrorHandler:(void (^)())errorHandler {
+
+ __weak GRPCCall *weakSelf = self;
+ GRPCEventHandler resumingHandler = ^(grpc_event *event) {
+ if (event->data.write_accepted != GRPC_OP_OK) {
+ errorHandler();
+ }
+ // Resume the request writer (even in the case of error).
+ // TODO(jcanizales): No need to do it in the case of errors anymore?
+ GRPCCall *strongSelf = weakSelf;
+ if (strongSelf) {
+ strongSelf->_requestWriter.state = GRXWriterStateStarted;
+ }
+ };
+
+ grpc_byte_buffer *buffer = message.grpc_byteBuffer;
+ AssertNoErrorInCall(grpc_call_start_write_old(_gRPCCall,
+ buffer,
+ (__bridge_retained void *)resumingHandler,
+ 0));
+ grpc_byte_buffer_destroy(buffer);
+}
+
+- (void)didReceiveValue:(id)value {
+ // TODO(jcanizales): Throw/assert if value isn't NSData.
+
+ // Pause the input and only resume it when the C layer notifies us that writes
+ // can proceed.
+ _requestWriter.state = GRXWriterStatePaused;
+
+ __weak GRPCCall *weakSelf = self;
+ dispatch_async(_callQueue, ^{
+ [weakSelf writeMessage:value withErrorHandler:^{
+ [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeInternal
+ userInfo:nil]];
+ }];
+ });
+}
+
+// Only called from the call queue. The error handler will be called from the
+// network queue if the requests stream couldn't be closed successfully.
+- (void)finishRequestWithErrorHandler:(void (^)())errorHandler {
+ GRPCEventHandler handler = ^(grpc_event *event) {
+ if (event->data.finish_accepted != GRPC_OP_OK) {
+ errorHandler();
+ }
+ };
+ AssertNoErrorInCall(grpc_call_writes_done_old(_gRPCCall, (__bridge_retained void *)handler));
+}
+
+- (void)didFinishWithError:(NSError *)errorOrNil {
+ if (errorOrNil) {
+ [self cancel];
+ } else {
+ __weak GRPCCall *weakSelf = self;
+ dispatch_async(_callQueue, ^{
+ [weakSelf finishRequestWithErrorHandler:^{
+ [weakSelf finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
+ code:GRPCErrorCodeInternal
+ userInfo:nil]];
+ }];
+ });
+ }
+}
+
+#pragma mark Invoke
+
+// Both handlers will eventually be called, from the network queue. Writes can start immediately
+// after this.
+// The first one (metadataHandler), when the response headers are received.
+// The second one (completionHandler), whenever the RPC finishes for any reason.
+- (void)invokeCallWithMetadataHandler:(GRPCEventHandler)metadataHandler
+ completionHandler:(GRPCEventHandler)completionHandler {
+ AssertNoErrorInCall(grpc_call_invoke_old(_gRPCCall,
+ _completionQueue.unmanagedQueue,
+ (__bridge_retained void *)metadataHandler,
+ (__bridge_retained void *)completionHandler,
+ 0));
+}
+
+- (void)invokeCall {
+ __weak GRPCCall *weakSelf = self;
+ [self invokeCallWithMetadataHandler:^(grpc_event *event) {
+ // Response metadata received.
+ // TODO(jcanizales): Name the type of event->data.client_metadata_read
+ // in the C library so one can actually pass the object to a method.
+ grpc_metadata *entries = event->data.client_metadata_read.elements;
+ size_t count = event->data.client_metadata_read.count;
+ GRPCCall *strongSelf = weakSelf;
+ if (strongSelf) {
+ strongSelf.responseMetadata = [NSDictionary grpc_dictionaryFromMetadata:entries
+ count:count];
+ [strongSelf startNextRead];
+ }
+ } completionHandler:^(grpc_event *event) {
+ // TODO(jcanizales): Merge HTTP2 trailers into response metadata.
+ [weakSelf finishWithError:[NSError grpc_errorFromStatus:&event->data.finished]];
+ }];
+ // Now that the RPC has been initiated, request writes can start.
+ [_requestWriter startWithWriteable:self];
+}
+
+#pragma mark GRXWriter implementation
+
+- (void)startWithWriteable:(id<GRXWriteable>)writeable {
+ // The following produces a retain cycle self:_responseWriteable:self, which is only
+ // broken when didFinishWithError: is sent to the wrapped writeable.
+ // Care is taken not to retain self strongly in any of the blocks used in
+ // the implementation of GRPCCall, so that the life of the instance is
+ // determined by this retain cycle.
+ _responseWriteable = [[GRPCDelegateWrapper alloc] initWithWriteable:writeable writer:self];
+ [self sendHeaders:_requestMetadata];
+ [self invokeCall];
+}
+
+- (void)setState:(GRXWriterState)newState {
+ // Manual transitions are only allowed from the started or paused states.
+ if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
+ return;
+ }
+
+ switch (newState) {
+ case GRXWriterStateFinished:
+ _state = newState;
+ // Per GRXWriter's contract, setting the state to Finished manually
+ // means one doesn't wish the writeable to be messaged anymore.
+ [_responseWriteable cancelSilently];
+ _responseWriteable = nil;
+ return;
+ case GRXWriterStatePaused:
+ _state = newState;
+ return;
+ case GRXWriterStateStarted:
+ if (_state == GRXWriterStatePaused) {
+ _state = newState;
+ [self startNextRead];
+ }
+ return;
+ case GRXWriterStateNotStarted:
+ return;
+ }
+}
+@end
diff --git a/src/objective-c/GRPCClient/GRPCMethodName.h b/src/objective-c/GRPCClient/GRPCMethodName.h
new file mode 100644
index 0000000000..4fb86d2099
--- /dev/null
+++ b/src/objective-c/GRPCClient/GRPCMethodName.h
@@ -0,0 +1,15 @@
+#import <Foundation/Foundation.h>
+
+// See the README file for an introduction to this library.
+
+// A fully-qualified gRPC method name. Full qualification is needed because a gRPC endpoint can
+// implement multiple interfaces.
+// TODO(jcanizales): Is this proto-specific, or actual part of gRPC? If the former, move one layer up.
+@interface GRPCMethodName : NSObject
+@property(nonatomic, readonly) NSString *package;
+@property(nonatomic, readonly) NSString *interface;
+@property(nonatomic, readonly) NSString *method;
+- (instancetype)initWithPackage:(NSString *)package
+ interface:(NSString *)interface
+ method:(NSString *)method;
+@end
diff --git a/src/objective-c/GRPCClient/GRPCMethodName.m b/src/objective-c/GRPCClient/GRPCMethodName.m
new file mode 100644
index 0000000000..be9fd4b85b
--- /dev/null
+++ b/src/objective-c/GRPCClient/GRPCMethodName.m
@@ -0,0 +1,14 @@
+#import "GRPCMethodName.h"
+
+@implementation GRPCMethodName
+- (instancetype)initWithPackage:(NSString *)package
+ interface:(NSString *)interface
+ method:(NSString *)method {
+ if ((self = [super init])) {
+ _package = [package copy];
+ _interface = [interface copy];
+ _method = [method copy];
+ }
+ return self;
+}
+@end
diff --git a/src/objective-c/GRPCClient/README.md b/src/objective-c/GRPCClient/README.md
new file mode 100644
index 0000000000..9b87f0316c
--- /dev/null
+++ b/src/objective-c/GRPCClient/README.md
@@ -0,0 +1,4 @@
+This is a generic gRPC client for Objective-C on iOS.
+
+If you're trying to get started with the library or with gRPC, you should first
+read GRPCCall.h.
diff --git a/src/objective-c/GRPCClient/private/GRPCChannel.h b/src/objective-c/GRPCClient/private/GRPCChannel.h
new file mode 100644
index 0000000000..8772acc12f
--- /dev/null
+++ b/src/objective-c/GRPCClient/private/GRPCChannel.h
@@ -0,0 +1,17 @@
+#import <Foundation/Foundation.h>
+
+struct grpc_channel;
+
+// Each separate instance of this class represents at least one TCP
+// connection to the provided host. To create a grpc_call, pass the
+// value of the unmanagedChannel property to grpc_channel_create_call.
+// Release this object when the call is finished.
+@interface GRPCChannel : NSObject
+@property(nonatomic, readonly) struct grpc_channel *unmanagedChannel;
+
+// Convenience constructor to allow for reuse of connections.
++ (instancetype)channelToHost:(NSString *)host;
+
+// Designated initializer
+- (instancetype)initWithHost:(NSString *)host;
+@end
diff --git a/src/objective-c/GRPCClient/private/GRPCChannel.m b/src/objective-c/GRPCClient/private/GRPCChannel.m
new file mode 100644
index 0000000000..af4a01ee05
--- /dev/null
+++ b/src/objective-c/GRPCClient/private/GRPCChannel.m
@@ -0,0 +1,32 @@
+#import "GRPCChannel.h"
+
+#import <grpc.h>
+
+@implementation GRPCChannel
+
++ (instancetype)channelToHost:(NSString *)host {
+ // TODO(jcanizales): Reuse channels.
+ return [[self alloc] initWithHost:host];
+}
+
+- (instancetype)init {
+ return [self initWithHost:nil];
+}
+
+// Designated initializer
+- (instancetype)initWithHost:(NSString *)host {
+ if (!host) {
+ [NSException raise:NSInvalidArgumentException format:@"Host can't be nil."];
+ }
+ if ((self = [super init])) {
+ _unmanagedChannel = grpc_channel_create(host.UTF8String, NULL);
+ }
+ return self;
+}
+
+- (void)dealloc {
+ // TODO(jcanizales): Be sure to add a test with a server that closes the connection prematurely,
+ // as in the past that made this call to crash.
+ grpc_channel_destroy(_unmanagedChannel);
+}
+@end
diff --git a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.h b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.h
new file mode 100644
index 0000000000..503df94dd4
--- /dev/null
+++ b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.h
@@ -0,0 +1,21 @@
+#import <Foundation/Foundation.h>
+
+struct grpc_completion_queue;
+struct grpc_event;
+
+typedef void(^GRPCEventHandler)(struct grpc_event *event);
+
+// This class lets one more easily use grpc_completion_queue. To use it, pass
+// the value of the unmanagedQueue property of an instance of this class to
+// grpc_call_start_invoke. Then for every grpc_call_* method that accepts a tag,
+// you can pass a block of type GRPCEventHandler (remembering to cast it using
+// __bridge_retained). The block is guaranteed to eventually be called, by a
+// concurrent queue, and then released. Each such block is passed a pointer to
+// the grpc_event that carried it (in event->tag).
+// Release the GRPCCompletionQueue object only after you are not going to pass
+// any more blocks to the grpc_call that's using it.
+@interface GRPCCompletionQueue : NSObject
+@property(nonatomic, readonly) struct grpc_completion_queue *unmanagedQueue;
+
++ (instancetype)completionQueue;
+@end
diff --git a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m
new file mode 100644
index 0000000000..d2508daec4
--- /dev/null
+++ b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m
@@ -0,0 +1,73 @@
+#import "GRPCCompletionQueue.h"
+
+#import <grpc.h>
+
+@implementation GRPCCompletionQueue
+
++ (instancetype)completionQueue {
+ // TODO(jcanizales): Reuse completion queues to consume only one thread,
+ // instead of one per call.
+ return [[self alloc] init];
+}
+
+- (instancetype)init {
+ if ((self = [super init])) {
+ _unmanagedQueue = grpc_completion_queue_create();
+
+ // This is for the following block to capture the pointer by value (instead
+ // of retaining self and doing self->_unmanagedQueue). This is essential
+ // because the block doesn't end until after grpc_completion_queue_shutdown
+ // is called, and we only want that to happen after nobody's using the queue
+ // anymore (i.e. on self dealloc). So the block would never end if it
+ // retained self.
+ grpc_completion_queue *unmanagedQueue = _unmanagedQueue;
+
+ // Start a loop on a concurrent queue to read events from the completion
+ // queue and dispatch each.
+ static dispatch_once_t initialization;
+ static dispatch_queue_t gDefaultConcurrentQueue;
+ dispatch_once(&initialization, ^{
+ gDefaultConcurrentQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
+ });
+ dispatch_async(gDefaultConcurrentQueue, ^{
+ while (YES) {
+ // The following call blocks until an event is available.
+ grpc_event *event = grpc_completion_queue_next(unmanagedQueue, gpr_inf_future);
+ switch (event->type) {
+ case GRPC_WRITE_ACCEPTED:
+ case GRPC_FINISH_ACCEPTED:
+ case GRPC_CLIENT_METADATA_READ:
+ case GRPC_READ:
+ case GRPC_FINISHED:
+ if (event->tag) {
+ GRPCEventHandler handler = (__bridge_transfer GRPCEventHandler) event->tag;
+ handler(event);
+ }
+ grpc_event_finish(event);
+ continue;
+ case GRPC_QUEUE_SHUTDOWN:
+ grpc_completion_queue_destroy(unmanagedQueue);
+ grpc_event_finish(event);
+ return;
+ case GRPC_SERVER_RPC_NEW:
+ NSAssert(NO, @"C gRPC library produced a server-only event.");
+ continue;
+ }
+ // This means the C gRPC library produced an event that wasn't known
+ // when this library was written. To preserve evolvability, ignore the
+ // unknown event on release builds.
+ NSAssert(NO, @"C gRPC library produced an unknown event.");
+ };
+ });
+ }
+ return self;
+}
+
+- (void)dealloc {
+ // This makes the completion queue produce a GRPC_QUEUE_SHUTDOWN event *after*
+ // all other pending events are flushed. What this means is all the blocks
+ // passed to the gRPC C library as void* are eventually called, even if some
+ // are called after self is dealloc'd.
+ grpc_completion_queue_shutdown(_unmanagedQueue);
+}
+@end
diff --git a/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.h b/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.h
new file mode 100644
index 0000000000..70a07f817f
--- /dev/null
+++ b/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.h
@@ -0,0 +1,48 @@
+#import <Foundation/Foundation.h>
+
+@protocol GRXWriteable;
+@protocol GRXWriter;
+
+// This is a thread-safe wrapper over a GRXWriteable instance. It lets one
+// enqueue calls to a GRXWriteable instance for the main thread, guaranteeing
+// that didFinishWithError: is the last message sent to it (no matter what
+// messages are sent to the wrapper, in what order, nor from which thread). It
+// also guarantees that, if cancelWithError: is called from the main thread
+// (e.g. by the app cancelling the writes), no further messages are sent to the
+// writeable except didFinishWithError:.
+//
+// TODO(jcanizales): Let the user specify another queue for the writeable
+// callbacks.
+// TODO(jcanizales): Rename to GRXWriteableWrapper and move to the Rx library.
+@interface GRPCDelegateWrapper : NSObject
+
+// The GRXWriteable passed is the wrapped writeable.
+// Both the GRXWriter instance and the GRXWriteable instance are retained until
+// didFinishWithError: is sent to the writeable, and released after that.
+// This is used to create a retain cycle that keeps both objects alive until the
+// writing is explicitly finished.
+- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable writer:(id<GRXWriter>)writer
+ NS_DESIGNATED_INITIALIZER;
+
+// Enqueues didReceiveValue: to be sent to the writeable in the main thread.
+// The passed handler is invoked from the main thread after didReceiveValue:
+// returns.
+- (void)enqueueMessage:(NSData *)message completionHandler:(void (^)())handler;
+
+// Enqueues didFinishWithError:nil to be sent to the writeable in the main
+// thread. After that message is sent to the writeable, all other methods of
+// this object are effectively noops.
+- (void)enqueueSuccessfulCompletion;
+
+// If the writeable has not yet received a didFinishWithError: message, this
+// will enqueue one to be sent to it in the main thread, and cancel all other
+// pending messages to the writeable enqueued by this object (both past and
+// future).
+// The error argument cannot be nil.
+- (void)cancelWithError:(NSError *)error;
+
+// Cancels all pending messages to the writeable enqueued by this object (both
+// past and future). Because the writeable won't receive didFinishWithError:,
+// this also releases the writeable and the writer.
+- (void)cancelSilently;
+@end
diff --git a/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.m b/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.m
new file mode 100644
index 0000000000..7c64850d7e
--- /dev/null
+++ b/src/objective-c/GRPCClient/private/GRPCDelegateWrapper.m
@@ -0,0 +1,87 @@
+#import "GRPCDelegateWrapper.h"
+
+#import <net/grpc/objc/RxLibrary/GRXWriteable.h>
+
+@interface GRPCDelegateWrapper ()
+// These are atomic so that cancellation can nillify them from any thread.
+@property(atomic, strong) id<GRXWriteable> writeable;
+@property(atomic, strong) id<GRXWriter> writer;
+@end
+
+@implementation GRPCDelegateWrapper {
+ dispatch_queue_t _writeableQueue;
+ // This ensures that didFinishWithError: is only sent once to the writeable.
+ dispatch_once_t _alreadyFinished;
+}
+
+- (instancetype)init {
+ return [self initWithWriteable:nil writer:nil];
+}
+
+// Designated initializer
+- (instancetype)initWithWriteable:(id<GRXWriteable>)writeable writer:(id<GRXWriter>)writer {
+ if (self = [super init]) {
+ _writeableQueue = dispatch_get_main_queue();
+ _writeable = writeable;
+ _writer = writer;
+ }
+ return self;
+}
+
+- (void)enqueueMessage:(NSData *)message completionHandler:(void (^)())handler {
+ dispatch_async(_writeableQueue, ^{
+ // We're racing a possible cancellation performed by another thread. To turn
+ // all already-enqueued messages into noops, cancellation nillifies the
+ // writeable property. If we get it before it's nil, we won
+ // the race.
+ id<GRXWriteable> writeable = self.writeable;
+ if (writeable) {
+ [writeable didReceiveValue:message];
+ handler();
+ }
+ });
+}
+
+- (void)enqueueSuccessfulCompletion {
+ dispatch_async(_writeableQueue, ^{
+ dispatch_once(&_alreadyFinished, ^{
+ // Cancellation is now impossible. None of the other three blocks can run
+ // concurrently with this one.
+ [self.writeable didFinishWithError:nil];
+ // Break the retain cycle with writer, and skip any possible message to the
+ // wrapped writeable enqueued after this one.
+ self.writeable = nil;
+ self.writer = nil;
+ });
+ });
+}
+
+- (void)cancelWithError:(NSError *)error {
+ NSAssert(error, @"For a successful completion, use enqueueSuccessfulCompletion.");
+ dispatch_once(&_alreadyFinished, ^{
+ // Skip any of the still-enqueued messages to the wrapped writeable. We use
+ // the atomic setter to nillify writer and writeable because we might be
+ // running concurrently with the blocks in _writeableQueue, and assignment
+ // with ARC isn't atomic.
+ id<GRXWriteable> writeable = self.writeable;
+ self.writeable = nil;
+
+ dispatch_async(_writeableQueue, ^{
+ [writeable didFinishWithError:error];
+ // Break the retain cycle with writer.
+ self.writer = nil;
+ });
+ });
+}
+
+- (void)cancelSilently {
+ dispatch_once(&_alreadyFinished, ^{
+ // Skip any of the still-enqueued messages to the wrapped writeable. We use
+ // the atomic setter to nillify writer and writeable because we might be
+ // running concurrently with the blocks in _writeableQueue, and assignment
+ // with ARC isn't atomic.
+ self.writeable = nil;
+ self.writer = nil;
+ });
+}
+@end
diff --git a/src/objective-c/GRPCClient/private/GRPCMethodName+HTTP2Encoding.h b/src/objective-c/GRPCClient/private/GRPCMethodName+HTTP2Encoding.h
new file mode 100644
index 0000000000..504c669f92
--- /dev/null
+++ b/src/objective-c/GRPCClient/private/GRPCMethodName+HTTP2Encoding.h
@@ -0,0 +1,7 @@
+#import <Foundation/Foundation.h>
+
+#import "GRPCMethodName.h"
+
+@interface GRPCMethodName (HTTP2Encoding)
+- (NSString *)HTTP2Path;
+@end
diff --git a/src/objective-c/GRPCClient/private/GRPCMethodName+HTTP2Encoding.m b/src/objective-c/GRPCClient/private/GRPCMethodName+HTTP2Encoding.m
new file mode 100644
index 0000000000..2e9fe8d60b
--- /dev/null
+++ b/src/objective-c/GRPCClient/private/GRPCMethodName+HTTP2Encoding.m
@@ -0,0 +1,11 @@
+#import "GRPCMethodName+HTTP2Encoding.h"
+
+@implementation GRPCMethodName (HTTP2Encoding)
+- (NSString *)HTTP2Path {
+ if (self.package) {
+ return [NSString stringWithFormat:@"/%@.%@/%@", self.package, self.interface, self.method];
+ } else {
+ return [NSString stringWithFormat:@"/%@/%@", self.interface, self.method];
+ }
+}
+@end
diff --git a/src/objective-c/GRPCClient/private/NSData+GRPC.h b/src/objective-c/GRPCClient/private/NSData+GRPC.h
new file mode 100644
index 0000000000..8cb7b76ebc
--- /dev/null
+++ b/src/objective-c/GRPCClient/private/NSData+GRPC.h
@@ -0,0 +1,8 @@
+#import <Foundation/Foundation.h>
+
+struct grpc_byte_buffer;
+
+@interface NSData (GRPC)
++ (instancetype)grpc_dataWithByteBuffer:(struct grpc_byte_buffer *)buffer;
+- (struct grpc_byte_buffer *)grpc_byteBuffer;
+@end
diff --git a/src/objective-c/GRPCClient/private/NSData+GRPC.m b/src/objective-c/GRPCClient/private/NSData+GRPC.m
new file mode 100644
index 0000000000..47f7a07d7a
--- /dev/null
+++ b/src/objective-c/GRPCClient/private/NSData+GRPC.m
@@ -0,0 +1,53 @@
+#import "NSData+GRPC.h"
+
+#include <byte_buffer.h>
+#include <string.h>
+
+// TODO(jcanizales): Move these two incantations to the C library.
+
+static void CopyByteBufferToCharArray(grpc_byte_buffer *buffer, char *array) {
+ size_t offset = 0;
+ grpc_byte_buffer_reader *reader = grpc_byte_buffer_reader_create(buffer);
+ gpr_slice next;
+ while (grpc_byte_buffer_reader_next(reader, &next) != 0){
+ memcpy(array + offset, GPR_SLICE_START_PTR(next), (size_t) GPR_SLICE_LENGTH(next));
+ offset += GPR_SLICE_LENGTH(next);
+ gpr_slice_unref(next);
+ }
+ grpc_byte_buffer_reader_destroy(reader);
+}
+
+static grpc_byte_buffer *CopyCharArrayToNewByteBuffer(const char *array, size_t length) {
+ gpr_slice slice = gpr_slice_from_copied_buffer(array, length);
+ grpc_byte_buffer *buffer = grpc_byte_buffer_create(&slice, 1);
+ gpr_slice_unref(slice);
+ return buffer;
+}
+
+@implementation NSData (GRPC)
++ (instancetype)grpc_dataWithByteBuffer:(grpc_byte_buffer *)buffer {
+ NSUInteger length = grpc_byte_buffer_length(buffer);
+ char *array = malloc(length * sizeof(*array));
+ if (!array) {
+ // TODO(jcanizales): grpc_byte_buffer is reference-counted, so we can
+ // prevent this memory problem by implementing a subclass of NSData
+ // that wraps the grpc_byte_buffer. Then enumerateByteRangesUsingBlock:
+ // can be implemented using a grpc_byte_buffer_reader.
+ return nil;
+ }
+ CopyByteBufferToCharArray(buffer, array);
+ return [self dataWithBytesNoCopy:array length:length freeWhenDone:YES];
+}
+
+- (grpc_byte_buffer *)grpc_byteBuffer {
+ // Some implementations of NSData, as well as grpc_byte_buffer, support O(1)
+ // appending of byte arrays by not using internally a single contiguous memory
+ // block for representation.
+ // The following implementation is thus not optimal, sometimes requiring two
+ // copies (one by self.bytes and another by gpr_slice_from_copied_buffer).
+ // If it turns out to be an issue, we can use enumerateByteRangesUsingblock:
+ // to create an array of gpr_slice objects to pass to grpc_byte_buffer_create.
+ // That would make it do exactly one copy, always.
+ return CopyCharArrayToNewByteBuffer((const char *)self.bytes, (size_t)self.length);
+}
+@end
diff --git a/src/objective-c/GRPCClient/private/NSDictionary+GRPC.h b/src/objective-c/GRPCClient/private/NSDictionary+GRPC.h
new file mode 100644
index 0000000000..b717b108e4
--- /dev/null
+++ b/src/objective-c/GRPCClient/private/NSDictionary+GRPC.h
@@ -0,0 +1,7 @@
+#import <Foundation/Foundation.h>
+
+struct grpc_metadata;
+
+@interface NSDictionary (GRPC)
++ (instancetype)grpc_dictionaryFromMetadata:(struct grpc_metadata *)entries count:(size_t)count;
+@end
diff --git a/src/objective-c/GRPCClient/private/NSDictionary+GRPC.m b/src/objective-c/GRPCClient/private/NSDictionary+GRPC.m
new file mode 100644
index 0000000000..a24396d3a9
--- /dev/null
+++ b/src/objective-c/GRPCClient/private/NSDictionary+GRPC.m
@@ -0,0 +1,23 @@
+#import "NSDictionary+GRPC.h"
+
+#include <grpc.h>
+
+@implementation NSDictionary (GRPC)
++ (instancetype)grpc_dictionaryFromMetadata:(grpc_metadata *)entries count:(size_t)count {
+ NSMutableDictionary *metadata = [NSMutableDictionary dictionaryWithCapacity:count];
+ for (grpc_metadata *entry = entries; entry < entries + count; entry++) {
+ // TODO(jcanizales): Verify in a C library test that it's converting header names to lower case automatically.
+ NSString *name = [NSString stringWithUTF8String:entry->key];
+ if (!name) {
+ continue;
+ }
+ if (!metadata[name]) {
+ metadata[name] = [NSMutableArray array];
+ }
+ // TODO(jcanizales): Should we use a non-copy constructor?
+ [metadata[name] addObject:[NSData dataWithBytes:entry->value
+ length:entry->value_length]];
+ }
+ return metadata;
+}
+@end
diff --git a/src/objective-c/GRPCClient/private/NSError+GRPC.h b/src/objective-c/GRPCClient/private/NSError+GRPC.h
new file mode 100644
index 0000000000..949d1dd819
--- /dev/null
+++ b/src/objective-c/GRPCClient/private/NSError+GRPC.h
@@ -0,0 +1,41 @@
+#import <Foundation/Foundation.h>
+
+// TODO(jcanizales): Make the domain string public.
+extern NSString *const kGRPCErrorDomain;
+
+// TODO(jcanizales): Make this public and document each code.
+typedef NS_ENUM(NSInteger, GRPCErrorCode) {
+ GRPCErrorCodeCancelled = 1,
+ GRPCErrorCodeUnknown = 2,
+ GRPCErrorCodeInvalidArgument = 3,
+ GRPCErrorCodeDeadlineExceeded = 4,
+ GRPCErrorCodeNotFound = 5,
+ GRPCErrorCodeAlreadyExists = 6,
+ GRPCErrorCodePermissionDenied = 7,
+ GRPCErrorCodeUnauthenticated = 16,
+ GRPCErrorCodeResourceExhausted = 8,
+ GRPCErrorCodeFailedPrecondition = 9,
+ GRPCErrorCodeAborted = 10,
+ GRPCErrorCodeOutOfRange = 11,
+ GRPCErrorCodeUnimplemented = 12,
+ GRPCErrorCodeInternal = 13,
+ GRPCErrorCodeUnavailable = 14,
+ GRPCErrorCodeDataLoss = 15
+};
+
+// TODO(jcanizales): This is conflating trailing metadata with Status details. Fix it once there's
+// a decision on how to codify Status.
+#include <grpc/status.h>
+struct grpc_metadata;
+struct grpc_status {
+ grpc_status_code status;
+ const char *details;
+ size_t metadata_count;
+ struct grpc_metadata *metadata_elements;
+};
+
+@interface NSError (GRPC)
+// Returns nil if the status is OK. Otherwise, a NSError whose code is one of
+// GRPCErrorCode and whose domain is kGRPCErrorDomain.
++ (instancetype)grpc_errorFromStatus:(struct grpc_status *)status;
+@end
diff --git a/src/objective-c/GRPCClient/private/NSError+GRPC.m b/src/objective-c/GRPCClient/private/NSError+GRPC.m
new file mode 100644
index 0000000000..73ce112f15
--- /dev/null
+++ b/src/objective-c/GRPCClient/private/NSError+GRPC.m
@@ -0,0 +1,18 @@
+#import "NSError+GRPC.h"
+
+#include <grpc.h>
+
+NSString *const kGRPCErrorDomain = @"org.grpc";
+
+@implementation NSError (GRPC)
++ (instancetype)grpc_errorFromStatus:(struct grpc_status *)status {
+ if (status->status == GRPC_STATUS_OK) {
+ return nil;
+ }
+ NSString *message =
+ [NSString stringWithFormat:@"Code=%i Message='%s'", status->status, status->details];
+ return [NSError errorWithDomain:kGRPCErrorDomain
+ code:status->status
+ userInfo:@{NSLocalizedDescriptionKey: message}];
+}
+@end