aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr/udp_server.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr/udp_server.cc')
-rw-r--r--src/core/lib/iomgr/udp_server.cc131
1 files changed, 65 insertions, 66 deletions
diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc
index 00b2e68bb5..8ce8b961ff 100644
--- a/src/core/lib/iomgr/udp_server.cc
+++ b/src/core/lib/iomgr/udp_server.cc
@@ -59,8 +59,8 @@
typedef struct grpc_udp_listener grpc_udp_listener;
struct grpc_udp_listener {
int fd;
- grpc_fd *emfd;
- grpc_udp_server *server;
+ grpc_fd* emfd;
+ grpc_udp_server* server;
grpc_resolved_address addr;
grpc_closure read_closure;
grpc_closure write_closure;
@@ -74,12 +74,12 @@ struct grpc_udp_listener {
// True if orphan_cb is trigered.
bool orphan_notified;
- struct grpc_udp_listener *next;
+ struct grpc_udp_listener* next;
};
struct shutdown_fd_args {
- grpc_fd *fd;
- gpr_mu *server_mu;
+ grpc_fd* fd;
+ gpr_mu* server_mu;
};
/* the overall server */
@@ -87,7 +87,7 @@ struct grpc_udp_server {
gpr_mu mu;
/* factory to use for creating and binding sockets, or NULL */
- grpc_socket_factory *socket_factory;
+ grpc_socket_factory* socket_factory;
/* active port count: how many ports are actually still listening */
size_t active_ports;
@@ -98,34 +98,34 @@ struct grpc_udp_server {
int shutdown;
/* linked list of server ports */
- grpc_udp_listener *head;
- grpc_udp_listener *tail;
+ grpc_udp_listener* head;
+ grpc_udp_listener* tail;
unsigned nports;
/* shutdown callback */
- grpc_closure *shutdown_complete;
+ grpc_closure* shutdown_complete;
/* all pollsets interested in new connections */
- grpc_pollset **pollsets;
+ grpc_pollset** pollsets;
/* number of pollsets in the pollsets array */
size_t pollset_count;
/* opaque object to pass to callbacks */
- void *user_data;
+ void* user_data;
};
-static grpc_socket_factory *get_socket_factory(const grpc_channel_args *args) {
+static grpc_socket_factory* get_socket_factory(const grpc_channel_args* args) {
if (args) {
- const grpc_arg *arg = grpc_channel_args_find(args, GRPC_ARG_SOCKET_FACTORY);
+ const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_SOCKET_FACTORY);
if (arg) {
GPR_ASSERT(arg->type == GRPC_ARG_POINTER);
- return (grpc_socket_factory *)arg->value.pointer.p;
+ return (grpc_socket_factory*)arg->value.pointer.p;
}
}
return NULL;
}
-grpc_udp_server *grpc_udp_server_create(const grpc_channel_args *args) {
- grpc_udp_server *s = (grpc_udp_server *)gpr_malloc(sizeof(grpc_udp_server));
+grpc_udp_server* grpc_udp_server_create(const grpc_channel_args* args) {
+ grpc_udp_server* s = (grpc_udp_server*)gpr_malloc(sizeof(grpc_udp_server));
gpr_mu_init(&s->mu);
s->socket_factory = get_socket_factory(args);
if (s->socket_factory) {
@@ -141,20 +141,20 @@ grpc_udp_server *grpc_udp_server_create(const grpc_channel_args *args) {
return s;
}
-static void shutdown_fd(grpc_exec_ctx *exec_ctx, void *args,
- grpc_error *error) {
- struct shutdown_fd_args *shutdown_args = (struct shutdown_fd_args *)args;
+static void shutdown_fd(grpc_exec_ctx* exec_ctx, void* args,
+ grpc_error* error) {
+ struct shutdown_fd_args* shutdown_args = (struct shutdown_fd_args*)args;
gpr_mu_lock(shutdown_args->server_mu);
grpc_fd_shutdown(exec_ctx, shutdown_args->fd, GRPC_ERROR_REF(error));
gpr_mu_unlock(shutdown_args->server_mu);
gpr_free(shutdown_args);
}
-static void dummy_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
+static void dummy_cb(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
// No-op.
}
-static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
+static void finish_shutdown(grpc_exec_ctx* exec_ctx, grpc_udp_server* s) {
if (s->shutdown_complete != NULL) {
GRPC_CLOSURE_SCHED(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
}
@@ -162,7 +162,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
gpr_mu_destroy(&s->mu);
while (s->head) {
- grpc_udp_listener *sp = s->head;
+ grpc_udp_listener* sp = s->head;
s->head = sp->next;
gpr_free(sp);
}
@@ -174,9 +174,9 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
gpr_free(s);
}
-static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server,
- grpc_error *error) {
- grpc_udp_server *s = (grpc_udp_server *)server;
+static void destroyed_port(grpc_exec_ctx* exec_ctx, void* server,
+ grpc_error* error) {
+ grpc_udp_server* s = (grpc_udp_server*)server;
gpr_mu_lock(&s->mu);
s->destroyed_ports++;
if (s->destroyed_ports == s->nports) {
@@ -190,14 +190,14 @@ static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server,
/* called when all listening endpoints have been shutdown, so no further
events will be received on them - at this point it's safe to destroy
things */
-static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
+static void deactivated_all_ports(grpc_exec_ctx* exec_ctx, grpc_udp_server* s) {
/* delete ALL the things */
gpr_mu_lock(&s->mu);
GPR_ASSERT(s->shutdown);
if (s->head) {
- grpc_udp_listener *sp;
+ grpc_udp_listener* sp;
for (sp = s->head; sp; sp = sp->next) {
grpc_unlink_if_unix_domain_socket(&sp->addr);
@@ -223,9 +223,9 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
}
}
-void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
- grpc_closure *on_done) {
- grpc_udp_listener *sp;
+void grpc_udp_server_destroy(grpc_exec_ctx* exec_ctx, grpc_udp_server* s,
+ grpc_closure* on_done) {
+ grpc_udp_listener* sp;
gpr_mu_lock(&s->mu);
GPR_ASSERT(!s->shutdown);
@@ -237,8 +237,8 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
if (s->active_ports) {
for (sp = s->head; sp; sp = sp->next) {
GPR_ASSERT(sp->orphan_cb);
- struct shutdown_fd_args *args =
- (struct shutdown_fd_args *)gpr_malloc(sizeof(*args));
+ struct shutdown_fd_args* args =
+ (struct shutdown_fd_args*)gpr_malloc(sizeof(*args));
args->fd = sp->emfd;
args->server_mu = &s->mu;
GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, shutdown_fd, args,
@@ -254,19 +254,18 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
}
}
-static int bind_socket(grpc_socket_factory *socket_factory, int sockfd,
- const grpc_resolved_address *addr) {
+static int bind_socket(grpc_socket_factory* socket_factory, int sockfd,
+ const grpc_resolved_address* addr) {
return (socket_factory != NULL)
? grpc_socket_factory_bind(socket_factory, sockfd, addr)
- : bind(sockfd, (struct sockaddr *)addr->addr,
- (socklen_t)addr->len);
+ : bind(sockfd, (struct sockaddr*)addr->addr, (socklen_t)addr->len);
}
/* Prepare a recently-created socket for listening. */
-static int prepare_socket(grpc_socket_factory *socket_factory, int fd,
- const grpc_resolved_address *addr) {
+static int prepare_socket(grpc_socket_factory* socket_factory, int fd,
+ const grpc_resolved_address* addr) {
grpc_resolved_address sockname_temp;
- struct sockaddr *addr_ptr = (struct sockaddr *)addr->addr;
+ struct sockaddr* addr_ptr = (struct sockaddr*)addr->addr;
/* Set send/receive socket buffers to 1 MB */
int buffer_size_bytes = 1024 * 1024;
@@ -295,7 +294,7 @@ static int prepare_socket(grpc_socket_factory *socket_factory, int fd,
GPR_ASSERT(addr->len < ~(socklen_t)0);
if (bind_socket(socket_factory, fd, addr) < 0) {
- char *addr_str;
+ char* addr_str;
grpc_sockaddr_to_string(&addr_str, addr, 0);
gpr_log(GPR_ERROR, "bind addr=%s: %s", addr_str, strerror(errno));
gpr_free(addr_str);
@@ -304,8 +303,8 @@ static int prepare_socket(grpc_socket_factory *socket_factory, int fd,
sockname_temp.len = sizeof(struct sockaddr_storage);
- if (getsockname(fd, (struct sockaddr *)sockname_temp.addr,
- (socklen_t *)&sockname_temp.len) < 0) {
+ if (getsockname(fd, (struct sockaddr*)sockname_temp.addr,
+ (socklen_t*)&sockname_temp.len) < 0) {
goto error;
}
@@ -331,8 +330,8 @@ error:
}
/* event manager callback when reads are ready */
-static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- grpc_udp_listener *sp = (grpc_udp_listener *)arg;
+static void on_read(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+ grpc_udp_listener* sp = (grpc_udp_listener*)arg;
gpr_mu_lock(&sp->server->mu);
if (error != GRPC_ERROR_NONE) {
@@ -354,8 +353,8 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
gpr_mu_unlock(&sp->server->mu);
}
-static void on_write(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
- grpc_udp_listener *sp = (grpc_udp_listener *)arg;
+static void on_write(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) {
+ grpc_udp_listener* sp = (grpc_udp_listener*)arg;
gpr_mu_lock(&(sp->server->mu));
if (error != GRPC_ERROR_NONE) {
@@ -377,15 +376,15 @@ static void on_write(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
gpr_mu_unlock(&sp->server->mu);
}
-static int add_socket_to_server(grpc_udp_server *s, int fd,
- const grpc_resolved_address *addr,
+static int add_socket_to_server(grpc_udp_server* s, int fd,
+ const grpc_resolved_address* addr,
grpc_udp_server_read_cb read_cb,
grpc_udp_server_write_cb write_cb,
grpc_udp_server_orphan_cb orphan_cb) {
- grpc_udp_listener *sp;
+ grpc_udp_listener* sp;
int port;
- char *addr_str;
- char *name;
+ char* addr_str;
+ char* name;
port = prepare_socket(s->socket_factory, fd, addr);
if (port >= 0) {
@@ -394,7 +393,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd,
gpr_free(addr_str);
gpr_mu_lock(&s->mu);
s->nports++;
- sp = (grpc_udp_listener *)gpr_malloc(sizeof(grpc_udp_listener));
+ sp = (grpc_udp_listener*)gpr_malloc(sizeof(grpc_udp_listener));
sp->next = NULL;
if (s->head == NULL) {
s->head = sp;
@@ -418,12 +417,12 @@ static int add_socket_to_server(grpc_udp_server *s, int fd,
return port;
}
-int grpc_udp_server_add_port(grpc_udp_server *s,
- const grpc_resolved_address *addr,
+int grpc_udp_server_add_port(grpc_udp_server* s,
+ const grpc_resolved_address* addr,
grpc_udp_server_read_cb read_cb,
grpc_udp_server_write_cb write_cb,
grpc_udp_server_orphan_cb orphan_cb) {
- grpc_udp_listener *sp;
+ grpc_udp_listener* sp;
int allocated_port1 = -1;
int allocated_port2 = -1;
int fd;
@@ -432,7 +431,7 @@ int grpc_udp_server_add_port(grpc_udp_server *s,
grpc_resolved_address wild4;
grpc_resolved_address wild6;
grpc_resolved_address addr4_copy;
- grpc_resolved_address *allocated_addr = NULL;
+ grpc_resolved_address* allocated_addr = NULL;
grpc_resolved_address sockname_temp;
int port;
@@ -441,12 +440,12 @@ int grpc_udp_server_add_port(grpc_udp_server *s,
if (grpc_sockaddr_get_port(addr) == 0) {
for (sp = s->head; sp; sp = sp->next) {
sockname_temp.len = sizeof(struct sockaddr_storage);
- if (0 == getsockname(sp->fd, (struct sockaddr *)sockname_temp.addr,
- (socklen_t *)&sockname_temp.len)) {
+ if (0 == getsockname(sp->fd, (struct sockaddr*)sockname_temp.addr,
+ (socklen_t*)&sockname_temp.len)) {
port = grpc_sockaddr_get_port(&sockname_temp);
if (port > 0) {
- allocated_addr = (grpc_resolved_address *)gpr_malloc(
- sizeof(grpc_resolved_address));
+ allocated_addr =
+ (grpc_resolved_address*)gpr_malloc(sizeof(grpc_resolved_address));
memcpy(allocated_addr, addr, sizeof(grpc_resolved_address));
grpc_sockaddr_set_port(allocated_addr, port);
addr = allocated_addr;
@@ -500,8 +499,8 @@ done:
return allocated_port1 >= 0 ? allocated_port1 : allocated_port2;
}
-int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index) {
- grpc_udp_listener *sp;
+int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index) {
+ grpc_udp_listener* sp;
if (port_index >= s->nports) {
return -1;
}
@@ -512,12 +511,12 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index) {
return sp->fd;
}
-void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
- grpc_pollset **pollsets, size_t pollset_count,
- void *user_data) {
+void grpc_udp_server_start(grpc_exec_ctx* exec_ctx, grpc_udp_server* s,
+ grpc_pollset** pollsets, size_t pollset_count,
+ void* user_data) {
size_t i;
gpr_mu_lock(&s->mu);
- grpc_udp_listener *sp;
+ grpc_udp_listener* sp;
GPR_ASSERT(s->active_ports == 0);
s->pollsets = pollsets;
s->user_data = user_data;