diff options
Diffstat (limited to 'src/core/lib/iomgr/pollset_uv.cc')
-rw-r--r-- | src/core/lib/iomgr/pollset_uv.cc | 145 |
1 files changed, 40 insertions, 105 deletions
diff --git a/src/core/lib/iomgr/pollset_uv.cc b/src/core/lib/iomgr/pollset_uv.cc index c6a2f43bf1..bade6eae6c 100644 --- a/src/core/lib/iomgr/pollset_uv.cc +++ b/src/core/lib/iomgr/pollset_uv.cc @@ -22,137 +22,72 @@ #ifdef GRPC_UV -#include <uv.h> - -#include <string.h> - #include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include <grpc/support/sync.h> - -#include "src/core/lib/iomgr/iomgr_uv.h" -#include "src/core/lib/iomgr/pollset.h" -#include "src/core/lib/iomgr/pollset_uv.h" +#include "src/core/lib/iomgr/pollset_custom.h" -#include "src/core/lib/debug/trace.h" - -grpc_core::DebugOnlyTraceFlag grpc_trace_fd_refcount(false, "fd_refcount"); - -struct grpc_pollset { - uv_timer_t* timer; - int shutting_down; -}; +#include <uv.h> /* Indicates that grpc_pollset_work should run an iteration of the UV loop before running callbacks. This defaults to 1, and should be disabled if grpc_pollset_work will be called within the callstack of uv_run */ -int grpc_pollset_work_run_loop; - -gpr_mu grpc_polling_mu; +int grpc_pollset_work_run_loop = 1; -/* This is used solely to kick the uv loop, by setting a callback to be run - immediately in the next loop iteration. - Note: In the future, if there is a bug that involves missing wakeups in the - future, try adding a uv_async_t to kick the loop differently */ -uv_timer_t* dummy_uv_handle; +static bool g_kicked = false; -size_t grpc_pollset_size() { return sizeof(grpc_pollset); } +typedef struct uv_poller_handle { + uv_timer_t poll_timer; + uv_timer_t kick_timer; + int refs; +} uv_poller_handle; -void dummy_timer_cb(uv_timer_t* handle) {} +static uv_poller_handle* g_handle; -void dummy_handle_close_cb(uv_handle_t* handle) { gpr_free(handle); } - -void grpc_pollset_global_init(void) { - gpr_mu_init(&grpc_polling_mu); - dummy_uv_handle = (uv_timer_t*)gpr_malloc(sizeof(uv_timer_t)); - uv_timer_init(uv_default_loop(), dummy_uv_handle); - grpc_pollset_work_run_loop = 1; -} - -void grpc_pollset_global_shutdown(void) { - GRPC_UV_ASSERT_SAME_THREAD(); - gpr_mu_destroy(&grpc_polling_mu); - uv_close((uv_handle_t*)dummy_uv_handle, dummy_handle_close_cb); +static void init() { + g_handle = (uv_poller_handle*)gpr_malloc(sizeof(uv_poller_handle)); + g_handle->refs = 2; + uv_timer_init(uv_default_loop(), &g_handle->poll_timer); + uv_timer_init(uv_default_loop(), &g_handle->kick_timer); } -static void timer_run_cb(uv_timer_t* timer) {} +static void empty_timer_cb(uv_timer_t* handle) {} -static void timer_close_cb(uv_handle_t* handle) { - handle->data = (void*)1; - gpr_free(handle); -} +static void kick_timer_cb(uv_timer_t* handle) { g_kicked = false; } -void grpc_pollset_init(grpc_pollset* pollset, gpr_mu** mu) { - GRPC_UV_ASSERT_SAME_THREAD(); - *mu = &grpc_polling_mu; - pollset->timer = (uv_timer_t*)gpr_malloc(sizeof(uv_timer_t)); - uv_timer_init(uv_default_loop(), pollset->timer); - pollset->shutting_down = 0; +static void run_loop(size_t timeout) { + if (grpc_pollset_work_run_loop) { + if (timeout == 0) { + uv_run(uv_default_loop(), UV_RUN_NOWAIT); + } else { + uv_timer_start(&g_handle->poll_timer, empty_timer_cb, timeout, 0); + uv_run(uv_default_loop(), UV_RUN_ONCE); + uv_timer_stop(&g_handle->poll_timer); + } + } } -void grpc_pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) { - GPR_ASSERT(!pollset->shutting_down); - GRPC_UV_ASSERT_SAME_THREAD(); - pollset->shutting_down = 1; - if (grpc_pollset_work_run_loop) { - // Drain any pending UV callbacks without blocking - uv_run(uv_default_loop(), UV_RUN_NOWAIT); - } else { - // kick the loop once - uv_timer_start(dummy_uv_handle, dummy_timer_cb, 0, 0); +static void kick() { + if (!g_kicked) { + g_kicked = true; + uv_timer_start(&g_handle->kick_timer, kick_timer_cb, 0, 0); } - GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE); } -void grpc_pollset_destroy(grpc_pollset* pollset) { - GRPC_UV_ASSERT_SAME_THREAD(); - uv_close((uv_handle_t*)pollset->timer, timer_close_cb); - // timer.data is a boolean indicating that the timer has finished closing - pollset->timer->data = (void*)0; - if (grpc_pollset_work_run_loop) { - while (!pollset->timer->data) { - uv_run(uv_default_loop(), UV_RUN_NOWAIT); - } +static void close_timer_cb(uv_handle_t* handle) { + g_handle->refs--; + if (g_handle->refs == 0) { + gpr_free(g_handle); } } -grpc_error* grpc_pollset_work(grpc_pollset* pollset, - grpc_pollset_worker** worker_hdl, - grpc_millis deadline) { - uint64_t timeout; - GRPC_UV_ASSERT_SAME_THREAD(); - gpr_mu_unlock(&grpc_polling_mu); +static void shutdown() { + uv_close((uv_handle_t*)&g_handle->poll_timer, close_timer_cb); + uv_close((uv_handle_t*)&g_handle->kick_timer, close_timer_cb); if (grpc_pollset_work_run_loop) { - grpc_millis now = grpc_core::ExecCtx::Get()->Now(); - if (deadline >= now) { - timeout = deadline - now; - } else { - timeout = 0; - } - /* We special-case timeout=0 so that we don't bother with the timer when - the loop won't block anyway */ - if (timeout > 0) { - uv_timer_start(pollset->timer, timer_run_cb, timeout, 0); - /* Run until there is some I/O activity or the timer triggers. It doesn't - matter which happens */ - uv_run(uv_default_loop(), UV_RUN_ONCE); - uv_timer_stop(pollset->timer); - } else { - uv_run(uv_default_loop(), UV_RUN_NOWAIT); - } - } - if (!grpc_closure_list_empty(*grpc_core::ExecCtx::Get()->closure_list())) { - grpc_core::ExecCtx::Get()->Flush(); + GPR_ASSERT(uv_run(uv_default_loop(), UV_RUN_DEFAULT) == 0); } - gpr_mu_lock(&grpc_polling_mu); - return GRPC_ERROR_NONE; } -grpc_error* grpc_pollset_kick(grpc_pollset* pollset, - grpc_pollset_worker* specific_worker) { - GRPC_UV_ASSERT_SAME_THREAD(); - uv_timer_start(dummy_uv_handle, dummy_timer_cb, 0, 0); - return GRPC_ERROR_NONE; -} +grpc_custom_poller_vtable uv_pollset_vtable = {init, run_loop, kick, shutdown}; #endif /* GRPC_UV */ |