diff options
Diffstat (limited to 'src/core/client_config/subchannel.c')
-rw-r--r-- | src/core/client_config/subchannel.c | 77 |
1 files changed, 71 insertions, 6 deletions
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 037f0c0ab0..9637cf39fe 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -37,6 +37,13 @@ #include <grpc/support/alloc.h> +#include "src/core/channel/channel_args.h" + +typedef struct { + gpr_refcount refs; + grpc_subchannel *subchannel; +} connection; + struct grpc_subchannel { gpr_refcount refs; grpc_connector *connector; @@ -49,24 +56,45 @@ struct grpc_subchannel { /** address to connect to */ struct sockaddr *addr; size_t addr_len; + + /** set during connection */ + grpc_transport *connecting_transport; + + /** callback for connection finishing */ + grpc_iomgr_closure connected; + + /** mutex protecting remaining elements */ + gpr_mu mu; + + /** active connection */ + connection *active; + /** are we connecting */ + int connecting; + /** closures waiting for a connection */ + grpc_iomgr_closure *waiting; }; struct grpc_subchannel_call { - grpc_subchannel *subchannel; + connection *connection; gpr_refcount refs; }; #define SUBCHANNEL_CALL_TO_CALL_STACK(call) (((grpc_call_stack *)(call)) + 1) +static grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_op *initial_op); + /* * grpc_subchannel implementation */ -void grpc_subchannel_ref(grpc_subchannel *channel) { gpr_ref(&channel->refs); } +void grpc_subchannel_ref(grpc_subchannel *c) { gpr_ref(&c->refs); } -void grpc_subchannel_unref(grpc_subchannel *channel) { - if (gpr_unref(&channel->refs)) { - gpr_free(channel); +void grpc_subchannel_unref(grpc_subchannel *c) { + if (gpr_unref(&c->refs)) { + gpr_free(c->filters); + grpc_channel_args_destroy(c->args); + gpr_free(c->addr); + gpr_free(c); } } @@ -84,9 +112,39 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, c->addr = gpr_malloc(args->addr_len); memcpy(c->addr, args->addr, args->addr_len); c->addr_len = args->addr_len; + c->args = grpc_channel_args_copy(args->args); + gpr_mu_init(&c->mu); return c; } +void grpc_subchannel_create_call(grpc_subchannel *c, + grpc_mdctx *mdctx, + grpc_transport_stream_op *initial_op, + grpc_subchannel_call **target, + grpc_iomgr_closure *notify) { + connection *con; + gpr_mu_lock(&c->mu); + if (c->active != NULL) { + con = c->active; + gpr_ref(&con->refs); + gpr_mu_unlock(&c->mu); + + *target = create_call(con, initial_op); + notify->cb(notify->cb_arg, 1); + } else { + notify->next = c->waiting; + c->waiting = notify; + if (!c->connecting) { + c->connecting = 1; + gpr_mu_unlock(&c->mu); + + grpc_connector_connect(c->connector, c->args, mdctx, &c->connecting_transport, &c->connected); + } else { + gpr_mu_unlock(&c->mu); + } + } +} + /* * grpc_subchannel_call implementation */ @@ -98,7 +156,9 @@ void grpc_subchannel_call_ref(grpc_subchannel_call *call) { void grpc_subchannel_call_unref(grpc_subchannel_call *call) { if (gpr_unref(&call->refs)) { grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(call)); - grpc_subchannel_unref(call->subchannel); + if (gpr_unref(&call->connection->refs)) { + gpr_free(call->connection); + } gpr_free(call); } } @@ -109,3 +169,8 @@ void grpc_subchannel_call_process_op(grpc_subchannel_call *call, grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0); top_elem->filter->start_transport_stream_op(top_elem, op); } + +grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_op *initial_op) { + abort(); + return NULL; +} |