diff options
Diffstat (limited to 'src/core/lib/iomgr/udp_server.cc')
-rw-r--r-- | src/core/lib/iomgr/udp_server.cc | 131 |
1 files changed, 65 insertions, 66 deletions
diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index 00b2e68bb5..8ce8b961ff 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -59,8 +59,8 @@ typedef struct grpc_udp_listener grpc_udp_listener; struct grpc_udp_listener { int fd; - grpc_fd *emfd; - grpc_udp_server *server; + grpc_fd* emfd; + grpc_udp_server* server; grpc_resolved_address addr; grpc_closure read_closure; grpc_closure write_closure; @@ -74,12 +74,12 @@ struct grpc_udp_listener { // True if orphan_cb is trigered. bool orphan_notified; - struct grpc_udp_listener *next; + struct grpc_udp_listener* next; }; struct shutdown_fd_args { - grpc_fd *fd; - gpr_mu *server_mu; + grpc_fd* fd; + gpr_mu* server_mu; }; /* the overall server */ @@ -87,7 +87,7 @@ struct grpc_udp_server { gpr_mu mu; /* factory to use for creating and binding sockets, or NULL */ - grpc_socket_factory *socket_factory; + grpc_socket_factory* socket_factory; /* active port count: how many ports are actually still listening */ size_t active_ports; @@ -98,34 +98,34 @@ struct grpc_udp_server { int shutdown; /* linked list of server ports */ - grpc_udp_listener *head; - grpc_udp_listener *tail; + grpc_udp_listener* head; + grpc_udp_listener* tail; unsigned nports; /* shutdown callback */ - grpc_closure *shutdown_complete; + grpc_closure* shutdown_complete; /* all pollsets interested in new connections */ - grpc_pollset **pollsets; + grpc_pollset** pollsets; /* number of pollsets in the pollsets array */ size_t pollset_count; /* opaque object to pass to callbacks */ - void *user_data; + void* user_data; }; -static grpc_socket_factory *get_socket_factory(const grpc_channel_args *args) { +static grpc_socket_factory* get_socket_factory(const grpc_channel_args* args) { if (args) { - const grpc_arg *arg = grpc_channel_args_find(args, GRPC_ARG_SOCKET_FACTORY); + const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_SOCKET_FACTORY); if (arg) { GPR_ASSERT(arg->type == GRPC_ARG_POINTER); - return (grpc_socket_factory *)arg->value.pointer.p; + return (grpc_socket_factory*)arg->value.pointer.p; } } return NULL; } -grpc_udp_server *grpc_udp_server_create(const grpc_channel_args *args) { - grpc_udp_server *s = (grpc_udp_server *)gpr_malloc(sizeof(grpc_udp_server)); +grpc_udp_server* grpc_udp_server_create(const grpc_channel_args* args) { + grpc_udp_server* s = (grpc_udp_server*)gpr_malloc(sizeof(grpc_udp_server)); gpr_mu_init(&s->mu); s->socket_factory = get_socket_factory(args); if (s->socket_factory) { @@ -141,20 +141,20 @@ grpc_udp_server *grpc_udp_server_create(const grpc_channel_args *args) { return s; } -static void shutdown_fd(grpc_exec_ctx *exec_ctx, void *args, - grpc_error *error) { - struct shutdown_fd_args *shutdown_args = (struct shutdown_fd_args *)args; +static void shutdown_fd(grpc_exec_ctx* exec_ctx, void* args, + grpc_error* error) { + struct shutdown_fd_args* shutdown_args = (struct shutdown_fd_args*)args; gpr_mu_lock(shutdown_args->server_mu); grpc_fd_shutdown(exec_ctx, shutdown_args->fd, GRPC_ERROR_REF(error)); gpr_mu_unlock(shutdown_args->server_mu); gpr_free(shutdown_args); } -static void dummy_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { +static void dummy_cb(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { // No-op. } -static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { +static void finish_shutdown(grpc_exec_ctx* exec_ctx, grpc_udp_server* s) { if (s->shutdown_complete != NULL) { GRPC_CLOSURE_SCHED(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE); } @@ -162,7 +162,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { gpr_mu_destroy(&s->mu); while (s->head) { - grpc_udp_listener *sp = s->head; + grpc_udp_listener* sp = s->head; s->head = sp->next; gpr_free(sp); } @@ -174,9 +174,9 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { gpr_free(s); } -static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server, - grpc_error *error) { - grpc_udp_server *s = (grpc_udp_server *)server; +static void destroyed_port(grpc_exec_ctx* exec_ctx, void* server, + grpc_error* error) { + grpc_udp_server* s = (grpc_udp_server*)server; gpr_mu_lock(&s->mu); s->destroyed_ports++; if (s->destroyed_ports == s->nports) { @@ -190,14 +190,14 @@ static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server, /* called when all listening endpoints have been shutdown, so no further events will be received on them - at this point it's safe to destroy things */ -static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { +static void deactivated_all_ports(grpc_exec_ctx* exec_ctx, grpc_udp_server* s) { /* delete ALL the things */ gpr_mu_lock(&s->mu); GPR_ASSERT(s->shutdown); if (s->head) { - grpc_udp_listener *sp; + grpc_udp_listener* sp; for (sp = s->head; sp; sp = sp->next) { grpc_unlink_if_unix_domain_socket(&sp->addr); @@ -223,9 +223,9 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { } } -void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, - grpc_closure *on_done) { - grpc_udp_listener *sp; +void grpc_udp_server_destroy(grpc_exec_ctx* exec_ctx, grpc_udp_server* s, + grpc_closure* on_done) { + grpc_udp_listener* sp; gpr_mu_lock(&s->mu); GPR_ASSERT(!s->shutdown); @@ -237,8 +237,8 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, if (s->active_ports) { for (sp = s->head; sp; sp = sp->next) { GPR_ASSERT(sp->orphan_cb); - struct shutdown_fd_args *args = - (struct shutdown_fd_args *)gpr_malloc(sizeof(*args)); + struct shutdown_fd_args* args = + (struct shutdown_fd_args*)gpr_malloc(sizeof(*args)); args->fd = sp->emfd; args->server_mu = &s->mu; GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, shutdown_fd, args, @@ -254,19 +254,18 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, } } -static int bind_socket(grpc_socket_factory *socket_factory, int sockfd, - const grpc_resolved_address *addr) { +static int bind_socket(grpc_socket_factory* socket_factory, int sockfd, + const grpc_resolved_address* addr) { return (socket_factory != NULL) ? grpc_socket_factory_bind(socket_factory, sockfd, addr) - : bind(sockfd, (struct sockaddr *)addr->addr, - (socklen_t)addr->len); + : bind(sockfd, (struct sockaddr*)addr->addr, (socklen_t)addr->len); } /* Prepare a recently-created socket for listening. */ -static int prepare_socket(grpc_socket_factory *socket_factory, int fd, - const grpc_resolved_address *addr) { +static int prepare_socket(grpc_socket_factory* socket_factory, int fd, + const grpc_resolved_address* addr) { grpc_resolved_address sockname_temp; - struct sockaddr *addr_ptr = (struct sockaddr *)addr->addr; + struct sockaddr* addr_ptr = (struct sockaddr*)addr->addr; /* Set send/receive socket buffers to 1 MB */ int buffer_size_bytes = 1024 * 1024; @@ -295,7 +294,7 @@ static int prepare_socket(grpc_socket_factory *socket_factory, int fd, GPR_ASSERT(addr->len < ~(socklen_t)0); if (bind_socket(socket_factory, fd, addr) < 0) { - char *addr_str; + char* addr_str; grpc_sockaddr_to_string(&addr_str, addr, 0); gpr_log(GPR_ERROR, "bind addr=%s: %s", addr_str, strerror(errno)); gpr_free(addr_str); @@ -304,8 +303,8 @@ static int prepare_socket(grpc_socket_factory *socket_factory, int fd, sockname_temp.len = sizeof(struct sockaddr_storage); - if (getsockname(fd, (struct sockaddr *)sockname_temp.addr, - (socklen_t *)&sockname_temp.len) < 0) { + if (getsockname(fd, (struct sockaddr*)sockname_temp.addr, + (socklen_t*)&sockname_temp.len) < 0) { goto error; } @@ -331,8 +330,8 @@ error: } /* event manager callback when reads are ready */ -static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_udp_listener *sp = (grpc_udp_listener *)arg; +static void on_read(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { + grpc_udp_listener* sp = (grpc_udp_listener*)arg; gpr_mu_lock(&sp->server->mu); if (error != GRPC_ERROR_NONE) { @@ -354,8 +353,8 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { gpr_mu_unlock(&sp->server->mu); } -static void on_write(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - grpc_udp_listener *sp = (grpc_udp_listener *)arg; +static void on_write(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { + grpc_udp_listener* sp = (grpc_udp_listener*)arg; gpr_mu_lock(&(sp->server->mu)); if (error != GRPC_ERROR_NONE) { @@ -377,15 +376,15 @@ static void on_write(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { gpr_mu_unlock(&sp->server->mu); } -static int add_socket_to_server(grpc_udp_server *s, int fd, - const grpc_resolved_address *addr, +static int add_socket_to_server(grpc_udp_server* s, int fd, + const grpc_resolved_address* addr, grpc_udp_server_read_cb read_cb, grpc_udp_server_write_cb write_cb, grpc_udp_server_orphan_cb orphan_cb) { - grpc_udp_listener *sp; + grpc_udp_listener* sp; int port; - char *addr_str; - char *name; + char* addr_str; + char* name; port = prepare_socket(s->socket_factory, fd, addr); if (port >= 0) { @@ -394,7 +393,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, gpr_free(addr_str); gpr_mu_lock(&s->mu); s->nports++; - sp = (grpc_udp_listener *)gpr_malloc(sizeof(grpc_udp_listener)); + sp = (grpc_udp_listener*)gpr_malloc(sizeof(grpc_udp_listener)); sp->next = NULL; if (s->head == NULL) { s->head = sp; @@ -418,12 +417,12 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, return port; } -int grpc_udp_server_add_port(grpc_udp_server *s, - const grpc_resolved_address *addr, +int grpc_udp_server_add_port(grpc_udp_server* s, + const grpc_resolved_address* addr, grpc_udp_server_read_cb read_cb, grpc_udp_server_write_cb write_cb, grpc_udp_server_orphan_cb orphan_cb) { - grpc_udp_listener *sp; + grpc_udp_listener* sp; int allocated_port1 = -1; int allocated_port2 = -1; int fd; @@ -432,7 +431,7 @@ int grpc_udp_server_add_port(grpc_udp_server *s, grpc_resolved_address wild4; grpc_resolved_address wild6; grpc_resolved_address addr4_copy; - grpc_resolved_address *allocated_addr = NULL; + grpc_resolved_address* allocated_addr = NULL; grpc_resolved_address sockname_temp; int port; @@ -441,12 +440,12 @@ int grpc_udp_server_add_port(grpc_udp_server *s, if (grpc_sockaddr_get_port(addr) == 0) { for (sp = s->head; sp; sp = sp->next) { sockname_temp.len = sizeof(struct sockaddr_storage); - if (0 == getsockname(sp->fd, (struct sockaddr *)sockname_temp.addr, - (socklen_t *)&sockname_temp.len)) { + if (0 == getsockname(sp->fd, (struct sockaddr*)sockname_temp.addr, + (socklen_t*)&sockname_temp.len)) { port = grpc_sockaddr_get_port(&sockname_temp); if (port > 0) { - allocated_addr = (grpc_resolved_address *)gpr_malloc( - sizeof(grpc_resolved_address)); + allocated_addr = + (grpc_resolved_address*)gpr_malloc(sizeof(grpc_resolved_address)); memcpy(allocated_addr, addr, sizeof(grpc_resolved_address)); grpc_sockaddr_set_port(allocated_addr, port); addr = allocated_addr; @@ -500,8 +499,8 @@ done: return allocated_port1 >= 0 ? allocated_port1 : allocated_port2; } -int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index) { - grpc_udp_listener *sp; +int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index) { + grpc_udp_listener* sp; if (port_index >= s->nports) { return -1; } @@ -512,12 +511,12 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index) { return sp->fd; } -void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, - grpc_pollset **pollsets, size_t pollset_count, - void *user_data) { +void grpc_udp_server_start(grpc_exec_ctx* exec_ctx, grpc_udp_server* s, + grpc_pollset** pollsets, size_t pollset_count, + void* user_data) { size_t i; gpr_mu_lock(&s->mu); - grpc_udp_listener *sp; + grpc_udp_listener* sp; GPR_ASSERT(s->active_ports == 0); s->pollsets = pollsets; s->user_data = user_data; |