diff options
author | Mark D. Roth <roth@google.com> | 2016-10-26 09:12:25 -0700 |
---|---|---|
committer | Mark D. Roth <roth@google.com> | 2016-10-26 09:12:25 -0700 |
commit | fb809b78508b8ab0cca85fab63e8fbcde2540d32 (patch) | |
tree | bdb1e9c208d72e8d627a348de32388f5eea1d816 /src/core | |
parent | b477cebbe77fca069fb2a4bb1e9806e0891ead83 (diff) | |
parent | 4c2e3e7bb9f32d252a664bb83c5dea3ecda65a9b (diff) |
Merge remote-tracking branch 'upstream/master' into resolver_channel_args
Diffstat (limited to 'src/core')
90 files changed, 2657 insertions, 588 deletions
diff --git a/src/core/ext/client_channel/connector.h b/src/core/ext/client_channel/connector.h index e08244b2c0..ed7d5450de 100644 --- a/src/core/ext/client_channel/connector.h +++ b/src/core/ext/client_channel/connector.h @@ -35,7 +35,7 @@ #define GRPC_CORE_EXT_CLIENT_CHANNEL_CONNECTOR_H #include "src/core/lib/channel/channel_stack.h" -#include "src/core/lib/iomgr/sockaddr.h" +#include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/transport/transport.h" typedef struct grpc_connector grpc_connector; @@ -49,7 +49,7 @@ typedef struct { /** set of pollsets interested in this connection */ grpc_pollset_set *interested_parties; /** address to connect to */ - const struct sockaddr *addr; + const grpc_resolved_address *addr; size_t addr_len; /** initial connect string to send */ gpr_slice initial_connect_string; diff --git a/src/core/ext/client_channel/default_initial_connect_string.c b/src/core/ext/client_channel/default_initial_connect_string.c index a70da4a84a..0b251372fd 100644 --- a/src/core/ext/client_channel/default_initial_connect_string.c +++ b/src/core/ext/client_channel/default_initial_connect_string.c @@ -32,8 +32,7 @@ */ #include <grpc/support/slice.h> -#include "src/core/lib/iomgr/sockaddr.h" +#include "src/core/lib/iomgr/resolve_address.h" -void grpc_set_default_initial_connect_string(struct sockaddr **addr, - size_t *addr_len, +void grpc_set_default_initial_connect_string(grpc_resolved_address **addr, gpr_slice *initial_str) {} diff --git a/src/core/ext/client_channel/initial_connect_string.c b/src/core/ext/client_channel/initial_connect_string.c index fd8ddb83a8..fb1493d77d 100644 --- a/src/core/ext/client_channel/initial_connect_string.c +++ b/src/core/ext/client_channel/initial_connect_string.c @@ -35,9 +35,8 @@ #include <stddef.h> -extern void grpc_set_default_initial_connect_string(struct sockaddr **addr, - size_t *addr_len, - gpr_slice *initial_str); +extern void grpc_set_default_initial_connect_string( + grpc_resolved_address **addr, gpr_slice *initial_str); static grpc_set_initial_connect_string_func g_set_initial_connect_string_func = grpc_set_default_initial_connect_string; @@ -47,7 +46,7 @@ void grpc_test_set_initial_connect_string_function( g_set_initial_connect_string_func = func; } -void grpc_set_initial_connect_string(struct sockaddr **addr, size_t *addr_len, +void grpc_set_initial_connect_string(grpc_resolved_address **addr, gpr_slice *initial_str) { - g_set_initial_connect_string_func(addr, addr_len, initial_str); + g_set_initial_connect_string_func(addr, initial_str); } diff --git a/src/core/ext/client_channel/initial_connect_string.h b/src/core/ext/client_channel/initial_connect_string.h index 39f6465fc9..68adb0373c 100644 --- a/src/core/ext/client_channel/initial_connect_string.h +++ b/src/core/ext/client_channel/initial_connect_string.h @@ -35,16 +35,16 @@ #define GRPC_CORE_EXT_CLIENT_CHANNEL_INITIAL_CONNECT_STRING_H #include <grpc/support/slice.h> -#include "src/core/lib/iomgr/sockaddr.h" -typedef void (*grpc_set_initial_connect_string_func)(struct sockaddr **addr, - size_t *addr_len, - gpr_slice *initial_str); +#include "src/core/lib/iomgr/resolve_address.h" + +typedef void (*grpc_set_initial_connect_string_func)( + grpc_resolved_address **addr, gpr_slice *initial_str); void grpc_test_set_initial_connect_string_function( grpc_set_initial_connect_string_func func); /** Set a string to be sent once connected. Optionally reset addr. */ -void grpc_set_initial_connect_string(struct sockaddr **addr, size_t *addr_len, +void grpc_set_initial_connect_string(grpc_resolved_address **addr, gpr_slice *connect_string); #endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_INITIAL_CONNECT_STRING_H */ diff --git a/src/core/ext/client_channel/parse_address.c b/src/core/ext/client_channel/parse_address.c index 31ca44299b..b1d55ad0f5 100644 --- a/src/core/ext/client_channel/parse_address.c +++ b/src/core/ext/client_channel/parse_address.c @@ -32,10 +32,11 @@ */ #include "src/core/ext/client_channel/parse_address.h" +#include "src/core/lib/iomgr/sockaddr.h" #include <stdio.h> #include <string.h> -#ifdef GPR_HAVE_UNIX_SOCKET +#ifdef GRPC_HAVE_UNIX_SOCKET #include <sys/un.h> #endif @@ -44,33 +45,39 @@ #include <grpc/support/log.h> #include <grpc/support/string_util.h> -#ifdef GPR_HAVE_UNIX_SOCKET -int parse_unix(grpc_uri *uri, struct sockaddr_storage *addr, size_t *len) { - struct sockaddr_un *un = (struct sockaddr_un *)addr; +#ifdef GRPC_HAVE_UNIX_SOCKET + +int parse_unix(grpc_uri *uri, grpc_resolved_address *resolved_addr) { + struct sockaddr_un *un = (struct sockaddr_un *)resolved_addr->addr; un->sun_family = AF_UNIX; strcpy(un->sun_path, uri->path); - *len = strlen(un->sun_path) + sizeof(un->sun_family) + 1; + resolved_addr->len = strlen(un->sun_path) + sizeof(un->sun_family) + 1; return 1; } -#endif -int parse_ipv4(grpc_uri *uri, struct sockaddr_storage *addr, size_t *len) { +#else /* GRPC_HAVE_UNIX_SOCKET */ + +int parse_unix(grpc_uri *uri, grpc_resolved_address *resolved_addr) { abort(); } + +#endif /* GRPC_HAVE_UNIX_SOCKET */ + +int parse_ipv4(grpc_uri *uri, grpc_resolved_address *resolved_addr) { const char *host_port = uri->path; char *host; char *port; int port_num; int result = 0; - struct sockaddr_in *in = (struct sockaddr_in *)addr; + struct sockaddr_in *in = (struct sockaddr_in *)resolved_addr->addr; if (*host_port == '/') ++host_port; if (!gpr_split_host_port(host_port, &host, &port)) { return 0; } - memset(in, 0, sizeof(*in)); - *len = sizeof(*in); + memset(resolved_addr, 0, sizeof(grpc_resolved_address)); + resolved_addr->len = sizeof(struct sockaddr_in); in->sin_family = AF_INET; if (inet_pton(AF_INET, host, &in->sin_addr) == 0) { gpr_log(GPR_ERROR, "invalid ipv4 address: '%s'", host); @@ -96,13 +103,13 @@ done: return result; } -int parse_ipv6(grpc_uri *uri, struct sockaddr_storage *addr, size_t *len) { +int parse_ipv6(grpc_uri *uri, grpc_resolved_address *resolved_addr) { const char *host_port = uri->path; char *host; char *port; int port_num; int result = 0; - struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)addr; + struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)resolved_addr->addr; if (*host_port == '/') ++host_port; if (!gpr_split_host_port(host_port, &host, &port)) { @@ -110,7 +117,7 @@ int parse_ipv6(grpc_uri *uri, struct sockaddr_storage *addr, size_t *len) { } memset(in6, 0, sizeof(*in6)); - *len = sizeof(*in6); + resolved_addr->len = sizeof(*in6); in6->sin6_family = AF_INET6; if (inet_pton(AF_INET6, host, &in6->sin6_addr) == 0) { gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host); diff --git a/src/core/ext/client_channel/parse_address.h b/src/core/ext/client_channel/parse_address.h index 3aa6c41546..bf99c5298a 100644 --- a/src/core/ext/client_channel/parse_address.h +++ b/src/core/ext/client_channel/parse_address.h @@ -37,20 +37,18 @@ #include <stddef.h> #include "src/core/ext/client_channel/uri_parser.h" -#include "src/core/lib/iomgr/sockaddr.h" +#include "src/core/lib/iomgr/resolve_address.h" -#ifdef GPR_HAVE_UNIX_SOCKET /** Populate \a addr and \a len from \a uri, whose path is expected to contain a * unix socket path. Returns true upon success. */ -int parse_unix(grpc_uri *uri, struct sockaddr_storage *addr, size_t *len); -#endif +int parse_unix(grpc_uri *uri, grpc_resolved_address *resolved_addr); /** Populate /a addr and \a len from \a uri, whose path is expected to contain a * host:port pair. Returns true upon success. */ -int parse_ipv4(grpc_uri *uri, struct sockaddr_storage *addr, size_t *len); +int parse_ipv4(grpc_uri *uri, grpc_resolved_address *resolved_addr); /** Populate /a addr and \a len from \a uri, whose path is expected to contain a * host:port pair. Returns true upon success. */ -int parse_ipv6(grpc_uri *uri, struct sockaddr_storage *addr, size_t *len); +int parse_ipv6(grpc_uri *uri, grpc_resolved_address *resolved_addr); #endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_PARSE_ADDRESS_H */ diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index b35bb79bc7..ab6aede23a 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -95,8 +95,7 @@ struct grpc_subchannel { /** channel arguments */ grpc_channel_args *args; /** address to connect to */ - struct sockaddr *addr; - size_t addr_len; + grpc_resolved_address *addr; grpc_subchannel_key *key; @@ -320,12 +319,11 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, } else { c->filters = NULL; } - c->addr = gpr_malloc(args->addr_len); - if (args->addr_len) memcpy(c->addr, args->addr, args->addr_len); + c->addr = gpr_malloc(sizeof(grpc_resolved_address)); + if (args->addr->len) + memcpy(c->addr, args->addr, sizeof(grpc_resolved_address)); c->pollset_set = grpc_pollset_set_create(); - c->addr_len = args->addr_len; - grpc_set_initial_connect_string(&c->addr, &c->addr_len, - &c->initial_connect_string); + grpc_set_initial_connect_string(&c->addr, &c->initial_connect_string); c->args = grpc_channel_args_copy(args->args); c->root_external_state_watcher.next = c->root_external_state_watcher.prev = &c->root_external_state_watcher; @@ -376,7 +374,6 @@ static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { args.interested_parties = c->pollset_set; args.addr = c->addr; - args.addr_len = c->addr_len; args.deadline = c->next_attempt; args.channel_args = c->args; args.initial_connect_string = c->initial_connect_string; diff --git a/src/core/ext/client_channel/subchannel.h b/src/core/ext/client_channel/subchannel.h index 0f34cff14b..5652074074 100644 --- a/src/core/ext/client_channel/subchannel.h +++ b/src/core/ext/client_channel/subchannel.h @@ -167,8 +167,7 @@ struct grpc_subchannel_args { /** Server name */ const char *server_name; /** Address to connect to */ - struct sockaddr *addr; - size_t addr_len; + grpc_resolved_address *addr; }; /** create a subchannel given a connector */ diff --git a/src/core/ext/client_channel/subchannel_index.c b/src/core/ext/client_channel/subchannel_index.c index 7c547f8edb..227013a7d7 100644 --- a/src/core/ext/client_channel/subchannel_index.c +++ b/src/core/ext/client_channel/subchannel_index.c @@ -87,10 +87,10 @@ static grpc_subchannel_key *create_key( k->args.filters = NULL; } k->args.server_name = gpr_strdup(args->server_name); - k->args.addr_len = args->addr_len; - k->args.addr = gpr_malloc(args->addr_len); - if (k->args.addr_len > 0) { - memcpy(k->args.addr, args->addr, k->args.addr_len); + k->args.addr = gpr_malloc(sizeof(grpc_resolved_address)); + k->args.addr->len = args->addr->len; + if (k->args.addr->len > 0) { + memcpy(k->args.addr, args->addr, sizeof(grpc_resolved_address)); } k->args.args = copy_channel_args(args->args); return k; @@ -109,14 +109,14 @@ static int subchannel_key_compare(grpc_subchannel_key *a, grpc_subchannel_key *b) { int c = GPR_ICMP(a->connector, b->connector); if (c != 0) return c; - c = GPR_ICMP(a->args.addr_len, b->args.addr_len); + c = GPR_ICMP(a->args.addr->len, b->args.addr->len); if (c != 0) return c; c = GPR_ICMP(a->args.filter_count, b->args.filter_count); if (c != 0) return c; c = strcmp(a->args.server_name, b->args.server_name); if (c != 0) return c; - if (a->args.addr_len) { - c = memcmp(a->args.addr, b->args.addr, a->args.addr_len); + if (a->args.addr->len) { + c = memcmp(a->args.addr->addr, b->args.addr->addr, a->args.addr->len); if (c != 0) return c; } if (a->args.filter_count > 0) { diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index b0ec5633a7..6d73c48cc3 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -96,6 +96,12 @@ * - Implement LB service forwarding (point 2c. in the doc's diagram). */ +/* With the addition of a libuv endpoint, sockaddr.h now includes uv.h when + using that endpoint. Because of various transitive includes in uv.h, + including windows.h on Windows, uv.h must be included before other system + headers. Therefore, sockaddr.h must always be included first */ +#include "src/core/lib/iomgr/sockaddr.h" + #include <errno.h> #include <string.h> @@ -114,7 +120,6 @@ #include "src/core/ext/lb_policy/grpclb/grpclb.h" #include "src/core/ext/lb_policy/grpclb/load_balancer_api.h" #include "src/core/lib/channel/channel_args.h" -#include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/call.h" @@ -412,7 +417,7 @@ static grpc_lb_addresses *process_serverlist( gpr_log(GPR_ERROR, "Missing LB token for backend address '%s'. The empty token will " "be used instead", - grpc_sockaddr_to_uri((struct sockaddr *)&addr.addr)); + grpc_sockaddr_to_uri(&addr)); user_data = GRPC_MDELEM_LB_TOKEN_EMPTY; } @@ -629,14 +634,12 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, } if (addresses->addresses[i].is_balancer) { if (addr_index == 0) { - addr_strs[addr_index++] = grpc_sockaddr_to_uri( - (const struct sockaddr *)&addresses->addresses[i].address.addr); + addr_strs[addr_index++] = + grpc_sockaddr_to_uri(&addresses->addresses[i].address); } else { - GPR_ASSERT( - grpc_sockaddr_to_string( - &addr_strs[addr_index++], - (const struct sockaddr *)&addresses->addresses[i].address.addr, - true) > 0); + GPR_ASSERT(grpc_sockaddr_to_string( + &addr_strs[addr_index++], + &addresses->addresses[i].address, true) > 0); } } } diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index da4341d30d..5d3433df74 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -473,8 +473,7 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, /* server_name will be copied as part of the subchannel creation. This makes * the copying of server_name (a borrowed pointer) OK. */ sc_args.server_name = server_name; - sc_args.addr = (struct sockaddr *)(&addresses->addresses[i].address.addr); - sc_args.addr_len = addresses->addresses[i].address.len; + sc_args.addr = &addresses->addresses[i].address; sc_args.args = args->args; grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index b2676a7c86..c0743b00e8 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -635,8 +635,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, /* server_name will be copied as part of the subchannel creation. This makes * the copying of server_name (a borrowed pointer) OK. */ sc_args.server_name = server_name; - sc_args.addr = (struct sockaddr *)(&addresses->addresses[i].address.addr); - sc_args.addr_len = addresses->addresses[i].address.len; + sc_args.addr = &addresses->addresses[i].address; sc_args.args = args->args; grpc_subchannel *subchannel = grpc_client_channel_factory_create_subchannel( diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c index 921e8f8f15..5fec03a8e4 100644 --- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c +++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c @@ -151,7 +151,7 @@ static char *ipv6_get_default_authority(grpc_resolver_factory *factory, return ip_get_default_authority(uri); } -#ifdef GPR_HAVE_UNIX_SOCKET +#ifdef GRPC_HAVE_UNIX_SOCKET char *unix_get_default_authority(grpc_resolver_factory *factory, grpc_uri *uri) { return gpr_strdup("localhost"); @@ -162,8 +162,7 @@ static void do_nothing(void *ignored) {} static grpc_resolver *sockaddr_create(grpc_resolver_args *args, int parse(grpc_uri *uri, - struct sockaddr_storage *dst, - size_t *len)) { + grpc_resolved_address *dst)) { if (0 != strcmp(args->uri->authority, "")) { gpr_log(GPR_ERROR, "authority based uri's not supported by the %s scheme", args->uri->scheme); @@ -182,11 +181,8 @@ static grpc_resolver *sockaddr_create(grpc_resolver_args *args, grpc_uri ith_uri = *args->uri; char *part_str = gpr_dump_slice(path_parts.slices[i], GPR_DUMP_ASCII); ith_uri.path = part_str; - if (!parse( - &ith_uri, - (struct sockaddr_storage *)(&addresses->addresses[i].address.addr), - &addresses->addresses[i].address.len)) { - errors_found = true; + if (!parse(&ith_uri, &addresses->addresses[i].address)) { + errors_found = true; /* GPR_TRUE */ } gpr_free(part_str); if (errors_found) break; @@ -231,7 +227,7 @@ static void sockaddr_factory_unref(grpc_resolver_factory *factory) {} static grpc_resolver_factory name##_resolver_factory = { \ &name##_factory_vtable} -#ifdef GPR_HAVE_UNIX_SOCKET +#ifdef GRPC_HAVE_UNIX_SOCKET DECL_FACTORY(unix); #endif DECL_FACTORY(ipv4); @@ -240,7 +236,7 @@ DECL_FACTORY(ipv6); void grpc_resolver_sockaddr_init(void) { grpc_register_resolver_type(&ipv4_resolver_factory); grpc_register_resolver_type(&ipv6_resolver_factory); -#ifdef GPR_HAVE_UNIX_SOCKET +#ifdef GRPC_HAVE_UNIX_SOCKET grpc_register_resolver_type(&unix_resolver_factory); #endif } diff --git a/src/core/ext/transport/chttp2/alpn/alpn.c b/src/core/ext/transport/chttp2/alpn/alpn.c index 48b0217265..55710dc5ae 100644 --- a/src/core/ext/transport/chttp2/alpn/alpn.c +++ b/src/core/ext/transport/chttp2/alpn/alpn.c @@ -36,7 +36,7 @@ #include <grpc/support/useful.h> /* in order of preference */ -static const char *const supported_versions[] = {"h2"}; +static const char *const supported_versions[] = {"grpc-exp", "h2"}; int grpc_chttp2_is_alpn_version_supported(const char *version, size_t size) { size_t i; diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index 6dc7316991..372fb7bf4c 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -154,8 +154,7 @@ static void connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con, c->tcp = NULL; grpc_closure_init(&c->connected, connected, c); grpc_tcp_client_connect(exec_ctx, &c->connected, &c->tcp, - args->interested_parties, args->addr, args->addr_len, - args->deadline); + args->interested_parties, args->addr, args->deadline); } static const grpc_connector_vtable connector_vtable = { diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index 00eb82e12a..29095af207 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -212,9 +212,9 @@ static void connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con, GPR_ASSERT(c->connecting_endpoint == NULL); gpr_mu_unlock(&c->mu); grpc_closure_init(&c->connected_closure, connected, c); - grpc_tcp_client_connect( - exec_ctx, &c->connected_closure, &c->newly_connecting_endpoint, - args->interested_parties, args->addr, args->addr_len, args->deadline); + grpc_tcp_client_connect(exec_ctx, &c->connected_closure, + &c->newly_connecting_endpoint, + args->interested_parties, args->addr, args->deadline); } static const grpc_connector_vtable connector_vtable = { diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c index f0e07429fa..e4967c2f08 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c @@ -148,9 +148,7 @@ int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { const size_t naddrs = resolved->naddrs; errors = gpr_malloc(sizeof(*errors) * naddrs); for (i = 0; i < naddrs; i++) { - errors[i] = grpc_tcp_server_add_port( - tcp, (struct sockaddr *)&resolved->addrs[i].addr, - resolved->addrs[i].len, &port_temp); + errors[i] = grpc_tcp_server_add_port(tcp, &resolved->addrs[i], &port_temp); if (errors[i] == GRPC_ERROR_NONE) { if (port_num == -1) { port_num = port_temp; diff --git a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c index 563271f4f8..0fab888541 100644 --- a/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c +++ b/src/core/ext/transport/chttp2/server/secure/server_secure_chttp2.c @@ -286,9 +286,7 @@ int grpc_server_add_secure_http2_port(grpc_server *server, const char *addr, errors = gpr_malloc(sizeof(*errors) * resolved->naddrs); for (i = 0; i < resolved->naddrs; i++) { - errors[i] = grpc_tcp_server_add_port( - tcp, (struct sockaddr *)&resolved->addrs[i].addr, - resolved->addrs[i].len, &port_temp); + errors[i] = grpc_tcp_server_add_port(tcp, &resolved->addrs[i], &port_temp); if (errors[i] == GRPC_ERROR_NONE) { if (port_num == -1) { port_num = port_temp; diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 97780d90f2..ecf3aea870 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -853,53 +853,51 @@ static bool contains_non_ok_status(grpc_metadata_batch *batch) { static void add_fetched_slice_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, - grpc_chttp2_stream *s); + grpc_chttp2_stream *s) { + s->fetched_send_message_length += + (uint32_t)GPR_SLICE_LENGTH(s->fetching_slice); + gpr_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice); + if (s->id != 0) { + grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message"); + } +} static void continue_fetching_send_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s) { - if (s->fetching_send_message == NULL) { - /* Stream was cancelled before message fetch completed */ - abort(); /* TODO(ctiller): what cleanup here? */ - return; - } - if (s->fetched_send_message_length == s->fetching_send_message->length) { - int64_t notify_offset = s->next_message_end_offset; - if (notify_offset <= s->flow_controlled_bytes_written) { - grpc_chttp2_complete_closure_step( - exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE, - "fetching_send_message_finished"); - } else { - grpc_chttp2_write_cb *cb = t->write_cb_pool; - if (cb == NULL) { - cb = gpr_malloc(sizeof(*cb)); + for (;;) { + if (s->fetching_send_message == NULL) { + /* Stream was cancelled before message fetch completed */ + abort(); /* TODO(ctiller): what cleanup here? */ + return; /* early out */ + } + if (s->fetched_send_message_length == s->fetching_send_message->length) { + int64_t notify_offset = s->next_message_end_offset; + if (notify_offset <= s->flow_controlled_bytes_written) { + grpc_chttp2_complete_closure_step( + exec_ctx, t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE, + "fetching_send_message_finished"); } else { - t->write_cb_pool = cb->next; + grpc_chttp2_write_cb *cb = t->write_cb_pool; + if (cb == NULL) { + cb = gpr_malloc(sizeof(*cb)); + } else { + t->write_cb_pool = cb->next; + } + cb->call_at_byte = notify_offset; + cb->closure = s->fetching_send_message_finished; + s->fetching_send_message_finished = NULL; + cb->next = s->on_write_finished_cbs; + s->on_write_finished_cbs = cb; } - cb->call_at_byte = notify_offset; - cb->closure = s->fetching_send_message_finished; - s->fetching_send_message_finished = NULL; - cb->next = s->on_write_finished_cbs; - s->on_write_finished_cbs = cb; + s->fetching_send_message = NULL; + return; /* early out */ + } else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message, + &s->fetching_slice, UINT32_MAX, + &s->complete_fetch)) { + add_fetched_slice_locked(exec_ctx, t, s); } - s->fetching_send_message = NULL; - } else if (grpc_byte_stream_next(exec_ctx, s->fetching_send_message, - &s->fetching_slice, UINT32_MAX, - &s->complete_fetch)) { - add_fetched_slice_locked(exec_ctx, t, s); - } -} - -static void add_fetched_slice_locked(grpc_exec_ctx *exec_ctx, - grpc_chttp2_transport *t, - grpc_chttp2_stream *s) { - s->fetched_send_message_length += - (uint32_t)GPR_SLICE_LENGTH(s->fetching_slice); - gpr_slice_buffer_add(&s->flow_controlled_buffer, s->fetching_slice); - if (s->id != 0) { - grpc_chttp2_become_writable(exec_ctx, t, s, true, "op.send_message"); } - continue_fetching_send_locked(exec_ctx, t, s); } static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs, @@ -908,6 +906,7 @@ static void complete_fetch_locked(grpc_exec_ctx *exec_ctx, void *gs, grpc_chttp2_transport *t = s->t; if (error == GRPC_ERROR_NONE) { add_fetched_slice_locked(exec_ctx, t, s); + continue_fetching_send_locked(exec_ctx, t, s); } else { /* TODO(ctiller): what to do here */ abort(); diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c index 7f3c2d120d..55de1b5990 100644 --- a/src/core/lib/http/httpcli.c +++ b/src/core/lib/http/httpcli.c @@ -32,7 +32,6 @@ */ #include "src/core/lib/http/httpcli.h" -#include "src/core/lib/iomgr/sockaddr.h" #include <string.h> @@ -126,7 +125,7 @@ static void append_error(internal_request *req, grpc_error *error) { req->overall_error = GRPC_ERROR_CREATE("Failed HTTP/1 client request"); } grpc_resolved_address *addr = &req->addresses->addrs[req->next_address - 1]; - char *addr_text = grpc_sockaddr_to_uri((struct sockaddr *)addr->addr); + char *addr_text = grpc_sockaddr_to_uri(addr); req->overall_error = grpc_error_add_child( req->overall_error, grpc_error_set_str(error, GRPC_ERROR_STR_TARGET_ADDRESS, addr_text)); @@ -224,9 +223,8 @@ static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req, } addr = &req->addresses->addrs[req->next_address++]; grpc_closure_init(&req->connected, on_connected, req); - grpc_tcp_client_connect( - exec_ctx, &req->connected, &req->ep, req->context->pollset_set, - (struct sockaddr *)&addr->addr, addr->len, req->deadline); + grpc_tcp_client_connect(exec_ctx, &req->connected, &req->ep, + req->context->pollset_set, addr, req->deadline); } static void on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { diff --git a/src/core/lib/iomgr/endpoint_pair_posix.c b/src/core/lib/iomgr/endpoint_pair_posix.c index e295fb4867..ec2cd782b1 100644 --- a/src/core/lib/iomgr/endpoint_pair_posix.c +++ b/src/core/lib/iomgr/endpoint_pair_posix.c @@ -31,9 +31,9 @@ * */ -#include <grpc/support/port_platform.h> +#include "src/core/lib/iomgr/port.h" -#ifdef GPR_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET #include "src/core/lib/iomgr/endpoint_pair.h" #include "src/core/lib/iomgr/socket_utils_posix.h" diff --git a/src/core/lib/security/credentials/google_default/credentials_windows.c b/src/core/lib/iomgr/endpoint_pair_uv.c index 208b8fd9ad..7941e20388 100644 --- a/src/core/lib/security/credentials/google_default/credentials_windows.c +++ b/src/core/lib/iomgr/endpoint_pair_uv.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -31,31 +31,23 @@ * */ -#include <grpc/support/port_platform.h> +#include "src/core/lib/iomgr/port.h" -#ifdef GPR_WINDOWS +#ifdef GRPC_UV -#include "src/core/lib/security/credentials/google_default/google_default_credentials.h" +#include <stdlib.h> -#include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include <grpc/support/string_util.h> -#include "src/core/lib/support/env.h" -#include "src/core/lib/support/string.h" +#include "src/core/lib/iomgr/endpoint_pair.h" -char *grpc_get_well_known_google_credentials_file_path_impl(void) { - char *result = NULL; - char *appdata_path = gpr_getenv("APPDATA"); - if (appdata_path == NULL) { - gpr_log(GPR_ERROR, "Could not get APPDATA environment variable."); - return NULL; - } - gpr_asprintf(&result, "%s/%s/%s", appdata_path, - GRPC_GOOGLE_CLOUD_SDK_CONFIG_DIRECTORY, - GRPC_GOOGLE_WELL_KNOWN_CREDENTIALS_FILE); - gpr_free(appdata_path); - return result; +grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, + size_t read_slice_size) { + grpc_endpoint_pair endpoint_pair; + // TODO(mlumish): implement this properly under libuv + GPR_ASSERT(false && + "grpc_iomgr_create_endpoint_pair is not suppoted with libuv"); + return endpoint_pair; } -#endif /* GPR_WINDOWS */ +#endif /* GRPC_UV */ diff --git a/src/core/lib/iomgr/endpoint_pair_windows.c b/src/core/lib/iomgr/endpoint_pair_windows.c index 582704e267..5c78c95492 100644 --- a/src/core/lib/iomgr/endpoint_pair_windows.c +++ b/src/core/lib/iomgr/endpoint_pair_windows.c @@ -31,9 +31,9 @@ * */ -#include <grpc/support/port_platform.h> +#include "src/core/lib/iomgr/port.h" -#ifdef GPR_WINSOCK_SOCKET +#ifdef GRPC_WINSOCK_SOCKET #include "src/core/lib/iomgr/endpoint_pair.h" #include "src/core/lib/iomgr/sockaddr_utils.h" diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index e5909d9380..8381f4a63a 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -32,10 +32,10 @@ */ #include <grpc/grpc_posix.h> -#include <grpc/support/port_platform.h> +#include "src/core/lib/iomgr/port.h" /* This polling engine is only relevant on linux kernels supporting epoll() */ -#ifdef GPR_LINUX_EPOLL +#ifdef GRPC_LINUX_EPOLL #include "src/core/lib/iomgr/ev_epoll_linux.h" @@ -2049,13 +2049,13 @@ const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return &vtable; } -#else /* defined(GPR_LINUX_EPOLL) */ -#if defined(GPR_POSIX_SOCKET) +#else /* defined(GRPC_LINUX_EPOLL) */ +#if defined(GRPC_POSIX_SOCKET) #include "src/core/lib/iomgr/ev_posix.h" -/* If GPR_LINUX_EPOLL is not defined, it means epoll is not available. Return +/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return * NULL */ const grpc_event_engine_vtable *grpc_init_epoll_linux(void) { return NULL; } -#endif /* defined(GPR_POSIX_SOCKET) */ +#endif /* defined(GRPC_POSIX_SOCKET) */ void grpc_use_signal(int signum) {} -#endif /* !defined(GPR_LINUX_EPOLL) */ +#endif /* !defined(GRPC_LINUX_EPOLL) */ diff --git a/src/core/lib/iomgr/ev_epoll_linux.h b/src/core/lib/iomgr/ev_epoll_linux.h index 7a494aba19..8fc3ff59a3 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.h +++ b/src/core/lib/iomgr/ev_epoll_linux.h @@ -35,13 +35,14 @@ #define GRPC_CORE_LIB_IOMGR_EV_EPOLL_LINUX_H #include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/port.h" const grpc_event_engine_vtable *grpc_init_epoll_linux(void); -#ifdef GPR_LINUX_EPOLL +#ifdef GRPC_LINUX_EPOLL void *grpc_fd_get_polling_island(grpc_fd *fd); void *grpc_pollset_get_polling_island(grpc_pollset *ps); bool grpc_are_polling_islands_equal(void *p, void *q); -#endif /* defined(GPR_LINUX_EPOLL) */ +#endif /* defined(GRPC_LINUX_EPOLL) */ #endif /* GRPC_CORE_LIB_IOMGR_EV_EPOLL_LINUX_H */ diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c index 1829440a6e..bf51404203 100644 --- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c +++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c @@ -42,9 +42,9 @@ * - ev_epoll_posix.{h,c} */ -#include <grpc/support/port_platform.h> +#include "src/core/lib/iomgr/port.h" -#ifdef GPR_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET #include "src/core/lib/iomgr/ev_poll_and_epoll_posix.h" @@ -1338,7 +1338,7 @@ static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) { * pollset_multipoller_with_poll_posix.c */ -#ifndef GPR_LINUX_MULTIPOLL_WITH_EPOLL +#ifndef GRPC_LINUX_MULTIPOLL_WITH_EPOLL typedef struct { /* all polled fds */ @@ -1520,13 +1520,13 @@ static void poll_become_multipoller(grpc_exec_ctx *exec_ctx, } } -#endif /* !GPR_LINUX_MULTIPOLL_WITH_EPOLL */ +#endif /* !GRPC_LINUX_MULTIPOLL_WITH_EPOLL */ /******************************************************************************* * pollset_multipoller_with_epoll_posix.c */ -#ifdef GPR_LINUX_MULTIPOLL_WITH_EPOLL +#ifdef GRPC_LINUX_MULTIPOLL_WITH_EPOLL #include <errno.h> #include <poll.h> @@ -1839,11 +1839,11 @@ static void epoll_become_multipoller(grpc_exec_ctx *exec_ctx, } } -#else /* GPR_LINUX_MULTIPOLL_WITH_EPOLL */ +#else /* GRPC_LINUX_MULTIPOLL_WITH_EPOLL */ static void remove_fd_from_all_epoll_sets(int fd) {} -#endif /* GPR_LINUX_MULTIPOLL_WITH_EPOLL */ +#endif /* GRPC_LINUX_MULTIPOLL_WITH_EPOLL */ /******************************************************************************* * pollset_set_posix.c @@ -2063,7 +2063,7 @@ static const grpc_event_engine_vtable vtable = { }; const grpc_event_engine_vtable *grpc_init_poll_and_epoll_posix(void) { -#ifdef GPR_LINUX_MULTIPOLL_WITH_EPOLL +#ifdef GRPC_LINUX_MULTIPOLL_WITH_EPOLL platform_become_multipoller = epoll_become_multipoller; #else platform_become_multipoller = poll_become_multipoller; diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 27e966c18c..e1d620cfff 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -31,9 +31,9 @@ * */ -#include <grpc/support/port_platform.h> +#include "src/core/lib/iomgr/port.h" -#ifdef GPR_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET #include "src/core/lib/iomgr/ev_poll_posix.h" diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index 9857b0bce9..ef36ba89b2 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -31,9 +31,9 @@ * */ -#include <grpc/support/port_platform.h> +#include "src/core/lib/iomgr/port.h" -#ifdef GPR_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET #include "src/core/lib/iomgr/ev_posix.h" @@ -282,4 +282,4 @@ void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, g_event_engine->workqueue_enqueue(exec_ctx, workqueue, closure, error); } -#endif // GPR_POSIX_SOCKET +#endif // GRPC_POSIX_SOCKET diff --git a/src/core/lib/iomgr/iocp_windows.c b/src/core/lib/iomgr/iocp_windows.c index 2532e52e48..60ebe43676 100644 --- a/src/core/lib/iomgr/iocp_windows.c +++ b/src/core/lib/iomgr/iocp_windows.c @@ -31,9 +31,9 @@ * */ -#include <grpc/support/port_platform.h> +#include "src/core/lib/iomgr/port.h" -#ifdef GPR_WINSOCK_SOCKET +#ifdef GRPC_WINSOCK_SOCKET #include <winsock2.h> @@ -166,4 +166,4 @@ void grpc_iocp_add_socket(grpc_winsocket *socket) { GPR_ASSERT(ret == g_iocp); } -#endif /* GPR_WINSOCK_SOCKET */ +#endif /* GRPC_WINSOCK_SOCKET */ diff --git a/src/core/lib/iomgr/iomgr.h b/src/core/lib/iomgr/iomgr.h index 6c82de78ac..c1cfaf302e 100644 --- a/src/core/lib/iomgr/iomgr.h +++ b/src/core/lib/iomgr/iomgr.h @@ -34,6 +34,8 @@ #ifndef GRPC_CORE_LIB_IOMGR_IOMGR_H #define GRPC_CORE_LIB_IOMGR_IOMGR_H +#include "src/core/lib/iomgr/port.h" + /** Initializes the iomgr. */ void grpc_iomgr_init(void); diff --git a/src/core/lib/iomgr/iomgr_posix.c b/src/core/lib/iomgr/iomgr_posix.c index cede97f4c6..f5ee0c9ee4 100644 --- a/src/core/lib/iomgr/iomgr_posix.c +++ b/src/core/lib/iomgr/iomgr_posix.c @@ -31,9 +31,9 @@ * */ -#include <grpc/support/port_platform.h> +#include "src/core/lib/iomgr/port.h" -#ifdef GPR_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET #include "src/core/lib/debug/trace.h" #include "src/core/lib/iomgr/ev_posix.h" diff --git a/src/core/lib/iomgr/iomgr_uv.c b/src/core/lib/iomgr/iomgr_uv.c new file mode 100644 index 0000000000..96516ff167 --- /dev/null +++ b/src/core/lib/iomgr/iomgr_uv.c @@ -0,0 +1,49 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_UV + +#include "src/core/lib/debug/trace.h" +#include "src/core/lib/iomgr/pollset_uv.h" +#include "src/core/lib/iomgr/tcp_uv.h" + +void grpc_iomgr_platform_init(void) { + grpc_pollset_global_init(); + grpc_register_tracer("tcp", &grpc_tcp_trace); +} +void grpc_iomgr_platform_flush(void) {} +void grpc_iomgr_platform_shutdown(void) { grpc_pollset_global_shutdown(); } + +#endif /* GRPC_UV */ diff --git a/src/core/lib/iomgr/iomgr_windows.c b/src/core/lib/iomgr/iomgr_windows.c index 7653f6e635..b659264ede 100644 --- a/src/core/lib/iomgr/iomgr_windows.c +++ b/src/core/lib/iomgr/iomgr_windows.c @@ -31,9 +31,9 @@ * */ -#include <grpc/support/port_platform.h> +#include "src/core/lib/iomgr/port.h" -#ifdef GPR_WINSOCK_SOCKET +#ifdef GRPC_WINSOCK_SOCKET #include "src/core/lib/iomgr/sockaddr_windows.h" diff --git a/src/core/lib/iomgr/pollset_set_uv.c b/src/core/lib/iomgr/pollset_set_uv.c new file mode 100644 index 0000000000..e5ef8b29e0 --- /dev/null +++ b/src/core/lib/iomgr/pollset_set_uv.c @@ -0,0 +1,62 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_UV + +#include "src/core/lib/iomgr/pollset_set.h" + +grpc_pollset_set* grpc_pollset_set_create(void) { + return (grpc_pollset_set*)((intptr_t)0xdeafbeef); +} + +void grpc_pollset_set_destroy(grpc_pollset_set* pollset_set) {} + +void grpc_pollset_set_add_pollset(grpc_exec_ctx* exec_ctx, + grpc_pollset_set* pollset_set, + grpc_pollset* pollset) {} + +void grpc_pollset_set_del_pollset(grpc_exec_ctx* exec_ctx, + grpc_pollset_set* pollset_set, + grpc_pollset* pollset) {} + +void grpc_pollset_set_add_pollset_set(grpc_exec_ctx* exec_ctx, + grpc_pollset_set* bag, + grpc_pollset_set* item) {} + +void grpc_pollset_set_del_pollset_set(grpc_exec_ctx* exec_ctx, + grpc_pollset_set* bag, + grpc_pollset_set* item) {} + +#endif /* GRPC_UV */ diff --git a/src/core/lib/iomgr/pollset_set_windows.c b/src/core/lib/iomgr/pollset_set_windows.c index a35a9766fc..645650db9b 100644 --- a/src/core/lib/iomgr/pollset_set_windows.c +++ b/src/core/lib/iomgr/pollset_set_windows.c @@ -31,10 +31,10 @@ * */ -#include <grpc/support/port_platform.h> #include <stdint.h> +#include "src/core/lib/iomgr/port.h" -#ifdef GPR_WINSOCK_SOCKET +#ifdef GRPC_WINSOCK_SOCKET #include "src/core/lib/iomgr/pollset_set_windows.h" @@ -60,4 +60,4 @@ void grpc_pollset_set_del_pollset_set(grpc_exec_ctx* exec_ctx, grpc_pollset_set* bag, grpc_pollset_set* item) {} -#endif /* GPR_WINSOCK_SOCKET */ +#endif /* GRPC_WINSOCK_SOCKET */ diff --git a/src/core/lib/iomgr/pollset_uv.c b/src/core/lib/iomgr/pollset_uv.c new file mode 100644 index 0000000000..3a74b842b6 --- /dev/null +++ b/src/core/lib/iomgr/pollset_uv.c @@ -0,0 +1,142 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_UV + +#include <uv.h> + +#include <string.h> + +#include <grpc/support/log.h> +#include <grpc/support/sync.h> + +#include "src/core/lib/iomgr/pollset.h" +#include "src/core/lib/iomgr/pollset_uv.h" + +struct grpc_pollset { + uv_timer_t timer; + int shutting_down; +}; + +/* Indicates that grpc_pollset_work should run an iteration of the UV loop + before running callbacks. This defaults to 1, and should be disabled if + grpc_pollset_work will be called within the callstack of uv_run */ +int grpc_pollset_work_run_loop; + +gpr_mu grpc_polling_mu; + +size_t grpc_pollset_size() { return sizeof(grpc_pollset); } + +void grpc_pollset_global_init(void) { + gpr_mu_init(&grpc_polling_mu); + grpc_pollset_work_run_loop = 1; +} + +void grpc_pollset_global_shutdown(void) { gpr_mu_destroy(&grpc_polling_mu); } + +void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) { + *mu = &grpc_polling_mu; + memset(pollset, 0, sizeof(grpc_pollset)); + uv_timer_init(uv_default_loop(), &pollset->timer); + pollset->shutting_down = 0; +} + +static void timer_close_cb(uv_handle_t *handle) { handle->data = (void *)1; } + +void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_closure *closure) { + GPR_ASSERT(!pollset->shutting_down); + pollset->shutting_down = 1; + if (grpc_pollset_work_run_loop) { + // Drain any pending UV callbacks without blocking + uv_run(uv_default_loop(), UV_RUN_NOWAIT); + } + grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL); +} + +void grpc_pollset_destroy(grpc_pollset *pollset) { + uv_close((uv_handle_t *)&pollset->timer, timer_close_cb); + // timer.data is a boolean indicating that the timer has finished closing + pollset->timer.data = (void *)0; + if (grpc_pollset_work_run_loop) { + while (!pollset->timer.data) { + uv_run(uv_default_loop(), UV_RUN_NOWAIT); + } + } +} + +void grpc_pollset_reset(grpc_pollset *pollset) { + GPR_ASSERT(pollset->shutting_down); + pollset->shutting_down = 0; +} + +static void timer_run_cb(uv_timer_t *timer) {} + +grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker **worker_hdl, + gpr_timespec now, gpr_timespec deadline) { + uint64_t timeout; + gpr_mu_unlock(&grpc_polling_mu); + if (grpc_pollset_work_run_loop) { + if (gpr_time_cmp(deadline, now) >= 0) { + timeout = (uint64_t)gpr_time_to_millis(gpr_time_sub(deadline, now)); + } else { + timeout = 0; + } + /* We special-case timeout=0 so that we don't bother with the timer when + the loop won't block anyway */ + if (timeout > 0) { + uv_timer_start(&pollset->timer, timer_run_cb, timeout, 0); + /* Run until there is some I/O activity or the timer triggers. It doesn't + matter which happens */ + uv_run(uv_default_loop(), UV_RUN_ONCE); + uv_timer_stop(&pollset->timer); + } else { + uv_run(uv_default_loop(), UV_RUN_NOWAIT); + } + } + if (!grpc_closure_list_empty(exec_ctx->closure_list)) { + grpc_exec_ctx_flush(exec_ctx); + } + gpr_mu_lock(&grpc_polling_mu); + return GRPC_ERROR_NONE; +} + +grpc_error *grpc_pollset_kick(grpc_pollset *pollset, + grpc_pollset_worker *specific_worker) { + return GRPC_ERROR_NONE; +} + +#endif /* GRPC_UV */ diff --git a/src/core/lib/iomgr/pollset_uv.h b/src/core/lib/iomgr/pollset_uv.h new file mode 100644 index 0000000000..0715eb4295 --- /dev/null +++ b/src/core/lib/iomgr/pollset_uv.h @@ -0,0 +1,42 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_POLLSET_UV_H +#define GRPC_CORE_LIB_IOMGR_POLLSET_UV_H + +extern int grpc_pollset_work_run_loop; + +void grpc_pollset_global_init(void); +void grpc_pollset_global_shutdown(void); + +#endif /* GRPC_CORE_LIB_IOMGR_POLLSET_UV_H */ diff --git a/src/core/lib/iomgr/pollset_windows.c b/src/core/lib/iomgr/pollset_windows.c index 626dd784b3..5540303e49 100644 --- a/src/core/lib/iomgr/pollset_windows.c +++ b/src/core/lib/iomgr/pollset_windows.c @@ -31,9 +31,9 @@ * */ -#include <grpc/support/port_platform.h> +#include "src/core/lib/iomgr/port.h" -#ifdef GPR_WINSOCK_SOCKET +#ifdef GRPC_WINSOCK_SOCKET #include <grpc/support/log.h> #include <grpc/support/thd.h> @@ -241,4 +241,4 @@ grpc_error *grpc_pollset_kick(grpc_pollset *p, void grpc_kick_poller(void) { grpc_iocp_kick(); } -#endif /* GPR_WINSOCK_SOCKET */ +#endif /* GRPC_WINSOCK_SOCKET */ diff --git a/src/core/lib/iomgr/port.h b/src/core/lib/iomgr/port.h new file mode 100644 index 0000000000..c0bb3b5a23 --- /dev/null +++ b/src/core/lib/iomgr/port.h @@ -0,0 +1,131 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/support/port_platform.h> + +#ifndef GRPC_CORE_LIB_IOMGR_PORT_H +#define GRPC_CORE_LIB_IOMGR_PORT_H + +#if defined(GRPC_UV) +// Do nothing +#elif defined(GPR_MANYLINUX1) +#define GRPC_HAVE_IPV6_RECVPKTINFO 1 +#define GRPC_HAVE_IP_PKTINFO 1 +#define GRPC_HAVE_MSG_NOSIGNAL 1 +#define GRPC_HAVE_UNIX_SOCKET 1 +#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1 +#define GRPC_POSIX_SOCKET 1 +#define GRPC_POSIX_SOCKETADDR 1 +#define GRPC_POSIX_SOCKETUTILS 1 +#define GRPC_POSIX_WAKEUP_FD 1 +#define GRPC_TIMER_USE_GENERIC 1 +#elif defined(GPR_WINDOWS) +#define GRPC_TIMER_USE_GENERIC 1 +#define GRPC_WINSOCK_SOCKET 1 +#define GRPC_WINDOWS_SOCKETUTILS 1 +#elif defined(GPR_ANDROID) +#define GRPC_HAVE_IPV6_RECVPKTINFO 1 +#define GRPC_HAVE_IP_PKTINFO 1 +#define GRPC_HAVE_MSG_NOSIGNAL 1 +#define GRPC_HAVE_UNIX_SOCKET 1 +#define GRPC_LINUX_EVENTFD 1 +#define GRPC_POSIX_SOCKET 1 +#define GRPC_POSIX_SOCKETADDR 1 +#define GRPC_POSIX_SOCKETUTILS 1 +#define GRPC_POSIX_WAKEUP_FD 1 +#define GRPC_TIMER_USE_GENERIC 1 +#elif defined(GPR_LINUX) +#define GRPC_HAVE_IPV6_RECVPKTINFO 1 +#define GRPC_HAVE_IP_PKTINFO 1 +#define GRPC_HAVE_MSG_NOSIGNAL 1 +#define GRPC_HAVE_UNIX_SOCKET 1 +#define GRPC_LINUX_MULTIPOLL_WITH_EPOLL 1 +#define GRPC_POSIX_SOCKET 1 +#define GRPC_POSIX_SOCKETADDR 1 +#define GRPC_POSIX_WAKEUP_FD 1 +#define GRPC_TIMER_USE_GENERIC 1 +#ifdef __GLIBC_PREREQ +#if __GLIBC_PREREQ(2, 9) +#define GRPC_LINUX_EPOLL 1 +#define GRPC_LINUX_EVENTFD 1 +#endif +#if __GLIBC_PREREQ(2, 10) +#define GRPC_LINUX_SOCKETUTILS 1 +#endif +#endif +#ifndef GRPC_LINUX_EVENTFD +#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1 +#endif +#ifndef GRPC_LINUX_SOCKETUTILS +#define GRPC_POSIX_SOCKETUTILS +#endif +#elif defined(GPR_APPLE) +#define GRPC_HAVE_IP_PKTINFO 1 +#define GRPC_HAVE_SO_NOSIGPIPE 1 +#define GRPC_HAVE_UNIX_SOCKET 1 +#define GRPC_MSG_IOVLEN_TYPE int +#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1 +#define GRPC_POSIX_SOCKET 1 +#define GRPC_POSIX_SOCKETADDR 1 +#define GRPC_POSIX_SOCKETUTILS 1 +#define GRPC_POSIX_WAKEUP_FD 1 +#define GRPC_TIMER_USE_GENERIC 1 +#elif defined(GPR_FREEBSD) +#define GRPC_HAVE_IPV6_RECVPKTINFO 1 +#define GRPC_HAVE_IP_PKTINFO 1 +#define GRPC_HAVE_SO_NOSIGPIPE 1 +#define GRPC_HAVE_UNIX_SOCKET 1 +#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1 +#define GRPC_POSIX_SOCKET 1 +#define GRPC_POSIX_SOCKETADDR 1 +#define GRPC_POSIX_SOCKETUTILS 1 +#define GRPC_POSIX_WAKEUP_FD 1 +#define GRPC_TIMER_USE_GENERIC 1 +#elif defined(GPR_NACL) +#define GRPC_POSIX_NO_SPECIAL_WAKEUP_FD 1 +#define GRPC_POSIX_SOCKET 1 +#define GRPC_POSIX_SOCKETADDR 1 +#define GRPC_POSIX_SOCKETUTILS 1 +#define GRPC_POSIX_WAKEUP_FD 1 +#define GRPC_TIMER_USE_GENERIC 1 +#elif !defined(GPR_NO_AUTODETECT_PLATFORM) +#error "Platform not recognized" +#endif + +#if defined(GRPC_POSIX_SOCKET) + defined(GRPC_WINSOCK_SOCKET) + \ + defined(GRPC_CUSTOM_SOCKET) + defined(GRPC_UV) != \ + 1 +#error Must define exactly one of GRPC_POSIX_SOCKET, GRPC_WINSOCK_SOCKET, GPR_CUSTOM_SOCKET +#endif + +#endif /* GRPC_CORE_LIB_IOMGR_PORT_H */ diff --git a/src/core/lib/iomgr/resolve_address.h b/src/core/lib/iomgr/resolve_address.h index ddbe375755..275924448a 100644 --- a/src/core/lib/iomgr/resolve_address.h +++ b/src/core/lib/iomgr/resolve_address.h @@ -36,7 +36,6 @@ #include <stddef.h> #include "src/core/lib/iomgr/exec_ctx.h" -#include "src/core/lib/iomgr/iomgr.h" #define GRPC_MAX_SOCKADDR_SIZE 128 diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c index 4e9f978584..de791b2b67 100644 --- a/src/core/lib/iomgr/resolve_address_posix.c +++ b/src/core/lib/iomgr/resolve_address_posix.c @@ -31,12 +31,13 @@ * */ -#include <grpc/support/port_platform.h> -#ifdef GPR_POSIX_SOCKET +#include "src/core/lib/iomgr/port.h" +#ifdef GRPC_POSIX_SOCKET -#include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/sockaddr.h" +#include "src/core/lib/iomgr/resolve_address.h" + #include <string.h> #include <sys/types.h> @@ -49,7 +50,6 @@ #include <grpc/support/useful.h> #include "src/core/lib/iomgr/executor.h" #include "src/core/lib/iomgr/iomgr_internal.h" -#include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" #include "src/core/lib/support/block_annotate.h" #include "src/core/lib/support/string.h" diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c new file mode 100644 index 0000000000..b8295acfa1 --- /dev/null +++ b/src/core/lib/iomgr/resolve_address_uv.c @@ -0,0 +1,231 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/port.h" +#ifdef GRPC_UV + +#include <uv.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/host_port.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> + +#include "src/core/lib/iomgr/closure.h" +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/resolve_address.h" +#include "src/core/lib/iomgr/sockaddr.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" + +#include <string.h> + +typedef struct request { + grpc_closure *on_done; + grpc_resolved_addresses **addresses; + struct addrinfo *hints; +} request; + +static grpc_error *handle_addrinfo_result(int status, struct addrinfo *result, + grpc_resolved_addresses **addresses) { + struct addrinfo *resp; + size_t i; + if (status != 0) { + grpc_error *error; + *addresses = NULL; + error = GRPC_ERROR_CREATE("getaddrinfo failed"); + error = + grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status)); + return error; + } + (*addresses) = gpr_malloc(sizeof(grpc_resolved_addresses)); + (*addresses)->naddrs = 0; + for (resp = result; resp != NULL; resp = resp->ai_next) { + (*addresses)->naddrs++; + } + (*addresses)->addrs = + gpr_malloc(sizeof(grpc_resolved_address) * (*addresses)->naddrs); + i = 0; + for (resp = result; resp != NULL; resp = resp->ai_next) { + memcpy(&(*addresses)->addrs[i].addr, resp->ai_addr, resp->ai_addrlen); + (*addresses)->addrs[i].len = resp->ai_addrlen; + i++; + } + + { + for (i = 0; i < (*addresses)->naddrs; i++) { + char *buf; + grpc_sockaddr_to_string(&buf, &(*addresses)->addrs[i], 0); + gpr_free(buf); + } + } + return GRPC_ERROR_NONE; +} + +static void getaddrinfo_callback(uv_getaddrinfo_t *req, int status, + struct addrinfo *res) { + request *r = (request *)req->data; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_error *error; + error = handle_addrinfo_result(status, res, r->addresses); + grpc_exec_ctx_sched(&exec_ctx, r->on_done, error, NULL); + grpc_exec_ctx_finish(&exec_ctx); + + gpr_free(r->hints); + gpr_free(r); + gpr_free(req); + uv_freeaddrinfo(res); +} + +static grpc_error *try_split_host_port(const char *name, + const char *default_port, char **host, + char **port) { + /* parse name, splitting it into host and port parts */ + grpc_error *error; + gpr_split_host_port(name, host, port); + if (host == NULL) { + char *msg; + gpr_asprintf(&msg, "unparseable host:port: '%s'", name); + error = GRPC_ERROR_CREATE(msg); + gpr_free(msg); + return error; + } + if (port == NULL) { + if (default_port == NULL) { + char *msg; + gpr_asprintf(&msg, "no port in name '%s'", name); + error = GRPC_ERROR_CREATE(msg); + gpr_free(msg); + return error; + } + *port = gpr_strdup(default_port); + } + return GRPC_ERROR_NONE; +} + +static grpc_error *blocking_resolve_address_impl( + const char *name, const char *default_port, + grpc_resolved_addresses **addresses) { + char *host; + char *port; + struct addrinfo hints; + uv_getaddrinfo_t req; + int s; + grpc_error *err; + + req.addrinfo = NULL; + + err = try_split_host_port(name, default_port, &host, &port); + if (err != GRPC_ERROR_NONE) { + goto done; + } + + /* Call getaddrinfo */ + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; /* ipv4 or ipv6 */ + hints.ai_socktype = SOCK_STREAM; /* stream socket */ + hints.ai_flags = AI_PASSIVE; /* for wildcard IP address */ + + s = uv_getaddrinfo(uv_default_loop(), &req, NULL, host, port, &hints); + err = handle_addrinfo_result(s, req.addrinfo, addresses); + +done: + gpr_free(host); + gpr_free(port); + if (req.addrinfo) { + uv_freeaddrinfo(req.addrinfo); + } + return err; +} + +grpc_error *(*grpc_blocking_resolve_address)( + const char *name, const char *default_port, + grpc_resolved_addresses **addresses) = blocking_resolve_address_impl; + +void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) { + if (addrs != NULL) { + gpr_free(addrs->addrs); + } + gpr_free(addrs); +} + +static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, + const char *default_port, + grpc_closure *on_done, + grpc_resolved_addresses **addrs) { + uv_getaddrinfo_t *req; + request *r; + struct addrinfo *hints; + char *host; + char *port; + grpc_error *err; + int s; + err = try_split_host_port(name, default_port, &host, &port); + if (err != GRPC_ERROR_NONE) { + grpc_exec_ctx_sched(exec_ctx, on_done, err, NULL); + return; + } + r = gpr_malloc(sizeof(request)); + r->on_done = on_done; + r->addresses = addrs; + req = gpr_malloc(sizeof(uv_getaddrinfo_t)); + req->data = r; + + /* Call getaddrinfo */ + hints = gpr_malloc(sizeof(struct addrinfo)); + memset(hints, 0, sizeof(struct addrinfo)); + hints->ai_family = AF_UNSPEC; /* ipv4 or ipv6 */ + hints->ai_socktype = SOCK_STREAM; /* stream socket */ + hints->ai_flags = AI_PASSIVE; /* for wildcard IP address */ + r->hints = hints; + + s = uv_getaddrinfo(uv_default_loop(), req, getaddrinfo_callback, host, port, + hints); + + if (s != 0) { + *addrs = NULL; + err = GRPC_ERROR_CREATE("getaddrinfo failed"); + err = grpc_error_set_str(err, GRPC_ERROR_STR_OS_ERROR, uv_strerror(s)); + grpc_exec_ctx_sched(exec_ctx, on_done, err, NULL); + gpr_free(r); + gpr_free(req); + gpr_free(hints); + } +} + +void (*grpc_resolve_address)(grpc_exec_ctx *exec_ctx, const char *name, + const char *default_port, grpc_closure *on_done, + grpc_resolved_addresses **addrs) = + resolve_address_impl; + +#endif /* GRPC_UV */ diff --git a/src/core/lib/iomgr/resolve_address_windows.c b/src/core/lib/iomgr/resolve_address_windows.c index 2af8af82dc..e139293c03 100644 --- a/src/core/lib/iomgr/resolve_address_windows.c +++ b/src/core/lib/iomgr/resolve_address_windows.c @@ -31,12 +31,13 @@ * */ -#include <grpc/support/port_platform.h> -#ifdef GPR_WINSOCK_SOCKET +#include "src/core/lib/iomgr/port.h" +#ifdef GRPC_WINSOCK_SOCKET -#include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/sockaddr.h" +#include "src/core/lib/iomgr/resolve_address.h" + #include <string.h> #include <sys/types.h> @@ -124,8 +125,7 @@ static grpc_error *blocking_resolve_address_impl( { for (i = 0; i < (*addresses)->naddrs; i++) { char *buf; - grpc_sockaddr_to_string( - &buf, (struct sockaddr *)&(*addresses)->addrs[i].addr, 0); + grpc_sockaddr_to_string(&buf, &(*addresses)->addrs[i], 0); gpr_free(buf); } } diff --git a/src/core/lib/iomgr/sockaddr.h b/src/core/lib/iomgr/sockaddr.h index 5563d0b8a6..52b504390d 100644 --- a/src/core/lib/iomgr/sockaddr.h +++ b/src/core/lib/iomgr/sockaddr.h @@ -31,16 +31,24 @@ * */ +/* This header transitively includes other headers that care about include + * order, so it should be included first. As a consequence, it should not be + * included in any other header. */ + #ifndef GRPC_CORE_LIB_IOMGR_SOCKADDR_H #define GRPC_CORE_LIB_IOMGR_SOCKADDR_H -#include <grpc/support/port_platform.h> +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_UV +#include <uv.h> +#endif #ifdef GPR_WINDOWS #include "src/core/lib/iomgr/sockaddr_windows.h" #endif -#ifdef GPR_POSIX_SOCKETADDR +#ifdef GRPC_POSIX_SOCKETADDR #include "src/core/lib/iomgr/sockaddr_posix.h" #endif diff --git a/src/core/lib/iomgr/sockaddr_utils.c b/src/core/lib/iomgr/sockaddr_utils.c index 127d95c618..44bc2f968b 100644 --- a/src/core/lib/iomgr/sockaddr_utils.c +++ b/src/core/lib/iomgr/sockaddr_utils.c @@ -42,26 +42,32 @@ #include <grpc/support/port_platform.h> #include <grpc/support/string_util.h> +#include "src/core/lib/iomgr/sockaddr.h" +#include "src/core/lib/iomgr/socket_utils.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" #include "src/core/lib/support/string.h" static const uint8_t kV4MappedPrefix[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff}; -int grpc_sockaddr_is_v4mapped(const struct sockaddr *addr, - struct sockaddr_in *addr4_out) { - GPR_ASSERT(addr != (struct sockaddr *)addr4_out); +int grpc_sockaddr_is_v4mapped(const grpc_resolved_address *resolved_addr, + grpc_resolved_address *resolved_addr4_out) { + GPR_ASSERT(resolved_addr != resolved_addr4_out); + const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr; + struct sockaddr_in *addr4_out = + (struct sockaddr_in *)resolved_addr4_out->addr; if (addr->sa_family == AF_INET6) { const struct sockaddr_in6 *addr6 = (const struct sockaddr_in6 *)addr; if (memcmp(addr6->sin6_addr.s6_addr, kV4MappedPrefix, sizeof(kV4MappedPrefix)) == 0) { - if (addr4_out != NULL) { + if (resolved_addr4_out != NULL) { /* Normalize ::ffff:0.0.0.0/96 to IPv4. */ - memset(addr4_out, 0, sizeof(*addr4_out)); + memset(resolved_addr4_out, 0, sizeof(*resolved_addr4_out)); addr4_out->sin_family = AF_INET; /* s6_addr32 would be nice, but it's non-standard. */ memcpy(&addr4_out->sin_addr, &addr6->sin6_addr.s6_addr[12], 4); addr4_out->sin_port = addr6->sin6_port; + resolved_addr4_out->len = sizeof(struct sockaddr_in); } return 1; } @@ -69,26 +75,33 @@ int grpc_sockaddr_is_v4mapped(const struct sockaddr *addr, return 0; } -int grpc_sockaddr_to_v4mapped(const struct sockaddr *addr, - struct sockaddr_in6 *addr6_out) { - GPR_ASSERT(addr != (struct sockaddr *)addr6_out); +int grpc_sockaddr_to_v4mapped(const grpc_resolved_address *resolved_addr, + grpc_resolved_address *resolved_addr6_out) { + GPR_ASSERT(resolved_addr != resolved_addr6_out); + const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr; + struct sockaddr_in6 *addr6_out = + (struct sockaddr_in6 *)resolved_addr6_out->addr; if (addr->sa_family == AF_INET) { const struct sockaddr_in *addr4 = (const struct sockaddr_in *)addr; - memset(addr6_out, 0, sizeof(*addr6_out)); + memset(resolved_addr6_out, 0, sizeof(*resolved_addr6_out)); addr6_out->sin6_family = AF_INET6; memcpy(&addr6_out->sin6_addr.s6_addr[0], kV4MappedPrefix, 12); memcpy(&addr6_out->sin6_addr.s6_addr[12], &addr4->sin_addr, 4); addr6_out->sin6_port = addr4->sin_port; + resolved_addr6_out->len = sizeof(struct sockaddr_in6); return 1; } return 0; } -int grpc_sockaddr_is_wildcard(const struct sockaddr *addr, int *port_out) { - struct sockaddr_in addr4_normalized; - if (grpc_sockaddr_is_v4mapped(addr, &addr4_normalized)) { - addr = (struct sockaddr *)&addr4_normalized; +int grpc_sockaddr_is_wildcard(const grpc_resolved_address *resolved_addr, + int *port_out) { + const struct sockaddr *addr; + grpc_resolved_address addr4_normalized; + if (grpc_sockaddr_is_v4mapped(resolved_addr, &addr4_normalized)) { + resolved_addr = &addr4_normalized; } + addr = (const struct sockaddr *)resolved_addr->addr; if (addr->sa_family == AF_INET) { /* Check for 0.0.0.0 */ const struct sockaddr_in *addr4 = (const struct sockaddr_in *)addr; @@ -113,39 +126,49 @@ int grpc_sockaddr_is_wildcard(const struct sockaddr *addr, int *port_out) { } } -void grpc_sockaddr_make_wildcards(int port, struct sockaddr_in *wild4_out, - struct sockaddr_in6 *wild6_out) { +void grpc_sockaddr_make_wildcards(int port, grpc_resolved_address *wild4_out, + grpc_resolved_address *wild6_out) { grpc_sockaddr_make_wildcard4(port, wild4_out); grpc_sockaddr_make_wildcard6(port, wild6_out); } -void grpc_sockaddr_make_wildcard4(int port, struct sockaddr_in *wild_out) { +void grpc_sockaddr_make_wildcard4(int port, + grpc_resolved_address *resolved_wild_out) { + struct sockaddr_in *wild_out = (struct sockaddr_in *)resolved_wild_out->addr; GPR_ASSERT(port >= 0 && port < 65536); - memset(wild_out, 0, sizeof(*wild_out)); + memset(resolved_wild_out, 0, sizeof(*resolved_wild_out)); wild_out->sin_family = AF_INET; wild_out->sin_port = htons((uint16_t)port); + resolved_wild_out->len = sizeof(struct sockaddr_in); } -void grpc_sockaddr_make_wildcard6(int port, struct sockaddr_in6 *wild_out) { +void grpc_sockaddr_make_wildcard6(int port, + grpc_resolved_address *resolved_wild_out) { + struct sockaddr_in6 *wild_out = + (struct sockaddr_in6 *)resolved_wild_out->addr; GPR_ASSERT(port >= 0 && port < 65536); - memset(wild_out, 0, sizeof(*wild_out)); + memset(resolved_wild_out, 0, sizeof(*resolved_wild_out)); wild_out->sin6_family = AF_INET6; wild_out->sin6_port = htons((uint16_t)port); + resolved_wild_out->len = sizeof(struct sockaddr_in6); } -int grpc_sockaddr_to_string(char **out, const struct sockaddr *addr, +int grpc_sockaddr_to_string(char **out, + const grpc_resolved_address *resolved_addr, int normalize) { + const struct sockaddr *addr; const int save_errno = errno; - struct sockaddr_in addr_normalized; + grpc_resolved_address addr_normalized; char ntop_buf[INET6_ADDRSTRLEN]; const void *ip = NULL; int port; int ret; *out = NULL; - if (normalize && grpc_sockaddr_is_v4mapped(addr, &addr_normalized)) { - addr = (const struct sockaddr *)&addr_normalized; + if (normalize && grpc_sockaddr_is_v4mapped(resolved_addr, &addr_normalized)) { + resolved_addr = &addr_normalized; } + addr = (const struct sockaddr *)resolved_addr->addr; if (addr->sa_family == AF_INET) { const struct sockaddr_in *addr4 = (const struct sockaddr_in *)addr; ip = &addr4->sin_addr; @@ -155,10 +178,8 @@ int grpc_sockaddr_to_string(char **out, const struct sockaddr *addr, ip = &addr6->sin6_addr; port = ntohs(addr6->sin6_port); } - /* Windows inet_ntop wants a mutable ip pointer */ if (ip != NULL && - inet_ntop(addr->sa_family, (void *)ip, ntop_buf, sizeof(ntop_buf)) != - NULL) { + grpc_inet_ntop(addr->sa_family, ip, ntop_buf, sizeof(ntop_buf)) != NULL) { ret = gpr_join_host_port(out, ntop_buf, port); } else { ret = gpr_asprintf(out, "(sockaddr family=%d)", addr->sa_family); @@ -168,39 +189,43 @@ int grpc_sockaddr_to_string(char **out, const struct sockaddr *addr, return ret; } -char *grpc_sockaddr_to_uri(const struct sockaddr *addr) { +char *grpc_sockaddr_to_uri(const grpc_resolved_address *resolved_addr) { char *temp; char *result; - struct sockaddr_in addr_normalized; + grpc_resolved_address addr_normalized; + const struct sockaddr *addr; - if (grpc_sockaddr_is_v4mapped(addr, &addr_normalized)) { - addr = (const struct sockaddr *)&addr_normalized; + if (grpc_sockaddr_is_v4mapped(resolved_addr, &addr_normalized)) { + resolved_addr = &addr_normalized; } + addr = (const struct sockaddr *)resolved_addr->addr; + switch (addr->sa_family) { case AF_INET: - grpc_sockaddr_to_string(&temp, addr, 0); + grpc_sockaddr_to_string(&temp, resolved_addr, 0); gpr_asprintf(&result, "ipv4:%s", temp); gpr_free(temp); return result; case AF_INET6: - grpc_sockaddr_to_string(&temp, addr, 0); + grpc_sockaddr_to_string(&temp, resolved_addr, 0); gpr_asprintf(&result, "ipv6:%s", temp); gpr_free(temp); return result; default: - return grpc_sockaddr_to_uri_unix_if_possible(addr); + return grpc_sockaddr_to_uri_unix_if_possible(resolved_addr); } } -int grpc_sockaddr_get_port(const struct sockaddr *addr) { +int grpc_sockaddr_get_port(const grpc_resolved_address *resolved_addr) { + const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr; switch (addr->sa_family) { case AF_INET: return ntohs(((struct sockaddr_in *)addr)->sin_port); case AF_INET6: return ntohs(((struct sockaddr_in6 *)addr)->sin6_port); default: - if (grpc_is_unix_socket(addr)) { + if (grpc_is_unix_socket(resolved_addr)) { return 1; } gpr_log(GPR_ERROR, "Unknown socket family %d in grpc_sockaddr_get_port", @@ -209,7 +234,9 @@ int grpc_sockaddr_get_port(const struct sockaddr *addr) { } } -int grpc_sockaddr_set_port(const struct sockaddr *addr, int port) { +int grpc_sockaddr_set_port(const grpc_resolved_address *resolved_addr, + int port) { + const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr; switch (addr->sa_family) { case AF_INET: GPR_ASSERT(port >= 0 && port < 65536); diff --git a/src/core/lib/iomgr/sockaddr_utils.h b/src/core/lib/iomgr/sockaddr_utils.h index 9f81992e6b..5371e360c5 100644 --- a/src/core/lib/iomgr/sockaddr_utils.h +++ b/src/core/lib/iomgr/sockaddr_utils.h @@ -34,40 +34,40 @@ #ifndef GRPC_CORE_LIB_IOMGR_SOCKADDR_UTILS_H #define GRPC_CORE_LIB_IOMGR_SOCKADDR_UTILS_H -#include "src/core/lib/iomgr/sockaddr.h" +#include "src/core/lib/iomgr/resolve_address.h" /* Returns true if addr is an IPv4-mapped IPv6 address within the ::ffff:0.0.0.0/96 range, or false otherwise. If addr4_out is non-NULL, the inner IPv4 address will be copied here when returning true. */ -int grpc_sockaddr_is_v4mapped(const struct sockaddr *addr, - struct sockaddr_in *addr4_out); +int grpc_sockaddr_is_v4mapped(const grpc_resolved_address *addr, + grpc_resolved_address *addr4_out); /* If addr is an AF_INET address, writes the corresponding ::ffff:0.0.0.0/96 address to addr6_out and returns true. Otherwise returns false. */ -int grpc_sockaddr_to_v4mapped(const struct sockaddr *addr, - struct sockaddr_in6 *addr6_out); +int grpc_sockaddr_to_v4mapped(const grpc_resolved_address *addr, + grpc_resolved_address *addr6_out); /* If addr is ::, 0.0.0.0, or ::ffff:0.0.0.0, writes the port number to *port_out (if not NULL) and returns true, otherwise returns false. */ -int grpc_sockaddr_is_wildcard(const struct sockaddr *addr, int *port_out); +int grpc_sockaddr_is_wildcard(const grpc_resolved_address *addr, int *port_out); /* Writes 0.0.0.0:port and [::]:port to separate sockaddrs. */ -void grpc_sockaddr_make_wildcards(int port, struct sockaddr_in *wild4_out, - struct sockaddr_in6 *wild6_out); +void grpc_sockaddr_make_wildcards(int port, grpc_resolved_address *wild4_out, + grpc_resolved_address *wild6_out); /* Writes 0.0.0.0:port. */ -void grpc_sockaddr_make_wildcard4(int port, struct sockaddr_in *wild_out); +void grpc_sockaddr_make_wildcard4(int port, grpc_resolved_address *wild_out); /* Writes [::]:port. */ -void grpc_sockaddr_make_wildcard6(int port, struct sockaddr_in6 *wild_out); +void grpc_sockaddr_make_wildcard6(int port, grpc_resolved_address *wild_out); /* Return the IP port number of a sockaddr */ -int grpc_sockaddr_get_port(const struct sockaddr *addr); +int grpc_sockaddr_get_port(const grpc_resolved_address *addr); /* Set IP port number of a sockaddr */ -int grpc_sockaddr_set_port(const struct sockaddr *addr, int port); +int grpc_sockaddr_set_port(const grpc_resolved_address *addr, int port); /* Converts a sockaddr into a newly-allocated human-readable string. @@ -81,9 +81,9 @@ int grpc_sockaddr_set_port(const struct sockaddr *addr, int port); In the unlikely event of an error, returns -1 and sets *out to NULL. The existing value of errno is always preserved. */ -int grpc_sockaddr_to_string(char **out, const struct sockaddr *addr, +int grpc_sockaddr_to_string(char **out, const grpc_resolved_address *addr, int normalize); -char *grpc_sockaddr_to_uri(const struct sockaddr *addr); +char *grpc_sockaddr_to_uri(const grpc_resolved_address *addr); #endif /* GRPC_CORE_LIB_IOMGR_SOCKADDR_UTILS_H */ diff --git a/src/core/lib/iomgr/socket_utils.h b/src/core/lib/iomgr/socket_utils.h new file mode 100644 index 0000000000..cc3ee2e30c --- /dev/null +++ b/src/core/lib/iomgr/socket_utils.h @@ -0,0 +1,42 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_SOCKET_UTILS_H +#define GRPC_CORE_LIB_IOMGR_SOCKET_UTILS_H + +#include <stddef.h> + +/* A wrapper for inet_ntop on POSIX systems and InetNtop on Windows systems */ +const char *grpc_inet_ntop(int af, const void *src, char *dst, size_t size); + +#endif /* GRPC_CORE_LIB_IOMGR_SOCKET_UTILS_H */ diff --git a/src/core/lib/iomgr/socket_utils_common_posix.c b/src/core/lib/iomgr/socket_utils_common_posix.c index d2f6261e2a..bc28bbe316 100644 --- a/src/core/lib/iomgr/socket_utils_common_posix.c +++ b/src/core/lib/iomgr/socket_utils_common_posix.c @@ -31,10 +31,11 @@ * */ -#include <grpc/support/port_platform.h> +#include "src/core/lib/iomgr/port.h" -#ifdef GPR_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET +#include "src/core/lib/iomgr/socket_utils.h" #include "src/core/lib/iomgr/socket_utils_posix.h" #include <arpa/inet.h> @@ -78,7 +79,7 @@ grpc_error *grpc_set_socket_nonblocking(int fd, int non_blocking) { } grpc_error *grpc_set_socket_no_sigpipe_if_possible(int fd) { -#ifdef GPR_HAVE_SO_NOSIGPIPE +#ifdef GRPC_HAVE_SO_NOSIGPIPE int val = 1; int newval; socklen_t intlen = sizeof(newval); @@ -96,7 +97,7 @@ grpc_error *grpc_set_socket_no_sigpipe_if_possible(int fd) { } grpc_error *grpc_set_socket_ip_pktinfo_if_possible(int fd) { -#ifdef GPR_HAVE_IP_PKTINFO +#ifdef GRPC_HAVE_IP_PKTINFO int get_local_ip = 1; if (0 != setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &get_local_ip, sizeof(get_local_ip))) { @@ -107,7 +108,7 @@ grpc_error *grpc_set_socket_ip_pktinfo_if_possible(int fd) { } grpc_error *grpc_set_socket_ipv6_recvpktinfo_if_possible(int fd) { -#ifdef GPR_HAVE_IPV6_RECVPKTINFO +#ifdef GRPC_HAVE_IPV6_RECVPKTINFO int get_local_ip = 1; if (0 != setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &get_local_ip, sizeof(get_local_ip))) { @@ -253,7 +254,7 @@ static int set_socket_dualstack(int fd) { } } -static grpc_error *error_for_fd(int fd, const struct sockaddr *addr) { +static grpc_error *error_for_fd(int fd, const grpc_resolved_address *addr) { if (fd >= 0) return GRPC_ERROR_NONE; char *addr_str; grpc_sockaddr_to_string(&addr_str, addr, 0); @@ -263,10 +264,10 @@ static grpc_error *error_for_fd(int fd, const struct sockaddr *addr) { return err; } -grpc_error *grpc_create_dualstack_socket(const struct sockaddr *addr, int type, - int protocol, - grpc_dualstack_mode *dsmode, - int *newfd) { +grpc_error *grpc_create_dualstack_socket( + const grpc_resolved_address *resolved_addr, int type, int protocol, + grpc_dualstack_mode *dsmode, int *newfd) { + const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr; int family = addr->sa_family; if (family == AF_INET6) { if (grpc_ipv6_loopback_available()) { @@ -281,9 +282,9 @@ grpc_error *grpc_create_dualstack_socket(const struct sockaddr *addr, int type, return GRPC_ERROR_NONE; } /* If this isn't an IPv4 address, then return whatever we've got. */ - if (!grpc_sockaddr_is_v4mapped(addr, NULL)) { + if (!grpc_sockaddr_is_v4mapped(resolved_addr, NULL)) { *dsmode = GRPC_DSMODE_IPV6; - return error_for_fd(*newfd, addr); + return error_for_fd(*newfd, resolved_addr); } /* Fall back to AF_INET. */ if (*newfd >= 0) { @@ -293,7 +294,12 @@ grpc_error *grpc_create_dualstack_socket(const struct sockaddr *addr, int type, } *dsmode = family == AF_INET ? GRPC_DSMODE_IPV4 : GRPC_DSMODE_NONE; *newfd = socket(family, type, protocol); - return error_for_fd(*newfd, addr); + return error_for_fd(*newfd, resolved_addr); +} + +const char *grpc_inet_ntop(int af, const void *src, char *dst, size_t size) { + GPR_ASSERT(size <= (socklen_t)-1); + return inet_ntop(af, src, dst, (socklen_t)size); } #endif diff --git a/src/core/lib/iomgr/socket_utils_linux.c b/src/core/lib/iomgr/socket_utils_linux.c index 144e3110c8..bf6e9e4f55 100644 --- a/src/core/lib/iomgr/socket_utils_linux.c +++ b/src/core/lib/iomgr/socket_utils_linux.c @@ -31,21 +31,27 @@ * */ -#include <grpc/support/port_platform.h> +#include "src/core/lib/iomgr/port.h" -#ifdef GPR_LINUX_SOCKETUTILS +#ifdef GRPC_LINUX_SOCKETUTILS +#include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/socket_utils_posix.h" +#include <grpc/support/log.h> + #include <sys/socket.h> #include <sys/types.h> -int grpc_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, - int nonblock, int cloexec) { +int grpc_accept4(int sockfd, grpc_resolved_address *resolved_addr, int nonblock, + int cloexec) { int flags = 0; + GPR_ASSERT(sizeof(socklen_t) <= sizeof(size_t)); + GPR_ASSERT(resolved_addr->len <= (socklen_t)-1); flags |= nonblock ? SOCK_NONBLOCK : 0; flags |= cloexec ? SOCK_CLOEXEC : 0; - return accept4(sockfd, addr, addrlen, flags); + return accept4(sockfd, (struct sockaddr *)resolved_addr->addr, + (socklen_t *)&resolved_addr->len, flags); } #endif diff --git a/src/core/lib/iomgr/socket_utils_posix.c b/src/core/lib/iomgr/socket_utils_posix.c index 57ae64c103..9dea0c0cd8 100644 --- a/src/core/lib/iomgr/socket_utils_posix.c +++ b/src/core/lib/iomgr/socket_utils_posix.c @@ -31,9 +31,9 @@ * */ -#include <grpc/support/port_platform.h> +#include "src/core/lib/iomgr/port.h" -#ifdef GPR_POSIX_SOCKETUTILS +#ifdef GRPC_POSIX_SOCKETUTILS #include "src/core/lib/iomgr/socket_utils_posix.h" @@ -42,12 +42,15 @@ #include <unistd.h> #include <grpc/support/log.h> +#include "src/core/lib/iomgr/sockaddr.h" -int grpc_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, - int nonblock, int cloexec) { +int grpc_accept4(int sockfd, grpc_resolved_address *resolved_addr, int nonblock, + int cloexec) { int fd, flags; - - fd = accept(sockfd, addr, addrlen); + GPR_ASSERT(sizeof(socklen_t) <= sizeof(size_t)); + GPR_ASSERT(resolved_addr->len <= (socklen_t)-1); + fd = accept(sockfd, (struct sockaddr *)resolved_addr->addr, + (socklen_t *)&resolved_addr->len); if (fd >= 0) { if (nonblock) { flags = fcntl(fd, F_GETFL, 0); @@ -67,4 +70,4 @@ close_and_error: return -1; } -#endif /* GPR_POSIX_SOCKETUTILS */ +#endif /* GRPC_POSIX_SOCKETUTILS */ diff --git a/src/core/lib/iomgr/socket_utils_posix.h b/src/core/lib/iomgr/socket_utils_posix.h index 7bcc2219ae..175fb2b717 100644 --- a/src/core/lib/iomgr/socket_utils_posix.h +++ b/src/core/lib/iomgr/socket_utils_posix.h @@ -34,14 +34,16 @@ #ifndef GRPC_CORE_LIB_IOMGR_SOCKET_UTILS_POSIX_H #define GRPC_CORE_LIB_IOMGR_SOCKET_UTILS_POSIX_H +#include "src/core/lib/iomgr/resolve_address.h" + #include <sys/socket.h> #include <unistd.h> #include "src/core/lib/iomgr/error.h" /* a wrapper for accept or accept4 */ -int grpc_accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, - int nonblock, int cloexec); +int grpc_accept4(int sockfd, grpc_resolved_address *resolved_addr, int nonblock, + int cloexec); /* set a socket to non blocking mode */ grpc_error *grpc_set_socket_nonblocking(int fd, int non_blocking); @@ -125,8 +127,8 @@ extern int grpc_forbid_dualstack_sockets_for_testing; IPv4, so that bind() or connect() see the correct family. Also, it's important to distinguish between DUALSTACK and IPV6 when listening on the [::] wildcard address. */ -grpc_error *grpc_create_dualstack_socket(const struct sockaddr *addr, int type, - int protocol, +grpc_error *grpc_create_dualstack_socket(const grpc_resolved_address *addr, + int type, int protocol, grpc_dualstack_mode *dsmode, int *newfd); diff --git a/src/core/lib/iomgr/socket_utils_uv.c b/src/core/lib/iomgr/socket_utils_uv.c new file mode 100644 index 0000000000..741bf28969 --- /dev/null +++ b/src/core/lib/iomgr/socket_utils_uv.c @@ -0,0 +1,49 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_UV + +#include <uv.h> + +#include "src/core/lib/iomgr/socket_utils.h" + +#include <grpc/support/log.h> + +const char *grpc_inet_ntop(int af, const void *src, char *dst, size_t size) { + uv_inet_ntop(af, src, dst, size); + return dst; +} + +#endif /* GRPC_UV */ diff --git a/src/core/lib/iomgr/socket_utils_windows.c b/src/core/lib/iomgr/socket_utils_windows.c new file mode 100644 index 0000000000..628ad4a45b --- /dev/null +++ b/src/core/lib/iomgr/socket_utils_windows.c @@ -0,0 +1,48 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_WINDOWS_SOCKETUTILS + +#include "src/core/lib/iomgr/sockaddr.h" +#include "src/core/lib/iomgr/socket_utils.h" + +#include <grpc/support/log.h> + +const char *grpc_inet_ntop(int af, const void *src, char *dst, size_t size) { + /* Windows InetNtopA wants a mutable ip pointer */ + return InetNtopA(af, (void *)src, dst, size); +} + +#endif /* GRPC_WINDOWS_SOCKETUTILS */ diff --git a/src/core/lib/iomgr/socket_windows.c b/src/core/lib/iomgr/socket_windows.c index 78ef46d042..35f23300dc 100644 --- a/src/core/lib/iomgr/socket_windows.c +++ b/src/core/lib/iomgr/socket_windows.c @@ -31,9 +31,9 @@ * */ -#include <grpc/support/port_platform.h> +#include "src/core/lib/iomgr/port.h" -#ifdef GPR_WINSOCK_SOCKET +#ifdef GRPC_WINSOCK_SOCKET #include <winsock2.h> @@ -156,4 +156,4 @@ void grpc_socket_become_ready(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket, if (should_destroy) destroy(socket); } -#endif /* GPR_WINSOCK_SOCKET */ +#endif /* GRPC_WINSOCK_SOCKET */ diff --git a/src/core/lib/iomgr/tcp_client.h b/src/core/lib/iomgr/tcp_client.h index a07e0b9f0c..fe1e7f4d3e 100644 --- a/src/core/lib/iomgr/tcp_client.h +++ b/src/core/lib/iomgr/tcp_client.h @@ -37,7 +37,7 @@ #include <grpc/support/time.h> #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/pollset_set.h" -#include "src/core/lib/iomgr/sockaddr.h" +#include "src/core/lib/iomgr/resolve_address.h" /* Asynchronously connect to an address (specified as (addr, len)), and call cb with arg and the completed connection when done (or call cb with arg and @@ -47,7 +47,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_connect, grpc_endpoint **endpoint, grpc_pollset_set *interested_parties, - const struct sockaddr *addr, size_t addr_len, + const grpc_resolved_address *addr, gpr_timespec deadline); #endif /* GRPC_CORE_LIB_IOMGR_TCP_CLIENT_H */ diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index 3496b6094f..e7ae1ef695 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -31,9 +31,9 @@ * */ -#include <grpc/support/port_platform.h> +#include "src/core/lib/iomgr/port.h" -#ifdef GPR_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET #include "src/core/lib/iomgr/tcp_client.h" @@ -71,7 +71,7 @@ typedef struct { grpc_closure *closure; } async_connect; -static grpc_error *prepare_socket(const struct sockaddr *addr, int fd) { +static grpc_error *prepare_socket(const grpc_resolved_address *addr, int fd) { grpc_error *err = GRPC_ERROR_NONE; GPR_ASSERT(fd >= 0); @@ -223,14 +223,14 @@ finish: static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, - const struct sockaddr *addr, - size_t addr_len, gpr_timespec deadline) { + const grpc_resolved_address *addr, + gpr_timespec deadline) { int fd; grpc_dualstack_mode dsmode; int err; async_connect *ac; - struct sockaddr_in6 addr6_v4mapped; - struct sockaddr_in addr4_copy; + grpc_resolved_address addr6_v4mapped; + grpc_resolved_address addr4_copy; grpc_fd *fdobj; char *name; char *addr_str; @@ -240,8 +240,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, /* Use dualstack sockets where available. */ if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) { - addr = (const struct sockaddr *)&addr6_v4mapped; - addr_len = sizeof(addr6_v4mapped); + addr = &addr6_v4mapped; } error = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode, &fd); @@ -252,8 +251,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, if (dsmode == GRPC_DSMODE_IPV4) { /* If we got an AF_INET socket, map the address back to IPv4. */ GPR_ASSERT(grpc_sockaddr_is_v4mapped(addr, &addr4_copy)); - addr = (struct sockaddr *)&addr4_copy; - addr_len = sizeof(addr4_copy); + addr = &addr4_copy; } if ((error = prepare_socket(addr, fd)) != GRPC_ERROR_NONE) { grpc_exec_ctx_sched(exec_ctx, closure, error, NULL); @@ -261,8 +259,9 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, } do { - GPR_ASSERT(addr_len < ~(socklen_t)0); - err = connect(fd, addr, (socklen_t)addr_len); + GPR_ASSERT(addr->len < ~(socklen_t)0); + err = + connect(fd, (const struct sockaddr *)addr->addr, (socklen_t)addr->len); } while (err < 0 && errno == EINTR); addr_str = grpc_sockaddr_to_uri(addr); @@ -317,16 +316,16 @@ done: // overridden by api_fuzzer.c void (*grpc_tcp_client_connect_impl)( grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, - grpc_pollset_set *interested_parties, const struct sockaddr *addr, - size_t addr_len, gpr_timespec deadline) = tcp_client_connect_impl; + grpc_pollset_set *interested_parties, const grpc_resolved_address *addr, + gpr_timespec deadline) = tcp_client_connect_impl; void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, - const struct sockaddr *addr, size_t addr_len, + const grpc_resolved_address *addr, gpr_timespec deadline) { grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, addr, - addr_len, deadline); + deadline); } #endif diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c new file mode 100644 index 0000000000..6274667042 --- /dev/null +++ b/src/core/lib/iomgr/tcp_client_uv.c @@ -0,0 +1,153 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_UV + +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/iomgr/tcp_client.h" +#include "src/core/lib/iomgr/tcp_uv.h" +#include "src/core/lib/iomgr/timer.h" + +typedef struct grpc_uv_tcp_connect { + uv_connect_t connect_req; + grpc_timer alarm; + uv_tcp_t *tcp_handle; + grpc_closure *closure; + grpc_endpoint **endpoint; + int refs; + char *addr_name; +} grpc_uv_tcp_connect; + +static void uv_tcp_connect_cleanup(grpc_uv_tcp_connect *connect) { + gpr_free(connect); +} + +static void tcp_close_callback(uv_handle_t *handle) { gpr_free(handle); } + +static void uv_tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, + grpc_error *error) { + int done; + grpc_uv_tcp_connect *connect = acp; + if (error == GRPC_ERROR_NONE) { + /* error == NONE implies that the timer ran out, and wasn't cancelled. If + it was cancelled, then the handler that cancelled it also should close + the handle, if applicable */ + uv_close((uv_handle_t *)connect->tcp_handle, tcp_close_callback); + } + done = (--connect->refs == 0); + if (done) { + uv_tcp_connect_cleanup(connect); + } +} + +static void uv_tc_on_connect(uv_connect_t *req, int status) { + grpc_uv_tcp_connect *connect = req->data; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_error *error = GRPC_ERROR_NONE; + int done; + grpc_closure *closure = connect->closure; + grpc_timer_cancel(&exec_ctx, &connect->alarm); + if (status == 0) { + *connect->endpoint = + grpc_tcp_create(connect->tcp_handle, connect->addr_name); + } else { + error = GRPC_ERROR_CREATE("Failed to connect to remote host"); + error = grpc_error_set_int(error, GRPC_ERROR_INT_ERRNO, -status); + error = + grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status)); + if (status == UV_ECANCELED) { + error = grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, + "Timeout occurred"); + // This should only happen if the handle is already closed + } else { + error = grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, + uv_strerror(status)); + uv_close((uv_handle_t *)connect->tcp_handle, tcp_close_callback); + } + } + done = (--connect->refs == 0); + if (done) { + uv_tcp_connect_cleanup(connect); + } + grpc_exec_ctx_sched(&exec_ctx, closure, error, NULL); + grpc_exec_ctx_finish(&exec_ctx); +} + +static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, + grpc_closure *closure, grpc_endpoint **ep, + grpc_pollset_set *interested_parties, + const grpc_resolved_address *resolved_addr, + gpr_timespec deadline) { + grpc_uv_tcp_connect *connect; + (void)interested_parties; + connect = gpr_malloc(sizeof(grpc_uv_tcp_connect)); + memset(connect, 0, sizeof(grpc_uv_tcp_connect)); + connect->closure = closure; + connect->endpoint = ep; + connect->tcp_handle = gpr_malloc(sizeof(uv_tcp_t)); + connect->addr_name = grpc_sockaddr_to_uri(resolved_addr); + uv_tcp_init(uv_default_loop(), connect->tcp_handle); + connect->connect_req.data = connect; + // TODO(murgatroid99): figure out what the return value here means + uv_tcp_connect(&connect->connect_req, connect->tcp_handle, + (const struct sockaddr *)resolved_addr->addr, + uv_tc_on_connect); + grpc_timer_init(exec_ctx, &connect->alarm, + gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), + uv_tc_on_alarm, connect, gpr_now(GPR_CLOCK_MONOTONIC)); +} + +// overridden by api_fuzzer.c +void (*grpc_tcp_client_connect_impl)( + grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, + grpc_pollset_set *interested_parties, const grpc_resolved_address *addr, + gpr_timespec deadline) = tcp_client_connect_impl; + +void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_endpoint **ep, + grpc_pollset_set *interested_parties, + const grpc_resolved_address *addr, + gpr_timespec deadline) { + grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, addr, + deadline); +} + +#endif /* GRPC_UV */ diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c index 562cb9c6bf..fdd8c1a1f8 100644 --- a/src/core/lib/iomgr/tcp_client_windows.c +++ b/src/core/lib/iomgr/tcp_client_windows.c @@ -31,9 +31,9 @@ * */ -#include <grpc/support/port_platform.h> +#include "src/core/lib/iomgr/port.h" -#ifdef GPR_WINSOCK_SOCKET +#ifdef GRPC_WINSOCK_SOCKET #include "src/core/lib/iomgr/sockaddr_windows.h" @@ -129,13 +129,13 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done, grpc_endpoint **endpoint, grpc_pollset_set *interested_parties, - const struct sockaddr *addr, size_t addr_len, + const grpc_resolved_address *addr, gpr_timespec deadline) { SOCKET sock = INVALID_SOCKET; BOOL success; int status; - struct sockaddr_in6 addr6_v4mapped; - struct sockaddr_in6 local_address; + grpc_resolved_address addr6_v4mapped; + grpc_resolved_address local_address; async_connect *ac; grpc_winsocket *socket = NULL; LPFN_CONNECTEX ConnectEx; @@ -148,8 +148,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done, /* Use dualstack sockets where available. */ if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) { - addr = (const struct sockaddr *)&addr6_v4mapped; - addr_len = sizeof(addr6_v4mapped); + addr = &addr6_v4mapped; } sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0, @@ -178,7 +177,8 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done, grpc_sockaddr_make_wildcard6(0, &local_address); - status = bind(sock, (struct sockaddr *)&local_address, sizeof(local_address)); + status = + bind(sock, (struct sockaddr *)&local_address.addr, local_address.len); if (status != 0) { error = GRPC_WSA_ERROR(WSAGetLastError(), "bind"); goto failure; @@ -186,8 +186,8 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done, socket = grpc_winsocket_create(sock, "client"); info = &socket->write_info; - success = - ConnectEx(sock, addr, (int)addr_len, NULL, 0, NULL, &info->overlapped); + success = ConnectEx(sock, (struct sockaddr *)&addr->addr, (int)addr->len, + NULL, 0, NULL, &info->overlapped); /* It wouldn't be unusual to get a success immediately. But we'll still get an IOCP notification, so let's ignore it. */ @@ -228,4 +228,4 @@ failure: grpc_exec_ctx_sched(exec_ctx, on_done, final_error, NULL); } -#endif /* GPR_WINSOCK_SOCKET */ +#endif /* GRPC_WINSOCK_SOCKET */ diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 00fd77679a..18c63cfbae 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -31,9 +31,9 @@ * */ -#include <grpc/support/port_platform.h> +#include "src/core/lib/iomgr/port.h" -#ifdef GPR_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET #include "src/core/lib/iomgr/network_status_tracker.h" #include "src/core/lib/iomgr/tcp_posix.h" @@ -58,14 +58,14 @@ #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/string.h" -#ifdef GPR_HAVE_MSG_NOSIGNAL +#ifdef GRPC_HAVE_MSG_NOSIGNAL #define SENDMSG_FLAGS MSG_NOSIGNAL #else #define SENDMSG_FLAGS 0 #endif -#ifdef GPR_MSG_IOVLEN_TYPE -typedef GPR_MSG_IOVLEN_TYPE msg_iovlen_type; +#ifdef GRPC_MSG_IOVLEN_TYPE +typedef GRPC_MSG_IOVLEN_TYPE msg_iovlen_type; #else typedef size_t msg_iovlen_type; #endif diff --git a/src/core/lib/iomgr/tcp_server.h b/src/core/lib/iomgr/tcp_server.h index 9a390699b4..0c2a4f41da 100644 --- a/src/core/lib/iomgr/tcp_server.h +++ b/src/core/lib/iomgr/tcp_server.h @@ -38,6 +38,7 @@ #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/endpoint.h" +#include "src/core/lib/iomgr/resolve_address.h" /* Forward decl of grpc_tcp_server */ typedef struct grpc_tcp_server grpc_tcp_server; @@ -78,8 +79,9 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server, but not dualstack sockets. */ /* TODO(ctiller): deprecate this, and make grpc_tcp_server_add_ports to handle all of the multiple socket port matching logic in one place */ -grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, - size_t addr_len, int *out_port); +grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, + const grpc_resolved_address *addr, + int *out_port); /* Number of fds at the given port_index, or 0 if port_index is out of bounds. */ diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index 73df5477e6..127f8595d1 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -36,9 +36,9 @@ #define _GNU_SOURCE #endif -#include <grpc/support/port_platform.h> +#include "src/core/lib/iomgr/port.h" -#ifdef GPR_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET #include "src/core/lib/iomgr/tcp_server.h" @@ -62,6 +62,7 @@ #include <grpc/support/useful.h> #include "src/core/lib/iomgr/resolve_address.h" +#include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/socket_utils_posix.h" #include "src/core/lib/iomgr/tcp_posix.h" @@ -79,11 +80,7 @@ struct grpc_tcp_listener { int fd; grpc_fd *emfd; grpc_tcp_server *server; - union { - uint8_t untyped[GRPC_MAX_SOCKADDR_SIZE]; - struct sockaddr sockaddr; - } addr; - size_t addr_len; + grpc_resolved_address addr; int port; unsigned port_index; unsigned fd_index; @@ -238,7 +235,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { if (s->head) { grpc_tcp_listener *sp; for (sp = s->head; sp; sp = sp->next) { - grpc_unlink_if_unix_domain_socket(&sp->addr.sockaddr); + grpc_unlink_if_unix_domain_socket(&sp->addr); sp->destroyed_closure.cb = destroyed_port; sp->destroyed_closure.cb_arg = s; grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL, @@ -304,11 +301,9 @@ static int get_max_accept_queue_size(void) { } /* Prepare a recently-created socket for listening. */ -static grpc_error *prepare_socket(int fd, const struct sockaddr *addr, - size_t addr_len, bool so_reuseport, - int *port) { - struct sockaddr_storage sockname_temp; - socklen_t sockname_len; +static grpc_error *prepare_socket(int fd, const grpc_resolved_address *addr, + bool so_reuseport, int *port) { + grpc_resolved_address sockname_temp; grpc_error *err = GRPC_ERROR_NONE; GPR_ASSERT(fd >= 0); @@ -331,8 +326,8 @@ static grpc_error *prepare_socket(int fd, const struct sockaddr *addr, err = grpc_set_socket_no_sigpipe_if_possible(fd); if (err != GRPC_ERROR_NONE) goto error; - GPR_ASSERT(addr_len < ~(socklen_t)0); - if (bind(fd, addr, (socklen_t)addr_len) < 0) { + GPR_ASSERT(addr->len < ~(socklen_t)0); + if (bind(fd, (struct sockaddr *)addr->addr, (socklen_t)addr->len) < 0) { err = GRPC_OS_ERROR(errno, "bind"); goto error; } @@ -342,13 +337,15 @@ static grpc_error *prepare_socket(int fd, const struct sockaddr *addr, goto error; } - sockname_len = sizeof(sockname_temp); - if (getsockname(fd, (struct sockaddr *)&sockname_temp, &sockname_len) < 0) { + sockname_temp.len = sizeof(struct sockaddr_storage); + + if (getsockname(fd, (struct sockaddr *)sockname_temp.addr, + (socklen_t *)&sockname_temp.len) < 0) { err = GRPC_OS_ERROR(errno, "getsockname"); goto error; } - *port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp); + *port = grpc_sockaddr_get_port(&sockname_temp); return GRPC_ERROR_NONE; error: @@ -382,13 +379,13 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) { /* loop until accept4 returns EAGAIN, and then re-arm notification */ for (;;) { - struct sockaddr_storage addr; - socklen_t addrlen = sizeof(addr); + grpc_resolved_address addr; char *addr_str; char *name; + addr.len = sizeof(struct sockaddr_storage); /* Note: If we ever decide to return this address to the user, remember to strip off the ::ffff:0.0.0.0/96 prefix first. */ - int fd = grpc_accept4(sp->fd, (struct sockaddr *)&addr, &addrlen, 1, 1); + int fd = grpc_accept4(sp->fd, &addr, 1, 1); if (fd < 0) { switch (errno) { case EINTR: @@ -404,7 +401,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *err) { grpc_set_socket_no_sigpipe_if_possible(fd); - addr_str = grpc_sockaddr_to_uri((struct sockaddr *)&addr); + addr_str = grpc_sockaddr_to_uri(&addr); gpr_asprintf(&name, "tcp-server-connection:%s", addr_str); if (grpc_tcp_trace) { @@ -442,19 +439,18 @@ error: } static grpc_error *add_socket_to_server(grpc_tcp_server *s, int fd, - const struct sockaddr *addr, - size_t addr_len, unsigned port_index, - unsigned fd_index, + const grpc_resolved_address *addr, + unsigned port_index, unsigned fd_index, grpc_tcp_listener **listener) { grpc_tcp_listener *sp = NULL; int port = -1; char *addr_str; char *name; - grpc_error *err = prepare_socket(fd, addr, addr_len, s->so_reuseport, &port); + grpc_error *err = prepare_socket(fd, addr, s->so_reuseport, &port); if (err == GRPC_ERROR_NONE) { GPR_ASSERT(port > 0); - grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1); + grpc_sockaddr_to_string(&addr_str, addr, 1); gpr_asprintf(&name, "tcp-server-listener:%s", addr_str); gpr_mu_lock(&s->mu); s->nports++; @@ -470,8 +466,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, int fd, sp->server = s; sp->fd = fd; sp->emfd = grpc_fd_create(fd, name); - memcpy(sp->addr.untyped, addr, addr_len); - sp->addr_len = addr_len; + memcpy(&sp->addr, addr, sizeof(grpc_resolved_address)); sp->port = port; sp->port_index = port_index; sp->fd_index = fd_index; @@ -504,14 +499,13 @@ static grpc_error *clone_port(grpc_tcp_listener *listener, unsigned count) { int fd = -1; int port = -1; grpc_dualstack_mode dsmode; - err = grpc_create_dualstack_socket(&listener->addr.sockaddr, SOCK_STREAM, 0, - &dsmode, &fd); + err = grpc_create_dualstack_socket(&listener->addr, SOCK_STREAM, 0, &dsmode, + &fd); if (err != GRPC_ERROR_NONE) return err; - err = prepare_socket(fd, &listener->addr.sockaddr, listener->addr_len, true, - &port); + err = prepare_socket(fd, &listener->addr, true, &port); if (err != GRPC_ERROR_NONE) return err; listener->server->nports++; - grpc_sockaddr_to_string(&addr_str, &listener->addr.sockaddr, 1); + grpc_sockaddr_to_string(&addr_str, &listener->addr, 1); gpr_asprintf(&name, "tcp-server-listener:%s/clone-%d", addr_str, i); sp = gpr_malloc(sizeof(grpc_tcp_listener)); sp->next = listener->next; @@ -524,8 +518,7 @@ static grpc_error *clone_port(grpc_tcp_listener *listener, unsigned count) { sp->server = listener->server; sp->fd = fd; sp->emfd = grpc_fd_create(fd, name); - memcpy(sp->addr.untyped, listener->addr.untyped, listener->addr_len); - sp->addr_len = listener->addr_len; + memcpy(&sp->addr, &listener->addr, sizeof(grpc_resolved_address)); sp->port = port; sp->port_index = listener->port_index; sp->fd_index = listener->fd_index + count - i; @@ -540,19 +533,19 @@ static grpc_error *clone_port(grpc_tcp_listener *listener, unsigned count) { return GRPC_ERROR_NONE; } -grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, - size_t addr_len, int *out_port) { +grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, + const grpc_resolved_address *addr, + int *out_port) { grpc_tcp_listener *sp; grpc_tcp_listener *sp2 = NULL; int fd; grpc_dualstack_mode dsmode; - struct sockaddr_in6 addr6_v4mapped; - struct sockaddr_in wild4; - struct sockaddr_in6 wild6; - struct sockaddr_in addr4_copy; - struct sockaddr *allocated_addr = NULL; - struct sockaddr_storage sockname_temp; - socklen_t sockname_len; + grpc_resolved_address addr6_v4mapped; + grpc_resolved_address wild4; + grpc_resolved_address wild6; + grpc_resolved_address addr4_copy; + grpc_resolved_address *allocated_addr = NULL; + grpc_resolved_address sockname_temp; int port; unsigned port_index = 0; unsigned fd_index = 0; @@ -560,19 +553,19 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, if (s->tail != NULL) { port_index = s->tail->port_index + 1; } - grpc_unlink_if_unix_domain_socket((struct sockaddr *)addr); + grpc_unlink_if_unix_domain_socket(addr); /* Check if this is a wildcard port, and if so, try to keep the port the same as some previously created listener. */ if (grpc_sockaddr_get_port(addr) == 0) { for (sp = s->head; sp; sp = sp->next) { - sockname_len = sizeof(sockname_temp); - if (0 == getsockname(sp->fd, (struct sockaddr *)&sockname_temp, - &sockname_len)) { - port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp); + sockname_temp.len = sizeof(struct sockaddr_storage); + if (0 == getsockname(sp->fd, (struct sockaddr *)sockname_temp.addr, + (socklen_t *)&sockname_temp.len)) { + port = grpc_sockaddr_get_port(&sockname_temp); if (port > 0) { - allocated_addr = gpr_malloc(addr_len); - memcpy(allocated_addr, addr, addr_len); + allocated_addr = gpr_malloc(sizeof(grpc_resolved_address)); + memcpy(allocated_addr, addr, addr->len); grpc_sockaddr_set_port(allocated_addr, port); addr = allocated_addr; break; @@ -584,8 +577,7 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, sp = NULL; if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) { - addr = (const struct sockaddr *)&addr6_v4mapped; - addr_len = sizeof(addr6_v4mapped); + addr = &addr6_v4mapped; } /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */ @@ -593,12 +585,10 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, grpc_sockaddr_make_wildcards(port, &wild4, &wild6); /* Try listening on IPv6 first. */ - addr = (struct sockaddr *)&wild6; - addr_len = sizeof(wild6); + addr = &wild6; errs[0] = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode, &fd); if (errs[0] == GRPC_ERROR_NONE) { - errs[0] = add_socket_to_server(s, fd, addr, addr_len, port_index, - fd_index, &sp); + errs[0] = add_socket_to_server(s, fd, addr, port_index, fd_index, &sp); if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { goto done; } @@ -607,23 +597,20 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, } /* If we didn't get a dualstack socket, also listen on 0.0.0.0. */ if (port == 0 && sp != NULL) { - grpc_sockaddr_set_port((struct sockaddr *)&wild4, sp->port); + grpc_sockaddr_set_port(&wild4, sp->port); } } - addr = (struct sockaddr *)&wild4; - addr_len = sizeof(wild4); + addr = &wild4; } errs[1] = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode, &fd); if (errs[1] == GRPC_ERROR_NONE) { if (dsmode == GRPC_DSMODE_IPV4 && grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) { - addr = (struct sockaddr *)&addr4_copy; - addr_len = sizeof(addr4_copy); + addr = &addr4_copy; } sp2 = sp; - errs[1] = - add_socket_to_server(s, fd, addr, addr_len, port_index, fd_index, &sp); + errs[1] = add_socket_to_server(s, fd, addr, port_index, fd_index, &sp); if (sp2 != NULL && sp != NULL) { sp2->sibling = sp; sp->is_sibling = 1; @@ -704,7 +691,7 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, s->pollset_count = pollset_count; sp = s->head; while (sp != NULL) { - if (s->so_reuseport && !grpc_is_unix_socket(&sp->addr.sockaddr) && + if (s->so_reuseport && !grpc_is_unix_socket(&sp->addr) && pollset_count > 1) { GPR_ASSERT(GRPC_LOG_IF_ERROR( "clone_port", clone_port(sp, (unsigned)(pollset_count - 1)))); diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c new file mode 100644 index 0000000000..73e4db3d65 --- /dev/null +++ b/src/core/lib/iomgr/tcp_server_uv.c @@ -0,0 +1,365 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_UV + +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/iomgr/sockaddr.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" +#include "src/core/lib/iomgr/tcp_server.h" +#include "src/core/lib/iomgr/tcp_uv.h" + +/* one listening port */ +typedef struct grpc_tcp_listener grpc_tcp_listener; +struct grpc_tcp_listener { + uv_tcp_t *handle; + grpc_tcp_server *server; + unsigned port_index; + int port; + /* linked list */ + struct grpc_tcp_listener *next; +}; + +struct grpc_tcp_server { + gpr_refcount refs; + + /* Called whenever accept() succeeds on a server port. */ + grpc_tcp_server_cb on_accept_cb; + void *on_accept_cb_arg; + + int open_ports; + + /* linked list of server ports */ + grpc_tcp_listener *head; + grpc_tcp_listener *tail; + + /* List of closures passed to shutdown_starting_add(). */ + grpc_closure_list shutdown_starting; + + /* shutdown callback */ + grpc_closure *shutdown_complete; +}; + +grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete, + const grpc_channel_args *args, + grpc_tcp_server **server) { + grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); + (void)args; + gpr_ref_init(&s->refs, 1); + s->on_accept_cb = NULL; + s->on_accept_cb_arg = NULL; + s->open_ports = 0; + s->head = NULL; + s->tail = NULL; + s->shutdown_starting.head = NULL; + s->shutdown_starting.tail = NULL; + s->shutdown_complete = shutdown_complete; + *server = s; + return GRPC_ERROR_NONE; +} + +grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) { + gpr_ref(&s->refs); + return s; +} + +void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s, + grpc_closure *shutdown_starting) { + grpc_closure_list_append(&s->shutdown_starting, shutdown_starting, + GRPC_ERROR_NONE); +} + +static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { + if (s->shutdown_complete != NULL) { + grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); + } + + while (s->head) { + grpc_tcp_listener *sp = s->head; + s->head = sp->next; + sp->next = NULL; + gpr_free(sp->handle); + gpr_free(sp); + } + gpr_free(s); +} + +static void handle_close_callback(uv_handle_t *handle) { + grpc_tcp_listener *sp = (grpc_tcp_listener *)handle->data; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + sp->server->open_ports--; + if (sp->server->open_ports == 0) { + finish_shutdown(&exec_ctx, sp->server); + } + grpc_exec_ctx_finish(&exec_ctx); +} + +static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { + int immediately_done = 0; + grpc_tcp_listener *sp; + + if (s->open_ports == 0) { + immediately_done = 1; + } + for (sp = s->head; sp; sp = sp->next) { + uv_close((uv_handle_t *)sp->handle, handle_close_callback); + } + + if (immediately_done) { + finish_shutdown(exec_ctx, s); + } +} + +void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { + if (gpr_unref(&s->refs)) { + /* Complete shutdown_starting work before destroying. */ + grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_exec_ctx_enqueue_list(&local_exec_ctx, &s->shutdown_starting, NULL); + if (exec_ctx == NULL) { + grpc_exec_ctx_flush(&local_exec_ctx); + tcp_server_destroy(&local_exec_ctx, s); + grpc_exec_ctx_finish(&local_exec_ctx); + } else { + grpc_exec_ctx_finish(&local_exec_ctx); + tcp_server_destroy(exec_ctx, s); + } + } +} + +static void accepted_connection_close_cb(uv_handle_t *handle) { + gpr_free(handle); +} + +static void on_connect(uv_stream_t *server, int status) { + grpc_tcp_listener *sp = (grpc_tcp_listener *)server->data; + grpc_tcp_server_acceptor acceptor = {sp->server, sp->port_index, 0}; + uv_tcp_t *client; + grpc_endpoint *ep = NULL; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resolved_address peer_name; + char *peer_name_string; + int err; + + if (status < 0) { + gpr_log(GPR_INFO, "Skipping on_accept due to error: %s", + uv_strerror(status)); + return; + } + client = gpr_malloc(sizeof(uv_tcp_t)); + uv_tcp_init(uv_default_loop(), client); + // UV documentation says this is guaranteed to succeed + uv_accept((uv_stream_t *)server, (uv_stream_t *)client); + // If the server has not been started, we discard incoming connections + if (sp->server->on_accept_cb == NULL) { + uv_close((uv_handle_t *)client, accepted_connection_close_cb); + } else { + peer_name_string = NULL; + memset(&peer_name, 0, sizeof(grpc_resolved_address)); + peer_name.len = sizeof(struct sockaddr_storage); + err = uv_tcp_getpeername(client, (struct sockaddr *)&peer_name.addr, + (int *)&peer_name.len); + if (err == 0) { + peer_name_string = grpc_sockaddr_to_uri(&peer_name); + } else { + gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s", uv_strerror(status)); + } + ep = grpc_tcp_create(client, peer_name_string); + sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL, + &acceptor); + grpc_exec_ctx_finish(&exec_ctx); + } +} + +static grpc_error *add_socket_to_server(grpc_tcp_server *s, uv_tcp_t *handle, + const grpc_resolved_address *addr, + unsigned port_index, + grpc_tcp_listener **listener) { + grpc_tcp_listener *sp = NULL; + int port = -1; + int status; + grpc_error *error; + grpc_resolved_address sockname_temp; + + // The last argument to uv_tcp_bind is flags + status = uv_tcp_bind(handle, (struct sockaddr *)addr->addr, 0); + if (status != 0) { + error = GRPC_ERROR_CREATE("Failed to bind to port"); + error = + grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status)); + return error; + } + + status = uv_listen((uv_stream_t *)handle, SOMAXCONN, on_connect); + if (status != 0) { + error = GRPC_ERROR_CREATE("Failed to listen to port"); + error = + grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status)); + return error; + } + + sockname_temp.len = (int)sizeof(struct sockaddr_storage); + status = uv_tcp_getsockname(handle, (struct sockaddr *)&sockname_temp.addr, + (int *)&sockname_temp.len); + if (status != 0) { + error = GRPC_ERROR_CREATE("getsockname failed"); + error = + grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status)); + return error; + } + + port = grpc_sockaddr_get_port(&sockname_temp); + + GPR_ASSERT(port >= 0); + GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server"); + sp = gpr_malloc(sizeof(grpc_tcp_listener)); + sp->next = NULL; + if (s->head == NULL) { + s->head = sp; + } else { + s->tail->next = sp; + } + s->tail = sp; + sp->server = s; + sp->handle = handle; + sp->port = port; + sp->port_index = port_index; + handle->data = sp; + s->open_ports++; + GPR_ASSERT(sp->handle); + *listener = sp; + + return GRPC_ERROR_NONE; +} + +grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, + const grpc_resolved_address *addr, + int *port) { + // This function is mostly copied from tcp_server_windows.c + grpc_tcp_listener *sp = NULL; + uv_tcp_t *handle; + grpc_resolved_address addr6_v4mapped; + grpc_resolved_address wildcard; + grpc_resolved_address *allocated_addr = NULL; + grpc_resolved_address sockname_temp; + unsigned port_index = 0; + int status; + grpc_error *error = GRPC_ERROR_NONE; + + if (s->tail != NULL) { + port_index = s->tail->port_index + 1; + } + + /* Check if this is a wildcard port, and if so, try to keep the port the same + as some previously created listener. */ + if (grpc_sockaddr_get_port(addr) == 0) { + for (sp = s->head; sp; sp = sp->next) { + sockname_temp.len = sizeof(struct sockaddr_storage); + if (0 == uv_tcp_getsockname(sp->handle, + (struct sockaddr *)&sockname_temp.addr, + (int *)&sockname_temp.len)) { + *port = grpc_sockaddr_get_port(&sockname_temp); + if (*port > 0) { + allocated_addr = gpr_malloc(sizeof(grpc_resolved_address)); + memcpy(allocated_addr, addr, sizeof(grpc_resolved_address)); + grpc_sockaddr_set_port(allocated_addr, *port); + addr = allocated_addr; + break; + } + } + } + } + + if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) { + addr = &addr6_v4mapped; + } + + /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */ + if (grpc_sockaddr_is_wildcard(addr, port)) { + grpc_sockaddr_make_wildcard6(*port, &wildcard); + + addr = &wildcard; + } + + handle = gpr_malloc(sizeof(uv_tcp_t)); + status = uv_tcp_init(uv_default_loop(), handle); + if (status == 0) { + error = add_socket_to_server(s, handle, addr, port_index, &sp); + } else { + error = GRPC_ERROR_CREATE("Failed to initialize UV tcp handle"); + error = + grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status)); + } + + gpr_free(allocated_addr); + + if (error != GRPC_ERROR_NONE) { + grpc_error *error_out = GRPC_ERROR_CREATE_REFERENCING( + "Failed to add port to server", &error, 1); + GRPC_ERROR_UNREF(error); + error = error_out; + *port = -1; + } else { + GPR_ASSERT(sp != NULL); + *port = sp->port; + } + return error; +} + +void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server, + grpc_pollset **pollsets, size_t pollset_count, + grpc_tcp_server_cb on_accept_cb, void *cb_arg) { + grpc_tcp_listener *sp; + (void)pollsets; + (void)pollset_count; + GPR_ASSERT(on_accept_cb); + GPR_ASSERT(!server->on_accept_cb); + server->on_accept_cb = on_accept_cb; + server->on_accept_cb_arg = cb_arg; + for (sp = server->head; sp; sp = sp->next) { + GPR_ASSERT(uv_listen((uv_stream_t *)sp->handle, SOMAXCONN, on_connect) == + 0); + } +} + +void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx *exec_ctx, + grpc_tcp_server *s) {} + +#endif /* GRPC_UV */ diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c index 4ff05601fa..ad6769a6ba 100644 --- a/src/core/lib/iomgr/tcp_server_windows.c +++ b/src/core/lib/iomgr/tcp_server_windows.c @@ -31,13 +31,13 @@ * */ -#include <grpc/support/port_platform.h> +#include "src/core/lib/iomgr/port.h" -#ifdef GPR_WINSOCK_SOCKET +#ifdef GRPC_WINSOCK_SOCKET -#include <io.h> +#include "src/core/lib/iomgr/sockaddr.h" -#include "src/core/lib/iomgr/sockaddr_utils.h" +#include <io.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -48,6 +48,8 @@ #include "src/core/lib/iomgr/iocp_windows.h" #include "src/core/lib/iomgr/pollset_windows.h" +#include "src/core/lib/iomgr/resolve_address.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/socket_windows.h" #include "src/core/lib/iomgr/tcp_server.h" #include "src/core/lib/iomgr/tcp_windows.h" @@ -183,10 +185,10 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { } /* Prepare (bind) a recently-created socket for listening. */ -static grpc_error *prepare_socket(SOCKET sock, const struct sockaddr *addr, - size_t addr_len, int *port) { - struct sockaddr_storage sockname_temp; - socklen_t sockname_len; +static grpc_error *prepare_socket(SOCKET sock, + const grpc_resolved_address *addr, + int *port) { + grpc_resolved_address sockname_temp; grpc_error *error = GRPC_ERROR_NONE; error = grpc_tcp_prepare_socket(sock); @@ -194,7 +196,8 @@ static grpc_error *prepare_socket(SOCKET sock, const struct sockaddr *addr, goto failure; } - if (bind(sock, addr, (int)addr_len) == SOCKET_ERROR) { + if (bind(sock, (const struct sockaddr *)addr->addr, (int)addr->len) == + SOCKET_ERROR) { error = GRPC_WSA_ERROR(WSAGetLastError(), "bind"); goto failure; } @@ -204,14 +207,14 @@ static grpc_error *prepare_socket(SOCKET sock, const struct sockaddr *addr, goto failure; } - sockname_len = sizeof(sockname_temp); - if (getsockname(sock, (struct sockaddr *)&sockname_temp, &sockname_len) == - SOCKET_ERROR) { + sockname_temp.len = sizeof(struct sockaddr_storage); + if (getsockname(sock, (struct sockaddr *)sockname_temp.addr, + &sockname_temp.len) == SOCKET_ERROR) { error = GRPC_WSA_ERROR(WSAGetLastError(), "getsockname"); goto failure; } - *port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp); + *port = grpc_sockaddr_get_port(&sockname_temp); return GRPC_ERROR_NONE; failure: @@ -307,15 +310,16 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { SOCKET sock = sp->new_socket; grpc_winsocket_callback_info *info = &sp->socket->read_info; grpc_endpoint *ep = NULL; - struct sockaddr_storage peer_name; + grpc_resolved_address peer_name; char *peer_name_string; char *fd_name; - int peer_name_len = sizeof(peer_name); DWORD transfered_bytes; DWORD flags; BOOL wsa_success; int err; + peer_name.len = sizeof(struct sockaddr_storage); + /* The general mechanism for shutting down is to queue abortion calls. While this is necessary in the read/write case, it's useless for the accept case. We only need to adjust the pending callback count */ @@ -353,9 +357,10 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { gpr_log(GPR_ERROR, "setsockopt error: %s", utf8_message); gpr_free(utf8_message); } - err = getpeername(sock, (struct sockaddr *)&peer_name, &peer_name_len); + err = + getpeername(sock, (struct sockaddr *)peer_name.addr, &peer_name.len); if (!err) { - peer_name_string = grpc_sockaddr_to_uri((struct sockaddr *)&peer_name); + peer_name_string = grpc_sockaddr_to_uri(&peer_name); } else { char *utf8_message = gpr_format_message(WSAGetLastError()); gpr_log(GPR_ERROR, "getpeername error: %s", utf8_message); @@ -385,8 +390,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { } static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock, - const struct sockaddr *addr, - size_t addr_len, unsigned port_index, + const grpc_resolved_address *addr, + unsigned port_index, grpc_tcp_listener **listener) { grpc_tcp_listener *sp = NULL; int port = -1; @@ -410,7 +415,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock, return NULL; } - error = prepare_socket(sock, addr, addr_len, &port); + error = prepare_socket(sock, addr, &port); if (error != GRPC_ERROR_NONE) { return error; } @@ -441,15 +446,15 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock, return GRPC_ERROR_NONE; } -grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, - size_t addr_len, int *port) { +grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, + const grpc_resolved_address *addr, + int *port) { grpc_tcp_listener *sp = NULL; SOCKET sock; - struct sockaddr_in6 addr6_v4mapped; - struct sockaddr_in6 wildcard; - struct sockaddr *allocated_addr = NULL; - struct sockaddr_storage sockname_temp; - socklen_t sockname_len; + grpc_resolved_address addr6_v4mapped; + grpc_resolved_address wildcard; + grpc_resolved_address *allocated_addr = NULL; + grpc_resolved_address sockname_temp; unsigned port_index = 0; grpc_error *error = GRPC_ERROR_NONE; @@ -461,13 +466,14 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, as some previously created listener. */ if (grpc_sockaddr_get_port(addr) == 0) { for (sp = s->head; sp; sp = sp->next) { - sockname_len = sizeof(sockname_temp); + sockname_temp.len = sizeof(struct sockaddr_storage); if (0 == getsockname(sp->socket->socket, - (struct sockaddr *)&sockname_temp, &sockname_len)) { - *port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp); + (struct sockaddr *)sockname_temp.addr, + &sockname_temp.len)) { + *port = grpc_sockaddr_get_port(&sockname_temp); if (*port > 0) { - allocated_addr = gpr_malloc(addr_len); - memcpy(allocated_addr, addr, addr_len); + allocated_addr = gpr_malloc(sizeof(grpc_resolved_address)); + memcpy(allocated_addr, addr, sizeof(grpc_resolved_address)); grpc_sockaddr_set_port(allocated_addr, *port); addr = allocated_addr; break; @@ -477,16 +483,14 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, } if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) { - addr = (const struct sockaddr *)&addr6_v4mapped; - addr_len = sizeof(addr6_v4mapped); + addr = &addr6_v4mapped; } /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */ if (grpc_sockaddr_is_wildcard(addr, port)) { grpc_sockaddr_make_wildcard6(*port, &wildcard); - addr = (struct sockaddr *)&wildcard; - addr_len = sizeof(wildcard); + addr = &wildcard; } sock = WSASocket(AF_INET6, SOCK_STREAM, IPPROTO_TCP, NULL, 0, @@ -496,7 +500,7 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, goto done; } - error = add_socket_to_server(s, sock, addr, addr_len, port_index, &sp); + error = add_socket_to_server(s, sock, addr, port_index, &sp); done: gpr_free(allocated_addr); @@ -535,4 +539,4 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {} -#endif /* GPR_WINSOCK_SOCKET */ +#endif /* GRPC_WINSOCK_SOCKET */ diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c new file mode 100644 index 0000000000..05d6eeb240 --- /dev/null +++ b/src/core/lib/iomgr/tcp_uv.c @@ -0,0 +1,335 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_UV + +#include <limits.h> +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/slice_buffer.h> +#include <grpc/support/string_util.h> + +#include "src/core/lib/iomgr/error.h" +#include "src/core/lib/iomgr/network_status_tracker.h" +#include "src/core/lib/iomgr/tcp_uv.h" +#include "src/core/lib/support/string.h" + +int grpc_tcp_trace = 0; + +typedef struct { + grpc_endpoint base; + gpr_refcount refcount; + + uv_tcp_t *handle; + + grpc_closure *read_cb; + grpc_closure *write_cb; + + gpr_slice read_slice; + gpr_slice_buffer *read_slices; + gpr_slice_buffer *write_slices; + uv_buf_t *write_buffers; + + bool shutting_down; + char *peer_string; + grpc_pollset *pollset; +} grpc_tcp; + +static void uv_close_callback(uv_handle_t *handle) { gpr_free(handle); } + +static void tcp_free(grpc_tcp *tcp) { gpr_free(tcp); } + +/*#define GRPC_TCP_REFCOUNT_DEBUG*/ +#ifdef GRPC_TCP_REFCOUNT_DEBUG +#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__) +#define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__) +static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file, + int line) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp, + reason, tcp->refcount.count, tcp->refcount.count - 1); + if (gpr_unref(&tcp->refcount)) { + tcp_free(tcp); + } +} + +static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file, + int line) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP ref %p : %s %d -> %d", tcp, + reason, tcp->refcount.count, tcp->refcount.count + 1); + gpr_ref(&tcp->refcount); +} +#else +#define TCP_UNREF(tcp, reason) tcp_unref((tcp)) +#define TCP_REF(tcp, reason) tcp_ref((tcp)) +static void tcp_unref(grpc_tcp *tcp) { + if (gpr_unref(&tcp->refcount)) { + tcp_free(tcp); + } +} + +static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } +#endif + +static void alloc_uv_buf(uv_handle_t *handle, size_t suggested_size, + uv_buf_t *buf) { + grpc_tcp *tcp = handle->data; + (void)suggested_size; + tcp->read_slice = gpr_slice_malloc(GRPC_TCP_DEFAULT_READ_SLICE_SIZE); + buf->base = (char *)GPR_SLICE_START_PTR(tcp->read_slice); + buf->len = GPR_SLICE_LENGTH(tcp->read_slice); +} + +static void read_callback(uv_stream_t *stream, ssize_t nread, + const uv_buf_t *buf) { + gpr_slice sub; + grpc_error *error; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_tcp *tcp = stream->data; + grpc_closure *cb = tcp->read_cb; + if (nread == 0) { + // Nothing happened. Wait for the next callback + return; + } + TCP_UNREF(tcp, "read"); + tcp->read_cb = NULL; + // TODO(murgatroid99): figure out what the return value here means + uv_read_stop(stream); + if (nread == UV_EOF) { + error = GRPC_ERROR_CREATE("EOF"); + } else if (nread > 0) { + // Successful read + sub = gpr_slice_sub_no_ref(tcp->read_slice, 0, (size_t)nread); + gpr_slice_buffer_add(tcp->read_slices, sub); + error = GRPC_ERROR_NONE; + if (grpc_tcp_trace) { + size_t i; + const char *str = grpc_error_string(error); + gpr_log(GPR_DEBUG, "read: error=%s", str); + grpc_error_free_string(str); + for (i = 0; i < tcp->read_slices->count; i++) { + char *dump = gpr_dump_slice(tcp->read_slices->slices[i], + GPR_DUMP_HEX | GPR_DUMP_ASCII); + gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, + dump); + gpr_free(dump); + } + } + } else { + // nread < 0: Error + error = GRPC_ERROR_CREATE("TCP Read failed"); + } + grpc_exec_ctx_sched(&exec_ctx, cb, error, NULL); + grpc_exec_ctx_finish(&exec_ctx); +} + +static void uv_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + gpr_slice_buffer *read_slices, grpc_closure *cb) { + grpc_tcp *tcp = (grpc_tcp *)ep; + int status; + grpc_error *error = GRPC_ERROR_NONE; + GPR_ASSERT(tcp->read_cb == NULL); + tcp->read_cb = cb; + tcp->read_slices = read_slices; + gpr_slice_buffer_reset_and_unref(read_slices); + TCP_REF(tcp, "read"); + // TODO(murgatroid99): figure out what the return value here means + status = + uv_read_start((uv_stream_t *)tcp->handle, alloc_uv_buf, read_callback); + if (status != 0) { + error = GRPC_ERROR_CREATE("TCP Read failed at start"); + error = + grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status)); + grpc_exec_ctx_sched(exec_ctx, cb, error, NULL); + } + if (grpc_tcp_trace) { + const char *str = grpc_error_string(error); + gpr_log(GPR_DEBUG, "Initiating read on %p: error=%s", tcp, str); + } +} + +static void write_callback(uv_write_t *req, int status) { + grpc_tcp *tcp = req->data; + grpc_error *error; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_closure *cb = tcp->write_cb; + tcp->write_cb = NULL; + TCP_UNREF(tcp, "write"); + if (status == 0) { + error = GRPC_ERROR_NONE; + } else { + error = GRPC_ERROR_CREATE("TCP Write failed"); + } + if (grpc_tcp_trace) { + const char *str = grpc_error_string(error); + gpr_log(GPR_DEBUG, "write complete on %p: error=%s", tcp, str); + } + gpr_free(tcp->write_buffers); + gpr_free(req); + grpc_exec_ctx_sched(&exec_ctx, cb, error, NULL); + grpc_exec_ctx_finish(&exec_ctx); +} + +static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + gpr_slice_buffer *write_slices, + grpc_closure *cb) { + grpc_tcp *tcp = (grpc_tcp *)ep; + uv_buf_t *buffers; + unsigned int buffer_count; + unsigned int i; + gpr_slice *slice; + uv_write_t *write_req; + + if (grpc_tcp_trace) { + size_t j; + + for (j = 0; j < write_slices->count; j++) { + char *data = gpr_dump_slice(write_slices->slices[j], + GPR_DUMP_HEX | GPR_DUMP_ASCII); + gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, data); + gpr_free(data); + } + } + + if (tcp->shutting_down) { + grpc_exec_ctx_sched(exec_ctx, cb, + GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL); + return; + } + + GPR_ASSERT(tcp->write_cb == NULL); + tcp->write_slices = write_slices; + GPR_ASSERT(tcp->write_slices->count <= UINT_MAX); + if (tcp->write_slices->count == 0) { + // No slices means we don't have to do anything, + // and libuv doesn't like empty writes + grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_NONE, NULL); + return; + } + + tcp->write_cb = cb; + buffer_count = (unsigned int)tcp->write_slices->count; + buffers = gpr_malloc(sizeof(uv_buf_t) * buffer_count); + for (i = 0; i < buffer_count; i++) { + slice = &tcp->write_slices->slices[i]; + buffers[i].base = (char *)GPR_SLICE_START_PTR(*slice); + buffers[i].len = GPR_SLICE_LENGTH(*slice); + } + write_req = gpr_malloc(sizeof(uv_write_t)); + write_req->data = tcp; + TCP_REF(tcp, "write"); + // TODO(murgatroid99): figure out what the return value here means + uv_write(write_req, (uv_stream_t *)tcp->handle, buffers, buffer_count, + write_callback); +} + +static void uv_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_pollset *pollset) { + // No-op. We're ignoring pollsets currently + (void)exec_ctx; + (void)ep; + (void)pollset; + grpc_tcp *tcp = (grpc_tcp *)ep; + tcp->pollset = pollset; +} + +static void uv_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + grpc_pollset_set *pollset) { + // No-op. We're ignoring pollsets currently + (void)exec_ctx; + (void)ep; + (void)pollset; +} + +static void shutdown_callback(uv_shutdown_t *req, int status) { gpr_free(req); } + +static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { + grpc_tcp *tcp = (grpc_tcp *)ep; + if (!tcp->shutting_down) { + tcp->shutting_down = true; + uv_shutdown_t *req = gpr_malloc(sizeof(uv_shutdown_t)); + uv_shutdown(req, (uv_stream_t *)tcp->handle, shutdown_callback); + } +} + +static void uv_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { + grpc_network_status_unregister_endpoint(ep); + grpc_tcp *tcp = (grpc_tcp *)ep; + uv_close((uv_handle_t *)tcp->handle, uv_close_callback); + TCP_UNREF(tcp, "destroy"); +} + +static char *uv_get_peer(grpc_endpoint *ep) { + grpc_tcp *tcp = (grpc_tcp *)ep; + return gpr_strdup(tcp->peer_string); +} + +static grpc_workqueue *uv_get_workqueue(grpc_endpoint *ep) { return NULL; } + +static grpc_endpoint_vtable vtable = {uv_endpoint_read, + uv_endpoint_write, + uv_get_workqueue, + uv_add_to_pollset, + uv_add_to_pollset_set, + uv_endpoint_shutdown, + uv_destroy, + uv_get_peer}; + +grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, char *peer_string) { + grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); + + if (grpc_tcp_trace) { + gpr_log(GPR_DEBUG, "Creating TCP endpoint %p", tcp); + } + + memset(tcp, 0, sizeof(grpc_tcp)); + tcp->base.vtable = &vtable; + tcp->handle = handle; + handle->data = tcp; + gpr_ref_init(&tcp->refcount, 1); + tcp->peer_string = gpr_strdup(peer_string); + tcp->shutting_down = false; + /* Tell network status tracking code about the new endpoint */ + grpc_network_status_register_endpoint(&tcp->base); + +#ifndef GRPC_UV_TCP_HOLD_LOOP + uv_unref((uv_handle_t *)handle); +#endif + + return &tcp->base; +} + +#endif /* GRPC_UV */ diff --git a/src/core/lib/iomgr/tcp_uv.h b/src/core/lib/iomgr/tcp_uv.h new file mode 100644 index 0000000000..eed41151ea --- /dev/null +++ b/src/core/lib/iomgr/tcp_uv.h @@ -0,0 +1,57 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_TCP_UV_H +#define GRPC_CORE_LIB_IOMGR_TCP_UV_H +/* + Low level TCP "bottom half" implementation, for use by transports built on + top of a TCP connection. + + Note that this file does not (yet) include APIs for creating the socket in + the first place. + + All calls passing slice transfer ownership of a slice refcount unless + otherwise specified. +*/ + +#include "src/core/lib/iomgr/endpoint.h" + +#include <uv.h> + +extern int grpc_tcp_trace; + +#define GRPC_TCP_DEFAULT_READ_SLICE_SIZE 8192 + +grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, char *peer_string); + +#endif /* GRPC_CORE_LIB_IOMGR_TCP_UV_H */ diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c index 448a72671c..a5f508a2c3 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.c @@ -31,9 +31,9 @@ * */ -#include <grpc/support/port_platform.h> +#include "src/core/lib/iomgr/port.h" -#ifdef GPR_WINSOCK_SOCKET +#ifdef GRPC_WINSOCK_SOCKET #include <limits.h> @@ -417,4 +417,4 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) { return &tcp->base; } -#endif /* GPR_WINSOCK_SOCKET */ +#endif /* GRPC_WINSOCK_SOCKET */ diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h index 5a9a177963..20fe98c4a7 100644 --- a/src/core/lib/iomgr/timer.h +++ b/src/core/lib/iomgr/timer.h @@ -34,19 +34,20 @@ #ifndef GRPC_CORE_LIB_IOMGR_TIMER_H #define GRPC_CORE_LIB_IOMGR_TIMER_H +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_UV +#include "src/core/lib/iomgr/timer_uv.h" +#else +#include "src/core/lib/iomgr/timer_generic.h" +#endif /* GRPC_UV */ + #include <grpc/support/port_platform.h> #include <grpc/support/time.h> #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/iomgr.h" -typedef struct grpc_timer { - gpr_timespec deadline; - uint32_t heap_index; /* INVALID_HEAP_INDEX if not in heap */ - int triggered; - struct grpc_timer *next; - struct grpc_timer *prev; - grpc_closure closure; -} grpc_timer; +typedef struct grpc_timer grpc_timer; /* Initialize *timer. When expired or canceled, timer_cb will be called with *timer_cb_arg and error set to indicate if it expired (GRPC_ERROR_NONE) or diff --git a/src/core/lib/iomgr/timer.c b/src/core/lib/iomgr/timer_generic.c index 9975fa1671..00058f9d86 100644 --- a/src/core/lib/iomgr/timer.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -31,6 +31,10 @@ * */ +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_TIMER_USE_GENERIC + #include "src/core/lib/iomgr/timer.h" #include <grpc/support/log.h> @@ -382,3 +386,5 @@ bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, ? GRPC_ERROR_NONE : GRPC_ERROR_CREATE("Shutting down timer system")); } + +#endif /* GRPC_TIMER_USE_GENERIC */ diff --git a/src/core/lib/iomgr/timer_generic.h b/src/core/lib/iomgr/timer_generic.h new file mode 100644 index 0000000000..e4494adb5f --- /dev/null +++ b/src/core/lib/iomgr/timer_generic.h @@ -0,0 +1,49 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_TIMER_GENERIC_H +#define GRPC_CORE_LIB_IOMGR_TIMER_GENERIC_H + +#include <grpc/support/time.h> +#include "src/core/lib/iomgr/exec_ctx.h" + +struct grpc_timer { + gpr_timespec deadline; + uint32_t heap_index; /* INVALID_HEAP_INDEX if not in heap */ + int triggered; + struct grpc_timer *next; + struct grpc_timer *prev; + grpc_closure closure; +}; + +#endif /* GRPC_CORE_LIB_IOMGR_TIMER_GENERIC_H */ diff --git a/src/core/lib/iomgr/timer_heap.c b/src/core/lib/iomgr/timer_heap.c index 2ad9bb9cd2..f736d335e6 100644 --- a/src/core/lib/iomgr/timer_heap.c +++ b/src/core/lib/iomgr/timer_heap.c @@ -31,6 +31,10 @@ * */ +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_TIMER_USE_GENERIC + #include "src/core/lib/iomgr/timer_heap.h" #include <string.h> @@ -144,3 +148,5 @@ grpc_timer *grpc_timer_heap_top(grpc_timer_heap *heap) { void grpc_timer_heap_pop(grpc_timer_heap *heap) { grpc_timer_heap_remove(heap, grpc_timer_heap_top(heap)); } + +#endif /* GRPC_TIMER_USE_GENERIC */ diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c new file mode 100644 index 0000000000..cfcb89268b --- /dev/null +++ b/src/core/lib/iomgr/timer_uv.c @@ -0,0 +1,99 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/port.h" + +#if GRPC_UV + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + +#include "src/core/lib/iomgr/timer.h" + +#include <uv.h> + +static void timer_close_callback(uv_handle_t *handle) { gpr_free(handle); } + +static void stop_uv_timer(uv_timer_t *handle) { + uv_timer_stop(handle); + uv_unref((uv_handle_t *)handle); + uv_close((uv_handle_t *)handle, timer_close_callback); +} + +void run_expired_timer(uv_timer_t *handle) { + grpc_timer *timer = (grpc_timer *)handle->data; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GPR_ASSERT(!timer->triggered); + timer->triggered = 1; + grpc_exec_ctx_sched(&exec_ctx, &timer->closure, GRPC_ERROR_NONE, NULL); + stop_uv_timer(handle); + grpc_exec_ctx_finish(&exec_ctx); +} + +void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, + gpr_timespec deadline, grpc_iomgr_cb_func timer_cb, + void *timer_cb_arg, gpr_timespec now) { + uint64_t timeout; + uv_timer_t *uv_timer; + grpc_closure_init(&timer->closure, timer_cb, timer_cb_arg); + if (gpr_time_cmp(deadline, now) <= 0) { + timer->triggered = 1; + grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_NONE, NULL); + return; + } + timer->triggered = 0; + timeout = (uint64_t)gpr_time_to_millis(gpr_time_sub(deadline, now)); + uv_timer = gpr_malloc(sizeof(uv_timer_t)); + uv_timer_init(uv_default_loop(), uv_timer); + uv_timer->data = timer; + timer->uv_timer = uv_timer; + uv_timer_start(uv_timer, run_expired_timer, timeout, 0); +} + +void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { + if (!timer->triggered) { + timer->triggered = 1; + grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_CANCELLED, NULL); + stop_uv_timer((uv_timer_t *)timer->uv_timer); + } +} + +bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, + gpr_timespec *next) { + return false; +} + +void grpc_timer_list_init(gpr_timespec now) {} +void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) {} + +#endif /* GRPC_UV */ diff --git a/src/core/lib/iomgr/timer_uv.h b/src/core/lib/iomgr/timer_uv.h new file mode 100644 index 0000000000..3de383ebd5 --- /dev/null +++ b/src/core/lib/iomgr/timer_uv.h @@ -0,0 +1,47 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_TIMER_UV_H +#define GRPC_CORE_LIB_IOMGR_TIMER_UV_H + +#include "src/core/lib/iomgr/exec_ctx.h" + +struct grpc_timer { + grpc_closure closure; + /* This is actually a uv_timer_t*, but we want to keep platform-specific + types out of headers */ + void *uv_timer; + int triggered; +}; + +#endif /* GRPC_CORE_LIB_IOMGR_TIMER_UV_H */ diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c index edf7b133e9..fd0c7a0f9d 100644 --- a/src/core/lib/iomgr/udp_server.c +++ b/src/core/lib/iomgr/udp_server.c @@ -36,9 +36,9 @@ #define _GNU_SOURCE #endif -#include <grpc/support/port_platform.h> +#include "src/core/lib/iomgr/port.h" -#ifdef GPR_POSIX_SOCKET +#ifdef GRPC_POSIX_SOCKET #include "src/core/lib/iomgr/udp_server.h" @@ -62,32 +62,30 @@ #include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/resolve_address.h" +#include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/socket_utils_posix.h" +#include "src/core/lib/iomgr/unix_sockets_posix.h" #include "src/core/lib/support/string.h" -#define INIT_PORT_CAP 2 - /* one listening port */ -typedef struct { +typedef struct grpc_udp_listener grpc_udp_listener; +struct grpc_udp_listener { int fd; grpc_fd *emfd; grpc_udp_server *server; - union { - uint8_t untyped[GRPC_MAX_SOCKADDR_SIZE]; - struct sockaddr sockaddr; - } addr; - size_t addr_len; + grpc_resolved_address addr; grpc_closure read_closure; grpc_closure destroyed_closure; grpc_udp_server_read_cb read_cb; grpc_udp_server_orphan_cb orphan_cb; -} server_port; + + struct grpc_udp_listener *next; +}; /* the overall server */ struct grpc_udp_server { gpr_mu mu; - gpr_cv cv; /* active port count: how many ports are actually still listening */ size_t active_ports; @@ -97,10 +95,10 @@ struct grpc_udp_server { /* is this server shutting down? (boolean) */ int shutdown; - /* all listening ports */ - server_port *ports; - size_t nports; - size_t port_capacity; + /* linked list of server ports */ + grpc_udp_listener *head; + grpc_udp_listener *tail; + unsigned nports; /* shutdown callback */ grpc_closure *shutdown_complete; @@ -116,24 +114,29 @@ struct grpc_udp_server { grpc_udp_server *grpc_udp_server_create(void) { grpc_udp_server *s = gpr_malloc(sizeof(grpc_udp_server)); gpr_mu_init(&s->mu); - gpr_cv_init(&s->cv); s->active_ports = 0; s->destroyed_ports = 0; s->shutdown = 0; - s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); + s->head = NULL; + s->tail = NULL; s->nports = 0; - s->port_capacity = INIT_PORT_CAP; return s; } static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { - grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); + if (s->shutdown_complete != NULL) { + grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); + } gpr_mu_destroy(&s->mu); - gpr_cv_destroy(&s->cv); - gpr_free(s->ports); + while (s->head) { + grpc_udp_listener *sp = s->head; + s->head = sp->next; + gpr_free(sp); + } + gpr_free(s); } @@ -154,8 +157,6 @@ static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server, events will be received on them - at this point it's safe to destroy things */ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { - size_t i; - /* delete ALL the things */ gpr_mu_lock(&s->mu); @@ -164,9 +165,11 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { return; } - if (s->nports) { - for (i = 0; i < s->nports; i++) { - server_port *sp = &s->ports[i]; + if (s->head) { + grpc_udp_listener *sp; + for (sp = s->head; sp; sp = sp->next) { + grpc_unlink_if_unix_domain_socket(&sp->addr); + sp->destroyed_closure.cb = destroyed_port; sp->destroyed_closure.cb_arg = s; @@ -187,7 +190,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, grpc_closure *on_done) { - size_t i; + grpc_udp_listener *sp; gpr_mu_lock(&s->mu); GPR_ASSERT(!s->shutdown); @@ -197,14 +200,10 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, /* shutdown all fd's */ if (s->active_ports) { - for (i = 0; i < s->nports; i++) { - server_port *sp = &s->ports[i]; - /* Call the orphan_cb to signal that the FD is about to be closed and - * should no longer be used. */ + for (sp = s->head; sp; sp = sp->next) { GPR_ASSERT(sp->orphan_cb); sp->orphan_cb(sp->emfd); - - grpc_fd_shutdown(exec_ctx, s->ports[i].emfd); + grpc_fd_shutdown(exec_ctx, sp->emfd); } gpr_mu_unlock(&s->mu); } else { @@ -214,10 +213,9 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, } /* Prepare a recently-created socket for listening. */ -static int prepare_socket(int fd, const struct sockaddr *addr, - size_t addr_len) { - struct sockaddr_storage sockname_temp; - socklen_t sockname_len; +static int prepare_socket(int fd, const grpc_resolved_address *addr) { + grpc_resolved_address sockname_temp; + struct sockaddr *addr_ptr = (struct sockaddr *)addr->addr; /* Set send/receive socket buffers to 1 MB */ int buffer_size_bytes = 1024 * 1024; @@ -237,15 +235,15 @@ static int prepare_socket(int fd, const struct sockaddr *addr, if (grpc_set_socket_ip_pktinfo_if_possible(fd) != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "Unable to set ip_pktinfo."); goto error; - } else if (addr->sa_family == AF_INET6) { + } else if (addr_ptr->sa_family == AF_INET6) { if (grpc_set_socket_ipv6_recvpktinfo_if_possible(fd) != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "Unable to set ipv6_recvpktinfo."); goto error; } } - GPR_ASSERT(addr_len < ~(socklen_t)0); - if (bind(fd, addr, (socklen_t)addr_len) < 0) { + GPR_ASSERT(addr->len < ~(socklen_t)0); + if (bind(fd, (struct sockaddr *)addr, (socklen_t)addr->len) < 0) { char *addr_str; grpc_sockaddr_to_string(&addr_str, addr, 0); gpr_log(GPR_ERROR, "bind addr=%s: %s", addr_str, strerror(errno)); @@ -253,8 +251,10 @@ static int prepare_socket(int fd, const struct sockaddr *addr, goto error; } - sockname_len = sizeof(sockname_temp); - if (getsockname(fd, (struct sockaddr *)&sockname_temp, &sockname_len) < 0) { + sockname_temp.len = sizeof(struct sockaddr_storage); + + if (getsockname(fd, (struct sockaddr *)sockname_temp.addr, + (socklen_t *)&sockname_temp.len) < 0) { goto error; } @@ -270,7 +270,7 @@ static int prepare_socket(int fd, const struct sockaddr *addr, goto error; } - return grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp); + return grpc_sockaddr_get_port(&sockname_temp); error: if (fd >= 0) { @@ -281,10 +281,10 @@ error: /* event manager callback when reads are ready */ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - server_port *sp = arg; + grpc_udp_listener *sp = arg; + gpr_mu_lock(&sp->server->mu); if (error != GRPC_ERROR_NONE) { - gpr_mu_lock(&sp->server->mu); if (0 == --sp->server->active_ports) { gpr_mu_unlock(&sp->server->mu); deactivated_all_ports(exec_ctx, sp->server); @@ -300,34 +300,37 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { /* Re-arm the notification event so we get another chance to read. */ grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); + gpr_mu_unlock(&sp->server->mu); } static int add_socket_to_server(grpc_udp_server *s, int fd, - const struct sockaddr *addr, size_t addr_len, + const grpc_resolved_address *addr, grpc_udp_server_read_cb read_cb, grpc_udp_server_orphan_cb orphan_cb) { - server_port *sp; + grpc_udp_listener *sp; int port; char *addr_str; char *name; - port = prepare_socket(fd, addr, addr_len); + port = prepare_socket(fd, addr); if (port >= 0) { - grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1); + grpc_sockaddr_to_string(&addr_str, addr, 1); gpr_asprintf(&name, "udp-server-listener:%s", addr_str); gpr_free(addr_str); gpr_mu_lock(&s->mu); - /* append it to the list under a lock */ - if (s->nports == s->port_capacity) { - s->port_capacity *= 2; - s->ports = gpr_realloc(s->ports, sizeof(server_port) * s->port_capacity); + s->nports++; + sp = gpr_malloc(sizeof(grpc_udp_listener)); + sp->next = NULL; + if (s->head == NULL) { + s->head = sp; + } else { + s->tail->next = sp; } - sp = &s->ports[s->nports++]; + s->tail = sp; sp->server = s; sp->fd = fd; sp->emfd = grpc_fd_create(fd, name); - memcpy(sp->addr.untyped, addr, addr_len); - sp->addr_len = addr_len; + memcpy(&sp->addr, addr, sizeof(grpc_resolved_address)); sp->read_cb = read_cb; sp->orphan_cb = orphan_cb; GPR_ASSERT(sp->emfd); @@ -338,34 +341,34 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, return port; } -int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, - size_t addr_len, grpc_udp_server_read_cb read_cb, +int grpc_udp_server_add_port(grpc_udp_server *s, + const grpc_resolved_address *addr, + grpc_udp_server_read_cb read_cb, grpc_udp_server_orphan_cb orphan_cb) { + grpc_udp_listener *sp; int allocated_port1 = -1; int allocated_port2 = -1; - unsigned i; int fd; grpc_dualstack_mode dsmode; - struct sockaddr_in6 addr6_v4mapped; - struct sockaddr_in wild4; - struct sockaddr_in6 wild6; - struct sockaddr_in addr4_copy; - struct sockaddr *allocated_addr = NULL; - struct sockaddr_storage sockname_temp; - socklen_t sockname_len; + grpc_resolved_address addr6_v4mapped; + grpc_resolved_address wild4; + grpc_resolved_address wild6; + grpc_resolved_address addr4_copy; + grpc_resolved_address *allocated_addr = NULL; + grpc_resolved_address sockname_temp; int port; /* Check if this is a wildcard port, and if so, try to keep the port the same as some previously created listener. */ if (grpc_sockaddr_get_port(addr) == 0) { - for (i = 0; i < s->nports; i++) { - sockname_len = sizeof(sockname_temp); - if (0 == getsockname(s->ports[i].fd, (struct sockaddr *)&sockname_temp, - &sockname_len)) { - port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp); + for (sp = s->head; sp; sp = sp->next) { + sockname_temp.len = sizeof(struct sockaddr_storage); + if (0 == getsockname(sp->fd, (struct sockaddr *)sockname_temp.addr, + (socklen_t *)&sockname_temp.len)) { + port = grpc_sockaddr_get_port(&sockname_temp); if (port > 0) { - allocated_addr = gpr_malloc(addr_len); - memcpy(allocated_addr, addr, addr_len); + allocated_addr = gpr_malloc(sizeof(grpc_resolved_address)); + memcpy(allocated_addr, addr, sizeof(grpc_resolved_address)); grpc_sockaddr_set_port(allocated_addr, port); addr = allocated_addr; break; @@ -375,8 +378,7 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, } if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) { - addr = (const struct sockaddr *)&addr6_v4mapped; - addr_len = sizeof(addr6_v4mapped); + addr = &addr6_v4mapped; } /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */ @@ -384,22 +386,19 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, grpc_sockaddr_make_wildcards(port, &wild4, &wild6); /* Try listening on IPv6 first. */ - addr = (struct sockaddr *)&wild6; - addr_len = sizeof(wild6); + addr = &wild6; // TODO(rjshade): Test and propagate the returned grpc_error*: grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd); - allocated_port1 = - add_socket_to_server(s, fd, addr, addr_len, read_cb, orphan_cb); + allocated_port1 = add_socket_to_server(s, fd, addr, read_cb, orphan_cb); if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { goto done; } /* If we didn't get a dualstack socket, also listen on 0.0.0.0. */ if (port == 0 && allocated_port1 > 0) { - grpc_sockaddr_set_port((struct sockaddr *)&wild4, allocated_port1); + grpc_sockaddr_set_port(&wild4, allocated_port1); } - addr = (struct sockaddr *)&wild4; - addr_len = sizeof(wild4); + addr = &wild4; } // TODO(rjshade): Test and propagate the returned grpc_error*: @@ -409,11 +408,9 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, } if (dsmode == GRPC_DSMODE_IPV4 && grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) { - addr = (struct sockaddr *)&addr4_copy; - addr_len = sizeof(addr4_copy); + addr = &addr4_copy; } - allocated_port2 = - add_socket_to_server(s, fd, addr, addr_len, read_cb, orphan_cb); + allocated_port2 = add_socket_to_server(s, fd, addr, read_cb, orphan_cb); done: gpr_free(allocated_addr); @@ -421,27 +418,40 @@ done: } int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index) { - return (port_index < s->nports) ? s->ports[port_index].fd : -1; + grpc_udp_listener *sp; + if (port_index >= s->nports) { + return -1; + } + + for (sp = s->head; sp && port_index != 0; sp = sp->next) { + --port_index; + } + return sp->fd; } void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, grpc_pollset **pollsets, size_t pollset_count, grpc_server *server) { - size_t i, j; + size_t i; gpr_mu_lock(&s->mu); + grpc_udp_listener *sp; GPR_ASSERT(s->active_ports == 0); s->pollsets = pollsets; s->grpc_server = server; - for (i = 0; i < s->nports; i++) { - for (j = 0; j < pollset_count; j++) { - grpc_pollset_add_fd(exec_ctx, pollsets[j], s->ports[i].emfd); + + sp = s->head; + while (sp != NULL) { + for (i = 0; i < pollset_count; i++) { + grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd); } - s->ports[i].read_closure.cb = on_read; - s->ports[i].read_closure.cb_arg = &s->ports[i]; - grpc_fd_notify_on_read(exec_ctx, s->ports[i].emfd, - &s->ports[i].read_closure); + sp->read_closure.cb = on_read; + sp->read_closure.cb_arg = sp; + grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); + s->active_ports++; + sp = sp->next; } + gpr_mu_unlock(&s->mu); } diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h index 33c5ce11cd..f3c466a031 100644 --- a/src/core/lib/iomgr/udp_server.h +++ b/src/core/lib/iomgr/udp_server.h @@ -36,6 +36,7 @@ #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/resolve_address.h" /* Forward decl of struct grpc_server */ /* This is not typedef'ed to avoid a typedef-redefinition error */ @@ -59,7 +60,7 @@ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *udp_server, grpc_pollset **pollsets, size_t pollset_count, struct grpc_server *server); -int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index); +int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index); /* Add a port to the server, returning port number on success, or negative on failure. @@ -71,8 +72,9 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index); /* TODO(ctiller): deprecate this, and make grpc_udp_server_add_ports to handle all of the multiple socket port matching logic in one place */ -int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, - size_t addr_len, grpc_udp_server_read_cb read_cb, +int grpc_udp_server_add_port(grpc_udp_server *s, + const grpc_resolved_address *addr, + grpc_udp_server_read_cb read_cb, grpc_udp_server_orphan_cb orphan_cb); void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *server, diff --git a/src/core/lib/iomgr/unix_sockets_posix.c b/src/core/lib/iomgr/unix_sockets_posix.c index 0e7670e5a5..030acd9811 100644 --- a/src/core/lib/iomgr/unix_sockets_posix.c +++ b/src/core/lib/iomgr/unix_sockets_posix.c @@ -30,16 +30,19 @@ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ +#include "src/core/lib/iomgr/port.h" -#include "src/core/lib/iomgr/unix_sockets_posix.h" +#ifdef GRPC_HAVE_UNIX_SOCKET -#ifdef GPR_HAVE_UNIX_SOCKET +#include "src/core/lib/iomgr/sockaddr.h" #include <string.h> #include <sys/stat.h> #include <sys/types.h> #include <sys/un.h> +#include "src/core/lib/iomgr/unix_sockets_posix.h" + #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -61,15 +64,18 @@ grpc_error *grpc_resolve_unix_domain_address(const char *name, return GRPC_ERROR_NONE; } -int grpc_is_unix_socket(const struct sockaddr *addr) { +int grpc_is_unix_socket(const grpc_resolved_address *resolved_addr) { + const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr; return addr->sa_family == AF_UNIX; } -void grpc_unlink_if_unix_domain_socket(const struct sockaddr *addr) { +void grpc_unlink_if_unix_domain_socket( + const grpc_resolved_address *resolved_addr) { + const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr; if (addr->sa_family != AF_UNIX) { return; } - struct sockaddr_un *un = (struct sockaddr_un *)addr; + struct sockaddr_un *un = (struct sockaddr_un *)resolved_addr->addr; struct stat st; if (stat(un->sun_path, &st) == 0 && (st.st_mode & S_IFMT) == S_IFSOCK) { @@ -77,7 +83,9 @@ void grpc_unlink_if_unix_domain_socket(const struct sockaddr *addr) { } } -char *grpc_sockaddr_to_uri_unix_if_possible(const struct sockaddr *addr) { +char *grpc_sockaddr_to_uri_unix_if_possible( + const grpc_resolved_address *resolved_addr) { + const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr; if (addr->sa_family != AF_UNIX) { return NULL; } diff --git a/src/core/lib/iomgr/unix_sockets_posix.h b/src/core/lib/iomgr/unix_sockets_posix.h index db0516d945..21afd3aa15 100644 --- a/src/core/lib/iomgr/unix_sockets_posix.h +++ b/src/core/lib/iomgr/unix_sockets_posix.h @@ -34,22 +34,23 @@ #ifndef GRPC_CORE_LIB_IOMGR_UNIX_SOCKETS_POSIX_H #define GRPC_CORE_LIB_IOMGR_UNIX_SOCKETS_POSIX_H -#include <grpc/support/port_platform.h> +#include "src/core/lib/iomgr/port.h" #include <grpc/support/string_util.h> #include "src/core/lib/iomgr/resolve_address.h" -#include "src/core/lib/iomgr/sockaddr.h" void grpc_create_socketpair_if_unix(int sv[2]); grpc_error *grpc_resolve_unix_domain_address( const char *name, grpc_resolved_addresses **addresses); -int grpc_is_unix_socket(const struct sockaddr *addr); +int grpc_is_unix_socket(const grpc_resolved_address *resolved_addr); -void grpc_unlink_if_unix_domain_socket(const struct sockaddr *addr); +void grpc_unlink_if_unix_domain_socket( + const grpc_resolved_address *resolved_addr); -char *grpc_sockaddr_to_uri_unix_if_possible(const struct sockaddr *addr); +char *grpc_sockaddr_to_uri_unix_if_possible( + const grpc_resolved_address *resolved_addr); #endif /* GRPC_CORE_LIB_IOMGR_UNIX_SOCKETS_POSIX_H */ diff --git a/src/core/lib/iomgr/unix_sockets_posix_noop.c b/src/core/lib/iomgr/unix_sockets_posix_noop.c index 56b47c3daf..1daf5152c1 100644 --- a/src/core/lib/iomgr/unix_sockets_posix_noop.c +++ b/src/core/lib/iomgr/unix_sockets_posix_noop.c @@ -33,7 +33,7 @@ #include "src/core/lib/iomgr/unix_sockets_posix.h" -#ifndef GPR_HAVE_UNIX_SOCKET +#ifndef GRPC_HAVE_UNIX_SOCKET #include <grpc/support/log.h> @@ -50,11 +50,11 @@ grpc_error *grpc_resolve_unix_domain_address( return GRPC_ERROR_CREATE("Unix domain sockets are not supported on Windows"); } -int grpc_is_unix_socket(const struct sockaddr *addr) { return false; } +int grpc_is_unix_socket(const grpc_resolved_address *addr) { return false; } -void grpc_unlink_if_unix_domain_socket(const struct sockaddr *addr) {} +void grpc_unlink_if_unix_domain_socket(const grpc_resolved_address *addr) {} -char *grpc_sockaddr_to_uri_unix_if_possible(const struct sockaddr *addr) { +char *grpc_sockaddr_to_uri_unix_if_possible(const grpc_resolved_address *addr) { return NULL; } diff --git a/src/core/lib/iomgr/wakeup_fd_cv.c b/src/core/lib/iomgr/wakeup_fd_cv.c index b4165208ed..da4c2870cd 100644 --- a/src/core/lib/iomgr/wakeup_fd_cv.c +++ b/src/core/lib/iomgr/wakeup_fd_cv.c @@ -31,9 +31,9 @@ * */ -#include <grpc/support/port_platform.h> +#include "src/core/lib/iomgr/port.h" -#ifdef GPR_POSIX_WAKEUP_FD +#ifdef GRPC_POSIX_WAKEUP_FD #include "src/core/lib/iomgr/wakeup_fd_cv.h" @@ -115,4 +115,4 @@ const grpc_wakeup_fd_vtable grpc_cv_wakeup_fd_vtable = { cv_fd_init, cv_fd_consume, cv_fd_wakeup, cv_fd_destroy, cv_check_availability}; -#endif /* GPR_POSIX_WAKUP_FD */ +#endif /* GRPC_POSIX_WAKUP_FD */ diff --git a/src/core/lib/iomgr/wakeup_fd_eventfd.c b/src/core/lib/iomgr/wakeup_fd_eventfd.c index 95f6102330..373e21d3e1 100644 --- a/src/core/lib/iomgr/wakeup_fd_eventfd.c +++ b/src/core/lib/iomgr/wakeup_fd_eventfd.c @@ -31,9 +31,9 @@ * */ -#include <grpc/support/port_platform.h> +#include "src/core/lib/iomgr/port.h" -#ifdef GPR_LINUX_EVENTFD +#ifdef GRPC_LINUX_EVENTFD #include <errno.h> #include <sys/eventfd.h> @@ -94,4 +94,4 @@ const grpc_wakeup_fd_vtable grpc_specialized_wakeup_fd_vtable = { eventfd_create, eventfd_consume, eventfd_wakeup, eventfd_destroy, eventfd_check_availability}; -#endif /* GPR_LINUX_EVENTFD */ +#endif /* GRPC_LINUX_EVENTFD */ diff --git a/src/core/lib/iomgr/wakeup_fd_nospecial.c b/src/core/lib/iomgr/wakeup_fd_nospecial.c index cb2f707dc5..611bced029 100644 --- a/src/core/lib/iomgr/wakeup_fd_nospecial.c +++ b/src/core/lib/iomgr/wakeup_fd_nospecial.c @@ -36,9 +36,9 @@ * systems without anything better than pipe. */ -#include <grpc/support/port_platform.h> +#include "src/core/lib/iomgr/port.h" -#ifdef GPR_POSIX_NO_SPECIAL_WAKEUP_FD +#ifdef GRPC_POSIX_NO_SPECIAL_WAKEUP_FD #include <stddef.h> #include "src/core/lib/iomgr/wakeup_fd_posix.h" @@ -48,4 +48,4 @@ static int check_availability_invalid(void) { return 0; } const grpc_wakeup_fd_vtable grpc_specialized_wakeup_fd_vtable = { NULL, NULL, NULL, NULL, check_availability_invalid}; -#endif /* GPR_POSIX_NO_SPECIAL_WAKEUP_FD */ +#endif /* GRPC_POSIX_NO_SPECIAL_WAKEUP_FD */ diff --git a/src/core/lib/iomgr/wakeup_fd_pipe.c b/src/core/lib/iomgr/wakeup_fd_pipe.c index d0ea216aa0..183f0eb930 100644 --- a/src/core/lib/iomgr/wakeup_fd_pipe.c +++ b/src/core/lib/iomgr/wakeup_fd_pipe.c @@ -31,9 +31,9 @@ * */ -#include <grpc/support/port_platform.h> +#include "src/core/lib/iomgr/port.h" -#ifdef GPR_POSIX_WAKEUP_FD +#ifdef GRPC_POSIX_WAKEUP_FD #include "src/core/lib/iomgr/wakeup_fd_posix.h" diff --git a/src/core/lib/iomgr/wakeup_fd_posix.c b/src/core/lib/iomgr/wakeup_fd_posix.c index 5c894bef37..85526402bd 100644 --- a/src/core/lib/iomgr/wakeup_fd_posix.c +++ b/src/core/lib/iomgr/wakeup_fd_posix.c @@ -31,9 +31,9 @@ * */ -#include <grpc/support/port_platform.h> +#include "src/core/lib/iomgr/port.h" -#ifdef GPR_POSIX_WAKEUP_FD +#ifdef GRPC_POSIX_WAKEUP_FD #include <stddef.h> #include "src/core/lib/iomgr/wakeup_fd_cv.h" @@ -98,4 +98,4 @@ void grpc_wakeup_fd_destroy(grpc_wakeup_fd *fd_info) { } } -#endif /* GPR_POSIX_WAKEUP_FD */ +#endif /* GRPC_POSIX_WAKEUP_FD */ diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h index 5b96d1d851..73d9849843 100644 --- a/src/core/lib/iomgr/workqueue.h +++ b/src/core/lib/iomgr/workqueue.h @@ -39,6 +39,7 @@ #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/pollset.h" #include "src/core/lib/iomgr/pollset_set.h" +#include "src/core/lib/iomgr/port.h" #ifdef GPR_WINDOWS #include "src/core/lib/iomgr/workqueue_windows.h" diff --git a/src/core/lib/iomgr/workqueue_uv.c b/src/core/lib/iomgr/workqueue_uv.c new file mode 100644 index 0000000000..e58ca476cc --- /dev/null +++ b/src/core/lib/iomgr/workqueue_uv.c @@ -0,0 +1,66 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/port.h" + +#ifdef GRPC_UV + +#include "src/core/lib/iomgr/workqueue.h" + +// Minimal implementation of grpc_workqueue for libuv +// Works by directly enqueuing workqueue items onto the current execution +// context, which is at least correct, if not performant or in the spirit of +// workqueues. + +void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {} + +#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG +grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, + int line, const char *reason) { + return workqueue; +} +void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, + const char *file, int line, const char *reason) {} +#else +grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue) { + return workqueue; +} +void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {} +#endif + +void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, + grpc_closure *closure, grpc_error *error) { + grpc_exec_ctx_sched(exec_ctx, closure, error, NULL); +} + +#endif /* GPR_UV */ diff --git a/src/core/lib/iomgr/workqueue_uv.h b/src/core/lib/iomgr/workqueue_uv.h new file mode 100644 index 0000000000..be3f8e4d93 --- /dev/null +++ b/src/core/lib/iomgr/workqueue_uv.h @@ -0,0 +1,37 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_WORKQUEUE_UV_H +#define GRPC_CORE_LIB_IOMGR_WORKQUEUE_UV_H + +#endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_UV_H */ diff --git a/src/core/lib/security/credentials/google_default/credentials_posix.c b/src/core/lib/security/credentials/google_default/credentials_generic.c index 42c9d7f997..d13d8c5200 100644 --- a/src/core/lib/security/credentials/google_default/credentials_posix.c +++ b/src/core/lib/security/credentials/google_default/credentials_generic.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -31,10 +31,6 @@ * */ -#include <grpc/support/port_platform.h> - -#ifdef GPR_POSIX_FILE - #include "src/core/lib/security/credentials/google_default/google_default_credentials.h" #include <grpc/support/alloc.h> @@ -46,16 +42,13 @@ char *grpc_get_well_known_google_credentials_file_path_impl(void) { char *result = NULL; - char *home = gpr_getenv("HOME"); - if (home == NULL) { - gpr_log(GPR_ERROR, "Could not get HOME environment variable."); + char *base = gpr_getenv(GRPC_GOOGLE_CREDENTIALS_PATH_ENV_VAR); + if (base == NULL) { + gpr_log(GPR_ERROR, "Could not get " GRPC_GOOGLE_CREDENTIALS_ENV_VAR + " environment variable."); return NULL; } - gpr_asprintf(&result, "%s/.config/%s/%s", home, - GRPC_GOOGLE_CLOUD_SDK_CONFIG_DIRECTORY, - GRPC_GOOGLE_WELL_KNOWN_CREDENTIALS_FILE); - gpr_free(home); + gpr_asprintf(&result, "%s/%s", base, GRPC_GOOGLE_CREDENTIALS_PATH_SUFFIX); + gpr_free(base); return result; } - -#endif /* GPR_POSIX_FILE */ diff --git a/src/core/lib/security/credentials/google_default/google_default_credentials.h b/src/core/lib/security/credentials/google_default/google_default_credentials.h index fac4377e2c..b55546ded0 100644 --- a/src/core/lib/security/credentials/google_default/google_default_credentials.h +++ b/src/core/lib/security/credentials/google_default/google_default_credentials.h @@ -34,12 +34,26 @@ #ifndef GRPC_CORE_LIB_SECURITY_CREDENTIALS_GOOGLE_DEFAULT_GOOGLE_DEFAULT_CREDENTIALS_H #define GRPC_CORE_LIB_SECURITY_CREDENTIALS_GOOGLE_DEFAULT_GOOGLE_DEFAULT_CREDENTIALS_H +#include <grpc/support/port_platform.h> + #include "src/core/lib/security/credentials/credentials.h" #define GRPC_GOOGLE_CLOUD_SDK_CONFIG_DIRECTORY "gcloud" #define GRPC_GOOGLE_WELL_KNOWN_CREDENTIALS_FILE \ "application_default_credentials.json" +#ifdef GPR_WINDOWS +#define GRPC_GOOGLE_CREDENTIALS_PATH_ENV_VAR "APPDATA" +#define GRPC_GOOGLE_CREDENTIALS_PATH_SUFFIX \ + GRPC_GOOGLE_CLOUD_SDK_CONFIG_DIRECTORY \ + "/" GRPC_GOOGLE_WELL_KNOWN_CREDENTIALS_FILE +#else +#define GRPC_GOOGLE_CREDENTIALS_PATH_ENV_VAR "HOME" +#define GRPC_GOOGLE_CREDENTIALS_PATH_SUFFIX \ + ".config/" GRPC_GOOGLE_CLOUD_SDK_CONFIG_DIRECTORY \ + "/" GRPC_GOOGLE_WELL_KNOWN_CREDENTIALS_FILE +#endif + void grpc_flush_cached_google_default_credentials(void); #endif /* GRPC_CORE_LIB_SECURITY_CREDENTIALS_GOOGLE_DEFAULT_GOOGLE_DEFAULT_CREDENTIALS_H \ diff --git a/src/core/lib/support/thd.c b/src/core/lib/support/thd.c index 41daeb5d0e..40f53a18e5 100644 --- a/src/core/lib/support/thd.c +++ b/src/core/lib/support/thd.c @@ -33,7 +33,7 @@ /* Posix implementation for gpr threads. */ -#include <memory.h> +#include <string.h> #include <grpc/support/thd.h> diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 30559304c6..02a5de0857 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -30,6 +30,7 @@ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */ + #include <assert.h> #include <limits.h> #include <stdio.h> diff --git a/src/core/lib/tsi/ssl_transport_security.c b/src/core/lib/tsi/ssl_transport_security.c index e91c6316e7..749b46e19f 100644 --- a/src/core/lib/tsi/ssl_transport_security.c +++ b/src/core/lib/tsi/ssl_transport_security.c @@ -31,6 +31,9 @@ * */ +#include "src/core/lib/iomgr/sockaddr.h" + +#include "src/core/lib/iomgr/socket_utils.h" #include "src/core/lib/tsi/ssl_transport_security.h" #include <grpc/support/port_platform.h> @@ -38,13 +41,6 @@ #include <limits.h> #include <string.h> -/* TODO(jboeuf): refactor inet_ntop into a portability header. */ -#ifdef GPR_WINSOCK_SOCKET -#include <ws2tcpip.h> -#else -#include <arpa/inet.h> -#endif - #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/sync.h> @@ -353,8 +349,8 @@ static tsi_result add_subject_alt_names_properties_to_peer( result = TSI_INTERNAL_ERROR; break; } - const char *name = inet_ntop(af, subject_alt_name->d.iPAddress->data, - ntop_buf, INET6_ADDRSTRLEN); + const char *name = grpc_inet_ntop(af, subject_alt_name->d.iPAddress->data, + ntop_buf, INET6_ADDRSTRLEN); if (name == NULL) { gpr_log(GPR_ERROR, "Could not get IP string from asn1 octet."); result = TSI_INTERNAL_ERROR; |