aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r--src/core/lib/iomgr/closure.c54
-rw-r--r--src/core/lib/iomgr/closure.h60
-rw-r--r--src/core/lib/iomgr/combiner.c12
-rw-r--r--src/core/lib/iomgr/error.h2
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.c4
-rw-r--r--src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c4
-rw-r--r--src/core/lib/iomgr/ev_epoll_thread_pool_linux.c4
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.c8
-rw-r--r--src/core/lib/iomgr/ev_epollsig_linux.c4
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c16
-rw-r--r--src/core/lib/iomgr/exec_ctx.c4
-rw-r--r--src/core/lib/iomgr/executor.c2
-rw-r--r--src/core/lib/iomgr/lockfree_event.c8
-rw-r--r--src/core/lib/iomgr/pollset_uv.c2
-rw-r--r--src/core/lib/iomgr/pollset_windows.c4
-rw-r--r--src/core/lib/iomgr/resolve_address_posix.c6
-rw-r--r--src/core/lib/iomgr/resolve_address_uv.c6
-rw-r--r--src/core/lib/iomgr/resolve_address_windows.c6
-rw-r--r--src/core/lib/iomgr/resource_quota.c56
-rw-r--r--src/core/lib/iomgr/socket_windows.c4
-rw-r--r--src/core/lib/iomgr/tcp_client_posix.c14
-rw-r--r--src/core/lib/iomgr/tcp_client_uv.c4
-rw-r--r--src/core/lib/iomgr/tcp_client_windows.c8
-rw-r--r--src/core/lib/iomgr/tcp_posix.c14
-rw-r--r--src/core/lib/iomgr/tcp_server_posix.c10
-rw-r--r--src/core/lib/iomgr/tcp_server_uv.c4
-rw-r--r--src/core/lib/iomgr/tcp_server_windows.c8
-rw-r--r--src/core/lib/iomgr/tcp_uv.c10
-rw-r--r--src/core/lib/iomgr/tcp_windows.c20
-rw-r--r--src/core/lib/iomgr/timer_generic.c8
-rw-r--r--src/core/lib/iomgr/timer_uv.c6
-rw-r--r--src/core/lib/iomgr/udp_server.c12
32 files changed, 245 insertions, 139 deletions
diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c
index 72a1b20443..719e2e8cee 100644
--- a/src/core/lib/iomgr/closure.c
+++ b/src/core/lib/iomgr/closure.c
@@ -24,14 +24,26 @@
#include "src/core/lib/profiling/timers.h"
+#ifdef GRPC_CLOSURE_RICH_DEBUG
+grpc_closure *grpc_closure_init(const char *file, int line,
+ grpc_closure *closure, grpc_iomgr_cb_func cb,
+ void *cb_arg,
+ grpc_closure_scheduler *scheduler) {
+#else
grpc_closure *grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
void *cb_arg,
grpc_closure_scheduler *scheduler) {
+#endif
closure->cb = cb;
closure->cb_arg = cb_arg;
closure->scheduler = scheduler;
-#ifndef NDEBUG
+#ifdef GRPC_CLOSURE_RICH_DEBUG
closure->scheduled = false;
+ closure->file_initiated = NULL;
+ closure->line_initiated = 0;
+ closure->run = false;
+ closure->file_created = file;
+ closure->line_created = line;
#endif
return closure;
}
@@ -100,19 +112,39 @@ static void closure_wrapper(grpc_exec_ctx *exec_ctx, void *arg,
cb(exec_ctx, cb_arg, error);
}
+#ifdef GRPC_CLOSURE_RICH_DEBUG
+grpc_closure *grpc_closure_create(const char *file, int line,
+ grpc_iomgr_cb_func cb, void *cb_arg,
+ grpc_closure_scheduler *scheduler) {
+#else
grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg,
grpc_closure_scheduler *scheduler) {
+#endif
wrapped_closure *wc = gpr_malloc(sizeof(*wc));
wc->cb = cb;
wc->cb_arg = cb_arg;
+#ifdef GRPC_CLOSURE_RICH_DEBUG
+ grpc_closure_init(file, line, &wc->wrapper, closure_wrapper, wc, scheduler);
+#else
grpc_closure_init(&wc->wrapper, closure_wrapper, wc, scheduler);
+#endif
return &wc->wrapper;
}
+#ifdef GRPC_CLOSURE_RICH_DEBUG
+void grpc_closure_run(const char *file, int line, grpc_exec_ctx *exec_ctx,
+ grpc_closure *c, grpc_error *error) {
+#else
void grpc_closure_run(grpc_exec_ctx *exec_ctx, grpc_closure *c,
grpc_error *error) {
+#endif
GPR_TIMER_BEGIN("grpc_closure_run", 0);
if (c != NULL) {
+#ifdef GRPC_CLOSURE_RICH_DEBUG
+ c->file_initiated = file;
+ c->line_initiated = line;
+ c->run = true;
+#endif
assert(c->cb);
c->scheduler->vtable->run(exec_ctx, c, error);
} else {
@@ -121,13 +153,21 @@ void grpc_closure_run(grpc_exec_ctx *exec_ctx, grpc_closure *c,
GPR_TIMER_END("grpc_closure_run", 0);
}
+#ifdef GRPC_CLOSURE_RICH_DEBUG
+void grpc_closure_sched(const char *file, int line, grpc_exec_ctx *exec_ctx,
+ grpc_closure *c, grpc_error *error) {
+#else
void grpc_closure_sched(grpc_exec_ctx *exec_ctx, grpc_closure *c,
grpc_error *error) {
+#endif
GPR_TIMER_BEGIN("grpc_closure_sched", 0);
if (c != NULL) {
-#ifndef NDEBUG
+#ifdef GRPC_CLOSURE_RICH_DEBUG
GPR_ASSERT(!c->scheduled);
c->scheduled = true;
+ c->file_initiated = file;
+ c->line_initiated = line;
+ c->run = false;
#endif
assert(c->cb);
c->scheduler->vtable->sched(exec_ctx, c, error);
@@ -137,13 +177,21 @@ void grpc_closure_sched(grpc_exec_ctx *exec_ctx, grpc_closure *c,
GPR_TIMER_END("grpc_closure_sched", 0);
}
+#ifdef GRPC_CLOSURE_RICH_DEBUG
+void grpc_closure_list_sched(const char *file, int line,
+ grpc_exec_ctx *exec_ctx, grpc_closure_list *list) {
+#else
void grpc_closure_list_sched(grpc_exec_ctx *exec_ctx, grpc_closure_list *list) {
+#endif
grpc_closure *c = list->head;
while (c != NULL) {
grpc_closure *next = c->next_data.next;
-#ifndef NDEBUG
+#ifdef GRPC_CLOSURE_RICH_DEBUG
GPR_ASSERT(!c->scheduled);
c->scheduled = true;
+ c->file_initiated = file;
+ c->line_initiated = line;
+ c->run = false;
#endif
assert(c->cb);
c->scheduler->vtable->sched(exec_ctx, c, c->error_data.error);
diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h
index 25086fa483..3ecf9e947b 100644
--- a/src/core/lib/iomgr/closure.h
+++ b/src/core/lib/iomgr/closure.h
@@ -59,6 +59,8 @@ struct grpc_closure_scheduler {
const grpc_closure_scheduler_vtable *vtable;
};
+// #define GRPC_CLOSURE_RICH_DEBUG
+
/** A closure over a grpc_iomgr_cb_func. */
struct grpc_closure {
/** Once queued, next indicates the next queued closure; before then, scratch
@@ -85,19 +87,47 @@ struct grpc_closure {
uintptr_t scratch;
} error_data;
-#ifndef NDEBUG
+// extra tracing and debugging for grpc_closure. This incurs a decent amount of
+// overhead per closure, so it must be enabled at compile time.
+#ifdef GRPC_CLOSURE_RICH_DEBUG
bool scheduled;
+ bool run; // true = run, false = scheduled
+ const char *file_created;
+ int line_created;
+ const char *file_initiated;
+ int line_initiated;
#endif
};
/** Initializes \a closure with \a cb and \a cb_arg. Returns \a closure. */
+#ifdef GRPC_CLOSURE_RICH_DEBUG
+grpc_closure *grpc_closure_init(const char *file, int line,
+ grpc_closure *closure, grpc_iomgr_cb_func cb,
+ void *cb_arg,
+ grpc_closure_scheduler *scheduler);
+#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler) \
+ grpc_closure_init(__FILE__, __LINE__, closure, cb, cb_arg, scheduler)
+#else
grpc_closure *grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb,
void *cb_arg,
grpc_closure_scheduler *scheduler);
+#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler) \
+ grpc_closure_init(closure, cb, cb_arg, scheduler)
+#endif
/* Create a heap allocated closure: try to avoid except for very rare events */
+#ifdef GRPC_CLOSURE_RICH_DEBUG
+grpc_closure *grpc_closure_create(const char *file, int line,
+ grpc_iomgr_cb_func cb, void *cb_arg,
+ grpc_closure_scheduler *scheduler);
+#define GRPC_CLOSURE_CREATE(cb, cb_arg, scheduler) \
+ grpc_closure_create(__FILE__, __LINE__, cb, cb_arg, scheduler)
+#else
grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg,
grpc_closure_scheduler *scheduler);
+#define GRPC_CLOSURE_CREATE(cb, cb_arg, scheduler) \
+ grpc_closure_create(cb, cb_arg, scheduler)
+#endif
#define GRPC_CLOSURE_LIST_INIT \
{ NULL, NULL }
@@ -123,16 +153,44 @@ bool grpc_closure_list_empty(grpc_closure_list list);
/** Run a closure directly. Caller ensures that no locks are being held above.
* Note that calling this at the end of a closure callback function itself is
* by definition safe. */
+#ifdef GRPC_CLOSURE_RICH_DEBUG
+void grpc_closure_run(const char *file, int line, grpc_exec_ctx *exec_ctx,
+ grpc_closure *closure, grpc_error *error);
+#define GRPC_CLOSURE_RUN(exec_ctx, closure, error) \
+ grpc_closure_run(__FILE__, __LINE__, exec_ctx, closure, error)
+#else
void grpc_closure_run(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_error *error);
+#define GRPC_CLOSURE_RUN(exec_ctx, closure, error) \
+ grpc_closure_run(exec_ctx, closure, error)
+#endif
/** Schedule a closure to be run. Does not need to be run from a safe point. */
+#ifdef GRPC_CLOSURE_RICH_DEBUG
+void grpc_closure_sched(const char *file, int line, grpc_exec_ctx *exec_ctx,
+ grpc_closure *closure, grpc_error *error);
+#define GRPC_CLOSURE_SCHED(exec_ctx, closure, error) \
+ grpc_closure_sched(__FILE__, __LINE__, exec_ctx, closure, error)
+#else
void grpc_closure_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_error *error);
+#define GRPC_CLOSURE_SCHED(exec_ctx, closure, error) \
+ grpc_closure_sched(exec_ctx, closure, error)
+#endif
/** Schedule all closures in a list to be run. Does not need to be run from a
* safe point. */
+#ifdef GRPC_CLOSURE_RICH_DEBUG
+void grpc_closure_list_sched(const char *file, int line,
+ grpc_exec_ctx *exec_ctx,
+ grpc_closure_list *closure_list);
+#define GRPC_CLOSURE_LIST_SCHED(exec_ctx, closure_list) \
+ grpc_closure_list_sched(__FILE__, __LINE__, exec_ctx, closure_list)
+#else
void grpc_closure_list_sched(grpc_exec_ctx *exec_ctx,
grpc_closure_list *closure_list);
+#define GRPC_CLOSURE_LIST_SCHED(exec_ctx, closure_list) \
+ grpc_closure_list_sched(exec_ctx, closure_list)
+#endif
#endif /* GRPC_CORE_LIB_IOMGR_CLOSURE_H */
diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c
index f6976c5650..750ff102ff 100644
--- a/src/core/lib/iomgr/combiner.c
+++ b/src/core/lib/iomgr/combiner.c
@@ -81,7 +81,7 @@ grpc_combiner *grpc_combiner_create(void) {
gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED);
gpr_mpscq_init(&lock->queue);
grpc_closure_list_init(&lock->final_list);
- grpc_closure_init(&lock->offload, offload, lock, grpc_executor_scheduler);
+ GRPC_CLOSURE_INIT(&lock->offload, offload, lock, grpc_executor_scheduler);
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock));
return lock;
}
@@ -196,7 +196,7 @@ static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
static void queue_offload(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) {
move_next(exec_ctx);
GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p queue_offload", lock));
- grpc_closure_sched(exec_ctx, &lock->offload, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, &lock->offload, GRPC_ERROR_NONE);
}
bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
@@ -247,7 +247,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
GPR_TIMER_BEGIN("combiner.exec1", 0);
grpc_closure *cl = (grpc_closure *)n;
grpc_error *cl_err = cl->error_data.error;
-#ifndef NDEBUG
+#ifdef GRPC_CLOSURE_RICH_DEBUG
cl->scheduled = false;
#endif
cl->cb(exec_ctx, cl->cb_arg, cl_err);
@@ -264,7 +264,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) {
gpr_log(GPR_DEBUG, "C:%p execute_final[%d] c=%p", lock, loops, c));
grpc_closure *next = c->next_data.next;
grpc_error *error = c->error_data.error;
-#ifndef NDEBUG
+#ifdef GRPC_CLOSURE_RICH_DEBUG
c->scheduled = false;
#endif
c->cb(exec_ctx, c->cb_arg, error);
@@ -332,8 +332,8 @@ static void combiner_finally_exec(grpc_exec_ctx *exec_ctx,
GPR_TIMER_BEGIN("combiner.execute_finally", 0);
if (exec_ctx->active_combiner != lock) {
GPR_TIMER_MARK("slowpath", 0);
- grpc_closure_sched(exec_ctx,
- grpc_closure_create(enqueue_finally, closure,
+ GRPC_CLOSURE_SCHED(exec_ctx,
+ GRPC_CLOSURE_CREATE(enqueue_finally, closure,
grpc_combiner_scheduler(lock)),
error);
GPR_TIMER_END("combiner.execute_finally", 0);
diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h
index e626845fa4..1ce916f2ec 100644
--- a/src/core/lib/iomgr/error.h
+++ b/src/core/lib/iomgr/error.h
@@ -149,7 +149,7 @@ grpc_error *grpc_error_create(const char *file, int line, grpc_slice desc,
grpc_error_create(__FILE__, __LINE__, grpc_slice_from_copied_string(desc), \
errs, count)
-//#define GRPC_ERROR_REFCOUNT_DEBUG
+// #define GRPC_ERROR_REFCOUNT_DEBUG
#ifdef GRPC_ERROR_REFCOUNT_DEBUG
grpc_error *grpc_error_ref(grpc_error *err, const char *file, int line,
const char *func);
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c
index 63f6962ede..e0c05457e3 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.c
+++ b/src/core/lib/iomgr/ev_epoll1_linux.c
@@ -237,7 +237,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
close(fd->fd);
}
- grpc_closure_sched(exec_ctx, on_done, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(exec_ctx, on_done, GRPC_ERROR_REF(error));
grpc_iomgr_unregister_object(&fd->iomgr_object);
grpc_lfev_destroy(&fd->read_closure);
@@ -427,7 +427,7 @@ static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
grpc_pollset *pollset) {
if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL &&
pollset->begin_refs == 0) {
- grpc_closure_sched(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE);
pollset->shutdown_closure = NULL;
}
}
diff --git a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
index d7ae0d371e..761e2ed5fe 100644
--- a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
@@ -965,7 +965,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
fd->po.pi = NULL;
}
- grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
gpr_mu_unlock(&fd->po.mu);
UNREF_BY(fd, 2, reason); /* Drop the reference */
@@ -1250,7 +1250,7 @@ static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
/* Release the ref and set pollset->po.pi to NULL */
pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
- grpc_closure_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
}
/* pollset->po.mu lock must be held by the caller before calling this */
diff --git a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
index 8692b5b3c3..0ab3594818 100644
--- a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
@@ -515,7 +515,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
fd->eps = NULL;
}
- grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
gpr_mu_unlock(&fd->mu);
@@ -716,7 +716,7 @@ static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
/* Release the ref and set pollset->eps to NULL */
pollset_release_epoll_set(exec_ctx, pollset, "ps_shutdown");
- grpc_closure_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
}
/* pollset->mu lock must be held by the caller before calling this */
diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c
index a4c6a19d6d..50e3096359 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.c
+++ b/src/core/lib/iomgr/ev_epollex_linux.c
@@ -270,7 +270,7 @@ static void unref_by(grpc_exec_ctx *exec_ctx, grpc_fd *fd, int n) {
#endif
old = gpr_atm_full_fetch_add(&fd->refst, -n);
if (old == n) {
- grpc_closure_sched(exec_ctx, grpc_closure_create(fd_destroy, fd,
+ GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_CREATE(fd_destroy, fd,
grpc_schedule_on_exec_ctx),
GRPC_ERROR_NONE);
} else {
@@ -368,7 +368,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
to be alive (and not added to freelist) until the end of this function */
REF_BY(fd, 1, reason);
- grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
gpr_mu_unlock(&fd->orphaned_mu);
gpr_mu_unlock(&fd->pollable.po.mu);
@@ -981,8 +981,8 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx,
pollable_add_fd(&pollset->pollable, had_fd);
pollable_add_fd(&pollset->pollable, fd);
}
- grpc_closure_sched(exec_ctx,
- grpc_closure_create(unref_fd_no_longer_poller, had_fd,
+ GRPC_CLOSURE_SCHED(exec_ctx,
+ GRPC_CLOSURE_CREATE(unref_fd_no_longer_poller, had_fd,
grpc_schedule_on_exec_ctx),
GRPC_ERROR_NONE);
}
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.c b/src/core/lib/iomgr/ev_epollsig_linux.c
index 29a86f73ff..fa4b4e8d0a 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.c
+++ b/src/core/lib/iomgr/ev_epollsig_linux.c
@@ -893,7 +893,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
fd->po.pi = NULL;
}
- grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error));
gpr_mu_unlock(&fd->po.mu);
UNREF_BY(fd, 2, reason); /* Drop the reference */
@@ -1147,7 +1147,7 @@ static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
/* Release the ref and set pollset->po.pi to NULL */
pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown");
- grpc_closure_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
}
/* pollset->po.mu lock must be held by the caller before calling this */
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
index 6145c9ec3f..d1a27b8228 100644
--- a/src/core/lib/iomgr/ev_poll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -386,7 +386,7 @@ static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
if (!fd->released) {
close(fd->fd);
}
- grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE);
}
static int fd_wrapped_fd(grpc_fd *fd) {
@@ -445,7 +445,7 @@ static grpc_error *fd_shutdown_error(grpc_fd *fd) {
static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure **st, grpc_closure *closure) {
if (fd->shutdown) {
- grpc_closure_sched(exec_ctx, closure,
+ GRPC_CLOSURE_SCHED(exec_ctx, closure,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("FD shutdown"));
} else if (*st == CLOSURE_NOT_READY) {
/* not ready ==> switch to a waiting state by setting the closure */
@@ -453,7 +453,7 @@ static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
} else if (*st == CLOSURE_READY) {
/* already ready ==> queue the closure to run immediately */
*st = CLOSURE_NOT_READY;
- grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd));
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, fd_shutdown_error(fd));
maybe_wake_one_watcher_locked(fd);
} else {
/* upcallptr was set to a different closure. This is an error! */
@@ -476,7 +476,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
return 0;
} else {
/* waiting ==> queue closure */
- grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd));
+ GRPC_CLOSURE_SCHED(exec_ctx, *st, fd_shutdown_error(fd));
*st = CLOSURE_NOT_READY;
return 1;
}
@@ -834,7 +834,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
GRPC_FD_UNREF(pollset->fds[i], "multipoller");
}
pollset->fd_count = 0;
- grpc_closure_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE);
}
static void work_combine_error(grpc_error **composite, grpc_error *error) {
@@ -883,7 +883,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (!pollset_has_workers(pollset) &&
!grpc_closure_list_empty(pollset->idle_jobs)) {
GPR_TIMER_MARK("pollset_work.idle_jobs", 0);
- grpc_closure_list_sched(exec_ctx, &pollset->idle_jobs);
+ GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pollset->idle_jobs);
goto done;
}
/* If we're shutting down then we don't execute any extended work */
@@ -1056,7 +1056,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
* TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
gpr_mu_lock(&pollset->mu);
} else if (!grpc_closure_list_empty(pollset->idle_jobs)) {
- grpc_closure_list_sched(exec_ctx, &pollset->idle_jobs);
+ GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pollset->idle_jobs);
gpr_mu_unlock(&pollset->mu);
grpc_exec_ctx_flush(exec_ctx);
gpr_mu_lock(&pollset->mu);
@@ -1075,7 +1075,7 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollset->shutdown_done = closure;
pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
if (!pollset_has_workers(pollset)) {
- grpc_closure_list_sched(exec_ctx, &pollset->idle_jobs);
+ GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pollset->idle_jobs);
}
if (!pollset->called_shutdown && !pollset_has_observers(pollset)) {
pollset->called_shutdown = 1;
diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c
index 6699be3646..51c36216c5 100644
--- a/src/core/lib/iomgr/exec_ctx.c
+++ b/src/core/lib/iomgr/exec_ctx.c
@@ -62,7 +62,7 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
grpc_closure *next = c->next_data.next;
grpc_error *error = c->error_data.error;
did_something = true;
-#ifndef NDEBUG
+#ifdef GRPC_CLOSURE_RICH_DEBUG
c->scheduled = false;
#endif
c->cb(exec_ctx, c->cb_arg, error);
@@ -85,7 +85,7 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) {
static void exec_ctx_run(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
grpc_error *error) {
-#ifndef NDEBUG
+#ifdef GRPC_CLOSURE_RICH_DEBUG
closure->scheduled = false;
#endif
closure->cb(exec_ctx, closure->cb_arg, error);
diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c
index 7621a7fe75..fda274e797 100644
--- a/src/core/lib/iomgr/executor.c
+++ b/src/core/lib/iomgr/executor.c
@@ -58,7 +58,7 @@ static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) {
while (c != NULL) {
grpc_closure *next = c->next_data.next;
grpc_error *error = c->error_data.error;
-#ifndef NDEBUG
+#ifdef GRPC_CLOSURE_RICH_DEBUG
c->scheduled = false;
#endif
c->cb(exec_ctx, c->cb_arg, error);
diff --git a/src/core/lib/iomgr/lockfree_event.c b/src/core/lib/iomgr/lockfree_event.c
index 6fa285b5f3..c2ceecb3c5 100644
--- a/src/core/lib/iomgr/lockfree_event.c
+++ b/src/core/lib/iomgr/lockfree_event.c
@@ -112,7 +112,7 @@ void grpc_lfev_notify_on(grpc_exec_ctx *exec_ctx, gpr_atm *state,
closure when transitioning out of CLOSURE_NO_READY state (i.e there
is no other code that needs to 'happen-after' this) */
if (gpr_atm_no_barrier_cas(state, CLOSURE_READY, CLOSURE_NOT_READY)) {
- grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE);
return; /* Successful. Return */
}
@@ -125,7 +125,7 @@ void grpc_lfev_notify_on(grpc_exec_ctx *exec_ctx, gpr_atm *state,
schedule the closure with the shutdown error */
if ((curr & FD_SHUTDOWN_BIT) > 0) {
grpc_error *shutdown_err = (grpc_error *)(curr & ~FD_SHUTDOWN_BIT);
- grpc_closure_sched(exec_ctx, closure,
+ GRPC_CLOSURE_SCHED(exec_ctx, closure,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"FD Shutdown", &shutdown_err, 1));
return;
@@ -177,7 +177,7 @@ bool grpc_lfev_set_shutdown(grpc_exec_ctx *exec_ctx, gpr_atm *state,
happens-after on that edge), and a release to pair with anything
loading the shutdown state. */
if (gpr_atm_full_cas(state, curr, new_state)) {
- grpc_closure_sched(exec_ctx, (grpc_closure *)curr,
+ GRPC_CLOSURE_SCHED(exec_ctx, (grpc_closure *)curr,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"FD Shutdown", &shutdown_err, 1));
return true;
@@ -226,7 +226,7 @@ void grpc_lfev_set_ready(grpc_exec_ctx *exec_ctx, gpr_atm *state) {
spurious set_ready; release pairs with this or the acquire in
notify_on (or set_shutdown) */
else if (gpr_atm_full_cas(state, curr, CLOSURE_NOT_READY)) {
- grpc_closure_sched(exec_ctx, (grpc_closure *)curr, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, (grpc_closure *)curr, GRPC_ERROR_NONE);
return;
}
/* else the state changed again (only possible by either a racing
diff --git a/src/core/lib/iomgr/pollset_uv.c b/src/core/lib/iomgr/pollset_uv.c
index 8ff647f03c..07c7c04559 100644
--- a/src/core/lib/iomgr/pollset_uv.c
+++ b/src/core/lib/iomgr/pollset_uv.c
@@ -88,7 +88,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
// kick the loop once
uv_timer_start(dummy_uv_handle, dummy_timer_cb, 0, 0);
}
- grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE);
}
void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
diff --git a/src/core/lib/iomgr/pollset_windows.c b/src/core/lib/iomgr/pollset_windows.c
index ab1a327b46..d5a03732cb 100644
--- a/src/core/lib/iomgr/pollset_windows.c
+++ b/src/core/lib/iomgr/pollset_windows.c
@@ -95,7 +95,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollset->shutting_down = 1;
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
if (!pollset->is_iocp_worker) {
- grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE);
} else {
pollset->on_shutdown = closure;
}
@@ -143,7 +143,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
if (pollset->shutting_down && pollset->on_shutdown != NULL) {
- grpc_closure_sched(exec_ctx, pollset->on_shutdown, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, pollset->on_shutdown, GRPC_ERROR_NONE);
pollset->on_shutdown = NULL;
}
goto done;
diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c
index a78de66941..35dedc23de 100644
--- a/src/core/lib/iomgr/resolve_address_posix.c
+++ b/src/core/lib/iomgr/resolve_address_posix.c
@@ -154,7 +154,7 @@ typedef struct {
static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp,
grpc_error *error) {
request *r = rp;
- grpc_closure_sched(
+ GRPC_CLOSURE_SCHED(
exec_ctx, r->on_done,
grpc_blocking_resolve_address(r->name, r->default_port, r->addrs_out));
gpr_free(r->name);
@@ -175,13 +175,13 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
grpc_closure *on_done,
grpc_resolved_addresses **addrs) {
request *r = gpr_malloc(sizeof(request));
- grpc_closure_init(&r->request_closure, do_request_thread, r,
+ GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r,
grpc_executor_scheduler);
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->on_done = on_done;
r->addrs_out = addrs;
- grpc_closure_sched(exec_ctx, &r->request_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, &r->request_closure, GRPC_ERROR_NONE);
}
void (*grpc_resolve_address)(
diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c
index f1ea516175..45de289e45 100644
--- a/src/core/lib/iomgr/resolve_address_uv.c
+++ b/src/core/lib/iomgr/resolve_address_uv.c
@@ -124,7 +124,7 @@ static void getaddrinfo_callback(uv_getaddrinfo_t *req, int status,
/* Either no retry was attempted, or the retry failed. Either way, the
original error probably has more interesting information */
error = handle_addrinfo_result(status, res, r->addresses);
- grpc_closure_sched(&exec_ctx, r->on_done, error);
+ GRPC_CLOSURE_SCHED(&exec_ctx, r->on_done, error);
grpc_exec_ctx_finish(&exec_ctx);
gpr_free(r->hints);
gpr_free(r);
@@ -225,7 +225,7 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
int s;
err = try_split_host_port(name, default_port, &host, &port);
if (err != GRPC_ERROR_NONE) {
- grpc_closure_sched(exec_ctx, on_done, err);
+ GRPC_CLOSURE_SCHED(exec_ctx, on_done, err);
return;
}
r = gpr_malloc(sizeof(request));
@@ -252,7 +252,7 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("getaddrinfo failed");
err = grpc_error_set_str(err, GRPC_ERROR_STR_OS_ERROR,
grpc_slice_from_static_string(uv_strerror(s)));
- grpc_closure_sched(exec_ctx, on_done, err);
+ GRPC_CLOSURE_SCHED(exec_ctx, on_done, err);
gpr_free(r);
gpr_free(req);
gpr_free(hints);
diff --git a/src/core/lib/iomgr/resolve_address_windows.c b/src/core/lib/iomgr/resolve_address_windows.c
index 83adfd338a..45cfd7248d 100644
--- a/src/core/lib/iomgr/resolve_address_windows.c
+++ b/src/core/lib/iomgr/resolve_address_windows.c
@@ -139,7 +139,7 @@ static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp,
} else {
GRPC_ERROR_REF(error);
}
- grpc_closure_sched(exec_ctx, r->on_done, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, r->on_done, error);
gpr_free(r->name);
gpr_free(r->default_port);
gpr_free(r);
@@ -158,13 +158,13 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
grpc_closure *on_done,
grpc_resolved_addresses **addresses) {
request *r = gpr_malloc(sizeof(request));
- grpc_closure_init(&r->request_closure, do_request_thread, r,
+ GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r,
grpc_executor_scheduler);
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->on_done = on_done;
r->addresses = addresses;
- grpc_closure_sched(exec_ctx, &r->request_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, &r->request_closure, GRPC_ERROR_NONE);
}
void (*grpc_resolve_address)(
diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c
index 7c3fb93279..f2cc1be74e 100644
--- a/src/core/lib/iomgr/resource_quota.c
+++ b/src/core/lib/iomgr/resource_quota.c
@@ -259,7 +259,7 @@ static void rq_step_sched(grpc_exec_ctx *exec_ctx,
if (resource_quota->step_scheduled) return;
resource_quota->step_scheduled = true;
grpc_resource_quota_ref_internal(resource_quota);
- grpc_closure_sched(exec_ctx, &resource_quota->rq_step_closure,
+ GRPC_CLOSURE_SCHED(exec_ctx, &resource_quota->rq_step_closure,
GRPC_ERROR_NONE);
}
@@ -305,7 +305,7 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx,
}
if (resource_user->free_pool >= 0) {
resource_user->allocating = false;
- grpc_closure_list_sched(exec_ctx, &resource_user->on_allocated);
+ GRPC_CLOSURE_LIST_SCHED(exec_ctx, &resource_user->on_allocated);
gpr_mu_unlock(&resource_user->mu);
} else {
rulist_add_head(resource_user, GRPC_RULIST_AWAITING_ALLOCATION);
@@ -363,7 +363,7 @@ static bool rq_reclaim(grpc_exec_ctx *exec_ctx,
resource_quota->debug_only_last_reclaimer_resource_user = resource_user;
resource_quota->debug_only_last_initiated_reclaimer = c;
resource_user->reclaimers[destructive] = NULL;
- grpc_closure_run(exec_ctx, c, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_RUN(exec_ctx, c, GRPC_ERROR_NONE);
return true;
}
@@ -444,7 +444,7 @@ static bool ru_post_reclaimer(grpc_exec_ctx *exec_ctx,
resource_user->new_reclaimers[destructive] = NULL;
GPR_ASSERT(resource_user->reclaimers[destructive] == NULL);
if (gpr_atm_acq_load(&resource_user->shutdown) > 0) {
- grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CANCELLED);
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_CANCELLED);
return false;
}
resource_user->reclaimers[destructive] = closure;
@@ -485,9 +485,9 @@ static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *ru,
static void ru_shutdown(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) {
grpc_resource_user *resource_user = ru;
- grpc_closure_sched(exec_ctx, resource_user->reclaimers[0],
+ GRPC_CLOSURE_SCHED(exec_ctx, resource_user->reclaimers[0],
GRPC_ERROR_CANCELLED);
- grpc_closure_sched(exec_ctx, resource_user->reclaimers[1],
+ GRPC_CLOSURE_SCHED(exec_ctx, resource_user->reclaimers[1],
GRPC_ERROR_CANCELLED);
resource_user->reclaimers[0] = NULL;
resource_user->reclaimers[1] = NULL;
@@ -501,9 +501,9 @@ static void ru_destroy(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) {
for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
rulist_remove(resource_user, (grpc_rulist)i);
}
- grpc_closure_sched(exec_ctx, resource_user->reclaimers[0],
+ GRPC_CLOSURE_SCHED(exec_ctx, resource_user->reclaimers[0],
GRPC_ERROR_CANCELLED);
- grpc_closure_sched(exec_ctx, resource_user->reclaimers[1],
+ GRPC_CLOSURE_SCHED(exec_ctx, resource_user->reclaimers[1],
GRPC_ERROR_CANCELLED);
if (resource_user->free_pool != 0) {
resource_user->resource_quota->free_pool += resource_user->free_pool;
@@ -525,7 +525,7 @@ static void ru_allocated_slices(grpc_exec_ctx *exec_ctx, void *arg,
slice_allocator->length));
}
}
- grpc_closure_run(exec_ctx, &slice_allocator->on_done, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_RUN(exec_ctx, &slice_allocator->on_done, GRPC_ERROR_REF(error));
}
/*******************************************************************************
@@ -579,9 +579,9 @@ grpc_resource_quota *grpc_resource_quota_create(const char *name) {
gpr_asprintf(&resource_quota->name, "anonymous_pool_%" PRIxPTR,
(intptr_t)resource_quota);
}
- grpc_closure_init(&resource_quota->rq_step_closure, rq_step, resource_quota,
+ GRPC_CLOSURE_INIT(&resource_quota->rq_step_closure, rq_step, resource_quota,
grpc_combiner_finally_scheduler(resource_quota->combiner));
- grpc_closure_init(&resource_quota->rq_reclamation_done_closure,
+ GRPC_CLOSURE_INIT(&resource_quota->rq_reclamation_done_closure,
rq_reclamation_done, resource_quota,
grpc_combiner_scheduler(resource_quota->combiner));
for (int i = 0; i < GRPC_RULIST_COUNT; i++) {
@@ -633,8 +633,8 @@ void grpc_resource_quota_resize(grpc_resource_quota *resource_quota,
a->size = (int64_t)size;
gpr_atm_no_barrier_store(&resource_quota->last_size,
(gpr_atm)GPR_MIN((size_t)GPR_ATM_MAX, size));
- grpc_closure_init(&a->closure, rq_resize, a, grpc_schedule_on_exec_ctx);
- grpc_closure_sched(&exec_ctx, &a->closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_INIT(&a->closure, rq_resize, a, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_SCHED(&exec_ctx, &a->closure, GRPC_ERROR_NONE);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -686,19 +686,19 @@ grpc_resource_user *grpc_resource_user_create(
grpc_resource_user *resource_user = gpr_malloc(sizeof(*resource_user));
resource_user->resource_quota =
grpc_resource_quota_ref_internal(resource_quota);
- grpc_closure_init(&resource_user->allocate_closure, &ru_allocate,
+ GRPC_CLOSURE_INIT(&resource_user->allocate_closure, &ru_allocate,
resource_user,
grpc_combiner_scheduler(resource_quota->combiner));
- grpc_closure_init(&resource_user->add_to_free_pool_closure,
+ GRPC_CLOSURE_INIT(&resource_user->add_to_free_pool_closure,
&ru_add_to_free_pool, resource_user,
grpc_combiner_scheduler(resource_quota->combiner));
- grpc_closure_init(&resource_user->post_reclaimer_closure[0],
+ GRPC_CLOSURE_INIT(&resource_user->post_reclaimer_closure[0],
&ru_post_benign_reclaimer, resource_user,
grpc_combiner_scheduler(resource_quota->combiner));
- grpc_closure_init(&resource_user->post_reclaimer_closure[1],
+ GRPC_CLOSURE_INIT(&resource_user->post_reclaimer_closure[1],
&ru_post_destructive_reclaimer, resource_user,
grpc_combiner_scheduler(resource_quota->combiner));
- grpc_closure_init(&resource_user->destroy_closure, &ru_destroy, resource_user,
+ GRPC_CLOSURE_INIT(&resource_user->destroy_closure, &ru_destroy, resource_user,
grpc_combiner_scheduler(resource_quota->combiner));
gpr_mu_init(&resource_user->mu);
gpr_atm_rel_store(&resource_user->refs, 1);
@@ -739,7 +739,7 @@ static void ru_unref_by(grpc_exec_ctx *exec_ctx,
gpr_atm old = gpr_atm_full_fetch_add(&resource_user->refs, -amount);
GPR_ASSERT(old >= amount);
if (old == amount) {
- grpc_closure_sched(exec_ctx, &resource_user->destroy_closure,
+ GRPC_CLOSURE_SCHED(exec_ctx, &resource_user->destroy_closure,
GRPC_ERROR_NONE);
}
}
@@ -756,9 +756,9 @@ void grpc_resource_user_unref(grpc_exec_ctx *exec_ctx,
void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx,
grpc_resource_user *resource_user) {
if (gpr_atm_full_fetch_add(&resource_user->shutdown, 1) == 0) {
- grpc_closure_sched(
+ GRPC_CLOSURE_SCHED(
exec_ctx,
- grpc_closure_create(
+ GRPC_CLOSURE_CREATE(
ru_shutdown, resource_user,
grpc_combiner_scheduler(resource_user->resource_quota->combiner)),
GRPC_ERROR_NONE);
@@ -781,11 +781,11 @@ void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_NONE);
if (!resource_user->allocating) {
resource_user->allocating = true;
- grpc_closure_sched(exec_ctx, &resource_user->allocate_closure,
+ GRPC_CLOSURE_SCHED(exec_ctx, &resource_user->allocate_closure,
GRPC_ERROR_NONE);
}
} else {
- grpc_closure_sched(exec_ctx, optional_on_done, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, optional_on_done, GRPC_ERROR_NONE);
}
gpr_mu_unlock(&resource_user->mu);
}
@@ -804,7 +804,7 @@ void grpc_resource_user_free(grpc_exec_ctx *exec_ctx,
if (is_bigger_than_zero && was_zero_or_negative &&
!resource_user->added_to_free_pool) {
resource_user->added_to_free_pool = true;
- grpc_closure_sched(exec_ctx, &resource_user->add_to_free_pool_closure,
+ GRPC_CLOSURE_SCHED(exec_ctx, &resource_user->add_to_free_pool_closure,
GRPC_ERROR_NONE);
}
gpr_mu_unlock(&resource_user->mu);
@@ -817,7 +817,7 @@ void grpc_resource_user_post_reclaimer(grpc_exec_ctx *exec_ctx,
grpc_closure *closure) {
GPR_ASSERT(resource_user->new_reclaimers[destructive] == NULL);
resource_user->new_reclaimers[destructive] = closure;
- grpc_closure_sched(exec_ctx,
+ GRPC_CLOSURE_SCHED(exec_ctx,
&resource_user->post_reclaimer_closure[destructive],
GRPC_ERROR_NONE);
}
@@ -828,7 +828,7 @@ void grpc_resource_user_finish_reclamation(grpc_exec_ctx *exec_ctx,
gpr_log(GPR_DEBUG, "RQ %s %s: reclamation complete",
resource_user->resource_quota->name, resource_user->name);
}
- grpc_closure_sched(
+ GRPC_CLOSURE_SCHED(
exec_ctx, &resource_user->resource_quota->rq_reclamation_done_closure,
GRPC_ERROR_NONE);
}
@@ -836,9 +836,9 @@ void grpc_resource_user_finish_reclamation(grpc_exec_ctx *exec_ctx,
void grpc_resource_user_slice_allocator_init(
grpc_resource_user_slice_allocator *slice_allocator,
grpc_resource_user *resource_user, grpc_iomgr_cb_func cb, void *p) {
- grpc_closure_init(&slice_allocator->on_allocated, ru_allocated_slices,
+ GRPC_CLOSURE_INIT(&slice_allocator->on_allocated, ru_allocated_slices,
slice_allocator, grpc_schedule_on_exec_ctx);
- grpc_closure_init(&slice_allocator->on_done, cb, p,
+ GRPC_CLOSURE_INIT(&slice_allocator->on_done, cb, p,
grpc_schedule_on_exec_ctx);
slice_allocator->resource_user = resource_user;
}
diff --git a/src/core/lib/iomgr/socket_windows.c b/src/core/lib/iomgr/socket_windows.c
index f73e33db86..a0d731b942 100644
--- a/src/core/lib/iomgr/socket_windows.c
+++ b/src/core/lib/iomgr/socket_windows.c
@@ -116,7 +116,7 @@ static void socket_notify_on_iocp(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&socket->state_mu);
if (info->has_pending_iocp) {
info->has_pending_iocp = 0;
- grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE);
} else {
info->closure = closure;
}
@@ -139,7 +139,7 @@ void grpc_socket_become_ready(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket,
GPR_ASSERT(!info->has_pending_iocp);
gpr_mu_lock(&socket->state_mu);
if (info->closure) {
- grpc_closure_sched(exec_ctx, info->closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, info->closure, GRPC_ERROR_NONE);
info->closure = NULL;
} else {
info->has_pending_iocp = 1;
diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c
index abad3c2f22..21e320a6e7 100644
--- a/src/core/lib/iomgr/tcp_client_posix.c
+++ b/src/core/lib/iomgr/tcp_client_posix.c
@@ -234,7 +234,7 @@ finish:
grpc_channel_args_destroy(exec_ctx, ac->channel_args);
gpr_free(ac);
}
- grpc_closure_sched(exec_ctx, closure, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, error);
}
static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
@@ -263,7 +263,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
error = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode, &fd);
if (error != GRPC_ERROR_NONE) {
- grpc_closure_sched(exec_ctx, closure, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, error);
return;
}
if (dsmode == GRPC_DSMODE_IPV4) {
@@ -272,7 +272,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
addr = &addr4_copy;
}
if ((error = prepare_socket(addr, fd, channel_args)) != GRPC_ERROR_NONE) {
- grpc_closure_sched(exec_ctx, closure, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, error);
return;
}
@@ -290,13 +290,13 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
if (err >= 0) {
*ep =
grpc_tcp_client_create_from_fd(exec_ctx, fdobj, channel_args, addr_str);
- grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE);
goto done;
}
if (errno != EWOULDBLOCK && errno != EINPROGRESS) {
grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, "tcp_client_connect_error");
- grpc_closure_sched(exec_ctx, closure, GRPC_OS_ERROR(errno, "connect"));
+ GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_OS_ERROR(errno, "connect"));
goto done;
}
@@ -311,7 +311,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
addr_str = NULL;
gpr_mu_init(&ac->mu);
ac->refs = 2;
- grpc_closure_init(&ac->write_closure, on_writable, ac,
+ GRPC_CLOSURE_INIT(&ac->write_closure, on_writable, ac,
grpc_schedule_on_exec_ctx);
ac->channel_args = grpc_channel_args_copy(channel_args);
@@ -321,7 +321,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
}
gpr_mu_lock(&ac->mu);
- grpc_closure_init(&ac->on_alarm, tc_on_alarm, ac, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&ac->on_alarm, tc_on_alarm, ac, grpc_schedule_on_exec_ctx);
grpc_timer_init(exec_ctx, &ac->alarm,
gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
&ac->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC));
diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c
index f4084339d6..ab6832932f 100644
--- a/src/core/lib/iomgr/tcp_client_uv.c
+++ b/src/core/lib/iomgr/tcp_client_uv.c
@@ -107,7 +107,7 @@ static void uv_tc_on_connect(uv_connect_t *req, int status) {
if (done) {
uv_tcp_connect_cleanup(&exec_ctx, connect);
}
- grpc_closure_sched(&exec_ctx, closure, error);
+ GRPC_CLOSURE_SCHED(&exec_ctx, closure, error);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -150,7 +150,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
uv_tcp_connect(&connect->connect_req, connect->tcp_handle,
(const struct sockaddr *)resolved_addr->addr,
uv_tc_on_connect);
- grpc_closure_init(&connect->on_alarm, uv_tc_on_alarm, connect,
+ GRPC_CLOSURE_INIT(&connect->on_alarm, uv_tc_on_alarm, connect,
grpc_schedule_on_exec_ctx);
grpc_timer_init(exec_ctx, &connect->alarm,
gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c
index e0913cfaed..fc62105cc9 100644
--- a/src/core/lib/iomgr/tcp_client_windows.c
+++ b/src/core/lib/iomgr/tcp_client_windows.c
@@ -116,7 +116,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) {
async_connect_unlock_and_cleanup(exec_ctx, ac, socket);
/* If the connection was aborted, the callback was already called when
the deadline was met. */
- grpc_closure_sched(exec_ctx, on_done, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, on_done, error);
}
/* Tries to issue one async connection, then schedules both an IOCP
@@ -201,9 +201,9 @@ static void tcp_client_connect_impl(
ac->addr_name = grpc_sockaddr_to_uri(addr);
ac->endpoint = endpoint;
ac->channel_args = grpc_channel_args_copy(channel_args);
- grpc_closure_init(&ac->on_connect, on_connect, ac, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&ac->on_connect, on_connect, ac, grpc_schedule_on_exec_ctx);
- grpc_closure_init(&ac->on_alarm, on_alarm, ac, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&ac->on_alarm, on_alarm, ac, grpc_schedule_on_exec_ctx);
grpc_timer_init(exec_ctx, &ac->alarm, deadline, &ac->on_alarm,
gpr_now(GPR_CLOCK_MONOTONIC));
grpc_socket_notify_on_write(exec_ctx, socket, &ac->on_connect);
@@ -222,7 +222,7 @@ failure:
} else if (sock != INVALID_SOCKET) {
closesocket(sock);
}
- grpc_closure_sched(exec_ctx, on_done, final_error);
+ GRPC_CLOSURE_SCHED(exec_ctx, on_done, final_error);
}
// overridden by api_fuzzer.c
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index a2404f97c1..66e81bf81f 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -221,7 +221,7 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
tcp->read_cb = NULL;
tcp->incoming_buffer = NULL;
- grpc_closure_run(exec_ctx, cb, error);
+ GRPC_CLOSURE_RUN(exec_ctx, cb, error);
}
#define MAX_READ_IOVEC 4
@@ -348,7 +348,7 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
tcp->finished_edge = false;
grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
} else {
- grpc_closure_sched(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE);
}
}
@@ -465,7 +465,7 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
gpr_log(GPR_DEBUG, "write: %s", str);
}
- grpc_closure_run(exec_ctx, cb, error);
+ GRPC_CLOSURE_RUN(exec_ctx, cb, error);
TCP_UNREF(exec_ctx, tcp, "write");
}
}
@@ -491,7 +491,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
if (buf->length == 0) {
GPR_TIMER_END("tcp_write", 0);
- grpc_closure_sched(
+ GRPC_CLOSURE_SCHED(
exec_ctx, cb,
grpc_fd_is_shutdown(tcp->em_fd)
? tcp_annotate_error(GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"),
@@ -515,7 +515,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
const char *str = grpc_error_string(error);
gpr_log(GPR_DEBUG, "write: %s", str);
}
- grpc_closure_sched(exec_ctx, cb, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, cb, error);
}
GPR_TIMER_END("tcp_write", 0);
@@ -616,9 +616,9 @@ grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_fd *em_fd,
gpr_ref_init(&tcp->refcount, 1);
gpr_atm_no_barrier_store(&tcp->shutdown_count, 0);
tcp->em_fd = em_fd;
- grpc_closure_init(&tcp->read_closure, tcp_handle_read, tcp,
+ GRPC_CLOSURE_INIT(&tcp->read_closure, tcp_handle_read, tcp,
grpc_schedule_on_exec_ctx);
- grpc_closure_init(&tcp->write_closure, tcp_handle_write, tcp,
+ GRPC_CLOSURE_INIT(&tcp->write_closure, tcp_handle_write, tcp,
grpc_schedule_on_exec_ctx);
grpc_slice_buffer_init(&tcp->last_read_buffer);
tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c
index 7bb8392d4b..f304642951 100644
--- a/src/core/lib/iomgr/tcp_server_posix.c
+++ b/src/core/lib/iomgr/tcp_server_posix.c
@@ -121,7 +121,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
GPR_ASSERT(s->shutdown);
gpr_mu_unlock(&s->mu);
if (s->shutdown_complete != NULL) {
- grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
}
gpr_mu_destroy(&s->mu);
@@ -163,7 +163,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
grpc_tcp_listener *sp;
for (sp = s->head; sp; sp = sp->next) {
grpc_unlink_if_unix_domain_socket(&sp->addr);
- grpc_closure_init(&sp->destroyed_closure, destroyed_port, s,
+ GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s,
grpc_schedule_on_exec_ctx);
grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL,
"tcp_listener_shutdown");
@@ -503,7 +503,7 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
"clone_port", clone_port(sp, (unsigned)(pollset_count - 1))));
for (i = 0; i < pollset_count; i++) {
grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd);
- grpc_closure_init(&sp->read_closure, on_read, sp,
+ GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp,
grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
s->active_ports++;
@@ -513,7 +513,7 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s,
for (i = 0; i < pollset_count; i++) {
grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd);
}
- grpc_closure_init(&sp->read_closure, on_read, sp,
+ GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp,
grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
s->active_ports++;
@@ -540,7 +540,7 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (gpr_unref(&s->refs)) {
grpc_tcp_server_shutdown_listeners(exec_ctx, s);
gpr_mu_lock(&s->mu);
- grpc_closure_list_sched(exec_ctx, &s->shutdown_starting);
+ GRPC_CLOSURE_LIST_SCHED(exec_ctx, &s->shutdown_starting);
gpr_mu_unlock(&s->mu);
tcp_server_destroy(exec_ctx, s);
}
diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c
index 632a232861..2de0ea90e7 100644
--- a/src/core/lib/iomgr/tcp_server_uv.c
+++ b/src/core/lib/iomgr/tcp_server_uv.c
@@ -117,7 +117,7 @@ void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s,
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
GPR_ASSERT(s->shutdown);
if (s->shutdown_complete != NULL) {
- grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
}
while (s->head) {
@@ -171,7 +171,7 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (gpr_unref(&s->refs)) {
/* Complete shutdown_starting work before destroying. */
grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_closure_list_sched(&local_exec_ctx, &s->shutdown_starting);
+ GRPC_CLOSURE_LIST_SCHED(&local_exec_ctx, &s->shutdown_starting);
if (exec_ctx == NULL) {
grpc_exec_ctx_flush(&local_exec_ctx);
tcp_server_destroy(&local_exec_ctx, s);
diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c
index e8343a94a0..0162afc1ad 100644
--- a/src/core/lib/iomgr/tcp_server_windows.c
+++ b/src/core/lib/iomgr/tcp_server_windows.c
@@ -134,10 +134,10 @@ static void destroy_server(grpc_exec_ctx *exec_ctx, void *arg,
static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx,
grpc_tcp_server *s) {
if (s->shutdown_complete != NULL) {
- grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
}
- grpc_closure_sched(exec_ctx, grpc_closure_create(destroy_server, s,
+ GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_CREATE(destroy_server, s,
grpc_schedule_on_exec_ctx),
GRPC_ERROR_NONE);
}
@@ -176,7 +176,7 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
if (gpr_unref(&s->refs)) {
grpc_tcp_server_shutdown_listeners(exec_ctx, s);
gpr_mu_lock(&s->mu);
- grpc_closure_list_sched(exec_ctx, &s->shutdown_starting);
+ GRPC_CLOSURE_LIST_SCHED(exec_ctx, &s->shutdown_starting);
gpr_mu_unlock(&s->mu);
tcp_server_destroy(exec_ctx, s);
}
@@ -437,7 +437,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock,
sp->new_socket = INVALID_SOCKET;
sp->port = port;
sp->port_index = port_index;
- grpc_closure_init(&sp->on_accept, on_accept, sp, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&sp->on_accept, on_accept, sp, grpc_schedule_on_exec_ctx);
GPR_ASSERT(sp->socket);
gpr_mu_unlock(&s->mu);
*listener = sp;
diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c
index a37a75c8fa..ab5bb9f7da 100644
--- a/src/core/lib/iomgr/tcp_uv.c
+++ b/src/core/lib/iomgr/tcp_uv.c
@@ -161,7 +161,7 @@ static void read_callback(uv_stream_t *stream, ssize_t nread,
// nread < 0: Error
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Read failed");
}
- grpc_closure_sched(&exec_ctx, cb, error);
+ GRPC_CLOSURE_SCHED(&exec_ctx, cb, error);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -183,7 +183,7 @@ static void uv_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
error =
grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR,
grpc_slice_from_static_string(uv_strerror(status)));
- grpc_closure_sched(exec_ctx, cb, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, cb, error);
}
if (GRPC_TRACER_ON(grpc_tcp_trace)) {
const char *str = grpc_error_string(error);
@@ -210,7 +210,7 @@ static void write_callback(uv_write_t *req, int status) {
gpr_free(tcp->write_buffers);
grpc_resource_user_free(&exec_ctx, tcp->resource_user,
sizeof(uv_buf_t) * tcp->write_slices->count);
- grpc_closure_sched(&exec_ctx, cb, error);
+ GRPC_CLOSURE_SCHED(&exec_ctx, cb, error);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -236,7 +236,7 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
}
if (tcp->shutting_down) {
- grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
+ GRPC_CLOSURE_SCHED(exec_ctx, cb, GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"TCP socket is shutting down"));
return;
}
@@ -247,7 +247,7 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
if (tcp->write_slices->count == 0) {
// No slices means we don't have to do anything,
// and libuv doesn't like empty writes
- grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, cb, GRPC_ERROR_NONE);
return;
}
diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c
index dbae42e936..161b397534 100644
--- a/src/core/lib/iomgr/tcp_windows.c
+++ b/src/core/lib/iomgr/tcp_windows.c
@@ -179,7 +179,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) {
tcp->read_cb = NULL;
TCP_UNREF(exec_ctx, tcp, "read");
- grpc_closure_sched(exec_ctx, cb, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, cb, error);
}
static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
@@ -193,7 +193,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
WSABUF buffer;
if (tcp->shutting_down) {
- grpc_closure_sched(
+ GRPC_CLOSURE_SCHED(
exec_ctx, cb,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"TCP socket is shutting down", &tcp->shutdown_error, 1));
@@ -220,7 +220,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
/* Did we get data immediately ? Yay. */
if (info->wsa_error != WSAEWOULDBLOCK) {
info->bytes_transfered = bytes_read;
- grpc_closure_sched(exec_ctx, &tcp->on_read, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, &tcp->on_read, GRPC_ERROR_NONE);
return;
}
@@ -233,7 +233,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
info->wsa_error = wsa_error;
- grpc_closure_sched(exec_ctx, &tcp->on_read,
+ GRPC_CLOSURE_SCHED(exec_ctx, &tcp->on_read,
GRPC_WSA_ERROR(info->wsa_error, "WSARecv"));
return;
}
@@ -265,7 +265,7 @@ static void on_write(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) {
}
TCP_UNREF(exec_ctx, tcp, "write");
- grpc_closure_sched(exec_ctx, cb, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, cb, error);
}
/* Initiates a write. */
@@ -283,7 +283,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
size_t len;
if (tcp->shutting_down) {
- grpc_closure_sched(
+ GRPC_CLOSURE_SCHED(
exec_ctx, cb,
GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"TCP socket is shutting down", &tcp->shutdown_error, 1));
@@ -317,7 +317,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_error *error = status == 0
? GRPC_ERROR_NONE
: GRPC_WSA_ERROR(info->wsa_error, "WSASend");
- grpc_closure_sched(exec_ctx, cb, error);
+ GRPC_CLOSURE_SCHED(exec_ctx, cb, error);
if (allocated) gpr_free(allocated);
return;
}
@@ -335,7 +335,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
int wsa_error = WSAGetLastError();
if (wsa_error != WSA_IO_PENDING) {
TCP_UNREF(exec_ctx, tcp, "write");
- grpc_closure_sched(exec_ctx, cb, GRPC_WSA_ERROR(wsa_error, "WSASend"));
+ GRPC_CLOSURE_SCHED(exec_ctx, cb, GRPC_WSA_ERROR(wsa_error, "WSASend"));
return;
}
}
@@ -426,8 +426,8 @@ grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket,
tcp->socket = socket;
gpr_mu_init(&tcp->mu);
gpr_ref_init(&tcp->refcount, 1);
- grpc_closure_init(&tcp->on_read, on_read, tcp, grpc_schedule_on_exec_ctx);
- grpc_closure_init(&tcp->on_write, on_write, tcp, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&tcp->on_read, on_read, tcp, grpc_schedule_on_exec_ctx);
+ GRPC_CLOSURE_INIT(&tcp->on_write, on_write, tcp, grpc_schedule_on_exec_ctx);
tcp->peer_string = gpr_strdup(peer_string);
tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string);
/* Tell network status tracking code about the new endpoint */
diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c
index 69b3cfd139..bf73d2c685 100644
--- a/src/core/lib/iomgr/timer_generic.c
+++ b/src/core/lib/iomgr/timer_generic.c
@@ -230,7 +230,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
if (!g_shared_mutables.initialized) {
timer->pending = false;
- grpc_closure_sched(exec_ctx, timer->closure,
+ GRPC_CLOSURE_SCHED(exec_ctx, timer->closure,
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
"Attempt to create timer before initialization"));
return;
@@ -240,7 +240,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
timer->pending = true;
if (gpr_time_cmp(deadline, now) <= 0) {
timer->pending = false;
- grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_NONE);
gpr_mu_unlock(&shard->mu);
/* early out */
return;
@@ -310,7 +310,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
timer->pending ? "true" : "false");
}
if (timer->pending) {
- grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED);
+ GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED);
timer->pending = false;
if (timer->heap_index == INVALID_HEAP_INDEX) {
list_remove(timer);
@@ -400,7 +400,7 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard,
grpc_timer *timer;
gpr_mu_lock(&shard->mu);
while ((timer = pop_one(shard, now))) {
- grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_REF(error));
+ GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_REF(error));
n++;
}
*new_min_deadline = compute_min_deadline(shard);
diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c
index f814f0e474..4f204cfbf8 100644
--- a/src/core/lib/iomgr/timer_uv.c
+++ b/src/core/lib/iomgr/timer_uv.c
@@ -44,7 +44,7 @@ void run_expired_timer(uv_timer_t *handle) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
GPR_ASSERT(timer->pending);
timer->pending = 0;
- grpc_closure_sched(&exec_ctx, timer->closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(&exec_ctx, timer->closure, GRPC_ERROR_NONE);
stop_uv_timer(handle);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -57,7 +57,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
timer->closure = closure;
if (gpr_time_cmp(deadline, now) <= 0) {
timer->pending = 0;
- grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_NONE);
return;
}
timer->pending = 1;
@@ -76,7 +76,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
if (timer->pending) {
timer->pending = 0;
- grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED);
+ GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED);
stop_uv_timer((uv_timer_t *)timer->uv_timer);
}
}
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index 200a722c7e..54e7f417a7 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -156,7 +156,7 @@ static void dummy_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
if (s->shutdown_complete != NULL) {
- grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
+ GRPC_CLOSURE_SCHED(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE);
}
gpr_mu_destroy(&s->mu);
@@ -201,13 +201,13 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
for (sp = s->head; sp; sp = sp->next) {
grpc_unlink_if_unix_domain_socket(&sp->addr);
- grpc_closure_init(&sp->destroyed_closure, destroyed_port, s,
+ GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s,
grpc_schedule_on_exec_ctx);
if (!sp->orphan_notified) {
/* Call the orphan_cb to signal that the FD is about to be closed and
* should no longer be used. Because at this point, all listening ports
* have been shutdown already, no need to shutdown again.*/
- grpc_closure_init(&sp->orphan_fd_closure, dummy_cb, sp->emfd,
+ GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, dummy_cb, sp->emfd,
grpc_schedule_on_exec_ctx);
GPR_ASSERT(sp->orphan_cb);
sp->orphan_cb(exec_ctx, sp->emfd, &sp->orphan_fd_closure,
@@ -240,7 +240,7 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
struct shutdown_fd_args *args = gpr_malloc(sizeof(*args));
args->fd = sp->emfd;
args->server_mu = &s->mu;
- grpc_closure_init(&sp->orphan_fd_closure, shutdown_fd, args,
+ GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, shutdown_fd, args,
grpc_schedule_on_exec_ctx);
sp->orphan_cb(exec_ctx, sp->emfd, &sp->orphan_fd_closure,
sp->server->user_data);
@@ -525,11 +525,11 @@ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
for (i = 0; i < pollset_count; i++) {
grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd);
}
- grpc_closure_init(&sp->read_closure, on_read, sp,
+ GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp,
grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
- grpc_closure_init(&sp->write_closure, on_write, sp,
+ GRPC_CLOSURE_INIT(&sp->write_closure, on_write, sp,
grpc_schedule_on_exec_ctx);
grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure);