diff options
author | Nicolas Noble <nicolasnoble@users.noreply.github.com> | 2015-12-02 08:21:49 -0800 |
---|---|---|
committer | Nicolas Noble <nicolasnoble@users.noreply.github.com> | 2015-12-02 08:21:49 -0800 |
commit | 023be1fbfb1e17e39dd2b6f60cdbe77e5155b30d (patch) | |
tree | 192842cd5a08d1f193dc0bd7707298ecb679179b | |
parent | f8766235dd00bf545e09220e5e5d81bae1ce34aa (diff) | |
parent | 5d850377482cf0925a46f286ce8a3c71c640d7c5 (diff) |
Merge pull request #4222 from yang-g/release_fd
Support release fd from tcp endpoint
-rw-r--r-- | src/core/iomgr/fd_posix.c | 17 | ||||
-rw-r--r-- | src/core/iomgr/fd_posix.h | 4 | ||||
-rw-r--r-- | src/core/iomgr/tcp_client_posix.c | 4 | ||||
-rw-r--r-- | src/core/iomgr/tcp_posix.c | 16 | ||||
-rw-r--r-- | src/core/iomgr/tcp_posix.h | 6 | ||||
-rw-r--r-- | src/core/iomgr/tcp_server_posix.c | 2 | ||||
-rw-r--r-- | src/core/iomgr/udp_server.c | 2 | ||||
-rw-r--r-- | src/core/iomgr/workqueue_posix.c | 2 | ||||
-rw-r--r-- | test/core/iomgr/fd_posix_test.c | 8 | ||||
-rw-r--r-- | test/core/iomgr/tcp_posix_test.c | 72 |
10 files changed, 118 insertions, 15 deletions
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 7ff80e6cf8..2be0ea235f 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -207,14 +207,21 @@ static int has_watchers(grpc_fd *fd) { } void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, - const char *reason) { + int *release_fd, const char *reason) { fd->on_done_closure = on_done; - shutdown(fd->fd, SHUT_RDWR); + fd->released = release_fd != NULL; + if (!fd->released) { + shutdown(fd->fd, SHUT_RDWR); + } else { + *release_fd = fd->fd; + } gpr_mu_lock(&fd->mu); REF_BY(fd, 1, reason); /* remove active status, but keep referenced */ if (!has_watchers(fd)) { fd->closed = 1; - close(fd->fd); + if (!fd->released) { + close(fd->fd); + } grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, 1); } else { wake_all_watchers_locked(fd); @@ -406,7 +413,9 @@ void grpc_fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, } if (grpc_fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) { fd->closed = 1; - close(fd->fd); + if (!fd->released) { + close(fd->fd); + } grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, 1); } gpr_mu_unlock(&fd->mu); diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index dc917ebbc0..d628ef3aaf 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -62,6 +62,7 @@ struct grpc_fd { gpr_mu mu; int shutdown; int closed; + int released; /* The watcher list. @@ -107,11 +108,12 @@ grpc_fd *grpc_fd_create(int fd, const char *name); /* Releases fd to be asynchronously destroyed. on_done is called when the underlying file descriptor is definitely close()d. If on_done is NULL, no callback will be made. + If release_fd is not NULL, it's set to fd and fd will not be closed. Requires: *fd initialized; no outstanding notify_on_read or notify_on_write. MUST NOT be called with a pollset lock taken */ void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done, - const char *reason); + int *release_fd, const char *reason); /* Begin polling on an fd. Registers that the given pollset is interested in this fd - so that if read diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index abd6315ca1..d9d24ee9a3 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -196,7 +196,7 @@ static void on_writable(grpc_exec_ctx *exec_ctx, void *acp, int success) { finish: if (fd != NULL) { grpc_pollset_set_del_fd(exec_ctx, ac->interested_parties, fd); - grpc_fd_orphan(exec_ctx, fd, NULL, "tcp_client_orphan"); + grpc_fd_orphan(exec_ctx, fd, NULL, NULL, "tcp_client_orphan"); fd = NULL; } done = (--ac->refs == 0); @@ -265,7 +265,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, if (errno != EWOULDBLOCK && errno != EINPROGRESS) { gpr_log(GPR_ERROR, "connect error to '%s': %s", addr_str, strerror(errno)); - grpc_fd_orphan(exec_ctx, fdobj, NULL, "tcp_client_connect_error"); + grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, "tcp_client_connect_error"); grpc_exec_ctx_enqueue(exec_ctx, closure, 0); goto done; } diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 915553d509..f3be41aa57 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -90,6 +90,8 @@ typedef struct { grpc_closure *read_cb; grpc_closure *write_cb; + grpc_closure *release_fd_cb; + int *release_fd; grpc_closure read_closure; grpc_closure write_closure; @@ -108,7 +110,8 @@ static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { } static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { - grpc_fd_orphan(exec_ctx, tcp->em_fd, NULL, "tcp_unref_orphan"); + grpc_fd_orphan(exec_ctx, tcp->em_fd, tcp->release_fd_cb, tcp->release_fd, + "tcp_unref_orphan"); gpr_slice_buffer_destroy(&tcp->last_read_buffer); gpr_free(tcp->peer_string); gpr_free(tcp); @@ -452,6 +455,8 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size, tcp->fd = em_fd->fd; tcp->read_cb = NULL; tcp->write_cb = NULL; + tcp->release_fd_cb = NULL; + tcp->release_fd = NULL; tcp->incoming_buffer = NULL; tcp->slice_size = slice_size; tcp->iov_size = 1; @@ -468,4 +473,13 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size, return &tcp->base; } +void grpc_tcp_destroy_and_release_fd(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + int *fd, grpc_closure *done) { + grpc_tcp *tcp = (grpc_tcp *)ep; + GPR_ASSERT(ep->vtable == &vtable); + tcp->release_fd = fd; + tcp->release_fd_cb = done; + TCP_UNREF(exec_ctx, tcp, "destroy"); +} + #endif diff --git a/src/core/iomgr/tcp_posix.h b/src/core/iomgr/tcp_posix.h index 40b3ae2679..b554983ae1 100644 --- a/src/core/iomgr/tcp_posix.h +++ b/src/core/iomgr/tcp_posix.h @@ -56,4 +56,10 @@ extern int grpc_tcp_trace; grpc_endpoint *grpc_tcp_create(grpc_fd *fd, size_t read_slice_size, const char *peer_string); +/* Destroy the tcp endpoint without closing its fd. *fd will be set and done + * will be called when the endpoint is destroyed. + * Requires: ep must be a tcp endpoint and fd must not be NULL. */ +void grpc_tcp_destroy_and_release_fd(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, + int *fd, grpc_closure *done); + #endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_POSIX_H */ diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 0ece77c4e8..a89ee02d34 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -193,7 +193,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { } sp->destroyed_closure.cb = destroyed_port; sp->destroyed_closure.cb_arg = s; - grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, + grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL, "tcp_listener_shutdown"); } gpr_mu_unlock(&s->mu); diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c index 9903e970e6..782fbd9f46 100644 --- a/src/core/iomgr/udp_server.c +++ b/src/core/iomgr/udp_server.c @@ -179,7 +179,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { } sp->destroyed_closure.cb = destroyed_port; sp->destroyed_closure.cb_arg = s; - grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, + grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL, "udp_listener_shutdown"); } gpr_mu_unlock(&s->mu); diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c index c087b887b8..2e30178131 100644 --- a/src/core/iomgr/workqueue_posix.c +++ b/src/core/iomgr/workqueue_posix.c @@ -115,7 +115,7 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, int success) { /* HACK: let wakeup_fd code know that we stole the fd */ workqueue->wakeup_fd.read_fd = 0; grpc_wakeup_fd_destroy(&workqueue->wakeup_fd); - grpc_fd_orphan(exec_ctx, workqueue->wakeup_read_fd, NULL, "destroy"); + grpc_fd_orphan(exec_ctx, workqueue->wakeup_read_fd, NULL, NULL, "destroy"); gpr_free(workqueue); } else { gpr_mu_lock(&workqueue->mu); diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c index f592f63ba9..4be6957a83 100644 --- a/test/core/iomgr/fd_posix_test.c +++ b/test/core/iomgr/fd_posix_test.c @@ -121,7 +121,7 @@ static void session_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg, /*session */ int success) { session *se = arg; server *sv = se->sv; - grpc_fd_orphan(exec_ctx, se->em_fd, NULL, "a"); + grpc_fd_orphan(exec_ctx, se->em_fd, NULL, NULL, "a"); gpr_free(se); /* Start to shutdown listen fd. */ grpc_fd_shutdown(exec_ctx, sv->em_fd); @@ -177,7 +177,7 @@ static void listen_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg /*server */, int success) { server *sv = arg; - grpc_fd_orphan(exec_ctx, sv->em_fd, NULL, "b"); + grpc_fd_orphan(exec_ctx, sv->em_fd, NULL, NULL, "b"); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); sv->done = 1; @@ -294,7 +294,7 @@ static void client_init(client *cl) { static void client_session_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg /*client */, int success) { client *cl = arg; - grpc_fd_orphan(exec_ctx, cl->em_fd, NULL, "c"); + grpc_fd_orphan(exec_ctx, cl->em_fd, NULL, NULL, "c"); cl->done = 1; grpc_pollset_kick(&g_pollset, NULL); } @@ -503,7 +503,7 @@ static void test_grpc_fd_change(void) { GPR_ASSERT(b.cb_that_ran == second_read_callback); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); - grpc_fd_orphan(&exec_ctx, em_fd, NULL, "d"); + grpc_fd_orphan(&exec_ctx, em_fd, NULL, NULL, "d"); grpc_exec_ctx_finish(&exec_ctx); destroy_change_data(&a); destroy_change_data(&b); diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c index f676454b7f..9feac931a3 100644 --- a/test/core/iomgr/tcp_posix_test.c +++ b/test/core/iomgr/tcp_posix_test.c @@ -383,6 +383,76 @@ static void write_test(size_t num_bytes, size_t slice_size) { grpc_exec_ctx_finish(&exec_ctx); } +void on_fd_released(grpc_exec_ctx *exec_ctx, void *arg, int success) { + int *done = arg; + *done = 1; + grpc_pollset_kick(&g_pollset, NULL); +} + +/* Do a read_test, then release fd and try to read/write again. */ +static void release_fd_test(size_t num_bytes, size_t slice_size) { + int sv[2]; + grpc_endpoint *ep; + struct read_socket_state state; + size_t written_bytes; + int fd; + gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_closure fd_released_cb; + int fd_released_done = 0; + grpc_closure_init(&fd_released_cb, &on_fd_released, &fd_released_done); + + gpr_log(GPR_INFO, "Release fd read_test of size %d, slice size %d", num_bytes, + slice_size); + + create_sockets(sv); + + ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), slice_size, "test"); + grpc_endpoint_add_to_pollset(&exec_ctx, ep, &g_pollset); + + written_bytes = fill_socket_partial(sv[0], num_bytes); + gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes); + + state.ep = ep; + state.read_bytes = 0; + state.target_read_bytes = written_bytes; + gpr_slice_buffer_init(&state.incoming); + grpc_closure_init(&state.read_cb, read_cb, &state); + + grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb); + + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + while (state.read_bytes < state.target_read_bytes) { + grpc_pollset_worker worker; + grpc_pollset_work(&exec_ctx, &g_pollset, &worker, + gpr_now(GPR_CLOCK_MONOTONIC), deadline); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_exec_ctx_finish(&exec_ctx); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + } + GPR_ASSERT(state.read_bytes == state.target_read_bytes); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + + gpr_slice_buffer_destroy(&state.incoming); + grpc_tcp_destroy_and_release_fd(&exec_ctx, ep, &fd, &fd_released_cb); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + while (!fd_released_done) { + grpc_pollset_worker worker; + grpc_pollset_work(&exec_ctx, &g_pollset, &worker, + gpr_now(GPR_CLOCK_MONOTONIC), deadline); + } + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + GPR_ASSERT(fd_released_done == 1); + GPR_ASSERT(fd == sv[1]); + grpc_exec_ctx_finish(&exec_ctx); + + written_bytes = fill_socket_partial(sv[0], num_bytes); + drain_socket_blocking(fd, written_bytes, written_bytes); + written_bytes = fill_socket_partial(fd, num_bytes); + drain_socket_blocking(sv[0], written_bytes, written_bytes); + close(fd); +} + void run_tests(void) { size_t i = 0; @@ -402,6 +472,8 @@ void run_tests(void) { for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) { write_test(40320, i); } + + release_fd_test(100, 8192); } static void clean_up(void) {} |