diff options
author | 2015-09-18 17:29:00 -0700 | |
---|---|---|
committer | 2015-09-18 17:29:00 -0700 | |
commit | d1bec03fa148344b8eac2b59517252d86e4ca858 (patch) | |
tree | f359e48f9151ab7ceff72cd624ad6c7a59e4d304 /src/core/channel/client_channel.c | |
parent | 33825118df7157219cec15382beb006d3462ad96 (diff) |
Call list progress
Diffstat (limited to 'src/core/channel/client_channel.c')
-rw-r--r-- | src/core/channel/client_channel.c | 180 |
1 files changed, 90 insertions, 90 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 588b9c36ee..6618336e93 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -128,12 +128,13 @@ static grpc_closure *merge_into_waiting_op(grpc_call_element *elem, GRPC_MUST_USE_RESULT; static void handle_op_after_cancellation(grpc_call_element *elem, - grpc_transport_stream_op *op) { + grpc_transport_stream_op *op, + grpc_call_list *call_list) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; if (op->send_ops) { grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops); - op->on_done_send->cb(op->on_done_send->cb_arg, 0); + op->on_done_send->cb(op->on_done_send->cb_arg, 0, call_list); } if (op->recv_ops) { char status[GPR_LTOA_MIN_BUFSIZE]; @@ -152,10 +153,10 @@ static void handle_op_after_cancellation(grpc_call_element *elem, mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME); grpc_sopb_add_metadata(op->recv_ops, mdb); *op->recv_state = GRPC_STREAM_CLOSED; - op->on_done_recv->cb(op->on_done_recv->cb_arg, 1); + op->on_done_recv->cb(op->on_done_recv->cb_arg, 1, call_list); } if (op->on_consumed) { - op->on_consumed->cb(op->on_consumed->cb_arg, 0); + op->on_consumed->cb(op->on_consumed->cb_arg, 0, call_list); } } @@ -166,12 +167,14 @@ typedef struct { static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_stream_op *op, - int continuation); + int continuation, + grpc_call_list *call_list); -static void continue_with_pick(void *arg, int iomgr_success) { +static void continue_with_pick(void *arg, int iomgr_success, + grpc_call_list *call_list) { waiting_call *wc = arg; call_data *calld = wc->elem->call_data; - perform_transport_stream_op(wc->elem, &calld->waiting_op, 1); + perform_transport_stream_op(wc->elem, &calld->waiting_op, 1, call_list); gpr_free(wc); } @@ -193,7 +196,8 @@ static int is_empty(void *p, int len) { return 1; } -static void started_call(void *arg, int iomgr_success) { +static void started_call(void *arg, int iomgr_success, + grpc_call_list *call_list) { call_data *calld = arg; grpc_transport_stream_op op; int have_waiting; @@ -203,7 +207,7 @@ static void started_call(void *arg, int iomgr_success) { memset(&op, 0, sizeof(op)); op.cancel_with_status = GRPC_STATUS_CANCELLED; gpr_mu_unlock(&calld->mu_state); - grpc_subchannel_call_process_op(calld->subchannel_call, &op); + grpc_subchannel_call_process_op(calld->subchannel_call, &op, call_list); } else if (calld->state == CALL_WAITING_FOR_CALL) { have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op)); if (calld->subchannel_call != NULL) { @@ -211,13 +215,14 @@ static void started_call(void *arg, int iomgr_success) { gpr_mu_unlock(&calld->mu_state); if (have_waiting) { grpc_subchannel_call_process_op(calld->subchannel_call, - &calld->waiting_op); + &calld->waiting_op, call_list); } } else { calld->state = CALL_CANCELLED; gpr_mu_unlock(&calld->mu_state); if (have_waiting) { - handle_op_after_cancellation(calld->elem, &calld->waiting_op); + handle_op_after_cancellation(calld->elem, &calld->waiting_op, + call_list); } } } else { @@ -226,20 +231,20 @@ static void started_call(void *arg, int iomgr_success) { } } -static void picked_target(void *arg, int iomgr_success) { +static void picked_target(void *arg, int iomgr_success, + grpc_call_list *call_list) { call_data *calld = arg; grpc_pollset *pollset; - grpc_call_list call_list = GRPC_CALL_LIST_INIT; if (calld->picked_channel == NULL) { /* treat this like a cancellation */ calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE; - perform_transport_stream_op(calld->elem, &calld->waiting_op, 1); + perform_transport_stream_op(calld->elem, &calld->waiting_op, 1, call_list); } else { gpr_mu_lock(&calld->mu_state); if (calld->state == CALL_CANCELLED) { gpr_mu_unlock(&calld->mu_state); - handle_op_after_cancellation(calld->elem, &calld->waiting_op); + handle_op_after_cancellation(calld->elem, &calld->waiting_op, call_list); } else { GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK); calld->state = CALL_WAITING_FOR_CALL; @@ -248,10 +253,9 @@ static void picked_target(void *arg, int iomgr_success) { grpc_closure_init(&calld->async_setup_task, started_call, calld); grpc_subchannel_create_call(calld->picked_channel, pollset, &calld->subchannel_call, - &calld->async_setup_task, &call_list); + &calld->async_setup_task, call_list); } } - grpc_call_list_run(call_list); } static grpc_closure *merge_into_waiting_op(grpc_call_element *elem, @@ -283,7 +287,7 @@ static grpc_closure *merge_into_waiting_op(grpc_call_element *elem, return consumed_op; } -static char *cc_get_peer(grpc_call_element *elem) { +static char *cc_get_peer(grpc_call_element *elem, grpc_call_list *call_list) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_subchannel_call *subchannel_call; @@ -294,8 +298,8 @@ static char *cc_get_peer(grpc_call_element *elem) { subchannel_call = calld->subchannel_call; GRPC_SUBCHANNEL_CALL_REF(subchannel_call, "get_peer"); gpr_mu_unlock(&calld->mu_state); - result = grpc_subchannel_call_get_peer(subchannel_call); - GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "get_peer"); + result = grpc_subchannel_call_get_peer(subchannel_call, call_list); + GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "get_peer", call_list); return result; } else { gpr_mu_unlock(&calld->mu_state); @@ -305,13 +309,13 @@ static char *cc_get_peer(grpc_call_element *elem) { static void perform_transport_stream_op(grpc_call_element *elem, grpc_transport_stream_op *op, - int continuation) { + int continuation, + grpc_call_list *call_list) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_subchannel_call *subchannel_call; grpc_lb_policy *lb_policy; grpc_transport_stream_op op2; - grpc_call_list call_list = GRPC_CALL_LIST_INIT; GPR_ASSERT(elem->filter == &grpc_client_channel_filter); GRPC_CALL_LOG_OP(GPR_INFO, elem, op); @@ -321,15 +325,15 @@ static void perform_transport_stream_op(grpc_call_element *elem, GPR_ASSERT(!continuation); subchannel_call = calld->subchannel_call; gpr_mu_unlock(&calld->mu_state); - grpc_subchannel_call_process_op(subchannel_call, op); + grpc_subchannel_call_process_op(subchannel_call, op, call_list); break; case CALL_CANCELLED: gpr_mu_unlock(&calld->mu_state); - handle_op_after_cancellation(elem, op); + handle_op_after_cancellation(elem, op, call_list); break; case CALL_WAITING_FOR_SEND: GPR_ASSERT(!continuation); - grpc_call_list_add(&call_list, merge_into_waiting_op(elem, op), 1); + grpc_call_list_add(call_list, merge_into_waiting_op(elem, op), 1); if (!calld->waiting_op.send_ops && calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) { gpr_mu_unlock(&calld->mu_state); @@ -355,10 +359,10 @@ static void perform_transport_stream_op(grpc_call_element *elem, op2.on_consumed = NULL; } gpr_mu_unlock(&calld->mu_state); - handle_op_after_cancellation(elem, op); - handle_op_after_cancellation(elem, &op2); + handle_op_after_cancellation(elem, op, call_list); + handle_op_after_cancellation(elem, &op2, call_list); } else { - grpc_call_list_add(&call_list, merge_into_waiting_op(elem, op), 1); + grpc_call_list_add(call_list, merge_into_waiting_op(elem, op), 1); gpr_mu_unlock(&calld->mu_state); } break; @@ -368,7 +372,7 @@ static void perform_transport_stream_op(grpc_call_element *elem, if (op->cancel_with_status != GRPC_STATUS_OK) { calld->state = CALL_CANCELLED; gpr_mu_unlock(&calld->mu_state); - handle_op_after_cancellation(elem, op); + handle_op_after_cancellation(elem, op, call_list); } else { calld->waiting_op = *op; @@ -398,9 +402,9 @@ static void perform_transport_stream_op(grpc_call_element *elem, grpc_closure_init(&calld->async_setup_task, picked_target, calld); grpc_lb_policy_pick(lb_policy, bind_pollset, initial_metadata, &calld->picked_channel, - &calld->async_setup_task, &call_list); + &calld->async_setup_task, call_list); - GRPC_LB_POLICY_UNREF(lb_policy, "pick"); + GRPC_LB_POLICY_UNREF(lb_policy, "pick", call_list); } else if (chand->resolver != NULL) { calld->state = CALL_WAITING_FOR_CONFIG; add_to_lb_policy_wait_queue_locked_state_config(elem); @@ -409,7 +413,7 @@ static void perform_transport_stream_op(grpc_call_element *elem, chand->started_resolving = 1; grpc_resolver_next(chand->resolver, &chand->incoming_configuration, - &chand->on_config_changed); + &chand->on_config_changed, call_list); } gpr_mu_unlock(&chand->mu_config); gpr_mu_unlock(&calld->mu_state); @@ -417,19 +421,18 @@ static void perform_transport_stream_op(grpc_call_element *elem, calld->state = CALL_CANCELLED; gpr_mu_unlock(&chand->mu_config); gpr_mu_unlock(&calld->mu_state); - handle_op_after_cancellation(elem, op); + handle_op_after_cancellation(elem, op, call_list); } } } break; } - - grpc_call_list_run(call_list); } static void cc_start_transport_stream_op(grpc_call_element *elem, - grpc_transport_stream_op *op) { - perform_transport_stream_op(elem, op, 0); + grpc_transport_stream_op *op, + grpc_call_list *call_list) { + perform_transport_stream_op(elem, op, 0, call_list); } static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, @@ -448,16 +451,14 @@ static void on_lb_policy_state_changed_locked(lb_policy_connectivity_watcher *w, } } -static void on_lb_policy_state_changed(void *arg, int iomgr_success) { +static void on_lb_policy_state_changed(void *arg, int iomgr_success, + grpc_call_list *call_list) { lb_policy_connectivity_watcher *w = arg; - grpc_call_list cl = GRPC_CALL_LIST_INIT; gpr_mu_lock(&w->chand->mu_config); - on_lb_policy_state_changed_locked(w, &cl); + on_lb_policy_state_changed_locked(w, call_list); gpr_mu_unlock(&w->chand->mu_config); - grpc_call_list_run(cl); - GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy"); gpr_free(w); } @@ -476,13 +477,13 @@ static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, call_list); } -static void cc_on_config_changed(void *arg, int iomgr_success) { +static void cc_on_config_changed(void *arg, int iomgr_success, + grpc_call_list *call_list) { channel_data *chand = arg; grpc_lb_policy *lb_policy = NULL; grpc_lb_policy *old_lb_policy; grpc_resolver *old_resolver; grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; - grpc_call_list cl = GRPC_CALL_LIST_INIT; int exit_idle = 0; if (chand->incoming_configuration != NULL) { @@ -490,10 +491,10 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { if (lb_policy != NULL) { GRPC_LB_POLICY_REF(lb_policy, "channel"); GRPC_LB_POLICY_REF(lb_policy, "config_change"); - state = grpc_lb_policy_check_connectivity(lb_policy, &cl); + state = grpc_lb_policy_check_connectivity(lb_policy, call_list); } - grpc_client_config_unref(chand->incoming_configuration); + grpc_client_config_unref(chand->incoming_configuration, call_list); } chand->incoming_configuration = NULL; @@ -502,7 +503,7 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { old_lb_policy = chand->lb_policy; chand->lb_policy = lb_policy; if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) { - grpc_call_list_move(&chand->waiting_for_config_closures, &cl); + grpc_call_list_move(&chand->waiting_for_config_closures, call_list); } if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) { GRPC_LB_POLICY_REF(lb_policy, "exit_idle"); @@ -514,57 +515,53 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { grpc_resolver *resolver = chand->resolver; GRPC_RESOLVER_REF(resolver, "channel-next"); grpc_connectivity_state_set(&chand->state_tracker, state, "new_lb+resolver", - &cl); + call_list); if (lb_policy != NULL) { - watch_lb_policy(chand, lb_policy, state, &cl); + watch_lb_policy(chand, lb_policy, state, call_list); } gpr_mu_unlock(&chand->mu_config); GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); grpc_resolver_next(resolver, &chand->incoming_configuration, - &chand->on_config_changed); - GRPC_RESOLVER_UNREF(resolver, "channel-next"); + &chand->on_config_changed, call_list); + GRPC_RESOLVER_UNREF(resolver, "channel-next", call_list); } else { old_resolver = chand->resolver; chand->resolver = NULL; grpc_connectivity_state_set(&chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone", - &cl); + call_list); gpr_mu_unlock(&chand->mu_config); if (old_resolver != NULL) { - grpc_resolver_shutdown(old_resolver); - GRPC_RESOLVER_UNREF(old_resolver, "channel"); + grpc_resolver_shutdown(old_resolver, call_list); + GRPC_RESOLVER_UNREF(old_resolver, "channel", call_list); } } if (exit_idle) { - grpc_lb_policy_exit_idle(lb_policy, &cl); - GRPC_LB_POLICY_UNREF(lb_policy, "exit_idle"); + grpc_lb_policy_exit_idle(lb_policy, call_list); + GRPC_LB_POLICY_UNREF(lb_policy, "exit_idle", call_list); } if (old_lb_policy != NULL) { - grpc_lb_policy_shutdown(old_lb_policy, &cl); - GRPC_LB_POLICY_UNREF(old_lb_policy, "channel"); + grpc_lb_policy_shutdown(old_lb_policy, call_list); + GRPC_LB_POLICY_UNREF(old_lb_policy, "channel", call_list); } if (lb_policy != NULL) { - GRPC_LB_POLICY_UNREF(lb_policy, "config_change"); + GRPC_LB_POLICY_UNREF(lb_policy, "config_change", call_list); } - grpc_call_list_run(cl); GRPC_CHANNEL_INTERNAL_UNREF(chand->master, "resolver"); } static void cc_start_transport_op(grpc_channel_element *elem, - grpc_transport_op *op) { + grpc_transport_op *op, + grpc_call_list *call_list) { grpc_lb_policy *lb_policy = NULL; channel_data *chand = elem->channel_data; grpc_resolver *destroy_resolver = NULL; - grpc_call_list call_list = GRPC_CALL_LIST_INIT; - if (op->on_consumed) { - grpc_call_list_add(&call_list, op->on_consumed, 1); - op->on_consumed = NULL; - } + grpc_call_list_add(call_list, op->on_consumed, 1); GPR_ASSERT(op->set_accept_stream == NULL); GPR_ASSERT(op->bind_pollset == NULL); @@ -573,7 +570,7 @@ static void cc_start_transport_op(grpc_channel_element *elem, if (op->on_connectivity_state_change != NULL) { grpc_connectivity_state_notify_on_state_change( &chand->state_tracker, op->connectivity_state, - op->on_connectivity_state_change, &call_list); + op->on_connectivity_state_change, call_list); op->on_connectivity_state_change = NULL; op->connectivity_state = NULL; } @@ -588,28 +585,26 @@ static void cc_start_transport_op(grpc_channel_element *elem, if (op->disconnect && chand->resolver != NULL) { grpc_connectivity_state_set(&chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "disconnect", - &call_list); + call_list); destroy_resolver = chand->resolver; chand->resolver = NULL; if (chand->lb_policy != NULL) { - grpc_lb_policy_shutdown(chand->lb_policy, &call_list); - GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel"); + grpc_lb_policy_shutdown(chand->lb_policy, call_list); + GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel", call_list); chand->lb_policy = NULL; } } gpr_mu_unlock(&chand->mu_config); if (destroy_resolver) { - grpc_resolver_shutdown(destroy_resolver); - GRPC_RESOLVER_UNREF(destroy_resolver, "channel"); + grpc_resolver_shutdown(destroy_resolver, call_list); + GRPC_RESOLVER_UNREF(destroy_resolver, "channel", call_list); } if (lb_policy) { - grpc_lb_policy_broadcast(lb_policy, op, &call_list); - GRPC_LB_POLICY_UNREF(lb_policy, "broadcast"); + grpc_lb_policy_broadcast(lb_policy, op, call_list); + GRPC_LB_POLICY_UNREF(lb_policy, "broadcast", call_list); } - - grpc_call_list_run(call_list); } /* Constructor for call_data */ @@ -630,7 +625,8 @@ static void init_call_elem(grpc_call_element *elem, } /* Destructor for call_data */ -static void destroy_call_elem(grpc_call_element *elem) { +static void destroy_call_elem(grpc_call_element *elem, + grpc_call_list *call_list) { call_data *calld = elem->call_data; grpc_subchannel_call *subchannel_call; @@ -642,7 +638,7 @@ static void destroy_call_elem(grpc_call_element *elem) { case CALL_ACTIVE: subchannel_call = calld->subchannel_call; gpr_mu_unlock(&calld->mu_state); - GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "client_channel"); + GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "client_channel", call_list); break; case CALL_CREATED: case CALL_CANCELLED: @@ -662,7 +658,7 @@ static void destroy_call_elem(grpc_call_element *elem) { static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, const grpc_channel_args *args, grpc_mdctx *metadata_context, int is_first, - int is_last) { + int is_last, grpc_call_list *call_list) { channel_data *chand = elem->channel_data; memset(chand, 0, sizeof(*chand)); @@ -681,15 +677,16 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, } /* Destructor for channel_data */ -static void destroy_channel_elem(grpc_channel_element *elem) { +static void destroy_channel_elem(grpc_channel_element *elem, + grpc_call_list *call_list) { channel_data *chand = elem->channel_data; if (chand->resolver != NULL) { - grpc_resolver_shutdown(chand->resolver); - GRPC_RESOLVER_UNREF(chand->resolver, "channel"); + grpc_resolver_shutdown(chand->resolver, call_list); + GRPC_RESOLVER_UNREF(chand->resolver, "channel", call_list); } if (chand->lb_policy != NULL) { - GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel"); + GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel", call_list); } grpc_connectivity_state_destroy(&chand->state_tracker); grpc_pollset_set_destroy(&chand->pollset_set); @@ -710,7 +707,8 @@ const grpc_channel_filter grpc_client_channel_filter = { }; void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack, - grpc_resolver *resolver) { + grpc_resolver *resolver, + grpc_call_list *call_list) { /* post construction initialization: set the transport setup pointer */ grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack); channel_data *chand = elem->channel_data; @@ -723,7 +721,7 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack, chand->started_resolving = 1; GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); grpc_resolver_next(resolver, &chand->incoming_configuration, - &chand->on_config_changed); + &chand->on_config_changed, call_list); } gpr_mu_unlock(&chand->mu_config); } @@ -743,7 +741,7 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); chand->started_resolving = 1; grpc_resolver_next(chand->resolver, &chand->incoming_configuration, - &chand->on_config_changed); + &chand->on_config_changed, call_list); } } } @@ -768,13 +766,15 @@ grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set( } void grpc_client_channel_add_interested_party(grpc_channel_element *elem, - grpc_pollset *pollset) { + grpc_pollset *pollset, + grpc_call_list *call_list) { channel_data *chand = elem->channel_data; - grpc_pollset_set_add_pollset(&chand->pollset_set, pollset); + grpc_pollset_set_add_pollset(&chand->pollset_set, pollset, call_list); } void grpc_client_channel_del_interested_party(grpc_channel_element *elem, - grpc_pollset *pollset) { + grpc_pollset *pollset, + grpc_call_list *call_list) { channel_data *chand = elem->channel_data; - grpc_pollset_set_del_pollset(&chand->pollset_set, pollset); + grpc_pollset_set_del_pollset(&chand->pollset_set, pollset, call_list); } |