diff options
author | ncteisen <ncteisen@gmail.com> | 2017-06-08 14:57:11 -0700 |
---|---|---|
committer | ncteisen <ncteisen@gmail.com> | 2017-06-08 15:29:29 -0700 |
commit | 274bbbe6a0bd56651d48d974e1ccd80cf68689df (patch) | |
tree | 240da3d682eea3d09fdb79c21157462c7fe37a74 /src/core/lib | |
parent | 1621f5e05152a9d0b7c7ca6341d45fd44cf56701 (diff) |
Add rich closure debug mode
Diffstat (limited to 'src/core/lib')
52 files changed, 335 insertions, 229 deletions
diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index d6877f6a91..b0559ad745 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -126,7 +126,7 @@ typedef struct { /* Destroy per call data. The filter does not need to do any chaining. The bottom filter of a stack will be passed a non-NULL pointer to - \a then_schedule_closure that should be passed to grpc_closure_sched when + \a then_schedule_closure that should be passed to GRPC_CLOSURE_SCHED when destruction is complete. \a final_info contains data about the completed call, mainly for reporting purposes. */ void (*destroy_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, diff --git a/src/core/lib/channel/handshaker.c b/src/core/lib/channel/handshaker.c index a351e98203..2cb83f4114 100644 --- a/src/core/lib/channel/handshaker.c +++ b/src/core/lib/channel/handshaker.c @@ -190,7 +190,7 @@ static bool call_next_handshaker_locked(grpc_exec_ctx* exec_ctx, // Cancel deadline timer, since we're invoking the on_handshake_done // callback now. grpc_timer_cancel(exec_ctx, &mgr->deadline_timer); - grpc_closure_sched(exec_ctx, &mgr->on_handshake_done, error); + GRPC_CLOSURE_SCHED(exec_ctx, &mgr->on_handshake_done, error); mgr->shutdown = true; } else { grpc_handshaker_do_handshake(exec_ctx, mgr->handshakers[mgr->index], @@ -245,13 +245,13 @@ void grpc_handshake_manager_do_handshake( grpc_slice_buffer_init(mgr->args.read_buffer); // Initialize state needed for calling handshakers. mgr->acceptor = acceptor; - grpc_closure_init(&mgr->call_next_handshaker, call_next_handshaker, mgr, + GRPC_CLOSURE_INIT(&mgr->call_next_handshaker, call_next_handshaker, mgr, grpc_schedule_on_exec_ctx); - grpc_closure_init(&mgr->on_handshake_done, on_handshake_done, &mgr->args, + GRPC_CLOSURE_INIT(&mgr->on_handshake_done, on_handshake_done, &mgr->args, grpc_schedule_on_exec_ctx); // Start deadline timer, which owns a ref. gpr_ref(&mgr->refs); - grpc_closure_init(&mgr->on_timeout, on_timeout, mgr, + GRPC_CLOSURE_INIT(&mgr->on_timeout, on_timeout, mgr, grpc_schedule_on_exec_ctx); grpc_timer_init(exec_ctx, &mgr->deadline_timer, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c index 492eb5ad7b..1b7e2cfe68 100644 --- a/src/core/lib/http/httpcli.c +++ b/src/core/lib/http/httpcli.c @@ -90,7 +90,7 @@ static void finish(grpc_exec_ctx *exec_ctx, internal_request *req, grpc_error *error) { grpc_polling_entity_del_from_pollset_set(exec_ctx, req->pollent, req->context->pollset_set); - grpc_closure_sched(exec_ctx, req->on_done, error); + GRPC_CLOSURE_SCHED(exec_ctx, req->on_done, error); grpc_http_parser_destroy(&req->parser); if (req->addresses != NULL) { grpc_resolved_addresses_destroy(req->addresses); @@ -213,7 +213,7 @@ static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req, return; } addr = &req->addresses->addrs[req->next_address++]; - grpc_closure_init(&req->connected, on_connected, req, + GRPC_CLOSURE_INIT(&req->connected, on_connected, req, grpc_schedule_on_exec_ctx); grpc_arg arg; arg.key = GRPC_ARG_RESOURCE_QUOTA; @@ -256,8 +256,8 @@ static void internal_request_begin(grpc_exec_ctx *exec_ctx, req->pollent = pollent; req->overall_error = GRPC_ERROR_NONE; req->resource_quota = grpc_resource_quota_ref_internal(resource_quota); - grpc_closure_init(&req->on_read, on_read, req, grpc_schedule_on_exec_ctx); - grpc_closure_init(&req->done_write, done_write, req, + GRPC_CLOSURE_INIT(&req->on_read, on_read, req, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&req->done_write, done_write, req, grpc_schedule_on_exec_ctx); grpc_slice_buffer_init(&req->incoming); grpc_slice_buffer_init(&req->outgoing); @@ -271,7 +271,7 @@ static void internal_request_begin(grpc_exec_ctx *exec_ctx, grpc_resolve_address( exec_ctx, request->host, req->handshaker->default_port, req->context->pollset_set, - grpc_closure_create(on_resolved, req, grpc_schedule_on_exec_ctx), + GRPC_CLOSURE_CREATE(on_resolved, req, grpc_schedule_on_exec_ctx), &req->addresses); } diff --git a/src/core/lib/http/httpcli_security_connector.c b/src/core/lib/http/httpcli_security_connector.c index 6500192277..34a77c3bf4 100644 --- a/src/core/lib/http/httpcli_security_connector.c +++ b/src/core/lib/http/httpcli_security_connector.c @@ -85,7 +85,7 @@ static void httpcli_ssl_check_peer(grpc_exec_ctx *exec_ctx, error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); gpr_free(msg); } - grpc_closure_sched(exec_ctx, on_peer_checked, error); + GRPC_CLOSURE_SCHED(exec_ctx, on_peer_checked, error); tsi_peer_destruct(&peer); } diff --git a/src/core/lib/iomgr/closure.c b/src/core/lib/iomgr/closure.c index 72a1b20443..719e2e8cee 100644 --- a/src/core/lib/iomgr/closure.c +++ b/src/core/lib/iomgr/closure.c @@ -24,14 +24,26 @@ #include "src/core/lib/profiling/timers.h" +#ifdef GRPC_CLOSURE_RICH_DEBUG +grpc_closure *grpc_closure_init(const char *file, int line, + grpc_closure *closure, grpc_iomgr_cb_func cb, + void *cb_arg, + grpc_closure_scheduler *scheduler) { +#else grpc_closure *grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb, void *cb_arg, grpc_closure_scheduler *scheduler) { +#endif closure->cb = cb; closure->cb_arg = cb_arg; closure->scheduler = scheduler; -#ifndef NDEBUG +#ifdef GRPC_CLOSURE_RICH_DEBUG closure->scheduled = false; + closure->file_initiated = NULL; + closure->line_initiated = 0; + closure->run = false; + closure->file_created = file; + closure->line_created = line; #endif return closure; } @@ -100,19 +112,39 @@ static void closure_wrapper(grpc_exec_ctx *exec_ctx, void *arg, cb(exec_ctx, cb_arg, error); } +#ifdef GRPC_CLOSURE_RICH_DEBUG +grpc_closure *grpc_closure_create(const char *file, int line, + grpc_iomgr_cb_func cb, void *cb_arg, + grpc_closure_scheduler *scheduler) { +#else grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg, grpc_closure_scheduler *scheduler) { +#endif wrapped_closure *wc = gpr_malloc(sizeof(*wc)); wc->cb = cb; wc->cb_arg = cb_arg; +#ifdef GRPC_CLOSURE_RICH_DEBUG + grpc_closure_init(file, line, &wc->wrapper, closure_wrapper, wc, scheduler); +#else grpc_closure_init(&wc->wrapper, closure_wrapper, wc, scheduler); +#endif return &wc->wrapper; } +#ifdef GRPC_CLOSURE_RICH_DEBUG +void grpc_closure_run(const char *file, int line, grpc_exec_ctx *exec_ctx, + grpc_closure *c, grpc_error *error) { +#else void grpc_closure_run(grpc_exec_ctx *exec_ctx, grpc_closure *c, grpc_error *error) { +#endif GPR_TIMER_BEGIN("grpc_closure_run", 0); if (c != NULL) { +#ifdef GRPC_CLOSURE_RICH_DEBUG + c->file_initiated = file; + c->line_initiated = line; + c->run = true; +#endif assert(c->cb); c->scheduler->vtable->run(exec_ctx, c, error); } else { @@ -121,13 +153,21 @@ void grpc_closure_run(grpc_exec_ctx *exec_ctx, grpc_closure *c, GPR_TIMER_END("grpc_closure_run", 0); } +#ifdef GRPC_CLOSURE_RICH_DEBUG +void grpc_closure_sched(const char *file, int line, grpc_exec_ctx *exec_ctx, + grpc_closure *c, grpc_error *error) { +#else void grpc_closure_sched(grpc_exec_ctx *exec_ctx, grpc_closure *c, grpc_error *error) { +#endif GPR_TIMER_BEGIN("grpc_closure_sched", 0); if (c != NULL) { -#ifndef NDEBUG +#ifdef GRPC_CLOSURE_RICH_DEBUG GPR_ASSERT(!c->scheduled); c->scheduled = true; + c->file_initiated = file; + c->line_initiated = line; + c->run = false; #endif assert(c->cb); c->scheduler->vtable->sched(exec_ctx, c, error); @@ -137,13 +177,21 @@ void grpc_closure_sched(grpc_exec_ctx *exec_ctx, grpc_closure *c, GPR_TIMER_END("grpc_closure_sched", 0); } +#ifdef GRPC_CLOSURE_RICH_DEBUG +void grpc_closure_list_sched(const char *file, int line, + grpc_exec_ctx *exec_ctx, grpc_closure_list *list) { +#else void grpc_closure_list_sched(grpc_exec_ctx *exec_ctx, grpc_closure_list *list) { +#endif grpc_closure *c = list->head; while (c != NULL) { grpc_closure *next = c->next_data.next; -#ifndef NDEBUG +#ifdef GRPC_CLOSURE_RICH_DEBUG GPR_ASSERT(!c->scheduled); c->scheduled = true; + c->file_initiated = file; + c->line_initiated = line; + c->run = false; #endif assert(c->cb); c->scheduler->vtable->sched(exec_ctx, c, c->error_data.error); diff --git a/src/core/lib/iomgr/closure.h b/src/core/lib/iomgr/closure.h index 25086fa483..3ecf9e947b 100644 --- a/src/core/lib/iomgr/closure.h +++ b/src/core/lib/iomgr/closure.h @@ -59,6 +59,8 @@ struct grpc_closure_scheduler { const grpc_closure_scheduler_vtable *vtable; }; +// #define GRPC_CLOSURE_RICH_DEBUG + /** A closure over a grpc_iomgr_cb_func. */ struct grpc_closure { /** Once queued, next indicates the next queued closure; before then, scratch @@ -85,19 +87,47 @@ struct grpc_closure { uintptr_t scratch; } error_data; -#ifndef NDEBUG +// extra tracing and debugging for grpc_closure. This incurs a decent amount of +// overhead per closure, so it must be enabled at compile time. +#ifdef GRPC_CLOSURE_RICH_DEBUG bool scheduled; + bool run; // true = run, false = scheduled + const char *file_created; + int line_created; + const char *file_initiated; + int line_initiated; #endif }; /** Initializes \a closure with \a cb and \a cb_arg. Returns \a closure. */ +#ifdef GRPC_CLOSURE_RICH_DEBUG +grpc_closure *grpc_closure_init(const char *file, int line, + grpc_closure *closure, grpc_iomgr_cb_func cb, + void *cb_arg, + grpc_closure_scheduler *scheduler); +#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler) \ + grpc_closure_init(__FILE__, __LINE__, closure, cb, cb_arg, scheduler) +#else grpc_closure *grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb, void *cb_arg, grpc_closure_scheduler *scheduler); +#define GRPC_CLOSURE_INIT(closure, cb, cb_arg, scheduler) \ + grpc_closure_init(closure, cb, cb_arg, scheduler) +#endif /* Create a heap allocated closure: try to avoid except for very rare events */ +#ifdef GRPC_CLOSURE_RICH_DEBUG +grpc_closure *grpc_closure_create(const char *file, int line, + grpc_iomgr_cb_func cb, void *cb_arg, + grpc_closure_scheduler *scheduler); +#define GRPC_CLOSURE_CREATE(cb, cb_arg, scheduler) \ + grpc_closure_create(__FILE__, __LINE__, cb, cb_arg, scheduler) +#else grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg, grpc_closure_scheduler *scheduler); +#define GRPC_CLOSURE_CREATE(cb, cb_arg, scheduler) \ + grpc_closure_create(cb, cb_arg, scheduler) +#endif #define GRPC_CLOSURE_LIST_INIT \ { NULL, NULL } @@ -123,16 +153,44 @@ bool grpc_closure_list_empty(grpc_closure_list list); /** Run a closure directly. Caller ensures that no locks are being held above. * Note that calling this at the end of a closure callback function itself is * by definition safe. */ +#ifdef GRPC_CLOSURE_RICH_DEBUG +void grpc_closure_run(const char *file, int line, grpc_exec_ctx *exec_ctx, + grpc_closure *closure, grpc_error *error); +#define GRPC_CLOSURE_RUN(exec_ctx, closure, error) \ + grpc_closure_run(__FILE__, __LINE__, exec_ctx, closure, error) +#else void grpc_closure_run(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_error *error); +#define GRPC_CLOSURE_RUN(exec_ctx, closure, error) \ + grpc_closure_run(exec_ctx, closure, error) +#endif /** Schedule a closure to be run. Does not need to be run from a safe point. */ +#ifdef GRPC_CLOSURE_RICH_DEBUG +void grpc_closure_sched(const char *file, int line, grpc_exec_ctx *exec_ctx, + grpc_closure *closure, grpc_error *error); +#define GRPC_CLOSURE_SCHED(exec_ctx, closure, error) \ + grpc_closure_sched(__FILE__, __LINE__, exec_ctx, closure, error) +#else void grpc_closure_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_error *error); +#define GRPC_CLOSURE_SCHED(exec_ctx, closure, error) \ + grpc_closure_sched(exec_ctx, closure, error) +#endif /** Schedule all closures in a list to be run. Does not need to be run from a * safe point. */ +#ifdef GRPC_CLOSURE_RICH_DEBUG +void grpc_closure_list_sched(const char *file, int line, + grpc_exec_ctx *exec_ctx, + grpc_closure_list *closure_list); +#define GRPC_CLOSURE_LIST_SCHED(exec_ctx, closure_list) \ + grpc_closure_list_sched(__FILE__, __LINE__, exec_ctx, closure_list) +#else void grpc_closure_list_sched(grpc_exec_ctx *exec_ctx, grpc_closure_list *closure_list); +#define GRPC_CLOSURE_LIST_SCHED(exec_ctx, closure_list) \ + grpc_closure_list_sched(exec_ctx, closure_list) +#endif #endif /* GRPC_CORE_LIB_IOMGR_CLOSURE_H */ diff --git a/src/core/lib/iomgr/combiner.c b/src/core/lib/iomgr/combiner.c index f6976c5650..750ff102ff 100644 --- a/src/core/lib/iomgr/combiner.c +++ b/src/core/lib/iomgr/combiner.c @@ -81,7 +81,7 @@ grpc_combiner *grpc_combiner_create(void) { gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED); gpr_mpscq_init(&lock->queue); grpc_closure_list_init(&lock->final_list); - grpc_closure_init(&lock->offload, offload, lock, grpc_executor_scheduler); + GRPC_CLOSURE_INIT(&lock->offload, offload, lock, grpc_executor_scheduler); GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p create", lock)); return lock; } @@ -196,7 +196,7 @@ static void offload(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { static void queue_offload(grpc_exec_ctx *exec_ctx, grpc_combiner *lock) { move_next(exec_ctx); GRPC_COMBINER_TRACE(gpr_log(GPR_DEBUG, "C:%p queue_offload", lock)); - grpc_closure_sched(exec_ctx, &lock->offload, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, &lock->offload, GRPC_ERROR_NONE); } bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { @@ -247,7 +247,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { GPR_TIMER_BEGIN("combiner.exec1", 0); grpc_closure *cl = (grpc_closure *)n; grpc_error *cl_err = cl->error_data.error; -#ifndef NDEBUG +#ifdef GRPC_CLOSURE_RICH_DEBUG cl->scheduled = false; #endif cl->cb(exec_ctx, cl->cb_arg, cl_err); @@ -264,7 +264,7 @@ bool grpc_combiner_continue_exec_ctx(grpc_exec_ctx *exec_ctx) { gpr_log(GPR_DEBUG, "C:%p execute_final[%d] c=%p", lock, loops, c)); grpc_closure *next = c->next_data.next; grpc_error *error = c->error_data.error; -#ifndef NDEBUG +#ifdef GRPC_CLOSURE_RICH_DEBUG c->scheduled = false; #endif c->cb(exec_ctx, c->cb_arg, error); @@ -332,8 +332,8 @@ static void combiner_finally_exec(grpc_exec_ctx *exec_ctx, GPR_TIMER_BEGIN("combiner.execute_finally", 0); if (exec_ctx->active_combiner != lock) { GPR_TIMER_MARK("slowpath", 0); - grpc_closure_sched(exec_ctx, - grpc_closure_create(enqueue_finally, closure, + GRPC_CLOSURE_SCHED(exec_ctx, + GRPC_CLOSURE_CREATE(enqueue_finally, closure, grpc_combiner_scheduler(lock)), error); GPR_TIMER_END("combiner.execute_finally", 0); diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h index e626845fa4..1ce916f2ec 100644 --- a/src/core/lib/iomgr/error.h +++ b/src/core/lib/iomgr/error.h @@ -149,7 +149,7 @@ grpc_error *grpc_error_create(const char *file, int line, grpc_slice desc, grpc_error_create(__FILE__, __LINE__, grpc_slice_from_copied_string(desc), \ errs, count) -//#define GRPC_ERROR_REFCOUNT_DEBUG +// #define GRPC_ERROR_REFCOUNT_DEBUG #ifdef GRPC_ERROR_REFCOUNT_DEBUG grpc_error *grpc_error_ref(grpc_error *err, const char *file, int line, const char *func); diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c index 63f6962ede..e0c05457e3 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.c +++ b/src/core/lib/iomgr/ev_epoll1_linux.c @@ -237,7 +237,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, close(fd->fd); } - grpc_closure_sched(exec_ctx, on_done, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(exec_ctx, on_done, GRPC_ERROR_REF(error)); grpc_iomgr_unregister_object(&fd->iomgr_object); grpc_lfev_destroy(&fd->read_closure); @@ -427,7 +427,7 @@ static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL && pollset->begin_refs == 0) { - grpc_closure_sched(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE); pollset->shutdown_closure = NULL; } } diff --git a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c index d7ae0d371e..761e2ed5fe 100644 --- a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c +++ b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c @@ -965,7 +965,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, fd->po.pi = NULL; } - grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error)); gpr_mu_unlock(&fd->po.mu); UNREF_BY(fd, 2, reason); /* Drop the reference */ @@ -1250,7 +1250,7 @@ static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx, /* Release the ref and set pollset->po.pi to NULL */ pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown"); - grpc_closure_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE); } /* pollset->po.mu lock must be held by the caller before calling this */ diff --git a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c index 8692b5b3c3..0ab3594818 100644 --- a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c +++ b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c @@ -515,7 +515,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, fd->eps = NULL; } - grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error)); gpr_mu_unlock(&fd->mu); @@ -716,7 +716,7 @@ static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx, /* Release the ref and set pollset->eps to NULL */ pollset_release_epoll_set(exec_ctx, pollset, "ps_shutdown"); - grpc_closure_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE); } /* pollset->mu lock must be held by the caller before calling this */ diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c index c3627dc914..abcf7b429f 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.c +++ b/src/core/lib/iomgr/ev_epollex_linux.c @@ -269,7 +269,7 @@ static void unref_by(grpc_exec_ctx *exec_ctx, grpc_fd *fd, int n) { #endif old = gpr_atm_full_fetch_add(&fd->refst, -n); if (old == n) { - grpc_closure_sched(exec_ctx, grpc_closure_create(fd_destroy, fd, + GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_CREATE(fd_destroy, fd, grpc_schedule_on_exec_ctx), GRPC_ERROR_NONE); } else { @@ -367,7 +367,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, to be alive (and not added to freelist) until the end of this function */ REF_BY(fd, 1, reason); - grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error)); gpr_mu_unlock(&fd->orphaned_mu); gpr_mu_unlock(&fd->pollable.po.mu); @@ -667,7 +667,7 @@ static grpc_error *fd_become_pollable_locked(grpc_fd *fd) { static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL) { - grpc_closure_sched(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE); pollset->shutdown_closure = NULL; } } @@ -966,8 +966,8 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, pollable_add_fd(&pollset->pollable, had_fd); pollable_add_fd(&pollset->pollable, fd); } - grpc_closure_sched(exec_ctx, - grpc_closure_create(unref_fd_no_longer_poller, had_fd, + GRPC_CLOSURE_SCHED(exec_ctx, + GRPC_CLOSURE_CREATE(unref_fd_no_longer_poller, had_fd, grpc_schedule_on_exec_ctx), GRPC_ERROR_NONE); } diff --git a/src/core/lib/iomgr/ev_epollsig_linux.c b/src/core/lib/iomgr/ev_epollsig_linux.c index 29a86f73ff..fa4b4e8d0a 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.c +++ b/src/core/lib/iomgr/ev_epollsig_linux.c @@ -893,7 +893,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, fd->po.pi = NULL; } - grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error)); gpr_mu_unlock(&fd->po.mu); UNREF_BY(fd, 2, reason); /* Drop the reference */ @@ -1147,7 +1147,7 @@ static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx, /* Release the ref and set pollset->po.pi to NULL */ pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown"); - grpc_closure_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE); } /* pollset->po.mu lock must be held by the caller before calling this */ diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 6145c9ec3f..d1a27b8228 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -386,7 +386,7 @@ static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { if (!fd->released) { close(fd->fd); } - grpc_closure_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE); } static int fd_wrapped_fd(grpc_fd *fd) { @@ -445,7 +445,7 @@ static grpc_error *fd_shutdown_error(grpc_fd *fd) { static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure **st, grpc_closure *closure) { if (fd->shutdown) { - grpc_closure_sched(exec_ctx, closure, + GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_CREATE_FROM_STATIC_STRING("FD shutdown")); } else if (*st == CLOSURE_NOT_READY) { /* not ready ==> switch to a waiting state by setting the closure */ @@ -453,7 +453,7 @@ static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, } else if (*st == CLOSURE_READY) { /* already ready ==> queue the closure to run immediately */ *st = CLOSURE_NOT_READY; - grpc_closure_sched(exec_ctx, closure, fd_shutdown_error(fd)); + GRPC_CLOSURE_SCHED(exec_ctx, closure, fd_shutdown_error(fd)); maybe_wake_one_watcher_locked(fd); } else { /* upcallptr was set to a different closure. This is an error! */ @@ -476,7 +476,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, return 0; } else { /* waiting ==> queue closure */ - grpc_closure_sched(exec_ctx, *st, fd_shutdown_error(fd)); + GRPC_CLOSURE_SCHED(exec_ctx, *st, fd_shutdown_error(fd)); *st = CLOSURE_NOT_READY; return 1; } @@ -834,7 +834,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { GRPC_FD_UNREF(pollset->fds[i], "multipoller"); } pollset->fd_count = 0; - grpc_closure_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE); } static void work_combine_error(grpc_error **composite, grpc_error *error) { @@ -883,7 +883,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (!pollset_has_workers(pollset) && !grpc_closure_list_empty(pollset->idle_jobs)) { GPR_TIMER_MARK("pollset_work.idle_jobs", 0); - grpc_closure_list_sched(exec_ctx, &pollset->idle_jobs); + GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pollset->idle_jobs); goto done; } /* If we're shutting down then we don't execute any extended work */ @@ -1056,7 +1056,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */ gpr_mu_lock(&pollset->mu); } else if (!grpc_closure_list_empty(pollset->idle_jobs)) { - grpc_closure_list_sched(exec_ctx, &pollset->idle_jobs); + GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pollset->idle_jobs); gpr_mu_unlock(&pollset->mu); grpc_exec_ctx_flush(exec_ctx); gpr_mu_lock(&pollset->mu); @@ -1075,7 +1075,7 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollset->shutdown_done = closure; pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); if (!pollset_has_workers(pollset)) { - grpc_closure_list_sched(exec_ctx, &pollset->idle_jobs); + GRPC_CLOSURE_LIST_SCHED(exec_ctx, &pollset->idle_jobs); } if (!pollset->called_shutdown && !pollset_has_observers(pollset)) { pollset->called_shutdown = 1; diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c index 6699be3646..51c36216c5 100644 --- a/src/core/lib/iomgr/exec_ctx.c +++ b/src/core/lib/iomgr/exec_ctx.c @@ -62,7 +62,7 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { grpc_closure *next = c->next_data.next; grpc_error *error = c->error_data.error; did_something = true; -#ifndef NDEBUG +#ifdef GRPC_CLOSURE_RICH_DEBUG c->scheduled = false; #endif c->cb(exec_ctx, c->cb_arg, error); @@ -85,7 +85,7 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) { static void exec_ctx_run(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_error *error) { -#ifndef NDEBUG +#ifdef GRPC_CLOSURE_RICH_DEBUG closure->scheduled = false; #endif closure->cb(exec_ctx, closure->cb_arg, error); diff --git a/src/core/lib/iomgr/executor.c b/src/core/lib/iomgr/executor.c index 7621a7fe75..fda274e797 100644 --- a/src/core/lib/iomgr/executor.c +++ b/src/core/lib/iomgr/executor.c @@ -58,7 +58,7 @@ static size_t run_closures(grpc_exec_ctx *exec_ctx, grpc_closure_list list) { while (c != NULL) { grpc_closure *next = c->next_data.next; grpc_error *error = c->error_data.error; -#ifndef NDEBUG +#ifdef GRPC_CLOSURE_RICH_DEBUG c->scheduled = false; #endif c->cb(exec_ctx, c->cb_arg, error); diff --git a/src/core/lib/iomgr/lockfree_event.c b/src/core/lib/iomgr/lockfree_event.c index 6fa285b5f3..c2ceecb3c5 100644 --- a/src/core/lib/iomgr/lockfree_event.c +++ b/src/core/lib/iomgr/lockfree_event.c @@ -112,7 +112,7 @@ void grpc_lfev_notify_on(grpc_exec_ctx *exec_ctx, gpr_atm *state, closure when transitioning out of CLOSURE_NO_READY state (i.e there is no other code that needs to 'happen-after' this) */ if (gpr_atm_no_barrier_cas(state, CLOSURE_READY, CLOSURE_NOT_READY)) { - grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE); return; /* Successful. Return */ } @@ -125,7 +125,7 @@ void grpc_lfev_notify_on(grpc_exec_ctx *exec_ctx, gpr_atm *state, schedule the closure with the shutdown error */ if ((curr & FD_SHUTDOWN_BIT) > 0) { grpc_error *shutdown_err = (grpc_error *)(curr & ~FD_SHUTDOWN_BIT); - grpc_closure_sched(exec_ctx, closure, + GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "FD Shutdown", &shutdown_err, 1)); return; @@ -177,7 +177,7 @@ bool grpc_lfev_set_shutdown(grpc_exec_ctx *exec_ctx, gpr_atm *state, happens-after on that edge), and a release to pair with anything loading the shutdown state. */ if (gpr_atm_full_cas(state, curr, new_state)) { - grpc_closure_sched(exec_ctx, (grpc_closure *)curr, + GRPC_CLOSURE_SCHED(exec_ctx, (grpc_closure *)curr, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "FD Shutdown", &shutdown_err, 1)); return true; @@ -226,7 +226,7 @@ void grpc_lfev_set_ready(grpc_exec_ctx *exec_ctx, gpr_atm *state) { spurious set_ready; release pairs with this or the acquire in notify_on (or set_shutdown) */ else if (gpr_atm_full_cas(state, curr, CLOSURE_NOT_READY)) { - grpc_closure_sched(exec_ctx, (grpc_closure *)curr, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, (grpc_closure *)curr, GRPC_ERROR_NONE); return; } /* else the state changed again (only possible by either a racing diff --git a/src/core/lib/iomgr/pollset_uv.c b/src/core/lib/iomgr/pollset_uv.c index 8ff647f03c..07c7c04559 100644 --- a/src/core/lib/iomgr/pollset_uv.c +++ b/src/core/lib/iomgr/pollset_uv.c @@ -88,7 +88,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, // kick the loop once uv_timer_start(dummy_uv_handle, dummy_timer_cb, 0, 0); } - grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE); } void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { diff --git a/src/core/lib/iomgr/pollset_windows.c b/src/core/lib/iomgr/pollset_windows.c index ab1a327b46..d5a03732cb 100644 --- a/src/core/lib/iomgr/pollset_windows.c +++ b/src/core/lib/iomgr/pollset_windows.c @@ -95,7 +95,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollset->shutting_down = 1; grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); if (!pollset->is_iocp_worker) { - grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE); } else { pollset->on_shutdown = closure; } @@ -143,7 +143,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } if (pollset->shutting_down && pollset->on_shutdown != NULL) { - grpc_closure_sched(exec_ctx, pollset->on_shutdown, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, pollset->on_shutdown, GRPC_ERROR_NONE); pollset->on_shutdown = NULL; } goto done; diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c index a78de66941..35dedc23de 100644 --- a/src/core/lib/iomgr/resolve_address_posix.c +++ b/src/core/lib/iomgr/resolve_address_posix.c @@ -154,7 +154,7 @@ typedef struct { static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, grpc_error *error) { request *r = rp; - grpc_closure_sched( + GRPC_CLOSURE_SCHED( exec_ctx, r->on_done, grpc_blocking_resolve_address(r->name, r->default_port, r->addrs_out)); gpr_free(r->name); @@ -175,13 +175,13 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, grpc_closure *on_done, grpc_resolved_addresses **addrs) { request *r = gpr_malloc(sizeof(request)); - grpc_closure_init(&r->request_closure, do_request_thread, r, + GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r, grpc_executor_scheduler); r->name = gpr_strdup(name); r->default_port = gpr_strdup(default_port); r->on_done = on_done; r->addrs_out = addrs; - grpc_closure_sched(exec_ctx, &r->request_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, &r->request_closure, GRPC_ERROR_NONE); } void (*grpc_resolve_address)( diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c index f1ea516175..45de289e45 100644 --- a/src/core/lib/iomgr/resolve_address_uv.c +++ b/src/core/lib/iomgr/resolve_address_uv.c @@ -124,7 +124,7 @@ static void getaddrinfo_callback(uv_getaddrinfo_t *req, int status, /* Either no retry was attempted, or the retry failed. Either way, the original error probably has more interesting information */ error = handle_addrinfo_result(status, res, r->addresses); - grpc_closure_sched(&exec_ctx, r->on_done, error); + GRPC_CLOSURE_SCHED(&exec_ctx, r->on_done, error); grpc_exec_ctx_finish(&exec_ctx); gpr_free(r->hints); gpr_free(r); @@ -225,7 +225,7 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, int s; err = try_split_host_port(name, default_port, &host, &port); if (err != GRPC_ERROR_NONE) { - grpc_closure_sched(exec_ctx, on_done, err); + GRPC_CLOSURE_SCHED(exec_ctx, on_done, err); return; } r = gpr_malloc(sizeof(request)); @@ -252,7 +252,7 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("getaddrinfo failed"); err = grpc_error_set_str(err, GRPC_ERROR_STR_OS_ERROR, grpc_slice_from_static_string(uv_strerror(s))); - grpc_closure_sched(exec_ctx, on_done, err); + GRPC_CLOSURE_SCHED(exec_ctx, on_done, err); gpr_free(r); gpr_free(req); gpr_free(hints); diff --git a/src/core/lib/iomgr/resolve_address_windows.c b/src/core/lib/iomgr/resolve_address_windows.c index 83adfd338a..45cfd7248d 100644 --- a/src/core/lib/iomgr/resolve_address_windows.c +++ b/src/core/lib/iomgr/resolve_address_windows.c @@ -139,7 +139,7 @@ static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, } else { GRPC_ERROR_REF(error); } - grpc_closure_sched(exec_ctx, r->on_done, error); + GRPC_CLOSURE_SCHED(exec_ctx, r->on_done, error); gpr_free(r->name); gpr_free(r->default_port); gpr_free(r); @@ -158,13 +158,13 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name, grpc_closure *on_done, grpc_resolved_addresses **addresses) { request *r = gpr_malloc(sizeof(request)); - grpc_closure_init(&r->request_closure, do_request_thread, r, + GRPC_CLOSURE_INIT(&r->request_closure, do_request_thread, r, grpc_executor_scheduler); r->name = gpr_strdup(name); r->default_port = gpr_strdup(default_port); r->on_done = on_done; r->addresses = addresses; - grpc_closure_sched(exec_ctx, &r->request_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, &r->request_closure, GRPC_ERROR_NONE); } void (*grpc_resolve_address)( diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c index 7c3fb93279..f2cc1be74e 100644 --- a/src/core/lib/iomgr/resource_quota.c +++ b/src/core/lib/iomgr/resource_quota.c @@ -259,7 +259,7 @@ static void rq_step_sched(grpc_exec_ctx *exec_ctx, if (resource_quota->step_scheduled) return; resource_quota->step_scheduled = true; grpc_resource_quota_ref_internal(resource_quota); - grpc_closure_sched(exec_ctx, &resource_quota->rq_step_closure, + GRPC_CLOSURE_SCHED(exec_ctx, &resource_quota->rq_step_closure, GRPC_ERROR_NONE); } @@ -305,7 +305,7 @@ static bool rq_alloc(grpc_exec_ctx *exec_ctx, } if (resource_user->free_pool >= 0) { resource_user->allocating = false; - grpc_closure_list_sched(exec_ctx, &resource_user->on_allocated); + GRPC_CLOSURE_LIST_SCHED(exec_ctx, &resource_user->on_allocated); gpr_mu_unlock(&resource_user->mu); } else { rulist_add_head(resource_user, GRPC_RULIST_AWAITING_ALLOCATION); @@ -363,7 +363,7 @@ static bool rq_reclaim(grpc_exec_ctx *exec_ctx, resource_quota->debug_only_last_reclaimer_resource_user = resource_user; resource_quota->debug_only_last_initiated_reclaimer = c; resource_user->reclaimers[destructive] = NULL; - grpc_closure_run(exec_ctx, c, GRPC_ERROR_NONE); + GRPC_CLOSURE_RUN(exec_ctx, c, GRPC_ERROR_NONE); return true; } @@ -444,7 +444,7 @@ static bool ru_post_reclaimer(grpc_exec_ctx *exec_ctx, resource_user->new_reclaimers[destructive] = NULL; GPR_ASSERT(resource_user->reclaimers[destructive] == NULL); if (gpr_atm_acq_load(&resource_user->shutdown) > 0) { - grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CANCELLED); + GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_CANCELLED); return false; } resource_user->reclaimers[destructive] = closure; @@ -485,9 +485,9 @@ static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *ru, static void ru_shutdown(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { grpc_resource_user *resource_user = ru; - grpc_closure_sched(exec_ctx, resource_user->reclaimers[0], + GRPC_CLOSURE_SCHED(exec_ctx, resource_user->reclaimers[0], GRPC_ERROR_CANCELLED); - grpc_closure_sched(exec_ctx, resource_user->reclaimers[1], + GRPC_CLOSURE_SCHED(exec_ctx, resource_user->reclaimers[1], GRPC_ERROR_CANCELLED); resource_user->reclaimers[0] = NULL; resource_user->reclaimers[1] = NULL; @@ -501,9 +501,9 @@ static void ru_destroy(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { for (int i = 0; i < GRPC_RULIST_COUNT; i++) { rulist_remove(resource_user, (grpc_rulist)i); } - grpc_closure_sched(exec_ctx, resource_user->reclaimers[0], + GRPC_CLOSURE_SCHED(exec_ctx, resource_user->reclaimers[0], GRPC_ERROR_CANCELLED); - grpc_closure_sched(exec_ctx, resource_user->reclaimers[1], + GRPC_CLOSURE_SCHED(exec_ctx, resource_user->reclaimers[1], GRPC_ERROR_CANCELLED); if (resource_user->free_pool != 0) { resource_user->resource_quota->free_pool += resource_user->free_pool; @@ -525,7 +525,7 @@ static void ru_allocated_slices(grpc_exec_ctx *exec_ctx, void *arg, slice_allocator->length)); } } - grpc_closure_run(exec_ctx, &slice_allocator->on_done, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_RUN(exec_ctx, &slice_allocator->on_done, GRPC_ERROR_REF(error)); } /******************************************************************************* @@ -579,9 +579,9 @@ grpc_resource_quota *grpc_resource_quota_create(const char *name) { gpr_asprintf(&resource_quota->name, "anonymous_pool_%" PRIxPTR, (intptr_t)resource_quota); } - grpc_closure_init(&resource_quota->rq_step_closure, rq_step, resource_quota, + GRPC_CLOSURE_INIT(&resource_quota->rq_step_closure, rq_step, resource_quota, grpc_combiner_finally_scheduler(resource_quota->combiner)); - grpc_closure_init(&resource_quota->rq_reclamation_done_closure, + GRPC_CLOSURE_INIT(&resource_quota->rq_reclamation_done_closure, rq_reclamation_done, resource_quota, grpc_combiner_scheduler(resource_quota->combiner)); for (int i = 0; i < GRPC_RULIST_COUNT; i++) { @@ -633,8 +633,8 @@ void grpc_resource_quota_resize(grpc_resource_quota *resource_quota, a->size = (int64_t)size; gpr_atm_no_barrier_store(&resource_quota->last_size, (gpr_atm)GPR_MIN((size_t)GPR_ATM_MAX, size)); - grpc_closure_init(&a->closure, rq_resize, a, grpc_schedule_on_exec_ctx); - grpc_closure_sched(&exec_ctx, &a->closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_INIT(&a->closure, rq_resize, a, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_SCHED(&exec_ctx, &a->closure, GRPC_ERROR_NONE); grpc_exec_ctx_finish(&exec_ctx); } @@ -686,19 +686,19 @@ grpc_resource_user *grpc_resource_user_create( grpc_resource_user *resource_user = gpr_malloc(sizeof(*resource_user)); resource_user->resource_quota = grpc_resource_quota_ref_internal(resource_quota); - grpc_closure_init(&resource_user->allocate_closure, &ru_allocate, + GRPC_CLOSURE_INIT(&resource_user->allocate_closure, &ru_allocate, resource_user, grpc_combiner_scheduler(resource_quota->combiner)); - grpc_closure_init(&resource_user->add_to_free_pool_closure, + GRPC_CLOSURE_INIT(&resource_user->add_to_free_pool_closure, &ru_add_to_free_pool, resource_user, grpc_combiner_scheduler(resource_quota->combiner)); - grpc_closure_init(&resource_user->post_reclaimer_closure[0], + GRPC_CLOSURE_INIT(&resource_user->post_reclaimer_closure[0], &ru_post_benign_reclaimer, resource_user, grpc_combiner_scheduler(resource_quota->combiner)); - grpc_closure_init(&resource_user->post_reclaimer_closure[1], + GRPC_CLOSURE_INIT(&resource_user->post_reclaimer_closure[1], &ru_post_destructive_reclaimer, resource_user, grpc_combiner_scheduler(resource_quota->combiner)); - grpc_closure_init(&resource_user->destroy_closure, &ru_destroy, resource_user, + GRPC_CLOSURE_INIT(&resource_user->destroy_closure, &ru_destroy, resource_user, grpc_combiner_scheduler(resource_quota->combiner)); gpr_mu_init(&resource_user->mu); gpr_atm_rel_store(&resource_user->refs, 1); @@ -739,7 +739,7 @@ static void ru_unref_by(grpc_exec_ctx *exec_ctx, gpr_atm old = gpr_atm_full_fetch_add(&resource_user->refs, -amount); GPR_ASSERT(old >= amount); if (old == amount) { - grpc_closure_sched(exec_ctx, &resource_user->destroy_closure, + GRPC_CLOSURE_SCHED(exec_ctx, &resource_user->destroy_closure, GRPC_ERROR_NONE); } } @@ -756,9 +756,9 @@ void grpc_resource_user_unref(grpc_exec_ctx *exec_ctx, void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx, grpc_resource_user *resource_user) { if (gpr_atm_full_fetch_add(&resource_user->shutdown, 1) == 0) { - grpc_closure_sched( + GRPC_CLOSURE_SCHED( exec_ctx, - grpc_closure_create( + GRPC_CLOSURE_CREATE( ru_shutdown, resource_user, grpc_combiner_scheduler(resource_user->resource_quota->combiner)), GRPC_ERROR_NONE); @@ -781,11 +781,11 @@ void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx, GRPC_ERROR_NONE); if (!resource_user->allocating) { resource_user->allocating = true; - grpc_closure_sched(exec_ctx, &resource_user->allocate_closure, + GRPC_CLOSURE_SCHED(exec_ctx, &resource_user->allocate_closure, GRPC_ERROR_NONE); } } else { - grpc_closure_sched(exec_ctx, optional_on_done, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, optional_on_done, GRPC_ERROR_NONE); } gpr_mu_unlock(&resource_user->mu); } @@ -804,7 +804,7 @@ void grpc_resource_user_free(grpc_exec_ctx *exec_ctx, if (is_bigger_than_zero && was_zero_or_negative && !resource_user->added_to_free_pool) { resource_user->added_to_free_pool = true; - grpc_closure_sched(exec_ctx, &resource_user->add_to_free_pool_closure, + GRPC_CLOSURE_SCHED(exec_ctx, &resource_user->add_to_free_pool_closure, GRPC_ERROR_NONE); } gpr_mu_unlock(&resource_user->mu); @@ -817,7 +817,7 @@ void grpc_resource_user_post_reclaimer(grpc_exec_ctx *exec_ctx, grpc_closure *closure) { GPR_ASSERT(resource_user->new_reclaimers[destructive] == NULL); resource_user->new_reclaimers[destructive] = closure; - grpc_closure_sched(exec_ctx, + GRPC_CLOSURE_SCHED(exec_ctx, &resource_user->post_reclaimer_closure[destructive], GRPC_ERROR_NONE); } @@ -828,7 +828,7 @@ void grpc_resource_user_finish_reclamation(grpc_exec_ctx *exec_ctx, gpr_log(GPR_DEBUG, "RQ %s %s: reclamation complete", resource_user->resource_quota->name, resource_user->name); } - grpc_closure_sched( + GRPC_CLOSURE_SCHED( exec_ctx, &resource_user->resource_quota->rq_reclamation_done_closure, GRPC_ERROR_NONE); } @@ -836,9 +836,9 @@ void grpc_resource_user_finish_reclamation(grpc_exec_ctx *exec_ctx, void grpc_resource_user_slice_allocator_init( grpc_resource_user_slice_allocator *slice_allocator, grpc_resource_user *resource_user, grpc_iomgr_cb_func cb, void *p) { - grpc_closure_init(&slice_allocator->on_allocated, ru_allocated_slices, + GRPC_CLOSURE_INIT(&slice_allocator->on_allocated, ru_allocated_slices, slice_allocator, grpc_schedule_on_exec_ctx); - grpc_closure_init(&slice_allocator->on_done, cb, p, + GRPC_CLOSURE_INIT(&slice_allocator->on_done, cb, p, grpc_schedule_on_exec_ctx); slice_allocator->resource_user = resource_user; } diff --git a/src/core/lib/iomgr/socket_windows.c b/src/core/lib/iomgr/socket_windows.c index f73e33db86..a0d731b942 100644 --- a/src/core/lib/iomgr/socket_windows.c +++ b/src/core/lib/iomgr/socket_windows.c @@ -116,7 +116,7 @@ static void socket_notify_on_iocp(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&socket->state_mu); if (info->has_pending_iocp) { info->has_pending_iocp = 0; - grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE); } else { info->closure = closure; } @@ -139,7 +139,7 @@ void grpc_socket_become_ready(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket, GPR_ASSERT(!info->has_pending_iocp); gpr_mu_lock(&socket->state_mu); if (info->closure) { - grpc_closure_sched(exec_ctx, info->closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, info->closure, GRPC_ERROR_NONE); info->closure = NULL; } else { info->has_pending_iocp = 1; diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index abad3c2f22..21e320a6e7 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -234,7 +234,7 @@ finish: grpc_channel_args_destroy(exec_ctx, ac->channel_args); gpr_free(ac); } - grpc_closure_sched(exec_ctx, closure, error); + GRPC_CLOSURE_SCHED(exec_ctx, closure, error); } static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, @@ -263,7 +263,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, error = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode, &fd); if (error != GRPC_ERROR_NONE) { - grpc_closure_sched(exec_ctx, closure, error); + GRPC_CLOSURE_SCHED(exec_ctx, closure, error); return; } if (dsmode == GRPC_DSMODE_IPV4) { @@ -272,7 +272,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, addr = &addr4_copy; } if ((error = prepare_socket(addr, fd, channel_args)) != GRPC_ERROR_NONE) { - grpc_closure_sched(exec_ctx, closure, error); + GRPC_CLOSURE_SCHED(exec_ctx, closure, error); return; } @@ -290,13 +290,13 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, if (err >= 0) { *ep = grpc_tcp_client_create_from_fd(exec_ctx, fdobj, channel_args, addr_str); - grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE); goto done; } if (errno != EWOULDBLOCK && errno != EINPROGRESS) { grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, "tcp_client_connect_error"); - grpc_closure_sched(exec_ctx, closure, GRPC_OS_ERROR(errno, "connect")); + GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_OS_ERROR(errno, "connect")); goto done; } @@ -311,7 +311,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, addr_str = NULL; gpr_mu_init(&ac->mu); ac->refs = 2; - grpc_closure_init(&ac->write_closure, on_writable, ac, + GRPC_CLOSURE_INIT(&ac->write_closure, on_writable, ac, grpc_schedule_on_exec_ctx); ac->channel_args = grpc_channel_args_copy(channel_args); @@ -321,7 +321,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, } gpr_mu_lock(&ac->mu); - grpc_closure_init(&ac->on_alarm, tc_on_alarm, ac, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&ac->on_alarm, tc_on_alarm, ac, grpc_schedule_on_exec_ctx); grpc_timer_init(exec_ctx, &ac->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), &ac->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC)); diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c index f4084339d6..ab6832932f 100644 --- a/src/core/lib/iomgr/tcp_client_uv.c +++ b/src/core/lib/iomgr/tcp_client_uv.c @@ -107,7 +107,7 @@ static void uv_tc_on_connect(uv_connect_t *req, int status) { if (done) { uv_tcp_connect_cleanup(&exec_ctx, connect); } - grpc_closure_sched(&exec_ctx, closure, error); + GRPC_CLOSURE_SCHED(&exec_ctx, closure, error); grpc_exec_ctx_finish(&exec_ctx); } @@ -150,7 +150,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, uv_tcp_connect(&connect->connect_req, connect->tcp_handle, (const struct sockaddr *)resolved_addr->addr, uv_tc_on_connect); - grpc_closure_init(&connect->on_alarm, uv_tc_on_alarm, connect, + GRPC_CLOSURE_INIT(&connect->on_alarm, uv_tc_on_alarm, connect, grpc_schedule_on_exec_ctx); grpc_timer_init(exec_ctx, &connect->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c index e0913cfaed..fc62105cc9 100644 --- a/src/core/lib/iomgr/tcp_client_windows.c +++ b/src/core/lib/iomgr/tcp_client_windows.c @@ -116,7 +116,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { async_connect_unlock_and_cleanup(exec_ctx, ac, socket); /* If the connection was aborted, the callback was already called when the deadline was met. */ - grpc_closure_sched(exec_ctx, on_done, error); + GRPC_CLOSURE_SCHED(exec_ctx, on_done, error); } /* Tries to issue one async connection, then schedules both an IOCP @@ -201,9 +201,9 @@ static void tcp_client_connect_impl( ac->addr_name = grpc_sockaddr_to_uri(addr); ac->endpoint = endpoint; ac->channel_args = grpc_channel_args_copy(channel_args); - grpc_closure_init(&ac->on_connect, on_connect, ac, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&ac->on_connect, on_connect, ac, grpc_schedule_on_exec_ctx); - grpc_closure_init(&ac->on_alarm, on_alarm, ac, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&ac->on_alarm, on_alarm, ac, grpc_schedule_on_exec_ctx); grpc_timer_init(exec_ctx, &ac->alarm, deadline, &ac->on_alarm, gpr_now(GPR_CLOCK_MONOTONIC)); grpc_socket_notify_on_write(exec_ctx, socket, &ac->on_connect); @@ -222,7 +222,7 @@ failure: } else if (sock != INVALID_SOCKET) { closesocket(sock); } - grpc_closure_sched(exec_ctx, on_done, final_error); + GRPC_CLOSURE_SCHED(exec_ctx, on_done, final_error); } // overridden by api_fuzzer.c diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index a2404f97c1..66e81bf81f 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -221,7 +221,7 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, tcp->read_cb = NULL; tcp->incoming_buffer = NULL; - grpc_closure_run(exec_ctx, cb, error); + GRPC_CLOSURE_RUN(exec_ctx, cb, error); } #define MAX_READ_IOVEC 4 @@ -348,7 +348,7 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, tcp->finished_edge = false; grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure); } else { - grpc_closure_sched(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE); } } @@ -465,7 +465,7 @@ static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, gpr_log(GPR_DEBUG, "write: %s", str); } - grpc_closure_run(exec_ctx, cb, error); + GRPC_CLOSURE_RUN(exec_ctx, cb, error); TCP_UNREF(exec_ctx, tcp, "write"); } } @@ -491,7 +491,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (buf->length == 0) { GPR_TIMER_END("tcp_write", 0); - grpc_closure_sched( + GRPC_CLOSURE_SCHED( exec_ctx, cb, grpc_fd_is_shutdown(tcp->em_fd) ? tcp_annotate_error(GRPC_ERROR_CREATE_FROM_STATIC_STRING("EOF"), @@ -515,7 +515,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, const char *str = grpc_error_string(error); gpr_log(GPR_DEBUG, "write: %s", str); } - grpc_closure_sched(exec_ctx, cb, error); + GRPC_CLOSURE_SCHED(exec_ctx, cb, error); } GPR_TIMER_END("tcp_write", 0); @@ -616,9 +616,9 @@ grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_fd *em_fd, gpr_ref_init(&tcp->refcount, 1); gpr_atm_no_barrier_store(&tcp->shutdown_count, 0); tcp->em_fd = em_fd; - grpc_closure_init(&tcp->read_closure, tcp_handle_read, tcp, + GRPC_CLOSURE_INIT(&tcp->read_closure, tcp_handle_read, tcp, grpc_schedule_on_exec_ctx); - grpc_closure_init(&tcp->write_closure, tcp_handle_write, tcp, + GRPC_CLOSURE_INIT(&tcp->write_closure, tcp_handle_write, tcp, grpc_schedule_on_exec_ctx); grpc_slice_buffer_init(&tcp->last_read_buffer); tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string); diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index 7bb8392d4b..f304642951 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -121,7 +121,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { GPR_ASSERT(s->shutdown); gpr_mu_unlock(&s->mu); if (s->shutdown_complete != NULL) { - grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE); } gpr_mu_destroy(&s->mu); @@ -163,7 +163,7 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { grpc_tcp_listener *sp; for (sp = s->head; sp; sp = sp->next) { grpc_unlink_if_unix_domain_socket(&sp->addr); - grpc_closure_init(&sp->destroyed_closure, destroyed_port, s, + GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s, grpc_schedule_on_exec_ctx); grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL, "tcp_listener_shutdown"); @@ -503,7 +503,7 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, "clone_port", clone_port(sp, (unsigned)(pollset_count - 1)))); for (i = 0; i < pollset_count; i++) { grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd); - grpc_closure_init(&sp->read_closure, on_read, sp, + GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp, grpc_schedule_on_exec_ctx); grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); s->active_ports++; @@ -513,7 +513,7 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, for (i = 0; i < pollset_count; i++) { grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd); } - grpc_closure_init(&sp->read_closure, on_read, sp, + GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp, grpc_schedule_on_exec_ctx); grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); s->active_ports++; @@ -540,7 +540,7 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { if (gpr_unref(&s->refs)) { grpc_tcp_server_shutdown_listeners(exec_ctx, s); gpr_mu_lock(&s->mu); - grpc_closure_list_sched(exec_ctx, &s->shutdown_starting); + GRPC_CLOSURE_LIST_SCHED(exec_ctx, &s->shutdown_starting); gpr_mu_unlock(&s->mu); tcp_server_destroy(exec_ctx, s); } diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c index 632a232861..2de0ea90e7 100644 --- a/src/core/lib/iomgr/tcp_server_uv.c +++ b/src/core/lib/iomgr/tcp_server_uv.c @@ -117,7 +117,7 @@ void grpc_tcp_server_shutdown_starting_add(grpc_tcp_server *s, static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { GPR_ASSERT(s->shutdown); if (s->shutdown_complete != NULL) { - grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE); } while (s->head) { @@ -171,7 +171,7 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { if (gpr_unref(&s->refs)) { /* Complete shutdown_starting work before destroying. */ grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_closure_list_sched(&local_exec_ctx, &s->shutdown_starting); + GRPC_CLOSURE_LIST_SCHED(&local_exec_ctx, &s->shutdown_starting); if (exec_ctx == NULL) { grpc_exec_ctx_flush(&local_exec_ctx); tcp_server_destroy(&local_exec_ctx, s); diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c index e8343a94a0..0162afc1ad 100644 --- a/src/core/lib/iomgr/tcp_server_windows.c +++ b/src/core/lib/iomgr/tcp_server_windows.c @@ -134,10 +134,10 @@ static void destroy_server(grpc_exec_ctx *exec_ctx, void *arg, static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { if (s->shutdown_complete != NULL) { - grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE); } - grpc_closure_sched(exec_ctx, grpc_closure_create(destroy_server, s, + GRPC_CLOSURE_SCHED(exec_ctx, GRPC_CLOSURE_CREATE(destroy_server, s, grpc_schedule_on_exec_ctx), GRPC_ERROR_NONE); } @@ -176,7 +176,7 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { if (gpr_unref(&s->refs)) { grpc_tcp_server_shutdown_listeners(exec_ctx, s); gpr_mu_lock(&s->mu); - grpc_closure_list_sched(exec_ctx, &s->shutdown_starting); + GRPC_CLOSURE_LIST_SCHED(exec_ctx, &s->shutdown_starting); gpr_mu_unlock(&s->mu); tcp_server_destroy(exec_ctx, s); } @@ -437,7 +437,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, SOCKET sock, sp->new_socket = INVALID_SOCKET; sp->port = port; sp->port_index = port_index; - grpc_closure_init(&sp->on_accept, on_accept, sp, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&sp->on_accept, on_accept, sp, grpc_schedule_on_exec_ctx); GPR_ASSERT(sp->socket); gpr_mu_unlock(&s->mu); *listener = sp; diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c index a37a75c8fa..ab5bb9f7da 100644 --- a/src/core/lib/iomgr/tcp_uv.c +++ b/src/core/lib/iomgr/tcp_uv.c @@ -161,7 +161,7 @@ static void read_callback(uv_stream_t *stream, ssize_t nread, // nread < 0: Error error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("TCP Read failed"); } - grpc_closure_sched(&exec_ctx, cb, error); + GRPC_CLOSURE_SCHED(&exec_ctx, cb, error); grpc_exec_ctx_finish(&exec_ctx); } @@ -183,7 +183,7 @@ static void uv_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, error = grpc_error_set_str(error, GRPC_ERROR_STR_OS_ERROR, grpc_slice_from_static_string(uv_strerror(status))); - grpc_closure_sched(exec_ctx, cb, error); + GRPC_CLOSURE_SCHED(exec_ctx, cb, error); } if (GRPC_TRACER_ON(grpc_tcp_trace)) { const char *str = grpc_error_string(error); @@ -210,7 +210,7 @@ static void write_callback(uv_write_t *req, int status) { gpr_free(tcp->write_buffers); grpc_resource_user_free(&exec_ctx, tcp->resource_user, sizeof(uv_buf_t) * tcp->write_slices->count); - grpc_closure_sched(&exec_ctx, cb, error); + GRPC_CLOSURE_SCHED(&exec_ctx, cb, error); grpc_exec_ctx_finish(&exec_ctx); } @@ -236,7 +236,7 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, } if (tcp->shutting_down) { - grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_CREATE_FROM_STATIC_STRING( + GRPC_CLOSURE_SCHED(exec_ctx, cb, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "TCP socket is shutting down")); return; } @@ -247,7 +247,7 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (tcp->write_slices->count == 0) { // No slices means we don't have to do anything, // and libuv doesn't like empty writes - grpc_closure_sched(exec_ctx, cb, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, cb, GRPC_ERROR_NONE); return; } diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c index dbae42e936..161b397534 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.c @@ -179,7 +179,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) { tcp->read_cb = NULL; TCP_UNREF(exec_ctx, tcp, "read"); - grpc_closure_sched(exec_ctx, cb, error); + GRPC_CLOSURE_SCHED(exec_ctx, cb, error); } static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, @@ -193,7 +193,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, WSABUF buffer; if (tcp->shutting_down) { - grpc_closure_sched( + GRPC_CLOSURE_SCHED( exec_ctx, cb, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "TCP socket is shutting down", &tcp->shutdown_error, 1)); @@ -220,7 +220,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, /* Did we get data immediately ? Yay. */ if (info->wsa_error != WSAEWOULDBLOCK) { info->bytes_transfered = bytes_read; - grpc_closure_sched(exec_ctx, &tcp->on_read, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, &tcp->on_read, GRPC_ERROR_NONE); return; } @@ -233,7 +233,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, int wsa_error = WSAGetLastError(); if (wsa_error != WSA_IO_PENDING) { info->wsa_error = wsa_error; - grpc_closure_sched(exec_ctx, &tcp->on_read, + GRPC_CLOSURE_SCHED(exec_ctx, &tcp->on_read, GRPC_WSA_ERROR(info->wsa_error, "WSARecv")); return; } @@ -265,7 +265,7 @@ static void on_write(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) { } TCP_UNREF(exec_ctx, tcp, "write"); - grpc_closure_sched(exec_ctx, cb, error); + GRPC_CLOSURE_SCHED(exec_ctx, cb, error); } /* Initiates a write. */ @@ -283,7 +283,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, size_t len; if (tcp->shutting_down) { - grpc_closure_sched( + GRPC_CLOSURE_SCHED( exec_ctx, cb, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "TCP socket is shutting down", &tcp->shutdown_error, 1)); @@ -317,7 +317,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_error *error = status == 0 ? GRPC_ERROR_NONE : GRPC_WSA_ERROR(info->wsa_error, "WSASend"); - grpc_closure_sched(exec_ctx, cb, error); + GRPC_CLOSURE_SCHED(exec_ctx, cb, error); if (allocated) gpr_free(allocated); return; } @@ -335,7 +335,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, int wsa_error = WSAGetLastError(); if (wsa_error != WSA_IO_PENDING) { TCP_UNREF(exec_ctx, tcp, "write"); - grpc_closure_sched(exec_ctx, cb, GRPC_WSA_ERROR(wsa_error, "WSASend")); + GRPC_CLOSURE_SCHED(exec_ctx, cb, GRPC_WSA_ERROR(wsa_error, "WSASend")); return; } } @@ -426,8 +426,8 @@ grpc_endpoint *grpc_tcp_create(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket, tcp->socket = socket; gpr_mu_init(&tcp->mu); gpr_ref_init(&tcp->refcount, 1); - grpc_closure_init(&tcp->on_read, on_read, tcp, grpc_schedule_on_exec_ctx); - grpc_closure_init(&tcp->on_write, on_write, tcp, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&tcp->on_read, on_read, tcp, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&tcp->on_write, on_write, tcp, grpc_schedule_on_exec_ctx); tcp->peer_string = gpr_strdup(peer_string); tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string); /* Tell network status tracking code about the new endpoint */ diff --git a/src/core/lib/iomgr/timer_generic.c b/src/core/lib/iomgr/timer_generic.c index 69b3cfd139..bf73d2c685 100644 --- a/src/core/lib/iomgr/timer_generic.c +++ b/src/core/lib/iomgr/timer_generic.c @@ -230,7 +230,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, if (!g_shared_mutables.initialized) { timer->pending = false; - grpc_closure_sched(exec_ctx, timer->closure, + GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Attempt to create timer before initialization")); return; @@ -240,7 +240,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, timer->pending = true; if (gpr_time_cmp(deadline, now) <= 0) { timer->pending = false; - grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_NONE); gpr_mu_unlock(&shard->mu); /* early out */ return; @@ -310,7 +310,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { timer->pending ? "true" : "false"); } if (timer->pending) { - grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED); + GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED); timer->pending = false; if (timer->heap_index == INVALID_HEAP_INDEX) { list_remove(timer); @@ -400,7 +400,7 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard, grpc_timer *timer; gpr_mu_lock(&shard->mu); while ((timer = pop_one(shard, now))) { - grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_REF(error)); n++; } *new_min_deadline = compute_min_deadline(shard); diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c index f814f0e474..4f204cfbf8 100644 --- a/src/core/lib/iomgr/timer_uv.c +++ b/src/core/lib/iomgr/timer_uv.c @@ -44,7 +44,7 @@ void run_expired_timer(uv_timer_t *handle) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GPR_ASSERT(timer->pending); timer->pending = 0; - grpc_closure_sched(&exec_ctx, timer->closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&exec_ctx, timer->closure, GRPC_ERROR_NONE); stop_uv_timer(handle); grpc_exec_ctx_finish(&exec_ctx); } @@ -57,7 +57,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, timer->closure = closure; if (gpr_time_cmp(deadline, now) <= 0) { timer->pending = 0; - grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_NONE); return; } timer->pending = 1; @@ -76,7 +76,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { if (timer->pending) { timer->pending = 0; - grpc_closure_sched(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED); + GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED); stop_uv_timer((uv_timer_t *)timer->uv_timer); } } diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c index 200a722c7e..54e7f417a7 100644 --- a/src/core/lib/iomgr/udp_server.c +++ b/src/core/lib/iomgr/udp_server.c @@ -156,7 +156,7 @@ static void dummy_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { if (s->shutdown_complete != NULL) { - grpc_closure_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE); } gpr_mu_destroy(&s->mu); @@ -201,13 +201,13 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { for (sp = s->head; sp; sp = sp->next) { grpc_unlink_if_unix_domain_socket(&sp->addr); - grpc_closure_init(&sp->destroyed_closure, destroyed_port, s, + GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s, grpc_schedule_on_exec_ctx); if (!sp->orphan_notified) { /* Call the orphan_cb to signal that the FD is about to be closed and * should no longer be used. Because at this point, all listening ports * have been shutdown already, no need to shutdown again.*/ - grpc_closure_init(&sp->orphan_fd_closure, dummy_cb, sp->emfd, + GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, dummy_cb, sp->emfd, grpc_schedule_on_exec_ctx); GPR_ASSERT(sp->orphan_cb); sp->orphan_cb(exec_ctx, sp->emfd, &sp->orphan_fd_closure, @@ -240,7 +240,7 @@ void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, struct shutdown_fd_args *args = gpr_malloc(sizeof(*args)); args->fd = sp->emfd; args->server_mu = &s->mu; - grpc_closure_init(&sp->orphan_fd_closure, shutdown_fd, args, + GRPC_CLOSURE_INIT(&sp->orphan_fd_closure, shutdown_fd, args, grpc_schedule_on_exec_ctx); sp->orphan_cb(exec_ctx, sp->emfd, &sp->orphan_fd_closure, sp->server->user_data); @@ -525,11 +525,11 @@ void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s, for (i = 0; i < pollset_count; i++) { grpc_pollset_add_fd(exec_ctx, pollsets[i], sp->emfd); } - grpc_closure_init(&sp->read_closure, on_read, sp, + GRPC_CLOSURE_INIT(&sp->read_closure, on_read, sp, grpc_schedule_on_exec_ctx); grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure); - grpc_closure_init(&sp->write_closure, on_write, sp, + GRPC_CLOSURE_INIT(&sp->write_closure, on_write, sp, grpc_schedule_on_exec_ctx); grpc_fd_notify_on_write(exec_ctx, sp->emfd, &sp->write_closure); diff --git a/src/core/lib/security/credentials/fake/fake_credentials.c b/src/core/lib/security/credentials/fake/fake_credentials.c index 15288e1dbe..3cbb399429 100644 --- a/src/core/lib/security/credentials/fake/fake_credentials.c +++ b/src/core/lib/security/credentials/fake/fake_credentials.c @@ -123,8 +123,8 @@ static void md_only_test_get_request_metadata( if (c->is_async) { grpc_credentials_metadata_request *cb_arg = grpc_credentials_metadata_request_create(creds, cb, user_data); - grpc_closure_sched(exec_ctx, - grpc_closure_create(on_simulated_token_fetch_done, + GRPC_CLOSURE_SCHED(exec_ctx, + GRPC_CLOSURE_CREATE(on_simulated_token_fetch_done, cb_arg, grpc_executor_scheduler), GRPC_ERROR_NONE); } else { diff --git a/src/core/lib/security/credentials/google_default/google_default_credentials.c b/src/core/lib/security/credentials/google_default/google_default_credentials.c index aaa7d97d3f..a2a8e289ee 100644 --- a/src/core/lib/security/credentials/google_default/google_default_credentials.c +++ b/src/core/lib/security/credentials/google_default/google_default_credentials.c @@ -115,7 +115,7 @@ static int is_stack_running_on_compute_engine(grpc_exec_ctx *exec_ctx) { grpc_httpcli_get( exec_ctx, &context, &detector.pollent, resource_quota, &request, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), max_detection_delay), - grpc_closure_create(on_compute_engine_detection_http_response, &detector, + GRPC_CLOSURE_CREATE(on_compute_engine_detection_http_response, &detector, grpc_schedule_on_exec_ctx), &detector.response); grpc_resource_quota_unref_internal(exec_ctx, resource_quota); @@ -140,7 +140,7 @@ static int is_stack_running_on_compute_engine(grpc_exec_ctx *exec_ctx) { gpr_mu_unlock(g_polling_mu); grpc_httpcli_context_destroy(exec_ctx, &context); - grpc_closure_init(&destroy_closure, destroy_pollset, + GRPC_CLOSURE_INIT(&destroy_closure, destroy_pollset, grpc_polling_entity_pollset(&detector.pollent), grpc_schedule_on_exec_ctx); grpc_pollset_shutdown(exec_ctx, diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.c b/src/core/lib/security/credentials/jwt/jwt_verifier.c index f6db670a67..8c747085bb 100644 --- a/src/core/lib/security/credentials/jwt/jwt_verifier.c +++ b/src/core/lib/security/credentials/jwt/jwt_verifier.c @@ -668,7 +668,7 @@ static void on_openid_config_retrieved(grpc_exec_ctx *exec_ctx, void *user_data, grpc_httpcli_get( exec_ctx, &ctx->verifier->http_ctx, &ctx->pollent, resource_quota, &req, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay), - grpc_closure_create(on_keys_retrieved, ctx, grpc_schedule_on_exec_ctx), + GRPC_CLOSURE_CREATE(on_keys_retrieved, ctx, grpc_schedule_on_exec_ctx), &ctx->responses[HTTP_RESPONSE_KEYS]); grpc_resource_quota_unref_internal(exec_ctx, resource_quota); grpc_json_destroy(json); @@ -771,7 +771,7 @@ static void retrieve_key_and_verify(grpc_exec_ctx *exec_ctx, gpr_asprintf(&req.http.path, "/%s/%s", path_prefix, iss); } http_cb = - grpc_closure_create(on_keys_retrieved, ctx, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_CREATE(on_keys_retrieved, ctx, grpc_schedule_on_exec_ctx); rsp_idx = HTTP_RESPONSE_KEYS; } else { req.host = gpr_strdup(strstr(iss, "https://") == iss ? iss + 8 : iss); @@ -783,7 +783,7 @@ static void retrieve_key_and_verify(grpc_exec_ctx *exec_ctx, gpr_asprintf(&req.http.path, "/%s%s", path_prefix, GRPC_OPENID_CONFIG_URL_SUFFIX); } - http_cb = grpc_closure_create(on_openid_config_retrieved, ctx, + http_cb = GRPC_CLOSURE_CREATE(on_openid_config_retrieved, ctx, grpc_schedule_on_exec_ctx); rsp_idx = HTTP_RESPONSE_OPENID; } diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c index acf4c37da8..9de561b310 100644 --- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c +++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c @@ -300,7 +300,7 @@ static void compute_engine_fetch_oauth2( grpc_resource_quota_create("oauth2_credentials"); grpc_httpcli_get( exec_ctx, httpcli_context, pollent, resource_quota, &request, deadline, - grpc_closure_create(response_cb, metadata_req, grpc_schedule_on_exec_ctx), + GRPC_CLOSURE_CREATE(response_cb, metadata_req, grpc_schedule_on_exec_ctx), &metadata_req->response); grpc_resource_quota_unref_internal(exec_ctx, resource_quota); @@ -360,7 +360,7 @@ static void refresh_token_fetch_oauth2( grpc_httpcli_post( exec_ctx, httpcli_context, pollent, resource_quota, &request, body, strlen(body), deadline, - grpc_closure_create(response_cb, metadata_req, grpc_schedule_on_exec_ctx), + GRPC_CLOSURE_CREATE(response_cb, metadata_req, grpc_schedule_on_exec_ctx), &metadata_req->response); grpc_resource_quota_unref_internal(exec_ctx, resource_quota); gpr_free(body); diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c index 643e057229..140cf294ae 100644 --- a/src/core/lib/security/transport/secure_endpoint.c +++ b/src/core/lib/security/transport/secure_endpoint.c @@ -132,7 +132,7 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, secure_endpoint *ep, } } ep->read_buffer = NULL; - grpc_closure_sched(exec_ctx, ep->read_cb, error); + GRPC_CLOSURE_SCHED(exec_ctx, ep->read_cb, error); SECURE_ENDPOINT_UNREF(exec_ctx, ep, "read"); } @@ -317,7 +317,7 @@ static void endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep, if (result != TSI_OK) { /* TODO(yangg) do different things according to the error type? */ grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &ep->output_buffer); - grpc_closure_sched( + GRPC_CLOSURE_SCHED( exec_ctx, cb, grpc_set_tsi_error_result( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Wrap failed"), result)); @@ -399,7 +399,7 @@ grpc_endpoint *grpc_secure_endpoint_create( grpc_slice_buffer_init(&ep->output_buffer); grpc_slice_buffer_init(&ep->source_buffer); ep->read_buffer = NULL; - grpc_closure_init(&ep->on_read, on_read, ep, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&ep->on_read, on_read, ep, grpc_schedule_on_exec_ctx); gpr_mu_init(&ep->protector_mu); gpr_ref_init(&ep->ref, 1); return &ep->base; diff --git a/src/core/lib/security/transport/security_connector.c b/src/core/lib/security/transport/security_connector.c index 519e2538a1..30a74302e1 100644 --- a/src/core/lib/security/transport/security_connector.c +++ b/src/core/lib/security/transport/security_connector.c @@ -122,7 +122,7 @@ void grpc_security_connector_check_peer(grpc_exec_ctx *exec_ctx, grpc_auth_context **auth_context, grpc_closure *on_peer_checked) { if (sc == NULL) { - grpc_closure_sched(exec_ctx, on_peer_checked, + GRPC_CLOSURE_SCHED(exec_ctx, on_peer_checked, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "cannot check peer -- no security connector")); tsi_peer_destruct(&peer); @@ -340,7 +340,7 @@ static void fake_check_peer(grpc_exec_ctx *exec_ctx, *auth_context, GRPC_TRANSPORT_SECURITY_TYPE_PROPERTY_NAME, GRPC_FAKE_TRANSPORT_SECURITY_TYPE); end: - grpc_closure_sched(exec_ctx, on_peer_checked, error); + GRPC_CLOSURE_SCHED(exec_ctx, on_peer_checked, error); tsi_peer_destruct(&peer); } @@ -602,7 +602,7 @@ static void ssl_channel_check_peer(grpc_exec_ctx *exec_ctx, ? c->overridden_target_name : c->target_name, &peer, auth_context); - grpc_closure_sched(exec_ctx, on_peer_checked, error); + GRPC_CLOSURE_SCHED(exec_ctx, on_peer_checked, error); tsi_peer_destruct(&peer); } @@ -612,7 +612,7 @@ static void ssl_server_check_peer(grpc_exec_ctx *exec_ctx, grpc_closure *on_peer_checked) { grpc_error *error = ssl_check_peer(sc, NULL, &peer, auth_context); tsi_peer_destruct(&peer); - grpc_closure_sched(exec_ctx, on_peer_checked, error); + GRPC_CLOSURE_SCHED(exec_ctx, on_peer_checked, error); } static void add_shallow_auth_property_to_peer(tsi_peer *peer, diff --git a/src/core/lib/security/transport/security_handshaker.c b/src/core/lib/security/transport/security_handshaker.c index f39d10cd81..239a211c0b 100644 --- a/src/core/lib/security/transport/security_handshaker.c +++ b/src/core/lib/security/transport/security_handshaker.c @@ -124,7 +124,7 @@ static void security_handshake_failed_locked(grpc_exec_ctx *exec_ctx, h->shutdown = true; } // Invoke callback. - grpc_closure_sched(exec_ctx, h->on_handshake_done, error); + GRPC_CLOSURE_SCHED(exec_ctx, h->on_handshake_done, error); } static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg, @@ -173,7 +173,7 @@ static void on_peer_checked(grpc_exec_ctx *exec_ctx, void *arg, grpc_channel_args_copy_and_add(tmp_args, &auth_context_arg, 1); grpc_channel_args_destroy(exec_ctx, tmp_args); // Invoke callback. - grpc_closure_sched(exec_ctx, h->on_handshake_done, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, h->on_handshake_done, GRPC_ERROR_NONE); // Set shutdown to true so that subsequent calls to // security_handshaker_shutdown() do nothing. h->shutdown = true; @@ -408,13 +408,13 @@ static grpc_handshaker *security_handshaker_create( gpr_ref_init(&h->refs, 1); h->handshake_buffer_size = GRPC_INITIAL_HANDSHAKE_BUFFER_SIZE; h->handshake_buffer = gpr_malloc(h->handshake_buffer_size); - grpc_closure_init(&h->on_handshake_data_sent_to_peer, + GRPC_CLOSURE_INIT(&h->on_handshake_data_sent_to_peer, on_handshake_data_sent_to_peer, h, grpc_schedule_on_exec_ctx); - grpc_closure_init(&h->on_handshake_data_received_from_peer, + GRPC_CLOSURE_INIT(&h->on_handshake_data_received_from_peer, on_handshake_data_received_from_peer, h, grpc_schedule_on_exec_ctx); - grpc_closure_init(&h->on_peer_checked, on_peer_checked, h, + GRPC_CLOSURE_INIT(&h->on_peer_checked, on_peer_checked, h, grpc_schedule_on_exec_ctx); grpc_slice_buffer_init(&h->outgoing); return &h->base; @@ -440,7 +440,7 @@ static void fail_handshaker_do_handshake(grpc_exec_ctx *exec_ctx, grpc_tcp_server_acceptor *acceptor, grpc_closure *on_handshake_done, grpc_handshaker_args *args) { - grpc_closure_sched(exec_ctx, on_handshake_done, + GRPC_CLOSURE_SCHED(exec_ctx, on_handshake_done, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Failed to create security handshaker")); } diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c index eb7b635098..4e6914be7b 100644 --- a/src/core/lib/security/transport/server_auth_filter.c +++ b/src/core/lib/security/transport/server_auth_filter.c @@ -113,7 +113,7 @@ static void on_md_processing_done( grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].value); } grpc_metadata_array_destroy(&calld->md); - grpc_closure_sched(&exec_ctx, calld->on_done_recv, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(&exec_ctx, calld->on_done_recv, GRPC_ERROR_NONE); } else { for (size_t i = 0; i < calld->md.count; i++) { grpc_slice_unref_internal(&exec_ctx, calld->md.metadata[i].key); @@ -128,7 +128,7 @@ static void on_md_processing_done( &exec_ctx, calld->transport_op->payload->send_message.send_message); calld->transport_op->payload->send_message.send_message = NULL; } - grpc_closure_sched( + GRPC_CLOSURE_SCHED( &exec_ctx, calld->on_done_recv, grpc_error_set_int(GRPC_ERROR_CREATE_FROM_COPIED_STRING(error_details), GRPC_ERROR_INT_GRPC_STATUS, status)); @@ -151,7 +151,7 @@ static void auth_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, return; } } - grpc_closure_sched(exec_ctx, calld->on_done_recv, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(exec_ctx, calld->on_done_recv, GRPC_ERROR_REF(error)); } static void set_recv_ops_md_callbacks(grpc_call_element *elem, @@ -193,7 +193,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, /* initialize members */ memset(calld, 0, sizeof(*calld)); - grpc_closure_init(&calld->auth_on_recv, auth_on_recv, elem, + GRPC_CLOSURE_INIT(&calld->auth_on_recv, auth_on_recv, elem, grpc_schedule_on_exec_ctx); if (args->context[GRPC_CONTEXT_SECURITY].value != NULL) { diff --git a/src/core/lib/surface/alarm.c b/src/core/lib/surface/alarm.c index 4cd73a3f20..ef8405cca8 100644 --- a/src/core/lib/surface/alarm.c +++ b/src/core/lib/surface/alarm.c @@ -50,7 +50,7 @@ grpc_alarm *grpc_alarm_create(grpc_completion_queue *cq, gpr_timespec deadline, alarm->tag = tag; grpc_cq_begin_op(cq, tag); - grpc_closure_init(&alarm->on_alarm, alarm_cb, alarm, + GRPC_CLOSURE_INIT(&alarm->on_alarm, alarm_cb, alarm, grpc_schedule_on_exec_ctx); grpc_timer_init(&exec_ctx, &alarm->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index fd4ed726b8..b499219e17 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -520,7 +520,7 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, } grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c), &c->final_info, - grpc_closure_init(&c->release_call, release_call, c, + GRPC_CLOSURE_INIT(&c->release_call, release_call, c, grpc_schedule_on_exec_ctx)); GPR_TIMER_END("destroy_call", 0); } @@ -634,7 +634,7 @@ static void cancel_with_error(grpc_exec_ctx *exec_ctx, grpc_call *c, GRPC_CALL_INTERNAL_REF(c, "termination"); set_status_from_error(exec_ctx, c, source, GRPC_ERROR_REF(error)); grpc_transport_stream_op_batch *op = grpc_make_transport_stream_op( - grpc_closure_create(done_termination, c, grpc_schedule_on_exec_ctx)); + GRPC_CLOSURE_CREATE(done_termination, c, grpc_schedule_on_exec_ctx)); op->cancel_stream = true; op->payload->cancel_stream.cancel_error = error; execute_op(exec_ctx, c, op); @@ -1170,7 +1170,7 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, if (bctl->completion_data.notify_tag.is_closure) { /* unrefs bctl->error */ bctl->call = NULL; - grpc_closure_run(exec_ctx, bctl->completion_data.notify_tag.tag, error); + GRPC_CLOSURE_RUN(exec_ctx, bctl->completion_data.notify_tag.tag, error); GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion"); } else { /* unrefs bctl->error */ @@ -1275,7 +1275,7 @@ static void process_data_after_md(grpc_exec_ctx *exec_ctx, } else { *call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0); } - grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready, bctl, + GRPC_CLOSURE_INIT(&call->receiving_slice_ready, receiving_slice_ready, bctl, grpc_schedule_on_exec_ctx); continue_receiving_slices(exec_ctx, bctl); } @@ -1390,11 +1390,11 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, call->has_initial_md_been_received = true; if (call->saved_receiving_stream_ready_bctlp != NULL) { - grpc_closure *saved_rsr_closure = grpc_closure_create( + grpc_closure *saved_rsr_closure = GRPC_CLOSURE_CREATE( receiving_stream_ready, call->saved_receiving_stream_ready_bctlp, grpc_schedule_on_exec_ctx); call->saved_receiving_stream_ready_bctlp = NULL; - grpc_closure_run(exec_ctx, saved_rsr_closure, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_RUN(exec_ctx, saved_rsr_closure, GRPC_ERROR_REF(error)); } finish_batch_step(exec_ctx, bctl); @@ -1436,7 +1436,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, free_no_op_completion, NULL, gpr_malloc(sizeof(grpc_cq_completion))); } else { - grpc_closure_sched(exec_ctx, notify_tag, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, notify_tag, GRPC_ERROR_NONE); } error = GRPC_CALL_OK; goto done; @@ -1644,7 +1644,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, call->received_initial_metadata = true; call->buffered_metadata[0] = op->data.recv_initial_metadata.recv_initial_metadata; - grpc_closure_init(&call->receiving_initial_metadata_ready, + GRPC_CLOSURE_INIT(&call->receiving_initial_metadata_ready, receiving_initial_metadata_ready, bctl, grpc_schedule_on_exec_ctx); stream_op->recv_initial_metadata = true; @@ -1668,7 +1668,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, stream_op->recv_message = true; call->receiving_buffer = op->data.recv_message.recv_message; stream_op_payload->recv_message.recv_message = &call->receiving_stream; - grpc_closure_init(&call->receiving_stream_ready, receiving_stream_ready, + GRPC_CLOSURE_INIT(&call->receiving_stream_ready, receiving_stream_ready, bctl, grpc_schedule_on_exec_ctx); stream_op_payload->recv_message.recv_message_ready = &call->receiving_stream_ready; @@ -1734,7 +1734,7 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, } gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed); - grpc_closure_init(&bctl->finish_batch, finish_batch, bctl, + GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl, grpc_schedule_on_exec_ctx); stream_op->on_complete = &bctl->finish_batch; gpr_atm_rel_store(&call->any_ops_sent_atm, 1); diff --git a/src/core/lib/surface/channel_ping.c b/src/core/lib/surface/channel_ping.c index 4de3a8af64..80eb80af78 100644 --- a/src/core/lib/surface/channel_ping.c +++ b/src/core/lib/surface/channel_ping.c @@ -56,7 +56,7 @@ void grpc_channel_ping(grpc_channel *channel, grpc_completion_queue *cq, GPR_ASSERT(reserved == NULL); pr->tag = tag; pr->cq = cq; - grpc_closure_init(&pr->closure, ping_done, pr, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&pr->closure, ping_done, pr, grpc_schedule_on_exec_ctx); op->send_ping = &pr->closure; op->bind_pollset = grpc_cq_pollset(cq); grpc_cq_begin_op(cq, tag); diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 07288053c8..1a5c721214 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -113,7 +113,7 @@ static grpc_error *non_polling_poller_work(grpc_exec_ctx *exec_ctx, npp->root = w.next; if (&w == npp->root) { if (npp->shutdown) { - grpc_closure_sched(exec_ctx, npp->shutdown, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, npp->shutdown, GRPC_ERROR_NONE); } npp->root = NULL; } @@ -146,7 +146,7 @@ static void non_polling_poller_shutdown(grpc_exec_ctx *exec_ctx, GPR_ASSERT(closure != NULL); p->shutdown = closure; if (p->root == NULL) { - grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, closure, GRPC_ERROR_NONE); } else { non_polling_worker *w = p->root; do { @@ -417,7 +417,7 @@ grpc_completion_queue *grpc_completion_queue_create_internal( cqd->outstanding_tag_count = 0; #endif cq_event_queue_init(&cqd->queue); - grpc_closure_init(&cqd->pollset_shutdown_done, on_pollset_shutdown_done, cc, + GRPC_CLOSURE_INIT(&cqd->pollset_shutdown_done, on_pollset_shutdown_done, cc, grpc_schedule_on_exec_ctx); GPR_TIMER_END("grpc_completion_queue_create_internal", 0); diff --git a/src/core/lib/surface/lame_client.cc b/src/core/lib/surface/lame_client.cc index c9f498ce6a..a0791080a9 100644 --- a/src/core/lib/surface/lame_client.cc +++ b/src/core/lib/surface/lame_client.cc @@ -105,17 +105,17 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, if (op->on_connectivity_state_change) { GPR_ASSERT(*op->connectivity_state != GRPC_CHANNEL_SHUTDOWN); *op->connectivity_state = GRPC_CHANNEL_SHUTDOWN; - grpc_closure_sched(exec_ctx, op->on_connectivity_state_change, + GRPC_CLOSURE_SCHED(exec_ctx, op->on_connectivity_state_change, GRPC_ERROR_NONE); } if (op->send_ping != NULL) { - grpc_closure_sched( + GRPC_CLOSURE_SCHED( exec_ctx, op->send_ping, GRPC_ERROR_CREATE_FROM_STATIC_STRING("lame client channel")); } GRPC_ERROR_UNREF(op->disconnect_with_error); if (op->on_consumed != NULL) { - grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); } } @@ -128,7 +128,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, grpc_closure *then_schedule_closure) { - grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); } static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 6cccd0d634..8a2616b027 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -269,7 +269,7 @@ static void shutdown_cleanup(grpc_exec_ctx *exec_ctx, void *arg, static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel, bool send_goaway, grpc_error *send_disconnect) { struct shutdown_cleanup_args *sc = gpr_malloc(sizeof(*sc)); - grpc_closure_init(&sc->closure, shutdown_cleanup, sc, + GRPC_CLOSURE_INIT(&sc->closure, shutdown_cleanup, sc, grpc_schedule_on_exec_ctx); grpc_transport_op *op = grpc_make_transport_op(&sc->closure); grpc_channel_element *elem; @@ -337,11 +337,11 @@ static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&calld->mu_state); calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); - grpc_closure_init( + GRPC_CLOSURE_INIT( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), grpc_schedule_on_exec_ctx); - grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE); } } @@ -432,7 +432,7 @@ static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand, orphan_channel(chand); server_ref(chand->server); maybe_finish_shutdown(exec_ctx, chand->server); - grpc_closure_init(&chand->finish_destroy_channel_closure, + GRPC_CLOSURE_INIT(&chand->finish_destroy_channel_closure, finish_destroy_channel, chand, grpc_schedule_on_exec_ctx); if (GRPC_TRACER_ON(grpc_server_channel_trace) && error != GRPC_ERROR_NONE) { @@ -497,11 +497,11 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_lock(&calld->mu_state); calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); - grpc_closure_init( + GRPC_CLOSURE_INIT( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), grpc_schedule_on_exec_ctx); - grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, + GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_REF(error)); return; } @@ -546,9 +546,9 @@ static void finish_start_new_rpc( gpr_mu_lock(&calld->mu_state); calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); - grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem, + GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem, grpc_schedule_on_exec_ctx); - grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE); return; } @@ -563,7 +563,7 @@ static void finish_start_new_rpc( memset(&op, 0, sizeof(op)); op.op = GRPC_OP_RECV_MESSAGE; op.data.recv_message.recv_message = &calld->payload; - grpc_closure_init(&calld->publish, publish_new_rpc, elem, + GRPC_CLOSURE_INIT(&calld->publish, publish_new_rpc, elem, grpc_schedule_on_exec_ctx); grpc_call_start_batch_and_execute(exec_ctx, calld->call, &op, 1, &calld->publish); @@ -740,7 +740,7 @@ static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, GRPC_ERROR_UNREF(src_error); } - grpc_closure_run(exec_ctx, calld->on_done_recv_initial_metadata, error); + GRPC_CLOSURE_RUN(exec_ctx, calld->on_done_recv_initial_metadata, error); } static void server_mutate_op(grpc_call_element *elem, @@ -779,9 +779,9 @@ static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, if (calld->state == NOT_STARTED) { calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); - grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem, + GRPC_CLOSURE_INIT(&calld->kill_zombie_closure, kill_zombie, elem, grpc_schedule_on_exec_ctx); - grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, + GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE); } else if (calld->state == PENDING) { calld->state = ZOMBIED; @@ -819,7 +819,7 @@ static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd, op.op = GRPC_OP_RECV_INITIAL_METADATA; op.data.recv_initial_metadata.recv_initial_metadata = &calld->initial_metadata; - grpc_closure_init(&calld->got_initial_metadata, got_initial_metadata, elem, + GRPC_CLOSURE_INIT(&calld->got_initial_metadata, got_initial_metadata, elem, grpc_schedule_on_exec_ctx); grpc_call_start_batch_and_execute(exec_ctx, call, &op, 1, &calld->got_initial_metadata); @@ -855,7 +855,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, calld->call = grpc_call_from_top_element(elem); gpr_mu_init(&calld->mu_state); - grpc_closure_init(&calld->server_on_recv_initial_metadata, + GRPC_CLOSURE_INIT(&calld->server_on_recv_initial_metadata, server_on_recv_initial_metadata, elem, grpc_schedule_on_exec_ctx); @@ -895,7 +895,7 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, chand->next = chand->prev = chand; chand->registered_methods = NULL; chand->connectivity_state = GRPC_CHANNEL_IDLE; - grpc_closure_init(&chand->channel_connectivity_changed, + GRPC_CLOSURE_INIT(&chand->channel_connectivity_changed, channel_connectivity_changed, chand, grpc_schedule_on_exec_ctx); return GRPC_ERROR_NONE; @@ -1075,7 +1075,7 @@ void grpc_server_start(grpc_server *server) { server_ref(server); server->starting = true; - grpc_closure_sched(&exec_ctx, grpc_closure_create(start_listeners, server, + GRPC_CLOSURE_SCHED(&exec_ctx, GRPC_CLOSURE_CREATE(start_listeners, server, grpc_executor_scheduler), GRPC_ERROR_NONE); @@ -1255,7 +1255,7 @@ void grpc_server_shutdown_and_notify(grpc_server *server, /* Shutdown listeners */ for (l = server->listeners; l; l = l->next) { - grpc_closure_init(&l->destroy_done, listener_destroy_done, server, + GRPC_CLOSURE_INIT(&l->destroy_done, listener_destroy_done, server, grpc_schedule_on_exec_ctx); l->destroy(&exec_ctx, server, l->arg, &l->destroy_done); } @@ -1349,11 +1349,11 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&calld->mu_state); if (calld->state == ZOMBIED) { gpr_mu_unlock(&calld->mu_state); - grpc_closure_init( + GRPC_CLOSURE_INIT( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0), grpc_schedule_on_exec_ctx); - grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, + GRPC_CLOSURE_SCHED(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE); } else { GPR_ASSERT(calld->state == PENDING); diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c index f1bbfc082a..6fe40af3b2 100644 --- a/src/core/lib/transport/connectivity_state.c +++ b/src/core/lib/transport/connectivity_state.c @@ -67,7 +67,7 @@ void grpc_connectivity_state_destroy(grpc_exec_ctx *exec_ctx, error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutdown connectivity owner"); } - grpc_closure_sched(exec_ctx, w->notify, error); + GRPC_CLOSURE_SCHED(exec_ctx, w->notify, error); gpr_free(w); } GRPC_ERROR_UNREF(tracker->current_error); @@ -125,7 +125,7 @@ bool grpc_connectivity_state_notify_on_state_change( if (current == NULL) { grpc_connectivity_state_watcher *w = tracker->watchers; if (w != NULL && w->notify == notify) { - grpc_closure_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED); + GRPC_CLOSURE_SCHED(exec_ctx, notify, GRPC_ERROR_CANCELLED); tracker->watchers = w->next; gpr_free(w); return false; @@ -133,7 +133,7 @@ bool grpc_connectivity_state_notify_on_state_change( while (w != NULL) { grpc_connectivity_state_watcher *rm_candidate = w->next; if (rm_candidate != NULL && rm_candidate->notify == notify) { - grpc_closure_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED); + GRPC_CLOSURE_SCHED(exec_ctx, notify, GRPC_ERROR_CANCELLED); w->next = w->next->next; gpr_free(rm_candidate); return false; @@ -144,7 +144,7 @@ bool grpc_connectivity_state_notify_on_state_change( } else { if (cur != *current) { *current = cur; - grpc_closure_sched(exec_ctx, notify, + GRPC_CLOSURE_SCHED(exec_ctx, notify, GRPC_ERROR_REF(tracker->current_error)); } else { grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); @@ -197,7 +197,7 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, gpr_log(GPR_DEBUG, "NOTIFY: %p %s: %p", tracker, tracker->name, w->notify); } - grpc_closure_sched(exec_ctx, w->notify, + GRPC_CLOSURE_SCHED(exec_ctx, w->notify, GRPC_ERROR_REF(tracker->current_error)); gpr_free(w); } diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index abc289ccd9..c2afedec58 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -65,7 +65,7 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx, there. */ refcount->destroy.scheduler = grpc_executor_scheduler; } - grpc_closure_sched(exec_ctx, &refcount->destroy, GRPC_ERROR_NONE); + GRPC_CLOSURE_SCHED(exec_ctx, &refcount->destroy, GRPC_ERROR_NONE); } } @@ -112,7 +112,7 @@ void grpc_stream_ref_init(grpc_stream_refcount *refcount, int initial_refs, grpc_iomgr_cb_func cb, void *cb_arg) { #endif gpr_ref_init(&refcount->refs, initial_refs); - grpc_closure_init(&refcount->destroy, cb, cb_arg, grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&refcount->destroy, cb, cb_arg, grpc_schedule_on_exec_ctx); refcount->slice_refcount.vtable = &stream_ref_slice_vtable; refcount->slice_refcount.sub_refcount = &refcount->slice_refcount; } @@ -202,16 +202,16 @@ void grpc_transport_stream_op_batch_finish_with_failure( grpc_exec_ctx *exec_ctx, grpc_transport_stream_op_batch *op, grpc_error *error) { if (op->recv_message) { - grpc_closure_sched(exec_ctx, op->payload->recv_message.recv_message_ready, + GRPC_CLOSURE_SCHED(exec_ctx, op->payload->recv_message.recv_message_ready, GRPC_ERROR_REF(error)); } if (op->recv_initial_metadata) { - grpc_closure_sched( + GRPC_CLOSURE_SCHED( exec_ctx, op->payload->recv_initial_metadata.recv_initial_metadata_ready, GRPC_ERROR_REF(error)); } - grpc_closure_sched(exec_ctx, op->on_complete, error); + GRPC_CLOSURE_SCHED(exec_ctx, op->on_complete, error); if (op->cancel_stream) { GRPC_ERROR_UNREF(op->payload->cancel_stream.cancel_error); } @@ -226,13 +226,13 @@ typedef struct { static void destroy_made_transport_op(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { made_transport_op *op = arg; - grpc_closure_sched(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_SCHED(exec_ctx, op->inner_on_complete, GRPC_ERROR_REF(error)); gpr_free(op); } grpc_transport_op *grpc_make_transport_op(grpc_closure *on_complete) { made_transport_op *op = gpr_malloc(sizeof(*op)); - grpc_closure_init(&op->outer_on_complete, destroy_made_transport_op, op, + GRPC_CLOSURE_INIT(&op->outer_on_complete, destroy_made_transport_op, op, grpc_schedule_on_exec_ctx); op->inner_on_complete = on_complete; memset(&op->op, 0, sizeof(op->op)); @@ -252,14 +252,14 @@ static void destroy_made_transport_stream_op(grpc_exec_ctx *exec_ctx, void *arg, made_transport_stream_op *op = arg; grpc_closure *c = op->inner_on_complete; gpr_free(op); - grpc_closure_run(exec_ctx, c, GRPC_ERROR_REF(error)); + GRPC_CLOSURE_RUN(exec_ctx, c, GRPC_ERROR_REF(error)); } grpc_transport_stream_op_batch *grpc_make_transport_stream_op( grpc_closure *on_complete) { made_transport_stream_op *op = gpr_zalloc(sizeof(*op)); op->op.payload = &op->payload; - grpc_closure_init(&op->outer_on_complete, destroy_made_transport_stream_op, + GRPC_CLOSURE_INIT(&op->outer_on_complete, destroy_made_transport_stream_op, op, grpc_schedule_on_exec_ctx); op->inner_on_complete = on_complete; op->op.on_complete = &op->outer_on_complete; |