aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-08-31 10:54:11 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-08-31 10:54:11 -0700
commit8fd40d5ed9303c693865ff431b7565173a4b898e (patch)
tree5d3f6ce958de74e79a8bbaf71b7eb8bbd8acfecd /src/core/lib/surface
parentc9c78ee96c5a73234513decd398a63d48f14aa89 (diff)
parent8d51e8d17e012f81ca8e94c18f525e1781130481 (diff)
Merge github.com:grpc/grpc into wc
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r--src/core/lib/surface/alarm.c76
-rw-r--r--src/core/lib/surface/alarm_internal.h40
-rw-r--r--src/core/lib/surface/call.c323
-rw-r--r--src/core/lib/surface/call_test_only.h12
-rw-r--r--src/core/lib/surface/channel.c29
-rw-r--r--src/core/lib/surface/channel_ping.c2
-rw-r--r--src/core/lib/surface/completion_queue.c151
-rw-r--r--src/core/lib/surface/completion_queue.h5
-rw-r--r--src/core/lib/surface/init.c2
-rw-r--r--src/core/lib/surface/server.c14
-rw-r--r--src/core/lib/surface/version.c2
11 files changed, 529 insertions, 127 deletions
diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c
index ef8405cca8..7d60b1de17 100644
--- a/src/core/lib/surface/alarm.c
+++ b/src/core/lib/surface/alarm.c
@@ -15,13 +15,21 @@
* limitations under the License.
*
*/
+#include "src/core/lib/surface/alarm_internal.h"
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/surface/completion_queue.h"
+#ifndef NDEBUG
+grpc_tracer_flag grpc_trace_alarm_refcount =
+ GRPC_TRACER_INITIALIZER(false, "alarm_refcount");
+#endif
+
struct grpc_alarm {
+ gpr_refcount refs;
grpc_timer alarm;
grpc_closure on_alarm;
grpc_cq_completion completion;
@@ -31,13 +39,58 @@ struct grpc_alarm {
void *tag;
};
-static void do_nothing_end_completion(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_cq_completion *c) {}
+static void alarm_ref(grpc_alarm *alarm) { gpr_ref(&alarm->refs); }
+
+static void alarm_unref(grpc_alarm *alarm) {
+ if (gpr_unref(&alarm->refs)) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ GRPC_CQ_INTERNAL_UNREF(&exec_ctx, alarm->cq, "alarm");
+ grpc_exec_ctx_finish(&exec_ctx);
+ gpr_free(alarm);
+ }
+}
+
+#ifndef NDEBUG
+static void alarm_ref_dbg(grpc_alarm *alarm, const char *reason,
+ const char *file, int line) {
+ if (GRPC_TRACER_ON(grpc_trace_alarm_refcount)) {
+ gpr_atm val = gpr_atm_no_barrier_load(&alarm->refs.count);
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+ "Alarm:%p ref %" PRIdPTR " -> %" PRIdPTR " %s", alarm, val,
+ val + 1, reason);
+ }
+
+ alarm_ref(alarm);
+}
+
+static void alarm_unref_dbg(grpc_alarm *alarm, const char *reason,
+ const char *file, int line) {
+ if (GRPC_TRACER_ON(grpc_trace_alarm_refcount)) {
+ gpr_atm val = gpr_atm_no_barrier_load(&alarm->refs.count);
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+ "Alarm:%p Unref %" PRIdPTR " -> %" PRIdPTR " %s", alarm, val,
+ val - 1, reason);
+ }
+
+ alarm_unref(alarm);
+}
+#endif
+
+static void alarm_end_completion(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_cq_completion *c) {
+ grpc_alarm *alarm = arg;
+ GRPC_ALARM_UNREF(alarm, "dequeue-end-op");
+}
static void alarm_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_alarm *alarm = arg;
- grpc_cq_end_op(exec_ctx, alarm->cq, alarm->tag, error,
- do_nothing_end_completion, NULL, &alarm->completion);
+
+ /* We are queuing an op on completion queue. This means, the alarm's structure
+ cannot be destroyed until the op is dequeued. Adding an extra ref
+ here and unref'ing when the op is dequeued will achieve this */
+ GRPC_ALARM_REF(alarm, "queue-end-op");
+ grpc_cq_end_op(exec_ctx, alarm->cq, alarm->tag, error, alarm_end_completion,
+ (void *)alarm, &alarm->completion);
}
grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline,
@@ -45,11 +98,19 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline,
grpc_alarm *alarm = gpr_malloc(sizeof(grpc_alarm));
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ gpr_ref_init(&alarm->refs, 1);
+
+#ifndef NDEBUG
+ if (GRPC_TRACER_ON(grpc_trace_alarm_refcount)) {
+ gpr_log(GPR_DEBUG, "Alarm:%p created (ref: 1)", alarm);
+ }
+#endif
+
GRPC_CQ_INTERNAL_REF(cq, "alarm");
alarm->cq = cq;
alarm->tag = tag;
- grpc_cq_begin_op(cq, tag);
+ GPR_ASSERT(grpc_cq_begin_op(cq, tag));
GRPC_CLOSURE_INIT(&alarm->on_alarm, alarm_cb, alarm,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&exec_ctx, &alarm->alarm,
@@ -66,9 +127,6 @@ void grpc_alarm_cancel(grpc_alarm *alarm) {
}
void grpc_alarm_destroy(grpc_alarm *alarm) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_alarm_cancel(alarm);
- GRPC_CQ_INTERNAL_UNREF(&exec_ctx, alarm->cq, "alarm");
- gpr_free(alarm);
- grpc_exec_ctx_finish(&exec_ctx);
+ GRPC_ALARM_UNREF(alarm, "alarm_destroy");
}
diff --git a/src/core/lib/surface/alarm_internal.h b/src/core/lib/surface/alarm_internal.h
new file mode 100644
index 0000000000..7f2126c5c9
--- /dev/null
+++ b/src/core/lib/surface/alarm_internal.h
@@ -0,0 +1,40 @@
+/*
+ *
+ * Copyright 2015-2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_SURFACE_ALARM_INTERNAL_H
+#define GRPC_CORE_LIB_SURFACE_ALARM_INTERNAL_H
+
+#include <grpc/support/log.h>
+#include "src/core/lib/debug/trace.h"
+
+#ifndef NDEBUG
+
+extern grpc_tracer_flag grpc_trace_alarm_refcount;
+
+#define GRPC_ALARM_REF(a, reason) alarm_ref_dbg(a, reason, __FILE__, __LINE__)
+#define GRPC_ALARM_UNREF(a, reason) \
+ alarm_unref_dbg(a, reason, __FILE__, __LINE__)
+
+#else /* !defined(NDEBUG) */
+
+#define GRPC_ALARM_REF(a, reason) alarm_ref(a)
+#define GRPC_ALARM_UNREF(a, reason) alarm_unref(a)
+
+#endif /* defined(NDEBUG) */
+
+#endif /* GRPC_CORE_LIB_SURFACE_ALARM_INTERNAL_H */
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 2365d27307..62d542d2c5 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -144,6 +144,9 @@ typedef struct {
grpc_call *sibling_prev;
} child_call;
+#define RECV_NONE ((gpr_atm)0)
+#define RECV_INITIAL_METADATA_FIRST ((gpr_atm)1)
+
struct grpc_call {
gpr_refcount ext_ref;
gpr_arena *arena;
@@ -170,9 +173,6 @@ struct grpc_call {
gpr_atm any_ops_sent_atm;
gpr_atm received_final_op_atm;
- /* have we received initial metadata */
- bool has_initial_md_been_received;
-
batch_control *active_batches[MAX_CONCURRENT_BATCHES];
grpc_transport_stream_op_batch_payload stream_op_payload;
@@ -192,8 +192,12 @@ struct grpc_call {
/* Compression algorithm for *incoming* data */
grpc_compression_algorithm incoming_compression_algorithm;
+ /* Stream compression algorithm for *incoming* data */
+ grpc_stream_compression_algorithm incoming_stream_compression_algorithm;
/* Supported encodings (compression algorithms), a bitset */
uint32_t encodings_accepted_by_peer;
+ /* Supported stream encodings (stream compression algorithms), a bitset */
+ uint32_t stream_encodings_accepted_by_peer;
/* Contexts for various subsystems (security, tracing, ...). */
grpc_call_context_element context[GRPC_CONTEXT_COUNT];
@@ -226,7 +230,23 @@ struct grpc_call {
} server;
} final_op;
- void *saved_receiving_stream_ready_bctlp;
+ /* recv_state can contain one of the following values:
+ RECV_NONE : : no initial metadata and messages received
+ RECV_INITIAL_METADATA_FIRST : received initial metadata first
+ a batch_control* : received messages first
+
+ +------1------RECV_NONE------3-----+
+ | |
+ | |
+ v v
+ RECV_INITIAL_METADATA_FIRST receiving_stream_ready_bctlp
+ | ^ | ^
+ | | | |
+ +-----2-----+ +-----4-----+
+
+ For 1, 4: See receiving_initial_metadata_ready() function
+ For 2, 3: See receiving_stream_ready() function */
+ gpr_atm recv_state;
};
grpc_tracer_flag grpc_call_error_trace =
@@ -644,6 +664,8 @@ static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
static grpc_error *error_from_status(grpc_status_code status,
const char *description) {
+ // copying 'description' is needed to ensure the grpc_call_cancel_with_status
+ // guarantee that can be short-lived.
return grpc_error_set_int(
grpc_error_set_str(GRPC_ERROR_CREATE_FROM_COPIED_STRING(description),
GRPC_ERROR_STR_GRPC_MESSAGE,
@@ -750,6 +772,12 @@ static void set_incoming_compression_algorithm(
call->incoming_compression_algorithm = algo;
}
+static void set_incoming_stream_compression_algorithm(
+ grpc_call *call, grpc_stream_compression_algorithm algo) {
+ GPR_ASSERT(algo < GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT);
+ call->incoming_stream_compression_algorithm = algo;
+}
+
grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
grpc_call *call) {
grpc_compression_algorithm algorithm;
@@ -763,6 +791,13 @@ static grpc_compression_algorithm compression_algorithm_for_level_locked(
call->encodings_accepted_by_peer);
}
+static grpc_stream_compression_algorithm
+stream_compression_algorithm_for_level_locked(
+ grpc_call *call, grpc_stream_compression_level level) {
+ return grpc_stream_compression_algorithm_for_level(
+ level, call->stream_encodings_accepted_by_peer);
+}
+
uint32_t grpc_call_test_only_get_message_flags(grpc_call *call) {
uint32_t flags;
flags = call->test_only_last_message_flags;
@@ -817,13 +852,71 @@ static void set_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx,
(void *)(((uintptr_t)call->encodings_accepted_by_peer) + 1));
}
+static void set_stream_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx,
+ grpc_call *call,
+ grpc_mdelem mdel) {
+ size_t i;
+ grpc_stream_compression_algorithm algorithm;
+ grpc_slice_buffer accept_encoding_parts;
+ grpc_slice accept_encoding_slice;
+ void *accepted_user_data;
+
+ accepted_user_data =
+ grpc_mdelem_get_user_data(mdel, destroy_encodings_accepted_by_peer);
+ if (accepted_user_data != NULL) {
+ call->stream_encodings_accepted_by_peer =
+ (uint32_t)(((uintptr_t)accepted_user_data) - 1);
+ return;
+ }
+
+ accept_encoding_slice = GRPC_MDVALUE(mdel);
+ grpc_slice_buffer_init(&accept_encoding_parts);
+ grpc_slice_split(accept_encoding_slice, ",", &accept_encoding_parts);
+
+ /* Always support no compression */
+ GPR_BITSET(&call->stream_encodings_accepted_by_peer,
+ GRPC_STREAM_COMPRESS_NONE);
+ for (i = 0; i < accept_encoding_parts.count; i++) {
+ grpc_slice accept_encoding_entry_slice = accept_encoding_parts.slices[i];
+ if (grpc_stream_compression_algorithm_parse(accept_encoding_entry_slice,
+ &algorithm)) {
+ GPR_BITSET(&call->stream_encodings_accepted_by_peer, algorithm);
+ } else {
+ char *accept_encoding_entry_str =
+ grpc_slice_to_c_string(accept_encoding_entry_slice);
+ gpr_log(GPR_ERROR,
+ "Invalid entry in accept encoding metadata: '%s'. Ignoring.",
+ accept_encoding_entry_str);
+ gpr_free(accept_encoding_entry_str);
+ }
+ }
+
+ grpc_slice_buffer_destroy_internal(exec_ctx, &accept_encoding_parts);
+
+ grpc_mdelem_set_user_data(
+ mdel, destroy_encodings_accepted_by_peer,
+ (void *)(((uintptr_t)call->stream_encodings_accepted_by_peer) + 1));
+}
+
uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) {
uint32_t encodings_accepted_by_peer;
encodings_accepted_by_peer = call->encodings_accepted_by_peer;
return encodings_accepted_by_peer;
}
-static grpc_linked_mdelem *linked_from_md(grpc_metadata *md) {
+uint32_t grpc_call_test_only_get_stream_encodings_accepted_by_peer(
+ grpc_call *call) {
+ uint32_t stream_encodings_accepted_by_peer;
+ stream_encodings_accepted_by_peer = call->stream_encodings_accepted_by_peer;
+ return stream_encodings_accepted_by_peer;
+}
+
+grpc_stream_compression_algorithm
+grpc_call_test_only_get_incoming_stream_encodings(grpc_call *call) {
+ return call->incoming_stream_compression_algorithm;
+}
+
+static grpc_linked_mdelem *linked_from_md(const grpc_metadata *md) {
return (grpc_linked_mdelem *)&md->internal_data;
}
@@ -847,7 +940,7 @@ static int prepare_application_metadata(
for (i = 0; i < total_count; i++) {
const grpc_metadata *md =
get_md_elem(metadata, additional_metadata, i, count);
- grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
+ grpc_linked_mdelem *l = linked_from_md(md);
GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
if (!GRPC_LOG_IF_ERROR("validate_metadata",
grpc_validate_header_key_is_legal(md->key))) {
@@ -864,7 +957,7 @@ static int prepare_application_metadata(
for (int j = 0; j < i; j++) {
const grpc_metadata *md =
get_md_elem(metadata, additional_metadata, j, count);
- grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
+ grpc_linked_mdelem *l = linked_from_md(md);
GRPC_MDELEM_UNREF(exec_ctx, l->md);
}
return 0;
@@ -882,9 +975,12 @@ static int prepare_application_metadata(
}
for (i = 0; i < total_count; i++) {
grpc_metadata *md = get_md_elem(metadata, additional_metadata, i, count);
- GRPC_LOG_IF_ERROR(
- "prepare_application_metadata",
- grpc_metadata_batch_link_tail(exec_ctx, batch, linked_from_md(md)));
+ grpc_linked_mdelem *l = linked_from_md(md);
+ grpc_error *error = grpc_metadata_batch_link_tail(exec_ctx, batch, l);
+ if (error != GRPC_ERROR_NONE) {
+ GRPC_MDELEM_UNREF(exec_ctx, l->md);
+ }
+ GRPC_LOG_IF_ERROR("prepare_application_metadata", error);
}
call->send_extra_metadata_count = 0;
@@ -931,6 +1027,22 @@ static grpc_compression_algorithm decode_compression(grpc_mdelem md) {
return algorithm;
}
+static grpc_stream_compression_algorithm decode_stream_compression(
+ grpc_mdelem md) {
+ grpc_stream_compression_algorithm algorithm =
+ grpc_stream_compression_algorithm_from_slice(GRPC_MDVALUE(md));
+ if (algorithm == GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) {
+ char *md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
+ gpr_log(GPR_ERROR,
+ "Invalid incoming stream compression algorithm: '%s'. Interpreting "
+ "incoming data as uncompressed.",
+ md_c_str);
+ gpr_free(md_c_str);
+ return GRPC_STREAM_COMPRESS_NONE;
+ }
+ return algorithm;
+}
+
static void publish_app_metadata(grpc_call *call, grpc_metadata_batch *b,
int is_trailing) {
if (b->list.count == 0) return;
@@ -955,7 +1067,19 @@ static void publish_app_metadata(grpc_call *call, grpc_metadata_batch *b,
static void recv_initial_filter(grpc_exec_ctx *exec_ctx, grpc_call *call,
grpc_metadata_batch *b) {
- if (b->idx.named.grpc_encoding != NULL) {
+ if (b->idx.named.content_encoding != NULL) {
+ if (b->idx.named.grpc_encoding != NULL) {
+ gpr_log(GPR_ERROR,
+ "Received both content-encoding and grpc-encoding header. "
+ "Ignoring grpc-encoding.");
+ grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_encoding);
+ }
+ GPR_TIMER_BEGIN("incoming_stream_compression_algorithm", 0);
+ set_incoming_stream_compression_algorithm(
+ call, decode_stream_compression(b->idx.named.content_encoding->md));
+ GPR_TIMER_END("incoming_stream_compression_algorithm", 0);
+ grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.content_encoding);
+ } else if (b->idx.named.grpc_encoding != NULL) {
GPR_TIMER_BEGIN("incoming_compression_algorithm", 0);
set_incoming_compression_algorithm(
call, decode_compression(b->idx.named.grpc_encoding->md));
@@ -969,6 +1093,13 @@ static void recv_initial_filter(grpc_exec_ctx *exec_ctx, grpc_call *call,
grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_accept_encoding);
GPR_TIMER_END("encodings_accepted_by_peer", 0);
}
+ if (b->idx.named.accept_encoding != NULL) {
+ GPR_TIMER_BEGIN("stream_encodings_accepted_by_peer", 0);
+ set_stream_encodings_accepted_by_peer(exec_ctx, call,
+ b->idx.named.accept_encoding->md);
+ grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.accept_encoding);
+ GPR_TIMER_END("stream_encodings_accepted_by_peer", 0);
+ }
publish_app_metadata(call, b, false);
}
@@ -1285,19 +1416,61 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE,
GRPC_ERROR_REF(error));
}
- if (call->has_initial_md_been_received || error != GRPC_ERROR_NONE ||
- call->receiving_stream == NULL) {
+ /* If recv_state is RECV_NONE, we will save the batch_control
+ * object with rel_cas, and will not use it after the cas. Its corresponding
+ * acq_load is in receiving_initial_metadata_ready() */
+ if (error != GRPC_ERROR_NONE || call->receiving_stream == NULL ||
+ !gpr_atm_rel_cas(&call->recv_state, RECV_NONE, (gpr_atm)bctlp)) {
process_data_after_md(exec_ctx, bctlp);
- } else {
- call->saved_receiving_stream_ready_bctlp = bctlp;
}
}
static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
batch_control *bctl) {
grpc_call *call = bctl->call;
- /* validate call->incoming_compression_algorithm */
- if (call->incoming_compression_algorithm != GRPC_COMPRESS_NONE) {
+ /* validate compression algorithms */
+ if (call->incoming_stream_compression_algorithm !=
+ GRPC_STREAM_COMPRESS_NONE) {
+ const grpc_stream_compression_algorithm algo =
+ call->incoming_stream_compression_algorithm;
+ char *error_msg = NULL;
+ const grpc_compression_options compression_options =
+ grpc_channel_compression_options(call->channel);
+ if (algo >= GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) {
+ gpr_asprintf(&error_msg,
+ "Invalid stream compression algorithm value '%d'.", algo);
+ gpr_log(GPR_ERROR, "%s", error_msg);
+ cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE,
+ GRPC_STATUS_UNIMPLEMENTED, error_msg);
+ } else if (grpc_compression_options_is_stream_compression_algorithm_enabled(
+ &compression_options, algo) == 0) {
+ /* check if algorithm is supported by current channel config */
+ char *algo_name = NULL;
+ grpc_stream_compression_algorithm_name(algo, &algo_name);
+ gpr_asprintf(&error_msg, "Stream compression algorithm '%s' is disabled.",
+ algo_name);
+ gpr_log(GPR_ERROR, "%s", error_msg);
+ cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE,
+ GRPC_STATUS_UNIMPLEMENTED, error_msg);
+ }
+ gpr_free(error_msg);
+
+ GPR_ASSERT(call->stream_encodings_accepted_by_peer != 0);
+ if (!GPR_BITGET(call->stream_encodings_accepted_by_peer,
+ call->incoming_stream_compression_algorithm)) {
+ if (GRPC_TRACER_ON(grpc_compression_trace)) {
+ char *algo_name = NULL;
+ grpc_stream_compression_algorithm_name(
+ call->incoming_stream_compression_algorithm, &algo_name);
+ gpr_log(
+ GPR_ERROR,
+ "Stream compression algorithm (content-encoding = '%s') not "
+ "present in the bitset of accepted encodings (accept-encodings: "
+ "'0x%x')",
+ algo_name, call->stream_encodings_accepted_by_peer);
+ }
+ }
+ } else if (call->incoming_compression_algorithm != GRPC_COMPRESS_NONE) {
const grpc_compression_algorithm algo =
call->incoming_compression_algorithm;
char *error_msg = NULL;
@@ -1324,22 +1497,20 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
call->incoming_compression_algorithm = algo;
}
gpr_free(error_msg);
- }
- /* make sure the received grpc-encoding is amongst the ones listed in
- * grpc-accept-encoding */
- GPR_ASSERT(call->encodings_accepted_by_peer != 0);
- if (!GPR_BITGET(call->encodings_accepted_by_peer,
- call->incoming_compression_algorithm)) {
- if (GRPC_TRACER_ON(grpc_compression_trace)) {
- char *algo_name = NULL;
- grpc_compression_algorithm_name(call->incoming_compression_algorithm,
- &algo_name);
- gpr_log(GPR_ERROR,
- "Compression algorithm (grpc-encoding = '%s') not present in "
- "the bitset of accepted encodings (grpc-accept-encodings: "
- "'0x%x')",
- algo_name, call->encodings_accepted_by_peer);
+ GPR_ASSERT(call->encodings_accepted_by_peer != 0);
+ if (!GPR_BITGET(call->encodings_accepted_by_peer,
+ call->incoming_compression_algorithm)) {
+ if (GRPC_TRACER_ON(grpc_compression_trace)) {
+ char *algo_name = NULL;
+ grpc_compression_algorithm_name(call->incoming_compression_algorithm,
+ &algo_name);
+ gpr_log(GPR_ERROR,
+ "Compression algorithm (grpc-encoding = '%s') not present in "
+ "the bitset of accepted encodings (grpc-accept-encodings: "
+ "'0x%x')",
+ algo_name, call->encodings_accepted_by_peer);
+ }
}
}
}
@@ -1379,12 +1550,31 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
}
}
- call->has_initial_md_been_received = true;
- if (call->saved_receiving_stream_ready_bctlp != NULL) {
- grpc_closure *saved_rsr_closure = GRPC_CLOSURE_CREATE(
- receiving_stream_ready, call->saved_receiving_stream_ready_bctlp,
- grpc_schedule_on_exec_ctx);
- call->saved_receiving_stream_ready_bctlp = NULL;
+ grpc_closure *saved_rsr_closure = NULL;
+ while (true) {
+ gpr_atm rsr_bctlp = gpr_atm_acq_load(&call->recv_state);
+ /* Should only receive initial metadata once */
+ GPR_ASSERT(rsr_bctlp != 1);
+ if (rsr_bctlp == 0) {
+ /* We haven't seen initial metadata and messages before, thus initial
+ * metadata is received first.
+ * no_barrier_cas is used, as this function won't access the batch_control
+ * object saved by receiving_stream_ready() if the initial metadata is
+ * received first. */
+ if (gpr_atm_no_barrier_cas(&call->recv_state, RECV_NONE,
+ RECV_INITIAL_METADATA_FIRST)) {
+ break;
+ }
+ } else {
+ /* Already received messages */
+ saved_rsr_closure = GRPC_CLOSURE_CREATE(receiving_stream_ready,
+ (batch_control *)rsr_bctlp,
+ grpc_schedule_on_exec_ctx);
+ /* No need to modify recv_state */
+ break;
+ }
+ }
+ if (saved_rsr_closure != NULL) {
GRPC_CLOSURE_RUN(exec_ctx, saved_rsr_closure, GRPC_ERROR_REF(error));
}
@@ -1422,7 +1612,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
if (nops == 0) {
if (!is_notify_tag_closure) {
- grpc_cq_begin_op(call->cq, notify_tag);
+ GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
grpc_cq_end_op(exec_ctx, call->cq, notify_tag, GRPC_ERROR_NONE,
free_no_op_completion, NULL,
gpr_malloc(sizeof(grpc_cq_completion)));
@@ -1466,29 +1656,56 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
/* process compression level */
memset(&compression_md, 0, sizeof(compression_md));
size_t additional_metadata_count = 0;
- grpc_compression_level effective_compression_level;
+ grpc_compression_level effective_compression_level =
+ GRPC_COMPRESS_LEVEL_NONE;
+ grpc_stream_compression_level effective_stream_compression_level =
+ GRPC_STREAM_COMPRESS_LEVEL_NONE;
bool level_set = false;
- if (op->data.send_initial_metadata.maybe_compression_level.is_set) {
+ bool stream_compression = false;
+ if (op->data.send_initial_metadata.maybe_stream_compression_level
+ .is_set) {
+ effective_stream_compression_level =
+ op->data.send_initial_metadata.maybe_stream_compression_level
+ .level;
+ level_set = true;
+ stream_compression = true;
+ } else if (op->data.send_initial_metadata.maybe_compression_level
+ .is_set) {
effective_compression_level =
op->data.send_initial_metadata.maybe_compression_level.level;
level_set = true;
} else {
const grpc_compression_options copts =
grpc_channel_compression_options(call->channel);
- level_set = copts.default_level.is_set;
- if (level_set) {
+ if (copts.default_stream_compression_level.is_set) {
+ level_set = true;
+ effective_stream_compression_level =
+ copts.default_stream_compression_level.level;
+ stream_compression = true;
+ } else if (copts.default_level.is_set) {
+ level_set = true;
effective_compression_level = copts.default_level.level;
}
}
if (level_set && !call->is_client) {
- const grpc_compression_algorithm calgo =
- compression_algorithm_for_level_locked(
- call, effective_compression_level);
- // the following will be picked up by the compress filter and used as
- // the call's compression algorithm.
- compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
- compression_md.value = grpc_compression_algorithm_slice(calgo);
- additional_metadata_count++;
+ if (stream_compression) {
+ const grpc_stream_compression_algorithm calgo =
+ stream_compression_algorithm_for_level_locked(
+ call, effective_stream_compression_level);
+ compression_md.key =
+ GRPC_MDSTR_GRPC_INTERNAL_STREAM_ENCODING_REQUEST;
+ compression_md.value =
+ grpc_stream_compression_algorithm_slice(calgo);
+ } else {
+ const grpc_compression_algorithm calgo =
+ compression_algorithm_for_level_locked(
+ call, effective_compression_level);
+ /* the following will be picked up by the compress filter and used
+ * as the call's compression algorithm. */
+ compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
+ compression_md.value = grpc_compression_algorithm_slice(calgo);
+ additional_metadata_count++;
+ }
}
if (op->data.send_initial_metadata.count + additional_metadata_count >
@@ -1723,7 +1940,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
GRPC_CALL_INTERNAL_REF(call, "completion");
if (!is_notify_tag_closure) {
- grpc_cq_begin_op(call->cq, notify_tag);
+ GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
}
gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
@@ -1844,6 +2061,8 @@ const char *grpc_call_error_to_string(grpc_call_error error) {
return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH";
case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS";
+ case GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN:
+ return "GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN";
case GRPC_CALL_OK:
return "GRPC_CALL_OK";
}
diff --git a/src/core/lib/surface/call_test_only.h b/src/core/lib/surface/call_test_only.h
index 2f1b80bfd7..a5a01b3679 100644
--- a/src/core/lib/surface/call_test_only.h
+++ b/src/core/lib/surface/call_test_only.h
@@ -42,6 +42,18 @@ uint32_t grpc_call_test_only_get_message_flags(grpc_call *call);
* To be indexed by grpc_compression_algorithm enum values. */
uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call);
+/** Returns a bitset for the stream encodings (stream compression algorithms)
+ * supported by \a call's peer.
+ *
+ * To be indexed by grpc_stream_compression_algorithm enum values. */
+uint32_t grpc_call_test_only_get_stream_encodings_accepted_by_peer(
+ grpc_call *call);
+
+/** Returns the incoming stream compression algorithm (content-encoding header)
+ * received by a call. */
+grpc_stream_compression_algorithm
+grpc_call_test_only_get_incoming_stream_encodings(grpc_call *call);
+
#ifdef __cplusplus
}
#endif
diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c
index 5780a18ce8..850fbe6a69 100644
--- a/src/core/lib/surface/channel.c
+++ b/src/core/lib/surface/channel.c
@@ -142,6 +142,16 @@ grpc_channel *grpc_channel_create_with_builder(
GRPC_COMPRESS_LEVEL_NONE,
GRPC_COMPRESS_LEVEL_COUNT - 1});
} else if (0 == strcmp(args->args[i].key,
+ GRPC_STREAM_COMPRESSION_CHANNEL_DEFAULT_LEVEL)) {
+ channel->compression_options.default_stream_compression_level.is_set =
+ true;
+ channel->compression_options.default_stream_compression_level.level =
+ (grpc_stream_compression_level)grpc_channel_arg_get_integer(
+ &args->args[i],
+ (grpc_integer_options){GRPC_STREAM_COMPRESS_LEVEL_NONE,
+ GRPC_STREAM_COMPRESS_LEVEL_NONE,
+ GRPC_STREAM_COMPRESS_LEVEL_COUNT - 1});
+ } else if (0 == strcmp(args->args[i].key,
GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM)) {
channel->compression_options.default_algorithm.is_set = true;
channel->compression_options.default_algorithm.algorithm =
@@ -149,12 +159,31 @@ grpc_channel *grpc_channel_create_with_builder(
&args->args[i],
(grpc_integer_options){GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE,
GRPC_COMPRESS_ALGORITHMS_COUNT - 1});
+ } else if (0 == strcmp(args->args[i].key,
+ GRPC_STREAM_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM)) {
+ channel->compression_options.default_stream_compression_algorithm.is_set =
+ true;
+ channel->compression_options.default_stream_compression_algorithm
+ .algorithm =
+ (grpc_stream_compression_algorithm)grpc_channel_arg_get_integer(
+ &args->args[i],
+ (grpc_integer_options){
+ GRPC_STREAM_COMPRESS_NONE, GRPC_STREAM_COMPRESS_NONE,
+ GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT - 1});
} else if (0 ==
strcmp(args->args[i].key,
GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET)) {
channel->compression_options.enabled_algorithms_bitset =
(uint32_t)args->args[i].value.integer |
0x1; /* always support no compression */
+ } else if (0 ==
+ strcmp(
+ args->args[i].key,
+ GRPC_STREAM_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET)) {
+ channel->compression_options
+ .enabled_stream_compression_algorithms_bitset =
+ (uint32_t)args->args[i].value.integer |
+ 0x1; /* always support no compression */
}
}
diff --git a/src/core/lib/surface/channel_ping.c b/src/core/lib/surface/channel_ping.c
index 80eb80af78..e85b308850 100644
--- a/src/core/lib/surface/channel_ping.c
+++ b/src/core/lib/surface/channel_ping.c
@@ -59,7 +59,7 @@ void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq,
GRPC_CLOSURE_INIT(&pr->closure, ping_done, pr, grpc_schedule_on_exec_ctx);
op->send_ping = &pr->closure;
op->bind_pollset = grpc_cq_pollset(cq);
- grpc_cq_begin_op(cq, tag);
+ GPR_ASSERT(grpc_cq_begin_op(cq, tag));
top_elem->filter->start_transport_op(&exec_ctx, top_elem, op);
grpc_exec_ctx_finish(&exec_ctx);
}
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 978d7b4171..10e4e5ab0c 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -196,7 +196,7 @@ typedef struct cq_vtable {
void (*init)(void *data);
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq);
void (*destroy)(void *data);
- void (*begin_op)(grpc_completion_queue *cq, void *tag);
+ bool (*begin_op)(grpc_completion_queue *cq, void *tag);
void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag,
grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
@@ -235,7 +235,8 @@ typedef struct cq_next_data {
/* Number of outstanding events (+1 if not shut down) */
gpr_atm pending_events;
- int shutdown_called;
+ /** 0 initially. 1 once we initiated shutdown */
+ bool shutdown_called;
} cq_next_data;
typedef struct cq_pluck_data {
@@ -244,15 +245,20 @@ typedef struct cq_pluck_data {
grpc_cq_completion *completed_tail;
/** Number of pending events (+1 if we're not shutdown) */
- gpr_refcount pending_events;
+ gpr_atm pending_events;
/** Counter of how many things have ever been queued on this completion queue
useful for avoiding locks to check the queue */
gpr_atm things_queued_ever;
- /** 0 initially, 1 once we've begun shutting down */
+ /** 0 initially. 1 once we completed shutting */
+ /* TODO: (sreek) This is not needed since (shutdown == 1) if and only if
+ * (pending_events == 0). So consider removing this in future and use
+ * pending_events */
gpr_atm shutdown;
- int shutdown_called;
+
+ /** 0 initially. 1 once we initiated shutdown */
+ bool shutdown_called;
int num_pluckers;
plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
@@ -288,8 +294,8 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq);
-static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag);
-static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag);
+static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag);
+static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag);
static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq, void *tag,
@@ -436,7 +442,7 @@ grpc_completion_queue *grpc_completion_queue_create_internal(
static void cq_init_next(void *ptr) {
cq_next_data *cqd = ptr;
- /* Initial ref is dropped by grpc_completion_queue_shutdown */
+ /* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store(&cqd->pending_events, 1);
cqd->shutdown_called = false;
gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
@@ -451,12 +457,12 @@ static void cq_destroy_next(void *ptr) {
static void cq_init_pluck(void *ptr) {
cq_pluck_data *cqd = ptr;
- /* Initial ref is dropped by grpc_completion_queue_shutdown */
- gpr_ref_init(&cqd->pending_events, 1);
+ /* Initial count is dropped by grpc_completion_queue_shutdown */
+ gpr_atm_no_barrier_store(&cqd->pending_events, 1);
cqd->completed_tail = &cqd->completed_head;
cqd->completed_head.next = (uintptr_t)cqd->completed_tail;
gpr_atm_no_barrier_store(&cqd->shutdown, 0);
- cqd->shutdown_called = 0;
+ cqd->shutdown_called = false;
cqd->num_pluckers = 0;
gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
}
@@ -522,33 +528,6 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx,
}
}
-static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) {
- cq_next_data *cqd = DATA_FROM_CQ(cq);
- GPR_ASSERT(!cqd->shutdown_called);
- gpr_atm_no_barrier_fetch_add(&cqd->pending_events, 1);
-}
-
-static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) {
- cq_pluck_data *cqd = DATA_FROM_CQ(cq);
- GPR_ASSERT(!cqd->shutdown_called);
- gpr_ref(&cqd->pending_events);
-}
-
-void grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) {
-#ifndef NDEBUG
- gpr_mu_lock(cq->mu);
- if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
- cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity);
- cq->outstanding_tags =
- gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) *
- cq->outstanding_tag_capacity);
- }
- cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
- gpr_mu_unlock(cq->mu);
-#endif
- cq->vtable->begin_op(cq, tag);
-}
-
#ifndef NDEBUG
static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {
int found = 0;
@@ -576,6 +555,49 @@ static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {
static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {}
#endif
+/* Atomically increments a counter only if the counter is not zero. Returns
+ * true if the increment was successful; false if the counter is zero */
+static bool atm_inc_if_nonzero(gpr_atm *counter) {
+ while (true) {
+ gpr_atm count = gpr_atm_no_barrier_load(counter);
+ /* If zero, we are done. If not, we must to a CAS (instead of an atomic
+ * increment) to maintain the contract: do not increment the counter if it
+ * is zero. */
+ if (count == 0) {
+ return false;
+ } else if (gpr_atm_no_barrier_cas(counter, count, count + 1)) {
+ break;
+ }
+ }
+
+ return true;
+}
+
+static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) {
+ cq_next_data *cqd = DATA_FROM_CQ(cq);
+ return atm_inc_if_nonzero(&cqd->pending_events);
+}
+
+static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) {
+ cq_pluck_data *cqd = DATA_FROM_CQ(cq);
+ return atm_inc_if_nonzero(&cqd->pending_events);
+}
+
+bool grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) {
+#ifndef NDEBUG
+ gpr_mu_lock(cq->mu);
+ if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
+ cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity);
+ cq->outstanding_tags =
+ gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) *
+ cq->outstanding_tag_capacity);
+ }
+ cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
+ gpr_mu_unlock(cq->mu);
+#endif
+ return cq->vtable->begin_op(cq, tag);
+}
+
/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
* completion
* type of GRPC_CQ_NEXT) */
@@ -696,8 +718,10 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
((uintptr_t)storage) | (1u & (uintptr_t)cqd->completed_tail->next);
cqd->completed_tail = storage;
- int shutdown = gpr_unref(&cqd->pending_events);
- if (!shutdown) {
+ if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
+ cq_finish_shutdown_pluck(exec_ctx, cq);
+ gpr_mu_unlock(cq->mu);
+ } else {
grpc_pollset_worker *pluck_worker = NULL;
for (int i = 0; i < cqd->num_pluckers; i++) {
if (cqd->pluckers[i].tag == tag) {
@@ -717,9 +741,6 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_UNREF(kick_error);
}
- } else {
- cq_finish_shutdown_pluck(exec_ctx, cq);
- gpr_mu_unlock(cq->mu);
}
GPR_TIMER_END("cq_end_op_for_pluck", 0);
@@ -855,8 +876,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
inconsistent state. If it is the latter, we shold do a 0-timeout poll
so that the thread comes back quickly from poll to make a second
attempt at popping. Not doing this can potentially deadlock this
- thread
- forever (if the deadline is infinity) */
+ thread forever (if the deadline is infinity) */
if (cq_event_queue_num_items(&cqd->queue) > 0) {
iteration_deadline = gpr_time_0(GPR_CLOCK_MONOTONIC);
}
@@ -869,10 +889,8 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
if (cq_event_queue_num_items(&cqd->queue) > 0) {
/* Go to the beginning of the loop. No point doing a poll because
(cq->shutdown == true) is only possible when there is no pending
- work
- (i.e cq->pending_events == 0) and any outstanding
- grpc_cq_completion
- events are already queued on this cq */
+ work (i.e cq->pending_events == 0) and any outstanding completion
+ events should have already been queued on this cq */
continue;
}
@@ -909,11 +927,6 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
is_finished_arg.first_loop = false;
}
- GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
- GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "next");
- grpc_exec_ctx_finish(&exec_ctx);
- GPR_ASSERT(is_finished_arg.stolen_completion == NULL);
-
if (cq_event_queue_num_items(&cqd->queue) > 0 &&
gpr_atm_no_barrier_load(&cqd->pending_events) > 0) {
gpr_mu_lock(cq->mu);
@@ -921,6 +934,11 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
gpr_mu_unlock(cq->mu);
}
+ GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
+ GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "next");
+ grpc_exec_ctx_finish(&exec_ctx);
+ GPR_ASSERT(is_finished_arg.stolen_completion == NULL);
+
GPR_TIMER_END("grpc_completion_queue_next", 0);
return ret;
@@ -947,15 +965,20 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq) {
cq_next_data *cqd = DATA_FROM_CQ(cq);
+ /* Need an extra ref for cq here because:
+ * We call cq_finish_shutdown_next() below, that would call pollset shutdown.
+ * Pollset shutdown decrements the cq ref count which can potentially destroy
+ * the cq (if that happens to be the last ref).
+ * Creating an extra ref here prevents the cq from getting destroyed while
+ * this function is still active */
GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
gpr_mu_lock(cq->mu);
if (cqd->shutdown_called) {
gpr_mu_unlock(cq->mu);
GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
- GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
return;
}
- cqd->shutdown_called = 1;
+ cqd->shutdown_called = true;
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
cq_finish_shutdown_next(exec_ctx, cq);
}
@@ -1167,21 +1190,31 @@ static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx,
&cq->pollset_shutdown_done);
}
+/* NOTE: This function is almost exactly identical to cq_shutdown_next() but
+ * merging them is a bit tricky and probably not worth it */
static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq) {
cq_pluck_data *cqd = DATA_FROM_CQ(cq);
+ /* Need an extra ref for cq here because:
+ * We call cq_finish_shutdown_pluck() below, that would call pollset shutdown.
+ * Pollset shutdown decrements the cq ref count which can potentially destroy
+ * the cq (if that happens to be the last ref).
+ * Creating an extra ref here prevents the cq from getting destroyed while
+ * this function is still active */
+ GRPC_CQ_INTERNAL_REF(cq, "shutting_down (pluck cq)");
gpr_mu_lock(cq->mu);
if (cqd->shutdown_called) {
gpr_mu_unlock(cq->mu);
- GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
+ GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down (pluck cq)");
return;
}
- cqd->shutdown_called = 1;
- if (gpr_unref(&cqd->pending_events)) {
+ cqd->shutdown_called = true;
+ if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
cq_finish_shutdown_pluck(exec_ctx, cq);
}
gpr_mu_unlock(cq->mu);
+ GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down (pluck cq)");
}
/* Shutdown simply drops a ref that we reserved at creation time; if we drop
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index af44482513..69d144bd95 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -72,8 +72,9 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc);
/* Flag that an operation is beginning: the completion channel will not finish
shutdown until a corrensponding grpc_cq_end_* call is made.
- \a tag is currently used only in debug builds. */
-void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag);
+ \a tag is currently used only in debug builds. Return true on success, and
+ false if completion_queue has been shutdown. */
+bool grpc_cq_begin_op(grpc_completion_queue *cc, void *tag);
/* Queue a GRPC_OP_COMPLETED operation; tag must correspond to the tag passed to
grpc_cq_begin_op */
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index db111e597f..d199ac060e 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -36,6 +36,7 @@
#include "src/core/lib/iomgr/resource_quota.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/surface/alarm_internal.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel_init.h"
@@ -135,6 +136,7 @@ void grpc_init(void) {
grpc_register_tracer(&grpc_call_error_trace);
#ifndef NDEBUG
grpc_register_tracer(&grpc_trace_pending_tags);
+ grpc_register_tracer(&grpc_trace_alarm_refcount);
grpc_register_tracer(&grpc_trace_cq_refcount);
grpc_register_tracer(&grpc_trace_closure);
grpc_register_tracer(&grpc_trace_error_refcount);
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 4bdc03052f..f8c84fb4a8 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -1261,7 +1261,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
}
/* stay locked, and gather up some stuff to do */
- grpc_cq_begin_op(cq, tag);
+ GPR_ASSERT(grpc_cq_begin_op(cq, tag));
if (server->shutdown_published) {
grpc_cq_end_op(&exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown,
NULL, gpr_malloc(sizeof(grpc_cq_completion)));
@@ -1448,7 +1448,11 @@ grpc_call_error grpc_server_request_call(
error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
goto done;
}
- grpc_cq_begin_op(cq_for_notification, tag);
+ if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
+ gpr_free(rc);
+ error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
+ goto done;
+ }
details->reserved = NULL;
rc->cq_idx = cq_idx;
rc->type = BATCH_CALL;
@@ -1498,7 +1502,11 @@ grpc_call_error grpc_server_request_registered_call(
error = GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
goto done;
}
- grpc_cq_begin_op(cq_for_notification, tag);
+ if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
+ gpr_free(rc);
+ error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
+ goto done;
+ }
rc->cq_idx = cq_idx;
rc->type = REGISTERED_CALL;
rc->server = server;
diff --git a/src/core/lib/surface/version.c b/src/core/lib/surface/version.c
index 8cef15da80..96c16105e7 100644
--- a/src/core/lib/surface/version.c
+++ b/src/core/lib/surface/version.c
@@ -23,4 +23,4 @@
const char *grpc_version_string(void) { return "4.0.0-dev"; }
-const char *grpc_g_stands_for(void) { return "gregarious"; }
+const char *grpc_g_stands_for(void) { return "gambit"; }