diff options
author | Craig Tiller <ctiller@google.com> | 2016-02-22 09:43:36 -0800 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2016-02-22 09:43:36 -0800 |
commit | 062127e3262dba79ac6731eb556ad77f81416a70 (patch) | |
tree | 213d0c9480971dcb896ee9c919867ea261dcd353 /src/core | |
parent | 0841e4f1d052026bb2a9b6a722042b45f23a5711 (diff) | |
parent | c605c62b30ca15c83a7c4e98386062c62de0d36d (diff) |
Merge branch 'hide-the-pollset' into cleaner-posix2
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/iomgr/ev_poll_and_epoll_posix.c | 24 | ||||
-rw-r--r-- | src/core/iomgr/iomgr.c | 2 | ||||
-rw-r--r-- | src/core/iomgr/pollset.h | 15 | ||||
-rw-r--r-- | src/core/iomgr/pollset_multipoller_with_epoll.c | 9 | ||||
-rw-r--r-- | src/core/iomgr/pollset_multipoller_with_poll_posix.c | 11 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 84 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.h | 14 | ||||
-rw-r--r-- | src/core/iomgr/pollset_set_posix.c | 3 | ||||
-rw-r--r-- | src/core/iomgr/pollset_set_posix.h | 6 | ||||
-rw-r--r-- | src/core/iomgr/pollset_windows.c | 34 | ||||
-rw-r--r-- | src/core/iomgr/pollset_windows.h | 4 | ||||
-rw-r--r-- | src/core/iomgr/tcp_posix.c | 5 | ||||
-rw-r--r-- | src/core/iomgr/timer.h | 2 | ||||
-rw-r--r-- | src/core/iomgr/workqueue_posix.h | 4 | ||||
-rw-r--r-- | src/core/security/google_default_credentials.c | 25 | ||||
-rw-r--r-- | src/core/surface/completion_queue.c | 96 |
16 files changed, 187 insertions, 151 deletions
diff --git a/src/core/iomgr/ev_poll_and_epoll_posix.c b/src/core/iomgr/ev_poll_and_epoll_posix.c index ae61ebf278..9b2fc2056d 100644 --- a/src/core/iomgr/ev_poll_and_epoll_posix.c +++ b/src/core/iomgr/ev_poll_and_epoll_posix.c @@ -31,16 +31,16 @@ * */ - /* This file will be removed shortly: it's here to keep refactoring - * steps simple and auditable. - * It's the combination of the old files: - * - fd_posix.{h,c} - * - pollset_posix.{h,c} - * - pullset_multipoller_with_{poll,epoll}.{h,c} - * The new version will be split into: - * - ev_poll_posix.{h,c} - * - ev_epoll_posix.{h,c} - */ +/* This file will be removed shortly: it's here to keep refactoring + * steps simple and auditable. + * It's the combination of the old files: + * - fd_posix.{h,c} + * - pollset_posix.{h,c} + * - pullset_multipoller_with_{poll,epoll}.{h,c} + * The new version will be split into: + * - ev_poll_posix.{h,c} + * - ev_epoll_posix.{h,c} + */ #include <grpc/support/port_platform.h> @@ -292,11 +292,11 @@ int grpc_fd_is_orphaned(grpc_fd *fd) { } static void pollset_kick_locked(grpc_fd_watcher *watcher) { - gpr_mu_lock(GRPC_POLLSET_MU(watcher->pollset)); + gpr_mu_lock(watcher->pollset->mu); GPR_ASSERT(watcher->worker); grpc_pollset_kick_ext(watcher->pollset, watcher->worker, GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP); - gpr_mu_unlock(GRPC_POLLSET_MU(watcher->pollset)); + gpr_mu_unlock(watcher->pollset->mu); } static void maybe_wake_one_watcher_locked(grpc_fd *fd) { diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index 3283b586b0..04580150f3 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h index 7eee0b8780..5e185cf5f3 100644 --- a/src/core/iomgr/pollset.h +++ b/src/core/iomgr/pollset.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -35,8 +35,11 @@ #define GRPC_INTERNAL_CORE_IOMGR_POLLSET_H #include <grpc/support/port_platform.h> +#include <grpc/support/sync.h> #include <grpc/support/time.h> +#include "src/core/iomgr/exec_ctx.h" + #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1) /* A grpc_pollset is a set of file descriptors that a higher level item is @@ -46,6 +49,7 @@ - a completion queue might keep a pollset with an entry for each transport that is servicing a call that it's tracking */ +<<<<<<< HEAD #ifdef GPR_POSIX_SOCKET #include "src/core/iomgr/ev_posix.h" #endif @@ -53,8 +57,13 @@ #ifdef GPR_WIN32 #include "src/core/iomgr/pollset_windows.h" #endif +======= +typedef struct grpc_pollset grpc_pollset; +typedef struct grpc_pollset_worker grpc_pollset_worker; +>>>>>>> c605c62b30ca15c83a7c4e98386062c62de0d36d -void grpc_pollset_init(grpc_pollset *pollset); +size_t grpc_pollset_size(void); +void grpc_pollset_init(grpc_pollset *pollset, gpr_mu *mu); /* Begin shutting down the pollset, and call closure when done. * GRPC_POLLSET_MU(pollset) must be held */ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -83,7 +92,7 @@ void grpc_pollset_destroy(grpc_pollset *pollset); pollset lock */ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker *worker, gpr_timespec now, + grpc_pollset_worker **worker, gpr_timespec now, gpr_timespec deadline); /* Break one polling thread out of polling work for this pollset. diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index e7d3708ecd..27884166dc 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -44,6 +44,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/useful.h> + #include "src/core/iomgr/ev_posix.h" #include "src/core/profiling/timers.h" #include "src/core/support/block_annotate.h" @@ -148,7 +149,7 @@ static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg, finally_add_fd(exec_ctx, da->pollset, da->fd); } - gpr_mu_lock(&da->pollset->mu); + gpr_mu_lock(da->pollset->mu); da->pollset->in_flight_cbs--; if (da->pollset->shutting_down) { /* We don't care about this pollset anymore. */ @@ -157,7 +158,7 @@ static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg, grpc_exec_ctx_enqueue(exec_ctx, da->pollset->shutdown_done, true, NULL); } } - gpr_mu_unlock(&da->pollset->mu); + gpr_mu_unlock(da->pollset->mu); GRPC_FD_UNREF(da->fd, "delayed_add"); @@ -169,7 +170,7 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_fd *fd, int and_unlock_pollset) { if (and_unlock_pollset) { - gpr_mu_unlock(&pollset->mu); + gpr_mu_unlock(pollset->mu); finally_add_fd(exec_ctx, pollset, fd); } else { delayed_add *da = gpr_malloc(sizeof(*da)); @@ -201,7 +202,7 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( * here. */ - gpr_mu_unlock(&pollset->mu); + gpr_mu_unlock(pollset->mu); timeout_ms = grpc_poll_deadline_to_millis_timeout(deadline, now); diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index 80def2c8bf..ef3d8c2613 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -42,13 +42,14 @@ #include <stdlib.h> #include <string.h> -#include "src/core/iomgr/ev_posix.h" -#include "src/core/iomgr/iomgr_internal.h" -#include "src/core/support/block_annotate.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/useful.h> +#include "src/core/iomgr/ev_posix.h" +#include "src/core/iomgr/iomgr_internal.h" +#include "src/core/support/block_annotate.h" + typedef struct { /* all polled fds */ size_t fd_count; @@ -78,7 +79,7 @@ static void multipoll_with_poll_pollset_add_fd(grpc_exec_ctx *exec_ctx, GRPC_FD_REF(fd, "multipoller"); exit: if (and_unlock_pollset) { - gpr_mu_unlock(&pollset->mu); + gpr_mu_unlock(pollset->mu); } } @@ -130,7 +131,7 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( } h->del_count = 0; h->fd_count = fd_count; - gpr_mu_unlock(&pollset->mu); + gpr_mu_unlock(pollset->mu); for (i = 2; i < pfd_count; i++) { pfds[i].events = (short)grpc_fd_begin_poll(watchers[i].fd, pollset, worker, diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 7ac5c20866..20200113cd 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -42,17 +42,18 @@ #include <string.h> #include <unistd.h> -#include "src/core/iomgr/ev_posix.h" -#include "src/core/iomgr/iomgr_internal.h" -#include "src/core/iomgr/socket_utils_posix.h" -#include "src/core/profiling/timers.h" -#include "src/core/support/block_annotate.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/thd.h> #include <grpc/support/tls.h> #include <grpc/support/useful.h> +#include "src/core/iomgr/ev_posix.h" +#include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/socket_utils_posix.h" +#include "src/core/profiling/timers.h" +#include "src/core/support/block_annotate.h" + GPR_TLS_DECL(g_current_thread_poller); GPR_TLS_DECL(g_current_thread_worker); @@ -97,6 +98,8 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { worker->prev->next = worker->next->prev = worker; } +size_t grpc_pollset_size(void) { return sizeof(grpc_pollset); } + void grpc_pollset_kick_ext(grpc_pollset *p, grpc_pollset_worker *specific_worker, uint32_t flags) { @@ -186,8 +189,8 @@ void grpc_kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); } static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null); -void grpc_pollset_init(grpc_pollset *pollset) { - gpr_mu_init(&pollset->mu); +void grpc_pollset_init(grpc_pollset *pollset, gpr_mu *mu) { + pollset->mu = mu; pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker; pollset->in_flight_cbs = 0; pollset->shutting_down = 0; @@ -204,7 +207,6 @@ void grpc_pollset_destroy(grpc_pollset *pollset) { GPR_ASSERT(!grpc_pollset_has_workers(pollset)); GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail); pollset->vtable->destroy(pollset); - gpr_mu_destroy(&pollset->mu); while (pollset->local_wakeup_cache) { grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next; grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd); @@ -227,15 +229,15 @@ void grpc_pollset_reset(grpc_pollset *pollset) { void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *fd) { - gpr_mu_lock(&pollset->mu); + gpr_mu_lock(pollset->mu); pollset->vtable->add_fd(exec_ctx, pollset, fd, 1); /* the following (enabled only in debug) will reacquire and then release our lock - meaning that if the unlocking flag passed to add_fd above is not respected, the code will deadlock (in a way that we have a chance of debugging) */ #ifndef NDEBUG - gpr_mu_lock(&pollset->mu); - gpr_mu_unlock(&pollset->mu); + gpr_mu_lock(pollset->mu); + gpr_mu_unlock(pollset->mu); #endif } @@ -246,8 +248,11 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { } void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker *worker, gpr_timespec now, + grpc_pollset_worker **worker_hdl, gpr_timespec now, gpr_timespec deadline) { + grpc_pollset_worker worker; + *worker_hdl = &worker; + /* pollset->mu already held */ int added_worker = 0; int locked = 1; @@ -255,16 +260,16 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, int keep_polling = 0; GPR_TIMER_BEGIN("grpc_pollset_work", 0); /* this must happen before we (potentially) drop pollset->mu */ - worker->next = worker->prev = NULL; - worker->reevaluate_polling_on_wakeup = 0; + worker.next = worker.prev = NULL; + worker.reevaluate_polling_on_wakeup = 0; if (pollset->local_wakeup_cache != NULL) { - worker->wakeup_fd = pollset->local_wakeup_cache; - pollset->local_wakeup_cache = worker->wakeup_fd->next; + worker.wakeup_fd = pollset->local_wakeup_cache; + pollset->local_wakeup_cache = worker.wakeup_fd->next; } else { - worker->wakeup_fd = gpr_malloc(sizeof(*worker->wakeup_fd)); - grpc_wakeup_fd_init(&worker->wakeup_fd->fd); + worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd)); + grpc_wakeup_fd_init(&worker.wakeup_fd->fd); } - worker->kicked_specifically = 0; + worker.kicked_specifically = 0; /* If there's work waiting for the pollset to be idle, and the pollset is idle, then do that work */ if (!grpc_pollset_has_workers(pollset) && @@ -281,7 +286,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, /* Give do_promote priority so we don't starve it out */ if (pollset->in_flight_cbs) { GPR_TIMER_MARK("grpc_pollset_work.in_flight_cbs", 0); - gpr_mu_unlock(&pollset->mu); + gpr_mu_unlock(pollset->mu); locked = 0; goto done; } @@ -293,13 +298,13 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, keep_polling = 0; if (!pollset->kicked_without_pollers) { if (!added_worker) { - push_front_worker(pollset, worker); + push_front_worker(pollset, &worker); added_worker = 1; - gpr_tls_set(&g_current_thread_worker, (intptr_t)worker); + gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); } gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset); GPR_TIMER_BEGIN("maybe_work_and_unlock", 0); - pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, worker, + pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, &worker, deadline, now); GPR_TIMER_END("maybe_work_and_unlock", 0); locked = 0; @@ -315,16 +320,16 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, done: if (!locked) { queued_work |= grpc_exec_ctx_flush(exec_ctx); - gpr_mu_lock(&pollset->mu); + gpr_mu_lock(pollset->mu); locked = 1; } /* If we're forced to re-evaluate polling (via grpc_pollset_kick with GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force a loop */ - if (worker->reevaluate_polling_on_wakeup) { - worker->reevaluate_polling_on_wakeup = 0; + if (worker.reevaluate_polling_on_wakeup) { + worker.reevaluate_polling_on_wakeup = 0; pollset->kicked_without_pollers = 0; - if (queued_work || worker->kicked_specifically) { + if (queued_work || worker.kicked_specifically) { /* If there's queued work on the list, then set the deadline to be immediate so we get back out of the polling loop quickly */ deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC); @@ -333,33 +338,34 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } } if (added_worker) { - remove_worker(pollset, worker); + remove_worker(pollset, &worker); gpr_tls_set(&g_current_thread_worker, 0); } /* release wakeup fd to the local pool */ - worker->wakeup_fd->next = pollset->local_wakeup_cache; - pollset->local_wakeup_cache = worker->wakeup_fd; + worker.wakeup_fd->next = pollset->local_wakeup_cache; + pollset->local_wakeup_cache = worker.wakeup_fd; /* check shutdown conditions */ if (pollset->shutting_down) { if (grpc_pollset_has_workers(pollset)) { grpc_pollset_kick(pollset, NULL); } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) { pollset->called_shutdown = 1; - gpr_mu_unlock(&pollset->mu); + gpr_mu_unlock(pollset->mu); finish_shutdown(exec_ctx, pollset); grpc_exec_ctx_flush(exec_ctx); /* Continuing to access pollset here is safe -- it is the caller's * responsibility to not destroy when it has outstanding calls to * grpc_pollset_work. * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */ - gpr_mu_lock(&pollset->mu); + gpr_mu_lock(pollset->mu); } else if (!grpc_closure_list_empty(pollset->idle_jobs)) { grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL); - gpr_mu_unlock(&pollset->mu); + gpr_mu_unlock(pollset->mu); grpc_exec_ctx_flush(exec_ctx); - gpr_mu_lock(&pollset->mu); + gpr_mu_lock(pollset->mu); } } + *worker_hdl = NULL; GPR_TIMER_END("grpc_pollset_work", 0); } @@ -424,7 +430,7 @@ static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args, * 4. The pollset may be shutting down. */ - gpr_mu_lock(&pollset->mu); + gpr_mu_lock(pollset->mu); /* First we need to ensure that nobody is polling concurrently */ GPR_ASSERT(!grpc_pollset_has_workers(pollset)); @@ -465,7 +471,7 @@ static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args, } } - gpr_mu_unlock(&pollset->mu); + gpr_mu_unlock(pollset->mu); /* Matching ref in basic_pollset_add_fd */ GRPC_FD_UNREF(fd, "basicpoll_add"); @@ -518,7 +524,7 @@ static void basic_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, exit: if (and_unlock_pollset) { - gpr_mu_unlock(&pollset->mu); + gpr_mu_unlock(pollset->mu); } } @@ -554,14 +560,14 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, pfd[2].fd = fd->fd; pfd[2].revents = 0; GRPC_FD_REF(fd, "basicpoll_begin"); - gpr_mu_unlock(&pollset->mu); + gpr_mu_unlock(pollset->mu); pfd[2].events = (short)grpc_fd_begin_poll(fd, pollset, worker, POLLIN, POLLOUT, &fd_watcher); if (pfd[2].events != 0) { nfds++; } } else { - gpr_mu_unlock(&pollset->mu); + gpr_mu_unlock(pollset->mu); } /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index a9e184187a..3cb3a06c27 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -38,8 +38,10 @@ #include <poll.h> #include <grpc/support/sync.h> + #include "src/core/iomgr/exec_ctx.h" #include "src/core/iomgr/iomgr.h" +#include "src/core/iomgr/pollset.h" #include "src/core/iomgr/wakeup_fd_posix.h" typedef struct grpc_pollset_vtable grpc_pollset_vtable; @@ -54,21 +56,21 @@ typedef struct grpc_cached_wakeup_fd { struct grpc_cached_wakeup_fd *next; } grpc_cached_wakeup_fd; -typedef struct grpc_pollset_worker { +struct grpc_pollset_worker { grpc_cached_wakeup_fd *wakeup_fd; int reevaluate_polling_on_wakeup; int kicked_specifically; struct grpc_pollset_worker *next; struct grpc_pollset_worker *prev; -} grpc_pollset_worker; +}; -typedef struct grpc_pollset { +struct grpc_pollset { /* pollsets under posix can mutate representation as fds are added and removed. For example, we may choose a poll() based implementation on linux for few fds, and an epoll() based implementation for many fds */ const grpc_pollset_vtable *vtable; - gpr_mu mu; + gpr_mu *mu; grpc_pollset_worker root_worker; int in_flight_cbs; int shutting_down; @@ -82,7 +84,7 @@ typedef struct grpc_pollset { } data; /* Local cache of eventfds for workers */ grpc_cached_wakeup_fd *local_wakeup_cache; -} grpc_pollset; +}; struct grpc_pollset_vtable { void (*add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -94,8 +96,6 @@ struct grpc_pollset_vtable { void (*destroy)(grpc_pollset *pollset); }; -#define GRPC_POLLSET_MU(pollset) (&(pollset)->mu) - /* Add an fd to a pollset */ void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, struct grpc_fd *fd); diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c index 4ec92202e3..0eb4a427b1 100644 --- a/src/core/iomgr/pollset_set_posix.c +++ b/src/core/iomgr/pollset_set_posix.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -41,6 +41,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/useful.h> +#include "src/core/iomgr/ev_posix.h" #include "src/core/iomgr/pollset_set.h" void grpc_pollset_set_init(grpc_pollset_set *pollset_set) { diff --git a/src/core/iomgr/pollset_set_posix.h b/src/core/iomgr/pollset_set_posix.h index 598b2a360a..4c583b603b 100644 --- a/src/core/iomgr/pollset_set_posix.h +++ b/src/core/iomgr/pollset_set_posix.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -34,8 +34,12 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_POSIX_H #define GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_POSIX_H +<<<<<<< HEAD #include <grpc/support/sync.h> #include "src/core/iomgr/ev_posix.h" +======= +#include "src/core/iomgr/fd_posix.h" +>>>>>>> c605c62b30ca15c83a7c4e98386062c62de0d36d typedef struct grpc_pollset_set { gpr_mu mu; diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index 35a956b27f..8f35a46509 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -125,22 +125,25 @@ void grpc_pollset_reset(grpc_pollset *pollset) { } void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker *worker, gpr_timespec now, + grpc_pollset_worker **worker_hdl, gpr_timespec now, gpr_timespec deadline) { + grpc_pollset_worker worker; + *worker_hdl = &worker; + int added_worker = 0; - worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].next = - worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev = - worker->links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next = - worker->links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev = NULL; - worker->kicked = 0; - worker->pollset = pollset; - gpr_cv_init(&worker->cv); + worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next = + worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev = + worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next = + worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev = NULL; + worker.kicked = 0; + worker.pollset = pollset; + gpr_cv_init(&worker.cv); if (!pollset->kicked_without_pollers && !pollset->shutting_down) { if (g_active_poller == NULL) { grpc_pollset_worker *next_worker; /* become poller */ pollset->is_iocp_worker = 1; - g_active_poller = worker; + g_active_poller = &worker; gpr_mu_unlock(&grpc_polling_mu); grpc_iocp_work(exec_ctx, deadline); grpc_exec_ctx_flush(exec_ctx); @@ -167,12 +170,12 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, goto done; } push_front_worker(&g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL, - worker); + &worker); push_front_worker(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET, - worker); + &worker); added_worker = 1; - while (!worker->kicked) { - if (gpr_cv_wait(&worker->cv, &grpc_polling_mu, deadline)) { + while (!worker.kicked) { + if (gpr_cv_wait(&worker.cv, &grpc_polling_mu, deadline)) { break; } } @@ -186,10 +189,11 @@ done: gpr_mu_lock(&grpc_polling_mu); } if (added_worker) { - remove_worker(worker, GRPC_POLLSET_WORKER_LINK_GLOBAL); - remove_worker(worker, GRPC_POLLSET_WORKER_LINK_POLLSET); + remove_worker(&worker, GRPC_POLLSET_WORKER_LINK_GLOBAL); + remove_worker(&worker, GRPC_POLLSET_WORKER_LINK_POLLSET); } gpr_cv_destroy(&worker->cv); + *worker_hdl = NULL; } void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { diff --git a/src/core/iomgr/pollset_windows.h b/src/core/iomgr/pollset_windows.h index 65ba80619b..c2f13fdfa8 100644 --- a/src/core/iomgr/pollset_windows.h +++ b/src/core/iomgr/pollset_windows.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -74,6 +74,4 @@ struct grpc_pollset { extern gpr_mu grpc_polling_mu; -#define GRPC_POLLSET_MU(pollset) (&grpc_polling_mu) - #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */ diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 048e907441..fba3563427 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -40,8 +40,8 @@ #include <errno.h> #include <stdlib.h> #include <string.h> -#include <sys/types.h> #include <sys/socket.h> +#include <sys/types.h> #include <unistd.h> #include <grpc/support/alloc.h> @@ -51,9 +51,10 @@ #include <grpc/support/sync.h> #include <grpc/support/time.h> -#include "src/core/support/string.h" #include "src/core/debug/trace.h" +#include "src/core/iomgr/pollset_posix.h" #include "src/core/profiling/timers.h" +#include "src/core/support/string.h" #ifdef GPR_HAVE_MSG_NOSIGNAL #define SENDMSG_FLAGS MSG_NOSIGNAL diff --git a/src/core/iomgr/timer.h b/src/core/iomgr/timer.h index 906255ddfb..9ad1e92f42 100644 --- a/src/core/iomgr/timer.h +++ b/src/core/iomgr/timer.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/src/core/iomgr/workqueue_posix.h b/src/core/iomgr/workqueue_posix.h index 589034fe1b..68f195ee0d 100644 --- a/src/core/iomgr/workqueue_posix.h +++ b/src/core/iomgr/workqueue_posix.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -34,6 +34,8 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H #define GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H +#include "src/core/iomgr/wakeup_fd_posix.h" + struct grpc_fd; struct grpc_workqueue { diff --git a/src/core/security/google_default_credentials.c b/src/core/security/google_default_credentials.c index f3ac14568a..25006e16c7 100644 --- a/src/core/security/google_default_credentials.c +++ b/src/core/security/google_default_credentials.c @@ -58,7 +58,7 @@ static gpr_once g_once = GPR_ONCE_INIT; static void init_default_credentials(void) { gpr_mu_init(&g_mu); } typedef struct { - grpc_pollset pollset; + grpc_pollset *pollset; int is_done; int success; } compute_engine_detector; @@ -80,10 +80,10 @@ static void on_compute_engine_detection_http_response( } } } - gpr_mu_lock(GRPC_POLLSET_MU(&detector->pollset)); + gpr_mu_lock(&g_mu); detector->is_done = 1; - grpc_pollset_kick(&detector->pollset, NULL); - gpr_mu_unlock(GRPC_POLLSET_MU(&detector->pollset)); + grpc_pollset_kick(detector->pollset, NULL); + gpr_mu_unlock(&g_mu); } static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool s) { @@ -101,7 +101,8 @@ static int is_stack_running_on_compute_engine(void) { on compute engine. */ gpr_timespec max_detection_delay = gpr_time_from_seconds(1, GPR_TIMESPAN); - grpc_pollset_init(&detector.pollset); + detector.pollset = gpr_malloc(grpc_pollset_size()); + grpc_pollset_init(detector.pollset, &g_mu); detector.is_done = 0; detector.success = 0; @@ -112,7 +113,7 @@ static int is_stack_running_on_compute_engine(void) { grpc_httpcli_context_init(&context); grpc_httpcli_get( - &exec_ctx, &context, &detector.pollset, &request, + &exec_ctx, &context, detector.pollset, &request, gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), max_detection_delay), on_compute_engine_detection_http_response, &detector); @@ -120,20 +121,22 @@ static int is_stack_running_on_compute_engine(void) { /* Block until we get the response. This is not ideal but this should only be called once for the lifetime of the process by the default credentials. */ - gpr_mu_lock(GRPC_POLLSET_MU(&detector.pollset)); + gpr_mu_lock(&g_mu); while (!detector.is_done) { - grpc_pollset_worker worker; - grpc_pollset_work(&exec_ctx, &detector.pollset, &worker, + grpc_pollset_worker *worker = NULL; + grpc_pollset_work(&exec_ctx, detector.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), gpr_inf_future(GPR_CLOCK_MONOTONIC)); } - gpr_mu_unlock(GRPC_POLLSET_MU(&detector.pollset)); + gpr_mu_unlock(&g_mu); grpc_httpcli_context_destroy(&context); grpc_closure_init(&destroy_closure, destroy_pollset, &detector.pollset); - grpc_pollset_shutdown(&exec_ctx, &detector.pollset, &destroy_closure); + grpc_pollset_shutdown(&exec_ctx, detector.pollset, &destroy_closure); grpc_exec_ctx_finish(&exec_ctx); + gpr_free(detector.pollset); + return detector.success; } diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index de295ab941..d0659c7e52 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -36,26 +36,28 @@ #include <stdio.h> #include <string.h> -#include "src/core/iomgr/timer.h" +#include <grpc/support/alloc.h> +#include <grpc/support/atm.h> +#include <grpc/support/log.h> +#include <grpc/support/time.h> + #include "src/core/iomgr/pollset.h" +#include "src/core/iomgr/timer.h" +#include "src/core/profiling/timers.h" #include "src/core/support/string.h" #include "src/core/surface/api_trace.h" #include "src/core/surface/call.h" #include "src/core/surface/event_string.h" #include "src/core/surface/surface_trace.h" -#include "src/core/profiling/timers.h" -#include <grpc/support/alloc.h> -#include <grpc/support/atm.h> -#include <grpc/support/log.h> -#include <grpc/support/time.h> typedef struct { - grpc_pollset_worker *worker; + grpc_pollset_worker **worker; void *tag; } plucker; /* Completion queue structure */ struct grpc_completion_queue { + gpr_mu mu; /** completed events */ grpc_cq_completion completed_head; grpc_cq_completion *completed_tail; @@ -63,8 +65,6 @@ struct grpc_completion_queue { gpr_refcount pending_events; /** Once owning_refs drops to zero, we will destroy the cq */ gpr_refcount owning_refs; - /** the set of low level i/o things that concern this cq */ - grpc_pollset pollset; /** 0 initially, 1 once we've begun shutting down */ int shutdown; int shutdown_called; @@ -82,6 +82,8 @@ struct grpc_completion_queue { grpc_completion_queue *next_free; }; +#define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1)) + static gpr_mu g_freelist_mu; grpc_completion_queue *g_freelist; @@ -94,7 +96,8 @@ void grpc_cq_global_shutdown(void) { gpr_mu_destroy(&g_freelist_mu); while (g_freelist) { grpc_completion_queue *next = g_freelist->next_free; - grpc_pollset_destroy(&g_freelist->pollset); + grpc_pollset_destroy(POLLSET_FROM_CQ(g_freelist)); + gpr_mu_destroy(&g_freelist->mu); #ifndef NDEBUG gpr_free(g_freelist->outstanding_tags); #endif @@ -124,8 +127,9 @@ grpc_completion_queue *grpc_completion_queue_create(void *reserved) { if (g_freelist == NULL) { gpr_mu_unlock(&g_freelist_mu); - cc = gpr_malloc(sizeof(grpc_completion_queue)); - grpc_pollset_init(&cc->pollset); + cc = gpr_malloc(sizeof(grpc_completion_queue) + grpc_pollset_size()); + gpr_mu_init(&cc->mu); + grpc_pollset_init(POLLSET_FROM_CQ(cc), &cc->mu); #ifndef NDEBUG cc->outstanding_tags = NULL; cc->outstanding_tag_capacity = 0; @@ -184,7 +188,7 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) { #endif if (gpr_unref(&cc->owning_refs)) { GPR_ASSERT(cc->completed_head.next == (uintptr_t)&cc->completed_head); - grpc_pollset_reset(&cc->pollset); + grpc_pollset_reset(POLLSET_FROM_CQ(cc)); gpr_mu_lock(&g_freelist_mu); cc->next_free = g_freelist; g_freelist = cc; @@ -194,7 +198,7 @@ void grpc_cq_internal_unref(grpc_completion_queue *cc) { void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) { #ifndef NDEBUG - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(&cc->mu); GPR_ASSERT(!cc->shutdown_called); if (cc->outstanding_tag_count == cc->outstanding_tag_capacity) { cc->outstanding_tag_capacity = GPR_MAX(4, 2 * cc->outstanding_tag_capacity); @@ -203,7 +207,7 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, void *tag) { cc->outstanding_tag_capacity); } cc->outstanding_tags[cc->outstanding_tag_count++] = tag; - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(&cc->mu); #endif gpr_ref(&cc->pending_events); } @@ -231,7 +235,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, storage->next = ((uintptr_t)&cc->completed_head) | ((uintptr_t)(success != 0)); - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(&cc->mu); #ifndef NDEBUG for (i = 0; i < (int)cc->outstanding_tag_count; i++) { if (cc->outstanding_tags[i] == tag) { @@ -252,12 +256,12 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, pluck_worker = NULL; for (i = 0; i < cc->num_pluckers; i++) { if (cc->pluckers[i].tag == tag) { - pluck_worker = cc->pluckers[i].worker; + pluck_worker = *cc->pluckers[i].worker; break; } } - grpc_pollset_kick(&cc->pollset, pluck_worker); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + grpc_pollset_kick(POLLSET_FROM_CQ(cc), pluck_worker); + gpr_mu_unlock(&cc->mu); } else { cc->completed_tail->next = ((uintptr_t)storage) | (1u & (uintptr_t)cc->completed_tail->next); @@ -265,8 +269,9 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, GPR_ASSERT(!cc->shutdown); GPR_ASSERT(cc->shutdown_called); cc->shutdown = 1; - grpc_pollset_shutdown(exec_ctx, &cc->pollset, &cc->pollset_shutdown_done); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + grpc_pollset_shutdown(exec_ctx, POLLSET_FROM_CQ(cc), + &cc->pollset_shutdown_done); + gpr_mu_unlock(&cc->mu); } GPR_TIMER_END("grpc_cq_end_op", 0); @@ -275,7 +280,7 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc, grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, gpr_timespec deadline, void *reserved) { grpc_event ret; - grpc_pollset_worker worker; + grpc_pollset_worker *worker = NULL; int first_loop = 1; gpr_timespec now; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -294,7 +299,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); GRPC_CQ_INTERNAL_REF(cc, "next"); - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(&cc->mu); for (;;) { if (cc->completed_tail != &cc->completed_head) { grpc_cq_completion *c = (grpc_cq_completion *)cc->completed_head.next; @@ -302,7 +307,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, if (c == cc->completed_tail) { cc->completed_tail = &cc->completed_head; } - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(&cc->mu); ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; @@ -310,14 +315,14 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, break; } if (cc->shutdown) { - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(&cc->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_SHUTDOWN; break; } now = gpr_now(GPR_CLOCK_MONOTONIC); if (!first_loop && gpr_time_cmp(now, deadline) >= 0) { - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(&cc->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; break; @@ -330,12 +335,12 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, gpr_timespec iteration_deadline = deadline; if (grpc_timer_check(&exec_ctx, now, &iteration_deadline)) { GPR_TIMER_MARK("alarm_triggered", 0); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(&cc->mu); grpc_exec_ctx_flush(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(&cc->mu); continue; } - grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now, + grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now, iteration_deadline); } GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); @@ -348,7 +353,7 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, } static int add_plucker(grpc_completion_queue *cc, void *tag, - grpc_pollset_worker *worker) { + grpc_pollset_worker **worker) { if (cc->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) { return 0; } @@ -359,7 +364,7 @@ static int add_plucker(grpc_completion_queue *cc, void *tag, } static void del_plucker(grpc_completion_queue *cc, void *tag, - grpc_pollset_worker *worker) { + grpc_pollset_worker **worker) { int i; for (i = 0; i < cc->num_pluckers; i++) { if (cc->pluckers[i].tag == tag && cc->pluckers[i].worker == worker) { @@ -376,7 +381,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, grpc_event ret; grpc_cq_completion *c; grpc_cq_completion *prev; - grpc_pollset_worker worker; + grpc_pollset_worker *worker = NULL; gpr_timespec now; int first_loop = 1; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -395,7 +400,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC); GRPC_CQ_INTERNAL_REF(cc, "pluck"); - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(&cc->mu); for (;;) { prev = &cc->completed_head; while ((c = (grpc_cq_completion *)(prev->next & ~(uintptr_t)1)) != @@ -405,7 +410,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, if (c == cc->completed_tail) { cc->completed_tail = prev; } - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(&cc->mu); ret.type = GRPC_OP_COMPLETE; ret.success = c->next & 1u; ret.tag = c->tag; @@ -415,7 +420,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, prev = c; } if (cc->shutdown) { - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(&cc->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_SHUTDOWN; break; @@ -425,7 +430,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, "Too many outstanding grpc_completion_queue_pluck calls: maximum " "is %d", GRPC_MAX_COMPLETION_QUEUE_PLUCKERS); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(&cc->mu); memset(&ret, 0, sizeof(ret)); /* TODO(ctiller): should we use a different result here */ ret.type = GRPC_QUEUE_TIMEOUT; @@ -434,7 +439,7 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, now = gpr_now(GPR_CLOCK_MONOTONIC); if (!first_loop && gpr_time_cmp(now, deadline) >= 0) { del_plucker(cc, tag, &worker); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(&cc->mu); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; break; @@ -447,12 +452,12 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, gpr_timespec iteration_deadline = deadline; if (grpc_timer_check(&exec_ctx, now, &iteration_deadline)) { GPR_TIMER_MARK("alarm_triggered", 0); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(&cc->mu); grpc_exec_ctx_flush(&exec_ctx); - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(&cc->mu); continue; } - grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now, + grpc_pollset_work(&exec_ctx, POLLSET_FROM_CQ(cc), &worker, now, iteration_deadline); del_plucker(cc, tag, &worker); } @@ -472,9 +477,9 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0); GRPC_API_TRACE("grpc_completion_queue_shutdown(cc=%p)", 1, (cc)); - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(&cc->mu); if (cc->shutdown_called) { - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(&cc->mu); GPR_TIMER_END("grpc_completion_queue_shutdown", 0); return; } @@ -482,9 +487,10 @@ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { if (gpr_unref(&cc->pending_events)) { GPR_ASSERT(!cc->shutdown); cc->shutdown = 1; - grpc_pollset_shutdown(&exec_ctx, &cc->pollset, &cc->pollset_shutdown_done); + grpc_pollset_shutdown(&exec_ctx, POLLSET_FROM_CQ(cc), + &cc->pollset_shutdown_done); } - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(&cc->mu); grpc_exec_ctx_finish(&exec_ctx); GPR_TIMER_END("grpc_completion_queue_shutdown", 0); } @@ -498,7 +504,7 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cc) { } grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) { - return &cc->pollset; + return POLLSET_FROM_CQ(cc); } void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; } |