diff options
Diffstat (limited to 'src/core')
106 files changed, 2362 insertions, 1327 deletions
diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c index b80d831557..fc29dbd454 100644 --- a/src/core/ext/census/grpc_filter.c +++ b/src/core/ext/census/grpc_filter.c @@ -138,7 +138,7 @@ static grpc_error *client_init_call_elem(grpc_exec_ctx *exec_ctx, static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) { + grpc_closure *ignored) { call_data *d = elem->call_data; GPR_ASSERT(d != NULL); /* TODO(hongyu): record rpc client stats and census_rpc_end_op here */ @@ -160,7 +160,7 @@ static grpc_error *server_init_call_elem(grpc_exec_ctx *exec_ctx, static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) { + grpc_closure *ignored) { call_data *d = elem->call_data; GPR_ASSERT(d != NULL); /* TODO(hongyu): record rpc server stats and census_tracing_end_op here */ diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index 21ba4301ff..960d00e815 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -71,29 +71,88 @@ */ typedef enum { - WAIT_FOR_READY_UNSET, + /* zero so it can be default initialized */ + WAIT_FOR_READY_UNSET = 0, WAIT_FOR_READY_FALSE, WAIT_FOR_READY_TRUE } wait_for_ready_value; -typedef struct method_parameters { +typedef struct { + gpr_refcount refs; gpr_timespec timeout; wait_for_ready_value wait_for_ready; } method_parameters; +static method_parameters *method_parameters_ref( + method_parameters *method_params) { + gpr_ref(&method_params->refs); + return method_params; +} + +static void method_parameters_unref(method_parameters *method_params) { + if (gpr_unref(&method_params->refs)) { + gpr_free(method_params); + } +} + static void *method_parameters_copy(void *value) { - void *new_value = gpr_malloc(sizeof(method_parameters)); - memcpy(new_value, value, sizeof(method_parameters)); - return new_value; + return method_parameters_ref(value); } -static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *p) { - gpr_free(p); +static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *value) { + method_parameters_unref(value); } static const grpc_slice_hash_table_vtable method_parameters_vtable = { method_parameters_free, method_parameters_copy}; +static bool parse_wait_for_ready(grpc_json *field, + wait_for_ready_value *wait_for_ready) { + if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) { + return false; + } + *wait_for_ready = field->type == GRPC_JSON_TRUE ? WAIT_FOR_READY_TRUE + : WAIT_FOR_READY_FALSE; + return true; +} + +static bool parse_timeout(grpc_json *field, gpr_timespec *timeout) { + if (field->type != GRPC_JSON_STRING) return false; + size_t len = strlen(field->value); + if (field->value[len - 1] != 's') return false; + char *buf = gpr_strdup(field->value); + buf[len - 1] = '\0'; // Remove trailing 's'. + char *decimal_point = strchr(buf, '.'); + if (decimal_point != NULL) { + *decimal_point = '\0'; + timeout->tv_nsec = gpr_parse_nonnegative_int(decimal_point + 1); + if (timeout->tv_nsec == -1) { + gpr_free(buf); + return false; + } + // There should always be exactly 3, 6, or 9 fractional digits. + int multiplier = 1; + switch (strlen(decimal_point + 1)) { + case 9: + break; + case 6: + multiplier *= 1000; + break; + case 3: + multiplier *= 1000000; + break; + default: // Unsupported number of digits. + gpr_free(buf); + return false; + } + timeout->tv_nsec *= multiplier; + } + timeout->tv_sec = gpr_parse_nonnegative_int(buf); + gpr_free(buf); + if (timeout->tv_sec == -1) return false; + return true; +} + static void *method_parameters_create_from_json(const grpc_json *json) { wait_for_ready_value wait_for_ready = WAIT_FOR_READY_UNSET; gpr_timespec timeout = {0, 0, GPR_TIMESPAN}; @@ -101,49 +160,14 @@ static void *method_parameters_create_from_json(const grpc_json *json) { if (field->key == NULL) continue; if (strcmp(field->key, "waitForReady") == 0) { if (wait_for_ready != WAIT_FOR_READY_UNSET) return NULL; // Duplicate. - if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) { - return NULL; - } - wait_for_ready = field->type == GRPC_JSON_TRUE ? WAIT_FOR_READY_TRUE - : WAIT_FOR_READY_FALSE; + if (!parse_wait_for_ready(field, &wait_for_ready)) return NULL; } else if (strcmp(field->key, "timeout") == 0) { if (timeout.tv_sec > 0 || timeout.tv_nsec > 0) return NULL; // Duplicate. - if (field->type != GRPC_JSON_STRING) return NULL; - size_t len = strlen(field->value); - if (field->value[len - 1] != 's') return NULL; - char *buf = gpr_strdup(field->value); - buf[len - 1] = '\0'; // Remove trailing 's'. - char *decimal_point = strchr(buf, '.'); - if (decimal_point != NULL) { - *decimal_point = '\0'; - timeout.tv_nsec = gpr_parse_nonnegative_int(decimal_point + 1); - if (timeout.tv_nsec == -1) { - gpr_free(buf); - return NULL; - } - // There should always be exactly 3, 6, or 9 fractional digits. - int multiplier = 1; - switch (strlen(decimal_point + 1)) { - case 9: - break; - case 6: - multiplier *= 1000; - break; - case 3: - multiplier *= 1000000; - break; - default: // Unsupported number of digits. - gpr_free(buf); - return NULL; - } - timeout.tv_nsec *= multiplier; - } - timeout.tv_sec = gpr_parse_nonnegative_int(buf); - if (timeout.tv_sec == -1) return NULL; - gpr_free(buf); + if (!parse_timeout(field, &timeout)) return NULL; } } method_parameters *value = gpr_malloc(sizeof(method_parameters)); + gpr_ref_init(&value->refs, 1); value->timeout = timeout; value->wait_for_ready = wait_for_ready; return value; @@ -520,7 +544,6 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_channel_element_args *args) { channel_data *chand = elem->channel_data; - memset(chand, 0, sizeof(*chand)); GPR_ASSERT(args->is_last); GPR_ASSERT(elem->filter == &grpc_client_channel_filter); // Initialize data members. @@ -609,7 +632,8 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, #define CANCELLED_CALL ((grpc_subchannel_call *)1) typedef enum { - GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING, + /* zero so that it can be default-initialized */ + GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING = 0, GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL } subchannel_creation_phase; @@ -630,14 +654,14 @@ typedef struct client_channel_call_data { grpc_slice path; // Request path. gpr_timespec call_start_time; gpr_timespec deadline; - wait_for_ready_value wait_for_ready_from_service_config; - grpc_closure read_service_config; + method_parameters *method_params; grpc_error *cancel_error; /** either 0 for no call, 1 for cancelled, or a pointer to a grpc_subchannel_call */ gpr_atm subchannel_call; + gpr_arena *arena; subchannel_creation_phase creation_phase; grpc_connected_subchannel *connected_subchannel; @@ -704,6 +728,47 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) { gpr_free(ops); } +// Sets calld->method_params. +// If the method params specify a timeout, populates +// *per_method_deadline and returns true. +static bool set_call_method_params_from_service_config_locked( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + gpr_timespec *per_method_deadline) { + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + if (chand->method_params_table != NULL) { + calld->method_params = grpc_method_config_table_get( + exec_ctx, chand->method_params_table, calld->path); + if (calld->method_params != NULL) { + method_parameters_ref(calld->method_params); + if (gpr_time_cmp(calld->method_params->timeout, + gpr_time_0(GPR_TIMESPAN)) != 0) { + *per_method_deadline = + gpr_time_add(calld->call_start_time, calld->method_params->timeout); + return true; + } + } + } + return false; +} + +static void apply_final_configuration_locked(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { + /* apply service-config level configuration to the call (now that we're + * certain it exists) */ + call_data *calld = elem->call_data; + gpr_timespec per_method_deadline; + if (set_call_method_params_from_service_config_locked(exec_ctx, elem, + &per_method_deadline)) { + // If the deadline from the service config is shorter than the one + // from the client API, reset the deadline timer. + if (gpr_time_cmp(per_method_deadline, calld->deadline) < 0) { + calld->deadline = per_method_deadline; + grpc_deadline_state_reset(exec_ctx, elem, calld->deadline); + } + } +} + static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_call_element *elem = arg; @@ -732,9 +797,14 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, } else { /* Create call on subchannel. */ grpc_subchannel_call *subchannel_call = NULL; + const grpc_connected_subchannel_call_args call_args = { + .pollent = calld->pollent, + .path = calld->path, + .start_time = calld->call_start_time, + .deadline = calld->deadline, + .arena = calld->arena}; grpc_error *new_error = grpc_connected_subchannel_create_call( - exec_ctx, calld->connected_subchannel, calld->pollent, calld->path, - calld->call_start_time, calld->deadline, &subchannel_call); + exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); if (new_error != GRPC_ERROR_NONE) { new_error = grpc_error_add_child(new_error, error); subchannel_call = CANCELLED_CALL; @@ -829,6 +899,7 @@ static bool pick_subchannel_locked( } GPR_ASSERT(error == GRPC_ERROR_NONE); if (chand->lb_policy != NULL) { + apply_final_configuration_locked(exec_ctx, elem); grpc_lb_policy *lb_policy = chand->lb_policy; GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel"); // If the application explicitly set wait_for_ready, use that. @@ -838,10 +909,11 @@ static bool pick_subchannel_locked( initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET; const bool wait_for_ready_set_from_service_config = - calld->wait_for_ready_from_service_config != WAIT_FOR_READY_UNSET; + calld->method_params != NULL && + calld->method_params->wait_for_ready != WAIT_FOR_READY_UNSET; if (!wait_for_ready_set_from_api && wait_for_ready_set_from_service_config) { - if (calld->wait_for_ready_from_service_config == WAIT_FOR_READY_TRUE) { + if (calld->method_params->wait_for_ready == WAIT_FOR_READY_TRUE) { initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY; } else { initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY; @@ -959,9 +1031,14 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING && calld->connected_subchannel != NULL) { grpc_subchannel_call *subchannel_call = NULL; + const grpc_connected_subchannel_call_args call_args = { + .pollent = calld->pollent, + .path = calld->path, + .start_time = calld->call_start_time, + .deadline = calld->deadline, + .arena = calld->arena}; grpc_error *error = grpc_connected_subchannel_create_call( - exec_ctx, calld->connected_subchannel, calld->pollent, calld->path, - calld->call_start_time, calld->deadline, &subchannel_call); + exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); if (error != GRPC_ERROR_NONE) { subchannel_call = CANCELLED_CALL; fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); @@ -979,10 +1056,9 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, add_waiting_locked(calld, op); } -static void cc_start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, - void *arg, - grpc_error *error_ignored) { - GPR_TIMER_BEGIN("cc_start_transport_stream_op_locked", 0); +static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error_ignored) { + GPR_TIMER_BEGIN("start_transport_stream_op_locked", 0); grpc_transport_stream_op *op = arg; grpc_call_element *elem = op->handler_private.args[0]; @@ -992,7 +1068,7 @@ static void cc_start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "start_transport_stream_op"); - GPR_TIMER_END("cc_start_transport_stream_op_locked", 0); + GPR_TIMER_END("start_transport_stream_op_locked", 0); } /* The logic here is fairly complicated, due to (a) the fact that we @@ -1032,136 +1108,25 @@ static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, grpc_closure_sched( exec_ctx, grpc_closure_init(&op->handler_private.closure, - cc_start_transport_stream_op_locked, op, + start_transport_stream_op_locked, op, grpc_combiner_scheduler(chand->combiner, false)), GRPC_ERROR_NONE); GPR_TIMER_END("cc_start_transport_stream_op", 0); } -// Gets data from the service config. Invoked when the resolver returns -// its initial result. -static void read_service_config_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_call_element *elem = arg; - channel_data *chand = elem->channel_data; - call_data *calld = elem->call_data; - // If this is an error, there's no point in looking at the service config. - if (error == GRPC_ERROR_NONE) { - // Get the method config table from channel data. - grpc_slice_hash_table *method_params_table = NULL; - if (chand->method_params_table != NULL) { - method_params_table = - grpc_slice_hash_table_ref(chand->method_params_table); - } - // If the method config table was present, use it. - if (method_params_table != NULL) { - const method_parameters *method_params = grpc_method_config_table_get( - exec_ctx, method_params_table, calld->path); - if (method_params != NULL) { - const bool have_method_timeout = - gpr_time_cmp(method_params->timeout, gpr_time_0(GPR_TIMESPAN)) != 0; - if (have_method_timeout || - method_params->wait_for_ready != WAIT_FOR_READY_UNSET) { - if (have_method_timeout) { - const gpr_timespec per_method_deadline = - gpr_time_add(calld->call_start_time, method_params->timeout); - if (gpr_time_cmp(per_method_deadline, calld->deadline) < 0) { - calld->deadline = per_method_deadline; - // Reset deadline timer. - grpc_deadline_state_reset(exec_ctx, elem, calld->deadline); - } - } - if (method_params->wait_for_ready != WAIT_FOR_READY_UNSET) { - calld->wait_for_ready_from_service_config = - method_params->wait_for_ready; - } - } - } - grpc_slice_hash_table_unref(exec_ctx, method_params_table); - } - } - GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "read_service_config"); -} - -static void initial_read_service_config_locked(grpc_exec_ctx *exec_ctx, - void *arg, - grpc_error *error_ignored) { - grpc_call_element *elem = arg; - channel_data *chand = elem->channel_data; - call_data *calld = elem->call_data; - // If the resolver has already returned results, then we can access - // the service config parameters immediately. Otherwise, we need to - // defer that work until the resolver returns an initial result. - // TODO(roth): This code is almost but not quite identical to the code - // in read_service_config() above. It would be nice to find a way to - // combine them, to avoid having to maintain it twice. - if (chand->lb_policy != NULL) { - // We already have a resolver result, so check for service config. - if (chand->method_params_table != NULL) { - grpc_slice_hash_table *method_params_table = - grpc_slice_hash_table_ref(chand->method_params_table); - method_parameters *method_params = grpc_method_config_table_get( - exec_ctx, method_params_table, calld->path); - if (method_params != NULL) { - if (gpr_time_cmp(method_params->timeout, - gpr_time_0(GPR_CLOCK_MONOTONIC)) != 0) { - gpr_timespec per_method_deadline = - gpr_time_add(calld->call_start_time, method_params->timeout); - calld->deadline = gpr_time_min(calld->deadline, per_method_deadline); - } - if (method_params->wait_for_ready != WAIT_FOR_READY_UNSET) { - calld->wait_for_ready_from_service_config = - method_params->wait_for_ready; - } - } - grpc_slice_hash_table_unref(exec_ctx, method_params_table); - } - } else { - // We don't yet have a resolver result, so register a callback to - // get the service config data once the resolver returns. - // Take a reference to the call stack to be owned by the callback. - GRPC_CALL_STACK_REF(calld->owning_call, "read_service_config"); - grpc_closure_init(&calld->read_service_config, read_service_config_locked, - elem, grpc_combiner_scheduler(chand->combiner, false)); - grpc_closure_list_append(&chand->waiting_for_config_closures, - &calld->read_service_config, GRPC_ERROR_NONE); - } - // Start the deadline timer with the current deadline value. If we - // do not yet have service config data, then the timer may be reset - // later. - grpc_deadline_state_start(exec_ctx, elem, calld->deadline); - GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, - "initial_read_service_config"); -} - /* Constructor for call_data */ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_element_args *args) { - channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; // Initialize data members. grpc_deadline_state_init(exec_ctx, elem, args->call_stack); calld->path = grpc_slice_ref_internal(args->path); calld->call_start_time = args->start_time; calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC); - calld->wait_for_ready_from_service_config = WAIT_FOR_READY_UNSET; - calld->cancel_error = GRPC_ERROR_NONE; - gpr_atm_rel_store(&calld->subchannel_call, 0); - calld->connected_subchannel = NULL; - calld->waiting_ops = NULL; - calld->waiting_ops_count = 0; - calld->waiting_ops_capacity = 0; - calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; calld->owning_call = args->call_stack; - calld->pollent = NULL; - GRPC_CALL_STACK_REF(calld->owning_call, "initial_read_service_config"); - grpc_closure_sched( - exec_ctx, - grpc_closure_init(&calld->read_service_config, - initial_read_service_config_locked, elem, - grpc_combiner_scheduler(chand->combiner, false)), - GRPC_ERROR_NONE); + calld->arena = args->arena; + grpc_deadline_state_start(exec_ctx, elem, calld->deadline); return GRPC_ERROR_NONE; } @@ -1169,13 +1134,18 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *and_free_memory) { + grpc_closure *then_schedule_closure) { call_data *calld = elem->call_data; grpc_deadline_state_destroy(exec_ctx, elem); grpc_slice_unref_internal(exec_ctx, calld->path); + if (calld->method_params != NULL) { + method_parameters_unref(calld->method_params); + } GRPC_ERROR_UNREF(calld->cancel_error); grpc_subchannel_call *call = GET_CALL(calld); if (call != NULL && call != CANCELLED_CALL) { + grpc_subchannel_call_set_cleanup_closure(call, then_schedule_closure); + then_schedule_closure = NULL; GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call"); } GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING); @@ -1185,7 +1155,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, "picked"); } gpr_free(calld->waiting_ops); - gpr_free(and_free_memory); + grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); } static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, diff --git a/src/core/ext/client_channel/client_channel_plugin.c b/src/core/ext/client_channel/client_channel_plugin.c index 6f9df3e386..28d3b63f99 100644 --- a/src/core/ext/client_channel/client_channel_plugin.c +++ b/src/core/ext/client_channel/client_channel_plugin.c @@ -64,7 +64,7 @@ static bool set_default_host_if_unset(grpc_exec_ctx *exec_ctx, } } char *default_authority = grpc_get_default_authority( - grpc_channel_stack_builder_get_target(builder)); + exec_ctx, grpc_channel_stack_builder_get_target(builder)); if (default_authority != NULL) { grpc_arg arg; arg.type = GRPC_ARG_STRING; diff --git a/src/core/ext/client_channel/http_proxy.c b/src/core/ext/client_channel/http_proxy.c index bbe4ff550c..e280cef101 100644 --- a/src/core/ext/client_channel/http_proxy.c +++ b/src/core/ext/client_channel/http_proxy.c @@ -46,10 +46,11 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/support/env.h" -static char* grpc_get_http_proxy_server() { +static char* grpc_get_http_proxy_server(grpc_exec_ctx* exec_ctx) { char* uri_str = gpr_getenv("http_proxy"); if (uri_str == NULL) return NULL; - grpc_uri* uri = grpc_uri_parse(uri_str, false /* suppress_errors */); + grpc_uri* uri = + grpc_uri_parse(exec_ctx, uri_str, false /* suppress_errors */); char* proxy_name = NULL; if (uri == NULL || uri->authority == NULL) { gpr_log(GPR_ERROR, "cannot parse value of 'http_proxy' env var"); @@ -76,9 +77,10 @@ static bool proxy_mapper_map_name(grpc_exec_ctx* exec_ctx, const grpc_channel_args* args, char** name_to_resolve, grpc_channel_args** new_args) { - *name_to_resolve = grpc_get_http_proxy_server(); + *name_to_resolve = grpc_get_http_proxy_server(exec_ctx); if (*name_to_resolve == NULL) return false; - grpc_uri* uri = grpc_uri_parse(server_uri, false /* suppress_errors */); + grpc_uri* uri = + grpc_uri_parse(exec_ctx, server_uri, false /* suppress_errors */); if (uri == NULL || uri->path[0] == '\0') { gpr_log(GPR_ERROR, "'http_proxy' environment variable set, but cannot " diff --git a/src/core/ext/client_channel/parse_address.c b/src/core/ext/client_channel/parse_address.c index b1d55ad0f5..cd1b2cd80c 100644 --- a/src/core/ext/client_channel/parse_address.c +++ b/src/core/ext/client_channel/parse_address.c @@ -44,16 +44,18 @@ #include <grpc/support/host_port.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> +#include "src/core/lib/support/string.h" #ifdef GRPC_HAVE_UNIX_SOCKET int parse_unix(grpc_uri *uri, grpc_resolved_address *resolved_addr) { struct sockaddr_un *un = (struct sockaddr_un *)resolved_addr->addr; - + const size_t maxlen = sizeof(un->sun_path); + const size_t path_len = strnlen(uri->path, maxlen); + if (path_len == maxlen) return 0; un->sun_family = AF_UNIX; strcpy(un->sun_path, uri->path); - resolved_addr->len = strlen(un->sun_path) + sizeof(un->sun_family) + 1; - + resolved_addr->len = sizeof(*un); return 1; } @@ -119,9 +121,33 @@ int parse_ipv6(grpc_uri *uri, grpc_resolved_address *resolved_addr) { memset(in6, 0, sizeof(*in6)); resolved_addr->len = sizeof(*in6); in6->sin6_family = AF_INET6; - if (inet_pton(AF_INET6, host, &in6->sin6_addr) == 0) { - gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host); - goto done; + + /* Handle the RFC6874 syntax for IPv6 zone identifiers. */ + char *host_end = (char *)gpr_memrchr(host, '%', strlen(host)); + if (host_end != NULL) { + GPR_ASSERT(host_end >= host); + char host_without_scope[INET6_ADDRSTRLEN]; + size_t host_without_scope_len = (size_t)(host_end - host); + uint32_t sin6_scope_id = 0; + strncpy(host_without_scope, host, host_without_scope_len); + host_without_scope[host_without_scope_len] = '\0'; + if (inet_pton(AF_INET6, host_without_scope, &in6->sin6_addr) == 0) { + gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host_without_scope); + goto done; + } + if (gpr_parse_bytes_to_uint32(host_end + 1, + strlen(host) - host_without_scope_len - 1, + &sin6_scope_id) == 0) { + gpr_log(GPR_ERROR, "invalid ipv6 scope id: '%s'", host_end + 1); + goto done; + } + // Handle "sin6_scope_id" being type "u_long". See grpc issue ##10027. + in6->sin6_scope_id = sin6_scope_id; + } else { + if (inet_pton(AF_INET6, host, &in6->sin6_addr) == 0) { + gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host); + goto done; + } } if (port != NULL) { diff --git a/src/core/ext/client_channel/proxy_mapper_registry.c b/src/core/ext/client_channel/proxy_mapper_registry.c index 2c44b9d490..0935ddbdbd 100644 --- a/src/core/ext/client_channel/proxy_mapper_registry.c +++ b/src/core/ext/client_channel/proxy_mapper_registry.c @@ -94,6 +94,14 @@ static void grpc_proxy_mapper_list_destroy(grpc_proxy_mapper_list* list) { grpc_proxy_mapper_destroy(list->list[i]); } gpr_free(list->list); + // Clean up in case we re-initialze later. + // TODO(ctiller): This should ideally live in + // grpc_proxy_mapper_registry_init(). However, if we did this there, + // then we would do it AFTER we start registering proxy mappers from + // third-party plugins, so they'd never show up (and would leak memory). + // We probably need some sort of dependency system for plugins to fix + // this. + memset(list, 0, sizeof(*list)); } // @@ -102,9 +110,7 @@ static void grpc_proxy_mapper_list_destroy(grpc_proxy_mapper_list* list) { static grpc_proxy_mapper_list g_proxy_mapper_list; -void grpc_proxy_mapper_registry_init() { - memset(&g_proxy_mapper_list, 0, sizeof(g_proxy_mapper_list)); -} +void grpc_proxy_mapper_registry_init() {} void grpc_proxy_mapper_registry_shutdown() { grpc_proxy_mapper_list_destroy(&g_proxy_mapper_list); diff --git a/src/core/ext/client_channel/resolver_registry.c b/src/core/ext/client_channel/resolver_registry.c index 1b93d0b3c9..0f074a3386 100644 --- a/src/core/ext/client_channel/resolver_registry.c +++ b/src/core/ext/client_channel/resolver_registry.c @@ -107,22 +107,23 @@ static grpc_resolver_factory *lookup_factory_by_uri(grpc_uri *uri) { return lookup_factory(uri->scheme); } -static grpc_resolver_factory *resolve_factory(const char *target, +static grpc_resolver_factory *resolve_factory(grpc_exec_ctx *exec_ctx, + const char *target, grpc_uri **uri, char **canonical_target) { grpc_resolver_factory *factory = NULL; GPR_ASSERT(uri != NULL); - *uri = grpc_uri_parse(target, 1); + *uri = grpc_uri_parse(exec_ctx, target, 1); factory = lookup_factory_by_uri(*uri); if (factory == NULL) { grpc_uri_destroy(*uri); gpr_asprintf(canonical_target, "%s%s", g_default_resolver_prefix, target); - *uri = grpc_uri_parse(*canonical_target, 1); + *uri = grpc_uri_parse(exec_ctx, *canonical_target, 1); factory = lookup_factory_by_uri(*uri); if (factory == NULL) { - grpc_uri_destroy(grpc_uri_parse(target, 0)); - grpc_uri_destroy(grpc_uri_parse(*canonical_target, 0)); + grpc_uri_destroy(grpc_uri_parse(exec_ctx, target, 0)); + grpc_uri_destroy(grpc_uri_parse(exec_ctx, *canonical_target, 0)); gpr_log(GPR_ERROR, "don't know how to resolve '%s' or '%s'", target, *canonical_target); } @@ -137,7 +138,7 @@ grpc_resolver *grpc_resolver_create(grpc_exec_ctx *exec_ctx, const char *target, grpc_uri *uri = NULL; char *canonical_target = NULL; grpc_resolver_factory *factory = - resolve_factory(target, &uri, &canonical_target); + resolve_factory(exec_ctx, target, &uri, &canonical_target); grpc_resolver *resolver; grpc_resolver_args resolver_args; memset(&resolver_args, 0, sizeof(resolver_args)); @@ -152,21 +153,22 @@ grpc_resolver *grpc_resolver_create(grpc_exec_ctx *exec_ctx, const char *target, return resolver; } -char *grpc_get_default_authority(const char *target) { +char *grpc_get_default_authority(grpc_exec_ctx *exec_ctx, const char *target) { grpc_uri *uri = NULL; char *canonical_target = NULL; grpc_resolver_factory *factory = - resolve_factory(target, &uri, &canonical_target); + resolve_factory(exec_ctx, target, &uri, &canonical_target); char *authority = grpc_resolver_factory_get_default_authority(factory, uri); grpc_uri_destroy(uri); gpr_free(canonical_target); return authority; } -char *grpc_resolver_factory_add_default_prefix_if_needed(const char *target) { +char *grpc_resolver_factory_add_default_prefix_if_needed( + grpc_exec_ctx *exec_ctx, const char *target) { grpc_uri *uri = NULL; char *canonical_target = NULL; - resolve_factory(target, &uri, &canonical_target); + resolve_factory(exec_ctx, target, &uri, &canonical_target); grpc_uri_destroy(uri); return canonical_target == NULL ? gpr_strdup(target) : canonical_target; } diff --git a/src/core/ext/client_channel/resolver_registry.h b/src/core/ext/client_channel/resolver_registry.h index e2c189cf0c..1a3ebee25a 100644 --- a/src/core/ext/client_channel/resolver_registry.h +++ b/src/core/ext/client_channel/resolver_registry.h @@ -74,10 +74,11 @@ grpc_resolver_factory *grpc_resolver_factory_lookup(const char *name); /** Given a target, return a (freshly allocated with gpr_malloc) string representing the default authority to pass from a client. */ -char *grpc_get_default_authority(const char *target); +char *grpc_get_default_authority(grpc_exec_ctx *exec_ctx, const char *target); /** Returns a newly allocated string containing \a target, adding the default prefix if needed. */ -char *grpc_resolver_factory_add_default_prefix_if_needed(const char *target); +char *grpc_resolver_factory_add_default_prefix_if_needed( + grpc_exec_ctx *exec_ctx, const char *target); #endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_REGISTRY_H */ diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index f2da148e49..ed5029ea9a 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -148,6 +148,7 @@ struct grpc_subchannel { struct grpc_subchannel_call { grpc_connected_subchannel *connection; + grpc_closure *schedule_closure_after_destroy; }; #define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1)) @@ -316,8 +317,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, return c; } - c = gpr_malloc(sizeof(*c)); - memset(c, 0, sizeof(*c)); + c = gpr_zalloc(sizeof(*c)); c->key = key; gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS); c->connector = connector; @@ -332,7 +332,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, } c->pollset_set = grpc_pollset_set_create(); grpc_resolved_address *addr = gpr_malloc(sizeof(*addr)); - grpc_get_subchannel_address_arg(args->args, addr); + grpc_get_subchannel_address_arg(exec_ctx, args->args, addr); grpc_set_initial_connect_string(&addr, &c->initial_connect_string); grpc_resolved_address *new_address = NULL; grpc_channel_args *new_args = NULL; @@ -341,17 +341,15 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, GPR_ASSERT(new_address != NULL); gpr_free(addr); addr = new_address; - if (new_args != NULL) c->args = new_args; - } - if (c->args == NULL) { - static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS}; - grpc_arg new_arg = grpc_create_subchannel_address_arg(addr); - c->args = grpc_channel_args_copy_and_add_and_remove( - args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &new_arg, - 1); - gpr_free(new_arg.value.string); } + static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS}; + grpc_arg new_arg = grpc_create_subchannel_address_arg(addr); gpr_free(addr); + c->args = grpc_channel_args_copy_and_add_and_remove( + new_args != NULL ? new_args : args->args, keys_to_remove, + GPR_ARRAY_SIZE(keys_to_remove), &new_arg, 1); + gpr_free(new_arg.value.string); + if (new_args != NULL) grpc_channel_args_destroy(exec_ctx, new_args); c->root_external_state_watcher.next = c->root_external_state_watcher.prev = &c->root_external_state_watcher; grpc_closure_init(&c->connected, subchannel_connected, c, @@ -720,13 +718,22 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call, grpc_error *error) { grpc_subchannel_call *c = call; + GPR_ASSERT(c->schedule_closure_after_destroy != NULL); GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0); grpc_connected_subchannel *connection = c->connection; - grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), NULL, c); + grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), NULL, + c->schedule_closure_after_destroy); GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, connection, "subchannel_call"); GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0); } +void grpc_subchannel_call_set_cleanup_closure(grpc_subchannel_call *call, + grpc_closure *closure) { + GPR_ASSERT(call->schedule_closure_after_destroy == NULL); + GPR_ASSERT(closure != NULL); + call->schedule_closure_after_destroy = closure; +} + void grpc_subchannel_call_ref( grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON); @@ -762,15 +769,22 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( grpc_error *grpc_connected_subchannel_create_call( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, - grpc_polling_entity *pollent, grpc_slice path, gpr_timespec start_time, - gpr_timespec deadline, grpc_subchannel_call **call) { + const grpc_connected_subchannel_call_args *args, + grpc_subchannel_call **call) { grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); - *call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size); + *call = gpr_arena_alloc( + args->arena, sizeof(grpc_subchannel_call) + chanstk->call_stack_size); grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); (*call)->connection = con; // Ref is added below. - grpc_error *error = - grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, *call, - NULL, NULL, path, start_time, deadline, callstk); + const grpc_call_element_args call_args = {.call_stack = callstk, + .server_transport_data = NULL, + .context = NULL, + .path = args->path, + .start_time = args->start_time, + .deadline = args->deadline, + .arena = args->arena}; + grpc_error *error = grpc_call_stack_init( + exec_ctx, chanstk, 1, subchannel_call_destroy, *call, &call_args); if (error != GRPC_ERROR_NONE) { const char *error_string = grpc_error_string(error); gpr_log(GPR_ERROR, "error: %s", error_string); @@ -779,7 +793,7 @@ grpc_error *grpc_connected_subchannel_create_call( return error; } GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call"); - grpc_call_stack_set_pollset_or_pollset_set(exec_ctx, callstk, pollent); + grpc_call_stack_set_pollset_or_pollset_set(exec_ctx, callstk, args->pollent); return GRPC_ERROR_NONE; } @@ -788,9 +802,9 @@ grpc_call_stack *grpc_subchannel_call_get_call_stack( return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call); } -static void grpc_uri_to_sockaddr(const char *uri_str, +static void grpc_uri_to_sockaddr(grpc_exec_ctx *exec_ctx, const char *uri_str, grpc_resolved_address *addr) { - grpc_uri *uri = grpc_uri_parse(uri_str, 0 /* suppress_errors */); + grpc_uri *uri = grpc_uri_parse(exec_ctx, uri_str, 0 /* suppress_errors */); GPR_ASSERT(uri != NULL); if (strcmp(uri->scheme, "ipv4") == 0) { GPR_ASSERT(parse_ipv4(uri, addr)); @@ -802,12 +816,13 @@ static void grpc_uri_to_sockaddr(const char *uri_str, grpc_uri_destroy(uri); } -void grpc_get_subchannel_address_arg(const grpc_channel_args *args, +void grpc_get_subchannel_address_arg(grpc_exec_ctx *exec_ctx, + const grpc_channel_args *args, grpc_resolved_address *addr) { const char *addr_uri_str = grpc_get_subchannel_address_uri_arg(args); memset(addr, 0, sizeof(*addr)); if (*addr_uri_str != '\0') { - grpc_uri_to_sockaddr(addr_uri_str, addr); + grpc_uri_to_sockaddr(exec_ctx, addr_uri_str, addr); } } diff --git a/src/core/ext/client_channel/subchannel.h b/src/core/ext/client_channel/subchannel.h index 26ce954487..3e64a2507c 100644 --- a/src/core/ext/client_channel/subchannel.h +++ b/src/core/ext/client_channel/subchannel.h @@ -37,6 +37,7 @@ #include "src/core/ext/client_channel/connector.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/iomgr/polling_entity.h" +#include "src/core/lib/support/arena.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/metadata.h" @@ -112,10 +113,18 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, GRPC_SUBCHANNEL_REF_EXTRA_ARGS); /** construct a subchannel call */ +typedef struct { + grpc_polling_entity *pollent; + grpc_slice path; + gpr_timespec start_time; + gpr_timespec deadline; + gpr_arena *arena; +} grpc_connected_subchannel_call_args; + grpc_error *grpc_connected_subchannel_create_call( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel, - grpc_polling_entity *pollent, grpc_slice path, gpr_timespec start_time, - gpr_timespec deadline, grpc_subchannel_call **subchannel_call); + const grpc_connected_subchannel_call_args *args, + grpc_subchannel_call **subchannel_call); /** process a transport level op */ void grpc_connected_subchannel_process_transport_op( @@ -154,6 +163,11 @@ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx, char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *subchannel_call); +/** Must be called once per call. Sets the 'then_schedule_closure' argument for + call stack destruction. */ +void grpc_subchannel_call_set_cleanup_closure( + grpc_subchannel_call *subchannel_call, grpc_closure *closure); + grpc_call_stack *grpc_subchannel_call_get_call_stack( grpc_subchannel_call *subchannel_call); @@ -175,7 +189,8 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, const grpc_subchannel_args *args); /// Sets \a addr from \a args. -void grpc_get_subchannel_address_arg(const grpc_channel_args *args, +void grpc_get_subchannel_address_arg(grpc_exec_ctx *exec_ctx, + const grpc_channel_args *args, grpc_resolved_address *addr); /// Returns the URI string for the address to connect to. diff --git a/src/core/ext/client_channel/uri_parser.c b/src/core/ext/client_channel/uri_parser.c index f8c946b275..d385db0801 100644 --- a/src/core/ext/client_channel/uri_parser.c +++ b/src/core/ext/client_channel/uri_parser.c @@ -35,13 +35,15 @@ #include <string.h> -#include <grpc/slice.h> #include <grpc/slice_buffer.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/port_platform.h> #include <grpc/support/string_util.h> +#include "src/core/lib/slice/percent_encoding.h" +#include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/string.h" /** a size_t default value... maps to all 1's */ @@ -68,11 +70,16 @@ static grpc_uri *bad_uri(const char *uri_text, size_t pos, const char *section, return NULL; } -/** Returns a copy of \a src[begin, end) */ -static char *copy_component(const char *src, size_t begin, size_t end) { - char *out = gpr_malloc(end - begin + 1); - memcpy(out, src + begin, end - begin); - out[end - begin] = 0; +/** Returns a copy of percent decoded \a src[begin, end) */ +static char *decode_and_copy_component(grpc_exec_ctx *exec_ctx, const char *src, + size_t begin, size_t end) { + grpc_slice component = + grpc_slice_from_copied_buffer(src + begin, end - begin); + grpc_slice decoded_component = + grpc_permissive_percent_decode_slice(component); + char *out = grpc_dump_slice(decoded_component, GPR_DUMP_ASCII); + grpc_slice_unref_internal(exec_ctx, component); + grpc_slice_unref_internal(exec_ctx, decoded_component); return out; } @@ -175,7 +182,8 @@ static void parse_query_parts(grpc_uri *uri) { } } -grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors) { +grpc_uri *grpc_uri_parse(grpc_exec_ctx *exec_ctx, const char *uri_text, + int suppress_errors) { grpc_uri *uri; size_t scheme_begin = 0; size_t scheme_end = NOT_SET; @@ -262,13 +270,17 @@ grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors) { fragment_end = i; } - uri = gpr_malloc(sizeof(*uri)); - memset(uri, 0, sizeof(*uri)); - uri->scheme = copy_component(uri_text, scheme_begin, scheme_end); - uri->authority = copy_component(uri_text, authority_begin, authority_end); - uri->path = copy_component(uri_text, path_begin, path_end); - uri->query = copy_component(uri_text, query_begin, query_end); - uri->fragment = copy_component(uri_text, fragment_begin, fragment_end); + uri = gpr_zalloc(sizeof(*uri)); + uri->scheme = + decode_and_copy_component(exec_ctx, uri_text, scheme_begin, scheme_end); + uri->authority = decode_and_copy_component(exec_ctx, uri_text, + authority_begin, authority_end); + uri->path = + decode_and_copy_component(exec_ctx, uri_text, path_begin, path_end); + uri->query = + decode_and_copy_component(exec_ctx, uri_text, query_begin, query_end); + uri->fragment = decode_and_copy_component(exec_ctx, uri_text, fragment_begin, + fragment_end); parse_query_parts(uri); return uri; diff --git a/src/core/ext/client_channel/uri_parser.h b/src/core/ext/client_channel/uri_parser.h index 5fe0e8f35e..efd4302c1c 100644 --- a/src/core/ext/client_channel/uri_parser.h +++ b/src/core/ext/client_channel/uri_parser.h @@ -35,6 +35,7 @@ #define GRPC_CORE_EXT_CLIENT_CHANNEL_URI_PARSER_H #include <stddef.h> +#include "src/core/lib/iomgr/exec_ctx.h" typedef struct { char *scheme; @@ -51,7 +52,8 @@ typedef struct { } grpc_uri; /** parse a uri, return NULL on failure */ -grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors); +grpc_uri *grpc_uri_parse(grpc_exec_ctx *exec_ctx, const char *uri_text, + int suppress_errors); /** return the part of a query string after the '=' in "?key=xxx&...", or NULL * if key is not present */ diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index b5210cb046..d612591f2e 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -238,9 +238,7 @@ static void add_pending_pick(pending_pick **root, const grpc_lb_policy_pick_args *pick_args, grpc_connected_subchannel **target, grpc_closure *on_complete) { - pending_pick *pp = gpr_malloc(sizeof(*pp)); - memset(pp, 0, sizeof(pending_pick)); - memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg)); + pending_pick *pp = gpr_zalloc(sizeof(*pp)); pp->next = *root; pp->pick_args = *pick_args; pp->target = target; @@ -265,9 +263,7 @@ typedef struct pending_ping { } pending_ping; static void add_pending_ping(pending_ping **root, grpc_closure *notify) { - pending_ping *pping = gpr_malloc(sizeof(*pping)); - memset(pping, 0, sizeof(pending_ping)); - memset(&pping->wrapped_notify_arg, 0, sizeof(wrapped_rr_closure_arg)); + pending_ping *pping = gpr_zalloc(sizeof(*pping)); pping->wrapped_notify_arg.wrapped_closure = notify; pping->wrapped_notify_arg.free_when_done = pping; pping->next = *root; @@ -674,8 +670,7 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, /* Allocate the data for the tracking of the new RR policy's connectivity. * It'll be deallocated in glb_rr_connectivity_changed() */ rr_connectivity_data *rr_connectivity = - gpr_malloc(sizeof(rr_connectivity_data)); - memset(rr_connectivity, 0, sizeof(rr_connectivity_data)); + gpr_zalloc(sizeof(rr_connectivity_data)); grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed_locked, rr_connectivity, grpc_combiner_scheduler(glb_policy->base.combiner, false)); @@ -860,14 +855,13 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, } if (num_grpclb_addrs == 0) return NULL; - glb_lb_policy *glb_policy = gpr_malloc(sizeof(*glb_policy)); - memset(glb_policy, 0, sizeof(*glb_policy)); + glb_lb_policy *glb_policy = gpr_zalloc(sizeof(*glb_policy)); /* Get server name. */ arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI); GPR_ASSERT(arg != NULL); GPR_ASSERT(arg->type == GRPC_ARG_STRING); - grpc_uri *uri = grpc_uri_parse(arg->value.string, true); + grpc_uri *uri = grpc_uri_parse(exec_ctx, arg->value.string, true); GPR_ASSERT(uri->path[0] != '\0'); glb_policy->server_name = gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path); @@ -1047,8 +1041,7 @@ static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick"); - wrapped_rr_closure_arg *wc_arg = gpr_malloc(sizeof(wrapped_rr_closure_arg)); - memset(wc_arg, 0, sizeof(wrapped_rr_closure_arg)); + wrapped_rr_closure_arg *wc_arg = gpr_zalloc(sizeof(wrapped_rr_closure_arg)); grpc_closure_init(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg, grpc_schedule_on_exec_ctx); diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/lb_policy/grpclb/load_balancer_api.c index 837e9c1113..3c4604c402 100644 --- a/src/core/ext/lb_policy/grpclb/load_balancer_api.c +++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.c @@ -62,8 +62,7 @@ static bool decode_serverlist(pb_istream_t *stream, const pb_field_t *field, } dec_arg->num_servers++; } else { /* second pass. Actually decode. */ - grpc_grpclb_server *server = gpr_malloc(sizeof(grpc_grpclb_server)); - memset(server, 0, sizeof(grpc_grpclb_server)); + grpc_grpclb_server *server = gpr_zalloc(sizeof(grpc_grpclb_server)); GPR_ASSERT(dec_arg->num_servers > 0); if (dec_arg->decoding_idx == 0) { /* first iteration of second pass */ dec_arg->servers = @@ -160,8 +159,7 @@ grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist( return NULL; } - grpc_grpclb_serverlist *sl = gpr_malloc(sizeof(grpc_grpclb_serverlist)); - memset(sl, 0, sizeof(*sl)); + grpc_grpclb_serverlist *sl = gpr_zalloc(sizeof(grpc_grpclb_serverlist)); sl->num_servers = arg.num_servers; sl->servers = arg.servers; if (res.server_list.has_expiration_interval) { @@ -183,8 +181,7 @@ void grpc_grpclb_destroy_serverlist(grpc_grpclb_serverlist *serverlist) { grpc_grpclb_serverlist *grpc_grpclb_serverlist_copy( const grpc_grpclb_serverlist *sl) { - grpc_grpclb_serverlist *copy = gpr_malloc(sizeof(grpc_grpclb_serverlist)); - memset(copy, 0, sizeof(grpc_grpclb_serverlist)); + grpc_grpclb_serverlist *copy = gpr_zalloc(sizeof(grpc_grpclb_serverlist)); copy->num_servers = sl->num_servers; memcpy(©->expiration_interval, &sl->expiration_interval, sizeof(grpc_grpclb_duration)); diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index 501cb6d94d..e2a66d16bd 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -409,11 +409,9 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, } if (num_addrs == 0) return NULL; - pick_first_lb_policy *p = gpr_malloc(sizeof(*p)); - memset(p, 0, sizeof(*p)); + pick_first_lb_policy *p = gpr_zalloc(sizeof(*p)); - p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * num_addrs); - memset(p->subchannels, 0, sizeof(*p->subchannels) * num_addrs); + p->subchannels = gpr_zalloc(sizeof(grpc_subchannel *) * num_addrs); grpc_subchannel_args sc_args; size_t subchannel_idx = 0; for (size_t i = 0; i < addresses->num_addresses; i++) { diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 687df170ad..f2d1d46179 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -213,8 +213,7 @@ static void advance_last_picked_locked(round_robin_lb_policy *p) { * csc to the list of ready subchannels. */ static ready_list *add_connected_sc_locked(round_robin_lb_policy *p, subchannel_data *sd) { - ready_list *new_elem = gpr_malloc(sizeof(ready_list)); - memset(new_elem, 0, sizeof(ready_list)); + ready_list *new_elem = gpr_zalloc(sizeof(ready_list)); new_elem->subchannel = sd->subchannel; new_elem->user_data = sd->user_data; if (p->ready_list.prev == NULL) { @@ -699,12 +698,10 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, } if (num_addrs == 0) return NULL; - round_robin_lb_policy *p = gpr_malloc(sizeof(*p)); - memset(p, 0, sizeof(*p)); + round_robin_lb_policy *p = gpr_zalloc(sizeof(*p)); p->num_addresses = num_addrs; - p->subchannels = gpr_malloc(sizeof(*p->subchannels) * num_addrs); - memset(p->subchannels, 0, sizeof(*p->subchannels) * num_addrs); + p->subchannels = gpr_zalloc(sizeof(*p->subchannels) * num_addrs); grpc_subchannel_args sc_args; size_t subchannel_idx = 0; @@ -731,8 +728,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, grpc_channel_args_destroy(exec_ctx, new_args); if (subchannel != NULL) { - subchannel_data *sd = gpr_malloc(sizeof(*sd)); - memset(sd, 0, sizeof(*sd)); + subchannel_data *sd = gpr_zalloc(sizeof(*sd)); p->subchannels[subchannel_idx] = sd; sd->policy = p; sd->index = subchannel_idx; diff --git a/src/core/ext/load_reporting/load_reporting_filter.c b/src/core/ext/load_reporting/load_reporting_filter.c index c6386a8942..4ed832671d 100644 --- a/src/core/ext/load_reporting/load_reporting_filter.c +++ b/src/core/ext/load_reporting/load_reporting_filter.c @@ -102,8 +102,6 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_element_args *args) { call_data *calld = elem->call_data; - memset(calld, 0, sizeof(call_data)); - calld->id = (intptr_t)args->call_stack; grpc_closure_init(&calld->on_initial_md_ready, on_initial_md_ready, elem, grpc_schedule_on_exec_ctx); @@ -125,7 +123,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, /* Destructor for call_data */ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) { + grpc_closure *ignored) { call_data *calld = elem->call_data; /* TODO(dgq): do something with the data @@ -154,8 +152,6 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, GPR_ASSERT(!args->is_last); channel_data *chand = elem->channel_data; - memset(chand, 0, sizeof(channel_data)); - chand->id = (intptr_t)args->channel_stack; /* TODO(dgq): do something with the data diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c index 6e6a804eaa..328fee84ad 100644 --- a/src/core/ext/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -255,8 +255,7 @@ static grpc_resolver *dns_create(grpc_exec_ctx *exec_ctx, char *path = args->uri->path; if (path[0] == '/') ++path; // Create resolver. - dns_resolver *r = gpr_malloc(sizeof(dns_resolver)); - memset(r, 0, sizeof(*r)); + dns_resolver *r = gpr_zalloc(sizeof(dns_resolver)); grpc_resolver_init(&r->base, &dns_resolver_vtable, args->combiner); r->name_to_resolve = gpr_strdup(path); r->default_port = gpr_strdup(default_port); diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c index e7f66649b5..da5923d26f 100644 --- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c +++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c @@ -190,8 +190,7 @@ static grpc_resolver *sockaddr_create(grpc_exec_ctx *exec_ctx, return NULL; } /* Instantiate resolver. */ - sockaddr_resolver *r = gpr_malloc(sizeof(sockaddr_resolver)); - memset(r, 0, sizeof(*r)); + sockaddr_resolver *r = gpr_zalloc(sizeof(sockaddr_resolver)); r->addresses = addresses; r->channel_args = grpc_channel_args_copy(args->args); grpc_resolver_init(&r->base, &sockaddr_resolver_vtable, args->combiner); diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c index d0a762a280..eae0145ecc 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.c +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c @@ -226,7 +226,7 @@ static void chttp2_connector_connect(grpc_exec_ctx *exec_ctx, grpc_closure *notify) { chttp2_connector *c = (chttp2_connector *)con; grpc_resolved_address addr; - grpc_get_subchannel_address_arg(args->channel_args, &addr); + grpc_get_subchannel_address_arg(exec_ctx, args->channel_args, &addr); gpr_mu_lock(&c->mu); GPR_ASSERT(c->notify == NULL); c->notify = notify; @@ -248,8 +248,7 @@ static const grpc_connector_vtable chttp2_connector_vtable = { chttp2_connector_connect}; grpc_connector *grpc_chttp2_connector_create() { - chttp2_connector *c = gpr_malloc(sizeof(*c)); - memset(c, 0, sizeof(*c)); + chttp2_connector *c = gpr_zalloc(sizeof(*c)); c->base.vtable = &chttp2_connector_vtable; gpr_mu_init(&c->mu); gpr_ref_init(&c->refs, 1); diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index 490a0c560e..067ac35a5a 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -72,8 +72,11 @@ static grpc_channel *client_channel_factory_create_channel( grpc_arg arg; arg.type = GRPC_ARG_STRING; arg.key = GRPC_ARG_SERVER_URI; - arg.value.string = grpc_resolver_factory_add_default_prefix_if_needed(target); - grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1); + arg.value.string = + grpc_resolver_factory_add_default_prefix_if_needed(exec_ctx, target); + const char *to_remove[] = {GRPC_ARG_SERVER_URI}; + grpc_channel_args *new_args = + grpc_channel_args_copy_and_add_and_remove(args, to_remove, 1, &arg, 1); gpr_free(arg.value.string); grpc_channel *channel = grpc_channel_create(exec_ctx, target, new_args, GRPC_CLIENT_CHANNEL, NULL); diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index d8c18eb122..f0c241d68e 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -83,7 +83,7 @@ static grpc_subchannel_args *get_secure_naming_subchannel_args( const char *server_uri_str = server_uri_arg->value.string; GPR_ASSERT(server_uri_str != NULL); grpc_uri *server_uri = - grpc_uri_parse(server_uri_str, true /* supress errors */); + grpc_uri_parse(exec_ctx, server_uri_str, true /* supress errors */); GPR_ASSERT(server_uri != NULL); const char *server_uri_path; server_uri_path = @@ -96,7 +96,7 @@ static grpc_subchannel_args *get_secure_naming_subchannel_args( const char *target_uri_str = grpc_get_subchannel_address_uri_arg(args->args); grpc_uri *target_uri = - grpc_uri_parse(target_uri_str, false /* suppress errors */); + grpc_uri_parse(exec_ctx, target_uri_str, false /* suppress errors */); GPR_ASSERT(target_uri != NULL); if (target_uri->path[0] != '\0') { // "path" may be empty const grpc_slice key = grpc_slice_from_static_string( @@ -181,8 +181,11 @@ static grpc_channel *client_channel_factory_create_channel( grpc_arg arg; arg.type = GRPC_ARG_STRING; arg.key = GRPC_ARG_SERVER_URI; - arg.value.string = grpc_resolver_factory_add_default_prefix_if_needed(target); - grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1); + arg.value.string = + grpc_resolver_factory_add_default_prefix_if_needed(exec_ctx, target); + const char *to_remove[] = {GRPC_ARG_SERVER_URI}; + grpc_channel_args *new_args = + grpc_channel_args_copy_and_add_and_remove(args, to_remove, 1, &arg, 1); gpr_free(arg.value.string); grpc_channel *channel = grpc_channel_create(exec_ctx, target, new_args, GRPC_CLIENT_CHANNEL, NULL); diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.c b/src/core/ext/transport/chttp2/server/chttp2_server.c index 0fc180d52f..837b9fd03d 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.c +++ b/src/core/ext/transport/chttp2/server/chttp2_server.c @@ -222,8 +222,7 @@ grpc_error *grpc_chttp2_server_add_port(grpc_exec_ctx *exec_ctx, if (err != GRPC_ERROR_NONE) { goto error; } - state = gpr_malloc(sizeof(*state)); - memset(state, 0, sizeof(*state)); + state = gpr_zalloc(sizeof(*state)); grpc_closure_init(&state->tcp_server_shutdown_complete, tcp_server_shutdown_complete, state, grpc_schedule_on_exec_ctx); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 28a3166832..082078c72f 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -48,16 +48,19 @@ #include "src/core/ext/transport/chttp2/transport/varint.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/http/parser.h" +#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" +#include "src/core/lib/support/env.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/error_utils.h" #include "src/core/lib/transport/http2_errors.h" #include "src/core/lib/transport/static_metadata.h" #include "src/core/lib/transport/status_conversion.h" #include "src/core/lib/transport/timeout_encoding.h" +#include "src/core/lib/transport/transport.h" #include "src/core/lib/transport/transport_impl.h" #define DEFAULT_WINDOW 65535 @@ -66,6 +69,10 @@ #define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024) #define DEFAULT_MAX_HEADER_LIST_SIZE (16 * 1024) +#define DEFAULT_KEEPALIVE_TIME_SECOND INT_MAX +#define DEFAULT_KEEPALIVE_TIMEOUT_SECOND 20 +#define DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS false + #define MAX_CLIENT_STREAM_ID 0x7fffffffu int grpc_http_trace = 0; int grpc_flowctl_trace = 0; @@ -139,6 +146,16 @@ static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, #define DEFAULT_MIN_TIME_BETWEEN_PINGS_MS 0 #define DEFAULT_MAX_PINGS_BETWEEN_DATA 3 +/** keepalive-relevant functions */ +static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); +static void start_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); +static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); +static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); + /******************************************************************************* * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ @@ -218,8 +235,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == GRPC_CHTTP2_CLIENT_CONNECT_STRLEN); - memset(t, 0, sizeof(*t)); - t->base.vtable = &vtable; t->ep = ep; /* one ref is for destroy */ @@ -255,6 +270,17 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_combiner_scheduler(t->combiner, false)); grpc_closure_init(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t, grpc_combiner_scheduler(t->combiner, false)); + grpc_closure_init(&t->init_keepalive_ping_locked, init_keepalive_ping_locked, + t, grpc_combiner_scheduler(t->combiner, false)); + grpc_closure_init(&t->start_keepalive_ping_locked, + start_keepalive_ping_locked, t, + grpc_combiner_scheduler(t->combiner, false)); + grpc_closure_init(&t->finish_keepalive_ping_locked, + finish_keepalive_ping_locked, t, + grpc_combiner_scheduler(t->combiner, false)); + grpc_closure_init(&t->keepalive_watchdog_fired_locked, + keepalive_watchdog_fired_locked, t, + grpc_combiner_scheduler(t->combiner, false)); grpc_bdp_estimator_init(&t->bdp_estimator, t->peer_string); t->last_pid_update = gpr_now(GPR_CLOCK_MONOTONIC); @@ -316,6 +342,18 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, gpr_time_from_millis(DEFAULT_MIN_TIME_BETWEEN_PINGS_MS, GPR_TIMESPAN), }; + /* client-side keepalive setting */ + t->keepalive_time = + DEFAULT_KEEPALIVE_TIME_SECOND == INT_MAX + ? gpr_inf_future(GPR_TIMESPAN) + : gpr_time_from_seconds(DEFAULT_KEEPALIVE_TIME_SECOND, GPR_TIMESPAN); + t->keepalive_timeout = + DEFAULT_KEEPALIVE_TIMEOUT_SECOND == INT_MAX + ? gpr_inf_future(GPR_TIMESPAN) + : gpr_time_from_seconds(DEFAULT_KEEPALIVE_TIMEOUT_SECOND, + GPR_TIMESPAN); + t->keepalive_permit_without_calls = DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS; + if (channel_args) { for (i = 0; i < channel_args->num_args; i++) { if (0 == strcmp(channel_args->args[i].key, @@ -363,6 +401,28 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) { t->enable_bdp_probe = grpc_channel_arg_get_integer( &channel_args->args[i], (grpc_integer_options){1, 0, 1}); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_KEEPALIVE_TIME)) { + const int value = grpc_channel_arg_get_integer( + &channel_args->args[i], + (grpc_integer_options){DEFAULT_KEEPALIVE_TIME_SECOND, 1, INT_MAX}); + t->keepalive_time = value == INT_MAX + ? gpr_inf_future(GPR_TIMESPAN) + : gpr_time_from_seconds(value, GPR_TIMESPAN); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_KEEPALIVE_TIMEOUT)) { + const int value = grpc_channel_arg_get_integer( + &channel_args->args[i], + (grpc_integer_options){DEFAULT_KEEPALIVE_TIMEOUT_SECOND, 0, + INT_MAX}); + t->keepalive_timeout = value == INT_MAX + ? gpr_inf_future(GPR_TIMESPAN) + : gpr_time_from_seconds(value, GPR_TIMESPAN); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_KEEPALIVE_PERMIT_WITHOUT_CALLS)) { + t->keepalive_permit_without_calls = + (uint32_t)grpc_channel_arg_get_integer( + &channel_args->args[i], (grpc_integer_options){0, 0, 1}); } else { static const struct { const char *channel_arg_name; @@ -414,6 +474,16 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->ping_state.pings_before_data_required = t->ping_policy.max_pings_without_data; + /** Start client-side keepalive pings */ + if (t->is_client) { + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; + GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); + grpc_timer_init( + exec_ctx, &t->keepalive_ping_timer, + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time), + &t->init_keepalive_ping_locked, gpr_now(GPR_CLOCK_MONOTONIC)); + } + grpc_chttp2_initiate_write(exec_ctx, t, false, "init"); post_benign_reclaimer(exec_ctx, t); } @@ -441,6 +511,10 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error) { if (!t->closed) { + if (!grpc_error_has_clear_grpc_status(error)) { + error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_UNAVAILABLE); + } if (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE) { if (t->close_transport_on_writes_finished == NULL) { t->close_transport_on_writes_finished = @@ -450,14 +524,26 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, grpc_error_add_child(t->close_transport_on_writes_finished, error); return; } - if (!grpc_error_has_clear_grpc_status(error)) { - error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_UNAVAILABLE); - } t->closed = 1; connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "close_transport"); grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error)); + if (t->is_client) { + switch (t->keepalive_state) { + case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING: { + grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer); + break; + } + case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING: { + grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer); + grpc_timer_cancel(exec_ctx, &t->keepalive_watchdog_timer); + break; + } + case GRPC_CHTTP2_KEEPALIVE_STATE_DYING: { + break; + } + } + } /* flush writable stream list to avoid dangling references */ grpc_chttp2_stream *s; @@ -489,13 +575,11 @@ void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s) { static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_stream_refcount *refcount, - const void *server_data) { + const void *server_data, gpr_arena *arena) { GPR_TIMER_BEGIN("init_stream", 0); grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; - memset(s, 0, sizeof(*s)); - s->t = t; s->refcount = refcount; /* We reserve one 'active stream' that's dropped when the stream is @@ -504,8 +588,8 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, gpr_ref_init(&s->active_streams, 1); GRPC_CHTTP2_STREAM_REF(s, "chttp2"); - grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0]); - grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1]); + grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena); + grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1], arena); grpc_chttp2_data_parser_init(&s->data_parser); grpc_slice_buffer_init(&s->flow_controlled_buffer); s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); @@ -581,16 +665,17 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, GPR_TIMER_END("destroy_stream", 0); - gpr_free(s->destroy_stream_arg); + grpc_closure_sched(exec_ctx, s->destroy_stream_arg, GRPC_ERROR_NONE); } static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, void *and_free_memory) { + grpc_stream *gs, + grpc_closure *then_schedule_closure) { GPR_TIMER_BEGIN("destroy_stream", 0); grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; - s->destroy_stream_arg = and_free_memory; + s->destroy_stream_arg = then_schedule_closure; grpc_closure_sched( exec_ctx, grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s, grpc_combiner_scheduler(t->combiner, false)), @@ -1545,15 +1630,19 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, s->recv_trailing_metadata_finished != NULL) { char status_string[GPR_LTOA_MIN_BUFSIZE]; gpr_ltoa(status, status_string); - grpc_chttp2_incoming_metadata_buffer_replace_or_add( - exec_ctx, &s->metadata_buffer[1], - grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_STATUS, - grpc_slice_from_copied_string(status_string))); + GRPC_LOG_IF_ERROR("add_status", + grpc_chttp2_incoming_metadata_buffer_replace_or_add( + exec_ctx, &s->metadata_buffer[1], + grpc_mdelem_from_slices( + exec_ctx, GRPC_MDSTR_GRPC_STATUS, + grpc_slice_from_copied_string(status_string)))); if (msg != NULL) { - grpc_chttp2_incoming_metadata_buffer_replace_or_add( - exec_ctx, &s->metadata_buffer[1], - grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_MESSAGE, - grpc_slice_from_copied_string(msg))); + GRPC_LOG_IF_ERROR( + "add_status_message", + grpc_chttp2_incoming_metadata_buffer_replace_or_add( + exec_ctx, &s->metadata_buffer[1], + grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_MESSAGE, + grpc_slice_from_copied_string(msg)))); } s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE; grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); @@ -1987,6 +2076,77 @@ static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping"); } +static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_transport *t = arg; + GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING); + if (error == GRPC_ERROR_NONE && !(t->destroying || t->closed)) { + if (t->keepalive_permit_without_calls || t->stream_map.count > 0) { + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING; + GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end"); + send_ping_locked(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE, + &t->start_keepalive_ping_locked, + &t->finish_keepalive_ping_locked); + } else { + GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); + grpc_timer_init( + exec_ctx, &t->keepalive_ping_timer, + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time), + &t->init_keepalive_ping_locked, gpr_now(GPR_CLOCK_MONOTONIC)); + } + } + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "init keepalive ping"); +} + +static void start_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_transport *t = arg; + GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog"); + grpc_timer_init( + exec_ctx, &t->keepalive_watchdog_timer, + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_timeout), + &t->keepalive_watchdog_fired_locked, gpr_now(GPR_CLOCK_MONOTONIC)); +} + +static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_transport *t = arg; + if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { + if (error == GRPC_ERROR_NONE) { + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; + grpc_timer_cancel(exec_ctx, &t->keepalive_watchdog_timer); + GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); + grpc_timer_init( + exec_ctx, &t->keepalive_ping_timer, + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time), + grpc_closure_create(init_keepalive_ping_locked, t, + grpc_combiner_scheduler(t->combiner, false)), + gpr_now(GPR_CLOCK_MONOTONIC)); + } + } + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keepalive ping end"); +} + +static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_transport *t = arg; + if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { + if (error == GRPC_ERROR_NONE) { + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; + close_transport_locked(exec_ctx, t, + GRPC_ERROR_CREATE("keepalive watchdog timeout")); + } + } else { + /** The watchdog timer should have been cancelled by + finish_keepalive_ping_locked. */ + if (error != GRPC_ERROR_CANCELLED) { + gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)", + t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING); + } + } + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keepalive watchdog"); +} + /******************************************************************************* * CALLBACK LOOP */ @@ -2428,7 +2588,7 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), grpc_transport *grpc_create_chttp2_transport( grpc_exec_ctx *exec_ctx, const grpc_channel_args *channel_args, grpc_endpoint *ep, int is_client) { - grpc_chttp2_transport *t = gpr_malloc(sizeof(grpc_chttp2_transport)); + grpc_chttp2_transport *t = gpr_zalloc(sizeof(grpc_chttp2_transport)); init_transport(exec_ctx, t, channel_args, ep, is_client != 0); return &t->base; } diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.c b/src/core/ext/transport/chttp2/transport/frame_ping.c index f487533c41..9b4b1a7b84 100644 --- a/src/core/ext/transport/chttp2/transport/frame_ping.c +++ b/src/core/ext/transport/chttp2/transport/frame_ping.c @@ -91,7 +91,7 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, grpc_chttp2_ping_parser *p = parser; while (p->byte != 8 && cur != end) { - p->opaque_8bytes |= (((uint64_t)*cur) << (8 * p->byte)); + p->opaque_8bytes |= (((uint64_t)*cur) << (56 - 8 * p->byte)); cur++; p->byte++; } diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.c b/src/core/ext/transport/chttp2/transport/incoming_metadata.c index c91b019aa0..da0a34d32a 100644 --- a/src/core/ext/transport/chttp2/transport/incoming_metadata.c +++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.c @@ -41,69 +41,48 @@ #include <grpc/support/log.h> void grpc_chttp2_incoming_metadata_buffer_init( - grpc_chttp2_incoming_metadata_buffer *buffer) { - buffer->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + grpc_chttp2_incoming_metadata_buffer *buffer, gpr_arena *arena) { + buffer->arena = arena; + grpc_metadata_batch_init(&buffer->batch); + buffer->batch.deadline = gpr_inf_future(GPR_CLOCK_REALTIME); } void grpc_chttp2_incoming_metadata_buffer_destroy( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer) { - size_t i; - if (!buffer->published) { - for (i = 0; i < buffer->count; i++) { - GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md); - } - } - gpr_free(buffer->elems); + grpc_metadata_batch_destroy(exec_ctx, &buffer->batch); } -void grpc_chttp2_incoming_metadata_buffer_add( - grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem elem) { - GPR_ASSERT(!buffer->published); - if (buffer->capacity == buffer->count) { - buffer->capacity = GPR_MAX(8, 2 * buffer->capacity); - buffer->elems = - gpr_realloc(buffer->elems, sizeof(*buffer->elems) * buffer->capacity); - } - buffer->elems[buffer->count++].md = elem; +grpc_error *grpc_chttp2_incoming_metadata_buffer_add( + grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, + grpc_mdelem elem) { buffer->size += GRPC_MDELEM_LENGTH(elem); + return grpc_metadata_batch_add_tail( + exec_ctx, &buffer->batch, + gpr_arena_alloc(buffer->arena, sizeof(grpc_linked_mdelem)), elem); } -void grpc_chttp2_incoming_metadata_buffer_replace_or_add( +grpc_error *grpc_chttp2_incoming_metadata_buffer_replace_or_add( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem elem) { - for (size_t i = 0; i < buffer->count; i++) { - if (grpc_slice_eq(GRPC_MDKEY(buffer->elems[i].md), GRPC_MDKEY(elem))) { - GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md); - buffer->elems[i].md = elem; - return; + for (grpc_linked_mdelem *l = buffer->batch.list.head; l != NULL; + l = l->next) { + if (grpc_slice_eq(GRPC_MDKEY(l->md), GRPC_MDKEY(elem))) { + GRPC_MDELEM_UNREF(exec_ctx, l->md); + l->md = elem; + return GRPC_ERROR_NONE; } } - grpc_chttp2_incoming_metadata_buffer_add(buffer, elem); + return grpc_chttp2_incoming_metadata_buffer_add(exec_ctx, buffer, elem); } void grpc_chttp2_incoming_metadata_buffer_set_deadline( grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline) { - GPR_ASSERT(!buffer->published); - buffer->deadline = deadline; + buffer->batch.deadline = deadline; } void grpc_chttp2_incoming_metadata_buffer_publish( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, grpc_metadata_batch *batch) { - GPR_ASSERT(!buffer->published); - buffer->published = 1; - if (buffer->count > 0) { - size_t i; - for (i = 0; i < buffer->count; i++) { - /* TODO(ctiller): do something better here */ - if (!GRPC_LOG_IF_ERROR("grpc_chttp2_incoming_metadata_buffer_publish", - grpc_metadata_batch_link_tail( - exec_ctx, batch, &buffer->elems[i]))) { - GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md); - } - } - } else { - batch->list.head = batch->list.tail = NULL; - } - batch->deadline = buffer->deadline; + *batch = buffer->batch; + grpc_metadata_batch_init(&buffer->batch); } diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.h b/src/core/ext/transport/chttp2/transport/incoming_metadata.h index 1eac6fc150..288c917e65 100644 --- a/src/core/ext/transport/chttp2/transport/incoming_metadata.h +++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.h @@ -37,28 +37,26 @@ #include "src/core/lib/transport/transport.h" typedef struct { - grpc_linked_mdelem *elems; - size_t count; - size_t capacity; - gpr_timespec deadline; - int published; + gpr_arena *arena; + grpc_metadata_batch batch; size_t size; // total size of metadata } grpc_chttp2_incoming_metadata_buffer; /** assumes everything initially zeroed */ void grpc_chttp2_incoming_metadata_buffer_init( - grpc_chttp2_incoming_metadata_buffer *buffer); + grpc_chttp2_incoming_metadata_buffer *buffer, gpr_arena *arena); void grpc_chttp2_incoming_metadata_buffer_destroy( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer); void grpc_chttp2_incoming_metadata_buffer_publish( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, grpc_metadata_batch *batch); -void grpc_chttp2_incoming_metadata_buffer_add( - grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem elem); -void grpc_chttp2_incoming_metadata_buffer_replace_or_add( +grpc_error *grpc_chttp2_incoming_metadata_buffer_add( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, - grpc_mdelem elem); + grpc_mdelem elem) GRPC_MUST_USE_RESULT; +grpc_error *grpc_chttp2_incoming_metadata_buffer_replace_or_add( + grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, + grpc_mdelem elem) GRPC_MUST_USE_RESULT; void grpc_chttp2_incoming_metadata_buffer_set_deadline( grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline); diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 5d41f4bfda..3c56c21599 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -50,6 +50,7 @@ #include "src/core/ext/transport/chttp2/transport/stream_map.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/endpoint.h" +#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/transport/bdp_estimator.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/pid_controller.h" @@ -208,6 +209,12 @@ struct grpc_chttp2_incoming_byte_stream { grpc_closure finished_action; }; +typedef enum { + GRPC_CHTTP2_KEEPALIVE_STATE_WAITING, + GRPC_CHTTP2_KEEPALIVE_STATE_PINGING, + GRPC_CHTTP2_KEEPALIVE_STATE_DYING, +} grpc_chttp2_keepalive_state; + struct grpc_chttp2_transport { grpc_transport base; /* must be first */ gpr_refcount refs; @@ -382,6 +389,28 @@ struct grpc_chttp2_transport { grpc_closure benign_reclaimer_locked; /** destructive cleanup closure */ grpc_closure destructive_reclaimer_locked; + + /* keep-alive ping support */ + /** Closure to initialize a keepalive ping */ + grpc_closure init_keepalive_ping_locked; + /** Closure to run when the keepalive ping is sent */ + grpc_closure start_keepalive_ping_locked; + /** Cousure to run when the keepalive ping ack is received */ + grpc_closure finish_keepalive_ping_locked; + /** Closrue to run when the keepalive ping timeouts */ + grpc_closure keepalive_watchdog_fired_locked; + /** timer to initiate ping events */ + grpc_timer keepalive_ping_timer; + /** watchdog to kill the transport when waiting for the keepalive ping */ + grpc_timer keepalive_watchdog_timer; + /** time duration in between pings */ + gpr_timespec keepalive_time; + /** grace period for a ping to complete before watchdog kicks in */ + gpr_timespec keepalive_timeout; + /** if keepalive pings are allowed when there's no outstanding streams */ + bool keepalive_permit_without_calls; + /** keep-alive state machine state */ + grpc_chttp2_keepalive_state keepalive_state; }; typedef enum { @@ -396,7 +425,7 @@ struct grpc_chttp2_stream { grpc_stream_refcount *refcount; grpc_closure destroy_stream; - void *destroy_stream_arg; + grpc_closure *destroy_stream_arg; grpc_chttp2_stream_link links[STREAM_LIST_COUNT]; uint8_t included[STREAM_LIST_COUNT]; diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index 7ed00522c3..7efc8c63c9 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -381,16 +381,38 @@ static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx, s->incoming_window_delta + t->settings[GRPC_ACKED_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]) { - char *msg; - gpr_asprintf(&msg, - "frame of size %d overflows incoming window of %" PRId64, - t->incoming_frame_size, - s->incoming_window_delta + - t->settings[GRPC_ACKED_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); - grpc_error *err = GRPC_ERROR_CREATE(msg); - gpr_free(msg); - return err; + if (incoming_frame_size <= + s->incoming_window_delta + + t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]) { + gpr_log( + GPR_ERROR, + "Incoming frame of size %d exceeds incoming window size of %" PRId64 + ".\n" + "The (un-acked, future) window size would be %" PRId64 + " which is not exceeded.\n" + "This would usually cause a disconnection, but allowing it due to " + "broken HTTP2 implementations in the wild.\n" + "See (for example) https://github.com/netty/netty/issues/6520.", + t->incoming_frame_size, + s->incoming_window_delta + + t->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], + s->incoming_window_delta + + t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); + } else { + char *msg; + gpr_asprintf(&msg, + "frame of size %d overflows incoming window of %" PRId64, + t->incoming_frame_size, + s->incoming_window_delta + + t->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); + grpc_error *err = GRPC_ERROR_CREATE(msg); + gpr_free(msg); + return err; + } } GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA("parse", t, s, @@ -526,7 +548,14 @@ static void on_initial_header(grpc_exec_ctx *exec_ctx, void *tp, s->seen_error = true; GRPC_MDELEM_UNREF(exec_ctx, md); } else { - grpc_chttp2_incoming_metadata_buffer_add(&s->metadata_buffer[0], md); + grpc_error *error = grpc_chttp2_incoming_metadata_buffer_add( + exec_ctx, &s->metadata_buffer[0], md); + if (error != GRPC_ERROR_NONE) { + grpc_chttp2_cancel_stream(exec_ctx, t, s, error); + grpc_chttp2_parsing_become_skip_parser(exec_ctx, t); + s->seen_error = true; + GRPC_MDELEM_UNREF(exec_ctx, md); + } } } @@ -576,7 +605,14 @@ static void on_trailing_header(grpc_exec_ctx *exec_ctx, void *tp, s->seen_error = true; GRPC_MDELEM_UNREF(exec_ctx, md); } else { - grpc_chttp2_incoming_metadata_buffer_add(&s->metadata_buffer[1], md); + grpc_error *error = grpc_chttp2_incoming_metadata_buffer_add( + exec_ctx, &s->metadata_buffer[1], md); + if (error != GRPC_ERROR_NONE) { + grpc_chttp2_cancel_stream(exec_ctx, t, s, error); + grpc_chttp2_parsing_become_skip_parser(exec_ctx, t); + s->seen_error = true; + GRPC_MDELEM_UNREF(exec_ctx, md); + } } GPR_TIMER_END("on_trailing_header", 0); diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 01a03533da..450d9ab23a 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -54,6 +54,7 @@ #include "third_party/objective_c/Cronet/bidirectional_stream_c.h" #define GRPC_HEADER_SIZE_IN_BYTES 5 +#define GRPC_FLUSH_READ_SIZE 4096 #define CRONET_LOG(...) \ do { \ @@ -151,11 +152,17 @@ struct write_state { struct op_state { bool state_op_done[OP_NUM_OPS]; bool state_callback_received[OP_NUM_OPS]; + /* A non-zero gRPC status code has been seen */ bool fail_state; + /* Transport is discarding all buffered messages */ bool flush_read; bool flush_cronet_when_ready; bool pending_write_for_trailer; - bool unprocessed_send_message; + bool pending_send_message; + /* User requested RECV_TRAILING_METADATA */ + bool pending_recv_trailing_metadata; + /* Cronet has not issued a callback of a bidirectional read */ + bool pending_read_from_cronet; grpc_error *cancel_error; /* data structure for storing data coming from server */ struct read_state rs; @@ -177,6 +184,7 @@ struct op_storage { }; struct stream_obj { + gpr_arena *arena; struct op_and_state *oas; grpc_transport_stream_op *curr_op; grpc_cronet_transport *curr_ct; @@ -248,11 +256,35 @@ static const char *op_id_string(enum e_op_id i) { return "UNKNOWN"; } -static void free_read_buffer(stream_obj *s) { +static void null_and_maybe_free_read_buffer(stream_obj *s) { if (s->state.rs.read_buffer && s->state.rs.read_buffer != s->state.rs.grpc_header_bytes) { gpr_free(s->state.rs.read_buffer); - s->state.rs.read_buffer = NULL; + } + s->state.rs.read_buffer = NULL; +} + +static void maybe_flush_read(stream_obj *s) { + /* To enter flush read state (discarding all the buffered messages in + * transport layer), two conditions must be satisfied: 1) non-zero grpc status + * has been received, and 2) an op requesting the status code + * (RECV_TRAILING_METADATA) is issued by the user. (See + * doc/status_ordering.md) */ + /* Whenever the evaluation of any of the two condition is changed, we check + * whether we should enter the flush read state. */ + if (s->state.pending_recv_trailing_metadata && s->state.fail_state) { + if (!s->state.flush_read && !s->state.rs.read_stream_closed) { + CRONET_LOG(GPR_DEBUG, "%p: Flush read", s); + s->state.flush_read = true; + null_and_maybe_free_read_buffer(s); + s->state.rs.read_buffer = gpr_malloc(GRPC_FLUSH_READ_SIZE); + if (!s->state.pending_read_from_cronet) { + CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); + bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, + GRPC_FLUSH_READ_SIZE); + s->state.pending_read_from_cronet = true; + } + } } } @@ -279,7 +311,11 @@ static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op *op) { storage->head = new_op; storage->num_pending_ops++; if (op->send_message) { - s->state.unprocessed_send_message = true; + s->state.pending_send_message = true; + } + if (op->recv_trailing_metadata) { + s->state.pending_recv_trailing_metadata = true; + maybe_flush_read(s); } CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op, storage->num_pending_ops); @@ -367,7 +403,7 @@ static void on_failed(bidirectional_stream *stream, int net_error) { gpr_free(s->state.ws.write_buffer); s->state.ws.write_buffer = NULL; } - free_read_buffer(s); + null_and_maybe_free_read_buffer(s); gpr_mu_unlock(&s->mu); execute_from_storage(s); } @@ -390,7 +426,7 @@ static void on_canceled(bidirectional_stream *stream) { gpr_free(s->state.ws.write_buffer); s->state.ws.write_buffer = NULL; } - free_read_buffer(s); + null_and_maybe_free_read_buffer(s); gpr_mu_unlock(&s->mu); execute_from_storage(s); } @@ -405,7 +441,7 @@ static void on_succeeded(bidirectional_stream *stream) { bidirectional_stream_destroy(s->cbs); s->state.state_callback_received[OP_SUCCEEDED] = true; s->cbs = NULL; - free_read_buffer(s); + null_and_maybe_free_read_buffer(s); gpr_mu_unlock(&s->mu); execute_from_storage(s); } @@ -451,15 +487,18 @@ static void on_response_headers_received( gpr_mu_lock(&s->mu); memset(&s->state.rs.initial_metadata, 0, sizeof(s->state.rs.initial_metadata)); - grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata); + grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata, + s->arena); for (size_t i = 0; i < headers->count; i++) { - grpc_chttp2_incoming_metadata_buffer_add( - &s->state.rs.initial_metadata, - grpc_mdelem_from_slices( - &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string( - headers->headers[i].key)), - grpc_slice_intern( - grpc_slice_from_static_string(headers->headers[i].value)))); + GRPC_LOG_IF_ERROR( + "on_response_headers_received", + grpc_chttp2_incoming_metadata_buffer_add( + &exec_ctx, &s->state.rs.initial_metadata, + grpc_mdelem_from_slices( + &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string( + headers->headers[i].key)), + grpc_slice_intern(grpc_slice_from_static_string( + headers->headers[i].value))))); } s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true; if (!(s->state.state_op_done[OP_CANCEL_ERROR] || @@ -473,6 +512,7 @@ static void on_response_headers_received( CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, s->state.rs.remaining_bytes); + s->state.pending_read_from_cronet = true; } gpr_mu_unlock(&s->mu); grpc_exec_ctx_finish(&exec_ctx); @@ -504,10 +544,13 @@ static void on_read_completed(bidirectional_stream *stream, char *data, CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data, count); gpr_mu_lock(&s->mu); + s->state.pending_read_from_cronet = false; s->state.state_callback_received[OP_RECV_MESSAGE] = true; if (count > 0 && s->state.flush_read) { CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); - bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, 4096); + bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, + GRPC_FLUSH_READ_SIZE); + s->state.pending_read_from_cronet = true; gpr_mu_unlock(&s->mu); } else if (count > 0) { s->state.rs.received_bytes += count; @@ -518,16 +561,14 @@ static void on_read_completed(bidirectional_stream *stream, char *data, bidirectional_stream_read( s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes, s->state.rs.remaining_bytes); + s->state.pending_read_from_cronet = true; gpr_mu_unlock(&s->mu); } else { gpr_mu_unlock(&s->mu); execute_from_storage(s); } } else { - if (s->state.flush_read) { - gpr_free(s->state.rs.read_buffer); - s->state.rs.read_buffer = NULL; - } + null_and_maybe_free_read_buffer(s); s->state.rs.read_stream_closed = true; gpr_mu_unlock(&s->mu); execute_from_storage(s); @@ -549,21 +590,25 @@ static void on_response_trailers_received( memset(&s->state.rs.trailing_metadata, 0, sizeof(s->state.rs.trailing_metadata)); s->state.rs.trailing_metadata_valid = false; - grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.trailing_metadata); + grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.trailing_metadata, + s->arena); for (size_t i = 0; i < trailers->count; i++) { CRONET_LOG(GPR_DEBUG, "trailer key=%s, value=%s", trailers->headers[i].key, trailers->headers[i].value); - grpc_chttp2_incoming_metadata_buffer_add( - &s->state.rs.trailing_metadata, - grpc_mdelem_from_slices( - &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string( - trailers->headers[i].key)), - grpc_slice_intern( - grpc_slice_from_static_string(trailers->headers[i].value)))); + GRPC_LOG_IF_ERROR( + "on_response_trailers_received", + grpc_chttp2_incoming_metadata_buffer_add( + &exec_ctx, &s->state.rs.trailing_metadata, + grpc_mdelem_from_slices( + &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string( + trailers->headers[i].key)), + grpc_slice_intern(grpc_slice_from_static_string( + trailers->headers[i].value))))); s->state.rs.trailing_metadata_valid = true; if (0 == strcmp(trailers->headers[i].key, "grpc-status") && 0 != strcmp(trailers->headers[i].value, "0")) { s->state.fail_state = true; + maybe_flush_read(s); } } s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true; @@ -778,7 +823,7 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op, else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) result = false; /* we haven't sent message yet */ - else if (stream_state->unprocessed_send_message && + else if (stream_state->pending_send_message && !stream_state->state_op_done[OP_SEND_MESSAGE]) result = false; /* we haven't got on_write_completed for the send yet */ @@ -900,7 +945,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, } else if (stream_op->send_message && op_can_be_run(stream_op, s, &oas->state, OP_SEND_MESSAGE)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas); - stream_state->unprocessed_send_message = false; + stream_state->pending_send_message = false; if (stream_state->state_callback_received[OP_FAILED]) { result = NO_ACTION_POSSIBLE; CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed."); @@ -1009,6 +1054,13 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; + } else if (stream_state->flush_read) { + CRONET_LOG(GPR_DEBUG, "flush read"); + grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, + GRPC_ERROR_NONE); + stream_state->state_op_done[OP_RECV_MESSAGE] = true; + oas->state.state_op_done[OP_RECV_MESSAGE] = true; + result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->rs.length_field_received == false) { if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES && stream_state->rs.remaining_bytes == 0) { @@ -1029,6 +1081,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, true; /* Indicates that at least one read request has been made */ bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, stream_state->rs.remaining_bytes); + stream_state->pending_read_from_cronet = true; result = ACTION_TAKEN_WITH_CALLBACK; } else { stream_state->rs.remaining_bytes = 0; @@ -1047,11 +1100,13 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes; stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; stream_state->rs.received_bytes = 0; + stream_state->rs.length_field_received = false; CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); stream_state->state_op_done[OP_READ_REQ_MADE] = true; /* Indicates that at least one read request has been made */ bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, stream_state->rs.remaining_bytes); + stream_state->pending_read_from_cronet = true; result = ACTION_TAKEN_NO_CALLBACK; } } else if (stream_state->rs.remaining_bytes == 0) { @@ -1064,6 +1119,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, true; /* Indicates that at least one read request has been made */ bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, stream_state->rs.remaining_bytes); + stream_state->pending_read_from_cronet = true; result = ACTION_TAKEN_WITH_CALLBACK; } else { result = NO_ACTION_POSSIBLE; @@ -1075,7 +1131,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, uint8_t *dst_p = GRPC_SLICE_START_PTR(read_data_slice); memcpy(dst_p, stream_state->rs.read_buffer, (size_t)stream_state->rs.length_field); - free_read_buffer(s); + null_and_maybe_free_read_buffer(s); grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer); grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer, read_data_slice); @@ -1096,6 +1152,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, stream_state->rs.remaining_bytes); + stream_state->pending_read_from_cronet = true; result = ACTION_TAKEN_NO_CALLBACK; } } else if (stream_op->recv_trailing_metadata && @@ -1153,15 +1210,6 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, make a note */ if (stream_op->recv_message) stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true; - } else if (stream_state->fail_state && !stream_state->flush_read) { - CRONET_LOG(GPR_DEBUG, "running: %p flush read", oas); - if (stream_state->rs.read_buffer && - stream_state->rs.read_buffer != stream_state->rs.grpc_header_bytes) { - gpr_free(stream_state->rs.read_buffer); - stream_state->rs.read_buffer = NULL; - } - stream_state->rs.read_buffer = gpr_malloc(4096); - stream_state->flush_read = true; } else { result = NO_ACTION_POSSIBLE; } @@ -1174,7 +1222,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_stream_refcount *refcount, - const void *server_data) { + const void *server_data, gpr_arena *arena) { stream_obj *s = (stream_obj *)gs; memset(&s->storage, 0, sizeof(s->storage)); s->storage.head = NULL; @@ -1190,10 +1238,13 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->state.fail_state = s->state.flush_read = false; s->state.cancel_error = NULL; s->state.flush_cronet_when_ready = s->state.pending_write_for_trailer = false; - s->state.unprocessed_send_message = false; + s->state.pending_send_message = false; + s->state.pending_recv_trailing_metadata = false; + s->state.pending_read_from_cronet = false; s->curr_gs = gs; s->curr_ct = (grpc_cronet_transport *)gt; + s->arena = arena; gpr_mu_init(&s->mu); return 0; @@ -1209,38 +1260,33 @@ static void set_pollset_set_do_nothing(grpc_exec_ctx *exec_ctx, static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_transport_stream_op *op) { CRONET_LOG(GPR_DEBUG, "perform_stream_op"); - stream_obj *s = (stream_obj *)gs; - add_to_storage(s, op); if (op->send_initial_metadata && header_has_authority(op->send_initial_metadata->list.head)) { /* Cronet does not support :authority header field. We cancel the call when - this field is present in metadata */ - bidirectional_stream_header_array header_array; - bidirectional_stream_header *header; - bidirectional_stream cbs; - CRONET_LOG(GPR_DEBUG, - ":authority header is provided but not supported;" - " cancel operations"); - /* Notify application that operation is cancelled by forging trailers */ - header_array.count = 1; - header_array.capacity = 1; - header_array.headers = gpr_malloc(sizeof(bidirectional_stream_header)); - header = (bidirectional_stream_header *)header_array.headers; - header->key = "grpc-status"; - header->value = "1"; /* Return status GRPC_STATUS_CANCELLED */ - cbs.annotation = (void *)s; - s->state.state_op_done[OP_CANCEL_ERROR] = true; - on_response_trailers_received(&cbs, &header_array); - gpr_free(header_array.headers); - } else { - execute_from_storage(s); + this field is present in metadata */ + if (op->recv_initial_metadata_ready) { + grpc_closure_sched(exec_ctx, op->recv_initial_metadata_ready, + GRPC_ERROR_CANCELLED); + } + if (op->recv_message_ready) { + grpc_closure_sched(exec_ctx, op->recv_message_ready, + GRPC_ERROR_CANCELLED); + } + grpc_closure_sched(exec_ctx, op->on_complete, GRPC_ERROR_CANCELLED); + return; } + stream_obj *s = (stream_obj *)gs; + add_to_storage(s, op); + execute_from_storage(s); } static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, void *and_free_memory) { + grpc_stream *gs, + grpc_closure *then_schedule_closure) { stream_obj *s = (stream_obj *)gs; + null_and_maybe_free_read_buffer(s); GRPC_ERROR_UNREF(s->state.cancel_error); + grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); } static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {} diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c index 3fb2a60ac7..6d53b0576e 100644 --- a/src/core/lib/channel/channel_stack.c +++ b/src/core/lib/channel/channel_stack.c @@ -166,41 +166,32 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx, } } -grpc_error *grpc_call_stack_init( - grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack, - int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg, - grpc_call_context_element *context, const void *transport_server_data, - grpc_slice path, gpr_timespec start_time, gpr_timespec deadline, - grpc_call_stack *call_stack) { +grpc_error *grpc_call_stack_init(grpc_exec_ctx *exec_ctx, + grpc_channel_stack *channel_stack, + int initial_refs, grpc_iomgr_cb_func destroy, + void *destroy_arg, + const grpc_call_element_args *elem_args) { grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack); size_t count = channel_stack->count; grpc_call_element *call_elems; char *user_data; size_t i; - call_stack->count = count; - GRPC_STREAM_REF_INIT(&call_stack->refcount, initial_refs, destroy, + elem_args->call_stack->count = count; + GRPC_STREAM_REF_INIT(&elem_args->call_stack->refcount, initial_refs, destroy, destroy_arg, "CALL_STACK"); - call_elems = CALL_ELEMS_FROM_STACK(call_stack); + call_elems = CALL_ELEMS_FROM_STACK(elem_args->call_stack); user_data = ((char *)call_elems) + ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element)); /* init per-filter data */ grpc_error *first_error = GRPC_ERROR_NONE; - const grpc_call_element_args args = { - .start_time = start_time, - .call_stack = call_stack, - .server_transport_data = transport_server_data, - .context = context, - .path = path, - .deadline = deadline, - }; for (i = 0; i < count; i++) { call_elems[i].filter = channel_elems[i].filter; call_elems[i].channel_data = channel_elems[i].channel_data; call_elems[i].call_data = user_data; - grpc_error *error = - call_elems[i].filter->init_call_elem(exec_ctx, &call_elems[i], &args); + grpc_error *error = call_elems[i].filter->init_call_elem( + exec_ctx, &call_elems[i], elem_args); if (error != GRPC_ERROR_NONE) { if (first_error == GRPC_ERROR_NONE) { first_error = error; @@ -241,15 +232,16 @@ void grpc_call_stack_ignore_set_pollset_or_pollset_set( void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack, const grpc_call_final_info *final_info, - void *and_free_memory) { + grpc_closure *then_schedule_closure) { grpc_call_element *elems = CALL_ELEMS_FROM_STACK(stack); size_t count = stack->count; size_t i; /* destroy per-filter data */ for (i = 0; i < count; i++) { - elems[i].filter->destroy_call_elem(exec_ctx, &elems[i], final_info, - i == count - 1 ? and_free_memory : NULL); + elems[i].filter->destroy_call_elem( + exec_ctx, &elems[i], final_info, + i == count - 1 ? then_schedule_closure : NULL); } } diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index 1e943dc2e5..80e3603e8d 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -56,6 +56,7 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/polling_entity.h" +#include "src/core/lib/support/arena.h" #include "src/core/lib/transport/transport.h" #ifdef __cplusplus @@ -84,6 +85,7 @@ typedef struct { grpc_slice path; gpr_timespec start_time; gpr_timespec deadline; + gpr_arena *arena; } grpc_call_element_args; typedef struct { @@ -128,7 +130,8 @@ typedef struct { server_transport_data is an opaque pointer. If it is NULL, this call is on a client; if it is non-NULL, then it points to memory owned by the transport and is on the server. Most filters want to ignore this - argument. */ + argument. + Implementations may assume that elem->call_data is all zeros. */ grpc_error *(*init_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_element_args *args); @@ -138,12 +141,12 @@ typedef struct { /* Destroy per call data. The filter does not need to do any chaining. The bottom filter of a stack will be passed a non-NULL pointer to - \a and_free_memory that should be passed to gpr_free when destruction - is complete. \a final_info contains data about the completed call, mainly - for reporting purposes. */ + \a then_schedule_closure that should be passed to grpc_closure_sched when + destruction is complete. \a final_info contains data about the completed + call, mainly for reporting purposes. */ void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *and_free_memory); + grpc_closure *then_schedule_closure); /* sizeof(per channel data) */ size_t sizeof_channel_data; @@ -152,7 +155,8 @@ typedef struct { is what needs initializing. is_first, is_last designate this elements position in the stack, and are useful for asserting correct configuration by upper layer code. - The filter does not need to do any chaining */ + The filter does not need to do any chaining. + Implementations may assume that elem->call_data is all zeros. */ grpc_error *(*init_channel_elem)(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_channel_element_args *args); @@ -234,12 +238,11 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx, /* Initialize a call stack given a channel stack. transport_server_data is expected to be NULL on a client, or an opaque transport owned pointer on the server. */ -grpc_error *grpc_call_stack_init( - grpc_exec_ctx *exec_ctx, grpc_channel_stack *channel_stack, - int initial_refs, grpc_iomgr_cb_func destroy, void *destroy_arg, - grpc_call_context_element *context, const void *transport_server_data, - grpc_slice path, gpr_timespec start_time, gpr_timespec deadline, - grpc_call_stack *call_stack); +grpc_error *grpc_call_stack_init(grpc_exec_ctx *exec_ctx, + grpc_channel_stack *channel_stack, + int initial_refs, grpc_iomgr_cb_func destroy, + void *destroy_arg, + const grpc_call_element_args *elem_args); /* Set a pollset or a pollset_set for a call stack: must occur before the first * op is started */ void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, @@ -269,7 +272,7 @@ void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, /* Destroy a call stack */ void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack, const grpc_call_final_info *final_info, - void *and_free_memory); + grpc_closure *then_schedule_closure); /* Ignore set pollset{_set} - used by filters if they don't care about pollsets * at all. Does nothing. */ diff --git a/src/core/lib/channel/channel_stack_builder.c b/src/core/lib/channel/channel_stack_builder.c index 5f9e3b4539..b515b7321a 100644 --- a/src/core/lib/channel/channel_stack_builder.c +++ b/src/core/lib/channel/channel_stack_builder.c @@ -65,8 +65,7 @@ struct grpc_channel_stack_builder_iterator { }; grpc_channel_stack_builder *grpc_channel_stack_builder_create(void) { - grpc_channel_stack_builder *b = gpr_malloc(sizeof(*b)); - memset(b, 0, sizeof(*b)); + grpc_channel_stack_builder *b = gpr_zalloc(sizeof(*b)); b->begin.filter = NULL; b->end.filter = NULL; @@ -251,7 +250,7 @@ grpc_error *grpc_channel_stack_builder_finish( size_t channel_stack_size = grpc_channel_stack_size(filters, num_filters); // allocate memory, with prefix_bytes followed by channel_stack_size - *result = gpr_malloc(prefix_bytes + channel_stack_size); + *result = gpr_zalloc(prefix_bytes + channel_stack_size); // fetch a pointer to the channel stack grpc_channel_stack *channel_stack = (grpc_channel_stack *)((char *)(*result) + prefix_bytes); diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c index aa41014a21..02dc479f3a 100644 --- a/src/core/lib/channel/compress_filter.c +++ b/src/core/lib/channel/compress_filter.c @@ -292,7 +292,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, /* Destructor for call_data */ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) { + grpc_closure *ignored) { /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices); diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c index 29796f7ca7..42ef7b7806 100644 --- a/src/core/lib/channel/connected_channel.c +++ b/src/core/lib/channel/connected_channel.c @@ -88,7 +88,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, channel_data *chand = elem->channel_data; int r = grpc_transport_init_stream( exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), - &args->call_stack->refcount, args->server_transport_data); + &args->call_stack->refcount, args->server_transport_data, args->arena); return r == 0 ? GRPC_ERROR_NONE : GRPC_ERROR_CREATE("transport stream initialization failed"); } @@ -105,12 +105,12 @@ static void set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, /* Destructor for call_data */ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *and_free_memory) { + grpc_closure *then_schedule_closure) { call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; grpc_transport_destroy_stream(exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld), - and_free_memory); + then_schedule_closure); } /* Constructor for channel_data */ diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c index f9668be0fa..34114bbebf 100644 --- a/src/core/lib/channel/deadline_filter.c +++ b/src/core/lib/channel/deadline_filter.c @@ -144,7 +144,6 @@ static void inject_on_complete_cb(grpc_deadline_state* deadline_state, void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_call_stack* call_stack) { grpc_deadline_state* deadline_state = elem->call_data; - memset(deadline_state, 0, sizeof(*deadline_state)); deadline_state->call_stack = call_stack; } @@ -249,8 +248,6 @@ typedef struct server_call_data { static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, const grpc_call_element_args* args) { - // Note: size of call data is different between client and server. - memset(elem->call_data, 0, elem->filter->sizeof_call_data); grpc_deadline_state_init(exec_ctx, elem, args->call_stack); grpc_deadline_state_start(exec_ctx, elem, args->deadline); return GRPC_ERROR_NONE; @@ -259,7 +256,7 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, // Destructor for call_data. Used for both client and server filters. static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, const grpc_call_final_info* final_info, - void* and_free_memory) { + grpc_closure* ignored) { grpc_deadline_state_destroy(exec_ctx, elem); } diff --git a/src/core/lib/channel/deadline_filter.h b/src/core/lib/channel/deadline_filter.h index 94717f6bc7..72cd5cb929 100644 --- a/src/core/lib/channel/deadline_filter.h +++ b/src/core/lib/channel/deadline_filter.h @@ -62,6 +62,7 @@ typedef struct grpc_deadline_state { // elem->call_data is a grpc_deadline_state. // +// assumes elem->call_data is zero'd void grpc_deadline_state_init(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_call_stack* call_stack); void grpc_deadline_state_destroy(grpc_exec_ctx* exec_ctx, diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c index 82c361c7ef..1b4240bb10 100644 --- a/src/core/lib/channel/handshaker.c +++ b/src/core/lib/channel/handshaker.c @@ -99,8 +99,7 @@ struct grpc_handshake_manager { }; grpc_handshake_manager* grpc_handshake_manager_create() { - grpc_handshake_manager* mgr = gpr_malloc(sizeof(grpc_handshake_manager)); - memset(mgr, 0, sizeof(*mgr)); + grpc_handshake_manager* mgr = gpr_zalloc(sizeof(grpc_handshake_manager)); gpr_mu_init(&mgr->mu); gpr_ref_init(&mgr->refs, 1); return mgr; diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c index c031533dd8..f9d0d689ac 100644 --- a/src/core/lib/channel/http_client_filter.c +++ b/src/core/lib/channel/http_client_filter.c @@ -412,7 +412,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, /* Destructor for call_data */ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) { + grpc_closure *ignored) { call_data *calld = elem->call_data; grpc_slice_buffer_destroy_internal(exec_ctx, &calld->slices); } diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c index ce519f9c92..bebd3af335 100644 --- a/src/core/lib/channel/http_server_filter.c +++ b/src/core/lib/channel/http_server_filter.c @@ -345,7 +345,6 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, /* grab pointers to our data from the call element */ call_data *calld = elem->call_data; /* initialize members */ - memset(calld, 0, sizeof(*calld)); grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem, grpc_schedule_on_exec_ctx); grpc_closure_init(&calld->hs_on_complete, hs_on_complete, elem, @@ -359,7 +358,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, /* Destructor for call_data */ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) { + grpc_closure *ignored) { call_data *calld = elem->call_data; grpc_slice_buffer_destroy_internal(exec_ctx, &calld->read_slice_buffer); } diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c index 22938c64b5..5ba13fe251 100644 --- a/src/core/lib/channel/message_size_filter.c +++ b/src/core/lib/channel/message_size_filter.c @@ -200,7 +200,7 @@ static grpc_error* init_call_elem(grpc_exec_ctx* exec_ctx, // Destructor for call_data. static void destroy_call_elem(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, const grpc_call_final_info* final_info, - void* ignored) {} + grpc_closure* ignored) {} // Constructor for channel_data. static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, @@ -208,7 +208,6 @@ static grpc_error* init_channel_elem(grpc_exec_ctx* exec_ctx, grpc_channel_element_args* args) { GPR_ASSERT(!args->is_last); channel_data* chand = elem->channel_data; - memset(chand, 0, sizeof(*chand)); chand->max_send_size = GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH; chand->max_recv_size = GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH; for (size_t i = 0; i < args->channel_args->num_args; ++i) { diff --git a/src/core/lib/http/httpcli_security_connector.c b/src/core/lib/http/httpcli_security_connector.c index f4f6f3c27a..354d2f4a09 100644 --- a/src/core/lib/http/httpcli_security_connector.c +++ b/src/core/lib/http/httpcli_security_connector.c @@ -118,8 +118,7 @@ static grpc_security_status httpcli_ssl_channel_security_connector_create( return GRPC_SECURITY_ERROR; } - c = gpr_malloc(sizeof(grpc_httpcli_ssl_channel_security_connector)); - memset(c, 0, sizeof(grpc_httpcli_ssl_channel_security_connector)); + c = gpr_zalloc(sizeof(grpc_httpcli_ssl_channel_security_connector)); gpr_ref_init(&c->base.base.refcount, 1); c->base.base.vtable = &httpcli_ssl_vtable; diff --git a/src/core/lib/http/parser.c b/src/core/lib/http/parser.c index 2f84adc187..b9c56c103c 100644 --- a/src/core/lib/http/parser.c +++ b/src/core/lib/http/parser.c @@ -284,9 +284,9 @@ static grpc_error *addbyte(grpc_http_parser *parser, uint8_t byte, case GRPC_HTTP_HEADERS: if (parser->cur_line_length >= GRPC_HTTP_PARSER_MAX_HEADER_LENGTH) { if (grpc_http1_trace) - gpr_log(GPR_ERROR, "HTTP client max line length (%d) exceeded", + gpr_log(GPR_ERROR, "HTTP header max line length (%d) exceeded", GRPC_HTTP_PARSER_MAX_HEADER_LENGTH); - return GRPC_ERROR_NONE; + return GRPC_ERROR_CREATE("HTTP header max line length exceeded"); } parser->cur_line[parser->cur_line_length] = byte; parser->cur_line_length++; diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c index dbe5b139f9..1127fff756 100644 --- a/src/core/lib/iomgr/error.c +++ b/src/core/lib/iomgr/error.c @@ -35,6 +35,7 @@ #include <string.h> +#include <grpc/slice.h> #include <grpc/status.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -47,46 +48,7 @@ #include "src/core/lib/iomgr/error_internal.h" #include "src/core/lib/profiling/timers.h" - -static void destroy_integer(void *key) {} - -static void *copy_integer(void *key) { return key; } - -static long compare_integers(void *key1, void *key2) { - return GPR_ICMP((uintptr_t)key1, (uintptr_t)key2); -} - -static void destroy_string(void *str) { gpr_free(str); } - -static void *copy_string(void *str) { return gpr_strdup(str); } - -static void destroy_err(void *err) { GRPC_ERROR_UNREF(err); } - -static void *copy_err(void *err) { return GRPC_ERROR_REF(err); } - -static void destroy_time(void *tm) { gpr_free(tm); } - -static gpr_timespec *box_time(gpr_timespec tm) { - gpr_timespec *out = gpr_malloc(sizeof(*out)); - *out = tm; - return out; -} - -static void *copy_time(void *tm) { return box_time(*(gpr_timespec *)tm); } - -static const gpr_avl_vtable avl_vtable_ints = {destroy_integer, copy_integer, - compare_integers, - destroy_integer, copy_integer}; - -static const gpr_avl_vtable avl_vtable_strs = {destroy_integer, copy_integer, - compare_integers, destroy_string, - copy_string}; - -static const gpr_avl_vtable avl_vtable_times = { - destroy_integer, copy_integer, compare_integers, destroy_time, copy_time}; - -static const gpr_avl_vtable avl_vtable_errs = { - destroy_integer, copy_integer, compare_integers, destroy_err, copy_err}; +#include "src/core/lib/slice/slice_internal.h" static const char *error_int_name(grpc_error_ints key) { switch (key) { @@ -120,6 +82,8 @@ static const char *error_int_name(grpc_error_ints key) { return "limit"; case GRPC_ERROR_INT_OCCURRED_DURING_WRITE: return "occurred_during_write"; + case GRPC_ERROR_INT_MAX: + GPR_UNREACHABLE_CODE(return "unknown"); } GPR_UNREACHABLE_CODE(return "unknown"); } @@ -150,6 +114,8 @@ static const char *error_str_name(grpc_error_strs key) { return "filename"; case GRPC_ERROR_STR_QUEUED_BUFFERS: return "queued_buffers"; + case GRPC_ERROR_STR_MAX: + GPR_UNREACHABLE_CODE(return "unknown"); } GPR_UNREACHABLE_CODE(return "unknown"); } @@ -158,6 +124,8 @@ static const char *error_time_name(grpc_error_times key) { switch (key) { case GRPC_ERROR_TIME_CREATED: return "created"; + case GRPC_ERROR_TIME_MAX: + GPR_UNREACHABLE_CODE(return "unknown"); } GPR_UNREACHABLE_CODE(return "unknown"); } @@ -172,25 +140,51 @@ grpc_error *grpc_error_ref(grpc_error *err, const char *file, int line, const char *func) { if (grpc_error_is_special(err)) return err; gpr_log(GPR_DEBUG, "%p: %" PRIdPTR " -> %" PRIdPTR " [%s:%d %s]", err, - err->refs.count, err->refs.count + 1, file, line, func); - gpr_ref(&err->refs); + gpr_atm_no_barrier_load(&err->atomics.refs.count), + gpr_atm_no_barrier_load(&err->atomics.refs.count) + 1, file, line, + func); + gpr_ref(&err->atomics.refs); return err; } #else grpc_error *grpc_error_ref(grpc_error *err) { if (grpc_error_is_special(err)) return err; - gpr_ref(&err->refs); + gpr_ref(&err->atomics.refs); return err; } #endif +static void unref_errs(grpc_error *err) { + uint8_t slot = err->first_err; + while (slot != UINT8_MAX) { + grpc_linked_error *lerr = (grpc_linked_error *)(err->arena + slot); + GRPC_ERROR_UNREF(lerr->err); + GPR_ASSERT(err->last_err == slot ? lerr->next == UINT8_MAX + : lerr->next != UINT8_MAX); + slot = lerr->next; + } +} + +static void unref_slice(grpc_slice slice) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_slice_unref_internal(&exec_ctx, slice); + grpc_exec_ctx_finish(&exec_ctx); +} + +static void unref_strs(grpc_error *err) { + for (size_t which = 0; which < GRPC_ERROR_STR_MAX; ++which) { + uint8_t slot = err->strs[which]; + if (slot != UINT8_MAX) { + unref_slice(*(grpc_slice *)(err->arena + slot)); + } + } +} + static void error_destroy(grpc_error *err) { GPR_ASSERT(!grpc_error_is_special(err)); - gpr_avl_unref(err->ints); - gpr_avl_unref(err->strs); - gpr_avl_unref(err->errs); - gpr_avl_unref(err->times); - gpr_free((void *)gpr_atm_acq_load(&err->error_string)); + unref_errs(err); + unref_strs(err); + gpr_free((void *)gpr_atm_acq_load(&err->atomics.error_string)); gpr_free(err); } @@ -199,81 +193,209 @@ void grpc_error_unref(grpc_error *err, const char *file, int line, const char *func) { if (grpc_error_is_special(err)) return; gpr_log(GPR_DEBUG, "%p: %" PRIdPTR " -> %" PRIdPTR " [%s:%d %s]", err, - err->refs.count, err->refs.count - 1, file, line, func); - if (gpr_unref(&err->refs)) { + gpr_atm_no_barrier_load(&err->atomics.refs.count), + gpr_atm_no_barrier_load(&err->atomics.refs.count) - 1, file, line, + func); + if (gpr_unref(&err->atomics.refs)) { error_destroy(err); } } #else void grpc_error_unref(grpc_error *err) { if (grpc_error_is_special(err)) return; - if (gpr_unref(&err->refs)) { + if (gpr_unref(&err->atomics.refs)) { error_destroy(err); } } #endif +static uint8_t get_placement(grpc_error **err, size_t size) { + GPR_ASSERT(*err); + uint8_t slots = (uint8_t)(size / sizeof(intptr_t)); + if ((*err)->arena_size + slots > (*err)->arena_capacity) { + (*err)->arena_capacity = (uint8_t)(3 * (*err)->arena_capacity / 2); + *err = gpr_realloc( + *err, sizeof(grpc_error) + (*err)->arena_capacity * sizeof(intptr_t)); + } + uint8_t placement = (*err)->arena_size; + (*err)->arena_size = (uint8_t)((*err)->arena_size + slots); + return placement; +} + +static void internal_set_int(grpc_error **err, grpc_error_ints which, + intptr_t value) { + // GPR_ASSERT((*err)->ints[which] == UINT8_MAX); // TODO, enforce this + uint8_t slot = (*err)->ints[which]; + if (slot == UINT8_MAX) { + slot = get_placement(err, sizeof(value)); + } + (*err)->ints[which] = slot; + (*err)->arena[slot] = value; +} + +static void internal_set_str(grpc_error **err, grpc_error_strs which, + grpc_slice value) { + // GPR_ASSERT((*err)->strs[which] == UINT8_MAX); // TODO, enforce this + uint8_t slot = (*err)->strs[which]; + if (slot == UINT8_MAX) { + slot = get_placement(err, sizeof(value)); + } else { + unref_slice(*(grpc_slice *)((*err)->arena + slot)); + } + (*err)->strs[which] = slot; + memcpy((*err)->arena + slot, &value, sizeof(value)); +} + +static void internal_set_time(grpc_error **err, grpc_error_times which, + gpr_timespec value) { + // GPR_ASSERT((*err)->times[which] == UINT8_MAX); // TODO, enforce this + uint8_t slot = (*err)->times[which]; + if (slot == UINT8_MAX) { + slot = get_placement(err, sizeof(value)); + } + (*err)->times[which] = slot; + memcpy((*err)->arena + slot, &value, sizeof(value)); +} + +static void internal_add_error(grpc_error **err, grpc_error *new) { + grpc_linked_error new_last = {new, UINT8_MAX}; + uint8_t slot = get_placement(err, sizeof(grpc_linked_error)); + if ((*err)->first_err == UINT8_MAX) { + GPR_ASSERT((*err)->last_err == UINT8_MAX); + (*err)->last_err = slot; + (*err)->first_err = slot; + } else { + GPR_ASSERT((*err)->last_err != UINT8_MAX); + grpc_linked_error *old_last = + (grpc_linked_error *)((*err)->arena + (*err)->last_err); + old_last->next = slot; + (*err)->last_err = slot; + } + memcpy((*err)->arena + slot, &new_last, sizeof(grpc_linked_error)); +} + +#define SLOTS_PER_INT (sizeof(intptr_t) / sizeof(intptr_t)) +#define SLOTS_PER_STR (sizeof(grpc_slice) / sizeof(intptr_t)) +#define SLOTS_PER_TIME (sizeof(gpr_timespec) / sizeof(intptr_t)) +#define SLOTS_PER_LINKED_ERROR (sizeof(grpc_linked_error) / sizeof(intptr_t)) + +// size of storing one int and two slices and a timespec. For line, desc, file, +// and time created +#define DEFAULT_ERROR_CAPACITY \ + (SLOTS_PER_INT + (SLOTS_PER_STR * 2) + SLOTS_PER_TIME) + +// It is very common to include and extra int and string in an error +#define SURPLUS_CAPACITY (2 * SLOTS_PER_INT + SLOTS_PER_TIME) + grpc_error *grpc_error_create(const char *file, int line, const char *desc, grpc_error **referencing, size_t num_referencing) { GPR_TIMER_BEGIN("grpc_error_create", 0); - grpc_error *err = gpr_malloc(sizeof(*err)); + uint8_t initial_arena_capacity = (uint8_t)( + DEFAULT_ERROR_CAPACITY + + (uint8_t)(num_referencing * SLOTS_PER_LINKED_ERROR) + SURPLUS_CAPACITY); + grpc_error *err = + gpr_malloc(sizeof(*err) + initial_arena_capacity * sizeof(intptr_t)); if (err == NULL) { // TODO(ctiller): make gpr_malloc return NULL return GRPC_ERROR_OOM; } #ifdef GRPC_ERROR_REFCOUNT_DEBUG gpr_log(GPR_DEBUG, "%p create [%s:%d]", err, file, line); #endif - err->ints = gpr_avl_add(gpr_avl_create(&avl_vtable_ints), - (void *)(uintptr_t)GRPC_ERROR_INT_FILE_LINE, - (void *)(uintptr_t)line); - err->strs = gpr_avl_add( - gpr_avl_add(gpr_avl_create(&avl_vtable_strs), - (void *)(uintptr_t)GRPC_ERROR_STR_FILE, gpr_strdup(file)), - (void *)(uintptr_t)GRPC_ERROR_STR_DESCRIPTION, gpr_strdup(desc)); - err->errs = gpr_avl_create(&avl_vtable_errs); - err->next_err = 0; - for (size_t i = 0; i < num_referencing; i++) { + + err->arena_size = 0; + err->arena_capacity = initial_arena_capacity; + err->first_err = UINT8_MAX; + err->last_err = UINT8_MAX; + + memset(err->ints, UINT8_MAX, GRPC_ERROR_INT_MAX); + memset(err->strs, UINT8_MAX, GRPC_ERROR_STR_MAX); + memset(err->times, UINT8_MAX, GRPC_ERROR_TIME_MAX); + + internal_set_int(&err, GRPC_ERROR_INT_FILE_LINE, line); + internal_set_str(&err, GRPC_ERROR_STR_FILE, + grpc_slice_from_static_string(file)); + internal_set_str( + &err, GRPC_ERROR_STR_DESCRIPTION, + grpc_slice_from_copied_buffer( + desc, + strlen(desc) + + 1)); // TODO, pull this up. // TODO(ncteisen), pull this up. + + for (size_t i = 0; i < num_referencing; ++i) { if (referencing[i] == GRPC_ERROR_NONE) continue; - err->errs = gpr_avl_add(err->errs, (void *)(err->next_err++), - GRPC_ERROR_REF(referencing[i])); - } - err->times = gpr_avl_add(gpr_avl_create(&avl_vtable_times), - (void *)(uintptr_t)GRPC_ERROR_TIME_CREATED, - box_time(gpr_now(GPR_CLOCK_REALTIME))); - gpr_atm_no_barrier_store(&err->error_string, 0); - gpr_ref_init(&err->refs, 1); + internal_add_error( + &err, + GRPC_ERROR_REF( + referencing[i])); // TODO(ncteisen), change ownership semantics + } + + internal_set_time(&err, GRPC_ERROR_TIME_CREATED, gpr_now(GPR_CLOCK_REALTIME)); + + gpr_atm_no_barrier_store(&err->atomics.error_string, 0); + gpr_ref_init(&err->atomics.refs, 1); GPR_TIMER_END("grpc_error_create", 0); return err; } +static void ref_strs(grpc_error *err) { + for (size_t i = 0; i < GRPC_ERROR_STR_MAX; ++i) { + uint8_t slot = err->strs[i]; + if (slot != UINT8_MAX) { + grpc_slice_ref_internal(*(grpc_slice *)(err->arena + slot)); + } + } +} + +static void ref_errs(grpc_error *err) { + uint8_t slot = err->first_err; + while (slot != UINT8_MAX) { + grpc_linked_error *lerr = (grpc_linked_error *)(err->arena + slot); + GRPC_ERROR_REF(lerr->err); + slot = lerr->next; + } +} + static grpc_error *copy_error_and_unref(grpc_error *in) { GPR_TIMER_BEGIN("copy_error_and_unref", 0); grpc_error *out; if (grpc_error_is_special(in)) { - if (in == GRPC_ERROR_NONE) - out = grpc_error_set_int(GRPC_ERROR_CREATE("no error"), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_OK); - else if (in == GRPC_ERROR_OOM) - out = GRPC_ERROR_CREATE("oom"); - else if (in == GRPC_ERROR_CANCELLED) - out = - grpc_error_set_int(GRPC_ERROR_CREATE("cancelled"), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_CANCELLED); - else - out = GRPC_ERROR_CREATE("unknown"); + out = GRPC_ERROR_CREATE("unknown"); + if (in == GRPC_ERROR_NONE) { + internal_set_str(&out, GRPC_ERROR_STR_DESCRIPTION, + grpc_slice_from_static_string("no error")); + internal_set_int(&out, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_OK); + } else if (in == GRPC_ERROR_OOM) { + internal_set_str(&out, GRPC_ERROR_STR_DESCRIPTION, + grpc_slice_from_static_string("oom")); + } else if (in == GRPC_ERROR_CANCELLED) { + internal_set_str(&out, GRPC_ERROR_STR_DESCRIPTION, + grpc_slice_from_static_string("cancelled")); + internal_set_int(&out, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_CANCELLED); + } + } else if (gpr_ref_is_unique(&in->atomics.refs)) { + out = in; } else { - out = gpr_malloc(sizeof(*out)); + uint8_t new_arena_capacity = in->arena_capacity; + // the returned err will be added to, so we ensure this is room to avoid + // unneeded allocations. + if (in->arena_capacity - in->arena_size < (uint8_t)SLOTS_PER_STR) { + new_arena_capacity = (uint8_t)(3 * new_arena_capacity / 2); + } + out = gpr_malloc(sizeof(*in) + new_arena_capacity * sizeof(intptr_t)); #ifdef GRPC_ERROR_REFCOUNT_DEBUG gpr_log(GPR_DEBUG, "%p create copying %p", out, in); #endif - out->ints = gpr_avl_ref(in->ints); - out->strs = gpr_avl_ref(in->strs); - out->errs = gpr_avl_ref(in->errs); - out->times = gpr_avl_ref(in->times); - gpr_atm_no_barrier_store(&out->error_string, 0); - out->next_err = in->next_err; - gpr_ref_init(&out->refs, 1); + // bulk memcpy of the rest of the struct. + size_t skip = sizeof(&out->atomics); + memcpy((void *)((uintptr_t)out + skip), (void *)((uintptr_t)in + skip), + sizeof(*in) + (in->arena_size * sizeof(intptr_t)) - skip); + // manually set the atomics and the new capacity + gpr_atm_no_barrier_store(&out->atomics.error_string, 0); + gpr_ref_init(&out->atomics.refs, 1); + out->arena_capacity = new_arena_capacity; + ref_strs(out); + ref_errs(out); GRPC_ERROR_UNREF(in); } GPR_TIMER_END("copy_error_and_unref", 0); @@ -284,7 +406,7 @@ grpc_error *grpc_error_set_int(grpc_error *src, grpc_error_ints which, intptr_t value) { GPR_TIMER_BEGIN("grpc_error_set_int", 0); grpc_error *new = copy_error_and_unref(src); - new->ints = gpr_avl_add(new->ints, (void *)(uintptr_t)which, (void *)value); + internal_set_int(&new, which, value); GPR_TIMER_END("grpc_error_set_int", 0); return new; } @@ -302,7 +424,6 @@ static special_error_status_map error_status_map[] = { bool grpc_error_get_int(grpc_error *err, grpc_error_ints which, intptr_t *p) { GPR_TIMER_BEGIN("grpc_error_get_int", 0); - void *pp; if (grpc_error_is_special(err)) { if (which == GRPC_ERROR_INT_GRPC_STATUS) { for (size_t i = 0; i < GPR_ARRAY_SIZE(error_status_map); i++) { @@ -316,8 +437,9 @@ bool grpc_error_get_int(grpc_error *err, grpc_error_ints which, intptr_t *p) { GPR_TIMER_END("grpc_error_get_int", 0); return false; } - if (gpr_avl_maybe_get(err->ints, (void *)(uintptr_t)which, &pp)) { - if (p != NULL) *p = (intptr_t)pp; + uint8_t slot = err->ints[which]; + if (slot != UINT8_MAX) { + if (p != NULL) *p = err->arena[slot]; GPR_TIMER_END("grpc_error_get_int", 0); return true; } @@ -329,8 +451,9 @@ grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which, const char *value) { GPR_TIMER_BEGIN("grpc_error_set_str", 0); grpc_error *new = copy_error_and_unref(src); - new->strs = - gpr_avl_add(new->strs, (void *)(uintptr_t)which, gpr_strdup(value)); + internal_set_str(&new, which, + grpc_slice_from_copied_buffer( + value, strlen(value) + 1)); // TODO, pull this up. GPR_TIMER_END("grpc_error_set_str", 0); return new; } @@ -346,13 +469,19 @@ const char *grpc_error_get_str(grpc_error *err, grpc_error_strs which) { } return NULL; } - return gpr_avl_get(err->strs, (void *)(uintptr_t)which); + uint8_t slot = err->strs[which]; + if (slot != UINT8_MAX) { + return (const char *)GRPC_SLICE_START_PTR( + *(grpc_slice *)(err->arena + slot)); + } else { + return NULL; + } } grpc_error *grpc_error_add_child(grpc_error *src, grpc_error *child) { GPR_TIMER_BEGIN("grpc_error_add_child", 0); grpc_error *new = copy_error_and_unref(src); - new->errs = gpr_avl_add(new->errs, (void *)(new->next_err++), child); + internal_add_error(&new, child); GPR_TIMER_END("grpc_error_add_child", 0); return new; } @@ -372,42 +501,6 @@ typedef struct { size_t cap_kvs; } kv_pairs; -static void append_kv(kv_pairs *kvs, char *key, char *value) { - if (kvs->num_kvs == kvs->cap_kvs) { - kvs->cap_kvs = GPR_MAX(3 * kvs->cap_kvs / 2, 4); - kvs->kvs = gpr_realloc(kvs->kvs, sizeof(*kvs->kvs) * kvs->cap_kvs); - } - kvs->kvs[kvs->num_kvs].key = key; - kvs->kvs[kvs->num_kvs].value = value; - kvs->num_kvs++; -} - -static void collect_kvs(gpr_avl_node *node, char *key(void *k), - char *fmt(void *v), kv_pairs *kvs) { - if (node == NULL) return; - append_kv(kvs, key(node->key), fmt(node->value)); - collect_kvs(node->left, key, fmt, kvs); - collect_kvs(node->right, key, fmt, kvs); -} - -static char *key_int(void *p) { - return gpr_strdup(error_int_name((grpc_error_ints)(uintptr_t)p)); -} - -static char *key_str(void *p) { - return gpr_strdup(error_str_name((grpc_error_strs)(uintptr_t)p)); -} - -static char *key_time(void *p) { - return gpr_strdup(error_time_name((grpc_error_times)(uintptr_t)p)); -} - -static char *fmt_int(void *p) { - char *s; - gpr_asprintf(&s, "%" PRIdPTR, (intptr_t)p); - return s; -} - static void append_chr(char c, char **s, size_t *sz, size_t *cap) { if (*sz == *cap) { *cap = GPR_MAX(8, 3 * *cap / 2); @@ -459,6 +552,40 @@ static void append_esc_str(const char *str, char **s, size_t *sz, size_t *cap) { append_chr('"', s, sz, cap); } +static void append_kv(kv_pairs *kvs, char *key, char *value) { + if (kvs->num_kvs == kvs->cap_kvs) { + kvs->cap_kvs = GPR_MAX(3 * kvs->cap_kvs / 2, 4); + kvs->kvs = gpr_realloc(kvs->kvs, sizeof(*kvs->kvs) * kvs->cap_kvs); + } + kvs->kvs[kvs->num_kvs].key = key; + kvs->kvs[kvs->num_kvs].value = value; + kvs->num_kvs++; +} + +static char *key_int(grpc_error_ints which) { + return gpr_strdup(error_int_name(which)); +} + +static char *fmt_int(intptr_t p) { + char *s; + gpr_asprintf(&s, "%" PRIdPTR, p); + return s; +} + +static void collect_ints_kvs(grpc_error *err, kv_pairs *kvs) { + for (size_t which = 0; which < GRPC_ERROR_INT_MAX; ++which) { + uint8_t slot = err->ints[which]; + if (slot != UINT8_MAX) { + append_kv(kvs, key_int((grpc_error_ints)which), + fmt_int(err->arena[slot])); + } + } +} + +static char *key_str(grpc_error_strs which) { + return gpr_strdup(error_str_name(which)); +} + static char *fmt_str(void *p) { char *s = NULL; size_t sz = 0; @@ -468,8 +595,22 @@ static char *fmt_str(void *p) { return s; } -static char *fmt_time(void *p) { - gpr_timespec tm = *(gpr_timespec *)p; +static void collect_strs_kvs(grpc_error *err, kv_pairs *kvs) { + for (size_t which = 0; which < GRPC_ERROR_STR_MAX; ++which) { + uint8_t slot = err->strs[which]; + if (slot != UINT8_MAX) { + append_kv( + kvs, key_str((grpc_error_strs)which), + fmt_str(GRPC_SLICE_START_PTR(*(grpc_slice *)(err->arena + slot)))); + } + } +} + +static char *key_time(grpc_error_times which) { + return gpr_strdup(error_time_name(which)); +} + +static char *fmt_time(gpr_timespec tm) { char *out; char *pfx = "!!"; switch (tm.clock_type) { @@ -490,24 +631,37 @@ static char *fmt_time(void *p) { return out; } -static void add_errs(gpr_avl_node *n, char **s, size_t *sz, size_t *cap, - bool *first) { - if (n == NULL) return; - add_errs(n->left, s, sz, cap, first); - if (!*first) append_chr(',', s, sz, cap); - *first = false; - const char *e = grpc_error_string(n->value); - append_str(e, s, sz, cap); - add_errs(n->right, s, sz, cap, first); +static void collect_times_kvs(grpc_error *err, kv_pairs *kvs) { + for (size_t which = 0; which < GRPC_ERROR_TIME_MAX; ++which) { + uint8_t slot = err->times[which]; + if (slot != UINT8_MAX) { + append_kv(kvs, key_time((grpc_error_times)which), + fmt_time(*(gpr_timespec *)(err->arena + slot))); + } + } +} + +static void add_errs(grpc_error *err, char **s, size_t *sz, size_t *cap) { + uint8_t slot = err->first_err; + bool first = true; + while (slot != UINT8_MAX) { + grpc_linked_error *lerr = (grpc_linked_error *)(err->arena + slot); + if (!first) append_chr(',', s, sz, cap); + first = false; + const char *e = grpc_error_string(lerr->err); + append_str(e, s, sz, cap); + GPR_ASSERT(err->last_err == slot ? lerr->next == UINT8_MAX + : lerr->next != UINT8_MAX); + slot = lerr->next; + } } static char *errs_string(grpc_error *err) { char *s = NULL; size_t sz = 0; size_t cap = 0; - bool first = true; append_chr('[', &s, &sz, &cap); - add_errs(err->errs.root, &s, &sz, &cap, &first); + add_errs(err, &s, &sz, &cap); append_chr(']', &s, &sz, &cap); append_chr(0, &s, &sz, &cap); return s; @@ -546,7 +700,7 @@ const char *grpc_error_string(grpc_error *err) { if (err == GRPC_ERROR_OOM) return oom_error_string; if (err == GRPC_ERROR_CANCELLED) return cancelled_error_string; - void *p = (void *)gpr_atm_acq_load(&err->error_string); + void *p = (void *)gpr_atm_acq_load(&err->atomics.error_string); if (p != NULL) { GPR_TIMER_END("grpc_error_string", 0); return p; @@ -555,10 +709,10 @@ const char *grpc_error_string(grpc_error *err) { kv_pairs kvs; memset(&kvs, 0, sizeof(kvs)); - collect_kvs(err->ints.root, key_int, fmt_int, &kvs); - collect_kvs(err->strs.root, key_str, fmt_str, &kvs); - collect_kvs(err->times.root, key_time, fmt_time, &kvs); - if (!gpr_avl_is_empty(err->errs)) { + collect_ints_kvs(err, &kvs); + collect_strs_kvs(err, &kvs); + collect_times_kvs(err, &kvs); + if (err->first_err != UINT8_MAX) { append_kv(&kvs, gpr_strdup("referenced_errors"), errs_string(err)); } @@ -566,9 +720,9 @@ const char *grpc_error_string(grpc_error *err) { char *out = finish_kvs(&kvs); - if (!gpr_atm_rel_cas(&err->error_string, 0, (gpr_atm)out)) { + if (!gpr_atm_rel_cas(&err->atomics.error_string, 0, (gpr_atm)out)) { gpr_free(out); - out = (char *)gpr_atm_no_barrier_load(&err->error_string); + out = (char *)gpr_atm_no_barrier_load(&err->atomics.error_string); } GPR_TIMER_END("grpc_error_string", 0); diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h index 2613512acb..eb953947ae 100644 --- a/src/core/lib/iomgr/error.h +++ b/src/core/lib/iomgr/error.h @@ -102,6 +102,9 @@ typedef enum { GRPC_ERROR_INT_LIMIT, /// chttp2: did the error occur while a write was in progress GRPC_ERROR_INT_OCCURRED_DURING_WRITE, + + /// Must always be last + GRPC_ERROR_INT_MAX, } grpc_error_ints; typedef enum { @@ -129,11 +132,17 @@ typedef enum { GRPC_ERROR_STR_KEY, /// value associated with the error GRPC_ERROR_STR_VALUE, + + /// Must always be last + GRPC_ERROR_STR_MAX, } grpc_error_strs; typedef enum { /// timestamp of error creation GRPC_ERROR_TIME_CREATED, + + /// Must always be last + GRPC_ERROR_TIME_MAX, } grpc_error_times; /// The following "special" errors can be propagated without allocating memory. @@ -184,8 +193,6 @@ void grpc_error_unref(grpc_error *err); grpc_error *grpc_error_set_int(grpc_error *src, grpc_error_ints which, intptr_t value) GRPC_MUST_USE_RESULT; bool grpc_error_get_int(grpc_error *error, grpc_error_ints which, intptr_t *p); -grpc_error *grpc_error_set_time(grpc_error *src, grpc_error_times which, - gpr_timespec value) GRPC_MUST_USE_RESULT; grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which, const char *value) GRPC_MUST_USE_RESULT; /// Returns NULL if the specified string is not set. diff --git a/src/core/lib/iomgr/error_internal.h b/src/core/lib/iomgr/error_internal.h index 1c89ead4ed..7f204df1b2 100644 --- a/src/core/lib/iomgr/error_internal.h +++ b/src/core/lib/iomgr/error_internal.h @@ -35,18 +35,39 @@ #define GRPC_CORE_LIB_IOMGR_ERROR_INTERNAL_H #include <inttypes.h> -#include <stdbool.h> +#include <stdbool.h> // TODO, do we need this? -#include <grpc/support/avl.h> +#include <grpc/support/sync.h> +typedef struct grpc_linked_error grpc_linked_error; + +struct grpc_linked_error { + grpc_error *err; + uint8_t next; +}; + +// c core representation of an error. See error.h for high level description of +// this object. struct grpc_error { - gpr_refcount refs; - gpr_avl ints; - gpr_avl strs; - gpr_avl times; - gpr_avl errs; - uintptr_t next_err; - gpr_atm error_string; + // All atomics in grpc_error must be stored in this nested struct. The rest of + // the object is memcpy-ed in bulk in copy_and_unref. + struct atomics { + gpr_refcount refs; + gpr_atm error_string; + } atomics; + // These arrays index into dynamic arena at the bottom of the struct. + // UINT8_MAX is used as a sentinel value. + uint8_t ints[GRPC_ERROR_INT_MAX]; + uint8_t strs[GRPC_ERROR_STR_MAX]; + uint8_t times[GRPC_ERROR_TIME_MAX]; + // The child errors are stored in the arena, but are effectively a linked list + // structure, since they are contained withing grpc_linked_error objects. + uint8_t first_err; + uint8_t last_err; + // The arena is dynamically reallocated with a grow factor of 1.5. + uint8_t arena_size; + uint8_t arena_capacity; + intptr_t arena[0]; }; bool grpc_error_is_special(grpc_error *err); diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index c03fadaebb..5ddd5313e2 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -1131,8 +1131,7 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline, */ static grpc_pollset_set *pollset_set_create(void) { - grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set)); - memset(pollset_set, 0, sizeof(*pollset_set)); + grpc_pollset_set *pollset_set = gpr_zalloc(sizeof(*pollset_set)); gpr_mu_init(&pollset_set->mu); return pollset_set; } diff --git a/src/core/lib/iomgr/pollset.h b/src/core/lib/iomgr/pollset.h index c4b62a2790..e19ce697b8 100644 --- a/src/core/lib/iomgr/pollset.h +++ b/src/core/lib/iomgr/pollset.h @@ -53,6 +53,7 @@ typedef struct grpc_pollset grpc_pollset; typedef struct grpc_pollset_worker grpc_pollset_worker; size_t grpc_pollset_size(void); +/* Initialize a pollset: assumes *pollset contains all zeros */ void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu); /* Begin shutting down the pollset, and call closure when done. * pollset's mutex must be held */ diff --git a/src/core/lib/iomgr/pollset_uv.c b/src/core/lib/iomgr/pollset_uv.c index 5847771108..a2f81bcd78 100644 --- a/src/core/lib/iomgr/pollset_uv.c +++ b/src/core/lib/iomgr/pollset_uv.c @@ -39,6 +39,7 @@ #include <string.h> +#include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/sync.h> @@ -57,24 +58,40 @@ int grpc_pollset_work_run_loop; gpr_mu grpc_polling_mu; +/* This is used solely to kick the uv loop, by setting a callback to be run + immediately in the next loop iteration. + Note: In the future, if there is a bug that involves missing wakeups in the + future, try adding a uv_async_t to kick the loop differently */ +uv_timer_t *dummy_uv_handle; + size_t grpc_pollset_size() { return sizeof(grpc_pollset); } +void dummy_timer_cb(uv_timer_t *handle) {} + +void dummy_handle_close_cb(uv_handle_t *handle) { gpr_free(handle); } + void grpc_pollset_global_init(void) { gpr_mu_init(&grpc_polling_mu); + dummy_uv_handle = gpr_malloc(sizeof(uv_timer_t)); + uv_timer_init(uv_default_loop(), dummy_uv_handle); grpc_pollset_work_run_loop = 1; } -void grpc_pollset_global_shutdown(void) { gpr_mu_destroy(&grpc_polling_mu); } +void grpc_pollset_global_shutdown(void) { + gpr_mu_destroy(&grpc_polling_mu); + uv_close((uv_handle_t *)dummy_uv_handle, dummy_handle_close_cb); +} + +static void timer_run_cb(uv_timer_t *timer) {} + +static void timer_close_cb(uv_handle_t *handle) { handle->data = (void *)1; } void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) { *mu = &grpc_polling_mu; - memset(pollset, 0, sizeof(grpc_pollset)); uv_timer_init(uv_default_loop(), &pollset->timer); pollset->shutting_down = 0; } -static void timer_close_cb(uv_handle_t *handle) { handle->data = (void *)1; } - void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure *closure) { GPR_ASSERT(!pollset->shutting_down); @@ -82,6 +99,9 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (grpc_pollset_work_run_loop) { // Drain any pending UV callbacks without blocking uv_run(uv_default_loop(), UV_RUN_NOWAIT); + } else { + // kick the loop once + uv_timer_start(dummy_uv_handle, dummy_timer_cb, 0, 0); } grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE); } @@ -97,8 +117,6 @@ void grpc_pollset_destroy(grpc_pollset *pollset) { } } -static void timer_run_cb(uv_timer_t *timer) {} - grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker **worker_hdl, gpr_timespec now, gpr_timespec deadline) { @@ -131,6 +149,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_error *grpc_pollset_kick(grpc_pollset *pollset, grpc_pollset_worker *specific_worker) { + uv_timer_start(dummy_uv_handle, dummy_timer_cb, 0, 0); return GRPC_ERROR_NONE; } diff --git a/src/core/lib/iomgr/pollset_windows.c b/src/core/lib/iomgr/pollset_windows.c index 9be73a7ce8..17043c1ea1 100644 --- a/src/core/lib/iomgr/pollset_windows.c +++ b/src/core/lib/iomgr/pollset_windows.c @@ -98,7 +98,6 @@ size_t grpc_pollset_size(void) { return sizeof(grpc_pollset); } void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) { *mu = &grpc_polling_mu; - memset(pollset, 0, sizeof(*pollset)); pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next = pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev = &pollset->root_worker; diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h index f1897bb91f..94a454c0b7 100644 --- a/src/core/lib/iomgr/port.h +++ b/src/core/lib/iomgr/port.h @@ -39,6 +39,7 @@ #if defined(GRPC_UV) // Do nothing #elif defined(GPR_MANYLINUX1) +#define GRPC_HAVE_IFADDRS 1 #define GRPC_HAVE_IPV6_RECVPKTINFO 1 #define GRPC_HAVE_IP_PKTINFO 1 #define GRPC_HAVE_MSG_NOSIGNAL 1 @@ -65,6 +66,7 @@ #define GRPC_POSIX_WAKEUP_FD 1 #define GRPC_TIMER_USE_GENERIC 1 #elif defined(GPR_LINUX) +#define GRPC_HAVE_IFADDRS 1 #define GRPC_HAVE_IPV6_RECVPKTINFO 1 #define GRPC_HAVE_IP_PKTINFO 1 #define GRPC_HAVE_MSG_NOSIGNAL 1 @@ -90,6 +92,7 @@ #define GRPC_POSIX_SOCKETUTILS #endif #elif defined(GPR_APPLE) +#define GRPC_HAVE_IFADDRS 1 #define GRPC_HAVE_SO_NOSIGPIPE 1 #define GRPC_HAVE_UNIX_SOCKET 1 #define GRPC_MSG_IOVLEN_TYPE int @@ -100,6 +103,7 @@ #define GRPC_POSIX_WAKEUP_FD 1 #define GRPC_TIMER_USE_GENERIC 1 #elif defined(GPR_FREEBSD) +#define GRPC_HAVE_IFADDRS 1 #define GRPC_HAVE_IPV6_RECVPKTINFO 1 #define GRPC_HAVE_SO_NOSIGPIPE 1 #define GRPC_HAVE_UNIX_SOCKET 1 diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c index 79ff910738..4d715be94c 100644 --- a/src/core/lib/iomgr/resolve_address_uv.c +++ b/src/core/lib/iomgr/resolve_address_uv.c @@ -40,6 +40,7 @@ #include <grpc/support/host_port.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> +#include <grpc/support/useful.h> #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/error.h" @@ -54,8 +55,36 @@ typedef struct request { grpc_closure *on_done; grpc_resolved_addresses **addresses; struct addrinfo *hints; + char *host; + char *port; } request; +static int retry_named_port_failure(int status, request *r, + uv_getaddrinfo_cb getaddrinfo_cb) { + if (status != 0) { + // This loop is copied from resolve_address_posix.c + char *svc[][2] = {{"http", "80"}, {"https", "443"}}; + for (size_t i = 0; i < GPR_ARRAY_SIZE(svc); i++) { + if (strcmp(r->port, svc[i][0]) == 0) { + int retry_status; + uv_getaddrinfo_t *req = gpr_malloc(sizeof(uv_getaddrinfo_t)); + req->data = r; + retry_status = uv_getaddrinfo(uv_default_loop(), req, getaddrinfo_cb, + r->host, svc[i][1], r->hints); + if (retry_status < 0 || getaddrinfo_cb == NULL) { + // The callback will not be called + gpr_free(req); + } + return retry_status; + } + } + } + /* If this function calls uv_getaddrinfo, it will return that function's + return value. That function only returns numbers <=0, so we can safely + return 1 to indicate that we never retried */ + return 1; +} + static grpc_error *handle_addrinfo_result(int status, struct addrinfo *result, grpc_resolved_addresses **addresses) { struct addrinfo *resp; @@ -97,13 +126,21 @@ static void getaddrinfo_callback(uv_getaddrinfo_t *req, int status, request *r = (request *)req->data; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_error *error; + int retry_status; + + gpr_free(req); + retry_status = retry_named_port_failure(status, r, getaddrinfo_callback); + if (retry_status == 0) { + // The request is being retried. Nothing should be done here + return; + } + /* Either no retry was attempted, or the retry failed. Either way, the + original error probably has more interesting information */ error = handle_addrinfo_result(status, res, r->addresses); grpc_closure_sched(&exec_ctx, r->on_done, error); grpc_exec_ctx_finish(&exec_ctx); - gpr_free(r->hints); gpr_free(r); - gpr_free(req); uv_freeaddrinfo(res); } @@ -143,6 +180,7 @@ static grpc_error *blocking_resolve_address_impl( uv_getaddrinfo_t req; int s; grpc_error *err; + int retry_status; req.addrinfo = NULL; @@ -158,6 +196,12 @@ static grpc_error *blocking_resolve_address_impl( hints.ai_flags = AI_PASSIVE; /* for wildcard IP address */ s = uv_getaddrinfo(uv_default_loop(), &req, NULL, host, port, &hints); + request r = { + .addresses = addresses, .hints = &hints, .host = host, .port = port}; + retry_status = retry_named_port_failure(s, &r, NULL); + if (retry_status <= 0) { + s = retry_status; + } err = handle_addrinfo_result(s, req.addrinfo, addresses); done: @@ -200,6 +244,8 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, r = gpr_malloc(sizeof(request)); r->on_done = on_done; r->addresses = addrs; + r->host = host; + r->port = port; req = gpr_malloc(sizeof(uv_getaddrinfo_t)); req->data = r; @@ -222,6 +268,8 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, gpr_free(r); gpr_free(req); gpr_free(hints); + gpr_free(host); + gpr_free(port); } } diff --git a/src/core/lib/iomgr/sockaddr_utils.c b/src/core/lib/iomgr/sockaddr_utils.c index ffa62cb53c..9d2732666b 100644 --- a/src/core/lib/iomgr/sockaddr_utils.c +++ b/src/core/lib/iomgr/sockaddr_utils.c @@ -162,6 +162,7 @@ int grpc_sockaddr_to_string(char **out, char ntop_buf[INET6_ADDRSTRLEN]; const void *ip = NULL; int port; + uint32_t sin6_scope_id = 0; int ret; *out = NULL; @@ -177,10 +178,19 @@ int grpc_sockaddr_to_string(char **out, const struct sockaddr_in6 *addr6 = (const struct sockaddr_in6 *)addr; ip = &addr6->sin6_addr; port = ntohs(addr6->sin6_port); + sin6_scope_id = addr6->sin6_scope_id; } if (ip != NULL && grpc_inet_ntop(addr->sa_family, ip, ntop_buf, sizeof(ntop_buf)) != NULL) { - ret = gpr_join_host_port(out, ntop_buf, port); + if (sin6_scope_id != 0) { + char *host_with_scope; + /* Enclose sin6_scope_id with the format defined in RFC 6784 section 2. */ + gpr_asprintf(&host_with_scope, "%s%%25%" PRIu32, ntop_buf, sin6_scope_id); + ret = gpr_join_host_port(out, host_with_scope, port); + gpr_free(host_with_scope); + } else { + ret = gpr_join_host_port(out, ntop_buf, port); + } } else { ret = gpr_asprintf(out, "(sockaddr family=%d)", addr->sa_family); } diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c index 3de0795187..618483d9cb 100644 --- a/src/core/lib/iomgr/tcp_client_uv.c +++ b/src/core/lib/iomgr/tcp_client_uv.c @@ -76,7 +76,6 @@ static void uv_tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, const char *str = grpc_error_string(error); gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_alarm: error=%s", connect->addr_name, str); - grpc_error_free_string(str); } if (error == GRPC_ERROR_NONE) { /* error == NONE implies that the timer ran out, and wasn't cancelled. If @@ -144,8 +143,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, } } - connect = gpr_malloc(sizeof(grpc_uv_tcp_connect)); - memset(connect, 0, sizeof(grpc_uv_tcp_connect)); + connect = gpr_zalloc(sizeof(grpc_uv_tcp_connect)); connect->closure = closure; connect->endpoint = ep; connect->tcp_handle = gpr_malloc(sizeof(uv_tcp_t)); diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index 36f878fdd4..b3644518f5 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -44,11 +44,8 @@ #include <errno.h> #include <fcntl.h> -#include <ifaddrs.h> -#include <limits.h> #include <netinet/in.h> #include <netinet/tcp.h> -#include <stdio.h> #include <string.h> #include <sys/socket.h> #include <sys/stat.h> @@ -67,80 +64,10 @@ #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/socket_utils_posix.h" #include "src/core/lib/iomgr/tcp_posix.h" +#include "src/core/lib/iomgr/tcp_server_utils_posix.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" #include "src/core/lib/support/string.h" -#define MIN_SAFE_ACCEPT_QUEUE_SIZE 100 - -static gpr_once s_init_max_accept_queue_size; -static int s_max_accept_queue_size; - -/* one listening port */ -typedef struct grpc_tcp_listener grpc_tcp_listener; -struct grpc_tcp_listener { - int fd; - grpc_fd *emfd; - grpc_tcp_server *server; - grpc_resolved_address addr; - int port; - unsigned port_index; - unsigned fd_index; - grpc_closure read_closure; - grpc_closure destroyed_closure; - struct grpc_tcp_listener *next; - /* sibling is a linked list of all listeners for a given port. add_port and - clone_port place all new listeners in the same sibling list. A member of - the 'sibling' list is also a member of the 'next' list. The head of each - sibling list has is_sibling==0, and subsequent members of sibling lists - have is_sibling==1. is_sibling allows separate sibling lists to be - identified while iterating through 'next'. */ - struct grpc_tcp_listener *sibling; - int is_sibling; -}; - -/* the overall server */ -struct grpc_tcp_server { - gpr_refcount refs; - /* Called whenever accept() succeeds on a server port. */ - grpc_tcp_server_cb on_accept_cb; - void *on_accept_cb_arg; - - gpr_mu mu; - - /* active port count: how many ports are actually still listening */ - size_t active_ports; - /* destroyed port count: how many ports are completely destroyed */ - size_t destroyed_ports; - - /* is this server shutting down? */ - bool shutdown; - /* use SO_REUSEPORT */ - bool so_reuseport; - /* expand wildcard addresses to a list of all local addresses */ - bool expand_wildcard_addrs; - - /* linked list of server ports */ - grpc_tcp_listener *head; - grpc_tcp_listener *tail; - unsigned nports; - - /* List of closures passed to shutdown_starting_add(). */ - grpc_closure_list shutdown_starting; - - /* shutdown callback */ - grpc_closure *shutdown_complete; - - /* all pollsets interested in new connections */ - grpc_pollset **pollsets; - /* number of pollsets in the pollsets array */ - size_t pollset_count; - - /* next pollset to assign a channel to */ - gpr_atm next_pollset_to_assign; - - grpc_resource_quota *resource_quota; -}; - static gpr_once check_init = GPR_ONCE_INIT; static bool has_so_reuseport = false; @@ -161,7 +88,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, grpc_tcp_server **server) { gpr_once_init(&check_init, init); - grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); + grpc_tcp_server *s = gpr_zalloc(sizeof(grpc_tcp_server)); s->so_reuseport = has_so_reuseport; s->resource_quota = grpc_resource_quota_create(NULL); s->expand_wildcard_addrs = false; @@ -299,99 +226,6 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { } } -/* get max listen queue size on linux */ -static void init_max_accept_queue_size(void) { - int n = SOMAXCONN; - char buf[64]; - FILE *fp = fopen("/proc/sys/net/core/somaxconn", "r"); - if (fp == NULL) { - /* 2.4 kernel. */ - s_max_accept_queue_size = SOMAXCONN; - return; - } - if (fgets(buf, sizeof buf, fp)) { - char *end; - long i = strtol(buf, &end, 10); - if (i > 0 && i <= INT_MAX && end && *end == 0) { - n = (int)i; - } - } - fclose(fp); - s_max_accept_queue_size = n; - - if (s_max_accept_queue_size < MIN_SAFE_ACCEPT_QUEUE_SIZE) { - gpr_log(GPR_INFO, - "Suspiciously small accept queue (%d) will probably lead to " - "connection drops", - s_max_accept_queue_size); - } -} - -static int get_max_accept_queue_size(void) { - gpr_once_init(&s_init_max_accept_queue_size, init_max_accept_queue_size); - return s_max_accept_queue_size; -} - -/* Prepare a recently-created socket for listening. */ -static grpc_error *prepare_socket(int fd, const grpc_resolved_address *addr, - bool so_reuseport, int *port) { - grpc_resolved_address sockname_temp; - grpc_error *err = GRPC_ERROR_NONE; - - GPR_ASSERT(fd >= 0); - - if (so_reuseport && !grpc_is_unix_socket(addr)) { - err = grpc_set_socket_reuse_port(fd, 1); - if (err != GRPC_ERROR_NONE) goto error; - } - - err = grpc_set_socket_nonblocking(fd, 1); - if (err != GRPC_ERROR_NONE) goto error; - err = grpc_set_socket_cloexec(fd, 1); - if (err != GRPC_ERROR_NONE) goto error; - if (!grpc_is_unix_socket(addr)) { - err = grpc_set_socket_low_latency(fd, 1); - if (err != GRPC_ERROR_NONE) goto error; - err = grpc_set_socket_reuse_addr(fd, 1); - if (err != GRPC_ERROR_NONE) goto error; - } - err = grpc_set_socket_no_sigpipe_if_possible(fd); - if (err != GRPC_ERROR_NONE) goto error; - - GPR_ASSERT(addr->len < ~(socklen_t)0); - if (bind(fd, (struct sockaddr *)addr->addr, (socklen_t)addr->len) < 0) { - err = GRPC_OS_ERROR(errno, "bind"); - goto error; - } - - if (listen(fd, get_max_accept_queue_size()) < 0) { - err = GRPC_OS_ERROR(errno, "listen"); - goto error; - } - - sockname_temp.len = sizeof(struct sockaddr_storage); - - if (getsockname(fd, (struct sockaddr *)sockname_temp.addr, - (socklen_t *)&sockname_temp.len) < 0) { - err = GRPC_OS_ERROR(errno, "getsockname"); - goto error; - } - - *port = grpc_sockaddr_get_port(&sockname_temp); - return GRPC_ERROR_NONE; - -error: - GPR_ASSERT(err != GRPC_ERROR_NONE); - if (fd >= 0) { - close(fd); - } - grpc_error *ret = grpc_error_set_int( - GRPC_ERROR_CREATE_REFERENCING("Unable to configure socket", &err, 1), - GRPC_ERROR_INT_FD, fd); - GRPC_ERROR_UNREF(err); - return ret; -} - /* event manager callback when reads are ready */ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) { grpc_tcp_listener *sp = arg; @@ -422,7 +256,14 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) { grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); return; default: - gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno)); + gpr_mu_lock(&sp->server->mu); + if (!sp->server->shutdown_listeners) { + gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno)); + } else { + /* if we have shutdown listeners, accept4 could fail, and we + needn't notify users */ + } + gpr_mu_unlock(&sp->server->mu); goto error; } } @@ -438,11 +279,6 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) { grpc_fd *fdobj = grpc_fd_create(fd, name); - if (read_notifier_pollset == NULL) { - gpr_log(GPR_ERROR, "Read notifier pollset is not set on the fd"); - goto error; - } - grpc_pollset_add_fd(exec_ctx, read_notifier_pollset, fdobj); // Create acceptor. @@ -473,216 +309,6 @@ error: } } -static grpc_error *add_socket_to_server(grpc_tcp_server *s, int fd, - const grpc_resolved_address *addr, - unsigned port_index, unsigned fd_index, - grpc_tcp_listener **listener) { - grpc_tcp_listener *sp = NULL; - int port = -1; - char *addr_str; - char *name; - - grpc_error *err = prepare_socket(fd, addr, s->so_reuseport, &port); - if (err == GRPC_ERROR_NONE) { - GPR_ASSERT(port > 0); - grpc_sockaddr_to_string(&addr_str, addr, 1); - gpr_asprintf(&name, "tcp-server-listener:%s", addr_str); - gpr_mu_lock(&s->mu); - s->nports++; - GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server"); - sp = gpr_malloc(sizeof(grpc_tcp_listener)); - sp->next = NULL; - if (s->head == NULL) { - s->head = sp; - } else { - s->tail->next = sp; - } - s->tail = sp; - sp->server = s; - sp->fd = fd; - sp->emfd = grpc_fd_create(fd, name); - memcpy(&sp->addr, addr, sizeof(grpc_resolved_address)); - sp->port = port; - sp->port_index = port_index; - sp->fd_index = fd_index; - sp->is_sibling = 0; - sp->sibling = NULL; - GPR_ASSERT(sp->emfd); - gpr_mu_unlock(&s->mu); - gpr_free(addr_str); - gpr_free(name); - } - - *listener = sp; - return err; -} - -/* If successful, add a listener to s for addr, set *dsmode for the socket, and - return the *listener. */ -static grpc_error *add_addr_to_server(grpc_tcp_server *s, - const grpc_resolved_address *addr, - unsigned port_index, unsigned fd_index, - grpc_dualstack_mode *dsmode, - grpc_tcp_listener **listener) { - grpc_resolved_address addr4_copy; - int fd; - grpc_error *err = - grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, dsmode, &fd); - if (err != GRPC_ERROR_NONE) { - return err; - } - if (*dsmode == GRPC_DSMODE_IPV4 && - grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) { - addr = &addr4_copy; - } - return add_socket_to_server(s, fd, addr, port_index, fd_index, listener); -} - -/* Bind to "::" to get a port number not used by any address. */ -static grpc_error *get_unused_port(int *port) { - grpc_resolved_address wild; - grpc_sockaddr_make_wildcard6(0, &wild); - grpc_dualstack_mode dsmode; - int fd; - grpc_error *err = - grpc_create_dualstack_socket(&wild, SOCK_STREAM, 0, &dsmode, &fd); - if (err != GRPC_ERROR_NONE) { - return err; - } - if (dsmode == GRPC_DSMODE_IPV4) { - grpc_sockaddr_make_wildcard4(0, &wild); - } - if (bind(fd, (const struct sockaddr *)wild.addr, (socklen_t)wild.len) != 0) { - err = GRPC_OS_ERROR(errno, "bind"); - close(fd); - return err; - } - if (getsockname(fd, (struct sockaddr *)wild.addr, (socklen_t *)&wild.len) != - 0) { - err = GRPC_OS_ERROR(errno, "getsockname"); - close(fd); - return err; - } - close(fd); - *port = grpc_sockaddr_get_port(&wild); - return *port <= 0 ? GRPC_ERROR_CREATE("Bad port") : GRPC_ERROR_NONE; -} - -/* Return the listener in s with address addr or NULL. */ -static grpc_tcp_listener *find_listener_with_addr(grpc_tcp_server *s, - grpc_resolved_address *addr) { - grpc_tcp_listener *l; - gpr_mu_lock(&s->mu); - for (l = s->head; l != NULL; l = l->next) { - if (l->addr.len != addr->len) { - continue; - } - if (memcmp(l->addr.addr, addr->addr, addr->len) == 0) { - break; - } - } - gpr_mu_unlock(&s->mu); - return l; -} - -/* Get all addresses assigned to network interfaces on the machine and create a - listener for each. requested_port is the port to use for every listener, or 0 - to select one random port that will be used for every listener. Set *out_port - to the port selected. Return GRPC_ERROR_NONE only if all listeners were - added. */ -static grpc_error *add_all_local_addrs_to_server(grpc_tcp_server *s, - unsigned port_index, - int requested_port, - int *out_port) { - struct ifaddrs *ifa = NULL; - struct ifaddrs *ifa_it; - unsigned fd_index = 0; - grpc_tcp_listener *sp = NULL; - grpc_error *err = GRPC_ERROR_NONE; - if (requested_port == 0) { - /* Note: There could be a race where some local addrs can listen on the - selected port and some can't. The sane way to handle this would be to - retry by recreating the whole grpc_tcp_server. Backing out individual - listeners and orphaning the FDs looks like too much trouble. */ - if ((err = get_unused_port(&requested_port)) != GRPC_ERROR_NONE) { - return err; - } else if (requested_port <= 0) { - return GRPC_ERROR_CREATE("Bad get_unused_port()"); - } - gpr_log(GPR_DEBUG, "Picked unused port %d", requested_port); - } - if (getifaddrs(&ifa) != 0 || ifa == NULL) { - return GRPC_OS_ERROR(errno, "getifaddrs"); - } - for (ifa_it = ifa; ifa_it != NULL; ifa_it = ifa_it->ifa_next) { - grpc_resolved_address addr; - char *addr_str = NULL; - grpc_dualstack_mode dsmode; - grpc_tcp_listener *new_sp = NULL; - const char *ifa_name = (ifa_it->ifa_name ? ifa_it->ifa_name : "<unknown>"); - if (ifa_it->ifa_addr == NULL) { - continue; - } else if (ifa_it->ifa_addr->sa_family == AF_INET) { - addr.len = sizeof(struct sockaddr_in); - } else if (ifa_it->ifa_addr->sa_family == AF_INET6) { - addr.len = sizeof(struct sockaddr_in6); - } else { - continue; - } - memcpy(addr.addr, ifa_it->ifa_addr, addr.len); - if (!grpc_sockaddr_set_port(&addr, requested_port)) { - /* Should never happen, because we check sa_family above. */ - err = GRPC_ERROR_CREATE("Failed to set port"); - break; - } - if (grpc_sockaddr_to_string(&addr_str, &addr, 0) < 0) { - addr_str = gpr_strdup("<error>"); - } - gpr_log(GPR_DEBUG, - "Adding local addr from interface %s flags 0x%x to server: %s", - ifa_name, ifa_it->ifa_flags, addr_str); - /* We could have multiple interfaces with the same address (e.g., bonding), - so look for duplicates. */ - if (find_listener_with_addr(s, &addr) != NULL) { - gpr_log(GPR_DEBUG, "Skipping duplicate addr %s on interface %s", addr_str, - ifa_name); - gpr_free(addr_str); - continue; - } - if ((err = add_addr_to_server(s, &addr, port_index, fd_index, &dsmode, - &new_sp)) != GRPC_ERROR_NONE) { - char *err_str = NULL; - grpc_error *root_err; - if (gpr_asprintf(&err_str, "Failed to add listener: %s", addr_str) < 0) { - err_str = gpr_strdup("Failed to add listener"); - } - root_err = GRPC_ERROR_CREATE(err_str); - gpr_free(err_str); - gpr_free(addr_str); - err = grpc_error_add_child(root_err, err); - break; - } else { - GPR_ASSERT(requested_port == new_sp->port); - ++fd_index; - if (sp != NULL) { - new_sp->is_sibling = 1; - sp->sibling = new_sp; - } - sp = new_sp; - } - gpr_free(addr_str); - } - freeifaddrs(ifa); - if (err != GRPC_ERROR_NONE) { - return err; - } else if (sp == NULL) { - return GRPC_ERROR_CREATE("No local addresses"); - } else { - *out_port = sp->port; - return GRPC_ERROR_NONE; - } -} - /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */ static grpc_error *add_wildcard_addrs_to_server(grpc_tcp_server *s, unsigned port_index, @@ -697,14 +323,16 @@ static grpc_error *add_wildcard_addrs_to_server(grpc_tcp_server *s, grpc_error *v6_err = GRPC_ERROR_NONE; grpc_error *v4_err = GRPC_ERROR_NONE; *out_port = -1; - if (s->expand_wildcard_addrs) { - return add_all_local_addrs_to_server(s, port_index, requested_port, - out_port); + + if (grpc_tcp_server_have_ifaddrs() && s->expand_wildcard_addrs) { + return grpc_tcp_server_add_all_local_addrs(s, port_index, requested_port, + out_port); } + grpc_sockaddr_make_wildcards(requested_port, &wild4, &wild6); /* Try listening on IPv6 first. */ - if ((v6_err = add_addr_to_server(s, &wild6, port_index, fd_index, &dsmode, - &sp)) == GRPC_ERROR_NONE) { + if ((v6_err = grpc_tcp_server_add_addr(s, &wild6, port_index, fd_index, + &dsmode, &sp)) == GRPC_ERROR_NONE) { ++fd_index; requested_port = *out_port = sp->port; if (dsmode == GRPC_DSMODE_DUALSTACK || dsmode == GRPC_DSMODE_IPV4) { @@ -713,8 +341,8 @@ static grpc_error *add_wildcard_addrs_to_server(grpc_tcp_server *s, } /* If we got a v6-only socket or nothing, try adding 0.0.0.0. */ grpc_sockaddr_set_port(&wild4, requested_port); - if ((v4_err = add_addr_to_server(s, &wild4, port_index, fd_index, &dsmode, - &sp2)) == GRPC_ERROR_NONE) { + if ((v4_err = grpc_tcp_server_add_addr(s, &wild4, port_index, fd_index, + &dsmode, &sp2)) == GRPC_ERROR_NONE) { *out_port = sp2->port; if (sp != NULL) { sp2->is_sibling = 1; @@ -752,7 +380,7 @@ static grpc_error *clone_port(grpc_tcp_listener *listener, unsigned count) { err = grpc_create_dualstack_socket(&listener->addr, SOCK_STREAM, 0, &dsmode, &fd); if (err != GRPC_ERROR_NONE) return err; - err = prepare_socket(fd, &listener->addr, true, &port); + err = grpc_tcp_server_prepare_socket(fd, &listener->addr, true, &port); if (err != GRPC_ERROR_NONE) return err; listener->server->nports++; grpc_sockaddr_to_string(&addr_str, &listener->addr, 1); @@ -824,7 +452,7 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) { addr = &addr6_v4mapped; } - if ((err = add_addr_to_server(s, addr, port_index, 0, &dsmode, &sp)) == + if ((err = grpc_tcp_server_add_addr(s, addr, port_index, 0, &dsmode, &sp)) == GRPC_ERROR_NONE) { *out_port = sp->port; } @@ -941,6 +569,7 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { gpr_mu_lock(&s->mu); + s->shutdown_listeners = true; /* shutdown all fd's */ if (s->active_ports) { grpc_tcp_listener *sp; diff --git a/src/core/lib/iomgr/tcp_server_utils_posix.h b/src/core/lib/iomgr/tcp_server_utils_posix.h new file mode 100644 index 0000000000..f5dc8532f9 --- /dev/null +++ b/src/core/lib/iomgr/tcp_server_utils_posix.h @@ -0,0 +1,134 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_TCP_SERVER_UTILS_POSIX_H +#define GRPC_CORE_LIB_IOMGR_TCP_SERVER_UTILS_POSIX_H + +#include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/resolve_address.h" +#include "src/core/lib/iomgr/socket_utils_posix.h" +#include "src/core/lib/iomgr/tcp_server.h" + +/* one listening port */ +typedef struct grpc_tcp_listener { + int fd; + grpc_fd *emfd; + grpc_tcp_server *server; + grpc_resolved_address addr; + int port; + unsigned port_index; + unsigned fd_index; + grpc_closure read_closure; + grpc_closure destroyed_closure; + struct grpc_tcp_listener *next; + /* sibling is a linked list of all listeners for a given port. add_port and + clone_port place all new listeners in the same sibling list. A member of + the 'sibling' list is also a member of the 'next' list. The head of each + sibling list has is_sibling==0, and subsequent members of sibling lists + have is_sibling==1. is_sibling allows separate sibling lists to be + identified while iterating through 'next'. */ + struct grpc_tcp_listener *sibling; + int is_sibling; +} grpc_tcp_listener; + +/* the overall server */ +struct grpc_tcp_server { + gpr_refcount refs; + /* Called whenever accept() succeeds on a server port. */ + grpc_tcp_server_cb on_accept_cb; + void *on_accept_cb_arg; + + gpr_mu mu; + + /* active port count: how many ports are actually still listening */ + size_t active_ports; + /* destroyed port count: how many ports are completely destroyed */ + size_t destroyed_ports; + + /* is this server shutting down? */ + bool shutdown; + /* have listeners been shutdown? */ + bool shutdown_listeners; + /* use SO_REUSEPORT */ + bool so_reuseport; + /* expand wildcard addresses to a list of all local addresses */ + bool expand_wildcard_addrs; + + /* linked list of server ports */ + grpc_tcp_listener *head; + grpc_tcp_listener *tail; + unsigned nports; + + /* List of closures passed to shutdown_starting_add(). */ + grpc_closure_list shutdown_starting; + + /* shutdown callback */ + grpc_closure *shutdown_complete; + + /* all pollsets interested in new connections */ + grpc_pollset **pollsets; + /* number of pollsets in the pollsets array */ + size_t pollset_count; + + /* next pollset to assign a channel to */ + gpr_atm next_pollset_to_assign; + + grpc_resource_quota *resource_quota; +}; + +/* If successful, add a listener to \a s for \a addr, set \a dsmode for the + socket, and return the \a listener. */ +grpc_error *grpc_tcp_server_add_addr(grpc_tcp_server *s, + const grpc_resolved_address *addr, + unsigned port_index, unsigned fd_index, + grpc_dualstack_mode *dsmode, + grpc_tcp_listener **listener); + +/* Get all addresses assigned to network interfaces on the machine and create a + listener for each. requested_port is the port to use for every listener, or 0 + to select one random port that will be used for every listener. Set *out_port + to the port selected. Return GRPC_ERROR_NONE only if all listeners were + added. */ +grpc_error *grpc_tcp_server_add_all_local_addrs(grpc_tcp_server *s, + unsigned port_index, + int requested_port, + int *out_port); + +/* Prepare a recently-created socket for listening. */ +grpc_error *grpc_tcp_server_prepare_socket(int fd, + const grpc_resolved_address *addr, + bool so_reuseport, int *port); +/* Ruturn true if the platform supports ifaddrs */ +bool grpc_tcp_server_have_ifaddrs(void); + +#endif /* GRPC_CORE_LIB_IOMGR_TCP_SERVER_UTILS_POSIX_H */ diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.c b/src/core/lib/iomgr/tcp_server_utils_posix_common.c new file mode 100644 index 0000000000..e45e27d5ab --- /dev/null +++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.c @@ -0,0 +1,220 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_HAVE_IFADDRS + +#include "src/core/lib/iomgr/tcp_server_utils_posix.h" + +#include <errno.h> +#include <limits.h> +#include <stdio.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/sync.h> + +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/sockaddr.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/iomgr/unix_sockets_posix.h" + +#define MIN_SAFE_ACCEPT_QUEUE_SIZE 100 + +static gpr_once s_init_max_accept_queue_size; +static int s_max_accept_queue_size; + +/* get max listen queue size on linux */ +static void init_max_accept_queue_size(void) { + int n = SOMAXCONN; + char buf[64]; + FILE *fp = fopen("/proc/sys/net/core/somaxconn", "r"); + if (fp == NULL) { + /* 2.4 kernel. */ + s_max_accept_queue_size = SOMAXCONN; + return; + } + if (fgets(buf, sizeof buf, fp)) { + char *end; + long i = strtol(buf, &end, 10); + if (i > 0 && i <= INT_MAX && end && *end == 0) { + n = (int)i; + } + } + fclose(fp); + s_max_accept_queue_size = n; + + if (s_max_accept_queue_size < MIN_SAFE_ACCEPT_QUEUE_SIZE) { + gpr_log(GPR_INFO, + "Suspiciously small accept queue (%d) will probably lead to " + "connection drops", + s_max_accept_queue_size); + } +} + +static int get_max_accept_queue_size(void) { + gpr_once_init(&s_init_max_accept_queue_size, init_max_accept_queue_size); + return s_max_accept_queue_size; +} + +static grpc_error *add_socket_to_server(grpc_tcp_server *s, int fd, + const grpc_resolved_address *addr, + unsigned port_index, unsigned fd_index, + grpc_tcp_listener **listener) { + grpc_tcp_listener *sp = NULL; + int port = -1; + char *addr_str; + char *name; + + grpc_error *err = + grpc_tcp_server_prepare_socket(fd, addr, s->so_reuseport, &port); + if (err == GRPC_ERROR_NONE) { + GPR_ASSERT(port > 0); + grpc_sockaddr_to_string(&addr_str, addr, 1); + gpr_asprintf(&name, "tcp-server-listener:%s", addr_str); + gpr_mu_lock(&s->mu); + s->nports++; + GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server"); + sp = gpr_malloc(sizeof(grpc_tcp_listener)); + sp->next = NULL; + if (s->head == NULL) { + s->head = sp; + } else { + s->tail->next = sp; + } + s->tail = sp; + sp->server = s; + sp->fd = fd; + sp->emfd = grpc_fd_create(fd, name); + memcpy(&sp->addr, addr, sizeof(grpc_resolved_address)); + sp->port = port; + sp->port_index = port_index; + sp->fd_index = fd_index; + sp->is_sibling = 0; + sp->sibling = NULL; + GPR_ASSERT(sp->emfd); + gpr_mu_unlock(&s->mu); + gpr_free(addr_str); + gpr_free(name); + } + + *listener = sp; + return err; +} + +/* If successful, add a listener to s for addr, set *dsmode for the socket, and + return the *listener. */ +grpc_error *grpc_tcp_server_add_addr(grpc_tcp_server *s, + const grpc_resolved_address *addr, + unsigned port_index, unsigned fd_index, + grpc_dualstack_mode *dsmode, + grpc_tcp_listener **listener) { + grpc_resolved_address addr4_copy; + int fd; + grpc_error *err = + grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, dsmode, &fd); + if (err != GRPC_ERROR_NONE) { + return err; + } + if (*dsmode == GRPC_DSMODE_IPV4 && + grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) { + addr = &addr4_copy; + } + return add_socket_to_server(s, fd, addr, port_index, fd_index, listener); +} + +/* Prepare a recently-created socket for listening. */ +grpc_error *grpc_tcp_server_prepare_socket(int fd, + const grpc_resolved_address *addr, + bool so_reuseport, int *port) { + grpc_resolved_address sockname_temp; + grpc_error *err = GRPC_ERROR_NONE; + + GPR_ASSERT(fd >= 0); + + if (so_reuseport && !grpc_is_unix_socket(addr)) { + err = grpc_set_socket_reuse_port(fd, 1); + if (err != GRPC_ERROR_NONE) goto error; + } + + err = grpc_set_socket_nonblocking(fd, 1); + if (err != GRPC_ERROR_NONE) goto error; + err = grpc_set_socket_cloexec(fd, 1); + if (err != GRPC_ERROR_NONE) goto error; + if (!grpc_is_unix_socket(addr)) { + err = grpc_set_socket_low_latency(fd, 1); + if (err != GRPC_ERROR_NONE) goto error; + err = grpc_set_socket_reuse_addr(fd, 1); + if (err != GRPC_ERROR_NONE) goto error; + } + err = grpc_set_socket_no_sigpipe_if_possible(fd); + if (err != GRPC_ERROR_NONE) goto error; + + GPR_ASSERT(addr->len < ~(socklen_t)0); + if (bind(fd, (struct sockaddr *)addr->addr, (socklen_t)addr->len) < 0) { + err = GRPC_OS_ERROR(errno, "bind"); + goto error; + } + + if (listen(fd, get_max_accept_queue_size()) < 0) { + err = GRPC_OS_ERROR(errno, "listen"); + goto error; + } + + sockname_temp.len = sizeof(struct sockaddr_storage); + + if (getsockname(fd, (struct sockaddr *)sockname_temp.addr, + (socklen_t *)&sockname_temp.len) < 0) { + err = GRPC_OS_ERROR(errno, "getsockname"); + goto error; + } + + *port = grpc_sockaddr_get_port(&sockname_temp); + return GRPC_ERROR_NONE; + +error: + GPR_ASSERT(err != GRPC_ERROR_NONE); + if (fd >= 0) { + close(fd); + } + grpc_error *ret = grpc_error_set_int( + GRPC_ERROR_CREATE_REFERENCING("Unable to configure socket", &err, 1), + GRPC_ERROR_INT_FD, fd); + GRPC_ERROR_UNREF(err); + return ret; +} + +#endif /* GRPC_HAVE_IFADDRS */ diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c b/src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c new file mode 100644 index 0000000000..6354a6bdc1 --- /dev/null +++ b/src/core/lib/iomgr/tcp_server_utils_posix_ifaddrs.c @@ -0,0 +1,195 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_HAVE_IFADDRS + +#include "src/core/lib/iomgr/tcp_server_utils_posix.h" + +#include <errno.h> +#include <ifaddrs.h> +#include <stddef.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> + +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/sockaddr.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" + +/* Return the listener in s with address addr or NULL. */ +static grpc_tcp_listener *find_listener_with_addr(grpc_tcp_server *s, + grpc_resolved_address *addr) { + grpc_tcp_listener *l; + gpr_mu_lock(&s->mu); + for (l = s->head; l != NULL; l = l->next) { + if (l->addr.len != addr->len) { + continue; + } + if (memcmp(l->addr.addr, addr->addr, addr->len) == 0) { + break; + } + } + gpr_mu_unlock(&s->mu); + return l; +} + +/* Bind to "::" to get a port number not used by any address. */ +static grpc_error *get_unused_port(int *port) { + grpc_resolved_address wild; + grpc_sockaddr_make_wildcard6(0, &wild); + grpc_dualstack_mode dsmode; + int fd; + grpc_error *err = + grpc_create_dualstack_socket(&wild, SOCK_STREAM, 0, &dsmode, &fd); + if (err != GRPC_ERROR_NONE) { + return err; + } + if (dsmode == GRPC_DSMODE_IPV4) { + grpc_sockaddr_make_wildcard4(0, &wild); + } + if (bind(fd, (const struct sockaddr *)wild.addr, (socklen_t)wild.len) != 0) { + err = GRPC_OS_ERROR(errno, "bind"); + close(fd); + return err; + } + if (getsockname(fd, (struct sockaddr *)wild.addr, (socklen_t *)&wild.len) != + 0) { + err = GRPC_OS_ERROR(errno, "getsockname"); + close(fd); + return err; + } + close(fd); + *port = grpc_sockaddr_get_port(&wild); + return *port <= 0 ? GRPC_ERROR_CREATE("Bad port") : GRPC_ERROR_NONE; +} + +grpc_error *grpc_tcp_server_add_all_local_addrs(grpc_tcp_server *s, + unsigned port_index, + int requested_port, + int *out_port) { + struct ifaddrs *ifa = NULL; + struct ifaddrs *ifa_it; + unsigned fd_index = 0; + grpc_tcp_listener *sp = NULL; + grpc_error *err = GRPC_ERROR_NONE; + if (requested_port == 0) { + /* Note: There could be a race where some local addrs can listen on the + selected port and some can't. The sane way to handle this would be to + retry by recreating the whole grpc_tcp_server. Backing out individual + listeners and orphaning the FDs looks like too much trouble. */ + if ((err = get_unused_port(&requested_port)) != GRPC_ERROR_NONE) { + return err; + } else if (requested_port <= 0) { + return GRPC_ERROR_CREATE("Bad get_unused_port()"); + } + gpr_log(GPR_DEBUG, "Picked unused port %d", requested_port); + } + if (getifaddrs(&ifa) != 0 || ifa == NULL) { + return GRPC_OS_ERROR(errno, "getifaddrs"); + } + for (ifa_it = ifa; ifa_it != NULL; ifa_it = ifa_it->ifa_next) { + grpc_resolved_address addr; + char *addr_str = NULL; + grpc_dualstack_mode dsmode; + grpc_tcp_listener *new_sp = NULL; + const char *ifa_name = (ifa_it->ifa_name ? ifa_it->ifa_name : "<unknown>"); + if (ifa_it->ifa_addr == NULL) { + continue; + } else if (ifa_it->ifa_addr->sa_family == AF_INET) { + addr.len = sizeof(struct sockaddr_in); + } else if (ifa_it->ifa_addr->sa_family == AF_INET6) { + addr.len = sizeof(struct sockaddr_in6); + } else { + continue; + } + memcpy(addr.addr, ifa_it->ifa_addr, addr.len); + if (!grpc_sockaddr_set_port(&addr, requested_port)) { + /* Should never happen, because we check sa_family above. */ + err = GRPC_ERROR_CREATE("Failed to set port"); + break; + } + if (grpc_sockaddr_to_string(&addr_str, &addr, 0) < 0) { + addr_str = gpr_strdup("<error>"); + } + gpr_log(GPR_DEBUG, + "Adding local addr from interface %s flags 0x%x to server: %s", + ifa_name, ifa_it->ifa_flags, addr_str); + /* We could have multiple interfaces with the same address (e.g., bonding), + so look for duplicates. */ + if (find_listener_with_addr(s, &addr) != NULL) { + gpr_log(GPR_DEBUG, "Skipping duplicate addr %s on interface %s", addr_str, + ifa_name); + gpr_free(addr_str); + continue; + } + if ((err = grpc_tcp_server_add_addr(s, &addr, port_index, fd_index, &dsmode, + &new_sp)) != GRPC_ERROR_NONE) { + char *err_str = NULL; + grpc_error *root_err; + if (gpr_asprintf(&err_str, "Failed to add listener: %s", addr_str) < 0) { + err_str = gpr_strdup("Failed to add listener"); + } + root_err = GRPC_ERROR_CREATE(err_str); + gpr_free(err_str); + gpr_free(addr_str); + err = grpc_error_add_child(root_err, err); + break; + } else { + GPR_ASSERT(requested_port == new_sp->port); + ++fd_index; + if (sp != NULL) { + new_sp->is_sibling = 1; + sp->sibling = new_sp; + } + sp = new_sp; + } + gpr_free(addr_str); + } + freeifaddrs(ifa); + if (err != GRPC_ERROR_NONE) { + return err; + } else if (sp == NULL) { + return GRPC_ERROR_CREATE("No local addresses"); + } else { + *out_port = sp->port; + return GRPC_ERROR_NONE; + } +} + +bool grpc_tcp_server_have_ifaddrs(void) { return true; } + +#endif /* GRPC_HAVE_IFADDRS */ diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c b/src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c new file mode 100644 index 0000000000..95c3198be6 --- /dev/null +++ b/src/core/lib/iomgr/tcp_server_utils_posix_noifaddrs.c @@ -0,0 +1,49 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/port.h" + +#if defined(GRPC_POSIX_SOCKET) && !defined(GRPC_HAVE_IFADDRS) + +#include "src/core/lib/iomgr/tcp_server_utils_posix.h" + +grpc_error *grpc_tcp_server_add_all_local_addrs(grpc_tcp_server *s, + unsigned port_index, + int requested_port, + int *out_port) { + return GRPC_ERROR_CREATE("no ifaddrs available"); +} + +bool grpc_tcp_server_have_ifaddrs(void) { return false; } + +#endif /* defined(GRPC_POSIX_SOCKET) && !defined(GRPC_HAVE_IFADDRS) */ diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index 6d638bcbaa..d4df96c214 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -42,6 +42,7 @@ #include <grpc/support/useful.h> #include "src/core/lib/iomgr/time_averaged_stats.h" #include "src/core/lib/iomgr/timer_heap.h" +#include "src/core/lib/support/spinlock.h" #define INVALID_HEAP_INDEX 0xffffffffu @@ -69,7 +70,7 @@ typedef struct { /* Protects g_shard_queue */ static gpr_mu g_mu; /* Allow only one run_some_expired_timers at once */ -static gpr_mu g_checker_mu; +static gpr_spinlock g_checker_mu = GPR_SPINLOCK_STATIC_INITIALIZER; static gpr_clock_type g_clock_type; static shard_type g_shards[NUM_SHARDS]; /* Protected by g_mu */ @@ -90,7 +91,6 @@ void grpc_timer_list_init(gpr_timespec now) { g_initialized = true; gpr_mu_init(&g_mu); - gpr_mu_init(&g_checker_mu); g_clock_type = now.clock_type; for (i = 0; i < NUM_SHARDS; i++) { @@ -117,7 +117,6 @@ void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) { grpc_timer_heap_destroy(&shard->heap); } gpr_mu_destroy(&g_mu); - gpr_mu_destroy(&g_checker_mu); g_initialized = false; } @@ -324,7 +323,7 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, /* TODO(ctiller): verify that there are any timers (atomically) here */ - if (gpr_mu_trylock(&g_checker_mu)) { + if (gpr_spinlock_trylock(&g_checker_mu)) { gpr_mu_lock(&g_mu); while (gpr_time_cmp(g_shard_queue[0]->min_deadline, now) < 0) { @@ -350,7 +349,7 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, } gpr_mu_unlock(&g_mu); - gpr_mu_unlock(&g_checker_mu); + gpr_spinlock_unlock(&g_checker_mu); } else if (next != NULL) { /* TODO(ctiller): this forces calling code to do an short poll, and then retry the timer check (because this time through the timer list was diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c index 2a1c8d39fa..71e295770a 100644 --- a/src/core/lib/iomgr/udp_server.c +++ b/src/core/lib/iomgr/udp_server.c @@ -109,8 +109,8 @@ struct grpc_udp_server { grpc_pollset **pollsets; /* number of pollsets in the pollsets array */ size_t pollset_count; - /* The parent grpc server */ - grpc_server *grpc_server; + /* opaque object to pass to callbacks */ + void *user_data; }; grpc_udp_server *grpc_udp_server_create(void) { @@ -178,7 +178,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { /* Call the orphan_cb to signal that the FD is about to be closed and * should no longer be used. */ GPR_ASSERT(sp->orphan_cb); - sp->orphan_cb(exec_ctx, sp->emfd); + sp->orphan_cb(exec_ctx, sp->emfd, sp->server->user_data); grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL, "udp_listener_shutdown"); @@ -204,7 +204,7 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, if (s->active_ports) { for (sp = s->head; sp; sp = sp->next) { GPR_ASSERT(sp->orphan_cb); - sp->orphan_cb(exec_ctx, sp->emfd); + sp->orphan_cb(exec_ctx, sp->emfd, sp->server->user_data); grpc_fd_shutdown(exec_ctx, sp->emfd, GRPC_ERROR_CREATE("Server destroyed")); } @@ -299,7 +299,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { /* Tell the registered callback that data is available to read. */ GPR_ASSERT(sp->read_cb); - sp->read_cb(exec_ctx, sp->emfd, sp->server->grpc_server); + sp->read_cb(exec_ctx, sp->emfd, sp->server->user_data); /* Re-arm the notification event so we get another chance to read. */ grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); @@ -322,7 +322,7 @@ static void on_write(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { /* Tell the registered callback that the socket is writeable. */ GPR_ASSERT(sp->write_cb); - sp->write_cb(exec_ctx, sp->emfd); + sp->write_cb(exec_ctx, sp->emfd, sp->server->user_data); /* Re-arm the notification event so we get another chance to write. */ grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure); @@ -464,13 +464,13 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index) { void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, grpc_pollset **pollsets, size_t pollset_count, - grpc_server *server) { + void *user_data) { size_t i; gpr_mu_lock(&s->mu); grpc_udp_listener *sp; GPR_ASSERT(s->active_ports == 0); s->pollsets = pollsets; - s->grpc_server = server; + s->user_data = user_data; sp = s->head; while (sp != NULL) { @@ -485,7 +485,11 @@ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, grpc_schedule_on_exec_ctx); grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure); - s->active_ports++; + /* Registered for both read and write callbacks: increment active_ports + * twice to account for this, and delay free-ing of memory until both + * on_read and on_write have fired. */ + s->active_ports += 2; + sp = sp->next; } diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h index ed63fa7d81..90842a47f0 100644 --- a/src/core/lib/iomgr/udp_server.h +++ b/src/core/lib/iomgr/udp_server.h @@ -47,23 +47,23 @@ typedef struct grpc_udp_server grpc_udp_server; /* Called when data is available to read from the socket. */ typedef void (*grpc_udp_server_read_cb)(grpc_exec_ctx *exec_ctx, grpc_fd *emfd, - struct grpc_server *server); + void *user_data); /* Called when the socket is writeable. */ -typedef void (*grpc_udp_server_write_cb)(grpc_exec_ctx *exec_ctx, - grpc_fd *emfd); +typedef void (*grpc_udp_server_write_cb)(grpc_exec_ctx *exec_ctx, grpc_fd *emfd, + void *user_data); /* Called when the grpc_fd is about to be orphaned (and the FD closed). */ typedef void (*grpc_udp_server_orphan_cb)(grpc_exec_ctx *exec_ctx, - grpc_fd *emfd); + grpc_fd *emfd, void *user_data); /* Create a server, initially not bound to any ports */ grpc_udp_server *grpc_udp_server_create(void); -/* Start listening to bound ports */ +/* Start listening to bound ports. user_data is passed to callbacks. */ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *udp_server, grpc_pollset **pollsets, size_t pollset_count, - struct grpc_server *server); + void *user_data); int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index); diff --git a/src/core/lib/iomgr/wakeup_fd_posix.h b/src/core/lib/iomgr/wakeup_fd_posix.h index 71d32d97ba..c8dd242c75 100644 --- a/src/core/lib/iomgr/wakeup_fd_posix.h +++ b/src/core/lib/iomgr/wakeup_fd_posix.h @@ -46,7 +46,7 @@ * * Setup: * 1. Before calling anything, call global_init() at least once. - * 1. Call grpc_wakeup_fd_create() to get a wakeup_fd. + * 1. Call grpc_wakeup_fd_init() to set up a wakeup_fd. * 2. Add the result of GRPC_WAKEUP_FD_FD to the set of monitored file * descriptors for the poll() style API you are using. Monitor the file * descriptor for readability. diff --git a/src/core/lib/json/json.c b/src/core/lib/json/json.c index 48b13686d7..5f7e4ec042 100644 --- a/src/core/lib/json/json.c +++ b/src/core/lib/json/json.c @@ -38,8 +38,7 @@ #include "src/core/lib/json/json.h" grpc_json* grpc_json_create(grpc_json_type type) { - grpc_json* json = gpr_malloc(sizeof(*json)); - memset(json, 0, sizeof(*json)); + grpc_json* json = gpr_zalloc(sizeof(*json)); json->type = type; return json; diff --git a/src/core/lib/security/context/security_context.c b/src/core/lib/security/context/security_context.c index fe82fabc31..d29fc55ea0 100644 --- a/src/core/lib/security/context/security_context.c +++ b/src/core/lib/security/context/security_context.c @@ -91,10 +91,7 @@ void grpc_auth_context_release(grpc_auth_context *context) { /* --- grpc_client_security_context --- */ grpc_client_security_context *grpc_client_security_context_create(void) { - grpc_client_security_context *ctx = - gpr_malloc(sizeof(grpc_client_security_context)); - memset(ctx, 0, sizeof(grpc_client_security_context)); - return ctx; + return gpr_zalloc(sizeof(grpc_client_security_context)); } void grpc_client_security_context_destroy(void *ctx) { @@ -112,10 +109,7 @@ void grpc_client_security_context_destroy(void *ctx) { /* --- grpc_server_security_context --- */ grpc_server_security_context *grpc_server_security_context_create(void) { - grpc_server_security_context *ctx = - gpr_malloc(sizeof(grpc_server_security_context)); - memset(ctx, 0, sizeof(grpc_server_security_context)); - return ctx; + return gpr_zalloc(sizeof(grpc_server_security_context)); } void grpc_server_security_context_destroy(void *ctx) { @@ -132,8 +126,7 @@ void grpc_server_security_context_destroy(void *ctx) { static grpc_auth_property_iterator empty_iterator = {NULL, 0, NULL}; grpc_auth_context *grpc_auth_context_create(grpc_auth_context *chained) { - grpc_auth_context *ctx = gpr_malloc(sizeof(grpc_auth_context)); - memset(ctx, 0, sizeof(grpc_auth_context)); + grpc_auth_context *ctx = gpr_zalloc(sizeof(grpc_auth_context)); gpr_ref_init(&ctx->refcount, 1); if (chained != NULL) { ctx->chained = GRPC_AUTH_CONTEXT_REF(chained, "chained"); diff --git a/src/core/lib/security/credentials/composite/composite_credentials.c b/src/core/lib/security/credentials/composite/composite_credentials.c index be1588dd8b..e497b9f657 100644 --- a/src/core/lib/security/credentials/composite/composite_credentials.c +++ b/src/core/lib/security/credentials/composite/composite_credentials.c @@ -115,8 +115,7 @@ static void composite_call_get_request_metadata( grpc_composite_call_credentials *c = (grpc_composite_call_credentials *)creds; grpc_composite_call_credentials_metadata_context *ctx; - ctx = gpr_malloc(sizeof(grpc_composite_call_credentials_metadata_context)); - memset(ctx, 0, sizeof(grpc_composite_call_credentials_metadata_context)); + ctx = gpr_zalloc(sizeof(grpc_composite_call_credentials_metadata_context)); ctx->auth_md_context = auth_md_context; ctx->user_data = user_data; ctx->cb = cb; @@ -158,8 +157,7 @@ grpc_call_credentials *grpc_composite_call_credentials_create( GPR_ASSERT(reserved == NULL); GPR_ASSERT(creds1 != NULL); GPR_ASSERT(creds2 != NULL); - c = gpr_malloc(sizeof(grpc_composite_call_credentials)); - memset(c, 0, sizeof(grpc_composite_call_credentials)); + c = gpr_zalloc(sizeof(grpc_composite_call_credentials)); c->base.type = GRPC_CALL_CREDENTIALS_TYPE_COMPOSITE; c->base.vtable = &composite_call_credentials_vtable; gpr_ref_init(&c->base.refcount, 1); @@ -167,8 +165,7 @@ grpc_call_credentials *grpc_composite_call_credentials_create( creds2_array = get_creds_array(&creds2); c->inner.num_creds = creds1_array.num_creds + creds2_array.num_creds; creds_array_byte_size = c->inner.num_creds * sizeof(grpc_call_credentials *); - c->inner.creds_array = gpr_malloc(creds_array_byte_size); - memset(c->inner.creds_array, 0, creds_array_byte_size); + c->inner.creds_array = gpr_zalloc(creds_array_byte_size); for (i = 0; i < creds1_array.num_creds; i++) { grpc_call_credentials *cur_creds = creds1_array.creds_array[i]; c->inner.creds_array[i] = grpc_call_credentials_ref(cur_creds); @@ -262,8 +259,7 @@ static grpc_channel_credentials_vtable composite_channel_credentials_vtable = { grpc_channel_credentials *grpc_composite_channel_credentials_create( grpc_channel_credentials *channel_creds, grpc_call_credentials *call_creds, void *reserved) { - grpc_composite_channel_credentials *c = gpr_malloc(sizeof(*c)); - memset(c, 0, sizeof(*c)); + grpc_composite_channel_credentials *c = gpr_zalloc(sizeof(*c)); GPR_ASSERT(channel_creds != NULL && call_creds != NULL && reserved == NULL); GRPC_API_TRACE( "grpc_composite_channel_credentials_create(channel_creds=%p, " diff --git a/src/core/lib/security/credentials/credentials.c b/src/core/lib/security/credentials/credentials.c index b24697ce54..52b80141d1 100644 --- a/src/core/lib/security/credentials/credentials.c +++ b/src/core/lib/security/credentials/credentials.c @@ -57,8 +57,7 @@ grpc_credentials_metadata_request *grpc_credentials_metadata_request_create( grpc_call_credentials *creds, grpc_credentials_metadata_cb cb, void *user_data) { grpc_credentials_metadata_request *r = - gpr_malloc(sizeof(grpc_credentials_metadata_request)); - memset(&r->response, 0, sizeof(r->response)); + gpr_zalloc(sizeof(grpc_credentials_metadata_request)); r->creds = grpc_call_credentials_ref(creds); r->cb = cb; r->user_data = user_data; diff --git a/src/core/lib/security/credentials/credentials_metadata.c b/src/core/lib/security/credentials/credentials_metadata.c index 68da5fb4a8..f11cc58786 100644 --- a/src/core/lib/security/credentials/credentials_metadata.c +++ b/src/core/lib/security/credentials/credentials_metadata.c @@ -50,8 +50,7 @@ static void store_ensure_capacity(grpc_credentials_md_store *store) { grpc_credentials_md_store *grpc_credentials_md_store_create( size_t initial_capacity) { grpc_credentials_md_store *store = - gpr_malloc(sizeof(grpc_credentials_md_store)); - memset(store, 0, sizeof(grpc_credentials_md_store)); + gpr_zalloc(sizeof(grpc_credentials_md_store)); if (initial_capacity > 0) { store->entries = gpr_malloc(initial_capacity * sizeof(grpc_credentials_md)); store->allocated = initial_capacity; diff --git a/src/core/lib/security/credentials/fake/fake_credentials.c b/src/core/lib/security/credentials/fake/fake_credentials.c index a0629f76ce..68636ba208 100644 --- a/src/core/lib/security/credentials/fake/fake_credentials.c +++ b/src/core/lib/security/credentials/fake/fake_credentials.c @@ -71,8 +71,7 @@ static grpc_server_credentials_vtable grpc_channel_credentials *grpc_fake_transport_security_credentials_create( void) { - grpc_channel_credentials *c = gpr_malloc(sizeof(grpc_channel_credentials)); - memset(c, 0, sizeof(grpc_channel_credentials)); + grpc_channel_credentials *c = gpr_zalloc(sizeof(grpc_channel_credentials)); c->type = GRPC_CHANNEL_CREDENTIALS_TYPE_FAKE_TRANSPORT_SECURITY; c->vtable = &fake_transport_security_credentials_vtable; gpr_ref_init(&c->refcount, 1); @@ -131,8 +130,7 @@ static grpc_call_credentials_vtable md_only_test_vtable = { grpc_call_credentials *grpc_md_only_test_credentials_create( const char *md_key, const char *md_value, int is_async) { grpc_md_only_test_credentials *c = - gpr_malloc(sizeof(grpc_md_only_test_credentials)); - memset(c, 0, sizeof(grpc_md_only_test_credentials)); + gpr_zalloc(sizeof(grpc_md_only_test_credentials)); c->base.type = GRPC_CALL_CREDENTIALS_TYPE_OAUTH2; c->base.vtable = &md_only_test_vtable; gpr_ref_init(&c->base.refcount, 1); diff --git a/src/core/lib/security/credentials/google_default/google_default_credentials.c b/src/core/lib/security/credentials/google_default/google_default_credentials.c index ecd26de9fa..dd44621347 100644 --- a/src/core/lib/security/credentials/google_default/google_default_credentials.c +++ b/src/core/lib/security/credentials/google_default/google_default_credentials.c @@ -112,7 +112,7 @@ static int is_stack_running_on_compute_engine(grpc_exec_ctx *exec_ctx) { on compute engine. */ gpr_timespec max_detection_delay = gpr_time_from_seconds(1, GPR_TIMESPAN); - grpc_pollset *pollset = gpr_malloc(grpc_pollset_size()); + grpc_pollset *pollset = gpr_zalloc(grpc_pollset_size()); grpc_pollset_init(pollset, &g_polling_mu); detector.pollent = grpc_polling_entity_create_from_pollset(pollset); detector.is_done = 0; diff --git a/src/core/lib/security/credentials/iam/iam_credentials.c b/src/core/lib/security/credentials/iam/iam_credentials.c index abd69a9670..393a433f2e 100644 --- a/src/core/lib/security/credentials/iam/iam_credentials.c +++ b/src/core/lib/security/credentials/iam/iam_credentials.c @@ -72,8 +72,7 @@ grpc_call_credentials *grpc_google_iam_credentials_create( GPR_ASSERT(reserved == NULL); GPR_ASSERT(token != NULL); GPR_ASSERT(authority_selector != NULL); - c = gpr_malloc(sizeof(grpc_google_iam_credentials)); - memset(c, 0, sizeof(grpc_google_iam_credentials)); + c = gpr_zalloc(sizeof(grpc_google_iam_credentials)); c->base.type = GRPC_CALL_CREDENTIALS_TYPE_IAM; c->base.vtable = &iam_vtable; gpr_ref_init(&c->base.refcount, 1); diff --git a/src/core/lib/security/credentials/jwt/jwt_credentials.c b/src/core/lib/security/credentials/jwt/jwt_credentials.c index 616be64a54..178ce89aa6 100644 --- a/src/core/lib/security/credentials/jwt/jwt_credentials.c +++ b/src/core/lib/security/credentials/jwt/jwt_credentials.c @@ -135,8 +135,7 @@ grpc_service_account_jwt_access_credentials_create_from_auth_json_key( gpr_log(GPR_ERROR, "Invalid input for jwt credentials creation"); return NULL; } - c = gpr_malloc(sizeof(grpc_service_account_jwt_access_credentials)); - memset(c, 0, sizeof(grpc_service_account_jwt_access_credentials)); + c = gpr_zalloc(sizeof(grpc_service_account_jwt_access_credentials)); c->base.type = GRPC_CALL_CREDENTIALS_TYPE_JWT; gpr_ref_init(&c->base.refcount, 1); c->base.vtable = &jwt_vtable; diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.c b/src/core/lib/security/credentials/jwt/jwt_verifier.c index f128177e8c..5c59cf0f4a 100644 --- a/src/core/lib/security/credentials/jwt/jwt_verifier.c +++ b/src/core/lib/security/credentials/jwt/jwt_verifier.c @@ -144,8 +144,7 @@ static void jose_header_destroy(grpc_exec_ctx *exec_ctx, jose_header *h) { static jose_header *jose_header_from_json(grpc_exec_ctx *exec_ctx, grpc_json *json, grpc_slice buffer) { grpc_json *cur; - jose_header *h = gpr_malloc(sizeof(jose_header)); - memset(h, 0, sizeof(jose_header)); + jose_header *h = gpr_zalloc(sizeof(jose_header)); h->buffer = buffer; for (cur = json->child; cur != NULL; cur = cur->next) { if (strcmp(cur->key, "alg") == 0) { @@ -363,8 +362,7 @@ static verifier_cb_ctx *verifier_cb_ctx_create( const char *signed_jwt, size_t signed_jwt_len, void *user_data, grpc_jwt_verification_done_cb cb) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - verifier_cb_ctx *ctx = gpr_malloc(sizeof(verifier_cb_ctx)); - memset(ctx, 0, sizeof(verifier_cb_ctx)); + verifier_cb_ctx *ctx = gpr_zalloc(sizeof(verifier_cb_ctx)); ctx->verifier = verifier; ctx->pollent = grpc_polling_entity_create_from_pollset(pollset); ctx->header = header; @@ -878,8 +876,7 @@ error: grpc_jwt_verifier *grpc_jwt_verifier_create( const grpc_jwt_verifier_email_domain_key_url_mapping *mappings, size_t num_mappings) { - grpc_jwt_verifier *v = gpr_malloc(sizeof(grpc_jwt_verifier)); - memset(v, 0, sizeof(grpc_jwt_verifier)); + grpc_jwt_verifier *v = gpr_zalloc(sizeof(grpc_jwt_verifier)); grpc_httpcli_context_init(&v->http_ctx); /* We know at least of one mapping. */ diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c index c0f260f938..ccfb3566c1 100644 --- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c +++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c @@ -389,8 +389,7 @@ grpc_refresh_token_credentials_create_from_auth_refresh_token( gpr_log(GPR_ERROR, "Invalid input for refresh token credentials creation"); return NULL; } - c = gpr_malloc(sizeof(grpc_google_refresh_token_credentials)); - memset(c, 0, sizeof(grpc_google_refresh_token_credentials)); + c = gpr_zalloc(sizeof(grpc_google_refresh_token_credentials)); init_oauth2_token_fetcher(&c->base, refresh_token_fetch_oauth2); c->base.base.vtable = &refresh_token_vtable; c->refresh_token = refresh_token; @@ -450,14 +449,13 @@ static grpc_call_credentials_vtable access_token_vtable = { grpc_call_credentials *grpc_access_token_credentials_create( const char *access_token, void *reserved) { grpc_access_token_credentials *c = - gpr_malloc(sizeof(grpc_access_token_credentials)); + gpr_zalloc(sizeof(grpc_access_token_credentials)); char *token_md_value; GRPC_API_TRACE( "grpc_access_token_credentials_create(access_token=<redacted>, " "reserved=%p)", 1, (reserved)); GPR_ASSERT(reserved == NULL); - memset(c, 0, sizeof(grpc_access_token_credentials)); c->base.type = GRPC_CALL_CREDENTIALS_TYPE_OAUTH2; c->base.vtable = &access_token_vtable; gpr_ref_init(&c->base.refcount, 1); diff --git a/src/core/lib/security/credentials/plugin/plugin_credentials.c b/src/core/lib/security/credentials/plugin/plugin_credentials.c index 7bc5dfb403..8416bfa5a8 100644 --- a/src/core/lib/security/credentials/plugin/plugin_credentials.c +++ b/src/core/lib/security/credentials/plugin/plugin_credentials.c @@ -126,8 +126,7 @@ static void plugin_get_request_metadata(grpc_exec_ctx *exec_ctx, void *user_data) { grpc_plugin_credentials *c = (grpc_plugin_credentials *)creds; if (c->plugin.get_metadata != NULL) { - grpc_metadata_plugin_request *request = gpr_malloc(sizeof(*request)); - memset(request, 0, sizeof(*request)); + grpc_metadata_plugin_request *request = gpr_zalloc(sizeof(*request)); request->user_data = user_data; request->cb = cb; c->plugin.get_metadata(c->plugin.state, context, @@ -142,11 +141,10 @@ static grpc_call_credentials_vtable plugin_vtable = { grpc_call_credentials *grpc_metadata_credentials_create_from_plugin( grpc_metadata_credentials_plugin plugin, void *reserved) { - grpc_plugin_credentials *c = gpr_malloc(sizeof(*c)); + grpc_plugin_credentials *c = gpr_zalloc(sizeof(*c)); GRPC_API_TRACE("grpc_metadata_credentials_create_from_plugin(reserved=%p)", 1, (reserved)); GPR_ASSERT(reserved == NULL); - memset(c, 0, sizeof(*c)); c->base.type = plugin.type; c->base.vtable = &plugin_vtable; gpr_ref_init(&c->base.refcount, 1); diff --git a/src/core/lib/security/credentials/ssl/ssl_credentials.c b/src/core/lib/security/credentials/ssl/ssl_credentials.c index 4eebb7d613..4b17ac8098 100644 --- a/src/core/lib/security/credentials/ssl/ssl_credentials.c +++ b/src/core/lib/security/credentials/ssl/ssl_credentials.c @@ -121,14 +121,13 @@ static void ssl_build_config(const char *pem_root_certs, grpc_channel_credentials *grpc_ssl_credentials_create( const char *pem_root_certs, grpc_ssl_pem_key_cert_pair *pem_key_cert_pair, void *reserved) { - grpc_ssl_credentials *c = gpr_malloc(sizeof(grpc_ssl_credentials)); + grpc_ssl_credentials *c = gpr_zalloc(sizeof(grpc_ssl_credentials)); GRPC_API_TRACE( "grpc_ssl_credentials_create(pem_root_certs=%s, " "pem_key_cert_pair=%p, " "reserved=%p)", 3, (pem_root_certs, pem_key_cert_pair, reserved)); GPR_ASSERT(reserved == NULL); - memset(c, 0, sizeof(grpc_ssl_credentials)); c->base.type = GRPC_CHANNEL_CREDENTIALS_TYPE_SSL; c->base.vtable = &ssl_vtable; gpr_ref_init(&c->base.refcount, 1); @@ -225,7 +224,7 @@ grpc_server_credentials *grpc_ssl_server_credentials_create_ex( grpc_ssl_client_certificate_request_type client_certificate_request, void *reserved) { grpc_ssl_server_credentials *c = - gpr_malloc(sizeof(grpc_ssl_server_credentials)); + gpr_zalloc(sizeof(grpc_ssl_server_credentials)); GRPC_API_TRACE( "grpc_ssl_server_credentials_create_ex(" "pem_root_certs=%s, pem_key_cert_pairs=%p, num_key_cert_pairs=%lu, " @@ -233,7 +232,6 @@ grpc_server_credentials *grpc_ssl_server_credentials_create_ex( 5, (pem_root_certs, pem_key_cert_pairs, (unsigned long)num_key_cert_pairs, client_certificate_request, reserved)); GPR_ASSERT(reserved == NULL); - memset(c, 0, sizeof(grpc_ssl_server_credentials)); c->base.type = GRPC_CHANNEL_CREDENTIALS_TYPE_SSL; gpr_ref_init(&c->base.refcount, 1); c->base.vtable = &ssl_server_vtable; diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c index a23082a866..8dea1d98ff 100644 --- a/src/core/lib/security/transport/client_auth_filter.c +++ b/src/core/lib/security/transport/client_auth_filter.c @@ -318,7 +318,7 @@ static void set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, /* Destructor for call_data */ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) { + grpc_closure *ignored) { call_data *calld = elem->call_data; grpc_call_credentials_unref(exec_ctx, calld->creds); if (calld->have_host) { diff --git a/src/core/lib/security/transport/security_connector.c b/src/core/lib/security/transport/security_connector.c index aeb04e33a3..ad083a730f 100644 --- a/src/core/lib/security/transport/security_connector.c +++ b/src/core/lib/security/transport/security_connector.c @@ -412,8 +412,7 @@ static grpc_security_connector_vtable fake_server_vtable = { grpc_channel_security_connector *grpc_fake_channel_security_connector_create( grpc_call_credentials *request_metadata_creds, const char *target, const grpc_channel_args *args) { - grpc_fake_channel_security_connector *c = gpr_malloc(sizeof(*c)); - memset(c, 0, sizeof(*c)); + grpc_fake_channel_security_connector *c = gpr_zalloc(sizeof(*c)); gpr_ref_init(&c->base.base.refcount, 1); c->base.base.url_scheme = GRPC_FAKE_SECURITY_URL_SCHEME; c->base.base.vtable = &fake_channel_vtable; @@ -435,8 +434,7 @@ grpc_channel_security_connector *grpc_fake_channel_security_connector_create( grpc_server_security_connector *grpc_fake_server_security_connector_create( void) { grpc_server_security_connector *c = - gpr_malloc(sizeof(grpc_server_security_connector)); - memset(c, 0, sizeof(*c)); + gpr_zalloc(sizeof(grpc_server_security_connector)); gpr_ref_init(&c->base.refcount, 1); c->base.vtable = &fake_server_vtable; c->base.url_scheme = GRPC_FAKE_SECURITY_URL_SCHEME; @@ -815,8 +813,7 @@ grpc_security_status grpc_ssl_channel_security_connector_create( pem_root_certs_size = config->pem_root_certs_size; } - c = gpr_malloc(sizeof(grpc_ssl_channel_security_connector)); - memset(c, 0, sizeof(grpc_ssl_channel_security_connector)); + c = gpr_zalloc(sizeof(grpc_ssl_channel_security_connector)); gpr_ref_init(&c->base.base.refcount, 1); c->base.base.vtable = &ssl_channel_vtable; @@ -877,8 +874,7 @@ grpc_security_status grpc_ssl_server_security_connector_create( gpr_log(GPR_ERROR, "An SSL server needs a key and a cert."); goto error; } - c = gpr_malloc(sizeof(grpc_ssl_server_security_connector)); - memset(c, 0, sizeof(grpc_ssl_server_security_connector)); + c = gpr_zalloc(sizeof(grpc_ssl_server_security_connector)); gpr_ref_init(&c->base.base.refcount, 1); c->base.base.url_scheme = GRPC_SSL_URL_SCHEME; diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c index 5d57543ac5..7065d261ba 100644 --- a/src/core/lib/security/transport/security_handshaker.c +++ b/src/core/lib/security/transport/security_handshaker.c @@ -387,8 +387,7 @@ static const grpc_handshaker_vtable security_handshaker_vtable = { static grpc_handshaker *security_handshaker_create( grpc_exec_ctx *exec_ctx, tsi_handshaker *handshaker, grpc_security_connector *connector) { - security_handshaker *h = gpr_malloc(sizeof(security_handshaker)); - memset(h, 0, sizeof(security_handshaker)); + security_handshaker *h = gpr_zalloc(sizeof(security_handshaker)); grpc_handshaker_init(&security_handshaker_vtable, &h->base); h->handshaker = handshaker; h->connector = GRPC_SECURITY_CONNECTOR_REF(connector, "handshake"); diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c index 14619d97ca..01cb473177 100644 --- a/src/core/lib/security/transport/server_auth_filter.c +++ b/src/core/lib/security/transport/server_auth_filter.c @@ -227,7 +227,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, /* Destructor for call_data */ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) {} + grpc_closure *ignored) {} /* Constructor for channel_data */ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/slice/slice_hash_table.c b/src/core/lib/slice/slice_hash_table.c index 46f807f4a5..219567f36f 100644 --- a/src/core/lib/slice/slice_hash_table.c +++ b/src/core/lib/slice/slice_hash_table.c @@ -82,15 +82,13 @@ static void grpc_slice_hash_table_add( grpc_slice_hash_table* grpc_slice_hash_table_create( size_t num_entries, grpc_slice_hash_table_entry* entries) { - grpc_slice_hash_table* table = gpr_malloc(sizeof(*table)); - memset(table, 0, sizeof(*table)); + grpc_slice_hash_table* table = gpr_zalloc(sizeof(*table)); gpr_ref_init(&table->refs, 1); // Quadratic probing gets best performance when the table is no more // than half full. table->size = num_entries * 2; const size_t entry_size = sizeof(grpc_slice_hash_table_entry) * table->size; - table->entries = gpr_malloc(entry_size); - memset(table->entries, 0, entry_size); + table->entries = gpr_zalloc(entry_size); for (size_t i = 0; i < num_entries; ++i) { grpc_slice_hash_table_entry* entry = &entries[i]; grpc_slice_hash_table_add(table, entry->key, entry->value, entry->vtable); diff --git a/src/core/lib/slice/slice_intern.c b/src/core/lib/slice/slice_intern.c index 32adc4df97..1c00a2d88e 100644 --- a/src/core/lib/slice/slice_intern.c +++ b/src/core/lib/slice/slice_intern.c @@ -144,8 +144,7 @@ static void grow_shard(slice_shard *shard) { GPR_TIMER_BEGIN("grow_strtab", 0); - strtab = gpr_malloc(sizeof(interned_slice_refcount *) * capacity); - memset(strtab, 0, sizeof(interned_slice_refcount *) * capacity); + strtab = gpr_zalloc(sizeof(interned_slice_refcount *) * capacity); for (i = 0; i < shard->capacity; i++) { for (s = shard->strs[i]; s; s = next) { @@ -296,8 +295,7 @@ void grpc_slice_intern_init(void) { gpr_mu_init(&shard->mu); shard->count = 0; shard->capacity = INITIAL_SHARD_CAPACITY; - shard->strs = gpr_malloc(sizeof(*shard->strs) * shard->capacity); - memset(shard->strs, 0, sizeof(*shard->strs) * shard->capacity); + shard->strs = gpr_zalloc(sizeof(*shard->strs) * shard->capacity); } for (size_t i = 0; i < GPR_ARRAY_SIZE(static_metadata_hash); i++) { static_metadata_hash[i].hash = 0; diff --git a/src/core/lib/support/alloc.c b/src/core/lib/support/alloc.c index 618c3f6acd..9973166217 100644 --- a/src/core/lib/support/alloc.c +++ b/src/core/lib/support/alloc.c @@ -36,9 +36,19 @@ #include <grpc/support/log.h> #include <grpc/support/port_platform.h> #include <stdlib.h> +#include <string.h> #include "src/core/lib/profiling/timers.h" -static gpr_allocation_functions g_alloc_functions = {malloc, realloc, free}; +static void *zalloc_with_calloc(size_t sz) { return calloc(sz, 1); } + +static void *zalloc_with_gpr_malloc(size_t sz) { + void *p = gpr_malloc(sz); + memset(p, 0, sz); + return p; +} + +static gpr_allocation_functions g_alloc_functions = {malloc, zalloc_with_calloc, + realloc, free}; gpr_allocation_functions gpr_get_allocation_functions() { return g_alloc_functions; @@ -48,6 +58,9 @@ void gpr_set_allocation_functions(gpr_allocation_functions functions) { GPR_ASSERT(functions.malloc_fn != NULL); GPR_ASSERT(functions.realloc_fn != NULL); GPR_ASSERT(functions.free_fn != NULL); + if (functions.zalloc_fn == NULL) { + functions.zalloc_fn = zalloc_with_gpr_malloc; + } g_alloc_functions = functions; } @@ -63,6 +76,18 @@ void *gpr_malloc(size_t size) { return p; } +void *gpr_zalloc(size_t size) { + void *p; + if (size == 0) return NULL; + GPR_TIMER_BEGIN("gpr_zalloc", 0); + p = g_alloc_functions.zalloc_fn(size); + if (!p) { + abort(); + } + GPR_TIMER_END("gpr_zalloc", 0); + return p; +} + void gpr_free(void *p) { GPR_TIMER_BEGIN("gpr_free", 0); g_alloc_functions.free_fn(p); diff --git a/src/core/lib/support/arena.c b/src/core/lib/support/arena.c new file mode 100644 index 0000000000..7bcb983f24 --- /dev/null +++ b/src/core/lib/support/arena.c @@ -0,0 +1,98 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/support/arena.h" +#include <grpc/support/alloc.h> +#include <grpc/support/atm.h> +#include <grpc/support/log.h> +#include <grpc/support/useful.h> + +#define ROUND_UP_TO_ALIGNMENT_SIZE(x) \ + (((x) + GPR_MAX_ALIGNMENT - 1u) & ~(GPR_MAX_ALIGNMENT - 1u)) + +typedef struct zone { + size_t size_begin; + size_t size_end; + gpr_atm next_atm; +} zone; + +struct gpr_arena { + gpr_atm size_so_far; + zone initial_zone; +}; + +gpr_arena *gpr_arena_create(size_t initial_size) { + initial_size = ROUND_UP_TO_ALIGNMENT_SIZE(initial_size); + gpr_arena *a = gpr_zalloc(sizeof(gpr_arena) + initial_size); + a->initial_zone.size_end = initial_size; + return a; +} + +size_t gpr_arena_destroy(gpr_arena *arena) { + gpr_atm size = gpr_atm_no_barrier_load(&arena->size_so_far); + zone *z = (zone *)gpr_atm_no_barrier_load(&arena->initial_zone.next_atm); + gpr_free(arena); + while (z) { + zone *next_z = (zone *)gpr_atm_no_barrier_load(&z->next_atm); + gpr_free(z); + z = next_z; + } + return (size_t)size; +} + +void *gpr_arena_alloc(gpr_arena *arena, size_t size) { + size = ROUND_UP_TO_ALIGNMENT_SIZE(size); + size_t start = + (size_t)gpr_atm_no_barrier_fetch_add(&arena->size_so_far, size); + zone *z = &arena->initial_zone; + while (start > z->size_end) { + zone *next_z = (zone *)gpr_atm_acq_load(&z->next_atm); + if (next_z == NULL) { + size_t next_z_size = (size_t)gpr_atm_no_barrier_load(&arena->size_so_far); + next_z = gpr_zalloc(sizeof(zone) + next_z_size); + next_z->size_begin = z->size_end; + next_z->size_end = z->size_end + next_z_size; + if (!gpr_atm_rel_cas(&z->next_atm, (gpr_atm)NULL, (gpr_atm)next_z)) { + gpr_free(next_z); + next_z = (zone *)gpr_atm_acq_load(&z->next_atm); + } + } + z = next_z; + } + if (start + size > z->size_end) { + return gpr_arena_alloc(arena, size); + } + GPR_ASSERT(start >= z->size_begin); + GPR_ASSERT(start + size <= z->size_end); + return ((char *)(z + 1)) + start - z->size_begin; +} diff --git a/src/core/lib/support/arena.h b/src/core/lib/support/arena.h new file mode 100644 index 0000000000..c28033ffc3 --- /dev/null +++ b/src/core/lib/support/arena.h @@ -0,0 +1,54 @@ +/* + * + * Copyright 2017, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +// \file Arena based allocator +// Allows very fast allocation of memory, but that memory cannot be freed until +// the arena as a whole is freed +// Tracks the total memory allocated against it, so that future arenas can +// pre-allocate the right amount of memory + +#ifndef GRPC_CORE_LIB_SUPPORT_ARENA_H +#define GRPC_CORE_LIB_SUPPORT_ARENA_H + +#include <stddef.h> + +typedef struct gpr_arena gpr_arena; + +// Create an arena, with \a initial_size bytes in the first allocated buffer +gpr_arena *gpr_arena_create(size_t initial_size); +// Allocate \a size bytes from the arena +void *gpr_arena_alloc(gpr_arena *arena, size_t size); +// Destroy an arena, returning the total number of bytes allocated +size_t gpr_arena_destroy(gpr_arena *arena); + +#endif /* GRPC_CORE_LIB_SUPPORT_ARENA_H */ diff --git a/src/core/lib/support/cmdline.c b/src/core/lib/support/cmdline.c index d47498676d..88a65a8e2e 100644 --- a/src/core/lib/support/cmdline.c +++ b/src/core/lib/support/cmdline.c @@ -71,8 +71,7 @@ struct gpr_cmdline { static int normal_state(gpr_cmdline *cl, char *arg); gpr_cmdline *gpr_cmdline_create(const char *description) { - gpr_cmdline *cl = gpr_malloc(sizeof(gpr_cmdline)); - memset(cl, 0, sizeof(gpr_cmdline)); + gpr_cmdline *cl = gpr_zalloc(sizeof(gpr_cmdline)); cl->description = description; cl->state = normal_state; @@ -101,8 +100,7 @@ static void add_arg(gpr_cmdline *cl, const char *name, const char *help, GPR_ASSERT(0 != strcmp(a->name, name)); } - a = gpr_malloc(sizeof(arg)); - memset(a, 0, sizeof(arg)); + a = gpr_zalloc(sizeof(arg)); a->name = name; a->help = help; a->type = type; diff --git a/src/core/lib/support/histogram.c b/src/core/lib/support/histogram.c index 7b016bbc78..ba8176bb05 100644 --- a/src/core/lib/support/histogram.c +++ b/src/core/lib/support/histogram.c @@ -102,8 +102,7 @@ gpr_histogram *gpr_histogram_create(double resolution, h->num_buckets = bucket_for_unchecked(h, max_bucket_start) + 1; GPR_ASSERT(h->num_buckets > 1); GPR_ASSERT(h->num_buckets < 100000000); - h->buckets = gpr_malloc(sizeof(uint32_t) * h->num_buckets); - memset(h->buckets, 0, sizeof(uint32_t) * h->num_buckets); + h->buckets = gpr_zalloc(sizeof(uint32_t) * h->num_buckets); return h; } diff --git a/src/core/lib/support/spinlock.h b/src/core/lib/support/spinlock.h new file mode 100644 index 0000000000..d8c7c5ffde --- /dev/null +++ b/src/core/lib/support/spinlock.h @@ -0,0 +1,52 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_SUPPORT_SPINLOCK_H +#define GRPC_CORE_LIB_SUPPORT_SPINLOCK_H + +#include <grpc/support/atm.h> + +/* Simple spinlock. No backoff strategy, gpr_spinlock_lock is almost always + a concurrency code smell. */ +typedef struct { gpr_atm atm; } gpr_spinlock; + +#define GPR_SPINLOCK_INITIALIZER ((gpr_spinlock){0}) +#define GPR_SPINLOCK_STATIC_INITIALIZER \ + { 0 } +#define gpr_spinlock_trylock(lock) (gpr_atm_acq_cas(&(lock)->atm, 0, 1)) +#define gpr_spinlock_unlock(lock) (gpr_atm_rel_store(&(lock)->atm, 0)) +#define gpr_spinlock_lock(lock) \ + do { \ + } while (!gpr_spinlock_trylock((lock))) + +#endif /* GRPC_CORE_LIB_SUPPORT_SPINLOCK_H */ diff --git a/src/core/lib/support/subprocess_posix.c b/src/core/lib/support/subprocess_posix.c index 4247a1c12b..ed653b9c2e 100644 --- a/src/core/lib/support/subprocess_posix.c +++ b/src/core/lib/support/subprocess_posix.c @@ -76,8 +76,7 @@ gpr_subprocess *gpr_subprocess_create(int argc, const char **argv) { _exit(1); return NULL; } else { - r = gpr_malloc(sizeof(gpr_subprocess)); - memset(r, 0, sizeof(*r)); + r = gpr_zalloc(sizeof(gpr_subprocess)); r->pid = pid; return r; } diff --git a/src/core/lib/support/sync.c b/src/core/lib/support/sync.c index 44b83f8175..b52f004f74 100644 --- a/src/core/lib/support/sync.c +++ b/src/core/lib/support/sync.c @@ -37,6 +37,8 @@ #include <grpc/support/log.h> #include <grpc/support/sync.h> +#include <assert.h> + /* Number of mutexes to allocate for events, to avoid lock contention. Should be a prime. */ enum { event_sync_partitions = 31 }; @@ -99,8 +101,12 @@ void gpr_ref_init(gpr_refcount *r, int n) { gpr_atm_rel_store(&r->count, n); } void gpr_ref(gpr_refcount *r) { gpr_atm_no_barrier_fetch_add(&r->count, 1); } void gpr_ref_non_zero(gpr_refcount *r) { +#ifndef NDEBUG gpr_atm prior = gpr_atm_no_barrier_fetch_add(&r->count, 1); - GPR_ASSERT(prior > 0); + assert(prior > 0); +#else + gpr_ref(r); +#endif } void gpr_refn(gpr_refcount *r, int n) { @@ -113,6 +119,10 @@ int gpr_unref(gpr_refcount *r) { return prior == 1; } +int gpr_ref_is_unique(gpr_refcount *r) { + return gpr_atm_acq_load(&r->count) == 1; +} + void gpr_stats_init(gpr_stats_counter *c, intptr_t n) { gpr_atm_rel_store(&c->value, n); } diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index af32c5b378..2c5d8c0ff3 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -51,6 +51,7 @@ #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" +#include "src/core/lib/support/arena.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/call.h" @@ -138,14 +139,15 @@ typedef struct batch_control { } batch_control; struct grpc_call { + gpr_arena *arena; grpc_completion_queue *cq; grpc_polling_entity pollent; grpc_channel *channel; grpc_call *parent; grpc_call *first_child; gpr_timespec start_time; - /* TODO(ctiller): share with cq if possible? */ - gpr_mu mu; + /* protects first_child, and child next/prev links */ + gpr_mu child_list_mu; /* client or server call */ bool is_client; @@ -160,7 +162,8 @@ struct grpc_call { bool received_initial_metadata; bool receiving_message; bool requested_final_op; - bool received_final_op; + gpr_atm any_ops_sent_atm; + gpr_atm received_final_op_atm; /* have we received initial metadata */ bool has_initial_md_been_received; @@ -211,6 +214,8 @@ struct grpc_call { grpc_closure receiving_initial_metadata_ready; uint32_t test_only_last_message_flags; + grpc_closure release_call; + union { struct { grpc_status_code *status; @@ -272,10 +277,13 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, grpc_channel_get_channel_stack(args->channel); grpc_call *call; GPR_TIMER_BEGIN("grpc_call_create", 0); - call = gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size); + gpr_arena *arena = + gpr_arena_create(grpc_channel_get_call_size_estimate(args->channel)); + call = gpr_arena_alloc(arena, + sizeof(grpc_call) + channel_stack->call_stack_size); + call->arena = arena; *out_call = call; - memset(call, 0, sizeof(grpc_call)); - gpr_mu_init(&call->mu); + gpr_mu_init(&call->child_list_mu); call->channel = args->channel; call->cq = args->cq; call->parent = args->parent_call; @@ -313,7 +321,7 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, GPR_ASSERT(call->is_client); GPR_ASSERT(!args->parent_call->is_client); - gpr_mu_lock(&args->parent_call->mu); + gpr_mu_lock(&args->parent_call->child_list_mu); if (args->propagation_mask & GRPC_PROPAGATE_DEADLINE) { send_deadline = gpr_time_min( @@ -341,6 +349,10 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, } if (args->propagation_mask & GRPC_PROPAGATE_CANCELLATION) { call->cancellation_is_inherited = 1; + if (gpr_atm_acq_load(&args->parent_call->received_final_op_atm)) { + cancel_with_error(exec_ctx, call, STATUS_FROM_API_OVERRIDE, + GRPC_ERROR_CANCELLED); + } } if (args->parent_call->first_child == NULL) { @@ -353,18 +365,23 @@ grpc_error *grpc_call_create(grpc_exec_ctx *exec_ctx, call; } - gpr_mu_unlock(&args->parent_call->mu); + gpr_mu_unlock(&args->parent_call->child_list_mu); } call->send_deadline = send_deadline; GRPC_CHANNEL_INTERNAL_REF(args->channel, "call"); /* initial refcount dropped by grpc_call_destroy */ + grpc_call_element_args call_args = { + .call_stack = CALL_STACK_FROM_CALL(call), + .server_transport_data = args->server_transport_data, + .context = call->context, + .path = path, + .start_time = call->start_time, + .deadline = send_deadline, + .arena = call->arena}; add_init_error(&error, grpc_call_stack_init(exec_ctx, channel_stack, 1, - destroy_call, call, call->context, - args->server_transport_data, path, - call->start_time, send_deadline, - CALL_STACK_FROM_CALL(call))); + destroy_call, call, &call_args)); if (error != GRPC_ERROR_NONE) { cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE, GRPC_ERROR_REF(error)); @@ -421,6 +438,14 @@ void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c REF_ARG) { GRPC_CALL_STACK_UNREF(exec_ctx, CALL_STACK_FROM_CALL(c), REF_REASON); } +static void release_call(grpc_exec_ctx *exec_ctx, void *call, + grpc_error *error) { + grpc_call *c = call; + grpc_channel *channel = c->channel; + grpc_channel_update_call_size_estimate(channel, gpr_arena_destroy(c->arena)); + GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call"); +} + static void set_status_value_directly(grpc_status_code status, void *dest); static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, grpc_error *error) { @@ -435,7 +460,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, if (c->receiving_stream != NULL) { grpc_byte_stream_destroy(exec_ctx, c->receiving_stream); } - gpr_mu_destroy(&c->mu); + gpr_mu_destroy(&c->child_list_mu); for (ii = 0; ii < c->send_extra_metadata_count; ii++) { GRPC_MDELEM_UNREF(exec_ctx, c->send_extra_metadata[ii].md); } @@ -447,7 +472,6 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, if (c->cq) { GRPC_CQ_INTERNAL_UNREF(c->cq, "bind"); } - grpc_channel *channel = c->channel; get_final_status(call, set_status_value_directly, &c->final_info.final_status, NULL); @@ -456,11 +480,12 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, for (i = 0; i < STATUS_SOURCE_COUNT; i++) { GRPC_ERROR_UNREF( - unpack_received_status(gpr_atm_no_barrier_load(&c->status[i])).error); + unpack_received_status(gpr_atm_acq_load(&c->status[i])).error); } - grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info, c); - GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "call"); + grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info, + grpc_closure_init(&c->release_call, release_call, c, + grpc_schedule_on_exec_ctx)); GPR_TIMER_END("destroy_call", 0); } @@ -473,7 +498,7 @@ void grpc_call_destroy(grpc_call *c) { GRPC_API_TRACE("grpc_call_destroy(c=%p)", 1, (c)); if (parent) { - gpr_mu_lock(&parent->mu); + gpr_mu_lock(&parent->child_list_mu); if (c == parent->first_child) { parent->first_child = c->sibling_next; if (c == parent->first_child) { @@ -482,15 +507,14 @@ void grpc_call_destroy(grpc_call *c) { c->sibling_prev->sibling_next = c->sibling_next; c->sibling_next->sibling_prev = c->sibling_prev; } - gpr_mu_unlock(&parent->mu); + gpr_mu_unlock(&parent->child_list_mu); GRPC_CALL_INTERNAL_UNREF(&exec_ctx, parent, "child"); } - gpr_mu_lock(&c->mu); GPR_ASSERT(!c->destroy_called); c->destroy_called = 1; - cancel = !c->received_final_op; - gpr_mu_unlock(&c->mu); + cancel = gpr_atm_acq_load(&c->any_ops_sent_atm) && + !gpr_atm_acq_load(&c->received_final_op_atm); if (cancel) { cancel_with_error(&exec_ctx, c, STATUS_FROM_API_OVERRIDE, GRPC_ERROR_CANCELLED); @@ -555,53 +579,25 @@ grpc_call_error grpc_call_cancel_with_status(grpc_call *c, "c=%p, status=%d, description=%s, reserved=%p)", 4, (c, (int)status, description, reserved)); GPR_ASSERT(reserved == NULL); - gpr_mu_lock(&c->mu); cancel_with_status(&exec_ctx, c, STATUS_FROM_API_OVERRIDE, status, description); - gpr_mu_unlock(&c->mu); grpc_exec_ctx_finish(&exec_ctx); return GRPC_CALL_OK; } -typedef struct termination_closure { - grpc_closure closure; - grpc_call *call; - grpc_transport_stream_op op; -} termination_closure; - -static void done_termination(grpc_exec_ctx *exec_ctx, void *tcp, +static void done_termination(grpc_exec_ctx *exec_ctx, void *call, grpc_error *error) { - termination_closure *tc = tcp; - GRPC_CALL_INTERNAL_UNREF(exec_ctx, tc->call, "termination"); - gpr_free(tc); -} - -static void send_termination(grpc_exec_ctx *exec_ctx, void *tcp, - grpc_error *error) { - termination_closure *tc = tcp; - memset(&tc->op, 0, sizeof(tc->op)); - tc->op.cancel_error = GRPC_ERROR_REF(error); - /* reuse closure to catch completion */ - tc->op.on_complete = grpc_closure_init(&tc->closure, done_termination, tc, - grpc_schedule_on_exec_ctx); - execute_op(exec_ctx, tc->call, &tc->op); -} - -static void terminate_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, - grpc_error *error) { - termination_closure *tc = gpr_malloc(sizeof(*tc)); - memset(tc, 0, sizeof(*tc)); - tc->call = c; - GRPC_CALL_INTERNAL_REF(tc->call, "termination"); - grpc_closure_sched(exec_ctx, grpc_closure_init(&tc->closure, send_termination, - tc, grpc_schedule_on_exec_ctx), - error); + GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "termination"); } static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, status_source source, grpc_error *error) { + GRPC_CALL_INTERNAL_REF(c, "termination"); set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error)); - terminate_with_error(exec_ctx, c, error); + grpc_transport_stream_op *op = grpc_make_transport_stream_op( + grpc_closure_create(done_termination, c, grpc_schedule_on_exec_ctx)); + op->cancel_error = error; + execute_op(exec_ctx, c, op); } static grpc_error *error_from_status(grpc_status_code status, @@ -715,9 +711,7 @@ static void set_incoming_compression_algorithm( grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm( grpc_call *call) { grpc_compression_algorithm algorithm; - gpr_mu_lock(&call->mu); algorithm = call->incoming_compression_algorithm; - gpr_mu_unlock(&call->mu); return algorithm; } @@ -729,9 +723,7 @@ static grpc_compression_algorithm compression_algorithm_for_level_locked( uint32_t grpc_call_test_only_get_message_flags(grpc_call *call) { uint32_t flags; - gpr_mu_lock(&call->mu); flags = call->test_only_last_message_flags; - gpr_mu_unlock(&call->mu); return flags; } @@ -785,9 +777,7 @@ static void set_encodings_accepted_by_peer(grpc_exec_ctx *exec_ctx, uint32_t grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) { uint32_t encodings_accepted_by_peer; - gpr_mu_lock(&call->mu); encodings_accepted_by_peer = call->encodings_accepted_by_peer; - gpr_mu_unlock(&call->mu); return encodings_accepted_by_peer; } @@ -1056,7 +1046,7 @@ static void finish_batch_completion(grpc_exec_ctx *exec_ctx, void *user_data, } static grpc_error *consolidate_batch_errors(batch_control *bctl) { - size_t n = (size_t)gpr_atm_no_barrier_load(&bctl->num_errors); + size_t n = (size_t)gpr_atm_acq_load(&bctl->num_errors); if (n == 0) { return GRPC_ERROR_NONE; } else if (n == 1) { @@ -1083,8 +1073,6 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, grpc_call *call = bctl->call; grpc_error *error = consolidate_batch_errors(bctl); - gpr_mu_lock(&call->mu); - if (bctl->send_initial_metadata) { grpc_metadata_batch_destroy( exec_ctx, @@ -1103,20 +1091,23 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */]; recv_trailing_filter(exec_ctx, call, md); - call->received_final_op = true; /* propagate cancellation to any interested children */ + gpr_atm_rel_store(&call->received_final_op_atm, 1); + gpr_mu_lock(&call->child_list_mu); child_call = call->first_child; if (child_call != NULL) { do { next_child_call = child_call->sibling_next; if (child_call->cancellation_is_inherited) { GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel"); - grpc_call_cancel(child_call, NULL); + cancel_with_error(exec_ctx, child_call, STATUS_FROM_API_OVERRIDE, + GRPC_ERROR_CANCELLED); GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel"); } child_call = next_child_call; } while (child_call != call->first_child); } + gpr_mu_unlock(&call->child_list_mu); if (call->is_client) { get_final_status(call, set_status_value_directly, @@ -1130,7 +1121,6 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, GRPC_ERROR_UNREF(error); error = GRPC_ERROR_NONE; } - gpr_mu_unlock(&call->mu); if (bctl->is_notify_tag_closure) { /* unrefs bctl->error */ @@ -1221,7 +1211,6 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_error *error) { batch_control *bctl = bctlp; grpc_call *call = bctl->call; - gpr_mu_lock(&bctl->call->mu); if (error != GRPC_ERROR_NONE) { if (call->receiving_stream != NULL) { grpc_byte_stream_destroy(exec_ctx, call->receiving_stream); @@ -1233,11 +1222,9 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, } if (call->has_initial_md_been_received || error != GRPC_ERROR_NONE || call->receiving_stream == NULL) { - gpr_mu_unlock(&bctl->call->mu); process_data_after_md(exec_ctx, bctlp); } else { call->saved_receiving_stream_ready_bctlp = bctlp; - gpr_mu_unlock(&bctl->call->mu); } } @@ -1296,7 +1283,7 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx, static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl, grpc_error *error, bool has_cancelled) { if (error == GRPC_ERROR_NONE) return; - int idx = (int)gpr_atm_no_barrier_fetch_add(&bctl->num_errors, 1); + int idx = (int)gpr_atm_full_fetch_add(&bctl->num_errors, 1); if (idx == 0 && !has_cancelled) { cancel_with_error(exec_ctx, bctl->call, STATUS_FROM_CORE, GRPC_ERROR_REF(error)); @@ -1309,8 +1296,6 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, batch_control *bctl = bctlp; grpc_call *call = bctl->call; - gpr_mu_lock(&call->mu); - add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false); if (error == GRPC_ERROR_NONE) { grpc_metadata_batch *md = @@ -1336,11 +1321,9 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, receiving_stream_ready, call->saved_receiving_stream_ready_bctlp, grpc_schedule_on_exec_ctx); call->saved_receiving_stream_ready_bctlp = NULL; - grpc_closure_sched(exec_ctx, saved_rsr_closure, GRPC_ERROR_REF(error)); + grpc_closure_run(exec_ctx, saved_rsr_closure, GRPC_ERROR_REF(error)); } - gpr_mu_unlock(&call->mu); - finish_batch_step(exec_ctx, bctl); } @@ -1386,7 +1369,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, goto done; } - /* TODO(ctiller): this feels like it could be made lock-free */ bctl = allocate_batch_control(call, ops, nops); if (bctl == NULL) { return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS; @@ -1394,7 +1376,6 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, bctl->notify_tag = notify_tag; bctl->is_notify_tag_closure = (uint8_t)(is_notify_tag_closure != 0); - gpr_mu_lock(&call->mu); grpc_transport_stream_op *stream_op = &bctl->op; memset(stream_op, 0, sizeof(*stream_op)); stream_op->covered_by_poller = true; @@ -1680,7 +1661,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_closure_init(&bctl->finish_batch, finish_batch, bctl, grpc_schedule_on_exec_ctx); stream_op->on_complete = &bctl->finish_batch; - gpr_mu_unlock(&call->mu); + gpr_atm_rel_store(&call->any_ops_sent_atm, 1); execute_op(exec_ctx, call, stream_op); @@ -1711,7 +1692,6 @@ done_with_error: if (bctl->recv_final_op) { call->requested_final_op = 0; } - gpr_mu_unlock(&call->mu); goto done; } @@ -1760,10 +1740,8 @@ uint8_t grpc_call_is_client(grpc_call *call) { return call->is_client; } grpc_compression_algorithm grpc_call_compression_for_level( grpc_call *call, grpc_compression_level level) { - gpr_mu_lock(&call->mu); grpc_compression_algorithm algo = compression_algorithm_for_level_locked(call, level); - gpr_mu_unlock(&call->mu); return algo; } diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c index d6acd392c1..2b700b2f67 100644 --- a/src/core/lib/surface/channel.c +++ b/src/core/lib/surface/channel.c @@ -68,6 +68,8 @@ struct grpc_channel { grpc_compression_options compression_options; grpc_mdelem default_authority; + gpr_atm call_size_estimate; + gpr_mu registered_call_mu; registered_call *registered_calls; @@ -115,6 +117,10 @@ grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target, gpr_mu_init(&channel->registered_call_mu); channel->registered_calls = NULL; + gpr_atm_no_barrier_store( + &channel->call_size_estimate, + (gpr_atm)CHANNEL_STACK_FROM_CHANNEL(channel)->call_stack_size); + grpc_compression_options_init(&channel->compression_options); for (size_t i = 0; i < args->num_args; i++) { if (0 == strcmp(args->args[i].key, GRPC_ARG_DEFAULT_AUTHORITY)) { @@ -177,6 +183,32 @@ done: return channel; } +size_t grpc_channel_get_call_size_estimate(grpc_channel *channel) { +#define ROUND_UP_SIZE 256 + return ((size_t)gpr_atm_no_barrier_load(&channel->call_size_estimate) + + ROUND_UP_SIZE) & + ~(size_t)(ROUND_UP_SIZE - 1); +} + +void grpc_channel_update_call_size_estimate(grpc_channel *channel, + size_t size) { + size_t cur = (size_t)gpr_atm_no_barrier_load(&channel->call_size_estimate); + if (cur < size) { + /* size grew: update estimate */ + gpr_atm_no_barrier_cas(&channel->call_size_estimate, (gpr_atm)cur, + (gpr_atm)size); + /* if we lose: never mind, something else will likely update soon enough */ + } else if (cur == size) { + /* no change: holding pattern */ + } else if (cur > 0) { + /* size shrank: decrease estimate */ + gpr_atm_no_barrier_cas( + &channel->call_size_estimate, (gpr_atm)cur, + (gpr_atm)(GPR_MIN(cur - 1, (255 * cur + size) / 256))); + /* if we lose: never mind, something else will likely update soon enough */ + } +} + char *grpc_channel_get_target(grpc_channel *channel) { GRPC_API_TRACE("grpc_channel_get_target(channel=%p)", 1, (channel)); return gpr_strdup(channel->target); diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h index 3a441d7add..c4aebd8b9b 100644 --- a/src/core/lib/surface/channel.h +++ b/src/core/lib/surface/channel.h @@ -66,6 +66,9 @@ grpc_mdelem grpc_channel_get_reffed_status_elem(grpc_exec_ctx *exec_ctx, grpc_channel *channel, int status_code); +size_t grpc_channel_get_call_size_estimate(grpc_channel *channel); +void grpc_channel_update_call_size_estimate(grpc_channel *channel, size_t size); + #ifdef GRPC_STREAM_REFCOUNT_DEBUG void grpc_channel_internal_ref(grpc_channel *channel, const char *reason); void grpc_channel_internal_unref(grpc_exec_ctx *exec_ctx, grpc_channel *channel, diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index b80656aa8d..b4594817e4 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -118,7 +118,7 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) { GRPC_API_TRACE("grpc_completion_queue_create(reserved=%p)", 1, (reserved)); - cc = gpr_malloc(sizeof(grpc_completion_queue) + grpc_pollset_size()); + cc = gpr_zalloc(sizeof(grpc_completion_queue) + grpc_pollset_size()); grpc_pollset_init(POLLSET_FROM_CQ(cc), &cc->mu); #ifndef NDEBUG cc->outstanding_tags = NULL; diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c index 49bc4c114b..9ddb88bd11 100644 --- a/src/core/lib/surface/lame_client.c +++ b/src/core/lib/surface/lame_client.c @@ -130,8 +130,8 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *and_free_memory) { - gpr_free(and_free_memory); + grpc_closure *then_schedule_closure) { + grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); } static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index fdb230b3e2..1186a4af63 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -898,7 +898,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) { + grpc_closure *ignored) { channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; @@ -1016,12 +1016,10 @@ void grpc_server_register_non_listening_completion_queue( grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved)); - grpc_server *server = gpr_malloc(sizeof(grpc_server)); + grpc_server *server = gpr_zalloc(sizeof(grpc_server)); GPR_ASSERT(grpc_is_initialized() && "call grpc_init()"); - memset(server, 0, sizeof(grpc_server)); - gpr_mu_init(&server->mu_global); gpr_mu_init(&server->mu_call); @@ -1070,8 +1068,7 @@ void *grpc_server_register_method( flags); return NULL; } - m = gpr_malloc(sizeof(registered_method)); - memset(m, 0, sizeof(*m)); + m = gpr_zalloc(sizeof(registered_method)); m->method = gpr_strdup(method); m->host = gpr_strdup(host); m->next = server->registered_methods; @@ -1175,8 +1172,7 @@ void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s, if (num_registered_methods > 0) { slots = 2 * num_registered_methods; alloc = sizeof(channel_registered_method) * slots; - chand->registered_methods = gpr_malloc(alloc); - memset(chand->registered_methods, 0, alloc); + chand->registered_methods = gpr_zalloc(alloc); for (rm = s->registered_methods; rm; rm = rm->next) { grpc_slice host; bool has_host; diff --git a/src/core/lib/surface/version.c b/src/core/lib/surface/version.c index 1143a9e044..ba80bd801e 100644 --- a/src/core/lib/surface/version.c +++ b/src/core/lib/surface/version.c @@ -38,4 +38,4 @@ const char *grpc_version_string(void) { return "3.0.0-dev"; } -const char *grpc_g_stands_for(void) { return "green"; } +const char *grpc_g_stands_for(void) { return "gentle"; } diff --git a/src/core/lib/transport/error_utils.c b/src/core/lib/transport/error_utils.c index da77828d9c..ef55e561fb 100644 --- a/src/core/lib/transport/error_utils.c +++ b/src/core/lib/transport/error_utils.c @@ -44,12 +44,12 @@ static grpc_error *recursively_find_error_with_field(grpc_error *error, } if (grpc_error_is_special(error)) return NULL; // Otherwise, search through its children. - intptr_t key = 0; - while (true) { - grpc_error *child_error = gpr_avl_get(error->errs, (void *)key++); - if (child_error == NULL) break; - grpc_error *result = recursively_find_error_with_field(child_error, which); - if (result != NULL) return result; + uint8_t slot = error->first_err; + while (slot != UINT8_MAX) { + grpc_linked_error *lerr = (grpc_linked_error *)(error->arena + slot); + grpc_error *result = recursively_find_error_with_field(lerr->err, which); + if (result) return result; + slot = lerr->next; } return NULL; } @@ -112,13 +112,13 @@ bool grpc_error_has_clear_grpc_status(grpc_error *error) { if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, NULL)) { return true; } - intptr_t key = 0; - while (true) { - grpc_error *child_error = gpr_avl_get(error->errs, (void *)key++); - if (child_error == NULL) break; - if (grpc_error_has_clear_grpc_status(child_error)) { + uint8_t slot = error->first_err; + while (slot != UINT8_MAX) { + grpc_linked_error *lerr = (grpc_linked_error *)(error->arena + slot); + if (grpc_error_has_clear_grpc_status(lerr->err)) { return true; } + slot = lerr->next; } return false; } diff --git a/src/core/lib/transport/metadata.c b/src/core/lib/transport/metadata.c index 489c20cbc8..f2417d8c4e 100644 --- a/src/core/lib/transport/metadata.c +++ b/src/core/lib/transport/metadata.c @@ -130,8 +130,7 @@ void grpc_mdctx_global_init(void) { shard->count = 0; gpr_atm_no_barrier_store(&shard->free_estimate, 0); shard->capacity = INITIAL_SHARD_CAPACITY; - shard->elems = gpr_malloc(sizeof(*shard->elems) * shard->capacity); - memset(shard->elems, 0, sizeof(*shard->elems) * shard->capacity); + shard->elems = gpr_zalloc(sizeof(*shard->elems) * shard->capacity); } } @@ -216,8 +215,7 @@ static void grow_mdtab(mdtab_shard *shard) { GPR_TIMER_BEGIN("grow_mdtab", 0); - mdtab = gpr_malloc(sizeof(interned_metadata *) * capacity); - memset(mdtab, 0, sizeof(interned_metadata *) * capacity); + mdtab = gpr_zalloc(sizeof(interned_metadata *) * capacity); for (i = 0; i < shard->capacity; i++) { for (md = shard->elems[i]; md; md = next) { diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index 004e748f25..d56cb31ee0 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -84,6 +84,39 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx, } } +#define STREAM_REF_FROM_SLICE_REF(p) \ + ((grpc_stream_refcount *)(((uint8_t *)p) - \ + offsetof(grpc_stream_refcount, slice_refcount))) + +static void slice_stream_ref(void *p) { +#ifdef GRPC_STREAM_REFCOUNT_DEBUG + grpc_stream_ref(STREAM_REF_FROM_SLICE_REF(p), "slice"); +#else + grpc_stream_ref(STREAM_REF_FROM_SLICE_REF(p)); +#endif +} + +static void slice_stream_unref(grpc_exec_ctx *exec_ctx, void *p) { +#ifdef GRPC_STREAM_REFCOUNT_DEBUG + grpc_stream_unref(exec_ctx, STREAM_REF_FROM_SLICE_REF(p), "slice"); +#else + grpc_stream_unref(exec_ctx, STREAM_REF_FROM_SLICE_REF(p)); +#endif +} + +grpc_slice grpc_slice_from_stream_owned_buffer(grpc_stream_refcount *refcount, + void *buffer, size_t length) { + slice_stream_ref(&refcount->slice_refcount); + return (grpc_slice){.refcount = &refcount->slice_refcount, + .data.refcounted = {.bytes = buffer, .length = length}}; +} + +static const grpc_slice_refcount_vtable stream_ref_slice_vtable = { + .ref = slice_stream_ref, + .unref = slice_stream_unref, + .eq = grpc_slice_default_eq_impl, + .hash = grpc_slice_default_hash_impl}; + #ifdef GRPC_STREAM_REFCOUNT_DEBUG void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs, grpc_iomgr_cb_func cb, void *cb_arg, @@ -95,6 +128,8 @@ void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs, #endif gpr_ref_init(&refcount->refs, initial_refs); grpc_closure_init(&refcount->destroy, cb, cb_arg, grpc_schedule_on_exec_ctx); + refcount->slice_refcount.vtable = &stream_ref_slice_vtable; + refcount->slice_refcount.sub_refcount = &refcount->slice_refcount; } static void move64(uint64_t *from, uint64_t *to) { @@ -127,9 +162,9 @@ void grpc_transport_destroy(grpc_exec_ctx *exec_ctx, int grpc_transport_init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *transport, grpc_stream *stream, grpc_stream_refcount *refcount, - const void *server_data) { + const void *server_data, gpr_arena *arena) { return transport->vtable->init_stream(exec_ctx, transport, stream, refcount, - server_data); + server_data, arena); } void grpc_transport_perform_stream_op(grpc_exec_ctx *exec_ctx, @@ -162,9 +197,10 @@ void grpc_transport_set_pops(grpc_exec_ctx *exec_ctx, grpc_transport *transport, void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *transport, - grpc_stream *stream, void *and_free_memory) { + grpc_stream *stream, + grpc_closure *then_schedule_closure) { transport->vtable->destroy_stream(exec_ctx, transport, stream, - and_free_memory); + then_schedule_closure); } char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx, @@ -219,8 +255,9 @@ typedef struct { static void destroy_made_transport_stream_op(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { made_transport_stream_op *op = arg; - grpc_closure_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error)); + grpc_closure *c = op->inner_on_complete; gpr_free(op); + grpc_closure_run(exec_ctx, c, GRPC_ERROR_REF(error)); } grpc_transport_stream_op *grpc_make_transport_stream_op( diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h index e56bf2780a..950b18aeda 100644 --- a/src/core/lib/transport/transport.h +++ b/src/core/lib/transport/transport.h @@ -41,6 +41,7 @@ #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/pollset_set.h" +#include "src/core/lib/support/arena.h" #include "src/core/lib/transport/byte_stream.h" #include "src/core/lib/transport/metadata_batch.h" @@ -64,6 +65,7 @@ typedef struct grpc_stream_refcount { #ifdef GRPC_STREAM_REFCOUNT_DEBUG const char *object_type; #endif + grpc_slice_refcount slice_refcount; } grpc_stream_refcount; #ifdef GRPC_STREAM_REFCOUNT_DEBUG @@ -84,6 +86,11 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount); grpc_stream_ref_init(rc, ir, cb, cb_arg) #endif +/* Wrap a buffer that is owned by some stream object into a slice that shares + the same refcount */ +grpc_slice grpc_slice_from_stream_owned_buffer(grpc_stream_refcount *refcount, + void *buffer, size_t length); + typedef struct { uint64_t framing_bytes; uint64_t data_bytes; @@ -213,6 +220,7 @@ size_t grpc_transport_stream_size(grpc_transport *transport); /* Initialize transport data for a stream. Returns 0 on success, any other (transport-defined) value for failure. + May assume that stream contains all-zeros. Arguments: transport - the transport on which to create this stream @@ -222,7 +230,7 @@ size_t grpc_transport_stream_size(grpc_transport *transport); int grpc_transport_init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *transport, grpc_stream *stream, grpc_stream_refcount *refcount, - const void *server_data); + const void *server_data, gpr_arena *arena); void grpc_transport_set_pops(grpc_exec_ctx *exec_ctx, grpc_transport *transport, grpc_stream *stream, grpc_polling_entity *pollent); @@ -239,7 +247,8 @@ void grpc_transport_set_pops(grpc_exec_ctx *exec_ctx, grpc_transport *transport, caller, but any child memory must be cleaned up) */ void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *transport, - grpc_stream *stream, void *and_free_memory); + grpc_stream *stream, + grpc_closure *then_schedule_closure); void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op, diff --git a/src/core/lib/transport/transport_impl.h b/src/core/lib/transport/transport_impl.h index 8553148c35..6f688bf8d2 100644 --- a/src/core/lib/transport/transport_impl.h +++ b/src/core/lib/transport/transport_impl.h @@ -47,7 +47,7 @@ typedef struct grpc_transport_vtable { /* implementation of grpc_transport_init_stream */ int (*init_stream)(grpc_exec_ctx *exec_ctx, grpc_transport *self, grpc_stream *stream, grpc_stream_refcount *refcount, - const void *server_data); + const void *server_data, gpr_arena *arena); /* implementation of grpc_transport_set_pollset */ void (*set_pollset)(grpc_exec_ctx *exec_ctx, grpc_transport *self, @@ -67,7 +67,8 @@ typedef struct grpc_transport_vtable { /* implementation of grpc_transport_destroy_stream */ void (*destroy_stream)(grpc_exec_ctx *exec_ctx, grpc_transport *self, - grpc_stream *stream, void *and_free_memory); + grpc_stream *stream, + grpc_closure *then_schedule_closure); /* implementation of grpc_transport_destroy */ void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_transport *self); diff --git a/src/core/lib/tsi/fake_transport_security.c b/src/core/lib/tsi/fake_transport_security.c index 0e20d6fd71..bbe323df3b 100644 --- a/src/core/lib/tsi/fake_transport_security.c +++ b/src/core/lib/tsi/fake_transport_security.c @@ -502,8 +502,7 @@ static const tsi_handshaker_vtable handshaker_vtable = { }; tsi_handshaker *tsi_create_fake_handshaker(int is_client) { - tsi_fake_handshaker *impl = gpr_malloc(sizeof(*impl)); - memset(impl, 0, sizeof(*impl)); + tsi_fake_handshaker *impl = gpr_zalloc(sizeof(*impl)); impl->base.vtable = &handshaker_vtable; impl->is_client = is_client; impl->result = TSI_HANDSHAKE_IN_PROGRESS; @@ -519,8 +518,7 @@ tsi_handshaker *tsi_create_fake_handshaker(int is_client) { tsi_frame_protector *tsi_create_fake_protector( size_t *max_protected_frame_size) { - tsi_fake_frame_protector *impl = gpr_malloc(sizeof(*impl)); - memset(impl, 0, sizeof(*impl)); + tsi_fake_frame_protector *impl = gpr_zalloc(sizeof(*impl)); impl->max_frame_size = (max_protected_frame_size == NULL) ? TSI_FAKE_DEFAULT_FRAME_SIZE : *max_protected_frame_size; diff --git a/src/core/lib/tsi/ssl_transport_security.c b/src/core/lib/tsi/ssl_transport_security.c index 366dca9507..53aabdb926 100644 --- a/src/core/lib/tsi/ssl_transport_security.c +++ b/src/core/lib/tsi/ssl_transport_security.c @@ -976,9 +976,7 @@ static tsi_result ssl_handshaker_extract_peer(tsi_handshaker *self, if (alpn_selected != NULL) { size_t i; tsi_peer_property *new_properties = - gpr_malloc(sizeof(*new_properties) * (peer->property_count + 1)); - memset(new_properties, 0, - sizeof(*new_properties) * (peer->property_count + 1)); + gpr_zalloc(sizeof(*new_properties) * (peer->property_count + 1)); for (i = 0; i < peer->property_count; i++) { new_properties[i] = peer->properties[i]; } @@ -1002,8 +1000,7 @@ static tsi_result ssl_handshaker_create_frame_protector( size_t actual_max_output_protected_frame_size = TSI_SSL_MAX_PROTECTED_FRAME_SIZE_UPPER_BOUND; tsi_ssl_handshaker *impl = (tsi_ssl_handshaker *)self; - tsi_ssl_frame_protector *protector_impl = gpr_malloc(sizeof(*protector_impl)); - memset(protector_impl, 0, sizeof(*protector_impl)); + tsi_ssl_frame_protector *protector_impl = gpr_zalloc(sizeof(*protector_impl)); if (max_output_protected_frame_size != NULL) { if (*max_output_protected_frame_size > @@ -1119,8 +1116,7 @@ static tsi_result create_tsi_ssl_handshaker(SSL_CTX *ctx, int is_client, SSL_set_accept_state(ssl); } - impl = gpr_malloc(sizeof(*impl)); - memset(impl, 0, sizeof(*impl)); + impl = gpr_zalloc(sizeof(*impl)); impl->ssl = ssl; impl->into_ssl = into_ssl; impl->from_ssl = from_ssl; @@ -1338,8 +1334,7 @@ tsi_result tsi_create_ssl_client_handshaker_factory( return TSI_INVALID_ARGUMENT; } - impl = gpr_malloc(sizeof(*impl)); - memset(impl, 0, sizeof(*impl)); + impl = gpr_zalloc(sizeof(*impl)); impl->ssl_context = ssl_context; do { @@ -1433,17 +1428,13 @@ tsi_result tsi_create_ssl_server_handshaker_factory_ex( return TSI_INVALID_ARGUMENT; } - impl = gpr_malloc(sizeof(*impl)); - memset(impl, 0, sizeof(*impl)); + impl = gpr_zalloc(sizeof(*impl)); impl->base.create_handshaker = ssl_server_handshaker_factory_create_handshaker; impl->base.destroy = ssl_server_handshaker_factory_destroy; - impl->ssl_contexts = gpr_malloc(key_cert_pair_count * sizeof(SSL_CTX *)); - memset(impl->ssl_contexts, 0, key_cert_pair_count * sizeof(SSL_CTX *)); + impl->ssl_contexts = gpr_zalloc(key_cert_pair_count * sizeof(SSL_CTX *)); impl->ssl_context_x509_subject_names = - gpr_malloc(key_cert_pair_count * sizeof(tsi_peer)); - memset(impl->ssl_context_x509_subject_names, 0, - key_cert_pair_count * sizeof(tsi_peer)); + gpr_zalloc(key_cert_pair_count * sizeof(tsi_peer)); if (impl->ssl_contexts == NULL || impl->ssl_context_x509_subject_names == NULL) { tsi_ssl_handshaker_factory_destroy(&impl->base); diff --git a/src/core/lib/tsi/test_creds/BUILD b/src/core/lib/tsi/test_creds/BUILD new file mode 100644 index 0000000000..5cf04caf17 --- /dev/null +++ b/src/core/lib/tsi/test_creds/BUILD @@ -0,0 +1,36 @@ +# Copyright 2017, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +licenses(["notice"]) # 3-clause BSD + +exports_files([ + "ca.pem", + "server1.key", + "server1.pem", +]) diff --git a/src/core/lib/tsi/transport_security.c b/src/core/lib/tsi/transport_security.c index 830cf09584..2cbf381c88 100644 --- a/src/core/lib/tsi/transport_security.c +++ b/src/core/lib/tsi/transport_security.c @@ -231,8 +231,7 @@ tsi_result tsi_construct_allocated_string_peer_property( *property = tsi_init_peer_property(); if (name != NULL) property->name = gpr_strdup(name); if (value_length > 0) { - property->value.data = gpr_malloc(value_length); - memset(property->value.data, 0, value_length); + property->value.data = gpr_zalloc(value_length); property->value.length = value_length; } return TSI_OK; @@ -260,8 +259,7 @@ tsi_result tsi_construct_string_peer_property(const char *name, tsi_result tsi_construct_peer(size_t property_count, tsi_peer *peer) { memset(peer, 0, sizeof(tsi_peer)); if (property_count > 0) { - peer->properties = gpr_malloc(property_count * sizeof(tsi_peer_property)); - memset(peer->properties, 0, property_count * sizeof(tsi_peer_property)); + peer->properties = gpr_zalloc(property_count * sizeof(tsi_peer_property)); peer->property_count = property_count; } return TSI_OK; |