diff options
Diffstat (limited to 'src/core')
76 files changed, 2344 insertions, 771 deletions
diff --git a/src/core/ext/filters/client_channel/client_channel.c b/src/core/ext/filters/client_channel/client_channel.c index ce9abdad61..24843d52e9 100644 --- a/src/core/ext/filters/client_channel/client_channel.c +++ b/src/core/ext/filters/client_channel/client_channel.c @@ -96,17 +96,10 @@ static void method_parameters_unref(method_parameters *method_params) { } } -static void *method_parameters_copy(void *value) { - return method_parameters_ref(value); -} - static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *value) { method_parameters_unref(value); } -static const grpc_slice_hash_table_vtable method_parameters_vtable = { - method_parameters_free, method_parameters_copy}; - static bool parse_wait_for_ready(grpc_json *field, wait_for_ready_value *wait_for_ready) { if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) { @@ -407,26 +400,24 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, GPR_ASSERT(channel_arg->type == GRPC_ARG_STRING); lb_policy_name = channel_arg->value.string; } - // Special case: If all of the addresses are balancer addresses, - // assume that we should use the grpclb policy, regardless of what the - // resolver actually specified. + // Special case: If at least one balancer address is present, we use + // the grpclb policy, regardless of what the resolver actually specified. channel_arg = grpc_channel_args_find(chand->resolver_result, GRPC_ARG_LB_ADDRESSES); if (channel_arg != NULL && channel_arg->type == GRPC_ARG_POINTER) { grpc_lb_addresses *addresses = channel_arg->value.pointer.p; - bool found_backend_address = false; + bool found_balancer_address = false; for (size_t i = 0; i < addresses->num_addresses; ++i) { - if (!addresses->addresses[i].is_balancer) { - found_backend_address = true; + if (addresses->addresses[i].is_balancer) { + found_balancer_address = true; break; } } - if (!found_backend_address) { + if (found_balancer_address) { if (lb_policy_name != NULL && strcmp(lb_policy_name, "grpclb") != 0) { gpr_log(GPR_INFO, - "resolver requested LB policy %s but provided only balancer " - "addresses, no backend addresses -- forcing use of grpclb LB " - "policy", + "resolver requested LB policy %s but provided at least one " + "balancer address -- forcing use of grpclb LB policy", lb_policy_name); } lb_policy_name = "grpclb"; @@ -472,7 +463,7 @@ static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, grpc_uri_destroy(uri); method_params_table = grpc_service_config_create_method_config_table( exec_ctx, service_config, method_parameters_create_from_json, - &method_parameters_vtable); + method_parameters_free); grpc_service_config_destroy(service_config); } } @@ -769,12 +760,6 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, #define CANCELLED_CALL ((grpc_subchannel_call *)1) -typedef enum { - /* zero so that it can be default-initialized */ - GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING = 0, - GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL -} subchannel_creation_phase; - /** Call data. Holds a pointer to grpc_subchannel_call and the associated machinery to create such a pointer. Handles queueing of stream ops until a call object is ready, waiting @@ -802,8 +787,9 @@ typedef struct client_channel_call_data { gpr_atm subchannel_call; gpr_arena *arena; - subchannel_creation_phase creation_phase; + bool pick_pending; grpc_connected_subchannel *connected_subchannel; + grpc_call_context_element subchannel_call_context[GRPC_CONTEXT_COUNT]; grpc_polling_entity *pollent; grpc_transport_stream_op_batch **waiting_ops; @@ -923,11 +909,10 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_call_element *elem = arg; call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - GPR_ASSERT(calld->creation_phase == - GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL); + GPR_ASSERT(calld->pick_pending); + calld->pick_pending = false; grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent, chand->interested_parties); - calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; if (calld->connected_subchannel == NULL) { gpr_atm_no_barrier_store(&calld->subchannel_call, 1); fail_locked(exec_ctx, calld, @@ -953,7 +938,8 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, .path = calld->path, .start_time = calld->call_start_time, .deadline = calld->deadline, - .arena = calld->arena}; + .arena = calld->arena, + .context = calld->subchannel_call_context}; grpc_error *new_error = grpc_connected_subchannel_create_call( exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); gpr_atm_rel_store(&calld->subchannel_call, @@ -982,6 +968,7 @@ typedef struct { grpc_metadata_batch *initial_metadata; uint32_t initial_metadata_flags; grpc_connected_subchannel **connected_subchannel; + grpc_call_context_element *subchannel_call_context; grpc_closure *on_ready; grpc_call_element *elem; grpc_closure closure; @@ -993,8 +980,8 @@ typedef struct { static bool pick_subchannel_locked( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, - grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready, - grpc_error *error); + grpc_connected_subchannel **connected_subchannel, + grpc_call_context_element *subchannel_call_context, grpc_closure *on_ready); static void continue_picking_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { @@ -1006,49 +993,49 @@ static void continue_picking_locked(grpc_exec_ctx *exec_ctx, void *arg, } else { if (pick_subchannel_locked(exec_ctx, cpa->elem, cpa->initial_metadata, cpa->initial_metadata_flags, - cpa->connected_subchannel, cpa->on_ready, - GRPC_ERROR_NONE)) { + cpa->connected_subchannel, + cpa->subchannel_call_context, cpa->on_ready)) { grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE); } } gpr_free(cpa); } +static void cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_error *error) { + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + if (chand->lb_policy != NULL) { + grpc_lb_policy_cancel_pick_locked(exec_ctx, chand->lb_policy, + &calld->connected_subchannel, + GRPC_ERROR_REF(error)); + } + for (grpc_closure *closure = chand->waiting_for_config_closures.head; + closure != NULL; closure = closure->next_data.next) { + continue_picking_args *cpa = closure->cb_arg; + if (cpa->connected_subchannel == &calld->connected_subchannel) { + cpa->connected_subchannel = NULL; + grpc_closure_sched(exec_ctx, cpa->on_ready, + GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( + "Pick cancelled", &error, 1)); + } + } + GRPC_ERROR_UNREF(error); +} + static bool pick_subchannel_locked( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, - grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready, - grpc_error *error) { + grpc_connected_subchannel **connected_subchannel, + grpc_call_context_element *subchannel_call_context, + grpc_closure *on_ready) { GPR_TIMER_BEGIN("pick_subchannel", 0); channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; - continue_picking_args *cpa; - grpc_closure *closure; GPR_ASSERT(connected_subchannel); - if (initial_metadata == NULL) { - if (chand->lb_policy != NULL) { - grpc_lb_policy_cancel_pick_locked(exec_ctx, chand->lb_policy, - connected_subchannel, - GRPC_ERROR_REF(error)); - } - for (closure = chand->waiting_for_config_closures.head; closure != NULL; - closure = closure->next_data.next) { - cpa = closure->cb_arg; - if (cpa->connected_subchannel == connected_subchannel) { - cpa->connected_subchannel = NULL; - grpc_closure_sched(exec_ctx, cpa->on_ready, - GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( - "Pick cancelled", &error, 1)); - } - } - GPR_TIMER_END("pick_subchannel", 0); - GRPC_ERROR_UNREF(error); - return true; - } - GPR_ASSERT(error == GRPC_ERROR_NONE); if (chand->lb_policy != NULL) { apply_final_configuration_locked(exec_ctx, elem); grpc_lb_policy *lb_policy = chand->lb_policy; @@ -1071,8 +1058,7 @@ static bool pick_subchannel_locked( } } const grpc_lb_policy_pick_args inputs = { - initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem, - gpr_inf_future(GPR_CLOCK_MONOTONIC)}; + initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem}; // Wrap the user-provided callback in order to hold a strong reference to // the LB policy for the duration of the pick. @@ -1085,8 +1071,8 @@ static bool pick_subchannel_locked( GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel_wrapping"); w_on_pick_arg->lb_policy = lb_policy; const bool pick_done = grpc_lb_policy_pick_locked( - exec_ctx, lb_policy, &inputs, connected_subchannel, NULL, - &w_on_pick_arg->wrapper_closure); + exec_ctx, lb_policy, &inputs, connected_subchannel, + subchannel_call_context, NULL, &w_on_pick_arg->wrapper_closure); if (pick_done) { /* synchronous grpc_lb_policy_pick call. Unref the LB policy. */ GRPC_LB_POLICY_UNREF(exec_ctx, w_on_pick_arg->lb_policy, @@ -1105,10 +1091,11 @@ static bool pick_subchannel_locked( &chand->on_resolver_result_changed); } if (chand->resolver != NULL) { - cpa = gpr_malloc(sizeof(*cpa)); + continue_picking_args *cpa = gpr_malloc(sizeof(*cpa)); cpa->initial_metadata = initial_metadata; cpa->initial_metadata_flags = initial_metadata_flags; cpa->connected_subchannel = connected_subchannel; + cpa->subchannel_call_context = subchannel_call_context; cpa->on_ready = on_ready; cpa->elem = elem; grpc_closure_init(&cpa->closure, continue_picking_locked, cpa, @@ -1160,16 +1147,13 @@ static void start_transport_stream_op_batch_locked_inner( error to the caller when the first op does get passed down. */ calld->cancel_error = GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error); - switch (calld->creation_phase) { - case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING: - fail_locked(exec_ctx, calld, - GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error)); - break; - case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL: - pick_subchannel_locked( - exec_ctx, elem, NULL, 0, &calld->connected_subchannel, NULL, - GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error)); - break; + if (calld->pick_pending) { + cancel_pick_locked( + exec_ctx, elem, + GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error)); + } else { + fail_locked(exec_ctx, calld, + GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error)); } grpc_transport_stream_op_batch_finish_with_failure( exec_ctx, op, @@ -1179,9 +1163,9 @@ static void start_transport_stream_op_batch_locked_inner( } } /* if we don't have a subchannel, try to get one */ - if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING && - calld->connected_subchannel == NULL && op->send_initial_metadata) { - calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL; + if (!calld->pick_pending && calld->connected_subchannel == NULL && + op->send_initial_metadata) { + calld->pick_pending = true; grpc_closure_init(&calld->next_step, subchannel_ready_locked, elem, grpc_combiner_scheduler(chand->combiner, true)); GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); @@ -1192,8 +1176,9 @@ static void start_transport_stream_op_batch_locked_inner( exec_ctx, elem, op->payload->send_initial_metadata.send_initial_metadata, op->payload->send_initial_metadata.send_initial_metadata_flags, - &calld->connected_subchannel, &calld->next_step, GRPC_ERROR_NONE)) { - calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; + &calld->connected_subchannel, calld->subchannel_call_context, + &calld->next_step)) { + calld->pick_pending = false; GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); } else { grpc_polling_entity_add_to_pollset_set(exec_ctx, calld->pollent, @@ -1201,15 +1186,15 @@ static void start_transport_stream_op_batch_locked_inner( } } /* if we've got a subchannel, then let's ask it to create a call */ - if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING && - calld->connected_subchannel != NULL) { + if (!calld->pick_pending && calld->connected_subchannel != NULL) { grpc_subchannel_call *subchannel_call = NULL; const grpc_connected_subchannel_call_args call_args = { .pollent = calld->pollent, .path = calld->path, .start_time = calld->call_start_time, .deadline = calld->deadline, - .arena = calld->arena}; + .arena = calld->arena, + .context = calld->subchannel_call_context}; grpc_error *error = grpc_connected_subchannel_create_call( exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); gpr_atm_rel_store(&calld->subchannel_call, @@ -1358,12 +1343,18 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, then_schedule_closure = NULL; GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call"); } - GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING); + GPR_ASSERT(!calld->pick_pending); GPR_ASSERT(calld->waiting_ops_count == 0); if (calld->connected_subchannel != NULL) { GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel, "picked"); } + for (size_t i = 0; i < GRPC_CONTEXT_COUNT; ++i) { + if (calld->subchannel_call_context[i].value != NULL) { + calld->subchannel_call_context[i].destroy( + calld->subchannel_call_context[i].value); + } + } gpr_free(calld->waiting_ops); grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); } @@ -1459,12 +1450,12 @@ static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg, void grpc_client_channel_watch_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_pollset *pollset, - grpc_connectivity_state *state, grpc_closure *on_complete) { + grpc_connectivity_state *state, grpc_closure *closure) { channel_data *chand = elem->channel_data; external_connectivity_watcher *w = gpr_malloc(sizeof(*w)); w->chand = chand; w->pollset = pollset; - w->on_complete = on_complete; + w->on_complete = closure; w->state = state; grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset); GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, diff --git a/src/core/ext/filters/client_channel/lb_policy.c b/src/core/ext/filters/client_channel/lb_policy.c index 2d31499d13..112ba40658 100644 --- a/src/core/ext/filters/client_channel/lb_policy.c +++ b/src/core/ext/filters/client_channel/lb_policy.c @@ -119,9 +119,10 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, int grpc_lb_policy_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const grpc_lb_policy_pick_args *pick_args, grpc_connected_subchannel **target, + grpc_call_context_element *context, void **user_data, grpc_closure *on_complete) { return policy->vtable->pick_locked(exec_ctx, policy, pick_args, target, - user_data, on_complete); + context, user_data, on_complete); } void grpc_lb_policy_cancel_pick_locked(grpc_exec_ctx *exec_ctx, diff --git a/src/core/ext/filters/client_channel/lb_policy.h b/src/core/ext/filters/client_channel/lb_policy.h index 25427666ae..184b2ef720 100644 --- a/src/core/ext/filters/client_channel/lb_policy.h +++ b/src/core/ext/filters/client_channel/lb_policy.h @@ -43,9 +43,6 @@ typedef struct grpc_lb_policy grpc_lb_policy; typedef struct grpc_lb_policy_vtable grpc_lb_policy_vtable; -typedef void (*grpc_lb_completion)(void *cb_arg, grpc_subchannel *subchannel, - grpc_status_code status, const char *errmsg); - struct grpc_lb_policy { const grpc_lb_policy_vtable *vtable; gpr_atm ref_pair; @@ -65,8 +62,6 @@ typedef struct grpc_lb_policy_pick_args { uint32_t initial_metadata_flags; /** Storage for LB token in \a initial_metadata, or NULL if not used */ grpc_linked_mdelem *lb_token_mdelem_storage; - /** Deadline for the call to the LB server */ - gpr_timespec deadline; } grpc_lb_policy_pick_args; struct grpc_lb_policy_vtable { @@ -76,7 +71,8 @@ struct grpc_lb_policy_vtable { /** \see grpc_lb_policy_pick */ int (*pick_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, void **user_data, + grpc_connected_subchannel **target, + grpc_call_context_element *context, void **user_data, grpc_closure *on_complete); /** \see grpc_lb_policy_cancel_pick */ @@ -156,6 +152,8 @@ void grpc_lb_policy_init(grpc_lb_policy *policy, \a target will be set to the selected subchannel, or NULL on failure. Upon success, \a user_data will be set to whatever opaque information may need to be propagated from the LB policy, or NULL if not needed. + \a context will be populated with context to pass to the subchannel + call, if needed. If the pick succeeds and a result is known immediately, a non-zero value will be returned. Otherwise, \a on_complete will be invoked @@ -167,6 +165,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy, int grpc_lb_policy_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const grpc_lb_policy_pick_args *pick_args, grpc_connected_subchannel **target, + grpc_call_context_element *context, void **user_data, grpc_closure *on_complete); /** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping) diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c new file mode 100644 index 0000000000..67baa46de7 --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.c @@ -0,0 +1,153 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h" + +#include <grpc/support/atm.h> +#include <grpc/support/log.h> + +#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h" +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/profiling/timers.h" + +static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_channel_element_args *args) { + return GRPC_ERROR_NONE; +} + +static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem) {} + +typedef struct { + // Stats object to update. + grpc_grpclb_client_stats *client_stats; + // State for intercepting send_initial_metadata. + grpc_closure on_complete_for_send; + grpc_closure *original_on_complete_for_send; + bool send_initial_metadata_succeeded; + // State for intercepting recv_initial_metadata. + grpc_closure recv_initial_metadata_ready; + grpc_closure *original_recv_initial_metadata_ready; + bool recv_initial_metadata_succeeded; +} call_data; + +static void on_complete_for_send(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + call_data *calld = arg; + if (error == GRPC_ERROR_NONE) { + calld->send_initial_metadata_succeeded = true; + } + grpc_closure_run(exec_ctx, calld->original_on_complete_for_send, + GRPC_ERROR_REF(error)); +} + +static void recv_initial_metadata_ready(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + call_data *calld = arg; + if (error == GRPC_ERROR_NONE) { + calld->recv_initial_metadata_succeeded = true; + } + grpc_closure_run(exec_ctx, calld->original_recv_initial_metadata_ready, + GRPC_ERROR_REF(error)); +} + +static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + const grpc_call_element_args *args) { + call_data *calld = elem->call_data; + // Get stats object from context and take a ref. + GPR_ASSERT(args->context != NULL); + GPR_ASSERT(args->context[GRPC_GRPCLB_CLIENT_STATS].value != NULL); + calld->client_stats = grpc_grpclb_client_stats_ref( + args->context[GRPC_GRPCLB_CLIENT_STATS].value); + // Record call started. + grpc_grpclb_client_stats_add_call_started(calld->client_stats); + return GRPC_ERROR_NONE; +} + +static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + const grpc_call_final_info *final_info, + grpc_closure *ignored) { + call_data *calld = elem->call_data; + // Record call finished, optionally setting client_failed_to_send and + // received. + grpc_grpclb_client_stats_add_call_finished( + false /* drop_for_rate_limiting */, false /* drop_for_load_balancing */, + !calld->send_initial_metadata_succeeded /* client_failed_to_send */, + calld->recv_initial_metadata_succeeded /* known_received */, + calld->client_stats); + // All done, so unref the stats object. + grpc_grpclb_client_stats_unref(calld->client_stats); +} + +static void start_transport_stream_op_batch( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_transport_stream_op_batch *batch) { + call_data *calld = elem->call_data; + GPR_TIMER_BEGIN("clr_start_transport_stream_op_batch", 0); + // Intercept send_initial_metadata. + if (batch->send_initial_metadata) { + calld->original_on_complete_for_send = batch->on_complete; + grpc_closure_init(&calld->on_complete_for_send, on_complete_for_send, calld, + grpc_schedule_on_exec_ctx); + batch->on_complete = &calld->on_complete_for_send; + } + // Intercept recv_initial_metadata. + if (batch->recv_initial_metadata) { + calld->original_recv_initial_metadata_ready = + batch->payload->recv_initial_metadata.recv_initial_metadata_ready; + grpc_closure_init(&calld->recv_initial_metadata_ready, + recv_initial_metadata_ready, calld, + grpc_schedule_on_exec_ctx); + batch->payload->recv_initial_metadata.recv_initial_metadata_ready = + &calld->recv_initial_metadata_ready; + } + // Chain to next filter. + grpc_call_next_op(exec_ctx, elem, batch); + GPR_TIMER_END("clr_start_transport_stream_op_batch", 0); +} + +const grpc_channel_filter grpc_client_load_reporting_filter = { + start_transport_stream_op_batch, + grpc_channel_next_op, + sizeof(call_data), + init_call_elem, + grpc_call_stack_ignore_set_pollset_or_pollset_set, + destroy_call_elem, + 0, // sizeof(channel_data) + init_channel_elem, + destroy_channel_elem, + grpc_call_next_get_peer, + grpc_channel_next_get_info, + "client_load_reporting"}; diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h new file mode 100644 index 0000000000..28b313d874 --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h @@ -0,0 +1,42 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_CLIENT_LOAD_REPORTING_FILTER_H +#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_CLIENT_LOAD_REPORTING_FILTER_H + +#include "src/core/lib/channel/channel_stack.h" + +extern const grpc_channel_filter grpc_client_load_reporting_filter; + +#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_CLIENT_LOAD_REPORTING_FILTER_H \ + */ diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c index a271d05ca8..695be4fdf2 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c @@ -95,8 +95,7 @@ headers. Therefore, sockaddr.h must always be included first */ #include "src/core/lib/iomgr/sockaddr.h" -#include <errno.h> - +#include <limits.h> #include <string.h> #include <grpc/byte_buffer_reader.h> @@ -108,13 +107,16 @@ #include "src/core/ext/filters/client_channel/client_channel.h" #include "src/core/ext/filters/client_channel/client_channel_factory.h" +#include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h" +#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h" #include "src/core/ext/filters/client_channel/lb_policy_factory.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" @@ -126,6 +128,7 @@ #include "src/core/lib/support/string.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel.h" +#include "src/core/lib/surface/channel_init.h" #include "src/core/lib/transport/static_metadata.h" #define GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS 20 @@ -147,6 +150,10 @@ static grpc_error *initial_metadata_add_lb_token( lb_token_mdelem_storage, lb_token); } +static void destroy_client_stats(void *arg) { + grpc_grpclb_client_stats_unref(arg); +} + typedef struct wrapped_rr_closure_arg { /* the closure instance using this struct as argument */ grpc_closure wrapper_closure; @@ -163,6 +170,13 @@ typedef struct wrapped_rr_closure_arg { * initial metadata */ grpc_connected_subchannel **target; + /* the context to be populated for the subchannel call */ + grpc_call_context_element *context; + + /* Stats for client-side load reporting. Note that this holds a + * reference, which must be either passed on via context or unreffed. */ + grpc_grpclb_client_stats *client_stats; + /* the LB token associated with the pick */ grpc_mdelem lb_token; @@ -202,6 +216,12 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, (void *)*wc_arg->target, (void *)wc_arg->rr_policy); abort(); } + // Pass on client stats via context. Passes ownership of the reference. + GPR_ASSERT(wc_arg->client_stats != NULL); + wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats; + wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats; + } else { + grpc_grpclb_client_stats_unref(wc_arg->client_stats); } if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Unreffing RR %p", (void *)wc_arg->rr_policy); @@ -237,6 +257,7 @@ typedef struct pending_pick { static void add_pending_pick(pending_pick **root, const grpc_lb_policy_pick_args *pick_args, grpc_connected_subchannel **target, + grpc_call_context_element *context, grpc_closure *on_complete) { pending_pick *pp = gpr_zalloc(sizeof(*pp)); pp->next = *root; @@ -244,6 +265,7 @@ static void add_pending_pick(pending_pick **root, pp->target = target; pp->wrapped_on_complete_arg.wrapped_closure = on_complete; pp->wrapped_on_complete_arg.target = target; + pp->wrapped_on_complete_arg.context = context; pp->wrapped_on_complete_arg.initial_metadata = pick_args->initial_metadata; pp->wrapped_on_complete_arg.lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; @@ -287,8 +309,8 @@ typedef struct glb_lb_policy { grpc_client_channel_factory *cc_factory; grpc_channel_args *args; - /** deadline for the LB's call */ - gpr_timespec deadline; + /** timeout in milliseconds for the LB call. 0 means no deadline. */ + int lb_call_timeout_ms; /** for communicating with the LB server */ grpc_channel *lb_channel; @@ -316,6 +338,10 @@ typedef struct glb_lb_policy { /************************************************************/ /* client data associated with the LB server communication */ /************************************************************/ + + /* Finished sending initial request. */ + grpc_closure lb_on_sent_initial_request; + /* Status from the LB server has been received. This signals the end of the LB * call. */ grpc_closure lb_on_server_status_received; @@ -348,6 +374,23 @@ typedef struct glb_lb_policy { /** LB call retry timer */ grpc_timer lb_call_retry_timer; + + bool initial_request_sent; + bool seen_initial_response; + + /* Stats for client-side load reporting. Should be unreffed and + * recreated whenever lb_call is replaced. */ + grpc_grpclb_client_stats *client_stats; + /* Interval and timer for next client load report. */ + gpr_timespec client_stats_report_interval; + grpc_timer client_load_report_timer; + bool client_load_report_timer_pending; + bool last_client_load_report_counters_were_zero; + /* Closure used for either the load report timer or the callback for + * completion of sending the load report. */ + grpc_closure client_load_report_closure; + /* Client load report message payload. */ + grpc_byte_buffer *client_load_report_payload; } glb_lb_policy; /* Keeps track and reacts to changes in connectivity of the RR instance */ @@ -552,8 +595,8 @@ static bool pick_from_internal_rr_locked( grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) { GPR_ASSERT(rr_policy != NULL); const bool pick_done = grpc_lb_policy_pick_locked( - exec_ctx, rr_policy, pick_args, target, (void **)&wc_arg->lb_token, - &wc_arg->wrapper_closure); + exec_ctx, rr_policy, pick_args, target, wc_arg->context, + (void **)&wc_arg->lb_token, &wc_arg->wrapper_closure); if (pick_done) { /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */ if (grpc_lb_glb_trace) { @@ -567,7 +610,12 @@ static bool pick_from_internal_rr_locked( pick_args->lb_token_mdelem_storage, GRPC_MDELEM_REF(wc_arg->lb_token)); - gpr_free(wc_arg); + // Pass on client stats via context. Passes ownership of the reference. + GPR_ASSERT(wc_arg->client_stats != NULL); + wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].value = wc_arg->client_stats; + wc_arg->context[GRPC_GRPCLB_CLIENT_STATS].destroy = destroy_client_stats; + + gpr_free(wc_arg->free_when_done); } /* else, the pending pick will be registered and taken care of by the * pending pick list inside the RR policy (glb_policy->rr_policy). @@ -690,6 +738,8 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, glb_policy->pending_picks = pp->next; GRPC_LB_POLICY_REF(glb_policy->rr_policy, "rr_handover_pending_pick"); pp->wrapped_on_complete_arg.rr_policy = glb_policy->rr_policy; + pp->wrapped_on_complete_arg.client_stats = + grpc_grpclb_client_stats_ref(glb_policy->client_stats); if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Pending pick about to PICK from 0x%" PRIxPTR "", (intptr_t)glb_policy->rr_policy); @@ -750,18 +800,11 @@ static void destroy_balancer_name(grpc_exec_ctx *exec_ctx, gpr_free(balancer_name); } -static void *copy_balancer_name(void *balancer_name) { - return gpr_strdup(balancer_name); -} - static grpc_slice_hash_table_entry targets_info_entry_create( const char *address, const char *balancer_name) { - static const grpc_slice_hash_table_vtable vtable = {destroy_balancer_name, - copy_balancer_name}; grpc_slice_hash_table_entry entry; entry.key = grpc_slice_from_copied_string(address); - entry.value = (void *)balancer_name; - entry.vtable = &vtable; + entry.value = gpr_strdup(balancer_name); return entry; } @@ -825,11 +868,8 @@ static char *get_lb_uri_target_addresses(grpc_exec_ctx *exec_ctx, uri_path); gpr_free(uri_path); - *targets_info = - grpc_slice_hash_table_create(num_grpclb_addrs, targets_info_entries); - for (size_t i = 0; i < num_grpclb_addrs; i++) { - grpc_slice_unref_internal(exec_ctx, targets_info_entries[i].key); - } + *targets_info = grpc_slice_hash_table_create( + num_grpclb_addrs, targets_info_entries, destroy_balancer_name); gpr_free(targets_info_entries); return target_uri_str; @@ -841,10 +881,10 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, /* Count the number of gRPC-LB addresses. There must be at least one. * TODO(roth): For now, we ignore non-balancer addresses, but in the * future, we may change the behavior such that we fall back to using - * the non-balancer addresses if we cannot reach any balancers. At that - * time, this should be changed to allow a list with no balancer addresses, - * since the resolver might fail to return a balancer address even when - * this is the right LB policy to use. */ + * the non-balancer addresses if we cannot reach any balancers. In the + * fallback case, we should use the LB policy indicated by + * GRPC_ARG_LB_POLICY_NAME (although if that specifies grpclb or is + * unset, we should default to pick_first). */ const grpc_arg *arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); if (arg == NULL || arg->type != GRPC_ARG_POINTER) { @@ -874,9 +914,22 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, grpc_uri_destroy(uri); glb_policy->cc_factory = args->client_channel_factory; - glb_policy->args = grpc_channel_args_copy(args->args); GPR_ASSERT(glb_policy->cc_factory != NULL); + arg = grpc_channel_args_find(args->args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS); + glb_policy->lb_call_timeout_ms = + grpc_channel_arg_get_integer(arg, (grpc_integer_options){0, 0, INT_MAX}); + + // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args, + // since we use this to trigger the client_load_reporting filter. + grpc_arg new_arg; + new_arg.key = GRPC_ARG_LB_POLICY_NAME; + new_arg.type = GRPC_ARG_STRING; + new_arg.value.string = "grpclb"; + static const char *args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME}; + glb_policy->args = grpc_channel_args_copy_and_add_and_remove( + args->args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1); + grpc_slice_hash_table *targets_info = NULL; /* Create a client channel over them to communicate with a LB service */ char *lb_service_target_addresses = @@ -890,6 +943,8 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, grpc_channel_args_destroy(exec_ctx, lb_channel_args); gpr_free(lb_service_target_addresses); if (glb_policy->lb_channel == NULL) { + gpr_free((void *)glb_policy->server_name); + grpc_channel_args_destroy(exec_ctx, glb_policy->args); gpr_free(glb_policy); return NULL; } @@ -905,6 +960,9 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { GPR_ASSERT(glb_policy->pending_pings == NULL); gpr_free((void *)glb_policy->server_name); grpc_channel_args_destroy(exec_ctx, glb_policy->args); + if (glb_policy->client_stats != NULL) { + grpc_grpclb_client_stats_unref(glb_policy->client_stats); + } grpc_channel_destroy(glb_policy->lb_channel); glb_policy->lb_channel = NULL; grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker); @@ -1021,7 +1079,8 @@ static void glb_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, void **user_data, + grpc_connected_subchannel **target, + grpc_call_context_element *context, void **user_data, grpc_closure *on_complete) { if (pick_args->lb_token_mdelem_storage == NULL) { *target = NULL; @@ -1033,7 +1092,6 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } glb_lb_policy *glb_policy = (glb_lb_policy *)pol; - glb_policy->deadline = pick_args->deadline; bool pick_done; if (glb_policy->rr_policy != NULL) { @@ -1049,6 +1107,10 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_schedule_on_exec_ctx); wc_arg->rr_policy = glb_policy->rr_policy; wc_arg->target = target; + wc_arg->context = context; + GPR_ASSERT(glb_policy->client_stats != NULL); + wc_arg->client_stats = + grpc_grpclb_client_stats_ref(glb_policy->client_stats); wc_arg->wrapped_closure = on_complete; wc_arg->lb_token_mdelem_storage = pick_args->lb_token_mdelem_storage; wc_arg->initial_metadata = pick_args->initial_metadata; @@ -1062,7 +1124,7 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, "picks", (void *)(glb_policy)); } - add_pending_pick(&glb_policy->pending_picks, pick_args, target, + add_pending_pick(&glb_policy->pending_picks, pick_args, target, context, on_complete); if (!glb_policy->started_picking) { @@ -1103,6 +1165,104 @@ static void glb_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx, exec_ctx, &glb_policy->state_tracker, current, notify); } +static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); + +static void schedule_next_client_load_report(grpc_exec_ctx *exec_ctx, + glb_lb_policy *glb_policy) { + const gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + const gpr_timespec next_client_load_report_time = + gpr_time_add(now, glb_policy->client_stats_report_interval); + grpc_closure_init(&glb_policy->client_load_report_closure, + send_client_load_report_locked, glb_policy, + grpc_combiner_scheduler(glb_policy->base.combiner, false)); + grpc_timer_init(exec_ctx, &glb_policy->client_load_report_timer, + next_client_load_report_time, + &glb_policy->client_load_report_closure, now); +} + +static void client_load_report_done_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + glb_lb_policy *glb_policy = arg; + grpc_byte_buffer_destroy(glb_policy->client_load_report_payload); + glb_policy->client_load_report_payload = NULL; + if (error != GRPC_ERROR_NONE || glb_policy->lb_call == NULL) { + glb_policy->client_load_report_timer_pending = false; + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + "client_load_report"); + return; + } + schedule_next_client_load_report(exec_ctx, glb_policy); +} + +static void do_send_client_load_report_locked(grpc_exec_ctx *exec_ctx, + glb_lb_policy *glb_policy) { + grpc_op op; + memset(&op, 0, sizeof(op)); + op.op = GRPC_OP_SEND_MESSAGE; + op.data.send_message.send_message = glb_policy->client_load_report_payload; + grpc_closure_init(&glb_policy->client_load_report_closure, + client_load_report_done_locked, glb_policy, + grpc_combiner_scheduler(glb_policy->base.combiner, false)); + grpc_call_error call_error = grpc_call_start_batch_and_execute( + exec_ctx, glb_policy->lb_call, &op, 1, + &glb_policy->client_load_report_closure); + GPR_ASSERT(GRPC_CALL_OK == call_error); +} + +static bool load_report_counters_are_zero(grpc_grpclb_request *request) { + return request->client_stats.num_calls_started == 0 && + request->client_stats.num_calls_finished == 0 && + request->client_stats.num_calls_finished_with_drop_for_rate_limiting == + 0 && + request->client_stats + .num_calls_finished_with_drop_for_load_balancing == 0 && + request->client_stats.num_calls_finished_with_client_failed_to_send == + 0 && + request->client_stats.num_calls_finished_known_received == 0; +} + +static void send_client_load_report_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + glb_lb_policy *glb_policy = arg; + if (error == GRPC_ERROR_CANCELLED || glb_policy->lb_call == NULL) { + glb_policy->client_load_report_timer_pending = false; + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + "client_load_report"); + return; + } + // Construct message payload. + GPR_ASSERT(glb_policy->client_load_report_payload == NULL); + grpc_grpclb_request *request = + grpc_grpclb_load_report_request_create(glb_policy->client_stats); + // Skip client load report if the counters were all zero in the last + // report and they are still zero in this one. + if (load_report_counters_are_zero(request)) { + if (glb_policy->last_client_load_report_counters_were_zero) { + grpc_grpclb_request_destroy(request); + schedule_next_client_load_report(exec_ctx, glb_policy); + return; + } + glb_policy->last_client_load_report_counters_were_zero = true; + } else { + glb_policy->last_client_load_report_counters_were_zero = false; + } + grpc_slice request_payload_slice = grpc_grpclb_request_encode(request); + glb_policy->client_load_report_payload = + grpc_raw_byte_buffer_create(&request_payload_slice, 1); + grpc_slice_unref_internal(exec_ctx, request_payload_slice); + grpc_grpclb_request_destroy(request); + // If we've already sent the initial request, then we can go ahead and + // sent the load report. Otherwise, we need to wait until the initial + // request has been sent to send this + // (see lb_on_sent_initial_request_locked() below). + if (glb_policy->initial_request_sent) { + do_send_client_load_report_locked(exec_ctx, glb_policy); + } +} + +static void lb_on_sent_initial_request_locked(grpc_exec_ctx *exec_ctx, + void *arg, grpc_error *error); static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, @@ -1117,13 +1277,24 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, * glb_policy->base.interested_parties, which is comprised of the polling * entities from \a client_channel. */ grpc_slice host = grpc_slice_from_copied_string(glb_policy->server_name); + gpr_timespec deadline = + glb_policy->lb_call_timeout_ms == 0 + ? gpr_inf_future(GPR_CLOCK_MONOTONIC) + : gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_millis(glb_policy->lb_call_timeout_ms, + GPR_TIMESPAN)); glb_policy->lb_call = grpc_channel_create_pollset_set_call( exec_ctx, glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS, glb_policy->base.interested_parties, GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD, - &host, glb_policy->deadline, NULL); + &host, deadline, NULL); grpc_slice_unref_internal(exec_ctx, host); + if (glb_policy->client_stats != NULL) { + grpc_grpclb_client_stats_unref(glb_policy->client_stats); + } + glb_policy->client_stats = grpc_grpclb_client_stats_create(); + grpc_metadata_array_init(&glb_policy->lb_initial_metadata_recv); grpc_metadata_array_init(&glb_policy->lb_trailing_metadata_recv); @@ -1135,6 +1306,9 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, grpc_slice_unref_internal(exec_ctx, request_payload_slice); grpc_grpclb_request_destroy(request); + grpc_closure_init(&glb_policy->lb_on_sent_initial_request, + lb_on_sent_initial_request_locked, glb_policy, + grpc_combiner_scheduler(glb_policy->base.combiner, false)); grpc_closure_init(&glb_policy->lb_on_server_status_received, lb_on_server_status_received_locked, glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner, false)); @@ -1148,6 +1322,10 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, GRPC_GRPCLB_RECONNECT_JITTER, GRPC_GRPCLB_MIN_CONNECT_TIMEOUT_SECONDS * 1000, GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS * 1000); + + glb_policy->initial_request_sent = false; + glb_policy->seen_initial_response = false; + glb_policy->last_client_load_report_counters_were_zero = false; } static void lb_call_destroy_locked(grpc_exec_ctx *exec_ctx, @@ -1161,6 +1339,10 @@ static void lb_call_destroy_locked(grpc_exec_ctx *exec_ctx, grpc_byte_buffer_destroy(glb_policy->lb_request_payload); grpc_slice_unref_internal(exec_ctx, glb_policy->lb_call_status_details); + + if (!glb_policy->client_load_report_timer_pending) { + grpc_timer_cancel(exec_ctx, &glb_policy->client_load_report_timer); + } } /* @@ -1189,21 +1371,27 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, op->flags = 0; op->reserved = NULL; op++; - op->op = GRPC_OP_RECV_INITIAL_METADATA; op->data.recv_initial_metadata.recv_initial_metadata = &glb_policy->lb_initial_metadata_recv; op->flags = 0; op->reserved = NULL; op++; - GPR_ASSERT(glb_policy->lb_request_payload != NULL); op->op = GRPC_OP_SEND_MESSAGE; op->data.send_message.send_message = glb_policy->lb_request_payload; op->flags = 0; op->reserved = NULL; op++; + /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref + * count goes to zero) to be unref'd in lb_on_sent_initial_request_locked() */ + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_server_status_received"); + call_error = grpc_call_start_batch_and_execute( + exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), + &glb_policy->lb_on_sent_initial_request); + GPR_ASSERT(GRPC_CALL_OK == call_error); + op = ops; op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; op->data.recv_status_on_client.trailing_metadata = &glb_policy->lb_trailing_metadata_recv; @@ -1235,6 +1423,19 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, GPR_ASSERT(GRPC_CALL_OK == call_error); } +static void lb_on_sent_initial_request_locked(grpc_exec_ctx *exec_ctx, + void *arg, grpc_error *error) { + glb_lb_policy *glb_policy = arg; + glb_policy->initial_request_sent = true; + // If we attempted to send a client load report before the initial + // request was sent, send the load report now. + if (glb_policy->client_load_report_payload != NULL) { + do_send_client_load_report_locked(exec_ctx, glb_policy); + } + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + "lb_on_response_received_locked"); +} + static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { glb_lb_policy *glb_policy = arg; @@ -1250,58 +1451,91 @@ static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_byte_buffer_reader_init(&bbr, glb_policy->lb_response_payload); grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); grpc_byte_buffer_destroy(glb_policy->lb_response_payload); - grpc_grpclb_serverlist *serverlist = - grpc_grpclb_response_parse_serverlist(response_slice); - if (serverlist != NULL) { - GPR_ASSERT(glb_policy->lb_call != NULL); - grpc_slice_unref_internal(exec_ctx, response_slice); - if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, "Serverlist with %lu servers received", - (unsigned long)serverlist->num_servers); - for (size_t i = 0; i < serverlist->num_servers; ++i) { - grpc_resolved_address addr; - parse_server(serverlist->servers[i], &addr); - char *ipport; - grpc_sockaddr_to_string(&ipport, &addr, false); - gpr_log(GPR_INFO, "Serverlist[%lu]: %s", (unsigned long)i, ipport); - gpr_free(ipport); + + grpc_grpclb_initial_response *response = NULL; + if (!glb_policy->seen_initial_response && + (response = grpc_grpclb_initial_response_parse(response_slice)) != + NULL) { + if (response->has_client_stats_report_interval) { + glb_policy->client_stats_report_interval = + gpr_time_max(gpr_time_from_seconds(1, GPR_TIMESPAN), + grpc_grpclb_duration_to_timespec( + &response->client_stats_report_interval)); + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, + "received initial LB response message; " + "client load reporting interval = %" PRId64 ".%09d sec", + glb_policy->client_stats_report_interval.tv_sec, + glb_policy->client_stats_report_interval.tv_nsec); } + /* take a weak ref (won't prevent calling of \a glb_shutdown() if the + * strong ref count goes to zero) to be unref'd in + * send_client_load_report() */ + glb_policy->client_load_report_timer_pending = true; + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "client_load_report"); + schedule_next_client_load_report(exec_ctx, glb_policy); + } else if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, + "received initial LB response message; " + "client load reporting NOT enabled"); } + grpc_grpclb_initial_response_destroy(response); + glb_policy->seen_initial_response = true; + } else { + grpc_grpclb_serverlist *serverlist = + grpc_grpclb_response_parse_serverlist(response_slice); + if (serverlist != NULL) { + GPR_ASSERT(glb_policy->lb_call != NULL); + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, "Serverlist with %lu servers received", + (unsigned long)serverlist->num_servers); + for (size_t i = 0; i < serverlist->num_servers; ++i) { + grpc_resolved_address addr; + parse_server(serverlist->servers[i], &addr); + char *ipport; + grpc_sockaddr_to_string(&ipport, &addr, false); + gpr_log(GPR_INFO, "Serverlist[%lu]: %s", (unsigned long)i, ipport); + gpr_free(ipport); + } + } - /* update serverlist */ - if (serverlist->num_servers > 0) { - if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, serverlist)) { + /* update serverlist */ + if (serverlist->num_servers > 0) { + if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, + serverlist)) { + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, + "Incoming server list identical to current, ignoring."); + } + grpc_grpclb_destroy_serverlist(serverlist); + } else { /* new serverlist */ + if (glb_policy->serverlist != NULL) { + /* dispose of the old serverlist */ + grpc_grpclb_destroy_serverlist(glb_policy->serverlist); + } + /* and update the copy in the glb_lb_policy instance. This + * serverlist instance will be destroyed either upon the next + * update or in glb_destroy() */ + glb_policy->serverlist = serverlist; + + rr_handover_locked(exec_ctx, glb_policy); + } + } else { if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, - "Incoming server list identical to current, ignoring."); + "Received empty server list. Picks will stay pending until " + "a response with > 0 servers is received"); } grpc_grpclb_destroy_serverlist(serverlist); - } else { /* new serverlist */ - if (glb_policy->serverlist != NULL) { - /* dispose of the old serverlist */ - grpc_grpclb_destroy_serverlist(glb_policy->serverlist); - } - /* and update the copy in the glb_lb_policy instance. This serverlist - * instance will be destroyed either upon the next update or in - * glb_destroy() */ - glb_policy->serverlist = serverlist; - - rr_handover_locked(exec_ctx, glb_policy); - } - } else { - if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, - "Received empty server list. Picks will stay pending until a " - "response with > 0 servers is received"); } - grpc_grpclb_destroy_serverlist(glb_policy->serverlist); + } else { /* serverlist == NULL */ + gpr_log(GPR_ERROR, "Invalid LB response received: '%s'. Ignoring.", + grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX)); } - } else { /* serverlist == NULL */ - gpr_log(GPR_ERROR, "Invalid LB response received: '%s'. Ignoring.", - grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX)); - grpc_slice_unref_internal(exec_ctx, response_slice); } + grpc_slice_unref_internal(exec_ctx, response_slice); + if (!glb_policy->shutting_down) { /* keep listening for serverlist updates */ op->op = GRPC_OP_RECV_MESSAGE; @@ -1413,9 +1647,29 @@ grpc_lb_policy_factory *grpc_glb_lb_factory_create() { } /* Plugin registration */ + +// Only add client_load_reporting filter if the grpclb LB policy is used. +static bool maybe_add_client_load_reporting_filter( + grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder, void *arg) { + const grpc_channel_args *args = + grpc_channel_stack_builder_get_channel_arguments(builder); + const grpc_arg *channel_arg = + grpc_channel_args_find(args, GRPC_ARG_LB_POLICY_NAME); + if (channel_arg != NULL && channel_arg->type == GRPC_ARG_STRING && + strcmp(channel_arg->value.string, "grpclb") == 0) { + return grpc_channel_stack_builder_append_filter( + builder, (const grpc_channel_filter *)arg, NULL, NULL); + } + return true; +} + void grpc_lb_policy_grpclb_init() { grpc_register_lb_policy(grpc_glb_lb_factory_create()); grpc_register_tracer("glb", &grpc_lb_glb_trace); + grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, + GRPC_CHANNEL_INIT_BUILTIN_PRIORITY, + maybe_add_client_load_reporting_filter, + (void *)&grpc_client_load_reporting_filter); } void grpc_lb_policy_grpclb_shutdown() {} diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c new file mode 100644 index 0000000000..444c03b9aa --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.c @@ -0,0 +1,133 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h" + +#include <grpc/support/alloc.h> +#include <grpc/support/atm.h> +#include <grpc/support/sync.h> +#include <grpc/support/useful.h> + +#include "src/core/lib/channel/channel_args.h" + +#define GRPC_ARG_GRPCLB_CLIENT_STATS "grpc.grpclb_client_stats" + +struct grpc_grpclb_client_stats { + gpr_refcount refs; + gpr_atm num_calls_started; + gpr_atm num_calls_finished; + gpr_atm num_calls_finished_with_drop_for_rate_limiting; + gpr_atm num_calls_finished_with_drop_for_load_balancing; + gpr_atm num_calls_finished_with_client_failed_to_send; + gpr_atm num_calls_finished_known_received; +}; + +grpc_grpclb_client_stats* grpc_grpclb_client_stats_create() { + grpc_grpclb_client_stats* client_stats = gpr_zalloc(sizeof(*client_stats)); + gpr_ref_init(&client_stats->refs, 1); + return client_stats; +} + +grpc_grpclb_client_stats* grpc_grpclb_client_stats_ref( + grpc_grpclb_client_stats* client_stats) { + gpr_ref(&client_stats->refs); + return client_stats; +} + +void grpc_grpclb_client_stats_unref(grpc_grpclb_client_stats* client_stats) { + if (gpr_unref(&client_stats->refs)) { + gpr_free(client_stats); + } +} + +void grpc_grpclb_client_stats_add_call_started( + grpc_grpclb_client_stats* client_stats) { + gpr_atm_full_fetch_add(&client_stats->num_calls_started, (gpr_atm)1); +} + +void grpc_grpclb_client_stats_add_call_finished( + bool finished_with_drop_for_rate_limiting, + bool finished_with_drop_for_load_balancing, + bool finished_with_client_failed_to_send, bool finished_known_received, + grpc_grpclb_client_stats* client_stats) { + gpr_atm_full_fetch_add(&client_stats->num_calls_finished, (gpr_atm)1); + if (finished_with_drop_for_rate_limiting) { + gpr_atm_full_fetch_add( + &client_stats->num_calls_finished_with_drop_for_rate_limiting, + (gpr_atm)1); + } + if (finished_with_drop_for_load_balancing) { + gpr_atm_full_fetch_add( + &client_stats->num_calls_finished_with_drop_for_load_balancing, + (gpr_atm)1); + } + if (finished_with_client_failed_to_send) { + gpr_atm_full_fetch_add( + &client_stats->num_calls_finished_with_client_failed_to_send, + (gpr_atm)1); + } + if (finished_known_received) { + gpr_atm_full_fetch_add(&client_stats->num_calls_finished_known_received, + (gpr_atm)1); + } +} + +static void atomic_get_and_reset_counter(int64_t* value, gpr_atm* counter) { + *value = (int64_t)gpr_atm_acq_load(counter); + gpr_atm_full_fetch_add(counter, (gpr_atm)(-*value)); +} + +void grpc_grpclb_client_stats_get( + grpc_grpclb_client_stats* client_stats, int64_t* num_calls_started, + int64_t* num_calls_finished, + int64_t* num_calls_finished_with_drop_for_rate_limiting, + int64_t* num_calls_finished_with_drop_for_load_balancing, + int64_t* num_calls_finished_with_client_failed_to_send, + int64_t* num_calls_finished_known_received) { + atomic_get_and_reset_counter(num_calls_started, + &client_stats->num_calls_started); + atomic_get_and_reset_counter(num_calls_finished, + &client_stats->num_calls_finished); + atomic_get_and_reset_counter( + num_calls_finished_with_drop_for_rate_limiting, + &client_stats->num_calls_finished_with_drop_for_rate_limiting); + atomic_get_and_reset_counter( + num_calls_finished_with_drop_for_load_balancing, + &client_stats->num_calls_finished_with_drop_for_load_balancing); + atomic_get_and_reset_counter( + num_calls_finished_with_client_failed_to_send, + &client_stats->num_calls_finished_with_client_failed_to_send); + atomic_get_and_reset_counter( + num_calls_finished_known_received, + &client_stats->num_calls_finished_known_received); +} diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h new file mode 100644 index 0000000000..0af4a919f8 --- /dev/null +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h @@ -0,0 +1,65 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CLIENT_STATS_H +#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CLIENT_STATS_H + +#include <stdbool.h> + +#include <grpc/impl/codegen/grpc_types.h> + +typedef struct grpc_grpclb_client_stats grpc_grpclb_client_stats; + +grpc_grpclb_client_stats* grpc_grpclb_client_stats_create(); +grpc_grpclb_client_stats* grpc_grpclb_client_stats_ref( + grpc_grpclb_client_stats* client_stats); +void grpc_grpclb_client_stats_unref(grpc_grpclb_client_stats* client_stats); + +void grpc_grpclb_client_stats_add_call_started( + grpc_grpclb_client_stats* client_stats); +void grpc_grpclb_client_stats_add_call_finished( + bool finished_with_drop_for_rate_limiting, + bool finished_with_drop_for_load_balancing, + bool finished_with_client_failed_to_send, bool finished_known_received, + grpc_grpclb_client_stats* client_stats); + +void grpc_grpclb_client_stats_get( + grpc_grpclb_client_stats* client_stats, int64_t* num_calls_started, + int64_t* num_calls_finished, + int64_t* num_calls_finished_with_drop_for_rate_limiting, + int64_t* num_calls_finished_with_drop_for_load_balancing, + int64_t* num_calls_finished_with_client_failed_to_send, + int64_t* num_calls_finished_known_received); + +#endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_GRPCLB_GRPCLB_CLIENT_STATS_H \ + */ diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c index 10af252531..81b6932fae 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.c @@ -80,15 +80,45 @@ static bool decode_serverlist(pb_istream_t *stream, const pb_field_t *field, grpc_grpclb_request *grpc_grpclb_request_create(const char *lb_service_name) { grpc_grpclb_request *req = gpr_malloc(sizeof(grpc_grpclb_request)); - - req->has_client_stats = 0; /* TODO(dgq): add support for stats once defined */ - req->has_initial_request = 1; - req->initial_request.has_name = 1; + req->has_client_stats = false; + req->has_initial_request = true; + req->initial_request.has_name = true; strncpy(req->initial_request.name, lb_service_name, GRPC_GRPCLB_SERVICE_NAME_MAX_LENGTH); return req; } +static void populate_timestamp(gpr_timespec timestamp, + struct _grpc_lb_v1_Timestamp *timestamp_pb) { + timestamp_pb->has_seconds = true; + timestamp_pb->seconds = timestamp.tv_sec; + timestamp_pb->has_nanos = true; + timestamp_pb->nanos = timestamp.tv_nsec; +} + +grpc_grpclb_request *grpc_grpclb_load_report_request_create( + grpc_grpclb_client_stats *client_stats) { + grpc_grpclb_request *req = gpr_zalloc(sizeof(grpc_grpclb_request)); + req->has_client_stats = true; + req->client_stats.has_timestamp = true; + populate_timestamp(gpr_now(GPR_CLOCK_REALTIME), &req->client_stats.timestamp); + req->client_stats.has_num_calls_started = true; + req->client_stats.has_num_calls_finished = true; + req->client_stats.has_num_calls_finished_with_drop_for_rate_limiting = true; + req->client_stats.has_num_calls_finished_with_drop_for_load_balancing = true; + req->client_stats.has_num_calls_finished_with_client_failed_to_send = true; + req->client_stats.has_num_calls_finished_with_client_failed_to_send = true; + req->client_stats.has_num_calls_finished_known_received = true; + grpc_grpclb_client_stats_get( + client_stats, &req->client_stats.num_calls_started, + &req->client_stats.num_calls_finished, + &req->client_stats.num_calls_finished_with_drop_for_rate_limiting, + &req->client_stats.num_calls_finished_with_drop_for_load_balancing, + &req->client_stats.num_calls_finished_with_client_failed_to_send, + &req->client_stats.num_calls_finished_known_received); + return req; +} + grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request) { size_t encoded_length; pb_ostream_t sizestream; @@ -98,7 +128,7 @@ grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request) { pb_encode(&sizestream, grpc_lb_v1_LoadBalanceRequest_fields, request); encoded_length = sizestream.bytes_written; - slice = grpc_slice_malloc(encoded_length); + slice = GRPC_SLICE_MALLOC(encoded_length); outputstream = pb_ostream_from_buffer(GRPC_SLICE_START_PTR(slice), encoded_length); GPR_ASSERT(pb_encode(&outputstream, grpc_lb_v1_LoadBalanceRequest_fields, @@ -122,6 +152,9 @@ grpc_grpclb_initial_response *grpc_grpclb_initial_response_parse( gpr_log(GPR_ERROR, "nanopb error: %s", PB_GET_ERROR(&stream)); return NULL; } + + if (!res.has_initial_response) return NULL; + grpc_grpclb_initial_response *initial_res = gpr_malloc(sizeof(grpc_grpclb_initial_response)); memcpy(initial_res, &res.initial_response, @@ -243,6 +276,15 @@ int grpc_grpclb_duration_compare(const grpc_grpclb_duration *lhs, return 0; } +gpr_timespec grpc_grpclb_duration_to_timespec( + grpc_grpclb_duration *duration_pb) { + gpr_timespec duration; + duration.tv_sec = duration_pb->has_seconds ? duration_pb->seconds : 0; + duration.tv_nsec = duration_pb->has_nanos ? duration_pb->nanos : 0; + duration.clock_type = GPR_TIMESPAN; + return duration; +} + void grpc_grpclb_initial_response_destroy( grpc_grpclb_initial_response *response) { gpr_free(response); diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h index d014b8800c..06873821bd 100644 --- a/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h +++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h @@ -36,6 +36,7 @@ #include <grpc/slice_buffer.h> +#include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h" #include "src/core/ext/filters/client_channel/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h" #include "src/core/ext/filters/client_channel/lb_policy_factory.h" @@ -58,6 +59,8 @@ typedef struct grpc_grpclb_serverlist { /** Create a request for a gRPC LB service under \a lb_service_name */ grpc_grpclb_request *grpc_grpclb_request_create(const char *lb_service_name); +grpc_grpclb_request *grpc_grpclb_load_report_request_create( + grpc_grpclb_client_stats *client_stats); /** Protocol Buffers v3-encode \a request */ grpc_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request); @@ -93,6 +96,9 @@ void grpc_grpclb_destroy_serverlist(grpc_grpclb_serverlist *serverlist); int grpc_grpclb_duration_compare(const grpc_grpclb_duration *lhs, const grpc_grpclb_duration *rhs); +gpr_timespec grpc_grpclb_duration_to_timespec( + grpc_grpclb_duration *duration_pb); + /** Destroy \a initial_response */ void grpc_grpclb_initial_response_destroy( grpc_grpclb_initial_response *response); diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c index 2b77cd39b8..b1c5dfc61c 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c @@ -189,7 +189,8 @@ static void pf_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, void **user_data, + grpc_connected_subchannel **target, + grpc_call_context_element *context, void **user_data, grpc_closure *on_complete) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c index ff41e61b3e..4c17f9c082 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c @@ -414,7 +414,8 @@ static void rr_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, void **user_data, + grpc_connected_subchannel **target, + grpc_call_context_element *context, void **user_data, grpc_closure *on_complete) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; diff --git a/src/core/ext/filters/client_channel/subchannel.c b/src/core/ext/filters/client_channel/subchannel.c index 967e571221..1af3393a62 100644 --- a/src/core/ext/filters/client_channel/subchannel.c +++ b/src/core/ext/filters/client_channel/subchannel.c @@ -615,7 +615,7 @@ void grpc_connected_subchannel_ping(grpc_exec_ctx *exec_ctx, elem->filter->start_transport_op(exec_ctx, elem, op); } -static void publish_transport_locked(grpc_exec_ctx *exec_ctx, +static bool publish_transport_locked(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { grpc_connected_subchannel *con; grpc_channel_stack *stk; @@ -631,15 +631,16 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx, if (!grpc_channel_init_create_stack(exec_ctx, builder, GRPC_CLIENT_SUBCHANNEL)) { grpc_channel_stack_builder_destroy(exec_ctx, builder); - abort(); /* TODO(ctiller): what to do here (previously we just crashed) */ + return false; } grpc_error *error = grpc_channel_stack_builder_finish( exec_ctx, builder, 0, 1, connection_destroy, NULL, (void **)&con); if (error != GRPC_ERROR_NONE) { + grpc_transport_destroy(exec_ctx, c->connecting_result.transport); gpr_log(GPR_ERROR, "error initializing subchannel stack: %s", grpc_error_string(error)); GRPC_ERROR_UNREF(error); - abort(); /* TODO(ctiller): what to do here? */ + return false; } stk = CHANNEL_STACK_FROM_CONNECTION(con); memset(&c->connecting_result, 0, sizeof(c->connecting_result)); @@ -656,7 +657,7 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx, grpc_channel_stack_destroy(exec_ctx, stk); gpr_free(con); GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); - return; + return false; } /* publish */ @@ -678,6 +679,7 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx, /* signal completion */ grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "connected"); + return true; } static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, @@ -688,8 +690,9 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, GRPC_SUBCHANNEL_WEAK_REF(c, "connected"); gpr_mu_lock(&c->mu); c->connecting = false; - if (c->connecting_result.transport != NULL) { - publish_transport_locked(exec_ctx, c); + if (c->connecting_result.transport != NULL && + publish_transport_locked(exec_ctx, c)) { + /* do nothing, transport was published */ } else if (c->disconnected) { GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); } else { @@ -778,7 +781,7 @@ grpc_error *grpc_connected_subchannel_create_call( (*call)->connection = GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call"); const grpc_call_element_args call_args = {.call_stack = callstk, .server_transport_data = NULL, - .context = NULL, + .context = args->context, .path = args->path, .start_time = args->start_time, .deadline = args->deadline, diff --git a/src/core/ext/filters/client_channel/subchannel.h b/src/core/ext/filters/client_channel/subchannel.h index 6473de49b0..e433c33e40 100644 --- a/src/core/ext/filters/client_channel/subchannel.h +++ b/src/core/ext/filters/client_channel/subchannel.h @@ -119,6 +119,7 @@ typedef struct { gpr_timespec start_time; gpr_timespec deadline; gpr_arena *arena; + grpc_call_context_element *context; } grpc_connected_subchannel_call_args; grpc_error *grpc_connected_subchannel_create_call( diff --git a/src/core/ext/filters/http/client/http_client_filter.c b/src/core/ext/filters/http/client/http_client_filter.c index bf5fbc7da7..d8ae080eee 100644 --- a/src/core/ext/filters/http/client/http_client_filter.c +++ b/src/core/ext/filters/http/client/http_client_filter.c @@ -323,7 +323,7 @@ static grpc_error *hc_mutate_op(grpc_exec_ctx *exec_ctx, estimated_len += grpc_base64_estimate_encoded_size( op->payload->send_message.send_message->length, k_url_safe, k_multi_line); - grpc_slice path_with_query_slice = grpc_slice_malloc(estimated_len); + grpc_slice path_with_query_slice = GRPC_SLICE_MALLOC(estimated_len); /* memcopy individual pieces into this slice */ uint8_t *write_ptr = diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.c b/src/core/ext/filters/http/message_compress/message_compress_filter.c index f414a60eee..1da8cf69cb 100644 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.c +++ b/src/core/ext/filters/http/message_compress/message_compress_filter.c @@ -49,8 +49,6 @@ #include "src/core/lib/support/string.h" #include "src/core/lib/transport/static_metadata.h" -int grpc_compression_trace = 0; - #define INITIAL_METADATA_UNSEEN 0 #define HAS_COMPRESSION_ALGORITHM 2 #define NO_COMPRESSION_ALGORITHM 4 @@ -279,13 +277,10 @@ static void compress_start_transport_stream_op_batch( GPR_TIMER_BEGIN("compress_start_transport_stream_op_batch", 0); if (op->cancel_stream) { - gpr_atm cur; GRPC_ERROR_REF(op->payload->cancel_stream.cancel_error); - do { - cur = gpr_atm_acq_load(&calld->send_initial_metadata_state); - } while (!gpr_atm_rel_cas( - &calld->send_initial_metadata_state, cur, - CANCELLED_BIT | (gpr_atm)op->payload->cancel_stream.cancel_error)); + gpr_atm cur = gpr_atm_full_xchg( + &calld->send_initial_metadata_state, + CANCELLED_BIT | (gpr_atm)op->payload->cancel_stream.cancel_error); switch (cur) { case HAS_COMPRESSION_ALGORITHM: case NO_COMPRESSION_ALGORITHM: @@ -313,13 +308,18 @@ static void compress_start_transport_stream_op_batch( grpc_transport_stream_op_batch_finish_with_failure(exec_ctx, op, error); return; } - gpr_atm cur = gpr_atm_acq_load(&calld->send_initial_metadata_state); + gpr_atm cur; + retry_send_im: + cur = gpr_atm_acq_load(&calld->send_initial_metadata_state); GPR_ASSERT(cur != HAS_COMPRESSION_ALGORITHM && cur != NO_COMPRESSION_ALGORITHM); if ((cur & CANCELLED_BIT) == 0) { - gpr_atm_rel_store(&calld->send_initial_metadata_state, - has_compression_algorithm ? HAS_COMPRESSION_ALGORITHM - : NO_COMPRESSION_ALGORITHM); + if (!gpr_atm_rel_cas(&calld->send_initial_metadata_state, cur, + has_compression_algorithm + ? HAS_COMPRESSION_ALGORITHM + : NO_COMPRESSION_ALGORITHM)) { + goto retry_send_im; + } if (cur != INITIAL_METADATA_UNSEEN) { grpc_call_next_op(exec_ctx, elem, (grpc_transport_stream_op_batch *)cur); diff --git a/src/core/ext/filters/message_size/message_size_filter.c b/src/core/ext/filters/message_size/message_size_filter.c index db0f011905..b615116965 100644 --- a/src/core/ext/filters/message_size/message_size_filter.c +++ b/src/core/ext/filters/message_size/message_size_filter.c @@ -50,19 +50,10 @@ typedef struct message_size_limits { int max_recv_size; } message_size_limits; -static void* message_size_limits_copy(void* value) { - void* new_value = gpr_malloc(sizeof(message_size_limits)); - memcpy(new_value, value, sizeof(message_size_limits)); - return new_value; -} - static void message_size_limits_free(grpc_exec_ctx* exec_ctx, void* value) { gpr_free(value); } -static const grpc_slice_hash_table_vtable message_size_limits_vtable = { - message_size_limits_free, message_size_limits_copy}; - static void* message_size_limits_create_from_json(const grpc_json* json) { int max_request_message_bytes = -1; int max_response_message_bytes = -1; @@ -130,6 +121,8 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data, GRPC_ERROR_UNREF(new_error); } gpr_free(message_string); + } else { + GRPC_ERROR_REF(error); } // Invoke the next callback. grpc_closure_run(exec_ctx, calld->next_recv_message_ready, error); @@ -255,7 +248,7 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, chand->method_limit_table = grpc_service_config_create_method_config_table( exec_ctx, service_config, message_size_limits_create_from_json, - &message_size_limits_vtable); + message_size_limits_free); grpc_service_config_destroy(service_config); } } diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.c b/src/core/ext/transport/chttp2/server/chttp2_server.c index b463506a98..b9c62c376a 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.c +++ b/src/core/ext/transport/chttp2/server/chttp2_server.c @@ -80,7 +80,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_lock(&connection_state->server_state->mu); if (error != GRPC_ERROR_NONE || connection_state->server_state->shutdown) { const char *error_str = grpc_error_string(error); - gpr_log(GPR_ERROR, "Handshaking failed: %s", error_str); + gpr_log(GPR_DEBUG, "Handshaking failed: %s", error_str); if (error == GRPC_ERROR_NONE && args->endpoint != NULL) { // We were shut down after handshaking completed successfully, so diff --git a/src/core/ext/transport/chttp2/transport/bin_decoder.c b/src/core/ext/transport/chttp2/transport/bin_decoder.c index 8c87de112e..21bed18c9a 100644 --- a/src/core/ext/transport/chttp2/transport/bin_decoder.c +++ b/src/core/ext/transport/chttp2/transport/bin_decoder.c @@ -169,7 +169,7 @@ grpc_slice grpc_chttp2_base64_decode(grpc_exec_ctx *exec_ctx, } } } - output = grpc_slice_malloc(output_length); + output = GRPC_SLICE_MALLOC(output_length); ctx.input_cur = GRPC_SLICE_START_PTR(input); ctx.input_end = GRPC_SLICE_END_PTR(input); @@ -193,7 +193,7 @@ grpc_slice grpc_chttp2_base64_decode_with_length(grpc_exec_ctx *exec_ctx, grpc_slice input, size_t output_length) { size_t input_length = GRPC_SLICE_LENGTH(input); - grpc_slice output = grpc_slice_malloc(output_length); + grpc_slice output = GRPC_SLICE_MALLOC(output_length); struct grpc_base64_decode_context ctx; // The length of a base64 string cannot be 4 * n + 1 diff --git a/src/core/ext/transport/chttp2/transport/bin_encoder.c b/src/core/ext/transport/chttp2/transport/bin_encoder.c index e301c073f3..3b8ab1f17a 100644 --- a/src/core/ext/transport/chttp2/transport/bin_encoder.c +++ b/src/core/ext/transport/chttp2/transport/bin_encoder.c @@ -66,7 +66,7 @@ grpc_slice grpc_chttp2_base64_encode(grpc_slice input) { size_t input_triplets = input_length / 3; size_t tail_case = input_length % 3; size_t output_length = input_triplets * 4 + tail_xtra[tail_case]; - grpc_slice output = grpc_slice_malloc(output_length); + grpc_slice output = GRPC_SLICE_MALLOC(output_length); uint8_t *in = GRPC_SLICE_START_PTR(input); char *out = (char *)GRPC_SLICE_START_PTR(output); size_t i; @@ -119,7 +119,7 @@ grpc_slice grpc_chttp2_huffman_compress(grpc_slice input) { nbits += grpc_chttp2_huffsyms[*in].length; } - output = grpc_slice_malloc(nbits / 8 + (nbits % 8 != 0)); + output = GRPC_SLICE_MALLOC(nbits / 8 + (nbits % 8 != 0)); out = GRPC_SLICE_START_PTR(output); for (in = GRPC_SLICE_START_PTR(input); in != GRPC_SLICE_END_PTR(input); ++in) { @@ -184,7 +184,7 @@ grpc_slice grpc_chttp2_base64_encode_and_huffman_compress(grpc_slice input) { size_t output_syms = input_triplets * 4 + tail_xtra[tail_case]; size_t max_output_bits = 11 * output_syms; size_t max_output_length = max_output_bits / 8 + (max_output_bits % 8 != 0); - grpc_slice output = grpc_slice_malloc(max_output_length); + grpc_slice output = GRPC_SLICE_MALLOC(max_output_length); uint8_t *in = GRPC_SLICE_START_PTR(input); uint8_t *start_out = GRPC_SLICE_START_PTR(output); huff_out out; diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 5d74532ede..d6b79bd492 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -1969,7 +1969,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, the time we got around to sending this, so instead we ignore HPACK compression and just write the uncompressed bytes onto the wire. */ if (!s->sent_initial_metadata) { - http_status_hdr = grpc_slice_malloc(13); + http_status_hdr = GRPC_SLICE_MALLOC(13); p = GRPC_SLICE_START_PTR(http_status_hdr); *p++ = 0x00; *p++ = 7; @@ -1987,7 +1987,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GPR_ASSERT(p == GRPC_SLICE_END_PTR(http_status_hdr)); len += (uint32_t)GRPC_SLICE_LENGTH(http_status_hdr); - content_type_hdr = grpc_slice_malloc(31); + content_type_hdr = GRPC_SLICE_MALLOC(31); p = GRPC_SLICE_START_PTR(content_type_hdr); *p++ = 0x00; *p++ = 12; @@ -2024,7 +2024,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, len += (uint32_t)GRPC_SLICE_LENGTH(content_type_hdr); } - status_hdr = grpc_slice_malloc(15 + (grpc_status >= 10)); + status_hdr = GRPC_SLICE_MALLOC(15 + (grpc_status >= 10)); p = GRPC_SLICE_START_PTR(status_hdr); *p++ = 0x00; /* literal header, not indexed */ *p++ = 11; /* len(grpc-status) */ @@ -2053,7 +2053,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, size_t msg_len = GRPC_SLICE_LENGTH(slice); GPR_ASSERT(msg_len <= UINT32_MAX); uint32_t msg_len_len = GRPC_CHTTP2_VARINT_LENGTH((uint32_t)msg_len, 1); - message_pfx = grpc_slice_malloc(14 + msg_len_len); + message_pfx = GRPC_SLICE_MALLOC(14 + msg_len_len); p = GRPC_SLICE_START_PTR(message_pfx); *p++ = 0x00; /* literal header, not indexed */ *p++ = 12; /* len(grpc-message) */ @@ -2075,7 +2075,7 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, len += (uint32_t)GRPC_SLICE_LENGTH(message_pfx); len += (uint32_t)msg_len; - hdr = grpc_slice_malloc(9); + hdr = GRPC_SLICE_MALLOC(9); p = GRPC_SLICE_START_PTR(hdr); *p++ = (uint8_t)(len >> 16); *p++ = (uint8_t)(len >> 8); diff --git a/src/core/ext/transport/chttp2/transport/frame_data.c b/src/core/ext/transport/chttp2/transport/frame_data.c index 9aba3ea445..8cb8489794 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.c +++ b/src/core/ext/transport/chttp2/transport/frame_data.c @@ -92,7 +92,7 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf, uint8_t *p; static const size_t header_size = 9; - hdr = grpc_slice_malloc(header_size); + hdr = GRPC_SLICE_MALLOC(header_size); p = GRPC_SLICE_START_PTR(hdr); GPR_ASSERT(write_bytes < (1 << 24)); *p++ = (uint8_t)(write_bytes >> 16); @@ -106,7 +106,7 @@ void grpc_chttp2_encode_data(uint32_t id, grpc_slice_buffer *inbuf, *p++ = (uint8_t)(id); grpc_slice_buffer_add(outbuf, hdr); - grpc_slice_buffer_move_first(inbuf, write_bytes, outbuf); + grpc_slice_buffer_move_first_no_ref(inbuf, write_bytes, outbuf); stats->framing_bytes += header_size; stats->data_bytes += write_bytes; diff --git a/src/core/ext/transport/chttp2/transport/frame_goaway.c b/src/core/ext/transport/chttp2/transport/frame_goaway.c index 001271dd22..0f1c8b0772 100644 --- a/src/core/ext/transport/chttp2/transport/frame_goaway.c +++ b/src/core/ext/transport/chttp2/transport/frame_goaway.c @@ -163,7 +163,7 @@ grpc_error *grpc_chttp2_goaway_parser_parse(grpc_exec_ctx *exec_ctx, void grpc_chttp2_goaway_append(uint32_t last_stream_id, uint32_t error_code, grpc_slice debug_data, grpc_slice_buffer *slice_buffer) { - grpc_slice header = grpc_slice_malloc(9 + 4 + 4); + grpc_slice header = GRPC_SLICE_MALLOC(9 + 4 + 4); uint8_t *p = GRPC_SLICE_START_PTR(header); uint32_t frame_length; GPR_ASSERT(GRPC_SLICE_LENGTH(debug_data) < UINT32_MAX - 4 - 4); diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.c b/src/core/ext/transport/chttp2/transport/frame_ping.c index 6016e43127..f09ca60739 100644 --- a/src/core/ext/transport/chttp2/transport/frame_ping.c +++ b/src/core/ext/transport/chttp2/transport/frame_ping.c @@ -43,7 +43,7 @@ static bool g_disable_ping_ack = false; grpc_slice grpc_chttp2_ping_create(uint8_t ack, uint64_t opaque_8bytes) { - grpc_slice slice = grpc_slice_malloc(9 + 8); + grpc_slice slice = GRPC_SLICE_MALLOC(9 + 8); uint8_t *p = GRPC_SLICE_START_PTR(slice); *p++ = 0; diff --git a/src/core/ext/transport/chttp2/transport/frame_rst_stream.c b/src/core/ext/transport/chttp2/transport/frame_rst_stream.c index 225f15c77c..e0caa50e92 100644 --- a/src/core/ext/transport/chttp2/transport/frame_rst_stream.c +++ b/src/core/ext/transport/chttp2/transport/frame_rst_stream.c @@ -44,7 +44,7 @@ grpc_slice grpc_chttp2_rst_stream_create(uint32_t id, uint32_t code, grpc_transport_one_way_stats *stats) { static const size_t frame_size = 13; - grpc_slice slice = grpc_slice_malloc(frame_size); + grpc_slice slice = GRPC_SLICE_MALLOC(frame_size); stats->framing_bytes += frame_size; uint8_t *p = GRPC_SLICE_START_PTR(slice); diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.c b/src/core/ext/transport/chttp2/transport/frame_settings.c index 4f2b827832..e3cd70d3f3 100644 --- a/src/core/ext/transport/chttp2/transport/frame_settings.c +++ b/src/core/ext/transport/chttp2/transport/frame_settings.c @@ -70,7 +70,7 @@ grpc_slice grpc_chttp2_settings_create(uint32_t *old, const uint32_t *new, n += (new[i] != old[i] || (force_mask & (1u << i)) != 0); } - output = grpc_slice_malloc(9 + 6 * n); + output = GRPC_SLICE_MALLOC(9 + 6 * n); p = fill_header(GRPC_SLICE_START_PTR(output), 6 * n, 0); for (i = 0; i < count; i++) { @@ -91,7 +91,7 @@ grpc_slice grpc_chttp2_settings_create(uint32_t *old, const uint32_t *new, } grpc_slice grpc_chttp2_settings_ack_create(void) { - grpc_slice output = grpc_slice_malloc(9); + grpc_slice output = GRPC_SLICE_MALLOC(9); fill_header(GRPC_SLICE_START_PTR(output), 0, GRPC_CHTTP2_FLAG_ACK); return output; } diff --git a/src/core/ext/transport/chttp2/transport/frame_window_update.c b/src/core/ext/transport/chttp2/transport/frame_window_update.c index b76b6f6f47..8ed72dddca 100644 --- a/src/core/ext/transport/chttp2/transport/frame_window_update.c +++ b/src/core/ext/transport/chttp2/transport/frame_window_update.c @@ -41,7 +41,7 @@ grpc_slice grpc_chttp2_window_update_create( uint32_t id, uint32_t window_update, grpc_transport_one_way_stats *stats) { static const size_t frame_size = 13; - grpc_slice slice = grpc_slice_malloc(frame_size); + grpc_slice slice = GRPC_SLICE_MALLOC(frame_size); stats->header_bytes += frame_size; uint8_t *p = GRPC_SLICE_START_PTR(slice); diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.c b/src/core/ext/transport/chttp2/transport/hpack_encoder.c index b1bc677a7a..8fdd4ee77c 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_encoder.c +++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.c @@ -123,7 +123,7 @@ static void finish_frame(framer_state *st, int is_header_boundary, output before beginning */ static void begin_frame(framer_state *st) { st->header_idx = - grpc_slice_buffer_add_indexed(st->output, grpc_slice_malloc(9)); + grpc_slice_buffer_add_indexed(st->output, GRPC_SLICE_MALLOC(9)); st->output_length_at_start_of_frame = st->output->length; } diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 7896c70f9e..67974b0b6a 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -886,6 +886,10 @@ static bool op_can_be_run(grpc_transport_stream_op_batch *curr_op, !stream_state->state_op_done[OP_RECV_MESSAGE]) { CRONET_LOG(GPR_DEBUG, "Because"); result = false; + } else if (curr_op->cancel_stream && + !stream_state->state_callback_received[OP_CANCELED]) { + CRONET_LOG(GPR_DEBUG, "Because"); + result = false; } else if (curr_op->recv_trailing_metadata) { /* We aren't done with trailing metadata yet */ if (!stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) { @@ -1177,7 +1181,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, } else if (stream_state->rs.remaining_bytes == 0) { CRONET_LOG(GPR_DEBUG, "read operation complete"); grpc_slice read_data_slice = - grpc_slice_malloc((uint32_t)stream_state->rs.length_field); + GRPC_SLICE_MALLOC((uint32_t)stream_state->rs.length_field); uint8_t *dst_p = GRPC_SLICE_START_PTR(read_data_slice); memcpy(dst_p, stream_state->rs.read_buffer, (size_t)stream_state->rs.length_field); diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c index 22caf24373..d8985268eb 100644 --- a/src/core/lib/channel/connected_channel.c +++ b/src/core/lib/channel/connected_channel.c @@ -128,7 +128,9 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) { channel_data *cd = (channel_data *)elem->channel_data; - grpc_transport_destroy(exec_ctx, cd->transport); + if (cd->transport) { + grpc_transport_destroy(exec_ctx, cd->transport); + } } static char *con_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { diff --git a/src/core/lib/channel/context.h b/src/core/lib/channel/context.h index 6c931ad28a..f0a21113c5 100644 --- a/src/core/lib/channel/context.h +++ b/src/core/lib/channel/context.h @@ -50,6 +50,9 @@ typedef enum { /// Reserved for traffic_class_context. GRPC_CONTEXT_TRAFFIC, + /// Value is a \a grpc_grpclb_client_stats. + GRPC_GRPCLB_CLIENT_STATS, + GRPC_CONTEXT_COUNT } grpc_context_index; diff --git a/src/core/lib/compression/message_compress.c b/src/core/lib/compression/message_compress.c index 49beb953b0..fd3d1e6fcc 100644 --- a/src/core/lib/compression/message_compress.c +++ b/src/core/lib/compression/message_compress.c @@ -50,7 +50,7 @@ static int zlib_body(grpc_exec_ctx* exec_ctx, z_stream* zs, int r; int flush; size_t i; - grpc_slice outbuf = grpc_slice_malloc(OUTPUT_BLOCK_SIZE); + grpc_slice outbuf = GRPC_SLICE_MALLOC(OUTPUT_BLOCK_SIZE); const uInt uint_max = ~(uInt)0; GPR_ASSERT(GRPC_SLICE_LENGTH(outbuf) <= uint_max); @@ -65,7 +65,7 @@ static int zlib_body(grpc_exec_ctx* exec_ctx, z_stream* zs, do { if (zs->avail_out == 0) { grpc_slice_buffer_add_indexed(output, outbuf); - outbuf = grpc_slice_malloc(OUTPUT_BLOCK_SIZE); + outbuf = GRPC_SLICE_MALLOC(OUTPUT_BLOCK_SIZE); GPR_ASSERT(GRPC_SLICE_LENGTH(outbuf) <= uint_max); zs->avail_out = (uInt)GRPC_SLICE_LENGTH(outbuf); zs->next_out = GRPC_SLICE_START_PTR(outbuf); diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c index 453a64b049..0ac2c2ad52 100644 --- a/src/core/lib/http/httpcli.c +++ b/src/core/lib/http/httpcli.c @@ -105,7 +105,7 @@ static void finish(grpc_exec_ctx *exec_ctx, internal_request *req, grpc_error *error) { grpc_polling_entity_del_from_pollset_set(exec_ctx, req->pollent, req->context->pollset_set); - grpc_closure_sched(exec_ctx, req->on_done, error); + grpc_closure_sched(exec_ctx, req->on_done, GRPC_ERROR_REF(error)); grpc_http_parser_destroy(&req->parser); if (req->addresses != NULL) { grpc_resolved_addresses_destroy(req->addresses); diff --git a/src/core/lib/http/httpcli_security_connector.c b/src/core/lib/http/httpcli_security_connector.c index 9eab1360a4..76946434f0 100644 --- a/src/core/lib/http/httpcli_security_connector.c +++ b/src/core/lib/http/httpcli_security_connector.c @@ -106,9 +106,8 @@ static grpc_security_connector_vtable httpcli_ssl_vtable = { httpcli_ssl_destroy, httpcli_ssl_check_peer}; static grpc_security_status httpcli_ssl_channel_security_connector_create( - grpc_exec_ctx *exec_ctx, const unsigned char *pem_root_certs, - size_t pem_root_certs_size, const char *secure_peer_name, - grpc_channel_security_connector **sc) { + grpc_exec_ctx *exec_ctx, const char *pem_root_certs, + const char *secure_peer_name, grpc_channel_security_connector **sc) { tsi_result result = TSI_OK; grpc_httpcli_ssl_channel_security_connector *c; @@ -126,8 +125,7 @@ static grpc_security_status httpcli_ssl_channel_security_connector_create( c->secure_peer_name = gpr_strdup(secure_peer_name); } result = tsi_create_ssl_client_handshaker_factory( - NULL, 0, NULL, 0, pem_root_certs, pem_root_certs_size, NULL, NULL, NULL, - 0, &c->handshaker_factory); + NULL, pem_root_certs, NULL, NULL, 0, &c->handshaker_factory); if (result != TSI_OK) { gpr_log(GPR_ERROR, "Handshaker factory creation failed with %s.", tsi_result_to_string(result)); @@ -173,10 +171,9 @@ static void ssl_handshake(grpc_exec_ctx *exec_ctx, void *arg, void (*on_done)(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *endpoint)) { grpc_channel_security_connector *sc = NULL; - const unsigned char *pem_root_certs = NULL; on_done_closure *c = gpr_malloc(sizeof(*c)); - size_t pem_root_certs_size = grpc_get_default_ssl_roots(&pem_root_certs); - if (pem_root_certs == NULL || pem_root_certs_size == 0) { + const char *pem_root_certs = grpc_get_default_ssl_roots(); + if (pem_root_certs == NULL) { gpr_log(GPR_ERROR, "Could not get default pem root certs."); on_done(exec_ctx, arg, NULL); gpr_free(c); @@ -186,8 +183,7 @@ static void ssl_handshake(grpc_exec_ctx *exec_ctx, void *arg, c->arg = arg; c->handshake_mgr = grpc_handshake_manager_create(); GPR_ASSERT(httpcli_ssl_channel_security_connector_create( - exec_ctx, pem_root_certs, pem_root_certs_size, host, &sc) == - GRPC_SECURITY_OK); + exec_ctx, pem_root_certs, host, &sc) == GRPC_SECURITY_OK); grpc_channel_security_connector_add_handshakers(exec_ctx, sc, c->handshake_mgr); grpc_handshake_manager_do_handshake( diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h index 269dc35003..2a553f4114 100644 --- a/src/core/lib/iomgr/port.h +++ b/src/core/lib/iomgr/port.h @@ -88,6 +88,7 @@ #ifndef __GLIBC__ #define GRPC_LINUX_EPOLL 1 #define GRPC_LINUX_EVENTFD 1 +#define GRPC_MSG_IOVLEN_TYPE int #endif #ifndef GRPC_LINUX_EVENTFD #define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1 diff --git a/src/core/lib/iomgr/socket_mutator.h b/src/core/lib/iomgr/socket_mutator.h index 2f5b6c248e..28b1710ec4 100644 --- a/src/core/lib/iomgr/socket_mutator.h +++ b/src/core/lib/iomgr/socket_mutator.h @@ -37,6 +37,8 @@ #include <grpc/impl/codegen/grpc_types.h> #include <grpc/support/sync.h> +#include <stdbool.h> + #ifdef __cplusplus extern "C" { #endif diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c index f74aa68793..bdd4dd07af 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.c @@ -219,7 +219,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, tcp->read_slices = read_slices; grpc_slice_buffer_reset_and_unref_internal(exec_ctx, read_slices); - tcp->read_slice = grpc_slice_malloc(8192); + tcp->read_slice = GRPC_SLICE_MALLOC(8192); buffer.len = (ULONG)GRPC_SLICE_LENGTH( tcp->read_slice); // we know slice size fits in 32bit. diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c index ca283d034f..af70746064 100644 --- a/src/core/lib/iomgr/udp_server.c +++ b/src/core/lib/iomgr/udp_server.c @@ -92,6 +92,11 @@ struct grpc_udp_listener { struct grpc_udp_listener *next; }; +struct shutdown_fd_args { + grpc_fd *fd; + gpr_mu *server_mu; +}; + /* the overall server */ struct grpc_udp_server { gpr_mu mu; @@ -151,8 +156,13 @@ grpc_udp_server *grpc_udp_server_create(const grpc_channel_args *args) { return s; } -static void shutdown_fd(grpc_exec_ctx *exec_ctx, void *fd, grpc_error *error) { - grpc_fd_shutdown(exec_ctx, (grpc_fd *)fd, GRPC_ERROR_REF(error)); +static void shutdown_fd(grpc_exec_ctx *exec_ctx, void *args, + grpc_error *error) { + struct shutdown_fd_args *shutdown_args = (struct shutdown_fd_args *)args; + gpr_mu_lock(shutdown_args->server_mu); + grpc_fd_shutdown(exec_ctx, shutdown_args->fd, GRPC_ERROR_REF(error)); + gpr_mu_unlock(shutdown_args->server_mu); + gpr_free(shutdown_args); } static void dummy_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { @@ -242,7 +252,10 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, if (s->active_ports) { for (sp = s->head; sp; sp = sp->next) { GPR_ASSERT(sp->orphan_cb); - grpc_closure_init(&sp->orphan_fd_closure, shutdown_fd, sp->emfd, + struct shutdown_fd_args *args = gpr_malloc(sizeof(*args)); + args->fd = sp->emfd; + args->server_mu = &s->mu; + grpc_closure_init(&sp->orphan_fd_closure, shutdown_fd, args, grpc_schedule_on_exec_ctx); sp->orphan_cb(exec_ctx, sp->emfd, &sp->orphan_fd_closure, sp->server->user_data); diff --git a/src/core/lib/security/credentials/ssl/ssl_credentials.c b/src/core/lib/security/credentials/ssl/ssl_credentials.c index b63bb6b6e9..7c35ebe684 100644 --- a/src/core/lib/security/credentials/ssl/ssl_credentials.c +++ b/src/core/lib/security/credentials/ssl/ssl_credentials.c @@ -40,28 +40,24 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> // -// Utils +// SSL Channel Credentials. // -static void ssl_copy_key_material(const char *input, unsigned char **output, - size_t *output_size) { - *output_size = strlen(input); - *output = gpr_malloc(*output_size); - memcpy(*output, input, *output_size); +static void ssl_config_pem_key_cert_pair_destroy( + tsi_ssl_pem_key_cert_pair *kp) { + if (kp == NULL) return; + gpr_free((void *)kp->private_key); + gpr_free((void *)kp->cert_chain); } -// -// SSL Channel Credentials. -// - static void ssl_destruct(grpc_exec_ctx *exec_ctx, grpc_channel_credentials *creds) { grpc_ssl_credentials *c = (grpc_ssl_credentials *)creds; - if (c->config.pem_root_certs != NULL) gpr_free(c->config.pem_root_certs); - if (c->config.pem_private_key != NULL) gpr_free(c->config.pem_private_key); - if (c->config.pem_cert_chain != NULL) gpr_free(c->config.pem_cert_chain); + gpr_free(c->config.pem_root_certs); + ssl_config_pem_key_cert_pair_destroy(&c->config.pem_key_cert_pair); } static grpc_security_status ssl_create_security_connector( @@ -102,18 +98,15 @@ static void ssl_build_config(const char *pem_root_certs, grpc_ssl_pem_key_cert_pair *pem_key_cert_pair, grpc_ssl_config *config) { if (pem_root_certs != NULL) { - ssl_copy_key_material(pem_root_certs, &config->pem_root_certs, - &config->pem_root_certs_size); + config->pem_root_certs = gpr_strdup(pem_root_certs); } if (pem_key_cert_pair != NULL) { GPR_ASSERT(pem_key_cert_pair->private_key != NULL); GPR_ASSERT(pem_key_cert_pair->cert_chain != NULL); - ssl_copy_key_material(pem_key_cert_pair->private_key, - &config->pem_private_key, - &config->pem_private_key_size); - ssl_copy_key_material(pem_key_cert_pair->cert_chain, - &config->pem_cert_chain, - &config->pem_cert_chain_size); + config->pem_key_cert_pair.cert_chain = + gpr_strdup(pem_key_cert_pair->cert_chain); + config->pem_key_cert_pair.private_key = + gpr_strdup(pem_key_cert_pair->private_key); } } @@ -143,22 +136,10 @@ static void ssl_server_destruct(grpc_exec_ctx *exec_ctx, grpc_ssl_server_credentials *c = (grpc_ssl_server_credentials *)creds; size_t i; for (i = 0; i < c->config.num_key_cert_pairs; i++) { - if (c->config.pem_private_keys[i] != NULL) { - gpr_free(c->config.pem_private_keys[i]); - } - if (c->config.pem_cert_chains[i] != NULL) { - gpr_free(c->config.pem_cert_chains[i]); - } - } - if (c->config.pem_private_keys != NULL) gpr_free(c->config.pem_private_keys); - if (c->config.pem_private_keys_sizes != NULL) { - gpr_free(c->config.pem_private_keys_sizes); - } - if (c->config.pem_cert_chains != NULL) gpr_free(c->config.pem_cert_chains); - if (c->config.pem_cert_chains_sizes != NULL) { - gpr_free(c->config.pem_cert_chains_sizes); + ssl_config_pem_key_cert_pair_destroy(&c->config.pem_key_cert_pairs[i]); } - if (c->config.pem_root_certs != NULL) gpr_free(c->config.pem_root_certs); + gpr_free(c->config.pem_key_cert_pairs); + gpr_free(c->config.pem_root_certs); } static grpc_security_status ssl_server_create_security_connector( @@ -179,30 +160,21 @@ static void ssl_build_server_config( size_t i; config->client_certificate_request = client_certificate_request; if (pem_root_certs != NULL) { - ssl_copy_key_material(pem_root_certs, &config->pem_root_certs, - &config->pem_root_certs_size); + config->pem_root_certs = gpr_strdup(pem_root_certs); } if (num_key_cert_pairs > 0) { GPR_ASSERT(pem_key_cert_pairs != NULL); - config->pem_private_keys = - gpr_malloc(num_key_cert_pairs * sizeof(unsigned char *)); - config->pem_cert_chains = - gpr_malloc(num_key_cert_pairs * sizeof(unsigned char *)); - config->pem_private_keys_sizes = - gpr_malloc(num_key_cert_pairs * sizeof(size_t)); - config->pem_cert_chains_sizes = - gpr_malloc(num_key_cert_pairs * sizeof(size_t)); + config->pem_key_cert_pairs = + gpr_zalloc(num_key_cert_pairs * sizeof(tsi_ssl_pem_key_cert_pair)); } config->num_key_cert_pairs = num_key_cert_pairs; for (i = 0; i < num_key_cert_pairs; i++) { GPR_ASSERT(pem_key_cert_pairs[i].private_key != NULL); GPR_ASSERT(pem_key_cert_pairs[i].cert_chain != NULL); - ssl_copy_key_material(pem_key_cert_pairs[i].private_key, - &config->pem_private_keys[i], - &config->pem_private_keys_sizes[i]); - ssl_copy_key_material(pem_key_cert_pairs[i].cert_chain, - &config->pem_cert_chains[i], - &config->pem_cert_chains_sizes[i]); + config->pem_key_cert_pairs[i].cert_chain = + gpr_strdup(pem_key_cert_pairs[i].cert_chain); + config->pem_key_cert_pairs[i].private_key = + gpr_strdup(pem_key_cert_pairs[i].private_key); } } diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c index f526653ffa..1f0daf7325 100644 --- a/src/core/lib/security/transport/client_auth_filter.c +++ b/src/core/lib/security/transport/client_auth_filter.c @@ -343,8 +343,16 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element_args *args) { grpc_security_connector *sc = grpc_security_connector_find_in_args(args->channel_args); + if (sc == NULL) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Security connector missing from client auth filter args"); + } grpc_auth_context *auth_context = grpc_find_auth_context_in_args(args->channel_args); + if (auth_context == NULL) { + return GRPC_ERROR_CREATE_FROM_STATIC_STRING( + "Auth context missing from client auth filter args"); + } /* grab pointers to our data from the channel element */ channel_data *chand = elem->channel_data; @@ -353,8 +361,6 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, handle the case that there's no 'next' filter to call on the up or down path */ GPR_ASSERT(!args->is_last); - GPR_ASSERT(sc != NULL); - GPR_ASSERT(auth_context != NULL); /* initialize members */ chand->security_connector = diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c index 24da949e48..0d5c7432c6 100644 --- a/src/core/lib/security/transport/secure_endpoint.c +++ b/src/core/lib/security/transport/secure_endpoint.c @@ -130,7 +130,7 @@ static void secure_endpoint_ref(secure_endpoint *ep) { gpr_ref(&ep->ref); } static void flush_read_staging_buffer(secure_endpoint *ep, uint8_t **cur, uint8_t **end) { grpc_slice_buffer_add(ep->read_buffer, ep->read_staging_buffer); - ep->read_staging_buffer = grpc_slice_malloc(STAGING_BUFFER_SIZE); + ep->read_staging_buffer = GRPC_SLICE_MALLOC(STAGING_BUFFER_SIZE); *cur = GRPC_SLICE_START_PTR(ep->read_staging_buffer); *end = GRPC_SLICE_END_PTR(ep->read_staging_buffer); } @@ -252,7 +252,7 @@ static void endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep, static void flush_write_staging_buffer(secure_endpoint *ep, uint8_t **cur, uint8_t **end) { grpc_slice_buffer_add(&ep->output_buffer, ep->write_staging_buffer); - ep->write_staging_buffer = grpc_slice_malloc(STAGING_BUFFER_SIZE); + ep->write_staging_buffer = GRPC_SLICE_MALLOC(STAGING_BUFFER_SIZE); *cur = GRPC_SLICE_START_PTR(ep->write_staging_buffer); *end = GRPC_SLICE_END_PTR(ep->write_staging_buffer); } @@ -415,8 +415,8 @@ grpc_endpoint *grpc_secure_endpoint_create( grpc_slice_buffer_add(&ep->leftover_bytes, grpc_slice_ref_internal(leftover_slices[i])); } - ep->write_staging_buffer = grpc_slice_malloc(STAGING_BUFFER_SIZE); - ep->read_staging_buffer = grpc_slice_malloc(STAGING_BUFFER_SIZE); + ep->write_staging_buffer = GRPC_SLICE_MALLOC(STAGING_BUFFER_SIZE); + ep->read_staging_buffer = GRPC_SLICE_MALLOC(STAGING_BUFFER_SIZE); grpc_slice_buffer_init(&ep->output_buffer); grpc_slice_buffer_init(&ep->source_buffer); ep->read_buffer = NULL; diff --git a/src/core/lib/security/transport/security_connector.c b/src/core/lib/security/transport/security_connector.c index b15196e677..30431a4e4a 100644 --- a/src/core/lib/security/transport/security_connector.c +++ b/src/core/lib/security/transport/security_connector.c @@ -78,9 +78,8 @@ void grpc_set_ssl_roots_override_callback(grpc_ssl_roots_override_callback cb) { /* Defines the cipher suites that we accept by default. All these cipher suites are compliant with HTTP2. */ -#define GRPC_SSL_CIPHER_SUITES \ - "ECDHE-RSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-SHA256:ECDHE-RSA-AES256-" \ - "SHA384:ECDHE-RSA-AES256-GCM-SHA384" +#define GRPC_SSL_CIPHER_SUITES \ + "ECDHE-RSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384" static gpr_once cipher_suites_once = GPR_ONCE_INIT; static const char *cipher_suites = NULL; @@ -695,6 +694,7 @@ static grpc_security_connector_vtable ssl_channel_vtable = { static grpc_security_connector_vtable ssl_server_vtable = { ssl_server_destroy, ssl_server_check_peer}; +/* returns a NULL terminated slice. */ static grpc_slice compute_default_pem_root_certs_once(void) { grpc_slice result = grpc_empty_slice(); @@ -703,7 +703,7 @@ static grpc_slice compute_default_pem_root_certs_once(void) { gpr_getenv(GRPC_DEFAULT_SSL_ROOTS_FILE_PATH_ENV_VAR); if (default_root_certs_path != NULL) { GRPC_LOG_IF_ERROR("load_file", - grpc_load_file(default_root_certs_path, 0, &result)); + grpc_load_file(default_root_certs_path, 1, &result)); gpr_free(default_root_certs_path); } @@ -714,15 +714,18 @@ static grpc_slice compute_default_pem_root_certs_once(void) { ovrd_res = ssl_roots_override_cb(&pem_root_certs); if (ovrd_res == GRPC_SSL_ROOTS_OVERRIDE_OK) { GPR_ASSERT(pem_root_certs != NULL); - result = grpc_slice_new(pem_root_certs, strlen(pem_root_certs), gpr_free); + result = grpc_slice_from_copied_buffer( + pem_root_certs, + strlen(pem_root_certs) + 1); // NULL terminator. } + gpr_free(pem_root_certs); } /* Fall back to installed certs if needed. */ if (GRPC_SLICE_IS_EMPTY(result) && ovrd_res != GRPC_SSL_ROOTS_OVERRIDE_FAIL_PERMANENTLY) { GRPC_LOG_IF_ERROR("load_file", - grpc_load_file(installed_roots_path, 0, &result)); + grpc_load_file(installed_roots_path, 1, &result)); } return result; } @@ -762,13 +765,14 @@ get_tsi_client_certificate_request_type( } } -size_t grpc_get_default_ssl_roots(const unsigned char **pem_root_certs) { +const char *grpc_get_default_ssl_roots(void) { /* TODO(jboeuf@google.com): Maybe revisit the approach which consists in loading all the roots once for the lifetime of the process. */ static gpr_once once = GPR_ONCE_INIT; gpr_once_init(&once, init_default_pem_root_certs); - *pem_root_certs = GRPC_SLICE_START_PTR(default_pem_root_certs); - return GRPC_SLICE_LENGTH(default_pem_root_certs); + return GRPC_SLICE_IS_EMPTY(default_pem_root_certs) + ? NULL + : (const char *)GRPC_SLICE_START_PTR(default_pem_root_certs); } grpc_security_status grpc_ssl_channel_security_connector_create( @@ -776,22 +780,16 @@ grpc_security_status grpc_ssl_channel_security_connector_create( const grpc_ssl_config *config, const char *target_name, const char *overridden_target_name, grpc_channel_security_connector **sc) { size_t num_alpn_protocols = grpc_chttp2_num_alpn_versions(); - const unsigned char **alpn_protocol_strings = + const char **alpn_protocol_strings = gpr_malloc(sizeof(const char *) * num_alpn_protocols); - unsigned char *alpn_protocol_string_lengths = - gpr_malloc(sizeof(unsigned char) * num_alpn_protocols); tsi_result result = TSI_OK; grpc_ssl_channel_security_connector *c; size_t i; - const unsigned char *pem_root_certs; - size_t pem_root_certs_size; + const char *pem_root_certs; char *port; for (i = 0; i < num_alpn_protocols; i++) { - alpn_protocol_strings[i] = - (const unsigned char *)grpc_chttp2_get_alpn_version_index(i); - alpn_protocol_string_lengths[i] = - (unsigned char)strlen(grpc_chttp2_get_alpn_version_index(i)); + alpn_protocol_strings[i] = grpc_chttp2_get_alpn_version_index(i); } if (config == NULL || target_name == NULL) { @@ -799,14 +797,13 @@ grpc_security_status grpc_ssl_channel_security_connector_create( goto error; } if (config->pem_root_certs == NULL) { - pem_root_certs_size = grpc_get_default_ssl_roots(&pem_root_certs); - if (pem_root_certs == NULL || pem_root_certs_size == 0) { + pem_root_certs = grpc_get_default_ssl_roots(); + if (pem_root_certs == NULL) { gpr_log(GPR_ERROR, "Could not get default pem root certs."); goto error; } } else { pem_root_certs = config->pem_root_certs; - pem_root_certs_size = config->pem_root_certs_size; } c = gpr_zalloc(sizeof(grpc_ssl_channel_security_connector)); @@ -823,11 +820,12 @@ grpc_security_status grpc_ssl_channel_security_connector_create( if (overridden_target_name != NULL) { c->overridden_target_name = gpr_strdup(overridden_target_name); } + + bool has_key_cert_pair = config->pem_key_cert_pair.private_key != NULL && + config->pem_key_cert_pair.cert_chain != NULL; result = tsi_create_ssl_client_handshaker_factory( - config->pem_private_key, config->pem_private_key_size, - config->pem_cert_chain, config->pem_cert_chain_size, pem_root_certs, - pem_root_certs_size, ssl_cipher_suites(), alpn_protocol_strings, - alpn_protocol_string_lengths, (uint16_t)num_alpn_protocols, + has_key_cert_pair ? &config->pem_key_cert_pair : NULL, pem_root_certs, + ssl_cipher_suites(), alpn_protocol_strings, (uint16_t)num_alpn_protocols, &c->handshaker_factory); if (result != TSI_OK) { gpr_log(GPR_ERROR, "Handshaker factory creation failed with %s.", @@ -838,12 +836,10 @@ grpc_security_status grpc_ssl_channel_security_connector_create( } *sc = &c->base; gpr_free((void *)alpn_protocol_strings); - gpr_free(alpn_protocol_string_lengths); return GRPC_SECURITY_OK; error: gpr_free((void *)alpn_protocol_strings); - gpr_free(alpn_protocol_string_lengths); return GRPC_SECURITY_ERROR; } @@ -851,19 +847,14 @@ grpc_security_status grpc_ssl_server_security_connector_create( grpc_exec_ctx *exec_ctx, const grpc_ssl_server_config *config, grpc_server_security_connector **sc) { size_t num_alpn_protocols = grpc_chttp2_num_alpn_versions(); - const unsigned char **alpn_protocol_strings = + const char **alpn_protocol_strings = gpr_malloc(sizeof(const char *) * num_alpn_protocols); - unsigned char *alpn_protocol_string_lengths = - gpr_malloc(sizeof(unsigned char) * num_alpn_protocols); tsi_result result = TSI_OK; grpc_ssl_server_security_connector *c; size_t i; for (i = 0; i < num_alpn_protocols; i++) { - alpn_protocol_strings[i] = - (const unsigned char *)grpc_chttp2_get_alpn_version_index(i); - alpn_protocol_string_lengths[i] = - (unsigned char)strlen(grpc_chttp2_get_alpn_version_index(i)); + alpn_protocol_strings[i] = grpc_chttp2_get_alpn_version_index(i); } if (config == NULL || config->num_key_cert_pairs == 0) { @@ -876,15 +867,11 @@ grpc_security_status grpc_ssl_server_security_connector_create( c->base.base.url_scheme = GRPC_SSL_URL_SCHEME; c->base.base.vtable = &ssl_server_vtable; result = tsi_create_ssl_server_handshaker_factory_ex( - (const unsigned char **)config->pem_private_keys, - config->pem_private_keys_sizes, - (const unsigned char **)config->pem_cert_chains, - config->pem_cert_chains_sizes, config->num_key_cert_pairs, - config->pem_root_certs, config->pem_root_certs_size, - get_tsi_client_certificate_request_type( - config->client_certificate_request), - ssl_cipher_suites(), alpn_protocol_strings, alpn_protocol_string_lengths, - (uint16_t)num_alpn_protocols, &c->handshaker_factory); + config->pem_key_cert_pairs, config->num_key_cert_pairs, + config->pem_root_certs, get_tsi_client_certificate_request_type( + config->client_certificate_request), + ssl_cipher_suites(), alpn_protocol_strings, (uint16_t)num_alpn_protocols, + &c->handshaker_factory); if (result != TSI_OK) { gpr_log(GPR_ERROR, "Handshaker factory creation failed with %s.", tsi_result_to_string(result)); @@ -895,11 +882,9 @@ grpc_security_status grpc_ssl_server_security_connector_create( c->base.add_handshakers = ssl_server_add_handshakers; *sc = &c->base; gpr_free((void *)alpn_protocol_strings); - gpr_free(alpn_protocol_string_lengths); return GRPC_SECURITY_OK; error: gpr_free((void *)alpn_protocol_strings); - gpr_free(alpn_protocol_string_lengths); return GRPC_SECURITY_ERROR; } diff --git a/src/core/lib/security/transport/security_connector.h b/src/core/lib/security/transport/security_connector.h index cf56cb3183..d74f6739c0 100644 --- a/src/core/lib/security/transport/security_connector.h +++ b/src/core/lib/security/transport/security_connector.h @@ -34,11 +34,14 @@ #ifndef GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURITY_CONNECTOR_H #define GRPC_CORE_LIB_SECURITY_TRANSPORT_SECURITY_CONNECTOR_H +#include <stdbool.h> + #include <grpc/grpc_security.h> #include "src/core/lib/channel/handshaker.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/tcp_server.h" +#include "src/core/tsi/ssl_transport_security.h" #include "src/core/tsi/transport_security_interface.h" /* --- status enum. --- */ @@ -184,13 +187,10 @@ grpc_server_security_connector *grpc_fake_server_security_connector_create( void); /* Config for ssl clients. */ + typedef struct { - unsigned char *pem_private_key; - size_t pem_private_key_size; - unsigned char *pem_cert_chain; - size_t pem_cert_chain_size; - unsigned char *pem_root_certs; - size_t pem_root_certs_size; + tsi_ssl_pem_key_cert_pair pem_key_cert_pair; + char *pem_root_certs; } grpc_ssl_config; /* Creates an SSL channel_security_connector. @@ -211,21 +211,17 @@ grpc_security_status grpc_ssl_channel_security_connector_create( const grpc_ssl_config *config, const char *target_name, const char *overridden_target_name, grpc_channel_security_connector **sc); -/* Gets the default ssl roots. */ -size_t grpc_get_default_ssl_roots(const unsigned char **pem_root_certs); +/* Gets the default ssl roots. Returns NULL if not found. */ +const char *grpc_get_default_ssl_roots(void); /* Exposed for TESTING ONLY!. */ grpc_slice grpc_get_default_ssl_roots_for_testing(void); /* Config for ssl servers. */ typedef struct { - unsigned char **pem_private_keys; - size_t *pem_private_keys_sizes; - unsigned char **pem_cert_chains; - size_t *pem_cert_chains_sizes; + tsi_ssl_pem_key_cert_pair *pem_key_cert_pairs; size_t num_key_cert_pairs; - unsigned char *pem_root_certs; - size_t pem_root_certs_size; + char *pem_root_certs; grpc_ssl_client_certificate_request_type client_certificate_request; } grpc_ssl_server_config; diff --git a/src/core/lib/slice/b64.c b/src/core/lib/slice/b64.c index 2007cc4810..d9091646e0 100644 --- a/src/core/lib/slice/b64.c +++ b/src/core/lib/slice/b64.c @@ -202,7 +202,7 @@ static int decode_group(const unsigned char *codes, size_t num_codes, grpc_slice grpc_base64_decode_with_len(grpc_exec_ctx *exec_ctx, const char *b64, size_t b64_len, int url_safe) { - grpc_slice result = grpc_slice_malloc(b64_len); + grpc_slice result = GRPC_SLICE_MALLOC(b64_len); unsigned char *current = GRPC_SLICE_START_PTR(result); size_t result_size = 0; unsigned char codes[4]; diff --git a/src/core/lib/slice/percent_encoding.c b/src/core/lib/slice/percent_encoding.c index c76c58d371..a77c69763f 100644 --- a/src/core/lib/slice/percent_encoding.c +++ b/src/core/lib/slice/percent_encoding.c @@ -71,7 +71,7 @@ grpc_slice grpc_percent_encode_slice(grpc_slice slice, return grpc_slice_ref_internal(slice); } // second pass: actually encode - grpc_slice out = grpc_slice_malloc(output_length); + grpc_slice out = GRPC_SLICE_MALLOC(output_length); uint8_t *q = GRPC_SLICE_START_PTR(out); for (p = slice_start; p < slice_end; p++) { if (is_unreserved_character(*p, unreserved_bytes)) { @@ -125,7 +125,7 @@ bool grpc_strict_percent_decode_slice(grpc_slice slice_in, return true; } p = GRPC_SLICE_START_PTR(slice_in); - *slice_out = grpc_slice_malloc(out_length); + *slice_out = GRPC_SLICE_MALLOC(out_length); uint8_t *q = GRPC_SLICE_START_PTR(*slice_out); while (p != in_end) { if (*p == '%') { @@ -163,7 +163,7 @@ grpc_slice grpc_permissive_percent_decode_slice(grpc_slice slice_in) { return grpc_slice_ref_internal(slice_in); } p = GRPC_SLICE_START_PTR(slice_in); - grpc_slice out = grpc_slice_malloc(out_length); + grpc_slice out = GRPC_SLICE_MALLOC(out_length); uint8_t *q = GRPC_SLICE_START_PTR(out); while (p != in_end) { if (*p == '%') { diff --git a/src/core/lib/slice/slice.c b/src/core/lib/slice/slice.c index a2fcbbdbdf..b90738fd1a 100644 --- a/src/core/lib/slice/slice.c +++ b/src/core/lib/slice/slice.c @@ -55,6 +55,13 @@ grpc_slice grpc_empty_slice(void) { return out; } +grpc_slice grpc_slice_copy(grpc_slice s) { + grpc_slice out = GRPC_SLICE_MALLOC(GRPC_SLICE_LENGTH(s)); + memcpy(GRPC_SLICE_START_PTR(out), GRPC_SLICE_START_PTR(s), + GRPC_SLICE_LENGTH(s)); + return out; +} + grpc_slice grpc_slice_ref_internal(grpc_slice slice) { if (slice.refcount) { slice.refcount->vtable->ref(slice.refcount); @@ -198,7 +205,7 @@ grpc_slice grpc_slice_new_with_len(void *p, size_t len, grpc_slice grpc_slice_from_copied_buffer(const char *source, size_t length) { if (length == 0) return grpc_empty_slice(); - grpc_slice slice = grpc_slice_malloc(length); + grpc_slice slice = GRPC_SLICE_MALLOC(length); memcpy(GRPC_SLICE_START_PTR(slice), source, length); return slice; } @@ -228,35 +235,42 @@ static const grpc_slice_refcount_vtable malloc_vtable = { malloc_ref, malloc_unref, grpc_slice_default_eq_impl, grpc_slice_default_hash_impl}; +grpc_slice grpc_slice_malloc_large(size_t length) { + grpc_slice slice; + + /* Memory layout used by the slice created here: + + +-----------+----------------------------------------------------------+ + | refcount | bytes | + +-----------+----------------------------------------------------------+ + + refcount is a malloc_refcount + bytes is an array of bytes of the requested length + Both parts are placed in the same allocation returned from gpr_malloc */ + malloc_refcount *rc = gpr_malloc(sizeof(malloc_refcount) + length); + + /* Initial refcount on rc is 1 - and it's up to the caller to release + this reference. */ + gpr_ref_init(&rc->refs, 1); + + rc->base.vtable = &malloc_vtable; + rc->base.sub_refcount = &rc->base; + + /* Build up the slice to be returned. */ + /* The slices refcount points back to the allocated block. */ + slice.refcount = &rc->base; + /* The data bytes are placed immediately after the refcount struct */ + slice.data.refcounted.bytes = (uint8_t *)(rc + 1); + /* And the length of the block is set to the requested length */ + slice.data.refcounted.length = length; + return slice; +} + grpc_slice grpc_slice_malloc(size_t length) { grpc_slice slice; if (length > sizeof(slice.data.inlined.bytes)) { - /* Memory layout used by the slice created here: - - +-----------+----------------------------------------------------------+ - | refcount | bytes | - +-----------+----------------------------------------------------------+ - - refcount is a malloc_refcount - bytes is an array of bytes of the requested length - Both parts are placed in the same allocation returned from gpr_malloc */ - malloc_refcount *rc = gpr_malloc(sizeof(malloc_refcount) + length); - - /* Initial refcount on rc is 1 - and it's up to the caller to release - this reference. */ - gpr_ref_init(&rc->refs, 1); - - rc->base.vtable = &malloc_vtable; - rc->base.sub_refcount = &rc->base; - - /* Build up the slice to be returned. */ - /* The slices refcount points back to the allocated block. */ - slice.refcount = &rc->base; - /* The data bytes are placed immediately after the refcount struct */ - slice.data.refcounted.bytes = (uint8_t *)(rc + 1); - /* And the length of the block is set to the requested length */ - slice.data.refcounted.length = length; + return grpc_slice_malloc_large(length); } else { /* small slice: just inline the data */ slice.refcount = NULL; @@ -306,7 +320,8 @@ grpc_slice grpc_slice_sub(grpc_slice source, size_t begin, size_t end) { return subset; } -grpc_slice grpc_slice_split_tail(grpc_slice *source, size_t split) { +grpc_slice grpc_slice_split_tail_maybe_ref(grpc_slice *source, size_t split, + grpc_slice_ref_whom ref_whom) { grpc_slice tail; if (source->refcount == NULL) { @@ -320,28 +335,46 @@ grpc_slice grpc_slice_split_tail(grpc_slice *source, size_t split) { } else { size_t tail_length = source->data.refcounted.length - split; GPR_ASSERT(source->data.refcounted.length >= split); - if (tail_length < sizeof(tail.data.inlined.bytes)) { + if (tail_length < sizeof(tail.data.inlined.bytes) && + ref_whom != GRPC_SLICE_REF_TAIL) { /* Copy out the bytes - it'll be cheaper than refcounting */ tail.refcount = NULL; tail.data.inlined.length = (uint8_t)tail_length; memcpy(tail.data.inlined.bytes, source->data.refcounted.bytes + split, tail_length); + source->refcount = source->refcount->sub_refcount; } else { /* Build the result */ - tail.refcount = source->refcount->sub_refcount; - /* Bump the refcount */ - tail.refcount->vtable->ref(tail.refcount); + switch (ref_whom) { + case GRPC_SLICE_REF_TAIL: + tail.refcount = source->refcount->sub_refcount; + source->refcount = &noop_refcount; + break; + case GRPC_SLICE_REF_HEAD: + tail.refcount = &noop_refcount; + source->refcount = source->refcount->sub_refcount; + break; + case GRPC_SLICE_REF_BOTH: + tail.refcount = source->refcount->sub_refcount; + source->refcount = source->refcount->sub_refcount; + /* Bump the refcount */ + tail.refcount->vtable->ref(tail.refcount); + break; + } /* Point into the source array */ tail.data.refcounted.bytes = source->data.refcounted.bytes + split; tail.data.refcounted.length = tail_length; } - source->refcount = source->refcount->sub_refcount; source->data.refcounted.length = split; } return tail; } +grpc_slice grpc_slice_split_tail(grpc_slice *source, size_t split) { + return grpc_slice_split_tail_maybe_ref(source, split, GRPC_SLICE_REF_BOTH); +} + grpc_slice grpc_slice_split_head(grpc_slice *source, size_t split) { grpc_slice head; @@ -459,7 +492,7 @@ int grpc_slice_slice(grpc_slice haystack, grpc_slice needle) { } grpc_slice grpc_slice_dup(grpc_slice a) { - grpc_slice copy = grpc_slice_malloc(GRPC_SLICE_LENGTH(a)); + grpc_slice copy = GRPC_SLICE_MALLOC(GRPC_SLICE_LENGTH(a)); memcpy(GRPC_SLICE_START_PTR(copy), GRPC_SLICE_START_PTR(a), GRPC_SLICE_LENGTH(a)); return copy; diff --git a/src/core/lib/slice/slice_buffer.c b/src/core/lib/slice/slice_buffer.c index c96b9c3b28..e8d41ca0f7 100644 --- a/src/core/lib/slice/slice_buffer.c +++ b/src/core/lib/slice/slice_buffer.c @@ -253,16 +253,18 @@ void grpc_slice_buffer_move_into(grpc_slice_buffer *src, src->length = 0; } -void grpc_slice_buffer_move_first(grpc_slice_buffer *src, size_t n, - grpc_slice_buffer *dst) { - size_t output_len = dst->length + n; - size_t new_input_len = src->length - n; +static void slice_buffer_move_first_maybe_ref(grpc_slice_buffer *src, size_t n, + grpc_slice_buffer *dst, + bool incref) { GPR_ASSERT(src->length >= n); if (src->length == n) { grpc_slice_buffer_move_into(src, dst); return; } + size_t output_len = dst->length + n; + size_t new_input_len = src->length - n; + while (src->count > 0) { grpc_slice slice = grpc_slice_buffer_take_first(src); size_t slice_len = GRPC_SLICE_LENGTH(slice); @@ -272,11 +274,18 @@ void grpc_slice_buffer_move_first(grpc_slice_buffer *src, size_t n, } else if (n == slice_len) { grpc_slice_buffer_add(dst, slice); break; - } else { /* n < slice_len */ - grpc_slice_buffer_undo_take_first(src, grpc_slice_split_tail(&slice, n)); + } else if (incref) { /* n < slice_len */ + grpc_slice_buffer_undo_take_first( + src, grpc_slice_split_tail_maybe_ref(&slice, n, GRPC_SLICE_REF_BOTH)); GPR_ASSERT(GRPC_SLICE_LENGTH(slice) == n); grpc_slice_buffer_add(dst, slice); break; + } else { /* n < slice_len */ + grpc_slice_buffer_undo_take_first( + src, grpc_slice_split_tail_maybe_ref(&slice, n, GRPC_SLICE_REF_TAIL)); + GPR_ASSERT(GRPC_SLICE_LENGTH(slice) == n); + grpc_slice_buffer_add_indexed(dst, slice); + break; } } GPR_ASSERT(dst->length == output_len); @@ -284,6 +293,16 @@ void grpc_slice_buffer_move_first(grpc_slice_buffer *src, size_t n, GPR_ASSERT(src->count > 0); } +void grpc_slice_buffer_move_first(grpc_slice_buffer *src, size_t n, + grpc_slice_buffer *dst) { + slice_buffer_move_first_maybe_ref(src, n, dst, true); +} + +void grpc_slice_buffer_move_first_no_ref(grpc_slice_buffer *src, size_t n, + grpc_slice_buffer *dst) { + slice_buffer_move_first_maybe_ref(src, n, dst, false); +} + void grpc_slice_buffer_move_first_into_buffer(grpc_exec_ctx *exec_ctx, grpc_slice_buffer *src, size_t n, void *dst) { diff --git a/src/core/lib/slice/slice_hash_table.c b/src/core/lib/slice/slice_hash_table.c index 219567f36f..444f22aa19 100644 --- a/src/core/lib/slice/slice_hash_table.c +++ b/src/core/lib/slice/slice_hash_table.c @@ -42,56 +42,47 @@ struct grpc_slice_hash_table { gpr_refcount refs; + void (*destroy_value)(grpc_exec_ctx* exec_ctx, void* value); size_t size; + size_t max_num_probes; grpc_slice_hash_table_entry* entries; }; static bool is_empty(grpc_slice_hash_table_entry* entry) { - return entry->vtable == NULL; + return entry->value == NULL; } -// Helper function for insert and get operations that performs quadratic -// probing (https://en.wikipedia.org/wiki/Quadratic_probing). -static size_t grpc_slice_hash_table_find_index( - const grpc_slice_hash_table* table, const grpc_slice key, bool find_empty) { - size_t hash = grpc_slice_hash(key); - for (size_t i = 0; i < table->size; ++i) { - const size_t idx = (hash + i * i) % table->size; +static void grpc_slice_hash_table_add(grpc_slice_hash_table* table, + grpc_slice key, void* value) { + GPR_ASSERT(value != NULL); + const size_t hash = grpc_slice_hash(key); + for (size_t offset = 0; offset < table->size; ++offset) { + const size_t idx = (hash + offset) % table->size; if (is_empty(&table->entries[idx])) { - return find_empty ? idx : table->size; - } - if (grpc_slice_eq(table->entries[idx].key, key)) { - return idx; + table->entries[idx].key = key; + table->entries[idx].value = value; + // Keep track of the maximum number of probes needed, since this + // provides an upper bound for lookups. + if (offset > table->max_num_probes) table->max_num_probes = offset; + return; } } - return table->size; // Not found. -} - -static void grpc_slice_hash_table_add( - grpc_slice_hash_table* table, grpc_slice key, void* value, - const grpc_slice_hash_table_vtable* vtable) { - GPR_ASSERT(value != NULL); - const size_t idx = - grpc_slice_hash_table_find_index(table, key, true /* find_empty */); - GPR_ASSERT(idx != table->size); // Table should never be full. - grpc_slice_hash_table_entry* entry = &table->entries[idx]; - entry->key = grpc_slice_ref_internal(key); - entry->value = vtable->copy_value(value); - entry->vtable = vtable; + GPR_ASSERT(false); // Table should never be full. } grpc_slice_hash_table* grpc_slice_hash_table_create( - size_t num_entries, grpc_slice_hash_table_entry* entries) { + size_t num_entries, grpc_slice_hash_table_entry* entries, + void (*destroy_value)(grpc_exec_ctx* exec_ctx, void* value)) { grpc_slice_hash_table* table = gpr_zalloc(sizeof(*table)); gpr_ref_init(&table->refs, 1); - // Quadratic probing gets best performance when the table is no more - // than half full. + table->destroy_value = destroy_value; + // Keep load factor low to improve performance of lookups. table->size = num_entries * 2; const size_t entry_size = sizeof(grpc_slice_hash_table_entry) * table->size; table->entries = gpr_zalloc(entry_size); for (size_t i = 0; i < num_entries; ++i) { grpc_slice_hash_table_entry* entry = &entries[i]; - grpc_slice_hash_table_add(table, entry->key, entry->value, entry->vtable); + grpc_slice_hash_table_add(table, entry->key, entry->value); } return table; } @@ -108,7 +99,7 @@ void grpc_slice_hash_table_unref(grpc_exec_ctx* exec_ctx, grpc_slice_hash_table_entry* entry = &table->entries[i]; if (!is_empty(entry)) { grpc_slice_unref_internal(exec_ctx, entry->key); - entry->vtable->destroy_value(exec_ctx, entry->value); + table->destroy_value(exec_ctx, entry->value); } } gpr_free(table->entries); @@ -118,8 +109,15 @@ void grpc_slice_hash_table_unref(grpc_exec_ctx* exec_ctx, void* grpc_slice_hash_table_get(const grpc_slice_hash_table* table, const grpc_slice key) { - const size_t idx = - grpc_slice_hash_table_find_index(table, key, false /* find_empty */); - if (idx == table->size) return NULL; // Not found. - return table->entries[idx].value; + const size_t hash = grpc_slice_hash(key); + // We cap the number of probes at the max number recorded when + // populating the table. + for (size_t offset = 0; offset <= table->max_num_probes; ++offset) { + const size_t idx = (hash + offset) % table->size; + if (is_empty(&table->entries[idx])) break; + if (grpc_slice_eq(table->entries[idx].key, key)) { + return table->entries[idx].value; + } + } + return NULL; // Not found. } diff --git a/src/core/lib/slice/slice_hash_table.h b/src/core/lib/slice/slice_hash_table.h index d0c27122d7..1e61c5eb11 100644 --- a/src/core/lib/slice/slice_hash_table.h +++ b/src/core/lib/slice/slice_hash_table.h @@ -37,33 +37,28 @@ /** Hash table implementation. * * This implementation uses open addressing - * (https://en.wikipedia.org/wiki/Open_addressing) with quadratic - * probing (https://en.wikipedia.org/wiki/Quadratic_probing). + * (https://en.wikipedia.org/wiki/Open_addressing) with linear + * probing (https://en.wikipedia.org/wiki/Linear_probing). * * The keys are \a grpc_slice objects. The values are arbitrary pointers - * with a common vtable. + * with a common destroy function. * * Hash tables are intentionally immutable, to avoid the need for locking. */ typedef struct grpc_slice_hash_table grpc_slice_hash_table; -typedef struct grpc_slice_hash_table_vtable { - void (*destroy_value)(grpc_exec_ctx *exec_ctx, void *value); - void *(*copy_value)(void *value); -} grpc_slice_hash_table_vtable; - typedef struct grpc_slice_hash_table_entry { grpc_slice key; void *value; /* Must not be NULL. */ - const grpc_slice_hash_table_vtable *vtable; } grpc_slice_hash_table_entry; /** Creates a new hash table of containing \a entries, which is an array - of length \a num_entries. - Creates its own copy of all keys and values from \a entries. */ + of length \a num_entries. Takes ownership of all keys and values in + \a entries. Values will be cleaned up via \a destroy_value(). */ grpc_slice_hash_table *grpc_slice_hash_table_create( - size_t num_entries, grpc_slice_hash_table_entry *entries); + size_t num_entries, grpc_slice_hash_table_entry *entries, + void (*destroy_value)(grpc_exec_ctx *exec_ctx, void *value)); grpc_slice_hash_table *grpc_slice_hash_table_ref(grpc_slice_hash_table *table); void grpc_slice_hash_table_unref(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/support/atomic.h b/src/core/lib/support/atomic.h new file mode 100644 index 0000000000..2226189b68 --- /dev/null +++ b/src/core/lib/support/atomic.h @@ -0,0 +1,45 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_SUPPORT_ATOMIC_H +#define GRPC_CORE_LIB_SUPPORT_ATOMIC_H + +#include <grpc/support/port_platform.h> + +#ifdef GPR_HAS_CXX11_ATOMIC +#include "src/core/lib/support/atomic_with_std.h" +#else +#include "src/core/lib/support/atomic_with_atm.h" +#endif + +#endif /* GRPC_CORE_LIB_SUPPORT_ATOMIC_H */ diff --git a/src/core/lib/support/atomic_with_atm.h b/src/core/lib/support/atomic_with_atm.h new file mode 100644 index 0000000000..55727f1dee --- /dev/null +++ b/src/core/lib/support/atomic_with_atm.h @@ -0,0 +1,70 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_SUPPORT_ATOMIC_WITH_ATM_H +#define GRPC_CORE_LIB_SUPPORT_ATOMIC_WITH_ATM_H + +#include <grpc/support/atm.h> + +namespace grpc_core { + +enum MemoryOrderRelaxed { memory_order_relaxed }; + +template <class T> +class atomic; + +template <> +class atomic<bool> { + public: + atomic() { gpr_atm_no_barrier_store(&x_, static_cast<gpr_atm>(false)); } + explicit atomic(bool x) { + gpr_atm_no_barrier_store(&x_, static_cast<gpr_atm>(x)); + } + + bool compare_exchange_strong(bool& expected, bool update, MemoryOrderRelaxed, + MemoryOrderRelaxed) { + if (!gpr_atm_no_barrier_cas(&x_, static_cast<gpr_atm>(expected), + static_cast<gpr_atm>(update))) { + expected = gpr_atm_no_barrier_load(&x_) != 0; + return false; + } + return true; + } + + private: + gpr_atm x_; +}; + +} // namespace grpc_core + +#endif /* GRPC_CORE_LIB_SUPPORT_ATOMIC_WITH_ATM_H */ diff --git a/src/core/lib/support/atomic_with_std.h b/src/core/lib/support/atomic_with_std.h new file mode 100644 index 0000000000..7e9c19efe8 --- /dev/null +++ b/src/core/lib/support/atomic_with_std.h @@ -0,0 +1,48 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_SUPPORT_ATOMIC_WITH_STD_H +#define GRPC_CORE_LIB_SUPPORT_ATOMIC_WITH_STD_H + +#include <atomic> + +namespace grpc_core { + +template <class T> +using atomic = std::atomic<T>; + +typedef std::memory_order memory_order; + +} // namespace grpc_core + +#endif /* GRPC_CORE_LIB_SUPPORT_ATOMIC_WITH_STD_H */ diff --git a/src/core/lib/support/avl.c b/src/core/lib/support/avl.c index acf8fd5a55..ffa10c1e4f 100644 --- a/src/core/lib/support/avl.c +++ b/src/core/lib/support/avl.c @@ -205,8 +205,8 @@ static gpr_avl_node *rebalance(const gpr_avl_vtable *vtable, void *key, } } -static gpr_avl_node *add(const gpr_avl_vtable *vtable, gpr_avl_node *node, - void *key, void *value) { +static gpr_avl_node *add_key(const gpr_avl_vtable *vtable, gpr_avl_node *node, + void *key, void *value) { long cmp; if (node == NULL) { return new_node(key, value, NULL, NULL); @@ -217,17 +217,17 @@ static gpr_avl_node *add(const gpr_avl_vtable *vtable, gpr_avl_node *node, } else if (cmp > 0) { return rebalance( vtable, vtable->copy_key(node->key), vtable->copy_value(node->value), - add(vtable, node->left, key, value), ref_node(node->right)); + add_key(vtable, node->left, key, value), ref_node(node->right)); } else { return rebalance(vtable, vtable->copy_key(node->key), vtable->copy_value(node->value), ref_node(node->left), - add(vtable, node->right, key, value)); + add_key(vtable, node->right, key, value)); } } gpr_avl gpr_avl_add(gpr_avl avl, void *key, void *value) { gpr_avl_node *old_root = avl.root; - avl.root = add(avl.vtable, avl.root, key, value); + avl.root = add_key(avl.vtable, avl.root, key, value); assert_invariants(avl.root); unref_node(avl.vtable, old_root); return avl; @@ -247,8 +247,8 @@ static gpr_avl_node *in_order_tail(gpr_avl_node *node) { return node; } -static gpr_avl_node *remove(const gpr_avl_vtable *vtable, gpr_avl_node *node, - void *key) { +static gpr_avl_node *remove_key(const gpr_avl_vtable *vtable, + gpr_avl_node *node, void *key) { long cmp; if (node == NULL) { return NULL; @@ -263,27 +263,27 @@ static gpr_avl_node *remove(const gpr_avl_vtable *vtable, gpr_avl_node *node, gpr_avl_node *h = in_order_head(node->right); return rebalance(vtable, vtable->copy_key(h->key), vtable->copy_value(h->value), ref_node(node->left), - remove(vtable, node->right, h->key)); + remove_key(vtable, node->right, h->key)); } else { gpr_avl_node *h = in_order_tail(node->left); return rebalance( vtable, vtable->copy_key(h->key), vtable->copy_value(h->value), - remove(vtable, node->left, h->key), ref_node(node->right)); + remove_key(vtable, node->left, h->key), ref_node(node->right)); } } else if (cmp > 0) { - return rebalance(vtable, vtable->copy_key(node->key), - vtable->copy_value(node->value), - remove(vtable, node->left, key), ref_node(node->right)); + return rebalance( + vtable, vtable->copy_key(node->key), vtable->copy_value(node->value), + remove_key(vtable, node->left, key), ref_node(node->right)); } else { return rebalance(vtable, vtable->copy_key(node->key), vtable->copy_value(node->value), ref_node(node->left), - remove(vtable, node->right, key)); + remove_key(vtable, node->right, key)); } } gpr_avl gpr_avl_remove(gpr_avl avl, void *key) { gpr_avl_node *old_root = avl.root; - avl.root = remove(avl.vtable, avl.root, key); + avl.root = remove_key(avl.vtable, avl.root, key); assert_invariants(avl.root); unref_node(avl.vtable, old_root); return avl; diff --git a/src/core/lib/support/cpu_linux.c b/src/core/lib/support/cpu_linux.c index 1e50f59823..b826dde160 100644 --- a/src/core/lib/support/cpu_linux.c +++ b/src/core/lib/support/cpu_linux.c @@ -67,16 +67,16 @@ unsigned gpr_cpu_num_cores(void) { } unsigned gpr_cpu_current_cpu(void) { -#ifdef __GLIBC__ +#ifdef GPR_MUSL_LIBC_COMPAT + // sched_getcpu() is undefined on musl + return 0; +#else int cpu = sched_getcpu(); if (cpu < 0) { gpr_log(GPR_ERROR, "Error determining current CPU: %s\n", strerror(errno)); return 0; } return (unsigned)cpu; -#else - // sched_getcpu() is undefined on musl - return 0; #endif } diff --git a/src/core/lib/support/memory.h b/src/core/lib/support/memory.h new file mode 100644 index 0000000000..6eff94eff7 --- /dev/null +++ b/src/core/lib/support/memory.h @@ -0,0 +1,74 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_SUPPORT_MEMORY_H +#define GRPC_CORE_LIB_SUPPORT_MEMORY_H + +#include <grpc/support/alloc.h> + +#include <memory> +#include <utility> + +namespace grpc_core { + +// Alternative to new, since we cannot use it (for fear of libstdc++) +template <typename T, typename... Args> +inline T* New(Args&&... args) { + void* p = gpr_malloc(sizeof(T)); + return new (p) T(std::forward<Args>(args)...); +} + +// Alternative to delete, since we cannot use it (for fear of libstdc++) +template <typename T> +inline void Delete(T* p) { + p->~T(); + gpr_free(p); +} + +template <typename T> +class DefaultDelete { + public: + void operator()(T* p) { Delete(p); } +}; + +template <typename T, typename Deleter = DefaultDelete<T>> +using UniquePtr = std::unique_ptr<T, Deleter>; + +template <typename T, typename... Args> +inline UniquePtr<T> MakeUnique(Args&&... args) { + return UniquePtr<T>(New<T>(std::forward<Args>(args)...)); +} + +} // namespace grpc_core + +#endif /* GRPC_CORE_LIB_SUPPORT_MEMORY_H */ diff --git a/src/core/lib/support/stack_lockfree.c b/src/core/lib/support/stack_lockfree.c index c481a3e0dc..dfbd3fb125 100644 --- a/src/core/lib/support/stack_lockfree.c +++ b/src/core/lib/support/stack_lockfree.c @@ -76,13 +76,13 @@ struct gpr_stack_lockfree { gpr_stack_lockfree *gpr_stack_lockfree_create(size_t entries) { gpr_stack_lockfree *stack; - stack = gpr_malloc(sizeof(*stack)); + stack = (gpr_stack_lockfree *)gpr_malloc(sizeof(*stack)); /* Since we only allocate 16 bits to represent an entry number, * make sure that we are within the desired range */ /* Reserve the highest entry number as a dummy */ GPR_ASSERT(entries < INVALID_ENTRY_INDEX); - stack->entries = gpr_malloc_aligned(entries * sizeof(stack->entries[0]), - ENTRY_ALIGNMENT_BITS); + stack->entries = (lockfree_node *)gpr_malloc_aligned( + entries * sizeof(stack->entries[0]), ENTRY_ALIGNMENT_BITS); /* Clear out all entries */ memset(stack->entries, 0, entries * sizeof(stack->entries[0])); memset(&stack->head, 0, sizeof(stack->head)); diff --git a/src/core/lib/support/time_posix.c b/src/core/lib/support/time_posix.c index a69c501e9f..9bfec7782a 100644 --- a/src/core/lib/support/time_posix.c +++ b/src/core/lib/support/time_posix.c @@ -42,6 +42,7 @@ #ifdef __linux__ #include <sys/syscall.h> #endif +#include <grpc/support/atm.h> #include <grpc/support/log.h> #include <grpc/support/time.h> #include "src/core/lib/support/block_annotate.h" @@ -144,7 +145,14 @@ static gpr_timespec now_impl(gpr_clock_type clock) { gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type) = now_impl; +#ifdef GPR_LOW_LEVEL_COUNTERS +gpr_atm gpr_now_call_count; +#endif + gpr_timespec gpr_now(gpr_clock_type clock_type) { +#ifdef GPR_LOW_LEVEL_COUNTERS + __atomic_fetch_add(&gpr_now_call_count, 1, __ATOMIC_RELAXED); +#endif return gpr_now_impl(clock_type); } diff --git a/src/core/lib/support/tmpfile_posix.c b/src/core/lib/support/tmpfile_posix.c index 0cd4bb6fc3..5771c158e0 100644 --- a/src/core/lib/support/tmpfile_posix.c +++ b/src/core/lib/support/tmpfile_posix.c @@ -50,34 +50,34 @@ FILE *gpr_tmpfile(const char *prefix, char **tmp_filename) { FILE *result = NULL; - char *template; + char *filename_template; int fd; if (tmp_filename != NULL) *tmp_filename = NULL; - gpr_asprintf(&template, "/tmp/%s_XXXXXX", prefix); - GPR_ASSERT(template != NULL); + gpr_asprintf(&filename_template, "/tmp/%s_XXXXXX", prefix); + GPR_ASSERT(filename_template != NULL); - fd = mkstemp(template); + fd = mkstemp(filename_template); if (fd == -1) { - gpr_log(GPR_ERROR, "mkstemp failed for template %s with error %s.", - template, strerror(errno)); + gpr_log(GPR_ERROR, "mkstemp failed for filename_template %s with error %s.", + filename_template, strerror(errno)); goto end; } result = fdopen(fd, "w+"); if (result == NULL) { gpr_log(GPR_ERROR, "Could not open file %s from fd %d (error = %s).", - template, fd, strerror(errno)); - unlink(template); + filename_template, fd, strerror(errno)); + unlink(filename_template); close(fd); goto end; } end: if (result != NULL && tmp_filename != NULL) { - *tmp_filename = template; + *tmp_filename = filename_template; } else { - gpr_free(template); + gpr_free(filename_template); } return result; } diff --git a/src/core/lib/support/wrap_memcpy.c b/src/core/lib/support/wrap_memcpy.c index 050cc6db5e..deb8d6b198 100644 --- a/src/core/lib/support/wrap_memcpy.c +++ b/src/core/lib/support/wrap_memcpy.c @@ -31,6 +31,8 @@ * */ +#include <grpc/support/port_platform.h> + #include <string.h> /* Provide a wrapped memcpy for targets that need to be backwards @@ -40,7 +42,7 @@ */ #ifdef __linux__ -#if defined(__x86_64__) && defined(__GNU_LIBRARY__) +#if defined(__x86_64__) && !defined(GPR_MUSL_LIBC_COMPAT) __asm__(".symver memcpy,memcpy@GLIBC_2.2.5"); void *__wrap_memcpy(void *destination, const void *source, size_t num) { return memcpy(destination, source, num); diff --git a/src/core/lib/surface/byte_buffer_reader.c b/src/core/lib/surface/byte_buffer_reader.c index 1a6ccdaddb..539b014278 100644 --- a/src/core/lib/surface/byte_buffer_reader.c +++ b/src/core/lib/surface/byte_buffer_reader.c @@ -124,7 +124,7 @@ grpc_slice grpc_byte_buffer_reader_readall(grpc_byte_buffer_reader *reader) { grpc_slice in_slice; size_t bytes_read = 0; const size_t input_size = grpc_byte_buffer_length(reader->buffer_out); - grpc_slice out_slice = grpc_slice_malloc(input_size); + grpc_slice out_slice = GRPC_SLICE_MALLOC(input_size); uint8_t *const outbuf = GRPC_SLICE_START_PTR(out_slice); /* just an alias */ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 9aa457d792..7525806583 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -245,6 +245,7 @@ struct grpc_call { }; int grpc_call_error_trace = 0; +int grpc_compression_trace = 0; #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) #define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1) diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 35e9f7eb30..eae3f103b1 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -60,13 +60,154 @@ typedef struct { void *tag; } plucker; +typedef struct { + bool can_get_pollset; + bool can_listen; + size_t (*size)(void); + void (*init)(grpc_pollset *pollset, gpr_mu **mu); + grpc_error *(*kick)(grpc_pollset *pollset, + grpc_pollset_worker *specific_worker); + grpc_error *(*work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker **worker, gpr_timespec now, + gpr_timespec deadline); + void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_closure *closure); + void (*destroy)(grpc_pollset *pollset); +} cq_poller_vtable; + +typedef struct non_polling_worker { + gpr_cv cv; + bool kicked; + struct non_polling_worker *next; + struct non_polling_worker *prev; +} non_polling_worker; + +typedef struct { + gpr_mu mu; + non_polling_worker *root; + grpc_closure *shutdown; +} non_polling_poller; + +static size_t non_polling_poller_size(void) { + return sizeof(non_polling_poller); +} + +static void non_polling_poller_init(grpc_pollset *pollset, gpr_mu **mu) { + non_polling_poller *npp = (non_polling_poller *)pollset; + gpr_mu_init(&npp->mu); + *mu = &npp->mu; +} + +static void non_polling_poller_destroy(grpc_pollset *pollset) { + non_polling_poller *npp = (non_polling_poller *)pollset; + gpr_mu_destroy(&npp->mu); +} + +static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, + grpc_pollset_worker **worker, + gpr_timespec now, + gpr_timespec deadline) { + non_polling_poller *npp = (non_polling_poller *)pollset; + if (npp->shutdown) return GRPC_ERROR_NONE; + non_polling_worker w; + gpr_cv_init(&w.cv); + if (worker != NULL) *worker = (grpc_pollset_worker *)&w; + if (npp->root == NULL) { + npp->root = w.next = w.prev = &w; + } else { + w.next = npp->root; + w.prev = w.next->prev; + w.next->prev = w.prev->next = &w; + } + w.kicked = false; + while (!npp->shutdown && !w.kicked && !gpr_cv_wait(&w.cv, &npp->mu, deadline)) + ; + if (&w == npp->root) { + npp->root = w.next; + if (&w == npp->root) { + if (npp->shutdown) { + grpc_closure_sched(exec_ctx, npp->shutdown, GRPC_ERROR_NONE); + } + npp->root = NULL; + } + } + w.next->prev = w.prev; + w.prev->next = w.next; + gpr_cv_destroy(&w.cv); + if (worker != NULL) *worker = NULL; + return GRPC_ERROR_NONE; +} + +static grpc_error *non_polling_poller_kick( + grpc_pollset *pollset, grpc_pollset_worker *specific_worker) { + non_polling_poller *p = (non_polling_poller *)pollset; + if (specific_worker == NULL) specific_worker = (grpc_pollset_worker *)p->root; + if (specific_worker != NULL) { + non_polling_worker *w = (non_polling_worker *)specific_worker; + if (!w->kicked) { + w->kicked = true; + gpr_cv_signal(&w->cv); + } + } + return GRPC_ERROR_NONE; +} + +static void non_polling_poller_shutdown(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, + grpc_closure *closure) { + non_polling_poller *p = (non_polling_poller *)pollset; + GPR_ASSERT(closure != NULL); + p->shutdown = closure; + if (p->root == NULL) { + grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE); + } else { + non_polling_worker *w = p->root; + do { + gpr_cv_signal(&w->cv); + w = w->next; + } while (w != p->root); + } +} + +static const cq_poller_vtable g_poller_vtable_by_poller_type[] = { + /* GRPC_CQ_DEFAULT_POLLING */ + {.can_get_pollset = true, + .can_listen = true, + .size = grpc_pollset_size, + .init = grpc_pollset_init, + .kick = grpc_pollset_kick, + .work = grpc_pollset_work, + .shutdown = grpc_pollset_shutdown, + .destroy = grpc_pollset_destroy}, + /* GRPC_CQ_NON_LISTENING */ + {.can_get_pollset = true, + .can_listen = false, + .size = grpc_pollset_size, + .init = grpc_pollset_init, + .kick = grpc_pollset_kick, + .work = grpc_pollset_work, + .shutdown = grpc_pollset_shutdown, + .destroy = grpc_pollset_destroy}, + /* GRPC_CQ_NON_POLLING */ + {.can_get_pollset = false, + .can_listen = false, + .size = non_polling_poller_size, + .init = non_polling_poller_init, + .kick = non_polling_poller_kick, + .work = non_polling_poller_work, + .shutdown = non_polling_poller_shutdown, + .destroy = non_polling_poller_destroy}, +}; + /* Completion queue structure */ struct grpc_completion_queue { /** owned by pollset */ gpr_mu *mu; grpc_cq_completion_type completion_type; - grpc_cq_polling_type polling_type; + + const cq_poller_vtable *poller_vtable; /** completed events */ grpc_cq_completion completed_head; @@ -127,15 +268,18 @@ grpc_completion_queue *grpc_completion_queue_create_internal( "polling_type=%d)", 2, (completion_type, polling_type)); - cc = gpr_zalloc(sizeof(grpc_completion_queue) + grpc_pollset_size()); - grpc_pollset_init(POLLSET_FROM_CQ(cc), &cc->mu); + const cq_poller_vtable *poller_vtable = + &g_poller_vtable_by_poller_type[polling_type]; + + cc = gpr_zalloc(sizeof(grpc_completion_queue) + poller_vtable->size()); + poller_vtable->init(POLLSET_FROM_CQ(cc), &cc->mu); #ifndef NDEBUG cc->outstanding_tags = NULL; cc->outstanding_tag_capacity = 0; #endif cc->completion_type = completion_type; - cc->polling_type = polling_type; + cc->poller_vtable = poller_vtable; /* Initial ref is dropped by grpc_completion_queue_shutdown */ gpr_ref_init(&cc->pending_events, 1); @@ -164,10 +308,6 @@ grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc) { return cc->completion_type; } -grpc_cq_polling_type grpc_get_cq_polling_type(grpc_completion_queue *cc) { - return cc->polling_type; -} - #ifdef GRPC_CQ_REF_COUNT_DEBUG void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason, const char *file, int line) { @@ -195,7 +335,7 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) { #endif if (gpr_unref(&cc->owning_refs)) { GPR_ASSERT(cc->completed_head.next == (uintptr_t)&cc->completed_head); - grpc_pollset_destroy(POLLSET_FROM_CQ(cc)); + cc->poller_vtable->destroy(POLLSET_FROM_CQ(cc)); #ifndef NDEBUG gpr_free(cc->outstanding_tags); #endif @@ -280,7 +420,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, } } grpc_error *kick_error = - grpc_pollset_kick(POLLSET_FROM_CQ(cc), pluck_worker); + cc->poller_vtable->kick(POLLSET_FROM_CQ(cc), pluck_worker); gpr_mu_unlock(cc->mu); if (kick_error != GRPC_ERROR_NONE) { const char *msg = grpc_error_string(kick_error); @@ -295,8 +435,8 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, GPR_ASSERT(!cc->shutdown); GPR_ASSERT(cc->shutdown_called); cc->shutdown = 1; - grpc_pollset_shutdown(exec_ctx, POLLSET_FROM_CQ(cc), - &cc->pollset_shutdown_done); + cc->poller_vtable->shutdown(exec_ctx, POLLSET_FROM_CQ(cc), + &cc->pollset_shutdown_done); gpr_mu_unlock(cc->mu); } @@ -452,8 +592,8 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, gpr_mu_lock(cc->mu); continue; } else { - grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), NULL, - now, iteration_deadline); + grpc_error *err = cc->poller_vtable->work(&exec_ctx, POLLSET_FROM_CQ(cc), + NULL, now, iteration_deadline); if (err != GRPC_ERROR_NONE) { gpr_mu_unlock(cc->mu); const char *msg = grpc_error_string(err); @@ -644,8 +784,8 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(cc->mu); } else { - grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), - &worker, now, iteration_deadline); + grpc_error *err = cc->poller_vtable->work( + &exec_ctx, POLLSET_FROM_CQ(cc), &worker, now, iteration_deadline); if (err != GRPC_ERROR_NONE) { del_plucker(cc, tag, &worker); gpr_mu_unlock(cc->mu); @@ -689,8 +829,8 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { if (gpr_unref(&cc->pending_events)) { GPR_ASSERT(!cc->shutdown); cc->shutdown = 1; - grpc_pollset_shutdown(&exec_ctx, POLLSET_FROM_CQ(cc), - &cc->pollset_shutdown_done); + cc->poller_vtable->shutdown(&exec_ctx, POLLSET_FROM_CQ(cc), + &cc->pollset_shutdown_done); } gpr_mu_unlock(cc->mu); grpc_exec_ctx_finish(&exec_ctx); @@ -706,7 +846,7 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cc) { } grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) { - return POLLSET_FROM_CQ(cc); + return cc->poller_vtable->can_get_pollset ? POLLSET_FROM_CQ(cc) : NULL; } grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps) { @@ -727,4 +867,10 @@ bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc) { void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; } -int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; } +bool grpc_cq_is_server_cq(grpc_completion_queue *cc) { + return cc->is_server_cq; +} + +bool grpc_cq_can_listen(grpc_completion_queue *cc) { + return cc->poller_vtable->can_listen; +} diff --git a/src/core/lib/surface/completion_queue.h b/src/core/lib/surface/completion_queue.h index 1ff3d64293..a932087939 100644 --- a/src/core/lib/surface/completion_queue.h +++ b/src/core/lib/surface/completion_queue.h @@ -94,13 +94,11 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc); grpc_completion_queue *grpc_cq_from_pollset(grpc_pollset *ps); -void grpc_cq_mark_non_listening_server_cq(grpc_completion_queue *cc); -bool grpc_cq_is_non_listening_server_cq(grpc_completion_queue *cc); void grpc_cq_mark_server_cq(grpc_completion_queue *cc); -int grpc_cq_is_server_cq(grpc_completion_queue *cc); +bool grpc_cq_is_server_cq(grpc_completion_queue *cc); +bool grpc_cq_can_listen(grpc_completion_queue *cc); grpc_cq_completion_type grpc_get_cq_completion_type(grpc_completion_queue *cc); -grpc_cq_polling_type grpc_get_cq_polling_type(grpc_completion_queue *cc); grpc_completion_queue *grpc_completion_queue_create_internal( grpc_cq_completion_type completion_type, grpc_cq_polling_type polling_type); diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.cc index 82428c42c0..88f4eaac08 100644 --- a/src/core/lib/surface/lame_client.c +++ b/src/core/lib/surface/lame_client.cc @@ -31,39 +31,50 @@ * */ -#include "src/core/lib/surface/lame_client.h" - #include <grpc/grpc.h> #include <string.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> + +#include "src/core/lib/support/atomic.h" + +extern "C" { #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel.h" +#include "src/core/lib/surface/lame_client.h" #include "src/core/lib/transport/static_metadata.h" +} -typedef struct { +namespace grpc_core { + +namespace { + +struct CallData { grpc_linked_mdelem status; grpc_linked_mdelem details; - gpr_atm filled_metadata; -} call_data; + grpc_core::atomic<bool> filled_metadata; +}; -typedef struct { +struct ChannelData { grpc_status_code error_code; const char *error_message; -} channel_data; +}; static void fill_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_metadata_batch *mdb) { - call_data *calld = elem->call_data; - if (!gpr_atm_no_barrier_cas(&calld->filled_metadata, 0, 1)) { + CallData *calld = static_cast<CallData *>(elem->call_data); + bool expected = false; + if (!calld->filled_metadata.compare_exchange_strong( + expected, true, grpc_core::memory_order_relaxed, + grpc_core::memory_order_relaxed)) { return; } - channel_data *chand = elem->channel_data; + ChannelData *chand = static_cast<ChannelData *>(elem->channel_data); char tmp[GPR_LTOA_MIN_BUFSIZE]; gpr_ltoa(chand->error_code, tmp); calld->status.md = grpc_mdelem_from_slices( @@ -83,7 +94,6 @@ static void fill_metadata(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, static void lame_start_transport_stream_op_batch( grpc_exec_ctx *exec_ctx, grpc_call_element *elem, grpc_transport_stream_op_batch *op) { - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); if (op->recv_initial_metadata) { fill_metadata(exec_ctx, elem, op->payload->recv_initial_metadata.recv_initial_metadata); @@ -127,8 +137,6 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_element_args *args) { - call_data *calld = elem->call_data; - gpr_atm_no_barrier_store(&calld->filled_metadata, 0); return GRPC_ERROR_NONE; } @@ -149,18 +157,22 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) {} -const grpc_channel_filter grpc_lame_filter = { - lame_start_transport_stream_op_batch, - lame_start_transport_op, - sizeof(call_data), - init_call_elem, +} // namespace + +} // namespace grpc_core + +extern "C" const grpc_channel_filter grpc_lame_filter = { + grpc_core::lame_start_transport_stream_op_batch, + grpc_core::lame_start_transport_op, + sizeof(grpc_core::CallData), + grpc_core::init_call_elem, grpc_call_stack_ignore_set_pollset_or_pollset_set, - destroy_call_elem, - sizeof(channel_data), - init_channel_elem, - destroy_channel_elem, - lame_get_peer, - lame_get_channel_info, + grpc_core::destroy_call_elem, + sizeof(grpc_core::ChannelData), + grpc_core::init_channel_elem, + grpc_core::destroy_channel_elem, + grpc_core::lame_get_peer, + grpc_core::lame_get_channel_info, "lame-client", }; @@ -171,7 +183,6 @@ grpc_channel *grpc_lame_client_channel_create(const char *target, const char *error_message) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_channel_element *elem; - channel_data *chand; grpc_channel *channel = grpc_channel_create(&exec_ctx, target, NULL, GRPC_CLIENT_LAME_CHANNEL, NULL); elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0); @@ -180,7 +191,7 @@ grpc_channel *grpc_lame_client_channel_create(const char *target, "error_message=%s)", 3, (target, (int)error_code, error_message)); GPR_ASSERT(elem->filter == &grpc_lame_filter); - chand = (channel_data *)elem->channel_data; + auto chand = static_cast<grpc_core::ChannelData *>(elem->channel_data); chand->error_code = error_code; chand->error_message = error_message; grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 26c81e9aca..934ca0431a 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -981,7 +981,7 @@ const grpc_channel_filter grpc_server_top_filter = { static void register_completion_queue(grpc_server *server, grpc_completion_queue *cq, - bool is_non_listening, void *reserved) { + void *reserved) { size_t i, n; GPR_ASSERT(!reserved); for (i = 0; i < server->cq_count; i++) { @@ -990,10 +990,6 @@ static void register_completion_queue(grpc_server *server, grpc_cq_mark_server_cq(cq); - if (is_non_listening) { - grpc_cq_mark_non_listening_server_cq(cq); - } - GRPC_CQ_INTERNAL_REF(cq, "server"); n = server->cq_count++; server->cqs = gpr_realloc(server->cqs, @@ -1016,16 +1012,7 @@ void grpc_server_register_completion_queue(grpc_server *server, calls grpc_completion_queue_pluck() on server completion queues */ } - register_completion_queue(server, cq, false, reserved); -} - -void grpc_server_register_non_listening_completion_queue( - grpc_server *server, grpc_completion_queue *cq, void *reserved) { - GRPC_API_TRACE( - "grpc_server_register_non_listening_completion_queue(server=%p, cq=%p, " - "reserved=%p)", - 3, (server, cq, reserved)); - register_completion_queue(server, cq, true, reserved); + register_completion_queue(server, cq, reserved); } grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { @@ -1033,8 +1020,6 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { grpc_server *server = gpr_zalloc(sizeof(grpc_server)); - GPR_ASSERT(grpc_is_initialized() && "call grpc_init()"); - gpr_mu_init(&server->mu_global); gpr_mu_init(&server->mu_call); gpr_cv_init(&server->starting_cv); @@ -1123,7 +1108,7 @@ void grpc_server_start(grpc_server *server) { server->requested_calls_per_cq = gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count); for (i = 0; i < server->cq_count; i++) { - if (!grpc_cq_is_non_listening_server_cq(server->cqs[i])) { + if (grpc_cq_can_listen(server->cqs[i])) { server->pollsets[server->pollset_count++] = grpc_cq_pollset(server->cqs[i]); } diff --git a/src/core/lib/surface/version.c b/src/core/lib/surface/version.c index 3793845559..cddc595e4c 100644 --- a/src/core/lib/surface/version.c +++ b/src/core/lib/surface/version.c @@ -38,4 +38,4 @@ const char *grpc_version_string(void) { return "4.0.0-dev"; } -const char *grpc_g_stands_for(void) { return "gentle"; } +const char *grpc_g_stands_for(void) { return "gregarious"; } diff --git a/src/core/lib/transport/service_config.c b/src/core/lib/transport/service_config.c index 1195f75044..6aecb7fa93 100644 --- a/src/core/lib/transport/service_config.c +++ b/src/core/lib/transport/service_config.c @@ -162,7 +162,6 @@ static char* parse_json_method_name(grpc_json* json) { static bool parse_json_method_config( grpc_exec_ctx* exec_ctx, grpc_json* json, void* (*create_value)(const grpc_json* method_config_json), - const grpc_slice_hash_table_vtable* vtable, grpc_slice_hash_table_entry* entries, size_t* idx) { // Construct value. void* method_config = create_value(json); @@ -185,13 +184,11 @@ static bool parse_json_method_config( // Add entry for each path. for (size_t i = 0; i < paths.count; ++i) { entries[*idx].key = grpc_slice_from_copied_string(paths.strs[i]); - entries[*idx].value = vtable->copy_value(method_config); - entries[*idx].vtable = vtable; + entries[*idx].value = method_config; ++*idx; } success = true; done: - vtable->destroy_value(exec_ctx, method_config); gpr_strvec_destroy(&paths); return success; } @@ -199,7 +196,7 @@ done: grpc_slice_hash_table* grpc_service_config_create_method_config_table( grpc_exec_ctx* exec_ctx, const grpc_service_config* service_config, void* (*create_value)(const grpc_json* method_config_json), - const grpc_slice_hash_table_vtable* vtable) { + void (*destroy_value)(grpc_exec_ctx* exec_ctx, void* value)) { const grpc_json* json = service_config->json_tree; // Traverse parsed JSON tree. if (json->type != GRPC_JSON_OBJECT || json->key != NULL) return NULL; @@ -220,8 +217,8 @@ grpc_slice_hash_table* grpc_service_config_create_method_config_table( size_t idx = 0; for (grpc_json* method = field->child; method != NULL; method = method->next) { - if (!parse_json_method_config(exec_ctx, method, create_value, vtable, - entries, &idx)) { + if (!parse_json_method_config(exec_ctx, method, create_value, entries, + &idx)) { return NULL; } } @@ -231,12 +228,8 @@ grpc_slice_hash_table* grpc_service_config_create_method_config_table( // Instantiate method config table. grpc_slice_hash_table* method_config_table = NULL; if (entries != NULL) { - method_config_table = grpc_slice_hash_table_create(num_entries, entries); - // Clean up. - for (size_t i = 0; i < num_entries; ++i) { - grpc_slice_unref_internal(exec_ctx, entries[i].key); - vtable->destroy_value(exec_ctx, entries[i].value); - } + method_config_table = + grpc_slice_hash_table_create(num_entries, entries, destroy_value); gpr_free(entries); } return method_config_table; diff --git a/src/core/lib/transport/service_config.h b/src/core/lib/transport/service_config.h index ebfc59b534..e0548b9c3f 100644 --- a/src/core/lib/transport/service_config.h +++ b/src/core/lib/transport/service_config.h @@ -57,12 +57,12 @@ const char* grpc_service_config_get_lb_policy_name( /// Creates a method config table based on the data in \a json. /// The table's keys are request paths. The table's value type is /// returned by \a create_value(), based on data parsed from the JSON tree. -/// \a vtable provides methods used to manage the values. +/// \a destroy_value is used to clean up values. /// Returns NULL on error. grpc_slice_hash_table* grpc_service_config_create_method_config_table( grpc_exec_ctx* exec_ctx, const grpc_service_config* service_config, void* (*create_value)(const grpc_json* method_config_json), - const grpc_slice_hash_table_vtable* vtable); + void (*destroy_value)(grpc_exec_ctx* exec_ctx, void* value)); /// A helper function for looking up values in the table returned by /// \a grpc_service_config_create_method_config_table(). diff --git a/src/core/tsi/fake_transport_security.c b/src/core/tsi/fake_transport_security.c index 822fad51cb..1836beefc4 100644 --- a/src/core/tsi/fake_transport_security.c +++ b/src/core/tsi/fake_transport_security.c @@ -499,6 +499,7 @@ static const tsi_handshaker_vtable handshaker_vtable = { fake_handshaker_extract_peer, fake_handshaker_create_frame_protector, fake_handshaker_destroy, + NULL, }; tsi_handshaker *tsi_create_fake_handshaker(int is_client) { diff --git a/src/core/tsi/ssl_transport_security.c b/src/core/tsi/ssl_transport_security.c index 5f4705db92..e1d634a1fa 100644 --- a/src/core/tsi/ssl_transport_security.c +++ b/src/core/tsi/ssl_transport_security.c @@ -479,9 +479,9 @@ static tsi_result do_ssl_write(SSL *ssl, unsigned char *unprotected_bytes, } /* Loads an in-memory PEM certificate chain into the SSL context. */ -static tsi_result ssl_ctx_use_certificate_chain( - SSL_CTX *context, const unsigned char *pem_cert_chain, - size_t pem_cert_chain_size) { +static tsi_result ssl_ctx_use_certificate_chain(SSL_CTX *context, + const char *pem_cert_chain, + size_t pem_cert_chain_size) { tsi_result result = TSI_OK; X509 *certificate = NULL; BIO *pem; @@ -522,8 +522,7 @@ static tsi_result ssl_ctx_use_certificate_chain( } /* Loads an in-memory PEM private key into the SSL context. */ -static tsi_result ssl_ctx_use_private_key(SSL_CTX *context, - const unsigned char *pem_key, +static tsi_result ssl_ctx_use_private_key(SSL_CTX *context, const char *pem_key, size_t pem_key_size) { tsi_result result = TSI_OK; EVP_PKEY *private_key = NULL; @@ -549,9 +548,11 @@ static tsi_result ssl_ctx_use_private_key(SSL_CTX *context, /* Loads in-memory PEM verification certs into the SSL context and optionally returns the verification cert names (root_names can be NULL). */ -static tsi_result ssl_ctx_load_verification_certs( - SSL_CTX *context, const unsigned char *pem_roots, size_t pem_roots_size, - STACK_OF(X509_NAME) * *root_names) { +static tsi_result ssl_ctx_load_verification_certs(SSL_CTX *context, + const char *pem_roots, + size_t pem_roots_size, + STACK_OF(X509_NAME) * + *root_names) { tsi_result result = TSI_OK; size_t num_roots = 0; X509 *root = NULL; @@ -618,24 +619,25 @@ static tsi_result ssl_ctx_load_verification_certs( /* Populates the SSL context with a private key and a cert chain, and sets the cipher list and the ephemeral ECDH key. */ static tsi_result populate_ssl_context( - SSL_CTX *context, const unsigned char *pem_private_key, - size_t pem_private_key_size, const unsigned char *pem_certificate_chain, - size_t pem_certificate_chain_size, const char *cipher_list) { + SSL_CTX *context, const tsi_ssl_pem_key_cert_pair *key_cert_pair, + const char *cipher_list) { tsi_result result = TSI_OK; - if (pem_certificate_chain != NULL) { - result = ssl_ctx_use_certificate_chain(context, pem_certificate_chain, - pem_certificate_chain_size); - if (result != TSI_OK) { - gpr_log(GPR_ERROR, "Invalid cert chain file."); - return result; + if (key_cert_pair != NULL) { + if (key_cert_pair->cert_chain != NULL) { + result = ssl_ctx_use_certificate_chain(context, key_cert_pair->cert_chain, + strlen(key_cert_pair->cert_chain)); + if (result != TSI_OK) { + gpr_log(GPR_ERROR, "Invalid cert chain file."); + return result; + } } - } - if (pem_private_key != NULL) { - result = - ssl_ctx_use_private_key(context, pem_private_key, pem_private_key_size); - if (result != TSI_OK || !SSL_CTX_check_private_key(context)) { - gpr_log(GPR_ERROR, "Invalid private key."); - return result != TSI_OK ? result : TSI_INVALID_ARGUMENT; + if (key_cert_pair->private_key != NULL) { + result = ssl_ctx_use_private_key(context, key_cert_pair->private_key, + strlen(key_cert_pair->private_key)); + if (result != TSI_OK || !SSL_CTX_check_private_key(context)) { + gpr_log(GPR_ERROR, "Invalid private key."); + return result != TSI_OK ? result : TSI_INVALID_ARGUMENT; + } } } if ((cipher_list != NULL) && !SSL_CTX_set_cipher_list(context, cipher_list)) { @@ -656,13 +658,12 @@ static tsi_result populate_ssl_context( } /* Extracts the CN and the SANs from an X509 cert as a peer object. */ -static tsi_result extract_x509_subject_names_from_pem_cert( - const unsigned char *pem_cert, size_t pem_cert_size, tsi_peer *peer) { +static tsi_result extract_x509_subject_names_from_pem_cert(const char *pem_cert, + tsi_peer *peer) { tsi_result result = TSI_OK; X509 *cert = NULL; BIO *pem; - GPR_ASSERT(pem_cert_size <= INT_MAX); - pem = BIO_new_mem_buf((void *)pem_cert, (int)pem_cert_size); + pem = BIO_new_mem_buf((void *)pem_cert, (int)strlen(pem_cert)); if (pem == NULL) return TSI_OUT_OF_RESOURCES; cert = PEM_read_bio_X509(pem, NULL, NULL, ""); @@ -679,8 +680,7 @@ static tsi_result extract_x509_subject_names_from_pem_cert( /* Builds the alpn protocol name list according to rfc 7301. */ static tsi_result build_alpn_protocol_name_list( - const unsigned char **alpn_protocols, - const unsigned char *alpn_protocols_lengths, uint16_t num_alpn_protocols, + const char **alpn_protocols, uint16_t num_alpn_protocols, unsigned char **protocol_name_list, size_t *protocol_name_list_length) { uint16_t i; unsigned char *current; @@ -688,19 +688,21 @@ static tsi_result build_alpn_protocol_name_list( *protocol_name_list_length = 0; if (num_alpn_protocols == 0) return TSI_INVALID_ARGUMENT; for (i = 0; i < num_alpn_protocols; i++) { - if (alpn_protocols_lengths[i] == 0) { - gpr_log(GPR_ERROR, "Invalid 0-length protocol name."); + size_t length = alpn_protocols[i] == NULL ? 0 : strlen(alpn_protocols[i]); + if (length == 0 || length > 255) { + gpr_log(GPR_ERROR, "Invalid protocol name length: %d.", (int)length); return TSI_INVALID_ARGUMENT; } - *protocol_name_list_length += (size_t)alpn_protocols_lengths[i] + 1; + *protocol_name_list_length += length + 1; } *protocol_name_list = gpr_malloc(*protocol_name_list_length); if (*protocol_name_list == NULL) return TSI_OUT_OF_RESOURCES; current = *protocol_name_list; for (i = 0; i < num_alpn_protocols; i++) { - *(current++) = alpn_protocols_lengths[i]; - memcpy(current, alpn_protocols[i], alpn_protocols_lengths[i]); - current += alpn_protocols_lengths[i]; + size_t length = strlen(alpn_protocols[i]); + *(current++) = (uint8_t)length; /* max checked above. */ + memcpy(current, alpn_protocols[i], length); + current += length; } /* Safety check. */ if ((current < *protocol_name_list) || @@ -1040,6 +1042,7 @@ static const tsi_handshaker_vtable handshaker_vtable = { ssl_handshaker_extract_peer, ssl_handshaker_create_frame_protector, ssl_handshaker_destroy, + NULL, }; /* --- tsi_ssl_handshaker_factory common methods. --- */ @@ -1280,11 +1283,9 @@ static int server_handshaker_factory_npn_advertised_callback( /* --- tsi_ssl_handshaker_factory constructors. --- */ tsi_result tsi_create_ssl_client_handshaker_factory( - const unsigned char *pem_private_key, size_t pem_private_key_size, - const unsigned char *pem_cert_chain, size_t pem_cert_chain_size, - const unsigned char *pem_root_certs, size_t pem_root_certs_size, - const char *cipher_list, const unsigned char **alpn_protocols, - const unsigned char *alpn_protocols_lengths, uint16_t num_alpn_protocols, + const tsi_ssl_pem_key_cert_pair *pem_key_cert_pair, + const char *pem_root_certs, const char *cipher_suites, + const char **alpn_protocols, uint16_t num_alpn_protocols, tsi_ssl_client_handshaker_factory **factory) { SSL_CTX *ssl_context = NULL; tsi_ssl_client_handshaker_factory *impl = NULL; @@ -1307,20 +1308,19 @@ tsi_result tsi_create_ssl_client_handshaker_factory( do { result = - populate_ssl_context(ssl_context, pem_private_key, pem_private_key_size, - pem_cert_chain, pem_cert_chain_size, cipher_list); + populate_ssl_context(ssl_context, pem_key_cert_pair, cipher_suites); if (result != TSI_OK) break; result = ssl_ctx_load_verification_certs(ssl_context, pem_root_certs, - pem_root_certs_size, NULL); + strlen(pem_root_certs), NULL); if (result != TSI_OK) { gpr_log(GPR_ERROR, "Cannot load server root certificates."); break; } if (num_alpn_protocols != 0) { - result = build_alpn_protocol_name_list( - alpn_protocols, alpn_protocols_lengths, num_alpn_protocols, - &impl->alpn_protocol_list, &impl->alpn_protocol_list_length); + result = build_alpn_protocol_name_list(alpn_protocols, num_alpn_protocols, + &impl->alpn_protocol_list, + &impl->alpn_protocol_list_length); if (result != TSI_OK) { gpr_log(GPR_ERROR, "Building alpn list failed with error %s.", tsi_result_to_string(result)); @@ -1352,34 +1352,24 @@ tsi_result tsi_create_ssl_client_handshaker_factory( } tsi_result tsi_create_ssl_server_handshaker_factory( - const unsigned char **pem_private_keys, - const size_t *pem_private_keys_sizes, const unsigned char **pem_cert_chains, - const size_t *pem_cert_chains_sizes, size_t key_cert_pair_count, - const unsigned char *pem_client_root_certs, - size_t pem_client_root_certs_size, int force_client_auth, - const char *cipher_list, const unsigned char **alpn_protocols, - const unsigned char *alpn_protocols_lengths, uint16_t num_alpn_protocols, + const tsi_ssl_pem_key_cert_pair *pem_key_cert_pairs, + size_t num_key_cert_pairs, const char *pem_client_root_certs, + int force_client_auth, const char *cipher_suites, + const char **alpn_protocols, uint16_t num_alpn_protocols, tsi_ssl_server_handshaker_factory **factory) { return tsi_create_ssl_server_handshaker_factory_ex( - pem_private_keys, pem_private_keys_sizes, pem_cert_chains, - pem_cert_chains_sizes, key_cert_pair_count, pem_client_root_certs, - pem_client_root_certs_size, + pem_key_cert_pairs, num_key_cert_pairs, pem_client_root_certs, force_client_auth ? TSI_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY : TSI_DONT_REQUEST_CLIENT_CERTIFICATE, - cipher_list, alpn_protocols, alpn_protocols_lengths, num_alpn_protocols, - factory); + cipher_suites, alpn_protocols, num_alpn_protocols, factory); } tsi_result tsi_create_ssl_server_handshaker_factory_ex( - const unsigned char **pem_private_keys, - const size_t *pem_private_keys_sizes, const unsigned char **pem_cert_chains, - const size_t *pem_cert_chains_sizes, size_t key_cert_pair_count, - const unsigned char *pem_client_root_certs, - size_t pem_client_root_certs_size, + const tsi_ssl_pem_key_cert_pair *pem_key_cert_pairs, + size_t num_key_cert_pairs, const char *pem_client_root_certs, tsi_client_certificate_request_type client_certificate_request, - const char *cipher_list, const unsigned char **alpn_protocols, - const unsigned char *alpn_protocols_lengths, uint16_t num_alpn_protocols, - tsi_ssl_server_handshaker_factory **factory) { + const char *cipher_suites, const char **alpn_protocols, + uint16_t num_alpn_protocols, tsi_ssl_server_handshaker_factory **factory) { tsi_ssl_server_handshaker_factory *impl = NULL; tsi_result result = TSI_OK; size_t i = 0; @@ -1388,33 +1378,32 @@ tsi_result tsi_create_ssl_server_handshaker_factory_ex( if (factory == NULL) return TSI_INVALID_ARGUMENT; *factory = NULL; - if (key_cert_pair_count == 0 || pem_private_keys == NULL || - pem_cert_chains == NULL) { + if (num_key_cert_pairs == 0 || pem_key_cert_pairs == NULL) { return TSI_INVALID_ARGUMENT; } impl = gpr_zalloc(sizeof(*impl)); - impl->ssl_contexts = gpr_zalloc(key_cert_pair_count * sizeof(SSL_CTX *)); + impl->ssl_contexts = gpr_zalloc(num_key_cert_pairs * sizeof(SSL_CTX *)); impl->ssl_context_x509_subject_names = - gpr_zalloc(key_cert_pair_count * sizeof(tsi_peer)); + gpr_zalloc(num_key_cert_pairs * sizeof(tsi_peer)); if (impl->ssl_contexts == NULL || impl->ssl_context_x509_subject_names == NULL) { tsi_ssl_server_handshaker_factory_destroy(impl); return TSI_OUT_OF_RESOURCES; } - impl->ssl_context_count = key_cert_pair_count; + impl->ssl_context_count = num_key_cert_pairs; if (num_alpn_protocols > 0) { - result = build_alpn_protocol_name_list( - alpn_protocols, alpn_protocols_lengths, num_alpn_protocols, - &impl->alpn_protocol_list, &impl->alpn_protocol_list_length); + result = build_alpn_protocol_name_list(alpn_protocols, num_alpn_protocols, + &impl->alpn_protocol_list, + &impl->alpn_protocol_list_length); if (result != TSI_OK) { tsi_ssl_server_handshaker_factory_destroy(impl); return result; } } - for (i = 0; i < key_cert_pair_count; i++) { + for (i = 0; i < num_key_cert_pairs; i++) { do { impl->ssl_contexts[i] = SSL_CTX_new(TLSv1_2_method()); if (impl->ssl_contexts[i] == NULL) { @@ -1422,16 +1411,15 @@ tsi_result tsi_create_ssl_server_handshaker_factory_ex( result = TSI_OUT_OF_RESOURCES; break; } - result = populate_ssl_context( - impl->ssl_contexts[i], pem_private_keys[i], pem_private_keys_sizes[i], - pem_cert_chains[i], pem_cert_chains_sizes[i], cipher_list); + result = populate_ssl_context(impl->ssl_contexts[i], + &pem_key_cert_pairs[i], cipher_suites); if (result != TSI_OK) break; if (pem_client_root_certs != NULL) { STACK_OF(X509_NAME) *root_names = NULL; result = ssl_ctx_load_verification_certs( impl->ssl_contexts[i], pem_client_root_certs, - pem_client_root_certs_size, &root_names); + strlen(pem_client_root_certs), &root_names); if (result != TSI_OK) { gpr_log(GPR_ERROR, "Invalid verification certs."); break; @@ -1464,7 +1452,7 @@ tsi_result tsi_create_ssl_server_handshaker_factory_ex( } result = extract_x509_subject_names_from_pem_cert( - pem_cert_chains[i], pem_cert_chains_sizes[i], + pem_key_cert_pairs[i].cert_chain, &impl->ssl_context_x509_subject_names[i]); if (result != TSI_OK) break; diff --git a/src/core/tsi/ssl_transport_security.h b/src/core/tsi/ssl_transport_security.h index 48dcaec121..3117571d9f 100644 --- a/src/core/tsi/ssl_transport_security.h +++ b/src/core/tsi/ssl_transport_security.h @@ -60,27 +60,32 @@ extern "C" { typedef struct tsi_ssl_client_handshaker_factory tsi_ssl_client_handshaker_factory; +/* Object that holds a private key / certificate chain pair in PEM format. */ +typedef struct { + /* private_key is the NULL-terminated string containing the PEM encoding of + the client's private key. */ + const char *private_key; + + /* cert_chain is the NULL-terminated string containing the PEM encoding of + the client's certificate chain. */ + const char *cert_chain; +} tsi_ssl_pem_key_cert_pair; + /* Creates a client handshaker factory. - - pem_private_key is the buffer containing the PEM encoding of the client's - private key. This parameter can be NULL if the client does not have a - private key. - - pem_private_key_size is the size of the associated buffer. - - pem_cert_chain is the buffer containing the PEM encoding of the client's - certificate chain. This parameter can be NULL if the client does not have - a certificate chain. - - pem_cert_chain_size is the size of the associated buffer. - - pem_roots_cert is the buffer containing the PEM encoding of the server - root certificates. This parameter cannot be NULL. - - pem_roots_cert_size is the size of the associated buffer. + - pem_key_cert_pair is a pointer to the object containing client's private + key and certificate chain. This parameter can be NULL if the client does + not have such a key/cert pair. + - pem_roots_cert is the NULL-terminated string containing the PEM encoding of + the client root certificates. This parameter may be NULL if the server does + not want the client to be authenticated with SSL. - cipher_suites contains an optional list of the ciphers that the client supports. The format of this string is described in: https://www.openssl.org/docs/apps/ciphers.html. This parameter can be set to NULL to use the default set of ciphers. TODO(jboeuf): Revisit the format of this parameter. - - alpn_protocols is an array containing the protocol names that the - handshakers created with this factory support. This parameter can be NULL. - - alpn_protocols_lengths is an array containing the lengths of the alpn - protocols specified in alpn_protocols. This parameter can be NULL. + - alpn_protocols is an array containing the NULL terminated protocol names + that the handshakers created with this factory support. This parameter can + be NULL. - num_alpn_protocols is the number of alpn protocols and associated lengths specified. If this parameter is 0, the other alpn parameters must be NULL. - factory is the address of the factory pointer to be created. @@ -88,11 +93,9 @@ typedef struct tsi_ssl_client_handshaker_factory - This method returns TSI_OK on success or TSI_INVALID_PARAMETER in the case where a parameter is invalid. */ tsi_result tsi_create_ssl_client_handshaker_factory( - const unsigned char *pem_private_key, size_t pem_private_key_size, - const unsigned char *pem_cert_chain, size_t pem_cert_chain_size, - const unsigned char *pem_root_certs, size_t pem_root_certs_size, - const char *cipher_suites, const unsigned char **alpn_protocols, - const unsigned char *alpn_protocols_lengths, uint16_t num_alpn_protocols, + const tsi_ssl_pem_key_cert_pair *pem_key_cert_pair, + const char *pem_root_certs, const char *cipher_suites, + const char **alpn_protocols, uint16_t num_alpn_protocols, tsi_ssl_client_handshaker_factory **factory); /* Creates a client handshaker. @@ -122,37 +125,19 @@ typedef struct tsi_ssl_server_handshaker_factory tsi_ssl_server_handshaker_factory; /* Creates a server handshaker factory. - - version indicates which version of the specification to use. - - pem_private_keys is an array containing the PEM encoding of the server's - private keys. This parameter cannot be NULL. The size of the array is - given by the key_cert_pair_count parameter. - - pem_private_keys_sizes is the array containing the sizes of the associated - buffers. - - pem_cert_chains is an array containing the PEM encoding of the server's - cert chains. This parameter cannot be NULL. The size of the array is - given by the key_cert_pair_count parameter. - - pem_cert_chains_sizes is the array containing the sizes of the associated - buffers. - - key_cert_pair_count indicates the number of items in the private_key_files - and cert_chain_files parameters. - - pem_client_roots is the buffer containing the PEM encoding of the client - root certificates. This parameter may be NULL in which case the server will - not authenticate the client. If not NULL, the force_client_auth parameter - specifies if the server will accept only authenticated clients or both - authenticated and non-authenticated clients. - - pem_client_root_certs_size is the size of the associated buffer. - - force_client_auth, if set to non-zero will force the client to authenticate - with an SSL cert. Note that this option is ignored if pem_client_root_certs - is NULL or pem_client_roots_certs_size is 0 + - pem_key_cert_pairs is an array private key / certificate chains of the + server. + - num_key_cert_pairs is the number of items in the pem_key_cert_pairs array. + - pem_root_certs is the NULL-terminated string containing the PEM encoding + of the server root certificates. - cipher_suites contains an optional list of the ciphers that the server supports. The format of this string is described in: https://www.openssl.org/docs/apps/ciphers.html. This parameter can be set to NULL to use the default set of ciphers. TODO(jboeuf): Revisit the format of this parameter. - - alpn_protocols is an array containing the protocol names that the - handshakers created with this factory support. This parameter can be NULL. - - alpn_protocols_lengths is an array containing the lengths of the alpn - protocols specified in alpn_protocols. This parameter can be NULL. + - alpn_protocols is an array containing the NULL terminated protocol names + that the handshakers created with this factory support. This parameter can + be NULL. - num_alpn_protocols is the number of alpn protocols and associated lengths specified. If this parameter is 0, the other alpn parameters must be NULL. - factory is the address of the factory pointer to be created. @@ -160,13 +145,10 @@ typedef struct tsi_ssl_server_handshaker_factory - This method returns TSI_OK on success or TSI_INVALID_PARAMETER in the case where a parameter is invalid. */ tsi_result tsi_create_ssl_server_handshaker_factory( - const unsigned char **pem_private_keys, - const size_t *pem_private_keys_sizes, const unsigned char **pem_cert_chains, - const size_t *pem_cert_chains_sizes, size_t key_cert_pair_count, - const unsigned char *pem_client_root_certs, - size_t pem_client_root_certs_size, int force_client_auth, - const char *cipher_suites, const unsigned char **alpn_protocols, - const unsigned char *alpn_protocols_lengths, uint16_t num_alpn_protocols, + const tsi_ssl_pem_key_cert_pair *pem_key_cert_pairs, + size_t num_key_cert_pairs, const char *pem_client_root_certs, + int force_client_auth, const char *cipher_suites, + const char **alpn_protocols, uint16_t num_alpn_protocols, tsi_ssl_server_handshaker_factory **factory); /* Same as tsi_create_ssl_server_handshaker_factory method except uses @@ -176,15 +158,11 @@ tsi_result tsi_create_ssl_server_handshaker_factory( authenticate with an SSL cert. Note that this option is ignored if pem_client_root_certs is NULL or pem_client_roots_certs_size is 0 */ tsi_result tsi_create_ssl_server_handshaker_factory_ex( - const unsigned char **pem_private_keys, - const size_t *pem_private_keys_sizes, const unsigned char **pem_cert_chains, - const size_t *pem_cert_chains_sizes, size_t key_cert_pair_count, - const unsigned char *pem_client_root_certs, - size_t pem_client_root_certs_size, + const tsi_ssl_pem_key_cert_pair *pem_key_cert_pairs, + size_t num_key_cert_pairs, const char *pem_client_root_certs, tsi_client_certificate_request_type client_certificate_request, - const char *cipher_suites, const unsigned char **alpn_protocols, - const unsigned char *alpn_protocols_lengths, uint16_t num_alpn_protocols, - tsi_ssl_server_handshaker_factory **factory); + const char *cipher_suites, const char **alpn_protocols, + uint16_t num_alpn_protocols, tsi_ssl_server_handshaker_factory **factory); /* Creates a server handshaker. - self is the factory from which the handshaker will be created. diff --git a/src/core/tsi/transport_security.c b/src/core/tsi/transport_security.c index 67ebe1b1f3..b11c00c43c 100644 --- a/src/core/tsi/transport_security.c +++ b/src/core/tsi/transport_security.c @@ -73,6 +73,8 @@ const char *tsi_result_to_string(tsi_result result) { return "TSI_HANDSHAKE_IN_PROGRESS"; case TSI_OUT_OF_RESOURCES: return "TSI_OUT_OF_RESOURCES"; + case TSI_ASYNC: + return "TSI_ASYNC"; default: return "UNKNOWN"; } @@ -92,6 +94,9 @@ tsi_result tsi_frame_protector_protect(tsi_frame_protector *self, protected_output_frames_size == NULL) { return TSI_INVALID_ARGUMENT; } + if (self->vtable == NULL || self->vtable->protect == NULL) { + return TSI_UNIMPLEMENTED; + } return self->vtable->protect(self, unprotected_bytes, unprotected_bytes_size, protected_output_frames, protected_output_frames_size); @@ -104,6 +109,9 @@ tsi_result tsi_frame_protector_protect_flush( protected_output_frames_size == NULL || still_pending_size == NULL) { return TSI_INVALID_ARGUMENT; } + if (self->vtable == NULL || self->vtable->protect_flush == NULL) { + return TSI_UNIMPLEMENTED; + } return self->vtable->protect_flush(self, protected_output_frames, protected_output_frames_size, still_pending_size); @@ -118,6 +126,9 @@ tsi_result tsi_frame_protector_unprotect( unprotected_bytes_size == NULL) { return TSI_INVALID_ARGUMENT; } + if (self->vtable == NULL || self->vtable->unprotect == NULL) { + return TSI_UNIMPLEMENTED; + } return self->vtable->unprotect(self, protected_frames_bytes, protected_frames_bytes_size, unprotected_bytes, unprotected_bytes_size); @@ -139,6 +150,9 @@ tsi_result tsi_handshaker_get_bytes_to_send_to_peer(tsi_handshaker *self, return TSI_INVALID_ARGUMENT; } if (self->frame_protector_created) return TSI_FAILED_PRECONDITION; + if (self->vtable == NULL || self->vtable->get_bytes_to_send_to_peer == NULL) { + return TSI_UNIMPLEMENTED; + } return self->vtable->get_bytes_to_send_to_peer(self, bytes, bytes_size); } @@ -149,12 +163,18 @@ tsi_result tsi_handshaker_process_bytes_from_peer(tsi_handshaker *self, return TSI_INVALID_ARGUMENT; } if (self->frame_protector_created) return TSI_FAILED_PRECONDITION; + if (self->vtable == NULL || self->vtable->process_bytes_from_peer == NULL) { + return TSI_UNIMPLEMENTED; + } return self->vtable->process_bytes_from_peer(self, bytes, bytes_size); } tsi_result tsi_handshaker_get_result(tsi_handshaker *self) { if (self == NULL) return TSI_INVALID_ARGUMENT; if (self->frame_protector_created) return TSI_FAILED_PRECONDITION; + if (self->vtable == NULL || self->vtable->get_result == NULL) { + return TSI_UNIMPLEMENTED; + } return self->vtable->get_result(self); } @@ -165,6 +185,9 @@ tsi_result tsi_handshaker_extract_peer(tsi_handshaker *self, tsi_peer *peer) { if (tsi_handshaker_get_result(self) != TSI_OK) { return TSI_FAILED_PRECONDITION; } + if (self->vtable == NULL || self->vtable->extract_peer == NULL) { + return TSI_UNIMPLEMENTED; + } return self->vtable->extract_peer(self, peer); } @@ -177,19 +200,77 @@ tsi_result tsi_handshaker_create_frame_protector( if (tsi_handshaker_get_result(self) != TSI_OK) { return TSI_FAILED_PRECONDITION; } + if (self->vtable == NULL || self->vtable->create_frame_protector == NULL) { + return TSI_UNIMPLEMENTED; + } result = self->vtable->create_frame_protector(self, max_protected_frame_size, protector); if (result == TSI_OK) { - self->frame_protector_created = 1; + self->frame_protector_created = true; } return result; } +tsi_result tsi_handshaker_next( + tsi_handshaker *self, const unsigned char *received_bytes, + size_t received_bytes_size, unsigned char **bytes_to_send, + size_t *bytes_to_send_size, tsi_handshaker_result **handshaker_result, + tsi_handshaker_on_next_done_cb cb, void *user_data) { + if (self == NULL) return TSI_INVALID_ARGUMENT; + if (self->handshaker_result_created) return TSI_FAILED_PRECONDITION; + if (self->vtable == NULL || self->vtable->next == NULL) { + return TSI_UNIMPLEMENTED; + } + return self->vtable->next(self, received_bytes, received_bytes_size, + bytes_to_send, bytes_to_send_size, + handshaker_result, cb, user_data); +} + void tsi_handshaker_destroy(tsi_handshaker *self) { if (self == NULL) return; self->vtable->destroy(self); } +/* --- tsi_handshaker_result implementation. --- */ + +tsi_result tsi_handshaker_result_extract_peer(const tsi_handshaker_result *self, + tsi_peer *peer) { + if (self == NULL || peer == NULL) return TSI_INVALID_ARGUMENT; + memset(peer, 0, sizeof(tsi_peer)); + if (self->vtable == NULL || self->vtable->extract_peer == NULL) { + return TSI_UNIMPLEMENTED; + } + return self->vtable->extract_peer(self, peer); +} + +tsi_result tsi_handshaker_result_create_frame_protector( + const tsi_handshaker_result *self, size_t *max_protected_frame_size, + tsi_frame_protector **protector) { + if (self == NULL || protector == NULL) return TSI_INVALID_ARGUMENT; + if (self->vtable == NULL || self->vtable->create_frame_protector == NULL) { + return TSI_UNIMPLEMENTED; + } + return self->vtable->create_frame_protector(self, max_protected_frame_size, + protector); +} + +tsi_result tsi_handshaker_result_get_unused_bytes( + const tsi_handshaker_result *self, unsigned char **bytes, + size_t *bytes_size) { + if (self == NULL || bytes == NULL || bytes_size == NULL) { + return TSI_INVALID_ARGUMENT; + } + if (self->vtable == NULL || self->vtable->get_unused_bytes == NULL) { + return TSI_UNIMPLEMENTED; + } + return self->vtable->get_unused_bytes(self, bytes, bytes_size); +} + +void tsi_handshaker_result_destroy(tsi_handshaker_result *self) { + if (self == NULL) return; + self->vtable->destroy(self); +} + /* --- tsi_peer implementation. --- */ tsi_peer_property tsi_init_peer_property(void) { diff --git a/src/core/tsi/transport_security.h b/src/core/tsi/transport_security.h index 491fa1a8bd..a4c9cbc001 100644 --- a/src/core/tsi/transport_security.h +++ b/src/core/tsi/transport_security.h @@ -34,6 +34,8 @@ #ifndef GRPC_CORE_TSI_TRANSPORT_SECURITY_H #define GRPC_CORE_TSI_TRANSPORT_SECURITY_H +#include <stdbool.h> + #include "src/core/tsi/transport_security_interface.h" #ifdef __cplusplus @@ -81,11 +83,33 @@ typedef struct { size_t *max_protected_frame_size, tsi_frame_protector **protector); void (*destroy)(tsi_handshaker *self); + tsi_result (*next)(tsi_handshaker *self, const unsigned char *received_bytes, + size_t received_bytes_size, unsigned char **bytes_to_send, + size_t *bytes_to_send_size, + tsi_handshaker_result **handshaker_result, + tsi_handshaker_on_next_done_cb cb, void *user_data); } tsi_handshaker_vtable; struct tsi_handshaker { const tsi_handshaker_vtable *vtable; - int frame_protector_created; + bool frame_protector_created; + bool handshaker_result_created; +}; + +/* Base for tsi_handshaker_result implementations. + See transport_security_interface.h for documentation. */ +typedef struct { + tsi_result (*extract_peer)(const tsi_handshaker_result *self, tsi_peer *peer); + tsi_result (*create_frame_protector)(const tsi_handshaker_result *self, + size_t *max_output_protected_frame_size, + tsi_frame_protector **protector); + tsi_result (*get_unused_bytes)(const tsi_handshaker_result *self, + unsigned char **bytes, size_t *bytes_size); + void (*destroy)(tsi_handshaker_result *self); +} tsi_handshaker_result_vtable; + +struct tsi_handshaker_result { + const tsi_handshaker_result_vtable *vtable; }; /* Peer and property construction/destruction functions. */ diff --git a/src/core/tsi/transport_security_adapter.c b/src/core/tsi/transport_security_adapter.c new file mode 100644 index 0000000000..9f2147b530 --- /dev/null +++ b/src/core/tsi/transport_security_adapter.c @@ -0,0 +1,236 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/tsi/transport_security_adapter.h" + +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include "src/core/tsi/transport_security.h" + +#define TSI_ADAPTER_INITIAL_BUFFER_SIZE 256 + +/* --- tsi_adapter_handshaker_result implementation ---*/ + +typedef struct { + tsi_handshaker_result base; + tsi_handshaker *wrapped; + unsigned char *unused_bytes; + size_t unused_bytes_size; +} tsi_adapter_handshaker_result; + +static tsi_result adapter_result_extract_peer(const tsi_handshaker_result *self, + tsi_peer *peer) { + tsi_adapter_handshaker_result *impl = (tsi_adapter_handshaker_result *)self; + return tsi_handshaker_extract_peer(impl->wrapped, peer); +} + +static tsi_result adapter_result_create_frame_protector( + const tsi_handshaker_result *self, size_t *max_output_protected_frame_size, + tsi_frame_protector **protector) { + tsi_adapter_handshaker_result *impl = (tsi_adapter_handshaker_result *)self; + return tsi_handshaker_create_frame_protector( + impl->wrapped, max_output_protected_frame_size, protector); +} + +static tsi_result adapter_result_get_unused_bytes( + const tsi_handshaker_result *self, unsigned char **bytes, + size_t *byte_size) { + tsi_adapter_handshaker_result *impl = (tsi_adapter_handshaker_result *)self; + *bytes = impl->unused_bytes; + *byte_size = impl->unused_bytes_size; + return TSI_OK; +} + +static void adapter_result_destroy(tsi_handshaker_result *self) { + tsi_adapter_handshaker_result *impl = (tsi_adapter_handshaker_result *)self; + tsi_handshaker_destroy(impl->wrapped); + gpr_free(impl->unused_bytes); + gpr_free(self); +} + +static const tsi_handshaker_result_vtable result_vtable = { + adapter_result_extract_peer, adapter_result_create_frame_protector, + adapter_result_get_unused_bytes, adapter_result_destroy, +}; + +/* Ownership of wrapped tsi_handshaker is transferred to the result object. */ +static tsi_result tsi_adapter_create_handshaker_result( + tsi_handshaker *wrapped, const unsigned char *unused_bytes, + size_t unused_bytes_size, tsi_handshaker_result **handshaker_result) { + if (wrapped == NULL || (unused_bytes_size > 0 && unused_bytes == NULL)) { + return TSI_INVALID_ARGUMENT; + } + tsi_adapter_handshaker_result *impl = gpr_zalloc(sizeof(*impl)); + impl->base.vtable = &result_vtable; + impl->wrapped = wrapped; + impl->unused_bytes_size = unused_bytes_size; + if (unused_bytes_size > 0) { + impl->unused_bytes = gpr_malloc(unused_bytes_size); + memcpy(impl->unused_bytes, unused_bytes, unused_bytes_size); + } else { + impl->unused_bytes = NULL; + } + *handshaker_result = &impl->base; + return TSI_OK; +} + +/* --- tsi_adapter_handshaker implementation ---*/ + +typedef struct { + tsi_handshaker base; + tsi_handshaker *wrapped; + unsigned char *adapter_buffer; + size_t adapter_buffer_size; +} tsi_adapter_handshaker; + +static tsi_result adapter_get_bytes_to_send_to_peer(tsi_handshaker *self, + unsigned char *bytes, + size_t *bytes_size) { + return tsi_handshaker_get_bytes_to_send_to_peer( + tsi_adapter_handshaker_get_wrapped(self), bytes, bytes_size); +} + +static tsi_result adapter_process_bytes_from_peer(tsi_handshaker *self, + const unsigned char *bytes, + size_t *bytes_size) { + return tsi_handshaker_process_bytes_from_peer( + tsi_adapter_handshaker_get_wrapped(self), bytes, bytes_size); +} + +static tsi_result adapter_get_result(tsi_handshaker *self) { + return tsi_handshaker_get_result(tsi_adapter_handshaker_get_wrapped(self)); +} + +static tsi_result adapter_extract_peer(tsi_handshaker *self, tsi_peer *peer) { + return tsi_handshaker_extract_peer(tsi_adapter_handshaker_get_wrapped(self), + peer); +} + +static tsi_result adapter_create_frame_protector( + tsi_handshaker *self, size_t *max_protected_frame_size, + tsi_frame_protector **protector) { + return tsi_handshaker_create_frame_protector( + tsi_adapter_handshaker_get_wrapped(self), max_protected_frame_size, + protector); +} + +static void adapter_destroy(tsi_handshaker *self) { + tsi_adapter_handshaker *impl = (tsi_adapter_handshaker *)self; + tsi_handshaker_destroy(impl->wrapped); + gpr_free(impl->adapter_buffer); + gpr_free(self); +} + +static tsi_result adapter_next( + tsi_handshaker *self, const unsigned char *received_bytes, + size_t received_bytes_size, unsigned char **bytes_to_send, + size_t *bytes_to_send_size, tsi_handshaker_result **handshaker_result, + tsi_handshaker_on_next_done_cb cb, void *user_data) { + /* Input sanity check. */ + if ((received_bytes_size > 0 && received_bytes == NULL) || + bytes_to_send == NULL || bytes_to_send_size == NULL || + handshaker_result == NULL) { + return TSI_INVALID_ARGUMENT; + } + + /* If there are received bytes, process them first. */ + tsi_adapter_handshaker *impl = (tsi_adapter_handshaker *)self; + tsi_result status = TSI_OK; + size_t bytes_consumed = received_bytes_size; + if (received_bytes_size > 0) { + status = tsi_handshaker_process_bytes_from_peer( + impl->wrapped, received_bytes, &bytes_consumed); + if (status != TSI_OK) return status; + } + + /* Get bytes to send to the peer, if available. */ + size_t offset = 0; + do { + size_t to_send_size = impl->adapter_buffer_size - offset; + status = tsi_handshaker_get_bytes_to_send_to_peer( + impl->wrapped, impl->adapter_buffer + offset, &to_send_size); + offset += to_send_size; + if (status == TSI_INCOMPLETE_DATA) { + impl->adapter_buffer_size *= 2; + impl->adapter_buffer = + gpr_realloc(impl->adapter_buffer, impl->adapter_buffer_size); + } + } while (status == TSI_INCOMPLETE_DATA); + if (status != TSI_OK) return status; + *bytes_to_send = impl->adapter_buffer; + *bytes_to_send_size = offset; + + /* If handshake completes, create tsi_handshaker_result. */ + if (tsi_handshaker_is_in_progress(impl->wrapped)) { + *handshaker_result = NULL; + } else { + size_t unused_bytes_size = received_bytes_size - bytes_consumed; + const unsigned char *unused_bytes = + unused_bytes_size == 0 ? NULL : received_bytes + bytes_consumed; + status = tsi_adapter_create_handshaker_result( + impl->wrapped, unused_bytes, unused_bytes_size, handshaker_result); + if (status == TSI_OK) { + impl->base.handshaker_result_created = true; + impl->wrapped = NULL; + } + } + return status; +} + +static const tsi_handshaker_vtable handshaker_vtable = { + adapter_get_bytes_to_send_to_peer, + adapter_process_bytes_from_peer, + adapter_get_result, + adapter_extract_peer, + adapter_create_frame_protector, + adapter_destroy, + adapter_next, +}; + +tsi_handshaker *tsi_create_adapter_handshaker(tsi_handshaker *wrapped) { + GPR_ASSERT(wrapped != NULL); + tsi_adapter_handshaker *impl = gpr_zalloc(sizeof(*impl)); + impl->base.vtable = &handshaker_vtable; + impl->wrapped = wrapped; + impl->adapter_buffer_size = TSI_ADAPTER_INITIAL_BUFFER_SIZE; + impl->adapter_buffer = gpr_malloc(impl->adapter_buffer_size); + return &impl->base; +} + +tsi_handshaker *tsi_adapter_handshaker_get_wrapped(tsi_handshaker *adapter) { + if (adapter == NULL) return NULL; + tsi_adapter_handshaker *impl = (tsi_adapter_handshaker *)adapter; + return impl->wrapped; +} diff --git a/src/core/tsi/transport_security_adapter.h b/src/core/tsi/transport_security_adapter.h new file mode 100644 index 0000000000..400df2f11b --- /dev/null +++ b/src/core/tsi/transport_security_adapter.h @@ -0,0 +1,62 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_TSI_TRANSPORT_SECURITY_ADAPTER_H +#define GRPC_CORE_TSI_TRANSPORT_SECURITY_ADAPTER_H + +#include "src/core/tsi/transport_security_interface.h" + +#ifdef __cplusplus +extern "C" { +#endif + +/* Create a tsi handshaker that takes an implementation of old interface and + converts into an implementation of new interface. In the old interface, + there are get_bytes_to_send_to_peer, process_bytes_from_peer, get_result, + extract_peer, and create_frame_protector. In the new interface, only next + method is needed. See transport_security_interface.h for details. Note that + this tsi adapter handshaker is temporary. It will be removed once TSI has + been fully migrated to the new interface. + Ownership of input tsi_handshaker is transferred to this new adapter. */ +tsi_handshaker *tsi_create_adapter_handshaker(tsi_handshaker *wrapped); + +/* Given a tsi adapter handshaker, return the original wrapped handshaker. The + adapter still owns the wrapped handshaker which should not be destroyed by + the caller. */ +tsi_handshaker *tsi_adapter_handshaker_get_wrapped(tsi_handshaker *adapter); + +#ifdef __cplusplus +} +#endif + +#endif /* GRPC_CORE_TSI_TRANSPORT_SECURITY_ADAPTER_H */ diff --git a/src/core/tsi/transport_security_interface.h b/src/core/tsi/transport_security_interface.h index caed43eac4..f2112b62b6 100644 --- a/src/core/tsi/transport_security_interface.h +++ b/src/core/tsi/transport_security_interface.h @@ -56,7 +56,8 @@ typedef enum { TSI_NOT_FOUND = 9, TSI_PROTOCOL_FAILURE = 10, TSI_HANDSHAKE_IN_PROGRESS = 11, - TSI_OUT_OF_RESOURCES = 12 + TSI_OUT_OF_RESOURCES = 12, + TSI_ASYNC = 13 } tsi_result; typedef enum { @@ -208,76 +209,138 @@ typedef struct { /* Destructs the tsi_peer object. */ void tsi_peer_destruct(tsi_peer *self); +/* --- tsi_handshaker_result object --- + + This object contains all necessary handshake results and data such as peer + info, negotiated keys, unused handshake bytes, when the handshake completes. + Implementations of this object must be thread compatible. */ + +typedef struct tsi_handshaker_result tsi_handshaker_result; + +/* This method extracts tsi peer. It returns TSI_OK assuming there is no fatal + error. + The caller is responsible for destructing the peer. */ +tsi_result tsi_handshaker_result_extract_peer(const tsi_handshaker_result *self, + tsi_peer *peer); + +/* This method creates a tsi_frame_protector object. It returns TSI_OK assuming + there is no fatal error. + The caller is responsible for destroying the protector. */ +tsi_result tsi_handshaker_result_create_frame_protector( + const tsi_handshaker_result *self, size_t *max_output_protected_frame_size, + tsi_frame_protector **protector); + +/* This method returns the unused bytes from the handshake. It returns TSI_OK + assuming there is no fatal error. + Ownership of the bytes is retained by the handshaker result. As a + consequence, the caller must not free the bytes. */ +tsi_result tsi_handshaker_result_get_unused_bytes( + const tsi_handshaker_result *self, unsigned char **bytes, + size_t *byte_size); + +/* This method releases the tsi_handshaker_handshaker object. After this method + is called, no other method can be called on the object. */ +void tsi_handshaker_result_destroy(tsi_handshaker_result *self); + /* --- tsi_handshaker objects ---- Implementations of this object must be thread compatible. - A typical usage of this object would be: + ------------------------------------------------------------------------ + + A typical usage supporting both synchronous and asynchronous TSI handshaker + implementations would be: ------------------------------------------------------------------------ - tsi_result result = TSI_OK; - unsigned char buf[4096]; - size_t buf_offset; - size_t buf_size; - while (1) { - // See if we need to send some bytes to the peer. - do { - size_t buf_size_to_send = sizeof(buf); - result = tsi_handshaker_get_bytes_to_send_to_peer(handshaker, buf, - &buf_size_to_send); - if (buf_size_to_send > 0) send_bytes_to_peer(buf, buf_size_to_send); - } while (result == TSI_INCOMPLETE_DATA); - if (result != TSI_OK) return result; - if (!tsi_handshaker_is_in_progress(handshaker)) break; - - do { - // Read bytes from the peer. - buf_size = sizeof(buf); - buf_offset = 0; - read_bytes_from_peer(buf, &buf_size); - if (buf_size == 0) break; - - // Process the bytes from the peer. We have to be careful as these bytes - // may contain non-handshake data (protected data). If this is the case, - // we will exit from the loop with buf_size > 0. - size_t consumed_by_handshaker = buf_size; - result = tsi_handshaker_process_bytes_from_peer( - handshaker, buf, &consumed_by_handshaker); - buf_size -= consumed_by_handshaker; - buf_offset += consumed_by_handshaker; - } while (result == TSI_INCOMPLETE_DATA); - - if (result != TSI_OK) return result; - if (!tsi_handshaker_is_in_progress(handshaker)) break; + + typedef struct { + tsi_handshaker *handshaker; + tsi_handshaker_result *handshaker_result; + unsigned char *handshake_buffer; + size_t handshake_buffer_size; + ... + } security_handshaker; + + void do_handshake(security_handshaker *h, ...) { + // Start the handshake by the calling do_handshake_next. + do_handshake_next(h, NULL, 0); + ... } - // Check the Peer. - tsi_peer peer; - do { - result = tsi_handshaker_extract_peer(handshaker, &peer); - if (result != TSI_OK) break; - result = check_peer(&peer); - } while (0); - tsi_peer_destruct(&peer); - if (result != TSI_OK) return result; - - // Create the protector. - tsi_frame_protector* protector = NULL; - result = tsi_handshaker_create_frame_protector(handshaker, NULL, - &protector); - if (result != TSI_OK) return result; - - // Do not forget to unprotect outstanding data if any. - if (buf_size > 0) { - result = tsi_frame_protector_unprotect(protector, buf + buf_offset, - buf_size, ..., ...); - .... + // This method is the callback function when data is received from the + // peer. This method will read bytes into the handshake buffer and call + // do_handshake_next. + void on_handshake_data_received_from_peer(void *user_data) { + security_handshaker *h = (security_handshaker *)user_data; + size_t bytes_received_size = h->handshake_buffer_size; + read_bytes_from_peer(h->handshake_buffer, &bytes_received_size); + do_handshake_next(h, h->handshake_buffer, bytes_received_size); + } + + // This method processes a step of handshake, calling tsi_handshaker_next. + void do_handshake_next(security_handshaker *h, + const unsigned char* bytes_received, + size_t bytes_received_size) { + tsi_result status = TSI_OK; + unsigned char *bytes_to_send = NULL; + size_t bytes_to_send_size = 0; + tsi_handshaker_result *result = NULL; + status = tsi_handshaker_next( + handshaker, bytes_received, bytes_received_size, &bytes_to_send, + &bytes_to_send_size, &result, on_handshake_next_done, h); + // If TSI handshaker is asynchronous, on_handshake_next_done will be + // executed inside tsi_handshaker_next. + if (status == TSI_ASYNC) return; + // If TSI handshaker is synchronous, invoke callback directly in this + // thread. + on_handshake_next_done(status, (void *)h, bytes_to_send, + bytes_to_send_size, result); + } + + // This is the callback function to execute after tsi_handshaker_next. + // It is passed to tsi_handshaker_next as a function parameter. + void on_handshake_next_done( + tsi_result status, void *user_data, const unsigned char *bytes_to_send, + size_t bytes_to_send_size, tsi_handshaker_result *result) { + security_handshaker *h = (security_handshaker *)user_data; + if (status == TSI_INCOMPLETE_DATA) { + // Schedule an asynchronous read from the peer. If handshake data are + // received, on_handshake_data_received_from_peer will be called. + async_read_from_peer(..., ..., on_handshake_data_received_from_peer); + return; + } + if (status != TSI_OK) return; + + if (bytes_to_send_size > 0) { + send_bytes_to_peer(bytes_to_send, bytes_to_send_size); + } + + if (result != NULL) { + // Handshake completed. + h->result = result; + // Check the Peer. + tsi_peer peer; + status = tsi_handshaker_result_extract_peer(result, &peer); + if (status != TSI_OK) return; + status = check_peer(&peer); + tsi_peer_destruct(&peer); + if (status != TSI_OK) return; + + // Create the protector. + tsi_frame_protector* protector = NULL; + status = tsi_handshaker_result_create_frame_protector(result, NULL, + &protector); + if (status != TSI_OK) return; + + // Do not forget to unprotect outstanding data if any. + .... + } } - ... ------------------------------------------------------------------------ */ typedef struct tsi_handshaker tsi_handshaker; -/* Gets bytes that need to be sent to the peer. +/* TO BE DEPRECATED SOON. Use tsi_handshaker_next instead. + Gets bytes that need to be sent to the peer. - bytes is the buffer that will be written with the data to be sent to the peer. - bytes_size is an input/output parameter specifying the capacity of the @@ -292,7 +355,8 @@ tsi_result tsi_handshaker_get_bytes_to_send_to_peer(tsi_handshaker *self, unsigned char *bytes, size_t *bytes_size); -/* Processes bytes received from the peer. +/* TO BE DEPRECATED SOON. Use tsi_handshaker_next instead. + Processes bytes received from the peer. - bytes is the buffer containing the data. - bytes_size is an input/output parameter specifying the size of the data as input and the number of bytes consumed as output. @@ -305,24 +369,29 @@ tsi_result tsi_handshaker_process_bytes_from_peer(tsi_handshaker *self, const unsigned char *bytes, size_t *bytes_size); -/* Gets the result of the handshaker. +/* TO BE DEPRECATED SOON. + Gets the result of the handshaker. Returns TSI_OK if the hanshake completed successfully and there has been no errors. Returns TSI_HANDSHAKE_IN_PROGRESS if the handshaker is not done yet but no error has been encountered so far. Otherwise the handshaker failed with the returned error. */ tsi_result tsi_handshaker_get_result(tsi_handshaker *self); -/* Returns 1 if the handshake is in progress, 0 otherwise. */ +/* TO BE DEPRECATED SOON. + Returns 1 if the handshake is in progress, 0 otherwise. */ #define tsi_handshaker_is_in_progress(h) \ (tsi_handshaker_get_result((h)) == TSI_HANDSHAKE_IN_PROGRESS) -/* This method may return TSI_FAILED_PRECONDITION if +/* TO BE DEPRECATED SOON. Use tsi_handshaker_result_extract_peer instead. + This method may return TSI_FAILED_PRECONDITION if tsi_handshaker_is_in_progress returns 1, it returns TSI_OK otherwise assuming the handshaker is not in a fatal error state. The caller is responsible for destructing the peer. */ tsi_result tsi_handshaker_extract_peer(tsi_handshaker *self, tsi_peer *peer); -/* This method creates a tsi_frame_protector object after the handshake phase +/* TO BE DEPRECATED SOON. Use tsi_handshaker_result_create_frame_protector + instead. + This method creates a tsi_frame_protector object after the handshake phase is done. After this method has been called successfully, the only method that can be called on this object is Destroy. - max_output_protected_frame_size is an input/output parameter specifying the @@ -342,10 +411,53 @@ tsi_result tsi_handshaker_create_frame_protector( tsi_handshaker *self, size_t *max_output_protected_frame_size, tsi_frame_protector **protector); +/* Callback function definition for tsi_handshaker_next. + - status indicates the status of the next operation. + - user_data is the argument to callback function passed from the caller. + - bytes_to_send is the data buffer to be sent to the peer. + - bytes_to_send_size is the size of data buffer to be sent to the peer. + - handshaker_result is the result of handshake when the handshake completes, + is NULL otherwise. */ +typedef void (*tsi_handshaker_on_next_done_cb)( + tsi_result status, void *user_data, const unsigned char *bytes_to_send, + size_t bytes_to_send_size, tsi_handshaker_result *handshaker_result); + +/* Conduct a next step of the handshake. + - received_bytes is the buffer containing the data received from the peer. + - received_bytes_size is the size of the data received from the peer. + - bytes_to_send is the data buffer to be sent to the peer. + - bytes_to_send_size is the size of data buffer to be sent to the peer. + - handshaker_result is the result of handshake if the handshake completes. + - cb is the callback function defined above. It can be NULL for synchronous + TSI handshaker implementation. + - user_data is the argument to callback function passed from the caller. + This method returns TSI_ASYNC if the TSI handshaker implementation is + asynchronous, and in this case, the callback is guaranteed to run in another + thread owned by TSI. It returns TSI_OK if the handshake completes or if + there are data to send to the peer, otherwise returns TSI_INCOMPLETE_DATA + which indicates that this method needs to be called again with more data + from the peer. In case of a fatal error in the handshake, another specific + error code is returned. + The caller is responsible for destroying the handshaker_result. However, + the caller should not free bytes_to_send, as the buffer is owned by the + tsi_handshaker object. */ +tsi_result tsi_handshaker_next( + tsi_handshaker *self, const unsigned char *received_bytes, + size_t received_bytes_size, unsigned char **bytes_to_send, + size_t *bytes_to_send_size, tsi_handshaker_result **handshaker_result, + tsi_handshaker_on_next_done_cb cb, void *user_data); + /* This method releases the tsi_handshaker object. After this method is called, no other method can be called on the object. */ void tsi_handshaker_destroy(tsi_handshaker *self); +/* This method initializes the necessary shared objects used for tsi + implementation. */ +void tsi_init(); + +/* This method destroys the shared objects created by tsi_init. */ +void tsi_destroy(); + #ifdef __cplusplus } #endif |