aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c26
-rw-r--r--src/core/lib/iomgr/iomgr.c3
-rw-r--r--src/core/lib/iomgr/network_status_tracker.c7
-rw-r--r--src/core/lib/iomgr/network_status_tracker.h4
-rw-r--r--src/core/lib/iomgr/workqueue.h23
-rw-r--r--src/core/lib/iomgr/workqueue_posix.h3
6 files changed, 49 insertions, 17 deletions
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index 4282d01a2b..6a63c4d1d1 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -517,14 +517,10 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
done:
if (*error != GRPC_ERROR_NONE) {
- if (pi->epoll_fd < 0) {
- close(pi->epoll_fd);
- }
if (pi->workqueue != NULL) {
GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island");
}
- gpr_mu_destroy(&pi->mu);
- gpr_free(pi);
+ polling_island_delete(exec_ctx, pi);
pi = NULL;
}
return pi;
@@ -533,9 +529,9 @@ done:
static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) {
GPR_ASSERT(pi->fd_cnt == 0);
- gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL);
-
- close(pi->epoll_fd);
+ if (pi->epoll_fd >= 0) {
+ close(pi->epoll_fd);
+ }
gpr_mu_destroy(&pi->mu);
gpr_free(pi->fds);
gpr_free(pi);
@@ -936,6 +932,10 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
gpr_mu_unlock(&fd->mu);
UNREF_BY(fd, 2, reason); /* Drop the reference */
if (unref_pi != NULL) {
+ /* Unref stale polling island here, outside the fd lock above.
+ The polling island owns a workqueue which owns an fd, and unreffing
+ inside the lock can cause an eventual lock loop that makes TSAN very
+ unhappy. */
PI_UNREF(exec_ctx, unref_pi, "fd_orphan");
}
GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error));
@@ -1559,9 +1559,19 @@ retry:
if (fd->polling_island == pollset->polling_island) {
pi_new = fd->polling_island;
if (pi_new == NULL) {
+ /* Unlock before creating a new polling island: the polling island will
+ create a workqueue which creates a file descriptor, and holding an fd
+ lock here can eventually cause a loop to appear to TSAN (making it
+ unhappy). We don't think it's a real loop (there's an epoch point where
+ that loop possibility disappears), but the advantages of keeping TSAN
+ happy outweigh any performance advantage we might have by keeping the
+ lock held. */
gpr_mu_unlock(&fd->mu);
pi_new = polling_island_create(exec_ctx, fd, &error);
gpr_mu_lock(&fd->mu);
+ /* Need to reverify any assumptions made between the initial lock and
+ getting to this branch: if they've changed, we need to throw away our
+ work and figure things out again. */
if (fd->polling_island != NULL) {
GRPC_POLLING_TRACE(
"pollset_add_fd: Raced creating new polling island. pi_new: %p "
diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c
index 89292a153e..d67d388b8c 100644
--- a/src/core/lib/iomgr/iomgr.c
+++ b/src/core/lib/iomgr/iomgr.c
@@ -45,6 +45,7 @@
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
+#include "src/core/lib/iomgr/network_status_tracker.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/support/env.h"
#include "src/core/lib/support/string.h"
@@ -62,6 +63,7 @@ void grpc_iomgr_init(void) {
grpc_timer_list_init(gpr_now(GPR_CLOCK_MONOTONIC));
g_root_object.next = g_root_object.prev = &g_root_object;
g_root_object.name = "root";
+ grpc_network_status_init();
grpc_iomgr_platform_init();
}
@@ -140,6 +142,7 @@ void grpc_iomgr_shutdown(void) {
grpc_iomgr_platform_shutdown();
grpc_exec_ctx_global_shutdown();
+ grpc_network_status_shutdown();
gpr_mu_destroy(&g_mu);
gpr_cv_destroy(&g_rcv);
}
diff --git a/src/core/lib/iomgr/network_status_tracker.c b/src/core/lib/iomgr/network_status_tracker.c
index ccbe136db9..90c074b007 100644
--- a/src/core/lib/iomgr/network_status_tracker.c
+++ b/src/core/lib/iomgr/network_status_tracker.c
@@ -42,9 +42,8 @@ typedef struct endpoint_ll_node {
static endpoint_ll_node *head = NULL;
static gpr_mu g_endpoint_mutex;
-static gpr_once g_once_init = GPR_ONCE_INIT;
-static void destroy_network_status_monitor(void) {
+void grpc_network_status_shutdown(void) {
if (head != NULL) {
gpr_log(GPR_ERROR,
"Memory leaked as all network endpoints were not shut down");
@@ -52,14 +51,12 @@ static void destroy_network_status_monitor(void) {
gpr_mu_destroy(&g_endpoint_mutex);
}
-static void initialize_network_status_monitor(void) {
+void grpc_network_status_init(void) {
gpr_mu_init(&g_endpoint_mutex);
- atexit(destroy_network_status_monitor);
// TODO(makarandd): Install callback with OS to monitor network status.
}
void grpc_network_status_register_endpoint(grpc_endpoint *ep) {
- gpr_once_init(&g_once_init, initialize_network_status_monitor);
gpr_mu_lock(&g_endpoint_mutex);
if (head == NULL) {
head = (endpoint_ll_node *)gpr_malloc(sizeof(endpoint_ll_node));
diff --git a/src/core/lib/iomgr/network_status_tracker.h b/src/core/lib/iomgr/network_status_tracker.h
index 74a1aa8135..67cb645f44 100644
--- a/src/core/lib/iomgr/network_status_tracker.h
+++ b/src/core/lib/iomgr/network_status_tracker.h
@@ -35,7 +35,11 @@
#define GRPC_CORE_LIB_IOMGR_NETWORK_STATUS_TRACKER_H
#include "src/core/lib/iomgr/endpoint.h"
+void grpc_network_status_init(void);
+void grpc_network_status_shutdown(void);
+
void grpc_network_status_register_endpoint(grpc_endpoint *ep);
void grpc_network_status_unregister_endpoint(grpc_endpoint *ep);
void grpc_network_status_shutdown_all_endpoints();
+
#endif /* GRPC_CORE_LIB_IOMGR_NETWORK_STATUS_TRACKER_H */
diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h
index 416618e258..b2805dc66c 100644
--- a/src/core/lib/iomgr/workqueue.h
+++ b/src/core/lib/iomgr/workqueue.h
@@ -50,12 +50,20 @@
/* grpc_workqueue is forward declared in exec_ctx.h */
+/* Reference counting functions. Use the macro's always
+ (GRPC_WORKQUEUE_{REF,UNREF}).
+
+ Pass in a descriptive reason string for reffing/unreffing as the last
+ argument to each macro. When GRPC_WORKQUEUE_REFCOUNT_DEBUG is defined, that
+ string will be printed alongside the refcount. When it is not defined, the
+ string will be discarded at compilation time. */
+
//#define GRPC_WORKQUEUE_REFCOUNT_DEBUG
#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
#define GRPC_WORKQUEUE_REF(p, r) \
(grpc_workqueue_ref((p), __FILE__, __LINE__, (r)), (p))
-#define GRPC_WORKQUEUE_UNREF(cl, p, r) \
- grpc_workqueue_unref((cl), (p), __FILE__, __LINE__, (r))
+#define GRPC_WORKQUEUE_UNREF(exec_ctx, p, r) \
+ grpc_workqueue_unref((exec_ctx), (p), __FILE__, __LINE__, (r))
void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line,
const char *reason);
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
@@ -67,7 +75,16 @@ void grpc_workqueue_ref(grpc_workqueue *workqueue);
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue);
#endif
-/** Add a work item to a workqueue */
+/** Add a work item to a workqueue. Items added to a work queue will be started
+ in approximately the order they were enqueued, on some thread that may or
+ may not be the current thread. Successive closures enqueued onto a workqueue
+ MAY be executed concurrently.
+
+ It is generally more expensive to add a closure to a workqueue than to the
+ execution context, both in terms of CPU work and in execution latency.
+
+ Use work queues when it's important that other threads be given a chance to
+ tackle some workload. */
void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
grpc_closure *closure, grpc_error *error);
diff --git a/src/core/lib/iomgr/workqueue_posix.h b/src/core/lib/iomgr/workqueue_posix.h
index c69ae8a941..03ee21cef7 100644
--- a/src/core/lib/iomgr/workqueue_posix.h
+++ b/src/core/lib/iomgr/workqueue_posix.h
@@ -53,7 +53,8 @@ struct grpc_workqueue {
grpc_closure read_closure;
};
-/** Create a work queue */
+/** Create a work queue. Returns an error if creation fails. If creation
+ succeeds, sets *workqueue to point to it. */
grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx,
grpc_workqueue **workqueue);