aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Muxi Yan <mxyan@google.com>2017-03-12 18:33:47 -0700
committerGravatar Muxi Yan <mxyan@google.com>2017-03-12 18:33:47 -0700
commit43be708bb2a7b3c7dce8cc25059be3fce7228d66 (patch)
tree82217258fbf81514d5bf13a73d6501eb8fbde363 /src/core
parent39a797387f1a7bbfd5cad235a90d1059bfddfdba (diff)
parent0b7bd20de4f0d219c399ef8c01c05026bf12da5d (diff)
Merge remote-tracking branch 'upstream/master' into fix-flush-read
Diffstat (limited to 'src/core')
-rw-r--r--src/core/ext/client_channel/client_channel_plugin.c2
-rw-r--r--src/core/ext/client_channel/http_proxy.c10
-rw-r--r--src/core/ext/client_channel/parse_address.c28
-rw-r--r--src/core/ext/client_channel/resolver_registry.c22
-rw-r--r--src/core/ext/client_channel/resolver_registry.h5
-rw-r--r--src/core/ext/client_channel/subchannel.c11
-rw-r--r--src/core/ext/client_channel/subchannel.h3
-rw-r--r--src/core/ext/client_channel/uri_parser.c37
-rw-r--r--src/core/ext/client_channel/uri_parser.h4
-rw-r--r--src/core/ext/lb_policy/grpclb/grpclb.c2
-rw-r--r--src/core/ext/transport/chttp2/client/chttp2_connector.c2
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create.c3
-rw-r--r--src/core/ext/transport/chttp2/client/secure/secure_channel_create.c7
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.c42
-rw-r--r--src/core/lib/iomgr/error.c422
-rw-r--r--src/core/lib/iomgr/error.h11
-rw-r--r--src/core/lib/iomgr/error_internal.h24
-rw-r--r--src/core/lib/iomgr/sockaddr_utils.c12
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.c19
-rw-r--r--src/core/lib/iomgr/udp_server.c6
-rw-r--r--src/core/lib/iomgr/wakeup_fd_posix.h2
-rw-r--r--src/core/lib/support/sync.c4
-rw-r--r--src/core/lib/transport/error_utils.c22
23 files changed, 477 insertions, 223 deletions
diff --git a/src/core/ext/client_channel/client_channel_plugin.c b/src/core/ext/client_channel/client_channel_plugin.c
index 6f9df3e386..28d3b63f99 100644
--- a/src/core/ext/client_channel/client_channel_plugin.c
+++ b/src/core/ext/client_channel/client_channel_plugin.c
@@ -64,7 +64,7 @@ static bool set_default_host_if_unset(grpc_exec_ctx *exec_ctx,
}
}
char *default_authority = grpc_get_default_authority(
- grpc_channel_stack_builder_get_target(builder));
+ exec_ctx, grpc_channel_stack_builder_get_target(builder));
if (default_authority != NULL) {
grpc_arg arg;
arg.type = GRPC_ARG_STRING;
diff --git a/src/core/ext/client_channel/http_proxy.c b/src/core/ext/client_channel/http_proxy.c
index bbe4ff550c..e280cef101 100644
--- a/src/core/ext/client_channel/http_proxy.c
+++ b/src/core/ext/client_channel/http_proxy.c
@@ -46,10 +46,11 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/support/env.h"
-static char* grpc_get_http_proxy_server() {
+static char* grpc_get_http_proxy_server(grpc_exec_ctx* exec_ctx) {
char* uri_str = gpr_getenv("http_proxy");
if (uri_str == NULL) return NULL;
- grpc_uri* uri = grpc_uri_parse(uri_str, false /* suppress_errors */);
+ grpc_uri* uri =
+ grpc_uri_parse(exec_ctx, uri_str, false /* suppress_errors */);
char* proxy_name = NULL;
if (uri == NULL || uri->authority == NULL) {
gpr_log(GPR_ERROR, "cannot parse value of 'http_proxy' env var");
@@ -76,9 +77,10 @@ static bool proxy_mapper_map_name(grpc_exec_ctx* exec_ctx,
const grpc_channel_args* args,
char** name_to_resolve,
grpc_channel_args** new_args) {
- *name_to_resolve = grpc_get_http_proxy_server();
+ *name_to_resolve = grpc_get_http_proxy_server(exec_ctx);
if (*name_to_resolve == NULL) return false;
- grpc_uri* uri = grpc_uri_parse(server_uri, false /* suppress_errors */);
+ grpc_uri* uri =
+ grpc_uri_parse(exec_ctx, server_uri, false /* suppress_errors */);
if (uri == NULL || uri->path[0] == '\0') {
gpr_log(GPR_ERROR,
"'http_proxy' environment variable set, but cannot "
diff --git a/src/core/ext/client_channel/parse_address.c b/src/core/ext/client_channel/parse_address.c
index 8e4da03de0..8ae15fc72b 100644
--- a/src/core/ext/client_channel/parse_address.c
+++ b/src/core/ext/client_channel/parse_address.c
@@ -44,6 +44,7 @@
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
+#include "src/core/lib/support/string.h"
#ifdef GRPC_HAVE_UNIX_SOCKET
@@ -120,9 +121,30 @@ int parse_ipv6(grpc_uri *uri, grpc_resolved_address *resolved_addr) {
memset(in6, 0, sizeof(*in6));
resolved_addr->len = sizeof(*in6);
in6->sin6_family = AF_INET6;
- if (inet_pton(AF_INET6, host, &in6->sin6_addr) == 0) {
- gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host);
- goto done;
+
+ /* Handle the RFC6874 syntax for IPv6 zone identifiers. */
+ char *host_end = (char *)gpr_memrchr(host, '%', strlen(host));
+ if (host_end != NULL) {
+ GPR_ASSERT(host_end >= host);
+ char host_without_scope[INET6_ADDRSTRLEN];
+ size_t host_without_scope_len = (size_t)(host_end - host);
+ strncpy(host_without_scope, host, host_without_scope_len);
+ host_without_scope[host_without_scope_len] = '\0';
+ if (inet_pton(AF_INET6, host_without_scope, &in6->sin6_addr) == 0) {
+ gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host_without_scope);
+ goto done;
+ }
+ if (gpr_parse_bytes_to_uint32(host_end + 1,
+ strlen(host) - host_without_scope_len - 1,
+ &in6->sin6_scope_id) == 0) {
+ gpr_log(GPR_ERROR, "invalid ipv6 scope id: '%s'", host_end + 1);
+ goto done;
+ }
+ } else {
+ if (inet_pton(AF_INET6, host, &in6->sin6_addr) == 0) {
+ gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host);
+ goto done;
+ }
}
if (port != NULL) {
diff --git a/src/core/ext/client_channel/resolver_registry.c b/src/core/ext/client_channel/resolver_registry.c
index f8e8bc9c39..3c5a6fb3ff 100644
--- a/src/core/ext/client_channel/resolver_registry.c
+++ b/src/core/ext/client_channel/resolver_registry.c
@@ -108,22 +108,23 @@ static grpc_resolver_factory *lookup_factory_by_uri(grpc_uri *uri) {
return lookup_factory(uri->scheme);
}
-static grpc_resolver_factory *resolve_factory(const char *target,
+static grpc_resolver_factory *resolve_factory(grpc_exec_ctx *exec_ctx,
+ const char *target,
grpc_uri **uri,
char **canonical_target) {
grpc_resolver_factory *factory = NULL;
GPR_ASSERT(uri != NULL);
- *uri = grpc_uri_parse(target, 1);
+ *uri = grpc_uri_parse(exec_ctx, target, 1);
factory = lookup_factory_by_uri(*uri);
if (factory == NULL) {
grpc_uri_destroy(*uri);
gpr_asprintf(canonical_target, "%s%s", g_default_resolver_prefix, target);
- *uri = grpc_uri_parse(*canonical_target, 1);
+ *uri = grpc_uri_parse(exec_ctx, *canonical_target, 1);
factory = lookup_factory_by_uri(*uri);
if (factory == NULL) {
- grpc_uri_destroy(grpc_uri_parse(target, 0));
- grpc_uri_destroy(grpc_uri_parse(*canonical_target, 0));
+ grpc_uri_destroy(grpc_uri_parse(exec_ctx, target, 0));
+ grpc_uri_destroy(grpc_uri_parse(exec_ctx, *canonical_target, 0));
gpr_log(GPR_ERROR, "don't know how to resolve '%s' or '%s'", target,
*canonical_target);
}
@@ -138,7 +139,7 @@ grpc_resolver *grpc_resolver_create(grpc_exec_ctx *exec_ctx, const char *target,
grpc_uri *uri = NULL;
char *canonical_target = NULL;
grpc_resolver_factory *factory =
- resolve_factory(target, &uri, &canonical_target);
+ resolve_factory(exec_ctx, target, &uri, &canonical_target);
grpc_resolver *resolver;
grpc_resolver_args resolver_args;
memset(&resolver_args, 0, sizeof(resolver_args));
@@ -153,21 +154,22 @@ grpc_resolver *grpc_resolver_create(grpc_exec_ctx *exec_ctx, const char *target,
return resolver;
}
-char *grpc_get_default_authority(const char *target) {
+char *grpc_get_default_authority(grpc_exec_ctx *exec_ctx, const char *target) {
grpc_uri *uri = NULL;
char *canonical_target = NULL;
grpc_resolver_factory *factory =
- resolve_factory(target, &uri, &canonical_target);
+ resolve_factory(exec_ctx, target, &uri, &canonical_target);
char *authority = grpc_resolver_factory_get_default_authority(factory, uri);
grpc_uri_destroy(uri);
gpr_free(canonical_target);
return authority;
}
-char *grpc_resolver_factory_add_default_prefix_if_needed(const char *target) {
+char *grpc_resolver_factory_add_default_prefix_if_needed(
+ grpc_exec_ctx *exec_ctx, const char *target) {
grpc_uri *uri = NULL;
char *canonical_target = NULL;
- resolve_factory(target, &uri, &canonical_target);
+ resolve_factory(exec_ctx, target, &uri, &canonical_target);
grpc_uri_destroy(uri);
return canonical_target == NULL ? gpr_strdup(target) : canonical_target;
}
diff --git a/src/core/ext/client_channel/resolver_registry.h b/src/core/ext/client_channel/resolver_registry.h
index e2c189cf0c..1a3ebee25a 100644
--- a/src/core/ext/client_channel/resolver_registry.h
+++ b/src/core/ext/client_channel/resolver_registry.h
@@ -74,10 +74,11 @@ grpc_resolver_factory *grpc_resolver_factory_lookup(const char *name);
/** Given a target, return a (freshly allocated with gpr_malloc) string
representing the default authority to pass from a client. */
-char *grpc_get_default_authority(const char *target);
+char *grpc_get_default_authority(grpc_exec_ctx *exec_ctx, const char *target);
/** Returns a newly allocated string containing \a target, adding the
default prefix if needed. */
-char *grpc_resolver_factory_add_default_prefix_if_needed(const char *target);
+char *grpc_resolver_factory_add_default_prefix_if_needed(
+ grpc_exec_ctx *exec_ctx, const char *target);
#endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_REGISTRY_H */
diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c
index cb2d2c821d..5df0a9060d 100644
--- a/src/core/ext/client_channel/subchannel.c
+++ b/src/core/ext/client_channel/subchannel.c
@@ -331,7 +331,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
}
c->pollset_set = grpc_pollset_set_create();
grpc_resolved_address *addr = gpr_malloc(sizeof(*addr));
- grpc_get_subchannel_address_arg(args->args, addr);
+ grpc_get_subchannel_address_arg(exec_ctx, args->args, addr);
grpc_set_initial_connect_string(&addr, &c->initial_connect_string);
grpc_resolved_address *new_address = NULL;
grpc_channel_args *new_args = NULL;
@@ -787,9 +787,9 @@ grpc_call_stack *grpc_subchannel_call_get_call_stack(
return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call);
}
-static void grpc_uri_to_sockaddr(const char *uri_str,
+static void grpc_uri_to_sockaddr(grpc_exec_ctx *exec_ctx, const char *uri_str,
grpc_resolved_address *addr) {
- grpc_uri *uri = grpc_uri_parse(uri_str, 0 /* suppress_errors */);
+ grpc_uri *uri = grpc_uri_parse(exec_ctx, uri_str, 0 /* suppress_errors */);
GPR_ASSERT(uri != NULL);
if (strcmp(uri->scheme, "ipv4") == 0) {
GPR_ASSERT(parse_ipv4(uri, addr));
@@ -801,12 +801,13 @@ static void grpc_uri_to_sockaddr(const char *uri_str,
grpc_uri_destroy(uri);
}
-void grpc_get_subchannel_address_arg(const grpc_channel_args *args,
+void grpc_get_subchannel_address_arg(grpc_exec_ctx *exec_ctx,
+ const grpc_channel_args *args,
grpc_resolved_address *addr) {
const char *addr_uri_str = grpc_get_subchannel_address_uri_arg(args);
memset(addr, 0, sizeof(*addr));
if (*addr_uri_str != '\0') {
- grpc_uri_to_sockaddr(addr_uri_str, addr);
+ grpc_uri_to_sockaddr(exec_ctx, addr_uri_str, addr);
}
}
diff --git a/src/core/ext/client_channel/subchannel.h b/src/core/ext/client_channel/subchannel.h
index 26ce954487..6a70a76467 100644
--- a/src/core/ext/client_channel/subchannel.h
+++ b/src/core/ext/client_channel/subchannel.h
@@ -175,7 +175,8 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx,
const grpc_subchannel_args *args);
/// Sets \a addr from \a args.
-void grpc_get_subchannel_address_arg(const grpc_channel_args *args,
+void grpc_get_subchannel_address_arg(grpc_exec_ctx *exec_ctx,
+ const grpc_channel_args *args,
grpc_resolved_address *addr);
/// Returns the URI string for the address to connect to.
diff --git a/src/core/ext/client_channel/uri_parser.c b/src/core/ext/client_channel/uri_parser.c
index 7dd7b753cc..d385db0801 100644
--- a/src/core/ext/client_channel/uri_parser.c
+++ b/src/core/ext/client_channel/uri_parser.c
@@ -35,13 +35,15 @@
#include <string.h>
-#include <grpc/slice.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/string_util.h>
+#include "src/core/lib/slice/percent_encoding.h"
+#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
/** a size_t default value... maps to all 1's */
@@ -68,11 +70,16 @@ static grpc_uri *bad_uri(const char *uri_text, size_t pos, const char *section,
return NULL;
}
-/** Returns a copy of \a src[begin, end) */
-static char *copy_component(const char *src, size_t begin, size_t end) {
- char *out = gpr_malloc(end - begin + 1);
- memcpy(out, src + begin, end - begin);
- out[end - begin] = 0;
+/** Returns a copy of percent decoded \a src[begin, end) */
+static char *decode_and_copy_component(grpc_exec_ctx *exec_ctx, const char *src,
+ size_t begin, size_t end) {
+ grpc_slice component =
+ grpc_slice_from_copied_buffer(src + begin, end - begin);
+ grpc_slice decoded_component =
+ grpc_permissive_percent_decode_slice(component);
+ char *out = grpc_dump_slice(decoded_component, GPR_DUMP_ASCII);
+ grpc_slice_unref_internal(exec_ctx, component);
+ grpc_slice_unref_internal(exec_ctx, decoded_component);
return out;
}
@@ -175,7 +182,8 @@ static void parse_query_parts(grpc_uri *uri) {
}
}
-grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors) {
+grpc_uri *grpc_uri_parse(grpc_exec_ctx *exec_ctx, const char *uri_text,
+ int suppress_errors) {
grpc_uri *uri;
size_t scheme_begin = 0;
size_t scheme_end = NOT_SET;
@@ -263,11 +271,16 @@ grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors) {
}
uri = gpr_zalloc(sizeof(*uri));
- uri->scheme = copy_component(uri_text, scheme_begin, scheme_end);
- uri->authority = copy_component(uri_text, authority_begin, authority_end);
- uri->path = copy_component(uri_text, path_begin, path_end);
- uri->query = copy_component(uri_text, query_begin, query_end);
- uri->fragment = copy_component(uri_text, fragment_begin, fragment_end);
+ uri->scheme =
+ decode_and_copy_component(exec_ctx, uri_text, scheme_begin, scheme_end);
+ uri->authority = decode_and_copy_component(exec_ctx, uri_text,
+ authority_begin, authority_end);
+ uri->path =
+ decode_and_copy_component(exec_ctx, uri_text, path_begin, path_end);
+ uri->query =
+ decode_and_copy_component(exec_ctx, uri_text, query_begin, query_end);
+ uri->fragment = decode_and_copy_component(exec_ctx, uri_text, fragment_begin,
+ fragment_end);
parse_query_parts(uri);
return uri;
diff --git a/src/core/ext/client_channel/uri_parser.h b/src/core/ext/client_channel/uri_parser.h
index 5fe0e8f35e..efd4302c1c 100644
--- a/src/core/ext/client_channel/uri_parser.h
+++ b/src/core/ext/client_channel/uri_parser.h
@@ -35,6 +35,7 @@
#define GRPC_CORE_EXT_CLIENT_CHANNEL_URI_PARSER_H
#include <stddef.h>
+#include "src/core/lib/iomgr/exec_ctx.h"
typedef struct {
char *scheme;
@@ -51,7 +52,8 @@ typedef struct {
} grpc_uri;
/** parse a uri, return NULL on failure */
-grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors);
+grpc_uri *grpc_uri_parse(grpc_exec_ctx *exec_ctx, const char *uri_text,
+ int suppress_errors);
/** return the part of a query string after the '=' in "?key=xxx&...", or NULL
* if key is not present */
diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c
index aea0fcc33d..d612591f2e 100644
--- a/src/core/ext/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/lb_policy/grpclb/grpclb.c
@@ -861,7 +861,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx,
arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI);
GPR_ASSERT(arg != NULL);
GPR_ASSERT(arg->type == GRPC_ARG_STRING);
- grpc_uri *uri = grpc_uri_parse(arg->value.string, true);
+ grpc_uri *uri = grpc_uri_parse(exec_ctx, arg->value.string, true);
GPR_ASSERT(uri->path[0] != '\0');
glb_policy->server_name =
gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c
index fc5e17d8fc..eae0145ecc 100644
--- a/src/core/ext/transport/chttp2/client/chttp2_connector.c
+++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c
@@ -226,7 +226,7 @@ static void chttp2_connector_connect(grpc_exec_ctx *exec_ctx,
grpc_closure *notify) {
chttp2_connector *c = (chttp2_connector *)con;
grpc_resolved_address addr;
- grpc_get_subchannel_address_arg(args->channel_args, &addr);
+ grpc_get_subchannel_address_arg(exec_ctx, args->channel_args, &addr);
gpr_mu_lock(&c->mu);
GPR_ASSERT(c->notify == NULL);
c->notify = notify;
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
index 286232f277..067ac35a5a 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
@@ -72,7 +72,8 @@ static grpc_channel *client_channel_factory_create_channel(
grpc_arg arg;
arg.type = GRPC_ARG_STRING;
arg.key = GRPC_ARG_SERVER_URI;
- arg.value.string = grpc_resolver_factory_add_default_prefix_if_needed(target);
+ arg.value.string =
+ grpc_resolver_factory_add_default_prefix_if_needed(exec_ctx, target);
const char *to_remove[] = {GRPC_ARG_SERVER_URI};
grpc_channel_args *new_args =
grpc_channel_args_copy_and_add_and_remove(args, to_remove, 1, &arg, 1);
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
index 825db68c65..f0c241d68e 100644
--- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
+++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c
@@ -83,7 +83,7 @@ static grpc_subchannel_args *get_secure_naming_subchannel_args(
const char *server_uri_str = server_uri_arg->value.string;
GPR_ASSERT(server_uri_str != NULL);
grpc_uri *server_uri =
- grpc_uri_parse(server_uri_str, true /* supress errors */);
+ grpc_uri_parse(exec_ctx, server_uri_str, true /* supress errors */);
GPR_ASSERT(server_uri != NULL);
const char *server_uri_path;
server_uri_path =
@@ -96,7 +96,7 @@ static grpc_subchannel_args *get_secure_naming_subchannel_args(
const char *target_uri_str =
grpc_get_subchannel_address_uri_arg(args->args);
grpc_uri *target_uri =
- grpc_uri_parse(target_uri_str, false /* suppress errors */);
+ grpc_uri_parse(exec_ctx, target_uri_str, false /* suppress errors */);
GPR_ASSERT(target_uri != NULL);
if (target_uri->path[0] != '\0') { // "path" may be empty
const grpc_slice key = grpc_slice_from_static_string(
@@ -181,7 +181,8 @@ static grpc_channel *client_channel_factory_create_channel(
grpc_arg arg;
arg.type = GRPC_ARG_STRING;
arg.key = GRPC_ARG_SERVER_URI;
- arg.value.string = grpc_resolver_factory_add_default_prefix_if_needed(target);
+ arg.value.string =
+ grpc_resolver_factory_add_default_prefix_if_needed(exec_ctx, target);
const char *to_remove[] = {GRPC_ARG_SERVER_URI};
grpc_channel_args *new_args =
grpc_channel_args_copy_and_add_and_remove(args, to_remove, 1, &arg, 1);
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index 7ed00522c3..e7f2597f89 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -381,16 +381,38 @@ static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx,
s->incoming_window_delta +
t->settings[GRPC_ACKED_SETTINGS]
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]) {
- char *msg;
- gpr_asprintf(&msg,
- "frame of size %d overflows incoming window of %" PRId64,
- t->incoming_frame_size,
- s->incoming_window_delta +
- t->settings[GRPC_ACKED_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
- grpc_error *err = GRPC_ERROR_CREATE(msg);
- gpr_free(msg);
- return err;
+ if (incoming_frame_size <=
+ s->incoming_window_delta +
+ t->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]) {
+ gpr_log(
+ GPR_ERROR,
+ "Incoming frame of size %d exceeds incoming window size of %" PRId64
+ ".\n"
+ "The (un-acked, future) window size would be %" PRId64
+ " which is not exceeded.\n"
+ "This would usually cause a disconnection, but allowing it due to "
+ "broken HTTP2 implementations in the wild.\n"
+ "See (for example) https://github.com/netty/netty/issues/6520.",
+ t->incoming_frame_size,
+ s->incoming_window_delta +
+ t->settings[GRPC_ACKED_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE],
+ s->incoming_window_delta +
+ t->settings[GRPC_SENT_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
+ } else {
+ char *msg;
+ gpr_asprintf(&msg,
+ "frame of size %d overflows incoming window of %" PRId64,
+ t->incoming_frame_size,
+ s->incoming_window_delta +
+ t->settings[GRPC_ACKED_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]);
+ grpc_error *err = GRPC_ERROR_CREATE(msg);
+ gpr_free(msg);
+ return err;
+ }
}
GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA("parse", t, s,
diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c
index dbe5b139f9..7cdbe30198 100644
--- a/src/core/lib/iomgr/error.c
+++ b/src/core/lib/iomgr/error.c
@@ -35,6 +35,7 @@
#include <string.h>
+#include <grpc/slice.h>
#include <grpc/status.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -47,46 +48,7 @@
#include "src/core/lib/iomgr/error_internal.h"
#include "src/core/lib/profiling/timers.h"
-
-static void destroy_integer(void *key) {}
-
-static void *copy_integer(void *key) { return key; }
-
-static long compare_integers(void *key1, void *key2) {
- return GPR_ICMP((uintptr_t)key1, (uintptr_t)key2);
-}
-
-static void destroy_string(void *str) { gpr_free(str); }
-
-static void *copy_string(void *str) { return gpr_strdup(str); }
-
-static void destroy_err(void *err) { GRPC_ERROR_UNREF(err); }
-
-static void *copy_err(void *err) { return GRPC_ERROR_REF(err); }
-
-static void destroy_time(void *tm) { gpr_free(tm); }
-
-static gpr_timespec *box_time(gpr_timespec tm) {
- gpr_timespec *out = gpr_malloc(sizeof(*out));
- *out = tm;
- return out;
-}
-
-static void *copy_time(void *tm) { return box_time(*(gpr_timespec *)tm); }
-
-static const gpr_avl_vtable avl_vtable_ints = {destroy_integer, copy_integer,
- compare_integers,
- destroy_integer, copy_integer};
-
-static const gpr_avl_vtable avl_vtable_strs = {destroy_integer, copy_integer,
- compare_integers, destroy_string,
- copy_string};
-
-static const gpr_avl_vtable avl_vtable_times = {
- destroy_integer, copy_integer, compare_integers, destroy_time, copy_time};
-
-static const gpr_avl_vtable avl_vtable_errs = {
- destroy_integer, copy_integer, compare_integers, destroy_err, copy_err};
+#include "src/core/lib/slice/slice_internal.h"
static const char *error_int_name(grpc_error_ints key) {
switch (key) {
@@ -120,6 +82,8 @@ static const char *error_int_name(grpc_error_ints key) {
return "limit";
case GRPC_ERROR_INT_OCCURRED_DURING_WRITE:
return "occurred_during_write";
+ case GRPC_ERROR_INT_MAX:
+ GPR_UNREACHABLE_CODE(return "unknown");
}
GPR_UNREACHABLE_CODE(return "unknown");
}
@@ -150,6 +114,8 @@ static const char *error_str_name(grpc_error_strs key) {
return "filename";
case GRPC_ERROR_STR_QUEUED_BUFFERS:
return "queued_buffers";
+ case GRPC_ERROR_STR_MAX:
+ GPR_UNREACHABLE_CODE(return "unknown");
}
GPR_UNREACHABLE_CODE(return "unknown");
}
@@ -158,6 +124,8 @@ static const char *error_time_name(grpc_error_times key) {
switch (key) {
case GRPC_ERROR_TIME_CREATED:
return "created";
+ case GRPC_ERROR_TIME_MAX:
+ GPR_UNREACHABLE_CODE(return "unknown");
}
GPR_UNREACHABLE_CODE(return "unknown");
}
@@ -184,12 +152,36 @@ grpc_error *grpc_error_ref(grpc_error *err) {
}
#endif
+static void unref_errs(grpc_error *err) {
+ uint8_t slot = err->first_err;
+ while (slot != UINT8_MAX) {
+ grpc_linked_error *lerr = (grpc_linked_error *)(err->arena + slot);
+ GRPC_ERROR_UNREF(lerr->err);
+ GPR_ASSERT(err->last_err == slot ? lerr->next == UINT8_MAX
+ : lerr->next != UINT8_MAX);
+ slot = lerr->next;
+ }
+}
+
+static void unref_slice(grpc_slice slice) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_slice_unref_internal(&exec_ctx, slice);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void unref_strs(grpc_error *err) {
+ for (size_t which = 0; which < GRPC_ERROR_STR_MAX; ++which) {
+ uint8_t slot = err->strs[which];
+ if (slot != UINT8_MAX) {
+ unref_slice(*(grpc_slice *)(err->arena + slot));
+ }
+ }
+}
+
static void error_destroy(grpc_error *err) {
GPR_ASSERT(!grpc_error_is_special(err));
- gpr_avl_unref(err->ints);
- gpr_avl_unref(err->strs);
- gpr_avl_unref(err->errs);
- gpr_avl_unref(err->times);
+ unref_errs(err);
+ unref_strs(err);
gpr_free((void *)gpr_atm_acq_load(&err->error_string));
gpr_free(err);
}
@@ -213,67 +205,189 @@ void grpc_error_unref(grpc_error *err) {
}
#endif
+static uint8_t get_placement(grpc_error **err, size_t size) {
+ GPR_ASSERT(*err);
+ uint8_t slots = (uint8_t)(size / sizeof(intptr_t));
+ if ((*err)->arena_size + slots > (*err)->arena_capacity) {
+ (*err)->arena_capacity = (uint8_t)(3 * (*err)->arena_capacity / 2);
+ *err = gpr_realloc(
+ *err, sizeof(grpc_error) + (*err)->arena_capacity * sizeof(intptr_t));
+ }
+ uint8_t placement = (*err)->arena_size;
+ (*err)->arena_size = (uint8_t)((*err)->arena_size + slots);
+ return placement;
+}
+
+static void internal_set_int(grpc_error **err, grpc_error_ints which,
+ intptr_t value) {
+ // GPR_ASSERT((*err)->ints[which] == UINT8_MAX); // TODO, enforce this
+ uint8_t slot = (*err)->ints[which];
+ if (slot == UINT8_MAX) {
+ slot = get_placement(err, sizeof(value));
+ }
+ (*err)->ints[which] = slot;
+ (*err)->arena[slot] = value;
+}
+
+static void internal_set_str(grpc_error **err, grpc_error_strs which,
+ grpc_slice value) {
+ // GPR_ASSERT((*err)->strs[which] == UINT8_MAX); // TODO, enforce this
+ uint8_t slot = (*err)->strs[which];
+ if (slot == UINT8_MAX) {
+ slot = get_placement(err, sizeof(value));
+ } else {
+ unref_slice(*(grpc_slice *)((*err)->arena + slot));
+ }
+ (*err)->strs[which] = slot;
+ memcpy((*err)->arena + slot, &value, sizeof(value));
+}
+
+static void internal_set_time(grpc_error **err, grpc_error_times which,
+ gpr_timespec value) {
+ // GPR_ASSERT((*err)->times[which] == UINT8_MAX); // TODO, enforce this
+ uint8_t slot = (*err)->times[which];
+ if (slot == UINT8_MAX) {
+ slot = get_placement(err, sizeof(value));
+ }
+ (*err)->times[which] = slot;
+ memcpy((*err)->arena + slot, &value, sizeof(value));
+}
+
+static void internal_add_error(grpc_error **err, grpc_error *new) {
+ grpc_linked_error new_last = {new, UINT8_MAX};
+ uint8_t slot = get_placement(err, sizeof(grpc_linked_error));
+ if ((*err)->first_err == UINT8_MAX) {
+ GPR_ASSERT((*err)->last_err == UINT8_MAX);
+ (*err)->last_err = slot;
+ (*err)->first_err = slot;
+ } else {
+ GPR_ASSERT((*err)->last_err != UINT8_MAX);
+ grpc_linked_error *old_last =
+ (grpc_linked_error *)((*err)->arena + (*err)->last_err);
+ old_last->next = slot;
+ (*err)->last_err = slot;
+ }
+ memcpy((*err)->arena + slot, &new_last, sizeof(grpc_linked_error));
+}
+
+#define SLOTS_PER_INT (sizeof(intptr_t) / sizeof(intptr_t))
+#define SLOTS_PER_STR (sizeof(grpc_slice) / sizeof(intptr_t))
+#define SLOTS_PER_TIME (sizeof(gpr_timespec) / sizeof(intptr_t))
+#define SLOTS_PER_LINKED_ERROR (sizeof(grpc_linked_error) / sizeof(intptr_t))
+
+// size of storing one int and two slices and a timespec. For line, desc, file,
+// and time created
+#define DEFAULT_ERROR_CAPACITY \
+ (SLOTS_PER_INT + (SLOTS_PER_STR * 2) + SLOTS_PER_TIME)
+
+// It is very common to include and extra int and string in an error
+#define SURPLUS_CAPACITY (2 * SLOTS_PER_INT + SLOTS_PER_TIME)
+
grpc_error *grpc_error_create(const char *file, int line, const char *desc,
grpc_error **referencing,
size_t num_referencing) {
GPR_TIMER_BEGIN("grpc_error_create", 0);
- grpc_error *err = gpr_malloc(sizeof(*err));
+ uint8_t initial_arena_capacity = (uint8_t)(
+ DEFAULT_ERROR_CAPACITY +
+ (uint8_t)(num_referencing * SLOTS_PER_LINKED_ERROR) + SURPLUS_CAPACITY);
+ grpc_error *err =
+ gpr_malloc(sizeof(*err) + initial_arena_capacity * sizeof(intptr_t));
if (err == NULL) { // TODO(ctiller): make gpr_malloc return NULL
return GRPC_ERROR_OOM;
}
#ifdef GRPC_ERROR_REFCOUNT_DEBUG
gpr_log(GPR_DEBUG, "%p create [%s:%d]", err, file, line);
#endif
- err->ints = gpr_avl_add(gpr_avl_create(&avl_vtable_ints),
- (void *)(uintptr_t)GRPC_ERROR_INT_FILE_LINE,
- (void *)(uintptr_t)line);
- err->strs = gpr_avl_add(
- gpr_avl_add(gpr_avl_create(&avl_vtable_strs),
- (void *)(uintptr_t)GRPC_ERROR_STR_FILE, gpr_strdup(file)),
- (void *)(uintptr_t)GRPC_ERROR_STR_DESCRIPTION, gpr_strdup(desc));
- err->errs = gpr_avl_create(&avl_vtable_errs);
- err->next_err = 0;
- for (size_t i = 0; i < num_referencing; i++) {
+
+ err->arena_size = 0;
+ err->arena_capacity = initial_arena_capacity;
+ err->first_err = UINT8_MAX;
+ err->last_err = UINT8_MAX;
+
+ memset(err->ints, UINT8_MAX, GRPC_ERROR_INT_MAX);
+ memset(err->strs, UINT8_MAX, GRPC_ERROR_STR_MAX);
+ memset(err->times, UINT8_MAX, GRPC_ERROR_TIME_MAX);
+
+ internal_set_int(&err, GRPC_ERROR_INT_FILE_LINE, line);
+ internal_set_str(&err, GRPC_ERROR_STR_FILE,
+ grpc_slice_from_static_string(file));
+ internal_set_str(
+ &err, GRPC_ERROR_STR_DESCRIPTION,
+ grpc_slice_from_copied_buffer(
+ desc,
+ strlen(desc) +
+ 1)); // TODO, pull this up. // TODO(ncteisen), pull this up.
+
+ for (size_t i = 0; i < num_referencing; ++i) {
if (referencing[i] == GRPC_ERROR_NONE) continue;
- err->errs = gpr_avl_add(err->errs, (void *)(err->next_err++),
- GRPC_ERROR_REF(referencing[i]));
+ internal_add_error(
+ &err,
+ GRPC_ERROR_REF(
+ referencing[i])); // TODO(ncteisen), change ownership semantics
}
- err->times = gpr_avl_add(gpr_avl_create(&avl_vtable_times),
- (void *)(uintptr_t)GRPC_ERROR_TIME_CREATED,
- box_time(gpr_now(GPR_CLOCK_REALTIME)));
+
+ internal_set_time(&err, GRPC_ERROR_TIME_CREATED, gpr_now(GPR_CLOCK_REALTIME));
+
gpr_atm_no_barrier_store(&err->error_string, 0);
gpr_ref_init(&err->refs, 1);
GPR_TIMER_END("grpc_error_create", 0);
return err;
}
+static void ref_strs(grpc_error *err) {
+ for (size_t i = 0; i < GRPC_ERROR_STR_MAX; ++i) {
+ uint8_t slot = err->strs[i];
+ if (slot != UINT8_MAX) {
+ grpc_slice_ref_internal(*(grpc_slice *)(err->arena + slot));
+ }
+ }
+}
+
+static void ref_errs(grpc_error *err) {
+ uint8_t slot = err->first_err;
+ while (slot != UINT8_MAX) {
+ grpc_linked_error *lerr = (grpc_linked_error *)(err->arena + slot);
+ GRPC_ERROR_REF(lerr->err);
+ slot = lerr->next;
+ }
+}
+
static grpc_error *copy_error_and_unref(grpc_error *in) {
GPR_TIMER_BEGIN("copy_error_and_unref", 0);
grpc_error *out;
if (grpc_error_is_special(in)) {
- if (in == GRPC_ERROR_NONE)
- out = grpc_error_set_int(GRPC_ERROR_CREATE("no error"),
- GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_OK);
- else if (in == GRPC_ERROR_OOM)
- out = GRPC_ERROR_CREATE("oom");
- else if (in == GRPC_ERROR_CANCELLED)
- out =
- grpc_error_set_int(GRPC_ERROR_CREATE("cancelled"),
- GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_CANCELLED);
- else
- out = GRPC_ERROR_CREATE("unknown");
+ out = GRPC_ERROR_CREATE("unknown");
+ if (in == GRPC_ERROR_NONE) {
+ internal_set_str(&out, GRPC_ERROR_STR_DESCRIPTION,
+ grpc_slice_from_static_string("no error"));
+ internal_set_int(&out, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_OK);
+ } else if (in == GRPC_ERROR_OOM) {
+ internal_set_str(&out, GRPC_ERROR_STR_DESCRIPTION,
+ grpc_slice_from_static_string("oom"));
+ } else if (in == GRPC_ERROR_CANCELLED) {
+ internal_set_str(&out, GRPC_ERROR_STR_DESCRIPTION,
+ grpc_slice_from_static_string("cancelled"));
+ internal_set_int(&out, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_CANCELLED);
+ }
+ } else if (gpr_ref_is_unique(&in->refs)) {
+ out = in;
} else {
- out = gpr_malloc(sizeof(*out));
+ uint8_t new_arena_capacity = in->arena_capacity;
+ // the returned err will be added to, so we ensure this is room to avoid
+ // unneeded allocations.
+ if (in->arena_capacity - in->arena_size < (uint8_t)SLOTS_PER_STR) {
+ new_arena_capacity = (uint8_t)(3 * new_arena_capacity / 2);
+ }
+ out = gpr_malloc(sizeof(*in) + new_arena_capacity * sizeof(intptr_t));
#ifdef GRPC_ERROR_REFCOUNT_DEBUG
gpr_log(GPR_DEBUG, "%p create copying %p", out, in);
#endif
- out->ints = gpr_avl_ref(in->ints);
- out->strs = gpr_avl_ref(in->strs);
- out->errs = gpr_avl_ref(in->errs);
- out->times = gpr_avl_ref(in->times);
+ memcpy(out, in, sizeof(*in) + in->arena_size * sizeof(intptr_t));
+ out->arena_capacity = new_arena_capacity;
gpr_atm_no_barrier_store(&out->error_string, 0);
- out->next_err = in->next_err;
gpr_ref_init(&out->refs, 1);
+ ref_strs(out);
+ ref_errs(out);
GRPC_ERROR_UNREF(in);
}
GPR_TIMER_END("copy_error_and_unref", 0);
@@ -284,7 +398,7 @@ grpc_error *grpc_error_set_int(grpc_error *src, grpc_error_ints which,
intptr_t value) {
GPR_TIMER_BEGIN("grpc_error_set_int", 0);
grpc_error *new = copy_error_and_unref(src);
- new->ints = gpr_avl_add(new->ints, (void *)(uintptr_t)which, (void *)value);
+ internal_set_int(&new, which, value);
GPR_TIMER_END("grpc_error_set_int", 0);
return new;
}
@@ -302,7 +416,6 @@ static special_error_status_map error_status_map[] = {
bool grpc_error_get_int(grpc_error *err, grpc_error_ints which, intptr_t *p) {
GPR_TIMER_BEGIN("grpc_error_get_int", 0);
- void *pp;
if (grpc_error_is_special(err)) {
if (which == GRPC_ERROR_INT_GRPC_STATUS) {
for (size_t i = 0; i < GPR_ARRAY_SIZE(error_status_map); i++) {
@@ -316,8 +429,9 @@ bool grpc_error_get_int(grpc_error *err, grpc_error_ints which, intptr_t *p) {
GPR_TIMER_END("grpc_error_get_int", 0);
return false;
}
- if (gpr_avl_maybe_get(err->ints, (void *)(uintptr_t)which, &pp)) {
- if (p != NULL) *p = (intptr_t)pp;
+ uint8_t slot = err->ints[which];
+ if (slot != UINT8_MAX) {
+ if (p != NULL) *p = err->arena[slot];
GPR_TIMER_END("grpc_error_get_int", 0);
return true;
}
@@ -329,8 +443,9 @@ grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which,
const char *value) {
GPR_TIMER_BEGIN("grpc_error_set_str", 0);
grpc_error *new = copy_error_and_unref(src);
- new->strs =
- gpr_avl_add(new->strs, (void *)(uintptr_t)which, gpr_strdup(value));
+ internal_set_str(&new, which,
+ grpc_slice_from_copied_buffer(
+ value, strlen(value) + 1)); // TODO, pull this up.
GPR_TIMER_END("grpc_error_set_str", 0);
return new;
}
@@ -346,13 +461,19 @@ const char *grpc_error_get_str(grpc_error *err, grpc_error_strs which) {
}
return NULL;
}
- return gpr_avl_get(err->strs, (void *)(uintptr_t)which);
+ uint8_t slot = err->strs[which];
+ if (slot != UINT8_MAX) {
+ return (const char *)GRPC_SLICE_START_PTR(
+ *(grpc_slice *)(err->arena + slot));
+ } else {
+ return NULL;
+ }
}
grpc_error *grpc_error_add_child(grpc_error *src, grpc_error *child) {
GPR_TIMER_BEGIN("grpc_error_add_child", 0);
grpc_error *new = copy_error_and_unref(src);
- new->errs = gpr_avl_add(new->errs, (void *)(new->next_err++), child);
+ internal_add_error(&new, child);
GPR_TIMER_END("grpc_error_add_child", 0);
return new;
}
@@ -372,42 +493,6 @@ typedef struct {
size_t cap_kvs;
} kv_pairs;
-static void append_kv(kv_pairs *kvs, char *key, char *value) {
- if (kvs->num_kvs == kvs->cap_kvs) {
- kvs->cap_kvs = GPR_MAX(3 * kvs->cap_kvs / 2, 4);
- kvs->kvs = gpr_realloc(kvs->kvs, sizeof(*kvs->kvs) * kvs->cap_kvs);
- }
- kvs->kvs[kvs->num_kvs].key = key;
- kvs->kvs[kvs->num_kvs].value = value;
- kvs->num_kvs++;
-}
-
-static void collect_kvs(gpr_avl_node *node, char *key(void *k),
- char *fmt(void *v), kv_pairs *kvs) {
- if (node == NULL) return;
- append_kv(kvs, key(node->key), fmt(node->value));
- collect_kvs(node->left, key, fmt, kvs);
- collect_kvs(node->right, key, fmt, kvs);
-}
-
-static char *key_int(void *p) {
- return gpr_strdup(error_int_name((grpc_error_ints)(uintptr_t)p));
-}
-
-static char *key_str(void *p) {
- return gpr_strdup(error_str_name((grpc_error_strs)(uintptr_t)p));
-}
-
-static char *key_time(void *p) {
- return gpr_strdup(error_time_name((grpc_error_times)(uintptr_t)p));
-}
-
-static char *fmt_int(void *p) {
- char *s;
- gpr_asprintf(&s, "%" PRIdPTR, (intptr_t)p);
- return s;
-}
-
static void append_chr(char c, char **s, size_t *sz, size_t *cap) {
if (*sz == *cap) {
*cap = GPR_MAX(8, 3 * *cap / 2);
@@ -459,6 +544,40 @@ static void append_esc_str(const char *str, char **s, size_t *sz, size_t *cap) {
append_chr('"', s, sz, cap);
}
+static void append_kv(kv_pairs *kvs, char *key, char *value) {
+ if (kvs->num_kvs == kvs->cap_kvs) {
+ kvs->cap_kvs = GPR_MAX(3 * kvs->cap_kvs / 2, 4);
+ kvs->kvs = gpr_realloc(kvs->kvs, sizeof(*kvs->kvs) * kvs->cap_kvs);
+ }
+ kvs->kvs[kvs->num_kvs].key = key;
+ kvs->kvs[kvs->num_kvs].value = value;
+ kvs->num_kvs++;
+}
+
+static char *key_int(grpc_error_ints which) {
+ return gpr_strdup(error_int_name(which));
+}
+
+static char *fmt_int(intptr_t p) {
+ char *s;
+ gpr_asprintf(&s, "%" PRIdPTR, p);
+ return s;
+}
+
+static void collect_ints_kvs(grpc_error *err, kv_pairs *kvs) {
+ for (size_t which = 0; which < GRPC_ERROR_INT_MAX; ++which) {
+ uint8_t slot = err->ints[which];
+ if (slot != UINT8_MAX) {
+ append_kv(kvs, key_int((grpc_error_ints)which),
+ fmt_int(err->arena[slot]));
+ }
+ }
+}
+
+static char *key_str(grpc_error_strs which) {
+ return gpr_strdup(error_str_name(which));
+}
+
static char *fmt_str(void *p) {
char *s = NULL;
size_t sz = 0;
@@ -468,8 +587,22 @@ static char *fmt_str(void *p) {
return s;
}
-static char *fmt_time(void *p) {
- gpr_timespec tm = *(gpr_timespec *)p;
+static void collect_strs_kvs(grpc_error *err, kv_pairs *kvs) {
+ for (size_t which = 0; which < GRPC_ERROR_STR_MAX; ++which) {
+ uint8_t slot = err->strs[which];
+ if (slot != UINT8_MAX) {
+ append_kv(
+ kvs, key_str((grpc_error_strs)which),
+ fmt_str(GRPC_SLICE_START_PTR(*(grpc_slice *)(err->arena + slot))));
+ }
+ }
+}
+
+static char *key_time(grpc_error_times which) {
+ return gpr_strdup(error_time_name(which));
+}
+
+static char *fmt_time(gpr_timespec tm) {
char *out;
char *pfx = "!!";
switch (tm.clock_type) {
@@ -490,24 +623,37 @@ static char *fmt_time(void *p) {
return out;
}
-static void add_errs(gpr_avl_node *n, char **s, size_t *sz, size_t *cap,
- bool *first) {
- if (n == NULL) return;
- add_errs(n->left, s, sz, cap, first);
- if (!*first) append_chr(',', s, sz, cap);
- *first = false;
- const char *e = grpc_error_string(n->value);
- append_str(e, s, sz, cap);
- add_errs(n->right, s, sz, cap, first);
+static void collect_times_kvs(grpc_error *err, kv_pairs *kvs) {
+ for (size_t which = 0; which < GRPC_ERROR_TIME_MAX; ++which) {
+ uint8_t slot = err->times[which];
+ if (slot != UINT8_MAX) {
+ append_kv(kvs, key_time((grpc_error_times)which),
+ fmt_time(*(gpr_timespec *)(err->arena + slot)));
+ }
+ }
+}
+
+static void add_errs(grpc_error *err, char **s, size_t *sz, size_t *cap) {
+ uint8_t slot = err->first_err;
+ bool first = true;
+ while (slot != UINT8_MAX) {
+ grpc_linked_error *lerr = (grpc_linked_error *)(err->arena + slot);
+ if (!first) append_chr(',', s, sz, cap);
+ first = false;
+ const char *e = grpc_error_string(lerr->err);
+ append_str(e, s, sz, cap);
+ GPR_ASSERT(err->last_err == slot ? lerr->next == UINT8_MAX
+ : lerr->next != UINT8_MAX);
+ slot = lerr->next;
+ }
}
static char *errs_string(grpc_error *err) {
char *s = NULL;
size_t sz = 0;
size_t cap = 0;
- bool first = true;
append_chr('[', &s, &sz, &cap);
- add_errs(err->errs.root, &s, &sz, &cap, &first);
+ add_errs(err, &s, &sz, &cap);
append_chr(']', &s, &sz, &cap);
append_chr(0, &s, &sz, &cap);
return s;
@@ -555,10 +701,10 @@ const char *grpc_error_string(grpc_error *err) {
kv_pairs kvs;
memset(&kvs, 0, sizeof(kvs));
- collect_kvs(err->ints.root, key_int, fmt_int, &kvs);
- collect_kvs(err->strs.root, key_str, fmt_str, &kvs);
- collect_kvs(err->times.root, key_time, fmt_time, &kvs);
- if (!gpr_avl_is_empty(err->errs)) {
+ collect_ints_kvs(err, &kvs);
+ collect_strs_kvs(err, &kvs);
+ collect_times_kvs(err, &kvs);
+ if (err->first_err != UINT8_MAX) {
append_kv(&kvs, gpr_strdup("referenced_errors"), errs_string(err));
}
diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h
index 2613512acb..eb953947ae 100644
--- a/src/core/lib/iomgr/error.h
+++ b/src/core/lib/iomgr/error.h
@@ -102,6 +102,9 @@ typedef enum {
GRPC_ERROR_INT_LIMIT,
/// chttp2: did the error occur while a write was in progress
GRPC_ERROR_INT_OCCURRED_DURING_WRITE,
+
+ /// Must always be last
+ GRPC_ERROR_INT_MAX,
} grpc_error_ints;
typedef enum {
@@ -129,11 +132,17 @@ typedef enum {
GRPC_ERROR_STR_KEY,
/// value associated with the error
GRPC_ERROR_STR_VALUE,
+
+ /// Must always be last
+ GRPC_ERROR_STR_MAX,
} grpc_error_strs;
typedef enum {
/// timestamp of error creation
GRPC_ERROR_TIME_CREATED,
+
+ /// Must always be last
+ GRPC_ERROR_TIME_MAX,
} grpc_error_times;
/// The following "special" errors can be propagated without allocating memory.
@@ -184,8 +193,6 @@ void grpc_error_unref(grpc_error *err);
grpc_error *grpc_error_set_int(grpc_error *src, grpc_error_ints which,
intptr_t value) GRPC_MUST_USE_RESULT;
bool grpc_error_get_int(grpc_error *error, grpc_error_ints which, intptr_t *p);
-grpc_error *grpc_error_set_time(grpc_error *src, grpc_error_times which,
- gpr_timespec value) GRPC_MUST_USE_RESULT;
grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which,
const char *value) GRPC_MUST_USE_RESULT;
/// Returns NULL if the specified string is not set.
diff --git a/src/core/lib/iomgr/error_internal.h b/src/core/lib/iomgr/error_internal.h
index 1c89ead4ed..fb4814e41f 100644
--- a/src/core/lib/iomgr/error_internal.h
+++ b/src/core/lib/iomgr/error_internal.h
@@ -35,18 +35,28 @@
#define GRPC_CORE_LIB_IOMGR_ERROR_INTERNAL_H
#include <inttypes.h>
-#include <stdbool.h>
+#include <stdbool.h> // TODO, do we need this?
-#include <grpc/support/avl.h>
+#include <grpc/support/sync.h>
+
+typedef struct grpc_linked_error grpc_linked_error;
+
+struct grpc_linked_error {
+ grpc_error *err;
+ uint8_t next;
+};
struct grpc_error {
gpr_refcount refs;
- gpr_avl ints;
- gpr_avl strs;
- gpr_avl times;
- gpr_avl errs;
- uintptr_t next_err;
+ uint8_t ints[GRPC_ERROR_INT_MAX];
+ uint8_t strs[GRPC_ERROR_STR_MAX];
+ uint8_t times[GRPC_ERROR_TIME_MAX];
+ uint8_t first_err;
+ uint8_t last_err;
gpr_atm error_string;
+ uint8_t arena_size;
+ uint8_t arena_capacity;
+ intptr_t arena[0];
};
bool grpc_error_is_special(grpc_error *err);
diff --git a/src/core/lib/iomgr/sockaddr_utils.c b/src/core/lib/iomgr/sockaddr_utils.c
index ffa62cb53c..9d2732666b 100644
--- a/src/core/lib/iomgr/sockaddr_utils.c
+++ b/src/core/lib/iomgr/sockaddr_utils.c
@@ -162,6 +162,7 @@ int grpc_sockaddr_to_string(char **out,
char ntop_buf[INET6_ADDRSTRLEN];
const void *ip = NULL;
int port;
+ uint32_t sin6_scope_id = 0;
int ret;
*out = NULL;
@@ -177,10 +178,19 @@ int grpc_sockaddr_to_string(char **out,
const struct sockaddr_in6 *addr6 = (const struct sockaddr_in6 *)addr;
ip = &addr6->sin6_addr;
port = ntohs(addr6->sin6_port);
+ sin6_scope_id = addr6->sin6_scope_id;
}
if (ip != NULL &&
grpc_inet_ntop(addr->sa_family, ip, ntop_buf, sizeof(ntop_buf)) != NULL) {
- ret = gpr_join_host_port(out, ntop_buf, port);
+ if (sin6_scope_id != 0) {
+ char *host_with_scope;
+ /* Enclose sin6_scope_id with the format defined in RFC 6784 section 2. */
+ gpr_asprintf(&host_with_scope, "%s%%25%" PRIu32, ntop_buf, sin6_scope_id);
+ ret = gpr_join_host_port(out, host_with_scope, port);
+ gpr_free(host_with_scope);
+ } else {
+ ret = gpr_join_host_port(out, ntop_buf, port);
+ }
} else {
ret = gpr_asprintf(out, "(sockaddr family=%d)", addr->sa_family);
}
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index 36f878fdd4..5f286a6723 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -114,6 +114,8 @@ struct grpc_tcp_server {
/* is this server shutting down? */
bool shutdown;
+ /* have listeners been shutdown? */
+ bool shutdown_listeners;
/* use SO_REUSEPORT */
bool so_reuseport;
/* expand wildcard addresses to a list of all local addresses */
@@ -161,7 +163,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
grpc_tcp_server **server) {
gpr_once_init(&check_init, init);
- grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
+ grpc_tcp_server *s = gpr_zalloc(sizeof(grpc_tcp_server));
s->so_reuseport = has_so_reuseport;
s->resource_quota = grpc_resource_quota_create(NULL);
s->expand_wildcard_addrs = false;
@@ -422,7 +424,14 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
return;
default:
- gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno));
+ gpr_mu_lock(&sp->server->mu);
+ if (!sp->server->shutdown_listeners) {
+ gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno));
+ } else {
+ /* if we have shutdown listeners, accept4 could fail, and we
+ needn't notify users */
+ }
+ gpr_mu_unlock(&sp->server->mu);
goto error;
}
}
@@ -438,11 +447,6 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) {
grpc_fd *fdobj = grpc_fd_create(fd, name);
- if (read_notifier_pollset == NULL) {
- gpr_log(GPR_ERROR, "Read notifier pollset is not set on the fd");
- goto error;
- }
-
grpc_pollset_add_fd(exec_ctx, read_notifier_pollset, fdobj);
// Create acceptor.
@@ -941,6 +945,7 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx *exec_ctx,
grpc_tcp_server *s) {
gpr_mu_lock(&s->mu);
+ s->shutdown_listeners = true;
/* shutdown all fd's */
if (s->active_ports) {
grpc_tcp_listener *sp;
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index 2a1c8d39fa..d1bcd89af1 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -485,7 +485,11 @@ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);
- s->active_ports++;
+ /* Registered for both read and write callbacks: increment active_ports
+ * twice to account for this, and delay free-ing of memory until both
+ * on_read and on_write have fired. */
+ s->active_ports += 2;
+
sp = sp->next;
}
diff --git a/src/core/lib/iomgr/wakeup_fd_posix.h b/src/core/lib/iomgr/wakeup_fd_posix.h
index 71d32d97ba..c8dd242c75 100644
--- a/src/core/lib/iomgr/wakeup_fd_posix.h
+++ b/src/core/lib/iomgr/wakeup_fd_posix.h
@@ -46,7 +46,7 @@
*
* Setup:
* 1. Before calling anything, call global_init() at least once.
- * 1. Call grpc_wakeup_fd_create() to get a wakeup_fd.
+ * 1. Call grpc_wakeup_fd_init() to set up a wakeup_fd.
* 2. Add the result of GRPC_WAKEUP_FD_FD to the set of monitored file
* descriptors for the poll() style API you are using. Monitor the file
* descriptor for readability.
diff --git a/src/core/lib/support/sync.c b/src/core/lib/support/sync.c
index e4a7fce646..b52f004f74 100644
--- a/src/core/lib/support/sync.c
+++ b/src/core/lib/support/sync.c
@@ -119,6 +119,10 @@ int gpr_unref(gpr_refcount *r) {
return prior == 1;
}
+int gpr_ref_is_unique(gpr_refcount *r) {
+ return gpr_atm_acq_load(&r->count) == 1;
+}
+
void gpr_stats_init(gpr_stats_counter *c, intptr_t n) {
gpr_atm_rel_store(&c->value, n);
}
diff --git a/src/core/lib/transport/error_utils.c b/src/core/lib/transport/error_utils.c
index da77828d9c..ef55e561fb 100644
--- a/src/core/lib/transport/error_utils.c
+++ b/src/core/lib/transport/error_utils.c
@@ -44,12 +44,12 @@ static grpc_error *recursively_find_error_with_field(grpc_error *error,
}
if (grpc_error_is_special(error)) return NULL;
// Otherwise, search through its children.
- intptr_t key = 0;
- while (true) {
- grpc_error *child_error = gpr_avl_get(error->errs, (void *)key++);
- if (child_error == NULL) break;
- grpc_error *result = recursively_find_error_with_field(child_error, which);
- if (result != NULL) return result;
+ uint8_t slot = error->first_err;
+ while (slot != UINT8_MAX) {
+ grpc_linked_error *lerr = (grpc_linked_error *)(error->arena + slot);
+ grpc_error *result = recursively_find_error_with_field(lerr->err, which);
+ if (result) return result;
+ slot = lerr->next;
}
return NULL;
}
@@ -112,13 +112,13 @@ bool grpc_error_has_clear_grpc_status(grpc_error *error) {
if (grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, NULL)) {
return true;
}
- intptr_t key = 0;
- while (true) {
- grpc_error *child_error = gpr_avl_get(error->errs, (void *)key++);
- if (child_error == NULL) break;
- if (grpc_error_has_clear_grpc_status(child_error)) {
+ uint8_t slot = error->first_err;
+ while (slot != UINT8_MAX) {
+ grpc_linked_error *lerr = (grpc_linked_error *)(error->arena + slot);
+ if (grpc_error_has_clear_grpc_status(lerr->err)) {
return true;
}
+ slot = lerr->next;
}
return false;
}