diff options
Diffstat (limited to 'src/core/iomgr/fd_posix.c')
-rw-r--r-- | src/core/iomgr/fd_posix.c | 53 |
1 files changed, 28 insertions, 25 deletions
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index 4ba06676ab..28ed7708f7 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -90,6 +90,7 @@ static grpc_fd *alloc_fd(int fd) { gpr_mu_init(&r->set_state_mu); gpr_mu_init(&r->watcher_mu); } + gpr_atm_rel_store(&r->refst, 1); gpr_atm_rel_store(&r->readst, NOT_READY); gpr_atm_rel_store(&r->writest, NOT_READY); @@ -115,8 +116,7 @@ static void ref_by(grpc_fd *fd, int n) { static void unref_by(grpc_fd *fd, int n) { gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n); if (old == n) { - close(fd->fd); - grpc_iomgr_add_callback(fd->on_done, fd->on_done_user_data); + grpc_iomgr_add_callback(&fd->on_done_closure); freelist_fd(fd); grpc_iomgr_unregister_object(&fd->iomgr_object); } else { @@ -179,8 +179,8 @@ static void wake_all_watchers_locked(grpc_fd *fd) { } void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) { - fd->on_done = on_done ? on_done : do_nothing; - fd->on_done_user_data = user_data; + grpc_iomgr_closure_init(&fd->on_done_closure, on_done ? on_done : do_nothing, + user_data); shutdown(fd->fd, SHUT_RDWR); ref_by(fd, 1); /* remove active status, but keep referenced */ gpr_mu_lock(&fd->watcher_mu); @@ -194,21 +194,20 @@ void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); } void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); } -static void make_callback(grpc_iomgr_cb_func cb, void *arg, int success, +static void process_callback(grpc_iomgr_closure *closure, int success, int allow_synchronous_callback) { if (allow_synchronous_callback) { - cb(arg, success); + closure->cb(closure->cb_arg, success); } else { - grpc_iomgr_add_delayed_callback(cb, arg, success); + grpc_iomgr_add_delayed_callback(closure, success); } } -static void make_callbacks(grpc_iomgr_closure *callbacks, size_t n, int success, - int allow_synchronous_callback) { +static void process_callbacks(grpc_iomgr_closure *callbacks, size_t n, + int success, int allow_synchronous_callback) { size_t i; for (i = 0; i < n; i++) { - make_callback(callbacks[i].cb, callbacks[i].cb_arg, success, - allow_synchronous_callback); + process_callback(callbacks + i, success, allow_synchronous_callback); } } @@ -233,10 +232,9 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure, /* swap was unsuccessful due to an intervening set_ready call. Fall through to the READY code below */ case READY: - assert(gpr_atm_no_barrier_load(st) == READY); + GPR_ASSERT(gpr_atm_no_barrier_load(st) == READY); gpr_atm_rel_store(st, NOT_READY); - make_callback(closure->cb, closure->cb_arg, - !gpr_atm_acq_load(&fd->shutdown), + process_callback(closure, !gpr_atm_acq_load(&fd->shutdown), allow_synchronous_callback); return; default: /* WAITING */ @@ -250,7 +248,7 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure, abort(); } -static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure *callbacks, +static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure **callbacks, size_t *ncallbacks) { gpr_intptr state = gpr_atm_acq_load(st); @@ -268,9 +266,9 @@ static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure *callbacks, Fall through to the WAITING code below */ state = gpr_atm_acq_load(st); default: /* waiting */ - assert(gpr_atm_no_barrier_load(st) != READY && - gpr_atm_no_barrier_load(st) != NOT_READY); - callbacks[(*ncallbacks)++] = *(grpc_iomgr_closure *)state; + GPR_ASSERT(gpr_atm_no_barrier_load(st) != READY && + gpr_atm_no_barrier_load(st) != NOT_READY); + callbacks[(*ncallbacks)++] = (grpc_iomgr_closure *)state; gpr_atm_rel_store(st, NOT_READY); return; } @@ -281,25 +279,30 @@ static void set_ready(grpc_fd *fd, gpr_atm *st, /* only one set_ready can be active at once (but there may be a racing notify_on) */ int success; - grpc_iomgr_closure cb; + grpc_iomgr_closure* closure; size_t ncb = 0; + gpr_mu_lock(&fd->set_state_mu); - set_ready_locked(st, &cb, &ncb); + set_ready_locked(st, &closure, &ncb); gpr_mu_unlock(&fd->set_state_mu); success = !gpr_atm_acq_load(&fd->shutdown); - make_callbacks(&cb, ncb, success, allow_synchronous_callback); + GPR_ASSERT(ncb <= 1); + if (ncb > 0) { + process_callbacks(closure, ncb, success, allow_synchronous_callback); + } } void grpc_fd_shutdown(grpc_fd *fd) { - grpc_iomgr_closure cb[2]; size_t ncb = 0; gpr_mu_lock(&fd->set_state_mu); GPR_ASSERT(!gpr_atm_no_barrier_load(&fd->shutdown)); gpr_atm_rel_store(&fd->shutdown, 1); - set_ready_locked(&fd->readst, cb, &ncb); - set_ready_locked(&fd->writest, cb, &ncb); + set_ready_locked(&fd->readst, &fd->shutdown_closures[0], &ncb); + set_ready_locked(&fd->writest, &fd->shutdown_closures[0], &ncb); gpr_mu_unlock(&fd->set_state_mu); - make_callbacks(cb, ncb, 0, 0); + GPR_ASSERT(ncb <= 2); + process_callbacks(fd->shutdown_closures[0], ncb, 0 /* GPR_FALSE */, + 0 /* GPR_FALSE */); } void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_closure *closure) { |