aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/client_config/subchannel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/client_config/subchannel.c')
-rw-r--r--src/core/client_config/subchannel.c170
1 files changed, 88 insertions, 82 deletions
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index c570c55267..d41bf8f566 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -144,14 +144,14 @@ struct grpc_subchannel_call {
#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1))
static grpc_subchannel_call *create_call(connection *con,
- grpc_call_list *call_list);
+ grpc_closure_list *closure_list);
static void connectivity_state_changed_locked(grpc_subchannel *c,
const char *reason,
- grpc_call_list *call_list);
+ grpc_closure_list *closure_list);
static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c);
static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
static void subchannel_connected(void *subchannel, int iomgr_success,
- grpc_call_list *call_list);
+ grpc_closure_list *closure_list);
static void subchannel_ref_locked(
grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
@@ -159,9 +159,10 @@ static int subchannel_unref_locked(
grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT;
static void connection_ref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
static grpc_subchannel *connection_unref_locked(
- connection *c, grpc_call_list *call_list GRPC_SUBCHANNEL_REF_EXTRA_ARGS)
- GRPC_MUST_USE_RESULT;
-static void subchannel_destroy(grpc_subchannel *c, grpc_call_list *call_list);
+ connection *c, grpc_closure_list *closure_list
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT;
+static void subchannel_destroy(grpc_subchannel *c,
+ grpc_closure_list *closure_list);
#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
#define SUBCHANNEL_REF_LOCKED(p, r) \
@@ -197,9 +198,9 @@ static void subchannel_destroy(grpc_subchannel *c, grpc_call_list *call_list);
* connection implementation
*/
-static void connection_destroy(connection *c, grpc_call_list *call_list) {
+static void connection_destroy(connection *c, grpc_closure_list *closure_list) {
GPR_ASSERT(c->refs == 0);
- grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c), call_list);
+ grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c), closure_list);
gpr_free(c);
}
@@ -211,14 +212,15 @@ static void connection_ref_locked(
}
static grpc_subchannel *connection_unref_locked(
- connection *c, grpc_call_list *call_list GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+ connection *c,
+ grpc_closure_list *closure_list GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
grpc_subchannel *destroy = NULL;
UNREF_LOG("CONNECTION", c);
if (subchannel_unref_locked(c->subchannel REF_PASS_ARGS)) {
destroy = c->subchannel;
}
if (--c->refs == 0 && c->subchannel->active != c) {
- connection_destroy(c, call_list);
+ connection_destroy(c, closure_list);
}
return destroy;
}
@@ -246,38 +248,39 @@ void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
}
void grpc_subchannel_unref(grpc_subchannel *c,
- grpc_call_list *call_list
+ grpc_closure_list *closure_list
GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
int destroy;
gpr_mu_lock(&c->mu);
destroy = subchannel_unref_locked(c REF_PASS_ARGS);
gpr_mu_unlock(&c->mu);
- if (destroy) subchannel_destroy(c, call_list);
+ if (destroy) subchannel_destroy(c, closure_list);
}
-static void subchannel_destroy(grpc_subchannel *c, grpc_call_list *call_list) {
+static void subchannel_destroy(grpc_subchannel *c,
+ grpc_closure_list *closure_list) {
if (c->active != NULL) {
- connection_destroy(c->active, call_list);
+ connection_destroy(c->active, closure_list);
}
gpr_free(c->filters);
grpc_channel_args_destroy(c->args);
gpr_free(c->addr);
grpc_mdctx_unref(c->mdctx);
- grpc_connectivity_state_destroy(&c->state_tracker, call_list);
- grpc_connector_unref(c->connector, call_list);
+ grpc_connectivity_state_destroy(&c->state_tracker, closure_list);
+ grpc_connector_unref(c->connector, closure_list);
gpr_free(c);
}
void grpc_subchannel_add_interested_party(grpc_subchannel *c,
grpc_pollset *pollset,
- grpc_call_list *call_list) {
- grpc_pollset_set_add_pollset(c->pollset_set, pollset, call_list);
+ grpc_closure_list *closure_list) {
+ grpc_pollset_set_add_pollset(c->pollset_set, pollset, closure_list);
}
void grpc_subchannel_del_interested_party(grpc_subchannel *c,
grpc_pollset *pollset,
- grpc_call_list *call_list) {
- grpc_pollset_set_del_pollset(c->pollset_set, pollset, call_list);
+ grpc_closure_list *closure_list) {
+ grpc_pollset_set_del_pollset(c->pollset_set, pollset, closure_list);
}
static gpr_uint32 random_seed() {
@@ -313,7 +316,8 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
return c;
}
-static void continue_connect(grpc_subchannel *c, grpc_call_list *call_list) {
+static void continue_connect(grpc_subchannel *c,
+ grpc_closure_list *closure_list) {
grpc_connect_in_args args;
args.interested_parties = c->pollset_set;
@@ -323,32 +327,32 @@ static void continue_connect(grpc_subchannel *c, grpc_call_list *call_list) {
args.channel_args = c->args;
grpc_connector_connect(c->connector, &args, &c->connecting_result,
- &c->connected, call_list);
+ &c->connected, closure_list);
}
-static void start_connect(grpc_subchannel *c, grpc_call_list *call_list) {
+static void start_connect(grpc_subchannel *c, grpc_closure_list *closure_list) {
c->backoff_delta = gpr_time_from_seconds(
GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS, GPR_TIMESPAN);
c->next_attempt =
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta);
- continue_connect(c, call_list);
+ continue_connect(c, closure_list);
}
static void continue_creating_call(void *arg, int iomgr_success,
- grpc_call_list *call_list) {
+ grpc_closure_list *closure_list) {
waiting_for_connect *w4c = arg;
grpc_subchannel_del_interested_party(w4c->subchannel, w4c->pollset,
- call_list);
+ closure_list);
grpc_subchannel_create_call(w4c->subchannel, w4c->pollset, w4c->target,
- w4c->notify, call_list);
- GRPC_SUBCHANNEL_UNREF(w4c->subchannel, "waiting_for_connect", call_list);
+ w4c->notify, closure_list);
+ GRPC_SUBCHANNEL_UNREF(w4c->subchannel, "waiting_for_connect", closure_list);
gpr_free(w4c);
}
void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset,
grpc_subchannel_call **target,
grpc_closure *notify,
- grpc_call_list *call_list) {
+ grpc_closure_list *closure_list) {
connection *con;
gpr_mu_lock(&c->mu);
if (c->active != NULL) {
@@ -356,8 +360,8 @@ void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset,
CONNECTION_REF_LOCKED(con, "call");
gpr_mu_unlock(&c->mu);
- *target = create_call(con, call_list);
- notify->cb(notify->cb_arg, 1, call_list);
+ *target = create_call(con, closure_list);
+ notify->cb(notify->cb_arg, 1, closure_list);
} else {
waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c));
w4c->next = c->waiting;
@@ -369,16 +373,16 @@ void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset,
SUBCHANNEL_REF_LOCKED(c, "waiting_for_connect");
grpc_closure_init(&w4c->continuation, continue_creating_call, w4c);
c->waiting = w4c;
- grpc_subchannel_add_interested_party(c, pollset, call_list);
+ grpc_subchannel_add_interested_party(c, pollset, closure_list);
if (!c->connecting) {
c->connecting = 1;
- connectivity_state_changed_locked(c, "create_call", call_list);
+ connectivity_state_changed_locked(c, "create_call", closure_list);
/* released by connection */
SUBCHANNEL_REF_LOCKED(c, "connecting");
GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
gpr_mu_unlock(&c->mu);
- start_connect(c, call_list);
+ start_connect(c, closure_list);
} else {
gpr_mu_unlock(&c->mu);
}
@@ -396,28 +400,28 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
grpc_connectivity_state *state,
grpc_closure *notify,
- grpc_call_list *call_list) {
+ grpc_closure_list *closure_list) {
int do_connect = 0;
gpr_mu_lock(&c->mu);
if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state,
- notify, call_list)) {
+ notify, closure_list)) {
do_connect = 1;
c->connecting = 1;
/* released by connection */
SUBCHANNEL_REF_LOCKED(c, "connecting");
GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
- connectivity_state_changed_locked(c, "state_change", call_list);
+ connectivity_state_changed_locked(c, "state_change", closure_list);
}
gpr_mu_unlock(&c->mu);
if (do_connect) {
- start_connect(c, call_list);
+ start_connect(c, closure_list);
}
}
void grpc_subchannel_process_transport_op(grpc_subchannel *c,
grpc_transport_op *op,
- grpc_call_list *call_list) {
+ grpc_closure_list *closure_list) {
connection *con = NULL;
grpc_subchannel *destroy;
int cancel_alarm = 0;
@@ -428,7 +432,7 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c,
}
if (op->disconnect) {
c->disconnected = 1;
- connectivity_state_changed_locked(c, "disconnect", call_list);
+ connectivity_state_changed_locked(c, "disconnect", closure_list);
if (c->have_alarm) {
cancel_alarm = 1;
}
@@ -439,27 +443,27 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c,
grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
grpc_channel_element *top_elem =
grpc_channel_stack_element(channel_stack, 0);
- top_elem->filter->start_transport_op(top_elem, op, call_list);
+ top_elem->filter->start_transport_op(top_elem, op, closure_list);
gpr_mu_lock(&c->mu);
- destroy = CONNECTION_UNREF_LOCKED(con, "transport-op", call_list);
+ destroy = CONNECTION_UNREF_LOCKED(con, "transport-op", closure_list);
gpr_mu_unlock(&c->mu);
if (destroy) {
- subchannel_destroy(destroy, call_list);
+ subchannel_destroy(destroy, closure_list);
}
}
if (cancel_alarm) {
- grpc_alarm_cancel(&c->alarm, call_list);
+ grpc_alarm_cancel(&c->alarm, closure_list);
}
if (op->disconnect) {
- grpc_connector_shutdown(c->connector, call_list);
+ grpc_connector_shutdown(c->connector, closure_list);
}
}
static void on_state_changed(void *p, int iomgr_success,
- grpc_call_list *call_list) {
+ grpc_closure_list *closure_list) {
state_watcher *sw = p;
grpc_subchannel *c = sw->subchannel;
gpr_mu *mu = &c->mu;
@@ -486,7 +490,7 @@ static void on_state_changed(void *p, int iomgr_success,
op.on_connectivity_state_change = &sw->closure;
elem = grpc_channel_stack_element(
CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
- elem->filter->start_transport_op(elem, &op, call_list);
+ elem->filter->start_transport_op(elem, &op, closure_list);
/* early out */
gpr_mu_unlock(mu);
return;
@@ -500,24 +504,25 @@ static void on_state_changed(void *p, int iomgr_success,
grpc_connectivity_state_set(
&c->state_tracker, c->disconnected ? GRPC_CHANNEL_FATAL_FAILURE
: GRPC_CHANNEL_TRANSIENT_FAILURE,
- "connection_failed", call_list);
+ "connection_failed", closure_list);
break;
}
done:
- connectivity_state_changed_locked(c, "transport_state_changed", call_list);
+ connectivity_state_changed_locked(c, "transport_state_changed", closure_list);
destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher");
gpr_free(sw);
gpr_mu_unlock(mu);
if (destroy) {
- subchannel_destroy(c, call_list);
+ subchannel_destroy(c, closure_list);
}
if (destroy_connection != NULL) {
- connection_destroy(destroy_connection, call_list);
+ connection_destroy(destroy_connection, closure_list);
}
}
-static void publish_transport(grpc_subchannel *c, grpc_call_list *call_list) {
+static void publish_transport(grpc_subchannel *c,
+ grpc_closure_list *closure_list) {
size_t channel_stack_size;
connection *con;
grpc_channel_stack *stk;
@@ -544,7 +549,7 @@ static void publish_transport(grpc_subchannel *c, grpc_call_list *call_list) {
con->refs = 0;
con->subchannel = c;
grpc_channel_stack_init(filters, num_filters, c->master, c->args, c->mdctx,
- stk, call_list);
+ stk, closure_list);
grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
gpr_free(c->connecting_result.filters);
memset(&c->connecting_result, 0, sizeof(c->connecting_result));
@@ -561,9 +566,9 @@ static void publish_transport(grpc_subchannel *c, grpc_call_list *call_list) {
gpr_mu_unlock(&c->mu);
gpr_free(sw);
gpr_free(filters);
- grpc_channel_stack_destroy(stk, call_list);
- GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting", call_list);
- GRPC_SUBCHANNEL_UNREF(c, "connecting", call_list);
+ grpc_channel_stack_destroy(stk, closure_list);
+ GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting", closure_list);
+ GRPC_SUBCHANNEL_UNREF(c, "connecting", closure_list);
return;
}
@@ -583,14 +588,14 @@ static void publish_transport(grpc_subchannel *c, grpc_call_list *call_list) {
op.on_connectivity_state_change = &sw->closure;
op.bind_pollset_set = c->pollset_set;
SUBCHANNEL_REF_LOCKED(c, "state_watcher");
- GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting", call_list);
+ GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting", closure_list);
GPR_ASSERT(!SUBCHANNEL_UNREF_LOCKED(c, "connecting"));
elem =
grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
- elem->filter->start_transport_op(elem, &op, call_list);
+ elem->filter->start_transport_op(elem, &op, closure_list);
/* signal completion */
- connectivity_state_changed_locked(c, "connected", call_list);
+ connectivity_state_changed_locked(c, "connected", closure_list);
w4c = c->waiting;
c->waiting = NULL;
@@ -598,14 +603,14 @@ static void publish_transport(grpc_subchannel *c, grpc_call_list *call_list) {
while (w4c != NULL) {
waiting_for_connect *next = w4c->next;
- grpc_call_list_add(call_list, &w4c->continuation, 1);
+ grpc_closure_list_add(closure_list, &w4c->continuation, 1);
w4c = next;
}
gpr_free(filters);
if (destroy_connection != NULL) {
- connection_destroy(destroy_connection, call_list);
+ connection_destroy(destroy_connection, closure_list);
}
}
@@ -638,36 +643,37 @@ static void update_reconnect_parameters(grpc_subchannel *c) {
gpr_time_add(c->next_attempt, gpr_time_from_millis(jitter, GPR_TIMESPAN));
}
-static void on_alarm(void *arg, int iomgr_success, grpc_call_list *call_list) {
+static void on_alarm(void *arg, int iomgr_success,
+ grpc_closure_list *closure_list) {
grpc_subchannel *c = arg;
gpr_mu_lock(&c->mu);
c->have_alarm = 0;
if (c->disconnected) {
iomgr_success = 0;
}
- connectivity_state_changed_locked(c, "alarm", call_list);
+ connectivity_state_changed_locked(c, "alarm", closure_list);
gpr_mu_unlock(&c->mu);
if (iomgr_success) {
update_reconnect_parameters(c);
- continue_connect(c, call_list);
+ continue_connect(c, closure_list);
} else {
- GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting", call_list);
- GRPC_SUBCHANNEL_UNREF(c, "connecting", call_list);
+ GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting", closure_list);
+ GRPC_SUBCHANNEL_UNREF(c, "connecting", closure_list);
}
}
static void subchannel_connected(void *arg, int iomgr_success,
- grpc_call_list *call_list) {
+ grpc_closure_list *closure_list) {
grpc_subchannel *c = arg;
if (c->connecting_result.transport != NULL) {
- publish_transport(c, call_list);
+ publish_transport(c, closure_list);
} else {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->have_alarm);
c->have_alarm = 1;
- connectivity_state_changed_locked(c, "connect_failed", call_list);
- grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, now, call_list);
+ connectivity_state_changed_locked(c, "connect_failed", closure_list);
+ grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, now, closure_list);
gpr_mu_unlock(&c->mu);
}
}
@@ -701,9 +707,9 @@ static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) {
static void connectivity_state_changed_locked(grpc_subchannel *c,
const char *reason,
- grpc_call_list *call_list) {
+ grpc_closure_list *closure_list) {
grpc_connectivity_state current = compute_connectivity_locked(c);
- grpc_connectivity_state_set(&c->state_tracker, current, reason, call_list);
+ grpc_connectivity_state_set(&c->state_tracker, current, reason, closure_list);
}
/*
@@ -716,45 +722,45 @@ void grpc_subchannel_call_ref(
}
void grpc_subchannel_call_unref(grpc_subchannel_call *c,
- grpc_call_list *call_list
+ grpc_closure_list *closure_list
GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
if (gpr_unref(&c->refs)) {
gpr_mu *mu = &c->connection->subchannel->mu;
grpc_subchannel *destroy;
- grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), call_list);
+ grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), closure_list);
gpr_mu_lock(mu);
- destroy = CONNECTION_UNREF_LOCKED(c->connection, "call", call_list);
+ destroy = CONNECTION_UNREF_LOCKED(c->connection, "call", closure_list);
gpr_mu_unlock(mu);
gpr_free(c);
if (destroy != NULL) {
- subchannel_destroy(destroy, call_list);
+ subchannel_destroy(destroy, closure_list);
}
}
}
char *grpc_subchannel_call_get_peer(grpc_subchannel_call *call,
- grpc_call_list *call_list) {
+ grpc_closure_list *closure_list) {
grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
- return top_elem->filter->get_peer(top_elem, call_list);
+ return top_elem->filter->get_peer(top_elem, closure_list);
}
void grpc_subchannel_call_process_op(grpc_subchannel_call *call,
grpc_transport_stream_op *op,
- grpc_call_list *call_list) {
+ grpc_closure_list *closure_list) {
grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
- top_elem->filter->start_transport_stream_op(top_elem, op, call_list);
+ top_elem->filter->start_transport_stream_op(top_elem, op, closure_list);
}
static grpc_subchannel_call *create_call(connection *con,
- grpc_call_list *call_list) {
+ grpc_closure_list *closure_list) {
grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
grpc_subchannel_call *call =
gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call);
call->connection = con;
gpr_ref_init(&call->refs, 1);
- grpc_call_stack_init(chanstk, NULL, NULL, callstk, call_list);
+ grpc_call_stack_init(chanstk, NULL, NULL, callstk, closure_list);
return call;
}