diff options
author | Craig Tiller <ctiller@google.com> | 2015-06-26 16:08:21 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-06-26 16:08:21 -0700 |
commit | 5f84c8478ab61c30cc98d956bbe327e02142d2e6 (patch) | |
tree | b2ee548b4335ff14f81654ac4ff6b2b1e9355a46 /src/core/surface/channel_create.c | |
parent | 0c5cf25d4e40f040cc1c297afd1485c2c35753f3 (diff) |
Connector progress
Diffstat (limited to 'src/core/surface/channel_create.c')
-rw-r--r-- | src/core/surface/channel_create.c | 67 |
1 files changed, 60 insertions, 7 deletions
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index ba4e1a24ec..46d1d708dd 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -41,11 +41,18 @@ #include "src/core/channel/channel_args.h" #include "src/core/channel/client_channel.h" #include "src/core/client_config/resolver_registry.h" +#include "src/core/iomgr/tcp_client.h" #include "src/core/surface/channel.h" +#include "src/core/transport/chttp2_transport.h" typedef struct { grpc_connector base; gpr_refcount refs; + + grpc_transport **transport; + grpc_iomgr_closure *notify; + const grpc_channel_args *args; + grpc_mdctx *mdctx; } connector; static void connector_ref(grpc_connector *con) { @@ -60,28 +67,68 @@ static void connector_unref(grpc_connector *con) { } } -static void connector_connect(grpc_connector *connector, const grpc_channel_args *channel_args, grpc_mdctx *metadata_context, grpc_transport **transport, grpc_iomgr_closure *notify) { - abort(); +static void connected(void *arg, grpc_endpoint *tcp) { + connector *c = arg; + grpc_iomgr_closure *notify; + if (tcp != NULL) { + *c->transport = + grpc_create_chttp2_transport(c->args, tcp, NULL, 0, c->mdctx, 1); + } else { + *c->transport = NULL; + } + notify = c->notify; + c->notify = NULL; + grpc_iomgr_add_callback(notify); +} + +static void connector_connect( + grpc_connector *con, grpc_pollset_set *pollset_set, + const struct sockaddr *addr, int addr_len, gpr_timespec deadline, + const grpc_channel_args *channel_args, grpc_mdctx *metadata_context, + grpc_transport **transport, grpc_iomgr_closure *notify) { + connector *c = (connector *)con; + GPR_ASSERT(c->notify == NULL); + c->notify = notify; + c->args = channel_args; + c->mdctx = metadata_context; + grpc_tcp_client_connect(connected, c, pollset_set, addr, addr_len, deadline); } static const grpc_connector_vtable connector_vtable = {connector_ref, connector_unref, connector_connect}; -static void subchannel_factory_ref(grpc_subchannel_factory *scf) {} +typedef struct { + grpc_subchannel_factory base; + gpr_refcount refs; + grpc_mdctx *mdctx; +} subchannel_factory; + +static void subchannel_factory_ref(grpc_subchannel_factory *scf) { + subchannel_factory *f = (subchannel_factory *)scf; + gpr_ref(&f->refs); +} -static void subchannel_factory_unref(grpc_subchannel_factory *scf) {} +static void subchannel_factory_unref(grpc_subchannel_factory *scf) { + subchannel_factory *f = (subchannel_factory *)scf; + if (gpr_unref(&f->refs)) { + grpc_mdctx_unref(f->mdctx); + gpr_free(f); + } +} static grpc_subchannel *subchannel_factory_create_subchannel(grpc_subchannel_factory *scf, grpc_subchannel_args *args) { + subchannel_factory *f = (subchannel_factory *)scf; connector *c = gpr_malloc(sizeof(*c)); grpc_subchannel *s; + memset(c, 0, sizeof(*c)); c->base.vtable = &connector_vtable; gpr_ref_init(&c->refs, 1); + args->mdctx = f->mdctx; s = grpc_subchannel_create(&c->base, args); grpc_connector_unref(&c->base); return s; } static const grpc_subchannel_factory_vtable subchannel_factory_vtable = {subchannel_factory_ref, subchannel_factory_unref, subchannel_factory_create_subchannel}; -static grpc_subchannel_factory subchannel_factory = {&subchannel_factory_vtable}; /* Create a client channel: Asynchronously: - resolve target @@ -93,6 +140,8 @@ grpc_channel *grpc_channel_create(const char *target, #define MAX_FILTERS 3 const grpc_channel_filter *filters[MAX_FILTERS]; grpc_resolver *resolver; + subchannel_factory *f; + grpc_mdctx *mdctx = grpc_mdctx_create(); int n = 0; /* TODO(census) if (grpc_channel_args_is_census_enabled(args)) { @@ -101,12 +150,16 @@ grpc_channel *grpc_channel_create(const char *target, filters[n++] = &grpc_client_channel_filter; GPR_ASSERT(n <= MAX_FILTERS); - resolver = grpc_resolver_create(target, &subchannel_factory); + f = gpr_malloc(sizeof(*f)); + f->base.vtable = &subchannel_factory_vtable; + gpr_ref_init(&f->refs, 1); + f->mdctx = mdctx; + resolver = grpc_resolver_create(target, &f->base); if (!resolver) { return NULL; } - channel = grpc_channel_create_from_filters(filters, n, args, grpc_mdctx_create(), 1); + channel = grpc_channel_create_from_filters(filters, n, args, mdctx, 1); grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel), resolver); return channel; |