aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r--src/core/lib/iomgr/README.md6
-rw-r--r--src/core/lib/iomgr/closure.c38
-rw-r--r--src/core/lib/iomgr/closure.h45
-rw-r--r--src/core/lib/iomgr/combiner.c110
-rw-r--r--src/core/lib/iomgr/combiner.h14
-rw-r--r--src/core/lib/iomgr/ev_epoll_linux.c38
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c26
-rw-r--r--src/core/lib/iomgr/ev_posix.c5
-rw-r--r--src/core/lib/iomgr/ev_posix.h3
-rw-r--r--src/core/lib/iomgr/exec_ctx.c134
-rw-r--r--src/core/lib/iomgr/exec_ctx.h31
-rw-r--r--src/core/lib/iomgr/executor.c37
-rw-r--r--src/core/lib/iomgr/executor.h6
-rw-r--r--src/core/lib/iomgr/iocp_windows.c1
-rw-r--r--src/core/lib/iomgr/iomgr.c12
-rw-r--r--src/core/lib/iomgr/iomgr.h6
-rw-r--r--src/core/lib/iomgr/pollset_uv.c2
-rw-r--r--src/core/lib/iomgr/pollset_windows.c5
-rw-r--r--src/core/lib/iomgr/resolve_address.h2
-rw-r--r--src/core/lib/iomgr/resolve_address_posix.c19
-rw-r--r--src/core/lib/iomgr/resolve_address_uv.c15
-rw-r--r--src/core/lib/iomgr/resolve_address_windows.c16
-rw-r--r--src/core/lib/iomgr/resource_quota.c146
-rw-r--r--src/core/lib/iomgr/resource_quota.h4
-rw-r--r--src/core/lib/iomgr/socket_mutator.c2
-rw-r--r--src/core/lib/iomgr/socket_windows.c4
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.c29
-rw-r--r--src/core/lib/iomgr/tcp_client_uv.c13
-rw-r--r--src/core/lib/iomgr/tcp_client_windows.c18
-rw-r--r--src/core/lib/iomgr/tcp_posix.c45
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.c26
-rw-r--r--src/core/lib/iomgr/tcp_server_uv.c12
-rw-r--r--src/core/lib/iomgr/tcp_server_windows.c29
-rw-r--r--src/core/lib/iomgr/tcp_uv.c14
-rw-r--r--src/core/lib/iomgr/tcp_windows.c34
-rw-r--r--src/core/lib/iomgr/timer.h16
-rw-r--r--src/core/lib/iomgr/timer_generic.c19
-rw-r--r--src/core/lib/iomgr/timer_generic.h2
-rw-r--r--src/core/lib/iomgr/timer_uv.c12
-rw-r--r--src/core/lib/iomgr/timer_uv.h2
-rw-r--r--src/core/lib/iomgr/udp_server.c10
-rw-r--r--src/core/lib/iomgr/workqueue.h11
-rw-r--r--src/core/lib/iomgr/workqueue_uv.c5
-rw-r--r--src/core/lib/iomgr/workqueue_windows.c5
44 files changed, 533 insertions, 496 deletions
diff --git a/src/core/lib/iomgr/README.md b/src/core/lib/iomgr/README.md
new file mode 100644
index 0000000000..9b22b76ceb
--- /dev/null
+++ b/src/core/lib/iomgr/README.md
@@ -0,0 +1,6 @@
+# iomgr
+
+Platform abstractions for I/O (mostly network).
+
+Provides abstractions over TCP/UDP I/O, file loading, polling, and concurrency
+management for various operating systems.
diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c
index c6ddc76732..da0ec878a3 100644
--- a/src/core/lib/iomgr/closure.c
+++ b/src/core/lib/iomgr/closure.c
@@ -37,10 +37,13 @@
#include "src/core/lib/profiling/timers.h"
-void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
- void *cb_arg) {
+grpc_closure *grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
+ void *cb_arg,
+ grpc_closure_scheduler *scheduler) {
closure->cb = cb;
closure->cb_arg = cb_arg;
+ closure->scheduler = scheduler;
+ return closure;
}
void grpc_closure_list_init(grpc_closure_list *closure_list) {
@@ -105,11 +108,12 @@ static void closure_wrapper(grpc_exec_ctx *exec_ctx, void *arg,
cb(exec_ctx, cb_arg, error);
}
-grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg) {
+grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg,
+ grpc_closure_scheduler *scheduler) {
wrapped_closure *wc = gpr_malloc(sizeof(*wc));
wc->cb = cb;
wc->cb_arg = cb_arg;
- grpc_closure_init(&wc->wrapper, closure_wrapper, wc);
+ grpc_closure_init(&wc->wrapper, closure_wrapper, wc, scheduler);
return &wc->wrapper;
}
@@ -117,8 +121,30 @@ void grpc_closure_run(grpc_exec_ctx *exec_ctx, grpc_closure *c,
grpc_error *error) {
GPR_TIMER_BEGIN("grpc_closure_run", 0);
if (c != NULL) {
- c->cb(exec_ctx, c->cb_arg, error);
+ c->scheduler->vtable->run(exec_ctx, c, error);
+ } else {
+ GRPC_ERROR_UNREF(error);
}
- GRPC_ERROR_UNREF(error);
GPR_TIMER_END("grpc_closure_run", 0);
}
+
+void grpc_closure_sched(grpc_exec_ctx *exec_ctx, grpc_closure *c,
+ grpc_error *error) {
+ GPR_TIMER_BEGIN("grpc_closure_sched", 0);
+ if (c != NULL) {
+ c->scheduler->vtable->sched(exec_ctx, c, error);
+ } else {
+ GRPC_ERROR_UNREF(error);
+ }
+ GPR_TIMER_END("grpc_closure_sched", 0);
+}
+
+void grpc_closure_list_sched(grpc_exec_ctx *exec_ctx, grpc_closure_list *list) {
+ grpc_closure *c = list->head;
+ while (c != NULL) {
+ grpc_closure *next = c->next_data.next;
+ c->scheduler->vtable->sched(exec_ctx, c, c->error_data.error);
+ c = next;
+ }
+ list->head = list->tail = NULL;
+}
diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h
index 2b4b271eaa..ee386fbc76 100644
--- a/src/core/lib/iomgr/closure.h
+++ b/src/core/lib/iomgr/closure.h
@@ -35,6 +35,8 @@
#define GRPC_CORE_LIB_IOMGR_CLOSURE_H
#include <grpc/support/port_platform.h>
+
+#include <grpc/impl/codegen/exec_ctx_fwd.h>
#include <stdbool.h>
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/support/mpscq.h"
@@ -42,10 +44,6 @@
struct grpc_closure;
typedef struct grpc_closure grpc_closure;
-/* forward declaration for exec_ctx.h */
-struct grpc_exec_ctx;
-typedef struct grpc_exec_ctx grpc_exec_ctx;
-
typedef struct grpc_closure_list {
grpc_closure *head;
grpc_closure *tail;
@@ -59,6 +57,22 @@ typedef struct grpc_closure_list {
typedef void (*grpc_iomgr_cb_func)(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error);
+typedef struct grpc_closure_scheduler grpc_closure_scheduler;
+
+typedef struct grpc_closure_scheduler_vtable {
+ /* NOTE: for all these functions, closure->scheduler == the scheduler that was
+ used to find this vtable */
+ void (*run)(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error);
+ void (*sched)(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error);
+} grpc_closure_scheduler_vtable;
+
+/** Abstract type that can schedule closures for execution */
+struct grpc_closure_scheduler {
+ const grpc_closure_scheduler_vtable *vtable;
+};
+
/** A closure over a grpc_iomgr_cb_func. */
struct grpc_closure {
/** Once queued, next indicates the next queued closure; before then, scratch
@@ -75,6 +89,10 @@ struct grpc_closure {
/** Arguments to be passed to "cb". */
void *cb_arg;
+ /** Scheduler to schedule against: NULL to schedule against current execution
+ context */
+ grpc_closure_scheduler *scheduler;
+
/** Once queued, the result of the closure. Before then: scratch space */
union {
grpc_error *error;
@@ -82,12 +100,14 @@ struct grpc_closure {
} error_data;
};
-/** Initializes \a closure with \a cb and \a cb_arg. */
-void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
- void *cb_arg);
+/** Initializes \a closure with \a cb and \a cb_arg. Returns \a closure. */
+grpc_closure *grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
+ void *cb_arg,
+ grpc_closure_scheduler *scheduler);
/* Create a heap allocated closure: try to avoid except for very rare events */
-grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg);
+grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg,
+ grpc_closure_scheduler *scheduler);
#define GRPC_CLOSURE_LIST_INIT \
{ NULL, NULL }
@@ -115,4 +135,13 @@ bool grpc_closure_list_empty(grpc_closure_list list);
void grpc_closure_run(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_error *error);
+/** Schedule a closure to be run. Does not need to be run from a safe point. */
+void grpc_closure_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error);
+
+/** Schedule all closures in a list to be run. Does not need to be run from a
+ * safe point. */
+void grpc_closure_list_sched(grpc_exec_ctx *exec_ctx,
+ grpc_closure_list *closure_list);
+
#endif /* GRPC_CORE_LIB_IOMGR_CLOSURE_H */
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index cfc67020ae..c26a73b2b7 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -56,6 +56,10 @@ int grpc_combiner_trace = 0;
struct grpc_combiner {
grpc_combiner *next_combiner_on_this_exec_ctx;
grpc_workqueue *optional_workqueue;
+ grpc_closure_scheduler uncovered_scheduler;
+ grpc_closure_scheduler covered_scheduler;
+ grpc_closure_scheduler uncovered_finally_scheduler;
+ grpc_closure_scheduler covered_finally_scheduler;
gpr_mpscq queue;
// state is:
// lower bit - zero if orphaned (STATE_UNORPHANED)
@@ -70,6 +74,26 @@ struct grpc_combiner {
grpc_closure offload;
};
+static void combiner_exec_uncovered(grpc_exec_ctx *exec_ctx,
+ grpc_closure *closure, grpc_error *error);
+static void combiner_exec_covered(grpc_exec_ctx *exec_ctx,
+ grpc_closure *closure, grpc_error *error);
+static void combiner_finally_exec_uncovered(grpc_exec_ctx *exec_ctx,
+ grpc_closure *closure,
+ grpc_error *error);
+static void combiner_finally_exec_covered(grpc_exec_ctx *exec_ctx,
+ grpc_closure *closure,
+ grpc_error *error);
+
+static const grpc_closure_scheduler_vtable scheduler_uncovered = {
+ combiner_exec_uncovered, combiner_exec_uncovered};
+static const grpc_closure_scheduler_vtable scheduler_covered = {
+ combiner_exec_covered, combiner_exec_covered};
+static const grpc_closure_scheduler_vtable finally_scheduler_uncovered = {
+ combiner_finally_exec_uncovered, combiner_finally_exec_uncovered};
+static const grpc_closure_scheduler_vtable finally_scheduler_covered = {
+ combiner_finally_exec_covered, combiner_finally_exec_covered};
+
static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error);
typedef struct {
@@ -102,11 +126,16 @@ grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue) {
lock->time_to_execute_final_list = false;
lock->optional_workqueue = optional_workqueue;
lock->final_list_covered_by_poller = false;
+ lock->uncovered_scheduler.vtable = &scheduler_uncovered;
+ lock->covered_scheduler.vtable = &scheduler_covered;
+ lock->uncovered_finally_scheduler.vtable = &finally_scheduler_uncovered;
+ lock->covered_finally_scheduler.vtable = &finally_scheduler_covered;
gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED);
gpr_atm_no_barrier_store(&lock->elements_covered_by_poller, 0);
gpr_mpscq_init(&lock->queue);
grpc_closure_list_init(&lock->final_list);
- grpc_closure_init(&lock->offload, offload, lock);
+ grpc_closure_init(&lock->offload, offload, lock,
+ grpc_workqueue_scheduler(lock->optional_workqueue));
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock));
return lock;
}
@@ -148,9 +177,9 @@ static void push_first_on_exec_ctx(grpc_exec_ctx *exec_ctx,
}
}
-void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
- grpc_closure *cl, grpc_error *error,
- bool covered_by_poller) {
+static void combiner_exec(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
+ grpc_closure *cl, grpc_error *error,
+ bool covered_by_poller) {
GPR_TIMER_BEGIN("combiner.execute", 0);
gpr_atm last = gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT);
GRPC_COMBINER_TRACE(gpr_log(
@@ -171,6 +200,24 @@ void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
GPR_TIMER_END("combiner.execute", 0);
}
+#define COMBINER_FROM_CLOSURE_SCHEDULER(closure, scheduler_name) \
+ ((grpc_combiner *)(((char *)((closure)->scheduler)) - \
+ offsetof(grpc_combiner, scheduler_name)))
+
+static void combiner_exec_uncovered(grpc_exec_ctx *exec_ctx, grpc_closure *cl,
+ grpc_error *error) {
+ combiner_exec(exec_ctx,
+ COMBINER_FROM_CLOSURE_SCHEDULER(cl, uncovered_scheduler), cl,
+ error, false);
+}
+
+static void combiner_exec_covered(grpc_exec_ctx *exec_ctx, grpc_closure *cl,
+ grpc_error *error) {
+ combiner_exec(exec_ctx,
+ COMBINER_FROM_CLOSURE_SCHEDULER(cl, covered_scheduler), cl,
+ error, true);
+}
+
static void move_next(grpc_exec_ctx *exec_ctx) {
exec_ctx->active_combiner =
exec_ctx->active_combiner->next_combiner_on_this_exec_ctx;
@@ -188,8 +235,7 @@ static void queue_offload(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
move_next(exec_ctx);
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p queue_offload --> %p", lock,
lock->optional_workqueue));
- grpc_workqueue_enqueue(exec_ctx, lock->optional_workqueue, &lock->offload,
- GRPC_ERROR_NONE);
+ grpc_closure_sched(exec_ctx, &lock->offload, GRPC_ERROR_NONE);
}
bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
@@ -312,23 +358,22 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
}
static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure,
- grpc_error *error) {
- grpc_combiner_execute_finally(exec_ctx, exec_ctx->active_combiner, closure,
- GRPC_ERROR_REF(error), false);
-}
+ grpc_error *error);
-void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
- grpc_closure *closure, grpc_error *error,
- bool covered_by_poller) {
+static void combiner_execute_finally(grpc_exec_ctx *exec_ctx,
+ grpc_combiner *lock, grpc_closure *closure,
+ grpc_error *error,
+ bool covered_by_poller) {
GRPC_COMBINER_TRACE(gpr_log(
GPR_DEBUG, "C:%p grpc_combiner_execute_finally c=%p; ac=%p; cov=%d", lock,
closure, exec_ctx->active_combiner, covered_by_poller));
GPR_TIMER_BEGIN("combiner.execute_finally", 0);
if (exec_ctx->active_combiner != lock) {
GPR_TIMER_MARK("slowpath", 0);
- grpc_combiner_execute(exec_ctx, lock,
- grpc_closure_create(enqueue_finally, closure), error,
- false);
+ grpc_closure_sched(
+ exec_ctx, grpc_closure_create(enqueue_finally, closure,
+ grpc_combiner_scheduler(lock, false)),
+ error);
GPR_TIMER_END("combiner.execute_finally", 0);
return;
}
@@ -342,3 +387,36 @@ void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
grpc_closure_list_append(&lock->final_list, closure, error);
GPR_TIMER_END("combiner.execute_finally", 0);
}
+
+static void enqueue_finally(grpc_exec_ctx *exec_ctx, void *closure,
+ grpc_error *error) {
+ combiner_execute_finally(exec_ctx, exec_ctx->active_combiner, closure,
+ GRPC_ERROR_REF(error), false);
+}
+
+static void combiner_finally_exec_uncovered(grpc_exec_ctx *exec_ctx,
+ grpc_closure *cl,
+ grpc_error *error) {
+ combiner_execute_finally(exec_ctx, COMBINER_FROM_CLOSURE_SCHEDULER(
+ cl, uncovered_finally_scheduler),
+ cl, error, false);
+}
+
+static void combiner_finally_exec_covered(grpc_exec_ctx *exec_ctx,
+ grpc_closure *cl, grpc_error *error) {
+ combiner_execute_finally(
+ exec_ctx, COMBINER_FROM_CLOSURE_SCHEDULER(cl, covered_finally_scheduler),
+ cl, error, true);
+}
+
+grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *combiner,
+ bool covered_by_poller) {
+ return covered_by_poller ? &combiner->covered_scheduler
+ : &combiner->uncovered_scheduler;
+}
+
+grpc_closure_scheduler *grpc_combiner_finally_scheduler(
+ grpc_combiner *combiner, bool covered_by_poller) {
+ return covered_by_poller ? &combiner->covered_finally_scheduler
+ : &combiner->uncovered_finally_scheduler;
+}
diff --git a/src/core/lib/iomgr/combiner.h b/src/core/lib/iomgr/combiner.h
index d04eeed83a..81dff85d40 100644
--- a/src/core/lib/iomgr/combiner.h
+++ b/src/core/lib/iomgr/combiner.h
@@ -50,14 +50,12 @@
grpc_combiner *grpc_combiner_create(grpc_workqueue *optional_workqueue);
// Destroy the lock
void grpc_combiner_destroy(grpc_exec_ctx *exec_ctx, grpc_combiner *lock);
-// Execute \a action within the lock.
-void grpc_combiner_execute(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
- grpc_closure *closure, grpc_error *error,
- bool covered_by_poller);
-// Execute \a action within the lock just prior to unlocking.
-void grpc_combiner_execute_finally(grpc_exec_ctx *exec_ctx, grpc_combiner *lock,
- grpc_closure *closure, grpc_error *error,
- bool covered_by_poller);
+// Fetch a scheduler to schedule closures against
+grpc_closure_scheduler *grpc_combiner_scheduler(grpc_combiner *lock,
+ bool covered_by_poller);
+// Scheduler to execute \a action within the lock just prior to unlocking.
+grpc_closure_scheduler *grpc_combiner_finally_scheduler(grpc_combiner *lock,
+ bool covered_by_poller);
bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx);
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c
index 1b15e0eb4f..d6664aead2 100644
--- a/src/core/lib/iomgr/ev_epoll_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_linux.c
@@ -31,7 +31,6 @@
*
*/
-#include <grpc/grpc_posix.h>
#include "src/core/lib/iomgr/port.h"
/* This polling engine is only relevant on linux kernels supporting epoll() */
@@ -202,6 +201,8 @@ static void fd_global_shutdown(void);
/* This is also used as grpc_workqueue (by directly casing it) */
typedef struct polling_island {
+ grpc_closure_scheduler workqueue_scheduler;
+
gpr_mu mu;
/* Ref count. Use PI_ADD_REF() and PI_UNREF() macros to increment/decrement
the refcount.
@@ -305,6 +306,8 @@ static __thread polling_island *g_current_thread_polling_island;
/* Forward declaration */
static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
+static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error);
#ifdef GRPC_TSAN
/* Currently TSAN may incorrectly flag data races between epoll_ctl and
@@ -317,6 +320,9 @@ static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi);
gpr_atm g_epoll_sync;
#endif /* defined(GRPC_TSAN) */
+static const grpc_closure_scheduler_vtable workqueue_scheduler_vtable = {
+ workqueue_enqueue, workqueue_enqueue};
+
static void pi_add_ref(polling_island *pi);
static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi);
@@ -529,6 +535,7 @@ static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx,
*error = GRPC_ERROR_NONE;
pi = gpr_malloc(sizeof(*pi));
+ pi->workqueue_scheduler.vtable = &workqueue_scheduler_vtable;
gpr_mu_init(&pi->mu);
pi->fd_cnt = 0;
pi->fd_capacity = 0;
@@ -800,10 +807,10 @@ static polling_island *polling_island_merge(polling_island *p,
return q;
}
-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue, grpc_closure *closure,
+static void workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_error *error) {
GPR_TIMER_BEGIN("workqueue.enqueue", 0);
+ grpc_workqueue *workqueue = (grpc_workqueue *)closure->scheduler;
/* take a ref to the workqueue: otherwise it can happen that whatever events
* this kicks off ends up destroying the workqueue before this function
* completes */
@@ -820,6 +827,12 @@ static void workqueue_enqueue(grpc_exec_ctx *exec_ctx,
GPR_TIMER_END("workqueue.enqueue", 0);
}
+static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
+ polling_island *pi = (polling_island *)workqueue;
+ return workqueue == NULL ? grpc_schedule_on_exec_ctx
+ : &pi->workqueue_scheduler;
+}
+
static grpc_error *polling_island_global_init() {
grpc_error *error = GRPC_ERROR_NONE;
@@ -1030,8 +1043,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
fd->po.pi = NULL;
}
- grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error),
- NULL);
+ grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
gpr_mu_unlock(&fd->po.mu);
UNREF_BY(fd, 2, reason); /* Drop the reference */
@@ -1057,16 +1069,14 @@ static grpc_error *fd_shutdown_error(bool shutdown) {
static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure **st, grpc_closure *closure) {
if (fd->shutdown) {
- grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"),
- NULL);
+ grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"));
} else if (*st == CLOSURE_NOT_READY) {
/* not ready ==> switch to a waiting state by setting the closure */
*st = closure;
} else if (*st == CLOSURE_READY) {
/* already ready ==> queue the closure to run immediately */
*st = CLOSURE_NOT_READY;
- grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown),
- NULL);
+ grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown));
} else {
/* upcallptr was set to a different closure. This is an error! */
gpr_log(GPR_ERROR,
@@ -1088,7 +1098,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
return 0;
} else {
/* waiting ==> queue closure */
- grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL);
+ grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown));
*st = CLOSURE_NOT_READY;
return 1;
}
@@ -1359,7 +1369,7 @@ static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
/* Release the ref and set pollset->po.pi to NULL */
pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
- grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
}
/* pollset->po.mu lock must be held by the caller before calling this */
@@ -1410,7 +1420,9 @@ static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx,
workqueue_maybe_wakeup(pi);
}
grpc_closure *c = (grpc_closure *)n;
- grpc_closure_run(exec_ctx, c, c->error_data.error);
+ grpc_error *error = c->error_data.error;
+ c->cb(exec_ctx, c->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
return true;
} else if (gpr_atm_no_barrier_load(&pi->workqueue_item_count) > 0) {
/* n == NULL might mean there's work but it's not available to be popped
@@ -1959,7 +1971,7 @@ static const grpc_event_engine_vtable vtable = {
.workqueue_ref = workqueue_ref,
.workqueue_unref = workqueue_unref,
- .workqueue_enqueue = workqueue_enqueue,
+ .workqueue_scheduler = workqueue_scheduler,
.shutdown_engine = shutdown_engine,
};
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index 486181df0d..9477ac3688 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -397,7 +397,7 @@ static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
if (!fd->released) {
close(fd->fd);
}
- grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE);
}
static int fd_wrapped_fd(grpc_fd *fd) {
@@ -455,16 +455,14 @@ static grpc_error *fd_shutdown_error(bool shutdown) {
static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure **st, grpc_closure *closure) {
if (fd->shutdown) {
- grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"),
- NULL);
+ grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE("FD shutdown"));
} else if (*st == CLOSURE_NOT_READY) {
/* not ready ==> switch to a waiting state by setting the closure */
*st = closure;
} else if (*st == CLOSURE_READY) {
/* already ready ==> queue the closure to run immediately */
*st = CLOSURE_NOT_READY;
- grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown),
- NULL);
+ grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown));
maybe_wake_one_watcher_locked(fd);
} else {
/* upcallptr was set to a different closure. This is an error! */
@@ -487,7 +485,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
return 0;
} else {
/* waiting ==> queue closure */
- grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL);
+ grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown));
*st = CLOSURE_NOT_READY;
return 1;
}
@@ -850,7 +848,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
GRPC_FD_UNREF(pollset->fds[i], "multipoller");
}
pollset->fd_count = 0;
- grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
}
static void work_combine_error(grpc_error **composite, grpc_error *error) {
@@ -899,7 +897,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (!pollset_has_workers(pollset) &&
!grpc_closure_list_empty(pollset->idle_jobs)) {
GPR_TIMER_MARK("pollset_work.idle_jobs", 0);
- grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
+ grpc_closure_list_sched(exec_ctx, &pollset->idle_jobs);
goto done;
}
/* If we're shutting down then we don't execute any extended work */
@@ -1079,7 +1077,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
* TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
gpr_mu_lock(&pollset->mu);
} else if (!grpc_closure_list_empty(pollset->idle_jobs)) {
- grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
+ grpc_closure_list_sched(exec_ctx, &pollset->idle_jobs);
gpr_mu_unlock(&pollset->mu);
grpc_exec_ctx_flush(exec_ctx);
gpr_mu_lock(&pollset->mu);
@@ -1098,7 +1096,7 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollset->shutdown_done = closure;
pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
if (!pollset_has_workers(pollset)) {
- grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
+ grpc_closure_list_sched(exec_ctx, &pollset->idle_jobs);
}
if (!pollset->called_shutdown && !pollset_has_workers(pollset)) {
pollset->called_shutdown = 1;
@@ -1286,10 +1284,8 @@ static void workqueue_unref(grpc_exec_ctx *exec_ctx,
grpc_workqueue *workqueue) {}
#endif
-static void workqueue_enqueue(grpc_exec_ctx *exec_ctx,
- grpc_workqueue *workqueue, grpc_closure *closure,
- grpc_error *error) {
- grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
+static grpc_closure_scheduler *workqueue_scheduler(grpc_workqueue *workqueue) {
+ return grpc_schedule_on_exec_ctx;
}
/*******************************************************************************
@@ -1532,7 +1528,7 @@ static const grpc_event_engine_vtable vtable = {
.workqueue_ref = workqueue_ref,
.workqueue_unref = workqueue_unref,
- .workqueue_enqueue = workqueue_enqueue,
+ .workqueue_scheduler = workqueue_scheduler,
.shutdown_engine = shutdown_engine,
};
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index ab139895fd..2975d619e1 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -275,9 +275,8 @@ void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {
}
#endif
-void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- grpc_closure *closure, grpc_error *error) {
- g_event_engine->workqueue_enqueue(exec_ctx, workqueue, closure, error);
+grpc_closure_scheduler *grpc_workqueue_scheduler(grpc_workqueue *workqueue) {
+ return g_event_engine->workqueue_scheduler(workqueue);
}
#endif // GRPC_POSIX_SOCKET
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index cb5832539d..1068a4bad5 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -106,8 +106,7 @@ typedef struct grpc_event_engine_vtable {
grpc_workqueue *(*workqueue_ref)(grpc_workqueue *workqueue);
void (*workqueue_unref)(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue);
#endif
- void (*workqueue_enqueue)(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- grpc_closure *closure, grpc_error *error);
+ grpc_closure_scheduler *(*workqueue_scheduler)(grpc_workqueue *workqueue);
} grpc_event_engine_vtable;
void grpc_event_engine_init(void);
diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c
index 604713e578..6aa788f8e5 100644
--- a/src/core/lib/iomgr/exec_ctx.c
+++ b/src/core/lib/iomgr/exec_ctx.c
@@ -57,7 +57,6 @@ bool grpc_always_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored) {
return true;
}
-#ifndef GRPC_EXECUTION_CONTEXT_SANITIZER
bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
bool did_something = 0;
GPR_TIMER_BEGIN("grpc_exec_ctx_flush", 0);
@@ -67,8 +66,10 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL;
while (c != NULL) {
grpc_closure *next = c->next_data.next;
+ grpc_error *error = c->error_data.error;
did_something = true;
- grpc_closure_run(exec_ctx, c, c->error_data.error);
+ c->cb(exec_ctx, c->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
c = next;
}
} else if (!grpc_combiner_continue_exec_ctx(exec_ctx)) {
@@ -76,30 +77,6 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
}
}
GPR_ASSERT(exec_ctx->active_combiner == NULL);
- if (exec_ctx->stealing_from_workqueue != NULL) {
- if (grpc_exec_ctx_ready_to_finish(exec_ctx)) {
- grpc_workqueue_enqueue(exec_ctx, exec_ctx->stealing_from_workqueue,
- exec_ctx->stolen_closure,
- exec_ctx->stolen_closure->error_data.error);
- GRPC_WORKQUEUE_UNREF(exec_ctx, exec_ctx->stealing_from_workqueue,
- "exec_ctx_sched");
- exec_ctx->stealing_from_workqueue = NULL;
- exec_ctx->stolen_closure = NULL;
- } else {
- grpc_closure *c = exec_ctx->stolen_closure;
- GRPC_WORKQUEUE_UNREF(exec_ctx, exec_ctx->stealing_from_workqueue,
- "exec_ctx_sched");
- exec_ctx->stealing_from_workqueue = NULL;
- exec_ctx->stolen_closure = NULL;
- grpc_error *error = c->error_data.error;
- GPR_TIMER_BEGIN("grpc_exec_ctx_flush.stolen_cb", 0);
- c->cb(exec_ctx, c->cb_arg, error);
- GRPC_ERROR_UNREF(error);
- GPR_TIMER_END("grpc_exec_ctx_flush.stolen_cb", 0);
- grpc_exec_ctx_flush(exec_ctx);
- did_something = true;
- }
- }
GPR_TIMER_END("grpc_exec_ctx_flush", 0);
return did_something;
}
@@ -109,104 +86,21 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) {
grpc_exec_ctx_flush(exec_ctx);
}
-void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- grpc_error *error,
- grpc_workqueue *offload_target_or_null) {
- GPR_TIMER_BEGIN("grpc_exec_ctx_sched", 0);
- if (offload_target_or_null == NULL) {
- grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
- } else if (exec_ctx->stealing_from_workqueue == NULL) {
- exec_ctx->stealing_from_workqueue = offload_target_or_null;
- closure->error_data.error = error;
- exec_ctx->stolen_closure = closure;
- } else if (exec_ctx->stealing_from_workqueue != offload_target_or_null) {
- grpc_workqueue_enqueue(exec_ctx, offload_target_or_null, closure, error);
- GRPC_WORKQUEUE_UNREF(exec_ctx, offload_target_or_null, "exec_ctx_sched");
- } else { /* stealing_from_workqueue == offload_target_or_null */
- grpc_workqueue_enqueue(exec_ctx, offload_target_or_null,
- exec_ctx->stolen_closure,
- exec_ctx->stolen_closure->error_data.error);
- closure->error_data.error = error;
- exec_ctx->stolen_closure = closure;
- GRPC_WORKQUEUE_UNREF(exec_ctx, offload_target_or_null, "exec_ctx_sched");
- }
- GPR_TIMER_END("grpc_exec_ctx_sched", 0);
+static void exec_ctx_run(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error) {
+ closure->cb(exec_ctx, closure->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
}
-void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,
- grpc_closure_list *list,
- grpc_workqueue *offload_target_or_null) {
- grpc_closure_list_move(list, &exec_ctx->closure_list);
+static void exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error) {
+ grpc_closure_list_append(&exec_ctx->closure_list, closure, error);
}
void grpc_exec_ctx_global_init(void) {}
void grpc_exec_ctx_global_shutdown(void) {}
-#else
-static gpr_mu g_mu;
-static gpr_cv g_cv;
-static int g_threads = 0;
-
-static void run_closure(void *arg) {
- grpc_closure *closure = arg;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- closure->cb(&exec_ctx, closure->cb_arg, (closure->final_data & 1) != 0);
- grpc_exec_ctx_finish(&exec_ctx);
- gpr_mu_lock(&g_mu);
- if (--g_threads == 0) {
- gpr_cv_signal(&g_cv);
- }
- gpr_mu_unlock(&g_mu);
-}
-
-static void start_closure(grpc_closure *closure) {
- gpr_thd_id id;
- gpr_mu_lock(&g_mu);
- g_threads++;
- gpr_mu_unlock(&g_mu);
- gpr_thd_new(&id, run_closure, closure, NULL);
-}
-
-bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { return false; }
-
-void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) {}
-void grpc_exec_ctx_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- bool success,
- grpc_workqueue *offload_target_or_null) {
- GPR_ASSERT(offload_target_or_null == NULL);
- if (closure == NULL) return;
- closure->final_data = success;
- start_closure(closure);
-}
-
-void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,
- grpc_closure_list *list,
- grpc_workqueue *offload_target_or_null) {
- GPR_ASSERT(offload_target_or_null == NULL);
- if (list == NULL) return;
- grpc_closure *p = list->head;
- while (p) {
- grpc_closure *start = p;
- p = grpc_closure_next(start);
- start_closure(start);
- }
- grpc_closure_list r = GRPC_CLOSURE_LIST_INIT;
- *list = r;
-}
-
-void grpc_exec_ctx_global_init(void) {
- gpr_mu_init(&g_mu);
- gpr_cv_init(&g_cv);
-}
-
-void grpc_exec_ctx_global_shutdown(void) {
- gpr_mu_lock(&g_mu);
- while (g_threads != 0) {
- gpr_cv_wait(&g_cv, &g_mu, gpr_inf_future(GPR_CLOCK_REALTIME));
- }
- gpr_mu_unlock(&g_mu);
-
- gpr_mu_destroy(&g_mu);
- gpr_cv_destroy(&g_cv);
-}
-#endif
+static const grpc_closure_scheduler_vtable exec_ctx_scheduler_vtable = {
+ exec_ctx_run, exec_ctx_sched};
+static grpc_closure_scheduler exec_ctx_scheduler = {&exec_ctx_scheduler_vtable};
+grpc_closure_scheduler *grpc_schedule_on_exec_ctx = &exec_ctx_scheduler;
diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h
index 7e50cb9825..e566f1b3e8 100644
--- a/src/core/lib/iomgr/exec_ctx.h
+++ b/src/core/lib/iomgr/exec_ctx.h
@@ -66,17 +66,6 @@ typedef struct grpc_combiner grpc_combiner;
#ifndef GRPC_EXECUTION_CONTEXT_SANITIZER
struct grpc_exec_ctx {
grpc_closure_list closure_list;
- /** The workqueue we're stealing work from.
- As items are queued to the execution context, we try to steal one
- workqueue item and execute it inline (assuming the exec_ctx is not
- finished) - doing so does not invalidate the workqueue's contract, and
- provides a small latency win in cases where we get a hit */
- grpc_workqueue *stealing_from_workqueue;
- /** The workqueue item that was stolen from the workqueue above. When new
- items are scheduled to be offloaded to that workqueue, we need to update
- this like a 1-deep fifo to maintain the invariant that workqueue items
- queued by one thread are started in order */
- grpc_closure *stolen_closure;
/** currently active combiner: updated only via combiner.c */
grpc_combiner *active_combiner;
/** last active combiner in the active combiner list */
@@ -89,10 +78,7 @@ struct grpc_exec_ctx {
/* initializer for grpc_exec_ctx:
prefer to use GRPC_EXEC_CTX_INIT whenever possible */
#define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \
- { \
- GRPC_CLOSURE_LIST_INIT, NULL, NULL, NULL, NULL, false, finish_check_arg, \
- finish_check \
- }
+ { GRPC_CLOSURE_LIST_INIT, NULL, NULL, false, finish_check_arg, finish_check }
#else
struct grpc_exec_ctx {
bool cached_ready_to_finish;
@@ -108,6 +94,8 @@ struct grpc_exec_ctx {
#define GRPC_EXEC_CTX_INIT \
GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(grpc_always_ready_to_finish, NULL)
+extern grpc_closure_scheduler *grpc_schedule_on_exec_ctx;
+
/** Flush any work that has been enqueued onto this grpc_exec_ctx.
* Caller must guarantee that no interfering locks are held.
* Returns true if work was performed, false otherwise. */
@@ -115,14 +103,6 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx);
/** Finish any pending work for a grpc_exec_ctx. Must be called before
* the instance is destroyed, or work may be lost. */
void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx);
-/** Add a closure to be executed in the future.
- If \a offload_target_or_null is NULL, the closure will be executed at the
- next exec_ctx.{finish,flush} point.
- If \a offload_target_or_null is non-NULL, the closure will be scheduled
- against the workqueue, and a reference to the workqueue will be consumed. */
-void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
- grpc_error *error,
- grpc_workqueue *offload_target_or_null);
/** Returns true if we'd like to leave this execution context as soon as
possible: useful for deciding whether to do something more or not depending
on outside context */
@@ -131,11 +111,6 @@ bool grpc_exec_ctx_ready_to_finish(grpc_exec_ctx *exec_ctx);
bool grpc_never_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored);
/** A finish check that is always ready to finish */
bool grpc_always_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored);
-/** Add a list of closures to be executed at the next flush/finish point.
- * Leaves \a list empty. */
-void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,
- grpc_closure_list *list,
- grpc_workqueue *offload_target_or_null);
void grpc_exec_ctx_global_init(void);
diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c
index 8d7535d6fe..852775564f 100644
--- a/src/core/lib/iomgr/executor.c
+++ b/src/core/lib/iomgr/executor.c
@@ -77,10 +77,18 @@ static void closure_exec_thread_func(void *ignored) {
gpr_mu_unlock(&g_executor.mu);
break;
} else {
- grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures, NULL);
+ grpc_closure *c = g_executor.closures.head;
+ grpc_closure_list_init(&g_executor.closures);
+ gpr_mu_unlock(&g_executor.mu);
+ while (c != NULL) {
+ grpc_closure *next = c->next_data.next;
+ grpc_error *error = c->error_data.error;
+ c->cb(&exec_ctx, c->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
+ c = next;
+ }
+ grpc_exec_ctx_flush(&exec_ctx);
}
- gpr_mu_unlock(&g_executor.mu);
- grpc_exec_ctx_flush(&exec_ctx);
}
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -112,7 +120,8 @@ static void maybe_spawn_locked() {
g_executor.pending_join = 1;
}
-void grpc_executor_push(grpc_closure *closure, grpc_error *error) {
+static void executor_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
+ grpc_error *error) {
gpr_mu_lock(&g_executor.mu);
if (g_executor.shutting_down == 0) {
grpc_closure_list_append(&g_executor.closures, closure, error);
@@ -121,9 +130,8 @@ void grpc_executor_push(grpc_closure *closure, grpc_error *error) {
gpr_mu_unlock(&g_executor.mu);
}
-void grpc_executor_shutdown() {
+void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx) {
int pending_join;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_mu_lock(&g_executor.mu);
pending_join = g_executor.pending_join;
@@ -133,11 +141,24 @@ void grpc_executor_shutdown() {
* list below because we aren't accepting new work */
/* Execute pending callbacks, some may be performing cleanups */
- grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures, NULL);
- grpc_exec_ctx_finish(&exec_ctx);
+ grpc_closure *c = g_executor.closures.head;
+ grpc_closure_list_init(&g_executor.closures);
+ while (c != NULL) {
+ grpc_closure *next = c->next_data.next;
+ grpc_error *error = c->error_data.error;
+ c->cb(exec_ctx, c->cb_arg, error);
+ GRPC_ERROR_UNREF(error);
+ c = next;
+ }
+ grpc_exec_ctx_flush(exec_ctx);
GPR_ASSERT(grpc_closure_list_empty(g_executor.closures));
if (pending_join) {
gpr_thd_join(g_executor.tid);
}
gpr_mu_destroy(&g_executor.mu);
}
+
+static const grpc_closure_scheduler_vtable executor_vtable = {executor_push,
+ executor_push};
+static grpc_closure_scheduler executor_scheduler = {&executor_vtable};
+grpc_closure_scheduler *grpc_executor_scheduler = &executor_scheduler;
diff --git a/src/core/lib/iomgr/executor.h b/src/core/lib/iomgr/executor.h
index da9dcd07d0..1213016383 100644
--- a/src/core/lib/iomgr/executor.h
+++ b/src/core/lib/iomgr/executor.h
@@ -43,11 +43,9 @@
* non-blocking solution available. */
void grpc_executor_init();
-/** Enqueue \a closure for its eventual execution of \a f(arg) on a separate
- * thread */
-void grpc_executor_push(grpc_closure *closure, grpc_error *error);
+extern grpc_closure_scheduler *grpc_executor_scheduler;
/** Shutdown the executor, running all pending work as part of the call */
-void grpc_executor_shutdown();
+void grpc_executor_shutdown(grpc_exec_ctx *exec_ctx);
#endif /* GRPC_CORE_LIB_IOMGR_EXECUTOR_H */
diff --git a/src/core/lib/iomgr/iocp_windows.c b/src/core/lib/iomgr/iocp_windows.c
index 60ebe43676..f0f4a6ff39 100644
--- a/src/core/lib/iomgr/iocp_windows.c
+++ b/src/core/lib/iomgr/iocp_windows.c
@@ -80,7 +80,6 @@ grpc_iocp_work_status grpc_iocp_work(grpc_exec_ctx *exec_ctx,
LPOVERLAPPED overlapped;
grpc_winsocket *socket;
grpc_winsocket_callback_info *info;
- grpc_closure *closure = NULL;
success = GetQueuedCompletionStatus(
g_iocp, &bytes, &completion_key, &overlapped,
deadline_to_millis_timeout(deadline, gpr_now(deadline.clock_type)));
diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c
index 3470b5ac81..001e528409 100644
--- a/src/core/lib/iomgr/iomgr.c
+++ b/src/core/lib/iomgr/iomgr.c
@@ -83,11 +83,10 @@ static void dump_objects(const char *kind) {
}
}
-void grpc_iomgr_shutdown(void) {
+void grpc_iomgr_shutdown(grpc_exec_ctx *exec_ctx) {
gpr_timespec shutdown_deadline = gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(10, GPR_TIMESPAN));
gpr_timespec last_warning_time = gpr_now(GPR_CLOCK_REALTIME);
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_iomgr_platform_flush();
@@ -104,10 +103,9 @@ void grpc_iomgr_shutdown(void) {
}
last_warning_time = gpr_now(GPR_CLOCK_REALTIME);
}
- if (grpc_timer_check(&exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC),
- NULL)) {
+ if (grpc_timer_check(exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL)) {
gpr_mu_unlock(&g_mu);
- grpc_exec_ctx_flush(&exec_ctx);
+ grpc_exec_ctx_flush(exec_ctx);
grpc_iomgr_platform_flush();
gpr_mu_lock(&g_mu);
continue;
@@ -139,8 +137,8 @@ void grpc_iomgr_shutdown(void) {
}
gpr_mu_unlock(&g_mu);
- grpc_timer_list_shutdown(&exec_ctx);
- grpc_exec_ctx_finish(&exec_ctx);
+ grpc_timer_list_shutdown(exec_ctx);
+ grpc_exec_ctx_flush(exec_ctx);
/* ensure all threads have left g_mu */
gpr_mu_lock(&g_mu);
diff --git a/src/core/lib/iomgr/iomgr.h b/src/core/lib/iomgr/iomgr.h
index c1cfaf302e..245a1e08aa 100644
--- a/src/core/lib/iomgr/iomgr.h
+++ b/src/core/lib/iomgr/iomgr.h
@@ -34,12 +34,14 @@
#ifndef GRPC_CORE_LIB_IOMGR_IOMGR_H
#define GRPC_CORE_LIB_IOMGR_IOMGR_H
+#include <grpc/impl/codegen/exec_ctx_fwd.h>
#include "src/core/lib/iomgr/port.h"
/** Initializes the iomgr. */
void grpc_iomgr_init(void);
-/** Signals the intention to shutdown the iomgr. */
-void grpc_iomgr_shutdown(void);
+/** Signals the intention to shutdown the iomgr. Expects to be able to flush
+ * exec_ctx. */
+void grpc_iomgr_shutdown(grpc_exec_ctx *exec_ctx);
#endif /* GRPC_CORE_LIB_IOMGR_IOMGR_H */
diff --git a/src/core/lib/iomgr/pollset_uv.c b/src/core/lib/iomgr/pollset_uv.c
index 3a74b842b6..ed3edeee94 100644
--- a/src/core/lib/iomgr/pollset_uv.c
+++ b/src/core/lib/iomgr/pollset_uv.c
@@ -83,7 +83,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
// Drain any pending UV callbacks without blocking
uv_run(uv_default_loop(), UV_RUN_NOWAIT);
}
- grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
}
void grpc_pollset_destroy(grpc_pollset *pollset) {
diff --git a/src/core/lib/iomgr/pollset_windows.c b/src/core/lib/iomgr/pollset_windows.c
index 5540303e49..2a45e708df 100644
--- a/src/core/lib/iomgr/pollset_windows.c
+++ b/src/core/lib/iomgr/pollset_windows.c
@@ -109,7 +109,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollset->shutting_down = 1;
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
if (!pollset->is_iocp_worker) {
- grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
} else {
pollset->on_shutdown = closure;
}
@@ -167,8 +167,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
if (pollset->shutting_down && pollset->on_shutdown != NULL) {
- grpc_exec_ctx_sched(exec_ctx, pollset->on_shutdown, GRPC_ERROR_NONE,
- NULL);
+ grpc_closure_sched(exec_ctx, pollset->on_shutdown, GRPC_ERROR_NONE);
pollset->on_shutdown = NULL;
}
goto done;
diff --git a/src/core/lib/iomgr/resolve_address.h b/src/core/lib/iomgr/resolve_address.h
index 275924448a..e03d16fa4e 100644
--- a/src/core/lib/iomgr/resolve_address.h
+++ b/src/core/lib/iomgr/resolve_address.h
@@ -36,6 +36,7 @@
#include <stddef.h>
#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/pollset_set.h"
#define GRPC_MAX_SOCKADDR_SIZE 128
@@ -54,6 +55,7 @@ typedef struct {
/* TODO(ctiller): add a timeout here */
extern void (*grpc_resolve_address)(grpc_exec_ctx *exec_ctx, const char *addr,
const char *default_port,
+ grpc_pollset_set *interested_parties,
grpc_closure *on_done,
grpc_resolved_addresses **addresses);
/* Destroy resolved addresses */
diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c
index de791b2b67..50e470d149 100644
--- a/src/core/lib/iomgr/resolve_address_posix.c
+++ b/src/core/lib/iomgr/resolve_address_posix.c
@@ -163,10 +163,9 @@ typedef struct {
static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp,
grpc_error *error) {
request *r = rp;
- grpc_exec_ctx_sched(
+ grpc_closure_sched(
exec_ctx, r->on_done,
- grpc_blocking_resolve_address(r->name, r->default_port, r->addrs_out),
- NULL);
+ grpc_blocking_resolve_address(r->name, r->default_port, r->addrs_out));
gpr_free(r->name);
gpr_free(r->default_port);
gpr_free(r);
@@ -181,20 +180,22 @@ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
const char *default_port,
+ grpc_pollset_set *interested_parties,
grpc_closure *on_done,
grpc_resolved_addresses **addrs) {
request *r = gpr_malloc(sizeof(request));
- grpc_closure_init(&r->request_closure, do_request_thread, r);
+ grpc_closure_init(&r->request_closure, do_request_thread, r,
+ grpc_executor_scheduler);
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->on_done = on_done;
r->addrs_out = addrs;
- grpc_executor_push(&r->request_closure, GRPC_ERROR_NONE);
+ grpc_closure_sched(exec_ctx, &r->request_closure, GRPC_ERROR_NONE);
}
-void (*grpc_resolve_address)(grpc_exec_ctx *exec_ctx, const char *name,
- const char *default_port, grpc_closure *on_done,
- grpc_resolved_addresses **addrs) =
- resolve_address_impl;
+void (*grpc_resolve_address)(
+ grpc_exec_ctx *exec_ctx, const char *name, const char *default_port,
+ grpc_pollset_set *interested_parties, grpc_closure *on_done,
+ grpc_resolved_addresses **addrs) = resolve_address_impl;
#endif
diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c
index b8295acfa1..9b5f3209f0 100644
--- a/src/core/lib/iomgr/resolve_address_uv.c
+++ b/src/core/lib/iomgr/resolve_address_uv.c
@@ -98,7 +98,7 @@ static void getaddrinfo_callback(uv_getaddrinfo_t *req, int status,
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_error *error;
error = handle_addrinfo_result(status, res, r->addresses);
- grpc_exec_ctx_sched(&exec_ctx, r->on_done, error, NULL);
+ grpc_closure_sched(&exec_ctx, r->on_done, error);
grpc_exec_ctx_finish(&exec_ctx);
gpr_free(r->hints);
@@ -181,6 +181,7 @@ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
const char *default_port,
+ grpc_pollset_set *interested_parties,
grpc_closure *on_done,
grpc_resolved_addresses **addrs) {
uv_getaddrinfo_t *req;
@@ -192,7 +193,7 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
int s;
err = try_split_host_port(name, default_port, &host, &port);
if (err != GRPC_ERROR_NONE) {
- grpc_exec_ctx_sched(exec_ctx, on_done, err, NULL);
+ grpc_closure_sched(exec_ctx, on_done, err);
return;
}
r = gpr_malloc(sizeof(request));
@@ -216,16 +217,16 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
*addrs = NULL;
err = GRPC_ERROR_CREATE("getaddrinfo failed");
err = grpc_error_set_str(err, GRPC_ERROR_STR_OS_ERROR, uv_strerror(s));
- grpc_exec_ctx_sched(exec_ctx, on_done, err, NULL);
+ grpc_closure_sched(exec_ctx, on_done, err);
gpr_free(r);
gpr_free(req);
gpr_free(hints);
}
}
-void (*grpc_resolve_address)(grpc_exec_ctx *exec_ctx, const char *name,
- const char *default_port, grpc_closure *on_done,
- grpc_resolved_addresses **addrs) =
- resolve_address_impl;
+void (*grpc_resolve_address)(
+ grpc_exec_ctx *exec_ctx, const char *name, const char *default_port,
+ grpc_pollset_set *interested_parties, grpc_closure *on_done,
+ grpc_resolved_addresses **addrs) = resolve_address_impl;
#endif /* GRPC_UV */
diff --git a/src/core/lib/iomgr/resolve_address_windows.c b/src/core/lib/iomgr/resolve_address_windows.c
index e139293c03..2439ce3cb7 100644
--- a/src/core/lib/iomgr/resolve_address_windows.c
+++ b/src/core/lib/iomgr/resolve_address_windows.c
@@ -154,7 +154,7 @@ static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp,
} else {
GRPC_ERROR_REF(error);
}
- grpc_exec_ctx_sched(exec_ctx, r->on_done, error, NULL);
+ grpc_closure_sched(exec_ctx, r->on_done, error);
gpr_free(r->name);
gpr_free(r->default_port);
gpr_free(r);
@@ -169,20 +169,22 @@ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
const char *default_port,
+ grpc_pollset_set *interested_parties,
grpc_closure *on_done,
grpc_resolved_addresses **addresses) {
request *r = gpr_malloc(sizeof(request));
- grpc_closure_init(&r->request_closure, do_request_thread, r);
+ grpc_closure_init(&r->request_closure, do_request_thread, r,
+ grpc_executor_scheduler);
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->on_done = on_done;
r->addresses = addresses;
- grpc_executor_push(&r->request_closure, GRPC_ERROR_NONE);
+ grpc_closure_sched(exec_ctx, &r->request_closure, GRPC_ERROR_NONE);
}
-void (*grpc_resolve_address)(grpc_exec_ctx *exec_ctx, const char *name,
- const char *default_port, grpc_closure *on_done,
- grpc_resolved_addresses **addresses) =
- resolve_address_impl;
+void (*grpc_resolve_address)(
+ grpc_exec_ctx *exec_ctx, const char *name, const char *default_port,
+ grpc_pollset_set *interested_parties, grpc_closure *on_done,
+ grpc_resolved_addresses **addresses) = resolve_address_impl;
#endif
diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c
index 213d29600c..42a044df77 100644
--- a/src/core/lib/iomgr/resource_quota.c
+++ b/src/core/lib/iomgr/resource_quota.c
@@ -257,17 +257,16 @@ static void rq_step(grpc_exec_ctx *exec_ctx, void *rq, grpc_error *error) {
}
done:
- grpc_resource_quota_internal_unref(exec_ctx, resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
}
static void rq_step_sched(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *resource_quota) {
if (resource_quota->step_scheduled) return;
resource_quota->step_scheduled = true;
- grpc_resource_quota_internal_ref(resource_quota);
- grpc_combiner_execute_finally(exec_ctx, resource_quota->combiner,
- &resource_quota->rq_step_closure,
- GRPC_ERROR_NONE, false);
+ grpc_resource_quota_ref_internal(resource_quota);
+ grpc_closure_sched(exec_ctx, &resource_quota->rq_step_closure,
+ GRPC_ERROR_NONE);
}
/* returns true if all allocations are completed */
@@ -294,7 +293,7 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx,
}
if (resource_user->free_pool >= 0) {
resource_user->allocating = false;
- grpc_exec_ctx_enqueue_list(exec_ctx, &resource_user->on_allocated, NULL);
+ grpc_closure_list_sched(exec_ctx, &resource_user->on_allocated);
gpr_mu_unlock(&resource_user->mu);
} else {
rulist_add_head(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
@@ -345,7 +344,7 @@ static bool rq_reclaim(grpc_exec_ctx *exec_ctx,
destructive ? "destructive" : "benign");
}
resource_quota->reclaiming = true;
- grpc_resource_quota_internal_ref(resource_quota);
+ grpc_resource_quota_ref_internal(resource_quota);
grpc_closure *c = resource_user->reclaimers[destructive];
GPR_ASSERT(c);
resource_quota->debug_only_last_reclaimer_resource_user = resource_user;
@@ -371,21 +370,10 @@ static void ru_slice_ref(void *p) {
gpr_ref(&rc->refs);
}
-static void ru_slice_unref(void *p) {
+static void ru_slice_unref(grpc_exec_ctx *exec_ctx, void *p) {
ru_slice_refcount *rc = p;
if (gpr_unref(&rc->refs)) {
- /* TODO(ctiller): this is dangerous, but I think safe for now:
- we have no guarantee here that we're at a safe point for creating an
- execution context, but we have no way of writing this code otherwise.
- In the future: consider lifting grpc_slice to grpc, and offering an
- internal_{ref,unref} pair that is execution context aware.
- Alternatively,
- make exec_ctx be thread local and 'do the right thing' (whatever that
- is)
- if NULL */
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_resource_user_free(&exec_ctx, rc->resource_user, rc->size);
- grpc_exec_ctx_finish(&exec_ctx);
+ grpc_resource_user_free(exec_ctx, rc->resource_user, rc->size);
gpr_free(rc);
}
}
@@ -439,7 +427,7 @@ static bool ru_post_reclaimer(grpc_exec_ctx *exec_ctx,
resource_user->new_reclaimers[destructive] = NULL;
GPR_ASSERT(resource_user->reclaimers[destructive] == NULL);
if (gpr_atm_acq_load(&resource_user->shutdown) > 0) {
- grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CANCELLED, NULL);
+ grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CANCELLED);
return false;
}
resource_user->reclaimers[destructive] = closure;
@@ -480,10 +468,10 @@ static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *ru,
static void ru_shutdown(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) {
grpc_resource_user *resource_user = ru;
- grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[0],
- GRPC_ERROR_CANCELLED, NULL);
- grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[1],
- GRPC_ERROR_CANCELLED, NULL);
+ grpc_closure_sched(exec_ctx, resource_user->reclaimers[0],
+ GRPC_ERROR_CANCELLED);
+ grpc_closure_sched(exec_ctx, resource_user->reclaimers[1],
+ GRPC_ERROR_CANCELLED);
resource_user->reclaimers[0] = NULL;
resource_user->reclaimers[1] = NULL;
rulist_remove(resource_user, GRPC_RULIST_RECLAIMER_BENIGN);
@@ -496,15 +484,15 @@ static void ru_destroy(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) {
for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
rulist_remove(resource_user, (grpc_rulist)i);
}
- grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[0],
- GRPC_ERROR_CANCELLED, NULL);
- grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[1],
- GRPC_ERROR_CANCELLED, NULL);
+ grpc_closure_sched(exec_ctx, resource_user->reclaimers[0],
+ GRPC_ERROR_CANCELLED);
+ grpc_closure_sched(exec_ctx, resource_user->reclaimers[1],
+ GRPC_ERROR_CANCELLED);
if (resource_user->free_pool != 0) {
resource_user->resource_quota->free_pool += resource_user->free_pool;
rq_step_sched(exec_ctx, resource_user->resource_quota);
}
- grpc_resource_quota_internal_unref(exec_ctx, resource_user->resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, resource_user->resource_quota);
gpr_mu_destroy(&resource_user->mu);
gpr_free(resource_user->name);
gpr_free(resource_user);
@@ -540,7 +528,7 @@ static void rq_resize(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) {
a->resource_quota->size += delta;
a->resource_quota->free_pool += delta;
rq_step_sched(exec_ctx, a->resource_quota);
- grpc_resource_quota_internal_unref(exec_ctx, a->resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, a->resource_quota);
gpr_free(a);
}
@@ -549,7 +537,7 @@ static void rq_reclamation_done(grpc_exec_ctx *exec_ctx, void *rq,
grpc_resource_quota *resource_quota = rq;
resource_quota->reclaiming = false;
rq_step_sched(exec_ctx, resource_quota);
- grpc_resource_quota_internal_unref(exec_ctx, resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
}
/*******************************************************************************
@@ -571,16 +559,19 @@ grpc_resource_quota *grpc_resource_quota_create(const char *name) {
gpr_asprintf(&resource_quota->name, "anonymous_pool_%" PRIxPTR,
(intptr_t)resource_quota);
}
- grpc_closure_init(&resource_quota->rq_step_closure, rq_step, resource_quota);
+ grpc_closure_init(
+ &resource_quota->rq_step_closure, rq_step, resource_quota,
+ grpc_combiner_finally_scheduler(resource_quota->combiner, true));
grpc_closure_init(&resource_quota->rq_reclamation_done_closure,
- rq_reclamation_done, resource_quota);
+ rq_reclamation_done, resource_quota,
+ grpc_combiner_scheduler(resource_quota->combiner, false));
for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
resource_quota->roots[i] = NULL;
}
return resource_quota;
}
-void grpc_resource_quota_internal_unref(grpc_exec_ctx *exec_ctx,
+void grpc_resource_quota_unref_internal(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *resource_quota) {
if (gpr_unref(&resource_quota->refs)) {
grpc_combiner_destroy(exec_ctx, resource_quota->combiner);
@@ -592,11 +583,11 @@ void grpc_resource_quota_internal_unref(grpc_exec_ctx *exec_ctx,
/* Public API */
void grpc_resource_quota_unref(grpc_resource_quota *resource_quota) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_resource_quota_internal_unref(&exec_ctx, resource_quota);
+ grpc_resource_quota_unref_internal(&exec_ctx, resource_quota);
grpc_exec_ctx_finish(&exec_ctx);
}
-grpc_resource_quota *grpc_resource_quota_internal_ref(
+grpc_resource_quota *grpc_resource_quota_ref_internal(
grpc_resource_quota *resource_quota) {
gpr_ref(&resource_quota->refs);
return resource_quota;
@@ -604,7 +595,7 @@ grpc_resource_quota *grpc_resource_quota_internal_ref(
/* Public API */
void grpc_resource_quota_ref(grpc_resource_quota *resource_quota) {
- grpc_resource_quota_internal_ref(resource_quota);
+ grpc_resource_quota_ref_internal(resource_quota);
}
/* Public API */
@@ -612,11 +603,10 @@ void grpc_resource_quota_resize(grpc_resource_quota *resource_quota,
size_t size) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
rq_resize_args *a = gpr_malloc(sizeof(*a));
- a->resource_quota = grpc_resource_quota_internal_ref(resource_quota);
+ a->resource_quota = grpc_resource_quota_ref_internal(resource_quota);
a->size = (int64_t)size;
- grpc_closure_init(&a->closure, rq_resize, a);
- grpc_combiner_execute(&exec_ctx, resource_quota->combiner, &a->closure,
- GRPC_ERROR_NONE, false);
+ grpc_closure_init(&a->closure, rq_resize, a, grpc_schedule_on_exec_ctx);
+ grpc_closure_sched(&exec_ctx, &a->closure, GRPC_ERROR_NONE);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -629,7 +619,7 @@ grpc_resource_quota *grpc_resource_quota_from_channel_args(
for (size_t i = 0; i < channel_args->num_args; i++) {
if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
if (channel_args->args[i].type == GRPC_ARG_POINTER) {
- return grpc_resource_quota_internal_ref(
+ return grpc_resource_quota_ref_internal(
channel_args->args[i].value.pointer.p);
} else {
gpr_log(GPR_DEBUG, GRPC_ARG_RESOURCE_QUOTA " should be a pointer");
@@ -644,7 +634,9 @@ static void *rq_copy(void *rq) {
return rq;
}
-static void rq_destroy(void *rq) { grpc_resource_quota_unref(rq); }
+static void rq_destroy(grpc_exec_ctx *exec_ctx, void *rq) {
+ grpc_resource_quota_unref_internal(exec_ctx, rq);
+}
static int rq_cmp(void *a, void *b) { return GPR_ICMP(a, b); }
@@ -661,17 +653,21 @@ grpc_resource_user *grpc_resource_user_create(
grpc_resource_quota *resource_quota, const char *name) {
grpc_resource_user *resource_user = gpr_malloc(sizeof(*resource_user));
resource_user->resource_quota =
- grpc_resource_quota_internal_ref(resource_quota);
+ grpc_resource_quota_ref_internal(resource_quota);
grpc_closure_init(&resource_user->allocate_closure, &ru_allocate,
- resource_user);
+ resource_user,
+ grpc_combiner_scheduler(resource_quota->combiner, false));
grpc_closure_init(&resource_user->add_to_free_pool_closure,
- &ru_add_to_free_pool, resource_user);
+ &ru_add_to_free_pool, resource_user,
+ grpc_combiner_scheduler(resource_quota->combiner, false));
grpc_closure_init(&resource_user->post_reclaimer_closure[0],
- &ru_post_benign_reclaimer, resource_user);
+ &ru_post_benign_reclaimer, resource_user,
+ grpc_combiner_scheduler(resource_quota->combiner, false));
grpc_closure_init(&resource_user->post_reclaimer_closure[1],
- &ru_post_destructive_reclaimer, resource_user);
- grpc_closure_init(&resource_user->destroy_closure, &ru_destroy,
- resource_user);
+ &ru_post_destructive_reclaimer, resource_user,
+ grpc_combiner_scheduler(resource_quota->combiner, false));
+ grpc_closure_init(&resource_user->destroy_closure, &ru_destroy, resource_user,
+ grpc_combiner_scheduler(resource_quota->combiner, false));
gpr_mu_init(&resource_user->mu);
gpr_atm_rel_store(&resource_user->refs, 1);
gpr_atm_rel_store(&resource_user->shutdown, 0);
@@ -706,9 +702,8 @@ static void ru_unref_by(grpc_exec_ctx *exec_ctx,
gpr_atm old = gpr_atm_full_fetch_add(&resource_user->refs, -amount);
GPR_ASSERT(old >= amount);
if (old == amount) {
- grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner,
- &resource_user->destroy_closure, GRPC_ERROR_NONE,
- false);
+ grpc_closure_sched(exec_ctx, &resource_user->destroy_closure,
+ GRPC_ERROR_NONE);
}
}
@@ -724,9 +719,12 @@ void grpc_resource_user_unref(grpc_exec_ctx *exec_ctx,
void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user) {
if (gpr_atm_full_fetch_add(&resource_user->shutdown, 1) == 0) {
- grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner,
- grpc_closure_create(ru_shutdown, resource_user),
- GRPC_ERROR_NONE, false);
+ grpc_closure_sched(exec_ctx,
+ grpc_closure_create(
+ ru_shutdown, resource_user,
+ grpc_combiner_scheduler(
+ resource_user->resource_quota->combiner, false)),
+ GRPC_ERROR_NONE);
}
}
@@ -746,12 +744,11 @@ void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_NONE);
if (!resource_user->allocating) {
resource_user->allocating = true;
- grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner,
- &resource_user->allocate_closure, GRPC_ERROR_NONE,
- false);
+ grpc_closure_sched(exec_ctx, &resource_user->allocate_closure,
+ GRPC_ERROR_NONE);
}
} else {
- grpc_exec_ctx_sched(exec_ctx, optional_on_done, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, optional_on_done, GRPC_ERROR_NONE);
}
gpr_mu_unlock(&resource_user->mu);
}
@@ -770,9 +767,8 @@ void grpc_resource_user_free(grpc_exec_ctx *exec_ctx,
if (is_bigger_than_zero && was_zero_or_negative &&
!resource_user->added_to_free_pool) {
resource_user->added_to_free_pool = true;
- grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner,
- &resource_user->add_to_free_pool_closure,
- GRPC_ERROR_NONE, false);
+ grpc_closure_sched(exec_ctx, &resource_user->add_to_free_pool_closure,
+ GRPC_ERROR_NONE);
}
gpr_mu_unlock(&resource_user->mu);
ru_unref_by(exec_ctx, resource_user, (gpr_atm)size);
@@ -784,9 +780,9 @@ void grpc_resource_user_post_reclaimer(grpc_exec_ctx *exec_ctx,
grpc_closure *closure) {
GPR_ASSERT(resource_user->new_reclaimers[destructive] == NULL);
resource_user->new_reclaimers[destructive] = closure;
- grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner,
- &resource_user->post_reclaimer_closure[destructive],
- GRPC_ERROR_NONE, false);
+ grpc_closure_sched(exec_ctx,
+ &resource_user->post_reclaimer_closure[destructive],
+ GRPC_ERROR_NONE);
}
void grpc_resource_user_finish_reclamation(grpc_exec_ctx *exec_ctx,
@@ -795,18 +791,20 @@ void grpc_resource_user_finish_reclamation(grpc_exec_ctx *exec_ctx,
gpr_log(GPR_DEBUG, "RQ %s %s: reclamation complete",
resource_user->resource_quota->name, resource_user->name);
}
- grpc_combiner_execute(
- exec_ctx, resource_user->resource_quota->combiner,
- &resource_user->resource_quota->rq_reclamation_done_closure,
- GRPC_ERROR_NONE, false);
+ grpc_closure_sched(
+ exec_ctx, &resource_user->resource_quota->rq_reclamation_done_closure,
+ GRPC_ERROR_NONE);
}
void grpc_resource_user_slice_allocator_init(
grpc_resource_user_slice_allocator *slice_allocator,
grpc_resource_user *resource_user, grpc_iomgr_cb_func cb, void *p) {
- grpc_closure_init(&slice_allocator->on_allocated, ru_allocated_slices,
- slice_allocator);
- grpc_closure_init(&slice_allocator->on_done, cb, p);
+ grpc_closure_init(
+ &slice_allocator->on_allocated, ru_allocated_slices, slice_allocator,
+ grpc_combiner_scheduler(resource_user->resource_quota->combiner, false));
+ grpc_closure_init(
+ &slice_allocator->on_done, cb, p,
+ grpc_combiner_scheduler(resource_user->resource_quota->combiner, false));
slice_allocator->resource_user = resource_user;
}
diff --git a/src/core/lib/iomgr/resource_quota.h b/src/core/lib/iomgr/resource_quota.h
index 0181fd978b..ef286c2fce 100644
--- a/src/core/lib/iomgr/resource_quota.h
+++ b/src/core/lib/iomgr/resource_quota.h
@@ -77,9 +77,9 @@
extern int grpc_resource_quota_trace;
-grpc_resource_quota *grpc_resource_quota_internal_ref(
+grpc_resource_quota *grpc_resource_quota_ref_internal(
grpc_resource_quota *resource_quota);
-void grpc_resource_quota_internal_unref(grpc_exec_ctx *exec_ctx,
+void grpc_resource_quota_unref_internal(grpc_exec_ctx *exec_ctx,
grpc_resource_quota *resource_quota);
grpc_resource_quota *grpc_resource_quota_from_channel_args(
const grpc_channel_args *channel_args);
diff --git a/src/core/lib/iomgr/socket_mutator.c b/src/core/lib/iomgr/socket_mutator.c
index 8b1efb6bab..34b61dcfe5 100644
--- a/src/core/lib/iomgr/socket_mutator.c
+++ b/src/core/lib/iomgr/socket_mutator.c
@@ -76,7 +76,7 @@ static void *socket_mutator_arg_copy(void *p) {
return grpc_socket_mutator_ref(p);
}
-static void socket_mutator_arg_destroy(void *p) {
+static void socket_mutator_arg_destroy(grpc_exec_ctx *exec_ctx, void *p) {
grpc_socket_mutator_unref(p);
}
diff --git a/src/core/lib/iomgr/socket_windows.c b/src/core/lib/iomgr/socket_windows.c
index 54911e0e31..2f2e02f715 100644
--- a/src/core/lib/iomgr/socket_windows.c
+++ b/src/core/lib/iomgr/socket_windows.c
@@ -131,7 +131,7 @@ static void socket_notify_on_iocp(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&socket->state_mu);
if (info->has_pending_iocp) {
info->has_pending_iocp = 0;
- grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
} else {
info->closure = closure;
}
@@ -154,7 +154,7 @@ void grpc_socket_become_ready(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket,
GPR_ASSERT(!info->has_pending_iocp);
gpr_mu_lock(&socket->state_mu);
if (info->closure) {
- grpc_exec_ctx_sched(exec_ctx, info->closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, info->closure, GRPC_ERROR_NONE);
info->closure = NULL;
} else {
info->has_pending_iocp = 1;
diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c
index a3a70a8ed7..9a77c92016 100644
--- a/src/core/lib/iomgr/tcp_client_posix.c
+++ b/src/core/lib/iomgr/tcp_client_posix.c
@@ -65,6 +65,7 @@ typedef struct {
grpc_fd *fd;
gpr_timespec deadline;
grpc_timer alarm;
+ grpc_closure on_alarm;
int refs;
grpc_closure write_closure;
grpc_pollset_set *interested_parties;
@@ -128,7 +129,7 @@ static void tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
if (done) {
gpr_mu_destroy(&ac->mu);
gpr_free(ac->addr_str);
- grpc_channel_args_destroy(ac->channel_args);
+ grpc_channel_args_destroy(exec_ctx, ac->channel_args);
gpr_free(ac);
}
}
@@ -148,8 +149,8 @@ grpc_endpoint *grpc_tcp_client_create_from_fd(
&channel_args->args[i], options);
} else if (0 ==
strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
- grpc_resource_quota_internal_unref(exec_ctx, resource_quota);
- resource_quota = grpc_resource_quota_internal_ref(
+ grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
+ resource_quota = grpc_resource_quota_ref_internal(
channel_args->args[i].value.pointer.p);
}
}
@@ -157,7 +158,7 @@ grpc_endpoint *grpc_tcp_client_create_from_fd(
grpc_endpoint *ep =
grpc_tcp_create(fd, resource_quota, tcp_read_chunk_size, addr_str);
- grpc_resource_quota_internal_unref(exec_ctx, resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
return ep;
}
@@ -262,10 +263,10 @@ finish:
if (done) {
gpr_mu_destroy(&ac->mu);
gpr_free(ac->addr_str);
- grpc_channel_args_destroy(ac->channel_args);
+ grpc_channel_args_destroy(exec_ctx, ac->channel_args);
gpr_free(ac);
}
- grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
+ grpc_closure_sched(exec_ctx, closure, error);
}
static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
@@ -294,7 +295,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
error = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode, &fd);
if (error != GRPC_ERROR_NONE) {
- grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
+ grpc_closure_sched(exec_ctx, closure, error);
return;
}
if (dsmode == GRPC_DSMODE_IPV4) {
@@ -303,7 +304,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
addr = &addr4_copy;
}
if ((error = prepare_socket(addr, fd, channel_args)) != GRPC_ERROR_NONE) {
- grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
+ grpc_closure_sched(exec_ctx, closure, error);
return;
}
@@ -321,14 +322,13 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
if (err >= 0) {
*ep =
grpc_tcp_client_create_from_fd(exec_ctx, fdobj, channel_args, addr_str);
- grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
goto done;
}
if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, "tcp_client_connect_error");
- grpc_exec_ctx_sched(exec_ctx, closure, GRPC_OS_ERROR(errno, "connect"),
- NULL);
+ grpc_closure_sched(exec_ctx, closure, GRPC_OS_ERROR(errno, "connect"));
goto done;
}
@@ -343,8 +343,8 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
addr_str = NULL;
gpr_mu_init(&ac->mu);
ac->refs = 2;
- ac->write_closure.cb = on_writable;
- ac->write_closure.cb_arg = ac;
+ grpc_closure_init(&ac->write_closure, on_writable, ac,
+ grpc_schedule_on_exec_ctx);
ac->channel_args = grpc_channel_args_copy(channel_args);
if (grpc_tcp_trace) {
@@ -353,9 +353,10 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
}
gpr_mu_lock(&ac->mu);
+ grpc_closure_init(&ac->on_alarm, tc_on_alarm, ac, grpc_schedule_on_exec_ctx);
grpc_timer_init(exec_ctx, &ac->alarm,
gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
- tc_on_alarm, ac, gpr_now(GPR_CLOCK_MONOTONIC));
+ &ac->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC));
grpc_fd_notify_on_write(exec_ctx, ac->fd, &ac->write_closure);
gpr_mu_unlock(&ac->mu);
diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c
index b07f9ceffa..5225a5402b 100644
--- a/src/core/lib/iomgr/tcp_client_uv.c
+++ b/src/core/lib/iomgr/tcp_client_uv.c
@@ -49,6 +49,7 @@
typedef struct grpc_uv_tcp_connect {
uv_connect_t connect_req;
grpc_timer alarm;
+ grpc_closure on_alarm;
uv_tcp_t *tcp_handle;
grpc_closure *closure;
grpc_endpoint **endpoint;
@@ -59,7 +60,7 @@ typedef struct grpc_uv_tcp_connect {
static void uv_tcp_connect_cleanup(grpc_exec_ctx *exec_ctx,
grpc_uv_tcp_connect *connect) {
- grpc_resource_quota_internal_unref(exec_ctx, connect->resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, connect->resource_quota);
gpr_free(connect);
}
@@ -110,7 +111,7 @@ static void uv_tc_on_connect(uv_connect_t *req, int status) {
if (done) {
uv_tcp_connect_cleanup(&exec_ctx, connect);
}
- grpc_exec_ctx_sched(&exec_ctx, closure, error, NULL);
+ grpc_closure_sched(&exec_ctx, closure, error);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -128,8 +129,8 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
if (channel_args != NULL) {
for (size_t i = 0; i < channel_args->num_args; i++) {
if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
- grpc_resource_quota_internal_unref(exec_ctx, resource_quota);
- resource_quota = grpc_resource_quota_internal_ref(
+ grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
+ resource_quota = grpc_resource_quota_ref_internal(
channel_args->args[i].value.pointer.p);
}
}
@@ -148,9 +149,11 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
uv_tcp_connect(&connect->connect_req, connect->tcp_handle,
(const struct sockaddr *)resolved_addr->addr,
uv_tc_on_connect);
+ grpc_closure_init(&connect->on_alarm, uv_tc_on_alarm, connect,
+ grpc_schedule_on_exec_ctx);
grpc_timer_init(exec_ctx, &connect->alarm,
gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
- uv_tc_on_alarm, connect, gpr_now(GPR_CLOCK_MONOTONIC));
+ &connect->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC));
}
// overridden by api_fuzzer.c
diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c
index 1127588ebc..1e84ec3a1e 100644
--- a/src/core/lib/iomgr/tcp_client_windows.c
+++ b/src/core/lib/iomgr/tcp_client_windows.c
@@ -58,6 +58,7 @@ typedef struct {
grpc_winsocket *socket;
gpr_timespec deadline;
grpc_timer alarm;
+ grpc_closure on_alarm;
char *addr_name;
int refs;
grpc_closure on_connect;
@@ -71,7 +72,7 @@ static void async_connect_unlock_and_cleanup(grpc_exec_ctx *exec_ctx,
int done = (--ac->refs == 0);
gpr_mu_unlock(&ac->mu);
if (done) {
- grpc_resource_quota_internal_unref(exec_ctx, ac->resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, ac->resource_quota);
gpr_mu_destroy(&ac->mu);
gpr_free(ac->addr_name);
gpr_free(ac);
@@ -129,7 +130,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
async_connect_unlock_and_cleanup(exec_ctx, ac, socket);
/* If the connection was aborted, the callback was already called when
the deadline was met. */
- grpc_exec_ctx_sched(exec_ctx, on_done, error, NULL);
+ grpc_closure_sched(exec_ctx, on_done, error);
}
/* Tries to issue one async connection, then schedules both an IOCP
@@ -157,8 +158,8 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
if (channel_args != NULL) {
for (size_t i = 0; i < channel_args->num_args; i++) {
if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
- grpc_resource_quota_internal_unref(exec_ctx, resource_quota);
- resource_quota = grpc_resource_quota_internal_ref(
+ grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
+ resource_quota = grpc_resource_quota_ref_internal(
channel_args->args[i].value.pointer.p);
}
}
@@ -227,9 +228,10 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
ac->addr_name = grpc_sockaddr_to_uri(addr);
ac->endpoint = endpoint;
ac->resource_quota = resource_quota;
- grpc_closure_init(&ac->on_connect, on_connect, ac);
+ grpc_closure_init(&ac->on_connect, on_connect, ac, grpc_schedule_on_exec_ctx);
- grpc_timer_init(exec_ctx, &ac->alarm, deadline, on_alarm, ac,
+ grpc_closure_init(&ac->on_alarm, on_alarm, ac, grpc_schedule_on_exec_ctx);
+ grpc_timer_init(exec_ctx, &ac->alarm, deadline, &ac->on_alarm,
gpr_now(GPR_CLOCK_MONOTONIC));
grpc_socket_notify_on_write(exec_ctx, socket, &ac->on_connect);
return;
@@ -246,8 +248,8 @@ failure:
} else if (sock != INVALID_SOCKET) {
closesocket(sock);
}
- grpc_resource_quota_internal_unref(exec_ctx, resource_quota);
- grpc_exec_ctx_sched(exec_ctx, on_done, final_error, NULL);
+ grpc_resource_quota_unref_internal(exec_ctx, resource_quota);
+ grpc_closure_sched(exec_ctx, on_done, final_error);
}
#endif /* GRPC_WINSOCK_SOCKET */
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 540305e4fa..ece44978b0 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -56,6 +56,7 @@
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/slice/slice_string_helpers.h"
#include "src/core/lib/support/string.h"
@@ -127,7 +128,7 @@ static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
grpc_fd_orphan(exec_ctx, tcp->em_fd, tcp->release_fd_cb, tcp->release_fd,
"tcp_unref_orphan");
- grpc_slice_buffer_destroy(&tcp->last_read_buffer);
+ grpc_slice_buffer_destroy_internal(exec_ctx, &tcp->last_read_buffer);
grpc_resource_user_unref(exec_ctx, tcp->resource_user);
gpr_free(tcp->peer_string);
gpr_free(tcp);
@@ -168,7 +169,7 @@ static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
static void tcp_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
grpc_network_status_unregister_endpoint(ep);
grpc_tcp *tcp = (grpc_tcp *)ep;
- grpc_slice_buffer_reset_and_unref(&tcp->last_read_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &tcp->last_read_buffer);
TCP_UNREF(exec_ctx, tcp, "destroy");
}
@@ -235,14 +236,15 @@ static void tcp_do_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
/* We've consumed the edge, request a new one */
grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
} else {
- grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
+ tcp->incoming_buffer);
call_read_cb(exec_ctx, tcp,
tcp_annotate_error(GRPC_OS_ERROR(errno, "recvmsg"), tcp));
TCP_UNREF(exec_ctx, tcp, "read");
}
} else if (read_bytes == 0) {
/* 0 read size ==> end of stream */
- grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->incoming_buffer);
call_read_cb(exec_ctx, tcp,
tcp_annotate_error(GRPC_ERROR_CREATE("Socket closed"), tcp));
TCP_UNREF(exec_ctx, tcp, "read");
@@ -268,8 +270,9 @@ static void tcp_read_allocation_done(grpc_exec_ctx *exec_ctx, void *tcpp,
grpc_error *error) {
grpc_tcp *tcp = tcpp;
if (error != GRPC_ERROR_NONE) {
- grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer);
- grpc_slice_buffer_reset_and_unref(&tcp->last_read_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->incoming_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
+ &tcp->last_read_buffer);
call_read_cb(exec_ctx, tcp, GRPC_ERROR_REF(error));
TCP_UNREF(exec_ctx, tcp, "read");
} else {
@@ -294,8 +297,9 @@ static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
GPR_ASSERT(!tcp->finished_edge);
if (error != GRPC_ERROR_NONE) {
- grpc_slice_buffer_reset_and_unref(tcp->incoming_buffer);
- grpc_slice_buffer_reset_and_unref(&tcp->last_read_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, tcp->incoming_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx,
+ &tcp->last_read_buffer);
call_read_cb(exec_ctx, tcp, GRPC_ERROR_REF(error));
TCP_UNREF(exec_ctx, tcp, "read");
} else {
@@ -309,14 +313,14 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
GPR_ASSERT(tcp->read_cb == NULL);
tcp->read_cb = cb;
tcp->incoming_buffer = incoming_buffer;
- grpc_slice_buffer_reset_and_unref(incoming_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, incoming_buffer);
grpc_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer);
TCP_REF(tcp, "read");
if (tcp->finished_edge) {
tcp->finished_edge = false;
grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
} else {
- grpc_exec_ctx_sched(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE);
}
}
@@ -460,11 +464,10 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
if (buf->length == 0) {
GPR_TIMER_END("tcp_write", 0);
- grpc_exec_ctx_sched(exec_ctx, cb,
- grpc_fd_is_shutdown(tcp->em_fd)
- ? tcp_annotate_error(GRPC_ERROR_CREATE("EOF"), tcp)
- : GRPC_ERROR_NONE,
- NULL);
+ grpc_closure_sched(exec_ctx, cb,
+ grpc_fd_is_shutdown(tcp->em_fd)
+ ? tcp_annotate_error(GRPC_ERROR_CREATE("EOF"), tcp)
+ : GRPC_ERROR_NONE);
return;
}
tcp->outgoing_buffer = buf;
@@ -484,7 +487,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
gpr_log(GPR_DEBUG, "write: %s", str);
grpc_error_free_string(str);
}
- grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
+ grpc_closure_sched(exec_ctx, cb, error);
}
GPR_TIMER_END("tcp_write", 0);
@@ -552,10 +555,10 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd,
gpr_ref_init(&tcp->refcount, 1);
gpr_atm_no_barrier_store(&tcp->shutdown_count, 0);
tcp->em_fd = em_fd;
- tcp->read_closure.cb = tcp_handle_read;
- tcp->read_closure.cb_arg = tcp;
- tcp->write_closure.cb = tcp_handle_write;
- tcp->write_closure.cb_arg = tcp;
+ grpc_closure_init(&tcp->read_closure, tcp_handle_read, tcp,
+ grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&tcp->write_closure, tcp_handle_write, tcp,
+ grpc_schedule_on_exec_ctx);
grpc_slice_buffer_init(&tcp->last_read_buffer);
tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
grpc_resource_user_slice_allocator_init(
@@ -579,7 +582,7 @@ void grpc_tcp_destroy_and_release_fd(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
GPR_ASSERT(ep->vtable == &vtable);
tcp->release_fd = fd;
tcp->release_fd_cb = done;
- grpc_slice_buffer_reset_and_unref(&tcp->last_read_buffer);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &tcp->last_read_buffer);
TCP_UNREF(exec_ctx, tcp, "destroy");
}
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index 179f47ef76..20efb678b2 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -167,18 +167,18 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
s->so_reuseport =
has_so_reuseport && (args->args[i].value.integer != 0);
} else {
- grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
gpr_free(s);
return GRPC_ERROR_CREATE(GRPC_ARG_ALLOW_REUSEPORT
" must be an integer");
}
} else if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) {
if (args->args[i].type == GRPC_ARG_POINTER) {
- grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
s->resource_quota =
- grpc_resource_quota_internal_ref(args->args[i].value.pointer.p);
+ grpc_resource_quota_ref_internal(args->args[i].value.pointer.p);
} else {
- grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
gpr_free(s);
return GRPC_ERROR_CREATE(GRPC_ARG_RESOURCE_QUOTA
" must be a pointer to a buffer pool");
@@ -208,7 +208,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
GPR_ASSERT(s->shutdown);
gpr_mu_unlock(&s->mu);
if (s->shutdown_complete != NULL) {
- grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
}
gpr_mu_destroy(&s->mu);
@@ -219,7 +219,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
gpr_free(sp);
}
- grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
gpr_free(s);
}
@@ -254,8 +254,8 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
grpc_tcp_listener *sp;
for (sp = s->head; sp; sp = sp->next) {
grpc_unlink_if_unix_domain_socket(&sp->addr);
- sp->destroyed_closure.cb = destroyed_port;
- sp->destroyed_closure.cb_arg = s;
+ grpc_closure_init(&sp->destroyed_closure, destroyed_port, s,
+ grpc_schedule_on_exec_ctx);
grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL,
"tcp_listener_shutdown");
}
@@ -723,8 +723,8 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
"clone_port", clone_port(sp, (unsigned)(pollset_count - 1))));
for (i = 0; i < pollset_count; i++) {
grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd);
- sp->read_closure.cb = on_read;
- sp->read_closure.cb_arg = sp;
+ grpc_closure_init(&sp->read_closure, on_read, sp,
+ grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
s->active_ports++;
sp = sp->next;
@@ -733,8 +733,8 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
for (i = 0; i < pollset_count; i++) {
grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd);
}
- sp->read_closure.cb = on_read;
- sp->read_closure.cb_arg = sp;
+ grpc_closure_init(&sp->read_closure, on_read, sp,
+ grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
s->active_ports++;
sp = sp->next;
@@ -760,7 +760,7 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (gpr_unref(&s->refs)) {
grpc_tcp_server_shutdown_listeners(exec_ctx, s);
gpr_mu_lock(&s->mu);
- grpc_exec_ctx_enqueue_list(exec_ctx, &s->shutdown_starting, NULL);
+ grpc_closure_list_sched(exec_ctx, &s->shutdown_starting);
gpr_mu_unlock(&s->mu);
tcp_server_destroy(exec_ctx, s);
}
diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c
index e1a174cfa2..eed2773f8a 100644
--- a/src/core/lib/iomgr/tcp_server_uv.c
+++ b/src/core/lib/iomgr/tcp_server_uv.c
@@ -89,11 +89,11 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) {
if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) {
if (args->args[i].type == GRPC_ARG_POINTER) {
- grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
s->resource_quota =
- grpc_resource_quota_internal_ref(args->args[i].value.pointer.p);
+ grpc_resource_quota_ref_internal(args->args[i].value.pointer.p);
} else {
- grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
gpr_free(s);
return GRPC_ERROR_CREATE(GRPC_ARG_RESOURCE_QUOTA
" must be a pointer to a buffer pool");
@@ -126,7 +126,7 @@ void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s,
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (s->shutdown_complete != NULL) {
- grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
}
while (s->head) {
@@ -136,7 +136,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
gpr_free(sp->handle);
gpr_free(sp);
}
- grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
gpr_free(s);
}
@@ -170,7 +170,7 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (gpr_unref(&s->refs)) {
/* Complete shutdown_starting work before destroying. */
grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_exec_ctx_enqueue_list(&local_exec_ctx, &s->shutdown_starting, NULL);
+ grpc_closure_list_sched(&local_exec_ctx, &s->shutdown_starting);
if (exec_ctx == NULL) {
grpc_exec_ctx_flush(&local_exec_ctx);
tcp_server_destroy(&local_exec_ctx, s);
diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c
index b0c8586bac..dafe851ce8 100644
--- a/src/core/lib/iomgr/tcp_server_windows.c
+++ b/src/core/lib/iomgr/tcp_server_windows.c
@@ -116,11 +116,11 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) {
if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) {
if (args->args[i].type == GRPC_ARG_POINTER) {
- grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
s->resource_quota =
- grpc_resource_quota_internal_ref(args->args[i].value.pointer.p);
+ grpc_resource_quota_ref_internal(args->args[i].value.pointer.p);
} else {
- grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
gpr_free(s);
return GRPC_ERROR_CREATE(GRPC_ARG_RESOURCE_QUOTA
" must be a pointer to a buffer pool");
@@ -155,18 +155,19 @@ static void destroy_server(grpc_exec_ctx *exec_ctx, void *arg,
grpc_winsocket_destroy(sp->socket);
gpr_free(sp);
}
- grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota);
+ grpc_resource_quota_unref_internal(exec_ctx, s->resource_quota);
gpr_free(s);
}
static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
grpc_tcp_server *s) {
if (s->shutdown_complete != NULL) {
- grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
}
- grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(destroy_server, s),
- GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, grpc_closure_create(destroy_server, s,
+ grpc_schedule_on_exec_ctx),
+ GRPC_ERROR_NONE);
}
grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) {
@@ -183,7 +184,6 @@ void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s,
}
static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
- int immediately_done = 0;
grpc_tcp_listener *sp;
gpr_mu_lock(&s->mu);
@@ -204,7 +204,7 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (gpr_unref(&s->refs)) {
grpc_tcp_server_shutdown_listeners(exec_ctx, s);
gpr_mu_lock(&s->mu);
- grpc_exec_ctx_enqueue_list(exec_ctx, &s->shutdown_starting, NULL);
+ grpc_closure_list_sched(exec_ctx, &s->shutdown_starting);
gpr_mu_unlock(&s->mu);
tcp_server_destroy(exec_ctx, s);
}
@@ -239,7 +239,7 @@ static grpc_error *prepare_socket(SOCKET sock,
error = GRPC_WSA_ERROR(WSAGetLastError(), "getsockname");
goto failure;
}
- sockname_temp.len = sockname_temp_len;
+ sockname_temp.len = (size_t)sockname_temp_len;
*port = grpc_sockaddr_get_port(&sockname_temp);
return GRPC_ERROR_NONE;
@@ -247,7 +247,7 @@ static grpc_error *prepare_socket(SOCKET sock,
failure:
GPR_ASSERT(error != GRPC_ERROR_NONE);
char *tgtaddr = grpc_sockaddr_to_uri(addr);
- grpc_error *final_error = grpc_error_set_int(
+ grpc_error_set_int(
grpc_error_set_str(GRPC_ERROR_CREATE_REFERENCING(
"Failed to prepare server socket", &error, 1),
GRPC_ERROR_STR_TARGET_ADDRESS, tgtaddr),
@@ -260,7 +260,6 @@ failure:
static void decrement_active_ports_and_notify_locked(grpc_exec_ctx *exec_ctx,
grpc_tcp_listener *sp) {
- int notify = 0;
sp->shutting_down = 0;
GPR_ASSERT(sp->server->active_ports > 0);
if (0 == --sp->server->active_ports) {
@@ -374,7 +373,7 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
int peer_name_len = (int)peer_name.len;
err =
getpeername(sock, (struct sockaddr *)peer_name.addr, &peer_name_len);
- peer_name.len = peer_name_len;
+ peer_name.len = (size_t)peer_name_len;
if (!err) {
peer_name_string = grpc_sockaddr_to_uri(&peer_name);
} else {
@@ -465,7 +464,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
sp->new_socket = INVALID_SOCKET;
sp->port = port;
sp->port_index = port_index;
- grpc_closure_init(&sp->on_accept, on_accept, sp);
+ grpc_closure_init(&sp->on_accept, on_accept, sp, grpc_schedule_on_exec_ctx);
GPR_ASSERT(sp->socket);
gpr_mu_unlock(&s->mu);
*listener = sp;
@@ -497,7 +496,7 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s,
if (0 == getsockname(sp->socket->socket,
(struct sockaddr *)sockname_temp.addr,
&sockname_temp_len)) {
- sockname_temp.len = sockname_temp_len;
+ sockname_temp.len = (size_t)sockname_temp_len;
*port = grpc_sockaddr_get_port(&sockname_temp);
if (*port > 0) {
allocated_addr = gpr_malloc(sizeof(grpc_resolved_address));
diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c
index 6e2ad1dbe9..3ddc79706b 100644
--- a/src/core/lib/iomgr/tcp_uv.c
+++ b/src/core/lib/iomgr/tcp_uv.c
@@ -169,7 +169,7 @@ static void read_callback(uv_stream_t *stream, ssize_t nread,
// nread < 0: Error
error = GRPC_ERROR_CREATE("TCP Read failed");
}
- grpc_exec_ctx_sched(&exec_ctx, cb, error, NULL);
+ grpc_closure_sched(&exec_ctx, cb, error);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -181,7 +181,7 @@ static void uv_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
GPR_ASSERT(tcp->read_cb == NULL);
tcp->read_cb = cb;
tcp->read_slices = read_slices;
- grpc_slice_buffer_reset_and_unref(read_slices);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, read_slices);
TCP_REF(tcp, "read");
// TODO(murgatroid99): figure out what the return value here means
status =
@@ -190,7 +190,7 @@ static void uv_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
error = GRPC_ERROR_CREATE("TCP Read failed at start");
error =
grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, uv_strerror(status));
- grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
+ grpc_closure_sched(exec_ctx, cb, error);
}
if (grpc_tcp_trace) {
const char *str = grpc_error_string(error);
@@ -217,7 +217,7 @@ static void write_callback(uv_write_t *req, int status) {
gpr_free(tcp->write_buffers);
grpc_resource_user_free(&exec_ctx, tcp->resource_user,
sizeof(uv_buf_t) * tcp->write_slices->count);
- grpc_exec_ctx_sched(&exec_ctx, cb, error, NULL);
+ grpc_closure_sched(&exec_ctx, cb, error);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -243,8 +243,8 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
}
if (tcp->shutting_down) {
- grpc_exec_ctx_sched(exec_ctx, cb,
- GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL);
+ grpc_closure_sched(exec_ctx, cb,
+ GRPC_ERROR_CREATE("TCP socket is shutting down"));
return;
}
@@ -254,7 +254,7 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
if (tcp->write_slices->count == 0) {
// No slices means we don't have to do anything,
// and libuv doesn't like empty writes
- grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_NONE);
return;
}
diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c
index d4613b674e..84f791ba07 100644
--- a/src/core/lib/iomgr/tcp_windows.c
+++ b/src/core/lib/iomgr/tcp_windows.c
@@ -53,6 +53,7 @@
#include "src/core/lib/iomgr/socket_windows.h"
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/slice/slice_internal.h"
#if defined(__MSYS__) && defined(GPR_ARCH_64)
/* Nasty workaround for nasty bug when using the 64 bits msys compiler
@@ -174,13 +175,13 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) {
char *utf8_message = gpr_format_message(info->wsa_error);
error = GRPC_ERROR_CREATE(utf8_message);
gpr_free(utf8_message);
- grpc_slice_unref(tcp->read_slice);
+ grpc_slice_unref_internal(exec_ctx, tcp->read_slice);
} else {
if (info->bytes_transfered != 0 && !tcp->shutting_down) {
sub = grpc_slice_sub_no_ref(tcp->read_slice, 0, info->bytes_transfered);
grpc_slice_buffer_add(tcp->read_slices, sub);
} else {
- grpc_slice_unref(tcp->read_slice);
+ grpc_slice_unref_internal(exec_ctx, tcp->read_slice);
error = GRPC_ERROR_CREATE("End of TCP stream");
}
}
@@ -188,7 +189,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) {
tcp->read_cb = NULL;
TCP_UNREF(exec_ctx, tcp, "read");
- grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
+ grpc_closure_sched(exec_ctx, cb, error);
}
static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
@@ -202,14 +203,14 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
WSABUF buffer;
if (tcp->shutting_down) {
- grpc_exec_ctx_sched(exec_ctx, cb,
- GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL);
+ grpc_closure_sched(exec_ctx, cb,
+ GRPC_ERROR_CREATE("TCP socket is shutting down"));
return;
}
tcp->read_cb = cb;
tcp->read_slices = read_slices;
- grpc_slice_buffer_reset_and_unref(read_slices);
+ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, read_slices);
tcp->read_slice = grpc_slice_malloc(8192);
@@ -227,7 +228,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
/* Did we get data immediately ? Yay. */
if (info->wsa_error != WSAEWOULDBLOCK) {
info->bytes_transfered = bytes_read;
- grpc_exec_ctx_sched(exec_ctx, &tcp->on_read, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, &tcp->on_read, GRPC_ERROR_NONE);
return;
}
@@ -240,8 +241,8 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
info->wsa_error = wsa_error;
- grpc_exec_ctx_sched(exec_ctx, &tcp->on_read,
- GRPC_WSA_ERROR(info->wsa_error, "WSARecv"), NULL);
+ grpc_closure_sched(exec_ctx, &tcp->on_read,
+ GRPC_WSA_ERROR(info->wsa_error, "WSARecv"));
return;
}
}
@@ -272,7 +273,7 @@ static void on_write(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) {
}
TCP_UNREF(exec_ctx, tcp, "write");
- grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
+ grpc_closure_sched(exec_ctx, cb, error);
}
/* Initiates a write. */
@@ -290,8 +291,8 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
size_t len;
if (tcp->shutting_down) {
- grpc_exec_ctx_sched(exec_ctx, cb,
- GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL);
+ grpc_closure_sched(exec_ctx, cb,
+ GRPC_ERROR_CREATE("TCP socket is shutting down"));
return;
}
@@ -322,7 +323,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_error *error = status == 0
? GRPC_ERROR_NONE
: GRPC_WSA_ERROR(info->wsa_error, "WSASend");
- grpc_exec_ctx_sched(exec_ctx, cb, error, NULL);
+ grpc_closure_sched(exec_ctx, cb, error);
if (allocated) gpr_free(allocated);
return;
}
@@ -340,8 +341,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
TCP_UNREF(exec_ctx, tcp, "write");
- grpc_exec_ctx_sched(exec_ctx, cb, GRPC_WSA_ERROR(wsa_error, "WSASend"),
- NULL);
+ grpc_closure_sched(exec_ctx, cb, GRPC_WSA_ERROR(wsa_error, "WSASend"));
return;
}
}
@@ -424,8 +424,8 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket,
tcp->socket = socket;
gpr_mu_init(&tcp->mu);
gpr_ref_init(&tcp->refcount, 1);
- grpc_closure_init(&tcp->on_read, on_read, tcp);
- grpc_closure_init(&tcp->on_write, on_write, tcp);
+ grpc_closure_init(&tcp->on_read, on_read, tcp, grpc_schedule_on_exec_ctx);
+ grpc_closure_init(&tcp->on_write, on_write, tcp, grpc_schedule_on_exec_ctx);
tcp->peer_string = gpr_strdup(peer_string);
tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
/* Tell network status tracking code about the new endpoint */
diff --git a/src/core/lib/iomgr/timer.h b/src/core/lib/iomgr/timer.h
index 20fe98c4a7..d84a278b18 100644
--- a/src/core/lib/iomgr/timer.h
+++ b/src/core/lib/iomgr/timer.h
@@ -49,15 +49,15 @@
typedef struct grpc_timer grpc_timer;
-/* Initialize *timer. When expired or canceled, timer_cb will be called with
- *timer_cb_arg and error set to indicate if it expired (GRPC_ERROR_NONE) or
- was canceled (GRPC_ERROR_CANCELLED). timer_cb is guaranteed to be called
- exactly once, and application code should check the error to determine
- how it was invoked. The application callback is also responsible for
- maintaining information about when to free up any user-level state. */
+/* Initialize *timer. When expired or canceled, closure will be called with
+ error set to indicate if it expired (GRPC_ERROR_NONE) or was canceled
+ (GRPC_ERROR_CANCELLED). timer_cb is guaranteed to be called exactly once, and
+ application code should check the error to determine how it was invoked. The
+ application callback is also responsible for maintaining information about
+ when to free up any user-level state. */
void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
- gpr_timespec deadline, grpc_iomgr_cb_func timer_cb,
- void *timer_cb_arg, gpr_timespec now);
+ gpr_timespec deadline, grpc_closure *closure,
+ gpr_timespec now);
/* Note that there is no timer destroy function. This is because the
timer is a one-time occurrence with a guarantee that the callback will
diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c
index 00058f9d86..40c8351472 100644
--- a/src/core/lib/iomgr/timer_generic.c
+++ b/src/core/lib/iomgr/timer_generic.c
@@ -178,28 +178,27 @@ static void note_deadline_change(shard_type *shard) {
}
void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
- gpr_timespec deadline, grpc_iomgr_cb_func timer_cb,
- void *timer_cb_arg, gpr_timespec now) {
+ gpr_timespec deadline, grpc_closure *closure,
+ gpr_timespec now) {
int is_first_timer = 0;
shard_type *shard = &g_shards[shard_idx(timer)];
GPR_ASSERT(deadline.clock_type == g_clock_type);
GPR_ASSERT(now.clock_type == g_clock_type);
- grpc_closure_init(&timer->closure, timer_cb, timer_cb_arg);
+ timer->closure = closure;
timer->deadline = deadline;
timer->triggered = 0;
if (!g_initialized) {
timer->triggered = 1;
- grpc_exec_ctx_sched(
- exec_ctx, &timer->closure,
- GRPC_ERROR_CREATE("Attempt to create timer before initialization"),
- NULL);
+ grpc_closure_sched(
+ exec_ctx, timer->closure,
+ GRPC_ERROR_CREATE("Attempt to create timer before initialization"));
return;
}
if (gpr_time_cmp(deadline, now) <= 0) {
timer->triggered = 1;
- grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_NONE);
return;
}
@@ -251,7 +250,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
shard_type *shard = &g_shards[shard_idx(timer)];
gpr_mu_lock(&shard->mu);
if (!timer->triggered) {
- grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_CANCELLED, NULL);
+ grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED);
timer->triggered = 1;
if (timer->heap_index == INVALID_HEAP_INDEX) {
list_remove(timer);
@@ -317,7 +316,7 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard,
grpc_timer *timer;
gpr_mu_lock(&shard->mu);
while ((timer = pop_one(shard, now))) {
- grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_REF(error), NULL);
+ grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_REF(error));
n++;
}
*new_min_deadline = compute_min_deadline(shard);
diff --git a/src/core/lib/iomgr/timer_generic.h b/src/core/lib/iomgr/timer_generic.h
index e4494adb5f..9d901c7e68 100644
--- a/src/core/lib/iomgr/timer_generic.h
+++ b/src/core/lib/iomgr/timer_generic.h
@@ -43,7 +43,7 @@ struct grpc_timer {
int triggered;
struct grpc_timer *next;
struct grpc_timer *prev;
- grpc_closure closure;
+ grpc_closure *closure;
};
#endif /* GRPC_CORE_LIB_IOMGR_TIMER_GENERIC_H */
diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c
index cfcb89268b..fa2cdee964 100644
--- a/src/core/lib/iomgr/timer_uv.c
+++ b/src/core/lib/iomgr/timer_uv.c
@@ -55,20 +55,20 @@ void run_expired_timer(uv_timer_t *handle) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GPR_ASSERT(!timer->triggered);
timer->triggered = 1;
- grpc_exec_ctx_sched(&exec_ctx, &timer->closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(&exec_ctx, timer->closure, GRPC_ERROR_NONE);
stop_uv_timer(handle);
grpc_exec_ctx_finish(&exec_ctx);
}
void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
- gpr_timespec deadline, grpc_iomgr_cb_func timer_cb,
- void *timer_cb_arg, gpr_timespec now) {
+ gpr_timespec deadline, grpc_closure *closure,
+ gpr_timespec now) {
uint64_t timeout;
uv_timer_t *uv_timer;
- grpc_closure_init(&timer->closure, timer_cb, timer_cb_arg);
+ timer->closure = closure;
if (gpr_time_cmp(deadline, now) <= 0) {
timer->triggered = 1;
- grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_NONE);
return;
}
timer->triggered = 0;
@@ -83,7 +83,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
if (!timer->triggered) {
timer->triggered = 1;
- grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_CANCELLED, NULL);
+ grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED);
stop_uv_timer((uv_timer_t *)timer->uv_timer);
}
}
diff --git a/src/core/lib/iomgr/timer_uv.h b/src/core/lib/iomgr/timer_uv.h
index 3de383ebd5..13cf8bd4fa 100644
--- a/src/core/lib/iomgr/timer_uv.h
+++ b/src/core/lib/iomgr/timer_uv.h
@@ -37,7 +37,7 @@
#include "src/core/lib/iomgr/exec_ctx.h"
struct grpc_timer {
- grpc_closure closure;
+ grpc_closure *closure;
/* This is actually a uv_timer_t*, but we want to keep platform-specific
types out of headers */
void *uv_timer;
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index 3c24ea9afa..dfbd295c91 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -126,7 +126,7 @@ grpc_udp_server *grpc_udp_server_create(void) {
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
if (s->shutdown_complete != NULL) {
- grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL);
+ grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
}
gpr_mu_destroy(&s->mu);
@@ -170,8 +170,8 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
for (sp = s->head; sp; sp = sp->next) {
grpc_unlink_if_unix_domain_socket(&sp->addr);
- sp->destroyed_closure.cb = destroyed_port;
- sp->destroyed_closure.cb_arg = s;
+ grpc_closure_init(&sp->destroyed_closure, destroyed_port, s,
+ grpc_schedule_on_exec_ctx);
/* Call the orphan_cb to signal that the FD is about to be closed and
* should no longer be used. */
@@ -446,8 +446,8 @@ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
for (i = 0; i < pollset_count; i++) {
grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd);
}
- sp->read_closure.cb = on_read;
- sp->read_closure.cb_arg = sp;
+ grpc_closure_init(&sp->read_closure, on_read, sp,
+ grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
s->active_ports++;
diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h
index 73d9849843..371b0f55dc 100644
--- a/src/core/lib/iomgr/workqueue.h
+++ b/src/core/lib/iomgr/workqueue.h
@@ -72,17 +72,16 @@ grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue);
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue);
#endif
-/** Add a work item to a workqueue. Items added to a work queue will be started
- in approximately the order they were enqueued, on some thread that may or
- may not be the current thread. Successive closures enqueued onto a workqueue
- MAY be executed concurrently.
+/** Fetch the workqueue closure scheduler. Items added to a work queue will be
+ started in approximately the order they were enqueued, on some thread that
+ may or may not be the current thread. Successive closures enqueued onto a
+ workqueue MAY be executed concurrently.
It is generally more expensive to add a closure to a workqueue than to the
execution context, both in terms of CPU work and in execution latency.
Use work queues when it's important that other threads be given a chance to
tackle some workload. */
-void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- grpc_closure *closure, grpc_error *error);
+grpc_closure_scheduler *grpc_workqueue_scheduler(grpc_workqueue *workqueue);
#endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_H */
diff --git a/src/core/lib/iomgr/workqueue_uv.c b/src/core/lib/iomgr/workqueue_uv.c
index e58ca476cc..4d61b40912 100644
--- a/src/core/lib/iomgr/workqueue_uv.c
+++ b/src/core/lib/iomgr/workqueue_uv.c
@@ -58,9 +58,8 @@ grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue) {
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {}
#endif
-void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- grpc_closure *closure, grpc_error *error) {
- grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
+grpc_closure_scheduler *grpc_workqueue_scheduler(grpc_workqueue *workqueue) {
+ return grpc_schedule_on_exec_ctx;
}
#endif /* GPR_UV */
diff --git a/src/core/lib/iomgr/workqueue_windows.c b/src/core/lib/iomgr/workqueue_windows.c
index 5c93d3c59e..234b47cdf5 100644
--- a/src/core/lib/iomgr/workqueue_windows.c
+++ b/src/core/lib/iomgr/workqueue_windows.c
@@ -56,9 +56,8 @@ grpc_workqueue *grpc_workqueue_ref(grpc_workqueue *workqueue) {
void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {}
#endif
-void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue,
- grpc_closure *closure, grpc_error *error) {
- grpc_exec_ctx_sched(exec_ctx, closure, error, NULL);
+grpc_closure_scheduler *grpc_workqueue_scheduler(grpc_workqueue *workqueue) {
+ return grpc_schedule_on_exec_ctx;
}
#endif /* GPR_WINDOWS */