diff options
Diffstat (limited to 'src/core/client_config')
-rw-r--r-- | src/core/client_config/connector.h | 2 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 30 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.h | 9 | ||||
-rw-r--r-- | src/core/client_config/lb_policy_factory.c | 47 | ||||
-rw-r--r-- | src/core/client_config/lb_policy_factory.h | 73 | ||||
-rw-r--r-- | src/core/client_config/lb_policy_registry.c | 88 | ||||
-rw-r--r-- | src/core/client_config/lb_policy_registry.h | 54 | ||||
-rw-r--r-- | src/core/client_config/resolvers/dns_resolver.c | 23 | ||||
-rw-r--r-- | src/core/client_config/resolvers/sockaddr_resolver.c | 41 | ||||
-rw-r--r-- | src/core/client_config/resolvers/zookeeper_resolver.c | 29 | ||||
-rw-r--r-- | src/core/client_config/uri_parser.c | 152 | ||||
-rw-r--r-- | src/core/client_config/uri_parser.h | 2 |
12 files changed, 465 insertions, 85 deletions
diff --git a/src/core/client_config/connector.h b/src/core/client_config/connector.h index edcb10a36e..39f3467990 100644 --- a/src/core/client_config/connector.h +++ b/src/core/client_config/connector.h @@ -50,7 +50,7 @@ typedef struct { grpc_pollset_set *interested_parties; /** address to connect to */ const struct sockaddr *addr; - int addr_len; + size_t addr_len; /** deadline for connection */ gpr_timespec deadline; /** channel arguments (to be passed to transport) */ diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 5ae2e0ea52..c8262e92ef 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -31,6 +31,7 @@ * */ +#include "src/core/client_config/lb_policy_factory.h" #include "src/core/client_config/lb_policies/pick_first.h" #include <string.h> @@ -314,19 +315,34 @@ static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { pf_check_connectivity, pf_notify_on_state_change}; -grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels, - size_t num_subchannels) { +static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {} + +static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {} + +static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory, + grpc_lb_policy_args *args) { pick_first_lb_policy *p = gpr_malloc(sizeof(*p)); - GPR_ASSERT(num_subchannels); + GPR_ASSERT(args->num_subchannels > 0); memset(p, 0, sizeof(*p)); grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable); - p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * num_subchannels); - p->num_subchannels = num_subchannels; + p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels); + p->num_subchannels = args->num_subchannels; grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "pick_first"); - memcpy(p->subchannels, subchannels, - sizeof(grpc_subchannel *) * num_subchannels); + memcpy(p->subchannels, args->subchannels, + sizeof(grpc_subchannel *) * args->num_subchannels); grpc_iomgr_closure_init(&p->connectivity_changed, pf_connectivity_changed, p); gpr_mu_init(&p->mu); return &p->base; } + +static const grpc_lb_policy_factory_vtable pick_first_factory_vtable = { + pick_first_factory_ref, pick_first_factory_unref, create_pick_first, + "pick_first"}; + +static grpc_lb_policy_factory pick_first_lb_policy_factory = { + &pick_first_factory_vtable}; + +grpc_lb_policy_factory *grpc_pick_first_lb_factory_create() { + return &pick_first_lb_policy_factory; +} diff --git a/src/core/client_config/lb_policies/pick_first.h b/src/core/client_config/lb_policies/pick_first.h index 31394985e5..3ca53ad42a 100644 --- a/src/core/client_config/lb_policies/pick_first.h +++ b/src/core/client_config/lb_policies/pick_first.h @@ -34,11 +34,10 @@ #ifndef GRPC_INTERNAL_CORE_CLIENT_CONFIG_PICK_FIRST_H #define GRPC_INTERNAL_CORE_CLIENT_CONFIG_PICK_FIRST_H -#include "src/core/client_config/lb_policy.h" +#include "src/core/client_config/lb_policy_factory.h" -/** Returns a load balancing policy instance that picks up the first subchannel - * from \a subchannels to succesfully connect */ -grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels, - size_t num_subchannels); +/** Returns a load balancing factory for the pick first policy, which picks up + * the first subchannel from \a subchannels to succesfully connect */ +grpc_lb_policy_factory *grpc_pick_first_lb_factory_create(); #endif diff --git a/src/core/client_config/lb_policy_factory.c b/src/core/client_config/lb_policy_factory.c new file mode 100644 index 0000000000..0c097e0542 --- /dev/null +++ b/src/core/client_config/lb_policy_factory.c @@ -0,0 +1,47 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/client_config/lb_policy_factory.h" + +void grpc_lb_policy_factory_ref(grpc_lb_policy_factory *factory) { + factory->vtable->ref(factory); +} +void grpc_lb_policy_factory_unref(grpc_lb_policy_factory *factory) { + factory->vtable->unref(factory); +} + +grpc_lb_policy *grpc_lb_policy_factory_create_lb_policy( + grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { + if (factory == NULL) return NULL; + return factory->vtable->create_lb_policy(factory, args); +} diff --git a/src/core/client_config/lb_policy_factory.h b/src/core/client_config/lb_policy_factory.h new file mode 100644 index 0000000000..04610316ee --- /dev/null +++ b/src/core/client_config/lb_policy_factory.h @@ -0,0 +1,73 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CORE_CLIENT_CONFIG_LB_POLICY_FACTORY_H +#define GRPC_INTERNAL_CORE_CLIENT_CONFIG_LB_POLICY_FACTORY_H + +#include "src/core/client_config/lb_policy.h" +#include "src/core/client_config/subchannel.h" + +typedef struct grpc_lb_policy_factory grpc_lb_policy_factory; +typedef struct grpc_lb_policy_factory_vtable grpc_lb_policy_factory_vtable; + +/** grpc_lb_policy provides grpc_client_config objects to grpc_channel + objects */ +struct grpc_lb_policy_factory { + const grpc_lb_policy_factory_vtable *vtable; +}; + +typedef struct grpc_lb_policy_args { + grpc_subchannel **subchannels; + size_t num_subchannels; +} grpc_lb_policy_args; + +struct grpc_lb_policy_factory_vtable { + void (*ref)(grpc_lb_policy_factory *factory); + void (*unref)(grpc_lb_policy_factory *factory); + + /** Implementation of grpc_lb_policy_factory_create_lb_policy */ + grpc_lb_policy *(*create_lb_policy)(grpc_lb_policy_factory *factory, + grpc_lb_policy_args *args); + + /** Name for the LB policy this factory implements */ + const char *name; +}; + +void grpc_lb_policy_factory_ref(grpc_lb_policy_factory *factory); +void grpc_lb_policy_factory_unref(grpc_lb_policy_factory *factory); + +/** Create a lb_policy instance. */ +grpc_lb_policy *grpc_lb_policy_factory_create_lb_policy( + grpc_lb_policy_factory *factory, grpc_lb_policy_args *args); + +#endif /* GRPC_INTERNAL_CORE_CONFIG_LB_POLICY_FACTORY_H */ diff --git a/src/core/client_config/lb_policy_registry.c b/src/core/client_config/lb_policy_registry.c new file mode 100644 index 0000000000..ae4a077ef3 --- /dev/null +++ b/src/core/client_config/lb_policy_registry.c @@ -0,0 +1,88 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/client_config/lb_policy_registry.h" + +#include <string.h> + +#define MAX_POLICIES 10 + +static grpc_lb_policy_factory *g_all_of_the_lb_policies[MAX_POLICIES]; +static int g_number_of_lb_policies = 0; + +static grpc_lb_policy_factory *g_default_lb_policy_factory; + +void grpc_lb_policy_registry_init(grpc_lb_policy_factory *default_factory) { + g_number_of_lb_policies = 0; + g_default_lb_policy_factory = default_factory; +} + +void grpc_lb_policy_registry_shutdown(void) { + int i; + for (i = 0; i < g_number_of_lb_policies; i++) { + grpc_lb_policy_factory_unref(g_all_of_the_lb_policies[i]); + } +} + +void grpc_register_lb_policy(grpc_lb_policy_factory *factory) { + int i; + for (i = 0; i < g_number_of_lb_policies; i++) { + GPR_ASSERT(0 != strcmp(factory->vtable->name, + g_all_of_the_lb_policies[i]->vtable->name)); + } + GPR_ASSERT(g_number_of_lb_policies != MAX_POLICIES); + grpc_lb_policy_factory_ref(factory); + g_all_of_the_lb_policies[g_number_of_lb_policies++] = factory; +} + +static grpc_lb_policy_factory *lookup_factory(const char* name) { + int i; + + if (name == NULL) return NULL; + + for (i = 0; i < g_number_of_lb_policies; i++) { + if (0 == strcmp(name, g_all_of_the_lb_policies[i]->vtable->name)) { + return g_all_of_the_lb_policies[i]; + } + } + + return NULL; +} + +grpc_lb_policy *grpc_lb_policy_create(const char *name, + grpc_lb_policy_args *args) { + grpc_lb_policy_factory *factory = lookup_factory(name); + grpc_lb_policy *lb_policy = grpc_lb_policy_factory_create_lb_policy( + factory, args); + return lb_policy; +} diff --git a/src/core/client_config/lb_policy_registry.h b/src/core/client_config/lb_policy_registry.h new file mode 100644 index 0000000000..96fc2a1628 --- /dev/null +++ b/src/core/client_config/lb_policy_registry.h @@ -0,0 +1,54 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CORE_CLIENT_CONFIG_LB_POLICY_REGISTRY_H +#define GRPC_INTERNAL_CORE_CLIENT_CONFIG_LB_POLICY_REGISTRY_H + +#include "src/core/client_config/lb_policy_factory.h" + +/** Initialize the registry and set \a default_factory as the factory to be + * returned when no name is provided in a lookup */ +void grpc_lb_policy_registry_init(grpc_lb_policy_factory *default_factory); +void grpc_lb_policy_registry_shutdown(void); + +/** Register a LB policy factory. */ +void grpc_register_lb_policy(grpc_lb_policy_factory *factory); + +/** Create a \a grpc_lb_policy instance. + * + * If \a name is NULL, the default factory from \a grpc_lb_policy_registry_init + * will be returned. */ +grpc_lb_policy *grpc_lb_policy_create(const char *name, + grpc_lb_policy_args *args); + +#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_LB_POLICY_REGISTRY_H */ diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c index 84643c464a..ccec07a08c 100644 --- a/src/core/client_config/resolvers/dns_resolver.c +++ b/src/core/client_config/resolvers/dns_resolver.c @@ -39,7 +39,7 @@ #include <grpc/support/host_port.h> #include <grpc/support/string_util.h> -#include "src/core/client_config/lb_policies/pick_first.h" +#include "src/core/client_config/lb_policy_registry.h" #include "src/core/client_config/subchannel_factory_decorators/add_channel_arg.h" #include "src/core/iomgr/resolve_address.h" #include "src/core/support/string.h" @@ -55,9 +55,8 @@ typedef struct { char *default_port; /** subchannel factory */ grpc_subchannel_factory *subchannel_factory; - /** load balancing policy factory */ - grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels, - size_t num_subchannels); + /** load balancing policy name */ + char *lb_policy_name; /** mutex guarding the rest of the state */ gpr_mu mu; @@ -135,16 +134,19 @@ static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) { grpc_lb_policy *lb_policy; size_t i; if (addresses) { + grpc_lb_policy_args lb_policy_args; config = grpc_client_config_create(); subchannels = gpr_malloc(sizeof(grpc_subchannel *) * addresses->naddrs); for (i = 0; i < addresses->naddrs; i++) { memset(&args, 0, sizeof(args)); args.addr = (struct sockaddr *)(addresses->addrs[i].addr); - args.addr_len = addresses->addrs[i].len; + args.addr_len = (size_t)addresses->addrs[i].len; subchannels[i] = grpc_subchannel_factory_create_subchannel( r->subchannel_factory, &args); } - lb_policy = r->lb_policy_factory(subchannels, addresses->naddrs); + lb_policy_args.subchannels = subchannels; + lb_policy_args.num_subchannels = addresses->naddrs; + lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args); grpc_client_config_set_lb_policy(config, lb_policy); GRPC_LB_POLICY_UNREF(lb_policy, "construction"); grpc_resolved_addresses_destroy(addresses); @@ -193,13 +195,13 @@ static void dns_destroy(grpc_resolver *gr) { grpc_subchannel_factory_unref(r->subchannel_factory); gpr_free(r->name); gpr_free(r->default_port); + gpr_free(r->lb_policy_name); gpr_free(r); } static grpc_resolver *dns_create( grpc_uri *uri, const char *default_port, - grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels, - size_t num_subchannels), + const char* lb_policy_name, grpc_subchannel_factory *subchannel_factory) { dns_resolver *r; const char *path = uri->path; @@ -220,7 +222,7 @@ static grpc_resolver *dns_create( r->default_port = gpr_strdup(default_port); r->subchannel_factory = subchannel_factory; grpc_subchannel_factory_ref(subchannel_factory); - r->lb_policy_factory = lb_policy_factory; + r->lb_policy_name = gpr_strdup(lb_policy_name); return &r->base; } @@ -235,8 +237,7 @@ static void dns_factory_unref(grpc_resolver_factory *factory) {} static grpc_resolver *dns_factory_create_resolver( grpc_resolver_factory *factory, grpc_uri *uri, grpc_subchannel_factory *subchannel_factory) { - return dns_create(uri, "https", grpc_create_pick_first_lb_policy, - subchannel_factory); + return dns_create(uri, "https", "pick_first", subchannel_factory); } char *dns_factory_get_default_host_name(grpc_resolver_factory *factory, diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c index 0d8540a566..2900df285c 100644 --- a/src/core/client_config/resolvers/sockaddr_resolver.c +++ b/src/core/client_config/resolvers/sockaddr_resolver.c @@ -45,7 +45,7 @@ #include <grpc/support/host_port.h> #include <grpc/support/string_util.h> -#include "src/core/client_config/lb_policies/pick_first.h" +#include "src/core/client_config/lb_policy_registry.h" #include "src/core/iomgr/resolve_address.h" #include "src/core/support/string.h" @@ -56,14 +56,13 @@ typedef struct { gpr_refcount refs; /** subchannel factory */ grpc_subchannel_factory *subchannel_factory; - /** load balancing policy factory */ - grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels, - size_t num_subchannels); + /** load balancing policy name */ + char *lb_policy_name; /** the addresses that we've 'resolved' */ struct sockaddr_storage *addrs; /** the corresponding length of the addresses */ - int *addrs_len; + size_t *addrs_len; /** how many elements in \a addrs */ size_t num_addrs; @@ -122,6 +121,7 @@ static void sockaddr_next(grpc_resolver *resolver, static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) { grpc_client_config *cfg; grpc_lb_policy *lb_policy; + grpc_lb_policy_args lb_policy_args; grpc_subchannel **subchannels; grpc_subchannel_args args; @@ -136,7 +136,10 @@ static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) { subchannels[i] = grpc_subchannel_factory_create_subchannel( r->subchannel_factory, &args); } - lb_policy = r->lb_policy_factory(subchannels, r->num_addrs); + lb_policy_args.subchannels = subchannels; + lb_policy_args.num_subchannels = r->num_addrs; + lb_policy = + grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args); gpr_free(subchannels); grpc_client_config_set_lb_policy(cfg, lb_policy); GRPC_LB_POLICY_UNREF(lb_policy, "unix"); @@ -153,11 +156,13 @@ static void sockaddr_destroy(grpc_resolver *gr) { grpc_subchannel_factory_unref(r->subchannel_factory); gpr_free(r->addrs); gpr_free(r->addrs_len); + gpr_free(r->lb_policy_name); gpr_free(r); } #ifdef GPR_POSIX_SOCKET -static int parse_unix(grpc_uri *uri, struct sockaddr_storage *addr, int *len) { +static int parse_unix(grpc_uri *uri, struct sockaddr_storage *addr, + size_t *len) { struct sockaddr_un *un = (struct sockaddr_un *)addr; un->sun_family = AF_UNIX; @@ -189,7 +194,8 @@ static char *ipv6_get_default_authority(grpc_resolver_factory *factory, return ip_get_default_authority(uri); } -static int parse_ipv4(grpc_uri *uri, struct sockaddr_storage *addr, int *len) { +static int parse_ipv4(grpc_uri *uri, struct sockaddr_storage *addr, + size_t *len) { const char *host_port = uri->path; char *host; char *port; @@ -216,7 +222,7 @@ static int parse_ipv4(grpc_uri *uri, struct sockaddr_storage *addr, int *len) { gpr_log(GPR_ERROR, "invalid ipv4 port: '%s'", port); goto done; } - in->sin_port = htons(port_num); + in->sin_port = htons((gpr_uint16)port_num); } else { gpr_log(GPR_ERROR, "no port given for ipv4 scheme"); goto done; @@ -229,7 +235,8 @@ done: return result; } -static int parse_ipv6(grpc_uri *uri, struct sockaddr_storage *addr, int *len) { +static int parse_ipv6(grpc_uri *uri, struct sockaddr_storage *addr, + size_t *len) { const char *host_port = uri->path; char *host; char *port; @@ -256,7 +263,7 @@ static int parse_ipv6(grpc_uri *uri, struct sockaddr_storage *addr, int *len) { gpr_log(GPR_ERROR, "invalid ipv6 port: '%s'", port); goto done; } - in6->sin6_port = htons(port_num); + in6->sin6_port = htons((gpr_uint16)port_num); } else { gpr_log(GPR_ERROR, "no port given for ipv6 scheme"); goto done; @@ -271,11 +278,9 @@ done: static void do_nothing(void *ignored) {} static grpc_resolver *sockaddr_create( - grpc_uri *uri, - grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels, - size_t num_subchannels), + grpc_uri *uri, const char *lb_policy_name, grpc_subchannel_factory *subchannel_factory, - int parse(grpc_uri *uri, struct sockaddr_storage *dst, int *len)) { + int parse(grpc_uri *uri, struct sockaddr_storage *dst, size_t *len)) { size_t i; int errors_found = 0; /* GPR_FALSE */ sockaddr_resolver *r; @@ -296,7 +301,7 @@ static grpc_resolver *sockaddr_create( gpr_slice_split(path_slice, ",", &path_parts); r->num_addrs = path_parts.count; r->addrs = gpr_malloc(sizeof(struct sockaddr_storage) * r->num_addrs); - r->addrs_len = gpr_malloc(sizeof(int) * r->num_addrs); + r->addrs_len = gpr_malloc(sizeof(*r->addrs_len) * r->num_addrs); for(i = 0; i < r->num_addrs; i++) { grpc_uri ith_uri = *uri; @@ -320,7 +325,7 @@ static grpc_resolver *sockaddr_create( gpr_mu_init(&r->mu); grpc_resolver_init(&r->base, &sockaddr_resolver_vtable); r->subchannel_factory = subchannel_factory; - r->lb_policy_factory = lb_policy_factory; + r->lb_policy_name = gpr_strdup(lb_policy_name); grpc_subchannel_factory_ref(subchannel_factory); return &r->base; @@ -338,7 +343,7 @@ static void sockaddr_factory_unref(grpc_resolver_factory *factory) {} static grpc_resolver *name##_factory_create_resolver( \ grpc_resolver_factory *factory, grpc_uri *uri, \ grpc_subchannel_factory *subchannel_factory) { \ - return sockaddr_create(uri, grpc_create_pick_first_lb_policy, \ + return sockaddr_create(uri, "pick_first", \ subchannel_factory, parse_##name); \ } \ static const grpc_resolver_factory_vtable name##_factory_vtable = { \ diff --git a/src/core/client_config/resolvers/zookeeper_resolver.c b/src/core/client_config/resolvers/zookeeper_resolver.c index da399f9954..2594e6fae9 100644 --- a/src/core/client_config/resolvers/zookeeper_resolver.c +++ b/src/core/client_config/resolvers/zookeeper_resolver.c @@ -41,7 +41,7 @@ #include <grpc/grpc_zookeeper.h> #include <zookeeper/zookeeper.h> -#include "src/core/client_config/lb_policies/pick_first.h" +#include "src/core/client_config/lb_policy_registry.h" #include "src/core/client_config/resolver_registry.h" #include "src/core/iomgr/resolve_address.h" #include "src/core/support/string.h" @@ -59,9 +59,8 @@ typedef struct { char *name; /** subchannel factory */ grpc_subchannel_factory *subchannel_factory; - /** load balancing policy factory */ - grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels, - size_t num_subchannels); + /** load balancing policy name */ + char *lb_policy_name; /** mutex guarding the rest of the state */ gpr_mu mu; @@ -183,6 +182,7 @@ static void zookeeper_on_resolved(void *arg, grpc_lb_policy *lb_policy; size_t i; if (addresses != NULL) { + grpc_lb_policy_args lb_policy_args; config = grpc_client_config_create(); subchannels = gpr_malloc(sizeof(grpc_subchannel *) * addresses->naddrs); for (i = 0; i < addresses->naddrs; i++) { @@ -192,7 +192,10 @@ static void zookeeper_on_resolved(void *arg, subchannels[i] = grpc_subchannel_factory_create_subchannel( r->subchannel_factory, &args); } - lb_policy = r->lb_policy_factory(subchannels, addresses->naddrs); + lb_policy_args.subchannels = subchannels; + lb_policy_args.num_subchannels = addresses->naddrs; + lb_policy = + grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args); grpc_client_config_set_lb_policy(config, lb_policy); GRPC_LB_POLICY_UNREF(lb_policy, "construction"); grpc_resolved_addresses_destroy(addresses); @@ -244,7 +247,7 @@ static void zookeeper_dns_resolved(void *arg, } /** Parses JSON format address of a zookeeper node */ -static char *zookeeper_parse_address(const char *value, int value_len) { +static char *zookeeper_parse_address(const char *value, size_t value_len) { grpc_json *json; grpc_json *cur; const char *host; @@ -294,7 +297,7 @@ static void zookeeper_get_children_node_completion(int rc, const char *value, return; } - address = zookeeper_parse_address(value, value_len); + address = zookeeper_parse_address(value, (size_t)value_len); if (address != NULL) { /** Further resolves address by DNS */ grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r); @@ -364,7 +367,7 @@ static void zookeeper_get_node_completion(int rc, const char *value, /** If zookeeper node of path r->name does not have address (i.e. service node), get its children */ - address = zookeeper_parse_address(value, value_len); + address = zookeeper_parse_address(value, (size_t)value_len); if (address != NULL) { r->resolved_addrs = gpr_malloc(sizeof(grpc_resolved_addresses)); r->resolved_addrs->addrs = NULL; @@ -420,13 +423,12 @@ static void zookeeper_destroy(grpc_resolver *gr) { } grpc_subchannel_factory_unref(r->subchannel_factory); gpr_free(r->name); + gpr_free(r->lb_policy_name); gpr_free(r); } static grpc_resolver *zookeeper_create( - grpc_uri *uri, - grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels, - size_t num_subchannels), + grpc_uri *uri, const char *lb_policy_name, grpc_subchannel_factory *subchannel_factory) { zookeeper_resolver *r; size_t length; @@ -451,7 +453,7 @@ static grpc_resolver *zookeeper_create( r->name = gpr_strdup(path); r->subchannel_factory = subchannel_factory; - r->lb_policy_factory = lb_policy_factory; + r->lb_policy_name = gpr_strdup(lb_policy_name); grpc_subchannel_factory_ref(subchannel_factory); /** Initializes zookeeper client */ @@ -490,8 +492,7 @@ static char *zookeeper_factory_get_default_hostname( static grpc_resolver *zookeeper_factory_create_resolver( grpc_resolver_factory *factory, grpc_uri *uri, grpc_subchannel_factory *subchannel_factory) { - return zookeeper_create(uri, grpc_create_pick_first_lb_policy, - subchannel_factory); + return zookeeper_create(uri, "pick_first", subchannel_factory); } static const grpc_resolver_factory_vtable zookeeper_factory_vtable = { diff --git a/src/core/client_config/uri_parser.c b/src/core/client_config/uri_parser.c index 410a61c8cf..2738e2df57 100644 --- a/src/core/client_config/uri_parser.c +++ b/src/core/client_config/uri_parser.c @@ -39,10 +39,13 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> -static grpc_uri *bad_uri(const char *uri_text, int pos, const char *section, +/** a size_t default value... maps to all 1's */ +#define NOT_SET (~(size_t)0) + +static grpc_uri *bad_uri(const char *uri_text, size_t pos, const char *section, int suppress_errors) { char *line_prefix; - int pfx_len; + size_t pfx_len; if (!suppress_errors) { gpr_asprintf(&line_prefix, "bad uri.%s: '", section); @@ -60,22 +63,90 @@ static grpc_uri *bad_uri(const char *uri_text, int pos, const char *section, return NULL; } -static char *copy_fragment(const char *src, int begin, int end) { +/** Returns a copy of \a src[begin, end) */ +static char *copy_component(const char *src, size_t begin, size_t end) { char *out = gpr_malloc(end - begin + 1); memcpy(out, src + begin, end - begin); out[end - begin] = 0; return out; } +/** Returns how many chars to advance if \a uri_text[i] begins a valid \a pchar + * production. If \a uri_text[i] introduces an invalid \a pchar (such as percent + * sign not followed by two hex digits), NOT_SET is returned. */ +static size_t parse_pchar(const char *uri_text, size_t i) { + /* pchar = unreserved / pct-encoded / sub-delims / ":" / "@" + * unreserved = ALPHA / DIGIT / "-" / "." / "_" / "~" + * pct-encoded = "%" HEXDIG HEXDIG + * sub-delims = "!" / "$" / "&" / "'" / "(" / ")" + / "*" / "+" / "," / ";" / "=" */ + char c = uri_text[i]; + if (((c >= 'A') && (c <= 'Z')) || ((c >= 'a') && (c <= 'z')) || + ((c >= '0') && (c <= '9')) || + (c == '-' || c == '.' || c == '_' || c == '~') || /* unreserved */ + + (c == '!' || c == '$' || c == '&' || c == '\'' || c == '$' || c == '&' || + c == '(' || c == ')' || c == '*' || c == '+' || c == ',' || c == ';' || + c == '=') /* sub-delims */) { + return 1; + } + if (c == '%') { /* pct-encoded */ + size_t j; + if (uri_text[i + 1] == 0 || uri_text[i + 2] == 0) { + return NOT_SET; + } + for (j = i + 1; j < 2; j++) { + c = uri_text[j]; + if (!(((c >= '0') && (c <= '9')) || ((c >= 'a') && (c <= 'f')) || + ((c >= 'A') && (c <= 'F')))) { + return NOT_SET; + } + } + return 2; + } + return 0; +} + +/* *( pchar / "?" / "/" ) */ +static int parse_fragment_or_query(const char *uri_text, size_t *i) { + char c; + while ((c = uri_text[*i]) != 0) { + const size_t advance = parse_pchar(uri_text, *i); /* pchar */ + switch (advance) { + case 0: /* uri_text[i] isn't in pchar */ + /* maybe it's ? or / */ + if (uri_text[*i] == '?' || uri_text[*i] == '/') { + (*i)++; + break; + } else { + return 1; + } + gpr_log(GPR_ERROR, "should never reach here"); + abort(); + default: + (*i) += advance; + break; + case NOT_SET: /* uri_text[i] introduces an invalid URI */ + return 0; + } + } + /* *i is the first uri_text position past the \a query production, maybe \0 */ + return 1; +} + grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors) { grpc_uri *uri; - int scheme_begin = 0; - int scheme_end = -1; - int authority_begin = -1; - int authority_end = -1; - int path_begin = -1; - int path_end = -1; - int i; + size_t scheme_begin = 0; + size_t scheme_end = NOT_SET; + size_t authority_begin = NOT_SET; + size_t authority_end = NOT_SET; + size_t path_begin = NOT_SET; + size_t path_end = NOT_SET; + size_t query_begin = NOT_SET; + size_t query_end = NOT_SET; + size_t fragment_begin = NOT_SET; + size_t fragment_end = NOT_SET; + size_t i; for (i = scheme_begin; uri_text[i] != 0; i++) { if (uri_text[i] == ':') { @@ -92,27 +163,22 @@ grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors) { } break; } - if (scheme_end == -1) { + if (scheme_end == NOT_SET) { return bad_uri(uri_text, i, "scheme", suppress_errors); } if (uri_text[scheme_end + 1] == '/' && uri_text[scheme_end + 2] == '/') { authority_begin = scheme_end + 3; - for (i = authority_begin; uri_text[i] != 0 && authority_end == -1; i++) { - if (uri_text[i] == '/') { + for (i = authority_begin; uri_text[i] != 0 && authority_end == NOT_SET; + i++) { + if (uri_text[i] == '/' || uri_text[i] == '?' || uri_text[i] == '#') { authority_end = i; } - if (uri_text[i] == '?') { - return bad_uri(uri_text, i, "query_not_supported", suppress_errors); - } - if (uri_text[i] == '#') { - return bad_uri(uri_text, i, "fragment_not_supported", suppress_errors); - } } - if (authority_end == -1 && uri_text[i] == 0) { + if (authority_end == NOT_SET && uri_text[i] == 0) { authority_end = i; } - if (authority_end == -1) { + if (authority_end == NOT_SET) { return bad_uri(uri_text, i, "authority", suppress_errors); } /* TODO(ctiller): parse the authority correctly */ @@ -122,20 +188,46 @@ grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors) { } for (i = path_begin; uri_text[i] != 0; i++) { - if (uri_text[i] == '?') { - return bad_uri(uri_text, i, "query_not_supported", suppress_errors); + if (uri_text[i] == '?' || uri_text[i] == '#') { + path_end = i; + break; } - if (uri_text[i] == '#') { - return bad_uri(uri_text, i, "fragment_not_supported", suppress_errors); + } + if (path_end == NOT_SET && uri_text[i] == 0) { + path_end = i; + } + if (path_end == NOT_SET) { + return bad_uri(uri_text, i, "path", suppress_errors); + } + + if (uri_text[i] == '?') { + query_begin = ++i; + if (!parse_fragment_or_query(uri_text, &i)) { + return bad_uri(uri_text, i, "query", suppress_errors); + } else if (uri_text[i] != 0 && uri_text[i] != '#') { + /* We must be at the end or at the beginning of a fragment */ + return bad_uri(uri_text, i, "query", suppress_errors); + } + query_end = i; + } + if (uri_text[i] == '#') { + fragment_begin = ++i; + if (!parse_fragment_or_query(uri_text, &i)) { + return bad_uri(uri_text, i - fragment_end, "fragment", suppress_errors); + } else if (uri_text[i] != 0) { + /* We must be at the end */ + return bad_uri(uri_text, i, "fragment", suppress_errors); } + fragment_end = i; } - path_end = i; uri = gpr_malloc(sizeof(*uri)); memset(uri, 0, sizeof(*uri)); - uri->scheme = copy_fragment(uri_text, scheme_begin, scheme_end); - uri->authority = copy_fragment(uri_text, authority_begin, authority_end); - uri->path = copy_fragment(uri_text, path_begin, path_end); + uri->scheme = copy_component(uri_text, scheme_begin, scheme_end); + uri->authority = copy_component(uri_text, authority_begin, authority_end); + uri->path = copy_component(uri_text, path_begin, path_end); + uri->query = copy_component(uri_text, query_begin, query_end); + uri->fragment = copy_component(uri_text, fragment_begin, fragment_end); return uri; } @@ -145,5 +237,7 @@ void grpc_uri_destroy(grpc_uri *uri) { gpr_free(uri->scheme); gpr_free(uri->authority); gpr_free(uri->path); + gpr_free(uri->query); + gpr_free(uri->fragment); gpr_free(uri); } diff --git a/src/core/client_config/uri_parser.h b/src/core/client_config/uri_parser.h index ce4e6aecb0..b8daa13bd4 100644 --- a/src/core/client_config/uri_parser.h +++ b/src/core/client_config/uri_parser.h @@ -38,6 +38,8 @@ typedef struct { char *scheme; char *authority; char *path; + char *query; + char *fragment; } grpc_uri; /** parse a uri, return NULL on failure */ |