diff options
author | 2016-03-16 08:20:04 -0700 | |
---|---|---|
committer | 2016-03-16 08:20:04 -0700 | |
commit | f4f57f07493453c57ee5fbf2e7111736151d0e3a (patch) | |
tree | 63d6f6e95043bcfb05f0955baaaa723b0423c2db /src/core/iomgr | |
parent | 66e3b02d8a970f978c7907903f04094f802c7b44 (diff) | |
parent | 92bab124695f64588c84884511e32ef7d55c5c42 (diff) |
Merge pull request #5474 from ctiller/esan
Execution context sanitizer
Diffstat (limited to 'src/core/iomgr')
-rw-r--r-- | src/core/iomgr/exec_ctx.c | 75 | ||||
-rw-r--r-- | src/core/iomgr/exec_ctx.h | 23 | ||||
-rw-r--r-- | src/core/iomgr/iomgr.c | 3 | ||||
-rw-r--r-- | src/core/iomgr/workqueue_posix.c | 4 |
4 files changed, 98 insertions, 7 deletions
diff --git a/src/core/iomgr/exec_ctx.c b/src/core/iomgr/exec_ctx.c index 1fd79f6eba..893fe4515c 100644 --- a/src/core/iomgr/exec_ctx.c +++ b/src/core/iomgr/exec_ctx.c @@ -34,9 +34,12 @@ #include "src/core/iomgr/exec_ctx.h" #include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/thd.h> #include "src/core/profiling/timers.h" +#ifndef GRPC_EXECUTION_CONTEXT_SANITIZER bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { bool did_something = 0; GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0); @@ -74,3 +77,75 @@ void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx, GPR_ASSERT(offload_target_or_null == NULL); grpc_closure_list_move(list, &exec_ctx->closure_list); } + +void grpc_exec_ctx_global_init(void) {} +void grpc_exec_ctx_global_shutdown(void) {} +#else +static gpr_mu g_mu; +static gpr_cv g_cv; +static int g_threads = 0; + +static void run_closure(void *arg) { + grpc_closure *closure = arg; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + closure->cb(&exec_ctx, closure->cb_arg, (closure->final_data & 1) != 0); + grpc_exec_ctx_finish(&exec_ctx); + gpr_mu_lock(&g_mu); + if (--g_threads == 0) { + gpr_cv_signal(&g_cv); + } + gpr_mu_unlock(&g_mu); +} + +static void start_closure(grpc_closure *closure) { + gpr_thd_id id; + gpr_mu_lock(&g_mu); + g_threads++; + gpr_mu_unlock(&g_mu); + gpr_thd_new(&id, run_closure, closure, NULL); +} + +bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { return false; } + +void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) {} + +void grpc_exec_ctx_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + bool success, + grpc_workqueue *offload_target_or_null) { + GPR_ASSERT(offload_target_or_null == NULL); + if (closure == NULL) return; + closure->final_data = success; + start_closure(closure); +} + +void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx, + grpc_closure_list *list, + grpc_workqueue *offload_target_or_null) { + GPR_ASSERT(offload_target_or_null == NULL); + if (list == NULL) return; + grpc_closure *p = list->head; + while (p) { + grpc_closure *start = p; + p = grpc_closure_next(start); + start_closure(start); + } + grpc_closure_list r = GRPC_CLOSURE_LIST_INIT; + *list = r; +} + +void grpc_exec_ctx_global_init(void) { + gpr_mu_init(&g_mu); + gpr_cv_init(&g_cv); +} + +void grpc_exec_ctx_global_shutdown(void) { + gpr_mu_lock(&g_mu); + while (g_threads != 0) { + gpr_cv_wait(&g_cv, &g_mu, gpr_inf_future(GPR_CLOCK_REALTIME)); + } + gpr_mu_unlock(&g_mu); + + gpr_mu_destroy(&g_mu); + gpr_cv_destroy(&g_cv); +} +#endif diff --git a/src/core/iomgr/exec_ctx.h b/src/core/iomgr/exec_ctx.h index 9a9b2e55fa..1b627a5dcf 100644 --- a/src/core/iomgr/exec_ctx.h +++ b/src/core/iomgr/exec_ctx.h @@ -36,6 +36,14 @@ #include "src/core/iomgr/closure.h" +/* #define GRPC_EXECUTION_CONTEXT_SANITIZER 1 */ + +/** A workqueue represents a list of work to be executed asynchronously. + Forward declared here to avoid a circular dependency with workqueue.h. */ +struct grpc_workqueue; +typedef struct grpc_workqueue grpc_workqueue; + +#ifndef GRPC_EXECUTION_CONTEXT_SANITIZER /** Execution context. * A bag of data that collects information along a callstack. * Generally created at public API entry points, and passed down as @@ -57,13 +65,15 @@ struct grpc_exec_ctx { grpc_closure_list closure_list; }; -/** A workqueue represents a list of work to be executed asynchronously. - Forward declared here to avoid a circular dependency with workqueue.h. */ -struct grpc_workqueue; -typedef struct grpc_workqueue grpc_workqueue; - #define GRPC_EXEC_CTX_INIT \ { GRPC_CLOSURE_LIST_INIT } +#else +struct grpc_exec_ctx { + int unused; +}; +#define GRPC_EXEC_CTX_INIT \ + { 0 } +#endif /** Flush any work that has been enqueued onto this grpc_exec_ctx. * Caller must guarantee that no interfering locks are held. @@ -82,4 +92,7 @@ void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx, grpc_closure_list *list, grpc_workqueue *offload_target_or_null); +void grpc_exec_ctx_global_init(void); +void grpc_exec_ctx_global_shutdown(void); + #endif diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index 9c89c2c08a..3ab4430668 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -43,6 +43,7 @@ #include <grpc/support/thd.h> #include <grpc/support/useful.h> +#include "src/core/iomgr/exec_ctx.h" #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/timer.h" #include "src/core/support/env.h" @@ -57,6 +58,7 @@ void grpc_iomgr_init(void) { g_shutdown = 0; gpr_mu_init(&g_mu); gpr_cv_init(&g_rcv); + grpc_exec_ctx_global_init(); grpc_timer_list_init(gpr_now(GPR_CLOCK_MONOTONIC)); g_root_object.next = g_root_object.prev = &g_root_object; g_root_object.name = "root"; @@ -138,6 +140,7 @@ void grpc_iomgr_shutdown(void) { grpc_pollset_global_shutdown(); grpc_iomgr_platform_shutdown(); + grpc_exec_ctx_global_shutdown(); gpr_mu_destroy(&g_mu); gpr_cv_destroy(&g_rcv); } diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c index c096dbfb30..2b42e6d4fb 100644 --- a/src/core/iomgr/workqueue_posix.c +++ b/src/core/iomgr/workqueue_posix.c @@ -107,7 +107,7 @@ void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { if (grpc_closure_list_empty(workqueue->closure_list)) { grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); } - grpc_closure_list_move(&exec_ctx->closure_list, &workqueue->closure_list); + grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); gpr_mu_unlock(&workqueue->mu); } @@ -123,7 +123,7 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success) { gpr_free(workqueue); } else { gpr_mu_lock(&workqueue->mu); - grpc_closure_list_move(&workqueue->closure_list, &exec_ctx->closure_list); + grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd); gpr_mu_unlock(&workqueue->mu); grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, |