diff options
author | Craig Tiller <ctiller@google.com> | 2016-03-17 07:49:27 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2016-03-17 07:49:27 -0700 |
commit | 0239ba8ebf20b3f1cf454ccbf8d9a3800ac9b8c9 (patch) | |
tree | af21c64baffd4d3b7a6ce0a1f67ba5bba9406b57 /src/core | |
parent | 7cd6ed7da391b12b567b64361e1d682be6b574b3 (diff) | |
parent | e4ce826e3009bf0de85225fef53fae330e4496ff (diff) |
Merge github.com:grpc/grpc into cleaner-posix2
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/iomgr/exec_ctx.c | 75 | ||||
-rw-r--r-- | src/core/iomgr/exec_ctx.h | 23 | ||||
-rw-r--r-- | src/core/iomgr/iomgr.c | 3 | ||||
-rw-r--r-- | src/core/iomgr/pollset.h | 12 | ||||
-rw-r--r-- | src/core/iomgr/workqueue_posix.c | 4 |
5 files changed, 104 insertions, 13 deletions
diff --git a/src/core/iomgr/exec_ctx.c b/src/core/iomgr/exec_ctx.c index 1fd79f6eba..893fe4515c 100644 --- a/src/core/iomgr/exec_ctx.c +++ b/src/core/iomgr/exec_ctx.c @@ -34,9 +34,12 @@ #include "src/core/iomgr/exec_ctx.h" #include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/thd.h> #include "src/core/profiling/timers.h" +#ifndef GRPC_EXECUTION_CONTEXT_SANITIZER bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { bool did_something = 0; GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0); @@ -74,3 +77,75 @@ void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx, GPR_ASSERT(offload_target_or_null == NULL); grpc_closure_list_move(list, &exec_ctx->closure_list); } + +void grpc_exec_ctx_global_init(void) {} +void grpc_exec_ctx_global_shutdown(void) {} +#else +static gpr_mu g_mu; +static gpr_cv g_cv; +static int g_threads = 0; + +static void run_closure(void *arg) { + grpc_closure *closure = arg; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + closure->cb(&exec_ctx, closure->cb_arg, (closure->final_data & 1) != 0); + grpc_exec_ctx_finish(&exec_ctx); + gpr_mu_lock(&g_mu); + if (--g_threads == 0) { + gpr_cv_signal(&g_cv); + } + gpr_mu_unlock(&g_mu); +} + +static void start_closure(grpc_closure *closure) { + gpr_thd_id id; + gpr_mu_lock(&g_mu); + g_threads++; + gpr_mu_unlock(&g_mu); + gpr_thd_new(&id, run_closure, closure, NULL); +} + +bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { return false; } + +void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) {} + +void grpc_exec_ctx_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + bool success, + grpc_workqueue *offload_target_or_null) { + GPR_ASSERT(offload_target_or_null == NULL); + if (closure == NULL) return; + closure->final_data = success; + start_closure(closure); +} + +void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx, + grpc_closure_list *list, + grpc_workqueue *offload_target_or_null) { + GPR_ASSERT(offload_target_or_null == NULL); + if (list == NULL) return; + grpc_closure *p = list->head; + while (p) { + grpc_closure *start = p; + p = grpc_closure_next(start); + start_closure(start); + } + grpc_closure_list r = GRPC_CLOSURE_LIST_INIT; + *list = r; +} + +void grpc_exec_ctx_global_init(void) { + gpr_mu_init(&g_mu); + gpr_cv_init(&g_cv); +} + +void grpc_exec_ctx_global_shutdown(void) { + gpr_mu_lock(&g_mu); + while (g_threads != 0) { + gpr_cv_wait(&g_cv, &g_mu, gpr_inf_future(GPR_CLOCK_REALTIME)); + } + gpr_mu_unlock(&g_mu); + + gpr_mu_destroy(&g_mu); + gpr_cv_destroy(&g_cv); +} +#endif diff --git a/src/core/iomgr/exec_ctx.h b/src/core/iomgr/exec_ctx.h index 9a9b2e55fa..1b627a5dcf 100644 --- a/src/core/iomgr/exec_ctx.h +++ b/src/core/iomgr/exec_ctx.h @@ -36,6 +36,14 @@ #include "src/core/iomgr/closure.h" +/* #define GRPC_EXECUTION_CONTEXT_SANITIZER 1 */ + +/** A workqueue represents a list of work to be executed asynchronously. + Forward declared here to avoid a circular dependency with workqueue.h. */ +struct grpc_workqueue; +typedef struct grpc_workqueue grpc_workqueue; + +#ifndef GRPC_EXECUTION_CONTEXT_SANITIZER /** Execution context. * A bag of data that collects information along a callstack. * Generally created at public API entry points, and passed down as @@ -57,13 +65,15 @@ struct grpc_exec_ctx { grpc_closure_list closure_list; }; -/** A workqueue represents a list of work to be executed asynchronously. - Forward declared here to avoid a circular dependency with workqueue.h. */ -struct grpc_workqueue; -typedef struct grpc_workqueue grpc_workqueue; - #define GRPC_EXEC_CTX_INIT \ { GRPC_CLOSURE_LIST_INIT } +#else +struct grpc_exec_ctx { + int unused; +}; +#define GRPC_EXEC_CTX_INIT \ + { 0 } +#endif /** Flush any work that has been enqueued onto this grpc_exec_ctx. * Caller must guarantee that no interfering locks are held. @@ -82,4 +92,7 @@ void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx, grpc_closure_list *list, grpc_workqueue *offload_target_or_null); +void grpc_exec_ctx_global_init(void); +void grpc_exec_ctx_global_shutdown(void); + #endif diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index 3fe12ca76a..f009a6d32c 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -43,6 +43,7 @@ #include <grpc/support/thd.h> #include <grpc/support/useful.h> +#include "src/core/iomgr/exec_ctx.h" #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/timer.h" #include "src/core/support/env.h" @@ -57,6 +58,7 @@ void grpc_iomgr_init(void) { g_shutdown = 0; gpr_mu_init(&g_mu); gpr_cv_init(&g_rcv); + grpc_exec_ctx_global_init(); grpc_timer_list_init(gpr_now(GPR_CLOCK_MONOTONIC)); g_root_object.next = g_root_object.prev = &g_root_object; g_root_object.name = "root"; @@ -136,6 +138,7 @@ void grpc_iomgr_shutdown(void) { gpr_mu_unlock(&g_mu); grpc_iomgr_platform_shutdown(); + grpc_exec_ctx_global_shutdown(); gpr_mu_destroy(&g_mu); gpr_cv_destroy(&g_rcv); } diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h index 92a0374ddd..ee1debfb71 100644 --- a/src/core/iomgr/pollset.h +++ b/src/core/iomgr/pollset.h @@ -55,7 +55,7 @@ typedef struct grpc_pollset_worker grpc_pollset_worker; size_t grpc_pollset_size(void); void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu); /* Begin shutting down the pollset, and call closure when done. - * GRPC_POLLSET_MU(pollset) must be held */ + * pollset's mutex must be held */ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure *closure); /** Reset the pollset to its initial state (perhaps with some cached objects); @@ -66,16 +66,16 @@ void grpc_pollset_destroy(grpc_pollset *pollset); /* Do some work on a pollset. May involve invoking asynchronous callbacks, or actually polling file descriptors. - Requires GRPC_POLLSET_MU(pollset) locked. - May unlock GRPC_POLLSET_MU(pollset) during its execution. + Requires pollset's mutex locked. + May unlock its mutex during its execution. worker is a (platform-specific) handle that can be used to wake up from grpc_pollset_work before any events are received and before the timeout has expired. It is both initialized and destroyed by grpc_pollset_work. Initialization of worker is guaranteed to occur BEFORE the - GRPC_POLLSET_MU(pollset) is released for the first time by - grpc_pollset_work, and it is guaranteed that GRPC_POLLSET_MU(pollset) will - not be released by grpc_pollset_work AFTER worker has been destroyed. + pollset's mutex is released for the first time by grpc_pollset_work + and it is guaranteed that it will not be released by grpc_pollset_work + AFTER worker has been destroyed. Tries not to block past deadline. May call grpc_closure_list_run on grpc_closure_list, without holding the diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c index b8854ba5e5..ffc4611c07 100644 --- a/src/core/iomgr/workqueue_posix.c +++ b/src/core/iomgr/workqueue_posix.c @@ -106,7 +106,7 @@ void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { if (grpc_closure_list_empty(workqueue->closure_list)) { grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); } - grpc_closure_list_move(&exec_ctx->closure_list, &workqueue->closure_list); + grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); gpr_mu_unlock(&workqueue->mu); } @@ -122,7 +122,7 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success) { gpr_free(workqueue); } else { gpr_mu_lock(&workqueue->mu); - grpc_closure_list_move(&workqueue->closure_list, &exec_ctx->closure_list); + grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd); gpr_mu_unlock(&workqueue->mu); grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, |