diff options
author | Yuchen Zeng <zyc@google.com> | 2017-08-09 19:42:18 -0700 |
---|---|---|
committer | Yuchen Zeng <zyc@google.com> | 2017-08-09 19:42:18 -0700 |
commit | 873bb70087b3dcdd19b25ddc4f079dcc9c22e50a (patch) | |
tree | cb501e42e9e6aabf42f2a91e0381def165970eb4 /src/core | |
parent | 59611fb5710ee21fb49dae52acb92342cc28fcad (diff) |
Fix data race in call.c
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/lib/surface/call.c | 43 |
1 files changed, 29 insertions, 14 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 00ec9c7c9a..e82d538d29 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -170,9 +170,6 @@ struct grpc_call { gpr_atm any_ops_sent_atm; gpr_atm received_final_op_atm; - /* have we received initial metadata */ - bool has_initial_md_been_received; - batch_control *active_batches[MAX_CONCURRENT_BATCHES]; grpc_transport_stream_op_batch_payload stream_op_payload; @@ -226,7 +223,10 @@ struct grpc_call { } server; } final_op; - void *saved_receiving_stream_ready_bctlp; + // Either 0 (no initial metadata and messages received), + // 1 (recieved initial metadata first) + // or a batch_control* (received messages first the lowest bit is 0) + gpr_atm saved_receiving_stream_ready_bctlp; }; grpc_tracer_flag grpc_call_error_trace = @@ -1290,11 +1290,10 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error)); } - if (call->has_initial_md_been_received || error != GRPC_ERROR_NONE || - call->receiving_stream == NULL) { + if (error != GRPC_ERROR_NONE || call->receiving_stream == NULL || + !gpr_atm_rel_cas(&call->saved_receiving_stream_ready_bctlp, 0, + (gpr_atm)bctlp)) { process_data_after_md(exec_ctx, bctlp); - } else { - call->saved_receiving_stream_ready_bctlp = bctlp; } } @@ -1384,12 +1383,28 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, } } - call->has_initial_md_been_received = true; - if (call->saved_receiving_stream_ready_bctlp != NULL) { - grpc_closure *saved_rsr_closure = GRPC_CLOSURE_CREATE( - receiving_stream_ready, call->saved_receiving_stream_ready_bctlp, - grpc_schedule_on_exec_ctx); - call->saved_receiving_stream_ready_bctlp = NULL; + grpc_closure *saved_rsr_closure = NULL; + while (true) { + gpr_atm rsr_bctlp = + gpr_atm_acq_load(&call->saved_receiving_stream_ready_bctlp); + /* Should only receive initial metadata once */ + GPR_ASSERT(rsr_bctlp != 1); + if (rsr_bctlp == 0) { + /* Not received initial metadata and messages */ + if (gpr_atm_no_barrier_cas(&call->saved_receiving_stream_ready_bctlp, 0, + 1)) { + break; + } + } else { + /* Already received messages */ + saved_rsr_closure = GRPC_CLOSURE_CREATE(receiving_stream_ready, + (batch_control *)rsr_bctlp, + grpc_schedule_on_exec_ctx); + /* No need to modify saved_receiving_stream_ready_bctlp */ + break; + } + } + if (saved_rsr_closure != NULL) { GRPC_CLOSURE_RUN(exec_ctx, saved_rsr_closure, GRPC_ERROR_REF(error)); } |