Diffstat (limited to 'src/core/lib/iomgr/ev_epoll_thread_pool_linux.c')
-rw-r--r--  src/core/lib/iomgr/ev_epoll_thread_pool_linux.c  229
1 file changed, 37 insertions, 192 deletions
diff --git a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
index bb44321922..49be72c03e 100644
--- a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
@@ -1,33 +1,18 @@
/*
*
- * Copyright 2017, Google Inc.
- * All rights reserved.
+ * Copyright 2017 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
@@ -61,7 +46,6 @@
#include "src/core/lib/iomgr/lockfree_event.h"
#include "src/core/lib/iomgr/timer.h"
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
-#include "src/core/lib/iomgr/workqueue.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"
@@ -109,23 +93,20 @@ static void fd_global_shutdown(void);
* epoll set Declarations
*/
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+#ifndef NDEBUG
#define EPS_ADD_REF(p, r) eps_add_ref_dbg((p), (r), __FILE__, __LINE__)
#define EPS_UNREF(exec_ctx, p, r) \
eps_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__)
-#else /* defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */
+#else
#define EPS_ADD_REF(p, r) eps_add_ref((p))
#define EPS_UNREF(exec_ctx, p, r) eps_unref((exec_ctx), (p))
-#endif /* !defined(GRPC_EPS_REF_COUNT_DEBUG) */
+#endif
-/* This is also used as grpc_workqueue (by directly casting it) */
typedef struct epoll_set {
- grpc_closure_scheduler workqueue_scheduler;
-
/* Mutex poller should acquire to poll this. This enforces that only one
* poller can be polling on epoll_set at any time */
gpr_mu mu;
@@ -139,15 +120,6 @@ typedef struct epoll_set {
/* Number of threads currently polling on this epoll set*/
gpr_atm poller_count;
- /* Mutex guarding the read end of the workqueue (must be held to pop from
- * workqueue_items) */
- gpr_mu workqueue_read_mu;
- /* Queue of closures to be executed */
- gpr_mpscq workqueue_items;
- /* Count of items in workqueue_items */
- gpr_atm workqueue_item_count;
- /* Wakeup fd used to wake pollers to check the contents of workqueue_items */
- grpc_wakeup_fd workqueue_wakeup_fd;
/* Is the epoll set shutdown */
gpr_atm is_shutdown;
@@ -181,7 +153,9 @@ struct grpc_pollset {
/*******************************************************************************
* Pollset-set Declarations
*/
-struct grpc_pollset_set {};
+struct grpc_pollset_set {
+ char unused;
+};
/*****************************************************************************
* Dedicated polling threads and pollsets - Declarations
@@ -235,8 +209,6 @@ static __thread epoll_set *g_current_thread_epoll_set;
/* Forward declaration */
static void epoll_set_delete(epoll_set *eps);
-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- grpc_error *error);
#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
@@ -249,57 +221,30 @@ static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */
-static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = {
- workqueue_enqueue, workqueue_enqueue, "workqueue"};
-
static void eps_add_ref(epoll_set *eps);
static void eps_unref(grpc_exec_ctx *exec_ctx, epoll_set *eps);
-#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG
+#ifndef NDEBUG
static void eps_add_ref_dbg(epoll_set *eps, const char *reason,
const char *file, int line) {
- long old_cnt = gpr_atm_acq_load(&eps->ref_count);
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_atm old_cnt = gpr_atm_acq_load(&eps->ref_count);
+ gpr_log(GPR_DEBUG, "Add ref eps: %p, old:%" PRIdPTR " -> new:%" PRIdPTR
+ " (%s) - (%s, %d)",
+ eps, old_cnt, old_cnt + 1, reason, file, line);
+ }
eps_add_ref(eps);
- gpr_log(GPR_DEBUG, "Add ref eps: %p, old: %ld -> new:%ld (%s) - (%s, %d)",
- (void *)eps, old_cnt, old_cnt + 1, reason, file, line);
}
static void eps_unref_dbg(grpc_exec_ctx *exec_ctx, epoll_set *eps,
const char *reason, const char *file, int line) {
- long old_cnt = gpr_atm_acq_load(&eps->ref_count);
- eps_unref(exec_ctx, eps);
- gpr_log(GPR_DEBUG, "Unref eps: %p, old:%ld -> new:%ld (%s) - (%s, %d)",
- (void *)eps, old_cnt, (old_cnt - 1), reason, file, line);
-}
-
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue,
- const char *file, int line,
- const char *reason) {
- if (workqueue != NULL) {
- eps_add_ref_dbg((epoll_set *)workqueue, reason, file, line);
- }
- return workqueue;
-}
-
-static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- const char *file, int line, const char *reason) {
- if (workqueue != NULL) {
- eps_unref_dbg(exec_ctx, (epoll_set *)workqueue, reason, file, line);
- }
-}
-#else
-static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) {
- if (workqueue != NULL) {
- eps_add_ref((epoll_set *)workqueue);
- }
- return workqueue;
-}
-
-static void workqueue_unref(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue) {
- if (workqueue != NULL) {
- eps_unref(exec_ctx, (epoll_set *)workqueue);
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_atm old_cnt = gpr_atm_acq_load(&eps->ref_count);
+ gpr_log(GPR_DEBUG, "Unref eps: %p, old:%" PRIdPTR " -> new:%" PRIdPTR
+ " (%s) - (%s, %d)",
+ eps, old_cnt, (old_cnt - 1), reason, file, line);
}
+ eps_unref(exec_ctx, eps);
}
#endif
@@ -394,24 +339,15 @@ static epoll_set *epoll_set_create(grpc_error **error) {
*error = GRPC_ERROR_NONE;
eps = gpr_malloc(sizeof(*eps));
- eps->workqueue_scheduler.vtable = &workqueue_scheduler_vtable;
eps->epoll_fd = -1;
gpr_mu_init(&eps->mu);
- gpr_mu_init(&eps->workqueue_read_mu);
- gpr_mpscq_init(&eps->workqueue_items);
- gpr_atm_rel_store(&eps->workqueue_item_count, 0);
gpr_atm_rel_store(&eps->ref_count, 0);
gpr_atm_rel_store(&eps->poller_count, 0);
gpr_atm_rel_store(&eps->is_shutdown, false);
- if (!append_error(error, grpc_wakeup_fd_init(&eps->workqueue_wakeup_fd),
- err_desc)) {
- goto done;
- }
-
eps->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
if (eps->epoll_fd < 0) {
@@ -419,8 +355,6 @@ static epoll_set *epoll_set_create(grpc_error **error) {
goto done;
}
- epoll_set_add_wakeup_fd_locked(eps, &eps->workqueue_wakeup_fd, error);
-
done:
if (*error != GRPC_ERROR_NONE) {
epoll_set_delete(eps);
@@ -434,57 +368,11 @@ static void epoll_set_delete(epoll_set *eps) {
close(eps->epoll_fd);
}
- GPR_ASSERT(gpr_atm_no_barrier_load(&eps->workqueue_item_count) == 0);
gpr_mu_destroy(&eps->mu);
- gpr_mu_destroy(&eps->workqueue_read_mu);
- gpr_mpscq_destroy(&eps->workqueue_items);
- grpc_wakeup_fd_destroy(&eps->workqueue_wakeup_fd);
gpr_free(eps);
}
-static void workqueue_maybe_wakeup(epoll_set *eps) {
- /* If this thread is the current poller, then it may be that it's about to
- decrement the current poller count, so we need to look past this thread */
- bool is_current_poller = (g_current_thread_epoll_set == eps);
- gpr_atm min_current_pollers_for_wakeup = is_current_poller ? 1 : 0;
- gpr_atm current_pollers = gpr_atm_no_barrier_load(&eps->poller_count);
- /* Only issue a wakeup if it's likely that some poller could come in and take
- it right now. Note that since we do an anticipatory mpscq_pop every poll
- loop, it's ok if we miss the wakeup here, as we'll get the work item when
- the next poller enters anyway. */
- if (current_pollers > min_current_pollers_for_wakeup) {
- GRPC_LOG_IF_ERROR("workqueue_wakeup_fd",
- grpc_wakeup_fd_wakeup(&eps->workqueue_wakeup_fd));
- }
-}
-
-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- grpc_error *error) {
- GPR_TIMER_BEGIN("workqueue.enqueue", 0);
- grpc_workqueue *workqueue = (grpc_workqueue *)closure->scheduler;
- /* take a ref to the workqueue: otherwise it can happen that whatever events
- * this kicks off ends up destroying the workqueue before this function
- * completes */
- GRPC_WORKQUEUE_REF(workqueue, "enqueue");
- epoll_set *eps = (epoll_set *)workqueue;
- gpr_atm last = gpr_atm_no_barrier_fetch_add(&eps->workqueue_item_count, 1);
- closure->error_data.error = error;
- gpr_mpscq_push(&eps->workqueue_items, &closure->next_data.atm_next);
- if (last == 0) {
- workqueue_maybe_wakeup(eps);
- }
-
- GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue");
- GPR_TIMER_END("workqueue.enqueue", 0);
-}
-
-static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
- epoll_set *eps = (epoll_set *)workqueue;
- return workqueue == NULL ? grpc_schedule_on_exec_ctx
- : &eps->workqueue_scheduler;
-}
-
static grpc_error *epoll_set_global_init() {
grpc_error *error = GRPC_ERROR_NONE;
@@ -631,7 +519,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
fd->eps = NULL;
}
- grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
gpr_mu_unlock(&fd->mu);
@@ -680,8 +568,6 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
}
-static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { return NULL; }
-
/*******************************************************************************
* Pollset Definitions
*/
@@ -834,7 +720,7 @@ static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
/* Release the ref and set pollset->eps to NULL */
pollset_release_epoll_set(exec_ctx, pollset, "ps_shutdown");
- grpc_closure_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
}
/* pollset->mu lock must be held by the caller before calling this */
@@ -865,32 +751,6 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
gpr_mu_destroy(&pollset->mu);
}
-static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx, epoll_set *eps) {
- if (gpr_mu_trylock(&eps->workqueue_read_mu)) {
- gpr_mpscq_node *n = gpr_mpscq_pop(&eps->workqueue_items);
- gpr_mu_unlock(&eps->workqueue_read_mu);
- if (n != NULL) {
- if (gpr_atm_full_fetch_add(&eps->workqueue_item_count, -1) > 1) {
- workqueue_maybe_wakeup(eps);
- }
- grpc_closure *c = (grpc_closure *)n;
- grpc_error *error = c->error_data.error;
-#ifndef NDEBUG
- c->scheduled = false;
-#endif
- c->cb(exec_ctx, c->cb_arg, error);
- GRPC_ERROR_UNREF(error);
- return true;
- } else if (gpr_atm_no_barrier_load(&eps->workqueue_item_count) > 0) {
- /* n == NULL might mean there's work but it's not available to be popped
- * yet - try to ensure another workqueue wakes up to check shortly if so
- */
- workqueue_maybe_wakeup(eps);
- }
- }
- return false;
-}
-
/* Blocking call */
static void acquire_epoll_lease(epoll_set *eps) {
if (g_num_threads_per_eps > 1) {
@@ -934,12 +794,7 @@ static void do_epoll_wait(grpc_exec_ctx *exec_ctx, int epoll_fd, epoll_set *eps,
for (int i = 0; i < ep_rv; ++i) {
void *data_ptr = ep_ev[i].data.ptr;
- if (data_ptr == &eps->workqueue_wakeup_fd) {
- append_error(error,
- grpc_wakeup_fd_consume_wakeup(&eps->workqueue_wakeup_fd),
- err_desc);
- maybe_do_workqueue_work(exec_ctx, eps);
- } else if (data_ptr == &epoll_set_wakeup_fd) {
+ if (data_ptr == &epoll_set_wakeup_fd) {
gpr_atm_rel_store(&eps->is_shutdown, 1);
gpr_log(GPR_INFO, "pollset poller: shutdown set");
} else {
@@ -966,18 +821,13 @@ static void epoll_set_work(grpc_exec_ctx *exec_ctx, epoll_set *eps,
epoll set. */
epoll_fd = eps->epoll_fd;
- /* If we get some workqueue work to do, it might end up completing an item on
- the completion queue, so there's no need to poll... so we skip that and
- redo the complete loop to verify */
- if (!maybe_do_workqueue_work(exec_ctx, eps)) {
- gpr_atm_no_barrier_fetch_add(&eps->poller_count, 1);
- g_current_thread_epoll_set = eps;
+ gpr_atm_no_barrier_fetch_add(&eps->poller_count, 1);
+ g_current_thread_epoll_set = eps;
- do_epoll_wait(exec_ctx, epoll_fd, eps, error);
+ do_epoll_wait(exec_ctx, epoll_fd, eps, error);
- g_current_thread_epoll_set = NULL;
- gpr_atm_no_barrier_fetch_add(&eps->poller_count, -1);
- }
+ g_current_thread_epoll_set = NULL;
+ gpr_atm_no_barrier_fetch_add(&eps->poller_count, -1);
GPR_TIMER_END("epoll_set_work", 0);
}
@@ -1120,7 +970,6 @@ static const grpc_event_engine_vtable vtable = {
.fd_notify_on_read = fd_notify_on_read,
.fd_notify_on_write = fd_notify_on_write,
.fd_get_read_notifier_pollset = fd_get_read_notifier_pollset,
- .fd_get_workqueue = fd_get_workqueue,
.pollset_init = pollset_init,
.pollset_shutdown = pollset_shutdown,
@@ -1138,10 +987,6 @@ static const grpc_event_engine_vtable vtable = {
.pollset_set_add_fd = pollset_set_add_fd,
.pollset_set_del_fd = pollset_set_del_fd,
- .workqueue_ref = workqueue_ref,
- .workqueue_unref = workqueue_unref,
- .workqueue_scheduler = workqueue_scheduler,
-
.shutdown_engine = shutdown_engine,
};
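
Editorial note (not part of the commit): with the per-epoll-set workqueue removed, closures that this poller used to push onto workqueue_items now go through the ordinary closure scheduler, as the updated fd_orphan and finish_shutdown_locked hunks above show. The following is a minimal sketch of that pattern, assuming the 2017-era GRPC_CLOSURE_CREATE/GRPC_CLOSURE_SCHED macros and the grpc_schedule_on_exec_ctx scheduler already referenced in this file; the callback and helper names are hypothetical and exist only for illustration.

/* Sketch only: hypothetical names, assumes 2017-era gRPC iomgr closure API. */
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"

/* Callback with the standard grpc_closure signature of this code base. */
static void on_fd_released(grpc_exec_ctx *exec_ctx, void *arg,
                           grpc_error *error) {
  /* ... react to the fd being released ... */
}

/* Instead of targeting the removed per-epoll-set workqueue scheduler, the
   closure is created against grpc_schedule_on_exec_ctx and handed to
   GRPC_CLOSURE_SCHED, mirroring the fd_orphan / finish_shutdown_locked
   changes in the diff above. */
static void schedule_release_notification(grpc_exec_ctx *exec_ctx, void *arg) {
  grpc_closure *c =
      GRPC_CLOSURE_CREATE(on_fd_released, arg, grpc_schedule_on_exec_ctx);
  GRPC_CLOSURE_SCHED(exec_ctx, c, GRPC_ERROR_NONE);
}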