aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-01-28 13:53:40 -0800
committerGravatar Craig Tiller <ctiller@google.com>2016-01-28 13:53:40 -0800
commit6c39686dfa4307bb38ca0d3af78f7a72413d0198 (patch)
treea4e3eecadab8e9c60b31f8dd76ee9b647f2164b9 /src/core/surface
parentddd91dbb42f6f52655738c6520548d04ea5f8468 (diff)
Preparatory changes for work shedding
- cleanup: change grpc_iomgr_cb_func to take a bool instead of int success - cleanup: follow through with iomgr callback scheduling functions - prepare: add a workqueue to offload to to grpc_exec_ctx_enqueue* functions
Diffstat (limited to 'src/core/surface')
-rw-r--r--src/core/surface/call.c22
-rw-r--r--src/core/surface/channel.c4
-rw-r--r--src/core/surface/channel_connectivity.c4
-rw-r--r--src/core/surface/channel_create.c4
-rw-r--r--src/core/surface/channel_ping.c2
-rw-r--r--src/core/surface/completion_queue.c4
-rw-r--r--src/core/surface/lame_client.c4
-rw-r--r--src/core/surface/secure_channel_create.c4
-rw-r--r--src/core/surface/server.c30
-rw-r--r--src/core/surface/server_chttp2.c2
10 files changed, 41 insertions, 39 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 880666bb38..9495e748b5 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -229,9 +229,9 @@ static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
grpc_status_code status,
const char *description);
static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack,
- int success);
+ bool success);
static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
- int success);
+ bool success);
grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
uint32_t propagation_mask,
@@ -351,7 +351,7 @@ void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) {
GRPC_CALL_STACK_UNREF(exec_ctx, CALL_STACK_FROM_CALL(c), REF_REASON);
}
-static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, int success) {
+static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, bool success) {
size_t i;
int ii;
grpc_call *c = call;
@@ -688,13 +688,13 @@ typedef struct cancel_closure {
grpc_status_code status;
} cancel_closure;
-static void done_cancel(grpc_exec_ctx *exec_ctx, void *ccp, int success) {
+static void done_cancel(grpc_exec_ctx *exec_ctx, void *ccp, bool success) {
cancel_closure *cc = ccp;
GRPC_CALL_INTERNAL_UNREF(exec_ctx, cc->call, "cancel");
gpr_free(cc);
}
-static void send_cancel(grpc_exec_ctx *exec_ctx, void *ccp, int success) {
+static void send_cancel(grpc_exec_ctx *exec_ctx, void *ccp, bool success) {
grpc_transport_stream_op op;
cancel_closure *cc = ccp;
memset(&op, 0, sizeof(op));
@@ -721,7 +721,7 @@ static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
cc->call = c;
cc->status = status;
GRPC_CALL_INTERNAL_REF(c, "cancel");
- grpc_exec_ctx_enqueue(exec_ctx, &cc->closure, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, &cc->closure, true, NULL);
return GRPC_CALL_OK;
}
@@ -757,7 +757,7 @@ grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
return CALL_FROM_TOP_ELEM(elem);
}
-static void call_alarm(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+static void call_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
grpc_call *call = arg;
gpr_mu_lock(&call->mu);
call->have_alarm = 0;
@@ -934,7 +934,7 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
batch_control *bctl) {
grpc_call *call = bctl->call;
if (bctl->is_notify_tag_closure) {
- grpc_exec_ctx_enqueue(exec_ctx, bctl->notify_tag, bctl->success);
+ grpc_exec_ctx_enqueue(exec_ctx, bctl->notify_tag, bctl->success, NULL);
gpr_mu_lock(&call->mu);
bctl->call->used_batches =
(uint8_t)(bctl->call->used_batches &
@@ -974,7 +974,7 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx,
}
static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
- int success) {
+ bool success) {
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
@@ -993,7 +993,7 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
}
}
-static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, int success) {
+static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) {
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
grpc_call *child_call;
@@ -1066,7 +1066,7 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, int success) {
}
static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
- int success) {
+ bool success) {
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 001692376f..ffbc2ca971 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -80,7 +80,7 @@ struct grpc_channel {
/* the protobuf library will (by default) start warning at 100megs */
#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024)
-static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, int success);
+static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, bool success);
grpc_channel *grpc_channel_create_from_filters(
grpc_exec_ctx *exec_ctx, const char *target,
@@ -268,7 +268,7 @@ void grpc_channel_internal_unref(grpc_exec_ctx *exec_ctx,
}
static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
+ bool iomgr_success) {
grpc_channel *channel = arg;
grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CHANNEL(channel));
while (channel->registered_calls) {
diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c
index 10f5c4da4d..e2973bba6e 100644
--- a/src/core/surface/channel_connectivity.c
+++ b/src/core/surface/channel_connectivity.c
@@ -165,11 +165,11 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w,
}
}
-static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw, int success) {
+static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw, bool success) {
partly_done(exec_ctx, pw, 1);
}
-static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw, int success) {
+static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw, bool success) {
partly_done(exec_ctx, pw, 0);
}
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index 49083f0870..b3ee203ddc 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -80,11 +80,11 @@ static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) {
}
static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
- int success) {
+ bool success) {
connector_unref(exec_ctx, arg);
}
-static void connected(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+static void connected(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
connector *c = arg;
grpc_closure *notify;
grpc_endpoint *tcp = c->tcp;
diff --git a/src/core/surface/channel_ping.c b/src/core/surface/channel_ping.c
index b4ce282787..4a63f29c6a 100644
--- a/src/core/surface/channel_ping.c
+++ b/src/core/surface/channel_ping.c
@@ -53,7 +53,7 @@ static void ping_destroy(grpc_exec_ctx *exec_ctx, void *arg,
gpr_free(arg);
}
-static void ping_done(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+static void ping_done(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
ping_result *pr = arg;
grpc_cq_end_op(exec_ctx, pr->cq, pr->tag, success, ping_destroy, pr,
&pr->completion_storage);
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index c0db9c508a..5ac02f5b94 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -86,7 +86,7 @@ static gpr_mu g_freelist_mu;
grpc_completion_queue *g_freelist;
static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc,
- int success);
+ bool success);
void grpc_cq_global_init(void) { gpr_mu_init(&g_freelist_mu); }
@@ -169,7 +169,7 @@ void grpc_cq_internal_ref(grpc_completion_queue *cc) {
}
static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg,
- int success) {
+ bool success) {
grpc_completion_queue *cc = arg;
GRPC_CQ_INTERNAL_UNREF(cc, "pollset_destroy");
}
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index a60e9d20da..b6d468ed26 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -78,8 +78,8 @@ static void lame_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
} else if (op->recv_trailing_metadata != NULL) {
fill_metadata(elem, op->recv_trailing_metadata);
}
- grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, 0);
- grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, false, NULL);
+ grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, false, NULL);
}
static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index 552a570713..0a7d51d97e 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -128,14 +128,14 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
}
static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
- int success) {
+ bool success) {
connector *c = arg;
grpc_security_connector_do_handshake(exec_ctx, &c->security_connector->base,
c->connecting_endpoint,
on_secure_handshake_done, c);
}
-static void connected(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+static void connected(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
connector *c = arg;
grpc_closure *notify;
grpc_endpoint *tcp = c->newly_connecting_endpoint;
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 0928f1e045..42cffccb4c 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -260,7 +260,7 @@ struct shutdown_cleanup_args {
};
static void shutdown_cleanup(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_status_ignored) {
+ bool iomgr_status_ignored) {
struct shutdown_cleanup_args *a = arg;
gpr_slice_unref(a->slice);
gpr_free(a);
@@ -313,7 +313,7 @@ static void request_matcher_destroy(request_matcher *rm) {
gpr_stack_lockfree_destroy(rm->requests);
}
-static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, int success) {
+static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, bool success) {
grpc_call_destroy(grpc_call_from_top_element(elem));
}
@@ -328,7 +328,7 @@ static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx,
grpc_closure_init(
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
- grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL);
}
}
@@ -392,7 +392,7 @@ static void orphan_channel(channel_data *chand) {
}
static void finish_destroy_channel(grpc_exec_ctx *exec_ctx, void *cd,
- int success) {
+ bool success) {
channel_data *chand = cd;
grpc_server *server = chand->server;
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->channel, "server");
@@ -407,7 +407,8 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand) {
maybe_finish_shutdown(exec_ctx, chand->server);
chand->finish_destroy_channel_closure.cb = finish_destroy_channel;
chand->finish_destroy_channel_closure.cb_arg = chand;
- grpc_exec_ctx_enqueue(exec_ctx, &chand->finish_destroy_channel_closure, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, &chand->finish_destroy_channel_closure, true,
+ NULL);
}
static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server,
@@ -420,7 +421,7 @@ static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server,
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
- grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL);
return;
}
@@ -569,7 +570,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
}
static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr,
- int success) {
+ bool success) {
grpc_call_element *elem = ptr;
call_data *calld = elem->call_data;
gpr_timespec op_deadline;
@@ -609,7 +610,7 @@ static void server_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
}
static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr,
- int success) {
+ bool success) {
grpc_call_element *elem = ptr;
call_data *calld = elem->call_data;
if (success) {
@@ -620,7 +621,7 @@ static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr,
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
- grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL);
} else if (calld->state == PENDING) {
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
@@ -653,7 +654,7 @@ static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd,
}
static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd,
- int iomgr_status_ignored) {
+ bool iomgr_status_ignored) {
channel_data *chand = cd;
grpc_server *server = chand->server;
if (chand->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) {
@@ -985,7 +986,7 @@ void done_published_shutdown(grpc_exec_ctx *exec_ctx, void *done_arg,
}
static void listener_destroy_done(grpc_exec_ctx *exec_ctx, void *s,
- int success) {
+ bool success) {
grpc_server *server = s;
gpr_mu_lock(&server->mu_global);
server->listeners_destroyed++;
@@ -1140,7 +1141,8 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
grpc_closure_init(
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
- grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true,
+ NULL);
} else {
GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED;
@@ -1229,7 +1231,7 @@ done:
}
static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx,
- void *user_data, int success);
+ void *user_data, bool success);
static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
gpr_slice slice = value->slice;
@@ -1315,7 +1317,7 @@ static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
}
static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx, void *prc,
- int success) {
+ bool success) {
requested_call *rc = prc;
grpc_call *call = *rc->call;
grpc_call_element *elem =
diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c
index 6e21d2dcd7..ce970dfe73 100644
--- a/src/core/surface/server_chttp2.c
+++ b/src/core/surface/server_chttp2.c
@@ -82,7 +82,7 @@ static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp,
grpc_closure *destroy_done) {
grpc_tcp_server *tcp = tcpp;
grpc_tcp_server_unref(exec_ctx, tcp);
- grpc_exec_ctx_enqueue(exec_ctx, destroy_done, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, destroy_done, true, NULL);
}
int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {