diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/core/ext/client_config/client_channel.c | 28 | ||||
-rw-r--r-- | src/core/ext/client_config/subchannel.c | 5 | ||||
-rw-r--r-- | src/core/ext/client_config/subchannel.h | 3 | ||||
-rw-r--r-- | src/core/lib/channel/channel_stack.c | 13 | ||||
-rw-r--r-- | src/core/lib/channel/channel_stack.h | 13 | ||||
-rw-r--r-- | src/core/lib/channel/deadline_filter.c | 58 | ||||
-rw-r--r-- | src/core/lib/channel/deadline_filter.h | 22 | ||||
-rw-r--r-- | src/core/lib/surface/call.c | 72 |
8 files changed, 134 insertions, 80 deletions
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c index 5a8030b23e..76c2f38a5d 100644 --- a/src/core/ext/client_config/client_channel.c +++ b/src/core/ext/client_config/client_channel.c @@ -385,6 +385,9 @@ typedef struct client_channel_call_data { // stack and each has its own mutex. If/when we have time, find a way // to avoid this without breaking the grpc_deadline_state abstraction. grpc_deadline_state deadline_state; + gpr_timespec deadline; + + grpc_error *cancel_error; /** either 0 for no call, 1 for cancelled, or a pointer to a grpc_subchannel_call */ @@ -482,7 +485,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, } else { grpc_subchannel_call *subchannel_call = NULL; grpc_error *new_error = grpc_connected_subchannel_create_call( - exec_ctx, calld->connected_subchannel, calld->pollent, + exec_ctx, calld->connected_subchannel, calld->pollent, calld->deadline, &subchannel_call); if (new_error != GRPC_ERROR_NONE) { new_error = grpc_error_add_child(new_error, error); @@ -627,8 +630,8 @@ static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *call = GET_CALL(calld); GPR_TIMER_BEGIN("cc_start_transport_stream_op", 0); if (call == CANCELLED_CALL) { - grpc_transport_stream_op_finish_with_failure(exec_ctx, op, - GRPC_ERROR_CANCELLED); + grpc_transport_stream_op_finish_with_failure( + exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); GPR_TIMER_END("cc_start_transport_stream_op", 0); return; } @@ -644,8 +647,8 @@ retry: call = GET_CALL(calld); if (call == CANCELLED_CALL) { gpr_mu_unlock(&calld->mu); - grpc_transport_stream_op_finish_with_failure(exec_ctx, op, - GRPC_ERROR_CANCELLED); + grpc_transport_stream_op_finish_with_failure( + exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); GPR_TIMER_END("cc_start_transport_stream_op", 0); return; } @@ -661,6 +664,12 @@ retry: (gpr_atm)(uintptr_t)CANCELLED_CALL)) { goto retry; } else { + // Stash a copy of cancel_error in our call data, so that we can use + // it for subsequent operations. This ensures that if the call is + // cancelled before any ops are passed down (e.g., if the deadline + // is in the past when the call starts), we can return the right + // error to the caller when the first op does get passed down. + calld->cancel_error = GRPC_ERROR_REF(op->cancel_error); switch (calld->creation_phase) { case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING: fail_locked(exec_ctx, calld, GRPC_ERROR_REF(op->cancel_error)); @@ -697,7 +706,7 @@ retry: calld->connected_subchannel != NULL) { grpc_subchannel_call *subchannel_call = NULL; grpc_error *error = grpc_connected_subchannel_create_call( - exec_ctx, calld->connected_subchannel, calld->pollent, + exec_ctx, calld->connected_subchannel, calld->pollent, calld->deadline, &subchannel_call); if (error != GRPC_ERROR_NONE) { subchannel_call = CANCELLED_CALL; @@ -720,7 +729,9 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_call_element_args *args) { call_data *calld = elem->call_data; - grpc_deadline_state_init(&calld->deadline_state, args->call_stack); + grpc_deadline_state_init(exec_ctx, elem, args); + calld->deadline = args->deadline; + calld->cancel_error = GRPC_ERROR_NONE; gpr_atm_rel_store(&calld->subchannel_call, 0); gpr_mu_init(&calld->mu); calld->connected_subchannel = NULL; @@ -739,7 +750,8 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, const grpc_call_final_info *final_info, void *and_free_memory) { call_data *calld = elem->call_data; - grpc_deadline_state_destroy(exec_ctx, &calld->deadline_state); + grpc_deadline_state_destroy(exec_ctx, elem); + GRPC_ERROR_UNREF(calld->cancel_error); grpc_subchannel_call *call = GET_CALL(calld); if (call != NULL && call != CANCELLED_CALL) { GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call"); diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c index 456cc44635..8f4a2f9e3e 100644 --- a/src/core/ext/client_config/subchannel.c +++ b/src/core/ext/client_config/subchannel.c @@ -706,14 +706,15 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( grpc_error *grpc_connected_subchannel_create_call( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, - grpc_polling_entity *pollent, grpc_subchannel_call **call) { + grpc_polling_entity *pollent, gpr_timespec deadline, + grpc_subchannel_call **call) { grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); *call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size); grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); (*call)->connection = con; // Ref is added below. grpc_error *error = grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, *call, - NULL, NULL, callstk); + NULL, NULL, deadline, callstk); if (error != GRPC_ERROR_NONE) { const char *error_string = grpc_error_string(error); gpr_log(GPR_ERROR, "error: %s", error_string); diff --git a/src/core/ext/client_config/subchannel.h b/src/core/ext/client_config/subchannel.h index ae1d96e640..763ff85757 100644 --- a/src/core/ext/client_config/subchannel.h +++ b/src/core/ext/client_config/subchannel.h @@ -110,7 +110,8 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, /** construct a subchannel call */ grpc_error *grpc_connected_subchannel_create_call( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel, - grpc_polling_entity *pollent, grpc_subchannel_call **subchannel_call); + grpc_polling_entity *pollent, gpr_timespec deadline, + grpc_subchannel_call **subchannel_call); /** process a transport level op */ void grpc_connected_subchannel_process_transport_op( diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c index f5fa0b0390..98177f439b 100644 --- a/src/core/lib/channel/channel_stack.c +++ b/src/core/lib/channel/channel_stack.c @@ -157,13 +157,11 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx, } } -grpc_error *grpc_call_stack_init(grpc_exec_ctx *exec_ctx, - grpc_channel_stack *channel_stack, - int initial_refs, grpc_iomgr_cb_func destroy, - void *destroy_arg, - grpc_call_context_element *context, - const void *transport_server_data, - grpc_call_stack *call_stack) { +grpc_error *grpc_call_stack_init( + grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack, + int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg, + grpc_call_context_element *context, const void *transport_server_data, + gpr_timespec deadline, grpc_call_stack *call_stack) { grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack); grpc_call_element_args args; size_t count = channel_stack->count; @@ -184,6 +182,7 @@ grpc_error *grpc_call_stack_init(grpc_exec_ctx *exec_ctx, args.call_stack = call_stack; args.server_transport_data = transport_server_data; args.context = context; + args.deadline = deadline; call_elems[i].filter = channel_elems[i].filter; call_elems[i].channel_data = channel_elems[i].channel_data; call_elems[i].call_data = user_data; diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index eeaab17d39..1cfe2885d8 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -74,6 +74,7 @@ typedef struct { grpc_call_stack *call_stack; const void *server_transport_data; grpc_call_context_element *context; + gpr_timespec deadline; } grpc_call_element_args; typedef struct { @@ -220,13 +221,11 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx, /* Initialize a call stack given a channel stack. transport_server_data is expected to be NULL on a client, or an opaque transport owned pointer on the server. */ -grpc_error *grpc_call_stack_init(grpc_exec_ctx *exec_ctx, - grpc_channel_stack *channel_stack, - int initial_refs, grpc_iomgr_cb_func destroy, - void *destroy_arg, - grpc_call_context_element *context, - const void *transport_server_data, - grpc_call_stack *call_stack); +grpc_error *grpc_call_stack_init( + grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack, + int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg, + grpc_call_context_element *context, const void *transport_server_data, + gpr_timespec deadline, grpc_call_stack *call_stack); /* Set a pollset or a pollset_set for a call stack: must occur before the first * op is started */ void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c index 010fedd7b7..079b98a2f8 100644 --- a/src/core/lib/channel/deadline_filter.c +++ b/src/core/lib/channel/deadline_filter.c @@ -34,10 +34,12 @@ #include <stdbool.h> #include <string.h> +#include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> +#include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/timer.h" // @@ -106,15 +108,49 @@ static void inject_on_complete_cb(grpc_deadline_state* deadline_state, op->on_complete = &deadline_state->on_complete; } -void grpc_deadline_state_init(grpc_deadline_state* deadline_state, - grpc_call_stack* call_stack) { +// Callback and associated state for starting the timer after call stack +// initialization has been completed. +struct start_timer_after_init_state { + grpc_call_element* elem; + gpr_timespec deadline; + grpc_closure closure; +}; +static void start_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + struct start_timer_after_init_state* state = arg; + start_timer_if_needed(exec_ctx, state->elem, state->deadline); + gpr_free(state); +} + +void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + grpc_call_element_args* args) { + grpc_deadline_state* deadline_state = elem->call_data; memset(deadline_state, 0, sizeof(*deadline_state)); - deadline_state->call_stack = call_stack; + deadline_state->call_stack = args->call_stack; gpr_mu_init(&deadline_state->timer_mu); + // Deadline will always be infinite on servers, so the timer will only be + // set on clients with a finite deadline. + const gpr_timespec deadline = + gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC); + if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) != 0) { + // When the deadline passes, we indicate the failure by sending down + // an op with cancel_error set. However, we can't send down any ops + // until after the call stack is fully initialized. If we start the + // timer here, we have no guarantee that the timer won't pop before + // call stack initialization is finished. To avoid that problem, we + // create a closure to start the timer, and we schedule that closure + // to be run after call stack initialization is done. + struct start_timer_after_init_state* state = gpr_malloc(sizeof(*state)); + state->elem = elem; + state->deadline = deadline; + grpc_closure_init(&state->closure, start_timer_after_init, state); + grpc_exec_ctx_sched(exec_ctx, &state->closure, GRPC_ERROR_NONE, NULL); + } } void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx, - grpc_deadline_state* deadline_state) { + grpc_call_element* elem) { + grpc_deadline_state* deadline_state = elem->call_data; cancel_timer_if_needed(exec_ctx, deadline_state); gpr_mu_destroy(&deadline_state->timer_mu); } @@ -127,12 +163,6 @@ void grpc_deadline_state_client_start_transport_stream_op( op->close_error != GRPC_ERROR_NONE) { cancel_timer_if_needed(exec_ctx, deadline_state); } else { - // If we're sending initial metadata, get the deadline from the metadata - // and start the timer if needed. - if (op->send_initial_metadata != NULL) { - start_timer_if_needed(exec_ctx, elem, - op->send_initial_metadata->deadline); - } // Make sure we know when the call is complete, so that we can cancel // the timer. if (op->recv_trailing_metadata != NULL) { @@ -177,10 +207,9 @@ typedef struct server_call_data { static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_call_element_args* args) { - base_call_data* calld = elem->call_data; // Note: size of call data is different between client and server. - memset(calld, 0, elem->filter->sizeof_call_data); - grpc_deadline_state_init(&calld->deadline_state, args->call_stack); + memset(elem->call_data, 0, elem->filter->sizeof_call_data); + grpc_deadline_state_init(exec_ctx, elem, args); return GRPC_ERROR_NONE; } @@ -188,8 +217,7 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, const grpc_call_final_info* final_info, void* and_free_memory) { - base_call_data* calld = elem->call_data; - grpc_deadline_state_destroy(exec_ctx, &calld->deadline_state); + grpc_deadline_state_destroy(exec_ctx, elem); } // Method for starting a call op for client filter. diff --git a/src/core/lib/channel/deadline_filter.h b/src/core/lib/channel/deadline_filter.h index a09f85afe6..685df87761 100644 --- a/src/core/lib/channel/deadline_filter.h +++ b/src/core/lib/channel/deadline_filter.h @@ -36,7 +36,7 @@ #include "src/core/lib/iomgr/timer.h" // State used for filters that enforce call deadlines. -// Should be the first field in the filter's call_data. +// Must be the first field in the filter's call_data. typedef struct grpc_deadline_state { // We take a reference to the call stack for the timer callback. grpc_call_stack* call_stack; @@ -54,16 +54,18 @@ typedef struct grpc_deadline_state { grpc_closure* next_on_complete; } grpc_deadline_state; -void grpc_deadline_state_init(grpc_deadline_state* call_data, - grpc_call_stack* call_stack); -void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx, - grpc_deadline_state* call_data); - -// To be used in a filter's start_transport_stream_op() method to -// enforce call deadlines. -// It is the caller's responsibility to chain to the next filter if -// necessary after this function returns. +// To be used in a filter's init_call_elem(), destroy_call_elem(), and +// start_transport_stream_op() methods to enforce call deadlines. +// // REQUIRES: The first field in elem->call_data is a grpc_deadline_state. +// +// For grpc_deadline_state_client_start_transport_stream_op(), it is the +// caller's responsibility to chain to the next filter if necessary +// after the function returns. +void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, + grpc_call_element_args* args); +void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx, + grpc_call_element* elem); void grpc_deadline_state_client_start_transport_stream_op( grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_transport_stream_op* op); diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 0ced42a571..f8b7c9bf90 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -254,34 +254,7 @@ grpc_call *grpc_call_create( } } send_deadline = gpr_convert_clock_type(send_deadline, GPR_CLOCK_MONOTONIC); - GRPC_CHANNEL_INTERNAL_REF(channel, "call"); - /* initial refcount dropped by grpc_call_destroy */ - grpc_error *error = grpc_call_stack_init( - &exec_ctx, channel_stack, 1, destroy_call, call, call->context, - server_transport_data, CALL_STACK_FROM_CALL(call)); - if (error != GRPC_ERROR_NONE) { - grpc_status_code status; - const char *error_str; - grpc_error_get_status(error, &status, &error_str); - close_with_status(&exec_ctx, call, status, error_str); - GRPC_ERROR_UNREF(error); - } - if (cq != NULL) { - GPR_ASSERT( - pollset_set_alternative == NULL && - "Only one of 'cq' and 'pollset_set_alternative' should be non-NULL."); - GRPC_CQ_INTERNAL_REF(cq, "bind"); - call->pollent = - grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)); - } - if (pollset_set_alternative != NULL) { - call->pollent = - grpc_polling_entity_create_from_pollset_set(pollset_set_alternative); - } - if (!grpc_polling_entity_is_empty(&call->pollent)) { - grpc_call_stack_set_pollset_or_pollset_set( - &exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent); - } + if (parent_call != NULL) { GRPC_CALL_INTERNAL_REF(parent_call, "child"); GPR_ASSERT(call->is_client); @@ -323,7 +296,38 @@ grpc_call *grpc_call_create( gpr_mu_unlock(&parent_call->mu); } + call->send_deadline = send_deadline; + + GRPC_CHANNEL_INTERNAL_REF(channel, "call"); + /* initial refcount dropped by grpc_call_destroy */ + grpc_error *error = grpc_call_stack_init( + &exec_ctx, channel_stack, 1, destroy_call, call, call->context, + server_transport_data, send_deadline, CALL_STACK_FROM_CALL(call)); + if (error != GRPC_ERROR_NONE) { + grpc_status_code status; + const char *error_str; + grpc_error_get_status(error, &status, &error_str); + close_with_status(&exec_ctx, call, status, error_str); + GRPC_ERROR_UNREF(error); + } + if (cq != NULL) { + GPR_ASSERT( + pollset_set_alternative == NULL && + "Only one of 'cq' and 'pollset_set_alternative' should be non-NULL."); + GRPC_CQ_INTERNAL_REF(cq, "bind"); + call->pollent = + grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq)); + } + if (pollset_set_alternative != NULL) { + call->pollent = + grpc_polling_entity_create_from_pollset_set(pollset_set_alternative); + } + if (!grpc_polling_entity_is_empty(&call->pollent)) { + grpc_call_stack_set_pollset_or_pollset_set( + &exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent); + } + grpc_exec_ctx_finish(&exec_ctx); GPR_TIMER_END("grpc_call_create", 0); return call; @@ -1220,8 +1224,8 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) != 0 && !call->is_client) { - call->send_deadline = gpr_convert_clock_type(md->deadline, - GPR_CLOCK_MONOTONIC); + call->send_deadline = + gpr_convert_clock_type(md->deadline, GPR_CLOCK_MONOTONIC); } } @@ -1250,6 +1254,14 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, GRPC_ERROR_REF(error); gpr_mu_lock(&call->mu); + + // If the error has an associated status code, set the call's status. + intptr_t status; + if (error != GRPC_ERROR_NONE && + grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status)) { + set_status_from_error(call, STATUS_FROM_CORE, error); + } + if (bctl->send_initial_metadata) { if (error != GRPC_ERROR_NONE) { set_status_from_error(call, STATUS_FROM_CORE, error); |