diff options
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r-- | src/core/lib/iomgr/closure.c | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/closure.h | 18 | ||||
-rw-r--r-- | src/core/lib/iomgr/combiner.c | 293 | ||||
-rw-r--r-- | src/core/lib/iomgr/combiner.h | 71 | ||||
-rw-r--r-- | src/core/lib/iomgr/error.c | 2 | ||||
-rw-r--r-- | src/core/lib/iomgr/ev_epoll_linux.c | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/exec_ctx.h | 6 | ||||
-rw-r--r-- | src/core/lib/iomgr/tcp_posix.c | 17 | ||||
-rw-r--r-- | src/core/lib/iomgr/workqueue.h | 4 | ||||
-rw-r--r-- | src/core/lib/iomgr/workqueue_posix.c | 97 | ||||
-rw-r--r-- | src/core/lib/iomgr/workqueue_posix.h | 9 | ||||
-rw-r--r-- | src/core/lib/iomgr/workqueue_windows.c | 2 |
12 files changed, 482 insertions, 45 deletions
diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c index 0b6c3b2539..1ba0a5c141 100644 --- a/src/core/lib/iomgr/closure.c +++ b/src/core/lib/iomgr/closure.c @@ -41,6 +41,10 @@ void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb, closure->cb_arg = cb_arg; } +void grpc_closure_list_init(grpc_closure_list *closure_list) { + closure_list->head = closure_list->tail = NULL; +} + void grpc_closure_list_append(grpc_closure_list *closure_list, grpc_closure *closure, grpc_error *error) { if (closure == NULL) { diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index 08e59a168e..c1a22b6021 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -37,6 +37,7 @@ #include <grpc/support/port_platform.h> #include <stdbool.h> #include "src/core/lib/iomgr/error.h" +#include "src/core/lib/support/mpscq.h" struct grpc_closure; typedef struct grpc_closure grpc_closure; @@ -60,6 +61,14 @@ typedef void (*grpc_iomgr_cb_func)(grpc_exec_ctx *exec_ctx, void *arg, /** A closure over a grpc_iomgr_cb_func. */ struct grpc_closure { + /** Once queued, next indicates the next queued closure; before then, scratch + * space */ + union { + grpc_closure *next; + gpr_mpscq_node atm_next; + uintptr_t scratch; + } next_data; + /** Bound callback. */ grpc_iomgr_cb_func cb; @@ -68,13 +77,6 @@ struct grpc_closure { /** Once queued, the result of the closure. Before then: scratch space */ grpc_error *error; - - /** Once queued, next indicates the next queued closure; before then, scratch - * space */ - union { - grpc_closure *next; - uintptr_t scratch; - } next_data; }; /** Initializes \a closure with \a cb and \a cb_arg. */ @@ -87,6 +89,8 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg); #define GRPC_CLOSURE_LIST_INIT \ { NULL, NULL } +void grpc_closure_list_init(grpc_closure_list *list); + /** add \a closure to the end of \a list and set \a closure's result to \a error */ void grpc_closure_list_append(grpc_closure_list *list, grpc_closure *closure, diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c new file mode 100644 index 0000000000..831bdb4aff --- /dev/null +++ b/src/core/lib/iomgr/combiner.c @@ -0,0 +1,293 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/iomgr/combiner.h" + +#include <string.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> + +#include "src/core/lib/iomgr/workqueue.h" +#include "src/core/lib/profiling/timers.h" + +int grpc_combiner_trace = 0; + +#define GRPC_COMBINER_TRACE(fn) \ + do { \ + if (grpc_combiner_trace) { \ + fn; \ + } \ + } while (0) + +struct grpc_combiner { + grpc_workqueue *optional_workqueue; + gpr_mpscq queue; + // state is: + // lower bit - zero if orphaned + // other bits - number of items queued on the lock + gpr_atm state; + bool take_async_break_before_final_list; + grpc_closure_list final_list; + grpc_closure continue_finishing; +}; + +grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) { + grpc_combiner *lock = gpr_malloc(sizeof(*lock)); + lock->optional_workqueue = optional_workqueue; + gpr_atm_no_barrier_store(&lock->state, 1); + gpr_mpscq_init(&lock->queue); + lock->take_async_break_before_final_list = false; + grpc_closure_list_init(&lock->final_list); + GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock)); + return lock; +} + +static void really_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { + GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p really_destroy", lock)); + GPR_ASSERT(gpr_atm_no_barrier_load(&lock->state) == 0); + gpr_mpscq_destroy(&lock->queue); + GRPC_WORKQUEUE_UNREF(exec_ctx, lock->optional_workqueue, "combiner"); + gpr_free(lock); +} + +void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { + gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -1); + GRPC_COMBINER_TRACE(gpr_log( + GPR_DEBUG, "C:%p really_destroy old_state=%" PRIdPTR, lock, old_state)); + if (old_state == 1) { + really_destroy(exec_ctx, lock); + } +} + +static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_combiner *lock); +static void finish(grpc_exec_ctx *exec_ctx, grpc_combiner *lock); + +static void continue_finishing_mainline(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + GPR_TIMER_BEGIN("combiner.continue_executing_mainline", 0); + grpc_combiner *lock = arg; + GRPC_COMBINER_TRACE( + gpr_log(GPR_DEBUG, "C:%p continue_finishing_mainline", lock)); + GPR_ASSERT(exec_ctx->active_combiner == NULL); + exec_ctx->active_combiner = lock; + if (maybe_finish_one(exec_ctx, lock)) finish(exec_ctx, lock); + GPR_ASSERT(exec_ctx->active_combiner == lock); + exec_ctx->active_combiner = NULL; + GPR_TIMER_END("combiner.continue_executing_mainline", 0); +} + +static void execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { + GPR_TIMER_BEGIN("combiner.execute_final", 0); + grpc_closure *c = lock->final_list.head; + GPR_ASSERT(c != NULL); + grpc_closure_list_init(&lock->final_list); + lock->take_async_break_before_final_list = false; + int loops = 0; + while (c != NULL) { + GRPC_COMBINER_TRACE( + gpr_log(GPR_DEBUG, "C:%p execute_final[%d] c=%p", lock, loops, c)); + grpc_closure *next = c->next_data.next; + grpc_error *error = c->error; + c->cb(exec_ctx, c->cb_arg, error); + GRPC_ERROR_UNREF(error); + c = next; + loops++; + } + GPR_TIMER_END("combiner.execute_final", 0); +} + +static void continue_executing_final(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + GPR_TIMER_BEGIN("combiner.continue_executing_final", 0); + grpc_combiner *lock = arg; + GRPC_COMBINER_TRACE( + gpr_log(GPR_DEBUG, "C:%p continue_executing_final", lock)); + GPR_ASSERT(exec_ctx->active_combiner == NULL); + exec_ctx->active_combiner = lock; + // quick peek to see if new things have turned up on the queue: if so, go back + // to executing them before the final list + if ((gpr_atm_acq_load(&lock->state) >> 1) > 1) { + if (maybe_finish_one(exec_ctx, lock)) finish(exec_ctx, lock); + } else { + execute_final(exec_ctx, lock); + finish(exec_ctx, lock); + } + GPR_ASSERT(exec_ctx->active_combiner == lock); + exec_ctx->active_combiner = NULL; + GPR_TIMER_END("combiner.continue_executing_final", 0); +} + +static bool start_execute_final(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { + GPR_TIMER_BEGIN("combiner.start_execute_final", 0); + GPR_ASSERT(exec_ctx->active_combiner == lock); + GRPC_COMBINER_TRACE( + gpr_log(GPR_DEBUG, + "C:%p start_execute_final take_async_break_before_final_list=%d", + lock, lock->take_async_break_before_final_list)); + if (lock->take_async_break_before_final_list) { + grpc_closure_init(&lock->continue_finishing, continue_executing_final, + lock); + grpc_exec_ctx_sched(exec_ctx, &lock->continue_finishing, GRPC_ERROR_NONE, + GRPC_WORKQUEUE_REF(lock->optional_workqueue, "sched")); + GPR_TIMER_END("combiner.start_execute_final", 0); + return false; + } else { + execute_final(exec_ctx, lock); + GPR_TIMER_END("combiner.start_execute_final", 0); + return true; + } +} + +static bool maybe_finish_one(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { + GPR_TIMER_BEGIN("combiner.maybe_finish_one", 0); + gpr_mpscq_node *n = gpr_mpscq_pop(&lock->queue); + GRPC_COMBINER_TRACE( + gpr_log(GPR_DEBUG, "C:%p maybe_finish_one n=%p", lock, n)); + GPR_ASSERT(exec_ctx->active_combiner == lock); + if (n == NULL) { + // Queue is in an transiently inconsistent state: a new item is being queued + // but is not visible to this thread yet. + // Use this as a cue that we should go off and do something else for a while + // (and come back later) + grpc_closure_init(&lock->continue_finishing, continue_finishing_mainline, + lock); + grpc_exec_ctx_sched(exec_ctx, &lock->continue_finishing, GRPC_ERROR_NONE, + GRPC_WORKQUEUE_REF(lock->optional_workqueue, "sched")); + GPR_TIMER_END("combiner.maybe_finish_one", 0); + return false; + } + grpc_closure *cl = (grpc_closure *)n; + grpc_error *error = cl->error; + cl->cb(exec_ctx, cl->cb_arg, error); + GRPC_ERROR_UNREF(error); + GPR_TIMER_END("combiner.maybe_finish_one", 0); + return true; +} + +static void finish(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { + bool (*executor)(grpc_exec_ctx * exec_ctx, grpc_combiner * lock); + GPR_TIMER_BEGIN("combiner.finish", 0); + int loops = 0; + do { + executor = maybe_finish_one; + gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -2); + GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, + "C:%p finish[%d] old_state=%" PRIdPTR, lock, + loops, old_state)); + switch (old_state) { + default: + // we have multiple queued work items: just continue executing them + break; + case 5: // we're down to one queued item: if it's the final list we + case 4: // should do that + if (!grpc_closure_list_empty(lock->final_list)) { + executor = start_execute_final; + } + break; + case 3: // had one count, one unorphaned --> unlocked unorphaned + GPR_TIMER_END("combiner.finish", 0); + return; + case 2: // and one count, one orphaned --> unlocked and orphaned + really_destroy(exec_ctx, lock); + GPR_TIMER_END("combiner.finish", 0); + return; + case 1: + case 0: + // these values are illegal - representing an already unlocked or + // deleted lock + GPR_UNREACHABLE_CODE(return ); + } + loops++; + } while (executor(exec_ctx, lock)); + GPR_TIMER_END("combiner.finish", 0); +} + +void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, + grpc_closure *cl, grpc_error *error) { + GRPC_COMBINER_TRACE( + gpr_log(GPR_DEBUG, "C:%p grpc_combiner_execute c=%p", lock, cl)); + GPR_TIMER_BEGIN("combiner.execute", 0); + gpr_atm last = gpr_atm_full_fetch_add(&lock->state, 2); + GPR_ASSERT(last & 1); // ensure lock has not been destroyed + if (last == 1) { + exec_ctx->active_combiner = lock; + GPR_TIMER_BEGIN("combiner.execute_first_cb", 0); + cl->cb(exec_ctx, cl->cb_arg, error); + GPR_TIMER_END("combiner.execute_first_cb", 0); + GRPC_ERROR_UNREF(error); + finish(exec_ctx, lock); + GPR_ASSERT(exec_ctx->active_combiner == lock); + exec_ctx->active_combiner = NULL; + } else { + cl->error = error; + gpr_mpscq_push(&lock->queue, &cl->next_data.atm_next); + } + GPR_TIMER_END("combiner.execute", 0); +} + +static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure, + grpc_error *error) { + grpc_combiner_execute_finally(exec_ctx, exec_ctx->active_combiner, closure, + GRPC_ERROR_REF(error), false); +} + +void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, + grpc_closure *closure, grpc_error *error, + bool force_async_break) { + GRPC_COMBINER_TRACE(gpr_log( + GPR_DEBUG, + "C:%p grpc_combiner_execute_finally c=%p force_async_break=%d; ac=%p", + lock, closure, force_async_break, exec_ctx->active_combiner)); + GPR_TIMER_BEGIN("combiner.execute_finally", 0); + if (exec_ctx->active_combiner != lock) { + GPR_TIMER_MARK("slowpath", 0); + grpc_combiner_execute(exec_ctx, lock, + grpc_closure_create(enqueue_finally, closure), error); + GPR_TIMER_END("combiner.execute_finally", 0); + return; + } + + if (force_async_break) { + lock->take_async_break_before_final_list = true; + } + if (grpc_closure_list_empty(lock->final_list)) { + gpr_atm_full_fetch_add(&lock->state, 2); + } + grpc_closure_list_append(&lock->final_list, closure, error); + GPR_TIMER_END("combiner.execute_finally", 0); +} + +void grpc_combiner_force_async_finally(grpc_combiner *lock) { + lock->take_async_break_before_final_list = true; +} diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h new file mode 100644 index 0000000000..1409db24b9 --- /dev/null +++ b/src/core/lib/iomgr/combiner.h @@ -0,0 +1,71 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_IOMGR_COMBINER_H +#define GRPC_CORE_LIB_IOMGR_COMBINER_H + +#include <stddef.h> + +#include <grpc/support/atm.h> +#include "src/core/lib/iomgr/exec_ctx.h" +#include "src/core/lib/support/mpscq.h" + +// Provides serialized access to some resource. +// Each action queued on a combiner is executed serially in a borrowed thread. +// The actual thread executing actions may change over time (but there will only +// every be one at a time). + +// Initialize the lock, with an optional workqueue to shift load to when +// necessary +grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue); +// Destroy the lock +void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock); +// Execute \a action within the lock. +void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, + grpc_closure *closure, grpc_error *error); +// Execute \a action within the lock just prior to unlocking. +// if \a hint_async_break is true, the combiner tries to hand execution to +// another thread before finishing the primary queue of combined closures and +// executing the finally list. +// Deprecation warning: \a hint_async_break will be removed in a future version +// Takes a very slow and round-about path if not called from a +// grpc_combiner_execute closure. +void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock, + grpc_closure *closure, grpc_error *error, + bool hint_async_break); +// Deprecated: force the finally list execution onto another thread +void grpc_combiner_force_async_finally(grpc_combiner *lock); + +extern int grpc_combiner_trace; + +#endif /* GRPC_CORE_LIB_IOMGR_COMBINER_H */ diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c index 149c55663c..e366961936 100644 --- a/src/core/lib/iomgr/error.c +++ b/src/core/lib/iomgr/error.c @@ -332,7 +332,7 @@ grpc_error *grpc_error_add_child(grpc_error *src, grpc_error *child) { return new; } -static const char *no_error_string = "null"; +static const char *no_error_string = "\"No Error\""; static const char *oom_error_string = "\"Out of memory\""; static const char *cancelled_error_string = "\"Cancelled\""; diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index eba347125e..740920d760 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -1531,6 +1531,8 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *fd) { + GPR_TIMER_BEGIN("pollset_add_fd", 0); + grpc_error *error = GRPC_ERROR_NONE; gpr_mu_lock(&pollset->mu); @@ -1643,6 +1645,8 @@ retry: gpr_mu_unlock(&pollset->mu); GRPC_LOG_IF_ERROR("pollset_add_fd", error); + + GPR_TIMER_END("pollset_add_fd", 0); } /******************************************************************************* diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index 917f332f03..1895ee6245 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -40,8 +40,8 @@ /** A workqueue represents a list of work to be executed asynchronously. Forward declared here to avoid a circular dependency with workqueue.h. */ -struct grpc_workqueue; typedef struct grpc_workqueue grpc_workqueue; +typedef struct grpc_combiner grpc_combiner; #ifndef GRPC_EXECUTION_CONTEXT_SANITIZER /** Execution context. @@ -66,13 +66,15 @@ typedef struct grpc_workqueue grpc_workqueue; */ struct grpc_exec_ctx { grpc_closure_list closure_list; + /** currently active combiner: updated only via combiner.c */ + grpc_combiner *active_combiner; bool cached_ready_to_finish; void *check_ready_to_finish_arg; bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg); }; #define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \ - { GRPC_CLOSURE_LIST_INIT, false, finish_check_arg, finish_check } + { GRPC_CLOSURE_LIST_INIT, NULL, false, finish_check_arg, finish_check } #else struct grpc_exec_ctx { bool cached_ready_to_finish; diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 974d5ae479..92767721d5 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -379,10 +379,19 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, } if (!tcp_flush(tcp, &error)) { + if (grpc_tcp_trace) { + gpr_log(GPR_DEBUG, "write: delayed"); + } grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure); } else { cb = tcp->write_cb; tcp->write_cb = NULL; + if (grpc_tcp_trace) { + const char *str = grpc_error_string(error); + gpr_log(GPR_DEBUG, "write: %s", str); + grpc_error_free_string(str); + } + GPR_TIMER_BEGIN("tcp_handle_write.cb", 0); cb->cb(exec_ctx, cb->cb_arg, error); GPR_TIMER_END("tcp_handle_write.cb", 0); @@ -425,8 +434,16 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (!tcp_flush(tcp, &error)) { TCP_REF(tcp, "write"); tcp->write_cb = cb; + if (grpc_tcp_trace) { + gpr_log(GPR_DEBUG, "write: delayed"); + } grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure); } else { + if (grpc_tcp_trace) { + const char *str = grpc_error_string(error); + gpr_log(GPR_DEBUG, "write: %s", str); + grpc_error_free_string(str); + } grpc_exec_ctx_sched(exec_ctx, cb, error, NULL); } diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h index 7156e490d7..b2805dc66c 100644 --- a/src/core/lib/iomgr/workqueue.h +++ b/src/core/lib/iomgr/workqueue.h @@ -50,10 +50,6 @@ /* grpc_workqueue is forward declared in exec_ctx.h */ -/* Deprecated: do not use. - This has *already* been removed in a future commit. */ -void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue); - /* Reference counting functions. Use the macro's always (GRPC_WORKQUEUE_{REF,UNREF}). diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c index e0d6dac230..ecfea68f56 100644 --- a/src/core/lib/iomgr/workqueue_posix.c +++ b/src/core/lib/iomgr/workqueue_posix.c @@ -44,6 +44,7 @@ #include <grpc/support/useful.h> #include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/profiling/timers.h" static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); @@ -52,8 +53,7 @@ grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, char name[32]; *workqueue = gpr_malloc(sizeof(grpc_workqueue)); gpr_ref_init(&(*workqueue)->refs, 1); - gpr_mu_init(&(*workqueue)->mu); - (*workqueue)->closure_list.head = (*workqueue)->closure_list.tail = NULL; + gpr_atm_no_barrier_store(&(*workqueue)->state, 1); grpc_error *err = grpc_wakeup_fd_init(&(*workqueue)->wakeup_fd); if (err != GRPC_ERROR_NONE) { gpr_free(*workqueue); @@ -62,6 +62,7 @@ grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, sprintf(name, "workqueue:%p", (void *)(*workqueue)); (*workqueue)->wakeup_read_fd = grpc_fd_create( GRPC_WAKEUP_FD_GET_READ_FD(&(*workqueue)->wakeup_fd), name); + gpr_mpscq_init(&(*workqueue)->queue); grpc_closure_init(&(*workqueue)->read_closure, on_readable, *workqueue); grpc_fd_notify_on_read(exec_ctx, (*workqueue)->wakeup_read_fd, &(*workqueue)->read_closure); @@ -70,57 +71,79 @@ grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, static void workqueue_destroy(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { - grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); grpc_fd_shutdown(exec_ctx, workqueue->wakeup_read_fd); } +static void workqueue_orphan(grpc_exec_ctx *exec_ctx, + grpc_workqueue *workqueue) { + if (gpr_atm_full_fetch_add(&workqueue->state, -1) == 1) { + workqueue_destroy(exec_ctx, workqueue); + } +} + #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line, const char *reason) { + if (workqueue == NULL) return; gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "WORKQUEUE:%p ref %d -> %d %s", workqueue, (int)workqueue->refs.count, (int)workqueue->refs.count + 1, reason); + gpr_ref(&workqueue->refs); +} #else void grpc_workqueue_ref(grpc_workqueue *workqueue) { -#endif + if (workqueue == NULL) return; gpr_ref(&workqueue->refs); } +#endif #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, const char *file, int line, const char *reason) { + if (workqueue == NULL) return; gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "WORKQUEUE:%p unref %d -> %d %s", workqueue, (int)workqueue->refs.count, (int)workqueue->refs.count - 1, reason); + if (gpr_unref(&workqueue->refs)) { + workqueue_orphan(exec_ctx, workqueue); + } +} #else void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { -#endif + if (workqueue == NULL) return; if (gpr_unref(&workqueue->refs)) { - workqueue_destroy(exec_ctx, workqueue); + workqueue_orphan(exec_ctx, workqueue); } } +#endif + +static void drain(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { + abort(); +} -void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { - gpr_mu_lock(&workqueue->mu); - grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); - gpr_mu_unlock(&workqueue->mu); +static void wakeup(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { + GPR_TIMER_MARK("workqueue.wakeup", 0); + grpc_error *err = grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); + if (!GRPC_LOG_IF_ERROR("wakeupfd_wakeup", err)) { + drain(exec_ctx, workqueue); + } } static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + GPR_TIMER_BEGIN("workqueue.on_readable", 0); + grpc_workqueue *workqueue = arg; if (error != GRPC_ERROR_NONE) { - gpr_mu_destroy(&workqueue->mu); /* HACK: let wakeup_fd code know that we stole the fd */ workqueue->wakeup_fd.read_fd = 0; grpc_wakeup_fd_destroy(&workqueue->wakeup_fd); grpc_fd_orphan(exec_ctx, workqueue->wakeup_read_fd, NULL, NULL, "destroy"); + GPR_ASSERT(gpr_atm_no_barrier_load(&workqueue->state) == 0); gpr_free(workqueue); } else { - gpr_mu_lock(&workqueue->mu); - grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); error = grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd); - gpr_mu_unlock(&workqueue->mu); + gpr_mpscq_node *n = gpr_mpscq_pop(&workqueue->queue); if (error == GRPC_ERROR_NONE) { grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, &workqueue->read_closure); @@ -128,24 +151,46 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { /* recurse to get error handling */ on_readable(exec_ctx, arg, error); } + if (n == NULL) { + /* try again - queue in an inconsistant state */ + wakeup(exec_ctx, workqueue); + } else { + switch (gpr_atm_full_fetch_add(&workqueue->state, -2)) { + case 3: // had one count, one unorphaned --> done, unorphaned + break; + case 2: // had one count, one orphaned --> done, orphaned + workqueue_destroy(exec_ctx, workqueue); + break; + case 1: + case 0: + // these values are illegal - representing an already done or + // deleted workqueue + GPR_UNREACHABLE_CODE(break); + default: + // schedule a wakeup since there's more to do + wakeup(exec_ctx, workqueue); + } + grpc_closure *cl = (grpc_closure *)n; + grpc_error *clerr = cl->error; + cl->cb(exec_ctx, cl->cb_arg, clerr); + GRPC_ERROR_UNREF(clerr); + } } + + GPR_TIMER_END("workqueue.on_readable", 0); } void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, grpc_closure *closure, grpc_error *error) { - grpc_error *push_error = GRPC_ERROR_NONE; - gpr_mu_lock(&workqueue->mu); - if (grpc_closure_list_empty(workqueue->closure_list)) { - push_error = grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); - } - grpc_closure_list_append(&workqueue->closure_list, closure, error); - if (push_error != GRPC_ERROR_NONE) { - const char *msg = grpc_error_string(push_error); - gpr_log(GPR_ERROR, "Failed to push to workqueue: %s", msg); - grpc_error_free_string(msg); - grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); + GPR_TIMER_BEGIN("workqueue.enqueue", 0); + gpr_atm last = gpr_atm_full_fetch_add(&workqueue->state, 2); + GPR_ASSERT(last & 1); + closure->error = error; + gpr_mpscq_push(&workqueue->queue, &closure->next_data.atm_next); + if (last == 1) { + wakeup(exec_ctx, workqueue); } - gpr_mu_unlock(&workqueue->mu); + GPR_TIMER_END("workqueue.enqueue", 0); } #endif /* GPR_POSIX_SOCKET */ diff --git a/src/core/lib/iomgr/workqueue_posix.h b/src/core/lib/iomgr/workqueue_posix.h index 0f26ba58e2..03ee21cef7 100644 --- a/src/core/lib/iomgr/workqueue_posix.h +++ b/src/core/lib/iomgr/workqueue_posix.h @@ -35,14 +35,17 @@ #define GRPC_CORE_LIB_IOMGR_WORKQUEUE_POSIX_H #include "src/core/lib/iomgr/wakeup_fd_posix.h" +#include "src/core/lib/support/mpscq.h" struct grpc_fd; struct grpc_workqueue { gpr_refcount refs; - - gpr_mu mu; - grpc_closure_list closure_list; + gpr_mpscq queue; + // state is: + // lower bit - zero if orphaned + // other bits - number of items enqueued + gpr_atm state; grpc_wakeup_fd wakeup_fd; struct grpc_fd *wakeup_read_fd; diff --git a/src/core/lib/iomgr/workqueue_windows.c b/src/core/lib/iomgr/workqueue_windows.c index 23e2dea185..ee81dc248e 100644 --- a/src/core/lib/iomgr/workqueue_windows.c +++ b/src/core/lib/iomgr/workqueue_windows.c @@ -42,8 +42,6 @@ // context, which is at least correct, if not performant or in the spirit of // workqueues. -void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {} - #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line, const char *reason) {} |