diff options
author | Craig Tiller <ctiller@google.com> | 2017-02-09 13:02:20 -0800 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-02-09 13:02:20 -0800 |
commit | d2e5cfc5bbe844ece808a843314458b798368748 (patch) | |
tree | 6aa22e4609db7c41c664e182802a2a3fb75663a2 /src | |
parent | d85477515230c5161659175cbc60b684109aedbf (diff) |
Start fixing refcounting
Diffstat (limited to 'src')
-rw-r--r-- | src/core/ext/client_channel/client_channel.c | 48 |
1 files changed, 19 insertions, 29 deletions
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index a7dd967a51..282913431f 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -469,6 +469,9 @@ static void cc_start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg, } GRPC_ERROR_UNREF(op->disconnect_with_error); } + GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "start_transport_op"); + + grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); } static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, @@ -476,8 +479,6 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport_op *op) { channel_data *chand = elem->channel_data; - grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); - GPR_ASSERT(op->set_accept_stream == false); if (op->bind_pollset != NULL) { grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, @@ -485,6 +486,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, } op->transport_private.args[0] = elem; + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op"); grpc_closure_sched( exec_ctx, grpc_closure_init(&op->transport_private.closure, @@ -670,44 +672,24 @@ static void fail_locked(grpc_exec_ctx *exec_ctx, call_data *calld, GRPC_ERROR_UNREF(error); } -typedef struct { - grpc_transport_stream_op **ops; - size_t nops; - grpc_subchannel_call *call; -} retry_ops_args; - -static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) { - retry_ops_args *a = args; - size_t i; - for (i = 0; i < a->nops; i++) { - grpc_subchannel_call_process_op(exec_ctx, a->call, a->ops[i]); - } - GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, a->call, "retry_ops"); - gpr_free(a->ops); - gpr_free(a); -} - static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) { if (calld->waiting_ops_count == 0) { return; } - retry_ops_args *a = gpr_malloc(sizeof(*a)); - a->ops = calld->waiting_ops; - a->nops = calld->waiting_ops_count; - a->call = GET_CALL(calld); - if (a->call == CANCELLED_CALL) { - gpr_free(a); + grpc_subchannel_call *call = GET_CALL(calld); + grpc_transport_stream_op **ops = calld->waiting_ops; + size_t nops = calld->waiting_ops_count; + if (call == CANCELLED_CALL) { fail_locked(exec_ctx, calld, GRPC_ERROR_CANCELLED); return; } calld->waiting_ops = NULL; calld->waiting_ops_count = 0; calld->waiting_ops_capacity = 0; - GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops"); - grpc_closure_sched( - exec_ctx, grpc_closure_create(retry_ops, a, grpc_schedule_on_exec_ctx), - GRPC_ERROR_NONE); + for (size_t i = 0; i < nops; i++) { + grpc_subchannel_call_process_op(exec_ctx, call, ops[i]); + } } static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, @@ -978,6 +960,8 @@ retry: } /* nothing to be done but wait */ add_waiting_locked(calld, op); + GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, + "start_transport_stream_op"); GPR_TIMER_END("cc_start_transport_stream_op", 0); } @@ -1008,6 +992,7 @@ static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, return; } /* we failed; lock and figure out what to do */ + GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op"); op->transport_private.args[0] = elem; grpc_closure_sched( exec_ctx, @@ -1109,6 +1094,8 @@ static void initial_read_service_config_locked(grpc_exec_ctx *exec_ctx, // do not yet have service config data, then the timer may be reset // later. grpc_deadline_state_start(exec_ctx, elem, calld->deadline); + GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, + "initial_read_service_config"); } /* Constructor for call_data */ @@ -1132,6 +1119,7 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; calld->owning_call = args->call_stack; calld->pollent = NULL; + GRPC_CALL_STACK_REF(calld->owning_call, "initial_read_service_config"); grpc_closure_sched( exec_ctx, grpc_closure_init(&calld->read_service_config, @@ -1204,6 +1192,7 @@ static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg, &chand->on_resolver_result_changed); } } + GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "try_to_connect"); } grpc_connectivity_state grpc_client_channel_check_connectivity_state( @@ -1212,6 +1201,7 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state( grpc_connectivity_state out; out = grpc_connectivity_state_check(&chand->state_tracker); if (out == GRPC_CHANNEL_IDLE && try_to_connect) { + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect"); grpc_closure_sched( exec_ctx, grpc_closure_create(try_to_connect_locked, chand, |