diff options
Diffstat (limited to 'src/core/surface/channel_create.c')
-rw-r--r-- | src/core/surface/channel_create.c | 235 |
1 files changed, 102 insertions, 133 deletions
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index d069a04a9a..e205f0a9f8 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -31,159 +31,120 @@ * */ -#include "src/core/iomgr/sockaddr.h" - #include <grpc/grpc.h> #include <stdlib.h> #include <string.h> -#include "src/core/channel/census_filter.h" +#include <grpc/support/alloc.h> + #include "src/core/channel/channel_args.h" #include "src/core/channel/client_channel.h" -#include "src/core/channel/client_setup.h" -#include "src/core/channel/connected_channel.h" #include "src/core/channel/http_client_filter.h" -#include "src/core/iomgr/endpoint.h" -#include "src/core/iomgr/resolve_address.h" +#include "src/core/client_config/resolver_registry.h" #include "src/core/iomgr/tcp_client.h" #include "src/core/surface/channel.h" -#include "src/core/surface/client.h" -#include "src/core/support/string.h" #include "src/core/transport/chttp2_transport.h" -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/string_util.h> -#include <grpc/support/sync.h> -#include <grpc/support/useful.h> - -typedef struct setup setup; -/* A single setup request (started via initiate) */ typedef struct { - grpc_client_setup_request *cs_request; - setup *setup; - /* Resolved addresses, or null if resolution not yet completed */ - grpc_resolved_addresses *resolved; - /* which address in resolved should we pick for the next connection attempt */ - size_t resolved_index; -} request; - -/* Global setup logic (may be running many simultaneous setup requests, but - with only one 'active' */ -struct setup { - const char *target; - grpc_transport_setup_callback setup_callback; - void *setup_user_data; -}; - -static int maybe_try_next_resolved(request *r); - -static void done(request *r, int was_successful) { - grpc_client_setup_request_finish(r->cs_request, was_successful); - if (r->resolved) { - grpc_resolved_addresses_destroy(r->resolved); - } - gpr_free(r); + grpc_connector base; + gpr_refcount refs; + + grpc_iomgr_closure *notify; + grpc_connect_in_args args; + grpc_connect_out_args *result; +} connector; + +static void connector_ref(grpc_connector *con) { + connector *c = (connector *)con; + gpr_ref(&c->refs); } -/* connection callback: tcp is either valid, or null on error */ -static void on_connect(void *rp, grpc_endpoint *tcp) { - request *r = rp; - - if (!grpc_client_setup_request_should_continue(r->cs_request, "on_connect")) { - if (tcp) { - grpc_endpoint_shutdown(tcp); - grpc_endpoint_destroy(tcp); - } - done(r, 0); - return; +static void connector_unref(grpc_connector *con) { + connector *c = (connector *)con; + if (gpr_unref(&c->refs)) { + gpr_free(c); } +} - if (!tcp) { - if (!maybe_try_next_resolved(r)) { - done(r, 0); - return; - } else { - return; - } - } else if (grpc_client_setup_cb_begin(r->cs_request, "on_connect")) { - grpc_create_chttp2_transport( - r->setup->setup_callback, r->setup->setup_user_data, - grpc_client_setup_get_channel_args(r->cs_request), tcp, NULL, 0, - grpc_client_setup_get_mdctx(r->cs_request), 1); - grpc_client_setup_cb_end(r->cs_request, "on_connect"); - done(r, 1); - return; +static void connected(void *arg, grpc_endpoint *tcp) { + connector *c = arg; + grpc_iomgr_closure *notify; + if (tcp != NULL) { + c->result->transport = grpc_create_chttp2_transport( + c->args.channel_args, tcp, c->args.metadata_context, 1); + grpc_chttp2_transport_start_reading(c->result->transport, NULL, 0); + GPR_ASSERT(c->result->transport); + c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *)); + c->result->filters[0] = &grpc_http_client_filter; + c->result->num_filters = 1; } else { - done(r, 0); + memset(c->result, 0, sizeof(*c->result)); } + notify = c->notify; + c->notify = NULL; + grpc_iomgr_add_callback(notify); } -/* attempt to connect to the next available resolved address */ -static int maybe_try_next_resolved(request *r) { - grpc_resolved_address *addr; - if (!r->resolved) return 0; - if (r->resolved_index == r->resolved->naddrs) return 0; - addr = &r->resolved->addrs[r->resolved_index++]; - grpc_tcp_client_connect( - on_connect, r, grpc_client_setup_get_interested_parties(r->cs_request), - (struct sockaddr *)&addr->addr, addr->len, - grpc_client_setup_request_deadline(r->cs_request)); - return 1; +static void connector_connect(grpc_connector *con, + const grpc_connect_in_args *args, + grpc_connect_out_args *result, + grpc_iomgr_closure *notify) { + connector *c = (connector *)con; + GPR_ASSERT(c->notify == NULL); + GPR_ASSERT(notify->cb); + c->notify = notify; + c->args = *args; + c->result = result; + grpc_tcp_client_connect(connected, c, args->interested_parties, args->addr, + args->addr_len, args->deadline); } -/* callback for when our target address has been resolved */ -static void on_resolved(void *rp, grpc_resolved_addresses *resolved) { - request *r = rp; - - /* if we're not still the active request, abort */ - if (!grpc_client_setup_request_should_continue(r->cs_request, - "on_resolved")) { - if (resolved) { - grpc_resolved_addresses_destroy(resolved); - } - done(r, 0); - return; - } +static const grpc_connector_vtable connector_vtable = { + connector_ref, connector_unref, connector_connect}; - if (!resolved) { - done(r, 0); - return; - } else { - r->resolved = resolved; - r->resolved_index = 0; - if (!maybe_try_next_resolved(r)) { - done(r, 0); - } - } +typedef struct { + grpc_subchannel_factory base; + gpr_refcount refs; + grpc_mdctx *mdctx; + grpc_channel_args *merge_args; +} subchannel_factory; + +static void subchannel_factory_ref(grpc_subchannel_factory *scf) { + subchannel_factory *f = (subchannel_factory *)scf; + gpr_ref(&f->refs); } -static void initiate_setup(void *sp, grpc_client_setup_request *cs_request) { - request *r = gpr_malloc(sizeof(request)); - r->setup = sp; - r->cs_request = cs_request; - r->resolved = NULL; - r->resolved_index = 0; - /* TODO(klempner): Make grpc_resolve_address respect deadline */ - grpc_resolve_address(r->setup->target, "http", on_resolved, r); +static void subchannel_factory_unref(grpc_subchannel_factory *scf) { + subchannel_factory *f = (subchannel_factory *)scf; + if (gpr_unref(&f->refs)) { + grpc_channel_args_destroy(f->merge_args); + grpc_mdctx_unref(f->mdctx); + gpr_free(f); + } } -static void done_setup(void *sp) { - setup *s = sp; - gpr_free((void *)s->target); - gpr_free(s); +static grpc_subchannel *subchannel_factory_create_subchannel( + grpc_subchannel_factory *scf, grpc_subchannel_args *args) { + subchannel_factory *f = (subchannel_factory *)scf; + connector *c = gpr_malloc(sizeof(*c)); + grpc_channel_args *final_args = + grpc_channel_args_merge(args->args, f->merge_args); + grpc_subchannel *s; + memset(c, 0, sizeof(*c)); + c->base.vtable = &connector_vtable; + gpr_ref_init(&c->refs, 1); + args->mdctx = f->mdctx; + args->args = final_args; + s = grpc_subchannel_create(&c->base, args); + grpc_connector_unref(&c->base); + grpc_channel_args_destroy(final_args); + return s; } -static grpc_transport_setup_result complete_setup(void *channel_stack, - grpc_transport *transport, - grpc_mdctx *mdctx) { - static grpc_channel_filter const *extra_filters[] = { - &grpc_http_client_filter}; - return grpc_client_channel_transport_setup_complete( - channel_stack, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters), - mdctx); -} +static const grpc_subchannel_factory_vtable subchannel_factory_vtable = { + subchannel_factory_ref, subchannel_factory_unref, + subchannel_factory_create_subchannel}; /* Create a client channel: Asynchronously: - resolve target @@ -191,28 +152,36 @@ static grpc_transport_setup_result complete_setup(void *channel_stack, - perform handshakes */ grpc_channel *grpc_channel_create(const char *target, const grpc_channel_args *args) { - setup *s = gpr_malloc(sizeof(setup)); - grpc_mdctx *mdctx = grpc_mdctx_create(); grpc_channel *channel = NULL; #define MAX_FILTERS 3 const grpc_channel_filter *filters[MAX_FILTERS]; + grpc_resolver *resolver; + subchannel_factory *f; + grpc_mdctx *mdctx = grpc_mdctx_create(); int n = 0; - filters[n++] = &grpc_client_surface_filter; /* TODO(census) if (grpc_channel_args_is_census_enabled(args)) { filters[n++] = &grpc_client_census_filter; } */ filters[n++] = &grpc_client_channel_filter; GPR_ASSERT(n <= MAX_FILTERS); - channel = grpc_channel_create_from_filters(filters, n, args, mdctx, 1); - s->target = gpr_strdup(target); - s->setup_callback = complete_setup; - s->setup_user_data = grpc_channel_get_channel_stack(channel); + f = gpr_malloc(sizeof(*f)); + f->base.vtable = &subchannel_factory_vtable; + gpr_ref_init(&f->refs, 1); + grpc_mdctx_ref(mdctx); + f->mdctx = mdctx; + f->merge_args = grpc_channel_args_copy(args); + resolver = grpc_resolver_create(target, &f->base); + if (!resolver) { + return NULL; + } - grpc_client_setup_create_and_attach(grpc_channel_get_channel_stack(channel), - args, mdctx, initiate_setup, done_setup, - s); + channel = grpc_channel_create_from_filters(filters, n, args, mdctx, 1); + grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel), + resolver); + GRPC_RESOLVER_UNREF(resolver, "create"); + grpc_subchannel_factory_unref(&f->base); return channel; } |