diff options
Diffstat (limited to 'src/core/iomgr')
25 files changed, 191 insertions, 208 deletions
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 85eadd754b..4ba7c5df94 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -46,6 +46,8 @@ #include <grpc/support/string_util.h> #include <grpc/support/useful.h> +#include "src/core/iomgr/pollset_posix.h" + #define CLOSURE_NOT_READY ((grpc_closure *)0) #define CLOSURE_READY ((grpc_closure *)1) @@ -175,11 +177,11 @@ int grpc_fd_is_orphaned(grpc_fd *fd) { } static void pollset_kick_locked(grpc_fd_watcher *watcher) { - gpr_mu_lock(GRPC_POLLSET_MU(watcher->pollset)); + gpr_mu_lock(&watcher->pollset->mu); GPR_ASSERT(watcher->worker); grpc_pollset_kick_ext(watcher->pollset, watcher->worker, GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP); - gpr_mu_unlock(GRPC_POLLSET_MU(watcher->pollset)); + gpr_mu_unlock(&watcher->pollset->mu); } static void maybe_wake_one_watcher_locked(grpc_fd *fd) { diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c index 759340e00e..807729708e 100644 --- a/src/core/iomgr/iocp_windows.c +++ b/src/core/iomgr/iocp_windows.c @@ -42,7 +42,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/thd.h> -#include "src/core/iomgr/timer_internal.h" +#include "src/core/iomgr/timer.h" #include "src/core/iomgr/iocp_windows.h" #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/socket_windows.h" diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index 212ce5534d..9c89c2c08a 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -41,9 +41,11 @@ #include <grpc/support/string_util.h> #include <grpc/support/sync.h> #include <grpc/support/thd.h> +#include <grpc/support/useful.h> #include "src/core/iomgr/iomgr_internal.h" -#include "src/core/iomgr/timer_internal.h" +#include "src/core/iomgr/timer.h" +#include "src/core/support/env.h" #include "src/core/support/string.h" static gpr_mu g_mu; @@ -116,6 +118,9 @@ void grpc_iomgr_shutdown(void) { "memory leaks are likely", count_objects()); dump_objects("LEAKED"); + if (grpc_iomgr_abort_on_leaks()) { + abort(); + } } break; } @@ -154,3 +159,14 @@ void grpc_iomgr_unregister_object(grpc_iomgr_object *obj) { gpr_mu_unlock(&g_mu); gpr_free(obj->name); } + +bool grpc_iomgr_abort_on_leaks(void) { + char *env = gpr_getenv("GRPC_ABORT_ON_LEAKS"); + if (env == NULL) return false; + static const char *truthy[] = {"yes", "Yes", "YES", "true", + "True", "TRUE", "1"}; + for (size_t i = 0; i < GPR_ARRAY_SIZE(truthy); i++) { + if (0 == strcmp(env, truthy[i])) return true; + } + return false; +} diff --git a/src/core/iomgr/iomgr_internal.h b/src/core/iomgr/iomgr_internal.h index e372c18e8a..ac2c46ebe6 100644 --- a/src/core/iomgr/iomgr_internal.h +++ b/src/core/iomgr/iomgr_internal.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -34,6 +34,8 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H #define GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H +#include <stdbool.h> + #include "src/core/iomgr/iomgr.h" #include <grpc/support/sync.h> @@ -55,4 +57,6 @@ void grpc_iomgr_platform_flush(void); /** tear down all platform specific global iomgr structures */ void grpc_iomgr_platform_shutdown(void); +bool grpc_iomgr_abort_on_leaks(void); + #endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H */ diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h index c6b0214dea..92a0374ddd 100644 --- a/src/core/iomgr/pollset.h +++ b/src/core/iomgr/pollset.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -35,8 +35,11 @@ #define GRPC_INTERNAL_CORE_IOMGR_POLLSET_H #include <grpc/support/port_platform.h> +#include <grpc/support/sync.h> #include <grpc/support/time.h> +#include "src/core/iomgr/exec_ctx.h" + #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1) /* A grpc_pollset is a set of file descriptors that a higher level item is @@ -46,15 +49,11 @@ - a completion queue might keep a pollset with an entry for each transport that is servicing a call that it's tracking */ -#ifdef GPR_POSIX_SOCKET -#include "src/core/iomgr/pollset_posix.h" -#endif - -#ifdef GPR_WIN32 -#include "src/core/iomgr/pollset_windows.h" -#endif +typedef struct grpc_pollset grpc_pollset; +typedef struct grpc_pollset_worker grpc_pollset_worker; -void grpc_pollset_init(grpc_pollset *pollset); +size_t grpc_pollset_size(void); +void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu); /* Begin shutting down the pollset, and call closure when done. * GRPC_POLLSET_MU(pollset) must be held */ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -83,7 +82,7 @@ void grpc_pollset_destroy(grpc_pollset *pollset); pollset lock */ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker *worker, gpr_timespec now, + grpc_pollset_worker **worker, gpr_timespec now, gpr_timespec deadline); /* Break one polling thread out of polling work for this pollset. diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index 4acae2bb71..2e0f27fab8 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -45,6 +45,7 @@ #include <grpc/support/log.h> #include <grpc/support/useful.h> #include "src/core/iomgr/fd_posix.h" +#include "src/core/iomgr/pollset_posix.h" #include "src/core/profiling/timers.h" #include "src/core/support/block_annotate.h" diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index 809f8f39da..92d6fb7241 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -42,13 +42,15 @@ #include <stdlib.h> #include <string.h> -#include "src/core/iomgr/fd_posix.h" -#include "src/core/iomgr/iomgr_internal.h" -#include "src/core/support/block_annotate.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/useful.h> +#include "src/core/iomgr/fd_posix.h" +#include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/pollset_posix.h" +#include "src/core/support/block_annotate.h" + typedef struct { /* all polled fds */ size_t fd_count; @@ -120,6 +122,7 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( } else { h->fds[fd_count++] = h->fds[i]; watchers[pfd_count].fd = h->fds[i]; + GRPC_FD_REF(watchers[pfd_count].fd, "multipoller_start"); pfds[pfd_count].fd = h->fds[i]->fd; pfds[pfd_count].revents = 0; pfd_count++; @@ -133,8 +136,10 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( gpr_mu_unlock(&pollset->mu); for (i = 2; i < pfd_count; i++) { - pfds[i].events = (short)grpc_fd_begin_poll(watchers[i].fd, pollset, worker, - POLLIN, POLLOUT, &watchers[i]); + grpc_fd *fd = watchers[i].fd; + pfds[i].events = (short)grpc_fd_begin_poll(fd, pollset, worker, POLLIN, + POLLOUT, &watchers[i]); + GRPC_FD_UNREF(fd, "multipoller_start"); } /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 19ee6650f0..e895a77884 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -42,17 +42,16 @@ #include <string.h> #include <unistd.h> -#include "src/core/iomgr/timer_internal.h" -#include "src/core/iomgr/fd_posix.h" -#include "src/core/iomgr/iomgr_internal.h" -#include "src/core/iomgr/socket_utils_posix.h" -#include "src/core/profiling/timers.h" -#include "src/core/support/block_annotate.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/thd.h> #include <grpc/support/tls.h> #include <grpc/support/useful.h> +#include "src/core/iomgr/fd_posix.h" +#include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/socket_utils_posix.h" +#include "src/core/profiling/timers.h" +#include "src/core/support/block_annotate.h" GPR_TLS_DECL(g_current_thread_poller); GPR_TLS_DECL(g_current_thread_worker); @@ -98,6 +97,8 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { worker->prev->next = worker->next->prev = worker; } +size_t grpc_pollset_size(void) { return sizeof(grpc_pollset); } + void grpc_pollset_kick_ext(grpc_pollset *p, grpc_pollset_worker *specific_worker, uint32_t flags) { @@ -187,8 +188,9 @@ void grpc_kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); } static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null); -void grpc_pollset_init(grpc_pollset *pollset) { +void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) { gpr_mu_init(&pollset->mu); + *mu = &pollset->mu; pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker; pollset->in_flight_cbs = 0; pollset->shutting_down = 0; @@ -205,7 +207,6 @@ void grpc_pollset_destroy(grpc_pollset *pollset) { GPR_ASSERT(!grpc_pollset_has_workers(pollset)); GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail); pollset->vtable->destroy(pollset); - gpr_mu_destroy(&pollset->mu); while (pollset->local_wakeup_cache) { grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next; grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd); @@ -247,8 +248,11 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { } void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker *worker, gpr_timespec now, + grpc_pollset_worker **worker_hdl, gpr_timespec now, gpr_timespec deadline) { + grpc_pollset_worker worker; + *worker_hdl = &worker; + /* pollset->mu already held */ int added_worker = 0; int locked = 1; @@ -256,16 +260,16 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, int keep_polling = 0; GPR_TIMER_BEGIN("grpc_pollset_work", 0); /* this must happen before we (potentially) drop pollset->mu */ - worker->next = worker->prev = NULL; - worker->reevaluate_polling_on_wakeup = 0; + worker.next = worker.prev = NULL; + worker.reevaluate_polling_on_wakeup = 0; if (pollset->local_wakeup_cache != NULL) { - worker->wakeup_fd = pollset->local_wakeup_cache; - pollset->local_wakeup_cache = worker->wakeup_fd->next; + worker.wakeup_fd = pollset->local_wakeup_cache; + pollset->local_wakeup_cache = worker.wakeup_fd->next; } else { - worker->wakeup_fd = gpr_malloc(sizeof(*worker->wakeup_fd)); - grpc_wakeup_fd_init(&worker->wakeup_fd->fd); + worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd)); + grpc_wakeup_fd_init(&worker.wakeup_fd->fd); } - worker->kicked_specifically = 0; + worker.kicked_specifically = 0; /* If there's work waiting for the pollset to be idle, and the pollset is idle, then do that work */ if (!grpc_pollset_has_workers(pollset) && @@ -274,16 +278,6 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL); goto done; } - /* Check alarms - these are a global resource so we just ping - each time through on every pollset. - May update deadline to ensure timely wakeups. - TODO(ctiller): can this work be localized? */ - if (grpc_timer_check(exec_ctx, now, &deadline)) { - GPR_TIMER_MARK("grpc_pollset_work.alarm_triggered", 0); - gpr_mu_unlock(&pollset->mu); - locked = 0; - goto done; - } /* If we're shutting down then we don't execute any extended work */ if (pollset->shutting_down) { GPR_TIMER_MARK("grpc_pollset_work.shutting_down", 0); @@ -304,13 +298,13 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, keep_polling = 0; if (!pollset->kicked_without_pollers) { if (!added_worker) { - push_front_worker(pollset, worker); + push_front_worker(pollset, &worker); added_worker = 1; - gpr_tls_set(&g_current_thread_worker, (intptr_t)worker); + gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker); } gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset); GPR_TIMER_BEGIN("maybe_work_and_unlock", 0); - pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, worker, + pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, &worker, deadline, now); GPR_TIMER_END("maybe_work_and_unlock", 0); locked = 0; @@ -332,10 +326,10 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, /* If we're forced to re-evaluate polling (via grpc_pollset_kick with GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force a loop */ - if (worker->reevaluate_polling_on_wakeup) { - worker->reevaluate_polling_on_wakeup = 0; + if (worker.reevaluate_polling_on_wakeup) { + worker.reevaluate_polling_on_wakeup = 0; pollset->kicked_without_pollers = 0; - if (queued_work || worker->kicked_specifically) { + if (queued_work || worker.kicked_specifically) { /* If there's queued work on the list, then set the deadline to be immediate so we get back out of the polling loop quickly */ deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC); @@ -344,12 +338,12 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, } } if (added_worker) { - remove_worker(pollset, worker); + remove_worker(pollset, &worker); gpr_tls_set(&g_current_thread_worker, 0); } /* release wakeup fd to the local pool */ - worker->wakeup_fd->next = pollset->local_wakeup_cache; - pollset->local_wakeup_cache = worker->wakeup_fd; + worker.wakeup_fd->next = pollset->local_wakeup_cache; + pollset->local_wakeup_cache = worker.wakeup_fd; /* check shutdown conditions */ if (pollset->shutting_down) { if (grpc_pollset_has_workers(pollset)) { @@ -371,6 +365,7 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_mu_lock(&pollset->mu); } } + *worker_hdl = NULL; GPR_TIMER_END("grpc_pollset_work", 0); } diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index 5868b3fa21..bbedb66b00 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -37,8 +37,10 @@ #include <poll.h> #include <grpc/support/sync.h> + #include "src/core/iomgr/exec_ctx.h" #include "src/core/iomgr/iomgr.h" +#include "src/core/iomgr/pollset.h" #include "src/core/iomgr/wakeup_fd_posix.h" typedef struct grpc_pollset_vtable grpc_pollset_vtable; @@ -53,15 +55,15 @@ typedef struct grpc_cached_wakeup_fd { struct grpc_cached_wakeup_fd *next; } grpc_cached_wakeup_fd; -typedef struct grpc_pollset_worker { +struct grpc_pollset_worker { grpc_cached_wakeup_fd *wakeup_fd; int reevaluate_polling_on_wakeup; int kicked_specifically; struct grpc_pollset_worker *next; struct grpc_pollset_worker *prev; -} grpc_pollset_worker; +}; -typedef struct grpc_pollset { +struct grpc_pollset { /* pollsets under posix can mutate representation as fds are added and removed. For example, we may choose a poll() based implementation on linux for @@ -81,7 +83,7 @@ typedef struct grpc_pollset { } data; /* Local cache of eventfds for workers */ grpc_cached_wakeup_fd *local_wakeup_cache; -} grpc_pollset; +}; struct grpc_pollset_vtable { void (*add_fd)(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -93,8 +95,6 @@ struct grpc_pollset_vtable { void (*destroy)(grpc_pollset *pollset); }; -#define GRPC_POLLSET_MU(pollset) (&(pollset)->mu) - /* Add an fd to a pollset */ void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, struct grpc_fd *fd); diff --git a/src/core/iomgr/pollset_set.h b/src/core/iomgr/pollset_set.h index 09c04438f7..dddcd8313f 100644 --- a/src/core/iomgr/pollset_set.h +++ b/src/core/iomgr/pollset_set.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -41,15 +41,9 @@ fd's (etc) that have been registered with the set_set to that pollset. Registering fd's automatically adds them to all current pollsets. */ -#ifdef GPR_POSIX_SOCKET -#include "src/core/iomgr/pollset_set_posix.h" -#endif +typedef struct grpc_pollset_set grpc_pollset_set; -#ifdef GPR_WIN32 -#include "src/core/iomgr/pollset_set_windows.h" -#endif - -void grpc_pollset_set_init(grpc_pollset_set *pollset_set); +grpc_pollset_set *grpc_pollset_set_create(void); void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set); void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pollset_set, diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c index 4ec92202e3..9dc9aff4a8 100644 --- a/src/core/iomgr/pollset_set_posix.c +++ b/src/core/iomgr/pollset_set_posix.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -41,11 +41,30 @@ #include <grpc/support/alloc.h> #include <grpc/support/useful.h> -#include "src/core/iomgr/pollset_set.h" +#include "src/core/iomgr/pollset_posix.h" +#include "src/core/iomgr/pollset_set_posix.h" -void grpc_pollset_set_init(grpc_pollset_set *pollset_set) { +struct grpc_pollset_set { + gpr_mu mu; + + size_t pollset_count; + size_t pollset_capacity; + grpc_pollset **pollsets; + + size_t pollset_set_count; + size_t pollset_set_capacity; + struct grpc_pollset_set **pollset_sets; + + size_t fd_count; + size_t fd_capacity; + grpc_fd **fds; +}; + +grpc_pollset_set *grpc_pollset_set_create(void) { + grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set)); memset(pollset_set, 0, sizeof(*pollset_set)); gpr_mu_init(&pollset_set->mu); + return pollset_set; } void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) { @@ -57,6 +76,7 @@ void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) { gpr_free(pollset_set->pollsets); gpr_free(pollset_set->pollset_sets); gpr_free(pollset_set->fds); + gpr_free(pollset_set); } void grpc_pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, diff --git a/src/core/iomgr/pollset_set_posix.h b/src/core/iomgr/pollset_set_posix.h index 4820a61e4b..7d1aaf4181 100644 --- a/src/core/iomgr/pollset_set_posix.h +++ b/src/core/iomgr/pollset_set_posix.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -35,23 +35,7 @@ #define GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_POSIX_H #include "src/core/iomgr/fd_posix.h" -#include "src/core/iomgr/pollset_posix.h" - -typedef struct grpc_pollset_set { - gpr_mu mu; - - size_t pollset_count; - size_t pollset_capacity; - grpc_pollset **pollsets; - - size_t pollset_set_count; - size_t pollset_set_capacity; - struct grpc_pollset_set **pollset_sets; - - size_t fd_count; - size_t fd_capacity; - grpc_fd **fds; -} grpc_pollset_set; +#include "src/core/iomgr/pollset_set.h" void grpc_pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pollset_set, grpc_fd *fd); diff --git a/src/core/iomgr/pollset_set_windows.c b/src/core/iomgr/pollset_set_windows.c index 157b46ec32..3b8eca28e6 100644 --- a/src/core/iomgr/pollset_set_windows.c +++ b/src/core/iomgr/pollset_set_windows.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -35,9 +35,9 @@ #ifdef GPR_WINSOCK_SOCKET -#include "src/core/iomgr/pollset_set.h" +#include "src/core/iomgr/pollset_set_windows.h" -void grpc_pollset_set_init(grpc_pollset_set* pollset_set) {} +grpc_pollset_set* grpc_pollset_set_create(pollset_set) { return NULL; } void grpc_pollset_set_destroy(grpc_pollset_set* pollset_set) {} diff --git a/src/core/iomgr/pollset_set_windows.h b/src/core/iomgr/pollset_set_windows.h index cada0d2b61..9661cd2c39 100644 --- a/src/core/iomgr/pollset_set_windows.h +++ b/src/core/iomgr/pollset_set_windows.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -34,6 +34,6 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_WINDOWS_H #define GRPC_INTERNAL_CORE_IOMGR_POLLSET_SET_WINDOWS_H -typedef struct grpc_pollset_set { void *unused; } grpc_pollset_set; +#include "src/core/iomgr/pollset_set.h" #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */ diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index 02c6678363..c7f30f435f 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -38,7 +38,6 @@ #include <grpc/support/log.h> #include <grpc/support/thd.h> -#include "src/core/iomgr/timer_internal.h" #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/iocp_windows.h" #include "src/core/iomgr/pollset.h" @@ -90,12 +89,15 @@ static void push_front_worker(grpc_pollset_worker *root, worker->links[type].next->links[type].prev = worker; } +size_t grpc_pollset_size(void) { return sizeof(grpc_pollset); } + /* There isn't really any such thing as a pollset under Windows, due to the nature of the IO completion ports. We're still going to provide a minimal set of features for the sake of the rest of grpc. But grpc_pollset_work won't actually do any polling, and return as quickly as possible. */ -void grpc_pollset_init(grpc_pollset *pollset) { +void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) { + *mu = &grpc_polling_mu; memset(pollset, 0, sizeof(*pollset)); pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next = pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev = @@ -126,25 +128,25 @@ void grpc_pollset_reset(grpc_pollset *pollset) { } void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_pollset_worker *worker, gpr_timespec now, + grpc_pollset_worker **worker_hdl, gpr_timespec now, gpr_timespec deadline) { + grpc_pollset_worker worker; + *worker_hdl = &worker; + int added_worker = 0; - worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].next = - worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev = - worker->links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next = - worker->links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev = NULL; - worker->kicked = 0; - worker->pollset = pollset; - gpr_cv_init(&worker->cv); - if (grpc_timer_check(exec_ctx, now, &deadline)) { - goto done; - } + worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next = + worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev = + worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next = + worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev = NULL; + worker.kicked = 0; + worker.pollset = pollset; + gpr_cv_init(&worker.cv); if (!pollset->kicked_without_pollers && !pollset->shutting_down) { if (g_active_poller == NULL) { grpc_pollset_worker *next_worker; /* become poller */ pollset->is_iocp_worker = 1; - g_active_poller = worker; + g_active_poller = &worker; gpr_mu_unlock(&grpc_polling_mu); grpc_iocp_work(exec_ctx, deadline); grpc_exec_ctx_flush(exec_ctx); @@ -171,12 +173,12 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, goto done; } push_front_worker(&g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL, - worker); + &worker); push_front_worker(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET, - worker); + &worker); added_worker = 1; - while (!worker->kicked) { - if (gpr_cv_wait(&worker->cv, &grpc_polling_mu, deadline)) { + while (!worker.kicked) { + if (gpr_cv_wait(&worker.cv, &grpc_polling_mu, deadline)) { break; } } @@ -190,10 +192,11 @@ done: gpr_mu_lock(&grpc_polling_mu); } if (added_worker) { - remove_worker(worker, GRPC_POLLSET_WORKER_LINK_GLOBAL); - remove_worker(worker, GRPC_POLLSET_WORKER_LINK_POLLSET); + remove_worker(&worker, GRPC_POLLSET_WORKER_LINK_GLOBAL); + remove_worker(&worker, GRPC_POLLSET_WORKER_LINK_POLLSET); } - gpr_cv_destroy(&worker->cv); + gpr_cv_destroy(&worker.cv); + *worker_hdl = NULL; } void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { diff --git a/src/core/iomgr/pollset_windows.h b/src/core/iomgr/pollset_windows.h index 65ba80619b..dc0b7a4104 100644 --- a/src/core/iomgr/pollset_windows.h +++ b/src/core/iomgr/pollset_windows.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -72,8 +72,4 @@ struct grpc_pollset { grpc_closure *on_shutdown; }; -extern gpr_mu grpc_polling_mu; - -#define GRPC_POLLSET_MU(pollset) (&grpc_polling_mu) - #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */ diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index c76c2e3b0f..15727856ab 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -42,17 +42,19 @@ #include <string.h> #include <unistd.h> -#include "src/core/iomgr/timer.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/time.h> + #include "src/core/iomgr/iomgr_posix.h" #include "src/core/iomgr/pollset_posix.h" +#include "src/core/iomgr/pollset_set_posix.h" #include "src/core/iomgr/sockaddr_utils.h" #include "src/core/iomgr/socket_utils_posix.h" #include "src/core/iomgr/tcp_posix.h" +#include "src/core/iomgr/timer.h" #include "src/core/support/string.h" -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/string_util.h> -#include <grpc/support/time.h> extern int grpc_tcp_trace; diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 048e907441..f74eb3fe51 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -40,8 +40,8 @@ #include <errno.h> #include <stdlib.h> #include <string.h> -#include <sys/types.h> #include <sys/socket.h> +#include <sys/types.h> #include <unistd.h> #include <grpc/support/alloc.h> @@ -51,9 +51,11 @@ #include <grpc/support/sync.h> #include <grpc/support/time.h> -#include "src/core/support/string.h" #include "src/core/debug/trace.h" +#include "src/core/iomgr/pollset_posix.h" +#include "src/core/iomgr/pollset_set_posix.h" #include "src/core/profiling/timers.h" +#include "src/core/support/string.h" #ifdef GPR_HAVE_MSG_NOSIGNAL #define SENDMSG_FLAGS MSG_NOSIGNAL diff --git a/src/core/iomgr/timer.c b/src/core/iomgr/timer.c index a33d8f63a0..8379fffad0 100644 --- a/src/core/iomgr/timer.c +++ b/src/core/iomgr/timer.c @@ -34,7 +34,6 @@ #include "src/core/iomgr/timer.h" #include "src/core/iomgr/timer_heap.h" -#include "src/core/iomgr/timer_internal.h" #include "src/core/iomgr/time_averaged_stats.h" #include <grpc/support/log.h> #include <grpc/support/sync.h> @@ -336,8 +335,8 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, return (int)n; } -int grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, - gpr_timespec *next) { +bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, + gpr_timespec *next) { GPR_ASSERT(now.clock_type == g_clock_type); return run_some_expired_timers( exec_ctx, now, next, diff --git a/src/core/iomgr/timer.h b/src/core/iomgr/timer.h index 720c9d5ab9..9ad1e92f42 100644 --- a/src/core/iomgr/timer.h +++ b/src/core/iomgr/timer.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -86,4 +86,24 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer, Requires: cancel() must happen after add() on a given timer */ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer); +/* iomgr internal api for dealing with timers */ + +/* Check for timers to be run, and run them. + Return true if timer callbacks were executed. + Drops drop_mu if it is non-null before executing callbacks. + If next is non-null, TRY to update *next with the next running timer + IF that timer occurs before *next current value. + *next is never guaranteed to be updated on any given execution; however, + with high probability at least one thread in the system will see an update + at any time slice. */ + +bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, + gpr_timespec *next); +void grpc_timer_list_init(gpr_timespec now); +void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx); + +/* the following must be implemented by each iomgr implementation */ + +void grpc_kick_poller(void); + #endif /* GRPC_INTERNAL_CORE_IOMGR_TIMER_H */ diff --git a/src/core/iomgr/timer_heap.c b/src/core/iomgr/timer_heap.c index 9d8be5c1fc..b5df566c45 100644 --- a/src/core/iomgr/timer_heap.c +++ b/src/core/iomgr/timer_heap.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -46,7 +46,7 @@ static void adjust_upwards(grpc_timer **first, uint32_t i, grpc_timer *t) { while (i > 0) { uint32_t parent = (uint32_t)(((int)i - 1) / 2); - if (gpr_time_cmp(first[parent]->deadline, t->deadline) >= 0) break; + if (gpr_time_cmp(first[parent]->deadline, t->deadline) <= 0) break; first[i] = first[parent]; first[i]->heap_index = i; i = parent; @@ -62,16 +62,14 @@ static void adjust_downwards(grpc_timer **first, uint32_t i, uint32_t length, grpc_timer *t) { for (;;) { uint32_t left_child = 1u + 2u * i; - uint32_t right_child; - uint32_t next_i; if (left_child >= length) break; - right_child = left_child + 1; - next_i = right_child < length && - gpr_time_cmp(first[left_child]->deadline, - first[right_child]->deadline) < 0 - ? right_child - : left_child; - if (gpr_time_cmp(t->deadline, first[next_i]->deadline) >= 0) break; + uint32_t right_child = left_child + 1; + uint32_t next_i = right_child < length && + gpr_time_cmp(first[left_child]->deadline, + first[right_child]->deadline) > 0 + ? right_child + : left_child; + if (gpr_time_cmp(t->deadline, first[next_i]->deadline) <= 0) break; first[i] = first[next_i]; first[i]->heap_index = i; i = next_i; @@ -95,7 +93,7 @@ static void maybe_shrink(grpc_timer_heap *heap) { static void note_changed_priority(grpc_timer_heap *heap, grpc_timer *timer) { uint32_t i = timer->heap_index; uint32_t parent = (uint32_t)(((int)i - 1) / 2); - if (gpr_time_cmp(heap->timers[parent]->deadline, timer->deadline) < 0) { + if (gpr_time_cmp(heap->timers[parent]->deadline, timer->deadline) > 0) { adjust_upwards(heap->timers, i, timer); } else { adjust_downwards(heap->timers, i, heap->timer_count, timer); diff --git a/src/core/iomgr/timer_internal.h b/src/core/iomgr/timer_internal.h deleted file mode 100644 index f182e73764..0000000000 --- a/src/core/iomgr/timer_internal.h +++ /dev/null @@ -1,61 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_INTERNAL_CORE_IOMGR_TIMER_INTERNAL_H -#define GRPC_INTERNAL_CORE_IOMGR_TIMER_INTERNAL_H - -#include "src/core/iomgr/exec_ctx.h" -#include <grpc/support/sync.h> -#include <grpc/support/time.h> - -/* iomgr internal api for dealing with timers */ - -/* Check for timers to be run, and run them. - Return non zero if timer callbacks were executed. - Drops drop_mu if it is non-null before executing callbacks. - If next is non-null, TRY to update *next with the next running timer - IF that timer occurs before *next current value. - *next is never guaranteed to be updated on any given execution; however, - with high probability at least one thread in the system will see an update - at any time slice. */ - -int grpc_timer_check(grpc_exec_ctx* exec_ctx, gpr_timespec now, - gpr_timespec* next); -void grpc_timer_list_init(gpr_timespec now); -void grpc_timer_list_shutdown(grpc_exec_ctx* exec_ctx); - -/* the following must be implemented by each iomgr implementation */ - -void grpc_kick_poller(void); - -#endif /* GRPC_INTERNAL_CORE_IOMGR_TIMER_INTERNAL_H */ diff --git a/src/core/iomgr/udp_server.h b/src/core/iomgr/udp_server.h index 73a21c80ab..a9d0489edf 100644 --- a/src/core/iomgr/udp_server.h +++ b/src/core/iomgr/udp_server.h @@ -35,6 +35,7 @@ #define GRPC_INTERNAL_CORE_IOMGR_UDP_SERVER_H #include "src/core/iomgr/endpoint.h" +#include "src/core/iomgr/fd_posix.h" /* Forward decl of grpc_server */ typedef struct grpc_server grpc_server; diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c index da11df67ef..c096dbfb30 100644 --- a/src/core/iomgr/workqueue_posix.c +++ b/src/core/iomgr/workqueue_posix.c @@ -44,6 +44,7 @@ #include <grpc/support/useful.h> #include "src/core/iomgr/fd_posix.h" +#include "src/core/iomgr/pollset_posix.h" static void on_readable(grpc_exec_ctx *exec_ctx, void *arg, bool success); diff --git a/src/core/iomgr/workqueue_posix.h b/src/core/iomgr/workqueue_posix.h index 589034fe1b..68f195ee0d 100644 --- a/src/core/iomgr/workqueue_posix.h +++ b/src/core/iomgr/workqueue_posix.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -34,6 +34,8 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H #define GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H +#include "src/core/iomgr/wakeup_fd_posix.h" + struct grpc_fd; struct grpc_workqueue { |