diff options
author | Craig Tiller <ctiller@google.com> | 2017-09-18 22:39:38 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2017-09-18 22:39:38 -0700 |
commit | 1ed11b7d66947b81295fb48e7a67c02441e562b6 (patch) | |
tree | 722d010cfc58418e1bf59c57824d8564ee337bac /src/core/lib/surface | |
parent | adbcec4f57f7e25859326a6daf8ad7e08f2805ca (diff) | |
parent | aafa875cb389140cf721d8470a3a05cf930f74b2 (diff) |
Merge github.com:grpc/grpc into pollset_kick_stats
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r-- | src/core/lib/surface/call.c | 8 | ||||
-rw-r--r-- | src/core/lib/surface/completion_queue.c | 19 |
2 files changed, 17 insertions, 10 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 135257c1ac..dc98532663 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -1512,7 +1512,7 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, } else if (grpc_compression_options_is_stream_compression_algorithm_enabled( &compression_options, algo) == 0) { /* check if algorithm is supported by current channel config */ - char *algo_name = NULL; + const char *algo_name = NULL; grpc_stream_compression_algorithm_name(algo, &algo_name); gpr_asprintf(&error_msg, "Stream compression algorithm '%s' is disabled.", algo_name); @@ -1526,7 +1526,7 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, if (!GPR_BITGET(call->stream_encodings_accepted_by_peer, call->incoming_stream_compression_algorithm)) { if (GRPC_TRACER_ON(grpc_compression_trace)) { - char *algo_name = NULL; + const char *algo_name = NULL; grpc_stream_compression_algorithm_name( call->incoming_stream_compression_algorithm, &algo_name); gpr_log( @@ -1553,7 +1553,7 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, } else if (grpc_compression_options_is_algorithm_enabled( &compression_options, algo) == 0) { /* check if algorithm is supported by current channel config */ - char *algo_name = NULL; + const char *algo_name = NULL; grpc_compression_algorithm_name(algo, &algo_name); gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.", algo_name); @@ -1569,7 +1569,7 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, if (!GPR_BITGET(call->encodings_accepted_by_peer, call->incoming_compression_algorithm)) { if (GRPC_TRACER_ON(grpc_compression_trace)) { - char *algo_name = NULL; + const char *algo_name = NULL; grpc_compression_algorithm_name(call->incoming_compression_algorithm, &algo_name); gpr_log(GPR_ERROR, diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 13fb6cccf8..559bcc9792 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -566,13 +566,13 @@ static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {} * true if the increment was successful; false if the counter is zero */ static bool atm_inc_if_nonzero(gpr_atm *counter) { while (true) { - gpr_atm count = gpr_atm_no_barrier_load(counter); + gpr_atm count = gpr_atm_acq_load(counter); /* If zero, we are done. If not, we must to a CAS (instead of an atomic * increment) to maintain the contract: do not increment the counter if it * is zero. */ if (count == 0) { return false; - } else if (gpr_atm_no_barrier_cas(counter, count, count + 1)) { + } else if (gpr_atm_full_cas(counter, count, count + 1)) { break; } } @@ -644,8 +644,12 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx, /* Add the completion to the queue */ bool is_first = cq_event_queue_push(&cqd->queue, storage); gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1); - bool will_definitely_shutdown = - gpr_atm_no_barrier_load(&cqd->pending_events) == 1; + + /* Since we do not hold the cq lock here, it is important to do an 'acquire' + load here (instead of a 'no_barrier' load) to match with the release store + (done via gpr_atm_full_fetch_add(pending_events, -1)) in cq_shutdown_next + */ + bool will_definitely_shutdown = gpr_atm_acq_load(&cqd->pending_events) == 1; if (!will_definitely_shutdown) { /* Only kick if this is the first item queued */ @@ -889,7 +893,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, } } - if (gpr_atm_no_barrier_load(&cqd->pending_events) == 0) { + if (gpr_atm_acq_load(&cqd->pending_events) == 0) { /* Before returning, check if the queue has any items left over (since gpr_mpscq_pop() can sometimes return NULL even if the queue is not empty. If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */ @@ -935,7 +939,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline, } if (cq_event_queue_num_items(&cqd->queue) > 0 && - gpr_atm_no_barrier_load(&cqd->pending_events) > 0) { + gpr_atm_acq_load(&cqd->pending_events) > 0) { gpr_mu_lock(cq->mu); cq->poller_vtable->kick(&exec_ctx, POLLSET_FROM_CQ(cq), NULL); gpr_mu_unlock(cq->mu); @@ -986,6 +990,9 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx, return; } cqd->shutdown_called = true; + /* Doing a full_fetch_add (i.e acq/release) here to match with + * cq_begin_op_for_next and and cq_end_op_for_next functions which read/write + * on this counter without necessarily holding a lock on cq */ if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) { cq_finish_shutdown_next(exec_ctx, cq); } |