aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/iomgr')
-rw-r--r--src/core/iomgr/alarm.c34
-rw-r--r--src/core/iomgr/alarm_internal.h5
-rw-r--r--src/core/iomgr/endpoint.c14
-rw-r--r--src/core/iomgr/endpoint.h17
-rw-r--r--src/core/iomgr/fd_posix.c274
-rw-r--r--src/core/iomgr/fd_posix.h138
-rw-r--r--src/core/iomgr/iomgr.c204
-rw-r--r--src/core/iomgr/iomgr.h11
-rw-r--r--src/core/iomgr/iomgr_internal.h (renamed from src/core/iomgr/iomgr_libevent_use_threads.c)35
-rw-r--r--src/core/iomgr/iomgr_libevent.c652
-rw-r--r--src/core/iomgr/iomgr_libevent.h206
-rw-r--r--src/core/iomgr/iomgr_posix.c (renamed from src/core/iomgr/pollset.c)7
-rw-r--r--src/core/iomgr/iomgr_posix.h (renamed from src/core/iomgr/iomgr_completion_queue_interface.h)15
-rw-r--r--src/core/iomgr/pollset.h21
-rw-r--r--src/core/iomgr/pollset_multipoller_with_poll_posix.c237
-rw-r--r--src/core/iomgr/pollset_posix.c340
-rw-r--r--src/core/iomgr/pollset_posix.h95
-rw-r--r--src/core/iomgr/resolve_address_posix.c6
-rw-r--r--src/core/iomgr/tcp_client_posix.c76
-rw-r--r--src/core/iomgr/tcp_posix.c67
-rw-r--r--src/core/iomgr/tcp_posix.h2
-rw-r--r--src/core/iomgr/tcp_server.h4
-rw-r--r--src/core/iomgr/tcp_server_posix.c38
23 files changed, 1473 insertions, 1025 deletions
diff --git a/src/core/iomgr/alarm.c b/src/core/iomgr/alarm.c
index b7238f716a..2664879323 100644
--- a/src/core/iomgr/alarm.c
+++ b/src/core/iomgr/alarm.c
@@ -71,8 +71,8 @@ static shard_type g_shards[NUM_SHARDS];
/* Protected by g_mu */
static shard_type *g_shard_queue[NUM_SHARDS];
-static int run_some_expired_alarms(gpr_timespec now,
- grpc_iomgr_cb_status status);
+static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now,
+ gpr_timespec *next, int success);
static gpr_timespec compute_min_deadline(shard_type *shard) {
return grpc_alarm_heap_is_empty(&shard->heap)
@@ -102,7 +102,7 @@ void grpc_alarm_list_init(gpr_timespec now) {
void grpc_alarm_list_shutdown() {
int i;
- while (run_some_expired_alarms(gpr_inf_future, GRPC_CALLBACK_CANCELLED))
+ while (run_some_expired_alarms(NULL, gpr_inf_future, NULL, 0))
;
for (i = 0; i < NUM_SHARDS; i++) {
shard_type *shard = &g_shards[i];
@@ -233,7 +233,7 @@ void grpc_alarm_cancel(grpc_alarm *alarm) {
gpr_mu_unlock(&shard->mu);
if (triggered) {
- alarm->cb(alarm->cb_arg, GRPC_CALLBACK_CANCELLED);
+ alarm->cb(alarm->cb_arg, 0);
}
}
@@ -299,8 +299,8 @@ static size_t pop_alarms(shard_type *shard, gpr_timespec now,
return n;
}
-static int run_some_expired_alarms(gpr_timespec now,
- grpc_iomgr_cb_status status) {
+static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now,
+ gpr_timespec *next, int success) {
size_t n = 0;
size_t i;
grpc_alarm *alarms[MAX_ALARMS_PER_CHECK];
@@ -329,19 +329,35 @@ static int run_some_expired_alarms(gpr_timespec now,
note_deadline_change(g_shard_queue[0]);
}
+ if (next) {
+ *next = gpr_time_min(*next, g_shard_queue[0]->min_deadline);
+ }
+
gpr_mu_unlock(&g_mu);
gpr_mu_unlock(&g_checker_mu);
+ } else if (next) {
+ gpr_mu_lock(&g_mu);
+ *next = gpr_time_min(*next, g_shard_queue[0]->min_deadline);
+ gpr_mu_unlock(&g_mu);
+ }
+
+ if (n && drop_mu) {
+ gpr_mu_unlock(drop_mu);
}
for (i = 0; i < n; i++) {
- alarms[i]->cb(alarms[i]->cb_arg, status);
+ alarms[i]->cb(alarms[i]->cb_arg, success);
+ }
+
+ if (n && drop_mu) {
+ gpr_mu_lock(drop_mu);
}
return n;
}
-int grpc_alarm_check(gpr_timespec now) {
- return run_some_expired_alarms(now, GRPC_CALLBACK_SUCCESS);
+int grpc_alarm_check(gpr_mu *drop_mu, gpr_timespec now, gpr_timespec *next) {
+ return run_some_expired_alarms(drop_mu, now, next, 1);
}
gpr_timespec grpc_alarm_list_next_timeout() {
diff --git a/src/core/iomgr/alarm_internal.h b/src/core/iomgr/alarm_internal.h
index e605ff84f9..12b6ab4286 100644
--- a/src/core/iomgr/alarm_internal.h
+++ b/src/core/iomgr/alarm_internal.h
@@ -34,9 +34,12 @@
#ifndef __GRPC_INTERNAL_IOMGR_ALARM_INTERNAL_H_
#define __GRPC_INTERNAL_IOMGR_ALARM_INTERNAL_H_
+#include <grpc/support/sync.h>
+#include <grpc/support/time.h>
+
/* iomgr internal api for dealing with alarms */
-int grpc_alarm_check(gpr_timespec now);
+int grpc_alarm_check(gpr_mu *drop_mu, gpr_timespec now, gpr_timespec *next);
void grpc_alarm_list_init(gpr_timespec now);
void grpc_alarm_list_shutdown();
diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c
index f1944bf672..9e5d56389d 100644
--- a/src/core/iomgr/endpoint.c
+++ b/src/core/iomgr/endpoint.c
@@ -34,14 +34,16 @@
#include "src/core/iomgr/endpoint.h"
void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
- void *user_data, gpr_timespec deadline) {
- ep->vtable->notify_on_read(ep, cb, user_data, deadline);
+ void *user_data) {
+ ep->vtable->notify_on_read(ep, cb, user_data);
}
-grpc_endpoint_write_status grpc_endpoint_write(
- grpc_endpoint *ep, gpr_slice *slices, size_t nslices,
- grpc_endpoint_write_cb cb, void *user_data, gpr_timespec deadline) {
- return ep->vtable->write(ep, slices, nslices, cb, user_data, deadline);
+grpc_endpoint_write_status grpc_endpoint_write(grpc_endpoint *ep,
+ gpr_slice *slices,
+ size_t nslices,
+ grpc_endpoint_write_cb cb,
+ void *user_data) {
+ return ep->vtable->write(ep, slices, nslices, cb, user_data);
}
void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
diff --git a/src/core/iomgr/endpoint.h b/src/core/iomgr/endpoint.h
index bbd800bea8..ec86d9a146 100644
--- a/src/core/iomgr/endpoint.h
+++ b/src/core/iomgr/endpoint.h
@@ -48,8 +48,7 @@ typedef enum grpc_endpoint_cb_status {
GRPC_ENDPOINT_CB_OK = 0, /* Call completed successfully */
GRPC_ENDPOINT_CB_EOF, /* Call completed successfully, end of file reached */
GRPC_ENDPOINT_CB_SHUTDOWN, /* Call interrupted by shutdown */
- GRPC_ENDPOINT_CB_ERROR, /* Call interrupted by socket error */
- GRPC_ENDPOINT_CB_TIMED_OUT /* Call timed out */
+ GRPC_ENDPOINT_CB_ERROR /* Call interrupted by socket error */
} grpc_endpoint_cb_status;
typedef enum grpc_endpoint_write_status {
@@ -66,10 +65,10 @@ typedef void (*grpc_endpoint_write_cb)(void *user_data,
struct grpc_endpoint_vtable {
void (*notify_on_read)(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
- void *user_data, gpr_timespec deadline);
+ void *user_data);
grpc_endpoint_write_status (*write)(grpc_endpoint *ep, gpr_slice *slices,
size_t nslices, grpc_endpoint_write_cb cb,
- void *user_data, gpr_timespec deadline);
+ void *user_data);
void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset);
void (*shutdown)(grpc_endpoint *ep);
void (*destroy)(grpc_endpoint *ep);
@@ -77,7 +76,7 @@ struct grpc_endpoint_vtable {
/* When data is available on the connection, calls the callback with slices. */
void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
- void *user_data, gpr_timespec deadline);
+ void *user_data);
/* Write slices out to the socket.
@@ -85,9 +84,11 @@ void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
returns GRPC_ENDPOINT_WRITE_DONE.
Otherwise it returns GRPC_ENDPOINT_WRITE_PENDING and calls cb when the
connection is ready for more data. */
-grpc_endpoint_write_status grpc_endpoint_write(
- grpc_endpoint *ep, gpr_slice *slices, size_t nslices,
- grpc_endpoint_write_cb cb, void *user_data, gpr_timespec deadline);
+grpc_endpoint_write_status grpc_endpoint_write(grpc_endpoint *ep,
+ gpr_slice *slices,
+ size_t nslices,
+ grpc_endpoint_write_cb cb,
+ void *user_data);
/* Causes any pending read/write callbacks to run immediately with
GRPC_ENDPOINT_CB_SHUTDOWN status */
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
new file mode 100644
index 0000000000..3cd2f9a8e0
--- /dev/null
+++ b/src/core/iomgr/fd_posix.c
@@ -0,0 +1,274 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/iomgr/fd_posix.h"
+
+#include <assert.h>
+#include <unistd.h>
+
+#include "src/core/iomgr/iomgr_internal.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/useful.h>
+
+enum descriptor_state { NOT_READY, READY, WAITING };
+
+static void destroy(grpc_fd *fd) {
+ grpc_iomgr_add_callback(fd->on_done, fd->on_done_user_data);
+ gpr_mu_destroy(&fd->set_state_mu);
+ gpr_free(fd->watchers);
+ gpr_free(fd);
+ grpc_iomgr_unref();
+}
+
+static void ref_by(grpc_fd *fd, int n) {
+ gpr_atm_no_barrier_fetch_add(&fd->refst, n);
+}
+
+static void unref_by(grpc_fd *fd, int n) {
+ if (gpr_atm_full_fetch_add(&fd->refst, -n) == n) {
+ destroy(fd);
+ }
+}
+
+static void do_nothing(void *ignored, int success) {}
+
+grpc_fd *grpc_fd_create(int fd) {
+ grpc_fd *r = gpr_malloc(sizeof(grpc_fd));
+ grpc_iomgr_ref();
+ gpr_atm_rel_store(&r->refst, 1);
+ gpr_atm_rel_store(&r->readst.state, NOT_READY);
+ gpr_atm_rel_store(&r->writest.state, NOT_READY);
+ gpr_mu_init(&r->set_state_mu);
+ gpr_mu_init(&r->watcher_mu);
+ gpr_atm_rel_store(&r->shutdown, 0);
+ r->fd = fd;
+ r->watchers = NULL;
+ r->watcher_count = 0;
+ r->watcher_capacity = 0;
+ grpc_pollset_add_fd(grpc_backup_pollset(), r);
+ return r;
+}
+
+int grpc_fd_is_orphaned(grpc_fd *fd) {
+ return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
+}
+
+static void wake_watchers(grpc_fd *fd) {
+ size_t i, n;
+ gpr_mu_lock(&fd->watcher_mu);
+ n = fd->watcher_count;
+ for (i = 0; i < n; i++) {
+ grpc_pollset_force_kick(fd->watchers[i]);
+ }
+ gpr_mu_unlock(&fd->watcher_mu);
+}
+
+void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) {
+ fd->on_done = on_done ? on_done : do_nothing;
+ fd->on_done_user_data = user_data;
+ ref_by(fd, 1); /* remove active status, but keep referenced */
+ wake_watchers(fd);
+ close(fd->fd);
+ unref_by(fd, 2); /* drop the reference */
+}
+
+/* increment refcount by two to avoid changing the orphan bit */
+void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
+
+void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
+
+typedef struct {
+ grpc_iomgr_cb_func cb;
+ void *arg;
+} callback;
+
+static void make_callback(grpc_iomgr_cb_func cb, void *arg, int success,
+ int allow_synchronous_callback) {
+ if (allow_synchronous_callback) {
+ cb(arg, success);
+ } else {
+ grpc_iomgr_add_delayed_callback(cb, arg, success);
+ }
+}
+
+static void make_callbacks(callback *callbacks, size_t n, int success,
+ int allow_synchronous_callback) {
+ size_t i;
+ for (i = 0; i < n; i++) {
+ make_callback(callbacks[i].cb, callbacks[i].arg, success,
+ allow_synchronous_callback);
+ }
+}
+
+static void notify_on(grpc_fd *fd, grpc_fd_state *st, grpc_iomgr_cb_func cb,
+ void *arg, int allow_synchronous_callback) {
+ switch ((enum descriptor_state)gpr_atm_acq_load(&st->state)) {
+ case NOT_READY:
+ /* There is no race if the descriptor is already ready, so we skip
+ the interlocked op in that case. As long as the app doesn't
+ try to set the same upcall twice (which it shouldn't) then
+ oldval should never be anything other than READY or NOT_READY. We
+ don't
+ check for user error on the fast path. */
+ st->cb = cb;
+ st->cb_arg = arg;
+ if (gpr_atm_rel_cas(&st->state, NOT_READY, WAITING)) {
+ /* swap was successful -- the closure will run after the next
+ set_ready call. NOTE: we don't have an ABA problem here,
+ since we should never have concurrent calls to the same
+ notify_on function. */
+ wake_watchers(fd);
+ return;
+ }
+ /* swap was unsuccessful due to an intervening set_ready call.
+ Fall through to the READY code below */
+ case READY:
+ assert(gpr_atm_acq_load(&st->state) == READY);
+ gpr_atm_rel_store(&st->state, NOT_READY);
+ make_callback(cb, arg, !gpr_atm_acq_load(&fd->shutdown),
+ allow_synchronous_callback);
+ return;
+ case WAITING:
+ /* upcallptr was set to a different closure. This is an error! */
+ gpr_log(GPR_ERROR,
+ "User called a notify_on function with a previous callback still "
+ "pending");
+ abort();
+ }
+ gpr_log(GPR_ERROR, "Corrupt memory in &st->state");
+ abort();
+}
+
+static void set_ready_locked(grpc_fd_state *st, callback *callbacks,
+ size_t *ncallbacks) {
+ callback *c;
+
+ switch ((enum descriptor_state)gpr_atm_acq_load(&st->state)) {
+ case NOT_READY:
+ if (gpr_atm_rel_cas(&st->state, NOT_READY, READY)) {
+ /* swap was successful -- the closure will run after the next
+ notify_on call. */
+ return;
+ }
+ /* swap was unsuccessful due to an intervening set_ready call.
+ Fall through to the WAITING code below */
+ case WAITING:
+ assert(gpr_atm_acq_load(&st->state) == WAITING);
+ c = &callbacks[(*ncallbacks)++];
+ c->cb = st->cb;
+ c->arg = st->cb_arg;
+ gpr_atm_rel_store(&st->state, NOT_READY);
+ return;
+ case READY:
+ /* duplicate ready, ignore */
+ return;
+ }
+}
+
+static void set_ready(grpc_fd *fd, grpc_fd_state *st,
+ int allow_synchronous_callback) {
+ /* only one set_ready can be active at once (but there may be a racing
+ notify_on) */
+ int success;
+ callback cb;
+ size_t ncb = 0;
+ gpr_mu_lock(&fd->set_state_mu);
+ set_ready_locked(st, &cb, &ncb);
+ gpr_mu_unlock(&fd->set_state_mu);
+ success = !gpr_atm_acq_load(&fd->shutdown);
+ make_callbacks(&cb, ncb, success, allow_synchronous_callback);
+}
+
+void grpc_fd_shutdown(grpc_fd *fd) {
+ callback cb[2];
+ size_t ncb = 0;
+ gpr_mu_lock(&fd->set_state_mu);
+ GPR_ASSERT(!gpr_atm_acq_load(&fd->shutdown));
+ gpr_atm_rel_store(&fd->shutdown, 1);
+ set_ready_locked(&fd->readst, cb, &ncb);
+ set_ready_locked(&fd->writest, cb, &ncb);
+ gpr_mu_unlock(&fd->set_state_mu);
+ make_callbacks(cb, ncb, 0, 0);
+}
+
+void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_cb_func read_cb,
+ void *read_cb_arg) {
+ notify_on(fd, &fd->readst, read_cb, read_cb_arg, 0);
+}
+
+void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb,
+ void *write_cb_arg) {
+ notify_on(fd, &fd->writest, write_cb, write_cb_arg, 0);
+}
+
+gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
+ gpr_uint32 read_mask, gpr_uint32 write_mask) {
+ /* keep track of pollers that have requested our events, in case they change
+ */
+ gpr_mu_lock(&fd->watcher_mu);
+ if (fd->watcher_capacity == fd->watcher_count) {
+ fd->watcher_capacity =
+ GPR_MAX(fd->watcher_capacity + 8, fd->watcher_capacity * 3 / 2);
+ fd->watchers = gpr_realloc(fd->watchers,
+ fd->watcher_capacity * sizeof(grpc_pollset *));
+ }
+ fd->watchers[fd->watcher_count++] = pollset;
+ gpr_mu_unlock(&fd->watcher_mu);
+
+ return (gpr_atm_acq_load(&fd->readst.state) != READY ? read_mask : 0) |
+ (gpr_atm_acq_load(&fd->writest.state) != READY ? write_mask : 0);
+}
+
+void grpc_fd_end_poll(grpc_fd *fd, grpc_pollset *pollset) {
+ size_t r, w, n;
+
+ gpr_mu_lock(&fd->watcher_mu);
+ n = fd->watcher_count;
+ for (r = 0, w = 0; r < n; r++) {
+ if (fd->watchers[r] == pollset) {
+ fd->watcher_count--;
+ continue;
+ }
+ fd->watchers[w++] = fd->watchers[r];
+ }
+ gpr_mu_unlock(&fd->watcher_mu);
+}
+
+void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback) {
+ set_ready(fd, &fd->readst, allow_synchronous_callback);
+}
+
+void grpc_fd_become_writable(grpc_fd *fd, int allow_synchronous_callback) {
+ set_ready(fd, &fd->writest, allow_synchronous_callback);
+}
diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h
new file mode 100644
index 0000000000..232de0c3e0
--- /dev/null
+++ b/src/core/iomgr/fd_posix.h
@@ -0,0 +1,138 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_IOMGR_FD_POSIX_H_
+#define __GRPC_INTERNAL_IOMGR_FD_POSIX_H_
+
+#include "src/core/iomgr/iomgr.h"
+#include "src/core/iomgr/pollset.h"
+#include <grpc/support/atm.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/time.h>
+
+typedef struct {
+ grpc_iomgr_cb_func cb;
+ void *cb_arg;
+ int success;
+ gpr_atm state;
+} grpc_fd_state;
+
+typedef struct grpc_fd {
+ int fd;
+ /* refst format:
+ bit0: 1=active/0=orphaned
+ bit1-n: refcount
+ meaning that mostly we ref by two to avoid altering the orphaned bit,
+ and just unref by 1 when we're ready to flag the object as orphaned */
+ gpr_atm refst;
+
+ gpr_mu set_state_mu;
+ gpr_atm shutdown;
+
+ gpr_mu watcher_mu;
+ grpc_pollset **watchers;
+ size_t watcher_count;
+ size_t watcher_capacity;
+
+ grpc_fd_state readst;
+ grpc_fd_state writest;
+
+ grpc_iomgr_cb_func on_done;
+ void *on_done_user_data;
+} grpc_fd;
+
+/* Create a wrapped file descriptor.
+ Requires fd is a non-blocking file descriptor.
+ This takes ownership of closing fd. */
+grpc_fd *grpc_fd_create(int fd);
+
+/* Releases fd to be asynchronously destroyed.
+ on_done is called when the underlying file descriptor is definitely close()d.
+ If on_done is NULL, no callback will be made.
+ Requires: *fd initialized; no outstanding notify_on_read or
+ notify_on_write. */
+void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data);
+
+/* Begin polling on an fd.
+ Registers that the given pollset is interested in this fd - so that if read
+ or writability interest changes, the pollset can be kicked to pick up that
+ new interest.
+ Return value is:
+ (fd_needs_read? read_mask : 0) | (fd_needs_write? write_mask : 0)
+ i.e. a combination of read_mask and write_mask determined by the fd's current
+ interest in said events.
+ Polling strategies that do not need to alter their behavior depending on the
+ fd's current interest (such as epoll) do not need to call this function. */
+gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
+ gpr_uint32 read_mask, gpr_uint32 write_mask);
+/* Complete polling previously started with grpc_fd_begin_poll */
+void grpc_fd_end_poll(grpc_fd *fd, grpc_pollset *pollset);
+
+/* Return 1 if this fd is orphaned, 0 otherwise */
+int grpc_fd_is_orphaned(grpc_fd *fd);
+
+/* Cause any current callbacks to error out with GRPC_CALLBACK_CANCELLED. */
+void grpc_fd_shutdown(grpc_fd *fd);
+
+/* Register read interest, causing read_cb to be called once when fd becomes
+ readable, on deadline specified by deadline, or on shutdown triggered by
+ grpc_fd_shutdown.
+ read_cb will be called with read_cb_arg when *fd becomes readable.
+ read_cb is Called with status of GRPC_CALLBACK_SUCCESS if readable,
+ GRPC_CALLBACK_TIMED_OUT if the call timed out,
+ and CANCELLED if the call was cancelled.
+
+ Requires:This method must not be called before the read_cb for any previous
+ call runs. Edge triggered events are used whenever they are supported by the
+ underlying platform. This means that users must drain fd in read_cb before
+ calling notify_on_read again. Users are also expected to handle spurious
+ events, i.e read_cb is called while nothing can be readable from fd */
+void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_cb_func read_cb,
+ void *read_cb_arg);
+
+/* Exactly the same semantics as above, except based on writable events. */
+void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb,
+ void *write_cb_arg);
+
+/* Notification from the poller to an fd that it has become readable or
+ writable.
+ If allow_synchronous_callback is 1, allow running the fd callback inline
+ in this callstack, otherwise register an asynchronous callback and return */
+void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback);
+void grpc_fd_become_writable(grpc_fd *fd, int allow_synchronous_callback);
+
+/* Reference counting for fds */
+void grpc_fd_ref(grpc_fd *fd);
+void grpc_fd_unref(grpc_fd *fd);
+
+#endif /* __GRPC_INTERNAL_IOMGR_FD_POSIX_H_ */
diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c
new file mode 100644
index 0000000000..03f56a50a3
--- /dev/null
+++ b/src/core/iomgr/iomgr.c
@@ -0,0 +1,204 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/iomgr/iomgr.h"
+
+#include <stdlib.h>
+
+#include "src/core/iomgr/iomgr_internal.h"
+#include "src/core/iomgr/alarm_internal.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/thd.h>
+#include <grpc/support/sync.h>
+
+typedef struct delayed_callback {
+ grpc_iomgr_cb_func cb;
+ void *cb_arg;
+ int success;
+ struct delayed_callback *next;
+} delayed_callback;
+
+static gpr_mu g_mu;
+static gpr_cv g_cv;
+static delayed_callback *g_cbs_head = NULL;
+static delayed_callback *g_cbs_tail = NULL;
+static int g_shutdown;
+static int g_refs;
+static gpr_event g_background_callback_executor_done;
+
+/* Execute followup callbacks continuously.
+ Other threads may check in and help during pollset_work() */
+static void background_callback_executor(void *ignored) {
+ gpr_mu_lock(&g_mu);
+ while (!g_shutdown) {
+ gpr_timespec deadline = gpr_inf_future;
+ if (g_cbs_head) {
+ delayed_callback *cb = g_cbs_head;
+ g_cbs_head = cb->next;
+ if (!g_cbs_head) g_cbs_tail = NULL;
+ gpr_mu_unlock(&g_mu);
+ cb->cb(cb->cb_arg, cb->success);
+ gpr_free(cb);
+ gpr_mu_lock(&g_mu);
+ } else if (grpc_alarm_check(&g_mu, gpr_now(), &deadline)) {
+ } else {
+ gpr_cv_wait(&g_cv, &g_mu, deadline);
+ }
+ }
+ gpr_mu_unlock(&g_mu);
+ gpr_event_set(&g_background_callback_executor_done, (void *)1);
+}
+
+void grpc_kick_poller() { gpr_cv_broadcast(&g_cv); }
+
+void grpc_iomgr_init() {
+ gpr_thd_id id;
+ gpr_mu_init(&g_mu);
+ gpr_cv_init(&g_cv);
+ grpc_alarm_list_init(gpr_now());
+ g_refs = 0;
+ grpc_iomgr_platform_init();
+ gpr_event_init(&g_background_callback_executor_done);
+ gpr_thd_new(&id, background_callback_executor, NULL, NULL);
+}
+
+void grpc_iomgr_shutdown() {
+ delayed_callback *cb;
+ gpr_timespec shutdown_deadline =
+ gpr_time_add(gpr_now(), gpr_time_from_seconds(10));
+
+ grpc_iomgr_platform_shutdown();
+
+ gpr_mu_lock(&g_mu);
+ g_shutdown = 1;
+ while (g_cbs_head || g_refs) {
+ gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed%s", g_refs,
+ g_cbs_head ? " and executing final callbacks" : "");
+ while (g_cbs_head) {
+ cb = g_cbs_head;
+ g_cbs_head = cb->next;
+ if (!g_cbs_head) g_cbs_tail = NULL;
+ gpr_mu_unlock(&g_mu);
+
+ cb->cb(cb->cb_arg, 0);
+ gpr_free(cb);
+ gpr_mu_lock(&g_mu);
+ }
+ if (g_refs) {
+ if (gpr_cv_wait(&g_cv, &g_mu, shutdown_deadline) && g_cbs_head == NULL) {
+ gpr_log(GPR_DEBUG,
+ "Failed to free %d iomgr objects before shutdown deadline: "
+ "memory leaks are likely",
+ g_refs);
+ break;
+ }
+ }
+ }
+ gpr_mu_unlock(&g_mu);
+
+ gpr_event_wait(&g_background_callback_executor_done, gpr_inf_future);
+
+ grpc_alarm_list_shutdown();
+ gpr_mu_destroy(&g_mu);
+ gpr_cv_destroy(&g_cv);
+}
+
+void grpc_iomgr_ref() {
+ gpr_mu_lock(&g_mu);
+ ++g_refs;
+ gpr_mu_unlock(&g_mu);
+}
+
+void grpc_iomgr_unref() {
+ gpr_mu_lock(&g_mu);
+ if (0 == --g_refs) {
+ gpr_cv_signal(&g_cv);
+ }
+ gpr_mu_unlock(&g_mu);
+}
+
+void grpc_iomgr_add_delayed_callback(grpc_iomgr_cb_func cb, void *cb_arg,
+ int success) {
+ delayed_callback *dcb = gpr_malloc(sizeof(delayed_callback));
+ dcb->cb = cb;
+ dcb->cb_arg = cb_arg;
+ dcb->success = success;
+ gpr_mu_lock(&g_mu);
+ dcb->next = NULL;
+ if (!g_cbs_tail) {
+ g_cbs_head = g_cbs_tail = dcb;
+ } else {
+ g_cbs_tail->next = dcb;
+ g_cbs_tail = dcb;
+ }
+ gpr_cv_signal(&g_cv);
+ gpr_mu_unlock(&g_mu);
+}
+
+void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg) {
+ grpc_iomgr_add_delayed_callback(cb, cb_arg, 1);
+}
+
+int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) {
+ int n = 0;
+ gpr_mu *retake_mu = NULL;
+ delayed_callback *cb;
+ for (;;) {
+ /* check for new work */
+ if (!gpr_mu_trylock(&g_mu)) {
+ break;
+ }
+ cb = g_cbs_head;
+ if (!cb) {
+ gpr_mu_unlock(&g_mu);
+ break;
+ }
+ g_cbs_head = cb->next;
+ if (!g_cbs_head) g_cbs_tail = NULL;
+ gpr_mu_unlock(&g_mu);
+ /* if we have a mutex to drop, do so before executing work */
+ if (drop_mu) {
+ gpr_mu_unlock(drop_mu);
+ retake_mu = drop_mu;
+ drop_mu = NULL;
+ }
+ cb->cb(cb->cb_arg, success && cb->success);
+ gpr_free(cb);
+ n++;
+ }
+ if (retake_mu) {
+ gpr_mu_lock(retake_mu);
+ }
+ return n;
+}
diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h
index cf39f947bc..16991a9b90 100644
--- a/src/core/iomgr/iomgr.h
+++ b/src/core/iomgr/iomgr.h
@@ -34,17 +34,8 @@
#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_H__
#define __GRPC_INTERNAL_IOMGR_IOMGR_H__
-/* Status passed to callbacks for grpc_em_fd_notify_on_read and
- grpc_em_fd_notify_on_write. */
-typedef enum grpc_em_cb_status {
- GRPC_CALLBACK_SUCCESS = 0,
- GRPC_CALLBACK_TIMED_OUT,
- GRPC_CALLBACK_CANCELLED,
- GRPC_CALLBACK_DO_NOT_USE
-} grpc_iomgr_cb_status;
-
/* gRPC Callback definition */
-typedef void (*grpc_iomgr_cb_func)(void *arg, grpc_iomgr_cb_status status);
+typedef void (*grpc_iomgr_cb_func)(void *arg, int success);
void grpc_iomgr_init();
void grpc_iomgr_shutdown();
diff --git a/src/core/iomgr/iomgr_libevent_use_threads.c b/src/core/iomgr/iomgr_internal.h
index af449342f0..5f72542777 100644
--- a/src/core/iomgr/iomgr_libevent_use_threads.c
+++ b/src/core/iomgr/iomgr_internal.h
@@ -31,26 +31,21 @@
*
*/
-/* Posix grpc event manager support code. */
-#include <grpc/support/log.h>
+#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_INTERNAL_H_
+#define __GRPC_INTERNAL_IOMGR_IOMGR_INTERNAL_H_
+
+#include "src/core/iomgr/iomgr.h"
+#include "src/core/iomgr/iomgr_internal.h"
#include <grpc/support/sync.h>
-#include <event2/thread.h>
-static int error_code = 0;
-static gpr_once threads_once = GPR_ONCE_INIT;
-static void evthread_threads_initialize(void) {
- error_code = evthread_use_pthreads();
- if (error_code) {
- gpr_log(GPR_ERROR, "Failed to initialize libevent thread support!");
- }
-}
+int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success);
+void grpc_iomgr_add_delayed_callback(grpc_iomgr_cb_func cb, void *cb_arg,
+ int success);
+
+void grpc_iomgr_ref();
+void grpc_iomgr_unref();
+
+void grpc_iomgr_platform_init();
+void grpc_iomgr_platform_shutdown();
-/* Notify LibEvent that Posix pthread is used. */
-int evthread_use_threads() {
- gpr_once_init(&threads_once, &evthread_threads_initialize);
- /* For Pthreads or Windows threads, Libevent provides simple APIs to set
- mutexes and conditional variables to support cross thread operations.
- For other platforms, LibEvent provide callback APIs to hook mutexes and
- conditional variables. */
- return error_code;
-}
+#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_INTERNAL_H_ */
diff --git a/src/core/iomgr/iomgr_libevent.c b/src/core/iomgr/iomgr_libevent.c
deleted file mode 100644
index 6188ab2749..0000000000
--- a/src/core/iomgr/iomgr_libevent.c
+++ /dev/null
@@ -1,652 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/core/iomgr/iomgr_libevent.h"
-
-#include <unistd.h>
-#include <fcntl.h>
-
-#include "src/core/iomgr/alarm.h"
-#include "src/core/iomgr/alarm_internal.h"
-#include <grpc/support/atm.h>
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/sync.h>
-#include <grpc/support/thd.h>
-#include <grpc/support/time.h>
-#include <event2/event.h>
-#include <event2/thread.h>
-
-#define ALARM_TRIGGER_INIT ((gpr_atm)0)
-#define ALARM_TRIGGER_INCREMENT ((gpr_atm)1)
-#define DONE_SHUTDOWN ((void *)1)
-
-#define POLLER_ID_INVALID ((gpr_atm)-1)
-
-/* Global data */
-struct event_base *g_event_base;
-gpr_mu grpc_iomgr_mu;
-gpr_cv grpc_iomgr_cv;
-static grpc_libevent_activation_data *g_activation_queue;
-static int g_num_pollers;
-static int g_num_fds;
-static int g_num_address_resolutions;
-static gpr_timespec g_last_poll_completed;
-static int g_shutdown_backup_poller;
-static gpr_event g_backup_poller_done;
-/* activated to break out of the event loop early */
-static struct event *g_timeout_ev;
-/* activated to safely break polling from other threads */
-static struct event *g_break_ev;
-static grpc_fd *g_fds_to_free;
-
-int evthread_use_threads(void);
-static void grpc_fd_impl_destroy(grpc_fd *impl);
-
-void grpc_iomgr_ref_address_resolution(int delta) {
- gpr_mu_lock(&grpc_iomgr_mu);
- GPR_ASSERT(!g_shutdown_backup_poller);
- g_num_address_resolutions += delta;
- if (0 == g_num_address_resolutions) {
- gpr_cv_broadcast(&grpc_iomgr_cv);
- }
- gpr_mu_unlock(&grpc_iomgr_mu);
-}
-
-/* If anything is in the work queue, process one item and return 1.
- Return 0 if there were no work items to complete.
- Requires grpc_iomgr_mu locked, may unlock and relock during the call. */
-static int maybe_do_queue_work() {
- grpc_libevent_activation_data *work = g_activation_queue;
-
- if (work == NULL) return 0;
-
- if (work->next == work) {
- g_activation_queue = NULL;
- } else {
- g_activation_queue = work->next;
- g_activation_queue->prev = work->prev;
- g_activation_queue->next->prev = g_activation_queue->prev->next =
- g_activation_queue;
- }
- work->next = work->prev = NULL;
- /* force status to cancelled from ok when shutting down */
- if (g_shutdown_backup_poller && work->status == GRPC_CALLBACK_SUCCESS) {
- work->status = GRPC_CALLBACK_CANCELLED;
- }
- gpr_mu_unlock(&grpc_iomgr_mu);
-
- work->cb(work->arg, work->status);
-
- gpr_mu_lock(&grpc_iomgr_mu);
- return 1;
-}
-
-/* Break out of the event loop on timeout */
-static void timer_callback(int fd, short events, void *context) {
- event_base_loopbreak((struct event_base *)context);
-}
-
-static void break_callback(int fd, short events, void *context) {
- event_base_loopbreak((struct event_base *)context);
-}
-
-static void free_fd_list(grpc_fd *impl) {
- while (impl != NULL) {
- grpc_fd *current = impl;
- impl = impl->next;
- grpc_fd_impl_destroy(current);
- current->on_done(current->on_done_user_data, GRPC_CALLBACK_SUCCESS);
- gpr_free(current);
- }
-}
-
-static void maybe_free_fds() {
- if (g_fds_to_free) {
- free_fd_list(g_fds_to_free);
- g_fds_to_free = NULL;
- }
-}
-
-void grpc_kick_poller() { event_active(g_break_ev, EV_READ, 0); }
-
-/* Spend some time doing polling and libevent maintenance work if no other
- thread is. This includes both polling for events and destroying/closing file
- descriptor objects.
- Returns 1 if polling was performed, 0 otherwise.
- Requires grpc_iomgr_mu locked, may unlock and relock during the call. */
-static int maybe_do_polling_work(struct timeval delay) {
- int status;
-
- if (g_num_pollers) return 0;
-
- g_num_pollers = 1;
-
- maybe_free_fds();
-
- gpr_mu_unlock(&grpc_iomgr_mu);
-
- event_add(g_timeout_ev, &delay);
- status = event_base_loop(g_event_base, EVLOOP_ONCE);
- if (status < 0) {
- gpr_log(GPR_ERROR, "event polling loop stops with error status %d", status);
- }
- event_del(g_timeout_ev);
-
- gpr_mu_lock(&grpc_iomgr_mu);
- maybe_free_fds();
-
- g_num_pollers = 0;
- gpr_cv_broadcast(&grpc_iomgr_cv);
- return 1;
-}
-
-static int maybe_do_alarm_work(gpr_timespec now, gpr_timespec next) {
- int r = 0;
- if (gpr_time_cmp(next, now) < 0) {
- gpr_mu_unlock(&grpc_iomgr_mu);
- r = grpc_alarm_check(now);
- gpr_mu_lock(&grpc_iomgr_mu);
- }
- return r;
-}
-
-int grpc_iomgr_work(gpr_timespec deadline) {
- gpr_timespec now = gpr_now();
- gpr_timespec next = grpc_alarm_list_next_timeout();
- gpr_timespec delay_timespec = gpr_time_sub(deadline, now);
- /* poll for no longer than one second */
- gpr_timespec max_delay = gpr_time_from_seconds(1);
- struct timeval delay;
-
- if (gpr_time_cmp(delay_timespec, gpr_time_0) <= 0) {
- return 0;
- }
-
- if (gpr_time_cmp(delay_timespec, max_delay) > 0) {
- delay_timespec = max_delay;
- }
-
- /* Adjust delay to account for the next alarm, if applicable. */
- delay_timespec = gpr_time_min(
- delay_timespec, gpr_time_sub(grpc_alarm_list_next_timeout(), now));
-
- delay = gpr_timeval_from_timespec(delay_timespec);
-
- if (maybe_do_queue_work() || maybe_do_alarm_work(now, next) ||
- maybe_do_polling_work(delay)) {
- g_last_poll_completed = gpr_now();
- return 1;
- }
-
- return 0;
-}
-
-static void backup_poller_thread(void *p) {
- int backup_poller_engaged = 0;
- /* allow no pollers for 100 milliseconds, then engage backup polling */
- gpr_timespec allow_no_pollers = gpr_time_from_millis(100);
-
- gpr_mu_lock(&grpc_iomgr_mu);
- while (!g_shutdown_backup_poller) {
- if (g_num_pollers == 0) {
- gpr_timespec now = gpr_now();
- gpr_timespec time_until_engage = gpr_time_sub(
- allow_no_pollers, gpr_time_sub(now, g_last_poll_completed));
- if (gpr_time_cmp(time_until_engage, gpr_time_0) <= 0) {
- if (!backup_poller_engaged) {
- gpr_log(GPR_DEBUG, "No pollers for a while - engaging backup poller");
- backup_poller_engaged = 1;
- }
- if (!maybe_do_queue_work()) {
- gpr_timespec next = grpc_alarm_list_next_timeout();
- if (!maybe_do_alarm_work(now, next)) {
- gpr_timespec deadline =
- gpr_time_min(next, gpr_time_add(now, gpr_time_from_seconds(1)));
- maybe_do_polling_work(
- gpr_timeval_from_timespec(gpr_time_sub(deadline, now)));
- }
- }
- } else {
- if (backup_poller_engaged) {
- gpr_log(GPR_DEBUG, "Backup poller disengaged");
- backup_poller_engaged = 0;
- }
- gpr_mu_unlock(&grpc_iomgr_mu);
- gpr_sleep_until(gpr_time_add(now, time_until_engage));
- gpr_mu_lock(&grpc_iomgr_mu);
- }
- } else {
- if (backup_poller_engaged) {
- gpr_log(GPR_DEBUG, "Backup poller disengaged");
- backup_poller_engaged = 0;
- }
- gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, gpr_inf_future);
- }
- }
- gpr_mu_unlock(&grpc_iomgr_mu);
-
- gpr_event_set(&g_backup_poller_done, (void *)1);
-}
-
-void grpc_iomgr_init() {
- gpr_thd_id backup_poller_id;
-
- if (evthread_use_threads() != 0) {
- gpr_log(GPR_ERROR, "Failed to initialize libevent thread support!");
- abort();
- }
-
- grpc_alarm_list_init(gpr_now());
-
- gpr_mu_init(&grpc_iomgr_mu);
- gpr_cv_init(&grpc_iomgr_cv);
- g_activation_queue = NULL;
- g_num_pollers = 0;
- g_num_fds = 0;
- g_num_address_resolutions = 0;
- g_last_poll_completed = gpr_now();
- g_shutdown_backup_poller = 0;
- g_fds_to_free = NULL;
-
- gpr_event_init(&g_backup_poller_done);
-
- g_event_base = NULL;
- g_timeout_ev = NULL;
- g_break_ev = NULL;
-
- g_event_base = event_base_new();
- if (!g_event_base) {
- gpr_log(GPR_ERROR, "Failed to create the event base");
- abort();
- }
-
- if (evthread_make_base_notifiable(g_event_base) != 0) {
- gpr_log(GPR_ERROR, "Couldn't make event base notifiable cross threads!");
- abort();
- }
-
- g_timeout_ev = evtimer_new(g_event_base, timer_callback, g_event_base);
- g_break_ev = event_new(g_event_base, -1, EV_READ | EV_PERSIST, break_callback,
- g_event_base);
-
- event_add(g_break_ev, NULL);
-
- gpr_thd_new(&backup_poller_id, backup_poller_thread, NULL, NULL);
-}
-
-void grpc_iomgr_shutdown() {
- gpr_timespec fd_shutdown_deadline =
- gpr_time_add(gpr_now(), gpr_time_from_seconds(10));
-
- /* broadcast shutdown */
- gpr_mu_lock(&grpc_iomgr_mu);
- while (g_num_fds > 0 || g_num_address_resolutions > 0) {
- gpr_log(GPR_INFO,
- "waiting for %d fds and %d name resolutions to be destroyed before "
- "closing event manager",
- g_num_fds, g_num_address_resolutions);
- if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, fd_shutdown_deadline)) {
- gpr_log(GPR_ERROR,
- "not all fds or name resolutions destroyed before shutdown "
- "deadline: memory leaks "
- "are likely");
- break;
- } else if (g_num_fds == 0 && g_num_address_resolutions == 0) {
- gpr_log(GPR_INFO, "all fds closed, all name resolutions finished");
- }
- }
-
- g_shutdown_backup_poller = 1;
- gpr_cv_broadcast(&grpc_iomgr_cv);
- gpr_mu_unlock(&grpc_iomgr_mu);
-
- gpr_event_wait(&g_backup_poller_done, gpr_inf_future);
-
- grpc_alarm_list_shutdown();
-
- /* drain pending work */
- gpr_mu_lock(&grpc_iomgr_mu);
- while (maybe_do_queue_work())
- ;
- gpr_mu_unlock(&grpc_iomgr_mu);
-
- free_fd_list(g_fds_to_free);
-
- /* complete shutdown */
- gpr_mu_destroy(&grpc_iomgr_mu);
- gpr_cv_destroy(&grpc_iomgr_cv);
-
- if (g_timeout_ev != NULL) {
- event_free(g_timeout_ev);
- }
-
- if (g_break_ev != NULL) {
- event_free(g_break_ev);
- }
-
- if (g_event_base != NULL) {
- event_base_free(g_event_base);
- g_event_base = NULL;
- }
-}
-
-static void add_task(grpc_libevent_activation_data *adata) {
- gpr_mu_lock(&grpc_iomgr_mu);
- if (g_activation_queue) {
- adata->next = g_activation_queue;
- adata->prev = adata->next->prev;
- adata->next->prev = adata->prev->next = adata;
- } else {
- g_activation_queue = adata;
- adata->next = adata->prev = adata;
- }
- gpr_cv_broadcast(&grpc_iomgr_cv);
- gpr_mu_unlock(&grpc_iomgr_mu);
-}
-
-static void grpc_fd_impl_destroy(grpc_fd *impl) {
- grpc_em_task_activity_type type;
- grpc_libevent_activation_data *adata;
-
- for (type = GRPC_EM_TA_READ; type < GRPC_EM_TA_COUNT; type++) {
- adata = &(impl->task.activation[type]);
- GPR_ASSERT(adata->next == NULL);
- if (adata->ev != NULL) {
- event_free(adata->ev);
- adata->ev = NULL;
- }
- }
-
- if (impl->shutdown_ev != NULL) {
- event_free(impl->shutdown_ev);
- impl->shutdown_ev = NULL;
- }
- gpr_mu_destroy(&impl->mu);
- close(impl->fd);
-}
-
-/* Proxy callback to call a gRPC read/write callback */
-static void em_fd_cb(int fd, short what, void *arg /*=em_fd*/) {
- grpc_fd *em_fd = arg;
- grpc_iomgr_cb_status status = GRPC_CALLBACK_SUCCESS;
- int run_read_cb = 0;
- int run_write_cb = 0;
- grpc_libevent_activation_data *rdata, *wdata;
-
- gpr_mu_lock(&em_fd->mu);
- if (em_fd->shutdown_started) {
- status = GRPC_CALLBACK_CANCELLED;
- } else if (status == GRPC_CALLBACK_SUCCESS && (what & EV_TIMEOUT)) {
- status = GRPC_CALLBACK_TIMED_OUT;
- /* TODO(klempner): This is broken if we are monitoring both read and write
- events on the same fd -- generating a spurious event is okay, but
- generating a spurious timeout is not. */
- what |= (EV_READ | EV_WRITE);
- }
-
- if (what & EV_READ) {
- switch (em_fd->read_state) {
- case GRPC_FD_WAITING:
- run_read_cb = 1;
- em_fd->read_state = GRPC_FD_IDLE;
- break;
- case GRPC_FD_IDLE:
- case GRPC_FD_CACHED:
- em_fd->read_state = GRPC_FD_CACHED;
- }
- }
- if (what & EV_WRITE) {
- switch (em_fd->write_state) {
- case GRPC_FD_WAITING:
- run_write_cb = 1;
- em_fd->write_state = GRPC_FD_IDLE;
- break;
- case GRPC_FD_IDLE:
- case GRPC_FD_CACHED:
- em_fd->write_state = GRPC_FD_CACHED;
- }
- }
-
- if (run_read_cb) {
- rdata = &(em_fd->task.activation[GRPC_EM_TA_READ]);
- rdata->status = status;
- add_task(rdata);
- } else if (run_write_cb) {
- wdata = &(em_fd->task.activation[GRPC_EM_TA_WRITE]);
- wdata->status = status;
- add_task(wdata);
- }
- gpr_mu_unlock(&em_fd->mu);
-}
-
-static void em_fd_shutdown_cb(int fd, short what, void *arg /*=em_fd*/) {
- /* TODO(klempner): This could just run directly in the calling thread, except
- that libevent's handling of event_active() on an event which is already in
- flight on a different thread is racy and easily triggers TSAN.
- */
- grpc_fd *impl = arg;
- gpr_mu_lock(&impl->mu);
- impl->shutdown_started = 1;
- if (impl->read_state == GRPC_FD_WAITING) {
- event_active(impl->task.activation[GRPC_EM_TA_READ].ev, EV_READ, 1);
- }
- if (impl->write_state == GRPC_FD_WAITING) {
- event_active(impl->task.activation[GRPC_EM_TA_WRITE].ev, EV_WRITE, 1);
- }
- gpr_mu_unlock(&impl->mu);
-}
-
-grpc_fd *grpc_fd_create(int fd) {
- int flags;
- grpc_libevent_activation_data *rdata, *wdata;
- grpc_fd *impl = gpr_malloc(sizeof(grpc_fd));
-
- gpr_mu_lock(&grpc_iomgr_mu);
- g_num_fds++;
- gpr_mu_unlock(&grpc_iomgr_mu);
-
- impl->shutdown_ev = NULL;
- gpr_mu_init(&impl->mu);
-
- flags = fcntl(fd, F_GETFL, 0);
- GPR_ASSERT((flags & O_NONBLOCK) != 0);
-
- impl->task.type = GRPC_EM_TASK_FD;
- impl->fd = fd;
-
- rdata = &(impl->task.activation[GRPC_EM_TA_READ]);
- rdata->ev = NULL;
- rdata->cb = NULL;
- rdata->arg = NULL;
- rdata->status = GRPC_CALLBACK_SUCCESS;
- rdata->prev = NULL;
- rdata->next = NULL;
-
- wdata = &(impl->task.activation[GRPC_EM_TA_WRITE]);
- wdata->ev = NULL;
- wdata->cb = NULL;
- wdata->arg = NULL;
- wdata->status = GRPC_CALLBACK_SUCCESS;
- wdata->prev = NULL;
- wdata->next = NULL;
-
- impl->read_state = GRPC_FD_IDLE;
- impl->write_state = GRPC_FD_IDLE;
-
- impl->shutdown_started = 0;
- impl->next = NULL;
-
- /* TODO(chenw): detect platforms where only level trigger is supported,
- and set the event to non-persist. */
- rdata->ev = event_new(g_event_base, impl->fd, EV_ET | EV_PERSIST | EV_READ,
- em_fd_cb, impl);
- GPR_ASSERT(rdata->ev);
-
- wdata->ev = event_new(g_event_base, impl->fd, EV_ET | EV_PERSIST | EV_WRITE,
- em_fd_cb, impl);
- GPR_ASSERT(wdata->ev);
-
- impl->shutdown_ev =
- event_new(g_event_base, -1, EV_READ, em_fd_shutdown_cb, impl);
- GPR_ASSERT(impl->shutdown_ev);
-
- return impl;
-}
-
-static void do_nothing(void *ignored, grpc_iomgr_cb_status also_ignored) {}
-
-void grpc_fd_destroy(grpc_fd *impl, grpc_iomgr_cb_func on_done,
- void *user_data) {
- if (on_done == NULL) on_done = do_nothing;
-
- gpr_mu_lock(&grpc_iomgr_mu);
-
- /* Put the impl on the list to be destroyed by the poller. */
- impl->on_done = on_done;
- impl->on_done_user_data = user_data;
- impl->next = g_fds_to_free;
- g_fds_to_free = impl;
- /* TODO(ctiller): kick the poller so it destroys this fd promptly
- (currently we may wait up to a second) */
-
- g_num_fds--;
- gpr_cv_broadcast(&grpc_iomgr_cv);
- gpr_mu_unlock(&grpc_iomgr_mu);
-}
-
-int grpc_fd_get(struct grpc_fd *em_fd) { return em_fd->fd; }
-
-/* TODO(chenw): should we enforce the contract that notify_on_read cannot be
- called when the previously registered callback has not been called yet. */
-int grpc_fd_notify_on_read(grpc_fd *impl, grpc_iomgr_cb_func read_cb,
- void *read_cb_arg, gpr_timespec deadline) {
- int force_event = 0;
- grpc_libevent_activation_data *rdata;
- gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now());
- struct timeval delay = gpr_timeval_from_timespec(delay_timespec);
- struct timeval *delayp =
- gpr_time_cmp(deadline, gpr_inf_future) ? &delay : NULL;
-
- rdata = &impl->task.activation[GRPC_EM_TA_READ];
-
- gpr_mu_lock(&impl->mu);
- rdata->cb = read_cb;
- rdata->arg = read_cb_arg;
-
- force_event = (impl->shutdown_started || impl->read_state == GRPC_FD_CACHED);
- impl->read_state = GRPC_FD_WAITING;
-
- if (force_event) {
- event_active(rdata->ev, EV_READ, 1);
- } else if (event_add(rdata->ev, delayp) == -1) {
- gpr_mu_unlock(&impl->mu);
- return 0;
- }
- gpr_mu_unlock(&impl->mu);
- return 1;
-}
-
-int grpc_fd_notify_on_write(grpc_fd *impl, grpc_iomgr_cb_func write_cb,
- void *write_cb_arg, gpr_timespec deadline) {
- int force_event = 0;
- grpc_libevent_activation_data *wdata;
- gpr_timespec delay_timespec = gpr_time_sub(deadline, gpr_now());
- struct timeval delay = gpr_timeval_from_timespec(delay_timespec);
- struct timeval *delayp =
- gpr_time_cmp(deadline, gpr_inf_future) ? &delay : NULL;
-
- wdata = &impl->task.activation[GRPC_EM_TA_WRITE];
-
- gpr_mu_lock(&impl->mu);
- wdata->cb = write_cb;
- wdata->arg = write_cb_arg;
-
- force_event = (impl->shutdown_started || impl->write_state == GRPC_FD_CACHED);
- impl->write_state = GRPC_FD_WAITING;
-
- if (force_event) {
- event_active(wdata->ev, EV_WRITE, 1);
- } else if (event_add(wdata->ev, delayp) == -1) {
- gpr_mu_unlock(&impl->mu);
- return 0;
- }
- gpr_mu_unlock(&impl->mu);
- return 1;
-}
-
-void grpc_fd_shutdown(grpc_fd *em_fd) {
- event_active(em_fd->shutdown_ev, EV_READ, 1);
-}
-
-/* Sometimes we want a followup callback: something to be added from the
- current callback for the EM to invoke once this callback is complete.
- This is implemented by inserting an entry into an EM queue. */
-
-/* The following structure holds the field needed for adding the
- followup callback. These are the argument for the followup callback,
- the function to use for the followup callback, and the
- activation data pointer used for the queues (to free in the CB) */
-struct followup_callback_arg {
- grpc_iomgr_cb_func func;
- void *cb_arg;
- grpc_libevent_activation_data adata;
-};
-
-static void followup_proxy_callback(void *cb_arg, grpc_iomgr_cb_status status) {
- struct followup_callback_arg *fcb_arg = cb_arg;
- /* Invoke the function */
- fcb_arg->func(fcb_arg->cb_arg, status);
- gpr_free(fcb_arg);
-}
-
-void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg) {
- grpc_libevent_activation_data *adptr;
- struct followup_callback_arg *fcb_arg;
-
- fcb_arg = gpr_malloc(sizeof(*fcb_arg));
- /* Set up the activation data and followup callback argument structures */
- adptr = &fcb_arg->adata;
- adptr->ev = NULL;
- adptr->cb = followup_proxy_callback;
- adptr->arg = fcb_arg;
- adptr->status = GRPC_CALLBACK_SUCCESS;
- adptr->prev = NULL;
- adptr->next = NULL;
-
- fcb_arg->func = cb;
- fcb_arg->cb_arg = cb_arg;
-
- /* Insert an activation data for the specified em */
- add_task(adptr);
-}
diff --git a/src/core/iomgr/iomgr_libevent.h b/src/core/iomgr/iomgr_libevent.h
deleted file mode 100644
index 5c088006a0..0000000000
--- a/src/core/iomgr/iomgr_libevent.h
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__
-#define __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__
-
-#include "src/core/iomgr/iomgr.h"
-#include <grpc/support/sync.h>
-#include <grpc/support/time.h>
-
-typedef struct grpc_fd grpc_fd;
-
-/* gRPC event manager task "base class". This is pretend-inheritance in C89.
- This should be the first member of any actual grpc_em task type.
-
- Memory warning: expanding this will increase memory usage in any derived
- class, so be careful.
-
- For generality, this base can be on multiple task queues and can have
- multiple event callbacks registered. Not all "derived classes" will use
- this feature. */
-
-typedef enum grpc_libevent_task_type {
- GRPC_EM_TASK_ALARM,
- GRPC_EM_TASK_FD,
- GRPC_EM_TASK_DO_NOT_USE
-} grpc_libevent_task_type;
-
-/* Different activity types to shape the callback and queueing arrays */
-typedef enum grpc_em_task_activity_type {
- GRPC_EM_TA_READ, /* use this also for single-type events */
- GRPC_EM_TA_WRITE,
- GRPC_EM_TA_COUNT
-} grpc_em_task_activity_type;
-
-/* Include the following #define for convenience for tasks like alarms that
- only have a single type */
-#define GRPC_EM_TA_ONLY GRPC_EM_TA_READ
-
-typedef struct grpc_libevent_activation_data {
- struct event *ev; /* event activated on this callback type */
- grpc_iomgr_cb_func cb; /* function pointer for callback */
- void *arg; /* argument passed to cb */
-
- /* Hold the status associated with the callback when queued */
- grpc_iomgr_cb_status status;
- /* Now set up to link activations into scheduler queues */
- struct grpc_libevent_activation_data *prev;
- struct grpc_libevent_activation_data *next;
-} grpc_libevent_activation_data;
-
-typedef struct grpc_libevent_task {
- grpc_libevent_task_type type;
-
- /* Now have an array of activation data elements: one for each activity
- type that could get activated */
- grpc_libevent_activation_data activation[GRPC_EM_TA_COUNT];
-} grpc_libevent_task;
-
-/* Initialize *em_fd.
- Requires fd is a non-blocking file descriptor.
-
- This takes ownership of closing fd.
-
- Requires: *em_fd uninitialized. fd is a non-blocking file descriptor. */
-grpc_fd *grpc_fd_create(int fd);
-
-/* Cause *em_fd no longer to be initialized and closes the underlying fd.
- on_done is called when the underlying file descriptor is definitely close()d.
- If on_done is NULL, no callback will be made.
- Requires: *em_fd initialized; no outstanding notify_on_read or
- notify_on_write. */
-void grpc_fd_destroy(grpc_fd *em_fd, grpc_iomgr_cb_func on_done,
- void *user_data);
-
-/* Returns the file descriptor associated with *em_fd. */
-int grpc_fd_get(grpc_fd *em_fd);
-
-/* Register read interest, causing read_cb to be called once when em_fd becomes
- readable, on deadline specified by deadline, or on shutdown triggered by
- grpc_fd_shutdown.
- read_cb will be called with read_cb_arg when *em_fd becomes readable.
- read_cb is Called with status of GRPC_CALLBACK_SUCCESS if readable,
- GRPC_CALLBACK_TIMED_OUT if the call timed out,
- and CANCELLED if the call was cancelled.
-
- Requires:This method must not be called before the read_cb for any previous
- call runs. Edge triggered events are used whenever they are supported by the
- underlying platform. This means that users must drain em_fd in read_cb before
- calling notify_on_read again. Users are also expected to handle spurious
- events, i.e read_cb is called while nothing can be readable from em_fd */
-int grpc_fd_notify_on_read(grpc_fd *em_fd, grpc_iomgr_cb_func read_cb,
- void *read_cb_arg, gpr_timespec deadline);
-
-/* Exactly the same semantics as above, except based on writable events. */
-int grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb,
- void *write_cb_arg, gpr_timespec deadline);
-
-/* Cause any current and all future read/write callbacks to error out with
- GRPC_CALLBACK_CANCELLED. */
-void grpc_fd_shutdown(grpc_fd *em_fd);
-
-/* =================== Event caching ===================
- In order to not miss or double-return edges in the context of edge triggering
- and multithreading, we need a per-fd caching layer in the eventmanager itself
- to cache relevant events.
-
- There are two types of events we care about: calls to notify_on_[read|write]
- and readable/writable events for the socket from eventfd. There are separate
- event caches for read and write.
-
- There are three states:
- 0. "waiting" -- There's been a call to notify_on_[read|write] which has not
- had a corresponding event. In other words, we're waiting for an event so we
- can run the callback.
- 1. "idle" -- We are neither waiting nor have a cached event.
- 2. "cached" -- There has been a read/write event without a waiting callback,
- so we want to run the event next time the application calls
- notify_on_[read|write].
-
- The high level state diagram:
-
- +--------------------------------------------------------------------+
- | WAITING | IDLE | CACHED |
- | | | |
- | 1. --*-> 2. --+-> 3. --+\
- | | | <--+/
- | | | |
- x+-- 6. 5. <-+-- 4. <-*-- |
- | | | |
- +--------------------------------------------------------------------+
-
- Transitions right occur on read|write events. Transitions left occur on
- notify_on_[read|write] events.
- State transitions:
- 1. Read|Write event while waiting -> run the callback and transition to idle.
- 2. Read|Write event while idle -> transition to cached.
- 3. Read|Write event with one already cached -> still cached.
- 4. notify_on_[read|write] with event cached: run callback and transition to
- idle.
- 5. notify_on_[read|write] when idle: Store callback and transition to
- waiting.
- 6. notify_on_[read|write] when waiting: invalid. */
-
-typedef enum grpc_fd_state {
- GRPC_FD_WAITING = 0,
- GRPC_FD_IDLE = 1,
- GRPC_FD_CACHED = 2
-} grpc_fd_state;
-
-/* gRPC file descriptor handle.
- The handle is used to register read/write callbacks to a file descriptor */
-struct grpc_fd {
- grpc_libevent_task task; /* Base class, callbacks, queues, etc */
- int fd; /* File descriptor */
-
- /* Note that the shutdown event is only needed as a workaround for libevent
- not properly handling event_active on an in flight event. */
- struct event *shutdown_ev; /* activated to trigger shutdown */
-
- /* protect shutdown_started|read_state|write_state and ensure barriers
- between notify_on_[read|write] and read|write callbacks */
- gpr_mu mu;
- int shutdown_started; /* 0 -> shutdown not started, 1 -> started */
- grpc_fd_state read_state;
- grpc_fd_state write_state;
-
- /* descriptor delete list. These are destroyed during polling. */
- struct grpc_fd *next;
- grpc_iomgr_cb_func on_done;
- void *on_done_user_data;
-};
-
-void grpc_iomgr_ref_address_resolution(int delta);
-
-#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_LIBEVENT_H__ */
diff --git a/src/core/iomgr/pollset.c b/src/core/iomgr/iomgr_posix.c
index 62a0019eb3..ff9195ec1d 100644
--- a/src/core/iomgr/pollset.c
+++ b/src/core/iomgr/iomgr_posix.c
@@ -31,7 +31,8 @@
*
*/
-#include "src/core/iomgr/pollset.h"
+#include "src/core/iomgr/iomgr_posix.h"
-void grpc_pollset_init(grpc_pollset *pollset) { pollset->unused = 0; }
-void grpc_pollset_destroy(grpc_pollset *pollset) {}
+void grpc_iomgr_platform_init() { grpc_pollset_global_init(); }
+
+void grpc_iomgr_platform_shutdown() { grpc_pollset_global_shutdown(); }
diff --git a/src/core/iomgr/iomgr_completion_queue_interface.h b/src/core/iomgr/iomgr_posix.h
index 3c4efe773a..ca5af3e527 100644
--- a/src/core/iomgr/iomgr_completion_queue_interface.h
+++ b/src/core/iomgr/iomgr_posix.h
@@ -31,15 +31,12 @@
*
*/
-#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_COMPLETION_QUEUE_INTERFACE_H_
-#define __GRPC_INTERNAL_IOMGR_IOMGR_COMPLETION_QUEUE_INTERFACE_H_
+#ifndef __GRPC_INTERNAL_IOMGR_IOMGR_POSIX_H_
+#define __GRPC_INTERNAL_IOMGR_IOMGR_POSIX_H_
-/* Internals of iomgr that are exposed only to be used for completion queue
- implementation */
+#include "src/core/iomgr/iomgr_internal.h"
-extern gpr_mu grpc_iomgr_mu;
-extern gpr_cv grpc_iomgr_cv;
+void grpc_pollset_global_init();
+void grpc_pollset_global_shutdown();
-int grpc_iomgr_work(gpr_timespec deadline);
-
-#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_COMPLETION_QUEUE_INTERFACE_H_ */
+#endif /* __GRPC_INTERNAL_IOMGR_IOMGR_POSIX_H_ */
diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h
index ba1a9d5429..7374a4ec13 100644
--- a/src/core/iomgr/pollset.h
+++ b/src/core/iomgr/pollset.h
@@ -34,18 +34,31 @@
#ifndef __GRPC_INTERNAL_IOMGR_POLLSET_H_
#define __GRPC_INTERNAL_IOMGR_POLLSET_H_
+#include <grpc/support/port_platform.h>
+
/* A grpc_pollset is a set of file descriptors that a higher level item is
interested in. For example:
- a server will typically keep a pollset containing all connected channels,
so that it can find new calls to service
- a completion queue might keep a pollset with an entry for each transport
that is servicing a call that it's tracking */
-/* Eventually different implementations of iomgr will provide their own
- grpc_pollset structs. As this is just a dummy wrapper to get the API in,
- we just define a simple type here. */
-typedef struct { char unused; } grpc_pollset;
+
+#ifdef GPR_POSIX_SOCKET
+#include "src/core/iomgr/pollset_posix.h"
+#endif
void grpc_pollset_init(grpc_pollset *pollset);
void grpc_pollset_destroy(grpc_pollset *pollset);
+/* Do some work on a pollset.
+ May involve invoking asynchronous callbacks, or actually polling file
+ descriptors.
+ Requires GRPC_POLLSET_MU(pollset) locked.
+ May unlock GRPC_POLLSET_MU(pollset) during its execution. */
+int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline);
+
+/* Break a pollset out of polling work
+ Requires GRPC_POLLSET_MU(pollset) locked. */
+void grpc_pollset_kick(grpc_pollset *pollset);
+
#endif /* __GRPC_INTERNAL_IOMGR_POLLSET_H_ */
diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
new file mode 100644
index 0000000000..06c7a5a0dd
--- /dev/null
+++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c
@@ -0,0 +1,237 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_POSIX_MULTIPOLL_WITH_POLL
+
+#include "src/core/iomgr/pollset_posix.h"
+
+#include <errno.h>
+#include <poll.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "src/core/iomgr/fd_posix.h"
+#include "src/core/iomgr/iomgr_internal.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/useful.h>
+
+/* Per-pollset state for the poll()-based multipoller, stored in
+   pollset->data.ptr.  Three count/capacity-managed dynamic arrays. */
+typedef struct {
+  /* all polled fds */
+  size_t fd_count;
+  size_t fd_capacity;
+  grpc_fd **fds;
+  /* fds being polled by the current poller: parallel arrays of pollfd and the
+   * grpc_fd* that the pollfd was constructed from */
+  size_t pfd_count;
+  size_t pfd_capacity;
+  grpc_fd **selfds;
+  struct pollfd *pfds;
+  /* fds that have been removed from the pollset explicitly */
+  size_t del_count;
+  size_t del_capacity;
+  grpc_fd **dels;
+} pollset_hdr;
+
+/* Add fd to the polled set; no-op if already present.  Takes a ref on fd
+   that is dropped when the fd is removed from h->fds (next poll cycle or
+   destroy). */
+static void multipoll_with_poll_pollset_add_fd(grpc_pollset *pollset,
+                                               grpc_fd *fd) {
+  size_t i;
+  pollset_hdr *h = pollset->data.ptr;
+  /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */
+  for (i = 0; i < h->fd_count; i++) {
+    if (h->fds[i] == fd) return;
+  }
+  if (h->fd_count == h->fd_capacity) {
+    h->fd_capacity = GPR_MAX(h->fd_capacity + 8, h->fd_count * 3 / 2);
+    h->fds = gpr_realloc(h->fds, sizeof(grpc_fd *) * h->fd_capacity);
+  }
+  h->fds[h->fd_count++] = fd;
+  grpc_fd_ref(fd);
+}
+
+/* Deferred removal: record fd in h->dels (taking a ref); maybe_work
+   reconciles h->fds against h->dels on its next pass and drops both refs. */
+static void multipoll_with_poll_pollset_del_fd(grpc_pollset *pollset,
+                                               grpc_fd *fd) {
+  /* will get removed next poll cycle */
+  pollset_hdr *h = pollset->data.ptr;
+  if (h->del_count == h->del_capacity) {
+    h->del_capacity = GPR_MAX(h->del_capacity + 8, h->del_count * 3 / 2);
+    h->dels = gpr_realloc(h->dels, sizeof(grpc_fd *) * h->del_capacity);
+  }
+  h->dels[h->del_count++] = fd;
+  grpc_fd_ref(fd);
+}
+
+/* Undo grpc_fd_begin_poll for every fd in the current pollfd set.
+   Index 0 is the pollset's kick pipe, which never went through
+   grpc_fd_begin_poll, so the loop starts at 1. */
+static void end_polling(grpc_pollset *pollset) {
+  size_t i;
+  pollset_hdr *h;
+  h = pollset->data.ptr;
+  for (i = 1; i < h->pfd_count; i++) {
+    grpc_fd_end_poll(h->selfds[i], pollset);
+  }
+}
+
+/* Poll all tracked fds (plus the pollset's kick pipe in slot 0) until
+   deadline.  Returns 0 if another thread already holds the poller slot
+   (pollset->counter non-zero); returns 1 otherwise.  Requires pollset->mu
+   held on entry; the mutex is dropped around the blocking poll() call and
+   re-acquired before returning. */
+static int multipoll_with_poll_pollset_maybe_work(
+    grpc_pollset *pollset, gpr_timespec deadline, gpr_timespec now,
+    int allow_synchronous_callback) {
+  int timeout;
+  int r;
+  size_t i, np, nf, nd;
+  pollset_hdr *h;
+
+  if (pollset->counter) {
+    return 0;
+  }
+  h = pollset->data.ptr;
+  if (gpr_time_cmp(deadline, gpr_inf_future) == 0) {
+    timeout = -1;
+  } else {
+    timeout = gpr_time_to_millis(gpr_time_sub(deadline, now));
+    if (timeout <= 0) {
+      return 1;
+    }
+  }
+  /* ensure the parallel pfds/selfds arrays can hold every fd plus the kick
+     pipe; old contents need not survive, so free+malloc beats realloc */
+  if (h->pfd_capacity < h->fd_count + 1) {
+    h->pfd_capacity = GPR_MAX(h->pfd_capacity * 3 / 2, h->fd_count + 1);
+    gpr_free(h->pfds);
+    gpr_free(h->selfds);
+    h->pfds = gpr_malloc(sizeof(struct pollfd) * h->pfd_capacity);
+    h->selfds = gpr_malloc(sizeof(grpc_fd *) * h->pfd_capacity);
+  }
+  nf = 0;
+  np = 1;
+  h->pfds[0].fd = grpc_kick_read_fd(pollset);
+  h->pfds[0].events = POLLIN;
+  /* cleared, not POLLOUT: keeps the slot consistent with the unary poller
+     and avoids a stale value if poll() fails or times out */
+  h->pfds[0].revents = 0;
+  /* compact h->fds in place, dropping orphaned/explicitly-deleted fds, and
+     build the pollfd array for the survivors */
+  for (i = 0; i < h->fd_count; i++) {
+    int remove = grpc_fd_is_orphaned(h->fds[i]);
+    for (nd = 0; nd < h->del_count; nd++) {
+      if (h->fds[i] == h->dels[nd]) remove = 1;
+    }
+    if (remove) {
+      grpc_fd_unref(h->fds[i]);
+    } else {
+      h->fds[nf++] = h->fds[i];
+      h->pfds[np].events =
+          grpc_fd_begin_poll(h->fds[i], pollset, POLLIN, POLLOUT);
+      h->selfds[np] = h->fds[i];
+      h->pfds[np].fd = h->fds[i]->fd;
+      h->pfds[np].revents = 0;
+      np++;
+    }
+  }
+  h->pfd_count = np;
+  h->fd_count = nf;
+  for (nd = 0; nd < h->del_count; nd++) {
+    grpc_fd_unref(h->dels[nd]);
+  }
+  h->del_count = 0;
+  /* NOTE(review): np starts at 1 (kick pipe), so pfd_count is never 0 here;
+     this early-out looks unreachable — confirm before removing */
+  if (h->pfd_count == 0) {
+    end_polling(pollset);
+    return 0;
+  }
+  pollset->counter = 1;
+  gpr_mu_unlock(&pollset->mu);
+
+  r = poll(h->pfds, h->pfd_count, timeout);
+  if (r < 0) {
+    gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
+  } else if (r == 0) {
+    /* do nothing */
+  } else {
+    if (h->pfds[0].revents & POLLIN) {
+      grpc_kick_drain(pollset);
+    }
+    for (i = 1; i < np; i++) {
+      if (h->pfds[i].revents & POLLIN) {
+        grpc_fd_become_readable(h->selfds[i], allow_synchronous_callback);
+      }
+      if (h->pfds[i].revents & POLLOUT) {
+        grpc_fd_become_writable(h->selfds[i], allow_synchronous_callback);
+      }
+    }
+  }
+  end_polling(pollset);
+
+  gpr_mu_lock(&pollset->mu);
+  pollset->counter = 0;
+  gpr_cv_broadcast(&pollset->cv);
+  return 1;
+}
+
+/* Release all fd refs held via h->fds and h->dels and free the header.
+   Asserts no thread is currently polling (counter == 0). */
+static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) {
+  size_t i;
+  pollset_hdr *h = pollset->data.ptr;
+  GPR_ASSERT(pollset->counter == 0);
+  for (i = 0; i < h->fd_count; i++) {
+    grpc_fd_unref(h->fds[i]);
+  }
+  for (i = 0; i < h->del_count; i++) {
+    grpc_fd_unref(h->dels[i]);
+  }
+  gpr_free(h->pfds);
+  gpr_free(h->selfds);
+  gpr_free(h->fds);
+  gpr_free(h->dels);
+  gpr_free(h);
+}
+
+static const grpc_pollset_vtable multipoll_with_poll_pollset = {
+    multipoll_with_poll_pollset_add_fd, multipoll_with_poll_pollset_del_fd,
+    multipoll_with_poll_pollset_maybe_work,
+    multipoll_with_poll_pollset_destroy};
+
+/* Convert pollset to the poll()-based multipoller, seeding it with nfds fds.
+   Takes its own ref on each fd; callers keep (and must release) theirs. */
+void grpc_platform_become_multipoller(grpc_pollset *pollset, grpc_fd **fds,
+                                      size_t nfds) {
+  size_t i;
+  pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr));
+  pollset->vtable = &multipoll_with_poll_pollset;
+  pollset->data.ptr = h;
+  h->fd_count = nfds;
+  h->fd_capacity = nfds;
+  h->fds = gpr_malloc(nfds * sizeof(grpc_fd *));
+  h->pfd_count = 0;
+  h->pfd_capacity = 0;
+  h->pfds = NULL;
+  h->selfds = NULL;
+  h->del_count = 0;
+  h->del_capacity = 0;
+  h->dels = NULL;
+  for (i = 0; i < nfds; i++) {
+    h->fds[i] = fds[i];
+    grpc_fd_ref(fds[i]);
+  }
+}
+
+#endif
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
new file mode 100644
index 0000000000..ba4031e11f
--- /dev/null
+++ b/src/core/iomgr/pollset_posix.c
@@ -0,0 +1,340 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/iomgr/pollset_posix.h"
+
+#include <errno.h>
+#include <poll.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "src/core/iomgr/alarm_internal.h"
+#include "src/core/iomgr/fd_posix.h"
+#include "src/core/iomgr/iomgr_internal.h"
+#include "src/core/iomgr/socket_utils_posix.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/thd.h>
+#include <grpc/support/useful.h>
+
+/* kick pipes: we keep a sharded set of pipes to allow breaking from poll.
+ Ideally this would be 1:1 with pollsets, but we'd like to avoid associating
+ full kernel objects with each pollset to keep them lightweight, so instead
+ keep a sharded set and allow associating a pollset with one of the shards.
+
+ TODO(ctiller): move this out from this file, and allow an eventfd
+ implementation on linux */
+
+#define LOG2_KICK_SHARDS 6
+#define KICK_SHARDS (1 << LOG2_KICK_SHARDS)
+
+static int g_kick_pipes[KICK_SHARDS][2];
+static grpc_pollset g_backup_pollset;
+static int g_shutdown_backup_poller;
+static gpr_event g_backup_poller_done;
+
+/* Thread entry for the backup poller: polls g_backup_pollset roughly every
+   100ms until g_shutdown_backup_poller is set (under the pollset mutex),
+   then signals g_backup_poller_done so shutdown can join. */
+static void backup_poller(void *p) {
+  gpr_timespec delta = gpr_time_from_millis(100);
+  gpr_timespec last_poll = gpr_now();
+
+  gpr_mu_lock(&g_backup_pollset.mu);
+  while (g_shutdown_backup_poller == 0) {
+    gpr_timespec next_poll = gpr_time_add(last_poll, delta);
+    /* grpc_pollset_work requires mu held; it may drop and re-acquire it */
+    grpc_pollset_work(&g_backup_pollset, next_poll);
+    gpr_mu_unlock(&g_backup_pollset.mu);
+    gpr_sleep_until(next_poll);
+    gpr_mu_lock(&g_backup_pollset.mu);
+    last_poll = next_poll;
+  }
+  gpr_mu_unlock(&g_backup_pollset.mu);
+
+  gpr_event_set(&g_backup_poller_done, (void *)1);
+}
+
+/* Map a pollset address onto one of the KICK_SHARDS kick pipes by hashing
+   its pointer bits. */
+static size_t kick_shard(const grpc_pollset *info) {
+  size_t x = (size_t)info;
+  return ((x >> 4) ^ (x >> 9) ^ (x >> 14)) & (KICK_SHARDS - 1);
+}
+
+/* Read end of the kick pipe associated with p; polled in slot 0. */
+int grpc_kick_read_fd(grpc_pollset *p) {
+  return g_kick_pipes[kick_shard(p)][0];
+}
+
+static int grpc_kick_write_fd(grpc_pollset *p) {
+  return g_kick_pipes[kick_shard(p)][1];
+}
+
+/* Wake any poller sharing p's kick shard by writing one byte to the pipe.
+   NOTE(review): the loop retries only EINTR; if the nonblocking pipe is full
+   the write is dropped — presumably acceptable because a kick byte is then
+   already pending.  Confirm. */
+void grpc_pollset_force_kick(grpc_pollset *p) {
+  char c = 0;
+  while (write(grpc_kick_write_fd(p), &c, 1) != 1 && errno == EINTR)
+    ;
+}
+
+/* Kick only if some thread is actually polling p (counter non-zero).
+   Requires p->mu held (see pollset.h). */
+void grpc_pollset_kick(grpc_pollset *p) {
+  if (!p->counter) return;
+  grpc_pollset_force_kick(p);
+}
+
+/* Consume all pending kick bytes from p's (nonblocking) kick pipe so later
+   polls don't spuriously wake.  EAGAIN means drained; EINTR retries. */
+void grpc_kick_drain(grpc_pollset *p) {
+  int fd = grpc_kick_read_fd(p);
+  char buf[128];
+  int r;
+
+  for (;;) {
+    r = read(fd, buf, sizeof(buf));
+    if (r > 0) continue;
+    if (r == 0) return;
+    switch (errno) {
+      case EAGAIN:
+        return;
+      case EINTR:
+        continue;
+      default:
+        gpr_log(GPR_ERROR, "error reading pipe: %s", strerror(errno));
+        return;
+    }
+  }
+}
+
+/* global state management */
+
+/* Accessor for the process-wide backup pollset (see pollset_posix.h). */
+grpc_pollset *grpc_backup_pollset() { return &g_backup_pollset; }
+
+/* One-time process setup: create the sharded nonblocking kick pipes,
+   initialize the backup pollset, and launch the backup poller thread. */
+void grpc_pollset_global_init() {
+  int i;
+  gpr_thd_id id;
+
+  /* initialize the kick shards */
+  for (i = 0; i < KICK_SHARDS; i++) {
+    GPR_ASSERT(0 == pipe(g_kick_pipes[i]));
+    GPR_ASSERT(grpc_set_socket_nonblocking(g_kick_pipes[i][0], 1));
+    GPR_ASSERT(grpc_set_socket_nonblocking(g_kick_pipes[i][1], 1));
+  }
+
+  /* initialize the backup pollset */
+  grpc_pollset_init(&g_backup_pollset);
+
+  /* start the backup poller thread */
+  g_shutdown_backup_poller = 0;
+  gpr_event_init(&g_backup_poller_done);
+  gpr_thd_new(&id, backup_poller, NULL, NULL);
+}
+
+/* Tear down global pollset state: signal the backup poller to stop (flag is
+   written under the pollset mutex), wait for it to finish, then destroy the
+   backup pollset and close every kick pipe. */
+void grpc_pollset_global_shutdown() {
+  int i;
+
+  /* terminate the backup poller thread */
+  gpr_mu_lock(&g_backup_pollset.mu);
+  g_shutdown_backup_poller = 1;
+  gpr_mu_unlock(&g_backup_pollset.mu);
+  gpr_event_wait(&g_backup_poller_done, gpr_inf_future);
+
+  /* destroy the backup pollset */
+  grpc_pollset_destroy(&g_backup_pollset);
+
+  /* destroy the kick shards */
+  for (i = 0; i < KICK_SHARDS; i++) {
+    close(g_kick_pipes[i][0]);
+    close(g_kick_pipes[i][1]);
+  }
+}
+
+/* main interface */
+
+static void become_empty_pollset(grpc_pollset *pollset);
+static void become_unary_pollset(grpc_pollset *pollset, grpc_fd *fd);
+
+/* Initialize a pollset: fresh mutex/condvar, starting in the empty (no-fd)
+   representation; add_fd promotes it to unary, then multipoller, as needed. */
+void grpc_pollset_init(grpc_pollset *pollset) {
+  gpr_mu_init(&pollset->mu);
+  gpr_cv_init(&pollset->cv);
+  become_empty_pollset(pollset);
+}
+
+/* Add an fd via the current representation's vtable; broadcasts cv so any
+   blocked poller re-evaluates its fd set.  Takes mu internally. */
+void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
+  gpr_mu_lock(&pollset->mu);
+  pollset->vtable->add_fd(pollset, fd);
+  gpr_cv_broadcast(&pollset->cv);
+  gpr_mu_unlock(&pollset->mu);
+}
+
+/* Remove an fd via the current representation's vtable; same locking and
+   wakeup discipline as add_fd. */
+void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) {
+  gpr_mu_lock(&pollset->mu);
+  pollset->vtable->del_fd(pollset, fd);
+  gpr_cv_broadcast(&pollset->cv);
+  gpr_mu_unlock(&pollset->mu);
+}
+
+/* Do a bounded unit of work for pollset: expired deadline returns 0;
+   otherwise run pending delayed callbacks, then alarms (which may pull the
+   deadline in), and only then let the representation actually poll.
+   Requires pollset->mu held; callees may drop and re-acquire it. */
+int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
+  /* pollset->mu already held */
+  gpr_timespec now;
+  now = gpr_now();
+  if (gpr_time_cmp(now, deadline) > 0) {
+    return 0;
+  }
+  if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1)) {
+    return 1;
+  }
+  if (grpc_alarm_check(&pollset->mu, now, &deadline)) {
+    return 1;
+  }
+  return pollset->vtable->maybe_work(pollset, deadline, now, 1);
+}
+
+/* Destroy the representation-specific state, then the mutex and condvar. */
+void grpc_pollset_destroy(grpc_pollset *pollset) {
+  pollset->vtable->destroy(pollset);
+  gpr_mu_destroy(&pollset->mu);
+  gpr_cv_destroy(&pollset->cv);
+}
+
+/*
+ * empty_pollset - a vtable that provides polling for NO file descriptors
+ */
+
+/* Adding the first fd promotes the empty pollset to the unary (single-fd)
+   representation. */
+static void empty_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
+  become_unary_pollset(pollset, fd);
+}
+
+/* Nothing tracked, so nothing to delete. */
+static void empty_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) {}
+
+/* No fds to poll: report "no work done". */
+static int empty_pollset_maybe_work(grpc_pollset *pollset,
+                                    gpr_timespec deadline, gpr_timespec now,
+                                    int allow_synchronous_callback) {
+  return 0;
+}
+
+static void empty_pollset_destroy(grpc_pollset *pollset) {}
+
+static const grpc_pollset_vtable empty_pollset = {
+    empty_pollset_add_fd, empty_pollset_del_fd, empty_pollset_maybe_work,
+    empty_pollset_destroy};
+
+static void become_empty_pollset(grpc_pollset *pollset) {
+  pollset->vtable = &empty_pollset;
+}
+
+/*
+ * unary_poll_pollset - a vtable that provides polling for one file descriptor
+ * via poll()
+ */
+
+/* Adding a second, distinct fd promotes to the platform multipoller, which
+   takes its own refs on both fds; the unary representation's ref on the
+   original fd (fds[0]) is then released here. */
+static void unary_poll_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
+  grpc_fd *fds[2];
+  if (fd == pollset->data.ptr) return;
+  fds[0] = pollset->data.ptr;
+  fds[1] = fd;
+  grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds));
+  grpc_fd_unref(fds[0]);
+}
+
+/* Deleting the tracked fd drops its ref and demotes back to empty. */
+static void unary_poll_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) {
+  if (fd == pollset->data.ptr) {
+    grpc_fd_unref(pollset->data.ptr);
+    become_empty_pollset(pollset);
+  }
+}
+
+/* poll() the single tracked fd plus the kick pipe until deadline.  Returns 0
+   if another thread holds the poller slot or the fd is gone; 1 otherwise.
+   Requires pollset->mu held; drops it around the blocking poll() call. */
+static int unary_poll_pollset_maybe_work(grpc_pollset *pollset,
+                                         gpr_timespec deadline,
+                                         gpr_timespec now,
+                                         int allow_synchronous_callback) {
+  struct pollfd pfd[2];
+  grpc_fd *fd;
+  int timeout;
+  int r;
+
+  if (pollset->counter) {
+    return 0;
+  }
+  fd = pollset->data.ptr;
+  /* the fd may have been orphaned since it was added; drop it and demote */
+  if (grpc_fd_is_orphaned(fd)) {
+    grpc_fd_unref(fd);
+    become_empty_pollset(pollset);
+    return 0;
+  }
+  if (gpr_time_cmp(deadline, gpr_inf_future) == 0) {
+    timeout = -1;
+  } else {
+    timeout = gpr_time_to_millis(gpr_time_sub(deadline, now));
+    if (timeout <= 0) {
+      return 1;
+    }
+  }
+  /* slot 0: kick pipe; slot 1: the tracked fd */
+  pfd[0].fd = grpc_kick_read_fd(pollset);
+  pfd[0].events = POLLIN;
+  pfd[0].revents = 0;
+  pfd[1].fd = fd->fd;
+  pfd[1].events = grpc_fd_begin_poll(fd, pollset, POLLIN, POLLOUT);
+  pfd[1].revents = 0;
+  pollset->counter = 1;
+  gpr_mu_unlock(&pollset->mu);
+
+  r = poll(pfd, GPR_ARRAY_SIZE(pfd), timeout);
+  if (r < 0) {
+    gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
+  } else if (r == 0) {
+    /* do nothing */
+  } else {
+    if (pfd[0].revents & POLLIN) {
+      grpc_kick_drain(pollset);
+    }
+    if (pfd[1].revents & POLLIN) {
+      grpc_fd_become_readable(fd, allow_synchronous_callback);
+    }
+    if (pfd[1].revents & POLLOUT) {
+      grpc_fd_become_writable(fd, allow_synchronous_callback);
+    }
+  }
+
+  gpr_mu_lock(&pollset->mu);
+  grpc_fd_end_poll(fd, pollset);
+  pollset->counter = 0;
+  gpr_cv_broadcast(&pollset->cv);
+  return 1;
+}
+
+/* Release the ref held on the single tracked fd; no poller may be active. */
+static void unary_poll_pollset_destroy(grpc_pollset *pollset) {
+  GPR_ASSERT(pollset->counter == 0);
+  grpc_fd_unref(pollset->data.ptr);
+}
+
+static const grpc_pollset_vtable unary_poll_pollset = {
+    unary_poll_pollset_add_fd, unary_poll_pollset_del_fd,
+    unary_poll_pollset_maybe_work, unary_poll_pollset_destroy};
+
+/* Switch pollset to the single-fd representation, taking a ref on fd. */
+static void become_unary_pollset(grpc_pollset *pollset, grpc_fd *fd) {
+  pollset->vtable = &unary_poll_pollset;
+  pollset->counter = 0;
+  pollset->data.ptr = fd;
+  grpc_fd_ref(fd);
+}
diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h
new file mode 100644
index 0000000000..f051079f5b
--- /dev/null
+++ b/src/core/iomgr/pollset_posix.h
@@ -0,0 +1,95 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef __GRPC_INTERNAL_IOMGR_POLLSET_POSIX_H_
+#define __GRPC_INTERNAL_IOMGR_POLLSET_POSIX_H_
+
+#include <grpc/support/sync.h>
+
+typedef struct grpc_pollset_vtable grpc_pollset_vtable;
+
+/* forward declare only in this file to avoid leaking impl details via
+   pollset.h; real users of grpc_fd should always include 'fd_posix.h' and not
+   use the struct tag */
+struct grpc_fd;
+
+typedef struct grpc_pollset {
+  /* pollsets under posix can mutate representation as fds are added and
+     removed.
+     For example, we may choose a poll() based implementation on linux for
+     few fds, and an epoll() based implementation for many fds */
+  const grpc_pollset_vtable *vtable;
+  /* guards all pollset state; exposed to callers via GRPC_POLLSET_MU */
+  gpr_mu mu;
+  /* broadcast when the fd set changes or a poller finishes */
+  gpr_cv cv;
+  /* non-zero while a thread is blocked in poll() on this pollset; checked by
+     grpc_pollset_kick to decide whether a wakeup byte is needed */
+  int counter;
+  /* representation-specific state (e.g. a grpc_fd* for the unary poller or a
+     pollset_hdr* for the multipoller) */
+  union {
+    int fd;
+    void *ptr;
+  } data;
+} grpc_pollset;
+
+/* Operations a pollset representation must provide; called with mu held. */
+struct grpc_pollset_vtable {
+  void (*add_fd)(grpc_pollset *pollset, struct grpc_fd *fd);
+  void (*del_fd)(grpc_pollset *pollset, struct grpc_fd *fd);
+  int (*maybe_work)(grpc_pollset *pollset, gpr_timespec deadline,
+                    gpr_timespec now, int allow_synchronous_callback);
+  void (*destroy)(grpc_pollset *pollset);
+};
+
+#define GRPC_POLLSET_MU(pollset) (&(pollset)->mu)
+#define GRPC_POLLSET_CV(pollset) (&(pollset)->cv)
+
+/* Add an fd to a pollset */
+void grpc_pollset_add_fd(grpc_pollset *pollset, struct grpc_fd *fd);
+/* Force remove an fd from a pollset (normally they are removed on the next
+ poll after an fd is orphaned) */
+void grpc_pollset_del_fd(grpc_pollset *pollset, struct grpc_fd *fd);
+
+/* Force any current pollers to break polling */
+void grpc_pollset_force_kick(grpc_pollset *pollset);
+/* Returns the fd to listen on for kicks */
+int grpc_kick_read_fd(grpc_pollset *p);
+/* Call after polling has been kicked to leave the kicked state */
+void grpc_kick_drain(grpc_pollset *p);
+
+/* All fds get added to a backup pollset to ensure that progress is made
+ regardless of applications listening to events. Relying on this is slow
+ however (the backup pollset only listens every 100ms or so) - so it's not
+ to be relied on. */
+grpc_pollset *grpc_backup_pollset();
+
+/* turn a pollset into a multipoller: platform specific */
+void grpc_platform_become_multipoller(grpc_pollset *pollset,
+ struct grpc_fd **fds, size_t fd_count);
+
+#endif /* __GRPC_INTERNAL_IOMGR_POLLSET_POSIX_H_ */
diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c
index a0a04297eb..c9c2c5378a 100644
--- a/src/core/iomgr/resolve_address_posix.c
+++ b/src/core/iomgr/resolve_address_posix.c
@@ -41,7 +41,7 @@
#include <unistd.h>
#include <string.h>
-#include "src/core/iomgr/iomgr_libevent.h"
+#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/sockaddr_utils.h"
#include "src/core/iomgr/socket_utils_posix.h"
#include <grpc/support/alloc.h>
@@ -201,7 +201,7 @@ static void do_request(void *rp) {
gpr_free(r->default_port);
gpr_free(r);
cb(arg, resolved);
- grpc_iomgr_ref_address_resolution(-1);
+ grpc_iomgr_unref();
}
void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) {
@@ -213,7 +213,7 @@ void grpc_resolve_address(const char *name, const char *default_port,
grpc_resolve_cb cb, void *arg) {
request *r = gpr_malloc(sizeof(request));
gpr_thd_id id;
- grpc_iomgr_ref_address_resolution(1);
+ grpc_iomgr_ref();
r->name = gpr_strdup(name);
r->default_port = gpr_strdup(default_port);
r->cb = cb;
diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c
index 88b599b582..d675c2dcec 100644
--- a/src/core/iomgr/tcp_client_posix.c
+++ b/src/core/iomgr/tcp_client_posix.c
@@ -38,7 +38,9 @@
#include <string.h>
#include <unistd.h>
-#include "src/core/iomgr/iomgr_libevent.h"
+#include "src/core/iomgr/alarm.h"
+#include "src/core/iomgr/iomgr_posix.h"
+#include "src/core/iomgr/pollset_posix.h"
#include "src/core/iomgr/sockaddr_utils.h"
#include "src/core/iomgr/socket_utils_posix.h"
#include "src/core/iomgr/tcp_posix.h"
@@ -49,8 +51,11 @@
typedef struct {
void (*cb)(void *arg, grpc_endpoint *tcp);
void *cb_arg;
+ gpr_mu mu;
grpc_fd *fd;
gpr_timespec deadline;
+ grpc_alarm alarm;
+ int refs;
} async_connect;
static int prepare_socket(int fd) {
@@ -74,21 +79,42 @@ error:
return 0;
}
-static void on_writable(void *acp, grpc_iomgr_cb_status status) {
+static void on_alarm(void *acp, int success) {
+ int done;
+ async_connect *ac = acp;
+ gpr_mu_lock(&ac->mu);
+ if (ac->fd != NULL && success) {
+ grpc_fd_shutdown(ac->fd);
+ }
+ done = (--ac->refs == 0);
+ gpr_mu_unlock(&ac->mu);
+ if (done) {
+ gpr_mu_destroy(&ac->mu);
+ gpr_free(ac);
+ }
+}
+
+static void on_writable(void *acp, int success) {
async_connect *ac = acp;
int so_error = 0;
socklen_t so_error_size;
int err;
- int fd = grpc_fd_get(ac->fd);
+ int fd = ac->fd->fd;
+ int done;
+ grpc_endpoint *ep = NULL;
+ void (*cb)(void *arg, grpc_endpoint *tcp) = ac->cb;
+ void *cb_arg = ac->cb_arg;
+
+ grpc_alarm_cancel(&ac->alarm);
- if (status == GRPC_CALLBACK_SUCCESS) {
+ if (success) {
do {
so_error_size = sizeof(so_error);
err = getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &so_error_size);
} while (err < 0 && errno == EINTR);
if (err < 0) {
gpr_log(GPR_ERROR, "getsockopt(ERROR): %s", strerror(errno));
- goto error;
+ goto finish;
} else if (so_error != 0) {
if (so_error == ENOBUFS) {
/* We will get one of these errors if we have run out of
@@ -106,7 +132,7 @@ static void on_writable(void *acp, grpc_iomgr_cb_status status) {
opened too many network connections. The "easy" fix:
don't do that! */
gpr_log(GPR_ERROR, "kernel out of buffers");
- grpc_fd_notify_on_write(ac->fd, on_writable, ac, ac->deadline);
+ grpc_fd_notify_on_write(ac->fd, on_writable, ac);
return;
} else {
switch (so_error) {
@@ -117,27 +143,31 @@ static void on_writable(void *acp, grpc_iomgr_cb_status status) {
gpr_log(GPR_ERROR, "socket error: %d", so_error);
break;
}
- goto error;
+ goto finish;
}
} else {
- goto great_success;
+ ep = grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
+ goto finish;
}
} else {
- gpr_log(GPR_ERROR, "on_writable failed during connect: status=%d", status);
- goto error;
+ gpr_log(GPR_ERROR, "on_writable failed during connect");
+ goto finish;
}
abort();
-error:
- ac->cb(ac->cb_arg, NULL);
- grpc_fd_destroy(ac->fd, NULL, NULL);
- gpr_free(ac);
- return;
-
-great_success:
- ac->cb(ac->cb_arg, grpc_tcp_create(ac->fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
- gpr_free(ac);
+finish:
+ gpr_mu_lock(&ac->mu);
+ if (!ep) {
+ grpc_fd_orphan(ac->fd, NULL, NULL);
+ }
+ done = (--ac->refs == 0);
+ gpr_mu_unlock(&ac->mu);
+ if (done) {
+ gpr_mu_destroy(&ac->mu);
+ gpr_free(ac);
+ }
+ cb(cb_arg, ep);
}
void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
@@ -176,6 +206,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
} while (err < 0 && errno == EINTR);
if (err >= 0) {
+ gpr_log(GPR_DEBUG, "instant connect");
cb(arg,
grpc_tcp_create(grpc_fd_create(fd), GRPC_TCP_DEFAULT_READ_SLICE_SIZE));
return;
@@ -191,7 +222,10 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
ac = gpr_malloc(sizeof(async_connect));
ac->cb = cb;
ac->cb_arg = arg;
- ac->deadline = deadline;
ac->fd = grpc_fd_create(fd);
- grpc_fd_notify_on_write(ac->fd, on_writable, ac, deadline);
+ gpr_mu_init(&ac->mu);
+ ac->refs = 2;
+
+ grpc_alarm_init(&ac->alarm, deadline, on_alarm, ac, gpr_now());
+ grpc_fd_notify_on_write(ac->fd, on_writable, ac);
}
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index bc3ce69e47..657f34aaf9 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -255,18 +255,14 @@ typedef struct {
grpc_endpoint_read_cb read_cb;
void *read_user_data;
- gpr_timespec read_deadline;
grpc_endpoint_write_cb write_cb;
void *write_user_data;
- gpr_timespec write_deadline;
grpc_tcp_slice_state write_state;
} grpc_tcp;
-static void grpc_tcp_handle_read(void *arg /* grpc_tcp */,
- grpc_iomgr_cb_status status);
-static void grpc_tcp_handle_write(void *arg /* grpc_tcp */,
- grpc_iomgr_cb_status status);
+static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success);
+static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success);
static void grpc_tcp_shutdown(grpc_endpoint *ep) {
grpc_tcp *tcp = (grpc_tcp *)ep;
@@ -276,7 +272,7 @@ static void grpc_tcp_shutdown(grpc_endpoint *ep) {
static void grpc_tcp_unref(grpc_tcp *tcp) {
int refcount_zero = gpr_unref(&tcp->refcount);
if (refcount_zero) {
- grpc_fd_destroy(tcp->em_fd, NULL, NULL);
+ grpc_fd_orphan(tcp->em_fd, NULL, NULL);
gpr_free(tcp);
}
}
@@ -308,8 +304,7 @@ static void call_read_cb(grpc_tcp *tcp, gpr_slice *slices, size_t nslices,
#define INLINE_SLICE_BUFFER_SIZE 8
#define MAX_READ_IOVEC 4
-static void grpc_tcp_handle_read(void *arg /* grpc_tcp */,
- grpc_iomgr_cb_status status) {
+static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success) {
grpc_tcp *tcp = (grpc_tcp *)arg;
int iov_size = 1;
gpr_slice static_read_slices[INLINE_SLICE_BUFFER_SIZE];
@@ -324,18 +319,12 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */,
slice_state_init(&read_state, static_read_slices, INLINE_SLICE_BUFFER_SIZE,
0);
- if (status == GRPC_CALLBACK_CANCELLED) {
+ if (!success) {
call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_SHUTDOWN);
grpc_tcp_unref(tcp);
return;
}
- if (status == GRPC_CALLBACK_TIMED_OUT) {
- call_read_cb(tcp, NULL, 0, GRPC_ENDPOINT_CB_TIMED_OUT);
- grpc_tcp_unref(tcp);
- return;
- }
-
/* TODO(klempner): Limit the amount we read at once. */
for (;;) {
allocated_bytes = slice_state_append_blocks_into_iovec(
@@ -377,8 +366,7 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */,
} else {
/* Spurious read event, consume it here */
slice_state_destroy(&read_state);
- grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp,
- tcp->read_deadline);
+ grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp);
}
} else {
/* TODO(klempner): Log interesting errors */
@@ -407,14 +395,13 @@ static void grpc_tcp_handle_read(void *arg /* grpc_tcp */,
}
static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
- void *user_data, gpr_timespec deadline) {
+ void *user_data) {
grpc_tcp *tcp = (grpc_tcp *)ep;
GPR_ASSERT(tcp->read_cb == NULL);
tcp->read_cb = cb;
tcp->read_user_data = user_data;
- tcp->read_deadline = deadline;
gpr_ref(&tcp->refcount);
- grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp, deadline);
+ grpc_fd_notify_on_read(tcp->em_fd, grpc_tcp_handle_read, tcp);
}
#define MAX_WRITE_IOVEC 16
@@ -460,34 +447,24 @@ static grpc_endpoint_write_status grpc_tcp_flush(grpc_tcp *tcp) {
};
}
-static void grpc_tcp_handle_write(void *arg /* grpc_tcp */,
- grpc_iomgr_cb_status status) {
+static void grpc_tcp_handle_write(void *arg /* grpc_tcp */, int success) {
grpc_tcp *tcp = (grpc_tcp *)arg;
grpc_endpoint_write_status write_status;
grpc_endpoint_cb_status cb_status;
grpc_endpoint_write_cb cb;
- cb_status = GRPC_ENDPOINT_CB_OK;
-
- if (status == GRPC_CALLBACK_CANCELLED) {
- cb_status = GRPC_ENDPOINT_CB_SHUTDOWN;
- } else if (status == GRPC_CALLBACK_TIMED_OUT) {
- cb_status = GRPC_ENDPOINT_CB_TIMED_OUT;
- }
-
- if (cb_status != GRPC_ENDPOINT_CB_OK) {
+ if (!success) {
slice_state_destroy(&tcp->write_state);
cb = tcp->write_cb;
tcp->write_cb = NULL;
- cb(tcp->write_user_data, cb_status);
+ cb(tcp->write_user_data, GRPC_ENDPOINT_CB_SHUTDOWN);
grpc_tcp_unref(tcp);
return;
}
write_status = grpc_tcp_flush(tcp);
if (write_status == GRPC_ENDPOINT_WRITE_PENDING) {
- grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp,
- tcp->write_deadline);
+ grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp);
} else {
slice_state_destroy(&tcp->write_state);
if (write_status == GRPC_ENDPOINT_WRITE_DONE) {
@@ -502,9 +479,11 @@ static void grpc_tcp_handle_write(void *arg /* grpc_tcp */,
}
}
-static grpc_endpoint_write_status grpc_tcp_write(
- grpc_endpoint *ep, gpr_slice *slices, size_t nslices,
- grpc_endpoint_write_cb cb, void *user_data, gpr_timespec deadline) {
+static grpc_endpoint_write_status grpc_tcp_write(grpc_endpoint *ep,
+ gpr_slice *slices,
+ size_t nslices,
+ grpc_endpoint_write_cb cb,
+ void *user_data) {
grpc_tcp *tcp = (grpc_tcp *)ep;
grpc_endpoint_write_status status;
@@ -530,17 +509,15 @@ static grpc_endpoint_write_status grpc_tcp_write(
gpr_ref(&tcp->refcount);
tcp->write_cb = cb;
tcp->write_user_data = user_data;
- tcp->write_deadline = deadline;
- grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp,
- tcp->write_deadline);
+ grpc_fd_notify_on_write(tcp->em_fd, grpc_tcp_handle_write, tcp);
}
return status;
}
static void grpc_tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) {
- /* tickle the pollset so we crash if things aren't wired correctly */
- pollset->unused++;
+ grpc_tcp *tcp = (grpc_tcp *)ep;
+ grpc_pollset_add_fd(pollset, tcp->em_fd);
}
static const grpc_endpoint_vtable vtable = {
@@ -550,14 +527,12 @@ static const grpc_endpoint_vtable vtable = {
grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) {
grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
tcp->base.vtable = &vtable;
- tcp->fd = grpc_fd_get(em_fd);
+ tcp->fd = em_fd->fd;
tcp->read_cb = NULL;
tcp->write_cb = NULL;
tcp->read_user_data = NULL;
tcp->write_user_data = NULL;
tcp->slice_size = slice_size;
- tcp->read_deadline = gpr_inf_future;
- tcp->write_deadline = gpr_inf_future;
slice_state_init(&tcp->write_state, NULL, 0, 0);
/* paired with unref in grpc_tcp_destroy */
gpr_ref_init(&tcp->refcount, 1);
diff --git a/src/core/iomgr/tcp_posix.h b/src/core/iomgr/tcp_posix.h
index 830394d534..c3eef1b4b7 100644
--- a/src/core/iomgr/tcp_posix.h
+++ b/src/core/iomgr/tcp_posix.h
@@ -45,7 +45,7 @@
*/
#include "src/core/iomgr/endpoint.h"
-#include "src/core/iomgr/iomgr_libevent.h"
+#include "src/core/iomgr/fd_posix.h"
#define GRPC_TCP_DEFAULT_READ_SLICE_SIZE 8192
diff --git a/src/core/iomgr/tcp_server.h b/src/core/iomgr/tcp_server.h
index 46fba13f90..1968246b75 100644
--- a/src/core/iomgr/tcp_server.h
+++ b/src/core/iomgr/tcp_server.h
@@ -49,8 +49,8 @@ typedef void (*grpc_tcp_server_cb)(void *arg, grpc_endpoint *ep);
grpc_tcp_server *grpc_tcp_server_create();
/* Start listening to bound ports */
-void grpc_tcp_server_start(grpc_tcp_server *server, grpc_tcp_server_cb cb,
- void *cb_arg);
+void grpc_tcp_server_start(grpc_tcp_server *server, grpc_pollset *pollset,
+ grpc_tcp_server_cb cb, void *cb_arg);
/* Add a port to the server, returning true on success, or false otherwise.
diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c
index 2abaf15ce4..5ed517748a 100644
--- a/src/core/iomgr/tcp_server_posix.c
+++ b/src/core/iomgr/tcp_server_posix.c
@@ -45,7 +45,7 @@
#include <string.h>
#include <errno.h>
-#include "src/core/iomgr/iomgr_libevent.h"
+#include "src/core/iomgr/pollset_posix.h"
#include "src/core/iomgr/sockaddr_utils.h"
#include "src/core/iomgr/socket_utils_posix.h"
#include "src/core/iomgr/tcp_posix.h"
@@ -97,13 +97,8 @@ grpc_tcp_server *grpc_tcp_server_create() {
return s;
}
-static void done_destroy(void *p, grpc_iomgr_cb_status status) {
- gpr_event_set(p, (void *)1);
-}
-
void grpc_tcp_server_destroy(grpc_tcp_server *s) {
size_t i;
- gpr_event fd_done;
gpr_mu_lock(&s->mu);
/* shutdown all fd's */
for (i = 0; i < s->nports; i++) {
@@ -118,9 +113,7 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s) {
/* delete ALL the things */
for (i = 0; i < s->nports; i++) {
server_port *sp = &s->ports[i];
- gpr_event_init(&fd_done);
- grpc_fd_destroy(sp->emfd, done_destroy, &fd_done);
- gpr_event_wait(&fd_done, gpr_inf_future);
+ grpc_fd_orphan(sp->emfd, NULL, NULL);
}
gpr_free(s->ports);
gpr_free(s);
@@ -196,10 +189,10 @@ error:
}
/* event manager callback when reads are ready */
-static void on_read(void *arg, grpc_iomgr_cb_status status) {
+static void on_read(void *arg, int success) {
server_port *sp = arg;
- if (status != GRPC_CALLBACK_SUCCESS) {
+ if (!success) {
goto error;
}
@@ -215,7 +208,7 @@ static void on_read(void *arg, grpc_iomgr_cb_status status) {
case EINTR:
continue;
case EAGAIN:
- grpc_fd_notify_on_read(sp->emfd, on_read, sp, gpr_inf_future);
+ grpc_fd_notify_on_read(sp->emfd, on_read, sp);
return;
default:
gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno));
@@ -254,15 +247,10 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd,
s->ports = gpr_realloc(s->ports, sizeof(server_port *) * s->port_capacity);
}
sp = &s->ports[s->nports++];
- sp->emfd = grpc_fd_create(fd);
- sp->fd = fd;
sp->server = s;
- /* initialize the em desc */
- if (sp->emfd == NULL) {
- s->nports--;
- gpr_mu_unlock(&s->mu);
- return 0;
- }
+ sp->fd = fd;
+ sp->emfd = grpc_fd_create(fd);
+ GPR_ASSERT(sp->emfd);
gpr_mu_unlock(&s->mu);
return 1;
@@ -319,8 +307,8 @@ int grpc_tcp_server_get_fd(grpc_tcp_server *s, int index) {
return (0 <= index && index < s->nports) ? s->ports[index].fd : -1;
}
-void grpc_tcp_server_start(grpc_tcp_server *s, grpc_tcp_server_cb cb,
- void *cb_arg) {
+void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset *pollset,
+ grpc_tcp_server_cb cb, void *cb_arg) {
size_t i;
GPR_ASSERT(cb);
gpr_mu_lock(&s->mu);
@@ -329,8 +317,10 @@ void grpc_tcp_server_start(grpc_tcp_server *s, grpc_tcp_server_cb cb,
s->cb = cb;
s->cb_arg = cb_arg;
for (i = 0; i < s->nports; i++) {
- grpc_fd_notify_on_read(s->ports[i].emfd, on_read, &s->ports[i],
- gpr_inf_future);
+ if (pollset) {
+ grpc_pollset_add_fd(pollset, s->ports[i].emfd);
+ }
+ grpc_fd_notify_on_read(s->ports[i].emfd, on_read, &s->ports[i]);
s->active_ports++;
}
gpr_mu_unlock(&s->mu);