diff options
-rw-r--r-- | src/core/lib/iomgr/ev_epoll_linux.c | 585 |
1 files changed, 272 insertions, 313 deletions
diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index 07fbfd849e..1b15e0eb4f 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -69,6 +69,9 @@ static int grpc_polling_trace = 0; /* Disabled by default */ gpr_log(GPR_INFO, (fmt), __VA_ARGS__); \ } +/* Uncomment the following enable extra checks on poll_object operations */ +/* #define PO_DEBUG */ + static int grpc_wakeup_signal = -1; static bool is_grpc_wakeup_signal_initialized = false; @@ -95,10 +98,42 @@ void grpc_use_signal(int signum) { struct polling_island; +typedef enum { + POLL_OBJ_FD, + POLL_OBJ_POLLSET, + POLL_OBJ_POLLSET_SET +} poll_obj_type; + +typedef struct poll_obj { +#ifdef PO_DEBUG + poll_obj_type obj_type; +#endif + gpr_mu mu; + struct polling_island *pi; +} poll_obj; + +const char *poll_obj_string(poll_obj_type po_type) { + switch (po_type) { + case POLL_OBJ_FD: + return "fd"; + case POLL_OBJ_POLLSET: + return "pollset"; + case POLL_OBJ_POLLSET_SET: + return "pollset_set"; + } + + GPR_UNREACHABLE_CODE(return "UNKNOWN"); +} + /******************************************************************************* * Fd Declarations */ + +#define FD_FROM_PO(po) ((grpc_fd *)(po)) + struct grpc_fd { + poll_obj po; + int fd; /* refst format: bit 0 : 1=Active / 0=Orphaned @@ -106,8 +141,6 @@ struct grpc_fd { Ref/Unref by two to avoid altering the orphaned bit */ gpr_atm refst; - gpr_mu mu; - /* Indicates that the fd is shutdown and that any pending read/write closures should fail */ bool shutdown; @@ -120,9 +153,6 @@ struct grpc_fd { grpc_closure *read_closure; grpc_closure *write_closure; - /* The polling island to which this fd belongs to (protected by mu) */ - struct polling_island *polling_island; - struct grpc_fd *freelist_next; grpc_closure *on_done_closure; @@ -225,41 +255,21 @@ struct grpc_pollset_worker { }; struct grpc_pollset { - gpr_mu mu; + poll_obj po; + grpc_pollset_worker root_worker; bool kicked_without_pollers; bool shutting_down; /* Is the pollset shutting down ? */ bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */ grpc_closure *shutdown_done; /* Called after after shutdown is complete */ - - /* The polling island to which this pollset belongs to */ - struct polling_island *polling_island; }; /******************************************************************************* * Pollset-set Declarations */ -/* TODO: sreek - Change the pollset_set implementation such that a pollset_set - * directly points to a polling_island (and adding an fd/pollset/pollset_set to - * the current pollset_set would result in polling island merges. This would - * remove the need to maintain fd_count here. This will also significantly - * simplify the grpc_fd structure since we would no longer need to explicitly - * maintain the orphaned state */ struct grpc_pollset_set { - gpr_mu mu; - - size_t pollset_count; - size_t pollset_capacity; - grpc_pollset **pollsets; - - size_t pollset_set_count; - size_t pollset_set_capacity; - struct grpc_pollset_set **pollset_sets; - - size_t fd_count; - size_t fd_capacity; - grpc_fd **fds; + poll_obj po; }; /******************************************************************************* @@ -915,7 +925,7 @@ static void fd_global_shutdown(void) { while (fd_freelist != NULL) { grpc_fd *fd = fd_freelist; fd_freelist = fd_freelist->freelist_next; - gpr_mu_destroy(&fd->mu); + gpr_mu_destroy(&fd->po.mu); gpr_free(fd); } gpr_mu_destroy(&fd_freelist_mu); @@ -933,13 +943,17 @@ static grpc_fd *fd_create(int fd, const char *name) { if (new_fd == NULL) { new_fd = gpr_malloc(sizeof(grpc_fd)); - gpr_mu_init(&new_fd->mu); + gpr_mu_init(&new_fd->po.mu); } - /* Note: It is not really needed to get the new_fd->mu lock here. If this is a - newly created fd (or an fd we got from the freelist), no one else would be - holding a lock to it anyway. */ - gpr_mu_lock(&new_fd->mu); + /* Note: It is not really needed to get the new_fd->po.mu lock here. If this + * is a newly created fd (or an fd we got from the freelist), no one else + * would be holding a lock to it anyway. */ + gpr_mu_lock(&new_fd->po.mu); + new_fd->po.pi = NULL; +#ifdef PO_DEBUG + new_fd->po.obj_type = POLL_OBJ_FD; +#endif gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); new_fd->fd = fd; @@ -947,12 +961,11 @@ static grpc_fd *fd_create(int fd, const char *name) { new_fd->orphaned = false; new_fd->read_closure = CLOSURE_NOT_READY; new_fd->write_closure = CLOSURE_NOT_READY; - new_fd->polling_island = NULL; new_fd->freelist_next = NULL; new_fd->on_done_closure = NULL; new_fd->read_notifier_pollset = NULL; - gpr_mu_unlock(&new_fd->mu); + gpr_mu_unlock(&new_fd->po.mu); char *fd_name; gpr_asprintf(&fd_name, "%s fd=%d", name, fd); @@ -964,17 +977,13 @@ static grpc_fd *fd_create(int fd, const char *name) { return new_fd; } -static bool fd_is_orphaned(grpc_fd *fd) { - return (gpr_atm_acq_load(&fd->refst) & 1) == 0; -} - static int fd_wrapped_fd(grpc_fd *fd) { int ret_fd = -1; - gpr_mu_lock(&fd->mu); + gpr_mu_lock(&fd->po.mu); if (!fd->orphaned) { ret_fd = fd->fd; } - gpr_mu_unlock(&fd->mu); + gpr_mu_unlock(&fd->po.mu); return ret_fd; } @@ -986,7 +995,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *error = GRPC_ERROR_NONE; polling_island *unref_pi = NULL; - gpr_mu_lock(&fd->mu); + gpr_mu_lock(&fd->po.mu); fd->on_done_closure = on_done; /* If release_fd is not NULL, we should be relinquishing control of the file @@ -1006,25 +1015,25 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, /* Remove the fd from the polling island: - Get a lock on the latest polling island (i.e the last island in the - linked list pointed by fd->polling_island). This is the island that + linked list pointed by fd->po.pi). This is the island that would actually contain the fd - Remove the fd from the latest polling island - Unlock the latest polling island - - Set fd->polling_island to NULL (but remove the ref on the polling island + - Set fd->po.pi to NULL (but remove the ref on the polling island before doing this.) */ - if (fd->polling_island != NULL) { - polling_island *pi_latest = polling_island_lock(fd->polling_island); + if (fd->po.pi != NULL) { + polling_island *pi_latest = polling_island_lock(fd->po.pi); polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error); gpr_mu_unlock(&pi_latest->mu); - unref_pi = fd->polling_island; - fd->polling_island = NULL; + unref_pi = fd->po.pi; + fd->po.pi = NULL; } grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, GRPC_ERROR_REF(error), NULL); - gpr_mu_unlock(&fd->mu); + gpr_mu_unlock(&fd->po.mu); UNREF_BY(fd, 2, reason); /* Drop the reference */ if (unref_pi != NULL) { /* Unref stale polling island here, outside the fd lock above. @@ -1089,23 +1098,23 @@ static grpc_pollset *fd_get_read_notifier_pollset(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { grpc_pollset *notifier = NULL; - gpr_mu_lock(&fd->mu); + gpr_mu_lock(&fd->po.mu); notifier = fd->read_notifier_pollset; - gpr_mu_unlock(&fd->mu); + gpr_mu_unlock(&fd->po.mu); return notifier; } static bool fd_is_shutdown(grpc_fd *fd) { - gpr_mu_lock(&fd->mu); + gpr_mu_lock(&fd->po.mu); const bool r = fd->shutdown; - gpr_mu_unlock(&fd->mu); + gpr_mu_unlock(&fd->po.mu); return r; } /* Might be called multiple times */ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - gpr_mu_lock(&fd->mu); + gpr_mu_lock(&fd->po.mu); /* Do the actual shutdown only once */ if (!fd->shutdown) { fd->shutdown = true; @@ -1116,28 +1125,28 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { set_ready_locked(exec_ctx, fd, &fd->read_closure); set_ready_locked(exec_ctx, fd, &fd->write_closure); } - gpr_mu_unlock(&fd->mu); + gpr_mu_unlock(&fd->po.mu); } static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { - gpr_mu_lock(&fd->mu); + gpr_mu_lock(&fd->po.mu); notify_on_locked(exec_ctx, fd, &fd->read_closure, closure); - gpr_mu_unlock(&fd->mu); + gpr_mu_unlock(&fd->po.mu); } static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure) { - gpr_mu_lock(&fd->mu); + gpr_mu_lock(&fd->po.mu); notify_on_locked(exec_ctx, fd, &fd->write_closure, closure); - gpr_mu_unlock(&fd->mu); + gpr_mu_unlock(&fd->po.mu); } static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { - gpr_mu_lock(&fd->mu); - grpc_workqueue *workqueue = GRPC_WORKQUEUE_REF( - (grpc_workqueue *)fd->polling_island, "fd_get_workqueue"); - gpr_mu_unlock(&fd->mu); + gpr_mu_lock(&fd->po.mu); + grpc_workqueue *workqueue = + GRPC_WORKQUEUE_REF((grpc_workqueue *)fd->po.pi, "fd_get_workqueue"); + gpr_mu_unlock(&fd->po.mu); return workqueue; } @@ -1277,8 +1286,12 @@ static grpc_error *kick_poller(void) { } static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { - gpr_mu_init(&pollset->mu); - *mu = &pollset->mu; + gpr_mu_init(&pollset->po.mu); + *mu = &pollset->po.mu; + pollset->po.pi = NULL; +#ifdef PO_DEBUG + pollset->po.obj_type = POLL_OBJ_POLLSET; +#endif pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker; pollset->kicked_without_pollers = false; @@ -1286,8 +1299,6 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) { pollset->shutting_down = false; pollset->finish_shutdown_called = false; pollset->shutdown_done = NULL; - - pollset->polling_island = NULL; } /* Convert a timespec to milliseconds: @@ -1317,26 +1328,26 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline, static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_pollset *notifier) { - /* Need the fd->mu since we might be racing with fd_notify_on_read */ - gpr_mu_lock(&fd->mu); + /* Need the fd->po.mu since we might be racing with fd_notify_on_read */ + gpr_mu_lock(&fd->po.mu); set_ready_locked(exec_ctx, fd, &fd->read_closure); fd->read_notifier_pollset = notifier; - gpr_mu_unlock(&fd->mu); + gpr_mu_unlock(&fd->po.mu); } static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { - /* Need the fd->mu since we might be racing with fd_notify_on_write */ - gpr_mu_lock(&fd->mu); + /* Need the fd->po.mu since we might be racing with fd_notify_on_write */ + gpr_mu_lock(&fd->po.mu); set_ready_locked(exec_ctx, fd, &fd->write_closure); - gpr_mu_unlock(&fd->mu); + gpr_mu_unlock(&fd->po.mu); } static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx, grpc_pollset *ps, char *reason) { - if (ps->polling_island != NULL) { - PI_UNREF(exec_ctx, ps->polling_island, reason); + if (ps->po.pi != NULL) { + PI_UNREF(exec_ctx, ps->po.pi, reason); } - ps->polling_island = NULL; + ps->po.pi = NULL; } static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx, @@ -1346,12 +1357,12 @@ static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx, pollset->finish_shutdown_called = true; - /* Release the ref and set pollset->polling_island to NULL */ + /* Release the ref and set pollset->po.pi to NULL */ pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown"); grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL); } -/* pollset->mu lock must be held by the caller before calling this */ +/* pollset->po.mu lock must be held by the caller before calling this */ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure *closure) { GPR_TIMER_BEGIN("pollset_shutdown", 0); @@ -1376,7 +1387,7 @@ static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, * here */ static void pollset_destroy(grpc_pollset *pollset) { GPR_ASSERT(!pollset_has_workers(pollset)); - gpr_mu_destroy(&pollset->mu); + gpr_mu_destroy(&pollset->po.mu); } static void pollset_reset(grpc_pollset *pollset) { @@ -1386,7 +1397,7 @@ static void pollset_reset(grpc_pollset *pollset) { pollset->finish_shutdown_called = false; pollset->kicked_without_pollers = false; pollset->shutdown_done = NULL; - GPR_ASSERT(pollset->polling_island == NULL); + GPR_ASSERT(pollset->po.pi == NULL); } static bool maybe_do_workqueue_work(grpc_exec_ctx *exec_ctx, @@ -1426,7 +1437,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, GPR_TIMER_BEGIN("pollset_work_and_unlock", 0); /* We need to get the epoll_fd to wait on. The epoll_fd is in inside the - latest polling island pointed by pollset->polling_island. + latest polling island pointed by pollset->po.pi Since epoll_fd is immutable, we can read it without obtaining the polling island lock. There is however a possibility that the polling island (from @@ -1435,36 +1446,36 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, right-away from epoll_wait() and pick up the latest polling_island the next this function (i.e pollset_work_and_unlock()) is called */ - if (pollset->polling_island == NULL) { - pollset->polling_island = polling_island_create(exec_ctx, NULL, error); - if (pollset->polling_island == NULL) { + if (pollset->po.pi == NULL) { + pollset->po.pi = polling_island_create(exec_ctx, NULL, error); + if (pollset->po.pi == NULL) { GPR_TIMER_END("pollset_work_and_unlock", 0); return; /* Fatal error. We cannot continue */ } - PI_ADD_REF(pollset->polling_island, "ps"); + PI_ADD_REF(pollset->po.pi, "ps"); GRPC_POLLING_TRACE("pollset_work: pollset: %p created new pi: %p", - (void *)pollset, (void *)pollset->polling_island); + (void *)pollset, (void *)pollset->po.pi); } - pi = polling_island_maybe_get_latest(pollset->polling_island); + pi = polling_island_maybe_get_latest(pollset->po.pi); epoll_fd = pi->epoll_fd; - /* Update the pollset->polling_island since the island being pointed by - pollset->polling_island maybe older than the one pointed by pi) */ - if (pollset->polling_island != pi) { + /* Update the pollset->po.pi since the island being pointed by + pollset->po.pi maybe older than the one pointed by pi) */ + if (pollset->po.pi != pi) { /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the polling island to be deleted */ PI_ADD_REF(pi, "ps"); - PI_UNREF(exec_ctx, pollset->polling_island, "ps"); - pollset->polling_island = pi; + PI_UNREF(exec_ctx, pollset->po.pi, "ps"); + pollset->po.pi = pi; } /* Add an extra ref so that the island does not get destroyed (which means the epoll_fd won't be closed) while we are are doing an epoll_wait() on the epoll_fd */ PI_ADD_REF(pi, "ps_work"); - gpr_mu_unlock(&pollset->mu); + gpr_mu_unlock(&pollset->po.mu); /* If we get some workqueue work to do, it might end up completing an item on the completion queue, so there's no need to poll... so we skip that and @@ -1537,17 +1548,17 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, GPR_ASSERT(pi != NULL); /* Before leaving, release the extra ref we added to the polling island. It - is important to use "pi" here (i.e our old copy of pollset->polling_island + is important to use "pi" here (i.e our old copy of pollset->po.pi that we got before releasing the polling island lock). This is because - pollset->polling_island pointer might get udpated in other parts of the + pollset->po.pi pointer might get udpated in other parts of the code when there is an island merge while we are doing epoll_wait() above */ PI_UNREF(exec_ctx, pi, "ps_work"); GPR_TIMER_END("pollset_work_and_unlock", 0); } -/* pollset->mu lock must be held by the caller before calling this. - The function pollset_work() may temporarily release the lock (pollset->mu) +/* pollset->po.mu lock must be held by the caller before calling this. + The function pollset_work() may temporarily release the lock (pollset->po.mu) during the course of its execution but it will always re-acquire the lock and ensure that it is held by the time the function returns */ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -1617,7 +1628,7 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, &g_orig_sigmask, &error); grpc_exec_ctx_flush(exec_ctx); - gpr_mu_lock(&pollset->mu); + gpr_mu_lock(&pollset->po.mu); /* Note: There is no need to reset worker.is_kicked to 0 since we are no longer going to use this worker */ @@ -1637,9 +1648,9 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, GPR_TIMER_MARK("pollset_work.finish_shutdown_locked", 0); finish_shutdown_locked(exec_ctx, pollset); - gpr_mu_unlock(&pollset->mu); + gpr_mu_unlock(&pollset->po.mu); grpc_exec_ctx_flush(exec_ctx); - gpr_mu_lock(&pollset->mu); + gpr_mu_lock(&pollset->po.mu); } *worker_hdl = NULL; @@ -1653,130 +1664,160 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, return error; } -static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, - grpc_fd *fd) { - GPR_TIMER_BEGIN("pollset_add_fd", 0); - - grpc_error *error = GRPC_ERROR_NONE; +static void add_poll_object(grpc_exec_ctx *exec_ctx, poll_obj *bag, + poll_obj_type bag_type, poll_obj *item, + poll_obj_type item_type) { + GPR_TIMER_BEGIN("add_poll_object", 0); - gpr_mu_lock(&pollset->mu); - gpr_mu_lock(&fd->mu); +#ifdef PO_DEBUG + GPR_ASSERT(item->obj_type == item_type); + GPR_ASSERT(bag->obj_type == bag_type); +#endif + grpc_error *error = GRPC_ERROR_NONE; polling_island *pi_new = NULL; + gpr_mu_lock(&bag->mu); + gpr_mu_lock(&item->mu); + retry: - /* 1) If fd->polling_island and pollset->polling_island are both non-NULL and - * equal, do nothing. - * 2) If fd->polling_island and pollset->polling_island are both NULL, create - * a new polling island (with a refcount of 2) and make the polling_island - * fields in both fd and pollset to point to the new island - * 3) If one of fd->polling_island or pollset->polling_island is NULL, update - * the NULL polling_island field to point to the non-NULL polling_island - * field (ensure that the refcount on the polling island is incremented by - * 1 to account for the newly added reference) - * 4) Finally, if fd->polling_island and pollset->polling_island are non-NULL - * and different, merge both the polling islands and update the - * polling_island fields in both fd and pollset to point to the merged - * polling island. + /* + * 1) If item->pi and bag->pi are both non-NULL and equal, do nothing + * 2) If item->pi and bag->pi are both NULL, create a new polling island (with + * a refcount of 2) and point item->pi and bag->pi to the new island + * 3) If exactly one of item->pi or bag->pi is NULL, update it to point to + * the other's non-NULL pi + * 4) Finally if item->pi and bag-pi are non-NULL and not-equal, merge the + * polling islands and update item->pi and bag->pi to point to the new + * island */ - if (fd->orphaned) { - gpr_mu_unlock(&fd->mu); - gpr_mu_unlock(&pollset->mu); - /* early out */ + /* Early out if we are trying to add an 'fd' to a 'bag' but the fd is already + * orphaned */ + if (item_type == POLL_OBJ_FD && (FD_FROM_PO(item))->orphaned) { + gpr_mu_unlock(&item->mu); + gpr_mu_unlock(&bag->mu); return; } - if (fd->polling_island == pollset->polling_island) { - pi_new = fd->polling_island; + if (item->pi == bag->pi) { + pi_new = item->pi; if (pi_new == NULL) { - /* Unlock before creating a new polling island: the polling island will - create a workqueue which creates a file descriptor, and holding an fd - lock here can eventually cause a loop to appear to TSAN (making it - unhappy). We don't think it's a real loop (there's an epoch point where - that loop possibility disappears), but the advantages of keeping TSAN - happy outweigh any performance advantage we might have by keeping the - lock held. */ - gpr_mu_unlock(&fd->mu); - pi_new = polling_island_create(exec_ctx, fd, &error); - gpr_mu_lock(&fd->mu); - /* Need to reverify any assumptions made between the initial lock and - getting to this branch: if they've changed, we need to throw away our - work and figure things out again. */ - if (fd->polling_island != NULL) { - GRPC_POLLING_TRACE( - "pollset_add_fd: Raced creating new polling island. pi_new: %p " - "(fd: %d, pollset: %p)", - (void *)pi_new, fd->fd, (void *)pollset); - - /* No need to lock 'pi_new' here since this is a new polling island and - * no one has a reference to it yet */ - polling_island_remove_all_fds_locked(pi_new, true, &error); - - /* Ref and unref so that the polling island gets deleted during unref */ - PI_ADD_REF(pi_new, "dance_of_destruction"); - PI_UNREF(exec_ctx, pi_new, "dance_of_destruction"); - goto retry; + /* GPR_ASSERT(item->pi == bag->pi == NULL) */ + + /* If we are adding an fd to a bag (i.e pollset or pollset_set), then + * we need to do some extra work to make TSAN happy */ + if (item_type == POLL_OBJ_FD) { + /* Unlock before creating a new polling island: the polling island will + create a workqueue which creates a file descriptor, and holding an fd + lock here can eventually cause a loop to appear to TSAN (making it + unhappy). We don't think it's a real loop (there's an epoch point + where that loop possibility disappears), but the advantages of + keeping TSAN happy outweigh any performance advantage we might have + by keeping the lock held. */ + gpr_mu_unlock(&item->mu); + pi_new = polling_island_create(exec_ctx, FD_FROM_PO(item), &error); + gpr_mu_lock(&item->mu); + + /* Need to reverify any assumptions made between the initial lock and + getting to this branch: if they've changed, we need to throw away our + work and figure things out again. */ + if (item->pi != NULL) { + GRPC_POLLING_TRACE( + "add_poll_object: Raced creating new polling island. pi_new: %p " + "(fd: %d, %s: %p)", + (void *)pi_new, FD_FROM_PO(item)->fd, poll_obj_string(bag_type), + (void *)bag); + /* No need to lock 'pi_new' here since this is a new polling island + * and no one has a reference to it yet */ + polling_island_remove_all_fds_locked(pi_new, true, &error); + + /* Ref and unref so that the polling island gets deleted during unref + */ + PI_ADD_REF(pi_new, "dance_of_destruction"); + PI_UNREF(exec_ctx, pi_new, "dance_of_destruction"); + goto retry; + } } else { - GRPC_POLLING_TRACE( - "pollset_add_fd: Created new polling island. pi_new: %p (fd: %d, " - "pollset: %p)", - (void *)pi_new, fd->fd, (void *)pollset); + pi_new = polling_island_create(exec_ctx, NULL, &error); } + + GRPC_POLLING_TRACE( + "add_poll_object: Created new polling island. pi_new: %p (%s: %p, " + "%s: %p)", + (void *)pi_new, poll_obj_string(item_type), (void *)item, + poll_obj_string(bag_type), (void *)bag); + } else { + GRPC_POLLING_TRACE( + "add_poll_object: Same polling island. pi: %p (%s, %s)", + (void *)pi_new, poll_obj_string(item_type), + poll_obj_string(bag_type)); + } + } else if (item->pi == NULL) { + /* GPR_ASSERT(bag->pi != NULL) */ + /* Make pi_new point to latest pi*/ + pi_new = polling_island_lock(bag->pi); + + if (item_type == POLL_OBJ_FD) { + grpc_fd *fd = FD_FROM_PO(item); + polling_island_add_fds_locked(pi_new, &fd, 1, true, &error); } - } else if (fd->polling_island == NULL) { - pi_new = polling_island_lock(pollset->polling_island); - polling_island_add_fds_locked(pi_new, &fd, 1, true, &error); - gpr_mu_unlock(&pi_new->mu); + gpr_mu_unlock(&pi_new->mu); GRPC_POLLING_TRACE( - "pollset_add_fd: fd->pi was NULL. pi_new: %p (fd: %d, pollset: %p, " - "pollset->pi: %p)", - (void *)pi_new, fd->fd, (void *)pollset, - (void *)pollset->polling_island); - } else if (pollset->polling_island == NULL) { - pi_new = polling_island_lock(fd->polling_island); + "add_poll_obj: item->pi was NULL. pi_new: %p (item(%s): %p, " + "bag(%s): %p)", + (void *)pi_new, poll_obj_string(item_type), (void *)item, + poll_obj_string(bag_type), (void *)bag); + } else if (bag->pi == NULL) { + /* GPR_ASSERT(item->pi != NULL) */ + /* Make pi_new to point to latest pi */ + pi_new = polling_island_lock(item->pi); gpr_mu_unlock(&pi_new->mu); - GRPC_POLLING_TRACE( - "pollset_add_fd: pollset->pi was NULL. pi_new: %p (fd: %d, pollset: " - "%p, fd->pi: %p", - (void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island); + "add_poll_obj: bag->pi was NULL. pi_new: %p (item(%s): %p, " + "bag(%s): %p)", + (void *)pi_new, poll_obj_string(item_type), (void *)item, + poll_obj_string(bag_type), (void *)bag); } else { - pi_new = polling_island_merge(fd->polling_island, pollset->polling_island, - &error); + pi_new = polling_island_merge(item->pi, bag->pi, &error); GRPC_POLLING_TRACE( - "pollset_add_fd: polling islands merged. pi_new: %p (fd: %d, pollset: " - "%p, fd->pi: %p, pollset->pi: %p)", - (void *)pi_new, fd->fd, (void *)pollset, (void *)fd->polling_island, - (void *)pollset->polling_island); + "add_poll_obj: polling islands merged. pi_new: %p (item(%s): %p, " + "bag(%s): %p)", + (void *)pi_new, poll_obj_string(item_type), (void *)item, + poll_obj_string(bag_type), (void *)bag); } - /* At this point, pi_new is the polling island that both fd->polling_island - and pollset->polling_island must be pointing to */ + /* At this point, pi_new is the polling island that both item->pi and bag->pi + MUST be pointing to */ - if (fd->polling_island != pi_new) { - PI_ADD_REF(pi_new, "fd"); - if (fd->polling_island != NULL) { - PI_UNREF(exec_ctx, fd->polling_island, "fd"); + if (item->pi != pi_new) { + PI_ADD_REF(pi_new, poll_obj_string(item_type)); + if (item->pi != NULL) { + PI_UNREF(exec_ctx, item->pi, poll_obj_string(item_type)); } - fd->polling_island = pi_new; + item->pi = pi_new; } - if (pollset->polling_island != pi_new) { - PI_ADD_REF(pi_new, "ps"); - if (pollset->polling_island != NULL) { - PI_UNREF(exec_ctx, pollset->polling_island, "ps"); + if (bag->pi != pi_new) { + PI_ADD_REF(pi_new, poll_obj_string(bag_type)); + if (bag->pi != NULL) { + PI_UNREF(exec_ctx, bag->pi, poll_obj_string(bag_type)); } - pollset->polling_island = pi_new; + bag->pi = pi_new; } - gpr_mu_unlock(&fd->mu); - gpr_mu_unlock(&pollset->mu); + gpr_mu_unlock(&item->mu); + gpr_mu_unlock(&bag->mu); - GRPC_LOG_IF_ERROR("pollset_add_fd", error); + GRPC_LOG_IF_ERROR("add_poll_object", error); + GPR_TIMER_END("add_poll_object", 0); +} - GPR_TIMER_END("pollset_add_fd", 0); +static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, + grpc_fd *fd) { + add_poll_object(exec_ctx, &pollset->po, POLL_OBJ_POLLSET, &fd->po, + POLL_OBJ_FD); } /******************************************************************************* @@ -1784,142 +1825,60 @@ retry: */ static grpc_pollset_set *pollset_set_create(void) { - grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set)); - memset(pollset_set, 0, sizeof(*pollset_set)); - gpr_mu_init(&pollset_set->mu); - return pollset_set; + grpc_pollset_set *pss = gpr_malloc(sizeof(*pss)); + gpr_mu_init(&pss->po.mu); + pss->po.pi = NULL; +#ifdef PO_DEBUG + pss->po.obj_type = POLL_OBJ_POLLSET_SET; +#endif + return pss; } -static void pollset_set_destroy(grpc_pollset_set *pollset_set) { - size_t i; - gpr_mu_destroy(&pollset_set->mu); - for (i = 0; i < pollset_set->fd_count; i++) { - GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set"); +static void pollset_set_destroy(grpc_pollset_set *pss) { + gpr_mu_destroy(&pss->po.mu); + + if (pss->po.pi != NULL) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + PI_UNREF(&exec_ctx, pss->po.pi, "pss_destroy"); + grpc_exec_ctx_finish(&exec_ctx); } - gpr_free(pollset_set->pollsets); - gpr_free(pollset_set->pollset_sets); - gpr_free(pollset_set->fds); - gpr_free(pollset_set); + + gpr_free(pss); } -static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *pollset_set, grpc_fd *fd) { - size_t i; - gpr_mu_lock(&pollset_set->mu); - if (pollset_set->fd_count == pollset_set->fd_capacity) { - pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity); - pollset_set->fds = gpr_realloc( - pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds)); - } - GRPC_FD_REF(fd, "pollset_set"); - pollset_set->fds[pollset_set->fd_count++] = fd; - for (i = 0; i < pollset_set->pollset_count; i++) { - pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd); - } - for (i = 0; i < pollset_set->pollset_set_count; i++) { - pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd); - } - gpr_mu_unlock(&pollset_set->mu); +static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, + grpc_fd *fd) { + add_poll_object(exec_ctx, &pss->po, POLL_OBJ_POLLSET_SET, &fd->po, + POLL_OBJ_FD); } -static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *pollset_set, grpc_fd *fd) { - size_t i; - gpr_mu_lock(&pollset_set->mu); - for (i = 0; i < pollset_set->fd_count; i++) { - if (pollset_set->fds[i] == fd) { - pollset_set->fd_count--; - GPR_SWAP(grpc_fd *, pollset_set->fds[i], - pollset_set->fds[pollset_set->fd_count]); - GRPC_FD_UNREF(fd, "pollset_set"); - break; - } - } - for (i = 0; i < pollset_set->pollset_set_count; i++) { - pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd); - } - gpr_mu_unlock(&pollset_set->mu); +static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset_set *pss, + grpc_fd *fd) { + /* Nothing to do */ } static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *pollset_set, - grpc_pollset *pollset) { - size_t i, j; - gpr_mu_lock(&pollset_set->mu); - if (pollset_set->pollset_count == pollset_set->pollset_capacity) { - pollset_set->pollset_capacity = - GPR_MAX(8, 2 * pollset_set->pollset_capacity); - pollset_set->pollsets = - gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity * - sizeof(*pollset_set->pollsets)); - } - pollset_set->pollsets[pollset_set->pollset_count++] = pollset; - for (i = 0, j = 0; i < pollset_set->fd_count; i++) { - if (fd_is_orphaned(pollset_set->fds[i])) { - GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set"); - } else { - pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]); - pollset_set->fds[j++] = pollset_set->fds[i]; - } - } - pollset_set->fd_count = j; - gpr_mu_unlock(&pollset_set->mu); + grpc_pollset_set *pss, grpc_pollset *ps) { + add_poll_object(exec_ctx, &pss->po, POLL_OBJ_POLLSET_SET, &ps->po, + POLL_OBJ_POLLSET); } static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx, - grpc_pollset_set *pollset_set, - grpc_pollset *pollset) { - size_t i; - gpr_mu_lock(&pollset_set->mu); - for (i = 0; i < pollset_set->pollset_count; i++) { - if (pollset_set->pollsets[i] == pollset) { - pollset_set->pollset_count--; - GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i], - pollset_set->pollsets[pollset_set->pollset_count]); - break; - } - } - gpr_mu_unlock(&pollset_set->mu); + grpc_pollset_set *pss, grpc_pollset *ps) { + /* Nothing to do */ } static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx, grpc_pollset_set *bag, grpc_pollset_set *item) { - size_t i, j; - gpr_mu_lock(&bag->mu); - if (bag->pollset_set_count == bag->pollset_set_capacity) { - bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity); - bag->pollset_sets = - gpr_realloc(bag->pollset_sets, - bag->pollset_set_capacity * sizeof(*bag->pollset_sets)); - } - bag->pollset_sets[bag->pollset_set_count++] = item; - for (i = 0, j = 0; i < bag->fd_count; i++) { - if (fd_is_orphaned(bag->fds[i])) { - GRPC_FD_UNREF(bag->fds[i], "pollset_set"); - } else { - pollset_set_add_fd(exec_ctx, item, bag->fds[i]); - bag->fds[j++] = bag->fds[i]; - } - } - bag->fd_count = j; - gpr_mu_unlock(&bag->mu); + add_poll_object(exec_ctx, &bag->po, POLL_OBJ_POLLSET_SET, &item->po, + POLL_OBJ_POLLSET_SET); } static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, grpc_pollset_set *bag, grpc_pollset_set *item) { - size_t i; - gpr_mu_lock(&bag->mu); - for (i = 0; i < bag->pollset_set_count; i++) { - if (bag->pollset_sets[i] == item) { - bag->pollset_set_count--; - GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i], - bag->pollset_sets[bag->pollset_set_count]); - break; - } - } - gpr_mu_unlock(&bag->mu); + /* Nothing to do */ } /* Test helper functions @@ -1927,9 +1886,9 @@ static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, void *grpc_fd_get_polling_island(grpc_fd *fd) { polling_island *pi; - gpr_mu_lock(&fd->mu); - pi = fd->polling_island; - gpr_mu_unlock(&fd->mu); + gpr_mu_lock(&fd->po.mu); + pi = fd->po.pi; + gpr_mu_unlock(&fd->po.mu); return pi; } @@ -1937,9 +1896,9 @@ void *grpc_fd_get_polling_island(grpc_fd *fd) { void *grpc_pollset_get_polling_island(grpc_pollset *ps) { polling_island *pi; - gpr_mu_lock(&ps->mu); - pi = ps->polling_island; - gpr_mu_unlock(&ps->mu); + gpr_mu_lock(&ps->po.mu); + pi = ps->po.pi; + gpr_mu_unlock(&ps->po.mu); return pi; } |