aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/channel/client_uchannel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/channel/client_uchannel.c')
-rw-r--r--src/core/channel/client_uchannel.c82
1 files changed, 23 insertions, 59 deletions
diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c
index 456ffb7371..2c0b07d8bf 100644
--- a/src/core/channel/client_uchannel.c
+++ b/src/core/channel/client_uchannel.c
@@ -58,13 +58,13 @@ typedef struct client_uchannel_channel_data {
this channel_data via its channel stack.
We occasionally use this to bump the refcount on the master channel
to keep ourselves alive through an asynchronous operation. */
- grpc_channel *master;
+ grpc_channel_stack *owning_stack;
/** connectivity state being tracked */
grpc_connectivity_state_tracker state_tracker;
/** the subchannel wrapped by the microchannel */
- grpc_subchannel *subchannel;
+ grpc_connected_subchannel *connected_subchannel;
/** the callback used to stay subscribed to subchannel connectivity
* notifications */
@@ -84,15 +84,13 @@ static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
chand->subchannel_connectivity,
"uchannel_monitor_subchannel");
- grpc_subchannel_notify_on_state_change(exec_ctx, chand->subchannel,
- &chand->subchannel_connectivity,
- &chand->connectivity_cb);
+ grpc_connected_subchannel_notify_on_state_change(
+ exec_ctx, chand->connected_subchannel, NULL,
+ &chand->subchannel_connectivity, &chand->connectivity_cb);
}
static char *cuc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
- channel_data *chand = elem->channel_data;
- return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data,
- chand->master);
+ return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data);
}
static void cuc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
@@ -128,11 +126,11 @@ static void cuc_start_transport_op(grpc_exec_ctx *exec_ctx,
static int cuc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
grpc_metadata_batch *initial_metadata,
- grpc_subchannel **subchannel,
+ grpc_connected_subchannel **connected_subchannel,
grpc_closure *on_ready) {
channel_data *chand = arg;
GPR_ASSERT(initial_metadata != NULL);
- *subchannel = chand->subchannel;
+ *connected_subchannel = chand->connected_subchannel;
return 1;
}
@@ -140,7 +138,7 @@ static int cuc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
static void cuc_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args) {
grpc_subchannel_call_holder_init(elem->call_data, cuc_pick_subchannel,
- elem->channel_data);
+ elem->channel_data, args->call_stack);
}
/* Destructor for call_data */
@@ -158,7 +156,7 @@ static void cuc_init_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_closure_init(&chand->connectivity_cb, monitor_subchannel, chand);
GPR_ASSERT(args->is_last);
GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter);
- chand->master = args->master;
+ chand->owning_stack = args->channel_stack;
grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
"client_uchannel");
gpr_mu_init(&chand->mu_state);
@@ -168,10 +166,14 @@ static void cuc_init_channel_elem(grpc_exec_ctx *exec_ctx,
static void cuc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem) {
channel_data *chand = elem->channel_data;
- grpc_subchannel_state_change_unsubscribe(exec_ctx, chand->subchannel,
- &chand->connectivity_cb);
+ /* cancel subscription */
+ grpc_connected_subchannel_notify_on_state_change(
+ exec_ctx, chand->connected_subchannel, NULL, NULL,
+ &chand->connectivity_cb);
grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
gpr_mu_destroy(&chand->mu_state);
+ GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, chand->connected_subchannel,
+ "uchannel");
}
static void cuc_set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
@@ -191,23 +193,14 @@ grpc_connectivity_state grpc_client_uchannel_check_connectivity_state(
grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) {
channel_data *chand = elem->channel_data;
grpc_connectivity_state out;
- out = grpc_connectivity_state_check(&chand->state_tracker);
gpr_mu_lock(&chand->mu_state);
- if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
- grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
- GRPC_CHANNEL_CONNECTING,
- "uchannel_connecting_changed");
- chand->subchannel_connectivity = out;
- grpc_subchannel_notify_on_state_change(exec_ctx, chand->subchannel,
- &chand->subchannel_connectivity,
- &chand->connectivity_cb);
- }
+ out = grpc_connectivity_state_check(&chand->state_tracker);
gpr_mu_unlock(&chand->mu_state);
return out;
}
void grpc_client_uchannel_watch_connectivity_state(
- grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
+ grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset,
grpc_connectivity_state *state, grpc_closure *on_complete) {
channel_data *chand = elem->channel_data;
gpr_mu_lock(&chand->mu_state);
@@ -216,40 +209,11 @@ void grpc_client_uchannel_watch_connectivity_state(
gpr_mu_unlock(&chand->mu_state);
}
-grpc_pollset_set *grpc_client_uchannel_get_connecting_pollset_set(
- grpc_channel_element *elem) {
- channel_data *chand = elem->channel_data;
- grpc_channel_element *parent_elem;
- gpr_mu_lock(&chand->mu_state);
- parent_elem = grpc_channel_stack_last_element(grpc_channel_get_channel_stack(
- grpc_subchannel_get_master(chand->subchannel)));
- gpr_mu_unlock(&chand->mu_state);
- return grpc_client_channel_get_connecting_pollset_set(parent_elem);
-}
-
-void grpc_client_uchannel_add_interested_party(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem,
- grpc_pollset *pollset) {
- grpc_pollset_set *master_pollset_set =
- grpc_client_uchannel_get_connecting_pollset_set(elem);
- grpc_pollset_set_add_pollset(exec_ctx, master_pollset_set, pollset);
-}
-
-void grpc_client_uchannel_del_interested_party(grpc_exec_ctx *exec_ctx,
- grpc_channel_element *elem,
- grpc_pollset *pollset) {
- grpc_pollset_set *master_pollset_set =
- grpc_client_uchannel_get_connecting_pollset_set(elem);
- grpc_pollset_set_del_pollset(exec_ctx, master_pollset_set, pollset);
-}
-
grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel,
grpc_channel_args *args) {
grpc_channel *channel = NULL;
#define MAX_FILTERS 3
const grpc_channel_filter *filters[MAX_FILTERS];
- grpc_channel *master = grpc_subchannel_get_master(subchannel);
- char *target = grpc_channel_get_target(master);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
size_t n = 0;
@@ -261,19 +225,19 @@ grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel,
GPR_ASSERT(n <= MAX_FILTERS);
channel =
- grpc_channel_create_from_filters(&exec_ctx, target, filters, n, args, 1);
+ grpc_channel_create_from_filters(&exec_ctx, NULL, filters, n, args, 1);
- gpr_free(target);
return channel;
}
-void grpc_client_uchannel_set_subchannel(grpc_channel *uchannel,
- grpc_subchannel *subchannel) {
+void grpc_client_uchannel_set_connected_subchannel(
+ grpc_channel *uchannel, grpc_connected_subchannel *connected_subchannel) {
grpc_channel_element *elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(uchannel));
channel_data *chand = elem->channel_data;
GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter);
gpr_mu_lock(&chand->mu_state);
- chand->subchannel = subchannel;
+ chand->connected_subchannel = connected_subchannel;
+ GRPC_CONNECTED_SUBCHANNEL_REF(connected_subchannel, "uchannel");
gpr_mu_unlock(&chand->mu_state);
}