diff options
Diffstat (limited to 'src/core/iomgr/fd_posix.c')
-rw-r--r-- | src/core/iomgr/fd_posix.c | 87 |
1 files changed, 41 insertions, 46 deletions
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 4f52339bc1..41fd24e05a 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -45,7 +45,10 @@ #include <grpc/support/log.h> #include <grpc/support/useful.h> -enum descriptor_state { NOT_READY, READY, WAITING }; +enum descriptor_state { + NOT_READY = 0, + READY = 1 +}; /* or a pointer to a closure to call */ /* We need to keep a freelist not because of any concerns of malloc performance * but instead so that implementations with multiple threads in (for example) @@ -88,8 +91,8 @@ static grpc_fd *alloc_fd(int fd) { gpr_mu_init(&r->watcher_mu); } gpr_atm_rel_store(&r->refst, 1); - gpr_atm_rel_store(&r->readst.state, NOT_READY); - gpr_atm_rel_store(&r->writest.state, NOT_READY); + gpr_atm_rel_store(&r->readst, NOT_READY); + gpr_atm_rel_store(&r->writest, NOT_READY); gpr_atm_rel_store(&r->shutdown, 0); r->fd = fd; r->watcher_root.next = r->watcher_root.prev = &r->watcher_root; @@ -166,11 +169,6 @@ void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); } void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); } -typedef struct { - grpc_iomgr_cb_func cb; - void *arg; -} callback; - static void make_callback(grpc_iomgr_cb_func cb, void *arg, int success, int allow_synchronous_callback) { if (allow_synchronous_callback) { @@ -180,18 +178,18 @@ static void make_callback(grpc_iomgr_cb_func cb, void *arg, int success, } } -static void make_callbacks(callback *callbacks, size_t n, int success, +static void make_callbacks(grpc_iomgr_closure *callbacks, size_t n, int success, int allow_synchronous_callback) { size_t i; for (i = 0; i < n; i++) { - make_callback(callbacks[i].cb, callbacks[i].arg, success, + make_callback(callbacks[i].cb, callbacks[i].cb_arg, success, allow_synchronous_callback); } } -static void notify_on(grpc_fd *fd, grpc_fd_state *st, grpc_iomgr_cb_func cb, - void *arg, int allow_synchronous_callback) { - switch ((enum descriptor_state)gpr_atm_acq_load(&st->state)) { +static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure, + int allow_synchronous_callback) { + switch (gpr_atm_acq_load(st)) { case NOT_READY: /* There is no race if the descriptor is already ready, so we skip the interlocked op in that case. As long as the app doesn't @@ -199,9 +197,7 @@ static void notify_on(grpc_fd *fd, grpc_fd_state *st, grpc_iomgr_cb_func cb, oldval should never be anything other than READY or NOT_READY. We don't check for user error on the fast path. */ - st->cb = cb; - st->cb_arg = arg; - if (gpr_atm_rel_cas(&st->state, NOT_READY, WAITING)) { + if (gpr_atm_rel_cas(st, NOT_READY, (gpr_intptr)closure)) { /* swap was successful -- the closure will run after the next set_ready call. NOTE: we don't have an ABA problem here, since we should never have concurrent calls to the same @@ -212,12 +208,13 @@ static void notify_on(grpc_fd *fd, grpc_fd_state *st, grpc_iomgr_cb_func cb, /* swap was unsuccessful due to an intervening set_ready call. Fall through to the READY code below */ case READY: - assert(gpr_atm_acq_load(&st->state) == READY); - gpr_atm_rel_store(&st->state, NOT_READY); - make_callback(cb, arg, !gpr_atm_acq_load(&fd->shutdown), + assert(gpr_atm_acq_load(st) == READY); + gpr_atm_rel_store(st, NOT_READY); + make_callback(closure->cb, closure->cb_arg, + !gpr_atm_acq_load(&fd->shutdown), allow_synchronous_callback); return; - case WAITING: + default: /* WAITING */ /* upcallptr was set to a different closure. This is an error! */ gpr_log(GPR_ERROR, "User called a notify_on function with a previous callback still " @@ -228,38 +225,38 @@ static void notify_on(grpc_fd *fd, grpc_fd_state *st, grpc_iomgr_cb_func cb, abort(); } -static void set_ready_locked(grpc_fd_state *st, callback *callbacks, +static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure *callbacks, size_t *ncallbacks) { - callback *c; + gpr_intptr state = gpr_atm_acq_load(st); - switch ((enum descriptor_state)gpr_atm_acq_load(&st->state)) { + switch (state) { + case READY: + /* duplicate ready, ignore */ + return; case NOT_READY: - if (gpr_atm_rel_cas(&st->state, NOT_READY, READY)) { + if (gpr_atm_rel_cas(st, NOT_READY, READY)) { /* swap was successful -- the closure will run after the next notify_on call. */ return; } - /* swap was unsuccessful due to an intervening set_ready call. - Fall through to the WAITING code below */ - case WAITING: - assert(gpr_atm_acq_load(&st->state) == WAITING); - c = &callbacks[(*ncallbacks)++]; - c->cb = st->cb; - c->arg = st->cb_arg; - gpr_atm_rel_store(&st->state, NOT_READY); - return; - case READY: - /* duplicate ready, ignore */ + /* swap was unsuccessful due to an intervening set_ready call. + Fall through to the WAITING code below */ + state = gpr_atm_acq_load(st); + default: /* waiting */ + assert(gpr_atm_acq_load(st) != READY && + gpr_atm_acq_load(st) != NOT_READY); + callbacks[(*ncallbacks)++] = *(grpc_iomgr_closure *)state; + gpr_atm_rel_store(st, NOT_READY); return; } } -static void set_ready(grpc_fd *fd, grpc_fd_state *st, +static void set_ready(grpc_fd *fd, gpr_atm *st, int allow_synchronous_callback) { /* only one set_ready can be active at once (but there may be a racing notify_on) */ int success; - callback cb; + grpc_iomgr_closure cb; size_t ncb = 0; gpr_mu_lock(&fd->set_state_mu); set_ready_locked(st, &cb, &ncb); @@ -269,7 +266,7 @@ static void set_ready(grpc_fd *fd, grpc_fd_state *st, } void grpc_fd_shutdown(grpc_fd *fd) { - callback cb[2]; + grpc_iomgr_closure cb[2]; size_t ncb = 0; gpr_mu_lock(&fd->set_state_mu); GPR_ASSERT(!gpr_atm_acq_load(&fd->shutdown)); @@ -280,14 +277,12 @@ void grpc_fd_shutdown(grpc_fd *fd) { make_callbacks(cb, ncb, 0, 0); } -void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_cb_func read_cb, - void *read_cb_arg) { - notify_on(fd, &fd->readst, read_cb, read_cb_arg, 0); +void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_closure *closure) { + notify_on(fd, &fd->readst, closure, 0); } -void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_cb_func write_cb, - void *write_cb_arg) { - notify_on(fd, &fd->writest, write_cb, write_cb_arg, 0); +void grpc_fd_notify_on_write(grpc_fd *fd, grpc_iomgr_closure *closure) { + notify_on(fd, &fd->writest, closure, 0); } gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, @@ -305,8 +300,8 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, watcher->fd = fd; gpr_mu_unlock(&fd->watcher_mu); - return (gpr_atm_acq_load(&fd->readst.state) != READY ? read_mask : 0) | - (gpr_atm_acq_load(&fd->writest.state) != READY ? write_mask : 0); + return (gpr_atm_acq_load(&fd->readst) != READY ? read_mask : 0) | + (gpr_atm_acq_load(&fd->writest) != READY ? write_mask : 0); } void grpc_fd_end_poll(grpc_fd_watcher *watcher) { |