aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/iomgr/fd_posix.c28
-rw-r--r--src/core/iomgr/fd_posix.h4
-rw-r--r--src/core/iomgr/tcp_client_posix.c12
-rw-r--r--src/core/iomgr/tcp_posix.c2
-rw-r--r--src/core/iomgr/tcp_server_posix.c5
-rw-r--r--test/core/iomgr/fd_posix_test.c8
6 files changed, 30 insertions, 29 deletions
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index a43e3ed278..52a6920321 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -134,7 +134,9 @@ static void unref_by(grpc_fd *fd, int n) {
#endif
old = gpr_atm_full_fetch_add(&fd->refst, -n);
if (old == n) {
- grpc_iomgr_add_callback(&fd->on_done_closure);
+ if (fd->on_done_closure) {
+ grpc_iomgr_add_callback(fd->on_done_closure);
+ }
freelist_fd(fd);
grpc_iomgr_unregister_object(&fd->iomgr_object);
} else {
@@ -153,8 +155,6 @@ void grpc_fd_global_shutdown(void) {
gpr_mu_destroy(&fd_freelist_mu);
}
-static void do_nothing(void *ignored, int success) {}
-
grpc_fd *grpc_fd_create(int fd, const char *name) {
grpc_fd *r = alloc_fd(fd);
grpc_iomgr_register_object(&r->iomgr_object, name);
@@ -195,9 +195,8 @@ static void wake_all_watchers_locked(grpc_fd *fd) {
}
}
-void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) {
- grpc_iomgr_closure_init(&fd->on_done_closure, on_done ? on_done : do_nothing,
- user_data);
+void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done) {
+ fd->on_done_closure = on_done;
shutdown(fd->fd, SHUT_RDWR);
REF_BY(fd, 1, "orphan"); /* remove active status, but keep referenced */
gpr_mu_lock(&fd->watcher_mu);
@@ -208,21 +207,18 @@ void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) {
/* increment refcount by two to avoid changing the orphan bit */
#ifdef GRPC_FD_REF_COUNT_DEBUG
-void grpc_fd_ref(grpc_fd *fd, const char *reason, const char *file, int line) {
- ref_by(fd, 2, reason, file, line);
+void grpc_fd_ref(grpc_fd *fd, const char *reason, const char *file, int line) {
+ ref_by(fd, 2, reason, file, line);
}
-void grpc_fd_unref(grpc_fd *fd, const char *reason, const char *file, int line) {
- unref_by(fd, 2, reason, file, line);
+void grpc_fd_unref(grpc_fd *fd, const char *reason, const char *file,
+ int line) {
+ unref_by(fd, 2, reason, file, line);
}
#else
-void grpc_fd_ref(grpc_fd *fd) {
- ref_by(fd, 2);
-}
+void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
-void grpc_fd_unref(grpc_fd *fd) {
- unref_by(fd, 2);
-}
+void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
static void process_callback(grpc_iomgr_closure *closure, int success,
diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h
index 70992aead5..1de5d088c5 100644
--- a/src/core/iomgr/fd_posix.h
+++ b/src/core/iomgr/fd_posix.h
@@ -93,7 +93,7 @@ struct grpc_fd {
struct grpc_fd *freelist_next;
- grpc_iomgr_closure on_done_closure;
+ grpc_iomgr_closure *on_done_closure;
grpc_iomgr_closure *shutdown_closures[2];
grpc_iomgr_object iomgr_object;
@@ -109,7 +109,7 @@ grpc_fd *grpc_fd_create(int fd, const char *name);
If on_done is NULL, no callback will be made.
Requires: *fd initialized; no outstanding notify_on_read or
notify_on_write. */
-void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data);
+void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done);
/* Begin polling on an fd.
Registers that the given pollset is interested in this fd - so that if read
diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c
index 981c326511..93c14c4aab 100644
--- a/src/core/iomgr/tcp_client_posix.c
+++ b/src/core/iomgr/tcp_client_posix.c
@@ -112,8 +112,6 @@ static void on_writable(void *acp, int success) {
void (*cb)(void *arg, grpc_endpoint *tcp) = ac->cb;
void *cb_arg = ac->cb_arg;
- grpc_alarm_cancel(&ac->alarm);
-
if (success) {
do {
so_error_size = sizeof(so_error);
@@ -166,13 +164,15 @@ static void on_writable(void *acp, int success) {
finish:
gpr_mu_lock(&ac->mu);
if (!ep) {
- grpc_fd_orphan(ac->fd, NULL, NULL);
+ grpc_fd_orphan(ac->fd, NULL);
}
done = (--ac->refs == 0);
gpr_mu_unlock(&ac->mu);
if (done) {
gpr_mu_destroy(&ac->mu);
gpr_free(ac);
+ } else {
+ grpc_alarm_cancel(&ac->alarm);
}
cb(cb_arg, ep);
}
@@ -230,7 +230,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
gpr_log(GPR_ERROR, "connect error to '%s': %s", strerror(errno));
- grpc_fd_orphan(fdobj, NULL, NULL);
+ grpc_fd_orphan(fdobj, NULL);
cb(arg, NULL);
goto done;
}
@@ -244,8 +244,10 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
ac->write_closure.cb = on_writable;
ac->write_closure.cb_arg = ac;
- grpc_fd_notify_on_write(ac->fd, &ac->write_closure);
+ gpr_mu_lock(&ac->mu);
grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now());
+ grpc_fd_notify_on_write(ac->fd, &ac->write_closure);
+ gpr_mu_unlock(&ac->mu);
done:
gpr_free(name);
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index 2f19f9d442..4fbbaa7c7d 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -295,7 +295,7 @@ static void grpc_tcp_shutdown(grpc_endpoint *ep) {
static void grpc_tcp_unref(grpc_tcp *tcp) {
int refcount_zero = gpr_unref(&tcp->refcount);
if (refcount_zero) {
- grpc_fd_orphan(tcp->em_fd, NULL, NULL);
+ grpc_fd_orphan(tcp->em_fd, NULL);
gpr_free(tcp);
}
}
diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c
index 5c0203c3e3..fe71bdfe6f 100644
--- a/src/core/iomgr/tcp_server_posix.c
+++ b/src/core/iomgr/tcp_server_posix.c
@@ -84,6 +84,7 @@ typedef struct {
} addr;
int addr_len;
grpc_iomgr_closure read_closure;
+ grpc_iomgr_closure destroyed_closure;
} server_port;
static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) {
@@ -175,7 +176,9 @@ static void deactivated_all_ports(grpc_tcp_server *s) {
if (sp->addr.sockaddr.sa_family == AF_UNIX) {
unlink_if_unix_domain_socket(&sp->addr.un);
}
- grpc_fd_orphan(sp->emfd, destroyed_port, s);
+ sp->destroyed_closure.cb = destroyed_port;
+ sp->destroyed_closure.cb_arg = s;
+ grpc_fd_orphan(sp->emfd, &sp->destroyed_closure);
}
gpr_mu_unlock(&s->mu);
} else {
diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c
index 4faa888ca5..2d1ae45bc5 100644
--- a/test/core/iomgr/fd_posix_test.c
+++ b/test/core/iomgr/fd_posix_test.c
@@ -120,7 +120,7 @@ static void session_shutdown_cb(void *arg, /*session*/
int success) {
session *se = arg;
server *sv = se->sv;
- grpc_fd_orphan(se->em_fd, NULL, NULL);
+ grpc_fd_orphan(se->em_fd, NULL);
gpr_free(se);
/* Start to shutdown listen fd. */
grpc_fd_shutdown(sv->em_fd);
@@ -175,7 +175,7 @@ static void session_read_cb(void *arg, /*session*/
static void listen_shutdown_cb(void *arg /*server*/, int success) {
server *sv = arg;
- grpc_fd_orphan(sv->em_fd, NULL, NULL);
+ grpc_fd_orphan(sv->em_fd, NULL);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
sv->done = 1;
@@ -284,7 +284,7 @@ static void client_init(client *cl) {
/* Called when a client upload session is ready to shutdown. */
static void client_session_shutdown_cb(void *arg /*client*/, int success) {
client *cl = arg;
- grpc_fd_orphan(cl->em_fd, NULL, NULL);
+ grpc_fd_orphan(cl->em_fd, NULL);
cl->done = 1;
grpc_pollset_kick(&g_pollset);
}
@@ -472,7 +472,7 @@ static void test_grpc_fd_change(void) {
GPR_ASSERT(b.cb_that_ran == second_read_callback);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
- grpc_fd_orphan(em_fd, NULL, NULL);
+ grpc_fd_orphan(em_fd, NULL);
destroy_change_data(&a);
destroy_change_data(&b);
close(sv[1]);