diff options
author | 2015-11-18 11:41:30 -0800 | |
---|---|---|
committer | 2015-11-18 11:41:30 -0800 | |
commit | 71b962634eceacae336b1c706829e4ad9621b397 (patch) | |
tree | 35378c669bf04c24097a403af65a5adb8c82e812 /src/core/iomgr | |
parent | 42630b010f398ca072a0ba89648855c3052c6594 (diff) | |
parent | 75b53d6a5d891fa33a9173318b1446faeaf154e4 (diff) |
merge with head
Diffstat (limited to 'src/core/iomgr')
-rw-r--r-- | src/core/iomgr/closure.c | 23 | ||||
-rw-r--r-- | src/core/iomgr/closure.h | 19 | ||||
-rw-r--r-- | src/core/iomgr/exec_ctx.c | 5 | ||||
-rw-r--r-- | src/core/iomgr/executor.c | 13 | ||||
-rw-r--r-- | src/core/iomgr/pollset.h | 5 | ||||
-rw-r--r-- | src/core/iomgr/pollset_multipoller_with_epoll.c | 14 | ||||
-rw-r--r-- | src/core/iomgr/pollset_multipoller_with_poll_posix.c | 4 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 88 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.h | 9 | ||||
-rw-r--r-- | src/core/iomgr/pollset_windows.c | 12 | ||||
-rw-r--r-- | src/core/iomgr/workqueue_posix.c | 2 |
11 files changed, 108 insertions, 86 deletions
diff --git a/src/core/iomgr/closure.c b/src/core/iomgr/closure.c index b4f1817de4..4aae52a454 100644 --- a/src/core/iomgr/closure.c +++ b/src/core/iomgr/closure.c @@ -39,18 +39,17 @@ void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb, void *cb_arg) { closure->cb = cb; closure->cb_arg = cb_arg; - closure->next = NULL; + closure->final_data = 0; } void grpc_closure_list_add(grpc_closure_list *closure_list, grpc_closure *closure, int success) { if (closure == NULL) return; - closure->next = NULL; - closure->success = success; + closure->final_data = (success != 0); if (closure_list->head == NULL) { closure_list->head = closure; } else { - closure_list->tail->next = closure; + closure_list->tail->final_data |= (gpr_uintptr)closure; } closure_list->tail = closure; } @@ -66,22 +65,12 @@ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst) { if (dst->head == NULL) { *dst = *src; } else { - dst->tail->next = src->head; + dst->tail->final_data |= (gpr_uintptr)src->head; dst->tail = src->tail; } src->head = src->tail = NULL; } -grpc_closure *grpc_closure_list_pop(grpc_closure_list *list) { - grpc_closure *head; - if (list->head == NULL) { - return NULL; - } - head = list->head; - list->head = list->head->next; - return head; -} - typedef struct { grpc_iomgr_cb_func cb; void *cb_arg; @@ -103,3 +92,7 @@ grpc_closure *grpc_closure_create(grpc_iomgr_cb_func cb, void *cb_arg) { grpc_closure_init(&wc->wrapper, closure_wrapper, wc); return &wc->wrapper; } + +grpc_closure *grpc_closure_next(grpc_closure *closure) { + return (grpc_closure *)(closure->final_data & ~(gpr_uintptr)1); +} diff --git a/src/core/iomgr/closure.h b/src/core/iomgr/closure.h index 7a9f7ccad0..a1d738bf5a 100644 --- a/src/core/iomgr/closure.h +++ b/src/core/iomgr/closure.h @@ -34,7 +34,7 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_CLOSURE_H #define GRPC_INTERNAL_CORE_IOMGR_CLOSURE_H -#include <stddef.h> +#include <grpc/support/port_platform.h> struct grpc_closure; typedef struct grpc_closure grpc_closure; @@ -64,13 +64,10 @@ struct grpc_closure { /** Arguments to be passed to "cb". */ void *cb_arg; - /** Internal. A boolean indication to "cb" on the state of the iomgr. - * For instance, closures created during a shutdown would have this field set - * to false. */ - int success; - - /**< Internal. Do not touch */ - struct grpc_closure *next; + /** Once enqueued, contains in the lower bit the success of the closure, + and in the upper bits the pointer to the next closure in the list. + Before enqueing for execution, this is usable for scratch data. */ + gpr_uintptr final_data; }; /** Initializes \a closure with \a cb and \a cb_arg. */ @@ -91,10 +88,10 @@ void grpc_closure_list_add(grpc_closure_list *list, grpc_closure *closure, /** append all closures from \a src to \a dst and empty \a src. */ void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst); -/** pop (return and remove) the head closure from \a list. */ -grpc_closure *grpc_closure_list_pop(grpc_closure_list *list); - /** return whether \a list is empty. */ int grpc_closure_list_empty(grpc_closure_list list); +/** return the next pointer for a queued closure list */ +grpc_closure *grpc_closure_next(grpc_closure *closure); + #endif /* GRPC_INTERNAL_CORE_IOMGR_CLOSURE_H */ diff --git a/src/core/iomgr/exec_ctx.c b/src/core/iomgr/exec_ctx.c index 410b34c521..e95eaf267a 100644 --- a/src/core/iomgr/exec_ctx.c +++ b/src/core/iomgr/exec_ctx.c @@ -44,10 +44,11 @@ int grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { grpc_closure *c = exec_ctx->closure_list.head; exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL; while (c != NULL) { - grpc_closure *next = c->next; + int success = (int)(c->final_data & 1); + grpc_closure *next = (grpc_closure *)(c->final_data & ~(gpr_uintptr)1); did_something++; GPR_TIMER_BEGIN("grpc_exec_ctx_flush.cb", 0); - c->cb(exec_ctx, c->cb_arg, c->success); + c->cb(exec_ctx, c->cb_arg, success); GPR_TIMER_END("grpc_exec_ctx_flush.cb", 0); c = next; } diff --git a/src/core/iomgr/executor.c b/src/core/iomgr/executor.c index 457e5cdbac..00c68f7828 100644 --- a/src/core/iomgr/executor.c +++ b/src/core/iomgr/executor.c @@ -63,8 +63,6 @@ void grpc_executor_init() { /* thread body */ static void closure_exec_thread_func(void *ignored) { - grpc_closure *closure; - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; while (1) { gpr_mu_lock(&g_executor.mu); @@ -72,16 +70,16 @@ static void closure_exec_thread_func(void *ignored) { gpr_mu_unlock(&g_executor.mu); break; } - closure = grpc_closure_list_pop(&g_executor.closures); - if (closure == NULL) { + if (grpc_closure_list_empty(g_executor.closures)) { /* no more work, time to die */ GPR_ASSERT(g_executor.busy == 1); g_executor.busy = 0; gpr_mu_unlock(&g_executor.mu); break; + } else { + grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures); } gpr_mu_unlock(&g_executor.mu); - closure->cb(&exec_ctx, closure->cb_arg, closure->success); grpc_exec_ctx_flush(&exec_ctx); } grpc_exec_ctx_finish(&exec_ctx); @@ -125,7 +123,6 @@ void grpc_executor_enqueue(grpc_closure *closure, int success) { void grpc_executor_shutdown() { int pending_join; - grpc_closure *closure; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; gpr_mu_lock(&g_executor.mu); @@ -136,9 +133,7 @@ void grpc_executor_shutdown() { * list below because we aren't accepting new work */ /* Execute pending callbacks, some may be performing cleanups */ - while ((closure = grpc_closure_list_pop(&g_executor.closures)) != NULL) { - closure->cb(&exec_ctx, closure->cb_arg, closure->success); - } + grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures); grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(grpc_closure_list_empty(g_executor.closures)); if (pending_join) { diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h index d15553a12a..c6b0214dea 100644 --- a/src/core/iomgr/pollset.h +++ b/src/core/iomgr/pollset.h @@ -55,8 +55,13 @@ #endif void grpc_pollset_init(grpc_pollset *pollset); +/* Begin shutting down the pollset, and call closure when done. + * GRPC_POLLSET_MU(pollset) must be held */ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure *closure); +/** Reset the pollset to its initial state (perhaps with some cached objects); + * must have been previously shutdown */ +void grpc_pollset_reset(grpc_pollset *pollset); void grpc_pollset_destroy(grpc_pollset *pollset); /* Do some work on a pollset. diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index 2aafd21dfb..1f1bf47e98 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -47,21 +47,13 @@ #include "src/core/support/block_annotate.h" #include "src/core/profiling/timers.h" -typedef struct wakeup_fd_hdl { - grpc_wakeup_fd wakeup_fd; - struct wakeup_fd_hdl *next; -} wakeup_fd_hdl; - typedef struct { grpc_pollset *pollset; grpc_fd *fd; grpc_closure closure; } delayed_add; -typedef struct { - int epoll_fd; - wakeup_fd_hdl *free_wakeup_fds; -} pollset_hdr; +typedef struct { int epoll_fd; } pollset_hdr; static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *fd) { @@ -174,7 +166,7 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( timeout_ms = grpc_poll_deadline_to_millis_timeout(deadline, now); - pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd); + pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd); pfds[0].events = POLLIN; pfds[0].revents = 0; pfds[1].fd = h->epoll_fd; @@ -197,7 +189,7 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( /* do nothing */ } else { if (pfds[0].revents) { - grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd); + grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd); } if (pfds[1].revents) { do { diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index faa6c14491..09f04b64b9 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -124,7 +124,7 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd); pfds[0].events = POLLIN; pfds[0].revents = 0; - pfds[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd); + pfds[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd); pfds[1].events = POLLIN; pfds[1].revents = 0; for (i = 0; i < h->fd_count; i++) { @@ -174,7 +174,7 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); } if (pfds[1].revents & POLLIN_CHECK) { - grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd); + grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd); } for (i = 2; i < pfd_count; i++) { if (watchers[i].fd == NULL) { diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index a8e620ea1e..c179248214 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -111,7 +111,7 @@ void grpc_pollset_kick_ext(grpc_pollset *p, for (specific_worker = p->root_worker.next; specific_worker != &p->root_worker; specific_worker = specific_worker->next) { - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd); + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); } p->kicked_without_pollers = 1; GPR_TIMER_END("grpc_pollset_kick_ext.broadcast", 0); @@ -122,14 +122,14 @@ void grpc_pollset_kick_ext(grpc_pollset *p, specific_worker->reevaluate_polling_on_wakeup = 1; } specific_worker->kicked_specifically = 1; - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd); + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) { GPR_TIMER_MARK("kick_yoself", 0); if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) { specific_worker->reevaluate_polling_on_wakeup = 1; } specific_worker->kicked_specifically = 1; - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd); + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); } } else if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) { GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0); @@ -151,7 +151,7 @@ void grpc_pollset_kick_ext(grpc_pollset *p, if (specific_worker != NULL) { GPR_TIMER_MARK("finally_kick", 0); push_back_worker(p, specific_worker); - grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd); + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd); } } else { GPR_TIMER_MARK("kicked_no_pollers", 0); @@ -177,9 +177,9 @@ void grpc_pollset_global_init(void) { void grpc_pollset_global_shutdown(void) { grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd); - grpc_wakeup_fd_global_destroy(); gpr_tls_destroy(&g_current_thread_poller); gpr_tls_destroy(&g_current_thread_worker); + grpc_wakeup_fd_global_destroy(); } void grpc_kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); } @@ -196,6 +196,34 @@ void grpc_pollset_init(grpc_pollset *pollset) { pollset->called_shutdown = 0; pollset->kicked_without_pollers = 0; pollset->idle_jobs.head = pollset->idle_jobs.tail = NULL; + pollset->local_wakeup_cache = NULL; + pollset->kicked_without_pollers = 0; + become_basic_pollset(pollset, NULL); +} + +void grpc_pollset_destroy(grpc_pollset *pollset) { + GPR_ASSERT(pollset->in_flight_cbs == 0); + GPR_ASSERT(!grpc_pollset_has_workers(pollset)); + GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail); + pollset->vtable->destroy(pollset); + gpr_mu_destroy(&pollset->mu); + while (pollset->local_wakeup_cache) { + grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next; + grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd); + gpr_free(pollset->local_wakeup_cache); + pollset->local_wakeup_cache = next; + } +} + +void grpc_pollset_reset(grpc_pollset *pollset) { + GPR_ASSERT(pollset->shutting_down); + GPR_ASSERT(pollset->in_flight_cbs == 0); + GPR_ASSERT(!grpc_pollset_has_workers(pollset)); + GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail); + pollset->vtable->destroy(pollset); + pollset->shutting_down = 0; + pollset->called_shutdown = 0; + pollset->kicked_without_pollers = 0; become_basic_pollset(pollset, NULL); } @@ -245,13 +273,19 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, /* this must happen before we (potentially) drop pollset->mu */ worker->next = worker->prev = NULL; worker->reevaluate_polling_on_wakeup = 0; + if (pollset->local_wakeup_cache != NULL) { + worker->wakeup_fd = pollset->local_wakeup_cache; + pollset->local_wakeup_cache = worker->wakeup_fd->next; + } else { + worker->wakeup_fd = gpr_malloc(sizeof(*worker->wakeup_fd)); + grpc_wakeup_fd_init(&worker->wakeup_fd->fd); + } worker->kicked_specifically = 0; - /* TODO(ctiller): pool these */ - grpc_wakeup_fd_init(&worker->wakeup_fd); /* If there's work waiting for the pollset to be idle, and the pollset is idle, then do that work */ if (!grpc_pollset_has_workers(pollset) && !grpc_closure_list_empty(pollset->idle_jobs)) { + GPR_TIMER_MARK("grpc_pollset_work.idle_jobs", 0); grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs); goto done; } @@ -260,16 +294,19 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, May update deadline to ensure timely wakeups. TODO(ctiller): can this work be localized? */ if (grpc_timer_check(exec_ctx, now, &deadline)) { + GPR_TIMER_MARK("grpc_pollset_work.alarm_triggered", 0); gpr_mu_unlock(&pollset->mu); locked = 0; goto done; } /* If we're shutting down then we don't execute any extended work */ if (pollset->shutting_down) { + GPR_TIMER_MARK("grpc_pollset_work.shutting_down", 0); goto done; } /* Give do_promote priority so we don't starve it out */ if (pollset->in_flight_cbs) { + GPR_TIMER_MARK("grpc_pollset_work.in_flight_cbs", 0); gpr_mu_unlock(&pollset->mu); locked = 0; goto done; @@ -294,6 +331,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, locked = 0; gpr_tls_set(&g_current_thread_poller, 0); } else { + GPR_TIMER_MARK("grpc_pollset_work.kicked_without_pollers", 0); pollset->kicked_without_pollers = 0; } /* Finished execution - start cleaning up. @@ -324,7 +362,10 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, remove_worker(pollset, worker); gpr_tls_set(&g_current_thread_worker, 0); } - grpc_wakeup_fd_destroy(&worker->wakeup_fd); + /* release wakeup fd to the local pool */ + worker->wakeup_fd->next = pollset->local_wakeup_cache; + pollset->local_wakeup_cache = worker->wakeup_fd; + /* check shutdown conditions */ if (pollset->shutting_down) { if (grpc_pollset_has_workers(pollset)) { grpc_pollset_kick(pollset, NULL); @@ -339,8 +380,8 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */ gpr_mu_lock(&pollset->mu); } else if (!grpc_closure_list_empty(pollset->idle_jobs)) { - gpr_mu_unlock(&pollset->mu); grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs); + gpr_mu_unlock(&pollset->mu); grpc_exec_ctx_flush(exec_ctx); gpr_mu_lock(&pollset->mu); } @@ -350,35 +391,20 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure *closure) { - int call_shutdown = 0; - gpr_mu_lock(&pollset->mu); GPR_ASSERT(!pollset->shutting_down); pollset->shutting_down = 1; - if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 && - !grpc_pollset_has_workers(pollset)) { - pollset->called_shutdown = 1; - call_shutdown = 1; - } + pollset->shutdown_done = closure; + grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); if (!grpc_pollset_has_workers(pollset)) { grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs); } - pollset->shutdown_done = closure; - grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); - gpr_mu_unlock(&pollset->mu); - - if (call_shutdown) { + if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 && + !grpc_pollset_has_workers(pollset)) { + pollset->called_shutdown = 1; finish_shutdown(exec_ctx, pollset); } } -void grpc_pollset_destroy(grpc_pollset *pollset) { - GPR_ASSERT(pollset->shutting_down); - GPR_ASSERT(pollset->in_flight_cbs == 0); - GPR_ASSERT(!grpc_pollset_has_workers(pollset)); - pollset->vtable->destroy(pollset); - gpr_mu_destroy(&pollset->mu); -} - int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, gpr_timespec now) { gpr_timespec timeout; @@ -558,7 +584,7 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd); pfd[0].events = POLLIN; pfd[0].revents = 0; - pfd[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd); + pfd[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd); pfd[1].events = POLLIN; pfd[1].revents = 0; nfds = 2; @@ -600,7 +626,7 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd); } if (pfd[1].revents & POLLIN_CHECK) { - grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd); + grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd); } if (nfds > 2) { grpc_fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK, diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index 95ebeab1c2..e4593728bd 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -48,8 +48,13 @@ typedef struct grpc_pollset_vtable grpc_pollset_vtable; use the struct tag */ struct grpc_fd; +typedef struct grpc_cached_wakeup_fd { + grpc_wakeup_fd fd; + struct grpc_cached_wakeup_fd *next; +} grpc_cached_wakeup_fd; + typedef struct grpc_pollset_worker { - grpc_wakeup_fd wakeup_fd; + grpc_cached_wakeup_fd *wakeup_fd; int reevaluate_polling_on_wakeup; int kicked_specifically; struct grpc_pollset_worker *next; @@ -74,6 +79,8 @@ typedef struct grpc_pollset { int fd; void *ptr; } data; + /* Local cache of eventfds for workers */ + grpc_cached_wakeup_fd *local_wakeup_cache; } grpc_pollset; struct grpc_pollset_vtable { diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index 9f74580273..c3f310ee27 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -35,6 +35,7 @@ #ifdef GPR_WINSOCK_SOCKET +#include <grpc/support/log.h> #include <grpc/support/thd.h> #include "src/core/iomgr/timer_internal.h" @@ -112,7 +113,6 @@ void grpc_pollset_init(grpc_pollset *pollset) { void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure *closure) { - gpr_mu_lock(&grpc_polling_mu); pollset->shutting_down = 1; grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); if (!pollset->is_iocp_worker) { @@ -120,11 +120,19 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } else { pollset->on_shutdown = closure; } - gpr_mu_unlock(&grpc_polling_mu); } void grpc_pollset_destroy(grpc_pollset *pollset) {} +void grpc_pollset_reset(grpc_pollset *pollset) { + GPR_ASSERT(pollset->shutting_down); + GPR_ASSERT(!has_workers(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET)); + pollset->shutting_down = 0; + pollset->is_iocp_worker = 0; + pollset->kicked_without_pollers = 0; + pollset->on_shutdown = NULL; +} + void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec now, gpr_timespec deadline) { diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c index 0a0f3c364e..c087b887b8 100644 --- a/src/core/iomgr/workqueue_posix.c +++ b/src/core/iomgr/workqueue_posix.c @@ -129,8 +129,6 @@ static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, int success) { void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_closure *closure, int success) { - closure->success = success; - closure->next = NULL; gpr_mu_lock(&workqueue->mu); if (grpc_closure_list_empty(workqueue->closure_list)) { grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); |