aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/udp_server.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr/udp_server.c')
-rw-r--r--src/core/lib/iomgr/udp_server.c62
1 files changed, 46 insertions, 16 deletions
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index 71e295770a..60579e18ba 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -59,11 +59,13 @@
#include <grpc/support/string_util.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
+#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include "src/core/lib/iomgr/socket_factory_posix.h"
#include "src/core/lib/iomgr/socket_utils_posix.h"
#include "src/core/lib/iomgr/unix_sockets_posix.h"
#include "src/core/lib/support/string.h"
@@ -89,6 +91,9 @@ struct grpc_udp_listener {
struct grpc_udp_server {
gpr_mu mu;
+ /* factory to use for creating and binding sockets, or NULL */
+ grpc_socket_factory *socket_factory;
+
/* active port count: how many ports are actually still listening */
size_t active_ports;
/* destroyed port count: how many ports are completely destroyed */
@@ -113,9 +118,24 @@ struct grpc_udp_server {
void *user_data;
};
-grpc_udp_server *grpc_udp_server_create(void) {
+static grpc_socket_factory *get_socket_factory(const grpc_channel_args *args) {
+ if (args) {
+ const grpc_arg *arg = grpc_channel_args_find(args, GRPC_ARG_SOCKET_FACTORY);
+ if (arg) {
+ GPR_ASSERT(arg->type == GRPC_ARG_POINTER);
+ return arg->value.pointer.p;
+ }
+ }
+ return NULL;
+}
+
+grpc_udp_server *grpc_udp_server_create(const grpc_channel_args *args) {
grpc_udp_server *s = gpr_malloc(sizeof(grpc_udp_server));
gpr_mu_init(&s->mu);
+ s->socket_factory = get_socket_factory(args);
+ if (s->socket_factory) {
+ grpc_socket_factory_ref(s->socket_factory);
+ }
s->active_ports = 0;
s->destroyed_ports = 0;
s->shutdown = 0;
@@ -139,6 +159,10 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
gpr_free(sp);
}
+ if (s->socket_factory) {
+ grpc_socket_factory_unref(s->socket_factory);
+ }
+
gpr_free(s);
}
@@ -162,10 +186,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
/* delete ALL the things */
gpr_mu_lock(&s->mu);
- if (!s->shutdown) {
- gpr_mu_unlock(&s->mu);
- return;
- }
+ GPR_ASSERT(s->shutdown);
if (s->head) {
grpc_udp_listener *sp;
@@ -205,8 +226,8 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
for (sp = s->head; sp; sp = sp->next) {
GPR_ASSERT(sp->orphan_cb);
sp->orphan_cb(exec_ctx, sp->emfd, sp->server->user_data);
- grpc_fd_shutdown(exec_ctx, sp->emfd,
- GRPC_ERROR_CREATE("Server destroyed"));
+ grpc_fd_shutdown(exec_ctx, sp->emfd, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ "Server destroyed"));
}
gpr_mu_unlock(&s->mu);
} else {
@@ -215,8 +236,17 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
}
}
+static int bind_socket(grpc_socket_factory *socket_factory, int sockfd,
+ const grpc_resolved_address *addr) {
+ return (socket_factory != NULL)
+ ? grpc_socket_factory_bind(socket_factory, sockfd, addr)
+ : bind(sockfd, (struct sockaddr *)addr->addr,
+ (socklen_t)addr->len);
+}
+
/* Prepare a recently-created socket for listening. */
-static int prepare_socket(int fd, const grpc_resolved_address *addr) {
+static int prepare_socket(grpc_socket_factory *socket_factory, int fd,
+ const grpc_resolved_address *addr) {
grpc_resolved_address sockname_temp;
struct sockaddr *addr_ptr = (struct sockaddr *)addr->addr;
/* Set send/receive socket buffers to 1 MB */
@@ -246,7 +276,7 @@ static int prepare_socket(int fd, const grpc_resolved_address *addr) {
}
GPR_ASSERT(addr->len < ~(socklen_t)0);
- if (bind(fd, (struct sockaddr *)addr, (socklen_t)addr->len) < 0) {
+ if (bind_socket(socket_factory, fd, addr) < 0) {
char *addr_str;
grpc_sockaddr_to_string(&addr_str, addr, 0);
gpr_log(GPR_ERROR, "bind addr=%s: %s", addr_str, strerror(errno));
@@ -288,7 +318,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
gpr_mu_lock(&sp->server->mu);
if (error != GRPC_ERROR_NONE) {
- if (0 == --sp->server->active_ports) {
+ if (0 == --sp->server->active_ports && sp->server->shutdown) {
gpr_mu_unlock(&sp->server->mu);
deactivated_all_ports(exec_ctx, sp->server);
} else {
@@ -311,7 +341,7 @@ static void on_write(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
gpr_mu_lock(&(sp->server->mu));
if (error != GRPC_ERROR_NONE) {
- if (0 == --sp->server->active_ports) {
+ if (0 == --sp->server->active_ports && sp->server->shutdown) {
gpr_mu_unlock(&sp->server->mu);
deactivated_all_ports(exec_ctx, sp->server);
} else {
@@ -339,7 +369,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd,
char *addr_str;
char *name;
- port = prepare_socket(fd, addr);
+ port = prepare_socket(s->socket_factory, fd, addr);
if (port >= 0) {
grpc_sockaddr_to_string(&addr_str, addr, 1);
gpr_asprintf(&name, "udp-server-listener:%s", addr_str);
@@ -417,8 +447,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s,
/* Try listening on IPv6 first. */
addr = &wild6;
// TODO(rjshade): Test and propagate the returned grpc_error*:
- GRPC_ERROR_UNREF(grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP,
- &dsmode, &fd));
+ GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory(
+ s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd));
allocated_port1 =
add_socket_to_server(s, fd, addr, read_cb, write_cb, orphan_cb);
if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
@@ -433,8 +463,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s,
}
// TODO(rjshade): Test and propagate the returned grpc_error*:
- GRPC_ERROR_UNREF(grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP,
- &dsmode, &fd));
+ GRPC_ERROR_UNREF(grpc_create_dualstack_socket_using_factory(
+ s->socket_factory, addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd));
if (fd < 0) {
gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno));
}