diff options
Diffstat (limited to 'src/core/iomgr')
-rw-r--r-- | src/core/iomgr/endpoint_pair.h | 3 | ||||
-rw-r--r-- | src/core/iomgr/endpoint_pair_posix.c | 11 | ||||
-rw-r--r-- | src/core/iomgr/fd_posix.c | 30 | ||||
-rw-r--r-- | src/core/iomgr/fd_posix.h | 4 | ||||
-rw-r--r-- | src/core/iomgr/iomgr.c | 139 | ||||
-rw-r--r-- | src/core/iomgr/iomgr.h | 9 | ||||
-rw-r--r-- | src/core/iomgr/iomgr_internal.h | 3 | ||||
-rw-r--r-- | src/core/iomgr/pollset_multipoller_with_epoll.c | 2 | ||||
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 10 | ||||
-rw-r--r-- | src/core/iomgr/tcp_client.h | 1 | ||||
-rw-r--r-- | src/core/iomgr/tcp_client_posix.c | 3 | ||||
-rw-r--r-- | src/core/iomgr/tcp_posix.c | 2 | ||||
-rw-r--r-- | src/core/iomgr/tcp_server_posix.c | 9 | ||||
-rw-r--r-- | src/core/iomgr/udp_server.c | 6 | ||||
-rw-r--r-- | src/core/iomgr/wakeup_fd_eventfd.c | 2 | ||||
-rw-r--r-- | src/core/iomgr/wakeup_fd_pipe.c | 4 | ||||
-rw-r--r-- | src/core/iomgr/workqueue.h | 66 | ||||
-rw-r--r-- | src/core/iomgr/workqueue_posix.c | 125 | ||||
-rw-r--r-- | src/core/iomgr/workqueue_posix.h | 52 | ||||
-rw-r--r-- | src/core/iomgr/workqueue_windows.c | 40 | ||||
-rw-r--r-- | src/core/iomgr/workqueue_windows.h | 37 |
21 files changed, 384 insertions, 174 deletions
diff --git a/src/core/iomgr/endpoint_pair.h b/src/core/iomgr/endpoint_pair.h index 095ec5fcc9..25ef1891fb 100644 --- a/src/core/iomgr/endpoint_pair.h +++ b/src/core/iomgr/endpoint_pair.h @@ -42,6 +42,7 @@ typedef struct { } grpc_endpoint_pair; grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, - size_t read_slice_size); + size_t read_slice_size, + grpc_workqueue *workqueue); #endif /* GRPC_INTERNAL_CORE_IOMGR_ENDPOINT_PAIR_H */ diff --git a/src/core/iomgr/endpoint_pair_posix.c b/src/core/iomgr/endpoint_pair_posix.c index deae9c6875..dc1f441b4b 100644 --- a/src/core/iomgr/endpoint_pair_posix.c +++ b/src/core/iomgr/endpoint_pair_posix.c @@ -59,19 +59,20 @@ static void create_sockets(int sv[2]) { } grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, - size_t read_slice_size) { + size_t read_slice_size, + grpc_workqueue *workqueue) { int sv[2]; grpc_endpoint_pair p; char *final_name; create_sockets(sv); gpr_asprintf(&final_name, "%s:client", name); - p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), read_slice_size, - "socketpair-server"); + p.client = grpc_tcp_create(grpc_fd_create(sv[1], workqueue, final_name), + read_slice_size, "socketpair-server"); gpr_free(final_name); gpr_asprintf(&final_name, "%s:server", name); - p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), read_slice_size, - "socketpair-client"); + p.server = grpc_tcp_create(grpc_fd_create(sv[0], workqueue, final_name), + read_slice_size, "socketpair-client"); gpr_free(final_name); return p; } diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 38a543e36e..69518597d5 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -71,6 +71,9 @@ static grpc_fd *fd_freelist = NULL; static gpr_mu fd_freelist_mu; static void freelist_fd(grpc_fd *fd) { + if (fd->workqueue->wakeup_read_fd != fd) { + grpc_workqueue_unref(fd->workqueue); + } gpr_mu_lock(&fd_freelist_mu); fd->freelist_next = fd_freelist; fd_freelist = fd; @@ -158,8 +161,14 @@ void grpc_fd_global_shutdown(void) { gpr_mu_destroy(&fd_freelist_mu); } -grpc_fd *grpc_fd_create(int fd, const char *name) { +grpc_fd *grpc_fd_create(int fd, grpc_workqueue *workqueue, const char *name) { grpc_fd *r = alloc_fd(fd); + r->workqueue = workqueue; + /* if the wakeup_read_fd is NULL, then the workqueue is under construction + ==> this fd will be the wakeup_read_fd, and we shouldn't take a ref */ + if (workqueue->wakeup_read_fd != NULL) { + grpc_workqueue_ref(workqueue); + } grpc_iomgr_register_object(&r->iomgr_object, name); return r; } @@ -219,7 +228,7 @@ void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_closure *on_done, fd->closed = 1; close(fd->fd); if (fd->on_done_closure) { - grpc_iomgr_add_callback(fd->on_done_closure); + grpc_workqueue_push(fd->workqueue, fd->on_done_closure, 1); } } else { wake_all_watchers_locked(fd); @@ -245,19 +254,19 @@ void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); } #endif static void process_callback(grpc_iomgr_closure *closure, int success, - int allow_synchronous_callback) { - if (allow_synchronous_callback) { + grpc_workqueue *optional_workqueue) { + if (optional_workqueue == NULL) { closure->cb(closure->cb_arg, success); } else { - grpc_iomgr_add_delayed_callback(closure, success); + grpc_workqueue_push(optional_workqueue, closure, success); } } static void process_callbacks(grpc_iomgr_closure *callbacks, size_t n, - int success, int allow_synchronous_callback) { + int success, grpc_workqueue *optional_workqueue) { size_t i; for (i = 0; i < n; i++) { - process_callback(callbacks + i, success, allow_synchronous_callback); + process_callback(callbacks + i, success, optional_workqueue); } } @@ -285,7 +294,7 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure, GPR_ASSERT(gpr_atm_no_barrier_load(st) == READY); gpr_atm_rel_store(st, NOT_READY); process_callback(closure, !gpr_atm_acq_load(&fd->shutdown), - allow_synchronous_callback); + allow_synchronous_callback ? NULL : fd->workqueue); return; default: /* WAITING */ /* upcallptr was set to a different closure. This is an error! */ @@ -338,7 +347,8 @@ static void set_ready(grpc_fd *fd, gpr_atm *st, success = !gpr_atm_acq_load(&fd->shutdown); GPR_ASSERT(ncb <= 1); if (ncb > 0) { - process_callbacks(closure, ncb, success, allow_synchronous_callback); + process_callbacks(closure, ncb, success, + allow_synchronous_callback ? NULL : fd->workqueue); } } @@ -440,7 +450,7 @@ void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write) { fd->closed = 1; close(fd->fd); if (fd->on_done_closure != NULL) { - grpc_iomgr_add_callback(fd->on_done_closure); + grpc_workqueue_push(fd->workqueue, fd->on_done_closure, 1); } } gpr_mu_unlock(&fd->watcher_mu); diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index 835e9b339a..e5157ad342 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -36,6 +36,7 @@ #include "src/core/iomgr/iomgr_internal.h" #include "src/core/iomgr/pollset.h" +#include "src/core/iomgr/workqueue.h" #include <grpc/support/atm.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> @@ -57,6 +58,7 @@ struct grpc_fd { meaning that mostly we ref by two to avoid altering the orphaned bit, and just unref by 1 when we're ready to flag the object as orphaned */ gpr_atm refst; + grpc_workqueue *workqueue; gpr_mu set_state_mu; gpr_atm shutdown; @@ -103,7 +105,7 @@ struct grpc_fd { /* Create a wrapped file descriptor. Requires fd is a non-blocking file descriptor. This takes ownership of closing fd. */ -grpc_fd *grpc_fd_create(int fd, const char *name); +grpc_fd *grpc_fd_create(int fd, grpc_workqueue *workqueue, const char *name); /* Releases fd to be asynchronously destroyed. on_done is called when the underlying file descriptor is definitely close()d. diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index d6ca5d1f71..5d1fc68767 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -48,39 +48,9 @@ static gpr_mu g_mu; static gpr_cv g_rcv; -static grpc_iomgr_closure *g_cbs_head = NULL; -static grpc_iomgr_closure *g_cbs_tail = NULL; static int g_shutdown; -static gpr_event g_background_callback_executor_done; static grpc_iomgr_object g_root_object; -/* Execute followup callbacks continuously. - Other threads may check in and help during pollset_work() */ -static void background_callback_executor(void *ignored) { - gpr_mu_lock(&g_mu); - while (!g_shutdown) { - gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); - gpr_timespec short_deadline = gpr_time_add( - gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_millis(100, GPR_TIMESPAN)); - if (g_cbs_head) { - grpc_iomgr_closure *closure = g_cbs_head; - g_cbs_head = closure->next; - if (!g_cbs_head) g_cbs_tail = NULL; - gpr_mu_unlock(&g_mu); - closure->cb(closure->cb_arg, closure->success); - gpr_mu_lock(&g_mu); - } else if (grpc_alarm_check(&g_mu, gpr_now(GPR_CLOCK_MONOTONIC), - &deadline)) { - } else { - gpr_mu_unlock(&g_mu); - gpr_sleep_until(gpr_time_min(short_deadline, deadline)); - gpr_mu_lock(&g_mu); - } - } - gpr_mu_unlock(&g_mu); - gpr_event_set(&g_background_callback_executor_done, (void *)1); -} - void grpc_kick_poller(void) { /* Empty. The background callback executor polls periodically. The activity * the kicker is trying to draw the executor's attention to will be picked up @@ -89,7 +59,6 @@ void grpc_kick_poller(void) { } void grpc_iomgr_init(void) { - gpr_thd_id id; g_shutdown = 0; gpr_mu_init(&g_mu); gpr_cv_init(&g_rcv); @@ -97,8 +66,6 @@ void grpc_iomgr_init(void) { g_root_object.next = g_root_object.prev = &g_root_object; g_root_object.name = "root"; grpc_iomgr_platform_init(); - gpr_event_init(&g_background_callback_executor_done); - gpr_thd_new(&id, background_callback_executor, NULL, NULL); } static size_t count_objects(void) { @@ -118,58 +85,36 @@ static void dump_objects(const char *kind) { } void grpc_iomgr_shutdown(void) { - grpc_iomgr_closure *closure; gpr_timespec shutdown_deadline = gpr_time_add( gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(10, GPR_TIMESPAN)); gpr_timespec last_warning_time = gpr_now(GPR_CLOCK_REALTIME); gpr_mu_lock(&g_mu); g_shutdown = 1; - while (g_cbs_head != NULL || g_root_object.next != &g_root_object) { + while (g_root_object.next != &g_root_object) { if (gpr_time_cmp( gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), last_warning_time), gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) { - if (g_cbs_head != NULL && g_root_object.next != &g_root_object) { - gpr_log(GPR_DEBUG, - "Waiting for %d iomgr objects to be destroyed and executing " - "final callbacks", - count_objects()); - } else if (g_cbs_head != NULL) { - gpr_log(GPR_DEBUG, "Executing final iomgr callbacks"); - } else { + if (g_root_object.next != &g_root_object) { gpr_log(GPR_DEBUG, "Waiting for %d iomgr objects to be destroyed", count_objects()); } last_warning_time = gpr_now(GPR_CLOCK_REALTIME); } - if (g_cbs_head) { - do { - closure = g_cbs_head; - g_cbs_head = closure->next; - if (!g_cbs_head) g_cbs_tail = NULL; - gpr_mu_unlock(&g_mu); - - closure->cb(closure->cb_arg, 0); - gpr_mu_lock(&g_mu); - } while (g_cbs_head); - continue; - } if (grpc_alarm_check(&g_mu, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL)) { continue; } if (g_root_object.next != &g_root_object) { int timeout = 0; - while (g_cbs_head == NULL) { - gpr_timespec short_deadline = gpr_time_add( + gpr_timespec short_deadline = gpr_time_add( gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_millis(100, GPR_TIMESPAN)); - if (gpr_cv_wait(&g_rcv, &g_mu, short_deadline) && g_cbs_head == NULL) { - if (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), shutdown_deadline) > 0) { - timeout = 1; - break; - } + if (gpr_cv_wait(&g_rcv, &g_mu, short_deadline)) { + if (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), shutdown_deadline) > 0) { + timeout = 1; + break; } } - if (timeout) { + if (timeout && g_root_object.next != &g_root_object) { gpr_log(GPR_DEBUG, "Failed to free %d iomgr objects before shutdown deadline: " "memory leaks are likely", @@ -183,10 +128,6 @@ void grpc_iomgr_shutdown(void) { memset(&g_root_object, 0, sizeof(g_root_object)); - grpc_kick_poller(); - gpr_event_wait(&g_background_callback_executor_done, - gpr_inf_future(GPR_CLOCK_REALTIME)); - grpc_alarm_list_shutdown(); grpc_iomgr_platform_shutdown(); @@ -218,67 +159,3 @@ void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb, closure->cb_arg = cb_arg; closure->next = NULL; } - -static void assert_not_scheduled_locked(grpc_iomgr_closure *closure) { -#ifndef NDEBUG - grpc_iomgr_closure *c; - - for (c = g_cbs_head; c; c = c->next) { - GPR_ASSERT(c != closure); - } -#endif -} - -void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *closure, int success) { - closure->success = success; - GPR_ASSERT(closure->cb); - gpr_mu_lock(&g_mu); - assert_not_scheduled_locked(closure); - closure->next = NULL; - if (!g_cbs_tail) { - g_cbs_head = g_cbs_tail = closure; - } else { - g_cbs_tail->next = closure; - g_cbs_tail = closure; - } - if (g_shutdown) { - gpr_cv_signal(&g_rcv); - } - gpr_mu_unlock(&g_mu); -} - -void grpc_iomgr_add_callback(grpc_iomgr_closure *closure) { - grpc_iomgr_add_delayed_callback(closure, 1 /* GPR_TRUE */); -} - -int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) { - int n = 0; - gpr_mu *retake_mu = NULL; - grpc_iomgr_closure *closure; - for (;;) { - /* check for new work */ - if (!gpr_mu_trylock(&g_mu)) { - break; - } - closure = g_cbs_head; - if (!closure) { - gpr_mu_unlock(&g_mu); - break; - } - g_cbs_head = closure->next; - if (!g_cbs_head) g_cbs_tail = NULL; - gpr_mu_unlock(&g_mu); - /* if we have a mutex to drop, do so before executing work */ - if (drop_mu) { - gpr_mu_unlock(drop_mu); - retake_mu = drop_mu; - drop_mu = NULL; - } - closure->cb(closure->cb_arg, success && closure->success); - n++; - } - if (retake_mu) { - gpr_mu_lock(retake_mu); - } - return n; -} diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h index 261c17366a..f1d2e6439d 100644 --- a/src/core/iomgr/iomgr.h +++ b/src/core/iomgr/iomgr.h @@ -68,13 +68,4 @@ void grpc_iomgr_init(void); /** Signals the intention to shutdown the iomgr. */ void grpc_iomgr_shutdown(void); -/** Registers a closure to be invoked at some point in the future. - * - * Can be called from within a callback or from anywhere else */ -void grpc_iomgr_add_callback(grpc_iomgr_closure *closure); - -/** As per grpc_iomgr_add_callback, with the ability to set the success - argument. */ -void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *iocb, int success); - #endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_H */ diff --git a/src/core/iomgr/iomgr_internal.h b/src/core/iomgr/iomgr_internal.h index 4cec973ba0..f266732c96 100644 --- a/src/core/iomgr/iomgr_internal.h +++ b/src/core/iomgr/iomgr_internal.h @@ -43,9 +43,6 @@ typedef struct grpc_iomgr_object { struct grpc_iomgr_object *prev; } grpc_iomgr_object; -int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success); -void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *iocb, int success); - void grpc_iomgr_register_object(grpc_iomgr_object *obj, const char *name); void grpc_iomgr_unregister_object(grpc_iomgr_object *obj); diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index 481bdc4ede..12f34c1a19 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -127,7 +127,7 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset, GRPC_FD_REF(fd, "delayed_add"); grpc_iomgr_closure_init(&da->closure, perform_delayed_add, da); pollset->in_flight_cbs++; - grpc_iomgr_add_callback(&da->closure); + grpc_workqueue_push(fd->workqueue, &da->closure, 1); } } diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index dec2f5490f..8bde41a146 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -140,6 +140,9 @@ void grpc_pollset_init(grpc_pollset *pollset) { } void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { + if (fd->workqueue->wakeup_read_fd != fd) { + grpc_pollset_add_fd(pollset, fd->workqueue->wakeup_read_fd); + } gpr_mu_lock(&pollset->mu); pollset->vtable->add_fd(pollset, fd, 1); /* the following (enabled only in debug) will reacquire and then release @@ -178,9 +181,6 @@ void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, worker->next = worker->prev = NULL; /* TODO(ctiller): pool these */ grpc_wakeup_fd_init(&worker->wakeup_fd); - if (grpc_maybe_call_delayed_callbacks(&pollset->mu, 1)) { - goto done; - } if (grpc_alarm_check(&pollset->mu, now, &deadline)) { goto done; } @@ -296,7 +296,7 @@ static void basic_do_promote(void *args, int success) { /* First we need to ensure that nobody is polling concurrently */ if (grpc_pollset_has_workers(pollset)) { grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); - grpc_iomgr_add_callback(&up_args->promotion_closure); + grpc_workqueue_push(fd->workqueue, &up_args->promotion_closure, 1); gpr_mu_unlock(&pollset->mu); return; } @@ -388,7 +388,7 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd, up_args->original_vtable = pollset->vtable; up_args->promotion_closure.cb = basic_do_promote; up_args->promotion_closure.cb_arg = up_args; - grpc_iomgr_add_callback(&up_args->promotion_closure); + grpc_workqueue_push(fd->workqueue, &up_args->promotion_closure, 1); grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); diff --git a/src/core/iomgr/tcp_client.h b/src/core/iomgr/tcp_client.h index 12296bd55b..57f80016c2 100644 --- a/src/core/iomgr/tcp_client.h +++ b/src/core/iomgr/tcp_client.h @@ -46,6 +46,7 @@ in this connection being established (in order to continue their work) */ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp), void *arg, grpc_pollset_set *interested_parties, + grpc_workqueue *workqueue, const struct sockaddr *addr, size_t addr_len, gpr_timespec deadline); diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index c3668f6a92..8b1a3b0f9e 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -195,6 +195,7 @@ finish: void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), void *arg, grpc_pollset_set *interested_parties, + grpc_workqueue *workqueue, const struct sockaddr *addr, size_t addr_len, gpr_timespec deadline) { int fd; @@ -236,7 +237,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), addr_str = grpc_sockaddr_to_uri(addr); gpr_asprintf(&name, "tcp-client:%s", addr_str); - fdobj = grpc_fd_create(fd, name); + fdobj = grpc_fd_create(fd, workqueue, name); if (err >= 0) { cb(arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str)); diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 68f469c368..c539cf2d34 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -261,7 +261,7 @@ static grpc_endpoint_op_status tcp_read(grpc_endpoint *ep, tcp->finished_edge = 0; grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure); } else { - grpc_iomgr_add_delayed_callback(&tcp->read_closure, 1); + grpc_workqueue_push(tcp->em_fd->workqueue, &tcp->read_closure, 1); } /* TODO(ctiller): immediate return */ return GRPC_ENDPOINT_PENDING; diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index bcbd0afe6b..02d37350f7 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -124,6 +124,9 @@ struct grpc_tcp_server { grpc_pollset **pollsets; /* number of pollsets in the pollsets array */ size_t pollset_count; + + /** workqueue for interally created async work */ + grpc_workqueue *workqueue; }; grpc_tcp_server *grpc_tcp_server_create(void) { @@ -137,6 +140,7 @@ grpc_tcp_server *grpc_tcp_server_create(void) { s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); s->nports = 0; s->port_capacity = INIT_PORT_CAP; + s->workqueue = grpc_workqueue_create(); return s; } @@ -147,6 +151,7 @@ static void finish_shutdown(grpc_tcp_server *s) { gpr_mu_destroy(&s->mu); gpr_free(s->ports); + grpc_workqueue_unref(s->workqueue); gpr_free(s); } @@ -339,7 +344,7 @@ static void on_read(void *arg, int success) { addr_str = grpc_sockaddr_to_uri((struct sockaddr *)&addr); gpr_asprintf(&name, "tcp-server-connection:%s", addr_str); - fdobj = grpc_fd_create(fd, name); + fdobj = grpc_fd_create(fd, sp->server->workqueue, name); /* TODO(ctiller): revise this when we have server-side sharding of channels -- we certainly should not be automatically adding every incoming channel to every pollset owned by the server */ @@ -387,7 +392,7 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd, sp = &s->ports[s->nports++]; sp->server = s; sp->fd = fd; - sp->emfd = grpc_fd_create(fd, name); + sp->emfd = grpc_fd_create(fd, s->workqueue, name); memcpy(sp->addr.untyped, addr, addr_len); sp->addr_len = addr_len; GPR_ASSERT(sp->emfd); diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c index 7957066598..96688054fb 100644 --- a/src/core/iomgr/udp_server.c +++ b/src/core/iomgr/udp_server.c @@ -118,6 +118,8 @@ struct grpc_udp_server { grpc_pollset **pollsets; /* number of pollsets in the pollsets array */ size_t pollset_count; + + grpc_workqueue *workqueue; }; grpc_udp_server *grpc_udp_server_create(void) { @@ -130,6 +132,7 @@ grpc_udp_server *grpc_udp_server_create(void) { s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); s->nports = 0; s->port_capacity = INIT_PORT_CAP; + s->workqueue = grpc_workqueue_create(); return s; } @@ -141,6 +144,7 @@ static void finish_shutdown(grpc_udp_server *s) { gpr_cv_destroy(&s->cv); gpr_free(s->ports); + grpc_workqueue_unref(s->workqueue); gpr_free(s); } @@ -309,7 +313,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, sp = &s->ports[s->nports++]; sp->server = s; sp->fd = fd; - sp->emfd = grpc_fd_create(fd, name); + sp->emfd = grpc_fd_create(fd, s->workqueue, name); memcpy(sp->addr.untyped, addr, addr_len); sp->addr_len = addr_len; sp->read_cb = read_cb; diff --git a/src/core/iomgr/wakeup_fd_eventfd.c b/src/core/iomgr/wakeup_fd_eventfd.c index 08fdc74f17..ba622eb1ee 100644 --- a/src/core/iomgr/wakeup_fd_eventfd.c +++ b/src/core/iomgr/wakeup_fd_eventfd.c @@ -66,7 +66,7 @@ static void eventfd_wakeup(grpc_wakeup_fd *fd_info) { } static void eventfd_destroy(grpc_wakeup_fd *fd_info) { - close(fd_info->read_fd); + if (fd_info->read_fd != 0) close(fd_info->read_fd); } static int eventfd_check_availability(void) { diff --git a/src/core/iomgr/wakeup_fd_pipe.c b/src/core/iomgr/wakeup_fd_pipe.c index 902034ee4b..63e3a22811 100644 --- a/src/core/iomgr/wakeup_fd_pipe.c +++ b/src/core/iomgr/wakeup_fd_pipe.c @@ -81,8 +81,8 @@ static void pipe_wakeup(grpc_wakeup_fd *fd_info) { } static void pipe_destroy(grpc_wakeup_fd *fd_info) { - close(fd_info->read_fd); - close(fd_info->write_fd); + if (fd_info->read_fd != 0) close(fd_info->read_fd); + if (fd_info->write_fd != 0) close(fd_info->write_fd); } static int pipe_check_availability(void) { diff --git a/src/core/iomgr/workqueue.h b/src/core/iomgr/workqueue.h new file mode 100644 index 0000000000..0bfa959953 --- /dev/null +++ b/src/core/iomgr/workqueue.h @@ -0,0 +1,66 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_H +#define GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_H + +#include "src/core/iomgr/iomgr.h" +#include "src/core/iomgr/pollset.h" + +#ifdef GPR_POSIX_SOCKET +#include "src/core/iomgr/workqueue_posix.h" +#endif + +#ifdef GPR_WIN32 +#include "src/core/iomgr/workqueue_windows.h" +#endif + +/** A workqueue represents a list of work to be executed asynchronously. */ +struct grpc_workqueue; +typedef struct grpc_workqueue grpc_workqueue; + +/** Create a work queue */ +grpc_workqueue *grpc_workqueue_create(void); + +void grpc_workqueue_ref(grpc_workqueue *workqueue); +void grpc_workqueue_unref(grpc_workqueue *workqueue); + +/** Bind this workqueue to a pollset */ +void grpc_workqueue_add_to_pollset(grpc_workqueue *workqueue, + grpc_pollset *pollset); + +/** Add a work item to a workqueue */ +void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_iomgr_closure *closure, + int success); + +#endif diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c new file mode 100644 index 0000000000..26626bef3b --- /dev/null +++ b/src/core/iomgr/workqueue_posix.c @@ -0,0 +1,125 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/support/port_platform.h> + +#ifdef GPR_POSIX_SOCKET + +#include "src/core/iomgr/fd_posix.h" +#include "src/core/iomgr/workqueue.h" + +#include <stdio.h> + +#include <grpc/support/alloc.h> + +static void on_readable(void *arg, int success); + +grpc_workqueue *grpc_workqueue_create(void) { + char name[32]; + grpc_workqueue *workqueue = gpr_malloc(sizeof(grpc_workqueue)); + gpr_ref_init(&workqueue->refs, 1); + gpr_mu_init(&workqueue->mu); + workqueue->head.next = NULL; + workqueue->tail = &workqueue->head; + grpc_wakeup_fd_init(&workqueue->wakeup_fd); + sprintf(name, "workqueue:%p", (void *)workqueue); + workqueue->wakeup_read_fd = NULL; /* inspected during grpc_fd_create below */ + workqueue->wakeup_read_fd = grpc_fd_create( + GRPC_WAKEUP_FD_GET_READ_FD(&workqueue->wakeup_fd), workqueue, name); + grpc_iomgr_closure_init(&workqueue->read_closure, on_readable, workqueue); + grpc_fd_notify_on_read(workqueue->wakeup_read_fd, &workqueue->read_closure); + return workqueue; +} + +static void workqueue_destroy(grpc_workqueue *workqueue) { + grpc_fd_shutdown(workqueue->wakeup_read_fd); +} + +void grpc_workqueue_ref(grpc_workqueue *workqueue) { + gpr_ref(&workqueue->refs); +} + +void grpc_workqueue_unref(grpc_workqueue *workqueue) { + if (gpr_unref(&workqueue->refs)) { + workqueue_destroy(workqueue); + } +} + +void grpc_workqueue_add_to_pollset(grpc_workqueue *workqueue, + grpc_pollset *pollset) { + grpc_pollset_add_fd(pollset, workqueue->wakeup_read_fd); +} + +static void on_readable(void *arg, int success) { + grpc_workqueue *workqueue = arg; + grpc_iomgr_closure *todo; + + if (!success) { + gpr_mu_destroy(&workqueue->mu); + /* HACK: let wakeup_fd code know that we stole the fd */ + workqueue->wakeup_fd.read_fd = 0; + grpc_wakeup_fd_destroy(&workqueue->wakeup_fd); + grpc_fd_orphan(workqueue->wakeup_read_fd, NULL, "destroy"); + gpr_free(workqueue); + return; + } else { + gpr_mu_lock(&workqueue->mu); + todo = workqueue->head.next; + workqueue->head.next = NULL; + workqueue->tail = &workqueue->head; + grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd); + gpr_mu_unlock(&workqueue->mu); + grpc_fd_notify_on_read(workqueue->wakeup_read_fd, &workqueue->read_closure); + + while (todo) { + grpc_iomgr_closure *next = todo->next; + todo->cb(todo->cb_arg, todo->success); + todo = next; + } + } +} + +void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_iomgr_closure *closure, + int success) { + closure->success = success; + closure->next = NULL; + gpr_mu_lock(&workqueue->mu); + if (workqueue->tail == &workqueue->head) { + grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); + } + workqueue->tail->next = closure; + workqueue->tail = closure; + gpr_mu_unlock(&workqueue->mu); +} + +#endif /* GPR_POSIX_SOCKET */ diff --git a/src/core/iomgr/workqueue_posix.h b/src/core/iomgr/workqueue_posix.h new file mode 100644 index 0000000000..1b3a0e281b --- /dev/null +++ b/src/core/iomgr/workqueue_posix.h @@ -0,0 +1,52 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H +#define GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H + +struct grpc_fd; + +struct grpc_workqueue { + gpr_refcount refs; + + gpr_mu mu; + grpc_iomgr_closure head; + grpc_iomgr_closure *tail; + + grpc_wakeup_fd wakeup_fd; + struct grpc_fd *wakeup_read_fd; + + grpc_iomgr_closure read_closure; +}; + +#endif /* GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_POSIX_H */ diff --git a/src/core/iomgr/workqueue_windows.c b/src/core/iomgr/workqueue_windows.c new file mode 100644 index 0000000000..f9ca57557b --- /dev/null +++ b/src/core/iomgr/workqueue_windows.c @@ -0,0 +1,40 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include <grpc/support/port_platform.h> + +#ifdef GPR_WIN32 + +#include "src/core/iomgr/workqueue.h" + +#endif /* GPR_WIN32 */ diff --git a/src/core/iomgr/workqueue_windows.h b/src/core/iomgr/workqueue_windows.h new file mode 100644 index 0000000000..941f195f51 --- /dev/null +++ b/src/core/iomgr/workqueue_windows.h @@ -0,0 +1,37 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_WINDOWS_H +#define GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_WINDOWS_H + +#endif /* GRPC_INTERNAL_CORE_IOMGR_WORKQUEUE_WINDOWS_H */ |