diff options
Diffstat (limited to 'src/core/lib/iomgr/ev_epoll_thread_pool_linux.c')
-rw-r--r-- | src/core/lib/iomgr/ev_epoll_thread_pool_linux.c | 229 |
1 files changed, 37 insertions, 192 deletions
diff --git a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c index bb44321922..49be72c03e 100644 --- a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c +++ b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c @@ -1,33 +1,18 @@ /* * - * Copyright 2017, Google Inc. - * All rights reserved. + * Copyright 2017 gRPC authors. * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. + * http://www.apache.org/licenses/LICENSE-2.0 * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * */ @@ -61,7 +46,6 @@ #include "src/core/lib/iomgr/lockfree_event.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" -#include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/block_annotate.h" @@ -109,23 +93,20 @@ static void fd_global_shutdown(void); * epoll set Declarations */ -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG +#ifndef NDEBUG #define EPS_ADD_REF(p, r) eps_add_ref_dbg((p), (r), __FILE__, __LINE__) #define EPS_UNREF(exec_ctx, p, r) \ eps_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__) -#else /* defined(GRPC_WORKQUEUE_REFCOUNT_DEBUG) */ +#else #define EPS_ADD_REF(p, r) eps_add_ref((p)) #define EPS_UNREF(exec_ctx, p, r) eps_unref((exec_ctx), (p)) -#endif /* !defined(GRPC_EPS_REF_COUNT_DEBUG) */ +#endif -/* This is also used as grpc_workqueue (by directly casting it) */ typedef struct epoll_set { - grpc_closure_scheduler workqueue_scheduler; - /* Mutex poller should acquire to poll this. This enforces that only one * poller can be polling on epoll_set at any time */ gpr_mu mu; @@ -139,15 +120,6 @@ typedef struct epoll_set { /* Number of threads currently polling on this epoll set*/ gpr_atm poller_count; - /* Mutex guarding the read end of the workqueue (must be held to pop from - * workqueue_items) */ - gpr_mu workqueue_read_mu; - /* Queue of closures to be executed */ - gpr_mpscq workqueue_items; - /* Count of items in workqueue_items */ - gpr_atm workqueue_item_count; - /* Wakeup fd used to wake pollers to check the contents of workqueue_items */ - grpc_wakeup_fd workqueue_wakeup_fd; /* Is the epoll set shutdown */ gpr_atm is_shutdown; @@ -181,7 +153,9 @@ struct grpc_pollset { /******************************************************************************* * Pollset-set Declarations */ -struct grpc_pollset_set {}; +struct grpc_pollset_set { + char unused; +}; /***************************************************************************** * Dedicated polling threads and pollsets - Declarations @@ -235,8 +209,6 @@ static __thread epoll_set *g_current_thread_epoll_set; /* Forward declaration */ static void epoll_set_delete(epoll_set *eps); -static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error); #ifdef GRPC_TSAN /* Currently TSAN may incorrectly flag data races between epoll_ctl and @@ -249,57 +221,30 @@ static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, gpr_atm g_epoll_sync; #endif /* defined(GRPC_TSAN) */ -static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = { - workqueue_enqueue, workqueue_enqueue, "workqueue"}; - static void eps_add_ref(epoll_set *eps); static void eps_unref(grpc_exec_ctx *exec_ctx, epoll_set *eps); -#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG +#ifndef NDEBUG static void eps_add_ref_dbg(epoll_set *eps, const char *reason, const char *file, int line) { - long old_cnt = gpr_atm_acq_load(&eps->ref_count); + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_atm old_cnt = gpr_atm_acq_load(&eps->ref_count); + gpr_log(GPR_DEBUG, "Add ref eps: %p, old:%" PRIdPTR " -> new:%" PRIdPTR + " (%s) - (%s, %d)", + eps, old_cnt, old_cnt + 1, reason, file, line); + } eps_add_ref(eps); - gpr_log(GPR_DEBUG, "Add ref eps: %p, old: %ld -> new:%ld (%s) - (%s, %d)", - (void *)eps, old_cnt, old_cnt + 1, reason, file, line); } static void eps_unref_dbg(grpc_exec_ctx *exec_ctx, epoll_set *eps, const char *reason, const char *file, int line) { - long old_cnt = gpr_atm_acq_load(&eps->ref_count); - eps_unref(exec_ctx, eps); - gpr_log(GPR_DEBUG, "Unref eps: %p, old:%ld -> new:%ld (%s) - (%s, %d)", - (void *)eps, old_cnt, (old_cnt - 1), reason, file, line); -} - -static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue, - const char *file, int line, - const char *reason) { - if (workqueue != NULL) { - eps_add_ref_dbg((epoll_set *)workqueue, reason, file, line); - } - return workqueue; -} - -static void workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - const char *file, int line, const char *reason) { - if (workqueue != NULL) { - eps_unref_dbg(exec_ctx, (epoll_set *)workqueue, reason, file, line); - } -} -#else -static grpc_workqueue *workqueue_ref(grpc_workqueue *workqueue) { - if (workqueue != NULL) { - eps_add_ref((epoll_set *)workqueue); - } - return workqueue; -} - -static void workqueue_unref(grpc_exec_ctx *exec_ctx, - grpc_workqueue *workqueue) { - if (workqueue != NULL) { - eps_unref(exec_ctx, (epoll_set *)workqueue); + if (GRPC_TRACER_ON(grpc_polling_trace)) { + gpr_atm old_cnt = gpr_atm_acq_load(&eps->ref_count); + gpr_log(GPR_DEBUG, "Unref eps: %p, old:%" PRIdPTR " -> new:%" PRIdPTR + " (%s) - (%s, %d)", + eps, old_cnt, (old_cnt - 1), reason, file, line); } + eps_unref(exec_ctx, eps); } #endif @@ -394,24 +339,15 @@ static epoll_set *epoll_set_create(grpc_error **error) { *error = GRPC_ERROR_NONE; eps = gpr_malloc(sizeof(*eps)); - eps->workqueue_scheduler.vtable = &workqueue_scheduler_vtable; eps->epoll_fd = -1; gpr_mu_init(&eps->mu); - gpr_mu_init(&eps->workqueue_read_mu); - gpr_mpscq_init(&eps->workqueue_items); - gpr_atm_rel_store(&eps->workqueue_item_count, 0); gpr_atm_rel_store(&eps->ref_count, 0); gpr_atm_rel_store(&eps->poller_count, 0); gpr_atm_rel_store(&eps->is_shutdown, false); - if (!append_error(error, grpc_wakeup_fd_init(&eps->workqueue_wakeup_fd), - err_desc)) { - goto done; - } - eps->epoll_fd = epoll_create1(EPOLL_CLOEXEC); if (eps->epoll_fd < 0) { @@ -419,8 +355,6 @@ static epoll_set *epoll_set_create(grpc_error **error) { goto done; } - epoll_set_add_wakeup_fd_locked(eps, &eps->workqueue_wakeup_fd, error); - done: if (*error != GRPC_ERROR_NONE) { epoll_set_delete(eps); @@ -434,57 +368,11 @@ static void epoll_set_delete(epoll_set *eps) { close(eps->epoll_fd); } - GPR_ASSERT(gpr_atm_no_barrier_load(&eps->workqueue_item_count) == 0); gpr_mu_destroy(&eps->mu); - gpr_mu_destroy(&eps->workqueue_read_mu); - gpr_mpscq_destroy(&eps->workqueue_items); - grpc_wakeup_fd_destroy(&eps->workqueue_wakeup_fd); gpr_free(eps); } -static void workqueue_maybe_wakeup(epoll_set *eps) { - /* If this thread is the current poller, then it may be that it's about to - decrement the current poller count, so we need to look past this thread */ - bool is_current_poller = (g_current_thread_epoll_set == eps); - gpr_atm min_current_pollers_for_wakeup = is_current_poller ? 1 : 0; - gpr_atm current_pollers = gpr_atm_no_barrier_load(&eps->poller_count); - /* Only issue a wakeup if it's likely that some poller could come in and take - it right now. Note that since we do an anticipatory mpscq_pop every poll - loop, it's ok if we miss the wakeup here, as we'll get the work item when - the next poller enters anyway. */ - if (current_pollers > min_current_pollers_for_wakeup) { - GRPC_LOG_IF_ERROR("workqueue_wakeup_fd", - grpc_wakeup_fd_wakeup(&eps->workqueue_wakeup_fd)); - } -} - -static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error) { - GPR_TIMER_BEGIN("workqueue.enqueue", 0); - grpc_workqueue *workqueue = (grpc_workqueue *)closure->scheduler; - /* take a ref to the workqueue: otherwise it can happen that whatever events - * this kicks off ends up destroying the workqueue before this function - * completes */ - GRPC_WORKQUEUE_REF(workqueue, "enqueue"); - epoll_set *eps = (epoll_set *)workqueue; - gpr_atm last = gpr_atm_no_barrier_fetch_add(&eps->workqueue_item_count, 1); - closure->error_data.error = error; - gpr_mpscq_push(&eps->workqueue_items, &closure->next_data.atm_next); - if (last == 0) { - workqueue_maybe_wakeup(eps); - } - - GRPC_WORKQUEUE_UNREF(exec_ctx, workqueue, "enqueue"); - GPR_TIMER_END("workqueue.enqueue", 0); -} - -static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) { - epoll_set *eps = (epoll_set *)workqueue; - return workqueue == NULL ? grpc_schedule_on_exec_ctx - : &eps->workqueue_scheduler; -} - static grpc_error *epoll_set_global_init() { grpc_error *error = GRPC_ERROR_NONE; @@ -631,7 +519,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, fd->eps = NULL; } - grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error)); gpr_mu_unlock(&fd->mu); @@ -680,8 +568,6 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure); } -static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { return NULL; } - /******************************************************************************* * Pollset Definitions */ @@ -834,7 +720,7 @@ static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx, /* Release the ref and set pollset->eps to NULL */ pollset_release_epoll_set(exec_ctx, pollset, "ps_shutdown"); - grpc_closure_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE); } /* pollset->mu lock must be held by the caller before calling this */ @@ -865,32 +751,6 @@ static void pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { gpr_mu_destroy(&pollset->mu); } -static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx, epoll_set *eps) { - if (gpr_mu_trylock(&eps->workqueue_read_mu)) { - gpr_mpscq_node *n = gpr_mpscq_pop(&eps->workqueue_items); - gpr_mu_unlock(&eps->workqueue_read_mu); - if (n != NULL) { - if (gpr_atm_full_fetch_add(&eps->workqueue_item_count, -1) > 1) { - workqueue_maybe_wakeup(eps); - } - grpc_closure *c = (grpc_closure *)n; - grpc_error *error = c->error_data.error; -#ifndef NDEBUG - c->scheduled = false; -#endif - c->cb(exec_ctx, c->cb_arg, error); - GRPC_ERROR_UNREF(error); - return true; - } else if (gpr_atm_no_barrier_load(&eps->workqueue_item_count) > 0) { - /* n == NULL might mean there's work but it's not available to be popped - * yet - try to ensure another workqueue wakes up to check shortly if so - */ - workqueue_maybe_wakeup(eps); - } - } - return false; -} - /* Blocking call */ static void acquire_epoll_lease(epoll_set *eps) { if (g_num_threads_per_eps > 1) { @@ -934,12 +794,7 @@ static void do_epoll_wait(grpc_exec_ctx *exec_ctx, int epoll_fd, epoll_set *eps, for (int i = 0; i < ep_rv; ++i) { void *data_ptr = ep_ev[i].data.ptr; - if (data_ptr == &eps->workqueue_wakeup_fd) { - append_error(error, - grpc_wakeup_fd_consume_wakeup(&eps->workqueue_wakeup_fd), - err_desc); - maybe_do_workqueue_work(exec_ctx, eps); - } else if (data_ptr == &epoll_set_wakeup_fd) { + if (data_ptr == &epoll_set_wakeup_fd) { gpr_atm_rel_store(&eps->is_shutdown, 1); gpr_log(GPR_INFO, "pollset poller: shutdown set"); } else { @@ -966,18 +821,13 @@ static void epoll_set_work(grpc_exec_ctx *exec_ctx, epoll_set *eps, epoll set. */ epoll_fd = eps->epoll_fd; - /* If we get some workqueue work to do, it might end up completing an item on - the completion queue, so there's no need to poll... so we skip that and - redo the complete loop to verify */ - if (!maybe_do_workqueue_work(exec_ctx, eps)) { - gpr_atm_no_barrier_fetch_add(&eps->poller_count, 1); - g_current_thread_epoll_set = eps; + gpr_atm_no_barrier_fetch_add(&eps->poller_count, 1); + g_current_thread_epoll_set = eps; - do_epoll_wait(exec_ctx, epoll_fd, eps, error); + do_epoll_wait(exec_ctx, epoll_fd, eps, error); - g_current_thread_epoll_set = NULL; - gpr_atm_no_barrier_fetch_add(&eps->poller_count, -1); - } + g_current_thread_epoll_set = NULL; + gpr_atm_no_barrier_fetch_add(&eps->poller_count, -1); GPR_TIMER_END("epoll_set_work", 0); } @@ -1120,7 +970,6 @@ static const grpc_event_engine_vtable vtable = { .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, - .fd_get_workqueue = fd_get_workqueue, .pollset_init = pollset_init, .pollset_shutdown = pollset_shutdown, @@ -1138,10 +987,6 @@ static const grpc_event_engine_vtable vtable = { .pollset_set_add_fd = pollset_set_add_fd, .pollset_set_del_fd = pollset_set_del_fd, - .workqueue_ref = workqueue_ref, - .workqueue_unref = workqueue_unref, - .workqueue_scheduler = workqueue_scheduler, - .shutdown_engine = shutdown_engine, }; |