aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/client_config/subchannel_call_holder.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/client_config/subchannel_call_holder.c')
-rw-r--r--src/core/ext/client_config/subchannel_call_holder.c52
1 files changed, 32 insertions, 20 deletions
diff --git a/src/core/ext/client_config/subchannel_call_holder.c b/src/core/ext/client_config/subchannel_call_holder.c
index 9918fbdcb4..b96a0ad093 100644
--- a/src/core/ext/client_config/subchannel_call_holder.c
+++ b/src/core/ext/client_config/subchannel_call_holder.c
@@ -43,14 +43,14 @@
#define CANCELLED_CALL ((grpc_subchannel_call *)1)
static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *holder,
- bool success);
+ grpc_error *error);
static void retry_ops(grpc_exec_ctx *exec_ctx, void *retry_ops_args,
- bool success);
+ grpc_error *error);
static void add_waiting_locked(grpc_subchannel_call_holder *holder,
grpc_transport_stream_op *op);
static void fail_locked(grpc_exec_ctx *exec_ctx,
- grpc_subchannel_call_holder *holder);
+ grpc_subchannel_call_holder *holder, grpc_error *error);
static void retry_waiting_locked(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call_holder *holder);
@@ -68,6 +68,7 @@ void grpc_subchannel_call_holder_init(
holder->waiting_ops_capacity = 0;
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
holder->owning_call = owning_call;
+ holder->pollent = NULL;
}
void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx,
@@ -90,7 +91,8 @@ void grpc_subchannel_call_holder_perform_op(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call *call = GET_CALL(holder);
GPR_TIMER_BEGIN("grpc_subchannel_call_holder_perform_op", 0);
if (call == CANCELLED_CALL) {
- grpc_transport_stream_op_finish_with_failure(exec_ctx, op);
+ grpc_transport_stream_op_finish_with_failure(exec_ctx, op,
+ GRPC_ERROR_CANCELLED);
GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
return;
}
@@ -106,7 +108,8 @@ retry:
call = GET_CALL(holder);
if (call == CANCELLED_CALL) {
gpr_mu_unlock(&holder->mu);
- grpc_transport_stream_op_finish_with_failure(exec_ctx, op);
+ grpc_transport_stream_op_finish_with_failure(exec_ctx, op,
+ GRPC_ERROR_CANCELLED);
GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
return;
}
@@ -117,13 +120,13 @@ retry:
return;
}
/* if this is a cancellation, then we can raise our cancelled flag */
- if (op->cancel_with_status != GRPC_STATUS_OK) {
+ if (op->cancel_error != GRPC_ERROR_NONE) {
if (!gpr_atm_rel_cas(&holder->subchannel_call, 0, 1)) {
goto retry;
} else {
switch (holder->creation_phase) {
case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING:
- fail_locked(exec_ctx, holder);
+ fail_locked(exec_ctx, holder, GRPC_ERROR_REF(op->cancel_error));
break;
case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg, NULL,
@@ -131,7 +134,8 @@ retry:
break;
}
gpr_mu_unlock(&holder->mu);
- grpc_transport_stream_op_finish_with_failure(exec_ctx, op);
+ grpc_transport_stream_op_finish_with_failure(exec_ctx, op,
+ GRPC_ERROR_CANCELLED);
GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
return;
}
@@ -157,7 +161,7 @@ retry:
gpr_atm_rel_store(
&holder->subchannel_call,
(gpr_atm)(uintptr_t)grpc_connected_subchannel_create_call(
- exec_ctx, holder->connected_subchannel, holder->pollset));
+ exec_ctx, holder->connected_subchannel, holder->pollent));
retry_waiting_locked(exec_ctx, holder);
goto retry;
}
@@ -167,22 +171,28 @@ retry:
GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
}
-static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
grpc_subchannel_call_holder *holder = arg;
gpr_mu_lock(&holder->mu);
GPR_ASSERT(holder->creation_phase ==
GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL);
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
if (holder->connected_subchannel == NULL) {
- fail_locked(exec_ctx, holder);
+ gpr_atm_no_barrier_store(&holder->subchannel_call, 1);
+ fail_locked(exec_ctx, holder,
+ GRPC_ERROR_CREATE_REFERENCING("Failed to create subchannel",
+ &error, 1));
} else if (1 == gpr_atm_acq_load(&holder->subchannel_call)) {
/* already cancelled before subchannel became ready */
- fail_locked(exec_ctx, holder);
+ fail_locked(exec_ctx, holder,
+ GRPC_ERROR_CREATE_REFERENCING(
+ "Cancelled before creating subchannel", &error, 1));
} else {
gpr_atm_rel_store(
&holder->subchannel_call,
(gpr_atm)(uintptr_t)grpc_connected_subchannel_create_call(
- exec_ctx, holder->connected_subchannel, holder->pollset));
+ exec_ctx, holder->connected_subchannel, holder->pollent));
retry_waiting_locked(exec_ctx, holder);
}
gpr_mu_unlock(&holder->mu);
@@ -203,18 +213,18 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx,
a->call = GET_CALL(holder);
if (a->call == CANCELLED_CALL) {
gpr_free(a);
- fail_locked(exec_ctx, holder);
+ fail_locked(exec_ctx, holder, GRPC_ERROR_CANCELLED);
return;
}
holder->waiting_ops = NULL;
holder->waiting_ops_count = 0;
holder->waiting_ops_capacity = 0;
GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops");
- grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(retry_ops, a), true,
- NULL);
+ grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(retry_ops, a),
+ GRPC_ERROR_NONE, NULL);
}
-static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, bool success) {
+static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) {
retry_ops_args *a = args;
size_t i;
for (i = 0; i < a->nops; i++) {
@@ -239,13 +249,15 @@ static void add_waiting_locked(grpc_subchannel_call_holder *holder,
}
static void fail_locked(grpc_exec_ctx *exec_ctx,
- grpc_subchannel_call_holder *holder) {
+ grpc_subchannel_call_holder *holder,
+ grpc_error *error) {
size_t i;
for (i = 0; i < holder->waiting_ops_count; i++) {
- grpc_transport_stream_op_finish_with_failure(exec_ctx,
- &holder->waiting_ops[i]);
+ grpc_transport_stream_op_finish_with_failure(
+ exec_ctx, &holder->waiting_ops[i], GRPC_ERROR_REF(error));
}
holder->waiting_ops_count = 0;
+ GRPC_ERROR_UNREF(error);
}
char *grpc_subchannel_call_holder_get_peer(