diff options
author | 2017-07-26 17:02:31 -0700 | |
---|---|---|
committer | 2017-07-26 17:02:31 -0700 | |
commit | 06038954232cfe5facb69764ac73b204c97bfc08 (patch) | |
tree | fe5e8e8103d4cbe49cf212ebb4fa3fc66406f8b8 /src/core/ext | |
parent | df174ccd697d4ccdc537da14177f85cbdddaeb76 (diff) | |
parent | ddc0d374886f3db33db90c6c1be163214cc5147d (diff) |
Merge remote-tracking branch 'upstream/master' into stream_compression_config
Diffstat (limited to 'src/core/ext')
34 files changed, 2530 insertions, 743 deletions
diff --git a/src/core/ext/filters/client_channel/OWNERS b/src/core/ext/filters/client_channel/OWNERS new file mode 100644 index 0000000000..773bc73179 --- /dev/null +++ b/src/core/ext/filters/client_channel/OWNERS @@ -0,0 +1,4 @@ +set noparent +@markdroth +@dgquintas +@ctiller diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c index de516ab4c9..58e31d7b45 100644 --- a/src/core/ext/filters/client_channel/client_channel.c +++ b/src/core/ext/filters/client_channel/client_channel.c @@ -52,6 +52,9 @@ /* Client channel implementation */ +grpc_tracer_flag grpc_client_channel_trace = + GRPC_TRACER_INITIALIZER(false, "client_channel"); + /************************************************************************* * METHOD-CONFIG TABLE */ @@ -241,6 +244,10 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx, GRPC_ERROR_REF(error)); } } + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p: setting connectivity state to %s", chand, + grpc_connectivity_state_name(state)); + } grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error, reason); } @@ -251,6 +258,10 @@ static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx, grpc_connectivity_state publish_state = w->state; /* check if the notification is for the latest policy */ if (w->lb_policy == w->chand->lb_policy) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p: lb_policy=%p state changed to %s", w->chand, + w->lb_policy, grpc_connectivity_state_name(w->state)); + } if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) { publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE; grpc_resolver_channel_saw_error_locked(exec_ctx, w->chand->resolver); @@ -263,7 +274,6 @@ static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx, watch_lb_policy_locked(exec_ctx, w->chand, w->lb_policy, w->state); } } - GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy"); gpr_free(w); } @@ -273,7 +283,6 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand, grpc_connectivity_state current_state) { lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w)); GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy"); - w->chand = chand; GRPC_CLOSURE_INIT(&w->on_changed, on_lb_policy_state_changed_locked, w, grpc_combiner_scheduler(chand->combiner)); @@ -283,6 +292,18 @@ static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand, &w->on_changed); } +static void start_resolving_locked(grpc_exec_ctx *exec_ctx, + channel_data *chand) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p: starting name resolution", chand); + } + GPR_ASSERT(!chand->started_resolving); + chand->started_resolving = true; + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); + grpc_resolver_next_locked(exec_ctx, chand->resolver, &chand->resolver_result, + &chand->on_resolver_result_changed); +} + typedef struct { char *server_name; grpc_server_retry_throttle_data *retry_throttle_data; @@ -345,8 +366,14 @@ static void parse_retry_throttle_params(const grpc_json *field, void *arg) { static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { channel_data *chand = arg; + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p: got resolver result: error=%s", chand, + grpc_error_string(error)); + } // Extract the following fields from the resolver result, if non-NULL. + bool lb_policy_updated = false; char *lb_policy_name = NULL; + bool lb_policy_name_changed = false; grpc_lb_policy *new_lb_policy = NULL; char *service_config_json = NULL; grpc_server_retry_throttle_data *retry_throttle_data = NULL; @@ -394,11 +421,12 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, // taking a lock on chand->info_mu, because this function is the // only thing that modifies its value, and it can only be invoked // once at any given time. - const bool lb_policy_type_changed = + lb_policy_name_changed = chand->info_lb_policy_name == NULL || strcmp(chand->info_lb_policy_name, lb_policy_name) != 0; - if (chand->lb_policy != NULL && !lb_policy_type_changed) { + if (chand->lb_policy != NULL && !lb_policy_name_changed) { // Continue using the same LB policy. Update with new addresses. + lb_policy_updated = true; grpc_lb_policy_update_locked(exec_ctx, chand->lb_policy, &lb_policy_args); } else { // Instantiate new LB policy. @@ -445,6 +473,13 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, grpc_channel_args_destroy(exec_ctx, chand->resolver_result); chand->resolver_result = NULL; } + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p: resolver result: lb_policy_name=\"%s\"%s, " + "service_config=\"%s\"", + chand, lb_policy_name, lb_policy_name_changed ? " (changed)" : "", + service_config_json); + } // Now swap out fields in chand. Note that the new values may still // be NULL if (e.g.) the resolver failed to return results or the // results did not contain the necessary data. @@ -479,6 +514,10 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, if (new_lb_policy != NULL || error != GRPC_ERROR_NONE || chand->resolver == NULL) { if (chand->lb_policy != NULL) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p: unreffing lb_policy=%p", chand, + chand->lb_policy); + } grpc_pollset_set_del_pollset_set(exec_ctx, chand->lb_policy->interested_parties, chand->interested_parties); @@ -489,7 +528,13 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, // Now that we've swapped out the relevant fields of chand, check for // error or shutdown. if (error != GRPC_ERROR_NONE || chand->resolver == NULL) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p: shutting down", chand); + } if (chand->resolver != NULL) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p: shutting down resolver", chand); + } grpc_resolver_shutdown_locked(exec_ctx, chand->resolver); GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); chand->resolver = NULL; @@ -510,6 +555,9 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, grpc_error *state_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy"); if (new_lb_policy != NULL) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p: initializing new LB policy", chand); + } GRPC_ERROR_UNREF(state_error); state = grpc_lb_policy_check_connectivity_locked(exec_ctx, new_lb_policy, &state_error); @@ -524,8 +572,11 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, } watch_lb_policy_locked(exec_ctx, chand, new_lb_policy, state); } - set_channel_connectivity_state_locked( - exec_ctx, chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver"); + if (!lb_policy_updated) { + set_channel_connectivity_state_locked(exec_ctx, chand, state, + GRPC_ERROR_REF(state_error), + "new_lb+resolver"); + } grpc_resolver_next_locked(exec_ctx, chand->resolver, &chand->resolver_result, &chand->on_resolver_result_changed); @@ -772,7 +823,9 @@ typedef struct client_channel_call_data { gpr_atm subchannel_call_or_error; gpr_arena *arena; - bool pick_pending; + grpc_lb_policy *lb_policy; // Holds ref while LB pick is pending. + grpc_closure lb_pick_closure; + grpc_connected_subchannel *connected_subchannel; grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT]; grpc_polling_entity *pollent; @@ -837,8 +890,15 @@ static void waiting_for_pick_batches_add_locked( } static void waiting_for_pick_batches_fail_locked(grpc_exec_ctx *exec_ctx, - call_data *calld, + grpc_call_element *elem, grpc_error *error) { + call_data *calld = elem->call_data; + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: failing %" PRIdPTR " pending batches: %s", + elem->channel_data, calld, calld->waiting_for_pick_batches_count, + grpc_error_string(error)); + } for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) { grpc_transport_stream_op_batch_finish_with_failure( exec_ctx, calld->waiting_for_pick_batches[i], GRPC_ERROR_REF(error)); @@ -848,14 +908,21 @@ static void waiting_for_pick_batches_fail_locked(grpc_exec_ctx *exec_ctx, } static void waiting_for_pick_batches_resume_locked(grpc_exec_ctx *exec_ctx, - call_data *calld) { + grpc_call_element *elem) { + call_data *calld = elem->call_data; if (calld->waiting_for_pick_batches_count == 0) return; call_or_error coe = get_call_or_error(calld); if (coe.error != GRPC_ERROR_NONE) { - waiting_for_pick_batches_fail_locked(exec_ctx, calld, + waiting_for_pick_batches_fail_locked(exec_ctx, elem, GRPC_ERROR_REF(coe.error)); return; } + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: sending %" PRIdPTR + " pending batches to subchannel_call=%p", + elem->channel_data, calld, calld->waiting_for_pick_batches_count, + coe.subchannel_call); + } for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) { grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, calld->waiting_for_pick_batches[i]); @@ -869,6 +936,10 @@ static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: applying service config to call", + chand, calld); + } if (chand->retry_throttle_data != NULL) { calld->retry_throttle_data = grpc_server_retry_throttle_data_ref(chand->retry_throttle_data); @@ -895,7 +966,9 @@ static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx, } static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx, - call_data *calld, grpc_error *error) { + grpc_call_element *elem, + grpc_error *error) { + call_data *calld = elem->call_data; grpc_subchannel_call *subchannel_call = NULL; const grpc_connected_subchannel_call_args call_args = { .pollent = calld->pollent, @@ -906,13 +979,18 @@ static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx, .context = calld->subchannel_call_context}; grpc_error *new_error = grpc_connected_subchannel_create_call( exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: create subchannel_call=%p: error=%s", + elem->channel_data, calld, subchannel_call, + grpc_error_string(new_error)); + } GPR_ASSERT(set_call_or_error( calld, (call_or_error){.subchannel_call = subchannel_call})); if (new_error != GRPC_ERROR_NONE) { new_error = grpc_error_add_child(new_error, error); - waiting_for_pick_batches_fail_locked(exec_ctx, calld, new_error); + waiting_for_pick_batches_fail_locked(exec_ctx, elem, new_error); } else { - waiting_for_pick_batches_resume_locked(exec_ctx, calld); + waiting_for_pick_batches_resume_locked(exec_ctx, elem); } GRPC_ERROR_UNREF(error); } @@ -922,8 +1000,6 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, grpc_error *error) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - GPR_ASSERT(calld->pick_pending); - calld->pick_pending = false; grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent, chand->interested_parties); call_or_error coe = get_call_or_error(calld); @@ -935,8 +1011,13 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, "Call dropped by load balancing policy") : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Failed to create subchannel", &error, 1); + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: failed to create subchannel: error=%s", chand, + calld, grpc_error_string(failure)); + } set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(failure)}); - waiting_for_pick_batches_fail_locked(exec_ctx, calld, failure); + waiting_for_pick_batches_fail_locked(exec_ctx, elem, failure); } else if (coe.error != GRPC_ERROR_NONE) { /* already cancelled before subchannel became ready */ grpc_error *child_errors[] = {error, coe.error}; @@ -950,10 +1031,15 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, grpc_error_set_int(cancellation_error, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_DEADLINE_EXCEEDED); } - waiting_for_pick_batches_fail_locked(exec_ctx, calld, cancellation_error); + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: cancelled before subchannel became ready: %s", + chand, calld, grpc_error_string(cancellation_error)); + } + waiting_for_pick_batches_fail_locked(exec_ctx, elem, cancellation_error); } else { /* Create call on subchannel. */ - create_subchannel_call_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); + create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); } GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); GRPC_ERROR_UNREF(error); @@ -983,41 +1069,77 @@ typedef struct { grpc_closure closure; } pick_after_resolver_result_args; -static void continue_picking_after_resolver_result_locked( - grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { +static void pick_after_resolver_result_done_locked(grpc_exec_ctx *exec_ctx, + void *arg, + grpc_error *error) { pick_after_resolver_result_args *args = arg; if (args->cancelled) { /* cancelled, do nothing */ - } else if (error != GRPC_ERROR_NONE) { - subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error)); + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "call cancelled before resolver result"); + } } else { - if (pick_subchannel_locked(exec_ctx, args->elem)) { - subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_NONE); + channel_data *chand = args->elem->channel_data; + call_data *calld = args->elem->call_data; + if (error != GRPC_ERROR_NONE) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver failed to return data", + chand, calld); + } + subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error)); + } else { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: resolver returned, doing pick", + chand, calld); + } + if (pick_subchannel_locked(exec_ctx, args->elem)) { + subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_NONE); + } } } gpr_free(args); } -static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_error *error) { +static void pick_after_resolver_result_start_locked(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: deferring pick pending resolver result", chand, + calld); + } + pick_after_resolver_result_args *args = + (pick_after_resolver_result_args *)gpr_zalloc(sizeof(*args)); + args->elem = elem; + GRPC_CLOSURE_INIT(&args->closure, pick_after_resolver_result_done_locked, + args, grpc_combiner_scheduler(chand->combiner)); + grpc_closure_list_append(&chand->waiting_for_resolver_result_closures, + &args->closure, GRPC_ERROR_NONE); +} + +static void pick_after_resolver_result_cancel_locked(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_error *error) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - if (chand->lb_policy != NULL) { - grpc_lb_policy_cancel_pick_locked(exec_ctx, chand->lb_policy, - &calld->connected_subchannel, - GRPC_ERROR_REF(error)); - } // If we don't yet have a resolver result, then a closure for - // continue_picking_after_resolver_result_locked() will have been added to + // pick_after_resolver_result_done_locked() will have been added to // chand->waiting_for_resolver_result_closures, and it may not be invoked // until after this call has been destroyed. We mark the operation as - // cancelled, so that when continue_picking_after_resolver_result_locked() + // cancelled, so that when pick_after_resolver_result_done_locked() // is called, it will be a no-op. We also immediately invoke // subchannel_ready_locked() to propagate the error back to the caller. for (grpc_closure *closure = chand->waiting_for_resolver_result_closures.head; closure != NULL; closure = closure->next_data.next) { pick_after_resolver_result_args *args = closure->cb_arg; if (!args->cancelled && args->elem == elem) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: " + "cancelling pick waiting for resolver result", + chand, calld); + } args->cancelled = true; subchannel_ready_locked(exec_ctx, elem, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( @@ -1027,24 +1149,21 @@ static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, GRPC_ERROR_UNREF(error); } -// State for pick callback that holds a reference to the LB policy -// from which the pick was requested. -typedef struct { - grpc_lb_policy *lb_policy; - grpc_call_element *elem; - grpc_closure closure; -} pick_callback_args; - // Callback invoked by grpc_lb_policy_pick_locked() for async picks. // Unrefs the LB policy after invoking subchannel_ready_locked(). static void pick_callback_done_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - pick_callback_args *args = arg; - GPR_ASSERT(args != NULL); - GPR_ASSERT(args->lb_policy != NULL); - subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error)); - GRPC_LB_POLICY_UNREF(exec_ctx, args->lb_policy, "pick_subchannel"); - gpr_free(args); + grpc_call_element *elem = arg; + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed asynchronously", + chand, calld); + } + GPR_ASSERT(calld->lb_policy != NULL); + GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel"); + calld->lb_policy = NULL; + subchannel_ready_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); } // Takes a ref to chand->lb_policy and calls grpc_lb_policy_pick_locked(). @@ -1055,23 +1174,44 @@ static bool pick_callback_start_locked(grpc_exec_ctx *exec_ctx, const grpc_lb_policy_pick_args *inputs) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - pick_callback_args *pick_args = gpr_zalloc(sizeof(*pick_args)); + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: starting pick on lb_policy=%p", + chand, calld, chand->lb_policy); + } + // Keep a ref to the LB policy in calld while the pick is pending. GRPC_LB_POLICY_REF(chand->lb_policy, "pick_subchannel"); - pick_args->lb_policy = chand->lb_policy; - pick_args->elem = elem; - GRPC_CLOSURE_INIT(&pick_args->closure, pick_callback_done_locked, pick_args, + calld->lb_policy = chand->lb_policy; + GRPC_CLOSURE_INIT(&calld->lb_pick_closure, pick_callback_done_locked, elem, grpc_combiner_scheduler(chand->combiner)); const bool pick_done = grpc_lb_policy_pick_locked( exec_ctx, chand->lb_policy, inputs, &calld->connected_subchannel, - calld->subchannel_call_context, NULL, &pick_args->closure); + calld->subchannel_call_context, NULL, &calld->lb_pick_closure); if (pick_done) { /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */ - GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "pick_subchannel"); - gpr_free(pick_args); + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: pick completed synchronously", + chand, calld); + } + GRPC_LB_POLICY_UNREF(exec_ctx, calld->lb_policy, "pick_subchannel"); + calld->lb_policy = NULL; } return pick_done; } +static void pick_callback_cancel_locked(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_error *error) { + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + GPR_ASSERT(calld->lb_policy != NULL); + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: cancelling pick from LB policy %p", + chand, calld, calld->lb_policy); + } + grpc_lb_policy_cancel_pick_locked(exec_ctx, calld->lb_policy, + &calld->connected_subchannel, error); +} + static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { GPR_TIMER_BEGIN("pick_subchannel", 0); @@ -1107,20 +1247,9 @@ static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx, pick_done = pick_callback_start_locked(exec_ctx, elem, &inputs); } else if (chand->resolver != NULL) { if (!chand->started_resolving) { - chand->started_resolving = true; - GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - grpc_resolver_next_locked(exec_ctx, chand->resolver, - &chand->resolver_result, - &chand->on_resolver_result_changed); + start_resolving_locked(exec_ctx, chand); } - pick_after_resolver_result_args *args = - (pick_after_resolver_result_args *)gpr_zalloc(sizeof(*args)); - args->elem = elem; - GRPC_CLOSURE_INIT(&args->closure, - continue_picking_after_resolver_result_locked, args, - grpc_combiner_scheduler(chand->combiner)); - grpc_closure_list_append(&chand->waiting_for_resolver_result_closures, - &args->closure, GRPC_ERROR_NONE); + pick_after_resolver_result_start_locked(exec_ctx, elem); } else { subchannel_ready_locked( exec_ctx, elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); @@ -1133,63 +1262,77 @@ static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error_ignored) { GPR_TIMER_BEGIN("start_transport_stream_op_batch_locked", 0); - grpc_transport_stream_op_batch *op = arg; - grpc_call_element *elem = op->handler_private.extra_arg; + grpc_transport_stream_op_batch *batch = arg; + grpc_call_element *elem = batch->handler_private.extra_arg; call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; /* need to recheck that another thread hasn't set the call */ call_or_error coe = get_call_or_error(calld); if (coe.error != GRPC_ERROR_NONE) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s", + chand, calld, grpc_error_string(coe.error)); + } grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, op, GRPC_ERROR_REF(coe.error)); + exec_ctx, batch, GRPC_ERROR_REF(coe.error)); goto done; } if (coe.subchannel_call != NULL) { - grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op); + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: sending batch to subchannel_call=%p", chand, + calld, coe.subchannel_call); + } + grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, batch); goto done; } // Add to waiting-for-pick list. If we succeed in getting a // subchannel call below, we'll handle this batch (along with any // other waiting batches) in waiting_for_pick_batches_resume_locked(). - waiting_for_pick_batches_add_locked(calld, op); - /* if this is a cancellation, then we can raise our cancelled flag */ - if (op->cancel_stream) { - grpc_error *error = op->payload->cancel_stream.cancel_error; + waiting_for_pick_batches_add_locked(calld, batch); + // If this is a cancellation, cancel the pending pick (if any) and + // fail any pending batches. + if (batch->cancel_stream) { + grpc_error *error = batch->payload->cancel_stream.cancel_error; + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: recording cancel_error=%s", chand, + calld, grpc_error_string(error)); + } /* Stash a copy of cancel_error in our call data, so that we can use it for subsequent operations. This ensures that if the call is - cancelled before any ops are passed down (e.g., if the deadline + cancelled before any batches are passed down (e.g., if the deadline is in the past when the call starts), we can return the right - error to the caller when the first op does get passed down. */ + error to the caller when the first batch does get passed down. */ set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(error)}); - if (calld->pick_pending) { - cancel_pick_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); + if (calld->lb_policy != NULL) { + pick_callback_cancel_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); + } else { + pick_after_resolver_result_cancel_locked(exec_ctx, elem, + GRPC_ERROR_REF(error)); } - waiting_for_pick_batches_fail_locked(exec_ctx, calld, - GRPC_ERROR_REF(error)); + waiting_for_pick_batches_fail_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); goto done; } /* if we don't have a subchannel, try to get one */ - if (!calld->pick_pending && calld->connected_subchannel == NULL && - op->send_initial_metadata) { - calld->initial_metadata_payload = op->payload; - calld->pick_pending = true; + if (batch->send_initial_metadata) { + GPR_ASSERT(calld->connected_subchannel == NULL); + calld->initial_metadata_payload = batch->payload; GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); /* If a subchannel is not available immediately, the polling entity from call_data should be provided to channel_data's interested_parties, so that IO of the lb_policy and resolver could be done under it. */ if (pick_subchannel_locked(exec_ctx, elem)) { // Pick was returned synchronously. - calld->pick_pending = false; GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); if (calld->connected_subchannel == NULL) { grpc_error *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Call dropped by load balancing policy"); set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(error)}); - waiting_for_pick_batches_fail_locked(exec_ctx, calld, error); + waiting_for_pick_batches_fail_locked(exec_ctx, elem, error); } else { // Create subchannel call. - create_subchannel_call_locked(exec_ctx, calld, GRPC_ERROR_NONE); + create_subchannel_call_locked(exec_ctx, elem, GRPC_ERROR_NONE); } } else { grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent, @@ -1232,47 +1375,59 @@ static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { If it has, we proceed on the fast path. */ static void cc_start_transport_stream_op_batch( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { + grpc_transport_stream_op_batch *batch) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + if (GRPC_TRACER_ON(grpc_client_channel_trace) || + GRPC_TRACER_ON(grpc_trace_channel)) { + grpc_call_log_op(GPR_INFO, elem, batch); + } if (chand->deadline_checking_enabled) { grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem, - op); + batch); } // Intercept on_complete for recv_trailing_metadata so that we can // check retry throttle status. - if (op->recv_trailing_metadata) { - GPR_ASSERT(op->on_complete != NULL); - calld->original_on_complete = op->on_complete; + if (batch->recv_trailing_metadata) { + GPR_ASSERT(batch->on_complete != NULL); + calld->original_on_complete = batch->on_complete; GRPC_CLOSURE_INIT(&calld->on_complete, on_complete, elem, grpc_schedule_on_exec_ctx); - op->on_complete = &calld->on_complete; + batch->on_complete = &calld->on_complete; } /* try to (atomically) get the call */ call_or_error coe = get_call_or_error(calld); GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0); if (coe.error != GRPC_ERROR_NONE) { + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: failing batch with error: %s", + chand, calld, grpc_error_string(coe.error)); + } grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, op, GRPC_ERROR_REF(coe.error)); - GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); - /* early out */ - return; + exec_ctx, batch, GRPC_ERROR_REF(coe.error)); + goto done; } if (coe.subchannel_call != NULL) { - grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op); - GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); - /* early out */ - return; + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, + "chand=%p calld=%p: sending batch to subchannel_call=%p", chand, + calld, coe.subchannel_call); + } + grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, batch); + goto done; } /* we failed; lock and figure out what to do */ + if (GRPC_TRACER_ON(grpc_client_channel_trace)) { + gpr_log(GPR_DEBUG, "chand=%p calld=%p: entering combiner", chand, calld); + } GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op_batch"); - op->handler_private.extra_arg = elem; + batch->handler_private.extra_arg = elem; GRPC_CLOSURE_SCHED( - exec_ctx, GRPC_CLOSURE_INIT(&op->handler_private.closure, - start_transport_stream_op_batch_locked, op, + exec_ctx, GRPC_CLOSURE_INIT(&batch->handler_private.closure, + start_transport_stream_op_batch_locked, batch, grpc_combiner_scheduler(chand->combiner)), GRPC_ERROR_NONE); +done: GPR_TIMER_END("cc_start_transport_stream_op_batch", 0); } @@ -1317,7 +1472,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, coe.subchannel_call, "client_channel_destroy_call"); } - GPR_ASSERT(!calld->pick_pending); + GPR_ASSERT(calld->lb_policy == NULL); GPR_ASSERT(calld->waiting_for_pick_batches_count == 0); if (calld->connected_subchannel != NULL) { GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel, @@ -1366,11 +1521,7 @@ static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg, } else { chand->exit_idle_when_lb_policy_arrives = true; if (!chand->started_resolving && chand->resolver != NULL) { - GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - chand->started_resolving = true; - grpc_resolver_next_locked(exec_ctx, chand->resolver, - &chand->resolver_result, - &chand->on_resolver_result_changed); + start_resolving_locked(exec_ctx, chand); } } GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "try_to_connect"); diff --git a/src/core/ext/filters/client_channel/client_channel.h b/src/core/ext/filters/client_channel/client_channel.h index 63f7c29940..c99f0092e9 100644 --- a/src/core/ext/filters/client_channel/client_channel.h +++ b/src/core/ext/filters/client_channel/client_channel.h @@ -23,6 +23,8 @@ #include "src/core/ext/filters/client_channel/resolver.h" #include "src/core/lib/channel/channel_stack.h" +extern grpc_tracer_flag grpc_client_channel_trace; + // Channel arg key for server URI string. #define GRPC_ARG_SERVER_URI "grpc.server_uri" diff --git a/src/core/ext/filters/client_channel/client_channel_plugin.c b/src/core/ext/filters/client_channel/client_channel_plugin.c index 60e77d6268..c32e83d012 100644 --- a/src/core/ext/filters/client_channel/client_channel_plugin.c +++ b/src/core/ext/filters/client_channel/client_channel_plugin.c @@ -78,8 +78,9 @@ void grpc_client_channel_init(void) { GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, append_filter, (void *)&grpc_client_channel_filter); grpc_http_connect_register_handshaker_factory(); + grpc_register_tracer(&grpc_client_channel_trace); #ifndef NDEBUG - grpc_register_tracer("resolver_refcount", &grpc_trace_resolver_refcount); + grpc_register_tracer(&grpc_trace_resolver_refcount); #endif } diff --git a/src/core/ext/filters/client_channel/http_proxy.c b/src/core/ext/filters/client_channel/http_proxy.c index cfb5ec6f00..ef3512ed83 100644 --- a/src/core/ext/filters/client_channel/http_proxy.c +++ b/src/core/ext/filters/client_channel/http_proxy.c @@ -22,6 +22,7 @@ #include <string.h> #include <grpc/support/alloc.h> +#include <grpc/support/host_port.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> @@ -29,14 +30,23 @@ #include "src/core/ext/filters/client_channel/proxy_mapper_registry.h" #include "src/core/ext/filters/client_channel/uri_parser.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/slice/b64.h" #include "src/core/lib/support/env.h" +#include "src/core/lib/support/string.h" -static char* grpc_get_http_proxy_server(grpc_exec_ctx* exec_ctx) { +/** + * Parses the 'http_proxy' env var and returns the proxy hostname to resolve or + * NULL on error. Also sets 'user_cred' to user credentials if present in the + * 'http_proxy' env var, otherwise leaves it unchanged. It is caller's + * responsibility to gpr_free user_cred. + */ +static char* get_http_proxy_server(grpc_exec_ctx* exec_ctx, char** user_cred) { + GPR_ASSERT(user_cred != NULL); + char* proxy_name = NULL; char* uri_str = gpr_getenv("http_proxy"); if (uri_str == NULL) return NULL; grpc_uri* uri = grpc_uri_parse(exec_ctx, uri_str, false /* suppress_errors */); - char* proxy_name = NULL; if (uri == NULL || uri->authority == NULL) { gpr_log(GPR_ERROR, "cannot parse value of 'http_proxy' env var"); goto done; @@ -45,11 +55,27 @@ static char* grpc_get_http_proxy_server(grpc_exec_ctx* exec_ctx) { gpr_log(GPR_ERROR, "'%s' scheme not supported in proxy URI", uri->scheme); goto done; } - if (strchr(uri->authority, '@') != NULL) { - gpr_log(GPR_ERROR, "userinfo not supported in proxy URI"); - goto done; + /* Split on '@' to separate user credentials from host */ + char** authority_strs = NULL; + size_t authority_nstrs; + gpr_string_split(uri->authority, "@", &authority_strs, &authority_nstrs); + GPR_ASSERT(authority_nstrs != 0); /* should have at least 1 string */ + if (authority_nstrs == 1) { + /* User cred not present in authority */ + proxy_name = authority_strs[0]; + } else if (authority_nstrs == 2) { + /* User cred found */ + *user_cred = authority_strs[0]; + proxy_name = authority_strs[1]; + gpr_log(GPR_DEBUG, "userinfo found in proxy URI"); + } else { + /* Bad authority */ + for (size_t i = 0; i < authority_nstrs; i++) { + gpr_free(authority_strs[i]); + } + proxy_name = NULL; } - proxy_name = gpr_strdup(uri->authority); + gpr_free(authority_strs); done: gpr_free(uri_str); grpc_uri_destroy(uri); @@ -62,7 +88,8 @@ static bool proxy_mapper_map_name(grpc_exec_ctx* exec_ctx, const grpc_channel_args* args, char** name_to_resolve, grpc_channel_args** new_args) { - *name_to_resolve = grpc_get_http_proxy_server(exec_ctx); + char* user_cred = NULL; + *name_to_resolve = get_http_proxy_server(exec_ctx, &user_cred); if (*name_to_resolve == NULL) return false; grpc_uri* uri = grpc_uri_parse(exec_ctx, server_uri, false /* suppress_errors */); @@ -71,19 +98,82 @@ static bool proxy_mapper_map_name(grpc_exec_ctx* exec_ctx, "'http_proxy' environment variable set, but cannot " "parse server URI '%s' -- not using proxy", server_uri); - if (uri != NULL) grpc_uri_destroy(uri); + if (uri != NULL) { + gpr_free(user_cred); + grpc_uri_destroy(uri); + } return false; } if (strcmp(uri->scheme, "unix") == 0) { gpr_log(GPR_INFO, "not using proxy for Unix domain socket '%s'", server_uri); + gpr_free(user_cred); grpc_uri_destroy(uri); return false; } - grpc_arg new_arg = grpc_channel_arg_string_create( + char* no_proxy_str = gpr_getenv("no_proxy"); + if (no_proxy_str != NULL) { + static const char* NO_PROXY_SEPARATOR = ","; + bool use_proxy = true; + char* server_host; + char* server_port; + if (!gpr_split_host_port(uri->path[0] == '/' ? uri->path + 1 : uri->path, + &server_host, &server_port)) { + gpr_log(GPR_INFO, + "unable to split host and port, not checking no_proxy list for " + "host '%s'", + server_uri); + } else { + size_t uri_len = strlen(server_host); + char** no_proxy_hosts; + size_t num_no_proxy_hosts; + gpr_string_split(no_proxy_str, NO_PROXY_SEPARATOR, &no_proxy_hosts, + &num_no_proxy_hosts); + for (size_t i = 0; i < num_no_proxy_hosts; i++) { + char* no_proxy_entry = no_proxy_hosts[i]; + size_t no_proxy_len = strlen(no_proxy_entry); + if (no_proxy_len <= uri_len && + gpr_stricmp(no_proxy_entry, &server_host[uri_len - no_proxy_len]) == + 0) { + gpr_log(GPR_INFO, "not using proxy for host in no_proxy list '%s'", + server_uri); + use_proxy = false; + break; + } + } + for (size_t i = 0; i < num_no_proxy_hosts; i++) { + gpr_free(no_proxy_hosts[i]); + } + gpr_free(no_proxy_hosts); + gpr_free(server_host); + gpr_free(server_port); + if (!use_proxy) { + grpc_uri_destroy(uri); + gpr_free(*name_to_resolve); + *name_to_resolve = NULL; + return false; + } + } + } + grpc_arg args_to_add[2]; + args_to_add[0] = grpc_channel_arg_string_create( GRPC_ARG_HTTP_CONNECT_SERVER, uri->path[0] == '/' ? uri->path + 1 : uri->path); - *new_args = grpc_channel_args_copy_and_add(args, &new_arg, 1); + if (user_cred != NULL) { + /* Use base64 encoding for user credentials as stated in RFC 7617 */ + char* encoded_user_cred = + grpc_base64_encode(user_cred, strlen(user_cred), 0, 0); + char* header; + gpr_asprintf(&header, "Proxy-Authorization:Basic %s", encoded_user_cred); + gpr_free(encoded_user_cred); + args_to_add[1] = + grpc_channel_arg_string_create(GRPC_ARG_HTTP_CONNECT_HEADERS, header); + *new_args = grpc_channel_args_copy_and_add(args, args_to_add, 2); + gpr_free(header); + } else { + *new_args = grpc_channel_args_copy_and_add(args, args_to_add, 1); + } + gpr_free(user_cred); grpc_uri_destroy(uri); return true; } diff --git a/src/core/ext/filters/client_channel/lb_policy.c b/src/core/ext/filters/client_channel/lb_policy.c index 8d69ba6af5..dd95a135cf 100644 --- a/src/core/ext/filters/client_channel/lb_policy.c +++ b/src/core/ext/filters/client_channel/lb_policy.c @@ -22,7 +22,8 @@ #define WEAK_REF_BITS 16 #ifndef NDEBUG -grpc_tracer_flag grpc_trace_lb_policy_refcount = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_trace_lb_policy_refcount = + GRPC_TRACER_INITIALIZER(false, "lb_policy_refcount"); #endif void grpc_lb_policy_init(grpc_lb_policy *policy, @@ -53,7 +54,7 @@ static gpr_atm ref_mutate(grpc_lb_policy *c, gpr_atm delta, #ifndef NDEBUG if (GRPC_TRACER_ON(grpc_trace_lb_policy_refcount)) { gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, - "LB_POLICY: 0x%p %12s 0x%" PRIxPTR " -> 0x%" PRIxPTR " [%s]", c, + "LB_POLICY: %p %12s 0x%" PRIxPTR " -> 0x%" PRIxPTR " [%s]", c, purpose, old_val, old_val + delta, reason); } #endif diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c index 5a5ff2902d..ebce801b37 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c @@ -123,7 +123,7 @@ #define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120 #define GRPC_GRPCLB_RECONNECT_JITTER 0.2 -grpc_tracer_flag grpc_lb_glb_trace = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_lb_glb_trace = GRPC_TRACER_INITIALIZER(false, "glb"); /* add lb_token of selected subchannel (address) to the call's initial * metadata */ @@ -491,11 +491,8 @@ static grpc_lb_addresses *process_serverlist_locked( for (size_t i = 0; i < serverlist->num_servers; ++i) { if (is_server_valid(serverlist->servers[i], i, true)) ++num_valid; } - if (num_valid == 0) return NULL; - grpc_lb_addresses *lb_addresses = grpc_lb_addresses_create(num_valid, &lb_token_vtable); - /* second pass: actually populate the addresses and LB tokens (aka user data * to the outside world) to be read by the RR policy during its creation. * Given that the validity tests are very cheap, they are performed again @@ -503,14 +500,12 @@ static grpc_lb_addresses *process_serverlist_locked( * incurr in an allocation due to the arbitrary number of server */ size_t addr_idx = 0; for (size_t sl_idx = 0; sl_idx < serverlist->num_servers; ++sl_idx) { - GPR_ASSERT(addr_idx < num_valid); const grpc_grpclb_server *server = serverlist->servers[sl_idx]; if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue; - + GPR_ASSERT(addr_idx < num_valid); /* address processing */ grpc_resolved_address addr; parse_server(server, &addr); - /* lb token processing */ void *user_data; if (server->has_load_balance_token) { @@ -596,7 +591,7 @@ static void update_lb_connectivity_status_locked( grpc_connectivity_state_name(rr_state), (void *)glb_policy->rr_policy); } grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, rr_state, - GRPC_ERROR_REF(rr_state_error), + rr_state_error, "update_lb_connectivity_status_locked"); } @@ -678,11 +673,12 @@ static bool pick_from_internal_rr_locked( static grpc_lb_policy_args *lb_policy_args_create(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) { + grpc_lb_addresses *addresses = + process_serverlist_locked(exec_ctx, glb_policy->serverlist); + GPR_ASSERT(addresses != NULL); grpc_lb_policy_args *args = gpr_zalloc(sizeof(*args)); args->client_channel_factory = glb_policy->cc_factory; args->combiner = glb_policy->base.combiner; - grpc_lb_addresses *addresses = - process_serverlist_locked(exec_ctx, glb_policy->serverlist); // Replace the LB addresses in the channel args that we pass down to // the subchannel. static const char *keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES}; @@ -727,7 +723,6 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, /* Connectivity state is a function of the RR policy updated/created */ update_lb_connectivity_status_locked(exec_ctx, glb_policy, rr_state, rr_state_error); - /* Add the gRPC LB's interested_parties pollset_set to that of the newly * created RR policy. This will make the RR policy progress upon activity on * gRPC LB, which in turn is tied to the application's call */ @@ -761,8 +756,8 @@ static void create_rr_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, pp->wrapped_on_complete_arg.client_stats = grpc_grpclb_client_stats_ref(glb_policy->client_stats); if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { - gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "", - (intptr_t)glb_policy->rr_policy); + gpr_log(GPR_INFO, "Pending pick about to (async) PICK from %p", + (void *)glb_policy->rr_policy); } pick_from_internal_rr_locked(exec_ctx, glb_policy, &pp->pick_args, true /* force_async */, pp->target, @@ -788,10 +783,9 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) { GPR_ASSERT(glb_policy->serverlist != NULL && glb_policy->serverlist->num_servers > 0); - if (glb_policy->shutting_down) return; - grpc_lb_policy_args *args = lb_policy_args_create(exec_ctx, glb_policy); + GPR_ASSERT(args != NULL); if (glb_policy->rr_policy != NULL) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_DEBUG, "Updating Round Robin policy (%p)", @@ -826,8 +820,8 @@ static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, unref_needed = true; gpr_free(rr_connectivity); } else { /* rr state != SHUTDOWN && !shutting down: biz as usual */ - update_lb_connectivity_status_locked(exec_ctx, glb_policy, - rr_connectivity->state, error); + update_lb_connectivity_status_locked( + exec_ctx, glb_policy, rr_connectivity->state, GRPC_ERROR_REF(error)); /* Resubscribe. Reuse the "rr_connectivity_cb" weak ref. */ grpc_lb_policy_notify_on_state_change_locked( exec_ctx, glb_policy->rr_policy, &rr_connectivity->state, @@ -1089,6 +1083,16 @@ static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } } +// Cancel a specific pending pick. +// +// A grpclb pick progresses as follows: +// - If there's a Round Robin policy (glb_policy->rr_policy) available, it'll be +// handed over to the RR policy (in create_rr_locked()). From that point +// onwards, it'll be RR's responsibility. For cancellations, that implies the +// pick needs also be cancelled by the RR instance. +// - Otherwise, without an RR instance, picks stay pending at this policy's +// level (grpclb), inside the glb_policy->pending_picks list. To cancel these, +// we invoke the completion closure and set *target to NULL right here. static void glb_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_connected_subchannel **target, grpc_error *error) { @@ -1108,9 +1112,23 @@ static void glb_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } pp = next; } + if (glb_policy->rr_policy != NULL) { + grpc_lb_policy_cancel_pick_locked(exec_ctx, glb_policy->rr_policy, target, + GRPC_ERROR_REF(error)); + } GRPC_ERROR_UNREF(error); } +// Cancel all pending picks. +// +// A grpclb pick progresses as follows: +// - If there's a Round Robin policy (glb_policy->rr_policy) available, it'll be +// handed over to the RR policy (in create_rr_locked()). From that point +// onwards, it'll be RR's responsibility. For cancellations, that implies the +// pick needs also be cancelled by the RR instance. +// - Otherwise, without an RR instance, picks stay pending at this policy's +// level (grpclb), inside the glb_policy->pending_picks list. To cancel these, +// we invoke the completion closure and set *target to NULL right here. static void glb_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, uint32_t initial_metadata_flags_mask, @@ -1132,6 +1150,11 @@ static void glb_cancel_picks_locked(grpc_exec_ctx *exec_ctx, } pp = next; } + if (glb_policy->rr_policy != NULL) { + grpc_lb_policy_cancel_picks_locked( + exec_ctx, glb_policy->rr_policy, initial_metadata_flags_mask, + initial_metadata_flags_eq, GRPC_ERROR_REF(error)); + } GRPC_ERROR_UNREF(error); } @@ -1463,7 +1486,8 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, op++; /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref * count goes to zero) to be unref'd in lb_on_sent_initial_request_locked() */ - GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_server_status_received"); + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, + "lb_on_sent_initial_request_locked"); call_error = grpc_call_start_batch_and_execute( exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), &glb_policy->lb_on_sent_initial_request); @@ -1480,8 +1504,9 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, op->reserved = NULL; op++; /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref - * count goes to zero) to be unref'd in lb_on_server_status_received */ - GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_server_status_received"); + * count goes to zero) to be unref'd in lb_on_server_status_received_locked */ + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, + "lb_on_server_status_received_locked"); call_error = grpc_call_start_batch_and_execute( exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), &glb_policy->lb_on_server_status_received); @@ -1493,8 +1518,9 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, op->flags = 0; op->reserved = NULL; op++; - /* take another weak ref to be unref'd in lb_on_response_received */ - GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_response_received"); + /* take another weak ref to be unref'd/reused in + * lb_on_response_received_locked */ + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_response_received_locked"); call_error = grpc_call_start_batch_and_execute( exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), &glb_policy->lb_on_response_received); @@ -1511,13 +1537,12 @@ static void lb_on_sent_initial_request_locked(grpc_exec_ctx *exec_ctx, do_send_client_load_report_locked(exec_ctx, glb_policy); } GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, - "lb_on_response_received_locked"); + "lb_on_sent_initial_request_locked"); } static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { glb_lb_policy *glb_policy = arg; - grpc_op ops[2]; memset(ops, 0, sizeof(ops)); grpc_op *op = ops; @@ -1548,7 +1573,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, } /* take a weak ref (won't prevent calling of \a glb_shutdown() if the * strong ref count goes to zero) to be unref'd in - * send_client_load_report() */ + * send_client_load_report_locked() */ glb_policy->client_load_report_timer_pending = true; GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "client_load_report"); schedule_next_client_load_report(exec_ctx, glb_policy); @@ -1576,7 +1601,6 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(ipport); } } - /* update serverlist */ if (serverlist->num_servers > 0) { if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, @@ -1611,9 +1635,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX)); } } - grpc_slice_unref_internal(exec_ctx, response_slice); - if (!glb_policy->shutting_down) { /* keep listening for serverlist updates */ op->op = GRPC_OP_RECV_MESSAGE; @@ -1621,7 +1643,7 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, op->flags = 0; op->reserved = NULL; op++; - /* reuse the "lb_on_response_received" weak ref taken in + /* reuse the "lb_on_response_received_locked" weak ref taken in * query_for_backends_locked() */ const grpc_call_error call_error = grpc_call_start_batch_and_execute( exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), @@ -1629,10 +1651,10 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, GPR_ASSERT(GRPC_CALL_OK == call_error); } } else { /* empty payload: call cancelled. */ - /* dispose of the "lb_on_response_received" weak ref taken in + /* dispose of the "lb_on_response_received_locked" weak ref taken in * query_for_backends_locked() and reused in every reception loop */ GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, - "lb_on_response_received_empty_payload"); + "lb_on_response_received_locked_empty_payload"); } } @@ -1699,13 +1721,12 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, &glb_policy->lb_on_call_retry, now); } GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, - "lb_on_server_status_received"); + "lb_on_server_status_received_locked"); } static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const grpc_lb_policy_args *args) { glb_lb_policy *glb_policy = (glb_lb_policy *)policy; - if (glb_policy->updating_lb_channel) { if (GRPC_TRACER_ON(grpc_lb_glb_trace)) { gpr_log(GPR_INFO, @@ -1813,9 +1834,11 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx, // lb_on_server_status_received will pick up the cancel and reinit // lb_call. if (glb_policy->pending_update_args != NULL) { - const grpc_lb_policy_args *args = glb_policy->pending_update_args; + grpc_lb_policy_args *args = glb_policy->pending_update_args; glb_policy->pending_update_args = NULL; glb_update_locked(exec_ctx, &glb_policy->base, args); + grpc_channel_args_destroy(exec_ctx, args->args); + gpr_free(args); } } else if (glb_policy->started_picking && !glb_policy->shutting_down) { if (glb_policy->retry_timer_active) { @@ -1879,9 +1902,9 @@ static bool maybe_add_client_load_reporting_filter( void grpc_lb_policy_grpclb_init() { grpc_register_lb_policy(grpc_glb_lb_factory_create()); - grpc_register_tracer("glb", &grpc_lb_glb_trace); + grpc_register_tracer(&grpc_lb_glb_trace); #ifndef NDEBUG - grpc_register_tracer("lb_policy_refcount", &grpc_trace_lb_policy_refcount); + grpc_register_tracer(&grpc_trace_lb_policy_refcount); #endif grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c index d0acd7a901..fd0fb41fb9 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c @@ -28,7 +28,8 @@ #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/transport/connectivity_state.h" -grpc_tracer_flag grpc_lb_pick_first_trace = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_lb_pick_first_trace = + GRPC_TRACER_INITIALIZER(false, "pick_first"); typedef struct pending_pick { struct pending_pick *next; @@ -707,7 +708,7 @@ static grpc_lb_policy_factory *pick_first_lb_factory_create() { void grpc_lb_policy_pick_first_init() { grpc_register_lb_policy(pick_first_lb_factory_create()); - grpc_register_tracer("pick_first", &grpc_lb_pick_first_trace); + grpc_register_tracer(&grpc_lb_pick_first_trace); } void grpc_lb_policy_pick_first_shutdown() {} diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c index 8e9d6b0f47..bc40165cfb 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c @@ -37,7 +37,8 @@ #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/static_metadata.h" -grpc_tracer_flag grpc_lb_round_robin_trace = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_lb_round_robin_trace = + GRPC_TRACER_INITIALIZER(false, "round_robin"); /** List of entities waiting for a pick. * @@ -141,6 +142,21 @@ struct rr_subchannel_list { bool shutting_down; }; +static rr_subchannel_list *rr_subchannel_list_create(round_robin_lb_policy *p, + size_t num_subchannels) { + rr_subchannel_list *subchannel_list = gpr_zalloc(sizeof(*subchannel_list)); + subchannel_list->policy = p; + subchannel_list->subchannels = + gpr_zalloc(sizeof(subchannel_data) * num_subchannels); + subchannel_list->num_subchannels = num_subchannels; + gpr_ref_init(&subchannel_list->refcount, 1); + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_INFO, "[RR %p] Created subchannel list %p for %lu subchannels", + (void *)p, (void *)subchannel_list, (unsigned long)num_subchannels); + } + return subchannel_list; +} + static void rr_subchannel_list_destroy(grpc_exec_ctx *exec_ctx, rr_subchannel_list *subchannel_list) { GPR_ASSERT(subchannel_list->shutting_down); @@ -158,6 +174,7 @@ static void rr_subchannel_list_destroy(grpc_exec_ctx *exec_ctx, if (sd->user_data != NULL) { GPR_ASSERT(sd->user_data_vtable != NULL); sd->user_data_vtable->destroy(exec_ctx, sd->user_data); + sd->user_data = NULL; } } gpr_free(subchannel_list->subchannels); @@ -169,9 +186,9 @@ static void rr_subchannel_list_ref(rr_subchannel_list *subchannel_list, gpr_ref_non_zero(&subchannel_list->refcount); if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count); - gpr_log(GPR_INFO, "[RR %p] subchannel_list %p REF %lu->%lu", + gpr_log(GPR_INFO, "[RR %p] subchannel_list %p REF %lu->%lu (%s)", (void *)subchannel_list->policy, (void *)subchannel_list, - (unsigned long)(count - 1), (unsigned long)count); + (unsigned long)(count - 1), (unsigned long)count, reason); } } @@ -181,9 +198,9 @@ static void rr_subchannel_list_unref(grpc_exec_ctx *exec_ctx, const bool done = gpr_unref(&subchannel_list->refcount); if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { const gpr_atm count = gpr_atm_acq_load(&subchannel_list->refcount.count); - gpr_log(GPR_INFO, "[RR %p] subchannel_list %p UNREF %lu->%lu", + gpr_log(GPR_INFO, "[RR %p] subchannel_list %p UNREF %lu->%lu (%s)", (void *)subchannel_list->policy, (void *)subchannel_list, - (unsigned long)(count + 1), (unsigned long)count); + (unsigned long)(count + 1), (unsigned long)count, reason); } if (done) { rr_subchannel_list_destroy(exec_ctx, subchannel_list); @@ -192,14 +209,27 @@ static void rr_subchannel_list_unref(grpc_exec_ctx *exec_ctx, /** Mark \a subchannel_list as discarded. Unsubscribes all its subchannels. The * watcher's callback will ultimately unref \a subchannel_list. */ -static void rr_subchannel_list_shutdown(grpc_exec_ctx *exec_ctx, - rr_subchannel_list *subchannel_list, - const char *reason) { +static void rr_subchannel_list_shutdown_and_unref( + grpc_exec_ctx *exec_ctx, rr_subchannel_list *subchannel_list, + const char *reason) { + GPR_ASSERT(!subchannel_list->shutting_down); + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, "[RR %p] Shutting down subchannel_list %p (%s)", + (void *)subchannel_list->policy, (void *)subchannel_list, reason); + } GPR_ASSERT(!subchannel_list->shutting_down); subchannel_list->shutting_down = true; for (size_t i = 0; i < subchannel_list->num_subchannels; i++) { subchannel_data *sd = &subchannel_list->subchannels[i]; if (sd->subchannel != NULL) { // if subchannel isn't shutdown, unsubscribe. + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log( + GPR_DEBUG, + "[RR %p] Unsubscribing from subchannel %p as part of shutting down " + "subchannel_list %p", + (void *)subchannel_list->policy, (void *)sd->subchannel, + (void *)subchannel_list); + } grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL, &sd->connectivity_changed_closure); @@ -228,13 +258,14 @@ static size_t get_next_ready_subchannel_index_locked( const size_t index = (i + p->last_ready_subchannel_index + 1) % p->subchannel_list->num_subchannels; if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, - "[RR %p] checking subchannel %p, subchannel_list %p, index %lu: " - "state=%d", - (void *)p, - (void *)p->subchannel_list->subchannels[index].subchannel, - (void *)p->subchannel_list, (unsigned long)index, - p->subchannel_list->subchannels[index].curr_connectivity_state); + gpr_log( + GPR_DEBUG, + "[RR %p] checking subchannel %p, subchannel_list %p, index %lu: " + "state=%s", + (void *)p, (void *)p->subchannel_list->subchannels[index].subchannel, + (void *)p->subchannel_list, (unsigned long)index, + grpc_connectivity_state_name( + p->subchannel_list->subchannels[index].curr_connectivity_state)); } if (p->subchannel_list->subchannels[index].curr_connectivity_state == GRPC_CHANNEL_READY) { @@ -274,7 +305,8 @@ static void update_last_ready_subchannel_index_locked(round_robin_lb_policy *p, static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "Destroying Round Robin policy at %p", (void *)pol); + gpr_log(GPR_DEBUG, "[RR %p] Destroying Round Robin policy at %p", + (void *)pol, (void *)pol); } grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); gpr_free(p); @@ -283,7 +315,8 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "Shutting down Round Robin policy at %p", (void *)pol); + gpr_log(GPR_DEBUG, "[RR %p] Shutting down Round Robin policy at %p", + (void *)pol, (void *)pol); } p->shutdown = true; pending_pick *pp; @@ -298,9 +331,18 @@ static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel Shutdown"), "rr_shutdown"); - rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, - "sl_shutdown_rr_shutdown"); + const bool latest_is_current = + p->subchannel_list == p->latest_pending_subchannel_list; + rr_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, + "sl_shutdown_rr_shutdown"); p->subchannel_list = NULL; + if (!latest_is_current && p->latest_pending_subchannel_list != NULL && + !p->latest_pending_subchannel_list->shutting_down) { + rr_subchannel_list_shutdown_and_unref(exec_ctx, + p->latest_pending_subchannel_list, + "sl_shutdown_pending_rr_shutdown"); + p->latest_pending_subchannel_list = NULL; + } } static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, @@ -356,8 +398,8 @@ static void start_picking_locked(grpc_exec_ctx *exec_ctx, p->started_picking = true; for (size_t i = 0; i < p->subchannel_list->num_subchannels; i++) { subchannel_data *sd = &p->subchannel_list->subchannels[i]; - GRPC_LB_POLICY_WEAK_REF(&p->base, "rr_connectivity"); - rr_subchannel_list_ref(sd->subchannel_list, "start_picking"); + GRPC_LB_POLICY_WEAK_REF(&p->base, "start_picking_locked"); + rr_subchannel_list_ref(sd->subchannel_list, "started_picking"); grpc_subchannel_notify_on_state_change( exec_ctx, sd->subchannel, p->base.interested_parties, &sd->pending_connectivity_state_unsafe, @@ -379,7 +421,7 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_closure *on_complete) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_INFO, "Round Robin %p trying to pick", (void *)pol); + gpr_log(GPR_INFO, "[RR %p] Trying to pick", (void *)pol); } if (p->subchannel_list != NULL) { const size_t next_ready_index = get_next_ready_subchannel_index_locked(p); @@ -395,8 +437,8 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log( GPR_DEBUG, - "[RR %p] PICKED TARGET <-- SUBCHANNEL %p (CONNECTED %p) (SL %p, " - "INDEX %lu)", + "[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, " + "index %lu)", (void *)p, (void *)sd->subchannel, (void *)*target, (void *)sd->subchannel_list, (unsigned long)next_ready_index); } @@ -511,38 +553,53 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { subchannel_data *sd = arg; round_robin_lb_policy *p = sd->subchannel_list->policy; + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log( + GPR_DEBUG, + "[RR %p] connectivity changed for subchannel %p, subchannel_list %p: " + "prev_state=%s new_state=%s p->shutdown=%d " + "sd->subchannel_list->shutting_down=%d error=%s", + (void *)p, (void *)sd->subchannel, (void *)sd->subchannel_list, + grpc_connectivity_state_name(sd->prev_connectivity_state), + grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe), + p->shutdown, sd->subchannel_list->shutting_down, + grpc_error_string(error)); + } // If the policy is shutting down, unref and return. if (p->shutdown) { - rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "pol_shutdown"); + rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, + "pol_shutdown+started_picking"); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pol_shutdown"); return; } - if (sd->subchannel_list->shutting_down) { + if (sd->subchannel_list->shutting_down && error == GRPC_ERROR_CANCELLED) { // the subchannel list associated with sd has been discarded. This callback - // corresponds to the unsubscription. - GPR_ASSERT(error == GRPC_ERROR_CANCELLED); - rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "sl_shutdown"); - GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_shutdown"); + // corresponds to the unsubscription. The unrefs correspond to the picking + // ref (start_picking_locked or update_started_picking). + rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, + "sl_shutdown+started_picking"); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "sl_shutdown+picking"); return; } // Dispose of outdated subchannel lists. if (sd->subchannel_list != p->subchannel_list && sd->subchannel_list != p->latest_pending_subchannel_list) { - // sd belongs to an outdated subchannel_list: get rid of it. - rr_subchannel_list_shutdown(exec_ctx, sd->subchannel_list, "sl_oudated"); + char *reason = NULL; + if (sd->subchannel_list->shutting_down) { + reason = "sl_outdated_straggler"; + rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, reason); + } else { + reason = "sl_outdated"; + rr_subchannel_list_shutdown_and_unref(exec_ctx, sd->subchannel_list, + reason); + } + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, reason); return; } // Now that we're inside the combiner, copy the pending connectivity // state (which was set by the connectivity state watcher) to // curr_connectivity_state, which is what we use inside of the combiner. sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe; - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, - "[RR %p] connectivity changed for subchannel %p: " - "prev_state=%d new_state=%d", - (void *)p, (void *)sd->subchannel, sd->prev_connectivity_state, - sd->curr_connectivity_state); - } // Update state counters and determine new overall state. update_state_counters_locked(sd); sd->prev_connectivity_state = sd->curr_connectivity_state; @@ -556,9 +613,10 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, if (sd->user_data != NULL) { GPR_ASSERT(sd->user_data_vtable != NULL); sd->user_data_vtable->destroy(exec_ctx, sd->user_data); + sd->user_data = NULL; } if (new_policy_connectivity_state == GRPC_CHANNEL_SHUTDOWN) { - /* the policy is shutting down. Flush all the pending picks... */ + // the policy is shutting down. Flush all the pending picks... pending_pick *pp; while ((pp = p->pending_picks)) { p->pending_picks = pp->next; @@ -567,8 +625,9 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(pp); } } - /* unref the "rr_connectivity" weak ref from start_picking */ - rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, "sd_shutdown"); + rr_subchannel_list_unref(exec_ctx, sd->subchannel_list, + "sd_shutdown+started_picking"); + // unref the "rr_connectivity_update" weak ref from start_picking. GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity_sd_shutdown"); } else { // sd not in SHUTDOWN @@ -593,10 +652,10 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, } if (p->subchannel_list != NULL) { // dispose of the current subchannel_list - rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, - "sl_shutdown_rr_update_connectivity"); + rr_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, + "sl_phase_out_shutdown"); } - p->subchannel_list = sd->subchannel_list; + p->subchannel_list = p->latest_pending_subchannel_list; p->latest_pending_subchannel_list = NULL; } /* at this point we know there's at least one suitable subchannel. Go @@ -607,8 +666,8 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, subchannel_data *selected = &p->subchannel_list->subchannels[next_ready_index]; if (p->pending_picks != NULL) { - /* if the selected subchannel is going to be used for the pending - * picks, update the last picked pointer */ + // if the selected subchannel is going to be used for the pending + // picks, update the last picked pointer update_last_ready_subchannel_index_locked(p, next_ready_index); } pending_pick *pp; @@ -622,16 +681,17 @@ static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, } if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_DEBUG, - "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (INDEX %lu)", - (void *)selected->subchannel, - (unsigned long)next_ready_index); + "[RR %p] Fulfilling pending pick. Target <-- subchannel %p " + "(subchannel_list %p, index %lu)", + (void *)p, (void *)selected->subchannel, + (void *)p->subchannel_list, (unsigned long)next_ready_index); } GRPC_CLOSURE_SCHED(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); } } - /* renew notification: reuses the "rr_connectivity" weak ref on the policy - * as well as the sd->subchannel_list ref. */ + /* renew notification: reuses the "rr_connectivity_update" weak ref on the + * policy as well as the sd->subchannel_list ref. */ grpc_subchannel_notify_on_state_change( exec_ctx, sd->subchannel, p->base.interested_parties, &sd->pending_connectivity_state_unsafe, @@ -689,8 +749,7 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, } else { // otherwise, keep using the current subchannel list (ignore this update). gpr_log(GPR_ERROR, - "No valid LB addresses channel arg for Round Robin %p update, " - "ignoring.", + "[RR %p] No valid LB addresses channel arg for update, ignoring.", (void *)p); } return; @@ -700,30 +759,32 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, for (size_t i = 0; i < addresses->num_addresses; i++) { if (!addresses->addresses[i].is_balancer) ++num_addrs; } + rr_subchannel_list *subchannel_list = rr_subchannel_list_create(p, num_addrs); if (num_addrs == 0) { grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), "rr_update_empty"); if (p->subchannel_list != NULL) { - rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, - "sl_shutdown_rr_update"); - p->subchannel_list = NULL; + rr_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, + "sl_shutdown_empty_update"); } + p->subchannel_list = subchannel_list; // empty list return; } size_t subchannel_index = 0; - rr_subchannel_list *subchannel_list = gpr_zalloc(sizeof(*subchannel_list)); - subchannel_list->policy = p; - subchannel_list->subchannels = - gpr_zalloc(sizeof(subchannel_data) * num_addrs); - subchannel_list->num_subchannels = num_addrs; - gpr_ref_init(&subchannel_list->refcount, 1); - p->latest_pending_subchannel_list = subchannel_list; - if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "Created subchannel list %p for %lu subchannels", - (void *)subchannel_list, (unsigned long)num_addrs); + if (p->latest_pending_subchannel_list != NULL && p->started_picking) { + if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { + gpr_log(GPR_DEBUG, + "[RR %p] Shutting down latest pending subchannel list %p, about " + "to be replaced by newer latest %p", + (void *)p, (void *)p->latest_pending_subchannel_list, + (void *)subchannel_list); + } + rr_subchannel_list_shutdown_and_unref( + exec_ctx, p->latest_pending_subchannel_list, "sl_outdated_dont_smash"); } + p->latest_pending_subchannel_list = subchannel_list; grpc_subchannel_args sc_args; /* We need to remove the LB addresses in order to be able to compare the * subchannel keys of subchannels from a different batch of addresses. */ @@ -747,11 +808,12 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { char *address_uri = grpc_sockaddr_to_uri(&addresses->addresses[i].address); - gpr_log(GPR_DEBUG, - "index %lu: Created subchannel %p for address uri %s into " - "subchannel_list %p", - (unsigned long)subchannel_index, (void *)subchannel, address_uri, - (void *)subchannel_list); + gpr_log( + GPR_DEBUG, + "[RR %p] index %lu: Created subchannel %p for address uri %s into " + "subchannel_list %p", + (void *)p, (unsigned long)subchannel_index, (void *)subchannel, + address_uri, (void *)subchannel_list); gpr_free(address_uri); } grpc_channel_args_destroy(exec_ctx, new_args); @@ -790,10 +852,11 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, // The policy isn't picking yet. Save the update for later, disposing of // previous version if any. if (p->subchannel_list != NULL) { - rr_subchannel_list_shutdown(exec_ctx, p->subchannel_list, - "rr_update_before_started_picking"); + rr_subchannel_list_shutdown_and_unref(exec_ctx, p->subchannel_list, + "rr_update_before_started_picking"); } p->subchannel_list = subchannel_list; + p->latest_pending_subchannel_list = NULL; } } @@ -818,12 +881,12 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, grpc_lb_policy_args *args) { GPR_ASSERT(args->client_channel_factory != NULL); round_robin_lb_policy *p = gpr_zalloc(sizeof(*p)); - rr_update_locked(exec_ctx, &p->base, args); grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner); grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "round_robin"); + rr_update_locked(exec_ctx, &p->base, args); if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { - gpr_log(GPR_DEBUG, "Created Round Robin %p with %lu subchannels", (void *)p, + gpr_log(GPR_DEBUG, "[RR %p] Created with %lu subchannels", (void *)p, (unsigned long)p->subchannel_list->num_subchannels); } return &p->base; @@ -844,7 +907,7 @@ static grpc_lb_policy_factory *round_robin_lb_factory_create() { void grpc_lb_policy_round_robin_init() { grpc_register_lb_policy(round_robin_lb_factory_create()); - grpc_register_tracer("round_robin", &grpc_lb_round_robin_trace); + grpc_register_tracer(&grpc_lb_round_robin_trace); } void grpc_lb_policy_round_robin_shutdown() {} diff --git a/src/core/ext/filters/client_channel/resolver.c b/src/core/ext/filters/client_channel/resolver.c index de9a8ce41b..8401504fcf 100644 --- a/src/core/ext/filters/client_channel/resolver.c +++ b/src/core/ext/filters/client_channel/resolver.c @@ -20,7 +20,8 @@ #include "src/core/lib/iomgr/combiner.h" #ifndef NDEBUG -grpc_tracer_flag grpc_trace_resolver_refcount = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_trace_resolver_refcount = + GRPC_TRACER_INITIALIZER(false, "resolver_refcount"); #endif void grpc_resolver_init(grpc_resolver *resolver, diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c index 1ab8295e9e..b696344eab 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.c @@ -103,10 +103,9 @@ static void fd_node_destroy(grpc_exec_ctx *exec_ctx, fd_node *fdn) { grpc_pollset_set_del_fd(exec_ctx, fdn->ev_driver->pollset_set, fdn->grpc_fd); /* c-ares library has closed the fd inside grpc_fd. This fd may be picked up immediately by another thread, and should not be closed by the following - grpc_fd_orphan. To prevent this fd from being closed by grpc_fd_orphan, - a fd pointer is provided. */ - int fd; - grpc_fd_orphan(exec_ctx, fdn->grpc_fd, NULL, &fd, "c-ares query finished"); + grpc_fd_orphan. */ + grpc_fd_orphan(exec_ctx, fdn->grpc_fd, NULL, NULL, true /* already_closed */, + "c-ares query finished"); gpr_free(fdn); } diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c index 244b260dfa..9065e33613 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.c @@ -236,12 +236,12 @@ static void on_srv_query_done_cb(void *arg, int status, int timeouts, srv_it = srv_it->next) { if (grpc_ipv6_loopback_available()) { grpc_ares_hostbyname_request *hr = create_hostbyname_request( - r, srv_it->host, srv_it->port, true /* is_balancer */); + r, srv_it->host, htons(srv_it->port), true /* is_balancer */); ares_gethostbyname(*channel, hr->host, AF_INET6, on_hostbyname_done_cb, hr); } grpc_ares_hostbyname_request *hr = create_hostbyname_request( - r, srv_it->host, srv_it->port, true /* is_balancer */); + r, srv_it->host, htons(srv_it->port), true /* is_balancer */); ares_gethostbyname(*channel, hr->host, AF_INET, on_hostbyname_done_cb, hr); grpc_ares_ev_driver_start(&exec_ctx, r->ev_driver); diff --git a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c index af3391a731..5ea75f0554 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/filters/client_channel/resolver/dns/native/dns_resolver.c @@ -132,7 +132,7 @@ static void dns_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, static void dns_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - dns_resolver *r = arg; + dns_resolver *r = (dns_resolver *)arg; r->have_retry_timer = false; if (error == GRPC_ERROR_NONE) { @@ -146,7 +146,7 @@ static void dns_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, static void dns_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - dns_resolver *r = arg; + dns_resolver *r = (dns_resolver *)arg; grpc_channel_args *result = NULL; GPR_ASSERT(r->resolving); r->resolving = false; @@ -241,7 +241,7 @@ static grpc_resolver *dns_create(grpc_exec_ctx *exec_ctx, char *path = args->uri->path; if (path[0] == '/') ++path; // Create resolver. - dns_resolver *r = gpr_zalloc(sizeof(dns_resolver)); + dns_resolver *r = (dns_resolver *)gpr_zalloc(sizeof(dns_resolver)); grpc_resolver_init(&r->base, &dns_resolver_vtable, args->combiner); r->name_to_resolve = gpr_strdup(path); r->default_port = gpr_strdup(default_port); diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c index 479ba393a2..56ed4371a9 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c @@ -101,9 +101,6 @@ static void fake_resolver_maybe_finish_next_locked(grpc_exec_ctx* exec_ctx, static void fake_resolver_channel_saw_error_locked(grpc_exec_ctx* exec_ctx, grpc_resolver* resolver) { fake_resolver* r = (fake_resolver*)resolver; - gpr_log( - GPR_INFO, - "FOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOO"); if (r->next_results == NULL && r->results_upon_error != NULL) { // Pretend we re-resolved. r->next_results = grpc_channel_args_copy(r->results_upon_error); diff --git a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c index 7b4fe38272..7ceb8f40a1 100644 --- a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c +++ b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c @@ -177,7 +177,8 @@ static grpc_resolver *sockaddr_create(grpc_exec_ctx *exec_ctx, return NULL; } /* Instantiate resolver. */ - sockaddr_resolver *r = gpr_zalloc(sizeof(sockaddr_resolver)); + sockaddr_resolver *r = + (sockaddr_resolver *)gpr_zalloc(sizeof(sockaddr_resolver)); r->addresses = addresses; r->channel_args = grpc_channel_args_copy(args->args); grpc_resolver_init(&r->base, &sockaddr_resolver_vtable, args->combiner); diff --git a/src/core/ext/filters/client_channel/retry_throttle.c b/src/core/ext/filters/client_channel/retry_throttle.c index 3009e21d49..0c7a3ae651 100644 --- a/src/core/ext/filters/client_channel/retry_throttle.c +++ b/src/core/ext/filters/client_channel/retry_throttle.c @@ -130,24 +130,28 @@ static grpc_server_retry_throttle_data* grpc_server_retry_throttle_data_create( // avl vtable for string -> server_retry_throttle_data map // -static void* copy_server_name(void* key) { return gpr_strdup(key); } +static void* copy_server_name(void* key, void* unused) { + return gpr_strdup(key); +} -static long compare_server_name(void* key1, void* key2) { +static long compare_server_name(void* key1, void* key2, void* unused) { return strcmp(key1, key2); } -static void destroy_server_retry_throttle_data(void* value) { +static void destroy_server_retry_throttle_data(void* value, void* unused) { grpc_server_retry_throttle_data* throttle_data = value; grpc_server_retry_throttle_data_unref(throttle_data); } -static void* copy_server_retry_throttle_data(void* value) { +static void* copy_server_retry_throttle_data(void* value, void* unused) { grpc_server_retry_throttle_data* throttle_data = value; return grpc_server_retry_throttle_data_ref(throttle_data); } +static void destroy_server_name(void* key, void* unused) { gpr_free(key); } + static const gpr_avl_vtable avl_vtable = { - gpr_free /* destroy_key */, copy_server_name, compare_server_name, + destroy_server_name, copy_server_name, compare_server_name, destroy_server_retry_throttle_data, copy_server_retry_throttle_data}; // @@ -164,19 +168,19 @@ void grpc_retry_throttle_map_init() { void grpc_retry_throttle_map_shutdown() { gpr_mu_destroy(&g_mu); - gpr_avl_unref(g_avl); + gpr_avl_unref(g_avl, NULL); } grpc_server_retry_throttle_data* grpc_retry_throttle_map_get_data_for_server( const char* server_name, int max_milli_tokens, int milli_token_ratio) { gpr_mu_lock(&g_mu); grpc_server_retry_throttle_data* throttle_data = - gpr_avl_get(g_avl, (char*)server_name); + gpr_avl_get(g_avl, (char*)server_name, NULL); if (throttle_data == NULL) { // Entry not found. Create a new one. throttle_data = grpc_server_retry_throttle_data_create( max_milli_tokens, milli_token_ratio, NULL); - g_avl = gpr_avl_add(g_avl, (char*)server_name, throttle_data); + g_avl = gpr_avl_add(g_avl, (char*)server_name, throttle_data, NULL); } else { if (throttle_data->max_milli_tokens != max_milli_tokens || throttle_data->milli_token_ratio != milli_token_ratio) { @@ -184,7 +188,7 @@ grpc_server_retry_throttle_data* grpc_retry_throttle_map_get_data_for_server( // the original one. throttle_data = grpc_server_retry_throttle_data_create( max_milli_tokens, milli_token_ratio, throttle_data); - g_avl = gpr_avl_add(g_avl, (char*)server_name, throttle_data); + g_avl = gpr_avl_add(g_avl, (char*)server_name, throttle_data, NULL); } else { // Entry found. Increase refcount. grpc_server_retry_throttle_data_ref(throttle_data); diff --git a/src/core/ext/filters/client_channel/subchannel.c b/src/core/ext/filters/client_channel/subchannel.c index 88157ed738..5788819331 100644 --- a/src/core/ext/filters/client_channel/subchannel.c +++ b/src/core/ext/filters/client_channel/subchannel.c @@ -188,6 +188,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_connector_unref(exec_ctx, c->connector); grpc_pollset_set_destroy(exec_ctx, c->pollset_set); grpc_subchannel_key_destroy(exec_ctx, c->key); + gpr_mu_destroy(&c->mu); gpr_free(c); } diff --git a/src/core/ext/filters/client_channel/subchannel_index.c b/src/core/ext/filters/client_channel/subchannel_index.c index e291ca9db9..ababd05d84 100644 --- a/src/core/ext/filters/client_channel/subchannel_index.c +++ b/src/core/ext/filters/client_channel/subchannel_index.c @@ -38,23 +38,7 @@ struct grpc_subchannel_key { grpc_subchannel_args args; }; -GPR_TLS_DECL(subchannel_index_exec_ctx); - -static void enter_ctx(grpc_exec_ctx *exec_ctx) { - GPR_ASSERT(gpr_tls_get(&subchannel_index_exec_ctx) == 0); - gpr_tls_set(&subchannel_index_exec_ctx, (intptr_t)exec_ctx); -} - -static void leave_ctx(grpc_exec_ctx *exec_ctx) { - GPR_ASSERT(gpr_tls_get(&subchannel_index_exec_ctx) == (intptr_t)exec_ctx); - gpr_tls_set(&subchannel_index_exec_ctx, 0); -} - -static grpc_exec_ctx *current_ctx() { - grpc_exec_ctx *c = (grpc_exec_ctx *)gpr_tls_get(&subchannel_index_exec_ctx); - GPR_ASSERT(c != NULL); - return c; -} +static bool g_force_creation = false; static grpc_subchannel_key *create_key( const grpc_subchannel_args *args, @@ -84,6 +68,7 @@ static grpc_subchannel_key *subchannel_key_copy(grpc_subchannel_key *k) { int grpc_subchannel_key_compare(const grpc_subchannel_key *a, const grpc_subchannel_key *b) { + if (g_force_creation) return false; int c = GPR_ICMP(a->args.filter_count, b->args.filter_count); if (c != 0) return c; if (a->args.filter_count > 0) { @@ -101,21 +86,25 @@ void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx, gpr_free(k); } -static void sck_avl_destroy(void *p) { - grpc_subchannel_key_destroy(current_ctx(), p); +static void sck_avl_destroy(void *p, void *user_data) { + grpc_exec_ctx *exec_ctx = (grpc_exec_ctx *)user_data; + grpc_subchannel_key_destroy(exec_ctx, p); } -static void *sck_avl_copy(void *p) { return subchannel_key_copy(p); } +static void *sck_avl_copy(void *p, void *unused) { + return subchannel_key_copy(p); +} -static long sck_avl_compare(void *a, void *b) { +static long sck_avl_compare(void *a, void *b, void *unused) { return grpc_subchannel_key_compare(a, b); } -static void scv_avl_destroy(void *p) { - GRPC_SUBCHANNEL_WEAK_UNREF(current_ctx(), p, "subchannel_index"); +static void scv_avl_destroy(void *p, void *user_data) { + grpc_exec_ctx *exec_ctx = (grpc_exec_ctx *)user_data; + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, p, "subchannel_index"); } -static void *scv_avl_copy(void *p) { +static void *scv_avl_copy(void *p, void *unused) { GRPC_SUBCHANNEL_WEAK_REF(p, "subchannel_index"); return p; } @@ -130,38 +119,33 @@ static const gpr_avl_vtable subchannel_avl_vtable = { void grpc_subchannel_index_init(void) { g_subchannel_index = gpr_avl_create(&subchannel_avl_vtable); gpr_mu_init(&g_mu); - gpr_tls_init(&subchannel_index_exec_ctx); } void grpc_subchannel_index_shutdown(void) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; gpr_mu_destroy(&g_mu); - gpr_avl_unref(g_subchannel_index); - gpr_tls_destroy(&subchannel_index_exec_ctx); + gpr_avl_unref(g_subchannel_index, &exec_ctx); + grpc_exec_ctx_finish(&exec_ctx); } grpc_subchannel *grpc_subchannel_index_find(grpc_exec_ctx *exec_ctx, grpc_subchannel_key *key) { - enter_ctx(exec_ctx); - // Lock, and take a reference to the subchannel index. // We don't need to do the search under a lock as avl's are immutable. gpr_mu_lock(&g_mu); - gpr_avl index = gpr_avl_ref(g_subchannel_index); + gpr_avl index = gpr_avl_ref(g_subchannel_index, exec_ctx); gpr_mu_unlock(&g_mu); - grpc_subchannel *c = - GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(gpr_avl_get(index, key), "index_find"); - gpr_avl_unref(index); + grpc_subchannel *c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF( + gpr_avl_get(index, key, exec_ctx), "index_find"); + gpr_avl_unref(index, exec_ctx); - leave_ctx(exec_ctx); return c; } grpc_subchannel *grpc_subchannel_index_register(grpc_exec_ctx *exec_ctx, grpc_subchannel_key *key, grpc_subchannel *constructed) { - enter_ctx(exec_ctx); - grpc_subchannel *c = NULL; bool need_to_unref_constructed; @@ -171,11 +155,11 @@ grpc_subchannel *grpc_subchannel_index_register(grpc_exec_ctx *exec_ctx, // Compare and swap loop: // - take a reference to the current index gpr_mu_lock(&g_mu); - gpr_avl index = gpr_avl_ref(g_subchannel_index); + gpr_avl index = gpr_avl_ref(g_subchannel_index, exec_ctx); gpr_mu_unlock(&g_mu); // - Check to see if a subchannel already exists - c = gpr_avl_get(index, key); + c = gpr_avl_get(index, key, exec_ctx); if (c != NULL) { c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(c, "index_register"); } @@ -184,9 +168,9 @@ grpc_subchannel *grpc_subchannel_index_register(grpc_exec_ctx *exec_ctx, need_to_unref_constructed = true; } else { // no -> update the avl and compare/swap - gpr_avl updated = - gpr_avl_add(gpr_avl_ref(index), subchannel_key_copy(key), - GRPC_SUBCHANNEL_WEAK_REF(constructed, "index_register")); + gpr_avl updated = gpr_avl_add( + gpr_avl_ref(index, exec_ctx), subchannel_key_copy(key), + GRPC_SUBCHANNEL_WEAK_REF(constructed, "index_register"), exec_ctx); // it may happen (but it's expected to be unlikely) // that some other thread has changed the index: @@ -198,13 +182,11 @@ grpc_subchannel *grpc_subchannel_index_register(grpc_exec_ctx *exec_ctx, } gpr_mu_unlock(&g_mu); - gpr_avl_unref(updated); + gpr_avl_unref(updated, exec_ctx); } - gpr_avl_unref(index); + gpr_avl_unref(index, exec_ctx); } - leave_ctx(exec_ctx); - if (need_to_unref_constructed) { GRPC_SUBCHANNEL_UNREF(exec_ctx, constructed, "index_register"); } @@ -215,27 +197,26 @@ grpc_subchannel *grpc_subchannel_index_register(grpc_exec_ctx *exec_ctx, void grpc_subchannel_index_unregister(grpc_exec_ctx *exec_ctx, grpc_subchannel_key *key, grpc_subchannel *constructed) { - enter_ctx(exec_ctx); - bool done = false; while (!done) { // Compare and swap loop: // - take a reference to the current index gpr_mu_lock(&g_mu); - gpr_avl index = gpr_avl_ref(g_subchannel_index); + gpr_avl index = gpr_avl_ref(g_subchannel_index, exec_ctx); gpr_mu_unlock(&g_mu); // Check to see if this key still refers to the previously // registered subchannel - grpc_subchannel *c = gpr_avl_get(index, key); + grpc_subchannel *c = gpr_avl_get(index, key, exec_ctx); if (c != constructed) { - gpr_avl_unref(index); + gpr_avl_unref(index, exec_ctx); break; } // compare and swap the update (some other thread may have // mutated the index behind us) - gpr_avl updated = gpr_avl_remove(gpr_avl_ref(index), key); + gpr_avl updated = + gpr_avl_remove(gpr_avl_ref(index, exec_ctx), key, exec_ctx); gpr_mu_lock(&g_mu); if (index.root == g_subchannel_index.root) { @@ -244,9 +225,11 @@ void grpc_subchannel_index_unregister(grpc_exec_ctx *exec_ctx, } gpr_mu_unlock(&g_mu); - gpr_avl_unref(updated); - gpr_avl_unref(index); + gpr_avl_unref(updated, exec_ctx); + gpr_avl_unref(index, exec_ctx); } +} - leave_ctx(exec_ctx); +void grpc_subchannel_index_test_only_set_force_creation(bool force_creation) { + g_force_creation = force_creation; } diff --git a/src/core/ext/filters/client_channel/subchannel_index.h b/src/core/ext/filters/client_channel/subchannel_index.h index e303bfaa05..98d882a453 100644 --- a/src/core/ext/filters/client_channel/subchannel_index.h +++ b/src/core/ext/filters/client_channel/subchannel_index.h @@ -59,4 +59,16 @@ void grpc_subchannel_index_init(void); /** Shutdown the subchannel index (global) */ void grpc_subchannel_index_shutdown(void); +/** \em TEST ONLY. + * If \a force_creation is true, all key comparisons will be false, resulting in + * new subchannels always being created. Otherwise, the keys will be compared as + * usual. + * + * This function is *not* threadsafe on purpose: it should *only* be used in + * test code. + * + * Tests using this function \em MUST run tests with and without \a + * force_creation set. */ +void grpc_subchannel_index_test_only_set_force_creation(bool force_creation); + #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_SUBCHANNEL_INDEX_H */ diff --git a/src/core/ext/filters/deadline/deadline_filter.c b/src/core/ext/filters/deadline/deadline_filter.c index ced025e2e2..6789903c95 100644 --- a/src/core/ext/filters/deadline/deadline_filter.c +++ b/src/core/ext/filters/deadline/deadline_filter.c @@ -37,8 +37,8 @@ // Timer callback. static void timer_callback(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - grpc_call_element* elem = arg; - grpc_deadline_state* deadline_state = elem->call_data; + grpc_call_element* elem = (grpc_call_element*)arg; + grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data; if (error != GRPC_ERROR_CANCELLED) { grpc_call_element_signal_error( exec_ctx, elem, @@ -57,7 +57,7 @@ static void start_timer_if_needed(grpc_exec_ctx* exec_ctx, if (gpr_time_cmp(deadline, gpr_inf_future(GPR_CLOCK_MONOTONIC)) == 0) { return; } - grpc_deadline_state* deadline_state = elem->call_data; + grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data; grpc_deadline_timer_state cur_state; grpc_closure* closure = NULL; retry: @@ -112,7 +112,7 @@ static void cancel_timer_if_needed(grpc_exec_ctx* exec_ctx, // Callback run when the call is complete. static void on_complete(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - grpc_deadline_state* deadline_state = arg; + grpc_deadline_state* deadline_state = (grpc_deadline_state*)arg; cancel_timer_if_needed(exec_ctx, deadline_state); // Invoke the next callback. GRPC_CLOSURE_RUN(exec_ctx, deadline_state->next_on_complete, @@ -145,7 +145,7 @@ static void start_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg, void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_call_stack* call_stack, gpr_timespec deadline) { - grpc_deadline_state* deadline_state = elem->call_data; + grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data; deadline_state->call_stack = call_stack; // Deadline will always be infinite on servers, so the timer will only be // set on clients with a finite deadline. @@ -169,13 +169,13 @@ void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx, grpc_call_element* elem) { - grpc_deadline_state* deadline_state = elem->call_data; + grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data; cancel_timer_if_needed(exec_ctx, deadline_state); } void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, gpr_timespec new_deadline) { - grpc_deadline_state* deadline_state = elem->call_data; + grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data; cancel_timer_if_needed(exec_ctx, deadline_state); start_timer_if_needed(exec_ctx, elem, new_deadline); } @@ -183,7 +183,7 @@ void grpc_deadline_state_reset(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, void grpc_deadline_state_client_start_transport_stream_op_batch( grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_transport_stream_op_batch* op) { - grpc_deadline_state* deadline_state = elem->call_data; + grpc_deadline_state* deadline_state = (grpc_deadline_state*)elem->call_data; if (op->cancel_stream) { cancel_timer_if_needed(exec_ctx, deadline_state); } else { @@ -256,8 +256,8 @@ static void client_start_transport_stream_op_batch( // Callback for receiving initial metadata on the server. static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - grpc_call_element* elem = arg; - server_call_data* calld = elem->call_data; + grpc_call_element* elem = (grpc_call_element*)arg; + server_call_data* calld = (server_call_data*)elem->call_data; // Get deadline from metadata and start the timer if needed. start_timer_if_needed(exec_ctx, elem, calld->recv_initial_metadata->deadline); // Invoke the next callback. @@ -269,7 +269,7 @@ static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* arg, static void server_start_transport_stream_op_batch( grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_transport_stream_op_batch* op) { - server_call_data* calld = elem->call_data; + server_call_data* calld = (server_call_data*)elem->call_data; if (op->cancel_stream) { cancel_timer_if_needed(exec_ctx, &calld->base.deadline_state); } else { @@ -341,8 +341,8 @@ static bool maybe_add_deadline_filter(grpc_exec_ctx* exec_ctx, void* arg) { return grpc_deadline_checking_enabled( grpc_channel_stack_builder_get_channel_arguments(builder)) - ? grpc_channel_stack_builder_prepend_filter(builder, arg, NULL, - NULL) + ? grpc_channel_stack_builder_prepend_filter( + builder, (const grpc_channel_filter*)arg, NULL, NULL) : true; } diff --git a/src/core/ext/filters/http/client/http_client_filter.c b/src/core/ext/filters/http/client/http_client_filter.c index 90f0aed7a0..3ca01a41b5 100644 --- a/src/core/ext/filters/http/client/http_client_filter.c +++ b/src/core/ext/filters/http/client/http_client_filter.c @@ -36,41 +36,29 @@ static const size_t kMaxPayloadSizeForGet = 2048; typedef struct call_data { + // State for handling send_initial_metadata ops. grpc_linked_mdelem method; grpc_linked_mdelem scheme; grpc_linked_mdelem authority; grpc_linked_mdelem te_trailers; grpc_linked_mdelem content_type; grpc_linked_mdelem user_agent; - + // State for handling recv_initial_metadata ops. grpc_metadata_batch *recv_initial_metadata; + grpc_closure *original_recv_initial_metadata_ready; + grpc_closure recv_initial_metadata_ready; + // State for handling recv_trailing_metadata ops. grpc_metadata_batch *recv_trailing_metadata; - uint8_t *payload_bytes; - - /* Vars to read data off of send_message */ - grpc_transport_stream_op_batch *send_op; - uint32_t send_length; - uint32_t send_flags; - grpc_slice incoming_slice; - grpc_slice_buffer_stream replacement_stream; - grpc_slice_buffer slices; - /* flag that indicates that all slices of send_messages aren't availble */ - bool send_message_blocked; - - /** Closure to call when finished with the hc_on_recv hook */ - grpc_closure *on_done_recv_initial_metadata; - grpc_closure *on_done_recv_trailing_metadata; - grpc_closure *on_complete; - grpc_closure *post_send; - - /** Receive closures are chained: we inject this closure as the on_done_recv - up-call on transport_op, and remember to call our on_done_recv member - after handling it. */ - grpc_closure hc_on_recv_initial_metadata; - grpc_closure hc_on_recv_trailing_metadata; - grpc_closure hc_on_complete; - grpc_closure got_slice; - grpc_closure send_done; + grpc_closure *original_recv_trailing_metadata_on_complete; + grpc_closure recv_trailing_metadata_on_complete; + // State for handling send_message ops. + grpc_transport_stream_op_batch *send_message_batch; + size_t send_message_bytes_read; + grpc_byte_stream_cache send_message_cache; + grpc_caching_byte_stream send_message_caching_stream; + grpc_closure on_send_message_next_done; + grpc_closure *original_send_message_on_complete; + grpc_closure send_message_on_complete; } call_data; typedef struct channel_data { @@ -148,7 +136,7 @@ static grpc_error *client_filter_incoming_metadata(grpc_exec_ctx *exec_ctx, return GRPC_ERROR_NONE; } -static void hc_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, +static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *user_data, grpc_error *error) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; @@ -158,11 +146,13 @@ static void hc_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, } else { GRPC_ERROR_REF(error); } - GRPC_CLOSURE_RUN(exec_ctx, calld->on_done_recv_initial_metadata, error); + GRPC_CLOSURE_RUN(exec_ctx, calld->original_recv_initial_metadata_ready, + error); } -static void hc_on_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, - void *user_data, grpc_error *error) { +static void recv_trailing_metadata_on_complete(grpc_exec_ctx *exec_ctx, + void *user_data, + grpc_error *error) { grpc_call_element *elem = user_data; call_data *calld = elem->call_data; if (error == GRPC_ERROR_NONE) { @@ -171,25 +161,131 @@ static void hc_on_recv_trailing_metadata(grpc_exec_ctx *exec_ctx, } else { GRPC_ERROR_REF(error); } - GRPC_CLOSURE_RUN(exec_ctx, calld->on_done_recv_trailing_metadata, error); + GRPC_CLOSURE_RUN(exec_ctx, calld->original_recv_trailing_metadata_on_complete, + error); } -static void hc_on_complete(grpc_exec_ctx *exec_ctx, void *user_data, - grpc_error *error) { - grpc_call_element *elem = user_data; - call_data *calld = elem->call_data; - if (calld->payload_bytes) { - gpr_free(calld->payload_bytes); - calld->payload_bytes = NULL; +static void send_message_on_complete(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_call_element *elem = (grpc_call_element *)arg; + call_data *calld = (call_data *)elem->call_data; + grpc_byte_stream_cache_destroy(exec_ctx, &calld->send_message_cache); + GRPC_CLOSURE_RUN(exec_ctx, calld->original_send_message_on_complete, + GRPC_ERROR_REF(error)); +} + +// Pulls a slice from the send_message byte stream, updating +// calld->send_message_bytes_read. +static grpc_error *pull_slice_from_send_message(grpc_exec_ctx *exec_ctx, + call_data *calld) { + grpc_slice incoming_slice; + grpc_error *error = grpc_byte_stream_pull( + exec_ctx, &calld->send_message_caching_stream.base, &incoming_slice); + if (error == GRPC_ERROR_NONE) { + calld->send_message_bytes_read += GRPC_SLICE_LENGTH(incoming_slice); + grpc_slice_unref_internal(exec_ctx, incoming_slice); } - calld->on_complete->cb(exec_ctx, calld->on_complete->cb_arg, error); + return error; } -static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { - grpc_call_element *elem = elemp; - call_data *calld = elem->call_data; - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &calld->slices); - calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error); +// Reads as many slices as possible from the send_message byte stream. +// Upon successful return, if calld->send_message_bytes_read == +// calld->send_message_caching_stream.base.length, then we have completed +// reading from the byte stream; otherwise, an async read has been dispatched +// and on_send_message_next_done() will be invoked when it is complete. +static grpc_error *read_all_available_send_message_data(grpc_exec_ctx *exec_ctx, + call_data *calld) { + while (grpc_byte_stream_next(exec_ctx, + &calld->send_message_caching_stream.base, + ~(size_t)0, &calld->on_send_message_next_done)) { + grpc_error *error = pull_slice_from_send_message(exec_ctx, calld); + if (error != GRPC_ERROR_NONE) return error; + if (calld->send_message_bytes_read == + calld->send_message_caching_stream.base.length) { + break; + } + } + return GRPC_ERROR_NONE; +} + +// Async callback for grpc_byte_stream_next(). +static void on_send_message_next_done(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_call_element *elem = (grpc_call_element *)arg; + call_data *calld = (call_data *)elem->call_data; + if (error != GRPC_ERROR_NONE) { + grpc_transport_stream_op_batch_finish_with_failure( + exec_ctx, calld->send_message_batch, error); + return; + } + error = pull_slice_from_send_message(exec_ctx, calld); + if (error != GRPC_ERROR_NONE) { + grpc_transport_stream_op_batch_finish_with_failure( + exec_ctx, calld->send_message_batch, error); + return; + } + // There may or may not be more to read, but we don't care. If we got + // here, then we know that all of the data was not available + // synchronously, so we were not able to do a cached call. Instead, + // we just reset the byte stream and then send down the batch as-is. + grpc_caching_byte_stream_reset(&calld->send_message_caching_stream); + grpc_call_next_op(exec_ctx, elem, calld->send_message_batch); +} + +static char *slice_buffer_to_string(grpc_slice_buffer *slice_buffer) { + char *payload_bytes = gpr_malloc(slice_buffer->length + 1); + size_t offset = 0; + for (size_t i = 0; i < slice_buffer->count; ++i) { + memcpy(payload_bytes + offset, + GRPC_SLICE_START_PTR(slice_buffer->slices[i]), + GRPC_SLICE_LENGTH(slice_buffer->slices[i])); + offset += GRPC_SLICE_LENGTH(slice_buffer->slices[i]); + } + *(payload_bytes + offset) = '\0'; + return payload_bytes; +} + +// Modifies the path entry in the batch's send_initial_metadata to +// append the base64-encoded query for a GET request. +static grpc_error *update_path_for_get(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op_batch *batch) { + call_data *calld = (call_data *)elem->call_data; + grpc_slice path_slice = + GRPC_MDVALUE(batch->payload->send_initial_metadata.send_initial_metadata + ->idx.named.path->md); + /* sum up individual component's lengths and allocate enough memory to + * hold combined path+query */ + size_t estimated_len = GRPC_SLICE_LENGTH(path_slice); + estimated_len++; /* for the '?' */ + estimated_len += grpc_base64_estimate_encoded_size( + batch->payload->send_message.send_message->length, true /* url_safe */, + false /* multi_line */); + grpc_slice path_with_query_slice = GRPC_SLICE_MALLOC(estimated_len); + /* memcopy individual pieces into this slice */ + char *write_ptr = (char *)GRPC_SLICE_START_PTR(path_with_query_slice); + char *original_path = (char *)GRPC_SLICE_START_PTR(path_slice); + memcpy(write_ptr, original_path, GRPC_SLICE_LENGTH(path_slice)); + write_ptr += GRPC_SLICE_LENGTH(path_slice); + *write_ptr++ = '?'; + char *payload_bytes = + slice_buffer_to_string(&calld->send_message_cache.cache_buffer); + grpc_base64_encode_core((char *)write_ptr, payload_bytes, + batch->payload->send_message.send_message->length, + true /* url_safe */, false /* multi_line */); + gpr_free(payload_bytes); + /* remove trailing unused memory and add trailing 0 to terminate string */ + char *t = (char *)GRPC_SLICE_START_PTR(path_with_query_slice); + /* safe to use strlen since base64_encode will always add '\0' */ + path_with_query_slice = + grpc_slice_sub_no_ref(path_with_query_slice, 0, strlen(t)); + /* substitute previous path with the new path+query */ + grpc_mdelem mdelem_path_and_query = + grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_PATH, path_with_query_slice); + grpc_metadata_batch *b = + batch->payload->send_initial_metadata.send_initial_metadata; + return grpc_metadata_batch_substitute(exec_ctx, b, b->idx.named.path, + mdelem_path_and_query); } static void remove_if_present(grpc_exec_ctx *exec_ctx, @@ -200,273 +296,153 @@ static void remove_if_present(grpc_exec_ctx *exec_ctx, } } -static void continue_send_message(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { +static void hc_start_transport_stream_op_batch( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op_batch *batch) { call_data *calld = elem->call_data; - uint8_t *wrptr = calld->payload_bytes; - while (grpc_byte_stream_next( - exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0, - &calld->got_slice)) { - grpc_byte_stream_pull(exec_ctx, - calld->send_op->payload->send_message.send_message, - &calld->incoming_slice); - if (GRPC_SLICE_LENGTH(calld->incoming_slice) > 0) { - memcpy(wrptr, GRPC_SLICE_START_PTR(calld->incoming_slice), - GRPC_SLICE_LENGTH(calld->incoming_slice)); - } - wrptr += GRPC_SLICE_LENGTH(calld->incoming_slice); - grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); - if (calld->send_length == calld->slices.length) { - calld->send_message_blocked = false; - break; - } - } -} + channel_data *channeld = elem->channel_data; + GPR_TIMER_BEGIN("hc_start_transport_stream_op_batch", 0); + GRPC_CALL_LOG_OP(GPR_INFO, elem, batch); -static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { - grpc_call_element *elem = elemp; - call_data *calld = elem->call_data; - calld->send_message_blocked = false; - if (GRPC_ERROR_NONE != - grpc_byte_stream_pull(exec_ctx, - calld->send_op->payload->send_message.send_message, - &calld->incoming_slice)) { - /* Should never reach here */ - abort(); - } - grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); - if (calld->send_length == calld->slices.length) { - /* Pass down the original send_message op that was blocked.*/ - grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices, - calld->send_flags); - calld->send_op->payload->send_message.send_message = - &calld->replacement_stream.base; - calld->post_send = calld->send_op->on_complete; - calld->send_op->on_complete = &calld->send_done; - grpc_call_next_op(exec_ctx, elem, calld->send_op); - } else { - continue_send_message(exec_ctx, elem); + if (batch->recv_initial_metadata) { + /* substitute our callback for the higher callback */ + calld->recv_initial_metadata = + batch->payload->recv_initial_metadata.recv_initial_metadata; + calld->original_recv_initial_metadata_ready = + batch->payload->recv_initial_metadata.recv_initial_metadata_ready; + batch->payload->recv_initial_metadata.recv_initial_metadata_ready = + &calld->recv_initial_metadata_ready; } -} -static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { - /* grab pointers to our data from the call element */ - call_data *calld = elem->call_data; - channel_data *channeld = elem->channel_data; - grpc_error *error; + if (batch->recv_trailing_metadata) { + /* substitute our callback for the higher callback */ + calld->recv_trailing_metadata = + batch->payload->recv_trailing_metadata.recv_trailing_metadata; + calld->original_recv_trailing_metadata_on_complete = batch->on_complete; + batch->on_complete = &calld->recv_trailing_metadata_on_complete; + } - if (op->send_initial_metadata) { - /* Decide which HTTP VERB to use. We use GET if the request is marked - cacheable, and the operation contains both initial metadata and send - message, and the payload is below the size threshold, and all the data - for this request is immediately available. */ + grpc_error *error = GRPC_ERROR_NONE; + bool batch_will_be_handled_asynchronously = false; + if (batch->send_initial_metadata) { + // Decide which HTTP VERB to use. We use GET if the request is marked + // cacheable, and the operation contains both initial metadata and send + // message, and the payload is below the size threshold, and all the data + // for this request is immediately available. grpc_mdelem method = GRPC_MDELEM_METHOD_POST; - if (op->send_message && - (op->payload->send_initial_metadata.send_initial_metadata_flags & + if (batch->send_message && + (batch->payload->send_initial_metadata.send_initial_metadata_flags & GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) && - op->payload->send_message.send_message->length < + batch->payload->send_message.send_message->length < channeld->max_payload_size_for_get) { - method = GRPC_MDELEM_METHOD_GET; - /* The following write to calld->send_message_blocked isn't racy with - reads in hc_start_transport_op (which deals with SEND_MESSAGE ops) because - being here means ops->send_message is not NULL, which is primarily - guarding the read there. */ - calld->send_message_blocked = true; - } else if (op->payload->send_initial_metadata.send_initial_metadata_flags & - GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) { - method = GRPC_MDELEM_METHOD_PUT; - } - - /* Attempt to read the data from send_message and create a header field. */ - if (grpc_mdelem_eq(method, GRPC_MDELEM_METHOD_GET)) { - /* allocate memory to hold the entire payload */ - calld->payload_bytes = - gpr_malloc(op->payload->send_message.send_message->length); - - /* read slices of send_message and copy into payload_bytes */ - calld->send_op = op; - calld->send_length = op->payload->send_message.send_message->length; - calld->send_flags = op->payload->send_message.send_message->flags; - continue_send_message(exec_ctx, elem); - - if (calld->send_message_blocked == false) { - /* when all the send_message data is available, then modify the path - * MDELEM by appending base64 encoded query to the path */ - const int k_url_safe = 1; - const int k_multi_line = 0; - const unsigned char k_query_separator = '?'; - - grpc_slice path_slice = - GRPC_MDVALUE(op->payload->send_initial_metadata - .send_initial_metadata->idx.named.path->md); - /* sum up individual component's lengths and allocate enough memory to - * hold combined path+query */ - size_t estimated_len = GRPC_SLICE_LENGTH(path_slice); - estimated_len++; /* for the '?' */ - estimated_len += grpc_base64_estimate_encoded_size( - op->payload->send_message.send_message->length, k_url_safe, - k_multi_line); - grpc_slice path_with_query_slice = GRPC_SLICE_MALLOC(estimated_len); - - /* memcopy individual pieces into this slice */ - uint8_t *write_ptr = - (uint8_t *)GRPC_SLICE_START_PTR(path_with_query_slice); - uint8_t *original_path = (uint8_t *)GRPC_SLICE_START_PTR(path_slice); - memcpy(write_ptr, original_path, GRPC_SLICE_LENGTH(path_slice)); - write_ptr += GRPC_SLICE_LENGTH(path_slice); - - *write_ptr = k_query_separator; - write_ptr++; /* for the '?' */ - - grpc_base64_encode_core((char *)write_ptr, calld->payload_bytes, - op->payload->send_message.send_message->length, - k_url_safe, k_multi_line); - - /* remove trailing unused memory and add trailing 0 to terminate string - */ - char *t = (char *)GRPC_SLICE_START_PTR(path_with_query_slice); - /* safe to use strlen since base64_encode will always add '\0' */ - path_with_query_slice = - grpc_slice_sub_no_ref(path_with_query_slice, 0, strlen(t)); - - /* substitute previous path with the new path+query */ - grpc_mdelem mdelem_path_and_query = grpc_mdelem_from_slices( - exec_ctx, GRPC_MDSTR_PATH, path_with_query_slice); - grpc_metadata_batch *b = - op->payload->send_initial_metadata.send_initial_metadata; - error = grpc_metadata_batch_substitute(exec_ctx, b, b->idx.named.path, - mdelem_path_and_query); - if (error != GRPC_ERROR_NONE) return error; - - calld->on_complete = op->on_complete; - op->on_complete = &calld->hc_on_complete; - op->send_message = false; + calld->send_message_bytes_read = 0; + grpc_byte_stream_cache_init(&calld->send_message_cache, + batch->payload->send_message.send_message); + grpc_caching_byte_stream_init(&calld->send_message_caching_stream, + &calld->send_message_cache); + batch->payload->send_message.send_message = + &calld->send_message_caching_stream.base; + calld->original_send_message_on_complete = batch->on_complete; + batch->on_complete = &calld->send_message_on_complete; + calld->send_message_batch = batch; + error = read_all_available_send_message_data(exec_ctx, calld); + if (error != GRPC_ERROR_NONE) goto done; + // If all the data has been read, then we can use GET. + if (calld->send_message_bytes_read == + calld->send_message_caching_stream.base.length) { + method = GRPC_MDELEM_METHOD_GET; + error = update_path_for_get(exec_ctx, elem, batch); + if (error != GRPC_ERROR_NONE) goto done; + batch->send_message = false; + grpc_byte_stream_destroy(exec_ctx, + &calld->send_message_caching_stream.base); } else { - /* Not all data is available. Fall back to POST. */ + // Not all data is available. The batch will be sent down + // asynchronously in on_send_message_next_done(). + batch_will_be_handled_asynchronously = true; + // Fall back to POST. gpr_log(GPR_DEBUG, - "Request is marked Cacheable but not all data is available.\ - Falling back to POST"); - method = GRPC_MDELEM_METHOD_POST; + "Request is marked Cacheable but not all data is available. " + "Falling back to POST"); } + } else if (batch->payload->send_initial_metadata + .send_initial_metadata_flags & + GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) { + method = GRPC_MDELEM_METHOD_PUT; } - remove_if_present(exec_ctx, - op->payload->send_initial_metadata.send_initial_metadata, - GRPC_BATCH_METHOD); - remove_if_present(exec_ctx, - op->payload->send_initial_metadata.send_initial_metadata, - GRPC_BATCH_SCHEME); - remove_if_present(exec_ctx, - op->payload->send_initial_metadata.send_initial_metadata, - GRPC_BATCH_TE); - remove_if_present(exec_ctx, - op->payload->send_initial_metadata.send_initial_metadata, - GRPC_BATCH_CONTENT_TYPE); - remove_if_present(exec_ctx, - op->payload->send_initial_metadata.send_initial_metadata, - GRPC_BATCH_USER_AGENT); + remove_if_present( + exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, + GRPC_BATCH_METHOD); + remove_if_present( + exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, + GRPC_BATCH_SCHEME); + remove_if_present( + exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, + GRPC_BATCH_TE); + remove_if_present( + exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, + GRPC_BATCH_CONTENT_TYPE); + remove_if_present( + exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, + GRPC_BATCH_USER_AGENT); /* Send : prefixed headers, which have to be before any application layer headers. */ error = grpc_metadata_batch_add_head( - exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, + exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, &calld->method, method); - if (error != GRPC_ERROR_NONE) return error; + if (error != GRPC_ERROR_NONE) goto done; error = grpc_metadata_batch_add_head( - exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, + exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, &calld->scheme, channeld->static_scheme); - if (error != GRPC_ERROR_NONE) return error; + if (error != GRPC_ERROR_NONE) goto done; error = grpc_metadata_batch_add_tail( - exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, + exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, &calld->te_trailers, GRPC_MDELEM_TE_TRAILERS); - if (error != GRPC_ERROR_NONE) return error; + if (error != GRPC_ERROR_NONE) goto done; error = grpc_metadata_batch_add_tail( - exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, + exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, &calld->content_type, GRPC_MDELEM_CONTENT_TYPE_APPLICATION_SLASH_GRPC); - if (error != GRPC_ERROR_NONE) return error; + if (error != GRPC_ERROR_NONE) goto done; error = grpc_metadata_batch_add_tail( - exec_ctx, op->payload->send_initial_metadata.send_initial_metadata, + exec_ctx, batch->payload->send_initial_metadata.send_initial_metadata, &calld->user_agent, GRPC_MDELEM_REF(channeld->user_agent)); - if (error != GRPC_ERROR_NONE) return error; + if (error != GRPC_ERROR_NONE) goto done; } - if (op->recv_initial_metadata) { - /* substitute our callback for the higher callback */ - calld->recv_initial_metadata = - op->payload->recv_initial_metadata.recv_initial_metadata; - calld->on_done_recv_initial_metadata = - op->payload->recv_initial_metadata.recv_initial_metadata_ready; - op->payload->recv_initial_metadata.recv_initial_metadata_ready = - &calld->hc_on_recv_initial_metadata; - } - - if (op->recv_trailing_metadata) { - /* substitute our callback for the higher callback */ - calld->recv_trailing_metadata = - op->payload->recv_trailing_metadata.recv_trailing_metadata; - calld->on_done_recv_trailing_metadata = op->on_complete; - op->on_complete = &calld->hc_on_recv_trailing_metadata; - } - - return GRPC_ERROR_NONE; -} - -static void hc_start_transport_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { - GPR_TIMER_BEGIN("hc_start_transport_op", 0); - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - grpc_error *error = hc_mutate_op(exec_ctx, elem, op); +done: if (error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); - } else { - call_data *calld = elem->call_data; - if (op->send_message && calld->send_message_blocked) { - /* Don't forward the op. send_message contains slices that aren't ready - yet. The call will be forwarded by the op_complete of slice read call. - */ - } else { - grpc_call_next_op(exec_ctx, elem, op); - } + grpc_transport_stream_op_batch_finish_with_failure( + exec_ctx, calld->send_message_batch, error); + } else if (!batch_will_be_handled_asynchronously) { + grpc_call_next_op(exec_ctx, elem, batch); } - GPR_TIMER_END("hc_start_transport_op", 0); + GPR_TIMER_END("hc_start_transport_stream_op_batch", 0); } /* Constructor for call_data */ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_element_args *args) { - call_data *calld = elem->call_data; - calld->on_done_recv_initial_metadata = NULL; - calld->on_done_recv_trailing_metadata = NULL; - calld->on_complete = NULL; - calld->payload_bytes = NULL; - calld->send_message_blocked = false; - grpc_slice_buffer_init(&calld->slices); - GRPC_CLOSURE_INIT(&calld->hc_on_recv_initial_metadata, - hc_on_recv_initial_metadata, elem, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->hc_on_recv_trailing_metadata, - hc_on_recv_trailing_metadata, elem, + call_data *calld = (call_data *)elem->call_data; + GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, + recv_initial_metadata_ready, elem, grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->hc_on_complete, hc_on_complete, elem, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->got_slice, got_slice, elem, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->send_done, send_done, elem, + GRPC_CLOSURE_INIT(&calld->recv_trailing_metadata_on_complete, + recv_trailing_metadata_on_complete, elem, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete, + elem, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&calld->on_send_message_next_done, + on_send_message_next_done, elem, grpc_schedule_on_exec_ctx); return GRPC_ERROR_NONE; } /* Destructor for call_data */ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - grpc_closure *ignored) { - call_data *calld = elem->call_data; - grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices); -} + grpc_closure *ignored) {} static grpc_mdelem scheme_from_args(const grpc_channel_args *args) { unsigned i; @@ -580,7 +556,7 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, } const grpc_channel_filter grpc_http_client_filter = { - hc_start_transport_op, + hc_start_transport_stream_op_batch, grpc_channel_next_op, sizeof(call_data), init_call_elem, diff --git a/src/core/ext/filters/http/http_filters_plugin.c b/src/core/ext/filters/http/http_filters_plugin.c index 3e4ec01a31..a5c1b92054 100644 --- a/src/core/ext/filters/http/http_filters_plugin.c +++ b/src/core/ext/filters/http/http_filters_plugin.c @@ -65,7 +65,7 @@ static bool maybe_add_required_filter(grpc_exec_ctx *exec_ctx, } void grpc_http_filters_init(void) { - grpc_register_tracer("compression", &grpc_compression_trace); + grpc_register_tracer(&grpc_compression_trace); grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, maybe_add_optional_filter, &compress_filter); diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.c b/src/core/ext/filters/http/message_compress/message_compress_filter.c index 68336d60fe..eb1a5a95e2 100644 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.c +++ b/src/core/ext/filters/http/message_compress/message_compress_filter.c @@ -63,14 +63,11 @@ typedef struct call_data { pointer | CANCELLED_BIT - request was cancelled with error pointed to */ gpr_atm send_initial_metadata_state; - grpc_transport_stream_op_batch *send_op; - uint32_t send_length; - uint32_t send_flags; - grpc_slice incoming_slice; + grpc_transport_stream_op_batch *send_message_batch; grpc_slice_buffer_stream replacement_stream; - grpc_closure *post_send; - grpc_closure send_done; - grpc_closure got_slice; + grpc_closure *original_send_message_on_complete; + grpc_closure send_message_on_complete; + grpc_closure on_send_message_next_done; } call_data; typedef struct channel_data { @@ -215,24 +212,25 @@ static grpc_error *process_send_initial_metadata( return error; } -static void continue_send_message(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem); - -static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { - grpc_call_element *elem = elemp; - call_data *calld = elem->call_data; +static void send_message_on_complete(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_call_element *elem = (grpc_call_element *)arg; + call_data *calld = (call_data *)elem->call_data; grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &calld->slices); - calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, error); + GRPC_CLOSURE_RUN(exec_ctx, calld->original_send_message_on_complete, + GRPC_ERROR_REF(error)); } static void finish_send_message(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { - call_data *calld = elem->call_data; - int did_compress; + call_data *calld = (call_data *)elem->call_data; + // Compress the data if appropriate. grpc_slice_buffer tmp; grpc_slice_buffer_init(&tmp); - did_compress = grpc_msg_compress(exec_ctx, calld->compression_algorithm, - &calld->slices, &tmp); + uint32_t send_flags = + calld->send_message_batch->payload->send_message.send_message->flags; + const bool did_compress = grpc_msg_compress( + exec_ctx, calld->compression_algorithm, &calld->slices, &tmp); if (did_compress) { if (GRPC_TRACER_ON(grpc_compression_trace)) { char *algo_name; @@ -246,7 +244,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, algo_name, before_size, after_size, 100 * savings_ratio); } grpc_slice_buffer_swap(&calld->slices, &tmp); - calld->send_flags |= GRPC_WRITE_INTERNAL_COMPRESS; + send_flags |= GRPC_WRITE_INTERNAL_COMPRESS; } else { if (GRPC_TRACER_ON(grpc_compression_trace)) { char *algo_name; @@ -258,83 +256,118 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx, algo_name, calld->slices.length); } } - grpc_slice_buffer_destroy_internal(exec_ctx, &tmp); - + // Swap out the original byte stream with our new one and send the + // batch down. + grpc_byte_stream_destroy( + exec_ctx, calld->send_message_batch->payload->send_message.send_message); grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices, - calld->send_flags); - calld->send_op->payload->send_message.send_message = + send_flags); + calld->send_message_batch->payload->send_message.send_message = &calld->replacement_stream.base; - calld->post_send = calld->send_op->on_complete; - calld->send_op->on_complete = &calld->send_done; - - grpc_call_next_op(exec_ctx, elem, calld->send_op); + calld->original_send_message_on_complete = + calld->send_message_batch->on_complete; + calld->send_message_batch->on_complete = &calld->send_message_on_complete; + grpc_call_next_op(exec_ctx, elem, calld->send_message_batch); } -static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, grpc_error *error) { - grpc_call_element *elem = elemp; - call_data *calld = elem->call_data; - if (GRPC_ERROR_NONE != - grpc_byte_stream_pull(exec_ctx, - calld->send_op->payload->send_message.send_message, - &calld->incoming_slice)) { - /* Should never reach here */ - abort(); - } - grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); - if (calld->send_length == calld->slices.length) { - finish_send_message(exec_ctx, elem); - } else { - continue_send_message(exec_ctx, elem); +// Pulls a slice from the send_message byte stream and adds it to calld->slices. +static grpc_error *pull_slice_from_send_message(grpc_exec_ctx *exec_ctx, + call_data *calld) { + grpc_slice incoming_slice; + grpc_error *error = grpc_byte_stream_pull( + exec_ctx, calld->send_message_batch->payload->send_message.send_message, + &incoming_slice); + if (error == GRPC_ERROR_NONE) { + grpc_slice_buffer_add(&calld->slices, incoming_slice); } + return error; } -static void continue_send_message(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { - call_data *calld = elem->call_data; +// Reads as many slices as possible from the send_message byte stream. +// If all data has been read, invokes finish_send_message(). Otherwise, +// an async call to grpc_byte_stream_next() has been started, which will +// eventually result in calling on_send_message_next_done(). +static grpc_error *continue_reading_send_message(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { + call_data *calld = (call_data *)elem->call_data; while (grpc_byte_stream_next( - exec_ctx, calld->send_op->payload->send_message.send_message, ~(size_t)0, - &calld->got_slice)) { - grpc_byte_stream_pull(exec_ctx, - calld->send_op->payload->send_message.send_message, - &calld->incoming_slice); - grpc_slice_buffer_add(&calld->slices, calld->incoming_slice); - if (calld->send_length == calld->slices.length) { + exec_ctx, calld->send_message_batch->payload->send_message.send_message, + ~(size_t)0, &calld->on_send_message_next_done)) { + grpc_error *error = pull_slice_from_send_message(exec_ctx, calld); + if (error != GRPC_ERROR_NONE) return error; + if (calld->slices.length == + calld->send_message_batch->payload->send_message.send_message->length) { finish_send_message(exec_ctx, elem); break; } } + return GRPC_ERROR_NONE; } -static void handle_send_message_batch(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op_batch *op, - bool has_compression_algorithm) { - call_data *calld = elem->call_data; - if (!skip_compression(elem, op->payload->send_message.send_message->flags, +// Async callback for grpc_byte_stream_next(). +static void on_send_message_next_done(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_call_element *elem = (grpc_call_element *)arg; + call_data *calld = (call_data *)elem->call_data; + if (error != GRPC_ERROR_NONE) goto fail; + error = pull_slice_from_send_message(exec_ctx, calld); + if (error != GRPC_ERROR_NONE) goto fail; + if (calld->slices.length == + calld->send_message_batch->payload->send_message.send_message->length) { + finish_send_message(exec_ctx, elem); + } else { + // This will either finish reading all of the data and invoke + // finish_send_message(), or else it will make an async call to + // grpc_byte_stream_next(), which will eventually result in calling + // this function again. + error = continue_reading_send_message(exec_ctx, elem); + if (error != GRPC_ERROR_NONE) goto fail; + } + return; +fail: + grpc_transport_stream_op_batch_finish_with_failure( + exec_ctx, calld->send_message_batch, error); +} + +static void start_send_message_batch(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op_batch *batch, + bool has_compression_algorithm) { + call_data *calld = (call_data *)elem->call_data; + if (!skip_compression(elem, batch->payload->send_message.send_message->flags, has_compression_algorithm)) { - calld->send_op = op; - calld->send_length = op->payload->send_message.send_message->length; - calld->send_flags = op->payload->send_message.send_message->flags; - continue_send_message(exec_ctx, elem); + calld->send_message_batch = batch; + // This will either finish reading all of the data and invoke + // finish_send_message(), or else it will make an async call to + // grpc_byte_stream_next(), which will eventually result in calling + // on_send_message_next_done(). + grpc_error *error = continue_reading_send_message(exec_ctx, elem); + if (error != GRPC_ERROR_NONE) { + grpc_transport_stream_op_batch_finish_with_failure( + exec_ctx, calld->send_message_batch, error); + } } else { /* pass control down the stack */ - grpc_call_next_op(exec_ctx, elem, op); + grpc_call_next_op(exec_ctx, elem, batch); } } static void compress_start_transport_stream_op_batch( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_transport_stream_op_batch *op) { + grpc_transport_stream_op_batch *batch) { call_data *calld = elem->call_data; GPR_TIMER_BEGIN("compress_start_transport_stream_op_batch", 0); - if (op->cancel_stream) { - GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error); + if (batch->cancel_stream) { + // TODO(roth): As part of the upcoming call combiner work, change + // this to call grpc_byte_stream_shutdown() on the incoming byte + // stream, to cancel any in-flight calls to grpc_byte_stream_next(). + GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error); gpr_atm cur = gpr_atm_full_xchg( &calld->send_initial_metadata_state, - CANCELLED_BIT | (gpr_atm)op->payload->cancel_stream.cancel_error); + CANCELLED_BIT | (gpr_atm)batch->payload->cancel_stream.cancel_error); switch (cur) { case HAS_COMPRESSION_ALGORITHM: case NO_COMPRESSION_ALGORITHM: @@ -344,7 +377,7 @@ static void compress_start_transport_stream_op_batch( if ((cur & CANCELLED_BIT) == 0) { grpc_transport_stream_op_batch_finish_with_failure( exec_ctx, (grpc_transport_stream_op_batch *)cur, - GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error)); + GRPC_ERROR_REF(batch->payload->cancel_stream.cancel_error)); } else { GRPC_ERROR_UNREF((grpc_error *)(cur & ~CANCELLED_BIT)); } @@ -352,14 +385,15 @@ static void compress_start_transport_stream_op_batch( } } - if (op->send_initial_metadata) { + if (batch->send_initial_metadata) { bool has_compression_algorithm; grpc_error *error = process_send_initial_metadata( exec_ctx, elem, - op->payload->send_initial_metadata.send_initial_metadata, + batch->payload->send_initial_metadata.send_initial_metadata, &has_compression_algorithm); if (error != GRPC_ERROR_NONE) { - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); + grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, batch, + error); return; } gpr_atm cur; @@ -375,32 +409,32 @@ static void compress_start_transport_stream_op_batch( goto retry_send_im; } if (cur != INITIAL_METADATA_UNSEEN) { - handle_send_message_batch(exec_ctx, elem, - (grpc_transport_stream_op_batch *)cur, - has_compression_algorithm); + start_send_message_batch(exec_ctx, elem, + (grpc_transport_stream_op_batch *)cur, + has_compression_algorithm); } } } - if (op->send_message) { + if (batch->send_message) { gpr_atm cur; retry_send: cur = gpr_atm_acq_load(&calld->send_initial_metadata_state); switch (cur) { case INITIAL_METADATA_UNSEEN: if (!gpr_atm_rel_cas(&calld->send_initial_metadata_state, cur, - (gpr_atm)op)) { + (gpr_atm)batch)) { goto retry_send; } break; case HAS_COMPRESSION_ALGORITHM: case NO_COMPRESSION_ALGORITHM: - handle_send_message_batch(exec_ctx, elem, op, - cur == HAS_COMPRESSION_ALGORITHM); + start_send_message_batch(exec_ctx, elem, batch, + cur == HAS_COMPRESSION_ALGORITHM); break; default: if (cur & CANCELLED_BIT) { grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, op, + exec_ctx, batch, GRPC_ERROR_REF((grpc_error *)(cur & ~CANCELLED_BIT))); } else { /* >1 send_message concurrently */ @@ -409,7 +443,7 @@ static void compress_start_transport_stream_op_batch( } } else { /* pass control down the stack */ - grpc_call_next_op(exec_ctx, elem, op); + grpc_call_next_op(exec_ctx, elem, batch); } GPR_TIMER_END("compress_start_transport_stream_op_batch", 0); @@ -424,10 +458,10 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, /* initialize members */ grpc_slice_buffer_init(&calld->slices); - GRPC_CLOSURE_INIT(&calld->got_slice, got_slice, elem, - grpc_schedule_on_exec_ctx); - GRPC_CLOSURE_INIT(&calld->send_done, send_done, elem, - grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&calld->on_send_message_next_done, + on_send_message_next_done, elem, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&calld->send_message_on_complete, send_message_on_complete, + elem, grpc_schedule_on_exec_ctx); return GRPC_ERROR_NONE; } diff --git a/src/core/ext/filters/max_age/max_age_filter.c b/src/core/ext/filters/max_age/max_age_filter.c index 35304f8150..7d748b9c32 100644 --- a/src/core/ext/filters/max_age/max_age_filter.c +++ b/src/core/ext/filters/max_age/max_age_filter.c @@ -108,7 +108,7 @@ static void decrease_call_count(grpc_exec_ctx* exec_ctx, channel_data* chand) { static void start_max_idle_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - channel_data* chand = arg; + channel_data* chand = (channel_data*)arg; /* Decrease call_count. If there are no active calls at this time, max_idle_timer will start here. If the number of active calls is not 0, max_idle_timer will start after all the active calls end. */ @@ -119,7 +119,7 @@ static void start_max_idle_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg, static void start_max_age_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - channel_data* chand = arg; + channel_data* chand = (channel_data*)arg; gpr_mu_lock(&chand->max_age_timer_mu); chand->max_age_timer_pending = true; GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_timer"); @@ -140,7 +140,7 @@ static void start_max_age_timer_after_init(grpc_exec_ctx* exec_ctx, void* arg, static void start_max_age_grace_timer_after_goaway_op(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - channel_data* chand = arg; + channel_data* chand = (channel_data*)arg; gpr_mu_lock(&chand->max_age_timer_mu); chand->max_age_grace_timer_pending = true; GRPC_CHANNEL_STACK_REF(chand->channel_stack, "max_age max_age_grace_timer"); @@ -156,7 +156,7 @@ static void start_max_age_grace_timer_after_goaway_op(grpc_exec_ctx* exec_ctx, static void close_max_idle_channel(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - channel_data* chand = arg; + channel_data* chand = (channel_data*)arg; if (error == GRPC_ERROR_NONE) { /* Prevent the max idle timer from being set again */ gpr_atm_no_barrier_fetch_add(&chand->call_count, 1); @@ -176,7 +176,7 @@ static void close_max_idle_channel(grpc_exec_ctx* exec_ctx, void* arg, static void close_max_age_channel(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - channel_data* chand = arg; + channel_data* chand = (channel_data*)arg; gpr_mu_lock(&chand->max_age_timer_mu); chand->max_age_timer_pending = false; gpr_mu_unlock(&chand->max_age_timer_mu); @@ -200,7 +200,7 @@ static void close_max_age_channel(grpc_exec_ctx* exec_ctx, void* arg, static void force_close_max_age_channel(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - channel_data* chand = arg; + channel_data* chand = (channel_data*)arg; gpr_mu_lock(&chand->max_age_timer_mu); chand->max_age_grace_timer_pending = false; gpr_mu_unlock(&chand->max_age_timer_mu); @@ -220,7 +220,7 @@ static void force_close_max_age_channel(grpc_exec_ctx* exec_ctx, void* arg, static void channel_connectivity_changed(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { - channel_data* chand = arg; + channel_data* chand = (channel_data*)arg; if (chand->connectivity_state != GRPC_CHANNEL_SHUTDOWN) { grpc_transport_op* op = grpc_make_transport_op(NULL); op->on_connectivity_state_change = &chand->channel_connectivity_changed, @@ -264,7 +264,7 @@ static int add_random_max_connection_age_jitter(int value) { static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, const grpc_call_element_args* args) { - channel_data* chand = elem->channel_data; + channel_data* chand = (channel_data*)elem->channel_data; increase_call_count(exec_ctx, chand); return GRPC_ERROR_NONE; } @@ -281,7 +281,7 @@ static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, grpc_channel_element* elem, grpc_channel_element_args* args) { - channel_data* chand = elem->channel_data; + channel_data* chand = (channel_data*)elem->channel_data; gpr_mu_init(&chand->max_age_timer_mu); chand->max_age_timer_pending = false; chand->max_age_grace_timer_pending = false; diff --git a/src/core/ext/filters/message_size/message_size_filter.c b/src/core/ext/filters/message_size/message_size_filter.c index 9bb565ed6d..846c7df69a 100644 --- a/src/core/ext/filters/message_size/message_size_filter.c +++ b/src/core/ext/filters/message_size/message_size_filter.c @@ -60,7 +60,8 @@ static void* message_size_limits_create_from_json(const grpc_json* json) { if (max_response_message_bytes == -1) return NULL; } } - message_size_limits* value = gpr_malloc(sizeof(message_size_limits)); + message_size_limits* value = + (message_size_limits*)gpr_malloc(sizeof(message_size_limits)); value->max_send_size = max_request_message_bytes; value->max_recv_size = max_response_message_bytes; return value; @@ -88,8 +89,8 @@ typedef struct channel_data { // receive message size. static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data, grpc_error* error) { - grpc_call_element* elem = user_data; - call_data* calld = elem->call_data; + grpc_call_element* elem = (grpc_call_element*)user_data; + call_data* calld = (call_data*)elem->call_data; if (*calld->recv_message != NULL && calld->limits.max_recv_size >= 0 && (*calld->recv_message)->length > (size_t)calld->limits.max_recv_size) { char* message_string; @@ -117,7 +118,7 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data, static void start_transport_stream_op_batch( grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_transport_stream_op_batch* op) { - call_data* calld = elem->call_data; + call_data* calld = (call_data*)elem->call_data; // Check max send message size. if (op->send_message && calld->limits.max_send_size >= 0 && op->payload->send_message.send_message->length > @@ -149,8 +150,8 @@ static void start_transport_stream_op_batch( static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, const grpc_call_element_args* args) { - channel_data* chand = elem->channel_data; - call_data* calld = elem->call_data; + channel_data* chand = (channel_data*)elem->channel_data; + call_data* calld = (call_data*)elem->call_data; calld->next_recv_message_ready = NULL; GRPC_CLOSURE_INIT(&calld->recv_message_ready, recv_message_ready, elem, grpc_schedule_on_exec_ctx); @@ -160,8 +161,9 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, // size to the receive limit. calld->limits = chand->limits; if (chand->method_limit_table != NULL) { - message_size_limits* limits = grpc_method_config_table_get( - exec_ctx, chand->method_limit_table, args->path); + message_size_limits* limits = + (message_size_limits*)grpc_method_config_table_get( + exec_ctx, chand->method_limit_table, args->path); if (limits != NULL) { if (limits->max_send_size >= 0 && (limits->max_send_size < calld->limits.max_send_size || @@ -220,7 +222,7 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, grpc_channel_element* elem, grpc_channel_element_args* args) { GPR_ASSERT(!args->is_last); - channel_data* chand = elem->channel_data; + channel_data* chand = (channel_data*)elem->channel_data; chand->limits = get_message_size_limits(args->channel_args); // Get method config table from channel args. const grpc_arg* channel_arg = @@ -243,7 +245,7 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, // Destructor for channel_data. static void destroy_channel_elem(grpc_exec_ctx* exec_ctx, grpc_channel_element* elem) { - channel_data* chand = elem->channel_data; + channel_data* chand = (channel_data*)elem->channel_data; grpc_slice_hash_table_unref(exec_ctx, chand->method_limit_table); } diff --git a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c index 8b3fff5fa3..b4d2cb4b8c 100644 --- a/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c +++ b/src/core/ext/filters/workarounds/workaround_cronet_compression_filter.c @@ -52,8 +52,8 @@ static bool get_user_agent_mdelem(const grpc_metadata_batch* batch, // Callback invoked when we receive an initial metadata. static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, void* user_data, grpc_error* error) { - grpc_call_element* elem = user_data; - call_data* calld = elem->call_data; + grpc_call_element* elem = (grpc_call_element*)user_data; + call_data* calld = (call_data*)elem->call_data; if (GRPC_ERROR_NONE == error) { grpc_mdelem md; @@ -75,7 +75,7 @@ static void recv_initial_metadata_ready(grpc_exec_ctx* exec_ctx, static void start_transport_stream_op_batch( grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_transport_stream_op_batch* op) { - call_data* calld = elem->call_data; + call_data* calld = (call_data*)elem->call_data; // Inject callback for receiving initial metadata if (op->recv_initial_metadata) { @@ -103,7 +103,7 @@ static void start_transport_stream_op_batch( static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, const grpc_call_element_args* args) { - call_data* calld = elem->call_data; + call_data* calld = (call_data*)elem->call_data; calld->next_recv_initial_metadata_ready = NULL; calld->workaround_active = false; GRPC_CLOSURE_INIT(&calld->recv_initial_metadata_ready, diff --git a/src/core/ext/filters/workarounds/workaround_utils.c b/src/core/ext/filters/workarounds/workaround_utils.c index bc76753a8a..e600fbee67 100644 --- a/src/core/ext/filters/workarounds/workaround_utils.c +++ b/src/core/ext/filters/workarounds/workaround_utils.c @@ -33,7 +33,8 @@ grpc_workaround_user_agent_md *grpc_parse_user_agent(grpc_mdelem md) { if (NULL != user_agent_md) { return user_agent_md; } - user_agent_md = gpr_malloc(sizeof(grpc_workaround_user_agent_md)); + user_agent_md = (grpc_workaround_user_agent_md *)gpr_malloc( + sizeof(grpc_workaround_user_agent_md)); for (int i = 0; i < GRPC_MAX_WORKAROUND_ID; i++) { if (ua_parser[i]) { user_agent_md->workaround_active[i] = ua_parser[i](md); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_plugin.c b/src/core/ext/transport/chttp2/transport/chttp2_plugin.c index 6a8c81445a..78551df9c3 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_plugin.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_plugin.c @@ -21,10 +21,10 @@ #include "src/core/lib/transport/metadata.h" void grpc_chttp2_plugin_init(void) { - grpc_register_tracer("http", &grpc_http_trace); - grpc_register_tracer("flowctl", &grpc_flowctl_trace); + grpc_register_tracer(&grpc_http_trace); + grpc_register_tracer(&grpc_flowctl_trace); #ifndef NDEBUG - grpc_register_tracer("chttp2_refcount", &grpc_trace_chttp2_refcount); + grpc_register_tracer(&grpc_trace_chttp2_refcount); #endif } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 0b9c7458db..ede05d57b7 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -35,6 +35,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/compression/stream_compression.h" #include "src/core/lib/http/parser.h" +#include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" @@ -53,7 +54,7 @@ #define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024) #define MAX_WINDOW 0x7fffffffu #define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024) -#define DEFAULT_MAX_HEADER_LIST_SIZE (16 * 1024) +#define DEFAULT_MAX_HEADER_LIST_SIZE (8 * 1024) #define DEFAULT_CLIENT_KEEPALIVE_TIME_MS INT_MAX #define DEFAULT_CLIENT_KEEPALIVE_TIMEOUT_MS 20000 /* 20 seconds */ @@ -74,11 +75,12 @@ static bool g_default_keepalive_permit_without_calls = DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS; #define MAX_CLIENT_STREAM_ID 0x7fffffffu -grpc_tracer_flag grpc_http_trace = GRPC_TRACER_INITIALIZER(false); -grpc_tracer_flag grpc_flowctl_trace = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_http_trace = GRPC_TRACER_INITIALIZER(false, "http"); +grpc_tracer_flag grpc_flowctl_trace = GRPC_TRACER_INITIALIZER(false, "flowctl"); #ifndef NDEBUG -grpc_tracer_flag grpc_trace_chttp2_refcount = GRPC_TRACER_INITIALIZER(false); +grpc_tracer_flag grpc_trace_chttp2_refcount = + GRPC_TRACER_INITIALIZER(false, "chttp2_refcount"); #endif static const grpc_transport_vtable vtable; @@ -281,8 +283,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_slice_buffer_init(&t->outbuf); grpc_chttp2_hpack_compressor_init(&t->hpack_compressor); - GRPC_CLOSURE_INIT(&t->write_action, write_action, t, - grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&t->read_action_locked, read_action_locked, t, grpc_combiner_scheduler(t->combiner)); GRPC_CLOSURE_INIT(&t->benign_reclaimer_locked, benign_reclaimer_locked, t, @@ -400,6 +400,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } t->keepalive_permit_without_calls = g_default_keepalive_permit_without_calls; + t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; + if (channel_args) { for (i = 0; i < channel_args->num_args; i++) { if (0 == strcmp(channel_args->args[i].key, @@ -488,6 +490,23 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->keepalive_permit_without_calls = (uint32_t)grpc_channel_arg_get_integer( &channel_args->args[i], (grpc_integer_options){0, 0, 1}); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_OPTIMIZATION_TARGET)) { + if (channel_args->args[i].type != GRPC_ARG_STRING) { + gpr_log(GPR_ERROR, "%s should be a string", + GRPC_ARG_OPTIMIZATION_TARGET); + } else if (0 == strcmp(channel_args->args[i].value.string, "blend")) { + t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; + } else if (0 == strcmp(channel_args->args[i].value.string, "latency")) { + t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY; + } else if (0 == + strcmp(channel_args->args[i].value.string, "throughput")) { + t->opt_target = GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT; + } else { + gpr_log(GPR_ERROR, "%s value '%s' unknown, assuming 'blend'", + GRPC_ARG_OPTIMIZATION_TARGET, + channel_args->args[i].value.string); + } } else { static const struct { const char *channel_arg_name; @@ -541,6 +560,11 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } } + GRPC_CLOSURE_INIT(&t->write_action, write_action, t, + t->opt_target == GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT + ? grpc_executor_scheduler + : grpc_schedule_on_exec_ctx); + t->ping_state.pings_before_data_required = t->ping_policy.max_pings_without_data; t->ping_state.is_delayed_ping_timer_set = false; @@ -1166,6 +1190,7 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx, return; /* early out */ } if (s->fetched_send_message_length == s->fetching_send_message->length) { + grpc_byte_stream_destroy(exec_ctx, s->fetching_send_message); int64_t notify_offset = s->next_message_end_offset; if (notify_offset <= s->flow_controlled_bytes_written) { grpc_chttp2_complete_closure_step( @@ -1188,9 +1213,14 @@ static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx, return; /* early out */ } else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message, UINT32_MAX, &s->complete_fetch_locked)) { - grpc_byte_stream_pull(exec_ctx, s->fetching_send_message, - &s->fetching_slice); - add_fetched_slice_locked(exec_ctx, t, s); + grpc_error *error = grpc_byte_stream_pull( + exec_ctx, s->fetching_send_message, &s->fetching_slice); + if (error != GRPC_ERROR_NONE) { + grpc_byte_stream_destroy(exec_ctx, s->fetching_send_message); + grpc_chttp2_cancel_stream(exec_ctx, t, s, error); + } else { + add_fetched_slice_locked(exec_ctx, t, s); + } } } } @@ -1207,10 +1237,9 @@ static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs, continue_fetching_send_locked(exec_ctx, t, s); } } - if (error != GRPC_ERROR_NONE) { - /* TODO(ctiller): what to do here */ - abort(); + grpc_byte_stream_destroy(exec_ctx, s->fetching_send_message); + grpc_chttp2_cancel_stream(exec_ctx, t, s, error); } } @@ -1218,7 +1247,7 @@ static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} static void log_metadata(const grpc_metadata_batch *md_batch, uint32_t id, bool is_client, bool is_initial) { - for (grpc_linked_mdelem *md = md_batch->list.head; md != md_batch->list.tail; + for (grpc_linked_mdelem *md = md_batch->list.head; md != NULL; md = md->next) { char *key = grpc_slice_to_c_string(GRPC_MDKEY(md->md)); char *value = grpc_slice_to_c_string(GRPC_MDVALUE(md->md)); @@ -1486,6 +1515,21 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; + if (!t->is_client) { + if (op->send_initial_metadata) { + gpr_timespec deadline = + op->payload->send_initial_metadata.send_initial_metadata->deadline; + GPR_ASSERT(0 == + gpr_time_cmp(gpr_inf_future(deadline.clock_type), deadline)); + } + if (op->send_trailing_metadata) { + gpr_timespec deadline = + op->payload->send_trailing_metadata.send_trailing_metadata->deadline; + GPR_ASSERT(0 == + gpr_time_cmp(gpr_inf_future(deadline.clock_type), deadline)); + } + } + if (GRPC_TRACER_ON(grpc_http_trace)) { char *str = grpc_transport_stream_op_batch_string(op); gpr_log(GPR_DEBUG, "perform_stream_op[s=%p]: %s", s, str); @@ -2214,8 +2258,8 @@ static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, static void update_frame(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, double bw_dbl, double bdp_dbl) { - int32_t bdp = GPR_CLAMP((int32_t)bdp_dbl, 128, INT32_MAX); - int32_t target = GPR_MAX((int32_t)bw_dbl / 1000, bdp); + int32_t bdp = (int32_t)GPR_CLAMP(bdp_dbl, 128.0, INT32_MAX); + int32_t target = (int32_t)GPR_MAX(bw_dbl / 1000, bdp); // frame size is bounded [2^14,2^24-1] int32_t frame_size = GPR_CLAMP(target, 16384, 16777215); int64_t delta = (int64_t)frame_size - @@ -2756,22 +2800,9 @@ static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx, return GRPC_ERROR_NONE; } -static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, - grpc_byte_stream *byte_stream); - static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, void *byte_stream, - grpc_error *error_ignored) { - grpc_chttp2_incoming_byte_stream *bs = byte_stream; - grpc_chttp2_stream *s = bs->stream; - grpc_chttp2_transport *t = s->t; - - GPR_ASSERT(bs->base.destroy == incoming_byte_stream_destroy); - incoming_byte_stream_unref(exec_ctx, bs); - s->pending_byte_stream = false; - grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); - grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); -} + grpc_error *error_ignored); static void incoming_byte_stream_destroy(grpc_exec_ctx *exec_ctx, grpc_byte_stream *byte_stream) { @@ -2838,6 +2869,33 @@ grpc_error *grpc_chttp2_incoming_byte_stream_finished( return error; } +static void incoming_byte_stream_shutdown(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *byte_stream, + grpc_error *error) { + grpc_chttp2_incoming_byte_stream *bs = + (grpc_chttp2_incoming_byte_stream *)byte_stream; + GRPC_ERROR_UNREF(grpc_chttp2_incoming_byte_stream_finished( + exec_ctx, bs, error, true /* reset_on_error */)); +} + +static const grpc_byte_stream_vtable grpc_chttp2_incoming_byte_stream_vtable = { + incoming_byte_stream_next, incoming_byte_stream_pull, + incoming_byte_stream_shutdown, incoming_byte_stream_destroy}; + +static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, + void *byte_stream, + grpc_error *error_ignored) { + grpc_chttp2_incoming_byte_stream *bs = byte_stream; + grpc_chttp2_stream *s = bs->stream; + grpc_chttp2_transport *t = s->t; + + GPR_ASSERT(bs->base.vtable == &grpc_chttp2_incoming_byte_stream_vtable); + incoming_byte_stream_unref(exec_ctx, bs); + s->pending_byte_stream = false; + grpc_chttp2_maybe_complete_recv_message(exec_ctx, t, s); + grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); +} + grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, uint32_t frame_size, uint32_t flags) { @@ -2846,9 +2904,7 @@ grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create( incoming_byte_stream->base.length = frame_size; incoming_byte_stream->remaining_bytes = frame_size; incoming_byte_stream->base.flags = flags; - incoming_byte_stream->base.next = incoming_byte_stream_next; - incoming_byte_stream->base.pull = incoming_byte_stream_pull; - incoming_byte_stream->base.destroy = incoming_byte_stream_destroy; + incoming_byte_stream->base.vtable = &grpc_chttp2_incoming_byte_stream_vtable; gpr_ref_init(&incoming_byte_stream->refs, 2); incoming_byte_stream->transport = t; incoming_byte_stream->stream = s; diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index eb1acc0f13..b538d1df17 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -68,6 +68,11 @@ typedef enum { } grpc_chttp2_ping_type; typedef enum { + GRPC_CHTTP2_OPTIMIZE_FOR_LATENCY, + GRPC_CHTTP2_OPTIMIZE_FOR_THROUGHPUT, +} grpc_chttp2_optimization_target; + +typedef enum { GRPC_CHTTP2_PCL_INITIATE = 0, GRPC_CHTTP2_PCL_NEXT, GRPC_CHTTP2_PCL_INFLIGHT, @@ -230,6 +235,8 @@ struct grpc_chttp2_transport { /** should we probe bdp? */ bool enable_bdp_probe; + grpc_chttp2_optimization_target opt_target; + /** various lists of streams */ grpc_chttp2_stream_list lists[STREAM_LIST_COUNT]; diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index 3c8b470b4f..9d46cfa22e 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -657,6 +657,10 @@ static grpc_error *init_header_frame_parser(grpc_exec_ctx *exec_ctx, "ignoring grpc_chttp2_stream with non-client generated index %d", t->incoming_stream_id)); return init_skip_frame_parser(exec_ctx, t, 1); + } else if (grpc_chttp2_stream_map_size(&t->stream_map) >= + t->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Max stream count exceeded"); } t->last_new_stream_id = t->incoming_stream_id; s = t->incoming_stream = diff --git a/src/core/ext/transport/inproc/inproc_plugin.c b/src/core/ext/transport/inproc/inproc_plugin.c new file mode 100644 index 0000000000..6a796a0b19 --- /dev/null +++ b/src/core/ext/transport/inproc/inproc_plugin.c @@ -0,0 +1,29 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/core/ext/transport/inproc/inproc_transport.h" +#include "src/core/lib/debug/trace.h" + +grpc_tracer_flag grpc_inproc_trace = GRPC_TRACER_INITIALIZER(false, "inproc"); + +void grpc_inproc_plugin_init(void) { + grpc_register_tracer(&grpc_inproc_trace); + grpc_inproc_transport_init(); +} + +void grpc_inproc_plugin_shutdown(void) { grpc_inproc_transport_shutdown(); } diff --git a/src/core/ext/transport/inproc/inproc_transport.c b/src/core/ext/transport/inproc/inproc_transport.c new file mode 100644 index 0000000000..6f4b429ee2 --- /dev/null +++ b/src/core/ext/transport/inproc/inproc_transport.c @@ -0,0 +1,1303 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/core/ext/transport/inproc/inproc_transport.h" +#include <grpc/support/alloc.h> +#include <grpc/support/string_util.h> +#include <grpc/support/sync.h> +#include <grpc/support/time.h> +#include <string.h> +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/surface/api_trace.h" +#include "src/core/lib/surface/channel.h" +#include "src/core/lib/surface/channel_stack_type.h" +#include "src/core/lib/surface/server.h" +#include "src/core/lib/transport/connectivity_state.h" +#include "src/core/lib/transport/error_utils.h" +#include "src/core/lib/transport/transport_impl.h" + +#define INPROC_LOG(...) \ + do { \ + if (GRPC_TRACER_ON(grpc_inproc_trace)) gpr_log(__VA_ARGS__); \ + } while (0) + +static const grpc_transport_vtable inproc_vtable; +static grpc_slice g_empty_slice; +static grpc_slice g_fake_path_key; +static grpc_slice g_fake_path_value; +static grpc_slice g_fake_auth_key; +static grpc_slice g_fake_auth_value; + +typedef struct { + gpr_mu mu; + gpr_refcount refs; +} shared_mu; + +typedef struct inproc_transport { + grpc_transport base; + shared_mu *mu; + gpr_refcount refs; + bool is_client; + grpc_connectivity_state_tracker connectivity; + void (*accept_stream_cb)(grpc_exec_ctx *exec_ctx, void *user_data, + grpc_transport *transport, const void *server_data); + void *accept_stream_data; + bool is_closed; + struct inproc_transport *other_side; + struct inproc_stream *stream_list; +} inproc_transport; + +typedef struct sb_list_entry { + grpc_slice_buffer sb; + struct sb_list_entry *next; +} sb_list_entry; + +// Specialize grpc_byte_stream for our use case +typedef struct { + grpc_byte_stream base; + sb_list_entry *le; + grpc_error *shutdown_error; +} inproc_slice_byte_stream; + +typedef struct { + // TODO (vjpai): Add some inlined elements to avoid alloc in simple cases + sb_list_entry *head; + sb_list_entry *tail; +} slice_buffer_list; + +static void slice_buffer_list_init(slice_buffer_list *l) { + l->head = NULL; + l->tail = NULL; +} + +static void sb_list_entry_destroy(grpc_exec_ctx *exec_ctx, sb_list_entry *le) { + grpc_slice_buffer_destroy_internal(exec_ctx, &le->sb); + gpr_free(le); +} + +static void slice_buffer_list_destroy(grpc_exec_ctx *exec_ctx, + slice_buffer_list *l) { + sb_list_entry *curr = l->head; + while (curr != NULL) { + sb_list_entry *le = curr; + curr = curr->next; + sb_list_entry_destroy(exec_ctx, le); + } + l->head = NULL; + l->tail = NULL; +} + +static bool slice_buffer_list_empty(slice_buffer_list *l) { + return l->head == NULL; +} + +static void slice_buffer_list_append_entry(slice_buffer_list *l, + sb_list_entry *next) { + next->next = NULL; + if (l->tail) { + l->tail->next = next; + l->tail = next; + } else { + l->head = next; + l->tail = next; + } +} + +static grpc_slice_buffer *slice_buffer_list_append(slice_buffer_list *l) { + sb_list_entry *next = gpr_malloc(sizeof(*next)); + grpc_slice_buffer_init(&next->sb); + slice_buffer_list_append_entry(l, next); + return &next->sb; +} + +static sb_list_entry *slice_buffer_list_pophead(slice_buffer_list *l) { + sb_list_entry *ret = l->head; + l->head = l->head->next; + if (l->head == NULL) { + l->tail = NULL; + } + return ret; +} + +typedef struct inproc_stream { + inproc_transport *t; + grpc_metadata_batch to_read_initial_md; + uint32_t to_read_initial_md_flags; + bool to_read_initial_md_filled; + slice_buffer_list to_read_message; + grpc_metadata_batch to_read_trailing_md; + bool to_read_trailing_md_filled; + bool reads_needed; + bool read_closure_scheduled; + grpc_closure read_closure; + // Write buffer used only during gap at init time when client-side + // stream is set up but server side stream is not yet set up + grpc_metadata_batch write_buffer_initial_md; + bool write_buffer_initial_md_filled; + uint32_t write_buffer_initial_md_flags; + gpr_timespec write_buffer_deadline; + slice_buffer_list write_buffer_message; + grpc_metadata_batch write_buffer_trailing_md; + bool write_buffer_trailing_md_filled; + grpc_error *write_buffer_cancel_error; + + struct inproc_stream *other_side; + bool other_side_closed; // won't talk anymore + bool write_buffer_other_side_closed; // on hold + grpc_stream_refcount *refs; + grpc_closure *closure_at_destroy; + + gpr_arena *arena; + + grpc_transport_stream_op_batch *recv_initial_md_op; + grpc_transport_stream_op_batch *recv_message_op; + grpc_transport_stream_op_batch *recv_trailing_md_op; + + inproc_slice_byte_stream recv_message_stream; + + bool initial_md_sent; + bool trailing_md_sent; + bool initial_md_recvd; + bool trailing_md_recvd; + + bool closed; + + grpc_error *cancel_self_error; + grpc_error *cancel_other_error; + + gpr_timespec deadline; + + bool listed; + struct inproc_stream *stream_list_prev; + struct inproc_stream *stream_list_next; +} inproc_stream; + +static bool inproc_slice_byte_stream_next(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *bs, size_t max, + grpc_closure *on_complete) { + // Because inproc transport always provides the entire message atomically, + // the byte stream always has data available when this function is called. + // Thus, this function always returns true (unlike other transports) and + // there is never any need to schedule a closure + return true; +} + +static grpc_error *inproc_slice_byte_stream_pull(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *bs, + grpc_slice *slice) { + inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs; + if (stream->shutdown_error != GRPC_ERROR_NONE) { + return GRPC_ERROR_REF(stream->shutdown_error); + } + *slice = grpc_slice_buffer_take_first(&stream->le->sb); + return GRPC_ERROR_NONE; +} + +static void inproc_slice_byte_stream_shutdown(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *bs, + grpc_error *error) { + inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs; + GRPC_ERROR_UNREF(stream->shutdown_error); + stream->shutdown_error = error; +} + +static void inproc_slice_byte_stream_destroy(grpc_exec_ctx *exec_ctx, + grpc_byte_stream *bs) { + inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs; + sb_list_entry_destroy(exec_ctx, stream->le); + GRPC_ERROR_UNREF(stream->shutdown_error); +} + +static const grpc_byte_stream_vtable inproc_slice_byte_stream_vtable = { + inproc_slice_byte_stream_next, inproc_slice_byte_stream_pull, + inproc_slice_byte_stream_shutdown, inproc_slice_byte_stream_destroy}; + +void inproc_slice_byte_stream_init(inproc_slice_byte_stream *s, + sb_list_entry *le) { + s->base.length = (uint32_t)le->sb.length; + s->base.flags = 0; + s->base.vtable = &inproc_slice_byte_stream_vtable; + s->le = le; + s->shutdown_error = GRPC_ERROR_NONE; +} + +static void ref_transport(inproc_transport *t) { + INPROC_LOG(GPR_DEBUG, "ref_transport %p", t); + gpr_ref(&t->refs); +} + +static void really_destroy_transport(grpc_exec_ctx *exec_ctx, + inproc_transport *t) { + INPROC_LOG(GPR_DEBUG, "really_destroy_transport %p", t); + grpc_connectivity_state_destroy(exec_ctx, &t->connectivity); + if (gpr_unref(&t->mu->refs)) { + gpr_free(t->mu); + } + gpr_free(t); +} + +static void unref_transport(grpc_exec_ctx *exec_ctx, inproc_transport *t) { + INPROC_LOG(GPR_DEBUG, "unref_transport %p", t); + if (gpr_unref(&t->refs)) { + really_destroy_transport(exec_ctx, t); + } +} + +#ifndef NDEBUG +#define STREAM_REF(refs, reason) grpc_stream_ref(refs, reason) +#define STREAM_UNREF(e, refs, reason) grpc_stream_unref(e, refs, reason) +#else +#define STREAM_REF(refs, reason) grpc_stream_ref(refs) +#define STREAM_UNREF(e, refs, reason) grpc_stream_unref(e, refs) +#endif + +static void ref_stream(inproc_stream *s, const char *reason) { + INPROC_LOG(GPR_DEBUG, "ref_stream %p %s", s, reason); + STREAM_REF(s->refs, reason); +} + +static void unref_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s, + const char *reason) { + INPROC_LOG(GPR_DEBUG, "unref_stream %p %s", s, reason); + STREAM_UNREF(exec_ctx, s->refs, reason); +} + +static void really_destroy_stream(grpc_exec_ctx *exec_ctx, inproc_stream *s) { + INPROC_LOG(GPR_DEBUG, "really_destroy_stream %p", s); + + slice_buffer_list_destroy(exec_ctx, &s->to_read_message); + slice_buffer_list_destroy(exec_ctx, &s->write_buffer_message); + GRPC_ERROR_UNREF(s->write_buffer_cancel_error); + GRPC_ERROR_UNREF(s->cancel_self_error); + GRPC_ERROR_UNREF(s->cancel_other_error); + + unref_transport(exec_ctx, s->t); + + if (s->closure_at_destroy) { + GRPC_CLOSURE_SCHED(exec_ctx, s->closure_at_destroy, GRPC_ERROR_NONE); + } +} + +static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); + +static void log_metadata(const grpc_metadata_batch *md_batch, bool is_client, + bool is_initial) { + for (grpc_linked_mdelem *md = md_batch->list.head; md != NULL; + md = md->next) { + char *key = grpc_slice_to_c_string(GRPC_MDKEY(md->md)); + char *value = grpc_slice_to_c_string(GRPC_MDVALUE(md->md)); + gpr_log(GPR_INFO, "INPROC:%s:%s: %s: %s", is_initial ? "HDR" : "TRL", + is_client ? "CLI" : "SVR", key, value); + gpr_free(key); + gpr_free(value); + } +} + +static grpc_error *fill_in_metadata(grpc_exec_ctx *exec_ctx, inproc_stream *s, + const grpc_metadata_batch *metadata, + uint32_t flags, grpc_metadata_batch *out_md, + uint32_t *outflags, bool *markfilled) { + if (GRPC_TRACER_ON(grpc_inproc_trace)) { + log_metadata(metadata, s->t->is_client, outflags != NULL); + } + + if (outflags != NULL) { + *outflags = flags; + } + if (markfilled != NULL) { + *markfilled = true; + } + grpc_error *error = GRPC_ERROR_NONE; + for (grpc_linked_mdelem *elem = metadata->list.head; + (elem != NULL) && (error == GRPC_ERROR_NONE); elem = elem->next) { + grpc_linked_mdelem *nelem = gpr_arena_alloc(s->arena, sizeof(*nelem)); + nelem->md = grpc_mdelem_from_slices( + exec_ctx, grpc_slice_intern(GRPC_MDKEY(elem->md)), + grpc_slice_intern(GRPC_MDVALUE(elem->md))); + + error = grpc_metadata_batch_link_tail(exec_ctx, out_md, nelem); + } + return error; +} + +static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_stream *gs, grpc_stream_refcount *refcount, + const void *server_data, gpr_arena *arena) { + INPROC_LOG(GPR_DEBUG, "init_stream %p %p %p", gt, gs, server_data); + inproc_transport *t = (inproc_transport *)gt; + inproc_stream *s = (inproc_stream *)gs; + s->arena = arena; + + s->refs = refcount; + // Ref this stream right now + ref_stream(s, "inproc_init_stream:init"); + + grpc_metadata_batch_init(&s->to_read_initial_md); + s->to_read_initial_md_flags = 0; + s->to_read_initial_md_filled = false; + grpc_metadata_batch_init(&s->to_read_trailing_md); + s->to_read_trailing_md_filled = false; + grpc_metadata_batch_init(&s->write_buffer_initial_md); + s->write_buffer_initial_md_flags = 0; + s->write_buffer_initial_md_filled = false; + grpc_metadata_batch_init(&s->write_buffer_trailing_md); + s->write_buffer_trailing_md_filled = false; + slice_buffer_list_init(&s->to_read_message); + slice_buffer_list_init(&s->write_buffer_message); + s->reads_needed = false; + s->read_closure_scheduled = false; + GRPC_CLOSURE_INIT(&s->read_closure, read_state_machine, s, + grpc_schedule_on_exec_ctx); + s->t = t; + s->closure_at_destroy = NULL; + s->other_side_closed = false; + + s->initial_md_sent = s->trailing_md_sent = s->initial_md_recvd = + s->trailing_md_recvd = false; + + s->closed = false; + + s->cancel_self_error = GRPC_ERROR_NONE; + s->cancel_other_error = GRPC_ERROR_NONE; + s->write_buffer_cancel_error = GRPC_ERROR_NONE; + s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + s->write_buffer_deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); + + s->stream_list_prev = NULL; + gpr_mu_lock(&t->mu->mu); + s->listed = true; + ref_stream(s, "inproc_init_stream:list"); + s->stream_list_next = t->stream_list; + if (t->stream_list) { + t->stream_list->stream_list_prev = s; + } + t->stream_list = s; + gpr_mu_unlock(&t->mu->mu); + + if (!server_data) { + ref_transport(t); + inproc_transport *st = t->other_side; + ref_transport(st); + s->other_side = NULL; // will get filled in soon + // Pass the client-side stream address to the server-side for a ref + ref_stream(s, "inproc_init_stream:clt"); // ref it now on behalf of server + // side to avoid destruction + INPROC_LOG(GPR_DEBUG, "calling accept stream cb %p %p", + st->accept_stream_cb, st->accept_stream_data); + (*st->accept_stream_cb)(exec_ctx, st->accept_stream_data, &st->base, + (void *)s); + } else { + // This is the server-side and is being called through accept_stream_cb + inproc_stream *cs = (inproc_stream *)server_data; + s->other_side = cs; + // Ref the server-side stream on behalf of the client now + ref_stream(s, "inproc_init_stream:srv"); + + // Now we are about to affect the other side, so lock the transport + // to make sure that it doesn't get destroyed + gpr_mu_lock(&s->t->mu->mu); + cs->other_side = s; + // Now transfer from the other side's write_buffer if any to the to_read + // buffer + if (cs->write_buffer_initial_md_filled) { + fill_in_metadata(exec_ctx, s, &cs->write_buffer_initial_md, + cs->write_buffer_initial_md_flags, + &s->to_read_initial_md, &s->to_read_initial_md_flags, + &s->to_read_initial_md_filled); + s->deadline = gpr_time_min(s->deadline, cs->write_buffer_deadline); + grpc_metadata_batch_clear(exec_ctx, &cs->write_buffer_initial_md); + cs->write_buffer_initial_md_filled = false; + } + while (!slice_buffer_list_empty(&cs->write_buffer_message)) { + slice_buffer_list_append_entry( + &s->to_read_message, + slice_buffer_list_pophead(&cs->write_buffer_message)); + } + if (cs->write_buffer_trailing_md_filled) { + fill_in_metadata(exec_ctx, s, &cs->write_buffer_trailing_md, 0, + &s->to_read_trailing_md, NULL, + &s->to_read_trailing_md_filled); + grpc_metadata_batch_clear(exec_ctx, &cs->write_buffer_trailing_md); + cs->write_buffer_trailing_md_filled = false; + } + if (cs->write_buffer_cancel_error != GRPC_ERROR_NONE) { + s->cancel_other_error = cs->write_buffer_cancel_error; + cs->write_buffer_cancel_error = GRPC_ERROR_NONE; + } + + gpr_mu_unlock(&s->t->mu->mu); + } + return 0; // return value is not important +} + +static void close_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s) { + if (!s->closed) { + // Release the metadata that we would have written out + grpc_metadata_batch_destroy(exec_ctx, &s->write_buffer_initial_md); + grpc_metadata_batch_destroy(exec_ctx, &s->write_buffer_trailing_md); + + if (s->listed) { + inproc_stream *p = s->stream_list_prev; + inproc_stream *n = s->stream_list_next; + if (p != NULL) { + p->stream_list_next = n; + } else { + s->t->stream_list = n; + } + if (n != NULL) { + n->stream_list_prev = p; + } + s->listed = false; + unref_stream(exec_ctx, s, "close_stream:list"); + } + s->closed = true; + unref_stream(exec_ctx, s, "close_stream:closing"); + } +} + +// This function means that we are done talking/listening to the other side +static void close_other_side_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, + const char *reason) { + if (s->other_side != NULL) { + // First release the metadata that came from the other side's arena + grpc_metadata_batch_destroy(exec_ctx, &s->to_read_initial_md); + grpc_metadata_batch_destroy(exec_ctx, &s->to_read_trailing_md); + + unref_stream(exec_ctx, s->other_side, reason); + s->other_side_closed = true; + s->other_side = NULL; + } else if (!s->other_side_closed) { + s->write_buffer_other_side_closed = true; + } +} + +static void fail_helper_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, + grpc_error *error) { + INPROC_LOG(GPR_DEBUG, "read_state_machine %p fail_helper", s); + // If we're failing this side, we need to make sure that + // we also send or have already sent trailing metadata + if (!s->trailing_md_sent) { + // Send trailing md to the other side indicating cancellation + s->trailing_md_sent = true; + + grpc_metadata_batch fake_md; + grpc_metadata_batch_init(&fake_md); + + inproc_stream *other = s->other_side; + grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md + : &other->to_read_trailing_md; + bool *destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled + : &other->to_read_trailing_md_filled; + fill_in_metadata(exec_ctx, s, &fake_md, 0, dest, NULL, destfilled); + grpc_metadata_batch_destroy(exec_ctx, &fake_md); + + if (other != NULL) { + if (other->cancel_other_error == GRPC_ERROR_NONE) { + other->cancel_other_error = GRPC_ERROR_REF(error); + } + if (other->reads_needed) { + if (!other->read_closure_scheduled) { + GRPC_CLOSURE_SCHED(exec_ctx, &other->read_closure, + GRPC_ERROR_REF(error)); + other->read_closure_scheduled = true; + } + other->reads_needed = false; + } + } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) { + s->write_buffer_cancel_error = GRPC_ERROR_REF(error); + } + } + if (s->recv_initial_md_op) { + grpc_error *err; + if (!s->t->is_client) { + // If this is a server, provide initial metadata with a path and authority + // since it expects that as well as no error yet + grpc_metadata_batch fake_md; + grpc_metadata_batch_init(&fake_md); + grpc_linked_mdelem *path_md = gpr_arena_alloc(s->arena, sizeof(*path_md)); + path_md->md = + grpc_mdelem_from_slices(exec_ctx, g_fake_path_key, g_fake_path_value); + GPR_ASSERT(grpc_metadata_batch_link_tail(exec_ctx, &fake_md, path_md) == + GRPC_ERROR_NONE); + grpc_linked_mdelem *auth_md = gpr_arena_alloc(s->arena, sizeof(*auth_md)); + auth_md->md = + grpc_mdelem_from_slices(exec_ctx, g_fake_auth_key, g_fake_auth_value); + GPR_ASSERT(grpc_metadata_batch_link_tail(exec_ctx, &fake_md, auth_md) == + GRPC_ERROR_NONE); + + fill_in_metadata( + exec_ctx, s, &fake_md, 0, + s->recv_initial_md_op->payload->recv_initial_metadata + .recv_initial_metadata, + s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags, + NULL); + grpc_metadata_batch_destroy(exec_ctx, &fake_md); + err = GRPC_ERROR_NONE; + } else { + err = GRPC_ERROR_REF(error); + } + INPROC_LOG(GPR_DEBUG, + "fail_helper %p scheduling initial-metadata-ready %p %p", s, + error, err); + GRPC_CLOSURE_SCHED(exec_ctx, + s->recv_initial_md_op->payload->recv_initial_metadata + .recv_initial_metadata_ready, + err); + // Last use of err so no need to REF and then UNREF it + + if ((s->recv_initial_md_op != s->recv_message_op) && + (s->recv_initial_md_op != s->recv_trailing_md_op)) { + INPROC_LOG(GPR_DEBUG, + "fail_helper %p scheduling initial-metadata-on-complete %p", + error, s); + GRPC_CLOSURE_SCHED(exec_ctx, s->recv_initial_md_op->on_complete, + GRPC_ERROR_REF(error)); + } + s->recv_initial_md_op = NULL; + } + if (s->recv_message_op) { + INPROC_LOG(GPR_DEBUG, "fail_helper %p scheduling message-ready %p", s, + error); + GRPC_CLOSURE_SCHED( + exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready, + GRPC_ERROR_REF(error)); + if (s->recv_message_op != s->recv_trailing_md_op) { + INPROC_LOG(GPR_DEBUG, "fail_helper %p scheduling message-on-complete %p", + s, error); + GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete, + GRPC_ERROR_REF(error)); + } + s->recv_message_op = NULL; + } + if (s->recv_trailing_md_op) { + INPROC_LOG(GPR_DEBUG, + "fail_helper %p scheduling trailing-md-on-complete %p", s, + error); + GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete, + GRPC_ERROR_REF(error)); + s->recv_trailing_md_op = NULL; + } + close_other_side_locked(exec_ctx, s, "fail_helper:other_side"); + close_stream_locked(exec_ctx, s); + + GRPC_ERROR_UNREF(error); +} + +static void read_state_machine(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + // This function gets called when we have contents in the unprocessed reads + // Get what we want based on our ops wanted + // Schedule our appropriate closures + // and then return to reads_needed state if still needed + + // Since this is a closure directly invoked by the combiner, it should not + // unref the error parameter explicitly; the combiner will do that implicitly + grpc_error *new_err = GRPC_ERROR_NONE; + + bool needs_close = false; + + INPROC_LOG(GPR_DEBUG, "read_state_machine %p", arg); + inproc_stream *s = (inproc_stream *)arg; + gpr_mu *mu = &s->t->mu->mu; // keep aside in case s gets closed + gpr_mu_lock(mu); + s->read_closure_scheduled = false; + // cancellation takes precedence + if (s->cancel_self_error != GRPC_ERROR_NONE) { + fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(s->cancel_self_error)); + goto done; + } else if (s->cancel_other_error != GRPC_ERROR_NONE) { + fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(s->cancel_other_error)); + goto done; + } else if (error != GRPC_ERROR_NONE) { + fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(error)); + goto done; + } + + if (s->recv_initial_md_op) { + if (!s->to_read_initial_md_filled) { + // We entered the state machine on some other kind of read even though + // we still haven't satisfied initial md . That's an error. + new_err = + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unexpected frame sequencing"); + INPROC_LOG(GPR_DEBUG, + "read_state_machine %p scheduling on_complete errors for no " + "initial md %p", + s, new_err); + fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); + goto done; + } else if (s->initial_md_recvd) { + new_err = + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd initial md"); + INPROC_LOG( + GPR_DEBUG, + "read_state_machine %p scheduling on_complete errors for already " + "recvd initial md %p", + s, new_err); + fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); + goto done; + } + + s->initial_md_recvd = true; + new_err = fill_in_metadata( + exec_ctx, s, &s->to_read_initial_md, s->to_read_initial_md_flags, + s->recv_initial_md_op->payload->recv_initial_metadata + .recv_initial_metadata, + s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags, NULL); + s->recv_initial_md_op->payload->recv_initial_metadata.recv_initial_metadata + ->deadline = s->deadline; + grpc_metadata_batch_clear(exec_ctx, &s->to_read_initial_md); + s->to_read_initial_md_filled = false; + INPROC_LOG(GPR_DEBUG, + "read_state_machine %p scheduling initial-metadata-ready %p", s, + new_err); + GRPC_CLOSURE_SCHED(exec_ctx, + s->recv_initial_md_op->payload->recv_initial_metadata + .recv_initial_metadata_ready, + GRPC_ERROR_REF(new_err)); + if ((s->recv_initial_md_op != s->recv_message_op) && + (s->recv_initial_md_op != s->recv_trailing_md_op)) { + INPROC_LOG( + GPR_DEBUG, + "read_state_machine %p scheduling initial-metadata-on-complete %p", s, + new_err); + GRPC_CLOSURE_SCHED(exec_ctx, s->recv_initial_md_op->on_complete, + GRPC_ERROR_REF(new_err)); + } + s->recv_initial_md_op = NULL; + + if (new_err != GRPC_ERROR_NONE) { + INPROC_LOG(GPR_DEBUG, + "read_state_machine %p scheduling on_complete errors2 %p", s, + new_err); + fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); + goto done; + } + } + if (s->to_read_initial_md_filled) { + new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Unexpected recv frame"); + fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); + goto done; + } + if (!slice_buffer_list_empty(&s->to_read_message) && s->recv_message_op) { + inproc_slice_byte_stream_init( + &s->recv_message_stream, + slice_buffer_list_pophead(&s->to_read_message)); + *s->recv_message_op->payload->recv_message.recv_message = + &s->recv_message_stream.base; + INPROC_LOG(GPR_DEBUG, "read_state_machine %p scheduling message-ready", s); + GRPC_CLOSURE_SCHED( + exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready, + GRPC_ERROR_NONE); + if (s->recv_message_op != s->recv_trailing_md_op) { + INPROC_LOG(GPR_DEBUG, + "read_state_machine %p scheduling message-on-complete %p", s, + new_err); + GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete, + GRPC_ERROR_REF(new_err)); + } + s->recv_message_op = NULL; + } + if (s->to_read_trailing_md_filled) { + if (s->trailing_md_recvd) { + new_err = + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd trailing md"); + INPROC_LOG( + GPR_DEBUG, + "read_state_machine %p scheduling on_complete errors for already " + "recvd trailing md %p", + s, new_err); + fail_helper_locked(exec_ctx, s, GRPC_ERROR_REF(new_err)); + goto done; + } + if (s->recv_message_op != NULL) { + // This message needs to be wrapped up because it will never be + // satisfied + INPROC_LOG(GPR_DEBUG, "read_state_machine %p scheduling message-ready", + s); + GRPC_CLOSURE_SCHED( + exec_ctx, + s->recv_message_op->payload->recv_message.recv_message_ready, + GRPC_ERROR_NONE); + if (s->recv_message_op != s->recv_trailing_md_op) { + INPROC_LOG(GPR_DEBUG, + "read_state_machine %p scheduling message-on-complete %p", s, + new_err); + GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete, + GRPC_ERROR_REF(new_err)); + } + s->recv_message_op = NULL; + } + if (s->recv_trailing_md_op != NULL) { + // We wanted trailing metadata and we got it + s->trailing_md_recvd = true; + new_err = + fill_in_metadata(exec_ctx, s, &s->to_read_trailing_md, 0, + s->recv_trailing_md_op->payload + ->recv_trailing_metadata.recv_trailing_metadata, + NULL, NULL); + grpc_metadata_batch_clear(exec_ctx, &s->to_read_trailing_md); + s->to_read_trailing_md_filled = false; + + // We should schedule the recv_trailing_md_op completion if + // 1. this stream is the client-side + // 2. this stream is the server-side AND has already sent its trailing md + // (If the server hasn't already sent its trailing md, it doesn't have + // a final status, so don't mark this op complete) + if (s->t->is_client || s->trailing_md_sent) { + INPROC_LOG( + GPR_DEBUG, + "read_state_machine %p scheduling trailing-md-on-complete %p", s, + new_err); + GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete, + GRPC_ERROR_REF(new_err)); + s->recv_trailing_md_op = NULL; + needs_close = true; + } else { + INPROC_LOG(GPR_DEBUG, + "read_state_machine %p server needs to delay handling " + "trailing-md-on-complete %p", + s, new_err); + } + } else { + INPROC_LOG( + GPR_DEBUG, + "read_state_machine %p has trailing md but not yet waiting for it", + s); + } + } + if (s->trailing_md_recvd && s->recv_message_op) { + // No further message will come on this stream, so finish off the + // recv_message_op + INPROC_LOG(GPR_DEBUG, "read_state_machine %p scheduling message-ready", s); + GRPC_CLOSURE_SCHED( + exec_ctx, s->recv_message_op->payload->recv_message.recv_message_ready, + GRPC_ERROR_NONE); + if (s->recv_message_op != s->recv_trailing_md_op) { + INPROC_LOG(GPR_DEBUG, + "read_state_machine %p scheduling message-on-complete %p", s, + new_err); + GRPC_CLOSURE_SCHED(exec_ctx, s->recv_message_op->on_complete, + GRPC_ERROR_REF(new_err)); + } + s->recv_message_op = NULL; + } + if (s->recv_message_op || s->recv_trailing_md_op) { + // Didn't get the item we wanted so we still need to get + // rescheduled + INPROC_LOG(GPR_DEBUG, "read_state_machine %p still needs closure %p %p", s, + s->recv_message_op, s->recv_trailing_md_op); + s->reads_needed = true; + } +done: + if (needs_close) { + close_other_side_locked(exec_ctx, s, "read_state_machine"); + close_stream_locked(exec_ctx, s); + } + gpr_mu_unlock(mu); + GRPC_ERROR_UNREF(new_err); +} + +static grpc_closure do_nothing_closure; + +static bool cancel_stream_locked(grpc_exec_ctx *exec_ctx, inproc_stream *s, + grpc_error *error) { + bool ret = false; // was the cancel accepted + INPROC_LOG(GPR_DEBUG, "cancel_stream %p with %s", s, + grpc_error_string(error)); + if (s->cancel_self_error == GRPC_ERROR_NONE) { + ret = true; + s->cancel_self_error = GRPC_ERROR_REF(error); + if (s->reads_needed) { + if (!s->read_closure_scheduled) { + GRPC_CLOSURE_SCHED(exec_ctx, &s->read_closure, + GRPC_ERROR_REF(s->cancel_self_error)); + s->read_closure_scheduled = true; + } + s->reads_needed = false; + } + // Send trailing md to the other side indicating cancellation, even if we + // already have + s->trailing_md_sent = true; + + grpc_metadata_batch cancel_md; + grpc_metadata_batch_init(&cancel_md); + + inproc_stream *other = s->other_side; + grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md + : &other->to_read_trailing_md; + bool *destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled + : &other->to_read_trailing_md_filled; + fill_in_metadata(exec_ctx, s, &cancel_md, 0, dest, NULL, destfilled); + grpc_metadata_batch_destroy(exec_ctx, &cancel_md); + + if (other != NULL) { + if (other->cancel_other_error == GRPC_ERROR_NONE) { + other->cancel_other_error = GRPC_ERROR_REF(s->cancel_self_error); + } + if (other->reads_needed) { + if (!other->read_closure_scheduled) { + GRPC_CLOSURE_SCHED(exec_ctx, &other->read_closure, + GRPC_ERROR_REF(other->cancel_other_error)); + other->read_closure_scheduled = true; + } + other->reads_needed = false; + } + } else if (s->write_buffer_cancel_error == GRPC_ERROR_NONE) { + s->write_buffer_cancel_error = GRPC_ERROR_REF(s->cancel_self_error); + } + + // if we are a server and already received trailing md but + // couldn't complete that because we hadn't yet sent out trailing + // md, now's the chance + if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) { + INPROC_LOG(GPR_DEBUG, + "cancel_stream %p scheduling trailing-md-on-complete %p", s, + s->cancel_self_error); + GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete, + GRPC_ERROR_REF(s->cancel_self_error)); + s->recv_trailing_md_op = NULL; + } + } + + close_other_side_locked(exec_ctx, s, "cancel_stream:other_side"); + close_stream_locked(exec_ctx, s); + + GRPC_ERROR_UNREF(error); + return ret; +} + +static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_stream *gs, + grpc_transport_stream_op_batch *op) { + INPROC_LOG(GPR_DEBUG, "perform_stream_op %p %p %p", gt, gs, op); + inproc_stream *s = (inproc_stream *)gs; + gpr_mu *mu = &s->t->mu->mu; // save aside in case s gets closed + gpr_mu_lock(mu); + + if (GRPC_TRACER_ON(grpc_inproc_trace)) { + if (op->send_initial_metadata) { + log_metadata(op->payload->send_initial_metadata.send_initial_metadata, + s->t->is_client, true); + } + if (op->send_trailing_metadata) { + log_metadata(op->payload->send_trailing_metadata.send_trailing_metadata, + s->t->is_client, false); + } + } + grpc_error *error = GRPC_ERROR_NONE; + grpc_closure *on_complete = op->on_complete; + if (on_complete == NULL) { + on_complete = &do_nothing_closure; + } + + if (op->cancel_stream) { + // Call cancel_stream_locked without ref'ing the cancel_error because + // this function is responsible to make sure that that field gets unref'ed + cancel_stream_locked(exec_ctx, s, op->payload->cancel_stream.cancel_error); + // this op can complete without an error + } else if (s->cancel_self_error != GRPC_ERROR_NONE) { + // already self-canceled so still give it an error + error = GRPC_ERROR_REF(s->cancel_self_error); + } else { + INPROC_LOG(GPR_DEBUG, "perform_stream_op %p%s%s%s%s%s%s", s, + op->send_initial_metadata ? " send_initial_metadata" : "", + op->send_message ? " send_message" : "", + op->send_trailing_metadata ? " send_trailing_metadata" : "", + op->recv_initial_metadata ? " recv_initial_metadata" : "", + op->recv_message ? " recv_message" : "", + op->recv_trailing_metadata ? " recv_trailing_metadata" : ""); + } + + bool needs_close = false; + + if (error == GRPC_ERROR_NONE && + (op->send_initial_metadata || op->send_message || + op->send_trailing_metadata)) { + inproc_stream *other = s->other_side; + if (s->t->is_closed) { + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown"); + } + if (error == GRPC_ERROR_NONE && op->send_initial_metadata) { + grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_initial_md + : &other->to_read_initial_md; + uint32_t *destflags = (other == NULL) ? &s->write_buffer_initial_md_flags + : &other->to_read_initial_md_flags; + bool *destfilled = (other == NULL) ? &s->write_buffer_initial_md_filled + : &other->to_read_initial_md_filled; + if (*destfilled || s->initial_md_sent) { + // The buffer is already in use; that's an error! + INPROC_LOG(GPR_DEBUG, "Extra initial metadata %p", s); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra initial metadata"); + } else { + if (!other->closed) { + fill_in_metadata( + exec_ctx, s, + op->payload->send_initial_metadata.send_initial_metadata, + op->payload->send_initial_metadata.send_initial_metadata_flags, + dest, destflags, destfilled); + } + if (s->t->is_client) { + gpr_timespec *dl = + (other == NULL) ? &s->write_buffer_deadline : &other->deadline; + *dl = gpr_time_min(*dl, op->payload->send_initial_metadata + .send_initial_metadata->deadline); + s->initial_md_sent = true; + } + } + } + if (error == GRPC_ERROR_NONE && op->send_message) { + size_t remaining = op->payload->send_message.send_message->length; + grpc_slice_buffer *dest = slice_buffer_list_append( + (other == NULL) ? &s->write_buffer_message : &other->to_read_message); + do { + grpc_slice message_slice; + grpc_closure unused; + GPR_ASSERT(grpc_byte_stream_next(exec_ctx, + op->payload->send_message.send_message, + SIZE_MAX, &unused)); + error = grpc_byte_stream_pull( + exec_ctx, op->payload->send_message.send_message, &message_slice); + if (error != GRPC_ERROR_NONE) { + cancel_stream_locked(exec_ctx, s, GRPC_ERROR_REF(error)); + break; + } + GPR_ASSERT(error == GRPC_ERROR_NONE); + remaining -= GRPC_SLICE_LENGTH(message_slice); + grpc_slice_buffer_add(dest, message_slice); + } while (remaining != 0); + grpc_byte_stream_destroy(exec_ctx, + op->payload->send_message.send_message); + } + if (error == GRPC_ERROR_NONE && op->send_trailing_metadata) { + grpc_metadata_batch *dest = (other == NULL) ? &s->write_buffer_trailing_md + : &other->to_read_trailing_md; + bool *destfilled = (other == NULL) ? &s->write_buffer_trailing_md_filled + : &other->to_read_trailing_md_filled; + if (*destfilled || s->trailing_md_sent) { + // The buffer is already in use; that's an error! + INPROC_LOG(GPR_DEBUG, "Extra trailing metadata %p", s); + error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra trailing metadata"); + } else { + if (!other->closed) { + fill_in_metadata( + exec_ctx, s, + op->payload->send_trailing_metadata.send_trailing_metadata, 0, + dest, NULL, destfilled); + } + s->trailing_md_sent = true; + if (!s->t->is_client && s->trailing_md_recvd && + s->recv_trailing_md_op) { + INPROC_LOG(GPR_DEBUG, + "perform_stream_op %p scheduling trailing-md-on-complete", + s); + GRPC_CLOSURE_SCHED(exec_ctx, s->recv_trailing_md_op->on_complete, + GRPC_ERROR_NONE); + s->recv_trailing_md_op = NULL; + needs_close = true; + } + } + } + if (other != NULL && other->reads_needed) { + if (!other->read_closure_scheduled) { + GRPC_CLOSURE_SCHED(exec_ctx, &other->read_closure, error); + other->read_closure_scheduled = true; + } + other->reads_needed = false; + } + } + if (error == GRPC_ERROR_NONE && + (op->recv_initial_metadata || op->recv_message || + op->recv_trailing_metadata)) { + // If there are any reads, mark it so that the read closure will react to + // them + if (op->recv_initial_metadata) { + s->recv_initial_md_op = op; + } + if (op->recv_message) { + s->recv_message_op = op; + } + if (op->recv_trailing_metadata) { + s->recv_trailing_md_op = op; + } + + // We want to initiate the closure if: + // 1. There is initial metadata and something ready to take that + // 2. There is a message and something ready to take it + // 3. There is trailing metadata, even if nothing specifically wants + // that because that can shut down the message as well + if ((s->to_read_initial_md_filled && op->recv_initial_metadata) || + ((!slice_buffer_list_empty(&s->to_read_message) || + s->trailing_md_recvd) && + op->recv_message) || + (s->to_read_trailing_md_filled)) { + if (!s->read_closure_scheduled) { + GRPC_CLOSURE_SCHED(exec_ctx, &s->read_closure, GRPC_ERROR_NONE); + s->read_closure_scheduled = true; + } + } else { + s->reads_needed = true; + } + } else { + if (error != GRPC_ERROR_NONE) { + // Schedule op's read closures that we didn't push to read state machine + if (op->recv_initial_metadata) { + INPROC_LOG( + GPR_DEBUG, + "perform_stream_op error %p scheduling initial-metadata-ready %p", + s, error); + GRPC_CLOSURE_SCHED( + exec_ctx, + op->payload->recv_initial_metadata.recv_initial_metadata_ready, + GRPC_ERROR_REF(error)); + } + if (op->recv_message) { + INPROC_LOG( + GPR_DEBUG, + "perform_stream_op error %p scheduling recv message-ready %p", s, + error); + GRPC_CLOSURE_SCHED(exec_ctx, + op->payload->recv_message.recv_message_ready, + GRPC_ERROR_REF(error)); + } + } + INPROC_LOG(GPR_DEBUG, "perform_stream_op %p scheduling on_complete %p", s, + error); + GRPC_CLOSURE_SCHED(exec_ctx, on_complete, GRPC_ERROR_REF(error)); + } + if (needs_close) { + close_other_side_locked(exec_ctx, s, "perform_stream_op:other_side"); + close_stream_locked(exec_ctx, s); + } + gpr_mu_unlock(mu); + GRPC_ERROR_UNREF(error); +} + +static void close_transport_locked(grpc_exec_ctx *exec_ctx, + inproc_transport *t) { + INPROC_LOG(GPR_DEBUG, "close_transport %p %d", t, t->is_closed); + grpc_connectivity_state_set( + exec_ctx, &t->connectivity, GRPC_CHANNEL_SHUTDOWN, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Closing transport."), + "close transport"); + if (!t->is_closed) { + t->is_closed = true; + /* Also end all streams on this transport */ + while (t->stream_list != NULL) { + // cancel_stream_locked also adjusts stream list + cancel_stream_locked( + exec_ctx, t->stream_list, + grpc_error_set_int( + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Transport closed"), + GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE)); + } + } +} + +static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_transport_op *op) { + inproc_transport *t = (inproc_transport *)gt; + INPROC_LOG(GPR_DEBUG, "perform_transport_op %p %p", t, op); + gpr_mu_lock(&t->mu->mu); + if (op->on_connectivity_state_change) { + grpc_connectivity_state_notify_on_state_change( + exec_ctx, &t->connectivity, op->connectivity_state, + op->on_connectivity_state_change); + } + if (op->set_accept_stream) { + t->accept_stream_cb = op->set_accept_stream_fn; + t->accept_stream_data = op->set_accept_stream_user_data; + } + if (op->on_consumed) { + GRPC_CLOSURE_SCHED(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); + } + + bool do_close = false; + if (op->goaway_error != GRPC_ERROR_NONE) { + do_close = true; + GRPC_ERROR_UNREF(op->goaway_error); + } + if (op->disconnect_with_error != GRPC_ERROR_NONE) { + do_close = true; + GRPC_ERROR_UNREF(op->disconnect_with_error); + } + + if (do_close) { + close_transport_locked(exec_ctx, t); + } + gpr_mu_unlock(&t->mu->mu); +} + +static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_stream *gs, + grpc_closure *then_schedule_closure) { + INPROC_LOG(GPR_DEBUG, "destroy_stream %p %p", gs, then_schedule_closure); + inproc_stream *s = (inproc_stream *)gs; + s->closure_at_destroy = then_schedule_closure; + really_destroy_stream(exec_ctx, s); +} + +static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) { + inproc_transport *t = (inproc_transport *)gt; + INPROC_LOG(GPR_DEBUG, "destroy_transport %p", t); + gpr_mu_lock(&t->mu->mu); + close_transport_locked(exec_ctx, t); + gpr_mu_unlock(&t->mu->mu); + unref_transport(exec_ctx, t->other_side); + unref_transport(exec_ctx, t); +} + +/******************************************************************************* + * Main inproc transport functions + */ +static void inproc_transports_create(grpc_exec_ctx *exec_ctx, + grpc_transport **server_transport, + const grpc_channel_args *server_args, + grpc_transport **client_transport, + const grpc_channel_args *client_args) { + INPROC_LOG(GPR_DEBUG, "inproc_transports_create"); + inproc_transport *st = gpr_zalloc(sizeof(*st)); + inproc_transport *ct = gpr_zalloc(sizeof(*ct)); + // Share one lock between both sides since both sides get affected + st->mu = ct->mu = gpr_malloc(sizeof(*st->mu)); + gpr_mu_init(&st->mu->mu); + gpr_ref_init(&st->mu->refs, 2); + st->base.vtable = &inproc_vtable; + ct->base.vtable = &inproc_vtable; + // Start each side of transport with 2 refs since they each have a ref + // to the other + gpr_ref_init(&st->refs, 2); + gpr_ref_init(&ct->refs, 2); + st->is_client = false; + ct->is_client = true; + grpc_connectivity_state_init(&st->connectivity, GRPC_CHANNEL_READY, + "inproc_server"); + grpc_connectivity_state_init(&ct->connectivity, GRPC_CHANNEL_READY, + "inproc_client"); + st->other_side = ct; + ct->other_side = st; + st->stream_list = NULL; + ct->stream_list = NULL; + *server_transport = (grpc_transport *)st; + *client_transport = (grpc_transport *)ct; +} + +grpc_channel *grpc_inproc_channel_create(grpc_server *server, + grpc_channel_args *args, + void *reserved) { + GRPC_API_TRACE("grpc_inproc_channel_create(server=%p, args=%p)", 2, + (server, args)); + + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + + const grpc_channel_args *server_args = grpc_server_get_channel_args(server); + + // Add a default authority channel argument for the client + + grpc_arg default_authority_arg; + default_authority_arg.type = GRPC_ARG_STRING; + default_authority_arg.key = GRPC_ARG_DEFAULT_AUTHORITY; + default_authority_arg.value.string = "inproc.authority"; + grpc_channel_args *client_args = + grpc_channel_args_copy_and_add(args, &default_authority_arg, 1); + + grpc_transport *server_transport; + grpc_transport *client_transport; + inproc_transports_create(&exec_ctx, &server_transport, server_args, + &client_transport, client_args); + + grpc_server_setup_transport(&exec_ctx, server, server_transport, NULL, + server_args); + grpc_channel *channel = + grpc_channel_create(&exec_ctx, "inproc", client_args, + GRPC_CLIENT_DIRECT_CHANNEL, client_transport); + + // Free up created channel args + grpc_channel_args_destroy(&exec_ctx, client_args); + + // Now finish scheduled operations + grpc_exec_ctx_finish(&exec_ctx); + + return channel; +} + +/******************************************************************************* + * INTEGRATION GLUE + */ + +static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_stream *gs, grpc_pollset *pollset) { + // Nothing to do here +} + +static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt, + grpc_stream *gs, grpc_pollset_set *pollset_set) { + // Nothing to do here +} + +static char *get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *t) { + return gpr_strdup("inproc"); +} + +static grpc_endpoint *get_endpoint(grpc_exec_ctx *exec_ctx, grpc_transport *t) { + return NULL; +} + +static const grpc_transport_vtable inproc_vtable = { + sizeof(inproc_stream), "inproc", + init_stream, set_pollset, + set_pollset_set, perform_stream_op, + perform_transport_op, destroy_stream, + destroy_transport, get_peer, + get_endpoint}; + +/******************************************************************************* + * GLOBAL INIT AND DESTROY + */ +static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {} + +void grpc_inproc_transport_init(void) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_CLOSURE_INIT(&do_nothing_closure, do_nothing, NULL, + grpc_schedule_on_exec_ctx); + g_empty_slice = grpc_slice_from_static_buffer(NULL, 0); + + grpc_slice key_tmp = grpc_slice_from_static_string(":path"); + g_fake_path_key = grpc_slice_intern(key_tmp); + grpc_slice_unref_internal(&exec_ctx, key_tmp); + + g_fake_path_value = grpc_slice_from_static_string("/"); + + grpc_slice auth_tmp = grpc_slice_from_static_string(":authority"); + g_fake_auth_key = grpc_slice_intern(auth_tmp); + grpc_slice_unref_internal(&exec_ctx, auth_tmp); + + g_fake_auth_value = grpc_slice_from_static_string("inproc-fail"); + grpc_exec_ctx_finish(&exec_ctx); +} + +void grpc_inproc_transport_shutdown(void) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_slice_unref_internal(&exec_ctx, g_empty_slice); + grpc_slice_unref_internal(&exec_ctx, g_fake_path_key); + grpc_slice_unref_internal(&exec_ctx, g_fake_path_value); + grpc_slice_unref_internal(&exec_ctx, g_fake_auth_key); + grpc_slice_unref_internal(&exec_ctx, g_fake_auth_value); + grpc_exec_ctx_finish(&exec_ctx); +} diff --git a/src/core/ext/transport/inproc/inproc_transport.h b/src/core/ext/transport/inproc/inproc_transport.h new file mode 100644 index 0000000000..37e6d99e99 --- /dev/null +++ b/src/core/ext/transport/inproc/inproc_transport.h @@ -0,0 +1,41 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#ifndef GRPC_CORE_EXT_TRANSPORT_INPROC_INPROC_TRANSPORT_H +#define GRPC_CORE_EXT_TRANSPORT_INPROC_INPROC_TRANSPORT_H + +#include "src/core/lib/transport/transport_impl.h" + +#ifdef __cplusplus +extern "C" { +#endif + +grpc_channel *grpc_inproc_channel_create(grpc_server *server, + grpc_channel_args *args, + void *reserved); + +extern grpc_tracer_flag grpc_inproc_trace; + +void grpc_inproc_transport_init(void); +void grpc_inproc_transport_shutdown(void); + +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_EXT_TRANSPORT_INPROC_INPROC_TRANSPORT_H */ |