aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-06-22 10:51:35 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-06-22 10:51:35 -0700
commit3de4b47e2b4f0b4b93602e2d066eec43215bdc5b (patch)
treec9f343cf9eebacbf8a238c5a26ac18a96d1dae53 /src
parente9fad15b0bd1c42fcb379e04eb4cbbc53a925d2c (diff)
parent8222b19eb14273df8124cb5f46f629836e72f6ad (diff)
Merge github.com:grpc/grpc into one-shouldnt-depend-on-protobufs
Diffstat (limited to 'src')
-rw-r--r--src/core/surface/call.c34
-rw-r--r--src/core/surface/channel.c9
-rw-r--r--src/core/surface/channel.h1
-rw-r--r--src/core/surface/server.c102
-rw-r--r--src/cpp/server/server.cc18
-rw-r--r--src/objective-c/GRPCClient/private/GRPCWrappedCall.h32
-rw-r--r--src/objective-c/GRPCClient/private/GRPCWrappedCall.m180
7 files changed, 209 insertions, 167 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index cf0a595147..6e2714db0b 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -214,6 +214,9 @@ struct grpc_call {
/* Received call statuses from various sources */
received_status status[STATUS_SOURCE_COUNT];
+ /** Compression level for the call */
+ grpc_compression_level compression_level;
+
/* Contexts for various subsystems (security, tracing, ...). */
grpc_call_context_element context[GRPC_CONTEXT_COUNT];
@@ -402,6 +405,8 @@ void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) {
static void set_status_code(grpc_call *call, status_source source,
gpr_uint32 status) {
+ if (call->status[source].is_set) return;
+
call->status[source].is_set = 1;
call->status[source].code = status;
@@ -410,6 +415,11 @@ static void set_status_code(grpc_call *call, status_source source,
}
}
+static void set_decode_compression_level(grpc_call *call,
+ grpc_compression_level clevel) {
+ call->compression_level = clevel;
+}
+
static void set_status_details(grpc_call *call, status_source source,
grpc_mdstr *status) {
if (call->status[source].details != NULL) {
@@ -1169,6 +1179,28 @@ static gpr_uint32 decode_status(grpc_mdelem *md) {
return status;
}
+/* just as for status above, we need to offset: metadata userdata can't hold a
+ * zero (null), which in this case is used to signal no compression */
+#define COMPRESS_OFFSET 1
+static void destroy_compression(void *ignored) {}
+
+static gpr_uint32 decode_compression(grpc_mdelem *md) {
+ grpc_compression_level clevel;
+ void *user_data = grpc_mdelem_get_user_data(md, destroy_status);
+ if (user_data) {
+ clevel = ((grpc_compression_level)(gpr_intptr)user_data) - COMPRESS_OFFSET;
+ } else {
+ if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
+ GPR_SLICE_LENGTH(md->value->slice),
+ &clevel)) {
+ clevel = GRPC_COMPRESS_LEVEL_NONE; /* could not parse, no compression */
+ }
+ grpc_mdelem_set_user_data(md, destroy_compression,
+ (void *)(gpr_intptr)(clevel + COMPRESS_OFFSET));
+ }
+ return clevel;
+}
+
static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
grpc_linked_mdelem *l;
grpc_metadata_array *dest;
@@ -1184,6 +1216,8 @@ static void recv_metadata(grpc_call *call, grpc_metadata_batch *md) {
set_status_code(call, STATUS_FROM_WIRE, decode_status(md));
} else if (key == grpc_channel_get_message_string(call->channel)) {
set_status_details(call, STATUS_FROM_WIRE, grpc_mdstr_ref(md->value));
+ } else if (key == grpc_channel_get_compresssion_level_string(call->channel)) {
+ set_decode_compression_level(call, decode_compression(md));
} else {
dest = &call->buffered_metadata[is_trailing];
if (dest->count == dest->capacity) {
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 9ecdd374a9..a3c4dcebc1 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -64,6 +64,7 @@ struct grpc_channel {
grpc_mdctx *metadata_context;
/** mdstr for the grpc-status key */
grpc_mdstr *grpc_status_string;
+ grpc_mdstr *grpc_compression_level_string;
grpc_mdstr *grpc_message_string;
grpc_mdstr *path_string;
grpc_mdstr *authority_string;
@@ -98,6 +99,8 @@ grpc_channel *grpc_channel_create_from_filters(
gpr_ref_init(&channel->refs, 1 + is_client);
channel->metadata_context = mdctx;
channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status");
+ channel->grpc_compression_level_string =
+ grpc_mdstr_from_string(mdctx, "grpc-compression-level");
channel->grpc_message_string = grpc_mdstr_from_string(mdctx, "grpc-message");
for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) {
char buf[GPR_LTOA_MIN_BUFSIZE];
@@ -205,6 +208,7 @@ static void destroy_channel(void *p, int ok) {
grpc_mdelem_unref(channel->grpc_status_elem[i]);
}
grpc_mdstr_unref(channel->grpc_status_string);
+ grpc_mdstr_unref(channel->grpc_compression_level_string);
grpc_mdstr_unref(channel->grpc_message_string);
grpc_mdstr_unref(channel->path_string);
grpc_mdstr_unref(channel->authority_string);
@@ -269,6 +273,11 @@ grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel) {
return channel->grpc_status_string;
}
+grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel) {
+ return channel->grpc_compression_level_string;
+}
+
+
grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel, int i) {
if (i >= 0 && i < NUM_CACHED_STATUS_ELEMS) {
return grpc_mdelem_ref(channel->grpc_status_elem[i]);
diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h
index ba3c0abac9..3c04676b43 100644
--- a/src/core/surface/channel.h
+++ b/src/core/surface/channel.h
@@ -53,6 +53,7 @@ grpc_mdctx *grpc_channel_get_metadata_context(grpc_channel *channel);
grpc_mdelem *grpc_channel_get_reffed_status_elem(grpc_channel *channel,
int status_code);
grpc_mdstr *grpc_channel_get_status_string(grpc_channel *channel);
+grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel);
grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel);
gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel);
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index c8ac559a0d..546b17c1ff 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -141,7 +141,15 @@ struct grpc_server {
grpc_pollset **pollsets;
size_t cq_count;
- gpr_mu mu;
+ /* The two following mutexes control access to server-state
+ mu_global controls access to non-call-related state (e.g., channel state)
+ mu_call controls access to call-related state (e.g., the call lists)
+
+ If they are ever required to be nested, you must lock mu_global
+ before mu_call. This is currently used in shutdown processing
+ (grpc_server_shutdown_and_notify and maybe_finish_shutdown) */
+ gpr_mu mu_global; /* mutex for server and channel state */
+ gpr_mu mu_call; /* mutex for call-specific state */
registered_method *registered_methods;
requested_call_array requested_calls;
@@ -200,6 +208,8 @@ static void begin_call(grpc_server *server, call_data *calld,
static void fail_call(grpc_server *server, requested_call *rc);
static void shutdown_channel(channel_data *chand, int send_goaway,
int send_disconnect);
+/* Before calling maybe_finish_shutdown, we must hold mu_global and not
+ hold mu_call */
static void maybe_finish_shutdown(grpc_server *server);
static int call_list_join(call_data **root, call_data *call, call_list list) {
@@ -273,7 +283,8 @@ static void server_delete(grpc_server *server) {
registered_method *rm;
size_t i;
grpc_channel_args_destroy(server->channel_args);
- gpr_mu_destroy(&server->mu);
+ gpr_mu_destroy(&server->mu_global);
+ gpr_mu_destroy(&server->mu_call);
gpr_free(server->channel_filters);
requested_call_array_destroy(&server->requested_calls);
while ((rm = server->registered_methods) != NULL) {
@@ -335,11 +346,11 @@ static void finish_start_new_rpc_and_unlock(grpc_server *server,
if (array->count == 0) {
calld->state = PENDING;
call_list_join(pending_root, calld, PENDING_START);
- gpr_mu_unlock(&server->mu);
+ gpr_mu_unlock(&server->mu_call);
} else {
rc = array->calls[--array->count];
calld->state = ACTIVATED;
- gpr_mu_unlock(&server->mu);
+ gpr_mu_unlock(&server->mu_call);
begin_call(server, calld, &rc);
}
}
@@ -352,7 +363,7 @@ static void start_new_rpc(grpc_call_element *elem) {
gpr_uint32 hash;
channel_registered_method *rm;
- gpr_mu_lock(&server->mu);
+ gpr_mu_lock(&server->mu_call);
if (chand->registered_methods && calld->path && calld->host) {
/* TODO(ctiller): unify these two searches */
/* check for an exact match with host */
@@ -404,11 +415,16 @@ static void maybe_finish_shutdown(grpc_server *server) {
if (!server->shutdown || server->shutdown_published) {
return;
}
+
+ gpr_mu_lock(&server->mu_call);
if (server->lists[ALL_CALLS] != NULL) {
gpr_log(GPR_DEBUG,
"Waiting for all calls to finish before destroying server");
+ gpr_mu_unlock(&server->mu_call);
return;
}
+ gpr_mu_unlock(&server->mu_call);
+
if (server->root_channel_data.next != &server->root_channel_data) {
gpr_log(GPR_DEBUG,
"Waiting for all channels to close before destroying server");
@@ -452,6 +468,7 @@ static void server_on_recv(void *ptr, int success) {
grpc_call_element *elem = ptr;
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
+ int remove_res;
if (success && !calld->got_initial_metadata) {
size_t i;
@@ -476,16 +493,16 @@ static void server_on_recv(void *ptr, int success) {
case GRPC_STREAM_SEND_CLOSED:
break;
case GRPC_STREAM_RECV_CLOSED:
- gpr_mu_lock(&chand->server->mu);
+ gpr_mu_lock(&chand->server->mu_call);
if (calld->state == NOT_STARTED) {
calld->state = ZOMBIED;
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_iomgr_add_callback(&calld->kill_zombie_closure);
}
- gpr_mu_unlock(&chand->server->mu);
+ gpr_mu_unlock(&chand->server->mu_call);
break;
case GRPC_STREAM_CLOSED:
- gpr_mu_lock(&chand->server->mu);
+ gpr_mu_lock(&chand->server->mu_call);
if (calld->state == NOT_STARTED) {
calld->state = ZOMBIED;
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
@@ -496,10 +513,13 @@ static void server_on_recv(void *ptr, int success) {
grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_iomgr_add_callback(&calld->kill_zombie_closure);
}
- if (call_list_remove(calld, ALL_CALLS)) {
+ remove_res = call_list_remove(calld, ALL_CALLS);
+ gpr_mu_unlock(&chand->server->mu_call);
+ gpr_mu_lock(&chand->server->mu_global);
+ if (remove_res) {
decrement_call_count(chand);
}
- gpr_mu_unlock(&chand->server->mu);
+ gpr_mu_unlock(&chand->server->mu_global);
break;
}
@@ -542,10 +562,10 @@ static void channel_op(grpc_channel_element *elem,
case GRPC_TRANSPORT_CLOSED:
/* if the transport is closed for a server channel, we destroy the
channel */
- gpr_mu_lock(&server->mu);
+ gpr_mu_lock(&server->mu_global);
server_ref(server);
destroy_channel(chand);
- gpr_mu_unlock(&server->mu);
+ gpr_mu_unlock(&server->mu_global);
server_unref(server);
break;
case GRPC_TRANSPORT_GOAWAY:
@@ -612,10 +632,13 @@ static void init_call_elem(grpc_call_element *elem,
calld->deadline = gpr_inf_future;
calld->call = grpc_call_from_top_element(elem);
- gpr_mu_lock(&chand->server->mu);
+ gpr_mu_lock(&chand->server->mu_call);
call_list_join(&chand->server->lists[ALL_CALLS], calld, ALL_CALLS);
+ gpr_mu_unlock(&chand->server->mu_call);
+
+ gpr_mu_lock(&chand->server->mu_global);
chand->num_calls++;
- gpr_mu_unlock(&chand->server->mu);
+ gpr_mu_unlock(&chand->server->mu_global);
server_ref(chand->server);
@@ -628,14 +651,16 @@ static void destroy_call_elem(grpc_call_element *elem) {
int removed[CALL_LIST_COUNT];
size_t i;
- gpr_mu_lock(&chand->server->mu);
+ gpr_mu_lock(&chand->server->mu_call);
for (i = 0; i < CALL_LIST_COUNT; i++) {
removed[i] = call_list_remove(elem->call_data, i);
}
+ gpr_mu_unlock(&chand->server->mu_call);
if (removed[ALL_CALLS]) {
+ gpr_mu_lock(&chand->server->mu_global);
decrement_call_count(chand);
+ gpr_mu_unlock(&chand->server->mu_global);
}
- gpr_mu_unlock(&chand->server->mu);
if (calld->host) {
grpc_mdstr_unref(calld->host);
@@ -678,12 +703,12 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
gpr_free(chand->registered_methods);
}
if (chand->server) {
- gpr_mu_lock(&chand->server->mu);
+ gpr_mu_lock(&chand->server->mu_global);
chand->next->prev = chand->prev;
chand->prev->next = chand->next;
chand->next = chand->prev = chand;
maybe_finish_shutdown(chand->server);
- gpr_mu_unlock(&chand->server->mu);
+ gpr_mu_unlock(&chand->server->mu_global);
grpc_mdstr_unref(chand->path_key);
grpc_mdstr_unref(chand->authority_key);
server_unref(chand->server);
@@ -730,7 +755,8 @@ grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters,
memset(server, 0, sizeof(grpc_server));
- gpr_mu_init(&server->mu);
+ gpr_mu_init(&server->mu_global);
+ gpr_mu_init(&server->mu_call);
/* decremented by grpc_server_destroy */
gpr_ref_init(&server->internal_refcount, 1);
@@ -880,11 +906,11 @@ grpc_transport_setup_result grpc_server_setup_transport(
result = grpc_connected_channel_bind_transport(
grpc_channel_get_channel_stack(channel), transport);
- gpr_mu_lock(&s->mu);
+ gpr_mu_lock(&s->mu_global);
chand->next = &s->root_channel_data;
chand->prev = chand->next->prev;
chand->next->prev = chand->prev->next = chand;
- gpr_mu_unlock(&s->mu);
+ gpr_mu_unlock(&s->mu_global);
gpr_free(filters);
@@ -901,7 +927,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
shutdown_tag *sdt;
/* lock, and gather up some stuff to do */
- gpr_mu_lock(&server->mu);
+ gpr_mu_lock(&server->mu_global);
grpc_cq_begin_op(cq, NULL);
server->shutdown_tags =
gpr_realloc(server->shutdown_tags,
@@ -910,7 +936,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
sdt->tag = tag;
sdt->cq = cq;
if (server->shutdown) {
- gpr_mu_unlock(&server->mu);
+ gpr_mu_unlock(&server->mu_global);
return;
}
@@ -920,6 +946,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
}
/* collect all unregistered then registered calls */
+ gpr_mu_lock(&server->mu_call);
requested_calls = server->requested_calls;
memset(&server->requested_calls, 0, sizeof(server->requested_calls));
for (rm = server->registered_methods; rm; rm = rm->next) {
@@ -938,10 +965,11 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
gpr_free(rm->requested.calls);
memset(&rm->requested, 0, sizeof(rm->requested));
}
+ gpr_mu_unlock(&server->mu_call);
server->shutdown = 1;
maybe_finish_shutdown(server);
- gpr_mu_unlock(&server->mu);
+ gpr_mu_unlock(&server->mu_global);
/* terminate all the requested calls */
for (i = 0; i < requested_calls.count; i++) {
@@ -957,10 +985,10 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
void grpc_server_listener_destroy_done(void *s) {
grpc_server *server = s;
- gpr_mu_lock(&server->mu);
+ gpr_mu_lock(&server->mu_global);
server->listeners_destroyed++;
maybe_finish_shutdown(server);
- gpr_mu_unlock(&server->mu);
+ gpr_mu_unlock(&server->mu_global);
}
void grpc_server_cancel_all_calls(grpc_server *server) {
@@ -971,12 +999,12 @@ void grpc_server_cancel_all_calls(grpc_server *server) {
int is_first = 1;
size_t i;
- gpr_mu_lock(&server->mu);
+ gpr_mu_lock(&server->mu_call);
GPR_ASSERT(server->shutdown);
if (!server->lists[ALL_CALLS]) {
- gpr_mu_unlock(&server->mu);
+ gpr_mu_unlock(&server->mu_call);
return;
}
@@ -996,7 +1024,7 @@ void grpc_server_cancel_all_calls(grpc_server *server) {
is_first = 0;
}
- gpr_mu_unlock(&server->mu);
+ gpr_mu_unlock(&server->mu_call);
for (i = 0; i < call_count; i++) {
grpc_call_cancel_with_status(calls[i], GRPC_STATUS_UNAVAILABLE,
@@ -1010,7 +1038,7 @@ void grpc_server_cancel_all_calls(grpc_server *server) {
void grpc_server_destroy(grpc_server *server) {
listener *l;
- gpr_mu_lock(&server->mu);
+ gpr_mu_lock(&server->mu_global);
GPR_ASSERT(server->shutdown || !server->listeners);
GPR_ASSERT(server->listeners_destroyed == num_listeners(server));
@@ -1020,7 +1048,7 @@ void grpc_server_destroy(grpc_server *server) {
gpr_free(l);
}
- gpr_mu_unlock(&server->mu);
+ gpr_mu_unlock(&server->mu_global);
server_unref(server);
}
@@ -1042,9 +1070,9 @@ static grpc_call_error queue_call_request(grpc_server *server,
requested_call *rc) {
call_data *calld = NULL;
requested_call_array *requested_calls = NULL;
- gpr_mu_lock(&server->mu);
+ gpr_mu_lock(&server->mu_call);
if (server->shutdown) {
- gpr_mu_unlock(&server->mu);
+ gpr_mu_unlock(&server->mu_call);
fail_call(server, rc);
return GRPC_CALL_OK;
}
@@ -1063,12 +1091,12 @@ static grpc_call_error queue_call_request(grpc_server *server,
if (calld) {
GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED;
- gpr_mu_unlock(&server->mu);
+ gpr_mu_unlock(&server->mu_call);
begin_call(server, calld, rc);
return GRPC_CALL_OK;
} else {
*requested_call_array_add(requested_calls) = *rc;
- gpr_mu_unlock(&server->mu);
+ gpr_mu_unlock(&server->mu_call);
return GRPC_CALL_OK;
}
}
@@ -1212,8 +1240,8 @@ const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
int grpc_server_has_open_connections(grpc_server *server) {
int r;
- gpr_mu_lock(&server->mu);
+ gpr_mu_lock(&server->mu_global);
r = server->root_channel_data.next != &server->root_channel_data;
- gpr_mu_unlock(&server->mu);
+ gpr_mu_unlock(&server->mu_global);
return r;
}
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index ed1bb2a362..31b6a0ee00 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -67,7 +67,8 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
in_flight_(false),
has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
method->method_type() ==
- RpcMethod::SERVER_STREAMING) {
+ RpcMethod::SERVER_STREAMING),
+ cq_(nullptr) {
grpc_metadata_array_init(&request_metadata_);
}
@@ -84,10 +85,16 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
return mrd;
}
+ void SetupRequest() { cq_ = grpc_completion_queue_create(); }
+
+ void TeardownRequest() {
+ grpc_completion_queue_destroy(cq_);
+ cq_ = nullptr;
+ }
+
void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
- GPR_ASSERT(!in_flight_);
+ GPR_ASSERT(cq_ && !in_flight_);
in_flight_ = true;
- cq_ = grpc_completion_queue_create();
GPR_ASSERT(GRPC_CALL_OK ==
grpc_server_request_registered_call(
server, tag_, &call_, &deadline_, &request_metadata_,
@@ -254,6 +261,7 @@ bool Server::Start() {
// Start processing rpcs.
if (!sync_methods_->empty()) {
for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) {
+ m->SetupRequest();
m->Request(server_, cq_.cq());
}
@@ -384,9 +392,13 @@ void Server::RunRpc() {
if (ok) {
SyncRequest::CallData cd(this, mrd);
{
+ mrd->SetupRequest();
grpc::unique_lock<grpc::mutex> lock(mu_);
if (!shutdown_) {
mrd->Request(server_, cq_.cq());
+ } else {
+ // destroy the structure that was created
+ mrd->TeardownRequest();
}
}
cd.Run();
diff --git a/src/objective-c/GRPCClient/private/GRPCWrappedCall.h b/src/objective-c/GRPCClient/private/GRPCWrappedCall.h
index 4deeec0475..c08aefc6a8 100644
--- a/src/objective-c/GRPCClient/private/GRPCWrappedCall.h
+++ b/src/objective-c/GRPCClient/private/GRPCWrappedCall.h
@@ -33,53 +33,51 @@
#import <Foundation/Foundation.h>
#include <grpc/grpc.h>
-#import "GRPCChannel.h"
-
-typedef void(^GRPCCompletionHandler)(NSDictionary *);
-
-@protocol GRPCOp <NSObject>
-- (void)getOp:(grpc_op *)op;
+#import "GRPCChannel.h"
+@interface GRPCOperation : NSObject
+@property(nonatomic, readonly) grpc_op op;
+// Guaranteed to be called when the operation has finished.
- (void)finish;
-
@end
-@interface GRPCOpSendMetadata : NSObject <GRPCOp>
+@interface GRPCOpSendMetadata : GRPCOperation
- (instancetype)initWithMetadata:(NSDictionary *)metadata
- handler:(void(^)(void))handler NS_DESIGNATED_INITIALIZER;
+ handler:(void(^)())handler NS_DESIGNATED_INITIALIZER;
@end
-@interface GRPCOpSendMessage : NSObject <GRPCOp>
+@interface GRPCOpSendMessage : GRPCOperation
- (instancetype)initWithMessage:(NSData *)message
- handler:(void(^)(void))handler NS_DESIGNATED_INITIALIZER;
+ handler:(void(^)())handler NS_DESIGNATED_INITIALIZER;
@end
-@interface GRPCOpSendClose : NSObject <GRPCOp>
+@interface GRPCOpSendClose : GRPCOperation
-- (instancetype)initWithHandler:(void(^)(void))handler NS_DESIGNATED_INITIALIZER;
+- (instancetype)initWithHandler:(void(^)())handler NS_DESIGNATED_INITIALIZER;
@end
-@interface GRPCOpRecvMetadata : NSObject <GRPCOp>
+@interface GRPCOpRecvMetadata : GRPCOperation
- (instancetype)initWithHandler:(void(^)(NSDictionary *))handler NS_DESIGNATED_INITIALIZER;
@end
-@interface GRPCOpRecvMessage : NSObject <GRPCOp>
+@interface GRPCOpRecvMessage : GRPCOperation
- (instancetype)initWithHandler:(void(^)(grpc_byte_buffer *))handler NS_DESIGNATED_INITIALIZER;
@end
-@interface GRPCOpRecvStatus : NSObject <GRPCOp>
+@interface GRPCOpRecvStatus : GRPCOperation
-- (instancetype)initWithHandler:(void(^)(NSError *, NSDictionary *))handler NS_DESIGNATED_INITIALIZER;
+- (instancetype)initWithHandler:(void(^)(NSError *, NSDictionary *))handler
+ NS_DESIGNATED_INITIALIZER;
@end
diff --git a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m
index ea482b29ef..4ccd5723c6 100644
--- a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m
+++ b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m
@@ -41,110 +41,85 @@
#import "NSData+GRPC.h"
#import "NSError+GRPC.h"
-@implementation GRPCOpSendMetadata{
- void(^_handler)(void);
- grpc_metadata *_sendMetadata;
- size_t _count;
+@implementation GRPCOperation {
+@protected
+ // Most operation subclasses don't set any flags in the grpc_op, and rely on the flag member being
+ // initialized to zero.
+ grpc_op _op;
+ void(^_handler)();
}
+- (void)finish {
+ if (_handler) {
+ _handler();
+ }
+}
+@end
+
+@implementation GRPCOpSendMetadata
+
- (instancetype)init {
return [self initWithMetadata:nil handler:nil];
}
-- (instancetype)initWithMetadata:(NSDictionary *)metadata handler:(void (^)(void))handler {
+- (instancetype)initWithMetadata:(NSDictionary *)metadata handler:(void (^)())handler {
if (self = [super init]) {
- _sendMetadata = [metadata grpc_metadataArray];
- _count = metadata.count;
+ _op.op = GRPC_OP_SEND_INITIAL_METADATA;
+ _op.data.send_initial_metadata.count = metadata.count;
+ _op.data.send_initial_metadata.metadata = metadata.grpc_metadataArray;
_handler = handler;
}
return self;
}
-- (void)getOp:(grpc_op *)op {
- op->op = GRPC_OP_SEND_INITIAL_METADATA;
- op->data.send_initial_metadata.count = _count;
- op->data.send_initial_metadata.metadata = _sendMetadata;
-}
-
-- (void)finish {
- if (_handler) {
- _handler();
- }
-}
-
- (void)dealloc {
- gpr_free(_sendMetadata);
+ gpr_free(_op.data.send_initial_metadata.metadata);
}
@end
-@implementation GRPCOpSendMessage{
- void(^_handler)(void);
- grpc_byte_buffer *_byteBuffer;
-}
+@implementation GRPCOpSendMessage
- (instancetype)init {
return [self initWithMessage:nil handler:nil];
}
-- (instancetype)initWithMessage:(NSData *)message handler:(void (^)(void))handler {
+- (instancetype)initWithMessage:(NSData *)message handler:(void (^)())handler {
if (!message) {
[NSException raise:NSInvalidArgumentException format:@"message cannot be nil"];
}
if (self = [super init]) {
- _byteBuffer = [message grpc_byteBuffer];
+ _op.op = GRPC_OP_SEND_MESSAGE;
+ _op.data.send_message = message.grpc_byteBuffer;
_handler = handler;
}
return self;
}
-- (void)getOp:(grpc_op *)op {
- op->op = GRPC_OP_SEND_MESSAGE;
- op->data.send_message = _byteBuffer;
-}
-
-- (void)finish {
- if (_handler) {
- _handler();
- }
-}
-
- (void)dealloc {
- gpr_free(_byteBuffer);
+ gpr_free(_op.data.send_message);
}
@end
-@implementation GRPCOpSendClose{
- void(^_handler)(void);
-}
+@implementation GRPCOpSendClose
- (instancetype)init {
return [self initWithHandler:nil];
}
-- (instancetype)initWithHandler:(void (^)(void))handler {
+- (instancetype)initWithHandler:(void (^)())handler {
if (self = [super init]) {
+ _op.op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
_handler = handler;
}
return self;
}
-- (void)getOp:(grpc_op *)op {
- op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
-}
-
-- (void)finish {
- if (_handler) {
- _handler();
- }
-}
-
@end
-@implementation GRPCOpRecvMetadata{
- void(^_handler)(NSDictionary *);
- grpc_metadata_array _recvInitialMetadata;
+@implementation GRPCOpRecvMetadata {
+ grpc_metadata_array _headers;
}
- (instancetype) init {
@@ -153,33 +128,27 @@
- (instancetype) initWithHandler:(void (^)(NSDictionary *))handler {
if (self = [super init]) {
- _handler = handler;
- grpc_metadata_array_init(&_recvInitialMetadata);
+ _op.op = GRPC_OP_RECV_INITIAL_METADATA;
+ grpc_metadata_array_init(&_headers);
+ _op.data.recv_initial_metadata = &_headers;
+ if (handler) {
+ _handler = ^{
+ NSDictionary *metadata = [NSDictionary grpc_dictionaryFromMetadataArray:_headers];
+ handler(metadata);
+ };
+ }
}
return self;
}
-- (void)getOp:(grpc_op *)op {
- op->op = GRPC_OP_RECV_INITIAL_METADATA;
- op->data.recv_initial_metadata = &_recvInitialMetadata;
-}
-
-- (void)finish {
- NSDictionary *metadata = [NSDictionary grpc_dictionaryFromMetadataArray:_recvInitialMetadata];
- if (_handler) {
- _handler(metadata);
- }
-}
-
- (void)dealloc {
- grpc_metadata_array_destroy(&_recvInitialMetadata);
+ grpc_metadata_array_destroy(&_headers);
}
@end
@implementation GRPCOpRecvMessage{
- void(^_handler)(grpc_byte_buffer *);
- grpc_byte_buffer *_recvMessage;
+ grpc_byte_buffer *_receivedMessage;
}
- (instancetype)init {
@@ -188,30 +157,24 @@
- (instancetype)initWithHandler:(void (^)(grpc_byte_buffer *))handler {
if (self = [super init]) {
- _handler = handler;
+ _op.op = GRPC_OP_RECV_MESSAGE;
+ _op.data.recv_message = &_receivedMessage;
+ if (handler) {
+ _handler = ^{
+ handler(_receivedMessage);
+ };
+ }
}
return self;
}
-- (void)getOp:(grpc_op *)op {
- op->op = GRPC_OP_RECV_MESSAGE;
- op->data.recv_message = &_recvMessage;
-}
-
-- (void)finish {
- if (_handler) {
- _handler(_recvMessage);
- }
-}
-
@end
@implementation GRPCOpRecvStatus{
- void(^_handler)(NSError *, NSDictionary *);
grpc_status_code _statusCode;
char *_details;
size_t _detailsCapacity;
- grpc_metadata_array _metadata;
+ grpc_metadata_array _trailers;
}
- (instancetype) init {
@@ -220,30 +183,25 @@
- (instancetype) initWithHandler:(void (^)(NSError *, NSDictionary *))handler {
if (self = [super init]) {
- _handler = handler;
- grpc_metadata_array_init(&_metadata);
+ _op.op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+ _op.data.recv_status_on_client.status = &_statusCode;
+ _op.data.recv_status_on_client.status_details = &_details;
+ _op.data.recv_status_on_client.status_details_capacity = &_detailsCapacity;
+ grpc_metadata_array_init(&_trailers);
+ _op.data.recv_status_on_client.trailing_metadata = &_trailers;
+ if (handler) {
+ _handler = ^{
+ NSError *error = [NSError grpc_errorFromStatusCode:_statusCode details:_details];
+ NSDictionary *trailers = [NSDictionary grpc_dictionaryFromMetadataArray:_trailers];
+ handler(error, trailers);
+ };
+ }
}
return self;
}
-- (void)getOp:(grpc_op *)op {
- op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
- op->data.recv_status_on_client.status = &_statusCode;
- op->data.recv_status_on_client.status_details = &_details;
- op->data.recv_status_on_client.status_details_capacity = &_detailsCapacity;
- op->data.recv_status_on_client.trailing_metadata = &_metadata;
-}
-
-- (void)finish {
- if (_handler) {
- NSError *error = [NSError grpc_errorFromStatusCode:_statusCode details:_details];
- NSDictionary *trailers = [NSDictionary grpc_dictionaryFromMetadataArray:_metadata];
- _handler(error, trailers);
- }
-}
-
- (void)dealloc {
- grpc_metadata_array_destroy(&_metadata);
+ grpc_metadata_array_destroy(&_trailers);
gpr_free(_details);
}
@@ -293,8 +251,8 @@
size_t nops = operations.count;
grpc_op *ops_array = gpr_malloc(nops * sizeof(grpc_op));
size_t i = 0;
- for (id op in operations) {
- [op getOp:&ops_array[i++]];
+ for (GRPCOperation *operation in operations) {
+ ops_array[i++] = operation.op;
}
grpc_call_error error = grpc_call_start_batch(_call, ops_array, nops,
(__bridge_retained void *)(^(bool success){
@@ -305,14 +263,16 @@
return;
}
}
- for (id<GRPCOp> operation in operations) {
+ for (GRPCOperation *operation in operations) {
[operation finish];
}
}));
-
+ gpr_free(ops_array);
+
if (error != GRPC_CALL_OK) {
[NSException raise:NSInternalInconsistencyException
- format:@"A precondition for calling grpc_call_start_batch wasn't met"];
+ format:@"A precondition for calling grpc_call_start_batch wasn't met. Error %i",
+ error];
}
}