aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-09-18 07:44:19 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-09-18 07:44:19 -0700
commit33825118df7157219cec15382beb006d3462ad96 (patch)
tree649a40da98f56a875ed6558e474dd6c61ce2c7be /src/core
parent000cd8f9f7346defc79fe6aa877af11b42ab5f1e (diff)
Cleanup
Diffstat (limited to 'src/core')
-rw-r--r--src/core/census/grpc_filter.c4
-rw-r--r--src/core/channel/client_channel.c79
-rw-r--r--src/core/channel/client_channel.h5
-rw-r--r--src/core/channel/http_client_filter.c6
-rw-r--r--src/core/channel/http_server_filter.c6
-rw-r--r--src/core/client_config/connector.c2
-rw-r--r--src/core/client_config/connector.h4
-rw-r--r--src/core/client_config/lb_policies/pick_first.c33
-rw-r--r--src/core/client_config/lb_policy.c17
-rw-r--r--src/core/client_config/lb_policy.h35
-rw-r--r--src/core/client_config/resolver.c2
-rw-r--r--src/core/client_config/resolver.h4
-rw-r--r--src/core/client_config/resolvers/dns_resolver.c18
-rw-r--r--src/core/client_config/resolvers/sockaddr_resolver.c19
-rw-r--r--src/core/client_config/resolvers/zookeeper_resolver.c21
-rw-r--r--src/core/client_config/subchannel.c49
-rw-r--r--src/core/client_config/subchannel.h8
-rw-r--r--src/core/httpcli/httpcli.c8
-rw-r--r--src/core/iomgr/endpoint.c4
-rw-r--r--src/core/iomgr/endpoint.h8
-rw-r--r--src/core/iomgr/fd_posix.c21
-rw-r--r--src/core/iomgr/fd_posix.h11
-rw-r--r--src/core/iomgr/iomgr.c19
-rw-r--r--src/core/iomgr/iomgr.h31
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c4
-rw-r--r--src/core/iomgr/pollset_posix.c16
-rw-r--r--src/core/iomgr/pollset_posix.h10
-rw-r--r--src/core/iomgr/socket_windows.h2
-rw-r--r--src/core/iomgr/tcp_client_posix.c2
-rw-r--r--src/core/iomgr/tcp_posix.c16
-rw-r--r--src/core/iomgr/tcp_server_posix.c4
-rw-r--r--src/core/iomgr/tcp_windows.c12
-rw-r--r--src/core/iomgr/udp_server.c4
-rw-r--r--src/core/iomgr/workqueue.h4
-rw-r--r--src/core/iomgr/workqueue_posix.c97
-rw-r--r--src/core/iomgr/workqueue_posix.h5
-rw-r--r--src/core/security/secure_endpoint.c12
-rw-r--r--src/core/security/secure_transport_setup.c12
-rw-r--r--src/core/security/server_auth_filter.c6
-rw-r--r--src/core/surface/call.c19
-rw-r--r--src/core/surface/channel_connectivity.c14
-rw-r--r--src/core/surface/channel_create.c6
-rw-r--r--src/core/surface/secure_channel_create.c8
-rw-r--r--src/core/surface/server.c32
-rw-r--r--src/core/transport/chttp2/internal.h14
-rw-r--r--src/core/transport/chttp2/writing.c4
-rw-r--r--src/core/transport/chttp2_transport.c36
-rw-r--r--src/core/transport/connectivity_state.c8
-rw-r--r--src/core/transport/connectivity_state.h7
-rw-r--r--src/core/transport/transport.c6
-rw-r--r--src/core/transport/transport.h14
51 files changed, 350 insertions, 438 deletions
diff --git a/src/core/census/grpc_filter.c b/src/core/census/grpc_filter.c
index 8b6ba1d472..d98162ec9a 100644
--- a/src/core/census/grpc_filter.c
+++ b/src/core/census/grpc_filter.c
@@ -54,7 +54,7 @@ typedef struct call_data {
/* recv callback */
grpc_stream_op_buffer* recv_ops;
- grpc_iomgr_closure* on_done_recv;
+ grpc_closure* on_done_recv;
} call_data;
typedef struct channel_data {
@@ -145,7 +145,7 @@ static void server_init_call_elem(grpc_call_element* elem,
GPR_ASSERT(d != NULL);
d->start_ts = gpr_now(GPR_CLOCK_REALTIME);
/* TODO(hongyu): call census_tracing_start_op here. */
- grpc_iomgr_closure_init(d->on_done_recv, server_on_done_recv, elem);
+ grpc_closure_init(d->on_done_recv, server_on_done_recv, elem);
if (initial_op) server_mutate_op(elem, initial_op);
}
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index a2346503d9..588b9c36ee 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -73,9 +73,9 @@ typedef struct {
guarded by mu_config */
grpc_client_config *incoming_configuration;
/** a list of closures that are all waiting for config to come in */
- grpc_iomgr_call_list waiting_for_config_closures;
+ grpc_call_list waiting_for_config_closures;
/** resolver callback */
- grpc_iomgr_closure on_config_changed;
+ grpc_closure on_config_changed;
/** connectivity state being tracked */
grpc_connectivity_state_tracker state_tracker;
/** when an lb_policy arrives, should we try to exit idle */
@@ -91,7 +91,7 @@ typedef struct {
update the channel, and create a new watcher */
typedef struct {
channel_data *chand;
- grpc_iomgr_closure on_changed;
+ grpc_closure on_changed;
grpc_connectivity_state state;
grpc_lb_policy *lb_policy;
} lb_policy_connectivity_watcher;
@@ -115,7 +115,7 @@ struct call_data {
call_state state;
gpr_timespec deadline;
grpc_subchannel *picked_channel;
- grpc_iomgr_closure async_setup_task;
+ grpc_closure async_setup_task;
grpc_transport_stream_op waiting_op;
/* our child call stack */
grpc_subchannel_call *subchannel_call;
@@ -123,9 +123,9 @@ struct call_data {
grpc_linked_mdelem details;
};
-static grpc_iomgr_closure *merge_into_waiting_op(
- grpc_call_element *elem,
- grpc_transport_stream_op *new_op) GRPC_MUST_USE_RESULT;
+static grpc_closure *merge_into_waiting_op(grpc_call_element *elem,
+ grpc_transport_stream_op *new_op)
+ GRPC_MUST_USE_RESULT;
static void handle_op_after_cancellation(grpc_call_element *elem,
grpc_transport_stream_op *op) {
@@ -160,7 +160,7 @@ static void handle_op_after_cancellation(grpc_call_element *elem,
}
typedef struct {
- grpc_iomgr_closure closure;
+ grpc_closure closure;
grpc_call_element *elem;
} waiting_call;
@@ -179,10 +179,9 @@ static void add_to_lb_policy_wait_queue_locked_state_config(
grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
waiting_call *wc = gpr_malloc(sizeof(*wc));
- grpc_iomgr_closure_init(&wc->closure, continue_with_pick, wc);
+ grpc_closure_init(&wc->closure, continue_with_pick, wc);
wc->elem = elem;
- grpc_iomgr_call_list_add(&chand->waiting_for_config_closures, &wc->closure,
- 1);
+ grpc_call_list_add(&chand->waiting_for_config_closures, &wc->closure, 1);
}
static int is_empty(void *p, int len) {
@@ -230,7 +229,7 @@ static void started_call(void *arg, int iomgr_success) {
static void picked_target(void *arg, int iomgr_success) {
call_data *calld = arg;
grpc_pollset *pollset;
- grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
if (calld->picked_channel == NULL) {
/* treat this like a cancellation */
@@ -246,19 +245,19 @@ static void picked_target(void *arg, int iomgr_success) {
calld->state = CALL_WAITING_FOR_CALL;
pollset = calld->waiting_op.bind_pollset;
gpr_mu_unlock(&calld->mu_state);
- grpc_iomgr_closure_init(&calld->async_setup_task, started_call, calld);
+ grpc_closure_init(&calld->async_setup_task, started_call, calld);
grpc_subchannel_create_call(calld->picked_channel, pollset,
&calld->subchannel_call,
&calld->async_setup_task, &call_list);
}
}
- grpc_iomgr_call_list_run(call_list);
+ grpc_call_list_run(call_list);
}
-static grpc_iomgr_closure *merge_into_waiting_op(
- grpc_call_element *elem, grpc_transport_stream_op *new_op) {
+static grpc_closure *merge_into_waiting_op(grpc_call_element *elem,
+ grpc_transport_stream_op *new_op) {
call_data *calld = elem->call_data;
- grpc_iomgr_closure *consumed_op = NULL;
+ grpc_closure *consumed_op = NULL;
grpc_transport_stream_op *waiting_op = &calld->waiting_op;
GPR_ASSERT((waiting_op->send_ops != NULL) + (new_op->send_ops != NULL) <= 1);
GPR_ASSERT((waiting_op->recv_ops != NULL) + (new_op->recv_ops != NULL) <= 1);
@@ -312,7 +311,7 @@ static void perform_transport_stream_op(grpc_call_element *elem,
grpc_subchannel_call *subchannel_call;
grpc_lb_policy *lb_policy;
grpc_transport_stream_op op2;
- grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
@@ -330,7 +329,7 @@ static void perform_transport_stream_op(grpc_call_element *elem,
break;
case CALL_WAITING_FOR_SEND:
GPR_ASSERT(!continuation);
- grpc_iomgr_call_list_add(&call_list, merge_into_waiting_op(elem, op), 1);
+ grpc_call_list_add(&call_list, merge_into_waiting_op(elem, op), 1);
if (!calld->waiting_op.send_ops &&
calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) {
gpr_mu_unlock(&calld->mu_state);
@@ -359,8 +358,7 @@ static void perform_transport_stream_op(grpc_call_element *elem,
handle_op_after_cancellation(elem, op);
handle_op_after_cancellation(elem, &op2);
} else {
- grpc_iomgr_call_list_add(&call_list, merge_into_waiting_op(elem, op),
- 1);
+ grpc_call_list_add(&call_list, merge_into_waiting_op(elem, op), 1);
gpr_mu_unlock(&calld->mu_state);
}
break;
@@ -397,8 +395,7 @@ static void perform_transport_stream_op(grpc_call_element *elem,
GPR_ASSERT(op->send_ops->ops[0].type == GRPC_OP_METADATA);
gpr_mu_unlock(&calld->mu_state);
- grpc_iomgr_closure_init(&calld->async_setup_task, picked_target,
- calld);
+ grpc_closure_init(&calld->async_setup_task, picked_target, calld);
grpc_lb_policy_pick(lb_policy, bind_pollset, initial_metadata,
&calld->picked_channel,
&calld->async_setup_task, &call_list);
@@ -427,7 +424,7 @@ static void perform_transport_stream_op(grpc_call_element *elem,
break;
}
- grpc_iomgr_call_list_run(call_list);
+ grpc_call_list_run(call_list);
}
static void cc_start_transport_stream_op(grpc_call_element *elem,
@@ -437,10 +434,10 @@ static void cc_start_transport_stream_op(grpc_call_element *elem,
static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy,
grpc_connectivity_state current_state,
- grpc_iomgr_call_list *cl);
+ grpc_call_list *cl);
static void on_lb_policy_state_changed_locked(lb_policy_connectivity_watcher *w,
- grpc_iomgr_call_list *cl) {
+ grpc_call_list *cl) {
/* check if the notification is for a stale policy */
if (w->lb_policy != w->chand->lb_policy) return;
@@ -453,13 +450,13 @@ static void on_lb_policy_state_changed_locked(lb_policy_connectivity_watcher *w,
static void on_lb_policy_state_changed(void *arg, int iomgr_success) {
lb_policy_connectivity_watcher *w = arg;
- grpc_iomgr_call_list cl = GRPC_IOMGR_CALL_LIST_INIT;
+ grpc_call_list cl = GRPC_CALL_LIST_INIT;
gpr_mu_lock(&w->chand->mu_config);
on_lb_policy_state_changed_locked(w, &cl);
gpr_mu_unlock(&w->chand->mu_config);
- grpc_iomgr_call_list_run(cl);
+ grpc_call_list_run(cl);
GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy");
gpr_free(w);
@@ -467,12 +464,12 @@ static void on_lb_policy_state_changed(void *arg, int iomgr_success) {
static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy,
grpc_connectivity_state current_state,
- grpc_iomgr_call_list *call_list) {
+ grpc_call_list *call_list) {
lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy");
w->chand = chand;
- grpc_iomgr_closure_init(&w->on_changed, on_lb_policy_state_changed, w);
+ grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w);
w->state = current_state;
w->lb_policy = lb_policy;
grpc_lb_policy_notify_on_state_change(lb_policy, &w->state, &w->on_changed,
@@ -485,7 +482,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
grpc_lb_policy *old_lb_policy;
grpc_resolver *old_resolver;
grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
- grpc_iomgr_call_list cl = GRPC_IOMGR_CALL_LIST_INIT;
+ grpc_call_list cl = GRPC_CALL_LIST_INIT;
int exit_idle = 0;
if (chand->incoming_configuration != NULL) {
@@ -505,7 +502,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
old_lb_policy = chand->lb_policy;
chand->lb_policy = lb_policy;
if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) {
- grpc_iomgr_call_list_move(&chand->waiting_for_config_closures, &cl);
+ grpc_call_list_move(&chand->waiting_for_config_closures, &cl);
}
if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
@@ -553,7 +550,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
GRPC_LB_POLICY_UNREF(lb_policy, "config_change");
}
- grpc_iomgr_call_list_run(cl);
+ grpc_call_list_run(cl);
GRPC_CHANNEL_INTERNAL_UNREF(chand->master, "resolver");
}
@@ -562,10 +559,10 @@ static void cc_start_transport_op(grpc_channel_element *elem,
grpc_lb_policy *lb_policy = NULL;
channel_data *chand = elem->channel_data;
grpc_resolver *destroy_resolver = NULL;
- grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
if (op->on_consumed) {
- grpc_iomgr_call_list_add(&call_list, op->on_consumed, 1);
+ grpc_call_list_add(&call_list, op->on_consumed, 1);
op->on_consumed = NULL;
}
@@ -612,7 +609,7 @@ static void cc_start_transport_op(grpc_channel_element *elem,
GRPC_LB_POLICY_UNREF(lb_policy, "broadcast");
}
- grpc_iomgr_call_list_run(call_list);
+ grpc_call_list_run(call_list);
}
/* Constructor for call_data */
@@ -677,8 +674,7 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
chand->mdctx = metadata_context;
chand->master = master;
grpc_pollset_set_init(&chand->pollset_set);
- grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed,
- chand);
+ grpc_closure_init(&chand->on_config_changed, cc_on_config_changed, chand);
grpc_connectivity_state_init(&chand->state_tracker,
GRPC_CHANNEL_IDLE, "client_channel");
@@ -722,7 +718,7 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
GPR_ASSERT(!chand->resolver);
chand->resolver = resolver;
GRPC_RESOLVER_REF(resolver, "channel");
- if (!grpc_iomgr_call_list_empty(chand->waiting_for_config_closures) ||
+ if (!grpc_call_list_empty(chand->waiting_for_config_closures) ||
chand->exit_idle_when_lb_policy_arrives) {
chand->started_resolving = 1;
GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
@@ -733,8 +729,7 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
}
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
- grpc_channel_element *elem, int try_to_connect,
- grpc_iomgr_call_list *call_list) {
+ grpc_channel_element *elem, int try_to_connect, grpc_call_list *call_list) {
channel_data *chand = elem->channel_data;
grpc_connectivity_state out;
gpr_mu_lock(&chand->mu_config);
@@ -758,7 +753,7 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state(
void grpc_client_channel_watch_connectivity_state(
grpc_channel_element *elem, grpc_connectivity_state *state,
- grpc_iomgr_closure *on_complete, grpc_iomgr_call_list *call_list) {
+ grpc_closure *on_complete, grpc_call_list *call_list) {
channel_data *chand = elem->channel_data;
gpr_mu_lock(&chand->mu_config);
grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, state,
diff --git a/src/core/channel/client_channel.h b/src/core/channel/client_channel.h
index 60090e8d7e..bd84dad6b7 100644
--- a/src/core/channel/client_channel.h
+++ b/src/core/channel/client_channel.h
@@ -53,12 +53,11 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
grpc_resolver *resolver);
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
- grpc_channel_element *elem, int try_to_connect,
- grpc_iomgr_call_list *call_list);
+ grpc_channel_element *elem, int try_to_connect, grpc_call_list *call_list);
void grpc_client_channel_watch_connectivity_state(
grpc_channel_element *elem, grpc_connectivity_state *state,
- grpc_iomgr_closure *on_complete, grpc_iomgr_call_list *call_list);
+ grpc_closure *on_complete, grpc_call_list *call_list);
grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(
grpc_channel_element *elem);
diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c
index ec832a0367..bcaaca0f7d 100644
--- a/src/core/channel/http_client_filter.c
+++ b/src/core/channel/http_client_filter.c
@@ -50,11 +50,11 @@ typedef struct call_data {
grpc_stream_op_buffer *recv_ops;
/** Closure to call when finished with the hc_on_recv hook */
- grpc_iomgr_closure *on_done_recv;
+ grpc_closure *on_done_recv;
/** Receive closures are chained: we inject this closure as the on_done_recv
up-call on transport_op, and remember to call our on_done_recv member
after handling it. */
- grpc_iomgr_closure hc_on_recv;
+ grpc_closure hc_on_recv;
} call_data;
typedef struct channel_data {
@@ -162,7 +162,7 @@ static void init_call_elem(grpc_call_element *elem,
calld->sent_initial_metadata = 0;
calld->got_initial_metadata = 0;
calld->on_done_recv = NULL;
- grpc_iomgr_closure_init(&calld->hc_on_recv, hc_on_recv, elem);
+ grpc_closure_init(&calld->hc_on_recv, hc_on_recv, elem);
if (initial_op) hc_mutate_op(elem, initial_op);
}
diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c
index 2f061946a1..7a9f17d057 100644
--- a/src/core/channel/http_server_filter.c
+++ b/src/core/channel/http_server_filter.c
@@ -50,11 +50,11 @@ typedef struct call_data {
grpc_stream_op_buffer *recv_ops;
/** Closure to call when finished with the hs_on_recv hook */
- grpc_iomgr_closure *on_done_recv;
+ grpc_closure *on_done_recv;
/** Receive closures are chained: we inject this closure as the on_done_recv
up-call on transport_op, and remember to call our on_done_recv member
after handling it. */
- grpc_iomgr_closure hs_on_recv;
+ grpc_closure hs_on_recv;
} call_data;
typedef struct channel_data {
@@ -232,7 +232,7 @@ static void init_call_elem(grpc_call_element *elem,
call_data *calld = elem->call_data;
/* initialize members */
memset(calld, 0, sizeof(*calld));
- grpc_iomgr_closure_init(&calld->hs_on_recv, hs_on_recv, elem);
+ grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem);
if (initial_op) hs_mutate_op(elem, initial_op);
}
diff --git a/src/core/client_config/connector.c b/src/core/client_config/connector.c
index c1e583e4a5..6252d57271 100644
--- a/src/core/client_config/connector.c
+++ b/src/core/client_config/connector.c
@@ -44,7 +44,7 @@ void grpc_connector_unref(grpc_connector *connector) {
void grpc_connector_connect(grpc_connector *connector,
const grpc_connect_in_args *in_args,
grpc_connect_out_args *out_args,
- grpc_iomgr_closure *notify) {
+ grpc_closure *notify) {
connector->vtable->connect(connector, in_args, out_args, notify);
}
diff --git a/src/core/client_config/connector.h b/src/core/client_config/connector.h
index ad0b567553..fc1f6708de 100644
--- a/src/core/client_config/connector.h
+++ b/src/core/client_config/connector.h
@@ -77,7 +77,7 @@ struct grpc_connector_vtable {
/** Implementation of grpc_connector_connect */
void (*connect)(grpc_connector *connector,
const grpc_connect_in_args *in_args,
- grpc_connect_out_args *out_args, grpc_iomgr_closure *notify);
+ grpc_connect_out_args *out_args, grpc_closure *notify);
};
void grpc_connector_ref(grpc_connector *connector);
@@ -86,7 +86,7 @@ void grpc_connector_unref(grpc_connector *connector);
void grpc_connector_connect(grpc_connector *connector,
const grpc_connect_in_args *in_args,
grpc_connect_out_args *out_args,
- grpc_iomgr_closure *notify);
+ grpc_closure *notify);
/** Cancel any pending connection */
void grpc_connector_shutdown(grpc_connector *connector);
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 8fd8dd7b67..852eed310d 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -43,7 +43,7 @@ typedef struct pending_pick {
struct pending_pick *next;
grpc_pollset *pollset;
grpc_subchannel **target;
- grpc_iomgr_closure *on_complete;
+ grpc_closure *on_complete;
} pending_pick;
typedef struct {
@@ -55,7 +55,7 @@ typedef struct {
/** workqueue for async work */
grpc_workqueue *workqueue;
- grpc_iomgr_closure connectivity_changed;
+ grpc_closure connectivity_changed;
/** mutex protecting remaining members */
gpr_mu mu;
@@ -108,7 +108,7 @@ void pf_destroy(grpc_lb_policy *pol) {
gpr_free(p);
}
-void pf_shutdown(grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) {
+void pf_shutdown(grpc_lb_policy *pol, grpc_call_list *call_list) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
@@ -122,14 +122,13 @@ void pf_shutdown(grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) {
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
- grpc_iomgr_call_list_add(call_list, pp->on_complete, 1);
+ grpc_call_list_add(call_list, pp->on_complete, 1);
gpr_free(pp);
pp = next;
}
}
-static void start_picking(pick_first_lb_policy *p,
- grpc_iomgr_call_list *call_list) {
+static void start_picking(pick_first_lb_policy *p, grpc_call_list *call_list) {
p->started_picking = 1;
p->checking_subchannel = 0;
p->checking_connectivity = GRPC_CHANNEL_IDLE;
@@ -139,7 +138,7 @@ static void start_picking(pick_first_lb_policy *p,
&p->connectivity_changed, call_list);
}
-void pf_exit_idle(grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) {
+void pf_exit_idle(grpc_lb_policy *pol, grpc_call_list *call_list) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
gpr_mu_lock(&p->mu);
if (!p->started_picking) {
@@ -150,7 +149,7 @@ void pf_exit_idle(grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) {
void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
- grpc_iomgr_closure *on_complete, grpc_iomgr_call_list *call_list) {
+ grpc_closure *on_complete, grpc_call_list *call_list) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
@@ -178,7 +177,7 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
pick_first_lb_policy *p = arg;
pending_pick *pp;
int unref = 0;
- grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
gpr_mu_lock(&p->mu);
@@ -205,7 +204,7 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
p->pending_picks = pp->next;
*pp->target = p->selected;
grpc_subchannel_del_interested_party(p->selected, pp->pollset);
- grpc_iomgr_call_list_add(&call_list, pp->on_complete, 1);
+ grpc_call_list_add(&call_list, pp->on_complete, 1);
gpr_free(pp);
}
grpc_subchannel_notify_on_state_change(
@@ -251,7 +250,7 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_iomgr_call_list_add(&call_list, pp->on_complete, 1);
+ grpc_call_list_add(&call_list, pp->on_complete, 1);
gpr_free(pp);
}
unref = 1;
@@ -270,7 +269,7 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
gpr_mu_unlock(&p->mu);
- grpc_iomgr_call_list_run(call_list);
+ grpc_call_list_run(call_list);
if (unref) {
GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity");
@@ -278,7 +277,7 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
}
static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op,
- grpc_iomgr_call_list *call_list) {
+ grpc_call_list *call_list) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
size_t i;
size_t n;
@@ -301,7 +300,7 @@ static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op,
}
static grpc_connectivity_state pf_check_connectivity(
- grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) {
+ grpc_lb_policy *pol, grpc_call_list *call_list) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
grpc_connectivity_state st;
gpr_mu_lock(&p->mu);
@@ -312,8 +311,8 @@ static grpc_connectivity_state pf_check_connectivity(
void pf_notify_on_state_change(grpc_lb_policy *pol,
grpc_connectivity_state *current,
- grpc_iomgr_closure *notify,
- grpc_iomgr_call_list *call_list) {
+ grpc_closure *notify,
+ grpc_call_list *call_list) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
gpr_mu_lock(&p->mu);
grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current,
@@ -348,7 +347,7 @@ static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory,
"pick_first");
memcpy(p->subchannels, args->subchannels,
sizeof(grpc_subchannel *) * args->num_subchannels);
- grpc_iomgr_closure_init(&p->connectivity_changed, pf_connectivity_changed, p);
+ grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed, p);
gpr_mu_init(&p->mu);
return &p->base;
}
diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c
index a9dc9dcca8..95ab29cd88 100644
--- a/src/core/client_config/lb_policy.c
+++ b/src/core/client_config/lb_policy.c
@@ -64,37 +64,36 @@ void grpc_lb_policy_unref(grpc_lb_policy *policy) {
}
void grpc_lb_policy_shutdown(grpc_lb_policy *policy,
- grpc_iomgr_call_list *call_list) {
+ grpc_call_list *call_list) {
policy->vtable->shutdown(policy, call_list);
}
void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata,
- grpc_subchannel **target,
- grpc_iomgr_closure *on_complete,
- grpc_iomgr_call_list *call_list) {
+ grpc_subchannel **target, grpc_closure *on_complete,
+ grpc_call_list *call_list) {
policy->vtable->pick(policy, pollset, initial_metadata, target, on_complete,
call_list);
}
void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op,
- grpc_iomgr_call_list *call_list) {
+ grpc_call_list *call_list) {
policy->vtable->broadcast(policy, op, call_list);
}
void grpc_lb_policy_exit_idle(grpc_lb_policy *policy,
- grpc_iomgr_call_list *call_list) {
+ grpc_call_list *call_list) {
policy->vtable->exit_idle(policy, call_list);
}
void grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy,
grpc_connectivity_state *state,
- grpc_iomgr_closure *closure,
- grpc_iomgr_call_list *call_list) {
+ grpc_closure *closure,
+ grpc_call_list *call_list) {
policy->vtable->notify_on_state_change(policy, state, closure, call_list);
}
grpc_connectivity_state grpc_lb_policy_check_connectivity(
- grpc_lb_policy *policy, grpc_iomgr_call_list *call_list) {
+ grpc_lb_policy *policy, grpc_call_list *call_list) {
return policy->vtable->check_connectivity(policy, call_list);
}
diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h
index 8d7eb579b5..e445d2cded 100644
--- a/src/core/client_config/lb_policy.h
+++ b/src/core/client_config/lb_policy.h
@@ -53,31 +53,30 @@ struct grpc_lb_policy {
struct grpc_lb_policy_vtable {
void (*destroy)(grpc_lb_policy *policy);
- void (*shutdown)(grpc_lb_policy *policy, grpc_iomgr_call_list *call_list);
+ void (*shutdown)(grpc_lb_policy *policy, grpc_call_list *call_list);
/** implement grpc_lb_policy_pick */
void (*pick)(grpc_lb_policy *policy, grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
- grpc_iomgr_closure *on_complete,
- grpc_iomgr_call_list *call_list);
+ grpc_closure *on_complete, grpc_call_list *call_list);
/** try to enter a READY connectivity state */
- void (*exit_idle)(grpc_lb_policy *policy, grpc_iomgr_call_list *call_list);
+ void (*exit_idle)(grpc_lb_policy *policy, grpc_call_list *call_list);
/** broadcast a transport op to all subchannels */
void (*broadcast)(grpc_lb_policy *policy, grpc_transport_op *op,
- grpc_iomgr_call_list *call_list);
+ grpc_call_list *call_list);
/** check the current connectivity of the lb_policy */
- grpc_connectivity_state (*check_connectivity)(
- grpc_lb_policy *policy, grpc_iomgr_call_list *call_list);
+ grpc_connectivity_state (*check_connectivity)(grpc_lb_policy *policy,
+ grpc_call_list *call_list);
/** call notify when the connectivity state of a channel changes from *state.
Updates *state with the new state of the policy */
void (*notify_on_state_change)(grpc_lb_policy *policy,
grpc_connectivity_state *state,
- grpc_iomgr_closure *closure,
- grpc_iomgr_call_list *call_list);
+ grpc_closure *closure,
+ grpc_call_list *call_list);
};
#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
@@ -101,8 +100,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy,
const grpc_lb_policy_vtable *vtable);
/** Start shutting down (fail any pending picks) */
-void grpc_lb_policy_shutdown(grpc_lb_policy *policy,
- grpc_iomgr_call_list *call_list);
+void grpc_lb_policy_shutdown(grpc_lb_policy *policy, grpc_call_list *call_list);
/** Given initial metadata in \a initial_metadata, find an appropriate
target for this rpc, and 'return' it by calling \a on_complete after setting
@@ -110,22 +108,21 @@ void grpc_lb_policy_shutdown(grpc_lb_policy *policy,
Picking can be asynchronous. Any IO should be done under \a pollset. */
void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata,
- grpc_subchannel **target,
- grpc_iomgr_closure *on_complete,
- grpc_iomgr_call_list *call_list);
+ grpc_subchannel **target, grpc_closure *on_complete,
+ grpc_call_list *call_list);
void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op,
- grpc_iomgr_call_list *call_list);
+ grpc_call_list *call_list);
void grpc_lb_policy_exit_idle(grpc_lb_policy *policy,
- grpc_iomgr_call_list *call_list);
+ grpc_call_list *call_list);
void grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy,
grpc_connectivity_state *state,
- grpc_iomgr_closure *closure,
- grpc_iomgr_call_list *call_list);
+ grpc_closure *closure,
+ grpc_call_list *call_list);
grpc_connectivity_state grpc_lb_policy_check_connectivity(
- grpc_lb_policy *policy, grpc_iomgr_call_list *call_list);
+ grpc_lb_policy *policy, grpc_call_list *call_list);
#endif /* GRPC_INTERNAL_CORE_CONFIG_LB_POLICY_H */
diff --git a/src/core/client_config/resolver.c b/src/core/client_config/resolver.c
index 91e42bb684..6a7d73c54f 100644
--- a/src/core/client_config/resolver.c
+++ b/src/core/client_config/resolver.c
@@ -78,6 +78,6 @@ void grpc_resolver_channel_saw_error(grpc_resolver *resolver,
void grpc_resolver_next(grpc_resolver *resolver,
grpc_client_config **target_config,
- grpc_iomgr_closure *on_complete) {
+ grpc_closure *on_complete) {
resolver->vtable->next(resolver, target_config, on_complete);
}
diff --git a/src/core/client_config/resolver.h b/src/core/client_config/resolver.h
index 8ad87d789b..217d061abe 100644
--- a/src/core/client_config/resolver.h
+++ b/src/core/client_config/resolver.h
@@ -55,7 +55,7 @@ struct grpc_resolver_vtable {
struct sockaddr *failing_address,
int failing_address_len);
void (*next)(grpc_resolver *resolver, grpc_client_config **target_config,
- grpc_iomgr_closure *on_complete);
+ grpc_closure *on_complete);
};
#ifdef GRPC_RESOLVER_REFCOUNT_DEBUG
@@ -92,6 +92,6 @@ void grpc_resolver_channel_saw_error(grpc_resolver *resolver,
schedule on_complete. */
void grpc_resolver_next(grpc_resolver *resolver,
grpc_client_config **target_config,
- grpc_iomgr_closure *on_complete);
+ grpc_closure *on_complete);
#endif /* GRPC_INTERNAL_CORE_CONFIG_RESOLVER_H */
diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c
index 4fee543a5b..e251664e03 100644
--- a/src/core/client_config/resolvers/dns_resolver.c
+++ b/src/core/client_config/resolvers/dns_resolver.c
@@ -69,7 +69,7 @@ typedef struct {
/** which version of resolved_config is current? */
int resolved_version;
/** pending next completion, or NULL */
- grpc_iomgr_closure *next_completion;
+ grpc_closure *next_completion;
/** target config address for next completion */
grpc_client_config **target_config;
/** current (fully resolved) config */
@@ -79,7 +79,7 @@ typedef struct {
static void dns_destroy(grpc_resolver *r);
static void dns_start_resolving_locked(dns_resolver *r);
-static grpc_iomgr_closure *dns_maybe_finish_next_locked(dns_resolver *r)
+static grpc_closure *dns_maybe_finish_next_locked(dns_resolver *r)
GRPC_MUST_USE_RESULT;
static void dns_shutdown(grpc_resolver *r);
@@ -87,14 +87,14 @@ static void dns_channel_saw_error(grpc_resolver *r,
struct sockaddr *failing_address,
int failing_address_len);
static void dns_next(grpc_resolver *r, grpc_client_config **target_config,
- grpc_iomgr_closure *on_complete);
+ grpc_closure *on_complete);
static const grpc_resolver_vtable dns_resolver_vtable = {
dns_destroy, dns_shutdown, dns_channel_saw_error, dns_next};
static void dns_shutdown(grpc_resolver *resolver) {
dns_resolver *r = (dns_resolver *)resolver;
- grpc_iomgr_closure *next_completion;
+ grpc_closure *next_completion;
gpr_mu_lock(&r->mu);
next_completion = r->next_completion;
r->next_completion = NULL;
@@ -117,9 +117,9 @@ static void dns_channel_saw_error(grpc_resolver *resolver, struct sockaddr *sa,
static void dns_next(grpc_resolver *resolver,
grpc_client_config **target_config,
- grpc_iomgr_closure *on_complete) {
+ grpc_closure *on_complete) {
dns_resolver *r = (dns_resolver *)resolver;
- grpc_iomgr_closure *call = NULL;
+ grpc_closure *call = NULL;
gpr_mu_lock(&r->mu);
GPR_ASSERT(!r->next_completion);
r->next_completion = on_complete;
@@ -141,7 +141,7 @@ static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) {
grpc_subchannel **subchannels;
grpc_subchannel_args args;
grpc_lb_policy *lb_policy;
- grpc_iomgr_closure *call;
+ grpc_closure *call;
size_t i;
if (addresses) {
grpc_lb_policy_args lb_policy_args;
@@ -188,8 +188,8 @@ static void dns_start_resolving_locked(dns_resolver *r) {
grpc_resolve_address(r->name, r->default_port, dns_on_resolved, r);
}
-static grpc_iomgr_closure *dns_maybe_finish_next_locked(dns_resolver *r) {
- grpc_iomgr_closure *ret = NULL;
+static grpc_closure *dns_maybe_finish_next_locked(dns_resolver *r) {
+ grpc_closure *ret = NULL;
if (r->next_completion != NULL &&
r->resolved_version != r->published_version) {
*r->target_config = r->resolved_config;
diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c
index 220915853c..47fe5fad4a 100644
--- a/src/core/client_config/resolvers/sockaddr_resolver.c
+++ b/src/core/client_config/resolvers/sockaddr_resolver.c
@@ -73,22 +73,22 @@ typedef struct {
/** have we published? */
int published;
/** pending next completion, or NULL */
- grpc_iomgr_closure *next_completion;
+ grpc_closure *next_completion;
/** target config address for next completion */
grpc_client_config **target_config;
} sockaddr_resolver;
static void sockaddr_destroy(grpc_resolver *r);
-static grpc_iomgr_closure *sockaddr_maybe_finish_next_locked(
- sockaddr_resolver *r) GRPC_MUST_USE_RESULT;
+static grpc_closure *sockaddr_maybe_finish_next_locked(sockaddr_resolver *r)
+ GRPC_MUST_USE_RESULT;
static void sockaddr_shutdown(grpc_resolver *r);
static void sockaddr_channel_saw_error(grpc_resolver *r,
struct sockaddr *failing_address,
int failing_address_len);
static void sockaddr_next(grpc_resolver *r, grpc_client_config **target_config,
- grpc_iomgr_closure *on_complete);
+ grpc_closure *on_complete);
static const grpc_resolver_vtable sockaddr_resolver_vtable = {
sockaddr_destroy, sockaddr_shutdown, sockaddr_channel_saw_error,
@@ -96,7 +96,7 @@ static const grpc_resolver_vtable sockaddr_resolver_vtable = {
static void sockaddr_shutdown(grpc_resolver *resolver) {
sockaddr_resolver *r = (sockaddr_resolver *)resolver;
- grpc_iomgr_closure *call = NULL;
+ grpc_closure *call = NULL;
gpr_mu_lock(&r->mu);
if (r->next_completion != NULL) {
*r->target_config = NULL;
@@ -114,9 +114,9 @@ static void sockaddr_channel_saw_error(grpc_resolver *resolver,
static void sockaddr_next(grpc_resolver *resolver,
grpc_client_config **target_config,
- grpc_iomgr_closure *on_complete) {
+ grpc_closure *on_complete) {
sockaddr_resolver *r = (sockaddr_resolver *)resolver;
- grpc_iomgr_closure *call = NULL;
+ grpc_closure *call = NULL;
gpr_mu_lock(&r->mu);
GPR_ASSERT(!r->next_completion);
r->next_completion = on_complete;
@@ -126,14 +126,13 @@ static void sockaddr_next(grpc_resolver *resolver,
if (call) call->cb(call->cb_arg, 1);
}
-static grpc_iomgr_closure *sockaddr_maybe_finish_next_locked(
- sockaddr_resolver *r) {
+static grpc_closure *sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) {
grpc_client_config *cfg;
grpc_lb_policy *lb_policy;
grpc_lb_policy_args lb_policy_args;
grpc_subchannel **subchannels;
grpc_subchannel_args args;
- grpc_iomgr_closure *call = NULL;
+ grpc_closure *call = NULL;
if (r->next_completion != NULL && !r->published) {
size_t i;
diff --git a/src/core/client_config/resolvers/zookeeper_resolver.c b/src/core/client_config/resolvers/zookeeper_resolver.c
index 94ec12dd16..5632f0425e 100644
--- a/src/core/client_config/resolvers/zookeeper_resolver.c
+++ b/src/core/client_config/resolvers/zookeeper_resolver.c
@@ -73,7 +73,7 @@ typedef struct {
/** which version of resolved_config is current? */
int resolved_version;
/** pending next completion, or NULL */
- grpc_iomgr_closure *next_completion;
+ grpc_closure *next_completion;
/** target config address for next completion */
grpc_client_config **target_config;
/** current (fully resolved) config */
@@ -92,15 +92,15 @@ typedef struct {
static void zookeeper_destroy(grpc_resolver *r);
static void zookeeper_start_resolving_locked(zookeeper_resolver *r);
-static grpc_iomgr_closure *zookeeper_maybe_finish_next_locked(
- zookeeper_resolver *r) GRPC_MUST_USE_RESULT;
+static grpc_closure *zookeeper_maybe_finish_next_locked(zookeeper_resolver *r)
+ GRPC_MUST_USE_RESULT;
static void zookeeper_shutdown(grpc_resolver *r);
static void zookeeper_channel_saw_error(grpc_resolver *r,
struct sockaddr *failing_address,
int failing_address_len);
static void zookeeper_next(grpc_resolver *r, grpc_client_config **target_config,
- grpc_iomgr_closure *on_complete);
+ grpc_closure *on_complete);
static const grpc_resolver_vtable zookeeper_resolver_vtable = {
zookeeper_destroy, zookeeper_shutdown, zookeeper_channel_saw_error,
@@ -108,7 +108,7 @@ static const grpc_resolver_vtable zookeeper_resolver_vtable = {
static void zookeeper_shutdown(grpc_resolver *resolver) {
zookeeper_resolver *r = (zookeeper_resolver *)resolver;
- grpc_iomgr_closure *call = NULL;
+ grpc_closure *call = NULL;
gpr_mu_lock(&r->mu);
if (r->next_completion != NULL) {
*r->target_config = NULL;
@@ -134,9 +134,9 @@ static void zookeeper_channel_saw_error(grpc_resolver *resolver,
static void zookeeper_next(grpc_resolver *resolver,
grpc_client_config **target_config,
- grpc_iomgr_closure *on_complete) {
+ grpc_closure *on_complete) {
zookeeper_resolver *r = (zookeeper_resolver *)resolver;
- grpc_iomgr_closure *call;
+ grpc_closure *call;
gpr_mu_lock(&r->mu);
GPR_ASSERT(r->next_completion == NULL);
r->next_completion = on_complete;
@@ -189,7 +189,7 @@ static void zookeeper_on_resolved(void *arg,
grpc_subchannel **subchannels;
grpc_subchannel_args args;
grpc_lb_policy *lb_policy;
- grpc_iomgr_closure *call;
+ grpc_closure *call;
size_t i;
if (addresses != NULL) {
grpc_lb_policy_args lb_policy_args;
@@ -414,9 +414,8 @@ static void zookeeper_start_resolving_locked(zookeeper_resolver *r) {
zookeeper_resolve_address(r);
}
-static grpc_iomgr_closure *zookeeper_maybe_finish_next_locked(
- zookeeper_resolver *r) {
- grpc_iomgr_closure *call = NULL;
+static grpc_closure *zookeeper_maybe_finish_next_locked(zookeeper_resolver *r) {
+ grpc_closure *call = NULL;
if (r->next_completion != NULL &&
r->resolved_version != r->published_version) {
*r->target_config = r->resolved_config;
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index b15acf826a..99310e02e5 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -59,7 +59,7 @@ typedef struct {
} connection;
typedef struct {
- grpc_iomgr_closure closure;
+ grpc_closure closure;
size_t version;
grpc_subchannel *subchannel;
grpc_connectivity_state connectivity_state;
@@ -67,11 +67,11 @@ typedef struct {
typedef struct waiting_for_connect {
struct waiting_for_connect *next;
- grpc_iomgr_closure *notify;
+ grpc_closure *notify;
grpc_pollset *pollset;
grpc_subchannel_call **target;
grpc_subchannel *subchannel;
- grpc_iomgr_closure continuation;
+ grpc_closure continuation;
} waiting_for_connect;
struct grpc_subchannel {
@@ -100,7 +100,7 @@ struct grpc_subchannel {
grpc_connect_out_args connecting_result;
/** callback for connection finishing */
- grpc_iomgr_closure connected;
+ grpc_closure connected;
/** pollset_set tracking who's interested in a connection
being setup - owned by the master channel (in particular the
@@ -147,7 +147,7 @@ struct grpc_subchannel_call {
static grpc_subchannel_call *create_call(connection *con);
static void connectivity_state_changed_locked(grpc_subchannel *c,
const char *reason,
- grpc_iomgr_call_list *call_list);
+ grpc_call_list *call_list);
static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c);
static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
static void subchannel_connected(void *subchannel, int iomgr_success);
@@ -303,7 +303,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
c->pollset_set = grpc_client_channel_get_connecting_pollset_set(parent_elem);
c->random = random_seed();
grpc_mdctx_ref(c->mdctx);
- grpc_iomgr_closure_init(&c->connected, subchannel_connected, c);
+ grpc_closure_init(&c->connected, subchannel_connected, c);
grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
"subchannel");
gpr_mu_init(&c->mu);
@@ -334,7 +334,7 @@ static void start_connect(grpc_subchannel *c) {
static void continue_creating_call(void *arg, int iomgr_success) {
waiting_for_connect *w4c = arg;
- grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_subchannel_del_interested_party(w4c->subchannel, w4c->pollset);
grpc_subchannel_create_call(w4c->subchannel, w4c->pollset, w4c->target,
w4c->notify, &call_list);
@@ -344,8 +344,8 @@ static void continue_creating_call(void *arg, int iomgr_success) {
void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset,
grpc_subchannel_call **target,
- grpc_iomgr_closure *notify,
- grpc_iomgr_call_list *call_list) {
+ grpc_closure *notify,
+ grpc_call_list *call_list) {
connection *con;
gpr_mu_lock(&c->mu);
if (c->active != NULL) {
@@ -364,7 +364,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset,
w4c->subchannel = c;
/* released when clearing w4c */
SUBCHANNEL_REF_LOCKED(c, "waiting_for_connect");
- grpc_iomgr_closure_init(&w4c->continuation, continue_creating_call, w4c);
+ grpc_closure_init(&w4c->continuation, continue_creating_call, w4c);
c->waiting = w4c;
grpc_subchannel_add_interested_party(c, pollset);
if (!c->connecting) {
@@ -392,8 +392,8 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
grpc_connectivity_state *state,
- grpc_iomgr_closure *notify,
- grpc_iomgr_call_list *call_list) {
+ grpc_closure *notify,
+ grpc_call_list *call_list) {
int do_connect = 0;
gpr_mu_lock(&c->mu);
if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state,
@@ -417,7 +417,7 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c,
connection *con = NULL;
grpc_subchannel *destroy;
int cancel_alarm = 0;
- grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
gpr_mu_lock(&c->mu);
if (c->active != NULL) {
con = c->active;
@@ -454,7 +454,7 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c,
grpc_connector_shutdown(c->connector);
}
- grpc_iomgr_call_list_run(call_list);
+ grpc_call_list_run(call_list);
}
static void on_state_changed(void *p, int iomgr_success) {
@@ -465,7 +465,7 @@ static void on_state_changed(void *p, int iomgr_success) {
grpc_transport_op op;
grpc_channel_element *elem;
connection *destroy_connection = NULL;
- grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
gpr_mu_lock(mu);
@@ -514,11 +514,10 @@ done:
if (destroy_connection != NULL) {
connection_destroy(destroy_connection);
}
- grpc_iomgr_call_list_run(call_list);
+ grpc_call_list_run(call_list);
}
-static void publish_transport(grpc_subchannel *c,
- grpc_iomgr_call_list *call_list) {
+static void publish_transport(grpc_subchannel *c, grpc_call_list *call_list) {
size_t channel_stack_size;
connection *con;
grpc_channel_stack *stk;
@@ -552,7 +551,7 @@ static void publish_transport(grpc_subchannel *c,
/* initialize state watcher */
sw = gpr_malloc(sizeof(*sw));
- grpc_iomgr_closure_init(&sw->closure, on_state_changed, sw);
+ grpc_closure_init(&sw->closure, on_state_changed, sw);
sw->subchannel = c;
sw->connectivity_state = GRPC_CHANNEL_READY;
@@ -599,7 +598,7 @@ static void publish_transport(grpc_subchannel *c,
while (w4c != NULL) {
waiting_for_connect *next = w4c;
- grpc_iomgr_call_list_add(call_list, &w4c->continuation, 1);
+ grpc_call_list_add(call_list, &w4c->continuation, 1);
w4c = next;
}
@@ -641,7 +640,7 @@ static void update_reconnect_parameters(grpc_subchannel *c) {
static void on_alarm(void *arg, int iomgr_success) {
grpc_subchannel *c = arg;
- grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
gpr_mu_lock(&c->mu);
c->have_alarm = 0;
if (c->disconnected) {
@@ -656,12 +655,12 @@ static void on_alarm(void *arg, int iomgr_success) {
GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting");
GRPC_SUBCHANNEL_UNREF(c, "connecting");
}
- grpc_iomgr_call_list_run(call_list);
+ grpc_call_list_run(call_list);
}
static void subchannel_connected(void *arg, int iomgr_success) {
grpc_subchannel *c = arg;
- grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
if (c->connecting_result.transport != NULL) {
publish_transport(c, &call_list);
} else {
@@ -673,7 +672,7 @@ static void subchannel_connected(void *arg, int iomgr_success) {
grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, now);
gpr_mu_unlock(&c->mu);
}
- grpc_iomgr_call_list_run(call_list);
+ grpc_call_list_run(call_list);
}
static gpr_timespec compute_connect_deadline(grpc_subchannel *c) {
@@ -705,7 +704,7 @@ static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) {
static void connectivity_state_changed_locked(grpc_subchannel *c,
const char *reason,
- grpc_iomgr_call_list *call_list) {
+ grpc_call_list *call_list) {
grpc_connectivity_state current = compute_connectivity_locked(c);
grpc_connectivity_state_set(&c->state_tracker, current, reason, call_list);
}
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h
index 7c00ff172d..189e531d35 100644
--- a/src/core/client_config/subchannel.h
+++ b/src/core/client_config/subchannel.h
@@ -76,8 +76,8 @@ void grpc_subchannel_call_unref(
void grpc_subchannel_create_call(grpc_subchannel *subchannel,
grpc_pollset *pollset,
grpc_subchannel_call **target,
- grpc_iomgr_closure *notify,
- grpc_iomgr_call_list *call_list);
+ grpc_closure *notify,
+ grpc_call_list *call_list);
/** process a transport level op */
void grpc_subchannel_process_transport_op(grpc_subchannel *subchannel,
@@ -91,8 +91,8 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(
Updates *state with the new state of the channel */
void grpc_subchannel_notify_on_state_change(grpc_subchannel *channel,
grpc_connectivity_state *state,
- grpc_iomgr_closure *notify,
- grpc_iomgr_call_list *call_list);
+ grpc_closure *notify,
+ grpc_call_list *call_list);
/** express interest in \a channel's activities through \a pollset. */
void grpc_subchannel_add_interested_party(grpc_subchannel *channel,
diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c
index 75f3f7db4b..c484f4c7b3 100644
--- a/src/core/httpcli/httpcli.c
+++ b/src/core/httpcli/httpcli.c
@@ -63,8 +63,8 @@ typedef struct {
grpc_iomgr_object iomgr_obj;
gpr_slice_buffer incoming;
gpr_slice_buffer outgoing;
- grpc_iomgr_closure on_read;
- grpc_iomgr_closure done_write;
+ grpc_closure on_read;
+ grpc_closure done_write;
grpc_workqueue *workqueue;
} internal_request;
@@ -237,8 +237,8 @@ static void internal_request_begin(grpc_httpcli_context *context,
request->handshaker ? request->handshaker : &grpc_httpcli_plaintext;
req->context = context;
req->pollset = pollset;
- grpc_iomgr_closure_init(&req->on_read, on_read, req);
- grpc_iomgr_closure_init(&req->done_write, done_write, req);
+ grpc_closure_init(&req->on_read, on_read, req);
+ grpc_closure_init(&req->done_write, done_write, req);
gpr_slice_buffer_init(&req->incoming);
gpr_slice_buffer_init(&req->outgoing);
grpc_iomgr_register_object(&req->iomgr_obj, name);
diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c
index a7878e31dd..1955f74b9a 100644
--- a/src/core/iomgr/endpoint.c
+++ b/src/core/iomgr/endpoint.c
@@ -35,13 +35,13 @@
grpc_endpoint_op_status grpc_endpoint_read(grpc_endpoint *ep,
gpr_slice_buffer *slices,
- grpc_iomgr_closure *cb) {
+ grpc_closure *cb) {
return ep->vtable->read(ep, slices, cb);
}
grpc_endpoint_op_status grpc_endpoint_write(grpc_endpoint *ep,
gpr_slice_buffer *slices,
- grpc_iomgr_closure *cb) {
+ grpc_closure *cb) {
return ep->vtable->write(ep, slices, cb);
}
diff --git a/src/core/iomgr/endpoint.h b/src/core/iomgr/endpoint.h
index d14d52d561..79b7d6a78e 100644
--- a/src/core/iomgr/endpoint.h
+++ b/src/core/iomgr/endpoint.h
@@ -54,9 +54,9 @@ typedef enum grpc_endpoint_op_status {
struct grpc_endpoint_vtable {
grpc_endpoint_op_status (*read)(grpc_endpoint *ep, gpr_slice_buffer *slices,
- grpc_iomgr_closure *cb);
+ grpc_closure *cb);
grpc_endpoint_op_status (*write)(grpc_endpoint *ep, gpr_slice_buffer *slices,
- grpc_iomgr_closure *cb);
+ grpc_closure *cb);
void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset);
void (*add_to_pollset_set)(grpc_endpoint *ep, grpc_pollset_set *pollset);
void (*shutdown)(grpc_endpoint *ep);
@@ -70,7 +70,7 @@ struct grpc_endpoint_vtable {
Valid slices may be placed into \a slices even on callback success == 0. */
grpc_endpoint_op_status grpc_endpoint_read(
grpc_endpoint *ep, gpr_slice_buffer *slices,
- grpc_iomgr_closure *cb) GRPC_MUST_USE_RESULT;
+ grpc_closure *cb) GRPC_MUST_USE_RESULT;
char *grpc_endpoint_get_peer(grpc_endpoint *ep);
@@ -86,7 +86,7 @@ char *grpc_endpoint_get_peer(grpc_endpoint *ep);
*/
grpc_endpoint_op_status grpc_endpoint_write(
grpc_endpoint *ep, gpr_slice_buffer *slices,
- grpc_iomgr_closure *cb) GRPC_MUST_USE_RESULT;
+ grpc_closure *cb) GRPC_MUST_USE_RESULT;
/* Causes any pending read/write callbacks to run immediately with
success==0 */
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index 5bdce0bfd8..dfebbcc2e1 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -218,8 +218,7 @@ static int has_watchers(grpc_fd *fd) {
fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
}
-void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done,
- const char *reason) {
+void grpc_fd_orphan(grpc_fd *fd, grpc_closure *on_done, const char *reason) {
fd->on_done_closure = on_done;
shutdown(fd->fd, SHUT_RDWR);
gpr_mu_lock(&fd->watcher_mu);
@@ -253,7 +252,7 @@ void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
-static void process_callback(grpc_iomgr_closure *closure, int success,
+static void process_callback(grpc_closure *closure, int success,
grpc_workqueue *optional_workqueue) {
if (optional_workqueue == NULL) {
closure->cb(closure->cb_arg, success);
@@ -262,15 +261,15 @@ static void process_callback(grpc_iomgr_closure *closure, int success,
}
}
-static void process_callbacks(grpc_iomgr_closure *callbacks, size_t n,
- int success, grpc_workqueue *optional_workqueue) {
+static void process_callbacks(grpc_closure *callbacks, size_t n, int success,
+ grpc_workqueue *optional_workqueue) {
size_t i;
for (i = 0; i < n; i++) {
process_callback(callbacks + i, success, optional_workqueue);
}
}
-static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure,
+static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_closure *closure,
int allow_synchronous_callback) {
switch (gpr_atm_acq_load(st)) {
case NOT_READY:
@@ -307,7 +306,7 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure,
abort();
}
-static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure **callbacks,
+static void set_ready_locked(gpr_atm *st, grpc_closure **callbacks,
size_t *ncallbacks) {
gpr_intptr state = gpr_atm_acq_load(st);
@@ -327,7 +326,7 @@ static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure **callbacks,
default: /* waiting */
GPR_ASSERT(gpr_atm_no_barrier_load(st) != READY &&
gpr_atm_no_barrier_load(st) != NOT_READY);
- callbacks[(*ncallbacks)++] = (grpc_iomgr_closure *)state;
+ callbacks[(*ncallbacks)++] = (grpc_closure *)state;
gpr_atm_rel_store(st, NOT_READY);
return;
}
@@ -338,7 +337,7 @@ static void set_ready(grpc_fd *fd, gpr_atm *st,
/* only one set_ready can be active at once (but there may be a racing
notify_on) */
int success;
- grpc_iomgr_closure *closure;
+ grpc_closure *closure;
size_t ncb = 0;
gpr_mu_lock(&fd->set_state_mu);
@@ -365,11 +364,11 @@ void grpc_fd_shutdown(grpc_fd *fd) {
0 /* GPR_FALSE */);
}
-void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_closure *closure) {
+void grpc_fd_notify_on_read(grpc_fd *fd, grpc_closure *closure) {
notify_on(fd, &fd->readst, closure, 0);
}
-void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_closure *closure) {
+void grpc_fd_notify_on_write(grpc_fd *fd, grpc_closure *closure) {
notify_on(fd, &fd->writest, closure, 0);
}
diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h
index e5157ad342..bb85b6c16e 100644
--- a/src/core/iomgr/fd_posix.h
+++ b/src/core/iomgr/fd_posix.h
@@ -96,8 +96,8 @@ struct grpc_fd {
struct grpc_fd *freelist_next;
- grpc_iomgr_closure *on_done_closure;
- grpc_iomgr_closure *shutdown_closures[2];
+ grpc_closure *on_done_closure;
+ grpc_closure *shutdown_closures[2];
grpc_iomgr_object iomgr_object;
};
@@ -113,8 +113,7 @@ grpc_fd *grpc_fd_create(int fd, grpc_workqueue *workqueue, const char *name);
Requires: *fd initialized; no outstanding notify_on_read or
notify_on_write.
MUST NOT be called with a pollset lock taken */
-void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done,
- const char *reason);
+void grpc_fd_orphan(grpc_fd *fd, grpc_closure *on_done, const char *reason);
/* Begin polling on an fd.
Registers that the given pollset is interested in this fd - so that if read
@@ -153,10 +152,10 @@ void grpc_fd_shutdown(grpc_fd *fd);
underlying platform. This means that users must drain fd in read_cb before
calling notify_on_read again. Users are also expected to handle spurious
events, i.e read_cb is called while nothing can be readable from fd */
-void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_closure *closure);
+void grpc_fd_notify_on_read(grpc_fd *fd, grpc_closure *closure);
/* Exactly the same semantics as above, except based on writable events. */
-void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_closure *closure);
+void grpc_fd_notify_on_write(grpc_fd *fd, grpc_closure *closure);
/* Notification from the poller to an fd that it has become readable or
writable.
diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c
index ba8f73fe08..dd76044913 100644
--- a/src/core/iomgr/iomgr.c
+++ b/src/core/iomgr/iomgr.c
@@ -151,15 +151,15 @@ void grpc_iomgr_unregister_object(grpc_iomgr_object *obj) {
gpr_free(obj->name);
}
-void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb,
- void *cb_arg) {
+void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
+ void *cb_arg) {
closure->cb = cb;
closure->cb_arg = cb_arg;
closure->next = NULL;
}
-void grpc_iomgr_call_list_add(grpc_iomgr_call_list *call_list,
- grpc_iomgr_closure *closure, int success) {
+void grpc_call_list_add(grpc_call_list *call_list, grpc_closure *closure,
+ int success) {
if (!closure) return;
closure->next = NULL;
closure->success = success;
@@ -171,21 +171,20 @@ void grpc_iomgr_call_list_add(grpc_iomgr_call_list *call_list,
call_list->tail = closure;
}
-void grpc_iomgr_call_list_run(grpc_iomgr_call_list call_list) {
- grpc_iomgr_closure *c = call_list.head;
+void grpc_call_list_run(grpc_call_list call_list) {
+ grpc_closure *c = call_list.head;
while (c) {
- grpc_iomgr_closure *next = c->next;
+ grpc_closure *next = c->next;
c->cb(c->cb_arg, c->success);
c = next;
}
}
-int grpc_iomgr_call_list_empty(grpc_iomgr_call_list call_list) {
+int grpc_call_list_empty(grpc_call_list call_list) {
return call_list.head == NULL;
}
-void grpc_iomgr_call_list_move(grpc_iomgr_call_list *src,
- grpc_iomgr_call_list *dst) {
+void grpc_call_list_move(grpc_call_list *src, grpc_call_list *dst) {
if (dst->head == NULL) {
*dst = *src;
return;
diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h
index 58c31763d2..56f0195be9 100644
--- a/src/core/iomgr/iomgr.h
+++ b/src/core/iomgr/iomgr.h
@@ -42,7 +42,7 @@
typedef void (*grpc_iomgr_cb_func)(void *arg, int success);
/** A closure over a grpc_iomgr_cb_func. */
-typedef struct grpc_iomgr_closure {
+typedef struct grpc_closure {
/** Bound callback. */
grpc_iomgr_cb_func cb;
@@ -55,27 +55,26 @@ typedef struct grpc_iomgr_closure {
int success;
/**< Internal. Do not touch */
- struct grpc_iomgr_closure *next;
-} grpc_iomgr_closure;
+ struct grpc_closure *next;
+} grpc_closure;
-typedef struct grpc_iomgr_call_list {
- grpc_iomgr_closure *head;
- grpc_iomgr_closure *tail;
-} grpc_iomgr_call_list;
+typedef struct grpc_call_list {
+ grpc_closure *head;
+ grpc_closure *tail;
+} grpc_call_list;
/** Initializes \a closure with \a cb and \a cb_arg. */
-void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb,
- void *cb_arg);
+void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
+ void *cb_arg);
-#define GRPC_IOMGR_CALL_LIST_INIT \
+#define GRPC_CALL_LIST_INIT \
{ NULL, NULL }
-void grpc_iomgr_call_list_add(grpc_iomgr_call_list *list,
- grpc_iomgr_closure *closure, int success);
-void grpc_iomgr_call_list_run(grpc_iomgr_call_list list);
-void grpc_iomgr_call_list_move(grpc_iomgr_call_list *src,
- grpc_iomgr_call_list *dst);
-int grpc_iomgr_call_list_empty(grpc_iomgr_call_list list);
+void grpc_call_list_add(grpc_call_list *list, grpc_closure *closure,
+ int success);
+void grpc_call_list_run(grpc_call_list list);
+void grpc_call_list_move(grpc_call_list *src, grpc_call_list *dst);
+int grpc_call_list_empty(grpc_call_list list);
/** Initializes the iomgr. */
void grpc_iomgr_init(void);
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index 253f3190f1..5ca957b8e1 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -53,7 +53,7 @@ typedef struct wakeup_fd_hdl {
typedef struct {
grpc_pollset *pollset;
grpc_fd *fd;
- grpc_iomgr_closure closure;
+ grpc_closure closure;
} delayed_add;
typedef struct {
@@ -125,7 +125,7 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
da->pollset = pollset;
da->fd = fd;
GRPC_FD_REF(fd, "delayed_add");
- grpc_iomgr_closure_init(&da->closure, perform_delayed_add, da);
+ grpc_closure_init(&da->closure, perform_delayed_add, da);
pollset->in_flight_cbs++;
grpc_pollset_add_unlock_job(pollset, &da->closure);
}
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index 107ffb0b5e..0fe3f80d44 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -175,30 +175,28 @@ static void finish_shutdown(grpc_pollset *pollset) {
pollset->shutdown_done_cb(pollset->shutdown_done_arg);
}
-static void run_jobs(grpc_pollset *pollset, grpc_iomgr_closure **root) {
- grpc_iomgr_closure *exec = *root;
+static void run_jobs(grpc_pollset *pollset, grpc_closure **root) {
+ grpc_closure *exec = *root;
*root = NULL;
gpr_mu_unlock(&pollset->mu);
while (exec != NULL) {
- grpc_iomgr_closure *next = exec->next;
+ grpc_closure *next = exec->next;
exec->cb(exec->cb_arg, 1);
exec = next;
}
gpr_mu_lock(&pollset->mu);
}
-static void add_job(grpc_iomgr_closure **root, grpc_iomgr_closure *closure) {
+static void add_job(grpc_closure **root, grpc_closure *closure) {
closure->next = *root;
*root = closure;
}
-void grpc_pollset_add_idle_job(grpc_pollset *pollset,
- grpc_iomgr_closure *closure) {
+void grpc_pollset_add_idle_job(grpc_pollset *pollset, grpc_closure *closure) {
add_job(&pollset->idle_jobs, closure);
}
-void grpc_pollset_add_unlock_job(grpc_pollset *pollset,
- grpc_iomgr_closure *closure) {
+void grpc_pollset_add_unlock_job(grpc_pollset *pollset, grpc_closure *closure) {
add_job(&pollset->unlock_jobs, closure);
}
@@ -316,7 +314,7 @@ typedef struct grpc_unary_promote_args {
const grpc_pollset_vtable *original_vtable;
grpc_pollset *pollset;
grpc_fd *fd;
- grpc_iomgr_closure promotion_closure;
+ grpc_closure promotion_closure;
} grpc_unary_promote_args;
static void basic_do_promote(void *args, int success) {
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
index 3f89095d40..4f09f870f7 100644
--- a/src/core/iomgr/pollset_posix.h
+++ b/src/core/iomgr/pollset_posix.h
@@ -67,8 +67,8 @@ typedef struct grpc_pollset {
int kicked_without_pollers;
void (*shutdown_done_cb)(void *arg);
void *shutdown_done_arg;
- grpc_iomgr_closure *unlock_jobs;
- grpc_iomgr_closure *idle_jobs;
+ grpc_closure *unlock_jobs;
+ grpc_closure *idle_jobs;
union {
int fd;
void *ptr;
@@ -128,10 +128,8 @@ typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int);
extern grpc_poll_function_type grpc_poll_function;
/** schedule a closure to be run next time there are no active workers */
-void grpc_pollset_add_idle_job(grpc_pollset *pollset,
- grpc_iomgr_closure *closure);
+void grpc_pollset_add_idle_job(grpc_pollset *pollset, grpc_closure *closure);
/** schedule a closure to be run next time the pollset is unlocked */
-void grpc_pollset_add_unlock_job(grpc_pollset *pollset,
- grpc_iomgr_closure *closure);
+void grpc_pollset_add_unlock_job(grpc_pollset *pollset, grpc_closure *closure);
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */
diff --git a/src/core/iomgr/socket_windows.h b/src/core/iomgr/socket_windows.h
index 498921e0fd..45bc657225 100644
--- a/src/core/iomgr/socket_windows.h
+++ b/src/core/iomgr/socket_windows.h
@@ -91,7 +91,7 @@ typedef struct grpc_winsocket {
This prevents that. */
int added_to_iocp;
- grpc_iomgr_closure shutdown_closure;
+ grpc_closure shutdown_closure;
/* A label for iomgr to track outstanding objects */
grpc_iomgr_object iomgr_object;
diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c
index 0df984c3e6..1ea2155060 100644
--- a/src/core/iomgr/tcp_client_posix.c
+++ b/src/core/iomgr/tcp_client_posix.c
@@ -64,7 +64,7 @@ typedef struct {
gpr_timespec deadline;
grpc_alarm alarm;
int refs;
- grpc_iomgr_closure write_closure;
+ grpc_closure write_closure;
grpc_pollset_set *interested_parties;
char *addr_str;
} async_connect;
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index c539cf2d34..374d2f3a40 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -85,11 +85,11 @@ typedef struct {
/** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */
size_t outgoing_byte_idx;
- grpc_iomgr_closure *read_cb;
- grpc_iomgr_closure *write_cb;
+ grpc_closure *read_cb;
+ grpc_closure *write_cb;
- grpc_iomgr_closure read_closure;
- grpc_iomgr_closure write_closure;
+ grpc_closure read_closure;
+ grpc_closure write_closure;
char *peer_string;
} grpc_tcp;
@@ -145,7 +145,7 @@ static void tcp_destroy(grpc_endpoint *ep) {
}
static void call_read_cb(grpc_tcp *tcp, int success) {
- grpc_iomgr_closure *cb = tcp->read_cb;
+ grpc_closure *cb = tcp->read_cb;
if (grpc_tcp_trace) {
size_t i;
@@ -250,7 +250,7 @@ static void tcp_handle_read(void *arg /* grpc_tcp */, int success) {
static grpc_endpoint_op_status tcp_read(grpc_endpoint *ep,
gpr_slice_buffer *incoming_buffer,
- grpc_iomgr_closure *cb) {
+ grpc_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
GPR_ASSERT(tcp->read_cb == NULL);
tcp->read_cb = cb;
@@ -350,7 +350,7 @@ static grpc_endpoint_op_status tcp_flush(grpc_tcp *tcp) {
static void tcp_handle_write(void *arg /* grpc_tcp */, int success) {
grpc_tcp *tcp = (grpc_tcp *)arg;
grpc_endpoint_op_status status;
- grpc_iomgr_closure *cb;
+ grpc_closure *cb;
if (!success) {
cb = tcp->write_cb;
@@ -375,7 +375,7 @@ static void tcp_handle_write(void *arg /* grpc_tcp */, int success) {
static grpc_endpoint_op_status tcp_write(grpc_endpoint *ep,
gpr_slice_buffer *buf,
- grpc_iomgr_closure *cb) {
+ grpc_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_endpoint_op_status status;
diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c
index 0b79678e9d..213b2e1113 100644
--- a/src/core/iomgr/tcp_server_posix.c
+++ b/src/core/iomgr/tcp_server_posix.c
@@ -84,8 +84,8 @@ typedef struct {
struct sockaddr_un un;
} addr;
size_t addr_len;
- grpc_iomgr_closure read_closure;
- grpc_iomgr_closure destroyed_closure;
+ grpc_closure read_closure;
+ grpc_closure destroyed_closure;
} server_port;
static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) {
diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c
index fe3673c607..e361d22dc3 100644
--- a/src/core/iomgr/tcp_windows.c
+++ b/src/core/iomgr/tcp_windows.c
@@ -82,8 +82,8 @@ typedef struct grpc_tcp {
/* Refcounting how many operations are in progress. */
gpr_refcount refcount;
- grpc_iomgr_closure *read_cb;
- grpc_iomgr_closure *write_cb;
+ grpc_closure *read_cb;
+ grpc_closure *write_cb;
gpr_slice read_slice;
gpr_slice_buffer *write_slices;
gpr_slice_buffer *read_slices;
@@ -169,7 +169,7 @@ static int on_read(grpc_tcp *tcp, int success) {
static void on_read_cb(void *tcpp, int from_iocp) {
grpc_tcp *tcp = tcpp;
- grpc_iomgr_closure *cb = tcp->read_cb;
+ grpc_closure *cb = tcp->read_cb;
int success = on_read(tcp, from_iocp);
tcp->read_cb = NULL;
TCP_UNREF(tcp, "read");
@@ -180,7 +180,7 @@ static void on_read_cb(void *tcpp, int from_iocp) {
static grpc_endpoint_op_status win_read(grpc_endpoint *ep,
gpr_slice_buffer *read_slices,
- grpc_iomgr_closure *cb) {
+ grpc_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_winsocket *handle = tcp->socket;
grpc_winsocket_callback_info *info = &handle->read_info;
@@ -241,7 +241,7 @@ static void on_write(void *tcpp, int success) {
grpc_tcp *tcp = (grpc_tcp *)tcpp;
grpc_winsocket *handle = tcp->socket;
grpc_winsocket_callback_info *info = &handle->write_info;
- grpc_iomgr_closure *cb;
+ grpc_closure *cb;
int do_abort = 0;
gpr_mu_lock(&tcp->mu);
@@ -269,7 +269,7 @@ static void on_write(void *tcpp, int success) {
/* Initiates a write. */
static grpc_endpoint_op_status win_write(grpc_endpoint *ep,
gpr_slice_buffer *slices,
- grpc_iomgr_closure *cb) {
+ grpc_closure *cb) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_winsocket *socket = tcp->socket;
grpc_winsocket_callback_info *info = &socket->write_info;
diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c
index 30957f8dee..1a1d812050 100644
--- a/src/core/iomgr/udp_server.c
+++ b/src/core/iomgr/udp_server.c
@@ -79,8 +79,8 @@ typedef struct {
struct sockaddr_un un;
} addr;
size_t addr_len;
- grpc_iomgr_closure read_closure;
- grpc_iomgr_closure destroyed_closure;
+ grpc_closure read_closure;
+ grpc_closure destroyed_closure;
grpc_udp_server_read_cb read_cb;
} server_port;
diff --git a/src/core/iomgr/workqueue.h b/src/core/iomgr/workqueue.h
index 124f294a23..6f09399b55 100644
--- a/src/core/iomgr/workqueue.h
+++ b/src/core/iomgr/workqueue.h
@@ -52,7 +52,7 @@ typedef struct grpc_workqueue grpc_workqueue;
/** Create a work queue */
grpc_workqueue *grpc_workqueue_create(void);
-void grpc_workqueue_flush(grpc_workqueue *workqueue, int asynchronously);
+void grpc_workqueue_flush(grpc_workqueue *workqueue);
#define GRPC_WORKQUEUE_REFCOUNT_DEBUG
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
@@ -76,7 +76,7 @@ void grpc_workqueue_add_to_pollset(grpc_workqueue *workqueue,
grpc_pollset *pollset);
/** Add a work item to a workqueue */
-void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_iomgr_closure *closure,
+void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_closure *closure,
int success);
#endif
diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c
index ead8c50d8e..85b541f4d2 100644
--- a/src/core/iomgr/workqueue_posix.c
+++ b/src/core/iomgr/workqueue_posix.c
@@ -41,7 +41,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include <grpc/support/thd.h>
+#include <grpc/support/useful.h>
#include "src/core/iomgr/fd_posix.h"
@@ -52,71 +52,19 @@ grpc_workqueue *grpc_workqueue_create(void) {
grpc_workqueue *workqueue = gpr_malloc(sizeof(grpc_workqueue));
gpr_ref_init(&workqueue->refs, 1);
gpr_mu_init(&workqueue->mu);
- workqueue->head.next = NULL;
- workqueue->tail = &workqueue->head;
+ workqueue->call_list.head = workqueue->call_list.tail = NULL;
grpc_wakeup_fd_init(&workqueue->wakeup_fd);
sprintf(name, "workqueue:%p", (void *)workqueue);
workqueue->wakeup_read_fd = NULL; /* inspected during grpc_fd_create below */
workqueue->wakeup_read_fd = grpc_fd_create(
GRPC_WAKEUP_FD_GET_READ_FD(&workqueue->wakeup_fd), workqueue, name);
- grpc_iomgr_closure_init(&workqueue->read_closure, on_readable, workqueue);
+ grpc_closure_init(&workqueue->read_closure, on_readable, workqueue);
grpc_fd_notify_on_read(workqueue->wakeup_read_fd, &workqueue->read_closure);
return workqueue;
}
-static void shutdown_thread(void *arg) {
- grpc_iomgr_closure *todo = arg;
-
- while (todo) {
- grpc_iomgr_closure *next = todo->next;
- todo->cb(todo->cb_arg, todo->success);
- todo = next;
- }
-}
-
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
-static size_t count_waiting(grpc_workqueue *workqueue) {
- size_t i = 0;
- grpc_iomgr_closure *c;
- for (c = workqueue->head.next; c; c = c->next) {
- i++;
- }
- return i;
-}
-#endif
-
-void grpc_workqueue_flush(grpc_workqueue *workqueue, int asynchronously) {
- grpc_iomgr_closure *todo;
- gpr_thd_id thd;
-
- gpr_mu_lock(&workqueue->mu);
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
- if (workqueue->head.next) {
- gpr_log(GPR_DEBUG, "WORKQUEUE:%p flush %d objects %s", workqueue,
- count_waiting(workqueue),
- asynchronously ? "asynchronously" : "synchronously");
- }
-#endif
- todo = workqueue->head.next;
- workqueue->head.next = NULL;
- workqueue->tail = &workqueue->head;
- gpr_mu_unlock(&workqueue->mu);
-
- if (todo != NULL) {
- if (asynchronously) {
- gpr_thd_new(&thd, shutdown_thread, todo, NULL);
- } else {
- while (todo) {
- grpc_iomgr_closure *next = todo->next;
- todo->cb(todo->cb_arg, todo->success);
- todo = next;
- }
- }
- }
-}
-
static void workqueue_destroy(grpc_workqueue *workqueue) {
- GPR_ASSERT(workqueue->tail == &workqueue->head);
+ GPR_ASSERT(grpc_call_list_empty(workqueue->call_list));
grpc_fd_shutdown(workqueue->wakeup_read_fd);
}
@@ -151,9 +99,16 @@ void grpc_workqueue_add_to_pollset(grpc_workqueue *workqueue,
grpc_pollset_add_fd(pollset, workqueue->wakeup_read_fd);
}
+void grpc_workqueue_flush(grpc_workqueue *workqueue) {
+ grpc_call_list todo = GRPC_CALL_LIST_INIT;
+ gpr_mu_lock(&workqueue->mu);
+ GPR_SWAP(grpc_call_list, todo, workqueue->call_list);
+ gpr_mu_unlock(&workqueue->mu);
+ grpc_call_list_run(todo);
+}
+
static void on_readable(void *arg, int success) {
grpc_workqueue *workqueue = arg;
- grpc_iomgr_closure *todo;
if (!success) {
gpr_mu_destroy(&workqueue->mu);
@@ -162,42 +117,26 @@ static void on_readable(void *arg, int success) {
grpc_wakeup_fd_destroy(&workqueue->wakeup_fd);
grpc_fd_orphan(workqueue->wakeup_read_fd, NULL, "destroy");
gpr_free(workqueue);
- return;
} else {
+ grpc_call_list todo = GRPC_CALL_LIST_INIT;
gpr_mu_lock(&workqueue->mu);
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
- gpr_log(GPR_DEBUG, "WORKQUEUE:%p %d objects", workqueue,
- count_waiting(workqueue));
-#endif
- todo = workqueue->head.next;
- workqueue->head.next = NULL;
- workqueue->tail = &workqueue->head;
+ GPR_SWAP(grpc_call_list, todo, workqueue->call_list);
grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd);
gpr_mu_unlock(&workqueue->mu);
grpc_fd_notify_on_read(workqueue->wakeup_read_fd, &workqueue->read_closure);
-
- while (todo) {
- grpc_iomgr_closure *next = todo->next;
- todo->cb(todo->cb_arg, todo->success);
- todo = next;
- }
+ grpc_call_list_run(todo);
}
}
-void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_iomgr_closure *closure,
+void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_closure *closure,
int success) {
closure->success = success;
closure->next = NULL;
gpr_mu_lock(&workqueue->mu);
- if (workqueue->tail == &workqueue->head) {
+ if (grpc_call_list_empty(workqueue->call_list)) {
grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd);
}
- workqueue->tail->next = closure;
- workqueue->tail = closure;
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
- gpr_log(GPR_DEBUG, "WORKQUEUE:%p %d objects", workqueue,
- count_waiting(workqueue));
-#endif
+ grpc_call_list_add(&workqueue->call_list, closure, success);
gpr_mu_unlock(&workqueue->mu);
}
diff --git a/src/core/iomgr/workqueue_posix.h b/src/core/iomgr/workqueue_posix.h
index 1b3a0e281b..22c48c4926 100644
--- a/src/core/iomgr/workqueue_posix.h
+++ b/src/core/iomgr/workqueue_posix.h
@@ -40,13 +40,12 @@ struct grpc_workqueue {
gpr_refcount refs;
gpr_mu mu;
- grpc_iomgr_closure head;
- grpc_iomgr_closure *tail;
+ grpc_call_list call_list;
grpc_wakeup_fd wakeup_fd;
struct grpc_fd *wakeup_read_fd;
- grpc_iomgr_closure read_closure;
+ grpc_closure read_closure;
};
#endif /* GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H */
diff --git a/src/core/security/secure_endpoint.c b/src/core/security/secure_endpoint.c
index b696e384fc..dcafc0fc0f 100644
--- a/src/core/security/secure_endpoint.c
+++ b/src/core/security/secure_endpoint.c
@@ -49,9 +49,9 @@ typedef struct {
struct tsi_frame_protector *protector;
gpr_mu protector_mu;
/* saved upper level callbacks and user_data. */
- grpc_iomgr_closure *read_cb;
- grpc_iomgr_closure *write_cb;
- grpc_iomgr_closure on_read;
+ grpc_closure *read_cb;
+ grpc_closure *write_cb;
+ grpc_closure on_read;
gpr_slice_buffer *read_buffer;
gpr_slice_buffer source_buffer;
/* saved handshaker leftover data to unprotect. */
@@ -214,7 +214,7 @@ static void on_read_cb(void *user_data, int success) {
static grpc_endpoint_op_status endpoint_read(grpc_endpoint *secure_ep,
gpr_slice_buffer *slices,
- grpc_iomgr_closure *cb) {
+ grpc_closure *cb) {
secure_endpoint *ep = (secure_endpoint *)secure_ep;
int immediate_read_success = -1;
ep->read_cb = cb;
@@ -257,7 +257,7 @@ static void flush_write_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur,
static grpc_endpoint_op_status endpoint_write(grpc_endpoint *secure_ep,
gpr_slice_buffer *slices,
- grpc_iomgr_closure *cb) {
+ grpc_closure *cb) {
unsigned i;
tsi_result result = TSI_OK;
secure_endpoint *ep = (secure_endpoint *)secure_ep;
@@ -386,7 +386,7 @@ grpc_endpoint *grpc_secure_endpoint_create(
gpr_slice_buffer_init(&ep->output_buffer);
gpr_slice_buffer_init(&ep->source_buffer);
ep->read_buffer = NULL;
- grpc_iomgr_closure_init(&ep->on_read, on_read_cb, ep);
+ grpc_closure_init(&ep->on_read, on_read_cb, ep);
gpr_mu_init(&ep->protector_mu);
gpr_ref_init(&ep->ref, 1);
return &ep->base;
diff --git a/src/core/security/secure_transport_setup.c b/src/core/security/secure_transport_setup.c
index bf0079577e..181aa74ba9 100644
--- a/src/core/security/secure_transport_setup.c
+++ b/src/core/security/secure_transport_setup.c
@@ -54,8 +54,8 @@ typedef struct {
gpr_slice_buffer outgoing;
grpc_secure_transport_setup_done_cb cb;
void *user_data;
- grpc_iomgr_closure on_handshake_data_sent_to_peer;
- grpc_iomgr_closure on_handshake_data_received_from_peer;
+ grpc_closure on_handshake_data_sent_to_peer;
+ grpc_closure on_handshake_data_received_from_peer;
} grpc_secure_transport_setup;
static void on_handshake_data_received_from_peer(void *setup, int success);
@@ -301,10 +301,10 @@ void grpc_setup_secure_transport(grpc_security_connector *connector,
s->wrapped_endpoint = nonsecure_endpoint;
s->user_data = user_data;
s->cb = cb;
- grpc_iomgr_closure_init(&s->on_handshake_data_sent_to_peer,
- on_handshake_data_sent_to_peer, s);
- grpc_iomgr_closure_init(&s->on_handshake_data_received_from_peer,
- on_handshake_data_received_from_peer, s);
+ grpc_closure_init(&s->on_handshake_data_sent_to_peer,
+ on_handshake_data_sent_to_peer, s);
+ grpc_closure_init(&s->on_handshake_data_received_from_peer,
+ on_handshake_data_received_from_peer, s);
gpr_slice_buffer_init(&s->left_overs);
gpr_slice_buffer_init(&s->outgoing);
gpr_slice_buffer_init(&s->incoming);
diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c
index d134201e87..8468697472 100644
--- a/src/core/security/server_auth_filter.c
+++ b/src/core/security/server_auth_filter.c
@@ -44,11 +44,11 @@ typedef struct call_data {
gpr_uint8 got_client_metadata;
grpc_stream_op_buffer *recv_ops;
/* Closure to call when finished with the auth_on_recv hook. */
- grpc_iomgr_closure *on_done_recv;
+ grpc_closure *on_done_recv;
/* Receive closures are chained: we inject this closure as the on_done_recv
up-call on transport_op, and remember to call our on_done_recv member after
handling it. */
- grpc_iomgr_closure auth_on_recv;
+ grpc_closure auth_on_recv;
grpc_transport_stream_op transport_op;
grpc_metadata_array md;
const grpc_metadata *consumed_md;
@@ -202,7 +202,7 @@ static void init_call_elem(grpc_call_element *elem,
/* initialize members */
memset(calld, 0, sizeof(*calld));
- grpc_iomgr_closure_init(&calld->auth_on_recv, auth_on_recv, elem);
+ grpc_closure_init(&calld->auth_on_recv, auth_on_recv, elem);
GPR_ASSERT(initial_op && initial_op->context != NULL &&
initial_op->context[GRPC_CONTEXT_SECURITY].value == NULL);
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 8f9e97a396..b5a4155e02 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -256,10 +256,10 @@ struct grpc_call {
gpr_slice_buffer incoming_message;
gpr_uint32 incoming_message_length;
gpr_uint32 incoming_message_flags;
- grpc_iomgr_closure destroy_closure;
- grpc_iomgr_closure on_done_recv;
- grpc_iomgr_closure on_done_send;
- grpc_iomgr_closure on_done_bind;
+ grpc_closure destroy_closure;
+ grpc_closure on_done_recv;
+ grpc_closure on_done_send;
+ grpc_closure on_done_bind;
/** completion events - for completion queue use */
grpc_cq_completion completions[MAX_CONCURRENT_COMPLETIONS];
@@ -333,9 +333,9 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
grpc_sopb_init(&call->send_ops);
grpc_sopb_init(&call->recv_ops);
gpr_slice_buffer_init(&call->incoming_message);
- grpc_iomgr_closure_init(&call->on_done_recv, call_on_done_recv, call);
- grpc_iomgr_closure_init(&call->on_done_send, call_on_done_send, call);
- grpc_iomgr_closure_init(&call->on_done_bind, finished_loose_op, call);
+ grpc_closure_init(&call->on_done_recv, call_on_done_recv, call);
+ grpc_closure_init(&call->on_done_send, call_on_done_send, call);
+ grpc_closure_init(&call->on_done_bind, finished_loose_op, call);
/* dropped in destroy and when READ_STATE_STREAM_CLOSED received */
gpr_ref_init(&call->internal_refcount, 2);
/* server hack: start reads immediately so we can get initial metadata.
@@ -1353,7 +1353,7 @@ static void finished_loose_op(void *call, int success_ignored) {
typedef struct {
grpc_call *call;
- grpc_iomgr_closure closure;
+ grpc_closure closure;
} finished_loose_op_allocated_args;
static void finished_loose_op_allocated(void *alloc, int success) {
@@ -1373,8 +1373,7 @@ static void execute_op(grpc_call *call, grpc_transport_stream_op *op) {
} else {
finished_loose_op_allocated_args *args = gpr_malloc(sizeof(*args));
args->call = call;
- grpc_iomgr_closure_init(&args->closure, finished_loose_op_allocated,
- args);
+ grpc_closure_init(&args->closure, finished_loose_op_allocated, args);
op->on_consumed = &args->closure;
}
}
diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c
index ef47a28fda..4d0cf1ed8b 100644
--- a/src/core/surface/channel_connectivity.c
+++ b/src/core/surface/channel_connectivity.c
@@ -45,7 +45,7 @@ grpc_connectivity_state grpc_channel_check_connectivity_state(
/* forward through to the underlying client channel */
grpc_channel_element *client_channel_elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
- grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_connectivity_state state;
if (client_channel_elem->filter != &grpc_client_channel_filter) {
gpr_log(GPR_ERROR,
@@ -56,7 +56,7 @@ grpc_connectivity_state grpc_channel_check_connectivity_state(
}
state = grpc_client_channel_check_connectivity_state(
client_channel_elem, try_to_connect, &call_list);
- grpc_iomgr_call_list_run(call_list);
+ grpc_call_list_run(call_list);
return state;
}
@@ -71,7 +71,7 @@ typedef struct {
gpr_mu mu;
callback_phase phase;
int success;
- grpc_iomgr_closure on_complete;
+ grpc_closure on_complete;
grpc_alarm alarm;
grpc_connectivity_state state;
grpc_completion_queue *cq;
@@ -158,13 +158,13 @@ void grpc_channel_watch_connectivity_state(
gpr_timespec deadline, grpc_completion_queue *cq, void *tag) {
grpc_channel_element *client_channel_elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
- grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
state_watcher *w = gpr_malloc(sizeof(*w));
grpc_cq_begin_op(cq);
gpr_mu_init(&w->mu);
- grpc_iomgr_closure_init(&w->on_complete, watch_complete, w);
+ grpc_closure_init(&w->on_complete, watch_complete, w);
w->phase = WAITING;
w->state = last_observed_state;
w->success = 0;
@@ -181,7 +181,7 @@ void grpc_channel_watch_connectivity_state(
"grpc_channel_watch_connectivity_state called on something that is "
"not a client channel, but '%s'",
client_channel_elem->filter->name);
- grpc_iomgr_call_list_add(&call_list, &w->on_complete, 1);
+ grpc_call_list_add(&call_list, &w->on_complete, 1);
} else {
GRPC_CHANNEL_INTERNAL_REF(channel, "watch_connectivity");
grpc_client_channel_add_interested_party(client_channel_elem,
@@ -190,5 +190,5 @@ void grpc_channel_watch_connectivity_state(
&w->on_complete, &call_list);
}
- grpc_iomgr_call_list_run(call_list);
+ grpc_call_list_run(call_list);
}
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index 908c07b752..f6f42b3d7a 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -52,7 +52,7 @@ typedef struct {
grpc_connector base;
gpr_refcount refs;
- grpc_iomgr_closure *notify;
+ grpc_closure *notify;
grpc_connect_in_args args;
grpc_connect_out_args *result;
} connector;
@@ -71,7 +71,7 @@ static void connector_unref(grpc_connector *con) {
static void connected(void *arg, grpc_endpoint *tcp) {
connector *c = arg;
- grpc_iomgr_closure *notify;
+ grpc_closure *notify;
if (tcp != NULL) {
c->result->transport = grpc_create_chttp2_transport(
c->args.channel_args, tcp, c->args.metadata_context, c->args.workqueue,
@@ -94,7 +94,7 @@ static void connector_shutdown(grpc_connector *con) {}
static void connector_connect(grpc_connector *con,
const grpc_connect_in_args *args,
grpc_connect_out_args *result,
- grpc_iomgr_closure *notify) {
+ grpc_closure *notify) {
connector *c = (connector *)con;
GPR_ASSERT(c->notify == NULL);
GPR_ASSERT(notify->cb);
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index 690612d164..d325b3f77d 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -58,7 +58,7 @@ typedef struct {
grpc_channel_security_connector *security_connector;
- grpc_iomgr_closure *notify;
+ grpc_closure *notify;
grpc_connect_in_args args;
grpc_connect_out_args *result;
@@ -83,7 +83,7 @@ static void on_secure_transport_setup_done(void *arg,
grpc_endpoint *wrapped_endpoint,
grpc_endpoint *secure_endpoint) {
connector *c = arg;
- grpc_iomgr_closure *notify;
+ grpc_closure *notify;
gpr_mu_lock(&c->mu);
if (c->connecting_endpoint == NULL) {
memset(c->result, 0, sizeof(*c->result));
@@ -114,7 +114,7 @@ static void on_secure_transport_setup_done(void *arg,
static void connected(void *arg, grpc_endpoint *tcp) {
connector *c = arg;
- grpc_iomgr_closure *notify;
+ grpc_closure *notify;
if (tcp != NULL) {
gpr_mu_lock(&c->mu);
GPR_ASSERT(c->connecting_endpoint == NULL);
@@ -145,7 +145,7 @@ static void connector_shutdown(grpc_connector *con) {
static void connector_connect(grpc_connector *con,
const grpc_connect_in_args *args,
grpc_connect_out_args *result,
- grpc_iomgr_closure *notify) {
+ grpc_closure *notify) {
connector *c = (connector *)con;
GPR_ASSERT(c->notify == NULL);
GPR_ASSERT(notify->cb);
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index efddb9ee47..75600027ed 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -113,8 +113,8 @@ struct channel_data {
channel_registered_method *registered_methods;
gpr_uint32 registered_method_slots;
gpr_uint32 registered_method_max_probes;
- grpc_iomgr_closure finish_destroy_channel_closure;
- grpc_iomgr_closure channel_connectivity_changed;
+ grpc_closure finish_destroy_channel_closure;
+ grpc_closure channel_connectivity_changed;
};
typedef struct shutdown_tag {
@@ -153,10 +153,10 @@ struct call_data {
grpc_stream_op_buffer *recv_ops;
grpc_stream_state *recv_state;
- grpc_iomgr_closure *on_done_recv;
+ grpc_closure *on_done_recv;
- grpc_iomgr_closure server_on_recv;
- grpc_iomgr_closure kill_zombie_closure;
+ grpc_closure server_on_recv;
+ grpc_closure kill_zombie_closure;
call_data *pending_next;
};
@@ -254,7 +254,7 @@ static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) {
}
struct shutdown_cleanup_args {
- grpc_iomgr_closure closure;
+ grpc_closure closure;
gpr_slice slice;
};
@@ -277,7 +277,7 @@ static void send_shutdown(grpc_channel *channel, int send_goaway,
op.goaway_message = &sc->slice;
op.goaway_status = GRPC_STATUS_OK;
op.disconnect = send_disconnect;
- grpc_iomgr_closure_init(&sc->closure, shutdown_cleanup, sc);
+ grpc_closure_init(&sc->closure, shutdown_cleanup, sc);
op.on_consumed = &sc->closure;
elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
@@ -323,7 +323,7 @@ static void request_matcher_zombify_all_pending_calls(
gpr_mu_lock(&calld->mu_state);
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
- grpc_iomgr_closure_init(
+ grpc_closure_init(
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
grpc_workqueue_push(workqueue, &calld->kill_zombie_closure, 1);
@@ -420,7 +420,7 @@ static void finish_start_new_rpc(grpc_server *server, grpc_call_element *elem,
gpr_mu_lock(&calld->mu_state);
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
- grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
+ grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_workqueue_push(server->workqueue, &calld->kill_zombie_closure, 1);
return;
}
@@ -603,7 +603,7 @@ static void server_on_recv(void *ptr, int success) {
if (calld->state == NOT_STARTED) {
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
- grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
+ grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_workqueue_push(chand->server->workqueue,
&calld->kill_zombie_closure, 1);
} else {
@@ -615,7 +615,7 @@ static void server_on_recv(void *ptr, int success) {
if (calld->state == NOT_STARTED) {
calld->state = ZOMBIED;
gpr_mu_unlock(&calld->mu_state);
- grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
+ grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
grpc_workqueue_push(chand->server->workqueue,
&calld->kill_zombie_closure, 1);
} else if (calld->state == PENDING) {
@@ -689,7 +689,7 @@ static void init_call_elem(grpc_call_element *elem,
calld->call = grpc_call_from_top_element(elem);
gpr_mu_init(&calld->mu_state);
- grpc_iomgr_closure_init(&calld->server_on_recv, server_on_recv, elem);
+ grpc_closure_init(&calld->server_on_recv, server_on_recv, elem);
server_ref(chand->server);
@@ -729,8 +729,8 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
chand->next = chand->prev = chand;
chand->registered_methods = NULL;
chand->connectivity_state = GRPC_CHANNEL_IDLE;
- grpc_iomgr_closure_init(&chand->channel_connectivity_changed,
- channel_connectivity_changed, chand);
+ grpc_closure_init(&chand->channel_connectivity_changed,
+ channel_connectivity_changed, chand);
}
static void destroy_channel_elem(grpc_channel_element *elem) {
@@ -1077,7 +1077,7 @@ void grpc_server_destroy(grpc_server *server) {
gpr_mu_unlock(&server->mu_global);
- grpc_workqueue_flush(server->workqueue, 0);
+ grpc_workqueue_flush(server->workqueue);
server_unref(server);
}
@@ -1132,7 +1132,7 @@ static grpc_call_error queue_call_request(grpc_server *server,
gpr_mu_lock(&calld->mu_state);
if (calld->state == ZOMBIED) {
gpr_mu_unlock(&calld->mu_state);
- grpc_iomgr_closure_init(
+ grpc_closure_init(
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
grpc_workqueue_push(server->workqueue, &calld->kill_zombie_closure, 1);
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index 9be1f2f7cf..386f2dd315 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -155,7 +155,7 @@ typedef enum {
/* Outstanding ping request data */
typedef struct grpc_chttp2_outstanding_ping {
gpr_uint8 id[8];
- grpc_iomgr_closure *on_recv;
+ grpc_closure *on_recv;
struct grpc_chttp2_outstanding_ping *next;
struct grpc_chttp2_outstanding_ping *prev;
} grpc_chttp2_outstanding_ping;
@@ -164,7 +164,7 @@ typedef struct {
/** data to write next write */
gpr_slice_buffer qbuf;
/** queued callbacks */
- grpc_iomgr_call_list run_at_unlock;
+ grpc_call_list run_at_unlock;
/** window available for us to send to peer */
gpr_int64 outgoing_window;
@@ -214,7 +214,7 @@ typedef struct {
/** is this a client? */
gpr_uint8 is_client;
/** callback for when writing is done */
- grpc_iomgr_closure done_cb;
+ grpc_closure done_cb;
} grpc_chttp2_transport_writing;
struct grpc_chttp2_transport_parsing {
@@ -332,9 +332,9 @@ struct grpc_chttp2_transport {
grpc_chttp2_stream_map new_stream_map;
/** closure to execute writing */
- grpc_iomgr_closure writing_action;
+ grpc_closure writing_action;
/** closure to finish reading from the endpoint */
- grpc_iomgr_closure recv_data;
+ grpc_closure recv_data;
/** incoming read bytes */
gpr_slice_buffer read_buffer;
@@ -359,8 +359,8 @@ typedef struct {
/** HTTP2 stream id for this stream, or zero if one has not been assigned */
gpr_uint32 id;
- grpc_iomgr_closure *send_done_closure;
- grpc_iomgr_closure *recv_done_closure;
+ grpc_closure *send_done_closure;
+ grpc_closure *recv_done_closure;
/** window available for us to send to peer */
gpr_int64 outgoing_window;
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index 1da3f85bde..db6715b43a 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -238,8 +238,8 @@ void grpc_chttp2_cleanup_writing(
stream_global->outgoing_sopb->nops == 0) {
GPR_ASSERT(stream_global->write_state != GRPC_WRITE_STATE_QUEUED_CLOSE);
stream_global->outgoing_sopb = NULL;
- grpc_iomgr_call_list_add(&transport_global->run_at_unlock,
- stream_global->send_done_closure, 1);
+ grpc_call_list_add(&transport_global->run_at_unlock,
+ stream_global->send_done_closure, 1);
}
}
stream_global->writing_now = 0;
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 50d5a3e8a8..bc056ac0b8 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -250,15 +250,15 @@ static void init_transport(grpc_chttp2_transport *t,
gpr_slice_buffer_init(&t->writing.outbuf);
grpc_chttp2_hpack_compressor_init(&t->writing.hpack_compressor, mdctx);
- grpc_iomgr_closure_init(&t->writing_action, writing_action, t);
+ grpc_closure_init(&t->writing_action, writing_action, t);
gpr_slice_buffer_init(&t->parsing.qbuf);
grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser);
grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser, t->metadata_context);
- grpc_iomgr_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing,
- &t->writing);
- grpc_iomgr_closure_init(&t->recv_data, recv_data, t);
+ grpc_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing,
+ &t->writing);
+ grpc_closure_init(&t->recv_data, recv_data, t);
gpr_slice_buffer_init(&t->read_buffer);
if (is_client) {
@@ -499,20 +499,20 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); }
static void unlock(grpc_chttp2_transport *t) {
- grpc_iomgr_call_list run = GRPC_IOMGR_CALL_LIST_INIT;
+ grpc_call_list run = GRPC_CALL_LIST_INIT;
unlock_check_read_write_state(t);
if (!t->writing_active && !t->closed &&
grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) {
t->writing_active = 1;
REF_TRANSPORT(t, "writing");
- grpc_iomgr_call_list_add(&t->global.run_at_unlock, &t->writing_action, 1);
+ grpc_call_list_add(&t->global.run_at_unlock, &t->writing_action, 1);
prevent_endpoint_shutdown(t);
}
- GPR_SWAP(grpc_iomgr_call_list, run, t->global.run_at_unlock);
+ GPR_SWAP(grpc_call_list, run, t->global.run_at_unlock);
gpr_mu_unlock(&t->mu);
- grpc_iomgr_call_list_run(run);
+ grpc_call_list_run(run);
}
/*
@@ -664,8 +664,8 @@ static void perform_stream_op_locked(
}
} else {
grpc_sopb_reset(op->send_ops);
- grpc_iomgr_call_list_add(&transport_global->run_at_unlock,
- stream_global->send_done_closure, 0);
+ grpc_call_list_add(&transport_global->run_at_unlock,
+ stream_global->send_done_closure, 0);
}
}
@@ -703,8 +703,7 @@ static void perform_stream_op_locked(
op->bind_pollset);
}
- grpc_iomgr_call_list_add(&transport_global->run_at_unlock, op->on_consumed,
- 1);
+ grpc_call_list_add(&transport_global->run_at_unlock, op->on_consumed, 1);
}
static void perform_stream_op(grpc_transport *gt, grpc_stream *gs,
@@ -717,8 +716,7 @@ static void perform_stream_op(grpc_transport *gt, grpc_stream *gs,
unlock(t);
}
-static void send_ping_locked(grpc_chttp2_transport *t,
- grpc_iomgr_closure *on_recv) {
+static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) {
grpc_chttp2_outstanding_ping *p = gpr_malloc(sizeof(*p));
p->next = &t->global.pings;
p->prev = p->next->prev;
@@ -741,7 +739,7 @@ static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) {
lock(t);
- grpc_iomgr_call_list_add(&t->global.run_at_unlock, op->on_consumed, 1);
+ grpc_call_list_add(&t->global.run_at_unlock, op->on_consumed, 1);
if (op->on_connectivity_state_change) {
grpc_connectivity_state_notify_on_state_change(
@@ -868,8 +866,8 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) {
if (stream_global->outgoing_sopb != NULL) {
grpc_sopb_reset(stream_global->outgoing_sopb);
stream_global->outgoing_sopb = NULL;
- grpc_iomgr_call_list_add(&transport_global->run_at_unlock,
- stream_global->send_done_closure, 1);
+ grpc_call_list_add(&transport_global->run_at_unlock,
+ stream_global->send_done_closure, 1);
}
stream_global->read_closed = 1;
if (!stream_global->published_cancelled) {
@@ -919,8 +917,8 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) {
&stream_global->outstanding_metadata);
grpc_sopb_swap(stream_global->publish_sopb, &stream_global->incoming_sopb);
stream_global->published_state = *stream_global->publish_state = state;
- grpc_iomgr_call_list_add(&transport_global->run_at_unlock,
- stream_global->recv_done_closure, 1);
+ grpc_call_list_add(&transport_global->run_at_unlock,
+ stream_global->recv_done_closure, 1);
stream_global->recv_done_closure = NULL;
stream_global->publish_sopb = NULL;
stream_global->publish_state = NULL;
diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c
index b605da9dbf..8b2e1b9b02 100644
--- a/src/core/transport/connectivity_state.c
+++ b/src/core/transport/connectivity_state.c
@@ -91,7 +91,7 @@ grpc_connectivity_state grpc_connectivity_state_check(
int grpc_connectivity_state_notify_on_state_change(
grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current,
- grpc_iomgr_closure *notify, grpc_iomgr_call_list *call_list) {
+ grpc_closure *notify, grpc_call_list *call_list) {
if (grpc_connectivity_state_trace) {
gpr_log(GPR_DEBUG, "CONWATCH: %s: from %s [cur=%s] notify=%p",
tracker->name, grpc_connectivity_state_name(*current),
@@ -99,7 +99,7 @@ int grpc_connectivity_state_notify_on_state_change(
}
if (tracker->current_state != *current) {
*current = tracker->current_state;
- grpc_iomgr_call_list_add(call_list, notify, 1);
+ grpc_call_list_add(call_list, notify, 1);
} else {
grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w));
w->current = current;
@@ -113,7 +113,7 @@ int grpc_connectivity_state_notify_on_state_change(
void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state state,
const char *reason,
- grpc_iomgr_call_list *call_list) {
+ grpc_call_list *call_list) {
grpc_connectivity_state_watcher *w;
if (grpc_connectivity_state_trace) {
gpr_log(GPR_DEBUG, "SET: %s: %s --> %s [%s]", tracker->name,
@@ -128,7 +128,7 @@ void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
while ((w = tracker->watchers) != NULL) {
*w->current = tracker->current_state;
tracker->watchers = w->next;
- grpc_iomgr_call_list_add(call_list, w->notify, 1);
+ grpc_call_list_add(call_list, w->notify, 1);
gpr_free(w);
}
}
diff --git a/src/core/transport/connectivity_state.h b/src/core/transport/connectivity_state.h
index 49c2815266..a29c655b4e 100644
--- a/src/core/transport/connectivity_state.h
+++ b/src/core/transport/connectivity_state.h
@@ -42,7 +42,7 @@ typedef struct grpc_connectivity_state_watcher {
/** we keep watchers in a linked list */
struct grpc_connectivity_state_watcher *next;
/** closure to notify on change */
- grpc_iomgr_closure *notify;
+ grpc_closure *notify;
/** the current state as believed by the watcher */
grpc_connectivity_state *current;
} grpc_connectivity_state_watcher;
@@ -67,8 +67,7 @@ void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker);
* external lock */
void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state state,
- const char *reason,
- grpc_iomgr_call_list *call_list);
+ const char *reason, grpc_call_list *call_list);
grpc_connectivity_state grpc_connectivity_state_check(
grpc_connectivity_state_tracker *tracker);
@@ -76,6 +75,6 @@ grpc_connectivity_state grpc_connectivity_state_check(
/** Return 1 if the channel should start connecting, 0 otherwise */
int grpc_connectivity_state_notify_on_state_change(
grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current,
- grpc_iomgr_closure *notify, grpc_iomgr_call_list *call_list);
+ grpc_closure *notify, grpc_call_list *call_list);
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CONNECTIVITY_STATE_H */
diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c
index c0d92cf93f..3f6b93c3e8 100644
--- a/src/core/transport/transport.c
+++ b/src/core/transport/transport.c
@@ -101,8 +101,8 @@ void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op,
typedef struct {
gpr_slice message;
- grpc_iomgr_closure *then_call;
- grpc_iomgr_closure closure;
+ grpc_closure *then_call;
+ grpc_closure closure;
} close_message_data;
static void free_message(void *p, int iomgr_success) {
@@ -130,7 +130,7 @@ void grpc_transport_stream_op_add_close(grpc_transport_stream_op *op,
cmd = gpr_malloc(sizeof(*cmd));
cmd->message = *optional_message;
cmd->then_call = op->on_consumed;
- grpc_iomgr_closure_init(&cmd->closure, free_message, cmd);
+ grpc_closure_init(&cmd->closure, free_message, cmd);
op->on_consumed = &cmd->closure;
op->optional_close_message = &cmd->message;
}
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index 6e1ec2f64c..271891d430 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -64,11 +64,11 @@ typedef enum grpc_stream_state {
/* Transport stream op: a set of operations to perform on a transport
against a single stream */
typedef struct grpc_transport_stream_op {
- grpc_iomgr_closure *on_consumed;
+ grpc_closure *on_consumed;
grpc_stream_op_buffer *send_ops;
int is_last_send;
- grpc_iomgr_closure *on_done_send;
+ grpc_closure *on_done_send;
grpc_stream_op_buffer *recv_ops;
grpc_stream_state *recv_state;
@@ -76,7 +76,7 @@ typedef struct grpc_transport_stream_op {
These bytes will be eventually used to replenish per-stream flow control
windows. */
size_t max_recv_bytes;
- grpc_iomgr_closure *on_done_recv;
+ grpc_closure *on_done_recv;
grpc_pollset *bind_pollset;
@@ -95,9 +95,9 @@ typedef struct grpc_transport_stream_op {
/** Transport op: a set of operations to perform on a transport as a whole */
typedef struct grpc_transport_op {
/** called when processing of this op is done */
- grpc_iomgr_closure *on_consumed;
+ grpc_closure *on_consumed;
/** connectivity monitoring */
- grpc_iomgr_closure *on_connectivity_state_change;
+ grpc_closure *on_connectivity_state_change;
grpc_connectivity_state *connectivity_state;
/** should the transport be disconnected */
int disconnect;
@@ -118,7 +118,7 @@ typedef struct grpc_transport_op {
/** add this transport to a pollset_set */
grpc_pollset_set *bind_pollset_set;
/** send a ping, call this back if not NULL */
- grpc_iomgr_closure *send_ping;
+ grpc_closure *send_ping;
} grpc_transport_op;
/* Returns the amount of memory required to store a grpc_stream for this
@@ -181,7 +181,7 @@ void grpc_transport_perform_op(grpc_transport *transport,
/* Send a ping on a transport
Calls cb with user data when a response is received. */
-void grpc_transport_ping(grpc_transport *transport, grpc_iomgr_closure *cb);
+void grpc_transport_ping(grpc_transport *transport, grpc_closure *cb);
/* Advise peer of pending connection termination. */
void grpc_transport_goaway(grpc_transport *transport, grpc_status_code status,