From c027e77d6408c7ae9b1889f64e2629fa66872f3d Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 3 May 2016 16:27:00 -0700 Subject: Progress --- src/core/lib/iomgr/workqueue_posix.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'src/core/lib/iomgr/workqueue_posix.c') diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c index 80e7a0b206..e27525dd50 100644 --- a/src/core/lib/iomgr/workqueue_posix.c +++ b/src/core/lib/iomgr/workqueue_posix.c @@ -45,7 +45,7 @@ #include "src/core/lib/iomgr/ev_posix.h" -static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success); +static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); grpc_workqueue *grpc_workqueue_create(grpc_exec_ctx *exec_ctx) { char name[32]; @@ -110,10 +110,10 @@ void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { gpr_mu_unlock(&workqueue->mu); } -static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success) { +static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_workqueue *workqueue = arg; - if (!success) { + if (error != GRPC_ERROR_NONE) { gpr_mu_destroy(&workqueue->mu); /* HACK: let wakeup_fd code know that we stole the fd */ workqueue->wakeup_fd.read_fd = 0; @@ -131,12 +131,12 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success) { } void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_closure *closure, - int success) { + grpc_error *error) { gpr_mu_lock(&workqueue->mu); if (grpc_closure_list_empty(workqueue->closure_list)) { grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); } - grpc_closure_list_add(&workqueue->closure_list, closure, success); + grpc_closure_list_append(&workqueue->closure_list, closure, error); gpr_mu_unlock(&workqueue->mu); } -- cgit v1.2.3 From 4f1d0f337b15748d171ec21dddba1f6d80e94778 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Fri, 6 May 2016 17:12:37 -0700 Subject: Error reporting progress --- src/core/lib/iomgr/error.c | 10 ++ src/core/lib/iomgr/error.h | 6 + src/core/lib/iomgr/ev_poll_and_epoll_posix.c | 131 ++++++++++++++------- src/core/lib/iomgr/ev_posix.c | 16 +-- src/core/lib/iomgr/ev_posix.h | 12 +- src/core/lib/iomgr/pollset.h | 11 +- src/core/lib/iomgr/tcp_client_posix.c | 15 ++- src/core/lib/iomgr/wakeup_fd_pipe.c | 22 ++-- src/core/lib/iomgr/wakeup_fd_posix.c | 12 +- src/core/lib/iomgr/wakeup_fd_posix.h | 15 ++- src/core/lib/iomgr/workqueue.h | 10 +- src/core/lib/iomgr/workqueue_posix.c | 62 ++++++---- .../google_default/google_default_credentials.c | 13 +- src/core/lib/surface/completion_queue.c | 38 +++++- test/core/iomgr/endpoint_tests.c | 10 +- test/core/security/oauth2_utils.c | 12 +- 16 files changed, 266 insertions(+), 129 deletions(-) (limited to 'src/core/lib/iomgr/workqueue_posix.c') diff --git a/src/core/lib/iomgr/error.c b/src/core/lib/iomgr/error.c index d5587a4194..bc50d6870f 100644 --- a/src/core/lib/iomgr/error.c +++ b/src/core/lib/iomgr/error.c @@ -469,3 +469,13 @@ grpc_error *grpc_os_error(const char *file, int line, int err, GRPC_ERROR_STR_OS_ERROR, strerror(err)), GRPC_ERROR_STR_SYSCALL, call_name); } + +bool grpc_log_if_error(const char *what, grpc_error *error, const char *file, + int line) { + if (error == GRPC_ERROR_NONE) return true; + const char *msg = grpc_error_string(error); + gpr_log(file, line, GPR_LOG_SEVERITY_ERROR, "%s: %s", what, msg); + grpc_error_free_string(msg); + GRPC_ERROR_UNREF(error); + return false; +} diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h index 98661a6a2c..2b56aede75 100644 --- a/src/core/lib/iomgr/error.h +++ b/src/core/lib/iomgr/error.h @@ -34,6 +34,7 @@ #ifndef GRPC_CORE_LIB_IOMGR_ERROR_H #define GRPC_CORE_LIB_IOMGR_ERROR_H +#include #include #include @@ -114,4 +115,9 @@ grpc_error *grpc_os_error(const char *file, int line, int err, #define GRPC_OS_ERROR(err, call_name) \ grpc_os_error(__FILE__, __LINE__, err, call_name) +bool grpc_log_if_error(const char *what, grpc_error *error, const char *file, + int line); +#define GRPC_LOG_IF_ERROR(what, error) \ + grpc_log_if_error((what), (error), __FILE__, __LINE__) + #endif /* GRPC_CORE_LIB_IOMGR_ERROR_H */ diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c index d11c947df2..288a44103b 100644 --- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c +++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c @@ -217,9 +217,10 @@ struct grpc_pollset { struct grpc_pollset_vtable { void (*add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, struct grpc_fd *fd, int and_unlock_pollset); - void (*maybe_work_and_unlock)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker *worker, - gpr_timespec deadline, gpr_timespec now); + grpc_error *(*maybe_work_and_unlock)(grpc_exec_ctx *exec_ctx, + grpc_pollset *pollset, + grpc_pollset_worker *worker, + gpr_timespec deadline, gpr_timespec now); void (*finish_shutdown)(grpc_pollset *pollset); void (*destroy)(grpc_pollset *pollset); }; @@ -247,9 +248,9 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline, #define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2 /* As per pollset_kick, with an extended set of flags (defined above) -- mostly for fd_posix's use. */ -static void pollset_kick_ext(grpc_pollset *p, - grpc_pollset_worker *specific_worker, - uint32_t flags); +static grpc_error *pollset_kick_ext(grpc_pollset *p, + grpc_pollset_worker *specific_worker, + uint32_t flags) GRPC_MUST_USE_RESULT; /* turn a pollset into a multipoller: platform specific */ typedef void (*platform_become_multipoller_type)(grpc_exec_ctx *exec_ctx, @@ -415,12 +416,13 @@ static bool fd_is_orphaned(grpc_fd *fd) { return (gpr_atm_acq_load(&fd->refst) & 1) == 0; } -static void pollset_kick_locked(grpc_fd_watcher *watcher) { +static grpc_error *pollset_kick_locked(grpc_fd_watcher *watcher) { gpr_mu_lock(&watcher->pollset->mu); GPR_ASSERT(watcher->worker); - pollset_kick_ext(watcher->pollset, watcher->worker, - GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP); + grpc_error *err = pollset_kick_ext(watcher->pollset, watcher->worker, + GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP); gpr_mu_unlock(&watcher->pollset->mu); + return err; } static void maybe_wake_one_watcher_locked(grpc_fd *fd) { @@ -726,10 +728,19 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { worker->prev->next = worker->next->prev = worker; } -static void pollset_kick_ext(grpc_pollset *p, - grpc_pollset_worker *specific_worker, - uint32_t flags) { +static void kick_append_error(grpc_error **composite, grpc_error *error) { + if (error == GRPC_ERROR_NONE) return; + if (*composite == GRPC_ERROR_NONE) { + *composite = GRPC_ERROR_CREATE("Kick Failure"); + } + *composite = grpc_error_add_child(*composite, error); +} + +static grpc_error *pollset_kick_ext(grpc_pollset *p, + grpc_pollset_worker *specific_worker, + uint32_t flags) { GPR_TIMER_BEGIN("pollset_kick_ext", 0); + grpc_error *error = GRPC_ERROR_NONE; /* pollset->mu already held */ if (specific_worker != NULL) { @@ -739,7 +750,8 @@ static void pollset_kick_ext(grpc_pollset *p, for (specific_worker = p->root_worker.next; specific_worker != &p->root_worker; specific_worker = specific_worker->next) { - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + kick_append_error( + &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd)); } p->kicked_without_pollers = 1; GPR_TIMER_END("pollset_kick_ext.broadcast", 0); @@ -750,14 +762,16 @@ static void pollset_kick_ext(grpc_pollset *p, specific_worker->reevaluate_polling_on_wakeup = 1; } specific_worker->kicked_specifically = 1; - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + kick_append_error(&error, + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd)); } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) { GPR_TIMER_MARK("kick_yoself", 0); if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) { specific_worker->reevaluate_polling_on_wakeup = 1; } specific_worker->kicked_specifically = 1; - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + kick_append_error(&error, + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd)); } } else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) { GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0); @@ -778,7 +792,8 @@ static void pollset_kick_ext(grpc_pollset *p, if (specific_worker != NULL) { GPR_TIMER_MARK("finally_kick", 0); push_back_worker(p, specific_worker); - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); + kick_append_error( + &error, grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd)); } } else { GPR_TIMER_MARK("kicked_no_pollers", 0); @@ -787,20 +802,21 @@ static void pollset_kick_ext(grpc_pollset *p, } GPR_TIMER_END("pollset_kick_ext", 0); + return error; } -static void pollset_kick(grpc_pollset *p, - grpc_pollset_worker *specific_worker) { - pollset_kick_ext(p, specific_worker, 0); +static grpc_error *pollset_kick(grpc_pollset *p, + grpc_pollset_worker *specific_worker) { + return pollset_kick_ext(p, specific_worker, 0); } /* global state management */ -static void pollset_global_init(void) { +static grpc_error *pollset_global_init(void) { gpr_tls_init(&g_current_thread_poller); gpr_tls_init(&g_current_thread_worker); grpc_wakeup_fd_global_init(); - grpc_wakeup_fd_init(&grpc_global_wakeup_fd); + return grpc_wakeup_fd_init(&grpc_global_wakeup_fd); } static void pollset_global_shutdown(void) { @@ -810,7 +826,9 @@ static void pollset_global_shutdown(void) { grpc_wakeup_fd_global_destroy(); } -static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); } +static grpc_error *kick_poller(void) { + return grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); +} /* main interface */ @@ -876,11 +894,12 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { grpc_exec_ctx_push(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL); } -static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker_hdl, gpr_timespec now, - gpr_timespec deadline) { +static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker **worker_hdl, + gpr_timespec now, gpr_timespec deadline) { grpc_pollset_worker worker; *worker_hdl = &worker; + grpc_error *error = GRPC_ERROR_NONE; /* pollset->mu already held */ int added_worker = 0; @@ -896,7 +915,10 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollset->local_wakeup_cache = worker.wakeup_fd->next; } else { worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd)); - grpc_wakeup_fd_init(&worker.wakeup_fd->fd); + error = grpc_wakeup_fd_init(&worker.wakeup_fd->fd); + if (error != GRPC_ERROR_NONE) { + return error; + } } worker.kicked_specifically = 0; /* If there's work waiting for the pollset to be idle, and the @@ -933,8 +955,8 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset); GPR_TIMER_BEGIN("maybe_work_and_unlock", 0); - pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, &worker, - deadline, now); + error = pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, &worker, + deadline, now); GPR_TIMER_END("maybe_work_and_unlock", 0); locked = 0; gpr_tls_set(&g_current_thread_poller, 0); @@ -955,7 +977,7 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, /* If we're forced to re-evaluate polling (via pollset_kick with GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force a loop */ - if (worker.reevaluate_polling_on_wakeup) { + if (worker.reevaluate_polling_on_wakeup && error == GRPC_ERROR_NONE) { worker.reevaluate_polling_on_wakeup = 0; pollset->kicked_without_pollers = 0; if (queued_work || worker.kicked_specifically) { @@ -996,6 +1018,7 @@ static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } *worker_hdl = NULL; GPR_TIMER_END("pollset_work", 0); + return error; } static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -1156,11 +1179,19 @@ exit: } } -static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, - grpc_pollset *pollset, - grpc_pollset_worker *worker, - gpr_timespec deadline, - gpr_timespec now) { +static void work_combine_error(grpc_error **composite, grpc_error *error) { + if (error == GRPC_ERROR_NONE) return; + if (*composite == GRPC_ERROR_NONE) { + *composite = GRPC_ERROR_CREATE("pollset_work"); + } + *composite = grpc_error_add_child(*composite, error); +} + +static grpc_error *basic_pollset_maybe_work_and_unlock( + grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, + gpr_timespec deadline, gpr_timespec now) { + grpc_error *error = GRPC_ERROR_NONE; + #define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR) #define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR) @@ -1210,7 +1241,7 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, if (r < 0) { if (errno != EINTR) { - gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); + work_combine_error(&error, GRPC_OS_ERROR(errno, "poll")); } if (fd) { fd_end_poll(exec_ctx, &fd_watcher, 0, 0); @@ -1221,10 +1252,12 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, } } else { if (pfd[0].revents & POLLIN_CHECK) { - grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); + work_combine_error(&error, + grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd)); } if (pfd[1].revents & POLLIN_CHECK) { - grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd); + work_combine_error(&error, + grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd)); } if (nfds > 2) { fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK, @@ -1237,6 +1270,8 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, if (fd) { GRPC_FD_UNREF(fd, "basicpoll_begin"); } + + return error; } static void basic_pollset_destroy(grpc_pollset *pollset) { @@ -1297,9 +1332,11 @@ exit: } } -static void multipoll_with_poll_pollset_maybe_work_and_unlock( +static grpc_error *multipoll_with_poll_pollset_maybe_work_and_unlock( grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline, gpr_timespec now) { + grpc_error *error = GRPC_ERROR_NONE; + #define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR) #define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR) @@ -1374,10 +1411,12 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( } } else { if (pfds[0].revents & POLLIN_CHECK) { - grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); + work_combine_error(&error, + grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd)); } if (pfds[1].revents & POLLIN_CHECK) { - grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd); + work_combine_error(&error, + grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd)); } for (i = 2; i < pfd_count; i++) { if (watchers[i].fd == NULL) { @@ -1391,6 +1430,7 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( gpr_free(pfds); gpr_free(watchers); + return error; } static void multipoll_with_poll_pollset_finish_shutdown(grpc_pollset *pollset) { @@ -1934,14 +1974,23 @@ static const grpc_event_engine_vtable vtable = { }; const grpc_event_engine_vtable *grpc_init_poll_and_epoll_posix(void) { + const char *msg; + grpc_error *err = GRPC_ERROR_NONE; #ifdef GPR_LINUX_MULTIPOLL_WITH_EPOLL platform_become_multipoller = epoll_become_multipoller; #else platform_become_multipoller = poll_become_multipoller; #endif fd_global_init(); - pollset_global_init(); + err = pollset_global_init(); + if (err != GRPC_ERROR_NONE) goto error; return &vtable; + +error: + msg = grpc_error_string(err); + gpr_log(GPR_ERROR, "%s", msg); + grpc_error_free_string(msg); + return NULL; } #endif diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index 7df1751352..3b965c8b8c 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -101,15 +101,15 @@ void grpc_pollset_destroy(grpc_pollset *pollset) { g_event_engine->pollset_destroy(pollset); } -void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker, gpr_timespec now, - gpr_timespec deadline) { - g_event_engine->pollset_work(exec_ctx, pollset, worker, now, deadline); +grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker **worker, gpr_timespec now, + gpr_timespec deadline) { + return g_event_engine->pollset_work(exec_ctx, pollset, worker, now, deadline); } -void grpc_pollset_kick(grpc_pollset *pollset, - grpc_pollset_worker *specific_worker) { - g_event_engine->pollset_kick(pollset, specific_worker); +grpc_error *grpc_pollset_kick(grpc_pollset *pollset, + grpc_pollset_worker *specific_worker) { + return g_event_engine->pollset_kick(pollset, specific_worker); } void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -159,6 +159,6 @@ void grpc_pollset_set_del_fd(grpc_exec_ctx *exec_ctx, g_event_engine->pollset_set_del_fd(exec_ctx, pollset_set, fd); } -void grpc_kick_poller(void) { g_event_engine->kick_poller(); } +grpc_error *grpc_kick_poller(void) { return g_event_engine->kick_poller(); } #endif // GPR_POSIX_SOCKET diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 1fa9f5ef2d..c9f9fd9957 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -61,11 +61,11 @@ typedef struct grpc_event_engine_vtable { grpc_closure *closure); void (*pollset_reset)(grpc_pollset *pollset); void (*pollset_destroy)(grpc_pollset *pollset); - void (*pollset_work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker, gpr_timespec now, - gpr_timespec deadline); - void (*pollset_kick)(grpc_pollset *pollset, - grpc_pollset_worker *specific_worker); + grpc_error *(*pollset_work)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker **worker, gpr_timespec now, + gpr_timespec deadline); + grpc_error *(*pollset_kick)(grpc_pollset *pollset, + grpc_pollset_worker *specific_worker); void (*pollset_add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, struct grpc_fd *fd); @@ -88,7 +88,7 @@ typedef struct grpc_event_engine_vtable { void (*pollset_set_del_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pollset_set, grpc_fd *fd); - void (*kick_poller)(void); + grpc_error *(*kick_poller)(void); void (*shutdown_engine)(void); } grpc_event_engine_vtable; diff --git a/src/core/lib/iomgr/pollset.h b/src/core/lib/iomgr/pollset.h index c40a474877..8d9edc8406 100644 --- a/src/core/lib/iomgr/pollset.h +++ b/src/core/lib/iomgr/pollset.h @@ -81,14 +81,15 @@ void grpc_pollset_destroy(grpc_pollset *pollset); May call grpc_closure_list_run on grpc_closure_list, without holding the pollset lock */ -void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker **worker, gpr_timespec now, - gpr_timespec deadline); +grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_pollset_worker **worker, gpr_timespec now, + gpr_timespec deadline) GRPC_MUST_USE_RESULT; /* Break one polling thread out of polling work for this pollset. If specific_worker is GRPC_POLLSET_KICK_BROADCAST, kick ALL the workers. Otherwise, if specific_worker is non-NULL, then kick that worker. */ -void grpc_pollset_kick(grpc_pollset *pollset, - grpc_pollset_worker *specific_worker); +grpc_error *grpc_pollset_kick(grpc_pollset *pollset, + grpc_pollset_worker *specific_worker) + GRPC_MUST_USE_RESULT; #endif /* GRPC_CORE_LIB_IOMGR_POLLSET_H */ diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index f5a3fb4aa3..c70f11b40a 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -76,13 +76,16 @@ static grpc_error *prepare_socket(const struct sockaddr *addr, int fd) { GPR_ASSERT(fd >= 0); - if (!grpc_set_socket_nonblocking(fd, 1) || !grpc_set_socket_cloexec(fd, 1) || - (!grpc_is_unix_socket(addr) && !grpc_set_socket_low_latency(fd, 1)) || - !grpc_set_socket_no_sigpipe_if_possible(fd)) { - gpr_log(GPR_ERROR, "Unable to configure socket %d: %s", fd, - strerror(errno)); - goto error; + err = grpc_set_socket_nonblocking(fd, 1); + if (err != GRPC_ERROR_NONE) goto error; + err = grpc_set_socket_cloexec(fd, 1); + if (err != GRPC_ERROR_NONE) goto error; + if (!grpc_is_unix_socket(addr)) { + err = grpc_set_socket_low_latency(fd, 1); + if (err != GRPC_ERROR_NONE) goto error; } + err = grpc_set_socket_no_sigpipe_if_possible(fd); + if (err != GRPC_ERROR_NONE) goto error; goto done; error: diff --git a/src/core/lib/iomgr/wakeup_fd_pipe.c b/src/core/lib/iomgr/wakeup_fd_pipe.c index e9b9a0119f..4e5dbdcb73 100644 --- a/src/core/lib/iomgr/wakeup_fd_pipe.c +++ b/src/core/lib/iomgr/wakeup_fd_pipe.c @@ -45,7 +45,7 @@ #include "src/core/lib/iomgr/socket_utils_posix.h" -static void pipe_init(grpc_wakeup_fd* fd_info) { +static grpc_error* pipe_init(grpc_wakeup_fd* fd_info) { int pipefd[2]; /* TODO(klempner): Make this nonfatal */ int r = pipe(pipefd); @@ -53,36 +53,40 @@ static void pipe_init(grpc_wakeup_fd* fd_info) { gpr_log(GPR_ERROR, "pipe creation failed (%d): %s", errno, strerror(errno)); abort(); } - GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[0], 1)); - GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[1], 1)); + grpc_error* err; + err = grpc_set_socket_nonblocking(pipefd[0], 1); + if (err != GRPC_ERROR_NONE) return err; + err = grpc_set_socket_nonblocking(pipefd[1], 1); + if (err != GRPC_ERROR_NONE) return err; fd_info->read_fd = pipefd[0]; fd_info->write_fd = pipefd[1]; + return GRPC_ERROR_NONE; } -static void pipe_consume(grpc_wakeup_fd* fd_info) { +static grpc_error* pipe_consume(grpc_wakeup_fd* fd_info) { char buf[128]; ssize_t r; for (;;) { r = read(fd_info->read_fd, buf, sizeof(buf)); if (r > 0) continue; - if (r == 0) return; + if (r == 0) return GRPC_ERROR_NONE; switch (errno) { case EAGAIN: - return; + return GRPC_ERROR_NONE; case EINTR: continue; default: - gpr_log(GPR_ERROR, "error reading pipe: %s", strerror(errno)); - return; + return GRPC_OS_ERROR(errno, "read"); } } } -static void pipe_wakeup(grpc_wakeup_fd* fd_info) { +static grpc_error* pipe_wakeup(grpc_wakeup_fd* fd_info) { char c = 0; while (write(fd_info->write_fd, &c, 1) != 1 && errno == EINTR) ; + return GRPC_ERROR_NONE; } static void pipe_destroy(grpc_wakeup_fd* fd_info) { diff --git a/src/core/lib/iomgr/wakeup_fd_posix.c b/src/core/lib/iomgr/wakeup_fd_posix.c index 525369c356..046208abc8 100644 --- a/src/core/lib/iomgr/wakeup_fd_posix.c +++ b/src/core/lib/iomgr/wakeup_fd_posix.c @@ -53,16 +53,16 @@ void grpc_wakeup_fd_global_init(void) { void grpc_wakeup_fd_global_destroy(void) { wakeup_fd_vtable = NULL; } -void grpc_wakeup_fd_init(grpc_wakeup_fd *fd_info) { - wakeup_fd_vtable->init(fd_info); +grpc_error *grpc_wakeup_fd_init(grpc_wakeup_fd *fd_info) { + return wakeup_fd_vtable->init(fd_info); } -void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd *fd_info) { - wakeup_fd_vtable->consume(fd_info); +grpc_error *grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd *fd_info) { + return wakeup_fd_vtable->consume(fd_info); } -void grpc_wakeup_fd_wakeup(grpc_wakeup_fd *fd_info) { - wakeup_fd_vtable->wakeup(fd_info); +grpc_error *grpc_wakeup_fd_wakeup(grpc_wakeup_fd *fd_info) { + return wakeup_fd_vtable->wakeup(fd_info); } void grpc_wakeup_fd_destroy(grpc_wakeup_fd *fd_info) { diff --git a/src/core/lib/iomgr/wakeup_fd_posix.h b/src/core/lib/iomgr/wakeup_fd_posix.h index 6b069c1837..e269f242d8 100644 --- a/src/core/lib/iomgr/wakeup_fd_posix.h +++ b/src/core/lib/iomgr/wakeup_fd_posix.h @@ -62,6 +62,8 @@ #ifndef GRPC_CORE_LIB_IOMGR_WAKEUP_FD_POSIX_H #define GRPC_CORE_LIB_IOMGR_WAKEUP_FD_POSIX_H +#include "src/core/lib/iomgr/error.h" + void grpc_wakeup_fd_global_init(void); void grpc_wakeup_fd_global_destroy(void); @@ -72,9 +74,9 @@ void grpc_wakeup_fd_global_init_force_fallback(void); typedef struct grpc_wakeup_fd grpc_wakeup_fd; typedef struct grpc_wakeup_fd_vtable { - void (*init)(grpc_wakeup_fd* fd_info); - void (*consume)(grpc_wakeup_fd* fd_info); - void (*wakeup)(grpc_wakeup_fd* fd_info); + grpc_error* (*init)(grpc_wakeup_fd* fd_info); + grpc_error* (*consume)(grpc_wakeup_fd* fd_info); + grpc_error* (*wakeup)(grpc_wakeup_fd* fd_info); void (*destroy)(grpc_wakeup_fd* fd_info); /* Must be called before calling any other functions */ int (*check_availability)(void); @@ -89,9 +91,10 @@ extern int grpc_allow_specialized_wakeup_fd; #define GRPC_WAKEUP_FD_GET_READ_FD(fd_info) ((fd_info)->read_fd) -void grpc_wakeup_fd_init(grpc_wakeup_fd* fd_info); -void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd* fd_info); -void grpc_wakeup_fd_wakeup(grpc_wakeup_fd* fd_info); +grpc_error* grpc_wakeup_fd_init(grpc_wakeup_fd* fd_info) GRPC_MUST_USE_RESULT; +grpc_error* grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd* fd_info) + GRPC_MUST_USE_RESULT; +grpc_error* grpc_wakeup_fd_wakeup(grpc_wakeup_fd* fd_info) GRPC_MUST_USE_RESULT; void grpc_wakeup_fd_destroy(grpc_wakeup_fd* fd_info); /* Defined in some specialized implementation's .c file, or by diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h index 20b47e1c14..cb6aed9bd4 100644 --- a/src/core/lib/iomgr/workqueue.h +++ b/src/core/lib/iomgr/workqueue.h @@ -50,9 +50,11 @@ /* grpc_workqueue is forward declared in exec_ctx.h */ /** Create a work queue */ -grpc_workqueue *grpc_workqueue_create(grpc_exec_ctx *exec_ctx); +grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, + grpc_workqueue **workqueue); -void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue); +grpc_error *grpc_workqueue_flush( + grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) GRPC_MUST_USE_RESULT; #define GRPC_WORKQUEUE_REFCOUNT_DEBUG #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG @@ -77,7 +79,7 @@ void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset); /** Add a work item to a workqueue */ -void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_closure *closure, - grpc_error *error); +void grpc_workqueue_push(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, + grpc_closure *closure, grpc_error *error); #endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_H */ diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c index e27525dd50..1187e627a2 100644 --- a/src/core/lib/iomgr/workqueue_posix.c +++ b/src/core/lib/iomgr/workqueue_posix.c @@ -47,20 +47,25 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -grpc_workqueue *grpc_workqueue_create(grpc_exec_ctx *exec_ctx) { +grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, + grpc_workqueue **workqueue) { char name[32]; - grpc_workqueue *workqueue = gpr_malloc(sizeof(grpc_workqueue)); - gpr_ref_init(&workqueue->refs, 1); - gpr_mu_init(&workqueue->mu); - workqueue->closure_list.head = workqueue->closure_list.tail = NULL; - grpc_wakeup_fd_init(&workqueue->wakeup_fd); - sprintf(name, "workqueue:%p", (void *)workqueue); - workqueue->wakeup_read_fd = - grpc_fd_create(GRPC_WAKEUP_FD_GET_READ_FD(&workqueue->wakeup_fd), name); - grpc_closure_init(&workqueue->read_closure, on_readable, workqueue); - grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, - &workqueue->read_closure); - return workqueue; + *workqueue = gpr_malloc(sizeof(grpc_workqueue)); + gpr_ref_init(&(*workqueue)->refs, 1); + gpr_mu_init(&(*workqueue)->mu); + (*workqueue)->closure_list.head = (*workqueue)->closure_list.tail = NULL; + grpc_error *err = grpc_wakeup_fd_init(&(*workqueue)->wakeup_fd); + if (err != GRPC_ERROR_NONE) { + gpr_free(*workqueue); + return err; + } + sprintf(name, "workqueue:%p", (void *)(*workqueue)); + (*workqueue)->wakeup_read_fd = grpc_fd_create( + GRPC_WAKEUP_FD_GET_READ_FD(&(*workqueue)->wakeup_fd), name); + grpc_closure_init(&(*workqueue)->read_closure, on_readable, workqueue); + grpc_fd_notify_on_read(exec_ctx, (*workqueue)->wakeup_read_fd, + &(*workqueue)->read_closure); + return GRPC_ERROR_NONE; } static void workqueue_destroy(grpc_exec_ctx *exec_ctx, @@ -101,13 +106,16 @@ void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset_add_fd(exec_ctx, pollset, workqueue->wakeup_read_fd); } -void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { +grpc_error *grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, + grpc_workqueue *workqueue) { + grpc_error *error = GRPC_ERROR_NONE; gpr_mu_lock(&workqueue->mu); if (grpc_closure_list_empty(workqueue->closure_list)) { - grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); + error = grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); } grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); gpr_mu_unlock(&workqueue->mu); + return error; } static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { @@ -123,20 +131,32 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { } else { gpr_mu_lock(&workqueue->mu); grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); - grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd); + error = grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd); gpr_mu_unlock(&workqueue->mu); - grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, - &workqueue->read_closure); + if (error == GRPC_ERROR_NONE) { + grpc_fd_notify_on_read(exec_ctx, workqueue->wakeup_read_fd, + &workqueue->read_closure); + } else { + /* recurse to get error handling */ + on_readable(exec_ctx, arg, error); + } } } -void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_closure *closure, - grpc_error *error) { +void grpc_workqueue_push(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, + grpc_closure *closure, grpc_error *error) { + grpc_error *push_error = GRPC_ERROR_NONE; gpr_mu_lock(&workqueue->mu); if (grpc_closure_list_empty(workqueue->closure_list)) { - grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); + push_error = grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); } grpc_closure_list_append(&workqueue->closure_list, closure, error); + if (push_error != GRPC_ERROR_NONE) { + const char *msg = grpc_error_string(push_error); + gpr_log(GPR_ERROR, "Failed to push to workqueue: %s", msg); + grpc_error_free_string(msg); + grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); + } gpr_mu_unlock(&workqueue->mu); } diff --git a/src/core/lib/security/credentials/google_default/google_default_credentials.c b/src/core/lib/security/credentials/google_default/google_default_credentials.c index a67f19ae0f..019def95c2 100644 --- a/src/core/lib/security/credentials/google_default/google_default_credentials.c +++ b/src/core/lib/security/credentials/google_default/google_default_credentials.c @@ -88,7 +88,7 @@ static void on_compute_engine_detection_http_response(grpc_exec_ctx *exec_ctx, } gpr_mu_lock(g_polling_mu); detector->is_done = 1; - grpc_pollset_kick(detector->pollset, NULL); + GRPC_LOG_IF_ERROR("Pollset kick", grpc_pollset_kick(detector->pollset, NULL)); gpr_mu_unlock(g_polling_mu); } @@ -131,9 +131,14 @@ static int is_stack_running_on_compute_engine(void) { gpr_mu_lock(g_polling_mu); while (!detector.is_done) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, detector.pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), - gpr_inf_future(GPR_CLOCK_MONOTONIC)); + if (!GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(&exec_ctx, detector.pollset, &worker, + gpr_now(GPR_CLOCK_MONOTONIC), + gpr_inf_future(GPR_CLOCK_MONOTONIC)))) { + detector.is_done = 1; + detector.success = 0; + } } gpr_mu_unlock(g_polling_mu); diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c index 50e2a1bd16..5bb9390288 100644 --- a/src/core/lib/surface/completion_queue.c +++ b/src/core/lib/surface/completion_queue.c @@ -263,8 +263,15 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, break; } } - grpc_pollset_kick(POLLSET_FROM_CQ(cc), pluck_worker); + grpc_error *kick_error = + grpc_pollset_kick(POLLSET_FROM_CQ(cc), pluck_worker); gpr_mu_unlock(cc->mu); + if (kick_error != GRPC_ERROR_NONE) { + const char *msg = grpc_error_string(kick_error); + gpr_log(GPR_ERROR, "Kick failed: %s", msg); + grpc_error_free_string(msg); + GRPC_ERROR_UNREF(kick_error); + } } else { cc->completed_tail->next = ((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next); @@ -343,8 +350,18 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, gpr_mu_lock(cc->mu); continue; } else { - grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now, - iteration_deadline); + grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), + &worker, now, iteration_deadline); + if (err != GRPC_ERROR_NONE) { + gpr_mu_unlock(cc->mu); + const char *msg = grpc_error_string(err); + gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg); + grpc_error_free_string(msg); + GRPC_ERROR_UNREF(err); + memset(&ret, 0, sizeof(ret)); + ret.type = GRPC_QUEUE_TIMEOUT; + break; + } } } GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); @@ -460,8 +477,19 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(cc->mu); } else { - grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now, - iteration_deadline); + grpc_error *err = grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), + &worker, now, iteration_deadline); + if (err != GRPC_ERROR_NONE) { + del_plucker(cc, tag, &worker); + gpr_mu_unlock(cc->mu); + const char *msg = grpc_error_string(err); + gpr_log(GPR_ERROR, "Completion queue next failed: %s", msg); + grpc_error_free_string(msg); + GRPC_ERROR_UNREF(err); + memset(&ret, 0, sizeof(ret)); + ret.type = GRPC_QUEUE_TIMEOUT; + break; + } } del_plucker(cc, tag, &worker); } diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c index ded76bccec..8eec10414d 100644 --- a/test/core/iomgr/endpoint_tests.c +++ b/test/core/iomgr/endpoint_tests.c @@ -137,7 +137,7 @@ static void read_and_write_test_read_handler(grpc_exec_ctx *exec_ctx, gpr_log(GPR_INFO, "Read handler done"); gpr_mu_lock(g_mu); state->read_done = 1 + (error == GRPC_ERROR_NONE); - grpc_pollset_kick(g_pollset, NULL); + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL)); gpr_mu_unlock(g_mu); } else if (error == GRPC_ERROR_NONE) { grpc_endpoint_read(exec_ctx, state->read_ep, &state->incoming, @@ -172,7 +172,7 @@ static void read_and_write_test_write_handler(grpc_exec_ctx *exec_ctx, gpr_log(GPR_INFO, "Write handler done"); gpr_mu_lock(g_mu); state->write_done = 1 + (error != GRPC_ERROR_NONE); - grpc_pollset_kick(g_pollset, NULL); + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL)); gpr_mu_unlock(g_mu); } @@ -237,8 +237,10 @@ static void read_and_write_test(grpc_endpoint_test_config config, while (!state.read_done || !state.write_done) { grpc_pollset_worker *worker = NULL; GPR_ASSERT(gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0); - grpc_pollset_work(&exec_ctx, g_pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), deadline); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(&exec_ctx, g_pollset, &worker, + gpr_now(GPR_CLOCK_MONOTONIC), deadline))); } gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); diff --git a/test/core/security/oauth2_utils.c b/test/core/security/oauth2_utils.c index e55ce5c7b4..c92850d681 100644 --- a/test/core/security/oauth2_utils.c +++ b/test/core/security/oauth2_utils.c @@ -70,7 +70,7 @@ static void on_oauth2_response(grpc_exec_ctx *exec_ctx, void *user_data, gpr_mu_lock(request->mu); request->is_done = 1; request->token = token; - grpc_pollset_kick(request->pollset, NULL); + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(request->pollset, NULL)); gpr_mu_unlock(request->mu); } @@ -99,9 +99,13 @@ char *grpc_test_fetch_oauth2_token_with_credentials( gpr_mu_lock(request.mu); while (!request.is_done) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, request.pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), - gpr_inf_future(GPR_CLOCK_MONOTONIC)); + if (GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(&exec_ctx, request.pollset, &worker, + gpr_now(GPR_CLOCK_MONOTONIC), + gpr_inf_future(GPR_CLOCK_MONOTONIC)))) { + request.is_done = 1; + } } gpr_mu_unlock(request.mu); -- cgit v1.2.3 From 1aee5362f43ece8eb77ba219c163f1169d0d0b24 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Sat, 7 May 2016 11:26:50 -0700 Subject: Progress converting to new error system --- src/core/lib/iomgr/ev_poll_and_epoll_posix.c | 6 ++- src/core/lib/iomgr/wakeup_fd_eventfd.c | 2 +- src/core/lib/iomgr/workqueue.h | 3 +- src/core/lib/iomgr/workqueue_posix.c | 8 +--- src/core/lib/transport/connectivity_state.c | 3 +- test/core/http/httpcli_test.c | 15 +++++--- test/core/http/httpscli_test.c | 15 +++++--- test/core/internal_api_canaries/iomgr.c | 7 ++-- test/core/iomgr/fd_posix_test.c | 44 ++++++++++++++-------- test/core/iomgr/tcp_client_posix_test.c | 15 +++++--- test/core/iomgr/tcp_posix_test.c | 44 ++++++++++++++-------- test/core/iomgr/tcp_server_posix_test.c | 9 +++-- test/core/iomgr/workqueue_test.c | 29 +++++++++----- .../security/print_google_default_creds_token.c | 11 ++++-- test/core/security/verify_jwt.c | 11 ++++-- test/core/surface/concurrent_connectivity_test.c | 8 +++- test/core/util/test_tcp_server.c | 5 ++- 17 files changed, 149 insertions(+), 86 deletions(-) (limited to 'src/core/lib/iomgr/workqueue_posix.c') diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c index d9146c3b14..836db6b3ce 100644 --- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c +++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c @@ -1699,7 +1699,8 @@ static grpc_error *multipoll_with_epoll_pollset_maybe_work_and_unlock( /* do nothing */ } else { if (pfds[0].revents) { - work_combine_error(&error, grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd)); + work_combine_error(&error, + grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd)); } if (pfds[1].revents) { do { @@ -1719,7 +1720,8 @@ static grpc_error *multipoll_with_epoll_pollset_maybe_work_and_unlock( int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI); int write_ev = ep_ev[i].events & EPOLLOUT; if (fd == NULL) { - work_combine_error(&error, grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd)); + work_combine_error(&error, grpc_wakeup_fd_consume_wakeup( + &grpc_global_wakeup_fd)); } else { if (read_ev || cancel) { fd_become_readable(exec_ctx, fd); diff --git a/src/core/lib/iomgr/wakeup_fd_eventfd.c b/src/core/lib/iomgr/wakeup_fd_eventfd.c index 50852dc4d6..3047205833 100644 --- a/src/core/lib/iomgr/wakeup_fd_eventfd.c +++ b/src/core/lib/iomgr/wakeup_fd_eventfd.c @@ -54,7 +54,7 @@ static grpc_error* eventfd_create(grpc_wakeup_fd* fd_info) { return GRPC_ERROR_NONE; } -static grpc_error *eventfd_consume(grpc_wakeup_fd* fd_info) { +static grpc_error* eventfd_consume(grpc_wakeup_fd* fd_info) { eventfd_t value; int err; do { diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h index cb6aed9bd4..6e8b1218be 100644 --- a/src/core/lib/iomgr/workqueue.h +++ b/src/core/lib/iomgr/workqueue.h @@ -53,8 +53,7 @@ grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, grpc_workqueue **workqueue); -grpc_error *grpc_workqueue_flush( - grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) GRPC_MUST_USE_RESULT; +void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue); #define GRPC_WORKQUEUE_REFCOUNT_DEBUG #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c index 1187e627a2..66879ba7b3 100644 --- a/src/core/lib/iomgr/workqueue_posix.c +++ b/src/core/lib/iomgr/workqueue_posix.c @@ -106,16 +106,10 @@ void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset_add_fd(exec_ctx, pollset, workqueue->wakeup_read_fd); } -grpc_error *grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, - grpc_workqueue *workqueue) { - grpc_error *error = GRPC_ERROR_NONE; +void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { gpr_mu_lock(&workqueue->mu); - if (grpc_closure_list_empty(workqueue->closure_list)) { - error = grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); - } grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); gpr_mu_unlock(&workqueue->mu); - return error; } static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c index 6df32a3a30..da0de276a2 100644 --- a/src/core/lib/transport/connectivity_state.c +++ b/src/core/lib/transport/connectivity_state.c @@ -133,7 +133,8 @@ int grpc_connectivity_state_notify_on_state_change( } else { if (tracker->current_state != *current) { *current = tracker->current_state; - grpc_exec_ctx_push(exec_ctx, notify, GRPC_ERROR_REF(tracker->current_error), NULL); + grpc_exec_ctx_push(exec_ctx, notify, + GRPC_ERROR_REF(tracker->current_error), NULL); } else { grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); w->current = current; diff --git a/test/core/http/httpcli_test.c b/test/core/http/httpcli_test.c index 32bef2005a..7b872879cc 100644 --- a/test/core/http/httpcli_test.c +++ b/test/core/http/httpcli_test.c @@ -65,7 +65,8 @@ static void on_finish(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { GPR_ASSERT(0 == memcmp(expect, response->body, response->body_length)); gpr_mu_lock(g_mu); g_done = 1; - grpc_pollset_kick(g_pollset, NULL); + GPR_ASSERT( + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL))); gpr_mu_unlock(g_mu); } @@ -92,8 +93,10 @@ static void test_get(int port) { gpr_mu_lock(g_mu); while (!g_done) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, g_pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), n_seconds_time(20)); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(&exec_ctx, g_pollset, &worker, + gpr_now(GPR_CLOCK_MONOTONIC), n_seconds_time(20)))); gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); gpr_mu_lock(g_mu); @@ -126,8 +129,10 @@ static void test_post(int port) { gpr_mu_lock(g_mu); while (!g_done) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, g_pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), n_seconds_time(20)); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(&exec_ctx, g_pollset, &worker, + gpr_now(GPR_CLOCK_MONOTONIC), n_seconds_time(20)))); gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); gpr_mu_lock(g_mu); diff --git a/test/core/http/httpscli_test.c b/test/core/http/httpscli_test.c index dce3eb6de0..c7db7a2159 100644 --- a/test/core/http/httpscli_test.c +++ b/test/core/http/httpscli_test.c @@ -65,7 +65,8 @@ static void on_finish(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { GPR_ASSERT(0 == memcmp(expect, response->body, response->body_length)); gpr_mu_lock(g_mu); g_done = 1; - grpc_pollset_kick(g_pollset, NULL); + GPR_ASSERT( + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL))); gpr_mu_unlock(g_mu); } @@ -93,8 +94,10 @@ static void test_get(int port) { gpr_mu_lock(g_mu); while (!g_done) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, g_pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), n_seconds_time(20)); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(&exec_ctx, g_pollset, &worker, + gpr_now(GPR_CLOCK_MONOTONIC), n_seconds_time(20)))); gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); gpr_mu_lock(g_mu); @@ -128,8 +131,10 @@ static void test_post(int port) { gpr_mu_lock(g_mu); while (!g_done) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, g_pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), n_seconds_time(20)); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(&exec_ctx, g_pollset, &worker, + gpr_now(GPR_CLOCK_MONOTONIC), n_seconds_time(20)))); gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); gpr_mu_lock(g_mu); diff --git a/test/core/internal_api_canaries/iomgr.c b/test/core/internal_api_canaries/iomgr.c index 84ff8ff556..71a1bb1436 100644 --- a/test/core/internal_api_canaries/iomgr.c +++ b/test/core/internal_api_canaries/iomgr.c @@ -103,9 +103,10 @@ static void test_code(void) { grpc_pollset_shutdown(NULL, NULL, NULL); grpc_pollset_reset(NULL); grpc_pollset_destroy(NULL); - grpc_pollset_work(NULL, NULL, NULL, gpr_now(GPR_CLOCK_REALTIME), - gpr_now(GPR_CLOCK_MONOTONIC)); - grpc_pollset_kick(NULL, NULL); + GRPC_ERROR_UNREF(grpc_pollset_work(NULL, NULL, NULL, + gpr_now(GPR_CLOCK_REALTIME), + gpr_now(GPR_CLOCK_MONOTONIC))); + GRPC_ERROR_UNREF(grpc_pollset_kick(NULL, NULL)); } int main(void) { diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c index 02a7b341de..ab5c279ac6 100644 --- a/test/core/iomgr/fd_posix_test.c +++ b/test/core/iomgr/fd_posix_test.c @@ -185,7 +185,8 @@ static void listen_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg /*server */, gpr_mu_lock(g_mu); sv->done = 1; - grpc_pollset_kick(g_pollset, NULL); + GPR_ASSERT( + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL))); gpr_mu_unlock(g_mu); } @@ -257,9 +258,11 @@ static void server_wait_and_shutdown(server *sv) { while (!sv->done) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, g_pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), - gpr_inf_future(GPR_CLOCK_MONOTONIC)); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(&exec_ctx, g_pollset, &worker, + gpr_now(GPR_CLOCK_MONOTONIC), + gpr_inf_future(GPR_CLOCK_MONOTONIC)))); gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); gpr_mu_lock(g_mu); @@ -300,7 +303,8 @@ static void client_session_shutdown_cb(grpc_exec_ctx *exec_ctx, client *cl = arg; grpc_fd_orphan(exec_ctx, cl->em_fd, NULL, NULL, "c"); cl->done = 1; - grpc_pollset_kick(g_pollset, NULL); + GPR_ASSERT( + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL))); } /* Write as much as possible, then register notify_on_write. */ @@ -372,9 +376,11 @@ static void client_wait_and_shutdown(client *cl) { while (!cl->done) { grpc_pollset_worker *worker = NULL; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_pollset_work(&exec_ctx, g_pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), - gpr_inf_future(GPR_CLOCK_MONOTONIC)); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(&exec_ctx, g_pollset, &worker, + gpr_now(GPR_CLOCK_MONOTONIC), + gpr_inf_future(GPR_CLOCK_MONOTONIC)))); gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); gpr_mu_lock(g_mu); @@ -417,7 +423,8 @@ static void first_read_callback(grpc_exec_ctx *exec_ctx, gpr_mu_lock(g_mu); fdc->cb_that_ran = first_read_callback; - grpc_pollset_kick(g_pollset, NULL); + GPR_ASSERT( + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL))); gpr_mu_unlock(g_mu); } @@ -428,7 +435,8 @@ static void second_read_callback(grpc_exec_ctx *exec_ctx, gpr_mu_lock(g_mu); fdc->cb_that_ran = second_read_callback; - grpc_pollset_kick(g_pollset, NULL); + GPR_ASSERT( + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL))); gpr_mu_unlock(g_mu); } @@ -474,9 +482,11 @@ static void test_grpc_fd_change(void) { gpr_mu_lock(g_mu); while (a.cb_that_ran == NULL) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, g_pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), - gpr_inf_future(GPR_CLOCK_MONOTONIC)); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(&exec_ctx, g_pollset, &worker, + gpr_now(GPR_CLOCK_MONOTONIC), + gpr_inf_future(GPR_CLOCK_MONOTONIC)))); gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); gpr_mu_lock(g_mu); @@ -498,9 +508,11 @@ static void test_grpc_fd_change(void) { gpr_mu_lock(g_mu); while (b.cb_that_ran == NULL) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, g_pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), - gpr_inf_future(GPR_CLOCK_MONOTONIC)); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(&exec_ctx, g_pollset, &worker, + gpr_now(GPR_CLOCK_MONOTONIC), + gpr_inf_future(GPR_CLOCK_MONOTONIC)))); gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); gpr_mu_lock(g_mu); diff --git a/test/core/iomgr/tcp_client_posix_test.c b/test/core/iomgr/tcp_client_posix_test.c index d1c57ca769..d0c1047423 100644 --- a/test/core/iomgr/tcp_client_posix_test.c +++ b/test/core/iomgr/tcp_client_posix_test.c @@ -63,7 +63,8 @@ static gpr_timespec test_deadline(void) { static void finish_connection() { gpr_mu_lock(g_mu); g_connections_complete++; - grpc_pollset_kick(g_pollset, NULL); + GPR_ASSERT( + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL))); gpr_mu_unlock(g_mu); } @@ -126,9 +127,11 @@ void test_succeeds(void) { while (g_connections_complete == connections_complete_before) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, g_pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), - GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(&exec_ctx, g_pollset, &worker, + gpr_now(GPR_CLOCK_MONOTONIC), + GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)))); gpr_mu_unlock(g_mu); grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(g_mu); @@ -169,7 +172,9 @@ void test_fails(void) { gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_timespec polling_deadline = test_deadline(); if (!grpc_timer_check(&exec_ctx, now, &polling_deadline)) { - grpc_pollset_work(&exec_ctx, g_pollset, &worker, now, polling_deadline); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_work", grpc_pollset_work(&exec_ctx, g_pollset, &worker, now, + polling_deadline))); } gpr_mu_unlock(g_mu); grpc_exec_ctx_flush(&exec_ctx); diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c index 6b21f0dc27..0a351f7422 100644 --- a/test/core/iomgr/tcp_posix_test.c +++ b/test/core/iomgr/tcp_posix_test.c @@ -193,8 +193,10 @@ static void read_test(size_t num_bytes, size_t slice_size) { gpr_mu_lock(g_mu); while (state.read_bytes < state.target_read_bytes) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, g_pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), deadline); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(&exec_ctx, g_pollset, &worker, + gpr_now(GPR_CLOCK_MONOTONIC), deadline))); gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); gpr_mu_lock(g_mu); @@ -239,8 +241,10 @@ static void large_read_test(size_t slice_size) { gpr_mu_lock(g_mu); while (state.read_bytes < state.target_read_bytes) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, g_pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), deadline); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(&exec_ctx, g_pollset, &worker, + gpr_now(GPR_CLOCK_MONOTONIC), deadline))); gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); gpr_mu_lock(g_mu); @@ -289,7 +293,8 @@ static void write_done(grpc_exec_ctx *exec_ctx, gpr_mu_lock(g_mu); gpr_log(GPR_INFO, "Signalling write done"); state->write_done = 1; - grpc_pollset_kick(g_pollset, NULL); + GPR_ASSERT( + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL))); gpr_mu_unlock(g_mu); } @@ -308,9 +313,11 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) { for (;;) { grpc_pollset_worker *worker = NULL; gpr_mu_lock(g_mu); - grpc_pollset_work(&exec_ctx, g_pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), - GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10)); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(&exec_ctx, g_pollset, &worker, + gpr_now(GPR_CLOCK_MONOTONIC), + GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10)))); gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); do { @@ -372,8 +379,10 @@ static void write_test(size_t num_bytes, size_t slice_size) { if (state.write_done) { break; } - grpc_pollset_work(&exec_ctx, g_pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), deadline); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(&exec_ctx, g_pollset, &worker, + gpr_now(GPR_CLOCK_MONOTONIC), deadline))); gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); gpr_mu_lock(g_mu); @@ -389,7 +398,8 @@ static void write_test(size_t num_bytes, size_t slice_size) { void on_fd_released(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *errors) { int *done = arg; *done = 1; - grpc_pollset_kick(g_pollset, NULL); + GPR_ASSERT( + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL))); } /* Do a read_test, then release fd and try to read/write again. Verify that @@ -429,8 +439,10 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { gpr_mu_lock(g_mu); while (state.read_bytes < state.target_read_bytes) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, g_pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), deadline); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(&exec_ctx, g_pollset, &worker, + gpr_now(GPR_CLOCK_MONOTONIC), deadline))); gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); gpr_mu_lock(g_mu); @@ -443,8 +455,10 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { gpr_mu_lock(g_mu); while (!fd_released_done) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, g_pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), deadline); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(&exec_ctx, g_pollset, &worker, + gpr_now(GPR_CLOCK_MONOTONIC), deadline))); } gpr_mu_unlock(g_mu); GPR_ASSERT(fd_released_done == 1); diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c index 00ec175c8f..a37ff1773b 100644 --- a/test/core/iomgr/tcp_server_posix_test.c +++ b/test/core/iomgr/tcp_server_posix_test.c @@ -120,7 +120,8 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, gpr_mu_lock(g_mu); on_connect_result_set(&g_result, acceptor); g_nconnects++; - grpc_pollset_kick(g_pollset, NULL); + GPR_ASSERT( + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL))); gpr_mu_unlock(g_mu); } @@ -196,8 +197,10 @@ static void tcp_connect(grpc_exec_ctx *exec_ctx, const struct sockaddr *remote, while (g_nconnects == nconnects_before && gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(exec_ctx, g_pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), deadline); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(exec_ctx, g_pollset, &worker, + gpr_now(GPR_CLOCK_MONOTONIC), deadline))); gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(exec_ctx); gpr_mu_lock(g_mu); diff --git a/test/core/iomgr/workqueue_test.c b/test/core/iomgr/workqueue_test.c index 2818e55b45..65513cb9b1 100644 --- a/test/core/iomgr/workqueue_test.c +++ b/test/core/iomgr/workqueue_test.c @@ -46,13 +46,16 @@ static void must_succeed(grpc_exec_ctx *exec_ctx, void *p, grpc_error *error) { GPR_ASSERT(error == GRPC_ERROR_NONE); gpr_mu_lock(g_mu); *(int *)p = 1; - grpc_pollset_kick(g_pollset, NULL); + GPR_ASSERT( + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL))); gpr_mu_unlock(g_mu); } static void test_ref_unref(void) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_workqueue *wq = grpc_workqueue_create(&exec_ctx); + grpc_workqueue *wq; + GPR_ASSERT(GRPC_LOG_IF_ERROR("grpc_workqueue_create", + grpc_workqueue_create(&exec_ctx, &wq))); GRPC_WORKQUEUE_REF(wq, "test"); GRPC_WORKQUEUE_UNREF(&exec_ctx, wq, "test"); GRPC_WORKQUEUE_UNREF(&exec_ctx, wq, "destroy"); @@ -63,18 +66,22 @@ static void test_add_closure(void) { grpc_closure c; int done = 0; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_workqueue *wq = grpc_workqueue_create(&exec_ctx); + grpc_workqueue *wq; + GPR_ASSERT(GRPC_LOG_IF_ERROR("grpc_workqueue_create", + grpc_workqueue_create(&exec_ctx, &wq))); gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5); grpc_pollset_worker *worker = NULL; grpc_closure_init(&c, must_succeed, &done); - grpc_workqueue_push(wq, &c, GRPC_ERROR_NONE); + grpc_workqueue_push(&exec_ctx, wq, &c, GRPC_ERROR_NONE); grpc_workqueue_add_to_pollset(&exec_ctx, wq, g_pollset); gpr_mu_lock(g_mu); GPR_ASSERT(!done); - grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(deadline.clock_type), - deadline); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(&exec_ctx, g_pollset, &worker, + gpr_now(deadline.clock_type), deadline))); gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(done); @@ -87,7 +94,9 @@ static void test_flush(void) { grpc_closure c; int done = 0; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_workqueue *wq = grpc_workqueue_create(&exec_ctx); + grpc_workqueue *wq; + GPR_ASSERT(GRPC_LOG_IF_ERROR("grpc_workqueue_create", + grpc_workqueue_create(&exec_ctx, &wq))); gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5); grpc_pollset_worker *worker = NULL; grpc_closure_init(&c, must_succeed, &done); @@ -98,8 +107,10 @@ static void test_flush(void) { gpr_mu_lock(g_mu); GPR_ASSERT(!done); - grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(deadline.clock_type), - deadline); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(&exec_ctx, g_pollset, &worker, + gpr_now(deadline.clock_type), deadline))); gpr_mu_unlock(g_mu); grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(done); diff --git a/test/core/security/print_google_default_creds_token.c b/test/core/security/print_google_default_creds_token.c index 1b7036cf9e..33e59c2ca9 100644 --- a/test/core/security/print_google_default_creds_token.c +++ b/test/core/security/print_google_default_creds_token.c @@ -67,7 +67,7 @@ static void on_metadata_response(grpc_exec_ctx *exec_ctx, void *user_data, } gpr_mu_lock(sync->mu); sync->is_done = 1; - grpc_pollset_kick(sync->pollset, NULL); + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(sync->pollset, NULL)); gpr_mu_unlock(sync->mu); } @@ -105,9 +105,12 @@ int main(int argc, char **argv) { gpr_mu_lock(sync.mu); while (!sync.is_done) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, sync.pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), - gpr_inf_future(GPR_CLOCK_MONOTONIC)); + if (!GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(&exec_ctx, sync.pollset, &worker, + gpr_now(GPR_CLOCK_MONOTONIC), + gpr_inf_future(GPR_CLOCK_MONOTONIC)))) + sync.is_done = 1; gpr_mu_unlock(sync.mu); grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(sync.mu); diff --git a/test/core/security/verify_jwt.c b/test/core/security/verify_jwt.c index ecb873b655..728d2d637a 100644 --- a/test/core/security/verify_jwt.c +++ b/test/core/security/verify_jwt.c @@ -81,7 +81,7 @@ static void on_jwt_verification_done(void *user_data, gpr_mu_lock(sync->mu); sync->is_done = 1; - grpc_pollset_kick(sync->pollset, NULL); + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(sync->pollset, NULL)); gpr_mu_unlock(sync->mu); } @@ -115,9 +115,12 @@ int main(int argc, char **argv) { gpr_mu_lock(sync.mu); while (!sync.is_done) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, sync.pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), - gpr_inf_future(GPR_CLOCK_MONOTONIC)); + if (!GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(&exec_ctx, sync.pollset, &worker, + gpr_now(GPR_CLOCK_MONOTONIC), + gpr_inf_future(GPR_CLOCK_MONOTONIC)))) + sync.is_done = true; gpr_mu_unlock(sync.mu); grpc_exec_ctx_finish(&exec_ctx); gpr_mu_lock(sync.mu); diff --git a/test/core/surface/concurrent_connectivity_test.c b/test/core/surface/concurrent_connectivity_test.c index 4bafd35803..917c4758a6 100644 --- a/test/core/surface/concurrent_connectivity_test.c +++ b/test/core/surface/concurrent_connectivity_test.c @@ -101,7 +101,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *vargs, grpc_endpoint *tcp, (void)acceptor; grpc_endpoint_shutdown(exec_ctx, tcp); grpc_endpoint_destroy(exec_ctx, tcp); - grpc_pollset_kick(args->pollset, NULL); + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(args->pollset, NULL)); } void bad_server_thread(void *vargs) { @@ -131,7 +131,11 @@ void bad_server_thread(void *vargs) { gpr_time_add(now, gpr_time_from_millis(100, GPR_TIMESPAN)); grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, args->pollset, &worker, now, deadline); + if (!GRPC_LOG_IF_ERROR("pollset_work", + grpc_pollset_work(&exec_ctx, args->pollset, &worker, + now, deadline))) { + gpr_atm_rel_store(&args->stop, 1); + } gpr_mu_unlock(args->mu); grpc_exec_ctx_finish(&exec_ctx); gpr_mu_lock(args->mu); diff --git a/test/core/util/test_tcp_server.c b/test/core/util/test_tcp_server.c index fc89c41907..27c16fc764 100644 --- a/test/core/util/test_tcp_server.c +++ b/test/core/util/test_tcp_server.c @@ -94,8 +94,9 @@ void test_tcp_server_poll(test_tcp_server *server, int seconds) { gpr_time_from_seconds(seconds, GPR_TIMESPAN)); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; gpr_mu_lock(server->mu); - grpc_pollset_work(&exec_ctx, server->pollset, &worker, - gpr_now(GPR_CLOCK_MONOTONIC), deadline); + GRPC_LOG_IF_ERROR("pollset_work", + grpc_pollset_work(&exec_ctx, server->pollset, &worker, + gpr_now(GPR_CLOCK_MONOTONIC), deadline)); gpr_mu_unlock(server->mu); grpc_exec_ctx_finish(&exec_ctx); } -- cgit v1.2.3 From 25dc53915b2469ae43c22e118a8f2e537eee200a Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 10 May 2016 10:53:55 -0700 Subject: Fix workqueues, timer shutdown --- src/core/lib/iomgr/timer.c | 3 ++- src/core/lib/iomgr/workqueue_posix.c | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) (limited to 'src/core/lib/iomgr/workqueue_posix.c') diff --git a/src/core/lib/iomgr/timer.c b/src/core/lib/iomgr/timer.c index bd8769b11b..99a66ee042 100644 --- a/src/core/lib/iomgr/timer.c +++ b/src/core/lib/iomgr/timer.c @@ -105,7 +105,8 @@ void grpc_timer_list_init(gpr_timespec now) { void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) { int i; - run_some_expired_timers(exec_ctx, gpr_inf_future(g_clock_type), NULL, 0); + run_some_expired_timers(exec_ctx, gpr_inf_future(g_clock_type), NULL, + GRPC_ERROR_CREATE("Timer list shutdown")); for (i = 0; i < NUM_SHARDS; i++) { shard_type *shard = &g_shards[i]; gpr_mu_destroy(&shard->mu); diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c index 66879ba7b3..297e8d3102 100644 --- a/src/core/lib/iomgr/workqueue_posix.c +++ b/src/core/lib/iomgr/workqueue_posix.c @@ -62,7 +62,7 @@ grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, sprintf(name, "workqueue:%p", (void *)(*workqueue)); (*workqueue)->wakeup_read_fd = grpc_fd_create( GRPC_WAKEUP_FD_GET_READ_FD(&(*workqueue)->wakeup_fd), name); - grpc_closure_init(&(*workqueue)->read_closure, on_readable, workqueue); + grpc_closure_init(&(*workqueue)->read_closure, on_readable, *workqueue); grpc_fd_notify_on_read(exec_ctx, (*workqueue)->wakeup_read_fd, &(*workqueue)->read_closure); return GRPC_ERROR_NONE; -- cgit v1.2.3 From 332f1b35d534f6910093729e9ecc2cacf8bb9688 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 24 May 2016 13:21:21 -0700 Subject: Rename functions --- src/core/ext/client_config/client_channel.c | 20 ++++++------ src/core/ext/client_config/subchannel.c | 2 +- .../ext/client_config/subchannel_call_holder.c | 4 +-- src/core/ext/lb_policy/pick_first/pick_first.c | 26 ++++++++-------- src/core/ext/lb_policy/round_robin/round_robin.c | 12 ++++---- src/core/ext/resolver/dns/native/dns_resolver.c | 4 +-- src/core/ext/resolver/sockaddr/sockaddr_resolver.c | 4 +-- .../chttp2/server/insecure/server_chttp2.c | 2 +- .../transport/chttp2/transport/chttp2_transport.c | 36 +++++++++++----------- src/core/ext/transport/chttp2/transport/writing.c | 2 +- .../transport/cronet/transport/cronet_transport.c | 4 +-- src/core/lib/http/httpcli.c | 2 +- src/core/lib/iomgr/ev_poll_posix.c | 10 +++--- src/core/lib/iomgr/exec_ctx.c | 2 +- src/core/lib/iomgr/exec_ctx.h | 6 ++-- src/core/lib/iomgr/pollset_windows.c | 4 +-- src/core/lib/iomgr/resolve_address_posix.c | 2 +- src/core/lib/iomgr/resolve_address_windows.c | 2 +- src/core/lib/iomgr/socket_windows.c | 4 +-- src/core/lib/iomgr/tcp_client_posix.c | 10 +++--- src/core/lib/iomgr/tcp_client_windows.c | 4 +-- src/core/lib/iomgr/tcp_posix.c | 8 ++--- src/core/lib/iomgr/tcp_server_posix.c | 2 +- src/core/lib/iomgr/tcp_server_windows.c | 2 +- src/core/lib/iomgr/tcp_windows.c | 16 +++++----- src/core/lib/iomgr/timer.c | 8 ++--- src/core/lib/iomgr/workqueue.h | 4 +-- src/core/lib/iomgr/workqueue_posix.c | 2 +- src/core/lib/security/transport/secure_endpoint.c | 4 +-- .../lib/security/transport/server_auth_filter.c | 6 ++-- src/core/lib/surface/call.c | 6 ++-- src/core/lib/surface/lame_client.c | 6 ++-- src/core/lib/surface/server.c | 22 ++++++------- src/core/lib/transport/connectivity_state.c | 10 +++--- src/core/lib/transport/transport.c | 8 ++--- test/core/end2end/fuzzers/api_fuzzer.c | 12 ++++---- test/core/end2end/tests/filter_causes_close.c | 2 +- test/core/internal_api_canaries/iomgr.c | 2 +- test/core/iomgr/workqueue_test.c | 4 +-- test/core/security/credentials_test.c | 12 ++++---- test/core/security/jwt_verifier_test.c | 10 +++--- test/core/util/mock_endpoint.c | 8 ++--- test/core/util/passthru_endpoint.c | 12 ++++---- 43 files changed, 164 insertions(+), 164 deletions(-) (limited to 'src/core/lib/iomgr/workqueue_posix.c') diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c index d00b966812..7da998e5e2 100644 --- a/src/core/ext/client_config/client_channel.c +++ b/src/core/ext/client_config/client_channel.c @@ -277,7 +277,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport_op *op) { channel_data *chand = elem->channel_data; - grpc_exec_ctx_push(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); GPR_ASSERT(op->set_accept_stream == false); if (op->bind_pollset != NULL) { @@ -296,9 +296,9 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, if (op->send_ping != NULL) { if (chand->lb_policy == NULL) { - grpc_exec_ctx_push(exec_ctx, op->send_ping, - GRPC_ERROR_CREATE("Ping with no load balancing"), - NULL); + grpc_exec_ctx_sched(exec_ctx, op->send_ping, + GRPC_ERROR_CREATE("Ping with no load balancing"), + NULL); } else { grpc_lb_policy_ping_one(exec_ctx, chand->lb_policy, op->send_ping); op->bind_pollset = NULL; @@ -354,11 +354,11 @@ static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, if (cpa->connected_subchannel == NULL) { /* cancelled, do nothing */ } else if (error != GRPC_ERROR_NONE) { - grpc_exec_ctx_push(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error), NULL); + grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error), NULL); } else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata, cpa->initial_metadata_flags, cpa->connected_subchannel, cpa->on_ready)) { - grpc_exec_ctx_push(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE, NULL); } gpr_free(cpa); } @@ -387,8 +387,8 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, cpa = closure->cb_arg; if (cpa->connected_subchannel == connected_subchannel) { cpa->connected_subchannel = NULL; - grpc_exec_ctx_push(exec_ctx, cpa->on_ready, - GRPC_ERROR_CREATE("Pick cancelled"), NULL); + grpc_exec_ctx_sched(exec_ctx, cpa->on_ready, + GRPC_ERROR_CREATE("Pick cancelled"), NULL); } } gpr_mu_unlock(&chand->mu_config); @@ -423,8 +423,8 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp, grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure, GRPC_ERROR_NONE); } else { - grpc_exec_ctx_push(exec_ctx, on_ready, GRPC_ERROR_CREATE("Disconnected"), - NULL); + grpc_exec_ctx_sched(exec_ctx, on_ready, GRPC_ERROR_CREATE("Disconnected"), + NULL); } gpr_mu_unlock(&chand->mu_config); return 0; diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c index dd61caea94..1e1b30c4c0 100644 --- a/src/core/ext/client_config/subchannel.c +++ b/src/core/ext/client_config/subchannel.c @@ -290,7 +290,7 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx, gpr_atm old_refs; old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF")); if (old_refs == 1) { - grpc_exec_ctx_push(exec_ctx, grpc_closure_create(subchannel_destroy, c), + grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(subchannel_destroy, c), GRPC_ERROR_NONE, NULL); } } diff --git a/src/core/ext/client_config/subchannel_call_holder.c b/src/core/ext/client_config/subchannel_call_holder.c index 07b7813482..14022e9095 100644 --- a/src/core/ext/client_config/subchannel_call_holder.c +++ b/src/core/ext/client_config/subchannel_call_holder.c @@ -222,8 +222,8 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, holder->waiting_ops_count = 0; holder->waiting_ops_capacity = 0; GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops"); - grpc_exec_ctx_push(exec_ctx, grpc_closure_create(retry_ops, a), - GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(retry_ops, a), + GRPC_ERROR_NONE, NULL); } static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) { diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index b5c2c426de..deb3f519fa 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -121,7 +121,7 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { *pp->target = NULL; grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); - grpc_exec_ctx_push(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); gpr_free(pp); pp = next; } @@ -140,8 +140,8 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); *target = NULL; - grpc_exec_ctx_push(exec_ctx, pp->on_complete, - GRPC_ERROR_CREATE("Pick Cancelled"), NULL); + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, + GRPC_ERROR_CREATE("Pick Cancelled"), NULL); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -166,8 +166,8 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, initial_metadata_flags_eq) { grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); - grpc_exec_ctx_push(exec_ctx, pp->on_complete, - GRPC_ERROR_CREATE("Pick Cancelled"), NULL); + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, + GRPC_ERROR_CREATE("Pick Cancelled"), NULL); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -306,16 +306,16 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, /* drop the pick list: we are connected now */ GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels"); gpr_atm_rel_store(&p->selected, (gpr_atm)selected); - grpc_exec_ctx_push(exec_ctx, - grpc_closure_create(destroy_subchannels, p), - GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, + grpc_closure_create(destroy_subchannels, p), + GRPC_ERROR_NONE, NULL); /* update any calls that were waiting for a pick */ while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = selected; grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); - grpc_exec_ctx_push(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); gpr_free(pp); } grpc_connected_subchannel_notify_on_state_change( @@ -368,8 +368,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_exec_ctx_push(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, - NULL); + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, + NULL); gpr_free(pp); } GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, @@ -421,8 +421,8 @@ static void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, if (selected) { grpc_connected_subchannel_ping(exec_ctx, selected, closure); } else { - grpc_exec_ctx_push(exec_ctx, closure, GRPC_ERROR_CREATE("Not connected"), - NULL); + grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("Not connected"), + NULL); } } diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 95305e9cdd..76d9d83c9b 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -239,7 +239,7 @@ static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_exec_ctx_push(exec_ctx, pp->on_complete, + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CREATE("Channel Shutdown"), NULL); gpr_free(pp); } @@ -267,7 +267,7 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); *target = NULL; - grpc_exec_ctx_push(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED, NULL); + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED, NULL); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -293,7 +293,7 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); *pp->target = NULL; - grpc_exec_ctx_push(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED, NULL); + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED, NULL); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -412,7 +412,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, } grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); - grpc_exec_ctx_push(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); gpr_free(pp); } grpc_subchannel_notify_on_state_change( @@ -466,7 +466,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_exec_ctx_push(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, NULL); gpr_free(pp); } @@ -521,7 +521,7 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_connected_subchannel_ping(exec_ctx, target, closure); } else { gpr_mu_unlock(&p->mu); - grpc_exec_ctx_push(exec_ctx, closure, + grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CREATE("Round Robin not connected"), NULL); } } diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c index adb9c3af84..e35aaba815 100644 --- a/src/core/ext/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -111,7 +111,7 @@ static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { } if (r->next_completion != NULL) { *r->target_config = NULL; - grpc_exec_ctx_push(exec_ctx, r->next_completion, + grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_CREATE("Resolver Shutdown"), NULL); r->next_completion = NULL; } @@ -227,7 +227,7 @@ static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, if (r->resolved_config) { grpc_client_config_ref(r->resolved_config); } - grpc_exec_ctx_push(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL); r->next_completion = NULL; r->published_version = r->resolved_version; } diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c index 90da6a93a0..1f7cce2f43 100644 --- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c +++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c @@ -92,7 +92,7 @@ static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&r->mu); if (r->next_completion != NULL) { *r->target_config = NULL; - grpc_exec_ctx_push(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL); r->next_completion = NULL; } gpr_mu_unlock(&r->mu); @@ -133,7 +133,7 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "sockaddr"); r->published = 1; *r->target_config = cfg; - grpc_exec_ctx_push(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL); r->next_completion = NULL; } } diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c index bf4c817e2b..c06d3eb60a 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c @@ -75,7 +75,7 @@ static void destroy(grpc_exec_ctx *exec_ctx, grpc_server *server, void *tcpp, grpc_closure *destroy_done) { grpc_tcp_server *tcp = tcpp; grpc_tcp_server_unref(exec_ctx, tcp); - grpc_exec_ctx_push(exec_ctx, destroy_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, destroy_done, GRPC_ERROR_NONE, NULL); } int grpc_server_add_insecure_http2_port(grpc_server *server, const char *addr) { diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 5264b9e1f8..be517154e6 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -190,8 +190,8 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, and maybe they hold resources that need to be freed */ while (t->global.pings.next != &t->global.pings) { grpc_chttp2_outstanding_ping *ping = t->global.pings.next; - grpc_exec_ctx_push(exec_ctx, ping->on_recv, - GRPC_ERROR_CREATE("Transport closed"), NULL); + grpc_exec_ctx_sched(exec_ctx, ping->on_recv, + GRPC_ERROR_CREATE("Transport closed"), NULL); ping->next->prev = ping->prev; ping->prev->next = ping->next; gpr_free(ping); @@ -643,7 +643,7 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx, t->executor.writing_active = 1; REF_TRANSPORT(t, "writing"); prevent_endpoint_shutdown(t); - grpc_exec_ctx_push(exec_ctx, &t->writing_action, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, &t->writing_action, GRPC_ERROR_NONE, NULL); } check_read_ops(exec_ctx, &t->global); @@ -907,7 +907,7 @@ void grpc_chttp2_complete_closure_step( stream_global->collecting_stats); stream_global->collecting_stats = NULL; } - grpc_exec_ctx_push(exec_ctx, closure, closure->error, NULL); + grpc_exec_ctx_sched(exec_ctx, closure, closure->error, NULL); } *pclosure = NULL; } @@ -1127,7 +1127,7 @@ static void ack_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, for (ping = transport_global->pings.next; ping != &transport_global->pings; ping = ping->next) { if (0 == memcmp(opaque_8bytes, ping->id, 8)) { - grpc_exec_ctx_push(exec_ctx, ping->on_recv, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, ping->on_recv, GRPC_ERROR_NONE, NULL); ping->next->prev = ping->prev; ping->prev->next = ping->next; gpr_free(ping); @@ -1160,7 +1160,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, return; } - grpc_exec_ctx_push(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); if (op->on_connectivity_state_change != NULL) { grpc_connectivity_state_notify_on_state_change( @@ -1234,8 +1234,8 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer_publish( &stream_global->received_initial_metadata, stream_global->recv_initial_metadata); - grpc_exec_ctx_push(exec_ctx, stream_global->recv_initial_metadata_ready, - GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, stream_global->recv_initial_metadata_ready, + GRPC_ERROR_NONE, NULL); stream_global->recv_initial_metadata_ready = NULL; } if (stream_global->recv_message_ready != NULL) { @@ -1248,13 +1248,13 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, *stream_global->recv_message = grpc_chttp2_incoming_frame_queue_pop( &stream_global->incoming_frames); GPR_ASSERT(*stream_global->recv_message != NULL); - grpc_exec_ctx_push(exec_ctx, stream_global->recv_message_ready, - GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, stream_global->recv_message_ready, + GRPC_ERROR_NONE, NULL); stream_global->recv_message_ready = NULL; } else if (stream_global->published_trailing_metadata) { *stream_global->recv_message = NULL; - grpc_exec_ctx_push(exec_ctx, stream_global->recv_message_ready, - GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, stream_global->recv_message_ready, + GRPC_ERROR_NONE, NULL); stream_global->recv_message_ready = NULL; } } @@ -1643,7 +1643,7 @@ static void reading_action_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map); grpc_chttp2_prepare_to_read(transport_global, transport_parsing); - grpc_exec_ctx_push(exec_ctx, &t->parsing_action, error, NULL); + grpc_exec_ctx_sched(exec_ctx, &t->parsing_action, error, NULL); } else { post_reading_action_locked(exec_ctx, t, s_unused, arg); } @@ -1870,10 +1870,10 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, } if (bs->slices.count > 0) { *arg->slice = gpr_slice_buffer_take_first(&bs->slices); - grpc_exec_ctx_push(exec_ctx, arg->on_complete, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, arg->on_complete, GRPC_ERROR_NONE, NULL); } else if (bs->error != GRPC_ERROR_NONE) { - grpc_exec_ctx_push(exec_ctx, arg->on_complete, GRPC_ERROR_REF(bs->error), - NULL); + grpc_exec_ctx_sched(exec_ctx, arg->on_complete, GRPC_ERROR_REF(bs->error), + NULL); } else { bs->on_next = arg->on_complete; bs->next = arg->slice; @@ -1930,7 +1930,7 @@ static void incoming_byte_stream_push_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs = arg->byte_stream; if (bs->on_next != NULL) { *bs->next = arg->slice; - grpc_exec_ctx_push(exec_ctx, bs->on_next, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_NONE, NULL); bs->on_next = NULL; } else { gpr_slice_buffer_add(&bs->slices, arg->slice); @@ -1968,7 +1968,7 @@ static void incoming_byte_stream_finished_failed_locked( grpc_chttp2_incoming_byte_stream *bs = a->bs; grpc_error *error = a->error; gpr_free(a); - grpc_exec_ctx_push(exec_ctx, bs->on_next, GRPC_ERROR_REF(error), NULL); + grpc_exec_ctx_sched(exec_ctx, bs->on_next, GRPC_ERROR_REF(error), NULL); bs->on_next = NULL; GRPC_ERROR_UNREF(bs->error); bs->error = error; diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index ca56debc84..ed6536b3c7 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -187,7 +187,7 @@ void grpc_chttp2_perform_writes( grpc_endpoint_write(exec_ctx, endpoint, &transport_writing->outbuf, &transport_writing->done_cb); } else { - grpc_exec_ctx_push(exec_ctx, &transport_writing->done_cb, GRPC_ERROR_NONE, + grpc_exec_ctx_sched(exec_ctx, &transport_writing->done_cb, GRPC_ERROR_NONE, NULL); } } diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index ea0f2bcba4..c5318f092e 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -155,11 +155,11 @@ static void set_pollset_do_nothing(grpc_exec_ctx *exec_ctx, grpc_transport *gt, static void enqueue_callbacks(grpc_closure *callback_list[]) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; if (callback_list[0]) { - grpc_exec_ctx_push(&exec_ctx, callback_list[0], GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(&exec_ctx, callback_list[0], GRPC_ERROR_NONE, NULL); callback_list[0] = NULL; } if (callback_list[1]) { - grpc_exec_ctx_push(&exec_ctx, callback_list[1], GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(&exec_ctx, callback_list[1], GRPC_ERROR_NONE, NULL); callback_list[1] = NULL; } grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c index 4bda734bfa..bcd9c8c821 100644 --- a/src/core/lib/http/httpcli.c +++ b/src/core/lib/http/httpcli.c @@ -102,7 +102,7 @@ static void finish(grpc_exec_ctx *exec_ctx, internal_request *req, grpc_error *error) { grpc_pollset_set_del_pollset(exec_ctx, req->context->pollset_set, req->pollset); - grpc_exec_ctx_push(exec_ctx, req->on_done, error, NULL); + grpc_exec_ctx_sched(exec_ctx, req->on_done, error, NULL); grpc_http_parser_destroy(&req->parser); if (req->addresses != NULL) { grpc_resolved_addresses_destroy(req->addresses); diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 52efb0d6b2..e5eb16c3be 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -373,7 +373,7 @@ static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { if (!fd->released) { close(fd->fd); } - grpc_exec_ctx_push(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE, NULL); } static int fd_wrapped_fd(grpc_fd *fd) { @@ -438,8 +438,8 @@ static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, } else if (*st == CLOSURE_READY) { /* already ready ==> queue the closure to run immediately */ *st = CLOSURE_NOT_READY; - grpc_exec_ctx_push(exec_ctx, closure, fd_shutdown_error(fd->shutdown), - NULL); + grpc_exec_ctx_sched(exec_ctx, closure, fd_shutdown_error(fd->shutdown), + NULL); maybe_wake_one_watcher_locked(fd); } else { /* upcallptr was set to a different closure. This is an error! */ @@ -462,7 +462,7 @@ static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd, return 0; } else { /* waiting ==> queue closure */ - grpc_exec_ctx_push(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL); + grpc_exec_ctx_sched(exec_ctx, *st, fd_shutdown_error(fd->shutdown), NULL); *st = CLOSURE_NOT_READY; return 1; } @@ -811,7 +811,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { GRPC_FD_UNREF(pollset->fds[i], "multipoller"); } pollset->fd_count = 0; - grpc_exec_ctx_push(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL); } static void work_combine_error(grpc_error **composite, grpc_error *error) { diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c index fcf39ffd54..9652f826c1 100644 --- a/src/core/lib/iomgr/exec_ctx.c +++ b/src/core/lib/iomgr/exec_ctx.c @@ -82,7 +82,7 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) { grpc_exec_ctx_flush(exec_ctx); } -void grpc_exec_ctx_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, +void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_error *error, grpc_workqueue *offload_target_or_null) { GPR_ASSERT(offload_target_or_null == NULL); diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index 617f48f1b8..38f27d9b13 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -94,9 +94,9 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx); * the instance is destroyed, or work may be lost. */ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx); /** Add a closure to be executed at the next flush/finish point */ -void grpc_exec_ctx_push(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error, - grpc_workqueue *offload_target_or_null); +void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, + grpc_error *error, + grpc_workqueue *offload_target_or_null); /** Returns true if we'd like to leave this execution context as soon as possible: useful for deciding whether to do something more or not depending on outside context */ diff --git a/src/core/lib/iomgr/pollset_windows.c b/src/core/lib/iomgr/pollset_windows.c index 5882d8d71d..d9f0f05ba6 100644 --- a/src/core/lib/iomgr/pollset_windows.c +++ b/src/core/lib/iomgr/pollset_windows.c @@ -109,7 +109,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, pollset->shutting_down = 1; grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); if (!pollset->is_iocp_worker) { - grpc_exec_ctx_push(exec_ctx, closure, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL); } else { pollset->on_shutdown = closure; } @@ -167,7 +167,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } if (pollset->shutting_down && pollset->on_shutdown != NULL) { - grpc_exec_ctx_push(exec_ctx, pollset->on_shutdown, GRPC_ERROR_NONE, + grpc_exec_ctx_sched(exec_ctx, pollset->on_shutdown, GRPC_ERROR_NONE, NULL); pollset->on_shutdown = NULL; } diff --git a/src/core/lib/iomgr/resolve_address_posix.c b/src/core/lib/iomgr/resolve_address_posix.c index 9a320dc952..4e9f978584 100644 --- a/src/core/lib/iomgr/resolve_address_posix.c +++ b/src/core/lib/iomgr/resolve_address_posix.c @@ -163,7 +163,7 @@ typedef struct { static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, grpc_error *error) { request *r = rp; - grpc_exec_ctx_push( + grpc_exec_ctx_sched( exec_ctx, r->on_done, grpc_blocking_resolve_address(r->name, r->default_port, r->addrs_out), NULL); diff --git a/src/core/lib/iomgr/resolve_address_windows.c b/src/core/lib/iomgr/resolve_address_windows.c index 22a9feb72d..fac4d13a27 100644 --- a/src/core/lib/iomgr/resolve_address_windows.c +++ b/src/core/lib/iomgr/resolve_address_windows.c @@ -154,7 +154,7 @@ static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, } else { GRPC_ERROR_REF(error); } - grpc_exec_ctx_push(exec_ctx, r->on_done, error, NULL); + grpc_exec_ctx_sched(exec_ctx, r->on_done, error, NULL); gpr_free(r->name); gpr_free(r->default_port); gpr_free(r); diff --git a/src/core/lib/iomgr/socket_windows.c b/src/core/lib/iomgr/socket_windows.c index 8919452751..fbc1bb5301 100644 --- a/src/core/lib/iomgr/socket_windows.c +++ b/src/core/lib/iomgr/socket_windows.c @@ -123,7 +123,7 @@ static void socket_notify_on_iocp(grpc_exec_ctx *exec_ctx, gpr_mu_lock(&socket->state_mu); if (info->has_pending_iocp) { info->has_pending_iocp = 0; - grpc_exec_ctx_push(exec_ctx, closure, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL); } else { info->closure = closure; } @@ -146,7 +146,7 @@ void grpc_socket_become_ready(grpc_exec_ctx *exec_ctx, grpc_winsocket *socket, GPR_ASSERT(!info->has_pending_iocp); gpr_mu_lock(&socket->state_mu); if (info->closure) { - grpc_exec_ctx_push(exec_ctx, info->closure, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, info->closure, GRPC_ERROR_NONE, NULL); info->closure = NULL; } else { info->has_pending_iocp = 1; diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index 9c4ca1617f..078696c126 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -221,7 +221,7 @@ finish: gpr_free(ac->addr_str); gpr_free(ac); } - grpc_exec_ctx_push(exec_ctx, closure, error, NULL); + grpc_exec_ctx_sched(exec_ctx, closure, error, NULL); } static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, @@ -250,7 +250,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, error = grpc_create_dualstack_socket(addr, SOCK_STREAM, 0, &dsmode, &fd); if (error != GRPC_ERROR_NONE) { - grpc_exec_ctx_push(exec_ctx, closure, error, NULL); + grpc_exec_ctx_sched(exec_ctx, closure, error, NULL); return; } if (dsmode == GRPC_DSMODE_IPV4) { @@ -260,7 +260,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, addr_len = sizeof(addr4_copy); } if ((error = prepare_socket(addr, fd)) != GRPC_ERROR_NONE) { - grpc_exec_ctx_push(exec_ctx, closure, error, NULL); + grpc_exec_ctx_sched(exec_ctx, closure, error, NULL); return; } @@ -276,13 +276,13 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, if (err >= 0) { *ep = grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str); - grpc_exec_ctx_push(exec_ctx, closure, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_NONE, NULL); goto done; } if (errno != EWOULDBLOCK && errno != EINPROGRESS) { grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, "tcp_client_connect_error"); - grpc_exec_ctx_push(exec_ctx, closure, GRPC_OS_ERROR(errno, "connect"), + grpc_exec_ctx_sched(exec_ctx, closure, GRPC_OS_ERROR(errno, "connect"), NULL); goto done; } diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c index dbf116422f..6134d4476b 100644 --- a/src/core/lib/iomgr/tcp_client_windows.c +++ b/src/core/lib/iomgr/tcp_client_windows.c @@ -121,7 +121,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, grpc_error *error) { async_connect_unlock_and_cleanup(ac, socket); /* If the connection was aborted, the callback was already called when the deadline was met. */ - grpc_exec_ctx_push(exec_ctx, on_done, error, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, error, NULL); } /* Tries to issue one async connection, then schedules both an IOCP @@ -225,7 +225,7 @@ failure: } else if (sock != INVALID_SOCKET) { closesocket(sock); } - grpc_exec_ctx_push(exec_ctx, on_done, final_error, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, final_error, NULL); } #endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 48a19bed84..2f7c50ce35 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -175,7 +175,7 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, tcp->read_cb = NULL; tcp->incoming_buffer = NULL; - grpc_exec_ctx_push(exec_ctx, cb, error, NULL); + grpc_exec_ctx_sched(exec_ctx, cb, error, NULL); } #define MAX_READ_IOVEC 4 @@ -277,7 +277,7 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, tcp->finished_edge = false; grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure); } else { - grpc_exec_ctx_push(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, &tcp->read_closure, GRPC_ERROR_NONE, NULL); } } @@ -410,7 +410,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (buf->length == 0) { GPR_TIMER_END("tcp_write", 0); - grpc_exec_ctx_push(exec_ctx, cb, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_NONE, NULL); return; } tcp->outgoing_buffer = buf; @@ -422,7 +422,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, tcp->write_cb = cb; grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure); } else { - grpc_exec_ctx_push(exec_ctx, cb, error, NULL); + grpc_exec_ctx_sched(exec_ctx, cb, error, NULL); } GPR_TIMER_END("tcp_write", 0); diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index e5c6a3c549..6b10d3ebeb 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -154,7 +154,7 @@ grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete, static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { if (s->shutdown_complete != NULL) { - grpc_exec_ctx_push(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); } gpr_mu_destroy(&s->mu); diff --git a/src/core/lib/iomgr/tcp_server_windows.c b/src/core/lib/iomgr/tcp_server_windows.c index 481111c9f9..b4f28a4c3b 100644 --- a/src/core/lib/iomgr/tcp_server_windows.c +++ b/src/core/lib/iomgr/tcp_server_windows.c @@ -121,7 +121,7 @@ grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete, static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { if (s->shutdown_complete != NULL) { - grpc_exec_ctx_push(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); } /* Now that the accepts have been aborted, we can destroy the sockets. diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c index 88c9354f6e..383af9f12b 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.c @@ -183,7 +183,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) { tcp->read_cb = NULL; TCP_UNREF(tcp, "read"); - grpc_exec_ctx_push(exec_ctx, cb, error, NULL); + grpc_exec_ctx_sched(exec_ctx, cb, error, NULL); } static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, @@ -197,7 +197,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, WSABUF buffer; if (tcp->shutting_down) { - grpc_exec_ctx_push(exec_ctx, cb, + grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL); return; } @@ -222,7 +222,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, /* Did we get data immediately ? Yay. */ if (info->wsa_error != WSAEWOULDBLOCK) { info->bytes_transfered = bytes_read; - grpc_exec_ctx_push(exec_ctx, &tcp->on_read, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, &tcp->on_read, GRPC_ERROR_NONE, NULL); return; } @@ -235,7 +235,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, int wsa_error = WSAGetLastError(); if (wsa_error != WSA_IO_PENDING) { info->wsa_error = wsa_error; - grpc_exec_ctx_push(exec_ctx, &tcp->on_read, + grpc_exec_ctx_sched(exec_ctx, &tcp->on_read, GRPC_WSA_ERROR(info->wsa_error, "WSARecv"), NULL); return; } @@ -267,7 +267,7 @@ static void on_write(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) { } TCP_UNREF(tcp, "write"); - grpc_exec_ctx_push(exec_ctx, cb, error, NULL); + grpc_exec_ctx_sched(exec_ctx, cb, error, NULL); } /* Initiates a write. */ @@ -285,7 +285,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, size_t len; if (tcp->shutting_down) { - grpc_exec_ctx_push(exec_ctx, cb, + grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL); return; } @@ -317,7 +317,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_error *error = status == 0 ? GRPC_ERROR_NONE : GRPC_WSA_ERROR(info->wsa_error, "WSASend"); - grpc_exec_ctx_push(exec_ctx, cb, error, NULL); + grpc_exec_ctx_sched(exec_ctx, cb, error, NULL); return; } @@ -334,7 +334,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, int wsa_error = WSAGetLastError(); if (wsa_error != WSA_IO_PENDING) { TCP_UNREF(tcp, "write"); - grpc_exec_ctx_push(exec_ctx, cb, GRPC_WSA_ERROR(wsa_error, "WSASend"), + grpc_exec_ctx_sched(exec_ctx, cb, GRPC_WSA_ERROR(wsa_error, "WSASend"), NULL); return; } diff --git a/src/core/lib/iomgr/timer.c b/src/core/lib/iomgr/timer.c index b46c6df287..7b0ef550a7 100644 --- a/src/core/lib/iomgr/timer.c +++ b/src/core/lib/iomgr/timer.c @@ -186,7 +186,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, if (!g_initialized) { timer->triggered = 1; - grpc_exec_ctx_push( + grpc_exec_ctx_sched( exec_ctx, &timer->closure, GRPC_ERROR_CREATE("Attempt to create timer before initialization"), NULL); @@ -195,7 +195,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, if (gpr_time_cmp(deadline, now) <= 0) { timer->triggered = 1; - grpc_exec_ctx_push(exec_ctx, &timer->closure, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_NONE, NULL); return; } @@ -247,7 +247,7 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) { shard_type *shard = &g_shards[shard_idx(timer)]; gpr_mu_lock(&shard->mu); if (!timer->triggered) { - grpc_exec_ctx_push(exec_ctx, &timer->closure, GRPC_ERROR_CANCELLED, NULL); + grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_CANCELLED, NULL); timer->triggered = 1; if (timer->heap_index == INVALID_HEAP_INDEX) { list_remove(timer); @@ -313,7 +313,7 @@ static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard, grpc_timer *timer; gpr_mu_lock(&shard->mu); while ((timer = pop_one(shard, now))) { - grpc_exec_ctx_push(exec_ctx, &timer->closure, GRPC_ERROR_REF(error), NULL); + grpc_exec_ctx_sched(exec_ctx, &timer->closure, GRPC_ERROR_REF(error), NULL); n++; } *new_min_deadline = compute_min_deadline(shard); diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h index 6e8b1218be..7b6402e3a4 100644 --- a/src/core/lib/iomgr/workqueue.h +++ b/src/core/lib/iomgr/workqueue.h @@ -78,7 +78,7 @@ void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset); /** Add a work item to a workqueue */ -void grpc_workqueue_push(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - grpc_closure *closure, grpc_error *error); +void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, + grpc_closure *closure, grpc_error *error); #endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_H */ diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c index 297e8d3102..d2e3b5acb3 100644 --- a/src/core/lib/iomgr/workqueue_posix.c +++ b/src/core/lib/iomgr/workqueue_posix.c @@ -137,7 +137,7 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { } } -void grpc_workqueue_push(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, +void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, grpc_closure *closure, grpc_error *error) { grpc_error *push_error = GRPC_ERROR_NONE; gpr_mu_lock(&workqueue->mu); diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c index 23b51f33f6..072d468415 100644 --- a/src/core/lib/security/transport/secure_endpoint.c +++ b/src/core/lib/security/transport/secure_endpoint.c @@ -138,7 +138,7 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, secure_endpoint *ep, } } ep->read_buffer = NULL; - grpc_exec_ctx_push(exec_ctx, ep->read_cb, error, NULL); + grpc_exec_ctx_sched(exec_ctx, ep->read_cb, error, NULL); SECURE_ENDPOINT_UNREF(exec_ctx, ep, "read"); } @@ -319,7 +319,7 @@ static void endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep, if (result != TSI_OK) { /* TODO(yangg) do different things according to the error type? */ gpr_slice_buffer_reset_and_unref(&ep->output_buffer); - grpc_exec_ctx_push( + grpc_exec_ctx_sched( exec_ctx, cb, grpc_set_tsi_error_bits(GRPC_ERROR_CREATE("Wrap failed"), result), NULL); diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c index d45ec4020c..acbce769fb 100644 --- a/src/core/lib/security/transport/server_auth_filter.c +++ b/src/core/lib/security/transport/server_auth_filter.c @@ -128,7 +128,7 @@ static void on_md_processing_done( grpc_metadata_batch_filter(calld->recv_initial_metadata, remove_consumed_md, elem); grpc_metadata_array_destroy(&calld->md); - grpc_exec_ctx_push(&exec_ctx, calld->on_done_recv, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(&exec_ctx, calld->on_done_recv, GRPC_ERROR_NONE, NULL); } else { gpr_slice message; grpc_transport_stream_op close_op; @@ -146,7 +146,7 @@ static void on_md_processing_done( calld->transport_op.send_trailing_metadata = NULL; grpc_transport_stream_op_add_close(&close_op, status, &message); grpc_call_next_op(&exec_ctx, elem, &close_op); - grpc_exec_ctx_push(&exec_ctx, calld->on_done_recv, + grpc_exec_ctx_sched(&exec_ctx, calld->on_done_recv, grpc_error_set_int(GRPC_ERROR_CREATE(error_details), GRPC_ERROR_INT_GRPC_STATUS, status), NULL); @@ -169,7 +169,7 @@ static void auth_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, return; } } - grpc_exec_ctx_push(exec_ctx, calld->on_done_recv, GRPC_ERROR_REF(error), + grpc_exec_ctx_sched(exec_ctx, calld->on_done_recv, GRPC_ERROR_REF(error), NULL); } diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 3915ba57a2..09167efb5c 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -743,7 +743,7 @@ static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c, cc->call = c; cc->status = status; GRPC_CALL_INTERNAL_REF(c, "cancel"); - grpc_exec_ctx_push(exec_ctx, &cc->closure, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, &cc->closure, GRPC_ERROR_NONE, NULL); return GRPC_CALL_OK; } @@ -970,7 +970,7 @@ static void post_batch_completion(grpc_exec_ctx *exec_ctx, grpc_call *call = bctl->call; if (bctl->is_notify_tag_closure) { /* unrefs bctl->error */ - grpc_exec_ctx_push(exec_ctx, bctl->notify_tag, bctl->error, NULL); + grpc_exec_ctx_sched(exec_ctx, bctl->notify_tag, bctl->error, NULL); gpr_mu_lock(&call->mu); bctl->call->used_batches = (uint8_t)(bctl->call->used_batches & @@ -1129,7 +1129,7 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx, grpc_closure *saved_rsr_closure = grpc_closure_create( receiving_stream_ready, call->saved_receiving_stream_ready_bctlp); call->saved_receiving_stream_ready_bctlp = NULL; - grpc_exec_ctx_push(exec_ctx, saved_rsr_closure, error, NULL); + grpc_exec_ctx_sched(exec_ctx, saved_rsr_closure, error, NULL); } gpr_mu_unlock(&call->mu); diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c index 5c15b676f8..8a0dcdcc07 100644 --- a/src/core/lib/surface/lame_client.c +++ b/src/core/lib/surface/lame_client.c @@ -94,14 +94,14 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, if (op->on_connectivity_state_change) { GPR_ASSERT(*op->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE); *op->connectivity_state = GRPC_CHANNEL_FATAL_FAILURE; - grpc_exec_ctx_push(exec_ctx, op->on_connectivity_state_change, + grpc_exec_ctx_sched(exec_ctx, op->on_connectivity_state_change, GRPC_ERROR_NONE, NULL); } if (op->on_consumed != NULL) { - grpc_exec_ctx_push(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); } if (op->send_ping != NULL) { - grpc_exec_ctx_push(exec_ctx, op->send_ping, + grpc_exec_ctx_sched(exec_ctx, op->send_ping, GRPC_ERROR_CREATE("lame client channel"), NULL); } GRPC_ERROR_UNREF(op->disconnect_with_error); diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index f261321750..eae70717a5 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -344,8 +344,8 @@ static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx, grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_exec_ctx_push(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE, - NULL); + grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE, + NULL); } } @@ -526,7 +526,7 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_exec_ctx_push(exec_ctx, &calld->kill_zombie_closure, error, NULL); + grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, error, NULL); return; } @@ -571,8 +571,8 @@ static void finish_start_new_rpc( calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_exec_ctx_push(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE, - NULL); + grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE, + NULL); return; } @@ -757,8 +757,8 @@ static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, GRPC_ERROR_CREATE_REFERENCING("Missing :authority or :path", &error, 1); } - grpc_exec_ctx_push(exec_ctx, calld->on_done_recv_initial_metadata, error, - NULL); + grpc_exec_ctx_sched(exec_ctx, calld->on_done_recv_initial_metadata, error, + NULL); } static void server_mutate_op(grpc_call_element *elem, @@ -794,8 +794,8 @@ static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr, calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem); - grpc_exec_ctx_push(exec_ctx, &calld->kill_zombie_closure, GRPC_ERROR_NONE, - NULL); + grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, + GRPC_ERROR_NONE, NULL); } else if (calld->state == PENDING) { calld->state = ZOMBIED; gpr_mu_unlock(&calld->mu_state); @@ -1339,8 +1339,8 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, grpc_closure_init( &calld->kill_zombie_closure, kill_zombie, grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0)); - grpc_exec_ctx_push(exec_ctx, &calld->kill_zombie_closure, - GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, &calld->kill_zombie_closure, + GRPC_ERROR_NONE, NULL); } else { GPR_ASSERT(calld->state == PENDING); calld->state = ACTIVATED; diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c index 3286af9fb2..f714e385de 100644 --- a/src/core/lib/transport/connectivity_state.c +++ b/src/core/lib/transport/connectivity_state.c @@ -79,7 +79,7 @@ void grpc_connectivity_state_destroy(grpc_exec_ctx *exec_ctx, } else { error = GRPC_ERROR_CREATE("Shutdown connectivity owner"); } - grpc_exec_ctx_push(exec_ctx, w->notify, error, NULL); + grpc_exec_ctx_sched(exec_ctx, w->notify, error, NULL); gpr_free(w); } GRPC_ERROR_UNREF(tracker->current_error); @@ -114,7 +114,7 @@ int grpc_connectivity_state_notify_on_state_change( if (current == NULL) { grpc_connectivity_state_watcher *w = tracker->watchers; if (w != NULL && w->notify == notify) { - grpc_exec_ctx_push(exec_ctx, notify, GRPC_ERROR_CANCELLED, NULL); + grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED, NULL); tracker->watchers = w->next; gpr_free(w); return 0; @@ -122,7 +122,7 @@ int grpc_connectivity_state_notify_on_state_change( while (w != NULL) { grpc_connectivity_state_watcher *rm_candidate = w->next; if (rm_candidate != NULL && rm_candidate->notify == notify) { - grpc_exec_ctx_push(exec_ctx, notify, GRPC_ERROR_CANCELLED, NULL); + grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_CANCELLED, NULL); w->next = w->next->next; gpr_free(rm_candidate); return 0; @@ -133,7 +133,7 @@ int grpc_connectivity_state_notify_on_state_change( } else { if (tracker->current_state != *current) { *current = tracker->current_state; - grpc_exec_ctx_push(exec_ctx, notify, + grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_REF(tracker->current_error), NULL); } else { grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); @@ -179,7 +179,7 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, while ((w = tracker->watchers) != NULL) { *w->current = tracker->current_state; tracker->watchers = w->next; - grpc_exec_ctx_push(exec_ctx, w->notify, + grpc_exec_ctx_sched(exec_ctx, w->notify, GRPC_ERROR_REF(tracker->current_error), NULL); gpr_free(w); } diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index fdf0f4b2aa..10d58153af 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -60,7 +60,7 @@ void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount) { #endif if (gpr_unref(&refcount->refs)) { - grpc_exec_ctx_push(exec_ctx, &refcount->destroy, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, &refcount->destroy, GRPC_ERROR_NONE, NULL); } } @@ -146,11 +146,11 @@ char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx, void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op, grpc_error *error) { - grpc_exec_ctx_push(exec_ctx, op->recv_message_ready, GRPC_ERROR_REF(error), + grpc_exec_ctx_sched(exec_ctx, op->recv_message_ready, GRPC_ERROR_REF(error), NULL); - grpc_exec_ctx_push(exec_ctx, op->recv_initial_metadata_ready, + grpc_exec_ctx_sched(exec_ctx, op->recv_initial_metadata_ready, GRPC_ERROR_REF(error), NULL); - grpc_exec_ctx_push(exec_ctx, op->on_complete, error, NULL); + grpc_exec_ctx_sched(exec_ctx, op->on_complete, error, NULL); } void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op, diff --git a/test/core/end2end/fuzzers/api_fuzzer.c b/test/core/end2end/fuzzers/api_fuzzer.c index aa715c841c..d00a67ffc9 100644 --- a/test/core/end2end/fuzzers/api_fuzzer.c +++ b/test/core/end2end/fuzzers/api_fuzzer.c @@ -200,9 +200,9 @@ static void finish_resolve(grpc_exec_ctx *exec_ctx, void *arg, addrs->addrs = gpr_malloc(sizeof(*addrs->addrs)); addrs->addrs[0].len = 0; *r->addrs = addrs; - grpc_exec_ctx_push(exec_ctx, r->on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, r->on_done, GRPC_ERROR_NONE, NULL); } else { - grpc_exec_ctx_push( + grpc_exec_ctx_sched( exec_ctx, r->on_done, GRPC_ERROR_CREATE_REFERENCING("Resolution failed", &error, 1), NULL); } @@ -247,7 +247,7 @@ static void do_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { future_connect *fc = arg; if (error != GRPC_ERROR_NONE) { *fc->ep = NULL; - grpc_exec_ctx_push(exec_ctx, fc->closure, GRPC_ERROR_REF(error), NULL); + grpc_exec_ctx_sched(exec_ctx, fc->closure, GRPC_ERROR_REF(error), NULL); } else if (g_server != NULL) { grpc_endpoint *client; grpc_endpoint *server; @@ -259,7 +259,7 @@ static void do_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_server_setup_transport(exec_ctx, g_server, transport, NULL, NULL); grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0); - grpc_exec_ctx_push(exec_ctx, fc->closure, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, fc->closure, GRPC_ERROR_NONE, NULL); } else { sched_connect(exec_ctx, fc->closure, fc->ep, fc->deadline); } @@ -270,8 +270,8 @@ static void sched_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, gpr_timespec deadline) { if (gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) < 0) { *ep = NULL; - grpc_exec_ctx_push(exec_ctx, closure, - GRPC_ERROR_CREATE("Connect deadline exceeded"), NULL); + grpc_exec_ctx_sched(exec_ctx, closure, + GRPC_ERROR_CREATE("Connect deadline exceeded"), NULL); return; } diff --git a/test/core/end2end/tests/filter_causes_close.c b/test/core/end2end/tests/filter_causes_close.c index 8fd847b878..71c0dbfb58 100644 --- a/test/core/end2end/tests/filter_causes_close.c +++ b/test/core/end2end/tests/filter_causes_close.c @@ -216,7 +216,7 @@ static void recv_im_ready(grpc_exec_ctx *exec_ctx, void *arg, &message); grpc_call_next_op(exec_ctx, elem, &op); } - grpc_exec_ctx_push( + grpc_exec_ctx_sched( exec_ctx, calld->recv_im_ready, GRPC_ERROR_CREATE_REFERENCING("Forced call to close", &error, 1), NULL); } diff --git a/test/core/internal_api_canaries/iomgr.c b/test/core/internal_api_canaries/iomgr.c index 71a1bb1436..5e86c42309 100644 --- a/test/core/internal_api_canaries/iomgr.c +++ b/test/core/internal_api_canaries/iomgr.c @@ -72,7 +72,7 @@ static void test_code(void) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_exec_ctx_flush(&exec_ctx); grpc_exec_ctx_finish(&exec_ctx); - grpc_exec_ctx_push(&exec_ctx, &closure, GRPC_ERROR_CREATE("Foo"), NULL); + grpc_exec_ctx_sched(&exec_ctx, &closure, GRPC_ERROR_CREATE("Foo"), NULL); grpc_exec_ctx_enqueue_list(&exec_ctx, &closure_list, NULL); /* endpoint.h */ diff --git a/test/core/iomgr/workqueue_test.c b/test/core/iomgr/workqueue_test.c index 9a359cd799..76ecfae74b 100644 --- a/test/core/iomgr/workqueue_test.c +++ b/test/core/iomgr/workqueue_test.c @@ -73,7 +73,7 @@ static void test_add_closure(void) { grpc_pollset_worker *worker = NULL; grpc_closure_init(&c, must_succeed, &done); - grpc_workqueue_push(&exec_ctx, wq, &c, GRPC_ERROR_NONE); + grpc_workqueue_enqueue(&exec_ctx, wq, &c, GRPC_ERROR_NONE); grpc_workqueue_add_to_pollset(&exec_ctx, wq, g_pollset); gpr_mu_lock(g_mu); @@ -103,7 +103,7 @@ static void test_flush(void) { grpc_pollset_worker *worker = NULL; grpc_closure_init(&c, must_succeed, &done); - grpc_exec_ctx_push(&exec_ctx, &c, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(&exec_ctx, &c, GRPC_ERROR_NONE, NULL); grpc_workqueue_flush(&exec_ctx, wq); grpc_workqueue_add_to_pollset(&exec_ctx, wq, g_pollset); diff --git a/test/core/security/credentials_test.c b/test/core/security/credentials_test.c index ec417b84dc..e703dbdeb6 100644 --- a/test/core/security/credentials_test.c +++ b/test/core/security/credentials_test.c @@ -560,7 +560,7 @@ static int compute_engine_httpcli_get_success_override( grpc_httpcli_response *response) { validate_compute_engine_http_request(request); *response = http_response(200, valid_oauth2_json_response); - grpc_exec_ctx_push(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); return 1; } @@ -570,7 +570,7 @@ static int compute_engine_httpcli_get_failure_override( grpc_httpcli_response *response) { validate_compute_engine_http_request(request); *response = http_response(403, "Not Authorized."); - grpc_exec_ctx_push(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); return 1; } @@ -663,7 +663,7 @@ static int refresh_token_httpcli_post_success( grpc_closure *on_done, grpc_httpcli_response *response) { validate_refresh_token_http_request(request, body, body_size); *response = http_response(200, valid_oauth2_json_response); - grpc_exec_ctx_push(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); return 1; } @@ -673,7 +673,7 @@ static int refresh_token_httpcli_post_failure( grpc_closure *on_done, grpc_httpcli_response *response) { validate_refresh_token_http_request(request, body, body_size); *response = http_response(403, "Not Authorized."); - grpc_exec_ctx_push(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); return 1; } @@ -915,7 +915,7 @@ static int default_creds_gce_detection_httpcli_get_success_override( response->hdrs = headers; GPR_ASSERT(strcmp(request->http.path, "/") == 0); GPR_ASSERT(strcmp(request->host, "metadata.google.internal") == 0); - grpc_exec_ctx_push(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); return 1; } @@ -973,7 +973,7 @@ static int default_creds_gce_detection_httpcli_get_failure_override( GPR_ASSERT(strcmp(request->http.path, "/") == 0); GPR_ASSERT(strcmp(request->host, "metadata.google.internal") == 0); *response = http_response(200, ""); - grpc_exec_ctx_push(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); return 1; } diff --git a/test/core/security/jwt_verifier_test.c b/test/core/security/jwt_verifier_test.c index 23b46958f4..36b331a777 100644 --- a/test/core/security/jwt_verifier_test.c +++ b/test/core/security/jwt_verifier_test.c @@ -294,7 +294,7 @@ static int httpcli_get_google_keys_for_email( "/robot/v1/metadata/x509/" "777-abaslkan11hlb6nmim3bpspl31ud@developer." "gserviceaccount.com") == 0); - grpc_exec_ctx_push(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); return 1; } @@ -338,7 +338,7 @@ static int httpcli_get_custom_keys_for_email( GPR_ASSERT(request->handshaker == &grpc_httpcli_ssl); GPR_ASSERT(strcmp(request->host, "keys.bar.com") == 0); GPR_ASSERT(strcmp(request->http.path, "/jwk/foo@bar.com") == 0); - grpc_exec_ctx_push(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); return 1; } @@ -372,7 +372,7 @@ static int httpcli_get_jwk_set(grpc_exec_ctx *exec_ctx, GPR_ASSERT(request->handshaker == &grpc_httpcli_ssl); GPR_ASSERT(strcmp(request->host, "www.googleapis.com") == 0); GPR_ASSERT(strcmp(request->http.path, "/oauth2/v3/certs") == 0); - grpc_exec_ctx_push(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); return 1; } @@ -387,7 +387,7 @@ static int httpcli_get_openid_config(grpc_exec_ctx *exec_ctx, GPR_ASSERT(strcmp(request->http.path, GRPC_OPENID_CONFIG_URL_SUFFIX) == 0); grpc_httpcli_set_override(httpcli_get_jwk_set, httpcli_post_should_not_be_called); - grpc_exec_ctx_push(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); return 1; } @@ -427,7 +427,7 @@ static int httpcli_get_bad_json(grpc_exec_ctx *exec_ctx, grpc_httpcli_response *response) { *response = http_response(200, gpr_strdup("{\"bad\": \"stuff\"}")); GPR_ASSERT(request->handshaker == &grpc_httpcli_ssl); - grpc_exec_ctx_push(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, on_done, GRPC_ERROR_NONE, NULL); return 1; } diff --git a/test/core/util/mock_endpoint.c b/test/core/util/mock_endpoint.c index deef68ef59..e3df5b1841 100644 --- a/test/core/util/mock_endpoint.c +++ b/test/core/util/mock_endpoint.c @@ -51,7 +51,7 @@ static void me_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, gpr_mu_lock(&m->mu); if (m->read_buffer.count > 0) { gpr_slice_buffer_swap(&m->read_buffer, slices); - grpc_exec_ctx_push(exec_ctx, cb, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_NONE, NULL); } else { m->on_read = cb; m->on_read_out = slices; @@ -65,7 +65,7 @@ static void me_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, for (size_t i = 0; i < slices->count; i++) { m->on_write(slices->slices[i]); } - grpc_exec_ctx_push(exec_ctx, cb, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_NONE, NULL); } static void me_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, @@ -78,7 +78,7 @@ static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_mock_endpoint *m = (grpc_mock_endpoint *)ep; gpr_mu_lock(&m->mu); if (m->on_read) { - grpc_exec_ctx_push(exec_ctx, m->on_read, + grpc_exec_ctx_sched(exec_ctx, m->on_read, GRPC_ERROR_CREATE("Endpoint Shutdown"), NULL); m->on_read = NULL; } @@ -116,7 +116,7 @@ void grpc_mock_endpoint_put_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, gpr_mu_lock(&m->mu); if (m->on_read != NULL) { gpr_slice_buffer_add(m->on_read_out, slice); - grpc_exec_ctx_push(exec_ctx, m->on_read, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, m->on_read, GRPC_ERROR_NONE, NULL); m->on_read = NULL; } else { gpr_slice_buffer_add(&m->read_buffer, slice); diff --git a/test/core/util/passthru_endpoint.c b/test/core/util/passthru_endpoint.c index 93753be251..bf897d8f7e 100644 --- a/test/core/util/passthru_endpoint.c +++ b/test/core/util/passthru_endpoint.c @@ -59,11 +59,11 @@ static void me_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, half *m = (half *)ep; gpr_mu_lock(&m->parent->mu); if (m->parent->shutdown) { - grpc_exec_ctx_push(exec_ctx, cb, GRPC_ERROR_CREATE("Already shutdown"), + grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_CREATE("Already shutdown"), NULL); } else if (m->read_buffer.count > 0) { gpr_slice_buffer_swap(&m->read_buffer, slices); - grpc_exec_ctx_push(exec_ctx, cb, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_NONE, NULL); } else { m->on_read = cb; m->on_read_out = slices; @@ -87,7 +87,7 @@ static void me_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, for (size_t i = 0; i < slices->count; i++) { gpr_slice_buffer_add(m->on_read_out, gpr_slice_ref(slices->slices[i])); } - grpc_exec_ctx_push(exec_ctx, m->on_read, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, m->on_read, GRPC_ERROR_NONE, NULL); m->on_read = NULL; } else { for (size_t i = 0; i < slices->count; i++) { @@ -95,7 +95,7 @@ static void me_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, } } gpr_mu_unlock(&m->parent->mu); - grpc_exec_ctx_push(exec_ctx, cb, error, NULL); + grpc_exec_ctx_sched(exec_ctx, cb, error, NULL); } static void me_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, @@ -109,13 +109,13 @@ static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { gpr_mu_lock(&m->parent->mu); m->parent->shutdown = true; if (m->on_read) { - grpc_exec_ctx_push(exec_ctx, m->on_read, GRPC_ERROR_CREATE("Shutdown"), + grpc_exec_ctx_sched(exec_ctx, m->on_read, GRPC_ERROR_CREATE("Shutdown"), NULL); m->on_read = NULL; } m = other_half(m); if (m->on_read) { - grpc_exec_ctx_push(exec_ctx, m->on_read, GRPC_ERROR_CREATE("Shutdown"), + grpc_exec_ctx_sched(exec_ctx, m->on_read, GRPC_ERROR_CREATE("Shutdown"), NULL); m->on_read = NULL; } -- cgit v1.2.3 From 77c983dc87dcf19b3e869247f90063ef4c10af3e Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Tue, 24 May 2016 13:23:14 -0700 Subject: clang-format --- src/core/ext/client_config/subchannel.c | 2 +- src/core/ext/lb_policy/round_robin/round_robin.c | 12 +++++++----- src/core/ext/resolver/dns/native/dns_resolver.c | 2 +- src/core/ext/transport/chttp2/transport/writing.c | 2 +- src/core/lib/iomgr/exec_ctx.c | 4 ++-- src/core/lib/iomgr/pollset_windows.c | 2 +- src/core/lib/iomgr/tcp_client_posix.c | 2 +- src/core/lib/iomgr/tcp_windows.c | 8 ++++---- src/core/lib/iomgr/workqueue_posix.c | 2 +- src/core/lib/security/transport/server_auth_filter.c | 8 ++++---- src/core/lib/surface/lame_client.c | 4 ++-- src/core/lib/transport/connectivity_state.c | 4 ++-- src/core/lib/transport/transport.c | 4 ++-- test/core/util/mock_endpoint.c | 2 +- test/core/util/passthru_endpoint.c | 6 +++--- 15 files changed, 33 insertions(+), 31 deletions(-) (limited to 'src/core/lib/iomgr/workqueue_posix.c') diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c index 1e1b30c4c0..c6c7b7a3a0 100644 --- a/src/core/ext/client_config/subchannel.c +++ b/src/core/ext/client_config/subchannel.c @@ -291,7 +291,7 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx, old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF")); if (old_refs == 1) { grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(subchannel_destroy, c), - GRPC_ERROR_NONE, NULL); + GRPC_ERROR_NONE, NULL); } } diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 76d9d83c9b..42356d8b0b 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -240,7 +240,7 @@ static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { p->pending_picks = pp->next; *pp->target = NULL; grpc_exec_ctx_sched(exec_ctx, pp->on_complete, - GRPC_ERROR_CREATE("Channel Shutdown"), NULL); + GRPC_ERROR_CREATE("Channel Shutdown"), NULL); gpr_free(pp); } grpc_connectivity_state_set( @@ -267,7 +267,8 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); *target = NULL; - grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED, NULL); + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED, + NULL); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -293,7 +294,8 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties, pp->pollset); *pp->target = NULL; - grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED, NULL); + grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_CANCELLED, + NULL); gpr_free(pp); } else { pp->next = p->pending_picks; @@ -467,7 +469,7 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, p->pending_picks = pp->next; *pp->target = NULL; grpc_exec_ctx_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE, - NULL); + NULL); gpr_free(pp); } } else { @@ -522,7 +524,7 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } else { gpr_mu_unlock(&p->mu); grpc_exec_ctx_sched(exec_ctx, closure, - GRPC_ERROR_CREATE("Round Robin not connected"), NULL); + GRPC_ERROR_CREATE("Round Robin not connected"), NULL); } } diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c index e35aaba815..d6e7c89ef8 100644 --- a/src/core/ext/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -112,7 +112,7 @@ static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { if (r->next_completion != NULL) { *r->target_config = NULL; grpc_exec_ctx_sched(exec_ctx, r->next_completion, - GRPC_ERROR_CREATE("Resolver Shutdown"), NULL); + GRPC_ERROR_CREATE("Resolver Shutdown"), NULL); r->next_completion = NULL; } gpr_mu_unlock(&r->mu); diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index ed6536b3c7..add7678182 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -188,7 +188,7 @@ void grpc_chttp2_perform_writes( &transport_writing->done_cb); } else { grpc_exec_ctx_sched(exec_ctx, &transport_writing->done_cb, GRPC_ERROR_NONE, - NULL); + NULL); } } diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c index 9652f826c1..c44aafcddf 100644 --- a/src/core/lib/iomgr/exec_ctx.c +++ b/src/core/lib/iomgr/exec_ctx.c @@ -83,8 +83,8 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) { } void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, - grpc_error *error, - grpc_workqueue *offload_target_or_null) { + grpc_error *error, + grpc_workqueue *offload_target_or_null) { GPR_ASSERT(offload_target_or_null == NULL); grpc_closure_list_append(&exec_ctx->closure_list, closure, error); } diff --git a/src/core/lib/iomgr/pollset_windows.c b/src/core/lib/iomgr/pollset_windows.c index d9f0f05ba6..626dd784b3 100644 --- a/src/core/lib/iomgr/pollset_windows.c +++ b/src/core/lib/iomgr/pollset_windows.c @@ -168,7 +168,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (pollset->shutting_down && pollset->on_shutdown != NULL) { grpc_exec_ctx_sched(exec_ctx, pollset->on_shutdown, GRPC_ERROR_NONE, - NULL); + NULL); pollset->on_shutdown = NULL; } goto done; diff --git a/src/core/lib/iomgr/tcp_client_posix.c b/src/core/lib/iomgr/tcp_client_posix.c index 078696c126..80c7a3f128 100644 --- a/src/core/lib/iomgr/tcp_client_posix.c +++ b/src/core/lib/iomgr/tcp_client_posix.c @@ -283,7 +283,7 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, if (errno != EWOULDBLOCK && errno != EINPROGRESS) { grpc_fd_orphan(exec_ctx, fdobj, NULL, NULL, "tcp_client_connect_error"); grpc_exec_ctx_sched(exec_ctx, closure, GRPC_OS_ERROR(errno, "connect"), - NULL); + NULL); goto done; } diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c index 383af9f12b..b2003675be 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.c @@ -198,7 +198,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (tcp->shutting_down) { grpc_exec_ctx_sched(exec_ctx, cb, - GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL); + GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL); return; } @@ -236,7 +236,7 @@ static void win_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (wsa_error != WSA_IO_PENDING) { info->wsa_error = wsa_error; grpc_exec_ctx_sched(exec_ctx, &tcp->on_read, - GRPC_WSA_ERROR(info->wsa_error, "WSARecv"), NULL); + GRPC_WSA_ERROR(info->wsa_error, "WSARecv"), NULL); return; } } @@ -286,7 +286,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (tcp->shutting_down) { grpc_exec_ctx_sched(exec_ctx, cb, - GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL); + GRPC_ERROR_CREATE("TCP socket is shutting down"), NULL); return; } @@ -335,7 +335,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (wsa_error != WSA_IO_PENDING) { TCP_UNREF(tcp, "write"); grpc_exec_ctx_sched(exec_ctx, cb, GRPC_WSA_ERROR(wsa_error, "WSASend"), - NULL); + NULL); return; } } diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c index d2e3b5acb3..45e0f6063b 100644 --- a/src/core/lib/iomgr/workqueue_posix.c +++ b/src/core/lib/iomgr/workqueue_posix.c @@ -138,7 +138,7 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { } void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, - grpc_closure *closure, grpc_error *error) { + grpc_closure *closure, grpc_error *error) { grpc_error *push_error = GRPC_ERROR_NONE; gpr_mu_lock(&workqueue->mu); if (grpc_closure_list_empty(workqueue->closure_list)) { diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c index acbce769fb..34120aba42 100644 --- a/src/core/lib/security/transport/server_auth_filter.c +++ b/src/core/lib/security/transport/server_auth_filter.c @@ -147,9 +147,9 @@ static void on_md_processing_done( grpc_transport_stream_op_add_close(&close_op, status, &message); grpc_call_next_op(&exec_ctx, elem, &close_op); grpc_exec_ctx_sched(&exec_ctx, calld->on_done_recv, - grpc_error_set_int(GRPC_ERROR_CREATE(error_details), - GRPC_ERROR_INT_GRPC_STATUS, status), - NULL); + grpc_error_set_int(GRPC_ERROR_CREATE(error_details), + GRPC_ERROR_INT_GRPC_STATUS, status), + NULL); } grpc_exec_ctx_finish(&exec_ctx); @@ -170,7 +170,7 @@ static void auth_on_recv(grpc_exec_ctx *exec_ctx, void *user_data, } } grpc_exec_ctx_sched(exec_ctx, calld->on_done_recv, GRPC_ERROR_REF(error), - NULL); + NULL); } static void set_recv_ops_md_callbacks(grpc_call_element *elem, diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c index 8a0dcdcc07..6c94de555a 100644 --- a/src/core/lib/surface/lame_client.c +++ b/src/core/lib/surface/lame_client.c @@ -95,14 +95,14 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, GPR_ASSERT(*op->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE); *op->connectivity_state = GRPC_CHANNEL_FATAL_FAILURE; grpc_exec_ctx_sched(exec_ctx, op->on_connectivity_state_change, - GRPC_ERROR_NONE, NULL); + GRPC_ERROR_NONE, NULL); } if (op->on_consumed != NULL) { grpc_exec_ctx_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE, NULL); } if (op->send_ping != NULL) { grpc_exec_ctx_sched(exec_ctx, op->send_ping, - GRPC_ERROR_CREATE("lame client channel"), NULL); + GRPC_ERROR_CREATE("lame client channel"), NULL); } GRPC_ERROR_UNREF(op->disconnect_with_error); } diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c index f714e385de..062d0b3742 100644 --- a/src/core/lib/transport/connectivity_state.c +++ b/src/core/lib/transport/connectivity_state.c @@ -134,7 +134,7 @@ int grpc_connectivity_state_notify_on_state_change( if (tracker->current_state != *current) { *current = tracker->current_state; grpc_exec_ctx_sched(exec_ctx, notify, - GRPC_ERROR_REF(tracker->current_error), NULL); + GRPC_ERROR_REF(tracker->current_error), NULL); } else { grpc_connectivity_state_watcher *w = gpr_malloc(sizeof(*w)); w->current = current; @@ -180,7 +180,7 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, *w->current = tracker->current_state; tracker->watchers = w->next; grpc_exec_ctx_sched(exec_ctx, w->notify, - GRPC_ERROR_REF(tracker->current_error), NULL); + GRPC_ERROR_REF(tracker->current_error), NULL); gpr_free(w); } } diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c index 10d58153af..ec6833e323 100644 --- a/src/core/lib/transport/transport.c +++ b/src/core/lib/transport/transport.c @@ -147,9 +147,9 @@ void grpc_transport_stream_op_finish_with_failure(grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op, grpc_error *error) { grpc_exec_ctx_sched(exec_ctx, op->recv_message_ready, GRPC_ERROR_REF(error), - NULL); + NULL); grpc_exec_ctx_sched(exec_ctx, op->recv_initial_metadata_ready, - GRPC_ERROR_REF(error), NULL); + GRPC_ERROR_REF(error), NULL); grpc_exec_ctx_sched(exec_ctx, op->on_complete, error, NULL); } diff --git a/test/core/util/mock_endpoint.c b/test/core/util/mock_endpoint.c index e3df5b1841..ed9545e9df 100644 --- a/test/core/util/mock_endpoint.c +++ b/test/core/util/mock_endpoint.c @@ -79,7 +79,7 @@ static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { gpr_mu_lock(&m->mu); if (m->on_read) { grpc_exec_ctx_sched(exec_ctx, m->on_read, - GRPC_ERROR_CREATE("Endpoint Shutdown"), NULL); + GRPC_ERROR_CREATE("Endpoint Shutdown"), NULL); m->on_read = NULL; } gpr_mu_unlock(&m->mu); diff --git a/test/core/util/passthru_endpoint.c b/test/core/util/passthru_endpoint.c index bf897d8f7e..a39f3dd66e 100644 --- a/test/core/util/passthru_endpoint.c +++ b/test/core/util/passthru_endpoint.c @@ -60,7 +60,7 @@ static void me_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, gpr_mu_lock(&m->parent->mu); if (m->parent->shutdown) { grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_CREATE("Already shutdown"), - NULL); + NULL); } else if (m->read_buffer.count > 0) { gpr_slice_buffer_swap(&m->read_buffer, slices); grpc_exec_ctx_sched(exec_ctx, cb, GRPC_ERROR_NONE, NULL); @@ -110,13 +110,13 @@ static void me_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { m->parent->shutdown = true; if (m->on_read) { grpc_exec_ctx_sched(exec_ctx, m->on_read, GRPC_ERROR_CREATE("Shutdown"), - NULL); + NULL); m->on_read = NULL; } m = other_half(m); if (m->on_read) { grpc_exec_ctx_sched(exec_ctx, m->on_read, GRPC_ERROR_CREATE("Shutdown"), - NULL); + NULL); m->on_read = NULL; } gpr_mu_unlock(&m->parent->mu); -- cgit v1.2.3