diff options
author | Craig Tiller <ctiller@google.com> | 2016-02-19 16:22:44 -0800 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2016-02-19 16:22:44 -0800 |
commit | a8be91b3153fc75a425718799130172357507dd3 (patch) | |
tree | 94a9b0e1936ff743805c0876aa7de97b6ca4d14e /src/core/iomgr | |
parent | 3633ce48a9ec540c9608d2a7e773d4e1113bb1e1 (diff) |
Provide an interface firewall between pollset and its implementations
Starting to allow for >1 implementation of pollset within a binary.
Do so without requiring an extra allocation for completion queues (which
we could not tolerate).
Diffstat (limited to 'src/core/iomgr')
-rw-r--r-- | src/core/iomgr/fd_posix.c | 6 | ||||
-rw-r--r-- | src/core/iomgr/pollset.h | 15 | ||||
-rw-r--r-- | src/core/iomgr/pollset_multipoller_with_epoll.c | 9 | ||||
-rw-r--r-- | src/core/iomgr/pollset_multipoller_with_poll_posix.c | 12 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 45 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.h | 14 | ||||
-rw-r--r-- | src/core/iomgr/pollset_set_posix.c | 1 | ||||
-rw-r--r-- | src/core/iomgr/pollset_set_posix.h | 1 | ||||
-rw-r--r-- | src/core/iomgr/pollset_windows.h | 2 | ||||
-rw-r--r-- | src/core/iomgr/tcp_posix.c | 5 | ||||
-rw-r--r-- | src/core/iomgr/workqueue_posix.c | 1 | ||||
-rw-r--r-- | src/core/iomgr/workqueue_posix.h | 2 |
12 files changed, 60 insertions, 53 deletions
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 85eadd754b..812ff0992e 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -46,6 +46,8 @@ #include <grpc/support/string_util.h> #include <grpc/support/useful.h> +#include "src/core/iomgr/pollset_posix.h" + #define CLOSURE_NOT_READY ((grpc_closure *)0) #define CLOSURE_READY ((grpc_closure *)1) @@ -175,11 +177,11 @@ int grpc_fd_is_orphaned(grpc_fd *fd) { } static void pollset_kick_locked(grpc_fd_watcher *watcher) { - gpr_mu_lock(GRPC_POLLSET_MU(watcher->pollset)); + gpr_mu_lock(watcher->pollset->mu); GPR_ASSERT(watcher->worker); grpc_pollset_kick_ext(watcher->pollset, watcher->worker, GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP); - gpr_mu_unlock(GRPC_POLLSET_MU(watcher->pollset)); + gpr_mu_unlock(watcher->pollset->mu); } static void maybe_wake_one_watcher_locked(grpc_fd *fd) { diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h index 0c0efad760..dfbd4a40ec 100644 --- a/src/core/iomgr/pollset.h +++ b/src/core/iomgr/pollset.h @@ -35,8 +35,11 @@ #define GRPC_INTERNAL_CORE_IOMGR_POLLSET_H #include <grpc/support/port_platform.h> +#include <grpc/support/sync.h> #include <grpc/support/time.h> +#include "src/core/iomgr/exec_ctx.h" + #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1) /* A grpc_pollset is a set of file descriptors that a higher level item is @@ -46,15 +49,11 @@ - a completion queue might keep a pollset with an entry for each transport that is servicing a call that it's tracking */ -#ifdef GPR_POSIX_SOCKET -#include "src/core/iomgr/pollset_posix.h" -#endif - -#ifdef GPR_WIN32 -#include "src/core/iomgr/pollset_windows.h" -#endif +typedef struct grpc_pollset grpc_pollset; +typedef struct grpc_pollset_worker grpc_pollset_worker; -void grpc_pollset_init(grpc_pollset *pollset); +size_t grpc_pollset_size(void); +void grpc_pollset_init(grpc_pollset *pollset, gpr_mu *mu); /* Begin shutting down the pollset, and call closure when done. * GRPC_POLLSET_MU(pollset) must be held */ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index 4acae2bb71..e1af2b5241 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -45,6 +45,7 @@ #include <grpc/support/log.h> #include <grpc/support/useful.h> #include "src/core/iomgr/fd_posix.h" +#include "src/core/iomgr/pollset_posix.h" #include "src/core/profiling/timers.h" #include "src/core/support/block_annotate.h" @@ -148,7 +149,7 @@ static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg, finally_add_fd(exec_ctx, da->pollset, da->fd); } - gpr_mu_lock(&da->pollset->mu); + gpr_mu_lock(da->pollset->mu); da->pollset->in_flight_cbs--; if (da->pollset->shutting_down) { /* We don't care about this pollset anymore. */ @@ -157,7 +158,7 @@ static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg, grpc_exec_ctx_enqueue(exec_ctx, da->pollset->shutdown_done, true, NULL); } } - gpr_mu_unlock(&da->pollset->mu); + gpr_mu_unlock(da->pollset->mu); GRPC_FD_UNREF(da->fd, "delayed_add"); @@ -169,7 +170,7 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_fd *fd, int and_unlock_pollset) { if (and_unlock_pollset) { - gpr_mu_unlock(&pollset->mu); + gpr_mu_unlock(pollset->mu); finally_add_fd(exec_ctx, pollset, fd); } else { delayed_add *da = gpr_malloc(sizeof(*da)); @@ -201,7 +202,7 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( * here. */ - gpr_mu_unlock(&pollset->mu); + gpr_mu_unlock(pollset->mu); timeout_ms = grpc_poll_deadline_to_millis_timeout(deadline, now); diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index 809f8f39da..348d339104 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -42,13 +42,15 @@ #include <stdlib.h> #include <string.h> -#include "src/core/iomgr/fd_posix.h" -#include "src/core/iomgr/iomgr_internal.h" -#include "src/core/support/block_annotate.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/useful.h> +#include "src/core/iomgr/fd_posix.h" +#include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/pollset_posix.h" +#include "src/core/support/block_annotate.h" + typedef struct { /* all polled fds */ size_t fd_count; @@ -78,7 +80,7 @@ static void multipoll_with_poll_pollset_add_fd(grpc_exec_ctx *exec_ctx, GRPC_FD_REF(fd, "multipoller"); exit: if (and_unlock_pollset) { - gpr_mu_unlock(&pollset->mu); + gpr_mu_unlock(pollset->mu); } } @@ -130,7 +132,7 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( } h->del_count = 0; h->fd_count = fd_count; - gpr_mu_unlock(&pollset->mu); + gpr_mu_unlock(pollset->mu); for (i = 2; i < pfd_count; i++) { pfds[i].events = (short)grpc_fd_begin_poll(watchers[i].fd, pollset, worker, diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index ee7e9f48f4..63321638ba 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -42,16 +42,16 @@ #include <string.h> #include <unistd.h> -#include "src/core/iomgr/fd_posix.h" -#include "src/core/iomgr/iomgr_internal.h" -#include "src/core/iomgr/socket_utils_posix.h" -#include "src/core/profiling/timers.h" -#include "src/core/support/block_annotate.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/thd.h> #include <grpc/support/tls.h> #include <grpc/support/useful.h> +#include "src/core/iomgr/fd_posix.h" +#include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/socket_utils_posix.h" +#include "src/core/profiling/timers.h" +#include "src/core/support/block_annotate.h" GPR_TLS_DECL(g_current_thread_poller); GPR_TLS_DECL(g_current_thread_worker); @@ -97,6 +97,8 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { worker->prev->next = worker->next->prev = worker; } +size_t grpc_pollset_size(void) { return sizeof(grpc_pollset); } + void grpc_pollset_kick_ext(grpc_pollset *p, grpc_pollset_worker *specific_worker, uint32_t flags) { @@ -186,8 +188,8 @@ void grpc_kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); } static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null); -void grpc_pollset_init(grpc_pollset *pollset) { - gpr_mu_init(&pollset->mu); +void grpc_pollset_init(grpc_pollset *pollset, gpr_mu *mu) { + pollset->mu = mu; pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker; pollset->in_flight_cbs = 0; pollset->shutting_down = 0; @@ -204,7 +206,6 @@ void grpc_pollset_destroy(grpc_pollset *pollset) { GPR_ASSERT(!grpc_pollset_has_workers(pollset)); GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail); pollset->vtable->destroy(pollset); - gpr_mu_destroy(&pollset->mu); while (pollset->local_wakeup_cache) { grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next; grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd); @@ -227,15 +228,15 @@ void grpc_pollset_reset(grpc_pollset *pollset) { void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_fd *fd) { - gpr_mu_lock(&pollset->mu); + gpr_mu_lock(pollset->mu); pollset->vtable->add_fd(exec_ctx, pollset, fd, 1); /* the following (enabled only in debug) will reacquire and then release our lock - meaning that if the unlocking flag passed to add_fd above is not respected, the code will deadlock (in a way that we have a chance of debugging) */ #ifndef NDEBUG - gpr_mu_lock(&pollset->mu); - gpr_mu_unlock(&pollset->mu); + gpr_mu_lock(pollset->mu); + gpr_mu_unlock(pollset->mu); #endif } @@ -284,7 +285,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, /* Give do_promote priority so we don't starve it out */ if (pollset->in_flight_cbs) { GPR_TIMER_MARK("grpc_pollset_work.in_flight_cbs", 0); - gpr_mu_unlock(&pollset->mu); + gpr_mu_unlock(pollset->mu); locked = 0; goto done; } @@ -318,7 +319,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, done: if (!locked) { queued_work |= grpc_exec_ctx_flush(exec_ctx); - gpr_mu_lock(&pollset->mu); + gpr_mu_lock(pollset->mu); locked = 1; } /* If we're forced to re-evaluate polling (via grpc_pollset_kick with @@ -348,19 +349,19 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_kick(pollset, NULL); } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) { pollset->called_shutdown = 1; - gpr_mu_unlock(&pollset->mu); + gpr_mu_unlock(pollset->mu); finish_shutdown(exec_ctx, pollset); grpc_exec_ctx_flush(exec_ctx); /* Continuing to access pollset here is safe -- it is the caller's * responsibility to not destroy when it has outstanding calls to * grpc_pollset_work. * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */ - gpr_mu_lock(&pollset->mu); + gpr_mu_lock(pollset->mu); } else if (!grpc_closure_list_empty(pollset->idle_jobs)) { grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL); - gpr_mu_unlock(&pollset->mu); + gpr_mu_unlock(pollset->mu); grpc_exec_ctx_flush(exec_ctx); - gpr_mu_lock(&pollset->mu); + gpr_mu_lock(pollset->mu); } } *worker_hdl = NULL; @@ -428,7 +429,7 @@ static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args, * 4. The pollset may be shutting down. */ - gpr_mu_lock(&pollset->mu); + gpr_mu_lock(pollset->mu); /* First we need to ensure that nobody is polling concurrently */ GPR_ASSERT(!grpc_pollset_has_workers(pollset)); @@ -469,7 +470,7 @@ static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args, } } - gpr_mu_unlock(&pollset->mu); + gpr_mu_unlock(pollset->mu); /* Matching ref in basic_pollset_add_fd */ GRPC_FD_UNREF(fd, "basicpoll_add"); @@ -522,7 +523,7 @@ static void basic_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, exit: if (and_unlock_pollset) { - gpr_mu_unlock(&pollset->mu); + gpr_mu_unlock(pollset->mu); } } @@ -558,14 +559,14 @@ static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx, pfd[2].fd = fd->fd; pfd[2].revents = 0; GRPC_FD_REF(fd, "basicpoll_begin"); - gpr_mu_unlock(&pollset->mu); + gpr_mu_unlock(pollset->mu); pfd[2].events = (short)grpc_fd_begin_poll(fd, pollset, worker, POLLIN, POLLOUT, &fd_watcher); if (pfd[2].events != 0) { nfds++; } } else { - gpr_mu_unlock(&pollset->mu); + gpr_mu_unlock(pollset->mu); } /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index b34bb09426..58158e3d46 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -37,8 +37,10 @@ #include <poll.h> #include <grpc/support/sync.h> + #include "src/core/iomgr/exec_ctx.h" #include "src/core/iomgr/iomgr.h" +#include "src/core/iomgr/pollset.h" #include "src/core/iomgr/wakeup_fd_posix.h" typedef struct grpc_pollset_vtable grpc_pollset_vtable; @@ -53,21 +55,21 @@ typedef struct grpc_cached_wakeup_fd { struct grpc_cached_wakeup_fd *next; } grpc_cached_wakeup_fd; -typedef struct grpc_pollset_worker { +struct grpc_pollset_worker { grpc_cached_wakeup_fd *wakeup_fd; int reevaluate_polling_on_wakeup; int kicked_specifically; struct grpc_pollset_worker *next; struct grpc_pollset_worker *prev; -} grpc_pollset_worker; +}; -typedef struct grpc_pollset { +struct grpc_pollset { /* pollsets under posix can mutate representation as fds are added and removed. For example, we may choose a poll() based implementation on linux for few fds, and an epoll() based implementation for many fds */ const grpc_pollset_vtable *vtable; - gpr_mu mu; + gpr_mu *mu; grpc_pollset_worker root_worker; int in_flight_cbs; int shutting_down; @@ -81,7 +83,7 @@ typedef struct grpc_pollset { } data; /* Local cache of eventfds for workers */ grpc_cached_wakeup_fd *local_wakeup_cache; -} grpc_pollset; +}; struct grpc_pollset_vtable { void (*add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -93,8 +95,6 @@ struct grpc_pollset_vtable { void (*destroy)(grpc_pollset *pollset); }; -#define GRPC_POLLSET_MU(pollset) (&(pollset)->mu) - /* Add an fd to a pollset */ void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, struct grpc_fd *fd); diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c index 4ec92202e3..85a0cadfc7 100644 --- a/src/core/iomgr/pollset_set_posix.c +++ b/src/core/iomgr/pollset_set_posix.c @@ -41,6 +41,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/useful.h> +#include "src/core/iomgr/pollset_posix.h" #include "src/core/iomgr/pollset_set.h" void grpc_pollset_set_init(grpc_pollset_set *pollset_set) { diff --git a/src/core/iomgr/pollset_set_posix.h b/src/core/iomgr/pollset_set_posix.h index 4820a61e4b..7ce8ec7343 100644 --- a/src/core/iomgr/pollset_set_posix.h +++ b/src/core/iomgr/pollset_set_posix.h @@ -35,7 +35,6 @@ #define GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_POSIX_H #include "src/core/iomgr/fd_posix.h" -#include "src/core/iomgr/pollset_posix.h" typedef struct grpc_pollset_set { gpr_mu mu; diff --git a/src/core/iomgr/pollset_windows.h b/src/core/iomgr/pollset_windows.h index 65ba80619b..dfec5821b3 100644 --- a/src/core/iomgr/pollset_windows.h +++ b/src/core/iomgr/pollset_windows.h @@ -74,6 +74,4 @@ struct grpc_pollset { extern gpr_mu grpc_polling_mu; -#define GRPC_POLLSET_MU(pollset) (&grpc_polling_mu) - #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */ diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 048e907441..fba3563427 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -40,8 +40,8 @@ #include <errno.h> #include <stdlib.h> #include <string.h> -#include <sys/types.h> #include <sys/socket.h> +#include <sys/types.h> #include <unistd.h> #include <grpc/support/alloc.h> @@ -51,9 +51,10 @@ #include <grpc/support/sync.h> #include <grpc/support/time.h> -#include "src/core/support/string.h" #include "src/core/debug/trace.h" +#include "src/core/iomgr/pollset_posix.h" #include "src/core/profiling/timers.h" +#include "src/core/support/string.h" #ifdef GPR_HAVE_MSG_NOSIGNAL #define SENDMSG_FLAGS MSG_NOSIGNAL diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c index da11df67ef..c096dbfb30 100644 --- a/src/core/iomgr/workqueue_posix.c +++ b/src/core/iomgr/workqueue_posix.c @@ -44,6 +44,7 @@ #include <grpc/support/useful.h> #include "src/core/iomgr/fd_posix.h" +#include "src/core/iomgr/pollset_posix.h" static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success); diff --git a/src/core/iomgr/workqueue_posix.h b/src/core/iomgr/workqueue_posix.h index 589034fe1b..6f29f4004c 100644 --- a/src/core/iomgr/workqueue_posix.h +++ b/src/core/iomgr/workqueue_posix.h @@ -34,6 +34,8 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H #define GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H +#include "src/core/iomgr/wakeup_fd_posix.h" + struct grpc_fd; struct grpc_workqueue { |