diff options
author | Adam Michalik <xyzzyz@google.com> | 2016-04-27 16:52:51 -0700 |
---|---|---|
committer | Adam Michalik <xyzzyz@google.com> | 2016-06-07 15:04:36 -0700 |
commit | 6e9fab5ed67106da998ec368e8882ba7a245a9b1 (patch) | |
tree | 5c157c18a665a39bebc94841cf7d695e8f5e3885 /src/core/ext | |
parent | d30d4e279c4a63effaa6e912fc00bd4ad96054c7 (diff) |
Add support for establishing client channel using existing FD
Diffstat (limited to 'src/core/ext')
-rw-r--r-- | src/core/ext/transport/chttp2/client/insecure/channel_create.c | 141 |
1 files changed, 123 insertions, 18 deletions
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index c5d3d8d9cc..8ed216db7b 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -33,6 +33,7 @@ #include <grpc/grpc.h> +#include <fcntl.h> #include <stdlib.h> #include <string.h> @@ -47,6 +48,7 @@ #include "src/core/lib/channel/compress_filter.h" #include "src/core/lib/channel/http_client_filter.h" #include "src/core/lib/iomgr/tcp_client.h" +#include "src/core/lib/iomgr/tcp_posix.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/channel.h" @@ -139,31 +141,31 @@ typedef struct { gpr_refcount refs; grpc_channel_args *merge_args; grpc_channel *master; -} client_channel_factory; +} client_tcp_channel_factory; -static void client_channel_factory_ref( +static void client_tcp_channel_factory_ref( grpc_client_channel_factory *cc_factory) { - client_channel_factory *f = (client_channel_factory *)cc_factory; + client_tcp_channel_factory *f = (client_tcp_channel_factory *)cc_factory; gpr_ref(&f->refs); } -static void client_channel_factory_unref( +static void client_tcp_channel_factory_unref( grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) { - client_channel_factory *f = (client_channel_factory *)cc_factory; + client_tcp_channel_factory *f = (client_tcp_channel_factory *)cc_factory; if (gpr_unref(&f->refs)) { if (f->master != NULL) { GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master, - "client_channel_factory"); + "client_tcp_channel_factory"); } grpc_channel_args_destroy(f->merge_args); gpr_free(f); } } -static grpc_subchannel *client_channel_factory_create_subchannel( +static grpc_subchannel *client_tcp_channel_factory_create_subchannel( grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory, grpc_subchannel_args *args) { - client_channel_factory *f = (client_channel_factory *)cc_factory; + client_tcp_channel_factory *f = (client_tcp_channel_factory *)cc_factory; connector *c = gpr_malloc(sizeof(*c)); grpc_channel_args *final_args = grpc_channel_args_merge(args->args, f->merge_args); @@ -178,11 +180,11 @@ static grpc_subchannel *client_channel_factory_create_subchannel( return s; } -static grpc_channel *client_channel_factory_create_channel( +static grpc_channel *client_tcp_channel_factory_create_channel( grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory, const char *target, grpc_client_channel_type type, grpc_channel_args *args) { - client_channel_factory *f = (client_channel_factory *)cc_factory; + client_tcp_channel_factory *f = (client_tcp_channel_factory *)cc_factory; grpc_channel_args *final_args = grpc_channel_args_merge(args, f->merge_args); grpc_channel *channel = grpc_channel_create(exec_ctx, target, final_args, GRPC_CLIENT_CHANNEL, NULL); @@ -190,7 +192,7 @@ static grpc_channel *client_channel_factory_create_channel( grpc_resolver *resolver = grpc_resolver_create(target, &f->base); if (!resolver) { GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, - "client_channel_factory_create_channel"); + "client_tcp_channel_factory_create_channel"); return NULL; } @@ -201,10 +203,11 @@ static grpc_channel *client_channel_factory_create_channel( return channel; } -static const grpc_client_channel_factory_vtable client_channel_factory_vtable = - {client_channel_factory_ref, client_channel_factory_unref, - client_channel_factory_create_subchannel, - client_channel_factory_create_channel}; +static const grpc_client_channel_factory_vtable +client_tcp_channel_factory_vtable = + {client_tcp_channel_factory_ref, client_tcp_channel_factory_unref, + client_tcp_channel_factory_create_subchannel, + client_tcp_channel_factory_create_channel}; /* Create a client channel: Asynchronously: - resolve target @@ -219,13 +222,115 @@ grpc_channel *grpc_insecure_channel_create(const char *target, (target, args, reserved)); GPR_ASSERT(!reserved); - client_channel_factory *f = gpr_malloc(sizeof(*f)); + client_tcp_channel_factory *f = gpr_malloc(sizeof(*f)); memset(f, 0, sizeof(*f)); - f->base.vtable = &client_channel_factory_vtable; + f->base.vtable = &client_tcp_channel_factory_vtable; gpr_ref_init(&f->refs, 1); f->merge_args = grpc_channel_args_copy(args); - grpc_channel *channel = client_channel_factory_create_channel( + grpc_channel *channel = client_tcp_channel_factory_create_channel( + &exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, NULL); + if (channel != NULL) { + f->master = channel; + GRPC_CHANNEL_INTERNAL_REF(f->master, "grpc_insecure_channel_create"); + } + grpc_client_channel_factory_unref(&exec_ctx, &f->base); + + grpc_exec_ctx_finish(&exec_ctx); + + return channel; /* may be NULL */ +} + +typedef struct { + grpc_client_channel_factory base; + + int fd; + + gpr_refcount refs; + grpc_channel_args *merge_args; + grpc_channel *master; +} client_fd_channel_factory; + +static void client_fd_channel_factory_ref( + grpc_client_channel_factory *cc_factory) { + client_fd_channel_factory *f = (client_fd_channel_factory *)cc_factory; + gpr_ref(&f->refs); +} + +static void client_fd_channel_factory_unref( + grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) { + client_fd_channel_factory *f = (client_fd_channel_factory *)cc_factory; + if (gpr_unref(&f->refs)) { + if (f->master != NULL) { + GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master, + "client_fd_channel_factory"); + } + grpc_channel_args_destroy(f->merge_args); + gpr_free(f); + } +} + +static grpc_subchannel *client_fd_channel_factory_create_subchannel( + grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory, + grpc_subchannel_args *args) { + return NULL; +} + +static grpc_channel *client_fd_channel_factory_create_channel( + grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory, + const char *target, grpc_client_channel_type type, + grpc_channel_args *args) { + client_fd_channel_factory *f = (client_fd_channel_factory *)cc_factory; + + // FIXME(xyzzyz): hack to get the authority sent. + grpc_arg default_authority_arg; + default_authority_arg.type = GRPC_ARG_STRING; + default_authority_arg.key = GRPC_ARG_DEFAULT_AUTHORITY; + default_authority_arg.value.string = "test.authority"; + grpc_channel_args *merged_args = grpc_channel_args_merge(args, f->merge_args); + grpc_channel_args *final_args = grpc_channel_args_copy_and_add( + merged_args, &default_authority_arg, 1); + grpc_channel_args_destroy(merged_args); + + int flags = fcntl(f->fd, F_GETFL, 0); + GPR_ASSERT(fcntl(f->fd, F_SETFL, flags | O_NONBLOCK) == 0); + + grpc_endpoint *client = grpc_tcp_create( + grpc_fd_create(f->fd, "client"), GRPC_TCP_DEFAULT_READ_SLICE_SIZE, + "fd-client"); + + grpc_transport *transport = + grpc_create_chttp2_transport(exec_ctx, final_args, client, 1); + GPR_ASSERT(transport); + grpc_channel *channel = grpc_channel_create( + exec_ctx, target, final_args, GRPC_CLIENT_DIRECT_CHANNEL, transport); + grpc_channel_args_destroy(final_args); + grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0); + + return channel; +} + +static const grpc_client_channel_factory_vtable +client_fd_channel_factory_vtable = + {client_fd_channel_factory_ref, client_fd_channel_factory_unref, + client_fd_channel_factory_create_subchannel, + client_fd_channel_factory_create_channel}; + +grpc_channel *grpc_insecure_channel_create_from_fd( + const char *target, int fd, const grpc_channel_args *args) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + GRPC_API_TRACE( + "grpc_insecure_channel_create(target=%p, fd=%d, args=%p)", 3, + (target, fd, args)); + + client_fd_channel_factory *f = gpr_malloc(sizeof(*f)); + memset(f, 0, sizeof(*f)); + f->base.vtable = &client_fd_channel_factory_vtable; + gpr_ref_init(&f->refs, 1); + f->merge_args = grpc_channel_args_copy(args); + f->fd = fd; + + grpc_channel *channel = client_fd_channel_factory_create_channel( &exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, NULL); if (channel != NULL) { f->master = channel; |