From 8232492cce70bcf833dc82254d53d00b3544a751 Mon Sep 17 00:00:00 2001 From: Craig Tiller Date: Mon, 16 Oct 2017 21:37:28 +0000 Subject: Fix leak --- src/core/lib/iomgr/ev_epollex_linux.cc | 42 ++++++++++++++++++++-------------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 1e8de8910e..8b0ebc2d17 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -201,7 +201,7 @@ struct grpc_pollset_set { size_t pollset_count; size_t pollset_capacity; - pollable **pollsets; + grpc_pollset **pollsets; size_t fd_count; size_t fd_capacity; @@ -545,6 +545,9 @@ static void pollset_global_shutdown(void) { static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { +if (GRPC_TRACER_ON(grpc_polling_trace)) { +gpr_log(GPR_DEBUG, "PS:%p (pollable:%p) maybe_finish_shutdown sc=%p (target:!NULL) rw=%p (target:NULL) cpsc=%d (target:0)", pollset, pollset->active_pollable, pollset->shutdown_closure, pollset->root_worker, pollset->containing_pollset_set_count); +} if (pollset->shutdown_closure != NULL && pollset->root_worker == NULL && pollset->containing_pollset_set_count == 0) { GRPC_CLOSURE_SCHED(exec_ctx, pollset->shutdown_closure, GRPC_ERROR_NONE); @@ -1123,7 +1126,11 @@ static void pollset_set_unref(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss) { pollset_set_unref(exec_ctx, pss->parent); gpr_mu_destroy(&pss->mu); for (size_t i = 0; i < pss->pollset_count; i++) { - POLLABLE_UNREF(pss->pollsets[i], "pollset_set"); + gpr_mu_lock(&pss->pollsets[i]->mu); + if (0==--pss->pollsets[i]->containing_pollset_set_count) { + pollset_maybe_finish_shutdown(exec_ctx, pss->pollsets[i]); + } + gpr_mu_unlock(&pss->pollsets[i]->mu); } for (size_t i = 0; i < pss->fd_count; i++) { UNREF_BY(exec_ctx, pss->fds[i], 2, "pollset_set"); @@ -1142,7 +1149,7 @@ static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, static const char *err_desc = "pollset_set_add_fd"; pss = pss_lock_adam(pss); for (size_t i = 0; i < pss->pollset_count; i++) { - append_error(&error, pollable_add_fd(pss->pollsets[i], fd), err_desc); + append_error(&error, pollable_add_fd(pss->pollsets[i]->active_pollable, fd), err_desc); } if (pss->fd_count == pss->fd_capacity) { pss->fd_capacity = GPR_MAX(pss->fd_capacity * 2, 8); @@ -1185,8 +1192,7 @@ static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, pss = pss_lock_adam(pss); size_t i; for (i = 0; i < pss->pollset_count; i++) { - if (pss->pollsets[i] == ps->active_pollable) { - POLLABLE_UNREF(pss->pollsets[i], "pollset_set"); + if (pss->pollsets[i] == ps) { break; } } @@ -1204,9 +1210,10 @@ static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, } // add all fds to pollables, and output a new array of unorphaned out_fds -static grpc_error *add_fds_to_pollables(grpc_exec_ctx *exec_ctx, grpc_fd **fds, - size_t fd_count, pollable **pollables, - size_t pollable_count, +// assumes pollsets are multipollable +static grpc_error *add_fds_to_pollsets(grpc_exec_ctx *exec_ctx, grpc_fd **fds, + size_t fd_count, grpc_pollset **pollsets, + size_t pollset_count, const char *err_desc, grpc_fd **out_fds, size_t *out_fd_count) { grpc_error *error = GRPC_ERROR_NONE; @@ -1216,8 +1223,8 @@ static grpc_error *add_fds_to_pollables(grpc_exec_ctx *exec_ctx, grpc_fd **fds, gpr_mu_unlock(&fds[i]->orphan_mu); UNREF_BY(exec_ctx, fds[i], 2, "pollset_set"); } else { - for (size_t j = 0; j < pollable_count; j++) { - append_error(&error, pollable_add_fd(pollables[j], fds[i]), err_desc); + for (size_t j = 0; j < pollset_count; j++) { + append_error(&error, pollable_add_fd(pollsets[j]->active_pollable, fds[i]), err_desc); } gpr_mu_unlock(&fds[i]->orphan_mu); out_fds[(*out_fd_count)++] = fds[i]; @@ -1246,17 +1253,18 @@ static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, pss = pss_lock_adam(pss); size_t initial_fd_count = pss->fd_count; pss->fd_count = 0; - append_error(&error, add_fds_to_pollables(exec_ctx, pss->fds, - initial_fd_count, &pollable_obj, 1, + append_error(&error, add_fds_to_pollsets(exec_ctx, pss->fds, + initial_fd_count, &ps, 1, err_desc, pss->fds, &pss->fd_count), err_desc); if (pss->pollset_count == pss->pollset_capacity) { pss->pollset_capacity = GPR_MAX(pss->pollset_capacity * 2, 8); - pss->pollsets = (pollable **)gpr_realloc( + pss->pollsets = (grpc_pollset **)gpr_realloc( pss->pollsets, pss->pollset_capacity * sizeof(*pss->pollsets)); } - pss->pollsets[pss->pollset_count++] = pollable_obj; + pss->pollsets[pss->pollset_count++] = ps; gpr_mu_unlock(&pss->mu); +POLLABLE_UNREF(pollable_obj, "pollset_set"); GRPC_LOG_IF_ERROR(err_desc, error); } @@ -1309,18 +1317,18 @@ static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, } size_t initial_a_fd_count = a->fd_count; a->fd_count = 0; - append_error(&error, add_fds_to_pollables( + append_error(&error, add_fds_to_pollsets( exec_ctx, a->fds, initial_a_fd_count, b->pollsets, b->pollset_count, "merge_a2b", a->fds, &a->fd_count), err_desc); - append_error(&error, add_fds_to_pollables(exec_ctx, b->fds, b->fd_count, + append_error(&error, add_fds_to_pollsets(exec_ctx, b->fds, b->fd_count, a->pollsets, a->pollset_count, "merge_b2a", a->fds, &a->fd_count), err_desc); if (a->pollset_capacity < a->pollset_count + b->pollset_count) { a->pollset_capacity = GPR_MAX(2 * a->pollset_capacity, a->pollset_count + b->pollset_count); - a->pollsets = (pollable **)gpr_realloc( + a->pollsets = (grpc_pollset **)gpr_realloc( a->pollsets, a->pollset_capacity * sizeof(*a->pollsets)); } memcpy(a->pollsets + a->pollset_count, b->pollsets, -- cgit v1.2.3