diff options
Diffstat (limited to 'src/core/lib/iomgr/ev_epollex_linux.cc')
-rw-r--r-- | src/core/lib/iomgr/ev_epollex_linux.cc | 97 |
1 files changed, 73 insertions, 24 deletions
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index a9b2adf75d..5ffabdc665 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -104,8 +104,10 @@ struct pollable { int epfd; grpc_wakeup_fd wakeup; - // only for type fd... one ref to the owner fd - grpc_fd* owner_fd; + // The following are relevant only for type PO_FD + grpc_fd* owner_fd; // Set to the owner_fd if the type is PO_FD + gpr_mu owner_orphan_mu; // Synchronizes access to owner_orphaned field + bool owner_orphaned; // Is the owner fd orphaned grpc_pollset_set* pollset_set; pollable* next; @@ -338,21 +340,45 @@ static void ref_by(grpc_fd* fd, int n) { GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0); } +#ifndef NDEBUG +#define INVALIDATE_FD(fd) invalidate_fd(fd) +/* Since an fd is never really destroyed (i.e gpr_free() is not called), it is + * hard to cases where fd fields are accessed even after calling fd_destroy(). + * The following invalidates fd fields to make catching such errors easier */ +static void invalidate_fd(grpc_fd* fd) { + fd->fd = -1; + fd->salt = -1; + gpr_atm_no_barrier_store(&fd->refst, -1); + memset(&fd->orphan_mu, -1, sizeof(fd->orphan_mu)); + memset(&fd->pollable_mu, -1, sizeof(fd->pollable_mu)); + fd->pollable_obj = nullptr; + fd->on_done_closure = nullptr; + gpr_atm_no_barrier_store(&fd->read_notifier_pollset, 0); + memset(&fd->iomgr_object, -1, sizeof(fd->iomgr_object)); + fd->track_err = false; +} +#else +#define INVALIDATE_FD(fd) +#endif + +/* Uninitialize and add to the freelist */ static void fd_destroy(void* arg, grpc_error* error) { grpc_fd* fd = static_cast<grpc_fd*>(arg); - /* Add the fd to the freelist */ grpc_iomgr_unregister_object(&fd->iomgr_object); POLLABLE_UNREF(fd->pollable_obj, "fd_pollable"); gpr_mu_destroy(&fd->pollable_mu); gpr_mu_destroy(&fd->orphan_mu); - gpr_mu_lock(&fd_freelist_mu); - fd->freelist_next = fd_freelist; - fd_freelist = fd; fd->read_closure->DestroyEvent(); fd->write_closure->DestroyEvent(); fd->error_closure->DestroyEvent(); + INVALIDATE_FD(fd); + + /* Add the fd to the freelist */ + gpr_mu_lock(&fd_freelist_mu); + fd->freelist_next = fd_freelist; + fd_freelist = fd; gpr_mu_unlock(&fd_freelist_mu); } @@ -408,20 +434,18 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) { new_fd->error_closure.Init(); } - gpr_mu_init(&new_fd->pollable_mu); - gpr_mu_init(&new_fd->orphan_mu); - new_fd->pollable_obj = nullptr; - gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); new_fd->fd = fd; - new_fd->track_err = track_err; new_fd->salt = gpr_atm_no_barrier_fetch_add(&g_fd_salt, 1); + gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); + gpr_mu_init(&new_fd->orphan_mu); + gpr_mu_init(&new_fd->pollable_mu); + new_fd->pollable_obj = nullptr; new_fd->read_closure->InitEvent(); new_fd->write_closure->InitEvent(); new_fd->error_closure->InitEvent(); - gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); - new_fd->freelist_next = nullptr; new_fd->on_done_closure = nullptr; + gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); char* fd_name; gpr_asprintf(&fd_name, "%s fd=%d", name, fd); @@ -432,6 +456,8 @@ static grpc_fd* fd_create(int fd, const char* name, bool track_err) { } #endif gpr_free(fd_name); + + new_fd->track_err = track_err; return new_fd; } @@ -446,6 +472,17 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, gpr_mu_lock(&fd->orphan_mu); + // Get the fd->pollable_obj and set the owner_orphaned on that pollable to + // true so that the pollable will no longer access its owner_fd field. + gpr_mu_lock(&fd->pollable_mu); + pollable* pollable_obj = fd->pollable_obj; + gpr_mu_unlock(&fd->pollable_mu); + + if (pollable_obj) { + gpr_mu_lock(&pollable_obj->owner_orphan_mu); + pollable_obj->owner_orphaned = true; + } + fd->on_done_closure = on_done; /* If release_fd is not NULL, we should be relinquishing control of the file @@ -467,6 +504,10 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, GRPC_CLOSURE_SCHED(fd->on_done_closure, GRPC_ERROR_NONE); + if (pollable_obj) { + gpr_mu_unlock(&pollable_obj->owner_orphan_mu); + } + gpr_mu_unlock(&fd->orphan_mu); UNREF_BY(fd, 2, reason); /* Drop the reference */ @@ -551,6 +592,8 @@ static grpc_error* pollable_create(pollable_type type, pollable** p) { gpr_mu_init(&(*p)->mu); (*p)->epfd = epfd; (*p)->owner_fd = nullptr; + gpr_mu_init(&(*p)->owner_orphan_mu); + (*p)->owner_orphaned = false; (*p)->pollset_set = nullptr; (*p)->next = (*p)->prev = *p; (*p)->root_worker = nullptr; @@ -590,6 +633,7 @@ static void pollable_unref(pollable* p, int line, const char* reason) { GRPC_FD_TRACE("pollable_unref: Closing epfd: %d", p->epfd); close(p->epfd); grpc_wakeup_fd_destroy(&p->wakeup); + gpr_mu_destroy(&p->owner_orphan_mu); gpr_free(p); } } @@ -846,10 +890,15 @@ static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); } static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); } -static grpc_error* fd_get_or_become_pollable(grpc_fd* fd, pollable** p) { +/* Get the pollable_obj attached to this fd. If none is attached, create a new + * pollable object (of type PO_FD), attach it to the fd and return it + * + * Note that if a pollable object is already attached to the fd, it may be of + * either PO_FD or PO_MULTI type */ +static grpc_error* get_fd_pollable(grpc_fd* fd, pollable** p) { gpr_mu_lock(&fd->pollable_mu); grpc_error* error = GRPC_ERROR_NONE; - static const char* err_desc = "fd_get_or_become_pollable"; + static const char* err_desc = "get_fd_pollable"; if (fd->pollable_obj == nullptr) { if (append_error(&error, pollable_create(PO_FD, &fd->pollable_obj), err_desc)) { @@ -1186,7 +1235,7 @@ static grpc_error* pollset_transition_pollable_from_empty_to_fd_locked( } append_error(&error, pollset_kick_all(pollset), err_desc); POLLABLE_UNREF(pollset->active_pollable, "pollset"); - append_error(&error, fd_get_or_become_pollable(fd, &pollset->active_pollable), + append_error(&error, get_fd_pollable(fd, &pollset->active_pollable), err_desc); return error; } @@ -1230,9 +1279,8 @@ static grpc_error* pollset_add_fd_locked(grpc_pollset* pollset, grpc_fd* fd) { error = pollset_transition_pollable_from_empty_to_fd_locked(pollset, fd); break; case PO_FD: - gpr_mu_lock(&po_at_start->owner_fd->orphan_mu); - if ((gpr_atm_no_barrier_load(&pollset->active_pollable->owner_fd->refst) & - 1) == 0) { + gpr_mu_lock(&po_at_start->owner_orphan_mu); + if (po_at_start->owner_orphaned) { error = pollset_transition_pollable_from_empty_to_fd_locked(pollset, fd); } else { @@ -1240,7 +1288,7 @@ static grpc_error* pollset_add_fd_locked(grpc_pollset* pollset, grpc_fd* fd) { error = pollset_transition_pollable_from_fd_to_multi_locked(pollset, fd); } - gpr_mu_unlock(&po_at_start->owner_fd->orphan_mu); + gpr_mu_unlock(&po_at_start->owner_orphan_mu); break; case PO_MULTI: error = pollable_add_fd(pollset->active_pollable, fd); @@ -1276,16 +1324,17 @@ static grpc_error* pollset_as_multipollable_locked(grpc_pollset* pollset, append_error(&error, pollset_kick_all(pollset), err_desc); break; case PO_FD: - gpr_mu_lock(&po_at_start->owner_fd->orphan_mu); - if ((gpr_atm_no_barrier_load(&pollset->active_pollable->owner_fd->refst) & - 1) == 0) { + gpr_mu_lock(&po_at_start->owner_orphan_mu); + if (po_at_start->owner_orphaned) { + // Unlock before Unref'ing the pollable + gpr_mu_unlock(&po_at_start->owner_orphan_mu); POLLABLE_UNREF(pollset->active_pollable, "pollset"); error = pollable_create(PO_MULTI, &pollset->active_pollable); } else { error = pollset_transition_pollable_from_fd_to_multi_locked(pollset, nullptr); + gpr_mu_unlock(&po_at_start->owner_orphan_mu); } - gpr_mu_unlock(&po_at_start->owner_fd->orphan_mu); break; case PO_MULTI: break; |