aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-09-18 07:20:29 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-09-18 07:20:29 -0700
commit000cd8f9f7346defc79fe6aa877af11b42ab5f1e (patch)
tree883d73a97471f63e616d02c1e17efc62b099c8ad /src/core
parent38adec97e875c21cd9d6cc9d039664bdf4fdb889 (diff)
Introduce call lists for moving work outside locks
Diffstat (limited to 'src/core')
-rw-r--r--src/core/channel/client_channel.c139
-rw-r--r--src/core/channel/client_channel.h5
-rw-r--r--src/core/client_config/lb_policies/pick_first.c127
-rw-r--r--src/core/client_config/lb_policy.c35
-rw-r--r--src/core/client_config/lb_policy.h43
-rw-r--r--src/core/client_config/subchannel.c91
-rw-r--r--src/core/client_config/subchannel.h11
-rw-r--r--src/core/iomgr/iomgr.c39
-rw-r--r--src/core/iomgr/iomgr.h15
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c2
-rw-r--r--src/core/iomgr/pollset_posix.c48
-rw-r--r--src/core/iomgr/pollset_posix.h10
-rw-r--r--src/core/surface/channel.c8
-rw-r--r--src/core/surface/channel_connectivity.c16
-rw-r--r--src/core/transport/chttp2/internal.h8
-rw-r--r--src/core/transport/chttp2/writing.c4
-rw-r--r--src/core/transport/chttp2_transport.c67
-rw-r--r--src/core/transport/connectivity_state.c62
-rw-r--r--src/core/transport/connectivity_state.h32
19 files changed, 352 insertions, 410 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 5b165972a7..a2346503d9 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -73,7 +73,7 @@ typedef struct {
guarded by mu_config */
grpc_client_config *incoming_configuration;
/** a list of closures that are all waiting for config to come in */
- grpc_iomgr_closure *waiting_for_config_closures;
+ grpc_iomgr_call_list waiting_for_config_closures;
/** resolver callback */
grpc_iomgr_closure on_config_changed;
/** connectivity state being tracked */
@@ -181,8 +181,8 @@ static void add_to_lb_policy_wait_queue_locked_state_config(
waiting_call *wc = gpr_malloc(sizeof(*wc));
grpc_iomgr_closure_init(&wc->closure, continue_with_pick, wc);
wc->elem = elem;
- wc->closure.next = chand->waiting_for_config_closures;
- chand->waiting_for_config_closures = &wc->closure;
+ grpc_iomgr_call_list_add(&chand->waiting_for_config_closures, &wc->closure,
+ 1);
}
static int is_empty(void *p, int len) {
@@ -230,6 +230,7 @@ static void started_call(void *arg, int iomgr_success) {
static void picked_target(void *arg, int iomgr_success) {
call_data *calld = arg;
grpc_pollset *pollset;
+ grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
if (calld->picked_channel == NULL) {
/* treat this like a cancellation */
@@ -248,9 +249,10 @@ static void picked_target(void *arg, int iomgr_success) {
grpc_iomgr_closure_init(&calld->async_setup_task, started_call, calld);
grpc_subchannel_create_call(calld->picked_channel, pollset,
&calld->subchannel_call,
- &calld->async_setup_task);
+ &calld->async_setup_task, &call_list);
}
}
+ grpc_iomgr_call_list_run(call_list);
}
static grpc_iomgr_closure *merge_into_waiting_op(
@@ -310,7 +312,7 @@ static void perform_transport_stream_op(grpc_call_element *elem,
grpc_subchannel_call *subchannel_call;
grpc_lb_policy *lb_policy;
grpc_transport_stream_op op2;
- grpc_iomgr_closure *consumed_op = NULL;
+ grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
@@ -328,7 +330,7 @@ static void perform_transport_stream_op(grpc_call_element *elem,
break;
case CALL_WAITING_FOR_SEND:
GPR_ASSERT(!continuation);
- consumed_op = merge_into_waiting_op(elem, op);
+ grpc_iomgr_call_list_add(&call_list, merge_into_waiting_op(elem, op), 1);
if (!calld->waiting_op.send_ops &&
calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) {
gpr_mu_unlock(&calld->mu_state);
@@ -357,7 +359,8 @@ static void perform_transport_stream_op(grpc_call_element *elem,
handle_op_after_cancellation(elem, op);
handle_op_after_cancellation(elem, &op2);
} else {
- consumed_op = merge_into_waiting_op(elem, op);
+ grpc_iomgr_call_list_add(&call_list, merge_into_waiting_op(elem, op),
+ 1);
gpr_mu_unlock(&calld->mu_state);
}
break;
@@ -398,7 +401,7 @@ static void perform_transport_stream_op(grpc_call_element *elem,
calld);
grpc_lb_policy_pick(lb_policy, bind_pollset, initial_metadata,
&calld->picked_channel,
- &calld->async_setup_task);
+ &calld->async_setup_task, &call_list);
GRPC_LB_POLICY_UNREF(lb_policy, "pick");
} else if (chand->resolver != NULL) {
@@ -424,9 +427,7 @@ static void perform_transport_stream_op(grpc_call_element *elem,
break;
}
- if (consumed_op != NULL) {
- consumed_op->cb(consumed_op->cb_arg, 1);
- }
+ grpc_iomgr_call_list_run(call_list);
}
static void cc_start_transport_stream_op(grpc_call_element *elem,
@@ -435,36 +436,38 @@ static void cc_start_transport_stream_op(grpc_call_element *elem,
}
static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy,
- grpc_connectivity_state current_state);
+ grpc_connectivity_state current_state,
+ grpc_iomgr_call_list *cl);
-static void on_lb_policy_state_changed_locked(
- lb_policy_connectivity_watcher *w) {
+static void on_lb_policy_state_changed_locked(lb_policy_connectivity_watcher *w,
+ grpc_iomgr_call_list *cl) {
/* check if the notification is for a stale policy */
if (w->lb_policy != w->chand->lb_policy) return;
- grpc_connectivity_state_set(&w->chand->state_tracker, w->state, "lb_changed");
+ grpc_connectivity_state_set(&w->chand->state_tracker, w->state, "lb_changed",
+ cl);
if (w->state != GRPC_CHANNEL_FATAL_FAILURE) {
- watch_lb_policy(w->chand, w->lb_policy, w->state);
+ watch_lb_policy(w->chand, w->lb_policy, w->state, cl);
}
}
static void on_lb_policy_state_changed(void *arg, int iomgr_success) {
lb_policy_connectivity_watcher *w = arg;
- grpc_connectivity_state_flusher f;
+ grpc_iomgr_call_list cl = GRPC_IOMGR_CALL_LIST_INIT;
gpr_mu_lock(&w->chand->mu_config);
- on_lb_policy_state_changed_locked(w);
- grpc_connectivity_state_begin_flush(&w->chand->state_tracker, &f);
+ on_lb_policy_state_changed_locked(w, &cl);
gpr_mu_unlock(&w->chand->mu_config);
- grpc_connectivity_state_end_flush(&f);
+ grpc_iomgr_call_list_run(cl);
GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy");
gpr_free(w);
}
static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy,
- grpc_connectivity_state current_state) {
+ grpc_connectivity_state current_state,
+ grpc_iomgr_call_list *call_list) {
lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy");
@@ -472,13 +475,8 @@ static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy,
grpc_iomgr_closure_init(&w->on_changed, on_lb_policy_state_changed, w);
w->state = current_state;
w->lb_policy = lb_policy;
- if (grpc_lb_policy_notify_on_state_change(lb_policy, &w->state,
- &w->on_changed)
- .state_already_changed) {
- on_lb_policy_state_changed_locked(w);
- GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy");
- gpr_free(w);
- }
+ grpc_lb_policy_notify_on_state_change(lb_policy, &w->state, &w->on_changed,
+ call_list);
}
static void cc_on_config_changed(void *arg, int iomgr_success) {
@@ -486,9 +484,8 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
grpc_lb_policy *lb_policy = NULL;
grpc_lb_policy *old_lb_policy;
grpc_resolver *old_resolver;
- grpc_iomgr_closure *wakeup_closures = NULL;
grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
- grpc_connectivity_state_flusher f;
+ grpc_iomgr_call_list cl = GRPC_IOMGR_CALL_LIST_INIT;
int exit_idle = 0;
if (chand->incoming_configuration != NULL) {
@@ -496,7 +493,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
if (lb_policy != NULL) {
GRPC_LB_POLICY_REF(lb_policy, "channel");
GRPC_LB_POLICY_REF(lb_policy, "config_change");
- state = grpc_lb_policy_check_connectivity(lb_policy);
+ state = grpc_lb_policy_check_connectivity(lb_policy, &cl);
}
grpc_client_config_unref(chand->incoming_configuration);
@@ -508,8 +505,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
old_lb_policy = chand->lb_policy;
chand->lb_policy = lb_policy;
if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) {
- wakeup_closures = chand->waiting_for_config_closures;
- chand->waiting_for_config_closures = NULL;
+ grpc_iomgr_call_list_move(&chand->waiting_for_config_closures, &cl);
}
if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
@@ -520,15 +516,13 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
if (iomgr_success && chand->resolver) {
grpc_resolver *resolver = chand->resolver;
GRPC_RESOLVER_REF(resolver, "channel-next");
- grpc_connectivity_state_set(&chand->state_tracker, state,
- "new_lb+resolver");
+ grpc_connectivity_state_set(&chand->state_tracker, state, "new_lb+resolver",
+ &cl);
if (lb_policy != NULL) {
- watch_lb_policy(chand, lb_policy, state);
+ watch_lb_policy(chand, lb_policy, state, &cl);
}
- grpc_connectivity_state_begin_flush(&chand->state_tracker, &f);
gpr_mu_unlock(&chand->mu_config);
GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
- grpc_connectivity_state_end_flush(&f);
grpc_resolver_next(resolver, &chand->incoming_configuration,
&chand->on_config_changed);
GRPC_RESOLVER_UNREF(resolver, "channel-next");
@@ -536,10 +530,9 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
old_resolver = chand->resolver;
chand->resolver = NULL;
grpc_connectivity_state_set(&chand->state_tracker,
- GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone");
- grpc_connectivity_state_begin_flush(&chand->state_tracker, &f);
+ GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone",
+ &cl);
gpr_mu_unlock(&chand->mu_config);
- grpc_connectivity_state_end_flush(&f);
if (old_resolver != NULL) {
grpc_resolver_shutdown(old_resolver);
GRPC_RESOLVER_UNREF(old_resolver, "channel");
@@ -547,24 +540,20 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
}
if (exit_idle) {
- grpc_lb_policy_exit_idle(lb_policy);
+ grpc_lb_policy_exit_idle(lb_policy, &cl);
GRPC_LB_POLICY_UNREF(lb_policy, "exit_idle");
}
if (old_lb_policy != NULL) {
- grpc_lb_policy_shutdown(old_lb_policy);
+ grpc_lb_policy_shutdown(old_lb_policy, &cl);
GRPC_LB_POLICY_UNREF(old_lb_policy, "channel");
}
- while (wakeup_closures) {
- grpc_iomgr_closure *next = wakeup_closures->next;
- wakeup_closures->cb(wakeup_closures->cb_arg, 1);
- wakeup_closures = next;
- }
-
if (lb_policy != NULL) {
GRPC_LB_POLICY_UNREF(lb_policy, "config_change");
}
+
+ grpc_iomgr_call_list_run(cl);
GRPC_CHANNEL_INTERNAL_UNREF(chand->master, "resolver");
}
@@ -573,23 +562,21 @@ static void cc_start_transport_op(grpc_channel_element *elem,
grpc_lb_policy *lb_policy = NULL;
channel_data *chand = elem->channel_data;
grpc_resolver *destroy_resolver = NULL;
- grpc_connectivity_state_flusher f;
- grpc_iomgr_closure *call_list = op->on_consumed;
- call_list->next = NULL;
- op->on_consumed = NULL;
+ grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
+
+ if (op->on_consumed) {
+ grpc_iomgr_call_list_add(&call_list, op->on_consumed, 1);
+ op->on_consumed = NULL;
+ }
GPR_ASSERT(op->set_accept_stream == NULL);
GPR_ASSERT(op->bind_pollset == NULL);
gpr_mu_lock(&chand->mu_config);
if (op->on_connectivity_state_change != NULL) {
- if (grpc_connectivity_state_notify_on_state_change(
- &chand->state_tracker, op->connectivity_state,
- op->on_connectivity_state_change)
- .state_already_changed) {
- op->on_connectivity_state_change->next = call_list;
- call_list = op->on_connectivity_state_change;
- }
+ grpc_connectivity_state_notify_on_state_change(
+ &chand->state_tracker, op->connectivity_state,
+ op->on_connectivity_state_change, &call_list);
op->on_connectivity_state_change = NULL;
op->connectivity_state = NULL;
}
@@ -603,18 +590,17 @@ static void cc_start_transport_op(grpc_channel_element *elem,
if (op->disconnect && chand->resolver != NULL) {
grpc_connectivity_state_set(&chand->state_tracker,
- GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
+ GRPC_CHANNEL_FATAL_FAILURE, "disconnect",
+ &call_list);
destroy_resolver = chand->resolver;
chand->resolver = NULL;
if (chand->lb_policy != NULL) {
- grpc_lb_policy_shutdown(chand->lb_policy);
+ grpc_lb_policy_shutdown(chand->lb_policy, &call_list);
GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
chand->lb_policy = NULL;
}
}
- grpc_connectivity_state_begin_flush(&chand->state_tracker, &f);
gpr_mu_unlock(&chand->mu_config);
- grpc_connectivity_state_end_flush(&f);
if (destroy_resolver) {
grpc_resolver_shutdown(destroy_resolver);
@@ -622,15 +608,11 @@ static void cc_start_transport_op(grpc_channel_element *elem,
}
if (lb_policy) {
- grpc_lb_policy_broadcast(lb_policy, op);
+ grpc_lb_policy_broadcast(lb_policy, op, &call_list);
GRPC_LB_POLICY_UNREF(lb_policy, "broadcast");
}
- while (call_list != NULL) {
- grpc_iomgr_closure *next = call_list->next;
- call_list->cb(call_list->cb_arg, 1);
- call_list = next;
- }
+ grpc_iomgr_call_list_run(call_list);
}
/* Constructor for call_data */
@@ -740,7 +722,7 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
GPR_ASSERT(!chand->resolver);
chand->resolver = resolver;
GRPC_RESOLVER_REF(resolver, "channel");
- if (chand->waiting_for_config_closures != NULL ||
+ if (!grpc_iomgr_call_list_empty(chand->waiting_for_config_closures) ||
chand->exit_idle_when_lb_policy_arrives) {
chand->started_resolving = 1;
GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
@@ -751,14 +733,15 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
}
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
- grpc_channel_element *elem, int try_to_connect) {
+ grpc_channel_element *elem, int try_to_connect,
+ grpc_iomgr_call_list *call_list) {
channel_data *chand = elem->channel_data;
grpc_connectivity_state out;
gpr_mu_lock(&chand->mu_config);
out = grpc_connectivity_state_check(&chand->state_tracker);
if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
if (chand->lb_policy != NULL) {
- grpc_lb_policy_exit_idle(chand->lb_policy);
+ grpc_lb_policy_exit_idle(chand->lb_policy, call_list);
} else {
chand->exit_idle_when_lb_policy_arrives = 1;
if (!chand->started_resolving && chand->resolver != NULL) {
@@ -775,16 +758,12 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state(
void grpc_client_channel_watch_connectivity_state(
grpc_channel_element *elem, grpc_connectivity_state *state,
- grpc_iomgr_closure *on_complete) {
+ grpc_iomgr_closure *on_complete, grpc_iomgr_call_list *call_list) {
channel_data *chand = elem->channel_data;
- grpc_connectivity_state_notify_on_state_change_result r;
gpr_mu_lock(&chand->mu_config);
- r = grpc_connectivity_state_notify_on_state_change(&chand->state_tracker,
- state, on_complete);
+ grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, state,
+ on_complete, call_list);
gpr_mu_unlock(&chand->mu_config);
- if (r.state_already_changed) {
- on_complete->cb(on_complete->cb_arg, 1);
- }
}
grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(
diff --git a/src/core/channel/client_channel.h b/src/core/channel/client_channel.h
index 13681e3956..60090e8d7e 100644
--- a/src/core/channel/client_channel.h
+++ b/src/core/channel/client_channel.h
@@ -53,11 +53,12 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
grpc_resolver *resolver);
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
- grpc_channel_element *elem, int try_to_connect);
+ grpc_channel_element *elem, int try_to_connect,
+ grpc_iomgr_call_list *call_list);
void grpc_client_channel_watch_connectivity_state(
grpc_channel_element *elem, grpc_connectivity_state *state,
- grpc_iomgr_closure *on_complete);
+ grpc_iomgr_closure *on_complete, grpc_iomgr_call_list *call_list);
grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(
grpc_channel_element *elem);
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 804dbdeadd..8fd8dd7b67 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -108,9 +108,8 @@ void pf_destroy(grpc_lb_policy *pol) {
gpr_free(p);
}
-void pf_shutdown(grpc_lb_policy *pol) {
+void pf_shutdown(grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- grpc_connectivity_state_flusher f;
pending_pick *pp;
gpr_mu_lock(&p->mu);
del_interested_parties_locked(p);
@@ -118,51 +117,40 @@ void pf_shutdown(grpc_lb_policy *pol) {
pp = p->pending_picks;
p->pending_picks = NULL;
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE,
- "shutdown");
- grpc_connectivity_state_begin_flush(&p->state_tracker, &f);
+ "shutdown", call_list);
gpr_mu_unlock(&p->mu);
- grpc_connectivity_state_end_flush(&f);
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
- pp->on_complete->cb(pp->on_complete->cb_arg, 0);
+ grpc_iomgr_call_list_add(call_list, pp->on_complete, 1);
gpr_free(pp);
pp = next;
}
}
-/* returns a closure to call, or NULL */
-static grpc_iomgr_closure *start_picking(pick_first_lb_policy *p) {
+static void start_picking(pick_first_lb_policy *p,
+ grpc_iomgr_call_list *call_list) {
p->started_picking = 1;
p->checking_subchannel = 0;
p->checking_connectivity = GRPC_CHANNEL_IDLE;
GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity");
- if (grpc_subchannel_notify_on_state_change(
- p->subchannels[p->checking_subchannel], &p->checking_connectivity,
- &p->connectivity_changed)
- .state_already_changed) {
- return &p->connectivity_changed;
- } else {
- return NULL;
- }
+ grpc_subchannel_notify_on_state_change(p->subchannels[p->checking_subchannel],
+ &p->checking_connectivity,
+ &p->connectivity_changed, call_list);
}
-void pf_exit_idle(grpc_lb_policy *pol) {
+void pf_exit_idle(grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- grpc_iomgr_closure *call = NULL;
gpr_mu_lock(&p->mu);
if (!p->started_picking) {
- call = start_picking(p);
+ start_picking(p, call_list);
}
gpr_mu_unlock(&p->mu);
- if (call) {
- call->cb(call->cb_arg, 1);
- }
}
void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
- grpc_iomgr_closure *on_complete) {
+ grpc_iomgr_closure *on_complete, grpc_iomgr_call_list *call_list) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
@@ -171,9 +159,8 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
*target = p->selected;
on_complete->cb(on_complete->cb_arg, 1);
} else {
- grpc_iomgr_closure *call = NULL;
if (!p->started_picking) {
- call = start_picking(p);
+ start_picking(p, call_list);
}
grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel],
pollset);
@@ -184,9 +171,6 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
pp->on_complete = on_complete;
p->pending_picks = pp;
gpr_mu_unlock(&p->mu);
- if (call) {
- call->cb(call->cb_arg, 1);
- }
}
}
@@ -194,8 +178,7 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
pick_first_lb_policy *p = arg;
pending_pick *pp;
int unref = 0;
- grpc_iomgr_closure *cbs = NULL;
- grpc_connectivity_state_flusher f;
+ grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
gpr_mu_lock(&p->mu);
@@ -203,14 +186,11 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
unref = 1;
} else if (p->selected != NULL) {
grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity,
- "selected_changed");
+ "selected_changed", &call_list);
if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) {
- if (grpc_subchannel_notify_on_state_change(
- p->selected, &p->checking_connectivity, &p->connectivity_changed)
- .state_already_changed) {
- p->connectivity_changed.next = cbs;
- cbs = &p->connectivity_changed;
- }
+ grpc_subchannel_notify_on_state_change(
+ p->selected, &p->checking_connectivity, &p->connectivity_changed,
+ &call_list);
} else {
unref = 1;
}
@@ -219,28 +199,23 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
switch (p->checking_connectivity) {
case GRPC_CHANNEL_READY:
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY,
- "connecting_ready");
+ "connecting_ready", &call_list);
p->selected = p->subchannels[p->checking_subchannel];
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = p->selected;
grpc_subchannel_del_interested_party(p->selected, pp->pollset);
- pp->on_complete->next = cbs;
- cbs = pp->on_complete;
+ grpc_iomgr_call_list_add(&call_list, pp->on_complete, 1);
gpr_free(pp);
}
- if (grpc_subchannel_notify_on_state_change(p->selected,
- &p->checking_connectivity,
- &p->connectivity_changed)
- .state_already_changed) {
- p->connectivity_changed.next = cbs;
- cbs = &p->connectivity_changed;
- }
+ grpc_subchannel_notify_on_state_change(
+ p->selected, &p->checking_connectivity, &p->connectivity_changed,
+ &call_list);
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
grpc_connectivity_state_set(&p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
- "connecting_transient_failure");
+ "connecting_transient_failure", &call_list);
del_interested_parties_locked(p);
p->checking_subchannel =
(p->checking_subchannel + 1) % p->num_subchannels;
@@ -248,13 +223,9 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
p->subchannels[p->checking_subchannel]);
add_interested_parties_locked(p);
if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
- if (grpc_subchannel_notify_on_state_change(
- p->subchannels[p->checking_subchannel],
- &p->checking_connectivity, &p->connectivity_changed)
- .state_already_changed) {
- p->connectivity_changed.next = cbs;
- cbs = &p->connectivity_changed;
- }
+ grpc_subchannel_notify_on_state_change(
+ p->subchannels[p->checking_subchannel], &p->checking_connectivity,
+ &p->connectivity_changed, &call_list);
} else {
goto loop;
}
@@ -262,14 +233,10 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE:
grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity,
- "connecting_changed");
- if (grpc_subchannel_notify_on_state_change(
- p->subchannels[p->checking_subchannel],
- &p->checking_connectivity, &p->connectivity_changed)
- .state_already_changed) {
- p->connectivity_changed.next = cbs;
- cbs = &p->connectivity_changed;
- }
+ "connecting_changed", &call_list);
+ grpc_subchannel_notify_on_state_change(
+ p->subchannels[p->checking_subchannel], &p->checking_connectivity,
+ &p->connectivity_changed, &call_list);
break;
case GRPC_CHANNEL_FATAL_FAILURE:
del_interested_parties_locked(p);
@@ -280,19 +247,18 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
if (p->num_subchannels == 0) {
grpc_connectivity_state_set(&p->state_tracker,
GRPC_CHANNEL_FATAL_FAILURE,
- "no_more_channels");
+ "no_more_channels", &call_list);
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- pp->on_complete->next = cbs;
- cbs = pp->on_complete;
+ grpc_iomgr_call_list_add(&call_list, pp->on_complete, 1);
gpr_free(pp);
}
unref = 1;
} else {
grpc_connectivity_state_set(&p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
- "subchannel_failed");
+ "subchannel_failed", &call_list);
p->checking_subchannel %= p->num_subchannels;
p->checking_connectivity = grpc_subchannel_check_connectivity(
p->subchannels[p->checking_subchannel]);
@@ -302,22 +268,17 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
}
}
- grpc_connectivity_state_begin_flush(&p->state_tracker, &f);
gpr_mu_unlock(&p->mu);
- grpc_connectivity_state_end_flush(&f);
- while (cbs != NULL) {
- grpc_iomgr_closure *next = cbs->next;
- cbs->cb(cbs->cb_arg, 1);
- cbs = next;
- }
+ grpc_iomgr_call_list_run(call_list);
if (unref) {
GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity");
}
}
-static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) {
+static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op,
+ grpc_iomgr_call_list *call_list) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
size_t i;
size_t n;
@@ -339,7 +300,8 @@ static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) {
gpr_free(subchannels);
}
-static grpc_connectivity_state pf_check_connectivity(grpc_lb_policy *pol) {
+static grpc_connectivity_state pf_check_connectivity(
+ grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
grpc_connectivity_state st;
gpr_mu_lock(&p->mu);
@@ -348,16 +310,15 @@ static grpc_connectivity_state pf_check_connectivity(grpc_lb_policy *pol) {
return st;
}
-static grpc_connectivity_state_notify_on_state_change_result
-pf_notify_on_state_change(grpc_lb_policy *pol, grpc_connectivity_state *current,
- grpc_iomgr_closure *notify) {
+void pf_notify_on_state_change(grpc_lb_policy *pol,
+ grpc_connectivity_state *current,
+ grpc_iomgr_closure *notify,
+ grpc_iomgr_call_list *call_list) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
- grpc_connectivity_state_notify_on_state_change_result r;
gpr_mu_lock(&p->mu);
- r = grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current,
- notify);
+ grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current,
+ notify, call_list);
gpr_mu_unlock(&p->mu);
- return r;
}
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c
index 48a787da8a..a9dc9dcca8 100644
--- a/src/core/client_config/lb_policy.c
+++ b/src/core/client_config/lb_policy.c
@@ -63,33 +63,38 @@ void grpc_lb_policy_unref(grpc_lb_policy *policy) {
}
}
-void grpc_lb_policy_shutdown(grpc_lb_policy *policy) {
- policy->vtable->shutdown(policy);
+void grpc_lb_policy_shutdown(grpc_lb_policy *policy,
+ grpc_iomgr_call_list *call_list) {
+ policy->vtable->shutdown(policy, call_list);
}
void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata,
grpc_subchannel **target,
- grpc_iomgr_closure *on_complete) {
- policy->vtable->pick(policy, pollset, initial_metadata, target, on_complete);
+ grpc_iomgr_closure *on_complete,
+ grpc_iomgr_call_list *call_list) {
+ policy->vtable->pick(policy, pollset, initial_metadata, target, on_complete,
+ call_list);
}
-void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op) {
- policy->vtable->broadcast(policy, op);
+void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op,
+ grpc_iomgr_call_list *call_list) {
+ policy->vtable->broadcast(policy, op, call_list);
}
-void grpc_lb_policy_exit_idle(grpc_lb_policy *policy) {
- policy->vtable->exit_idle(policy);
+void grpc_lb_policy_exit_idle(grpc_lb_policy *policy,
+ grpc_iomgr_call_list *call_list) {
+ policy->vtable->exit_idle(policy, call_list);
}
-grpc_connectivity_state_notify_on_state_change_result
-grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy,
- grpc_connectivity_state *state,
- grpc_iomgr_closure *closure) {
- return policy->vtable->notify_on_state_change(policy, state, closure);
+void grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy,
+ grpc_connectivity_state *state,
+ grpc_iomgr_closure *closure,
+ grpc_iomgr_call_list *call_list) {
+ policy->vtable->notify_on_state_change(policy, state, closure, call_list);
}
grpc_connectivity_state grpc_lb_policy_check_connectivity(
- grpc_lb_policy *policy) {
- return policy->vtable->check_connectivity(policy);
+ grpc_lb_policy *policy, grpc_iomgr_call_list *call_list) {
+ return policy->vtable->check_connectivity(policy, call_list);
}
diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h
index 7c9bdac648..8d7eb579b5 100644
--- a/src/core/client_config/lb_policy.h
+++ b/src/core/client_config/lb_policy.h
@@ -53,28 +53,31 @@ struct grpc_lb_policy {
struct grpc_lb_policy_vtable {
void (*destroy)(grpc_lb_policy *policy);
- void (*shutdown)(grpc_lb_policy *policy);
+ void (*shutdown)(grpc_lb_policy *policy, grpc_iomgr_call_list *call_list);
/** implement grpc_lb_policy_pick */
void (*pick)(grpc_lb_policy *policy, grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
- grpc_iomgr_closure *on_complete);
+ grpc_iomgr_closure *on_complete,
+ grpc_iomgr_call_list *call_list);
/** try to enter a READY connectivity state */
- void (*exit_idle)(grpc_lb_policy *policy);
+ void (*exit_idle)(grpc_lb_policy *policy, grpc_iomgr_call_list *call_list);
/** broadcast a transport op to all subchannels */
- void (*broadcast)(grpc_lb_policy *policy, grpc_transport_op *op);
+ void (*broadcast)(grpc_lb_policy *policy, grpc_transport_op *op,
+ grpc_iomgr_call_list *call_list);
/** check the current connectivity of the lb_policy */
- grpc_connectivity_state (*check_connectivity)(grpc_lb_policy *policy);
+ grpc_connectivity_state (*check_connectivity)(
+ grpc_lb_policy *policy, grpc_iomgr_call_list *call_list);
/** call notify when the connectivity state of a channel changes from *state.
Updates *state with the new state of the policy */
- grpc_connectivity_state_notify_on_state_change_result (
- *notify_on_state_change)(grpc_lb_policy *policy,
- grpc_connectivity_state *state,
- grpc_iomgr_closure *closure);
+ void (*notify_on_state_change)(grpc_lb_policy *policy,
+ grpc_connectivity_state *state,
+ grpc_iomgr_closure *closure,
+ grpc_iomgr_call_list *call_list);
};
#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
@@ -98,7 +101,8 @@ void grpc_lb_policy_init(grpc_lb_policy *policy,
const grpc_lb_policy_vtable *vtable);
/** Start shutting down (fail any pending picks) */
-void grpc_lb_policy_shutdown(grpc_lb_policy *policy);
+void grpc_lb_policy_shutdown(grpc_lb_policy *policy,
+ grpc_iomgr_call_list *call_list);
/** Given initial metadata in \a initial_metadata, find an appropriate
target for this rpc, and 'return' it by calling \a on_complete after setting
@@ -107,18 +111,21 @@ void grpc_lb_policy_shutdown(grpc_lb_policy *policy);
void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata,
grpc_subchannel **target,
- grpc_iomgr_closure *on_complete);
+ grpc_iomgr_closure *on_complete,
+ grpc_iomgr_call_list *call_list);
-void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op);
+void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op,
+ grpc_iomgr_call_list *call_list);
-void grpc_lb_policy_exit_idle(grpc_lb_policy *policy);
+void grpc_lb_policy_exit_idle(grpc_lb_policy *policy,
+ grpc_iomgr_call_list *call_list);
-grpc_connectivity_state_notify_on_state_change_result
-grpc_lb_policy_notify_on_state_change(
- grpc_lb_policy *policy, grpc_connectivity_state *state,
- grpc_iomgr_closure *closure) GRPC_MUST_USE_RESULT;
+void grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy,
+ grpc_connectivity_state *state,
+ grpc_iomgr_closure *closure,
+ grpc_iomgr_call_list *call_list);
grpc_connectivity_state grpc_lb_policy_check_connectivity(
- grpc_lb_policy *policy);
+ grpc_lb_policy *policy, grpc_iomgr_call_list *call_list);
#endif /* GRPC_INTERNAL_CORE_CONFIG_LB_POLICY_H */
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 6fbf966475..b15acf826a 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -146,7 +146,8 @@ struct grpc_subchannel_call {
static grpc_subchannel_call *create_call(connection *con);
static void connectivity_state_changed_locked(grpc_subchannel *c,
- const char *reason);
+ const char *reason,
+ grpc_iomgr_call_list *call_list);
static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c);
static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
static void subchannel_connected(void *subchannel, int iomgr_success);
@@ -333,16 +334,18 @@ static void start_connect(grpc_subchannel *c) {
static void continue_creating_call(void *arg, int iomgr_success) {
waiting_for_connect *w4c = arg;
+ grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
grpc_subchannel_del_interested_party(w4c->subchannel, w4c->pollset);
grpc_subchannel_create_call(w4c->subchannel, w4c->pollset, w4c->target,
- w4c->notify);
+ w4c->notify, &call_list);
GRPC_SUBCHANNEL_UNREF(w4c->subchannel, "waiting_for_connect");
gpr_free(w4c);
}
void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset,
grpc_subchannel_call **target,
- grpc_iomgr_closure *notify) {
+ grpc_iomgr_closure *notify,
+ grpc_iomgr_call_list *call_list) {
connection *con;
gpr_mu_lock(&c->mu);
if (c->active != NULL) {
@@ -365,15 +368,12 @@ void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset,
c->waiting = w4c;
grpc_subchannel_add_interested_party(c, pollset);
if (!c->connecting) {
- grpc_connectivity_state_flusher f;
c->connecting = 1;
- connectivity_state_changed_locked(c, "create_call");
+ connectivity_state_changed_locked(c, "create_call", call_list);
/* released by connection */
SUBCHANNEL_REF_LOCKED(c, "connecting");
GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
- grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
gpr_mu_unlock(&c->mu);
- grpc_connectivity_state_end_flush(&f);
start_connect(c);
} else {
@@ -390,33 +390,26 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
return state;
}
-grpc_connectivity_state_notify_on_state_change_result
-grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
- grpc_connectivity_state *state,
- grpc_iomgr_closure *notify) {
+void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
+ grpc_connectivity_state *state,
+ grpc_iomgr_closure *notify,
+ grpc_iomgr_call_list *call_list) {
int do_connect = 0;
- grpc_connectivity_state_notify_on_state_change_result r;
gpr_mu_lock(&c->mu);
- r = grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state,
- notify);
- if (r.current_state_is_idle) {
- grpc_connectivity_state_flusher f;
+ if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state,
+ notify, call_list)) {
do_connect = 1;
c->connecting = 1;
/* released by connection */
SUBCHANNEL_REF_LOCKED(c, "connecting");
GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
- connectivity_state_changed_locked(c, "state_change");
- grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
- gpr_mu_unlock(&c->mu);
- grpc_connectivity_state_end_flush(&f);
- } else {
- gpr_mu_unlock(&c->mu);
+ connectivity_state_changed_locked(c, "state_change", call_list);
}
+ gpr_mu_unlock(&c->mu);
+
if (do_connect) {
start_connect(c);
}
- return r;
}
void grpc_subchannel_process_transport_op(grpc_subchannel *c,
@@ -424,24 +417,20 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c,
connection *con = NULL;
grpc_subchannel *destroy;
int cancel_alarm = 0;
+ grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
gpr_mu_lock(&c->mu);
if (c->active != NULL) {
con = c->active;
CONNECTION_REF_LOCKED(con, "transport-op");
}
if (op->disconnect) {
- grpc_connectivity_state_flusher f;
c->disconnected = 1;
- connectivity_state_changed_locked(c, "disconnect");
+ connectivity_state_changed_locked(c, "disconnect", &call_list);
if (c->have_alarm) {
cancel_alarm = 1;
}
- grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
- gpr_mu_unlock(&c->mu);
- grpc_connectivity_state_end_flush(&f);
- } else {
- gpr_mu_unlock(&c->mu);
}
+ gpr_mu_unlock(&c->mu);
if (con != NULL) {
grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
@@ -464,6 +453,8 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c,
if (op->disconnect) {
grpc_connector_shutdown(c->connector);
}
+
+ grpc_iomgr_call_list_run(call_list);
}
static void on_state_changed(void *p, int iomgr_success) {
@@ -474,7 +465,7 @@ static void on_state_changed(void *p, int iomgr_success) {
grpc_transport_op op;
grpc_channel_element *elem;
connection *destroy_connection = NULL;
- grpc_connectivity_state_flusher f;
+ grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
gpr_mu_lock(mu);
@@ -508,26 +499,26 @@ static void on_state_changed(void *p, int iomgr_success) {
grpc_connectivity_state_set(
&c->state_tracker, c->disconnected ? GRPC_CHANNEL_FATAL_FAILURE
: GRPC_CHANNEL_TRANSIENT_FAILURE,
- "connection_failed");
+ "connection_failed", &call_list);
break;
}
done:
- connectivity_state_changed_locked(c, "transport_state_changed");
+ connectivity_state_changed_locked(c, "transport_state_changed", &call_list);
destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher");
gpr_free(sw);
- grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
gpr_mu_unlock(mu);
- grpc_connectivity_state_end_flush(&f);
if (destroy) {
subchannel_destroy(c);
}
if (destroy_connection != NULL) {
connection_destroy(destroy_connection);
}
+ grpc_iomgr_call_list_run(call_list);
}
-static void publish_transport(grpc_subchannel *c) {
+static void publish_transport(grpc_subchannel *c,
+ grpc_iomgr_call_list *call_list) {
size_t channel_stack_size;
connection *con;
grpc_channel_stack *stk;
@@ -538,7 +529,6 @@ static void publish_transport(grpc_subchannel *c) {
state_watcher *sw;
connection *destroy_connection = NULL;
grpc_channel_element *elem;
- grpc_connectivity_state_flusher f;
/* build final filter list */
num_filters = c->num_filters + c->connecting_result.num_filters + 1;
@@ -601,17 +591,15 @@ static void publish_transport(grpc_subchannel *c) {
elem->filter->start_transport_op(elem, &op);
/* signal completion */
- connectivity_state_changed_locked(c, "connected");
+ connectivity_state_changed_locked(c, "connected", call_list);
w4c = c->waiting;
c->waiting = NULL;
- grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
gpr_mu_unlock(&c->mu);
- grpc_connectivity_state_end_flush(&f);
while (w4c != NULL) {
waiting_for_connect *next = w4c;
- w4c->continuation.cb(w4c->continuation.cb_arg, 1);
+ grpc_iomgr_call_list_add(call_list, &w4c->continuation, 1);
w4c = next;
}
@@ -653,16 +641,14 @@ static void update_reconnect_parameters(grpc_subchannel *c) {
static void on_alarm(void *arg, int iomgr_success) {
grpc_subchannel *c = arg;
- grpc_connectivity_state_flusher f;
+ grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
gpr_mu_lock(&c->mu);
c->have_alarm = 0;
if (c->disconnected) {
iomgr_success = 0;
}
- connectivity_state_changed_locked(c, "alarm");
- grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
+ connectivity_state_changed_locked(c, "alarm", &call_list);
gpr_mu_unlock(&c->mu);
- grpc_connectivity_state_end_flush(&f);
if (iomgr_success) {
update_reconnect_parameters(c);
continue_connect(c);
@@ -670,24 +656,24 @@ static void on_alarm(void *arg, int iomgr_success) {
GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting");
GRPC_SUBCHANNEL_UNREF(c, "connecting");
}
+ grpc_iomgr_call_list_run(call_list);
}
static void subchannel_connected(void *arg, int iomgr_success) {
grpc_subchannel *c = arg;
+ grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
if (c->connecting_result.transport != NULL) {
- publish_transport(c);
+ publish_transport(c, &call_list);
} else {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
- grpc_connectivity_state_flusher f;
gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->have_alarm);
c->have_alarm = 1;
- connectivity_state_changed_locked(c, "connect_failed");
+ connectivity_state_changed_locked(c, "connect_failed", &call_list);
grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, now);
- grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
gpr_mu_unlock(&c->mu);
- grpc_connectivity_state_end_flush(&f);
}
+ grpc_iomgr_call_list_run(call_list);
}
static gpr_timespec compute_connect_deadline(grpc_subchannel *c) {
@@ -718,9 +704,10 @@ static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) {
}
static void connectivity_state_changed_locked(grpc_subchannel *c,
- const char *reason) {
+ const char *reason,
+ grpc_iomgr_call_list *call_list) {
grpc_connectivity_state current = compute_connectivity_locked(c);
- grpc_connectivity_state_set(&c->state_tracker, current, reason);
+ grpc_connectivity_state_set(&c->state_tracker, current, reason, call_list);
}
/*
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h
index 391df36bfd..7c00ff172d 100644
--- a/src/core/client_config/subchannel.h
+++ b/src/core/client_config/subchannel.h
@@ -76,7 +76,8 @@ void grpc_subchannel_call_unref(
void grpc_subchannel_create_call(grpc_subchannel *subchannel,
grpc_pollset *pollset,
grpc_subchannel_call **target,
- grpc_iomgr_closure *notify);
+ grpc_iomgr_closure *notify,
+ grpc_iomgr_call_list *call_list);
/** process a transport level op */
void grpc_subchannel_process_transport_op(grpc_subchannel *subchannel,
@@ -88,10 +89,10 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(
/** call notify when the connectivity state of a channel changes from *state.
Updates *state with the new state of the channel */
-grpc_connectivity_state_notify_on_state_change_result
-grpc_subchannel_notify_on_state_change(
- grpc_subchannel *channel, grpc_connectivity_state *state,
- grpc_iomgr_closure *notify) GRPC_MUST_USE_RESULT;
+void grpc_subchannel_notify_on_state_change(grpc_subchannel *channel,
+ grpc_connectivity_state *state,
+ grpc_iomgr_closure *notify,
+ grpc_iomgr_call_list *call_list);
/** express interest in \a channel's activities through \a pollset. */
void grpc_subchannel_add_interested_party(grpc_subchannel *channel,
diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c
index 67eff3e528..ba8f73fe08 100644
--- a/src/core/iomgr/iomgr.c
+++ b/src/core/iomgr/iomgr.c
@@ -157,3 +157,42 @@ void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb,
closure->cb_arg = cb_arg;
closure->next = NULL;
}
+
+void grpc_iomgr_call_list_add(grpc_iomgr_call_list *call_list,
+ grpc_iomgr_closure *closure, int success) {
+ if (!closure) return;
+ closure->next = NULL;
+ closure->success = success;
+ if (!call_list->head) {
+ call_list->head = closure;
+ } else {
+ call_list->tail->next = closure;
+ }
+ call_list->tail = closure;
+}
+
+void grpc_iomgr_call_list_run(grpc_iomgr_call_list call_list) {
+ grpc_iomgr_closure *c = call_list.head;
+ while (c) {
+ grpc_iomgr_closure *next = c->next;
+ c->cb(c->cb_arg, c->success);
+ c = next;
+ }
+}
+
+int grpc_iomgr_call_list_empty(grpc_iomgr_call_list call_list) {
+ return call_list.head == NULL;
+}
+
+void grpc_iomgr_call_list_move(grpc_iomgr_call_list *src,
+ grpc_iomgr_call_list *dst) {
+ if (dst->head == NULL) {
+ *dst = *src;
+ return;
+ }
+ if (src->head == NULL) {
+ return;
+ }
+ dst->tail->next = src->head;
+ dst->tail = src->tail;
+}
diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h
index f1d2e6439d..58c31763d2 100644
--- a/src/core/iomgr/iomgr.h
+++ b/src/core/iomgr/iomgr.h
@@ -58,10 +58,25 @@ typedef struct grpc_iomgr_closure {
struct grpc_iomgr_closure *next;
} grpc_iomgr_closure;
+typedef struct grpc_iomgr_call_list {
+ grpc_iomgr_closure *head;
+ grpc_iomgr_closure *tail;
+} grpc_iomgr_call_list;
+
/** Initializes \a closure with \a cb and \a cb_arg. */
void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb,
void *cb_arg);
+#define GRPC_IOMGR_CALL_LIST_INIT \
+ { NULL, NULL }
+
+void grpc_iomgr_call_list_add(grpc_iomgr_call_list *list,
+ grpc_iomgr_closure *closure, int success);
+void grpc_iomgr_call_list_run(grpc_iomgr_call_list list);
+void grpc_iomgr_call_list_move(grpc_iomgr_call_list *src,
+ grpc_iomgr_call_list *dst);
+int grpc_iomgr_call_list_empty(grpc_iomgr_call_list list);
+
/** Initializes the iomgr. */
void grpc_iomgr_init(void);
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index 12f34c1a19..253f3190f1 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -127,7 +127,7 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
GRPC_FD_REF(fd, "delayed_add");
grpc_iomgr_closure_init(&da->closure, perform_delayed_add, da);
pollset->in_flight_cbs++;
- grpc_workqueue_push(fd->workqueue, &da->closure, 1);
+ grpc_pollset_add_unlock_job(pollset, &da->closure);
}
}
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index 291181d9a5..107ffb0b5e 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -136,6 +136,8 @@ void grpc_pollset_init(grpc_pollset *pollset) {
pollset->in_flight_cbs = 0;
pollset->shutting_down = 0;
pollset->called_shutdown = 0;
+ pollset->idle_jobs = NULL;
+ pollset->unlock_jobs = NULL;
become_basic_pollset(pollset, NULL);
}
@@ -145,7 +147,6 @@ void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
}
gpr_mu_lock(&pollset->mu);
pollset->vtable->add_fd(pollset, fd, 1);
- grpc_workqueue_flush(fd->workqueue, 0);
/* the following (enabled only in debug) will reacquire and then release
our lock - meaning that if the unlocking flag passed to del_fd above is
not respected, the code will deadlock (in a way that we have a chance of
@@ -174,6 +175,33 @@ static void finish_shutdown(grpc_pollset *pollset) {
pollset->shutdown_done_cb(pollset->shutdown_done_arg);
}
+static void run_jobs(grpc_pollset *pollset, grpc_iomgr_closure **root) {
+ grpc_iomgr_closure *exec = *root;
+ *root = NULL;
+ gpr_mu_unlock(&pollset->mu);
+ while (exec != NULL) {
+ grpc_iomgr_closure *next = exec->next;
+ exec->cb(exec->cb_arg, 1);
+ exec = next;
+ }
+ gpr_mu_lock(&pollset->mu);
+}
+
+static void add_job(grpc_iomgr_closure **root, grpc_iomgr_closure *closure) {
+ closure->next = *root;
+ *root = closure;
+}
+
+void grpc_pollset_add_idle_job(grpc_pollset *pollset,
+ grpc_iomgr_closure *closure) {
+ add_job(&pollset->idle_jobs, closure);
+}
+
+void grpc_pollset_add_unlock_job(grpc_pollset *pollset,
+ grpc_iomgr_closure *closure) {
+ add_job(&pollset->unlock_jobs, closure);
+}
+
void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_timespec now, gpr_timespec deadline) {
/* pollset->mu already held */
@@ -182,6 +210,14 @@ void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
worker->next = worker->prev = NULL;
/* TODO(ctiller): pool these */
grpc_wakeup_fd_init(&worker->wakeup_fd);
+ if (!grpc_pollset_has_workers(pollset) && pollset->idle_jobs != NULL) {
+ run_jobs(pollset, &pollset->idle_jobs);
+ goto done;
+ }
+ if (pollset->unlock_jobs != NULL) {
+ run_jobs(pollset, &pollset->unlock_jobs);
+ goto done;
+ }
if (grpc_alarm_check(&pollset->mu, now, &deadline)) {
goto done;
}
@@ -301,12 +337,7 @@ static void basic_do_promote(void *args, int success) {
gpr_mu_lock(&pollset->mu);
/* First we need to ensure that nobody is polling concurrently */
- if (grpc_pollset_has_workers(pollset)) {
- grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
- grpc_workqueue_push(fd->workqueue, &up_args->promotion_closure, 1);
- gpr_mu_unlock(&pollset->mu);
- return;
- }
+ GPR_ASSERT(!grpc_pollset_has_workers(pollset));
gpr_free(up_args);
/* At this point the pollset may no longer be a unary poller. In that case
@@ -390,13 +421,12 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd,
GRPC_FD_REF(fd, "basicpoll_add");
pollset->in_flight_cbs++;
up_args = gpr_malloc(sizeof(*up_args));
- up_args->pollset = pollset;
up_args->fd = fd;
up_args->original_vtable = pollset->vtable;
up_args->promotion_closure.cb = basic_do_promote;
up_args->promotion_closure.cb_arg = up_args;
- grpc_workqueue_push(fd->workqueue, &up_args->promotion_closure, 1);
+ grpc_pollset_add_idle_job(pollset, &up_args->promotion_closure);
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
exit:
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
index 69bd9cca8c..3f89095d40 100644
--- a/src/core/iomgr/pollset_posix.h
+++ b/src/core/iomgr/pollset_posix.h
@@ -37,6 +37,7 @@
#include <poll.h>
#include <grpc/support/sync.h>
+#include "src/core/iomgr/iomgr.h"
#include "src/core/iomgr/wakeup_fd_posix.h"
typedef struct grpc_pollset_vtable grpc_pollset_vtable;
@@ -66,6 +67,8 @@ typedef struct grpc_pollset {
int kicked_without_pollers;
void (*shutdown_done_cb)(void *arg);
void *shutdown_done_arg;
+ grpc_iomgr_closure *unlock_jobs;
+ grpc_iomgr_closure *idle_jobs;
union {
int fd;
void *ptr;
@@ -124,4 +127,11 @@ int grpc_pollset_has_workers(grpc_pollset *pollset);
typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int);
extern grpc_poll_function_type grpc_poll_function;
+/** schedule a closure to be run next time there are no active workers */
+void grpc_pollset_add_idle_job(grpc_pollset *pollset,
+ grpc_iomgr_closure *closure);
+/** schedule a closure to be run next time the pollset is unlocked */
+void grpc_pollset_add_unlock_job(grpc_pollset *pollset,
+ grpc_iomgr_closure *closure);
+
#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 4ec6aba7f4..6f49060be5 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -77,7 +77,6 @@ struct grpc_channel {
gpr_mu registered_call_mu;
registered_call *registered_calls;
- grpc_iomgr_closure destroy_closure;
char *target;
grpc_workqueue *workqueue;
};
@@ -273,8 +272,7 @@ void grpc_channel_internal_ref(grpc_channel *c) {
gpr_ref(&c->refs);
}
-static void destroy_channel(void *p, int ok) {
- grpc_channel *channel = p;
+static void destroy_channel(grpc_channel *channel) {
size_t i;
grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CHANNEL(channel));
for (i = 0; i < NUM_CACHED_STATUS_ELEMS; i++) {
@@ -312,9 +310,7 @@ void grpc_channel_internal_unref(grpc_channel *channel, const char *reason) {
void grpc_channel_internal_unref(grpc_channel *channel) {
#endif
if (gpr_unref(&channel->refs)) {
- channel->destroy_closure.cb = destroy_channel;
- channel->destroy_closure.cb_arg = channel;
- grpc_workqueue_push(channel->workqueue, &channel->destroy_closure, 1);
+ destroy_channel(channel);
}
}
diff --git a/src/core/surface/channel_connectivity.c b/src/core/surface/channel_connectivity.c
index 12b15f353f..ef47a28fda 100644
--- a/src/core/surface/channel_connectivity.c
+++ b/src/core/surface/channel_connectivity.c
@@ -45,6 +45,8 @@ grpc_connectivity_state grpc_channel_check_connectivity_state(
/* forward through to the underlying client channel */
grpc_channel_element *client_channel_elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
+ grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
+ grpc_connectivity_state state;
if (client_channel_elem->filter != &grpc_client_channel_filter) {
gpr_log(GPR_ERROR,
"grpc_channel_check_connectivity_state called on something that is "
@@ -52,8 +54,10 @@ grpc_connectivity_state grpc_channel_check_connectivity_state(
client_channel_elem->filter->name);
return GRPC_CHANNEL_FATAL_FAILURE;
}
- return grpc_client_channel_check_connectivity_state(client_channel_elem,
- try_to_connect);
+ state = grpc_client_channel_check_connectivity_state(
+ client_channel_elem, try_to_connect, &call_list);
+ grpc_iomgr_call_list_run(call_list);
+ return state;
}
typedef enum {
@@ -154,6 +158,7 @@ void grpc_channel_watch_connectivity_state(
gpr_timespec deadline, grpc_completion_queue *cq, void *tag) {
grpc_channel_element *client_channel_elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
+ grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
state_watcher *w = gpr_malloc(sizeof(*w));
grpc_cq_begin_op(cq);
@@ -176,13 +181,14 @@ void grpc_channel_watch_connectivity_state(
"grpc_channel_watch_connectivity_state called on something that is "
"not a client channel, but '%s'",
client_channel_elem->filter->name);
- grpc_workqueue_push(grpc_channel_get_workqueue(channel), &w->on_complete,
- 1);
+ grpc_iomgr_call_list_add(&call_list, &w->on_complete, 1);
} else {
GRPC_CHANNEL_INTERNAL_REF(channel, "watch_connectivity");
grpc_client_channel_add_interested_party(client_channel_elem,
grpc_cq_pollset(cq));
grpc_client_channel_watch_connectivity_state(client_channel_elem, &w->state,
- &w->on_complete);
+ &w->on_complete, &call_list);
}
+
+ grpc_iomgr_call_list_run(call_list);
}
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index c8c1abb750..9be1f2f7cf 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -164,8 +164,7 @@ typedef struct {
/** data to write next write */
gpr_slice_buffer qbuf;
/** queued callbacks */
- grpc_iomgr_closure *pending_closures_head;
- grpc_iomgr_closure *pending_closures_tail;
+ grpc_iomgr_call_list run_at_unlock;
/** window available for us to send to peer */
gpr_int64 outgoing_window;
@@ -568,11 +567,6 @@ int grpc_chttp2_list_pop_read_write_state_changed(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global);
-/** schedule a closure to run without the transport lock taken */
-void grpc_chttp2_schedule_closure(
- grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure,
- int success);
-
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(
grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id);
grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index c015e82931..1da3f85bde 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -238,8 +238,8 @@ void grpc_chttp2_cleanup_writing(
stream_global->outgoing_sopb->nops == 0) {
GPR_ASSERT(stream_global->write_state != GRPC_WRITE_STATE_QUEUED_CLOSE);
stream_global->outgoing_sopb = NULL;
- grpc_chttp2_schedule_closure(transport_global,
- stream_global->send_done_closure, 1);
+ grpc_iomgr_call_list_add(&transport_global->run_at_unlock,
+ stream_global->send_done_closure, 1);
}
}
stream_global->writing_now = 0;
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 6376c397a2..50d5a3e8a8 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -499,32 +499,20 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); }
static void unlock(grpc_chttp2_transport *t) {
- grpc_iomgr_closure *run_closures;
- grpc_connectivity_state_flusher f;
+ grpc_iomgr_call_list run = GRPC_IOMGR_CALL_LIST_INIT;
unlock_check_read_write_state(t);
if (!t->writing_active && !t->closed &&
grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) {
t->writing_active = 1;
REF_TRANSPORT(t, "writing");
- grpc_chttp2_schedule_closure(&t->global, &t->writing_action, 1);
+ grpc_iomgr_call_list_add(&t->global.run_at_unlock, &t->writing_action, 1);
prevent_endpoint_shutdown(t);
}
- run_closures = t->global.pending_closures_head;
- t->global.pending_closures_head = NULL;
- t->global.pending_closures_tail = NULL;
-
- grpc_connectivity_state_begin_flush(&t->channel_callback.state_tracker, &f);
+ GPR_SWAP(grpc_iomgr_call_list, run, t->global.run_at_unlock);
gpr_mu_unlock(&t->mu);
-
- grpc_connectivity_state_end_flush(&f);
-
- while (run_closures) {
- grpc_iomgr_closure *next = run_closures->next;
- run_closures->cb(run_closures->cb_arg, run_closures->success);
- run_closures = next;
- }
+ grpc_iomgr_call_list_run(run);
}
/*
@@ -676,8 +664,8 @@ static void perform_stream_op_locked(
}
} else {
grpc_sopb_reset(op->send_ops);
- grpc_chttp2_schedule_closure(transport_global,
- stream_global->send_done_closure, 0);
+ grpc_iomgr_call_list_add(&transport_global->run_at_unlock,
+ stream_global->send_done_closure, 0);
}
}
@@ -715,9 +703,8 @@ static void perform_stream_op_locked(
op->bind_pollset);
}
- if (op->on_consumed) {
- grpc_chttp2_schedule_closure(transport_global, op->on_consumed, 1);
- }
+ grpc_iomgr_call_list_add(&transport_global->run_at_unlock, op->on_consumed,
+ 1);
}
static void perform_stream_op(grpc_transport *gt, grpc_stream *gs,
@@ -754,18 +741,12 @@ static void perform_transport_op(grpc_transport *gt, grpc_transport_op *op) {
lock(t);
- if (op->on_consumed) {
- grpc_chttp2_schedule_closure(&t->global, op->on_consumed, 1);
- }
+ grpc_iomgr_call_list_add(&t->global.run_at_unlock, op->on_consumed, 1);
if (op->on_connectivity_state_change) {
- if (grpc_connectivity_state_notify_on_state_change(
- &t->channel_callback.state_tracker, op->connectivity_state,
- op->on_connectivity_state_change)
- .state_already_changed) {
- grpc_chttp2_schedule_closure(&t->global, op->on_connectivity_state_change,
- 1);
- }
+ grpc_connectivity_state_notify_on_state_change(
+ &t->channel_callback.state_tracker, op->connectivity_state,
+ op->on_connectivity_state_change, &t->global.run_at_unlock);
}
if (op->send_goaway) {
@@ -887,8 +868,8 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) {
if (stream_global->outgoing_sopb != NULL) {
grpc_sopb_reset(stream_global->outgoing_sopb);
stream_global->outgoing_sopb = NULL;
- grpc_chttp2_schedule_closure(transport_global,
- stream_global->send_done_closure, 1);
+ grpc_iomgr_call_list_add(&transport_global->run_at_unlock,
+ stream_global->send_done_closure, 1);
}
stream_global->read_closed = 1;
if (!stream_global->published_cancelled) {
@@ -938,8 +919,8 @@ static void unlock_check_read_write_state(grpc_chttp2_transport *t) {
&stream_global->outstanding_metadata);
grpc_sopb_swap(stream_global->publish_sopb, &stream_global->incoming_sopb);
stream_global->published_state = *stream_global->publish_state = state;
- grpc_chttp2_schedule_closure(transport_global,
- stream_global->recv_done_closure, 1);
+ grpc_iomgr_call_list_add(&transport_global->run_at_unlock,
+ stream_global->recv_done_closure, 1);
stream_global->recv_done_closure = NULL;
stream_global->publish_sopb = NULL;
stream_global->publish_state = NULL;
@@ -1200,21 +1181,7 @@ static void connectivity_state_set(
gpr_log(GPR_DEBUG, "set connectivity_state=%d", state));
grpc_connectivity_state_set(
&TRANSPORT_FROM_GLOBAL(transport_global)->channel_callback.state_tracker,
- state, reason);
-}
-
-void grpc_chttp2_schedule_closure(
- grpc_chttp2_transport_global *transport_global, grpc_iomgr_closure *closure,
- int success) {
- closure->success = success;
- if (transport_global->pending_closures_tail == NULL) {
- transport_global->pending_closures_head =
- transport_global->pending_closures_tail = closure;
- } else {
- transport_global->pending_closures_tail->next = closure;
- transport_global->pending_closures_tail = closure;
- }
- closure->next = NULL;
+ state, reason, &transport_global->run_at_unlock);
}
/*
diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c
index 034f82474f..b605da9dbf 100644
--- a/src/core/transport/connectivity_state.c
+++ b/src/core/transport/connectivity_state.c
@@ -64,7 +64,6 @@ void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
tracker->current_state = init_state;
tracker->watchers = NULL;
tracker->name = gpr_strdup(name);
- tracker->changed = 0;
}
void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) {
@@ -90,20 +89,17 @@ grpc_connectivity_state grpc_connectivity_state_check(
return tracker->current_state;
}
-grpc_connectivity_state_notify_on_state_change_result
-grpc_connectivity_state_notify_on_state_change(
+int grpc_connectivity_state_notify_on_state_change(
grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current,
- grpc_iomgr_closure *notify) {
- grpc_connectivity_state_notify_on_state_change_result result;
- memset(&result, 0, sizeof(result));
+ grpc_iomgr_closure *notify, grpc_iomgr_call_list *call_list) {
if (grpc_connectivity_state_trace) {
- gpr_log(GPR_DEBUG, "CONWATCH: %s: from %s [cur=%s]", tracker->name,
- grpc_connectivity_state_name(*current),
- grpc_connectivity_state_name(tracker->current_state));
+ gpr_log(GPR_DEBUG, "CONWATCH: %s: from %s [cur=%s] notify=%p",
+ tracker->name, grpc_connectivity_state_name(*current),
+ grpc_connectivity_state_name(tracker->current_state), notify);
}
if (tracker->current_state != *current) {
*current = tracker->current_state;
- result.state_already_changed = 1;
+ grpc_iomgr_call_list_add(call_list, notify, 1);
} else {
grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w));
w->current = current;
@@ -111,13 +107,14 @@ grpc_connectivity_state_notify_on_state_change(
w->next = tracker->watchers;
tracker->watchers = w;
}
- result.current_state_is_idle = tracker->current_state == GRPC_CHANNEL_IDLE;
- return result;
+ return tracker->current_state == GRPC_CHANNEL_IDLE;
}
void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state state,
- const char *reason) {
+ const char *reason,
+ grpc_iomgr_call_list *call_list) {
+ grpc_connectivity_state_watcher *w;
if (grpc_connectivity_state_trace) {
gpr_log(GPR_DEBUG, "SET: %s: %s --> %s [%s]", tracker->name,
grpc_connectivity_state_name(tracker->current_state),
@@ -128,40 +125,11 @@ void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
}
GPR_ASSERT(tracker->current_state != GRPC_CHANNEL_FATAL_FAILURE);
tracker->current_state = state;
- tracker->changed = 1;
-}
-
-void grpc_connectivity_state_begin_flush(
- grpc_connectivity_state_tracker *tracker,
- grpc_connectivity_state_flusher *flusher) {
- grpc_connectivity_state_watcher *w;
- flusher->cbs = NULL;
- if (!tracker->changed) return;
- w = tracker->watchers;
- tracker->watchers = NULL;
- while (w != NULL) {
- grpc_connectivity_state_watcher *next = w->next;
- if (tracker->current_state != *w->current) {
- *w->current = tracker->current_state;
- w->notify->next = flusher->cbs;
- flusher->cbs = w->notify;
- gpr_free(w);
- } else {
- w->next = tracker->watchers;
- tracker->watchers = w;
- }
- w = next;
- }
- tracker->changed = 0;
-}
-
-void grpc_connectivity_state_end_flush(
- grpc_connectivity_state_flusher *flusher) {
- grpc_iomgr_closure *c = flusher->cbs;
- while (c != NULL) {
- grpc_iomgr_closure *next = c;
- c->cb(c->cb_arg, 1);
- c = next;
+ while ((w = tracker->watchers) != NULL) {
+ *w->current = tracker->current_state;
+ tracker->watchers = w->next;
+ grpc_iomgr_call_list_add(call_list, w->notify, 1);
+ gpr_free(w);
}
}
diff --git a/src/core/transport/connectivity_state.h b/src/core/transport/connectivity_state.h
index 323af2740c..49c2815266 100644
--- a/src/core/transport/connectivity_state.h
+++ b/src/core/transport/connectivity_state.h
@@ -54,12 +54,8 @@ typedef struct {
grpc_connectivity_state_watcher *watchers;
/** a name to help debugging */
char *name;
- /** has this state been changed since the last flush? */
- int changed;
} grpc_connectivity_state_tracker;
-typedef struct { grpc_iomgr_closure *cbs; } grpc_connectivity_state_flusher;
-
extern int grpc_connectivity_state_trace;
void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
@@ -71,35 +67,15 @@ void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker);
* external lock */
void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state state,
- const char *reason);
-
-/** Begin flushing callbacks; not thread safe; access must be serialized using
- * the same external lock as grpc_connectivity_state_set. Initializes flusher.
- */
-void grpc_connectivity_state_begin_flush(
- grpc_connectivity_state_tracker *tracker,
- grpc_connectivity_state_flusher *flusher);
-
-/** Complete flushing updates: must not be called with any locks held */
-void grpc_connectivity_state_end_flush(
- grpc_connectivity_state_flusher *flusher);
+ const char *reason,
+ grpc_iomgr_call_list *call_list);
grpc_connectivity_state grpc_connectivity_state_check(
grpc_connectivity_state_tracker *tracker);
-typedef struct {
- /** 1 if the current state is idle (a hint to begin connecting), 0 otherwise
- */
- int current_state_is_idle;
- /** 1 if the state has already changed: in this case the closure passed to
- * grpc_connectivity_state_notify_on_state_change will not be called */
- int state_already_changed;
-} grpc_connectivity_state_notify_on_state_change_result;
-
/** Return 1 if the channel should start connecting, 0 otherwise */
-grpc_connectivity_state_notify_on_state_change_result
-grpc_connectivity_state_notify_on_state_change(
+int grpc_connectivity_state_notify_on_state_change(
grpc_connectivity_state_tracker *tracker, grpc_connectivity_state *current,
- grpc_iomgr_closure *notify) GRPC_MUST_USE_RESULT;
+ grpc_iomgr_closure *notify, grpc_iomgr_call_list *call_list);
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CONNECTIVITY_STATE_H */