about | summary | refs | log | tree | commit | diff | homepage
diff options
context:
space:
mode:
author: klempner <klempner@google.com> 2014-12-08 13:06:06 -0800
committer: Nicolas Noble <nnoble@google.com> 2014-12-08 18:50:12 -0800
commit: 5ef51949bcd339c8bc615ec98f13c71bc503484a (patch)
tree: f0a5cd21f32fed21a436e63b18cd61fed4ac8c3d
parent: 493fbcc2afd232faa93e8f294b6f1b17a6d54335 (diff)
Make em destroy underlying descriptor objects (freeing their corresponding libevent
objects and closing the actual fd) asynchronously in the poller thread, where they won't race with libevent internals while polling. This requires splitting the actual descriptor data from the application-owned handle, because the former needs to have a longer lifetime. This CL also hacks dualstack_socket_test, which is legitimately sensitive to the delayed close of the listening socket. Change on 2014/12/08 by klempner <klempner@google.com> ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=81603300
-rw-r--r-- src/core/eventmanager/em.c 178
-rw-r--r-- src/core/eventmanager/em.h 20
-rw-r--r-- test/core/end2end/dualstack_socket_test.c 5
3 files changed, 132 insertions, 71 deletions
diff --git a/src/core/eventmanager/em.c b/src/core/eventmanager/em.c
index 36f3720e0d..0dc6c6a6d0 100644
--- a/src/core/eventmanager/em.c
+++ b/src/core/eventmanager/em.c
@@ -46,12 +46,33 @@
int evthread_use_threads(void);
+static void grpc_em_fd_impl_destroy(struct grpc_em_fd_impl *impl);
+
#define ALARM_TRIGGER_INIT ((gpr_atm)0)
#define ALARM_TRIGGER_INCREMENT ((gpr_atm)1)
#define DONE_SHUTDOWN ((void *)1)
#define POLLER_ID_INVALID ((gpr_atm)-1)
+typedef struct grpc_em_fd_impl {
+ grpc_em_task task; /* Base class, callbacks, queues, etc */
+ int fd; /* File descriptor */
+
+ /* Note that the shutdown event is only needed as a workaround for libevent
+ not properly handling event_active on an in flight event. */
+ struct event *shutdown_ev; /* activated to trigger shutdown */
+
+ /* protect shutdown_started|read_state|write_state and ensure barriers
+ between notify_on_[read|write] and read|write callbacks */
+ gpr_mu mu;
+ int shutdown_started; /* 0 -> shutdown not started, 1 -> started */
+ grpc_em_fd_state read_state;
+ grpc_em_fd_state write_state;
+
+ /* descriptor delete list. These are destroyed during polling. */
+ struct grpc_em_fd_impl *next;
+} grpc_em_fd_impl;
+
/* ================== grpc_em implementation ===================== */
/* If anything is in the work queue, process one item and return 1.
@@ -83,7 +104,18 @@ static void timer_callback(int fd, short events, void *context) {
event_base_loopbreak((struct event_base *)context);
}
-/* Spend some time polling if no other thread is.
+static void free_fd_list(grpc_em_fd_impl *impl) {
+ while (impl != NULL) {
+ grpc_em_fd_impl *current = impl;
+ impl = impl->next;
+ grpc_em_fd_impl_destroy(current);
+ gpr_free(current);
+ }
+}
+
+/* Spend some time doing polling and libevent maintenance work if no other
+ thread is. This includes both polling for events and destroying/closing file
+ descriptor objects.
Returns 1 if polling was performed, 0 otherwise.
Requires em->mu locked, may unlock and relock during the call. */
static int maybe_do_polling_work(grpc_em *em, struct timeval delay) {
@@ -92,6 +124,10 @@ static int maybe_do_polling_work(grpc_em *em, struct timeval delay) {
if (em->num_pollers) return 0;
em->num_pollers = 1;
+
+ free_fd_list(em->fds_to_free);
+ em->fds_to_free = NULL;
+
gpr_mu_unlock(&em->mu);
event_add(em->timeout_ev, &delay);
@@ -102,6 +138,11 @@ static int maybe_do_polling_work(grpc_em *em, struct timeval delay) {
event_del(em->timeout_ev);
gpr_mu_lock(&em->mu);
+ if (em->fds_to_free) {
+ free_fd_list(em->fds_to_free);
+ em->fds_to_free = NULL;
+ }
+
em->num_pollers = 0;
gpr_cv_broadcast(&em->cv);
return 1;
@@ -191,6 +232,7 @@ grpc_em_error grpc_em_init(grpc_em *em) {
em->num_fds = 0;
em->last_poll_completed = gpr_now();
em->shutdown_backup_poller = 0;
+ em->fds_to_free = NULL;
gpr_event_init(&em->backup_poller_done);
@@ -247,6 +289,8 @@ grpc_em_error grpc_em_destroy(grpc_em *em) {
;
gpr_mu_unlock(&em->mu);
+ free_fd_list(em->fds_to_free);
+
/* complete shutdown */
gpr_mu_destroy(&em->mu);
gpr_cv_destroy(&em->cv);
@@ -284,6 +328,7 @@ static void add_task(grpc_em *em, grpc_em_activation_data *adata) {
static void alarm_ev_destroy(grpc_em_alarm *alarm) {
grpc_em_activation_data *adata = &alarm->task.activation[GRPC_EM_TA_ONLY];
if (adata->ev != NULL) {
+ /* TODO(klempner): Is this safe to do when we're cancelling? */
event_free(adata->ev);
adata->ev = NULL;
}
@@ -368,16 +413,14 @@ grpc_em_error grpc_em_alarm_cancel(grpc_em_alarm *alarm) {
/* ==================== grpc_em_fd implementation =================== */
/* Proxy callback to call a gRPC read/write callback */
-static void em_fd_cb(int fd, short what, void *arg /*=em_fd*/) {
- grpc_em_fd *em_fd = arg;
+static void em_fd_cb(int fd, short what, void *arg /*=em_fd_impl*/) {
+ grpc_em_fd_impl *em_fd = arg;
grpc_em_cb_status status = GRPC_CALLBACK_SUCCESS;
int run_read_cb = 0;
int run_write_cb = 0;
grpc_em_activation_data *rdata, *wdata;
gpr_mu_lock(&em_fd->mu);
- /* TODO(klempner): We need to delete the event here too so we avoid spurious
- shutdowns. */
if (em_fd->shutdown_started) {
status = GRPC_CALLBACK_CANCELLED;
} else if (status == GRPC_CALLBACK_SUCCESS && (what & EV_TIMEOUT)) {
@@ -428,28 +471,32 @@ static void em_fd_shutdown_cb(int fd, short what, void *arg /*=em_fd*/) {
that libevent's handling of event_active() on an event which is already in
flight on a different thread is racy and easily triggers TSAN.
*/
- grpc_em_fd *em_fd = arg;
- gpr_mu_lock(&em_fd->mu);
- em_fd->shutdown_started = 1;
- if (em_fd->read_state == GRPC_EM_FD_WAITING) {
- event_active(em_fd->task.activation[GRPC_EM_TA_READ].ev, EV_READ, 1);
+ grpc_em_fd_impl *impl = arg;
+ gpr_mu_lock(&impl->mu);
+ impl->shutdown_started = 1;
+ if (impl->read_state == GRPC_EM_FD_WAITING) {
+ event_active(impl->task.activation[GRPC_EM_TA_READ].ev, EV_READ, 1);
}
- if (em_fd->write_state == GRPC_EM_FD_WAITING) {
- event_active(em_fd->task.activation[GRPC_EM_TA_WRITE].ev, EV_WRITE, 1);
+ if (impl->write_state == GRPC_EM_FD_WAITING) {
+ event_active(impl->task.activation[GRPC_EM_TA_WRITE].ev, EV_WRITE, 1);
}
- gpr_mu_unlock(&em_fd->mu);
+ gpr_mu_unlock(&impl->mu);
}
grpc_em_error grpc_em_fd_init(grpc_em_fd *em_fd, grpc_em *em, int fd) {
int flags;
grpc_em_activation_data *rdata, *wdata;
+ grpc_em_fd_impl *impl = gpr_malloc(sizeof(grpc_em_fd_impl));
gpr_mu_lock(&em->mu);
em->num_fds++;
+
gpr_mu_unlock(&em->mu);
- em_fd->shutdown_ev = NULL;
- gpr_mu_init(&em_fd->mu);
+ em_fd->impl = impl;
+
+ impl->shutdown_ev = NULL;
+ gpr_mu_init(&impl->mu);
flags = fcntl(fd, F_GETFL, 0);
if ((flags & O_NONBLOCK) == 0) {
@@ -457,11 +504,11 @@ grpc_em_error grpc_em_fd_init(grpc_em_fd *em_fd, grpc_em *em, int fd) {
return GRPC_EM_INVALID_ARGUMENTS;
}
- em_fd->task.type = GRPC_EM_TASK_FD;
- em_fd->task.em = em;
- em_fd->fd = fd;
+ impl->task.type = GRPC_EM_TASK_FD;
+ impl->task.em = em;
+ impl->fd = fd;
- rdata = &(em_fd->task.activation[GRPC_EM_TA_READ]);
+ rdata = &(impl->task.activation[GRPC_EM_TA_READ]);
rdata->ev = NULL;
rdata->cb = NULL;
rdata->arg = NULL;
@@ -469,7 +516,7 @@ grpc_em_error grpc_em_fd_init(grpc_em_fd *em_fd, grpc_em *em, int fd) {
rdata->prev = NULL;
rdata->next = NULL;
- wdata = &(em_fd->task.activation[GRPC_EM_TA_WRITE]);
+ wdata = &(impl->task.activation[GRPC_EM_TA_WRITE]);
wdata->ev = NULL;
wdata->cb = NULL;
wdata->arg = NULL;
@@ -477,49 +524,45 @@ grpc_em_error grpc_em_fd_init(grpc_em_fd *em_fd, grpc_em *em, int fd) {
wdata->prev = NULL;
wdata->next = NULL;
- em_fd->read_state = GRPC_EM_FD_IDLE;
- em_fd->write_state = GRPC_EM_FD_IDLE;
+ impl->read_state = GRPC_EM_FD_IDLE;
+ impl->write_state = GRPC_EM_FD_IDLE;
+
+ impl->shutdown_started = 0;
+ impl->next = NULL;
/* TODO(chenw): detect platforms where only level trigger is supported,
and set the event to non-persist. */
- rdata->ev = event_new(em->event_base, em_fd->fd, EV_ET | EV_PERSIST | EV_READ,
- em_fd_cb, em_fd);
+ rdata->ev = event_new(em->event_base, impl->fd, EV_ET | EV_PERSIST | EV_READ,
+ em_fd_cb, impl);
if (!rdata->ev) {
gpr_log(GPR_ERROR, "Failed to create read event");
return GRPC_EM_ERROR;
}
- wdata->ev = event_new(em->event_base, em_fd->fd,
- EV_ET | EV_PERSIST | EV_WRITE, em_fd_cb, em_fd);
+ wdata->ev = event_new(em->event_base, impl->fd, EV_ET | EV_PERSIST | EV_WRITE,
+ em_fd_cb, impl);
if (!wdata->ev) {
gpr_log(GPR_ERROR, "Failed to create write event");
return GRPC_EM_ERROR;
}
- em_fd->shutdown_ev =
- event_new(em->event_base, -1, EV_READ, em_fd_shutdown_cb, em_fd);
+ impl->shutdown_ev =
+ event_new(em->event_base, -1, EV_READ, em_fd_shutdown_cb, impl);
- if (!em_fd->shutdown_ev) {
+ if (!impl->shutdown_ev) {
gpr_log(GPR_ERROR, "Failed to create shutdown event");
return GRPC_EM_ERROR;
}
- em_fd->shutdown_started = 0;
return GRPC_EM_OK;
}
-void grpc_em_fd_destroy(grpc_em_fd *em_fd) {
+static void grpc_em_fd_impl_destroy(grpc_em_fd_impl *impl) {
grpc_em_task_activity_type type;
grpc_em_activation_data *adata;
- grpc_em *em = em_fd->task.em;
-
- /* ensure anyone holding the lock has left - it's the callers responsibility
- to ensure that no new users enter */
- gpr_mu_lock(&em_fd->mu);
- gpr_mu_unlock(&em_fd->mu);
for (type = GRPC_EM_TA_READ; type < GRPC_EM_TA_COUNT; type++) {
- adata = &(em_fd->task.activation[type]);
+ adata = &(impl->task.activation[type]);
GPR_ASSERT(adata->next == NULL);
if (adata->ev != NULL) {
event_free(adata->ev);
@@ -527,24 +570,43 @@ void grpc_em_fd_destroy(grpc_em_fd *em_fd) {
}
}
- if (em_fd->shutdown_ev != NULL) {
- event_free(em_fd->shutdown_ev);
- em_fd->shutdown_ev = NULL;
+ if (impl->shutdown_ev != NULL) {
+ event_free(impl->shutdown_ev);
+ impl->shutdown_ev = NULL;
}
- gpr_mu_destroy(&em_fd->mu);
+ gpr_mu_destroy(&impl->mu);
+ close(impl->fd);
+}
+
+void grpc_em_fd_destroy(grpc_em_fd *em_fd) {
+ grpc_em_fd_impl *impl = em_fd->impl;
+ grpc_em *em = impl->task.em;
gpr_mu_lock(&em->mu);
+
+ if (em->num_pollers == 0) {
+ /* it is safe to simply free it */
+ grpc_em_fd_impl_destroy(impl);
+ gpr_free(impl);
+ } else {
+ /* Put the impl on the list to be destroyed by the poller. */
+ impl->next = em->fds_to_free;
+ em->fds_to_free = impl;
+ /* Kick the poller so it closes the fd promptly.
+ * TODO(klempner): maybe this should be a different event.
+ */
+ event_active(em_fd->impl->shutdown_ev, EV_READ, 1);
+ }
+
em->num_fds--;
gpr_cv_broadcast(&em->cv);
gpr_mu_unlock(&em->mu);
-
- close(em_fd->fd);
}
-int grpc_em_fd_get(struct grpc_em_fd *em_fd) { return em_fd->fd; }
+int grpc_em_fd_get(struct grpc_em_fd *em_fd) { return em_fd->impl->fd; }
/* Returns the event manager associated with *em_fd. */
-grpc_em *grpc_em_fd_get_em(grpc_em_fd *em_fd) { return em_fd->task.em; }
+grpc_em *grpc_em_fd_get_em(grpc_em_fd *em_fd) { return em_fd->impl->task.em; }
/* TODO(chenw): should we enforce the contract that notify_on_read cannot be
called when the previously registered callback has not been called yet. */
@@ -552,6 +614,7 @@ grpc_em_error grpc_em_fd_notify_on_read(grpc_em_fd *em_fd,
grpc_em_cb_func read_cb,
void *read_cb_arg,
gpr_timespec deadline) {
+ grpc_em_fd_impl *impl = em_fd->impl;
int force_event = 0;
grpc_em_activation_data *rdata;
grpc_em_error result = GRPC_EM_OK;
@@ -560,22 +623,22 @@ grpc_em_error grpc_em_fd_notify_on_read(grpc_em_fd *em_fd,
struct timeval *delayp =
gpr_time_cmp(deadline, gpr_inf_future) ? &delay : NULL;
- rdata = &em_fd->task.activation[GRPC_EM_TA_READ];
+ rdata = &impl->task.activation[GRPC_EM_TA_READ];
- gpr_mu_lock(&em_fd->mu);
+ gpr_mu_lock(&impl->mu);
rdata->cb = read_cb;
rdata->arg = read_cb_arg;
force_event =
- (em_fd->shutdown_started || em_fd->read_state == GRPC_EM_FD_CACHED);
- em_fd->read_state = GRPC_EM_FD_WAITING;
+ (impl->shutdown_started || impl->read_state == GRPC_EM_FD_CACHED);
+ impl->read_state = GRPC_EM_FD_WAITING;
if (force_event) {
event_active(rdata->ev, EV_READ, 1);
} else if (event_add(rdata->ev, delayp) == -1) {
result = GRPC_EM_ERROR;
}
- gpr_mu_unlock(&em_fd->mu);
+ gpr_mu_unlock(&impl->mu);
return result;
}
@@ -583,6 +646,7 @@ grpc_em_error grpc_em_fd_notify_on_write(grpc_em_fd *em_fd,
grpc_em_cb_func write_cb,
void *write_cb_arg,
gpr_timespec deadline) {
+ grpc_em_fd_impl *impl = em_fd->impl;
int force_event = 0;
grpc_em_activation_data *wdata;
grpc_em_error result = GRPC_EM_OK;
@@ -591,27 +655,27 @@ grpc_em_error grpc_em_fd_notify_on_write(grpc_em_fd *em_fd,
struct timeval *delayp =
gpr_time_cmp(deadline, gpr_inf_future) ? &delay : NULL;
- wdata = &em_fd->task.activation[GRPC_EM_TA_WRITE];
+ wdata = &impl->task.activation[GRPC_EM_TA_WRITE];
- gpr_mu_lock(&em_fd->mu);
+ gpr_mu_lock(&impl->mu);
wdata->cb = write_cb;
wdata->arg = write_cb_arg;
force_event =
- (em_fd->shutdown_started || em_fd->write_state == GRPC_EM_FD_CACHED);
- em_fd->write_state = GRPC_EM_FD_WAITING;
+ (impl->shutdown_started || impl->write_state == GRPC_EM_FD_CACHED);
+ impl->write_state = GRPC_EM_FD_WAITING;
if (force_event) {
event_active(wdata->ev, EV_WRITE, 1);
} else if (event_add(wdata->ev, delayp) == -1) {
result = GRPC_EM_ERROR;
}
- gpr_mu_unlock(&em_fd->mu);
+ gpr_mu_unlock(&impl->mu);
return result;
}
void grpc_em_fd_shutdown(grpc_em_fd *em_fd) {
- event_active(em_fd->shutdown_ev, EV_READ, 1);
+ event_active(em_fd->impl->shutdown_ev, EV_READ, 1);
}
/*====================== Other callback functions ======================*/
diff --git a/src/core/eventmanager/em.h b/src/core/eventmanager/em.h
index aa439d1112..f190bc8743 100644
--- a/src/core/eventmanager/em.h
+++ b/src/core/eventmanager/em.h
@@ -204,6 +204,7 @@ grpc_em_error grpc_em_add_callback(grpc_em *em, grpc_em_cb_func cb,
/* Forward declarations */
struct grpc_em_activation_data;
+struct grpc_em_fd_impl;
/* ================== Actual structure definitions ========================= */
/* gRPC event manager handle.
@@ -223,6 +224,8 @@ struct grpc_em {
int shutdown_backup_poller;
gpr_event backup_poller_done;
+ struct grpc_em_fd_impl *fds_to_free;
+
struct event *timeout_ev; /* activated to break out of the event loop early */
};
@@ -330,23 +333,12 @@ typedef enum grpc_em_fd_state {
GRPC_EM_FD_CACHED = 2
} grpc_em_fd_state;
+struct grpc_em_fd_impl;
+
/* gRPC file descriptor handle.
The handle is used to register read/write callbacks to a file descriptor */
struct grpc_em_fd {
- grpc_em_task task; /* Base class, callbacks, queues, etc */
- int fd; /* File descriptor */
-
- /* Note that the shutdown event is only needed as a workaround for libevent
- not properly handling event_active on an in flight event. */
- struct event *shutdown_ev; /* activated to trigger shutdown */
-
- /* protect shutdown_started|read_state|write_state and ensure barriers
- between notify_on_[read|write] and read|write callbacks */
- gpr_mu mu;
- int shutdown_started; /* 0 -> shutdown not started, 1 -> started */
- grpc_em_fd_state read_state;
- grpc_em_fd_state write_state;
- /* activated after some timeout to activate shutdown_ev */
+ struct grpc_em_fd_impl *impl;
};
#endif /* __GRPC_INTERNAL_EVENTMANAGER_EM_H__ */
diff --git a/test/core/end2end/dualstack_socket_test.c b/test/core/end2end/dualstack_socket_test.c
index fd679661a1..4813672104 100644
--- a/test/core/end2end/dualstack_socket_test.c
+++ b/test/core/end2end/dualstack_socket_test.c
@@ -165,6 +165,11 @@ void test_connect(const char *server_host, const char *client_host, int port,
grpc_completion_queue_shutdown(server_cq);
drain_cq(server_cq);
grpc_completion_queue_destroy(server_cq);
+ /* TODO(klempner): We need to give the EM time to actually close the listening
+ socket, or later tests will fail to bind to this port. We should fix this
+ by adding an API to EM to get notified when this happens and having it
+ prevent listener teardown. */
+ gpr_sleep_until(gpr_time_add(gpr_now(), gpr_time_from_millis(250)));
}
int main(int argc, char **argv) {