aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Yuchen Zeng <zyc@google.com>2017-08-09 19:42:18 -0700
committerGravatar Yuchen Zeng <zyc@google.com>2017-08-09 19:42:18 -0700
commit873bb70087b3dcdd19b25ddc4f079dcc9c22e50a (patch)
treecb501e42e9e6aabf42f2a91e0381def165970eb4 /src/core
parent59611fb5710ee21fb49dae52acb92342cc28fcad (diff)
Fix data race in call.c
Diffstat (limited to 'src/core')
-rw-r--r--src/core/lib/surface/call.c43
1 files changed, 29 insertions, 14 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 00ec9c7c9a..e82d538d29 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -170,9 +170,6 @@ struct grpc_call {
gpr_atm any_ops_sent_atm;
gpr_atm received_final_op_atm;
- /* have we received initial metadata */
- bool has_initial_md_been_received;
-
batch_control *active_batches[MAX_CONCURRENT_BATCHES];
grpc_transport_stream_op_batch_payload stream_op_payload;
@@ -226,7 +223,10 @@ struct grpc_call {
} server;
} final_op;
- void *saved_receiving_stream_ready_bctlp;
+ // Either 0 (no initial metadata and messages received),
+ // 1 (recieved initial metadata first)
+ // or a batch_control* (received messages first the lowest bit is 0)
+ gpr_atm saved_receiving_stream_ready_bctlp;
};
grpc_tracer_flag grpc_call_error_trace =
@@ -1290,11 +1290,10 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE,
GRPC_ERROR_REF(error));
}
- if (call->has_initial_md_been_received || error != GRPC_ERROR_NONE ||
- call->receiving_stream == NULL) {
+ if (error != GRPC_ERROR_NONE || call->receiving_stream == NULL ||
+ !gpr_atm_rel_cas(&call->saved_receiving_stream_ready_bctlp, 0,
+ (gpr_atm)bctlp)) {
process_data_after_md(exec_ctx, bctlp);
- } else {
- call->saved_receiving_stream_ready_bctlp = bctlp;
}
}
@@ -1384,12 +1383,28 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
}
}
- call->has_initial_md_been_received = true;
- if (call->saved_receiving_stream_ready_bctlp != NULL) {
- grpc_closure *saved_rsr_closure = GRPC_CLOSURE_CREATE(
- receiving_stream_ready, call->saved_receiving_stream_ready_bctlp,
- grpc_schedule_on_exec_ctx);
- call->saved_receiving_stream_ready_bctlp = NULL;
+ grpc_closure *saved_rsr_closure = NULL;
+ while (true) {
+ gpr_atm rsr_bctlp =
+ gpr_atm_acq_load(&call->saved_receiving_stream_ready_bctlp);
+ /* Should only receive initial metadata once */
+ GPR_ASSERT(rsr_bctlp != 1);
+ if (rsr_bctlp == 0) {
+ /* Not received initial metadata and messages */
+ if (gpr_atm_no_barrier_cas(&call->saved_receiving_stream_ready_bctlp, 0,
+ 1)) {
+ break;
+ }
+ } else {
+ /* Already received messages */
+ saved_rsr_closure = GRPC_CLOSURE_CREATE(receiving_stream_ready,
+ (batch_control *)rsr_bctlp,
+ grpc_schedule_on_exec_ctx);
+ /* No need to modify saved_receiving_stream_ready_bctlp */
+ break;
+ }
+ }
+ if (saved_rsr_closure != NULL) {
GRPC_CLOSURE_RUN(exec_ctx, saved_rsr_closure, GRPC_ERROR_REF(error));
}