diff options
Diffstat (limited to 'src/core/iomgr')
-rw-r--r-- | src/core/iomgr/iocp_windows.c | 14 | ||||
-rw-r--r-- | src/core/iomgr/iocp_windows.h | 11 | ||||
-rw-r--r-- | src/core/iomgr/resolve_address.h | 6 | ||||
-rw-r--r-- | src/core/iomgr/resolve_address_posix.c | 19 | ||||
-rw-r--r-- | src/core/iomgr/resolve_address_windows.c | 19 | ||||
-rw-r--r-- | src/core/iomgr/tcp_server_windows.c | 3 | ||||
-rw-r--r-- | src/core/iomgr/timer.c | 16 | ||||
-rw-r--r-- | src/core/iomgr/timer.h | 1 | ||||
-rw-r--r-- | src/core/iomgr/timer_heap.c | 22 |
9 files changed, 68 insertions, 43 deletions
diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c index 807729708e..fa87e5246b 100644 --- a/src/core/iomgr/iocp_windows.c +++ b/src/core/iomgr/iocp_windows.c @@ -71,7 +71,8 @@ static DWORD deadline_to_millis_timeout(gpr_timespec deadline, timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); } -void grpc_iocp_work(grpc_exec_ctx *exec_ctx, gpr_timespec deadline) { +grpc_iocp_work_status grpc_iocp_work(grpc_exec_ctx *exec_ctx, + gpr_timespec deadline) { BOOL success; DWORD bytes = 0; DWORD flags = 0; @@ -84,14 +85,14 @@ void grpc_iocp_work(grpc_exec_ctx *exec_ctx, gpr_timespec deadline) { g_iocp, &bytes, &completion_key, &overlapped, deadline_to_millis_timeout(deadline, gpr_now(deadline.clock_type))); if (success == 0 && overlapped == NULL) { - return; + return GRPC_IOCP_WORK_TIMEOUT; } GPR_ASSERT(completion_key && overlapped); if (overlapped == &g_iocp_custom_overlap) { gpr_atm_full_fetch_add(&g_custom_events, -1); if (completion_key == (ULONG_PTR)&g_iocp_kick_token) { /* We were awoken from a kick. */ - return; + return GRPC_IOCP_WORK_KICK; } gpr_log(GPR_ERROR, "Unknown custom completion key."); abort(); @@ -121,6 +122,7 @@ void grpc_iocp_work(grpc_exec_ctx *exec_ctx, gpr_timespec deadline) { } gpr_mu_unlock(&socket->state_mu); grpc_exec_ctx_enqueue(exec_ctx, closure, true, NULL); + return GRPC_IOCP_WORK_WORK; } void grpc_iocp_init(void) { @@ -140,10 +142,12 @@ void grpc_iocp_kick(void) { void grpc_iocp_flush(void) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_iocp_work_status work_status; do { - grpc_iocp_work(&exec_ctx, gpr_inf_past(GPR_CLOCK_MONOTONIC)); - } while (grpc_exec_ctx_flush(&exec_ctx)); + work_status = grpc_iocp_work(&exec_ctx, gpr_inf_past(GPR_CLOCK_MONOTONIC)); + } while (work_status == GRPC_IOCP_WORK_KICK || + grpc_exec_ctx_flush(&exec_ctx)); } void grpc_iocp_shutdown(void) { diff --git a/src/core/iomgr/iocp_windows.h b/src/core/iomgr/iocp_windows.h index 75f3ba8477..8b2b1aeb5c 100644 --- a/src/core/iomgr/iocp_windows.h +++ b/src/core/iomgr/iocp_windows.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -38,7 +38,14 @@ #include "src/core/iomgr/socket_windows.h" -void grpc_iocp_work(grpc_exec_ctx *exec_ctx, gpr_timespec deadline); +typedef enum { + GRPC_IOCP_WORK_WORK, + GRPC_IOCP_WORK_TIMEOUT, + GRPC_IOCP_WORK_KICK +} grpc_iocp_work_status; + +grpc_iocp_work_status grpc_iocp_work(grpc_exec_ctx *exec_ctx, + gpr_timespec deadline); void grpc_iocp_init(void); void grpc_iocp_kick(void); void grpc_iocp_flush(void); diff --git a/src/core/iomgr/resolve_address.h b/src/core/iomgr/resolve_address.h index 01eedffa88..b059630457 100644 --- a/src/core/iomgr/resolve_address.h +++ b/src/core/iomgr/resolve_address.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -66,7 +66,7 @@ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addresses); /* Resolve addr in a blocking fashion. Returns NULL on failure. On success, result must be freed with grpc_resolved_addresses_destroy. */ -grpc_resolved_addresses *grpc_blocking_resolve_address( - const char *addr, const char *default_port); +extern grpc_resolved_addresses *(*grpc_blocking_resolve_address)( + const char *name, const char *default_port); #endif /* GRPC_INTERNAL_CORE_IOMGR_RESOLVE_ADDRESS_H */ diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c index c51745b918..a6c9893f23 100644 --- a/src/core/iomgr/resolve_address_posix.c +++ b/src/core/iomgr/resolve_address_posix.c @@ -34,18 +34,13 @@ #include <grpc/support/port_platform.h> #ifdef GPR_POSIX_SOCKET -#include "src/core/iomgr/sockaddr.h" #include "src/core/iomgr/resolve_address.h" +#include "src/core/iomgr/sockaddr.h" +#include <string.h> #include <sys/types.h> #include <sys/un.h> -#include <string.h> -#include "src/core/iomgr/executor.h" -#include "src/core/iomgr/iomgr_internal.h" -#include "src/core/iomgr/sockaddr_utils.h" -#include "src/core/support/block_annotate.h" -#include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> @@ -53,6 +48,11 @@ #include <grpc/support/thd.h> #include <grpc/support/time.h> #include <grpc/support/useful.h> +#include "src/core/iomgr/executor.h" +#include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/sockaddr_utils.h" +#include "src/core/support/block_annotate.h" +#include "src/core/support/string.h" typedef struct { char *name; @@ -62,7 +62,7 @@ typedef struct { void *arg; } request; -grpc_resolved_addresses *grpc_blocking_resolve_address( +static grpc_resolved_addresses *blocking_resolve_address_impl( const char *name, const char *default_port) { struct addrinfo hints; struct addrinfo *result = NULL, *resp; @@ -150,6 +150,9 @@ done: return addrs; } +grpc_resolved_addresses *(*grpc_blocking_resolve_address)( + const char *name, const char *default_port) = blocking_resolve_address_impl; + /* Callback to be passed to grpc_executor to asynch-ify * grpc_blocking_resolve_address */ static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, bool success) { diff --git a/src/core/iomgr/resolve_address_windows.c b/src/core/iomgr/resolve_address_windows.c index 28c8661e73..472e797163 100644 --- a/src/core/iomgr/resolve_address_windows.c +++ b/src/core/iomgr/resolve_address_windows.c @@ -34,17 +34,12 @@ #include <grpc/support/port_platform.h> #ifdef GPR_WINSOCK_SOCKET -#include "src/core/iomgr/sockaddr.h" #include "src/core/iomgr/resolve_address.h" +#include "src/core/iomgr/sockaddr.h" -#include <sys/types.h> #include <string.h> +#include <sys/types.h> -#include "src/core/iomgr/executor.h" -#include "src/core/iomgr/iomgr_internal.h" -#include "src/core/iomgr/sockaddr_utils.h" -#include "src/core/support/block_annotate.h" -#include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> @@ -52,6 +47,11 @@ #include <grpc/support/string_util.h> #include <grpc/support/thd.h> #include <grpc/support/time.h> +#include "src/core/iomgr/executor.h" +#include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/sockaddr_utils.h" +#include "src/core/support/block_annotate.h" +#include "src/core/support/string.h" typedef struct { char *name; @@ -61,7 +61,7 @@ typedef struct { void *arg; } request; -grpc_resolved_addresses *grpc_blocking_resolve_address( +static grpc_resolved_addresses *blocking_resolve_address_impl( const char *name, const char *default_port) { struct addrinfo hints; struct addrinfo *result = NULL, *resp; @@ -133,6 +133,9 @@ done: return addrs; } +grpc_resolved_addresses *(*grpc_blocking_resolve_address)( + const char *name, const char *default_port) = blocking_resolve_address_impl; + /* Callback to be passed to grpc_executor to asynch-ify * grpc_blocking_resolve_address */ static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, bool success) { diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index ce930b8f41..a4abc5b974 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -240,8 +240,7 @@ static void decrement_active_ports_and_notify(grpc_exec_ctx *exec_ctx, sp->shutting_down = 0; gpr_mu_lock(&sp->server->mu); GPR_ASSERT(sp->server->active_ports > 0); - if (0 == --sp->server->active_ports && - sp->server->shutdown_complete != NULL) { + if (0 == --sp->server->active_ports) { notify = 1; } gpr_mu_unlock(&sp->server->mu); diff --git a/src/core/iomgr/timer.c b/src/core/iomgr/timer.c index 8379fffad0..f444643428 100644 --- a/src/core/iomgr/timer.c +++ b/src/core/iomgr/timer.c @@ -33,11 +33,11 @@ #include "src/core/iomgr/timer.h" -#include "src/core/iomgr/timer_heap.h" -#include "src/core/iomgr/time_averaged_stats.h" #include <grpc/support/log.h> #include <grpc/support/sync.h> #include <grpc/support/useful.h> +#include "src/core/iomgr/time_averaged_stats.h" +#include "src/core/iomgr/timer_heap.h" #define INVALID_HEAP_INDEX 0xffffffffu @@ -330,6 +330,18 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, gpr_mu_unlock(&g_mu); gpr_mu_unlock(&g_checker_mu); + } else if (next != NULL) { + /* TODO(ctiller): this forces calling code to do an short poll, and + then retry the timer check (because this time through the timer list was + contended). + + We could reduce the cost here dramatically by keeping a count of how many + currently active pollers got through the uncontended case above + successfully, and waking up other pollers IFF that count drops to zero. + + Once that count is in place, this entire else branch could disappear. */ + *next = gpr_time_min( + *next, gpr_time_add(now, gpr_time_from_millis(1, GPR_TIMESPAN))); } return (int)n; diff --git a/src/core/iomgr/timer.h b/src/core/iomgr/timer.h index 9ad1e92f42..e239e884e7 100644 --- a/src/core/iomgr/timer.h +++ b/src/core/iomgr/timer.h @@ -96,7 +96,6 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer); *next is never guaranteed to be updated on any given execution; however, with high probability at least one thread in the system will see an update at any time slice. */ - bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, gpr_timespec *next); void grpc_timer_list_init(gpr_timespec now); diff --git a/src/core/iomgr/timer_heap.c b/src/core/iomgr/timer_heap.c index 9d8be5c1fc..b5df566c45 100644 --- a/src/core/iomgr/timer_heap.c +++ b/src/core/iomgr/timer_heap.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -46,7 +46,7 @@ static void adjust_upwards(grpc_timer **first, uint32_t i, grpc_timer *t) { while (i > 0) { uint32_t parent = (uint32_t)(((int)i - 1) / 2); - if (gpr_time_cmp(first[parent]->deadline, t->deadline) >= 0) break; + if (gpr_time_cmp(first[parent]->deadline, t->deadline) <= 0) break; first[i] = first[parent]; first[i]->heap_index = i; i = parent; @@ -62,16 +62,14 @@ static void adjust_downwards(grpc_timer **first, uint32_t i, uint32_t length, grpc_timer *t) { for (;;) { uint32_t left_child = 1u + 2u * i; - uint32_t right_child; - uint32_t next_i; if (left_child >= length) break; - right_child = left_child + 1; - next_i = right_child < length && - gpr_time_cmp(first[left_child]->deadline, - first[right_child]->deadline) < 0 - ? right_child - : left_child; - if (gpr_time_cmp(t->deadline, first[next_i]->deadline) >= 0) break; + uint32_t right_child = left_child + 1; + uint32_t next_i = right_child < length && + gpr_time_cmp(first[left_child]->deadline, + first[right_child]->deadline) > 0 + ? right_child + : left_child; + if (gpr_time_cmp(t->deadline, first[next_i]->deadline) <= 0) break; first[i] = first[next_i]; first[i]->heap_index = i; i = next_i; @@ -95,7 +93,7 @@ static void maybe_shrink(grpc_timer_heap *heap) { static void note_changed_priority(grpc_timer_heap *heap, grpc_timer *timer) { uint32_t i = timer->heap_index; uint32_t parent = (uint32_t)(((int)i - 1) / 2); - if (gpr_time_cmp(heap->timers[parent]->deadline, timer->deadline) < 0) { + if (gpr_time_cmp(heap->timers[parent]->deadline, timer->deadline) > 0) { adjust_upwards(heap->timers, i, timer); } else { adjust_downwards(heap->timers, i, heap->timer_count, timer); |