diff options
Diffstat (limited to 'src/core/eventmanager')
-rw-r--r-- | src/core/eventmanager/em.c | 664 | ||||
-rw-r--r-- | src/core/eventmanager/em.h | 350 | ||||
-rw-r--r-- | src/core/eventmanager/em_posix.c | 56 | ||||
-rw-r--r-- | src/core/eventmanager/em_win32.c | 38 |
4 files changed, 1108 insertions, 0 deletions
diff --git a/src/core/eventmanager/em.c b/src/core/eventmanager/em.c new file mode 100644 index 0000000000..e02d56c0a1 --- /dev/null +++ b/src/core/eventmanager/em.c @@ -0,0 +1,664 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ * + */ + +#include "src/core/eventmanager/em.h" + +#include <unistd.h> +#include <fcntl.h> + +#include <grpc/support/atm.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/time.h> +#include <event2/event.h> +#include <event2/thread.h> + +int evthread_use_threads(void); + +#define ALARM_TRIGGER_INIT ((gpr_atm)0) +#define ALARM_TRIGGER_INCREMENT ((gpr_atm)1) +#define DONE_SHUTDOWN ((void *)1) + +#define POLLER_ID_INVALID ((gpr_atm)-1) + +/* ================== grpc_em implementation ===================== */ + +/* If anything is in the work queue, process one item and return 1. + Return 0 if there were no work items to complete. + Requires em->mu locked, may unlock and relock during the call. */ +static int maybe_do_queue_work(grpc_em *em) { + grpc_em_activation_data *work = em->q; + + if (work == NULL) return 0; + + if (work->next == work) { + em->q = NULL; + } else { + em->q = work->next; + em->q->prev = work->prev; + em->q->next->prev = em->q->prev->next = em->q; + } + work->next = work->prev = NULL; + gpr_mu_unlock(&em->mu); + + work->cb(work->arg, work->status); + + gpr_mu_lock(&em->mu); + return 1; +} + +/* Break out of the event loop on timeout */ +static void timer_callback(int fd, short events, void *context) { + event_base_loopbreak((struct event_base *)context); +} + +/* Spend some time polling if no other thread is. + Returns 1 if polling was performed, 0 otherwise. + Requires em->mu locked, may unlock and relock during the call. 
*/ +static int maybe_do_polling_work(grpc_em *em, struct timeval delay) { + int status; + + if (em->num_pollers) return 0; + + em->num_pollers = 1; + gpr_mu_unlock(&em->mu); + + event_add(em->timeout_ev, &delay); + status = event_base_loop(em->event_base, EVLOOP_ONCE); + if (status < 0) { + gpr_log(GPR_ERROR, "event polling loop stops with error status %d", status); + } + event_del(em->timeout_ev); + + gpr_mu_lock(&em->mu); + em->num_pollers = 0; + gpr_cv_broadcast(&em->cv); + return 1; +} + +int grpc_em_work(grpc_em *em, gpr_timespec deadline) { + gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now()); + /* poll for no longer than one second */ + gpr_timespec max_delay = {1, 0}; + struct timeval delay; + + GPR_ASSERT(em); + + if (gpr_time_cmp(delay_timespec, gpr_time_0) <= 0) { + return 0; + } + + if (gpr_time_cmp(delay_timespec, max_delay) > 0) { + delay_timespec = max_delay; + } + + delay = gpr_timeval_from_timespec(delay_timespec); + + if (maybe_do_queue_work(em) || maybe_do_polling_work(em, delay)) { + em->last_poll_completed = gpr_now(); + return 1; + } + + return 0; +} + +static void backup_poller_thread(void *p) { + grpc_em *em = p; + int backup_poller_engaged = 0; + /* allow no pollers for 100 milliseconds, then engage backup polling */ + gpr_timespec allow_no_pollers = gpr_time_from_micros(100 * 1000); + + gpr_mu_lock(&em->mu); + while (!em->shutdown_backup_poller) { + if (em->num_pollers == 0) { + gpr_timespec now = gpr_now(); + gpr_timespec time_until_engage = gpr_time_sub( + allow_no_pollers, gpr_time_sub(now, em->last_poll_completed)); + if (gpr_time_cmp(time_until_engage, gpr_time_0) <= 0) { + if (!backup_poller_engaged) { + gpr_log(GPR_DEBUG, "No pollers for a while - engaging backup poller"); + backup_poller_engaged = 1; + } + if (!maybe_do_queue_work(em)) { + struct timeval tv = {1, 0}; + maybe_do_polling_work(em, tv); + } + } else { + if (backup_poller_engaged) { + gpr_log(GPR_DEBUG, "Backup poller disengaged"); + backup_poller_engaged = 
0; + } + gpr_mu_unlock(&em->mu); + gpr_sleep_until(gpr_time_add(now, time_until_engage)); + gpr_mu_lock(&em->mu); + } + } else { + if (backup_poller_engaged) { + gpr_log(GPR_DEBUG, "Backup poller disengaged"); + backup_poller_engaged = 0; + } + gpr_cv_wait(&em->cv, &em->mu, gpr_inf_future); + } + } + gpr_mu_unlock(&em->mu); + + gpr_event_set(&em->backup_poller_done, (void *)1); +} + +grpc_em_error grpc_em_init(grpc_em *em) { + gpr_thd_id backup_poller_id; + + if (evthread_use_threads() != 0) { + gpr_log(GPR_ERROR, "Failed to initialize libevent thread support!"); + return GRPC_EM_ERROR; + } + + gpr_mu_init(&em->mu); + gpr_cv_init(&em->cv); + em->q = NULL; + em->num_pollers = 0; + em->num_fds = 0; + em->last_poll_completed = gpr_now(); + em->shutdown_backup_poller = 0; + + gpr_event_init(&em->backup_poller_done); + + em->event_base = NULL; + em->timeout_ev = NULL; + + em->event_base = event_base_new(); + if (!em->event_base) { + gpr_log(GPR_ERROR, "Failed to create the event base"); + return GRPC_EM_ERROR; + } + + if (evthread_make_base_notifiable(em->event_base) != 0) { + gpr_log(GPR_ERROR, "Couldn't make event base notifiable cross threads!"); + return GRPC_EM_ERROR; + } + + em->timeout_ev = evtimer_new(em->event_base, timer_callback, em->event_base); + + gpr_thd_new(&backup_poller_id, backup_poller_thread, em, NULL); + + return GRPC_EM_OK; +} + +grpc_em_error grpc_em_destroy(grpc_em *em) { + gpr_timespec fd_shutdown_deadline = + gpr_time_add(gpr_now(), gpr_time_from_micros(10 * 1000 * 1000)); + + /* broadcast shutdown */ + gpr_mu_lock(&em->mu); + while (em->num_fds) { + gpr_log(GPR_INFO, + "waiting for %d fds to be destroyed before closing event manager", + em->num_fds); + if (gpr_cv_wait(&em->cv, &em->mu, fd_shutdown_deadline)) { + gpr_log(GPR_ERROR, + "not all fds destroyed before shutdown deadline: memory leaks " + "are likely"); + break; + } else if (em->num_fds == 0) { + gpr_log(GPR_INFO, "all fds closed"); + } + } + + em->shutdown_backup_poller = 1; + 
gpr_cv_broadcast(&em->cv); + gpr_mu_unlock(&em->mu); + + gpr_event_wait(&em->backup_poller_done, gpr_inf_future); + + /* drain pending work */ + gpr_mu_lock(&em->mu); + while (maybe_do_queue_work(em)) + ; + gpr_mu_unlock(&em->mu); + + /* complete shutdown */ + gpr_mu_destroy(&em->mu); + gpr_cv_destroy(&em->cv); + + if (em->timeout_ev != NULL) { + event_free(em->timeout_ev); + } + + if (em->event_base != NULL) { + event_base_free(em->event_base); + em->event_base = NULL; + } + + return GRPC_EM_OK; +} + +static void add_task(grpc_em *em, grpc_em_activation_data *adata) { + gpr_mu_lock(&em->mu); + if (em->q) { + adata->next = em->q; + adata->prev = adata->next->prev; + adata->next->prev = adata->prev->next = adata; + } else { + em->q = adata; + adata->next = adata->prev = adata; + } + gpr_cv_broadcast(&em->cv); + gpr_mu_unlock(&em->mu); +} + +/* ===============grpc_em_alarm implementation==================== */ + +/* The following function frees up the alarm's libevent structure and + should always be invoked just before calling the alarm's callback */ +static void alarm_ev_destroy(grpc_em_alarm *alarm) { + grpc_em_activation_data *adata = &alarm->task.activation[GRPC_EM_TA_ONLY]; + if (adata->ev != NULL) { + event_free(adata->ev); + adata->ev = NULL; + } +} +/* Proxy callback triggered by alarm->ev to call alarm->cb */ +static void libevent_alarm_cb(int fd, short what, void *arg /*=alarm*/) { + grpc_em_alarm *alarm = arg; + grpc_em_activation_data *adata = &alarm->task.activation[GRPC_EM_TA_ONLY]; + int trigger_old; + + /* First check if this alarm has been canceled, atomically */ + trigger_old = + gpr_atm_full_fetch_add(&alarm->triggered, ALARM_TRIGGER_INCREMENT); + if (trigger_old == ALARM_TRIGGER_INIT) { + /* Before invoking user callback, destroy the libevent structure */ + alarm_ev_destroy(alarm); + adata->status = GRPC_CALLBACK_SUCCESS; + add_task(alarm->task.em, adata); + } +} + +grpc_em_error grpc_em_alarm_init(grpc_em_alarm *alarm, grpc_em *em, + 
grpc_em_cb_func alarm_cb, void *alarm_cb_arg) { + grpc_em_activation_data *adata = &alarm->task.activation[GRPC_EM_TA_ONLY]; + alarm->task.type = GRPC_EM_TASK_ALARM; + alarm->task.em = em; + gpr_atm_rel_store(&alarm->triggered, ALARM_TRIGGER_INIT); + adata->cb = alarm_cb; + adata->arg = alarm_cb_arg; + adata->prev = NULL; + adata->next = NULL; + adata->ev = NULL; + return GRPC_EM_OK; +} + +grpc_em_error grpc_em_alarm_add(grpc_em_alarm *alarm, gpr_timespec deadline) { + grpc_em_activation_data *adata = &alarm->task.activation[GRPC_EM_TA_ONLY]; + gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now()); + struct timeval delay = gpr_timeval_from_timespec(delay_timespec); + if (adata->ev) { + event_free(adata->ev); + gpr_log(GPR_INFO, "Adding an alarm that already has an event."); + adata->ev = NULL; + } + adata->ev = evtimer_new(alarm->task.em->event_base, libevent_alarm_cb, alarm); + /* Set the trigger field to untriggered. Do this as the last store since + it is a release of previous stores. */ + gpr_atm_rel_store(&alarm->triggered, ALARM_TRIGGER_INIT); + + if (adata->ev != NULL && evtimer_add(adata->ev, &delay) == 0) { + return GRPC_EM_OK; + } else { + return GRPC_EM_ERROR; + } +} + +grpc_em_error grpc_em_alarm_cancel(grpc_em_alarm *alarm, void **arg) { + grpc_em_activation_data *adata = &alarm->task.activation[GRPC_EM_TA_ONLY]; + int trigger_old; + + *arg = adata->arg; + + /* First check if this alarm has been triggered, atomically */ + trigger_old = + gpr_atm_full_fetch_add(&alarm->triggered, ALARM_TRIGGER_INCREMENT); + if (trigger_old == ALARM_TRIGGER_INIT) { + /* We need to make sure that we only invoke the callback if it hasn't + already been invoked */ + /* First remove this event from libevent. This returns success even if the + event has gone active or invoked its callback. */ + if (evtimer_del(adata->ev) != 0) { + /* The delete was unsuccessful for some reason. 
*/ + gpr_log(GPR_ERROR, "Attempt to delete alarm event was unsuccessful"); + return GRPC_EM_ERROR; + } + /* Free up the event structure before invoking callback */ + alarm_ev_destroy(alarm); + adata->status = GRPC_CALLBACK_CANCELLED; + add_task(alarm->task.em, adata); + } + return GRPC_EM_OK; +} + +/* ==================== grpc_em_fd implementation =================== */ + +/* Proxy callback to call a gRPC read/write callback */ +static void em_fd_cb(int fd, short what, void *arg /*=em_fd*/) { + grpc_em_fd *em_fd = arg; + grpc_em_cb_status status = GRPC_CALLBACK_SUCCESS; + int run_read_cb = 0; + int run_write_cb = 0; + grpc_em_activation_data *rdata, *wdata; + + gpr_mu_lock(&em_fd->mu); + /* TODO(klempner): We need to delete the event here too so we avoid spurious + shutdowns. */ + if (em_fd->shutdown_started) { + status = GRPC_CALLBACK_CANCELLED; + } else if (status == GRPC_CALLBACK_SUCCESS && (what & EV_TIMEOUT)) { + status = GRPC_CALLBACK_TIMED_OUT; + /* TODO(klempner): This is broken if we are monitoring both read and write + events on the same fd -- generating a spurious event is okay, but + generating a spurious timeout is not. 
*/ + what |= (EV_READ | EV_WRITE); + } + + if (what & EV_READ) { + switch (em_fd->read_state) { + case GRPC_EM_FD_WAITING: + run_read_cb = 1; + em_fd->read_state = GRPC_EM_FD_IDLE; + break; + case GRPC_EM_FD_IDLE: + case GRPC_EM_FD_CACHED: + em_fd->read_state = GRPC_EM_FD_CACHED; + } + } + if (what & EV_WRITE) { + switch (em_fd->write_state) { + case GRPC_EM_FD_WAITING: + run_write_cb = 1; + em_fd->write_state = GRPC_EM_FD_IDLE; + break; + case GRPC_EM_FD_IDLE: + case GRPC_EM_FD_CACHED: + em_fd->write_state = GRPC_EM_FD_CACHED; + } + } + + if (run_read_cb) { + rdata = &(em_fd->task.activation[GRPC_EM_TA_READ]); + rdata->status = status; + add_task(em_fd->task.em, rdata); + } else if (run_write_cb) { + wdata = &(em_fd->task.activation[GRPC_EM_TA_WRITE]); + wdata->status = status; + add_task(em_fd->task.em, wdata); + } + gpr_mu_unlock(&em_fd->mu); +} + +static void em_fd_shutdown_cb(int fd, short what, void *arg /*=em_fd*/) { + /* TODO(klempner): This could just run directly in the calling thread, except + that libevent's handling of event_active() on an event which is already in + flight on a different thread is racy and easily triggers TSAN. 
+ */ + grpc_em_fd *em_fd = arg; + gpr_mu_lock(&em_fd->mu); + em_fd->shutdown_started = 1; + if (em_fd->read_state == GRPC_EM_FD_WAITING) { + event_active(em_fd->task.activation[GRPC_EM_TA_READ].ev, EV_READ, 1); + } + if (em_fd->write_state == GRPC_EM_FD_WAITING) { + event_active(em_fd->task.activation[GRPC_EM_TA_WRITE].ev, EV_WRITE, 1); + } + gpr_mu_unlock(&em_fd->mu); +} + +grpc_em_error grpc_em_fd_init(grpc_em_fd *em_fd, grpc_em *em, int fd) { + int flags; + grpc_em_activation_data *rdata, *wdata; + + gpr_mu_lock(&em->mu); + em->num_fds++; + gpr_mu_unlock(&em->mu); + + em_fd->shutdown_ev = NULL; + gpr_mu_init(&em_fd->mu); + + flags = fcntl(fd, F_GETFL, 0); + if ((flags & O_NONBLOCK) == 0) { + gpr_log(GPR_ERROR, "File descriptor %d is blocking", fd); + return GRPC_EM_INVALID_ARGUMENTS; + } + + em_fd->task.type = GRPC_EM_TASK_FD; + em_fd->task.em = em; + em_fd->fd = fd; + + rdata = &(em_fd->task.activation[GRPC_EM_TA_READ]); + rdata->ev = NULL; + rdata->cb = NULL; + rdata->arg = NULL; + rdata->status = GRPC_CALLBACK_SUCCESS; + rdata->prev = NULL; + rdata->next = NULL; + + wdata = &(em_fd->task.activation[GRPC_EM_TA_WRITE]); + wdata->ev = NULL; + wdata->cb = NULL; + wdata->arg = NULL; + wdata->status = GRPC_CALLBACK_SUCCESS; + wdata->prev = NULL; + wdata->next = NULL; + + em_fd->read_state = GRPC_EM_FD_IDLE; + em_fd->write_state = GRPC_EM_FD_IDLE; + + /* TODO(chenw): detect platforms where only level trigger is supported, + and set the event to non-persist. 
*/ + rdata->ev = event_new(em->event_base, em_fd->fd, EV_ET | EV_PERSIST | EV_READ, + em_fd_cb, em_fd); + if (!rdata->ev) { + gpr_log(GPR_ERROR, "Failed to create read event"); + return GRPC_EM_ERROR; + } + + wdata->ev = event_new(em->event_base, em_fd->fd, + EV_ET | EV_PERSIST | EV_WRITE, em_fd_cb, em_fd); + if (!wdata->ev) { + gpr_log(GPR_ERROR, "Failed to create write event"); + return GRPC_EM_ERROR; + } + + em_fd->shutdown_ev = + event_new(em->event_base, -1, EV_READ, em_fd_shutdown_cb, em_fd); + + if (!em_fd->shutdown_ev) { + gpr_log(GPR_ERROR, "Failed to create shutdown event"); + return GRPC_EM_ERROR; + } + + em_fd->shutdown_started = 0; + return GRPC_EM_OK; +} + +void grpc_em_fd_destroy(grpc_em_fd *em_fd) { + grpc_em_task_activity_type type; + grpc_em_activation_data *adata; + grpc_em *em = em_fd->task.em; + + /* ensure anyone holding the lock has left - it's the callers responsibility + to ensure that no new users enter */ + gpr_mu_lock(&em_fd->mu); + gpr_mu_unlock(&em_fd->mu); + + for (type = GRPC_EM_TA_READ; type < GRPC_EM_TA_COUNT; type++) { + adata = &(em_fd->task.activation[type]); + GPR_ASSERT(adata->next == NULL); + if (adata->ev != NULL) { + event_free(adata->ev); + adata->ev = NULL; + } + } + + if (em_fd->shutdown_ev != NULL) { + event_free(em_fd->shutdown_ev); + em_fd->shutdown_ev = NULL; + } + gpr_mu_destroy(&em_fd->mu); + + gpr_mu_lock(&em->mu); + em->num_fds--; + gpr_cv_broadcast(&em->cv); + gpr_mu_unlock(&em->mu); +} + +int grpc_em_fd_get(struct grpc_em_fd *em_fd) { return em_fd->fd; } + +/* Returns the event manager associated with *em_fd. */ +grpc_em *grpc_em_fd_get_em(grpc_em_fd *em_fd) { return em_fd->task.em; } + +/* TODO(chenw): should we enforce the contract that notify_on_read cannot be + called when the previously registered callback has not been called yet. 
*/ +grpc_em_error grpc_em_fd_notify_on_read(grpc_em_fd *em_fd, + grpc_em_cb_func read_cb, + void *read_cb_arg, + gpr_timespec deadline) { + int force_event = 0; + grpc_em_activation_data *rdata; + grpc_em_error result = GRPC_EM_OK; + gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now()); + struct timeval delay = gpr_timeval_from_timespec(delay_timespec); + struct timeval *delayp = + gpr_time_cmp(deadline, gpr_inf_future) ? &delay : NULL; + + rdata = &em_fd->task.activation[GRPC_EM_TA_READ]; + + gpr_mu_lock(&em_fd->mu); + rdata->cb = read_cb; + rdata->arg = read_cb_arg; + + force_event = + (em_fd->shutdown_started || em_fd->read_state == GRPC_EM_FD_CACHED); + em_fd->read_state = GRPC_EM_FD_WAITING; + + if (force_event) { + event_active(rdata->ev, EV_READ, 1); + } else if (event_add(rdata->ev, delayp) == -1) { + result = GRPC_EM_ERROR; + } + gpr_mu_unlock(&em_fd->mu); + return result; +} + +grpc_em_error grpc_em_fd_notify_on_write(grpc_em_fd *em_fd, + grpc_em_cb_func write_cb, + void *write_cb_arg, + gpr_timespec deadline) { + int force_event = 0; + grpc_em_activation_data *wdata; + grpc_em_error result = GRPC_EM_OK; + gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now()); + struct timeval delay = gpr_timeval_from_timespec(delay_timespec); + struct timeval *delayp = + gpr_time_cmp(deadline, gpr_inf_future) ? 
&delay : NULL; + + wdata = &em_fd->task.activation[GRPC_EM_TA_WRITE]; + + gpr_mu_lock(&em_fd->mu); + wdata->cb = write_cb; + wdata->arg = write_cb_arg; + + force_event = + (em_fd->shutdown_started || em_fd->write_state == GRPC_EM_FD_CACHED); + em_fd->write_state = GRPC_EM_FD_WAITING; + + if (force_event) { + event_active(wdata->ev, EV_WRITE, 1); + } else if (event_add(wdata->ev, delayp) == -1) { + result = GRPC_EM_ERROR; + } + gpr_mu_unlock(&em_fd->mu); + return result; +} + +void grpc_em_fd_shutdown(grpc_em_fd *em_fd) { + event_active(em_fd->shutdown_ev, EV_READ, 1); +} + +/*====================== Other callback functions ======================*/ + +/* Sometimes we want a followup callback: something to be added from the + current callback for the EM to invoke once this callback is complete. + This is implemented by inserting an entry into an EM queue. */ + +/* The following structure holds the field needed for adding the + followup callback. These are the argument for the followup callback, + the function to use for the followup callback, and the + activation data pointer used for the queues (to free in the CB) */ +struct followup_callback_arg { + grpc_em_cb_func func; + void *cb_arg; + grpc_em_activation_data adata; +}; + +static void followup_proxy_callback(void *cb_arg, grpc_em_cb_status status) { + struct followup_callback_arg *fcb_arg = cb_arg; + /* Invoke the function */ + fcb_arg->func(fcb_arg->cb_arg, status); + gpr_free(fcb_arg); +} + +grpc_em_error grpc_em_add_callback(grpc_em *em, grpc_em_cb_func cb, + void *cb_arg) { + grpc_em_activation_data *adptr; + struct followup_callback_arg *fcb_arg; + + fcb_arg = gpr_malloc(sizeof(*fcb_arg)); + if (fcb_arg == NULL) { + return GRPC_EM_ERROR; + } + /* Set up the activation data and followup callback argument structures */ + adptr = &fcb_arg->adata; + adptr->ev = NULL; + adptr->cb = followup_proxy_callback; + adptr->arg = fcb_arg; + adptr->status = GRPC_CALLBACK_SUCCESS; + adptr->prev = NULL; + adptr->next = 
NULL; + + fcb_arg->func = cb; + fcb_arg->cb_arg = cb_arg; + + /* Insert an activation data for the specified em */ + add_task(em, adptr); + return GRPC_EM_OK; +} diff --git a/src/core/eventmanager/em.h b/src/core/eventmanager/em.h new file mode 100644 index 0000000000..32d37a5b98 --- /dev/null +++ b/src/core/eventmanager/em.h @@ -0,0 +1,350 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ * + */ + +#ifndef __GRPC_INTERNAL_EVENTMANAGER_EM_H__ +#define __GRPC_INTERNAL_EVENTMANAGER_EM_H__ +/* grpc_em is an event manager wrapping event loop with multithread support. + It executes a callback function when a specific event occurs on a file + descriptor or after a timeout has passed. + All methods are threadsafe and can be called from any thread. + + To use the event manager, a grpc_em instance needs to be initialized to + maintains the internal states. The grpc_em instance can be used to + initialize file descriptor instance of grpc_em_fd, or alarm instance of + grpc_em_alarm. The former is used to register a callback with a IO event. + The later is used to schedule an alarm. + + Instantiating any of these data structures requires including em_internal.h + A typical usage example is shown in the end of that header file. */ + +#include <grpc/support/atm.h> +#include <grpc/support/sync.h> +#include <grpc/support/thd.h> +#include <grpc/support/time.h> + +/* =============== Enums used in GRPC event manager API ==================== */ + +/* Result of a grpc_em operation */ +typedef enum grpc_em_error { + GRPC_EM_OK = 0, /* everything went ok */ + GRPC_EM_ERROR, /* internal errors not caused by the caller */ + GRPC_EM_INVALID_ARGUMENTS /* invalid arguments from the caller */ +} grpc_em_error; + +/* Status passed to callbacks for grpc_em_fd_notify_on_read and + grpc_em_fd_notify_on_write. 
*/ +typedef enum grpc_em_cb_status { + GRPC_CALLBACK_SUCCESS = 0, + GRPC_CALLBACK_TIMED_OUT, + GRPC_CALLBACK_CANCELLED, + GRPC_CALLBACK_DO_NOT_USE +} grpc_em_cb_status; + +/* ======= Useful forward struct typedefs for GRPC event manager API ======= */ + +struct grpc_em; +struct grpc_em_alarm; +struct grpc_fd; + +typedef struct grpc_em grpc_em; +typedef struct grpc_em_alarm grpc_em_alarm; +typedef struct grpc_em_fd grpc_em_fd; + +/* gRPC Callback definition */ +typedef void (*grpc_em_cb_func)(void *arg, grpc_em_cb_status status); + +/* ============================ grpc_em =============================== */ +/* Initialize *em and start polling, return GRPC_EM_OK on success, return + GRPC_EM_ERROR on failure. Upon failure, caller should call grpc_em_destroy() + to clean partially initialized *em. + + Requires: *em uninitialized. */ +grpc_em_error grpc_em_init(grpc_em *em); + +/* Stop polling and cause *em no longer to be initialized. + Return GRPC_EM_OK if event polling is cleanly stopped. + Otherwise, return GRPC_EM_ERROR if polling is shutdown with errors. + Requires: *em initialized; no other concurrent operation on *em. */ +grpc_em_error grpc_em_destroy(grpc_em *em); + +/* do some work; assumes em->mu locked; may unlock and relock em->mu */ +int grpc_em_work(grpc_em *em, gpr_timespec deadline); + +/* =========================== grpc_em_am ============================== */ +/* Initialize *alarm. When expired or canceled, alarm_cb will be called with + *alarm_cb_arg and status to indicate if it expired (SUCCESS) or was + canceled (CANCELLED). alarm_cb is guaranteed to be called exactly once, + and application code should check the status to determine how it was + invoked. The application callback is also responsible for maintaining + information about when to free up any user-level state. */ +grpc_em_error grpc_em_alarm_init(grpc_em_alarm *alarm, grpc_em *em, + grpc_em_cb_func alarm_cb, void *alarm_cb_arg); + +/* Note that there is no alarm destroy function. 
This is because the + alarm is a one-time occurrence with a guarantee that the callback will + be called exactly once, either at expiration or cancellation. Thus, all + the internal alarm event management state is destroyed just before + that callback is invoked. If the user has additional state associated with + the alarm, the user is responsible for determining when it is safe to + destroy that state. */ + +/* Schedule *alarm to expire at deadline. If *alarm is + re-added before expiration, the *delay is simply reset to the new value. + Return GRPC_EM_OK on success, or GRPC_EM_ERROR on failure. + Upon failure, caller should abort further operations on *alarm */ +grpc_em_error grpc_em_alarm_add(grpc_em_alarm *alarm, gpr_timespec deadline); + +/* Cancel an *alarm. + There are three cases: + 1. We normally cancel the alarm + 2. The alarm has already run + 3. We can't cancel the alarm because it is "in flight". + + In all of these cases, the cancellation is still considered successful. + They are essentially distinguished in that the alarm_cb will be run + exactly once from either the cancellation (with status CANCELLED) + or from the activation (with status SUCCESS) + + Requires: cancel() must happen after add() on a given alarm */ +grpc_em_error grpc_em_alarm_cancel(grpc_em_alarm *alarm, void **arg); + +/* ========================== grpc_em_fd ============================= */ + +/* Initialize *em_fd, return GRPM_EM_OK on success, GRPC_EM_ERROR on internal + errors, or GRPC_EM_INVALID_ARGUMENTS if fd is a blocking file descriptor. + Upon failure, caller should call grpc_em_fd_destroy() to clean partially + initialized *em_fd. + fd is a non-blocking file descriptor. + + Requires: *em_fd uninitialized. fd is a non-blocking file descriptor. */ +grpc_em_error grpc_em_fd_init(grpc_em_fd *em_fd, grpc_em *em, int fd); + +/* Cause *em_fd no longer to be initialized. + Requires: *em_fd initialized; no outstanding notify_on_read or + notify_on_write. 
*/ +void grpc_em_fd_destroy(grpc_em_fd *em_fd); + +/* Returns the file descriptor associated with *em_fd. */ +int grpc_em_fd_get(grpc_em_fd *em_fd); + +/* Returns the event manager associated with *em_fd. */ +grpc_em *grpc_em_fd_get_em(grpc_em_fd *em_fd); + +/* Register read interest, causing read_cb to be called once when em_fd becomes + readable, on deadline specified by deadline, or on shutdown triggered by + grpc_em_fd_shutdown. + Return GRPC_EM_OK on success, or GRPC_EM_ERROR on failure. + Upon Failure, caller should abort further operations on *em_fd except + grpc_em_fd_shutdown(). + read_cb will be called with read_cb_arg when *em_fd becomes readable. + read_cb is Called with status of GRPC_CALLBACK_SUCCESS if readable, + GRPC_CALLBACK_TIMED_OUT if the call timed out, + and CANCELLED if the call was cancelled. + + Requires:This method must not be called before the read_cb for any previous + call runs. Edge triggered events are used whenever they are supported by the + underlying platform. This means that users must drain em_fd in read_cb before + calling notify_on_read again. Users are also expected to handle spurious + events, i.e read_cb is called while nothing can be readable from em_fd */ +grpc_em_error grpc_em_fd_notify_on_read(grpc_em_fd *em_fd, + grpc_em_cb_func read_cb, + void *read_cb_arg, + gpr_timespec deadline); + +/* Exactly the same semantics as above, except based on writable events. */ +grpc_em_error grpc_em_fd_notify_on_write(grpc_em_fd *fd, + grpc_em_cb_func write_cb, + void *write_cb_arg, + gpr_timespec deadline); + +/* Cause any current and all future read/write callbacks to error out with + GRPC_CALLBACK_CANCELLED. 
/* NOTE(review): this chunk begins mid-file; the doc comment for the following
   declaration is truncated in this view. */
void grpc_em_fd_shutdown(grpc_em_fd *em_fd);

/* ================== Other functions =================== */

/* This function is called from within a callback or from anywhere else
   and causes the invocation of a callback at some point in the future */
grpc_em_error grpc_em_add_callback(grpc_em *em, grpc_em_cb_func cb,
                                   void *cb_arg);

/* ========== Declarations related to queue management (non-API) =========== */

/* Forward declarations */
struct grpc_em_activation_data;

/* ================== Actual structure definitions ========================= */
/* gRPC event manager handle.
   The handle is used to initialize both grpc_em_alarm and grpc_em_fd. */
struct em_thread_arg;

struct grpc_em {
  struct event_base *event_base; /* underlying libevent loop */

  gpr_mu mu;                        /* guards the fields below */
  gpr_cv cv;
  struct grpc_em_activation_data *q; /* queue of pending activations */
  int num_pollers;                  /* threads currently polling */
  int num_fds;                      /* fds currently registered */
  gpr_timespec last_poll_completed;

  int shutdown_backup_poller;  /* non-zero -> backup poller should exit */
  gpr_event backup_poller_done;

  struct event *timeout_ev; /* activated to break out of the event loop early */
};

/* gRPC event manager task "base class". This is pretend-inheritance in C89.
   This should be the first member of any actual grpc_em task type.

   Memory warning: expanding this will increase memory usage in any derived
   class, so be careful.

   For generality, this base can be on multiple task queues and can have
   multiple event callbacks registered. Not all "derived classes" will use
   this feature. */

typedef enum grpc_em_task_type {
  GRPC_EM_TASK_ALARM,
  GRPC_EM_TASK_FD,
  GRPC_EM_TASK_DO_NOT_USE /* sentinel; not a real task type */
} grpc_em_task_type;

/* Different activity types to shape the callback and queueing arrays */
typedef enum grpc_em_task_activity_type {
  GRPC_EM_TA_READ, /* use this also for single-type events */
  GRPC_EM_TA_WRITE,
  GRPC_EM_TA_COUNT /* number of entries; sizes the activation array below */
} grpc_em_task_activity_type;

/* Include the following #define for convenience for tasks like alarms that
   only have a single type */
#define GRPC_EM_TA_ONLY GRPC_EM_TA_READ

typedef struct grpc_em_activation_data {
  struct event *ev;   /* event activated on this callback type */
  grpc_em_cb_func cb; /* function pointer for callback */
  void *arg;          /* argument passed to cb */

  /* Hold the status associated with the callback when queued */
  grpc_em_cb_status status;
  /* Now set up to link activations into scheduler queues */
  struct grpc_em_activation_data *prev;
  struct grpc_em_activation_data *next;
} grpc_em_activation_data;

typedef struct grpc_em_task {
  grpc_em_task_type type;
  grpc_em *em; /* owning event manager */

  /* Now have an array of activation data elements: one for each activity
     type that could get activated */
  grpc_em_activation_data activation[GRPC_EM_TA_COUNT];
} grpc_em_task;

/* gRPC alarm handle.
   The handle is used to add an alarm which expires after specified timeout. */
struct grpc_em_alarm {
  grpc_em_task task; /* Include the base class */

  gpr_atm triggered; /* To be used atomically if alarm triggered */
};

/* =================== Event caching ===================
   In order to not miss or double-return edges in the context of edge
   triggering and multithreading, we need a per-fd caching layer in the
   eventmanager itself to cache relevant events.

   There are two types of events we care about: calls to
   notify_on_[read|write] and readable/writable events for the socket from
   eventfd. There are separate event caches for read and write.

   There are three states:
   0. "waiting" -- There's been a call to notify_on_[read|write] which has not
      had a corresponding event. In other words, we're waiting for an event so
      we can run the callback.
   1. "idle" -- We are neither waiting nor have a cached event.
   2. "cached" -- There has been a read/write event without a waiting callback,
      so we want to run the event next time the application calls
      notify_on_[read|write].

   The high level state diagram:

   +--------------------------------------------------------------------+
   |  WAITING                 |  IDLE                |  CACHED          |
   |                          |                      |                  |
   |             1.     --*-> |        2.      --+-> |     3.    --+\   |
   |                          |                      |            <--+/ |
   |                          |                      |                  |
   x+--        6.     5.  <-+--        4.       <-*--                   |
   |                          |                      |                  |
   +--------------------------------------------------------------------+

   Transitions right occur on read|write events. Transitions left occur on
   notify_on_[read|write] events.
   State transitions:
   1. Read|Write event while waiting -> run the callback and transition to
      idle.
   2. Read|Write event while idle -> transition to cached.
   3. Read|Write event with one already cached -> still cached.
   4. notify_on_[read|write] with event cached: run callback and transition to
      idle.
   5. notify_on_[read|write] when idle: Store callback and transition to
      waiting.
   6. notify_on_[read|write] when waiting: invalid. */

typedef enum grpc_em_fd_state {
  GRPC_EM_FD_WAITING = 0,
  GRPC_EM_FD_IDLE = 1,
  GRPC_EM_FD_CACHED = 2
} grpc_em_fd_state;

/* gRPC file descriptor handle.
   The handle is used to register read/write callbacks to a file descriptor */
struct grpc_em_fd {
  grpc_em_task task; /* Base class, callbacks, queues, etc */
  int fd;            /* File descriptor */

  /* Note that the shutdown event is only needed as a workaround for libevent
     not properly handling event_active on an in flight event. */
  struct event *shutdown_ev; /* activated to trigger shutdown */

  /* protect shutdown_started|read_state|write_state and ensure barriers
     between notify_on_[read|write] and read|write callbacks */
  gpr_mu mu;
  int shutdown_started; /* 0 -> shutdown not started, 1 -> started */
  grpc_em_fd_state read_state;
  grpc_em_fd_state write_state;
  /* activated after some timeout to activate shutdown_ev */
  /* NOTE(review): the comment above describes a field that is not present in
     this view -- confirm against the full header. */
};

#endif /* __GRPC_INTERNAL_EVENTMANAGER_EM_H__ */
IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* Posix grpc event manager support code. */ +#include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <event2/thread.h> + +static int error_code = 0; +static gpr_once threads_once = GPR_ONCE_INIT; +static void evthread_threads_initialize(void) { + error_code = evthread_use_pthreads(); + if (error_code) { + gpr_log(GPR_ERROR, "Failed to initialize libevent thread support!"); + } +} + +/* Notify LibEvent that Posix pthread is used. */ +int evthread_use_threads() { + gpr_once_init(&threads_once, &evthread_threads_initialize); + /* For Pthreads or Windows threads, Libevent provides simple APIs to set + mutexes and conditional variables to support cross thread operations. + For other platforms, LibEvent provide callback APIs to hook mutexes and + conditional variables. */ + return error_code; +} diff --git a/src/core/eventmanager/em_win32.c b/src/core/eventmanager/em_win32.c new file mode 100644 index 0000000000..4d5c3b5126 --- /dev/null +++ b/src/core/eventmanager/em_win32.c @@ -0,0 +1,38 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. 
+ * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* Windows event manager support code. */ +#include <event2/thread.h> + +/* Notify LibEvent that Windows thread is used. */ +int evthread_use_threads() { return evthread_use_windows_threads(); } |