diff options
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 65 |
1 files changed, 64 insertions, 1 deletions
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 87e7aa85ee..39945ff3cb 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -201,11 +201,43 @@ static void become_empty_pollset(grpc_pollset *pollset) { * via poll() */ -static void unary_poll_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { + +typedef struct grpc_unary_promote_args { + const grpc_pollset_vtable *original_vtable; + grpc_pollset *pollset; + grpc_fd *fd; +} grpc_unary_promote_args; + +static void unary_poll_do_promote(void *args, int success) { + grpc_unary_promote_args *up_args = args; + const grpc_pollset_vtable *original_vtable = up_args->original_vtable; + grpc_pollset *pollset = up_args->pollset; + grpc_fd *fd = up_args->fd; grpc_fd *fds[2]; + gpr_free(up_args); + + gpr_mu_lock(&pollset->mu); + /* First we need to ensure that nobody is polling concurrently */ + while (pollset->counter != 0 && pollset->vtable == original_vtable) { + grpc_pollset_kick(pollset); + gpr_cv_wait(&pollset->cv, &pollset->mu, gpr_inf_future); + } + /* At this point the pollset may no longer be a unary poller. In that case + * we should just call the right add function and be done. */ + /* TODO(klempner): If we're not careful this could cause infinite recursion. + * That's not a problem for now because empty_pollset has a trivial poller + * and we don't have any mechanism to unbecome multipoller. */ + if (pollset->vtable != original_vtable) { + pollset->vtable->add_fd(pollset, fd); + gpr_cv_broadcast(&pollset->cv); + gpr_mu_unlock(&pollset->mu); + return; + } + if (fd == pollset->data.ptr) return; fds[0] = pollset->data.ptr; fds[1] = fd; + if (!grpc_fd_is_orphaned(fds[0])) { grpc_platform_become_multipoller(pollset, fds, GPR_ARRAY_SIZE(fds)); grpc_fd_unref(fds[0]); @@ -216,6 +248,37 @@ static void unary_poll_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { pollset->data.ptr = fd; grpc_fd_ref(fd); } + + gpr_cv_broadcast(&pollset->cv); + gpr_mu_unlock(&pollset->mu); + + /* Matching ref in unary_poll_pollset_add_fd */ + grpc_fd_unref(fd); +} + +static void unary_poll_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) { + grpc_unary_promote_args *up_args; + if (fd == pollset->data.ptr) return; + + if (grpc_fd_is_orphaned(pollset->data.ptr)) { + /* old fd is orphaned and we haven't cleaned it up until now, so remain a + * unary poller */ + grpc_fd_unref(pollset->data.ptr); + pollset->data.ptr = fd; + grpc_fd_ref(fd); + return; + } + + /* Now we need to promote. This needs to happen when we're not polling. Since + * this may be called from poll, the wait needs to happen asynchronously. */ + grpc_fd_ref(fd); + up_args = gpr_malloc(sizeof(*up_args)); + up_args->pollset = pollset; + up_args->fd = fd; + up_args->original_vtable = pollset->vtable; + grpc_iomgr_add_callback(unary_poll_do_promote, up_args); + + grpc_pollset_kick(pollset); } static void unary_poll_pollset_del_fd(grpc_pollset *pollset, grpc_fd *fd) { |