aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr/tcp_server_posix.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/iomgr/tcp_server_posix.c')
-rw-r--r--src/core/iomgr/tcp_server_posix.c83
1 files changed, 58 insertions, 25 deletions
diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c
index c49f3e1518..2ac35f863a 100644
--- a/src/core/iomgr/tcp_server_posix.c
+++ b/src/core/iomgr/tcp_server_posix.c
@@ -85,6 +85,7 @@ typedef struct {
} addr;
int addr_len;
grpc_iomgr_closure read_closure;
+ grpc_iomgr_closure destroyed_closure;
} server_port;
static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) {
@@ -101,13 +102,14 @@ struct grpc_tcp_server {
void *cb_arg;
gpr_mu mu;
- gpr_cv cv;
/* active port count: how many ports are actually still listening */
size_t active_ports;
/* destroyed port count: how many ports are completely destroyed */
size_t destroyed_ports;
+ int shutdown;
+
/* all listening ports */
server_port *ports;
size_t nports;
@@ -116,14 +118,17 @@ struct grpc_tcp_server {
/* shutdown callback */
void (*shutdown_complete)(void *);
void *shutdown_complete_arg;
+
+ grpc_pollset **pollsets;
+ size_t pollset_count;
};
grpc_tcp_server *grpc_tcp_server_create(void) {
grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server));
gpr_mu_init(&s->mu);
- gpr_cv_init(&s->cv);
s->active_ports = 0;
s->destroyed_ports = 0;
+ s->shutdown = 0;
s->cb = NULL;
s->cb_arg = NULL;
s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP);
@@ -136,7 +141,6 @@ static void finish_shutdown(grpc_tcp_server *s) {
s->shutdown_complete(s->shutdown_complete_arg);
gpr_mu_destroy(&s->mu);
- gpr_cv_destroy(&s->cv);
gpr_free(s->ports);
gpr_free(s);
@@ -156,40 +160,57 @@ static void destroyed_port(void *server, int success) {
static void dont_care_about_shutdown_completion(void *ignored) {}
+static void deactivated_all_ports(grpc_tcp_server *s) {
+ size_t i;
+
+ /* delete ALL the things */
+ gpr_mu_lock(&s->mu);
+
+ if (!s->shutdown) {
+ gpr_mu_unlock(&s->mu);
+ return;
+ }
+
+ if (s->nports) {
+ for (i = 0; i < s->nports; i++) {
+ server_port *sp = &s->ports[i];
+ if (sp->addr.sockaddr.sa_family == AF_UNIX) {
+ unlink_if_unix_domain_socket(&sp->addr.un);
+ }
+ sp->destroyed_closure.cb = destroyed_port;
+ sp->destroyed_closure.cb_arg = s;
+ grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, "tcp_listener_shutdown");
+ }
+ gpr_mu_unlock(&s->mu);
+ } else {
+ gpr_mu_unlock(&s->mu);
+ finish_shutdown(s);
+ }
+}
+
void grpc_tcp_server_destroy(
grpc_tcp_server *s, void (*shutdown_complete)(void *shutdown_complete_arg),
void *shutdown_complete_arg) {
size_t i;
gpr_mu_lock(&s->mu);
+ GPR_ASSERT(!s->shutdown);
+ s->shutdown = 1;
+
s->shutdown_complete = shutdown_complete
? shutdown_complete
: dont_care_about_shutdown_completion;
s->shutdown_complete_arg = shutdown_complete_arg;
/* shutdown all fd's */
- for (i = 0; i < s->nports; i++) {
- grpc_fd_shutdown(s->ports[i].emfd);
- }
- /* wait while that happens */
- /* TODO(ctiller): make this asynchronous also */
- while (s->active_ports) {
- gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future);
- }
-
- /* delete ALL the things */
- if (s->nports) {
+ if (s->active_ports) {
for (i = 0; i < s->nports; i++) {
- server_port *sp = &s->ports[i];
- if (sp->addr.sockaddr.sa_family == AF_UNIX) {
- unlink_if_unix_domain_socket(&sp->addr.un);
- }
- grpc_fd_orphan(sp->emfd, destroyed_port, s);
+ grpc_fd_shutdown(s->ports[i].emfd);
}
gpr_mu_unlock(&s->mu);
} else {
gpr_mu_unlock(&s->mu);
- finish_shutdown(s);
+ deactivated_all_ports(s);
}
}
@@ -274,6 +295,8 @@ error:
/* event manager callback when reads are ready */
static void on_read(void *arg, int success) {
server_port *sp = arg;
+ grpc_fd *fdobj;
+ size_t i;
if (!success) {
goto error;
@@ -306,12 +329,18 @@ static void on_read(void *arg, int success) {
grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1);
gpr_asprintf(&name, "tcp-server-connection:%s", addr_str);
+ fdobj = grpc_fd_create(fd, name);
+ /* TODO(ctiller): revise this when we have server-side sharding
+ of channels -- we certainly should not be automatically adding every
+ incoming channel to every pollset owned by the server */
+ for (i = 0; i < sp->server->pollset_count; i++) {
+ grpc_pollset_add_fd(sp->server->pollsets[i], fdobj);
+ }
sp->server->cb(sp->server->cb_arg,
- grpc_tcp_create(grpc_fd_create(fd, name),
- GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
+ grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
- gpr_free(addr_str);
gpr_free(name);
+ gpr_free(addr_str);
}
abort();
@@ -319,9 +348,11 @@ static void on_read(void *arg, int success) {
error:
gpr_mu_lock(&sp->server->mu);
if (0 == --sp->server->active_ports) {
- gpr_cv_broadcast(&sp->server->cv);
+ gpr_mu_unlock(&sp->server->mu);
+ deactivated_all_ports(sp->server);
+ } else {
+ gpr_mu_unlock(&sp->server->mu);
}
- gpr_mu_unlock(&sp->server->mu);
}
static int add_socket_to_server(grpc_tcp_server *s, int fd,
@@ -452,6 +483,8 @@ void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollsets,
GPR_ASSERT(s->active_ports == 0);
s->cb = cb;
s->cb_arg = cb_arg;
+ s->pollsets = pollsets;
+ s->pollset_count = pollset_count;
for (i = 0; i < s->nports; i++) {
for (j = 0; j < pollset_count; j++) {
grpc_pollset_add_fd(pollsets[j], s->ports[i].emfd);