aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/surface
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-08-29 14:16:14 -0700
committerGravatar Craig Tiller <ctiller@google.com>2017-08-29 14:16:14 -0700
commite944c5d4907732b2bc0a9c6e355ff1e6a65b37e2 (patch)
tree7be516e29013f54061921af50862ad736b8fb020 /src/core/lib/surface
parent33aeabad66e8083d47f47ddc4bafa4483f1585f8 (diff)
parent33b51aaa755b2e5f10aade60b67f216bff86cb36 (diff)
Merge branch 'stats' into stats_histo
Diffstat (limited to 'src/core/lib/surface')
-rw-r--r--src/core/lib/surface/alarm.c76
-rw-r--r--src/core/lib/surface/alarm_internal.h40
-rw-r--r--src/core/lib/surface/call.c463
-rw-r--r--src/core/lib/surface/call_test_only.h12
-rw-r--r--src/core/lib/surface/channel.c29
-rw-r--r--src/core/lib/surface/channel_ping.c2
-rw-r--r--src/core/lib/surface/completion_queue.c151
-rw-r--r--src/core/lib/surface/completion_queue.h5
-rw-r--r--src/core/lib/surface/init.c4
-rw-r--r--src/core/lib/surface/lame_client.cc19
-rw-r--r--src/core/lib/surface/server.c16
-rw-r--r--src/core/lib/surface/version.c2
12 files changed, 642 insertions, 177 deletions
diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c
index ef8405cca8..7d60b1de17 100644
--- a/src/core/lib/surface/alarm.c
+++ b/src/core/lib/surface/alarm.c
@@ -15,13 +15,21 @@
* limitations under the License.
*
*/
+#include "src/core/lib/surface/alarm_internal.h"
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/surface/completion_queue.h"
+#ifndef NDEBUG
+grpc_tracer_flag grpc_trace_alarm_refcount =
+ GRPC_TRACER_INITIALIZER(false, "alarm_refcount");
+#endif
+
struct grpc_alarm {
+ gpr_refcount refs;
grpc_timer alarm;
grpc_closure on_alarm;
grpc_cq_completion completion;
@@ -31,13 +39,58 @@ struct grpc_alarm {
void *tag;
};
-static void do_nothing_end_completion(grpc_exec_ctx *exec_ctx, void *arg,
- grpc_cq_completion *c) {}
+static void alarm_ref(grpc_alarm *alarm) { gpr_ref(&alarm->refs); }
+
+static void alarm_unref(grpc_alarm *alarm) {
+ if (gpr_unref(&alarm->refs)) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ GRPC_CQ_INTERNAL_UNREF(&exec_ctx, alarm->cq, "alarm");
+ grpc_exec_ctx_finish(&exec_ctx);
+ gpr_free(alarm);
+ }
+}
+
+#ifndef NDEBUG
+static void alarm_ref_dbg(grpc_alarm *alarm, const char *reason,
+ const char *file, int line) {
+ if (GRPC_TRACER_ON(grpc_trace_alarm_refcount)) {
+ gpr_atm val = gpr_atm_no_barrier_load(&alarm->refs.count);
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+ "Alarm:%p ref %" PRIdPTR " -> %" PRIdPTR " %s", alarm, val,
+ val + 1, reason);
+ }
+
+ alarm_ref(alarm);
+}
+
+static void alarm_unref_dbg(grpc_alarm *alarm, const char *reason,
+ const char *file, int line) {
+ if (GRPC_TRACER_ON(grpc_trace_alarm_refcount)) {
+ gpr_atm val = gpr_atm_no_barrier_load(&alarm->refs.count);
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
+ "Alarm:%p Unref %" PRIdPTR " -> %" PRIdPTR " %s", alarm, val,
+ val - 1, reason);
+ }
+
+ alarm_unref(alarm);
+}
+#endif
+
+static void alarm_end_completion(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_cq_completion *c) {
+ grpc_alarm *alarm = arg;
+ GRPC_ALARM_UNREF(alarm, "dequeue-end-op");
+}
static void alarm_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_alarm *alarm = arg;
- grpc_cq_end_op(exec_ctx, alarm->cq, alarm->tag, error,
- do_nothing_end_completion, NULL, &alarm->completion);
+
+ /* We are queuing an op on completion queue. This means, the alarm's structure
+ cannot be destroyed until the op is dequeued. Adding an extra ref
+ here and unref'ing when the op is dequeued will achieve this */
+ GRPC_ALARM_REF(alarm, "queue-end-op");
+ grpc_cq_end_op(exec_ctx, alarm->cq, alarm->tag, error, alarm_end_completion,
+ (void *)alarm, &alarm->completion);
}
grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline,
@@ -45,11 +98,19 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline,
grpc_alarm *alarm = gpr_malloc(sizeof(grpc_alarm));
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ gpr_ref_init(&alarm->refs, 1);
+
+#ifndef NDEBUG
+ if (GRPC_TRACER_ON(grpc_trace_alarm_refcount)) {
+ gpr_log(GPR_DEBUG, "Alarm:%p created (ref: 1)", alarm);
+ }
+#endif
+
GRPC_CQ_INTERNAL_REF(cq, "alarm");
alarm->cq = cq;
alarm->tag = tag;
- grpc_cq_begin_op(cq, tag);
+ GPR_ASSERT(grpc_cq_begin_op(cq, tag));
GRPC_CLOSURE_INIT(&alarm->on_alarm, alarm_cb, alarm,
grpc_schedule_on_exec_ctx);
grpc_timer_init(&exec_ctx, &alarm->alarm,
@@ -66,9 +127,6 @@ void grpc_alarm_cancel(grpc_alarm *alarm) {
}
void grpc_alarm_destroy(grpc_alarm *alarm) {
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_alarm_cancel(alarm);
- GRPC_CQ_INTERNAL_UNREF(&exec_ctx, alarm->cq, "alarm");
- gpr_free(alarm);
- grpc_exec_ctx_finish(&exec_ctx);
+ GRPC_ALARM_UNREF(alarm, "alarm_destroy");
}
diff --git a/src/core/lib/surface/alarm_internal.h b/src/core/lib/surface/alarm_internal.h
new file mode 100644
index 0000000000..7f2126c5c9
--- /dev/null
+++ b/src/core/lib/surface/alarm_internal.h
@@ -0,0 +1,40 @@
+/*
+ *
+ * Copyright 2015-2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_SURFACE_ALARM_INTERNAL_H
+#define GRPC_CORE_LIB_SURFACE_ALARM_INTERNAL_H
+
+#include <grpc/support/log.h>
+#include "src/core/lib/debug/trace.h"
+
+#ifndef NDEBUG
+
+extern grpc_tracer_flag grpc_trace_alarm_refcount;
+
+#define GRPC_ALARM_REF(a, reason) alarm_ref_dbg(a, reason, __FILE__, __LINE__)
+#define GRPC_ALARM_UNREF(a, reason) \
+ alarm_unref_dbg(a, reason, __FILE__, __LINE__)
+
+#else /* !defined(NDEBUG) */
+
+#define GRPC_ALARM_REF(a, reason) alarm_ref(a)
+#define GRPC_ALARM_UNREF(a, reason) alarm_unref(a)
+
+#endif /* defined(NDEBUG) */
+
+#endif /* GRPC_CORE_LIB_SURFACE_ALARM_INTERNAL_H */
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 2f42726c7f..03eaaf99ac 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -122,6 +122,7 @@ typedef struct batch_control {
bool is_closure;
} notify_tag;
} completion_data;
+ grpc_closure start_batch;
grpc_closure finish_batch;
gpr_refcount steps_to_complete;
@@ -145,9 +146,13 @@ typedef struct {
grpc_call *sibling_prev;
} child_call;
+#define RECV_NONE ((gpr_atm)0)
+#define RECV_INITIAL_METADATA_FIRST ((gpr_atm)1)
+
struct grpc_call {
gpr_refcount ext_ref;
gpr_arena *arena;
+ grpc_call_combiner call_combiner;
grpc_completion_queue *cq;
grpc_polling_entity pollent;
grpc_channel *channel;
@@ -171,9 +176,6 @@ struct grpc_call {
gpr_atm any_ops_sent_atm;
gpr_atm received_final_op_atm;
- /* have we received initial metadata */
- bool has_initial_md_been_received;
-
batch_control *active_batches[MAX_CONCURRENT_BATCHES];
grpc_transport_stream_op_batch_payload stream_op_payload;
@@ -184,6 +186,11 @@ struct grpc_call {
Element 0 is initial metadata, element 1 is trailing metadata. */
grpc_metadata_array *buffered_metadata[2];
+ grpc_metadata compression_md;
+
+ // A char* indicating the peer name.
+ gpr_atm peer_string;
+
/* Packed received call statuses from various sources */
gpr_atm status[STATUS_SOURCE_COUNT];
@@ -193,8 +200,12 @@ struct grpc_call {
/* Compression algorithm for *incoming* data */
grpc_compression_algorithm incoming_compression_algorithm;
+ /* Stream compression algorithm for *incoming* data */
+ grpc_stream_compression_algorithm incoming_stream_compression_algorithm;
/* Supported encodings (compression algorithms), a bitset */
uint32_t encodings_accepted_by_peer;
+ /* Supported stream encodings (stream compression algorithms), a bitset */
+ uint32_t stream_encodings_accepted_by_peer;
/* Contexts for various subsystems (security, tracing, ...). */
grpc_call_context_element context[GRPC_CONTEXT_COUNT];
@@ -227,7 +238,23 @@ struct grpc_call {
} server;
} final_op;
- void *saved_receiving_stream_ready_bctlp;
+ /* recv_state can contain one of the following values:
+ RECV_NONE : : no initial metadata and messages received
+ RECV_INITIAL_METADATA_FIRST : received initial metadata first
+ a batch_control* : received messages first
+
+ +------1------RECV_NONE------3-----+
+ | |
+ | |
+ v v
+ RECV_INITIAL_METADATA_FIRST receiving_stream_ready_bctlp
+ | ^ | ^
+ | | | |
+ +-----2-----+ +-----4-----+
+
+ For 1, 4: See receiving_initial_metadata_ready() function
+ For 2, 3: See receiving_stream_ready() function */
+ gpr_atm recv_state;
};
grpc_tracer_flag grpc_call_error_trace =
@@ -242,8 +269,9 @@ grpc_tracer_flag grpc_compression_trace =
#define CALL_FROM_TOP_ELEM(top_elem) \
CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
-static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
- grpc_transport_stream_op_batch *op);
+static void execute_batch(grpc_exec_ctx *exec_ctx, grpc_call *call,
+ grpc_transport_stream_op_batch *op,
+ grpc_closure *start_batch_closure);
static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
status_source source, grpc_status_code status,
const char *description);
@@ -308,6 +336,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
sizeof(grpc_call) + channel_stack->call_stack_size);
gpr_ref_init(&call->ext_ref, 1);
call->arena = arena;
+ grpc_call_combiner_init(&call->call_combiner);
*out_call = call;
call->channel = args->channel;
call->cq = args->cq;
@@ -416,7 +445,8 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
.path = path,
.start_time = call->start_time,
.deadline = send_deadline,
- .arena = call->arena};
+ .arena = call->arena,
+ .call_combiner = &call->call_combiner};
add_init_error(&error, grpc_call_stack_init(exec_ctx, channel_stack, 1,
destroy_call, call, &call_args));
if (error != GRPC_ERROR_NONE) {
@@ -483,6 +513,8 @@ static void release_call(grpc_exec_ctx *exec_ctx, void *call,
grpc_error *error) {
grpc_call *c = call;
grpc_channel *channel = c->channel;
+ grpc_call_combiner_destroy(&c->call_combiner);
+ gpr_free((char *)c->peer_string);
grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena));
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call");
}
@@ -582,30 +614,37 @@ grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) {
return GRPC_CALL_OK;
}
-static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
- grpc_transport_stream_op_batch *op) {
- grpc_call_element *elem;
-
- GPR_TIMER_BEGIN("execute_op", 0);
- elem = CALL_ELEM_FROM_CALL(call, 0);
- elem->filter->start_transport_stream_op_batch(exec_ctx, elem, op);
- GPR_TIMER_END("execute_op", 0);
+// This is called via the call combiner to start sending a batch down
+// the filter stack.
+static void execute_batch_in_call_combiner(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *ignored) {
+ grpc_transport_stream_op_batch *batch = arg;
+ grpc_call *call = batch->handler_private.extra_arg;
+ GPR_TIMER_BEGIN("execute_batch", 0);
+ grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0);
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, batch);
+ elem->filter->start_transport_stream_op_batch(exec_ctx, elem, batch);
+ GPR_TIMER_END("execute_batch", 0);
+}
+
+// start_batch_closure points to a caller-allocated closure to be used
+// for entering the call combiner.
+static void execute_batch(grpc_exec_ctx *exec_ctx, grpc_call *call,
+ grpc_transport_stream_op_batch *batch,
+ grpc_closure *start_batch_closure) {
+ batch->handler_private.extra_arg = call;
+ GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch,
+ grpc_schedule_on_exec_ctx);
+ GRPC_CALL_COMBINER_START(exec_ctx, &call->call_combiner, start_batch_closure,
+ GRPC_ERROR_NONE, "executing batch");
}
char *grpc_call_get_peer(grpc_call *call) {
- grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0);
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- char *result;
- GRPC_API_TRACE("grpc_call_get_peer(%p)", 1, (call));
- result = elem->filter->get_peer(&exec_ctx, elem);
- if (result == NULL) {
- result = grpc_channel_get_target(call->channel);
- }
- if (result == NULL) {
- result = gpr_strdup("unknown");
- }
- grpc_exec_ctx_finish(&exec_ctx);
- return result;
+ char *peer_string = (char *)gpr_atm_acq_load(&call->peer_string);
+ if (peer_string != NULL) return gpr_strdup(peer_string);
+ peer_string = grpc_channel_get_target(call->channel);
+ if (peer_string != NULL) return peer_string;
+ return gpr_strdup("unknown");
}
grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
@@ -632,24 +671,47 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
return GRPC_CALL_OK;
}
-static void done_termination(grpc_exec_ctx *exec_ctx, void *call,
+typedef struct {
+ grpc_call *call;
+ grpc_closure start_batch;
+ grpc_closure finish_batch;
+} cancel_state;
+
+// The on_complete callback used when sending a cancel_stream batch down
+// the filter stack. Yields the call combiner when the batch is done.
+static void done_termination(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "termination");
+ cancel_state *state = (cancel_state *)arg;
+ GRPC_CALL_COMBINER_STOP(exec_ctx, &state->call->call_combiner,
+ "on_complete for cancel_stream op");
+ GRPC_CALL_INTERNAL_UNREF(exec_ctx, state->call, "termination");
+ gpr_free(state);
}
static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
status_source source, grpc_error *error) {
GRPC_CALL_INTERNAL_REF(c, "termination");
+ // Inform the call combiner of the cancellation, so that it can cancel
+ // any in-flight asynchronous actions that may be holding the call
+ // combiner. This ensures that the cancel_stream batch can be sent
+ // down the filter stack in a timely manner.
+ grpc_call_combiner_cancel(exec_ctx, &c->call_combiner, GRPC_ERROR_REF(error));
set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error));
- grpc_transport_stream_op_batch *op = grpc_make_transport_stream_op(
- GRPC_CLOSURE_CREATE(done_termination, c, grpc_schedule_on_exec_ctx));
+ cancel_state *state = (cancel_state *)gpr_malloc(sizeof(*state));
+ state->call = c;
+ GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
+ grpc_schedule_on_exec_ctx);
+ grpc_transport_stream_op_batch *op =
+ grpc_make_transport_stream_op(&state->finish_batch);
op->cancel_stream = true;
op->payload->cancel_stream.cancel_error = error;
- execute_op(exec_ctx, c, op);
+ execute_batch(exec_ctx, c, op, &state->start_batch);
}
static grpc_error *error_from_status(grpc_status_code status,
const char *description) {
+ // copying 'description' is needed to ensure the grpc_call_cancel_with_status
+ // guarantee that can be short-lived.
return grpc_error_set_int(
grpc_error_set_str(GRPC_ERROR_CREATE_FROM_COPIED_STRING(description),
GRPC_ERROR_STR_GRPC_MESSAGE,
@@ -756,6 +818,12 @@ static void set_incoming_compression_algorithm(
call->incoming_compression_algorithm = algo;
}
+static void set_incoming_stream_compression_algorithm(
+ grpc_call *call, grpc_stream_compression_algorithm algo) {
+ GPR_ASSERT(algo < GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT);
+ call->incoming_stream_compression_algorithm = algo;
+}
+
grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
grpc_call *call) {
grpc_compression_algorithm algorithm;
@@ -769,6 +837,13 @@ static grpc_compression_algorithm compression_algorithm_for_level_locked(
call->encodings_accepted_by_peer);
}
+static grpc_stream_compression_algorithm
+stream_compression_algorithm_for_level_locked(
+ grpc_call *call, grpc_stream_compression_level level) {
+ return grpc_stream_compression_algorithm_for_level(
+ level, call->stream_encodings_accepted_by_peer);
+}
+
uint32_t grpc_call_test_only_get_message_flags(grpc_call *call) {
uint32_t flags;
flags = call->test_only_last_message_flags;
@@ -823,13 +898,71 @@ static void set_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx,
(void *)(((uintptr_t)call->encodings_accepted_by_peer) + 1));
}
+static void set_stream_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx,
+ grpc_call *call,
+ grpc_mdelem mdel) {
+ size_t i;
+ grpc_stream_compression_algorithm algorithm;
+ grpc_slice_buffer accept_encoding_parts;
+ grpc_slice accept_encoding_slice;
+ void *accepted_user_data;
+
+ accepted_user_data =
+ grpc_mdelem_get_user_data(mdel, destroy_encodings_accepted_by_peer);
+ if (accepted_user_data != NULL) {
+ call->stream_encodings_accepted_by_peer =
+ (uint32_t)(((uintptr_t)accepted_user_data) - 1);
+ return;
+ }
+
+ accept_encoding_slice = GRPC_MDVALUE(mdel);
+ grpc_slice_buffer_init(&accept_encoding_parts);
+ grpc_slice_split(accept_encoding_slice, ",", &accept_encoding_parts);
+
+ /* Always support no compression */
+ GPR_BITSET(&call->stream_encodings_accepted_by_peer,
+ GRPC_STREAM_COMPRESS_NONE);
+ for (i = 0; i < accept_encoding_parts.count; i++) {
+ grpc_slice accept_encoding_entry_slice = accept_encoding_parts.slices[i];
+ if (grpc_stream_compression_algorithm_parse(accept_encoding_entry_slice,
+ &algorithm)) {
+ GPR_BITSET(&call->stream_encodings_accepted_by_peer, algorithm);
+ } else {
+ char *accept_encoding_entry_str =
+ grpc_slice_to_c_string(accept_encoding_entry_slice);
+ gpr_log(GPR_ERROR,
+ "Invalid entry in accept encoding metadata: '%s'. Ignoring.",
+ accept_encoding_entry_str);
+ gpr_free(accept_encoding_entry_str);
+ }
+ }
+
+ grpc_slice_buffer_destroy_internal(exec_ctx, &accept_encoding_parts);
+
+ grpc_mdelem_set_user_data(
+ mdel, destroy_encodings_accepted_by_peer,
+ (void *)(((uintptr_t)call->stream_encodings_accepted_by_peer) + 1));
+}
+
uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) {
uint32_t encodings_accepted_by_peer;
encodings_accepted_by_peer = call->encodings_accepted_by_peer;
return encodings_accepted_by_peer;
}
-static grpc_linked_mdelem *linked_from_md(grpc_metadata *md) {
+uint32_t grpc_call_test_only_get_stream_encodings_accepted_by_peer(
+ grpc_call *call) {
+ uint32_t stream_encodings_accepted_by_peer;
+ stream_encodings_accepted_by_peer = call->stream_encodings_accepted_by_peer;
+ return stream_encodings_accepted_by_peer;
+}
+
+grpc_stream_compression_algorithm
+grpc_call_test_only_get_incoming_stream_encodings(grpc_call *call) {
+ return call->incoming_stream_compression_algorithm;
+}
+
+static grpc_linked_mdelem *linked_from_md(const grpc_metadata *md) {
return (grpc_linked_mdelem *)&md->internal_data;
}
@@ -853,7 +986,7 @@ static int prepare_application_metadata(
for (i = 0; i < total_count; i++) {
const grpc_metadata *md =
get_md_elem(metadata, additional_metadata, i, count);
- grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
+ grpc_linked_mdelem *l = linked_from_md(md);
GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
if (!GRPC_LOG_IF_ERROR("validate_metadata",
grpc_validate_header_key_is_legal(md->key))) {
@@ -870,7 +1003,7 @@ static int prepare_application_metadata(
for (int j = 0; j < i; j++) {
const grpc_metadata *md =
get_md_elem(metadata, additional_metadata, j, count);
- grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
+ grpc_linked_mdelem *l = linked_from_md(md);
GRPC_MDELEM_UNREF(exec_ctx, l->md);
}
return 0;
@@ -888,9 +1021,12 @@ static int prepare_application_metadata(
}
for (i = 0; i < total_count; i++) {
grpc_metadata *md = get_md_elem(metadata, additional_metadata, i, count);
- GRPC_LOG_IF_ERROR(
- "prepare_application_metadata",
- grpc_metadata_batch_link_tail(exec_ctx, batch, linked_from_md(md)));
+ grpc_linked_mdelem *l = linked_from_md(md);
+ grpc_error *error = grpc_metadata_batch_link_tail(exec_ctx, batch, l);
+ if (error != GRPC_ERROR_NONE) {
+ GRPC_MDELEM_UNREF(exec_ctx, l->md);
+ }
+ GRPC_LOG_IF_ERROR("prepare_application_metadata", error);
}
call->send_extra_metadata_count = 0;
@@ -937,6 +1073,22 @@ static grpc_compression_algorithm decode_compression(grpc_mdelem md) {
return algorithm;
}
+static grpc_stream_compression_algorithm decode_stream_compression(
+ grpc_mdelem md) {
+ grpc_stream_compression_algorithm algorithm =
+ grpc_stream_compression_algorithm_from_slice(GRPC_MDVALUE(md));
+ if (algorithm == GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) {
+ char *md_c_str = grpc_slice_to_c_string(GRPC_MDVALUE(md));
+ gpr_log(GPR_ERROR,
+ "Invalid incoming stream compression algorithm: '%s'. Interpreting "
+ "incoming data as uncompressed.",
+ md_c_str);
+ gpr_free(md_c_str);
+ return GRPC_STREAM_COMPRESS_NONE;
+ }
+ return algorithm;
+}
+
static void publish_app_metadata(grpc_call *call, grpc_metadata_batch *b,
int is_trailing) {
if (b->list.count == 0) return;
@@ -961,7 +1113,19 @@ static void publish_app_metadata(grpc_call *call, grpc_metadata_batch *b,
static void recv_initial_filter(grpc_exec_ctx *exec_ctx, grpc_call *call,
grpc_metadata_batch *b) {
- if (b->idx.named.grpc_encoding != NULL) {
+ if (b->idx.named.content_encoding != NULL) {
+ if (b->idx.named.grpc_encoding != NULL) {
+ gpr_log(GPR_ERROR,
+ "Received both content-encoding and grpc-encoding header. "
+ "Ignoring grpc-encoding.");
+ grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_encoding);
+ }
+ GPR_TIMER_BEGIN("incoming_stream_compression_algorithm", 0);
+ set_incoming_stream_compression_algorithm(
+ call, decode_stream_compression(b->idx.named.content_encoding->md));
+ GPR_TIMER_END("incoming_stream_compression_algorithm", 0);
+ grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.content_encoding);
+ } else if (b->idx.named.grpc_encoding != NULL) {
GPR_TIMER_BEGIN("incoming_compression_algorithm", 0);
set_incoming_compression_algorithm(
call, decode_compression(b->idx.named.grpc_encoding->md));
@@ -975,6 +1139,13 @@ static void recv_initial_filter(grpc_exec_ctx *exec_ctx, grpc_call *call,
grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.grpc_accept_encoding);
GPR_TIMER_END("encodings_accepted_by_peer", 0);
}
+ if (b->idx.named.accept_encoding != NULL) {
+ GPR_TIMER_BEGIN("stream_encodings_accepted_by_peer", 0);
+ set_stream_encodings_accepted_by_peer(exec_ctx, call,
+ b->idx.named.accept_encoding->md);
+ grpc_metadata_batch_remove(exec_ctx, b, b->idx.named.accept_encoding);
+ GPR_TIMER_END("stream_encodings_accepted_by_peer", 0);
+ }
publish_app_metadata(call, b, false);
}
@@ -1291,19 +1462,73 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE,
GRPC_ERROR_REF(error));
}
- if (call->has_initial_md_been_received || error != GRPC_ERROR_NONE ||
- call->receiving_stream == NULL) {
+ /* If recv_state is RECV_NONE, we will save the batch_control
+ * object with rel_cas, and will not use it after the cas. Its corresponding
+ * acq_load is in receiving_initial_metadata_ready() */
+ if (error != GRPC_ERROR_NONE || call->receiving_stream == NULL ||
+ !gpr_atm_rel_cas(&call->recv_state, RECV_NONE, (gpr_atm)bctlp)) {
process_data_after_md(exec_ctx, bctlp);
- } else {
- call->saved_receiving_stream_ready_bctlp = bctlp;
}
}
+// The recv_message_ready callback used when sending a batch containing
+// a recv_message op down the filter stack. Yields the call combiner
+// before processing the received message.
+static void receiving_stream_ready_in_call_combiner(grpc_exec_ctx *exec_ctx,
+ void *bctlp,
+ grpc_error *error) {
+ batch_control *bctl = bctlp;
+ grpc_call *call = bctl->call;
+ GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, "recv_message_ready");
+ receiving_stream_ready(exec_ctx, bctlp, error);
+}
+
static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
batch_control *bctl) {
grpc_call *call = bctl->call;
- /* validate call->incoming_compression_algorithm */
- if (call->incoming_compression_algorithm != GRPC_COMPRESS_NONE) {
+ /* validate compression algorithms */
+ if (call->incoming_stream_compression_algorithm !=
+ GRPC_STREAM_COMPRESS_NONE) {
+ const grpc_stream_compression_algorithm algo =
+ call->incoming_stream_compression_algorithm;
+ char *error_msg = NULL;
+ const grpc_compression_options compression_options =
+ grpc_channel_compression_options(call->channel);
+ if (algo >= GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) {
+ gpr_asprintf(&error_msg,
+ "Invalid stream compression algorithm value '%d'.", algo);
+ gpr_log(GPR_ERROR, "%s", error_msg);
+ cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE,
+ GRPC_STATUS_UNIMPLEMENTED, error_msg);
+ } else if (grpc_compression_options_is_stream_compression_algorithm_enabled(
+ &compression_options, algo) == 0) {
+ /* check if algorithm is supported by current channel config */
+ char *algo_name = NULL;
+ grpc_stream_compression_algorithm_name(algo, &algo_name);
+ gpr_asprintf(&error_msg, "Stream compression algorithm '%s' is disabled.",
+ algo_name);
+ gpr_log(GPR_ERROR, "%s", error_msg);
+ cancel_with_status(exec_ctx, call, STATUS_FROM_SURFACE,
+ GRPC_STATUS_UNIMPLEMENTED, error_msg);
+ }
+ gpr_free(error_msg);
+
+ GPR_ASSERT(call->stream_encodings_accepted_by_peer != 0);
+ if (!GPR_BITGET(call->stream_encodings_accepted_by_peer,
+ call->incoming_stream_compression_algorithm)) {
+ if (GRPC_TRACER_ON(grpc_compression_trace)) {
+ char *algo_name = NULL;
+ grpc_stream_compression_algorithm_name(
+ call->incoming_stream_compression_algorithm, &algo_name);
+ gpr_log(
+ GPR_ERROR,
+ "Stream compression algorithm (content-encoding = '%s') not "
+ "present in the bitset of accepted encodings (accept-encodings: "
+ "'0x%x')",
+ algo_name, call->stream_encodings_accepted_by_peer);
+ }
+ }
+ } else if (call->incoming_compression_algorithm != GRPC_COMPRESS_NONE) {
const grpc_compression_algorithm algo =
call->incoming_compression_algorithm;
char *error_msg = NULL;
@@ -1330,22 +1555,20 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
call->incoming_compression_algorithm = algo;
}
gpr_free(error_msg);
- }
- /* make sure the received grpc-encoding is amongst the ones listed in
- * grpc-accept-encoding */
- GPR_ASSERT(call->encodings_accepted_by_peer != 0);
- if (!GPR_BITGET(call->encodings_accepted_by_peer,
- call->incoming_compression_algorithm)) {
- if (GRPC_TRACER_ON(grpc_compression_trace)) {
- char *algo_name = NULL;
- grpc_compression_algorithm_name(call->incoming_compression_algorithm,
- &algo_name);
- gpr_log(GPR_ERROR,
- "Compression algorithm (grpc-encoding = '%s') not present in "
- "the bitset of accepted encodings (grpc-accept-encodings: "
- "'0x%x')",
- algo_name, call->encodings_accepted_by_peer);
+ GPR_ASSERT(call->encodings_accepted_by_peer != 0);
+ if (!GPR_BITGET(call->encodings_accepted_by_peer,
+ call->incoming_compression_algorithm)) {
+ if (GRPC_TRACER_ON(grpc_compression_trace)) {
+ char *algo_name = NULL;
+ grpc_compression_algorithm_name(call->incoming_compression_algorithm,
+ &algo_name);
+ gpr_log(GPR_ERROR,
+ "Compression algorithm (grpc-encoding = '%s') not present in "
+ "the bitset of accepted encodings (grpc-accept-encodings: "
+ "'0x%x')",
+ algo_name, call->encodings_accepted_by_peer);
+ }
}
}
}
@@ -1366,6 +1589,9 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
+ GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner,
+ "recv_initial_metadata_ready");
+
add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
if (error == GRPC_ERROR_NONE) {
grpc_metadata_batch *md =
@@ -1385,12 +1611,31 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
}
}
- call->has_initial_md_been_received = true;
- if (call->saved_receiving_stream_ready_bctlp != NULL) {
- grpc_closure *saved_rsr_closure = GRPC_CLOSURE_CREATE(
- receiving_stream_ready, call->saved_receiving_stream_ready_bctlp,
- grpc_schedule_on_exec_ctx);
- call->saved_receiving_stream_ready_bctlp = NULL;
+ grpc_closure *saved_rsr_closure = NULL;
+ while (true) {
+ gpr_atm rsr_bctlp = gpr_atm_acq_load(&call->recv_state);
+ /* Should only receive initial metadata once */
+ GPR_ASSERT(rsr_bctlp != 1);
+ if (rsr_bctlp == 0) {
+ /* We haven't seen initial metadata and messages before, thus initial
+ * metadata is received first.
+ * no_barrier_cas is used, as this function won't access the batch_control
+ * object saved by receiving_stream_ready() if the initial metadata is
+ * received first. */
+ if (gpr_atm_no_barrier_cas(&call->recv_state, RECV_NONE,
+ RECV_INITIAL_METADATA_FIRST)) {
+ break;
+ }
+ } else {
+ /* Already received messages */
+ saved_rsr_closure = GRPC_CLOSURE_CREATE(receiving_stream_ready,
+ (batch_control *)rsr_bctlp,
+ grpc_schedule_on_exec_ctx);
+ /* No need to modify recv_state */
+ break;
+ }
+ }
+ if (saved_rsr_closure != NULL) {
GRPC_CLOSURE_RUN(exec_ctx, saved_rsr_closure, GRPC_ERROR_REF(error));
}
@@ -1400,7 +1645,8 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_error *error) {
batch_control *bctl = bctlp;
-
+ grpc_call *call = bctl->call;
+ GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, "on_complete");
add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
finish_batch_step(exec_ctx, bctl);
}
@@ -1420,15 +1666,12 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
int num_completion_callbacks_needed = 1;
grpc_call_error error = GRPC_CALL_OK;
- // sent_initial_metadata guards against variable reuse.
- grpc_metadata compression_md;
-
GPR_TIMER_BEGIN("grpc_call_start_batch", 0);
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag);
if (nops == 0) {
if (!is_notify_tag_closure) {
- grpc_cq_begin_op(call->cq, notify_tag);
+ GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
grpc_cq_end_op(exec_ctx, call->cq, notify_tag, GRPC_ERROR_NONE,
free_no_op_completion, NULL,
gpr_malloc(sizeof(grpc_cq_completion)));
@@ -1470,31 +1713,60 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
goto done_with_error;
}
/* process compression level */
- memset(&compression_md, 0, sizeof(compression_md));
+ memset(&call->compression_md, 0, sizeof(call->compression_md));
size_t additional_metadata_count = 0;
- grpc_compression_level effective_compression_level;
+ grpc_compression_level effective_compression_level =
+ GRPC_COMPRESS_LEVEL_NONE;
+ grpc_stream_compression_level effective_stream_compression_level =
+ GRPC_STREAM_COMPRESS_LEVEL_NONE;
bool level_set = false;
- if (op->data.send_initial_metadata.maybe_compression_level.is_set) {
+ bool stream_compression = false;
+ if (op->data.send_initial_metadata.maybe_stream_compression_level
+ .is_set) {
+ effective_stream_compression_level =
+ op->data.send_initial_metadata.maybe_stream_compression_level
+ .level;
+ level_set = true;
+ stream_compression = true;
+ } else if (op->data.send_initial_metadata.maybe_compression_level
+ .is_set) {
effective_compression_level =
op->data.send_initial_metadata.maybe_compression_level.level;
level_set = true;
} else {
const grpc_compression_options copts =
grpc_channel_compression_options(call->channel);
- level_set = copts.default_level.is_set;
- if (level_set) {
+ if (copts.default_stream_compression_level.is_set) {
+ level_set = true;
+ effective_stream_compression_level =
+ copts.default_stream_compression_level.level;
+ stream_compression = true;
+ } else if (copts.default_level.is_set) {
+ level_set = true;
effective_compression_level = copts.default_level.level;
}
}
if (level_set && !call->is_client) {
- const grpc_compression_algorithm calgo =
- compression_algorithm_for_level_locked(
- call, effective_compression_level);
- // the following will be picked up by the compress filter and used as
- // the call's compression algorithm.
- compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
- compression_md.value = grpc_compression_algorithm_slice(calgo);
- additional_metadata_count++;
+ if (stream_compression) {
+ const grpc_stream_compression_algorithm calgo =
+ stream_compression_algorithm_for_level_locked(
+ call, effective_stream_compression_level);
+ call->compression_md.key =
+ GRPC_MDSTR_GRPC_INTERNAL_STREAM_ENCODING_REQUEST;
+ call->compression_md.value =
+ grpc_stream_compression_algorithm_slice(calgo);
+ } else {
+ const grpc_compression_algorithm calgo =
+ compression_algorithm_for_level_locked(
+ call, effective_compression_level);
+ /* the following will be picked up by the compress filter and used
+ * as the call's compression algorithm. */
+ call->compression_md.key =
+ GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
+ call->compression_md.value =
+ grpc_compression_algorithm_slice(calgo);
+ additional_metadata_count++;
+ }
}
if (op->data.send_initial_metadata.count + additional_metadata_count >
@@ -1507,7 +1779,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
if (!prepare_application_metadata(
exec_ctx, call, (int)op->data.send_initial_metadata.count,
op->data.send_initial_metadata.metadata, 0, call->is_client,
- &compression_md, (int)additional_metadata_count)) {
+ &call->compression_md, (int)additional_metadata_count)) {
error = GRPC_CALL_ERROR_INVALID_METADATA;
goto done_with_error;
}
@@ -1519,6 +1791,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
stream_op_payload->send_initial_metadata.send_initial_metadata_flags =
op->flags;
+ if (call->is_client) {
+ stream_op_payload->send_initial_metadata.peer_string =
+ &call->peer_string;
+ }
break;
case GRPC_OP_SEND_MESSAGE:
if (!are_write_flags_valid(op->flags)) {
@@ -1651,6 +1927,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready =
&call->receiving_initial_metadata_ready;
+ if (!call->is_client) {
+ stream_op_payload->recv_initial_metadata.peer_string =
+ &call->peer_string;
+ }
num_completion_callbacks_needed++;
break;
case GRPC_OP_RECV_MESSAGE:
@@ -1667,8 +1947,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
stream_op->recv_message = true;
call->receiving_buffer = op->data.recv_message.recv_message;
stream_op_payload->recv_message.recv_message = &call->receiving_stream;
- GRPC_CLOSURE_INIT(&call->receiving_stream_ready, receiving_stream_ready,
- bctl, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&call->receiving_stream_ready,
+ receiving_stream_ready_in_call_combiner, bctl,
+ grpc_schedule_on_exec_ctx);
stream_op_payload->recv_message.recv_message_ready =
&call->receiving_stream_ready;
num_completion_callbacks_needed++;
@@ -1729,7 +2010,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
GRPC_CALL_INTERNAL_REF(call, "completion");
if (!is_notify_tag_closure) {
- grpc_cq_begin_op(call->cq, notify_tag);
+ GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
}
gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
@@ -1738,7 +2019,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
stream_op->on_complete = &bctl->finish_batch;
gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
- execute_op(exec_ctx, call, stream_op);
+ execute_batch(exec_ctx, call, stream_op, &bctl->start_batch);
done:
GPR_TIMER_END("grpc_call_start_batch", 0);
@@ -1850,6 +2131,8 @@ const char *grpc_call_error_to_string(grpc_call_error error) {
return "GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH";
case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
return "GRPC_CALL_ERROR_TOO_MANY_OPERATIONS";
+ case GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN:
+ return "GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN";
case GRPC_CALL_OK:
return "GRPC_CALL_OK";
}
diff --git a/src/core/lib/surface/call_test_only.h b/src/core/lib/surface/call_test_only.h
index 2f1b80bfd7..a5a01b3679 100644
--- a/src/core/lib/surface/call_test_only.h
+++ b/src/core/lib/surface/call_test_only.h
@@ -42,6 +42,18 @@ uint32_t grpc_call_test_only_get_message_flags(grpc_call *call);
* To be indexed by grpc_compression_algorithm enum values. */
uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call);
+/** Returns a bitset for the stream encodings (stream compression algorithms)
+ * supported by \a call's peer.
+ *
+ * To be indexed by grpc_stream_compression_algorithm enum values. */
+uint32_t grpc_call_test_only_get_stream_encodings_accepted_by_peer(
+ grpc_call *call);
+
+/** Returns the incoming stream compression algorithm (content-encoding header)
+ * received by a call. */
+grpc_stream_compression_algorithm
+grpc_call_test_only_get_incoming_stream_encodings(grpc_call *call);
+
#ifdef __cplusplus
}
#endif
diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c
index 5780a18ce8..850fbe6a69 100644
--- a/src/core/lib/surface/channel.c
+++ b/src/core/lib/surface/channel.c
@@ -142,6 +142,16 @@ grpc_channel *grpc_channel_create_with_builder(
GRPC_COMPRESS_LEVEL_NONE,
GRPC_COMPRESS_LEVEL_COUNT - 1});
} else if (0 == strcmp(args->args[i].key,
+ GRPC_STREAM_COMPRESSION_CHANNEL_DEFAULT_LEVEL)) {
+ channel->compression_options.default_stream_compression_level.is_set =
+ true;
+ channel->compression_options.default_stream_compression_level.level =
+ (grpc_stream_compression_level)grpc_channel_arg_get_integer(
+ &args->args[i],
+ (grpc_integer_options){GRPC_STREAM_COMPRESS_LEVEL_NONE,
+ GRPC_STREAM_COMPRESS_LEVEL_NONE,
+ GRPC_STREAM_COMPRESS_LEVEL_COUNT - 1});
+ } else if (0 == strcmp(args->args[i].key,
GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM)) {
channel->compression_options.default_algorithm.is_set = true;
channel->compression_options.default_algorithm.algorithm =
@@ -149,12 +159,31 @@ grpc_channel *grpc_channel_create_with_builder(
&args->args[i],
(grpc_integer_options){GRPC_COMPRESS_NONE, GRPC_COMPRESS_NONE,
GRPC_COMPRESS_ALGORITHMS_COUNT - 1});
+ } else if (0 == strcmp(args->args[i].key,
+ GRPC_STREAM_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM)) {
+ channel->compression_options.default_stream_compression_algorithm.is_set =
+ true;
+ channel->compression_options.default_stream_compression_algorithm
+ .algorithm =
+ (grpc_stream_compression_algorithm)grpc_channel_arg_get_integer(
+ &args->args[i],
+ (grpc_integer_options){
+ GRPC_STREAM_COMPRESS_NONE, GRPC_STREAM_COMPRESS_NONE,
+ GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT - 1});
} else if (0 ==
strcmp(args->args[i].key,
GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET)) {
channel->compression_options.enabled_algorithms_bitset =
(uint32_t)args->args[i].value.integer |
0x1; /* always support no compression */
+ } else if (0 ==
+ strcmp(
+ args->args[i].key,
+ GRPC_STREAM_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET)) {
+ channel->compression_options
+ .enabled_stream_compression_algorithms_bitset =
+ (uint32_t)args->args[i].value.integer |
+ 0x1; /* always support no compression */
}
}
diff --git a/src/core/lib/surface/channel_ping.c b/src/core/lib/surface/channel_ping.c
index 80eb80af78..e85b308850 100644
--- a/src/core/lib/surface/channel_ping.c
+++ b/src/core/lib/surface/channel_ping.c
@@ -59,7 +59,7 @@ void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq,
GRPC_CLOSURE_INIT(&pr->closure, ping_done, pr, grpc_schedule_on_exec_ctx);
op->send_ping = &pr->closure;
op->bind_pollset = grpc_cq_pollset(cq);
- grpc_cq_begin_op(cq, tag);
+ GPR_ASSERT(grpc_cq_begin_op(cq, tag));
top_elem->filter->start_transport_op(&exec_ctx, top_elem, op);
grpc_exec_ctx_finish(&exec_ctx);
}
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 978d7b4171..10e4e5ab0c 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -196,7 +196,7 @@ typedef struct cq_vtable {
void (*init)(void *data);
void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq);
void (*destroy)(void *data);
- void (*begin_op)(grpc_completion_queue *cq, void *tag);
+ bool (*begin_op)(grpc_completion_queue *cq, void *tag);
void (*end_op)(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cq, void *tag,
grpc_error *error,
void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
@@ -235,7 +235,8 @@ typedef struct cq_next_data {
/* Number of outstanding events (+1 if not shut down) */
gpr_atm pending_events;
- int shutdown_called;
+ /** 0 initially. 1 once we initiated shutdown */
+ bool shutdown_called;
} cq_next_data;
typedef struct cq_pluck_data {
@@ -244,15 +245,20 @@ typedef struct cq_pluck_data {
grpc_cq_completion *completed_tail;
/** Number of pending events (+1 if we're not shutdown) */
- gpr_refcount pending_events;
+ gpr_atm pending_events;
/** Counter of how many things have ever been queued on this completion queue
useful for avoiding locks to check the queue */
gpr_atm things_queued_ever;
- /** 0 initially, 1 once we've begun shutting down */
+ /** 0 initially. 1 once we completed shutting */
+ /* TODO: (sreek) This is not needed since (shutdown == 1) if and only if
+ * (pending_events == 0). So consider removing this in future and use
+ * pending_events */
gpr_atm shutdown;
- int shutdown_called;
+
+ /** 0 initially. 1 once we initiated shutdown */
+ bool shutdown_called;
int num_pluckers;
plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
@@ -288,8 +294,8 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq);
-static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag);
-static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag);
+static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag);
+static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag);
static void cq_end_op_for_next(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq, void *tag,
@@ -436,7 +442,7 @@ grpc_completion_queue *grpc_completion_queue_create_internal(
static void cq_init_next(void *ptr) {
cq_next_data *cqd = ptr;
- /* Initial ref is dropped by grpc_completion_queue_shutdown */
+ /* Initial count is dropped by grpc_completion_queue_shutdown */
gpr_atm_no_barrier_store(&cqd->pending_events, 1);
cqd->shutdown_called = false;
gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
@@ -451,12 +457,12 @@ static void cq_destroy_next(void *ptr) {
static void cq_init_pluck(void *ptr) {
cq_pluck_data *cqd = ptr;
- /* Initial ref is dropped by grpc_completion_queue_shutdown */
- gpr_ref_init(&cqd->pending_events, 1);
+ /* Initial count is dropped by grpc_completion_queue_shutdown */
+ gpr_atm_no_barrier_store(&cqd->pending_events, 1);
cqd->completed_tail = &cqd->completed_head;
cqd->completed_head.next = (uintptr_t)cqd->completed_tail;
gpr_atm_no_barrier_store(&cqd->shutdown, 0);
- cqd->shutdown_called = 0;
+ cqd->shutdown_called = false;
cqd->num_pluckers = 0;
gpr_atm_no_barrier_store(&cqd->things_queued_ever, 0);
}
@@ -522,33 +528,6 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx,
}
}
-static void cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) {
- cq_next_data *cqd = DATA_FROM_CQ(cq);
- GPR_ASSERT(!cqd->shutdown_called);
- gpr_atm_no_barrier_fetch_add(&cqd->pending_events, 1);
-}
-
-static void cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) {
- cq_pluck_data *cqd = DATA_FROM_CQ(cq);
- GPR_ASSERT(!cqd->shutdown_called);
- gpr_ref(&cqd->pending_events);
-}
-
-void grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) {
-#ifndef NDEBUG
- gpr_mu_lock(cq->mu);
- if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
- cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity);
- cq->outstanding_tags =
- gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) *
- cq->outstanding_tag_capacity);
- }
- cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
- gpr_mu_unlock(cq->mu);
-#endif
- cq->vtable->begin_op(cq, tag);
-}
-
#ifndef NDEBUG
static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {
int found = 0;
@@ -576,6 +555,49 @@ static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {
static void cq_check_tag(grpc_completion_queue *cq, void *tag, bool lock_cq) {}
#endif
+/* Atomically increments a counter only if the counter is not zero. Returns
+ * true if the increment was successful; false if the counter is zero */
+static bool atm_inc_if_nonzero(gpr_atm *counter) {
+ while (true) {
+ gpr_atm count = gpr_atm_no_barrier_load(counter);
+ /* If zero, we are done. If not, we must to a CAS (instead of an atomic
+ * increment) to maintain the contract: do not increment the counter if it
+ * is zero. */
+ if (count == 0) {
+ return false;
+ } else if (gpr_atm_no_barrier_cas(counter, count, count + 1)) {
+ break;
+ }
+ }
+
+ return true;
+}
+
+static bool cq_begin_op_for_next(grpc_completion_queue *cq, void *tag) {
+ cq_next_data *cqd = DATA_FROM_CQ(cq);
+ return atm_inc_if_nonzero(&cqd->pending_events);
+}
+
+static bool cq_begin_op_for_pluck(grpc_completion_queue *cq, void *tag) {
+ cq_pluck_data *cqd = DATA_FROM_CQ(cq);
+ return atm_inc_if_nonzero(&cqd->pending_events);
+}
+
+bool grpc_cq_begin_op(grpc_completion_queue *cq, void *tag) {
+#ifndef NDEBUG
+ gpr_mu_lock(cq->mu);
+ if (cq->outstanding_tag_count == cq->outstanding_tag_capacity) {
+ cq->outstanding_tag_capacity = GPR_MAX(4, 2 * cq->outstanding_tag_capacity);
+ cq->outstanding_tags =
+ gpr_realloc(cq->outstanding_tags, sizeof(*cq->outstanding_tags) *
+ cq->outstanding_tag_capacity);
+ }
+ cq->outstanding_tags[cq->outstanding_tag_count++] = tag;
+ gpr_mu_unlock(cq->mu);
+#endif
+ return cq->vtable->begin_op(cq, tag);
+}
+
/* Queue a GRPC_OP_COMPLETED operation to a completion queue (with a
* completion
* type of GRPC_CQ_NEXT) */
@@ -696,8 +718,10 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
((uintptr_t)storage) | (1u & (uintptr_t)cqd->completed_tail->next);
cqd->completed_tail = storage;
- int shutdown = gpr_unref(&cqd->pending_events);
- if (!shutdown) {
+ if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
+ cq_finish_shutdown_pluck(exec_ctx, cq);
+ gpr_mu_unlock(cq->mu);
+ } else {
grpc_pollset_worker *pluck_worker = NULL;
for (int i = 0; i < cqd->num_pluckers; i++) {
if (cqd->pluckers[i].tag == tag) {
@@ -717,9 +741,6 @@ static void cq_end_op_for_pluck(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_UNREF(kick_error);
}
- } else {
- cq_finish_shutdown_pluck(exec_ctx, cq);
- gpr_mu_unlock(cq->mu);
}
GPR_TIMER_END("cq_end_op_for_pluck", 0);
@@ -855,8 +876,7 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
inconsistent state. If it is the latter, we shold do a 0-timeout poll
so that the thread comes back quickly from poll to make a second
attempt at popping. Not doing this can potentially deadlock this
- thread
- forever (if the deadline is infinity) */
+ thread forever (if the deadline is infinity) */
if (cq_event_queue_num_items(&cqd->queue) > 0) {
iteration_deadline = gpr_time_0(GPR_CLOCK_MONOTONIC);
}
@@ -869,10 +889,8 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
if (cq_event_queue_num_items(&cqd->queue) > 0) {
/* Go to the beginning of the loop. No point doing a poll because
(cq->shutdown == true) is only possible when there is no pending
- work
- (i.e cq->pending_events == 0) and any outstanding
- grpc_cq_completion
- events are already queued on this cq */
+ work (i.e cq->pending_events == 0) and any outstanding completion
+ events should have already been queued on this cq */
continue;
}
@@ -909,11 +927,6 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
is_finished_arg.first_loop = false;
}
- GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
- GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "next");
- grpc_exec_ctx_finish(&exec_ctx);
- GPR_ASSERT(is_finished_arg.stolen_completion == NULL);
-
if (cq_event_queue_num_items(&cqd->queue) > 0 &&
gpr_atm_no_barrier_load(&cqd->pending_events) > 0) {
gpr_mu_lock(cq->mu);
@@ -921,6 +934,11 @@ static grpc_event cq_next(grpc_completion_queue *cq, gpr_timespec deadline,
gpr_mu_unlock(cq->mu);
}
+ GRPC_SURFACE_TRACE_RETURNED_EVENT(cq, &ret);
+ GRPC_CQ_INTERNAL_UNREF(&exec_ctx, cq, "next");
+ grpc_exec_ctx_finish(&exec_ctx);
+ GPR_ASSERT(is_finished_arg.stolen_completion == NULL);
+
GPR_TIMER_END("grpc_completion_queue_next", 0);
return ret;
@@ -947,15 +965,20 @@ static void cq_shutdown_next(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq) {
cq_next_data *cqd = DATA_FROM_CQ(cq);
+ /* Need an extra ref for cq here because:
+ * We call cq_finish_shutdown_next() below, that would call pollset shutdown.
+ * Pollset shutdown decrements the cq ref count which can potentially destroy
+ * the cq (if that happens to be the last ref).
+ * Creating an extra ref here prevents the cq from getting destroyed while
+ * this function is still active */
GRPC_CQ_INTERNAL_REF(cq, "shutting_down");
gpr_mu_lock(cq->mu);
if (cqd->shutdown_called) {
gpr_mu_unlock(cq->mu);
GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down");
- GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
return;
}
- cqd->shutdown_called = 1;
+ cqd->shutdown_called = true;
if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
cq_finish_shutdown_next(exec_ctx, cq);
}
@@ -1167,21 +1190,31 @@ static void cq_finish_shutdown_pluck(grpc_exec_ctx *exec_ctx,
&cq->pollset_shutdown_done);
}
+/* NOTE: This function is almost exactly identical to cq_shutdown_next() but
+ * merging them is a bit tricky and probably not worth it */
static void cq_shutdown_pluck(grpc_exec_ctx *exec_ctx,
grpc_completion_queue *cq) {
cq_pluck_data *cqd = DATA_FROM_CQ(cq);
+ /* Need an extra ref for cq here because:
+ * We call cq_finish_shutdown_pluck() below, that would call pollset shutdown.
+ * Pollset shutdown decrements the cq ref count which can potentially destroy
+ * the cq (if that happens to be the last ref).
+ * Creating an extra ref here prevents the cq from getting destroyed while
+ * this function is still active */
+ GRPC_CQ_INTERNAL_REF(cq, "shutting_down (pluck cq)");
gpr_mu_lock(cq->mu);
if (cqd->shutdown_called) {
gpr_mu_unlock(cq->mu);
- GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
+ GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down (pluck cq)");
return;
}
- cqd->shutdown_called = 1;
- if (gpr_unref(&cqd->pending_events)) {
+ cqd->shutdown_called = true;
+ if (gpr_atm_full_fetch_add(&cqd->pending_events, -1) == 1) {
cq_finish_shutdown_pluck(exec_ctx, cq);
}
gpr_mu_unlock(cq->mu);
+ GRPC_CQ_INTERNAL_UNREF(exec_ctx, cq, "shutting_down (pluck cq)");
}
/* Shutdown simply drops a ref that we reserved at creation time; if we drop
diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h
index af44482513..69d144bd95 100644
--- a/src/core/lib/surface/completion_queue.h
+++ b/src/core/lib/surface/completion_queue.h
@@ -72,8 +72,9 @@ void grpc_cq_internal_unref(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc);
/* Flag that an operation is beginning: the completion channel will not finish
shutdown until a corrensponding grpc_cq_end_* call is made.
- \a tag is currently used only in debug builds. */
-void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag);
+ \a tag is currently used only in debug builds. Return true on success, and
+ false if completion_queue has been shutdown. */
+bool grpc_cq_begin_op(grpc_completion_queue *cc, void *tag);
/* Queue a GRPC_OP_COMPLETED operation; tag must correspond to the tag passed to
grpc_cq_begin_op */
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index 822056af5a..280315036f 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -31,12 +31,14 @@
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/http/parser.h"
+#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/core/lib/iomgr/resource_quota.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/surface/alarm_internal.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/call.h"
#include "src/core/lib/surface/channel_init.h"
@@ -128,6 +130,7 @@ void grpc_init(void) {
grpc_register_tracer(&grpc_trace_channel_stack_builder);
grpc_register_tracer(&grpc_http1_trace);
grpc_register_tracer(&grpc_cq_pluck_trace); // default on
+ grpc_register_tracer(&grpc_call_combiner_trace);
grpc_register_tracer(&grpc_combiner_trace);
grpc_register_tracer(&grpc_server_channel_trace);
grpc_register_tracer(&grpc_bdp_estimator_trace);
@@ -137,6 +140,7 @@ void grpc_init(void) {
grpc_register_tracer(&grpc_call_error_trace);
#ifndef NDEBUG
grpc_register_tracer(&grpc_trace_pending_tags);
+ grpc_register_tracer(&grpc_trace_alarm_refcount);
grpc_register_tracer(&grpc_trace_cq_refcount);
grpc_register_tracer(&grpc_trace_closure);
grpc_register_tracer(&grpc_trace_error_refcount);
diff --git a/src/core/lib/surface/lame_client.cc b/src/core/lib/surface/lame_client.cc
index a0791080a9..6286f9159d 100644
--- a/src/core/lib/surface/lame_client.cc
+++ b/src/core/lib/surface/lame_client.cc
@@ -40,6 +40,7 @@ namespace grpc_core {
namespace {
struct CallData {
+ grpc_call_combiner *call_combiner;
grpc_linked_mdelem status;
grpc_linked_mdelem details;
grpc_core::atomic<bool> filled_metadata;
@@ -52,14 +53,14 @@ struct ChannelData {
static void fill_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_metadata_batch *mdb) {
- CallData *calld = static_cast<CallData *>(elem->call_data);
+ CallData *calld = reinterpret_cast<CallData *>(elem->call_data);
bool expected = false;
if (!calld->filled_metadata.compare_exchange_strong(
expected, true, grpc_core::memory_order_relaxed,
grpc_core::memory_order_relaxed)) {
return;
}
- ChannelData *chand = static_cast<ChannelData *>(elem->channel_data);
+ ChannelData *chand = reinterpret_cast<ChannelData *>(elem->channel_data);
char tmp[GPR_LTOA_MIN_BUFSIZE];
gpr_ltoa(chand->error_code, tmp);
calld->status.md = grpc_mdelem_from_slices(
@@ -79,6 +80,7 @@ static void fill_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
static void lame_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op_batch *op) {
+ CallData *calld = reinterpret_cast<CallData *>(elem->call_data);
if (op->recv_initial_metadata) {
fill_metadata(exec_ctx, elem,
op->payload->recv_initial_metadata.recv_initial_metadata);
@@ -87,12 +89,8 @@ static void lame_start_transport_stream_op_batch(
op->payload->recv_trailing_metadata.recv_trailing_metadata);
}
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, op,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"));
-}
-
-static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
- return NULL;
+ exec_ctx, op, GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"),
+ calld->call_combiner);
}
static void lame_get_channel_info(grpc_exec_ctx *exec_ctx,
@@ -122,6 +120,8 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx,
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_element_args *args) {
+ CallData *calld = reinterpret_cast<CallData *>(elem->call_data);
+ calld->call_combiner = args->call_combiner;
return GRPC_ERROR_NONE;
}
@@ -156,7 +156,6 @@ extern "C" const grpc_channel_filter grpc_lame_filter = {
sizeof(grpc_core::ChannelData),
grpc_core::init_channel_elem,
grpc_core::destroy_channel_elem,
- grpc_core::lame_get_peer,
grpc_core::lame_get_channel_info,
"lame-client",
};
@@ -176,7 +175,7 @@ grpc_channel *grpc_lame_client_channel_create(const char *target,
"error_message=%s)",
3, (target, (int)error_code, error_message));
GPR_ASSERT(elem->filter == &grpc_lame_filter);
- auto chand = static_cast<grpc_core::ChannelData *>(elem->channel_data);
+ auto chand = reinterpret_cast<grpc_core::ChannelData *>(elem->channel_data);
chand->error_code = error_code;
chand->error_message = error_message;
grpc_exec_ctx_finish(&exec_ctx);
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index fce7f8dca1..8582d826ca 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -789,7 +789,6 @@ static void server_mutate_op(grpc_call_element *elem,
static void server_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op_batch *op) {
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
server_mutate_op(elem, op);
grpc_call_next_op(exec_ctx, elem, op);
}
@@ -962,7 +961,6 @@ const grpc_channel_filter grpc_server_top_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
- grpc_call_next_get_peer,
grpc_channel_next_get_info,
"server",
};
@@ -1259,7 +1257,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server,
}
/* stay locked, and gather up some stuff to do */
- grpc_cq_begin_op(cq, tag);
+ GPR_ASSERT(grpc_cq_begin_op(cq, tag));
if (server->shutdown_published) {
grpc_cq_end_op(&exec_ctx, cq, tag, GRPC_ERROR_NONE, done_published_shutdown,
NULL, gpr_malloc(sizeof(grpc_cq_completion)));
@@ -1446,7 +1444,11 @@ grpc_call_error grpc_server_request_call(
error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
goto done;
}
- grpc_cq_begin_op(cq_for_notification, tag);
+ if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
+ gpr_free(rc);
+ error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
+ goto done;
+ }
details->reserved = NULL;
rc->cq_idx = cq_idx;
rc->type = BATCH_CALL;
@@ -1496,7 +1498,11 @@ grpc_call_error grpc_server_request_registered_call(
error = GRPC_CALL_ERROR_PAYLOAD_TYPE_MISMATCH;
goto done;
}
- grpc_cq_begin_op(cq_for_notification, tag);
+ if (grpc_cq_begin_op(cq_for_notification, tag) == false) {
+ gpr_free(rc);
+ error = GRPC_CALL_ERROR_COMPLETION_QUEUE_SHUTDOWN;
+ goto done;
+ }
rc->cq_idx = cq_idx;
rc->type = REGISTERED_CALL;
rc->server = server;
diff --git a/src/core/lib/surface/version.c b/src/core/lib/surface/version.c
index 8cef15da80..96c16105e7 100644
--- a/src/core/lib/surface/version.c
+++ b/src/core/lib/surface/version.c
@@ -23,4 +23,4 @@
const char *grpc_version_string(void) { return "4.0.0-dev"; }
-const char *grpc_g_stands_for(void) { return "gregarious"; }
+const char *grpc_g_stands_for(void) { return "gambit"; }