diff options
Diffstat (limited to 'src/core/channel/client_uchannel.c')
-rw-r--r-- | src/core/channel/client_uchannel.c | 82 |
1 files changed, 23 insertions, 59 deletions
diff --git a/src/core/channel/client_uchannel.c b/src/core/channel/client_uchannel.c index 456ffb7371..2c0b07d8bf 100644 --- a/src/core/channel/client_uchannel.c +++ b/src/core/channel/client_uchannel.c @@ -58,13 +58,13 @@ typedef struct client_uchannel_channel_data { this channel_data via its channel stack. We occasionally use this to bump the refcount on the master channel to keep ourselves alive through an asynchronous operation. */ - grpc_channel *master; + grpc_channel_stack *owning_stack; /** connectivity state being tracked */ grpc_connectivity_state_tracker state_tracker; /** the subchannel wrapped by the microchannel */ - grpc_subchannel *subchannel; + grpc_connected_subchannel *connected_subchannel; /** the callback used to stay subscribed to subchannel connectivity * notifications */ @@ -84,15 +84,13 @@ static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg, grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, chand->subchannel_connectivity, "uchannel_monitor_subchannel"); - grpc_subchannel_notify_on_state_change(exec_ctx, chand->subchannel, - &chand->subchannel_connectivity, - &chand->connectivity_cb); + grpc_connected_subchannel_notify_on_state_change( + exec_ctx, chand->connected_subchannel, NULL, + &chand->subchannel_connectivity, &chand->connectivity_cb); } static char *cuc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { - channel_data *chand = elem->channel_data; - return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data, - chand->master); + return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data); } static void cuc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, @@ -128,11 +126,11 @@ static void cuc_start_transport_op(grpc_exec_ctx *exec_ctx, static int cuc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg, grpc_metadata_batch *initial_metadata, - grpc_subchannel **subchannel, + grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready) { channel_data *chand = arg; GPR_ASSERT(initial_metadata != NULL); - *subchannel = chand->subchannel; + *connected_subchannel = chand->connected_subchannel; return 1; } @@ -140,7 +138,7 @@ static int cuc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg, static void cuc_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_call_element_args *args) { grpc_subchannel_call_holder_init(elem->call_data, cuc_pick_subchannel, - elem->channel_data); + elem->channel_data, args->call_stack); } /* Destructor for call_data */ @@ -158,7 +156,7 @@ static void cuc_init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_closure_init(&chand->connectivity_cb, monitor_subchannel, chand); GPR_ASSERT(args->is_last); GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter); - chand->master = args->master; + chand->owning_stack = args->channel_stack; grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_uchannel"); gpr_mu_init(&chand->mu_state); @@ -168,10 +166,14 @@ static void cuc_init_channel_elem(grpc_exec_ctx *exec_ctx, static void cuc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) { channel_data *chand = elem->channel_data; - grpc_subchannel_state_change_unsubscribe(exec_ctx, chand->subchannel, - &chand->connectivity_cb); + /* cancel subscription */ + grpc_connected_subchannel_notify_on_state_change( + exec_ctx, chand->connected_subchannel, NULL, NULL, + &chand->connectivity_cb); grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); gpr_mu_destroy(&chand->mu_state); + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, chand->connected_subchannel, + "uchannel"); } static void cuc_set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, @@ -191,23 +193,14 @@ grpc_connectivity_state grpc_client_uchannel_check_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) { channel_data *chand = elem->channel_data; grpc_connectivity_state out; - out = grpc_connectivity_state_check(&chand->state_tracker); gpr_mu_lock(&chand->mu_state); - if (out == GRPC_CHANNEL_IDLE && try_to_connect) { - grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, - GRPC_CHANNEL_CONNECTING, - "uchannel_connecting_changed"); - chand->subchannel_connectivity = out; - grpc_subchannel_notify_on_state_change(exec_ctx, chand->subchannel, - &chand->subchannel_connectivity, - &chand->connectivity_cb); - } + out = grpc_connectivity_state_check(&chand->state_tracker); gpr_mu_unlock(&chand->mu_state); return out; } void grpc_client_uchannel_watch_connectivity_state( - grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, grpc_connectivity_state *state, grpc_closure *on_complete) { channel_data *chand = elem->channel_data; gpr_mu_lock(&chand->mu_state); @@ -216,40 +209,11 @@ void grpc_client_uchannel_watch_connectivity_state( gpr_mu_unlock(&chand->mu_state); } -grpc_pollset_set *grpc_client_uchannel_get_connecting_pollset_set( - grpc_channel_element *elem) { - channel_data *chand = elem->channel_data; - grpc_channel_element *parent_elem; - gpr_mu_lock(&chand->mu_state); - parent_elem = grpc_channel_stack_last_element(grpc_channel_get_channel_stack( - grpc_subchannel_get_master(chand->subchannel))); - gpr_mu_unlock(&chand->mu_state); - return grpc_client_channel_get_connecting_pollset_set(parent_elem); -} - -void grpc_client_uchannel_add_interested_party(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_pollset *pollset) { - grpc_pollset_set *master_pollset_set = - grpc_client_uchannel_get_connecting_pollset_set(elem); - grpc_pollset_set_add_pollset(exec_ctx, master_pollset_set, pollset); -} - -void grpc_client_uchannel_del_interested_party(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_pollset *pollset) { - grpc_pollset_set *master_pollset_set = - grpc_client_uchannel_get_connecting_pollset_set(elem); - grpc_pollset_set_del_pollset(exec_ctx, master_pollset_set, pollset); -} - grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel, grpc_channel_args *args) { grpc_channel *channel = NULL; #define MAX_FILTERS 3 const grpc_channel_filter *filters[MAX_FILTERS]; - grpc_channel *master = grpc_subchannel_get_master(subchannel); - char *target = grpc_channel_get_target(master); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; size_t n = 0; @@ -261,19 +225,19 @@ grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel, GPR_ASSERT(n <= MAX_FILTERS); channel = - grpc_channel_create_from_filters(&exec_ctx, target, filters, n, args, 1); + grpc_channel_create_from_filters(&exec_ctx, NULL, filters, n, args, 1); - gpr_free(target); return channel; } -void grpc_client_uchannel_set_subchannel(grpc_channel *uchannel, - grpc_subchannel *subchannel) { +void grpc_client_uchannel_set_connected_subchannel( + grpc_channel *uchannel, grpc_connected_subchannel *connected_subchannel) { grpc_channel_element *elem = grpc_channel_stack_last_element(grpc_channel_get_channel_stack(uchannel)); channel_data *chand = elem->channel_data; GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter); gpr_mu_lock(&chand->mu_state); - chand->subchannel = subchannel; + chand->connected_subchannel = connected_subchannel; + GRPC_CONNECTED_SUBCHANNEL_REF(connected_subchannel, "uchannel"); gpr_mu_unlock(&chand->mu_state); } |