aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/lib/iomgr/endpoint.h2
-rw-r--r--src/core/lib/iomgr/ev_poll_and_epoll_posix.c24
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c24
-rw-r--r--src/core/lib/iomgr/ev_posix.c4
-rw-r--r--src/core/lib/iomgr/ev_posix.h6
-rw-r--r--src/core/lib/iomgr/tcp_posix.c2
-rw-r--r--test/core/iomgr/endpoint_tests.c60
-rw-r--r--test/core/iomgr/tcp_posix_test.c2
8 files changed, 107 insertions, 17 deletions
diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h
index 3877ceb1e2..f9808bbda1 100644
--- a/src/core/lib/iomgr/endpoint.h
+++ b/src/core/lib/iomgr/endpoint.h
@@ -82,7 +82,7 @@ char *grpc_endpoint_get_peer(grpc_endpoint *ep);
void grpc_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
gpr_slice_buffer *slices, grpc_closure *cb);
-/* Causes any pending read/write callbacks to run immediately with
+/* Causes any pending and future read/write callbacks to run immediately with
success==0 */
void grpc_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep);
void grpc_endpoint_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep);
diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
index 943c404f91..ef8bbd6277 100644
--- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
@@ -515,7 +515,9 @@ static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure **st, grpc_closure *closure) {
- if (*st == CLOSURE_NOT_READY) {
+ if (fd->shutdown) {
+ grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL);
+ } else if (*st == CLOSURE_NOT_READY) {
/* not ready ==> switch to a waiting state by setting the closure */
*st = closure;
} else if (*st == CLOSURE_READY) {
@@ -557,13 +559,24 @@ static void set_read_notifier_pollset_locked(
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
gpr_mu_lock(&fd->mu);
- GPR_ASSERT(!fd->shutdown);
- fd->shutdown = 1;
- set_ready_locked(exec_ctx, fd, &fd->read_closure);
- set_ready_locked(exec_ctx, fd, &fd->write_closure);
+ /* only shutdown once */
+ if (!fd->shutdown) {
+ fd->shutdown = 1;
+ /* signal read/write closed to OS so that future operations fail */
+ shutdown(fd->fd, SHUT_RDWR);
+ set_ready_locked(exec_ctx, fd, &fd->read_closure);
+ set_ready_locked(exec_ctx, fd, &fd->write_closure);
+ }
gpr_mu_unlock(&fd->mu);
}
+static bool fd_is_shutdown(grpc_fd *fd) {
+ gpr_mu_lock(&fd->mu);
+ bool r = fd->shutdown;
+ gpr_mu_unlock(&fd->mu);
+ return r;
+}
+
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
gpr_mu_lock(&fd->mu);
@@ -1938,6 +1951,7 @@ static const grpc_event_engine_vtable vtable = {
.fd_wrapped_fd = fd_wrapped_fd,
.fd_orphan = fd_orphan,
.fd_shutdown = fd_shutdown,
+ .fd_is_shutdown = fd_is_shutdown,
.fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write,
.fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index 0167999dad..6313032869 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -421,7 +421,9 @@ static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure **st, grpc_closure *closure) {
- if (*st == CLOSURE_NOT_READY) {
+ if (fd->shutdown) {
+ grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL);
+ } else if (*st == CLOSURE_NOT_READY) {
/* not ready ==> switch to a waiting state by setting the closure */
*st = closure;
} else if (*st == CLOSURE_READY) {
@@ -463,13 +465,24 @@ static void set_read_notifier_pollset_locked(
static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
gpr_mu_lock(&fd->mu);
- GPR_ASSERT(!fd->shutdown);
- fd->shutdown = 1;
- set_ready_locked(exec_ctx, fd, &fd->read_closure);
- set_ready_locked(exec_ctx, fd, &fd->write_closure);
+ /* only shutdown once */
+ if (!fd->shutdown) {
+ fd->shutdown = 1;
+ /* signal read/write closed to OS so that future operations fail */
+ shutdown(fd->fd, SHUT_RDWR);
+ set_ready_locked(exec_ctx, fd, &fd->read_closure);
+ set_ready_locked(exec_ctx, fd, &fd->write_closure);
+ }
gpr_mu_unlock(&fd->mu);
}
+static bool fd_is_shutdown(grpc_fd *fd) {
+ gpr_mu_lock(&fd->mu);
+ bool r = fd->shutdown;
+ gpr_mu_unlock(&fd->mu);
+ return r;
+}
+
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
gpr_mu_lock(&fd->mu);
@@ -1172,6 +1185,7 @@ static const grpc_event_engine_vtable vtable = {
.fd_wrapped_fd = fd_wrapped_fd,
.fd_orphan = fd_orphan,
.fd_shutdown = fd_shutdown,
+ .fd_is_shutdown = fd_is_shutdown,
.fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write,
.fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index 6477b05dcd..a17ee17945 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -153,6 +153,10 @@ void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
g_event_engine->fd_shutdown(exec_ctx, fd);
}
+bool grpc_fd_is_shutdown(grpc_fd *fd) {
+ return g_event_engine->fd_is_shutdown(fd);
+}
+
void grpc_fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
g_event_engine->fd_notify_on_read(exec_ctx, fd, closure);
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index 344bf63438..53457aa9d5 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -55,6 +55,7 @@ typedef struct grpc_event_engine_vtable {
grpc_closure *closure);
void (*fd_notify_on_write)(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure);
+ bool (*fd_is_shutdown)(grpc_fd *fd);
grpc_pollset *(*fd_get_read_notifier_pollset)(grpc_exec_ctx *exec_ctx,
grpc_fd *fd);
@@ -116,7 +117,10 @@ int grpc_fd_wrapped_fd(grpc_fd *fd);
void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
int *release_fd, const char *reason);
-/* Cause any current callbacks to error out with GRPC_CALLBACK_CANCELLED. */
+/* Has grpc_fd_shutdown been called on an fd? */
+bool grpc_fd_is_shutdown(grpc_fd *fd);
+
+/* Cause any current and future callbacks to fail. */
void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd);
/* Register read interest, causing read_cb to be called once when fd becomes
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index e2869224f1..1bfc190f7a 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -408,7 +408,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
if (buf->length == 0) {
GPR_TIMER_END("tcp_write", 0);
- grpc_exec_ctx_enqueue(exec_ctx, cb, true, NULL);
+ grpc_exec_ctx_enqueue(exec_ctx, cb, !grpc_fd_is_shutdown(tcp->em_fd), NULL);
return;
}
tcp->outgoing_buffer = buf;
diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c
index 6d15ecf141..0f26c0edfd 100644
--- a/test/core/iomgr/endpoint_tests.c
+++ b/test/core/iomgr/endpoint_tests.c
@@ -222,7 +222,7 @@ static void read_and_write_test(grpc_endpoint_test_config config,
even when bytes_written is unsigned. */
state.bytes_written -= state.current_write_size;
read_and_write_test_write_handler(&exec_ctx, &state, 1);
- grpc_exec_ctx_finish(&exec_ctx);
+ grpc_exec_ctx_flush(&exec_ctx);
grpc_endpoint_read(&exec_ctx, state.read_ep, &state.incoming,
&state.done_read);
@@ -233,7 +233,7 @@ static void read_and_write_test(grpc_endpoint_test_config config,
gpr_log(GPR_DEBUG, "shutdown write");
grpc_endpoint_shutdown(&exec_ctx, state.write_ep);
}
- grpc_exec_ctx_finish(&exec_ctx);
+ grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(g_mu);
while (!state.read_done || !state.write_done) {
@@ -243,7 +243,7 @@ static void read_and_write_test(grpc_endpoint_test_config config,
gpr_now(GPR_CLOCK_MONOTONIC), deadline);
}
gpr_mu_unlock(g_mu);
- grpc_exec_ctx_finish(&exec_ctx);
+ grpc_exec_ctx_flush(&exec_ctx);
end_test(config);
gpr_slice_buffer_destroy(&state.outgoing);
@@ -253,11 +253,64 @@ static void read_and_write_test(grpc_endpoint_test_config config,
grpc_exec_ctx_finish(&exec_ctx);
}
+static void inc_on_failure(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+ *(int *)arg += (success == false);
+}
+
+static void wait_for_fail_count(grpc_exec_ctx *exec_ctx, int *fail_count,
+ int want_fail_count) {
+ grpc_exec_ctx_flush(exec_ctx);
+ for (int i = 0; i < 5 && *fail_count < want_fail_count; i++) {
+ grpc_pollset_worker *worker = NULL;
+ gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
+ gpr_timespec deadline =
+ gpr_time_add(now, gpr_time_from_seconds(1, GPR_TIMESPAN));
+ gpr_mu_lock(g_mu);
+ grpc_pollset_work(exec_ctx, g_pollset, &worker, now, deadline);
+ gpr_mu_unlock(g_mu);
+ grpc_exec_ctx_flush(exec_ctx);
+ }
+ GPR_ASSERT(*fail_count == want_fail_count);
+}
+
+static void multiple_shutdown_test(grpc_endpoint_test_config config) {
+ grpc_endpoint_test_fixture f =
+ begin_test(config, "multiple_shutdown_test", 128);
+ int fail_count = 0;
+
+ gpr_slice_buffer slice_buffer;
+ gpr_slice_buffer_init(&slice_buffer);
+
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, g_pollset);
+ grpc_endpoint_read(&exec_ctx, f.client_ep, &slice_buffer,
+ grpc_closure_create(inc_on_failure, &fail_count));
+ wait_for_fail_count(&exec_ctx, &fail_count, 0);
+ grpc_endpoint_shutdown(&exec_ctx, f.client_ep);
+ wait_for_fail_count(&exec_ctx, &fail_count, 1);
+ grpc_endpoint_read(&exec_ctx, f.client_ep, &slice_buffer,
+ grpc_closure_create(inc_on_failure, &fail_count));
+ wait_for_fail_count(&exec_ctx, &fail_count, 2);
+ gpr_slice_buffer_add(&slice_buffer, gpr_slice_from_copied_string("a"));
+ grpc_endpoint_write(&exec_ctx, f.client_ep, &slice_buffer,
+ grpc_closure_create(inc_on_failure, &fail_count));
+ wait_for_fail_count(&exec_ctx, &fail_count, 3);
+ grpc_endpoint_shutdown(&exec_ctx, f.client_ep);
+ wait_for_fail_count(&exec_ctx, &fail_count, 3);
+
+ gpr_slice_buffer_destroy(&slice_buffer);
+
+ grpc_endpoint_destroy(&exec_ctx, f.client_ep);
+ grpc_endpoint_destroy(&exec_ctx, f.server_ep);
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
void grpc_endpoint_tests(grpc_endpoint_test_config config,
grpc_pollset *pollset, gpr_mu *mu) {
size_t i;
g_pollset = pollset;
g_mu = mu;
+ multiple_shutdown_test(config);
read_and_write_test(config, 10000000, 100000, 8192, 0);
read_and_write_test(config, 1000000, 100000, 1, 0);
read_and_write_test(config, 100000000, 100000, 1, 1);
@@ -265,4 +318,5 @@ void grpc_endpoint_tests(grpc_endpoint_test_config config,
read_and_write_test(config, 40320, i, i, 0);
}
g_pollset = NULL;
+ g_mu = NULL;
}
diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c
index 19ba258303..1112e8a3aa 100644
--- a/test/core/iomgr/tcp_posix_test.c
+++ b/test/core/iomgr/tcp_posix_test.c
@@ -517,8 +517,8 @@ int main(int argc, char **argv) {
grpc_init();
g_pollset = gpr_malloc(grpc_pollset_size());
grpc_pollset_init(g_pollset, &g_mu);
- run_tests();
grpc_endpoint_tests(configs[0], g_pollset, g_mu);
+ run_tests();
grpc_closure_init(&destroyed, destroy_pollset, g_pollset);
grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed);
grpc_exec_ctx_finish(&exec_ctx);