aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Adam Michalik <xyzzyz@google.com>2016-04-27 16:52:51 -0700
committerGravatar Adam Michalik <xyzzyz@google.com>2016-06-07 15:04:36 -0700
commit6e9fab5ed67106da998ec368e8882ba7a245a9b1 (patch)
tree5c157c18a665a39bebc94841cf7d695e8f5e3885
parentd30d4e279c4a63effaa6e912fc00bd4ad96054c7 (diff)
Add support for establishing client channel using existing FD
-rw-r--r--include/grpc/grpc.h6
-rw-r--r--src/core/ext/transport/chttp2/client/insecure/channel_create.c141
2 files changed, 129 insertions, 18 deletions
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h
index 6f7a67b715..163d5291d3 100644
--- a/include/grpc/grpc.h
+++ b/include/grpc/grpc.h
@@ -240,6 +240,12 @@ GRPCAPI char *grpc_channel_get_target(grpc_channel *channel);
GRPCAPI grpc_channel *grpc_insecure_channel_create(
const char *target, const grpc_channel_args *args, void *reserved);
+/** Create a client channel to 'target' using file descriptor 'fd'. The 'target'
+ argument will be used to indicate the name for this channel. See the comment
+ for grpc_insecure_channel_create for description of 'args' argument. */
+GRPCAPI grpc_channel *grpc_insecure_channel_create_from_fd(
+ const char *target, int fd, const grpc_channel_args *args);
+
/** Create a lame client: this client fails every operation attempted on it. */
GRPCAPI grpc_channel *grpc_lame_client_channel_create(
const char *target, grpc_status_code error_code, const char *error_message);
diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
index c5d3d8d9cc..8ed216db7b 100644
--- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c
+++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c
@@ -33,6 +33,7 @@
#include <grpc/grpc.h>
+#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
@@ -47,6 +48,7 @@
#include "src/core/lib/channel/compress_filter.h"
#include "src/core/lib/channel/http_client_filter.h"
#include "src/core/lib/iomgr/tcp_client.h"
+#include "src/core/lib/iomgr/tcp_posix.h"
#include "src/core/lib/surface/api_trace.h"
#include "src/core/lib/surface/channel.h"
@@ -139,31 +141,31 @@ typedef struct {
gpr_refcount refs;
grpc_channel_args *merge_args;
grpc_channel *master;
-} client_channel_factory;
+} client_tcp_channel_factory;
-static void client_channel_factory_ref(
+static void client_tcp_channel_factory_ref(
grpc_client_channel_factory *cc_factory) {
- client_channel_factory *f = (client_channel_factory *)cc_factory;
+ client_tcp_channel_factory *f = (client_tcp_channel_factory *)cc_factory;
gpr_ref(&f->refs);
}
-static void client_channel_factory_unref(
+static void client_tcp_channel_factory_unref(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) {
- client_channel_factory *f = (client_channel_factory *)cc_factory;
+ client_tcp_channel_factory *f = (client_tcp_channel_factory *)cc_factory;
if (gpr_unref(&f->refs)) {
if (f->master != NULL) {
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master,
- "client_channel_factory");
+ "client_tcp_channel_factory");
}
grpc_channel_args_destroy(f->merge_args);
gpr_free(f);
}
}
-static grpc_subchannel *client_channel_factory_create_subchannel(
+static grpc_subchannel *client_tcp_channel_factory_create_subchannel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
grpc_subchannel_args *args) {
- client_channel_factory *f = (client_channel_factory *)cc_factory;
+ client_tcp_channel_factory *f = (client_tcp_channel_factory *)cc_factory;
connector *c = gpr_malloc(sizeof(*c));
grpc_channel_args *final_args =
grpc_channel_args_merge(args->args, f->merge_args);
@@ -178,11 +180,11 @@ static grpc_subchannel *client_channel_factory_create_subchannel(
return s;
}
-static grpc_channel *client_channel_factory_create_channel(
+static grpc_channel *client_tcp_channel_factory_create_channel(
grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
const char *target, grpc_client_channel_type type,
grpc_channel_args *args) {
- client_channel_factory *f = (client_channel_factory *)cc_factory;
+ client_tcp_channel_factory *f = (client_tcp_channel_factory *)cc_factory;
grpc_channel_args *final_args = grpc_channel_args_merge(args, f->merge_args);
grpc_channel *channel = grpc_channel_create(exec_ctx, target, final_args,
GRPC_CLIENT_CHANNEL, NULL);
@@ -190,7 +192,7 @@ static grpc_channel *client_channel_factory_create_channel(
grpc_resolver *resolver = grpc_resolver_create(target, &f->base);
if (!resolver) {
GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel,
- "client_channel_factory_create_channel");
+ "client_tcp_channel_factory_create_channel");
return NULL;
}
@@ -201,10 +203,11 @@ static grpc_channel *client_channel_factory_create_channel(
return channel;
}
-static const grpc_client_channel_factory_vtable client_channel_factory_vtable =
- {client_channel_factory_ref, client_channel_factory_unref,
- client_channel_factory_create_subchannel,
- client_channel_factory_create_channel};
+static const grpc_client_channel_factory_vtable
+client_tcp_channel_factory_vtable =
+ {client_tcp_channel_factory_ref, client_tcp_channel_factory_unref,
+ client_tcp_channel_factory_create_subchannel,
+ client_tcp_channel_factory_create_channel};
/* Create a client channel:
Asynchronously: - resolve target
@@ -219,13 +222,115 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
(target, args, reserved));
GPR_ASSERT(!reserved);
- client_channel_factory *f = gpr_malloc(sizeof(*f));
+ client_tcp_channel_factory *f = gpr_malloc(sizeof(*f));
memset(f, 0, sizeof(*f));
- f->base.vtable = &client_channel_factory_vtable;
+ f->base.vtable = &client_tcp_channel_factory_vtable;
gpr_ref_init(&f->refs, 1);
f->merge_args = grpc_channel_args_copy(args);
- grpc_channel *channel = client_channel_factory_create_channel(
+ grpc_channel *channel = client_tcp_channel_factory_create_channel(
+ &exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, NULL);
+ if (channel != NULL) {
+ f->master = channel;
+ GRPC_CHANNEL_INTERNAL_REF(f->master, "grpc_insecure_channel_create");
+ }
+ grpc_client_channel_factory_unref(&exec_ctx, &f->base);
+
+ grpc_exec_ctx_finish(&exec_ctx);
+
+ return channel; /* may be NULL */
+}
+
+typedef struct {
+ grpc_client_channel_factory base;
+
+ int fd;
+
+ gpr_refcount refs;
+ grpc_channel_args *merge_args;
+ grpc_channel *master;
+} client_fd_channel_factory;
+
+static void client_fd_channel_factory_ref(
+ grpc_client_channel_factory *cc_factory) {
+ client_fd_channel_factory *f = (client_fd_channel_factory *)cc_factory;
+ gpr_ref(&f->refs);
+}
+
+static void client_fd_channel_factory_unref(
+ grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) {
+ client_fd_channel_factory *f = (client_fd_channel_factory *)cc_factory;
+ if (gpr_unref(&f->refs)) {
+ if (f->master != NULL) {
+ GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master,
+ "client_fd_channel_factory");
+ }
+ grpc_channel_args_destroy(f->merge_args);
+ gpr_free(f);
+ }
+}
+
+static grpc_subchannel *client_fd_channel_factory_create_subchannel(
+ grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
+ grpc_subchannel_args *args) {
+ return NULL;
+}
+
+static grpc_channel *client_fd_channel_factory_create_channel(
+ grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory,
+ const char *target, grpc_client_channel_type type,
+ grpc_channel_args *args) {
+ client_fd_channel_factory *f = (client_fd_channel_factory *)cc_factory;
+
+ // FIXME(xyzzyz): hack to get the authority sent.
+ grpc_arg default_authority_arg;
+ default_authority_arg.type = GRPC_ARG_STRING;
+ default_authority_arg.key = GRPC_ARG_DEFAULT_AUTHORITY;
+ default_authority_arg.value.string = "test.authority";
+ grpc_channel_args *merged_args = grpc_channel_args_merge(args, f->merge_args);
+ grpc_channel_args *final_args = grpc_channel_args_copy_and_add(
+ merged_args, &default_authority_arg, 1);
+ grpc_channel_args_destroy(merged_args);
+
+ int flags = fcntl(f->fd, F_GETFL, 0);
+ GPR_ASSERT(fcntl(f->fd, F_SETFL, flags | O_NONBLOCK) == 0);
+
+ grpc_endpoint *client = grpc_tcp_create(
+ grpc_fd_create(f->fd, "client"), GRPC_TCP_DEFAULT_READ_SLICE_SIZE,
+ "fd-client");
+
+ grpc_transport *transport =
+ grpc_create_chttp2_transport(exec_ctx, final_args, client, 1);
+ GPR_ASSERT(transport);
+ grpc_channel *channel = grpc_channel_create(
+ exec_ctx, target, final_args, GRPC_CLIENT_DIRECT_CHANNEL, transport);
+ grpc_channel_args_destroy(final_args);
+ grpc_chttp2_transport_start_reading(exec_ctx, transport, NULL, 0);
+
+ return channel;
+}
+
+static const grpc_client_channel_factory_vtable
+client_fd_channel_factory_vtable =
+ {client_fd_channel_factory_ref, client_fd_channel_factory_unref,
+ client_fd_channel_factory_create_subchannel,
+ client_fd_channel_factory_create_channel};
+
+grpc_channel *grpc_insecure_channel_create_from_fd(
+ const char *target, int fd, const grpc_channel_args *args) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ GRPC_API_TRACE(
+ "grpc_insecure_channel_create(target=%p, fd=%d, args=%p)", 3,
+ (target, fd, args));
+
+ client_fd_channel_factory *f = gpr_malloc(sizeof(*f));
+ memset(f, 0, sizeof(*f));
+ f->base.vtable = &client_fd_channel_factory_vtable;
+ gpr_ref_init(&f->refs, 1);
+ f->merge_args = grpc_channel_args_copy(args);
+ f->fd = fd;
+
+ grpc_channel *channel = client_fd_channel_factory_create_channel(
&exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, NULL);
if (channel != NULL) {
f->master = channel;