diff options
author | 2017-03-12 18:33:47 -0700 | |
---|---|---|
committer | 2017-03-12 18:33:47 -0700 | |
commit | 43be708bb2a7b3c7dce8cc25059be3fce7228d66 (patch) | |
tree | 82217258fbf81514d5bf13a73d6501eb8fbde363 /src/core | |
parent | 39a797387f1a7bbfd5cad235a90d1059bfddfdba (diff) | |
parent | 0b7bd20de4f0d219c399ef8c01c05026bf12da5d (diff) |
Merge remote-tracking branch 'upstream/master' into fix-flush-read
Diffstat (limited to 'src/core')
23 files changed, 477 insertions, 223 deletions
diff --git a/src/core/ext/client_channel/client_channel_plugin.c b/src/core/ext/client_channel/client_channel_plugin.c index 6f9df3e386..28d3b63f99 100644 --- a/src/core/ext/client_channel/client_channel_plugin.c +++ b/src/core/ext/client_channel/client_channel_plugin.c @@ -64,7 +64,7 @@ static bool set_default_host_if_unset(grpc_exec_ctx *exec_ctx, } } char *default_authority = grpc_get_default_authority( - grpc_channel_stack_builder_get_target(builder)); + exec_ctx, grpc_channel_stack_builder_get_target(builder)); if (default_authority != NULL) { grpc_arg arg; arg.type = GRPC_ARG_STRING; diff --git a/src/core/ext/client_channel/http_proxy.c b/src/core/ext/client_channel/http_proxy.c index bbe4ff550c..e280cef101 100644 --- a/src/core/ext/client_channel/http_proxy.c +++ b/src/core/ext/client_channel/http_proxy.c @@ -46,10 +46,11 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/support/env.h" -static char* grpc_get_http_proxy_server() { +static char* grpc_get_http_proxy_server(grpc_exec_ctx* exec_ctx) { char* uri_str = gpr_getenv("http_proxy"); if (uri_str == NULL) return NULL; - grpc_uri* uri = grpc_uri_parse(uri_str, false /* suppress_errors */); + grpc_uri* uri = + grpc_uri_parse(exec_ctx, uri_str, false /* suppress_errors */); char* proxy_name = NULL; if (uri == NULL || uri->authority == NULL) { gpr_log(GPR_ERROR, "cannot parse value of 'http_proxy' env var"); @@ -76,9 +77,10 @@ static bool proxy_mapper_map_name(grpc_exec_ctx* exec_ctx, const grpc_channel_args* args, char** name_to_resolve, grpc_channel_args** new_args) { - *name_to_resolve = grpc_get_http_proxy_server(); + *name_to_resolve = grpc_get_http_proxy_server(exec_ctx); if (*name_to_resolve == NULL) return false; - grpc_uri* uri = grpc_uri_parse(server_uri, false /* suppress_errors */); + grpc_uri* uri = + grpc_uri_parse(exec_ctx, server_uri, false /* suppress_errors */); if (uri == NULL || uri->path[0] == '\0') { gpr_log(GPR_ERROR, "'http_proxy' environment variable set, but cannot " diff --git a/src/core/ext/client_channel/parse_address.c b/src/core/ext/client_channel/parse_address.c index 8e4da03de0..8ae15fc72b 100644 --- a/src/core/ext/client_channel/parse_address.c +++ b/src/core/ext/client_channel/parse_address.c @@ -44,6 +44,7 @@ #include <grpc/support/host_port.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> +#include "src/core/lib/support/string.h" #ifdef GRPC_HAVE_UNIX_SOCKET @@ -120,9 +121,30 @@ int parse_ipv6(grpc_uri *uri, grpc_resolved_address *resolved_addr) { memset(in6, 0, sizeof(*in6)); resolved_addr->len = sizeof(*in6); in6->sin6_family = AF_INET6; - if (inet_pton(AF_INET6, host, &in6->sin6_addr) == 0) { - gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host); - goto done; + + /* Handle the RFC6874 syntax for IPv6 zone identifiers. */ + char *host_end = (char *)gpr_memrchr(host, '%', strlen(host)); + if (host_end != NULL) { + GPR_ASSERT(host_end >= host); + char host_without_scope[INET6_ADDRSTRLEN]; + size_t host_without_scope_len = (size_t)(host_end - host); + strncpy(host_without_scope, host, host_without_scope_len); + host_without_scope[host_without_scope_len] = '\0'; + if (inet_pton(AF_INET6, host_without_scope, &in6->sin6_addr) == 0) { + gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host_without_scope); + goto done; + } + if (gpr_parse_bytes_to_uint32(host_end + 1, + strlen(host) - host_without_scope_len - 1, + &in6->sin6_scope_id) == 0) { + gpr_log(GPR_ERROR, "invalid ipv6 scope id: '%s'", host_end + 1); + goto done; + } + } else { + if (inet_pton(AF_INET6, host, &in6->sin6_addr) == 0) { + gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host); + goto done; + } } if (port != NULL) { diff --git a/src/core/ext/client_channel/resolver_registry.c b/src/core/ext/client_channel/resolver_registry.c index f8e8bc9c39..3c5a6fb3ff 100644 --- a/src/core/ext/client_channel/resolver_registry.c +++ b/src/core/ext/client_channel/resolver_registry.c @@ -108,22 +108,23 @@ static grpc_resolver_factory *lookup_factory_by_uri(grpc_uri *uri) { return lookup_factory(uri->scheme); } -static grpc_resolver_factory *resolve_factory(const char *target, +static grpc_resolver_factory *resolve_factory(grpc_exec_ctx *exec_ctx, + const char *target, grpc_uri **uri, char **canonical_target) { grpc_resolver_factory *factory = NULL; GPR_ASSERT(uri != NULL); - *uri = grpc_uri_parse(target, 1); + *uri = grpc_uri_parse(exec_ctx, target, 1); factory = lookup_factory_by_uri(*uri); if (factory == NULL) { grpc_uri_destroy(*uri); gpr_asprintf(canonical_target, "%s%s", g_default_resolver_prefix, target); - *uri = grpc_uri_parse(*canonical_target, 1); + *uri = grpc_uri_parse(exec_ctx, *canonical_target, 1); factory = lookup_factory_by_uri(*uri); if (factory == NULL) { - grpc_uri_destroy(grpc_uri_parse(target, 0)); - grpc_uri_destroy(grpc_uri_parse(*canonical_target, 0)); + grpc_uri_destroy(grpc_uri_parse(exec_ctx, target, 0)); + grpc_uri_destroy(grpc_uri_parse(exec_ctx, *canonical_target, 0)); gpr_log(GPR_ERROR, "don't know how to resolve '%s' or '%s'", target, *canonical_target); } @@ -138,7 +139,7 @@ grpc_resolver *grpc_resolver_create(grpc_exec_ctx *exec_ctx, const char *target, grpc_uri *uri = NULL; char *canonical_target = NULL; grpc_resolver_factory *factory = - resolve_factory(target, &uri, &canonical_target); + resolve_factory(exec_ctx, target, &uri, &canonical_target); grpc_resolver *resolver; grpc_resolver_args resolver_args; memset(&resolver_args, 0, sizeof(resolver_args)); @@ -153,21 +154,22 @@ grpc_resolver *grpc_resolver_create(grpc_exec_ctx *exec_ctx, const char *target, return resolver; } -char *grpc_get_default_authority(const char *target) { +char *grpc_get_default_authority(grpc_exec_ctx *exec_ctx, const char *target) { grpc_uri *uri = NULL; char *canonical_target = NULL; grpc_resolver_factory *factory = - resolve_factory(target, &uri, &canonical_target); + resolve_factory(exec_ctx, target, &uri, &canonical_target); char *authority = grpc_resolver_factory_get_default_authority(factory, uri); grpc_uri_destroy(uri); gpr_free(canonical_target); return authority; } -char *grpc_resolver_factory_add_default_prefix_if_needed(const char *target) { +char *grpc_resolver_factory_add_default_prefix_if_needed( + grpc_exec_ctx *exec_ctx, const char *target) { grpc_uri *uri = NULL; char *canonical_target = NULL; - resolve_factory(target, &uri, &canonical_target); + resolve_factory(exec_ctx, target, &uri, &canonical_target); grpc_uri_destroy(uri); return canonical_target == NULL ? gpr_strdup(target) : canonical_target; } diff --git a/src/core/ext/client_channel/resolver_registry.h b/src/core/ext/client_channel/resolver_registry.h index e2c189cf0c..1a3ebee25a 100644 --- a/src/core/ext/client_channel/resolver_registry.h +++ b/src/core/ext/client_channel/resolver_registry.h @@ -74,10 +74,11 @@ grpc_resolver_factory *grpc_resolver_factory_lookup(const char *name); /** Given a target, return a (freshly allocated with gpr_malloc) string representing the default authority to pass from a client. */ -char *grpc_get_default_authority(const char *target); +char *grpc_get_default_authority(grpc_exec_ctx *exec_ctx, const char *target); /** Returns a newly allocated string containing \a target, adding the default prefix if needed. */ -char *grpc_resolver_factory_add_default_prefix_if_needed(const char *target); +char *grpc_resolver_factory_add_default_prefix_if_needed( + grpc_exec_ctx *exec_ctx, const char *target); #endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_REGISTRY_H */ diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index cb2d2c821d..5df0a9060d 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -331,7 +331,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, } c->pollset_set = grpc_pollset_set_create(); grpc_resolved_address *addr = gpr_malloc(sizeof(*addr)); - grpc_get_subchannel_address_arg(args->args, addr); + grpc_get_subchannel_address_arg(exec_ctx, args->args, addr); grpc_set_initial_connect_string(&addr, &c->initial_connect_string); grpc_resolved_address *new_address = NULL; grpc_channel_args *new_args = NULL; @@ -787,9 +787,9 @@ grpc_call_stack *grpc_subchannel_call_get_call_stack( return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call); } -static void grpc_uri_to_sockaddr(const char *uri_str, +static void grpc_uri_to_sockaddr(grpc_exec_ctx *exec_ctx, const char *uri_str, grpc_resolved_address *addr) { - grpc_uri *uri = grpc_uri_parse(uri_str, 0 /* suppress_errors */); + grpc_uri *uri = grpc_uri_parse(exec_ctx, uri_str, 0 /* suppress_errors */); GPR_ASSERT(uri != NULL); if (strcmp(uri->scheme, "ipv4") == 0) { GPR_ASSERT(parse_ipv4(uri, addr)); @@ -801,12 +801,13 @@ static void grpc_uri_to_sockaddr(const char *uri_str, grpc_uri_destroy(uri); } -void grpc_get_subchannel_address_arg(const grpc_channel_args *args, +void grpc_get_subchannel_address_arg(grpc_exec_ctx *exec_ctx, + const grpc_channel_args *args, grpc_resolved_address *addr) { const char *addr_uri_str = grpc_get_subchannel_address_uri_arg(args); memset(addr, 0, sizeof(*addr)); if (*addr_uri_str != '\0') { - grpc_uri_to_sockaddr(addr_uri_str, addr); + grpc_uri_to_sockaddr(exec_ctx, addr_uri_str, addr); } } diff --git a/src/core/ext/client_channel/subchannel.h b/src/core/ext/client_channel/subchannel.h index 26ce954487..6a70a76467 100644 --- a/src/core/ext/client_channel/subchannel.h +++ b/src/core/ext/client_channel/subchannel.h @@ -175,7 +175,8 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, const grpc_subchannel_args *args); /// Sets \a addr from \a args. -void grpc_get_subchannel_address_arg(const grpc_channel_args *args, +void grpc_get_subchannel_address_arg(grpc_exec_ctx *exec_ctx, + const grpc_channel_args *args, grpc_resolved_address *addr); /// Returns the URI string for the address to connect to. diff --git a/src/core/ext/client_channel/uri_parser.c b/src/core/ext/client_channel/uri_parser.c index 7dd7b753cc..d385db0801 100644 --- a/src/core/ext/client_channel/uri_parser.c +++ b/src/core/ext/client_channel/uri_parser.c @@ -35,13 +35,15 @@ #include <string.h> -#include <grpc/slice.h> #include <grpc/slice_buffer.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/port_platform.h> #include <grpc/support/string_util.h> +#include "src/core/lib/slice/percent_encoding.h" +#include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/string.h" /** a size_t default value... maps to all 1's */ @@ -68,11 +70,16 @@ static grpc_uri *bad_uri(const char *uri_text, size_t pos, const char *section, return NULL; } -/** Returns a copy of \a src[begin, end) */ -static char *copy_component(const char *src, size_t begin, size_t end) { - char *out = gpr_malloc(end - begin + 1); - memcpy(out, src + begin, end - begin); - out[end - begin] = 0; +/** Returns a copy of percent decoded \a src[begin, end) */ +static char *decode_and_copy_component(grpc_exec_ctx *exec_ctx, const char *src, + size_t begin, size_t end) { + grpc_slice component = + grpc_slice_from_copied_buffer(src + begin, end - begin); + grpc_slice decoded_component = + grpc_permissive_percent_decode_slice(component); + char *out = grpc_dump_slice(decoded_component, GPR_DUMP_ASCII); + grpc_slice_unref_internal(exec_ctx, component); + grpc_slice_unref_internal(exec_ctx, decoded_component); return out; } @@ -175,7 +182,8 @@ static void parse_query_parts(grpc_uri *uri) { } } -grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors) { +grpc_uri *grpc_uri_parse(grpc_exec_ctx *exec_ctx, const char *uri_text, + int suppress_errors) { grpc_uri *uri; size_t scheme_begin = 0; size_t scheme_end = NOT_SET; @@ -263,11 +271,16 @@ grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors) { } uri = gpr_zalloc(sizeof(*uri)); - uri->scheme = copy_component(uri_text, scheme_begin, scheme_end); - uri->authority = copy_component(uri_text, authority_begin, authority_end); - uri->path = copy_component(uri_text, path_begin, path_end); - uri->query = copy_component(uri_text, query_begin, query_end); - uri->fragment = copy_component(uri_text, fragment_begin, fragment_end); + uri->scheme = + decode_and_copy_component(exec_ctx, uri_text, scheme_begin, scheme_end); + uri->authority = decode_and_copy_component(exec_ctx, uri_text, + authority_begin, authority_end); + uri->path = + decode_and_copy_component(exec_ctx, uri_text, path_begin, path_end); + uri->query = + decode_and_copy_component(exec_ctx, uri_text, query_begin, query_end); + uri->fragment = decode_and_copy_component(exec_ctx, uri_text, fragment_begin, + fragment_end); parse_query_parts(uri); return uri; diff --git a/src/core/ext/client_channel/uri_parser.h b/src/core/ext/client_channel/uri_parser.h index 5fe0e8f35e..efd4302c1c 100644 --- a/src/core/ext/client_channel/uri_parser.h +++ b/src/core/ext/client_channel/uri_parser.h @@ -35,6 +35,7 @@ #define GRPC_CORE_EXT_CLIENT_CHANNEL_URI_PARSER_H #include <stddef.h> +#include "src/core/lib/iomgr/exec_ctx.h" typedef struct { char *scheme; @@ -51,7 +52,8 @@ typedef struct { } grpc_uri; /** parse a uri, return NULL on failure */ -grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors); +grpc_uri *grpc_uri_parse(grpc_exec_ctx *exec_ctx, const char *uri_text, + int suppress_errors); /** return the part of a query string after the '=' in "?key=xxx&...", or NULL * if key is not present */ diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index aea0fcc33d..d612591f2e 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -861,7 +861,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI); GPR_ASSERT(arg != NULL); GPR_ASSERT(arg->type == GRPC_ARG_STRING); - grpc_uri *uri = grpc_uri_parse(arg->value.string, true); + grpc_uri *uri = grpc_uri_parse(exec_ctx, arg->value.string, true); GPR_ASSERT(uri->path[0] != '\0'); glb_policy->server_name = gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path); diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c index fc5e17d8fc..eae0145ecc 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.c +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c @@ -226,7 +226,7 @@ static void chttp2_connector_connect(grpc_exec_ctx *exec_ctx, grpc_closure *notify) { chttp2_connector *c = (chttp2_connector *)con; grpc_resolved_address addr; - grpc_get_subchannel_address_arg(args->channel_args, &addr); + grpc_get_subchannel_address_arg(exec_ctx, args->channel_args, &addr); gpr_mu_lock(&c->mu); GPR_ASSERT(c->notify == NULL); c->notify = notify; diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index 286232f277..067ac35a5a 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -72,7 +72,8 @@ static grpc_channel *client_channel_factory_create_channel( grpc_arg arg; arg.type = GRPC_ARG_STRING; arg.key = GRPC_ARG_SERVER_URI; - arg.value.string = grpc_resolver_factory_add_default_prefix_if_needed(target); + arg.value.string = + grpc_resolver_factory_add_default_prefix_if_needed(exec_ctx, target); const char *to_remove[] = {GRPC_ARG_SERVER_URI}; grpc_channel_args *new_args = grpc_channel_args_copy_and_add_and_remove(args, to_remove, 1, &arg, 1); diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index 825db68c65..f0c241d68e 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -83,7 +83,7 @@ static grpc_subchannel_args *get_secure_naming_subchannel_args( const char *server_uri_str = server_uri_arg->value.string; GPR_ASSERT(server_uri_str != NULL); grpc_uri *server_uri = - grpc_uri_parse(server_uri_str, true /* supress errors */); + grpc_uri_parse(exec_ctx, server_uri_str, true /* supress errors */); GPR_ASSERT(server_uri != NULL); const char *server_uri_path; server_uri_path = @@ -96,7 +96,7 @@ static grpc_subchannel_args *get_secure_naming_subchannel_args( const char *target_uri_str = grpc_get_subchannel_address_uri_arg(args->args); grpc_uri *target_uri = - grpc_uri_parse(target_uri_str, false /* suppress errors */); + grpc_uri_parse(exec_ctx, target_uri_str, false /* suppress errors */); GPR_ASSERT(target_uri != NULL); if (target_uri->path[0] != '\0') { // "path" may be empty const grpc_slice key = grpc_slice_from_static_string( @@ -181,7 +181,8 @@ static grpc_channel *client_channel_factory_create_channel( grpc_arg arg; arg.type = GRPC_ARG_STRING; arg.key = GRPC_ARG_SERVER_URI; - arg.value.string = grpc_resolver_factory_add_default_prefix_if_needed(target); + arg.value.string = + grpc_resolver_factory_add_default_prefix_if_needed(exec_ctx, target); const char *to_remove[] = {GRPC_ARG_SERVER_URI}; grpc_channel_args *new_args = grpc_channel_args_copy_and_add_and_remove(args, to_remove, 1, &arg, 1); diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index 7ed00522c3..e7f2597f89 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -381,16 +381,38 @@ static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx, s->incoming_window_delta + t->settings[GRPC_ACKED_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]) { - char *msg; - gpr_asprintf(&msg, - "frame of size %d overflows incoming window of %" PRId64, - t->incoming_frame_size, - s->incoming_window_delta + - t->settings[GRPC_ACKED_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); - grpc_error *err = GRPC_ERROR_CREATE(msg); - gpr_free(msg); - return err; + if (incoming_frame_size <= + s->incoming_window_delta + + t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]) { + gpr_log( + GPR_ERROR, + "Incoming frame of size %d exceeds incoming window size of %" PRId64 + ".\n" + "The (un-acked, future) window size would be %" PRId64 + " which is not exceeded.\n" + "This would usually cause a disconnection, but allowing it due to " + "broken HTTP2 implementations in the wild.\n" + "See (for example) https://github.com/netty/netty/issues/6520.", + t->incoming_frame_size, + s->incoming_window_delta + + t->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], + s->incoming_window_delta + + t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); + } else { + char *msg; + gpr_asprintf(&msg, + "frame of size %d overflows incoming window of %" PRId64, + t->incoming_frame_size, + s->incoming_window_delta + + t->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); + grpc_error *err = GRPC_ERROR_CREATE(msg); + gpr_free(msg); + return err; + } } GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA("parse", t, s, diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c index dbe5b139f9..7cdbe30198 100644 --- a/src/core/lib/iomgr/error.c +++ b/src/core/lib/iomgr/error.c @@ -35,6 +35,7 @@ #include <string.h> +#include <grpc/slice.h> #include <grpc/status.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -47,46 +48,7 @@ #include "src/core/lib/iomgr/error_internal.h" #include "src/core/lib/profiling/timers.h" - -static void destroy_integer(void *key) {} - -static void *copy_integer(void *key) { return key; } - -static long compare_integers(void *key1, void *key2) { - return GPR_ICMP((uintptr_t)key1, (uintptr_t)key2); -} - -static void destroy_string(void *str) { gpr_free(str); } - -static void *copy_string(void *str) { return gpr_strdup(str); } - -static void destroy_err(void *err) { GRPC_ERROR_UNREF(err); } - -static void *copy_err(void *err) { return GRPC_ERROR_REF(err); } - -static void destroy_time(void *tm) { gpr_free(tm); } - -static gpr_timespec *box_time(gpr_timespec tm) { - gpr_timespec *out = gpr_malloc(sizeof(*out)); - *out = tm; - return out; -} - -static void *copy_time(void *tm) { return box_time(*(gpr_timespec *)tm); } - -static const gpr_avl_vtable avl_vtable_ints = {destroy_integer, copy_integer, - compare_integers, - destroy_integer, copy_integer}; - -static const gpr_avl_vtable avl_vtable_strs = {destroy_integer, copy_integer, - compare_integers, destroy_string, - copy_string}; - -static const gpr_avl_vtable avl_vtable_times = { - destroy_integer, copy_integer, compare_integers, destroy_time, copy_time}; - -static const gpr_avl_vtable avl_vtable_errs = { - destroy_integer, copy_integer, compare_integers, destroy_err, copy_err}; +#include "src/core/lib/slice/slice_internal.h" static const char *error_int_name(grpc_error_ints key) { switch (key) { @@ -120,6 +82,8 @@ static const char *error_int_name(grpc_error_ints key) { return "limit"; case GRPC_ERROR_INT_OCCURRED_DURING_WRITE: return "occurred_during_write"; + case GRPC_ERROR_INT_MAX: + GPR_UNREACHABLE_CODE(return "unknown"); } GPR_UNREACHABLE_CODE(return "unknown"); } @@ -150,6 +114,8 @@ static const char *error_str_name(grpc_error_strs key) { return "filename"; case GRPC_ERROR_STR_QUEUED_BUFFERS: return "queued_buffers"; + case GRPC_ERROR_STR_MAX: + GPR_UNREACHABLE_CODE(return "unknown"); } GPR_UNREACHABLE_CODE(return "unknown"); } @@ -158,6 +124,8 @@ static const char *error_time_name(grpc_error_times key) { switch (key) { case GRPC_ERROR_TIME_CREATED: return "created"; + case GRPC_ERROR_TIME_MAX: + GPR_UNREACHABLE_CODE(return "unknown"); } GPR_UNREACHABLE_CODE(return "unknown"); } @@ -184,12 +152,36 @@ grpc_error *grpc_error_ref(grpc_error *err) { } #endif +static void unref_errs(grpc_error *err) { + uint8_t slot = err->first_err; + while (slot != UINT8_MAX) { + grpc_linked_error *lerr = (grpc_linked_error *)(err->arena + slot); + GRPC_ERROR_UNREF(lerr->err); + GPR_ASSERT(err->last_err == slot ? lerr->next == UINT8_MAX + : lerr->next != UINT8_MAX); + slot = lerr->next; + } +} + +static void unref_slice(grpc_slice slice) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_slice_unref_internal(&exec_ctx, slice); + grpc_exec_ctx_finish(&exec_ctx); +} + +static void unref_strs(grpc_error *err) { + for (size_t which = 0; which < GRPC_ERROR_STR_MAX; ++which) { + uint8_t slot = err->strs[which]; + if (slot != UINT8_MAX) { + unref_slice(*(grpc_slice *)(err->arena + slot)); + } + } +} + static void error_destroy(grpc_error *err) { GPR_ASSERT(!grpc_error_is_special(err)); - gpr_avl_unref(err->ints); - gpr_avl_unref(err->strs); - gpr_avl_unref(err->errs); - gpr_avl_unref(err->times); + unref_errs(err); + unref_strs(err); gpr_free((void *)gpr_atm_acq_load(&err->error_string)); gpr_free(err); } @@ -213,67 +205,189 @@ void grpc_error_unref(grpc_error *err) { } #endif +static uint8_t get_placement(grpc_error **err, size_t size) { + GPR_ASSERT(*err); + uint8_t slots = (uint8_t)(size / sizeof(intptr_t)); + if ((*err)->arena_size + slots > (*err)->arena_capacity) { + (*err)->arena_capacity = (uint8_t)(3 * (*err)->arena_capacity / 2); + *err = gpr_realloc( + *err, sizeof(grpc_error) + (*err)->arena_capacity * sizeof(intptr_t)); + } + uint8_t placement = (*err)->arena_size; + (*err)->arena_size = (uint8_t)((*err)->arena_size + slots); + return placement; +} + +static void internal_set_int(grpc_error **err, grpc_error_ints which, + intptr_t value) { + // GPR_ASSERT((*err)->ints[which] == UINT8_MAX); // TODO, enforce this + uint8_t slot = (*err)->ints[which]; + if (slot == UINT8_MAX) { + slot = get_placement(err, sizeof(value)); + } + (*err)->ints[which] = slot; + (*err)->arena[slot] = value; +} + +static void internal_set_str(grpc_error **err, grpc_error_strs which, + grpc_slice value) { + // GPR_ASSERT((*err)->strs[which] == UINT8_MAX); // TODO, enforce this + uint8_t slot = (*err)->strs[which]; + if (slot == UINT8_MAX) { + slot = get_placement(err, sizeof(value)); + } else { + unref_slice(*(grpc_slice *)((*err)->arena + slot)); + } + (*err)->strs[which] = slot; + memcpy((*err)->arena + slot, &value, sizeof(value)); +} + +static void internal_set_time(grpc_error **err, grpc_error_times which, + gpr_timespec value) { + // GPR_ASSERT((*err)->times[which] == UINT8_MAX); // TODO, enforce this + uint8_t slot = (*err)->times[which]; + if (slot == UINT8_MAX) { + slot = get_placement(err, sizeof(value)); + } + (*err)->times[which] = slot; + memcpy((*err)->arena + slot, &value, sizeof(value)); +} + +static void internal_add_error(grpc_error **err, grpc_error *new) { + grpc_linked_error new_last = {new, UINT8_MAX}; + uint8_t slot = get_placement(err, sizeof(grpc_linked_error)); + if ((*err)->first_err == UINT8_MAX) { + GPR_ASSERT((*err)->last_err == UINT8_MAX); + (*err)->last_err = slot; + (*err)->first_err = slot; + } else { + GPR_ASSERT((*err)->last_err != UINT8_MAX); + grpc_linked_error *old_last = + (grpc_linked_error *)((*err)->arena + (*err)->last_err); + old_last->next = slot; + (*err)->last_err = slot; + } + memcpy((*err)->arena + slot, &new_last, sizeof(grpc_linked_error)); +} + +#define SLOTS_PER_INT (sizeof(intptr_t) / sizeof(intptr_t)) +#define SLOTS_PER_STR (sizeof(grpc_slice) / sizeof(intptr_t)) +#define SLOTS_PER_TIME (sizeof(gpr_timespec) / sizeof(intptr_t)) +#define SLOTS_PER_LINKED_ERROR (sizeof(grpc_linked_error) / sizeof(intptr_t)) + +// size of storing one int and two slices and a timespec. For line, desc, file, +// and time created +#define DEFAULT_ERROR_CAPACITY \ + (SLOTS_PER_INT + (SLOTS_PER_STR * 2) + SLOTS_PER_TIME) + +// It is very common to include and extra int and string in an error +#define SURPLUS_CAPACITY (2 * SLOTS_PER_INT + SLOTS_PER_TIME) + grpc_error *grpc_error_create(const char *file, int line, const char *desc, grpc_error **referencing, size_t num_referencing) { GPR_TIMER_BEGIN("grpc_error_create", 0); - grpc_error *err = gpr_malloc(sizeof(*err)); + uint8_t initial_arena_capacity = (uint8_t)( + DEFAULT_ERROR_CAPACITY + + (uint8_t)(num_referencing * SLOTS_PER_LINKED_ERROR) + SURPLUS_CAPACITY); + grpc_error *err = + gpr_malloc(sizeof(*err) + initial_arena_capacity * sizeof(intptr_t)); if (err == NULL) { // TODO(ctiller): make gpr_malloc return NULL return GRPC_ERROR_OOM; } #ifdef GRPC_ERROR_REFCOUNT_DEBUG gpr_log(GPR_DEBUG, "%p create [%s:%d]", err, file, line); #endif - err->ints = gpr_avl_add(gpr_avl_create(&avl_vtable_ints), - (void *)(uintptr_t)GRPC_ERROR_INT_FILE_LINE, - (void *)(uintptr_t)line); - err->strs = gpr_avl_add( - gpr_avl_add(gpr_avl_create(&avl_vtable_strs), - (void *)(uintptr_t)GRPC_ERROR_STR_FILE, gpr_strdup(file)), - (void *)(uintptr_t)GRPC_ERROR_STR_DESCRIPTION, gpr_strdup(desc)); - err->errs = gpr_avl_create(&avl_vtable_errs); - err->next_err = 0; - for (size_t i = 0; i < num_referencing; i++) { + + err->arena_size = 0; + err->arena_capacity = initial_arena_capacity; + err->first_err = UINT8_MAX; + err->last_err = UINT8_MAX; + + memset(err->ints, UINT8_MAX, GRPC_ERROR_INT_MAX); + memset(err->strs, UINT8_MAX, GRPC_ERROR_STR_MAX); + memset(err->times, UINT8_MAX, GRPC_ERROR_TIME_MAX); + + internal_set_int(&err, GRPC_ERROR_INT_FILE_LINE, line); + internal_set_str(&err, GRPC_ERROR_STR_FILE, + grpc_slice_from_static_string(file)); + internal_set_str( + &err, GRPC_ERROR_STR_DESCRIPTION, + grpc_slice_from_copied_buffer( + desc, + strlen(desc) + + 1)); // TODO, pull this up. // TODO(ncteisen), pull this up. + + for (size_t i = 0; i < num_referencing; ++i) { if (referencing[i] == GRPC_ERROR_NONE) continue; - err->errs = gpr_avl_add(err->errs, (void *)(err->next_err++), - GRPC_ERROR_REF(referencing[i])); + internal_add_error( + &err, + GRPC_ERROR_REF( + referencing[i])); // TODO(ncteisen), change ownership semantics } - err->times = gpr_avl_add(gpr_avl_create(&avl_vtable_times), - (void *)(uintptr_t)GRPC_ERROR_TIME_CREATED, - box_time(gpr_now(GPR_CLOCK_REALTIME))); + + internal_set_time(&err, GRPC_ERROR_TIME_CREATED, gpr_now(GPR_CLOCK_REALTIME)); + gpr_atm_no_barrier_store(&err->error_string, 0); gpr_ref_init(&err->refs, 1); GPR_TIMER_END("grpc_error_create", 0); return err; } +static void ref_strs(grpc_error *err) { + for (size_t i = 0; i < GRPC_ERROR_STR_MAX; ++i) { + uint8_t slot = err->strs[i]; + if (slot != UINT8_MAX) { + grpc_slice_ref_internal(*(grpc_slice *)(err->arena + slot)); + } + } +} + +static void ref_errs(grpc_error *err) { + uint8_t slot = err->first_err; + while (slot != UINT8_MAX) { + grpc_linked_error *lerr = (grpc_linked_error *)(err->arena + slot); + GRPC_ERROR_REF(lerr->err); + slot = lerr->next; + } +} + static grpc_error *copy_error_and_unref(grpc_error *in) { GPR_TIMER_BEGIN("copy_error_and_unref", 0); grpc_error *out; if (grpc_error_is_special(in)) { - if (in == GRPC_ERROR_NONE) - out = grpc_error_set_int(GRPC_ERROR_CREATE("no error"), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_OK); - else if (in == GRPC_ERROR_OOM) - out = GRPC_ERROR_CREATE("oom"); - else if (in == GRPC_ERROR_CANCELLED) - out = - grpc_error_set_int(GRPC_ERROR_CREATE("cancelled"), - GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_CANCELLED); - else - out = GRPC_ERROR_CREATE("unknown"); + out = GRPC_ERROR_CREATE("unknown"); + if (in == GRPC_ERROR_NONE) { + internal_set_str(&out, GRPC_ERROR_STR_DESCRIPTION, + grpc_slice_from_static_string("no error")); + internal_set_int(&out, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_OK); + } else if (in == GRPC_ERROR_OOM) { + internal_set_str(&out, GRPC_ERROR_STR_DESCRIPTION, + grpc_slice_from_static_string("oom")); + } else if (in == GRPC_ERROR_CANCELLED) { + internal_set_str(&out, GRPC_ERROR_STR_DESCRIPTION, + grpc_slice_from_static_string("cancelled")); + internal_set_int(&out, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_CANCELLED); + } + } else if (gpr_ref_is_unique(&in->refs)) { + out = in; } else { - out = gpr_malloc(sizeof(*out)); + uint8_t new_arena_capacity = in->arena_capacity; + // the returned err will be added to, so we ensure this is room to avoid + // unneeded allocations. + if (in->arena_capacity - in->arena_size < (uint8_t)SLOTS_PER_STR) { + new_arena_capacity = (uint8_t)(3 * new_arena_capacity / 2); + } + out = gpr_malloc(sizeof(*in) + new_arena_capacity * sizeof(intptr_t)); #ifdef GRPC_ERROR_REFCOUNT_DEBUG gpr_log(GPR_DEBUG, "%p create copying %p", out, in); #endif - out->ints = gpr_avl_ref(in->ints); - out->strs = gpr_avl_ref(in->strs); - out->errs = gpr_avl_ref(in->errs); - out->times = gpr_avl_ref(in->times); + memcpy(out, in, sizeof(*in) + in->arena_size * sizeof(intptr_t)); + out->arena_capacity = new_arena_capacity; gpr_atm_no_barrier_store(&out->error_string, 0); - out->next_err = in->next_err; gpr_ref_init(&out->refs, 1); + ref_strs(out); + ref_errs(out); GRPC_ERROR_UNREF(in); } GPR_TIMER_END("copy_error_and_unref", 0); @@ -284,7 +398,7 @@ grpc_error *grpc_error_set_int(grpc_error *src, grpc_error_ints which, intptr_t value) { GPR_TIMER_BEGIN("grpc_error_set_int", 0); grpc_error *new = copy_error_and_unref(src); - new->ints = gpr_avl_add(new->ints, (void *)(uintptr_t)which, (void *)value); + internal_set_int(&new, which, value); GPR_TIMER_END("grpc_error_set_int", 0); return new; } @@ -302,7 +416,6 @@ static special_error_status_map error_status_map[] = { bool grpc_error_get_int(grpc_error *err, grpc_error_ints which, intptr_t *p) { GPR_TIMER_BEGIN("grpc_error_get_int", 0); - void *pp; if (grpc_error_is_special(err)) { if (which == GRPC_ERROR_INT_GRPC_STATUS) { for (size_t i = 0; i < GPR_ARRAY_SIZE(error_status_map); i++) { @@ -316,8 +429,9 @@ bool grpc_error_get_int(grpc_error *err, grpc_error_ints which, intptr_t *p) { GPR_TIMER_END("grpc_error_get_int", 0); return false; } - if (gpr_avl_maybe_get(err->ints, (void *)(uintptr_t)which, &pp)) { - if (p != NULL) *p = (intptr_t)pp; + uint8_t slot = err->ints[which]; + if (slot != UINT8_MAX) { + if (p != NULL) *p = err->arena[slot]; GPR_TIMER_END("grpc_error_get_int", 0); return true; } @@ -329,8 +443,9 @@ grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which, const char *value) { GPR_TIMER_BEGIN("grpc_error_set_str", 0); grpc_error *new = copy_error_and_unref(src); - new->strs = - gpr_avl_add(new->strs, (void *)(uintptr_t)which, gpr_strdup(value)); + internal_set_str(&new, which, + grpc_slice_from_copied_buffer( + value, strlen(value) + 1)); // TODO, pull this up. GPR_TIMER_END("grpc_error_set_str", 0); return new; } @@ -346,13 +461,19 @@ const char *grpc_error_get_str(grpc_error *err, grpc_error_strs which) { } return NULL; } - return gpr_avl_get(err->strs, (void *)(uintptr_t)which); + uint8_t slot = err->strs[which]; + if (slot != UINT8_MAX) { + return (const char *)GRPC_SLICE_START_PTR( + *(grpc_slice *)(err->arena + slot)); + } else { + return NULL; + } } grpc_error *grpc_error_add_child(grpc_error *src, grpc_error *child) { GPR_TIMER_BEGIN("grpc_error_add_child", 0); grpc_error *new = copy_error_and_unref(src); - new->errs = gpr_avl_add(new->errs, (void *)(new->next_err++), child); + internal_add_error(&new, child); GPR_TIMER_END("grpc_error_add_child", 0); return new; } @@ -372,42 +493,6 @@ typedef struct { size_t cap_kvs; } kv_pairs; -static void append_kv(kv_pairs *kvs, char *key, char *value) { - if (kvs->num_kvs == kvs->cap_kvs) { - kvs->cap_kvs = GPR_MAX(3 * kvs->cap_kvs / 2, 4); - kvs->kvs = gpr_realloc(kvs->kvs, sizeof(*kvs->kvs) * kvs->cap_kvs); - } - kvs->kvs[kvs->num_kvs].key = key; - kvs->kvs[kvs->num_kvs].value = value; - kvs->num_kvs++; -} - -static void collect_kvs(gpr_avl_node *node, char *key(void *k), - char *fmt(void *v), kv_pairs *kvs) { - if (node == NULL) return; - append_kv(kvs, key(node->key), fmt(node->value)); - collect_kvs(node->left, key, fmt, kvs); - collect_kvs(node->right, key, fmt, kvs); -} - -static char *key_int(void *p) { - return gpr_strdup(error_int_name((grpc_error_ints)(uintptr_t)p)); -} - -static char *key_str(void *p) { - return gpr_strdup(error_str_name((grpc_error_strs)(uintptr_t)p)); -} - -static char *key_time(void *p) { - return gpr_strdup(error_time_name((grpc_error_times)(uintptr_t)p)); -} - -static char *fmt_int(void *p) { - char *s; - gpr_asprintf(&s, "%" PRIdPTR, (intptr_t)p); - return s; -} - static void append_chr(char c, char **s, size_t *sz, size_t *cap) { if (*sz == *cap) { *cap = GPR_MAX(8, 3 * *cap / 2); @@ -459,6 +544,40 @@ static void append_esc_str(const char *str, char **s, size_t *sz, size_t *cap) { append_chr('"', s, sz, cap); } +static void append_kv(kv_pairs *kvs, char *key, char *value) { + if (kvs->num_kvs == kvs->cap_kvs) { + kvs->cap_kvs = GPR_MAX(3 * kvs->cap_kvs / 2, 4); + kvs->kvs = gpr_realloc(kvs->kvs, sizeof(*kvs->kvs) * kvs->cap_kvs); + } + kvs->kvs[kvs->num_kvs].key = key; + kvs->kvs[kvs->num_kvs].value = value; + kvs->num_kvs++; +} + +static char *key_int(grpc_error_ints which) { + return gpr_strdup(error_int_name(which)); +} + +static char *fmt_int(intptr_t p) { + char *s; + gpr_asprintf(&s, "%" PRIdPTR, p); + return s; +} + +static void collect_ints_kvs(grpc_error *err, kv_pairs *kvs) { + for (size_t which = 0; which < GRPC_ERROR_INT_MAX; ++which) { + uint8_t slot = err->ints[which]; + if (slot != UINT8_MAX) { + append_kv(kvs, key_int((grpc_error_ints)which), + fmt_int(err->arena[slot])); + } + } +} + +static char *key_str(grpc_error_strs which) { + return gpr_strdup(error_str_name(which)); +} + static char *fmt_str(void *p) { char *s = NULL; size_t sz = 0; @@ -468,8 +587,22 @@ static char *fmt_str(void *p) { return s; } -static char *fmt_time(void *p) { - gpr_timespec tm = *(gpr_timespec *)p; +static void collect_strs_kvs(grpc_error *err, kv_pairs *kvs) { + for (size_t which = 0; which < GRPC_ERROR_STR_MAX; ++which) { + uint8_t slot = err->strs[which]; + if (slot != UINT8_MAX) { + append_kv( + kvs, key_str((grpc_error_strs)which), + fmt_str(GRPC_SLICE_START_PTR(*(grpc_slice *)(err->arena + slot)))); + } + } +} + +static char *key_time(grpc_error_times which) { + return gpr_strdup(error_time_name(which)); +} + +static char *fmt_time(gpr_timespec tm) { char *out; char *pfx = "!!"; switch (tm.clock_type) { @@ -490,24 +623,37 @@ static char *fmt_time(void *p) { return out; } -static void add_errs(gpr_avl_node *n, char **s, size_t *sz, size_t *cap, - bool *first) { - if (n == NULL) return; - add_errs(n->left, s, sz, cap, first); - if (!*first) append_chr(',', s, sz, cap); - *first = false; - const char *e = grpc_error_string(n->value); - append_str(e, s, sz, cap); - add_errs(n->right, s, sz, cap, first); +static void collect_times_kvs(grpc_error *err, kv_pairs *kvs) { + for (size_t which = 0; which < GRPC_ERROR_TIME_MAX; ++which) { + uint8_t slot = err->times[which]; + if (slot != UINT8_MAX) { + append_kv(kvs, key_time((grpc_error_times)which), + fmt_time(*(gpr_timespec *)(err->arena + slot))); + } + } +} + +static void add_errs(grpc_error *err, char **s, size_t *sz, size_t *cap) { + uint8_t slot = err->first_err; + bool first = true; + while (slot != UINT8_MAX) { + grpc_linked_error *lerr = (grpc_linked_error *)(err->arena + slot); + if (!first) append_chr(',', s, sz, cap); + first = false; + const char *e = grpc_error_string(lerr->err); + append_str(e, s, sz, cap); + GPR_ASSERT(err->last_err == slot ? lerr->next == UINT8_MAX + : lerr->next != UINT8_MAX); + slot = lerr->next; + } } static char *errs_string(grpc_error *err) { char *s = NULL; size_t sz = 0; size_t cap = 0; - bool first = true; append_chr('[', &s, &sz, &cap); - add_errs(err->errs.root, &s, &sz, &cap, &first); + add_errs(err, &s, &sz, &cap); append_chr(']', &s, &sz, &cap); append_chr(0, &s, &sz, &cap); return s; @@ -555,10 +701,10 @@ const char *grpc_error_string(grpc_error *err) { kv_pairs kvs; memset(&kvs, 0, sizeof(kvs)); - collect_kvs(err->ints.root, key_int, fmt_int, &kvs); - collect_kvs(err->strs.root, key_str, fmt_str, &kvs); - collect_kvs(err->times.root, key_time, fmt_time, &kvs); - if (!gpr_avl_is_empty(err->errs)) { + collect_ints_kvs(err, &kvs); + collect_strs_kvs(err, &kvs); + collect_times_kvs(err, &kvs); + if (err->first_err != UINT8_MAX) { append_kv(&kvs, gpr_strdup("referenced_errors"), errs_string(err)); } diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h index 2613512acb..eb953947ae 100644 --- a/src/core/lib/iomgr/error.h +++ b/src/core/lib/iomgr/error.h @@ -102,6 +102,9 @@ typedef enum { GRPC_ERROR_INT_LIMIT, /// chttp2: did the error occur while a write was in progress GRPC_ERROR_INT_OCCURRED_DURING_WRITE, + + /// Must always be last + GRPC_ERROR_INT_MAX, } grpc_error_ints; typedef enum { @@ -129,11 +132,17 @@ typedef enum { GRPC_ERROR_STR_KEY, /// value associated with the error GRPC_ERROR_STR_VALUE, + + /// Must always be last + GRPC_ERROR_STR_MAX, } grpc_error_strs; typedef enum { /// timestamp of error creation GRPC_ERROR_TIME_CREATED, + + /// Must always be last + GRPC_ERROR_TIME_MAX, } grpc_error_times; /// The following "special" errors can be propagated without allocating memory. @@ -184,8 +193,6 @@ void grpc_error_unref(grpc_error *err); grpc_error *grpc_error_set_int(grpc_error *src, grpc_error_ints which, intptr_t value) GRPC_MUST_USE_RESULT; bool grpc_error_get_int(grpc_error *error, grpc_error_ints which, intptr_t *p); -grpc_error *grpc_error_set_time(grpc_error *src, grpc_error_times which, - gpr_timespec value) GRPC_MUST_USE_RESULT; grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which, const char *value) GRPC_MUST_USE_RESULT; /// Returns NULL if the specified string is not set. diff --git a/src/core/lib/iomgr/error_internal.h b/src/core/lib/iomgr/error_internal.h index 1c89ead4ed..fb4814e41f 100644 --- a/src/core/lib/iomgr/error_internal.h +++ b/src/core/lib/iomgr/error_internal.h @@ -35,18 +35,28 @@ #define GRPC_CORE_LIB_IOMGR_ERROR_INTERNAL_H #include <inttypes.h> -#include <stdbool.h> +#include <stdbool.h> // TODO, do we need this? -#include <grpc/support/avl.h> +#include <grpc/support/sync.h> + +typedef struct grpc_linked_error grpc_linked_error; + +struct grpc_linked_error { + grpc_error *err; + uint8_t next; +}; struct grpc_error { gpr_refcount refs; - gpr_avl ints; - gpr_avl strs; - gpr_avl times; - gpr_avl errs; - uintptr_t next_err; + uint8_t ints[GRPC_ERROR_INT_MAX]; + uint8_t strs[GRPC_ERROR_STR_MAX]; + uint8_t times[GRPC_ERROR_TIME_MAX]; + uint8_t first_err; + uint8_t last_err; gpr_atm error_string; + uint8_t arena_size; + uint8_t arena_capacity; + intptr_t arena[0]; }; bool grpc_error_is_special(grpc_error *err); diff --git a/src/core/lib/iomgr/sockaddr_utils.c b/src/core/lib/iomgr/sockaddr_utils.c index ffa62cb53c..9d2732666b 100644 --- a/src/core/lib/iomgr/sockaddr_utils.c +++ b/src/core/lib/iomgr/sockaddr_utils.c @@ -162,6 +162,7 @@ int grpc_sockaddr_to_string(char **out, char ntop_buf[INET6_ADDRSTRLEN]; const void *ip = NULL; int port; + uint32_t sin6_scope_id = 0; int ret; *out = NULL; @@ -177,10 +178,19 @@ int grpc_sockaddr_to_string(char **out, const struct sockaddr_in6 *addr6 = (const struct sockaddr_in6 *)addr; ip = &addr6->sin6_addr; port = ntohs(addr6->sin6_port); + sin6_scope_id = addr6->sin6_scope_id; } if (ip != NULL && grpc_inet_ntop(addr->sa_family, ip, ntop_buf, sizeof(ntop_buf)) != NULL) { - ret = gpr_join_host_port(out, ntop_buf, port); + if (sin6_scope_id != 0) { + char *host_with_scope; + /* Enclose sin6_scope_id with the format defined in RFC 6784 section 2. */ + gpr_asprintf(&host_with_scope, "%s%%25%" PRIu32, ntop_buf, sin6_scope_id); + ret = gpr_join_host_port(out, host_with_scope, port); + gpr_free(host_with_scope); + } else { + ret = gpr_join_host_port(out, ntop_buf, port); + } } else { ret = gpr_asprintf(out, "(sockaddr family=%d)", addr->sa_family); } diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index 36f878fdd4..5f286a6723 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -114,6 +114,8 @@ struct grpc_tcp_server { /* is this server shutting down? */ bool shutdown; + /* have listeners been shutdown? */ + bool shutdown_listeners; /* use SO_REUSEPORT */ bool so_reuseport; /* expand wildcard addresses to a list of all local addresses */ @@ -161,7 +163,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, grpc_tcp_server **server) { gpr_once_init(&check_init, init); - grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); + grpc_tcp_server *s = gpr_zalloc(sizeof(grpc_tcp_server)); s->so_reuseport = has_so_reuseport; s->resource_quota = grpc_resource_quota_create(NULL); s->expand_wildcard_addrs = false; @@ -422,7 +424,14 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) { grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); return; default: - gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno)); + gpr_mu_lock(&sp->server->mu); + if (!sp->server->shutdown_listeners) { + gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno)); + } else { + /* if we have shutdown listeners, accept4 could fail, and we + needn't notify users */ + } + gpr_mu_unlock(&sp->server->mu); goto error; } } @@ -438,11 +447,6 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) { grpc_fd *fdobj = grpc_fd_create(fd, name); - if (read_notifier_pollset == NULL) { - gpr_log(GPR_ERROR, "Read notifier pollset is not set on the fd"); - goto error; - } - grpc_pollset_add_fd(exec_ctx, read_notifier_pollset, fdobj); // Create acceptor. @@ -941,6 +945,7 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { gpr_mu_lock(&s->mu); + s->shutdown_listeners = true; /* shutdown all fd's */ if (s->active_ports) { grpc_tcp_listener *sp; diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c index 2a1c8d39fa..d1bcd89af1 100644 --- a/src/core/lib/iomgr/udp_server.c +++ b/src/core/lib/iomgr/udp_server.c @@ -485,7 +485,11 @@ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, grpc_schedule_on_exec_ctx); grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure); - s->active_ports++; + /* Registered for both read and write callbacks: increment active_ports + * twice to account for this, and delay free-ing of memory until both + * on_read and on_write have fired. */ + s->active_ports += 2; + sp = sp->next; } diff --git a/src/core/lib/iomgr/wakeup_fd_posix.h b/src/core/lib/iomgr/wakeup_fd_posix.h index 71d32d97ba..c8dd242c75 100644 --- a/src/core/lib/iomgr/wakeup_fd_posix.h +++ b/src/core/lib/iomgr/wakeup_fd_posix.h @@ -46,7 +46,7 @@ * * Setup: * 1. Before calling anything, call global_init() at least once. - * 1. Call grpc_wakeup_fd_create() to get a wakeup_fd. + * 1. Call grpc_wakeup_fd_init() to set up a wakeup_fd. * 2. Add the result of GRPC_WAKEUP_FD_FD to the set of monitored file * descriptors for the poll() style API you are using. Monitor the file * descriptor for readability. diff --git a/src/core/lib/support/sync.c b/src/core/lib/support/sync.c index e4a7fce646..b52f004f74 100644 --- a/src/core/lib/support/sync.c +++ b/src/core/lib/support/sync.c @@ -119,6 +119,10 @@ int gpr_unref(gpr_refcount *r) { return prior == 1; } +int gpr_ref_is_unique(gpr_refcount *r) { + return gpr_atm_acq_load(&r->count) == 1; +} + void gpr_stats_init(gpr_stats_counter *c, intptr_t n) { gpr_atm_rel_store(&c->value, n); } diff --git a/src/core/lib/transport/error_utils.c b/src/core/lib/transport/error_utils.c index da77828d9c..ef55e561fb 100644 --- a/src/core/lib/transport/error_utils.c +++ b/src/core/lib/transport/error_utils.c @@ -44,12 +44,12 @@ static grpc_error *recursively_find_error_with_field(grpc_error *error, } if (grpc_error_is_special(error)) return NULL; // Otherwise, search through its children. - intptr_t key = 0; - while (true) { - grpc_error *child_error = gpr_avl_get(error->errs, (void *)key++); - if (child_error == NULL) break; - grpc_error *result = recursively_find_error_with_field(child_error, which); - if (result != NULL) return result; + uint8_t slot = error->first_err; + while (slot != UINT8_MAX) { + grpc_linked_error *lerr = (grpc_linked_error *)(error->arena + slot); + grpc_error *result = recursively_find_error_with_field(lerr->err, which); + if (result) return result; + slot = lerr->next; } return NULL; } @@ -112,13 +112,13 @@ bool grpc_error_has_clear_grpc_status(grpc_error *error) { if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, NULL)) { return true; } - intptr_t key = 0; - while (true) { - grpc_error *child_error = gpr_avl_get(error->errs, (void *)key++); - if (child_error == NULL) break; - if (grpc_error_has_clear_grpc_status(child_error)) { + uint8_t slot = error->first_err; + while (slot != UINT8_MAX) { + grpc_linked_error *lerr = (grpc_linked_error *)(error->arena + slot); + if (grpc_error_has_clear_grpc_status(lerr->err)) { return true; } + slot = lerr->next; } return false; } |