diff options
author | Craig Tiller <ctiller@google.com> | 2016-07-15 07:53:33 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2016-07-15 07:53:33 -0700 |
commit | 64d8024a1122b33ff1b335530d2d89ce210160e2 (patch) | |
tree | 8a09dc406e0fc355d69b87b12707e8744986414c /src/core/lib/iomgr | |
parent | c69d27b67d16f635aa234f28cc51608b609e0d1d (diff) | |
parent | b955636d412b1734968519cfd561d09cf6a1bd53 (diff) |
Merge github.com:grpc/grpc into grand-unified-closures
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r-- | src/core/lib/iomgr/ev_epoll_linux.c | 26 | ||||
-rw-r--r-- | src/core/lib/iomgr/iomgr.c | 3 | ||||
-rw-r--r-- | src/core/lib/iomgr/network_status_tracker.c | 7 | ||||
-rw-r--r-- | src/core/lib/iomgr/network_status_tracker.h | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/workqueue.h | 23 | ||||
-rw-r--r-- | src/core/lib/iomgr/workqueue_posix.h | 3 |
6 files changed, 49 insertions, 17 deletions
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index 4282d01a2b..6a63c4d1d1 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -517,14 +517,10 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx, done: if (*error != GRPC_ERROR_NONE) { - if (pi->epoll_fd < 0) { - close(pi->epoll_fd); - } if (pi->workqueue != NULL) { GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island"); } - gpr_mu_destroy(&pi->mu); - gpr_free(pi); + polling_island_delete(exec_ctx, pi); pi = NULL; } return pi; @@ -533,9 +529,9 @@ done: static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) { GPR_ASSERT(pi->fd_cnt == 0); - gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL); - - close(pi->epoll_fd); + if (pi->epoll_fd >= 0) { + close(pi->epoll_fd); + } gpr_mu_destroy(&pi->mu); gpr_free(pi->fds); gpr_free(pi); @@ -936,6 +932,10 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_mu_unlock(&fd->mu); UNREF_BY(fd, 2, reason); /* Drop the reference */ if (unref_pi != NULL) { + /* Unref stale polling island here, outside the fd lock above. + The polling island owns a workqueue which owns an fd, and unreffing + inside the lock can cause an eventual lock loop that makes TSAN very + unhappy. */ PI_UNREF(exec_ctx, unref_pi, "fd_orphan"); } GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error)); @@ -1559,9 +1559,19 @@ retry: if (fd->polling_island == pollset->polling_island) { pi_new = fd->polling_island; if (pi_new == NULL) { + /* Unlock before creating a new polling island: the polling island will + create a workqueue which creates a file descriptor, and holding an fd + lock here can eventually cause a loop to appear to TSAN (making it + unhappy). We don't think it's a real loop (there's an epoch point where + that loop possibility disappears), but the advantages of keeping TSAN + happy outweigh any performance advantage we might have by keeping the + lock held. */ gpr_mu_unlock(&fd->mu); pi_new = polling_island_create(exec_ctx, fd, &error); gpr_mu_lock(&fd->mu); + /* Need to reverify any assumptions made between the initial lock and + getting to this branch: if they've changed, we need to throw away our + work and figure things out again. */ if (fd->polling_island != NULL) { GRPC_POLLING_TRACE( "pollset_add_fd: Raced creating new polling island. pi_new: %p " diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c index 89292a153e..d67d388b8c 100644 --- a/src/core/lib/iomgr/iomgr.c +++ b/src/core/lib/iomgr/iomgr.c @@ -45,6 +45,7 @@ #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/iomgr_internal.h" +#include "src/core/lib/iomgr/network_status_tracker.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/support/env.h" #include "src/core/lib/support/string.h" @@ -62,6 +63,7 @@ void grpc_iomgr_init(void) { grpc_timer_list_init(gpr_now(GPR_CLOCK_MONOTONIC)); g_root_object.next = g_root_object.prev = &g_root_object; g_root_object.name = "root"; + grpc_network_status_init(); grpc_iomgr_platform_init(); } @@ -140,6 +142,7 @@ void grpc_iomgr_shutdown(void) { grpc_iomgr_platform_shutdown(); grpc_exec_ctx_global_shutdown(); + grpc_network_status_shutdown(); gpr_mu_destroy(&g_mu); gpr_cv_destroy(&g_rcv); } diff --git a/src/core/lib/iomgr/network_status_tracker.c b/src/core/lib/iomgr/network_status_tracker.c index ccbe136db9..90c074b007 100644 --- a/src/core/lib/iomgr/network_status_tracker.c +++ b/src/core/lib/iomgr/network_status_tracker.c @@ -42,9 +42,8 @@ typedef struct endpoint_ll_node { static endpoint_ll_node *head = NULL; static gpr_mu g_endpoint_mutex; -static gpr_once g_once_init = GPR_ONCE_INIT; -static void destroy_network_status_monitor(void) { +void grpc_network_status_shutdown(void) { if (head != NULL) { gpr_log(GPR_ERROR, "Memory leaked as all network endpoints were not shut down"); @@ -52,14 +51,12 @@ static void destroy_network_status_monitor(void) { gpr_mu_destroy(&g_endpoint_mutex); } -static void initialize_network_status_monitor(void) { +void grpc_network_status_init(void) { gpr_mu_init(&g_endpoint_mutex); - atexit(destroy_network_status_monitor); // TODO(makarandd): Install callback with OS to monitor network status. } void grpc_network_status_register_endpoint(grpc_endpoint *ep) { - gpr_once_init(&g_once_init, initialize_network_status_monitor); gpr_mu_lock(&g_endpoint_mutex); if (head == NULL) { head = (endpoint_ll_node *)gpr_malloc(sizeof(endpoint_ll_node)); diff --git a/src/core/lib/iomgr/network_status_tracker.h b/src/core/lib/iomgr/network_status_tracker.h index 74a1aa8135..67cb645f44 100644 --- a/src/core/lib/iomgr/network_status_tracker.h +++ b/src/core/lib/iomgr/network_status_tracker.h @@ -35,7 +35,11 @@ #define GRPC_CORE_LIB_IOMGR_NETWORK_STATUS_TRACKER_H #include "src/core/lib/iomgr/endpoint.h" +void grpc_network_status_init(void); +void grpc_network_status_shutdown(void); + void grpc_network_status_register_endpoint(grpc_endpoint *ep); void grpc_network_status_unregister_endpoint(grpc_endpoint *ep); void grpc_network_status_shutdown_all_endpoints(); + #endif /* GRPC_CORE_LIB_IOMGR_NETWORK_STATUS_TRACKER_H */ diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h index 416618e258..b2805dc66c 100644 --- a/src/core/lib/iomgr/workqueue.h +++ b/src/core/lib/iomgr/workqueue.h @@ -50,12 +50,20 @@ /* grpc_workqueue is forward declared in exec_ctx.h */ +/* Reference counting functions. Use the macro's always + (GRPC_WORKQUEUE_{REF,UNREF}). + + Pass in a descriptive reason string for reffing/unreffing as the last + argument to each macro. When GRPC_WORKQUEUE_REFCOUNT_DEBUG is defined, that + string will be printed alongside the refcount. When it is not defined, the + string will be discarded at compilation time. */ + //#define GRPC_WORKQUEUE_REFCOUNT_DEBUG #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG #define GRPC_WORKQUEUE_REF(p, r) \ (grpc_workqueue_ref((p), __FILE__, __LINE__, (r)), (p)) -#define GRPC_WORKQUEUE_UNREF(cl, p, r) \ - grpc_workqueue_unref((cl), (p), __FILE__, __LINE__, (r)) +#define GRPC_WORKQUEUE_UNREF(exec_ctx, p, r) \ + grpc_workqueue_unref((exec_ctx), (p), __FILE__, __LINE__, (r)) void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line, const char *reason); void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, @@ -67,7 +75,16 @@ void grpc_workqueue_ref(grpc_workqueue *workqueue); void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue); #endif -/** Add a work item to a workqueue */ +/** Add a work item to a workqueue. Items added to a work queue will be started + in approximately the order they were enqueued, on some thread that may or + may not be the current thread. Successive closures enqueued onto a workqueue + MAY be executed concurrently. + + It is generally more expensive to add a closure to a workqueue than to the + execution context, both in terms of CPU work and in execution latency. + + Use work queues when it's important that other threads be given a chance to + tackle some workload. */ void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, grpc_closure *closure, grpc_error *error); diff --git a/src/core/lib/iomgr/workqueue_posix.h b/src/core/lib/iomgr/workqueue_posix.h index c69ae8a941..03ee21cef7 100644 --- a/src/core/lib/iomgr/workqueue_posix.h +++ b/src/core/lib/iomgr/workqueue_posix.h @@ -53,7 +53,8 @@ struct grpc_workqueue { grpc_closure read_closure; }; -/** Create a work queue */ +/** Create a work queue. Returns an error if creation fails. If creation + succeeds, sets *workqueue to point to it. */ grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, grpc_workqueue **workqueue); |