author    Craig Tiller <ctiller@google.com>  2016-03-17 07:49:27 -0700
committer Craig Tiller <ctiller@google.com>  2016-03-17 07:49:27 -0700
commit  0239ba8ebf20b3f1cf454ccbf8d9a3800ac9b8c9 (patch)
tree    af21c64baffd4d3b7a6ce0a1f67ba5bba9406b57 /src/core
parent  7cd6ed7da391b12b567b64361e1d682be6b574b3 (diff)
parent  e4ce826e3009bf0de85225fef53fae330e4496ff (diff)
Merge github.com:grpc/grpc into cleaner-posix2
Diffstat (limited to 'src/core')
-rw-r--r--  src/core/iomgr/exec_ctx.c          75
-rw-r--r--  src/core/iomgr/exec_ctx.h          23
-rw-r--r--  src/core/iomgr/iomgr.c              3
-rw-r--r--  src/core/iomgr/pollset.h           12
-rw-r--r--  src/core/iomgr/workqueue_posix.c    4
5 files changed, 104 insertions(+), 13 deletions(-)
diff --git a/src/core/iomgr/exec_ctx.c b/src/core/iomgr/exec_ctx.c
index 1fd79f6eba..893fe4515c 100644
--- a/src/core/iomgr/exec_ctx.c
+++ b/src/core/iomgr/exec_ctx.c
@@ -34,9 +34,12 @@
#include "src/core/iomgr/exec_ctx.h"
#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/thd.h>
#include "src/core/profiling/timers.h"
+#ifndef GRPC_EXECUTION_CONTEXT_SANITIZER
bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
bool did_something = 0;
GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0);
@@ -74,3 +77,75 @@ void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,
GPR_ASSERT(offload_target_or_null == NULL);
grpc_closure_list_move(list, &exec_ctx->closure_list);
}
+
+void grpc_exec_ctx_global_init(void) {}
+void grpc_exec_ctx_global_shutdown(void) {}
+#else
+static gpr_mu g_mu;
+static gpr_cv g_cv;
+static int g_threads = 0;
+
+static void run_closure(void *arg) {
+ grpc_closure *closure = arg;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ closure->cb(&exec_ctx, closure->cb_arg, (closure->final_data & 1) != 0);
+ grpc_exec_ctx_finish(&exec_ctx);
+ gpr_mu_lock(&g_mu);
+ if (--g_threads == 0) {
+ gpr_cv_signal(&g_cv);
+ }
+ gpr_mu_unlock(&g_mu);
+}
+
+static void start_closure(grpc_closure *closure) {
+ gpr_thd_id id;
+ gpr_mu_lock(&g_mu);
+ g_threads++;
+ gpr_mu_unlock(&g_mu);
+ gpr_thd_new(&id, run_closure, closure, NULL);
+}
+
+bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { return false; }
+
+void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) {}
+
+void grpc_exec_ctx_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ bool success,
+ grpc_workqueue *offload_target_or_null) {
+ GPR_ASSERT(offload_target_or_null == NULL);
+ if (closure == NULL) return;
+ closure->final_data = success;
+ start_closure(closure);
+}
+
+void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,
+ grpc_closure_list *list,
+ grpc_workqueue *offload_target_or_null) {
+ GPR_ASSERT(offload_target_or_null == NULL);
+ if (list == NULL) return;
+ grpc_closure *p = list->head;
+ while (p) {
+ grpc_closure *start = p;
+ p = grpc_closure_next(start);
+ start_closure(start);
+ }
+ grpc_closure_list r = GRPC_CLOSURE_LIST_INIT;
+ *list = r;
+}
+
+void grpc_exec_ctx_global_init(void) {
+ gpr_mu_init(&g_mu);
+ gpr_cv_init(&g_cv);
+}
+
+void grpc_exec_ctx_global_shutdown(void) {
+ gpr_mu_lock(&g_mu);
+ while (g_threads != 0) {
+ gpr_cv_wait(&g_cv, &g_mu, gpr_inf_future(GPR_CLOCK_REALTIME));
+ }
+ gpr_mu_unlock(&g_mu);
+
+ gpr_mu_destroy(&g_mu);
+ gpr_cv_destroy(&g_cv);
+}
+#endif
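
The GRPC_EXECUTION_CONTEXT_SANITIZER branch above replaces closure_list batching with one short-lived thread per closure, and uses g_mu/g_cv/g_threads so that grpc_exec_ctx_global_shutdown can wait for every spawned closure to finish. A minimal standalone sketch of that same wait-for-zero-workers pattern, written with plain pthreads instead of the gpr_ wrappers (all names here are illustrative, not part of gRPC):

    #include <pthread.h>
    #include <stdio.h>

    static pthread_mutex_t g_mu = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t g_cv = PTHREAD_COND_INITIALIZER;
    static int g_threads = 0;

    static void *run_task(void *arg) {
      printf("task %ld done\n", (long)arg);        /* stand-in for closure->cb */
      pthread_mutex_lock(&g_mu);
      if (--g_threads == 0) pthread_cond_signal(&g_cv); /* last one out wakes shutdown */
      pthread_mutex_unlock(&g_mu);
      return NULL;
    }

    static void start_task(void *arg) {
      pthread_t id;
      pthread_mutex_lock(&g_mu);
      g_threads++;                                 /* count before the thread can exit */
      pthread_mutex_unlock(&g_mu);
      pthread_create(&id, NULL, run_task, arg);
      pthread_detach(id);
    }

    int main(void) {
      for (long i = 0; i < 4; i++) start_task((void *)i);
      pthread_mutex_lock(&g_mu);                   /* "global shutdown": wait for drain */
      while (g_threads != 0) pthread_cond_wait(&g_cv, &g_mu);
      pthread_mutex_unlock(&g_mu);
      return 0;
    }
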
diff --git a/src/core/iomgr/exec_ctx.h b/src/core/iomgr/exec_ctx.h
index 9a9b2e55fa..1b627a5dcf 100644
--- a/src/core/iomgr/exec_ctx.h
+++ b/src/core/iomgr/exec_ctx.h
@@ -36,6 +36,14 @@
#include "src/core/iomgr/closure.h"
+/* #define GRPC_EXECUTION_CONTEXT_SANITIZER 1 */
+
+/** A workqueue represents a list of work to be executed asynchronously.
+ Forward declared here to avoid a circular dependency with workqueue.h. */
+struct grpc_workqueue;
+typedef struct grpc_workqueue grpc_workqueue;
+
+#ifndef GRPC_EXECUTION_CONTEXT_SANITIZER
/** Execution context.
* A bag of data that collects information along a callstack.
* Generally created at public API entry points, and passed down as
@@ -57,13 +65,15 @@ struct grpc_exec_ctx {
grpc_closure_list closure_list;
};
-/** A workqueue represents a list of work to be executed asynchronously.
- Forward declared here to avoid a circular dependency with workqueue.h. */
-struct grpc_workqueue;
-typedef struct grpc_workqueue grpc_workqueue;
-
#define GRPC_EXEC_CTX_INIT \
{ GRPC_CLOSURE_LIST_INIT }
+#else
+struct grpc_exec_ctx {
+ int unused;
+};
+#define GRPC_EXEC_CTX_INIT \
+ { 0 }
+#endif
/** Flush any work that has been enqueued onto this grpc_exec_ctx.
* Caller must guarantee that no interfering locks are held.
@@ -82,4 +92,7 @@ void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,
grpc_closure_list *list,
grpc_workqueue *offload_target_or_null);
+void grpc_exec_ctx_global_init(void);
+void grpc_exec_ctx_global_shutdown(void);
+
#endif
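
The comment block in this header describes the exec_ctx as a per-call-stack bag: created at a public API entry point, threaded through the call stack so callbacks are queued rather than invoked recursively, and drained before the entry point returns. A hedged usage sketch, assuming compilation inside the gRPC source tree; it relies only on names visible in this diff (GRPC_EXEC_CTX_INIT, grpc_exec_ctx_enqueue, grpc_exec_ctx_finish), and do_notify/some_api_entry_point are made up for illustration:

    #include "src/core/iomgr/exec_ctx.h"

    /* Hypothetical helper: rather than invoking the callback directly (risking
       unbounded recursion), it parks the closure on the exec_ctx. */
    static void do_notify(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
                          bool success) {
      grpc_exec_ctx_enqueue(exec_ctx, on_done, success, NULL);
    }

    /* Typical entry-point shape: stack-allocated context, passed down, drained
       once on the way out. */
    void some_api_entry_point(grpc_closure *on_done) {
      grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
      do_notify(&exec_ctx, on_done, true);
      grpc_exec_ctx_finish(&exec_ctx); /* runs whatever was enqueued */
    }
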
diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c
index 3fe12ca76a..f009a6d32c 100644
--- a/src/core/iomgr/iomgr.c
+++ b/src/core/iomgr/iomgr.c
@@ -43,6 +43,7 @@
#include <grpc/support/thd.h>
#include <grpc/support/useful.h>
+#include "src/core/iomgr/exec_ctx.h"
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/timer.h"
#include "src/core/support/env.h"
@@ -57,6 +58,7 @@ void grpc_iomgr_init(void) {
g_shutdown = 0;
gpr_mu_init(&g_mu);
gpr_cv_init(&g_rcv);
+ grpc_exec_ctx_global_init();
grpc_timer_list_init(gpr_now(GPR_CLOCK_MONOTONIC));
g_root_object.next = g_root_object.prev = &g_root_object;
g_root_object.name = "root";
@@ -136,6 +138,7 @@ void grpc_iomgr_shutdown(void) {
gpr_mu_unlock(&g_mu);
grpc_iomgr_platform_shutdown();
+ grpc_exec_ctx_global_shutdown();
gpr_mu_destroy(&g_mu);
gpr_cv_destroy(&g_rcv);
}
diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h
index 92a0374ddd..ee1debfb71 100644
--- a/src/core/iomgr/pollset.h
+++ b/src/core/iomgr/pollset.h
@@ -55,7 +55,7 @@ typedef struct grpc_pollset_worker grpc_pollset_worker;
size_t grpc_pollset_size(void);
void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu);
/* Begin shutting down the pollset, and call closure when done.
- * GRPC_POLLSET_MU(pollset) must be held */
+ * pollset's mutex must be held */
void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure);
/** Reset the pollset to its initial state (perhaps with some cached objects);
@@ -66,16 +66,16 @@ void grpc_pollset_destroy(grpc_pollset *pollset);
/* Do some work on a pollset.
May involve invoking asynchronous callbacks, or actually polling file
descriptors.
- Requires GRPC_POLLSET_MU(pollset) locked.
- May unlock GRPC_POLLSET_MU(pollset) during its execution.
+ Requires pollset's mutex locked.
+ May unlock its mutex during its execution.
worker is a (platform-specific) handle that can be used to wake up
from grpc_pollset_work before any events are received and before the timeout
has expired. It is both initialized and destroyed by grpc_pollset_work.
Initialization of worker is guaranteed to occur BEFORE the
- GRPC_POLLSET_MU(pollset) is released for the first time by
- grpc_pollset_work, and it is guaranteed that GRPC_POLLSET_MU(pollset) will
- not be released by grpc_pollset_work AFTER worker has been destroyed.
+ pollset's mutex is released for the first time by grpc_pollset_work
+ and it is guaranteed that it will not be released by grpc_pollset_work
+ AFTER worker has been destroyed.
Tries not to block past deadline.
May call grpc_closure_list_run on grpc_closure_list, without holding the
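
The rewritten comments in pollset.h pin down the lock discipline without the old GRPC_POLLSET_MU macro: grpc_pollset_init hands back the mutex the pollset uses, and that mutex must be held around grpc_pollset_work and grpc_pollset_shutdown, even though grpc_pollset_work may drop and retake it internally. A rough sketch of that discipline, again assuming the gRPC tree; grpc_pollset_work's full argument list is not shown in this diff, so it stays a comment, and exec_ctx/on_shutdown are assumed to come from the caller:

    #include <grpc/support/alloc.h>
    #include "src/core/iomgr/pollset.h"

    void drive_pollset(grpc_exec_ctx *exec_ctx, grpc_closure *on_shutdown) {
      grpc_pollset *pollset = gpr_malloc(grpc_pollset_size());
      gpr_mu *mu;
      grpc_pollset_init(pollset, &mu);   /* the pollset exposes its own mutex */

      gpr_mu_lock(mu);
      /* grpc_pollset_work(exec_ctx, pollset, ...) would run here with mu held;
         it may unlock and re-lock mu while polling or running callbacks. */
      grpc_pollset_shutdown(exec_ctx, pollset, on_shutdown); /* mu still held */
      gpr_mu_unlock(mu);
      /* once on_shutdown has run, grpc_pollset_destroy(pollset) and gpr_free
         would release the storage. */
    }
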
diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c
index b8854ba5e5..ffc4611c07 100644
--- a/src/core/iomgr/workqueue_posix.c
+++ b/src/core/iomgr/workqueue_posix.c
@@ -106,7 +106,7 @@ void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
if (grpc_closure_list_empty(workqueue->closure_list)) {
grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd);
}
- grpc_closure_list_move(&exec_ctx->closure_list, &workqueue->closure_list);
+ grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL);
gpr_mu_unlock(&workqueue->mu);
}
@@ -122,7 +122,7 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
gpr_free(workqueue);
} else {
gpr_mu_lock(&workqueue->mu);
- grpc_closure_list_move(&workqueue->closure_list, &exec_ctx->closure_list);
+ grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL);
grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd);
gpr_mu_unlock(&workqueue->mu);
grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd,