aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/eventmanager/em.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/eventmanager/em.c')
-rw-r--r--src/core/eventmanager/em.c664
1 files changed, 664 insertions, 0 deletions
diff --git a/src/core/eventmanager/em.c b/src/core/eventmanager/em.c
new file mode 100644
index 0000000000..e02d56c0a1
--- /dev/null
+++ b/src/core/eventmanager/em.c
@@ -0,0 +1,664 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/eventmanager/em.h"
+
+#include <unistd.h>
+#include <fcntl.h>
+
+#include <grpc/support/atm.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/time.h>
+#include <event2/event.h>
+#include <event2/thread.h>
+
+int evthread_use_threads(void);
+
+#define ALARM_TRIGGER_INIT ((gpr_atm)0)
+#define ALARM_TRIGGER_INCREMENT ((gpr_atm)1)
+#define DONE_SHUTDOWN ((void *)1)
+
+#define POLLER_ID_INVALID ((gpr_atm)-1)
+
+/* ================== grpc_em implementation ===================== */
+
+/* If anything is in the work queue, process one item and return 1.
+ Return 0 if there were no work items to complete.
+ Requires em->mu locked, may unlock and relock during the call. */
+static int maybe_do_queue_work(grpc_em *em) {
+ grpc_em_activation_data *work = em->q;
+
+ if (work == NULL) return 0;
+
+ if (work->next == work) {
+ em->q = NULL;
+ } else {
+ em->q = work->next;
+ em->q->prev = work->prev;
+ em->q->next->prev = em->q->prev->next = em->q;
+ }
+ work->next = work->prev = NULL;
+ gpr_mu_unlock(&em->mu);
+
+ work->cb(work->arg, work->status);
+
+ gpr_mu_lock(&em->mu);
+ return 1;
+}
+
+/* Break out of the event loop on timeout */
+static void timer_callback(int fd, short events, void *context) {
+ event_base_loopbreak((struct event_base *)context);
+}
+
+/* Spend some time polling if no other thread is.
+ Returns 1 if polling was performed, 0 otherwise.
+ Requires em->mu locked, may unlock and relock during the call. */
+static int maybe_do_polling_work(grpc_em *em, struct timeval delay) {
+ int status;
+
+ if (em->num_pollers) return 0;
+
+ em->num_pollers = 1;
+ gpr_mu_unlock(&em->mu);
+
+ event_add(em->timeout_ev, &delay);
+ status = event_base_loop(em->event_base, EVLOOP_ONCE);
+ if (status < 0) {
+ gpr_log(GPR_ERROR, "event polling loop stops with error status %d", status);
+ }
+ event_del(em->timeout_ev);
+
+ gpr_mu_lock(&em->mu);
+ em->num_pollers = 0;
+ gpr_cv_broadcast(&em->cv);
+ return 1;
+}
+
+int grpc_em_work(grpc_em *em, gpr_timespec deadline) {
+ gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now());
+ /* poll for no longer than one second */
+ gpr_timespec max_delay = {1, 0};
+ struct timeval delay;
+
+ GPR_ASSERT(em);
+
+ if (gpr_time_cmp(delay_timespec, gpr_time_0) <= 0) {
+ return 0;
+ }
+
+ if (gpr_time_cmp(delay_timespec, max_delay) > 0) {
+ delay_timespec = max_delay;
+ }
+
+ delay = gpr_timeval_from_timespec(delay_timespec);
+
+ if (maybe_do_queue_work(em) || maybe_do_polling_work(em, delay)) {
+ em->last_poll_completed = gpr_now();
+ return 1;
+ }
+
+ return 0;
+}
+
+static void backup_poller_thread(void *p) {
+ grpc_em *em = p;
+ int backup_poller_engaged = 0;
+ /* allow no pollers for 100 milliseconds, then engage backup polling */
+ gpr_timespec allow_no_pollers = gpr_time_from_micros(100 * 1000);
+
+ gpr_mu_lock(&em->mu);
+ while (!em->shutdown_backup_poller) {
+ if (em->num_pollers == 0) {
+ gpr_timespec now = gpr_now();
+ gpr_timespec time_until_engage = gpr_time_sub(
+ allow_no_pollers, gpr_time_sub(now, em->last_poll_completed));
+ if (gpr_time_cmp(time_until_engage, gpr_time_0) <= 0) {
+ if (!backup_poller_engaged) {
+ gpr_log(GPR_DEBUG, "No pollers for a while - engaging backup poller");
+ backup_poller_engaged = 1;
+ }
+ if (!maybe_do_queue_work(em)) {
+ struct timeval tv = {1, 0};
+ maybe_do_polling_work(em, tv);
+ }
+ } else {
+ if (backup_poller_engaged) {
+ gpr_log(GPR_DEBUG, "Backup poller disengaged");
+ backup_poller_engaged = 0;
+ }
+ gpr_mu_unlock(&em->mu);
+ gpr_sleep_until(gpr_time_add(now, time_until_engage));
+ gpr_mu_lock(&em->mu);
+ }
+ } else {
+ if (backup_poller_engaged) {
+ gpr_log(GPR_DEBUG, "Backup poller disengaged");
+ backup_poller_engaged = 0;
+ }
+ gpr_cv_wait(&em->cv, &em->mu, gpr_inf_future);
+ }
+ }
+ gpr_mu_unlock(&em->mu);
+
+ gpr_event_set(&em->backup_poller_done, (void *)1);
+}
+
+grpc_em_error grpc_em_init(grpc_em *em) {
+ gpr_thd_id backup_poller_id;
+
+ if (evthread_use_threads() != 0) {
+ gpr_log(GPR_ERROR, "Failed to initialize libevent thread support!");
+ return GRPC_EM_ERROR;
+ }
+
+ gpr_mu_init(&em->mu);
+ gpr_cv_init(&em->cv);
+ em->q = NULL;
+ em->num_pollers = 0;
+ em->num_fds = 0;
+ em->last_poll_completed = gpr_now();
+ em->shutdown_backup_poller = 0;
+
+ gpr_event_init(&em->backup_poller_done);
+
+ em->event_base = NULL;
+ em->timeout_ev = NULL;
+
+ em->event_base = event_base_new();
+ if (!em->event_base) {
+ gpr_log(GPR_ERROR, "Failed to create the event base");
+ return GRPC_EM_ERROR;
+ }
+
+ if (evthread_make_base_notifiable(em->event_base) != 0) {
+ gpr_log(GPR_ERROR, "Couldn't make event base notifiable cross threads!");
+ return GRPC_EM_ERROR;
+ }
+
+ em->timeout_ev = evtimer_new(em->event_base, timer_callback, em->event_base);
+
+ gpr_thd_new(&backup_poller_id, backup_poller_thread, em, NULL);
+
+ return GRPC_EM_OK;
+}
+
+grpc_em_error grpc_em_destroy(grpc_em *em) {
+ gpr_timespec fd_shutdown_deadline =
+ gpr_time_add(gpr_now(), gpr_time_from_micros(10 * 1000 * 1000));
+
+ /* broadcast shutdown */
+ gpr_mu_lock(&em->mu);
+ while (em->num_fds) {
+ gpr_log(GPR_INFO,
+ "waiting for %d fds to be destroyed before closing event manager",
+ em->num_fds);
+ if (gpr_cv_wait(&em->cv, &em->mu, fd_shutdown_deadline)) {
+ gpr_log(GPR_ERROR,
+ "not all fds destroyed before shutdown deadline: memory leaks "
+ "are likely");
+ break;
+ } else if (em->num_fds == 0) {
+ gpr_log(GPR_INFO, "all fds closed");
+ }
+ }
+
+ em->shutdown_backup_poller = 1;
+ gpr_cv_broadcast(&em->cv);
+ gpr_mu_unlock(&em->mu);
+
+ gpr_event_wait(&em->backup_poller_done, gpr_inf_future);
+
+ /* drain pending work */
+ gpr_mu_lock(&em->mu);
+ while (maybe_do_queue_work(em))
+ ;
+ gpr_mu_unlock(&em->mu);
+
+ /* complete shutdown */
+ gpr_mu_destroy(&em->mu);
+ gpr_cv_destroy(&em->cv);
+
+ if (em->timeout_ev != NULL) {
+ event_free(em->timeout_ev);
+ }
+
+ if (em->event_base != NULL) {
+ event_base_free(em->event_base);
+ em->event_base = NULL;
+ }
+
+ return GRPC_EM_OK;
+}
+
+static void add_task(grpc_em *em, grpc_em_activation_data *adata) {
+ gpr_mu_lock(&em->mu);
+ if (em->q) {
+ adata->next = em->q;
+ adata->prev = adata->next->prev;
+ adata->next->prev = adata->prev->next = adata;
+ } else {
+ em->q = adata;
+ adata->next = adata->prev = adata;
+ }
+ gpr_cv_broadcast(&em->cv);
+ gpr_mu_unlock(&em->mu);
+}
+
+/* ===============grpc_em_alarm implementation==================== */
+
+/* The following function frees up the alarm's libevent structure and
+ should always be invoked just before calling the alarm's callback */
+static void alarm_ev_destroy(grpc_em_alarm *alarm) {
+ grpc_em_activation_data *adata = &alarm->task.activation[GRPC_EM_TA_ONLY];
+ if (adata->ev != NULL) {
+ event_free(adata->ev);
+ adata->ev = NULL;
+ }
+}
+/* Proxy callback triggered by alarm->ev to call alarm->cb */
+static void libevent_alarm_cb(int fd, short what, void *arg /*=alarm*/) {
+ grpc_em_alarm *alarm = arg;
+ grpc_em_activation_data *adata = &alarm->task.activation[GRPC_EM_TA_ONLY];
+ int trigger_old;
+
+ /* First check if this alarm has been canceled, atomically */
+ trigger_old =
+ gpr_atm_full_fetch_add(&alarm->triggered, ALARM_TRIGGER_INCREMENT);
+ if (trigger_old == ALARM_TRIGGER_INIT) {
+ /* Before invoking user callback, destroy the libevent structure */
+ alarm_ev_destroy(alarm);
+ adata->status = GRPC_CALLBACK_SUCCESS;
+ add_task(alarm->task.em, adata);
+ }
+}
+
+grpc_em_error grpc_em_alarm_init(grpc_em_alarm *alarm, grpc_em *em,
+ grpc_em_cb_func alarm_cb, void *alarm_cb_arg) {
+ grpc_em_activation_data *adata = &alarm->task.activation[GRPC_EM_TA_ONLY];
+ alarm->task.type = GRPC_EM_TASK_ALARM;
+ alarm->task.em = em;
+ gpr_atm_rel_store(&alarm->triggered, ALARM_TRIGGER_INIT);
+ adata->cb = alarm_cb;
+ adata->arg = alarm_cb_arg;
+ adata->prev = NULL;
+ adata->next = NULL;
+ adata->ev = NULL;
+ return GRPC_EM_OK;
+}
+
+grpc_em_error grpc_em_alarm_add(grpc_em_alarm *alarm, gpr_timespec deadline) {
+ grpc_em_activation_data *adata = &alarm->task.activation[GRPC_EM_TA_ONLY];
+ gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now());
+ struct timeval delay = gpr_timeval_from_timespec(delay_timespec);
+ if (adata->ev) {
+ event_free(adata->ev);
+ gpr_log(GPR_INFO, "Adding an alarm that already has an event.");
+ adata->ev = NULL;
+ }
+ adata->ev = evtimer_new(alarm->task.em->event_base, libevent_alarm_cb, alarm);
+ /* Set the trigger field to untriggered. Do this as the last store since
+ it is a release of previous stores. */
+ gpr_atm_rel_store(&alarm->triggered, ALARM_TRIGGER_INIT);
+
+ if (adata->ev != NULL && evtimer_add(adata->ev, &delay) == 0) {
+ return GRPC_EM_OK;
+ } else {
+ return GRPC_EM_ERROR;
+ }
+}
+
+grpc_em_error grpc_em_alarm_cancel(grpc_em_alarm *alarm, void **arg) {
+ grpc_em_activation_data *adata = &alarm->task.activation[GRPC_EM_TA_ONLY];
+ int trigger_old;
+
+ *arg = adata->arg;
+
+ /* First check if this alarm has been triggered, atomically */
+ trigger_old =
+ gpr_atm_full_fetch_add(&alarm->triggered, ALARM_TRIGGER_INCREMENT);
+ if (trigger_old == ALARM_TRIGGER_INIT) {
+ /* We need to make sure that we only invoke the callback if it hasn't
+ already been invoked */
+ /* First remove this event from libevent. This returns success even if the
+ event has gone active or invoked its callback. */
+ if (evtimer_del(adata->ev) != 0) {
+ /* The delete was unsuccessful for some reason. */
+ gpr_log(GPR_ERROR, "Attempt to delete alarm event was unsuccessful");
+ return GRPC_EM_ERROR;
+ }
+ /* Free up the event structure before invoking callback */
+ alarm_ev_destroy(alarm);
+ adata->status = GRPC_CALLBACK_CANCELLED;
+ add_task(alarm->task.em, adata);
+ }
+ return GRPC_EM_OK;
+}
+
+/* ==================== grpc_em_fd implementation =================== */
+
+/* Proxy callback to call a gRPC read/write callback */
+static void em_fd_cb(int fd, short what, void *arg /*=em_fd*/) {
+ grpc_em_fd *em_fd = arg;
+ grpc_em_cb_status status = GRPC_CALLBACK_SUCCESS;
+ int run_read_cb = 0;
+ int run_write_cb = 0;
+ grpc_em_activation_data *rdata, *wdata;
+
+ gpr_mu_lock(&em_fd->mu);
+ /* TODO(klempner): We need to delete the event here too so we avoid spurious
+ shutdowns. */
+ if (em_fd->shutdown_started) {
+ status = GRPC_CALLBACK_CANCELLED;
+ } else if (status == GRPC_CALLBACK_SUCCESS && (what & EV_TIMEOUT)) {
+ status = GRPC_CALLBACK_TIMED_OUT;
+ /* TODO(klempner): This is broken if we are monitoring both read and write
+ events on the same fd -- generating a spurious event is okay, but
+ generating a spurious timeout is not. */
+ what |= (EV_READ | EV_WRITE);
+ }
+
+ if (what & EV_READ) {
+ switch (em_fd->read_state) {
+ case GRPC_EM_FD_WAITING:
+ run_read_cb = 1;
+ em_fd->read_state = GRPC_EM_FD_IDLE;
+ break;
+ case GRPC_EM_FD_IDLE:
+ case GRPC_EM_FD_CACHED:
+ em_fd->read_state = GRPC_EM_FD_CACHED;
+ }
+ }
+ if (what & EV_WRITE) {
+ switch (em_fd->write_state) {
+ case GRPC_EM_FD_WAITING:
+ run_write_cb = 1;
+ em_fd->write_state = GRPC_EM_FD_IDLE;
+ break;
+ case GRPC_EM_FD_IDLE:
+ case GRPC_EM_FD_CACHED:
+ em_fd->write_state = GRPC_EM_FD_CACHED;
+ }
+ }
+
+ if (run_read_cb) {
+ rdata = &(em_fd->task.activation[GRPC_EM_TA_READ]);
+ rdata->status = status;
+ add_task(em_fd->task.em, rdata);
+ } else if (run_write_cb) {
+ wdata = &(em_fd->task.activation[GRPC_EM_TA_WRITE]);
+ wdata->status = status;
+ add_task(em_fd->task.em, wdata);
+ }
+ gpr_mu_unlock(&em_fd->mu);
+}
+
+static void em_fd_shutdown_cb(int fd, short what, void *arg /*=em_fd*/) {
+ /* TODO(klempner): This could just run directly in the calling thread, except
+ that libevent's handling of event_active() on an event which is already in
+ flight on a different thread is racy and easily triggers TSAN.
+ */
+ grpc_em_fd *em_fd = arg;
+ gpr_mu_lock(&em_fd->mu);
+ em_fd->shutdown_started = 1;
+ if (em_fd->read_state == GRPC_EM_FD_WAITING) {
+ event_active(em_fd->task.activation[GRPC_EM_TA_READ].ev, EV_READ, 1);
+ }
+ if (em_fd->write_state == GRPC_EM_FD_WAITING) {
+ event_active(em_fd->task.activation[GRPC_EM_TA_WRITE].ev, EV_WRITE, 1);
+ }
+ gpr_mu_unlock(&em_fd->mu);
+}
+
+grpc_em_error grpc_em_fd_init(grpc_em_fd *em_fd, grpc_em *em, int fd) {
+ int flags;
+ grpc_em_activation_data *rdata, *wdata;
+
+ gpr_mu_lock(&em->mu);
+ em->num_fds++;
+ gpr_mu_unlock(&em->mu);
+
+ em_fd->shutdown_ev = NULL;
+ gpr_mu_init(&em_fd->mu);
+
+ flags = fcntl(fd, F_GETFL, 0);
+ if ((flags & O_NONBLOCK) == 0) {
+ gpr_log(GPR_ERROR, "File descriptor %d is blocking", fd);
+ return GRPC_EM_INVALID_ARGUMENTS;
+ }
+
+ em_fd->task.type = GRPC_EM_TASK_FD;
+ em_fd->task.em = em;
+ em_fd->fd = fd;
+
+ rdata = &(em_fd->task.activation[GRPC_EM_TA_READ]);
+ rdata->ev = NULL;
+ rdata->cb = NULL;
+ rdata->arg = NULL;
+ rdata->status = GRPC_CALLBACK_SUCCESS;
+ rdata->prev = NULL;
+ rdata->next = NULL;
+
+ wdata = &(em_fd->task.activation[GRPC_EM_TA_WRITE]);
+ wdata->ev = NULL;
+ wdata->cb = NULL;
+ wdata->arg = NULL;
+ wdata->status = GRPC_CALLBACK_SUCCESS;
+ wdata->prev = NULL;
+ wdata->next = NULL;
+
+ em_fd->read_state = GRPC_EM_FD_IDLE;
+ em_fd->write_state = GRPC_EM_FD_IDLE;
+
+ /* TODO(chenw): detect platforms where only level trigger is supported,
+ and set the event to non-persist. */
+ rdata->ev = event_new(em->event_base, em_fd->fd, EV_ET | EV_PERSIST | EV_READ,
+ em_fd_cb, em_fd);
+ if (!rdata->ev) {
+ gpr_log(GPR_ERROR, "Failed to create read event");
+ return GRPC_EM_ERROR;
+ }
+
+ wdata->ev = event_new(em->event_base, em_fd->fd,
+ EV_ET | EV_PERSIST | EV_WRITE, em_fd_cb, em_fd);
+ if (!wdata->ev) {
+ gpr_log(GPR_ERROR, "Failed to create write event");
+ return GRPC_EM_ERROR;
+ }
+
+ em_fd->shutdown_ev =
+ event_new(em->event_base, -1, EV_READ, em_fd_shutdown_cb, em_fd);
+
+ if (!em_fd->shutdown_ev) {
+ gpr_log(GPR_ERROR, "Failed to create shutdown event");
+ return GRPC_EM_ERROR;
+ }
+
+ em_fd->shutdown_started = 0;
+ return GRPC_EM_OK;
+}
+
+void grpc_em_fd_destroy(grpc_em_fd *em_fd) {
+ grpc_em_task_activity_type type;
+ grpc_em_activation_data *adata;
+ grpc_em *em = em_fd->task.em;
+
+ /* ensure anyone holding the lock has left - it's the callers responsibility
+ to ensure that no new users enter */
+ gpr_mu_lock(&em_fd->mu);
+ gpr_mu_unlock(&em_fd->mu);
+
+ for (type = GRPC_EM_TA_READ; type < GRPC_EM_TA_COUNT; type++) {
+ adata = &(em_fd->task.activation[type]);
+ GPR_ASSERT(adata->next == NULL);
+ if (adata->ev != NULL) {
+ event_free(adata->ev);
+ adata->ev = NULL;
+ }
+ }
+
+ if (em_fd->shutdown_ev != NULL) {
+ event_free(em_fd->shutdown_ev);
+ em_fd->shutdown_ev = NULL;
+ }
+ gpr_mu_destroy(&em_fd->mu);
+
+ gpr_mu_lock(&em->mu);
+ em->num_fds--;
+ gpr_cv_broadcast(&em->cv);
+ gpr_mu_unlock(&em->mu);
+}
+
+int grpc_em_fd_get(struct grpc_em_fd *em_fd) { return em_fd->fd; }
+
+/* Returns the event manager associated with *em_fd. */
+grpc_em *grpc_em_fd_get_em(grpc_em_fd *em_fd) { return em_fd->task.em; }
+
+/* TODO(chenw): should we enforce the contract that notify_on_read cannot be
+ called when the previously registered callback has not been called yet. */
+grpc_em_error grpc_em_fd_notify_on_read(grpc_em_fd *em_fd,
+ grpc_em_cb_func read_cb,
+ void *read_cb_arg,
+ gpr_timespec deadline) {
+ int force_event = 0;
+ grpc_em_activation_data *rdata;
+ grpc_em_error result = GRPC_EM_OK;
+ gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now());
+ struct timeval delay = gpr_timeval_from_timespec(delay_timespec);
+ struct timeval *delayp =
+ gpr_time_cmp(deadline, gpr_inf_future) ? &delay : NULL;
+
+ rdata = &em_fd->task.activation[GRPC_EM_TA_READ];
+
+ gpr_mu_lock(&em_fd->mu);
+ rdata->cb = read_cb;
+ rdata->arg = read_cb_arg;
+
+ force_event =
+ (em_fd->shutdown_started || em_fd->read_state == GRPC_EM_FD_CACHED);
+ em_fd->read_state = GRPC_EM_FD_WAITING;
+
+ if (force_event) {
+ event_active(rdata->ev, EV_READ, 1);
+ } else if (event_add(rdata->ev, delayp) == -1) {
+ result = GRPC_EM_ERROR;
+ }
+ gpr_mu_unlock(&em_fd->mu);
+ return result;
+}
+
+grpc_em_error grpc_em_fd_notify_on_write(grpc_em_fd *em_fd,
+ grpc_em_cb_func write_cb,
+ void *write_cb_arg,
+ gpr_timespec deadline) {
+ int force_event = 0;
+ grpc_em_activation_data *wdata;
+ grpc_em_error result = GRPC_EM_OK;
+ gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now());
+ struct timeval delay = gpr_timeval_from_timespec(delay_timespec);
+ struct timeval *delayp =
+ gpr_time_cmp(deadline, gpr_inf_future) ? &delay : NULL;
+
+ wdata = &em_fd->task.activation[GRPC_EM_TA_WRITE];
+
+ gpr_mu_lock(&em_fd->mu);
+ wdata->cb = write_cb;
+ wdata->arg = write_cb_arg;
+
+ force_event =
+ (em_fd->shutdown_started || em_fd->write_state == GRPC_EM_FD_CACHED);
+ em_fd->write_state = GRPC_EM_FD_WAITING;
+
+ if (force_event) {
+ event_active(wdata->ev, EV_WRITE, 1);
+ } else if (event_add(wdata->ev, delayp) == -1) {
+ result = GRPC_EM_ERROR;
+ }
+ gpr_mu_unlock(&em_fd->mu);
+ return result;
+}
+
+void grpc_em_fd_shutdown(grpc_em_fd *em_fd) {
+ event_active(em_fd->shutdown_ev, EV_READ, 1);
+}
+
+/*====================== Other callback functions ======================*/
+
+/* Sometimes we want a followup callback: something to be added from the
+ current callback for the EM to invoke once this callback is complete.
+ This is implemented by inserting an entry into an EM queue. */
+
+/* The following structure holds the field needed for adding the
+ followup callback. These are the argument for the followup callback,
+ the function to use for the followup callback, and the
+ activation data pointer used for the queues (to free in the CB) */
+struct followup_callback_arg {
+ grpc_em_cb_func func;
+ void *cb_arg;
+ grpc_em_activation_data adata;
+};
+
+static void followup_proxy_callback(void *cb_arg, grpc_em_cb_status status) {
+ struct followup_callback_arg *fcb_arg = cb_arg;
+ /* Invoke the function */
+ fcb_arg->func(fcb_arg->cb_arg, status);
+ gpr_free(fcb_arg);
+}
+
+grpc_em_error grpc_em_add_callback(grpc_em *em, grpc_em_cb_func cb,
+ void *cb_arg) {
+ grpc_em_activation_data *adptr;
+ struct followup_callback_arg *fcb_arg;
+
+ fcb_arg = gpr_malloc(sizeof(*fcb_arg));
+ if (fcb_arg == NULL) {
+ return GRPC_EM_ERROR;
+ }
+ /* Set up the activation data and followup callback argument structures */
+ adptr = &fcb_arg->adata;
+ adptr->ev = NULL;
+ adptr->cb = followup_proxy_callback;
+ adptr->arg = fcb_arg;
+ adptr->status = GRPC_CALLBACK_SUCCESS;
+ adptr->prev = NULL;
+ adptr->next = NULL;
+
+ fcb_arg->func = cb;
+ fcb_arg->cb_arg = cb_arg;
+
+ /* Insert an activation data for the specified em */
+ add_task(em, adptr);
+ return GRPC_EM_OK;
+}