diff options
Diffstat (limited to 'src/core/ext/client_channel/subchannel.c')
-rw-r--r-- | src/core/ext/client_channel/subchannel.c | 121 |
1 files changed, 87 insertions, 34 deletions
diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index b83c6882dc..8bd284507d 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -38,14 +38,19 @@ #include <grpc/support/alloc.h> #include <grpc/support/avl.h> +#include <grpc/support/string_util.h> #include "src/core/ext/client_channel/client_channel.h" #include "src/core/ext/client_channel/initial_connect_string.h" +#include "src/core/ext/client_channel/parse_address.h" #include "src/core/ext/client_channel/subchannel_index.h" +#include "src/core/ext/client_channel/uri_parser.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" +#include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/profiling/timers.h" +#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/support/backoff.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/channel_init.h" @@ -94,8 +99,6 @@ struct grpc_subchannel { size_t num_filters; /** channel arguments */ grpc_channel_args *args; - /** address to connect to */ - grpc_resolved_address *addr; grpc_subchannel_key *key; @@ -108,6 +111,9 @@ struct grpc_subchannel { /** callback for connection finishing */ grpc_closure connected; + /** callback for our alarm */ + grpc_closure on_alarm; + /** pollset_set tracking who's interested in a connection being setup */ grpc_pollset_set *pollset_set; @@ -206,9 +212,8 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_subchannel *c = arg; gpr_free((void *)c->filters); - grpc_channel_args_destroy(c->args); - gpr_free(c->addr); - grpc_slice_unref(c->initial_connect_string); + grpc_channel_args_destroy(exec_ctx, c->args); + grpc_slice_unref_internal(exec_ctx, c->initial_connect_string); grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker); grpc_connector_unref(exec_ctx, c->connector); grpc_pollset_set_destroy(c->pollset_set); @@ -293,8 +298,9 @@ void grpc_subchannel_weak_unref(grpc_exec_ctx *exec_ctx, gpr_atm old_refs; old_refs = ref_mutate(c, -(gpr_atm)1, 1 REF_MUTATE_PURPOSE("WEAK_UNREF")); if (old_refs == 1) { - grpc_exec_ctx_sched(exec_ctx, grpc_closure_create(subchannel_destroy, c), - GRPC_ERROR_NONE, NULL); + grpc_closure_sched(exec_ctx, grpc_closure_create(subchannel_destroy, c, + grpc_schedule_on_exec_ctx), + GRPC_ERROR_NONE); } } @@ -322,30 +328,38 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, } else { c->filters = NULL; } - c->addr = gpr_malloc(sizeof(grpc_resolved_address)); - if (args->addr->len) - memcpy(c->addr, args->addr, sizeof(grpc_resolved_address)); c->pollset_set = grpc_pollset_set_create(); - grpc_set_initial_connect_string(&c->addr, &c->initial_connect_string); - c->args = grpc_channel_args_copy(args->args); + grpc_resolved_address *addr = gpr_malloc(sizeof(*addr)); + grpc_get_subchannel_address_arg(args->args, addr); + grpc_set_initial_connect_string(&addr, &c->initial_connect_string); + static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS}; + grpc_arg new_arg = grpc_create_subchannel_address_arg(addr); + gpr_free(addr); + c->args = grpc_channel_args_copy_and_add_and_remove( + args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &new_arg, 1); + gpr_free(new_arg.value.string); + c->root_external_state_watcher.next = c->root_external_state_watcher.prev = &c->root_external_state_watcher; - grpc_closure_init(&c->connected, subchannel_connected, c); + grpc_closure_init(&c->connected, subchannel_connected, c, + grpc_schedule_on_exec_ctx); grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); int initial_backoff_ms = GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS * 1000; int max_backoff_ms = GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000; + int min_backoff_ms = GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS * 1000; bool fixed_reconnect_backoff = false; if (c->args) { for (size_t i = 0; i < c->args->num_args; i++) { if (0 == strcmp(c->args->args[i].key, - "grpc.testing.fixed_reconnect_backoff")) { + "grpc.testing.fixed_reconnect_backoff_ms")) { GPR_ASSERT(c->args->args[i].type == GRPC_ARG_INTEGER); fixed_reconnect_backoff = true; - initial_backoff_ms = max_backoff_ms = grpc_channel_arg_get_integer( - &c->args->args[i], - (grpc_integer_options){initial_backoff_ms, 100, INT_MAX}); + initial_backoff_ms = min_backoff_ms = max_backoff_ms = + grpc_channel_arg_get_integer( + &c->args->args[i], + (grpc_integer_options){initial_backoff_ms, 100, INT_MAX}); } else if (0 == strcmp(c->args->args[i].key, GRPC_ARG_MAX_RECONNECT_BACKOFF_MS)) { fixed_reconnect_backoff = false; @@ -362,11 +376,11 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, } } gpr_backoff_init( - &c->backoff_state, + &c->backoff_state, initial_backoff_ms, fixed_reconnect_backoff ? 1.0 : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER, fixed_reconnect_backoff ? 0.0 : GRPC_SUBCHANNEL_RECONNECT_JITTER, - initial_backoff_ms, max_backoff_ms); + min_backoff_ms, max_backoff_ms); gpr_mu_init(&c->mu); return grpc_subchannel_index_register(exec_ctx, key, c); @@ -377,7 +391,6 @@ static void continue_connect_locked(grpc_exec_ctx *exec_ctx, grpc_connect_in_args args; args.interested_parties = c->pollset_set; - args.addr = c->addr; args.deadline = c->next_attempt; args.channel_args = c->args; args.initial_connect_string = c->initial_connect_string; @@ -478,7 +491,8 @@ static void maybe_start_connecting_locked(grpc_exec_ctx *exec_ctx, gpr_log(GPR_INFO, "Retry in %" PRId64 ".%09d seconds", time_til_next.tv_sec, time_til_next.tv_nsec); } - grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now); + grpc_closure_init(&c->on_alarm, on_alarm, c, grpc_schedule_on_exec_ctx); + grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, &c->on_alarm, now); } } @@ -503,7 +517,8 @@ void grpc_subchannel_notify_on_state_change( w->subchannel = c; w->pollset_set = interested_parties; w->notify = notify; - grpc_closure_init(&w->closure, on_external_state_watcher_done, w); + grpc_closure_init(&w->closure, on_external_state_watcher_done, w, + grpc_schedule_on_exec_ctx); if (interested_parties != NULL) { grpc_pollset_set_add_pollset_set(exec_ctx, c->pollset_set, interested_parties); @@ -598,18 +613,24 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx, /* construct channel stack */ grpc_channel_stack_builder *builder = grpc_channel_stack_builder_create(); grpc_channel_stack_builder_set_channel_arguments( - builder, c->connecting_result.channel_args); + exec_ctx, builder, c->connecting_result.channel_args); grpc_channel_stack_builder_set_transport(builder, c->connecting_result.transport); - if (grpc_channel_init_create_stack(exec_ctx, builder, - GRPC_CLIENT_SUBCHANNEL)) { - con = grpc_channel_stack_builder_finish(exec_ctx, builder, 0, 1, - connection_destroy, NULL); - } else { - grpc_channel_stack_builder_destroy(builder); + if (!grpc_channel_init_create_stack(exec_ctx, builder, + GRPC_CLIENT_SUBCHANNEL)) { + grpc_channel_stack_builder_destroy(exec_ctx, builder); abort(); /* TODO(ctiller): what to do here (previously we just crashed) */ } + grpc_error *error = grpc_channel_stack_builder_finish( + exec_ctx, builder, 0, 1, connection_destroy, NULL, (void **)&con); + if (error != GRPC_ERROR_NONE) { + const char *msg = grpc_error_string(error); + gpr_log(GPR_ERROR, "error initializing subchannel stack: %s", msg); + grpc_error_free_string(msg); + GRPC_ERROR_UNREF(error); + abort(); /* TODO(ctiller): what to do here? */ + } stk = CHANNEL_STACK_FROM_CONNECTION(con); memset(&c->connecting_result, 0, sizeof(c->connecting_result)); @@ -618,7 +639,7 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx, sw_subchannel->subchannel = c; sw_subchannel->connectivity_state = GRPC_CHANNEL_READY; grpc_closure_init(&sw_subchannel->closure, subchannel_on_child_state_changed, - sw_subchannel); + sw_subchannel, grpc_schedule_on_exec_ctx); if (c->disconnected) { gpr_free(sw_subchannel); @@ -637,9 +658,7 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx, GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con)); /* setup subchannel watching connected subchannel for changes; subchannel - ref - for connecting is donated - to the state watcher */ + ref for connecting is donated to the state watcher */ GRPC_SUBCHANNEL_WEAK_REF(c, "state_watcher"); GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); grpc_connected_subchannel_notify_on_state_change( @@ -680,7 +699,7 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, } gpr_mu_unlock(&c->mu); GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connected"); - grpc_channel_args_destroy(delete_channel_args); + grpc_channel_args_destroy(exec_ctx, delete_channel_args); } /* @@ -757,3 +776,37 @@ grpc_call_stack *grpc_subchannel_call_get_call_stack( grpc_subchannel_call *subchannel_call) { return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call); } + +static void grpc_uri_to_sockaddr(char *uri_str, grpc_resolved_address *addr) { + grpc_uri *uri = grpc_uri_parse(uri_str, 0 /* suppress_errors */); + GPR_ASSERT(uri != NULL); + if (strcmp(uri->scheme, "ipv4") == 0) { + GPR_ASSERT(parse_ipv4(uri, addr)); + } else if (strcmp(uri->scheme, "ipv6") == 0) { + GPR_ASSERT(parse_ipv6(uri, addr)); + } else { + GPR_ASSERT(parse_unix(uri, addr)); + } + grpc_uri_destroy(uri); +} + +void grpc_get_subchannel_address_arg(const grpc_channel_args *args, + grpc_resolved_address *addr) { + const grpc_arg *addr_arg = + grpc_channel_args_find(args, GRPC_ARG_SUBCHANNEL_ADDRESS); + GPR_ASSERT(addr_arg != NULL); // Should have been set by LB policy. + GPR_ASSERT(addr_arg->type == GRPC_ARG_STRING); + memset(addr, 0, sizeof(*addr)); + if (*addr_arg->value.string != '\0') { + grpc_uri_to_sockaddr(addr_arg->value.string, addr); + } +} + +grpc_arg grpc_create_subchannel_address_arg(const grpc_resolved_address *addr) { + grpc_arg new_arg; + new_arg.key = GRPC_ARG_SUBCHANNEL_ADDRESS; + new_arg.type = GRPC_ARG_STRING; + new_arg.value.string = + addr->len > 0 ? grpc_sockaddr_to_uri(addr) : gpr_strdup(""); + return new_arg; +} |