aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr/tcp_server_posix.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/iomgr/tcp_server_posix.c')
-rw-r--r--src/core/iomgr/tcp_server_posix.c64
1 files changed, 28 insertions, 36 deletions
diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c
index 213b2e1113..0c5e0053dd 100644
--- a/src/core/iomgr/tcp_server_posix.c
+++ b/src/core/iomgr/tcp_server_posix.c
@@ -117,16 +117,12 @@ struct grpc_tcp_server {
size_t port_capacity;
/* shutdown callback */
- void (*shutdown_complete)(void *);
- void *shutdown_complete_arg;
+ grpc_closure *shutdown_complete;
/* all pollsets interested in new connections */
grpc_pollset **pollsets;
/* number of pollsets in the pollsets array */
size_t pollset_count;
-
- /** workqueue for interally created async work */
- grpc_workqueue *workqueue;
};
grpc_tcp_server *grpc_tcp_server_create(void) {
@@ -140,40 +136,37 @@ grpc_tcp_server *grpc_tcp_server_create(void) {
s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP);
s->nports = 0;
s->port_capacity = INIT_PORT_CAP;
- s->workqueue = grpc_workqueue_create();
return s;
}
-static void finish_shutdown(grpc_tcp_server *s) {
- s->shutdown_complete(s->shutdown_complete_arg);
- s->shutdown_complete = NULL;
+static void finish_shutdown(grpc_tcp_server *s, grpc_call_list *call_list) {
+ grpc_call_list_add(call_list, s->shutdown_complete, 1);
gpr_mu_destroy(&s->mu);
gpr_free(s->ports);
- GRPC_WORKQUEUE_UNREF(s->workqueue, "destroy");
gpr_free(s);
}
-static void destroyed_port(void *server, int success) {
+static void destroyed_port(void *server, int success,
+ grpc_call_list *call_list) {
grpc_tcp_server *s = server;
gpr_mu_lock(&s->mu);
s->destroyed_ports++;
if (s->destroyed_ports == s->nports) {
gpr_mu_unlock(&s->mu);
- finish_shutdown(s);
+ finish_shutdown(s, call_list);
} else {
GPR_ASSERT(s->destroyed_ports < s->nports);
gpr_mu_unlock(&s->mu);
}
}
-static void dont_care_about_shutdown_completion(void *ignored) {}
-
/* called when all listening endpoints have been shutdown, so no further
events will be received on them - at this point it's safe to destroy
things */
-static void deactivated_all_ports(grpc_tcp_server *s) {
+static void deactivated_all_ports(grpc_tcp_server *s,
+ grpc_call_list *call_list) {
size_t i;
/* delete ALL the things */
@@ -192,38 +185,35 @@ static void deactivated_all_ports(grpc_tcp_server *s) {
}
sp->destroyed_closure.cb = destroyed_port;
sp->destroyed_closure.cb_arg = s;
- grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, "tcp_listener_shutdown");
+ grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, "tcp_listener_shutdown",
+ call_list);
}
gpr_mu_unlock(&s->mu);
} else {
gpr_mu_unlock(&s->mu);
- finish_shutdown(s);
+ finish_shutdown(s, call_list);
}
}
-void grpc_tcp_server_destroy(
- grpc_tcp_server *s, void (*shutdown_complete)(void *shutdown_complete_arg),
- void *shutdown_complete_arg) {
+void grpc_tcp_server_destroy(grpc_tcp_server *s, grpc_closure *closure,
+ grpc_call_list *call_list) {
size_t i;
gpr_mu_lock(&s->mu);
GPR_ASSERT(!s->shutdown);
s->shutdown = 1;
- s->shutdown_complete = shutdown_complete
- ? shutdown_complete
- : dont_care_about_shutdown_completion;
- s->shutdown_complete_arg = shutdown_complete_arg;
+ s->shutdown_complete = closure;
/* shutdown all fd's */
if (s->active_ports) {
for (i = 0; i < s->nports; i++) {
- grpc_fd_shutdown(s->ports[i].emfd);
+ grpc_fd_shutdown(s->ports[i].emfd, call_list);
}
gpr_mu_unlock(&s->mu);
} else {
gpr_mu_unlock(&s->mu);
- deactivated_all_ports(s);
+ deactivated_all_ports(s, call_list);
}
}
@@ -308,7 +298,7 @@ error:
}
/* event manager callback when reads are ready */
-static void on_read(void *arg, int success) {
+static void on_read(void *arg, int success, grpc_call_list *call_list) {
server_port *sp = arg;
grpc_fd *fdobj;
size_t i;
@@ -331,7 +321,7 @@ static void on_read(void *arg, int success) {
case EINTR:
continue;
case EAGAIN:
- grpc_fd_notify_on_read(sp->emfd, &sp->read_closure);
+ grpc_fd_notify_on_read(sp->emfd, &sp->read_closure, call_list);
return;
default:
gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno));
@@ -348,16 +338,17 @@ static void on_read(void *arg, int success) {
gpr_log(GPR_DEBUG, "SERVER_CONNECT: incoming connection: %s", addr_str);
}
- fdobj = grpc_fd_create(fd, sp->server->workqueue, name);
+ fdobj = grpc_fd_create(fd, name);
/* TODO(ctiller): revise this when we have server-side sharding
of channels -- we certainly should not be automatically adding every
incoming channel to every pollset owned by the server */
for (i = 0; i < sp->server->pollset_count; i++) {
- grpc_pollset_add_fd(sp->server->pollsets[i], fdobj);
+ grpc_pollset_add_fd(sp->server->pollsets[i], fdobj, call_list);
}
sp->server->cb(
sp->server->cb_arg,
- grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str));
+ grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str),
+ call_list);
gpr_free(name);
gpr_free(addr_str);
@@ -369,7 +360,7 @@ error:
gpr_mu_lock(&sp->server->mu);
if (0 == --sp->server->active_ports) {
gpr_mu_unlock(&sp->server->mu);
- deactivated_all_ports(sp->server);
+ deactivated_all_ports(sp->server, call_list);
} else {
gpr_mu_unlock(&sp->server->mu);
}
@@ -396,7 +387,7 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd,
sp = &s->ports[s->nports++];
sp->server = s;
sp->fd = fd;
- sp->emfd = grpc_fd_create(fd, s->workqueue, name);
+ sp->emfd = grpc_fd_create(fd, name);
memcpy(sp->addr.untyped, addr, addr_len);
sp->addr_len = addr_len;
GPR_ASSERT(sp->emfd);
@@ -495,7 +486,7 @@ int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned index) {
void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollsets,
size_t pollset_count, grpc_tcp_server_cb cb,
- void *cb_arg) {
+ void *cb_arg, grpc_call_list *call_list) {
size_t i, j;
GPR_ASSERT(cb);
gpr_mu_lock(&s->mu);
@@ -507,11 +498,12 @@ void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollsets,
s->pollset_count = pollset_count;
for (i = 0; i < s->nports; i++) {
for (j = 0; j < pollset_count; j++) {
- grpc_pollset_add_fd(pollsets[j], s->ports[i].emfd);
+ grpc_pollset_add_fd(pollsets[j], s->ports[i].emfd, call_list);
}
s->ports[i].read_closure.cb = on_read;
s->ports[i].read_closure.cb_arg = &s->ports[i];
- grpc_fd_notify_on_read(s->ports[i].emfd, &s->ports[i].read_closure);
+ grpc_fd_notify_on_read(s->ports[i].emfd, &s->ports[i].read_closure,
+ call_list);
s->active_ports++;
}
gpr_mu_unlock(&s->mu);