aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib
diff options
context:
space:
mode:
authorGravatar Mark D. Roth <roth@google.com>2017-08-25 09:03:33 -0700
committerGravatar Mark D. Roth <roth@google.com>2017-08-25 09:03:33 -0700
commit76e264b8dfa260090c2d545c9c6f1a35c5f50def (patch)
treee6f61fb6571383cd13a0db38f52e9e987c1be41e /src/core/lib
parent14a690ddcd65d6ba16d13ee0faa0777561627041 (diff)
Implement call combiner.
Diffstat (limited to 'src/core/lib')
-rw-r--r--src/core/lib/channel/channel_stack.c16
-rw-r--r--src/core/lib/channel/channel_stack.h11
-rw-r--r--src/core/lib/channel/connected_channel.c92
-rw-r--r--src/core/lib/iomgr/call_combiner.c180
-rw-r--r--src/core/lib/iomgr/call_combiner.h104
-rw-r--r--src/core/lib/security/transport/client_auth_filter.c183
-rw-r--r--src/core/lib/security/transport/server_auth_filter.c89
-rw-r--r--src/core/lib/surface/call.c148
-rw-r--r--src/core/lib/surface/init.c2
-rw-r--r--src/core/lib/surface/lame_client.cc19
-rw-r--r--src/core/lib/surface/server.c2
-rw-r--r--src/core/lib/transport/transport.c21
-rw-r--r--src/core/lib/transport/transport.h13
-rw-r--r--src/core/lib/transport/transport_impl.h3
-rw-r--r--src/core/lib/transport/transport_op_string.c7
15 files changed, 629 insertions, 261 deletions
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
index 0f8e33c4be..775c8bc667 100644
--- a/src/core/lib/channel/channel_stack.c
+++ b/src/core/lib/channel/channel_stack.c
@@ -233,15 +233,10 @@ void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack,
void grpc_call_next_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op_batch *op) {
grpc_call_element *next_elem = elem + 1;
+ GRPC_CALL_LOG_OP(GPR_INFO, next_elem, op);
next_elem->filter->start_transport_stream_op_batch(exec_ctx, next_elem, op);
}
-char *grpc_call_next_get_peer(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem) {
- grpc_call_element *next_elem = elem + 1;
- return next_elem->filter->get_peer(exec_ctx, next_elem);
-}
-
void grpc_channel_next_get_info(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
const grpc_channel_info *channel_info) {
@@ -265,12 +260,3 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) {
return (grpc_call_stack *)((char *)(elem)-ROUND_UP_TO_ALIGNMENT_SIZE(
sizeof(grpc_call_stack)));
}
-
-void grpc_call_element_signal_error(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_error *error) {
- grpc_transport_stream_op_batch *op = grpc_make_transport_stream_op(NULL);
- op->cancel_stream = true;
- op->payload->cancel_stream.cancel_error = error;
- elem->filter->start_transport_stream_op_batch(exec_ctx, elem, op);
-}
diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h
index a80f8aa826..ae1cac31f7 100644
--- a/src/core/lib/channel/channel_stack.h
+++ b/src/core/lib/channel/channel_stack.h
@@ -40,6 +40,7 @@
#include <grpc/support/time.h>
#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/support/arena.h"
#include "src/core/lib/transport/transport.h"
@@ -71,6 +72,7 @@ typedef struct {
gpr_timespec start_time;
gpr_timespec deadline;
gpr_arena *arena;
+ grpc_call_combiner *call_combiner;
} grpc_call_element_args;
typedef struct {
@@ -150,9 +152,6 @@ typedef struct {
void (*destroy_channel_elem)(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem);
- /* Implement grpc_call_get_peer() */
- char *(*get_peer)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem);
-
/* Implement grpc_channel_get_info() */
void (*get_channel_info)(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
const grpc_channel_info *channel_info);
@@ -271,8 +270,6 @@ void grpc_call_next_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
stack */
void grpc_channel_next_op(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
grpc_transport_op *op);
-/* Pass through a request to get_peer to the next child element */
-char *grpc_call_next_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem);
/* Pass through a request to get_channel_info() to the next child element */
void grpc_channel_next_get_info(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
@@ -288,10 +285,6 @@ void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
grpc_call_element *elem,
grpc_transport_stream_op_batch *op);
-void grpc_call_element_signal_error(grpc_exec_ctx *exec_ctx,
- grpc_call_element *cur_elem,
- grpc_error *error);
-
extern grpc_tracer_flag grpc_trace_channel;
#define GRPC_CALL_LOG_OP(sev, elem, op) \
diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c
index af06ca802e..8285226fc4 100644
--- a/src/core/lib/channel/connected_channel.c
+++ b/src/core/lib/channel/connected_channel.c
@@ -36,7 +36,57 @@ typedef struct connected_channel_channel_data {
grpc_transport *transport;
} channel_data;
-typedef struct connected_channel_call_data { void *unused; } call_data;
+typedef struct {
+ grpc_closure closure;
+ grpc_closure *original_closure;
+ grpc_call_combiner *call_combiner;
+ const char *reason;
+} callback_state;
+
+typedef struct connected_channel_call_data {
+ grpc_call_combiner *call_combiner;
+ // Closures used for returning results on the call combiner.
+ callback_state on_complete[6]; // Max number of pending batches.
+ callback_state recv_initial_metadata_ready;
+ callback_state recv_message_ready;
+} call_data;
+
+static void run_in_call_combiner(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ callback_state *state = (callback_state *)arg;
+ GRPC_CALL_COMBINER_START(exec_ctx, state->call_combiner,
+ state->original_closure, GRPC_ERROR_REF(error),
+ state->reason);
+}
+
+static void run_cancel_in_call_combiner(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *error) {
+ run_in_call_combiner(exec_ctx, arg, error);
+ gpr_free(arg);
+}
+
+static void intercept_callback(call_data *calld, callback_state *state,
+ bool free_when_done, const char *reason,
+ grpc_closure **original_closure) {
+ state->original_closure = *original_closure;
+ state->call_combiner = calld->call_combiner;
+ state->reason = reason;
+ *original_closure = GRPC_CLOSURE_INIT(
+ &state->closure,
+ free_when_done ? run_cancel_in_call_combiner : run_in_call_combiner,
+ state, grpc_schedule_on_exec_ctx);
+}
+
+static callback_state *get_state_for_batch(
+ call_data *calld, grpc_transport_stream_op_batch *batch) {
+ if (batch->send_initial_metadata) return &calld->on_complete[0];
+ if (batch->send_message) return &calld->on_complete[1];
+ if (batch->send_trailing_metadata) return &calld->on_complete[2];
+ if (batch->recv_initial_metadata) return &calld->on_complete[3];
+ if (batch->recv_message) return &calld->on_complete[4];
+ if (batch->recv_trailing_metadata) return &calld->on_complete[5];
+ GPR_UNREACHABLE_CODE(return NULL);
+}
/* We perform a small hack to locate transport data alongside the connected
channel data in call allocations, to allow everything to be pulled in minimal
@@ -49,13 +99,38 @@ typedef struct connected_channel_call_data { void *unused; } call_data;
into transport stream operations */
static void con_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_transport_stream_op_batch *op) {
+ grpc_transport_stream_op_batch *batch) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
-
+ if (batch->recv_initial_metadata) {
+ callback_state *state = &calld->recv_initial_metadata_ready;
+ intercept_callback(
+ calld, state, false, "recv_initial_metadata_ready",
+ &batch->payload->recv_initial_metadata.recv_initial_metadata_ready);
+ }
+ if (batch->recv_message) {
+ callback_state *state = &calld->recv_message_ready;
+ intercept_callback(calld, state, false, "recv_message_ready",
+ &batch->payload->recv_message.recv_message_ready);
+ }
+ if (batch->cancel_stream) {
+ // There can be more than one cancellation batch in flight at any
+ // given time, so we can't just pick out a fixed index into
+ // calld->on_complete like we can for the other ops. However,
+ // cancellation isn't in the fast path, so we just allocate a new
+ // closure for each one.
+ callback_state *state = (callback_state *)gpr_malloc(sizeof(*state));
+ intercept_callback(calld, state, true, "on_complete (cancel_stream)",
+ &batch->on_complete);
+ } else {
+ callback_state *state = get_state_for_batch(calld, batch);
+ intercept_callback(calld, state, false, "on_complete", &batch->on_complete);
+ }
grpc_transport_perform_stream_op(exec_ctx, chand->transport,
- TRANSPORT_STREAM_FROM_CALL_DATA(calld), op);
+ TRANSPORT_STREAM_FROM_CALL_DATA(calld),
+ batch);
+ GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner,
+ "passed batch to transport");
}
static void con_start_transport_op(grpc_exec_ctx *exec_ctx,
@@ -71,6 +146,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
const grpc_call_element_args *args) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
+ calld->call_combiner = args->call_combiner;
int r = grpc_transport_init_stream(
exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld),
&args->call_stack->refcount, args->server_transport_data, args->arena);
@@ -118,11 +194,6 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
}
}
-static char *con_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
- channel_data *chand = elem->channel_data;
- return grpc_transport_get_peer(exec_ctx, chand->transport);
-}
-
/* No-op. */
static void con_get_channel_info(grpc_exec_ctx *exec_ctx,
grpc_channel_element *elem,
@@ -138,7 +209,6 @@ const grpc_channel_filter grpc_connected_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
- con_get_peer,
con_get_channel_info,
"connected",
};
diff --git a/src/core/lib/iomgr/call_combiner.c b/src/core/lib/iomgr/call_combiner.c
new file mode 100644
index 0000000000..899f98552d
--- /dev/null
+++ b/src/core/lib/iomgr/call_combiner.c
@@ -0,0 +1,180 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "src/core/lib/iomgr/call_combiner.h"
+
+#include <grpc/support/log.h>
+
+grpc_tracer_flag grpc_call_combiner_trace =
+ GRPC_TRACER_INITIALIZER(false, "call_combiner");
+
+static grpc_error* decode_cancel_state_error(gpr_atm cancel_state) {
+ if (cancel_state & 1) {
+ return (grpc_error*)(cancel_state & ~(gpr_atm)1);
+ }
+ return GRPC_ERROR_NONE;
+}
+
+static gpr_atm encode_cancel_state_error(grpc_error* error) {
+ return (gpr_atm)1 | (gpr_atm)error;
+}
+
+void grpc_call_combiner_init(grpc_call_combiner* call_combiner) {
+ gpr_mpscq_init(&call_combiner->queue);
+}
+
+void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner) {
+ gpr_mpscq_destroy(&call_combiner->queue);
+ GRPC_ERROR_UNREF(decode_cancel_state_error(call_combiner->cancel_state));
+}
+
+#ifndef NDEBUG
+#define DEBUG_ARGS , const char *file, int line
+#define DEBUG_FMT_STR "%s:%d: "
+#define DEBUG_FMT_ARGS , file, line
+#else
+#define DEBUG_ARGS
+#define DEBUG_FMT_STR
+#define DEBUG_FMT_ARGS
+#endif
+
+void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx,
+ grpc_call_combiner* call_combiner,
+ grpc_closure* closure,
+ grpc_error* error DEBUG_ARGS,
+ const char* reason) {
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG,
+ "==> grpc_call_combiner_start() [%p] closure=%p [" DEBUG_FMT_STR
+ "%s] error=%s",
+ call_combiner, closure DEBUG_FMT_ARGS, reason,
+ grpc_error_string(error));
+ }
+ size_t prev_size =
+ (size_t)gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)1);
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
+ prev_size + 1);
+ }
+ if (prev_size == 0) {
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG, " EXECUTING IMMEDIATELY");
+ }
+ // Queue was empty, so execute this closure immediately.
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, error);
+ } else {
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_INFO, " QUEUING");
+ }
+ // Queue was not empty, so add closure to queue.
+ closure->error_data.error = error;
+ gpr_mpscq_push(&call_combiner->queue, (gpr_mpscq_node*)closure);
+ }
+}
+
+void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx,
+ grpc_call_combiner* call_combiner DEBUG_ARGS,
+ const char* reason) {
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG,
+ "==> grpc_call_combiner_stop() [%p] [" DEBUG_FMT_STR "%s]",
+ call_combiner DEBUG_FMT_ARGS, reason);
+ }
+ size_t prev_size =
+ (size_t)gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)-1);
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
+ prev_size - 1);
+ }
+ GPR_ASSERT(prev_size >= 1);
+ if (prev_size > 1) {
+ while (true) {
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG, " checking queue");
+ }
+ bool empty;
+ grpc_closure* closure = (grpc_closure*)gpr_mpscq_pop_and_check_end(
+ &call_combiner->queue, &empty);
+ if (closure == NULL) {
+ // This can happen either due to a race condition within the mpscq
+ // code or because of a race with grpc_call_combiner_start().
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG, " queue returned no result; checking again");
+ }
+ continue;
+ }
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG, " EXECUTING FROM QUEUE: closure=%p error=%s",
+ closure, grpc_error_string(closure->error_data.error));
+ }
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, closure->error_data.error);
+ break;
+ }
+ } else if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG, " queue empty");
+ }
+}
+
+void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx,
+ grpc_call_combiner* call_combiner,
+ grpc_closure* closure) {
+ while (true) {
+ // Decode original state.
+ gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state);
+ grpc_error* original_error = decode_cancel_state_error(original_state);
+ // If error is set, invoke the cancellation closure immediately.
+ // Otherwise, store the new closure.
+ if (original_error != GRPC_ERROR_NONE) {
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_REF(original_error));
+ break;
+ } else {
+ if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state,
+ (gpr_atm)closure)) {
+ break;
+ }
+ }
+ // cas failed, try again.
+ }
+}
+
+void grpc_call_combiner_cancel(grpc_exec_ctx* exec_ctx,
+ grpc_call_combiner* call_combiner,
+ grpc_error* error) {
+ while (true) {
+ gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state);
+ grpc_error* original_error = decode_cancel_state_error(original_state);
+ if (original_error != GRPC_ERROR_NONE) {
+ GRPC_ERROR_UNREF(error);
+ break;
+ }
+ if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state,
+ encode_cancel_state_error(error))) {
+ if (original_state != 0) {
+ grpc_closure* notify_on_cancel = (grpc_closure*)original_state;
+ if (GRPC_TRACER_ON(grpc_call_combiner_trace)) {
+ gpr_log(GPR_DEBUG,
+ "call_combiner=%p: scheduling notify_on_cancel callback=%p",
+ call_combiner, notify_on_cancel);
+ }
+ GRPC_CLOSURE_SCHED(exec_ctx, notify_on_cancel, GRPC_ERROR_REF(error));
+ }
+ break;
+ }
+ // cas failed, try again.
+ }
+}
diff --git a/src/core/lib/iomgr/call_combiner.h b/src/core/lib/iomgr/call_combiner.h
new file mode 100644
index 0000000000..621e2c3669
--- /dev/null
+++ b/src/core/lib/iomgr/call_combiner.h
@@ -0,0 +1,104 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H
+#define GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H
+
+#include <stddef.h>
+
+#include <grpc/support/atm.h>
+
+#include "src/core/lib/iomgr/closure.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/support/mpscq.h"
+
+// A simple, lock-free mechanism for serializing activity related to a
+// single call. This is similar to a combiner but is more lightweight.
+//
+// It requires the callback (or, in the common case where the callback
+// actually kicks off a chain of callbacks, the last callback in that
+// chain) to explicitly indicate (by calling GRPC_CALL_COMBINER_STOP())
+// when it is done with the action that was kicked off by the original
+// callback.
+
+extern grpc_tracer_flag grpc_call_combiner_trace;
+
+typedef struct {
+ gpr_atm size; // size_t, num closures in queue or currently executing
+ gpr_mpscq queue;
+ // Either 0 (if not cancelled and no cancellation closure set),
+ // a grpc_closure* (if the lowest bit is 0),
+ // or a grpc_error* (if the lowest bit is 1).
+ gpr_atm cancel_state;
+} grpc_call_combiner;
+
+// Assumes memory was initialized to zero.
+void grpc_call_combiner_init(grpc_call_combiner* call_combiner);
+
+void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner);
+
+#ifndef NDEBUG
+#define GRPC_CALL_COMBINER_START(exec_ctx, call_combiner, closure, error, \
+ reason) \
+ grpc_call_combiner_start((exec_ctx), (call_combiner), (closure), (error), \
+ __FILE__, __LINE__, (reason))
+#define GRPC_CALL_COMBINER_STOP(exec_ctx, call_combiner, reason) \
+ grpc_call_combiner_stop((exec_ctx), (call_combiner), __FILE__, __LINE__, \
+ (reason))
+/// Starts processing \a closure on \a call_combiner.
+void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx,
+ grpc_call_combiner* call_combiner,
+ grpc_closure* closure, grpc_error* error,
+ const char* file, int line, const char* reason);
+/// Yields the call combiner to the next closure in the queue, if any.
+void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx,
+ grpc_call_combiner* call_combiner,
+ const char* file, int line, const char* reason);
+#else
+#define GRPC_CALL_COMBINER_START(exec_ctx, call_combiner, closure, error, \
+ reason) \
+ grpc_call_combiner_start((exec_ctx), (call_combiner), (closure), (error), \
+ (reason))
+#define GRPC_CALL_COMBINER_STOP(exec_ctx, call_combiner, reason) \
+ grpc_call_combiner_stop((exec_ctx), (call_combiner), (reason))
+/// Starts processing \a closure on \a call_combiner.
+void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx,
+ grpc_call_combiner* call_combiner,
+ grpc_closure* closure, grpc_error* error,
+ const char* reason);
+/// Yields the call combiner to the next closure in the queue, if any.
+void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx,
+ grpc_call_combiner* call_combiner,
+ const char* reason);
+#endif
+
+/// Tells \a call_combiner to invoke \a closure when
+/// grpc_call_combiner_cancel() is called. If grpc_call_combiner_cancel()
+/// was previously called, \a closure will be invoked immediately.
+/// If \a closure is NULL, then no closure will be invoked on
+/// cancellation; this effectively unregisters the previously set closure.
+void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx,
+ grpc_call_combiner* call_combiner,
+ grpc_closure* closure);
+
+/// Indicates that the call has been cancelled.
+void grpc_call_combiner_cancel(grpc_exec_ctx* exec_ctx,
+ grpc_call_combiner* call_combiner,
+ grpc_error* error);
+
+#endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */
diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c
index 531a88434f..e3f0163a6c 100644
--- a/src/core/lib/security/transport/client_auth_filter.c
+++ b/src/core/lib/security/transport/client_auth_filter.c
@@ -39,6 +39,7 @@
/* We can have a per-call credentials. */
typedef struct {
+ grpc_call_combiner *call_combiner;
grpc_call_credentials *creds;
bool have_host;
bool have_method;
@@ -49,17 +50,11 @@ typedef struct {
pollset_set so that work can progress when this call wants work to progress
*/
grpc_polling_entity *pollent;
- gpr_atm security_context_set;
- gpr_mu security_context_mu;
grpc_credentials_mdelem_array md_array;
grpc_linked_mdelem md_links[MAX_CREDENTIALS_METADATA_COUNT];
grpc_auth_metadata_context auth_md_context;
- grpc_closure closure;
- // Either 0 (no cancellation and no async operation in flight),
- // a grpc_closure* (if the lowest bit is 0),
- // or a grpc_error* (if the lowest bit is 1).
- gpr_atm cancellation_state;
- grpc_closure cancel_closure;
+ grpc_closure async_cancel_closure;
+ grpc_closure async_result_closure;
} call_data;
/* We can have a per-channel credentials. */
@@ -68,43 +63,6 @@ typedef struct {
grpc_auth_context *auth_context;
} channel_data;
-static void decode_cancel_state(gpr_atm cancel_state, grpc_closure **func,
- grpc_error **error) {
- // If the lowest bit is 1, the value is a grpc_error*.
- // Otherwise, if non-zdero, the value is a grpc_closure*.
- if (cancel_state & 1) {
- *error = (grpc_error *)(cancel_state & ~(gpr_atm)1);
- } else if (cancel_state != 0) {
- *func = (grpc_closure *)cancel_state;
- }
-}
-
-static gpr_atm encode_cancel_state_error(grpc_error *error) {
- // Set the lowest bit to 1 to indicate that it's an error.
- return (gpr_atm)1 | (gpr_atm)error;
-}
-
-// Returns an error if the call has been cancelled. Otherwise, sets the
-// cancellation function to be called upon cancellation.
-static grpc_error *set_cancel_func(grpc_call_element *elem,
- grpc_iomgr_cb_func func) {
- call_data *calld = (call_data *)elem->call_data;
- // Decode original state.
- gpr_atm original_state = gpr_atm_acq_load(&calld->cancellation_state);
- grpc_error *original_error = GRPC_ERROR_NONE;
- grpc_closure *original_func = NULL;
- decode_cancel_state(original_state, &original_func, &original_error);
- // If error is set, return it.
- if (original_error != GRPC_ERROR_NONE) return GRPC_ERROR_REF(original_error);
- // Otherwise, store func.
- GRPC_CLOSURE_INIT(&calld->cancel_closure, func, elem,
- grpc_schedule_on_exec_ctx);
- GPR_ASSERT(((gpr_atm)&calld->cancel_closure & (gpr_atm)1) == 0);
- gpr_atm_rel_store(&calld->cancellation_state,
- (gpr_atm)&calld->cancel_closure);
- return GRPC_ERROR_NONE;
-}
-
static void reset_auth_metadata_context(
grpc_auth_metadata_context *auth_md_context) {
if (auth_md_context->service_url != NULL) {
@@ -135,6 +93,7 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *arg,
grpc_transport_stream_op_batch *batch = (grpc_transport_stream_op_batch *)arg;
grpc_call_element *elem = batch->handler_private.extra_arg;
call_data *calld = elem->call_data;
+ grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner, NULL);
reset_auth_metadata_context(&calld->auth_md_context);
grpc_error *error = GRPC_ERROR_REF(input_error);
if (error == GRPC_ERROR_NONE) {
@@ -153,7 +112,8 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *arg,
} else {
error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAUTHENTICATED);
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, error);
+ grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, error,
+ calld->call_combiner);
}
}
@@ -223,7 +183,8 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx,
grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Incompatible credentials set on channel and call."),
- GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED));
+ GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED),
+ calld->call_combiner);
return;
}
} else {
@@ -234,22 +195,23 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx,
build_auth_metadata_context(&chand->security_connector->base,
chand->auth_context, calld);
- grpc_error *cancel_error = set_cancel_func(elem, cancel_get_request_metadata);
- if (cancel_error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch,
- cancel_error);
- return;
- }
GPR_ASSERT(calld->pollent != NULL);
- GRPC_CLOSURE_INIT(&calld->closure, on_credentials_metadata, batch,
- grpc_schedule_on_exec_ctx);
+
+ GRPC_CLOSURE_INIT(&calld->async_result_closure, on_credentials_metadata,
+ batch, grpc_schedule_on_exec_ctx);
grpc_error *error = GRPC_ERROR_NONE;
if (grpc_call_credentials_get_request_metadata(
exec_ctx, calld->creds, calld->pollent, calld->auth_md_context,
- &calld->md_array, &calld->closure, &error)) {
+ &calld->md_array, &calld->async_result_closure, &error)) {
// Synchronous return; invoke on_credentials_metadata() directly.
on_credentials_metadata(exec_ctx, batch, error);
GRPC_ERROR_UNREF(error);
+ } else {
+ // Async return; register cancellation closure with call combiner.
+ GRPC_CLOSURE_INIT(&calld->async_cancel_closure, cancel_get_request_metadata,
+ elem, grpc_schedule_on_exec_ctx);
+ grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner,
+ &calld->async_cancel_closure);
}
}
@@ -258,7 +220,7 @@ static void on_host_checked(grpc_exec_ctx *exec_ctx, void *arg,
grpc_transport_stream_op_batch *batch = (grpc_transport_stream_op_batch *)arg;
grpc_call_element *elem = batch->handler_private.extra_arg;
call_data *calld = elem->call_data;
-
+ grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner, NULL);
if (error == GRPC_ERROR_NONE) {
send_security_metadata(exec_ctx, elem, batch);
} else {
@@ -271,7 +233,8 @@ static void on_host_checked(grpc_exec_ctx *exec_ctx, void *arg,
exec_ctx, batch,
grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg),
GRPC_ERROR_INT_GRPC_STATUS,
- GRPC_STATUS_UNAUTHENTICATED));
+ GRPC_STATUS_UNAUTHENTICATED),
+ calld->call_combiner);
gpr_free(error_msg);
}
}
@@ -282,7 +245,7 @@ static void cancel_check_call_host(grpc_exec_ctx *exec_ctx, void *arg,
call_data *calld = (call_data *)elem->call_data;
channel_data *chand = (channel_data *)elem->channel_data;
grpc_channel_security_connector_cancel_check_call_host(
- exec_ctx, chand->security_connector, &calld->closure,
+ exec_ctx, chand->security_connector, &calld->async_result_closure,
GRPC_ERROR_REF(error));
}
@@ -295,52 +258,19 @@ static void auth_start_transport_stream_op_batch(
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- if (batch->cancel_stream) {
- while (true) {
- // Decode the original cancellation state.
- gpr_atm original_state = gpr_atm_acq_load(&calld->cancellation_state);
- grpc_error *cancel_error = GRPC_ERROR_NONE;
- grpc_closure *func = NULL;
- decode_cancel_state(original_state, &func, &cancel_error);
- // If we had already set a cancellation error, there's nothing
- // more to do.
- if (cancel_error != GRPC_ERROR_NONE) break;
- // If there's a cancel func, call it.
- // Note that even if the cancel func has been changed by some
- // other thread between when we decoded it and now, it will just
- // be a no-op.
- cancel_error = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error);
- if (func != NULL) {
- GRPC_CLOSURE_SCHED(exec_ctx, func, GRPC_ERROR_REF(cancel_error));
- }
- // Encode the new error into cancellation state.
- if (gpr_atm_full_cas(&calld->cancellation_state, original_state,
- encode_cancel_state_error(cancel_error))) {
- break; // Success.
- }
- // The cas failed, so try again.
- }
- } else {
- /* double checked lock over security context to ensure it's set once */
- if (gpr_atm_acq_load(&calld->security_context_set) == 0) {
- gpr_mu_lock(&calld->security_context_mu);
- if (gpr_atm_acq_load(&calld->security_context_set) == 0) {
- GPR_ASSERT(batch->payload->context != NULL);
- if (batch->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) {
- batch->payload->context[GRPC_CONTEXT_SECURITY].value =
- grpc_client_security_context_create();
- batch->payload->context[GRPC_CONTEXT_SECURITY].destroy =
- grpc_client_security_context_destroy;
- }
- grpc_client_security_context *sec_ctx =
- batch->payload->context[GRPC_CONTEXT_SECURITY].value;
- GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter");
- sec_ctx->auth_context =
- GRPC_AUTH_CONTEXT_REF(chand->auth_context, "client_auth_filter");
- gpr_atm_rel_store(&calld->security_context_set, 1);
- }
- gpr_mu_unlock(&calld->security_context_mu);
+ if (!batch->cancel_stream) {
+ GPR_ASSERT(batch->payload->context != NULL);
+ if (batch->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) {
+ batch->payload->context[GRPC_CONTEXT_SECURITY].value =
+ grpc_client_security_context_create();
+ batch->payload->context[GRPC_CONTEXT_SECURITY].destroy =
+ grpc_client_security_context_destroy;
}
+ grpc_client_security_context *sec_ctx =
+ batch->payload->context[GRPC_CONTEXT_SECURITY].value;
+ GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter");
+ sec_ctx->auth_context =
+ GRPC_AUTH_CONTEXT_REF(chand->auth_context, "client_auth_filter");
}
if (batch->send_initial_metadata) {
@@ -365,26 +295,25 @@ static void auth_start_transport_stream_op_batch(
}
}
if (calld->have_host) {
- grpc_error *cancel_error = set_cancel_func(elem, cancel_check_call_host);
- if (cancel_error != GRPC_ERROR_NONE) {
- grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch,
- cancel_error);
+ batch->handler_private.extra_arg = elem;
+ GRPC_CLOSURE_INIT(&calld->async_result_closure, on_host_checked, batch,
+ grpc_schedule_on_exec_ctx);
+ char *call_host = grpc_slice_to_c_string(calld->host);
+ grpc_error *error = GRPC_ERROR_NONE;
+ if (grpc_channel_security_connector_check_call_host(
+ exec_ctx, chand->security_connector, call_host,
+ chand->auth_context, &calld->async_result_closure, &error)) {
+ // Synchronous return; invoke on_host_checked() directly.
+ on_host_checked(exec_ctx, batch, error);
+ GRPC_ERROR_UNREF(error);
} else {
- char *call_host = grpc_slice_to_c_string(calld->host);
- batch->handler_private.extra_arg = elem;
- grpc_error *error = GRPC_ERROR_NONE;
- if (grpc_channel_security_connector_check_call_host(
- exec_ctx, chand->security_connector, call_host,
- chand->auth_context,
- GRPC_CLOSURE_INIT(&calld->closure, on_host_checked, batch,
- grpc_schedule_on_exec_ctx),
- &error)) {
- // Synchronous return; invoke on_host_checked() directly.
- on_host_checked(exec_ctx, batch, error);
- GRPC_ERROR_UNREF(error);
- }
- gpr_free(call_host);
+ // Async return; register cancellation closure with call combiner.
+ GRPC_CLOSURE_INIT(&calld->async_cancel_closure, cancel_check_call_host,
+ elem, grpc_schedule_on_exec_ctx);
+ grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner,
+ &calld->async_cancel_closure);
}
+ gpr_free(call_host);
GPR_TIMER_END("auth_start_transport_stream_op_batch", 0);
return; /* early exit */
}
@@ -400,8 +329,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_element_args *args) {
call_data *calld = elem->call_data;
- memset(calld, 0, sizeof(*calld));
- gpr_mu_init(&calld->security_context_mu);
+ calld->call_combiner = args->call_combiner;
return GRPC_ERROR_NONE;
}
@@ -426,12 +354,6 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_slice_unref_internal(exec_ctx, calld->method);
}
reset_auth_metadata_context(&calld->auth_md_context);
- gpr_mu_destroy(&calld->security_context_mu);
- gpr_atm cancel_state = gpr_atm_acq_load(&calld->cancellation_state);
- grpc_error *cancel_error = GRPC_ERROR_NONE;
- grpc_closure *cancel_func = NULL;
- decode_cancel_state(cancel_state, &cancel_func, &cancel_error);
- GRPC_ERROR_UNREF(cancel_error);
}
/* Constructor for channel_data */
@@ -490,6 +412,5 @@ const grpc_channel_filter grpc_client_auth_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
- grpc_call_next_get_peer,
grpc_channel_next_get_info,
"client-auth"};
diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c
index 9bf3f0ca0f..b721ce4a22 100644
--- a/src/core/lib/security/transport/server_auth_filter.c
+++ b/src/core/lib/security/transport/server_auth_filter.c
@@ -26,7 +26,15 @@
#include "src/core/lib/security/transport/auth_filters.h"
#include "src/core/lib/slice/slice_internal.h"
+typedef enum {
+ STATE_INIT = 0,
+ STATE_DONE,
+ STATE_CANCELLED,
+} async_state;
+
typedef struct call_data {
+ grpc_call_combiner *call_combiner;
+ grpc_call_stack *owning_call;
grpc_transport_stream_op_batch *recv_initial_metadata_batch;
grpc_closure *original_recv_initial_metadata_ready;
grpc_closure recv_initial_metadata_ready;
@@ -34,6 +42,8 @@ typedef struct call_data {
const grpc_metadata *consumed_md;
size_t num_consumed_md;
grpc_auth_context *auth_context;
+ grpc_closure cancel_closure;
+ gpr_atm state; // async_state
} call_data;
typedef struct channel_data {
@@ -78,54 +88,92 @@ static grpc_filtered_mdelem remove_consumed_md(grpc_exec_ctx *exec_ctx,
return GRPC_FILTERED_MDELEM(md);
}
-/* called from application code */
-static void on_md_processing_done(
- void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md,
- const grpc_metadata *response_md, size_t num_response_md,
- grpc_status_code status, const char *error_details) {
- grpc_call_element *elem = user_data;
+static void on_md_processing_done_inner(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ const grpc_metadata *consumed_md,
+ size_t num_consumed_md,
+ const grpc_metadata *response_md,
+ size_t num_response_md,
+ grpc_error *error) {
call_data *calld = elem->call_data;
grpc_transport_stream_op_batch *batch = calld->recv_initial_metadata_batch;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner, NULL);
/* TODO(jboeuf): Implement support for response_md. */
if (response_md != NULL && num_response_md > 0) {
gpr_log(GPR_INFO,
"response_md in auth metadata processing not supported for now. "
"Ignoring...");
}
- grpc_error *error = GRPC_ERROR_NONE;
- if (status == GRPC_STATUS_OK) {
+ if (error == GRPC_ERROR_NONE) {
calld->consumed_md = consumed_md;
calld->num_consumed_md = num_consumed_md;
error = grpc_metadata_batch_filter(
- &exec_ctx, batch->payload->recv_initial_metadata.recv_initial_metadata,
+ exec_ctx, batch->payload->recv_initial_metadata.recv_initial_metadata,
remove_consumed_md, elem, "Response metadata filtering error");
- } else {
- if (error_details == NULL) {
- error_details = "Authentication metadata processing failed.";
+ }
+ GRPC_CLOSURE_SCHED(exec_ctx, calld->original_recv_initial_metadata_ready,
+ error);
+}
+
+// Called from application code.
+static void on_md_processing_done(
+ void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md,
+ const grpc_metadata *response_md, size_t num_response_md,
+ grpc_status_code status, const char *error_details) {
+ grpc_call_element *elem = user_data;
+ call_data *calld = elem->call_data;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ // If the call was not cancelled while we were in flight, process the result.
+ if (gpr_atm_full_cas(&calld->state, (gpr_atm)STATE_INIT,
+ (gpr_atm)STATE_DONE)) {
+ grpc_error *error = GRPC_ERROR_NONE;
+ if (status != GRPC_STATUS_OK) {
+ if (error_details == NULL) {
+ error_details = "Authentication metadata processing failed.";
+ }
+ error = grpc_error_set_int(
+ GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_details),
+ GRPC_ERROR_INT_GRPC_STATUS, status);
}
- error =
- grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_details),
- GRPC_ERROR_INT_GRPC_STATUS, status);
+ on_md_processing_done_inner(&exec_ctx, elem, consumed_md, num_consumed_md,
+ response_md, num_response_md, error);
}
+ // Clean up.
for (size_t i = 0; i < calld->md.count; i++) {
grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].key);
grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].value);
}
grpc_metadata_array_destroy(&calld->md);
- GRPC_CLOSURE_SCHED(&exec_ctx, calld->original_recv_initial_metadata_ready,
- error);
+ GRPC_CALL_STACK_UNREF(&exec_ctx, calld->owning_call, "server_auth_metadata");
grpc_exec_ctx_finish(&exec_ctx);
}
+static void cancel_call(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+ grpc_call_element *elem = (grpc_call_element *)arg;
+ call_data *calld = elem->call_data;
+ // If the result was not already processed, invoke the callback now.
+ if (gpr_atm_full_cas(&calld->state, (gpr_atm)STATE_INIT,
+ (gpr_atm)STATE_CANCELLED)) {
+ on_md_processing_done_inner(exec_ctx, elem, NULL, 0, NULL, 0,
+ GRPC_ERROR_REF(error));
+ }
+}
+
static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- grpc_call_element *elem = arg;
+ grpc_call_element *elem = (grpc_call_element *)arg;
channel_data *chand = elem->channel_data;
call_data *calld = elem->call_data;
grpc_transport_stream_op_batch *batch = calld->recv_initial_metadata_batch;
if (error == GRPC_ERROR_NONE) {
if (chand->creds != NULL && chand->creds->processor.process != NULL) {
+ // We're calling out to the application, so we need to make sure
+ // to drop the call combiner early if we get cancelled.
+ GRPC_CLOSURE_INIT(&calld->cancel_closure, cancel_call, elem,
+ grpc_schedule_on_exec_ctx);
+ grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner,
+ &calld->cancel_closure);
+ GRPC_CALL_STACK_REF(calld->owning_call, "server_auth_metadata");
calld->md = metadata_batch_to_md_array(
batch->payload->recv_initial_metadata.recv_initial_metadata);
chand->creds->processor.process(
@@ -159,6 +207,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
const grpc_call_element_args *args) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
+ calld->call_combiner = args->call_combiner;
+ calld->owning_call = args->call_stack;
GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready,
recv_initial_metadata_ready, elem,
grpc_schedule_on_exec_ctx);
@@ -218,6 +268,5 @@ const grpc_channel_filter grpc_server_auth_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
- grpc_call_next_get_peer,
grpc_channel_next_get_info,
"server-auth"};
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index a0ac9ae7d5..4a7152ea60 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -121,6 +121,7 @@ typedef struct batch_control {
bool is_closure;
} notify_tag;
} completion_data;
+ grpc_closure start_batch;
grpc_closure finish_batch;
gpr_refcount steps_to_complete;
@@ -147,6 +148,7 @@ typedef struct {
struct grpc_call {
gpr_refcount ext_ref;
gpr_arena *arena;
+ grpc_call_combiner call_combiner;
grpc_completion_queue *cq;
grpc_polling_entity pollent;
grpc_channel *channel;
@@ -183,6 +185,11 @@ struct grpc_call {
Element 0 is initial metadata, element 1 is trailing metadata. */
grpc_metadata_array *buffered_metadata[2];
+ grpc_metadata compression_md;
+
+ // A char* indicating the peer name.
+ gpr_atm peer_string;
+
/* Packed received call statuses from various sources */
gpr_atm status[STATUS_SOURCE_COUNT];
@@ -245,8 +252,9 @@ grpc_tracer_flag grpc_compression_trace =
#define CALL_FROM_TOP_ELEM(top_elem) \
CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
-static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
- grpc_transport_stream_op_batch *op);
+static void execute_batch(grpc_exec_ctx *exec_ctx, grpc_call *call,
+ grpc_transport_stream_op_batch *op,
+ grpc_closure *start_batch_closure);
static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
status_source source, grpc_status_code status,
const char *description);
@@ -311,6 +319,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
sizeof(grpc_call) + channel_stack->call_stack_size);
gpr_ref_init(&call->ext_ref, 1);
call->arena = arena;
+ grpc_call_combiner_init(&call->call_combiner);
*out_call = call;
call->channel = args->channel;
call->cq = args->cq;
@@ -414,7 +423,8 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx,
.path = path,
.start_time = call->start_time,
.deadline = send_deadline,
- .arena = call->arena};
+ .arena = call->arena,
+ .call_combiner = &call->call_combiner};
add_init_error(&error, grpc_call_stack_init(exec_ctx, channel_stack, 1,
destroy_call, call, &call_args));
if (error != GRPC_ERROR_NONE) {
@@ -481,6 +491,8 @@ static void release_call(grpc_exec_ctx *exec_ctx, void *call,
grpc_error *error) {
grpc_call *c = call;
grpc_channel *channel = c->channel;
+ grpc_call_combiner_destroy(&c->call_combiner);
+ gpr_free((char *)c->peer_string);
grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena));
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call");
}
@@ -580,30 +592,37 @@ grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) {
return GRPC_CALL_OK;
}
-static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
- grpc_transport_stream_op_batch *op) {
- grpc_call_element *elem;
-
- GPR_TIMER_BEGIN("execute_op", 0);
- elem = CALL_ELEM_FROM_CALL(call, 0);
- elem->filter->start_transport_stream_op_batch(exec_ctx, elem, op);
- GPR_TIMER_END("execute_op", 0);
+// This is called via the call combiner to start sending a batch down
+// the filter stack.
+static void execute_batch_in_call_combiner(grpc_exec_ctx *exec_ctx, void *arg,
+ grpc_error *ignored) {
+ grpc_transport_stream_op_batch *batch = arg;
+ grpc_call *call = batch->handler_private.extra_arg;
+ GPR_TIMER_BEGIN("execute_batch", 0);
+ grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0);
+ GRPC_CALL_LOG_OP(GPR_INFO, elem, batch);
+ elem->filter->start_transport_stream_op_batch(exec_ctx, elem, batch);
+ GPR_TIMER_END("execute_batch", 0);
+}
+
+// start_batch_closure points to a caller-allocated closure to be used
+// for entering the call combiner.
+static void execute_batch(grpc_exec_ctx *exec_ctx, grpc_call *call,
+ grpc_transport_stream_op_batch *batch,
+ grpc_closure *start_batch_closure) {
+ batch->handler_private.extra_arg = call;
+ GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch,
+ grpc_schedule_on_exec_ctx);
+ GRPC_CALL_COMBINER_START(exec_ctx, &call->call_combiner, start_batch_closure,
+ GRPC_ERROR_NONE, "executing batch");
}
char *grpc_call_get_peer(grpc_call *call) {
- grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0);
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- char *result;
- GRPC_API_TRACE("grpc_call_get_peer(%p)", 1, (call));
- result = elem->filter->get_peer(&exec_ctx, elem);
- if (result == NULL) {
- result = grpc_channel_get_target(call->channel);
- }
- if (result == NULL) {
- result = gpr_strdup("unknown");
- }
- grpc_exec_ctx_finish(&exec_ctx);
- return result;
+ char *peer_string = (char *)gpr_atm_acq_load(&call->peer_string);
+ if (peer_string != NULL) return gpr_strdup(peer_string);
+ peer_string = grpc_channel_get_target(call->channel);
+ if (peer_string != NULL) return peer_string;
+ return gpr_strdup("unknown");
}
grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
@@ -630,20 +649,41 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
return GRPC_CALL_OK;
}
-static void done_termination(grpc_exec_ctx *exec_ctx, void *call,
+typedef struct {
+ grpc_call *call;
+ grpc_closure start_batch;
+ grpc_closure finish_batch;
+} cancel_state;
+
+// The on_complete callback used when sending a cancel_stream batch down
+// the filter stack. Yields the call combiner when the batch is done.
+static void done_termination(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
- GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "termination");
+ cancel_state *state = (cancel_state *)arg;
+ GRPC_CALL_COMBINER_STOP(exec_ctx, &state->call->call_combiner,
+ "on_complete for cancel_stream op");
+ GRPC_CALL_INTERNAL_UNREF(exec_ctx, state->call, "termination");
+ gpr_free(state);
}
static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c,
status_source source, grpc_error *error) {
GRPC_CALL_INTERNAL_REF(c, "termination");
+ // Inform the call combiner of the cancellation, so that it can cancel
+ // any in-flight asynchronous actions that may be holding the call
+ // combiner. This ensures that the cancel_stream batch can be sent
+ // down the filter stack in a timely manner.
+ grpc_call_combiner_cancel(exec_ctx, &c->call_combiner, GRPC_ERROR_REF(error));
set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error));
- grpc_transport_stream_op_batch *op = grpc_make_transport_stream_op(
- GRPC_CLOSURE_CREATE(done_termination, c, grpc_schedule_on_exec_ctx));
+ cancel_state *state = (cancel_state *)gpr_malloc(sizeof(*state));
+ state->call = c;
+ GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state,
+ grpc_schedule_on_exec_ctx);
+ grpc_transport_stream_op_batch *op =
+ grpc_make_transport_stream_op(&state->finish_batch);
op->cancel_stream = true;
op->payload->cancel_stream.cancel_error = error;
- execute_op(exec_ctx, c, op);
+ execute_batch(exec_ctx, c, op, &state->start_batch);
}
static grpc_error *error_from_status(grpc_status_code status,
@@ -1408,6 +1448,18 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
}
}
+// The recv_message_ready callback used when sending a batch containing
+// a recv_message op down the filter stack. Yields the call combiner
+// before processing the received message.
+static void receiving_stream_ready_in_call_combiner(grpc_exec_ctx *exec_ctx,
+ void *bctlp,
+ grpc_error *error) {
+ batch_control *bctl = bctlp;
+ grpc_call *call = bctl->call;
+ GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, "recv_message_ready");
+ receiving_stream_ready(exec_ctx, bctlp, error);
+}
+
static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
batch_control *bctl) {
grpc_call *call = bctl->call;
@@ -1514,6 +1566,9 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
+ GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner,
+ "recv_initial_metadata_ready");
+
add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
if (error == GRPC_ERROR_NONE) {
grpc_metadata_batch *md =
@@ -1548,7 +1603,8 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_error *error) {
batch_control *bctl = bctlp;
-
+ grpc_call *call = bctl->call;
+ GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, "on_complete");
add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
finish_batch_step(exec_ctx, bctl);
}
@@ -1568,9 +1624,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
int num_completion_callbacks_needed = 1;
grpc_call_error error = GRPC_CALL_OK;
- // sent_initial_metadata guards against variable reuse.
- grpc_metadata compression_md;
-
GPR_TIMER_BEGIN("grpc_call_start_batch", 0);
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag);
@@ -1618,7 +1671,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
goto done_with_error;
}
/* process compression level */
- memset(&compression_md, 0, sizeof(compression_md));
+ memset(&call->compression_md, 0, sizeof(call->compression_md));
size_t additional_metadata_count = 0;
grpc_compression_level effective_compression_level =
GRPC_COMPRESS_LEVEL_NONE;
@@ -1656,9 +1709,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
const grpc_stream_compression_algorithm calgo =
stream_compression_algorithm_for_level_locked(
call, effective_stream_compression_level);
- compression_md.key =
+ call->compression_md.key =
GRPC_MDSTR_GRPC_INTERNAL_STREAM_ENCODING_REQUEST;
- compression_md.value =
+ call->compression_md.value =
grpc_stream_compression_algorithm_slice(calgo);
} else {
const grpc_compression_algorithm calgo =
@@ -1666,8 +1719,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
call, effective_compression_level);
/* the following will be picked up by the compress filter and used
* as the call's compression algorithm. */
- compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
- compression_md.value = grpc_compression_algorithm_slice(calgo);
+ call->compression_md.key =
+ GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
+ call->compression_md.value =
+ grpc_compression_algorithm_slice(calgo);
additional_metadata_count++;
}
}
@@ -1682,7 +1737,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
if (!prepare_application_metadata(
exec_ctx, call, (int)op->data.send_initial_metadata.count,
op->data.send_initial_metadata.metadata, 0, call->is_client,
- &compression_md, (int)additional_metadata_count)) {
+ &call->compression_md, (int)additional_metadata_count)) {
error = GRPC_CALL_ERROR_INVALID_METADATA;
goto done_with_error;
}
@@ -1694,6 +1749,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
&call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
stream_op_payload->send_initial_metadata.send_initial_metadata_flags =
op->flags;
+ if (call->is_client) {
+ stream_op_payload->send_initial_metadata.peer_string =
+ &call->peer_string;
+ }
break;
case GRPC_OP_SEND_MESSAGE:
if (!are_write_flags_valid(op->flags)) {
@@ -1826,6 +1885,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready =
&call->receiving_initial_metadata_ready;
+ if (!call->is_client) {
+ stream_op_payload->recv_initial_metadata.peer_string =
+ &call->peer_string;
+ }
num_completion_callbacks_needed++;
break;
case GRPC_OP_RECV_MESSAGE:
@@ -1842,8 +1905,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
stream_op->recv_message = true;
call->receiving_buffer = op->data.recv_message.recv_message;
stream_op_payload->recv_message.recv_message = &call->receiving_stream;
- GRPC_CLOSURE_INIT(&call->receiving_stream_ready, receiving_stream_ready,
- bctl, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&call->receiving_stream_ready,
+ receiving_stream_ready_in_call_combiner, bctl,
+ grpc_schedule_on_exec_ctx);
stream_op_payload->recv_message.recv_message_ready =
&call->receiving_stream_ready;
num_completion_callbacks_needed++;
@@ -1913,7 +1977,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
stream_op->on_complete = &bctl->finish_batch;
gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
- execute_op(exec_ctx, call, stream_op);
+ execute_batch(exec_ctx, call, stream_op, &bctl->start_batch);
done:
GPR_TIMER_END("grpc_call_start_batch", 0);
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index d199ac060e..75a13d28fc 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -30,6 +30,7 @@
#include "src/core/lib/channel/handshaker_registry.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/http/parser.h"
+#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/combiner.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/iomgr/iomgr.h"
@@ -127,6 +128,7 @@ void grpc_init(void) {
grpc_register_tracer(&grpc_trace_channel_stack_builder);
grpc_register_tracer(&grpc_http1_trace);
grpc_register_tracer(&grpc_cq_pluck_trace); // default on
+ grpc_register_tracer(&grpc_call_combiner_trace);
grpc_register_tracer(&grpc_combiner_trace);
grpc_register_tracer(&grpc_server_channel_trace);
grpc_register_tracer(&grpc_bdp_estimator_trace);
diff --git a/src/core/lib/surface/lame_client.cc b/src/core/lib/surface/lame_client.cc
index a0791080a9..6286f9159d 100644
--- a/src/core/lib/surface/lame_client.cc
+++ b/src/core/lib/surface/lame_client.cc
@@ -40,6 +40,7 @@ namespace grpc_core {
namespace {
struct CallData {
+ grpc_call_combiner *call_combiner;
grpc_linked_mdelem status;
grpc_linked_mdelem details;
grpc_core::atomic<bool> filled_metadata;
@@ -52,14 +53,14 @@ struct ChannelData {
static void fill_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_metadata_batch *mdb) {
- CallData *calld = static_cast<CallData *>(elem->call_data);
+ CallData *calld = reinterpret_cast<CallData *>(elem->call_data);
bool expected = false;
if (!calld->filled_metadata.compare_exchange_strong(
expected, true, grpc_core::memory_order_relaxed,
grpc_core::memory_order_relaxed)) {
return;
}
- ChannelData *chand = static_cast<ChannelData *>(elem->channel_data);
+ ChannelData *chand = reinterpret_cast<ChannelData *>(elem->channel_data);
char tmp[GPR_LTOA_MIN_BUFSIZE];
gpr_ltoa(chand->error_code, tmp);
calld->status.md = grpc_mdelem_from_slices(
@@ -79,6 +80,7 @@ static void fill_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
static void lame_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op_batch *op) {
+ CallData *calld = reinterpret_cast<CallData *>(elem->call_data);
if (op->recv_initial_metadata) {
fill_metadata(exec_ctx, elem,
op->payload->recv_initial_metadata.recv_initial_metadata);
@@ -87,12 +89,8 @@ static void lame_start_transport_stream_op_batch(
op->payload->recv_trailing_metadata.recv_trailing_metadata);
}
grpc_transport_stream_op_batch_finish_with_failure(
- exec_ctx, op,
- GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"));
-}
-
-static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
- return NULL;
+ exec_ctx, op, GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"),
+ calld->call_combiner);
}
static void lame_get_channel_info(grpc_exec_ctx *exec_ctx,
@@ -122,6 +120,8 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx,
static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
const grpc_call_element_args *args) {
+ CallData *calld = reinterpret_cast<CallData *>(elem->call_data);
+ calld->call_combiner = args->call_combiner;
return GRPC_ERROR_NONE;
}
@@ -156,7 +156,6 @@ extern "C" const grpc_channel_filter grpc_lame_filter = {
sizeof(grpc_core::ChannelData),
grpc_core::init_channel_elem,
grpc_core::destroy_channel_elem,
- grpc_core::lame_get_peer,
grpc_core::lame_get_channel_info,
"lame-client",
};
@@ -176,7 +175,7 @@ grpc_channel *grpc_lame_client_channel_create(const char *target,
"error_message=%s)",
3, (target, (int)error_code, error_message));
GPR_ASSERT(elem->filter == &grpc_lame_filter);
- auto chand = static_cast<grpc_core::ChannelData *>(elem->channel_data);
+ auto chand = reinterpret_cast<grpc_core::ChannelData *>(elem->channel_data);
chand->error_code = error_code;
chand->error_message = error_message;
grpc_exec_ctx_finish(&exec_ctx);
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 66dcc299aa..8582d826ca 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -789,7 +789,6 @@ static void server_mutate_op(grpc_call_element *elem,
static void server_start_transport_stream_op_batch(
grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op_batch *op) {
- GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
server_mutate_op(elem, op);
grpc_call_next_op(exec_ctx, elem, op);
}
@@ -962,7 +961,6 @@ const grpc_channel_filter grpc_server_top_filter = {
sizeof(channel_data),
init_channel_elem,
destroy_channel_elem,
- grpc_call_next_get_peer,
grpc_channel_next_get_info,
"server",
};
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index 6c61f4b8d9..650b0559aa 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -197,11 +197,6 @@ void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx,
then_schedule_closure);
}
-char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,
- grpc_transport *transport) {
- return transport->vtable->get_peer(exec_ctx, transport);
-}
-
grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx,
grpc_transport *transport) {
return transport->vtable->get_endpoint(exec_ctx, transport);
@@ -214,24 +209,24 @@ grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx,
// is a function that must always unref cancel_error
// though it lives in lib, it handles transport stream ops sure
// it's grpc_transport_stream_op_batch_finish_with_failure
-
void grpc_transport_stream_op_batch_finish_with_failure(
grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *batch,
- grpc_error *error) {
+ grpc_error *error, grpc_call_combiner *call_combiner) {
if (batch->send_message) {
grpc_byte_stream_destroy(exec_ctx,
batch->payload->send_message.send_message);
}
if (batch->recv_message) {
- GRPC_CLOSURE_SCHED(exec_ctx,
- batch->payload->recv_message.recv_message_ready,
- GRPC_ERROR_REF(error));
+ GRPC_CALL_COMBINER_START(exec_ctx, call_combiner,
+ batch->payload->recv_message.recv_message_ready,
+ GRPC_ERROR_REF(error),
+ "failing recv_message_ready");
}
if (batch->recv_initial_metadata) {
- GRPC_CLOSURE_SCHED(
- exec_ctx,
+ GRPC_CALL_COMBINER_START(
+ exec_ctx, call_combiner,
batch->payload->recv_initial_metadata.recv_initial_metadata_ready,
- GRPC_ERROR_REF(error));
+ GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready");
}
GRPC_CLOSURE_SCHED(exec_ctx, batch->on_complete, error);
if (batch->cancel_stream) {
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index 099138ea14..fbf5dcb8b5 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -22,6 +22,7 @@
#include <stddef.h>
#include "src/core/lib/channel/context.h"
+#include "src/core/lib/iomgr/call_combiner.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/pollset.h"
@@ -152,6 +153,9 @@ struct grpc_transport_stream_op_batch_payload {
/** Iff send_initial_metadata != NULL, flags associated with
send_initial_metadata: a bitfield of GRPC_INITIAL_METADATA_xxx */
uint32_t send_initial_metadata_flags;
+ // If non-NULL, will be set by the transport to the peer string
+ // (a char*, which the caller takes ownership of).
+ gpr_atm *peer_string;
} send_initial_metadata;
struct {
@@ -176,6 +180,9 @@ struct grpc_transport_stream_op_batch_payload {
// immediately available. This may be a signal that we received a
// Trailers-Only response.
bool *trailing_metadata_available;
+ // If non-NULL, will be set by the transport to the peer string
+ // (a char*, which the caller takes ownership of).
+ gpr_atm *peer_string;
} recv_initial_metadata;
struct {
@@ -293,7 +300,7 @@ void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx,
void grpc_transport_stream_op_batch_finish_with_failure(
grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op,
- grpc_error *error);
+ grpc_error *error, grpc_call_combiner *call_combiner);
char *grpc_transport_stream_op_batch_string(grpc_transport_stream_op_batch *op);
char *grpc_transport_op_string(grpc_transport_op *op);
@@ -332,10 +339,6 @@ void grpc_transport_close(grpc_transport *transport);
/* Destroy the transport */
void grpc_transport_destroy(grpc_exec_ctx *exec_ctx, grpc_transport *transport);
-/* Get the transports peer */
-char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,
- grpc_transport *transport);
-
/* Get the endpoint used by \a transport */
grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx,
grpc_transport *transport);
diff --git a/src/core/lib/transport/transport_impl.h b/src/core/lib/transport/transport_impl.h
index fc772c6dd1..bbae69c223 100644
--- a/src/core/lib/transport/transport_impl.h
+++ b/src/core/lib/transport/transport_impl.h
@@ -59,9 +59,6 @@ typedef struct grpc_transport_vtable {
/* implementation of grpc_transport_destroy */
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_transport *self);
- /* implementation of grpc_transport_get_peer */
- char *(*get_peer)(grpc_exec_ctx *exec_ctx, grpc_transport *self);
-
/* implementation of grpc_transport_get_endpoint */
grpc_endpoint *(*get_endpoint)(grpc_exec_ctx *exec_ctx, grpc_transport *self);
} grpc_transport_vtable;
diff --git a/src/core/lib/transport/transport_op_string.c b/src/core/lib/transport/transport_op_string.c
index 7b18229ba6..409a6c4103 100644
--- a/src/core/lib/transport/transport_op_string.c
+++ b/src/core/lib/transport/transport_op_string.c
@@ -112,6 +112,13 @@ char *grpc_transport_stream_op_batch_string(
gpr_strvec_add(&b, tmp);
}
+ if (op->collect_stats) {
+ gpr_strvec_add(&b, gpr_strdup(" "));
+ gpr_asprintf(&tmp, "COLLECT_STATS:%p",
+ op->payload->collect_stats.collect_stats);
+ gpr_strvec_add(&b, tmp);
+ }
+
out = gpr_strvec_flatten(&b, NULL);
gpr_strvec_destroy(&b);