diff options
Diffstat (limited to 'src/core/iomgr')
-rw-r--r-- | src/core/iomgr/exec_ctx.c | 5 | ||||
-rw-r--r-- | src/core/iomgr/exec_ctx.h | 5 | ||||
-rw-r--r-- | src/core/iomgr/iocp_windows.c | 73 | ||||
-rw-r--r-- | src/core/iomgr/iocp_windows.h | 2 | ||||
-rw-r--r-- | src/core/iomgr/iomgr.c | 4 | ||||
-rw-r--r-- | src/core/iomgr/iomgr_internal.h | 6 | ||||
-rw-r--r-- | src/core/iomgr/iomgr_posix.c | 5 | ||||
-rw-r--r-- | src/core/iomgr/iomgr_posix.h | 3 | ||||
-rw-r--r-- | src/core/iomgr/iomgr_windows.c | 4 | ||||
-rw-r--r-- | src/core/iomgr/pollset_windows.c | 155 | ||||
-rw-r--r-- | src/core/iomgr/pollset_windows.h | 30 |
11 files changed, 206 insertions, 86 deletions
diff --git a/src/core/iomgr/exec_ctx.c b/src/core/iomgr/exec_ctx.c index a830a27b0b..f2914d376e 100644 --- a/src/core/iomgr/exec_ctx.c +++ b/src/core/iomgr/exec_ctx.c @@ -35,16 +35,19 @@ #include <grpc/support/log.h> -void grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { +int grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) { + int did_something = 0; while (!grpc_closure_list_empty(exec_ctx->closure_list)) { grpc_closure *c = exec_ctx->closure_list.head; exec_ctx->closure_list.head = exec_ctx->closure_list.tail = NULL; while (c != NULL) { grpc_closure *next = c->next; + did_something = 1; c->cb(exec_ctx, c->cb_arg, c->success); c = next; } } + return did_something; } void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) { diff --git a/src/core/iomgr/exec_ctx.h b/src/core/iomgr/exec_ctx.h index f99aa038c5..aa0610cbea 100644 --- a/src/core/iomgr/exec_ctx.h +++ b/src/core/iomgr/exec_ctx.h @@ -61,8 +61,9 @@ struct grpc_exec_ctx { { GRPC_CLOSURE_LIST_INIT } /** Flush any work that has been enqueued onto this grpc_exec_ctx. - * Caller must guarantee that no interfering locks are held. */ -void grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx); + * Caller must guarantee that no interfering locks are held. + * Returns 1 if work was performed, 0 otherwise. */ +int grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx); /** Finish any pending work for a grpc_exec_ctx. Must be called before * the instance is destroyed, or work may be lost. */ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx); diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c index c2f62a41b8..cf33d74366 100644 --- a/src/core/iomgr/iocp_windows.c +++ b/src/core/iomgr/iocp_windows.c @@ -50,13 +50,28 @@ static ULONG g_iocp_kick_token; static OVERLAPPED g_iocp_custom_overlap; -static gpr_event g_shutdown_iocp; -static gpr_event g_iocp_done; static gpr_atm g_custom_events = 0; static HANDLE g_iocp; -static void do_iocp_work(grpc_exec_ctx *exec_ctx) { +static DWORD deadline_to_millis_timeout(gpr_timespec deadline, + gpr_timespec now) { + gpr_timespec timeout; + static const int max_spin_polling_us = 10; + if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) { + return INFINITE; + } + if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros( + max_spin_polling_us, + GPR_TIMESPAN))) <= 0) { + return 0; + } + timeout = gpr_time_sub(deadline, now); + return gpr_time_to_millis(gpr_time_add( + timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); +} + +void grpc_iocp_work(grpc_exec_ctx *exec_ctx, gpr_timespec deadline) { BOOL success; DWORD bytes = 0; DWORD flags = 0; @@ -66,10 +81,10 @@ static void do_iocp_work(grpc_exec_ctx *exec_ctx) { grpc_winsocket_callback_info *info; grpc_closure *closure = NULL; success = GetQueuedCompletionStatus(g_iocp, &bytes, &completion_key, - &overlapped, INFINITE); - /* success = 0 and overlapped = NULL means the deadline got attained. - Which is impossible. since our wait time is +inf */ - GPR_ASSERT(success || overlapped); + &overlapped, deadline_to_millis_timeout(deadline, gpr_now(deadline.clock_type))); + if (success == 0 && overlapped == NULL) { + return; + } GPR_ASSERT(completion_key && overlapped); if (overlapped == &g_iocp_custom_overlap) { gpr_atm_full_fetch_add(&g_custom_events, -1); @@ -104,34 +119,13 @@ static void do_iocp_work(grpc_exec_ctx *exec_ctx) { info->has_pending_iocp = 1; } gpr_mu_unlock(&socket->state_mu); - if (closure) { - closure->cb(exec_ctx, closure->cb_arg, 1); - } -} - -static void iocp_loop(void *p) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - - while (gpr_atm_acq_load(&g_custom_events) || - !gpr_event_get(&g_shutdown_iocp)) { - do_iocp_work(&exec_ctx); - grpc_exec_ctx_flush(&exec_ctx); - } - grpc_exec_ctx_finish(&exec_ctx); - - gpr_event_set(&g_iocp_done, (void *)1); + grpc_exec_ctx_enqueue(exec_ctx, closure, 1); } void grpc_iocp_init(void) { - gpr_thd_id id; - g_iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, (ULONG_PTR)NULL, 0); GPR_ASSERT(g_iocp); - - gpr_event_init(&g_iocp_done); - gpr_event_init(&g_shutdown_iocp); - gpr_thd_new(&id, iocp_loop, NULL, NULL); } void grpc_iocp_kick(void) { @@ -143,13 +137,22 @@ void grpc_iocp_kick(void) { GPR_ASSERT(success); } +void grpc_iocp_flush(void) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + + do { + grpc_iocp_work(&exec_ctx, gpr_inf_past(GPR_CLOCK_MONOTONIC)); + } while (grpc_exec_ctx_flush(&exec_ctx)); +} + void grpc_iocp_shutdown(void) { - BOOL success; - gpr_event_set(&g_shutdown_iocp, (void *)1); - grpc_iocp_kick(); - gpr_event_wait(&g_iocp_done, gpr_inf_future(GPR_CLOCK_REALTIME)); - success = CloseHandle(g_iocp); - GPR_ASSERT(success); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + while (gpr_atm_acq_load(&g_custom_events)) { + grpc_iocp_work(&exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC)); + grpc_exec_ctx_flush(&exec_ctx); + } + grpc_exec_ctx_finish(&exec_ctx); + GPR_ASSERT(CloseHandle(g_iocp)); } void grpc_iocp_add_socket(grpc_winsocket *socket) { diff --git a/src/core/iomgr/iocp_windows.h b/src/core/iomgr/iocp_windows.h index b0209e04e3..75f3ba8477 100644 --- a/src/core/iomgr/iocp_windows.h +++ b/src/core/iomgr/iocp_windows.h @@ -38,8 +38,10 @@ #include "src/core/iomgr/socket_windows.h" +void grpc_iocp_work(grpc_exec_ctx *exec_ctx, gpr_timespec deadline); void grpc_iocp_init(void); void grpc_iocp_kick(void); +void grpc_iocp_flush(void); void grpc_iocp_shutdown(void); void grpc_iocp_add_socket(grpc_winsocket *); diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index 612419b70e..a10399311f 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -66,6 +66,7 @@ void grpc_iomgr_init(void) { g_root_object.next = g_root_object.prev = &g_root_object; g_root_object.name = "root"; grpc_iomgr_platform_init(); + grpc_pollset_global_init(); } static size_t count_objects(void) { @@ -90,6 +91,8 @@ void grpc_iomgr_shutdown(void) { gpr_timespec last_warning_time = gpr_now(GPR_CLOCK_REALTIME); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_iomgr_platform_flush(); + gpr_mu_lock(&g_mu); g_shutdown = 1; while (g_root_object.next != &g_root_object) { @@ -135,6 +138,7 @@ void grpc_iomgr_shutdown(void) { gpr_mu_lock(&g_mu); gpr_mu_unlock(&g_mu); + grpc_pollset_global_shutdown(); grpc_iomgr_platform_shutdown(); gpr_mu_destroy(&g_mu); gpr_cv_destroy(&g_rcv); diff --git a/src/core/iomgr/iomgr_internal.h b/src/core/iomgr/iomgr_internal.h index f266732c96..e372c18e8a 100644 --- a/src/core/iomgr/iomgr_internal.h +++ b/src/core/iomgr/iomgr_internal.h @@ -43,10 +43,16 @@ typedef struct grpc_iomgr_object { struct grpc_iomgr_object *prev; } grpc_iomgr_object; +void grpc_pollset_global_init(void); +void grpc_pollset_global_shutdown(void); + void grpc_iomgr_register_object(grpc_iomgr_object *obj, const char *name); void grpc_iomgr_unregister_object(grpc_iomgr_object *obj); void grpc_iomgr_platform_init(void); +/** flush any globally queued work from iomgr */ +void grpc_iomgr_platform_flush(void); +/** tear down all platform specific global iomgr structures */ void grpc_iomgr_platform_shutdown(void); #endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H */ diff --git a/src/core/iomgr/iomgr_posix.c b/src/core/iomgr/iomgr_posix.c index 2425e59941..f6474b7e6d 100644 --- a/src/core/iomgr/iomgr_posix.c +++ b/src/core/iomgr/iomgr_posix.c @@ -42,12 +42,13 @@ void grpc_iomgr_platform_init(void) { grpc_fd_global_init(); - grpc_pollset_global_init(); grpc_register_tracer("tcp", &grpc_tcp_trace); } +void grpc_iomgr_platform_flush(void) { +} + void grpc_iomgr_platform_shutdown(void) { - grpc_pollset_global_shutdown(); grpc_fd_global_shutdown(); } diff --git a/src/core/iomgr/iomgr_posix.h b/src/core/iomgr/iomgr_posix.h index 716fedb636..068a5c6d7c 100644 --- a/src/core/iomgr/iomgr_posix.h +++ b/src/core/iomgr/iomgr_posix.h @@ -36,7 +36,4 @@ #include "src/core/iomgr/iomgr_internal.h" -void grpc_pollset_global_init(void); -void grpc_pollset_global_shutdown(void); - #endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_POSIX_H */ diff --git a/src/core/iomgr/iomgr_windows.c b/src/core/iomgr/iomgr_windows.c index b49cb87e97..93bdc5ec16 100644 --- a/src/core/iomgr/iomgr_windows.c +++ b/src/core/iomgr/iomgr_windows.c @@ -63,6 +63,10 @@ void grpc_iomgr_platform_init(void) { grpc_iocp_init(); } +void grpc_iomgr_platform_flush(void) { + grpc_iocp_flush(); +} + void grpc_iomgr_platform_shutdown(void) { grpc_iocp_shutdown(); winsock_shutdown(); diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index 6182eb3532..798b637635 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -39,38 +39,66 @@ #include "src/core/iomgr/alarm_internal.h" #include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/iocp_windows.h" #include "src/core/iomgr/pollset.h" #include "src/core/iomgr/pollset_windows.h" -static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) { - worker->prev->next = worker->next; - worker->next->prev = worker->prev; +gpr_mu grpc_polling_mu; +static grpc_pollset_worker *g_active_poller; +static grpc_pollset_worker g_global_root_worker; + +void grpc_pollset_global_init() { + gpr_mu_init(&grpc_polling_mu); + g_active_poller = NULL; + g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next = + g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev = + &g_global_root_worker; +} + +void grpc_pollset_global_shutdown() { + gpr_mu_destroy(&grpc_polling_mu); +} + +static void remove_worker(grpc_pollset_worker *worker, + grpc_pollset_worker_link_type type) { + worker->links[type].prev->links[type].next = worker->links[type].next; + worker->links[type].next->links[type].prev = worker->links[type].prev; + worker->links[type].next = worker->links[type].prev = worker; } -static int has_workers(grpc_pollset *p) { - return p->root_worker.next != &p->root_worker; +static int has_workers(grpc_pollset_worker *root, grpc_pollset_worker_link_type type) { + return root->links[type].next != root; } -static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) { - if (has_workers(p)) { - grpc_pollset_worker *w = p->root_worker.next; - remove_worker(p, w); +static grpc_pollset_worker *pop_front_worker( + grpc_pollset_worker *root, grpc_pollset_worker_link_type type) { + if (has_workers(root, type)) { + grpc_pollset_worker *w = root->links[type].next; + remove_worker(w, type); return w; } else { return NULL; } } -static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) { - worker->next = &p->root_worker; - worker->prev = worker->next->prev; - worker->prev->next = worker->next->prev = worker; +static void push_back_worker(grpc_pollset_worker *root, + grpc_pollset_worker_link_type type, + grpc_pollset_worker *worker) { + worker->links[type].next = root; + worker->links[type].prev = worker->links[type].next->links[type].prev; + worker->links[type].prev->links[type].next = + worker->links[type].next->links[type].prev = + worker; } -static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { - worker->prev = &p->root_worker; - worker->next = worker->prev->next; - worker->prev->next = worker->next->prev = worker; +static void push_front_worker(grpc_pollset_worker *root, + grpc_pollset_worker_link_type type, + grpc_pollset_worker *worker) { + worker->links[type].prev = root; + worker->links[type].next = worker->links[type].prev->links[type].next; + worker->links[type].prev->links[type].next = + worker->links[type].next->links[type].prev = + worker; } /* There isn't really any such thing as a pollset under Windows, due to the @@ -80,69 +108,122 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { void grpc_pollset_init(grpc_pollset *pollset) { memset(pollset, 0, sizeof(*pollset)); - gpr_mu_init(&pollset->mu); - pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker; - pollset->kicked_without_pollers = 0; + pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next = + pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev = + &pollset->root_worker; } void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_closure *closure) { - gpr_mu_lock(&pollset->mu); + gpr_mu_lock(&grpc_polling_mu); pollset->shutting_down = 1; grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); - gpr_mu_unlock(&pollset->mu); - grpc_exec_ctx_enqueue(exec_ctx, closure, 1); + if (!pollset->is_iocp_worker) { + grpc_exec_ctx_enqueue(exec_ctx, closure, 1); + } else { + pollset->on_shutdown = closure; + } + gpr_mu_unlock(&grpc_polling_mu); } void grpc_pollset_destroy(grpc_pollset *pollset) { - gpr_mu_destroy(&pollset->mu); } void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec now, gpr_timespec deadline) { int added_worker = 0; - worker->next = worker->prev = NULL; + worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].next = + worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev = + worker->links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next = + worker->links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev = + NULL; + worker->kicked = 0; + worker->pollset = pollset; gpr_cv_init(&worker->cv); if (grpc_alarm_check(exec_ctx, now, &deadline)) { goto done; } if (!pollset->kicked_without_pollers && !pollset->shutting_down) { - push_front_worker(pollset, worker); + if (g_active_poller == NULL) { + grpc_pollset_worker *next_worker; + /* become poller */ + pollset->is_iocp_worker = 1; + g_active_poller = worker; + gpr_mu_unlock(&grpc_polling_mu); + grpc_iocp_work(exec_ctx, deadline); + grpc_exec_ctx_flush(exec_ctx); + gpr_mu_lock(&grpc_polling_mu); + pollset->is_iocp_worker = 0; + g_active_poller = NULL; + /* try to get a worker from this pollsets worker list */ + next_worker = pop_front_worker(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET); + /* try to get a worker from the global list */ + next_worker = pop_front_worker(&g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL); + if (next_worker != NULL) { + next_worker->kicked = 1; + gpr_cv_signal(&next_worker->cv); + } + + if (pollset->shutting_down && pollset->on_shutdown != NULL) { + grpc_exec_ctx_enqueue(exec_ctx, pollset->on_shutdown, 1); + pollset->on_shutdown = NULL; + } + goto done; + } + push_front_worker(&g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL, worker); + push_front_worker(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET, worker); added_worker = 1; - gpr_cv_wait(&worker->cv, &pollset->mu, deadline); + while (!worker->kicked) { + if (gpr_cv_wait(&worker->cv, &grpc_polling_mu, deadline)) { + break; + } + } } else { pollset->kicked_without_pollers = 0; } done: if (!grpc_closure_list_empty(exec_ctx->closure_list)) { - gpr_mu_unlock(&pollset->mu); + gpr_mu_unlock(&grpc_polling_mu); grpc_exec_ctx_flush(exec_ctx); - gpr_mu_lock(&pollset->mu); + gpr_mu_lock(&grpc_polling_mu); } - gpr_cv_destroy(&worker->cv); if (added_worker) { - remove_worker(pollset, worker); + remove_worker(worker, GRPC_POLLSET_WORKER_LINK_GLOBAL); + remove_worker(worker, GRPC_POLLSET_WORKER_LINK_POLLSET); } + gpr_cv_destroy(&worker->cv); } void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { if (specific_worker != NULL) { if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) { - for (specific_worker = p->root_worker.next; + for (specific_worker = p->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next; specific_worker != &p->root_worker; - specific_worker = specific_worker->next) { + specific_worker = specific_worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].next) { + specific_worker->kicked = 1; gpr_cv_signal(&specific_worker->cv); } p->kicked_without_pollers = 1; + if (p->is_iocp_worker) { + grpc_iocp_kick(); + } } else { - gpr_cv_signal(&specific_worker->cv); + if (p->is_iocp_worker) { + if (g_active_poller == specific_worker) { + grpc_iocp_kick(); + } + } else { + specific_worker->kicked = 1; + gpr_cv_signal(&specific_worker->cv); + } } } else { - specific_worker = pop_front_worker(p); + specific_worker = pop_front_worker(&p->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET); if (specific_worker != NULL) { - push_back_worker(p, specific_worker); - gpr_cv_signal(&specific_worker->cv); + grpc_pollset_kick(p, specific_worker); + } else if (p->is_iocp_worker) { + grpc_iocp_kick(); } else { p->kicked_without_pollers = 1; } diff --git a/src/core/iomgr/pollset_windows.h b/src/core/iomgr/pollset_windows.h index 4efa5a1717..65ba80619b 100644 --- a/src/core/iomgr/pollset_windows.h +++ b/src/core/iomgr/pollset_windows.h @@ -43,19 +43,37 @@ used to synchronize with the IOCP, and workers are condition variables used to block threads until work is ready. */ -typedef struct grpc_pollset_worker { - gpr_cv cv; +typedef enum { + GRPC_POLLSET_WORKER_LINK_POLLSET = 0, + GRPC_POLLSET_WORKER_LINK_GLOBAL, + GRPC_POLLSET_WORKER_LINK_TYPES +} grpc_pollset_worker_link_type; + +typedef struct grpc_pollset_worker_link { struct grpc_pollset_worker *next; struct grpc_pollset_worker *prev; +} grpc_pollset_worker_link; + +struct grpc_pollset; +typedef struct grpc_pollset grpc_pollset; + +typedef struct grpc_pollset_worker { + gpr_cv cv; + int kicked; + struct grpc_pollset *pollset; + grpc_pollset_worker_link links[GRPC_POLLSET_WORKER_LINK_TYPES]; } grpc_pollset_worker; -typedef struct grpc_pollset { - gpr_mu mu; +struct grpc_pollset { int shutting_down; int kicked_without_pollers; + int is_iocp_worker; grpc_pollset_worker root_worker; -} grpc_pollset; + grpc_closure *on_shutdown; +}; + +extern gpr_mu grpc_polling_mu; -#define GRPC_POLLSET_MU(pollset) (&(pollset)->mu) +#define GRPC_POLLSET_MU(pollset) (&grpc_polling_mu) #endif /* GRPC_INTERNAL_CORE_IOMGR_POLLSET_WINDOWS_H */ |