diff options
author | 2015-09-18 07:20:29 -0700 | |
---|---|---|
committer | 2015-09-18 07:20:29 -0700 | |
commit | 000cd8f9f7346defc79fe6aa877af11b42ab5f1e (patch) | |
tree | 883d73a97471f63e616d02c1e17efc62b099c8ad /src/core/client_config/subchannel.c | |
parent | 38adec97e875c21cd9d6cc9d039664bdf4fdb889 (diff) |
Introduce call lists for moving work outside locks
Diffstat (limited to 'src/core/client_config/subchannel.c')
-rw-r--r-- | src/core/client_config/subchannel.c | 91 |
1 files changed, 39 insertions, 52 deletions
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 6fbf966475..b15acf826a 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -146,7 +146,8 @@ struct grpc_subchannel_call { static grpc_subchannel_call *create_call(connection *con); static void connectivity_state_changed_locked(grpc_subchannel *c, - const char *reason); + const char *reason, + grpc_iomgr_call_list *call_list); static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c); static gpr_timespec compute_connect_deadline(grpc_subchannel *c); static void subchannel_connected(void *subchannel, int iomgr_success); @@ -333,16 +334,18 @@ static void start_connect(grpc_subchannel *c) { static void continue_creating_call(void *arg, int iomgr_success) { waiting_for_connect *w4c = arg; + grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; grpc_subchannel_del_interested_party(w4c->subchannel, w4c->pollset); grpc_subchannel_create_call(w4c->subchannel, w4c->pollset, w4c->target, - w4c->notify); + w4c->notify, &call_list); GRPC_SUBCHANNEL_UNREF(w4c->subchannel, "waiting_for_connect"); gpr_free(w4c); } void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset, grpc_subchannel_call **target, - grpc_iomgr_closure *notify) { + grpc_iomgr_closure *notify, + grpc_iomgr_call_list *call_list) { connection *con; gpr_mu_lock(&c->mu); if (c->active != NULL) { @@ -365,15 +368,12 @@ void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset, c->waiting = w4c; grpc_subchannel_add_interested_party(c, pollset); if (!c->connecting) { - grpc_connectivity_state_flusher f; c->connecting = 1; - connectivity_state_changed_locked(c, "create_call"); + connectivity_state_changed_locked(c, "create_call", call_list); /* released by connection */ SUBCHANNEL_REF_LOCKED(c, "connecting"); GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting"); - grpc_connectivity_state_begin_flush(&c->state_tracker, &f); gpr_mu_unlock(&c->mu); - grpc_connectivity_state_end_flush(&f); start_connect(c); } else { @@ -390,33 +390,26 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) { return state; } -grpc_connectivity_state_notify_on_state_change_result -grpc_subchannel_notify_on_state_change(grpc_subchannel *c, - grpc_connectivity_state *state, - grpc_iomgr_closure *notify) { +void grpc_subchannel_notify_on_state_change(grpc_subchannel *c, + grpc_connectivity_state *state, + grpc_iomgr_closure *notify, + grpc_iomgr_call_list *call_list) { int do_connect = 0; - grpc_connectivity_state_notify_on_state_change_result r; gpr_mu_lock(&c->mu); - r = grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state, - notify); - if (r.current_state_is_idle) { - grpc_connectivity_state_flusher f; + if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state, + notify, call_list)) { do_connect = 1; c->connecting = 1; /* released by connection */ SUBCHANNEL_REF_LOCKED(c, "connecting"); GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting"); - connectivity_state_changed_locked(c, "state_change"); - grpc_connectivity_state_begin_flush(&c->state_tracker, &f); - gpr_mu_unlock(&c->mu); - grpc_connectivity_state_end_flush(&f); - } else { - gpr_mu_unlock(&c->mu); + connectivity_state_changed_locked(c, "state_change", call_list); } + gpr_mu_unlock(&c->mu); + if (do_connect) { start_connect(c); } - return r; } void grpc_subchannel_process_transport_op(grpc_subchannel *c, @@ -424,24 +417,20 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c, connection *con = NULL; grpc_subchannel *destroy; int cancel_alarm = 0; + grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; gpr_mu_lock(&c->mu); if (c->active != NULL) { con = c->active; CONNECTION_REF_LOCKED(con, "transport-op"); } if (op->disconnect) { - grpc_connectivity_state_flusher f; c->disconnected = 1; - connectivity_state_changed_locked(c, "disconnect"); + connectivity_state_changed_locked(c, "disconnect", &call_list); if (c->have_alarm) { cancel_alarm = 1; } - grpc_connectivity_state_begin_flush(&c->state_tracker, &f); - gpr_mu_unlock(&c->mu); - grpc_connectivity_state_end_flush(&f); - } else { - gpr_mu_unlock(&c->mu); } + gpr_mu_unlock(&c->mu); if (con != NULL) { grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con); @@ -464,6 +453,8 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c, if (op->disconnect) { grpc_connector_shutdown(c->connector); } + + grpc_iomgr_call_list_run(call_list); } static void on_state_changed(void *p, int iomgr_success) { @@ -474,7 +465,7 @@ static void on_state_changed(void *p, int iomgr_success) { grpc_transport_op op; grpc_channel_element *elem; connection *destroy_connection = NULL; - grpc_connectivity_state_flusher f; + grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; gpr_mu_lock(mu); @@ -508,26 +499,26 @@ static void on_state_changed(void *p, int iomgr_success) { grpc_connectivity_state_set( &c->state_tracker, c->disconnected ? GRPC_CHANNEL_FATAL_FAILURE : GRPC_CHANNEL_TRANSIENT_FAILURE, - "connection_failed"); + "connection_failed", &call_list); break; } done: - connectivity_state_changed_locked(c, "transport_state_changed"); + connectivity_state_changed_locked(c, "transport_state_changed", &call_list); destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher"); gpr_free(sw); - grpc_connectivity_state_begin_flush(&c->state_tracker, &f); gpr_mu_unlock(mu); - grpc_connectivity_state_end_flush(&f); if (destroy) { subchannel_destroy(c); } if (destroy_connection != NULL) { connection_destroy(destroy_connection); } + grpc_iomgr_call_list_run(call_list); } -static void publish_transport(grpc_subchannel *c) { +static void publish_transport(grpc_subchannel *c, + grpc_iomgr_call_list *call_list) { size_t channel_stack_size; connection *con; grpc_channel_stack *stk; @@ -538,7 +529,6 @@ static void publish_transport(grpc_subchannel *c) { state_watcher *sw; connection *destroy_connection = NULL; grpc_channel_element *elem; - grpc_connectivity_state_flusher f; /* build final filter list */ num_filters = c->num_filters + c->connecting_result.num_filters + 1; @@ -601,17 +591,15 @@ static void publish_transport(grpc_subchannel *c) { elem->filter->start_transport_op(elem, &op); /* signal completion */ - connectivity_state_changed_locked(c, "connected"); + connectivity_state_changed_locked(c, "connected", call_list); w4c = c->waiting; c->waiting = NULL; - grpc_connectivity_state_begin_flush(&c->state_tracker, &f); gpr_mu_unlock(&c->mu); - grpc_connectivity_state_end_flush(&f); while (w4c != NULL) { waiting_for_connect *next = w4c; - w4c->continuation.cb(w4c->continuation.cb_arg, 1); + grpc_iomgr_call_list_add(call_list, &w4c->continuation, 1); w4c = next; } @@ -653,16 +641,14 @@ static void update_reconnect_parameters(grpc_subchannel *c) { static void on_alarm(void *arg, int iomgr_success) { grpc_subchannel *c = arg; - grpc_connectivity_state_flusher f; + grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; gpr_mu_lock(&c->mu); c->have_alarm = 0; if (c->disconnected) { iomgr_success = 0; } - connectivity_state_changed_locked(c, "alarm"); - grpc_connectivity_state_begin_flush(&c->state_tracker, &f); + connectivity_state_changed_locked(c, "alarm", &call_list); gpr_mu_unlock(&c->mu); - grpc_connectivity_state_end_flush(&f); if (iomgr_success) { update_reconnect_parameters(c); continue_connect(c); @@ -670,24 +656,24 @@ static void on_alarm(void *arg, int iomgr_success) { GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting"); GRPC_SUBCHANNEL_UNREF(c, "connecting"); } + grpc_iomgr_call_list_run(call_list); } static void subchannel_connected(void *arg, int iomgr_success) { grpc_subchannel *c = arg; + grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; if (c->connecting_result.transport != NULL) { - publish_transport(c); + publish_transport(c, &call_list); } else { gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - grpc_connectivity_state_flusher f; gpr_mu_lock(&c->mu); GPR_ASSERT(!c->have_alarm); c->have_alarm = 1; - connectivity_state_changed_locked(c, "connect_failed"); + connectivity_state_changed_locked(c, "connect_failed", &call_list); grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, now); - grpc_connectivity_state_begin_flush(&c->state_tracker, &f); gpr_mu_unlock(&c->mu); - grpc_connectivity_state_end_flush(&f); } + grpc_iomgr_call_list_run(call_list); } static gpr_timespec compute_connect_deadline(grpc_subchannel *c) { @@ -718,9 +704,10 @@ static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) { } static void connectivity_state_changed_locked(grpc_subchannel *c, - const char *reason) { + const char *reason, + grpc_iomgr_call_list *call_list) { grpc_connectivity_state current = compute_connectivity_locked(c); - grpc_connectivity_state_set(&c->state_tracker, current, reason); + grpc_connectivity_state_set(&c->state_tracker, current, reason, call_list); } /* |