aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/lib/iomgr
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-10-04 22:41:13 +0000
committerGravatar Craig Tiller <ctiller@google.com>2017-10-04 22:41:13 +0000
commit4fd6a41e0bf3e64ff393ce310b9439c638613301 (patch)
tree985320eb60f3d1956cbcd13a63cc296b5a09f548 /src/core/lib/iomgr
parent29a9c3af38e28153c37541163f73ad6223eb4ff6 (diff)
Fix orphan behavior
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.c63
1 files changed, 47 insertions, 16 deletions
diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c
index a5ef1d89f9..6724e7df1f 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.c
+++ b/src/core/lib/iomgr/ev_epollex_linux.c
@@ -130,6 +130,8 @@ struct grpc_fd {
Ref/Unref by two to avoid altering the orphaned bit */
gpr_atm refst;
+ gpr_mu orphan_mu;
+
gpr_mu pollable_mu;
pollable *pollable_obj;
@@ -268,6 +270,7 @@ static void fd_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) {
grpc_iomgr_unregister_object(&fd->iomgr_object);
POLLABLE_UNREF(fd->pollable_obj, "fd_pollable");
gpr_mu_destroy(&fd->pollable_mu);
+ gpr_mu_destroy(&fd->orphan_mu);
gpr_mu_lock(&fd_freelist_mu);
fd->freelist_next = fd_freelist;
fd_freelist = fd;
@@ -328,6 +331,7 @@ static grpc_fd *fd_create(int fd, const char *name) {
}
gpr_mu_init(&new_fd->pollable_mu);
+ gpr_mu_init(&new_fd->orphan_mu);
new_fd->pollable_obj = NULL;
gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1);
new_fd->fd = fd;
@@ -360,6 +364,8 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
bool already_closed, const char *reason) {
bool is_fd_closed = already_closed;
+ gpr_mu_lock(&fd->orphan_mu);
+
fd->on_done_closure = on_done;
/* If release_fd is not NULL, we should be relinquishing control of the file
@@ -381,6 +387,8 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE);
+ gpr_mu_unlock(&fd->orphan_mu);
+
UNREF_BY(exec_ctx, fd, 2, reason); /* Drop the reference */
}
@@ -1027,9 +1035,15 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx,
pollset, fd);
break;
case PO_FD:
- /* fd --> multipoller */
- error = pollset_transition_pollable_from_fd_to_multi_locked(exec_ctx,
- pollset, fd);
+ gpr_mu_lock(&po_at_start->owner_fd->orphan_mu);
+ if ((gpr_atm_no_barrier_load(&pollset->active_pollable->owner_fd->refst) & 1) == 0) {
+ error = pollset_transition_pollable_from_empty_to_fd_locked(exec_ctx, pollset, fd);
+ } else {
+ /* fd --> multipoller */
+ error = pollset_transition_pollable_from_fd_to_multi_locked(exec_ctx,
+ pollset, fd);
+ }
+ gpr_mu_unlock(&po_at_start->owner_fd->orphan_mu);
break;
case PO_MULTI:
error = pollable_add_fd(pollset->active_pollable, fd);
@@ -1057,8 +1071,15 @@ static grpc_error *pollset_as_multipollable(grpc_exec_ctx *exec_ctx,
error = pollable_create(PO_MULTI, &pollset->active_pollable);
break;
case PO_FD:
- error = pollset_transition_pollable_from_fd_to_multi_locked(
- exec_ctx, pollset, NULL);
+ gpr_mu_lock(&po_at_start->owner_fd->orphan_mu);
+ if ((gpr_atm_no_barrier_load(&pollset->active_pollable->owner_fd->refst) & 1) == 0) {
+ POLLABLE_UNREF(pollset->active_pollable, "pollset");
+ error = pollable_create(PO_MULTI, &pollset->active_pollable);
+ } else {
+ error = pollset_transition_pollable_from_fd_to_multi_locked(
+ exec_ctx, pollset, NULL);
+ }
+ gpr_mu_unlock(&po_at_start->owner_fd->orphan_mu);
break;
case PO_MULTI:
break;
@@ -1212,14 +1233,23 @@ static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
gpr_mu_unlock(&pss->mu);
}
+// add all fds to pollables, and output a new array of unorphaned out_fds
static grpc_error *add_fds_to_pollables(grpc_exec_ctx *exec_ctx, grpc_fd **fds,
size_t fd_count, pollable **pollables,
size_t pollable_count,
- const char *err_desc) {
+ const char *err_desc, grpc_fd **out_fds, size_t *out_fd_count) {
grpc_error *error = GRPC_ERROR_NONE;
for (size_t i = 0; i < fd_count; i++) {
- for (size_t j = 0; j < pollable_count; j++) {
- append_error(&error, pollable_add_fd(pollables[j], fds[i]), err_desc);
+ gpr_mu_lock(&fds[i]->orphan_mu);
+ if ((gpr_atm_no_barrier_load(&fds[i]->refst) & 1) == 0) {
+ gpr_mu_unlock(&fds[i]->orphan_mu);
+ UNREF_BY(exec_ctx, fds[i], 2, "pollset_set");
+ } else {
+ for (size_t j = 0; j < pollable_count; j++) {
+ append_error(&error, pollable_add_fd(pollables[j], fds[i]), err_desc);
+ }
+ gpr_mu_unlock(&fds[i]->orphan_mu);
+ out_fds[(*out_fd_count)++] = fds[i];
}
}
return error;
@@ -1267,25 +1297,26 @@ static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
}
gpr_ref(&a->refs);
b->parent = a;
+ if (a->fd_capacity < a->fd_count + b->fd_count) {
+ a->fd_capacity = GPR_MAX(2 * a->fd_capacity, a->fd_count + b->fd_count);
+ a->fds = gpr_realloc(a->fds, a->fd_capacity * sizeof(*a->fds));
+ }
+ size_t initial_a_fd_count = a->fd_count;
+ a->fd_count = 0;
append_error(&error,
- add_fds_to_pollables(exec_ctx, a->fds, a->fd_count, b->pollsets,
- b->pollset_count, "merge_a2b"),
+ add_fds_to_pollables(exec_ctx, a->fds, initial_a_fd_count, b->pollsets,
+ b->pollset_count, "merge_a2b", a->fds, &a->fd_count),
err_desc);
append_error(&error,
add_fds_to_pollables(exec_ctx, b->fds, b->fd_count, a->pollsets,
- a->pollset_count, "merge_b2a"),
+ a->pollset_count, "merge_b2a", a->fds, &a->fd_count),
err_desc);
- if (a->fd_capacity < a->fd_count + b->fd_count) {
- a->fd_capacity = GPR_MAX(2 * a->fd_capacity, a->fd_count + b->fd_count);
- a->fds = gpr_realloc(a->fds, a->fd_capacity * sizeof(*a->fds));
- }
if (a->pollset_capacity < a->pollset_count + b->pollset_count) {
a->pollset_capacity =
GPR_MAX(2 * a->pollset_capacity, a->pollset_count + b->pollset_count);
a->pollsets =
gpr_realloc(a->pollsets, a->pollset_capacity * sizeof(*a->pollsets));
}
- memcpy(a->fds + a->fd_count, b->fds, b->fd_count * sizeof(*b->fds));
memcpy(a->pollsets + a->pollset_count, b->pollsets,
b->pollset_count * sizeof(*b->pollsets));
a->fd_count += b->fd_count;