diff options
author | Craig Tiller <ctiller@google.com> | 2016-04-26 13:00:44 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2016-04-26 13:00:44 -0700 |
commit | 89c5cb596df9f3bbbf1d024fb59df2b799c85fba (patch) | |
tree | bb9ae905ada472eb788c485b09dd8db214fb227d /src/core/ext | |
parent | 6c8ae9aad5f1aaaf9c043817e184ec26bee75c86 (diff) | |
parent | c3d869ef5853c4cfad57b7d3694d5260eeb7ce75 (diff) |
Merge github.com:grpc/grpc into split-me-baby-one-more-time
Diffstat (limited to 'src/core/ext')
22 files changed, 466 insertions, 157 deletions
diff --git a/src/core/ext/census/grpc_plugin.c b/src/core/ext/census/grpc_plugin.c index 0f15ecb2c2..e43ceafd0c 100644 --- a/src/core/ext/census/grpc_plugin.c +++ b/src/core/ext/census/grpc_plugin.c @@ -32,6 +32,7 @@ */ #include <limits.h> +#include <string.h> #include <grpc/census.h> @@ -39,13 +40,24 @@ #include "src/core/lib/channel/channel_stack_builder.h" #include "src/core/lib/surface/channel_init.h" +static bool is_census_enabled(const grpc_channel_args *a) { + size_t i; + if (a == NULL) return 0; + for (i = 0; i < a->num_args; i++) { + if (0 == strcmp(a->args[i].key, GRPC_ARG_ENABLE_CENSUS)) { + return a->args[i].value.integer != 0 && census_enabled(); + } + } + return census_enabled(); +} + static bool maybe_add_census_filter(grpc_channel_stack_builder *builder, - void *arg_must_be_null) { + void *arg) { const grpc_channel_args *args = grpc_channel_stack_builder_get_channel_arguments(builder); - if (grpc_channel_args_is_census_enabled(args)) { + if (is_census_enabled(args)) { return grpc_channel_stack_builder_prepend_filter( - builder, &grpc_client_census_filter, NULL, NULL); + builder, (const grpc_channel_filter *)arg, NULL, NULL); } return true; } @@ -60,9 +72,11 @@ void census_grpc_plugin_init(void) { } } grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MAX, - maybe_add_census_filter, NULL); + maybe_add_census_filter, + (void *)&grpc_client_census_filter); grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX, - maybe_add_census_filter, NULL); + maybe_add_census_filter, + (void *)&grpc_server_census_filter); } void census_grpc_plugin_shutdown(void) { census_shutdown(); } diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c index 922d4413fd..9b5a078aec 100644 --- a/src/core/ext/client_config/client_channel.c +++ b/src/core/ext/client_config/client_channel.c @@ -205,7 +205,11 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_lock(&chand->mu_config); old_lb_policy = chand->lb_policy; chand->lb_policy = lb_policy; - if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) { + if (lb_policy != NULL) { + grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures, + NULL); + } else if (chand->resolver == NULL /* disconnected */) { + grpc_closure_list_fail_all(&chand->waiting_for_config_closures); grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures, NULL); } @@ -293,6 +297,11 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_resolver_shutdown(exec_ctx, chand->resolver); GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); chand->resolver = NULL; + if (!chand->started_resolving) { + grpc_closure_list_fail_all(&chand->waiting_for_config_closures); + grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures, + NULL); + } if (chand->lb_policy != NULL) { grpc_pollset_set_del_pollset_set(exec_ctx, chand->lb_policy->interested_parties, @@ -321,10 +330,10 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg, static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, bool success) { continue_picking_args *cpa = arg; - if (!success) { - grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, false, NULL); - } else if (cpa->connected_subchannel == NULL) { + if (cpa->connected_subchannel == NULL) { /* cancelled, do nothing */ + } else if (!success) { + grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, false, NULL); } else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata, cpa->initial_metadata_flags, cpa->connected_subchannel, cpa->on_ready)) { @@ -381,14 +390,19 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, &chand->incoming_configuration, &chand->on_config_changed); } - cpa = gpr_malloc(sizeof(*cpa)); - cpa->initial_metadata = initial_metadata; - cpa->initial_metadata_flags = initial_metadata_flags; - cpa->connected_subchannel = connected_subchannel; - cpa->on_ready = on_ready; - cpa->elem = elem; - grpc_closure_init(&cpa->closure, continue_picking, cpa); - grpc_closure_list_add(&chand->waiting_for_config_closures, &cpa->closure, 1); + if (chand->resolver != NULL) { + cpa = gpr_malloc(sizeof(*cpa)); + cpa->initial_metadata = initial_metadata; + cpa->initial_metadata_flags = initial_metadata_flags; + cpa->connected_subchannel = connected_subchannel; + cpa->on_ready = on_ready; + cpa->elem = elem; + grpc_closure_init(&cpa->closure, continue_picking, cpa); + grpc_closure_list_add(&chand->waiting_for_config_closures, &cpa->closure, + 1); + } else { + grpc_exec_ctx_enqueue(exec_ctx, on_ready, false, NULL); + } gpr_mu_unlock(&chand->mu_config); return 0; } diff --git a/src/core/ext/client_config/client_config_plugin.c b/src/core/ext/client_config/client_config_plugin.c new file mode 100644 index 0000000000..5e31613420 --- /dev/null +++ b/src/core/ext/client_config/client_config_plugin.c @@ -0,0 +1,95 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <limits.h> +#include <stdbool.h> +#include <string.h> + +#include <grpc/support/alloc.h> + +#include "src/core/ext/client_config/client_channel.h" +#include "src/core/ext/client_config/lb_policy_registry.h" +#include "src/core/ext/client_config/resolver_registry.h" +#include "src/core/ext/client_config/subchannel_index.h" +#include "src/core/lib/surface/channel_init.h" + +#ifndef GRPC_DEFAULT_NAME_PREFIX +#define GRPC_DEFAULT_NAME_PREFIX "dns:///" +#endif + +static bool append_filter(grpc_channel_stack_builder *builder, void *arg) { + return grpc_channel_stack_builder_append_filter( + builder, (const grpc_channel_filter *)arg, NULL, NULL); +} + +static bool set_default_host_if_unset(grpc_channel_stack_builder *builder, + void *unused) { + const grpc_channel_args *args = + grpc_channel_stack_builder_get_channel_arguments(builder); + for (size_t i = 0; i < args->num_args; i++) { + if (0 == strcmp(args->args[i].key, GRPC_ARG_DEFAULT_AUTHORITY) || + 0 == strcmp(args->args[i].key, GRPC_SSL_TARGET_NAME_OVERRIDE_ARG)) { + return true; + } + } + char *default_authority = grpc_get_default_authority( + grpc_channel_stack_builder_get_target(builder)); + if (default_authority != NULL) { + grpc_arg arg; + arg.type = GRPC_ARG_STRING; + arg.key = GRPC_ARG_DEFAULT_AUTHORITY; + arg.value.string = default_authority; + grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1); + grpc_channel_stack_builder_set_channel_arguments(builder, new_args); + gpr_free(default_authority); + grpc_channel_args_destroy(new_args); + } + return true; +} + +void grpc_client_config_init(void) { + grpc_lb_policy_registry_init(); + grpc_resolver_registry_init(GRPC_DEFAULT_NAME_PREFIX); + grpc_subchannel_index_init(); + grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MIN, + set_default_host_if_unset, NULL); + grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MAX, append_filter, + (void *)&grpc_client_channel_filter); +} + +void grpc_client_config_shutdown(void) { + grpc_subchannel_index_shutdown(); + grpc_channel_init_shutdown(); + grpc_resolver_registry_shutdown(); + grpc_lb_policy_registry_shutdown(); +} diff --git a/src/core/ext/client_config/parse_address.c b/src/core/ext/client_config/parse_address.c new file mode 100644 index 0000000000..8b4abe24a6 --- /dev/null +++ b/src/core/ext/client_config/parse_address.c @@ -0,0 +1,137 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/ext/client_config/parse_address.h" + +#include <stdio.h> +#include <string.h> +#ifdef GPR_HAVE_UNIX_SOCKET +#include <sys/un.h> +#endif + +#include <grpc/support/alloc.h> +#include <grpc/support/host_port.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> + +#ifdef GPR_HAVE_UNIX_SOCKET +int parse_unix(grpc_uri *uri, struct sockaddr_storage *addr, size_t *len) { + struct sockaddr_un *un = (struct sockaddr_un *)addr; + + un->sun_family = AF_UNIX; + strcpy(un->sun_path, uri->path); + *len = strlen(un->sun_path) + sizeof(un->sun_family) + 1; + + return 1; +} +#endif + +int parse_ipv4(grpc_uri *uri, struct sockaddr_storage *addr, size_t *len) { + const char *host_port = uri->path; + char *host; + char *port; + int port_num; + int result = 0; + struct sockaddr_in *in = (struct sockaddr_in *)addr; + + if (*host_port == '/') ++host_port; + if (!gpr_split_host_port(host_port, &host, &port)) { + return 0; + } + + memset(in, 0, sizeof(*in)); + *len = sizeof(*in); + in->sin_family = AF_INET; + if (inet_pton(AF_INET, host, &in->sin_addr) == 0) { + gpr_log(GPR_ERROR, "invalid ipv4 address: '%s'", host); + goto done; + } + + if (port != NULL) { + if (sscanf(port, "%d", &port_num) != 1 || port_num < 0 || + port_num > 65535) { + gpr_log(GPR_ERROR, "invalid ipv4 port: '%s'", port); + goto done; + } + in->sin_port = htons((uint16_t)port_num); + } else { + gpr_log(GPR_ERROR, "no port given for ipv4 scheme"); + goto done; + } + + result = 1; +done: + gpr_free(host); + gpr_free(port); + return result; +} + +int parse_ipv6(grpc_uri *uri, struct sockaddr_storage *addr, size_t *len) { + const char *host_port = uri->path; + char *host; + char *port; + int port_num; + int result = 0; + struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)addr; + + if (*host_port == '/') ++host_port; + if (!gpr_split_host_port(host_port, &host, &port)) { + return 0; + } + + memset(in6, 0, sizeof(*in6)); + *len = sizeof(*in6); + in6->sin6_family = AF_INET6; + if (inet_pton(AF_INET6, host, &in6->sin6_addr) == 0) { + gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host); + goto done; + } + + if (port != NULL) { + if (sscanf(port, "%d", &port_num) != 1 || port_num < 0 || + port_num > 65535) { + gpr_log(GPR_ERROR, "invalid ipv6 port: '%s'", port); + goto done; + } + in6->sin6_port = htons((uint16_t)port_num); + } else { + gpr_log(GPR_ERROR, "no port given for ipv6 scheme"); + goto done; + } + + result = 1; +done: + gpr_free(host); + gpr_free(port); + return result; +} diff --git a/src/core/ext/client_config/parse_address.h b/src/core/ext/client_config/parse_address.h new file mode 100644 index 0000000000..74c86f4d93 --- /dev/null +++ b/src/core/ext/client_config/parse_address.h @@ -0,0 +1,56 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_CLIENT_CONFIG_PARSE_ADDRESS_H +#define GRPC_CORE_EXT_CLIENT_CONFIG_PARSE_ADDRESS_H + +#include <stddef.h> + +#include "src/core/ext/client_config/uri_parser.h" +#include "src/core/lib/iomgr/sockaddr.h" + +#ifdef GPR_HAVE_UNIX_SOCKET +/** Populate \a addr and \a len from \a uri, whose path is expected to contain a + * unix socket path. Returns true upon success. */ +int parse_unix(grpc_uri *uri, struct sockaddr_storage *addr, size_t *len); +#endif + +/** Populate /a addr and \a len from \a uri, whose path is expected to contain a + * host:port pair. Returns true upon success. */ +int parse_ipv4(grpc_uri *uri, struct sockaddr_storage *addr, size_t *len); + +/** Populate /a addr and \a len from \a uri, whose path is expected to contain a + * host:port pair. Returns true upon success. */ +int parse_ipv6(grpc_uri *uri, struct sockaddr_storage *addr, size_t *len); + +#endif /* GRPC_CORE_EXT_CLIENT_CONFIG_PARSE_ADDRESS_H */ diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c index ef937ae942..3a5af9f53d 100644 --- a/src/core/ext/client_config/subchannel.c +++ b/src/core/ext/client_config/subchannel.c @@ -135,8 +135,6 @@ struct grpc_subchannel { int have_alarm; /** our alarm */ grpc_timer alarm; - /** current random value */ - uint32_t random; }; struct grpc_subchannel_call { @@ -297,10 +295,6 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx, } } -static uint32_t random_seed() { - return (uint32_t)(gpr_time_to_millis(gpr_now(GPR_CLOCK_MONOTONIC))); -} - grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, grpc_connector *connector, grpc_subchannel_args *args) { @@ -332,7 +326,6 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, grpc_set_initial_connect_string(&c->addr, &c->addr_len, &c->initial_connect_string); c->args = grpc_channel_args_copy(args->args); - c->random = random_seed(); c->root_external_state_watcher.next = c->root_external_state_watcher.prev = &c->root_external_state_watcher; grpc_closure_init(&c->connected, subchannel_connected, c); @@ -546,9 +539,20 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx, state_watcher *sw_subchannel; /* construct channel stack */ - con = grpc_channel_init_create_stack( - exec_ctx, GRPC_CLIENT_SUBCHANNEL, 0, c->connecting_result.channel_args, 1, - connection_destroy, NULL, c->connecting_result.transport); + grpc_channel_stack_builder *builder = grpc_channel_stack_builder_create(); + grpc_channel_stack_builder_set_channel_arguments( + builder, c->connecting_result.channel_args); + grpc_channel_stack_builder_set_transport(builder, + c->connecting_result.transport); + + if (grpc_channel_init_create_stack(exec_ctx, builder, + GRPC_CLIENT_SUBCHANNEL)) { + con = grpc_channel_stack_builder_finish(exec_ctx, builder, 0, 1, + connection_destroy, NULL); + } else { + grpc_channel_stack_builder_destroy(builder); + abort(); /* TODO(ctiller): what to do here (previously we just crashed) */ + } stk = CHANNEL_STACK_FROM_CONNECTION(con); memset(&c->connecting_result, 0, sizeof(c->connecting_result)); @@ -576,7 +580,8 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx, GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con)); c->connecting = 0; - /* setup subchannel watching connected subchannel for changes; subchannel ref + /* setup subchannel watching connected subchannel for changes; subchannel + ref for connecting is donated to the state watcher */ GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); diff --git a/src/core/ext/client_config/subchannel_call_holder.c b/src/core/ext/client_config/subchannel_call_holder.c index 3db462b246..9918fbdcb4 100644 --- a/src/core/ext/client_config/subchannel_call_holder.c +++ b/src/core/ext/client_config/subchannel_call_holder.c @@ -252,9 +252,9 @@ char *grpc_subchannel_call_holder_get_peer( grpc_exec_ctx *exec_ctx, grpc_subchannel_call_holder *holder) { grpc_subchannel_call *subchannel_call = GET_CALL(holder); - if (subchannel_call) { - return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call); - } else { + if (subchannel_call == NULL || subchannel_call == CANCELLED_CALL) { return NULL; + } else { + return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call); } } diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index 5926f9d70b..0d215cd196 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -109,7 +109,7 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { if (selected != NULL) { grpc_connected_subchannel_notify_on_state_change( exec_ctx, selected, NULL, NULL, &p->connectivity_changed); - } else { + } else if (p->num_subchannels > 0) { grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL, &p->connectivity_changed); diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c index 2749b0ca01..620ba4e2aa 100644 --- a/src/core/ext/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -86,7 +86,8 @@ typedef struct { static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r); -static void dns_start_resolving_locked(dns_resolver *r); +static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx, + dns_resolver *r); static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, dns_resolver *r); @@ -119,7 +120,7 @@ static void dns_channel_saw_error(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&r->mu); if (!r->resolving) { gpr_backoff_reset(&r->backoff_state); - dns_start_resolving_locked(r); + dns_start_resolving_locked(exec_ctx, r); } gpr_mu_unlock(&r->mu); } @@ -134,7 +135,7 @@ static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, r->target_config = target_config; if (r->resolved_version == 0 && !r->resolving) { gpr_backoff_reset(&r->backoff_state); - dns_start_resolving_locked(r); + dns_start_resolving_locked(exec_ctx, r); } else { dns_maybe_finish_next_locked(exec_ctx, r); } @@ -149,7 +150,7 @@ static void dns_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg, r->have_retry_timer = false; if (success) { if (!r->resolving) { - dns_start_resolving_locked(r); + dns_start_resolving_locked(exec_ctx, r); } } gpr_mu_unlock(&r->mu); @@ -201,11 +202,12 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "dns-resolving"); } -static void dns_start_resolving_locked(dns_resolver *r) { +static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx, + dns_resolver *r) { GRPC_RESOLVER_REF(&r->base, "dns-resolving"); GPR_ASSERT(!r->resolving); r->resolving = 1; - grpc_resolve_address(r->name, r->default_port, dns_on_resolved, r); + grpc_resolve_address(exec_ctx, r->name, r->default_port, dns_on_resolved, r); } static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c index 1f14b40e18..a4fa9acf22 100644 --- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c +++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c @@ -41,6 +41,7 @@ #include <grpc/support/string_util.h> #include "src/core/ext/client_config/lb_policy_registry.h" +#include "src/core/ext/client_config/parse_address.h" #include "src/core/ext/client_config/resolver_registry.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" @@ -162,87 +163,12 @@ static char *ipv6_get_default_authority(grpc_resolver_factory *factory, return ip_get_default_authority(uri); } -static int parse_ipv4(grpc_uri *uri, struct sockaddr_storage *addr, - size_t *len) { - const char *host_port = uri->path; - char *host; - char *port; - int port_num; - int result = 0; - struct sockaddr_in *in = (struct sockaddr_in *)addr; - - if (*host_port == '/') ++host_port; - if (!gpr_split_host_port(host_port, &host, &port)) { - return 0; - } - - memset(in, 0, sizeof(*in)); - *len = sizeof(*in); - in->sin_family = AF_INET; - if (inet_pton(AF_INET, host, &in->sin_addr) == 0) { - gpr_log(GPR_ERROR, "invalid ipv4 address: '%s'", host); - goto done; - } - - if (port != NULL) { - if (sscanf(port, "%d", &port_num) != 1 || port_num < 0 || - port_num > 65535) { - gpr_log(GPR_ERROR, "invalid ipv4 port: '%s'", port); - goto done; - } - in->sin_port = htons((uint16_t)port_num); - } else { - gpr_log(GPR_ERROR, "no port given for ipv4 scheme"); - goto done; - } - - result = 1; -done: - gpr_free(host); - gpr_free(port); - return result; -} - -static int parse_ipv6(grpc_uri *uri, struct sockaddr_storage *addr, - size_t *len) { - const char *host_port = uri->path; - char *host; - char *port; - int port_num; - int result = 0; - struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)addr; - - if (*host_port == '/') ++host_port; - if (!gpr_split_host_port(host_port, &host, &port)) { - return 0; - } - - memset(in6, 0, sizeof(*in6)); - *len = sizeof(*in6); - in6->sin6_family = AF_INET6; - if (inet_pton(AF_INET6, host, &in6->sin6_addr) == 0) { - gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host); - goto done; - } - - if (port != NULL) { - if (sscanf(port, "%d", &port_num) != 1 || port_num < 0 || - port_num > 65535) { - gpr_log(GPR_ERROR, "invalid ipv6 port: '%s'", port); - goto done; - } - in6->sin6_port = htons((uint16_t)port_num); - } else { - gpr_log(GPR_ERROR, "no port given for ipv6 scheme"); - goto done; - } - - result = 1; -done: - gpr_free(host); - gpr_free(port); - return result; +#ifdef GPR_HAVE_UNIX_SOCKET +char *unix_get_default_authority(grpc_resolver_factory *factory, + grpc_uri *uri) { + return gpr_strdup("localhost"); } +#endif static void do_nothing(void *ignored) {} @@ -334,23 +260,22 @@ static void sockaddr_factory_ref(grpc_resolver_factory *factory) {} static void sockaddr_factory_unref(grpc_resolver_factory *factory) {} -#define DECL_FACTORY(name, prefix) \ +#define DECL_FACTORY(name) \ static grpc_resolver *name##_factory_create_resolver( \ grpc_resolver_factory *factory, grpc_resolver_args *args) { \ - return sockaddr_create(args, "pick_first", prefix##parse_##name); \ + return sockaddr_create(args, "pick_first", parse_##name); \ } \ static const grpc_resolver_factory_vtable name##_factory_vtable = { \ sockaddr_factory_ref, sockaddr_factory_unref, \ - name##_factory_create_resolver, prefix##name##_get_default_authority, \ - #name}; \ + name##_factory_create_resolver, name##_get_default_authority, #name}; \ static grpc_resolver_factory name##_resolver_factory = { \ &name##_factory_vtable} #ifdef GPR_HAVE_UNIX_SOCKET -DECL_FACTORY(unix, grpc_); +DECL_FACTORY(unix); #endif -DECL_FACTORY(ipv4, ); -DECL_FACTORY(ipv6, ); +DECL_FACTORY(ipv4); +DECL_FACTORY(ipv6); void grpc_resolver_sockaddr_init(void) { grpc_register_resolver_type(&ipv4_resolver_factory); diff --git a/src/core/ext/resolver/zookeeper/zookeeper_resolver.c b/src/core/ext/resolver/zookeeper/zookeeper_resolver.c index 898632c3cd..deb4b9b1ef 100644 --- a/src/core/ext/resolver/zookeeper/zookeeper_resolver.c +++ b/src/core/ext/resolver/zookeeper/zookeeper_resolver.c @@ -299,7 +299,7 @@ static void zookeeper_get_children_node_completion(int rc, const char *value, address = zookeeper_parse_address(value, (size_t)value_len); if (address != NULL) { /** Further resolves address by DNS */ - grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r); + grpc_resolve_address(&exec_ctx, address, NULL, zookeeper_dns_resolved, r); gpr_free(address); } else { gpr_log(GPR_ERROR, "Error in resolving a child node of %s", r->name); @@ -375,8 +375,10 @@ static void zookeeper_get_node_completion(int rc, const char *value, r->resolved_addrs->naddrs = 0; r->resolved_total = 1; /** Further resolves address by DNS */ - grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resolve_address(&exec_ctx, address, NULL, zookeeper_dns_resolved, r); gpr_free(address); + grpc_exec_ctx_finish(&exec_ctx); return; } diff --git a/src/core/ext/transport/chttp2/transport/alpn.c b/src/core/ext/transport/chttp2/alpn/alpn.c index 4271d08ded..48b0217265 100644 --- a/src/core/ext/transport/chttp2/transport/alpn.c +++ b/src/core/ext/transport/chttp2/alpn/alpn.c @@ -31,7 +31,7 @@ * */ -#include "src/core/ext/transport/chttp2/transport/alpn.h" +#include "src/core/ext/transport/chttp2/alpn/alpn.h" #include <grpc/support/log.h> #include <grpc/support/useful.h> diff --git a/src/core/ext/transport/chttp2/transport/alpn.h b/src/core/ext/transport/chttp2/alpn/alpn.h index 08a6f039f4..1316770f11 100644 --- a/src/core/ext/transport/chttp2/transport/alpn.h +++ b/src/core/ext/transport/chttp2/alpn/alpn.h @@ -31,8 +31,8 @@ * */ -#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_ALPN_H -#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_ALPN_H +#ifndef GRPC_CORE_EXT_TRANSPORT_CHTTP2_ALPN_ALPN_H +#define GRPC_CORE_EXT_TRANSPORT_CHTTP2_ALPN_ALPN_H #include <string.h> @@ -46,4 +46,4 @@ size_t grpc_chttp2_num_alpn_versions(void); * grpc_chttp2_num_alpn_versions()) */ const char *grpc_chttp2_get_alpn_version_index(size_t i); -#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_ALPN_H */ +#endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_ALPN_ALPN_H */ diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index 5484438f0a..c5d3d8d9cc 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -40,7 +40,6 @@ #include <grpc/support/slice.h> #include <grpc/support/slice_buffer.h> -#include "src/core/ext/census/grpc_filter.h" #include "src/core/ext/client_config/client_channel.h" #include "src/core/ext/client_config/resolver_registry.h" #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" @@ -236,5 +235,7 @@ grpc_channel *grpc_insecure_channel_create(const char *target, grpc_exec_ctx_finish(&exec_ctx); - return channel; /* may be NULL */ + return channel != NULL ? channel : grpc_lame_client_channel_create( + target, GRPC_STATUS_INTERNAL, + "Failed to create client channel"); } diff --git a/src/core/ext/transport/chttp2/transport/bin_encoder.c b/src/core/ext/transport/chttp2/transport/bin_encoder.c index 71c634e39b..1b43c28be1 100644 --- a/src/core/ext/transport/chttp2/transport/bin_encoder.c +++ b/src/core/ext/transport/chttp2/transport/bin_encoder.c @@ -175,7 +175,7 @@ static void enc_add1(huff_out *out, uint8_t a) { enc_flush_some(out); } -gpr_slice grpc_chttp2_base64_encode_and_huffman_compress(gpr_slice input) { +gpr_slice grpc_chttp2_base64_encode_and_huffman_compress_impl(gpr_slice input) { size_t input_length = GPR_SLICE_LENGTH(input); size_t input_triplets = input_length / 3; size_t tail_case = input_length % 3; @@ -194,9 +194,13 @@ gpr_slice grpc_chttp2_base64_encode_and_huffman_compress(gpr_slice input) { /* encode full triplets */ for (i = 0; i < input_triplets; i++) { - enc_add2(&out, in[0] >> 2, (uint8_t)((in[0] & 0x3) << 4) | (in[1] >> 4)); - enc_add2(&out, (uint8_t)((in[1] & 0xf) << 2) | (in[2] >> 6), - (uint8_t)(in[2] & 0x3f)); + const uint8_t low_to_high = (uint8_t)((in[0] & 0x3) << 4); + const uint8_t high_to_low = in[1] >> 4; + enc_add2(&out, in[0] >> 2, low_to_high | high_to_low); + + const uint8_t a = (uint8_t)((in[1] & 0xf) << 2); + const uint8_t b = (in[2] >> 6); + enc_add2(&out, a | b, in[2] & 0x3f); in += 3; } @@ -208,12 +212,14 @@ gpr_slice grpc_chttp2_base64_encode_and_huffman_compress(gpr_slice input) { enc_add2(&out, in[0] >> 2, (uint8_t)((in[0] & 0x3) << 4)); in += 1; break; - case 2: - enc_add2(&out, in[0] >> 2, - (uint8_t)((in[0] & 0x3) << 4) | (uint8_t)(in[1] >> 4)); + case 2: { + const uint8_t low_to_high = (uint8_t)((in[0] & 0x3) << 4); + const uint8_t high_to_low = in[1] >> 4; + enc_add2(&out, in[0] >> 2, low_to_high | high_to_low); enc_add1(&out, (uint8_t)((in[1] & 0xf) << 2)); in += 2; break; + } } if (out.temp_length) { diff --git a/src/core/ext/transport/chttp2/transport/bin_encoder.h b/src/core/ext/transport/chttp2/transport/bin_encoder.h index 660f114ebc..61ebbafa9a 100644 --- a/src/core/ext/transport/chttp2/transport/bin_encoder.h +++ b/src/core/ext/transport/chttp2/transport/bin_encoder.h @@ -49,6 +49,6 @@ gpr_slice grpc_chttp2_huffman_compress(gpr_slice input); gpr_slice y = grpc_chttp2_huffman_compress(x); gpr_slice_unref(x); return y; */ -gpr_slice grpc_chttp2_base64_encode_and_huffman_compress(gpr_slice input); +gpr_slice grpc_chttp2_base64_encode_and_huffman_compress_impl(gpr_slice input); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_BIN_ENCODER_H */ diff --git a/src/core/ext/transport/chttp2/transport/chttp2_plugin.c b/src/core/ext/transport/chttp2/transport/chttp2_plugin.c new file mode 100644 index 0000000000..bd87253ed3 --- /dev/null +++ b/src/core/ext/transport/chttp2/transport/chttp2_plugin.c @@ -0,0 +1,46 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/ext/transport/chttp2/transport/bin_encoder.h" +#include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" +#include "src/core/lib/debug/trace.h" +#include "src/core/lib/transport/metadata.h" + +void grpc_chttp2_plugin_init(void) { + grpc_chttp2_base64_encode_and_huffman_compress = + grpc_chttp2_base64_encode_and_huffman_compress_impl; + grpc_register_tracer("http", &grpc_http_trace); + grpc_register_tracer("flowctl", &grpc_flowctl_trace); +} + +void grpc_chttp2_plugin_shutdown(void) {} diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 8c78f77f2d..24448714a8 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -1248,6 +1248,10 @@ static void cancel_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, status, NULL); } + if (status != GRPC_STATUS_OK && !stream_global->seen_error) { + stream_global->seen_error = 1; + grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); + } grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1, 1); } diff --git a/src/core/ext/transport/chttp2/transport/frame_data.c b/src/core/ext/transport/chttp2/transport/frame_data.c index 9c301d1608..3a6d80e0a3 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.c +++ b/src/core/ext/transport/chttp2/transport/frame_data.c @@ -159,7 +159,10 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( } switch (p->state) { - fh_0: + case GRPC_CHTTP2_DATA_ERROR: + p->state = GRPC_CHTTP2_DATA_ERROR; + return GRPC_CHTTP2_STREAM_ERROR; + fh_0: case GRPC_CHTTP2_DATA_FH_0: stream_parsing->stats.incoming.framing_bytes++; p->frame_type = *cur; @@ -172,6 +175,7 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( break; default: gpr_log(GPR_ERROR, "Bad GRPC frame type 0x%02x", p->frame_type); + p->state = GRPC_CHTTP2_DATA_ERROR; return GRPC_CHTTP2_STREAM_ERROR; } if (++cur == end) { @@ -218,13 +222,11 @@ grpc_chttp2_parse_error grpc_chttp2_data_parser_parse( message_flags, &p->incoming_frames); /* fallthrough */ case GRPC_CHTTP2_DATA_FRAME: + grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, + stream_parsing); if (cur == end) { - grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, - stream_parsing); return GRPC_CHTTP2_PARSE_OK; } - grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, - stream_parsing); uint32_t remaining = (uint32_t)(end - cur); if (remaining == p->frame_size) { stream_parsing->stats.incoming.data_bytes += p->frame_size; diff --git a/src/core/ext/transport/chttp2/transport/frame_data.h b/src/core/ext/transport/chttp2/transport/frame_data.h index 2ff32963d6..af71f483a2 100644 --- a/src/core/ext/transport/chttp2/transport/frame_data.h +++ b/src/core/ext/transport/chttp2/transport/frame_data.h @@ -49,7 +49,8 @@ typedef enum { GRPC_CHTTP2_DATA_FH_2, GRPC_CHTTP2_DATA_FH_3, GRPC_CHTTP2_DATA_FH_4, - GRPC_CHTTP2_DATA_FRAME + GRPC_CHTTP2_DATA_FRAME, + GRPC_CHTTP2_DATA_ERROR } grpc_chttp2_stream_state; typedef struct grpc_chttp2_incoming_byte_stream diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.c b/src/core/ext/transport/chttp2/transport/hpack_encoder.c index 807cb5c8f4..555027c866 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_encoder.c +++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.c @@ -49,6 +49,7 @@ #include "src/core/ext/transport/chttp2/transport/hpack_table.h" #include "src/core/ext/transport/chttp2/transport/timeout_encoding.h" #include "src/core/ext/transport/chttp2/transport/varint.h" +#include "src/core/lib/transport/metadata.h" #include "src/core/lib/transport/static_metadata.h" #define HASH_FRAGMENT_1(x) ((x)&255) @@ -182,8 +183,7 @@ static void add_elem(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem) { uint32_t key_hash = elem->key->hash; uint32_t elem_hash = GRPC_MDSTR_KV_HASH(key_hash, elem->value->hash); uint32_t new_index = c->tail_remote_index + c->table_elems + 1; - size_t elem_size = 32 + GPR_SLICE_LENGTH(elem->key->slice) + - GPR_SLICE_LENGTH(elem->value->slice); + size_t elem_size = grpc_mdelem_get_size_in_hpack_table(elem); GPR_ASSERT(elem_size < 65536); @@ -399,8 +399,7 @@ static void hpack_enc(grpc_chttp2_hpack_compressor *c, grpc_mdelem *elem, } /* should this elem be in the table? */ - decoder_space_usage = 32 + GPR_SLICE_LENGTH(elem->key->slice) + - GPR_SLICE_LENGTH(elem->value->slice); + decoder_space_usage = grpc_mdelem_get_size_in_hpack_table(elem); should_add_elem = decoder_space_usage < MAX_DECODER_SPACE_USAGE && c->filter_elems[HASH_FRAGMENT_1(elem_hash)] >= c->filter_elems_sum / ONE_ON_ADD_PROBABILITY; diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c index a36d2fc382..687936bfd3 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.c +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c @@ -638,6 +638,10 @@ static int on_hdr(grpc_chttp2_hpack_parser *p, grpc_mdelem *md, return 0; } } + if (p->on_header == NULL) { + GRPC_MDELEM_UNREF(md); + return 0; + } p->on_header(p->on_header_user_data, md); return 1; } @@ -1382,12 +1386,8 @@ static int parse_value_string_with_literal_key(grpc_chttp2_hpack_parser *p, /* PUBLIC INTERFACE */ -static void on_header_not_set(void *user_data, grpc_mdelem *md) { - GPR_UNREACHABLE_CODE(return ); -} - void grpc_chttp2_hpack_parser_init(grpc_chttp2_hpack_parser *p) { - p->on_header = on_header_not_set; + p->on_header = NULL; p->on_header_user_data = NULL; p->state = parse_begin; p->key.str = NULL; @@ -1455,7 +1455,7 @@ grpc_chttp2_parse_error grpc_chttp2_header_parser_parse( stream_parsing->received_close = 1; } } - parser->on_header = on_header_not_set; + parser->on_header = NULL; parser->on_header_user_data = NULL; parser->is_boundary = 0xde; parser->is_eof = 0xde; |