aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/client_config/subchannel.c
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-09-18 07:20:29 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-09-18 07:20:29 -0700
commit000cd8f9f7346defc79fe6aa877af11b42ab5f1e (patch)
tree883d73a97471f63e616d02c1e17efc62b099c8ad /src/core/client_config/subchannel.c
parent38adec97e875c21cd9d6cc9d039664bdf4fdb889 (diff)
Introduce call lists for moving work outside locks
Diffstat (limited to 'src/core/client_config/subchannel.c')
-rw-r--r--src/core/client_config/subchannel.c91
1 files changed, 39 insertions, 52 deletions
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 6fbf966475..b15acf826a 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -146,7 +146,8 @@ struct grpc_subchannel_call {
static grpc_subchannel_call *create_call(connection *con);
static void connectivity_state_changed_locked(grpc_subchannel *c,
- const char *reason);
+ const char *reason,
+ grpc_iomgr_call_list *call_list);
static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c);
static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
static void subchannel_connected(void *subchannel, int iomgr_success);
@@ -333,16 +334,18 @@ static void start_connect(grpc_subchannel *c) {
static void continue_creating_call(void *arg, int iomgr_success) {
waiting_for_connect *w4c = arg;
+ grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
grpc_subchannel_del_interested_party(w4c->subchannel, w4c->pollset);
grpc_subchannel_create_call(w4c->subchannel, w4c->pollset, w4c->target,
- w4c->notify);
+ w4c->notify, &call_list);
GRPC_SUBCHANNEL_UNREF(w4c->subchannel, "waiting_for_connect");
gpr_free(w4c);
}
void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset,
grpc_subchannel_call **target,
- grpc_iomgr_closure *notify) {
+ grpc_iomgr_closure *notify,
+ grpc_iomgr_call_list *call_list) {
connection *con;
gpr_mu_lock(&c->mu);
if (c->active != NULL) {
@@ -365,15 +368,12 @@ void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset,
c->waiting = w4c;
grpc_subchannel_add_interested_party(c, pollset);
if (!c->connecting) {
- grpc_connectivity_state_flusher f;
c->connecting = 1;
- connectivity_state_changed_locked(c, "create_call");
+ connectivity_state_changed_locked(c, "create_call", call_list);
/* released by connection */
SUBCHANNEL_REF_LOCKED(c, "connecting");
GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
- grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
gpr_mu_unlock(&c->mu);
- grpc_connectivity_state_end_flush(&f);
start_connect(c);
} else {
@@ -390,33 +390,26 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
return state;
}
-grpc_connectivity_state_notify_on_state_change_result
-grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
- grpc_connectivity_state *state,
- grpc_iomgr_closure *notify) {
+void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
+ grpc_connectivity_state *state,
+ grpc_iomgr_closure *notify,
+ grpc_iomgr_call_list *call_list) {
int do_connect = 0;
- grpc_connectivity_state_notify_on_state_change_result r;
gpr_mu_lock(&c->mu);
- r = grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state,
- notify);
- if (r.current_state_is_idle) {
- grpc_connectivity_state_flusher f;
+ if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state,
+ notify, call_list)) {
do_connect = 1;
c->connecting = 1;
/* released by connection */
SUBCHANNEL_REF_LOCKED(c, "connecting");
GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
- connectivity_state_changed_locked(c, "state_change");
- grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
- gpr_mu_unlock(&c->mu);
- grpc_connectivity_state_end_flush(&f);
- } else {
- gpr_mu_unlock(&c->mu);
+ connectivity_state_changed_locked(c, "state_change", call_list);
}
+ gpr_mu_unlock(&c->mu);
+
if (do_connect) {
start_connect(c);
}
- return r;
}
void grpc_subchannel_process_transport_op(grpc_subchannel *c,
@@ -424,24 +417,20 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c,
connection *con = NULL;
grpc_subchannel *destroy;
int cancel_alarm = 0;
+ grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
gpr_mu_lock(&c->mu);
if (c->active != NULL) {
con = c->active;
CONNECTION_REF_LOCKED(con, "transport-op");
}
if (op->disconnect) {
- grpc_connectivity_state_flusher f;
c->disconnected = 1;
- connectivity_state_changed_locked(c, "disconnect");
+ connectivity_state_changed_locked(c, "disconnect", &call_list);
if (c->have_alarm) {
cancel_alarm = 1;
}
- grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
- gpr_mu_unlock(&c->mu);
- grpc_connectivity_state_end_flush(&f);
- } else {
- gpr_mu_unlock(&c->mu);
}
+ gpr_mu_unlock(&c->mu);
if (con != NULL) {
grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
@@ -464,6 +453,8 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c,
if (op->disconnect) {
grpc_connector_shutdown(c->connector);
}
+
+ grpc_iomgr_call_list_run(call_list);
}
static void on_state_changed(void *p, int iomgr_success) {
@@ -474,7 +465,7 @@ static void on_state_changed(void *p, int iomgr_success) {
grpc_transport_op op;
grpc_channel_element *elem;
connection *destroy_connection = NULL;
- grpc_connectivity_state_flusher f;
+ grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
gpr_mu_lock(mu);
@@ -508,26 +499,26 @@ static void on_state_changed(void *p, int iomgr_success) {
grpc_connectivity_state_set(
&c->state_tracker, c->disconnected ? GRPC_CHANNEL_FATAL_FAILURE
: GRPC_CHANNEL_TRANSIENT_FAILURE,
- "connection_failed");
+ "connection_failed", &call_list);
break;
}
done:
- connectivity_state_changed_locked(c, "transport_state_changed");
+ connectivity_state_changed_locked(c, "transport_state_changed", &call_list);
destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher");
gpr_free(sw);
- grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
gpr_mu_unlock(mu);
- grpc_connectivity_state_end_flush(&f);
if (destroy) {
subchannel_destroy(c);
}
if (destroy_connection != NULL) {
connection_destroy(destroy_connection);
}
+ grpc_iomgr_call_list_run(call_list);
}
-static void publish_transport(grpc_subchannel *c) {
+static void publish_transport(grpc_subchannel *c,
+ grpc_iomgr_call_list *call_list) {
size_t channel_stack_size;
connection *con;
grpc_channel_stack *stk;
@@ -538,7 +529,6 @@ static void publish_transport(grpc_subchannel *c) {
state_watcher *sw;
connection *destroy_connection = NULL;
grpc_channel_element *elem;
- grpc_connectivity_state_flusher f;
/* build final filter list */
num_filters = c->num_filters + c->connecting_result.num_filters + 1;
@@ -601,17 +591,15 @@ static void publish_transport(grpc_subchannel *c) {
elem->filter->start_transport_op(elem, &op);
/* signal completion */
- connectivity_state_changed_locked(c, "connected");
+ connectivity_state_changed_locked(c, "connected", call_list);
w4c = c->waiting;
c->waiting = NULL;
- grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
gpr_mu_unlock(&c->mu);
- grpc_connectivity_state_end_flush(&f);
while (w4c != NULL) {
waiting_for_connect *next = w4c;
- w4c->continuation.cb(w4c->continuation.cb_arg, 1);
+ grpc_iomgr_call_list_add(call_list, &w4c->continuation, 1);
w4c = next;
}
@@ -653,16 +641,14 @@ static void update_reconnect_parameters(grpc_subchannel *c) {
static void on_alarm(void *arg, int iomgr_success) {
grpc_subchannel *c = arg;
- grpc_connectivity_state_flusher f;
+ grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
gpr_mu_lock(&c->mu);
c->have_alarm = 0;
if (c->disconnected) {
iomgr_success = 0;
}
- connectivity_state_changed_locked(c, "alarm");
- grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
+ connectivity_state_changed_locked(c, "alarm", &call_list);
gpr_mu_unlock(&c->mu);
- grpc_connectivity_state_end_flush(&f);
if (iomgr_success) {
update_reconnect_parameters(c);
continue_connect(c);
@@ -670,24 +656,24 @@ static void on_alarm(void *arg, int iomgr_success) {
GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting");
GRPC_SUBCHANNEL_UNREF(c, "connecting");
}
+ grpc_iomgr_call_list_run(call_list);
}
static void subchannel_connected(void *arg, int iomgr_success) {
grpc_subchannel *c = arg;
+ grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT;
if (c->connecting_result.transport != NULL) {
- publish_transport(c);
+ publish_transport(c, &call_list);
} else {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
- grpc_connectivity_state_flusher f;
gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->have_alarm);
c->have_alarm = 1;
- connectivity_state_changed_locked(c, "connect_failed");
+ connectivity_state_changed_locked(c, "connect_failed", &call_list);
grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, now);
- grpc_connectivity_state_begin_flush(&c->state_tracker, &f);
gpr_mu_unlock(&c->mu);
- grpc_connectivity_state_end_flush(&f);
}
+ grpc_iomgr_call_list_run(call_list);
}
static gpr_timespec compute_connect_deadline(grpc_subchannel *c) {
@@ -718,9 +704,10 @@ static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) {
}
static void connectivity_state_changed_locked(grpc_subchannel *c,
- const char *reason) {
+ const char *reason,
+ grpc_iomgr_call_list *call_list) {
grpc_connectivity_state current = compute_connectivity_locked(c);
- grpc_connectivity_state_set(&c->state_tracker, current, reason);
+ grpc_connectivity_state_set(&c->state_tracker, current, reason, call_list);
}
/*