aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/iomgr')
-rw-r--r--src/core/iomgr/endpoint_pair.h3
-rw-r--r--src/core/iomgr/endpoint_pair_posix.c11
-rw-r--r--src/core/iomgr/fd_posix.c30
-rw-r--r--src/core/iomgr/fd_posix.h4
-rw-r--r--src/core/iomgr/iomgr.c139
-rw-r--r--src/core/iomgr/iomgr.h9
-rw-r--r--src/core/iomgr/iomgr_internal.h3
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c2
-rw-r--r--src/core/iomgr/pollset_posix.c10
-rw-r--r--src/core/iomgr/tcp_client.h1
-rw-r--r--src/core/iomgr/tcp_client_posix.c3
-rw-r--r--src/core/iomgr/tcp_posix.c2
-rw-r--r--src/core/iomgr/tcp_server_posix.c9
-rw-r--r--src/core/iomgr/udp_server.c6
-rw-r--r--src/core/iomgr/wakeup_fd_eventfd.c2
-rw-r--r--src/core/iomgr/wakeup_fd_pipe.c4
-rw-r--r--src/core/iomgr/workqueue.h66
-rw-r--r--src/core/iomgr/workqueue_posix.c125
-rw-r--r--src/core/iomgr/workqueue_posix.h52
-rw-r--r--src/core/iomgr/workqueue_windows.c40
-rw-r--r--src/core/iomgr/workqueue_windows.h37
21 files changed, 384 insertions, 174 deletions
diff --git a/src/core/iomgr/endpoint_pair.h b/src/core/iomgr/endpoint_pair.h
index 095ec5fcc9..25ef1891fb 100644
--- a/src/core/iomgr/endpoint_pair.h
+++ b/src/core/iomgr/endpoint_pair.h
@@ -42,6 +42,7 @@ typedef struct {
} grpc_endpoint_pair;
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
- size_t read_slice_size);
+ size_t read_slice_size,
+ grpc_workqueue *workqueue);
#endif /* GRPC_INTERNAL_CORE_IOMGR_ENDPOINT_PAIR_H */
diff --git a/src/core/iomgr/endpoint_pair_posix.c b/src/core/iomgr/endpoint_pair_posix.c
index deae9c6875..dc1f441b4b 100644
--- a/src/core/iomgr/endpoint_pair_posix.c
+++ b/src/core/iomgr/endpoint_pair_posix.c
@@ -59,19 +59,20 @@ static void create_sockets(int sv[2]) {
}
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
- size_t read_slice_size) {
+ size_t read_slice_size,
+ grpc_workqueue *workqueue) {
int sv[2];
grpc_endpoint_pair p;
char *final_name;
create_sockets(sv);
gpr_asprintf(&final_name, "%s:client", name);
- p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), read_slice_size,
- "socketpair-server");
+ p.client = grpc_tcp_create(grpc_fd_create(sv[1], workqueue, final_name),
+ read_slice_size, "socketpair-server");
gpr_free(final_name);
gpr_asprintf(&final_name, "%s:server", name);
- p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), read_slice_size,
- "socketpair-client");
+ p.server = grpc_tcp_create(grpc_fd_create(sv[0], workqueue, final_name),
+ read_slice_size, "socketpair-client");
gpr_free(final_name);
return p;
}
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index 38a543e36e..69518597d5 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -71,6 +71,9 @@ static grpc_fd *fd_freelist = NULL;
static gpr_mu fd_freelist_mu;
static void freelist_fd(grpc_fd *fd) {
+ if (fd->workqueue->wakeup_read_fd != fd) {
+ grpc_workqueue_unref(fd->workqueue);
+ }
gpr_mu_lock(&fd_freelist_mu);
fd->freelist_next = fd_freelist;
fd_freelist = fd;
@@ -158,8 +161,14 @@ void grpc_fd_global_shutdown(void) {
gpr_mu_destroy(&fd_freelist_mu);
}
-grpc_fd *grpc_fd_create(int fd, const char *name) {
+grpc_fd *grpc_fd_create(int fd, grpc_workqueue *workqueue, const char *name) {
grpc_fd *r = alloc_fd(fd);
+ r->workqueue = workqueue;
+ /* if the wakeup_read_fd is NULL, then the workqueue is under construction
+ ==> this fd will be the wakeup_read_fd, and we shouldn't take a ref */
+ if (workqueue->wakeup_read_fd != NULL) {
+ grpc_workqueue_ref(workqueue);
+ }
grpc_iomgr_register_object(&r->iomgr_object, name);
return r;
}
@@ -219,7 +228,7 @@ void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done,
fd->closed = 1;
close(fd->fd);
if (fd->on_done_closure) {
- grpc_iomgr_add_callback(fd->on_done_closure);
+ grpc_workqueue_push(fd->workqueue, fd->on_done_closure, 1);
}
} else {
wake_all_watchers_locked(fd);
@@ -245,19 +254,19 @@ void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
#endif
static void process_callback(grpc_iomgr_closure *closure, int success,
- int allow_synchronous_callback) {
- if (allow_synchronous_callback) {
+ grpc_workqueue *optional_workqueue) {
+ if (optional_workqueue == NULL) {
closure->cb(closure->cb_arg, success);
} else {
- grpc_iomgr_add_delayed_callback(closure, success);
+ grpc_workqueue_push(optional_workqueue, closure, success);
}
}
static void process_callbacks(grpc_iomgr_closure *callbacks, size_t n,
- int success, int allow_synchronous_callback) {
+ int success, grpc_workqueue *optional_workqueue) {
size_t i;
for (i = 0; i < n; i++) {
- process_callback(callbacks + i, success, allow_synchronous_callback);
+ process_callback(callbacks + i, success, optional_workqueue);
}
}
@@ -285,7 +294,7 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure,
GPR_ASSERT(gpr_atm_no_barrier_load(st) == READY);
gpr_atm_rel_store(st, NOT_READY);
process_callback(closure, !gpr_atm_acq_load(&fd->shutdown),
- allow_synchronous_callback);
+ allow_synchronous_callback ? NULL : fd->workqueue);
return;
default: /* WAITING */
/* upcallptr was set to a different closure. This is an error! */
@@ -338,7 +347,8 @@ static void set_ready(grpc_fd *fd, gpr_atm *st,
success = !gpr_atm_acq_load(&fd->shutdown);
GPR_ASSERT(ncb <= 1);
if (ncb > 0) {
- process_callbacks(closure, ncb, success, allow_synchronous_callback);
+ process_callbacks(closure, ncb, success,
+ allow_synchronous_callback ? NULL : fd->workqueue);
}
}
@@ -440,7 +450,7 @@ void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write) {
fd->closed = 1;
close(fd->fd);
if (fd->on_done_closure != NULL) {
- grpc_iomgr_add_callback(fd->on_done_closure);
+ grpc_workqueue_push(fd->workqueue, fd->on_done_closure, 1);
}
}
gpr_mu_unlock(&fd->watcher_mu);
diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h
index 835e9b339a..e5157ad342 100644
--- a/src/core/iomgr/fd_posix.h
+++ b/src/core/iomgr/fd_posix.h
@@ -36,6 +36,7 @@
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/pollset.h"
+#include "src/core/iomgr/workqueue.h"
#include <grpc/support/atm.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
@@ -57,6 +58,7 @@ struct grpc_fd {
meaning that mostly we ref by two to avoid altering the orphaned bit,
and just unref by 1 when we're ready to flag the object as orphaned */
gpr_atm refst;
+ grpc_workqueue *workqueue;
gpr_mu set_state_mu;
gpr_atm shutdown;
@@ -103,7 +105,7 @@ struct grpc_fd {
/* Create a wrapped file descriptor.
Requires fd is a non-blocking file descriptor.
This takes ownership of closing fd. */
-grpc_fd *grpc_fd_create(int fd, const char *name);
+grpc_fd *grpc_fd_create(int fd, grpc_workqueue *workqueue, const char *name);
/* Releases fd to be asynchronously destroyed.
on_done is called when the underlying file descriptor is definitely close()d.
diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c
index d6ca5d1f71..5d1fc68767 100644
--- a/src/core/iomgr/iomgr.c
+++ b/src/core/iomgr/iomgr.c
@@ -48,39 +48,9 @@
static gpr_mu g_mu;
static gpr_cv g_rcv;
-static grpc_iomgr_closure *g_cbs_head = NULL;
-static grpc_iomgr_closure *g_cbs_tail = NULL;
static int g_shutdown;
-static gpr_event g_background_callback_executor_done;
static grpc_iomgr_object g_root_object;
-/* Execute followup callbacks continuously.
- Other threads may check in and help during pollset_work() */
-static void background_callback_executor(void *ignored) {
- gpr_mu_lock(&g_mu);
- while (!g_shutdown) {
- gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
- gpr_timespec short_deadline = gpr_time_add(
- gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_millis(100, GPR_TIMESPAN));
- if (g_cbs_head) {
- grpc_iomgr_closure *closure = g_cbs_head;
- g_cbs_head = closure->next;
- if (!g_cbs_head) g_cbs_tail = NULL;
- gpr_mu_unlock(&g_mu);
- closure->cb(closure->cb_arg, closure->success);
- gpr_mu_lock(&g_mu);
- } else if (grpc_alarm_check(&g_mu, gpr_now(GPR_CLOCK_MONOTONIC),
- &deadline)) {
- } else {
- gpr_mu_unlock(&g_mu);
- gpr_sleep_until(gpr_time_min(short_deadline, deadline));
- gpr_mu_lock(&g_mu);
- }
- }
- gpr_mu_unlock(&g_mu);
- gpr_event_set(&g_background_callback_executor_done, (void *)1);
-}
-
void grpc_kick_poller(void) {
/* Empty. The background callback executor polls periodically. The activity
* the kicker is trying to draw the executor's attention to will be picked up
@@ -89,7 +59,6 @@ void grpc_kick_poller(void) {
}
void grpc_iomgr_init(void) {
- gpr_thd_id id;
g_shutdown = 0;
gpr_mu_init(&g_mu);
gpr_cv_init(&g_rcv);
@@ -97,8 +66,6 @@ void grpc_iomgr_init(void) {
g_root_object.next = g_root_object.prev = &g_root_object;
g_root_object.name = "root";
grpc_iomgr_platform_init();
- gpr_event_init(&g_background_callback_executor_done);
- gpr_thd_new(&id, background_callback_executor, NULL, NULL);
}
static size_t count_objects(void) {
@@ -118,58 +85,36 @@ static void dump_objects(const char *kind) {
}
void grpc_iomgr_shutdown(void) {
- grpc_iomgr_closure *closure;
gpr_timespec shutdown_deadline = gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(10, GPR_TIMESPAN));
gpr_timespec last_warning_time = gpr_now(GPR_CLOCK_REALTIME);
gpr_mu_lock(&g_mu);
g_shutdown = 1;
- while (g_cbs_head != NULL || g_root_object.next != &g_root_object) {
+ while (g_root_object.next != &g_root_object) {
if (gpr_time_cmp(
gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), last_warning_time),
gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) {
- if (g_cbs_head != NULL && g_root_object.next != &g_root_object) {
- gpr_log(GPR_DEBUG,
- "Waiting for %d iomgr objects to be destroyed and executing "
- "final callbacks",
- count_objects());
- } else if (g_cbs_head != NULL) {
- gpr_log(GPR_DEBUG, "Executing final iomgr callbacks");
- } else {
+ if (g_root_object.next != &g_root_object) {
gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed",
count_objects());
}
last_warning_time = gpr_now(GPR_CLOCK_REALTIME);
}
- if (g_cbs_head) {
- do {
- closure = g_cbs_head;
- g_cbs_head = closure->next;
- if (!g_cbs_head) g_cbs_tail = NULL;
- gpr_mu_unlock(&g_mu);
-
- closure->cb(closure->cb_arg, 0);
- gpr_mu_lock(&g_mu);
- } while (g_cbs_head);
- continue;
- }
if (grpc_alarm_check(&g_mu, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL)) {
continue;
}
if (g_root_object.next != &g_root_object) {
int timeout = 0;
- while (g_cbs_head == NULL) {
- gpr_timespec short_deadline = gpr_time_add(
+ gpr_timespec short_deadline = gpr_time_add(
gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100, GPR_TIMESPAN));
- if (gpr_cv_wait(&g_rcv, &g_mu, short_deadline) && g_cbs_head == NULL) {
- if (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), shutdown_deadline) > 0) {
- timeout = 1;
- break;
- }
+ if (gpr_cv_wait(&g_rcv, &g_mu, short_deadline)) {
+ if (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), shutdown_deadline) > 0) {
+ timeout = 1;
+ break;
}
}
- if (timeout) {
+ if (timeout && g_root_object.next != &g_root_object) {
gpr_log(GPR_DEBUG,
"Failed to free %d iomgr objects before shutdown deadline: "
"memory leaks are likely",
@@ -183,10 +128,6 @@ void grpc_iomgr_shutdown(void) {
memset(&g_root_object, 0, sizeof(g_root_object));
- grpc_kick_poller();
- gpr_event_wait(&g_background_callback_executor_done,
- gpr_inf_future(GPR_CLOCK_REALTIME));
-
grpc_alarm_list_shutdown();
grpc_iomgr_platform_shutdown();
@@ -218,67 +159,3 @@ void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb,
closure->cb_arg = cb_arg;
closure->next = NULL;
}
-
-static void assert_not_scheduled_locked(grpc_iomgr_closure *closure) {
-#ifndef NDEBUG
- grpc_iomgr_closure *c;
-
- for (c = g_cbs_head; c; c = c->next) {
- GPR_ASSERT(c != closure);
- }
-#endif
-}
-
-void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *closure, int success) {
- closure->success = success;
- GPR_ASSERT(closure->cb);
- gpr_mu_lock(&g_mu);
- assert_not_scheduled_locked(closure);
- closure->next = NULL;
- if (!g_cbs_tail) {
- g_cbs_head = g_cbs_tail = closure;
- } else {
- g_cbs_tail->next = closure;
- g_cbs_tail = closure;
- }
- if (g_shutdown) {
- gpr_cv_signal(&g_rcv);
- }
- gpr_mu_unlock(&g_mu);
-}
-
-void grpc_iomgr_add_callback(grpc_iomgr_closure *closure) {
- grpc_iomgr_add_delayed_callback(closure, 1 /* GPR_TRUE */);
-}
-
-int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) {
- int n = 0;
- gpr_mu *retake_mu = NULL;
- grpc_iomgr_closure *closure;
- for (;;) {
- /* check for new work */
- if (!gpr_mu_trylock(&g_mu)) {
- break;
- }
- closure = g_cbs_head;
- if (!closure) {
- gpr_mu_unlock(&g_mu);
- break;
- }
- g_cbs_head = closure->next;
- if (!g_cbs_head) g_cbs_tail = NULL;
- gpr_mu_unlock(&g_mu);
- /* if we have a mutex to drop, do so before executing work */
- if (drop_mu) {
- gpr_mu_unlock(drop_mu);
- retake_mu = drop_mu;
- drop_mu = NULL;
- }
- closure->cb(closure->cb_arg, success && closure->success);
- n++;
- }
- if (retake_mu) {
- gpr_mu_lock(retake_mu);
- }
- return n;
-}
diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h
index 261c17366a..f1d2e6439d 100644
--- a/src/core/iomgr/iomgr.h
+++ b/src/core/iomgr/iomgr.h
@@ -68,13 +68,4 @@ void grpc_iomgr_init(void);
/** Signals the intention to shutdown the iomgr. */
void grpc_iomgr_shutdown(void);
-/** Registers a closure to be invoked at some point in the future.
- *
- * Can be called from within a callback or from anywhere else */
-void grpc_iomgr_add_callback(grpc_iomgr_closure *closure);
-
-/** As per grpc_iomgr_add_callback, with the ability to set the success
- argument. */
-void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *iocb, int success);
-
#endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_H */
diff --git a/src/core/iomgr/iomgr_internal.h b/src/core/iomgr/iomgr_internal.h
index 4cec973ba0..f266732c96 100644
--- a/src/core/iomgr/iomgr_internal.h
+++ b/src/core/iomgr/iomgr_internal.h
@@ -43,9 +43,6 @@ typedef struct grpc_iomgr_object {
struct grpc_iomgr_object *prev;
} grpc_iomgr_object;
-int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success);
-void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *iocb, int success);
-
void grpc_iomgr_register_object(grpc_iomgr_object *obj, const char *name);
void grpc_iomgr_unregister_object(grpc_iomgr_object *obj);
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index 481bdc4ede..12f34c1a19 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -127,7 +127,7 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
GRPC_FD_REF(fd, "delayed_add");
grpc_iomgr_closure_init(&da->closure, perform_delayed_add, da);
pollset->in_flight_cbs++;
- grpc_iomgr_add_callback(&da->closure);
+ grpc_workqueue_push(fd->workqueue, &da->closure, 1);
}
}
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index dec2f5490f..8bde41a146 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -140,6 +140,9 @@ void grpc_pollset_init(grpc_pollset *pollset) {
}
void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
+ if (fd->workqueue->wakeup_read_fd != fd) {
+ grpc_pollset_add_fd(pollset, fd->workqueue->wakeup_read_fd);
+ }
gpr_mu_lock(&pollset->mu);
pollset->vtable->add_fd(pollset, fd, 1);
/* the following (enabled only in debug) will reacquire and then release
@@ -178,9 +181,6 @@ void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker,
worker->next = worker->prev = NULL;
/* TODO(ctiller): pool these */
grpc_wakeup_fd_init(&worker->wakeup_fd);
- if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1)) {
- goto done;
- }
if (grpc_alarm_check(&pollset->mu, now, &deadline)) {
goto done;
}
@@ -296,7 +296,7 @@ static void basic_do_promote(void *args, int success) {
/* First we need to ensure that nobody is polling concurrently */
if (grpc_pollset_has_workers(pollset)) {
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
- grpc_iomgr_add_callback(&up_args->promotion_closure);
+ grpc_workqueue_push(fd->workqueue, &up_args->promotion_closure, 1);
gpr_mu_unlock(&pollset->mu);
return;
}
@@ -388,7 +388,7 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd,
up_args->original_vtable = pollset->vtable;
up_args->promotion_closure.cb = basic_do_promote;
up_args->promotion_closure.cb_arg = up_args;
- grpc_iomgr_add_callback(&up_args->promotion_closure);
+ grpc_workqueue_push(fd->workqueue, &up_args->promotion_closure, 1);
grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
diff --git a/src/core/iomgr/tcp_client.h b/src/core/iomgr/tcp_client.h
index 12296bd55b..57f80016c2 100644
--- a/src/core/iomgr/tcp_client.h
+++ b/src/core/iomgr/tcp_client.h
@@ -46,6 +46,7 @@
in this connection being established (in order to continue their work) */
void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp),
void *arg, grpc_pollset_set *interested_parties,
+ grpc_workqueue *workqueue,
const struct sockaddr *addr, size_t addr_len,
gpr_timespec deadline);
diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c
index c3668f6a92..8b1a3b0f9e 100644
--- a/src/core/iomgr/tcp_client_posix.c
+++ b/src/core/iomgr/tcp_client_posix.c
@@ -195,6 +195,7 @@ finish:
void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
void *arg, grpc_pollset_set *interested_parties,
+ grpc_workqueue *workqueue,
const struct sockaddr *addr, size_t addr_len,
gpr_timespec deadline) {
int fd;
@@ -236,7 +237,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep),
addr_str = grpc_sockaddr_to_uri(addr);
gpr_asprintf(&name, "tcp-client:%s", addr_str);
- fdobj = grpc_fd_create(fd, name);
+ fdobj = grpc_fd_create(fd, workqueue, name);
if (err >= 0) {
cb(arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str));
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index 68f469c368..c539cf2d34 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -261,7 +261,7 @@ static grpc_endpoint_op_status tcp_read(grpc_endpoint *ep,
tcp->finished_edge = 0;
grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
} else {
- grpc_iomgr_add_delayed_callback(&tcp->read_closure, 1);
+ grpc_workqueue_push(tcp->em_fd->workqueue, &tcp->read_closure, 1);
}
/* TODO(ctiller): immediate return */
return GRPC_ENDPOINT_PENDING;
diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c
index bcbd0afe6b..02d37350f7 100644
--- a/src/core/iomgr/tcp_server_posix.c
+++ b/src/core/iomgr/tcp_server_posix.c
@@ -124,6 +124,9 @@ struct grpc_tcp_server {
grpc_pollset **pollsets;
/* number of pollsets in the pollsets array */
size_t pollset_count;
+
+ /** workqueue for interally created async work */
+ grpc_workqueue *workqueue;
};
grpc_tcp_server *grpc_tcp_server_create(void) {
@@ -137,6 +140,7 @@ grpc_tcp_server *grpc_tcp_server_create(void) {
s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP);
s->nports = 0;
s->port_capacity = INIT_PORT_CAP;
+ s->workqueue = grpc_workqueue_create();
return s;
}
@@ -147,6 +151,7 @@ static void finish_shutdown(grpc_tcp_server *s) {
gpr_mu_destroy(&s->mu);
gpr_free(s->ports);
+ grpc_workqueue_unref(s->workqueue);
gpr_free(s);
}
@@ -339,7 +344,7 @@ static void on_read(void *arg, int success) {
addr_str = grpc_sockaddr_to_uri((struct sockaddr *)&addr);
gpr_asprintf(&name, "tcp-server-connection:%s", addr_str);
- fdobj = grpc_fd_create(fd, name);
+ fdobj = grpc_fd_create(fd, sp->server->workqueue, name);
/* TODO(ctiller): revise this when we have server-side sharding
of channels -- we certainly should not be automatically adding every
incoming channel to every pollset owned by the server */
@@ -387,7 +392,7 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd,
sp = &s->ports[s->nports++];
sp->server = s;
sp->fd = fd;
- sp->emfd = grpc_fd_create(fd, name);
+ sp->emfd = grpc_fd_create(fd, s->workqueue, name);
memcpy(sp->addr.untyped, addr, addr_len);
sp->addr_len = addr_len;
GPR_ASSERT(sp->emfd);
diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c
index 7957066598..96688054fb 100644
--- a/src/core/iomgr/udp_server.c
+++ b/src/core/iomgr/udp_server.c
@@ -118,6 +118,8 @@ struct grpc_udp_server {
grpc_pollset **pollsets;
/* number of pollsets in the pollsets array */
size_t pollset_count;
+
+ grpc_workqueue *workqueue;
};
grpc_udp_server *grpc_udp_server_create(void) {
@@ -130,6 +132,7 @@ grpc_udp_server *grpc_udp_server_create(void) {
s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP);
s->nports = 0;
s->port_capacity = INIT_PORT_CAP;
+ s->workqueue = grpc_workqueue_create();
return s;
}
@@ -141,6 +144,7 @@ static void finish_shutdown(grpc_udp_server *s) {
gpr_cv_destroy(&s->cv);
gpr_free(s->ports);
+ grpc_workqueue_unref(s->workqueue);
gpr_free(s);
}
@@ -309,7 +313,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd,
sp = &s->ports[s->nports++];
sp->server = s;
sp->fd = fd;
- sp->emfd = grpc_fd_create(fd, name);
+ sp->emfd = grpc_fd_create(fd, s->workqueue, name);
memcpy(sp->addr.untyped, addr, addr_len);
sp->addr_len = addr_len;
sp->read_cb = read_cb;
diff --git a/src/core/iomgr/wakeup_fd_eventfd.c b/src/core/iomgr/wakeup_fd_eventfd.c
index 08fdc74f17..ba622eb1ee 100644
--- a/src/core/iomgr/wakeup_fd_eventfd.c
+++ b/src/core/iomgr/wakeup_fd_eventfd.c
@@ -66,7 +66,7 @@ static void eventfd_wakeup(grpc_wakeup_fd *fd_info) {
}
static void eventfd_destroy(grpc_wakeup_fd *fd_info) {
- close(fd_info->read_fd);
+ if (fd_info->read_fd != 0) close(fd_info->read_fd);
}
static int eventfd_check_availability(void) {
diff --git a/src/core/iomgr/wakeup_fd_pipe.c b/src/core/iomgr/wakeup_fd_pipe.c
index 902034ee4b..63e3a22811 100644
--- a/src/core/iomgr/wakeup_fd_pipe.c
+++ b/src/core/iomgr/wakeup_fd_pipe.c
@@ -81,8 +81,8 @@ static void pipe_wakeup(grpc_wakeup_fd *fd_info) {
}
static void pipe_destroy(grpc_wakeup_fd *fd_info) {
- close(fd_info->read_fd);
- close(fd_info->write_fd);
+ if (fd_info->read_fd != 0) close(fd_info->read_fd);
+ if (fd_info->write_fd != 0) close(fd_info->write_fd);
}
static int pipe_check_availability(void) {
diff --git a/src/core/iomgr/workqueue.h b/src/core/iomgr/workqueue.h
new file mode 100644
index 0000000000..0bfa959953
--- /dev/null
+++ b/src/core/iomgr/workqueue.h
@@ -0,0 +1,66 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_H
+#define GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_H
+
+#include "src/core/iomgr/iomgr.h"
+#include "src/core/iomgr/pollset.h"
+
+#ifdef GPR_POSIX_SOCKET
+#include "src/core/iomgr/workqueue_posix.h"
+#endif
+
+#ifdef GPR_WIN32
+#include "src/core/iomgr/workqueue_windows.h"
+#endif
+
+/** A workqueue represents a list of work to be executed asynchronously. */
+struct grpc_workqueue;
+typedef struct grpc_workqueue grpc_workqueue;
+
+/** Create a work queue */
+grpc_workqueue *grpc_workqueue_create(void);
+
+void grpc_workqueue_ref(grpc_workqueue *workqueue);
+void grpc_workqueue_unref(grpc_workqueue *workqueue);
+
+/** Bind this workqueue to a pollset */
+void grpc_workqueue_add_to_pollset(grpc_workqueue *workqueue,
+ grpc_pollset *pollset);
+
+/** Add a work item to a workqueue */
+void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_iomgr_closure *closure,
+ int success);
+
+#endif
diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c
new file mode 100644
index 0000000000..26626bef3b
--- /dev/null
+++ b/src/core/iomgr/workqueue_posix.c
@@ -0,0 +1,125 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_POSIX_SOCKET
+
+#include "src/core/iomgr/fd_posix.h"
+#include "src/core/iomgr/workqueue.h"
+
+#include <stdio.h>
+
+#include <grpc/support/alloc.h>
+
+static void on_readable(void *arg, int success);
+
+grpc_workqueue *grpc_workqueue_create(void) {
+ char name[32];
+ grpc_workqueue *workqueue = gpr_malloc(sizeof(grpc_workqueue));
+ gpr_ref_init(&workqueue->refs, 1);
+ gpr_mu_init(&workqueue->mu);
+ workqueue->head.next = NULL;
+ workqueue->tail = &workqueue->head;
+ grpc_wakeup_fd_init(&workqueue->wakeup_fd);
+ sprintf(name, "workqueue:%p", (void *)workqueue);
+ workqueue->wakeup_read_fd = NULL; /* inspected during grpc_fd_create below */
+ workqueue->wakeup_read_fd = grpc_fd_create(
+ GRPC_WAKEUP_FD_GET_READ_FD(&workqueue->wakeup_fd), workqueue, name);
+ grpc_iomgr_closure_init(&workqueue->read_closure, on_readable, workqueue);
+ grpc_fd_notify_on_read(workqueue->wakeup_read_fd, &workqueue->read_closure);
+ return workqueue;
+}
+
+static void workqueue_destroy(grpc_workqueue *workqueue) {
+ grpc_fd_shutdown(workqueue->wakeup_read_fd);
+}
+
+void grpc_workqueue_ref(grpc_workqueue *workqueue) {
+ gpr_ref(&workqueue->refs);
+}
+
+void grpc_workqueue_unref(grpc_workqueue *workqueue) {
+ if (gpr_unref(&workqueue->refs)) {
+ workqueue_destroy(workqueue);
+ }
+}
+
+void grpc_workqueue_add_to_pollset(grpc_workqueue *workqueue,
+ grpc_pollset *pollset) {
+ grpc_pollset_add_fd(pollset, workqueue->wakeup_read_fd);
+}
+
+static void on_readable(void *arg, int success) {
+ grpc_workqueue *workqueue = arg;
+ grpc_iomgr_closure *todo;
+
+ if (!success) {
+ gpr_mu_destroy(&workqueue->mu);
+ /* HACK: let wakeup_fd code know that we stole the fd */
+ workqueue->wakeup_fd.read_fd = 0;
+ grpc_wakeup_fd_destroy(&workqueue->wakeup_fd);
+ grpc_fd_orphan(workqueue->wakeup_read_fd, NULL, "destroy");
+ gpr_free(workqueue);
+ return;
+ } else {
+ gpr_mu_lock(&workqueue->mu);
+ todo = workqueue->head.next;
+ workqueue->head.next = NULL;
+ workqueue->tail = &workqueue->head;
+ grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd);
+ gpr_mu_unlock(&workqueue->mu);
+ grpc_fd_notify_on_read(workqueue->wakeup_read_fd, &workqueue->read_closure);
+
+ while (todo) {
+ grpc_iomgr_closure *next = todo->next;
+ todo->cb(todo->cb_arg, todo->success);
+ todo = next;
+ }
+ }
+}
+
+void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_iomgr_closure *closure,
+ int success) {
+ closure->success = success;
+ closure->next = NULL;
+ gpr_mu_lock(&workqueue->mu);
+ if (workqueue->tail == &workqueue->head) {
+ grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd);
+ }
+ workqueue->tail->next = closure;
+ workqueue->tail = closure;
+ gpr_mu_unlock(&workqueue->mu);
+}
+
+#endif /* GPR_POSIX_SOCKET */
diff --git a/src/core/iomgr/workqueue_posix.h b/src/core/iomgr/workqueue_posix.h
new file mode 100644
index 0000000000..1b3a0e281b
--- /dev/null
+++ b/src/core/iomgr/workqueue_posix.h
@@ -0,0 +1,52 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H
+#define GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H
+
+struct grpc_fd;
+
+struct grpc_workqueue {
+ gpr_refcount refs;
+
+ gpr_mu mu;
+ grpc_iomgr_closure head;
+ grpc_iomgr_closure *tail;
+
+ grpc_wakeup_fd wakeup_fd;
+ struct grpc_fd *wakeup_read_fd;
+
+ grpc_iomgr_closure read_closure;
+};
+
+#endif /* GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H */
diff --git a/src/core/iomgr/workqueue_windows.c b/src/core/iomgr/workqueue_windows.c
new file mode 100644
index 0000000000..f9ca57557b
--- /dev/null
+++ b/src/core/iomgr/workqueue_windows.c
@@ -0,0 +1,40 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_WIN32
+
+#include "src/core/iomgr/workqueue.h"
+
+#endif /* GPR_WIN32 */
diff --git a/src/core/iomgr/workqueue_windows.h b/src/core/iomgr/workqueue_windows.h
new file mode 100644
index 0000000000..941f195f51
--- /dev/null
+++ b/src/core/iomgr/workqueue_windows.h
@@ -0,0 +1,37 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_WINDOWS_H
+#define GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_WINDOWS_H
+
+#endif /* GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_WINDOWS_H */