diff options
author | Craig Tiller <craig.tiller@gmail.com> | 2015-06-26 17:21:41 -0700 |
---|---|---|
committer | Craig Tiller <craig.tiller@gmail.com> | 2015-06-26 17:21:41 -0700 |
commit | 04c5d4b8fda632af2fc0bec1ca7ed1b276b1969b (patch) | |
tree | 85f33dd6c89574b46e0718e59f40ae3dc53498b0 /src/core/client_config | |
parent | ff54c92adc8c46a0cdce74d06f98faea14f52100 (diff) |
Progress - need to add http filters
Diffstat (limited to 'src/core/client_config')
-rw-r--r-- | src/core/client_config/connector.c | 11 | ||||
-rw-r--r-- | src/core/client_config/connector.h | 38 | ||||
-rw-r--r-- | src/core/client_config/subchannel.c | 41 |
3 files changed, 63 insertions, 27 deletions
diff --git a/src/core/client_config/connector.c b/src/core/client_config/connector.c index 60c392f85b..9cc57ddf38 100644 --- a/src/core/client_config/connector.c +++ b/src/core/client_config/connector.c @@ -42,10 +42,9 @@ void grpc_connector_unref(grpc_connector *connector) { } void grpc_connector_connect( - grpc_connector *connector, grpc_pollset_set *pollset_set, - const struct sockaddr *addr, int addr_len, gpr_timespec deadline, - const grpc_channel_args *channel_args, grpc_mdctx *metadata_context, - grpc_transport **transport, grpc_iomgr_closure *notify) { - connector->vtable->connect(connector, pollset_set, addr, addr_len, deadline, - channel_args, metadata_context, transport, notify); + grpc_connector *connector, + const grpc_connect_in_args *in_args, + grpc_connect_out_args *out_args, + grpc_iomgr_closure *notify) { + connector->vtable->connect(connector, in_args, out_args, notify); } diff --git a/src/core/client_config/connector.h b/src/core/client_config/connector.h index 7241437729..55c6e63129 100644 --- a/src/core/client_config/connector.h +++ b/src/core/client_config/connector.h @@ -34,6 +34,7 @@ #ifndef GRPC_INTERNAL_CORE_CLIENT_CONFIG_CONNECTOR_H #define GRPC_INTERNAL_CORE_CLIENT_CONFIG_CONNECTOR_H +#include "src/core/channel/channel_stack.h" #include "src/core/iomgr/sockaddr.h" #include "src/core/transport/transport.h" @@ -44,22 +45,43 @@ struct grpc_connector { const grpc_connector_vtable *vtable; }; +typedef struct { + /** set of pollsets interested in this connection */ + grpc_pollset_set *interested_parties; + /** address to connect to */ + const struct sockaddr *addr; + int addr_len; + /** deadline for connection */ + gpr_timespec deadline; + /** channel arguments (to be passed to transport) */ + const grpc_channel_args *channel_args; + /** metadata context */ + grpc_mdctx *metadata_context; +} grpc_connect_in_args; + +typedef struct { + /** the connected transport */ + grpc_transport *transport; + /** any additional filters (owned by the caller of connect) */ + const grpc_channel_filter **filters; + size_t num_filters; +} grpc_connect_out_args; + struct grpc_connector_vtable { void (*ref)(grpc_connector *connector); void (*unref)(grpc_connector *connector); - void (*connect)(grpc_connector *connector, grpc_pollset_set *pollset_set, - const struct sockaddr *addr, int addr_len, - gpr_timespec deadline, const grpc_channel_args *channel_args, - grpc_mdctx *metadata_context, grpc_transport **transport, + void (*connect)(grpc_connector *connector, + const grpc_connect_in_args *in_args, + grpc_connect_out_args *out_args, grpc_iomgr_closure *notify); }; void grpc_connector_ref(grpc_connector *connector); void grpc_connector_unref(grpc_connector *connector); void grpc_connector_connect( - grpc_connector *connector, grpc_pollset_set *pollset_set, - const struct sockaddr *addr, int addr_len, gpr_timespec deadline, - const grpc_channel_args *channel_args, grpc_mdctx *metadata_context, - grpc_transport **transport, grpc_iomgr_closure *notify); + grpc_connector *connector, + const grpc_connect_in_args *in_args, + grpc_connect_out_args *out_args, + grpc_iomgr_closure *notify); #endif diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index e863c5b97c..c2044c9e70 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -74,7 +74,7 @@ struct grpc_subchannel { grpc_mdctx *mdctx; /** set during connection */ - grpc_transport *connecting_transport; + grpc_connect_out_args connecting_result; /** callback for connection finishing */ grpc_iomgr_closure connected; @@ -101,7 +101,8 @@ struct grpc_subchannel_call { gpr_refcount refs; }; -#define SUBCHANNEL_CALL_TO_CALL_STACK(call) (((grpc_call_stack *)(call)) + 1) +#define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1)) +#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1)) static grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_op *initial_op); static void connectivity_state_changed_locked(grpc_subchannel *c); @@ -160,6 +161,19 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, return c; } +static void start_connect(grpc_subchannel *c) { + grpc_connect_in_args args; + + args.interested_parties = &c->pollset_set; + args.addr = c->addr; + args.addr_len = c->addr_len; + args.deadline = compute_connect_deadline(c); + args.channel_args = c->args; + args.metadata_context = c->mdctx; + + grpc_connector_connect(c->connector, &args, &c->connecting_result, &c->connected); +} + void grpc_subchannel_create_call(grpc_subchannel *c, grpc_transport_stream_op *initial_op, grpc_subchannel_call **target, @@ -187,9 +201,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c, grpc_subchannel_ref(c); gpr_mu_unlock(&c->mu); - grpc_connector_connect(c->connector, &c->pollset_set, c->addr, - c->addr_len, compute_connect_deadline(c), c->args, - c->mdctx, &c->connecting_transport, &c->connected); + start_connect(c); } else { gpr_mu_unlock(&c->mu); } @@ -232,9 +244,7 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *c, gpr_mu_unlock(&c->mu); } if (do_connect) { - grpc_connector_connect(c->connector, &c->pollset_set, c->addr, c->addr_len, - compute_connect_deadline(c), c->args, c->mdctx, - &c->connecting_transport, &c->connected); + start_connect(c); } } @@ -246,8 +256,8 @@ static void publish_transport(grpc_subchannel *c) { gpr_ref_init(&con->refs, 1); con->subchannel = c; grpc_channel_stack_init(c->filters, c->filter_count, c->args, c->mdctx, stk); - grpc_connected_channel_bind_transport(stk, c->connecting_transport); - c->connecting_transport = NULL; + grpc_connected_channel_bind_transport(stk, c->connecting_result.transport); + memset(&c->connecting_result, 0, sizeof(c->connecting_result)); gpr_mu_lock(&c->mu); GPR_ASSERT(c->active == NULL); @@ -262,7 +272,7 @@ static void publish_transport(grpc_subchannel *c) { static void subchannel_connected(void *arg, int iomgr_success) { grpc_subchannel *c = arg; - if (c->connecting_transport) { + if (c->connecting_result.transport) { publish_transport(c); } else { grpc_subchannel_unref(c); @@ -330,6 +340,11 @@ void grpc_subchannel_call_process_op(grpc_subchannel_call *call, } grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_op *initial_op) { - abort(); - return NULL; + grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); + grpc_subchannel_call *call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size); + grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call); + call->connection = con; + gpr_ref_init(&call->refs, 1); + grpc_call_stack_init(chanstk, NULL, initial_op, callstk); + return call; } |