diff options
Diffstat (limited to 'src/core/iomgr/tcp_server_posix.c')
-rw-r--r-- | src/core/iomgr/tcp_server_posix.c | 64 |
1 files changed, 28 insertions, 36 deletions
diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 213b2e1113..0c5e0053dd 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -117,16 +117,12 @@ struct grpc_tcp_server { size_t port_capacity; /* shutdown callback */ - void (*shutdown_complete)(void *); - void *shutdown_complete_arg; + grpc_closure *shutdown_complete; /* all pollsets interested in new connections */ grpc_pollset **pollsets; /* number of pollsets in the pollsets array */ size_t pollset_count; - - /** workqueue for interally created async work */ - grpc_workqueue *workqueue; }; grpc_tcp_server *grpc_tcp_server_create(void) { @@ -140,40 +136,37 @@ grpc_tcp_server *grpc_tcp_server_create(void) { s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); s->nports = 0; s->port_capacity = INIT_PORT_CAP; - s->workqueue = grpc_workqueue_create(); return s; } -static void finish_shutdown(grpc_tcp_server *s) { - s->shutdown_complete(s->shutdown_complete_arg); - s->shutdown_complete = NULL; +static void finish_shutdown(grpc_tcp_server *s, grpc_call_list *call_list) { + grpc_call_list_add(call_list, s->shutdown_complete, 1); gpr_mu_destroy(&s->mu); gpr_free(s->ports); - GRPC_WORKQUEUE_UNREF(s->workqueue, "destroy"); gpr_free(s); } -static void destroyed_port(void *server, int success) { +static void destroyed_port(void *server, int success, + grpc_call_list *call_list) { grpc_tcp_server *s = server; gpr_mu_lock(&s->mu); s->destroyed_ports++; if (s->destroyed_ports == s->nports) { gpr_mu_unlock(&s->mu); - finish_shutdown(s); + finish_shutdown(s, call_list); } else { GPR_ASSERT(s->destroyed_ports < s->nports); gpr_mu_unlock(&s->mu); } } -static void dont_care_about_shutdown_completion(void *ignored) {} - /* called when all listening endpoints have been shutdown, so no further events will be received on them - at this point it's safe to destroy things */ -static void deactivated_all_ports(grpc_tcp_server *s) { +static void deactivated_all_ports(grpc_tcp_server *s, + grpc_call_list *call_list) { size_t i; /* delete ALL the things */ @@ -192,38 +185,35 @@ static void deactivated_all_ports(grpc_tcp_server *s) { } sp->destroyed_closure.cb = destroyed_port; sp->destroyed_closure.cb_arg = s; - grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, "tcp_listener_shutdown"); + grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, "tcp_listener_shutdown", + call_list); } gpr_mu_unlock(&s->mu); } else { gpr_mu_unlock(&s->mu); - finish_shutdown(s); + finish_shutdown(s, call_list); } } -void grpc_tcp_server_destroy( - grpc_tcp_server *s, void (*shutdown_complete)(void *shutdown_complete_arg), - void *shutdown_complete_arg) { +void grpc_tcp_server_destroy(grpc_tcp_server *s, grpc_closure *closure, + grpc_call_list *call_list) { size_t i; gpr_mu_lock(&s->mu); GPR_ASSERT(!s->shutdown); s->shutdown = 1; - s->shutdown_complete = shutdown_complete - ? shutdown_complete - : dont_care_about_shutdown_completion; - s->shutdown_complete_arg = shutdown_complete_arg; + s->shutdown_complete = closure; /* shutdown all fd's */ if (s->active_ports) { for (i = 0; i < s->nports; i++) { - grpc_fd_shutdown(s->ports[i].emfd); + grpc_fd_shutdown(s->ports[i].emfd, call_list); } gpr_mu_unlock(&s->mu); } else { gpr_mu_unlock(&s->mu); - deactivated_all_ports(s); + deactivated_all_ports(s, call_list); } } @@ -308,7 +298,7 @@ error: } /* event manager callback when reads are ready */ -static void on_read(void *arg, int success) { +static void on_read(void *arg, int success, grpc_call_list *call_list) { server_port *sp = arg; grpc_fd *fdobj; size_t i; @@ -331,7 +321,7 @@ static void on_read(void *arg, int success) { case EINTR: continue; case EAGAIN: - grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); + grpc_fd_notify_on_read(sp->emfd, &sp->read_closure, call_list); return; default: gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno)); @@ -348,16 +338,17 @@ static void on_read(void *arg, int success) { gpr_log(GPR_DEBUG, "SERVER_CONNECT: incoming connection: %s", addr_str); } - fdobj = grpc_fd_create(fd, sp->server->workqueue, name); + fdobj = grpc_fd_create(fd, name); /* TODO(ctiller): revise this when we have server-side sharding of channels -- we certainly should not be automatically adding every incoming channel to every pollset owned by the server */ for (i = 0; i < sp->server->pollset_count; i++) { - grpc_pollset_add_fd(sp->server->pollsets[i], fdobj); + grpc_pollset_add_fd(sp->server->pollsets[i], fdobj, call_list); } sp->server->cb( sp->server->cb_arg, - grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str)); + grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str), + call_list); gpr_free(name); gpr_free(addr_str); @@ -369,7 +360,7 @@ error: gpr_mu_lock(&sp->server->mu); if (0 == --sp->server->active_ports) { gpr_mu_unlock(&sp->server->mu); - deactivated_all_ports(sp->server); + deactivated_all_ports(sp->server, call_list); } else { gpr_mu_unlock(&sp->server->mu); } @@ -396,7 +387,7 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd, sp = &s->ports[s->nports++]; sp->server = s; sp->fd = fd; - sp->emfd = grpc_fd_create(fd, s->workqueue, name); + sp->emfd = grpc_fd_create(fd, name); memcpy(sp->addr.untyped, addr, addr_len); sp->addr_len = addr_len; GPR_ASSERT(sp->emfd); @@ -495,7 +486,7 @@ int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned index) { void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollsets, size_t pollset_count, grpc_tcp_server_cb cb, - void *cb_arg) { + void *cb_arg, grpc_call_list *call_list) { size_t i, j; GPR_ASSERT(cb); gpr_mu_lock(&s->mu); @@ -507,11 +498,12 @@ void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollsets, s->pollset_count = pollset_count; for (i = 0; i < s->nports; i++) { for (j = 0; j < pollset_count; j++) { - grpc_pollset_add_fd(pollsets[j], s->ports[i].emfd); + grpc_pollset_add_fd(pollsets[j], s->ports[i].emfd, call_list); } s->ports[i].read_closure.cb = on_read; s->ports[i].read_closure.cb_arg = &s->ports[i]; - grpc_fd_notify_on_read(s->ports[i].emfd, &s->ports[i].read_closure); + grpc_fd_notify_on_read(s->ports[i].emfd, &s->ports[i].read_closure, + call_list); s->active_ports++; } gpr_mu_unlock(&s->mu); |