diff options
author | 2017-11-09 17:46:29 -0800 | |
---|---|---|
committer | 2017-11-09 17:46:29 -0800 | |
commit | 4e9265c828f0b559b5fdba04913fed46bf771399 (patch) | |
tree | 4a379fc2bdc037753cf8d81f8b86327e4bc50a42 /src/core/lib/iomgr/udp_server.cc | |
parent | 0ee7574732a06e8cace4e099a678f4bd5dbff679 (diff) | |
parent | d9da7387b8057f3bd99a417a5ee905377bce9296 (diff) |
Merge with master
Diffstat (limited to 'src/core/lib/iomgr/udp_server.cc')
-rw-r--r-- | src/core/lib/iomgr/udp_server.cc | 123 |
1 files changed, 61 insertions, 62 deletions
diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index c868e82d1d..8e837e5e2f 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -59,8 +59,8 @@ typedef struct grpc_udp_listener grpc_udp_listener; struct grpc_udp_listener { int fd; - grpc_fd *emfd; - grpc_udp_server *server; + grpc_fd* emfd; + grpc_udp_server* server; grpc_resolved_address addr; grpc_closure read_closure; grpc_closure write_closure; @@ -74,12 +74,12 @@ struct grpc_udp_listener { // True if orphan_cb is trigered. bool orphan_notified; - struct grpc_udp_listener *next; + struct grpc_udp_listener* next; }; struct shutdown_fd_args { - grpc_fd *fd; - gpr_mu *server_mu; + grpc_fd* fd; + gpr_mu* server_mu; }; /* the overall server */ @@ -87,7 +87,7 @@ struct grpc_udp_server { gpr_mu mu; /* factory to use for creating and binding sockets, or NULL */ - grpc_socket_factory *socket_factory; + grpc_socket_factory* socket_factory; /* active port count: how many ports are actually still listening */ size_t active_ports; @@ -98,34 +98,34 @@ struct grpc_udp_server { int shutdown; /* linked list of server ports */ - grpc_udp_listener *head; - grpc_udp_listener *tail; + grpc_udp_listener* head; + grpc_udp_listener* tail; unsigned nports; /* shutdown callback */ - grpc_closure *shutdown_complete; + grpc_closure* shutdown_complete; /* all pollsets interested in new connections */ - grpc_pollset **pollsets; + grpc_pollset** pollsets; /* number of pollsets in the pollsets array */ size_t pollset_count; /* opaque object to pass to callbacks */ - void *user_data; + void* user_data; }; -static grpc_socket_factory *get_socket_factory(const grpc_channel_args *args) { +static grpc_socket_factory* get_socket_factory(const grpc_channel_args* args) { if (args) { - const grpc_arg *arg = grpc_channel_args_find(args, GRPC_ARG_SOCKET_FACTORY); + const grpc_arg* arg = grpc_channel_args_find(args, GRPC_ARG_SOCKET_FACTORY); if (arg) { GPR_ASSERT(arg->type == GRPC_ARG_POINTER); - return (grpc_socket_factory *)arg->value.pointer.p; + return (grpc_socket_factory*)arg->value.pointer.p; } } return NULL; } -grpc_udp_server *grpc_udp_server_create(const grpc_channel_args *args) { - grpc_udp_server *s = (grpc_udp_server *)gpr_malloc(sizeof(grpc_udp_server)); +grpc_udp_server* grpc_udp_server_create(const grpc_channel_args* args) { + grpc_udp_server* s = (grpc_udp_server*)gpr_malloc(sizeof(grpc_udp_server)); gpr_mu_init(&s->mu); s->socket_factory = get_socket_factory(args); if (s->socket_factory) { @@ -141,19 +141,19 @@ grpc_udp_server *grpc_udp_server_create(const grpc_channel_args *args) { return s; } -static void shutdown_fd(void *args, grpc_error *error) { - struct shutdown_fd_args *shutdown_args = (struct shutdown_fd_args *)args; +static void shutdown_fd(void* args, grpc_error* error) { + struct shutdown_fd_args* shutdown_args = (struct shutdown_fd_args*)args; gpr_mu_lock(shutdown_args->server_mu); grpc_fd_shutdown(shutdown_args->fd, GRPC_ERROR_REF(error)); gpr_mu_unlock(shutdown_args->server_mu); gpr_free(shutdown_args); } -static void dummy_cb(void *arg, grpc_error *error) { +static void dummy_cb(void* arg, grpc_error* error) { // No-op. } -static void finish_shutdown(grpc_udp_server *s) { +static void finish_shutdown(grpc_udp_server* s) { if (s->shutdown_complete != NULL) { GRPC_CLOSURE_SCHED(s->shutdown_complete, GRPC_ERROR_NONE); } @@ -161,7 +161,7 @@ static void finish_shutdown(grpc_udp_server *s) { gpr_mu_destroy(&s->mu); while (s->head) { - grpc_udp_listener *sp = s->head; + grpc_udp_listener* sp = s->head; s->head = sp->next; gpr_free(sp); } @@ -173,8 +173,8 @@ static void finish_shutdown(grpc_udp_server *s) { gpr_free(s); } -static void destroyed_port(void *server, grpc_error *error) { - grpc_udp_server *s = (grpc_udp_server *)server; +static void destroyed_port(void* server, grpc_error* error) { + grpc_udp_server* s = (grpc_udp_server*)server; gpr_mu_lock(&s->mu); s->destroyed_ports++; if (s->destroyed_ports == s->nports) { @@ -188,14 +188,14 @@ static void destroyed_port(void *server, grpc_error *error) { /* called when all listening endpoints have been shutdown, so no further events will be received on them - at this point it's safe to destroy things */ -static void deactivated_all_ports(grpc_udp_server *s) { +static void deactivated_all_ports(grpc_udp_server* s) { /* delete ALL the things */ gpr_mu_lock(&s->mu); GPR_ASSERT(s->shutdown); if (s->head) { - grpc_udp_listener *sp; + grpc_udp_listener* sp; for (sp = s->head; sp; sp = sp->next) { grpc_unlink_if_unix_domain_socket(&sp->addr); @@ -220,8 +220,8 @@ static void deactivated_all_ports(grpc_udp_server *s) { } } -void grpc_udp_server_destroy(grpc_udp_server *s, grpc_closure *on_done) { - grpc_udp_listener *sp; +void grpc_udp_server_destroy(grpc_udp_server* s, grpc_closure* on_done) { + grpc_udp_listener* sp; gpr_mu_lock(&s->mu); GPR_ASSERT(!s->shutdown); @@ -233,8 +233,8 @@ void grpc_udp_server_destroy(grpc_udp_server *s, grpc_closure *on_done) { if (s->active_ports) { for (sp = s->head; sp; sp = sp->next) { GPR_ASSERT(sp->orphan_cb); - struct shutdown_fd_args *args = - (struct shutdown_fd_args *)gpr_malloc(sizeof(*args)); + struct shutdown_fd_args* args = + (struct shutdown_fd_args*)gpr_malloc(sizeof(*args)); args->fd = sp->emfd; args->server_mu = &s->mu; GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, shutdown_fd, args, @@ -249,19 +249,18 @@ void grpc_udp_server_destroy(grpc_udp_server *s, grpc_closure *on_done) { } } -static int bind_socket(grpc_socket_factory *socket_factory, int sockfd, - const grpc_resolved_address *addr) { +static int bind_socket(grpc_socket_factory* socket_factory, int sockfd, + const grpc_resolved_address* addr) { return (socket_factory != NULL) ? grpc_socket_factory_bind(socket_factory, sockfd, addr) - : bind(sockfd, (struct sockaddr *)addr->addr, - (socklen_t)addr->len); + : bind(sockfd, (struct sockaddr*)addr->addr, (socklen_t)addr->len); } /* Prepare a recently-created socket for listening. */ -static int prepare_socket(grpc_socket_factory *socket_factory, int fd, - const grpc_resolved_address *addr) { +static int prepare_socket(grpc_socket_factory* socket_factory, int fd, + const grpc_resolved_address* addr) { grpc_resolved_address sockname_temp; - struct sockaddr *addr_ptr = (struct sockaddr *)addr->addr; + struct sockaddr* addr_ptr = (struct sockaddr*)addr->addr; /* Set send/receive socket buffers to 1 MB */ int buffer_size_bytes = 1024 * 1024; @@ -290,7 +289,7 @@ static int prepare_socket(grpc_socket_factory *socket_factory, int fd, GPR_ASSERT(addr->len < ~(socklen_t)0); if (bind_socket(socket_factory, fd, addr) < 0) { - char *addr_str; + char* addr_str; grpc_sockaddr_to_string(&addr_str, addr, 0); gpr_log(GPR_ERROR, "bind addr=%s: %s", addr_str, strerror(errno)); gpr_free(addr_str); @@ -299,8 +298,8 @@ static int prepare_socket(grpc_socket_factory *socket_factory, int fd, sockname_temp.len = sizeof(struct sockaddr_storage); - if (getsockname(fd, (struct sockaddr *)sockname_temp.addr, - (socklen_t *)&sockname_temp.len) < 0) { + if (getsockname(fd, (struct sockaddr*)sockname_temp.addr, + (socklen_t*)&sockname_temp.len) < 0) { goto error; } @@ -326,8 +325,8 @@ error: } /* event manager callback when reads are ready */ -static void on_read(void *arg, grpc_error *error) { - grpc_udp_listener *sp = (grpc_udp_listener *)arg; +static void on_read(void* arg, grpc_error* error) { + grpc_udp_listener* sp = (grpc_udp_listener*)arg; gpr_mu_lock(&sp->server->mu); if (error != GRPC_ERROR_NONE) { @@ -349,8 +348,8 @@ static void on_read(void *arg, grpc_error *error) { gpr_mu_unlock(&sp->server->mu); } -static void on_write(void *arg, grpc_error *error) { - grpc_udp_listener *sp = (grpc_udp_listener *)arg; +static void on_write(void* arg, grpc_error* error) { + grpc_udp_listener* sp = (grpc_udp_listener*)arg; gpr_mu_lock(&(sp->server->mu)); if (error != GRPC_ERROR_NONE) { @@ -372,15 +371,15 @@ static void on_write(void *arg, grpc_error *error) { gpr_mu_unlock(&sp->server->mu); } -static int add_socket_to_server(grpc_udp_server *s, int fd, - const grpc_resolved_address *addr, +static int add_socket_to_server(grpc_udp_server* s, int fd, + const grpc_resolved_address* addr, grpc_udp_server_read_cb read_cb, grpc_udp_server_write_cb write_cb, grpc_udp_server_orphan_cb orphan_cb) { - grpc_udp_listener *sp; + grpc_udp_listener* sp; int port; - char *addr_str; - char *name; + char* addr_str; + char* name; port = prepare_socket(s->socket_factory, fd, addr); if (port >= 0) { @@ -389,7 +388,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, gpr_free(addr_str); gpr_mu_lock(&s->mu); s->nports++; - sp = (grpc_udp_listener *)gpr_malloc(sizeof(grpc_udp_listener)); + sp = (grpc_udp_listener*)gpr_malloc(sizeof(grpc_udp_listener)); sp->next = NULL; if (s->head == NULL) { s->head = sp; @@ -413,12 +412,12 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, return port; } -int grpc_udp_server_add_port(grpc_udp_server *s, - const grpc_resolved_address *addr, +int grpc_udp_server_add_port(grpc_udp_server* s, + const grpc_resolved_address* addr, grpc_udp_server_read_cb read_cb, grpc_udp_server_write_cb write_cb, grpc_udp_server_orphan_cb orphan_cb) { - grpc_udp_listener *sp; + grpc_udp_listener* sp; int allocated_port1 = -1; int allocated_port2 = -1; int fd; @@ -427,7 +426,7 @@ int grpc_udp_server_add_port(grpc_udp_server *s, grpc_resolved_address wild4; grpc_resolved_address wild6; grpc_resolved_address addr4_copy; - grpc_resolved_address *allocated_addr = NULL; + grpc_resolved_address* allocated_addr = NULL; grpc_resolved_address sockname_temp; int port; @@ -436,12 +435,12 @@ int grpc_udp_server_add_port(grpc_udp_server *s, if (grpc_sockaddr_get_port(addr) == 0) { for (sp = s->head; sp; sp = sp->next) { sockname_temp.len = sizeof(struct sockaddr_storage); - if (0 == getsockname(sp->fd, (struct sockaddr *)sockname_temp.addr, - (socklen_t *)&sockname_temp.len)) { + if (0 == getsockname(sp->fd, (struct sockaddr*)sockname_temp.addr, + (socklen_t*)&sockname_temp.len)) { port = grpc_sockaddr_get_port(&sockname_temp); if (port > 0) { - allocated_addr = (grpc_resolved_address *)gpr_malloc( - sizeof(grpc_resolved_address)); + allocated_addr = + (grpc_resolved_address*)gpr_malloc(sizeof(grpc_resolved_address)); memcpy(allocated_addr, addr, sizeof(grpc_resolved_address)); grpc_sockaddr_set_port(allocated_addr, port); addr = allocated_addr; @@ -495,8 +494,8 @@ done: return allocated_port1 >= 0 ? allocated_port1 : allocated_port2; } -int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index) { - grpc_udp_listener *sp; +int grpc_udp_server_get_fd(grpc_udp_server* s, unsigned port_index) { + grpc_udp_listener* sp; if (port_index >= s->nports) { return -1; } @@ -507,11 +506,11 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index) { return sp->fd; } -void grpc_udp_server_start(grpc_udp_server *s, grpc_pollset **pollsets, - size_t pollset_count, void *user_data) { +void grpc_udp_server_start(grpc_udp_server* s, grpc_pollset** pollsets, + size_t pollset_count, void* user_data) { size_t i; gpr_mu_lock(&s->mu); - grpc_udp_listener *sp; + grpc_udp_listener* sp; GPR_ASSERT(s->active_ports == 0); s->pollsets = pollsets; s->user_data = user_data; |