diff options
Diffstat (limited to 'src/core')
33 files changed, 894 insertions, 461 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c index de516ab4c9..7add432589 100644 --- a/src/core/ext/filters/client_channel/client_channel.c +++ b/src/core/ext/filters/client_channel/client_channel.c @@ -52,6 +52,8 @@ /* Client channel implementation */ +grpc_tracer_flag grpc_client_channel_trace = GRPC_TRACER_INITIALIZER(false); + /************************************************************************* * METHOD-CONFIG TABLE */ @@ -241,6 +243,10 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx, GRPC_ERROR_REF(error)); } } + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p: setting connectivity state to %s", chand, + grpc_connectivity_state_name(state)); + } grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error, reason); } @@ -251,6 +257,10 @@ static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx, grpc_connectivity_state publish_state = w->state; /* check if the notification is for the latest policy */ if (w->lb_policy == w->chand->lb_policy) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p: lb_policy=%p state changed to %s", w->chand, + w->lb_policy, grpc_connectivity_state_name(w->state)); + } if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) { publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE; grpc_resolver_channel_saw_error_locked(exec_ctx, w->chand->resolver); @@ -263,7 +273,6 @@ static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx, watch_lb_policy_locked(exec_ctx, w->chand, w->lb_policy, w->state); } } - GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy"); gpr_free(w); } @@ -273,7 +282,6 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand, grpc_connectivity_state current_state) { lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w)); GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy"); - w->chand = chand; GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w, grpc_combiner_scheduler(chand->combiner)); @@ -283,6 +291,18 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand, &w->on_changed); } +static void start_resolving_locked(grpc_exec_ctx *exec_ctx, + channel_data *chand) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p: starting name resolution", chand); + } + GPR_ASSERT(!chand->started_resolving); + chand->started_resolving = true; + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); + grpc_resolver_next_locked(exec_ctx, chand->resolver, &chand->resolver_result, + &chand->on_resolver_result_changed); +} + typedef struct { char *server_name; grpc_server_retry_throttle_data *retry_throttle_data; @@ -345,8 +365,13 @@ static void parse_retry_throttle_params(const grpc_json *field, void *arg) { static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { channel_data *chand = arg; + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p: got resolver result: error=%s", chand, + grpc_error_string(error)); + } // Extract the following fields from the resolver result, if non-NULL. char *lb_policy_name = NULL; + bool lb_policy_name_changed = false; grpc_lb_policy *new_lb_policy = NULL; char *service_config_json = NULL; grpc_server_retry_throttle_data *retry_throttle_data = NULL; @@ -394,10 +419,10 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, // taking a lock on chand->info_mu, because this function is the // only thing that modifies its value, and it can only be invoked // once at any given time. - const bool lb_policy_type_changed = + lb_policy_name_changed = chand->info_lb_policy_name == NULL || strcmp(chand->info_lb_policy_name, lb_policy_name) != 0; - if (chand->lb_policy != NULL && !lb_policy_type_changed) { + if (chand->lb_policy != NULL && !lb_policy_name_changed) { // Continue using the same LB policy. Update with new addresses. grpc_lb_policy_update_locked(exec_ctx, chand->lb_policy, &lb_policy_args); } else { @@ -445,6 +470,13 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, grpc_channel_args_destroy(exec_ctx, chand->resolver_result); chand->resolver_result = NULL; } + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p: resolver result: lb_policy_name=\"%s\"%s, " + "service_config=\"%s\"", + chand, lb_policy_name, lb_policy_name_changed ? " (changed)" : "", + service_config_json); + } // Now swap out fields in chand. Note that the new values may still // be NULL if (e.g.) the resolver failed to return results or the // results did not contain the necessary data. @@ -479,6 +511,10 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, if (new_lb_policy != NULL || error != GRPC_ERROR_NONE || chand->resolver == NULL) { if (chand->lb_policy != NULL) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p: unreffing lb_policy=%p", chand, + chand->lb_policy); + } grpc_pollset_set_del_pollset_set(exec_ctx, chand->lb_policy->interested_parties, chand->interested_parties); @@ -489,7 +525,13 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, // Now that we've swapped out the relevant fields of chand, check for // error or shutdown. if (error != GRPC_ERROR_NONE || chand->resolver == NULL) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p: shutting down", chand); + } if (chand->resolver != NULL) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p: shutting down resolver", chand); + } grpc_resolver_shutdown_locked(exec_ctx, chand->resolver); GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); chand->resolver = NULL; @@ -510,6 +552,9 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, grpc_error *state_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy"); if (new_lb_policy != NULL) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p: initializing new LB policy", chand); + } GRPC_ERROR_UNREF(state_error); state = grpc_lb_policy_check_connectivity_locked(exec_ctx, new_lb_policy, &state_error); @@ -772,7 +817,9 @@ typedef struct client_channel_call_data { gpr_atm subchannel_call_or_error; gpr_arena *arena; - bool pick_pending; + grpc_lb_policy *lb_policy; // Holds ref while LB pick is pending. + grpc_closure lb_pick_closure; + grpc_connected_subchannel *connected_subchannel; grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT]; grpc_polling_entity *pollent; @@ -837,8 +884,15 @@ static void waiting_for_pick_batches_add_locked( } static void waiting_for_pick_batches_fail_locked(grpc_exec_ctx *exec_ctx, - call_data *calld, + grpc_call_element *elem, grpc_error *error) { + call_data *calld = elem->call_data; + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: failing %" PRIdPTR " pending batches: %s", + elem->channel_data, calld, calld->waiting_for_pick_batches_count, + grpc_error_string(error)); + } for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) { grpc_transport_stream_op_batch_finish_with_failure( exec_ctx, calld->waiting_for_pick_batches[i], GRPC_ERROR_REF(error)); @@ -848,14 +902,21 @@ static void waiting_for_pick_batches_fail_locked(grpc_exec_ctx *exec_ctx, } static void waiting_for_pick_batches_resume_locked(grpc_exec_ctx *exec_ctx, - call_data *calld) { + grpc_call_element *elem) { + call_data *calld = elem->call_data; if (calld->waiting_for_pick_batches_count == 0) return; call_or_error coe = get_call_or_error(calld); if (coe.error != GRPC_ERROR_NONE) { - waiting_for_pick_batches_fail_locked(exec_ctx, calld, + waiting_for_pick_batches_fail_locked(exec_ctx, elem, GRPC_ERROR_REF(coe.error)); return; } + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: sending %" PRIdPTR + " pending batches to subchannel_call=%p", + elem->channel_data, calld, calld->waiting_for_pick_batches_count, + coe.subchannel_call); + } for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) { grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, calld->waiting_for_pick_batches[i]); @@ -869,6 +930,10 @@ static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: applying service config to call", + chand, calld); + } if (chand->retry_throttle_data != NULL) { calld->retry_throttle_data = grpc_server_retry_throttle_data_ref(chand->retry_throttle_data); @@ -895,7 +960,9 @@ static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx, } static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx, - call_data *calld, grpc_error *error) { + grpc_call_element *elem, + grpc_error *error) { + call_data *calld = elem->call_data; grpc_subchannel_call *subchannel_call = NULL; const grpc_connected_subchannel_call_args call_args = { .pollent = calld->pollent, @@ -906,13 +973,18 @@ static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx, .context = calld->subchannel_call_context}; grpc_error *new_error = grpc_connected_subchannel_create_call( exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s", + elem->channel_data, calld, subchannel_call, + grpc_error_string(new_error)); + } GPR_ASSERT(set_call_or_error( calld, (call_or_error){.subchannel_call = subchannel_call})); if (new_error != GRPC_ERROR_NONE) { new_error = grpc_error_add_child(new_error, error); - waiting_for_pick_batches_fail_locked(exec_ctx, calld, new_error); + waiting_for_pick_batches_fail_locked(exec_ctx, elem, new_error); } else { - waiting_for_pick_batches_resume_locked(exec_ctx, calld); + waiting_for_pick_batches_resume_locked(exec_ctx, elem); } GRPC_ERROR_UNREF(error); } @@ -922,8 +994,6 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, grpc_error *error) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - GPR_ASSERT(calld->pick_pending); - calld->pick_pending = false; grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent, chand->interested_parties); call_or_error coe = get_call_or_error(calld); @@ -935,8 +1005,13 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, "Call dropped by load balancing policy") : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Failed to create subchannel", &error, 1); + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: failed to create subchannel: error=%s", chand, + calld, grpc_error_string(failure)); + } set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(failure)}); - waiting_for_pick_batches_fail_locked(exec_ctx, calld, failure); + waiting_for_pick_batches_fail_locked(exec_ctx, elem, failure); } else if (coe.error != GRPC_ERROR_NONE) { /* already cancelled before subchannel became ready */ grpc_error *child_errors[] = {error, coe.error}; @@ -950,10 +1025,15 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, grpc_error_set_int(cancellation_error, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_DEADLINE_EXCEEDED); } - waiting_for_pick_batches_fail_locked(exec_ctx, calld, cancellation_error); + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: cancelled before subchannel became ready: %s", + chand, calld, grpc_error_string(cancellation_error)); + } + waiting_for_pick_batches_fail_locked(exec_ctx, elem, cancellation_error); } else { /* Create call on subchannel. */ - create_subchannel_call_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); + create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); } GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); GRPC_ERROR_UNREF(error); @@ -983,41 +1063,77 @@ typedef struct { grpc_closure closure; } pick_after_resolver_result_args; -static void continue_picking_after_resolver_result_locked( - grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { +static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx, + void *arg, + grpc_error *error) { pick_after_resolver_result_args *args = arg; if (args->cancelled) { /* cancelled, do nothing */ - } else if (error != GRPC_ERROR_NONE) { - subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error)); + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "call cancelled before resolver result"); + } } else { - if (pick_subchannel_locked(exec_ctx, args->elem)) { - subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_NONE); + channel_data *chand = args->elem->channel_data; + call_data *calld = args->elem->call_data; + if (error != GRPC_ERROR_NONE) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver failed to return data", + chand, calld); + } + subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error)); + } else { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver returned, doing pick", + chand, calld); + } + if (pick_subchannel_locked(exec_ctx, args->elem)) { + subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_NONE); + } } } gpr_free(args); } -static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_error *error) { +static void pick_after_resolver_result_start_locked(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: deferring pick pending resolver result", chand, + calld); + } + pick_after_resolver_result_args *args = + (pick_after_resolver_result_args *)gpr_zalloc(sizeof(*args)); + args->elem = elem; + GRPC_CLOSURE_INIT(&args->closure, pick_after_resolver_result_done_locked, + args, grpc_combiner_scheduler(chand->combiner)); + grpc_closure_list_append(&chand->waiting_for_resolver_result_closures, + &args->closure, GRPC_ERROR_NONE); +} + +static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_error *error) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - if (chand->lb_policy != NULL) { - grpc_lb_policy_cancel_pick_locked(exec_ctx, chand->lb_policy, - &calld->connected_subchannel, - GRPC_ERROR_REF(error)); - } // If we don't yet have a resolver result, then a closure for - // continue_picking_after_resolver_result_locked() will have been added to + // pick_after_resolver_result_done_locked() will have been added to // chand->waiting_for_resolver_result_closures, and it may not be invoked // until after this call has been destroyed. We mark the operation as - // cancelled, so that when continue_picking_after_resolver_result_locked() + // cancelled, so that when pick_after_resolver_result_done_locked() // is called, it will be a no-op. We also immediately invoke // subchannel_ready_locked() to propagate the error back to the caller. for (grpc_closure *closure = chand->waiting_for_resolver_result_closures.head; closure != NULL; closure = closure->next_data.next) { pick_after_resolver_result_args *args = closure->cb_arg; if (!args->cancelled && args->elem == elem) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: " + "cancelling pick waiting for resolver result", + chand, calld); + } args->cancelled = true; subchannel_ready_locked(exec_ctx, elem, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( @@ -1027,24 +1143,21 @@ static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, GRPC_ERROR_UNREF(error); } -// State for pick callback that holds a reference to the LB policy -// from which the pick was requested. -typedef struct { - grpc_lb_policy *lb_policy; - grpc_call_element *elem; - grpc_closure closure; -} pick_callback_args; - // Callback invoked by grpc_lb_policy_pick_locked() for async picks. // Unrefs the LB policy after invoking subchannel_ready_locked(). static void pick_callback_done_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - pick_callback_args *args = arg; - GPR_ASSERT(args != NULL); - GPR_ASSERT(args->lb_policy != NULL); - subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error)); - GRPC_LB_POLICY_UNREF(exec_ctx, args->lb_policy, "pick_subchannel"); - gpr_free(args); + grpc_call_element *elem = arg; + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed asynchronously", + chand, calld); + } + GPR_ASSERT(calld->lb_policy != NULL); + GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel"); + calld->lb_policy = NULL; + subchannel_ready_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); } // Takes a ref to chand->lb_policy and calls grpc_lb_policy_pick_locked(). @@ -1055,23 +1168,44 @@ static bool pick_callback_start_locked(grpc_exec_ctx *exec_ctx, const grpc_lb_policy_pick_args *inputs) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - pick_callback_args *pick_args = gpr_zalloc(sizeof(*pick_args)); + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting pick on lb_policy=%p", + chand, calld, chand->lb_policy); + } + // Keep a ref to the LB policy in calld while the pick is pending. GRPC_LB_POLICY_REF(chand->lb_policy, "pick_subchannel"); - pick_args->lb_policy = chand->lb_policy; - pick_args->elem = elem; - GRPC_CLOSURE_INIT(&pick_args->closure, pick_callback_done_locked, pick_args, + calld->lb_policy = chand->lb_policy; + GRPC_CLOSURE_INIT(&calld->lb_pick_closure, pick_callback_done_locked, elem, grpc_combiner_scheduler(chand->combiner)); const bool pick_done = grpc_lb_policy_pick_locked( exec_ctx, chand->lb_policy, inputs, &calld->connected_subchannel, - calld->subchannel_call_context, NULL, &pick_args->closure); + calld->subchannel_call_context, NULL, &calld->lb_pick_closure); if (pick_done) { /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */ - GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "pick_subchannel"); - gpr_free(pick_args); + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed synchronously", + chand, calld); + } + GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel"); + calld->lb_policy = NULL; } return pick_done; } +static void pick_callback_cancel_locked(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_error *error) { + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + GPR_ASSERT(calld->lb_policy != NULL); + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p", + chand, calld, calld->lb_policy); + } + grpc_lb_policy_cancel_pick_locked(exec_ctx, calld->lb_policy, + &calld->connected_subchannel, error); +} + static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { GPR_TIMER_BEGIN("pick_subchannel", 0); @@ -1107,20 +1241,9 @@ static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx, pick_done = pick_callback_start_locked(exec_ctx, elem, &inputs); } else if (chand->resolver != NULL) { if (!chand->started_resolving) { - chand->started_resolving = true; - GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - grpc_resolver_next_locked(exec_ctx, chand->resolver, - &chand->resolver_result, - &chand->on_resolver_result_changed); + start_resolving_locked(exec_ctx, chand); } - pick_after_resolver_result_args *args = - (pick_after_resolver_result_args *)gpr_zalloc(sizeof(*args)); - args->elem = elem; - GRPC_CLOSURE_INIT(&args->closure, - continue_picking_after_resolver_result_locked, args, - grpc_combiner_scheduler(chand->combiner)); - grpc_closure_list_append(&chand->waiting_for_resolver_result_closures, - &args->closure, GRPC_ERROR_NONE); + pick_after_resolver_result_start_locked(exec_ctx, elem); } else { subchannel_ready_locked( exec_ctx, elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); @@ -1133,63 +1256,77 @@ static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error_ignored) { GPR_TIMER_BEGIN("start_transport_stream_op_batch_locked", 0); - grpc_transport_stream_op_batch *op = arg; - grpc_call_element *elem = op->handler_private.extra_arg; + grpc_transport_stream_op_batch *batch = arg; + grpc_call_element *elem = batch->handler_private.extra_arg; call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; /* need to recheck that another thread hasn't set the call */ call_or_error coe = get_call_or_error(calld); if (coe.error != GRPC_ERROR_NONE) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s", + chand, calld, grpc_error_string(coe.error)); + } grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, op, GRPC_ERROR_REF(coe.error)); + exec_ctx, batch, GRPC_ERROR_REF(coe.error)); goto done; } if (coe.subchannel_call != NULL) { - grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op); + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: sending batch to subchannel_call=%p", chand, + calld, coe.subchannel_call); + } + grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, batch); goto done; } // Add to waiting-for-pick list. If we succeed in getting a // subchannel call below, we'll handle this batch (along with any // other waiting batches) in waiting_for_pick_batches_resume_locked(). - waiting_for_pick_batches_add_locked(calld, op); - /* if this is a cancellation, then we can raise our cancelled flag */ - if (op->cancel_stream) { - grpc_error *error = op->payload->cancel_stream.cancel_error; + waiting_for_pick_batches_add_locked(calld, batch); + // If this is a cancellation, cancel the pending pick (if any) and + // fail any pending batches. + if (batch->cancel_stream) { + grpc_error *error = batch->payload->cancel_stream.cancel_error; + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: recording cancel_error=%s", chand, + calld, grpc_error_string(error)); + } /* Stash a copy of cancel_error in our call data, so that we can use it for subsequent operations. This ensures that if the call is - cancelled before any ops are passed down (e.g., if the deadline + cancelled before any batches are passed down (e.g., if the deadline is in the past when the call starts), we can return the right - error to the caller when the first op does get passed down. */ + error to the caller when the first batch does get passed down. */ set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(error)}); - if (calld->pick_pending) { - cancel_pick_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); + if (calld->lb_policy != NULL) { + pick_callback_cancel_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); + } else { + pick_after_resolver_result_cancel_locked(exec_ctx, elem, + GRPC_ERROR_REF(error)); } - waiting_for_pick_batches_fail_locked(exec_ctx, calld, - GRPC_ERROR_REF(error)); + waiting_for_pick_batches_fail_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); goto done; } /* if we don't have a subchannel, try to get one */ - if (!calld->pick_pending && calld->connected_subchannel == NULL && - op->send_initial_metadata) { - calld->initial_metadata_payload = op->payload; - calld->pick_pending = true; + if (batch->send_initial_metadata) { + GPR_ASSERT(calld->connected_subchannel == NULL); + calld->initial_metadata_payload = batch->payload; GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); /* If a subchannel is not available immediately, the polling entity from call_data should be provided to channel_data's interested_parties, so that IO of the lb_policy and resolver could be done under it. */ if (pick_subchannel_locked(exec_ctx, elem)) { // Pick was returned synchronously. - calld->pick_pending = false; GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); if (calld->connected_subchannel == NULL) { grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Call dropped by load balancing policy"); set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(error)}); - waiting_for_pick_batches_fail_locked(exec_ctx, calld, error); + waiting_for_pick_batches_fail_locked(exec_ctx, elem, error); } else { // Create subchannel call. - create_subchannel_call_locked(exec_ctx, calld, GRPC_ERROR_NONE); + create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_NONE); } } else { grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent, @@ -1232,47 +1369,59 @@ static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { If it has, we proceed on the fast path. */ static void cc_start_transport_stream_op_batch( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { + grpc_transport_stream_op_batch *batch) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + if (GRPC_TRACER_ON(grpc_client_channel_trace) || + GRPC_TRACER_ON(grpc_trace_channel)) { + grpc_call_log_op(GPR_INFO, elem, batch); + } if (chand->deadline_checking_enabled) { grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem, - op); + batch); } // Intercept on_complete for recv_trailing_metadata so that we can // check retry throttle status. - if (op->recv_trailing_metadata) { - GPR_ASSERT(op->on_complete != NULL); - calld->original_on_complete = op->on_complete; + if (batch->recv_trailing_metadata) { + GPR_ASSERT(batch->on_complete != NULL); + calld->original_on_complete = batch->on_complete; GRPC_CLOSURE_INIT(&calld->on_complete, on_complete, elem, grpc_schedule_on_exec_ctx); - op->on_complete = &calld->on_complete; + batch->on_complete = &calld->on_complete; } /* try to (atomically) get the call */ call_or_error coe = get_call_or_error(calld); GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0); if (coe.error != GRPC_ERROR_NONE) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s", + chand, calld, grpc_error_string(coe.error)); + } grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, op, GRPC_ERROR_REF(coe.error)); - GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); - /* early out */ - return; + exec_ctx, batch, GRPC_ERROR_REF(coe.error)); + goto done; } if (coe.subchannel_call != NULL) { - grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op); - GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); - /* early out */ - return; + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: sending batch to subchannel_call=%p", chand, + calld, coe.subchannel_call); + } + grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, batch); + goto done; } /* we failed; lock and figure out what to do */ + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: entering combiner", chand, calld); + } GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op_batch"); - op->handler_private.extra_arg = elem; + batch->handler_private.extra_arg = elem; GRPC_CLOSURE_SCHED( - exec_ctx, GRPC_CLOSURE_INIT(&op->handler_private.closure, - start_transport_stream_op_batch_locked, op, + exec_ctx, GRPC_CLOSURE_INIT(&batch->handler_private.closure, + start_transport_stream_op_batch_locked, batch, grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); +done: GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); } @@ -1317,7 +1466,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, coe.subchannel_call, "client_channel_destroy_call"); } - GPR_ASSERT(!calld->pick_pending); + GPR_ASSERT(calld->lb_policy == NULL); GPR_ASSERT(calld->waiting_for_pick_batches_count == 0); if (calld->connected_subchannel != NULL) { GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel, @@ -1366,11 +1515,7 @@ static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg, } else { chand->exit_idle_when_lb_policy_arrives = true; if (!chand->started_resolving && chand->resolver != NULL) { - GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - chand->started_resolving = true; - grpc_resolver_next_locked(exec_ctx, chand->resolver, - &chand->resolver_result, - &chand->on_resolver_result_changed); + start_resolving_locked(exec_ctx, chand); } } GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "try_to_connect"); diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h index 63f7c29940..c99f0092e9 100644 --- a/src/core/ext/filters/client_channel/client_channel.h +++ b/src/core/ext/filters/client_channel/client_channel.h @@ -23,6 +23,8 @@ #include "src/core/ext/filters/client_channel/resolver.h" #include "src/core/lib/channel/channel_stack.h" +extern grpc_tracer_flag grpc_client_channel_trace; + // Channel arg key for server URI string. #define GRPC_ARG_SERVER_URI "grpc.server_uri" diff --git a/src/core/ext/filters/client_channel/client_channel_plugin.c b/src/core/ext/filters/client_channel/client_channel_plugin.c index 60e77d6268..6f133a648b 100644 --- a/src/core/ext/filters/client_channel/client_channel_plugin.c +++ b/src/core/ext/filters/client_channel/client_channel_plugin.c @@ -78,6 +78,7 @@ void grpc_client_channel_init(void) { GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, append_filter, (void *)&grpc_client_channel_filter); grpc_http_connect_register_handshaker_factory(); + grpc_register_tracer("client_channel", &grpc_client_channel_trace); #ifndef NDEBUG grpc_register_tracer("resolver_refcount", &grpc_trace_resolver_refcount); #endif diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c index 8e9d6b0f47..56d340b8c2 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c @@ -195,11 +195,28 @@ static void rr_subchannel_list_unref(grpc_exec_ctx *exec_ctx, static void rr_subchannel_list_shutdown(grpc_exec_ctx *exec_ctx, rr_subchannel_list *subchannel_list, const char *reason) { + if (subchannel_list->shutting_down) { + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, "Subchannel list %p already shutting down", + (void *)subchannel_list); + } + return; + }; + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, "Shutting down subchannel_list %p", + (void *)subchannel_list); + } GPR_ASSERT(!subchannel_list->shutting_down); subchannel_list->shutting_down = true; for (size_t i = 0; i < subchannel_list->num_subchannels; i++) { subchannel_data *sd = &subchannel_list->subchannels[i]; if (sd->subchannel != NULL) { // if subchannel isn't shutdown, unsubscribe. + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, + "Unsubscribing from subchannel %p as part of shutting down " + "subchannel_list %p", + (void *)sd->subchannel, (void *)subchannel_list); + } grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL, &sd->connectivity_changed_closure); @@ -228,13 +245,14 @@ static size_t get_next_ready_subchannel_index_locked( const size_t index = (i + p->last_ready_subchannel_index + 1) % p->subchannel_list->num_subchannels; if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, - "[RR %p] checking subchannel %p, subchannel_list %p, index %lu: " - "state=%d", - (void *)p, - (void *)p->subchannel_list->subchannels[index].subchannel, - (void *)p->subchannel_list, (unsigned long)index, - p->subchannel_list->subchannels[index].curr_connectivity_state); + gpr_log( + GPR_DEBUG, + "[RR %p] checking subchannel %p, subchannel_list %p, index %lu: " + "state=%s", + (void *)p, (void *)p->subchannel_list->subchannels[index].subchannel, + (void *)p->subchannel_list, (unsigned long)index, + grpc_connectivity_state_name( + p->subchannel_list->subchannels[index].curr_connectivity_state)); } if (p->subchannel_list->subchannels[index].curr_connectivity_state == GRPC_CHANNEL_READY) { @@ -511,16 +529,27 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { subchannel_data *sd = arg; round_robin_lb_policy *p = sd->subchannel_list->policy; + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log( + GPR_DEBUG, + "[RR %p] connectivity changed for subchannel %p, subchannel_list %p: " + "prev_state=%s new_state=%s p->shutdown=%d " + "sd->subchannel_list->shutting_down=%d error=%s", + (void *)p, (void *)sd->subchannel, (void *)sd->subchannel_list, + grpc_connectivity_state_name(sd->prev_connectivity_state), + grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe), + p->shutdown, sd->subchannel_list->shutting_down, + grpc_error_string(error)); + } // If the policy is shutting down, unref and return. if (p->shutdown) { rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "pol_shutdown"); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pol_shutdown"); return; } - if (sd->subchannel_list->shutting_down) { + if (sd->subchannel_list->shutting_down && error == GRPC_ERROR_CANCELLED) { // the subchannel list associated with sd has been discarded. This callback // corresponds to the unsubscription. - GPR_ASSERT(error == GRPC_ERROR_CANCELLED); rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "sl_shutdown"); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_shutdown"); return; @@ -536,13 +565,6 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, // state (which was set by the connectivity state watcher) to // curr_connectivity_state, which is what we use inside of the combiner. sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe; - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, - "[RR %p] connectivity changed for subchannel %p: " - "prev_state=%d new_state=%d", - (void *)p, (void *)sd->subchannel, sd->prev_connectivity_state, - sd->curr_connectivity_state); - } // Update state counters and determine new overall state. update_state_counters_locked(sd); sd->prev_connectivity_state = sd->curr_connectivity_state; diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c index af3391a731..5ea75f0554 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c @@ -132,7 +132,7 @@ static void dns_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, static void dns_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - dns_resolver *r = arg; + dns_resolver *r = (dns_resolver *)arg; r->have_retry_timer = false; if (error == GRPC_ERROR_NONE) { @@ -146,7 +146,7 @@ static void dns_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, static void dns_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - dns_resolver *r = arg; + dns_resolver *r = (dns_resolver *)arg; grpc_channel_args *result = NULL; GPR_ASSERT(r->resolving); r->resolving = false; @@ -241,7 +241,7 @@ static grpc_resolver *dns_create(grpc_exec_ctx *exec_ctx, char *path = args->uri->path; if (path[0] == '/') ++path; // Create resolver. - dns_resolver *r = gpr_zalloc(sizeof(dns_resolver)); + dns_resolver *r = (dns_resolver *)gpr_zalloc(sizeof(dns_resolver)); grpc_resolver_init(&r->base, &dns_resolver_vtable, args->combiner); r->name_to_resolve = gpr_strdup(path); r->default_port = gpr_strdup(default_port); diff --git a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c index 7b4fe38272..7ceb8f40a1 100644 --- a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c +++ b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c @@ -177,7 +177,8 @@ static grpc_resolver *sockaddr_create(grpc_exec_ctx *exec_ctx, return NULL; } /* Instantiate resolver. */ - sockaddr_resolver *r = gpr_zalloc(sizeof(sockaddr_resolver)); + sockaddr_resolver *r = + (sockaddr_resolver *)gpr_zalloc(sizeof(sockaddr_resolver)); r->addresses = addresses; r->channel_args = grpc_channel_args_copy(args->args); grpc_resolver_init(&r->base, &sockaddr_resolver_vtable, args->combiner); diff --git a/src/core/ext/filters/deadline/deadline_filter.c b/src/core/ext/filters/deadline/deadline_filter.c index ced025e2e2..6789903c95 100644 --- a/src/core/ext/filters/deadline/deadline_filter.c +++ b/src/core/ext/filters/deadline/deadline_filter.c @@ -37,8 +37,8 @@ // Timer callback. static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - grpc_call_element* elem = arg; - grpc_deadline_state* deadline_state = elem->call_data; + grpc_call_element* elem = (grpc_call_element*)arg; + grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data; if (error != GRPC_ERROR_CANCELLED) { grpc_call_element_signal_error( exec_ctx, elem, @@ -57,7 +57,7 @@ static void start_timer_if_needed(grpc_exec_ctx* exec_ctx, if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) == 0) { return; } - grpc_deadline_state* deadline_state = elem->call_data; + grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data; grpc_deadline_timer_state cur_state; grpc_closure* closure = NULL; retry: @@ -112,7 +112,7 @@ static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx, // Callback run when the call is complete. static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - grpc_deadline_state* deadline_state = arg; + grpc_deadline_state* deadline_state = (grpc_deadline_state*)arg; cancel_timer_if_needed(exec_ctx, deadline_state); // Invoke the next callback. GRPC_CLOSURE_RUN(exec_ctx, deadline_state->next_on_complete, @@ -145,7 +145,7 @@ static void start_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg, void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_call_stack* call_stack, gpr_timespec deadline) { - grpc_deadline_state* deadline_state = elem->call_data; + grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data; deadline_state->call_stack = call_stack; // Deadline will always be infinite on servers, so the timer will only be // set on clients with a finite deadline. @@ -169,13 +169,13 @@ void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx, grpc_call_element* elem) { - grpc_deadline_state* deadline_state = elem->call_data; + grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data; cancel_timer_if_needed(exec_ctx, deadline_state); } void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, gpr_timespec new_deadline) { - grpc_deadline_state* deadline_state = elem->call_data; + grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data; cancel_timer_if_needed(exec_ctx, deadline_state); start_timer_if_needed(exec_ctx, elem, new_deadline); } @@ -183,7 +183,7 @@ void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, void grpc_deadline_state_client_start_transport_stream_op_batch( grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_transport_stream_op_batch* op) { - grpc_deadline_state* deadline_state = elem->call_data; + grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data; if (op->cancel_stream) { cancel_timer_if_needed(exec_ctx, deadline_state); } else { @@ -256,8 +256,8 @@ static void client_start_transport_stream_op_batch( // Callback for receiving initial metadata on the server. static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - grpc_call_element* elem = arg; - server_call_data* calld = elem->call_data; + grpc_call_element* elem = (grpc_call_element*)arg; + server_call_data* calld = (server_call_data*)elem->call_data; // Get deadline from metadata and start the timer if needed. start_timer_if_needed(exec_ctx, elem, calld->recv_initial_metadata->deadline); // Invoke the next callback. @@ -269,7 +269,7 @@ static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* arg, static void server_start_transport_stream_op_batch( grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_transport_stream_op_batch* op) { - server_call_data* calld = elem->call_data; + server_call_data* calld = (server_call_data*)elem->call_data; if (op->cancel_stream) { cancel_timer_if_needed(exec_ctx, &calld->base.deadline_state); } else { @@ -341,8 +341,8 @@ static bool maybe_add_deadline_filter(grpc_exec_ctx* exec_ctx, void* arg) { return grpc_deadline_checking_enabled( grpc_channel_stack_builder_get_channel_arguments(builder)) - ? grpc_channel_stack_builder_prepend_filter(builder, arg, NULL, - NULL) + ? grpc_channel_stack_builder_prepend_filter( + builder, (const grpc_channel_filter*)arg, NULL, NULL) : true; } diff --git a/src/core/ext/filters/max_age/max_age_filter.c b/src/core/ext/filters/max_age/max_age_filter.c index 35304f8150..7d748b9c32 100644 --- a/src/core/ext/filters/max_age/max_age_filter.c +++ b/src/core/ext/filters/max_age/max_age_filter.c @@ -108,7 +108,7 @@ static void decrease_call_count(grpc_exec_ctx* exec_ctx, channel_data* chand) { static void start_max_idle_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - channel_data* chand = arg; + channel_data* chand = (channel_data*)arg; /* Decrease call_count. If there are no active calls at this time, max_idle_timer will start here. If the number of active calls is not 0, max_idle_timer will start after all the active calls end. */ @@ -119,7 +119,7 @@ static void start_max_idle_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg, static void start_max_age_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - channel_data* chand = arg; + channel_data* chand = (channel_data*)arg; gpr_mu_lock(&chand->max_age_timer_mu); chand->max_age_timer_pending = true; GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_timer"); @@ -140,7 +140,7 @@ static void start_max_age_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg, static void start_max_age_grace_timer_after_goaway_op(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - channel_data* chand = arg; + channel_data* chand = (channel_data*)arg; gpr_mu_lock(&chand->max_age_timer_mu); chand->max_age_grace_timer_pending = true; GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_grace_timer"); @@ -156,7 +156,7 @@ static void start_max_age_grace_timer_after_goaway_op(grpc_exec_ctx* exec_ctx, static void close_max_idle_channel(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - channel_data* chand = arg; + channel_data* chand = (channel_data*)arg; if (error == GRPC_ERROR_NONE) { /* Prevent the max idle timer from being set again */ gpr_atm_no_barrier_fetch_add(&chand->call_count, 1); @@ -176,7 +176,7 @@ static void close_max_idle_channel(grpc_exec_ctx* exec_ctx, void* arg, static void close_max_age_channel(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - channel_data* chand = arg; + channel_data* chand = (channel_data*)arg; gpr_mu_lock(&chand->max_age_timer_mu); chand->max_age_timer_pending = false; gpr_mu_unlock(&chand->max_age_timer_mu); @@ -200,7 +200,7 @@ static void close_max_age_channel(grpc_exec_ctx* exec_ctx, void* arg, static void force_close_max_age_channel(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - channel_data* chand = arg; + channel_data* chand = (channel_data*)arg; gpr_mu_lock(&chand->max_age_timer_mu); chand->max_age_grace_timer_pending = false; gpr_mu_unlock(&chand->max_age_timer_mu); @@ -220,7 +220,7 @@ static void force_close_max_age_channel(grpc_exec_ctx* exec_ctx, void* arg, static void channel_connectivity_changed(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - channel_data* chand = arg; + channel_data* chand = (channel_data*)arg; if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) { grpc_transport_op* op = grpc_make_transport_op(NULL); op->on_connectivity_state_change = &chand->channel_connectivity_changed, @@ -264,7 +264,7 @@ static int add_random_max_connection_age_jitter(int value) { static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, const grpc_call_element_args* args) { - channel_data* chand = elem->channel_data; + channel_data* chand = (channel_data*)elem->channel_data; increase_call_count(exec_ctx, chand); return GRPC_ERROR_NONE; } @@ -281,7 +281,7 @@ static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, grpc_channel_element* elem, grpc_channel_element_args* args) { - channel_data* chand = elem->channel_data; + channel_data* chand = (channel_data*)elem->channel_data; gpr_mu_init(&chand->max_age_timer_mu); chand->max_age_timer_pending = false; chand->max_age_grace_timer_pending = false; diff --git a/src/core/ext/filters/message_size/message_size_filter.c b/src/core/ext/filters/message_size/message_size_filter.c index 9bb565ed6d..846c7df69a 100644 --- a/src/core/ext/filters/message_size/message_size_filter.c +++ b/src/core/ext/filters/message_size/message_size_filter.c @@ -60,7 +60,8 @@ static void* message_size_limits_create_from_json(const grpc_json* json) { if (max_response_message_bytes == -1) return NULL; } } - message_size_limits* value = gpr_malloc(sizeof(message_size_limits)); + message_size_limits* value = + (message_size_limits*)gpr_malloc(sizeof(message_size_limits)); value->max_send_size = max_request_message_bytes; value->max_recv_size = max_response_message_bytes; return value; @@ -88,8 +89,8 @@ typedef struct channel_data { // receive message size. static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data, grpc_error* error) { - grpc_call_element* elem = user_data; - call_data* calld = elem->call_data; + grpc_call_element* elem = (grpc_call_element*)user_data; + call_data* calld = (call_data*)elem->call_data; if (*calld->recv_message != NULL && calld->limits.max_recv_size >= 0 && (*calld->recv_message)->length > (size_t)calld->limits.max_recv_size) { char* message_string; @@ -117,7 +118,7 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data, static void start_transport_stream_op_batch( grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_transport_stream_op_batch* op) { - call_data* calld = elem->call_data; + call_data* calld = (call_data*)elem->call_data; // Check max send message size. if (op->send_message && calld->limits.max_send_size >= 0 && op->payload->send_message.send_message->length > @@ -149,8 +150,8 @@ static void start_transport_stream_op_batch( static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, const grpc_call_element_args* args) { - channel_data* chand = elem->channel_data; - call_data* calld = elem->call_data; + channel_data* chand = (channel_data*)elem->channel_data; + call_data* calld = (call_data*)elem->call_data; calld->next_recv_message_ready = NULL; GRPC_CLOSURE_INIT(&calld->recv_message_ready, recv_message_ready, elem, grpc_schedule_on_exec_ctx); @@ -160,8 +161,9 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, // size to the receive limit. calld->limits = chand->limits; if (chand->method_limit_table != NULL) { - message_size_limits* limits = grpc_method_config_table_get( - exec_ctx, chand->method_limit_table, args->path); + message_size_limits* limits = + (message_size_limits*)grpc_method_config_table_get( + exec_ctx, chand->method_limit_table, args->path); if (limits != NULL) { if (limits->max_send_size >= 0 && (limits->max_send_size < calld->limits.max_send_size || @@ -220,7 +222,7 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, grpc_channel_element* elem, grpc_channel_element_args* args) { GPR_ASSERT(!args->is_last); - channel_data* chand = elem->channel_data; + channel_data* chand = (channel_data*)elem->channel_data; chand->limits = get_message_size_limits(args->channel_args); // Get method config table from channel args. const grpc_arg* channel_arg = @@ -243,7 +245,7 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, // Destructor for channel_data. static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, grpc_channel_element* elem) { - channel_data* chand = elem->channel_data; + channel_data* chand = (channel_data*)elem->channel_data; grpc_slice_hash_table_unref(exec_ctx, chand->method_limit_table); } diff --git a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c index 8b3fff5fa3..b4d2cb4b8c 100644 --- a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c +++ b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c @@ -52,8 +52,8 @@ static bool get_user_agent_mdelem(const grpc_metadata_batch* batch, // Callback invoked when we receive an initial metadata. static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* user_data, grpc_error* error) { - grpc_call_element* elem = user_data; - call_data* calld = elem->call_data; + grpc_call_element* elem = (grpc_call_element*)user_data; + call_data* calld = (call_data*)elem->call_data; if (GRPC_ERROR_NONE == error) { grpc_mdelem md; @@ -75,7 +75,7 @@ static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, static void start_transport_stream_op_batch( grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_transport_stream_op_batch* op) { - call_data* calld = elem->call_data; + call_data* calld = (call_data*)elem->call_data; // Inject callback for receiving initial metadata if (op->recv_initial_metadata) { @@ -103,7 +103,7 @@ static void start_transport_stream_op_batch( static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, const grpc_call_element_args* args) { - call_data* calld = elem->call_data; + call_data* calld = (call_data*)elem->call_data; calld->next_recv_initial_metadata_ready = NULL; calld->workaround_active = false; GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, diff --git a/src/core/ext/filters/workarounds/workaround_utils.c b/src/core/ext/filters/workarounds/workaround_utils.c index bc76753a8a..e600fbee67 100644 --- a/src/core/ext/filters/workarounds/workaround_utils.c +++ b/src/core/ext/filters/workarounds/workaround_utils.c @@ -33,7 +33,8 @@ grpc_workaround_user_agent_md *grpc_parse_user_agent(grpc_mdelem md) { if (NULL != user_agent_md) { return user_agent_md; } - user_agent_md = gpr_malloc(sizeof(grpc_workaround_user_agent_md)); + user_agent_md = (grpc_workaround_user_agent_md *)gpr_malloc( + sizeof(grpc_workaround_user_agent_md)); for (int i = 0; i < GRPC_MAX_WORKAROUND_ID; i++) { if (ua_parser[i]) { user_agent_md->workaround_active[i] = ua_parser[i](md); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 0bdc0603a0..f790267944 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -34,6 +34,7 @@ #include "src/core/ext/transport/chttp2/transport/varint.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/http/parser.h" +#include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" @@ -280,8 +281,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_slice_buffer_init(&t->outbuf); grpc_chttp2_hpack_compressor_init(&t->hpack_compressor); - GRPC_CLOSURE_INIT(&t->write_action, write_action, t, - grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, grpc_combiner_scheduler(t->combiner)); GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, benign_reclaimer_locked, t, @@ -399,6 +398,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } t->keepalive_permit_without_calls = g_default_keepalive_permit_without_calls; + t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; + if (channel_args) { for (i = 0; i < channel_args->num_args; i++) { if (0 == strcmp(channel_args->args[i].key, @@ -487,6 +488,23 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->keepalive_permit_without_calls = (uint32_t)grpc_channel_arg_get_integer( &channel_args->args[i], (grpc_integer_options){0, 0, 1}); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_OPTIMIZATION_TARGET)) { + if (channel_args->args[i].type != GRPC_ARG_STRING) { + gpr_log(GPR_ERROR, "%s should be a string", + GRPC_ARG_OPTIMIZATION_TARGET); + } else if (0 == strcmp(channel_args->args[i].value.string, "blend")) { + t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; + } else if (0 == strcmp(channel_args->args[i].value.string, "latency")) { + t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; + } else if (0 == + strcmp(channel_args->args[i].value.string, "throughput")) { + t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT; + } else { + gpr_log(GPR_ERROR, "%s value '%s' unknown, assuming 'blend'", + GRPC_ARG_OPTIMIZATION_TARGET, + channel_args->args[i].value.string); + } } else { static const struct { const char *channel_arg_name; @@ -540,6 +558,11 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } } + GRPC_CLOSURE_INIT(&t->write_action, write_action, t, + t->opt_target == GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT + ? grpc_executor_scheduler + : grpc_schedule_on_exec_ctx); + t->ping_state.pings_before_data_required = t->ping_policy.max_pings_without_data; t->ping_state.is_delayed_ping_timer_set = false; @@ -2144,8 +2167,8 @@ static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, static void update_frame(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, double bw_dbl, double bdp_dbl) { - int32_t bdp = GPR_CLAMP((int32_t)bdp_dbl, 128, INT32_MAX); - int32_t target = GPR_MAX((int32_t)bw_dbl / 1000, bdp); + int32_t bdp = (int32_t)GPR_CLAMP(bdp_dbl, 128.0, INT32_MAX); + int32_t target = (int32_t)GPR_MAX(bw_dbl / 1000, bdp); // frame size is bounded [2^14,2^24-1] int32_t frame_size = GPR_CLAMP(target, 16384, 16777215); int64_t delta = (int64_t)frame_size - diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 9fa72ddbdf..a5b67e5aba 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -67,6 +67,11 @@ typedef enum { } grpc_chttp2_ping_type; typedef enum { + GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY, + GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT, +} grpc_chttp2_optimization_target; + +typedef enum { GRPC_CHTTP2_PCL_INITIATE = 0, GRPC_CHTTP2_PCL_NEXT, GRPC_CHTTP2_PCL_INFLIGHT, @@ -229,6 +234,8 @@ struct grpc_chttp2_transport { /** should we probe bdp? */ bool enable_bdp_probe; + grpc_chttp2_optimization_target opt_target; + /** various lists of streams */ grpc_chttp2_stream_list lists[STREAM_LIST_COUNT]; diff --git a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c index 2c91ad357c..9f82c480bc 100644 --- a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c +++ b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c @@ -57,9 +57,6 @@ #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1) -/* Uncomment the following to enable extra checks on poll_object operations */ -/* #define PO_DEBUG */ - /* The maximum number of polling threads per polling island. By default no limit */ static int g_max_pollers_per_pi = INT_MAX; @@ -92,7 +89,7 @@ typedef enum { } poll_obj_type; typedef struct poll_obj { -#ifdef PO_DEBUG +#ifndef NDEBUG poll_obj_type obj_type; #endif gpr_mu mu; @@ -893,7 +890,7 @@ static grpc_fd *fd_create(int fd, const char *name) { * would be holding a lock to it anyway. */ gpr_mu_lock(&new_fd->po.mu); new_fd->po.pi = NULL; -#ifdef PO_DEBUG +#ifndef NDEBUG new_fd->po.obj_type = POLL_OBJ_FD; #endif @@ -1171,7 +1168,7 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { gpr_mu_init(&pollset->po.mu); *mu = &pollset->po.mu; pollset->po.pi = NULL; -#ifdef PO_DEBUG +#ifndef NDEBUG pollset->po.obj_type = POLL_OBJ_POLLSET; #endif @@ -1625,7 +1622,7 @@ static void add_poll_object(grpc_exec_ctx *exec_ctx, poll_obj *bag, poll_obj_type item_type) { GPR_TIMER_BEGIN("add_poll_object", 0); -#ifdef PO_DEBUG +#ifndef NDEBUG GPR_ASSERT(item->obj_type == item_type); GPR_ASSERT(bag->obj_type == bag_type); #endif @@ -1784,7 +1781,7 @@ static grpc_pollset_set *pollset_set_create(void) { grpc_pollset_set *pss = gpr_malloc(sizeof(*pss)); gpr_mu_init(&pss->po.mu); pss->po.pi = NULL; -#ifdef PO_DEBUG +#ifndef NDEBUG pss->po.obj_type = POLL_OBJ_POLLSET_SET; #endif return pss; diff --git a/src/core/lib/iomgr/ev_epollsig_linux.c b/src/core/lib/iomgr/ev_epollsig_linux.c index 255e07010b..3c4ca9c7c5 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.c +++ b/src/core/lib/iomgr/ev_epollsig_linux.c @@ -54,9 +54,6 @@ gpr_log(GPR_INFO, __VA_ARGS__); \ } -/* Uncomment the following to enable extra checks on poll_object operations */ -/* #define PO_DEBUG */ - static int grpc_wakeup_signal = -1; static bool is_grpc_wakeup_signal_initialized = false; @@ -85,7 +82,7 @@ typedef enum { } poll_obj_type; typedef struct poll_obj { -#ifdef PO_DEBUG +#ifndef NDEBUG poll_obj_type obj_type; #endif gpr_mu mu; @@ -821,7 +818,7 @@ static grpc_fd *fd_create(int fd, const char *name) { * would be holding a lock to it anyway. */ gpr_mu_lock(&new_fd->po.mu); new_fd->po.pi = NULL; -#ifdef PO_DEBUG +#ifndef NDEBUG new_fd->po.obj_type = POLL_OBJ_FD; #endif @@ -1079,7 +1076,7 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { gpr_mu_init(&pollset->po.mu); *mu = &pollset->po.mu; pollset->po.pi = NULL; -#ifdef PO_DEBUG +#ifndef NDEBUG pollset->po.obj_type = POLL_OBJ_POLLSET; #endif @@ -1416,7 +1413,7 @@ static void add_poll_object(grpc_exec_ctx *exec_ctx, poll_obj *bag, poll_obj_type item_type) { GPR_TIMER_BEGIN("add_poll_object", 0); -#ifdef PO_DEBUG +#ifndef NDEBUG GPR_ASSERT(item->obj_type == item_type); GPR_ASSERT(bag->obj_type == bag_type); #endif @@ -1575,7 +1572,7 @@ static grpc_pollset_set *pollset_set_create(void) { grpc_pollset_set *pss = gpr_malloc(sizeof(*pss)); gpr_mu_init(&pss->po.mu); pss->po.pi = NULL; -#ifdef PO_DEBUG +#ifndef NDEBUG pss->po.obj_type = POLL_OBJ_POLLSET_SET; #endif return pss; diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c index 45de289e45..a98b8e62db 100644 --- a/src/core/lib/iomgr/resolve_address_uv.c +++ b/src/core/lib/iomgr/resolve_address_uv.c @@ -54,7 +54,7 @@ static int retry_named_port_failure(int status, request *r, int retry_status; uv_getaddrinfo_t *req = gpr_malloc(sizeof(uv_getaddrinfo_t)); req->data = r; - r->port = svc[i][1]; + r->port = gpr_strdup(svc[i][1]); retry_status = uv_getaddrinfo(uv_default_loop(), req, getaddrinfo_cb, r->host, r->port, r->hints); if (retry_status < 0 || getaddrinfo_cb == NULL) { @@ -127,6 +127,8 @@ static void getaddrinfo_callback(uv_getaddrinfo_t *req, int status, GRPC_CLOSURE_SCHED(&exec_ctx, r->on_done, error); grpc_exec_ctx_finish(&exec_ctx); gpr_free(r->hints); + gpr_free(r->host); + gpr_free(r->port); gpr_free(r); uv_freeaddrinfo(res); } diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c index ab6832932f..2f1d237c07 100644 --- a/src/core/lib/iomgr/tcp_client_uv.c +++ b/src/core/lib/iomgr/tcp_client_uv.c @@ -48,6 +48,7 @@ typedef struct grpc_uv_tcp_connect { static void uv_tcp_connect_cleanup(grpc_exec_ctx *exec_ctx, grpc_uv_tcp_connect *connect) { grpc_resource_quota_unref_internal(exec_ctx, connect->resource_quota); + gpr_free(connect->addr_name); gpr_free(connect); } @@ -105,6 +106,7 @@ static void uv_tc_on_connect(uv_connect_t *req, int status) { } done = (--connect->refs == 0); if (done) { + grpc_exec_ctx_flush(&exec_ctx); uv_tcp_connect_cleanup(&exec_ctx, connect); } GRPC_CLOSURE_SCHED(&exec_ctx, closure, error); @@ -140,6 +142,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, connect->resource_quota = resource_quota; uv_tcp_init(uv_default_loop(), connect->tcp_handle); connect->connect_req.data = connect; + connect->refs = 1; if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting", diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c index 2de0ea90e7..2ab836cc34 100644 --- a/src/core/lib/iomgr/tcp_server_uv.c +++ b/src/core/lib/iomgr/tcp_server_uv.c @@ -234,6 +234,7 @@ static void on_connect(uv_stream_t *server, int status) { sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL, acceptor); grpc_exec_ctx_finish(&exec_ctx); + gpr_free(peer_name_string); } } diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c index 7c21b44e76..ff5fd3edc8 100644 --- a/src/core/lib/iomgr/tcp_uv.c +++ b/src/core/lib/iomgr/tcp_uv.c @@ -65,7 +65,10 @@ typedef struct { } grpc_tcp; static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { + grpc_slice_unref_internal(exec_ctx, tcp->read_slice); grpc_resource_user_unref(exec_ctx, tcp->resource_user); + gpr_free(tcp->handle); + gpr_free(tcp->peer_string); gpr_free(tcp); } @@ -115,13 +118,17 @@ static void uv_close_callback(uv_handle_t *handle) { grpc_exec_ctx_finish(&exec_ctx); } +static grpc_slice alloc_read_slice(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user) { + return grpc_resource_user_slice_malloc(exec_ctx, resource_user, + GRPC_TCP_DEFAULT_READ_SLICE_SIZE); +} + static void alloc_uv_buf(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_tcp *tcp = handle->data; (void)suggested_size; - tcp->read_slice = grpc_resource_user_slice_malloc( - &exec_ctx, tcp->resource_user, GRPC_TCP_DEFAULT_READ_SLICE_SIZE); buf->base = (char *)GRPC_SLICE_START_PTR(tcp->read_slice); buf->len = GRPC_SLICE_LENGTH(tcp->read_slice); grpc_exec_ctx_finish(&exec_ctx); @@ -148,6 +155,7 @@ static void read_callback(uv_stream_t *stream, ssize_t nread, // Successful read sub = grpc_slice_sub_no_ref(tcp->read_slice, 0, (size_t)nread); grpc_slice_buffer_add(tcp->read_slices, sub); + tcp->read_slice = alloc_read_slice(&exec_ctx, tcp->resource_user); error = GRPC_ERROR_NONE; if (GRPC_TRACER_ON(grpc_tcp_trace)) { size_t i; @@ -334,6 +342,7 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, grpc_resource_quota *resource_quota, char *peer_string) { grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; if (GRPC_TRACER_ON(grpc_tcp_trace)) { gpr_log(GPR_DEBUG, "Creating TCP endpoint %p", tcp); @@ -350,6 +359,7 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, tcp->peer_string = gpr_strdup(peer_string); tcp->shutting_down = false; tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string); + tcp->read_slice = alloc_read_slice(&exec_ctx, tcp->resource_user); /* Tell network status tracking code about the new endpoint */ grpc_network_status_register_endpoint(&tcp->base); @@ -357,6 +367,7 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, uv_unref((uv_handle_t *)handle); #endif + grpc_exec_ctx_finish(&exec_ctx); return &tcp->base; } diff --git a/src/core/lib/iomgr/timer_manager.c b/src/core/lib/iomgr/timer_manager.c index 520d4a3252..cb7998db97 100644 --- a/src/core/lib/iomgr/timer_manager.c +++ b/src/core/lib/iomgr/timer_manager.c @@ -50,6 +50,9 @@ static completed_thread *g_completed_threads; static bool g_kicked; // is there a thread waiting until the next timer should fire? static bool g_has_timed_waiter; +// the deadline of the current timed waiter thread (only relevant if +// g_has_timed_waiter is true) +static gpr_timespec g_timed_waiter_deadline; // generation counter to track which thread is waiting for the next timer static uint64_t g_timed_waiter_generation; @@ -101,8 +104,7 @@ static void run_some_timers(grpc_exec_ctx *exec_ctx) { start_timer_thread_and_unlock(); } else { // if there's no thread waiting with a timeout, kick an existing - // waiter - // so that the next deadline is not missed + // waiter so that the next deadline is not missed if (!g_has_timed_waiter) { if (GRPC_TRACER_ON(grpc_timer_check_trace)) { gpr_log(GPR_DEBUG, "kick untimed waiter"); @@ -132,44 +134,79 @@ static bool wait_until(gpr_timespec next) { gpr_mu_unlock(&g_mu); return false; } - // if there's no timed waiter, we should become one: that waiter waits - // only until the next timer should expire - // all other timers wait forever - uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1; - if (!g_has_timed_waiter && gpr_time_cmp(next, inf_future) != 0) { - g_has_timed_waiter = true; - // we use a generation counter to track the timed waiter so we can - // cancel an existing one quickly (and when it actually times out it'll - // figure stuff out instead of incurring a wakeup) - my_timed_waiter_generation = ++g_timed_waiter_generation; - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { - gpr_timespec wait_time = gpr_time_sub(next, gpr_now(GPR_CLOCK_MONOTONIC)); - gpr_log(GPR_DEBUG, "sleep for a %" PRId64 ".%09d seconds", - wait_time.tv_sec, wait_time.tv_nsec); + + // If g_kicked is true at this point, it means there was a kick from the timer + // system that the timer-manager threads here missed. We cannot trust 'next' + // here any longer (since there might be an earlier deadline). So if g_kicked + // is true at this point, we should quickly exit this and get the next + // deadline from the timer system + + if (!g_kicked) { + // if there's no timed waiter, we should become one: that waiter waits + // only until the next timer should expire. All other timers wait forever + // + // 'g_timed_waiter_generation' is a global generation counter. The idea here + // is that the thread becoming a timed-waiter increments and stores this + // global counter locally in 'my_timed_waiter_generation' before going to + // sleep. After waking up, if my_timed_waiter_generation == + // g_timed_waiter_generation, it can be sure that it was the timed_waiter + // thread (and that no other thread took over while this was asleep) + // + // Initialize my_timed_waiter_generation to some value that is NOT equal to + // g_timed_waiter_generation + uint64_t my_timed_waiter_generation = g_timed_waiter_generation - 1; + + /* If there's no timed waiter, we should become one: that waiter waits only + until the next timer should expire. All other timer threads wait forever + unless their 'next' is earlier than the current timed-waiter's deadline + (in which case the thread with earlier 'next' takes over as the new timed + waiter) */ + if (gpr_time_cmp(next, inf_future) != 0) { + if (!g_has_timed_waiter || + (gpr_time_cmp(next, g_timed_waiter_deadline) < 0)) { + my_timed_waiter_generation = ++g_timed_waiter_generation; + g_has_timed_waiter = true; + g_timed_waiter_deadline = next; + + if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + gpr_timespec wait_time = + gpr_time_sub(next, gpr_now(GPR_CLOCK_MONOTONIC)); + gpr_log(GPR_DEBUG, "sleep for a %" PRId64 ".%09d seconds", + wait_time.tv_sec, wait_time.tv_nsec); + } + } else { // g_timed_waiter == true && next >= g_timed_waiter_deadline + next = inf_future; + } } - } else { - next = inf_future; - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + + if (GRPC_TRACER_ON(grpc_timer_check_trace) && + gpr_time_cmp(next, inf_future) == 0) { gpr_log(GPR_DEBUG, "sleep until kicked"); } + + gpr_cv_wait(&g_cv_wait, &g_mu, next); + + if (GRPC_TRACER_ON(grpc_timer_check_trace)) { + gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d", + my_timed_waiter_generation == g_timed_waiter_generation, + g_kicked); + } + // if this was the timed waiter, then we need to check timers, and flag + // that there's now no timed waiter... we'll look for a replacement if + // there's work to do after checking timers (code above) + if (my_timed_waiter_generation == g_timed_waiter_generation) { + g_has_timed_waiter = false; + g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + } } - gpr_cv_wait(&g_cv_wait, &g_mu, next); - if (GRPC_TRACER_ON(grpc_timer_check_trace)) { - gpr_log(GPR_DEBUG, "wait ended: was_timed:%d kicked:%d", - my_timed_waiter_generation == g_timed_waiter_generation, g_kicked); - } - // if this was the timed waiter, then we need to check timers, and flag - // that there's now no timed waiter... we'll look for a replacement if - // there's work to do after checking timers (code above) - if (my_timed_waiter_generation == g_timed_waiter_generation) { - g_has_timed_waiter = false; - } + // if this was a kick from the timer system, consume it (and don't stop // this thread yet) if (g_kicked) { grpc_timer_consume_kick(); g_kicked = false; } + gpr_mu_unlock(&g_mu); return true; } @@ -257,6 +294,9 @@ void grpc_timer_manager_init(void) { g_waiter_count = 0; g_completed_threads = NULL; + g_has_timed_waiter = false; + g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + start_threads(); } @@ -302,6 +342,7 @@ void grpc_kick_poller(void) { gpr_mu_lock(&g_mu); g_kicked = true; g_has_timed_waiter = false; + g_timed_waiter_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); ++g_timed_waiter_generation; gpr_cv_signal(&g_cv_wait); gpr_mu_unlock(&g_mu); diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.c b/src/core/lib/security/credentials/jwt/jwt_verifier.c index 8c747085bb..6cd558d123 100644 --- a/src/core/lib/security/credentials/jwt/jwt_verifier.c +++ b/src/core/lib/security/credentials/jwt/jwt_verifier.c @@ -462,6 +462,35 @@ static BIGNUM *bignum_from_base64(grpc_exec_ctx *exec_ctx, const char *b64) { return result; } +#if OPENSSL_VERSION_NUMBER < 0x10100000L + +// Provide compatibility across OpenSSL 1.02 and 1.1. +static int RSA_set0_key(RSA *r, BIGNUM *n, BIGNUM *e, BIGNUM *d) { + /* If the fields n and e in r are NULL, the corresponding input + * parameters MUST be non-NULL for n and e. d may be + * left NULL (in case only the public key is used). + */ + if ((r->n == NULL && n == NULL) || (r->e == NULL && e == NULL)) { + return 0; + } + + if (n != NULL) { + BN_free(r->n); + r->n = n; + } + if (e != NULL) { + BN_free(r->e); + r->e = e; + } + if (d != NULL) { + BN_free(r->d); + r->d = d; + } + + return 1; +} +#endif // OPENSSL_VERSION_NUMBER < 0x10100000L + static EVP_PKEY *pkey_from_jwk(grpc_exec_ctx *exec_ctx, const grpc_json *json, const char *kty) { const grpc_json *key_prop; @@ -478,21 +507,27 @@ static EVP_PKEY *pkey_from_jwk(grpc_exec_ctx *exec_ctx, const grpc_json *json, gpr_log(GPR_ERROR, "Could not create rsa key."); goto end; } + BIGNUM *tmp_n = NULL; + BIGNUM *tmp_e = NULL; for (key_prop = json->child; key_prop != NULL; key_prop = key_prop->next) { if (strcmp(key_prop->key, "n") == 0) { - rsa->n = + tmp_n = bignum_from_base64(exec_ctx, validate_string_field(key_prop, "n")); - if (rsa->n == NULL) goto end; + if (tmp_n == NULL) goto end; } else if (strcmp(key_prop->key, "e") == 0) { - rsa->e = + tmp_e = bignum_from_base64(exec_ctx, validate_string_field(key_prop, "e")); - if (rsa->e == NULL) goto end; + if (tmp_e == NULL) goto end; } } - if (rsa->e == NULL || rsa->n == NULL) { + if (tmp_e == NULL || tmp_n == NULL) { gpr_log(GPR_ERROR, "Missing RSA public key field."); goto end; } + if (!RSA_set0_key(rsa, tmp_n, tmp_e, NULL)) { + gpr_log(GPR_ERROR, "Cannot set RSA key from inputs."); + goto end; + } result = EVP_PKEY_new(); EVP_PKEY_set1_RSA(result, rsa); /* uprefs rsa. */ diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c index 58112b04b4..50a51b31cd 100644 --- a/src/core/lib/security/transport/client_auth_filter.c +++ b/src/core/lib/security/transport/client_auth_filter.c @@ -49,7 +49,6 @@ typedef struct { pollset_set so that work can progress when this call wants work to progress */ grpc_polling_entity *pollent; - grpc_transport_stream_op_batch op; gpr_atm security_context_set; gpr_mu security_context_mu; grpc_linked_mdelem md_links[MAX_CREDENTIALS_METADATA_COUNT]; @@ -92,11 +91,10 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *user_data, size_t num_md, grpc_credentials_status status, const char *error_details) { - grpc_call_element *elem = (grpc_call_element *)user_data; + grpc_transport_stream_op_batch *batch = + (grpc_transport_stream_op_batch *)user_data; + grpc_call_element *elem = batch->handler_private.extra_arg; call_data *calld = elem->call_data; - grpc_transport_stream_op_batch *op = &calld->op; - grpc_metadata_batch *mdb; - size_t i; reset_auth_metadata_context(&calld->auth_md_context); grpc_error *error = GRPC_ERROR_NONE; if (status != GRPC_CREDENTIALS_OK) { @@ -108,9 +106,10 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *user_data, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED); } else { GPR_ASSERT(num_md <= MAX_CREDENTIALS_METADATA_COUNT); - GPR_ASSERT(op->send_initial_metadata); - mdb = op->payload->send_initial_metadata.send_initial_metadata; - for (i = 0; i < num_md; i++) { + GPR_ASSERT(batch->send_initial_metadata); + grpc_metadata_batch *mdb = + batch->payload->send_initial_metadata.send_initial_metadata; + for (size_t i = 0; i < num_md; i++) { add_error(&error, grpc_metadata_batch_add_tail( exec_ctx, mdb, &calld->md_links[i], @@ -120,9 +119,9 @@ static void on_credentials_metadata(grpc_exec_ctx *exec_ctx, void *user_data, } } if (error == GRPC_ERROR_NONE) { - grpc_call_next_op(exec_ctx, elem, op); + grpc_call_next_op(exec_ctx, elem, batch); } else { - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, error); } } @@ -158,11 +157,11 @@ void build_auth_metadata_context(grpc_security_connector *sc, static void send_security_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { + grpc_transport_stream_op_batch *batch) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_client_security_context *ctx = - (grpc_client_security_context *)op->payload + (grpc_client_security_context *)batch->payload ->context[GRPC_CONTEXT_SECURITY] .value; grpc_call_credentials *channel_call_creds = @@ -171,7 +170,7 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx, if (channel_call_creds == NULL && !call_creds_has_md) { /* Skip sending metadata altogether. */ - grpc_call_next_op(exec_ctx, elem, op); + grpc_call_next_op(exec_ctx, elem, batch); return; } @@ -180,7 +179,7 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx, ctx->creds, NULL); if (calld->creds == NULL) { grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, op, + exec_ctx, batch, grpc_error_set_int( GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Incompatible credentials set on channel and call."), @@ -194,28 +193,29 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx, build_auth_metadata_context(&chand->security_connector->base, chand->auth_context, calld); - calld->op = *op; /* Copy op (originates from the caller's stack). */ GPR_ASSERT(calld->pollent != NULL); grpc_call_credentials_get_request_metadata( exec_ctx, calld->creds, calld->pollent, calld->auth_md_context, - on_credentials_metadata, elem); + on_credentials_metadata, batch); } static void on_host_checked(grpc_exec_ctx *exec_ctx, void *user_data, grpc_security_status status) { - grpc_call_element *elem = (grpc_call_element *)user_data; + grpc_transport_stream_op_batch *batch = + (grpc_transport_stream_op_batch *)user_data; + grpc_call_element *elem = batch->handler_private.extra_arg; call_data *calld = elem->call_data; if (status == GRPC_SECURITY_OK) { - send_security_metadata(exec_ctx, elem, &calld->op); + send_security_metadata(exec_ctx, elem, batch); } else { char *error_msg; char *host = grpc_slice_to_c_string(calld->host); gpr_asprintf(&error_msg, "Invalid host %s set in :authority metadata.", host); gpr_free(host); - grpc_call_element_signal_error( - exec_ctx, elem, + grpc_transport_stream_op_batch_finish_with_failure( + exec_ctx, batch, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_msg), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAUTHENTICATED)); @@ -223,35 +223,29 @@ static void on_host_checked(grpc_exec_ctx *exec_ctx, void *user_data, } } -/* Called either: - - in response to an API call (or similar) from above, to send something - - a network event (or similar) from below, to receive something - op contains type and call direction information, in addition to the data - that is being sent or received. */ -static void auth_start_transport_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { - GPR_TIMER_BEGIN("auth_start_transport_op", 0); +static void auth_start_transport_stream_op_batch( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op_batch *batch) { + GPR_TIMER_BEGIN("auth_start_transport_stream_op_batch", 0); /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - grpc_linked_mdelem *l; - grpc_client_security_context *sec_ctx = NULL; - if (!op->cancel_stream) { + if (!batch->cancel_stream) { /* double checked lock over security context to ensure it's set once */ if (gpr_atm_acq_load(&calld->security_context_set) == 0) { gpr_mu_lock(&calld->security_context_mu); if (gpr_atm_acq_load(&calld->security_context_set) == 0) { - GPR_ASSERT(op->payload->context != NULL); - if (op->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) { - op->payload->context[GRPC_CONTEXT_SECURITY].value = + GPR_ASSERT(batch->payload->context != NULL); + if (batch->payload->context[GRPC_CONTEXT_SECURITY].value == NULL) { + batch->payload->context[GRPC_CONTEXT_SECURITY].value = grpc_client_security_context_create(); - op->payload->context[GRPC_CONTEXT_SECURITY].destroy = + batch->payload->context[GRPC_CONTEXT_SECURITY].destroy = grpc_client_security_context_destroy; } - sec_ctx = op->payload->context[GRPC_CONTEXT_SECURITY].value; + grpc_client_security_context *sec_ctx = + batch->payload->context[GRPC_CONTEXT_SECURITY].value; GRPC_AUTH_CONTEXT_UNREF(sec_ctx->auth_context, "client auth filter"); sec_ctx->auth_context = GRPC_AUTH_CONTEXT_REF(chand->auth_context, "client_auth_filter"); @@ -261,9 +255,9 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx, } } - if (op->send_initial_metadata) { - for (l = op->payload->send_initial_metadata.send_initial_metadata->list - .head; + if (batch->send_initial_metadata) { + for (grpc_linked_mdelem *l = batch->payload->send_initial_metadata + .send_initial_metadata->list.head; l != NULL; l = l->next) { grpc_mdelem md = l->md; /* Pointer comparison is OK for md_elems created from the same context. @@ -284,19 +278,19 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx, } if (calld->have_host) { char *call_host = grpc_slice_to_c_string(calld->host); - calld->op = *op; /* Copy op (originates from the caller's stack). */ + batch->handler_private.extra_arg = elem; grpc_channel_security_connector_check_call_host( exec_ctx, chand->security_connector, call_host, chand->auth_context, - on_host_checked, elem); + on_host_checked, batch); gpr_free(call_host); - GPR_TIMER_END("auth_start_transport_op", 0); + GPR_TIMER_END("auth_start_transport_stream_op_batch", 0); return; /* early exit */ } } /* pass control down the stack */ - grpc_call_next_op(exec_ctx, elem, op); - GPR_TIMER_END("auth_start_transport_op", 0); + grpc_call_next_op(exec_ctx, elem, batch); + GPR_TIMER_END("auth_start_transport_stream_op_batch", 0); } /* Constructor for call_data */ @@ -379,7 +373,15 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, } const grpc_channel_filter grpc_client_auth_filter = { - auth_start_transport_op, grpc_channel_next_op, sizeof(call_data), - init_call_elem, set_pollset_or_pollset_set, destroy_call_elem, - sizeof(channel_data), init_channel_elem, destroy_channel_elem, - grpc_call_next_get_peer, grpc_channel_next_get_info, "client-auth"}; + auth_start_transport_stream_op_batch, + grpc_channel_next_op, + sizeof(call_data), + init_call_elem, + set_pollset_or_pollset_set, + destroy_call_elem, + sizeof(channel_data), + init_channel_elem, + destroy_channel_elem, + grpc_call_next_get_peer, + grpc_channel_next_get_info, + "client-auth"}; diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c index 4e6914be7b..9bf3f0ca0f 100644 --- a/src/core/lib/security/transport/server_auth_filter.c +++ b/src/core/lib/security/transport/server_auth_filter.c @@ -27,14 +27,9 @@ #include "src/core/lib/slice/slice_internal.h" typedef struct call_data { - grpc_metadata_batch *recv_initial_metadata; - /* Closure to call when finished with the auth_on_recv hook. */ - grpc_closure *on_done_recv; - /* Receive closures are chained: we inject this closure as the on_done_recv - up-call on transport_op, and remember to call our on_done_recv member after - handling it. */ - grpc_closure auth_on_recv; - grpc_transport_stream_op_batch *transport_op; + grpc_transport_stream_op_batch *recv_initial_metadata_batch; + grpc_closure *original_recv_initial_metadata_ready; + grpc_closure recv_initial_metadata_ready; grpc_metadata_array md; const grpc_metadata *consumed_md; size_t num_consumed_md; @@ -90,125 +85,96 @@ static void on_md_processing_done( grpc_status_code status, const char *error_details) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; + grpc_transport_stream_op_batch *batch = calld->recv_initial_metadata_batch; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - /* TODO(jboeuf): Implement support for response_md. */ if (response_md != NULL && num_response_md > 0) { gpr_log(GPR_INFO, "response_md in auth metadata processing not supported for now. " "Ignoring..."); } - + grpc_error *error = GRPC_ERROR_NONE; if (status == GRPC_STATUS_OK) { calld->consumed_md = consumed_md; calld->num_consumed_md = num_consumed_md; - /* TODO(ctiller): propagate error */ - GRPC_LOG_IF_ERROR( - "grpc_metadata_batch_filter", - grpc_metadata_batch_filter(&exec_ctx, calld->recv_initial_metadata, - remove_consumed_md, elem, - "Response metadata filtering error")); - for (size_t i = 0; i < calld->md.count; i++) { - grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].key); - grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].value); - } - grpc_metadata_array_destroy(&calld->md); - GRPC_CLOSURE_SCHED(&exec_ctx, calld->on_done_recv, GRPC_ERROR_NONE); + error = grpc_metadata_batch_filter( + &exec_ctx, batch->payload->recv_initial_metadata.recv_initial_metadata, + remove_consumed_md, elem, "Response metadata filtering error"); } else { - for (size_t i = 0; i < calld->md.count; i++) { - grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].key); - grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].value); - } - grpc_metadata_array_destroy(&calld->md); - error_details = error_details != NULL - ? error_details - : "Authentication metadata processing failed."; - if (calld->transport_op->send_message) { - grpc_byte_stream_destroy( - &exec_ctx, calld->transport_op->payload->send_message.send_message); - calld->transport_op->payload->send_message.send_message = NULL; + if (error_details == NULL) { + error_details = "Authentication metadata processing failed."; } - GRPC_CLOSURE_SCHED( - &exec_ctx, calld->on_done_recv, + error = grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_details), - GRPC_ERROR_INT_GRPC_STATUS, status)); + GRPC_ERROR_INT_GRPC_STATUS, status); } - + for (size_t i = 0; i < calld->md.count; i++) { + grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].key); + grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].value); + } + grpc_metadata_array_destroy(&calld->md); + GRPC_CLOSURE_SCHED(&exec_ctx, calld->original_recv_initial_metadata_ready, + error); grpc_exec_ctx_finish(&exec_ctx); } -static void auth_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, - grpc_error *error) { - grpc_call_element *elem = user_data; - call_data *calld = elem->call_data; +static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_call_element *elem = arg; channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + grpc_transport_stream_op_batch *batch = calld->recv_initial_metadata_batch; if (error == GRPC_ERROR_NONE) { if (chand->creds != NULL && chand->creds->processor.process != NULL) { - calld->md = metadata_batch_to_md_array(calld->recv_initial_metadata); + calld->md = metadata_batch_to_md_array( + batch->payload->recv_initial_metadata.recv_initial_metadata); chand->creds->processor.process( chand->creds->processor.state, calld->auth_context, calld->md.metadata, calld->md.count, on_md_processing_done, elem); return; } } - GRPC_CLOSURE_SCHED(exec_ctx, calld->on_done_recv, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_RUN(exec_ctx, calld->original_recv_initial_metadata_ready, + GRPC_ERROR_REF(error)); } -static void set_recv_ops_md_callbacks(grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { +static void auth_start_transport_stream_op_batch( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op_batch *batch) { call_data *calld = elem->call_data; - - if (op->recv_initial_metadata) { - /* substitute our callback for the higher callback */ - calld->recv_initial_metadata = - op->payload->recv_initial_metadata.recv_initial_metadata; - calld->on_done_recv = - op->payload->recv_initial_metadata.recv_initial_metadata_ready; - op->payload->recv_initial_metadata.recv_initial_metadata_ready = - &calld->auth_on_recv; - calld->transport_op = op; + if (batch->recv_initial_metadata) { + // Inject our callback. + calld->recv_initial_metadata_batch = batch; + calld->original_recv_initial_metadata_ready = + batch->payload->recv_initial_metadata.recv_initial_metadata_ready; + batch->payload->recv_initial_metadata.recv_initial_metadata_ready = + &calld->recv_initial_metadata_ready; } -} - -/* Called either: - - in response to an API call (or similar) from above, to send something - - a network event (or similar) from below, to receive something - op contains type and call direction information, in addition to the data - that is being sent or received. */ -static void auth_start_transport_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { - set_recv_ops_md_callbacks(elem, op); - grpc_call_next_op(exec_ctx, elem, op); + grpc_call_next_op(exec_ctx, elem, batch); } /* Constructor for call_data */ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_element_args *args) { - /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - grpc_server_security_context *server_ctx = NULL; - - /* initialize members */ - memset(calld, 0, sizeof(*calld)); - GRPC_CLOSURE_INIT(&calld->auth_on_recv, auth_on_recv, elem, + GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, + recv_initial_metadata_ready, elem, grpc_schedule_on_exec_ctx); - + // Create server security context. Set its auth context from channel + // data and save it in the call context. + grpc_server_security_context *server_ctx = + grpc_server_security_context_create(); + server_ctx->auth_context = grpc_auth_context_create(chand->auth_context); + calld->auth_context = server_ctx->auth_context; if (args->context[GRPC_CONTEXT_SECURITY].value != NULL) { args->context[GRPC_CONTEXT_SECURITY].destroy( args->context[GRPC_CONTEXT_SECURITY].value); } - - server_ctx = grpc_server_security_context_create(); - server_ctx->auth_context = grpc_auth_context_create(chand->auth_context); - calld->auth_context = server_ctx->auth_context; - args->context[GRPC_CONTEXT_SECURITY].value = server_ctx; args->context[GRPC_CONTEXT_SECURITY].destroy = grpc_server_security_context_destroy; - return GRPC_ERROR_NONE; } @@ -221,19 +187,15 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_channel_element_args *args) { + GPR_ASSERT(!args->is_last); + channel_data *chand = elem->channel_data; grpc_auth_context *auth_context = grpc_find_auth_context_in_args(args->channel_args); - grpc_server_credentials *creds = - grpc_find_server_credentials_in_args(args->channel_args); - /* grab pointers to our data from the channel element */ - channel_data *chand = elem->channel_data; - - GPR_ASSERT(!args->is_last); GPR_ASSERT(auth_context != NULL); - - /* initialize members */ chand->auth_context = GRPC_AUTH_CONTEXT_REF(auth_context, "server_auth_filter"); + grpc_server_credentials *creds = + grpc_find_server_credentials_in_args(args->channel_args); chand->creds = grpc_server_credentials_ref(creds); return GRPC_ERROR_NONE; } @@ -241,14 +203,13 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, /* Destructor for channel data */ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) { - /* grab pointers to our data from the channel element */ channel_data *chand = elem->channel_data; GRPC_AUTH_CONTEXT_UNREF(chand->auth_context, "server_auth_filter"); grpc_server_credentials_unref(exec_ctx, chand->creds); } const grpc_channel_filter grpc_server_auth_filter = { - auth_start_transport_op, + auth_start_transport_stream_op_batch, grpc_channel_next_op, sizeof(call_data), init_call_elem, diff --git a/src/core/lib/support/arena.c b/src/core/lib/support/arena.c index b433c61b4c..9e0f73ae3d 100644 --- a/src/core/lib/support/arena.c +++ b/src/core/lib/support/arena.c @@ -38,7 +38,7 @@ struct gpr_arena { gpr_arena *gpr_arena_create(size_t initial_size) { initial_size = ROUND_UP_TO_ALIGNMENT_SIZE(initial_size); - gpr_arena *a = gpr_zalloc(sizeof(gpr_arena) + initial_size); + gpr_arena *a = (gpr_arena *)gpr_zalloc(sizeof(gpr_arena) + initial_size); a->initial_zone.size_end = initial_size; return a; } @@ -64,7 +64,7 @@ void *gpr_arena_alloc(gpr_arena *arena, size_t size) { zone *next_z = (zone *)gpr_atm_acq_load(&z->next_atm); if (next_z == NULL) { size_t next_z_size = (size_t)gpr_atm_no_barrier_load(&arena->size_so_far); - next_z = gpr_zalloc(sizeof(zone) + next_z_size); + next_z = (zone *)gpr_zalloc(sizeof(zone) + next_z_size); next_z->size_begin = z->size_end; next_z->size_end = z->size_end + next_z_size; if (!gpr_atm_rel_cas(&z->next_atm, (gpr_atm)NULL, (gpr_atm)next_z)) { diff --git a/src/core/lib/support/atm.c b/src/core/lib/support/atm.c index caa0bafe33..2f37d62f76 100644 --- a/src/core/lib/support/atm.c +++ b/src/core/lib/support/atm.c @@ -21,12 +21,12 @@ gpr_atm gpr_atm_no_barrier_clamped_add(gpr_atm *value, gpr_atm delta, gpr_atm min, gpr_atm max) { - gpr_atm current; - gpr_atm new; + gpr_atm current_value; + gpr_atm new_value; do { - current = gpr_atm_no_barrier_load(value); - new = GPR_CLAMP(current + delta, min, max); - if (new == current) break; - } while (!gpr_atm_no_barrier_cas(value, current, new)); - return new; + current_value = gpr_atm_no_barrier_load(value); + new_value = GPR_CLAMP(current_value + delta, min, max); + if (new_value == current_value) break; + } while (!gpr_atm_no_barrier_cas(value, current_value, new_value)); + return new_value; } diff --git a/src/core/lib/support/avl.c b/src/core/lib/support/avl.c index aa0f665272..a6178fdbce 100644 --- a/src/core/lib/support/avl.c +++ b/src/core/lib/support/avl.c @@ -76,7 +76,7 @@ static gpr_avl_node *assert_invariants(gpr_avl_node *n) { return n; } gpr_avl_node *new_node(void *key, void *value, gpr_avl_node *left, gpr_avl_node *right) { - gpr_avl_node *node = gpr_malloc(sizeof(*node)); + gpr_avl_node *node = (gpr_avl_node *)gpr_malloc(sizeof(*node)); gpr_ref_init(&node->refs, 1); node->key = key; node->value = value; diff --git a/src/core/lib/support/mpscq.c b/src/core/lib/support/mpscq.c index 58c4c435d3..e9f893988d 100644 --- a/src/core/lib/support/mpscq.c +++ b/src/core/lib/support/mpscq.c @@ -31,12 +31,11 @@ void gpr_mpscq_destroy(gpr_mpscq *q) { GPR_ASSERT(q->tail == &q->stub); } -bool gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) { +void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n) { gpr_atm_no_barrier_store(&n->next, (gpr_atm)NULL); gpr_mpscq_node *prev = (gpr_mpscq_node *)gpr_atm_full_xchg(&q->head, (gpr_atm)n); gpr_atm_rel_store(&prev->next, (gpr_atm)n); - return prev == &q->stub; } gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q) { @@ -78,25 +77,3 @@ gpr_mpscq_node *gpr_mpscq_pop_and_check_end(gpr_mpscq *q, bool *empty) { *empty = false; return NULL; } - -void gpr_locked_mpscq_init(gpr_locked_mpscq *q) { - gpr_mpscq_init(&q->queue); - q->read_lock = GPR_SPINLOCK_INITIALIZER; -} - -void gpr_locked_mpscq_destroy(gpr_locked_mpscq *q) { - gpr_mpscq_destroy(&q->queue); -} - -bool gpr_locked_mpscq_push(gpr_locked_mpscq *q, gpr_mpscq_node *n) { - return gpr_mpscq_push(&q->queue, n); -} - -gpr_mpscq_node *gpr_locked_mpscq_pop(gpr_locked_mpscq *q) { - if (gpr_spinlock_trylock(&q->read_lock)) { - gpr_mpscq_node *n = gpr_mpscq_pop(&q->queue); - gpr_spinlock_unlock(&q->read_lock); - return n; - } - return NULL; -} diff --git a/src/core/lib/support/mpscq.h b/src/core/lib/support/mpscq.h index 2f4739d7f8..daa51768f7 100644 --- a/src/core/lib/support/mpscq.h +++ b/src/core/lib/support/mpscq.h @@ -22,7 +22,6 @@ #include <grpc/support/atm.h> #include <stdbool.h> #include <stddef.h> -#include "src/core/lib/support/spinlock.h" // Multiple-producer single-consumer lock free queue, based upon the // implementation from Dmitry Vyukov here: @@ -44,34 +43,11 @@ typedef struct gpr_mpscq { void gpr_mpscq_init(gpr_mpscq *q); void gpr_mpscq_destroy(gpr_mpscq *q); // Push a node -// Thread safe - can be called from multiple threads concurrently -// Returns true if this was possibly the first node (may return true -// sporadically, will not return false sporadically) -bool gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n); +void gpr_mpscq_push(gpr_mpscq *q, gpr_mpscq_node *n); // Pop a node (returns NULL if no node is ready - which doesn't indicate that // the queue is empty!!) -// Thread compatible - can only be called from one thread at a time gpr_mpscq_node *gpr_mpscq_pop(gpr_mpscq *q); // Pop a node; sets *empty to true if the queue is empty, or false if it is not gpr_mpscq_node *gpr_mpscq_pop_and_check_end(gpr_mpscq *q, bool *empty); -// An mpscq with a spinlock: it's safe to pop from multiple threads, but doing -// only one thread will succeed concurrently -typedef struct gpr_locked_mpscq { - gpr_mpscq queue; - gpr_spinlock read_lock; -} gpr_locked_mpscq; - -void gpr_locked_mpscq_init(gpr_locked_mpscq *q); -void gpr_locked_mpscq_destroy(gpr_locked_mpscq *q); -// Push a node -// Thread safe - can be called from multiple threads concurrently -// Returns true if this was possibly the first node (may return true -// sporadically, will not return false sporadically) -bool gpr_locked_mpscq_push(gpr_locked_mpscq *q, gpr_mpscq_node *n); -// Pop a node (returns NULL if no node is ready - which doesn't indicate that -// the queue is empty!!) -// Thread safe - can be called from multiple threads concurrently -gpr_mpscq_node *gpr_locked_mpscq_pop(gpr_locked_mpscq *q); - #endif /* GRPC_CORE_LIB_SUPPORT_MPSCQ_H */ diff --git a/src/core/lib/support/stack_lockfree.c b/src/core/lib/support/stack_lockfree.c new file mode 100644 index 0000000000..0fb64ed001 --- /dev/null +++ b/src/core/lib/support/stack_lockfree.c @@ -0,0 +1,137 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/core/lib/support/stack_lockfree.h" + +#include <stdlib.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/atm.h> +#include <grpc/support/log.h> +#include <grpc/support/port_platform.h> + +/* The lockfree node structure is a single architecture-level + word that allows for an atomic CAS to set it up. */ +struct lockfree_node_contents { + /* next thing to look at. Actual index for head, next index otherwise */ + uint16_t index; +#ifdef GPR_ARCH_64 + uint16_t pad; + uint32_t aba_ctr; +#else +#ifdef GPR_ARCH_32 + uint16_t aba_ctr; +#else +#error Unsupported bit width architecture +#endif +#endif +}; + +/* Use a union to make sure that these are in the same bits as an atm word */ +typedef union lockfree_node { + gpr_atm atm; + struct lockfree_node_contents contents; +} lockfree_node; + +/* make sure that entries aligned to 8-bytes */ +#define ENTRY_ALIGNMENT_BITS 3 +/* reserve this entry as invalid */ +#define INVALID_ENTRY_INDEX ((1 << 16) - 1) + +struct gpr_stack_lockfree { + lockfree_node *entries; + lockfree_node head; /* An atomic entry describing curr head */ +}; + +gpr_stack_lockfree *gpr_stack_lockfree_create(size_t entries) { + gpr_stack_lockfree *stack; + stack = (gpr_stack_lockfree *)gpr_malloc(sizeof(*stack)); + /* Since we only allocate 16 bits to represent an entry number, + * make sure that we are within the desired range */ + /* Reserve the highest entry number as a dummy */ + GPR_ASSERT(entries < INVALID_ENTRY_INDEX); + stack->entries = (lockfree_node *)gpr_malloc_aligned( + entries * sizeof(stack->entries[0]), ENTRY_ALIGNMENT_BITS); + /* Clear out all entries */ + memset(stack->entries, 0, entries * sizeof(stack->entries[0])); + memset(&stack->head, 0, sizeof(stack->head)); + + GPR_ASSERT(sizeof(stack->entries->atm) == sizeof(stack->entries->contents)); + + /* Point the head at reserved dummy entry */ + stack->head.contents.index = INVALID_ENTRY_INDEX; +/* Fill in the pad and aba_ctr to avoid confusing memcheck tools */ +#ifdef GPR_ARCH_64 + stack->head.contents.pad = 0; +#endif + stack->head.contents.aba_ctr = 0; + return stack; +} + +void gpr_stack_lockfree_destroy(gpr_stack_lockfree *stack) { + gpr_free_aligned(stack->entries); + gpr_free(stack); +} + +int gpr_stack_lockfree_push(gpr_stack_lockfree *stack, int entry) { + lockfree_node head; + lockfree_node newhead; + lockfree_node curent; + lockfree_node newent; + + /* First fill in the entry's index and aba ctr for new head */ + newhead.contents.index = (uint16_t)entry; +#ifdef GPR_ARCH_64 + /* Fill in the pad to avoid confusing memcheck tools */ + newhead.contents.pad = 0; +#endif + + /* Also post-increment the aba_ctr */ + curent.atm = gpr_atm_no_barrier_load(&stack->entries[entry].atm); + newhead.contents.aba_ctr = ++curent.contents.aba_ctr; + gpr_atm_no_barrier_store(&stack->entries[entry].atm, curent.atm); + + do { + /* Atomically get the existing head value for use */ + head.atm = gpr_atm_no_barrier_load(&(stack->head.atm)); + /* Point to it */ + newent.atm = gpr_atm_no_barrier_load(&stack->entries[entry].atm); + newent.contents.index = head.contents.index; + gpr_atm_no_barrier_store(&stack->entries[entry].atm, newent.atm); + } while (!gpr_atm_rel_cas(&(stack->head.atm), head.atm, newhead.atm)); + /* Use rel_cas above to make sure that entry index is set properly */ + return head.contents.index == INVALID_ENTRY_INDEX; +} + +int gpr_stack_lockfree_pop(gpr_stack_lockfree *stack) { + lockfree_node head; + lockfree_node newhead; + + do { + head.atm = gpr_atm_acq_load(&(stack->head.atm)); + if (head.contents.index == INVALID_ENTRY_INDEX) { + return -1; + } + newhead.atm = + gpr_atm_no_barrier_load(&(stack->entries[head.contents.index].atm)); + + } while (!gpr_atm_no_barrier_cas(&(stack->head.atm), head.atm, newhead.atm)); + + return head.contents.index; +} diff --git a/src/core/lib/support/stack_lockfree.h b/src/core/lib/support/stack_lockfree.h new file mode 100644 index 0000000000..6324211b72 --- /dev/null +++ b/src/core/lib/support/stack_lockfree.h @@ -0,0 +1,38 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_LIB_SUPPORT_STACK_LOCKFREE_H +#define GRPC_CORE_LIB_SUPPORT_STACK_LOCKFREE_H + +#include <stddef.h> + +typedef struct gpr_stack_lockfree gpr_stack_lockfree; + +/* This stack must specify the maximum number of entries to track. + The current implementation only allows up to 65534 entries */ +gpr_stack_lockfree *gpr_stack_lockfree_create(size_t entries); +void gpr_stack_lockfree_destroy(gpr_stack_lockfree *stack); + +/* Pass in a valid entry number for the next stack entry */ +/* Returns 1 if this is the first element on the stack, 0 otherwise */ +int gpr_stack_lockfree_push(gpr_stack_lockfree *, int entry); + +/* Returns -1 on empty or the actual entry number */ +int gpr_stack_lockfree_pop(gpr_stack_lockfree *stack); + +#endif /* GRPC_CORE_LIB_SUPPORT_STACK_LOCKFREE_H */ diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 84ddf74ab9..0cd436883a 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -32,8 +32,7 @@ #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/slice/slice_internal.h" -#include "src/core/lib/support/mpscq.h" -#include "src/core/lib/support/spinlock.h" +#include "src/core/lib/support/stack_lockfree.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/call.h" @@ -62,7 +61,6 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type; grpc_tracer_flag grpc_server_channel_trace = GRPC_TRACER_INITIALIZER(false); typedef struct requested_call { - gpr_mpscq_node request_link; /* must be first */ requested_call_type type; size_t cq_idx; void *tag; @@ -162,7 +160,7 @@ struct request_matcher { grpc_server *server; call_data *pending_head; call_data *pending_tail; - gpr_locked_mpscq *requests_per_cq; + gpr_stack_lockfree **requests_per_cq; }; struct registered_method { @@ -207,6 +205,11 @@ struct grpc_server { registered_method *registered_methods; /** one request matcher for unregistered methods */ request_matcher unregistered_request_matcher; + /** free list of available requested_calls_per_cq indices */ + gpr_stack_lockfree **request_freelist_per_cq; + /** requested call backing data */ + requested_call **requested_calls_per_cq; + int max_requested_calls_per_cq; gpr_atm shutdown_flag; uint8_t shutdown_published; @@ -306,20 +309,21 @@ static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx, * request_matcher */ -static void request_matcher_init(request_matcher *rm, grpc_server *server) { +static void request_matcher_init(request_matcher *rm, size_t entries, + grpc_server *server) { memset(rm, 0, sizeof(*rm)); rm->server = server; rm->requests_per_cq = gpr_malloc(sizeof(*rm->requests_per_cq) * server->cq_count); for (size_t i = 0; i < server->cq_count; i++) { - gpr_locked_mpscq_init(&rm->requests_per_cq[i]); + rm->requests_per_cq[i] = gpr_stack_lockfree_create(entries); } } static void request_matcher_destroy(request_matcher *rm) { for (size_t i = 0; i < rm->server->cq_count; i++) { - GPR_ASSERT(gpr_locked_mpscq_pop(&rm->requests_per_cq[i]) == NULL); - gpr_locked_mpscq_destroy(&rm->requests_per_cq[i]); + GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests_per_cq[i]) == -1); + gpr_stack_lockfree_destroy(rm->requests_per_cq[i]); } gpr_free(rm->requests_per_cq); } @@ -349,17 +353,13 @@ static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx, grpc_server *server, request_matcher *rm, grpc_error *error) { - requested_call *rc; + int request_id; for (size_t i = 0; i < server->cq_count; i++) { - /* Here we know: - 1. no requests are being added (since the server is shut down) - 2. no other threads are pulling (since the shut down process is single - threaded) - So, we can ignore the queue lock and just pop, with the guarantee that a - NULL returned here truly means that the queue is empty */ - while ((rc = (requested_call *)gpr_mpscq_pop( - &rm->requests_per_cq[i].queue)) != NULL) { - fail_call(exec_ctx, server, i, rc, GRPC_ERROR_REF(error)); + while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) != + -1) { + fail_call(exec_ctx, server, i, + &server->requested_calls_per_cq[i][request_id], + GRPC_ERROR_REF(error)); } } GRPC_ERROR_UNREF(error); @@ -394,7 +394,13 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) { } for (i = 0; i < server->cq_count; i++) { GRPC_CQ_INTERNAL_UNREF(exec_ctx, server->cqs[i], "server"); + if (server->started) { + gpr_stack_lockfree_destroy(server->request_freelist_per_cq[i]); + gpr_free(server->requested_calls_per_cq[i]); + } } + gpr_free(server->request_freelist_per_cq); + gpr_free(server->requested_calls_per_cq); gpr_free(server->cqs); gpr_free(server->pollsets); gpr_free(server->shutdown_tags); @@ -452,7 +458,21 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand, static void done_request_event(grpc_exec_ctx *exec_ctx, void *req, grpc_cq_completion *c) { - gpr_free(req); + requested_call *rc = req; + grpc_server *server = rc->server; + + if (rc >= server->requested_calls_per_cq[rc->cq_idx] && + rc < server->requested_calls_per_cq[rc->cq_idx] + + server->max_requested_calls_per_cq) { + GPR_ASSERT(rc - server->requested_calls_per_cq[rc->cq_idx] <= INT_MAX); + gpr_stack_lockfree_push( + server->request_freelist_per_cq[rc->cq_idx], + (int)(rc - server->requested_calls_per_cq[rc->cq_idx])); + } else { + gpr_free(req); + } + + server_unref(exec_ctx, server); } static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, @@ -482,6 +502,10 @@ static void publish_call(grpc_exec_ctx *exec_ctx, grpc_server *server, GPR_UNREACHABLE_CODE(return ); } + grpc_call_element *elem = + grpc_call_stack_element(grpc_call_get_call_stack(call), 0); + channel_data *chand = elem->channel_data; + server_ref(chand->server); grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, GRPC_ERROR_NONE, done_request_event, rc, &rc->completion); } @@ -509,15 +533,15 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, for (size_t i = 0; i < server->cq_count; i++) { size_t cq_idx = (chand->cq_idx + i) % server->cq_count; - requested_call *rc = - (requested_call *)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]); - if (rc == NULL) { + int request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]); + if (request_id == -1) { continue; } else { gpr_mu_lock(&calld->mu_state); calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); - publish_call(exec_ctx, server, calld, cq_idx, rc); + publish_call(exec_ctx, server, calld, cq_idx, + &server->requested_calls_per_cq[cq_idx][request_id]); return; /* early out */ } } @@ -992,6 +1016,8 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { server->root_channel_data.next = server->root_channel_data.prev = &server->root_channel_data; + /* TODO(ctiller): expose a channel_arg for this */ + server->max_requested_calls_per_cq = 32768; server->channel_args = grpc_channel_args_copy(args); return server; @@ -1064,15 +1090,29 @@ void grpc_server_start(grpc_server *server) { server->started = true; server->pollset_count = 0; server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count); + server->request_freelist_per_cq = + gpr_malloc(sizeof(*server->request_freelist_per_cq) * server->cq_count); + server->requested_calls_per_cq = + gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count); for (i = 0; i < server->cq_count; i++) { if (grpc_cq_can_listen(server->cqs[i])) { server->pollsets[server->pollset_count++] = grpc_cq_pollset(server->cqs[i]); } + server->request_freelist_per_cq[i] = + gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq); + for (int j = 0; j < server->max_requested_calls_per_cq; j++) { + gpr_stack_lockfree_push(server->request_freelist_per_cq[i], j); + } + server->requested_calls_per_cq[i] = + gpr_malloc((size_t)server->max_requested_calls_per_cq * + sizeof(*server->requested_calls_per_cq[i])); } - request_matcher_init(&server->unregistered_request_matcher, server); + request_matcher_init(&server->unregistered_request_matcher, + (size_t)server->max_requested_calls_per_cq, server); for (registered_method *rm = server->registered_methods; rm; rm = rm->next) { - request_matcher_init(&rm->request_matcher, server); + request_matcher_init(&rm->request_matcher, + (size_t)server->max_requested_calls_per_cq, server); } server_ref(server); @@ -1326,11 +1366,21 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, requested_call *rc) { call_data *calld = NULL; request_matcher *rm = NULL; + int request_id; if (gpr_atm_acq_load(&server->shutdown_flag)) { fail_call(exec_ctx, server, cq_idx, rc, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Server Shutdown")); return GRPC_CALL_OK; } + request_id = gpr_stack_lockfree_pop(server->request_freelist_per_cq[cq_idx]); + if (request_id == -1) { + /* out of request ids: just fail this one */ + fail_call(exec_ctx, server, cq_idx, rc, + grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Out of request ids"), + GRPC_ERROR_INT_LIMIT, server->max_requested_calls_per_cq)); + return GRPC_CALL_OK; + } switch (rc->type) { case BATCH_CALL: rm = &server->unregistered_request_matcher; @@ -1339,13 +1389,15 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, rm = &rc->data.registered.registered_method->request_matcher; break; } - if (gpr_locked_mpscq_push(&rm->requests_per_cq[cq_idx], &rc->request_link)) { + server->requested_calls_per_cq[cq_idx][request_id] = *rc; + gpr_free(rc); + if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) { /* this was the first queued request: we need to lock and start matching calls */ gpr_mu_lock(&server->mu_call); while ((calld = rm->pending_head) != NULL) { - rc = (requested_call *)gpr_locked_mpscq_pop(&rm->requests_per_cq[cq_idx]); - if (rc == NULL) break; + request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[cq_idx]); + if (request_id == -1) break; rm->pending_head = calld->pending_next; gpr_mu_unlock(&server->mu_call); gpr_mu_lock(&calld->mu_state); @@ -1361,7 +1413,8 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); - publish_call(exec_ctx, server, calld, cq_idx, rc); + publish_call(exec_ctx, server, calld, cq_idx, + &server->requested_calls_per_cq[cq_idx][request_id]); } gpr_mu_lock(&server->mu_call); } @@ -1468,6 +1521,7 @@ static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server, rc->initial_metadata->count = 0; GPR_ASSERT(error != GRPC_ERROR_NONE); + server_ref(server); grpc_cq_end_op(exec_ctx, server->cqs[cq_idx], rc->tag, error, done_request_event, rc, &rc->completion); } diff --git a/src/core/lib/transport/static_metadata.c b/src/core/lib/transport/static_metadata.c index 404c240589..2388f19f81 100644 --- a/src/core/lib/transport/static_metadata.c +++ b/src/core/lib/transport/static_metadata.c @@ -464,7 +464,8 @@ grpc_mdelem grpc_static_mdelem_for_static_strings(int a, int b) { if (a == -1 || b == -1) return GRPC_MDNULL; uint32_t k = (uint32_t)(a * 99 + b); uint32_t h = elems_phash(k); - return h < GPR_ARRAY_SIZE(elem_keys) && elem_keys[h] == k + return h < GPR_ARRAY_SIZE(elem_keys) && elem_keys[h] == k && + elem_idxs[h] != 255 ? GRPC_MAKE_MDELEM(&grpc_static_mdelem_table[elem_idxs[h]], GRPC_MDELEM_STORAGE_STATIC) : GRPC_MDNULL; diff --git a/src/core/tsi/ssl_transport_security.c b/src/core/tsi/ssl_transport_security.c index 37d4f038b7..1fd65928f9 100644 --- a/src/core/tsi/ssl_transport_security.c +++ b/src/core/tsi/ssl_transport_security.c @@ -411,15 +411,11 @@ static tsi_result do_ssl_read(SSL *ssl, unsigned char *unprotected_bytes, GPR_ASSERT(*unprotected_bytes_size <= INT_MAX); read_from_ssl = SSL_read(ssl, unprotected_bytes, (int)*unprotected_bytes_size); - if (read_from_ssl == 0) { - gpr_log(GPR_ERROR, "SSL_read returned 0 unexpectedly."); - return TSI_INTERNAL_ERROR; - } - if (read_from_ssl < 0) { + if (read_from_ssl <= 0) { read_from_ssl = SSL_get_error(ssl, read_from_ssl); switch (read_from_ssl) { - case SSL_ERROR_WANT_READ: - /* We need more data to finish the frame. */ + case SSL_ERROR_ZERO_RETURN: /* Received a close_notify alert. */ + case SSL_ERROR_WANT_READ: /* We need more data to finish the frame. */ *unprotected_bytes_size = 0; return TSI_OK; case SSL_ERROR_WANT_WRITE: |