aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/iomgr/fd_posix.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/iomgr/fd_posix.c')
-rw-r--r--src/core/iomgr/fd_posix.c53
1 files changed, 28 insertions, 25 deletions
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index 4ba06676ab..28ed7708f7 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -90,6 +90,7 @@ static grpc_fd *alloc_fd(int fd) {
gpr_mu_init(&r->set_state_mu);
gpr_mu_init(&r->watcher_mu);
}
+
gpr_atm_rel_store(&r->refst, 1);
gpr_atm_rel_store(&r->readst, NOT_READY);
gpr_atm_rel_store(&r->writest, NOT_READY);
@@ -115,8 +116,7 @@ static void ref_by(grpc_fd *fd, int n) {
static void unref_by(grpc_fd *fd, int n) {
gpr_atm old = gpr_atm_full_fetch_add(&fd->refst, -n);
if (old == n) {
- close(fd->fd);
- grpc_iomgr_add_callback(fd->on_done, fd->on_done_user_data);
+ grpc_iomgr_add_callback(&fd->on_done_closure);
freelist_fd(fd);
grpc_iomgr_unregister_object(&fd->iomgr_object);
} else {
@@ -179,8 +179,8 @@ static void wake_all_watchers_locked(grpc_fd *fd) {
}
void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) {
- fd->on_done = on_done ? on_done : do_nothing;
- fd->on_done_user_data = user_data;
+ grpc_iomgr_closure_init(&fd->on_done_closure, on_done ? on_done : do_nothing,
+ user_data);
shutdown(fd->fd, SHUT_RDWR);
ref_by(fd, 1); /* remove active status, but keep referenced */
gpr_mu_lock(&fd->watcher_mu);
@@ -194,21 +194,20 @@ void grpc_fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
void grpc_fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
-static void make_callback(grpc_iomgr_cb_func cb, void *arg, int success,
+static void process_callback(grpc_iomgr_closure *closure, int success,
int allow_synchronous_callback) {
if (allow_synchronous_callback) {
- cb(arg, success);
+ closure->cb(closure->cb_arg, success);
} else {
- grpc_iomgr_add_delayed_callback(cb, arg, success);
+ grpc_iomgr_add_delayed_callback(closure, success);
}
}
-static void make_callbacks(grpc_iomgr_closure *callbacks, size_t n, int success,
- int allow_synchronous_callback) {
+static void process_callbacks(grpc_iomgr_closure *callbacks, size_t n,
+ int success, int allow_synchronous_callback) {
size_t i;
for (i = 0; i < n; i++) {
- make_callback(callbacks[i].cb, callbacks[i].cb_arg, success,
- allow_synchronous_callback);
+ process_callback(callbacks + i, success, allow_synchronous_callback);
}
}
@@ -233,10 +232,9 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure,
/* swap was unsuccessful due to an intervening set_ready call.
Fall through to the READY code below */
case READY:
- assert(gpr_atm_no_barrier_load(st) == READY);
+ GPR_ASSERT(gpr_atm_no_barrier_load(st) == READY);
gpr_atm_rel_store(st, NOT_READY);
- make_callback(closure->cb, closure->cb_arg,
- !gpr_atm_acq_load(&fd->shutdown),
+ process_callback(closure, !gpr_atm_acq_load(&fd->shutdown),
allow_synchronous_callback);
return;
default: /* WAITING */
@@ -250,7 +248,7 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure,
abort();
}
-static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure *callbacks,
+static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure **callbacks,
size_t *ncallbacks) {
gpr_intptr state = gpr_atm_acq_load(st);
@@ -268,9 +266,9 @@ static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure *callbacks,
Fall through to the WAITING code below */
state = gpr_atm_acq_load(st);
default: /* waiting */
- assert(gpr_atm_no_barrier_load(st) != READY &&
- gpr_atm_no_barrier_load(st) != NOT_READY);
- callbacks[(*ncallbacks)++] = *(grpc_iomgr_closure *)state;
+ GPR_ASSERT(gpr_atm_no_barrier_load(st) != READY &&
+ gpr_atm_no_barrier_load(st) != NOT_READY);
+ callbacks[(*ncallbacks)++] = (grpc_iomgr_closure *)state;
gpr_atm_rel_store(st, NOT_READY);
return;
}
@@ -281,25 +279,30 @@ static void set_ready(grpc_fd *fd, gpr_atm *st,
/* only one set_ready can be active at once (but there may be a racing
notify_on) */
int success;
- grpc_iomgr_closure cb;
+ grpc_iomgr_closure* closure;
size_t ncb = 0;
+
gpr_mu_lock(&fd->set_state_mu);
- set_ready_locked(st, &cb, &ncb);
+ set_ready_locked(st, &closure, &ncb);
gpr_mu_unlock(&fd->set_state_mu);
success = !gpr_atm_acq_load(&fd->shutdown);
- make_callbacks(&cb, ncb, success, allow_synchronous_callback);
+ GPR_ASSERT(ncb <= 1);
+ if (ncb > 0) {
+ process_callbacks(closure, ncb, success, allow_synchronous_callback);
+ }
}
void grpc_fd_shutdown(grpc_fd *fd) {
- grpc_iomgr_closure cb[2];
size_t ncb = 0;
gpr_mu_lock(&fd->set_state_mu);
GPR_ASSERT(!gpr_atm_no_barrier_load(&fd->shutdown));
gpr_atm_rel_store(&fd->shutdown, 1);
- set_ready_locked(&fd->readst, cb, &ncb);
- set_ready_locked(&fd->writest, cb, &ncb);
+ set_ready_locked(&fd->readst, &fd->shutdown_closures[0], &ncb);
+ set_ready_locked(&fd->writest, &fd->shutdown_closures[0], &ncb);
gpr_mu_unlock(&fd->set_state_mu);
- make_callbacks(cb, ncb, 0, 0);
+ GPR_ASSERT(ncb <= 2);
+ process_callbacks(fd->shutdown_closures[0], ncb, 0 /* GPR_FALSE */,
+ 0 /* GPR_FALSE */);
}
void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_closure *closure) {