diff options
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r-- | src/core/lib/iomgr/closure.c | 8 | ||||
-rw-r--r-- | src/core/lib/iomgr/closure.h | 5 | ||||
-rw-r--r-- | src/core/lib/iomgr/combiner.c | 28 | ||||
-rw-r--r-- | src/core/lib/iomgr/combiner.h | 23 | ||||
-rw-r--r-- | src/core/lib/iomgr/resource_quota.c | 34 | ||||
-rw-r--r-- | src/core/lib/iomgr/resource_quota.h | 6 | ||||
-rw-r--r-- | src/core/lib/iomgr/sockaddr_utils.c | 38 | ||||
-rw-r--r-- | src/core/lib/iomgr/sockaddr_utils.h | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_client_windows.c | 27 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_server_posix.c | 331 | ||||
-rw-r--r-- | src/core/lib/iomgr/udp_server.c | 42 | ||||
-rw-r--r-- | src/core/lib/iomgr/udp_server.h | 8 |
12 files changed, 436 insertions, 118 deletions
diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c index 8e4efd3370..509c1ff95d 100644 --- a/src/core/lib/iomgr/closure.c +++ b/src/core/lib/iomgr/closure.c @@ -51,20 +51,22 @@ void grpc_closure_list_init(grpc_closure_list *closure_list) { closure_list->head = closure_list->tail = NULL; } -void grpc_closure_list_append(grpc_closure_list *closure_list, +bool grpc_closure_list_append(grpc_closure_list *closure_list, grpc_closure *closure, grpc_error *error) { if (closure == NULL) { GRPC_ERROR_UNREF(error); - return; + return false; } closure->error_data.error = error; closure->next_data.next = NULL; - if (closure_list->head == NULL) { + bool was_empty = (closure_list->head == NULL); + if (was_empty) { closure_list->head = closure; } else { closure_list->tail->next_data.next = closure; } closure_list->tail = closure; + return was_empty; } void grpc_closure_list_fail_all(grpc_closure_list *list, diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index 6748b21e59..2510d50b42 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -116,8 +116,9 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg, void grpc_closure_list_init(grpc_closure_list *list); /** add \a closure to the end of \a list - and set \a closure's result to \a error */ -void grpc_closure_list_append(grpc_closure_list *list, grpc_closure *closure, + and set \a closure's result to \a error + Returns true if \a list becomes non-empty */ +bool grpc_closure_list_append(grpc_closure_list *list, grpc_closure *closure, grpc_error *error); /** force all success bits in \a list to false */ diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index ba6c7087a9..fa9966c3a6 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -72,6 +72,7 @@ struct grpc_combiner { bool final_list_covered_by_poller; grpc_closure_list final_list; grpc_closure offload; + gpr_refcount refs; }; static void combiner_exec_uncovered(grpc_exec_ctx *exec_ctx, @@ -126,6 +127,7 @@ static bool is_covered_by_poller(grpc_combiner *lock) { grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) { grpc_combiner *lock = gpr_malloc(sizeof(*lock)); + gpr_ref_init(&lock->refs, 1); lock->next_combiner_on_this_exec_ctx = NULL; lock->time_to_execute_final_list = false; lock->optional_workqueue = optional_workqueue; @@ -152,7 +154,7 @@ static void really_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { gpr_free(lock); } -void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { +static void start_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -STATE_UNORPHANED); GRPC_COMBINER_TRACE(gpr_log( GPR_DEBUG, "C:%p really_destroy old_state=%" PRIdPTR, lock, old_state)); @@ -161,6 +163,30 @@ void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { } } +#ifdef GRPC_COMBINER_REFCOUNT_DEBUG +#define GRPC_COMBINER_DEBUG_SPAM(op, delta) \ + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, \ + "combiner[%p] %s %" PRIdPTR " --> %" PRIdPTR " %s", lock, (op), \ + gpr_atm_no_barrier_load(&lock->refs.count), \ + gpr_atm_no_barrier_load(&lock->refs.count) + (delta), reason); +#else +#define GRPC_COMBINER_DEBUG_SPAM(op, delta) +#endif + +void grpc_combiner_unref(grpc_exec_ctx *exec_ctx, + grpc_combiner *lock GRPC_COMBINER_DEBUG_ARGS) { + GRPC_COMBINER_DEBUG_SPAM("UNREF", -1); + if (gpr_unref(&lock->refs)) { + start_destroy(exec_ctx, lock); + } +} + +grpc_combiner *grpc_combiner_ref(grpc_combiner *lock GRPC_COMBINER_DEBUG_ARGS) { + GRPC_COMBINER_DEBUG_SPAM(" REF", 1); + gpr_ref(&lock->refs); + return lock; +} + static void push_last_on_exec_ctx(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { lock->next_combiner_on_this_exec_ctx = NULL; diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h index 81dff85d40..75dcb0b70a 100644 --- a/src/core/lib/iomgr/combiner.h +++ b/src/core/lib/iomgr/combiner.h @@ -48,8 +48,27 @@ // Initialize the lock, with an optional workqueue to shift load to when // necessary grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue); -// Destroy the lock -void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock); + +//#define GRPC_COMBINER_REFCOUNT_DEBUG +#ifdef GRPC_COMBINER_REFCOUNT_DEBUG +#define GRPC_COMBINER_DEBUG_ARGS \ + , const char *file, int line, const char *reason +#define GRPC_COMBINER_REF(combiner, reason) \ + grpc_combiner_ref((combiner), __FILE__, __LINE__, (reason)) +#define GRPC_COMBINER_UNREF(exec_ctx, combiner, reason) \ + grpc_combiner_unref((exec_ctx), (combiner), __FILE__, __LINE__, (reason)) +#else +#define GRPC_COMBINER_DEBUG_ARGS +#define GRPC_COMBINER_REF(combiner, reason) grpc_combiner_ref((combiner)) +#define GRPC_COMBINER_UNREF(exec_ctx, combiner, reason) \ + grpc_combiner_unref((exec_ctx), (combiner)) +#endif + +// Ref/unref the lock, for when we're sharing the lock ownership +// Prefer to use the macros above +grpc_combiner *grpc_combiner_ref(grpc_combiner *lock GRPC_COMBINER_DEBUG_ARGS); +void grpc_combiner_unref(grpc_exec_ctx *exec_ctx, + grpc_combiner *lock GRPC_COMBINER_DEBUG_ARGS); // Fetch a scheduler to schedule closures against grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *lock, bool covered_by_poller); diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c index d5995a5ac6..511ffdcdf1 100644 --- a/src/core/lib/iomgr/resource_quota.c +++ b/src/core/lib/iomgr/resource_quota.c @@ -33,6 +33,8 @@ #include "src/core/lib/iomgr/resource_quota.h" +#include <limits.h> +#include <stdint.h> #include <string.h> #include <grpc/support/alloc.h> @@ -44,6 +46,8 @@ int grpc_resource_quota_trace = 0; +#define MEMORY_USAGE_ESTIMATION_MAX 65536 + /* Internal linked list pointers for a resource user */ typedef struct { grpc_resource_user *next; @@ -126,9 +130,12 @@ struct grpc_resource_quota { /* refcount */ gpr_refcount refs; + /* estimate of current memory usage + scaled to the range [0..RESOURCE_USAGE_ESTIMATION_MAX] */ + gpr_atm memory_usage_estimation; + /* Master combiner lock: all activity on a quota executes under this combiner - * (so no mutex is needed for this data structure) - */ + * (so no mutex is needed for this data structure) */ grpc_combiner *combiner; /* Size of the resource quota */ int64_t size; @@ -269,6 +276,16 @@ static void rq_step_sched(grpc_exec_ctx *exec_ctx, GRPC_ERROR_NONE); } +/* update the atomically available resource estimate - use no barriers since + timeliness of delivery really doesn't matter much */ +static void rq_update_estimate(grpc_resource_quota *resource_quota) { + gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation, + (gpr_atm)((1.0 - + ((double)resource_quota->free_pool) / + ((double)resource_quota->size)) * + MEMORY_USAGE_ESTIMATION_MAX)); +} + /* returns true if all allocations are completed */ static bool rq_alloc(grpc_exec_ctx *exec_ctx, grpc_resource_quota *resource_quota) { @@ -281,6 +298,7 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx, int64_t amt = -resource_user->free_pool; resource_user->free_pool = 0; resource_quota->free_pool -= amt; + rq_update_estimate(resource_quota); if (grpc_resource_quota_trace) { gpr_log(GPR_DEBUG, "RQ %s %s: grant alloc %" PRId64 " bytes; rq_free_pool -> %" PRId64, @@ -315,6 +333,7 @@ static bool rq_reclaim_from_per_user_free_pool( int64_t amt = resource_user->free_pool; resource_user->free_pool = 0; resource_quota->free_pool += amt; + rq_update_estimate(resource_quota); if (grpc_resource_quota_trace) { gpr_log(GPR_DEBUG, "RQ %s %s: reclaim_from_per_user_free_pool %" PRId64 " bytes; rq_free_pool -> %" PRId64, @@ -531,6 +550,7 @@ static void rq_resize(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) { int64_t delta = a->size - a->resource_quota->size; a->resource_quota->size += delta; a->resource_quota->free_pool += delta; + rq_update_estimate(a->resource_quota); rq_step_sched(exec_ctx, a->resource_quota); grpc_resource_quota_unref_internal(exec_ctx, a->resource_quota); gpr_free(a); @@ -557,6 +577,7 @@ grpc_resource_quota *grpc_resource_quota_create(const char *name) { resource_quota->size = INT64_MAX; resource_quota->step_scheduled = false; resource_quota->reclaiming = false; + gpr_atm_no_barrier_store(&resource_quota->memory_usage_estimation, 0); if (name != NULL) { resource_quota->name = gpr_strdup(name); } else { @@ -578,7 +599,7 @@ grpc_resource_quota *grpc_resource_quota_create(const char *name) { void grpc_resource_quota_unref_internal(grpc_exec_ctx *exec_ctx, grpc_resource_quota *resource_quota) { if (gpr_unref(&resource_quota->refs)) { - grpc_combiner_destroy(exec_ctx, resource_quota->combiner); + GRPC_COMBINER_UNREF(exec_ctx, resource_quota->combiner, "resource_quota"); gpr_free(resource_quota->name); gpr_free(resource_quota); } @@ -602,6 +623,13 @@ void grpc_resource_quota_ref(grpc_resource_quota *resource_quota) { grpc_resource_quota_ref_internal(resource_quota); } +double grpc_resource_quota_get_memory_pressure( + grpc_resource_quota *resource_quota) { + return ((double)(gpr_atm_no_barrier_load( + &resource_quota->memory_usage_estimation))) / + ((double)MEMORY_USAGE_ESTIMATION_MAX); +} + /* Public API */ void grpc_resource_quota_resize(grpc_resource_quota *resource_quota, size_t size) { diff --git a/src/core/lib/iomgr/resource_quota.h b/src/core/lib/iomgr/resource_quota.h index d1127ce9ea..b9f62cbf83 100644 --- a/src/core/lib/iomgr/resource_quota.h +++ b/src/core/lib/iomgr/resource_quota.h @@ -84,6 +84,12 @@ void grpc_resource_quota_unref_internal(grpc_exec_ctx *exec_ctx, grpc_resource_quota *grpc_resource_quota_from_channel_args( const grpc_channel_args *channel_args); +/* Return a number indicating current memory pressure: + 0.0 ==> no memory usage + 1.0 ==> maximum memory usage */ +double grpc_resource_quota_get_memory_pressure( + grpc_resource_quota *resource_quota); + typedef struct grpc_resource_user grpc_resource_user; grpc_resource_user *grpc_resource_user_create( diff --git a/src/core/lib/iomgr/sockaddr_utils.c b/src/core/lib/iomgr/sockaddr_utils.c index 44bc2f968b..ffa62cb53c 100644 --- a/src/core/lib/iomgr/sockaddr_utils.c +++ b/src/core/lib/iomgr/sockaddr_utils.c @@ -190,31 +190,37 @@ int grpc_sockaddr_to_string(char **out, } char *grpc_sockaddr_to_uri(const grpc_resolved_address *resolved_addr) { - char *temp; - char *result; grpc_resolved_address addr_normalized; - const struct sockaddr *addr; - if (grpc_sockaddr_is_v4mapped(resolved_addr, &addr_normalized)) { resolved_addr = &addr_normalized; } + const char *scheme = grpc_sockaddr_get_uri_scheme(resolved_addr); + if (scheme == NULL || strcmp("unix", scheme) == 0) { + return grpc_sockaddr_to_uri_unix_if_possible(resolved_addr); + } + char *path = NULL; + char *uri_str = NULL; + if (grpc_sockaddr_to_string(&path, resolved_addr, + false /* suppress errors */) && + scheme != NULL) { + gpr_asprintf(&uri_str, "%s:%s", scheme, path); + } + gpr_free(path); + return uri_str != NULL ? uri_str : NULL; +} - addr = (const struct sockaddr *)resolved_addr->addr; - +const char *grpc_sockaddr_get_uri_scheme( + const grpc_resolved_address *resolved_addr) { + const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr; switch (addr->sa_family) { case AF_INET: - grpc_sockaddr_to_string(&temp, resolved_addr, 0); - gpr_asprintf(&result, "ipv4:%s", temp); - gpr_free(temp); - return result; + return "ipv4"; case AF_INET6: - grpc_sockaddr_to_string(&temp, resolved_addr, 0); - gpr_asprintf(&result, "ipv6:%s", temp); - gpr_free(temp); - return result; - default: - return grpc_sockaddr_to_uri_unix_if_possible(resolved_addr); + return "ipv6"; + case AF_UNIX: + return "unix"; } + return NULL; } int grpc_sockaddr_get_port(const grpc_resolved_address *resolved_addr) { diff --git a/src/core/lib/iomgr/sockaddr_utils.h b/src/core/lib/iomgr/sockaddr_utils.h index 5371e360c5..2b22f11b49 100644 --- a/src/core/lib/iomgr/sockaddr_utils.h +++ b/src/core/lib/iomgr/sockaddr_utils.h @@ -84,6 +84,10 @@ int grpc_sockaddr_set_port(const grpc_resolved_address *addr, int port); int grpc_sockaddr_to_string(char **out, const grpc_resolved_address *addr, int normalize); +/* Returns the URI string corresponding to \a addr */ char *grpc_sockaddr_to_uri(const grpc_resolved_address *addr); +/* Returns the URI scheme corresponding to \a addr */ +const char *grpc_sockaddr_get_uri_scheme(const grpc_resolved_address *addr); + #endif /* GRPC_CORE_LIB_IOMGR_SOCKADDR_UTILS_H */ diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c index 1e84ec3a1e..c8dc9e64bd 100644 --- a/src/core/lib/iomgr/tcp_client_windows.c +++ b/src/core/lib/iomgr/tcp_client_windows.c @@ -135,12 +135,10 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { /* Tries to issue one async connection, then schedules both an IOCP notification request for the connection, and one timeout alert. */ -void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done, - grpc_endpoint **endpoint, - grpc_pollset_set *interested_parties, - const grpc_channel_args *channel_args, - const grpc_resolved_address *addr, - gpr_timespec deadline) { +static void tcp_client_connect_impl( + grpc_exec_ctx *exec_ctx, grpc_closure *on_done, grpc_endpoint **endpoint, + grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, + const grpc_resolved_address *addr, gpr_timespec deadline) { SOCKET sock = INVALID_SOCKET; BOOL success; int status; @@ -252,4 +250,21 @@ failure: grpc_closure_sched(exec_ctx, on_done, final_error); } +// overridden by api_fuzzer.c +void (*grpc_tcp_client_connect_impl)( + grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, + grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, + const grpc_resolved_address *addr, + gpr_timespec deadline) = tcp_client_connect_impl; + +void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_endpoint **ep, + grpc_pollset_set *interested_parties, + const grpc_channel_args *channel_args, + const grpc_resolved_address *addr, + gpr_timespec deadline) { + grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, + channel_args, addr, deadline); +} + #endif /* GRPC_WINSOCK_SOCKET */ diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index e9e7511c9c..36f878fdd4 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -44,6 +44,7 @@ #include <errno.h> #include <fcntl.h> +#include <ifaddrs.h> #include <limits.h> #include <netinet/in.h> #include <netinet/tcp.h> @@ -115,6 +116,8 @@ struct grpc_tcp_server { bool shutdown; /* use SO_REUSEPORT */ bool so_reuseport; + /* expand wildcard addresses to a list of all local addresses */ + bool expand_wildcard_addrs; /* linked list of server ports */ grpc_tcp_listener *head; @@ -161,6 +164,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); s->so_reuseport = has_so_reuseport; s->resource_quota = grpc_resource_quota_create(NULL); + s->expand_wildcard_addrs = false; for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) { if (0 == strcmp(GRPC_ARG_ALLOW_REUSEPORT, args->args[i].key)) { if (args->args[i].type == GRPC_ARG_INTEGER) { @@ -183,6 +187,15 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, return GRPC_ERROR_CREATE(GRPC_ARG_RESOURCE_QUOTA " must be a pointer to a buffer pool"); } + } else if (0 == strcmp(GRPC_ARG_EXPAND_WILDCARD_ADDRS, args->args[i].key)) { + if (args->args[i].type == GRPC_ARG_INTEGER) { + s->expand_wildcard_addrs = (args->args[i].value.integer != 0); + } else { + grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota); + gpr_free(s); + return GRPC_ERROR_CREATE(GRPC_ARG_EXPAND_WILDCARD_ADDRS + " must be an integer"); + } } } gpr_ref_init(&s->refs, 1); @@ -504,9 +517,224 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, int fd, return err; } -/* Insert count new listeners after listener. Every new listener will have the - same listen address as listener (SO_REUSEPORT must be enabled). Every new - listener is a sibling of listener. */ +/* If successful, add a listener to s for addr, set *dsmode for the socket, and + return the *listener. */ +static grpc_error *add_addr_to_server(grpc_tcp_server *s, + const grpc_resolved_address *addr, + unsigned port_index, unsigned fd_index, + grpc_dualstack_mode *dsmode, + grpc_tcp_listener **listener) { + grpc_resolved_address addr4_copy; + int fd; + grpc_error *err = + grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, dsmode, &fd); + if (err != GRPC_ERROR_NONE) { + return err; + } + if (*dsmode == GRPC_DSMODE_IPV4 && + grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) { + addr = &addr4_copy; + } + return add_socket_to_server(s, fd, addr, port_index, fd_index, listener); +} + +/* Bind to "::" to get a port number not used by any address. */ +static grpc_error *get_unused_port(int *port) { + grpc_resolved_address wild; + grpc_sockaddr_make_wildcard6(0, &wild); + grpc_dualstack_mode dsmode; + int fd; + grpc_error *err = + grpc_create_dualstack_socket(&wild, SOCK_STREAM, 0, &dsmode, &fd); + if (err != GRPC_ERROR_NONE) { + return err; + } + if (dsmode == GRPC_DSMODE_IPV4) { + grpc_sockaddr_make_wildcard4(0, &wild); + } + if (bind(fd, (const struct sockaddr *)wild.addr, (socklen_t)wild.len) != 0) { + err = GRPC_OS_ERROR(errno, "bind"); + close(fd); + return err; + } + if (getsockname(fd, (struct sockaddr *)wild.addr, (socklen_t *)&wild.len) != + 0) { + err = GRPC_OS_ERROR(errno, "getsockname"); + close(fd); + return err; + } + close(fd); + *port = grpc_sockaddr_get_port(&wild); + return *port <= 0 ? GRPC_ERROR_CREATE("Bad port") : GRPC_ERROR_NONE; +} + +/* Return the listener in s with address addr or NULL. */ +static grpc_tcp_listener *find_listener_with_addr(grpc_tcp_server *s, + grpc_resolved_address *addr) { + grpc_tcp_listener *l; + gpr_mu_lock(&s->mu); + for (l = s->head; l != NULL; l = l->next) { + if (l->addr.len != addr->len) { + continue; + } + if (memcmp(l->addr.addr, addr->addr, addr->len) == 0) { + break; + } + } + gpr_mu_unlock(&s->mu); + return l; +} + +/* Get all addresses assigned to network interfaces on the machine and create a + listener for each. requested_port is the port to use for every listener, or 0 + to select one random port that will be used for every listener. Set *out_port + to the port selected. Return GRPC_ERROR_NONE only if all listeners were + added. */ +static grpc_error *add_all_local_addrs_to_server(grpc_tcp_server *s, + unsigned port_index, + int requested_port, + int *out_port) { + struct ifaddrs *ifa = NULL; + struct ifaddrs *ifa_it; + unsigned fd_index = 0; + grpc_tcp_listener *sp = NULL; + grpc_error *err = GRPC_ERROR_NONE; + if (requested_port == 0) { + /* Note: There could be a race where some local addrs can listen on the + selected port and some can't. The sane way to handle this would be to + retry by recreating the whole grpc_tcp_server. Backing out individual + listeners and orphaning the FDs looks like too much trouble. */ + if ((err = get_unused_port(&requested_port)) != GRPC_ERROR_NONE) { + return err; + } else if (requested_port <= 0) { + return GRPC_ERROR_CREATE("Bad get_unused_port()"); + } + gpr_log(GPR_DEBUG, "Picked unused port %d", requested_port); + } + if (getifaddrs(&ifa) != 0 || ifa == NULL) { + return GRPC_OS_ERROR(errno, "getifaddrs"); + } + for (ifa_it = ifa; ifa_it != NULL; ifa_it = ifa_it->ifa_next) { + grpc_resolved_address addr; + char *addr_str = NULL; + grpc_dualstack_mode dsmode; + grpc_tcp_listener *new_sp = NULL; + const char *ifa_name = (ifa_it->ifa_name ? ifa_it->ifa_name : "<unknown>"); + if (ifa_it->ifa_addr == NULL) { + continue; + } else if (ifa_it->ifa_addr->sa_family == AF_INET) { + addr.len = sizeof(struct sockaddr_in); + } else if (ifa_it->ifa_addr->sa_family == AF_INET6) { + addr.len = sizeof(struct sockaddr_in6); + } else { + continue; + } + memcpy(addr.addr, ifa_it->ifa_addr, addr.len); + if (!grpc_sockaddr_set_port(&addr, requested_port)) { + /* Should never happen, because we check sa_family above. */ + err = GRPC_ERROR_CREATE("Failed to set port"); + break; + } + if (grpc_sockaddr_to_string(&addr_str, &addr, 0) < 0) { + addr_str = gpr_strdup("<error>"); + } + gpr_log(GPR_DEBUG, + "Adding local addr from interface %s flags 0x%x to server: %s", + ifa_name, ifa_it->ifa_flags, addr_str); + /* We could have multiple interfaces with the same address (e.g., bonding), + so look for duplicates. */ + if (find_listener_with_addr(s, &addr) != NULL) { + gpr_log(GPR_DEBUG, "Skipping duplicate addr %s on interface %s", addr_str, + ifa_name); + gpr_free(addr_str); + continue; + } + if ((err = add_addr_to_server(s, &addr, port_index, fd_index, &dsmode, + &new_sp)) != GRPC_ERROR_NONE) { + char *err_str = NULL; + grpc_error *root_err; + if (gpr_asprintf(&err_str, "Failed to add listener: %s", addr_str) < 0) { + err_str = gpr_strdup("Failed to add listener"); + } + root_err = GRPC_ERROR_CREATE(err_str); + gpr_free(err_str); + gpr_free(addr_str); + err = grpc_error_add_child(root_err, err); + break; + } else { + GPR_ASSERT(requested_port == new_sp->port); + ++fd_index; + if (sp != NULL) { + new_sp->is_sibling = 1; + sp->sibling = new_sp; + } + sp = new_sp; + } + gpr_free(addr_str); + } + freeifaddrs(ifa); + if (err != GRPC_ERROR_NONE) { + return err; + } else if (sp == NULL) { + return GRPC_ERROR_CREATE("No local addresses"); + } else { + *out_port = sp->port; + return GRPC_ERROR_NONE; + } +} + +/* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */ +static grpc_error *add_wildcard_addrs_to_server(grpc_tcp_server *s, + unsigned port_index, + int requested_port, + int *out_port) { + grpc_resolved_address wild4; + grpc_resolved_address wild6; + unsigned fd_index = 0; + grpc_dualstack_mode dsmode; + grpc_tcp_listener *sp = NULL; + grpc_tcp_listener *sp2 = NULL; + grpc_error *v6_err = GRPC_ERROR_NONE; + grpc_error *v4_err = GRPC_ERROR_NONE; + *out_port = -1; + if (s->expand_wildcard_addrs) { + return add_all_local_addrs_to_server(s, port_index, requested_port, + out_port); + } + grpc_sockaddr_make_wildcards(requested_port, &wild4, &wild6); + /* Try listening on IPv6 first. */ + if ((v6_err = add_addr_to_server(s, &wild6, port_index, fd_index, &dsmode, + &sp)) == GRPC_ERROR_NONE) { + ++fd_index; + requested_port = *out_port = sp->port; + if (dsmode == GRPC_DSMODE_DUALSTACK || dsmode == GRPC_DSMODE_IPV4) { + return GRPC_ERROR_NONE; + } + } + /* If we got a v6-only socket or nothing, try adding 0.0.0.0. */ + grpc_sockaddr_set_port(&wild4, requested_port); + if ((v4_err = add_addr_to_server(s, &wild4, port_index, fd_index, &dsmode, + &sp2)) == GRPC_ERROR_NONE) { + *out_port = sp2->port; + if (sp != NULL) { + sp2->is_sibling = 1; + sp->sibling = sp2; + } + } + if (*out_port > 0) { + GRPC_LOG_IF_ERROR("Failed to add :: listener", v6_err); + GRPC_LOG_IF_ERROR("Failed to add 0.0.0.0 listener", v4_err); + return GRPC_ERROR_NONE; + } else { + grpc_error *root_err = + GRPC_ERROR_CREATE("Failed to add any wildcard listeners"); + GPR_ASSERT(v6_err != GRPC_ERROR_NONE && v4_err != GRPC_ERROR_NONE); + root_err = grpc_error_add_child(root_err, v6_err); + root_err = grpc_error_add_child(root_err, v4_err); + return root_err; + } +} + static grpc_error *clone_port(grpc_tcp_listener *listener, unsigned count) { grpc_tcp_listener *sp = NULL; char *addr_str; @@ -559,19 +787,13 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, const grpc_resolved_address *addr, int *out_port) { grpc_tcp_listener *sp; - grpc_tcp_listener *sp2 = NULL; - int fd; - grpc_dualstack_mode dsmode; - grpc_resolved_address addr6_v4mapped; - grpc_resolved_address wild4; - grpc_resolved_address wild6; - grpc_resolved_address addr4_copy; - grpc_resolved_address *allocated_addr = NULL; grpc_resolved_address sockname_temp; - int port; + grpc_resolved_address addr6_v4mapped; + int requested_port = grpc_sockaddr_get_port(addr); unsigned port_index = 0; - unsigned fd_index = 0; - grpc_error *errs[2] = {GRPC_ERROR_NONE, GRPC_ERROR_NONE}; + grpc_dualstack_mode dsmode; + grpc_error *err; + *out_port = -1; if (s->tail != NULL) { port_index = s->tail->port_index + 1; } @@ -579,85 +801,34 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s, /* Check if this is a wildcard port, and if so, try to keep the port the same as some previously created listener. */ - if (grpc_sockaddr_get_port(addr) == 0) { + if (requested_port == 0) { for (sp = s->head; sp; sp = sp->next) { sockname_temp.len = sizeof(struct sockaddr_storage); - if (0 == getsockname(sp->fd, (struct sockaddr *)sockname_temp.addr, + if (0 == getsockname(sp->fd, (struct sockaddr *)&sockname_temp.addr, (socklen_t *)&sockname_temp.len)) { - port = grpc_sockaddr_get_port(&sockname_temp); - if (port > 0) { - allocated_addr = gpr_malloc(sizeof(grpc_resolved_address)); - memcpy(allocated_addr, addr, addr->len); - grpc_sockaddr_set_port(allocated_addr, port); - addr = allocated_addr; + int used_port = grpc_sockaddr_get_port(&sockname_temp); + if (used_port > 0) { + memcpy(&sockname_temp, addr, sizeof(grpc_resolved_address)); + grpc_sockaddr_set_port(&sockname_temp, used_port); + requested_port = used_port; + addr = &sockname_temp; break; } } } } - - sp = NULL; - + if (grpc_sockaddr_is_wildcard(addr, &requested_port)) { + return add_wildcard_addrs_to_server(s, port_index, requested_port, + out_port); + } if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) { addr = &addr6_v4mapped; } - - /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */ - if (grpc_sockaddr_is_wildcard(addr, &port)) { - grpc_sockaddr_make_wildcards(port, &wild4, &wild6); - - /* Try listening on IPv6 first. */ - addr = &wild6; - errs[0] = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode, &fd); - if (errs[0] == GRPC_ERROR_NONE) { - errs[0] = add_socket_to_server(s, fd, addr, port_index, fd_index, &sp); - if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { - goto done; - } - if (sp != NULL) { - ++fd_index; - } - /* If we didn't get a dualstack socket, also listen on 0.0.0.0. */ - if (port == 0 && sp != NULL) { - grpc_sockaddr_set_port(&wild4, sp->port); - } - } - addr = &wild4; - } - - errs[1] = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode, &fd); - if (errs[1] == GRPC_ERROR_NONE) { - if (dsmode == GRPC_DSMODE_IPV4 && - grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) { - addr = &addr4_copy; - } - sp2 = sp; - errs[1] = add_socket_to_server(s, fd, addr, port_index, fd_index, &sp); - if (sp2 != NULL && sp != NULL) { - sp2->sibling = sp; - sp->is_sibling = 1; - } - } - -done: - gpr_free(allocated_addr); - if (sp != NULL) { + if ((err = add_addr_to_server(s, addr, port_index, 0, &dsmode, &sp)) == + GRPC_ERROR_NONE) { *out_port = sp->port; - GRPC_ERROR_UNREF(errs[0]); - GRPC_ERROR_UNREF(errs[1]); - return GRPC_ERROR_NONE; - } else { - *out_port = -1; - char *addr_str = grpc_sockaddr_to_uri(addr); - grpc_error *err = grpc_error_set_str( - GRPC_ERROR_CREATE_REFERENCING("Failed to add port to server", errs, - GPR_ARRAY_SIZE(errs)), - GRPC_ERROR_STR_TARGET_ADDRESS, addr_str); - GRPC_ERROR_UNREF(errs[0]); - GRPC_ERROR_UNREF(errs[1]); - gpr_free(addr_str); - return err; } + return err; } /* Return listener at port_index or NULL. Should only be called with s->mu diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c index 3b23b47d4f..2a1c8d39fa 100644 --- a/src/core/lib/iomgr/udp_server.c +++ b/src/core/lib/iomgr/udp_server.c @@ -76,8 +76,10 @@ struct grpc_udp_listener { grpc_udp_server *server; grpc_resolved_address addr; grpc_closure read_closure; + grpc_closure write_closure; grpc_closure destroyed_closure; grpc_udp_server_read_cb read_cb; + grpc_udp_server_write_cb write_cb; grpc_udp_server_orphan_cb orphan_cb; struct grpc_udp_listener *next; @@ -176,7 +178,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { /* Call the orphan_cb to signal that the FD is about to be closed and * should no longer be used. */ GPR_ASSERT(sp->orphan_cb); - sp->orphan_cb(sp->emfd); + sp->orphan_cb(exec_ctx, sp->emfd); grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL, "udp_listener_shutdown"); @@ -202,7 +204,7 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, if (s->active_ports) { for (sp = s->head; sp; sp = sp->next) { GPR_ASSERT(sp->orphan_cb); - sp->orphan_cb(sp->emfd); + sp->orphan_cb(exec_ctx, sp->emfd); grpc_fd_shutdown(exec_ctx, sp->emfd, GRPC_ERROR_CREATE("Server destroyed")); } @@ -304,9 +306,33 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { gpr_mu_unlock(&sp->server->mu); } +static void on_write(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + grpc_udp_listener *sp = arg; + + gpr_mu_lock(&(sp->server->mu)); + if (error != GRPC_ERROR_NONE) { + if (0 == --sp->server->active_ports) { + gpr_mu_unlock(&sp->server->mu); + deactivated_all_ports(exec_ctx, sp->server); + } else { + gpr_mu_unlock(&sp->server->mu); + } + return; + } + + /* Tell the registered callback that the socket is writeable. */ + GPR_ASSERT(sp->write_cb); + sp->write_cb(exec_ctx, sp->emfd); + + /* Re-arm the notification event so we get another chance to write. */ + grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure); + gpr_mu_unlock(&sp->server->mu); +} + static int add_socket_to_server(grpc_udp_server *s, int fd, const grpc_resolved_address *addr, grpc_udp_server_read_cb read_cb, + grpc_udp_server_write_cb write_cb, grpc_udp_server_orphan_cb orphan_cb) { grpc_udp_listener *sp; int port; @@ -333,6 +359,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, sp->emfd = grpc_fd_create(fd, name); memcpy(&sp->addr, addr, sizeof(grpc_resolved_address)); sp->read_cb = read_cb; + sp->write_cb = write_cb; sp->orphan_cb = orphan_cb; GPR_ASSERT(sp->emfd); gpr_mu_unlock(&s->mu); @@ -345,6 +372,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, int grpc_udp_server_add_port(grpc_udp_server *s, const grpc_resolved_address *addr, grpc_udp_server_read_cb read_cb, + grpc_udp_server_write_cb write_cb, grpc_udp_server_orphan_cb orphan_cb) { grpc_udp_listener *sp; int allocated_port1 = -1; @@ -391,7 +419,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, // TODO(rjshade): Test and propagate the returned grpc_error*: GRPC_ERROR_UNREF(grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd)); - allocated_port1 = add_socket_to_server(s, fd, addr, read_cb, orphan_cb); + allocated_port1 = + add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb); if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { goto done; } @@ -413,7 +442,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) { addr = &addr4_copy; } - allocated_port2 = add_socket_to_server(s, fd, addr, read_cb, orphan_cb); + allocated_port2 = + add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb); done: gpr_free(allocated_addr); @@ -451,6 +481,10 @@ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, grpc_schedule_on_exec_ctx); grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); + grpc_closure_init(&sp->write_closure, on_write, sp, + grpc_schedule_on_exec_ctx); + grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure); + s->active_ports++; sp = sp->next; } diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h index f3c466a031..ed63fa7d81 100644 --- a/src/core/lib/iomgr/udp_server.h +++ b/src/core/lib/iomgr/udp_server.h @@ -49,8 +49,13 @@ typedef struct grpc_udp_server grpc_udp_server; typedef void (*grpc_udp_server_read_cb)(grpc_exec_ctx *exec_ctx, grpc_fd *emfd, struct grpc_server *server); +/* Called when the socket is writeable. */ +typedef void (*grpc_udp_server_write_cb)(grpc_exec_ctx *exec_ctx, + grpc_fd *emfd); + /* Called when the grpc_fd is about to be orphaned (and the FD closed). */ -typedef void (*grpc_udp_server_orphan_cb)(grpc_fd *emfd); +typedef void (*grpc_udp_server_orphan_cb)(grpc_exec_ctx *exec_ctx, + grpc_fd *emfd); /* Create a server, initially not bound to any ports */ grpc_udp_server *grpc_udp_server_create(void); @@ -75,6 +80,7 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index); int grpc_udp_server_add_port(grpc_udp_server *s, const grpc_resolved_address *addr, grpc_udp_server_read_cb read_cb, + grpc_udp_server_write_cb write_cb, grpc_udp_server_orphan_cb orphan_cb); void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *server, |