aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/iomgr/fd_posix.c6
-rw-r--r--src/core/iomgr/fd_posix.h3
-rw-r--r--src/core/iomgr/tcp_client_posix.c4
-rw-r--r--src/core/iomgr/tcp_posix.c16
-rw-r--r--src/core/iomgr/tcp_posix.h7
-rw-r--r--src/core/iomgr/tcp_server_posix.c2
-rw-r--r--src/core/iomgr/udp_server.c2
-rw-r--r--src/core/iomgr/workqueue_posix.c2
-rw-r--r--test/core/iomgr/fd_posix_test.c8
-rw-r--r--test/core/iomgr/tcp_posix_test.c17
10 files changed, 33 insertions, 34 deletions
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index d7c682a91a..2be0ea235f 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -207,11 +207,13 @@ static int has_watchers(grpc_fd *fd) {
}
void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
- int release_fd, const char *reason) {
+ int *release_fd, const char *reason) {
fd->on_done_closure = on_done;
- fd->released = release_fd;
+ fd->released = release_fd != NULL;
if (!fd->released) {
shutdown(fd->fd, SHUT_RDWR);
+ } else {
+ *release_fd = fd->fd;
}
gpr_mu_lock(&fd->mu);
REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h
index 73443f7152..d628ef3aaf 100644
--- a/src/core/iomgr/fd_posix.h
+++ b/src/core/iomgr/fd_posix.h
@@ -108,11 +108,12 @@ grpc_fd *grpc_fd_create(int fd, const char *name);
/* Releases fd to be asynchronously destroyed.
on_done is called when the underlying file descriptor is definitely close()d.
If on_done is NULL, no callback will be made.
+ If release_fd is not NULL, it's set to fd and fd will not be closed.
Requires: *fd initialized; no outstanding notify_on_read or
notify_on_write.
MUST NOT be called with a pollset lock taken */
void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
- int release_fd, const char *reason);
+ int *release_fd, const char *reason);
/* Begin polling on an fd.
Registers that the given pollset is interested in this fd - so that if read
diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c
index 01a88214c7..d9d24ee9a3 100644
--- a/src/core/iomgr/tcp_client_posix.c
+++ b/src/core/iomgr/tcp_client_posix.c
@@ -196,7 +196,7 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, int success) {
finish:
if (fd != NULL) {
grpc_pollset_set_del_fd(exec_ctx, ac->interested_parties, fd);
- grpc_fd_orphan(exec_ctx, fd, NULL, 0, "tcp_client_orphan");
+ grpc_fd_orphan(exec_ctx, fd, NULL, NULL, "tcp_client_orphan");
fd = NULL;
}
done = (--ac->refs == 0);
@@ -265,7 +265,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
gpr_log(GPR_ERROR, "connect error to '%s': %s", addr_str, strerror(errno));
- grpc_fd_orphan(exec_ctx, fdobj, NULL, 0, "tcp_client_connect_error");
+ grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, "tcp_client_connect_error");
grpc_exec_ctx_enqueue(exec_ctx, closure, 0);
goto done;
}
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index f8d62cf60b..f3be41aa57 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -74,7 +74,6 @@ typedef struct {
grpc_fd *em_fd;
int fd;
int finished_edge;
- int fd_released;
msg_iovlen_type iov_size; /* Number of slices to allocate per read attempt */
size_t slice_size;
gpr_refcount refcount;
@@ -92,6 +91,7 @@ typedef struct {
grpc_closure *read_cb;
grpc_closure *write_cb;
grpc_closure *release_fd_cb;
+ int *release_fd;
grpc_closure read_closure;
grpc_closure write_closure;
@@ -110,7 +110,7 @@ static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
}
static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
- grpc_fd_orphan(exec_ctx, tcp->em_fd, tcp->release_fd_cb, tcp->fd_released,
+ grpc_fd_orphan(exec_ctx, tcp->em_fd, tcp->release_fd_cb, tcp->release_fd,
"tcp_unref_orphan");
gpr_slice_buffer_destroy(&tcp->last_read_buffer);
gpr_free(tcp->peer_string);
@@ -456,11 +456,11 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
tcp->read_cb = NULL;
tcp->write_cb = NULL;
tcp->release_fd_cb = NULL;
+ tcp->release_fd = NULL;
tcp->incoming_buffer = NULL;
tcp->slice_size = slice_size;
tcp->iov_size = 1;
tcp->finished_edge = 1;
- tcp->fd_released = 0;
/* paired with unref in grpc_tcp_destroy */
gpr_ref_init(&tcp->refcount, 1);
tcp->em_fd = em_fd;
@@ -473,15 +473,11 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
return &tcp->base;
}
-int grpc_tcp_get_fd(grpc_endpoint *ep) {
- grpc_tcp *tcp = (grpc_tcp *)ep;
- return tcp->fd;
-}
-
void grpc_tcp_destroy_and_release_fd(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
- grpc_closure *done) {
+ int *fd, grpc_closure *done) {
grpc_tcp *tcp = (grpc_tcp *)ep;
- tcp->fd_released = 1;
+ GPR_ASSERT(ep->vtable == &vtable);
+ tcp->release_fd = fd;
tcp->release_fd_cb = done;
TCP_UNREF(exec_ctx, tcp, "destroy");
}
diff --git a/src/core/iomgr/tcp_posix.h b/src/core/iomgr/tcp_posix.h
index 3819905c53..b554983ae1 100644
--- a/src/core/iomgr/tcp_posix.h
+++ b/src/core/iomgr/tcp_posix.h
@@ -56,9 +56,10 @@ extern int grpc_tcp_trace;
grpc_endpoint *grpc_tcp_create(grpc_fd *fd, size_t read_slice_size,
const char *peer_string);
-int grpc_tcp_get_fd(grpc_endpoint *ep);
-
+/* Destroy the tcp endpoint without closing its fd. *fd will be set and done
+ * will be called when the endpoint is destroyed.
+ * Requires: ep must be a tcp endpoint and fd must not be NULL. */
void grpc_tcp_destroy_and_release_fd(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
- grpc_closure *done);
+ int *fd, grpc_closure *done);
#endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_POSIX_H */
diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c
index 8362cfa4b4..a89ee02d34 100644
--- a/src/core/iomgr/tcp_server_posix.c
+++ b/src/core/iomgr/tcp_server_posix.c
@@ -193,7 +193,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
}
sp->destroyed_closure.cb = destroyed_port;
sp->destroyed_closure.cb_arg = s;
- grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, 0,
+ grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL,
"tcp_listener_shutdown");
}
gpr_mu_unlock(&s->mu);
diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c
index b568078687..782fbd9f46 100644
--- a/src/core/iomgr/udp_server.c
+++ b/src/core/iomgr/udp_server.c
@@ -179,7 +179,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
}
sp->destroyed_closure.cb = destroyed_port;
sp->destroyed_closure.cb_arg = s;
- grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, 0,
+ grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL,
"udp_listener_shutdown");
}
gpr_mu_unlock(&s->mu);
diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c
index 583e63beef..2e30178131 100644
--- a/src/core/iomgr/workqueue_posix.c
+++ b/src/core/iomgr/workqueue_posix.c
@@ -115,7 +115,7 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, int success) {
/* HACK: let wakeup_fd code know that we stole the fd */
workqueue->wakeup_fd.read_fd = 0;
grpc_wakeup_fd_destroy(&workqueue->wakeup_fd);
- grpc_fd_orphan(exec_ctx, workqueue->wakeup_read_fd, NULL, 0, "destroy");
+ grpc_fd_orphan(exec_ctx, workqueue->wakeup_read_fd, NULL, NULL, "destroy");
gpr_free(workqueue);
} else {
gpr_mu_lock(&workqueue->mu);
diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c
index 9605babc5b..4be6957a83 100644
--- a/test/core/iomgr/fd_posix_test.c
+++ b/test/core/iomgr/fd_posix_test.c
@@ -121,7 +121,7 @@ static void session_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg, /*session */
int success) {
session *se = arg;
server *sv = se->sv;
- grpc_fd_orphan(exec_ctx, se->em_fd, NULL, 0, "a");
+ grpc_fd_orphan(exec_ctx, se->em_fd, NULL, NULL, "a");
gpr_free(se);
/* Start to shutdown listen fd. */
grpc_fd_shutdown(exec_ctx, sv->em_fd);
@@ -177,7 +177,7 @@ static void listen_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg /*server */,
int success) {
server *sv = arg;
- grpc_fd_orphan(exec_ctx, sv->em_fd, NULL, 0, "b");
+ grpc_fd_orphan(exec_ctx, sv->em_fd, NULL, NULL, "b");
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
sv->done = 1;
@@ -294,7 +294,7 @@ static void client_init(client *cl) {
static void client_session_shutdown_cb(grpc_exec_ctx *exec_ctx,
void *arg /*client */, int success) {
client *cl = arg;
- grpc_fd_orphan(exec_ctx, cl->em_fd, NULL, 0, "c");
+ grpc_fd_orphan(exec_ctx, cl->em_fd, NULL, NULL, "c");
cl->done = 1;
grpc_pollset_kick(&g_pollset, NULL);
}
@@ -503,7 +503,7 @@ static void test_grpc_fd_change(void) {
GPR_ASSERT(b.cb_that_ran == second_read_callback);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
- grpc_fd_orphan(&exec_ctx, em_fd, NULL, 0, "d");
+ grpc_fd_orphan(&exec_ctx, em_fd, NULL, NULL, "d");
grpc_exec_ctx_finish(&exec_ctx);
destroy_change_data(&a);
destroy_change_data(&b);
diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c
index 76f455678c..9feac931a3 100644
--- a/test/core/iomgr/tcp_posix_test.c
+++ b/test/core/iomgr/tcp_posix_test.c
@@ -205,12 +205,6 @@ static void read_test(size_t num_bytes, size_t slice_size) {
grpc_exec_ctx_finish(&exec_ctx);
}
-void on_fd_released(grpc_exec_ctx *exec_ctx, void *arg, int success) {
- int *done = arg;
- *done = 1;
- grpc_pollset_kick(&g_pollset, NULL);
-}
-
/* Write to a socket until it fills up, then read from it using the grpc_tcp
API. */
static void large_read_test(size_t slice_size) {
@@ -389,6 +383,12 @@ static void write_test(size_t num_bytes, size_t slice_size) {
grpc_exec_ctx_finish(&exec_ctx);
}
+void on_fd_released(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+ int *done = arg;
+ *done = 1;
+ grpc_pollset_kick(&g_pollset, NULL);
+}
+
/* Do a read_test, then release fd and try to read/write again. */
static void release_fd_test(size_t num_bytes, size_t slice_size) {
int sv[2];
@@ -408,8 +408,6 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) {
create_sockets(sv);
ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), slice_size, "test");
- fd = grpc_tcp_get_fd(ep);
- GPR_ASSERT(fd == sv[1]);
grpc_endpoint_add_to_pollset(&exec_ctx, ep, &g_pollset);
written_bytes = fill_socket_partial(sv[0], num_bytes);
@@ -436,7 +434,7 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) {
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
gpr_slice_buffer_destroy(&state.incoming);
- grpc_tcp_destroy_and_release_fd(&exec_ctx, ep, &fd_released_cb);
+ grpc_tcp_destroy_and_release_fd(&exec_ctx, ep, &fd, &fd_released_cb);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (!fd_released_done) {
grpc_pollset_worker worker;
@@ -445,6 +443,7 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) {
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
GPR_ASSERT(fd_released_done == 1);
+ GPR_ASSERT(fd == sv[1]);
grpc_exec_ctx_finish(&exec_ctx);
written_bytes = fill_socket_partial(sv[0], num_bytes);