diff options
Diffstat (limited to 'src/core/iomgr')
23 files changed, 1473 insertions, 1025 deletions
diff --git a/src/core/iomgr/alarm.c b/src/core/iomgr/alarm.c index b7238f716a..2664879323 100644 --- a/src/core/iomgr/alarm.c +++ b/src/core/iomgr/alarm.c @@ -71,8 +71,8 @@ static shard_type g_shards[NUM_SHARDS]; /* Protected by g_mu */ static shard_type *g_shard_queue[NUM_SHARDS]; -static int run_some_expired_alarms(gpr_timespec now, - grpc_iomgr_cb_status status); +static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now, + gpr_timespec *next, int success); static gpr_timespec compute_min_deadline(shard_type *shard) { return grpc_alarm_heap_is_empty(&shard->heap) @@ -102,7 +102,7 @@ void grpc_alarm_list_init(gpr_timespec now) { void grpc_alarm_list_shutdown() { int i; - while (run_some_expired_alarms(gpr_inf_future, GRPC_CALLBACK_CANCELLED)) + while (run_some_expired_alarms(NULL, gpr_inf_future, NULL, 0)) ; for (i = 0; i < NUM_SHARDS; i++) { shard_type *shard = &g_shards[i]; @@ -233,7 +233,7 @@ void grpc_alarm_cancel(grpc_alarm *alarm) { gpr_mu_unlock(&shard->mu); if (triggered) { - alarm->cb(alarm->cb_arg, GRPC_CALLBACK_CANCELLED); + alarm->cb(alarm->cb_arg, 0); } } @@ -299,8 +299,8 @@ static size_t pop_alarms(shard_type *shard, gpr_timespec now, return n; } -static int run_some_expired_alarms(gpr_timespec now, - grpc_iomgr_cb_status status) { +static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now, + gpr_timespec *next, int success) { size_t n = 0; size_t i; grpc_alarm *alarms[MAX_ALARMS_PER_CHECK]; @@ -329,19 +329,35 @@ static int run_some_expired_alarms(gpr_timespec now, note_deadline_change(g_shard_queue[0]); } + if (next) { + *next = gpr_time_min(*next, g_shard_queue[0]->min_deadline); + } + gpr_mu_unlock(&g_mu); gpr_mu_unlock(&g_checker_mu); + } else if (next) { + gpr_mu_lock(&g_mu); + *next = gpr_time_min(*next, g_shard_queue[0]->min_deadline); + gpr_mu_unlock(&g_mu); + } + + if (n && drop_mu) { + gpr_mu_unlock(drop_mu); } for (i = 0; i < n; i++) { - alarms[i]->cb(alarms[i]->cb_arg, status); + alarms[i]->cb(alarms[i]->cb_arg, 
success); + } + + if (n && drop_mu) { + gpr_mu_lock(drop_mu); } return n; } -int grpc_alarm_check(gpr_timespec now) { - return run_some_expired_alarms(now, GRPC_CALLBACK_SUCCESS); +int grpc_alarm_check(gpr_mu *drop_mu, gpr_timespec now, gpr_timespec *next) { + return run_some_expired_alarms(drop_mu, now, next, 1); } gpr_timespec grpc_alarm_list_next_timeout() { diff --git a/src/core/iomgr/alarm_internal.h b/src/core/iomgr/alarm_internal.h index e605ff84f9..12b6ab4286 100644 --- a/src/core/iomgr/alarm_internal.h +++ b/src/core/iomgr/alarm_internal.h @@ -34,9 +34,12 @@ #ifndef __GRPC_INTERNAL_IOMGR_ALARM_INTERNAL_H_ #define __GRPC_INTERNAL_IOMGR_ALARM_INTERNAL_H_ +#include <grpc/support/sync.h> +#include <grpc/support/time.h> + /* iomgr internal api for dealing with alarms */ -int grpc_alarm_check(gpr_timespec now); +int grpc_alarm_check(gpr_mu *drop_mu, gpr_timespec now, gpr_timespec *next); void grpc_alarm_list_init(gpr_timespec now); void grpc_alarm_list_shutdown(); diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c index f1944bf672..9e5d56389d 100644 --- a/src/core/iomgr/endpoint.c +++ b/src/core/iomgr/endpoint.c @@ -34,14 +34,16 @@ #include "src/core/iomgr/endpoint.h" void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, - void *user_data, gpr_timespec deadline) { - ep->vtable->notify_on_read(ep, cb, user_data, deadline); + void *user_data) { + ep->vtable->notify_on_read(ep, cb, user_data); } -grpc_endpoint_write_status grpc_endpoint_write( - grpc_endpoint *ep, gpr_slice *slices, size_t nslices, - grpc_endpoint_write_cb cb, void *user_data, gpr_timespec deadline) { - return ep->vtable->write(ep, slices, nslices, cb, user_data, deadline); +grpc_endpoint_write_status grpc_endpoint_write(grpc_endpoint *ep, + gpr_slice *slices, + size_t nslices, + grpc_endpoint_write_cb cb, + void *user_data) { + return ep->vtable->write(ep, slices, nslices, cb, user_data); } void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset 
*pollset) { diff --git a/src/core/iomgr/endpoint.h b/src/core/iomgr/endpoint.h index bbd800bea8..ec86d9a146 100644 --- a/src/core/iomgr/endpoint.h +++ b/src/core/iomgr/endpoint.h @@ -48,8 +48,7 @@ typedef enum grpc_endpoint_cb_status { GRPC_ENDPOINT_CB_OK = 0, /* Call completed successfully */ GRPC_ENDPOINT_CB_EOF, /* Call completed successfully, end of file reached */ GRPC_ENDPOINT_CB_SHUTDOWN, /* Call interrupted by shutdown */ - GRPC_ENDPOINT_CB_ERROR, /* Call interrupted by socket error */ - GRPC_ENDPOINT_CB_TIMED_OUT /* Call timed out */ + GRPC_ENDPOINT_CB_ERROR /* Call interrupted by socket error */ } grpc_endpoint_cb_status; typedef enum grpc_endpoint_write_status { @@ -66,10 +65,10 @@ typedef void (*grpc_endpoint_write_cb)(void *user_data, struct grpc_endpoint_vtable { void (*notify_on_read)(grpc_endpoint *ep, grpc_endpoint_read_cb cb, - void *user_data, gpr_timespec deadline); + void *user_data); grpc_endpoint_write_status (*write)(grpc_endpoint *ep, gpr_slice *slices, size_t nslices, grpc_endpoint_write_cb cb, - void *user_data, gpr_timespec deadline); + void *user_data); void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset); void (*shutdown)(grpc_endpoint *ep); void (*destroy)(grpc_endpoint *ep); @@ -77,7 +76,7 @@ struct grpc_endpoint_vtable { /* When data is available on the connection, calls the callback with slices. */ void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, - void *user_data, gpr_timespec deadline); + void *user_data); /* Write slices out to the socket. @@ -85,9 +84,11 @@ void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, returns GRPC_ENDPOINT_WRITE_DONE. Otherwise it returns GRPC_ENDPOINT_WRITE_PENDING and calls cb when the connection is ready for more data. 
*/ -grpc_endpoint_write_status grpc_endpoint_write( - grpc_endpoint *ep, gpr_slice *slices, size_t nslices, - grpc_endpoint_write_cb cb, void *user_data, gpr_timespec deadline); +grpc_endpoint_write_status grpc_endpoint_write(grpc_endpoint *ep, + gpr_slice *slices, + size_t nslices, + grpc_endpoint_write_cb cb, + void *user_data); /* Causes any pending read/write callbacks to run immediately with GRPC_ENDPOINT_CB_SHUTDOWN status */ diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c new file mode 100644 index 0000000000..3cd2f9a8e0 --- /dev/null +++ b/src/core/iomgr/fd_posix.c @@ -0,0 +1,274 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/iomgr/fd_posix.h" + +#include <assert.h> +#include <unistd.h> + +#include "src/core/iomgr/iomgr_internal.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/useful.h> + +enum descriptor_state { NOT_READY, READY, WAITING }; + +static void destroy(grpc_fd *fd) { + grpc_iomgr_add_callback(fd->on_done, fd->on_done_user_data); + gpr_mu_destroy(&fd->set_state_mu); + gpr_free(fd->watchers); + gpr_free(fd); + grpc_iomgr_unref(); +} + +static void ref_by(grpc_fd *fd, int n) { + gpr_atm_no_barrier_fetch_add(&fd->refst, n); +} + +static void unref_by(grpc_fd *fd, int n) { + if (gpr_atm_full_fetch_add(&fd->refst, -n) == n) { + destroy(fd); + } +} + +static void do_nothing(void *ignored, int success) {} + +grpc_fd *grpc_fd_create(int fd) { + grpc_fd *r = gpr_malloc(sizeof(grpc_fd)); + grpc_iomgr_ref(); + gpr_atm_rel_store(&r->refst, 1); + gpr_atm_rel_store(&r->readst.state, NOT_READY); + gpr_atm_rel_store(&r->writest.state, NOT_READY); + gpr_mu_init(&r->set_state_mu); + gpr_mu_init(&r->watcher_mu); + gpr_atm_rel_store(&r->shutdown, 0); + r->fd = fd; + r->watchers = NULL; + r->watcher_count = 0; + r->watcher_capacity = 0; + grpc_pollset_add_fd(grpc_backup_pollset(), r); + return r; +} + +int grpc_fd_is_orphaned(grpc_fd *fd) { + return (gpr_atm_acq_load(&fd->refst) & 1) == 0; +} + +static void wake_watchers(grpc_fd *fd) { + size_t i, n; + gpr_mu_lock(&fd->watcher_mu); + n = 
fd->watcher_count; + for (i = 0; i < n; i++) { + grpc_pollset_force_kick(fd->watchers[i]); + } + gpr_mu_unlock(&fd->watcher_mu); +} + +void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) { + fd->on_done = on_done ? on_done : do_nothing; + fd->on_done_user_data = user_data; + ref_by(fd, 1); /* remove active status, but keep referenced */ + wake_watchers(fd); + close(fd->fd); + unref_by(fd, 2); /* drop the reference */ +} + +/* increment refcount by two to avoid changing the orphan bit */ +void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); } + +void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); } + +typedef struct { + grpc_iomgr_cb_func cb; + void *arg; +} callback; + +static void make_callback(grpc_iomgr_cb_func cb, void *arg, int success, + int allow_synchronous_callback) { + if (allow_synchronous_callback) { + cb(arg, success); + } else { + grpc_iomgr_add_delayed_callback(cb, arg, success); + } +} + +static void make_callbacks(callback *callbacks, size_t n, int success, + int allow_synchronous_callback) { + size_t i; + for (i = 0; i < n; i++) { + make_callback(callbacks[i].cb, callbacks[i].arg, success, + allow_synchronous_callback); + } +} + +static void notify_on(grpc_fd *fd, grpc_fd_state *st, grpc_iomgr_cb_func cb, + void *arg, int allow_synchronous_callback) { + switch ((enum descriptor_state)gpr_atm_acq_load(&st->state)) { + case NOT_READY: + /* There is no race if the descriptor is already ready, so we skip + the interlocked op in that case. As long as the app doesn't + try to set the same upcall twice (which it shouldn't) then + oldval should never be anything other than READY or NOT_READY. We + don't + check for user error on the fast path. */ + st->cb = cb; + st->cb_arg = arg; + if (gpr_atm_rel_cas(&st->state, NOT_READY, WAITING)) { + /* swap was successful -- the closure will run after the next + set_ready call. NOTE: we don't have an ABA problem here, + since we should never have concurrent calls to the same + notify_on function. 
*/ + wake_watchers(fd); + return; + } + /* swap was unsuccessful due to an intervening set_ready call. + Fall through to the READY code below */ + case READY: + assert(gpr_atm_acq_load(&st->state) == READY); + gpr_atm_rel_store(&st->state, NOT_READY); + make_callback(cb, arg, !gpr_atm_acq_load(&fd->shutdown), + allow_synchronous_callback); + return; + case WAITING: + /* upcallptr was set to a different closure. This is an error! */ + gpr_log(GPR_ERROR, + "User called a notify_on function with a previous callback still " + "pending"); + abort(); + } + gpr_log(GPR_ERROR, "Corrupt memory in &st->state"); + abort(); +} + +static void set_ready_locked(grpc_fd_state *st, callback *callbacks, + size_t *ncallbacks) { + callback *c; + + switch ((enum descriptor_state)gpr_atm_acq_load(&st->state)) { + case NOT_READY: + if (gpr_atm_rel_cas(&st->state, NOT_READY, READY)) { + /* swap was successful -- the closure will run after the next + notify_on call. */ + return; + } + /* swap was unsuccessful due to an intervening notify_on call. 
+ Fall through to the WAITING code below */ + case WAITING: + assert(gpr_atm_acq_load(&st->state) == WAITING); + c = &callbacks[(*ncallbacks)++]; + c->cb = st->cb; + c->arg = st->cb_arg; + gpr_atm_rel_store(&st->state, NOT_READY); + return; + case READY: + /* duplicate ready, ignore */ + return; + } +} + +static void set_ready(grpc_fd *fd, grpc_fd_state *st, + int allow_synchronous_callback) { + /* only one set_ready can be active at once (but there may be a racing + notify_on) */ + int success; + callback cb; + size_t ncb = 0; + gpr_mu_lock(&fd->set_state_mu); + set_ready_locked(st, &cb, &ncb); + gpr_mu_unlock(&fd->set_state_mu); + success = !gpr_atm_acq_load(&fd->shutdown); + make_callbacks(&cb, ncb, success, allow_synchronous_callback); +} + +void grpc_fd_shutdown(grpc_fd *fd) { + callback cb[2]; + size_t ncb = 0; + gpr_mu_lock(&fd->set_state_mu); + GPR_ASSERT(!gpr_atm_acq_load(&fd->shutdown)); + gpr_atm_rel_store(&fd->shutdown, 1); + set_ready_locked(&fd->readst, cb, &ncb); + set_ready_locked(&fd->writest, cb, &ncb); + gpr_mu_unlock(&fd->set_state_mu); + make_callbacks(cb, ncb, 0, 0); +} + +void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_cb_func read_cb, + void *read_cb_arg) { + notify_on(fd, &fd->readst, read_cb, read_cb_arg, 0); +} + +void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb, + void *write_cb_arg) { + notify_on(fd, &fd->writest, write_cb, write_cb_arg, 0); +} + +gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, + gpr_uint32 read_mask, gpr_uint32 write_mask) { + /* keep track of pollers that have requested our events, in case they change + */ + gpr_mu_lock(&fd->watcher_mu); + if (fd->watcher_capacity == fd->watcher_count) { + fd->watcher_capacity = + GPR_MAX(fd->watcher_capacity + 8, fd->watcher_capacity * 3 / 2); + fd->watchers = gpr_realloc(fd->watchers, + fd->watcher_capacity * sizeof(grpc_pollset *)); + } + fd->watchers[fd->watcher_count++] = pollset; + gpr_mu_unlock(&fd->watcher_mu); + + return 
(gpr_atm_acq_load(&fd->readst.state) != READY ? read_mask : 0) | + (gpr_atm_acq_load(&fd->writest.state) != READY ? write_mask : 0); +} + +void grpc_fd_end_poll(grpc_fd *fd, grpc_pollset *pollset) { + size_t r, w, n; + + gpr_mu_lock(&fd->watcher_mu); + n = fd->watcher_count; + for (r = 0, w = 0; r < n; r++) { + if (fd->watchers[r] == pollset) { + fd->watcher_count--; + continue; + } + fd->watchers[w++] = fd->watchers[r]; + } + gpr_mu_unlock(&fd->watcher_mu); +} + +void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback) { + set_ready(fd, &fd->readst, allow_synchronous_callback); +} + +void grpc_fd_become_writable(grpc_fd *fd, int allow_synchronous_callback) { + set_ready(fd, &fd->writest, allow_synchronous_callback); +} diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h new file mode 100644 index 0000000000..232de0c3e0 --- /dev/null +++ b/src/core/iomgr/fd_posix.h @@ -0,0 +1,138 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_FD_POSIX_H_ +#define __GRPC_INTERNAL_IOMGR_FD_POSIX_H_ + +#include "src/core/iomgr/iomgr.h" +#include "src/core/iomgr/pollset.h" +#include <grpc/support/atm.h> +#include <grpc/support/sync.h> +#include <grpc/support/time.h> + +typedef struct { + grpc_iomgr_cb_func cb; + void *cb_arg; + int success; + gpr_atm state; +} grpc_fd_state; + +typedef struct grpc_fd { + int fd; + /* refst format: + bit0: 1=active/0=orphaned + bit1-n: refcount + meaning that mostly we ref by two to avoid altering the orphaned bit, + and just unref by 1 when we're ready to flag the object as orphaned */ + gpr_atm refst; + + gpr_mu set_state_mu; + gpr_atm shutdown; + + gpr_mu watcher_mu; + grpc_pollset **watchers; + size_t watcher_count; + size_t watcher_capacity; + + grpc_fd_state readst; + grpc_fd_state writest; + + grpc_iomgr_cb_func on_done; + void *on_done_user_data; +} grpc_fd; + +/* Create a wrapped file descriptor. + Requires fd is a non-blocking file descriptor. + This takes ownership of closing fd. */ +grpc_fd *grpc_fd_create(int fd); + +/* Releases fd to be asynchronously destroyed. + on_done is called when the underlying file descriptor is definitely close()d. + If on_done is NULL, no callback will be made. + Requires: *fd initialized; no outstanding notify_on_read or + notify_on_write. */ +void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data); + +/* Begin polling on an fd. 
+ Registers that the given pollset is interested in this fd - so that if read + or writability interest changes, the pollset can be kicked to pick up that + new interest. + Return value is: + (fd_needs_read? read_mask : 0) | (fd_needs_write? write_mask : 0) + i.e. a combination of read_mask and write_mask determined by the fd's current + interest in said events. + Polling strategies that do not need to alter their behavior depending on the + fd's current interest (such as epoll) do not need to call this function. */ +gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, + gpr_uint32 read_mask, gpr_uint32 write_mask); +/* Complete polling previously started with grpc_fd_begin_poll */ +void grpc_fd_end_poll(grpc_fd *fd, grpc_pollset *pollset); + +/* Return 1 if this fd is orphaned, 0 otherwise */ +int grpc_fd_is_orphaned(grpc_fd *fd); + +/* Cause any current callbacks to be invoked with success == 0. */ +void grpc_fd_shutdown(grpc_fd *fd); + +/* Register read interest, causing read_cb to be called once when fd becomes + readable, or on shutdown triggered by + grpc_fd_shutdown. + read_cb will be called with read_cb_arg when *fd becomes readable. + read_cb is called with success == 1 if readable, + and with success == 0 if the fd was shut down + (see grpc_fd_shutdown). + + Requires: This method must not be called before the read_cb for any previous + call runs. Edge triggered events are used whenever they are supported by the + underlying platform. This means that users must drain fd in read_cb before + calling notify_on_read again. Users are also expected to handle spurious + events, i.e. read_cb is called while nothing can be readable from fd */ +void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_cb_func read_cb, + void *read_cb_arg); + +/* Exactly the same semantics as above, except based on writable events. 
*/ +void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb, + void *write_cb_arg); + +/* Notification from the poller to an fd that it has become readable or + writable. + If allow_synchronous_callback is 1, allow running the fd callback inline + in this callstack, otherwise register an asynchronous callback and return */ +void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback); +void grpc_fd_become_writable(grpc_fd *fd, int allow_synchronous_callback); + +/* Reference counting for fds */ +void grpc_fd_ref(grpc_fd *fd); +void grpc_fd_unref(grpc_fd *fd); + +#endif /* __GRPC_INTERNAL_IOMGR_FD_POSIX_H_ */ diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c new file mode 100644 index 0000000000..03f56a50a3 --- /dev/null +++ b/src/core/iomgr/iomgr.c @@ -0,0 +1,204 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/iomgr/iomgr.h" + +#include <stdlib.h> + +#include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/alarm_internal.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/thd.h> +#include <grpc/support/sync.h> + +typedef struct delayed_callback { + grpc_iomgr_cb_func cb; + void *cb_arg; + int success; + struct delayed_callback *next; +} delayed_callback; + +static gpr_mu g_mu; +static gpr_cv g_cv; +static delayed_callback *g_cbs_head = NULL; +static delayed_callback *g_cbs_tail = NULL; +static int g_shutdown; +static int g_refs; +static gpr_event g_background_callback_executor_done; + +/* Execute followup callbacks continuously. 
+ Other threads may check in and help during pollset_work() */ +static void background_callback_executor(void *ignored) { + gpr_mu_lock(&g_mu); + while (!g_shutdown) { + gpr_timespec deadline = gpr_inf_future; + if (g_cbs_head) { + delayed_callback *cb = g_cbs_head; + g_cbs_head = cb->next; + if (!g_cbs_head) g_cbs_tail = NULL; + gpr_mu_unlock(&g_mu); + cb->cb(cb->cb_arg, cb->success); + gpr_free(cb); + gpr_mu_lock(&g_mu); + } else if (grpc_alarm_check(&g_mu, gpr_now(), &deadline)) { + } else { + gpr_cv_wait(&g_cv, &g_mu, deadline); + } + } + gpr_mu_unlock(&g_mu); + gpr_event_set(&g_background_callback_executor_done, (void *)1); +} + +void grpc_kick_poller() { gpr_cv_broadcast(&g_cv); } + +void grpc_iomgr_init() { + gpr_thd_id id; + gpr_mu_init(&g_mu); + gpr_cv_init(&g_cv); + grpc_alarm_list_init(gpr_now()); + g_refs = 0; + grpc_iomgr_platform_init(); + gpr_event_init(&g_background_callback_executor_done); + gpr_thd_new(&id, background_callback_executor, NULL, NULL); +} + +void grpc_iomgr_shutdown() { + delayed_callback *cb; + gpr_timespec shutdown_deadline = + gpr_time_add(gpr_now(), gpr_time_from_seconds(10)); + + grpc_iomgr_platform_shutdown(); + + gpr_mu_lock(&g_mu); + g_shutdown = 1; + while (g_cbs_head || g_refs) { + gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed%s", g_refs, + g_cbs_head ? 
" and executing final callbacks" : ""); + while (g_cbs_head) { + cb = g_cbs_head; + g_cbs_head = cb->next; + if (!g_cbs_head) g_cbs_tail = NULL; + gpr_mu_unlock(&g_mu); + + cb->cb(cb->cb_arg, 0); + gpr_free(cb); + gpr_mu_lock(&g_mu); + } + if (g_refs) { + if (gpr_cv_wait(&g_cv, &g_mu, shutdown_deadline) && g_cbs_head == NULL) { + gpr_log(GPR_DEBUG, + "Failed to free %d iomgr objects before shutdown deadline: " + "memory leaks are likely", + g_refs); + break; + } + } + } + gpr_mu_unlock(&g_mu); + + gpr_event_wait(&g_background_callback_executor_done, gpr_inf_future); + + grpc_alarm_list_shutdown(); + gpr_mu_destroy(&g_mu); + gpr_cv_destroy(&g_cv); +} + +void grpc_iomgr_ref() { + gpr_mu_lock(&g_mu); + ++g_refs; + gpr_mu_unlock(&g_mu); +} + +void grpc_iomgr_unref() { + gpr_mu_lock(&g_mu); + if (0 == --g_refs) { + gpr_cv_signal(&g_cv); + } + gpr_mu_unlock(&g_mu); +} + +void grpc_iomgr_add_delayed_callback(grpc_iomgr_cb_func cb, void *cb_arg, + int success) { + delayed_callback *dcb = gpr_malloc(sizeof(delayed_callback)); + dcb->cb = cb; + dcb->cb_arg = cb_arg; + dcb->success = success; + gpr_mu_lock(&g_mu); + dcb->next = NULL; + if (!g_cbs_tail) { + g_cbs_head = g_cbs_tail = dcb; + } else { + g_cbs_tail->next = dcb; + g_cbs_tail = dcb; + } + gpr_cv_signal(&g_cv); + gpr_mu_unlock(&g_mu); +} + +void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg) { + grpc_iomgr_add_delayed_callback(cb, cb_arg, 1); +} + +int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) { + int n = 0; + gpr_mu *retake_mu = NULL; + delayed_callback *cb; + for (;;) { + /* check for new work */ + if (!gpr_mu_trylock(&g_mu)) { + break; + } + cb = g_cbs_head; + if (!cb) { + gpr_mu_unlock(&g_mu); + break; + } + g_cbs_head = cb->next; + if (!g_cbs_head) g_cbs_tail = NULL; + gpr_mu_unlock(&g_mu); + /* if we have a mutex to drop, do so before executing work */ + if (drop_mu) { + gpr_mu_unlock(drop_mu); + retake_mu = drop_mu; + drop_mu = NULL; + } + cb->cb(cb->cb_arg, success && 
cb->success); + gpr_free(cb); + n++; + } + if (retake_mu) { + gpr_mu_lock(retake_mu); + } + return n; +} diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h index cf39f947bc..16991a9b90 100644 --- a/src/core/iomgr/iomgr.h +++ b/src/core/iomgr/iomgr.h @@ -34,17 +34,8 @@ #ifndef __GRPC_INTERNAL_IOMGR_IOMGR_H__ #define __GRPC_INTERNAL_IOMGR_IOMGR_H__ -/* Status passed to callbacks for grpc_em_fd_notify_on_read and - grpc_em_fd_notify_on_write. */ -typedef enum grpc_em_cb_status { - GRPC_CALLBACK_SUCCESS = 0, - GRPC_CALLBACK_TIMED_OUT, - GRPC_CALLBACK_CANCELLED, - GRPC_CALLBACK_DO_NOT_USE -} grpc_iomgr_cb_status; - /* gRPC Callback definition */ -typedef void (*grpc_iomgr_cb_func)(void *arg, grpc_iomgr_cb_status status); +typedef void (*grpc_iomgr_cb_func)(void *arg, int success); void grpc_iomgr_init(); void grpc_iomgr_shutdown(); diff --git a/src/core/iomgr/iomgr_libevent_use_threads.c b/src/core/iomgr/iomgr_internal.h index af449342f0..5f72542777 100644 --- a/src/core/iomgr/iomgr_libevent_use_threads.c +++ b/src/core/iomgr/iomgr_internal.h @@ -31,26 +31,21 @@ * */ -/* Posix grpc event manager support code. 
*/ -#include <grpc/support/log.h> +#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_INTERNAL_H_ +#define __GRPC_INTERNAL_IOMGR_IOMGR_INTERNAL_H_ + +#include "src/core/iomgr/iomgr.h" +#include "src/core/iomgr/iomgr_internal.h" #include <grpc/support/sync.h> -#include <event2/thread.h> -static int error_code = 0; -static gpr_once threads_once = GPR_ONCE_INIT; -static void evthread_threads_initialize(void) { - error_code = evthread_use_pthreads(); - if (error_code) { - gpr_log(GPR_ERROR, "Failed to initialize libevent thread support!"); - } -} +int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success); +void grpc_iomgr_add_delayed_callback(grpc_iomgr_cb_func cb, void *cb_arg, + int success); + +void grpc_iomgr_ref(); +void grpc_iomgr_unref(); + +void grpc_iomgr_platform_init(); +void grpc_iomgr_platform_shutdown(); -/* Notify LibEvent that Posix pthread is used. */ -int evthread_use_threads() { - gpr_once_init(&threads_once, &evthread_threads_initialize); - /* For Pthreads or Windows threads, Libevent provides simple APIs to set - mutexes and conditional variables to support cross thread operations. - For other platforms, LibEvent provide callback APIs to hook mutexes and - conditional variables. */ - return error_code; -} +#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_INTERNAL_H_ */ diff --git a/src/core/iomgr/iomgr_libevent.c b/src/core/iomgr/iomgr_libevent.c deleted file mode 100644 index 6188ab2749..0000000000 --- a/src/core/iomgr/iomgr_libevent.c +++ /dev/null @@ -1,652 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. 
- * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
- * - */ - -#include "src/core/iomgr/iomgr_libevent.h" - -#include <unistd.h> -#include <fcntl.h> - -#include "src/core/iomgr/alarm.h" -#include "src/core/iomgr/alarm_internal.h" -#include <grpc/support/atm.h> -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/sync.h> -#include <grpc/support/thd.h> -#include <grpc/support/time.h> -#include <event2/event.h> -#include <event2/thread.h> - -#define ALARM_TRIGGER_INIT ((gpr_atm)0) -#define ALARM_TRIGGER_INCREMENT ((gpr_atm)1) -#define DONE_SHUTDOWN ((void *)1) - -#define POLLER_ID_INVALID ((gpr_atm)-1) - -/* Global data */ -struct event_base *g_event_base; -gpr_mu grpc_iomgr_mu; -gpr_cv grpc_iomgr_cv; -static grpc_libevent_activation_data *g_activation_queue; -static int g_num_pollers; -static int g_num_fds; -static int g_num_address_resolutions; -static gpr_timespec g_last_poll_completed; -static int g_shutdown_backup_poller; -static gpr_event g_backup_poller_done; -/* activated to break out of the event loop early */ -static struct event *g_timeout_ev; -/* activated to safely break polling from other threads */ -static struct event *g_break_ev; -static grpc_fd *g_fds_to_free; - -int evthread_use_threads(void); -static void grpc_fd_impl_destroy(grpc_fd *impl); - -void grpc_iomgr_ref_address_resolution(int delta) { - gpr_mu_lock(&grpc_iomgr_mu); - GPR_ASSERT(!g_shutdown_backup_poller); - g_num_address_resolutions += delta; - if (0 == g_num_address_resolutions) { - gpr_cv_broadcast(&grpc_iomgr_cv); - } - gpr_mu_unlock(&grpc_iomgr_mu); -} - -/* If anything is in the work queue, process one item and return 1. - Return 0 if there were no work items to complete. - Requires grpc_iomgr_mu locked, may unlock and relock during the call. 
*/ -static int maybe_do_queue_work() { - grpc_libevent_activation_data *work = g_activation_queue; - - if (work == NULL) return 0; - - if (work->next == work) { - g_activation_queue = NULL; - } else { - g_activation_queue = work->next; - g_activation_queue->prev = work->prev; - g_activation_queue->next->prev = g_activation_queue->prev->next = - g_activation_queue; - } - work->next = work->prev = NULL; - /* force status to cancelled from ok when shutting down */ - if (g_shutdown_backup_poller && work->status == GRPC_CALLBACK_SUCCESS) { - work->status = GRPC_CALLBACK_CANCELLED; - } - gpr_mu_unlock(&grpc_iomgr_mu); - - work->cb(work->arg, work->status); - - gpr_mu_lock(&grpc_iomgr_mu); - return 1; -} - -/* Break out of the event loop on timeout */ -static void timer_callback(int fd, short events, void *context) { - event_base_loopbreak((struct event_base *)context); -} - -static void break_callback(int fd, short events, void *context) { - event_base_loopbreak((struct event_base *)context); -} - -static void free_fd_list(grpc_fd *impl) { - while (impl != NULL) { - grpc_fd *current = impl; - impl = impl->next; - grpc_fd_impl_destroy(current); - current->on_done(current->on_done_user_data, GRPC_CALLBACK_SUCCESS); - gpr_free(current); - } -} - -static void maybe_free_fds() { - if (g_fds_to_free) { - free_fd_list(g_fds_to_free); - g_fds_to_free = NULL; - } -} - -void grpc_kick_poller() { event_active(g_break_ev, EV_READ, 0); } - -/* Spend some time doing polling and libevent maintenance work if no other - thread is. This includes both polling for events and destroying/closing file - descriptor objects. - Returns 1 if polling was performed, 0 otherwise. - Requires grpc_iomgr_mu locked, may unlock and relock during the call. 
*/ -static int maybe_do_polling_work(struct timeval delay) { - int status; - - if (g_num_pollers) return 0; - - g_num_pollers = 1; - - maybe_free_fds(); - - gpr_mu_unlock(&grpc_iomgr_mu); - - event_add(g_timeout_ev, &delay); - status = event_base_loop(g_event_base, EVLOOP_ONCE); - if (status < 0) { - gpr_log(GPR_ERROR, "event polling loop stops with error status %d", status); - } - event_del(g_timeout_ev); - - gpr_mu_lock(&grpc_iomgr_mu); - maybe_free_fds(); - - g_num_pollers = 0; - gpr_cv_broadcast(&grpc_iomgr_cv); - return 1; -} - -static int maybe_do_alarm_work(gpr_timespec now, gpr_timespec next) { - int r = 0; - if (gpr_time_cmp(next, now) < 0) { - gpr_mu_unlock(&grpc_iomgr_mu); - r = grpc_alarm_check(now); - gpr_mu_lock(&grpc_iomgr_mu); - } - return r; -} - -int grpc_iomgr_work(gpr_timespec deadline) { - gpr_timespec now = gpr_now(); - gpr_timespec next = grpc_alarm_list_next_timeout(); - gpr_timespec delay_timespec = gpr_time_sub(deadline, now); - /* poll for no longer than one second */ - gpr_timespec max_delay = gpr_time_from_seconds(1); - struct timeval delay; - - if (gpr_time_cmp(delay_timespec, gpr_time_0) <= 0) { - return 0; - } - - if (gpr_time_cmp(delay_timespec, max_delay) > 0) { - delay_timespec = max_delay; - } - - /* Adjust delay to account for the next alarm, if applicable. 
*/ - delay_timespec = gpr_time_min( - delay_timespec, gpr_time_sub(grpc_alarm_list_next_timeout(), now)); - - delay = gpr_timeval_from_timespec(delay_timespec); - - if (maybe_do_queue_work() || maybe_do_alarm_work(now, next) || - maybe_do_polling_work(delay)) { - g_last_poll_completed = gpr_now(); - return 1; - } - - return 0; -} - -static void backup_poller_thread(void *p) { - int backup_poller_engaged = 0; - /* allow no pollers for 100 milliseconds, then engage backup polling */ - gpr_timespec allow_no_pollers = gpr_time_from_millis(100); - - gpr_mu_lock(&grpc_iomgr_mu); - while (!g_shutdown_backup_poller) { - if (g_num_pollers == 0) { - gpr_timespec now = gpr_now(); - gpr_timespec time_until_engage = gpr_time_sub( - allow_no_pollers, gpr_time_sub(now, g_last_poll_completed)); - if (gpr_time_cmp(time_until_engage, gpr_time_0) <= 0) { - if (!backup_poller_engaged) { - gpr_log(GPR_DEBUG, "No pollers for a while - engaging backup poller"); - backup_poller_engaged = 1; - } - if (!maybe_do_queue_work()) { - gpr_timespec next = grpc_alarm_list_next_timeout(); - if (!maybe_do_alarm_work(now, next)) { - gpr_timespec deadline = - gpr_time_min(next, gpr_time_add(now, gpr_time_from_seconds(1))); - maybe_do_polling_work( - gpr_timeval_from_timespec(gpr_time_sub(deadline, now))); - } - } - } else { - if (backup_poller_engaged) { - gpr_log(GPR_DEBUG, "Backup poller disengaged"); - backup_poller_engaged = 0; - } - gpr_mu_unlock(&grpc_iomgr_mu); - gpr_sleep_until(gpr_time_add(now, time_until_engage)); - gpr_mu_lock(&grpc_iomgr_mu); - } - } else { - if (backup_poller_engaged) { - gpr_log(GPR_DEBUG, "Backup poller disengaged"); - backup_poller_engaged = 0; - } - gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, gpr_inf_future); - } - } - gpr_mu_unlock(&grpc_iomgr_mu); - - gpr_event_set(&g_backup_poller_done, (void *)1); -} - -void grpc_iomgr_init() { - gpr_thd_id backup_poller_id; - - if (evthread_use_threads() != 0) { - gpr_log(GPR_ERROR, "Failed to initialize libevent thread 
support!"); - abort(); - } - - grpc_alarm_list_init(gpr_now()); - - gpr_mu_init(&grpc_iomgr_mu); - gpr_cv_init(&grpc_iomgr_cv); - g_activation_queue = NULL; - g_num_pollers = 0; - g_num_fds = 0; - g_num_address_resolutions = 0; - g_last_poll_completed = gpr_now(); - g_shutdown_backup_poller = 0; - g_fds_to_free = NULL; - - gpr_event_init(&g_backup_poller_done); - - g_event_base = NULL; - g_timeout_ev = NULL; - g_break_ev = NULL; - - g_event_base = event_base_new(); - if (!g_event_base) { - gpr_log(GPR_ERROR, "Failed to create the event base"); - abort(); - } - - if (evthread_make_base_notifiable(g_event_base) != 0) { - gpr_log(GPR_ERROR, "Couldn't make event base notifiable cross threads!"); - abort(); - } - - g_timeout_ev = evtimer_new(g_event_base, timer_callback, g_event_base); - g_break_ev = event_new(g_event_base, -1, EV_READ | EV_PERSIST, break_callback, - g_event_base); - - event_add(g_break_ev, NULL); - - gpr_thd_new(&backup_poller_id, backup_poller_thread, NULL, NULL); -} - -void grpc_iomgr_shutdown() { - gpr_timespec fd_shutdown_deadline = - gpr_time_add(gpr_now(), gpr_time_from_seconds(10)); - - /* broadcast shutdown */ - gpr_mu_lock(&grpc_iomgr_mu); - while (g_num_fds > 0 || g_num_address_resolutions > 0) { - gpr_log(GPR_INFO, - "waiting for %d fds and %d name resolutions to be destroyed before " - "closing event manager", - g_num_fds, g_num_address_resolutions); - if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, fd_shutdown_deadline)) { - gpr_log(GPR_ERROR, - "not all fds or name resolutions destroyed before shutdown " - "deadline: memory leaks " - "are likely"); - break; - } else if (g_num_fds == 0 && g_num_address_resolutions == 0) { - gpr_log(GPR_INFO, "all fds closed, all name resolutions finished"); - } - } - - g_shutdown_backup_poller = 1; - gpr_cv_broadcast(&grpc_iomgr_cv); - gpr_mu_unlock(&grpc_iomgr_mu); - - gpr_event_wait(&g_backup_poller_done, gpr_inf_future); - - grpc_alarm_list_shutdown(); - - /* drain pending work */ - 
gpr_mu_lock(&grpc_iomgr_mu); - while (maybe_do_queue_work()) - ; - gpr_mu_unlock(&grpc_iomgr_mu); - - free_fd_list(g_fds_to_free); - - /* complete shutdown */ - gpr_mu_destroy(&grpc_iomgr_mu); - gpr_cv_destroy(&grpc_iomgr_cv); - - if (g_timeout_ev != NULL) { - event_free(g_timeout_ev); - } - - if (g_break_ev != NULL) { - event_free(g_break_ev); - } - - if (g_event_base != NULL) { - event_base_free(g_event_base); - g_event_base = NULL; - } -} - -static void add_task(grpc_libevent_activation_data *adata) { - gpr_mu_lock(&grpc_iomgr_mu); - if (g_activation_queue) { - adata->next = g_activation_queue; - adata->prev = adata->next->prev; - adata->next->prev = adata->prev->next = adata; - } else { - g_activation_queue = adata; - adata->next = adata->prev = adata; - } - gpr_cv_broadcast(&grpc_iomgr_cv); - gpr_mu_unlock(&grpc_iomgr_mu); -} - -static void grpc_fd_impl_destroy(grpc_fd *impl) { - grpc_em_task_activity_type type; - grpc_libevent_activation_data *adata; - - for (type = GRPC_EM_TA_READ; type < GRPC_EM_TA_COUNT; type++) { - adata = &(impl->task.activation[type]); - GPR_ASSERT(adata->next == NULL); - if (adata->ev != NULL) { - event_free(adata->ev); - adata->ev = NULL; - } - } - - if (impl->shutdown_ev != NULL) { - event_free(impl->shutdown_ev); - impl->shutdown_ev = NULL; - } - gpr_mu_destroy(&impl->mu); - close(impl->fd); -} - -/* Proxy callback to call a gRPC read/write callback */ -static void em_fd_cb(int fd, short what, void *arg /*=em_fd*/) { - grpc_fd *em_fd = arg; - grpc_iomgr_cb_status status = GRPC_CALLBACK_SUCCESS; - int run_read_cb = 0; - int run_write_cb = 0; - grpc_libevent_activation_data *rdata, *wdata; - - gpr_mu_lock(&em_fd->mu); - if (em_fd->shutdown_started) { - status = GRPC_CALLBACK_CANCELLED; - } else if (status == GRPC_CALLBACK_SUCCESS && (what & EV_TIMEOUT)) { - status = GRPC_CALLBACK_TIMED_OUT; - /* TODO(klempner): This is broken if we are monitoring both read and write - events on the same fd -- generating a spurious event is okay, but - 
generating a spurious timeout is not. */ - what |= (EV_READ | EV_WRITE); - } - - if (what & EV_READ) { - switch (em_fd->read_state) { - case GRPC_FD_WAITING: - run_read_cb = 1; - em_fd->read_state = GRPC_FD_IDLE; - break; - case GRPC_FD_IDLE: - case GRPC_FD_CACHED: - em_fd->read_state = GRPC_FD_CACHED; - } - } - if (what & EV_WRITE) { - switch (em_fd->write_state) { - case GRPC_FD_WAITING: - run_write_cb = 1; - em_fd->write_state = GRPC_FD_IDLE; - break; - case GRPC_FD_IDLE: - case GRPC_FD_CACHED: - em_fd->write_state = GRPC_FD_CACHED; - } - } - - if (run_read_cb) { - rdata = &(em_fd->task.activation[GRPC_EM_TA_READ]); - rdata->status = status; - add_task(rdata); - } else if (run_write_cb) { - wdata = &(em_fd->task.activation[GRPC_EM_TA_WRITE]); - wdata->status = status; - add_task(wdata); - } - gpr_mu_unlock(&em_fd->mu); -} - -static void em_fd_shutdown_cb(int fd, short what, void *arg /*=em_fd*/) { - /* TODO(klempner): This could just run directly in the calling thread, except - that libevent's handling of event_active() on an event which is already in - flight on a different thread is racy and easily triggers TSAN. 
- */ - grpc_fd *impl = arg; - gpr_mu_lock(&impl->mu); - impl->shutdown_started = 1; - if (impl->read_state == GRPC_FD_WAITING) { - event_active(impl->task.activation[GRPC_EM_TA_READ].ev, EV_READ, 1); - } - if (impl->write_state == GRPC_FD_WAITING) { - event_active(impl->task.activation[GRPC_EM_TA_WRITE].ev, EV_WRITE, 1); - } - gpr_mu_unlock(&impl->mu); -} - -grpc_fd *grpc_fd_create(int fd) { - int flags; - grpc_libevent_activation_data *rdata, *wdata; - grpc_fd *impl = gpr_malloc(sizeof(grpc_fd)); - - gpr_mu_lock(&grpc_iomgr_mu); - g_num_fds++; - gpr_mu_unlock(&grpc_iomgr_mu); - - impl->shutdown_ev = NULL; - gpr_mu_init(&impl->mu); - - flags = fcntl(fd, F_GETFL, 0); - GPR_ASSERT((flags & O_NONBLOCK) != 0); - - impl->task.type = GRPC_EM_TASK_FD; - impl->fd = fd; - - rdata = &(impl->task.activation[GRPC_EM_TA_READ]); - rdata->ev = NULL; - rdata->cb = NULL; - rdata->arg = NULL; - rdata->status = GRPC_CALLBACK_SUCCESS; - rdata->prev = NULL; - rdata->next = NULL; - - wdata = &(impl->task.activation[GRPC_EM_TA_WRITE]); - wdata->ev = NULL; - wdata->cb = NULL; - wdata->arg = NULL; - wdata->status = GRPC_CALLBACK_SUCCESS; - wdata->prev = NULL; - wdata->next = NULL; - - impl->read_state = GRPC_FD_IDLE; - impl->write_state = GRPC_FD_IDLE; - - impl->shutdown_started = 0; - impl->next = NULL; - - /* TODO(chenw): detect platforms where only level trigger is supported, - and set the event to non-persist. 
*/ - rdata->ev = event_new(g_event_base, impl->fd, EV_ET | EV_PERSIST | EV_READ, - em_fd_cb, impl); - GPR_ASSERT(rdata->ev); - - wdata->ev = event_new(g_event_base, impl->fd, EV_ET | EV_PERSIST | EV_WRITE, - em_fd_cb, impl); - GPR_ASSERT(wdata->ev); - - impl->shutdown_ev = - event_new(g_event_base, -1, EV_READ, em_fd_shutdown_cb, impl); - GPR_ASSERT(impl->shutdown_ev); - - return impl; -} - -static void do_nothing(void *ignored, grpc_iomgr_cb_status also_ignored) {} - -void grpc_fd_destroy(grpc_fd *impl, grpc_iomgr_cb_func on_done, - void *user_data) { - if (on_done == NULL) on_done = do_nothing; - - gpr_mu_lock(&grpc_iomgr_mu); - - /* Put the impl on the list to be destroyed by the poller. */ - impl->on_done = on_done; - impl->on_done_user_data = user_data; - impl->next = g_fds_to_free; - g_fds_to_free = impl; - /* TODO(ctiller): kick the poller so it destroys this fd promptly - (currently we may wait up to a second) */ - - g_num_fds--; - gpr_cv_broadcast(&grpc_iomgr_cv); - gpr_mu_unlock(&grpc_iomgr_mu); -} - -int grpc_fd_get(struct grpc_fd *em_fd) { return em_fd->fd; } - -/* TODO(chenw): should we enforce the contract that notify_on_read cannot be - called when the previously registered callback has not been called yet. */ -int grpc_fd_notify_on_read(grpc_fd *impl, grpc_iomgr_cb_func read_cb, - void *read_cb_arg, gpr_timespec deadline) { - int force_event = 0; - grpc_libevent_activation_data *rdata; - gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now()); - struct timeval delay = gpr_timeval_from_timespec(delay_timespec); - struct timeval *delayp = - gpr_time_cmp(deadline, gpr_inf_future) ? 
&delay : NULL; - - rdata = &impl->task.activation[GRPC_EM_TA_READ]; - - gpr_mu_lock(&impl->mu); - rdata->cb = read_cb; - rdata->arg = read_cb_arg; - - force_event = (impl->shutdown_started || impl->read_state == GRPC_FD_CACHED); - impl->read_state = GRPC_FD_WAITING; - - if (force_event) { - event_active(rdata->ev, EV_READ, 1); - } else if (event_add(rdata->ev, delayp) == -1) { - gpr_mu_unlock(&impl->mu); - return 0; - } - gpr_mu_unlock(&impl->mu); - return 1; -} - -int grpc_fd_notify_on_write(grpc_fd *impl, grpc_iomgr_cb_func write_cb, - void *write_cb_arg, gpr_timespec deadline) { - int force_event = 0; - grpc_libevent_activation_data *wdata; - gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now()); - struct timeval delay = gpr_timeval_from_timespec(delay_timespec); - struct timeval *delayp = - gpr_time_cmp(deadline, gpr_inf_future) ? &delay : NULL; - - wdata = &impl->task.activation[GRPC_EM_TA_WRITE]; - - gpr_mu_lock(&impl->mu); - wdata->cb = write_cb; - wdata->arg = write_cb_arg; - - force_event = (impl->shutdown_started || impl->write_state == GRPC_FD_CACHED); - impl->write_state = GRPC_FD_WAITING; - - if (force_event) { - event_active(wdata->ev, EV_WRITE, 1); - } else if (event_add(wdata->ev, delayp) == -1) { - gpr_mu_unlock(&impl->mu); - return 0; - } - gpr_mu_unlock(&impl->mu); - return 1; -} - -void grpc_fd_shutdown(grpc_fd *em_fd) { - event_active(em_fd->shutdown_ev, EV_READ, 1); -} - -/* Sometimes we want a followup callback: something to be added from the - current callback for the EM to invoke once this callback is complete. - This is implemented by inserting an entry into an EM queue. */ - -/* The following structure holds the field needed for adding the - followup callback. 
These are the argument for the followup callback, - the function to use for the followup callback, and the - activation data pointer used for the queues (to free in the CB) */ -struct followup_callback_arg { - grpc_iomgr_cb_func func; - void *cb_arg; - grpc_libevent_activation_data adata; -}; - -static void followup_proxy_callback(void *cb_arg, grpc_iomgr_cb_status status) { - struct followup_callback_arg *fcb_arg = cb_arg; - /* Invoke the function */ - fcb_arg->func(fcb_arg->cb_arg, status); - gpr_free(fcb_arg); -} - -void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg) { - grpc_libevent_activation_data *adptr; - struct followup_callback_arg *fcb_arg; - - fcb_arg = gpr_malloc(sizeof(*fcb_arg)); - /* Set up the activation data and followup callback argument structures */ - adptr = &fcb_arg->adata; - adptr->ev = NULL; - adptr->cb = followup_proxy_callback; - adptr->arg = fcb_arg; - adptr->status = GRPC_CALLBACK_SUCCESS; - adptr->prev = NULL; - adptr->next = NULL; - - fcb_arg->func = cb; - fcb_arg->cb_arg = cb_arg; - - /* Insert an activation data for the specified em */ - add_task(adptr); -} diff --git a/src/core/iomgr/iomgr_libevent.h b/src/core/iomgr/iomgr_libevent.h deleted file mode 100644 index 5c088006a0..0000000000 --- a/src/core/iomgr/iomgr_libevent.h +++ /dev/null @@ -1,206 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. 
nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__ -#define __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__ - -#include "src/core/iomgr/iomgr.h" -#include <grpc/support/sync.h> -#include <grpc/support/time.h> - -typedef struct grpc_fd grpc_fd; - -/* gRPC event manager task "base class". This is pretend-inheritance in C89. - This should be the first member of any actual grpc_em task type. - - Memory warning: expanding this will increase memory usage in any derived - class, so be careful. - - For generality, this base can be on multiple task queues and can have - multiple event callbacks registered. Not all "derived classes" will use - this feature. 
*/ - -typedef enum grpc_libevent_task_type { - GRPC_EM_TASK_ALARM, - GRPC_EM_TASK_FD, - GRPC_EM_TASK_DO_NOT_USE -} grpc_libevent_task_type; - -/* Different activity types to shape the callback and queueing arrays */ -typedef enum grpc_em_task_activity_type { - GRPC_EM_TA_READ, /* use this also for single-type events */ - GRPC_EM_TA_WRITE, - GRPC_EM_TA_COUNT -} grpc_em_task_activity_type; - -/* Include the following #define for convenience for tasks like alarms that - only have a single type */ -#define GRPC_EM_TA_ONLY GRPC_EM_TA_READ - -typedef struct grpc_libevent_activation_data { - struct event *ev; /* event activated on this callback type */ - grpc_iomgr_cb_func cb; /* function pointer for callback */ - void *arg; /* argument passed to cb */ - - /* Hold the status associated with the callback when queued */ - grpc_iomgr_cb_status status; - /* Now set up to link activations into scheduler queues */ - struct grpc_libevent_activation_data *prev; - struct grpc_libevent_activation_data *next; -} grpc_libevent_activation_data; - -typedef struct grpc_libevent_task { - grpc_libevent_task_type type; - - /* Now have an array of activation data elements: one for each activity - type that could get activated */ - grpc_libevent_activation_data activation[GRPC_EM_TA_COUNT]; -} grpc_libevent_task; - -/* Initialize *em_fd. - Requires fd is a non-blocking file descriptor. - - This takes ownership of closing fd. - - Requires: *em_fd uninitialized. fd is a non-blocking file descriptor. */ -grpc_fd *grpc_fd_create(int fd); - -/* Cause *em_fd no longer to be initialized and closes the underlying fd. - on_done is called when the underlying file descriptor is definitely close()d. - If on_done is NULL, no callback will be made. - Requires: *em_fd initialized; no outstanding notify_on_read or - notify_on_write. */ -void grpc_fd_destroy(grpc_fd *em_fd, grpc_iomgr_cb_func on_done, - void *user_data); - -/* Returns the file descriptor associated with *em_fd. 
*/ -int grpc_fd_get(grpc_fd *em_fd); - -/* Register read interest, causing read_cb to be called once when em_fd becomes - readable, on deadline specified by deadline, or on shutdown triggered by - grpc_fd_shutdown. - read_cb will be called with read_cb_arg when *em_fd becomes readable. - read_cb is Called with status of GRPC_CALLBACK_SUCCESS if readable, - GRPC_CALLBACK_TIMED_OUT if the call timed out, - and CANCELLED if the call was cancelled. - - Requires:This method must not be called before the read_cb for any previous - call runs. Edge triggered events are used whenever they are supported by the - underlying platform. This means that users must drain em_fd in read_cb before - calling notify_on_read again. Users are also expected to handle spurious - events, i.e read_cb is called while nothing can be readable from em_fd */ -int grpc_fd_notify_on_read(grpc_fd *em_fd, grpc_iomgr_cb_func read_cb, - void *read_cb_arg, gpr_timespec deadline); - -/* Exactly the same semantics as above, except based on writable events. */ -int grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb, - void *write_cb_arg, gpr_timespec deadline); - -/* Cause any current and all future read/write callbacks to error out with - GRPC_CALLBACK_CANCELLED. */ -void grpc_fd_shutdown(grpc_fd *em_fd); - -/* =================== Event caching =================== - In order to not miss or double-return edges in the context of edge triggering - and multithreading, we need a per-fd caching layer in the eventmanager itself - to cache relevant events. - - There are two types of events we care about: calls to notify_on_[read|write] - and readable/writable events for the socket from eventfd. There are separate - event caches for read and write. - - There are three states: - 0. "waiting" -- There's been a call to notify_on_[read|write] which has not - had a corresponding event. In other words, we're waiting for an event so we - can run the callback. - 1. 
"idle" -- We are neither waiting nor have a cached event. - 2. "cached" -- There has been a read/write event without a waiting callback, - so we want to run the event next time the application calls - notify_on_[read|write]. - - The high level state diagram: - - +--------------------------------------------------------------------+ - | WAITING | IDLE | CACHED | - | | | | - | 1. --*-> 2. --+-> 3. --+\ - | | | <--+/ - | | | | - x+-- 6. 5. <-+-- 4. <-*-- | - | | | | - +--------------------------------------------------------------------+ - - Transitions right occur on read|write events. Transitions left occur on - notify_on_[read|write] events. - State transitions: - 1. Read|Write event while waiting -> run the callback and transition to idle. - 2. Read|Write event while idle -> transition to cached. - 3. Read|Write event with one already cached -> still cached. - 4. notify_on_[read|write] with event cached: run callback and transition to - idle. - 5. notify_on_[read|write] when idle: Store callback and transition to - waiting. - 6. notify_on_[read|write] when waiting: invalid. */ - -typedef enum grpc_fd_state { - GRPC_FD_WAITING = 0, - GRPC_FD_IDLE = 1, - GRPC_FD_CACHED = 2 -} grpc_fd_state; - -/* gRPC file descriptor handle. - The handle is used to register read/write callbacks to a file descriptor */ -struct grpc_fd { - grpc_libevent_task task; /* Base class, callbacks, queues, etc */ - int fd; /* File descriptor */ - - /* Note that the shutdown event is only needed as a workaround for libevent - not properly handling event_active on an in flight event. */ - struct event *shutdown_ev; /* activated to trigger shutdown */ - - /* protect shutdown_started|read_state|write_state and ensure barriers - between notify_on_[read|write] and read|write callbacks */ - gpr_mu mu; - int shutdown_started; /* 0 -> shutdown not started, 1 -> started */ - grpc_fd_state read_state; - grpc_fd_state write_state; - - /* descriptor delete list. These are destroyed during polling. 
*/ - struct grpc_fd *next; - grpc_iomgr_cb_func on_done; - void *on_done_user_data; -}; - -void grpc_iomgr_ref_address_resolution(int delta); - -#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__ */ diff --git a/src/core/iomgr/pollset.c b/src/core/iomgr/iomgr_posix.c index 62a0019eb3..ff9195ec1d 100644 --- a/src/core/iomgr/pollset.c +++ b/src/core/iomgr/iomgr_posix.c @@ -31,7 +31,8 @@ * */ -#include "src/core/iomgr/pollset.h" +#include "src/core/iomgr/iomgr_posix.h" -void grpc_pollset_init(grpc_pollset *pollset) { pollset->unused = 0; } -void grpc_pollset_destroy(grpc_pollset *pollset) {} +void grpc_iomgr_platform_init() { grpc_pollset_global_init(); } + +void grpc_iomgr_platform_shutdown() { grpc_pollset_global_shutdown(); } diff --git a/src/core/iomgr/iomgr_completion_queue_interface.h b/src/core/iomgr/iomgr_posix.h index 3c4efe773a..ca5af3e527 100644 --- a/src/core/iomgr/iomgr_completion_queue_interface.h +++ b/src/core/iomgr/iomgr_posix.h @@ -31,15 +31,12 @@ * */ -#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_COMPLETION_QUEUE_INTERFACE_H_ -#define __GRPC_INTERNAL_IOMGR_IOMGR_COMPLETION_QUEUE_INTERFACE_H_ +#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_POSIX_H_ +#define __GRPC_INTERNAL_IOMGR_IOMGR_POSIX_H_ -/* Internals of iomgr that are exposed only to be used for completion queue - implementation */ +#include "src/core/iomgr/iomgr_internal.h" -extern gpr_mu grpc_iomgr_mu; -extern gpr_cv grpc_iomgr_cv; +void grpc_pollset_global_init(); +void grpc_pollset_global_shutdown(); -int grpc_iomgr_work(gpr_timespec deadline); - -#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_COMPLETION_QUEUE_INTERFACE_H_ */ +#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_POSIX_H_ */ diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h index ba1a9d5429..7374a4ec13 100644 --- a/src/core/iomgr/pollset.h +++ b/src/core/iomgr/pollset.h @@ -34,18 +34,31 @@ #ifndef __GRPC_INTERNAL_IOMGR_POLLSET_H_ #define __GRPC_INTERNAL_IOMGR_POLLSET_H_ +#include <grpc/support/port_platform.h> + /* A grpc_pollset is a set of file 
descriptors that a higher level item is interested in. For example: - a server will typically keep a pollset containing all connected channels, so that it can find new calls to service - a completion queue might keep a pollset with an entry for each transport that is servicing a call that it's tracking */ -/* Eventually different implementations of iomgr will provide their own - grpc_pollset structs. As this is just a dummy wrapper to get the API in, - we just define a simple type here. */ -typedef struct { char unused; } grpc_pollset; + +#ifdef GPR_POSIX_SOCKET +#include "src/core/iomgr/pollset_posix.h" +#endif void grpc_pollset_init(grpc_pollset *pollset); void grpc_pollset_destroy(grpc_pollset *pollset); +/* Do some work on a pollset. + May involve invoking asynchronous callbacks, or actually polling file + descriptors. + Requires GRPC_POLLSET_MU(pollset) locked. + May unlock GRPC_POLLSET_MU(pollset) during its execution. */ +int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline); + +/* Break a pollset out of polling work + Requires GRPC_POLLSET_MU(pollset) locked. */ +void grpc_pollset_kick(grpc_pollset *pollset); + #endif /* __GRPC_INTERNAL_IOMGR_POLLSET_H_ */ diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c new file mode 100644 index 0000000000..06c7a5a0dd --- /dev/null +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -0,0 +1,237 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. 
+ * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ * + */ + +#include <grpc/support/port_platform.h> + +#ifdef GPR_POSIX_MULTIPOLL_WITH_POLL + +#include "src/core/iomgr/pollset_posix.h" + +#include <errno.h> +#include <poll.h> +#include <stdlib.h> +#include <string.h> + +#include "src/core/iomgr/fd_posix.h" +#include "src/core/iomgr/iomgr_internal.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/useful.h> + +typedef struct { + /* all polled fds */ + size_t fd_count; + size_t fd_capacity; + grpc_fd **fds; + /* fds being polled by the current poller: parallel arrays of pollfd and the + * grpc_fd* that the pollfd was constructed from */ + size_t pfd_count; + size_t pfd_capacity; + grpc_fd **selfds; + struct pollfd *pfds; + /* fds that have been removed from the pollset explicitly */ + size_t del_count; + size_t del_capacity; + grpc_fd **dels; +} pollset_hdr; + +static void multipoll_with_poll_pollset_add_fd(grpc_pollset *pollset, + grpc_fd *fd) { + size_t i; + pollset_hdr *h = pollset->data.ptr; + /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */ + for (i = 0; i < h->fd_count; i++) { + if (h->fds[i] == fd) return; + } + if (h->fd_count == h->fd_capacity) { + h->fd_capacity = GPR_MAX(h->fd_capacity + 8, h->fd_count * 3 / 2); + h->fds = gpr_realloc(h->fds, sizeof(grpc_fd *) * h->fd_capacity); + } + h->fds[h->fd_count++] = fd; + grpc_fd_ref(fd); +} + +static void multipoll_with_poll_pollset_del_fd(grpc_pollset *pollset, + grpc_fd *fd) { + /* will get removed next poll cycle */ + pollset_hdr *h = pollset->data.ptr; + if (h->del_count == h->del_capacity) { + h->del_capacity = GPR_MAX(h->del_capacity + 8, h->del_count * 3 / 2); + h->dels = gpr_realloc(h->dels, sizeof(grpc_fd *) * h->del_capacity); + } + h->dels[h->del_count++] = fd; + grpc_fd_ref(fd); +} + +static void end_polling(grpc_pollset *pollset) { + size_t i; + pollset_hdr *h; + h = pollset->data.ptr; + for (i = 1; i < h->pfd_count; i++) { + grpc_fd_end_poll(h->selfds[i], pollset); + } +} + +static 
int multipoll_with_poll_pollset_maybe_work( + grpc_pollset *pollset, gpr_timespec deadline, gpr_timespec now, + int allow_synchronous_callback) { + int timeout; + int r; + size_t i, np, nf, nd; + pollset_hdr *h; + + if (pollset->counter) { + return 0; + } + h = pollset->data.ptr; + if (gpr_time_cmp(deadline, gpr_inf_future) == 0) { + timeout = -1; + } else { + timeout = gpr_time_to_millis(gpr_time_sub(deadline, now)); + if (timeout <= 0) { + return 1; + } + } + if (h->pfd_capacity < h->fd_count + 1) { + h->pfd_capacity = GPR_MAX(h->pfd_capacity * 3 / 2, h->fd_count + 1); + gpr_free(h->pfds); + gpr_free(h->selfds); + h->pfds = gpr_malloc(sizeof(struct pollfd) * h->pfd_capacity); + h->selfds = gpr_malloc(sizeof(grpc_fd *) * h->pfd_capacity); + } + nf = 0; + np = 1; + h->pfds[0].fd = grpc_kick_read_fd(pollset); + h->pfds[0].events = POLLIN; + h->pfds[0].revents = POLLOUT; + for (i = 0; i < h->fd_count; i++) { + int remove = grpc_fd_is_orphaned(h->fds[i]); + for (nd = 0; nd < h->del_count; nd++) { + if (h->fds[i] == h->dels[nd]) remove = 1; + } + if (remove) { + grpc_fd_unref(h->fds[i]); + } else { + h->fds[nf++] = h->fds[i]; + h->pfds[np].events = + grpc_fd_begin_poll(h->fds[i], pollset, POLLIN, POLLOUT); + h->selfds[np] = h->fds[i]; + h->pfds[np].fd = h->fds[i]->fd; + h->pfds[np].revents = 0; + np++; + } + } + h->pfd_count = np; + h->fd_count = nf; + for (nd = 0; nd < h->del_count; nd++) { + grpc_fd_unref(h->dels[nd]); + } + h->del_count = 0; + if (h->pfd_count == 0) { + end_polling(pollset); + return 0; + } + pollset->counter = 1; + gpr_mu_unlock(&pollset->mu); + + r = poll(h->pfds, h->pfd_count, timeout); + if (r < 0) { + gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); + } else if (r == 0) { + /* do nothing */ + } else { + if (h->pfds[0].revents & POLLIN) { + grpc_kick_drain(pollset); + } + for (i = 1; i < np; i++) { + if (h->pfds[i].revents & POLLIN) { + grpc_fd_become_readable(h->selfds[i], allow_synchronous_callback); + } + if (h->pfds[i].revents & 
POLLOUT) { + grpc_fd_become_writable(h->selfds[i], allow_synchronous_callback); + } + } + } + end_polling(pollset); + + gpr_mu_lock(&pollset->mu); + pollset->counter = 0; + gpr_cv_broadcast(&pollset->cv); + return 1; +} + +static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) { + size_t i; + pollset_hdr *h = pollset->data.ptr; + GPR_ASSERT(pollset->counter == 0); + for (i = 0; i < h->fd_count; i++) { + grpc_fd_unref(h->fds[i]); + } + for (i = 0; i < h->del_count; i++) { + grpc_fd_unref(h->dels[i]); + } + gpr_free(h->pfds); + gpr_free(h->selfds); + gpr_free(h->fds); + gpr_free(h->dels); + gpr_free(h); +} + +static const grpc_pollset_vtable multipoll_with_poll_pollset = { + multipoll_with_poll_pollset_add_fd, multipoll_with_poll_pollset_del_fd, + multipoll_with_poll_pollset_maybe_work, + multipoll_with_poll_pollset_destroy}; + +void grpc_platform_become_multipoller(grpc_pollset *pollset, grpc_fd **fds, + size_t nfds) { + size_t i; + pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr)); + pollset->vtable = &multipoll_with_poll_pollset; + pollset->data.ptr = h; + h->fd_count = nfds; + h->fd_capacity = nfds; + h->fds = gpr_malloc(nfds * sizeof(grpc_fd *)); + h->pfd_count = 0; + h->pfd_capacity = 0; + h->pfds = NULL; + h->selfds = NULL; + h->del_count = 0; + h->del_capacity = 0; + h->dels = NULL; + for (i = 0; i < nfds; i++) { + h->fds[i] = fds[i]; + grpc_fd_ref(fds[i]); + } +} + +#endif diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c new file mode 100644 index 0000000000..ba4031e11f --- /dev/null +++ b/src/core/iomgr/pollset_posix.c @@ -0,0 +1,340 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. 
+ * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/iomgr/pollset_posix.h" + +#include <errno.h> +#include <poll.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> + +#include "src/core/iomgr/alarm_internal.h" +#include "src/core/iomgr/fd_posix.h" +#include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/socket_utils_posix.h" +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/thd.h> +#include <grpc/support/useful.h> + +/* kick pipes: we keep a sharded set of pipes to allow breaking from poll. + Ideally this would be 1:1 with pollsets, but we'd like to avoid associating + full kernel objects with each pollset to keep them lightweight, so instead + keep a sharded set and allow associating a pollset with one of the shards. 
+ + TODO(ctiller): move this out from this file, and allow an eventfd + implementation on linux */ + +#define LOG2_KICK_SHARDS 6 +#define KICK_SHARDS (1 << LOG2_KICK_SHARDS) + +static int g_kick_pipes[KICK_SHARDS][2]; +static grpc_pollset g_backup_pollset; +static int g_shutdown_backup_poller; +static gpr_event g_backup_poller_done; + +static void backup_poller(void *p) { + gpr_timespec delta = gpr_time_from_millis(100); + gpr_timespec last_poll = gpr_now(); + + gpr_mu_lock(&g_backup_pollset.mu); + while (g_shutdown_backup_poller == 0) { + gpr_timespec next_poll = gpr_time_add(last_poll, delta); + grpc_pollset_work(&g_backup_pollset, next_poll); + gpr_mu_unlock(&g_backup_pollset.mu); + gpr_sleep_until(next_poll); + gpr_mu_lock(&g_backup_pollset.mu); + last_poll = next_poll; + } + gpr_mu_unlock(&g_backup_pollset.mu); + + gpr_event_set(&g_backup_poller_done, (void *)1); +} + +static size_t kick_shard(const grpc_pollset *info) { + size_t x = (size_t)info; + return ((x >> 4) ^ (x >> 9) ^ (x >> 14)) & (KICK_SHARDS - 1); +} + +int grpc_kick_read_fd(grpc_pollset *p) { + return g_kick_pipes[kick_shard(p)][0]; +} + +static int grpc_kick_write_fd(grpc_pollset *p) { + return g_kick_pipes[kick_shard(p)][1]; +} + +void grpc_pollset_force_kick(grpc_pollset *p) { + char c = 0; + while (write(grpc_kick_write_fd(p), &c, 1) != 1 && errno == EINTR) + ; +} + +void grpc_pollset_kick(grpc_pollset *p) { + if (!p->counter) return; + grpc_pollset_force_kick(p); +} + +void grpc_kick_drain(grpc_pollset *p) { + int fd = grpc_kick_read_fd(p); + char buf[128]; + int r; + + for (;;) { + r = read(fd, buf, sizeof(buf)); + if (r > 0) continue; + if (r == 0) return; + switch (errno) { + case EAGAIN: + return; + case EINTR: + continue; + default: + gpr_log(GPR_ERROR, "error reading pipe: %s", strerror(errno)); + return; + } + } +} + +/* global state management */ + +grpc_pollset *grpc_backup_pollset() { return &g_backup_pollset; } + +void grpc_pollset_global_init() { + int i; + gpr_thd_id id; + + /* 
initialize the kick shards */ + for (i = 0; i < KICK_SHARDS; i++) { + GPR_ASSERT(0 == pipe(g_kick_pipes[i])); + GPR_ASSERT(grpc_set_socket_nonblocking(g_kick_pipes[i][0], 1)); + GPR_ASSERT(grpc_set_socket_nonblocking(g_kick_pipes[i][1], 1)); + } + + /* initialize the backup pollset */ + grpc_pollset_init(&g_backup_pollset); + + /* start the backup poller thread */ + g_shutdown_backup_poller = 0; + gpr_event_init(&g_backup_poller_done); + gpr_thd_new(&id, backup_poller, NULL, NULL); +} + +void grpc_pollset_global_shutdown() { + int i; + + /* terminate the backup poller thread */ + gpr_mu_lock(&g_backup_pollset.mu); + g_shutdown_backup_poller = 1; + gpr_mu_unlock(&g_backup_pollset.mu); + gpr_event_wait(&g_backup_poller_done, gpr_inf_future); + + /* destroy the backup pollset */ + grpc_pollset_destroy(&g_backup_pollset); + + /* destroy the kick shards */ + for (i = 0; i < KICK_SHARDS; i++) { + close(g_kick_pipes[i][0]); + close(g_kick_pipes[i][1]); + } +} + +/* main interface */ + +static void become_empty_pollset(grpc_pollset *pollset); +static void become_unary_pollset(grpc_pollset *pollset, grpc_fd *fd); + +void grpc_pollset_init(grpc_pollset *pollset) { + gpr_mu_init(&pollset->mu); + gpr_cv_init(&pollset->cv); + become_empty_pollset(pollset); +} + +void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { + gpr_mu_lock(&pollset->mu); + pollset->vtable->add_fd(pollset, fd); + gpr_cv_broadcast(&pollset->cv); + gpr_mu_unlock(&pollset->mu); +} + +void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) { + gpr_mu_lock(&pollset->mu); + pollset->vtable->del_fd(pollset, fd); + gpr_cv_broadcast(&pollset->cv); + gpr_mu_unlock(&pollset->mu); +} + +int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) { + /* pollset->mu already held */ + gpr_timespec now; + now = gpr_now(); + if (gpr_time_cmp(now, deadline) > 0) { + return 0; + } + if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1)) { + return 1; + } + if (grpc_alarm_check(&pollset->mu, now, 
&deadline)) { + return 1; + } + return pollset->vtable->maybe_work(pollset, deadline, now, 1); +} + +void grpc_pollset_destroy(grpc_pollset *pollset) { + pollset->vtable->destroy(pollset); + gpr_mu_destroy(&pollset->mu); + gpr_cv_destroy(&pollset->cv); +} + +/* + * empty_pollset - a vtable that provides polling for NO file descriptors + */ + +static void empty_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { + become_unary_pollset(pollset, fd); +} + +static void empty_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) {} + +static int empty_pollset_maybe_work(grpc_pollset *pollset, + gpr_timespec deadline, gpr_timespec now, + int allow_synchronous_callback) { + return 0; +} + +static void empty_pollset_destroy(grpc_pollset *pollset) {} + +static const grpc_pollset_vtable empty_pollset = { + empty_pollset_add_fd, empty_pollset_del_fd, empty_pollset_maybe_work, + empty_pollset_destroy}; + +static void become_empty_pollset(grpc_pollset *pollset) { + pollset->vtable = &empty_pollset; +} + +/* + * unary_poll_pollset - a vtable that provides polling for one file descriptor + * via poll() + */ + +static void unary_poll_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { + grpc_fd *fds[2]; + if (fd == pollset->data.ptr) return; + fds[0] = pollset->data.ptr; + fds[1] = fd; + grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds)); + grpc_fd_unref(fds[0]); +} + +static void unary_poll_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) { + if (fd == pollset->data.ptr) { + grpc_fd_unref(pollset->data.ptr); + become_empty_pollset(pollset); + } +} + +static int unary_poll_pollset_maybe_work(grpc_pollset *pollset, + gpr_timespec deadline, + gpr_timespec now, + int allow_synchronous_callback) { + struct pollfd pfd[2]; + grpc_fd *fd; + int timeout; + int r; + + if (pollset->counter) { + return 0; + } + fd = pollset->data.ptr; + if (grpc_fd_is_orphaned(fd)) { + grpc_fd_unref(fd); + become_empty_pollset(pollset); + return 0; + } + if (gpr_time_cmp(deadline, 
gpr_inf_future) == 0) { + timeout = -1; + } else { + timeout = gpr_time_to_millis(gpr_time_sub(deadline, now)); + if (timeout <= 0) { + return 1; + } + } + pfd[0].fd = grpc_kick_read_fd(pollset); + pfd[0].events = POLLIN; + pfd[0].revents = 0; + pfd[1].fd = fd->fd; + pfd[1].events = grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT); + pfd[1].revents = 0; + pollset->counter = 1; + gpr_mu_unlock(&pollset->mu); + + r = poll(pfd, GPR_ARRAY_SIZE(pfd), timeout); + if (r < 0) { + gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); + } else if (r == 0) { + /* do nothing */ + } else { + if (pfd[0].revents & POLLIN) { + grpc_kick_drain(pollset); + } + if (pfd[1].revents & POLLIN) { + grpc_fd_become_readable(fd, allow_synchronous_callback); + } + if (pfd[1].revents & POLLOUT) { + grpc_fd_become_writable(fd, allow_synchronous_callback); + } + } + + gpr_mu_lock(&pollset->mu); + grpc_fd_end_poll(fd, pollset); + pollset->counter = 0; + gpr_cv_broadcast(&pollset->cv); + return 1; +} + +static void unary_poll_pollset_destroy(grpc_pollset *pollset) { + GPR_ASSERT(pollset->counter == 0); + grpc_fd_unref(pollset->data.ptr); +} + +static const grpc_pollset_vtable unary_poll_pollset = { + unary_poll_pollset_add_fd, unary_poll_pollset_del_fd, + unary_poll_pollset_maybe_work, unary_poll_pollset_destroy}; + +static void become_unary_pollset(grpc_pollset *pollset, grpc_fd *fd) { + pollset->vtable = &unary_poll_pollset; + pollset->counter = 0; + pollset->data.ptr = fd; + grpc_fd_ref(fd); +} diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h new file mode 100644 index 0000000000..f051079f5b --- /dev/null +++ b/src/core/iomgr/pollset_posix.h @@ -0,0 +1,95 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. 
+ * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ * + */ + +#ifndef __GRPC_INTERNAL_IOMGR_POLLSET_POSIX_H_ +#define __GRPC_INTERNAL_IOMGR_POLLSET_POSIX_H_ + +#include <grpc/support/sync.h> + +typedef struct grpc_pollset_vtable grpc_pollset_vtable; + +/* forward declare only in this file to avoid leaking impl details via + pollset.h; real users of grpc_fd should always include 'fd_posix.h' and not + use the struct tag */ +struct grpc_fd; + +typedef struct grpc_pollset { + /* pollsets under posix can mutate representation as fds are added and + removed. + For example, we may choose a poll() based implementation on linux for + few fds, and an epoll() based implementation for many fds */ + const grpc_pollset_vtable *vtable; + gpr_mu mu; + gpr_cv cv; + int counter; + union { + int fd; + void *ptr; + } data; +} grpc_pollset; + +struct grpc_pollset_vtable { + void (*add_fd)(grpc_pollset *pollset, struct grpc_fd *fd); + void (*del_fd)(grpc_pollset *pollset, struct grpc_fd *fd); + int (*maybe_work)(grpc_pollset *pollset, gpr_timespec deadline, + gpr_timespec now, int allow_synchronous_callback); + void (*destroy)(grpc_pollset *pollset); +}; + +#define GRPC_POLLSET_MU(pollset) (&(pollset)->mu) +#define GRPC_POLLSET_CV(pollset) (&(pollset)->cv) + +/* Add an fd to a pollset */ +void grpc_pollset_add_fd(grpc_pollset *pollset, struct grpc_fd *fd); +/* Force remove an fd from a pollset (normally they are removed on the next + poll after an fd is orphaned) */ +void grpc_pollset_del_fd(grpc_pollset *pollset, struct grpc_fd *fd); + +/* Force any current pollers to break polling */ +void grpc_pollset_force_kick(grpc_pollset *pollset); +/* Returns the fd to listen on for kicks */ +int grpc_kick_read_fd(grpc_pollset *p); +/* Call after polling has been kicked to leave the kicked state */ +void grpc_kick_drain(grpc_pollset *p); + +/* All fds get added to a backup pollset to ensure that progress is made + regardless of applications listening to events. 
Relying on this is slow + however (the backup pollset only listens every 100ms or so) - so it's not + to be relied on. */ +grpc_pollset *grpc_backup_pollset(); + +/* turn a pollset into a multipoller: platform specific */ +void grpc_platform_become_multipoller(grpc_pollset *pollset, + struct grpc_fd **fds, size_t fd_count); + +#endif /* __GRPC_INTERNAL_IOMGR_POLLSET_POSIX_H_ */ diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c index a0a04297eb..c9c2c5378a 100644 --- a/src/core/iomgr/resolve_address_posix.c +++ b/src/core/iomgr/resolve_address_posix.c @@ -41,7 +41,7 @@ #include <unistd.h> #include <string.h> -#include "src/core/iomgr/iomgr_libevent.h" +#include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/sockaddr_utils.h" #include "src/core/iomgr/socket_utils_posix.h" #include <grpc/support/alloc.h> @@ -201,7 +201,7 @@ static void do_request(void *rp) { gpr_free(r->default_port); gpr_free(r); cb(arg, resolved); - grpc_iomgr_ref_address_resolution(-1); + grpc_iomgr_unref(); } void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) { @@ -213,7 +213,7 @@ void grpc_resolve_address(const char *name, const char *default_port, grpc_resolve_cb cb, void *arg) { request *r = gpr_malloc(sizeof(request)); gpr_thd_id id; - grpc_iomgr_ref_address_resolution(1); + grpc_iomgr_ref(); r->name = gpr_strdup(name); r->default_port = gpr_strdup(default_port); r->cb = cb; diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index 88b599b582..d675c2dcec 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -38,7 +38,9 @@ #include <string.h> #include <unistd.h> -#include "src/core/iomgr/iomgr_libevent.h" +#include "src/core/iomgr/alarm.h" +#include "src/core/iomgr/iomgr_posix.h" +#include "src/core/iomgr/pollset_posix.h" #include "src/core/iomgr/sockaddr_utils.h" #include "src/core/iomgr/socket_utils_posix.h" #include "src/core/iomgr/tcp_posix.h" @@ -49,8 
+51,11 @@ typedef struct { void (*cb)(void *arg, grpc_endpoint *tcp); void *cb_arg; + gpr_mu mu; grpc_fd *fd; gpr_timespec deadline; + grpc_alarm alarm; + int refs; } async_connect; static int prepare_socket(int fd) { @@ -74,21 +79,42 @@ error: return 0; } -static void on_writable(void *acp, grpc_iomgr_cb_status status) { +static void on_alarm(void *acp, int success) { + int done; + async_connect *ac = acp; + gpr_mu_lock(&ac->mu); + if (ac->fd != NULL && success) { + grpc_fd_shutdown(ac->fd); + } + done = (--ac->refs == 0); + gpr_mu_unlock(&ac->mu); + if (done) { + gpr_mu_destroy(&ac->mu); + gpr_free(ac); + } +} + +static void on_writable(void *acp, int success) { async_connect *ac = acp; int so_error = 0; socklen_t so_error_size; int err; - int fd = grpc_fd_get(ac->fd); + int fd = ac->fd->fd; + int done; + grpc_endpoint *ep = NULL; + void (*cb)(void *arg, grpc_endpoint *tcp) = ac->cb; + void *cb_arg = ac->cb_arg; + + grpc_alarm_cancel(&ac->alarm); - if (status == GRPC_CALLBACK_SUCCESS) { + if (success) { do { so_error_size = sizeof(so_error); err = getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &so_error_size); } while (err < 0 && errno == EINTR); if (err < 0) { gpr_log(GPR_ERROR, "getsockopt(ERROR): %s", strerror(errno)); - goto error; + goto finish; } else if (so_error != 0) { if (so_error == ENOBUFS) { /* We will get one of these errors if we have run out of @@ -106,7 +132,7 @@ static void on_writable(void *acp, grpc_iomgr_cb_status status) { opened too many network connections. The "easy" fix: don't do that! 
*/ gpr_log(GPR_ERROR, "kernel out of buffers"); - grpc_fd_notify_on_write(ac->fd, on_writable, ac, ac->deadline); + grpc_fd_notify_on_write(ac->fd, on_writable, ac); return; } else { switch (so_error) { @@ -117,27 +143,31 @@ static void on_writable(void *acp, grpc_iomgr_cb_status status) { gpr_log(GPR_ERROR, "socket error: %d", so_error); break; } - goto error; + goto finish; } } else { - goto great_success; + ep = grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE); + goto finish; } } else { - gpr_log(GPR_ERROR, "on_writable failed during connect: status=%d", status); - goto error; + gpr_log(GPR_ERROR, "on_writable failed during connect"); + goto finish; } abort(); -error: - ac->cb(ac->cb_arg, NULL); - grpc_fd_destroy(ac->fd, NULL, NULL); - gpr_free(ac); - return; - -great_success: - ac->cb(ac->cb_arg, grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); - gpr_free(ac); +finish: + gpr_mu_lock(&ac->mu); + if (!ep) { + grpc_fd_orphan(ac->fd, NULL, NULL); + } + done = (--ac->refs == 0); + gpr_mu_unlock(&ac->mu); + if (done) { + gpr_mu_destroy(&ac->mu); + gpr_free(ac); + } + cb(cb_arg, ep); } void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), @@ -176,6 +206,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), } while (err < 0 && errno == EINTR); if (err >= 0) { + gpr_log(GPR_DEBUG, "instant connect"); cb(arg, grpc_tcp_create(grpc_fd_create(fd), GRPC_TCP_DEFAULT_READ_SLICE_SIZE)); return; @@ -191,7 +222,10 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), ac = gpr_malloc(sizeof(async_connect)); ac->cb = cb; ac->cb_arg = arg; - ac->deadline = deadline; ac->fd = grpc_fd_create(fd); - grpc_fd_notify_on_write(ac->fd, on_writable, ac, deadline); + gpr_mu_init(&ac->mu); + ac->refs = 2; + + grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now()); + grpc_fd_notify_on_write(ac->fd, on_writable, ac); } diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 
bc3ce69e47..657f34aaf9 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -255,18 +255,14 @@ typedef struct { grpc_endpoint_read_cb read_cb; void *read_user_data; - gpr_timespec read_deadline; grpc_endpoint_write_cb write_cb; void *write_user_data; - gpr_timespec write_deadline; grpc_tcp_slice_state write_state; } grpc_tcp; -static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, - grpc_iomgr_cb_status status); -static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, - grpc_iomgr_cb_status status); +static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success); +static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success); static void grpc_tcp_shutdown(grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; @@ -276,7 +272,7 @@ static void grpc_tcp_shutdown(grpc_endpoint *ep) { static void grpc_tcp_unref(grpc_tcp *tcp) { int refcount_zero = gpr_unref(&tcp->refcount); if (refcount_zero) { - grpc_fd_destroy(tcp->em_fd, NULL, NULL); + grpc_fd_orphan(tcp->em_fd, NULL, NULL); gpr_free(tcp); } } @@ -308,8 +304,7 @@ static void call_read_cb(grpc_tcp *tcp, gpr_slice *slices, size_t nslices, #define INLINE_SLICE_BUFFER_SIZE 8 #define MAX_READ_IOVEC 4 -static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, - grpc_iomgr_cb_status status) { +static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) { grpc_tcp *tcp = (grpc_tcp *)arg; int iov_size = 1; gpr_slice static_read_slices[INLINE_SLICE_BUFFER_SIZE]; @@ -324,18 +319,12 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, slice_state_init(&read_state, static_read_slices, INLINE_SLICE_BUFFER_SIZE, 0); - if (status == GRPC_CALLBACK_CANCELLED) { + if (!success) { call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN); grpc_tcp_unref(tcp); return; } - if (status == GRPC_CALLBACK_TIMED_OUT) { - call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_TIMED_OUT); - grpc_tcp_unref(tcp); - return; - } - /* TODO(klempner): Limit the amount we read at once. 
*/ for (;;) { allocated_bytes = slice_state_append_blocks_into_iovec( @@ -377,8 +366,7 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, } else { /* Spurious read event, consume it here */ slice_state_destroy(&read_state); - grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp, - tcp->read_deadline); + grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp); } } else { /* TODO(klempner): Log interesting errors */ @@ -407,14 +395,13 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, } static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb, - void *user_data, gpr_timespec deadline) { + void *user_data) { grpc_tcp *tcp = (grpc_tcp *)ep; GPR_ASSERT(tcp->read_cb == NULL); tcp->read_cb = cb; tcp->read_user_data = user_data; - tcp->read_deadline = deadline; gpr_ref(&tcp->refcount); - grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp, deadline); + grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp); } #define MAX_WRITE_IOVEC 16 @@ -460,34 +447,24 @@ static grpc_endpoint_write_status grpc_tcp_flush(grpc_tcp *tcp) { }; } -static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, - grpc_iomgr_cb_status status) { +static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success) { grpc_tcp *tcp = (grpc_tcp *)arg; grpc_endpoint_write_status write_status; grpc_endpoint_cb_status cb_status; grpc_endpoint_write_cb cb; - cb_status = GRPC_ENDPOINT_CB_OK; - - if (status == GRPC_CALLBACK_CANCELLED) { - cb_status = GRPC_ENDPOINT_CB_SHUTDOWN; - } else if (status == GRPC_CALLBACK_TIMED_OUT) { - cb_status = GRPC_ENDPOINT_CB_TIMED_OUT; - } - - if (cb_status != GRPC_ENDPOINT_CB_OK) { + if (!success) { slice_state_destroy(&tcp->write_state); cb = tcp->write_cb; tcp->write_cb = NULL; - cb(tcp->write_user_data, cb_status); + cb(tcp->write_user_data, GRPC_ENDPOINT_CB_SHUTDOWN); grpc_tcp_unref(tcp); return; } write_status = grpc_tcp_flush(tcp); if (write_status == GRPC_ENDPOINT_WRITE_PENDING) { - 
grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp, - tcp->write_deadline); + grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp); } else { slice_state_destroy(&tcp->write_state); if (write_status == GRPC_ENDPOINT_WRITE_DONE) { @@ -502,9 +479,11 @@ static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, } } -static grpc_endpoint_write_status grpc_tcp_write( - grpc_endpoint *ep, gpr_slice *slices, size_t nslices, - grpc_endpoint_write_cb cb, void *user_data, gpr_timespec deadline) { +static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep, + gpr_slice *slices, + size_t nslices, + grpc_endpoint_write_cb cb, + void *user_data) { grpc_tcp *tcp = (grpc_tcp *)ep; grpc_endpoint_write_status status; @@ -530,17 +509,15 @@ static grpc_endpoint_write_status grpc_tcp_write( gpr_ref(&tcp->refcount); tcp->write_cb = cb; tcp->write_user_data = user_data; - tcp->write_deadline = deadline; - grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp, - tcp->write_deadline); + grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp); } return status; } static void grpc_tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { - /* tickle the pollset so we crash if things aren't wired correctly */ - pollset->unused++; + grpc_tcp *tcp = (grpc_tcp *)ep; + grpc_pollset_add_fd(pollset, tcp->em_fd); } static const grpc_endpoint_vtable vtable = { @@ -550,14 +527,12 @@ static const grpc_endpoint_vtable vtable = { grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) { grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); tcp->base.vtable = &vtable; - tcp->fd = grpc_fd_get(em_fd); + tcp->fd = em_fd->fd; tcp->read_cb = NULL; tcp->write_cb = NULL; tcp->read_user_data = NULL; tcp->write_user_data = NULL; tcp->slice_size = slice_size; - tcp->read_deadline = gpr_inf_future; - tcp->write_deadline = gpr_inf_future; slice_state_init(&tcp->write_state, NULL, 0, 0); /* paired with unref in grpc_tcp_destroy */ 
gpr_ref_init(&tcp->refcount, 1); diff --git a/src/core/iomgr/tcp_posix.h b/src/core/iomgr/tcp_posix.h index 830394d534..c3eef1b4b7 100644 --- a/src/core/iomgr/tcp_posix.h +++ b/src/core/iomgr/tcp_posix.h @@ -45,7 +45,7 @@ */ #include "src/core/iomgr/endpoint.h" -#include "src/core/iomgr/iomgr_libevent.h" +#include "src/core/iomgr/fd_posix.h" #define GRPC_TCP_DEFAULT_READ_SLICE_SIZE 8192 diff --git a/src/core/iomgr/tcp_server.h b/src/core/iomgr/tcp_server.h index 46fba13f90..1968246b75 100644 --- a/src/core/iomgr/tcp_server.h +++ b/src/core/iomgr/tcp_server.h @@ -49,8 +49,8 @@ typedef void (*grpc_tcp_server_cb)(void *arg, grpc_endpoint *ep); grpc_tcp_server *grpc_tcp_server_create(); /* Start listening to bound ports */ -void grpc_tcp_server_start(grpc_tcp_server *server, grpc_tcp_server_cb cb, - void *cb_arg); +void grpc_tcp_server_start(grpc_tcp_server *server, grpc_pollset *pollset, + grpc_tcp_server_cb cb, void *cb_arg); /* Add a port to the server, returning true on success, or false otherwise. 
diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 2abaf15ce4..5ed517748a 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -45,7 +45,7 @@ #include <string.h> #include <errno.h> -#include "src/core/iomgr/iomgr_libevent.h" +#include "src/core/iomgr/pollset_posix.h" #include "src/core/iomgr/sockaddr_utils.h" #include "src/core/iomgr/socket_utils_posix.h" #include "src/core/iomgr/tcp_posix.h" @@ -97,13 +97,8 @@ grpc_tcp_server *grpc_tcp_server_create() { return s; } -static void done_destroy(void *p, grpc_iomgr_cb_status status) { - gpr_event_set(p, (void *)1); -} - void grpc_tcp_server_destroy(grpc_tcp_server *s) { size_t i; - gpr_event fd_done; gpr_mu_lock(&s->mu); /* shutdown all fd's */ for (i = 0; i < s->nports; i++) { @@ -118,9 +113,7 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s) { /* delete ALL the things */ for (i = 0; i < s->nports; i++) { server_port *sp = &s->ports[i]; - gpr_event_init(&fd_done); - grpc_fd_destroy(sp->emfd, done_destroy, &fd_done); - gpr_event_wait(&fd_done, gpr_inf_future); + grpc_fd_orphan(sp->emfd, NULL, NULL); } gpr_free(s->ports); gpr_free(s); @@ -196,10 +189,10 @@ error: } /* event manager callback when reads are ready */ -static void on_read(void *arg, grpc_iomgr_cb_status status) { +static void on_read(void *arg, int success) { server_port *sp = arg; - if (status != GRPC_CALLBACK_SUCCESS) { + if (!success) { goto error; } @@ -215,7 +208,7 @@ static void on_read(void *arg, grpc_iomgr_cb_status status) { case EINTR: continue; case EAGAIN: - grpc_fd_notify_on_read(sp->emfd, on_read, sp, gpr_inf_future); + grpc_fd_notify_on_read(sp->emfd, on_read, sp); return; default: gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno)); @@ -254,15 +247,10 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd, s->ports = gpr_realloc(s->ports, sizeof(server_port *) * s->port_capacity); } sp = &s->ports[s->nports++]; - sp->emfd = grpc_fd_create(fd); - sp->fd 
= fd; sp->server = s; - /* initialize the em desc */ - if (sp->emfd == NULL) { - s->nports--; - gpr_mu_unlock(&s->mu); - return 0; - } + sp->fd = fd; + sp->emfd = grpc_fd_create(fd); + GPR_ASSERT(sp->emfd); gpr_mu_unlock(&s->mu); return 1; @@ -319,8 +307,8 @@ int grpc_tcp_server_get_fd(grpc_tcp_server *s, int index) { return (0 <= index && index < s->nports) ? s->ports[index].fd : -1; } -void grpc_tcp_server_start(grpc_tcp_server *s, grpc_tcp_server_cb cb, - void *cb_arg) { +void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset *pollset, + grpc_tcp_server_cb cb, void *cb_arg) { size_t i; GPR_ASSERT(cb); gpr_mu_lock(&s->mu); @@ -329,8 +317,10 @@ void grpc_tcp_server_start(grpc_tcp_server *s, grpc_tcp_server_cb cb, s->cb = cb; s->cb_arg = cb_arg; for (i = 0; i < s->nports; i++) { - grpc_fd_notify_on_read(s->ports[i].emfd, on_read, &s->ports[i], - gpr_inf_future); + if (pollset) { + grpc_pollset_add_fd(pollset, s->ports[i].emfd); + } + grpc_fd_notify_on_read(s->ports[i].emfd, on_read, &s->ports[i]); s->active_ports++; } gpr_mu_unlock(&s->mu); |