aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-02-09 13:02:20 -0800
committerGravatar Craig Tiller <ctiller@google.com>2017-02-09 13:02:20 -0800
commitd2e5cfc5bbe844ece808a843314458b798368748 (patch)
tree6aa22e4609db7c41c664e182802a2a3fb75663a2 /src
parentd85477515230c5161659175cbc60b684109aedbf (diff)
Start fixing refcounting
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/client_channel/client_channel.c48
1 files changed, 19 insertions, 29 deletions
diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c
index a7dd967a51..282913431f 100644
--- a/src/core/ext/client_channel/client_channel.c
+++ b/src/core/ext/client_channel/client_channel.c
@@ -469,6 +469,9 @@ static void cc_start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg,
}
GRPC_ERROR_UNREF(op->disconnect_with_error);
}
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "start_transport_op");
+
+ grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
}
static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
@@ -476,8 +479,6 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_transport_op *op) {
channel_data *chand = elem->channel_data;
- grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE);
-
GPR_ASSERT(op->set_accept_stream == false);
if (op->bind_pollset != NULL) {
grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties,
@@ -485,6 +486,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
}
op->transport_private.args[0] = elem;
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op");
grpc_closure_sched(
exec_ctx,
grpc_closure_init(&op->transport_private.closure,
@@ -670,44 +672,24 @@ static void fail_locked(grpc_exec_ctx *exec_ctx, call_data *calld,
GRPC_ERROR_UNREF(error);
}
-typedef struct {
- grpc_transport_stream_op **ops;
- size_t nops;
- grpc_subchannel_call *call;
-} retry_ops_args;
-
-static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) {
- retry_ops_args *a = args;
- size_t i;
- for (i = 0; i < a->nops; i++) {
- grpc_subchannel_call_process_op(exec_ctx, a->call, a->ops[i]);
- }
- GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, a->call, "retry_ops");
- gpr_free(a->ops);
- gpr_free(a);
-}
-
static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) {
if (calld->waiting_ops_count == 0) {
return;
}
- retry_ops_args *a = gpr_malloc(sizeof(*a));
- a->ops = calld->waiting_ops;
- a->nops = calld->waiting_ops_count;
- a->call = GET_CALL(calld);
- if (a->call == CANCELLED_CALL) {
- gpr_free(a);
+ grpc_subchannel_call *call = GET_CALL(calld);
+ grpc_transport_stream_op **ops = calld->waiting_ops;
+ size_t nops = calld->waiting_ops_count;
+ if (call == CANCELLED_CALL) {
fail_locked(exec_ctx, calld, GRPC_ERROR_CANCELLED);
return;
}
calld->waiting_ops = NULL;
calld->waiting_ops_count = 0;
calld->waiting_ops_capacity = 0;
- GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops");
- grpc_closure_sched(
- exec_ctx, grpc_closure_create(retry_ops, a, grpc_schedule_on_exec_ctx),
- GRPC_ERROR_NONE);
+ for (size_t i = 0; i < nops; i++) {
+ grpc_subchannel_call_process_op(exec_ctx, call, ops[i]);
+ }
}
static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg,
@@ -978,6 +960,8 @@ retry:
}
/* nothing to be done but wait */
add_waiting_locked(calld, op);
+ GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
+ "start_transport_stream_op");
GPR_TIMER_END("cc_start_transport_stream_op", 0);
}
@@ -1008,6 +992,7 @@ static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
return;
}
/* we failed; lock and figure out what to do */
+ GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op");
op->transport_private.args[0] = elem;
grpc_closure_sched(
exec_ctx,
@@ -1109,6 +1094,8 @@ static void initial_read_service_config_locked(grpc_exec_ctx *exec_ctx,
// do not yet have service config data, then the timer may be reset
// later.
grpc_deadline_state_start(exec_ctx, elem, calld->deadline);
+ GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call,
+ "initial_read_service_config");
}
/* Constructor for call_data */
@@ -1132,6 +1119,7 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx,
calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
calld->owning_call = args->call_stack;
calld->pollent = NULL;
+ GRPC_CALL_STACK_REF(calld->owning_call, "initial_read_service_config");
grpc_closure_sched(
exec_ctx,
grpc_closure_init(&calld->read_service_config,
@@ -1204,6 +1192,7 @@ static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg,
&chand->on_resolver_result_changed);
}
}
+ GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "try_to_connect");
}
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
@@ -1212,6 +1201,7 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state(
grpc_connectivity_state out;
out = grpc_connectivity_state_check(&chand->state_tracker);
if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
+ GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect");
grpc_closure_sched(
exec_ctx,
grpc_closure_create(try_to_connect_locked, chand,