diff options
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/iomgr/iocp_windows.c | 61 | ||||
-rw-r--r-- | src/core/iomgr/iocp_windows.h | 1 | ||||
-rw-r--r-- | src/core/iomgr/iomgr.c | 2 | ||||
-rw-r--r-- | src/core/iomgr/iomgr_internal.h | 3 | ||||
-rw-r--r-- | src/core/iomgr/iomgr_posix.c | 2 | ||||
-rw-r--r-- | src/core/iomgr/iomgr_posix.h | 3 | ||||
-rw-r--r-- | src/core/iomgr/pollset_windows.c | 133 | ||||
-rw-r--r-- | src/core/iomgr/pollset_windows.h | 16 |
8 files changed, 153 insertions, 68 deletions
diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c index c2f62a41b8..af846f680f 100644 --- a/src/core/iomgr/iocp_windows.c +++ b/src/core/iomgr/iocp_windows.c @@ -50,13 +50,28 @@ static ULONG g_iocp_kick_token; static OVERLAPPED g_iocp_custom_overlap; -static gpr_event g_shutdown_iocp; -static gpr_event g_iocp_done; static gpr_atm g_custom_events = 0; static HANDLE g_iocp; -static void do_iocp_work(grpc_exec_ctx *exec_ctx) { +static DWORD deadline_to_millis_timeout(gpr_timespec deadline, + gpr_timespec now) { + gpr_timespec timeout; + static const int max_spin_polling_us = 10; + if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) { + return INFINITE; + } + if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros( + max_spin_polling_us, + GPR_TIMESPAN))) <= 0) { + return 0; + } + timeout = gpr_time_sub(deadline, now); + return gpr_time_to_millis(gpr_time_add( + timeout, gpr_time_from_nanos(GPR_NS_PER_SEC - 1, GPR_TIMESPAN))); +} + +void grpc_iocp_work(grpc_exec_ctx *exec_ctx, gpr_timespec deadline) { BOOL success; DWORD bytes = 0; DWORD flags = 0; @@ -66,10 +81,10 @@ static void do_iocp_work(grpc_exec_ctx *exec_ctx) { grpc_winsocket_callback_info *info; grpc_closure *closure = NULL; success = GetQueuedCompletionStatus(g_iocp, &bytes, &completion_key, - &overlapped, INFINITE); - /* success = 0 and overlapped = NULL means the deadline got attained. - Which is impossible. since our wait time is +inf */ - GPR_ASSERT(success || overlapped); + &overlapped, deadline_to_millis_timeout(deadline, gpr_now(deadline.clock_type))); + if (success == 0 && overlapped == NULL) { + return; + } GPR_ASSERT(completion_key && overlapped); if (overlapped == &g_iocp_custom_overlap) { gpr_atm_full_fetch_add(&g_custom_events, -1); @@ -109,29 +124,10 @@ static void do_iocp_work(grpc_exec_ctx *exec_ctx) { } } -static void iocp_loop(void *p) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - - while (gpr_atm_acq_load(&g_custom_events) || - !gpr_event_get(&g_shutdown_iocp)) { - do_iocp_work(&exec_ctx); - grpc_exec_ctx_flush(&exec_ctx); - } - grpc_exec_ctx_finish(&exec_ctx); - - gpr_event_set(&g_iocp_done, (void *)1); -} - void grpc_iocp_init(void) { - gpr_thd_id id; - g_iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, (ULONG_PTR)NULL, 0); GPR_ASSERT(g_iocp); - - gpr_event_init(&g_iocp_done); - gpr_event_init(&g_shutdown_iocp); - gpr_thd_new(&id, iocp_loop, NULL, NULL); } void grpc_iocp_kick(void) { @@ -144,12 +140,13 @@ void grpc_iocp_kick(void) { } void grpc_iocp_shutdown(void) { - BOOL success; - gpr_event_set(&g_shutdown_iocp, (void *)1); - grpc_iocp_kick(); - gpr_event_wait(&g_iocp_done, gpr_inf_future(GPR_CLOCK_REALTIME)); - success = CloseHandle(g_iocp); - GPR_ASSERT(success); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + while (gpr_atm_acq_load(&g_custom_events)) { + grpc_iocp_work(&exec_ctx, gpr_inf_future(GPR_CLOCK_MONOTONIC)); + grpc_exec_ctx_flush(&exec_ctx); + } + grpc_exec_ctx_finish(&exec_ctx); + GPR_ASSERT(CloseHandle(g_iocp)); } void grpc_iocp_add_socket(grpc_winsocket *socket) { diff --git a/src/core/iomgr/iocp_windows.h b/src/core/iomgr/iocp_windows.h index b0209e04e3..7e330e7ce2 100644 --- a/src/core/iomgr/iocp_windows.h +++ b/src/core/iomgr/iocp_windows.h @@ -38,6 +38,7 @@ #include "src/core/iomgr/socket_windows.h" +void grpc_iocp_work(grpc_exec_ctx *exec_ctx, gpr_timespec deadline); void grpc_iocp_init(void); void grpc_iocp_kick(void); void grpc_iocp_shutdown(void); diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index ef222416af..fe5c1d4e8f 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -66,6 +66,7 @@ void grpc_iomgr_init(void) { g_root_object.next = g_root_object.prev = &g_root_object; g_root_object.name = "root"; grpc_iomgr_platform_init(); + grpc_pollset_global_init(); } static size_t count_objects(void) { @@ -138,6 +139,7 @@ void grpc_iomgr_shutdown(void) { gpr_mu_lock(&g_mu); gpr_mu_unlock(&g_mu); + grpc_pollset_global_shutdown(); grpc_iomgr_platform_shutdown(); gpr_mu_destroy(&g_mu); gpr_cv_destroy(&g_rcv); diff --git a/src/core/iomgr/iomgr_internal.h b/src/core/iomgr/iomgr_internal.h index f266732c96..1a0724b431 100644 --- a/src/core/iomgr/iomgr_internal.h +++ b/src/core/iomgr/iomgr_internal.h @@ -43,6 +43,9 @@ typedef struct grpc_iomgr_object { struct grpc_iomgr_object *prev; } grpc_iomgr_object; +void grpc_pollset_global_init(void); +void grpc_pollset_global_shutdown(void); + void grpc_iomgr_register_object(grpc_iomgr_object *obj, const char *name); void grpc_iomgr_unregister_object(grpc_iomgr_object *obj); diff --git a/src/core/iomgr/iomgr_posix.c b/src/core/iomgr/iomgr_posix.c index 2425e59941..db93d0a756 100644 --- a/src/core/iomgr/iomgr_posix.c +++ b/src/core/iomgr/iomgr_posix.c @@ -42,12 +42,10 @@ void grpc_iomgr_platform_init(void) { grpc_fd_global_init(); - grpc_pollset_global_init(); grpc_register_tracer("tcp", &grpc_tcp_trace); } void grpc_iomgr_platform_shutdown(void) { - grpc_pollset_global_shutdown(); grpc_fd_global_shutdown(); } diff --git a/src/core/iomgr/iomgr_posix.h b/src/core/iomgr/iomgr_posix.h index 716fedb636..068a5c6d7c 100644 --- a/src/core/iomgr/iomgr_posix.h +++ b/src/core/iomgr/iomgr_posix.h @@ -36,7 +36,4 @@ #include "src/core/iomgr/iomgr_internal.h" -void grpc_pollset_global_init(void); -void grpc_pollset_global_shutdown(void); - #endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_POSIX_H */ diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index 6182eb3532..cb0aad0e33 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -39,38 +39,66 @@ #include "src/core/iomgr/alarm_internal.h" #include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/iocp_windows.h" #include "src/core/iomgr/pollset.h" #include "src/core/iomgr/pollset_windows.h" -static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) { - worker->prev->next = worker->next; - worker->next->prev = worker->prev; +static gpr_mu g_polling_mu; +static grpc_pollset_worker *g_active_poller; +static grpc_pollset_worker g_global_root_worker; + +void grpc_pollset_global_init() { + gpr_mu_init(&g_polling_mu); + g_active_poller = NULL; + g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].next = + g_global_root_worker.links[GRPC_POLLSET_WORKER_LINK_GLOBAL].prev = + &g_global_root_worker; +} + +void grpc_pollset_global_shutdown() { + gpr_mu_destroy(&g_polling_mu); } -static int has_workers(grpc_pollset *p) { - return p->root_worker.next != &p->root_worker; +static void remove_worker(grpc_pollset_worker *worker, + grpc_pollset_worker_link_type type) { + worker->links[type].prev->links[type].next = worker->links[type].next; + worker->links[type].next->links[type].prev = worker->links[type].prev; + worker->links[type].next = worker->links[type].prev = worker; } -static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) { - if (has_workers(p)) { - grpc_pollset_worker *w = p->root_worker.next; - remove_worker(p, w); +static int has_workers(grpc_pollset_worker *root, grpc_pollset_worker_link_type type) { + return root->links[type].next != root; +} + +static grpc_pollset_worker *pop_front_worker( + grpc_pollset_worker *root, grpc_pollset_worker_link_type type) { + if (has_workers(root, type)) { + grpc_pollset_worker *w = root->links[type].next; + remove_worker(w, type); return w; } else { return NULL; } } -static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) { - worker->next = &p->root_worker; - worker->prev = worker->next->prev; - worker->prev->next = worker->next->prev = worker; +static void push_back_worker(grpc_pollset_worker *root, + grpc_pollset_worker_link_type type, + grpc_pollset_worker *worker) { + worker->links[type].next = root; + worker->links[type].prev = worker->links[type].next->links[type].prev; + worker->links[type].prev->links[type].next = + worker->links[type].next->links[type].prev = + worker; } -static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { - worker->prev = &p->root_worker; - worker->next = worker->prev->next; - worker->prev->next = worker->next->prev = worker; +static void push_front_worker(grpc_pollset_worker *root, + grpc_pollset_worker_link_type type, + grpc_pollset_worker *worker) { + worker->links[type].prev = root; + worker->links[type].next = worker->links[type].prev->links[type].next; + worker->links[type].prev->links[type].next = + worker->links[type].next->links[type].prev = + worker; } /* There isn't really any such thing as a pollset under Windows, due to the @@ -81,8 +109,9 @@ static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { void grpc_pollset_init(grpc_pollset *pollset) { memset(pollset, 0, sizeof(*pollset)); gpr_mu_init(&pollset->mu); - pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker; - pollset->kicked_without_pollers = 0; + pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next = + pollset->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev = + &pollset->root_worker; } void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, @@ -90,8 +119,12 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, gpr_mu_lock(&pollset->mu); pollset->shutting_down = 1; grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); + if (!pollset->is_iocp_worker) { + grpc_exec_ctx_enqueue(exec_ctx, closure, 1); + } else { + pollset->on_shutdown = closure; + } gpr_mu_unlock(&pollset->mu); - grpc_exec_ctx_enqueue(exec_ctx, closure, 1); } void grpc_pollset_destroy(grpc_pollset *pollset) { @@ -102,13 +135,42 @@ void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker, gpr_timespec now, gpr_timespec deadline) { int added_worker = 0; - worker->next = worker->prev = NULL; + worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].next = + worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].prev = + NULL; gpr_cv_init(&worker->cv); if (grpc_alarm_check(exec_ctx, now, &deadline)) { goto done; } if (!pollset->kicked_without_pollers && !pollset->shutting_down) { - push_front_worker(pollset, worker); + gpr_mu_lock(&g_polling_mu); + if (g_active_poller == NULL) { + grpc_pollset_worker *next_worker; + /* become poller */ + pollset->is_iocp_worker = 1; + g_active_poller = worker; + gpr_mu_unlock(&g_polling_mu); + gpr_mu_unlock(&pollset->mu); + grpc_iocp_work(exec_ctx, deadline); + gpr_mu_lock(&pollset->mu); + gpr_mu_lock(&g_polling_mu); + pollset->is_iocp_worker = 0; + g_active_poller = NULL; + next_worker = pop_front_worker(&g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL); + if (next_worker != NULL) { + gpr_cv_signal(&next_worker->cv); + } + gpr_mu_unlock(&g_polling_mu); + + if (pollset->shutting_down && pollset->on_shutdown != NULL) { + grpc_exec_ctx_enqueue(exec_ctx, pollset->on_shutdown, 1); + pollset->on_shutdown = NULL; + } + goto done; + } + push_front_worker(&g_global_root_worker, GRPC_POLLSET_WORKER_LINK_GLOBAL, worker); + gpr_mu_unlock(&g_polling_mu); + push_front_worker(&pollset->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET, worker); added_worker = 1; gpr_cv_wait(&worker->cv, &pollset->mu, deadline); } else { @@ -122,27 +184,40 @@ done: } gpr_cv_destroy(&worker->cv); if (added_worker) { - remove_worker(pollset, worker); + remove_worker(worker, GRPC_POLLSET_WORKER_LINK_GLOBAL); + remove_worker(worker, GRPC_POLLSET_WORKER_LINK_POLLSET); } } void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { if (specific_worker != NULL) { if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) { - for (specific_worker = p->root_worker.next; + for (specific_worker = p->root_worker.links[GRPC_POLLSET_WORKER_LINK_POLLSET].next; specific_worker != &p->root_worker; - specific_worker = specific_worker->next) { + specific_worker = specific_worker->links[GRPC_POLLSET_WORKER_LINK_POLLSET].next) { gpr_cv_signal(&specific_worker->cv); } p->kicked_without_pollers = 1; + if (p->is_iocp_worker) { + grpc_iocp_kick(); + } } else { - gpr_cv_signal(&specific_worker->cv); + if (p->is_iocp_worker) { + gpr_mu_lock(&g_polling_mu); + if (g_active_poller == specific_worker) { + grpc_iocp_kick(); + } + gpr_mu_unlock(&g_polling_mu); + } else { + gpr_cv_signal(&specific_worker->cv); + } } } else { - specific_worker = pop_front_worker(p); + specific_worker = pop_front_worker(&p->root_worker, GRPC_POLLSET_WORKER_LINK_POLLSET); if (specific_worker != NULL) { - push_back_worker(p, specific_worker); - gpr_cv_signal(&specific_worker->cv); + grpc_pollset_kick(p, specific_worker); + } else if (p->is_iocp_worker) { + grpc_iocp_kick(); } else { p->kicked_without_pollers = 1; } diff --git a/src/core/iomgr/pollset_windows.h b/src/core/iomgr/pollset_windows.h index 4efa5a1717..55f87aca72 100644 --- a/src/core/iomgr/pollset_windows.h +++ b/src/core/iomgr/pollset_windows.h @@ -43,17 +43,29 @@ used to synchronize with the IOCP, and workers are condition variables used to block threads until work is ready. */ -typedef struct grpc_pollset_worker { - gpr_cv cv; +typedef enum { + GRPC_POLLSET_WORKER_LINK_POLLSET = 0, + GRPC_POLLSET_WORKER_LINK_GLOBAL, + GRPC_POLLSET_WORKER_LINK_TYPES +} grpc_pollset_worker_link_type; + +typedef struct grpc_pollset_worker_link { struct grpc_pollset_worker *next; struct grpc_pollset_worker *prev; +} grpc_pollset_worker_link; + +typedef struct grpc_pollset_worker { + gpr_cv cv; + grpc_pollset_worker_link links[GRPC_POLLSET_WORKER_LINK_TYPES]; } grpc_pollset_worker; typedef struct grpc_pollset { gpr_mu mu; int shutting_down; int kicked_without_pollers; + int is_iocp_worker; grpc_pollset_worker root_worker; + grpc_closure *on_shutdown; } grpc_pollset; #define GRPC_POLLSET_MU(pollset) (&(pollset)->mu) |