diff options
Diffstat (limited to 'src/core/iomgr')
32 files changed, 643 insertions, 579 deletions
diff --git a/src/core/iomgr/alarm.c b/src/core/iomgr/alarm.c index 931f746f75..68d33b9cf6 100644 --- a/src/core/iomgr/alarm.c +++ b/src/core/iomgr/alarm.c @@ -361,7 +361,9 @@ static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now, int grpc_alarm_check(gpr_mu *drop_mu, gpr_timespec now, gpr_timespec *next) { GPR_ASSERT(now.clock_type == g_clock_type); - return run_some_expired_alarms(drop_mu, now, next, 1); + return run_some_expired_alarms( + drop_mu, now, next, + gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0); } gpr_timespec grpc_alarm_list_next_timeout(void) { diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c index 96487958a7..744fe7656c 100644 --- a/src/core/iomgr/endpoint.c +++ b/src/core/iomgr/endpoint.c @@ -50,6 +50,14 @@ void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { ep->vtable->add_to_pollset(ep, pollset); } +void grpc_endpoint_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pollset_set) { + ep->vtable->add_to_pollset_set(ep, pollset_set); +} + void grpc_endpoint_shutdown(grpc_endpoint *ep) { ep->vtable->shutdown(ep); } void grpc_endpoint_destroy(grpc_endpoint *ep) { ep->vtable->destroy(ep); } + +char *grpc_endpoint_get_peer(grpc_endpoint *ep) { + return ep->vtable->get_peer(ep); +} diff --git a/src/core/iomgr/endpoint.h b/src/core/iomgr/endpoint.h index 881e851800..a2216925f9 100644 --- a/src/core/iomgr/endpoint.h +++ b/src/core/iomgr/endpoint.h @@ -35,6 +35,7 @@ #define GRPC_INTERNAL_CORE_IOMGR_ENDPOINT_H #include "src/core/iomgr/pollset.h" +#include "src/core/iomgr/pollset_set.h" #include <grpc/support/slice.h> #include <grpc/support/time.h> @@ -70,14 +71,18 @@ struct grpc_endpoint_vtable { size_t nslices, grpc_endpoint_write_cb cb, void *user_data); void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset); + void (*add_to_pollset_set)(grpc_endpoint *ep, grpc_pollset_set *pollset); void (*shutdown)(grpc_endpoint *ep); void (*destroy)(grpc_endpoint *ep); + char *(*get_peer)(grpc_endpoint *ep); }; /* When data is available on the connection, calls the callback with slices. */ void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, void *user_data); +char *grpc_endpoint_get_peer(grpc_endpoint *ep); + /* Write slices out to the socket. If the connection is ready for more data after the end of the call, it @@ -98,6 +103,7 @@ void grpc_endpoint_destroy(grpc_endpoint *ep); /* Add an endpoint to a pollset, so that when the pollset is polled, events from this endpoint are considered */ void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset); +void grpc_endpoint_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pollset_set); struct grpc_endpoint { const grpc_endpoint_vtable *vtable; diff --git a/src/core/iomgr/endpoint_pair_posix.c b/src/core/iomgr/endpoint_pair_posix.c index fa2d2555d6..deae9c6875 100644 --- a/src/core/iomgr/endpoint_pair_posix.c +++ b/src/core/iomgr/endpoint_pair_posix.c @@ -66,12 +66,12 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, create_sockets(sv); gpr_asprintf(&final_name, "%s:client", name); - p.client = - grpc_tcp_create(grpc_fd_create(sv[1], final_name), read_slice_size); + p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), read_slice_size, + "socketpair-server"); gpr_free(final_name); gpr_asprintf(&final_name, "%s:server", name); - p.server = - grpc_tcp_create(grpc_fd_create(sv[0], final_name), read_slice_size); + p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), read_slice_size, + "socketpair-client"); gpr_free(final_name); return p; } diff --git a/src/core/iomgr/endpoint_pair_windows.c b/src/core/iomgr/endpoint_pair_windows.c index c6790b2937..e8295df8b3 100644 --- a/src/core/iomgr/endpoint_pair_windows.c +++ b/src/core/iomgr/endpoint_pair_windows.c @@ -81,8 +81,10 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, size_t read SOCKET sv[2]; grpc_endpoint_pair p; create_sockets(sv); - p.client = grpc_tcp_create(grpc_winsocket_create(sv[1], "endpoint:client")); - p.server = grpc_tcp_create(grpc_winsocket_create(sv[0], "endpoint:server")); + p.client = grpc_tcp_create(grpc_winsocket_create(sv[1], "endpoint:client"), + "endpoint:server"); + p.server = grpc_tcp_create(grpc_winsocket_create(sv[0], "endpoint:server"), + "endpoint:client"); return p; } diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 6ad377ce1c..2d08a77a70 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -102,6 +102,7 @@ static grpc_fd *alloc_fd(int fd) { r->freelist_next = NULL; r->read_watcher = r->write_watcher = NULL; r->on_done_closure = NULL; + r->closed = 0; return r; } @@ -167,13 +168,19 @@ int grpc_fd_is_orphaned(grpc_fd *fd) { return (gpr_atm_acq_load(&fd->refst) & 1) == 0; } +static void pollset_kick_locked(grpc_pollset *pollset) { + gpr_mu_lock(GRPC_POLLSET_MU(pollset)); + grpc_pollset_kick(pollset, NULL); + gpr_mu_unlock(GRPC_POLLSET_MU(pollset)); +} + static void maybe_wake_one_watcher_locked(grpc_fd *fd) { if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) { - grpc_pollset_force_kick(fd->inactive_watcher_root.next->pollset); + pollset_kick_locked(fd->inactive_watcher_root.next->pollset); } else if (fd->read_watcher) { - grpc_pollset_force_kick(fd->read_watcher->pollset); + pollset_kick_locked(fd->read_watcher->pollset); } else if (fd->write_watcher) { - grpc_pollset_force_kick(fd->write_watcher->pollset); + pollset_kick_locked(fd->write_watcher->pollset); } } @@ -187,13 +194,13 @@ static void wake_all_watchers_locked(grpc_fd *fd) { grpc_fd_watcher *watcher; for (watcher = fd->inactive_watcher_root.next; watcher != &fd->inactive_watcher_root; watcher = watcher->next) { - grpc_pollset_force_kick(watcher->pollset); + pollset_kick_locked(watcher->pollset); } if (fd->read_watcher) { - grpc_pollset_force_kick(fd->read_watcher->pollset); + pollset_kick_locked(fd->read_watcher->pollset); } if (fd->write_watcher && fd->write_watcher != fd->read_watcher) { - grpc_pollset_force_kick(fd->write_watcher->pollset); + pollset_kick_locked(fd->write_watcher->pollset); } } @@ -209,6 +216,8 @@ void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done, REF_BY(fd, 1, reason); /* remove active status, but keep referenced */ gpr_mu_lock(&fd->watcher_mu); if (!has_watchers(fd)) { + GPR_ASSERT(!fd->closed); + fd->closed = 1; close(fd->fd); if (fd->on_done_closure) { grpc_iomgr_add_callback(fd->on_done_closure); @@ -373,13 +382,15 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, return 0; } /* if there is nobody polling for read, but we need to, then start doing so */ - if (read_mask && !fd->read_watcher && gpr_atm_acq_load(&fd->readst) > READY) { + if (read_mask && !fd->read_watcher && + (gpr_uintptr)gpr_atm_acq_load(&fd->readst) > READY) { fd->read_watcher = watcher; mask |= read_mask; } /* if there is nobody polling for write, but we need to, then start doing so */ - if (write_mask && !fd->write_watcher && gpr_atm_acq_load(&fd->writest) > READY) { + if (write_mask && !fd->write_watcher && + (gpr_uintptr)gpr_atm_acq_load(&fd->writest) > READY) { fd->write_watcher = watcher; mask |= write_mask; } @@ -426,7 +437,8 @@ void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write) { if (kick) { maybe_wake_one_watcher_locked(fd); } - if (grpc_fd_is_orphaned(fd) && !has_watchers(fd)) { + if (grpc_fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) { + fd->closed = 1; close(fd->fd); if (fd->on_done_closure != NULL) { grpc_iomgr_add_callback(fd->on_done_closure); diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index 94d0019fa4..835e9b339a 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -60,6 +60,7 @@ struct grpc_fd { gpr_mu set_state_mu; gpr_atm shutdown; + int closed; /* The watcher list. @@ -108,7 +109,8 @@ grpc_fd *grpc_fd_create(int fd, const char *name); on_done is called when the underlying file descriptor is definitely close()d. If on_done is NULL, no callback will be made. Requires: *fd initialized; no outstanding notify_on_read or - notify_on_write. */ + notify_on_write. + MUST NOT be called with a pollset lock taken */ void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done, const char *reason); @@ -121,11 +123,13 @@ void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done, i.e. a combination of read_mask and write_mask determined by the fd's current interest in said events. Polling strategies that do not need to alter their behavior depending on the - fd's current interest (such as epoll) do not need to call this function. */ + fd's current interest (such as epoll) do not need to call this function. + MUST NOT be called with a pollset lock taken */ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, gpr_uint32 read_mask, gpr_uint32 write_mask, grpc_fd_watcher *rec); -/* Complete polling previously started with grpc_fd_begin_poll */ +/* Complete polling previously started with grpc_fd_begin_poll + MUST NOT be called with a pollset lock taken */ void grpc_fd_end_poll(grpc_fd_watcher *rec, int got_read, int got_write); /* Return 1 if this fd is orphaned, 0 otherwise */ diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index a18c176b30..fdc9adf4af 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -88,6 +88,7 @@ void grpc_kick_poller(void) { void grpc_iomgr_init(void) { gpr_thd_id id; + g_shutdown = 0; gpr_mu_init(&g_mu); gpr_cv_init(&g_rcv); grpc_alarm_list_init(gpr_now(GPR_CLOCK_MONOTONIC)); @@ -146,7 +147,6 @@ void grpc_iomgr_shutdown(void) { continue; } if (grpc_alarm_check(&g_mu, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL)) { - gpr_log(GPR_DEBUG, "got late alarm"); continue; } if (g_root_object.next != &g_root_object) { diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h index c40188b3c9..c474e4dbf1 100644 --- a/src/core/iomgr/pollset.h +++ b/src/core/iomgr/pollset.h @@ -37,6 +37,8 @@ #include <grpc/support/port_platform.h> #include <grpc/support/time.h> +#define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker *)1) + /* A grpc_pollset is a set of file descriptors that a higher level item is interested in. For example: - a server will typically keep a pollset containing all connected channels, @@ -63,13 +65,24 @@ void grpc_pollset_destroy(grpc_pollset *pollset); descriptors. Requires GRPC_POLLSET_MU(pollset) locked. May unlock GRPC_POLLSET_MU(pollset) during its execution. - + + worker is a (platform-specific) handle that can be used to wake up + from grpc_pollset_work before any events are received and before the timeout + has expired. It is both initialized and destroyed by grpc_pollset_work. + Initialization of worker is guaranteed to occur BEFORE the + GRPC_POLLSET_MU(pollset) is released for the first time by + grpc_pollset_work, and it is guaranteed that GRPC_POLLSET_MU(pollset) will + not be released by grpc_pollset_work AFTER worker has been destroyed. + Returns true if some work has been done, and false if the deadline - got attained. */ -int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline); + expired. */ +int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, + gpr_timespec deadline); /* Break one polling thread out of polling work for this pollset. - Requires GRPC_POLLSET_MU(pollset) locked. */ -void grpc_pollset_kick(grpc_pollset *pollset); + If specific_worker is GRPC_POLLSET_KICK_BROADCAST, kick ALL the workers. + Otherwise, if specific_worker is non-NULL, then kick that worker. */ +void grpc_pollset_kick(grpc_pollset *pollset, + grpc_pollset_worker *specific_worker); #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_H */ diff --git a/src/core/iomgr/pollset_kick_posix.c b/src/core/iomgr/pollset_kick_posix.c deleted file mode 100644 index 51021784f2..0000000000 --- a/src/core/iomgr/pollset_kick_posix.c +++ /dev/null @@ -1,168 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include <grpc/support/port_platform.h> - -#ifdef GPR_POSIX_SOCKET -#include "src/core/iomgr/pollset_kick_posix.h" - -#include <errno.h> -#include <string.h> -#include <unistd.h> - -#include "src/core/iomgr/socket_utils_posix.h" -#include "src/core/iomgr/wakeup_fd_posix.h" -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> - -/* This implementation is based on a freelist of wakeup fds, with extra logic to - * handle kicks while there is no attached fd. */ - -/* TODO(klempner): Autosize this, and consider providing a way to disable the - * cap entirely on systems with large fd limits */ -#define GRPC_MAX_CACHED_WFDS 50 - -static grpc_kick_fd_info *fd_freelist = NULL; -static int fd_freelist_count = 0; -static gpr_mu fd_freelist_mu; - -static grpc_kick_fd_info *allocate_wfd(void) { - grpc_kick_fd_info *info = NULL; - gpr_mu_lock(&fd_freelist_mu); - if (fd_freelist != NULL) { - info = fd_freelist; - fd_freelist = fd_freelist->next; - --fd_freelist_count; - } - gpr_mu_unlock(&fd_freelist_mu); - if (info == NULL) { - info = gpr_malloc(sizeof(*info)); - grpc_wakeup_fd_create(&info->wakeup_fd); - info->next = NULL; - } - return info; -} - -static void destroy_wfd(grpc_kick_fd_info *wfd) { - grpc_wakeup_fd_destroy(&wfd->wakeup_fd); - gpr_free(wfd); -} - -static void free_wfd(grpc_kick_fd_info *fd_info) { - gpr_mu_lock(&fd_freelist_mu); - if (fd_freelist_count < GRPC_MAX_CACHED_WFDS) { - fd_info->next = fd_freelist; - fd_freelist = fd_info; - fd_freelist_count++; - fd_info = NULL; - } - gpr_mu_unlock(&fd_freelist_mu); - - if (fd_info) { - destroy_wfd(fd_info); - } -} - -void grpc_pollset_kick_init(grpc_pollset_kick_state *kick_state) { - gpr_mu_init(&kick_state->mu); - kick_state->kicked = 0; - kick_state->fd_list.next = kick_state->fd_list.prev = &kick_state->fd_list; -} - -void grpc_pollset_kick_destroy(grpc_pollset_kick_state *kick_state) { - gpr_mu_destroy(&kick_state->mu); - GPR_ASSERT(kick_state->fd_list.next == &kick_state->fd_list); -} - -grpc_kick_fd_info *grpc_pollset_kick_pre_poll( - grpc_pollset_kick_state *kick_state) { - grpc_kick_fd_info *fd_info; - gpr_mu_lock(&kick_state->mu); - if (kick_state->kicked) { - kick_state->kicked = 0; - gpr_mu_unlock(&kick_state->mu); - return NULL; - } - fd_info = allocate_wfd(); - fd_info->next = &kick_state->fd_list; - fd_info->prev = fd_info->next->prev; - fd_info->next->prev = fd_info->prev->next = fd_info; - gpr_mu_unlock(&kick_state->mu); - return fd_info; -} - -void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state, - grpc_kick_fd_info *fd_info) { - grpc_wakeup_fd_consume_wakeup(&fd_info->wakeup_fd); -} - -void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state, - grpc_kick_fd_info *fd_info) { - gpr_mu_lock(&kick_state->mu); - fd_info->next->prev = fd_info->prev; - fd_info->prev->next = fd_info->next; - free_wfd(fd_info); - gpr_mu_unlock(&kick_state->mu); -} - -void grpc_pollset_kick_kick(grpc_pollset_kick_state *kick_state) { - gpr_mu_lock(&kick_state->mu); - if (kick_state->fd_list.next != &kick_state->fd_list) { - grpc_wakeup_fd_wakeup(&kick_state->fd_list.next->wakeup_fd); - } else { - kick_state->kicked = 1; - } - gpr_mu_unlock(&kick_state->mu); -} - -void grpc_pollset_kick_global_init_fallback_fd(void) { - gpr_mu_init(&fd_freelist_mu); - grpc_wakeup_fd_global_init_force_fallback(); -} - -void grpc_pollset_kick_global_init(void) { - gpr_mu_init(&fd_freelist_mu); - grpc_wakeup_fd_global_init(); -} - -void grpc_pollset_kick_global_destroy(void) { - while (fd_freelist != NULL) { - grpc_kick_fd_info *current = fd_freelist; - fd_freelist = fd_freelist->next; - destroy_wfd(current); - } - grpc_wakeup_fd_global_destroy(); - gpr_mu_destroy(&fd_freelist_mu); -} - -#endif /* GPR_POSIX_SOCKET */ diff --git a/src/core/iomgr/pollset_kick_posix.h b/src/core/iomgr/pollset_kick_posix.h deleted file mode 100644 index 77e32a8d51..0000000000 --- a/src/core/iomgr/pollset_kick_posix.h +++ /dev/null @@ -1,93 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef GRPC_INTERNAL_CORE_IOMGR_POLLSET_KICK_POSIX_H -#define GRPC_INTERNAL_CORE_IOMGR_POLLSET_KICK_POSIX_H - -#include "src/core/iomgr/wakeup_fd_posix.h" -#include <grpc/support/sync.h> - -/* pollset kicking allows breaking a thread out of polling work for - a given pollset. - writing a byte to a pipe is used as a posix-ly portable base - mechanism, and eventfds are utilized on Linux for better performance. */ - -typedef struct grpc_kick_fd_info { - grpc_wakeup_fd_info wakeup_fd; - /* used for polling list and free list */ - struct grpc_kick_fd_info *next; - /* only used when polling */ - struct grpc_kick_fd_info *prev; -} grpc_kick_fd_info; - -typedef struct grpc_pollset_kick_state { - gpr_mu mu; - int kicked; - struct grpc_kick_fd_info fd_list; -} grpc_pollset_kick_state; - -#define GRPC_POLLSET_KICK_GET_FD(kick_fd_info) \ - GRPC_WAKEUP_FD_GET_READ_FD(&(kick_fd_info)->wakeup_fd) - -/* This is an abstraction around the typical pipe mechanism for waking up a - thread sitting in a poll() style call. */ - -void grpc_pollset_kick_global_init(void); -void grpc_pollset_kick_global_destroy(void); - -void grpc_pollset_kick_init(grpc_pollset_kick_state *kick_state); -void grpc_pollset_kick_destroy(grpc_pollset_kick_state *kick_state); - -/* Guarantees a pure posix implementation rather than a specialized one, if - * applicable. Intended for testing. */ -void grpc_pollset_kick_global_init_fallback_fd(void); - -/* Must be called before entering poll(). If return value is NULL, this consumed - an existing kick. Otherwise the return value is an FD to add to the poll set. - */ -grpc_kick_fd_info *grpc_pollset_kick_pre_poll( - grpc_pollset_kick_state *kick_state); - -/* Consume an existing kick. Must be called after poll returns that the fd was - readable, and before calling kick_post_poll. */ -void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state, - grpc_kick_fd_info *fd_info); - -/* Must be called after pre_poll, and after consume if applicable */ -void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state, - grpc_kick_fd_info *fd_info); - -/* Actually kick */ -void grpc_pollset_kick_kick(grpc_pollset_kick_state *kick_state); - -#endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_KICK_POSIX_H */ diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index d697b59e4c..1320c64579 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -36,6 +36,7 @@ #ifdef GPR_LINUX_MULTIPOLL_WITH_EPOLL #include <errno.h> +#include <poll.h> #include <string.h> #include <sys/epoll.h> #include <unistd.h> @@ -44,23 +45,28 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +typedef struct wakeup_fd_hdl { + grpc_wakeup_fd wakeup_fd; + struct wakeup_fd_hdl *next; +} wakeup_fd_hdl; + +typedef struct { + grpc_pollset *pollset; + grpc_fd *fd; + grpc_iomgr_closure closure; +} delayed_add; + typedef struct { int epoll_fd; - grpc_wakeup_fd_info wakeup_fd; + wakeup_fd_hdl *free_wakeup_fds; } pollset_hdr; -static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset, - grpc_fd *fd, - int and_unlock_pollset) { +static void finally_add_fd(grpc_pollset *pollset, grpc_fd *fd) { pollset_hdr *h = pollset->data.ptr; struct epoll_event ev; int err; grpc_fd_watcher watcher; - if (and_unlock_pollset) { - gpr_mu_unlock(&pollset->mu); - } - /* We pretend to be polling whilst adding an fd to keep the fd from being closed during the add. This may result in a spurious wakeup being assigned to this pollset whilst adding, but that should be benign. */ @@ -80,6 +86,52 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset, grpc_fd_end_poll(&watcher, 0, 0); } +static void perform_delayed_add(void *arg, int iomgr_status) { + delayed_add *da = arg; + int do_shutdown_cb = 0; + + if (!grpc_fd_is_orphaned(da->fd)) { + finally_add_fd(da->pollset, da->fd); + } + + gpr_mu_lock(&da->pollset->mu); + da->pollset->in_flight_cbs--; + if (da->pollset->shutting_down) { + /* We don't care about this pollset anymore. */ + if (da->pollset->in_flight_cbs == 0 && !da->pollset->called_shutdown) { + GPR_ASSERT(!grpc_pollset_has_workers(da->pollset)); + da->pollset->called_shutdown = 1; + do_shutdown_cb = 1; + } + } + gpr_mu_unlock(&da->pollset->mu); + + GRPC_FD_UNREF(da->fd, "delayed_add"); + + if (do_shutdown_cb) { + da->pollset->shutdown_done_cb(da->pollset->shutdown_done_arg); + } + + gpr_free(da); +} + +static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset, + grpc_fd *fd, + int and_unlock_pollset) { + if (and_unlock_pollset) { + gpr_mu_unlock(&pollset->mu); + finally_add_fd(pollset, fd); + } else { + delayed_add *da = gpr_malloc(sizeof(*da)); + da->pollset = pollset; + da->fd = fd; + GRPC_FD_REF(fd, "delayed_add"); + grpc_iomgr_closure_init(&da->closure, perform_delayed_add, da); + pollset->in_flight_cbs++; + grpc_iomgr_add_callback(&da->closure); + } +} + static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd, int and_unlock_pollset) { @@ -103,12 +155,14 @@ static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset, #define GRPC_EPOLL_MAX_EVENTS 1000 static void multipoll_with_epoll_pollset_maybe_work( - grpc_pollset *pollset, gpr_timespec deadline, gpr_timespec now, - int allow_synchronous_callback) { + grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline, + gpr_timespec now, int allow_synchronous_callback) { struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS]; int ep_rv; + int poll_rv; pollset_hdr *h = pollset->data.ptr; int timeout_ms; + struct pollfd pfds[2]; /* If you want to ignore epoll's ability to sanely handle parallel pollers, * for a more apples-to-apples performance comparison with poll, add a @@ -116,43 +170,58 @@ static void multipoll_with_epoll_pollset_maybe_work( * here. */ - pollset->counter += 1; gpr_mu_unlock(&pollset->mu); timeout_ms = grpc_poll_deadline_to_millis_timeout(deadline, now); - do { - ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, timeout_ms); - if (ep_rv < 0) { - if (errno != EINTR) { - gpr_log(GPR_ERROR, "epoll_wait() failed: %s", strerror(errno)); - } - } else { - int i; - for (i = 0; i < ep_rv; ++i) { - if (ep_ev[i].data.ptr == 0) { - grpc_wakeup_fd_consume_wakeup(&h->wakeup_fd); - } else { - grpc_fd *fd = ep_ev[i].data.ptr; - /* TODO(klempner): We might want to consider making err and pri - * separate events */ - int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP); - int read = ep_ev[i].events & (EPOLLIN | EPOLLPRI); - int write = ep_ev[i].events & EPOLLOUT; - if (read || cancel) { - grpc_fd_become_readable(fd, allow_synchronous_callback); + pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd); + pfds[0].events = POLLIN; + pfds[0].revents = 0; + pfds[1].fd = h->epoll_fd; + pfds[1].events = POLLIN; + pfds[1].revents = 0; + + poll_rv = poll(pfds, 2, timeout_ms); + + if (poll_rv < 0) { + if (errno != EINTR) { + gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); + } + } else if (poll_rv == 0) { + /* do nothing */ + } else { + if (pfds[0].revents) { + grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd); + } + if (pfds[1].revents) { + do { + ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0); + if (ep_rv < 0) { + if (errno != EINTR) { + gpr_log(GPR_ERROR, "epoll_wait() failed: %s", strerror(errno)); } - if (write || cancel) { - grpc_fd_become_writable(fd, allow_synchronous_callback); + } else { + int i; + for (i = 0; i < ep_rv; ++i) { + grpc_fd *fd = ep_ev[i].data.ptr; + /* TODO(klempner): We might want to consider making err and pri + * separate events */ + int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP); + int read = ep_ev[i].events & (EPOLLIN | EPOLLPRI); + int write = ep_ev[i].events & EPOLLOUT; + if (read || cancel) { + grpc_fd_become_readable(fd, allow_synchronous_callback); + } + if (write || cancel) { + grpc_fd_become_writable(fd, allow_synchronous_callback); + } } } - } + } while (ep_rv == GRPC_EPOLL_MAX_EVENTS); } - timeout_ms = 0; - } while (ep_rv == GRPC_EPOLL_MAX_EVENTS); + } gpr_mu_lock(&pollset->mu); - pollset->counter -= 1; } static void multipoll_with_epoll_pollset_finish_shutdown( @@ -160,21 +229,14 @@ static void multipoll_with_epoll_pollset_finish_shutdown( static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) { pollset_hdr *h = pollset->data.ptr; - grpc_wakeup_fd_destroy(&h->wakeup_fd); close(h->epoll_fd); gpr_free(h); } -static void epoll_kick(grpc_pollset *pollset) { - pollset_hdr *h = pollset->data.ptr; - grpc_wakeup_fd_wakeup(&h->wakeup_fd); -} - static const grpc_pollset_vtable multipoll_with_epoll_pollset = { multipoll_with_epoll_pollset_add_fd, multipoll_with_epoll_pollset_del_fd, multipoll_with_epoll_pollset_maybe_work, - epoll_kick, multipoll_with_epoll_pollset_finish_shutdown, multipoll_with_epoll_pollset_destroy}; @@ -182,8 +244,6 @@ static void epoll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds, size_t nfds) { size_t i; pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr)); - struct epoll_event ev; - int err; pollset->vtable = &multipoll_with_epoll_pollset; pollset->data.ptr = h; @@ -196,16 +256,6 @@ static void epoll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds, for (i = 0; i < nfds; i++) { multipoll_with_epoll_pollset_add_fd(pollset, fds[i], 0); } - - grpc_wakeup_fd_create(&h->wakeup_fd); - ev.events = EPOLLIN; - ev.data.ptr = 0; - err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, - GRPC_WAKEUP_FD_GET_READ_FD(&h->wakeup_fd), &ev); - if (err < 0) { - gpr_log(GPR_ERROR, "Wakeup fd epoll_ctl failed: %s", strerror(errno)); - abort(); - } } grpc_platform_become_multipoller_type grpc_platform_become_multipoller = diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index 0084e83953..b5b2d7534d 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -53,12 +53,6 @@ typedef struct { size_t fd_count; size_t fd_capacity; grpc_fd **fds; - /* fds being polled by the current poller: parallel arrays of pollfd, and - a grpc_fd_watcher */ - size_t pfd_count; - size_t pfd_capacity; - grpc_fd_watcher *watchers; - struct pollfd *pfds; /* fds that have been removed from the pollset explicitly */ size_t del_count; size_t del_capacity; @@ -102,80 +96,60 @@ static void multipoll_with_poll_pollset_del_fd(grpc_pollset *pollset, } } -static void end_polling(grpc_pollset *pollset) { - size_t i; - pollset_hdr *h; - h = pollset->data.ptr; - for (i = 1; i < h->pfd_count; i++) { - grpc_fd_end_poll(&h->watchers[i], h->pfds[i].revents & POLLIN, - h->pfds[i].revents & POLLOUT); - } -} - static void multipoll_with_poll_pollset_maybe_work( - grpc_pollset *pollset, gpr_timespec deadline, gpr_timespec now, - int allow_synchronous_callback) { + grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline, + gpr_timespec now, int allow_synchronous_callback) { int timeout; int r; - size_t i, np, nf, nd; + size_t i, j, pfd_count, fd_count; pollset_hdr *h; - grpc_kick_fd_info *kfd; + /* TODO(ctiller): inline some elements to avoid an allocation */ + grpc_fd_watcher *watchers; + struct pollfd *pfds; h = pollset->data.ptr; timeout = grpc_poll_deadline_to_millis_timeout(deadline, now); - if (h->pfd_capacity < h->fd_count + 1) { - h->pfd_capacity = GPR_MAX(h->pfd_capacity * 3 / 2, h->fd_count + 1); - gpr_free(h->pfds); - gpr_free(h->watchers); - h->pfds = gpr_malloc(sizeof(struct pollfd) * h->pfd_capacity); - h->watchers = gpr_malloc(sizeof(grpc_fd_watcher) * h->pfd_capacity); - } - nf = 0; - np = 1; - kfd = grpc_pollset_kick_pre_poll(&pollset->kick_state); - if (kfd == NULL) { - /* Already kicked */ - return; - } - h->pfds[0].fd = GRPC_POLLSET_KICK_GET_FD(kfd); - h->pfds[0].events = POLLIN; - h->pfds[0].revents = POLLOUT; + /* TODO(ctiller): perform just one malloc here if we exceed the inline case */ + pfds = gpr_malloc(sizeof(*pfds) * (h->fd_count + 1)); + watchers = gpr_malloc(sizeof(*watchers) * (h->fd_count + 1)); + fd_count = 0; + pfd_count = 1; + pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd); + pfds[0].events = POLLIN; + pfds[0].revents = POLLOUT; for (i = 0; i < h->fd_count; i++) { int remove = grpc_fd_is_orphaned(h->fds[i]); - for (nd = 0; nd < h->del_count; nd++) { - if (h->fds[i] == h->dels[nd]) remove = 1; + for (j = 0; !remove && j < h->del_count; j++) { + if (h->fds[i] == h->dels[j]) remove = 1; } if (remove) { GRPC_FD_UNREF(h->fds[i], "multipoller"); } else { - h->fds[nf++] = h->fds[i]; - h->watchers[np].fd = h->fds[i]; - h->pfds[np].fd = h->fds[i]->fd; - h->pfds[np].revents = 0; - np++; + h->fds[fd_count++] = h->fds[i]; + watchers[pfd_count].fd = h->fds[i]; + pfds[pfd_count].fd = h->fds[i]->fd; + pfds[pfd_count].revents = 0; + pfd_count++; } } - h->pfd_count = np; - h->fd_count = nf; - for (nd = 0; nd < h->del_count; nd++) { - GRPC_FD_UNREF(h->dels[nd], "multipoller_del"); + for (j = 0; j < h->del_count; j++) { + GRPC_FD_UNREF(h->dels[j], "multipoller_del"); } h->del_count = 0; - if (h->pfd_count == 0) { - end_polling(pollset); - return; - } - pollset->counter++; + h->fd_count = fd_count; gpr_mu_unlock(&pollset->mu); - for (i = 1; i < np; i++) { - h->pfds[i].events = grpc_fd_begin_poll(h->watchers[i].fd, pollset, POLLIN, - POLLOUT, &h->watchers[i]); + for (i = 1; i < pfd_count; i++) { + pfds[i].events = grpc_fd_begin_poll(watchers[i].fd, pollset, POLLIN, + POLLOUT, &watchers[i]); } - r = poll(h->pfds, h->pfd_count, timeout); + r = poll(pfds, pfd_count, timeout); - end_polling(pollset); + for (i = 1; i < pfd_count; i++) { + grpc_fd_end_poll(&watchers[i], pfds[i].revents & POLLIN, + pfds[i].revents & POLLOUT); + } if (r < 0) { if (errno != EINTR) { @@ -184,35 +158,31 @@ static void multipoll_with_poll_pollset_maybe_work( } else if (r == 0) { /* do nothing */ } else { - if (h->pfds[0].revents & POLLIN) { - grpc_pollset_kick_consume(&pollset->kick_state, kfd); + if (pfds[0].revents & POLLIN) { + grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd); } - for (i = 1; i < np; i++) { - if (h->watchers[i].fd == NULL) { + for (i = 1; i < pfd_count; i++) { + if (watchers[i].fd == NULL) { continue; } - if (h->pfds[i].revents & (POLLIN | POLLHUP | POLLERR)) { - grpc_fd_become_readable(h->watchers[i].fd, allow_synchronous_callback); + if (pfds[i].revents & (POLLIN | POLLHUP | POLLERR)) { + grpc_fd_become_readable(watchers[i].fd, allow_synchronous_callback); } - if (h->pfds[i].revents & (POLLOUT | POLLHUP | POLLERR)) { - grpc_fd_become_writable(h->watchers[i].fd, allow_synchronous_callback); + if (pfds[i].revents & (POLLOUT | POLLHUP | POLLERR)) { + grpc_fd_become_writable(watchers[i].fd, allow_synchronous_callback); } } } - grpc_pollset_kick_post_poll(&pollset->kick_state, kfd); - gpr_mu_lock(&pollset->mu); - pollset->counter--; -} + gpr_free(pfds); + gpr_free(watchers); -static void multipoll_with_poll_pollset_kick(grpc_pollset *p) { - grpc_pollset_force_kick(p); + gpr_mu_lock(&pollset->mu); } static void multipoll_with_poll_pollset_finish_shutdown(grpc_pollset *pollset) { size_t i; pollset_hdr *h = pollset->data.ptr; - GPR_ASSERT(pollset->counter == 0); for (i = 0; i < h->fd_count; i++) { GRPC_FD_UNREF(h->fds[i], "multipoller"); } @@ -226,8 +196,6 @@ static void multipoll_with_poll_pollset_finish_shutdown(grpc_pollset *pollset) { static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) { pollset_hdr *h = pollset->data.ptr; multipoll_with_poll_pollset_finish_shutdown(pollset); - gpr_free(h->pfds); - gpr_free(h->watchers); gpr_free(h->fds); gpr_free(h->dels); gpr_free(h); @@ -237,7 +205,6 @@ static const grpc_pollset_vtable multipoll_with_poll_pollset = { multipoll_with_poll_pollset_add_fd, multipoll_with_poll_pollset_del_fd, multipoll_with_poll_pollset_maybe_work, - multipoll_with_poll_pollset_kick, multipoll_with_poll_pollset_finish_shutdown, multipoll_with_poll_pollset_destroy}; @@ -250,10 +217,6 @@ void grpc_poll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds, h->fd_count = nfds; h->fd_capacity = nfds; h->fds = gpr_malloc(nfds * sizeof(grpc_fd *)); - h->pfd_count = 0; - h->pfd_capacity = 0; - h->pfds = NULL; - h->watchers = NULL; h->del_count = 0; h->del_capacity = 0; h->dels = NULL; diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index c8646af615..d3a9193af1 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -55,22 +55,60 @@ #include <grpc/support/useful.h> GPR_TLS_DECL(g_current_thread_poller); +GPR_TLS_DECL(g_current_thread_worker); -void grpc_pollset_kick(grpc_pollset *p) { - if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p && p->counter) { - p->vtable->kick(p); - } +static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) { + worker->prev->next = worker->next; + worker->next->prev = worker->prev; +} + +int grpc_pollset_has_workers(grpc_pollset *p) { + return p->root_worker.next != &p->root_worker; } -void grpc_pollset_force_kick(grpc_pollset *p) { - if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) { - grpc_pollset_kick_kick(&p->kick_state); +static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) { + if (grpc_pollset_has_workers(p)) { + grpc_pollset_worker *w = p->root_worker.next; + remove_worker(p, w); + return w; + } else { + return NULL; } } -static void kick_using_pollset_kick(grpc_pollset *p) { - if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) { - grpc_pollset_kick_kick(&p->kick_state); +static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) { + worker->next = &p->root_worker; + worker->prev = worker->next->prev; + worker->prev->next = worker->next->prev = worker; +} + +static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { + worker->prev = &p->root_worker; + worker->next = worker->prev->next; + worker->prev->next = worker->next->prev = worker; +} + +void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { + if (specific_worker != NULL) { + if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) { + for (specific_worker = p->root_worker.next; + specific_worker != &p->root_worker; + specific_worker = specific_worker->next) { + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd); + } + p->kicked_without_pollers = 1; + } else if (gpr_tls_get(&g_current_thread_worker) != + (gpr_intptr)specific_worker) { + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd); + } + } else if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) { + specific_worker = pop_front_worker(p); + if (specific_worker != NULL) { + push_back_worker(p, specific_worker); + grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd); + } else { + p->kicked_without_pollers = 1; + } } } @@ -78,16 +116,12 @@ static void kick_using_pollset_kick(grpc_pollset *p) { void grpc_pollset_global_init(void) { gpr_tls_init(&g_current_thread_poller); - - /* Initialize kick fd state */ - grpc_pollset_kick_global_init(); + grpc_wakeup_fd_global_init(); } void grpc_pollset_global_shutdown(void) { - /* destroy the kick pipes */ - grpc_pollset_kick_global_destroy(); - gpr_tls_destroy(&g_current_thread_poller); + grpc_wakeup_fd_global_destroy(); } /* main interface */ @@ -96,7 +130,7 @@ static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null); void grpc_pollset_init(grpc_pollset *pollset) { gpr_mu_init(&pollset->mu); - grpc_pollset_kick_init(&pollset->kick_state); + pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker; pollset->in_flight_cbs = 0; pollset->shutting_down = 0; pollset->called_shutdown = 0; @@ -134,27 +168,44 @@ static void finish_shutdown(grpc_pollset *pollset) { pollset->shutdown_done_cb(pollset->shutdown_done_arg); } -int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) { +int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, + gpr_timespec deadline) { /* pollset->mu already held */ gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + int added_worker = 0; if (gpr_time_cmp(now, deadline) > 0) { return 0; } + /* this must happen before we (potentially) drop pollset->mu */ + worker->next = worker->prev = NULL; + /* TODO(ctiller): pool these */ + grpc_wakeup_fd_init(&worker->wakeup_fd); if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1)) { - return 1; + goto done; } if (grpc_alarm_check(&pollset->mu, now, &deadline)) { - return 1; + goto done; } if (pollset->shutting_down) { - return 1; + goto done; + } + if (!pollset->kicked_without_pollers) { + push_front_worker(pollset, worker); + added_worker = 1; + gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset); + pollset->vtable->maybe_work(pollset, worker, deadline, now, 1); + gpr_tls_set(&g_current_thread_poller, 0); + } else { + pollset->kicked_without_pollers = 0; + } +done: + grpc_wakeup_fd_destroy(&worker->wakeup_fd); + if (added_worker) { + remove_worker(pollset, worker); } - gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset); - pollset->vtable->maybe_work(pollset, deadline, now, 1); - gpr_tls_set(&g_current_thread_poller, 0); if (pollset->shutting_down) { - if (pollset->counter > 0) { - grpc_pollset_kick(pollset); + if (grpc_pollset_has_workers(pollset)) { + grpc_pollset_kick(pollset, NULL); } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) { pollset->called_shutdown = 1; gpr_mu_unlock(&pollset->mu); @@ -177,15 +228,13 @@ void grpc_pollset_shutdown(grpc_pollset *pollset, GPR_ASSERT(!pollset->shutting_down); pollset->shutting_down = 1; if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 && - pollset->counter == 0) { + !grpc_pollset_has_workers(pollset)) { pollset->called_shutdown = 1; call_shutdown = 1; } pollset->shutdown_done_cb = shutdown_done; pollset->shutdown_done_arg = shutdown_done_arg; - if (pollset->counter > 0) { - grpc_pollset_kick(pollset); - } + grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); gpr_mu_unlock(&pollset->mu); if (call_shutdown) { @@ -196,8 +245,8 @@ void grpc_pollset_shutdown(grpc_pollset *pollset, void grpc_pollset_destroy(grpc_pollset *pollset) { GPR_ASSERT(pollset->shutting_down); GPR_ASSERT(pollset->in_flight_cbs == 0); + GPR_ASSERT(!grpc_pollset_has_workers(pollset)); pollset->vtable->destroy(pollset); - grpc_pollset_kick_destroy(&pollset->kick_state); gpr_mu_destroy(&pollset->mu); } @@ -248,8 +297,8 @@ static void basic_do_promote(void *args, int success) { gpr_mu_lock(&pollset->mu); /* First we need to ensure that nobody is polling concurrently */ - if (pollset->counter != 0) { - grpc_pollset_kick(pollset); + if (grpc_pollset_has_workers(pollset)) { + grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); grpc_iomgr_add_callback(&up_args->promotion_closure); gpr_mu_unlock(&pollset->mu); return; @@ -264,7 +313,8 @@ static void basic_do_promote(void *args, int success) { pollset->in_flight_cbs--; if (pollset->shutting_down) { /* We don't care about this pollset anymore. */ - if (pollset->in_flight_cbs == 0 && pollset->counter == 0 && !pollset->called_shutdown) { + if (pollset->in_flight_cbs == 0 && !pollset->called_shutdown) { + GPR_ASSERT(!grpc_pollset_has_workers(pollset)); pollset->called_shutdown = 1; do_shutdown_cb = 1; } @@ -307,7 +357,7 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd, GPR_ASSERT(fd); if (fd == pollset->data.ptr) goto exit; - if (!pollset->counter) { + if (!grpc_pollset_has_workers(pollset)) { /* Fast path -- no in flight cbs */ /* TODO(klempner): Comment this out and fix any test failures or establish * they are due to timing issues */ @@ -343,7 +393,7 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd, up_args->promotion_closure.cb_arg = up_args; grpc_iomgr_add_callback(&up_args->promotion_closure); - grpc_pollset_kick(pollset); + grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); exit: if (and_unlock_pollset) { @@ -365,12 +415,12 @@ static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd, } static void basic_pollset_maybe_work(grpc_pollset *pollset, + grpc_pollset_worker *worker, gpr_timespec deadline, gpr_timespec now, int allow_synchronous_callback) { struct pollfd pfd[2]; grpc_fd *fd; grpc_fd_watcher fd_watcher; - grpc_kick_fd_info *kfd; int timeout; int r; int nfds; @@ -387,16 +437,10 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset, fd = pollset->data.ptr = NULL; } timeout = grpc_poll_deadline_to_millis_timeout(deadline, now); - kfd = grpc_pollset_kick_pre_poll(&pollset->kick_state); - if (kfd == NULL) { - /* Already kicked */ - return; - } - pfd[0].fd = GRPC_POLLSET_KICK_GET_FD(kfd); + pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd); pfd[0].events = POLLIN; pfd[0].revents = 0; nfds = 1; - pollset->counter++; if (fd) { pfd[1].fd = fd->fd; pfd[1].revents = 0; @@ -428,7 +472,7 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset, /* do nothing */ } else { if (pfd[0].revents & POLLIN) { - grpc_pollset_kick_consume(&pollset->kick_state, kfd); + grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd); } if (nfds > 1) { if (pfd[1].revents & (POLLIN | POLLHUP | POLLERR)) { @@ -440,14 +484,10 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset, } } - grpc_pollset_kick_post_poll(&pollset->kick_state, kfd); - gpr_mu_lock(&pollset->mu); - pollset->counter--; } static void basic_pollset_destroy(grpc_pollset *pollset) { - GPR_ASSERT(pollset->counter == 0); if (pollset->data.ptr != NULL) { GRPC_FD_UNREF(pollset->data.ptr, "basicpoll"); pollset->data.ptr = NULL; @@ -455,14 +495,13 @@ static void basic_pollset_destroy(grpc_pollset *pollset) { } static const grpc_pollset_vtable basic_pollset = { - basic_pollset_add_fd, basic_pollset_del_fd, basic_pollset_maybe_work, - kick_using_pollset_kick, basic_pollset_destroy, basic_pollset_destroy}; + basic_pollset_add_fd, basic_pollset_del_fd, basic_pollset_maybe_work, + basic_pollset_destroy, basic_pollset_destroy}; static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) { pollset->vtable = &basic_pollset; - pollset->counter = 0; pollset->data.ptr = fd_or_null; - if (fd_or_null) { + if (fd_or_null != NULL) { GRPC_FD_REF(fd_or_null, "basicpoll"); } } diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index 37de1276d1..1c1b736193 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -35,8 +35,7 @@ #define GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H #include <grpc/support/sync.h> - -#include "src/core/iomgr/pollset_kick_posix.h" +#include "src/core/iomgr/wakeup_fd_posix.h" typedef struct grpc_pollset_vtable grpc_pollset_vtable; @@ -45,6 +44,12 @@ typedef struct grpc_pollset_vtable grpc_pollset_vtable; use the struct tag */ struct grpc_fd; +typedef struct grpc_pollset_worker { + grpc_wakeup_fd wakeup_fd; + struct grpc_pollset_worker *next; + struct grpc_pollset_worker *prev; +} grpc_pollset_worker; + typedef struct grpc_pollset { /* pollsets under posix can mutate representation as fds are added and removed. @@ -52,11 +57,11 @@ typedef struct grpc_pollset { few fds, and an epoll() based implementation for many fds */ const grpc_pollset_vtable *vtable; gpr_mu mu; - grpc_pollset_kick_state kick_state; - int counter; + grpc_pollset_worker root_worker; int in_flight_cbs; int shutting_down; int called_shutdown; + int kicked_without_pollers; void (*shutdown_done_cb)(void *arg); void *shutdown_done_arg; union { @@ -70,9 +75,9 @@ struct grpc_pollset_vtable { int and_unlock_pollset); void (*del_fd)(grpc_pollset *pollset, struct grpc_fd *fd, int and_unlock_pollset); - void (*maybe_work)(grpc_pollset *pollset, gpr_timespec deadline, - gpr_timespec now, int allow_synchronous_callback); - void (*kick)(grpc_pollset *pollset); + void (*maybe_work)(grpc_pollset *pollset, grpc_pollset_worker *worker, + gpr_timespec deadline, gpr_timespec now, + int allow_synchronous_callback); void (*finish_shutdown)(grpc_pollset *pollset); void (*destroy)(grpc_pollset *pollset); }; @@ -85,22 +90,16 @@ void grpc_pollset_add_fd(grpc_pollset *pollset, struct grpc_fd *fd); poll after an fd is orphaned) */ void grpc_pollset_del_fd(grpc_pollset *pollset, struct grpc_fd *fd); -/* Force any current pollers to break polling: it's the callers responsibility - to ensure that the pollset indeed needs to be kicked - no verification that - the pollset is actually performing polling work is done. At worst this will - result in spurious wakeups if performed at the wrong moment. - Does not touch pollset->mu. */ -void grpc_pollset_force_kick(grpc_pollset *pollset); /* Returns the fd to listen on for kicks */ int grpc_kick_read_fd(grpc_pollset *p); /* Call after polling has been kicked to leave the kicked state */ void grpc_kick_drain(grpc_pollset *p); /* Convert a timespec to milliseconds: - - very small or negative poll times are clamped to zero to do a + - very small or negative poll times are clamped to zero to do a non-blocking poll (which becomes spin polling) - other small values are rounded up to one millisecond - - longer than a millisecond polls are rounded up to the next nearest + - longer than a millisecond polls are rounded up to the next nearest millisecond to avoid spinning - infinite timeouts are converted to -1 */ int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, gpr_timespec now); @@ -114,4 +113,8 @@ extern grpc_platform_become_multipoller_type grpc_platform_become_multipoller; void grpc_poll_become_multipoller(grpc_pollset *pollset, struct grpc_fd **fds, size_t fd_count); +/* Return 1 if the pollset has active threads in grpc_pollset_work (pollset must + * be locked) */ +int grpc_pollset_has_workers(grpc_pollset *pollset); + #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */ diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c index 5ff7df1dcd..2076ac70ef 100644 --- a/src/core/iomgr/pollset_set_posix.c +++ b/src/core/iomgr/pollset_set_posix.c @@ -60,7 +60,7 @@ void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) { void grpc_pollset_set_add_pollset(grpc_pollset_set *pollset_set, grpc_pollset *pollset) { - size_t i; + size_t i, j; gpr_mu_lock(&pollset_set->mu); if (pollset_set->pollset_count == pollset_set->pollset_capacity) { pollset_set->pollset_capacity = @@ -70,9 +70,15 @@ void grpc_pollset_set_add_pollset(grpc_pollset_set *pollset_set, sizeof(*pollset_set->pollsets)); } pollset_set->pollsets[pollset_set->pollset_count++] = pollset; - for (i = 0; i < pollset_set->fd_count; i++) { - grpc_pollset_add_fd(pollset, pollset_set->fds[i]); + for (i = 0, j = 0; i < pollset_set->fd_count; i++) { + if (grpc_fd_is_orphaned(pollset_set->fds[i])) { + GRPC_FD_UNREF(pollset_set->fds[i], "pollset"); + } else { + grpc_pollset_add_fd(pollset, pollset_set->fds[i]); + pollset_set->fds[j++] = pollset_set->fds[i]; + } } + pollset_set->fd_count = j; gpr_mu_unlock(&pollset_set->mu); } diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index a9c4739c7c..22dc5891c3 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -42,6 +42,38 @@ #include "src/core/iomgr/pollset.h" #include "src/core/iomgr/pollset_windows.h" +static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) { + worker->prev->next = worker->next; + worker->next->prev = worker->prev; +} + +static int has_workers(grpc_pollset *p) { + return p->root_worker.next != &p->root_worker; +} + +static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) { + if (has_workers(p)) { + grpc_pollset_worker *w = p->root_worker.next; + remove_worker(p, w); + return w; + } + else { + return NULL; + } +} + +static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) { + worker->next = &p->root_worker; + worker->prev = worker->next->prev; + worker->prev->next = worker->next->prev = worker; +} + +static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { + worker->prev = &p->root_worker; + worker->next = worker->prev->next; + worker->prev->next = worker->next->prev = worker; +} + /* There isn't really any such thing as a pollset under Windows, due to the nature of the IO completion ports. We're still going to provide a minimal set of features for the sake of the rest of grpc. But grpc_pollset_work @@ -50,7 +82,8 @@ void grpc_pollset_init(grpc_pollset *pollset) { memset(pollset, 0, sizeof(*pollset)); gpr_mu_init(&pollset->mu); - gpr_cv_init(&pollset->cv); + pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker; + pollset->kicked_without_pollers = 0; } void grpc_pollset_shutdown(grpc_pollset *pollset, @@ -58,34 +91,66 @@ void grpc_pollset_shutdown(grpc_pollset *pollset, void *shutdown_done_arg) { gpr_mu_lock(&pollset->mu); pollset->shutting_down = 1; - gpr_cv_broadcast(&pollset->cv); + grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); gpr_mu_unlock(&pollset->mu); shutdown_done(shutdown_done_arg); } void grpc_pollset_destroy(grpc_pollset *pollset) { gpr_mu_destroy(&pollset->mu); - gpr_cv_destroy(&pollset->cv); } -int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) { +int grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline) { gpr_timespec now; + int added_worker = 0; now = gpr_now(GPR_CLOCK_MONOTONIC); if (gpr_time_cmp(now, deadline) > 0) { return 0 /* GPR_FALSE */; } + worker->next = worker->prev = NULL; + gpr_cv_init(&worker->cv); if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1 /* GPR_TRUE */)) { - return 1 /* GPR_TRUE */; + goto done; } if (grpc_alarm_check(&pollset->mu, now, &deadline)) { - return 1 /* GPR_TRUE */; + goto done; } - if (!pollset->shutting_down) { - gpr_cv_wait(&pollset->cv, &pollset->mu, deadline); + if (!pollset->kicked_without_pollers && !pollset->shutting_down) { + push_front_worker(pollset, worker); + added_worker = 1; + gpr_cv_wait(&worker->cv, &pollset->mu, deadline); + } else { + pollset->kicked_without_pollers = 0; + } +done: + gpr_cv_destroy(&worker->cv); + if (added_worker) { + remove_worker(pollset, worker); } return 1 /* GPR_TRUE */; } -void grpc_pollset_kick(grpc_pollset *p) { gpr_cv_signal(&p->cv); } +void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { + if (specific_worker != NULL) { + if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) { + for (specific_worker = p->root_worker.next; + specific_worker != &p->root_worker; + specific_worker = specific_worker->next) { + gpr_cv_signal(&specific_worker->cv); + } + p->kicked_without_pollers = 1; + } else { + gpr_cv_signal(&specific_worker->cv); + } + } else { + specific_worker = pop_front_worker(p); + if (specific_worker != NULL) { + push_back_worker(p, specific_worker); + gpr_cv_signal(&specific_worker->cv); + } else { + p->kicked_without_pollers = 1; + } + } +} #endif /* GPR_WINSOCK_SOCKET */ diff --git a/src/core/iomgr/pollset_windows.h b/src/core/iomgr/pollset_windows.h index c9b8d3f374..4efa5a1717 100644 --- a/src/core/iomgr/pollset_windows.h +++ b/src/core/iomgr/pollset_windows.h @@ -40,12 +40,20 @@ /* There isn't really any such thing as a pollset under Windows, due to the nature of the IO completion ports. A Windows "pollset" is merely a mutex - and a condition variable, used to synchronize with the IOCP. */ + used to synchronize with the IOCP, and workers are condition variables + used to block threads until work is ready. */ + +typedef struct grpc_pollset_worker { + gpr_cv cv; + struct grpc_pollset_worker *next; + struct grpc_pollset_worker *prev; +} grpc_pollset_worker; typedef struct grpc_pollset { gpr_mu mu; - gpr_cv cv; int shutting_down; + int kicked_without_pollers; + grpc_pollset_worker root_worker; } grpc_pollset; #define GRPC_POLLSET_MU(pollset) (&(pollset)->mu) diff --git a/src/core/iomgr/sockaddr_utils.c b/src/core/iomgr/sockaddr_utils.c index e91b94f8c8..65ec1f94ac 100644 --- a/src/core/iomgr/sockaddr_utils.c +++ b/src/core/iomgr/sockaddr_utils.c @@ -36,12 +36,18 @@ #include <errno.h> #include <string.h> -#include "src/core/support/string.h" +#ifdef GPR_POSIX_SOCKET +#include <sys/un.h> +#endif + +#include <grpc/support/alloc.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> #include <grpc/support/port_platform.h> #include <grpc/support/string_util.h> +#include "src/core/support/string.h" + static const gpr_uint8 kV4MappedPrefix[] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff}; @@ -161,6 +167,36 @@ int grpc_sockaddr_to_string(char **out, const struct sockaddr *addr, return ret; } +char *grpc_sockaddr_to_uri(const struct sockaddr *addr) { + char *temp; + char *result; + struct sockaddr_in addr_normalized; + + if (grpc_sockaddr_is_v4mapped(addr, &addr_normalized)) { + addr = (const struct sockaddr *)&addr_normalized; + } + + switch (addr->sa_family) { + case AF_INET: + grpc_sockaddr_to_string(&temp, addr, 0); + gpr_asprintf(&result, "ipv4:%s", temp); + gpr_free(temp); + return result; + case AF_INET6: + grpc_sockaddr_to_string(&temp, addr, 0); + gpr_asprintf(&result, "ipv6:%s", temp); + gpr_free(temp); + return result; +#ifdef GPR_POSIX_SOCKET + case AF_UNIX: + gpr_asprintf(&result, "unix:%s", ((struct sockaddr_un *)addr)->sun_path); + return result; +#endif + } + + return NULL; +} + int grpc_sockaddr_get_port(const struct sockaddr *addr) { switch (addr->sa_family) { case AF_INET: diff --git a/src/core/iomgr/sockaddr_utils.h b/src/core/iomgr/sockaddr_utils.h index bdfb83479b..99f1ed54da 100644 --- a/src/core/iomgr/sockaddr_utils.h +++ b/src/core/iomgr/sockaddr_utils.h @@ -84,4 +84,6 @@ int grpc_sockaddr_set_port(const struct sockaddr *addr, int port); int grpc_sockaddr_to_string(char **out, const struct sockaddr *addr, int normalize); +char *grpc_sockaddr_to_uri(const struct sockaddr *addr); + #endif /* GRPC_INTERNAL_CORE_IOMGR_SOCKADDR_UTILS_H */ diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index 41d8b169e0..9572ce5980 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -64,6 +64,7 @@ typedef struct { int refs; grpc_iomgr_closure write_closure; grpc_pollset_set *interested_parties; + char *addr_str; } async_connect; static int prepare_socket(const struct sockaddr *addr, int fd) { @@ -88,17 +89,18 @@ error: return 0; } -static void on_alarm(void *acp, int success) { +static void tc_on_alarm(void *acp, int success) { int done; async_connect *ac = acp; gpr_mu_lock(&ac->mu); - if (ac->fd != NULL && success) { + if (ac->fd != NULL) { grpc_fd_shutdown(ac->fd); } done = (--ac->refs == 0); gpr_mu_unlock(&ac->mu); if (done) { gpr_mu_destroy(&ac->mu); + gpr_free(ac->addr_str); gpr_free(ac); } } @@ -108,11 +110,17 @@ static void on_writable(void *acp, int success) { int so_error = 0; socklen_t so_error_size; int err; - int fd = ac->fd->fd; int done; grpc_endpoint *ep = NULL; void (*cb)(void *arg, grpc_endpoint *tcp) = ac->cb; void *cb_arg = ac->cb_arg; + grpc_fd *fd; + + gpr_mu_lock(&ac->mu); + GPR_ASSERT(ac->fd); + fd = ac->fd; + ac->fd = NULL; + gpr_mu_unlock(&ac->mu); grpc_alarm_cancel(&ac->alarm); @@ -120,7 +128,7 @@ static void on_writable(void *acp, int success) { if (success) { do { so_error_size = sizeof(so_error); - err = getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &so_error_size); + err = getsockopt(fd->fd, SOL_SOCKET, SO_ERROR, &so_error, &so_error_size); } while (err < 0 && errno == EINTR); if (err < 0) { gpr_log(GPR_ERROR, "getsockopt(ERROR): %s", strerror(errno)); @@ -143,7 +151,7 @@ static void on_writable(void *acp, int success) { don't do that! */ gpr_log(GPR_ERROR, "kernel out of buffers"); gpr_mu_unlock(&ac->mu); - grpc_fd_notify_on_write(ac->fd, &ac->write_closure); + grpc_fd_notify_on_write(fd, &ac->write_closure); return; } else { switch (so_error) { @@ -157,8 +165,9 @@ static void on_writable(void *acp, int success) { goto finish; } } else { - grpc_pollset_set_del_fd(ac->interested_parties, ac->fd); - ep = grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE); + grpc_pollset_set_del_fd(ac->interested_parties, fd); + ep = grpc_tcp_create(fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, ac->addr_str); + fd = NULL; goto finish; } } else { @@ -169,16 +178,16 @@ static void on_writable(void *acp, int success) { abort(); finish: - if (ep == NULL) { - grpc_pollset_set_del_fd(ac->interested_parties, ac->fd); - grpc_fd_orphan(ac->fd, NULL, "tcp_client_orphan"); - } else { - ac->fd = NULL; + if (fd != NULL) { + grpc_pollset_set_del_fd(ac->interested_parties, fd); + grpc_fd_orphan(fd, NULL, "tcp_client_orphan"); + fd = NULL; } done = (--ac->refs == 0); gpr_mu_unlock(&ac->mu); if (done) { gpr_mu_destroy(&ac->mu); + gpr_free(ac->addr_str); gpr_free(ac); } cb(cb_arg, ep); @@ -223,13 +232,13 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), err = connect(fd, addr, addr_len); } while (err < 0 && errno == EINTR); - grpc_sockaddr_to_string(&addr_str, addr, 1); + addr_str = grpc_sockaddr_to_uri(addr); gpr_asprintf(&name, "tcp-client:%s", addr_str); fdobj = grpc_fd_create(fd, name); if (err >= 0) { - cb(arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); + cb(arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str)); goto done; } @@ -247,6 +256,8 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), ac->cb_arg = arg; ac->fd = fdobj; ac->interested_parties = interested_parties; + ac->addr_str = addr_str; + addr_str = NULL; gpr_mu_init(&ac->mu); ac->refs = 2; ac->write_closure.cb = on_writable; @@ -254,7 +265,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), gpr_mu_lock(&ac->mu); grpc_alarm_init(&ac->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - on_alarm, ac, gpr_now(GPR_CLOCK_MONOTONIC)); + tc_on_alarm, ac, gpr_now(GPR_CLOCK_MONOTONIC)); grpc_fd_notify_on_write(ac->fd, &ac->write_closure); gpr_mu_unlock(&ac->mu); diff --git a/src/core/iomgr/tcp_client_windows.c b/src/core/iomgr/tcp_client_windows.c index 39fd43130b..79a58fe2af 100644 --- a/src/core/iomgr/tcp_client_windows.c +++ b/src/core/iomgr/tcp_client_windows.c @@ -58,6 +58,7 @@ typedef struct { grpc_winsocket *socket; gpr_timespec deadline; grpc_alarm alarm; + char *addr_name; int refs; int aborted; } async_connect; @@ -67,6 +68,7 @@ static void async_connect_cleanup(async_connect *ac) { gpr_mu_unlock(&ac->mu); if (done) { gpr_mu_destroy(&ac->mu); + gpr_free(ac->addr_name); gpr_free(ac); } } @@ -107,7 +109,7 @@ static void on_connect(void *acp, int from_iocp) { gpr_log(GPR_ERROR, "on_connect error: %s", utf8_message); gpr_free(utf8_message); } else if (!aborted) { - ep = grpc_tcp_create(ac->socket); + ep = grpc_tcp_create(ac->socket, ac->addr_name); } } else { gpr_log(GPR_ERROR, "on_connect is shutting down"); @@ -213,6 +215,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp), ac->socket = socket; gpr_mu_init(&ac->mu); ac->refs = 2; + ac->addr_name = grpc_sockaddr_to_uri(addr); ac->aborted = 0; grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index b6d6efc9fb..24fee0596f 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -44,15 +44,17 @@ #include <sys/socket.h> #include <unistd.h> -#include "src/core/support/string.h" -#include "src/core/debug/trace.h" -#include "src/core/profiling/timers.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/slice.h> +#include <grpc/support/string_util.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> +#include "src/core/support/string.h" +#include "src/core/debug/trace.h" +#include "src/core/profiling/timers.h" + #ifdef GPR_HAVE_MSG_NOSIGNAL #define SENDMSG_FLAGS MSG_NOSIGNAL #else @@ -282,6 +284,8 @@ typedef struct { grpc_iomgr_closure write_closure; grpc_iomgr_closure handle_read_closure; + + char *peer_string; } grpc_tcp; static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success); @@ -296,6 +300,7 @@ static void grpc_tcp_unref(grpc_tcp *tcp) { int refcount_zero = gpr_unref(&tcp->refcount); if (refcount_zero) { grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan"); + gpr_free(tcp->peer_string); gpr_free(tcp); } } @@ -314,7 +319,7 @@ static void call_read_cb(grpc_tcp *tcp, gpr_slice *slices, size_t nslices, gpr_log(GPR_DEBUG, "read: status=%d", status); for (i = 0; i < nslices; i++) { char *dump = gpr_dump_slice(slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); - gpr_log(GPR_DEBUG, "READ: %s", dump); + gpr_log(GPR_DEBUG, "READ %p: %s", tcp, dump); gpr_free(dump); } } @@ -443,7 +448,7 @@ static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure); } else { tcp->handle_read_closure.cb_arg = tcp; - grpc_iomgr_add_callback(&tcp->handle_read_closure); + grpc_iomgr_add_delayed_callback(&tcp->handle_read_closure, 1); } } @@ -567,13 +572,27 @@ static void grpc_tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { grpc_pollset_add_fd(pollset, tcp->em_fd); } +static void grpc_tcp_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pollset_set) { + grpc_tcp *tcp = (grpc_tcp *)ep; + grpc_pollset_set_add_fd(pollset_set, tcp->em_fd); +} + +static char *grpc_tcp_get_peer(grpc_endpoint *ep) { + grpc_tcp *tcp = (grpc_tcp *)ep; + return gpr_strdup(tcp->peer_string); +} + static const grpc_endpoint_vtable vtable = { - grpc_tcp_notify_on_read, grpc_tcp_write, grpc_tcp_add_to_pollset, - grpc_tcp_shutdown, grpc_tcp_destroy}; + grpc_tcp_notify_on_read, grpc_tcp_write, + grpc_tcp_add_to_pollset, grpc_tcp_add_to_pollset_set, + grpc_tcp_shutdown, grpc_tcp_destroy, + grpc_tcp_get_peer}; -grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) { +grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size, + const char *peer_string) { grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); tcp->base.vtable = &vtable; + tcp->peer_string = gpr_strdup(peer_string); tcp->fd = em_fd->fd; tcp->read_cb = NULL; tcp->write_cb = NULL; diff --git a/src/core/iomgr/tcp_posix.h b/src/core/iomgr/tcp_posix.h index 44279d5a26..d752feaeea 100644 --- a/src/core/iomgr/tcp_posix.h +++ b/src/core/iomgr/tcp_posix.h @@ -53,6 +53,7 @@ extern int grpc_tcp_trace; /* Create a tcp endpoint given a file desciptor and a read slice size. Takes ownership of fd. */ -grpc_endpoint *grpc_tcp_create(grpc_fd *fd, size_t read_slice_size); +grpc_endpoint *grpc_tcp_create(grpc_fd *fd, size_t read_slice_size, + const char *peer_string); #endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_POSIX_H */ diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 5854031c9b..6399aaadb9 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -142,6 +142,7 @@ grpc_tcp_server *grpc_tcp_server_create(void) { static void finish_shutdown(grpc_tcp_server *s) { s->shutdown_complete(s->shutdown_complete_arg); + s->shutdown_complete = NULL; gpr_mu_destroy(&s->mu); @@ -157,6 +158,7 @@ static void destroyed_port(void *server, int success) { gpr_mu_unlock(&s->mu); finish_shutdown(s); } else { + GPR_ASSERT(s->destroyed_ports < s->nports); gpr_mu_unlock(&s->mu); } } @@ -332,7 +334,7 @@ static void on_read(void *arg, int success) { grpc_set_socket_no_sigpipe_if_possible(fd); - grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1); + addr_str = grpc_sockaddr_to_uri((struct sockaddr *)&addr); gpr_asprintf(&name, "tcp-server-connection:%s", addr_str); fdobj = grpc_fd_create(fd, name); @@ -342,8 +344,9 @@ static void on_read(void *arg, int success) { for (i = 0; i < sp->server->pollset_count; i++) { grpc_pollset_add_fd(sp->server->pollsets[i], fdobj); } - sp->server->cb(sp->server->cb_arg, - grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); + sp->server->cb( + sp->server->cb_arg, + grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str)); gpr_free(name); gpr_free(addr_str); diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index 187009b2c8..0adbe9507c 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -79,6 +79,8 @@ struct grpc_tcp_server { /* active port count: how many ports are actually still listening */ int active_ports; + /* number of iomgr callbacks that have been explicitly scheduled during shutdown */ + int iomgr_callbacks_pending; /* all listening ports */ server_port *ports; @@ -93,6 +95,7 @@ grpc_tcp_server *grpc_tcp_server_create(void) { gpr_mu_init(&s->mu); gpr_cv_init(&s->cv); s->active_ports = 0; + s->iomgr_callbacks_pending = 0; s->cb = NULL; s->cb_arg = NULL; s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); @@ -112,10 +115,10 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s, for (i = 0; i < s->nports; i++) { server_port *sp = &s->ports[i]; sp->shutting_down = 1; - grpc_winsocket_shutdown(sp->socket); + s->iomgr_callbacks_pending += grpc_winsocket_shutdown(sp->socket); } /* This happens asynchronously. Wait while that happens. */ - while (s->active_ports) { + while (s->active_ports || s->iomgr_callbacks_pending) { gpr_cv_wait(&s->cv, &s->mu, gpr_inf_future(GPR_CLOCK_REALTIME)); } gpr_mu_unlock(&s->mu); @@ -183,6 +186,17 @@ error: return -1; } +static void decrement_active_ports_and_notify(server_port *sp) { + sp->shutting_down = 0; + sp->socket->read_info.outstanding = 0; + gpr_mu_lock(&sp->server->mu); + GPR_ASSERT(sp->server->active_ports > 0); + if (0 == --sp->server->active_ports) { + gpr_cv_broadcast(&sp->server->cv); + } + gpr_mu_unlock(&sp->server->mu); +} + /* start_accept will reference that for the IOCP notification request. */ static void on_accept(void *arg, int from_iocp); @@ -231,6 +245,15 @@ static void start_accept(server_port *port) { return; failure: + if (port->shutting_down) { + /* We are abandoning the listener port, take that into account to prevent + occasional hangs on shutdown. The hang happens when sp->shutting_down + change is not seen by on_accept and we proceed to trying new accept, + but we fail there because the listening port has been closed in the + meantime. */ + decrement_active_ports_and_notify(port); + return; + } utf8_message = gpr_format_message(WSAGetLastError()); gpr_log(GPR_ERROR, message, utf8_message); gpr_free(utf8_message); @@ -243,14 +266,27 @@ static void on_accept(void *arg, int from_iocp) { SOCKET sock = sp->new_socket; grpc_winsocket_callback_info *info = &sp->socket->read_info; grpc_endpoint *ep = NULL; + struct sockaddr_storage peer_name; + char *peer_name_string; + char *fd_name; + int peer_name_len = sizeof(peer_name); DWORD transfered_bytes; DWORD flags; BOOL wsa_success; + int err; /* The general mechanism for shutting down is to queue abortion calls. While this is necessary in the read/write case, it's useless for the accept - case. Let's do nothing. */ - if (!from_iocp) return; + case. We only need to adjust the pending callback count */ + if (!from_iocp) { + gpr_mu_lock(&sp->server->mu); + GPR_ASSERT(sp->server->iomgr_callbacks_pending > 0); + if (0 == --sp->server->iomgr_callbacks_pending) { + gpr_cv_broadcast(&sp->server->cv); + } + gpr_mu_unlock(&sp->server->mu); + return; + } /* The IOCP notified us of a completed operation. Let's grab the results, and act accordingly. */ @@ -259,15 +295,9 @@ static void on_accept(void *arg, int from_iocp) { &transfered_bytes, FALSE, &flags); if (!wsa_success) { if (sp->shutting_down) { - /* During the shutdown case, we ARE expecting an error. So that's swell, + /* During the shutdown case, we ARE expecting an error. So that's well, and we can wake up the shutdown thread. */ - sp->shutting_down = 0; - sp->socket->read_info.outstanding = 0; - gpr_mu_lock(&sp->server->mu); - if (0 == --sp->server->active_ports) { - gpr_cv_broadcast(&sp->server->cv); - } - gpr_mu_unlock(&sp->server->mu); + decrement_active_ports_and_notify(sp); return; } else { char *utf8_message = gpr_format_message(WSAGetLastError()); @@ -277,8 +307,28 @@ static void on_accept(void *arg, int from_iocp) { } } else { if (!sp->shutting_down) { - /* TODO(ctiller): add sockaddr address to label */ - ep = grpc_tcp_create(grpc_winsocket_create(sock, "server")); + peer_name_string = NULL; + err = setsockopt(sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, + (char *)&sp->socket->socket, + sizeof(sp->socket->socket)); + if (err) { + char *utf8_message = gpr_format_message(WSAGetLastError()); + gpr_log(GPR_ERROR, "setsockopt error: %s", utf8_message); + gpr_free(utf8_message); + } + err = getpeername(sock, (struct sockaddr*)&peer_name, &peer_name_len); + if (!err) { + peer_name_string = grpc_sockaddr_to_uri((struct sockaddr*)&peer_name); + } else { + char *utf8_message = gpr_format_message(WSAGetLastError()); + gpr_log(GPR_ERROR, "getpeername error: %s", utf8_message); + gpr_free(utf8_message); + } + gpr_asprintf(&fd_name, "tcp_server:%s", peer_name_string); + ep = grpc_tcp_create(grpc_winsocket_create(sock, fd_name), + peer_name_string); + gpr_free(fd_name); + gpr_free(peer_name_string); } } diff --git a/src/core/iomgr/tcp_windows.c b/src/core/iomgr/tcp_windows.c index 1bf81a73e0..89aa741470 100644 --- a/src/core/iomgr/tcp_windows.c +++ b/src/core/iomgr/tcp_windows.c @@ -96,6 +96,8 @@ typedef struct grpc_tcp { to protect ourselves when requesting a shutdown. */ gpr_mu mu; int shutting_down; + + char *peer_string; } grpc_tcp; static void tcp_ref(grpc_tcp *tcp) { @@ -107,6 +109,7 @@ static void tcp_unref(grpc_tcp *tcp) { gpr_slice_buffer_destroy(&tcp->write_slices); grpc_winsocket_orphan(tcp->socket); gpr_mu_destroy(&tcp->mu); + gpr_free(tcp->peer_string); gpr_free(tcp); } } @@ -365,8 +368,17 @@ static grpc_endpoint_write_status win_write(grpc_endpoint *ep, return GRPC_ENDPOINT_WRITE_PENDING; } -static void win_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { - grpc_tcp *tcp = (grpc_tcp *) ep; +static void win_add_to_pollset(grpc_endpoint *ep, grpc_pollset *ps) { + grpc_tcp *tcp; + (void) ps; + tcp = (grpc_tcp *) ep; + grpc_iocp_add_socket(tcp->socket); +} + +static void win_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pss) { + grpc_tcp *tcp; + (void) pss; + tcp = (grpc_tcp *) ep; grpc_iocp_add_socket(tcp->socket); } @@ -393,11 +405,17 @@ static void win_destroy(grpc_endpoint *ep) { tcp_unref(tcp); } -static grpc_endpoint_vtable vtable = { - win_notify_on_read, win_write, win_add_to_pollset, win_shutdown, win_destroy -}; +static char *win_get_peer(grpc_endpoint *ep) { + grpc_tcp *tcp = (grpc_tcp *)ep; + return gpr_strdup(tcp->peer_string); +} + +static grpc_endpoint_vtable vtable = {win_notify_on_read, win_write, + win_add_to_pollset, win_add_to_pollset_set, + win_shutdown, win_destroy, + win_get_peer}; -grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket) { +grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) { grpc_tcp *tcp = (grpc_tcp *) gpr_malloc(sizeof(grpc_tcp)); memset(tcp, 0, sizeof(grpc_tcp)); tcp->base.vtable = &vtable; @@ -405,6 +423,7 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket) { gpr_mu_init(&tcp->mu); gpr_slice_buffer_init(&tcp->write_slices); gpr_ref_init(&tcp->refcount, 1); + tcp->peer_string = gpr_strdup(peer_string); return &tcp->base; } diff --git a/src/core/iomgr/tcp_windows.h b/src/core/iomgr/tcp_windows.h index 4cbc12c53a..7e301db250 100644 --- a/src/core/iomgr/tcp_windows.h +++ b/src/core/iomgr/tcp_windows.h @@ -50,7 +50,7 @@ /* Create a tcp endpoint given a winsock handle. * Takes ownership of the handle. */ -grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket); +grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string); int grpc_tcp_prepare_socket(SOCKET sock); diff --git a/src/core/iomgr/wakeup_fd_eventfd.c b/src/core/iomgr/wakeup_fd_eventfd.c index 99c32bb9db..52912235f8 100644 --- a/src/core/iomgr/wakeup_fd_eventfd.c +++ b/src/core/iomgr/wakeup_fd_eventfd.c @@ -42,7 +42,7 @@ #include "src/core/iomgr/wakeup_fd_posix.h" #include <grpc/support/log.h> -static void eventfd_create(grpc_wakeup_fd_info *fd_info) { +static void eventfd_create(grpc_wakeup_fd *fd_info) { int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); /* TODO(klempner): Handle failure more gracefully */ GPR_ASSERT(efd >= 0); @@ -50,7 +50,7 @@ static void eventfd_create(grpc_wakeup_fd_info *fd_info) { fd_info->write_fd = -1; } -static void eventfd_consume(grpc_wakeup_fd_info *fd_info) { +static void eventfd_consume(grpc_wakeup_fd *fd_info) { eventfd_t value; int err; do { @@ -58,14 +58,14 @@ static void eventfd_consume(grpc_wakeup_fd_info *fd_info) { } while (err < 0 && errno == EINTR); } -static void eventfd_wakeup(grpc_wakeup_fd_info *fd_info) { +static void eventfd_wakeup(grpc_wakeup_fd *fd_info) { int err; do { err = eventfd_write(fd_info->read_fd, 1); } while (err < 0 && errno == EINTR); } -static void eventfd_destroy(grpc_wakeup_fd_info *fd_info) { +static void eventfd_destroy(grpc_wakeup_fd *fd_info) { close(fd_info->read_fd); } diff --git a/src/core/iomgr/wakeup_fd_pipe.c b/src/core/iomgr/wakeup_fd_pipe.c index f895478990..9fc4ee2388 100644 --- a/src/core/iomgr/wakeup_fd_pipe.c +++ b/src/core/iomgr/wakeup_fd_pipe.c @@ -44,7 +44,7 @@ #include "src/core/iomgr/socket_utils_posix.h" #include <grpc/support/log.h> -static void pipe_create(grpc_wakeup_fd_info *fd_info) { +static void pipe_init(grpc_wakeup_fd *fd_info) { int pipefd[2]; /* TODO(klempner): Make this nonfatal */ GPR_ASSERT(0 == pipe(pipefd)); @@ -54,7 +54,7 @@ static void pipe_create(grpc_wakeup_fd_info *fd_info) { fd_info->write_fd = pipefd[1]; } -static void pipe_consume(grpc_wakeup_fd_info *fd_info) { +static void pipe_consume(grpc_wakeup_fd *fd_info) { char buf[128]; int r; @@ -74,13 +74,13 @@ static void pipe_consume(grpc_wakeup_fd_info *fd_info) { } } -static void pipe_wakeup(grpc_wakeup_fd_info *fd_info) { +static void pipe_wakeup(grpc_wakeup_fd *fd_info) { char c = 0; while (write(fd_info->write_fd, &c, 1) != 1 && errno == EINTR) ; } -static void pipe_destroy(grpc_wakeup_fd_info *fd_info) { +static void pipe_destroy(grpc_wakeup_fd *fd_info) { close(fd_info->read_fd); close(fd_info->write_fd); } @@ -91,7 +91,7 @@ static int pipe_check_availability(void) { } const grpc_wakeup_fd_vtable grpc_pipe_wakeup_fd_vtable = { - pipe_create, pipe_consume, pipe_wakeup, pipe_destroy, pipe_check_availability -}; + pipe_init, pipe_consume, pipe_wakeup, pipe_destroy, + pipe_check_availability}; #endif /* GPR_POSIX_WAKUP_FD */ diff --git a/src/core/iomgr/wakeup_fd_posix.c b/src/core/iomgr/wakeup_fd_posix.c index d3cc3ec570..e48f5223fa 100644 --- a/src/core/iomgr/wakeup_fd_posix.c +++ b/src/core/iomgr/wakeup_fd_posix.c @@ -57,19 +57,19 @@ void grpc_wakeup_fd_global_destroy(void) { wakeup_fd_vtable = NULL; } -void grpc_wakeup_fd_create(grpc_wakeup_fd_info *fd_info) { - wakeup_fd_vtable->create(fd_info); +void grpc_wakeup_fd_init(grpc_wakeup_fd *fd_info) { + wakeup_fd_vtable->init(fd_info); } -void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd_info *fd_info) { +void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd *fd_info) { wakeup_fd_vtable->consume(fd_info); } -void grpc_wakeup_fd_wakeup(grpc_wakeup_fd_info *fd_info) { +void grpc_wakeup_fd_wakeup(grpc_wakeup_fd *fd_info) { wakeup_fd_vtable->wakeup(fd_info); } -void grpc_wakeup_fd_destroy(grpc_wakeup_fd_info *fd_info) { +void grpc_wakeup_fd_destroy(grpc_wakeup_fd *fd_info) { wakeup_fd_vtable->destroy(fd_info); } diff --git a/src/core/iomgr/wakeup_fd_posix.h b/src/core/iomgr/wakeup_fd_posix.h index 1b0ff70c7f..a4da4df51f 100644 --- a/src/core/iomgr/wakeup_fd_posix.h +++ b/src/core/iomgr/wakeup_fd_posix.h @@ -69,28 +69,28 @@ void grpc_wakeup_fd_global_destroy(void); * purposes only.*/ void grpc_wakeup_fd_global_init_force_fallback(void); -typedef struct grpc_wakeup_fd_info grpc_wakeup_fd_info; +typedef struct grpc_wakeup_fd grpc_wakeup_fd; typedef struct grpc_wakeup_fd_vtable { - void (*create)(grpc_wakeup_fd_info *fd_info); - void (*consume)(grpc_wakeup_fd_info *fd_info); - void (*wakeup)(grpc_wakeup_fd_info *fd_info); - void (*destroy)(grpc_wakeup_fd_info *fd_info); + void (*init)(grpc_wakeup_fd *fd_info); + void (*consume)(grpc_wakeup_fd *fd_info); + void (*wakeup)(grpc_wakeup_fd *fd_info); + void (*destroy)(grpc_wakeup_fd *fd_info); /* Must be called before calling any other functions */ int (*check_availability)(void); } grpc_wakeup_fd_vtable; -struct grpc_wakeup_fd_info { +struct grpc_wakeup_fd { int read_fd; int write_fd; }; #define GRPC_WAKEUP_FD_GET_READ_FD(fd_info) ((fd_info)->read_fd) -void grpc_wakeup_fd_create(grpc_wakeup_fd_info *fd_info); -void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd_info *fd_info); -void grpc_wakeup_fd_wakeup(grpc_wakeup_fd_info *fd_info); -void grpc_wakeup_fd_destroy(grpc_wakeup_fd_info *fd_info); +void grpc_wakeup_fd_init(grpc_wakeup_fd *fd_info); +void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd *fd_info); +void grpc_wakeup_fd_wakeup(grpc_wakeup_fd *fd_info); +void grpc_wakeup_fd_destroy(grpc_wakeup_fd *fd_info); /* Defined in some specialized implementation's .c file, or by * wakeup_fd_nospecial.c if no such implementation exists. */ |