From 764cf04a13958d72db5a22eb4bbb9370e00777f5 Mon Sep 17 00:00:00 2001 From: "Mark D. Roth" Date: Fri, 1 Sep 2017 09:00:06 -0700 Subject: Revert "Revert "Implement call combiner"" This reverts commit bf19961d0a49b43cb528392efeb4880eeebb9b5e. --- src/core/lib/channel/channel_stack.c | 16 +- src/core/lib/channel/channel_stack.h | 11 +- src/core/lib/channel/connected_channel.c | 92 +++++++++-- src/core/lib/iomgr/call_combiner.c | 180 ++++++++++++++++++++ src/core/lib/iomgr/call_combiner.h | 104 ++++++++++++ .../lib/security/transport/client_auth_filter.c | 183 ++++++--------------- .../lib/security/transport/server_auth_filter.c | 89 +++++++--- src/core/lib/surface/call.c | 148 ++++++++++++----- src/core/lib/surface/init.c | 2 + src/core/lib/surface/lame_client.cc | 19 +-- src/core/lib/surface/server.c | 2 - src/core/lib/transport/transport.c | 21 +-- src/core/lib/transport/transport.h | 13 +- src/core/lib/transport/transport_impl.h | 3 - src/core/lib/transport/transport_op_string.c | 7 + 15 files changed, 629 insertions(+), 261 deletions(-) create mode 100644 src/core/lib/iomgr/call_combiner.c create mode 100644 src/core/lib/iomgr/call_combiner.h (limited to 'src/core/lib') diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c index 0f8e33c4be..775c8bc667 100644 --- a/src/core/lib/channel/channel_stack.c +++ b/src/core/lib/channel/channel_stack.c @@ -233,15 +233,10 @@ void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack, void grpc_call_next_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_transport_stream_op_batch *op) { grpc_call_element *next_elem = elem + 1; + GRPC_CALL_LOG_OP(GPR_INFO, next_elem, op); next_elem->filter->start_transport_stream_op_batch(exec_ctx, next_elem, op); } -char *grpc_call_next_get_peer(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { - grpc_call_element *next_elem = elem + 1; - return next_elem->filter->get_peer(exec_ctx, next_elem); -} - void grpc_channel_next_get_info(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, const grpc_channel_info *channel_info) { @@ -265,12 +260,3 @@ grpc_call_stack *grpc_call_stack_from_top_element(grpc_call_element *elem) { return (grpc_call_stack *)((char *)(elem)-ROUND_UP_TO_ALIGNMENT_SIZE( sizeof(grpc_call_stack))); } - -void grpc_call_element_signal_error(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_error *error) { - grpc_transport_stream_op_batch *op = grpc_make_transport_stream_op(NULL); - op->cancel_stream = true; - op->payload->cancel_stream.cancel_error = error; - elem->filter->start_transport_stream_op_batch(exec_ctx, elem, op); -} diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index a80f8aa826..ae1cac31f7 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -40,6 +40,7 @@ #include #include "src/core/lib/debug/trace.h" +#include "src/core/lib/iomgr/call_combiner.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/support/arena.h" #include "src/core/lib/transport/transport.h" @@ -71,6 +72,7 @@ typedef struct { gpr_timespec start_time; gpr_timespec deadline; gpr_arena *arena; + grpc_call_combiner *call_combiner; } grpc_call_element_args; typedef struct { @@ -150,9 +152,6 @@ typedef struct { void (*destroy_channel_elem)(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem); - /* Implement grpc_call_get_peer() */ - char *(*get_peer)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem); - /* Implement grpc_channel_get_info() */ void (*get_channel_info)(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, const grpc_channel_info *channel_info); @@ -271,8 +270,6 @@ void grpc_call_next_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, stack */ void grpc_channel_next_op(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_transport_op *op); -/* Pass through a request to get_peer to the next child element */ -char *grpc_call_next_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem); /* Pass through a request to get_channel_info() to the next child element */ void grpc_channel_next_get_info(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, @@ -288,10 +285,6 @@ void grpc_call_log_op(char *file, int line, gpr_log_severity severity, grpc_call_element *elem, grpc_transport_stream_op_batch *op); -void grpc_call_element_signal_error(grpc_exec_ctx *exec_ctx, - grpc_call_element *cur_elem, - grpc_error *error); - extern grpc_tracer_flag grpc_trace_channel; #define GRPC_CALL_LOG_OP(sev, elem, op) \ diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c index af06ca802e..8285226fc4 100644 --- a/src/core/lib/channel/connected_channel.c +++ b/src/core/lib/channel/connected_channel.c @@ -36,7 +36,57 @@ typedef struct connected_channel_channel_data { grpc_transport *transport; } channel_data; -typedef struct connected_channel_call_data { void *unused; } call_data; +typedef struct { + grpc_closure closure; + grpc_closure *original_closure; + grpc_call_combiner *call_combiner; + const char *reason; +} callback_state; + +typedef struct connected_channel_call_data { + grpc_call_combiner *call_combiner; + // Closures used for returning results on the call combiner. + callback_state on_complete[6]; // Max number of pending batches. + callback_state recv_initial_metadata_ready; + callback_state recv_message_ready; +} call_data; + +static void run_in_call_combiner(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + callback_state *state = (callback_state *)arg; + GRPC_CALL_COMBINER_START(exec_ctx, state->call_combiner, + state->original_closure, GRPC_ERROR_REF(error), + state->reason); +} + +static void run_cancel_in_call_combiner(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + run_in_call_combiner(exec_ctx, arg, error); + gpr_free(arg); +} + +static void intercept_callback(call_data *calld, callback_state *state, + bool free_when_done, const char *reason, + grpc_closure **original_closure) { + state->original_closure = *original_closure; + state->call_combiner = calld->call_combiner; + state->reason = reason; + *original_closure = GRPC_CLOSURE_INIT( + &state->closure, + free_when_done ? run_cancel_in_call_combiner : run_in_call_combiner, + state, grpc_schedule_on_exec_ctx); +} + +static callback_state *get_state_for_batch( + call_data *calld, grpc_transport_stream_op_batch *batch) { + if (batch->send_initial_metadata) return &calld->on_complete[0]; + if (batch->send_message) return &calld->on_complete[1]; + if (batch->send_trailing_metadata) return &calld->on_complete[2]; + if (batch->recv_initial_metadata) return &calld->on_complete[3]; + if (batch->recv_message) return &calld->on_complete[4]; + if (batch->recv_trailing_metadata) return &calld->on_complete[5]; + GPR_UNREACHABLE_CODE(return NULL); +} /* We perform a small hack to locate transport data alongside the connected channel data in call allocations, to allow everything to be pulled in minimal @@ -49,13 +99,38 @@ typedef struct connected_channel_call_data { void *unused; } call_data; into transport stream operations */ static void con_start_transport_stream_op_batch( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { + grpc_transport_stream_op_batch *batch) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - + if (batch->recv_initial_metadata) { + callback_state *state = &calld->recv_initial_metadata_ready; + intercept_callback( + calld, state, false, "recv_initial_metadata_ready", + &batch->payload->recv_initial_metadata.recv_initial_metadata_ready); + } + if (batch->recv_message) { + callback_state *state = &calld->recv_message_ready; + intercept_callback(calld, state, false, "recv_message_ready", + &batch->payload->recv_message.recv_message_ready); + } + if (batch->cancel_stream) { + // There can be more than one cancellation batch in flight at any + // given time, so we can't just pick out a fixed index into + // calld->on_complete like we can for the other ops. However, + // cancellation isn't in the fast path, so we just allocate a new + // closure for each one. + callback_state *state = (callback_state *)gpr_malloc(sizeof(*state)); + intercept_callback(calld, state, true, "on_complete (cancel_stream)", + &batch->on_complete); + } else { + callback_state *state = get_state_for_batch(calld, batch); + intercept_callback(calld, state, false, "on_complete", &batch->on_complete); + } grpc_transport_perform_stream_op(exec_ctx, chand->transport, - TRANSPORT_STREAM_FROM_CALL_DATA(calld), op); + TRANSPORT_STREAM_FROM_CALL_DATA(calld), + batch); + GRPC_CALL_COMBINER_STOP(exec_ctx, calld->call_combiner, + "passed batch to transport"); } static void con_start_transport_op(grpc_exec_ctx *exec_ctx, @@ -71,6 +146,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, const grpc_call_element_args *args) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; + calld->call_combiner = args->call_combiner; int r = grpc_transport_init_stream( exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), &args->call_stack->refcount, args->server_transport_data, args->arena); @@ -118,11 +194,6 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, } } -static char *con_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { - channel_data *chand = elem->channel_data; - return grpc_transport_get_peer(exec_ctx, chand->transport); -} - /* No-op. */ static void con_get_channel_info(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, @@ -138,7 +209,6 @@ const grpc_channel_filter grpc_connected_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, - con_get_peer, con_get_channel_info, "connected", }; diff --git a/src/core/lib/iomgr/call_combiner.c b/src/core/lib/iomgr/call_combiner.c new file mode 100644 index 0000000000..899f98552d --- /dev/null +++ b/src/core/lib/iomgr/call_combiner.c @@ -0,0 +1,180 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/core/lib/iomgr/call_combiner.h" + +#include + +grpc_tracer_flag grpc_call_combiner_trace = + GRPC_TRACER_INITIALIZER(false, "call_combiner"); + +static grpc_error* decode_cancel_state_error(gpr_atm cancel_state) { + if (cancel_state & 1) { + return (grpc_error*)(cancel_state & ~(gpr_atm)1); + } + return GRPC_ERROR_NONE; +} + +static gpr_atm encode_cancel_state_error(grpc_error* error) { + return (gpr_atm)1 | (gpr_atm)error; +} + +void grpc_call_combiner_init(grpc_call_combiner* call_combiner) { + gpr_mpscq_init(&call_combiner->queue); +} + +void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner) { + gpr_mpscq_destroy(&call_combiner->queue); + GRPC_ERROR_UNREF(decode_cancel_state_error(call_combiner->cancel_state)); +} + +#ifndef NDEBUG +#define DEBUG_ARGS , const char *file, int line +#define DEBUG_FMT_STR "%s:%d: " +#define DEBUG_FMT_ARGS , file, line +#else +#define DEBUG_ARGS +#define DEBUG_FMT_STR +#define DEBUG_FMT_ARGS +#endif + +void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx, + grpc_call_combiner* call_combiner, + grpc_closure* closure, + grpc_error* error DEBUG_ARGS, + const char* reason) { + if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { + gpr_log(GPR_DEBUG, + "==> grpc_call_combiner_start() [%p] closure=%p [" DEBUG_FMT_STR + "%s] error=%s", + call_combiner, closure DEBUG_FMT_ARGS, reason, + grpc_error_string(error)); + } + size_t prev_size = + (size_t)gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)1); + if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { + gpr_log(GPR_DEBUG, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size, + prev_size + 1); + } + if (prev_size == 0) { + if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { + gpr_log(GPR_DEBUG, " EXECUTING IMMEDIATELY"); + } + // Queue was empty, so execute this closure immediately. + GRPC_CLOSURE_SCHED(exec_ctx, closure, error); + } else { + if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { + gpr_log(GPR_INFO, " QUEUING"); + } + // Queue was not empty, so add closure to queue. + closure->error_data.error = error; + gpr_mpscq_push(&call_combiner->queue, (gpr_mpscq_node*)closure); + } +} + +void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx, + grpc_call_combiner* call_combiner DEBUG_ARGS, + const char* reason) { + if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { + gpr_log(GPR_DEBUG, + "==> grpc_call_combiner_stop() [%p] [" DEBUG_FMT_STR "%s]", + call_combiner DEBUG_FMT_ARGS, reason); + } + size_t prev_size = + (size_t)gpr_atm_full_fetch_add(&call_combiner->size, (gpr_atm)-1); + if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { + gpr_log(GPR_DEBUG, " size: %" PRIdPTR " -> %" PRIdPTR, prev_size, + prev_size - 1); + } + GPR_ASSERT(prev_size >= 1); + if (prev_size > 1) { + while (true) { + if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { + gpr_log(GPR_DEBUG, " checking queue"); + } + bool empty; + grpc_closure* closure = (grpc_closure*)gpr_mpscq_pop_and_check_end( + &call_combiner->queue, &empty); + if (closure == NULL) { + // This can happen either due to a race condition within the mpscq + // code or because of a race with grpc_call_combiner_start(). + if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { + gpr_log(GPR_DEBUG, " queue returned no result; checking again"); + } + continue; + } + if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { + gpr_log(GPR_DEBUG, " EXECUTING FROM QUEUE: closure=%p error=%s", + closure, grpc_error_string(closure->error_data.error)); + } + GRPC_CLOSURE_SCHED(exec_ctx, closure, closure->error_data.error); + break; + } + } else if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { + gpr_log(GPR_DEBUG, " queue empty"); + } +} + +void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx, + grpc_call_combiner* call_combiner, + grpc_closure* closure) { + while (true) { + // Decode original state. + gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state); + grpc_error* original_error = decode_cancel_state_error(original_state); + // If error is set, invoke the cancellation closure immediately. + // Otherwise, store the new closure. + if (original_error != GRPC_ERROR_NONE) { + GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_REF(original_error)); + break; + } else { + if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state, + (gpr_atm)closure)) { + break; + } + } + // cas failed, try again. + } +} + +void grpc_call_combiner_cancel(grpc_exec_ctx* exec_ctx, + grpc_call_combiner* call_combiner, + grpc_error* error) { + while (true) { + gpr_atm original_state = gpr_atm_acq_load(&call_combiner->cancel_state); + grpc_error* original_error = decode_cancel_state_error(original_state); + if (original_error != GRPC_ERROR_NONE) { + GRPC_ERROR_UNREF(error); + break; + } + if (gpr_atm_full_cas(&call_combiner->cancel_state, original_state, + encode_cancel_state_error(error))) { + if (original_state != 0) { + grpc_closure* notify_on_cancel = (grpc_closure*)original_state; + if (GRPC_TRACER_ON(grpc_call_combiner_trace)) { + gpr_log(GPR_DEBUG, + "call_combiner=%p: scheduling notify_on_cancel callback=%p", + call_combiner, notify_on_cancel); + } + GRPC_CLOSURE_SCHED(exec_ctx, notify_on_cancel, GRPC_ERROR_REF(error)); + } + break; + } + // cas failed, try again. + } +} diff --git a/src/core/lib/iomgr/call_combiner.h b/src/core/lib/iomgr/call_combiner.h new file mode 100644 index 0000000000..621e2c3669 --- /dev/null +++ b/src/core/lib/iomgr/call_combiner.h @@ -0,0 +1,104 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H +#define GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H + +#include + +#include + +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/support/mpscq.h" + +// A simple, lock-free mechanism for serializing activity related to a +// single call. This is similar to a combiner but is more lightweight. +// +// It requires the callback (or, in the common case where the callback +// actually kicks off a chain of callbacks, the last callback in that +// chain) to explicitly indicate (by calling GRPC_CALL_COMBINER_STOP()) +// when it is done with the action that was kicked off by the original +// callback. + +extern grpc_tracer_flag grpc_call_combiner_trace; + +typedef struct { + gpr_atm size; // size_t, num closures in queue or currently executing + gpr_mpscq queue; + // Either 0 (if not cancelled and no cancellation closure set), + // a grpc_closure* (if the lowest bit is 0), + // or a grpc_error* (if the lowest bit is 1). + gpr_atm cancel_state; +} grpc_call_combiner; + +// Assumes memory was initialized to zero. +void grpc_call_combiner_init(grpc_call_combiner* call_combiner); + +void grpc_call_combiner_destroy(grpc_call_combiner* call_combiner); + +#ifndef NDEBUG +#define GRPC_CALL_COMBINER_START(exec_ctx, call_combiner, closure, error, \ + reason) \ + grpc_call_combiner_start((exec_ctx), (call_combiner), (closure), (error), \ + __FILE__, __LINE__, (reason)) +#define GRPC_CALL_COMBINER_STOP(exec_ctx, call_combiner, reason) \ + grpc_call_combiner_stop((exec_ctx), (call_combiner), __FILE__, __LINE__, \ + (reason)) +/// Starts processing \a closure on \a call_combiner. +void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx, + grpc_call_combiner* call_combiner, + grpc_closure* closure, grpc_error* error, + const char* file, int line, const char* reason); +/// Yields the call combiner to the next closure in the queue, if any. +void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx, + grpc_call_combiner* call_combiner, + const char* file, int line, const char* reason); +#else +#define GRPC_CALL_COMBINER_START(exec_ctx, call_combiner, closure, error, \ + reason) \ + grpc_call_combiner_start((exec_ctx), (call_combiner), (closure), (error), \ + (reason)) +#define GRPC_CALL_COMBINER_STOP(exec_ctx, call_combiner, reason) \ + grpc_call_combiner_stop((exec_ctx), (call_combiner), (reason)) +/// Starts processing \a closure on \a call_combiner. +void grpc_call_combiner_start(grpc_exec_ctx* exec_ctx, + grpc_call_combiner* call_combiner, + grpc_closure* closure, grpc_error* error, + const char* reason); +/// Yields the call combiner to the next closure in the queue, if any. +void grpc_call_combiner_stop(grpc_exec_ctx* exec_ctx, + grpc_call_combiner* call_combiner, + const char* reason); +#endif + +/// Tells \a call_combiner to invoke \a closure when +/// grpc_call_combiner_cancel() is called. If grpc_call_combiner_cancel() +/// was previously called, \a closure will be invoked immediately. +/// If \a closure is NULL, then no closure will be invoked on +/// cancellation; this effectively unregisters the previously set closure. +void grpc_call_combiner_set_notify_on_cancel(grpc_exec_ctx* exec_ctx, + grpc_call_combiner* call_combiner, + grpc_closure* closure); + +/// Indicates that the call has been cancelled. +void grpc_call_combiner_cancel(grpc_exec_ctx* exec_ctx, + grpc_call_combiner* call_combiner, + grpc_error* error); + +#endif /* GRPC_CORE_LIB_IOMGR_CALL_COMBINER_H */ diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c index 531a88434f..e3f0163a6c 100644 --- a/src/core/lib/security/transport/client_auth_filter.c +++ b/src/core/lib/security/transport/client_auth_filter.c @@ -39,6 +39,7 @@ /* We can have a per-call credentials. */ typedef struct { + grpc_call_combiner *call_combiner; grpc_call_credentials *creds; bool have_host; bool have_method; @@ -49,17 +50,11 @@ typedef struct { pollset_set so that work can progress when this call wants work to progress */ grpc_polling_entity *pollent; - gpr_atm security_context_set; - gpr_mu security_context_mu; grpc_credentials_mdelem_array md_array; grpc_linked_mdelem md_links[MAX_CREDENTIALS_METADATA_COUNT]; grpc_auth_metadata_context auth_md_context; - grpc_closure closure; - // Either 0 (no cancellation and no async operation in flight), - // a grpc_closure* (if the lowest bit is 0), - // or a grpc_error* (if the lowest bit is 1). - gpr_atm cancellation_state; - grpc_closure cancel_closure; + grpc_closure async_cancel_closure; + grpc_closure async_result_closure; } call_data; /* We can have a per-channel credentials. */ @@ -68,43 +63,6 @@ typedef struct { grpc_auth_context *auth_context; } channel_data; -static void decode_cancel_state(gpr_atm cancel_state, grpc_closure **func, - grpc_error **error) { - // If the lowest bit is 1, the value is a grpc_error*. - // Otherwise, if non-zdero, the value is a grpc_closure*. - if (cancel_state & 1) { - *error = (grpc_error *)(cancel_state & ~(gpr_atm)1); - } else if (cancel_state != 0) { - *func = (grpc_closure *)cancel_state; - } -} - -static gpr_atm encode_cancel_state_error(grpc_error *error) { - // Set the lowest bit to 1 to indicate that it's an error. - return (gpr_atm)1 | (gpr_atm)error; -} - -// Returns an error if the call has been cancelled. Otherwise, sets the -// cancellation function to be called upon cancellation. -static grpc_error *set_cancel_func(grpc_call_element *elem, - grpc_iomgr_cb_func func) { - call_data *calld = (call_data *)elem->call_data; - // Decode original state. - gpr_atm original_state = gpr_atm_acq_load(&calld->cancellation_state); - grpc_error *original_error = GRPC_ERROR_NONE; - grpc_closure *original_func = NULL; - decode_cancel_state(original_state, &original_func, &original_error); - // If error is set, return it. - if (original_error != GRPC_ERROR_NONE) return GRPC_ERROR_REF(original_error); - // Otherwise, store func. - GRPC_CLOSURE_INIT(&calld->cancel_closure, func, elem, - grpc_schedule_on_exec_ctx); - GPR_ASSERT(((gpr_atm)&calld->cancel_closure & (gpr_atm)1) == 0); - gpr_atm_rel_store(&calld->cancellation_state, - (gpr_atm)&calld->cancel_closure); - return GRPC_ERROR_NONE; -} - static void reset_auth_metadata_context( grpc_auth_metadata_context *auth_md_context) { if (auth_md_context->service_url != NULL) { @@ -135,6 +93,7 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *arg, grpc_transport_stream_op_batch *batch = (grpc_transport_stream_op_batch *)arg; grpc_call_element *elem = batch->handler_private.extra_arg; call_data *calld = elem->call_data; + grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner, NULL); reset_auth_metadata_context(&calld->auth_md_context); grpc_error *error = GRPC_ERROR_REF(input_error); if (error == GRPC_ERROR_NONE) { @@ -153,7 +112,8 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *arg, } else { error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED); - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, error); + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, error, + calld->call_combiner); } } @@ -223,7 +183,8 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx, grpc_error_set_int( GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Incompatible credentials set on channel and call."), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED)); + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED), + calld->call_combiner); return; } } else { @@ -234,22 +195,23 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx, build_auth_metadata_context(&chand->security_connector->base, chand->auth_context, calld); - grpc_error *cancel_error = set_cancel_func(elem, cancel_get_request_metadata); - if (cancel_error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, - cancel_error); - return; - } GPR_ASSERT(calld->pollent != NULL); - GRPC_CLOSURE_INIT(&calld->closure, on_credentials_metadata, batch, - grpc_schedule_on_exec_ctx); + + GRPC_CLOSURE_INIT(&calld->async_result_closure, on_credentials_metadata, + batch, grpc_schedule_on_exec_ctx); grpc_error *error = GRPC_ERROR_NONE; if (grpc_call_credentials_get_request_metadata( exec_ctx, calld->creds, calld->pollent, calld->auth_md_context, - &calld->md_array, &calld->closure, &error)) { + &calld->md_array, &calld->async_result_closure, &error)) { // Synchronous return; invoke on_credentials_metadata() directly. on_credentials_metadata(exec_ctx, batch, error); GRPC_ERROR_UNREF(error); + } else { + // Async return; register cancellation closure with call combiner. + GRPC_CLOSURE_INIT(&calld->async_cancel_closure, cancel_get_request_metadata, + elem, grpc_schedule_on_exec_ctx); + grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner, + &calld->async_cancel_closure); } } @@ -258,7 +220,7 @@ static void on_host_checked(grpc_exec_ctx *exec_ctx, void *arg, grpc_transport_stream_op_batch *batch = (grpc_transport_stream_op_batch *)arg; grpc_call_element *elem = batch->handler_private.extra_arg; call_data *calld = elem->call_data; - + grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner, NULL); if (error == GRPC_ERROR_NONE) { send_security_metadata(exec_ctx, elem, batch); } else { @@ -271,7 +233,8 @@ static void on_host_checked(grpc_exec_ctx *exec_ctx, void *arg, exec_ctx, batch, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg), GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_UNAUTHENTICATED)); + GRPC_STATUS_UNAUTHENTICATED), + calld->call_combiner); gpr_free(error_msg); } } @@ -282,7 +245,7 @@ static void cancel_check_call_host(grpc_exec_ctx *exec_ctx, void *arg, call_data *calld = (call_data *)elem->call_data; channel_data *chand = (channel_data *)elem->channel_data; grpc_channel_security_connector_cancel_check_call_host( - exec_ctx, chand->security_connector, &calld->closure, + exec_ctx, chand->security_connector, &calld->async_result_closure, GRPC_ERROR_REF(error)); } @@ -295,52 +258,19 @@ static void auth_start_transport_stream_op_batch( call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - if (batch->cancel_stream) { - while (true) { - // Decode the original cancellation state. - gpr_atm original_state = gpr_atm_acq_load(&calld->cancellation_state); - grpc_error *cancel_error = GRPC_ERROR_NONE; - grpc_closure *func = NULL; - decode_cancel_state(original_state, &func, &cancel_error); - // If we had already set a cancellation error, there's nothing - // more to do. - if (cancel_error != GRPC_ERROR_NONE) break; - // If there's a cancel func, call it. - // Note that even if the cancel func has been changed by some - // other thread between when we decoded it and now, it will just - // be a no-op. - cancel_error = GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error); - if (func != NULL) { - GRPC_CLOSURE_SCHED(exec_ctx, func, GRPC_ERROR_REF(cancel_error)); - } - // Encode the new error into cancellation state. - if (gpr_atm_full_cas(&calld->cancellation_state, original_state, - encode_cancel_state_error(cancel_error))) { - break; // Success. - } - // The cas failed, so try again. - } - } else { - /* double checked lock over security context to ensure it's set once */ - if (gpr_atm_acq_load(&calld->security_context_set) == 0) { - gpr_mu_lock(&calld->security_context_mu); - if (gpr_atm_acq_load(&calld->security_context_set) == 0) { - GPR_ASSERT(batch->payload->context != NULL); - if (batch->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) { - batch->payload->context[GRPC_CONTEXT_SECURITY].value = - grpc_client_security_context_create(); - batch->payload->context[GRPC_CONTEXT_SECURITY].destroy = - grpc_client_security_context_destroy; - } - grpc_client_security_context *sec_ctx = - batch->payload->context[GRPC_CONTEXT_SECURITY].value; - GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter"); - sec_ctx->auth_context = - GRPC_AUTH_CONTEXT_REF(chand->auth_context, "client_auth_filter"); - gpr_atm_rel_store(&calld->security_context_set, 1); - } - gpr_mu_unlock(&calld->security_context_mu); + if (!batch->cancel_stream) { + GPR_ASSERT(batch->payload->context != NULL); + if (batch->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) { + batch->payload->context[GRPC_CONTEXT_SECURITY].value = + grpc_client_security_context_create(); + batch->payload->context[GRPC_CONTEXT_SECURITY].destroy = + grpc_client_security_context_destroy; } + grpc_client_security_context *sec_ctx = + batch->payload->context[GRPC_CONTEXT_SECURITY].value; + GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter"); + sec_ctx->auth_context = + GRPC_AUTH_CONTEXT_REF(chand->auth_context, "client_auth_filter"); } if (batch->send_initial_metadata) { @@ -365,26 +295,25 @@ static void auth_start_transport_stream_op_batch( } } if (calld->have_host) { - grpc_error *cancel_error = set_cancel_func(elem, cancel_check_call_host); - if (cancel_error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, - cancel_error); + batch->handler_private.extra_arg = elem; + GRPC_CLOSURE_INIT(&calld->async_result_closure, on_host_checked, batch, + grpc_schedule_on_exec_ctx); + char *call_host = grpc_slice_to_c_string(calld->host); + grpc_error *error = GRPC_ERROR_NONE; + if (grpc_channel_security_connector_check_call_host( + exec_ctx, chand->security_connector, call_host, + chand->auth_context, &calld->async_result_closure, &error)) { + // Synchronous return; invoke on_host_checked() directly. + on_host_checked(exec_ctx, batch, error); + GRPC_ERROR_UNREF(error); } else { - char *call_host = grpc_slice_to_c_string(calld->host); - batch->handler_private.extra_arg = elem; - grpc_error *error = GRPC_ERROR_NONE; - if (grpc_channel_security_connector_check_call_host( - exec_ctx, chand->security_connector, call_host, - chand->auth_context, - GRPC_CLOSURE_INIT(&calld->closure, on_host_checked, batch, - grpc_schedule_on_exec_ctx), - &error)) { - // Synchronous return; invoke on_host_checked() directly. - on_host_checked(exec_ctx, batch, error); - GRPC_ERROR_UNREF(error); - } - gpr_free(call_host); + // Async return; register cancellation closure with call combiner. + GRPC_CLOSURE_INIT(&calld->async_cancel_closure, cancel_check_call_host, + elem, grpc_schedule_on_exec_ctx); + grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner, + &calld->async_cancel_closure); } + gpr_free(call_host); GPR_TIMER_END("auth_start_transport_stream_op_batch", 0); return; /* early exit */ } @@ -400,8 +329,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_element_args *args) { call_data *calld = elem->call_data; - memset(calld, 0, sizeof(*calld)); - gpr_mu_init(&calld->security_context_mu); + calld->call_combiner = args->call_combiner; return GRPC_ERROR_NONE; } @@ -426,12 +354,6 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_slice_unref_internal(exec_ctx, calld->method); } reset_auth_metadata_context(&calld->auth_md_context); - gpr_mu_destroy(&calld->security_context_mu); - gpr_atm cancel_state = gpr_atm_acq_load(&calld->cancellation_state); - grpc_error *cancel_error = GRPC_ERROR_NONE; - grpc_closure *cancel_func = NULL; - decode_cancel_state(cancel_state, &cancel_func, &cancel_error); - GRPC_ERROR_UNREF(cancel_error); } /* Constructor for channel_data */ @@ -490,6 +412,5 @@ const grpc_channel_filter grpc_client_auth_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, - grpc_call_next_get_peer, grpc_channel_next_get_info, "client-auth"}; diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c index 9bf3f0ca0f..b721ce4a22 100644 --- a/src/core/lib/security/transport/server_auth_filter.c +++ b/src/core/lib/security/transport/server_auth_filter.c @@ -26,7 +26,15 @@ #include "src/core/lib/security/transport/auth_filters.h" #include "src/core/lib/slice/slice_internal.h" +typedef enum { + STATE_INIT = 0, + STATE_DONE, + STATE_CANCELLED, +} async_state; + typedef struct call_data { + grpc_call_combiner *call_combiner; + grpc_call_stack *owning_call; grpc_transport_stream_op_batch *recv_initial_metadata_batch; grpc_closure *original_recv_initial_metadata_ready; grpc_closure recv_initial_metadata_ready; @@ -34,6 +42,8 @@ typedef struct call_data { const grpc_metadata *consumed_md; size_t num_consumed_md; grpc_auth_context *auth_context; + grpc_closure cancel_closure; + gpr_atm state; // async_state } call_data; typedef struct channel_data { @@ -78,54 +88,92 @@ static grpc_filtered_mdelem remove_consumed_md(grpc_exec_ctx *exec_ctx, return GRPC_FILTERED_MDELEM(md); } -/* called from application code */ -static void on_md_processing_done( - void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md, - const grpc_metadata *response_md, size_t num_response_md, - grpc_status_code status, const char *error_details) { - grpc_call_element *elem = user_data; +static void on_md_processing_done_inner(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + const grpc_metadata *consumed_md, + size_t num_consumed_md, + const grpc_metadata *response_md, + size_t num_response_md, + grpc_error *error) { call_data *calld = elem->call_data; grpc_transport_stream_op_batch *batch = calld->recv_initial_metadata_batch; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner, NULL); /* TODO(jboeuf): Implement support for response_md. */ if (response_md != NULL && num_response_md > 0) { gpr_log(GPR_INFO, "response_md in auth metadata processing not supported for now. " "Ignoring..."); } - grpc_error *error = GRPC_ERROR_NONE; - if (status == GRPC_STATUS_OK) { + if (error == GRPC_ERROR_NONE) { calld->consumed_md = consumed_md; calld->num_consumed_md = num_consumed_md; error = grpc_metadata_batch_filter( - &exec_ctx, batch->payload->recv_initial_metadata.recv_initial_metadata, + exec_ctx, batch->payload->recv_initial_metadata.recv_initial_metadata, remove_consumed_md, elem, "Response metadata filtering error"); - } else { - if (error_details == NULL) { - error_details = "Authentication metadata processing failed."; + } + GRPC_CLOSURE_SCHED(exec_ctx, calld->original_recv_initial_metadata_ready, + error); +} + +// Called from application code. +static void on_md_processing_done( + void *user_data, const grpc_metadata *consumed_md, size_t num_consumed_md, + const grpc_metadata *response_md, size_t num_response_md, + grpc_status_code status, const char *error_details) { + grpc_call_element *elem = user_data; + call_data *calld = elem->call_data; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + // If the call was not cancelled while we were in flight, process the result. + if (gpr_atm_full_cas(&calld->state, (gpr_atm)STATE_INIT, + (gpr_atm)STATE_DONE)) { + grpc_error *error = GRPC_ERROR_NONE; + if (status != GRPC_STATUS_OK) { + if (error_details == NULL) { + error_details = "Authentication metadata processing failed."; + } + error = grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_details), + GRPC_ERROR_INT_GRPC_STATUS, status); } - error = - grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_details), - GRPC_ERROR_INT_GRPC_STATUS, status); + on_md_processing_done_inner(&exec_ctx, elem, consumed_md, num_consumed_md, + response_md, num_response_md, error); } + // Clean up. for (size_t i = 0; i < calld->md.count; i++) { grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].key); grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].value); } grpc_metadata_array_destroy(&calld->md); - GRPC_CLOSURE_SCHED(&exec_ctx, calld->original_recv_initial_metadata_ready, - error); + GRPC_CALL_STACK_UNREF(&exec_ctx, calld->owning_call, "server_auth_metadata"); grpc_exec_ctx_finish(&exec_ctx); } +static void cancel_call(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + grpc_call_element *elem = (grpc_call_element *)arg; + call_data *calld = elem->call_data; + // If the result was not already processed, invoke the callback now. + if (gpr_atm_full_cas(&calld->state, (gpr_atm)STATE_INIT, + (gpr_atm)STATE_CANCELLED)) { + on_md_processing_done_inner(exec_ctx, elem, NULL, 0, NULL, 0, + GRPC_ERROR_REF(error)); + } +} + static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_call_element *elem = arg; + grpc_call_element *elem = (grpc_call_element *)arg; channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; grpc_transport_stream_op_batch *batch = calld->recv_initial_metadata_batch; if (error == GRPC_ERROR_NONE) { if (chand->creds != NULL && chand->creds->processor.process != NULL) { + // We're calling out to the application, so we need to make sure + // to drop the call combiner early if we get cancelled. + GRPC_CLOSURE_INIT(&calld->cancel_closure, cancel_call, elem, + grpc_schedule_on_exec_ctx); + grpc_call_combiner_set_notify_on_cancel(exec_ctx, calld->call_combiner, + &calld->cancel_closure); + GRPC_CALL_STACK_REF(calld->owning_call, "server_auth_metadata"); calld->md = metadata_batch_to_md_array( batch->payload->recv_initial_metadata.recv_initial_metadata); chand->creds->processor.process( @@ -159,6 +207,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, const grpc_call_element_args *args) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; + calld->call_combiner = args->call_combiner; + calld->owning_call = args->call_stack; GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, recv_initial_metadata_ready, elem, grpc_schedule_on_exec_ctx); @@ -218,6 +268,5 @@ const grpc_channel_filter grpc_server_auth_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, - grpc_call_next_get_peer, grpc_channel_next_get_info, "server-auth"}; diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index e68c201134..03eaaf99ac 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -122,6 +122,7 @@ typedef struct batch_control { bool is_closure; } notify_tag; } completion_data; + grpc_closure start_batch; grpc_closure finish_batch; gpr_refcount steps_to_complete; @@ -151,6 +152,7 @@ typedef struct { struct grpc_call { gpr_refcount ext_ref; gpr_arena *arena; + grpc_call_combiner call_combiner; grpc_completion_queue *cq; grpc_polling_entity pollent; grpc_channel *channel; @@ -184,6 +186,11 @@ struct grpc_call { Element 0 is initial metadata, element 1 is trailing metadata. */ grpc_metadata_array *buffered_metadata[2]; + grpc_metadata compression_md; + + // A char* indicating the peer name. + gpr_atm peer_string; + /* Packed received call statuses from various sources */ gpr_atm status[STATUS_SOURCE_COUNT]; @@ -262,8 +269,9 @@ grpc_tracer_flag grpc_compression_trace = #define CALL_FROM_TOP_ELEM(top_elem) \ CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem)) -static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, - grpc_transport_stream_op_batch *op); +static void execute_batch(grpc_exec_ctx *exec_ctx, grpc_call *call, + grpc_transport_stream_op_batch *op, + grpc_closure *start_batch_closure); static void cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, status_source source, grpc_status_code status, const char *description); @@ -328,6 +336,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, sizeof(grpc_call) + channel_stack->call_stack_size); gpr_ref_init(&call->ext_ref, 1); call->arena = arena; + grpc_call_combiner_init(&call->call_combiner); *out_call = call; call->channel = args->channel; call->cq = args->cq; @@ -436,7 +445,8 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, .path = path, .start_time = call->start_time, .deadline = send_deadline, - .arena = call->arena}; + .arena = call->arena, + .call_combiner = &call->call_combiner}; add_init_error(&error, grpc_call_stack_init(exec_ctx, channel_stack, 1, destroy_call, call, &call_args)); if (error != GRPC_ERROR_NONE) { @@ -503,6 +513,8 @@ static void release_call(grpc_exec_ctx *exec_ctx, void *call, grpc_error *error) { grpc_call *c = call; grpc_channel *channel = c->channel; + grpc_call_combiner_destroy(&c->call_combiner); + gpr_free((char *)c->peer_string); grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena)); GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call"); } @@ -602,30 +614,37 @@ grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) { return GRPC_CALL_OK; } -static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call, - grpc_transport_stream_op_batch *op) { - grpc_call_element *elem; - - GPR_TIMER_BEGIN("execute_op", 0); - elem = CALL_ELEM_FROM_CALL(call, 0); - elem->filter->start_transport_stream_op_batch(exec_ctx, elem, op); - GPR_TIMER_END("execute_op", 0); +// This is called via the call combiner to start sending a batch down +// the filter stack. +static void execute_batch_in_call_combiner(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *ignored) { + grpc_transport_stream_op_batch *batch = arg; + grpc_call *call = batch->handler_private.extra_arg; + GPR_TIMER_BEGIN("execute_batch", 0); + grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0); + GRPC_CALL_LOG_OP(GPR_INFO, elem, batch); + elem->filter->start_transport_stream_op_batch(exec_ctx, elem, batch); + GPR_TIMER_END("execute_batch", 0); +} + +// start_batch_closure points to a caller-allocated closure to be used +// for entering the call combiner. +static void execute_batch(grpc_exec_ctx *exec_ctx, grpc_call *call, + grpc_transport_stream_op_batch *batch, + grpc_closure *start_batch_closure) { + batch->handler_private.extra_arg = call; + GRPC_CLOSURE_INIT(start_batch_closure, execute_batch_in_call_combiner, batch, + grpc_schedule_on_exec_ctx); + GRPC_CALL_COMBINER_START(exec_ctx, &call->call_combiner, start_batch_closure, + GRPC_ERROR_NONE, "executing batch"); } char *grpc_call_get_peer(grpc_call *call) { - grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0); - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - char *result; - GRPC_API_TRACE("grpc_call_get_peer(%p)", 1, (call)); - result = elem->filter->get_peer(&exec_ctx, elem); - if (result == NULL) { - result = grpc_channel_get_target(call->channel); - } - if (result == NULL) { - result = gpr_strdup("unknown"); - } - grpc_exec_ctx_finish(&exec_ctx); - return result; + char *peer_string = (char *)gpr_atm_acq_load(&call->peer_string); + if (peer_string != NULL) return gpr_strdup(peer_string); + peer_string = grpc_channel_get_target(call->channel); + if (peer_string != NULL) return peer_string; + return gpr_strdup("unknown"); } grpc_call *grpc_call_from_top_element(grpc_call_element *elem) { @@ -652,20 +671,41 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c, return GRPC_CALL_OK; } -static void done_termination(grpc_exec_ctx *exec_ctx, void *call, +typedef struct { + grpc_call *call; + grpc_closure start_batch; + grpc_closure finish_batch; +} cancel_state; + +// The on_complete callback used when sending a cancel_stream batch down +// the filter stack. Yields the call combiner when the batch is done. +static void done_termination(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "termination"); + cancel_state *state = (cancel_state *)arg; + GRPC_CALL_COMBINER_STOP(exec_ctx, &state->call->call_combiner, + "on_complete for cancel_stream op"); + GRPC_CALL_INTERNAL_UNREF(exec_ctx, state->call, "termination"); + gpr_free(state); } static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, status_source source, grpc_error *error) { GRPC_CALL_INTERNAL_REF(c, "termination"); + // Inform the call combiner of the cancellation, so that it can cancel + // any in-flight asynchronous actions that may be holding the call + // combiner. This ensures that the cancel_stream batch can be sent + // down the filter stack in a timely manner. + grpc_call_combiner_cancel(exec_ctx, &c->call_combiner, GRPC_ERROR_REF(error)); set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error)); - grpc_transport_stream_op_batch *op = grpc_make_transport_stream_op( - GRPC_CLOSURE_CREATE(done_termination, c, grpc_schedule_on_exec_ctx)); + cancel_state *state = (cancel_state *)gpr_malloc(sizeof(*state)); + state->call = c; + GRPC_CLOSURE_INIT(&state->finish_batch, done_termination, state, + grpc_schedule_on_exec_ctx); + grpc_transport_stream_op_batch *op = + grpc_make_transport_stream_op(&state->finish_batch); op->cancel_stream = true; op->payload->cancel_stream.cancel_error = error; - execute_op(exec_ctx, c, op); + execute_batch(exec_ctx, c, op, &state->start_batch); } static grpc_error *error_from_status(grpc_status_code status, @@ -1431,6 +1471,18 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, } } +// The recv_message_ready callback used when sending a batch containing +// a recv_message op down the filter stack. Yields the call combiner +// before processing the received message. +static void receiving_stream_ready_in_call_combiner(grpc_exec_ctx *exec_ctx, + void *bctlp, + grpc_error *error) { + batch_control *bctl = bctlp; + grpc_call *call = bctl->call; + GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, "recv_message_ready"); + receiving_stream_ready(exec_ctx, bctlp, error); +} + static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, batch_control *bctl) { grpc_call *call = bctl->call; @@ -1537,6 +1589,9 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, batch_control *bctl = bctlp; grpc_call *call = bctl->call; + GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, + "recv_initial_metadata_ready"); + add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false); if (error == GRPC_ERROR_NONE) { grpc_metadata_batch *md = @@ -1590,7 +1645,8 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_error *error) { batch_control *bctl = bctlp; - + grpc_call *call = bctl->call; + GRPC_CALL_COMBINER_STOP(exec_ctx, &call->call_combiner, "on_complete"); add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false); finish_batch_step(exec_ctx, bctl); } @@ -1610,9 +1666,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, int num_completion_callbacks_needed = 1; grpc_call_error error = GRPC_CALL_OK; - // sent_initial_metadata guards against variable reuse. - grpc_metadata compression_md; - GPR_TIMER_BEGIN("grpc_call_start_batch", 0); GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag); @@ -1660,7 +1713,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, goto done_with_error; } /* process compression level */ - memset(&compression_md, 0, sizeof(compression_md)); + memset(&call->compression_md, 0, sizeof(call->compression_md)); size_t additional_metadata_count = 0; grpc_compression_level effective_compression_level = GRPC_COMPRESS_LEVEL_NONE; @@ -1698,9 +1751,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, const grpc_stream_compression_algorithm calgo = stream_compression_algorithm_for_level_locked( call, effective_stream_compression_level); - compression_md.key = + call->compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_STREAM_ENCODING_REQUEST; - compression_md.value = + call->compression_md.value = grpc_stream_compression_algorithm_slice(calgo); } else { const grpc_compression_algorithm calgo = @@ -1708,8 +1761,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call, effective_compression_level); /* the following will be picked up by the compress filter and used * as the call's compression algorithm. */ - compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST; - compression_md.value = grpc_compression_algorithm_slice(calgo); + call->compression_md.key = + GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST; + call->compression_md.value = + grpc_compression_algorithm_slice(calgo); additional_metadata_count++; } } @@ -1724,7 +1779,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, if (!prepare_application_metadata( exec_ctx, call, (int)op->data.send_initial_metadata.count, op->data.send_initial_metadata.metadata, 0, call->is_client, - &compression_md, (int)additional_metadata_count)) { + &call->compression_md, (int)additional_metadata_count)) { error = GRPC_CALL_ERROR_INVALID_METADATA; goto done_with_error; } @@ -1736,6 +1791,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]; stream_op_payload->send_initial_metadata.send_initial_metadata_flags = op->flags; + if (call->is_client) { + stream_op_payload->send_initial_metadata.peer_string = + &call->peer_string; + } break; case GRPC_OP_SEND_MESSAGE: if (!are_write_flags_valid(op->flags)) { @@ -1868,6 +1927,10 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */]; stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready = &call->receiving_initial_metadata_ready; + if (!call->is_client) { + stream_op_payload->recv_initial_metadata.peer_string = + &call->peer_string; + } num_completion_callbacks_needed++; break; case GRPC_OP_RECV_MESSAGE: @@ -1884,8 +1947,9 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, stream_op->recv_message = true; call->receiving_buffer = op->data.recv_message.recv_message; stream_op_payload->recv_message.recv_message = &call->receiving_stream; - GRPC_CLOSURE_INIT(&call->receiving_stream_ready, receiving_stream_ready, - bctl, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&call->receiving_stream_ready, + receiving_stream_ready_in_call_combiner, bctl, + grpc_schedule_on_exec_ctx); stream_op_payload->recv_message.recv_message_ready = &call->receiving_stream_ready; num_completion_callbacks_needed++; @@ -1955,7 +2019,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, stream_op->on_complete = &bctl->finish_batch; gpr_atm_rel_store(&call->any_ops_sent_atm, 1); - execute_op(exec_ctx, call, stream_op); + execute_batch(exec_ctx, call, stream_op, &bctl->start_batch); done: GPR_TIMER_END("grpc_call_start_batch", 0); diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index 898476daee..280315036f 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -31,6 +31,7 @@ #include "src/core/lib/debug/stats.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/http/parser.h" +#include "src/core/lib/iomgr/call_combiner.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr.h" @@ -129,6 +130,7 @@ void grpc_init(void) { grpc_register_tracer(&grpc_trace_channel_stack_builder); grpc_register_tracer(&grpc_http1_trace); grpc_register_tracer(&grpc_cq_pluck_trace); // default on + grpc_register_tracer(&grpc_call_combiner_trace); grpc_register_tracer(&grpc_combiner_trace); grpc_register_tracer(&grpc_server_channel_trace); grpc_register_tracer(&grpc_bdp_estimator_trace); diff --git a/src/core/lib/surface/lame_client.cc b/src/core/lib/surface/lame_client.cc index a0791080a9..6286f9159d 100644 --- a/src/core/lib/surface/lame_client.cc +++ b/src/core/lib/surface/lame_client.cc @@ -40,6 +40,7 @@ namespace grpc_core { namespace { struct CallData { + grpc_call_combiner *call_combiner; grpc_linked_mdelem status; grpc_linked_mdelem details; grpc_core::atomic filled_metadata; @@ -52,14 +53,14 @@ struct ChannelData { static void fill_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_metadata_batch *mdb) { - CallData *calld = static_cast(elem->call_data); + CallData *calld = reinterpret_cast(elem->call_data); bool expected = false; if (!calld->filled_metadata.compare_exchange_strong( expected, true, grpc_core::memory_order_relaxed, grpc_core::memory_order_relaxed)) { return; } - ChannelData *chand = static_cast(elem->channel_data); + ChannelData *chand = reinterpret_cast(elem->channel_data); char tmp[GPR_LTOA_MIN_BUFSIZE]; gpr_ltoa(chand->error_code, tmp); calld->status.md = grpc_mdelem_from_slices( @@ -79,6 +80,7 @@ static void fill_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, static void lame_start_transport_stream_op_batch( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_transport_stream_op_batch *op) { + CallData *calld = reinterpret_cast(elem->call_data); if (op->recv_initial_metadata) { fill_metadata(exec_ctx, elem, op->payload->recv_initial_metadata.recv_initial_metadata); @@ -87,12 +89,8 @@ static void lame_start_transport_stream_op_batch( op->payload->recv_trailing_metadata.recv_trailing_metadata); } grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, op, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel")); -} - -static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { - return NULL; + exec_ctx, op, GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel"), + calld->call_combiner); } static void lame_get_channel_info(grpc_exec_ctx *exec_ctx, @@ -122,6 +120,8 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_element_args *args) { + CallData *calld = reinterpret_cast(elem->call_data); + calld->call_combiner = args->call_combiner; return GRPC_ERROR_NONE; } @@ -156,7 +156,6 @@ extern "C" const grpc_channel_filter grpc_lame_filter = { sizeof(grpc_core::ChannelData), grpc_core::init_channel_elem, grpc_core::destroy_channel_elem, - grpc_core::lame_get_peer, grpc_core::lame_get_channel_info, "lame-client", }; @@ -176,7 +175,7 @@ grpc_channel *grpc_lame_client_channel_create(const char *target, "error_message=%s)", 3, (target, (int)error_code, error_message)); GPR_ASSERT(elem->filter == &grpc_lame_filter); - auto chand = static_cast(elem->channel_data); + auto chand = reinterpret_cast(elem->channel_data); chand->error_code = error_code; chand->error_message = error_message; grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 66dcc299aa..8582d826ca 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -789,7 +789,6 @@ static void server_mutate_op(grpc_call_element *elem, static void server_start_transport_stream_op_batch( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_transport_stream_op_batch *op) { - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); server_mutate_op(elem, op); grpc_call_next_op(exec_ctx, elem, op); } @@ -962,7 +961,6 @@ const grpc_channel_filter grpc_server_top_filter = { sizeof(channel_data), init_channel_elem, destroy_channel_elem, - grpc_call_next_get_peer, grpc_channel_next_get_info, "server", }; diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index 6c61f4b8d9..650b0559aa 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -197,11 +197,6 @@ void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx, then_schedule_closure); } -char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx, - grpc_transport *transport) { - return transport->vtable->get_peer(exec_ctx, transport); -} - grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx, grpc_transport *transport) { return transport->vtable->get_endpoint(exec_ctx, transport); @@ -214,24 +209,24 @@ grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx, // is a function that must always unref cancel_error // though it lives in lib, it handles transport stream ops sure // it's grpc_transport_stream_op_batch_finish_with_failure - void grpc_transport_stream_op_batch_finish_with_failure( grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *batch, - grpc_error *error) { + grpc_error *error, grpc_call_combiner *call_combiner) { if (batch->send_message) { grpc_byte_stream_destroy(exec_ctx, batch->payload->send_message.send_message); } if (batch->recv_message) { - GRPC_CLOSURE_SCHED(exec_ctx, - batch->payload->recv_message.recv_message_ready, - GRPC_ERROR_REF(error)); + GRPC_CALL_COMBINER_START(exec_ctx, call_combiner, + batch->payload->recv_message.recv_message_ready, + GRPC_ERROR_REF(error), + "failing recv_message_ready"); } if (batch->recv_initial_metadata) { - GRPC_CLOSURE_SCHED( - exec_ctx, + GRPC_CALL_COMBINER_START( + exec_ctx, call_combiner, batch->payload->recv_initial_metadata.recv_initial_metadata_ready, - GRPC_ERROR_REF(error)); + GRPC_ERROR_REF(error), "failing recv_initial_metadata_ready"); } GRPC_CLOSURE_SCHED(exec_ctx, batch->on_complete, error); if (batch->cancel_stream) { diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index 099138ea14..fbf5dcb8b5 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -22,6 +22,7 @@ #include #include "src/core/lib/channel/context.h" +#include "src/core/lib/iomgr/call_combiner.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/iomgr/pollset.h" @@ -152,6 +153,9 @@ struct grpc_transport_stream_op_batch_payload { /** Iff send_initial_metadata != NULL, flags associated with send_initial_metadata: a bitfield of GRPC_INITIAL_METADATA_xxx */ uint32_t send_initial_metadata_flags; + // If non-NULL, will be set by the transport to the peer string + // (a char*, which the caller takes ownership of). + gpr_atm *peer_string; } send_initial_metadata; struct { @@ -176,6 +180,9 @@ struct grpc_transport_stream_op_batch_payload { // immediately available. This may be a signal that we received a // Trailers-Only response. bool *trailing_metadata_available; + // If non-NULL, will be set by the transport to the peer string + // (a char*, which the caller takes ownership of). + gpr_atm *peer_string; } recv_initial_metadata; struct { @@ -293,7 +300,7 @@ void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx, void grpc_transport_stream_op_batch_finish_with_failure( grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op, - grpc_error *error); + grpc_error *error, grpc_call_combiner *call_combiner); char *grpc_transport_stream_op_batch_string(grpc_transport_stream_op_batch *op); char *grpc_transport_op_string(grpc_transport_op *op); @@ -332,10 +339,6 @@ void grpc_transport_close(grpc_transport *transport); /* Destroy the transport */ void grpc_transport_destroy(grpc_exec_ctx *exec_ctx, grpc_transport *transport); -/* Get the transports peer */ -char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx, - grpc_transport *transport); - /* Get the endpoint used by \a transport */ grpc_endpoint *grpc_transport_get_endpoint(grpc_exec_ctx *exec_ctx, grpc_transport *transport); diff --git a/src/core/lib/transport/transport_impl.h b/src/core/lib/transport/transport_impl.h index fc772c6dd1..bbae69c223 100644 --- a/src/core/lib/transport/transport_impl.h +++ b/src/core/lib/transport/transport_impl.h @@ -59,9 +59,6 @@ typedef struct grpc_transport_vtable { /* implementation of grpc_transport_destroy */ void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_transport *self); - /* implementation of grpc_transport_get_peer */ - char *(*get_peer)(grpc_exec_ctx *exec_ctx, grpc_transport *self); - /* implementation of grpc_transport_get_endpoint */ grpc_endpoint *(*get_endpoint)(grpc_exec_ctx *exec_ctx, grpc_transport *self); } grpc_transport_vtable; diff --git a/src/core/lib/transport/transport_op_string.c b/src/core/lib/transport/transport_op_string.c index 7b18229ba6..409a6c4103 100644 --- a/src/core/lib/transport/transport_op_string.c +++ b/src/core/lib/transport/transport_op_string.c @@ -112,6 +112,13 @@ char *grpc_transport_stream_op_batch_string( gpr_strvec_add(&b, tmp); } + if (op->collect_stats) { + gpr_strvec_add(&b, gpr_strdup(" ")); + gpr_asprintf(&tmp, "COLLECT_STATS:%p", + op->payload->collect_stats.collect_stats); + gpr_strvec_add(&b, tmp); + } + out = gpr_strvec_flatten(&b, NULL); gpr_strvec_destroy(&b); -- cgit v1.2.3