diff options
Diffstat (limited to 'src/core/iomgr')
-rw-r--r-- | src/core/iomgr/pollset_kick.h | 4 | ||||
-rw-r--r-- | src/core/iomgr/pollset_kick_eventfd.c | 85 | ||||
-rw-r--r-- | src/core/iomgr/pollset_kick_eventfd.h | 42 | ||||
-rw-r--r-- | src/core/iomgr/pollset_kick_posix.c | 172 | ||||
-rw-r--r-- | src/core/iomgr/pollset_kick_posix.h | 15 |
5 files changed, 245 insertions, 73 deletions
diff --git a/src/core/iomgr/pollset_kick.h b/src/core/iomgr/pollset_kick.h index f088818b9a..4459a31b4f 100644 --- a/src/core/iomgr/pollset_kick.h +++ b/src/core/iomgr/pollset_kick.h @@ -48,6 +48,10 @@ void grpc_pollset_kick_global_init(void); void grpc_pollset_kick_global_destroy(void); +/* Guarantees a pure posix implementation rather than a specialized one, if + * applicable. Intended for testing. */ +void grpc_pollset_kick_global_init_posix(void); + void grpc_pollset_kick_init(grpc_pollset_kick_state *kick_state); void grpc_pollset_kick_destroy(grpc_pollset_kick_state *kick_state); diff --git a/src/core/iomgr/pollset_kick_eventfd.c b/src/core/iomgr/pollset_kick_eventfd.c new file mode 100644 index 0000000000..301ebad875 --- /dev/null +++ b/src/core/iomgr/pollset_kick_eventfd.c @@ -0,0 +1,85 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/iomgr/pollset_kick_eventfd.h" + +#ifdef GPR_LINUX_EVENTFD +#include <errno.h> +#include <sys/eventfd.h> +#include <unistd.h> + +#include <grpc/support/port_platform.h> +#include <grpc/support/log.h> + +static void eventfd_create(grpc_kick_fd_info *fd_info) { + int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); + /* TODO(klempner): Handle failure more gracefully */ + GPR_ASSERT(efd >= 0); + fd_info->read_fd = efd; + fd_info->write_fd = -1; +} + +static void eventfd_consume(grpc_kick_fd_info *fd_info) { + eventfd_t value; + int err; + do { + err = eventfd_read(fd_info->read_fd, &value); + } while (err < 0 && errno == EINTR); +} + +static void eventfd_kick(grpc_kick_fd_info *fd_info) { + int err; + do { + err = eventfd_write(fd_info->read_fd, 1); + } while (err < 0 && errno == EINTR); +} + +static void eventfd_destroy(grpc_kick_fd_info *fd_info) { + close(fd_info->read_fd); +} + +static const grpc_pollset_kick_vtable eventfd_kick_vtable = { + eventfd_create, eventfd_consume, eventfd_kick, eventfd_destroy +}; + +const grpc_pollset_kick_vtable *grpc_pollset_kick_eventfd_init(void) { + /* TODO(klempner): Check that eventfd works */ + return &eventfd_kick_vtable; +} + +#else /* GPR_LINUX_EVENTFD not defined */ +const grpc_pollset_kick_vtable *grpc_pollset_kick_eventfd_init(void) { + return NULL; +} + +#endif /* GPR_LINUX_EVENTFD */ diff --git a/src/core/iomgr/pollset_kick_eventfd.h b/src/core/iomgr/pollset_kick_eventfd.h new file mode 100644 index 0000000000..f06f7f65ec --- /dev/null +++ b/src/core/iomgr/pollset_kick_eventfd.h @@ -0,0 +1,42 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_POLLSET_KICK_EVENTFD_H_ +#define __GRPC_INTERNAL_IOMGR_POLLSET_KICK_EVENTFD_H_ + +#include "src/core/iomgr/pollset_kick_posix.h" + +/* Tries to enable eventfd support, returns a kick vtable if successful. */ +const grpc_pollset_kick_vtable *grpc_pollset_kick_eventfd_init(void); + +#endif /* __GRPC_INTERNAL_IOMGR_POLLSET_KICK_EVENTFD_H_ */ diff --git a/src/core/iomgr/pollset_kick_posix.c b/src/core/iomgr/pollset_kick_posix.c index 9f85b6137a..4e3ee25dc7 100644 --- a/src/core/iomgr/pollset_kick_posix.c +++ b/src/core/iomgr/pollset_kick_posix.c @@ -37,6 +37,7 @@ #include <string.h> #include <unistd.h> +#include "src/core/iomgr/pollset_kick_eventfd.h" #include "src/core/iomgr/socket_utils_posix.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -46,82 +47,58 @@ #define GRPC_MAX_CACHED_PIPES 50 #define GRPC_PIPE_LOW_WATERMARK 25 -typedef struct grpc_kick_pipe_info { - int pipe_read_fd; - int pipe_write_fd; - struct grpc_kick_pipe_info *next; -} grpc_kick_pipe_info; - -static grpc_kick_pipe_info *pipe_freelist = NULL; -static int pipe_freelist_count = 0; -static gpr_mu pipe_freelist_mu; - -static grpc_kick_pipe_info *allocate_pipe(void) { - grpc_kick_pipe_info *info; - gpr_mu_lock(&pipe_freelist_mu); - if (pipe_freelist != NULL) { - info = pipe_freelist; - pipe_freelist = pipe_freelist->next; - --pipe_freelist_count; +static grpc_kick_fd_info *fd_freelist = NULL; +static int fd_freelist_count = 0; +static gpr_mu fd_freelist_mu; +static const grpc_pollset_kick_vtable *kick_vtable = NULL; + +static grpc_kick_fd_info *allocate_pipe(void) { + grpc_kick_fd_info *info; + gpr_mu_lock(&fd_freelist_mu); + if (fd_freelist != NULL) { + info = fd_freelist; + fd_freelist = fd_freelist->next; + --fd_freelist_count; } else { - int pipefd[2]; - /* TODO(klempner): Make this nonfatal */ - GPR_ASSERT(0 == pipe(pipefd)); - GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[0], 1)); - GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[1], 1)); info = gpr_malloc(sizeof(*info)); - info->pipe_read_fd = pipefd[0]; - info->pipe_write_fd = pipefd[1]; + kick_vtable->create(info); info->next = NULL; } - gpr_mu_unlock(&pipe_freelist_mu); + gpr_mu_unlock(&fd_freelist_mu); return info; } static void destroy_pipe(void) { - /* assumes pipe_freelist_mu is held */ - grpc_kick_pipe_info *current = pipe_freelist; - pipe_freelist = pipe_freelist->next; - pipe_freelist_count--; - close(current->pipe_read_fd); - close(current->pipe_write_fd); + /* assumes fd_freelist_mu is held */ + grpc_kick_fd_info *current = fd_freelist; + fd_freelist = fd_freelist->next; + fd_freelist_count--; + kick_vtable->destroy(current); gpr_free(current); } -static void free_pipe(grpc_kick_pipe_info *pipe_info) { - gpr_mu_lock(&pipe_freelist_mu); - pipe_info->next = pipe_freelist; - pipe_freelist = pipe_info; - pipe_freelist_count++; - if (pipe_freelist_count > GRPC_MAX_CACHED_PIPES) { - while (pipe_freelist_count > GRPC_PIPE_LOW_WATERMARK) { +static void free_pipe(grpc_kick_fd_info *fd_info) { + gpr_mu_lock(&fd_freelist_mu); + fd_info->next = fd_freelist; + fd_freelist = fd_info; + fd_freelist_count++; + if (fd_freelist_count > GRPC_MAX_CACHED_PIPES) { + while (fd_freelist_count > GRPC_PIPE_LOW_WATERMARK) { destroy_pipe(); } } - gpr_mu_unlock(&pipe_freelist_mu); -} - -void grpc_pollset_kick_global_init() { - pipe_freelist = NULL; - gpr_mu_init(&pipe_freelist_mu); -} - -void grpc_pollset_kick_global_destroy() { - while (pipe_freelist != NULL) { - destroy_pipe(); - } - gpr_mu_destroy(&pipe_freelist_mu); + gpr_mu_unlock(&fd_freelist_mu); } void grpc_pollset_kick_init(grpc_pollset_kick_state *kick_state) { gpr_mu_init(&kick_state->mu); kick_state->kicked = 0; - kick_state->pipe_info = NULL; + kick_state->fd_info = NULL; } void grpc_pollset_kick_destroy(grpc_pollset_kick_state *kick_state) { gpr_mu_destroy(&kick_state->mu); - GPR_ASSERT(kick_state->pipe_info == NULL); + GPR_ASSERT(kick_state->fd_info == NULL); } int grpc_pollset_kick_pre_poll(grpc_pollset_kick_state *kick_state) { @@ -131,17 +108,48 @@ int grpc_pollset_kick_pre_poll(grpc_pollset_kick_state *kick_state) { gpr_mu_unlock(&kick_state->mu); return -1; } - kick_state->pipe_info = allocate_pipe(); + kick_state->fd_info = allocate_pipe(); gpr_mu_unlock(&kick_state->mu); - return kick_state->pipe_info->pipe_read_fd; + return kick_state->fd_info->read_fd; } void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state) { + kick_vtable->consume(kick_state->fd_info); +} + +void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state) { + gpr_mu_lock(&kick_state->mu); + free_pipe(kick_state->fd_info); + kick_state->fd_info = NULL; + gpr_mu_unlock(&kick_state->mu); +} + +void grpc_pollset_kick_kick(grpc_pollset_kick_state *kick_state) { + gpr_mu_lock(&kick_state->mu); + if (kick_state->fd_info != NULL) { + kick_vtable->kick(kick_state->fd_info); + } else { + kick_state->kicked = 1; + } + gpr_mu_unlock(&kick_state->mu); +} + +static void pipe_create(grpc_kick_fd_info *fd_info) { + int pipefd[2]; + /* TODO(klempner): Make this nonfatal */ + GPR_ASSERT(0 == pipe(pipefd)); + GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[0], 1)); + GPR_ASSERT(grpc_set_socket_nonblocking(pipefd[1], 1)); + fd_info->read_fd = pipefd[0]; + fd_info->write_fd = pipefd[1]; +} + +static void pipe_consume(grpc_kick_fd_info *fd_info) { char buf[128]; int r; for (;;) { - r = read(kick_state->pipe_info->pipe_read_fd, buf, sizeof(buf)); + r = read(fd_info->read_fd, buf, sizeof(buf)); if (r > 0) continue; if (r == 0) return; switch (errno) { @@ -156,22 +164,44 @@ void grpc_pollset_kick_consume(grpc_pollset_kick_state *kick_state) { } } -void grpc_pollset_kick_post_poll(grpc_pollset_kick_state *kick_state) { - gpr_mu_lock(&kick_state->mu); - free_pipe(kick_state->pipe_info); - kick_state->pipe_info = NULL; - gpr_mu_unlock(&kick_state->mu); +static void pipe_kick(grpc_kick_fd_info *fd_info) { + char c = 0; + while (write(fd_info->write_fd, &c, 1) != 1 && errno == EINTR) + ; } -void grpc_pollset_kick_kick(grpc_pollset_kick_state *kick_state) { - gpr_mu_lock(&kick_state->mu); - if (kick_state->pipe_info != NULL) { - char c = 0; - while (write(kick_state->pipe_info->pipe_write_fd, &c, 1) != 1 && - errno == EINTR) - ; - } else { - kick_state->kicked = 1; +static void pipe_destroy(grpc_kick_fd_info *fd_info) { + close(fd_info->read_fd); + close(fd_info->write_fd); +} + +static const grpc_pollset_kick_vtable pipe_kick_vtable = { + pipe_create, pipe_consume, pipe_kick, pipe_destroy +}; + +static void global_init_common(void) { + fd_freelist = NULL; + gpr_mu_init(&fd_freelist_mu); +} + +void grpc_pollset_kick_global_init_posix(void) { + global_init_common(); + kick_vtable = &pipe_kick_vtable; +} + +void grpc_pollset_kick_global_init(void) { + global_init_common(); + kick_vtable = grpc_pollset_kick_eventfd_init(); + if (kick_vtable == NULL) { + kick_vtable = &pipe_kick_vtable; } - gpr_mu_unlock(&kick_state->mu); } + +void grpc_pollset_kick_global_destroy(void) { + while (fd_freelist != NULL) { + destroy_pipe(); + } + gpr_mu_destroy(&fd_freelist_mu); +} + + diff --git a/src/core/iomgr/pollset_kick_posix.h b/src/core/iomgr/pollset_kick_posix.h index bae3b5923a..5eb4998760 100644 --- a/src/core/iomgr/pollset_kick_posix.h +++ b/src/core/iomgr/pollset_kick_posix.h @@ -36,12 +36,23 @@ #include <grpc/support/sync.h> -struct grpc_kick_pipe_info; +typedef struct grpc_kick_fd_info { + int read_fd; + int write_fd; + struct grpc_kick_fd_info *next; +} grpc_kick_fd_info; + +typedef struct grpc_pollset_kick_vtable { + void (*create)(struct grpc_kick_fd_info *fd_info); + void (*consume)(struct grpc_kick_fd_info *fd_info); + void (*kick)(struct grpc_kick_fd_info *fd_info); + void (*destroy)(struct grpc_kick_fd_info *fd_info); +} grpc_pollset_kick_vtable; typedef struct grpc_pollset_kick_state { gpr_mu mu; int kicked; - struct grpc_kick_pipe_info *pipe_info; + struct grpc_kick_fd_info *fd_info; } grpc_pollset_kick_state; #endif /* __GRPC_INTERNAL_IOMGR_POLLSET_KICK_POSIX_H_ */ |