aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/channel
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-09-18 07:20:29 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-09-18 07:20:29 -0700
commit000cd8f9f7346defc79fe6aa877af11b42ab5f1e (patch)
tree883d73a97471f63e616d02c1e17efc62b099c8ad /src/core/channel
parent38adec97e875c21cd9d6cc9d039664bdf4fdb889 (diff)
Introduce call lists for moving work outside locks
Diffstat (limited to 'src/core/channel')
-rw-r--r--src/core/channel/client_channel.c139
-rw-r--r--src/core/channel/client_channel.h5
2 files changed, 62 insertions, 82 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 5b165972a7..a2346503d9 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -73,7 +73,7 @@ typedef struct {
guarded by mu_config */
grpc_client_config *incoming_configuration;
/** a list of closures that are all waiting for config to come in */
- grpc_iomgr_closure *waiting_for_config_closures;
+ grpc_iomgr_call_list waiting_for_config_closures;
/** resolver callback */
grpc_iomgr_closure on_config_changed;
/** connectivity state being tracked */
@@ -181,8 +181,8 @@ static void add_to_lb_policy_wait_queue_locked_state_config(
waiting_call *wc = gpr_malloc(sizeof(*wc));
grpc_iomgr_closure_init(&wc->closure, continue_with_pick, wc);
wc->elem = elem;
- wc->closure.next = chand->waiting_for_config_closures;
- chand->waiting_for_config_closures = &wc->closure;
+ grpc_iomgr_call_list_add(&chand->waiting_for_config_closures, &wc->closure,
+ 1);
}
static int is_empty(void *p, int len) {
@@ -230,6 +230,7 @@ static void started_call(void *arg, int iomgr_success) {
static void picked_target(void *arg, int iomgr_success) {
call_data *calld = arg;
grpc_pollset *pollset;
+ grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
if (calld->picked_channel == NULL) {
/* treat this like a cancellation */
@@ -248,9 +249,10 @@ static void picked_target(void *arg, int iomgr_success) {
grpc_iomgr_closure_init(&calld->async_setup_task, started_call, calld);
grpc_subchannel_create_call(calld->picked_channel, pollset,
&calld->subchannel_call,
- &calld->async_setup_task);
+ &calld->async_setup_task, &call_list);
}
}
+ grpc_iomgr_call_list_run(call_list);
}
static grpc_iomgr_closure *merge_into_waiting_op(
@@ -310,7 +312,7 @@ static void perform_transport_stream_op(grpc_call_element *elem,
grpc_subchannel_call *subchannel_call;
grpc_lb_policy *lb_policy;
grpc_transport_stream_op op2;
- grpc_iomgr_closure *consumed_op = NULL;
+ grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
@@ -328,7 +330,7 @@ static void perform_transport_stream_op(grpc_call_element *elem,
break;
case CALL_WAITING_FOR_SEND:
GPR_ASSERT(!continuation);
- consumed_op = merge_into_waiting_op(elem, op);
+ grpc_iomgr_call_list_add(&call_list, merge_into_waiting_op(elem, op), 1);
if (!calld->waiting_op.send_ops &&
calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) {
gpr_mu_unlock(&calld->mu_state);
@@ -357,7 +359,8 @@ static void perform_transport_stream_op(grpc_call_element *elem,
handle_op_after_cancellation(elem, op);
handle_op_after_cancellation(elem, &op2);
} else {
- consumed_op = merge_into_waiting_op(elem, op);
+ grpc_iomgr_call_list_add(&call_list, merge_into_waiting_op(elem, op),
+ 1);
gpr_mu_unlock(&calld->mu_state);
}
break;
@@ -398,7 +401,7 @@ static void perform_transport_stream_op(grpc_call_element *elem,
calld);
grpc_lb_policy_pick(lb_policy, bind_pollset, initial_metadata,
&calld->picked_channel,
- &calld->async_setup_task);
+ &calld->async_setup_task, &call_list);
GRPC_LB_POLICY_UNREF(lb_policy, "pick");
} else if (chand->resolver != NULL) {
@@ -424,9 +427,7 @@ static void perform_transport_stream_op(grpc_call_element *elem,
break;
}
- if (consumed_op != NULL) {
- consumed_op->cb(consumed_op->cb_arg, 1);
- }
+ grpc_iomgr_call_list_run(call_list);
}
static void cc_start_transport_stream_op(grpc_call_element *elem,
@@ -435,36 +436,38 @@ static void cc_start_transport_stream_op(grpc_call_element *elem,
}
static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy,
- grpc_connectivity_state current_state);
+ grpc_connectivity_state current_state,
+ grpc_iomgr_call_list *cl);
-static void on_lb_policy_state_changed_locked(
- lb_policy_connectivity_watcher *w) {
+static void on_lb_policy_state_changed_locked(lb_policy_connectivity_watcher *w,
+ grpc_iomgr_call_list *cl) {
/* check if the notification is for a stale policy */
if (w->lb_policy != w->chand->lb_policy) return;
- grpc_connectivity_state_set(&w->chand->state_tracker, w->state, "lb_changed");
+ grpc_connectivity_state_set(&w->chand->state_tracker, w->state, "lb_changed",
+ cl);
if (w->state != GRPC_CHANNEL_FATAL_FAILURE) {
- watch_lb_policy(w->chand, w->lb_policy, w->state);
+ watch_lb_policy(w->chand, w->lb_policy, w->state, cl);
}
}
static void on_lb_policy_state_changed(void *arg, int iomgr_success) {
lb_policy_connectivity_watcher *w = arg;
- grpc_connectivity_state_flusher f;
+ grpc_iomgr_call_list cl = GRPC_IOMGR_CALL_LIST_INIT;
gpr_mu_lock(&w->chand->mu_config);
- on_lb_policy_state_changed_locked(w);
- grpc_connectivity_state_begin_flush(&w->chand->state_tracker, &f);
+ on_lb_policy_state_changed_locked(w, &cl);
gpr_mu_unlock(&w->chand->mu_config);
- grpc_connectivity_state_end_flush(&f);
+ grpc_iomgr_call_list_run(cl);
GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy");
gpr_free(w);
}
static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy,
- grpc_connectivity_state current_state) {
+ grpc_connectivity_state current_state,
+ grpc_iomgr_call_list *call_list) {
lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy");
@@ -472,13 +475,8 @@ static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy,
grpc_iomgr_closure_init(&w->on_changed, on_lb_policy_state_changed, w);
w->state = current_state;
w->lb_policy = lb_policy;
- if (grpc_lb_policy_notify_on_state_change(lb_policy, &w->state,
- &w->on_changed)
- .state_already_changed) {
- on_lb_policy_state_changed_locked(w);
- GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy");
- gpr_free(w);
- }
+ grpc_lb_policy_notify_on_state_change(lb_policy, &w->state, &w->on_changed,
+ call_list);
}
static void cc_on_config_changed(void *arg, int iomgr_success) {
@@ -486,9 +484,8 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
grpc_lb_policy *lb_policy = NULL;
grpc_lb_policy *old_lb_policy;
grpc_resolver *old_resolver;
- grpc_iomgr_closure *wakeup_closures = NULL;
grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
- grpc_connectivity_state_flusher f;
+ grpc_iomgr_call_list cl = GRPC_IOMGR_CALL_LIST_INIT;
int exit_idle = 0;
if (chand->incoming_configuration != NULL) {
@@ -496,7 +493,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
if (lb_policy != NULL) {
GRPC_LB_POLICY_REF(lb_policy, "channel");
GRPC_LB_POLICY_REF(lb_policy, "config_change");
- state = grpc_lb_policy_check_connectivity(lb_policy);
+ state = grpc_lb_policy_check_connectivity(lb_policy, &cl);
}
grpc_client_config_unref(chand->incoming_configuration);
@@ -508,8 +505,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
old_lb_policy = chand->lb_policy;
chand->lb_policy = lb_policy;
if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) {
- wakeup_closures = chand->waiting_for_config_closures;
- chand->waiting_for_config_closures = NULL;
+ grpc_iomgr_call_list_move(&chand->waiting_for_config_closures, &cl);
}
if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
@@ -520,15 +516,13 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
if (iomgr_success && chand->resolver) {
grpc_resolver *resolver = chand->resolver;
GRPC_RESOLVER_REF(resolver, "channel-next");
- grpc_connectivity_state_set(&chand->state_tracker, state,
- "new_lb+resolver");
+ grpc_connectivity_state_set(&chand->state_tracker, state, "new_lb+resolver",
+ &cl);
if (lb_policy != NULL) {
- watch_lb_policy(chand, lb_policy, state);
+ watch_lb_policy(chand, lb_policy, state, &cl);
}
- grpc_connectivity_state_begin_flush(&chand->state_tracker, &f);
gpr_mu_unlock(&chand->mu_config);
GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
- grpc_connectivity_state_end_flush(&f);
grpc_resolver_next(resolver, &chand->incoming_configuration,
&chand->on_config_changed);
GRPC_RESOLVER_UNREF(resolver, "channel-next");
@@ -536,10 +530,9 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
old_resolver = chand->resolver;
chand->resolver = NULL;
grpc_connectivity_state_set(&chand->state_tracker,
- GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone");
- grpc_connectivity_state_begin_flush(&chand->state_tracker, &f);
+ GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone",
+ &cl);
gpr_mu_unlock(&chand->mu_config);
- grpc_connectivity_state_end_flush(&f);
if (old_resolver != NULL) {
grpc_resolver_shutdown(old_resolver);
GRPC_RESOLVER_UNREF(old_resolver, "channel");
@@ -547,24 +540,20 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
}
if (exit_idle) {
- grpc_lb_policy_exit_idle(lb_policy);
+ grpc_lb_policy_exit_idle(lb_policy, &cl);
GRPC_LB_POLICY_UNREF(lb_policy, "exit_idle");
}
if (old_lb_policy != NULL) {
- grpc_lb_policy_shutdown(old_lb_policy);
+ grpc_lb_policy_shutdown(old_lb_policy, &cl);
GRPC_LB_POLICY_UNREF(old_lb_policy, "channel");
}
- while (wakeup_closures) {
- grpc_iomgr_closure *next = wakeup_closures->next;
- wakeup_closures->cb(wakeup_closures->cb_arg, 1);
- wakeup_closures = next;
- }
-
if (lb_policy != NULL) {
GRPC_LB_POLICY_UNREF(lb_policy, "config_change");
}
+
+ grpc_iomgr_call_list_run(cl);
GRPC_CHANNEL_INTERNAL_UNREF(chand->master, "resolver");
}
@@ -573,23 +562,21 @@ static void cc_start_transport_op(grpc_channel_element *elem,
grpc_lb_policy *lb_policy = NULL;
channel_data *chand = elem->channel_data;
grpc_resolver *destroy_resolver = NULL;
- grpc_connectivity_state_flusher f;
- grpc_iomgr_closure *call_list = op->on_consumed;
- call_list->next = NULL;
- op->on_consumed = NULL;
+ grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
+
+ if (op->on_consumed) {
+ grpc_iomgr_call_list_add(&call_list, op->on_consumed, 1);
+ op->on_consumed = NULL;
+ }
GPR_ASSERT(op->set_accept_stream == NULL);
GPR_ASSERT(op->bind_pollset == NULL);
gpr_mu_lock(&chand->mu_config);
if (op->on_connectivity_state_change != NULL) {
- if (grpc_connectivity_state_notify_on_state_change(
- &chand->state_tracker, op->connectivity_state,
- op->on_connectivity_state_change)
- .state_already_changed) {
- op->on_connectivity_state_change->next = call_list;
- call_list = op->on_connectivity_state_change;
- }
+ grpc_connectivity_state_notify_on_state_change(
+ &chand->state_tracker, op->connectivity_state,
+ op->on_connectivity_state_change, &call_list);
op->on_connectivity_state_change = NULL;
op->connectivity_state = NULL;
}
@@ -603,18 +590,17 @@ static void cc_start_transport_op(grpc_channel_element *elem,
if (op->disconnect && chand->resolver != NULL) {
grpc_connectivity_state_set(&chand->state_tracker,
- GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
+ GRPC_CHANNEL_FATAL_FAILURE, "disconnect",
+ &call_list);
destroy_resolver = chand->resolver;
chand->resolver = NULL;
if (chand->lb_policy != NULL) {
- grpc_lb_policy_shutdown(chand->lb_policy);
+ grpc_lb_policy_shutdown(chand->lb_policy, &call_list);
GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
chand->lb_policy = NULL;
}
}
- grpc_connectivity_state_begin_flush(&chand->state_tracker, &f);
gpr_mu_unlock(&chand->mu_config);
- grpc_connectivity_state_end_flush(&f);
if (destroy_resolver) {
grpc_resolver_shutdown(destroy_resolver);
@@ -622,15 +608,11 @@ static void cc_start_transport_op(grpc_channel_element *elem,
}
if (lb_policy) {
- grpc_lb_policy_broadcast(lb_policy, op);
+ grpc_lb_policy_broadcast(lb_policy, op, &call_list);
GRPC_LB_POLICY_UNREF(lb_policy, "broadcast");
}
- while (call_list != NULL) {
- grpc_iomgr_closure *next = call_list->next;
- call_list->cb(call_list->cb_arg, 1);
- call_list = next;
- }
+ grpc_iomgr_call_list_run(call_list);
}
/* Constructor for call_data */
@@ -740,7 +722,7 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
GPR_ASSERT(!chand->resolver);
chand->resolver = resolver;
GRPC_RESOLVER_REF(resolver, "channel");
- if (chand->waiting_for_config_closures != NULL ||
+ if (!grpc_iomgr_call_list_empty(chand->waiting_for_config_closures) ||
chand->exit_idle_when_lb_policy_arrives) {
chand->started_resolving = 1;
GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
@@ -751,14 +733,15 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
}
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
- grpc_channel_element *elem, int try_to_connect) {
+ grpc_channel_element *elem, int try_to_connect,
+ grpc_iomgr_call_list *call_list) {
channel_data *chand = elem->channel_data;
grpc_connectivity_state out;
gpr_mu_lock(&chand->mu_config);
out = grpc_connectivity_state_check(&chand->state_tracker);
if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
if (chand->lb_policy != NULL) {
- grpc_lb_policy_exit_idle(chand->lb_policy);
+ grpc_lb_policy_exit_idle(chand->lb_policy, call_list);
} else {
chand->exit_idle_when_lb_policy_arrives = 1;
if (!chand->started_resolving && chand->resolver != NULL) {
@@ -775,16 +758,12 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state(
void grpc_client_channel_watch_connectivity_state(
grpc_channel_element *elem, grpc_connectivity_state *state,
- grpc_iomgr_closure *on_complete) {
+ grpc_iomgr_closure *on_complete, grpc_iomgr_call_list *call_list) {
channel_data *chand = elem->channel_data;
- grpc_connectivity_state_notify_on_state_change_result r;
gpr_mu_lock(&chand->mu_config);
- r = grpc_connectivity_state_notify_on_state_change(&chand->state_tracker,
- state, on_complete);
+ grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, state,
+ on_complete, call_list);
gpr_mu_unlock(&chand->mu_config);
- if (r.state_already_changed) {
- on_complete->cb(on_complete->cb_arg, 1);
- }
}
grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(
diff --git a/src/core/channel/client_channel.h b/src/core/channel/client_channel.h
index 13681e3956..60090e8d7e 100644
--- a/src/core/channel/client_channel.h
+++ b/src/core/channel/client_channel.h
@@ -53,11 +53,12 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
grpc_resolver *resolver);
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
- grpc_channel_element *elem, int try_to_connect);
+ grpc_channel_element *elem, int try_to_connect,
+ grpc_iomgr_call_list *call_list);
void grpc_client_channel_watch_connectivity_state(
grpc_channel_element *elem, grpc_connectivity_state *state,
- grpc_iomgr_closure *on_complete);
+ grpc_iomgr_closure *on_complete, grpc_iomgr_call_list *call_list);
grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(
grpc_channel_element *elem);