aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/client_config/connector.c11
-rw-r--r--src/core/client_config/connector.h38
-rw-r--r--src/core/client_config/subchannel.c41
-rw-r--r--src/core/surface/channel_create.c24
-rw-r--r--src/core/surface/server.c2
5 files changed, 75 insertions, 41 deletions
diff --git a/src/core/client_config/connector.c b/src/core/client_config/connector.c
index 60c392f85b..9cc57ddf38 100644
--- a/src/core/client_config/connector.c
+++ b/src/core/client_config/connector.c
@@ -42,10 +42,9 @@ void grpc_connector_unref(grpc_connector *connector) {
}
void grpc_connector_connect(
- grpc_connector *connector, grpc_pollset_set *pollset_set,
- const struct sockaddr *addr, int addr_len, gpr_timespec deadline,
- const grpc_channel_args *channel_args, grpc_mdctx *metadata_context,
- grpc_transport **transport, grpc_iomgr_closure *notify) {
- connector->vtable->connect(connector, pollset_set, addr, addr_len, deadline,
- channel_args, metadata_context, transport, notify);
+ grpc_connector *connector,
+ const grpc_connect_in_args *in_args,
+ grpc_connect_out_args *out_args,
+ grpc_iomgr_closure *notify) {
+ connector->vtable->connect(connector, in_args, out_args, notify);
}
diff --git a/src/core/client_config/connector.h b/src/core/client_config/connector.h
index 7241437729..55c6e63129 100644
--- a/src/core/client_config/connector.h
+++ b/src/core/client_config/connector.h
@@ -34,6 +34,7 @@
#ifndef GRPC_INTERNAL_CORE_CLIENT_CONFIG_CONNECTOR_H
#define GRPC_INTERNAL_CORE_CLIENT_CONFIG_CONNECTOR_H
+#include "src/core/channel/channel_stack.h"
#include "src/core/iomgr/sockaddr.h"
#include "src/core/transport/transport.h"
@@ -44,22 +45,43 @@ struct grpc_connector {
const grpc_connector_vtable *vtable;
};
+typedef struct {
+ /** set of pollsets interested in this connection */
+ grpc_pollset_set *interested_parties;
+ /** address to connect to */
+ const struct sockaddr *addr;
+ int addr_len;
+ /** deadline for connection */
+ gpr_timespec deadline;
+ /** channel arguments (to be passed to transport) */
+ const grpc_channel_args *channel_args;
+ /** metadata context */
+ grpc_mdctx *metadata_context;
+} grpc_connect_in_args;
+
+typedef struct {
+ /** the connected transport */
+ grpc_transport *transport;
+ /** any additional filters (owned by the caller of connect) */
+ const grpc_channel_filter **filters;
+ size_t num_filters;
+} grpc_connect_out_args;
+
struct grpc_connector_vtable {
void (*ref)(grpc_connector *connector);
void (*unref)(grpc_connector *connector);
- void (*connect)(grpc_connector *connector, grpc_pollset_set *pollset_set,
- const struct sockaddr *addr, int addr_len,
- gpr_timespec deadline, const grpc_channel_args *channel_args,
- grpc_mdctx *metadata_context, grpc_transport **transport,
+ void (*connect)(grpc_connector *connector,
+ const grpc_connect_in_args *in_args,
+ grpc_connect_out_args *out_args,
grpc_iomgr_closure *notify);
};
void grpc_connector_ref(grpc_connector *connector);
void grpc_connector_unref(grpc_connector *connector);
void grpc_connector_connect(
- grpc_connector *connector, grpc_pollset_set *pollset_set,
- const struct sockaddr *addr, int addr_len, gpr_timespec deadline,
- const grpc_channel_args *channel_args, grpc_mdctx *metadata_context,
- grpc_transport **transport, grpc_iomgr_closure *notify);
+ grpc_connector *connector,
+ const grpc_connect_in_args *in_args,
+ grpc_connect_out_args *out_args,
+ grpc_iomgr_closure *notify);
#endif
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index e863c5b97c..c2044c9e70 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -74,7 +74,7 @@ struct grpc_subchannel {
grpc_mdctx *mdctx;
/** set during connection */
- grpc_transport *connecting_transport;
+ grpc_connect_out_args connecting_result;
/** callback for connection finishing */
grpc_iomgr_closure connected;
@@ -101,7 +101,8 @@ struct grpc_subchannel_call {
gpr_refcount refs;
};
-#define SUBCHANNEL_CALL_TO_CALL_STACK(call) (((grpc_call_stack *)(call)) + 1)
+#define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1))
+#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1))
static grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_op *initial_op);
static void connectivity_state_changed_locked(grpc_subchannel *c);
@@ -160,6 +161,19 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
return c;
}
+static void start_connect(grpc_subchannel *c) {
+ grpc_connect_in_args args;
+
+ args.interested_parties = &c->pollset_set;
+ args.addr = c->addr;
+ args.addr_len = c->addr_len;
+ args.deadline = compute_connect_deadline(c);
+ args.channel_args = c->args;
+ args.metadata_context = c->mdctx;
+
+ grpc_connector_connect(c->connector, &args, &c->connecting_result, &c->connected);
+}
+
void grpc_subchannel_create_call(grpc_subchannel *c,
grpc_transport_stream_op *initial_op,
grpc_subchannel_call **target,
@@ -187,9 +201,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c,
grpc_subchannel_ref(c);
gpr_mu_unlock(&c->mu);
- grpc_connector_connect(c->connector, &c->pollset_set, c->addr,
- c->addr_len, compute_connect_deadline(c), c->args,
- c->mdctx, &c->connecting_transport, &c->connected);
+ start_connect(c);
} else {
gpr_mu_unlock(&c->mu);
}
@@ -232,9 +244,7 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
gpr_mu_unlock(&c->mu);
}
if (do_connect) {
- grpc_connector_connect(c->connector, &c->pollset_set, c->addr, c->addr_len,
- compute_connect_deadline(c), c->args, c->mdctx,
- &c->connecting_transport, &c->connected);
+ start_connect(c);
}
}
@@ -246,8 +256,8 @@ static void publish_transport(grpc_subchannel *c) {
gpr_ref_init(&con->refs, 1);
con->subchannel = c;
grpc_channel_stack_init(c->filters, c->filter_count, c->args, c->mdctx, stk);
- grpc_connected_channel_bind_transport(stk, c->connecting_transport);
- c->connecting_transport = NULL;
+ grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
+ memset(&c->connecting_result, 0, sizeof(c->connecting_result));
gpr_mu_lock(&c->mu);
GPR_ASSERT(c->active == NULL);
@@ -262,7 +272,7 @@ static void publish_transport(grpc_subchannel *c) {
static void subchannel_connected(void *arg, int iomgr_success) {
grpc_subchannel *c = arg;
- if (c->connecting_transport) {
+ if (c->connecting_result.transport) {
publish_transport(c);
} else {
grpc_subchannel_unref(c);
@@ -330,6 +340,11 @@ void grpc_subchannel_call_process_op(grpc_subchannel_call *call,
}
grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_op *initial_op) {
- abort();
- return NULL;
+ grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
+ grpc_subchannel_call *call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
+ grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call);
+ call->connection = con;
+ gpr_ref_init(&call->refs, 1);
+ grpc_call_stack_init(chanstk, NULL, initial_op, callstk);
+ return call;
}
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index 3d54ff58d8..afa1d36820 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -49,10 +49,9 @@ typedef struct {
grpc_connector base;
gpr_refcount refs;
- grpc_transport **transport;
grpc_iomgr_closure *notify;
- const grpc_channel_args *args;
- grpc_mdctx *mdctx;
+ grpc_connect_in_args args;
+ grpc_connect_out_args *result;
} connector;
static void connector_ref(grpc_connector *con) {
@@ -71,10 +70,10 @@ static void connected(void *arg, grpc_endpoint *tcp) {
connector *c = arg;
grpc_iomgr_closure *notify;
if (tcp != NULL) {
- *c->transport =
- grpc_create_chttp2_transport(c->args, tcp, NULL, 0, c->mdctx, 1);
+ c->result->transport =
+ grpc_create_chttp2_transport(c->args.channel_args, tcp, NULL, 0, c->args.metadata_context, 1);
} else {
- *c->transport = NULL;
+ c->result->transport = NULL;
}
notify = c->notify;
c->notify = NULL;
@@ -82,18 +81,15 @@ static void connected(void *arg, grpc_endpoint *tcp) {
}
static void connector_connect(
- grpc_connector *con, grpc_pollset_set *pollset_set,
- const struct sockaddr *addr, int addr_len, gpr_timespec deadline,
- const grpc_channel_args *channel_args, grpc_mdctx *metadata_context,
- grpc_transport **transport, grpc_iomgr_closure *notify) {
+ grpc_connector *con, const grpc_connect_in_args *args,
+ grpc_connect_out_args *result, grpc_iomgr_closure *notify) {
connector *c = (connector *)con;
GPR_ASSERT(c->notify == NULL);
GPR_ASSERT(notify->cb);
c->notify = notify;
- c->args = channel_args;
- c->mdctx = metadata_context;
- c->transport = transport;
- grpc_tcp_client_connect(connected, c, pollset_set, addr, addr_len, deadline);
+ c->args = *args;
+ c->result = result;
+ grpc_tcp_client_connect(connected, c, args->interested_parties, args->addr, args->addr_len, args->deadline);
}
static const grpc_connector_vtable connector_vtable = {connector_ref, connector_unref, connector_connect};
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index f4d76e882f..6d06725bf3 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -1229,6 +1229,8 @@ static void begin_call(grpc_server *server, call_data *calld,
calld->cq_new = rc->cq_for_notification;
switch (rc->type) {
case BATCH_CALL:
+ GPR_ASSERT(calld->host != NULL);
+ GPR_ASSERT(calld->path != NULL);
cpstr(&rc->data.batch.details->host,
&rc->data.batch.details->host_capacity, calld->host);
cpstr(&rc->data.batch.details->method,