diff options
author | 2017-10-04 22:41:13 +0000 | |
---|---|---|
committer | 2017-10-04 22:41:13 +0000 | |
commit | 4fd6a41e0bf3e64ff393ce310b9439c638613301 (patch) | |
tree | 985320eb60f3d1956cbcd13a63cc296b5a09f548 /src/core/lib/iomgr | |
parent | 29a9c3af38e28153c37541163f73ad6223eb4ff6 (diff) |
Fix orphan behavior
Diffstat (limited to 'src/core/lib/iomgr')
-rw-r--r-- | src/core/lib/iomgr/ev_epollex_linux.c | 63 |
1 files changed, 47 insertions, 16 deletions
diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c index a5ef1d89f9..6724e7df1f 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.c +++ b/src/core/lib/iomgr/ev_epollex_linux.c @@ -130,6 +130,8 @@ struct grpc_fd { Ref/Unref by two to avoid altering the orphaned bit */ gpr_atm refst; + gpr_mu orphan_mu; + gpr_mu pollable_mu; pollable *pollable_obj; @@ -268,6 +270,7 @@ static void fd_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_iomgr_unregister_object(&fd->iomgr_object); POLLABLE_UNREF(fd->pollable_obj, "fd_pollable"); gpr_mu_destroy(&fd->pollable_mu); + gpr_mu_destroy(&fd->orphan_mu); gpr_mu_lock(&fd_freelist_mu); fd->freelist_next = fd_freelist; fd_freelist = fd; @@ -328,6 +331,7 @@ static grpc_fd *fd_create(int fd, const char *name) { } gpr_mu_init(&new_fd->pollable_mu); + gpr_mu_init(&new_fd->orphan_mu); new_fd->pollable_obj = NULL; gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); new_fd->fd = fd; @@ -360,6 +364,8 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, bool already_closed, const char *reason) { bool is_fd_closed = already_closed; + gpr_mu_lock(&fd->orphan_mu); + fd->on_done_closure = on_done; /* If release_fd is not NULL, we should be relinquishing control of the file @@ -381,6 +387,8 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, GRPC_CLOSURE_SCHED(exec_ctx, fd->on_done_closure, GRPC_ERROR_NONE); + gpr_mu_unlock(&fd->orphan_mu); + UNREF_BY(exec_ctx, fd, 2, reason); /* Drop the reference */ } @@ -1027,9 +1035,15 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx, pollset, fd); break; case PO_FD: - /* fd --> multipoller */ - error = pollset_transition_pollable_from_fd_to_multi_locked(exec_ctx, - pollset, fd); + gpr_mu_lock(&po_at_start->owner_fd->orphan_mu); + if ((gpr_atm_no_barrier_load(&pollset->active_pollable->owner_fd->refst) & 1) == 0) { + error = pollset_transition_pollable_from_empty_to_fd_locked(exec_ctx, pollset, fd); + } else { + /* fd --> multipoller */ + error = pollset_transition_pollable_from_fd_to_multi_locked(exec_ctx, + pollset, fd); + } + gpr_mu_unlock(&po_at_start->owner_fd->orphan_mu); break; case PO_MULTI: error = pollable_add_fd(pollset->active_pollable, fd); @@ -1057,8 +1071,15 @@ static grpc_error *pollset_as_multipollable(grpc_exec_ctx *exec_ctx, error = pollable_create(PO_MULTI, &pollset->active_pollable); break; case PO_FD: - error = pollset_transition_pollable_from_fd_to_multi_locked( - exec_ctx, pollset, NULL); + gpr_mu_lock(&po_at_start->owner_fd->orphan_mu); + if ((gpr_atm_no_barrier_load(&pollset->active_pollable->owner_fd->refst) & 1) == 0) { + POLLABLE_UNREF(pollset->active_pollable, "pollset"); + error = pollable_create(PO_MULTI, &pollset->active_pollable); + } else { + error = pollset_transition_pollable_from_fd_to_multi_locked( + exec_ctx, pollset, NULL); + } + gpr_mu_unlock(&po_at_start->owner_fd->orphan_mu); break; case PO_MULTI: break; @@ -1212,14 +1233,23 @@ static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(&pss->mu); } +// add all fds to pollables, and output a new array of unorphaned out_fds static grpc_error *add_fds_to_pollables(grpc_exec_ctx *exec_ctx, grpc_fd **fds, size_t fd_count, pollable **pollables, size_t pollable_count, - const char *err_desc) { + const char *err_desc, grpc_fd **out_fds, size_t *out_fd_count) { grpc_error *error = GRPC_ERROR_NONE; for (size_t i = 0; i < fd_count; i++) { - for (size_t j = 0; j < pollable_count; j++) { - append_error(&error, pollable_add_fd(pollables[j], fds[i]), err_desc); + gpr_mu_lock(&fds[i]->orphan_mu); + if ((gpr_atm_no_barrier_load(&fds[i]->refst) & 1) == 0) { + gpr_mu_unlock(&fds[i]->orphan_mu); + UNREF_BY(exec_ctx, fds[i], 2, "pollset_set"); + } else { + for (size_t j = 0; j < pollable_count; j++) { + append_error(&error, pollable_add_fd(pollables[j], fds[i]), err_desc); + } + gpr_mu_unlock(&fds[i]->orphan_mu); + out_fds[(*out_fd_count)++] = fds[i]; } } return error; @@ -1267,25 +1297,26 @@ static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, } gpr_ref(&a->refs); b->parent = a; + if (a->fd_capacity < a->fd_count + b->fd_count) { + a->fd_capacity = GPR_MAX(2 * a->fd_capacity, a->fd_count + b->fd_count); + a->fds = gpr_realloc(a->fds, a->fd_capacity * sizeof(*a->fds)); + } + size_t initial_a_fd_count = a->fd_count; + a->fd_count = 0; append_error(&error, - add_fds_to_pollables(exec_ctx, a->fds, a->fd_count, b->pollsets, - b->pollset_count, "merge_a2b"), + add_fds_to_pollables(exec_ctx, a->fds, initial_a_fd_count, b->pollsets, + b->pollset_count, "merge_a2b", a->fds, &a->fd_count), err_desc); append_error(&error, add_fds_to_pollables(exec_ctx, b->fds, b->fd_count, a->pollsets, - a->pollset_count, "merge_b2a"), + a->pollset_count, "merge_b2a", a->fds, &a->fd_count), err_desc); - if (a->fd_capacity < a->fd_count + b->fd_count) { - a->fd_capacity = GPR_MAX(2 * a->fd_capacity, a->fd_count + b->fd_count); - a->fds = gpr_realloc(a->fds, a->fd_capacity * sizeof(*a->fds)); - } if (a->pollset_capacity < a->pollset_count + b->pollset_count) { a->pollset_capacity = GPR_MAX(2 * a->pollset_capacity, a->pollset_count + b->pollset_count); a->pollsets = gpr_realloc(a->pollsets, a->pollset_capacity * sizeof(*a->pollsets)); } - memcpy(a->fds + a->fd_count, b->fds, b->fd_count * sizeof(*b->fds)); memcpy(a->pollsets + a->pollset_count, b->pollsets, b->pollset_count * sizeof(*b->pollsets)); a->fd_count += b->fd_count; |