aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/client_config
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-06-26 17:21:41 -0700
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-06-26 17:21:41 -0700
commit04c5d4b8fda632af2fc0bec1ca7ed1b276b1969b (patch)
tree85f33dd6c89574b46e0718e59f40ae3dc53498b0 /src/core/client_config
parentff54c92adc8c46a0cdce74d06f98faea14f52100 (diff)
Progress - need to add http filters
Diffstat (limited to 'src/core/client_config')
-rw-r--r--src/core/client_config/connector.c11
-rw-r--r--src/core/client_config/connector.h38
-rw-r--r--src/core/client_config/subchannel.c41
3 files changed, 63 insertions, 27 deletions
diff --git a/src/core/client_config/connector.c b/src/core/client_config/connector.c
index 60c392f85b..9cc57ddf38 100644
--- a/src/core/client_config/connector.c
+++ b/src/core/client_config/connector.c
@@ -42,10 +42,9 @@ void grpc_connector_unref(grpc_connector *connector) {
}
void grpc_connector_connect(
- grpc_connector *connector, grpc_pollset_set *pollset_set,
- const struct sockaddr *addr, int addr_len, gpr_timespec deadline,
- const grpc_channel_args *channel_args, grpc_mdctx *metadata_context,
- grpc_transport **transport, grpc_iomgr_closure *notify) {
- connector->vtable->connect(connector, pollset_set, addr, addr_len, deadline,
- channel_args, metadata_context, transport, notify);
+ grpc_connector *connector,
+ const grpc_connect_in_args *in_args,
+ grpc_connect_out_args *out_args,
+ grpc_iomgr_closure *notify) {
+ connector->vtable->connect(connector, in_args, out_args, notify);
}
diff --git a/src/core/client_config/connector.h b/src/core/client_config/connector.h
index 7241437729..55c6e63129 100644
--- a/src/core/client_config/connector.h
+++ b/src/core/client_config/connector.h
@@ -34,6 +34,7 @@
#ifndef GRPC_INTERNAL_CORE_CLIENT_CONFIG_CONNECTOR_H
#define GRPC_INTERNAL_CORE_CLIENT_CONFIG_CONNECTOR_H
+#include "src/core/channel/channel_stack.h"
#include "src/core/iomgr/sockaddr.h"
#include "src/core/transport/transport.h"
@@ -44,22 +45,43 @@ struct grpc_connector {
const grpc_connector_vtable *vtable;
};
+typedef struct {
+ /** set of pollsets interested in this connection */
+ grpc_pollset_set *interested_parties;
+ /** address to connect to */
+ const struct sockaddr *addr;
+ int addr_len;
+ /** deadline for connection */
+ gpr_timespec deadline;
+ /** channel arguments (to be passed to transport) */
+ const grpc_channel_args *channel_args;
+ /** metadata context */
+ grpc_mdctx *metadata_context;
+} grpc_connect_in_args;
+
+typedef struct {
+ /** the connected transport */
+ grpc_transport *transport;
+ /** any additional filters (owned by the caller of connect) */
+ const grpc_channel_filter **filters;
+ size_t num_filters;
+} grpc_connect_out_args;
+
struct grpc_connector_vtable {
void (*ref)(grpc_connector *connector);
void (*unref)(grpc_connector *connector);
- void (*connect)(grpc_connector *connector, grpc_pollset_set *pollset_set,
- const struct sockaddr *addr, int addr_len,
- gpr_timespec deadline, const grpc_channel_args *channel_args,
- grpc_mdctx *metadata_context, grpc_transport **transport,
+ void (*connect)(grpc_connector *connector,
+ const grpc_connect_in_args *in_args,
+ grpc_connect_out_args *out_args,
grpc_iomgr_closure *notify);
};
void grpc_connector_ref(grpc_connector *connector);
void grpc_connector_unref(grpc_connector *connector);
void grpc_connector_connect(
- grpc_connector *connector, grpc_pollset_set *pollset_set,
- const struct sockaddr *addr, int addr_len, gpr_timespec deadline,
- const grpc_channel_args *channel_args, grpc_mdctx *metadata_context,
- grpc_transport **transport, grpc_iomgr_closure *notify);
+ grpc_connector *connector,
+ const grpc_connect_in_args *in_args,
+ grpc_connect_out_args *out_args,
+ grpc_iomgr_closure *notify);
#endif
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index e863c5b97c..c2044c9e70 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -74,7 +74,7 @@ struct grpc_subchannel {
grpc_mdctx *mdctx;
/** set during connection */
- grpc_transport *connecting_transport;
+ grpc_connect_out_args connecting_result;
/** callback for connection finishing */
grpc_iomgr_closure connected;
@@ -101,7 +101,8 @@ struct grpc_subchannel_call {
gpr_refcount refs;
};
-#define SUBCHANNEL_CALL_TO_CALL_STACK(call) (((grpc_call_stack *)(call)) + 1)
+#define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1))
+#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1))
static grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_op *initial_op);
static void connectivity_state_changed_locked(grpc_subchannel *c);
@@ -160,6 +161,19 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
return c;
}
+static void start_connect(grpc_subchannel *c) {
+ grpc_connect_in_args args;
+
+ args.interested_parties = &c->pollset_set;
+ args.addr = c->addr;
+ args.addr_len = c->addr_len;
+ args.deadline = compute_connect_deadline(c);
+ args.channel_args = c->args;
+ args.metadata_context = c->mdctx;
+
+ grpc_connector_connect(c->connector, &args, &c->connecting_result, &c->connected);
+}
+
void grpc_subchannel_create_call(grpc_subchannel *c,
grpc_transport_stream_op *initial_op,
grpc_subchannel_call **target,
@@ -187,9 +201,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c,
grpc_subchannel_ref(c);
gpr_mu_unlock(&c->mu);
- grpc_connector_connect(c->connector, &c->pollset_set, c->addr,
- c->addr_len, compute_connect_deadline(c), c->args,
- c->mdctx, &c->connecting_transport, &c->connected);
+ start_connect(c);
} else {
gpr_mu_unlock(&c->mu);
}
@@ -232,9 +244,7 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
gpr_mu_unlock(&c->mu);
}
if (do_connect) {
- grpc_connector_connect(c->connector, &c->pollset_set, c->addr, c->addr_len,
- compute_connect_deadline(c), c->args, c->mdctx,
- &c->connecting_transport, &c->connected);
+ start_connect(c);
}
}
@@ -246,8 +256,8 @@ static void publish_transport(grpc_subchannel *c) {
gpr_ref_init(&con->refs, 1);
con->subchannel = c;
grpc_channel_stack_init(c->filters, c->filter_count, c->args, c->mdctx, stk);
- grpc_connected_channel_bind_transport(stk, c->connecting_transport);
- c->connecting_transport = NULL;
+ grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
+ memset(&c->connecting_result, 0, sizeof(c->connecting_result));
gpr_mu_lock(&c->mu);
GPR_ASSERT(c->active == NULL);
@@ -262,7 +272,7 @@ static void publish_transport(grpc_subchannel *c) {
static void subchannel_connected(void *arg, int iomgr_success) {
grpc_subchannel *c = arg;
- if (c->connecting_transport) {
+ if (c->connecting_result.transport) {
publish_transport(c);
} else {
grpc_subchannel_unref(c);
@@ -330,6 +340,11 @@ void grpc_subchannel_call_process_op(grpc_subchannel_call *call,
}
grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_op *initial_op) {
- abort();
- return NULL;
+ grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
+ grpc_subchannel_call *call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
+ grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call);
+ call->connection = con;
+ gpr_ref_init(&call->refs, 1);
+ grpc_call_stack_init(chanstk, NULL, initial_op, callstk);
+ return call;
}