aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/channel/client_channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/channel/client_channel.c')
-rw-r--r--src/core/channel/client_channel.c292
1 files changed, 149 insertions, 143 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 2e25033813..0193928a50 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -73,9 +73,9 @@ typedef struct {
guarded by mu_config */
grpc_client_config *incoming_configuration;
/** a list of closures that are all waiting for config to come in */
- grpc_iomgr_closure *waiting_for_config_closures;
+ grpc_closure_list waiting_for_config_closures;
/** resolver callback */
- grpc_iomgr_closure on_config_changed;
+ grpc_closure on_config_changed;
/** connectivity state being tracked */
grpc_connectivity_state_tracker state_tracker;
/** when an lb_policy arrives, should we try to exit idle */
@@ -91,7 +91,7 @@ typedef struct {
update the channel, and create a new watcher */
typedef struct {
channel_data *chand;
- grpc_iomgr_closure on_changed;
+ grpc_closure on_changed;
grpc_connectivity_state state;
grpc_lb_policy *lb_policy;
} lb_policy_connectivity_watcher;
@@ -115,7 +115,7 @@ struct call_data {
call_state state;
gpr_timespec deadline;
grpc_subchannel *picked_channel;
- grpc_iomgr_closure async_setup_task;
+ grpc_closure async_setup_task;
grpc_transport_stream_op waiting_op;
/* our child call stack */
grpc_subchannel_call *subchannel_call;
@@ -123,17 +123,18 @@ struct call_data {
grpc_linked_mdelem details;
};
-static grpc_iomgr_closure *merge_into_waiting_op(
- grpc_call_element *elem,
- grpc_transport_stream_op *new_op) GRPC_MUST_USE_RESULT;
+static grpc_closure *merge_into_waiting_op(grpc_call_element *elem,
+ grpc_transport_stream_op *new_op)
+ GRPC_MUST_USE_RESULT;
-static void handle_op_after_cancellation(grpc_call_element *elem,
+static void handle_op_after_cancellation(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
if (op->send_ops) {
grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops);
- op->on_done_send->cb(op->on_done_send->cb_arg, 0);
+ op->on_done_send->cb(exec_ctx, op->on_done_send->cb_arg, 0);
}
if (op->recv_ops) {
char status[GPR_LTOA_MIN_BUFSIZE];
@@ -152,26 +153,28 @@ static void handle_op_after_cancellation(grpc_call_element *elem,
mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
grpc_sopb_add_metadata(op->recv_ops, mdb);
*op->recv_state = GRPC_STREAM_CLOSED;
- op->on_done_recv->cb(op->on_done_recv->cb_arg, 1);
+ op->on_done_recv->cb(exec_ctx, op->on_done_recv->cb_arg, 1);
}
if (op->on_consumed) {
- op->on_consumed->cb(op->on_consumed->cb_arg, 0);
+ op->on_consumed->cb(exec_ctx, op->on_consumed->cb_arg, 0);
}
}
typedef struct {
- grpc_iomgr_closure closure;
+ grpc_closure closure;
grpc_call_element *elem;
} waiting_call;
-static void perform_transport_stream_op(grpc_call_element *elem,
+static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
grpc_transport_stream_op *op,
int continuation);
-static void continue_with_pick(void *arg, int iomgr_success) {
+static void continue_with_pick(grpc_exec_ctx *exec_ctx, void *arg,
+ int iomgr_success) {
waiting_call *wc = arg;
call_data *calld = wc->elem->call_data;
- perform_transport_stream_op(wc->elem, &calld->waiting_op, 1);
+ perform_transport_stream_op(exec_ctx, wc->elem, &calld->waiting_op, 1);
gpr_free(wc);
}
@@ -179,10 +182,9 @@ static void add_to_lb_policy_wait_queue_locked_state_config(
grpc_call_element *elem) {
channel_data *chand = elem->channel_data;
waiting_call *wc = gpr_malloc(sizeof(*wc));
- grpc_iomgr_closure_init(&wc->closure, continue_with_pick, wc);
+ grpc_closure_init(&wc->closure, continue_with_pick, wc);
wc->elem = elem;
- wc->closure.next = chand->waiting_for_config_closures;
- chand->waiting_for_config_closures = &wc->closure;
+ grpc_closure_list_add(&chand->waiting_for_config_closures, &wc->closure, 1);
}
static int is_empty(void *p, int len) {
@@ -194,7 +196,8 @@ static int is_empty(void *p, int len) {
return 1;
}
-static void started_call(void *arg, int iomgr_success) {
+static void started_call(grpc_exec_ctx *exec_ctx, void *arg,
+ int iomgr_success) {
call_data *calld = arg;
grpc_transport_stream_op op;
int have_waiting;
@@ -204,21 +207,21 @@ static void started_call(void *arg, int iomgr_success) {
memset(&op, 0, sizeof(op));
op.cancel_with_status = GRPC_STATUS_CANCELLED;
gpr_mu_unlock(&calld->mu_state);
- grpc_subchannel_call_process_op(calld->subchannel_call, &op);
+ grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, &op);
} else if (calld->state == CALL_WAITING_FOR_CALL) {
have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op));
if (calld->subchannel_call != NULL) {
calld->state = CALL_ACTIVE;
gpr_mu_unlock(&calld->mu_state);
if (have_waiting) {
- grpc_subchannel_call_process_op(calld->subchannel_call,
+ grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call,
&calld->waiting_op);
}
} else {
calld->state = CALL_CANCELLED;
gpr_mu_unlock(&calld->mu_state);
if (have_waiting) {
- handle_op_after_cancellation(calld->elem, &calld->waiting_op);
+ handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op);
}
}
} else {
@@ -227,36 +230,37 @@ static void started_call(void *arg, int iomgr_success) {
}
}
-static void picked_target(void *arg, int iomgr_success) {
+static void picked_target(grpc_exec_ctx *exec_ctx, void *arg,
+ int iomgr_success) {
call_data *calld = arg;
grpc_pollset *pollset;
if (calld->picked_channel == NULL) {
/* treat this like a cancellation */
calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE;
- perform_transport_stream_op(calld->elem, &calld->waiting_op, 1);
+ perform_transport_stream_op(exec_ctx, calld->elem, &calld->waiting_op, 1);
} else {
gpr_mu_lock(&calld->mu_state);
if (calld->state == CALL_CANCELLED) {
gpr_mu_unlock(&calld->mu_state);
- handle_op_after_cancellation(calld->elem, &calld->waiting_op);
+ handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op);
} else {
GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK);
calld->state = CALL_WAITING_FOR_CALL;
pollset = calld->waiting_op.bind_pollset;
gpr_mu_unlock(&calld->mu_state);
- grpc_iomgr_closure_init(&calld->async_setup_task, started_call, calld);
- grpc_subchannel_create_call(calld->picked_channel, pollset,
+ grpc_closure_init(&calld->async_setup_task, started_call, calld);
+ grpc_subchannel_create_call(exec_ctx, calld->picked_channel, pollset,
&calld->subchannel_call,
&calld->async_setup_task);
}
}
}
-static grpc_iomgr_closure *merge_into_waiting_op(
- grpc_call_element *elem, grpc_transport_stream_op *new_op) {
+static grpc_closure *merge_into_waiting_op(grpc_call_element *elem,
+ grpc_transport_stream_op *new_op) {
call_data *calld = elem->call_data;
- grpc_iomgr_closure *consumed_op = NULL;
+ grpc_closure *consumed_op = NULL;
grpc_transport_stream_op *waiting_op = &calld->waiting_op;
GPR_ASSERT((waiting_op->send_ops != NULL) + (new_op->send_ops != NULL) <= 1);
GPR_ASSERT((waiting_op->recv_ops != NULL) + (new_op->recv_ops != NULL) <= 1);
@@ -282,7 +286,7 @@ static grpc_iomgr_closure *merge_into_waiting_op(
return consumed_op;
}
-static char *cc_get_peer(grpc_call_element *elem) {
+static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
grpc_subchannel_call *subchannel_call;
@@ -293,8 +297,8 @@ static char *cc_get_peer(grpc_call_element *elem) {
subchannel_call = calld->subchannel_call;
GRPC_SUBCHANNEL_CALL_REF(subchannel_call, "get_peer");
gpr_mu_unlock(&calld->mu_state);
- result = grpc_subchannel_call_get_peer(subchannel_call);
- GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "get_peer");
+ result = grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
+ GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "get_peer");
return result;
} else {
gpr_mu_unlock(&calld->mu_state);
@@ -302,7 +306,8 @@ static char *cc_get_peer(grpc_call_element *elem) {
}
}
-static void perform_transport_stream_op(grpc_call_element *elem,
+static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
grpc_transport_stream_op *op,
int continuation) {
call_data *calld = elem->call_data;
@@ -310,7 +315,6 @@ static void perform_transport_stream_op(grpc_call_element *elem,
grpc_subchannel_call *subchannel_call;
grpc_lb_policy *lb_policy;
grpc_transport_stream_op op2;
- grpc_iomgr_closure *consumed_op = NULL;
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
@@ -320,15 +324,15 @@ static void perform_transport_stream_op(grpc_call_element *elem,
GPR_ASSERT(!continuation);
subchannel_call = calld->subchannel_call;
gpr_mu_unlock(&calld->mu_state);
- grpc_subchannel_call_process_op(subchannel_call, op);
+ grpc_subchannel_call_process_op(exec_ctx, subchannel_call, op);
break;
case CALL_CANCELLED:
gpr_mu_unlock(&calld->mu_state);
- handle_op_after_cancellation(elem, op);
+ handle_op_after_cancellation(exec_ctx, elem, op);
break;
case CALL_WAITING_FOR_SEND:
GPR_ASSERT(!continuation);
- consumed_op = merge_into_waiting_op(elem, op);
+ grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1);
if (!calld->waiting_op.send_ops &&
calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) {
gpr_mu_unlock(&calld->mu_state);
@@ -354,10 +358,10 @@ static void perform_transport_stream_op(grpc_call_element *elem,
op2.on_consumed = NULL;
}
gpr_mu_unlock(&calld->mu_state);
- handle_op_after_cancellation(elem, op);
- handle_op_after_cancellation(elem, &op2);
+ handle_op_after_cancellation(exec_ctx, elem, op);
+ handle_op_after_cancellation(exec_ctx, elem, &op2);
} else {
- consumed_op = merge_into_waiting_op(elem, op);
+ grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1);
gpr_mu_unlock(&calld->mu_state);
}
break;
@@ -367,7 +371,7 @@ static void perform_transport_stream_op(grpc_call_element *elem,
if (op->cancel_with_status != GRPC_STATUS_OK) {
calld->state = CALL_CANCELLED;
gpr_mu_unlock(&calld->mu_state);
- handle_op_after_cancellation(elem, op);
+ handle_op_after_cancellation(exec_ctx, elem, op);
} else {
calld->waiting_op = *op;
@@ -380,34 +384,33 @@ static void perform_transport_stream_op(grpc_call_element *elem,
gpr_mu_lock(&chand->mu_config);
lb_policy = chand->lb_policy;
if (lb_policy) {
- grpc_transport_stream_op *op = &calld->waiting_op;
- grpc_pollset *bind_pollset = op->bind_pollset;
+ grpc_transport_stream_op *waiting_op = &calld->waiting_op;
+ grpc_pollset *bind_pollset = waiting_op->bind_pollset;
grpc_metadata_batch *initial_metadata =
- &op->send_ops->ops[0].data.metadata;
+ &waiting_op->send_ops->ops[0].data.metadata;
GRPC_LB_POLICY_REF(lb_policy, "pick");
gpr_mu_unlock(&chand->mu_config);
calld->state = CALL_WAITING_FOR_PICK;
- GPR_ASSERT(op->bind_pollset);
- GPR_ASSERT(op->send_ops);
- GPR_ASSERT(op->send_ops->nops >= 1);
- GPR_ASSERT(op->send_ops->ops[0].type == GRPC_OP_METADATA);
+ GPR_ASSERT(waiting_op->bind_pollset);
+ GPR_ASSERT(waiting_op->send_ops);
+ GPR_ASSERT(waiting_op->send_ops->nops >= 1);
+ GPR_ASSERT(waiting_op->send_ops->ops[0].type == GRPC_OP_METADATA);
gpr_mu_unlock(&calld->mu_state);
- grpc_iomgr_closure_init(&calld->async_setup_task, picked_target,
- calld);
- grpc_lb_policy_pick(lb_policy, bind_pollset, initial_metadata,
- &calld->picked_channel,
+ grpc_closure_init(&calld->async_setup_task, picked_target, calld);
+ grpc_lb_policy_pick(exec_ctx, lb_policy, bind_pollset,
+ initial_metadata, &calld->picked_channel,
&calld->async_setup_task);
- GRPC_LB_POLICY_UNREF(lb_policy, "pick");
+ GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick");
} else if (chand->resolver != NULL) {
calld->state = CALL_WAITING_FOR_CONFIG;
add_to_lb_policy_wait_queue_locked_state_config(elem);
if (!chand->started_resolving && chand->resolver != NULL) {
GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
chand->started_resolving = 1;
- grpc_resolver_next(chand->resolver,
+ grpc_resolver_next(exec_ctx, chand->resolver,
&chand->incoming_configuration,
&chand->on_config_changed);
}
@@ -417,62 +420,68 @@ static void perform_transport_stream_op(grpc_call_element *elem,
calld->state = CALL_CANCELLED;
gpr_mu_unlock(&chand->mu_config);
gpr_mu_unlock(&calld->mu_state);
- handle_op_after_cancellation(elem, op);
+ handle_op_after_cancellation(exec_ctx, elem, op);
}
}
}
break;
}
-
- if (consumed_op != NULL) {
- consumed_op->cb(consumed_op->cb_arg, 1);
- }
}
-static void cc_start_transport_stream_op(grpc_call_element *elem,
+static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
grpc_transport_stream_op *op) {
- perform_transport_stream_op(elem, op, 0);
+ perform_transport_stream_op(exec_ctx, elem, op, 0);
}
-static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy,
+static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
+ grpc_lb_policy *lb_policy,
grpc_connectivity_state current_state);
-static void on_lb_policy_state_changed(void *arg, int iomgr_success) {
+static void on_lb_policy_state_changed_locked(
+ grpc_exec_ctx *exec_ctx, lb_policy_connectivity_watcher *w) {
+ /* check if the notification is for a stale policy */
+ if (w->lb_policy != w->chand->lb_policy) return;
+
+ grpc_connectivity_state_set(exec_ctx, &w->chand->state_tracker, w->state,
+ "lb_changed");
+ if (w->state != GRPC_CHANNEL_FATAL_FAILURE) {
+ watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state);
+ }
+}
+
+static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg,
+ int iomgr_success) {
lb_policy_connectivity_watcher *w = arg;
gpr_mu_lock(&w->chand->mu_config);
- /* check if the notification is for a stale policy */
- if (w->lb_policy == w->chand->lb_policy) {
- grpc_connectivity_state_set(&w->chand->state_tracker, w->state,
- "lb_changed");
- if (w->state != GRPC_CHANNEL_FATAL_FAILURE) {
- watch_lb_policy(w->chand, w->lb_policy, w->state);
- }
- }
+ on_lb_policy_state_changed_locked(exec_ctx, w);
gpr_mu_unlock(&w->chand->mu_config);
- GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy");
+ GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->chand->master, "watch_lb_policy");
gpr_free(w);
}
-static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy,
+static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
+ grpc_lb_policy *lb_policy,
grpc_connectivity_state current_state) {
lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy");
w->chand = chand;
- grpc_iomgr_closure_init(&w->on_changed, on_lb_policy_state_changed, w);
+ grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w);
w->state = current_state;
w->lb_policy = lb_policy;
- grpc_lb_policy_notify_on_state_change(lb_policy, &w->state, &w->on_changed);
+ grpc_lb_policy_notify_on_state_change(exec_ctx, lb_policy, &w->state,
+ &w->on_changed);
}
-static void cc_on_config_changed(void *arg, int iomgr_success) {
+static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
+ int iomgr_success) {
channel_data *chand = arg;
grpc_lb_policy *lb_policy = NULL;
grpc_lb_policy *old_lb_policy;
grpc_resolver *old_resolver;
- grpc_iomgr_closure *wakeup_closures = NULL;
grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
int exit_idle = 0;
@@ -481,10 +490,10 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
if (lb_policy != NULL) {
GRPC_LB_POLICY_REF(lb_policy, "channel");
GRPC_LB_POLICY_REF(lb_policy, "config_change");
- state = grpc_lb_policy_check_connectivity(lb_policy);
+ state = grpc_lb_policy_check_connectivity(exec_ctx, lb_policy);
}
- grpc_client_config_unref(chand->incoming_configuration);
+ grpc_client_config_unref(exec_ctx, chand->incoming_configuration);
}
chand->incoming_configuration = NULL;
@@ -493,8 +502,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
old_lb_policy = chand->lb_policy;
chand->lb_policy = lb_policy;
if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) {
- wakeup_closures = chand->waiting_for_config_closures;
- chand->waiting_for_config_closures = NULL;
+ grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures);
}
if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
@@ -505,57 +513,53 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
if (iomgr_success && chand->resolver) {
grpc_resolver *resolver = chand->resolver;
GRPC_RESOLVER_REF(resolver, "channel-next");
- grpc_connectivity_state_set(&chand->state_tracker, state,
+ grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state,
"new_lb+resolver");
+ if (lb_policy != NULL) {
+ watch_lb_policy(exec_ctx, chand, lb_policy, state);
+ }
gpr_mu_unlock(&chand->mu_config);
GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
- grpc_resolver_next(resolver, &chand->incoming_configuration,
+ grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration,
&chand->on_config_changed);
- GRPC_RESOLVER_UNREF(resolver, "channel-next");
- if (lb_policy != NULL) {
- watch_lb_policy(chand, lb_policy, state);
- }
+ GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel-next");
} else {
old_resolver = chand->resolver;
chand->resolver = NULL;
- grpc_connectivity_state_set(&chand->state_tracker,
+ grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone");
gpr_mu_unlock(&chand->mu_config);
if (old_resolver != NULL) {
- grpc_resolver_shutdown(old_resolver);
- GRPC_RESOLVER_UNREF(old_resolver, "channel");
+ grpc_resolver_shutdown(exec_ctx, old_resolver);
+ GRPC_RESOLVER_UNREF(exec_ctx, old_resolver, "channel");
}
}
if (exit_idle) {
- grpc_lb_policy_exit_idle(lb_policy);
- GRPC_LB_POLICY_UNREF(lb_policy, "exit_idle");
+ grpc_lb_policy_exit_idle(exec_ctx, lb_policy);
+ GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle");
}
if (old_lb_policy != NULL) {
- grpc_lb_policy_shutdown(old_lb_policy);
- GRPC_LB_POLICY_UNREF(old_lb_policy, "channel");
- }
-
- while (wakeup_closures) {
- grpc_iomgr_closure *next = wakeup_closures->next;
- wakeup_closures->cb(wakeup_closures->cb_arg, 1);
- wakeup_closures = next;
+ grpc_lb_policy_shutdown(exec_ctx, old_lb_policy);
+ GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel");
}
if (lb_policy != NULL) {
- GRPC_LB_POLICY_UNREF(lb_policy, "config_change");
+ GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change");
}
- GRPC_CHANNEL_INTERNAL_UNREF(chand->master, "resolver");
+
+ GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->master, "resolver");
}
-static void cc_start_transport_op(grpc_channel_element *elem,
+static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem,
grpc_transport_op *op) {
grpc_lb_policy *lb_policy = NULL;
channel_data *chand = elem->channel_data;
grpc_resolver *destroy_resolver = NULL;
- grpc_iomgr_closure *on_consumed = op->on_consumed;
- op->on_consumed = NULL;
+
+ grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1);
GPR_ASSERT(op->set_accept_stream == NULL);
GPR_ASSERT(op->bind_pollset == NULL);
@@ -563,7 +567,7 @@ static void cc_start_transport_op(grpc_channel_element *elem,
gpr_mu_lock(&chand->mu_config);
if (op->on_connectivity_state_change != NULL) {
grpc_connectivity_state_notify_on_state_change(
- &chand->state_tracker, op->connectivity_state,
+ exec_ctx, &chand->state_tracker, op->connectivity_state,
op->on_connectivity_state_change);
op->on_connectivity_state_change = NULL;
op->connectivity_state = NULL;
@@ -577,35 +581,31 @@ static void cc_start_transport_op(grpc_channel_element *elem,
}
if (op->disconnect && chand->resolver != NULL) {
- grpc_connectivity_state_set(&chand->state_tracker,
+ grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
destroy_resolver = chand->resolver;
chand->resolver = NULL;
if (chand->lb_policy != NULL) {
- grpc_lb_policy_shutdown(chand->lb_policy);
- GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
+ grpc_lb_policy_shutdown(exec_ctx, chand->lb_policy);
+ GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
chand->lb_policy = NULL;
}
}
gpr_mu_unlock(&chand->mu_config);
if (destroy_resolver) {
- grpc_resolver_shutdown(destroy_resolver);
- GRPC_RESOLVER_UNREF(destroy_resolver, "channel");
+ grpc_resolver_shutdown(exec_ctx, destroy_resolver);
+ GRPC_RESOLVER_UNREF(exec_ctx, destroy_resolver, "channel");
}
if (lb_policy) {
- grpc_lb_policy_broadcast(lb_policy, op);
- GRPC_LB_POLICY_UNREF(lb_policy, "broadcast");
- }
-
- if (on_consumed) {
- grpc_iomgr_add_callback(on_consumed);
+ grpc_lb_policy_broadcast(exec_ctx, lb_policy, op);
+ GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "broadcast");
}
}
/* Constructor for call_data */
-static void init_call_elem(grpc_call_element *elem,
+static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const void *server_transport_data,
grpc_transport_stream_op *initial_op) {
call_data *calld = elem->call_data;
@@ -622,7 +622,8 @@ static void init_call_elem(grpc_call_element *elem,
}
/* Destructor for call_data */
-static void destroy_call_elem(grpc_call_element *elem) {
+static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem) {
call_data *calld = elem->call_data;
grpc_subchannel_call *subchannel_call;
@@ -634,7 +635,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
case CALL_ACTIVE:
subchannel_call = calld->subchannel_call;
gpr_mu_unlock(&calld->mu_state);
- GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "client_channel");
+ GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "client_channel");
break;
case CALL_CREATED:
case CALL_CANCELLED:
@@ -651,7 +652,8 @@ static void destroy_call_elem(grpc_call_element *elem) {
}
/* Constructor for channel_data */
-static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
+static void init_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem, grpc_channel *master,
const grpc_channel_args *args,
grpc_mdctx *metadata_context, int is_first,
int is_last) {
@@ -666,25 +668,25 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
chand->mdctx = metadata_context;
chand->master = master;
grpc_pollset_set_init(&chand->pollset_set);
- grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed,
- chand);
+ grpc_closure_init(&chand->on_config_changed, cc_on_config_changed, chand);
grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
"client_channel");
}
/* Destructor for channel_data */
-static void destroy_channel_elem(grpc_channel_element *elem) {
+static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem) {
channel_data *chand = elem->channel_data;
if (chand->resolver != NULL) {
- grpc_resolver_shutdown(chand->resolver);
- GRPC_RESOLVER_UNREF(chand->resolver, "channel");
+ grpc_resolver_shutdown(exec_ctx, chand->resolver);
+ GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
}
if (chand->lb_policy != NULL) {
- GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
+ GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
}
- grpc_connectivity_state_destroy(&chand->state_tracker);
+ grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
grpc_pollset_set_destroy(&chand->pollset_set);
gpr_mu_destroy(&chand->mu_config);
}
@@ -702,7 +704,8 @@ const grpc_channel_filter grpc_client_channel_filter = {
"client-channel",
};
-void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
+void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack *channel_stack,
grpc_resolver *resolver) {
/* post construction initialization: set the transport setup pointer */
grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
@@ -711,31 +714,32 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
GPR_ASSERT(!chand->resolver);
chand->resolver = resolver;
GRPC_RESOLVER_REF(resolver, "channel");
- if (chand->waiting_for_config_closures != NULL ||
+ if (!grpc_closure_list_empty(chand->waiting_for_config_closures) ||
chand->exit_idle_when_lb_policy_arrives) {
chand->started_resolving = 1;
GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
- grpc_resolver_next(resolver, &chand->incoming_configuration,
+ grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration,
&chand->on_config_changed);
}
gpr_mu_unlock(&chand->mu_config);
}
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
- grpc_channel_element *elem, int try_to_connect) {
+ grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) {
channel_data *chand = elem->channel_data;
grpc_connectivity_state out;
gpr_mu_lock(&chand->mu_config);
out = grpc_connectivity_state_check(&chand->state_tracker);
if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
if (chand->lb_policy != NULL) {
- grpc_lb_policy_exit_idle(chand->lb_policy);
+ grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy);
} else {
chand->exit_idle_when_lb_policy_arrives = 1;
if (!chand->started_resolving && chand->resolver != NULL) {
GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
chand->started_resolving = 1;
- grpc_resolver_next(chand->resolver, &chand->incoming_configuration,
+ grpc_resolver_next(exec_ctx, chand->resolver,
+ &chand->incoming_configuration,
&chand->on_config_changed);
}
}
@@ -745,12 +749,12 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state(
}
void grpc_client_channel_watch_connectivity_state(
- grpc_channel_element *elem, grpc_connectivity_state *state,
- grpc_iomgr_closure *on_complete) {
+ grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
+ grpc_connectivity_state *state, grpc_closure *on_complete) {
channel_data *chand = elem->channel_data;
gpr_mu_lock(&chand->mu_config);
- grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, state,
- on_complete);
+ grpc_connectivity_state_notify_on_state_change(
+ exec_ctx, &chand->state_tracker, state, on_complete);
gpr_mu_unlock(&chand->mu_config);
}
@@ -760,14 +764,16 @@ grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(
return &chand->pollset_set;
}
-void grpc_client_channel_add_interested_party(grpc_channel_element *elem,
+void grpc_client_channel_add_interested_party(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem,
grpc_pollset *pollset) {
channel_data *chand = elem->channel_data;
- grpc_pollset_set_add_pollset(&chand->pollset_set, pollset);
+ grpc_pollset_set_add_pollset(exec_ctx, &chand->pollset_set, pollset);
}
-void grpc_client_channel_del_interested_party(grpc_channel_element *elem,
+void grpc_client_channel_del_interested_party(grpc_exec_ctx *exec_ctx,
+ grpc_channel_element *elem,
grpc_pollset *pollset) {
channel_data *chand = elem->channel_data;
- grpc_pollset_set_del_pollset(&chand->pollset_set, pollset);
+ grpc_pollset_set_del_pollset(exec_ctx, &chand->pollset_set, pollset);
}