aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar David Garcia Quintas <dgq@google.com>2016-01-29 15:05:53 -0800
committerGravatar David Garcia Quintas <dgq@google.com>2016-01-29 15:05:53 -0800
commitdd18c4a4a32f2038b16d22235e89c64a4ecb6b33 (patch)
treeda88b22ea1ade0616c201b40c83199321114a5b3 /src/core
parenta4c9218889bdc1c90146d3ca179f2afc4d3c32f2 (diff)
parented00b0366d9022bdfe47bbf4c0b4bff1bf03d195 (diff)
Merge branch 'master' of github.com:grpc/grpc into grpclb_api
Diffstat (limited to 'src/core')
-rw-r--r--src/core/census/grpc_filter.c4
-rw-r--r--src/core/channel/client_channel.c23
-rw-r--r--src/core/channel/client_uchannel.c6
-rw-r--r--src/core/channel/compress_filter.c6
-rw-r--r--src/core/channel/http_client_filter.c4
-rw-r--r--src/core/channel/http_server_filter.c4
-rw-r--r--src/core/channel/subchannel_call_holder.c18
-rw-r--r--src/core/client_config/lb_policies/pick_first.c18
-rw-r--r--src/core/client_config/lb_policies/round_robin.c14
-rw-r--r--src/core/client_config/resolvers/dns_resolver.c6
-rw-r--r--src/core/client_config/resolvers/sockaddr_resolver.c6
-rw-r--r--src/core/client_config/resolvers/zookeeper_resolver.c4
-rw-r--r--src/core/client_config/subchannel.c18
-rw-r--r--src/core/httpcli/httpcli.c10
-rw-r--r--src/core/iomgr/closure.c8
-rw-r--r--src/core/iomgr/closure.h9
-rw-r--r--src/core/iomgr/exec_ctx.c18
-rw-r--r--src/core/iomgr/exec_ctx.h17
-rw-r--r--src/core/iomgr/executor.c8
-rw-r--r--src/core/iomgr/executor.h4
-rw-r--r--src/core/iomgr/fd_posix.c6
-rw-r--r--src/core/iomgr/iocp_windows.c4
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c6
-rw-r--r--src/core/iomgr/pollset_posix.c13
-rw-r--r--src/core/iomgr/pollset_windows.c4
-rw-r--r--src/core/iomgr/resolve_address_posix.c4
-rw-r--r--src/core/iomgr/resolve_address_windows.c4
-rw-r--r--src/core/iomgr/tcp_client_posix.c14
-rw-r--r--src/core/iomgr/tcp_client_windows.c8
-rw-r--r--src/core/iomgr/tcp_posix.c14
-rw-r--r--src/core/iomgr/tcp_server_posix.c9
-rw-r--r--src/core/iomgr/tcp_server_windows.c6
-rw-r--r--src/core/iomgr/tcp_windows.c20
-rw-r--r--src/core/iomgr/timer.c6
-rw-r--r--src/core/iomgr/workqueue.h6
-rw-r--r--src/core/iomgr/workqueue_posix.c6
-rw-r--r--src/core/security/credentials.c4
-rw-r--r--src/core/security/google_default_credentials.c4
-rw-r--r--src/core/security/handshake.c11
-rw-r--r--src/core/security/secure_endpoint.c10
-rw-r--r--src/core/security/server_auth_filter.c4
-rw-r--r--src/core/security/server_secure_chttp2.c2
-rw-r--r--src/core/surface/call.c22
-rw-r--r--src/core/surface/channel.c6
-rw-r--r--src/core/surface/channel_connectivity.c6
-rw-r--r--src/core/surface/channel_create.c6
-rw-r--r--src/core/surface/channel_ping.c4
-rw-r--r--src/core/surface/completion_queue.c6
-rw-r--r--src/core/surface/lame_client.c6
-rw-r--r--src/core/surface/secure_channel_create.c6
-rw-r--r--src/core/surface/server.c30
-rw-r--r--src/core/surface/server_chttp2.c2
-rw-r--r--src/core/transport/chttp2/internal.h2
-rw-r--r--src/core/transport/chttp2/writing.c4
-rw-r--r--src/core/transport/chttp2_transport.c34
-rw-r--r--src/core/transport/connectivity_state.c12
-rw-r--r--src/core/transport/transport.c10
-rw-r--r--src/core/transport/transport.h15
58 files changed, 287 insertions, 254 deletions
diff --git a/src/core/census/grpc_filter.c b/src/core/census/grpc_filter.c
index ee1f62f8c2..a8db32b9d5 100644
--- a/src/core/census/grpc_filter.c
+++ b/src/core/census/grpc_filter.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -91,7 +91,7 @@ static void client_start_transport_op(grpc_exec_ctx *exec_ctx,
}
static void server_on_done_recv(grpc_exec_ctx *exec_ctx, void *ptr,
- int success) {
+ bool success) {
grpc_call_element *elem = ptr;
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index a92a6ecaf2..7176c01b05 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -135,7 +135,7 @@ static void on_lb_policy_state_changed_locked(
}
static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
+ bool iomgr_success) {
lb_policy_connectivity_watcher *w = arg;
gpr_mu_lock(&w->chand->mu_config);
@@ -161,7 +161,7 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
}
static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
+ bool iomgr_success) {
channel_data *chand = arg;
grpc_lb_policy *lb_policy = NULL;
grpc_lb_policy *old_lb_policy;
@@ -191,7 +191,8 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
old_lb_policy = chand->lb_policy;
chand->lb_policy = lb_policy;
if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) {
- grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures);
+ grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures,
+ NULL);
}
if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
@@ -249,7 +250,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
channel_data *chand = elem->channel_data;
grpc_resolver *destroy_resolver = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL);
GPR_ASSERT(op->set_accept_stream == NULL);
if (op->bind_pollset != NULL) {
@@ -268,7 +269,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
if (op->send_ping != NULL) {
if (chand->lb_policy == NULL) {
- grpc_exec_ctx_enqueue(exec_ctx, op->send_ping, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, op->send_ping, false, NULL);
} else {
grpc_lb_policy_ping_one(exec_ctx, chand->lb_policy, op->send_ping);
op->bind_pollset = NULL;
@@ -310,15 +311,15 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready);
-static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
continue_picking_args *cpa = arg;
if (!success) {
- grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, false, NULL);
} else if (cpa->connected_subchannel == NULL) {
/* cancelled, do nothing */
} else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata,
cpa->connected_subchannel, cpa->on_ready)) {
- grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, true, NULL);
}
gpr_free(cpa);
}
@@ -346,7 +347,7 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
cpa = closure->cb_arg;
if (cpa->connected_subchannel == connected_subchannel) {
cpa->connected_subchannel = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, false, NULL);
}
}
gpr_mu_unlock(&chand->mu_config);
@@ -497,7 +498,7 @@ typedef struct {
} external_connectivity_watcher;
static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
+ bool iomgr_success) {
external_connectivity_watcher *w = arg;
grpc_closure *follow_up = w->on_complete;
grpc_pollset_set_del_pollset(exec_ctx, &w->chand->interested_parties,
diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c
index 2c0b07d8bf..b1e7155773 100644
--- a/src/core/channel/client_uchannel.c
+++ b/src/core/channel/client_uchannel.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -79,7 +79,7 @@ typedef struct client_uchannel_channel_data {
typedef grpc_subchannel_call_holder call_data;
static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
+ bool iomgr_success) {
channel_data *chand = arg;
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
chand->subchannel_connectivity,
@@ -105,7 +105,7 @@ static void cuc_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_transport_op *op) {
channel_data *chand = elem->channel_data;
- grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL);
GPR_ASSERT(op->set_accept_stream == NULL);
GPR_ASSERT(op->bind_pollset == NULL);
diff --git a/src/core/channel/compress_filter.c b/src/core/channel/compress_filter.c
index 3de1ef8a35..3e7ca08fd2 100644
--- a/src/core/channel/compress_filter.c
+++ b/src/core/channel/compress_filter.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -153,7 +153,7 @@ static void process_send_initial_metadata(
static void continue_send_message(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem);
-static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, int success) {
+static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, bool success) {
grpc_call_element *elem = elemp;
call_data *calld = elem->call_data;
gpr_slice_buffer_reset_and_unref(&calld->slices);
@@ -183,7 +183,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
grpc_call_next_op(exec_ctx, elem, &calld->send_op);
}
-static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, int success) {
+static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, bool success) {
grpc_call_element *elem = elemp;
call_data *calld = elem->call_data;
gpr_slice_buffer_add(&calld->slices, calld->incoming_slice);
diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c
index 65cfb778bb..43eee046b8 100644
--- a/src/core/channel/http_client_filter.c
+++ b/src/core/channel/http_client_filter.c
@@ -1,5 +1,5 @@
/*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -80,7 +80,7 @@ static grpc_mdelem *client_recv_filter(void *user_data, grpc_mdelem *md) {
return md;
}
-static void hc_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, int success) {
+static void hc_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, bool success) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
client_recv_filter_args a;
diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c
index a10b105494..bb75323933 100644
--- a/src/core/channel/http_server_filter.c
+++ b/src/core/channel/http_server_filter.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -131,7 +131,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
}
}
-static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, int success) {
+static void hs_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, bool success) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
if (success) {
diff --git a/src/core/channel/subchannel_call_holder.c b/src/core/channel/subchannel_call_holder.c
index 1e8f4ba1b6..3ad9fd9efb 100644
--- a/src/core/channel/subchannel_call_holder.c
+++ b/src/core/channel/subchannel_call_holder.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -43,9 +43,9 @@
#define CANCELLED_CALL ((grpc_subchannel_call *)1)
static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *holder,
- int success);
+ bool success);
static void retry_ops(grpc_exec_ctx *exec_ctx, void *retry_ops_args,
- int success);
+ bool success);
static void add_waiting_locked(grpc_subchannel_call_holder *holder,
grpc_transport_stream_op *op);
@@ -166,7 +166,7 @@ retry:
GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
}
-static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
grpc_subchannel_call_holder *holder = arg;
grpc_subchannel_call *call;
gpr_mu_lock(&holder->mu);
@@ -209,10 +209,11 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx,
holder->waiting_ops_count = 0;
holder->waiting_ops_capacity = 0;
GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops");
- grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(retry_ops, a), 1);
+ grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(retry_ops, a), true,
+ NULL);
}
-static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, int success) {
+static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, bool success) {
retry_ops_args *a = args;
size_t i;
for (i = 0; i < a->nops; i++) {
@@ -240,9 +241,10 @@ static void fail_locked(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call_holder *holder) {
size_t i;
for (i = 0; i < holder->waiting_ops_count; i++) {
- grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].on_complete, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].on_complete, false,
+ NULL);
grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].recv_message_ready,
- 0);
+ false, NULL);
}
holder->waiting_ops_count = 0;
}
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 5b10600ab5..459bbebb68 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -121,7 +121,7 @@ void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
*pp->target = NULL;
grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
pp->pollset);
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
gpr_free(pp);
pp = next;
}
@@ -140,7 +140,7 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
pp->pollset);
*target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -209,7 +209,7 @@ int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
}
static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
+ bool iomgr_success) {
pick_first_lb_policy *p = arg;
size_t i;
size_t num_subchannels = p->num_subchannels;
@@ -230,7 +230,7 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
}
static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
+ bool iomgr_success) {
pick_first_lb_policy *p = arg;
grpc_subchannel *selected_subchannel;
pending_pick *pp;
@@ -272,15 +272,15 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
/* drop the pick list: we are connected now */
GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels");
gpr_atm_rel_store(&p->selected, (gpr_atm)selected);
- grpc_exec_ctx_enqueue(exec_ctx,
- grpc_closure_create(destroy_subchannels, p), 1);
+ grpc_exec_ctx_enqueue(
+ exec_ctx, grpc_closure_create(destroy_subchannels, p), true, NULL);
/* update any calls that were waiting for a pick */
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = selected;
grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
pp->pollset);
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
gpr_free(pp);
}
grpc_connected_subchannel_notify_on_state_change(
@@ -327,7 +327,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
gpr_free(pp);
}
GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base,
@@ -374,7 +374,7 @@ void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (selected) {
grpc_connected_subchannel_ping(exec_ctx, selected, closure);
} else {
- grpc_exec_ctx_enqueue(exec_ctx, closure, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL);
}
}
diff --git a/src/core/client_config/lb_policies/round_robin.c b/src/core/client_config/lb_policies/round_robin.c
index d487456363..b1171c45b0 100644
--- a/src/core/client_config/lb_policies/round_robin.c
+++ b/src/core/client_config/lb_policies/round_robin.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -237,7 +237,7 @@ void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
gpr_free(pp);
}
grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
@@ -263,7 +263,7 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
pp->pollset);
*target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
gpr_free(pp);
} else {
pp->next = p->pending_picks;
@@ -336,7 +336,7 @@ int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
}
static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
+ bool iomgr_success) {
subchannel_data *sd = arg;
round_robin_lb_policy *p = sd->policy;
pending_pick *pp;
@@ -376,7 +376,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
}
grpc_pollset_set_del_pollset(exec_ctx, &p->base.interested_parties,
pp->pollset);
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
gpr_free(pp);
}
grpc_subchannel_notify_on_state_change(
@@ -428,7 +428,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
gpr_free(pp);
}
} else {
@@ -479,7 +479,7 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
grpc_connected_subchannel_ping(exec_ctx, target, closure);
} else {
gpr_mu_unlock(&p->mu);
- grpc_exec_ctx_enqueue(exec_ctx, closure, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL);
}
}
diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c
index 28ca30b946..376b6b3d76 100644
--- a/src/core/client_config/resolvers/dns_resolver.c
+++ b/src/core/client_config/resolvers/dns_resolver.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -93,7 +93,7 @@ static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) {
gpr_mu_lock(&r->mu);
if (r->next_completion != NULL) {
*r->target_config = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL);
r->next_completion = NULL;
}
gpr_mu_unlock(&r->mu);
@@ -182,7 +182,7 @@ static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
if (r->resolved_config) {
grpc_client_config_ref(r->resolved_config);
}
- grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL);
r->next_completion = NULL;
r->published_version = r->resolved_version;
}
diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c
index 6343259246..68910ad975 100644
--- a/src/core/client_config/resolvers/sockaddr_resolver.c
+++ b/src/core/client_config/resolvers/sockaddr_resolver.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -98,7 +98,7 @@ static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&r->mu);
if (r->next_completion != NULL) {
*r->target_config = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL);
r->next_completion = NULL;
}
gpr_mu_unlock(&r->mu);
@@ -153,7 +153,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "sockaddr");
r->published = 1;
*r->target_config = cfg;
- grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL);
r->next_completion = NULL;
}
}
diff --git a/src/core/client_config/resolvers/zookeeper_resolver.c b/src/core/client_config/resolvers/zookeeper_resolver.c
index 4924ca77d6..166738e768 100644
--- a/src/core/client_config/resolvers/zookeeper_resolver.c
+++ b/src/core/client_config/resolvers/zookeeper_resolver.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -420,7 +420,7 @@ static void zookeeper_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx,
if (r->resolved_config != NULL) {
grpc_client_config_ref(r->resolved_config);
}
- grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL);
r->next_completion = NULL;
r->published_version = r->resolved_version;
}
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index fccc1dda54..0a996a1e8b 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -145,7 +145,7 @@ struct grpc_subchannel_call {
static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
- int iomgr_success);
+ bool iomgr_success);
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
#define REF_REASON reason
@@ -175,7 +175,7 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
*/
static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg,
- int success) {
+ bool success) {
grpc_connected_subchannel *c = arg;
grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c));
gpr_free(c);
@@ -198,7 +198,7 @@ void grpc_connected_subchannel_unref(grpc_exec_ctx *exec_ctx,
*/
static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg,
- int success) {
+ bool success) {
grpc_subchannel *c = arg;
gpr_free((void *)c->filters);
grpc_channel_args_destroy(c->args);
@@ -268,7 +268,7 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx,
old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF"));
if (old_refs == 1) {
grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(subchannel_destroy, c),
- 1);
+ true, NULL);
}
}
@@ -341,7 +341,7 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
}
static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg,
- int success) {
+ bool success) {
external_state_watcher *w = arg;
grpc_closure *follow_up = w->notify;
if (w->pollset_set != NULL) {
@@ -413,7 +413,7 @@ void grpc_connected_subchannel_process_transport_op(
}
static void subchannel_on_child_state_changed(grpc_exec_ctx *exec_ctx, void *p,
- int iomgr_success) {
+ bool iomgr_success) {
state_watcher *sw = p;
grpc_subchannel *c = sw->subchannel;
gpr_mu *mu = &c->mu;
@@ -594,7 +594,7 @@ static void update_reconnect_parameters(grpc_subchannel *c) {
gpr_time_add(c->next_attempt, gpr_time_from_millis(jitter, GPR_TIMESPAN));
}
-static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) {
+static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) {
grpc_subchannel *c = arg;
gpr_mu_lock(&c->mu);
c->have_alarm = 0;
@@ -611,7 +611,7 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) {
}
static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
+ bool iomgr_success) {
grpc_subchannel *c = arg;
if (c->connecting_result.transport != NULL) {
@@ -647,7 +647,7 @@ static gpr_timespec compute_connect_deadline(grpc_subchannel *c) {
*/
static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call,
- int success) {
+ bool success) {
grpc_subchannel_call *c = call;
GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0);
grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c));
diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c
index b5cd8d8d2a..71237bb614 100644
--- a/src/core/httpcli/httpcli.c
+++ b/src/core/httpcli/httpcli.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -114,13 +114,13 @@ static void finish(grpc_exec_ctx *exec_ctx, internal_request *req,
gpr_free(req);
}
-static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, int success);
+static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, bool success);
static void do_read(grpc_exec_ctx *exec_ctx, internal_request *req) {
grpc_endpoint_read(exec_ctx, req->ep, &req->incoming, &req->on_read);
}
-static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, int success) {
+static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, bool success) {
internal_request *req = user_data;
size_t i;
@@ -147,7 +147,7 @@ static void on_written(grpc_exec_ctx *exec_ctx, internal_request *req) {
do_read(exec_ctx, req);
}
-static void done_write(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+static void done_write(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
internal_request *req = arg;
if (success) {
on_written(exec_ctx, req);
@@ -175,7 +175,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
start_write(exec_ctx, req);
}
-static void on_connected(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+static void on_connected(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
internal_request *req = arg;
if (!req->ep) {
diff --git a/src/core/iomgr/closure.c b/src/core/iomgr/closure.c
index 7646a88ac5..3a96f7385f 100644
--- a/src/core/iomgr/closure.c
+++ b/src/core/iomgr/closure.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -43,7 +43,7 @@ void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
}
void grpc_closure_list_add(grpc_closure_list *closure_list,
- grpc_closure *closure, int success) {
+ grpc_closure *closure, bool success) {
if (closure == NULL) return;
closure->final_data = (success != 0);
if (closure_list->head == NULL) {
@@ -54,7 +54,7 @@ void grpc_closure_list_add(grpc_closure_list *closure_list,
closure_list->tail = closure;
}
-int grpc_closure_list_empty(grpc_closure_list closure_list) {
+bool grpc_closure_list_empty(grpc_closure_list closure_list) {
return closure_list.head == NULL;
}
@@ -77,7 +77,7 @@ typedef struct {
grpc_closure wrapper;
} wrapped_closure;
-static void closure_wrapper(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+static void closure_wrapper(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
wrapped_closure *wc = arg;
grpc_iomgr_cb_func cb = wc->cb;
void *cb_arg = wc->cb_arg;
diff --git a/src/core/iomgr/closure.h b/src/core/iomgr/closure.h
index 98ef91e1db..ea96c19c71 100644
--- a/src/core/iomgr/closure.h
+++ b/src/core/iomgr/closure.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -35,6 +35,7 @@
#define GRPC_INTERNAL_CORE_IOMGR_CLOSURE_H
#include <grpc/support/port_platform.h>
+#include <stdbool.h>
struct grpc_closure;
typedef struct grpc_closure grpc_closure;
@@ -54,7 +55,7 @@ typedef struct grpc_closure_list {
* \param success An indication on the state of the iomgr. On false, cleanup
* actions should be taken (eg, shutdown). */
typedef void (*grpc_iomgr_cb_func)(grpc_exec_ctx *exec_ctx, void *arg,
- int success);
+ bool success);
/** A closure over a grpc_iomgr_cb_func. */
struct grpc_closure {
@@ -83,13 +84,13 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg);
/** add \a closure to the end of \a list and set \a closure's success to \a
* success */
void grpc_closure_list_add(grpc_closure_list *list, grpc_closure *closure,
- int success);
+ bool success);
/** append all closures from \a src to \a dst and empty \a src. */
void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst);
/** return whether \a list is empty. */
-int grpc_closure_list_empty(grpc_closure_list list);
+bool grpc_closure_list_empty(grpc_closure_list list);
/** return the next pointer for a queued closure list */
grpc_closure *grpc_closure_next(grpc_closure *closure);
diff --git a/src/core/iomgr/exec_ctx.c b/src/core/iomgr/exec_ctx.c
index 6059a6031c..1fd79f6eba 100644
--- a/src/core/iomgr/exec_ctx.c
+++ b/src/core/iomgr/exec_ctx.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -37,16 +37,16 @@
#include "src/core/profiling/timers.h"
-int grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
- int did_something = 0;
+bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
+ bool did_something = 0;
GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0);
while (!grpc_closure_list_empty(exec_ctx->closure_list)) {
grpc_closure *c = exec_ctx->closure_list.head;
exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL;
while (c != NULL) {
- int success = (int)(c->final_data & 1);
+ bool success = (bool)(c->final_data & 1);
grpc_closure *next = (grpc_closure *)(c->final_data & ~(uintptr_t)1);
- did_something++;
+ did_something = true;
GPR_TIMER_BEGIN("grpc_exec_ctx_flush.cb", 0);
c->cb(exec_ctx, c->cb_arg, success);
GPR_TIMER_END("grpc_exec_ctx_flush.cb", 0);
@@ -62,11 +62,15 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) {
}
void grpc_exec_ctx_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- int success) {
+ bool success,
+ grpc_workqueue *offload_target_or_null) {
+ GPR_ASSERT(offload_target_or_null == NULL);
grpc_closure_list_add(&exec_ctx->closure_list, closure, success);
}
void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,
- grpc_closure_list *list) {
+ grpc_closure_list *list,
+ grpc_workqueue *offload_target_or_null) {
+ GPR_ASSERT(offload_target_or_null == NULL);
grpc_closure_list_move(list, &exec_ctx->closure_list);
}
diff --git a/src/core/iomgr/exec_ctx.h b/src/core/iomgr/exec_ctx.h
index 43df488094..9a9b2e55fa 100644
--- a/src/core/iomgr/exec_ctx.h
+++ b/src/core/iomgr/exec_ctx.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -57,22 +57,29 @@ struct grpc_exec_ctx {
grpc_closure_list closure_list;
};
+/** A workqueue represents a list of work to be executed asynchronously.
+ Forward declared here to avoid a circular dependency with workqueue.h. */
+struct grpc_workqueue;
+typedef struct grpc_workqueue grpc_workqueue;
+
#define GRPC_EXEC_CTX_INIT \
{ GRPC_CLOSURE_LIST_INIT }
/** Flush any work that has been enqueued onto this grpc_exec_ctx.
* Caller must guarantee that no interfering locks are held.
- * Returns 1 if work was performed, 0 otherwise. */
-int grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx);
+ * Returns true if work was performed, false otherwise. */
+bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx);
/** Finish any pending work for a grpc_exec_ctx. Must be called before
* the instance is destroyed, or work may be lost. */
void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx);
/** Add a closure to be executed at the next flush/finish point */
void grpc_exec_ctx_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- int success);
+ bool success,
+ grpc_workqueue *offload_target_or_null);
/** Add a list of closures to be executed at the next flush/finish point.
* Leaves \a list empty. */
void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,
- grpc_closure_list *list);
+ grpc_closure_list *list,
+ grpc_workqueue *offload_target_or_null);
#endif
diff --git a/src/core/iomgr/executor.c b/src/core/iomgr/executor.c
index 00c68f7828..f22d8f30ac 100644
--- a/src/core/iomgr/executor.c
+++ b/src/core/iomgr/executor.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -77,7 +77,7 @@ static void closure_exec_thread_func(void *ignored) {
gpr_mu_unlock(&g_executor.mu);
break;
} else {
- grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures);
+ grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures, NULL);
}
gpr_mu_unlock(&g_executor.mu);
grpc_exec_ctx_flush(&exec_ctx);
@@ -112,7 +112,7 @@ static void maybe_spawn_locked() {
g_executor.pending_join = 1;
}
-void grpc_executor_enqueue(grpc_closure *closure, int success) {
+void grpc_executor_enqueue(grpc_closure *closure, bool success) {
gpr_mu_lock(&g_executor.mu);
if (g_executor.shutting_down == 0) {
grpc_closure_list_add(&g_executor.closures, closure, success);
@@ -133,7 +133,7 @@ void grpc_executor_shutdown() {
* list below because we aren't accepting new work */
/* Execute pending callbacks, some may be performing cleanups */
- grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures);
+ grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures, NULL);
grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(grpc_closure_list_empty(g_executor.closures));
if (pending_join) {
diff --git a/src/core/iomgr/executor.h b/src/core/iomgr/executor.h
index 6da446ae9c..aac057ddf5 100644
--- a/src/core/iomgr/executor.h
+++ b/src/core/iomgr/executor.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -45,7 +45,7 @@ void grpc_executor_init();
/** Enqueue \a closure for its eventual execution of \a f(arg) on a separate
* thread */
-void grpc_executor_enqueue(grpc_closure *closure, int success);
+void grpc_executor_enqueue(grpc_closure *closure, bool success);
/** Shutdown the executor, running all pending work as part of the call */
void grpc_executor_shutdown();
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index 89c938bc04..85eadd754b 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -218,7 +218,7 @@ static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
} else {
grpc_remove_fd_from_all_epoll_sets(fd->fd);
}
- grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL);
}
int grpc_fd_wrapped_fd(grpc_fd *fd) {
@@ -273,7 +273,7 @@ static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
} else if (*st == CLOSURE_READY) {
/* already ready ==> queue the closure to run immediately */
*st = CLOSURE_NOT_READY;
- grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown);
+ grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL);
maybe_wake_one_watcher_locked(fd);
} else {
/* upcallptr was set to a different closure. This is an error! */
@@ -296,7 +296,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
return 0;
} else {
/* waiting ==> queue closure */
- grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown);
+ grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown, NULL);
*st = CLOSURE_NOT_READY;
return 1;
}
diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c
index 6cbe7d2fd4..96b6f81024 100644
--- a/src/core/iomgr/iocp_windows.c
+++ b/src/core/iomgr/iocp_windows.c
@@ -120,7 +120,7 @@ void grpc_iocp_work(grpc_exec_ctx *exec_ctx, gpr_timespec deadline) {
info->has_pending_iocp = 1;
}
gpr_mu_unlock(&socket->state_mu);
- grpc_exec_ctx_enqueue(exec_ctx, closure, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, closure, true, NULL);
}
void grpc_iocp_init(void) {
@@ -183,7 +183,7 @@ static void socket_notify_on_iocp(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&socket->state_mu);
if (info->has_pending_iocp) {
info->has_pending_iocp = 0;
- grpc_exec_ctx_enqueue(exec_ctx, closure, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, closure, true, NULL);
} else {
info->closure = closure;
}
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index d117485327..4acae2bb71 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -141,7 +141,7 @@ static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_status) {
+ bool iomgr_status) {
delayed_add *da = arg;
if (!grpc_fd_is_orphaned(da->fd)) {
@@ -154,7 +154,7 @@ static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg,
/* We don't care about this pollset anymore. */
if (da->pollset->in_flight_cbs == 0 && !da->pollset->called_shutdown) {
da->pollset->called_shutdown = 1;
- grpc_exec_ctx_enqueue(exec_ctx, da->pollset->shutdown_done, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, da->pollset->shutdown_done, true, NULL);
}
}
gpr_mu_unlock(&da->pollset->mu);
@@ -178,7 +178,7 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_exec_ctx *exec_ctx,
GRPC_FD_REF(fd, "delayed_add");
grpc_closure_init(&da->closure, perform_delayed_add, da);
pollset->in_flight_cbs++;
- grpc_exec_ctx_enqueue(exec_ctx, &da->closure, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, &da->closure, true, NULL);
}
}
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index c325b634ae..a8e2e22977 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -243,7 +243,7 @@ void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs));
pollset->vtable->finish_shutdown(pollset);
- grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL);
}
void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
@@ -271,7 +271,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (!grpc_pollset_has_workers(pollset) &&
!grpc_closure_list_empty(pollset->idle_jobs)) {
GPR_TIMER_MARK("grpc_pollset_work.idle_jobs", 0);
- grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs);
+ grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
goto done;
}
/* Check alarms - these are a global resource so we just ping
@@ -365,7 +365,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
* TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
gpr_mu_lock(&pollset->mu);
} else if (!grpc_closure_list_empty(pollset->idle_jobs)) {
- grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs);
+ grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
gpr_mu_unlock(&pollset->mu);
grpc_exec_ctx_flush(exec_ctx);
gpr_mu_lock(&pollset->mu);
@@ -381,7 +381,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollset->shutdown_done = closure;
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
if (!grpc_pollset_has_workers(pollset)) {
- grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs);
+ grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
}
if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 &&
!grpc_pollset_has_workers(pollset)) {
@@ -419,7 +419,8 @@ typedef struct grpc_unary_promote_args {
grpc_closure promotion_closure;
} grpc_unary_promote_args;
-static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args, int success) {
+static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args,
+ bool success) {
grpc_unary_promote_args *up_args = args;
const grpc_pollset_vtable *original_vtable = up_args->original_vtable;
grpc_pollset *pollset = up_args->pollset;
diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c
index bfd9e69a16..2e650ca939 100644
--- a/src/core/iomgr/pollset_windows.c
+++ b/src/core/iomgr/pollset_windows.c
@@ -107,7 +107,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollset->shutting_down = 1;
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
if (!pollset->is_iocp_worker) {
- grpc_exec_ctx_enqueue(exec_ctx, closure, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, closure, true, NULL);
} else {
pollset->on_shutdown = closure;
}
@@ -165,7 +165,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
if (pollset->shutting_down && pollset->on_shutdown != NULL) {
- grpc_exec_ctx_enqueue(exec_ctx, pollset->on_shutdown, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, pollset->on_shutdown, true, NULL);
pollset->on_shutdown = NULL;
}
goto done;
diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c
index 555c74ce7e..c51745b918 100644
--- a/src/core/iomgr/resolve_address_posix.c
+++ b/src/core/iomgr/resolve_address_posix.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -152,7 +152,7 @@ done:
/* Callback to be passed to grpc_executor to asynch-ify
* grpc_blocking_resolve_address */
-static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, int success) {
+static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, bool success) {
request *r = rp;
grpc_resolved_addresses *resolved =
grpc_blocking_resolve_address(r->name, r->default_port);
diff --git a/src/core/iomgr/resolve_address_windows.c b/src/core/iomgr/resolve_address_windows.c
index 007c855d10..28c8661e73 100644
--- a/src/core/iomgr/resolve_address_windows.c
+++ b/src/core/iomgr/resolve_address_windows.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -135,7 +135,7 @@ done:
/* Callback to be passed to grpc_executor to asynch-ify
* grpc_blocking_resolve_address */
-static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, int success) {
+static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, bool success) {
request *r = rp;
grpc_resolved_addresses *resolved =
grpc_blocking_resolve_address(r->name, r->default_port);
diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c
index d9d24ee9a3..c76c2e3b0f 100644
--- a/src/core/iomgr/tcp_client_posix.c
+++ b/src/core/iomgr/tcp_client_posix.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -91,7 +91,7 @@ error:
return 0;
}
-static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, int success) {
+static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, bool success) {
int done;
async_connect *ac = acp;
if (grpc_tcp_trace) {
@@ -111,7 +111,7 @@ static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, int success) {
}
}
-static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, int success) {
+static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, bool success) {
async_connect *ac = acp;
int so_error = 0;
socklen_t so_error_size;
@@ -206,7 +206,7 @@ finish:
gpr_free(ac->addr_str);
gpr_free(ac);
}
- grpc_exec_ctx_enqueue(exec_ctx, closure, *ep != NULL);
+ grpc_exec_ctx_enqueue(exec_ctx, closure, *ep != NULL, NULL);
}
void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
@@ -243,7 +243,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
addr_len = sizeof(addr4_copy);
}
if (!prepare_socket(addr, fd)) {
- grpc_exec_ctx_enqueue(exec_ctx, closure, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL);
return;
}
@@ -259,14 +259,14 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
if (err >= 0) {
*ep = grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str);
- grpc_exec_ctx_enqueue(exec_ctx, closure, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, closure, true, NULL);
goto done;
}
if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
gpr_log(GPR_ERROR, "connect error to '%s': %s", addr_str, strerror(errno));
grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, "tcp_client_connect_error");
- grpc_exec_ctx_enqueue(exec_ctx, closure, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL);
goto done;
}
diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c
index e5691b7e12..689c6f7b10 100644
--- a/src/core/iomgr/tcp_client_windows.c
+++ b/src/core/iomgr/tcp_client_windows.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -74,7 +74,7 @@ static void async_connect_unlock_and_cleanup(async_connect *ac) {
}
}
-static void on_alarm(grpc_exec_ctx *exec_ctx, void *acp, int occured) {
+static void on_alarm(grpc_exec_ctx *exec_ctx, void *acp, bool occured) {
async_connect *ac = acp;
gpr_mu_lock(&ac->mu);
/* If the alarm didn't occur, it got cancelled. */
@@ -84,7 +84,7 @@ static void on_alarm(grpc_exec_ctx *exec_ctx, void *acp, int occured) {
async_connect_unlock_and_cleanup(ac);
}
-static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, int from_iocp) {
+static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, bool from_iocp) {
async_connect *ac = acp;
SOCKET sock = ac->socket->socket;
grpc_endpoint **ep = ac->endpoint;
@@ -215,7 +215,7 @@ failure:
} else if (sock != INVALID_SOCKET) {
closesocket(sock);
}
- grpc_exec_ctx_enqueue(exec_ctx, on_done, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, on_done, false, NULL);
}
#endif /* GPR_WINSOCK_SOCKET */
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index 4fa8ca8c71..048e907441 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -100,9 +100,9 @@ typedef struct {
} grpc_tcp;
static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
- int success);
+ bool success);
static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
- int success);
+ bool success);
static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
@@ -247,7 +247,7 @@ static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
}
static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
- int success) {
+ bool success) {
grpc_tcp *tcp = (grpc_tcp *)arg;
GPR_ASSERT(!tcp->finished_edge);
@@ -273,7 +273,7 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
tcp->finished_edge = 0;
grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
} else {
- grpc_exec_ctx_enqueue(exec_ctx, &tcp->read_closure, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, &tcp->read_closure, true, NULL);
}
}
@@ -360,7 +360,7 @@ static flush_result tcp_flush(grpc_tcp *tcp) {
}
static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
- int success) {
+ bool success) {
grpc_tcp *tcp = (grpc_tcp *)arg;
flush_result status;
grpc_closure *cb;
@@ -407,7 +407,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
if (buf->length == 0) {
GPR_TIMER_END("tcp_write", 0);
- grpc_exec_ctx_enqueue(exec_ctx, cb, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, cb, true, NULL);
return;
}
tcp->outgoing_buffer = buf;
@@ -420,7 +420,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
tcp->write_cb = cb;
grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
} else {
- grpc_exec_ctx_enqueue(exec_ctx, cb, status == FLUSH_DONE);
+ grpc_exec_ctx_enqueue(exec_ctx, cb, status == FLUSH_DONE, NULL);
}
GPR_TIMER_END("tcp_write", 0);
diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c
index adf14aeb59..5e07f8261c 100644
--- a/src/core/iomgr/tcp_server_posix.c
+++ b/src/core/iomgr/tcp_server_posix.c
@@ -160,7 +160,7 @@ grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) {
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (s->shutdown_complete != NULL) {
- grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, true, NULL);
}
gpr_mu_destroy(&s->mu);
@@ -174,7 +174,8 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
gpr_free(s);
}
-static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server, int success) {
+static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server,
+ bool success) {
grpc_tcp_server *s = server;
gpr_mu_lock(&s->mu);
s->destroyed_ports++;
@@ -317,7 +318,7 @@ error:
}
/* event manager callback when reads are ready */
-static void on_read(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
grpc_tcp_listener *sp = arg;
grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index,
sp->fd_index};
@@ -602,7 +603,7 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
/* Complete shutdown_starting work before destroying. */
grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_mu_lock(&s->mu);
- grpc_exec_ctx_enqueue_list(&local_exec_ctx, &s->shutdown_starting);
+ grpc_exec_ctx_enqueue_list(&local_exec_ctx, &s->shutdown_starting, NULL);
gpr_mu_unlock(&s->mu);
if (exec_ctx == NULL) {
grpc_exec_ctx_flush(&local_exec_ctx);
diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c
index 00d381f264..ce930b8f41 100644
--- a/src/core/iomgr/tcp_server_windows.c
+++ b/src/core/iomgr/tcp_server_windows.c
@@ -119,7 +119,7 @@ grpc_tcp_server *grpc_tcp_server_create(grpc_closure *shutdown_complete) {
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (s->shutdown_complete != NULL) {
- grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, true, NULL);
}
/* Now that the accepts have been aborted, we can destroy the sockets.
@@ -173,7 +173,7 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
/* Complete shutdown_starting work before destroying. */
grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_mu_lock(&s->mu);
- grpc_exec_ctx_enqueue_list(&local_exec_ctx, &s->shutdown_starting);
+ grpc_exec_ctx_enqueue_list(&local_exec_ctx, &s->shutdown_starting, NULL);
gpr_mu_unlock(&s->mu);
if (exec_ctx == NULL) {
grpc_exec_ctx_flush(&local_exec_ctx);
@@ -311,7 +311,7 @@ failure:
}
/* Event manager callback when reads are ready. */
-static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, int from_iocp) {
+static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, bool from_iocp) {
grpc_tcp_listener *sp = arg;
grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, 0};
SOCKET sock = sp->new_socket;
diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c
index d3f080cbf9..038e4158c8 100644
--- a/src/core/iomgr/tcp_windows.c
+++ b/src/core/iomgr/tcp_windows.c
@@ -138,7 +138,7 @@ static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
#endif
/* Asynchronous callback from the IOCP, or the background thread. */
-static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, int success) {
+static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, bool success) {
grpc_tcp *tcp = tcpp;
grpc_closure *cb = tcp->read_cb;
grpc_winsocket *socket = tcp->socket;
@@ -184,7 +184,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
WSABUF buffer;
if (tcp->shutting_down) {
- grpc_exec_ctx_enqueue(exec_ctx, cb, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, cb, false, NULL);
return;
}
@@ -208,7 +208,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
/* Did we get data immediately ? Yay. */
if (info->wsa_error != WSAEWOULDBLOCK) {
info->bytes_transfered = bytes_read;
- grpc_exec_ctx_enqueue(exec_ctx, &tcp->on_read, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, &tcp->on_read, true, NULL);
return;
}
@@ -221,7 +221,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
info->wsa_error = wsa_error;
- grpc_exec_ctx_enqueue(exec_ctx, &tcp->on_read, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, &tcp->on_read, false, NULL);
return;
}
}
@@ -230,7 +230,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
}
/* Asynchronous callback from the IOCP, or the background thread. */
-static void on_write(grpc_exec_ctx *exec_ctx, void *tcpp, int success) {
+static void on_write(grpc_exec_ctx *exec_ctx, void *tcpp, bool success) {
grpc_tcp *tcp = (grpc_tcp *)tcpp;
grpc_winsocket *handle = tcp->socket;
grpc_winsocket_callback_info *info = &handle->write_info;
@@ -273,7 +273,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
size_t len;
if (tcp->shutting_down) {
- grpc_exec_ctx_enqueue(exec_ctx, cb, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, cb, false, NULL);
return;
}
@@ -301,9 +301,9 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
connection that has its send queue filled up. But if we don't, then we can
avoid doing an async write operation at all. */
if (info->wsa_error != WSAEWOULDBLOCK) {
- int ok = 0;
+ bool ok = false;
if (status == 0) {
- ok = 1;
+ ok = true;
GPR_ASSERT(bytes_sent == tcp->write_slices->length);
} else {
if (socket->read_info.wsa_error != WSAECONNRESET) {
@@ -313,7 +313,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
}
}
if (allocated) gpr_free(allocated);
- grpc_exec_ctx_enqueue(exec_ctx, cb, ok);
+ grpc_exec_ctx_enqueue(exec_ctx, cb, ok, NULL);
return;
}
@@ -330,7 +330,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
TCP_UNREF(tcp, "write");
- grpc_exec_ctx_enqueue(exec_ctx, cb, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, cb, false, NULL);
return;
}
}
diff --git a/src/core/iomgr/timer.c b/src/core/iomgr/timer.c
index 24d6204e07..a33d8f63a0 100644
--- a/src/core/iomgr/timer.c
+++ b/src/core/iomgr/timer.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -224,7 +224,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
shard_type *shard = &g_shards[shard_idx(timer)];
gpr_mu_lock(&shard->mu);
if (!timer->triggered) {
- grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, false, NULL);
timer->triggered = 1;
if (timer->heap_index == INVALID_HEAP_INDEX) {
list_remove(timer);
@@ -290,7 +290,7 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard,
grpc_timer *timer;
gpr_mu_lock(&shard->mu);
while ((timer = pop_one(shard, now))) {
- grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, success);
+ grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, success, NULL);
n++;
}
*new_min_deadline = compute_min_deadline(shard);
diff --git a/src/core/iomgr/workqueue.h b/src/core/iomgr/workqueue.h
index 714536233c..36dd133468 100644
--- a/src/core/iomgr/workqueue.h
+++ b/src/core/iomgr/workqueue.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -47,9 +47,7 @@
#include "src/core/iomgr/workqueue_windows.h"
#endif
-/** A workqueue represents a list of work to be executed asynchronously. */
-struct grpc_workqueue;
-typedef struct grpc_workqueue grpc_workqueue;
+/* grpc_workqueue is forward declared in exec_ctx.h */
/** Create a work queue */
grpc_workqueue *grpc_workqueue_create(grpc_exec_ctx *exec_ctx);
diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c
index d2a1c34612..da11df67ef 100644
--- a/src/core/iomgr/workqueue_posix.c
+++ b/src/core/iomgr/workqueue_posix.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -45,7 +45,7 @@
#include "src/core/iomgr/fd_posix.h"
-static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, int success);
+static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success);
grpc_workqueue *grpc_workqueue_create(grpc_exec_ctx *exec_ctx) {
char name[32];
@@ -110,7 +110,7 @@ void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
gpr_mu_unlock(&workqueue->mu);
}
-static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
grpc_workqueue *workqueue = arg;
if (!success) {
diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c
index 8b56c57645..afba0079f5 100644
--- a/src/core/security/credentials.c
+++ b/src/core/security/credentials.c
@@ -793,7 +793,7 @@ static void md_only_test_destruct(grpc_call_credentials *creds) {
}
static void on_simulated_token_fetch_done(grpc_exec_ctx *exec_ctx,
- void *user_data, int success) {
+ void *user_data, bool success) {
grpc_credentials_metadata_request *r =
(grpc_credentials_metadata_request *)user_data;
grpc_md_only_test_credentials *c = (grpc_md_only_test_credentials *)r->creds;
@@ -812,7 +812,7 @@ static void md_only_test_get_request_metadata(
grpc_credentials_metadata_request *cb_arg =
grpc_credentials_metadata_request_create(creds, cb, user_data);
grpc_executor_enqueue(
- grpc_closure_create(on_simulated_token_fetch_done, cb_arg), 1);
+ grpc_closure_create(on_simulated_token_fetch_done, cb_arg), true);
} else {
cb(exec_ctx, user_data, c->md_store->entries, 1, GRPC_CREDENTIALS_OK);
}
diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c
index 5385e41130..ae71107bef 100644
--- a/src/core/security/google_default_credentials.c
+++ b/src/core/security/google_default_credentials.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -86,7 +86,7 @@ static void on_compute_engine_detection_http_response(
gpr_mu_unlock(GRPC_POLLSET_MU(&detector->pollset));
}
-static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, int s) {
+static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool s) {
grpc_pollset_destroy(p);
}
diff --git a/src/core/security/handshake.c b/src/core/security/handshake.c
index 364b765396..a8b2fef629 100644
--- a/src/core/security/handshake.c
+++ b/src/core/security/handshake.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -61,10 +61,10 @@ typedef struct {
} grpc_security_handshake;
static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx,
- void *setup, int success);
+ void *setup, bool success);
static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx, void *setup,
- int success);
+ bool success);
static void security_connector_remove_handshake(grpc_security_handshake *h) {
grpc_security_connector_handshake_list *node;
@@ -198,7 +198,8 @@ static void send_handshake_bytes_to_peer(grpc_exec_ctx *exec_ctx,
}
static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx,
- void *handshake, int success) {
+ void *handshake,
+ bool success) {
grpc_security_handshake *h = handshake;
size_t consumed_slice_size = 0;
tsi_result result = TSI_OK;
@@ -265,7 +266,7 @@ static void on_handshake_data_received_from_peer(grpc_exec_ctx *exec_ctx,
/* If handshake is NULL, the handshake is done. */
static void on_handshake_data_sent_to_peer(grpc_exec_ctx *exec_ctx,
- void *handshake, int success) {
+ void *handshake, bool success) {
grpc_security_handshake *h = handshake;
/* Make sure that write is OK. */
diff --git a/src/core/security/secure_endpoint.c b/src/core/security/secure_endpoint.c
index 9b87e268c6..d11c43be20 100644
--- a/src/core/security/secure_endpoint.c
+++ b/src/core/security/secure_endpoint.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -126,7 +126,7 @@ static void flush_read_staging_buffer(secure_endpoint *ep, uint8_t **cur,
}
static void call_read_cb(grpc_exec_ctx *exec_ctx, secure_endpoint *ep,
- int success) {
+ bool success) {
if (grpc_trace_secure_endpoint) {
size_t i;
for (i = 0; i < ep->read_buffer->count; i++) {
@@ -137,11 +137,11 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, secure_endpoint *ep,
}
}
ep->read_buffer = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, ep->read_cb, success);
+ grpc_exec_ctx_enqueue(exec_ctx, ep->read_cb, success, NULL);
SECURE_ENDPOINT_UNREF(exec_ctx, ep, "read");
}
-static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, int success) {
+static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, bool success) {
unsigned i;
uint8_t keep_looping = 0;
tsi_result result = TSI_OK;
@@ -315,7 +315,7 @@ static void endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep,
if (result != TSI_OK) {
/* TODO(yangg) do different things according to the error type? */
gpr_slice_buffer_reset_and_unref(&ep->output_buffer);
- grpc_exec_ctx_enqueue(exec_ctx, cb, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, cb, false, NULL);
return;
}
diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c
index d5c8c54369..4c78711387 100644
--- a/src/core/security/server_auth_filter.c
+++ b/src/core/security/server_auth_filter.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -153,7 +153,7 @@ static void on_md_processing_done(
}
static void auth_on_recv(grpc_exec_ctx *exec_ctx, void *user_data,
- int success) {
+ bool success) {
grpc_call_element *elem = user_data;
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
diff --git a/src/core/security/server_secure_chttp2.c b/src/core/security/server_secure_chttp2.c
index 08713fceaf..ee3443c828 100644
--- a/src/core/security/server_secure_chttp2.c
+++ b/src/core/security/server_secure_chttp2.c
@@ -142,7 +142,7 @@ static void start(grpc_exec_ctx *exec_ctx, grpc_server *server, void *statep,
on_accept, state);
}
-static void destroy_done(grpc_exec_ctx *exec_ctx, void *statep, int success) {
+static void destroy_done(grpc_exec_ctx *exec_ctx, void *statep, bool success) {
grpc_server_secure_state *state = statep;
if (state->destroy_callback != NULL) {
state->destroy_callback->cb(exec_ctx, state->destroy_callback->cb_arg,
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 880666bb38..9495e748b5 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -229,9 +229,9 @@ static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
grpc_status_code status,
const char *description);
static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack,
- int success);
+ bool success);
static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
- int success);
+ bool success);
grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
uint32_t propagation_mask,
@@ -351,7 +351,7 @@ void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) {
GRPC_CALL_STACK_UNREF(exec_ctx, CALL_STACK_FROM_CALL(c), REF_REASON);
}
-static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, int success) {
+static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, bool success) {
size_t i;
int ii;
grpc_call *c = call;
@@ -688,13 +688,13 @@ typedef struct cancel_closure {
grpc_status_code status;
} cancel_closure;
-static void done_cancel(grpc_exec_ctx *exec_ctx, void *ccp, int success) {
+static void done_cancel(grpc_exec_ctx *exec_ctx, void *ccp, bool success) {
cancel_closure *cc = ccp;
GRPC_CALL_INTERNAL_UNREF(exec_ctx, cc->call, "cancel");
gpr_free(cc);
}
-static void send_cancel(grpc_exec_ctx *exec_ctx, void *ccp, int success) {
+static void send_cancel(grpc_exec_ctx *exec_ctx, void *ccp, bool success) {
grpc_transport_stream_op op;
cancel_closure *cc = ccp;
memset(&op, 0, sizeof(op));
@@ -721,7 +721,7 @@ static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
cc->call = c;
cc->status = status;
GRPC_CALL_INTERNAL_REF(c, "cancel");
- grpc_exec_ctx_enqueue(exec_ctx, &cc->closure, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, &cc->closure, true, NULL);
return GRPC_CALL_OK;
}
@@ -757,7 +757,7 @@ grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
return CALL_FROM_TOP_ELEM(elem);
}
-static void call_alarm(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+static void call_alarm(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
grpc_call *call = arg;
gpr_mu_lock(&call->mu);
call->have_alarm = 0;
@@ -934,7 +934,7 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx,
batch_control *bctl) {
grpc_call *call = bctl->call;
if (bctl->is_notify_tag_closure) {
- grpc_exec_ctx_enqueue(exec_ctx, bctl->notify_tag, bctl->success);
+ grpc_exec_ctx_enqueue(exec_ctx, bctl->notify_tag, bctl->success, NULL);
gpr_mu_lock(&call->mu);
bctl->call->used_batches =
(uint8_t)(bctl->call->used_batches &
@@ -974,7 +974,7 @@ static void continue_receiving_slices(grpc_exec_ctx *exec_ctx,
}
static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
- int success) {
+ bool success) {
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
@@ -993,7 +993,7 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
}
}
-static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, int success) {
+static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) {
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
grpc_call *child_call;
@@ -1066,7 +1066,7 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, int success) {
}
static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
- int success) {
+ bool success) {
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 001692376f..12d8ebceb9 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -80,7 +80,7 @@ struct grpc_channel {
/* the protobuf library will (by default) start warning at 100megs */
#define DEFAULT_MAX_MESSAGE_LENGTH (100 * 1024 * 1024)
-static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, int success);
+static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg, bool success);
grpc_channel *grpc_channel_create_from_filters(
grpc_exec_ctx *exec_ctx, const char *target,
@@ -268,7 +268,7 @@ void grpc_channel_internal_unref(grpc_exec_ctx *exec_ctx,
}
static void destroy_channel(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
+ bool iomgr_success) {
grpc_channel *channel = arg;
grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CHANNEL(channel));
while (channel->registered_calls) {
diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c
index 10f5c4da4d..2dd4fce26b 100644
--- a/src/core/surface/channel_connectivity.c
+++ b/src/core/surface/channel_connectivity.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -165,11 +165,11 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w,
}
}
-static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw, int success) {
+static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw, bool success) {
partly_done(exec_ctx, pw, 1);
}
-static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw, int success) {
+static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw, bool success) {
partly_done(exec_ctx, pw, 0);
}
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index 49083f0870..4d4337d288 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -80,11 +80,11 @@ static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) {
}
static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
- int success) {
+ bool success) {
connector_unref(exec_ctx, arg);
}
-static void connected(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+static void connected(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
connector *c = arg;
grpc_closure *notify;
grpc_endpoint *tcp = c->tcp;
diff --git a/src/core/surface/channel_ping.c b/src/core/surface/channel_ping.c
index b4ce282787..983f1c8a66 100644
--- a/src/core/surface/channel_ping.c
+++ b/src/core/surface/channel_ping.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -53,7 +53,7 @@ static void ping_destroy(grpc_exec_ctx *exec_ctx, void *arg,
gpr_free(arg);
}
-static void ping_done(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+static void ping_done(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
ping_result *pr = arg;
grpc_cq_end_op(exec_ctx, pr->cq, pr->tag, success, ping_destroy, pr,
&pr->completion_storage);
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index c0db9c508a..75298eb795 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -86,7 +86,7 @@ static gpr_mu g_freelist_mu;
grpc_completion_queue *g_freelist;
static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc,
- int success);
+ bool success);
void grpc_cq_global_init(void) { gpr_mu_init(&g_freelist_mu); }
@@ -169,7 +169,7 @@ void grpc_cq_internal_ref(grpc_completion_queue *cc) {
}
static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg,
- int success) {
+ bool success) {
grpc_completion_queue *cc = arg;
GRPC_CQ_INTERNAL_UNREF(cc, "pollset_destroy");
}
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index a60e9d20da..705996cad3 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -78,8 +78,8 @@ static void lame_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
} else if (op->recv_trailing_metadata != NULL) {
fill_metadata(elem, op->recv_trailing_metadata);
}
- grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, 0);
- grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, false, NULL);
+ grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, false, NULL);
}
static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index 552a570713..dd1441ad67 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -128,14 +128,14 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg,
}
static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
- int success) {
+ bool success) {
connector *c = arg;
grpc_security_connector_do_handshake(exec_ctx, &c->security_connector->base,
c->connecting_endpoint,
on_secure_handshake_done, c);
}
-static void connected(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+static void connected(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
connector *c = arg;
grpc_closure *notify;
grpc_endpoint *tcp = c->newly_connecting_endpoint;
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 0928f1e045..42cffccb4c 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -260,7 +260,7 @@ struct shutdown_cleanup_args {
};
static void shutdown_cleanup(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_status_ignored) {
+ bool iomgr_status_ignored) {
struct shutdown_cleanup_args *a = arg;
gpr_slice_unref(a->slice);
gpr_free(a);
@@ -313,7 +313,7 @@ static void request_matcher_destroy(request_matcher *rm) {
gpr_stack_lockfree_destroy(rm->requests);
}
-static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, int success) {
+static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, bool success) {
grpc_call_destroy(grpc_call_from_top_element(elem));
}
@@ -328,7 +328,7 @@ static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx,
grpc_closure_init(
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
- grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL);
}
}
@@ -392,7 +392,7 @@ static void orphan_channel(channel_data *chand) {
}
static void finish_destroy_channel(grpc_exec_ctx *exec_ctx, void *cd,
- int success) {
+ bool success) {
channel_data *chand = cd;
grpc_server *server = chand->server;
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->channel, "server");
@@ -407,7 +407,8 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand) {
maybe_finish_shutdown(exec_ctx, chand->server);
chand->finish_destroy_channel_closure.cb = finish_destroy_channel;
chand->finish_destroy_channel_closure.cb_arg = chand;
- grpc_exec_ctx_enqueue(exec_ctx, &chand->finish_destroy_channel_closure, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, &chand->finish_destroy_channel_closure, true,
+ NULL);
}
static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server,
@@ -420,7 +421,7 @@ static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server,
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
- grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL);
return;
}
@@ -569,7 +570,7 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
}
static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr,
- int success) {
+ bool success) {
grpc_call_element *elem = ptr;
call_data *calld = elem->call_data;
gpr_timespec op_deadline;
@@ -609,7 +610,7 @@ static void server_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
}
static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr,
- int success) {
+ bool success) {
grpc_call_element *elem = ptr;
call_data *calld = elem->call_data;
if (success) {
@@ -620,7 +621,7 @@ static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr,
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
- grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true, NULL);
} else if (calld->state == PENDING) {
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
@@ -653,7 +654,7 @@ static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd,
}
static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd,
- int iomgr_status_ignored) {
+ bool iomgr_status_ignored) {
channel_data *chand = cd;
grpc_server *server = chand->server;
if (chand->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) {
@@ -985,7 +986,7 @@ void done_published_shutdown(grpc_exec_ctx *exec_ctx, void *done_arg,
}
static void listener_destroy_done(grpc_exec_ctx *exec_ctx, void *s,
- int success) {
+ bool success) {
grpc_server *server = s;
gpr_mu_lock(&server->mu_global);
server->listeners_destroyed++;
@@ -1140,7 +1141,8 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
grpc_closure_init(
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
- grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, true,
+ NULL);
} else {
GPR_ASSERT(calld->state == PENDING);
calld->state = ACTIVATED;
@@ -1229,7 +1231,7 @@ done:
}
static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx,
- void *user_data, int success);
+ void *user_data, bool success);
static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
gpr_slice slice = value->slice;
@@ -1315,7 +1317,7 @@ static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
}
static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx, void *prc,
- int success) {
+ bool success) {
requested_call *rc = prc;
grpc_call *call = *rc->call;
grpc_call_element *elem =
diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c
index 6e21d2dcd7..ce970dfe73 100644
--- a/src/core/surface/server_chttp2.c
+++ b/src/core/surface/server_chttp2.c
@@ -82,7 +82,7 @@ static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp,
grpc_closure *destroy_done) {
grpc_tcp_server *tcp = tcpp;
grpc_tcp_server_unref(exec_ctx, tcp);
- grpc_exec_ctx_enqueue(exec_ctx, destroy_done, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, destroy_done, true, NULL);
}
int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) {
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index a8262b7af2..6823f8f551 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -488,7 +488,7 @@ void grpc_chttp2_perform_writes(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing,
grpc_endpoint *endpoint);
void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx,
- void *transport_writing, int success);
+ void *transport_writing, bool success);
void grpc_chttp2_cleanup_writing(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *global,
grpc_chttp2_transport_writing *writing);
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index fdad05b5fb..f78adb9e26 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -188,7 +188,7 @@ void grpc_chttp2_perform_writes(
grpc_endpoint_write(exec_ctx, endpoint, &transport_writing->outbuf,
&transport_writing->done_cb);
} else {
- grpc_exec_ctx_enqueue(exec_ctx, &transport_writing->done_cb, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, &transport_writing->done_cb, true, NULL);
}
}
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 05b25fd8b0..9298573c7f 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -86,14 +86,14 @@ static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t);
/* forward declarations of various callbacks that we'll build closures around */
static void writing_action(grpc_exec_ctx *exec_ctx, void *t,
- int iomgr_success_ignored);
+ bool iomgr_success_ignored);
/** Set a transport level setting, and push it to our peer */
static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
uint32_t value);
/** Endpoint callback to process incoming data */
-static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, int success);
+static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, bool success);
/** Start disconnection chain */
static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t);
@@ -183,7 +183,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx,
and maybe they hold resources that need to be freed */
while (t->global.pings.next != &t->global.pings) {
grpc_chttp2_outstanding_ping *ping = t->global.pings.next;
- grpc_exec_ctx_enqueue(exec_ctx, ping->on_recv, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, ping->on_recv, false, NULL);
ping->next->prev = ping->prev;
ping->prev->next = ping->next;
gpr_free(ping);
@@ -602,7 +602,7 @@ static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
t->parsing_active)) {
t->writing_active = 1;
REF_TRANSPORT(t, "writing");
- grpc_exec_ctx_enqueue(exec_ctx, &t->writing_action, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, &t->writing_action, true, NULL);
prevent_endpoint_shutdown(t);
}
check_read_ops(exec_ctx, &t->global);
@@ -631,7 +631,7 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
}
void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx,
- void *transport_writing_ptr, int success) {
+ void *transport_writing_ptr, bool success) {
grpc_chttp2_transport_writing *transport_writing = transport_writing_ptr;
grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing);
grpc_chttp2_stream_global *stream_global;
@@ -669,7 +669,7 @@ void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx,
}
static void writing_action(grpc_exec_ctx *exec_ctx, void *gt,
- int iomgr_success_ignored) {
+ bool iomgr_success_ignored) {
grpc_chttp2_transport *t = gt;
GPR_TIMER_BEGIN("writing_action", 0);
grpc_chttp2_perform_writes(exec_ctx, &t->writing, t->ep);
@@ -759,7 +759,7 @@ void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
closure->final_data |= 1;
}
if (closure->final_data < 2) {
- grpc_exec_ctx_enqueue(exec_ctx, closure, closure->final_data == 0);
+ grpc_exec_ctx_enqueue(exec_ctx, closure, closure->final_data == 0, NULL);
}
*pclosure = NULL;
}
@@ -777,7 +777,7 @@ static int contains_non_ok_status(
return 0;
}
-static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, int success) {}
+static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, bool success) {}
static void perform_stream_op_locked(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
@@ -934,7 +934,7 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx,
for (ping = transport_global->pings.next; ping != &transport_global->pings;
ping = ping->next) {
if (0 == memcmp(opaque_8bytes, ping->id, 8)) {
- grpc_exec_ctx_enqueue(exec_ctx, ping->on_recv, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, ping->on_recv, true, NULL);
ping->next->prev = ping->prev;
ping->prev->next = ping->next;
gpr_free(ping);
@@ -951,7 +951,7 @@ static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
lock(t);
- grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL);
if (op->on_connectivity_state_change != NULL) {
grpc_connectivity_state_notify_on_state_change(
@@ -1022,11 +1022,13 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
*stream_global->recv_message = grpc_chttp2_incoming_frame_queue_pop(
&stream_global->incoming_frames);
GPR_ASSERT(*stream_global->recv_message != NULL);
- grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_message_ready, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_message_ready, true,
+ NULL);
stream_global->recv_message_ready = NULL;
} else if (stream_global->published_trailing_metadata) {
*stream_global->recv_message = NULL;
- grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_message_ready, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_message_ready, true,
+ NULL);
stream_global->recv_message_ready = NULL;
}
}
@@ -1336,7 +1338,7 @@ static void read_error_locked(grpc_exec_ctx *exec_ctx,
}
/* tcp read callback */
-static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, int success) {
+static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, bool success) {
size_t i;
int keep_reading = 0;
grpc_chttp2_transport *t = tp;
@@ -1523,7 +1525,7 @@ static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
unlock(exec_ctx, bs->transport);
return 1;
} else if (bs->failed) {
- grpc_exec_ctx_enqueue(exec_ctx, on_complete, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, on_complete, false, NULL);
unlock(exec_ctx, bs->transport);
return 0;
} else {
@@ -1552,7 +1554,7 @@ void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&bs->transport->mu);
if (bs->on_next != NULL) {
*bs->next = slice;
- grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, true, NULL);
bs->on_next = NULL;
} else {
gpr_slice_buffer_add(&bs->slices, slice);
@@ -1567,7 +1569,7 @@ void grpc_chttp2_incoming_byte_stream_finished(
if (from_parsing_thread) {
gpr_mu_lock(&bs->transport->mu);
}
- grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, false, NULL);
bs->on_next = NULL;
bs->failed = 1;
if (from_parsing_thread) {
diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c
index 3c3fd4671d..87765b9799 100644
--- a/src/core/transport/connectivity_state.c
+++ b/src/core/transport/connectivity_state.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -78,7 +78,7 @@ void grpc_connectivity_state_destroy(grpc_exec_ctx *exec_ctx,
} else {
success = 0;
}
- grpc_exec_ctx_enqueue(exec_ctx, w->notify, success);
+ grpc_exec_ctx_enqueue(exec_ctx, w->notify, success, NULL);
gpr_free(w);
}
gpr_free(tracker->name);
@@ -109,7 +109,7 @@ int grpc_connectivity_state_notify_on_state_change(
if (current == NULL) {
grpc_connectivity_state_watcher *w = tracker->watchers;
if (w != NULL && w->notify == notify) {
- grpc_exec_ctx_enqueue(exec_ctx, notify, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, notify, false, NULL);
tracker->watchers = w->next;
gpr_free(w);
return 0;
@@ -117,7 +117,7 @@ int grpc_connectivity_state_notify_on_state_change(
while (w != NULL) {
grpc_connectivity_state_watcher *rm_candidate = w->next;
if (rm_candidate != NULL && rm_candidate->notify == notify) {
- grpc_exec_ctx_enqueue(exec_ctx, notify, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, notify, false, NULL);
w->next = w->next->next;
gpr_free(rm_candidate);
return 0;
@@ -128,7 +128,7 @@ int grpc_connectivity_state_notify_on_state_change(
} else {
if (tracker->current_state != *current) {
*current = tracker->current_state;
- grpc_exec_ctx_enqueue(exec_ctx, notify, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, notify, true, NULL);
} else {
grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w));
w->current = current;
@@ -158,7 +158,7 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx,
while ((w = tracker->watchers) != NULL) {
*w->current = tracker->current_state;
tracker->watchers = w->next;
- grpc_exec_ctx_enqueue(exec_ctx, w->notify, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, w->notify, true, NULL);
gpr_free(w);
}
}
diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c
index 2ab978be46..08d685668c 100644
--- a/src/core/transport/transport.c
+++ b/src/core/transport/transport.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -59,7 +59,7 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx,
grpc_stream_refcount *refcount) {
#endif
if (gpr_unref(&refcount->refs)) {
- grpc_exec_ctx_enqueue(exec_ctx, &refcount->destroy, 1);
+ grpc_exec_ctx_enqueue(exec_ctx, &refcount->destroy, true, NULL);
}
}
@@ -125,8 +125,8 @@ char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,
void grpc_transport_stream_op_finish_with_failure(
grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op) {
- grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, 0);
- grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, 0);
+ grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, false, NULL);
+ grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, false, NULL);
}
void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op,
@@ -150,7 +150,7 @@ typedef struct {
grpc_closure closure;
} close_message_data;
-static void free_message(grpc_exec_ctx *exec_ctx, void *p, int iomgr_success) {
+static void free_message(grpc_exec_ctx *exec_ctx, void *p, bool iomgr_success) {
close_message_data *cmd = p;
gpr_slice_unref(cmd->message);
if (cmd->then_call != NULL) {
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index f94f0ae76e..f5cac77adc 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -81,16 +81,29 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount);
/* Transport stream op: a set of operations to perform on a transport
against a single stream */
typedef struct grpc_transport_stream_op {
+ /** Send initial metadata to the peer, from the provided metadata batch. */
grpc_metadata_batch *send_initial_metadata;
+
+ /** Send trailing metadata to the peer, from the provided metadata batch. */
grpc_metadata_batch *send_trailing_metadata;
+ /** Send message data to the peer, from the provided byte stream. */
grpc_byte_stream *send_message;
+ /** Receive initial metadata from the stream, into provided metadata batch. */
grpc_metadata_batch *recv_initial_metadata;
+
+ /** Receive message data from the stream, into provided byte stream. */
grpc_byte_stream **recv_message;
+ /** Should be enqueued when one message is ready to be processed. */
grpc_closure *recv_message_ready;
+
+ /** Receive trailing metadata from the stream, into provided metadata batch.
+ */
grpc_metadata_batch *recv_trailing_metadata;
+ /** Should be enqueued when all requested operations (excluding recv_message
+ which has its own closure) in a given batch have been completed. */
grpc_closure *on_complete;
/** If != GRPC_STATUS_OK, cancel this stream */