diff options
author | Craig Tiller <ctiller@google.com> | 2015-09-18 17:29:00 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-09-18 17:29:00 -0700 |
commit | d1bec03fa148344b8eac2b59517252d86e4ca858 (patch) | |
tree | f359e48f9151ab7ceff72cd624ad6c7a59e4d304 /src/core/iomgr | |
parent | 33825118df7157219cec15382beb006d3462ad96 (diff) |
Call list progress
Diffstat (limited to 'src/core/iomgr')
30 files changed, 473 insertions, 540 deletions
diff --git a/src/core/iomgr/alarm.c b/src/core/iomgr/alarm.c index 7b67fe3b1d..6e0d516f0c 100644 --- a/src/core/iomgr/alarm.c +++ b/src/core/iomgr/alarm.c @@ -44,7 +44,6 @@ #define LOG2_NUM_SHARDS 5 #define NUM_SHARDS (1 << LOG2_NUM_SHARDS) -#define MAX_ALARMS_PER_CHECK 128 #define ADD_DEADLINE_SCALE 0.33 #define MIN_QUEUE_WINDOW_DURATION 0.01 #define MAX_QUEUE_WINDOW_DURATION 1 @@ -73,8 +72,8 @@ static shard_type g_shards[NUM_SHARDS]; /* Protected by g_mu */ static shard_type *g_shard_queue[NUM_SHARDS]; -static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now, - gpr_timespec *next, int success); +static int run_some_expired_alarms(gpr_timespec now, gpr_timespec *next, + int success, grpc_call_list *call_list); static gpr_timespec compute_min_deadline(shard_type *shard) { return grpc_alarm_heap_is_empty(&shard->heap) @@ -103,10 +102,9 @@ void grpc_alarm_list_init(gpr_timespec now) { } } -void grpc_alarm_list_shutdown(void) { +void grpc_alarm_list_shutdown(grpc_call_list *call_list) { int i; - while (run_some_expired_alarms(NULL, gpr_inf_future(g_clock_type), NULL, 0)) - ; + run_some_expired_alarms(gpr_inf_future(g_clock_type), NULL, 0, call_list); for (i = 0; i < NUM_SHARDS; i++) { shard_type *shard = &g_shards[i]; gpr_mu_destroy(&shard->mu); @@ -174,13 +172,12 @@ static void note_deadline_change(shard_type *shard) { void grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline, grpc_iomgr_cb_func alarm_cb, void *alarm_cb_arg, - gpr_timespec now) { + gpr_timespec now, grpc_call_list *call_list) { int is_first_alarm = 0; shard_type *shard = &g_shards[shard_idx(alarm)]; GPR_ASSERT(deadline.clock_type == g_clock_type); GPR_ASSERT(now.clock_type == g_clock_type); - alarm->cb = alarm_cb; - alarm->cb_arg = alarm_cb_arg; + grpc_closure_init(&alarm->closure, alarm_cb, alarm_cb_arg); alarm->deadline = deadline; alarm->triggered = 0; @@ -223,12 +220,11 @@ void grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline, } } -void grpc_alarm_cancel(grpc_alarm *alarm) { +void grpc_alarm_cancel(grpc_alarm *alarm, grpc_call_list *call_list) { shard_type *shard = &g_shards[shard_idx(alarm)]; - int triggered = 0; gpr_mu_lock(&shard->mu); if (!alarm->triggered) { - triggered = 1; + grpc_call_list_add(call_list, &alarm->closure, 1); alarm->triggered = 1; if (alarm->heap_index == INVALID_HEAP_INDEX) { list_remove(alarm); @@ -237,10 +233,6 @@ void grpc_alarm_cancel(grpc_alarm *alarm) { } } gpr_mu_unlock(&shard->mu); - - if (triggered) { - alarm->cb(alarm->cb_arg, 0); - } } /* This is called when the queue is empty and "now" has reached the @@ -292,39 +284,36 @@ static grpc_alarm *pop_one(shard_type *shard, gpr_timespec now) { /* REQUIRES: shard->mu unlocked */ static size_t pop_alarms(shard_type *shard, gpr_timespec now, - grpc_alarm **alarms, size_t max_alarms, - gpr_timespec *new_min_deadline) { + gpr_timespec *new_min_deadline, int success, + grpc_call_list *call_list) { size_t n = 0; grpc_alarm *alarm; gpr_mu_lock(&shard->mu); - while (n < max_alarms && (alarm = pop_one(shard, now))) { - alarms[n++] = alarm; + while ((alarm = pop_one(shard, now))) { + grpc_call_list_add(call_list, &alarm->closure, success); } *new_min_deadline = compute_min_deadline(shard); gpr_mu_unlock(&shard->mu); return n; } -static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now, - gpr_timespec *next, int success) { +static int run_some_expired_alarms(gpr_timespec now, gpr_timespec *next, + int success, grpc_call_list *call_list) { size_t n = 0; - size_t i; - grpc_alarm *alarms[MAX_ALARMS_PER_CHECK]; /* TODO(ctiller): verify that there are any alarms (atomically) here */ if (gpr_mu_trylock(&g_checker_mu)) { gpr_mu_lock(&g_mu); - while (n < MAX_ALARMS_PER_CHECK && - gpr_time_cmp(g_shard_queue[0]->min_deadline, now) < 0) { + while (gpr_time_cmp(g_shard_queue[0]->min_deadline, now) < 0) { gpr_timespec new_min_deadline; /* For efficiency, we pop as many available alarms as we can from the shard. This may violate perfect alarm deadline ordering, but that shouldn't be a big deal because we don't make ordering guarantees. */ - n += pop_alarms(g_shard_queue[0], now, alarms + n, - MAX_ALARMS_PER_CHECK - n, &new_min_deadline); + n += pop_alarms(g_shard_queue[0], now, &new_min_deadline, success, + call_list); /* An grpc_alarm_init() on the shard could intervene here, adding a new alarm that is earlier than new_min_deadline. However, @@ -343,26 +332,15 @@ static int run_some_expired_alarms(gpr_mu *drop_mu, gpr_timespec now, gpr_mu_unlock(&g_checker_mu); } - if (n && drop_mu) { - gpr_mu_unlock(drop_mu); - } - - for (i = 0; i < n; i++) { - alarms[i]->cb(alarms[i]->cb_arg, success); - } - - if (n && drop_mu) { - gpr_mu_lock(drop_mu); - } - - return (int)n; + return n > 0; } -int grpc_alarm_check(gpr_mu *drop_mu, gpr_timespec now, gpr_timespec *next) { +int grpc_alarm_check(gpr_timespec now, gpr_timespec *next, + grpc_call_list *call_list) { GPR_ASSERT(now.clock_type == g_clock_type); return run_some_expired_alarms( - drop_mu, now, next, - gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0); + now, next, gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0, + call_list); } gpr_timespec grpc_alarm_list_next_timeout(void) { diff --git a/src/core/iomgr/alarm.h b/src/core/iomgr/alarm.h index 4a13527e64..05ca7c27bb 100644 --- a/src/core/iomgr/alarm.h +++ b/src/core/iomgr/alarm.h @@ -44,8 +44,7 @@ typedef struct grpc_alarm { int triggered; struct grpc_alarm *next; struct grpc_alarm *prev; - grpc_iomgr_cb_func cb; - void *cb_arg; + grpc_closure closure; } grpc_alarm; /* Initialize *alarm. When expired or canceled, alarm_cb will be called with @@ -56,7 +55,7 @@ typedef struct grpc_alarm { information about when to free up any user-level state. */ void grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline, grpc_iomgr_cb_func alarm_cb, void *alarm_cb_arg, - gpr_timespec now); + gpr_timespec now, grpc_call_list *call_list); /* Note that there is no alarm destroy function. This is because the alarm is a one-time occurrence with a guarantee that the callback will @@ -84,6 +83,6 @@ void grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline, matches this aim. Requires: cancel() must happen after add() on a given alarm */ -void grpc_alarm_cancel(grpc_alarm *alarm); +void grpc_alarm_cancel(grpc_alarm *alarm, grpc_call_list *call_list); #endif /* GRPC_INTERNAL_CORE_IOMGR_ALARM_H */ diff --git a/src/core/iomgr/alarm_internal.h b/src/core/iomgr/alarm_internal.h index e9f98a3444..a2e9946f4f 100644 --- a/src/core/iomgr/alarm_internal.h +++ b/src/core/iomgr/alarm_internal.h @@ -48,10 +48,10 @@ with high probability at least one thread in the system will see an update at any time slice. */ -int grpc_alarm_check(gpr_mu *drop_mu, gpr_timespec now, gpr_timespec *next); - +int grpc_alarm_check(gpr_timespec now, gpr_timespec *next, + grpc_call_list *call_list); void grpc_alarm_list_init(gpr_timespec now); -void grpc_alarm_list_shutdown(void); +void grpc_alarm_list_shutdown(grpc_call_list *call_list); gpr_timespec grpc_alarm_list_next_timeout(void); diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c index 1955f74b9a..769f155a63 100644 --- a/src/core/iomgr/endpoint.c +++ b/src/core/iomgr/endpoint.c @@ -33,30 +33,34 @@ #include "src/core/iomgr/endpoint.h" -grpc_endpoint_op_status grpc_endpoint_read(grpc_endpoint *ep, - gpr_slice_buffer *slices, - grpc_closure *cb) { - return ep->vtable->read(ep, slices, cb); +void grpc_endpoint_read(grpc_endpoint *ep, gpr_slice_buffer *slices, + grpc_closure *cb, grpc_call_list *call_list) { + ep->vtable->read(ep, slices, cb, call_list); } -grpc_endpoint_op_status grpc_endpoint_write(grpc_endpoint *ep, - gpr_slice_buffer *slices, - grpc_closure *cb) { - return ep->vtable->write(ep, slices, cb); +void grpc_endpoint_write(grpc_endpoint *ep, gpr_slice_buffer *slices, + grpc_closure *cb, grpc_call_list *call_list) { + ep->vtable->write(ep, slices, cb, call_list); } -void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { - ep->vtable->add_to_pollset(ep, pollset); +void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset, + grpc_call_list *call_list) { + ep->vtable->add_to_pollset(ep, pollset, call_list); } void grpc_endpoint_add_to_pollset_set(grpc_endpoint *ep, - grpc_pollset_set *pollset_set) { - ep->vtable->add_to_pollset_set(ep, pollset_set); + grpc_pollset_set *pollset_set, + grpc_call_list *call_list) { + ep->vtable->add_to_pollset_set(ep, pollset_set, call_list); } -void grpc_endpoint_shutdown(grpc_endpoint *ep) { ep->vtable->shutdown(ep); } +void grpc_endpoint_shutdown(grpc_endpoint *ep, grpc_call_list *call_list) { + ep->vtable->shutdown(ep, call_list); +} -void grpc_endpoint_destroy(grpc_endpoint *ep) { ep->vtable->destroy(ep); } +void grpc_endpoint_destroy(grpc_endpoint *ep, grpc_call_list *call_list) { + ep->vtable->destroy(ep, call_list); +} char *grpc_endpoint_get_peer(grpc_endpoint *ep) { return ep->vtable->get_peer(ep); diff --git a/src/core/iomgr/endpoint.h b/src/core/iomgr/endpoint.h index 79b7d6a78e..f9881684ff 100644 --- a/src/core/iomgr/endpoint.h +++ b/src/core/iomgr/endpoint.h @@ -46,21 +46,17 @@ typedef struct grpc_endpoint grpc_endpoint; typedef struct grpc_endpoint_vtable grpc_endpoint_vtable; -typedef enum grpc_endpoint_op_status { - GRPC_ENDPOINT_DONE, /* completed immediately, cb won't be called */ - GRPC_ENDPOINT_PENDING, /* cb will be called when completed */ - GRPC_ENDPOINT_ERROR /* write errored out, cb won't be called */ -} grpc_endpoint_op_status; - struct grpc_endpoint_vtable { - grpc_endpoint_op_status (*read)(grpc_endpoint *ep, gpr_slice_buffer *slices, - grpc_closure *cb); - grpc_endpoint_op_status (*write)(grpc_endpoint *ep, gpr_slice_buffer *slices, - grpc_closure *cb); - void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset); - void (*add_to_pollset_set)(grpc_endpoint *ep, grpc_pollset_set *pollset); - void (*shutdown)(grpc_endpoint *ep); - void (*destroy)(grpc_endpoint *ep); + void (*read)(grpc_endpoint *ep, gpr_slice_buffer *slices, grpc_closure *cb, + grpc_call_list *call_list); + void (*write)(grpc_endpoint *ep, gpr_slice_buffer *slices, grpc_closure *cb, + grpc_call_list *call_list); + void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset, + grpc_call_list *call_list); + void (*add_to_pollset_set)(grpc_endpoint *ep, grpc_pollset_set *pollset, + grpc_call_list *call_list); + void (*shutdown)(grpc_endpoint *ep, grpc_call_list *call_list); + void (*destroy)(grpc_endpoint *ep, grpc_call_list *call_list); char *(*get_peer)(grpc_endpoint *ep); }; @@ -68,9 +64,8 @@ struct grpc_endpoint_vtable { Callback success indicates that the endpoint can accept more reads, failure indicates the endpoint is closed. Valid slices may be placed into \a slices even on callback success == 0. */ -grpc_endpoint_op_status grpc_endpoint_read( - grpc_endpoint *ep, gpr_slice_buffer *slices, - grpc_closure *cb) GRPC_MUST_USE_RESULT; +void grpc_endpoint_read(grpc_endpoint *ep, gpr_slice_buffer *slices, + grpc_closure *cb, grpc_call_list *call_list); char *grpc_endpoint_get_peer(grpc_endpoint *ep); @@ -84,20 +79,21 @@ char *grpc_endpoint_get_peer(grpc_endpoint *ep); No guarantee is made to the content of slices after a write EXCEPT that it is a valid slice buffer. */ -grpc_endpoint_op_status grpc_endpoint_write( - grpc_endpoint *ep, gpr_slice_buffer *slices, - grpc_closure *cb) GRPC_MUST_USE_RESULT; +void grpc_endpoint_write(grpc_endpoint *ep, gpr_slice_buffer *slices, + grpc_closure *cb, grpc_call_list *call_list); /* Causes any pending read/write callbacks to run immediately with success==0 */ -void grpc_endpoint_shutdown(grpc_endpoint *ep); -void grpc_endpoint_destroy(grpc_endpoint *ep); +void grpc_endpoint_shutdown(grpc_endpoint *ep, grpc_call_list *call_list); +void grpc_endpoint_destroy(grpc_endpoint *ep, grpc_call_list *call_list); /* Add an endpoint to a pollset, so that when the pollset is polled, events from this endpoint are considered */ -void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset); +void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset, + grpc_call_list *call_list); void grpc_endpoint_add_to_pollset_set(grpc_endpoint *ep, - grpc_pollset_set *pollset_set); + grpc_pollset_set *pollset_set, + grpc_call_list *call_list); struct grpc_endpoint { const grpc_endpoint_vtable *vtable; diff --git a/src/core/iomgr/endpoint_pair.h b/src/core/iomgr/endpoint_pair.h index 25ef1891fb..095ec5fcc9 100644 --- a/src/core/iomgr/endpoint_pair.h +++ b/src/core/iomgr/endpoint_pair.h @@ -42,7 +42,6 @@ typedef struct { } grpc_endpoint_pair; grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, - size_t read_slice_size, - grpc_workqueue *workqueue); + size_t read_slice_size); #endif /* GRPC_INTERNAL_CORE_IOMGR_ENDPOINT_PAIR_H */ diff --git a/src/core/iomgr/endpoint_pair_posix.c b/src/core/iomgr/endpoint_pair_posix.c index dc1f441b4b..deae9c6875 100644 --- a/src/core/iomgr/endpoint_pair_posix.c +++ b/src/core/iomgr/endpoint_pair_posix.c @@ -59,20 +59,19 @@ static void create_sockets(int sv[2]) { } grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, - size_t read_slice_size, - grpc_workqueue *workqueue) { + size_t read_slice_size) { int sv[2]; grpc_endpoint_pair p; char *final_name; create_sockets(sv); gpr_asprintf(&final_name, "%s:client", name); - p.client = grpc_tcp_create(grpc_fd_create(sv[1], workqueue, final_name), - read_slice_size, "socketpair-server"); + p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), read_slice_size, + "socketpair-server"); gpr_free(final_name); gpr_asprintf(&final_name, "%s:server", name); - p.server = grpc_tcp_create(grpc_fd_create(sv[0], workqueue, final_name), - read_slice_size, "socketpair-client"); + p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), read_slice_size, + "socketpair-client"); gpr_free(final_name); return p; } diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index dfebbcc2e1..294cb70746 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -71,9 +71,6 @@ static grpc_fd *fd_freelist = NULL; static gpr_mu fd_freelist_mu; static void freelist_fd(grpc_fd *fd) { - if (fd->workqueue->wakeup_read_fd != fd) { - GRPC_WORKQUEUE_UNREF(fd->workqueue, "fd"); - } gpr_mu_lock(&fd_freelist_mu); fd->freelist_next = fd_freelist; fd_freelist = fd; @@ -161,14 +158,8 @@ void grpc_fd_global_shutdown(void) { gpr_mu_destroy(&fd_freelist_mu); } -grpc_fd *grpc_fd_create(int fd, grpc_workqueue *workqueue, const char *name) { +grpc_fd *grpc_fd_create(int fd, const char *name) { grpc_fd *r = alloc_fd(fd); - r->workqueue = workqueue; - /* if the wakeup_read_fd is NULL, then the workqueue is under construction - ==> this fd will be the wakeup_read_fd, and we shouldn't take a ref */ - if (workqueue->wakeup_read_fd != NULL) { - GRPC_WORKQUEUE_REF(workqueue, "fd"); - } grpc_iomgr_register_object(&r->iomgr_object, name); return r; } @@ -218,7 +209,8 @@ static int has_watchers(grpc_fd *fd) { fd->inactive_watcher_root.next != &fd->inactive_watcher_root; } -void grpc_fd_orphan(grpc_fd *fd, grpc_closure *on_done, const char *reason) { +void grpc_fd_orphan(grpc_fd *fd, grpc_closure *on_done, const char *reason, + grpc_call_list *call_list) { fd->on_done_closure = on_done; shutdown(fd->fd, SHUT_RDWR); gpr_mu_lock(&fd->watcher_mu); @@ -226,9 +218,7 @@ void grpc_fd_orphan(grpc_fd *fd, grpc_closure *on_done, const char *reason) { if (!has_watchers(fd)) { fd->closed = 1; close(fd->fd); - if (fd->on_done_closure) { - grpc_workqueue_push(fd->workqueue, fd->on_done_closure, 1); - } + grpc_call_list_add(call_list, fd->on_done_closure, 1); } else { wake_all_watchers_locked(fd); } @@ -252,25 +242,8 @@ void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); } void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); } #endif -static void process_callback(grpc_closure *closure, int success, - grpc_workqueue *optional_workqueue) { - if (optional_workqueue == NULL) { - closure->cb(closure->cb_arg, success); - } else { - grpc_workqueue_push(optional_workqueue, closure, success); - } -} - -static void process_callbacks(grpc_closure *callbacks, size_t n, int success, - grpc_workqueue *optional_workqueue) { - size_t i; - for (i = 0; i < n; i++) { - process_callback(callbacks + i, success, optional_workqueue); - } -} - static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_closure *closure, - int allow_synchronous_callback) { + grpc_call_list *call_list) { switch (gpr_atm_acq_load(st)) { case NOT_READY: /* There is no race if the descriptor is already ready, so we skip @@ -292,8 +265,7 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_closure *closure, case READY: GPR_ASSERT(gpr_atm_no_barrier_load(st) == READY); gpr_atm_rel_store(st, NOT_READY); - process_callback(closure, !gpr_atm_acq_load(&fd->shutdown), - allow_synchronous_callback ? NULL : fd->workqueue); + grpc_call_list_add(call_list, closure, !gpr_atm_acq_load(&fd->shutdown)); return; default: /* WAITING */ /* upcallptr was set to a different closure. This is an error! */ @@ -306,8 +278,8 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_closure *closure, abort(); } -static void set_ready_locked(gpr_atm *st, grpc_closure **callbacks, - size_t *ncallbacks) { +static void set_ready_locked(grpc_fd *fd, gpr_atm *st, + grpc_call_list *call_list) { gpr_intptr state = gpr_atm_acq_load(st); switch (state) { @@ -326,50 +298,38 @@ static void set_ready_locked(gpr_atm *st, grpc_closure **callbacks, default: /* waiting */ GPR_ASSERT(gpr_atm_no_barrier_load(st) != READY && gpr_atm_no_barrier_load(st) != NOT_READY); - callbacks[(*ncallbacks)++] = (grpc_closure *)state; + grpc_call_list_add(call_list, (grpc_closure *)state, + !gpr_atm_acq_load(&fd->shutdown)); gpr_atm_rel_store(st, NOT_READY); return; } } -static void set_ready(grpc_fd *fd, gpr_atm *st, - int allow_synchronous_callback) { +static void set_ready(grpc_fd *fd, gpr_atm *st, grpc_call_list *call_list) { /* only one set_ready can be active at once (but there may be a racing notify_on) */ - int success; - grpc_closure *closure; - size_t ncb = 0; - gpr_mu_lock(&fd->set_state_mu); - set_ready_locked(st, &closure, &ncb); + set_ready_locked(fd, st, call_list); gpr_mu_unlock(&fd->set_state_mu); - success = !gpr_atm_acq_load(&fd->shutdown); - GPR_ASSERT(ncb <= 1); - if (ncb > 0) { - process_callbacks(closure, ncb, success, - allow_synchronous_callback ? NULL : fd->workqueue); - } } -void grpc_fd_shutdown(grpc_fd *fd) { - size_t ncb = 0; +void grpc_fd_shutdown(grpc_fd *fd, grpc_call_list *call_list) { gpr_mu_lock(&fd->set_state_mu); GPR_ASSERT(!gpr_atm_no_barrier_load(&fd->shutdown)); gpr_atm_rel_store(&fd->shutdown, 1); - set_ready_locked(&fd->readst, &fd->shutdown_closures[0], &ncb); - set_ready_locked(&fd->writest, &fd->shutdown_closures[0], &ncb); + set_ready_locked(fd, &fd->readst, call_list); + set_ready_locked(fd, &fd->writest, call_list); gpr_mu_unlock(&fd->set_state_mu); - GPR_ASSERT(ncb <= 2); - process_callbacks(fd->shutdown_closures[0], ncb, 0 /* GPR_FALSE */, - 0 /* GPR_FALSE */); } -void grpc_fd_notify_on_read(grpc_fd *fd, grpc_closure *closure) { - notify_on(fd, &fd->readst, closure, 0); +void grpc_fd_notify_on_read(grpc_fd *fd, grpc_closure *closure, + grpc_call_list *call_list) { + notify_on(fd, &fd->readst, closure, call_list); } -void grpc_fd_notify_on_write(grpc_fd *fd, grpc_closure *closure) { - notify_on(fd, &fd->writest, closure, 0); +void grpc_fd_notify_on_write(grpc_fd *fd, grpc_closure *closure, + grpc_call_list *call_list) { + notify_on(fd, &fd->writest, closure, call_list); } gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, @@ -415,7 +375,8 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, return mask; } -void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write) { +void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write, + grpc_call_list *call_list) { int was_polling = 0; int kick = 0; grpc_fd *fd = watcher->fd; @@ -448,21 +409,19 @@ void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write) { if (grpc_fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) { fd->closed = 1; close(fd->fd); - if (fd->on_done_closure != NULL) { - grpc_workqueue_push(fd->workqueue, fd->on_done_closure, 1); - } + grpc_call_list_add(call_list, fd->on_done_closure, 1); } gpr_mu_unlock(&fd->watcher_mu); GRPC_FD_UNREF(fd, "poll"); } -void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback) { - set_ready(fd, &fd->readst, allow_synchronous_callback); +void grpc_fd_become_readable(grpc_fd *fd, grpc_call_list *call_list) { + set_ready(fd, &fd->readst, call_list); } -void grpc_fd_become_writable(grpc_fd *fd, int allow_synchronous_callback) { - set_ready(fd, &fd->writest, allow_synchronous_callback); +void grpc_fd_become_writable(grpc_fd *fd, grpc_call_list *call_list) { + set_ready(fd, &fd->writest, call_list); } #endif diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index bb85b6c16e..f435e2d3f9 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -58,7 +58,6 @@ struct grpc_fd { meaning that mostly we ref by two to avoid altering the orphaned bit, and just unref by 1 when we're ready to flag the object as orphaned */ gpr_atm refst; - grpc_workqueue *workqueue; gpr_mu set_state_mu; gpr_atm shutdown; @@ -105,7 +104,7 @@ struct grpc_fd { /* Create a wrapped file descriptor. Requires fd is a non-blocking file descriptor. This takes ownership of closing fd. */ -grpc_fd *grpc_fd_create(int fd, grpc_workqueue *workqueue, const char *name); +grpc_fd *grpc_fd_create(int fd, const char *name); /* Releases fd to be asynchronously destroyed. on_done is called when the underlying file descriptor is definitely close()d. @@ -113,7 +112,8 @@ grpc_fd *grpc_fd_create(int fd, grpc_workqueue *workqueue, const char *name); Requires: *fd initialized; no outstanding notify_on_read or notify_on_write. MUST NOT be called with a pollset lock taken */ -void grpc_fd_orphan(grpc_fd *fd, grpc_closure *on_done, const char *reason); +void grpc_fd_orphan(grpc_fd *fd, grpc_closure *on_done, const char *reason, + grpc_call_list *call_list); /* Begin polling on an fd. Registers that the given pollset is interested in this fd - so that if read @@ -131,13 +131,14 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, grpc_fd_watcher *rec); /* Complete polling previously started with grpc_fd_begin_poll MUST NOT be called with a pollset lock taken */ -void grpc_fd_end_poll(grpc_fd_watcher *rec, int got_read, int got_write); +void grpc_fd_end_poll(grpc_fd_watcher *rec, int got_read, int got_write, + grpc_call_list *call_list); /* Return 1 if this fd is orphaned, 0 otherwise */ int grpc_fd_is_orphaned(grpc_fd *fd); /* Cause any current callbacks to error out with GRPC_CALLBACK_CANCELLED. */ -void grpc_fd_shutdown(grpc_fd *fd); +void grpc_fd_shutdown(grpc_fd *fd, grpc_call_list *call_list); /* Register read interest, causing read_cb to be called once when fd becomes readable, on deadline specified by deadline, or on shutdown triggered by @@ -152,17 +153,19 @@ void grpc_fd_shutdown(grpc_fd *fd); underlying platform. This means that users must drain fd in read_cb before calling notify_on_read again. Users are also expected to handle spurious events, i.e read_cb is called while nothing can be readable from fd */ -void grpc_fd_notify_on_read(grpc_fd *fd, grpc_closure *closure); +void grpc_fd_notify_on_read(grpc_fd *fd, grpc_closure *closure, + grpc_call_list *call_list); /* Exactly the same semantics as above, except based on writable events. */ -void grpc_fd_notify_on_write(grpc_fd *fd, grpc_closure *closure); +void grpc_fd_notify_on_write(grpc_fd *fd, grpc_closure *closure, + grpc_call_list *call_list); /* Notification from the poller to an fd that it has become readable or writable. If allow_synchronous_callback is 1, allow running the fd callback inline in this callstack, otherwise register an asynchronous callback and return */ -void grpc_fd_become_readable(grpc_fd *fd, int allow_synchronous_callback); -void grpc_fd_become_writable(grpc_fd *fd, int allow_synchronous_callback); +void grpc_fd_become_readable(grpc_fd *fd, grpc_call_list *call_list); +void grpc_fd_become_writable(grpc_fd *fd, grpc_call_list *call_list); /* Reference counting for fds */ #ifdef GRPC_FD_REF_COUNT_DEBUG diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index dd76044913..029c689982 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -88,6 +88,7 @@ void grpc_iomgr_shutdown(void) { gpr_timespec shutdown_deadline = gpr_time_add( gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(10, GPR_TIMESPAN)); gpr_timespec last_warning_time = gpr_now(GPR_CLOCK_REALTIME); + grpc_call_list call_list = GRPC_CALL_LIST_INIT; gpr_mu_lock(&g_mu); g_shutdown = 1; @@ -101,7 +102,11 @@ void grpc_iomgr_shutdown(void) { } last_warning_time = gpr_now(GPR_CLOCK_REALTIME); } - if (grpc_alarm_check(&g_mu, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL)) { + if (grpc_alarm_check(gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL, + &call_list)) { + gpr_mu_unlock(&g_mu); + grpc_call_list_run(&call_list); + gpr_mu_lock(&g_mu); continue; } if (g_root_object.next != &g_root_object) { @@ -126,7 +131,8 @@ void grpc_iomgr_shutdown(void) { } gpr_mu_unlock(&g_mu); - grpc_alarm_list_shutdown(); + grpc_alarm_list_shutdown(&call_list); + grpc_call_list_run(&call_list); grpc_iomgr_platform_shutdown(); gpr_mu_destroy(&g_mu); @@ -171,12 +177,15 @@ void grpc_call_list_add(grpc_call_list *call_list, grpc_closure *closure, call_list->tail = closure; } -void grpc_call_list_run(grpc_call_list call_list) { - grpc_closure *c = call_list.head; - while (c) { - grpc_closure *next = c->next; - c->cb(c->cb_arg, c->success); - c = next; +void grpc_call_list_run(grpc_call_list *call_list) { + while (!grpc_call_list_empty(*call_list)) { + grpc_closure *c = call_list->head; + call_list->head = call_list->tail = NULL; + while (c) { + grpc_closure *next = c->next; + c->cb(c->cb_arg, c->success, call_list); + c = next; + } } } diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h index 56f0195be9..94f3912990 100644 --- a/src/core/iomgr/iomgr.h +++ b/src/core/iomgr/iomgr.h @@ -34,15 +34,24 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_IOMGR_H #define GRPC_INTERNAL_CORE_IOMGR_IOMGR_H +struct grpc_closure; +typedef struct grpc_closure grpc_closure; + +typedef struct grpc_call_list { + grpc_closure *head; + grpc_closure *tail; +} grpc_call_list; + /** gRPC Callback definition. * * \param arg Arbitrary input. * \param success An indication on the state of the iomgr. On false, cleanup * actions should be taken (eg, shutdown). */ -typedef void (*grpc_iomgr_cb_func)(void *arg, int success); +typedef void (*grpc_iomgr_cb_func)(void *arg, int success, + grpc_call_list *call_list); /** A closure over a grpc_iomgr_cb_func. */ -typedef struct grpc_closure { +struct grpc_closure { /** Bound callback. */ grpc_iomgr_cb_func cb; @@ -56,12 +65,7 @@ typedef struct grpc_closure { /**< Internal. Do not touch */ struct grpc_closure *next; -} grpc_closure; - -typedef struct grpc_call_list { - grpc_closure *head; - grpc_closure *tail; -} grpc_call_list; +}; /** Initializes \a closure with \a cb and \a cb_arg. */ void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb, @@ -72,7 +76,7 @@ void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb, void grpc_call_list_add(grpc_call_list *list, grpc_closure *closure, int success); -void grpc_call_list_run(grpc_call_list list); +void grpc_call_list_run(grpc_call_list *list); void grpc_call_list_move(grpc_call_list *src, grpc_call_list *dst); int grpc_call_list_empty(grpc_call_list list); diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h index 337596cb74..98653412bb 100644 --- a/src/core/iomgr/pollset.h +++ b/src/core/iomgr/pollset.h @@ -55,9 +55,8 @@ #endif void grpc_pollset_init(grpc_pollset *pollset); -void grpc_pollset_shutdown(grpc_pollset *pollset, - void (*shutdown_done)(void *arg), - void *shutdown_done_arg); +void grpc_pollset_shutdown(grpc_pollset *pollset, grpc_closure *closure, + grpc_call_list *call_list); void grpc_pollset_destroy(grpc_pollset *pollset); /* Do some work on a pollset. diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index 5ca957b8e1..e6e0f5cdd4 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -61,7 +61,8 @@ typedef struct { wakeup_fd_hdl *free_wakeup_fds; } pollset_hdr; -static void finally_add_fd(grpc_pollset *pollset, grpc_fd *fd) { +static void finally_add_fd(grpc_pollset *pollset, grpc_fd *fd, + grpc_call_list *call_list) { pollset_hdr *h = pollset->data.ptr; struct epoll_event ev; int err; @@ -83,15 +84,15 @@ static void finally_add_fd(grpc_pollset *pollset, grpc_fd *fd) { } } } - grpc_fd_end_poll(&watcher, 0, 0); + grpc_fd_end_poll(&watcher, 0, 0, call_list); } -static void perform_delayed_add(void *arg, int iomgr_status) { +static void perform_delayed_add(void *arg, int iomgr_status, + grpc_call_list *call_list) { delayed_add *da = arg; - int do_shutdown_cb = 0; if (!grpc_fd_is_orphaned(da->fd)) { - finally_add_fd(da->pollset, da->fd); + finally_add_fd(da->pollset, da->fd, call_list); } gpr_mu_lock(&da->pollset->mu); @@ -100,26 +101,23 @@ static void perform_delayed_add(void *arg, int iomgr_status) { /* We don't care about this pollset anymore. */ if (da->pollset->in_flight_cbs == 0 && !da->pollset->called_shutdown) { da->pollset->called_shutdown = 1; - do_shutdown_cb = 1; + grpc_call_list_add(call_list, da->pollset->shutdown_done, 1); } } gpr_mu_unlock(&da->pollset->mu); GRPC_FD_UNREF(da->fd, "delayed_add"); - if (do_shutdown_cb) { - da->pollset->shutdown_done_cb(da->pollset->shutdown_done_arg); - } - gpr_free(da); } static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd, - int and_unlock_pollset) { + int and_unlock_pollset, + grpc_call_list *call_list) { if (and_unlock_pollset) { gpr_mu_unlock(&pollset->mu); - finally_add_fd(pollset, fd); + finally_add_fd(pollset, fd, call_list); } else { delayed_add *da = gpr_malloc(sizeof(*da)); da->pollset = pollset; @@ -127,13 +125,14 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset, GRPC_FD_REF(fd, "delayed_add"); grpc_closure_init(&da->closure, perform_delayed_add, da); pollset->in_flight_cbs++; - grpc_pollset_add_unlock_job(pollset, &da->closure); + grpc_call_list_add(call_list, &da->closure, 1); } } static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd, - int and_unlock_pollset) { + int and_unlock_pollset, + grpc_call_list *call_list) { pollset_hdr *h = pollset->data.ptr; int err; @@ -153,9 +152,9 @@ static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset, /* TODO(klempner): We probably want to turn this down a bit */ #define GRPC_EPOLL_MAX_EVENTS 1000 -static void multipoll_with_epoll_pollset_maybe_work( +static void multipoll_with_epoll_pollset_maybe_work_and_unlock( grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline, - gpr_timespec now, int allow_synchronous_callback) { + gpr_timespec now, grpc_call_list *call_list) { struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS]; int ep_rv; int poll_rv; @@ -209,18 +208,16 @@ static void multipoll_with_epoll_pollset_maybe_work( int read = ep_ev[i].events & (EPOLLIN | EPOLLPRI); int write = ep_ev[i].events & EPOLLOUT; if (read || cancel) { - grpc_fd_become_readable(fd, allow_synchronous_callback); + grpc_fd_become_readable(fd, call_list); } if (write || cancel) { - grpc_fd_become_writable(fd, allow_synchronous_callback); + grpc_fd_become_writable(fd, call_list); } } } } while (ep_rv == GRPC_EPOLL_MAX_EVENTS); } } - - gpr_mu_lock(&pollset->mu); } static void multipoll_with_epoll_pollset_finish_shutdown( @@ -234,12 +231,12 @@ static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) { static const grpc_pollset_vtable multipoll_with_epoll_pollset = { multipoll_with_epoll_pollset_add_fd, multipoll_with_epoll_pollset_del_fd, - multipoll_with_epoll_pollset_maybe_work, + multipoll_with_epoll_pollset_maybe_work_and_unlock, multipoll_with_epoll_pollset_finish_shutdown, multipoll_with_epoll_pollset_destroy}; static void epoll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds, - size_t nfds) { + size_t nfds, grpc_call_list *call_list) { size_t i; pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr)); @@ -252,7 +249,7 @@ static void epoll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds, abort(); } for (i = 0; i < nfds; i++) { - multipoll_with_epoll_pollset_add_fd(pollset, fds[i], 0); + multipoll_with_epoll_pollset_add_fd(pollset, fds[i], 0, call_list); } } diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index cae260cab0..03a28894f0 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -61,7 +61,8 @@ typedef struct { static void multipoll_with_poll_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd, - int and_unlock_pollset) { + int and_unlock_pollset, + grpc_call_list *call_list) { size_t i; pollset_hdr *h = pollset->data.ptr; /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */ @@ -82,7 +83,8 @@ exit: static void multipoll_with_poll_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd, - int and_unlock_pollset) { + int and_unlock_pollset, + grpc_call_list *call_list) { /* will get removed next poll cycle */ pollset_hdr *h = pollset->data.ptr; if (h->del_count == h->del_capacity) { @@ -96,9 +98,9 @@ static void multipoll_with_poll_pollset_del_fd(grpc_pollset *pollset, } } -static void multipoll_with_poll_pollset_maybe_work( +static void multipoll_with_poll_pollset_maybe_work_and_unlock( grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline, - gpr_timespec now, int allow_synchronous_callback) { + gpr_timespec now, grpc_call_list *call_list) { int timeout; int r; size_t i, j, fd_count; @@ -149,7 +151,7 @@ static void multipoll_with_poll_pollset_maybe_work( for (i = 1; i < pfd_count; i++) { grpc_fd_end_poll(&watchers[i], pfds[i].revents & POLLIN, - pfds[i].revents & POLLOUT); + pfds[i].revents & POLLOUT, call_list); } if (r < 0) { @@ -167,18 +169,16 @@ static void multipoll_with_poll_pollset_maybe_work( continue; } if (pfds[i].revents & (POLLIN | POLLHUP | POLLERR)) { - grpc_fd_become_readable(watchers[i].fd, allow_synchronous_callback); + grpc_fd_become_readable(watchers[i].fd, call_list); } if (pfds[i].revents & (POLLOUT | POLLHUP | POLLERR)) { - grpc_fd_become_writable(watchers[i].fd, allow_synchronous_callback); + grpc_fd_become_writable(watchers[i].fd, call_list); } } } gpr_free(pfds); gpr_free(watchers); - - gpr_mu_lock(&pollset->mu); } static void multipoll_with_poll_pollset_finish_shutdown(grpc_pollset *pollset) { @@ -204,12 +204,12 @@ static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) { static const grpc_pollset_vtable multipoll_with_poll_pollset = { multipoll_with_poll_pollset_add_fd, multipoll_with_poll_pollset_del_fd, - multipoll_with_poll_pollset_maybe_work, + multipoll_with_poll_pollset_maybe_work_and_unlock, multipoll_with_poll_pollset_finish_shutdown, multipoll_with_poll_pollset_destroy}; void grpc_poll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds, - size_t nfds) { + size_t nfds, grpc_call_list *call_list) { size_t i; pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr)); pollset->vtable = &multipoll_with_poll_pollset; diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 0fe3f80d44..885cb29234 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -136,17 +136,14 @@ void grpc_pollset_init(grpc_pollset *pollset) { pollset->in_flight_cbs = 0; pollset->shutting_down = 0; pollset->called_shutdown = 0; - pollset->idle_jobs = NULL; - pollset->unlock_jobs = NULL; + pollset->idle_jobs.head = pollset->idle_jobs.tail = NULL; become_basic_pollset(pollset, NULL); } -void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { - if (fd->workqueue->wakeup_read_fd != fd) { - grpc_pollset_add_fd(pollset, fd->workqueue->wakeup_read_fd); - } +void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd, + grpc_call_list *call_list) { gpr_mu_lock(&pollset->mu); - pollset->vtable->add_fd(pollset, fd, 1); + pollset->vtable->add_fd(pollset, fd, 1, call_list); /* the following (enabled only in debug) will reacquire and then release our lock - meaning that if the unlocking flag passed to del_fd above is not respected, the code will deadlock (in a way that we have a chance of @@ -157,9 +154,10 @@ void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { #endif } -void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) { +void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd, + grpc_call_list *call_list) { gpr_mu_lock(&pollset->mu); - pollset->vtable->del_fd(pollset, fd, 1); + pollset->vtable->del_fd(pollset, fd, 1, call_list); /* the following (enabled only in debug) will reacquire and then release our lock - meaning that if the unlocking flag passed to del_fd above is not respected, the code will deadlock (in a way that we have a chance of @@ -170,53 +168,27 @@ void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) { #endif } -static void finish_shutdown(grpc_pollset *pollset) { +static void finish_shutdown(grpc_pollset *pollset, grpc_call_list *call_list) { pollset->vtable->finish_shutdown(pollset); - pollset->shutdown_done_cb(pollset->shutdown_done_arg); -} - -static void run_jobs(grpc_pollset *pollset, grpc_closure **root) { - grpc_closure *exec = *root; - *root = NULL; - gpr_mu_unlock(&pollset->mu); - while (exec != NULL) { - grpc_closure *next = exec->next; - exec->cb(exec->cb_arg, 1); - exec = next; - } - gpr_mu_lock(&pollset->mu); -} - -static void add_job(grpc_closure **root, grpc_closure *closure) { - closure->next = *root; - *root = closure; -} - -void grpc_pollset_add_idle_job(grpc_pollset *pollset, grpc_closure *closure) { - add_job(&pollset->idle_jobs, closure); -} - -void grpc_pollset_add_unlock_job(grpc_pollset *pollset, grpc_closure *closure) { - add_job(&pollset->unlock_jobs, closure); + grpc_call_list_add(call_list, pollset->shutdown_done, 1); } void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec now, gpr_timespec deadline) { /* pollset->mu already held */ int added_worker = 0; + int locked = 1; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; /* this must happen before we (potentially) drop pollset->mu */ worker->next = worker->prev = NULL; /* TODO(ctiller): pool these */ grpc_wakeup_fd_init(&worker->wakeup_fd); - if (!grpc_pollset_has_workers(pollset) && pollset->idle_jobs != NULL) { - run_jobs(pollset, &pollset->idle_jobs); + if (!grpc_pollset_has_workers(pollset) && + !grpc_call_list_empty(pollset->idle_jobs)) { + grpc_call_list_move(&pollset->idle_jobs, &call_list); goto done; } - if (pollset->unlock_jobs != NULL) { - run_jobs(pollset, &pollset->unlock_jobs); - goto done; - } - if (grpc_alarm_check(&pollset->mu, now, &deadline)) { + if (grpc_alarm_check(now, &deadline, &call_list)) { goto done; } if (pollset->shutting_down) { @@ -225,19 +197,32 @@ void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, if (pollset->in_flight_cbs) { /* Give do_promote priority so we don't starve it out */ gpr_mu_unlock(&pollset->mu); - gpr_mu_lock(&pollset->mu); + locked = 0; goto done; } if (!pollset->kicked_without_pollers) { push_front_worker(pollset, worker); added_worker = 1; gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset); - pollset->vtable->maybe_work(pollset, worker, deadline, now, 1); + pollset->vtable->maybe_work_and_unlock(pollset, worker, deadline, now, + NULL); + locked = 0; gpr_tls_set(&g_current_thread_poller, 0); } else { pollset->kicked_without_pollers = 0; } done: + if (!grpc_call_list_empty(call_list)) { + if (locked) { + gpr_mu_unlock(&pollset->mu); + locked = 0; + } + grpc_call_list_run(&call_list); + } + if (!locked) { + gpr_mu_lock(&pollset->mu); + locked = 1; + } grpc_wakeup_fd_destroy(&worker->wakeup_fd); if (added_worker) { remove_worker(pollset, worker); @@ -248,7 +233,8 @@ done: } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) { pollset->called_shutdown = 1; gpr_mu_unlock(&pollset->mu); - finish_shutdown(pollset); + finish_shutdown(pollset, &call_list); + grpc_call_list_run(&call_list); /* Continuing to access pollset here is safe -- it is the caller's * responsibility to not destroy when it has outstanding calls to * grpc_pollset_work. @@ -258,9 +244,8 @@ done: } } -void grpc_pollset_shutdown(grpc_pollset *pollset, - void (*shutdown_done)(void *arg), - void *shutdown_done_arg) { +void grpc_pollset_shutdown(grpc_pollset *pollset, grpc_closure *closure, + grpc_call_list *call_list) { int call_shutdown = 0; gpr_mu_lock(&pollset->mu); GPR_ASSERT(!pollset->shutting_down); @@ -270,13 +255,12 @@ void grpc_pollset_shutdown(grpc_pollset *pollset, pollset->called_shutdown = 1; call_shutdown = 1; } - pollset->shutdown_done_cb = shutdown_done; - pollset->shutdown_done_arg = shutdown_done_arg; + pollset->shutdown_done = closure; grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); gpr_mu_unlock(&pollset->mu); if (call_shutdown) { - finish_shutdown(pollset); + finish_shutdown(pollset, call_list); } } @@ -317,12 +301,12 @@ typedef struct grpc_unary_promote_args { grpc_closure promotion_closure; } grpc_unary_promote_args; -static void basic_do_promote(void *args, int success) { +static void basic_do_promote(void *args, int success, + grpc_call_list *call_list) { grpc_unary_promote_args *up_args = args; const grpc_pollset_vtable *original_vtable = up_args->original_vtable; grpc_pollset *pollset = up_args->pollset; grpc_fd *fd = up_args->fd; - int do_shutdown_cb = 0; /* * This is quite tricky. There are a number of cases to keep in mind here: @@ -349,19 +333,20 @@ static void basic_do_promote(void *args, int success) { if (pollset->in_flight_cbs == 0 && !pollset->called_shutdown) { GPR_ASSERT(!grpc_pollset_has_workers(pollset)); pollset->called_shutdown = 1; - do_shutdown_cb = 1; + grpc_call_list_add(call_list, pollset->shutdown_done, 1); } } else if (grpc_fd_is_orphaned(fd)) { /* Don't try to add it to anything, we'll drop our ref on it below */ } else if (pollset->vtable != original_vtable) { - pollset->vtable->add_fd(pollset, fd, 0); + pollset->vtable->add_fd(pollset, fd, 0, call_list); } else if (fd != pollset->data.ptr) { grpc_fd *fds[2]; fds[0] = pollset->data.ptr; fds[1] = fd; if (fds[0] && !grpc_fd_is_orphaned(fds[0])) { - grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds)); + grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds), + call_list); GRPC_FD_UNREF(fds[0], "basicpoll"); } else { /* old fd is orphaned and we haven't cleaned it up until now, so remain a @@ -376,16 +361,15 @@ static void basic_do_promote(void *args, int success) { gpr_mu_unlock(&pollset->mu); - if (do_shutdown_cb) { - pollset->shutdown_done_cb(pollset->shutdown_done_arg); - } - /* Matching ref in basic_pollset_add_fd */ GRPC_FD_UNREF(fd, "basicpoll_add"); + + grpc_call_list_run(call_list); } static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd, - int and_unlock_pollset) { + int and_unlock_pollset, + grpc_call_list *call_list) { grpc_unary_promote_args *up_args; GPR_ASSERT(fd); if (fd == pollset->data.ptr) goto exit; @@ -402,7 +386,8 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd, pollset->data.ptr = fd; GRPC_FD_REF(fd, "basicpoll"); } else if (!grpc_fd_is_orphaned(fds[0])) { - grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds)); + grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds), + call_list); GRPC_FD_UNREF(fds[0], "basicpoll"); } else { /* old fd is orphaned and we haven't cleaned it up until now, so remain a @@ -424,7 +409,7 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd, up_args->promotion_closure.cb = basic_do_promote; up_args->promotion_closure.cb_arg = up_args; - grpc_pollset_add_idle_job(pollset, &up_args->promotion_closure); + grpc_call_list_add(&pollset->idle_jobs, &up_args->promotion_closure, 1); grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); exit: @@ -434,7 +419,8 @@ exit: } static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd, - int and_unlock_pollset) { + int and_unlock_pollset, + grpc_call_list *call_list) { GPR_ASSERT(fd); if (fd == pollset->data.ptr) { GRPC_FD_UNREF(pollset->data.ptr, "basicpoll"); @@ -446,10 +432,11 @@ static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd, } } -static void basic_pollset_maybe_work(grpc_pollset *pollset, - grpc_pollset_worker *worker, - gpr_timespec deadline, gpr_timespec now, - int allow_synchronous_callback) { +static void basic_pollset_maybe_work_and_unlock(grpc_pollset *pollset, + grpc_pollset_worker *worker, + gpr_timespec deadline, + gpr_timespec now, + grpc_call_list *call_list) { struct pollfd pfd[2]; grpc_fd *fd; grpc_fd_watcher fd_watcher; @@ -487,7 +474,7 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset, if (fd) { grpc_fd_end_poll(&fd_watcher, pfd[1].revents & POLLIN, - pfd[1].revents & POLLOUT); + pfd[1].revents & POLLOUT, call_list); } if (r < 0) { @@ -502,15 +489,13 @@ static void basic_pollset_maybe_work(grpc_pollset *pollset, } if (nfds > 1) { if (pfd[1].revents & (POLLIN | POLLHUP | POLLERR)) { - grpc_fd_become_readable(fd, allow_synchronous_callback); + grpc_fd_become_readable(fd, call_list); } if (pfd[1].revents & (POLLOUT | POLLHUP | POLLERR)) { - grpc_fd_become_writable(fd, allow_synchronous_callback); + grpc_fd_become_writable(fd, call_list); } } } - - gpr_mu_lock(&pollset->mu); } static void basic_pollset_destroy(grpc_pollset *pollset) { @@ -521,8 +506,9 @@ static void basic_pollset_destroy(grpc_pollset *pollset) { } static const grpc_pollset_vtable basic_pollset = { - basic_pollset_add_fd, basic_pollset_del_fd, basic_pollset_maybe_work, - basic_pollset_destroy, basic_pollset_destroy}; + basic_pollset_add_fd, basic_pollset_del_fd, + basic_pollset_maybe_work_and_unlock, basic_pollset_destroy, + basic_pollset_destroy}; static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) { pollset->vtable = &basic_pollset; diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index 4f09f870f7..c65f25fd18 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -65,10 +65,8 @@ typedef struct grpc_pollset { int shutting_down; int called_shutdown; int kicked_without_pollers; - void (*shutdown_done_cb)(void *arg); - void *shutdown_done_arg; - grpc_closure *unlock_jobs; - grpc_closure *idle_jobs; + grpc_closure *shutdown_done; + grpc_call_list idle_jobs; union { int fd; void *ptr; @@ -77,12 +75,13 @@ typedef struct grpc_pollset { struct grpc_pollset_vtable { void (*add_fd)(grpc_pollset *pollset, struct grpc_fd *fd, - int and_unlock_pollset); + int and_unlock_pollset, grpc_call_list *call_list); void (*del_fd)(grpc_pollset *pollset, struct grpc_fd *fd, - int and_unlock_pollset); - void (*maybe_work)(grpc_pollset *pollset, grpc_pollset_worker *worker, - gpr_timespec deadline, gpr_timespec now, - int allow_synchronous_callback); + int and_unlock_pollset, grpc_call_list *call_list); + void (*maybe_work_and_unlock)(grpc_pollset *pollset, + grpc_pollset_worker *worker, + gpr_timespec deadline, gpr_timespec now, + grpc_call_list *call_list); void (*finish_shutdown)(grpc_pollset *pollset); void (*destroy)(grpc_pollset *pollset); }; @@ -90,10 +89,12 @@ struct grpc_pollset_vtable { #define GRPC_POLLSET_MU(pollset) (&(pollset)->mu) /* Add an fd to a pollset */ -void grpc_pollset_add_fd(grpc_pollset *pollset, struct grpc_fd *fd); +void grpc_pollset_add_fd(grpc_pollset *pollset, struct grpc_fd *fd, + grpc_call_list *call_list); /* Force remove an fd from a pollset (normally they are removed on the next poll after an fd is orphaned) */ -void grpc_pollset_del_fd(grpc_pollset *pollset, struct grpc_fd *fd); +void grpc_pollset_del_fd(grpc_pollset *pollset, struct grpc_fd *fd, + grpc_call_list *call_list); /* Returns the fd to listen on for kicks */ int grpc_kick_read_fd(grpc_pollset *p); @@ -111,13 +112,13 @@ int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, gpr_timespec now); /* turn a pollset into a multipoller: platform specific */ -typedef void (*grpc_platform_become_multipoller_type)(grpc_pollset *pollset, - struct grpc_fd **fds, - size_t fd_count); +typedef void (*grpc_platform_become_multipoller_type)( + grpc_pollset *pollset, struct grpc_fd **fds, size_t fd_count, + grpc_call_list *call_list); extern grpc_platform_become_multipoller_type grpc_platform_become_multipoller; void grpc_poll_become_multipoller(grpc_pollset *pollset, struct grpc_fd **fds, - size_t fd_count); + size_t fd_count, grpc_call_list *call_list); /* Return 1 if the pollset has active threads in grpc_pollset_work (pollset must * be locked) */ @@ -127,9 +128,4 @@ int grpc_pollset_has_workers(grpc_pollset *pollset); typedef int (*grpc_poll_function_type)(struct pollfd *, nfds_t, int); extern grpc_poll_function_type grpc_poll_function; -/** schedule a closure to be run next time there are no active workers */ -void grpc_pollset_add_idle_job(grpc_pollset *pollset, grpc_closure *closure); -/** schedule a closure to be run next time the pollset is unlocked */ -void grpc_pollset_add_unlock_job(grpc_pollset *pollset, grpc_closure *closure); - #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_POSIX_H */ diff --git a/src/core/iomgr/pollset_set.h b/src/core/iomgr/pollset_set.h index 6d73951c70..ca667bf34e 100644 --- a/src/core/iomgr/pollset_set.h +++ b/src/core/iomgr/pollset_set.h @@ -52,8 +52,10 @@ void grpc_pollset_set_init(grpc_pollset_set *pollset_set); void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set); void grpc_pollset_set_add_pollset(grpc_pollset_set *pollset_set, - grpc_pollset *pollset); + grpc_pollset *pollset, + grpc_call_list *call_list); void grpc_pollset_set_del_pollset(grpc_pollset_set *pollset_set, - grpc_pollset *pollset); + grpc_pollset *pollset, + grpc_call_list *call_list); #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_H */ diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c index 2076ac70ef..a2c1f47629 100644 --- a/src/core/iomgr/pollset_set_posix.c +++ b/src/core/iomgr/pollset_set_posix.c @@ -59,7 +59,8 @@ void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) { } void grpc_pollset_set_add_pollset(grpc_pollset_set *pollset_set, - grpc_pollset *pollset) { + grpc_pollset *pollset, + grpc_call_list *call_list) { size_t i, j; gpr_mu_lock(&pollset_set->mu); if (pollset_set->pollset_count == pollset_set->pollset_capacity) { @@ -74,7 +75,7 @@ void grpc_pollset_set_add_pollset(grpc_pollset_set *pollset_set, if (grpc_fd_is_orphaned(pollset_set->fds[i])) { GRPC_FD_UNREF(pollset_set->fds[i], "pollset"); } else { - grpc_pollset_add_fd(pollset, pollset_set->fds[i]); + grpc_pollset_add_fd(pollset, pollset_set->fds[i], call_list); pollset_set->fds[j++] = pollset_set->fds[i]; } } @@ -83,7 +84,8 @@ void grpc_pollset_set_add_pollset(grpc_pollset_set *pollset_set, } void grpc_pollset_set_del_pollset(grpc_pollset_set *pollset_set, - grpc_pollset *pollset) { + grpc_pollset *pollset, + grpc_call_list *call_list) { size_t i; gpr_mu_lock(&pollset_set->mu); for (i = 0; i < pollset_set->pollset_count; i++) { @@ -97,7 +99,8 @@ void grpc_pollset_set_del_pollset(grpc_pollset_set *pollset_set, gpr_mu_unlock(&pollset_set->mu); } -void grpc_pollset_set_add_fd(grpc_pollset_set *pollset_set, grpc_fd *fd) { +void grpc_pollset_set_add_fd(grpc_pollset_set *pollset_set, grpc_fd *fd, + grpc_call_list *call_list) { size_t i; gpr_mu_lock(&pollset_set->mu); if (pollset_set->fd_count == pollset_set->fd_capacity) { @@ -108,12 +111,13 @@ void grpc_pollset_set_add_fd(grpc_pollset_set *pollset_set, grpc_fd *fd) { GRPC_FD_REF(fd, "pollset_set"); pollset_set->fds[pollset_set->fd_count++] = fd; for (i = 0; i < pollset_set->pollset_count; i++) { - grpc_pollset_add_fd(pollset_set->pollsets[i], fd); + grpc_pollset_add_fd(pollset_set->pollsets[i], fd, call_list); } gpr_mu_unlock(&pollset_set->mu); } -void grpc_pollset_set_del_fd(grpc_pollset_set *pollset_set, grpc_fd *fd) { +void grpc_pollset_set_del_fd(grpc_pollset_set *pollset_set, grpc_fd *fd, + grpc_call_list *call_list) { size_t i; gpr_mu_lock(&pollset_set->mu); for (i = 0; i < pollset_set->fd_count; i++) { diff --git a/src/core/iomgr/pollset_set_posix.h b/src/core/iomgr/pollset_set_posix.h index e88740bde1..e9969d2c0f 100644 --- a/src/core/iomgr/pollset_set_posix.h +++ b/src/core/iomgr/pollset_set_posix.h @@ -49,7 +49,9 @@ typedef struct grpc_pollset_set { grpc_fd **fds; } grpc_pollset_set; -void grpc_pollset_set_add_fd(grpc_pollset_set *pollset_set, grpc_fd *fd); -void grpc_pollset_set_del_fd(grpc_pollset_set *pollset_set, grpc_fd *fd); +void grpc_pollset_set_add_fd(grpc_pollset_set *pollset_set, grpc_fd *fd, + grpc_call_list *call_list); +void grpc_pollset_set_del_fd(grpc_pollset_set *pollset_set, grpc_fd *fd, + grpc_call_list *call_list); #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */ diff --git a/src/core/iomgr/resolve_address.h b/src/core/iomgr/resolve_address.h index 9f361cb892..ec8d83fffa 100644 --- a/src/core/iomgr/resolve_address.h +++ b/src/core/iomgr/resolve_address.h @@ -34,7 +34,7 @@ #ifndef GRPC_INTERNAL_CORE_IOMGR_RESOLVE_ADDRESS_H #define GRPC_INTERNAL_CORE_IOMGR_RESOLVE_ADDRESS_H -#include <stddef.h> +#include "src/core/iomgr/iomgr.h" #define GRPC_MAX_SOCKADDR_SIZE 128 @@ -52,7 +52,8 @@ typedef struct { On success: addresses is the result, and the callee must call grpc_resolved_addresses_destroy when it's done with them On failure: addresses is NULL */ -typedef void (*grpc_resolve_cb)(void *arg, grpc_resolved_addresses *addresses); +typedef void (*grpc_resolve_cb)(void *arg, grpc_resolved_addresses *addresses, + grpc_call_list *call_list); /* Asynchronously resolve addr. Use default_port if a port isn't designated in addr, otherwise use the port in addr. */ /* TODO(ctiller): add a timeout here */ diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c index ce6972b797..ea64f8176f 100644 --- a/src/core/iomgr/resolve_address_posix.c +++ b/src/core/iomgr/resolve_address_posix.c @@ -144,17 +144,19 @@ done: } /* Thread function to asynch-ify grpc_blocking_resolve_address */ -static void do_request(void *rp) { +static void do_request_thread(void *rp) { request *r = rp; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_resolved_addresses *resolved = grpc_blocking_resolve_address(r->name, r->default_port); void *arg = r->arg; grpc_resolve_cb cb = r->cb; gpr_free(r->name); gpr_free(r->default_port); - cb(arg, resolved); + cb(arg, resolved, &call_list); grpc_iomgr_unregister_object(&r->iomgr_object); gpr_free(r); + grpc_call_list_run(&call_list); } void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) { @@ -175,7 +177,7 @@ void grpc_resolve_address(const char *name, const char *default_port, r->default_port = gpr_strdup(default_port); r->cb = cb; r->arg = arg; - gpr_thd_new(&id, do_request, r, NULL); + gpr_thd_new(&id, do_request_thread, r, NULL); } #endif diff --git a/src/core/iomgr/tcp_client.h b/src/core/iomgr/tcp_client.h index 57f80016c2..8fb5bfda1d 100644 --- a/src/core/iomgr/tcp_client.h +++ b/src/core/iomgr/tcp_client.h @@ -44,10 +44,9 @@ NULL on failure). interested_parties points to a set of pollsets that would be interested in this connection being established (in order to continue their work) */ -void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *tcp), - void *arg, grpc_pollset_set *interested_parties, - grpc_workqueue *workqueue, +void grpc_tcp_client_connect(grpc_closure *on_connect, grpc_endpoint **endpoint, + grpc_pollset_set *interested_parties, const struct sockaddr *addr, size_t addr_len, - gpr_timespec deadline); + gpr_timespec deadline, grpc_call_list *call_list); #endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_CLIENT_H */ diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index 1ea2155060..a4828201c7 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -57,8 +57,6 @@ extern int grpc_tcp_trace; typedef struct { - void (*cb)(void *arg, grpc_endpoint *tcp); - void *cb_arg; gpr_mu mu; grpc_fd *fd; gpr_timespec deadline; @@ -67,6 +65,8 @@ typedef struct { grpc_closure write_closure; grpc_pollset_set *interested_parties; char *addr_str; + grpc_endpoint **ep; + grpc_closure *closure; } async_connect; static int prepare_socket(const struct sockaddr *addr, int fd) { @@ -91,7 +91,7 @@ error: return 0; } -static void tc_on_alarm(void *acp, int success) { +static void tc_on_alarm(void *acp, int success, grpc_call_list *call_list) { int done; async_connect *ac = acp; if (grpc_tcp_trace) { @@ -100,7 +100,7 @@ static void tc_on_alarm(void *acp, int success) { } gpr_mu_lock(&ac->mu); if (ac->fd != NULL) { - grpc_fd_shutdown(ac->fd); + grpc_fd_shutdown(ac->fd, call_list); } done = (--ac->refs == 0); gpr_mu_unlock(&ac->mu); @@ -111,15 +111,14 @@ static void tc_on_alarm(void *acp, int success) { } } -static void on_writable(void *acp, int success) { +static void on_writable(void *acp, int success, grpc_call_list *call_list) { async_connect *ac = acp; int so_error = 0; socklen_t so_error_size; int err; int done; - grpc_endpoint *ep = NULL; - void (*cb)(void *arg, grpc_endpoint *tcp) = ac->cb; - void *cb_arg = ac->cb_arg; + grpc_endpoint **ep = ac->ep; + grpc_closure *closure = ac->closure; grpc_fd *fd; if (grpc_tcp_trace) { @@ -133,7 +132,7 @@ static void on_writable(void *acp, int success) { ac->fd = NULL; gpr_mu_unlock(&ac->mu); - grpc_alarm_cancel(&ac->alarm); + grpc_alarm_cancel(&ac->alarm, call_list); gpr_mu_lock(&ac->mu); if (success) { @@ -162,7 +161,7 @@ static void on_writable(void *acp, int success) { don't do that! */ gpr_log(GPR_ERROR, "kernel out of buffers"); gpr_mu_unlock(&ac->mu); - grpc_fd_notify_on_write(fd, &ac->write_closure); + grpc_fd_notify_on_write(fd, &ac->write_closure, call_list); return; } else { switch (so_error) { @@ -176,8 +175,8 @@ static void on_writable(void *acp, int success) { goto finish; } } else { - grpc_pollset_set_del_fd(ac->interested_parties, fd); - ep = grpc_tcp_create(fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, ac->addr_str); + grpc_pollset_set_del_fd(ac->interested_parties, fd, call_list); + *ep = grpc_tcp_create(fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, ac->addr_str); fd = NULL; goto finish; } @@ -190,8 +189,8 @@ static void on_writable(void *acp, int success) { finish: if (fd != NULL) { - grpc_pollset_set_del_fd(ac->interested_parties, fd); - grpc_fd_orphan(fd, NULL, "tcp_client_orphan"); + grpc_pollset_set_del_fd(ac->interested_parties, fd, call_list); + grpc_fd_orphan(fd, NULL, "tcp_client_orphan", call_list); fd = NULL; } done = (--ac->refs == 0); @@ -201,14 +200,14 @@ finish: gpr_free(ac->addr_str); gpr_free(ac); } - cb(cb_arg, ep); + grpc_call_list_add(call_list, closure, *ep != NULL); } -void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), - void *arg, grpc_pollset_set *interested_parties, +void grpc_tcp_client_connect(grpc_closure *closure, grpc_endpoint **ep, + grpc_pollset_set *interested_parties, grpc_workqueue *workqueue, const struct sockaddr *addr, size_t addr_len, - gpr_timespec deadline) { + gpr_timespec deadline, grpc_call_list *call_list) { int fd; grpc_dualstack_mode dsmode; int err; @@ -219,6 +218,8 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), char *name; char *addr_str; + *ep = NULL; + /* Use dualstack sockets where available. */ if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) { addr = (const struct sockaddr *)&addr6_v4mapped; @@ -236,7 +237,7 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), addr_len = sizeof(addr4_copy); } if (!prepare_socket(addr, fd)) { - cb(arg, NULL); + grpc_call_list_add(call_list, closure, 0); return; } @@ -248,25 +249,26 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), addr_str = grpc_sockaddr_to_uri(addr); gpr_asprintf(&name, "tcp-client:%s", addr_str); - fdobj = grpc_fd_create(fd, workqueue, name); + fdobj = grpc_fd_create(fd, name); if (err >= 0) { - cb(arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str)); + *ep = grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str); + grpc_call_list_add(call_list, closure, 1); goto done; } if (errno != EWOULDBLOCK && errno != EINPROGRESS) { gpr_log(GPR_ERROR, "connect error to '%s': %s", addr_str, strerror(errno)); - grpc_fd_orphan(fdobj, NULL, "tcp_client_connect_error"); - cb(arg, NULL); + grpc_fd_orphan(fdobj, NULL, "tcp_client_connect_error", call_list); + grpc_call_list_add(call_list, closure, 0); goto done; } - grpc_pollset_set_add_fd(interested_parties, fdobj); + grpc_pollset_set_add_fd(interested_parties, fdobj, call_list); ac = gpr_malloc(sizeof(async_connect)); - ac->cb = cb; - ac->cb_arg = arg; + ac->closure = closure; + ac->ep = ep; ac->fd = fdobj; ac->interested_parties = interested_parties; ac->addr_str = addr_str; @@ -284,8 +286,8 @@ void grpc_tcp_client_connect(void (*cb)(void *arg, grpc_endpoint *ep), gpr_mu_lock(&ac->mu); grpc_alarm_init(&ac->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - tc_on_alarm, ac, gpr_now(GPR_CLOCK_MONOTONIC)); - grpc_fd_notify_on_write(ac->fd, &ac->write_closure); + tc_on_alarm, ac, gpr_now(GPR_CLOCK_MONOTONIC), call_list); + grpc_fd_notify_on_write(ac->fd, &ac->write_closure, call_list); gpr_mu_unlock(&ac->mu); done: diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 374d2f3a40..89a85f26dc 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -94,30 +94,33 @@ typedef struct { char *peer_string; } grpc_tcp; -static void tcp_handle_read(void *arg /* grpc_tcp */, int success); -static void tcp_handle_write(void *arg /* grpc_tcp */, int success); +static void tcp_handle_read(void *arg /* grpc_tcp */, int success, + grpc_call_list *call_list); +static void tcp_handle_write(void *arg /* grpc_tcp */, int success, + grpc_call_list *call_list); -static void tcp_shutdown(grpc_endpoint *ep) { +static void tcp_shutdown(grpc_endpoint *ep, grpc_call_list *call_list) { grpc_tcp *tcp = (grpc_tcp *)ep; - grpc_fd_shutdown(tcp->em_fd); + grpc_fd_shutdown(tcp->em_fd, call_list); } -static void tcp_free(grpc_tcp *tcp) { - grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan"); +static void tcp_free(grpc_tcp *tcp, grpc_call_list *call_list) { + grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan", call_list); gpr_free(tcp->peer_string); gpr_free(tcp); } /*#define GRPC_TCP_REFCOUNT_DEBUG*/ #ifdef GRPC_TCP_REFCOUNT_DEBUG -#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__) +#define TCP_UNREF(tcp, reason, cl) \ + tcp_unref((tcp), (cl), (reason), __FILE__, __LINE__) #define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__) -static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file, - int line) { +static void tcp_unref(grpc_tcp *tcp, grpc_call_list *call_list, + const char *reason, const char *file, int line) { gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp, reason, tcp->refcount.count, tcp->refcount.count - 1); if (gpr_unref(&tcp->refcount)) { - tcp_free(tcp); + tcp_free(tcp, call_list); } } @@ -128,23 +131,24 @@ static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file, gpr_ref(&tcp->refcount); } #else -#define TCP_UNREF(tcp, reason) tcp_unref((tcp)) +#define TCP_UNREF(tcp, reason, cl) tcp_unref((tcp), (cl)) #define TCP_REF(tcp, reason) tcp_ref((tcp)) -static void tcp_unref(grpc_tcp *tcp) { +static void tcp_unref(grpc_tcp *tcp, grpc_call_list *call_list) { if (gpr_unref(&tcp->refcount)) { - tcp_free(tcp); + tcp_free(tcp, call_list); } } static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } #endif -static void tcp_destroy(grpc_endpoint *ep) { +static void tcp_destroy(grpc_endpoint *ep, grpc_call_list *call_list) { grpc_tcp *tcp = (grpc_tcp *)ep; - TCP_UNREF(tcp, "destroy"); + TCP_UNREF(tcp, "destroy", call_list); } -static void call_read_cb(grpc_tcp *tcp, int success) { +static void call_read_cb(grpc_tcp *tcp, int success, + grpc_call_list *call_list) { grpc_closure *cb = tcp->read_cb; if (grpc_tcp_trace) { @@ -160,11 +164,11 @@ static void call_read_cb(grpc_tcp *tcp, int success) { tcp->read_cb = NULL; tcp->incoming_buffer = NULL; - cb->cb(cb->cb_arg, success); + cb->cb(cb->cb_arg, success, call_list); } #define MAX_READ_IOVEC 4 -static void tcp_continue_read(grpc_tcp *tcp) { +static void tcp_continue_read(grpc_tcp *tcp, grpc_call_list *call_list) { struct msghdr msg; struct iovec iov[MAX_READ_IOVEC]; ssize_t read_bytes; @@ -206,18 +210,18 @@ static void tcp_continue_read(grpc_tcp *tcp) { tcp->iov_size /= 2; } /* We've consumed the edge, request a new one */ - grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure); + grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure, call_list); } else { /* TODO(klempner): Log interesting errors */ gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); - call_read_cb(tcp, 0); - TCP_UNREF(tcp, "read"); + call_read_cb(tcp, 0, call_list); + TCP_UNREF(tcp, "read", call_list); } } else if (read_bytes == 0) { /* 0 read size ==> end of stream */ gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); - call_read_cb(tcp, 0); - TCP_UNREF(tcp, "read"); + call_read_cb(tcp, 0, call_list); + TCP_UNREF(tcp, "read", call_list); } else { GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length); if ((size_t)read_bytes < tcp->incoming_buffer->length) { @@ -228,29 +232,29 @@ static void tcp_continue_read(grpc_tcp *tcp) { ++tcp->iov_size; } GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length); - call_read_cb(tcp, 1); - TCP_UNREF(tcp, "read"); + call_read_cb(tcp, 1, call_list); + TCP_UNREF(tcp, "read", call_list); } GRPC_TIMER_END(GRPC_PTAG_HANDLE_READ, 0); } -static void tcp_handle_read(void *arg /* grpc_tcp */, int success) { +static void tcp_handle_read(void *arg /* grpc_tcp */, int success, + grpc_call_list *call_list) { grpc_tcp *tcp = (grpc_tcp *)arg; GPR_ASSERT(!tcp->finished_edge); if (!success) { gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); - call_read_cb(tcp, 0); - TCP_UNREF(tcp, "read"); + call_read_cb(tcp, 0, call_list); + TCP_UNREF(tcp, "read", call_list); } else { - tcp_continue_read(tcp); + tcp_continue_read(tcp, call_list); } } -static grpc_endpoint_op_status tcp_read(grpc_endpoint *ep, - gpr_slice_buffer *incoming_buffer, - grpc_closure *cb) { +static void tcp_read(grpc_endpoint *ep, gpr_slice_buffer *incoming_buffer, + grpc_closure *cb, grpc_call_list *call_list) { grpc_tcp *tcp = (grpc_tcp *)ep; GPR_ASSERT(tcp->read_cb == NULL); tcp->read_cb = cb; @@ -259,16 +263,16 @@ static grpc_endpoint_op_status tcp_read(grpc_endpoint *ep, TCP_REF(tcp, "read"); if (tcp->finished_edge) { tcp->finished_edge = 0; - grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure); + grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure, call_list); } else { - grpc_workqueue_push(tcp->em_fd->workqueue, &tcp->read_closure, 1); + grpc_call_list_add(call_list, &tcp->read_closure, 1); } - /* TODO(ctiller): immediate return */ - return GRPC_ENDPOINT_PENDING; } +typedef enum { FLUSH_DONE, FLUSH_PENDING, FLUSH_ERROR } flush_result; + #define MAX_WRITE_IOVEC 16 -static grpc_endpoint_op_status tcp_flush(grpc_tcp *tcp) { +static flush_result tcp_flush(grpc_tcp *tcp) { struct msghdr msg; struct iovec iov[MAX_WRITE_IOVEC]; msg_iovlen_type iov_size; @@ -318,10 +322,10 @@ static grpc_endpoint_op_status tcp_flush(grpc_tcp *tcp) { if (errno == EAGAIN) { tcp->outgoing_slice_idx = unwind_slice_idx; tcp->outgoing_byte_idx = unwind_byte_idx; - return GRPC_ENDPOINT_PENDING; + return FLUSH_PENDING; } else { /* TODO(klempner): Log some of these */ - return GRPC_ENDPOINT_ERROR; + return FLUSH_ERROR; } } @@ -342,42 +346,42 @@ static grpc_endpoint_op_status tcp_flush(grpc_tcp *tcp) { } if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) { - return GRPC_ENDPOINT_DONE; + return FLUSH_DONE; } }; } -static void tcp_handle_write(void *arg /* grpc_tcp */, int success) { +static void tcp_handle_write(void *arg /* grpc_tcp */, int success, + grpc_call_list *call_list) { grpc_tcp *tcp = (grpc_tcp *)arg; - grpc_endpoint_op_status status; + flush_result status; grpc_closure *cb; if (!success) { cb = tcp->write_cb; tcp->write_cb = NULL; - cb->cb(cb->cb_arg, 0); - TCP_UNREF(tcp, "write"); + cb->cb(cb->cb_arg, 0, call_list); + TCP_UNREF(tcp, "write", call_list); return; } GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_CB_WRITE, 0); status = tcp_flush(tcp); - if (status == GRPC_ENDPOINT_PENDING) { - grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure); + if (status == FLUSH_PENDING) { + grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure, call_list); } else { cb = tcp->write_cb; tcp->write_cb = NULL; - cb->cb(cb->cb_arg, status == GRPC_ENDPOINT_DONE); - TCP_UNREF(tcp, "write"); + cb->cb(cb->cb_arg, status == FLUSH_DONE, call_list); + TCP_UNREF(tcp, "write", call_list); } GRPC_TIMER_END(GRPC_PTAG_TCP_CB_WRITE, 0); } -static grpc_endpoint_op_status tcp_write(grpc_endpoint *ep, - gpr_slice_buffer *buf, - grpc_closure *cb) { +static void tcp_write(grpc_endpoint *ep, gpr_slice_buffer *buf, + grpc_closure *cb, grpc_call_list *call_list) { grpc_tcp *tcp = (grpc_tcp *)ep; - grpc_endpoint_op_status status; + flush_result status; if (grpc_tcp_trace) { size_t i; @@ -395,32 +399,36 @@ static grpc_endpoint_op_status tcp_write(grpc_endpoint *ep, if (buf->length == 0) { GRPC_TIMER_END(GRPC_PTAG_TCP_WRITE, 0); - return GRPC_ENDPOINT_DONE; + grpc_call_list_add(call_list, cb, 1); + return; } tcp->outgoing_buffer = buf; tcp->outgoing_slice_idx = 0; tcp->outgoing_byte_idx = 0; status = tcp_flush(tcp); - if (status == GRPC_ENDPOINT_PENDING) { + if (status == FLUSH_PENDING) { TCP_REF(tcp, "write"); tcp->write_cb = cb; - grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure); + grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure, call_list); + } else { + grpc_call_list_add(call_list, cb, status == FLUSH_DONE); } GRPC_TIMER_END(GRPC_PTAG_TCP_WRITE, 0); - return status; } -static void tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset) { +static void tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset, + grpc_call_list *call_list) { grpc_tcp *tcp = (grpc_tcp *)ep; - grpc_pollset_add_fd(pollset, tcp->em_fd); + grpc_pollset_add_fd(pollset, tcp->em_fd, call_list); } static void tcp_add_to_pollset_set(grpc_endpoint *ep, - grpc_pollset_set *pollset_set) { + grpc_pollset_set *pollset_set, + grpc_call_list *call_list) { grpc_tcp *tcp = (grpc_tcp *)ep; - grpc_pollset_set_add_fd(pollset_set, tcp->em_fd); + grpc_pollset_set_add_fd(pollset_set, tcp->em_fd, call_list); } static char *tcp_get_peer(grpc_endpoint *ep) { diff --git a/src/core/iomgr/tcp_server.h b/src/core/iomgr/tcp_server.h index 5165f5c5ca..19e58bc294 100644 --- a/src/core/iomgr/tcp_server.h +++ b/src/core/iomgr/tcp_server.h @@ -40,7 +40,8 @@ typedef struct grpc_tcp_server grpc_tcp_server; /* New server callback: tcp is the newly connected tcp connection */ -typedef void (*grpc_tcp_server_cb)(void *arg, grpc_endpoint *ep); +typedef void (*grpc_tcp_server_cb)(void *arg, grpc_endpoint *ep, + grpc_call_list *call_list); /* Create a server, initially not bound to any ports */ grpc_tcp_server *grpc_tcp_server_create(void); @@ -48,7 +49,7 @@ grpc_tcp_server *grpc_tcp_server_create(void); /* Start listening to bound ports */ void grpc_tcp_server_start(grpc_tcp_server *server, grpc_pollset **pollsets, size_t pollset_count, grpc_tcp_server_cb cb, - void *cb_arg); + void *cb_arg, grpc_call_list *call_list); /* Add a port to the server, returning port number on success, or negative on failure. @@ -71,8 +72,7 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, up when grpc_tcp_server_destroy is called. */ int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned index); -void grpc_tcp_server_destroy(grpc_tcp_server *server, - void (*shutdown_done)(void *shutdown_done_arg), - void *shutdown_done_arg); +void grpc_tcp_server_destroy(grpc_tcp_server *server, grpc_closure *closure, + grpc_call_list *call_list); #endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_SERVER_H */ diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 213b2e1113..0c5e0053dd 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -117,16 +117,12 @@ struct grpc_tcp_server { size_t port_capacity; /* shutdown callback */ - void (*shutdown_complete)(void *); - void *shutdown_complete_arg; + grpc_closure *shutdown_complete; /* all pollsets interested in new connections */ grpc_pollset **pollsets; /* number of pollsets in the pollsets array */ size_t pollset_count; - - /** workqueue for interally created async work */ - grpc_workqueue *workqueue; }; grpc_tcp_server *grpc_tcp_server_create(void) { @@ -140,40 +136,37 @@ grpc_tcp_server *grpc_tcp_server_create(void) { s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); s->nports = 0; s->port_capacity = INIT_PORT_CAP; - s->workqueue = grpc_workqueue_create(); return s; } -static void finish_shutdown(grpc_tcp_server *s) { - s->shutdown_complete(s->shutdown_complete_arg); - s->shutdown_complete = NULL; +static void finish_shutdown(grpc_tcp_server *s, grpc_call_list *call_list) { + grpc_call_list_add(call_list, s->shutdown_complete, 1); gpr_mu_destroy(&s->mu); gpr_free(s->ports); - GRPC_WORKQUEUE_UNREF(s->workqueue, "destroy"); gpr_free(s); } -static void destroyed_port(void *server, int success) { +static void destroyed_port(void *server, int success, + grpc_call_list *call_list) { grpc_tcp_server *s = server; gpr_mu_lock(&s->mu); s->destroyed_ports++; if (s->destroyed_ports == s->nports) { gpr_mu_unlock(&s->mu); - finish_shutdown(s); + finish_shutdown(s, call_list); } else { GPR_ASSERT(s->destroyed_ports < s->nports); gpr_mu_unlock(&s->mu); } } -static void dont_care_about_shutdown_completion(void *ignored) {} - /* called when all listening endpoints have been shutdown, so no further events will be received on them - at this point it's safe to destroy things */ -static void deactivated_all_ports(grpc_tcp_server *s) { +static void deactivated_all_ports(grpc_tcp_server *s, + grpc_call_list *call_list) { size_t i; /* delete ALL the things */ @@ -192,38 +185,35 @@ static void deactivated_all_ports(grpc_tcp_server *s) { } sp->destroyed_closure.cb = destroyed_port; sp->destroyed_closure.cb_arg = s; - grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, "tcp_listener_shutdown"); + grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, "tcp_listener_shutdown", + call_list); } gpr_mu_unlock(&s->mu); } else { gpr_mu_unlock(&s->mu); - finish_shutdown(s); + finish_shutdown(s, call_list); } } -void grpc_tcp_server_destroy( - grpc_tcp_server *s, void (*shutdown_complete)(void *shutdown_complete_arg), - void *shutdown_complete_arg) { +void grpc_tcp_server_destroy(grpc_tcp_server *s, grpc_closure *closure, + grpc_call_list *call_list) { size_t i; gpr_mu_lock(&s->mu); GPR_ASSERT(!s->shutdown); s->shutdown = 1; - s->shutdown_complete = shutdown_complete - ? shutdown_complete - : dont_care_about_shutdown_completion; - s->shutdown_complete_arg = shutdown_complete_arg; + s->shutdown_complete = closure; /* shutdown all fd's */ if (s->active_ports) { for (i = 0; i < s->nports; i++) { - grpc_fd_shutdown(s->ports[i].emfd); + grpc_fd_shutdown(s->ports[i].emfd, call_list); } gpr_mu_unlock(&s->mu); } else { gpr_mu_unlock(&s->mu); - deactivated_all_ports(s); + deactivated_all_ports(s, call_list); } } @@ -308,7 +298,7 @@ error: } /* event manager callback when reads are ready */ -static void on_read(void *arg, int success) { +static void on_read(void *arg, int success, grpc_call_list *call_list) { server_port *sp = arg; grpc_fd *fdobj; size_t i; @@ -331,7 +321,7 @@ static void on_read(void *arg, int success) { case EINTR: continue; case EAGAIN: - grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); + grpc_fd_notify_on_read(sp->emfd, &sp->read_closure, call_list); return; default: gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno)); @@ -348,16 +338,17 @@ static void on_read(void *arg, int success) { gpr_log(GPR_DEBUG, "SERVER_CONNECT: incoming connection: %s", addr_str); } - fdobj = grpc_fd_create(fd, sp->server->workqueue, name); + fdobj = grpc_fd_create(fd, name); /* TODO(ctiller): revise this when we have server-side sharding of channels -- we certainly should not be automatically adding every incoming channel to every pollset owned by the server */ for (i = 0; i < sp->server->pollset_count; i++) { - grpc_pollset_add_fd(sp->server->pollsets[i], fdobj); + grpc_pollset_add_fd(sp->server->pollsets[i], fdobj, call_list); } sp->server->cb( sp->server->cb_arg, - grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str)); + grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str), + call_list); gpr_free(name); gpr_free(addr_str); @@ -369,7 +360,7 @@ error: gpr_mu_lock(&sp->server->mu); if (0 == --sp->server->active_ports) { gpr_mu_unlock(&sp->server->mu); - deactivated_all_ports(sp->server); + deactivated_all_ports(sp->server, call_list); } else { gpr_mu_unlock(&sp->server->mu); } @@ -396,7 +387,7 @@ static int add_socket_to_server(grpc_tcp_server *s, int fd, sp = &s->ports[s->nports++]; sp->server = s; sp->fd = fd; - sp->emfd = grpc_fd_create(fd, s->workqueue, name); + sp->emfd = grpc_fd_create(fd, name); memcpy(sp->addr.untyped, addr, addr_len); sp->addr_len = addr_len; GPR_ASSERT(sp->emfd); @@ -495,7 +486,7 @@ int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned index) { void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollsets, size_t pollset_count, grpc_tcp_server_cb cb, - void *cb_arg) { + void *cb_arg, grpc_call_list *call_list) { size_t i, j; GPR_ASSERT(cb); gpr_mu_lock(&s->mu); @@ -507,11 +498,12 @@ void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollsets, s->pollset_count = pollset_count; for (i = 0; i < s->nports; i++) { for (j = 0; j < pollset_count; j++) { - grpc_pollset_add_fd(pollsets[j], s->ports[i].emfd); + grpc_pollset_add_fd(pollsets[j], s->ports[i].emfd, call_list); } s->ports[i].read_closure.cb = on_read; s->ports[i].read_closure.cb_arg = &s->ports[i]; - grpc_fd_notify_on_read(s->ports[i].emfd, &s->ports[i].read_closure); + grpc_fd_notify_on_read(s->ports[i].emfd, &s->ports[i].read_closure, + call_list); s->active_ports++; } gpr_mu_unlock(&s->mu); diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c index 1a1d812050..e1c2ae95fd 100644 --- a/src/core/iomgr/udp_server.c +++ b/src/core/iomgr/udp_server.c @@ -111,15 +111,12 @@ struct grpc_udp_server { size_t port_capacity; /* shutdown callback */ - void (*shutdown_complete)(void *); - void *shutdown_complete_arg; + grpc_closure *shutdown_complete; /* all pollsets interested in new connections */ grpc_pollset **pollsets; /* number of pollsets in the pollsets array */ size_t pollset_count; - - grpc_workqueue *workqueue; }; grpc_udp_server *grpc_udp_server_create(void) { @@ -132,40 +129,38 @@ grpc_udp_server *grpc_udp_server_create(void) { s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP); s->nports = 0; s->port_capacity = INIT_PORT_CAP; - s->workqueue = grpc_workqueue_create(); return s; } -static void finish_shutdown(grpc_udp_server *s) { - s->shutdown_complete(s->shutdown_complete_arg); +static void finish_shutdown(grpc_udp_server *s, grpc_call_list *call_list) { + grpc_call_list_add(call_list, s->shutdown_complete, 1); gpr_mu_destroy(&s->mu); gpr_cv_destroy(&s->cv); gpr_free(s->ports); - GRPC_WORKQUEUE_UNREF(s->workqueue, "workqueue"); gpr_free(s); } -static void destroyed_port(void *server, int success) { +static void destroyed_port(void *server, int success, + grpc_call_list *call_list) { grpc_udp_server *s = server; gpr_mu_lock(&s->mu); s->destroyed_ports++; if (s->destroyed_ports == s->nports) { gpr_mu_unlock(&s->mu); - finish_shutdown(s); + finish_shutdown(s, call_list); } else { gpr_mu_unlock(&s->mu); } } -static void dont_care_about_shutdown_completion(void *ignored) {} - /* called when all listening endpoints have been shutdown, so no further events will be received on them - at this point it's safe to destroy things */ -static void deactivated_all_ports(grpc_udp_server *s) { +static void deactivated_all_ports(grpc_udp_server *s, + grpc_call_list *call_list) { size_t i; /* delete ALL the things */ @@ -184,38 +179,35 @@ static void deactivated_all_ports(grpc_udp_server *s) { } sp->destroyed_closure.cb = destroyed_port; sp->destroyed_closure.cb_arg = s; - grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, "udp_listener_shutdown"); + grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, "udp_listener_shutdown", + call_list); } gpr_mu_unlock(&s->mu); } else { gpr_mu_unlock(&s->mu); - finish_shutdown(s); + finish_shutdown(s, call_list); } } -void grpc_udp_server_destroy( - grpc_udp_server *s, void (*shutdown_complete)(void *shutdown_complete_arg), - void *shutdown_complete_arg) { +void grpc_udp_server_destroy(grpc_udp_server *s, grpc_closure *on_done, + grpc_call_list *call_list) { size_t i; gpr_mu_lock(&s->mu); GPR_ASSERT(!s->shutdown); s->shutdown = 1; - s->shutdown_complete = shutdown_complete - ? shutdown_complete - : dont_care_about_shutdown_completion; - s->shutdown_complete_arg = shutdown_complete_arg; + s->shutdown_complete = on_done; /* shutdown all fd's */ if (s->active_ports) { for (i = 0; i < s->nports; i++) { - grpc_fd_shutdown(s->ports[i].emfd); + grpc_fd_shutdown(s->ports[i].emfd, call_list); } gpr_mu_unlock(&s->mu); } else { gpr_mu_unlock(&s->mu); - deactivated_all_ports(s); + deactivated_all_ports(s, call_list); } } @@ -270,14 +262,14 @@ error: } /* event manager callback when reads are ready */ -static void on_read(void *arg, int success) { +static void on_read(void *arg, int success, grpc_call_list *call_list) { server_port *sp = arg; if (success == 0) { gpr_mu_lock(&sp->server->mu); if (0 == --sp->server->active_ports) { gpr_mu_unlock(&sp->server->mu); - deactivated_all_ports(sp->server); + deactivated_all_ports(sp->server, call_list); } else { gpr_mu_unlock(&sp->server->mu); } @@ -289,7 +281,7 @@ static void on_read(void *arg, int success) { sp->read_cb(sp->fd); /* Re-arm the notification event so we get another chance to read. */ - grpc_fd_notify_on_read(sp->emfd, &sp->read_closure); + grpc_fd_notify_on_read(sp->emfd, &sp->read_closure, call_list); } static int add_socket_to_server(grpc_udp_server *s, int fd, @@ -313,7 +305,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd, sp = &s->ports[s->nports++]; sp->server = s; sp->fd = fd; - sp->emfd = grpc_fd_create(fd, s->workqueue, name); + sp->emfd = grpc_fd_create(fd, name); memcpy(sp->addr.untyped, addr, addr_len); sp->addr_len = addr_len; sp->read_cb = read_cb; @@ -410,18 +402,19 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index) { } void grpc_udp_server_start(grpc_udp_server *s, grpc_pollset **pollsets, - size_t pollset_count) { + size_t pollset_count, grpc_call_list *call_list) { size_t i, j; gpr_mu_lock(&s->mu); GPR_ASSERT(s->active_ports == 0); s->pollsets = pollsets; for (i = 0; i < s->nports; i++) { for (j = 0; j < pollset_count; j++) { - grpc_pollset_add_fd(pollsets[j], s->ports[i].emfd); + grpc_pollset_add_fd(pollsets[j], s->ports[i].emfd, call_list); } s->ports[i].read_closure.cb = on_read; s->ports[i].read_closure.cb_arg = &s->ports[i]; - grpc_fd_notify_on_read(s->ports[i].emfd, &s->ports[i].read_closure); + grpc_fd_notify_on_read(s->ports[i].emfd, &s->ports[i].read_closure, + call_list); s->active_ports++; } gpr_mu_unlock(&s->mu); diff --git a/src/core/iomgr/udp_server.h b/src/core/iomgr/udp_server.h index c930e81cbc..fa4d2147b4 100644 --- a/src/core/iomgr/udp_server.h +++ b/src/core/iomgr/udp_server.h @@ -47,7 +47,7 @@ grpc_udp_server *grpc_udp_server_create(void); /* Start listening to bound ports */ void grpc_udp_server_start(grpc_udp_server *udp_server, grpc_pollset **pollsets, - size_t pollset_count); + size_t pollset_count, grpc_call_list *call_list); int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index); @@ -64,9 +64,8 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index); int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, size_t addr_len, grpc_udp_server_read_cb read_cb); -void grpc_udp_server_destroy(grpc_udp_server *server, - void (*shutdown_done)(void *shutdown_done_arg), - void *shutdown_done_arg); +void grpc_udp_server_destroy(grpc_udp_server *server, grpc_closure *on_done, + grpc_call_list *call_list); /* Write the contents of buffer to the underlying UDP socket. */ /* diff --git a/src/core/iomgr/workqueue.h b/src/core/iomgr/workqueue.h index 6f09399b55..b9d2a87dca 100644 --- a/src/core/iomgr/workqueue.h +++ b/src/core/iomgr/workqueue.h @@ -50,25 +50,25 @@ struct grpc_workqueue; typedef struct grpc_workqueue grpc_workqueue; /** Create a work queue */ -grpc_workqueue *grpc_workqueue_create(void); +grpc_workqueue *grpc_workqueue_create(grpc_call_list *call_list); -void grpc_workqueue_flush(grpc_workqueue *workqueue); +void grpc_workqueue_flush(grpc_workqueue *workqueue, grpc_call_list *call_list); #define GRPC_WORKQUEUE_REFCOUNT_DEBUG #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG #define GRPC_WORKQUEUE_REF(p, r) \ grpc_workqueue_ref((p), __FILE__, __LINE__, (r)) -#define GRPC_WORKQUEUE_UNREF(p, r) \ - grpc_workqueue_unref((p), __FILE__, __LINE__, (r)) +#define GRPC_WORKQUEUE_UNREF(p, r, cl) \ + grpc_workqueue_unref((p), (cl), __FILE__, __LINE__, (r)) void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line, const char *reason); -void grpc_workqueue_unref(grpc_workqueue *workqueue, const char *file, int line, - const char *reason); +void grpc_workqueue_unref(grpc_workqueue *workqueue, grpc_call_list *call_list, + const char *file, int line, const char *reason); #else #define GRPC_WORKQUEUE_REF(p, r) grpc_workqueue_ref((p)) -#define GRPC_WORKQUEUE_UNREF(p, r) grpc_workqueue_unref((p)) +#define GRPC_WORKQUEUE_UNREF(p, r, cl) grpc_workqueue_unref((p), (cl)) void grpc_workqueue_ref(grpc_workqueue *workqueue); -void grpc_workqueue_unref(grpc_workqueue *workqueue); +void grpc_workqueue_unref(grpc_workqueue *workqueue, grpc_call_list *call_list); #endif /** Bind this workqueue to a pollset */ diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c index 85b541f4d2..83249c583c 100644 --- a/src/core/iomgr/workqueue_posix.c +++ b/src/core/iomgr/workqueue_posix.c @@ -45,9 +45,9 @@ #include "src/core/iomgr/fd_posix.h" -static void on_readable(void *arg, int success); +static void on_readable(void *arg, int success, grpc_call_list *call_list); -grpc_workqueue *grpc_workqueue_create(void) { +grpc_workqueue *grpc_workqueue_create(grpc_call_list *call_list) { char name[32]; grpc_workqueue *workqueue = gpr_malloc(sizeof(grpc_workqueue)); gpr_ref_init(&workqueue->refs, 1); @@ -55,17 +55,18 @@ grpc_workqueue *grpc_workqueue_create(void) { workqueue->call_list.head = workqueue->call_list.tail = NULL; grpc_wakeup_fd_init(&workqueue->wakeup_fd); sprintf(name, "workqueue:%p", (void *)workqueue); - workqueue->wakeup_read_fd = NULL; /* inspected during grpc_fd_create below */ - workqueue->wakeup_read_fd = grpc_fd_create( - GRPC_WAKEUP_FD_GET_READ_FD(&workqueue->wakeup_fd), workqueue, name); + workqueue->wakeup_read_fd = + grpc_fd_create(GRPC_WAKEUP_FD_GET_READ_FD(&workqueue->wakeup_fd), name); grpc_closure_init(&workqueue->read_closure, on_readable, workqueue); - grpc_fd_notify_on_read(workqueue->wakeup_read_fd, &workqueue->read_closure); + grpc_fd_notify_on_read(workqueue->wakeup_read_fd, &workqueue->read_closure, + call_list); return workqueue; } -static void workqueue_destroy(grpc_workqueue *workqueue) { +static void workqueue_destroy(grpc_workqueue *workqueue, + grpc_call_list *call_list) { GPR_ASSERT(grpc_call_list_empty(workqueue->call_list)); - grpc_fd_shutdown(workqueue->wakeup_read_fd); + grpc_fd_shutdown(workqueue->wakeup_read_fd, call_list); } #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG @@ -81,33 +82,34 @@ void grpc_workqueue_ref(grpc_workqueue *workqueue) { } #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG -void grpc_workqueue_unref(grpc_workqueue *workqueue, const char *file, int line, - const char *reason) { +void grpc_workqueue_unref(grpc_workqueue *workqueue, grpc_call_list *call_list, + const char *file, int line, const char *reason) { gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "WORKQUEUE:%p unref %d -> %d %s", workqueue, (int)workqueue->refs.count, (int)workqueue->refs.count - 1, reason); #else -void grpc_workqueue_unref(grpc_workqueue *workqueue) { +void grpc_workqueue_unref(grpc_workqueue *workqueue, + grpc_call_list *call_list) { #endif if (gpr_unref(&workqueue->refs)) { - workqueue_destroy(workqueue); + workqueue_destroy(workqueue, call_list); } } void grpc_workqueue_add_to_pollset(grpc_workqueue *workqueue, - grpc_pollset *pollset) { - grpc_pollset_add_fd(pollset, workqueue->wakeup_read_fd); + grpc_pollset *pollset, + grpc_call_list *call_list) { + grpc_pollset_add_fd(pollset, workqueue->wakeup_read_fd, call_list); } -void grpc_workqueue_flush(grpc_workqueue *workqueue) { - grpc_call_list todo = GRPC_CALL_LIST_INIT; +void grpc_workqueue_flush(grpc_workqueue *workqueue, + grpc_call_list *call_list) { gpr_mu_lock(&workqueue->mu); - GPR_SWAP(grpc_call_list, todo, workqueue->call_list); + grpc_call_list_move(&workqueue->call_list, call_list); gpr_mu_unlock(&workqueue->mu); - grpc_call_list_run(todo); } -static void on_readable(void *arg, int success) { +static void on_readable(void *arg, int success, grpc_call_list *call_list) { grpc_workqueue *workqueue = arg; if (!success) { @@ -115,16 +117,15 @@ static void on_readable(void *arg, int success) { /* HACK: let wakeup_fd code know that we stole the fd */ workqueue->wakeup_fd.read_fd = 0; grpc_wakeup_fd_destroy(&workqueue->wakeup_fd); - grpc_fd_orphan(workqueue->wakeup_read_fd, NULL, "destroy"); + grpc_fd_orphan(workqueue->wakeup_read_fd, NULL, "destroy", call_list); gpr_free(workqueue); } else { - grpc_call_list todo = GRPC_CALL_LIST_INIT; gpr_mu_lock(&workqueue->mu); - GPR_SWAP(grpc_call_list, todo, workqueue->call_list); + grpc_call_list_move(&workqueue->call_list, call_list); grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd); gpr_mu_unlock(&workqueue->mu); - grpc_fd_notify_on_read(workqueue->wakeup_read_fd, &workqueue->read_closure); - grpc_call_list_run(todo); + grpc_fd_notify_on_read(workqueue->wakeup_read_fd, &workqueue->read_closure, + call_list); } } |