diff options
Diffstat (limited to 'src/core/client_config/subchannel.c')
-rw-r--r-- | src/core/client_config/subchannel.c | 170 |
1 files changed, 88 insertions, 82 deletions
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index c570c55267..d41bf8f566 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -144,14 +144,14 @@ struct grpc_subchannel_call { #define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1)) static grpc_subchannel_call *create_call(connection *con, - grpc_call_list *call_list); + grpc_closure_list *closure_list); static void connectivity_state_changed_locked(grpc_subchannel *c, const char *reason, - grpc_call_list *call_list); + grpc_closure_list *closure_list); static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c); static gpr_timespec compute_connect_deadline(grpc_subchannel *c); static void subchannel_connected(void *subchannel, int iomgr_success, - grpc_call_list *call_list); + grpc_closure_list *closure_list); static void subchannel_ref_locked( grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS); @@ -159,9 +159,10 @@ static int subchannel_unref_locked( grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT; static void connection_ref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS); static grpc_subchannel *connection_unref_locked( - connection *c, grpc_call_list *call_list GRPC_SUBCHANNEL_REF_EXTRA_ARGS) - GRPC_MUST_USE_RESULT; -static void subchannel_destroy(grpc_subchannel *c, grpc_call_list *call_list); + connection *c, grpc_closure_list *closure_list + GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT; +static void subchannel_destroy(grpc_subchannel *c, + grpc_closure_list *closure_list); #ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG #define SUBCHANNEL_REF_LOCKED(p, r) \ @@ -197,9 +198,9 @@ static void subchannel_destroy(grpc_subchannel *c, grpc_call_list *call_list); * connection implementation */ -static void connection_destroy(connection *c, grpc_call_list *call_list) { +static void connection_destroy(connection *c, grpc_closure_list *closure_list) { GPR_ASSERT(c->refs == 0); - grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c), call_list); + grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c), closure_list); gpr_free(c); } @@ -211,14 +212,15 @@ static void connection_ref_locked( } static grpc_subchannel *connection_unref_locked( - connection *c, grpc_call_list *call_list GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + connection *c, + grpc_closure_list *closure_list GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { grpc_subchannel *destroy = NULL; UNREF_LOG("CONNECTION", c); if (subchannel_unref_locked(c->subchannel REF_PASS_ARGS)) { destroy = c->subchannel; } if (--c->refs == 0 && c->subchannel->active != c) { - connection_destroy(c, call_list); + connection_destroy(c, closure_list); } return destroy; } @@ -246,38 +248,39 @@ void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { } void grpc_subchannel_unref(grpc_subchannel *c, - grpc_call_list *call_list + grpc_closure_list *closure_list GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { int destroy; gpr_mu_lock(&c->mu); destroy = subchannel_unref_locked(c REF_PASS_ARGS); gpr_mu_unlock(&c->mu); - if (destroy) subchannel_destroy(c, call_list); + if (destroy) subchannel_destroy(c, closure_list); } -static void subchannel_destroy(grpc_subchannel *c, grpc_call_list *call_list) { +static void subchannel_destroy(grpc_subchannel *c, + grpc_closure_list *closure_list) { if (c->active != NULL) { - connection_destroy(c->active, call_list); + connection_destroy(c->active, closure_list); } gpr_free(c->filters); grpc_channel_args_destroy(c->args); gpr_free(c->addr); grpc_mdctx_unref(c->mdctx); - grpc_connectivity_state_destroy(&c->state_tracker, call_list); - grpc_connector_unref(c->connector, call_list); + grpc_connectivity_state_destroy(&c->state_tracker, closure_list); + grpc_connector_unref(c->connector, closure_list); gpr_free(c); } void grpc_subchannel_add_interested_party(grpc_subchannel *c, grpc_pollset *pollset, - grpc_call_list *call_list) { - grpc_pollset_set_add_pollset(c->pollset_set, pollset, call_list); + grpc_closure_list *closure_list) { + grpc_pollset_set_add_pollset(c->pollset_set, pollset, closure_list); } void grpc_subchannel_del_interested_party(grpc_subchannel *c, grpc_pollset *pollset, - grpc_call_list *call_list) { - grpc_pollset_set_del_pollset(c->pollset_set, pollset, call_list); + grpc_closure_list *closure_list) { + grpc_pollset_set_del_pollset(c->pollset_set, pollset, closure_list); } static gpr_uint32 random_seed() { @@ -313,7 +316,8 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, return c; } -static void continue_connect(grpc_subchannel *c, grpc_call_list *call_list) { +static void continue_connect(grpc_subchannel *c, + grpc_closure_list *closure_list) { grpc_connect_in_args args; args.interested_parties = c->pollset_set; @@ -323,32 +327,32 @@ static void continue_connect(grpc_subchannel *c, grpc_call_list *call_list) { args.channel_args = c->args; grpc_connector_connect(c->connector, &args, &c->connecting_result, - &c->connected, call_list); + &c->connected, closure_list); } -static void start_connect(grpc_subchannel *c, grpc_call_list *call_list) { +static void start_connect(grpc_subchannel *c, grpc_closure_list *closure_list) { c->backoff_delta = gpr_time_from_seconds( GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS, GPR_TIMESPAN); c->next_attempt = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta); - continue_connect(c, call_list); + continue_connect(c, closure_list); } static void continue_creating_call(void *arg, int iomgr_success, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { waiting_for_connect *w4c = arg; grpc_subchannel_del_interested_party(w4c->subchannel, w4c->pollset, - call_list); + closure_list); grpc_subchannel_create_call(w4c->subchannel, w4c->pollset, w4c->target, - w4c->notify, call_list); - GRPC_SUBCHANNEL_UNREF(w4c->subchannel, "waiting_for_connect", call_list); + w4c->notify, closure_list); + GRPC_SUBCHANNEL_UNREF(w4c->subchannel, "waiting_for_connect", closure_list); gpr_free(w4c); } void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset, grpc_subchannel_call **target, grpc_closure *notify, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { connection *con; gpr_mu_lock(&c->mu); if (c->active != NULL) { @@ -356,8 +360,8 @@ void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset, CONNECTION_REF_LOCKED(con, "call"); gpr_mu_unlock(&c->mu); - *target = create_call(con, call_list); - notify->cb(notify->cb_arg, 1, call_list); + *target = create_call(con, closure_list); + notify->cb(notify->cb_arg, 1, closure_list); } else { waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c)); w4c->next = c->waiting; @@ -369,16 +373,16 @@ void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset, SUBCHANNEL_REF_LOCKED(c, "waiting_for_connect"); grpc_closure_init(&w4c->continuation, continue_creating_call, w4c); c->waiting = w4c; - grpc_subchannel_add_interested_party(c, pollset, call_list); + grpc_subchannel_add_interested_party(c, pollset, closure_list); if (!c->connecting) { c->connecting = 1; - connectivity_state_changed_locked(c, "create_call", call_list); + connectivity_state_changed_locked(c, "create_call", closure_list); /* released by connection */ SUBCHANNEL_REF_LOCKED(c, "connecting"); GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting"); gpr_mu_unlock(&c->mu); - start_connect(c, call_list); + start_connect(c, closure_list); } else { gpr_mu_unlock(&c->mu); } @@ -396,28 +400,28 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) { void grpc_subchannel_notify_on_state_change(grpc_subchannel *c, grpc_connectivity_state *state, grpc_closure *notify, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { int do_connect = 0; gpr_mu_lock(&c->mu); if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state, - notify, call_list)) { + notify, closure_list)) { do_connect = 1; c->connecting = 1; /* released by connection */ SUBCHANNEL_REF_LOCKED(c, "connecting"); GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting"); - connectivity_state_changed_locked(c, "state_change", call_list); + connectivity_state_changed_locked(c, "state_change", closure_list); } gpr_mu_unlock(&c->mu); if (do_connect) { - start_connect(c, call_list); + start_connect(c, closure_list); } } void grpc_subchannel_process_transport_op(grpc_subchannel *c, grpc_transport_op *op, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { connection *con = NULL; grpc_subchannel *destroy; int cancel_alarm = 0; @@ -428,7 +432,7 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c, } if (op->disconnect) { c->disconnected = 1; - connectivity_state_changed_locked(c, "disconnect", call_list); + connectivity_state_changed_locked(c, "disconnect", closure_list); if (c->have_alarm) { cancel_alarm = 1; } @@ -439,27 +443,27 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c, grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con); grpc_channel_element *top_elem = grpc_channel_stack_element(channel_stack, 0); - top_elem->filter->start_transport_op(top_elem, op, call_list); + top_elem->filter->start_transport_op(top_elem, op, closure_list); gpr_mu_lock(&c->mu); - destroy = CONNECTION_UNREF_LOCKED(con, "transport-op", call_list); + destroy = CONNECTION_UNREF_LOCKED(con, "transport-op", closure_list); gpr_mu_unlock(&c->mu); if (destroy) { - subchannel_destroy(destroy, call_list); + subchannel_destroy(destroy, closure_list); } } if (cancel_alarm) { - grpc_alarm_cancel(&c->alarm, call_list); + grpc_alarm_cancel(&c->alarm, closure_list); } if (op->disconnect) { - grpc_connector_shutdown(c->connector, call_list); + grpc_connector_shutdown(c->connector, closure_list); } } static void on_state_changed(void *p, int iomgr_success, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { state_watcher *sw = p; grpc_subchannel *c = sw->subchannel; gpr_mu *mu = &c->mu; @@ -486,7 +490,7 @@ static void on_state_changed(void *p, int iomgr_success, op.on_connectivity_state_change = &sw->closure; elem = grpc_channel_stack_element( CHANNEL_STACK_FROM_CONNECTION(c->active), 0); - elem->filter->start_transport_op(elem, &op, call_list); + elem->filter->start_transport_op(elem, &op, closure_list); /* early out */ gpr_mu_unlock(mu); return; @@ -500,24 +504,25 @@ static void on_state_changed(void *p, int iomgr_success, grpc_connectivity_state_set( &c->state_tracker, c->disconnected ? GRPC_CHANNEL_FATAL_FAILURE : GRPC_CHANNEL_TRANSIENT_FAILURE, - "connection_failed", call_list); + "connection_failed", closure_list); break; } done: - connectivity_state_changed_locked(c, "transport_state_changed", call_list); + connectivity_state_changed_locked(c, "transport_state_changed", closure_list); destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher"); gpr_free(sw); gpr_mu_unlock(mu); if (destroy) { - subchannel_destroy(c, call_list); + subchannel_destroy(c, closure_list); } if (destroy_connection != NULL) { - connection_destroy(destroy_connection, call_list); + connection_destroy(destroy_connection, closure_list); } } -static void publish_transport(grpc_subchannel *c, grpc_call_list *call_list) { +static void publish_transport(grpc_subchannel *c, + grpc_closure_list *closure_list) { size_t channel_stack_size; connection *con; grpc_channel_stack *stk; @@ -544,7 +549,7 @@ static void publish_transport(grpc_subchannel *c, grpc_call_list *call_list) { con->refs = 0; con->subchannel = c; grpc_channel_stack_init(filters, num_filters, c->master, c->args, c->mdctx, - stk, call_list); + stk, closure_list); grpc_connected_channel_bind_transport(stk, c->connecting_result.transport); gpr_free(c->connecting_result.filters); memset(&c->connecting_result, 0, sizeof(c->connecting_result)); @@ -561,9 +566,9 @@ static void publish_transport(grpc_subchannel *c, grpc_call_list *call_list) { gpr_mu_unlock(&c->mu); gpr_free(sw); gpr_free(filters); - grpc_channel_stack_destroy(stk, call_list); - GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting", call_list); - GRPC_SUBCHANNEL_UNREF(c, "connecting", call_list); + grpc_channel_stack_destroy(stk, closure_list); + GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting", closure_list); + GRPC_SUBCHANNEL_UNREF(c, "connecting", closure_list); return; } @@ -583,14 +588,14 @@ static void publish_transport(grpc_subchannel *c, grpc_call_list *call_list) { op.on_connectivity_state_change = &sw->closure; op.bind_pollset_set = c->pollset_set; SUBCHANNEL_REF_LOCKED(c, "state_watcher"); - GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting", call_list); + GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting", closure_list); GPR_ASSERT(!SUBCHANNEL_UNREF_LOCKED(c, "connecting")); elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0); - elem->filter->start_transport_op(elem, &op, call_list); + elem->filter->start_transport_op(elem, &op, closure_list); /* signal completion */ - connectivity_state_changed_locked(c, "connected", call_list); + connectivity_state_changed_locked(c, "connected", closure_list); w4c = c->waiting; c->waiting = NULL; @@ -598,14 +603,14 @@ static void publish_transport(grpc_subchannel *c, grpc_call_list *call_list) { while (w4c != NULL) { waiting_for_connect *next = w4c->next; - grpc_call_list_add(call_list, &w4c->continuation, 1); + grpc_closure_list_add(closure_list, &w4c->continuation, 1); w4c = next; } gpr_free(filters); if (destroy_connection != NULL) { - connection_destroy(destroy_connection, call_list); + connection_destroy(destroy_connection, closure_list); } } @@ -638,36 +643,37 @@ static void update_reconnect_parameters(grpc_subchannel *c) { gpr_time_add(c->next_attempt, gpr_time_from_millis(jitter, GPR_TIMESPAN)); } -static void on_alarm(void *arg, int iomgr_success, grpc_call_list *call_list) { +static void on_alarm(void *arg, int iomgr_success, + grpc_closure_list *closure_list) { grpc_subchannel *c = arg; gpr_mu_lock(&c->mu); c->have_alarm = 0; if (c->disconnected) { iomgr_success = 0; } - connectivity_state_changed_locked(c, "alarm", call_list); + connectivity_state_changed_locked(c, "alarm", closure_list); gpr_mu_unlock(&c->mu); if (iomgr_success) { update_reconnect_parameters(c); - continue_connect(c, call_list); + continue_connect(c, closure_list); } else { - GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting", call_list); - GRPC_SUBCHANNEL_UNREF(c, "connecting", call_list); + GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting", closure_list); + GRPC_SUBCHANNEL_UNREF(c, "connecting", closure_list); } } static void subchannel_connected(void *arg, int iomgr_success, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { grpc_subchannel *c = arg; if (c->connecting_result.transport != NULL) { - publish_transport(c, call_list); + publish_transport(c, closure_list); } else { gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_mu_lock(&c->mu); GPR_ASSERT(!c->have_alarm); c->have_alarm = 1; - connectivity_state_changed_locked(c, "connect_failed", call_list); - grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, now, call_list); + connectivity_state_changed_locked(c, "connect_failed", closure_list); + grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, now, closure_list); gpr_mu_unlock(&c->mu); } } @@ -701,9 +707,9 @@ static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) { static void connectivity_state_changed_locked(grpc_subchannel *c, const char *reason, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { grpc_connectivity_state current = compute_connectivity_locked(c); - grpc_connectivity_state_set(&c->state_tracker, current, reason, call_list); + grpc_connectivity_state_set(&c->state_tracker, current, reason, closure_list); } /* @@ -716,45 +722,45 @@ void grpc_subchannel_call_ref( } void grpc_subchannel_call_unref(grpc_subchannel_call *c, - grpc_call_list *call_list + grpc_closure_list *closure_list GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { if (gpr_unref(&c->refs)) { gpr_mu *mu = &c->connection->subchannel->mu; grpc_subchannel *destroy; - grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), call_list); + grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), closure_list); gpr_mu_lock(mu); - destroy = CONNECTION_UNREF_LOCKED(c->connection, "call", call_list); + destroy = CONNECTION_UNREF_LOCKED(c->connection, "call", closure_list); gpr_mu_unlock(mu); gpr_free(c); if (destroy != NULL) { - subchannel_destroy(destroy, call_list); + subchannel_destroy(destroy, closure_list); } } } char *grpc_subchannel_call_get_peer(grpc_subchannel_call *call, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call); grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0); - return top_elem->filter->get_peer(top_elem, call_list); + return top_elem->filter->get_peer(top_elem, closure_list); } void grpc_subchannel_call_process_op(grpc_subchannel_call *call, grpc_transport_stream_op *op, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call); grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0); - top_elem->filter->start_transport_stream_op(top_elem, op, call_list); + top_elem->filter->start_transport_stream_op(top_elem, op, closure_list); } static grpc_subchannel_call *create_call(connection *con, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); grpc_subchannel_call *call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size); grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call); call->connection = con; gpr_ref_init(&call->refs, 1); - grpc_call_stack_init(chanstk, NULL, NULL, callstk, call_list); + grpc_call_stack_init(chanstk, NULL, NULL, callstk, closure_list); return call; } |