diff options
Diffstat (limited to 'src')
6 files changed, 303 insertions, 548 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c index f29c5d55ed..de516ab4c9 100644 --- a/src/core/ext/filters/client_channel/client_channel.c +++ b/src/core/ext/filters/client_channel/client_channel.c @@ -178,8 +178,8 @@ typedef struct client_channel_channel_data { grpc_slice_hash_table *method_params_table; /** incoming resolver result - set by resolver.next() */ grpc_channel_args *resolver_result; - /** a list of closures that are all waiting for config to come in */ - grpc_closure_list waiting_for_config_closures; + /** a list of closures that are all waiting for resolver result to come in */ + grpc_closure_list waiting_for_resolver_result_closures; /** resolver callback */ grpc_closure on_resolver_result_changed; /** connectivity state being tracked */ @@ -342,49 +342,15 @@ static void parse_retry_throttle_params(const grpc_json *field, void *arg) { } } -// Wrap a closure associated with \a lb_policy. The associated callback (\a -// wrapped_on_pick_closure_cb) is responsible for unref'ing \a lb_policy after -// scheduling \a wrapped_closure. -typedef struct wrapped_on_pick_closure_arg { - /* the closure instance using this struct as argument */ - grpc_closure wrapper_closure; - - /* the original closure. Usually a on_complete/notify cb for pick() and ping() - * calls against the internal RR instance, respectively. */ - grpc_closure *wrapped_closure; - - /* The policy instance related to the closure */ - grpc_lb_policy *lb_policy; -} wrapped_on_pick_closure_arg; - -// Invoke \a arg->wrapped_closure, unref \a arg->lb_policy and free \a arg. -static void wrapped_on_pick_closure_cb(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - wrapped_on_pick_closure_arg *wc_arg = arg; - GPR_ASSERT(wc_arg != NULL); - GPR_ASSERT(wc_arg->wrapped_closure != NULL); - GPR_ASSERT(wc_arg->lb_policy != NULL); - GRPC_CLOSURE_RUN(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error)); - GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->lb_policy, "pick_subchannel_wrapping"); - gpr_free(wc_arg); -} - static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { channel_data *chand = arg; + // Extract the following fields from the resolver result, if non-NULL. char *lb_policy_name = NULL; - grpc_lb_policy *lb_policy = NULL; - grpc_lb_policy *old_lb_policy = NULL; - grpc_slice_hash_table *method_params_table = NULL; - grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; - bool exit_idle = false; - grpc_error *state_error = - GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy"); + grpc_lb_policy *new_lb_policy = NULL; char *service_config_json = NULL; - service_config_parsing_state parsing_state; - memset(&parsing_state, 0, sizeof(parsing_state)); - - bool lb_policy_updated = false; + grpc_server_retry_throttle_data *retry_throttle_data = NULL; + grpc_slice_hash_table *method_params_table = NULL; if (chand->resolver_result != NULL) { // Find LB policy name. const grpc_arg *channel_arg = @@ -419,32 +385,29 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, // Use pick_first if nothing was specified and we didn't select grpclb // above. if (lb_policy_name == NULL) lb_policy_name = "pick_first"; - // Instantiate LB policy. grpc_lb_policy_args lb_policy_args; lb_policy_args.args = chand->resolver_result; lb_policy_args.client_channel_factory = chand->client_channel_factory; lb_policy_args.combiner = chand->combiner; - + // Check to see if we're already using the right LB policy. + // Note: It's safe to use chand->info_lb_policy_name here without + // taking a lock on chand->info_mu, because this function is the + // only thing that modifies its value, and it can only be invoked + // once at any given time. const bool lb_policy_type_changed = - (chand->info_lb_policy_name == NULL) || - (strcmp(chand->info_lb_policy_name, lb_policy_name) != 0); + chand->info_lb_policy_name == NULL || + strcmp(chand->info_lb_policy_name, lb_policy_name) != 0; if (chand->lb_policy != NULL && !lb_policy_type_changed) { - // update - lb_policy_updated = true; + // Continue using the same LB policy. Update with new addresses. grpc_lb_policy_update_locked(exec_ctx, chand->lb_policy, &lb_policy_args); } else { - lb_policy = + // Instantiate new LB policy. + new_lb_policy = grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args); - if (lb_policy != NULL) { - GRPC_LB_POLICY_REF(lb_policy, "config_change"); - GRPC_ERROR_UNREF(state_error); - state = grpc_lb_policy_check_connectivity_locked(exec_ctx, lb_policy, - &state_error); - old_lb_policy = chand->lb_policy; - chand->lb_policy = lb_policy; + if (new_lb_policy == NULL) { + gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", lb_policy_name); } } - // Find service config. channel_arg = grpc_channel_args_find(chand->resolver_result, GRPC_ARG_SERVICE_CONFIG); @@ -461,12 +424,14 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, grpc_uri *uri = grpc_uri_parse(exec_ctx, channel_arg->value.string, true); GPR_ASSERT(uri->path[0] != '\0'); + service_config_parsing_state parsing_state; + memset(&parsing_state, 0, sizeof(parsing_state)); parsing_state.server_name = uri->path[0] == '/' ? uri->path + 1 : uri->path; grpc_service_config_parse_global_params( service_config, parse_retry_throttle_params, &parsing_state); - parsing_state.server_name = NULL; grpc_uri_destroy(uri); + retry_throttle_data = parsing_state.retry_throttle_data; method_params_table = grpc_service_config_create_method_config_table( exec_ctx, service_config, method_parameters_create_from_json, method_parameters_free); @@ -480,12 +445,11 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, grpc_channel_args_destroy(exec_ctx, chand->resolver_result); chand->resolver_result = NULL; } - - if (lb_policy != NULL) { - grpc_pollset_set_add_pollset_set(exec_ctx, lb_policy->interested_parties, - chand->interested_parties); - } - + // Now swap out fields in chand. Note that the new values may still + // be NULL if (e.g.) the resolver failed to return results or the + // results did not contain the necessary data. + // + // First, swap out the data used by cc_get_channel_info(). gpr_mu_lock(&chand->info_mu); if (lb_policy_name != NULL) { gpr_free(chand->info_lb_policy_name); @@ -496,75 +460,77 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, chand->info_service_config_json = service_config_json; } gpr_mu_unlock(&chand->info_mu); - + // Swap out the retry throttle data. if (chand->retry_throttle_data != NULL) { grpc_server_retry_throttle_data_unref(chand->retry_throttle_data); } - chand->retry_throttle_data = parsing_state.retry_throttle_data; + chand->retry_throttle_data = retry_throttle_data; + // Swap out the method params table. if (chand->method_params_table != NULL) { grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table); } chand->method_params_table = method_params_table; - if (lb_policy != NULL) { - GRPC_CLOSURE_LIST_SCHED(exec_ctx, &chand->waiting_for_config_closures); - } else if (chand->resolver == NULL /* disconnected */) { - grpc_closure_list_fail_all(&chand->waiting_for_config_closures, - GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Channel disconnected", &error, 1)); - GRPC_CLOSURE_LIST_SCHED(exec_ctx, &chand->waiting_for_config_closures); - } - if (!lb_policy_updated && lb_policy != NULL && - chand->exit_idle_when_lb_policy_arrives) { - GRPC_LB_POLICY_REF(lb_policy, "exit_idle"); - exit_idle = true; - chand->exit_idle_when_lb_policy_arrives = false; - } - - if (error == GRPC_ERROR_NONE && chand->resolver) { - if (!lb_policy_updated) { - set_channel_connectivity_state_locked(exec_ctx, chand, state, - GRPC_ERROR_REF(state_error), - "new_lb+resolver"); - if (lb_policy != NULL) { - watch_lb_policy_locked(exec_ctx, chand, lb_policy, state); - } + // If we have a new LB policy or are shutting down (in which case + // new_lb_policy will be NULL), swap out the LB policy, unreffing the + // old one and removing its fds from chand->interested_parties. + // Note that we do NOT do this if either (a) we updated the existing + // LB policy above or (b) we failed to create the new LB policy (in + // which case we want to continue using the most recent one we had). + if (new_lb_policy != NULL || error != GRPC_ERROR_NONE || + chand->resolver == NULL) { + if (chand->lb_policy != NULL) { + grpc_pollset_set_del_pollset_set(exec_ctx, + chand->lb_policy->interested_parties, + chand->interested_parties); + GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); } - GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - grpc_resolver_next_locked(exec_ctx, chand->resolver, - &chand->resolver_result, - &chand->on_resolver_result_changed); - } else { + chand->lb_policy = new_lb_policy; + } + // Now that we've swapped out the relevant fields of chand, check for + // error or shutdown. + if (error != GRPC_ERROR_NONE || chand->resolver == NULL) { if (chand->resolver != NULL) { grpc_resolver_shutdown_locked(exec_ctx, chand->resolver); GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); chand->resolver = NULL; } - grpc_error *refs[] = {error, state_error}; set_channel_connectivity_state_locked( exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Got config after disconnection", refs, GPR_ARRAY_SIZE(refs)), + "Got resolver result after disconnection", &error, 1), "resolver_gone"); + GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "resolver"); + grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Channel disconnected", &error, 1)); + GRPC_CLOSURE_LIST_SCHED(exec_ctx, + &chand->waiting_for_resolver_result_closures); + } else { // Not shutting down. + grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; + grpc_error *state_error = + GRPC_ERROR_CREATE_FROM_STATIC_STRING("No load balancing policy"); + if (new_lb_policy != NULL) { + GRPC_ERROR_UNREF(state_error); + state = grpc_lb_policy_check_connectivity_locked(exec_ctx, new_lb_policy, + &state_error); + grpc_pollset_set_add_pollset_set(exec_ctx, + new_lb_policy->interested_parties, + chand->interested_parties); + GRPC_CLOSURE_LIST_SCHED(exec_ctx, + &chand->waiting_for_resolver_result_closures); + if (chand->exit_idle_when_lb_policy_arrives) { + grpc_lb_policy_exit_idle_locked(exec_ctx, new_lb_policy); + chand->exit_idle_when_lb_policy_arrives = false; + } + watch_lb_policy_locked(exec_ctx, chand, new_lb_policy, state); + } + set_channel_connectivity_state_locked( + exec_ctx, chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver"); + grpc_resolver_next_locked(exec_ctx, chand->resolver, + &chand->resolver_result, + &chand->on_resolver_result_changed); + GRPC_ERROR_UNREF(state_error); } - - if (!lb_policy_updated && lb_policy != NULL && exit_idle) { - grpc_lb_policy_exit_idle_locked(exec_ctx, lb_policy); - GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle"); - } - - if (old_lb_policy != NULL) { - grpc_pollset_set_del_pollset_set( - exec_ctx, old_lb_policy->interested_parties, chand->interested_parties); - GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel"); - old_lb_policy = NULL; - } - - if (!lb_policy_updated && lb_policy != NULL) { - GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change"); - } - - GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "resolver"); - GRPC_ERROR_UNREF(state_error); } static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg, @@ -602,9 +568,10 @@ static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg, GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); chand->resolver = NULL; if (!chand->started_resolving) { - grpc_closure_list_fail_all(&chand->waiting_for_config_closures, + grpc_closure_list_fail_all(&chand->waiting_for_resolver_result_closures, GRPC_ERROR_REF(op->disconnect_with_error)); - GRPC_CLOSURE_LIST_SCHED(exec_ctx, &chand->waiting_for_config_closures); + GRPC_CLOSURE_LIST_SCHED(exec_ctx, + &chand->waiting_for_resolver_result_closures); } if (chand->lb_policy != NULL) { grpc_pollset_set_del_pollset_set(exec_ctx, @@ -770,6 +737,16 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, * PER-CALL FUNCTIONS */ +// Max number of batches that can be pending on a call at any given +// time. This includes: +// recv_initial_metadata +// send_initial_metadata +// recv_message +// send_message +// recv_trailing_metadata +// send_trailing_metadata +#define MAX_WAITING_BATCHES 6 + /** Call data. Holds a pointer to grpc_subchannel_call and the associated machinery to create such a pointer. Handles queueing of stream ops until a call object is ready, waiting @@ -800,11 +777,10 @@ typedef struct client_channel_call_data { grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT]; grpc_polling_entity *pollent; - grpc_transport_stream_op_batch **waiting_ops; - size_t waiting_ops_count; - size_t waiting_ops_capacity; + grpc_transport_stream_op_batch *waiting_for_pick_batches[MAX_WAITING_BATCHES]; + size_t waiting_for_pick_batches_count; - grpc_closure next_step; + grpc_transport_stream_op_batch_payload *initial_metadata_payload; grpc_call_stack *owning_call; @@ -853,57 +829,44 @@ grpc_subchannel_call *grpc_client_channel_get_subchannel_call( return get_call_or_error(call_elem->call_data).subchannel_call; } -static void add_waiting_locked(call_data *calld, - grpc_transport_stream_op_batch *op) { - GPR_TIMER_BEGIN("add_waiting_locked", 0); - if (calld->waiting_ops_count == calld->waiting_ops_capacity) { - calld->waiting_ops_capacity = GPR_MAX(3, 2 * calld->waiting_ops_capacity); - calld->waiting_ops = - gpr_realloc(calld->waiting_ops, - calld->waiting_ops_capacity * sizeof(*calld->waiting_ops)); - } - calld->waiting_ops[calld->waiting_ops_count++] = op; - GPR_TIMER_END("add_waiting_locked", 0); +static void waiting_for_pick_batches_add_locked( + call_data *calld, grpc_transport_stream_op_batch *batch) { + GPR_ASSERT(calld->waiting_for_pick_batches_count < MAX_WAITING_BATCHES); + calld->waiting_for_pick_batches[calld->waiting_for_pick_batches_count++] = + batch; } -static void fail_locked(grpc_exec_ctx *exec_ctx, call_data *calld, - grpc_error *error) { - size_t i; - for (i = 0; i < calld->waiting_ops_count; i++) { +static void waiting_for_pick_batches_fail_locked(grpc_exec_ctx *exec_ctx, + call_data *calld, + grpc_error *error) { + for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) { grpc_transport_stream_op_batch_finish_with_failure( - exec_ctx, calld->waiting_ops[i], GRPC_ERROR_REF(error)); + exec_ctx, calld->waiting_for_pick_batches[i], GRPC_ERROR_REF(error)); } - calld->waiting_ops_count = 0; + calld->waiting_for_pick_batches_count = 0; GRPC_ERROR_UNREF(error); } -static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) { - if (calld->waiting_ops_count == 0) { - return; - } - - call_or_error call = get_call_or_error(calld); - grpc_transport_stream_op_batch **ops = calld->waiting_ops; - size_t nops = calld->waiting_ops_count; - if (call.error != GRPC_ERROR_NONE) { - fail_locked(exec_ctx, calld, GRPC_ERROR_REF(call.error)); +static void waiting_for_pick_batches_resume_locked(grpc_exec_ctx *exec_ctx, + call_data *calld) { + if (calld->waiting_for_pick_batches_count == 0) return; + call_or_error coe = get_call_or_error(calld); + if (coe.error != GRPC_ERROR_NONE) { + waiting_for_pick_batches_fail_locked(exec_ctx, calld, + GRPC_ERROR_REF(coe.error)); return; } - calld->waiting_ops = NULL; - calld->waiting_ops_count = 0; - calld->waiting_ops_capacity = 0; - for (size_t i = 0; i < nops; i++) { - grpc_subchannel_call_process_op(exec_ctx, call.subchannel_call, ops[i]); + for (size_t i = 0; i < calld->waiting_for_pick_batches_count; ++i) { + grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, + calld->waiting_for_pick_batches[i]); } - gpr_free(ops); + calld->waiting_for_pick_batches_count = 0; } -// Sets calld->method_params and calld->retry_throttle_data. -// If the method params specify a timeout, populates -// *per_method_deadline and returns true. -static bool set_call_method_params_from_service_config_locked( - grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - gpr_timespec *per_method_deadline) { +// Applies service config to the call. Must be invoked once we know +// that the resolver has returned results to the channel. +static void apply_service_config_to_call_locked(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; if (chand->retry_throttle_data != NULL) { @@ -915,39 +878,48 @@ static bool set_call_method_params_from_service_config_locked( exec_ctx, chand->method_params_table, calld->path); if (calld->method_params != NULL) { method_parameters_ref(calld->method_params); - if (gpr_time_cmp(calld->method_params->timeout, + // If the deadline from the service config is shorter than the one + // from the client API, reset the deadline timer. + if (chand->deadline_checking_enabled && + gpr_time_cmp(calld->method_params->timeout, gpr_time_0(GPR_TIMESPAN)) != 0) { - *per_method_deadline = + const gpr_timespec per_method_deadline = gpr_time_add(calld->call_start_time, calld->method_params->timeout); - return true; + if (gpr_time_cmp(per_method_deadline, calld->deadline) < 0) { + calld->deadline = per_method_deadline; + grpc_deadline_state_reset(exec_ctx, elem, calld->deadline); + } } } } - return false; } -static void apply_final_configuration_locked(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem) { - /* apply service-config level configuration to the call (now that we're - * certain it exists) */ - call_data *calld = elem->call_data; - channel_data *chand = elem->channel_data; - gpr_timespec per_method_deadline; - if (set_call_method_params_from_service_config_locked(exec_ctx, elem, - &per_method_deadline)) { - // If the deadline from the service config is shorter than the one - // from the client API, reset the deadline timer. - if (chand->deadline_checking_enabled && - gpr_time_cmp(per_method_deadline, calld->deadline) < 0) { - calld->deadline = per_method_deadline; - grpc_deadline_state_reset(exec_ctx, elem, calld->deadline); - } +static void create_subchannel_call_locked(grpc_exec_ctx *exec_ctx, + call_data *calld, grpc_error *error) { + grpc_subchannel_call *subchannel_call = NULL; + const grpc_connected_subchannel_call_args call_args = { + .pollent = calld->pollent, + .path = calld->path, + .start_time = calld->call_start_time, + .deadline = calld->deadline, + .arena = calld->arena, + .context = calld->subchannel_call_context}; + grpc_error *new_error = grpc_connected_subchannel_create_call( + exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); + GPR_ASSERT(set_call_or_error( + calld, (call_or_error){.subchannel_call = subchannel_call})); + if (new_error != GRPC_ERROR_NONE) { + new_error = grpc_error_add_child(new_error, error); + waiting_for_pick_batches_fail_locked(exec_ctx, calld, new_error); + } else { + waiting_for_pick_batches_resume_locked(exec_ctx, calld); } + GRPC_ERROR_UNREF(error); } -static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, +static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, grpc_error *error) { - grpc_call_element *elem = arg; call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; GPR_ASSERT(calld->pick_pending); @@ -956,6 +928,7 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, chand->interested_parties); call_or_error coe = get_call_or_error(calld); if (calld->connected_subchannel == NULL) { + // Failed to create subchannel. grpc_error *failure = error == GRPC_ERROR_NONE ? GRPC_ERROR_CREATE_FROM_STATIC_STRING( @@ -963,7 +936,7 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, : GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Failed to create subchannel", &error, 1); set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(failure)}); - fail_locked(exec_ctx, calld, failure); + waiting_for_pick_batches_fail_locked(exec_ctx, calld, failure); } else if (coe.error != GRPC_ERROR_NONE) { /* already cancelled before subchannel became ready */ grpc_error *child_errors[] = {error, coe.error}; @@ -977,29 +950,13 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error_set_int(cancellation_error, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_DEADLINE_EXCEEDED); } - fail_locked(exec_ctx, calld, cancellation_error); + waiting_for_pick_batches_fail_locked(exec_ctx, calld, cancellation_error); } else { /* Create call on subchannel. */ - grpc_subchannel_call *subchannel_call = NULL; - const grpc_connected_subchannel_call_args call_args = { - .pollent = calld->pollent, - .path = calld->path, - .start_time = calld->call_start_time, - .deadline = calld->deadline, - .arena = calld->arena, - .context = calld->subchannel_call_context}; - grpc_error *new_error = grpc_connected_subchannel_create_call( - exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); - GPR_ASSERT(set_call_or_error( - calld, (call_or_error){.subchannel_call = subchannel_call})); - if (new_error != GRPC_ERROR_NONE) { - new_error = grpc_error_add_child(new_error, error); - fail_locked(exec_ctx, calld, new_error); - } else { - retry_waiting_locked(exec_ctx, calld); - } + create_subchannel_call_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); } GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); + GRPC_ERROR_UNREF(error); } static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { @@ -1013,41 +970,32 @@ static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { } } +/** Return true if subchannel is available immediately (in which case + subchannel_ready_locked() should not be called), or false otherwise (in + which case subchannel_ready_locked() should be called when the subchannel + is available). */ +static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem); + typedef struct { - grpc_metadata_batch *initial_metadata; - uint32_t initial_metadata_flags; - grpc_connected_subchannel **connected_subchannel; - grpc_call_context_element *subchannel_call_context; - grpc_closure *on_ready; grpc_call_element *elem; + bool cancelled; grpc_closure closure; -} continue_picking_args; +} pick_after_resolver_result_args; -/** Return true if subchannel is available immediately (in which case on_ready - should not be called), or false otherwise (in which case on_ready should be - called when the subchannel is available). */ -static bool pick_subchannel_locked( - grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, - grpc_connected_subchannel **connected_subchannel, - grpc_call_context_element *subchannel_call_context, grpc_closure *on_ready); - -static void continue_picking_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - continue_picking_args *cpa = arg; - if (cpa->connected_subchannel == NULL) { +static void continue_picking_after_resolver_result_locked( + grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + pick_after_resolver_result_args *args = arg; + if (args->cancelled) { /* cancelled, do nothing */ } else if (error != GRPC_ERROR_NONE) { - GRPC_CLOSURE_SCHED(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error)); + subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error)); } else { - if (pick_subchannel_locked(exec_ctx, cpa->elem, cpa->initial_metadata, - cpa->initial_metadata_flags, - cpa->connected_subchannel, - cpa->subchannel_call_context, cpa->on_ready)) { - GRPC_CLOSURE_SCHED(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE); + if (pick_subchannel_locked(exec_ctx, args->elem)) { + subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_NONE); } } - gpr_free(cpa); + gpr_free(args); } static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, @@ -1059,39 +1007,85 @@ static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, &calld->connected_subchannel, GRPC_ERROR_REF(error)); } - for (grpc_closure *closure = chand->waiting_for_config_closures.head; + // If we don't yet have a resolver result, then a closure for + // continue_picking_after_resolver_result_locked() will have been added to + // chand->waiting_for_resolver_result_closures, and it may not be invoked + // until after this call has been destroyed. We mark the operation as + // cancelled, so that when continue_picking_after_resolver_result_locked() + // is called, it will be a no-op. We also immediately invoke + // subchannel_ready_locked() to propagate the error back to the caller. + for (grpc_closure *closure = chand->waiting_for_resolver_result_closures.head; closure != NULL; closure = closure->next_data.next) { - continue_picking_args *cpa = closure->cb_arg; - if (cpa->connected_subchannel == &calld->connected_subchannel) { - cpa->connected_subchannel = NULL; - GRPC_CLOSURE_SCHED(exec_ctx, cpa->on_ready, - GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Pick cancelled", &error, 1)); + pick_after_resolver_result_args *args = closure->cb_arg; + if (!args->cancelled && args->elem == elem) { + args->cancelled = true; + subchannel_ready_locked(exec_ctx, elem, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Pick cancelled", &error, 1)); } } GRPC_ERROR_UNREF(error); } -static bool pick_subchannel_locked( - grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, - grpc_connected_subchannel **connected_subchannel, - grpc_call_context_element *subchannel_call_context, - grpc_closure *on_ready) { - GPR_TIMER_BEGIN("pick_subchannel", 0); +// State for pick callback that holds a reference to the LB policy +// from which the pick was requested. +typedef struct { + grpc_lb_policy *lb_policy; + grpc_call_element *elem; + grpc_closure closure; +} pick_callback_args; + +// Callback invoked by grpc_lb_policy_pick_locked() for async picks. +// Unrefs the LB policy after invoking subchannel_ready_locked(). +static void pick_callback_done_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + pick_callback_args *args = arg; + GPR_ASSERT(args != NULL); + GPR_ASSERT(args->lb_policy != NULL); + subchannel_ready_locked(exec_ctx, args->elem, GRPC_ERROR_REF(error)); + GRPC_LB_POLICY_UNREF(exec_ctx, args->lb_policy, "pick_subchannel"); + gpr_free(args); +} +// Takes a ref to chand->lb_policy and calls grpc_lb_policy_pick_locked(). +// If the pick was completed synchronously, unrefs the LB policy and +// returns true. +static bool pick_callback_start_locked(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + const grpc_lb_policy_pick_args *inputs) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; + pick_callback_args *pick_args = gpr_zalloc(sizeof(*pick_args)); + GRPC_LB_POLICY_REF(chand->lb_policy, "pick_subchannel"); + pick_args->lb_policy = chand->lb_policy; + pick_args->elem = elem; + GRPC_CLOSURE_INIT(&pick_args->closure, pick_callback_done_locked, pick_args, + grpc_combiner_scheduler(chand->combiner)); + const bool pick_done = grpc_lb_policy_pick_locked( + exec_ctx, chand->lb_policy, inputs, &calld->connected_subchannel, + calld->subchannel_call_context, NULL, &pick_args->closure); + if (pick_done) { + /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */ + GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "pick_subchannel"); + gpr_free(pick_args); + } + return pick_done; +} - GPR_ASSERT(connected_subchannel); - +static bool pick_subchannel_locked(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { + GPR_TIMER_BEGIN("pick_subchannel", 0); + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + bool pick_done = false; if (chand->lb_policy != NULL) { - apply_final_configuration_locked(exec_ctx, elem); - grpc_lb_policy *lb_policy = chand->lb_policy; - GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel"); + apply_service_config_to_call_locked(exec_ctx, elem); // If the application explicitly set wait_for_ready, use that. // Otherwise, if the service config specified a value for this // method, use that. + uint32_t initial_metadata_flags = + calld->initial_metadata_payload->send_initial_metadata + .send_initial_metadata_flags; const bool wait_for_ready_set_from_api = initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET; @@ -1107,78 +1101,57 @@ static bool pick_subchannel_locked( } } const grpc_lb_policy_pick_args inputs = { - initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem}; - - // Wrap the user-provided callback in order to hold a strong reference to - // the LB policy for the duration of the pick. - wrapped_on_pick_closure_arg *w_on_pick_arg = - gpr_zalloc(sizeof(*w_on_pick_arg)); - GRPC_CLOSURE_INIT(&w_on_pick_arg->wrapper_closure, - wrapped_on_pick_closure_cb, w_on_pick_arg, - grpc_schedule_on_exec_ctx); - w_on_pick_arg->wrapped_closure = on_ready; - GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel_wrapping"); - w_on_pick_arg->lb_policy = lb_policy; - const bool pick_done = grpc_lb_policy_pick_locked( - exec_ctx, lb_policy, &inputs, connected_subchannel, - subchannel_call_context, NULL, &w_on_pick_arg->wrapper_closure); - if (pick_done) { - /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */ - GRPC_LB_POLICY_UNREF(exec_ctx, w_on_pick_arg->lb_policy, - "pick_subchannel_wrapping"); - gpr_free(w_on_pick_arg); + calld->initial_metadata_payload->send_initial_metadata + .send_initial_metadata, + initial_metadata_flags, &calld->lb_token_mdelem}; + pick_done = pick_callback_start_locked(exec_ctx, elem, &inputs); + } else if (chand->resolver != NULL) { + if (!chand->started_resolving) { + chand->started_resolving = true; + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); + grpc_resolver_next_locked(exec_ctx, chand->resolver, + &chand->resolver_result, + &chand->on_resolver_result_changed); } - GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel"); - GPR_TIMER_END("pick_subchannel", 0); - return pick_done; - } - if (chand->resolver != NULL && !chand->started_resolving) { - chand->started_resolving = true; - GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - grpc_resolver_next_locked(exec_ctx, chand->resolver, - &chand->resolver_result, - &chand->on_resolver_result_changed); - } - if (chand->resolver != NULL) { - continue_picking_args *cpa = gpr_malloc(sizeof(*cpa)); - cpa->initial_metadata = initial_metadata; - cpa->initial_metadata_flags = initial_metadata_flags; - cpa->connected_subchannel = connected_subchannel; - cpa->subchannel_call_context = subchannel_call_context; - cpa->on_ready = on_ready; - cpa->elem = elem; - GRPC_CLOSURE_INIT(&cpa->closure, continue_picking_locked, cpa, + pick_after_resolver_result_args *args = + (pick_after_resolver_result_args *)gpr_zalloc(sizeof(*args)); + args->elem = elem; + GRPC_CLOSURE_INIT(&args->closure, + continue_picking_after_resolver_result_locked, args, grpc_combiner_scheduler(chand->combiner)); - grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure, - GRPC_ERROR_NONE); + grpc_closure_list_append(&chand->waiting_for_resolver_result_closures, + &args->closure, GRPC_ERROR_NONE); } else { - GRPC_CLOSURE_SCHED(exec_ctx, on_ready, - GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); + subchannel_ready_locked( + exec_ctx, elem, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Disconnected")); } - GPR_TIMER_END("pick_subchannel", 0); - return false; + return pick_done; } -static void start_transport_stream_op_batch_locked_inner( - grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op, - grpc_call_element *elem) { - channel_data *chand = elem->channel_data; +static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx, + void *arg, + grpc_error *error_ignored) { + GPR_TIMER_BEGIN("start_transport_stream_op_batch_locked", 0); + grpc_transport_stream_op_batch *op = arg; + grpc_call_element *elem = op->handler_private.extra_arg; call_data *calld = elem->call_data; - + channel_data *chand = elem->channel_data; /* need to recheck that another thread hasn't set the call */ call_or_error coe = get_call_or_error(calld); if (coe.error != GRPC_ERROR_NONE) { grpc_transport_stream_op_batch_finish_with_failure( exec_ctx, op, GRPC_ERROR_REF(coe.error)); - /* early out */ - return; + goto done; } if (coe.subchannel_call != NULL) { grpc_subchannel_call_process_op(exec_ctx, coe.subchannel_call, op); - /* early out */ - return; + goto done; } + // Add to waiting-for-pick list. If we succeed in getting a + // subchannel call below, we'll handle this batch (along with any + // other waiting batches) in waiting_for_pick_batches_resume_locked(). + waiting_for_pick_batches_add_locked(calld, op); /* if this is a cancellation, then we can raise our cancelled flag */ if (op->cancel_stream) { grpc_error *error = op->payload->cancel_stream.cancel_error; @@ -1190,30 +1163,22 @@ static void start_transport_stream_op_batch_locked_inner( set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(error)}); if (calld->pick_pending) { cancel_pick_locked(exec_ctx, elem, GRPC_ERROR_REF(error)); - } else { - fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); } - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, - GRPC_ERROR_REF(error)); - /* early out */ - return; + waiting_for_pick_batches_fail_locked(exec_ctx, calld, + GRPC_ERROR_REF(error)); + goto done; } /* if we don't have a subchannel, try to get one */ if (!calld->pick_pending && calld->connected_subchannel == NULL && op->send_initial_metadata) { + calld->initial_metadata_payload = op->payload; calld->pick_pending = true; - GRPC_CLOSURE_INIT(&calld->next_step, subchannel_ready_locked, elem, - grpc_combiner_scheduler(chand->combiner)); GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); /* If a subchannel is not available immediately, the polling entity from call_data should be provided to channel_data's interested_parties, so that IO of the lb_policy and resolver could be done under it. */ - if (pick_subchannel_locked( - exec_ctx, elem, - op->payload->send_initial_metadata.send_initial_metadata, - op->payload->send_initial_metadata.send_initial_metadata_flags, - &calld->connected_subchannel, calld->subchannel_call_context, - &calld->next_step)) { + if (pick_subchannel_locked(exec_ctx, elem)) { + // Pick was returned synchronously. calld->pick_pending = false; GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); if (calld->connected_subchannel == NULL) { @@ -1221,42 +1186,20 @@ static void start_transport_stream_op_batch_locked_inner( "Call dropped by load balancing policy"); set_call_or_error(calld, (call_or_error){.error = GRPC_ERROR_REF(error)}); - fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); - return; // Early out. + waiting_for_pick_batches_fail_locked(exec_ctx, calld, error); + } else { + // Create subchannel call. + create_subchannel_call_locked(exec_ctx, calld, GRPC_ERROR_NONE); } } else { grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent, chand->interested_parties); } } - /* if we've got a subchannel, then let's ask it to create a call */ - if (!calld->pick_pending && calld->connected_subchannel != NULL) { - grpc_subchannel_call *subchannel_call = NULL; - const grpc_connected_subchannel_call_args call_args = { - .pollent = calld->pollent, - .path = calld->path, - .start_time = calld->call_start_time, - .deadline = calld->deadline, - .arena = calld->arena, - .context = calld->subchannel_call_context}; - grpc_error *error = grpc_connected_subchannel_create_call( - exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); - GPR_ASSERT(set_call_or_error( - calld, (call_or_error){.subchannel_call = subchannel_call})); - if (error != GRPC_ERROR_NONE) { - fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); - grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); - } else { - retry_waiting_locked(exec_ctx, calld); - /* recurse to retry */ - start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem); - } - /* early out */ - return; - } - /* nothing to be done but wait */ - add_waiting_locked(calld, op); +done: + GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, + "start_transport_stream_op_batch"); + GPR_TIMER_END("start_transport_stream_op_batch_locked", 0); } static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { @@ -1279,30 +1222,6 @@ static void on_complete(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { GRPC_ERROR_REF(error)); } -static void start_transport_stream_op_batch_locked(grpc_exec_ctx *exec_ctx, - void *arg, - grpc_error *error_ignored) { - GPR_TIMER_BEGIN("start_transport_stream_op_batch_locked", 0); - - grpc_transport_stream_op_batch *op = arg; - grpc_call_element *elem = op->handler_private.extra_arg; - call_data *calld = elem->call_data; - - if (op->recv_trailing_metadata) { - GPR_ASSERT(op->on_complete != NULL); - calld->original_on_complete = op->on_complete; - GRPC_CLOSURE_INIT(&calld->on_complete, on_complete, elem, - grpc_schedule_on_exec_ctx); - op->on_complete = &calld->on_complete; - } - - start_transport_stream_op_batch_locked_inner(exec_ctx, op, elem); - - GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, - "start_transport_stream_op_batch"); - GPR_TIMER_END("start_transport_stream_op_batch_locked", 0); -} - /* The logic here is fairly complicated, due to (a) the fact that we need to handle the case where we receive the send op before the initial metadata op, and (b) the need for efficiency, especially in @@ -1321,6 +1240,15 @@ static void cc_start_transport_stream_op_batch( grpc_deadline_state_client_start_transport_stream_op_batch(exec_ctx, elem, op); } + // Intercept on_complete for recv_trailing_metadata so that we can + // check retry throttle status. + if (op->recv_trailing_metadata) { + GPR_ASSERT(op->on_complete != NULL); + calld->original_on_complete = op->on_complete; + GRPC_CLOSURE_INIT(&calld->on_complete, on_complete, elem, + grpc_schedule_on_exec_ctx); + op->on_complete = &calld->on_complete; + } /* try to (atomically) get the call */ call_or_error coe = get_call_or_error(calld); GPR_TIMER_BEGIN("cc_start_transport_stream_op_batch", 0); @@ -1390,7 +1318,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, "client_channel_destroy_call"); } GPR_ASSERT(!calld->pick_pending); - GPR_ASSERT(calld->waiting_ops_count == 0); + GPR_ASSERT(calld->waiting_for_pick_batches_count == 0); if (calld->connected_subchannel != NULL) { GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel, "picked"); @@ -1401,7 +1329,6 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, calld->subchannel_call_context[i].value); } } - gpr_free(calld->waiting_ops); GRPC_CLOSURE_SCHED(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); } diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c index 8e73d606aa..a311334d13 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c @@ -74,7 +74,9 @@ static void fake_resolver_shutdown_locked(grpc_exec_ctx* exec_ctx, fake_resolver* r = (fake_resolver*)resolver; if (r->next_completion != NULL) { *r->target_result = NULL; - GRPC_CLOSURE_SCHED(exec_ctx, r->next_completion, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED( + exec_ctx, r->next_completion, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resolver Shutdown")); r->next_completion = NULL; } } diff --git a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c index b5d2e5c92b..7b4fe38272 100644 --- a/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c +++ b/src/core/ext/filters/client_channel/resolver/sockaddr/sockaddr_resolver.c @@ -73,7 +73,9 @@ static void sockaddr_shutdown_locked(grpc_exec_ctx *exec_ctx, sockaddr_resolver *r = (sockaddr_resolver *)resolver; if (r->next_completion != NULL) { *r->target_result = NULL; - GRPC_CLOSURE_SCHED(exec_ctx, r->next_completion, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED( + exec_ctx, r->next_completion, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("Resolver Shutdown")); r->next_completion = NULL; } } diff --git a/src/core/lib/support/stack_lockfree.c b/src/core/lib/support/stack_lockfree.c deleted file mode 100644 index 0fb64ed001..0000000000 --- a/src/core/lib/support/stack_lockfree.c +++ /dev/null @@ -1,137 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include "src/core/lib/support/stack_lockfree.h" - -#include <stdlib.h> -#include <string.h> - -#include <grpc/support/alloc.h> -#include <grpc/support/atm.h> -#include <grpc/support/log.h> -#include <grpc/support/port_platform.h> - -/* The lockfree node structure is a single architecture-level - word that allows for an atomic CAS to set it up. */ -struct lockfree_node_contents { - /* next thing to look at. Actual index for head, next index otherwise */ - uint16_t index; -#ifdef GPR_ARCH_64 - uint16_t pad; - uint32_t aba_ctr; -#else -#ifdef GPR_ARCH_32 - uint16_t aba_ctr; -#else -#error Unsupported bit width architecture -#endif -#endif -}; - -/* Use a union to make sure that these are in the same bits as an atm word */ -typedef union lockfree_node { - gpr_atm atm; - struct lockfree_node_contents contents; -} lockfree_node; - -/* make sure that entries aligned to 8-bytes */ -#define ENTRY_ALIGNMENT_BITS 3 -/* reserve this entry as invalid */ -#define INVALID_ENTRY_INDEX ((1 << 16) - 1) - -struct gpr_stack_lockfree { - lockfree_node *entries; - lockfree_node head; /* An atomic entry describing curr head */ -}; - -gpr_stack_lockfree *gpr_stack_lockfree_create(size_t entries) { - gpr_stack_lockfree *stack; - stack = (gpr_stack_lockfree *)gpr_malloc(sizeof(*stack)); - /* Since we only allocate 16 bits to represent an entry number, - * make sure that we are within the desired range */ - /* Reserve the highest entry number as a dummy */ - GPR_ASSERT(entries < INVALID_ENTRY_INDEX); - stack->entries = (lockfree_node *)gpr_malloc_aligned( - entries * sizeof(stack->entries[0]), ENTRY_ALIGNMENT_BITS); - /* Clear out all entries */ - memset(stack->entries, 0, entries * sizeof(stack->entries[0])); - memset(&stack->head, 0, sizeof(stack->head)); - - GPR_ASSERT(sizeof(stack->entries->atm) == sizeof(stack->entries->contents)); - - /* Point the head at reserved dummy entry */ - stack->head.contents.index = INVALID_ENTRY_INDEX; -/* Fill in the pad and aba_ctr to avoid confusing memcheck tools */ -#ifdef GPR_ARCH_64 - stack->head.contents.pad = 0; -#endif - stack->head.contents.aba_ctr = 0; - return stack; -} - -void gpr_stack_lockfree_destroy(gpr_stack_lockfree *stack) { - gpr_free_aligned(stack->entries); - gpr_free(stack); -} - -int gpr_stack_lockfree_push(gpr_stack_lockfree *stack, int entry) { - lockfree_node head; - lockfree_node newhead; - lockfree_node curent; - lockfree_node newent; - - /* First fill in the entry's index and aba ctr for new head */ - newhead.contents.index = (uint16_t)entry; -#ifdef GPR_ARCH_64 - /* Fill in the pad to avoid confusing memcheck tools */ - newhead.contents.pad = 0; -#endif - - /* Also post-increment the aba_ctr */ - curent.atm = gpr_atm_no_barrier_load(&stack->entries[entry].atm); - newhead.contents.aba_ctr = ++curent.contents.aba_ctr; - gpr_atm_no_barrier_store(&stack->entries[entry].atm, curent.atm); - - do { - /* Atomically get the existing head value for use */ - head.atm = gpr_atm_no_barrier_load(&(stack->head.atm)); - /* Point to it */ - newent.atm = gpr_atm_no_barrier_load(&stack->entries[entry].atm); - newent.contents.index = head.contents.index; - gpr_atm_no_barrier_store(&stack->entries[entry].atm, newent.atm); - } while (!gpr_atm_rel_cas(&(stack->head.atm), head.atm, newhead.atm)); - /* Use rel_cas above to make sure that entry index is set properly */ - return head.contents.index == INVALID_ENTRY_INDEX; -} - -int gpr_stack_lockfree_pop(gpr_stack_lockfree *stack) { - lockfree_node head; - lockfree_node newhead; - - do { - head.atm = gpr_atm_acq_load(&(stack->head.atm)); - if (head.contents.index == INVALID_ENTRY_INDEX) { - return -1; - } - newhead.atm = - gpr_atm_no_barrier_load(&(stack->entries[head.contents.index].atm)); - - } while (!gpr_atm_no_barrier_cas(&(stack->head.atm), head.atm, newhead.atm)); - - return head.contents.index; -} diff --git a/src/core/lib/support/stack_lockfree.h b/src/core/lib/support/stack_lockfree.h deleted file mode 100644 index 6324211b72..0000000000 --- a/src/core/lib/support/stack_lockfree.h +++ /dev/null @@ -1,38 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#ifndef GRPC_CORE_LIB_SUPPORT_STACK_LOCKFREE_H -#define GRPC_CORE_LIB_SUPPORT_STACK_LOCKFREE_H - -#include <stddef.h> - -typedef struct gpr_stack_lockfree gpr_stack_lockfree; - -/* This stack must specify the maximum number of entries to track. - The current implementation only allows up to 65534 entries */ -gpr_stack_lockfree *gpr_stack_lockfree_create(size_t entries); -void gpr_stack_lockfree_destroy(gpr_stack_lockfree *stack); - -/* Pass in a valid entry number for the next stack entry */ -/* Returns 1 if this is the first element on the stack, 0 otherwise */ -int gpr_stack_lockfree_push(gpr_stack_lockfree *, int entry); - -/* Returns -1 on empty or the actual entry number */ -int gpr_stack_lockfree_pop(gpr_stack_lockfree *stack); - -#endif /* GRPC_CORE_LIB_SUPPORT_STACK_LOCKFREE_H */ diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py index 48782174a7..c194004991 100644 --- a/src/python/grpcio/grpc_core_dependencies.py +++ b/src/python/grpcio/grpc_core_dependencies.py @@ -39,7 +39,6 @@ CORE_SOURCE_FILES = [ 'src/core/lib/support/log_windows.c', 'src/core/lib/support/mpscq.c', 'src/core/lib/support/murmur_hash.c', - 'src/core/lib/support/stack_lockfree.c', 'src/core/lib/support/string.c', 'src/core/lib/support/string_posix.c', 'src/core/lib/support/string_util_windows.c', |