aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-09-18 22:39:38 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-09-18 22:39:38 -0700
commit1ed11b7d66947b81295fb48e7a67c02441e562b6 (patch)
tree722d010cfc58418e1bf59c57824d8564ee337bac /src/core/lib/surface
parentadbcec4f57f7e25859326a6daf8ad7e08f2805ca (diff)
parentaafa875cb389140cf721d8470a3a05cf930f74b2 (diff)
Merge github.com:grpc/grpc into pollset_kick_stats
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r--src/core/lib/surface/call.c8
-rw-r--r--src/core/lib/surface/completion_queue.c19
2 files changed, 17 insertions, 10 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 135257c1ac..dc98532663 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -1512,7 +1512,7 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
} else if (grpc_compression_options_is_stream_compression_algorithm_enabled(
&compression_options, algo) == 0) {
/* check if algorithm is supported by current channel config */
- char *algo_name = NULL;
+ const char *algo_name = NULL;
grpc_stream_compression_algorithm_name(algo, &algo_name);
gpr_asprintf(&error_msg, "Stream compression algorithm '%s' is disabled.",
algo_name);
@@ -1526,7 +1526,7 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
if (!GPR_BITGET(call->stream_encodings_accepted_by_peer,
call->incoming_stream_compression_algorithm)) {
if (GRPC_TRACER_ON(grpc_compression_trace)) {
- char *algo_name = NULL;
+ const char *algo_name = NULL;
grpc_stream_compression_algorithm_name(
call->incoming_stream_compression_algorithm, &algo_name);
gpr_log(
@@ -1553,7 +1553,7 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
} else if (grpc_compression_options_is_algorithm_enabled(
&compression_options, algo) == 0) {
/* check if algorithm is supported by current channel config */
- char *algo_name = NULL;
+ const char *algo_name = NULL;
grpc_compression_algorithm_name(algo, &algo_name);
gpr_asprintf(&error_msg, "Compression algorithm '%s' is disabled.",
algo_name);
@@ -1569,7 +1569,7 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
if (!GPR_BITGET(call->encodings_accepted_by_peer,
call->incoming_compression_algorithm)) {
if (GRPC_TRACER_ON(grpc_compression_trace)) {
- char *algo_name = NULL;
+ const char *algo_name = NULL;
grpc_compression_algorithm_name(call->incoming_compression_algorithm,
&algo_name);
gpr_log(GPR_ERROR,
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 13fb6cccf8..559bcc9792 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -566,13 +566,13 @@ static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {}
* true if the increment was successful; false if the counter is zero */
static bool atm_inc_if_nonzero(gpr_atm *counter) {
while (true) {
- gpr_atm count = gpr_atm_no_barrier_load(counter);
+ gpr_atm count = gpr_atm_acq_load(counter);
/* If zero, we are done. If not, we must to a CAS (instead of an atomic
* increment) to maintain the contract: do not increment the counter if it
* is zero. */
if (count == 0) {
return false;
- } else if (gpr_atm_no_barrier_cas(counter, count, count + 1)) {
+ } else if (gpr_atm_full_cas(counter, count, count + 1)) {
break;
}
}
@@ -644,8 +644,12 @@ static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
/* Add the completion to the queue */
bool is_first = cq_event_queue_push(&cqd->queue, storage);
gpr_atm_no_barrier_fetch_add(&cqd->things_queued_ever, 1);
- bool will_definitely_shutdown =
- gpr_atm_no_barrier_load(&cqd->pending_events) == 1;
+
+ /* Since we do not hold the cq lock here, it is important to do an 'acquire'
+ load here (instead of a 'no_barrier' load) to match with the release store
+ (done via gpr_atm_full_fetch_add(pending_events, -1)) in cq_shutdown_next
+ */
+ bool will_definitely_shutdown = gpr_atm_acq_load(&cqd->pending_events) == 1;
if (!will_definitely_shutdown) {
/* Only kick if this is the first item queued */
@@ -889,7 +893,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
}
}
- if (gpr_atm_no_barrier_load(&cqd->pending_events) == 0) {
+ if (gpr_atm_acq_load(&cqd->pending_events) == 0) {
/* Before returning, check if the queue has any items left over (since
gpr_mpscq_pop() can sometimes return NULL even if the queue is not
empty. If so, keep retrying but do not return GRPC_QUEUE_SHUTDOWN */
@@ -935,7 +939,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
}
if (cq_event_queue_num_items(&cqd->queue) > 0 &&
- gpr_atm_no_barrier_load(&cqd->pending_events) > 0) {
+ gpr_atm_acq_load(&cqd->pending_events) > 0) {
gpr_mu_lock(cq->mu);
cq->poller_vtable->kick(&exec_ctx, POLLSET_FROM_CQ(cq), NULL);
gpr_mu_unlock(cq->mu);
@@ -986,6 +990,9 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
return;
}
cqd->shutdown_called = true;
+ /* Doing a full_fetch_add (i.e acq/release) here to match with
+ * cq_begin_op_for_next and and cq_end_op_for_next functions which read/write
+ * on this counter without necessarily holding a lock on cq */
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
cq_finish_shutdown_next(exec_ctx, cq);
}