diff options
author | Craig Tiller <ctiller@google.com> | 2015-09-22 09:30:00 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-09-22 09:30:00 -0700 |
commit | d9ccbbf6b98721f3e0256436e8a31fb378324d34 (patch) | |
tree | 70044a256dc950acf8b5fe203e417b00da6a1114 /src/core/iomgr | |
parent | 10ee2747a92a20c0bbe8cf3e2e759a121c6cb076 (diff) |
Rename call_list to closure_list
Diffstat (limited to 'src/core/iomgr')
29 files changed, 335 insertions, 316 deletions
diff --git a/src/core/iomgr/alarm.c b/src/core/iomgr/alarm.c index da265d4e50..d26dc57b09 100644 --- a/src/core/iomgr/alarm.c +++ b/src/core/iomgr/alarm.c @@ -73,7 +73,8 @@ static shard_type g_shards[NUM_SHARDS]; static shard_type *g_shard_queue[NUM_SHARDS]; static int run_some_expired_alarms(gpr_timespec now, gpr_timespec *next, - int success, grpc_call_list *call_list); + int success, + grpc_closure_list *closure_list); static gpr_timespec compute_min_deadline(shard_type *shard) { return grpc_alarm_heap_is_empty(&shard->heap) @@ -102,9 +103,9 @@ void grpc_alarm_list_init(gpr_timespec now) { } } -void grpc_alarm_list_shutdown(grpc_call_list *call_list) { +void grpc_alarm_list_shutdown(grpc_closure_list *closure_list) { int i; - run_some_expired_alarms(gpr_inf_future(g_clock_type), NULL, 0, call_list); + run_some_expired_alarms(gpr_inf_future(g_clock_type), NULL, 0, closure_list); for (i = 0; i < NUM_SHARDS; i++) { shard_type *shard = &g_shards[i]; gpr_mu_destroy(&shard->mu); @@ -172,7 +173,7 @@ static void note_deadline_change(shard_type *shard) { void grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline, grpc_iomgr_cb_func alarm_cb, void *alarm_cb_arg, - gpr_timespec now, grpc_call_list *call_list) { + gpr_timespec now, grpc_closure_list *closure_list) { int is_first_alarm = 0; shard_type *shard = &g_shards[shard_idx(alarm)]; GPR_ASSERT(deadline.clock_type == g_clock_type); @@ -220,11 +221,11 @@ void grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline, } } -void grpc_alarm_cancel(grpc_alarm *alarm, grpc_call_list *call_list) { +void grpc_alarm_cancel(grpc_alarm *alarm, grpc_closure_list *closure_list) { shard_type *shard = &g_shards[shard_idx(alarm)]; gpr_mu_lock(&shard->mu); if (!alarm->triggered) { - grpc_call_list_add(call_list, &alarm->closure, 0); + grpc_closure_list_add(closure_list, &alarm->closure, 0); alarm->triggered = 1; if (alarm->heap_index == INVALID_HEAP_INDEX) { list_remove(alarm); @@ -285,12 +286,12 @@ static grpc_alarm *pop_one(shard_type *shard, gpr_timespec now) { /* REQUIRES: shard->mu unlocked */ static size_t pop_alarms(shard_type *shard, gpr_timespec now, gpr_timespec *new_min_deadline, int success, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { size_t n = 0; grpc_alarm *alarm; gpr_mu_lock(&shard->mu); while ((alarm = pop_one(shard, now))) { - grpc_call_list_add(call_list, &alarm->closure, success); + grpc_closure_list_add(closure_list, &alarm->closure, success); n++; } *new_min_deadline = compute_min_deadline(shard); @@ -299,7 +300,8 @@ static size_t pop_alarms(shard_type *shard, gpr_timespec now, } static int run_some_expired_alarms(gpr_timespec now, gpr_timespec *next, - int success, grpc_call_list *call_list) { + int success, + grpc_closure_list *closure_list) { size_t n = 0; /* TODO(ctiller): verify that there are any alarms (atomically) here */ @@ -314,7 +316,7 @@ static int run_some_expired_alarms(gpr_timespec now, gpr_timespec *next, shard. This may violate perfect alarm deadline ordering, but that shouldn't be a big deal because we don't make ordering guarantees. */ n += pop_alarms(g_shard_queue[0], now, &new_min_deadline, success, - call_list); + closure_list); /* An grpc_alarm_init() on the shard could intervene here, adding a new alarm that is earlier than new_min_deadline. However, @@ -337,11 +339,11 @@ static int run_some_expired_alarms(gpr_timespec now, gpr_timespec *next, } int grpc_alarm_check(gpr_timespec now, gpr_timespec *next, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { GPR_ASSERT(now.clock_type == g_clock_type); return run_some_expired_alarms( now, next, gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0, - call_list); + closure_list); } gpr_timespec grpc_alarm_list_next_timeout(void) { diff --git a/src/core/iomgr/alarm.h b/src/core/iomgr/alarm.h index 05ca7c27bb..09bfa88756 100644 --- a/src/core/iomgr/alarm.h +++ b/src/core/iomgr/alarm.h @@ -55,7 +55,7 @@ typedef struct grpc_alarm { information about when to free up any user-level state. */ void grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline, grpc_iomgr_cb_func alarm_cb, void *alarm_cb_arg, - gpr_timespec now, grpc_call_list *call_list); + gpr_timespec now, grpc_closure_list *closure_list); /* Note that there is no alarm destroy function. This is because the alarm is a one-time occurrence with a guarantee that the callback will @@ -83,6 +83,6 @@ void grpc_alarm_init(grpc_alarm *alarm, gpr_timespec deadline, matches this aim. Requires: cancel() must happen after add() on a given alarm */ -void grpc_alarm_cancel(grpc_alarm *alarm, grpc_call_list *call_list); +void grpc_alarm_cancel(grpc_alarm *alarm, grpc_closure_list *closure_list); #endif /* GRPC_INTERNAL_CORE_IOMGR_ALARM_H */ diff --git a/src/core/iomgr/alarm_internal.h b/src/core/iomgr/alarm_internal.h index a2e9946f4f..aebc789ec6 100644 --- a/src/core/iomgr/alarm_internal.h +++ b/src/core/iomgr/alarm_internal.h @@ -49,9 +49,9 @@ at any time slice. */ int grpc_alarm_check(gpr_timespec now, gpr_timespec *next, - grpc_call_list *call_list); + grpc_closure_list *closure_list); void grpc_alarm_list_init(gpr_timespec now); -void grpc_alarm_list_shutdown(grpc_call_list *call_list); +void grpc_alarm_list_shutdown(grpc_closure_list *closure_list); gpr_timespec grpc_alarm_list_next_timeout(void); diff --git a/src/core/iomgr/endpoint.c b/src/core/iomgr/endpoint.c index 769f155a63..db2a738d09 100644 --- a/src/core/iomgr/endpoint.c +++ b/src/core/iomgr/endpoint.c @@ -34,32 +34,33 @@ #include "src/core/iomgr/endpoint.h" void grpc_endpoint_read(grpc_endpoint *ep, gpr_slice_buffer *slices, - grpc_closure *cb, grpc_call_list *call_list) { - ep->vtable->read(ep, slices, cb, call_list); + grpc_closure *cb, grpc_closure_list *closure_list) { + ep->vtable->read(ep, slices, cb, closure_list); } void grpc_endpoint_write(grpc_endpoint *ep, gpr_slice_buffer *slices, - grpc_closure *cb, grpc_call_list *call_list) { - ep->vtable->write(ep, slices, cb, call_list); + grpc_closure *cb, grpc_closure_list *closure_list) { + ep->vtable->write(ep, slices, cb, closure_list); } void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset, - grpc_call_list *call_list) { - ep->vtable->add_to_pollset(ep, pollset, call_list); + grpc_closure_list *closure_list) { + ep->vtable->add_to_pollset(ep, pollset, closure_list); } void grpc_endpoint_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pollset_set, - grpc_call_list *call_list) { - ep->vtable->add_to_pollset_set(ep, pollset_set, call_list); + grpc_closure_list *closure_list) { + ep->vtable->add_to_pollset_set(ep, pollset_set, closure_list); } -void grpc_endpoint_shutdown(grpc_endpoint *ep, grpc_call_list *call_list) { - ep->vtable->shutdown(ep, call_list); +void grpc_endpoint_shutdown(grpc_endpoint *ep, + grpc_closure_list *closure_list) { + ep->vtable->shutdown(ep, closure_list); } -void grpc_endpoint_destroy(grpc_endpoint *ep, grpc_call_list *call_list) { - ep->vtable->destroy(ep, call_list); +void grpc_endpoint_destroy(grpc_endpoint *ep, grpc_closure_list *closure_list) { + ep->vtable->destroy(ep, closure_list); } char *grpc_endpoint_get_peer(grpc_endpoint *ep) { diff --git a/src/core/iomgr/endpoint.h b/src/core/iomgr/endpoint.h index f9881684ff..cb722e32e9 100644 --- a/src/core/iomgr/endpoint.h +++ b/src/core/iomgr/endpoint.h @@ -48,15 +48,15 @@ typedef struct grpc_endpoint_vtable grpc_endpoint_vtable; struct grpc_endpoint_vtable { void (*read)(grpc_endpoint *ep, gpr_slice_buffer *slices, grpc_closure *cb, - grpc_call_list *call_list); + grpc_closure_list *closure_list); void (*write)(grpc_endpoint *ep, gpr_slice_buffer *slices, grpc_closure *cb, - grpc_call_list *call_list); + grpc_closure_list *closure_list); void (*add_to_pollset)(grpc_endpoint *ep, grpc_pollset *pollset, - grpc_call_list *call_list); + grpc_closure_list *closure_list); void (*add_to_pollset_set)(grpc_endpoint *ep, grpc_pollset_set *pollset, - grpc_call_list *call_list); - void (*shutdown)(grpc_endpoint *ep, grpc_call_list *call_list); - void (*destroy)(grpc_endpoint *ep, grpc_call_list *call_list); + grpc_closure_list *closure_list); + void (*shutdown)(grpc_endpoint *ep, grpc_closure_list *closure_list); + void (*destroy)(grpc_endpoint *ep, grpc_closure_list *closure_list); char *(*get_peer)(grpc_endpoint *ep); }; @@ -65,7 +65,7 @@ struct grpc_endpoint_vtable { indicates the endpoint is closed. Valid slices may be placed into \a slices even on callback success == 0. */ void grpc_endpoint_read(grpc_endpoint *ep, gpr_slice_buffer *slices, - grpc_closure *cb, grpc_call_list *call_list); + grpc_closure *cb, grpc_closure_list *closure_list); char *grpc_endpoint_get_peer(grpc_endpoint *ep); @@ -80,20 +80,20 @@ char *grpc_endpoint_get_peer(grpc_endpoint *ep); it is a valid slice buffer. */ void grpc_endpoint_write(grpc_endpoint *ep, gpr_slice_buffer *slices, - grpc_closure *cb, grpc_call_list *call_list); + grpc_closure *cb, grpc_closure_list *closure_list); /* Causes any pending read/write callbacks to run immediately with success==0 */ -void grpc_endpoint_shutdown(grpc_endpoint *ep, grpc_call_list *call_list); -void grpc_endpoint_destroy(grpc_endpoint *ep, grpc_call_list *call_list); +void grpc_endpoint_shutdown(grpc_endpoint *ep, grpc_closure_list *closure_list); +void grpc_endpoint_destroy(grpc_endpoint *ep, grpc_closure_list *closure_list); /* Add an endpoint to a pollset, so that when the pollset is polled, events from this endpoint are considered */ void grpc_endpoint_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset, - grpc_call_list *call_list); + grpc_closure_list *closure_list); void grpc_endpoint_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pollset_set, - grpc_call_list *call_list); + grpc_closure_list *closure_list); struct grpc_endpoint { const grpc_endpoint_vtable *vtable; diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 7c1db32553..5e455f32d7 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -212,7 +212,7 @@ static int has_watchers(grpc_fd *fd) { } void grpc_fd_orphan(grpc_fd *fd, grpc_closure *on_done, const char *reason, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { fd->on_done_closure = on_done; shutdown(fd->fd, SHUT_RDWR); gpr_mu_lock(&fd->watcher_mu); @@ -220,7 +220,7 @@ void grpc_fd_orphan(grpc_fd *fd, grpc_closure *on_done, const char *reason, if (!has_watchers(fd)) { fd->closed = 1; close(fd->fd); - grpc_call_list_add(call_list, fd->on_done_closure, 1); + grpc_closure_list_add(closure_list, fd->on_done_closure, 1); } else { wake_all_watchers_locked(fd); } @@ -245,7 +245,7 @@ void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); } #endif static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_closure *closure, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { switch (gpr_atm_acq_load(st)) { case NOT_READY: /* There is no race if the descriptor is already ready, so we skip @@ -267,7 +267,8 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_closure *closure, case READY: GPR_ASSERT(gpr_atm_no_barrier_load(st) == READY); gpr_atm_rel_store(st, NOT_READY); - grpc_call_list_add(call_list, closure, !gpr_atm_acq_load(&fd->shutdown)); + grpc_closure_list_add(closure_list, closure, + !gpr_atm_acq_load(&fd->shutdown)); return; default: /* WAITING */ /* upcallptr was set to a different closure. This is an error! */ @@ -281,7 +282,7 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_closure *closure, } static void set_ready_locked(grpc_fd *fd, gpr_atm *st, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { gpr_intptr state = gpr_atm_acq_load(st); switch (state) { @@ -300,38 +301,39 @@ static void set_ready_locked(grpc_fd *fd, gpr_atm *st, default: /* waiting */ GPR_ASSERT(gpr_atm_no_barrier_load(st) != READY && gpr_atm_no_barrier_load(st) != NOT_READY); - grpc_call_list_add(call_list, (grpc_closure *)state, - !gpr_atm_acq_load(&fd->shutdown)); + grpc_closure_list_add(closure_list, (grpc_closure *)state, + !gpr_atm_acq_load(&fd->shutdown)); gpr_atm_rel_store(st, NOT_READY); return; } } -static void set_ready(grpc_fd *fd, gpr_atm *st, grpc_call_list *call_list) { +static void set_ready(grpc_fd *fd, gpr_atm *st, + grpc_closure_list *closure_list) { /* only one set_ready can be active at once (but there may be a racing notify_on) */ gpr_mu_lock(&fd->set_state_mu); - set_ready_locked(fd, st, call_list); + set_ready_locked(fd, st, closure_list); gpr_mu_unlock(&fd->set_state_mu); } -void grpc_fd_shutdown(grpc_fd *fd, grpc_call_list *call_list) { +void grpc_fd_shutdown(grpc_fd *fd, grpc_closure_list *closure_list) { gpr_mu_lock(&fd->set_state_mu); GPR_ASSERT(!gpr_atm_no_barrier_load(&fd->shutdown)); gpr_atm_rel_store(&fd->shutdown, 1); - set_ready_locked(fd, &fd->readst, call_list); - set_ready_locked(fd, &fd->writest, call_list); + set_ready_locked(fd, &fd->readst, closure_list); + set_ready_locked(fd, &fd->writest, closure_list); gpr_mu_unlock(&fd->set_state_mu); } void grpc_fd_notify_on_read(grpc_fd *fd, grpc_closure *closure, - grpc_call_list *call_list) { - notify_on(fd, &fd->readst, closure, call_list); + grpc_closure_list *closure_list) { + notify_on(fd, &fd->readst, closure, closure_list); } void grpc_fd_notify_on_write(grpc_fd *fd, grpc_closure *closure, - grpc_call_list *call_list) { - notify_on(fd, &fd->writest, closure, call_list); + grpc_closure_list *closure_list) { + notify_on(fd, &fd->writest, closure, closure_list); } gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, @@ -378,7 +380,7 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, } void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { int was_polling = 0; int kick = 0; grpc_fd *fd = watcher->fd; @@ -411,19 +413,19 @@ void grpc_fd_end_poll(grpc_fd_watcher *watcher, int got_read, int got_write, if (grpc_fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) { fd->closed = 1; close(fd->fd); - grpc_call_list_add(call_list, fd->on_done_closure, 1); + grpc_closure_list_add(closure_list, fd->on_done_closure, 1); } gpr_mu_unlock(&fd->watcher_mu); GRPC_FD_UNREF(fd, "poll"); } -void grpc_fd_become_readable(grpc_fd *fd, grpc_call_list *call_list) { - set_ready(fd, &fd->readst, call_list); +void grpc_fd_become_readable(grpc_fd *fd, grpc_closure_list *closure_list) { + set_ready(fd, &fd->readst, closure_list); } -void grpc_fd_become_writable(grpc_fd *fd, grpc_call_list *call_list) { - set_ready(fd, &fd->writest, call_list); +void grpc_fd_become_writable(grpc_fd *fd, grpc_closure_list *closure_list) { + set_ready(fd, &fd->writest, closure_list); } #endif diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h index 607cba6181..eab0daa00d 100644 --- a/src/core/iomgr/fd_posix.h +++ b/src/core/iomgr/fd_posix.h @@ -112,7 +112,7 @@ grpc_fd *grpc_fd_create(int fd, const char *name); notify_on_write. MUST NOT be called with a pollset lock taken */ void grpc_fd_orphan(grpc_fd *fd, grpc_closure *on_done, const char *reason, - grpc_call_list *call_list); + grpc_closure_list *closure_list); /* Begin polling on an fd. Registers that the given pollset is interested in this fd - so that if read @@ -131,13 +131,13 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, /* Complete polling previously started with grpc_fd_begin_poll MUST NOT be called with a pollset lock taken */ void grpc_fd_end_poll(grpc_fd_watcher *rec, int got_read, int got_write, - grpc_call_list *call_list); + grpc_closure_list *closure_list); /* Return 1 if this fd is orphaned, 0 otherwise */ int grpc_fd_is_orphaned(grpc_fd *fd); /* Cause any current callbacks to error out with GRPC_CALLBACK_CANCELLED. */ -void grpc_fd_shutdown(grpc_fd *fd, grpc_call_list *call_list); +void grpc_fd_shutdown(grpc_fd *fd, grpc_closure_list *closure_list); /* Register read interest, causing read_cb to be called once when fd becomes readable, on deadline specified by deadline, or on shutdown triggered by @@ -153,18 +153,18 @@ void grpc_fd_shutdown(grpc_fd *fd, grpc_call_list *call_list); calling notify_on_read again. Users are also expected to handle spurious events, i.e read_cb is called while nothing can be readable from fd */ void grpc_fd_notify_on_read(grpc_fd *fd, grpc_closure *closure, - grpc_call_list *call_list); + grpc_closure_list *closure_list); /* Exactly the same semantics as above, except based on writable events. */ void grpc_fd_notify_on_write(grpc_fd *fd, grpc_closure *closure, - grpc_call_list *call_list); + grpc_closure_list *closure_list); /* Notification from the poller to an fd that it has become readable or writable. If allow_synchronous_callback is 1, allow running the fd callback inline in this callstack, otherwise register an asynchronous callback and return */ -void grpc_fd_become_readable(grpc_fd *fd, grpc_call_list *call_list); -void grpc_fd_become_writable(grpc_fd *fd, grpc_call_list *call_list); +void grpc_fd_become_readable(grpc_fd *fd, grpc_closure_list *closure_list); +void grpc_fd_become_writable(grpc_fd *fd, grpc_closure_list *closure_list); /* Reference counting for fds */ #ifdef GRPC_FD_REF_COUNT_DEBUG diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index b2c17b1ef2..9456bad716 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -88,7 +88,7 @@ void grpc_iomgr_shutdown(void) { gpr_timespec shutdown_deadline = gpr_time_add( gpr_now(GPR_CLOCK_REALTIME), gpr_time_from_seconds(10, GPR_TIMESPAN)); gpr_timespec last_warning_time = gpr_now(GPR_CLOCK_REALTIME); - grpc_call_list call_list = GRPC_CALL_LIST_INIT; + grpc_closure_list closure_list = GRPC_CLOSURE_LIST_INIT; gpr_mu_lock(&g_mu); g_shutdown = 1; @@ -103,9 +103,9 @@ void grpc_iomgr_shutdown(void) { last_warning_time = gpr_now(GPR_CLOCK_REALTIME); } if (grpc_alarm_check(gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL, - &call_list)) { + &closure_list)) { gpr_mu_unlock(&g_mu); - grpc_call_list_run(&call_list); + grpc_closure_list_run(&closure_list); gpr_mu_lock(&g_mu); continue; } @@ -131,8 +131,8 @@ void grpc_iomgr_shutdown(void) { } gpr_mu_unlock(&g_mu); - grpc_alarm_list_shutdown(&call_list); - grpc_call_list_run(&call_list); + grpc_alarm_list_shutdown(&closure_list); + grpc_closure_list_run(&closure_list); /* ensure all threads have left g_mu */ gpr_mu_lock(&g_mu); @@ -168,36 +168,36 @@ void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb, closure->next = NULL; } -void grpc_call_list_add(grpc_call_list *call_list, grpc_closure *closure, - int success) { +void grpc_closure_list_add(grpc_closure_list *closure_list, + grpc_closure *closure, int success) { if (closure == NULL) return; closure->next = NULL; closure->success = success; - if (call_list->head == NULL) { - call_list->head = closure; + if (closure_list->head == NULL) { + closure_list->head = closure; } else { - call_list->tail->next = closure; + closure_list->tail->next = closure; } - call_list->tail = closure; + closure_list->tail = closure; } -void grpc_call_list_run(grpc_call_list *call_list) { - while (!grpc_call_list_empty(*call_list)) { - grpc_closure *c = call_list->head; - call_list->head = call_list->tail = NULL; +void grpc_closure_list_run(grpc_closure_list *closure_list) { + while (!grpc_closure_list_empty(*closure_list)) { + grpc_closure *c = closure_list->head; + closure_list->head = closure_list->tail = NULL; while (c != NULL) { grpc_closure *next = c->next; - c->cb(c->cb_arg, c->success, call_list); + c->cb(c->cb_arg, c->success, closure_list); c = next; } } } -int grpc_call_list_empty(grpc_call_list call_list) { - return call_list.head == NULL; +int grpc_closure_list_empty(grpc_closure_list closure_list) { + return closure_list.head == NULL; } -void grpc_call_list_move(grpc_call_list *src, grpc_call_list *dst) { +void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst) { if (src->head == NULL) { return; } diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h index 94f3912990..bc015706a3 100644 --- a/src/core/iomgr/iomgr.h +++ b/src/core/iomgr/iomgr.h @@ -37,10 +37,10 @@ struct grpc_closure; typedef struct grpc_closure grpc_closure; -typedef struct grpc_call_list { +typedef struct grpc_closure_list { grpc_closure *head; grpc_closure *tail; -} grpc_call_list; +} grpc_closure_list; /** gRPC Callback definition. * @@ -48,7 +48,7 @@ typedef struct grpc_call_list { * \param success An indication on the state of the iomgr. On false, cleanup * actions should be taken (eg, shutdown). */ typedef void (*grpc_iomgr_cb_func)(void *arg, int success, - grpc_call_list *call_list); + grpc_closure_list *closure_list); /** A closure over a grpc_iomgr_cb_func. */ struct grpc_closure { @@ -71,14 +71,14 @@ struct grpc_closure { void grpc_closure_init(grpc_closure *closure, grpc_iomgr_cb_func cb, void *cb_arg); -#define GRPC_CALL_LIST_INIT \ +#define GRPC_CLOSURE_LIST_INIT \ { NULL, NULL } -void grpc_call_list_add(grpc_call_list *list, grpc_closure *closure, - int success); -void grpc_call_list_run(grpc_call_list *list); -void grpc_call_list_move(grpc_call_list *src, grpc_call_list *dst); -int grpc_call_list_empty(grpc_call_list list); +void grpc_closure_list_add(grpc_closure_list *list, grpc_closure *closure, + int success); +void grpc_closure_list_run(grpc_closure_list *list); +void grpc_closure_list_move(grpc_closure_list *src, grpc_closure_list *dst); +int grpc_closure_list_empty(grpc_closure_list list); /** Initializes the iomgr. */ void grpc_iomgr_init(void); diff --git a/src/core/iomgr/pollset.h b/src/core/iomgr/pollset.h index 95ba694ff6..b5d71bfb4c 100644 --- a/src/core/iomgr/pollset.h +++ b/src/core/iomgr/pollset.h @@ -56,7 +56,7 @@ void grpc_pollset_init(grpc_pollset *pollset); void grpc_pollset_shutdown(grpc_pollset *pollset, grpc_closure *closure, - grpc_call_list *call_list); + grpc_closure_list *closure_list); void grpc_pollset_destroy(grpc_pollset *pollset); /* Do some work on a pollset. @@ -74,11 +74,12 @@ void grpc_pollset_destroy(grpc_pollset *pollset); not be released by grpc_pollset_work AFTER worker has been destroyed. Tries not to block past deadline. - May call grpc_call_list_run on grpc_call_list, without holding the pollset + May call grpc_closure_list_run on grpc_closure_list, without holding the + pollset lock */ void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec now, gpr_timespec deadline, - grpc_call_list *call_list); + grpc_closure_list *closure_list); /* Break one polling thread out of polling work for this pollset. If specific_worker is GRPC_POLLSET_KICK_BROADCAST, kick ALL the workers. diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index e6e0f5cdd4..a83086572e 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -62,7 +62,7 @@ typedef struct { } pollset_hdr; static void finally_add_fd(grpc_pollset *pollset, grpc_fd *fd, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { pollset_hdr *h = pollset->data.ptr; struct epoll_event ev; int err; @@ -84,15 +84,15 @@ static void finally_add_fd(grpc_pollset *pollset, grpc_fd *fd, } } } - grpc_fd_end_poll(&watcher, 0, 0, call_list); + grpc_fd_end_poll(&watcher, 0, 0, closure_list); } static void perform_delayed_add(void *arg, int iomgr_status, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { delayed_add *da = arg; if (!grpc_fd_is_orphaned(da->fd)) { - finally_add_fd(da->pollset, da->fd, call_list); + finally_add_fd(da->pollset, da->fd, closure_list); } gpr_mu_lock(&da->pollset->mu); @@ -101,7 +101,7 @@ static void perform_delayed_add(void *arg, int iomgr_status, /* We don't care about this pollset anymore. */ if (da->pollset->in_flight_cbs == 0 && !da->pollset->called_shutdown) { da->pollset->called_shutdown = 1; - grpc_call_list_add(call_list, da->pollset->shutdown_done, 1); + grpc_closure_list_add(closure_list, da->pollset->shutdown_done, 1); } } gpr_mu_unlock(&da->pollset->mu); @@ -111,13 +111,12 @@ static void perform_delayed_add(void *arg, int iomgr_status, gpr_free(da); } -static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset, - grpc_fd *fd, - int and_unlock_pollset, - grpc_call_list *call_list) { +static void multipoll_with_epoll_pollset_add_fd( + grpc_pollset *pollset, grpc_fd *fd, int and_unlock_pollset, + grpc_closure_list *closure_list) { if (and_unlock_pollset) { gpr_mu_unlock(&pollset->mu); - finally_add_fd(pollset, fd, call_list); + finally_add_fd(pollset, fd, closure_list); } else { delayed_add *da = gpr_malloc(sizeof(*da)); da->pollset = pollset; @@ -125,14 +124,13 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset, GRPC_FD_REF(fd, "delayed_add"); grpc_closure_init(&da->closure, perform_delayed_add, da); pollset->in_flight_cbs++; - grpc_call_list_add(call_list, &da->closure, 1); + grpc_closure_list_add(closure_list, &da->closure, 1); } } -static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset, - grpc_fd *fd, - int and_unlock_pollset, - grpc_call_list *call_list) { +static void multipoll_with_epoll_pollset_del_fd( + grpc_pollset *pollset, grpc_fd *fd, int and_unlock_pollset, + grpc_closure_list *closure_list) { pollset_hdr *h = pollset->data.ptr; int err; @@ -154,7 +152,7 @@ static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset, static void multipoll_with_epoll_pollset_maybe_work_and_unlock( grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline, - gpr_timespec now, grpc_call_list *call_list) { + gpr_timespec now, grpc_closure_list *closure_list) { struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS]; int ep_rv; int poll_rv; @@ -208,10 +206,10 @@ static void multipoll_with_epoll_pollset_maybe_work_and_unlock( int read = ep_ev[i].events & (EPOLLIN | EPOLLPRI); int write = ep_ev[i].events & EPOLLOUT; if (read || cancel) { - grpc_fd_become_readable(fd, call_list); + grpc_fd_become_readable(fd, closure_list); } if (write || cancel) { - grpc_fd_become_writable(fd, call_list); + grpc_fd_become_writable(fd, closure_list); } } } @@ -236,7 +234,8 @@ static const grpc_pollset_vtable multipoll_with_epoll_pollset = { multipoll_with_epoll_pollset_destroy}; static void epoll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds, - size_t nfds, grpc_call_list *call_list) { + size_t nfds, + grpc_closure_list *closure_list) { size_t i; pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr)); @@ -249,7 +248,7 @@ static void epoll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds, abort(); } for (i = 0; i < nfds; i++) { - multipoll_with_epoll_pollset_add_fd(pollset, fds[i], 0, call_list); + multipoll_with_epoll_pollset_add_fd(pollset, fds[i], 0, closure_list); } } diff --git a/src/core/iomgr/pollset_multipoller_with_poll_posix.c b/src/core/iomgr/pollset_multipoller_with_poll_posix.c index 03a28894f0..a3aa7e69fa 100644 --- a/src/core/iomgr/pollset_multipoller_with_poll_posix.c +++ b/src/core/iomgr/pollset_multipoller_with_poll_posix.c @@ -59,10 +59,9 @@ typedef struct { grpc_fd **dels; } pollset_hdr; -static void multipoll_with_poll_pollset_add_fd(grpc_pollset *pollset, - grpc_fd *fd, - int and_unlock_pollset, - grpc_call_list *call_list) { +static void multipoll_with_poll_pollset_add_fd( + grpc_pollset *pollset, grpc_fd *fd, int and_unlock_pollset, + grpc_closure_list *closure_list) { size_t i; pollset_hdr *h = pollset->data.ptr; /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */ @@ -81,10 +80,9 @@ exit: } } -static void multipoll_with_poll_pollset_del_fd(grpc_pollset *pollset, - grpc_fd *fd, - int and_unlock_pollset, - grpc_call_list *call_list) { +static void multipoll_with_poll_pollset_del_fd( + grpc_pollset *pollset, grpc_fd *fd, int and_unlock_pollset, + grpc_closure_list *closure_list) { /* will get removed next poll cycle */ pollset_hdr *h = pollset->data.ptr; if (h->del_count == h->del_capacity) { @@ -100,7 +98,7 @@ static void multipoll_with_poll_pollset_del_fd(grpc_pollset *pollset, static void multipoll_with_poll_pollset_maybe_work_and_unlock( grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline, - gpr_timespec now, grpc_call_list *call_list) { + gpr_timespec now, grpc_closure_list *closure_list) { int timeout; int r; size_t i, j, fd_count; @@ -151,7 +149,7 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( for (i = 1; i < pfd_count; i++) { grpc_fd_end_poll(&watchers[i], pfds[i].revents & POLLIN, - pfds[i].revents & POLLOUT, call_list); + pfds[i].revents & POLLOUT, closure_list); } if (r < 0) { @@ -169,10 +167,10 @@ static void multipoll_with_poll_pollset_maybe_work_and_unlock( continue; } if (pfds[i].revents & (POLLIN | POLLHUP | POLLERR)) { - grpc_fd_become_readable(watchers[i].fd, call_list); + grpc_fd_become_readable(watchers[i].fd, closure_list); } if (pfds[i].revents & (POLLOUT | POLLHUP | POLLERR)) { - grpc_fd_become_writable(watchers[i].fd, call_list); + grpc_fd_become_writable(watchers[i].fd, closure_list); } } } @@ -209,7 +207,8 @@ static const grpc_pollset_vtable multipoll_with_poll_pollset = { multipoll_with_poll_pollset_destroy}; void grpc_poll_become_multipoller(grpc_pollset *pollset, grpc_fd **fds, - size_t nfds, grpc_call_list *call_list) { + size_t nfds, + grpc_closure_list *closure_list) { size_t i; pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr)); pollset->vtable = &multipoll_with_poll_pollset; diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index daca2f6daa..82ef0298a2 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -141,9 +141,9 @@ void grpc_pollset_init(grpc_pollset *pollset) { } void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { gpr_mu_lock(&pollset->mu); - pollset->vtable->add_fd(pollset, fd, 1, call_list); + pollset->vtable->add_fd(pollset, fd, 1, closure_list); /* the following (enabled only in debug) will reacquire and then release our lock - meaning that if the unlocking flag passed to del_fd above is not respected, the code will deadlock (in a way that we have a chance of @@ -155,9 +155,9 @@ void grpc_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd, } void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { gpr_mu_lock(&pollset->mu); - pollset->vtable->del_fd(pollset, fd, 1, call_list); + pollset->vtable->del_fd(pollset, fd, 1, closure_list); /* the following (enabled only in debug) will reacquire and then release our lock - meaning that if the unlocking flag passed to del_fd above is not respected, the code will deadlock (in a way that we have a chance of @@ -168,14 +168,15 @@ void grpc_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd, #endif } -static void finish_shutdown(grpc_pollset *pollset, grpc_call_list *call_list) { +static void finish_shutdown(grpc_pollset *pollset, + grpc_closure_list *closure_list) { pollset->vtable->finish_shutdown(pollset); - grpc_call_list_add(call_list, pollset->shutdown_done, 1); + grpc_closure_list_add(closure_list, pollset->shutdown_done, 1); } void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec now, gpr_timespec deadline, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { /* pollset->mu already held */ int added_worker = 0; int locked = 1; @@ -184,11 +185,11 @@ void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, /* TODO(ctiller): pool these */ grpc_wakeup_fd_init(&worker->wakeup_fd); if (!grpc_pollset_has_workers(pollset) && - !grpc_call_list_empty(pollset->idle_jobs)) { - grpc_call_list_move(&pollset->idle_jobs, call_list); + !grpc_closure_list_empty(pollset->idle_jobs)) { + grpc_closure_list_move(&pollset->idle_jobs, closure_list); goto done; } - if (grpc_alarm_check(now, &deadline, call_list)) { + if (grpc_alarm_check(now, &deadline, closure_list)) { goto done; } if (pollset->shutting_down) { @@ -205,7 +206,7 @@ void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, added_worker = 1; gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset); pollset->vtable->maybe_work_and_unlock(pollset, worker, deadline, now, - call_list); + closure_list); locked = 0; gpr_tls_set(&g_current_thread_poller, 0); } else { @@ -213,7 +214,7 @@ void grpc_pollset_work(grpc_pollset *pollset, grpc_pollset_worker *worker, } done: if (!locked) { - grpc_call_list_run(call_list); + grpc_closure_list_run(closure_list); gpr_mu_lock(&pollset->mu); locked = 1; } @@ -227,8 +228,8 @@ done: } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) { pollset->called_shutdown = 1; gpr_mu_unlock(&pollset->mu); - finish_shutdown(pollset, call_list); - grpc_call_list_run(call_list); + finish_shutdown(pollset, closure_list); + grpc_closure_list_run(closure_list); /* Continuing to access pollset here is safe -- it is the caller's * responsibility to not destroy when it has outstanding calls to * grpc_pollset_work. @@ -239,7 +240,7 @@ done: } void grpc_pollset_shutdown(grpc_pollset *pollset, grpc_closure *closure, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { int call_shutdown = 0; gpr_mu_lock(&pollset->mu); GPR_ASSERT(!pollset->shutting_down); @@ -254,7 +255,7 @@ void grpc_pollset_shutdown(grpc_pollset *pollset, grpc_closure *closure, gpr_mu_unlock(&pollset->mu); if (call_shutdown) { - finish_shutdown(pollset, call_list); + finish_shutdown(pollset, closure_list); } } @@ -296,7 +297,7 @@ typedef struct grpc_unary_promote_args { } grpc_unary_promote_args; static void basic_do_promote(void *args, int success, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { grpc_unary_promote_args *up_args = args; const grpc_pollset_vtable *original_vtable = up_args->original_vtable; grpc_pollset *pollset = up_args->pollset; @@ -327,12 +328,12 @@ static void basic_do_promote(void *args, int success, if (pollset->in_flight_cbs == 0 && !pollset->called_shutdown) { GPR_ASSERT(!grpc_pollset_has_workers(pollset)); pollset->called_shutdown = 1; - grpc_call_list_add(call_list, pollset->shutdown_done, 1); + grpc_closure_list_add(closure_list, pollset->shutdown_done, 1); } } else if (grpc_fd_is_orphaned(fd)) { /* Don't try to add it to anything, we'll drop our ref on it below */ } else if (pollset->vtable != original_vtable) { - pollset->vtable->add_fd(pollset, fd, 0, call_list); + pollset->vtable->add_fd(pollset, fd, 0, closure_list); } else if (fd != pollset->data.ptr) { grpc_fd *fds[2]; fds[0] = pollset->data.ptr; @@ -340,7 +341,7 @@ static void basic_do_promote(void *args, int success, if (fds[0] && !grpc_fd_is_orphaned(fds[0])) { grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds), - call_list); + closure_list); GRPC_FD_UNREF(fds[0], "basicpoll"); } else { /* old fd is orphaned and we haven't cleaned it up until now, so remain a @@ -358,12 +359,12 @@ static void basic_do_promote(void *args, int success, /* Matching ref in basic_pollset_add_fd */ GRPC_FD_UNREF(fd, "basicpoll_add"); - grpc_call_list_run(call_list); + grpc_closure_list_run(closure_list); } static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd, int and_unlock_pollset, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { grpc_unary_promote_args *up_args; GPR_ASSERT(fd); if (fd == pollset->data.ptr) goto exit; @@ -381,7 +382,7 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd, GRPC_FD_REF(fd, "basicpoll"); } else if (!grpc_fd_is_orphaned(fds[0])) { grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds), - call_list); + closure_list); GRPC_FD_UNREF(fds[0], "basicpoll"); } else { /* old fd is orphaned and we haven't cleaned it up until now, so remain a @@ -404,7 +405,7 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd, up_args->promotion_closure.cb = basic_do_promote; up_args->promotion_closure.cb_arg = up_args; - grpc_call_list_add(&pollset->idle_jobs, &up_args->promotion_closure, 1); + grpc_closure_list_add(&pollset->idle_jobs, &up_args->promotion_closure, 1); grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); exit: @@ -415,7 +416,7 @@ exit: static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd, int and_unlock_pollset, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { GPR_ASSERT(fd); if (fd == pollset->data.ptr) { GRPC_FD_UNREF(pollset->data.ptr, "basicpoll"); @@ -427,11 +428,9 @@ static void basic_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd, } } -static void basic_pollset_maybe_work_and_unlock(grpc_pollset *pollset, - grpc_pollset_worker *worker, - gpr_timespec deadline, - gpr_timespec now, - grpc_call_list *call_list) { +static void basic_pollset_maybe_work_and_unlock( + grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline, + gpr_timespec now, grpc_closure_list *closure_list) { struct pollfd pfd[2]; grpc_fd *fd; grpc_fd_watcher fd_watcher; @@ -469,7 +468,7 @@ static void basic_pollset_maybe_work_and_unlock(grpc_pollset *pollset, if (fd) { grpc_fd_end_poll(&fd_watcher, pfd[1].revents & POLLIN, - pfd[1].revents & POLLOUT, call_list); + pfd[1].revents & POLLOUT, closure_list); } if (r < 0) { @@ -484,10 +483,10 @@ static void basic_pollset_maybe_work_and_unlock(grpc_pollset *pollset, } if (nfds > 1) { if (pfd[1].revents & (POLLIN | POLLHUP | POLLERR)) { - grpc_fd_become_readable(fd, call_list); + grpc_fd_become_readable(fd, closure_list); } if (pfd[1].revents & (POLLOUT | POLLHUP | POLLERR)) { - grpc_fd_become_writable(fd, call_list); + grpc_fd_become_writable(fd, closure_list); } } } diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index c65f25fd18..2f50cd2209 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -66,7 +66,7 @@ typedef struct grpc_pollset { int called_shutdown; int kicked_without_pollers; grpc_closure *shutdown_done; - grpc_call_list idle_jobs; + grpc_closure_list idle_jobs; union { int fd; void *ptr; @@ -75,13 +75,13 @@ typedef struct grpc_pollset { struct grpc_pollset_vtable { void (*add_fd)(grpc_pollset *pollset, struct grpc_fd *fd, - int and_unlock_pollset, grpc_call_list *call_list); + int and_unlock_pollset, grpc_closure_list *closure_list); void (*del_fd)(grpc_pollset *pollset, struct grpc_fd *fd, - int and_unlock_pollset, grpc_call_list *call_list); + int and_unlock_pollset, grpc_closure_list *closure_list); void (*maybe_work_and_unlock)(grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec deadline, gpr_timespec now, - grpc_call_list *call_list); + grpc_closure_list *closure_list); void (*finish_shutdown)(grpc_pollset *pollset); void (*destroy)(grpc_pollset *pollset); }; @@ -90,11 +90,11 @@ struct grpc_pollset_vtable { /* Add an fd to a pollset */ void grpc_pollset_add_fd(grpc_pollset *pollset, struct grpc_fd *fd, - grpc_call_list *call_list); + grpc_closure_list *closure_list); /* Force remove an fd from a pollset (normally they are removed on the next poll after an fd is orphaned) */ void grpc_pollset_del_fd(grpc_pollset *pollset, struct grpc_fd *fd, - grpc_call_list *call_list); + grpc_closure_list *closure_list); /* Returns the fd to listen on for kicks */ int grpc_kick_read_fd(grpc_pollset *p); @@ -114,11 +114,12 @@ int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, /* turn a pollset into a multipoller: platform specific */ typedef void (*grpc_platform_become_multipoller_type)( grpc_pollset *pollset, struct grpc_fd **fds, size_t fd_count, - grpc_call_list *call_list); + grpc_closure_list *closure_list); extern grpc_platform_become_multipoller_type grpc_platform_become_multipoller; void grpc_poll_become_multipoller(grpc_pollset *pollset, struct grpc_fd **fds, - size_t fd_count, grpc_call_list *call_list); + size_t fd_count, + grpc_closure_list *closure_list); /* Return 1 if the pollset has active threads in grpc_pollset_work (pollset must * be locked) */ diff --git a/src/core/iomgr/pollset_set.h b/src/core/iomgr/pollset_set.h index ca667bf34e..5b5b20ef66 100644 --- a/src/core/iomgr/pollset_set.h +++ b/src/core/iomgr/pollset_set.h @@ -53,9 +53,9 @@ void grpc_pollset_set_init(grpc_pollset_set *pollset_set); void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set); void grpc_pollset_set_add_pollset(grpc_pollset_set *pollset_set, grpc_pollset *pollset, - grpc_call_list *call_list); + grpc_closure_list *closure_list); void grpc_pollset_set_del_pollset(grpc_pollset_set *pollset_set, grpc_pollset *pollset, - grpc_call_list *call_list); + grpc_closure_list *closure_list); #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_H */ diff --git a/src/core/iomgr/pollset_set_posix.c b/src/core/iomgr/pollset_set_posix.c index a2c1f47629..810543cc2e 100644 --- a/src/core/iomgr/pollset_set_posix.c +++ b/src/core/iomgr/pollset_set_posix.c @@ -60,7 +60,7 @@ void grpc_pollset_set_destroy(grpc_pollset_set *pollset_set) { void grpc_pollset_set_add_pollset(grpc_pollset_set *pollset_set, grpc_pollset *pollset, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { size_t i, j; gpr_mu_lock(&pollset_set->mu); if (pollset_set->pollset_count == pollset_set->pollset_capacity) { @@ -75,7 +75,7 @@ void grpc_pollset_set_add_pollset(grpc_pollset_set *pollset_set, if (grpc_fd_is_orphaned(pollset_set->fds[i])) { GRPC_FD_UNREF(pollset_set->fds[i], "pollset"); } else { - grpc_pollset_add_fd(pollset, pollset_set->fds[i], call_list); + grpc_pollset_add_fd(pollset, pollset_set->fds[i], closure_list); pollset_set->fds[j++] = pollset_set->fds[i]; } } @@ -85,7 +85,7 @@ void grpc_pollset_set_add_pollset(grpc_pollset_set *pollset_set, void grpc_pollset_set_del_pollset(grpc_pollset_set *pollset_set, grpc_pollset *pollset, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { size_t i; gpr_mu_lock(&pollset_set->mu); for (i = 0; i < pollset_set->pollset_count; i++) { @@ -100,7 +100,7 @@ void grpc_pollset_set_del_pollset(grpc_pollset_set *pollset_set, } void grpc_pollset_set_add_fd(grpc_pollset_set *pollset_set, grpc_fd *fd, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { size_t i; gpr_mu_lock(&pollset_set->mu); if (pollset_set->fd_count == pollset_set->fd_capacity) { @@ -111,13 +111,13 @@ void grpc_pollset_set_add_fd(grpc_pollset_set *pollset_set, grpc_fd *fd, GRPC_FD_REF(fd, "pollset_set"); pollset_set->fds[pollset_set->fd_count++] = fd; for (i = 0; i < pollset_set->pollset_count; i++) { - grpc_pollset_add_fd(pollset_set->pollsets[i], fd, call_list); + grpc_pollset_add_fd(pollset_set->pollsets[i], fd, closure_list); } gpr_mu_unlock(&pollset_set->mu); } void grpc_pollset_set_del_fd(grpc_pollset_set *pollset_set, grpc_fd *fd, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { size_t i; gpr_mu_lock(&pollset_set->mu); for (i = 0; i < pollset_set->fd_count; i++) { diff --git a/src/core/iomgr/pollset_set_posix.h b/src/core/iomgr/pollset_set_posix.h index e9969d2c0f..40d7a9b259 100644 --- a/src/core/iomgr/pollset_set_posix.h +++ b/src/core/iomgr/pollset_set_posix.h @@ -50,8 +50,8 @@ typedef struct grpc_pollset_set { } grpc_pollset_set; void grpc_pollset_set_add_fd(grpc_pollset_set *pollset_set, grpc_fd *fd, - grpc_call_list *call_list); + grpc_closure_list *closure_list); void grpc_pollset_set_del_fd(grpc_pollset_set *pollset_set, grpc_fd *fd, - grpc_call_list *call_list); + grpc_closure_list *closure_list); #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */ diff --git a/src/core/iomgr/resolve_address.h b/src/core/iomgr/resolve_address.h index 72b9c1cc87..174432225e 100644 --- a/src/core/iomgr/resolve_address.h +++ b/src/core/iomgr/resolve_address.h @@ -54,7 +54,7 @@ typedef struct { grpc_resolved_addresses_destroy when it's done with them On failure: addresses is NULL */ typedef void (*grpc_resolve_cb)(void *arg, grpc_resolved_addresses *addresses, - grpc_call_list *call_list); + grpc_closure_list *closure_list); /* Asynchronously resolve addr. Use default_port if a port isn't designated in addr, otherwise use the port in addr. */ /* TODO(ctiller): add a timeout here */ diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c index ea64f8176f..f0a230bab2 100644 --- a/src/core/iomgr/resolve_address_posix.c +++ b/src/core/iomgr/resolve_address_posix.c @@ -146,17 +146,17 @@ done: /* Thread function to asynch-ify grpc_blocking_resolve_address */ static void do_request_thread(void *rp) { request *r = rp; - grpc_call_list call_list = GRPC_CALL_LIST_INIT; + grpc_closure_list closure_list = GRPC_CLOSURE_LIST_INIT; grpc_resolved_addresses *resolved = grpc_blocking_resolve_address(r->name, r->default_port); void *arg = r->arg; grpc_resolve_cb cb = r->cb; gpr_free(r->name); gpr_free(r->default_port); - cb(arg, resolved, &call_list); + cb(arg, resolved, &closure_list); grpc_iomgr_unregister_object(&r->iomgr_object); gpr_free(r); - grpc_call_list_run(&call_list); + grpc_closure_list_run(&closure_list); } void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addrs) { diff --git a/src/core/iomgr/tcp_client.h b/src/core/iomgr/tcp_client.h index 8fb5bfda1d..971e984e54 100644 --- a/src/core/iomgr/tcp_client.h +++ b/src/core/iomgr/tcp_client.h @@ -47,6 +47,7 @@ void grpc_tcp_client_connect(grpc_closure *on_connect, grpc_endpoint **endpoint, grpc_pollset_set *interested_parties, const struct sockaddr *addr, size_t addr_len, - gpr_timespec deadline, grpc_call_list *call_list); + gpr_timespec deadline, + grpc_closure_list *closure_list); #endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_CLIENT_H */ diff --git a/src/core/iomgr/tcp_client_posix.c b/src/core/iomgr/tcp_client_posix.c index fed832fce6..e23551c4c0 100644 --- a/src/core/iomgr/tcp_client_posix.c +++ b/src/core/iomgr/tcp_client_posix.c @@ -91,7 +91,8 @@ error: return 0; } -static void tc_on_alarm(void *acp, int success, grpc_call_list *call_list) { +static void tc_on_alarm(void *acp, int success, + grpc_closure_list *closure_list) { int done; async_connect *ac = acp; if (grpc_tcp_trace) { @@ -100,7 +101,7 @@ static void tc_on_alarm(void *acp, int success, grpc_call_list *call_list) { } gpr_mu_lock(&ac->mu); if (ac->fd != NULL) { - grpc_fd_shutdown(ac->fd, call_list); + grpc_fd_shutdown(ac->fd, closure_list); } done = (--ac->refs == 0); gpr_mu_unlock(&ac->mu); @@ -111,7 +112,8 @@ static void tc_on_alarm(void *acp, int success, grpc_call_list *call_list) { } } -static void on_writable(void *acp, int success, grpc_call_list *call_list) { +static void on_writable(void *acp, int success, + grpc_closure_list *closure_list) { async_connect *ac = acp; int so_error = 0; socklen_t so_error_size; @@ -132,7 +134,7 @@ static void on_writable(void *acp, int success, grpc_call_list *call_list) { ac->fd = NULL; gpr_mu_unlock(&ac->mu); - grpc_alarm_cancel(&ac->alarm, call_list); + grpc_alarm_cancel(&ac->alarm, closure_list); gpr_mu_lock(&ac->mu); if (success) { @@ -161,7 +163,7 @@ static void on_writable(void *acp, int success, grpc_call_list *call_list) { don't do that! */ gpr_log(GPR_ERROR, "kernel out of buffers"); gpr_mu_unlock(&ac->mu); - grpc_fd_notify_on_write(fd, &ac->write_closure, call_list); + grpc_fd_notify_on_write(fd, &ac->write_closure, closure_list); return; } else { switch (so_error) { @@ -175,7 +177,7 @@ static void on_writable(void *acp, int success, grpc_call_list *call_list) { goto finish; } } else { - grpc_pollset_set_del_fd(ac->interested_parties, fd, call_list); + grpc_pollset_set_del_fd(ac->interested_parties, fd, closure_list); *ep = grpc_tcp_create(fd, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, ac->addr_str); fd = NULL; goto finish; @@ -189,8 +191,8 @@ static void on_writable(void *acp, int success, grpc_call_list *call_list) { finish: if (fd != NULL) { - grpc_pollset_set_del_fd(ac->interested_parties, fd, call_list); - grpc_fd_orphan(fd, NULL, "tcp_client_orphan", call_list); + grpc_pollset_set_del_fd(ac->interested_parties, fd, closure_list); + grpc_fd_orphan(fd, NULL, "tcp_client_orphan", closure_list); fd = NULL; } done = (--ac->refs == 0); @@ -200,13 +202,14 @@ finish: gpr_free(ac->addr_str); gpr_free(ac); } - grpc_call_list_add(call_list, closure, *ep != NULL); + grpc_closure_list_add(closure_list, closure, *ep != NULL); } void grpc_tcp_client_connect(grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, const struct sockaddr *addr, size_t addr_len, - gpr_timespec deadline, grpc_call_list *call_list) { + gpr_timespec deadline, + grpc_closure_list *closure_list) { int fd; grpc_dualstack_mode dsmode; int err; @@ -236,7 +239,7 @@ void grpc_tcp_client_connect(grpc_closure *closure, grpc_endpoint **ep, addr_len = sizeof(addr4_copy); } if (!prepare_socket(addr, fd)) { - grpc_call_list_add(call_list, closure, 0); + grpc_closure_list_add(closure_list, closure, 0); return; } @@ -252,18 +255,18 @@ void grpc_tcp_client_connect(grpc_closure *closure, grpc_endpoint **ep, if (err >= 0) { *ep = grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str); - grpc_call_list_add(call_list, closure, 1); + grpc_closure_list_add(closure_list, closure, 1); goto done; } if (errno != EWOULDBLOCK && errno != EINPROGRESS) { gpr_log(GPR_ERROR, "connect error to '%s': %s", addr_str, strerror(errno)); - grpc_fd_orphan(fdobj, NULL, "tcp_client_connect_error", call_list); - grpc_call_list_add(call_list, closure, 0); + grpc_fd_orphan(fdobj, NULL, "tcp_client_connect_error", closure_list); + grpc_closure_list_add(closure_list, closure, 0); goto done; } - grpc_pollset_set_add_fd(interested_parties, fdobj, call_list); + grpc_pollset_set_add_fd(interested_parties, fdobj, closure_list); ac = gpr_malloc(sizeof(async_connect)); ac->closure = closure; @@ -285,8 +288,8 @@ void grpc_tcp_client_connect(grpc_closure *closure, grpc_endpoint **ep, gpr_mu_lock(&ac->mu); grpc_alarm_init(&ac->alarm, gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC), - tc_on_alarm, ac, gpr_now(GPR_CLOCK_MONOTONIC), call_list); - grpc_fd_notify_on_write(ac->fd, &ac->write_closure, call_list); + tc_on_alarm, ac, gpr_now(GPR_CLOCK_MONOTONIC), closure_list); + grpc_fd_notify_on_write(ac->fd, &ac->write_closure, closure_list); gpr_mu_unlock(&ac->mu); done: diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index 89a85f26dc..942ab8b71d 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -95,17 +95,17 @@ typedef struct { } grpc_tcp; static void tcp_handle_read(void *arg /* grpc_tcp */, int success, - grpc_call_list *call_list); + grpc_closure_list *closure_list); static void tcp_handle_write(void *arg /* grpc_tcp */, int success, - grpc_call_list *call_list); + grpc_closure_list *closure_list); -static void tcp_shutdown(grpc_endpoint *ep, grpc_call_list *call_list) { +static void tcp_shutdown(grpc_endpoint *ep, grpc_closure_list *closure_list) { grpc_tcp *tcp = (grpc_tcp *)ep; - grpc_fd_shutdown(tcp->em_fd, call_list); + grpc_fd_shutdown(tcp->em_fd, closure_list); } -static void tcp_free(grpc_tcp *tcp, grpc_call_list *call_list) { - grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan", call_list); +static void tcp_free(grpc_tcp *tcp, grpc_closure_list *closure_list) { + grpc_fd_orphan(tcp->em_fd, NULL, "tcp_unref_orphan", closure_list); gpr_free(tcp->peer_string); gpr_free(tcp); } @@ -115,12 +115,12 @@ static void tcp_free(grpc_tcp *tcp, grpc_call_list *call_list) { #define TCP_UNREF(tcp, reason, cl) \ tcp_unref((tcp), (cl), (reason), __FILE__, __LINE__) #define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__) -static void tcp_unref(grpc_tcp *tcp, grpc_call_list *call_list, +static void tcp_unref(grpc_tcp *tcp, grpc_closure_list *closure_list, const char *reason, const char *file, int line) { gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp, reason, tcp->refcount.count, tcp->refcount.count - 1); if (gpr_unref(&tcp->refcount)) { - tcp_free(tcp, call_list); + tcp_free(tcp, closure_list); } } @@ -133,22 +133,22 @@ static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file, #else #define TCP_UNREF(tcp, reason, cl) tcp_unref((tcp), (cl)) #define TCP_REF(tcp, reason) tcp_ref((tcp)) -static void tcp_unref(grpc_tcp *tcp, grpc_call_list *call_list) { +static void tcp_unref(grpc_tcp *tcp, grpc_closure_list *closure_list) { if (gpr_unref(&tcp->refcount)) { - tcp_free(tcp, call_list); + tcp_free(tcp, closure_list); } } static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } #endif -static void tcp_destroy(grpc_endpoint *ep, grpc_call_list *call_list) { +static void tcp_destroy(grpc_endpoint *ep, grpc_closure_list *closure_list) { grpc_tcp *tcp = (grpc_tcp *)ep; - TCP_UNREF(tcp, "destroy", call_list); + TCP_UNREF(tcp, "destroy", closure_list); } static void call_read_cb(grpc_tcp *tcp, int success, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { grpc_closure *cb = tcp->read_cb; if (grpc_tcp_trace) { @@ -164,11 +164,11 @@ static void call_read_cb(grpc_tcp *tcp, int success, tcp->read_cb = NULL; tcp->incoming_buffer = NULL; - cb->cb(cb->cb_arg, success, call_list); + cb->cb(cb->cb_arg, success, closure_list); } #define MAX_READ_IOVEC 4 -static void tcp_continue_read(grpc_tcp *tcp, grpc_call_list *call_list) { +static void tcp_continue_read(grpc_tcp *tcp, grpc_closure_list *closure_list) { struct msghdr msg; struct iovec iov[MAX_READ_IOVEC]; ssize_t read_bytes; @@ -210,18 +210,18 @@ static void tcp_continue_read(grpc_tcp *tcp, grpc_call_list *call_list) { tcp->iov_size /= 2; } /* We've consumed the edge, request a new one */ - grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure, call_list); + grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure, closure_list); } else { /* TODO(klempner): Log interesting errors */ gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); - call_read_cb(tcp, 0, call_list); - TCP_UNREF(tcp, "read", call_list); + call_read_cb(tcp, 0, closure_list); + TCP_UNREF(tcp, "read", closure_list); } } else if (read_bytes == 0) { /* 0 read size ==> end of stream */ gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); - call_read_cb(tcp, 0, call_list); - TCP_UNREF(tcp, "read", call_list); + call_read_cb(tcp, 0, closure_list); + TCP_UNREF(tcp, "read", closure_list); } else { GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length); if ((size_t)read_bytes < tcp->incoming_buffer->length) { @@ -232,29 +232,29 @@ static void tcp_continue_read(grpc_tcp *tcp, grpc_call_list *call_list) { ++tcp->iov_size; } GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length); - call_read_cb(tcp, 1, call_list); - TCP_UNREF(tcp, "read", call_list); + call_read_cb(tcp, 1, closure_list); + TCP_UNREF(tcp, "read", closure_list); } GRPC_TIMER_END(GRPC_PTAG_HANDLE_READ, 0); } static void tcp_handle_read(void *arg /* grpc_tcp */, int success, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { grpc_tcp *tcp = (grpc_tcp *)arg; GPR_ASSERT(!tcp->finished_edge); if (!success) { gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer); - call_read_cb(tcp, 0, call_list); - TCP_UNREF(tcp, "read", call_list); + call_read_cb(tcp, 0, closure_list); + TCP_UNREF(tcp, "read", closure_list); } else { - tcp_continue_read(tcp, call_list); + tcp_continue_read(tcp, closure_list); } } static void tcp_read(grpc_endpoint *ep, gpr_slice_buffer *incoming_buffer, - grpc_closure *cb, grpc_call_list *call_list) { + grpc_closure *cb, grpc_closure_list *closure_list) { grpc_tcp *tcp = (grpc_tcp *)ep; GPR_ASSERT(tcp->read_cb == NULL); tcp->read_cb = cb; @@ -263,9 +263,9 @@ static void tcp_read(grpc_endpoint *ep, gpr_slice_buffer *incoming_buffer, TCP_REF(tcp, "read"); if (tcp->finished_edge) { tcp->finished_edge = 0; - grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure, call_list); + grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure, closure_list); } else { - grpc_call_list_add(call_list, &tcp->read_closure, 1); + grpc_closure_list_add(closure_list, &tcp->read_closure, 1); } } @@ -352,7 +352,7 @@ static flush_result tcp_flush(grpc_tcp *tcp) { } static void tcp_handle_write(void *arg /* grpc_tcp */, int success, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { grpc_tcp *tcp = (grpc_tcp *)arg; flush_result status; grpc_closure *cb; @@ -360,26 +360,26 @@ static void tcp_handle_write(void *arg /* grpc_tcp */, int success, if (!success) { cb = tcp->write_cb; tcp->write_cb = NULL; - cb->cb(cb->cb_arg, 0, call_list); - TCP_UNREF(tcp, "write", call_list); + cb->cb(cb->cb_arg, 0, closure_list); + TCP_UNREF(tcp, "write", closure_list); return; } GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_CB_WRITE, 0); status = tcp_flush(tcp); if (status == FLUSH_PENDING) { - grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure, call_list); + grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure, closure_list); } else { cb = tcp->write_cb; tcp->write_cb = NULL; - cb->cb(cb->cb_arg, status == FLUSH_DONE, call_list); - TCP_UNREF(tcp, "write", call_list); + cb->cb(cb->cb_arg, status == FLUSH_DONE, closure_list); + TCP_UNREF(tcp, "write", closure_list); } GRPC_TIMER_END(GRPC_PTAG_TCP_CB_WRITE, 0); } static void tcp_write(grpc_endpoint *ep, gpr_slice_buffer *buf, - grpc_closure *cb, grpc_call_list *call_list) { + grpc_closure *cb, grpc_closure_list *closure_list) { grpc_tcp *tcp = (grpc_tcp *)ep; flush_result status; @@ -399,7 +399,7 @@ static void tcp_write(grpc_endpoint *ep, gpr_slice_buffer *buf, if (buf->length == 0) { GRPC_TIMER_END(GRPC_PTAG_TCP_WRITE, 0); - grpc_call_list_add(call_list, cb, 1); + grpc_closure_list_add(closure_list, cb, 1); return; } tcp->outgoing_buffer = buf; @@ -410,25 +410,25 @@ static void tcp_write(grpc_endpoint *ep, gpr_slice_buffer *buf, if (status == FLUSH_PENDING) { TCP_REF(tcp, "write"); tcp->write_cb = cb; - grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure, call_list); + grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_closure, closure_list); } else { - grpc_call_list_add(call_list, cb, status == FLUSH_DONE); + grpc_closure_list_add(closure_list, cb, status == FLUSH_DONE); } GRPC_TIMER_END(GRPC_PTAG_TCP_WRITE, 0); } static void tcp_add_to_pollset(grpc_endpoint *ep, grpc_pollset *pollset, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { grpc_tcp *tcp = (grpc_tcp *)ep; - grpc_pollset_add_fd(pollset, tcp->em_fd, call_list); + grpc_pollset_add_fd(pollset, tcp->em_fd, closure_list); } static void tcp_add_to_pollset_set(grpc_endpoint *ep, grpc_pollset_set *pollset_set, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { grpc_tcp *tcp = (grpc_tcp *)ep; - grpc_pollset_set_add_fd(pollset_set, tcp->em_fd, call_list); + grpc_pollset_set_add_fd(pollset_set, tcp->em_fd, closure_list); } static char *tcp_get_peer(grpc_endpoint *ep) { diff --git a/src/core/iomgr/tcp_server.h b/src/core/iomgr/tcp_server.h index b2aab234d3..034f10eff7 100644 --- a/src/core/iomgr/tcp_server.h +++ b/src/core/iomgr/tcp_server.h @@ -41,7 +41,7 @@ typedef struct grpc_tcp_server grpc_tcp_server; /* Called for newly connected TCP connections. */ typedef void (*grpc_tcp_server_cb)(void *arg, grpc_endpoint *ep, - grpc_call_list *call_list); + grpc_closure_list *closure_list); /* Create a server, initially not bound to any ports */ grpc_tcp_server *grpc_tcp_server_create(void); @@ -50,7 +50,7 @@ grpc_tcp_server *grpc_tcp_server_create(void); void grpc_tcp_server_start(grpc_tcp_server *server, grpc_pollset **pollsets, size_t pollset_count, grpc_tcp_server_cb on_accept_cb, void *cb_arg, - grpc_call_list *call_list); + grpc_closure_list *closure_list); /* Add a port to the server, returning port number on success, or negative on failure. @@ -74,6 +74,6 @@ int grpc_tcp_server_add_port(grpc_tcp_server *s, const void *addr, int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned index); void grpc_tcp_server_destroy(grpc_tcp_server *server, grpc_closure *closure, - grpc_call_list *call_list); + grpc_closure_list *closure_list); #endif /* GRPC_INTERNAL_CORE_IOMGR_TCP_SERVER_H */ diff --git a/src/core/iomgr/tcp_server_posix.c b/src/core/iomgr/tcp_server_posix.c index 635fdeb198..e6c6a36602 100644 --- a/src/core/iomgr/tcp_server_posix.c +++ b/src/core/iomgr/tcp_server_posix.c @@ -140,8 +140,9 @@ grpc_tcp_server *grpc_tcp_server_create(void) { return s; } -static void finish_shutdown(grpc_tcp_server *s, grpc_call_list *call_list) { - grpc_call_list_add(call_list, s->shutdown_complete, 1); +static void finish_shutdown(grpc_tcp_server *s, + grpc_closure_list *closure_list) { + grpc_closure_list_add(closure_list, s->shutdown_complete, 1); gpr_mu_destroy(&s->mu); @@ -150,13 +151,13 @@ static void finish_shutdown(grpc_tcp_server *s, grpc_call_list *call_list) { } static void destroyed_port(void *server, int success, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { grpc_tcp_server *s = server; gpr_mu_lock(&s->mu); s->destroyed_ports++; if (s->destroyed_ports == s->nports) { gpr_mu_unlock(&s->mu); - finish_shutdown(s, call_list); + finish_shutdown(s, closure_list); } else { GPR_ASSERT(s->destroyed_ports < s->nports); gpr_mu_unlock(&s->mu); @@ -167,7 +168,7 @@ static void destroyed_port(void *server, int success, events will be received on them - at this point it's safe to destroy things */ static void deactivated_all_ports(grpc_tcp_server *s, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { size_t i; /* delete ALL the things */ @@ -187,17 +188,17 @@ static void deactivated_all_ports(grpc_tcp_server *s, sp->destroyed_closure.cb = destroyed_port; sp->destroyed_closure.cb_arg = s; grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, "tcp_listener_shutdown", - call_list); + closure_list); } gpr_mu_unlock(&s->mu); } else { gpr_mu_unlock(&s->mu); - finish_shutdown(s, call_list); + finish_shutdown(s, closure_list); } } void grpc_tcp_server_destroy(grpc_tcp_server *s, grpc_closure *closure, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { size_t i; gpr_mu_lock(&s->mu); @@ -209,12 +210,12 @@ void grpc_tcp_server_destroy(grpc_tcp_server *s, grpc_closure *closure, /* shutdown all fd's */ if (s->active_ports) { for (i = 0; i < s->nports; i++) { - grpc_fd_shutdown(s->ports[i].emfd, call_list); + grpc_fd_shutdown(s->ports[i].emfd, closure_list); } gpr_mu_unlock(&s->mu); } else { gpr_mu_unlock(&s->mu); - deactivated_all_ports(s, call_list); + deactivated_all_ports(s, closure_list); } } @@ -299,7 +300,7 @@ error: } /* event manager callback when reads are ready */ -static void on_read(void *arg, int success, grpc_call_list *call_list) { +static void on_read(void *arg, int success, grpc_closure_list *closure_list) { server_port *sp = arg; grpc_fd *fdobj; size_t i; @@ -322,7 +323,7 @@ static void on_read(void *arg, int success, grpc_call_list *call_list) { case EINTR: continue; case EAGAIN: - grpc_fd_notify_on_read(sp->emfd, &sp->read_closure, call_list); + grpc_fd_notify_on_read(sp->emfd, &sp->read_closure, closure_list); return; default: gpr_log(GPR_ERROR, "Failed accept4: %s", strerror(errno)); @@ -344,12 +345,12 @@ static void on_read(void *arg, int success, grpc_call_list *call_list) { of channels -- we certainly should not be automatically adding every incoming channel to every pollset owned by the server */ for (i = 0; i < sp->server->pollset_count; i++) { - grpc_pollset_add_fd(sp->server->pollsets[i], fdobj, call_list); + grpc_pollset_add_fd(sp->server->pollsets[i], fdobj, closure_list); } sp->server->on_accept_cb( sp->server->on_accept_cb_arg, grpc_tcp_create(fdobj, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, addr_str), - call_list); + closure_list); gpr_free(name); gpr_free(addr_str); @@ -361,7 +362,7 @@ error: gpr_mu_lock(&sp->server->mu); if (0 == --sp->server->active_ports) { gpr_mu_unlock(&sp->server->mu); - deactivated_all_ports(sp->server, call_list); + deactivated_all_ports(sp->server, closure_list); } else { gpr_mu_unlock(&sp->server->mu); } @@ -488,7 +489,8 @@ int grpc_tcp_server_get_fd(grpc_tcp_server *s, unsigned index) { void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollsets, size_t pollset_count, grpc_tcp_server_cb on_accept_cb, - void *on_accept_cb_arg, grpc_call_list *call_list) { + void *on_accept_cb_arg, + grpc_closure_list *closure_list) { size_t i, j; GPR_ASSERT(on_accept_cb); gpr_mu_lock(&s->mu); @@ -500,12 +502,12 @@ void grpc_tcp_server_start(grpc_tcp_server *s, grpc_pollset **pollsets, s->pollset_count = pollset_count; for (i = 0; i < s->nports; i++) { for (j = 0; j < pollset_count; j++) { - grpc_pollset_add_fd(pollsets[j], s->ports[i].emfd, call_list); + grpc_pollset_add_fd(pollsets[j], s->ports[i].emfd, closure_list); } s->ports[i].read_closure.cb = on_read; s->ports[i].read_closure.cb_arg = &s->ports[i]; grpc_fd_notify_on_read(s->ports[i].emfd, &s->ports[i].read_closure, - call_list); + closure_list); s->active_ports++; } gpr_mu_unlock(&s->mu); diff --git a/src/core/iomgr/udp_server.c b/src/core/iomgr/udp_server.c index e1c2ae95fd..0f0bea82ab 100644 --- a/src/core/iomgr/udp_server.c +++ b/src/core/iomgr/udp_server.c @@ -133,8 +133,9 @@ grpc_udp_server *grpc_udp_server_create(void) { return s; } -static void finish_shutdown(grpc_udp_server *s, grpc_call_list *call_list) { - grpc_call_list_add(call_list, s->shutdown_complete, 1); +static void finish_shutdown(grpc_udp_server *s, + grpc_closure_list *closure_list) { + grpc_closure_list_add(closure_list, s->shutdown_complete, 1); gpr_mu_destroy(&s->mu); gpr_cv_destroy(&s->cv); @@ -144,13 +145,13 @@ static void finish_shutdown(grpc_udp_server *s, grpc_call_list *call_list) { } static void destroyed_port(void *server, int success, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { grpc_udp_server *s = server; gpr_mu_lock(&s->mu); s->destroyed_ports++; if (s->destroyed_ports == s->nports) { gpr_mu_unlock(&s->mu); - finish_shutdown(s, call_list); + finish_shutdown(s, closure_list); } else { gpr_mu_unlock(&s->mu); } @@ -160,7 +161,7 @@ static void destroyed_port(void *server, int success, events will be received on them - at this point it's safe to destroy things */ static void deactivated_all_ports(grpc_udp_server *s, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { size_t i; /* delete ALL the things */ @@ -180,17 +181,17 @@ static void deactivated_all_ports(grpc_udp_server *s, sp->destroyed_closure.cb = destroyed_port; sp->destroyed_closure.cb_arg = s; grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, "udp_listener_shutdown", - call_list); + closure_list); } gpr_mu_unlock(&s->mu); } else { gpr_mu_unlock(&s->mu); - finish_shutdown(s, call_list); + finish_shutdown(s, closure_list); } } void grpc_udp_server_destroy(grpc_udp_server *s, grpc_closure *on_done, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { size_t i; gpr_mu_lock(&s->mu); @@ -202,12 +203,12 @@ void grpc_udp_server_destroy(grpc_udp_server *s, grpc_closure *on_done, /* shutdown all fd's */ if (s->active_ports) { for (i = 0; i < s->nports; i++) { - grpc_fd_shutdown(s->ports[i].emfd, call_list); + grpc_fd_shutdown(s->ports[i].emfd, closure_list); } gpr_mu_unlock(&s->mu); } else { gpr_mu_unlock(&s->mu); - deactivated_all_ports(s, call_list); + deactivated_all_ports(s, closure_list); } } @@ -262,14 +263,14 @@ error: } /* event manager callback when reads are ready */ -static void on_read(void *arg, int success, grpc_call_list *call_list) { +static void on_read(void *arg, int success, grpc_closure_list *closure_list) { server_port *sp = arg; if (success == 0) { gpr_mu_lock(&sp->server->mu); if (0 == --sp->server->active_ports) { gpr_mu_unlock(&sp->server->mu); - deactivated_all_ports(sp->server, call_list); + deactivated_all_ports(sp->server, closure_list); } else { gpr_mu_unlock(&sp->server->mu); } @@ -281,7 +282,7 @@ static void on_read(void *arg, int success, grpc_call_list *call_list) { sp->read_cb(sp->fd); /* Re-arm the notification event so we get another chance to read. */ - grpc_fd_notify_on_read(sp->emfd, &sp->read_closure, call_list); + grpc_fd_notify_on_read(sp->emfd, &sp->read_closure, closure_list); } static int add_socket_to_server(grpc_udp_server *s, int fd, @@ -402,19 +403,20 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index) { } void grpc_udp_server_start(grpc_udp_server *s, grpc_pollset **pollsets, - size_t pollset_count, grpc_call_list *call_list) { + size_t pollset_count, + grpc_closure_list *closure_list) { size_t i, j; gpr_mu_lock(&s->mu); GPR_ASSERT(s->active_ports == 0); s->pollsets = pollsets; for (i = 0; i < s->nports; i++) { for (j = 0; j < pollset_count; j++) { - grpc_pollset_add_fd(pollsets[j], s->ports[i].emfd, call_list); + grpc_pollset_add_fd(pollsets[j], s->ports[i].emfd, closure_list); } s->ports[i].read_closure.cb = on_read; s->ports[i].read_closure.cb_arg = &s->ports[i]; grpc_fd_notify_on_read(s->ports[i].emfd, &s->ports[i].read_closure, - call_list); + closure_list); s->active_ports++; } gpr_mu_unlock(&s->mu); diff --git a/src/core/iomgr/udp_server.h b/src/core/iomgr/udp_server.h index fa4d2147b4..b66a2d79a2 100644 --- a/src/core/iomgr/udp_server.h +++ b/src/core/iomgr/udp_server.h @@ -47,7 +47,8 @@ grpc_udp_server *grpc_udp_server_create(void); /* Start listening to bound ports */ void grpc_udp_server_start(grpc_udp_server *udp_server, grpc_pollset **pollsets, - size_t pollset_count, grpc_call_list *call_list); + size_t pollset_count, + grpc_closure_list *closure_list); int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index); @@ -65,7 +66,7 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, size_t addr_len, grpc_udp_server_read_cb read_cb); void grpc_udp_server_destroy(grpc_udp_server *server, grpc_closure *on_done, - grpc_call_list *call_list); + grpc_closure_list *closure_list); /* Write the contents of buffer to the underlying UDP socket. */ /* diff --git a/src/core/iomgr/workqueue.h b/src/core/iomgr/workqueue.h index ea7031a9f3..0bec714d4f 100644 --- a/src/core/iomgr/workqueue.h +++ b/src/core/iomgr/workqueue.h @@ -50,9 +50,10 @@ struct grpc_workqueue; typedef struct grpc_workqueue grpc_workqueue; /** Create a work queue */ -grpc_workqueue *grpc_workqueue_create(grpc_call_list *call_list); +grpc_workqueue *grpc_workqueue_create(grpc_closure_list *closure_list); -void grpc_workqueue_flush(grpc_workqueue *workqueue, grpc_call_list *call_list); +void grpc_workqueue_flush(grpc_workqueue *workqueue, + grpc_closure_list *closure_list); #define GRPC_WORKQUEUE_REFCOUNT_DEBUG #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG @@ -62,19 +63,21 @@ void grpc_workqueue_flush(grpc_workqueue *workqueue, grpc_call_list *call_list); grpc_workqueue_unref((p), (cl), __FILE__, __LINE__, (r)) void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line, const char *reason); -void grpc_workqueue_unref(grpc_workqueue *workqueue, grpc_call_list *call_list, - const char *file, int line, const char *reason); +void grpc_workqueue_unref(grpc_workqueue *workqueue, + grpc_closure_list *closure_list, const char *file, + int line, const char *reason); #else #define GRPC_WORKQUEUE_REF(p, r) grpc_workqueue_ref((p)) #define GRPC_WORKQUEUE_UNREF(p, r, cl) grpc_workqueue_unref((p), (cl)) void grpc_workqueue_ref(grpc_workqueue *workqueue); -void grpc_workqueue_unref(grpc_workqueue *workqueue, grpc_call_list *call_list); +void grpc_workqueue_unref(grpc_workqueue *workqueue, + grpc_closure_list *closure_list); #endif /** Bind this workqueue to a pollset */ void grpc_workqueue_add_to_pollset(grpc_workqueue *workqueue, grpc_pollset *pollset, - grpc_call_list *call_list); + grpc_closure_list *closure_list); /** Add a work item to a workqueue */ void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_closure *closure, diff --git a/src/core/iomgr/workqueue_posix.c b/src/core/iomgr/workqueue_posix.c index 83249c583c..9ecd59d390 100644 --- a/src/core/iomgr/workqueue_posix.c +++ b/src/core/iomgr/workqueue_posix.c @@ -45,28 +45,29 @@ #include "src/core/iomgr/fd_posix.h" -static void on_readable(void *arg, int success, grpc_call_list *call_list); +static void on_readable(void *arg, int success, + grpc_closure_list *closure_list); -grpc_workqueue *grpc_workqueue_create(grpc_call_list *call_list) { +grpc_workqueue *grpc_workqueue_create(grpc_closure_list *closure_list) { char name[32]; grpc_workqueue *workqueue = gpr_malloc(sizeof(grpc_workqueue)); gpr_ref_init(&workqueue->refs, 1); gpr_mu_init(&workqueue->mu); - workqueue->call_list.head = workqueue->call_list.tail = NULL; + workqueue->closure_list.head = workqueue->closure_list.tail = NULL; grpc_wakeup_fd_init(&workqueue->wakeup_fd); sprintf(name, "workqueue:%p", (void *)workqueue); workqueue->wakeup_read_fd = grpc_fd_create(GRPC_WAKEUP_FD_GET_READ_FD(&workqueue->wakeup_fd), name); grpc_closure_init(&workqueue->read_closure, on_readable, workqueue); grpc_fd_notify_on_read(workqueue->wakeup_read_fd, &workqueue->read_closure, - call_list); + closure_list); return workqueue; } static void workqueue_destroy(grpc_workqueue *workqueue, - grpc_call_list *call_list) { - GPR_ASSERT(grpc_call_list_empty(workqueue->call_list)); - grpc_fd_shutdown(workqueue->wakeup_read_fd, call_list); + grpc_closure_list *closure_list) { + GPR_ASSERT(grpc_closure_list_empty(workqueue->closure_list)); + grpc_fd_shutdown(workqueue->wakeup_read_fd, closure_list); } #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG @@ -82,34 +83,36 @@ void grpc_workqueue_ref(grpc_workqueue *workqueue) { } #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG -void grpc_workqueue_unref(grpc_workqueue *workqueue, grpc_call_list *call_list, - const char *file, int line, const char *reason) { +void grpc_workqueue_unref(grpc_workqueue *workqueue, + grpc_closure_list *closure_list, const char *file, + int line, const char *reason) { gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "WORKQUEUE:%p unref %d -> %d %s", workqueue, (int)workqueue->refs.count, (int)workqueue->refs.count - 1, reason); #else void grpc_workqueue_unref(grpc_workqueue *workqueue, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { #endif if (gpr_unref(&workqueue->refs)) { - workqueue_destroy(workqueue, call_list); + workqueue_destroy(workqueue, closure_list); } } void grpc_workqueue_add_to_pollset(grpc_workqueue *workqueue, grpc_pollset *pollset, - grpc_call_list *call_list) { - grpc_pollset_add_fd(pollset, workqueue->wakeup_read_fd, call_list); + grpc_closure_list *closure_list) { + grpc_pollset_add_fd(pollset, workqueue->wakeup_read_fd, closure_list); } void grpc_workqueue_flush(grpc_workqueue *workqueue, - grpc_call_list *call_list) { + grpc_closure_list *closure_list) { gpr_mu_lock(&workqueue->mu); - grpc_call_list_move(&workqueue->call_list, call_list); + grpc_closure_list_move(&workqueue->closure_list, closure_list); gpr_mu_unlock(&workqueue->mu); } -static void on_readable(void *arg, int success, grpc_call_list *call_list) { +static void on_readable(void *arg, int success, + grpc_closure_list *closure_list) { grpc_workqueue *workqueue = arg; if (!success) { @@ -117,15 +120,15 @@ static void on_readable(void *arg, int success, grpc_call_list *call_list) { /* HACK: let wakeup_fd code know that we stole the fd */ workqueue->wakeup_fd.read_fd = 0; grpc_wakeup_fd_destroy(&workqueue->wakeup_fd); - grpc_fd_orphan(workqueue->wakeup_read_fd, NULL, "destroy", call_list); + grpc_fd_orphan(workqueue->wakeup_read_fd, NULL, "destroy", closure_list); gpr_free(workqueue); } else { gpr_mu_lock(&workqueue->mu); - grpc_call_list_move(&workqueue->call_list, call_list); + grpc_closure_list_move(&workqueue->closure_list, closure_list); grpc_wakeup_fd_consume_wakeup(&workqueue->wakeup_fd); gpr_mu_unlock(&workqueue->mu); grpc_fd_notify_on_read(workqueue->wakeup_read_fd, &workqueue->read_closure, - call_list); + closure_list); } } @@ -134,10 +137,10 @@ void grpc_workqueue_push(grpc_workqueue *workqueue, grpc_closure *closure, closure->success = success; closure->next = NULL; gpr_mu_lock(&workqueue->mu); - if (grpc_call_list_empty(workqueue->call_list)) { + if (grpc_closure_list_empty(workqueue->closure_list)) { grpc_wakeup_fd_wakeup(&workqueue->wakeup_fd); } - grpc_call_list_add(&workqueue->call_list, closure, success); + grpc_closure_list_add(&workqueue->closure_list, closure, success); gpr_mu_unlock(&workqueue->mu); } diff --git a/src/core/iomgr/workqueue_posix.h b/src/core/iomgr/workqueue_posix.h index 22c48c4926..589034fe1b 100644 --- a/src/core/iomgr/workqueue_posix.h +++ b/src/core/iomgr/workqueue_posix.h @@ -40,7 +40,7 @@ struct grpc_workqueue { gpr_refcount refs; gpr_mu mu; - grpc_call_list call_list; + grpc_closure_list closure_list; grpc_wakeup_fd wakeup_fd; struct grpc_fd *wakeup_read_fd; |