aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r--src/core/lib/iomgr/endpoint.c5
-rw-r--r--src/core/lib/iomgr/endpoint.h5
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c17
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c17
-rw-r--r--src/core/lib/iomgr/ev_posix.c4
-rw-r--r--src/core/lib/iomgr/ev_posix.h4
-rw-r--r--src/core/lib/iomgr/network_status_tracker.c3
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.c3
-rw-r--r--src/core/lib/iomgr/tcp_posix.c5
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.c6
-rw-r--r--src/core/lib/iomgr/tcp_uv.c4
-rw-r--r--src/core/lib/iomgr/tcp_windows.c27
-rw-r--r--src/core/lib/iomgr/udp_server.c3
13 files changed, 68 insertions, 35 deletions
diff --git a/src/core/lib/iomgr/endpoint.c b/src/core/lib/iomgr/endpoint.c
index 2d300f4560..bf6e98146a 100644
--- a/src/core/lib/iomgr/endpoint.c
+++ b/src/core/lib/iomgr/endpoint.c
@@ -54,8 +54,9 @@ void grpc_endpoint_add_to_pollset_set(grpc_exec_ctx* exec_ctx,
ep->vtable->add_to_pollset_set(exec_ctx, ep, pollset_set);
}
-void grpc_endpoint_shutdown(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep) {
- ep->vtable->shutdown(exec_ctx, ep);
+void grpc_endpoint_shutdown(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep,
+ grpc_error* why) {
+ ep->vtable->shutdown(exec_ctx, ep, why);
}
void grpc_endpoint_destroy(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep) {
diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h
index 1609b64f2b..740357ecc5 100644
--- a/src/core/lib/iomgr/endpoint.h
+++ b/src/core/lib/iomgr/endpoint.h
@@ -57,7 +57,7 @@ struct grpc_endpoint_vtable {
grpc_pollset *pollset);
void (*add_to_pollset_set)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_pollset_set *pollset);
- void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep);
+ void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_error *why);
void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep);
grpc_resource_user *(*get_resource_user)(grpc_endpoint *ep);
char *(*get_peer)(grpc_endpoint *ep);
@@ -96,7 +96,8 @@ void grpc_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
/* Causes any pending and future read/write callbacks to run immediately with
success==0 */
-void grpc_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep);
+void grpc_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
+ grpc_error *why);
void grpc_endpoint_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep);
/* Add an endpoint to a pollset, so that when the pollset is polled, events from
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index 39b4d9cbd7..51842fc208 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -143,6 +143,7 @@ struct grpc_fd {
/* Indicates that the fd is shutdown and that any pending read/write closures
should fail */
bool shutdown;
+ grpc_error *shutdown_error; /* reason for shutdown: set iff shutdown==true */
/* The fd is either closed or we relinquished control of it. In either cases,
this indicates that the 'fd' on this structure is no longer valid */
@@ -907,6 +908,7 @@ static void unref_by(grpc_fd *fd, int n) {
fd->freelist_next = fd_freelist;
fd_freelist = fd;
grpc_iomgr_unregister_object(&fd->iomgr_object);
+ if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error);
gpr_mu_unlock(&fd_freelist_mu);
} else {
@@ -1058,11 +1060,11 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
GRPC_ERROR_UNREF(error);
}
-static grpc_error *fd_shutdown_error(bool shutdown) {
- if (!shutdown) {
+static grpc_error *fd_shutdown_error(grpc_fd *fd) {
+ if (!fd->shutdown) {
return GRPC_ERROR_NONE;
} else {
- return GRPC_ERROR_CREATE("FD shutdown");
+ return GRPC_ERROR_CREATE_REFERENCING("FD shutdown", &fd->shutdown_error, 1);
}
}
@@ -1076,7 +1078,7 @@ static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
} else if (*st == CLOSURE_READY) {
/* already ready ==> queue the closure to run immediately */
*st = CLOSURE_NOT_READY;
- grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown));
+ grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd));
} else {
/* upcallptr was set to a different closure. This is an error! */
gpr_log(GPR_ERROR,
@@ -1098,7 +1100,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
return 0;
} else {
/* waiting ==> queue closure */
- grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown));
+ grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd));
*st = CLOSURE_NOT_READY;
return 1;
}
@@ -1123,17 +1125,20 @@ static bool fd_is_shutdown(grpc_fd *fd) {
}
/* Might be called multiple times */
-static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
+static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
gpr_mu_lock(&fd->po.mu);
/* Do the actual shutdown only once */
if (!fd->shutdown) {
fd->shutdown = true;
+ fd->shutdown_error = why;
shutdown(fd->fd, SHUT_RDWR);
/* Flush any pending read and write closures. Since fd->shutdown is 'true'
at this point, the closures would be called with 'success = false' */
set_ready_locked(exec_ctx, fd, &fd->read_closure);
set_ready_locked(exec_ctx, fd, &fd->write_closure);
+ } else {
+ GRPC_ERROR_UNREF(why);
}
gpr_mu_unlock(&fd->po.mu);
}
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index 9477ac3688..ca12932219 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -82,6 +82,7 @@ struct grpc_fd {
int shutdown;
int closed;
int released;
+ grpc_error *shutdown_error;
/* The watcher list.
@@ -306,6 +307,7 @@ static void unref_by(grpc_fd *fd, int n) {
if (old == n) {
gpr_mu_destroy(&fd->mu);
grpc_iomgr_unregister_object(&fd->iomgr_object);
+ if (fd->shutdown) GRPC_ERROR_UNREF(fd->shutdown_error);
gpr_free(fd);
} else {
GPR_ASSERT(old > n);
@@ -444,11 +446,11 @@ static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
-static grpc_error *fd_shutdown_error(bool shutdown) {
- if (!shutdown) {
+static grpc_error *fd_shutdown_error(grpc_fd *fd) {
+ if (!fd->shutdown) {
return GRPC_ERROR_NONE;
} else {
- return GRPC_ERROR_CREATE("FD shutdown");
+ return GRPC_ERROR_CREATE_REFERENCING("FD shutdown", &fd->shutdown_error, 1);
}
}
@@ -462,7 +464,7 @@ static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
} else if (*st == CLOSURE_READY) {
/* already ready ==> queue the closure to run immediately */
*st = CLOSURE_NOT_READY;
- grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown));
+ grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd));
maybe_wake_one_watcher_locked(fd);
} else {
/* upcallptr was set to a different closure. This is an error! */
@@ -485,7 +487,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
return 0;
} else {
/* waiting ==> queue closure */
- grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown));
+ grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd));
*st = CLOSURE_NOT_READY;
return 1;
}
@@ -496,15 +498,18 @@ static void set_read_notifier_pollset_locked(
fd->read_notifier_pollset = read_notifier_pollset;
}
-static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
+static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
gpr_mu_lock(&fd->mu);
/* only shutdown once */
if (!fd->shutdown) {
fd->shutdown = 1;
+ fd->shutdown_error = why;
/* signal read/write closed to OS so that future operations fail */
shutdown(fd->fd, SHUT_RDWR);
set_ready_locked(exec_ctx, fd, &fd->read_closure);
set_ready_locked(exec_ctx, fd, &fd->write_closure);
+ } else {
+ GRPC_ERROR_UNREF(why);
}
gpr_mu_unlock(&fd->mu);
}
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index c106ba5400..5bb55631d6 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -162,8 +162,8 @@ void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
g_event_engine->fd_orphan(exec_ctx, fd, on_done, release_fd, reason);
}
-void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- g_event_engine->fd_shutdown(exec_ctx, fd);
+void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
+ g_event_engine->fd_shutdown(exec_ctx, fd, why);
}
bool grpc_fd_is_shutdown(grpc_fd *fd) {
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index 1068a4bad5..a589efdeec 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -51,7 +51,7 @@ typedef struct grpc_event_engine_vtable {
int (*fd_wrapped_fd)(grpc_fd *fd);
void (*fd_orphan)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
int *release_fd, const char *reason);
- void (*fd_shutdown)(grpc_exec_ctx *exec_ctx, grpc_fd *fd);
+ void (*fd_shutdown)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why);
void (*fd_notify_on_read)(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure);
void (*fd_notify_on_write)(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
@@ -140,7 +140,7 @@ void grpc_fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *on_done,
bool grpc_fd_is_shutdown(grpc_fd *fd);
/* Cause any current and future callbacks to fail. */
-void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd);
+void grpc_fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why);
/* Register read interest, causing read_cb to be called once when fd becomes
readable, on deadline specified by deadline, or on shutdown triggered by
diff --git a/src/core/lib/iomgr/network_status_tracker.c b/src/core/lib/iomgr/network_status_tracker.c
index a5ca9ed2c3..1601a39002 100644
--- a/src/core/lib/iomgr/network_status_tracker.c
+++ b/src/core/lib/iomgr/network_status_tracker.c
@@ -117,7 +117,8 @@ void grpc_network_status_shutdown_all_endpoints() {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
for (endpoint_ll_node *curr = head; curr != NULL; curr = curr->next) {
- curr->ep->vtable->shutdown(&exec_ctx, curr->ep);
+ curr->ep->vtable->shutdown(&exec_ctx, curr->ep,
+ GRPC_ERROR_CREATE("Network unavailable"));
}
gpr_mu_unlock(&g_endpoint_mutex);
grpc_exec_ctx_finish(&exec_ctx);
diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c
index 16b0f4e73c..0144192b71 100644
--- a/src/core/lib/iomgr/tcp_client_posix.c
+++ b/src/core/lib/iomgr/tcp_client_posix.c
@@ -121,7 +121,8 @@ static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
}
gpr_mu_lock(&ac->mu);
if (ac->fd != NULL) {
- grpc_fd_shutdown(exec_ctx, ac->fd);
+ grpc_fd_shutdown(exec_ctx, ac->fd,
+ GRPC_ERROR_CREATE("connect() timed out"));
}
done = (--ac->refs == 0);
gpr_mu_unlock(&ac->mu);
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index a33e63e845..a4381f8fc9 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -119,9 +119,10 @@ static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
grpc_error *error);
-static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
+static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
+ grpc_error *why) {
grpc_tcp *tcp = (grpc_tcp *)ep;
- grpc_fd_shutdown(exec_ctx, tcp->em_fd);
+ grpc_fd_shutdown(exec_ctx, tcp->em_fd, why);
grpc_resource_user_shutdown(exec_ctx, tcp->resource_user);
}
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index 20efb678b2..e9e7511c9c 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -276,7 +276,8 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (s->active_ports) {
grpc_tcp_listener *sp;
for (sp = s->head; sp; sp = sp->next) {
- grpc_fd_shutdown(exec_ctx, sp->emfd);
+ grpc_fd_shutdown(exec_ctx, sp->emfd,
+ GRPC_ERROR_CREATE("Server destroyed"));
}
gpr_mu_unlock(&s->mu);
} else {
@@ -773,7 +774,8 @@ void grpc_tcp_server_shutdown_listeners(grpc_exec_ctx *exec_ctx,
if (s->active_ports) {
grpc_tcp_listener *sp;
for (sp = s->head; sp; sp = sp->next) {
- grpc_fd_shutdown(exec_ctx, sp->emfd);
+ grpc_fd_shutdown(exec_ctx, sp->emfd,
+ GRPC_ERROR_CREATE("Server shutdown"));
}
}
gpr_mu_unlock(&s->mu);
diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c
index 7f4ea49a1c..5fb398c50b 100644
--- a/src/core/lib/iomgr/tcp_uv.c
+++ b/src/core/lib/iomgr/tcp_uv.c
@@ -298,13 +298,15 @@ static void uv_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
static void shutdown_callback(uv_shutdown_t *req, int status) {}
-static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
+static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
+ grpc_error *why) {
grpc_tcp *tcp = (grpc_tcp *)ep;
if (!tcp->shutting_down) {
tcp->shutting_down = true;
uv_shutdown_t *req = &tcp->shutdown_req;
uv_shutdown(req, (uv_stream_t *)tcp->handle, shutdown_callback);
}
+ GRPC_ERROR_UNREF(why);
}
static void uv_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c
index 84f791ba07..6c413971e3 100644
--- a/src/core/lib/iomgr/tcp_windows.c
+++ b/src/core/lib/iomgr/tcp_windows.c
@@ -116,6 +116,7 @@ typedef struct grpc_tcp {
to protect ourselves when requesting a shutdown. */
gpr_mu mu;
int shutting_down;
+ grpc_error *shutdown_error;
char *peer_string;
} grpc_tcp;
@@ -125,6 +126,7 @@ static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
gpr_mu_destroy(&tcp->mu);
gpr_free(tcp->peer_string);
grpc_resource_user_unref(exec_ctx, tcp->resource_user);
+ if (tcp->shutting_down) GRPC_ERROR_UNREF(tcp->shutdown_error);
gpr_free(tcp);
}
@@ -182,7 +184,10 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) {
grpc_slice_buffer_add(tcp->read_slices, sub);
} else {
grpc_slice_unref_internal(exec_ctx, tcp->read_slice);
- error = GRPC_ERROR_CREATE("End of TCP stream");
+ error = tcp->shutting_down
+ ? GRPC_ERROR_CREATE_REFERENCING("TCP stream shutting down",
+ &tcp->shutdown_error, 1)
+ : GRPC_ERROR_CREATE("End of TCP stream");
}
}
}
@@ -203,8 +208,9 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
WSABUF buffer;
if (tcp->shutting_down) {
- grpc_closure_sched(exec_ctx, cb,
- GRPC_ERROR_CREATE("TCP socket is shutting down"));
+ grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_CREATE_REFERENCING(
+ "TCP socket is shutting down",
+ &tcp->shutdown_error, 1));
return;
}
@@ -291,8 +297,9 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
size_t len;
if (tcp->shutting_down) {
- grpc_closure_sched(exec_ctx, cb,
- GRPC_ERROR_CREATE("TCP socket is shutting down"));
+ grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_CREATE_REFERENCING(
+ "TCP socket is shutting down",
+ &tcp->shutdown_error, 1));
return;
}
@@ -373,12 +380,18 @@ static void win_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
we're not going to protect against these. However the IO Completion Port
callback will happen from another thread, so we need to protect against
concurrent access of the data structure in that regard. */
-static void win_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
+static void win_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
+ grpc_error *why) {
grpc_tcp *tcp = (grpc_tcp *)ep;
gpr_mu_lock(&tcp->mu);
/* At that point, what may happen is that we're already inside the IOCP
callback. See the comments in on_read and on_write. */
- tcp->shutting_down = 1;
+ if (!tcp->shutting_down) {
+ tcp->shutting_down = 1;
+ tcp->shutdown_error = why;
+ } else {
+ GRPC_ERROR_UNREF(why);
+ }
grpc_winsocket_shutdown(tcp->socket);
gpr_mu_unlock(&tcp->mu);
grpc_resource_user_shutdown(exec_ctx, tcp->resource_user);
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index dfbd295c91..3b23b47d4f 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -203,7 +203,8 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
for (sp = s->head; sp; sp = sp->next) {
GPR_ASSERT(sp->orphan_cb);
sp->orphan_cb(sp->emfd);
- grpc_fd_shutdown(exec_ctx, sp->emfd);
+ grpc_fd_shutdown(exec_ctx, sp->emfd,
+ GRPC_ERROR_CREATE("Server destroyed"));
}
gpr_mu_unlock(&s->mu);
} else {