From 2dc32eabcbe4f5c4ef502d3b8d162ec57cfe46ae Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 31 Jan 2017 15:32:34 -0800 Subject: Fix race detected by TSAN --- src/core/lib/surface/call.c | 81 +++++++++++++++++++++------------------------ 1 file changed, 37 insertions(+), 44 deletions(-) (limited to 'src/core/lib/surface/call.c') diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 5519a1ec7e..e9c623c2b1 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -86,8 +86,11 @@ typedef enum { /* Status came from 'the wire' - or somewhere below the surface layer */ STATUS_FROM_WIRE, - /* Status was created by some internal channel stack operation */ + /* Status was created by some internal channel stack operation: must come via + add_batch_error */ STATUS_FROM_CORE, + /* Status was created by some surface error */ + STATUS_FROM_SURFACE, /* Status came from the server sending status */ STATUS_FROM_SERVER_STATUS, STATUS_SOURCE_COUNT @@ -221,11 +224,11 @@ struct grpc_call { static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, grpc_transport_stream_op *op); -static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, - grpc_status_code status, - const char *description); +static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, + status_source source, grpc_status_code status, + const char *description); static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, - grpc_error *error); + status_source source, grpc_error *error); static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack, grpc_error *error); static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, @@ -337,7 +340,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, args->server_transport_data, path, call->start_time, send_deadline, CALL_STACK_FROM_CALL(call)); if (error != GRPC_ERROR_NONE) { - cancel_with_error(exec_ctx, call, GRPC_ERROR_REF(error)); + cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error)); } if (args->cq != NULL) { GPR_ASSERT( @@ -512,7 +515,6 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c, grpc_status_code status, const char *description, void *reserved) { - grpc_call_error r; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GRPC_API_TRACE( "grpc_call_cancel_with_status(" @@ -520,16 +522,16 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c, 4, (c, (int)status, description, reserved)); GPR_ASSERT(reserved == NULL); gpr_mu_lock(&c->mu); - r = cancel_with_status(&exec_ctx, c, status, description); + cancel_with_status(&exec_ctx, c, STATUS_FROM_API_OVERRIDE, status, + description); gpr_mu_unlock(&c->mu); grpc_exec_ctx_finish(&exec_ctx); - return r; + return GRPC_CALL_OK; } typedef struct termination_closure { grpc_closure closure; grpc_call *call; - grpc_error *error; grpc_transport_stream_op op; } termination_closure; @@ -544,36 +546,27 @@ static void send_termination(grpc_exec_ctx *exec_ctx, void *tcp, grpc_error *error) { termination_closure *tc = tcp; memset(&tc->op, 0, sizeof(tc->op)); - tc->op.cancel_error = tc->error; + tc->op.cancel_error = GRPC_ERROR_REF(error); /* reuse closure to catch completion */ - grpc_closure_init(&tc->closure, done_termination, tc, - grpc_schedule_on_exec_ctx); - tc->op.on_complete = &tc->closure; + tc->op.on_complete = grpc_closure_init(&tc->closure, done_termination, tc, + grpc_schedule_on_exec_ctx); execute_op(exec_ctx, tc->call, &tc->op); } -static grpc_call_error terminate_with_status(grpc_exec_ctx *exec_ctx, - termination_closure *tc) { - set_status_from_error(exec_ctx, tc->call, STATUS_FROM_API_OVERRIDE, - GRPC_ERROR_REF(tc->error)); - grpc_closure_init(&tc->closure, send_termination, tc, - grpc_schedule_on_exec_ctx); - GRPC_CALL_INTERNAL_REF(tc->call, "termination"); - grpc_closure_sched(exec_ctx, &tc->closure, GRPC_ERROR_NONE); - return GRPC_CALL_OK; -} - -static grpc_call_error terminate_with_error(grpc_exec_ctx *exec_ctx, - grpc_call *c, grpc_error *error) { +static void terminate_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, + grpc_error *error) { termination_closure *tc = gpr_malloc(sizeof(*tc)); memset(tc, 0, sizeof(*tc)); tc->call = c; - tc->error = error; - return terminate_with_status(exec_ctx, tc); + GRPC_CALL_INTERNAL_REF(tc->call, "termination"); + grpc_closure_sched(exec_ctx, grpc_closure_init(&tc->closure, send_termination, + tc, grpc_schedule_on_exec_ctx), + error); } static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, - grpc_error *error) { + status_source source, grpc_error *error) { + set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error)); terminate_with_error(exec_ctx, c, error); } @@ -585,11 +578,11 @@ static grpc_error *error_from_status(grpc_status_code status, GRPC_ERROR_INT_GRPC_STATUS, status); } -static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, - grpc_status_code status, - const char *description) { - return terminate_with_error(exec_ctx, c, - error_from_status(status, description)); +static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, + status_source source, grpc_status_code status, + const char *description) { + cancel_with_error(exec_ctx, c, source, + error_from_status(status, description)); } /******************************************************************************* @@ -1014,11 +1007,6 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&call->mu); - if (error != GRPC_ERROR_NONE) { - set_status_from_error(exec_ctx, call, STATUS_FROM_CORE, - GRPC_ERROR_REF(error)); - } - if (bctl->send_initial_metadata) { grpc_metadata_batch_destroy( exec_ctx, @@ -1161,7 +1149,7 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_call *call = bctl->call; gpr_mu_lock(&bctl->call->mu); if (error != GRPC_ERROR_NONE) { - cancel_with_error(exec_ctx, call, GRPC_ERROR_REF(error)); + cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error)); } if (call->has_initial_md_been_received || error != GRPC_ERROR_NONE || call->receiving_stream == NULL) { @@ -1188,7 +1176,8 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, gpr_asprintf(&error_msg, "Invalid compression algorithm value '%d'.", algo); gpr_log(GPR_ERROR, "%s", error_msg); - cancel_with_status(exec_ctx, call, GRPC_STATUS_UNIMPLEMENTED, error_msg); + cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE, + GRPC_STATUS_UNIMPLEMENTED, error_msg); } else if (grpc_compression_options_is_algorithm_enabled( &compression_options, algo) == 0) { /* check if algorithm is supported by current channel config */ @@ -1197,7 +1186,8 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.", algo_name); gpr_log(GPR_ERROR, "%s", error_msg); - cancel_with_status(exec_ctx, call, GRPC_STATUS_UNIMPLEMENTED, error_msg); + cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE, + GRPC_STATUS_UNIMPLEMENTED, error_msg); } else { call->incoming_compression_algorithm = algo; } @@ -1227,7 +1217,10 @@ static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl, grpc_error *error) { if (error == GRPC_ERROR_NONE) return; int idx = (int)gpr_atm_no_barrier_fetch_add(&bctl->num_errors, 1); - if (idx == 0) cancel_with_error(exec_ctx, bctl->call, GRPC_ERROR_REF(error)); + if (idx == 0) { + cancel_with_error(exec_ctx, bctl->call, STATUS_FROM_CORE, + GRPC_ERROR_REF(error)); + } bctl->errors[idx] = error; } -- cgit v1.2.3