diff options
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/channel/client_channel.c | 29 | ||||
-rw-r--r-- | src/core/channel/subchannel_call_holder.c | 18 | ||||
-rw-r--r-- | src/core/client_config/client_config.c | 6 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 2 | ||||
-rw-r--r-- | src/core/client_config/resolvers/dns_resolver.c | 57 | ||||
-rw-r--r-- | src/core/client_config/subchannel.c | 34 | ||||
-rw-r--r-- | src/core/iomgr/iocp_windows.c | 14 | ||||
-rw-r--r-- | src/core/iomgr/iocp_windows.h | 11 | ||||
-rw-r--r-- | src/core/iomgr/resolve_address.h | 6 | ||||
-rw-r--r-- | src/core/iomgr/resolve_address_posix.c | 19 | ||||
-rw-r--r-- | src/core/iomgr/resolve_address_windows.c | 19 | ||||
-rw-r--r-- | src/core/iomgr/tcp_server_windows.c | 3 | ||||
-rw-r--r-- | src/core/iomgr/timer.c | 16 | ||||
-rw-r--r-- | src/core/iomgr/timer.h | 1 | ||||
-rw-r--r-- | src/core/iomgr/timer_heap.c | 22 | ||||
-rw-r--r-- | src/core/support/alloc.c | 4 | ||||
-rw-r--r-- | src/core/surface/completion_queue.c | 2 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 6 | ||||
-rw-r--r-- | src/core/tsi/ssl_transport_security.c | 76 | ||||
-rw-r--r-- | src/core/tsi/ssl_transport_security.h | 5 |
20 files changed, 223 insertions, 127 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index d4ba950818..f021a8ae32 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -165,7 +165,6 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, channel_data *chand = arg; grpc_lb_policy *lb_policy = NULL; grpc_lb_policy *old_lb_policy; - grpc_resolver *old_resolver; grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE; int exit_idle = 0; @@ -201,28 +200,25 @@ static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg, } if (iomgr_success && chand->resolver) { - grpc_resolver *resolver = chand->resolver; - GRPC_RESOLVER_REF(resolver, "channel-next"); grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, "new_lb+resolver"); if (lb_policy != NULL) { watch_lb_policy(exec_ctx, chand, lb_policy, state); } - gpr_mu_unlock(&chand->mu_config); GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration, + grpc_resolver_next(exec_ctx, chand->resolver, + &chand->incoming_configuration, &chand->on_config_changed); - GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel-next"); + gpr_mu_unlock(&chand->mu_config); } else { - old_resolver = chand->resolver; - chand->resolver = NULL; + if (chand->resolver != NULL) { + grpc_resolver_shutdown(exec_ctx, chand->resolver); + GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); + chand->resolver = NULL; + } grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone"); gpr_mu_unlock(&chand->mu_config); - if (old_resolver != NULL) { - grpc_resolver_shutdown(exec_ctx, old_resolver); - GRPC_RESOLVER_UNREF(exec_ctx, old_resolver, "channel"); - } } if (exit_idle) { @@ -247,7 +243,6 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_transport_op *op) { channel_data *chand = elem->channel_data; - grpc_resolver *destroy_resolver = NULL; grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, true, NULL); @@ -279,7 +274,8 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, if (op->disconnect && chand->resolver != NULL) { grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, "disconnect"); - destroy_resolver = chand->resolver; + grpc_resolver_shutdown(exec_ctx, chand->resolver); + GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); chand->resolver = NULL; if (chand->lb_policy != NULL) { grpc_pollset_set_del_pollset_set(exec_ctx, @@ -290,11 +286,6 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, } } gpr_mu_unlock(&chand->mu_config); - - if (destroy_resolver) { - grpc_resolver_shutdown(exec_ctx, destroy_resolver); - GRPC_RESOLVER_UNREF(exec_ctx, destroy_resolver, "channel"); - } } typedef struct { diff --git a/src/core/channel/subchannel_call_holder.c b/src/core/channel/subchannel_call_holder.c index 8f46885a04..9c087dc2a1 100644 --- a/src/core/channel/subchannel_call_holder.c +++ b/src/core/channel/subchannel_call_holder.c @@ -174,17 +174,15 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, bool success) { holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; if (holder->connected_subchannel == NULL) { fail_locked(exec_ctx, holder); + } else if (1 == gpr_atm_acq_load(&holder->subchannel_call)) { + /* already cancelled before subchannel became ready */ + fail_locked(exec_ctx, holder); } else { - if (!gpr_atm_rel_cas( - &holder->subchannel_call, 0, - (gpr_atm)(uintptr_t)grpc_connected_subchannel_create_call( - exec_ctx, holder->connected_subchannel, holder->pollset))) { - GPR_ASSERT(gpr_atm_acq_load(&holder->subchannel_call) == 1); - /* if this cas fails, the call was cancelled before the pick completed */ - fail_locked(exec_ctx, holder); - } else { - retry_waiting_locked(exec_ctx, holder); - } + gpr_atm_rel_store( + &holder->subchannel_call, + (gpr_atm)(uintptr_t)grpc_connected_subchannel_create_call( + exec_ctx, holder->connected_subchannel, holder->pollset)); + retry_waiting_locked(exec_ctx, holder); } gpr_mu_unlock(&holder->mu); GRPC_CALL_STACK_UNREF(exec_ctx, holder->owning_call, "pick_subchannel"); diff --git a/src/core/client_config/client_config.c b/src/core/client_config/client_config.c index 6ecffb3854..c500af25ee 100644 --- a/src/core/client_config/client_config.c +++ b/src/core/client_config/client_config.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -53,7 +53,9 @@ void grpc_client_config_ref(grpc_client_config *c) { gpr_ref(&c->refs); } void grpc_client_config_unref(grpc_exec_ctx *exec_ctx, grpc_client_config *c) { if (gpr_unref(&c->refs)) { - GRPC_LB_POLICY_UNREF(exec_ctx, c->lb_policy, "client_config"); + if (c->lb_policy != NULL) { + GRPC_LB_POLICY_UNREF(exec_ctx, c->lb_policy, "client_config"); + } gpr_free(c); } } diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 81167b31c8..8ed1223d39 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -387,8 +387,8 @@ static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {} static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory, grpc_lb_policy_args *args) { + if (args->num_subchannels == 0) return NULL; pick_first_lb_policy *p = gpr_malloc(sizeof(*p)); - GPR_ASSERT(args->num_subchannels > 0); memset(p, 0, sizeof(*p)); grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable); p->subchannels = diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c index 376b6b3d76..e28e4757a1 100644 --- a/src/core/client_config/resolvers/dns_resolver.c +++ b/src/core/client_config/resolvers/dns_resolver.c @@ -41,6 +41,7 @@ #include "src/core/client_config/lb_policy_registry.h" #include "src/core/iomgr/resolve_address.h" +#include "src/core/iomgr/timer.h" #include "src/core/support/string.h" typedef struct { @@ -71,6 +72,9 @@ typedef struct { grpc_client_config **target_config; /** current (fully resolved) config */ grpc_client_config *resolved_config; + /** retry timer */ + bool have_retry_timer; + grpc_timer retry_timer; } dns_resolver; static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r); @@ -91,6 +95,9 @@ static const grpc_resolver_vtable dns_resolver_vtable = { static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { dns_resolver *r = (dns_resolver *)resolver; gpr_mu_lock(&r->mu); + if (r->have_retry_timer) { + grpc_timer_cancel(exec_ctx, &r->retry_timer); + } if (r->next_completion != NULL) { *r->target_config = NULL; grpc_exec_ctx_enqueue(exec_ctx, r->next_completion, true, NULL); @@ -125,6 +132,22 @@ static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, gpr_mu_unlock(&r->mu); } +static void dns_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg, + bool success) { + dns_resolver *r = arg; + + gpr_mu_lock(&r->mu); + r->have_retry_timer = false; + if (success) { + if (!r->resolving) { + dns_start_resolving_locked(r); + } + } + gpr_mu_unlock(&r->mu); + + GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "retry-timer"); +} + static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_resolved_addresses *addresses) { dns_resolver *r = arg; @@ -133,29 +156,47 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_subchannel_args args; grpc_lb_policy *lb_policy; size_t i; - if (addresses) { + gpr_mu_lock(&r->mu); + GPR_ASSERT(r->resolving); + r->resolving = 0; + if (addresses != NULL) { grpc_lb_policy_args lb_policy_args; config = grpc_client_config_create(); subchannels = gpr_malloc(sizeof(grpc_subchannel *) * addresses->naddrs); + size_t naddrs = 0; for (i = 0; i < addresses->naddrs; i++) { memset(&args, 0, sizeof(args)); args.addr = (struct sockaddr *)(addresses->addrs[i].addr); args.addr_len = (size_t)addresses->addrs[i].len; - subchannels[i] = grpc_subchannel_factory_create_subchannel( + grpc_subchannel *subchannel = grpc_subchannel_factory_create_subchannel( exec_ctx, r->subchannel_factory, &args); + if (subchannel != NULL) { + subchannels[naddrs++] = subchannel; + } } memset(&lb_policy_args, 0, sizeof(lb_policy_args)); lb_policy_args.subchannels = subchannels; - lb_policy_args.num_subchannels = addresses->naddrs; + lb_policy_args.num_subchannels = naddrs; lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args); - grpc_client_config_set_lb_policy(config, lb_policy); - GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction"); + if (lb_policy != NULL) { + grpc_client_config_set_lb_policy(config, lb_policy); + GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "construction"); + } grpc_resolved_addresses_destroy(addresses); gpr_free(subchannels); + } else { + int retry_seconds = 15; + gpr_log(GPR_DEBUG, "dns resolution failed: retrying in %d seconds", + retry_seconds); + GPR_ASSERT(!r->have_retry_timer); + r->have_retry_timer = true; + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + GRPC_RESOLVER_REF(&r->base, "retry-timer"); + grpc_timer_init( + exec_ctx, &r->retry_timer, + gpr_time_add(now, gpr_time_from_seconds(retry_seconds, GPR_TIMESPAN)), + dns_on_retry_timer, r, now); } - gpr_mu_lock(&r->mu); - GPR_ASSERT(r->resolving); - r->resolving = 0; if (r->resolved_config) { grpc_client_config_unref(exec_ctx, r->resolved_config); } diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index a5e8ebdfb0..020b4e6fd4 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -39,7 +39,6 @@ #include <grpc/support/avl.h> #include "src/core/channel/channel_args.h" -#include "src/core/surface/channel_init.h" #include "src/core/channel/client_channel.h" #include "src/core/channel/connected_channel.h" #include "src/core/client_config/initial_connect_string.h" @@ -47,6 +46,7 @@ #include "src/core/iomgr/timer.h" #include "src/core/profiling/timers.h" #include "src/core/surface/channel.h" +#include "src/core/surface/channel_init.h" #include "src/core/transport/connectivity_state.h" #define INTERNAL_REF_BITS 16 @@ -185,8 +185,8 @@ static void connection_destroy(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(c); } -void grpc_connected_subchannel_ref(grpc_connected_subchannel *c - GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +void grpc_connected_subchannel_ref( + grpc_connected_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { GRPC_CHANNEL_STACK_REF(CHANNEL_STACK_FROM_CONNECTION(c), REF_REASON); } @@ -227,8 +227,8 @@ static gpr_atm ref_mutate(grpc_subchannel *c, gpr_atm delta, return old_val; } -grpc_subchannel *grpc_subchannel_ref(grpc_subchannel *c - GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +grpc_subchannel *grpc_subchannel_ref( + grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; old_refs = ref_mutate(c, (1 << INTERNAL_REF_BITS), 0 REF_MUTATE_PURPOSE("STRONG_REF")); @@ -236,8 +236,8 @@ grpc_subchannel *grpc_subchannel_ref(grpc_subchannel *c return c; } -grpc_subchannel *grpc_subchannel_weak_ref(grpc_subchannel *c - GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +grpc_subchannel *grpc_subchannel_weak_ref( + grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_atm old_refs; old_refs = ref_mutate(c, 1, 0 REF_MUTATE_PURPOSE("WEAK_REF")); GPR_ASSERT(old_refs != 0); @@ -506,7 +506,8 @@ void grpc_connected_subchannel_ping(grpc_exec_ctx *exec_ctx, elem->filter->start_transport_op(exec_ctx, elem, &op); } -static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { +static void publish_transport_locked(grpc_exec_ctx *exec_ctx, + grpc_subchannel *c) { grpc_connected_subchannel *con; grpc_channel_stack *stk; state_watcher *sw_subchannel; @@ -525,10 +526,7 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { grpc_closure_init(&sw_subchannel->closure, subchannel_on_child_state_changed, sw_subchannel); - gpr_mu_lock(&c->mu); - if (c->disconnected) { - gpr_mu_unlock(&c->mu); gpr_free(sw_subchannel); grpc_channel_stack_destroy(exec_ctx, stk); gpr_free(con); @@ -557,8 +555,6 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { /* signal completion */ grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_READY, "connected"); - - gpr_mu_unlock(&c->mu); } /* Generate a random number between 0 and 1. */ @@ -626,21 +622,23 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, bool iomgr_success) { grpc_subchannel *c = arg; + GRPC_SUBCHANNEL_WEAK_REF(c, "connected"); + gpr_mu_lock(&c->mu); if (c->connecting_result.transport != NULL) { - publish_transport(exec_ctx, c); + publish_transport_locked(exec_ctx, c); } else if (c->disconnected) { GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); } else { gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); - gpr_mu_lock(&c->mu); GPR_ASSERT(!c->have_alarm); c->have_alarm = 1; grpc_connectivity_state_set(exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, "connect_failed"); grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now); - gpr_mu_unlock(&c->mu); } + gpr_mu_unlock(&c->mu); + GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); } static gpr_timespec compute_connect_deadline(grpc_subchannel *c) { @@ -668,8 +666,8 @@ static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call, GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0); } -void grpc_subchannel_call_ref(grpc_subchannel_call *c - GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +void grpc_subchannel_call_ref( + grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON); } diff --git a/src/core/iomgr/iocp_windows.c b/src/core/iomgr/iocp_windows.c index 807729708e..fa87e5246b 100644 --- a/src/core/iomgr/iocp_windows.c +++ b/src/core/iomgr/iocp_windows.c @@ -71,7 +71,8 @@ static DWORD deadline_to_millis_timeout(gpr_timespec deadline, timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN))); } -void grpc_iocp_work(grpc_exec_ctx *exec_ctx, gpr_timespec deadline) { +grpc_iocp_work_status grpc_iocp_work(grpc_exec_ctx *exec_ctx, + gpr_timespec deadline) { BOOL success; DWORD bytes = 0; DWORD flags = 0; @@ -84,14 +85,14 @@ void grpc_iocp_work(grpc_exec_ctx *exec_ctx, gpr_timespec deadline) { g_iocp, &bytes, &completion_key, &overlapped, deadline_to_millis_timeout(deadline, gpr_now(deadline.clock_type))); if (success == 0 && overlapped == NULL) { - return; + return GRPC_IOCP_WORK_TIMEOUT; } GPR_ASSERT(completion_key && overlapped); if (overlapped == &g_iocp_custom_overlap) { gpr_atm_full_fetch_add(&g_custom_events, -1); if (completion_key == (ULONG_PTR)&g_iocp_kick_token) { /* We were awoken from a kick. */ - return; + return GRPC_IOCP_WORK_KICK; } gpr_log(GPR_ERROR, "Unknown custom completion key."); abort(); @@ -121,6 +122,7 @@ void grpc_iocp_work(grpc_exec_ctx *exec_ctx, gpr_timespec deadline) { } gpr_mu_unlock(&socket->state_mu); grpc_exec_ctx_enqueue(exec_ctx, closure, true, NULL); + return GRPC_IOCP_WORK_WORK; } void grpc_iocp_init(void) { @@ -140,10 +142,12 @@ void grpc_iocp_kick(void) { void grpc_iocp_flush(void) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_iocp_work_status work_status; do { - grpc_iocp_work(&exec_ctx, gpr_inf_past(GPR_CLOCK_MONOTONIC)); - } while (grpc_exec_ctx_flush(&exec_ctx)); + work_status = grpc_iocp_work(&exec_ctx, gpr_inf_past(GPR_CLOCK_MONOTONIC)); + } while (work_status == GRPC_IOCP_WORK_KICK || + grpc_exec_ctx_flush(&exec_ctx)); } void grpc_iocp_shutdown(void) { diff --git a/src/core/iomgr/iocp_windows.h b/src/core/iomgr/iocp_windows.h index 75f3ba8477..8b2b1aeb5c 100644 --- a/src/core/iomgr/iocp_windows.h +++ b/src/core/iomgr/iocp_windows.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -38,7 +38,14 @@ #include "src/core/iomgr/socket_windows.h" -void grpc_iocp_work(grpc_exec_ctx *exec_ctx, gpr_timespec deadline); +typedef enum { + GRPC_IOCP_WORK_WORK, + GRPC_IOCP_WORK_TIMEOUT, + GRPC_IOCP_WORK_KICK +} grpc_iocp_work_status; + +grpc_iocp_work_status grpc_iocp_work(grpc_exec_ctx *exec_ctx, + gpr_timespec deadline); void grpc_iocp_init(void); void grpc_iocp_kick(void); void grpc_iocp_flush(void); diff --git a/src/core/iomgr/resolve_address.h b/src/core/iomgr/resolve_address.h index 01eedffa88..b059630457 100644 --- a/src/core/iomgr/resolve_address.h +++ b/src/core/iomgr/resolve_address.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -66,7 +66,7 @@ void grpc_resolved_addresses_destroy(grpc_resolved_addresses *addresses); /* Resolve addr in a blocking fashion. Returns NULL on failure. On success, result must be freed with grpc_resolved_addresses_destroy. */ -grpc_resolved_addresses *grpc_blocking_resolve_address( - const char *addr, const char *default_port); +extern grpc_resolved_addresses *(*grpc_blocking_resolve_address)( + const char *name, const char *default_port); #endif /* GRPC_INTERNAL_CORE_IOMGR_RESOLVE_ADDRESS_H */ diff --git a/src/core/iomgr/resolve_address_posix.c b/src/core/iomgr/resolve_address_posix.c index c51745b918..a6c9893f23 100644 --- a/src/core/iomgr/resolve_address_posix.c +++ b/src/core/iomgr/resolve_address_posix.c @@ -34,18 +34,13 @@ #include <grpc/support/port_platform.h> #ifdef GPR_POSIX_SOCKET -#include "src/core/iomgr/sockaddr.h" #include "src/core/iomgr/resolve_address.h" +#include "src/core/iomgr/sockaddr.h" +#include <string.h> #include <sys/types.h> #include <sys/un.h> -#include <string.h> -#include "src/core/iomgr/executor.h" -#include "src/core/iomgr/iomgr_internal.h" -#include "src/core/iomgr/sockaddr_utils.h" -#include "src/core/support/block_annotate.h" -#include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> @@ -53,6 +48,11 @@ #include <grpc/support/thd.h> #include <grpc/support/time.h> #include <grpc/support/useful.h> +#include "src/core/iomgr/executor.h" +#include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/sockaddr_utils.h" +#include "src/core/support/block_annotate.h" +#include "src/core/support/string.h" typedef struct { char *name; @@ -62,7 +62,7 @@ typedef struct { void *arg; } request; -grpc_resolved_addresses *grpc_blocking_resolve_address( +static grpc_resolved_addresses *blocking_resolve_address_impl( const char *name, const char *default_port) { struct addrinfo hints; struct addrinfo *result = NULL, *resp; @@ -150,6 +150,9 @@ done: return addrs; } +grpc_resolved_addresses *(*grpc_blocking_resolve_address)( + const char *name, const char *default_port) = blocking_resolve_address_impl; + /* Callback to be passed to grpc_executor to asynch-ify * grpc_blocking_resolve_address */ static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, bool success) { diff --git a/src/core/iomgr/resolve_address_windows.c b/src/core/iomgr/resolve_address_windows.c index 28c8661e73..472e797163 100644 --- a/src/core/iomgr/resolve_address_windows.c +++ b/src/core/iomgr/resolve_address_windows.c @@ -34,17 +34,12 @@ #include <grpc/support/port_platform.h> #ifdef GPR_WINSOCK_SOCKET -#include "src/core/iomgr/sockaddr.h" #include "src/core/iomgr/resolve_address.h" +#include "src/core/iomgr/sockaddr.h" -#include <sys/types.h> #include <string.h> +#include <sys/types.h> -#include "src/core/iomgr/executor.h" -#include "src/core/iomgr/iomgr_internal.h" -#include "src/core/iomgr/sockaddr_utils.h" -#include "src/core/support/block_annotate.h" -#include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> #include <grpc/support/log.h> @@ -52,6 +47,11 @@ #include <grpc/support/string_util.h> #include <grpc/support/thd.h> #include <grpc/support/time.h> +#include "src/core/iomgr/executor.h" +#include "src/core/iomgr/iomgr_internal.h" +#include "src/core/iomgr/sockaddr_utils.h" +#include "src/core/support/block_annotate.h" +#include "src/core/support/string.h" typedef struct { char *name; @@ -61,7 +61,7 @@ typedef struct { void *arg; } request; -grpc_resolved_addresses *grpc_blocking_resolve_address( +static grpc_resolved_addresses *blocking_resolve_address_impl( const char *name, const char *default_port) { struct addrinfo hints; struct addrinfo *result = NULL, *resp; @@ -133,6 +133,9 @@ done: return addrs; } +grpc_resolved_addresses *(*grpc_blocking_resolve_address)( + const char *name, const char *default_port) = blocking_resolve_address_impl; + /* Callback to be passed to grpc_executor to asynch-ify * grpc_blocking_resolve_address */ static void do_request_thread(grpc_exec_ctx *exec_ctx, void *rp, bool success) { diff --git a/src/core/iomgr/tcp_server_windows.c b/src/core/iomgr/tcp_server_windows.c index ce930b8f41..a4abc5b974 100644 --- a/src/core/iomgr/tcp_server_windows.c +++ b/src/core/iomgr/tcp_server_windows.c @@ -240,8 +240,7 @@ static void decrement_active_ports_and_notify(grpc_exec_ctx *exec_ctx, sp->shutting_down = 0; gpr_mu_lock(&sp->server->mu); GPR_ASSERT(sp->server->active_ports > 0); - if (0 == --sp->server->active_ports && - sp->server->shutdown_complete != NULL) { + if (0 == --sp->server->active_ports) { notify = 1; } gpr_mu_unlock(&sp->server->mu); diff --git a/src/core/iomgr/timer.c b/src/core/iomgr/timer.c index 8379fffad0..f444643428 100644 --- a/src/core/iomgr/timer.c +++ b/src/core/iomgr/timer.c @@ -33,11 +33,11 @@ #include "src/core/iomgr/timer.h" -#include "src/core/iomgr/timer_heap.h" -#include "src/core/iomgr/time_averaged_stats.h" #include <grpc/support/log.h> #include <grpc/support/sync.h> #include <grpc/support/useful.h> +#include "src/core/iomgr/time_averaged_stats.h" +#include "src/core/iomgr/timer_heap.h" #define INVALID_HEAP_INDEX 0xffffffffu @@ -330,6 +330,18 @@ static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now, gpr_mu_unlock(&g_mu); gpr_mu_unlock(&g_checker_mu); + } else if (next != NULL) { + /* TODO(ctiller): this forces calling code to do an short poll, and + then retry the timer check (because this time through the timer list was + contended). + + We could reduce the cost here dramatically by keeping a count of how many + currently active pollers got through the uncontended case above + successfully, and waking up other pollers IFF that count drops to zero. + + Once that count is in place, this entire else branch could disappear. */ + *next = gpr_time_min( + *next, gpr_time_add(now, gpr_time_from_millis(1, GPR_TIMESPAN))); } return (int)n; diff --git a/src/core/iomgr/timer.h b/src/core/iomgr/timer.h index 9ad1e92f42..e239e884e7 100644 --- a/src/core/iomgr/timer.h +++ b/src/core/iomgr/timer.h @@ -96,7 +96,6 @@ void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer); *next is never guaranteed to be updated on any given execution; however, with high probability at least one thread in the system will see an update at any time slice. */ - bool grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now, gpr_timespec *next); void grpc_timer_list_init(gpr_timespec now); diff --git a/src/core/iomgr/timer_heap.c b/src/core/iomgr/timer_heap.c index 9d8be5c1fc..b5df566c45 100644 --- a/src/core/iomgr/timer_heap.c +++ b/src/core/iomgr/timer_heap.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -46,7 +46,7 @@ static void adjust_upwards(grpc_timer **first, uint32_t i, grpc_timer *t) { while (i > 0) { uint32_t parent = (uint32_t)(((int)i - 1) / 2); - if (gpr_time_cmp(first[parent]->deadline, t->deadline) >= 0) break; + if (gpr_time_cmp(first[parent]->deadline, t->deadline) <= 0) break; first[i] = first[parent]; first[i]->heap_index = i; i = parent; @@ -62,16 +62,14 @@ static void adjust_downwards(grpc_timer **first, uint32_t i, uint32_t length, grpc_timer *t) { for (;;) { uint32_t left_child = 1u + 2u * i; - uint32_t right_child; - uint32_t next_i; if (left_child >= length) break; - right_child = left_child + 1; - next_i = right_child < length && - gpr_time_cmp(first[left_child]->deadline, - first[right_child]->deadline) < 0 - ? right_child - : left_child; - if (gpr_time_cmp(t->deadline, first[next_i]->deadline) >= 0) break; + uint32_t right_child = left_child + 1; + uint32_t next_i = right_child < length && + gpr_time_cmp(first[left_child]->deadline, + first[right_child]->deadline) > 0 + ? right_child + : left_child; + if (gpr_time_cmp(t->deadline, first[next_i]->deadline) <= 0) break; first[i] = first[next_i]; first[i]->heap_index = i; i = next_i; @@ -95,7 +93,7 @@ static void maybe_shrink(grpc_timer_heap *heap) { static void note_changed_priority(grpc_timer_heap *heap, grpc_timer *timer) { uint32_t i = timer->heap_index; uint32_t parent = (uint32_t)(((int)i - 1) / 2); - if (gpr_time_cmp(heap->timers[parent]->deadline, timer->deadline) < 0) { + if (gpr_time_cmp(heap->timers[parent]->deadline, timer->deadline) > 0) { adjust_upwards(heap->timers, i, timer); } else { adjust_downwards(heap->timers, i, heap->timer_count, timer); diff --git a/src/core/support/alloc.c b/src/core/support/alloc.c index 0a064b2c18..b99584bd20 100644 --- a/src/core/support/alloc.c +++ b/src/core/support/alloc.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -87,4 +87,4 @@ void *gpr_malloc_aligned(size_t size, size_t alignment_log) { return (void *)ret; } -void gpr_free_aligned(void *ptr) { free(((void **)ptr)[-1]); } +void gpr_free_aligned(void *ptr) { gpr_free(((void **)ptr)[-1]); } diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index f6a95ebbd3..b22818ea87 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -86,7 +86,7 @@ struct grpc_completion_queue { #define POLLSET_FROM_CQ(cq) ((grpc_pollset *)(cq + 1)) static gpr_mu g_freelist_mu; -grpc_completion_queue *g_freelist; +static grpc_completion_queue *g_freelist; static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc, bool success); diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 620c57f3bc..b16768d06e 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -851,9 +851,11 @@ static void perform_stream_op_locked( if (stream_global->write_closed) { grpc_chttp2_complete_closure_step( exec_ctx, &stream_global->send_message_finished, 0); - } else if (stream_global->id != 0) { + } else { stream_global->send_message = op->send_message; - grpc_chttp2_become_writable(transport_global, stream_global); + if (stream_global->id != 0) { + grpc_chttp2_become_writable(transport_global, stream_global); + } } } diff --git a/src/core/tsi/ssl_transport_security.c b/src/core/tsi/ssl_transport_security.c index 6adcaac9ed..fcbd910f07 100644 --- a/src/core/tsi/ssl_transport_security.c +++ b/src/core/tsi/ssl_transport_security.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -33,9 +33,18 @@ #include "src/core/tsi/ssl_transport_security.h" +#include <grpc/support/port_platform.h> + #include <limits.h> #include <string.h> +/* TODO(jboeuf): refactor inet_ntop into a portability header. */ +#ifdef GPR_WINSOCK_SOCKET +#include <ws2tcpip.h> +#else +#include <arpa/inet.h> +#endif + #include <grpc/support/log.h> #include <grpc/support/sync.h> #include <grpc/support/thd.h> @@ -197,13 +206,16 @@ static void ssl_info_callback(const SSL *ssl, int where, int ret) { } /* Returns 1 if name looks like an IP address, 0 otherwise. - This is a very rough heuristic as it does not handle IPV6 or things like: - 0300.0250.00.01, 0xC0.0Xa8.0x0.0x1, 000030052000001, 0xc0.052000001 */ + This is a very rough heuristic, and only handles IPv6 in hexadecimal form. */ static int looks_like_ip_address(const char *name) { size_t i; size_t dot_count = 0; size_t num_size = 0; for (i = 0; i < strlen(name); i++) { + if (name[i] == ':') { + /* IPv6 Address in hexadecimal form, : is not allowed in DNS names. */ + return 1; + } if (name[i] >= '0' && name[i] <= '9') { if (num_size > 3) return 0; num_size++; @@ -296,21 +308,44 @@ static tsi_result add_subject_alt_names_properties_to_peer( sk_GENERAL_NAME_value(subject_alt_names, TSI_SIZE_AS_SIZE(i)); /* Filter out the non-dns entries names. */ if (subject_alt_name->type == GEN_DNS) { - unsigned char *dns_name = NULL; - int dns_name_size = - ASN1_STRING_to_UTF8(&dns_name, subject_alt_name->d.dNSName); - if (dns_name_size < 0) { + unsigned char *name = NULL; + int name_size; + name_size = ASN1_STRING_to_UTF8(&name, subject_alt_name->d.dNSName); + if (name_size < 0) { gpr_log(GPR_ERROR, "Could not get utf8 from asn1 string."); result = TSI_INTERNAL_ERROR; break; } result = tsi_construct_string_peer_property( - TSI_X509_SUBJECT_ALTERNATIVE_NAME_PEER_PROPERTY, - (const char *)dns_name, (size_t)dns_name_size, + TSI_X509_SUBJECT_ALTERNATIVE_NAME_PEER_PROPERTY, (const char *)name, + (size_t)name_size, &peer->properties[peer->property_count++]); + OPENSSL_free(name); + } else if (subject_alt_name->type == GEN_IPADD) { + char ntop_buf[INET6_ADDRSTRLEN]; + int af; + + if (subject_alt_name->d.iPAddress->length == 4) { + af = AF_INET; + } else if (subject_alt_name->d.iPAddress->length == 16) { + af = AF_INET6; + } else { + gpr_log(GPR_ERROR, "SAN IP Address contained invalid IP"); + result = TSI_INTERNAL_ERROR; + break; + } + const char *name = inet_ntop(af, subject_alt_name->d.iPAddress->data, + ntop_buf, INET6_ADDRSTRLEN); + if (name == NULL) { + gpr_log(GPR_ERROR, "Could not get IP string from asn1 octet."); + result = TSI_INTERNAL_ERROR; + break; + } + + result = tsi_construct_string_peer_property_from_cstring( + TSI_X509_SUBJECT_ALTERNATIVE_NAME_PEER_PROPERTY, name, &peer->properties[peer->property_count++]); - OPENSSL_free(dns_name); - if (result != TSI_OK) break; } + if (result != TSI_OK) break; } return result; } @@ -1436,9 +1471,7 @@ int tsi_ssl_peer_matches_name(const tsi_peer *peer, const char *name) { size_t i = 0; size_t san_count = 0; const tsi_peer_property *cn_property = NULL; - - /* For now reject what looks like an IP address. */ - if (looks_like_ip_address(name)) return 0; + int like_ip = looks_like_ip_address(name); /* Check the SAN first. */ for (i = 0; i < peer->property_count; i++) { @@ -1447,8 +1480,15 @@ int tsi_ssl_peer_matches_name(const tsi_peer *peer, const char *name) { if (strcmp(property->name, TSI_X509_SUBJECT_ALTERNATIVE_NAME_PEER_PROPERTY) == 0) { san_count++; - if (does_entry_match_name(property->value.data, property->value.length, - name)) { + + if (!like_ip && does_entry_match_name(property->value.data, + property->value.length, name)) { + return 1; + } else if (like_ip && + strncmp(name, property->value.data, property->value.length) == + 0 && + strlen(name) == property->value.length) { + /* IP Addresses are exact matches only. */ return 1; } } else if (strcmp(property->name, @@ -1457,8 +1497,8 @@ int tsi_ssl_peer_matches_name(const tsi_peer *peer, const char *name) { } } - /* If there's no SAN, try the CN. */ - if (san_count == 0 && cn_property != NULL) { + /* If there's no SAN, try the CN, but only if its not like an IP Address */ + if (san_count == 0 && cn_property != NULL && !like_ip) { if (does_entry_match_name(cn_property->value.data, cn_property->value.length, name)) { return 1; diff --git a/src/core/tsi/ssl_transport_security.h b/src/core/tsi/ssl_transport_security.h index 51c0003a85..4909af4c47 100644 --- a/src/core/tsi/ssl_transport_security.h +++ b/src/core/tsi/ssl_transport_security.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -162,8 +162,7 @@ void tsi_ssl_handshaker_factory_destroy(tsi_ssl_handshaker_factory *self); Still TODO(jboeuf): - handle mixed case. - handle %encoded chars. - - handle public suffix wildchar more strictly (e.g. *.co.uk) - - handle IP addresses in SAN. */ + - handle public suffix wildchar more strictly (e.g. *.co.uk) */ int tsi_ssl_peer_matches_name(const tsi_peer *peer, const char *name); #ifdef __cplusplus |