From 1a277ecd93e908a47a905d323a4a1a77287cede1 Mon Sep 17 00:00:00 2001 From: ctiller Date: Wed, 7 Jan 2015 14:03:30 -0800 Subject: Remove libevent. Fixed any exposed bugs across the stack. Add a poll() based implementation. Heavily leverages pollset infrastructure to allow small polls to be the norm. Exposes a mechanism to plug in epoll/kqueue for platforms where we have them. Simplify iomgr callbacks to return one bit of success or failure (instead of the multi valued result that was mostly unused previously). This will ease the burden on new implementations, and the previous system provided no real value anyway. Removed timeouts on endpoint read/write routines. This simplifies porting burden by providing a more orthogonal interface, and the functionality can always be replicated when desired by using an alarm combined with endpoint_shutdown. I'm fairly certain we ended up with this interface because it was convenient to do from libevent. Things that need attention still: - adding an fd to a pollset is O(n^2) - but this is probably ok given that we'll not use this for multipolling once platform specific implementations are added. - we rely on the backup poller too often - especially for SSL handshakes and for client connection establishment we should have a better mechanism ([] [] - Linux needs to use epoll for multiple fds, FreeBSD variants (including Darwin) need to use kqueue. ([] [] - Linux needs to use eventfd for poll kicking. 
([] Change on 2015/01/07 by ctiller ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=83461069 --- src/core/iomgr/alarm.c | 34 +- src/core/iomgr/alarm_internal.h | 5 +- src/core/iomgr/endpoint.c | 14 +- src/core/iomgr/endpoint.h | 17 +- src/core/iomgr/fd_posix.c | 274 +++++++++ src/core/iomgr/fd_posix.h | 138 +++++ src/core/iomgr/iomgr.c | 204 +++++++ src/core/iomgr/iomgr.h | 11 +- src/core/iomgr/iomgr_completion_queue_interface.h | 45 -- src/core/iomgr/iomgr_internal.h | 51 ++ src/core/iomgr/iomgr_libevent.c | 652 --------------------- src/core/iomgr/iomgr_libevent.h | 206 ------- src/core/iomgr/iomgr_libevent_use_threads.c | 56 -- src/core/iomgr/iomgr_posix.c | 38 ++ src/core/iomgr/iomgr_posix.h | 42 ++ src/core/iomgr/pollset.c | 37 -- src/core/iomgr/pollset.h | 21 +- .../iomgr/pollset_multipoller_with_poll_posix.c | 237 ++++++++ src/core/iomgr/pollset_posix.c | 340 +++++++++++ src/core/iomgr/pollset_posix.h | 95 +++ src/core/iomgr/resolve_address_posix.c | 6 +- src/core/iomgr/tcp_client_posix.c | 76 ++- src/core/iomgr/tcp_posix.c | 67 +-- src/core/iomgr/tcp_posix.h | 2 +- src/core/iomgr/tcp_server.h | 4 +- src/core/iomgr/tcp_server_posix.c | 38 +- 26 files changed, 1579 insertions(+), 1131 deletions(-) create mode 100644 src/core/iomgr/fd_posix.c create mode 100644 src/core/iomgr/fd_posix.h create mode 100644 src/core/iomgr/iomgr.c delete mode 100644 src/core/iomgr/iomgr_completion_queue_interface.h create mode 100644 src/core/iomgr/iomgr_internal.h delete mode 100644 src/core/iomgr/iomgr_libevent.c delete mode 100644 src/core/iomgr/iomgr_libevent.h delete mode 100644 src/core/iomgr/iomgr_libevent_use_threads.c create mode 100644 src/core/iomgr/iomgr_posix.c create mode 100644 src/core/iomgr/iomgr_posix.h delete mode 100644 src/core/iomgr/pollset.c create mode 100644 src/core/iomgr/pollset_multipoller_with_poll_posix.c create mode 100644 src/core/iomgr/pollset_posix.c create mode 100644 src/core/iomgr/pollset_posix.h (limited to 
'src/core/iomgr') diff --git a/src/core/iomgr/alarm.c b/src/core/iomgr/alarm.c index b7238f716a..2664879323 100644 --- a/src/core/iomgr/alarm.c +++ b/src/core/iomgr/alarm.c @@ -71,8 +71,8 @@ static shard_type g_shards[NUM_SHARDS]; /* Protected by g_mu */ static shard_type *g_shard_queue[NUM_SHARDS]; -static int run_some_expired_alarms(gpr_timespec now, - grpc_iomgr_cb_status status); +static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now, + gpr_timespec *next, int success); static gpr_timespec compute_min_deadline(shard_type *shard) { return grpc_alarm_heap_is_empty(&shard->heap) @@ -102,7 +102,7 @@ void grpc_alarm_list_init(gpr_timespec now) { void grpc_alarm_list_shutdown() { int i; - while (run_some_expired_alarms(gpr_inf_future, GRPC_CALLBACK_CANCELLED)) + while (run_some_expired_alarms(NULL, gpr_inf_future, NULL, 0)) ; for (i = 0; i < NUM_SHARDS; i++) { shard_type *shard = &g_shards[i]; @@ -233,7 +233,7 @@ void grpc_alarm_cancel(grpc_alarm *alarm) { gpr_mu_unlock(&shard->mu); if (triggered) { - alarm->cb(alarm->cb_arg, GRPC_CALLBACK_CANCELLED); + alarm->cb(alarm->cb_arg, 0); } } @@ -299,8 +299,8 @@ static size_t pop_alarms(shard_type *shard, gpr_timespec now, return n; } -static int run_some_expired_alarms(gpr_timespec now, - grpc_iomgr_cb_status status) { +static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now, + gpr_timespec *next, int success) { size_t n = 0; size_t i; grpc_alarm *alarms[MAX_ALARMS_PER_CHECK]; @@ -329,19 +329,35 @@ static int run_some_expired_alarms(gpr_timespec now, note_deadline_change(g_shard_queue[0]); } + if (next) { + *next = gpr_time_min(*next, g_shard_queue[0]->min_deadline); + } + gpr_mu_unlock(&g_mu); gpr_mu_unlock(&g_checker_mu); + } else if (next) { + gpr_mu_lock(&g_mu); + *next = gpr_time_min(*next, g_shard_queue[0]->min_deadline); + gpr_mu_unlock(&g_mu); + } + + if (n && drop_mu) { + gpr_mu_unlock(drop_mu); } for (i = 0; i < n; i++) { - alarms[i]->cb(alarms[i]->cb_arg, status); + 
alarms[i]->cb(alarms[i]->cb_arg, success); + } + + if (n && drop_mu) { + gpr_mu_lock(drop_mu); } return n; } -int grpc_alarm_check(gpr_timespec now) { - return run_some_expired_alarms(now, GRPC_CALLBACK_SUCCESS); +int grpc_alarm_check(gpr_mu *drop_mu, gpr_timespec now, gpr_timespec *next) { + return run_some_expired_alarms(drop_mu, now, next, 1); } gpr_timespec grpc_alarm_list_next_timeout() { diff --git a/src/core/iomgr/alarm_internal.h b/src/core/iomgr/alarm_internal.h index e605ff84f9..12b6ab4286 100644 --- a/src/core/iomgr/alarm_internal.h +++ b/src/core/iomgr/alarm_internal.h @@ -34,9 +34,12 @@ #ifndef __GRPC_INTERNAL_IOMGR_ALARM_INTERNAL_H_ #define __GRPC_INTERNAL_IOMGR_ALARM_INTERNAL_H_ +#include +#include + /* iomgr internal api for dealing with alarms */ -int grpc_alarm_check(gpr_timespec now); +int grpc_alarm_check(gpr_mu *drop_mu, gpr_timespec now, gpr_timespec *next); void grpc_alarm_list_init(gpr_timespec now); void grpc_alarm_list_shutdown(); diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c index f1944bf672..9e5d56389d 100644 --- a/src/core/iomgr/endpoint.c +++ b/src/core/iomgr/endpoint.c @@ -34,14 +34,16 @@ #include "src/core/iomgr/endpoint.h" void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, - void *user_data, gpr_timespec deadline) { - ep->vtable->notify_on_read(ep, cb, user_data, deadline); + void *user_data) { + ep->vtable->notify_on_read(ep, cb, user_data); } -grpc_endpoint_write_status grpc_endpoint_write( - grpc_endpoint *ep, gpr_slice *slices, size_t nslices, - grpc_endpoint_write_cb cb, void *user_data, gpr_timespec deadline) { - return ep->vtable->write(ep, slices, nslices, cb, user_data, deadline); +grpc_endpoint_write_status grpc_endpoint_write(grpc_endpoint *ep, + gpr_slice *slices, + size_t nslices, + grpc_endpoint_write_cb cb, + void *user_data) { + return ep->vtable->write(ep, slices, nslices, cb, user_data); } void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { 
diff --git a/src/core/iomgr/endpoint.h b/src/core/iomgr/endpoint.h index bbd800bea8..ec86d9a146 100644 --- a/src/core/iomgr/endpoint.h +++ b/src/core/iomgr/endpoint.h @@ -48,8 +48,7 @@ typedef enum grpc_endpoint_cb_status { GRPC_ENDPOINT_CB_OK = 0, /* Call completed successfully */ GRPC_ENDPOINT_CB_EOF, /* Call completed successfully, end of file reached */ GRPC_ENDPOINT_CB_SHUTDOWN, /* Call interrupted by shutdown */ - GRPC_ENDPOINT_CB_ERROR, /* Call interrupted by socket error */ - GRPC_ENDPOINT_CB_TIMED_OUT /* Call timed out */ + GRPC_ENDPOINT_CB_ERROR /* Call interrupted by socket error */ } grpc_endpoint_cb_status; typedef enum grpc_endpoint_write_status { @@ -66,10 +65,10 @@ typedef void (*grpc_endpoint_write_cb)(void *user_data, struct grpc_endpoint_vtable { void (*notify_on_read)(grpc_endpoint *ep, grpc_endpoint_read_cb cb, - void *user_data, gpr_timespec deadline); + void *user_data); grpc_endpoint_write_status (*write)(grpc_endpoint *ep, gpr_slice *slices, size_t nslices, grpc_endpoint_write_cb cb, - void *user_data, gpr_timespec deadline); + void *user_data); void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset); void (*shutdown)(grpc_endpoint *ep); void (*destroy)(grpc_endpoint *ep); @@ -77,7 +76,7 @@ struct grpc_endpoint_vtable { /* When data is available on the connection, calls the callback with slices. */ void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, - void *user_data, gpr_timespec deadline); + void *user_data); /* Write slices out to the socket. @@ -85,9 +84,11 @@ void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, returns GRPC_ENDPOINT_WRITE_DONE. Otherwise it returns GRPC_ENDPOINT_WRITE_PENDING and calls cb when the connection is ready for more data. 
*/ -grpc_endpoint_write_status grpc_endpoint_write( - grpc_endpoint *ep, gpr_slice *slices, size_t nslices, - grpc_endpoint_write_cb cb, void *user_data, gpr_timespec deadline); +grpc_endpoint_write_status grpc_endpoint_write(grpc_endpoint *ep, + gpr_slice *slices, + size_t nslices, + grpc_endpoint_write_cb cb, + void *user_data); /* Causes any pending read/write callbacks to run immediately with GRPC_ENDPOINT_CB_SHUTDOWN status */ diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c new file mode 100644 index 0000000000..3cd2f9a8e0 --- /dev/null +++ b/src/core/iomgr/fd_posix.c @@ -0,0 +1,274 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/iomgr/fd_posix.h" + +#include +#include + +#include "src/core/iomgr/iomgr_internal.h" +#include +#include +#include + +enum descriptor_state { NOT_READY, READY, WAITING }; + +static void destroy(grpc_fd *fd) { + grpc_iomgr_add_callback(fd->on_done, fd->on_done_user_data); + gpr_mu_destroy(&fd->set_state_mu); + gpr_free(fd->watchers); + gpr_free(fd); + grpc_iomgr_unref(); +} + +static void ref_by(grpc_fd *fd, int n) { + gpr_atm_no_barrier_fetch_add(&fd->refst, n); +} + +static void unref_by(grpc_fd *fd, int n) { + if (gpr_atm_full_fetch_add(&fd->refst, -n) == n) { + destroy(fd); + } +} + +static void do_nothing(void *ignored, int success) {} + +grpc_fd *grpc_fd_create(int fd) { + grpc_fd *r = gpr_malloc(sizeof(grpc_fd)); + grpc_iomgr_ref(); + gpr_atm_rel_store(&r->refst, 1); + gpr_atm_rel_store(&r->readst.state, NOT_READY); + gpr_atm_rel_store(&r->writest.state, NOT_READY); + gpr_mu_init(&r->set_state_mu); + gpr_mu_init(&r->watcher_mu); + gpr_atm_rel_store(&r->shutdown, 0); + r->fd = fd; + r->watchers = NULL; + r->watcher_count = 0; + r->watcher_capacity = 0; + grpc_pollset_add_fd(grpc_backup_pollset(), r); + return r; +} + +int grpc_fd_is_orphaned(grpc_fd *fd) { + return (gpr_atm_acq_load(&fd->refst) & 1) == 0; +} + +static void wake_watchers(grpc_fd *fd) { + size_t i, n; + gpr_mu_lock(&fd->watcher_mu); + n = fd->watcher_count; + for (i = 0; i < n; i++) { + grpc_pollset_force_kick(fd->watchers[i]); + } 
+ gpr_mu_unlock(&fd->watcher_mu); +} + +void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) { + fd->on_done = on_done ? on_done : do_nothing; + fd->on_done_user_data = user_data; + ref_by(fd, 1); /* remove active status, but keep referenced */ + wake_watchers(fd); + close(fd->fd); + unref_by(fd, 2); /* drop the reference */ +} + +/* increment refcount by two to avoid changing the orphan bit */ +void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); } + +void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); } + +typedef struct { + grpc_iomgr_cb_func cb; + void *arg; +} callback; + +static void make_callback(grpc_iomgr_cb_func cb, void *arg, int success, + int allow_synchronous_callback) { + if (allow_synchronous_callback) { + cb(arg, success); + } else { + grpc_iomgr_add_delayed_callback(cb, arg, success); + } +} + +static void make_callbacks(callback *callbacks, size_t n, int success, + int allow_synchronous_callback) { + size_t i; + for (i = 0; i < n; i++) { + make_callback(callbacks[i].cb, callbacks[i].arg, success, + allow_synchronous_callback); + } +} + +static void notify_on(grpc_fd *fd, grpc_fd_state *st, grpc_iomgr_cb_func cb, + void *arg, int allow_synchronous_callback) { + switch ((enum descriptor_state)gpr_atm_acq_load(&st->state)) { + case NOT_READY: + /* There is no race if the descriptor is already ready, so we skip + the interlocked op in that case. As long as the app doesn't + try to set the same upcall twice (which it shouldn't) then + oldval should never be anything other than READY or NOT_READY. We + don't + check for user error on the fast path. */ + st->cb = cb; + st->cb_arg = arg; + if (gpr_atm_rel_cas(&st->state, NOT_READY, WAITING)) { + /* swap was successful -- the closure will run after the next + set_ready call. NOTE: we don't have an ABA problem here, + since we should never have concurrent calls to the same + notify_on function. 
*/ + wake_watchers(fd); + return; + } + /* swap was unsuccessful due to an intervening set_ready call. + Fall through to the READY code below */ + case READY: + assert(gpr_atm_acq_load(&st->state) == READY); + gpr_atm_rel_store(&st->state, NOT_READY); + make_callback(cb, arg, !gpr_atm_acq_load(&fd->shutdown), + allow_synchronous_callback); + return; + case WAITING: + /* upcallptr was set to a different closure. This is an error! */ + gpr_log(GPR_ERROR, + "User called a notify_on function with a previous callback still " + "pending"); + abort(); + } + gpr_log(GPR_ERROR, "Corrupt memory in &st->state"); + abort(); +} + +static void set_ready_locked(grpc_fd_state *st, callback *callbacks, + size_t *ncallbacks) { + callback *c; + + switch ((enum descriptor_state)gpr_atm_acq_load(&st->state)) { + case NOT_READY: + if (gpr_atm_rel_cas(&st->state, NOT_READY, READY)) { + /* swap was successful -- the closure will run after the next + notify_on call. */ + return; + } + /* swap was unsuccessful due to an intervening notify_on call. 
+ Fall through to the WAITING code below */ + case WAITING: + assert(gpr_atm_acq_load(&st->state) == WAITING); + c = &callbacks[(*ncallbacks)++]; + c->cb = st->cb; + c->arg = st->cb_arg; + gpr_atm_rel_store(&st->state, NOT_READY); + return; + case READY: + /* duplicate ready, ignore */ + return; + } +} + +static void set_ready(grpc_fd *fd, grpc_fd_state *st, + int allow_synchronous_callback) { + /* only one set_ready can be active at once (but there may be a racing + notify_on) */ + int success; + callback cb; + size_t ncb = 0; + gpr_mu_lock(&fd->set_state_mu); + set_ready_locked(st, &cb, &ncb); + gpr_mu_unlock(&fd->set_state_mu); + success = !gpr_atm_acq_load(&fd->shutdown); + make_callbacks(&cb, ncb, success, allow_synchronous_callback); +} + +void grpc_fd_shutdown(grpc_fd *fd) { + callback cb[2]; + size_t ncb = 0; + gpr_mu_lock(&fd->set_state_mu); + GPR_ASSERT(!gpr_atm_acq_load(&fd->shutdown)); + gpr_atm_rel_store(&fd->shutdown, 1); + set_ready_locked(&fd->readst, cb, &ncb); + set_ready_locked(&fd->writest, cb, &ncb); + gpr_mu_unlock(&fd->set_state_mu); + make_callbacks(cb, ncb, 0, 0); +} + +void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_cb_func read_cb, + void *read_cb_arg) { + notify_on(fd, &fd->readst, read_cb, read_cb_arg, 0); +} + +void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb, + void *write_cb_arg) { + notify_on(fd, &fd->writest, write_cb, write_cb_arg, 0); +} + +gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, + gpr_uint32 read_mask, gpr_uint32 write_mask) { + /* keep track of pollers that have requested our events, in case they change + */ + gpr_mu_lock(&fd->watcher_mu); + if (fd->watcher_capacity == fd->watcher_count) { + fd->watcher_capacity = + GPR_MAX(fd->watcher_capacity + 8, fd->watcher_capacity * 3 / 2); + fd->watchers = gpr_realloc(fd->watchers, + fd->watcher_capacity * sizeof(grpc_pollset *)); + } + fd->watchers[fd->watcher_count++] = pollset; + gpr_mu_unlock(&fd->watcher_mu); + + return 
(gpr_atm_acq_load(&fd->readst.state) != READY ? read_mask : 0) | + (gpr_atm_acq_load(&fd->writest.state) != READY ? write_mask : 0); +} + +void grpc_fd_end_poll(grpc_fd *fd, grpc_pollset *pollset) { + size_t r, w, n; + + gpr_mu_lock(&fd->watcher_mu); + n = fd->watcher_count; + for (r = 0, w = 0; r < n; r++) { + if (fd->watchers[r] == pollset) { + fd->watcher_count--; + continue; + } + fd->watchers[w++] = fd->watchers[r]; + } + gpr_mu_unlock(&fd->watcher_mu); +} + +void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback) { + set_ready(fd, &fd->readst, allow_synchronous_callback); +} + +void grpc_fd_become_writable(grpc_fd *fd, int allow_synchronous_callback) { + set_ready(fd, &fd->writest, allow_synchronous_callback); +} diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h new file mode 100644 index 0000000000..232de0c3e0 --- /dev/null +++ b/src/core/iomgr/fd_posix.h @@ -0,0 +1,138 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_FD_POSIX_H_ +#define __GRPC_INTERNAL_IOMGR_FD_POSIX_H_ + +#include "src/core/iomgr/iomgr.h" +#include "src/core/iomgr/pollset.h" +#include +#include +#include + +typedef struct { + grpc_iomgr_cb_func cb; + void *cb_arg; + int success; + gpr_atm state; +} grpc_fd_state; + +typedef struct grpc_fd { + int fd; + /* refst format: + bit0: 1=active/0=orphaned + bit1-n: refcount + meaning that mostly we ref by two to avoid altering the orphaned bit, + and just unref by 1 when we're ready to flag the object as orphaned */ + gpr_atm refst; + + gpr_mu set_state_mu; + gpr_atm shutdown; + + gpr_mu watcher_mu; + grpc_pollset **watchers; + size_t watcher_count; + size_t watcher_capacity; + + grpc_fd_state readst; + grpc_fd_state writest; + + grpc_iomgr_cb_func on_done; + void *on_done_user_data; +} grpc_fd; + +/* Create a wrapped file descriptor. + Requires fd is a non-blocking file descriptor. + This takes ownership of closing fd. */ +grpc_fd *grpc_fd_create(int fd); + +/* Releases fd to be asynchronously destroyed. + on_done is called when the underlying file descriptor is definitely close()d. + If on_done is NULL, no callback will be made. + Requires: *fd initialized; no outstanding notify_on_read or + notify_on_write. */ +void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data); + +/* Begin polling on an fd. 
+ Registers that the given pollset is interested in this fd - so that if read + or writability interest changes, the pollset can be kicked to pick up that + new interest. + Return value is: + (fd_needs_read? read_mask : 0) | (fd_needs_write? write_mask : 0) + i.e. a combination of read_mask and write_mask determined by the fd's current + interest in said events. + Polling strategies that do not need to alter their behavior depending on the + fd's current interest (such as epoll) do not need to call this function. */ +gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, + gpr_uint32 read_mask, gpr_uint32 write_mask); +/* Complete polling previously started with grpc_fd_begin_poll */ +void grpc_fd_end_poll(grpc_fd *fd, grpc_pollset *pollset); + +/* Return 1 if this fd is orphaned, 0 otherwise */ +int grpc_fd_is_orphaned(grpc_fd *fd); + +/* Cause any current callbacks to error out: they are invoked with success == 0. */ +void grpc_fd_shutdown(grpc_fd *fd); + +/* Register read interest, causing read_cb to be called once when fd becomes + readable, or on shutdown triggered by grpc_fd_shutdown. + This function takes no deadline argument. + read_cb will be called with read_cb_arg when *fd becomes readable. + read_cb is called with success == 1 if fd became readable, + and with success == 0 if the fd has been shut down + (see grpc_fd_shutdown). + + Requires: This method must not be called before the read_cb for any previous + call runs. Edge triggered events are used whenever they are supported by the + underlying platform. This means that users must drain fd in read_cb before + calling notify_on_read again. Users are also expected to handle spurious + events, i.e. read_cb is called while nothing can be readable from fd */ +void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_cb_func read_cb, + void *read_cb_arg); + +/* Exactly the same semantics as above, except based on writable events. 
*/ +void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb, + void *write_cb_arg); + +/* Notification from the poller to an fd that it has become readable or + writable. + If allow_synchronous_callback is 1, allow running the fd callback inline + in this callstack, otherwise register an asynchronous callback and return */ +void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback); +void grpc_fd_become_writable(grpc_fd *fd, int allow_synchronous_callback); + +/* Reference counting for fds */ +void grpc_fd_ref(grpc_fd *fd); +void grpc_fd_unref(grpc_fd *fd); + +#endif /* __GRPC_INTERNAL_IOMGR_FD_POSIX_H_ */ diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c new file mode 100644 index 0000000000..03f56a50a3 --- /dev/null +++ b/src/core/iomgr/iomgr.c @@ -0,0 +1,204 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/iomgr/iomgr.h" + +#include + +#include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/alarm_internal.h" +#include +#include +#include +#include + +typedef struct delayed_callback { + grpc_iomgr_cb_func cb; + void *cb_arg; + int success; + struct delayed_callback *next; +} delayed_callback; + +static gpr_mu g_mu; +static gpr_cv g_cv; +static delayed_callback *g_cbs_head = NULL; +static delayed_callback *g_cbs_tail = NULL; +static int g_shutdown; +static int g_refs; +static gpr_event g_background_callback_executor_done; + +/* Execute followup callbacks continuously. 
+ Other threads may check in and help during pollset_work() */ +static void background_callback_executor(void *ignored) { + gpr_mu_lock(&g_mu); + while (!g_shutdown) { + gpr_timespec deadline = gpr_inf_future; + if (g_cbs_head) { + delayed_callback *cb = g_cbs_head; + g_cbs_head = cb->next; + if (!g_cbs_head) g_cbs_tail = NULL; + gpr_mu_unlock(&g_mu); + cb->cb(cb->cb_arg, cb->success); + gpr_free(cb); + gpr_mu_lock(&g_mu); + } else if (grpc_alarm_check(&g_mu, gpr_now(), &deadline)) { + } else { + gpr_cv_wait(&g_cv, &g_mu, deadline); + } + } + gpr_mu_unlock(&g_mu); + gpr_event_set(&g_background_callback_executor_done, (void *)1); +} + +void grpc_kick_poller() { gpr_cv_broadcast(&g_cv); } + +void grpc_iomgr_init() { + gpr_thd_id id; + gpr_mu_init(&g_mu); + gpr_cv_init(&g_cv); + grpc_alarm_list_init(gpr_now()); + g_refs = 0; + grpc_iomgr_platform_init(); + gpr_event_init(&g_background_callback_executor_done); + gpr_thd_new(&id, background_callback_executor, NULL, NULL); +} + +void grpc_iomgr_shutdown() { + delayed_callback *cb; + gpr_timespec shutdown_deadline = + gpr_time_add(gpr_now(), gpr_time_from_seconds(10)); + + grpc_iomgr_platform_shutdown(); + + gpr_mu_lock(&g_mu); + g_shutdown = 1; + while (g_cbs_head || g_refs) { + gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed%s", g_refs, + g_cbs_head ? 
" and executing final callbacks" : ""); + while (g_cbs_head) { + cb = g_cbs_head; + g_cbs_head = cb->next; + if (!g_cbs_head) g_cbs_tail = NULL; + gpr_mu_unlock(&g_mu); + + cb->cb(cb->cb_arg, 0); + gpr_free(cb); + gpr_mu_lock(&g_mu); + } + if (g_refs) { + if (gpr_cv_wait(&g_cv, &g_mu, shutdown_deadline) && g_cbs_head == NULL) { + gpr_log(GPR_DEBUG, + "Failed to free %d iomgr objects before shutdown deadline: " + "memory leaks are likely", + g_refs); + break; + } + } + } + gpr_mu_unlock(&g_mu); + + gpr_event_wait(&g_background_callback_executor_done, gpr_inf_future); + + grpc_alarm_list_shutdown(); + gpr_mu_destroy(&g_mu); + gpr_cv_destroy(&g_cv); +} + +void grpc_iomgr_ref() { + gpr_mu_lock(&g_mu); + ++g_refs; + gpr_mu_unlock(&g_mu); +} + +void grpc_iomgr_unref() { + gpr_mu_lock(&g_mu); + if (0 == --g_refs) { + gpr_cv_signal(&g_cv); + } + gpr_mu_unlock(&g_mu); +} + +void grpc_iomgr_add_delayed_callback(grpc_iomgr_cb_func cb, void *cb_arg, + int success) { + delayed_callback *dcb = gpr_malloc(sizeof(delayed_callback)); + dcb->cb = cb; + dcb->cb_arg = cb_arg; + dcb->success = success; + gpr_mu_lock(&g_mu); + dcb->next = NULL; + if (!g_cbs_tail) { + g_cbs_head = g_cbs_tail = dcb; + } else { + g_cbs_tail->next = dcb; + g_cbs_tail = dcb; + } + gpr_cv_signal(&g_cv); + gpr_mu_unlock(&g_mu); +} + +void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg) { + grpc_iomgr_add_delayed_callback(cb, cb_arg, 1); +} + +int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) { + int n = 0; + gpr_mu *retake_mu = NULL; + delayed_callback *cb; + for (;;) { + /* check for new work */ + if (!gpr_mu_trylock(&g_mu)) { + break; + } + cb = g_cbs_head; + if (!cb) { + gpr_mu_unlock(&g_mu); + break; + } + g_cbs_head = cb->next; + if (!g_cbs_head) g_cbs_tail = NULL; + gpr_mu_unlock(&g_mu); + /* if we have a mutex to drop, do so before executing work */ + if (drop_mu) { + gpr_mu_unlock(drop_mu); + retake_mu = drop_mu; + drop_mu = NULL; + } + cb->cb(cb->cb_arg, success && 
cb->success); + gpr_free(cb); + n++; + } + if (retake_mu) { + gpr_mu_lock(retake_mu); + } + return n; +} diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h index cf39f947bc..16991a9b90 100644 --- a/src/core/iomgr/iomgr.h +++ b/src/core/iomgr/iomgr.h @@ -34,17 +34,8 @@ #ifndef __GRPC_INTERNAL_IOMGR_IOMGR_H__ #define __GRPC_INTERNAL_IOMGR_IOMGR_H__ -/* Status passed to callbacks for grpc_em_fd_notify_on_read and - grpc_em_fd_notify_on_write. */ -typedef enum grpc_em_cb_status { - GRPC_CALLBACK_SUCCESS = 0, - GRPC_CALLBACK_TIMED_OUT, - GRPC_CALLBACK_CANCELLED, - GRPC_CALLBACK_DO_NOT_USE -} grpc_iomgr_cb_status; - /* gRPC Callback definition */ -typedef void (*grpc_iomgr_cb_func)(void *arg, grpc_iomgr_cb_status status); +typedef void (*grpc_iomgr_cb_func)(void *arg, int success); void grpc_iomgr_init(); void grpc_iomgr_shutdown(); diff --git a/src/core/iomgr/iomgr_completion_queue_interface.h b/src/core/iomgr/iomgr_completion_queue_interface.h deleted file mode 100644 index 3c4efe773a..0000000000 --- a/src/core/iomgr/iomgr_completion_queue_interface.h +++ /dev/null @@ -1,45 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. 
- * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_COMPLETION_QUEUE_INTERFACE_H_ -#define __GRPC_INTERNAL_IOMGR_IOMGR_COMPLETION_QUEUE_INTERFACE_H_ - -/* Internals of iomgr that are exposed only to be used for completion queue - implementation */ - -extern gpr_mu grpc_iomgr_mu; -extern gpr_cv grpc_iomgr_cv; - -int grpc_iomgr_work(gpr_timespec deadline); - -#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_COMPLETION_QUEUE_INTERFACE_H_ */ diff --git a/src/core/iomgr/iomgr_internal.h b/src/core/iomgr/iomgr_internal.h new file mode 100644 index 0000000000..5f72542777 --- /dev/null +++ b/src/core/iomgr/iomgr_internal.h @@ -0,0 +1,51 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. 
+ * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_INTERNAL_H_ +#define __GRPC_INTERNAL_IOMGR_IOMGR_INTERNAL_H_ + +#include "src/core/iomgr/iomgr.h" +#include "src/core/iomgr/iomgr_internal.h" +#include <grpc/support/sync.h> + +int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success); +void grpc_iomgr_add_delayed_callback(grpc_iomgr_cb_func cb, void *cb_arg, + int success); + +void grpc_iomgr_ref(); +void grpc_iomgr_unref(); + +void grpc_iomgr_platform_init(); +void grpc_iomgr_platform_shutdown(); + +#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_INTERNAL_H_ */ diff --git a/src/core/iomgr/iomgr_libevent.c b/src/core/iomgr/iomgr_libevent.c deleted file mode 100644 index 6188ab2749..0000000000 --- a/src/core/iomgr/iomgr_libevent.c +++ /dev/null @@ -1,652 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED.
IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "src/core/iomgr/iomgr_libevent.h" - -#include -#include - -#include "src/core/iomgr/alarm.h" -#include "src/core/iomgr/alarm_internal.h" -#include -#include -#include -#include -#include -#include -#include -#include - -#define ALARM_TRIGGER_INIT ((gpr_atm)0) -#define ALARM_TRIGGER_INCREMENT ((gpr_atm)1) -#define DONE_SHUTDOWN ((void *)1) - -#define POLLER_ID_INVALID ((gpr_atm)-1) - -/* Global data */ -struct event_base *g_event_base; -gpr_mu grpc_iomgr_mu; -gpr_cv grpc_iomgr_cv; -static grpc_libevent_activation_data *g_activation_queue; -static int g_num_pollers; -static int g_num_fds; -static int g_num_address_resolutions; -static gpr_timespec g_last_poll_completed; -static int g_shutdown_backup_poller; -static gpr_event g_backup_poller_done; -/* activated to break out of the event loop early */ -static struct event *g_timeout_ev; -/* activated to safely break polling from other threads */ -static struct event *g_break_ev; -static grpc_fd *g_fds_to_free; - -int evthread_use_threads(void); -static void grpc_fd_impl_destroy(grpc_fd *impl); - -void grpc_iomgr_ref_address_resolution(int delta) { - gpr_mu_lock(&grpc_iomgr_mu); - GPR_ASSERT(!g_shutdown_backup_poller); - g_num_address_resolutions += delta; - if (0 == g_num_address_resolutions) { - gpr_cv_broadcast(&grpc_iomgr_cv); - } - gpr_mu_unlock(&grpc_iomgr_mu); -} - -/* If anything is in the work queue, process one item and return 1. 
- Return 0 if there were no work items to complete. - Requires grpc_iomgr_mu locked, may unlock and relock during the call. */ -static int maybe_do_queue_work() { - grpc_libevent_activation_data *work = g_activation_queue; - - if (work == NULL) return 0; - - if (work->next == work) { - g_activation_queue = NULL; - } else { - g_activation_queue = work->next; - g_activation_queue->prev = work->prev; - g_activation_queue->next->prev = g_activation_queue->prev->next = - g_activation_queue; - } - work->next = work->prev = NULL; - /* force status to cancelled from ok when shutting down */ - if (g_shutdown_backup_poller && work->status == GRPC_CALLBACK_SUCCESS) { - work->status = GRPC_CALLBACK_CANCELLED; - } - gpr_mu_unlock(&grpc_iomgr_mu); - - work->cb(work->arg, work->status); - - gpr_mu_lock(&grpc_iomgr_mu); - return 1; -} - -/* Break out of the event loop on timeout */ -static void timer_callback(int fd, short events, void *context) { - event_base_loopbreak((struct event_base *)context); -} - -static void break_callback(int fd, short events, void *context) { - event_base_loopbreak((struct event_base *)context); -} - -static void free_fd_list(grpc_fd *impl) { - while (impl != NULL) { - grpc_fd *current = impl; - impl = impl->next; - grpc_fd_impl_destroy(current); - current->on_done(current->on_done_user_data, GRPC_CALLBACK_SUCCESS); - gpr_free(current); - } -} - -static void maybe_free_fds() { - if (g_fds_to_free) { - free_fd_list(g_fds_to_free); - g_fds_to_free = NULL; - } -} - -void grpc_kick_poller() { event_active(g_break_ev, EV_READ, 0); } - -/* Spend some time doing polling and libevent maintenance work if no other - thread is. This includes both polling for events and destroying/closing file - descriptor objects. - Returns 1 if polling was performed, 0 otherwise. - Requires grpc_iomgr_mu locked, may unlock and relock during the call. 
*/ -static int maybe_do_polling_work(struct timeval delay) { - int status; - - if (g_num_pollers) return 0; - - g_num_pollers = 1; - - maybe_free_fds(); - - gpr_mu_unlock(&grpc_iomgr_mu); - - event_add(g_timeout_ev, &delay); - status = event_base_loop(g_event_base, EVLOOP_ONCE); - if (status < 0) { - gpr_log(GPR_ERROR, "event polling loop stops with error status %d", status); - } - event_del(g_timeout_ev); - - gpr_mu_lock(&grpc_iomgr_mu); - maybe_free_fds(); - - g_num_pollers = 0; - gpr_cv_broadcast(&grpc_iomgr_cv); - return 1; -} - -static int maybe_do_alarm_work(gpr_timespec now, gpr_timespec next) { - int r = 0; - if (gpr_time_cmp(next, now) < 0) { - gpr_mu_unlock(&grpc_iomgr_mu); - r = grpc_alarm_check(now); - gpr_mu_lock(&grpc_iomgr_mu); - } - return r; -} - -int grpc_iomgr_work(gpr_timespec deadline) { - gpr_timespec now = gpr_now(); - gpr_timespec next = grpc_alarm_list_next_timeout(); - gpr_timespec delay_timespec = gpr_time_sub(deadline, now); - /* poll for no longer than one second */ - gpr_timespec max_delay = gpr_time_from_seconds(1); - struct timeval delay; - - if (gpr_time_cmp(delay_timespec, gpr_time_0) <= 0) { - return 0; - } - - if (gpr_time_cmp(delay_timespec, max_delay) > 0) { - delay_timespec = max_delay; - } - - /* Adjust delay to account for the next alarm, if applicable. 
*/ - delay_timespec = gpr_time_min( - delay_timespec, gpr_time_sub(grpc_alarm_list_next_timeout(), now)); - - delay = gpr_timeval_from_timespec(delay_timespec); - - if (maybe_do_queue_work() || maybe_do_alarm_work(now, next) || - maybe_do_polling_work(delay)) { - g_last_poll_completed = gpr_now(); - return 1; - } - - return 0; -} - -static void backup_poller_thread(void *p) { - int backup_poller_engaged = 0; - /* allow no pollers for 100 milliseconds, then engage backup polling */ - gpr_timespec allow_no_pollers = gpr_time_from_millis(100); - - gpr_mu_lock(&grpc_iomgr_mu); - while (!g_shutdown_backup_poller) { - if (g_num_pollers == 0) { - gpr_timespec now = gpr_now(); - gpr_timespec time_until_engage = gpr_time_sub( - allow_no_pollers, gpr_time_sub(now, g_last_poll_completed)); - if (gpr_time_cmp(time_until_engage, gpr_time_0) <= 0) { - if (!backup_poller_engaged) { - gpr_log(GPR_DEBUG, "No pollers for a while - engaging backup poller"); - backup_poller_engaged = 1; - } - if (!maybe_do_queue_work()) { - gpr_timespec next = grpc_alarm_list_next_timeout(); - if (!maybe_do_alarm_work(now, next)) { - gpr_timespec deadline = - gpr_time_min(next, gpr_time_add(now, gpr_time_from_seconds(1))); - maybe_do_polling_work( - gpr_timeval_from_timespec(gpr_time_sub(deadline, now))); - } - } - } else { - if (backup_poller_engaged) { - gpr_log(GPR_DEBUG, "Backup poller disengaged"); - backup_poller_engaged = 0; - } - gpr_mu_unlock(&grpc_iomgr_mu); - gpr_sleep_until(gpr_time_add(now, time_until_engage)); - gpr_mu_lock(&grpc_iomgr_mu); - } - } else { - if (backup_poller_engaged) { - gpr_log(GPR_DEBUG, "Backup poller disengaged"); - backup_poller_engaged = 0; - } - gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, gpr_inf_future); - } - } - gpr_mu_unlock(&grpc_iomgr_mu); - - gpr_event_set(&g_backup_poller_done, (void *)1); -} - -void grpc_iomgr_init() { - gpr_thd_id backup_poller_id; - - if (evthread_use_threads() != 0) { - gpr_log(GPR_ERROR, "Failed to initialize libevent thread 
support!"); - abort(); - } - - grpc_alarm_list_init(gpr_now()); - - gpr_mu_init(&grpc_iomgr_mu); - gpr_cv_init(&grpc_iomgr_cv); - g_activation_queue = NULL; - g_num_pollers = 0; - g_num_fds = 0; - g_num_address_resolutions = 0; - g_last_poll_completed = gpr_now(); - g_shutdown_backup_poller = 0; - g_fds_to_free = NULL; - - gpr_event_init(&g_backup_poller_done); - - g_event_base = NULL; - g_timeout_ev = NULL; - g_break_ev = NULL; - - g_event_base = event_base_new(); - if (!g_event_base) { - gpr_log(GPR_ERROR, "Failed to create the event base"); - abort(); - } - - if (evthread_make_base_notifiable(g_event_base) != 0) { - gpr_log(GPR_ERROR, "Couldn't make event base notifiable cross threads!"); - abort(); - } - - g_timeout_ev = evtimer_new(g_event_base, timer_callback, g_event_base); - g_break_ev = event_new(g_event_base, -1, EV_READ | EV_PERSIST, break_callback, - g_event_base); - - event_add(g_break_ev, NULL); - - gpr_thd_new(&backup_poller_id, backup_poller_thread, NULL, NULL); -} - -void grpc_iomgr_shutdown() { - gpr_timespec fd_shutdown_deadline = - gpr_time_add(gpr_now(), gpr_time_from_seconds(10)); - - /* broadcast shutdown */ - gpr_mu_lock(&grpc_iomgr_mu); - while (g_num_fds > 0 || g_num_address_resolutions > 0) { - gpr_log(GPR_INFO, - "waiting for %d fds and %d name resolutions to be destroyed before " - "closing event manager", - g_num_fds, g_num_address_resolutions); - if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, fd_shutdown_deadline)) { - gpr_log(GPR_ERROR, - "not all fds or name resolutions destroyed before shutdown " - "deadline: memory leaks " - "are likely"); - break; - } else if (g_num_fds == 0 && g_num_address_resolutions == 0) { - gpr_log(GPR_INFO, "all fds closed, all name resolutions finished"); - } - } - - g_shutdown_backup_poller = 1; - gpr_cv_broadcast(&grpc_iomgr_cv); - gpr_mu_unlock(&grpc_iomgr_mu); - - gpr_event_wait(&g_backup_poller_done, gpr_inf_future); - - grpc_alarm_list_shutdown(); - - /* drain pending work */ - 
gpr_mu_lock(&grpc_iomgr_mu); - while (maybe_do_queue_work()) - ; - gpr_mu_unlock(&grpc_iomgr_mu); - - free_fd_list(g_fds_to_free); - - /* complete shutdown */ - gpr_mu_destroy(&grpc_iomgr_mu); - gpr_cv_destroy(&grpc_iomgr_cv); - - if (g_timeout_ev != NULL) { - event_free(g_timeout_ev); - } - - if (g_break_ev != NULL) { - event_free(g_break_ev); - } - - if (g_event_base != NULL) { - event_base_free(g_event_base); - g_event_base = NULL; - } -} - -static void add_task(grpc_libevent_activation_data *adata) { - gpr_mu_lock(&grpc_iomgr_mu); - if (g_activation_queue) { - adata->next = g_activation_queue; - adata->prev = adata->next->prev; - adata->next->prev = adata->prev->next = adata; - } else { - g_activation_queue = adata; - adata->next = adata->prev = adata; - } - gpr_cv_broadcast(&grpc_iomgr_cv); - gpr_mu_unlock(&grpc_iomgr_mu); -} - -static void grpc_fd_impl_destroy(grpc_fd *impl) { - grpc_em_task_activity_type type; - grpc_libevent_activation_data *adata; - - for (type = GRPC_EM_TA_READ; type < GRPC_EM_TA_COUNT; type++) { - adata = &(impl->task.activation[type]); - GPR_ASSERT(adata->next == NULL); - if (adata->ev != NULL) { - event_free(adata->ev); - adata->ev = NULL; - } - } - - if (impl->shutdown_ev != NULL) { - event_free(impl->shutdown_ev); - impl->shutdown_ev = NULL; - } - gpr_mu_destroy(&impl->mu); - close(impl->fd); -} - -/* Proxy callback to call a gRPC read/write callback */ -static void em_fd_cb(int fd, short what, void *arg /*=em_fd*/) { - grpc_fd *em_fd = arg; - grpc_iomgr_cb_status status = GRPC_CALLBACK_SUCCESS; - int run_read_cb = 0; - int run_write_cb = 0; - grpc_libevent_activation_data *rdata, *wdata; - - gpr_mu_lock(&em_fd->mu); - if (em_fd->shutdown_started) { - status = GRPC_CALLBACK_CANCELLED; - } else if (status == GRPC_CALLBACK_SUCCESS && (what & EV_TIMEOUT)) { - status = GRPC_CALLBACK_TIMED_OUT; - /* TODO(klempner): This is broken if we are monitoring both read and write - events on the same fd -- generating a spurious event is okay, but - 
generating a spurious timeout is not. */ - what |= (EV_READ | EV_WRITE); - } - - if (what & EV_READ) { - switch (em_fd->read_state) { - case GRPC_FD_WAITING: - run_read_cb = 1; - em_fd->read_state = GRPC_FD_IDLE; - break; - case GRPC_FD_IDLE: - case GRPC_FD_CACHED: - em_fd->read_state = GRPC_FD_CACHED; - } - } - if (what & EV_WRITE) { - switch (em_fd->write_state) { - case GRPC_FD_WAITING: - run_write_cb = 1; - em_fd->write_state = GRPC_FD_IDLE; - break; - case GRPC_FD_IDLE: - case GRPC_FD_CACHED: - em_fd->write_state = GRPC_FD_CACHED; - } - } - - if (run_read_cb) { - rdata = &(em_fd->task.activation[GRPC_EM_TA_READ]); - rdata->status = status; - add_task(rdata); - } else if (run_write_cb) { - wdata = &(em_fd->task.activation[GRPC_EM_TA_WRITE]); - wdata->status = status; - add_task(wdata); - } - gpr_mu_unlock(&em_fd->mu); -} - -static void em_fd_shutdown_cb(int fd, short what, void *arg /*=em_fd*/) { - /* TODO(klempner): This could just run directly in the calling thread, except - that libevent's handling of event_active() on an event which is already in - flight on a different thread is racy and easily triggers TSAN. 
- */ - grpc_fd *impl = arg; - gpr_mu_lock(&impl->mu); - impl->shutdown_started = 1; - if (impl->read_state == GRPC_FD_WAITING) { - event_active(impl->task.activation[GRPC_EM_TA_READ].ev, EV_READ, 1); - } - if (impl->write_state == GRPC_FD_WAITING) { - event_active(impl->task.activation[GRPC_EM_TA_WRITE].ev, EV_WRITE, 1); - } - gpr_mu_unlock(&impl->mu); -} - -grpc_fd *grpc_fd_create(int fd) { - int flags; - grpc_libevent_activation_data *rdata, *wdata; - grpc_fd *impl = gpr_malloc(sizeof(grpc_fd)); - - gpr_mu_lock(&grpc_iomgr_mu); - g_num_fds++; - gpr_mu_unlock(&grpc_iomgr_mu); - - impl->shutdown_ev = NULL; - gpr_mu_init(&impl->mu); - - flags = fcntl(fd, F_GETFL, 0); - GPR_ASSERT((flags & O_NONBLOCK) != 0); - - impl->task.type = GRPC_EM_TASK_FD; - impl->fd = fd; - - rdata = &(impl->task.activation[GRPC_EM_TA_READ]); - rdata->ev = NULL; - rdata->cb = NULL; - rdata->arg = NULL; - rdata->status = GRPC_CALLBACK_SUCCESS; - rdata->prev = NULL; - rdata->next = NULL; - - wdata = &(impl->task.activation[GRPC_EM_TA_WRITE]); - wdata->ev = NULL; - wdata->cb = NULL; - wdata->arg = NULL; - wdata->status = GRPC_CALLBACK_SUCCESS; - wdata->prev = NULL; - wdata->next = NULL; - - impl->read_state = GRPC_FD_IDLE; - impl->write_state = GRPC_FD_IDLE; - - impl->shutdown_started = 0; - impl->next = NULL; - - /* TODO(chenw): detect platforms where only level trigger is supported, - and set the event to non-persist. 
*/ - rdata->ev = event_new(g_event_base, impl->fd, EV_ET | EV_PERSIST | EV_READ, - em_fd_cb, impl); - GPR_ASSERT(rdata->ev); - - wdata->ev = event_new(g_event_base, impl->fd, EV_ET | EV_PERSIST | EV_WRITE, - em_fd_cb, impl); - GPR_ASSERT(wdata->ev); - - impl->shutdown_ev = - event_new(g_event_base, -1, EV_READ, em_fd_shutdown_cb, impl); - GPR_ASSERT(impl->shutdown_ev); - - return impl; -} - -static void do_nothing(void *ignored, grpc_iomgr_cb_status also_ignored) {} - -void grpc_fd_destroy(grpc_fd *impl, grpc_iomgr_cb_func on_done, - void *user_data) { - if (on_done == NULL) on_done = do_nothing; - - gpr_mu_lock(&grpc_iomgr_mu); - - /* Put the impl on the list to be destroyed by the poller. */ - impl->on_done = on_done; - impl->on_done_user_data = user_data; - impl->next = g_fds_to_free; - g_fds_to_free = impl; - /* TODO(ctiller): kick the poller so it destroys this fd promptly - (currently we may wait up to a second) */ - - g_num_fds--; - gpr_cv_broadcast(&grpc_iomgr_cv); - gpr_mu_unlock(&grpc_iomgr_mu); -} - -int grpc_fd_get(struct grpc_fd *em_fd) { return em_fd->fd; } - -/* TODO(chenw): should we enforce the contract that notify_on_read cannot be - called when the previously registered callback has not been called yet. */ -int grpc_fd_notify_on_read(grpc_fd *impl, grpc_iomgr_cb_func read_cb, - void *read_cb_arg, gpr_timespec deadline) { - int force_event = 0; - grpc_libevent_activation_data *rdata; - gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now()); - struct timeval delay = gpr_timeval_from_timespec(delay_timespec); - struct timeval *delayp = - gpr_time_cmp(deadline, gpr_inf_future) ? 
&delay : NULL; - - rdata = &impl->task.activation[GRPC_EM_TA_READ]; - - gpr_mu_lock(&impl->mu); - rdata->cb = read_cb; - rdata->arg = read_cb_arg; - - force_event = (impl->shutdown_started || impl->read_state == GRPC_FD_CACHED); - impl->read_state = GRPC_FD_WAITING; - - if (force_event) { - event_active(rdata->ev, EV_READ, 1); - } else if (event_add(rdata->ev, delayp) == -1) { - gpr_mu_unlock(&impl->mu); - return 0; - } - gpr_mu_unlock(&impl->mu); - return 1; -} - -int grpc_fd_notify_on_write(grpc_fd *impl, grpc_iomgr_cb_func write_cb, - void *write_cb_arg, gpr_timespec deadline) { - int force_event = 0; - grpc_libevent_activation_data *wdata; - gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now()); - struct timeval delay = gpr_timeval_from_timespec(delay_timespec); - struct timeval *delayp = - gpr_time_cmp(deadline, gpr_inf_future) ? &delay : NULL; - - wdata = &impl->task.activation[GRPC_EM_TA_WRITE]; - - gpr_mu_lock(&impl->mu); - wdata->cb = write_cb; - wdata->arg = write_cb_arg; - - force_event = (impl->shutdown_started || impl->write_state == GRPC_FD_CACHED); - impl->write_state = GRPC_FD_WAITING; - - if (force_event) { - event_active(wdata->ev, EV_WRITE, 1); - } else if (event_add(wdata->ev, delayp) == -1) { - gpr_mu_unlock(&impl->mu); - return 0; - } - gpr_mu_unlock(&impl->mu); - return 1; -} - -void grpc_fd_shutdown(grpc_fd *em_fd) { - event_active(em_fd->shutdown_ev, EV_READ, 1); -} - -/* Sometimes we want a followup callback: something to be added from the - current callback for the EM to invoke once this callback is complete. - This is implemented by inserting an entry into an EM queue. */ - -/* The following structure holds the field needed for adding the - followup callback. 
These are the argument for the followup callback, - the function to use for the followup callback, and the - activation data pointer used for the queues (to free in the CB) */ -struct followup_callback_arg { - grpc_iomgr_cb_func func; - void *cb_arg; - grpc_libevent_activation_data adata; -}; - -static void followup_proxy_callback(void *cb_arg, grpc_iomgr_cb_status status) { - struct followup_callback_arg *fcb_arg = cb_arg; - /* Invoke the function */ - fcb_arg->func(fcb_arg->cb_arg, status); - gpr_free(fcb_arg); -} - -void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg) { - grpc_libevent_activation_data *adptr; - struct followup_callback_arg *fcb_arg; - - fcb_arg = gpr_malloc(sizeof(*fcb_arg)); - /* Set up the activation data and followup callback argument structures */ - adptr = &fcb_arg->adata; - adptr->ev = NULL; - adptr->cb = followup_proxy_callback; - adptr->arg = fcb_arg; - adptr->status = GRPC_CALLBACK_SUCCESS; - adptr->prev = NULL; - adptr->next = NULL; - - fcb_arg->func = cb; - fcb_arg->cb_arg = cb_arg; - - /* Insert an activation data for the specified em */ - add_task(adptr); -} diff --git a/src/core/iomgr/iomgr_libevent.h b/src/core/iomgr/iomgr_libevent.h deleted file mode 100644 index 5c088006a0..0000000000 --- a/src/core/iomgr/iomgr_libevent.h +++ /dev/null @@ -1,206 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. 
nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__ -#define __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__ - -#include "src/core/iomgr/iomgr.h" -#include -#include - -typedef struct grpc_fd grpc_fd; - -/* gRPC event manager task "base class". This is pretend-inheritance in C89. - This should be the first member of any actual grpc_em task type. - - Memory warning: expanding this will increase memory usage in any derived - class, so be careful. - - For generality, this base can be on multiple task queues and can have - multiple event callbacks registered. Not all "derived classes" will use - this feature. 
*/ - -typedef enum grpc_libevent_task_type { - GRPC_EM_TASK_ALARM, - GRPC_EM_TASK_FD, - GRPC_EM_TASK_DO_NOT_USE -} grpc_libevent_task_type; - -/* Different activity types to shape the callback and queueing arrays */ -typedef enum grpc_em_task_activity_type { - GRPC_EM_TA_READ, /* use this also for single-type events */ - GRPC_EM_TA_WRITE, - GRPC_EM_TA_COUNT -} grpc_em_task_activity_type; - -/* Include the following #define for convenience for tasks like alarms that - only have a single type */ -#define GRPC_EM_TA_ONLY GRPC_EM_TA_READ - -typedef struct grpc_libevent_activation_data { - struct event *ev; /* event activated on this callback type */ - grpc_iomgr_cb_func cb; /* function pointer for callback */ - void *arg; /* argument passed to cb */ - - /* Hold the status associated with the callback when queued */ - grpc_iomgr_cb_status status; - /* Now set up to link activations into scheduler queues */ - struct grpc_libevent_activation_data *prev; - struct grpc_libevent_activation_data *next; -} grpc_libevent_activation_data; - -typedef struct grpc_libevent_task { - grpc_libevent_task_type type; - - /* Now have an array of activation data elements: one for each activity - type that could get activated */ - grpc_libevent_activation_data activation[GRPC_EM_TA_COUNT]; -} grpc_libevent_task; - -/* Initialize *em_fd. - Requires fd is a non-blocking file descriptor. - - This takes ownership of closing fd. - - Requires: *em_fd uninitialized. fd is a non-blocking file descriptor. */ -grpc_fd *grpc_fd_create(int fd); - -/* Cause *em_fd no longer to be initialized and closes the underlying fd. - on_done is called when the underlying file descriptor is definitely close()d. - If on_done is NULL, no callback will be made. - Requires: *em_fd initialized; no outstanding notify_on_read or - notify_on_write. */ -void grpc_fd_destroy(grpc_fd *em_fd, grpc_iomgr_cb_func on_done, - void *user_data); - -/* Returns the file descriptor associated with *em_fd. 
*/ -int grpc_fd_get(grpc_fd *em_fd); - -/* Register read interest, causing read_cb to be called once when em_fd becomes - readable, on deadline specified by deadline, or on shutdown triggered by - grpc_fd_shutdown. - read_cb will be called with read_cb_arg when *em_fd becomes readable. - read_cb is Called with status of GRPC_CALLBACK_SUCCESS if readable, - GRPC_CALLBACK_TIMED_OUT if the call timed out, - and CANCELLED if the call was cancelled. - - Requires:This method must not be called before the read_cb for any previous - call runs. Edge triggered events are used whenever they are supported by the - underlying platform. This means that users must drain em_fd in read_cb before - calling notify_on_read again. Users are also expected to handle spurious - events, i.e read_cb is called while nothing can be readable from em_fd */ -int grpc_fd_notify_on_read(grpc_fd *em_fd, grpc_iomgr_cb_func read_cb, - void *read_cb_arg, gpr_timespec deadline); - -/* Exactly the same semantics as above, except based on writable events. */ -int grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb, - void *write_cb_arg, gpr_timespec deadline); - -/* Cause any current and all future read/write callbacks to error out with - GRPC_CALLBACK_CANCELLED. */ -void grpc_fd_shutdown(grpc_fd *em_fd); - -/* =================== Event caching =================== - In order to not miss or double-return edges in the context of edge triggering - and multithreading, we need a per-fd caching layer in the eventmanager itself - to cache relevant events. - - There are two types of events we care about: calls to notify_on_[read|write] - and readable/writable events for the socket from eventfd. There are separate - event caches for read and write. - - There are three states: - 0. "waiting" -- There's been a call to notify_on_[read|write] which has not - had a corresponding event. In other words, we're waiting for an event so we - can run the callback. - 1. 
"idle" -- We are neither waiting nor have a cached event. - 2. "cached" -- There has been a read/write event without a waiting callback, - so we want to run the event next time the application calls - notify_on_[read|write]. - - The high level state diagram: - - +--------------------------------------------------------------------+ - | WAITING | IDLE | CACHED | - | | | | - | 1. --*-> 2. --+-> 3. --+\ - | | | <--+/ - | | | | - x+-- 6. 5. <-+-- 4. <-*-- | - | | | | - +--------------------------------------------------------------------+ - - Transitions right occur on read|write events. Transitions left occur on - notify_on_[read|write] events. - State transitions: - 1. Read|Write event while waiting -> run the callback and transition to idle. - 2. Read|Write event while idle -> transition to cached. - 3. Read|Write event with one already cached -> still cached. - 4. notify_on_[read|write] with event cached: run callback and transition to - idle. - 5. notify_on_[read|write] when idle: Store callback and transition to - waiting. - 6. notify_on_[read|write] when waiting: invalid. */ - -typedef enum grpc_fd_state { - GRPC_FD_WAITING = 0, - GRPC_FD_IDLE = 1, - GRPC_FD_CACHED = 2 -} grpc_fd_state; - -/* gRPC file descriptor handle. - The handle is used to register read/write callbacks to a file descriptor */ -struct grpc_fd { - grpc_libevent_task task; /* Base class, callbacks, queues, etc */ - int fd; /* File descriptor */ - - /* Note that the shutdown event is only needed as a workaround for libevent - not properly handling event_active on an in flight event. */ - struct event *shutdown_ev; /* activated to trigger shutdown */ - - /* protect shutdown_started|read_state|write_state and ensure barriers - between notify_on_[read|write] and read|write callbacks */ - gpr_mu mu; - int shutdown_started; /* 0 -> shutdown not started, 1 -> started */ - grpc_fd_state read_state; - grpc_fd_state write_state; - - /* descriptor delete list. These are destroyed during polling. 
*/ - struct grpc_fd *next; - grpc_iomgr_cb_func on_done; - void *on_done_user_data; -}; - -void grpc_iomgr_ref_address_resolution(int delta); - -#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__ */ diff --git a/src/core/iomgr/iomgr_libevent_use_threads.c b/src/core/iomgr/iomgr_libevent_use_threads.c deleted file mode 100644 index af449342f0..0000000000 --- a/src/core/iomgr/iomgr_libevent_use_threads.c +++ /dev/null @@ -1,56 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -/* Posix grpc event manager support code. */ -#include -#include -#include - -static int error_code = 0; -static gpr_once threads_once = GPR_ONCE_INIT; -static void evthread_threads_initialize(void) { - error_code = evthread_use_pthreads(); - if (error_code) { - gpr_log(GPR_ERROR, "Failed to initialize libevent thread support!"); - } -} - -/* Notify LibEvent that Posix pthread is used. */ -int evthread_use_threads() { - gpr_once_init(&threads_once, &evthread_threads_initialize); - /* For Pthreads or Windows threads, Libevent provides simple APIs to set - mutexes and conditional variables to support cross thread operations. - For other platforms, LibEvent provide callback APIs to hook mutexes and - conditional variables. */ - return error_code; -} diff --git a/src/core/iomgr/iomgr_posix.c b/src/core/iomgr/iomgr_posix.c new file mode 100644 index 0000000000..ff9195ec1d --- /dev/null +++ b/src/core/iomgr/iomgr_posix.c @@ -0,0 +1,38 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. 
+ * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/iomgr/iomgr_posix.h" + +void grpc_iomgr_platform_init() { grpc_pollset_global_init(); } + +void grpc_iomgr_platform_shutdown() { grpc_pollset_global_shutdown(); } diff --git a/src/core/iomgr/iomgr_posix.h b/src/core/iomgr/iomgr_posix.h new file mode 100644 index 0000000000..ca5af3e527 --- /dev/null +++ b/src/core/iomgr/iomgr_posix.h @@ -0,0 +1,42 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. 
+ * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_POSIX_H_ +#define __GRPC_INTERNAL_IOMGR_IOMGR_POSIX_H_ + +#include "src/core/iomgr/iomgr_internal.h" + +void grpc_pollset_global_init(); +void grpc_pollset_global_shutdown(); + +#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_POSIX_H_ */ diff --git a/src/core/iomgr/pollset.c b/src/core/iomgr/pollset.c deleted file mode 100644 index 62a0019eb3..0000000000 --- a/src/core/iomgr/pollset.c +++ /dev/null @@ -1,37 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. 
- * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
- * - */ - -#include "src/core/iomgr/pollset.h" - -void grpc_pollset_init(grpc_pollset *pollset) { pollset->unused = 0; } -void grpc_pollset_destroy(grpc_pollset *pollset) {} diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h index ba1a9d5429..7374a4ec13 100644 --- a/src/core/iomgr/pollset.h +++ b/src/core/iomgr/pollset.h @@ -34,18 +34,31 @@ #ifndef __GRPC_INTERNAL_IOMGR_POLLSET_H_ #define __GRPC_INTERNAL_IOMGR_POLLSET_H_ +#include + /* A grpc_pollset is a set of file descriptors that a higher level item is interested in. For example: - a server will typically keep a pollset containing all connected channels, so that it can find new calls to service - a completion queue might keep a pollset with an entry for each transport that is servicing a call that it's tracking */ -/* Eventually different implementations of iomgr will provide their own - grpc_pollset structs. As this is just a dummy wrapper to get the API in, - we just define a simple type here. */ -typedef struct { char unused; } grpc_pollset; + +#ifdef GPR_POSIX_SOCKET +#include "src/core/iomgr/pollset_posix.h" +#endif void grpc_pollset_init(grpc_pollset *pollset); void grpc_pollset_destroy(grpc_pollset *pollset); +/* Do some work on a pollset. + May involve invoking asynchronous callbacks, or actually polling file + descriptors. + Requires GRPC_POLLSET_MU(pollset) locked. + May unlock GRPC_POLLSET_MU(pollset) during its execution. */ +int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline); + +/* Break a pollset out of polling work + Requires GRPC_POLLSET_MU(pollset) locked. */ +void grpc_pollset_kick(grpc_pollset *pollset); + #endif /* __GRPC_INTERNAL_IOMGR_POLLSET_H_ */ diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c new file mode 100644 index 0000000000..06c7a5a0dd --- /dev/null +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -0,0 +1,237 @@ +/* + * + * Copyright 2014, Google Inc. 
+ * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ * + */ + +#include + +#ifdef GPR_POSIX_MULTIPOLL_WITH_POLL + +#include "src/core/iomgr/pollset_posix.h" + +#include +#include +#include +#include + +#include "src/core/iomgr/fd_posix.h" +#include "src/core/iomgr/iomgr_internal.h" +#include +#include +#include + +typedef struct { + /* all polled fds */ + size_t fd_count; + size_t fd_capacity; + grpc_fd **fds; + /* fds being polled by the current poller: parallel arrays of pollfd and the + * grpc_fd* that the pollfd was constructed from */ + size_t pfd_count; + size_t pfd_capacity; + grpc_fd **selfds; + struct pollfd *pfds; + /* fds that have been removed from the pollset explicitly */ + size_t del_count; + size_t del_capacity; + grpc_fd **dels; +} pollset_hdr; + +static void multipoll_with_poll_pollset_add_fd(grpc_pollset *pollset, + grpc_fd *fd) { + size_t i; + pollset_hdr *h = pollset->data.ptr; + /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */ + for (i = 0; i < h->fd_count; i++) { + if (h->fds[i] == fd) return; + } + if (h->fd_count == h->fd_capacity) { + h->fd_capacity = GPR_MAX(h->fd_capacity + 8, h->fd_count * 3 / 2); + h->fds = gpr_realloc(h->fds, sizeof(grpc_fd *) * h->fd_capacity); + } + h->fds[h->fd_count++] = fd; + grpc_fd_ref(fd); +} + +static void multipoll_with_poll_pollset_del_fd(grpc_pollset *pollset, + grpc_fd *fd) { + /* will get removed next poll cycle */ + pollset_hdr *h = pollset->data.ptr; + if (h->del_count == h->del_capacity) { + h->del_capacity = GPR_MAX(h->del_capacity + 8, h->del_count * 3 / 2); + h->dels = gpr_realloc(h->dels, sizeof(grpc_fd *) * h->del_capacity); + } + h->dels[h->del_count++] = fd; + grpc_fd_ref(fd); +} + +static void end_polling(grpc_pollset *pollset) { + size_t i; + pollset_hdr *h; + h = pollset->data.ptr; + for (i = 1; i < h->pfd_count; i++) { + grpc_fd_end_poll(h->selfds[i], pollset); + } +} + +static int multipoll_with_poll_pollset_maybe_work( + grpc_pollset *pollset, gpr_timespec deadline, gpr_timespec now, + int 
allow_synchronous_callback) { + int timeout; + int r; + size_t i, np, nf, nd; + pollset_hdr *h; + + if (pollset->counter) { + return 0; + } + h = pollset->data.ptr; + if (gpr_time_cmp(deadline, gpr_inf_future) == 0) { + timeout = -1; + } else { + timeout = gpr_time_to_millis(gpr_time_sub(deadline, now)); + if (timeout <= 0) { + return 1; + } + } + if (h->pfd_capacity < h->fd_count + 1) { + h->pfd_capacity = GPR_MAX(h->pfd_capacity * 3 / 2, h->fd_count + 1); + gpr_free(h->pfds); + gpr_free(h->selfds); + h->pfds = gpr_malloc(sizeof(struct pollfd) * h->pfd_capacity); + h->selfds = gpr_malloc(sizeof(grpc_fd *) * h->pfd_capacity); + } + nf = 0; + np = 1; + h->pfds[0].fd = grpc_kick_read_fd(pollset); + h->pfds[0].events = POLLIN; + h->pfds[0].revents = POLLOUT; + for (i = 0; i < h->fd_count; i++) { + int remove = grpc_fd_is_orphaned(h->fds[i]); + for (nd = 0; nd < h->del_count; nd++) { + if (h->fds[i] == h->dels[nd]) remove = 1; + } + if (remove) { + grpc_fd_unref(h->fds[i]); + } else { + h->fds[nf++] = h->fds[i]; + h->pfds[np].events = + grpc_fd_begin_poll(h->fds[i], pollset, POLLIN, POLLOUT); + h->selfds[np] = h->fds[i]; + h->pfds[np].fd = h->fds[i]->fd; + h->pfds[np].revents = 0; + np++; + } + } + h->pfd_count = np; + h->fd_count = nf; + for (nd = 0; nd < h->del_count; nd++) { + grpc_fd_unref(h->dels[nd]); + } + h->del_count = 0; + if (h->pfd_count == 0) { + end_polling(pollset); + return 0; + } + pollset->counter = 1; + gpr_mu_unlock(&pollset->mu); + + r = poll(h->pfds, h->pfd_count, timeout); + if (r < 0) { + gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); + } else if (r == 0) { + /* do nothing */ + } else { + if (h->pfds[0].revents & POLLIN) { + grpc_kick_drain(pollset); + } + for (i = 1; i < np; i++) { + if (h->pfds[i].revents & POLLIN) { + grpc_fd_become_readable(h->selfds[i], allow_synchronous_callback); + } + if (h->pfds[i].revents & POLLOUT) { + grpc_fd_become_writable(h->selfds[i], allow_synchronous_callback); + } + } + } + end_polling(pollset); + + 
gpr_mu_lock(&pollset->mu); + pollset->counter = 0; + gpr_cv_broadcast(&pollset->cv); + return 1; +} + +static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) { + size_t i; + pollset_hdr *h = pollset->data.ptr; + GPR_ASSERT(pollset->counter == 0); + for (i = 0; i < h->fd_count; i++) { + grpc_fd_unref(h->fds[i]); + } + for (i = 0; i < h->del_count; i++) { + grpc_fd_unref(h->dels[i]); + } + gpr_free(h->pfds); + gpr_free(h->selfds); + gpr_free(h->fds); + gpr_free(h->dels); + gpr_free(h); +} + +static const grpc_pollset_vtable multipoll_with_poll_pollset = { + multipoll_with_poll_pollset_add_fd, multipoll_with_poll_pollset_del_fd, + multipoll_with_poll_pollset_maybe_work, + multipoll_with_poll_pollset_destroy}; + +void grpc_platform_become_multipoller(grpc_pollset *pollset, grpc_fd **fds, + size_t nfds) { + size_t i; + pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr)); + pollset->vtable = &multipoll_with_poll_pollset; + pollset->data.ptr = h; + h->fd_count = nfds; + h->fd_capacity = nfds; + h->fds = gpr_malloc(nfds * sizeof(grpc_fd *)); + h->pfd_count = 0; + h->pfd_capacity = 0; + h->pfds = NULL; + h->selfds = NULL; + h->del_count = 0; + h->del_capacity = 0; + h->dels = NULL; + for (i = 0; i < nfds; i++) { + h->fds[i] = fds[i]; + grpc_fd_ref(fds[i]); + } +} + +#endif diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c new file mode 100644 index 0000000000..ba4031e11f --- /dev/null +++ b/src/core/iomgr/pollset_posix.c @@ -0,0 +1,340 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. 
+ * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/iomgr/pollset_posix.h" + +#include +#include +#include +#include +#include + +#include "src/core/iomgr/alarm_internal.h" +#include "src/core/iomgr/fd_posix.h" +#include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/socket_utils_posix.h" +#include +#include +#include +#include + +/* kick pipes: we keep a sharded set of pipes to allow breaking from poll. + Ideally this would be 1:1 with pollsets, but we'd like to avoid associating + full kernel objects with each pollset to keep them lightweight, so instead + keep a sharded set and allow associating a pollset with one of the shards. 
+ + TODO(ctiller): move this out from this file, and allow an eventfd + implementation on linux */ + +#define LOG2_KICK_SHARDS 6 +#define KICK_SHARDS (1 << LOG2_KICK_SHARDS) + +static int g_kick_pipes[KICK_SHARDS][2]; +static grpc_pollset g_backup_pollset; +static int g_shutdown_backup_poller; +static gpr_event g_backup_poller_done; + +static void backup_poller(void *p) { + gpr_timespec delta = gpr_time_from_millis(100); + gpr_timespec last_poll = gpr_now(); + + gpr_mu_lock(&g_backup_pollset.mu); + while (g_shutdown_backup_poller == 0) { + gpr_timespec next_poll = gpr_time_add(last_poll, delta); + grpc_pollset_work(&g_backup_pollset, next_poll); + gpr_mu_unlock(&g_backup_pollset.mu); + gpr_sleep_until(next_poll); + gpr_mu_lock(&g_backup_pollset.mu); + last_poll = next_poll; + } + gpr_mu_unlock(&g_backup_pollset.mu); + + gpr_event_set(&g_backup_poller_done, (void *)1); +} + +static size_t kick_shard(const grpc_pollset *info) { + size_t x = (size_t)info; + return ((x >> 4) ^ (x >> 9) ^ (x >> 14)) & (KICK_SHARDS - 1); +} + +int grpc_kick_read_fd(grpc_pollset *p) { + return g_kick_pipes[kick_shard(p)][0]; +} + +static int grpc_kick_write_fd(grpc_pollset *p) { + return g_kick_pipes[kick_shard(p)][1]; +} + +void grpc_pollset_force_kick(grpc_pollset *p) { + char c = 0; + while (write(grpc_kick_write_fd(p), &c, 1) != 1 && errno == EINTR) + ; +} + +void grpc_pollset_kick(grpc_pollset *p) { + if (!p->counter) return; + grpc_pollset_force_kick(p); +} + +void grpc_kick_drain(grpc_pollset *p) { + int fd = grpc_kick_read_fd(p); + char buf[128]; + int r; + + for (;;) { + r = read(fd, buf, sizeof(buf)); + if (r > 0) continue; + if (r == 0) return; + switch (errno) { + case EAGAIN: + return; + case EINTR: + continue; + default: + gpr_log(GPR_ERROR, "error reading pipe: %s", strerror(errno)); + return; + } + } +} + +/* global state management */ + +grpc_pollset *grpc_backup_pollset() { return &g_backup_pollset; } + +void grpc_pollset_global_init() { + int i; + gpr_thd_id id; + + /* 
initialize the kick shards */ + for (i = 0; i < KICK_SHARDS; i++) { + GPR_ASSERT(0 == pipe(g_kick_pipes[i])); + GPR_ASSERT(grpc_set_socket_nonblocking(g_kick_pipes[i][0], 1)); + GPR_ASSERT(grpc_set_socket_nonblocking(g_kick_pipes[i][1], 1)); + } + + /* initialize the backup pollset */ + grpc_pollset_init(&g_backup_pollset); + + /* start the backup poller thread */ + g_shutdown_backup_poller = 0; + gpr_event_init(&g_backup_poller_done); + gpr_thd_new(&id, backup_poller, NULL, NULL); +} + +void grpc_pollset_global_shutdown() { + int i; + + /* terminate the backup poller thread */ + gpr_mu_lock(&g_backup_pollset.mu); + g_shutdown_backup_poller = 1; + gpr_mu_unlock(&g_backup_pollset.mu); + gpr_event_wait(&g_backup_poller_done, gpr_inf_future); + + /* destroy the backup pollset */ + grpc_pollset_destroy(&g_backup_pollset); + + /* destroy the kick shards */ + for (i = 0; i < KICK_SHARDS; i++) { + close(g_kick_pipes[i][0]); + close(g_kick_pipes[i][1]); + } +} + +/* main interface */ + +static void become_empty_pollset(grpc_pollset *pollset); +static void become_unary_pollset(grpc_pollset *pollset, grpc_fd *fd); + +void grpc_pollset_init(grpc_pollset *pollset) { + gpr_mu_init(&pollset->mu); + gpr_cv_init(&pollset->cv); + become_empty_pollset(pollset); +} + +void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { + gpr_mu_lock(&pollset->mu); + pollset->vtable->add_fd(pollset, fd); + gpr_cv_broadcast(&pollset->cv); + gpr_mu_unlock(&pollset->mu); +} + +void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) { + gpr_mu_lock(&pollset->mu); + pollset->vtable->del_fd(pollset, fd); + gpr_cv_broadcast(&pollset->cv); + gpr_mu_unlock(&pollset->mu); +} + +int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) { + /* pollset->mu already held */ + gpr_timespec now; + now = gpr_now(); + if (gpr_time_cmp(now, deadline) > 0) { + return 0; + } + if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1)) { + return 1; + } + if (grpc_alarm_check(&pollset->mu, now, 
&deadline)) { + return 1; + } + return pollset->vtable->maybe_work(pollset, deadline, now, 1); +} + +void grpc_pollset_destroy(grpc_pollset *pollset) { + pollset->vtable->destroy(pollset); + gpr_mu_destroy(&pollset->mu); + gpr_cv_destroy(&pollset->cv); +} + +/* + * empty_pollset - a vtable that provides polling for NO file descriptors + */ + +static void empty_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { + become_unary_pollset(pollset, fd); +} + +static void empty_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) {} + +static int empty_pollset_maybe_work(grpc_pollset *pollset, + gpr_timespec deadline, gpr_timespec now, + int allow_synchronous_callback) { + return 0; +} + +static void empty_pollset_destroy(grpc_pollset *pollset) {} + +static const grpc_pollset_vtable empty_pollset = { + empty_pollset_add_fd, empty_pollset_del_fd, empty_pollset_maybe_work, + empty_pollset_destroy}; + +static void become_empty_pollset(grpc_pollset *pollset) { + pollset->vtable = &empty_pollset; +} + +/* + * unary_poll_pollset - a vtable that provides polling for one file descriptor + * via poll() + */ + +static void unary_poll_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { + grpc_fd *fds[2]; + if (fd == pollset->data.ptr) return; + fds[0] = pollset->data.ptr; + fds[1] = fd; + grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds)); + grpc_fd_unref(fds[0]); +} + +static void unary_poll_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) { + if (fd == pollset->data.ptr) { + grpc_fd_unref(pollset->data.ptr); + become_empty_pollset(pollset); + } +} + +static int unary_poll_pollset_maybe_work(grpc_pollset *pollset, + gpr_timespec deadline, + gpr_timespec now, + int allow_synchronous_callback) { + struct pollfd pfd[2]; + grpc_fd *fd; + int timeout; + int r; + + if (pollset->counter) { + return 0; + } + fd = pollset->data.ptr; + if (grpc_fd_is_orphaned(fd)) { + grpc_fd_unref(fd); + become_empty_pollset(pollset); + return 0; + } + if (gpr_time_cmp(deadline, 
gpr_inf_future) == 0) { + timeout = -1; + } else { + timeout = gpr_time_to_millis(gpr_time_sub(deadline, now)); + if (timeout <= 0) { + return 1; + } + } + pfd[0].fd = grpc_kick_read_fd(pollset); + pfd[0].events = POLLIN; + pfd[0].revents = 0; + pfd[1].fd = fd->fd; + pfd[1].events = grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT); + pfd[1].revents = 0; + pollset->counter = 1; + gpr_mu_unlock(&pollset->mu); + + r = poll(pfd, GPR_ARRAY_SIZE(pfd), timeout); + if (r < 0) { + gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); + } else if (r == 0) { + /* do nothing */ + } else { + if (pfd[0].revents & POLLIN) { + grpc_kick_drain(pollset); + } + if (pfd[1].revents & POLLIN) { + grpc_fd_become_readable(fd, allow_synchronous_callback); + } + if (pfd[1].revents & POLLOUT) { + grpc_fd_become_writable(fd, allow_synchronous_callback); + } + } + + gpr_mu_lock(&pollset->mu); + grpc_fd_end_poll(fd, pollset); + pollset->counter = 0; + gpr_cv_broadcast(&pollset->cv); + return 1; +} + +static void unary_poll_pollset_destroy(grpc_pollset *pollset) { + GPR_ASSERT(pollset->counter == 0); + grpc_fd_unref(pollset->data.ptr); +} + +static const grpc_pollset_vtable unary_poll_pollset = { + unary_poll_pollset_add_fd, unary_poll_pollset_del_fd, + unary_poll_pollset_maybe_work, unary_poll_pollset_destroy}; + +static void become_unary_pollset(grpc_pollset *pollset, grpc_fd *fd) { + pollset->vtable = &unary_poll_pollset; + pollset->counter = 0; + pollset->data.ptr = fd; + grpc_fd_ref(fd); +} diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h new file mode 100644 index 0000000000..f051079f5b --- /dev/null +++ b/src/core/iomgr/pollset_posix.h @@ -0,0 +1,95 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. 
+ * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_POLLSET_POSIX_H_ +#define __GRPC_INTERNAL_IOMGR_POLLSET_POSIX_H_ + +#include + +typedef struct grpc_pollset_vtable grpc_pollset_vtable; + +/* forward declare only in this file to avoid leaking impl details via + pollset.h; real users of grpc_fd should always include 'fd_posix.h' and not + use the struct tag */ +struct grpc_fd; + +typedef struct grpc_pollset { + /* pollsets under posix can mutate representation as fds are added and + removed. + For example, we may choose a poll() based implementation on linux for + few fds, and an epoll() based implementation for many fds */ + const grpc_pollset_vtable *vtable; + gpr_mu mu; + gpr_cv cv; + int counter; + union { + int fd; + void *ptr; + } data; +} grpc_pollset; + +struct grpc_pollset_vtable { + void (*add_fd)(grpc_pollset *pollset, struct grpc_fd *fd); + void (*del_fd)(grpc_pollset *pollset, struct grpc_fd *fd); + int (*maybe_work)(grpc_pollset *pollset, gpr_timespec deadline, + gpr_timespec now, int allow_synchronous_callback); + void (*destroy)(grpc_pollset *pollset); +}; + +#define GRPC_POLLSET_MU(pollset) (&(pollset)->mu) +#define GRPC_POLLSET_CV(pollset) (&(pollset)->cv) + +/* Add an fd to a pollset */ +void grpc_pollset_add_fd(grpc_pollset *pollset, struct grpc_fd *fd); +/* Force remove an fd from a pollset (normally they are removed on the next + poll after an fd is orphaned) */ +void grpc_pollset_del_fd(grpc_pollset *pollset, struct grpc_fd *fd); + +/* Force any current pollers to break polling */ +void grpc_pollset_force_kick(grpc_pollset *pollset); +/* Returns the fd to listen on for kicks */ +int grpc_kick_read_fd(grpc_pollset *p); +/* Call after polling has been kicked to leave the kicked state */ +void grpc_kick_drain(grpc_pollset *p); + +/* All fds get added to a backup pollset to ensure that progress is made + regardless of applications listening to events. 
Relying on this is slow + however (the backup pollset only listens every 100ms or so) - so it's not + to be relied on. */ +grpc_pollset *grpc_backup_pollset(); + +/* turn a pollset into a multipoller: platform specific */ +void grpc_platform_become_multipoller(grpc_pollset *pollset, + struct grpc_fd **fds, size_t fd_count); + +#endif /* __GRPC_INTERNAL_IOMGR_POLLSET_POSIX_H_ */ diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c index a0a04297eb..c9c2c5378a 100644 --- a/src/core/iomgr/resolve_address_posix.c +++ b/src/core/iomgr/resolve_address_posix.c @@ -41,7 +41,7 @@ #include #include -#include "src/core/iomgr/iomgr_libevent.h" +#include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/sockaddr_utils.h" #include "src/core/iomgr/socket_utils_posix.h" #include @@ -201,7 +201,7 @@ static void do_request(void *rp) { gpr_free(r->default_port); gpr_free(r); cb(arg, resolved); - grpc_iomgr_ref_address_resolution(-1); + grpc_iomgr_unref(); } void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) { @@ -213,7 +213,7 @@ void grpc_resolve_address(const char *name, const char *default_port, grpc_resolve_cb cb, void *arg) { request *r = gpr_malloc(sizeof(request)); gpr_thd_id id; - grpc_iomgr_ref_address_resolution(1); + grpc_iomgr_ref(); r->name = gpr_strdup(name); r->default_port = gpr_strdup(default_port); r->cb = cb; diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index 88b599b582..d675c2dcec 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -38,7 +38,9 @@ #include #include -#include "src/core/iomgr/iomgr_libevent.h" +#include "src/core/iomgr/alarm.h" +#include "src/core/iomgr/iomgr_posix.h" +#include "src/core/iomgr/pollset_posix.h" #include "src/core/iomgr/sockaddr_utils.h" #include "src/core/iomgr/socket_utils_posix.h" #include "src/core/iomgr/tcp_posix.h" @@ -49,8 +51,11 @@ typedef struct { void (*cb)(void *arg, grpc_endpoint 
*tcp); void *cb_arg; + gpr_mu mu; grpc_fd *fd; gpr_timespec deadline; + grpc_alarm alarm; + int refs; } async_connect; static int prepare_socket(int fd) { @@ -74,21 +79,42 @@ error: return 0; } -static void on_writable(void *acp, grpc_iomgr_cb_status status) { +static void on_alarm(void *acp, int success) { + int done; + async_connect *ac = acp; + gpr_mu_lock(&ac->mu); + if (ac->fd != NULL && success) { + grpc_fd_shutdown(ac->fd); + } + done = (--ac->refs == 0); + gpr_mu_unlock(&ac->mu); + if (done) { + gpr_mu_destroy(&ac->mu); + gpr_free(ac); + } +} + +static void on_writable(void *acp, int success) { async_connect *ac = acp; int so_error = 0; socklen_t so_error_size; int err; - int fd = grpc_fd_get(ac->fd); + int fd = ac->fd->fd; + int done; + grpc_endpoint *ep = NULL; + void (*cb)(void *arg, grpc_endpoint *tcp) = ac->cb; + void *cb_arg = ac->cb_arg; + + grpc_alarm_cancel(&ac->alarm); - if (status == GRPC_CALLBACK_SUCCESS) { + if (success) { do { so_error_size = sizeof(so_error); err = getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &so_error_size); } while (err < 0 && errno == EINTR); if (err < 0) { gpr_log(GPR_ERROR, "getsockopt(ERROR): %s", strerror(errno)); - goto error; + goto finish; } else if (so_error != 0) { if (so_error == ENOBUFS) { /* We will get one of these errors if we have run out of @@ -106,7 +132,7 @@ static void on_writable(void *acp, grpc_iomgr_cb_status status) { opened too many network connections. The "easy" fix: don't do that! 
*/ gpr_log(GPR_ERROR, "kernel out of buffers"); - grpc_fd_notify_on_write(ac->fd, on_writable, ac, ac->deadline); + grpc_fd_notify_on_write(ac->fd, on_writable, ac); return; } else { switch (so_error) { @@ -117,27 +143,31 @@ static void on_writable(void *acp, grpc_iomgr_cb_status status) { gpr_log(GPR_ERROR, "socket error: %d", so_error); break; } - goto error; + goto finish; } } else { - goto great_success; + ep = grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE); + goto finish; } } else { - gpr_log(GPR_ERROR, "on_writable failed during connect: status=%d", status); - goto error; + gpr_log(GPR_ERROR, "on_writable failed during connect"); + goto finish; } abort(); -error: - ac->cb(ac->cb_arg, NULL); - grpc_fd_destroy(ac->fd, NULL, NULL); - gpr_free(ac); - return; - -great_success: - ac->cb(ac->cb_arg, grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); - gpr_free(ac); +finish: + gpr_mu_lock(&ac->mu); + if (!ep) { + grpc_fd_orphan(ac->fd, NULL, NULL); + } + done = (--ac->refs == 0); + gpr_mu_unlock(&ac->mu); + if (done) { + gpr_mu_destroy(&ac->mu); + gpr_free(ac); + } + cb(cb_arg, ep); } void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), @@ -176,6 +206,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), } while (err < 0 && errno == EINTR); if (err >= 0) { + gpr_log(GPR_DEBUG, "instant connect"); cb(arg, grpc_tcp_create(grpc_fd_create(fd), GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); return; @@ -191,7 +222,10 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), ac = gpr_malloc(sizeof(async_connect)); ac->cb = cb; ac->cb_arg = arg; - ac->deadline = deadline; ac->fd = grpc_fd_create(fd); - grpc_fd_notify_on_write(ac->fd, on_writable, ac, deadline); + gpr_mu_init(&ac->mu); + ac->refs = 2; + + grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now()); + grpc_fd_notify_on_write(ac->fd, on_writable, ac); } diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 
bc3ce69e47..657f34aaf9 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -255,18 +255,14 @@ typedef struct { grpc_endpoint_read_cb read_cb; void *read_user_data; - gpr_timespec read_deadline; grpc_endpoint_write_cb write_cb; void *write_user_data; - gpr_timespec write_deadline; grpc_tcp_slice_state write_state; } grpc_tcp; -static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, - grpc_iomgr_cb_status status); -static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, - grpc_iomgr_cb_status status); +static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success); +static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success); static void grpc_tcp_shutdown(grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; @@ -276,7 +272,7 @@ static void grpc_tcp_shutdown(grpc_endpoint *ep) { static void grpc_tcp_unref(grpc_tcp *tcp) { int refcount_zero = gpr_unref(&tcp->refcount); if (refcount_zero) { - grpc_fd_destroy(tcp->em_fd, NULL, NULL); + grpc_fd_orphan(tcp->em_fd, NULL, NULL); gpr_free(tcp); } } @@ -308,8 +304,7 @@ static void call_read_cb(grpc_tcp *tcp, gpr_slice *slices, size_t nslices, #define INLINE_SLICE_BUFFER_SIZE 8 #define MAX_READ_IOVEC 4 -static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, - grpc_iomgr_cb_status status) { +static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) { grpc_tcp *tcp = (grpc_tcp *)arg; int iov_size = 1; gpr_slice static_read_slices[INLINE_SLICE_BUFFER_SIZE]; @@ -324,18 +319,12 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, slice_state_init(&read_state, static_read_slices, INLINE_SLICE_BUFFER_SIZE, 0); - if (status == GRPC_CALLBACK_CANCELLED) { + if (!success) { call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN); grpc_tcp_unref(tcp); return; } - if (status == GRPC_CALLBACK_TIMED_OUT) { - call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_TIMED_OUT); - grpc_tcp_unref(tcp); - return; - } - /* TODO(klempner): Limit the amount we read at once. 
*/ for (;;) { allocated_bytes = slice_state_append_blocks_into_iovec( @@ -377,8 +366,7 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, } else { /* Spurious read event, consume it here */ slice_state_destroy(&read_state); - grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp, - tcp->read_deadline); + grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp); } } else { /* TODO(klempner): Log interesting errors */ @@ -407,14 +395,13 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, } static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, - void *user_data, gpr_timespec deadline) { + void *user_data) { grpc_tcp *tcp = (grpc_tcp *)ep; GPR_ASSERT(tcp->read_cb == NULL); tcp->read_cb = cb; tcp->read_user_data = user_data; - tcp->read_deadline = deadline; gpr_ref(&tcp->refcount); - grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp, deadline); + grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp); } #define MAX_WRITE_IOVEC 16 @@ -460,34 +447,24 @@ static grpc_endpoint_write_status grpc_tcp_flush(grpc_tcp *tcp) { }; } -static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, - grpc_iomgr_cb_status status) { +static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success) { grpc_tcp *tcp = (grpc_tcp *)arg; grpc_endpoint_write_status write_status; grpc_endpoint_cb_status cb_status; grpc_endpoint_write_cb cb; - cb_status = GRPC_ENDPOINT_CB_OK; - - if (status == GRPC_CALLBACK_CANCELLED) { - cb_status = GRPC_ENDPOINT_CB_SHUTDOWN; - } else if (status == GRPC_CALLBACK_TIMED_OUT) { - cb_status = GRPC_ENDPOINT_CB_TIMED_OUT; - } - - if (cb_status != GRPC_ENDPOINT_CB_OK) { + if (!success) { slice_state_destroy(&tcp->write_state); cb = tcp->write_cb; tcp->write_cb = NULL; - cb(tcp->write_user_data, cb_status); + cb(tcp->write_user_data, GRPC_ENDPOINT_CB_SHUTDOWN); grpc_tcp_unref(tcp); return; } write_status = grpc_tcp_flush(tcp); if (write_status == GRPC_ENDPOINT_WRITE_PENDING) { - 
grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp, - tcp->write_deadline); + grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp); } else { slice_state_destroy(&tcp->write_state); if (write_status == GRPC_ENDPOINT_WRITE_DONE) { @@ -502,9 +479,11 @@ static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, } } -static grpc_endpoint_write_status grpc_tcp_write( - grpc_endpoint *ep, gpr_slice *slices, size_t nslices, - grpc_endpoint_write_cb cb, void *user_data, gpr_timespec deadline) { +static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep, + gpr_slice *slices, + size_t nslices, + grpc_endpoint_write_cb cb, + void *user_data) { grpc_tcp *tcp = (grpc_tcp *)ep; grpc_endpoint_write_status status; @@ -530,17 +509,15 @@ static grpc_endpoint_write_status grpc_tcp_write( gpr_ref(&tcp->refcount); tcp->write_cb = cb; tcp->write_user_data = user_data; - tcp->write_deadline = deadline; - grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp, - tcp->write_deadline); + grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp); } return status; } static void grpc_tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { - /* tickle the pollset so we crash if things aren't wired correctly */ - pollset->unused++; + grpc_tcp *tcp = (grpc_tcp *)ep; + grpc_pollset_add_fd(pollset, tcp->em_fd); } static const grpc_endpoint_vtable vtable = { @@ -550,14 +527,12 @@ static const grpc_endpoint_vtable vtable = { grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) { grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); tcp->base.vtable = &vtable; - tcp->fd = grpc_fd_get(em_fd); + tcp->fd = em_fd->fd; tcp->read_cb = NULL; tcp->write_cb = NULL; tcp->read_user_data = NULL; tcp->write_user_data = NULL; tcp->slice_size = slice_size; - tcp->read_deadline = gpr_inf_future; - tcp->write_deadline = gpr_inf_future; slice_state_init(&tcp->write_state, NULL, 0, 0); /* paired with unref in grpc_tcp_destroy */ 
gpr_ref_init(&tcp->refcount, 1); diff --git a/src/core/iomgr/tcp_posix.h b/src/core/iomgr/tcp_posix.h index 830394d534..c3eef1b4b7 100644 --- a/src/core/iomgr/tcp_posix.h +++ b/src/core/iomgr/tcp_posix.h @@ -45,7 +45,7 @@ */ #include "src/core/iomgr/endpoint.h" -#include "src/core/iomgr/iomgr_libevent.h" +#include "src/core/iomgr/fd_posix.h" #define GRPC_TCP_DEFAULT_READ_SLICE_SIZE 8192 diff --git a/src/core/iomgr/tcp_server.h b/src/core/iomgr/tcp_server.h index 46fba13f90..1968246b75 100644 --- a/src/core/iomgr/tcp_server.h +++ b/src/core/iomgr/tcp_server.h @@ -49,8 +49,8 @@ typedef void (*grpc_tcp_server_cb)(void *arg, grpc_endpoint *ep); grpc_tcp_server *grpc_tcp_server_create(); /* Start listening to bound ports */ -void grpc_tcp_server_start(grpc_tcp_server *server, grpc_tcp_server_cb cb, - void *cb_arg); +void grpc_tcp_server_start(grpc_tcp_server *server, grpc_pollset *pollset, + grpc_tcp_server_cb cb, void *cb_arg); /* Add a port to the server, returning true on success, or false otherwise. 
diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 2abaf15ce4..5ed517748a 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -45,7 +45,7 @@ #include #include -#include "src/core/iomgr/iomgr_libevent.h" +#include "src/core/iomgr/pollset_posix.h" #include "src/core/iomgr/sockaddr_utils.h" #include "src/core/iomgr/socket_utils_posix.h" #include "src/core/iomgr/tcp_posix.h" @@ -97,13 +97,8 @@ grpc_tcp_server *grpc_tcp_server_create() { return s; } -static void done_destroy(void *p, grpc_iomgr_cb_status status) { - gpr_event_set(p, (void *)1); -} - void grpc_tcp_server_destroy(grpc_tcp_server *s) { size_t i; - gpr_event fd_done; gpr_mu_lock(&s->mu); /* shutdown all fd's */ for (i = 0; i < s->nports; i++) { @@ -118,9 +113,7 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s) { /* delete ALL the things */ for (i = 0; i < s->nports; i++) { server_port *sp = &s->ports[i]; - gpr_event_init(&fd_done); - grpc_fd_destroy(sp->emfd, done_destroy, &fd_done); - gpr_event_wait(&fd_done, gpr_inf_future); + grpc_fd_orphan(sp->emfd, NULL, NULL); } gpr_free(s->ports); gpr_free(s); @@ -196,10 +189,10 @@ error: } /* event manager callback when reads are ready */ -static void on_read(void *arg, grpc_iomgr_cb_status status) { +static void on_read(void *arg, int success) { server_port *sp = arg; - if (status != GRPC_CALLBACK_SUCCESS) { + if (!success) { goto error; } @@ -215,7 +208,7 @@ static void on_read(void *arg, grpc_iomgr_cb_status status) { case EINTR: continue; case EAGAIN: - grpc_fd_notify_on_read(sp->emfd, on_read, sp, gpr_inf_future); + grpc_fd_notify_on_read(sp->emfd, on_read, sp); return; default: gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno)); @@ -254,15 +247,10 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd, s->ports = gpr_realloc(s->ports, sizeof(server_port *) * s->port_capacity); } sp = &s->ports[s->nports++]; - sp->emfd = grpc_fd_create(fd); - sp->fd = fd; sp->server = s; 
- /* initialize the em desc */ - if (sp->emfd == NULL) { - s->nports--; - gpr_mu_unlock(&s->mu); - return 0; - } + sp->fd = fd; + sp->emfd = grpc_fd_create(fd); + GPR_ASSERT(sp->emfd); gpr_mu_unlock(&s->mu); return 1; @@ -319,8 +307,8 @@ int grpc_tcp_server_get_fd(grpc_tcp_server *s, int index) { return (0 <= index && index < s->nports) ? s->ports[index].fd : -1; } -void grpc_tcp_server_start(grpc_tcp_server *s, grpc_tcp_server_cb cb, - void *cb_arg) { +void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset *pollset, + grpc_tcp_server_cb cb, void *cb_arg) { size_t i; GPR_ASSERT(cb); gpr_mu_lock(&s->mu); @@ -329,8 +317,10 @@ void grpc_tcp_server_start(grpc_tcp_server *s, grpc_tcp_server_cb cb, s->cb = cb; s->cb_arg = cb_arg; for (i = 0; i < s->nports; i++) { - grpc_fd_notify_on_read(s->ports[i].emfd, on_read, &s->ports[i], - gpr_inf_future); + if (pollset) { + grpc_pollset_add_fd(pollset, s->ports[i].emfd); + } + grpc_fd_notify_on_read(s->ports[i].emfd, on_read, &s->ports[i]); s->active_ports++; } gpr_mu_unlock(&s->mu); -- cgit v1.2.3