aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/client_config/subchannel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/client_config/subchannel.c')
-rw-r--r--src/core/client_config/subchannel.c77
1 files changed, 71 insertions, 6 deletions
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 037f0c0ab0..9637cf39fe 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -37,6 +37,13 @@
#include <grpc/support/alloc.h>
+#include "src/core/channel/channel_args.h"
+
+typedef struct {
+ gpr_refcount refs;
+ grpc_subchannel *subchannel;
+} connection;
+
struct grpc_subchannel {
gpr_refcount refs;
grpc_connector *connector;
@@ -49,24 +56,45 @@ struct grpc_subchannel {
/** address to connect to */
struct sockaddr *addr;
size_t addr_len;
+
+ /** set during connection */
+ grpc_transport *connecting_transport;
+
+ /** callback for connection finishing */
+ grpc_iomgr_closure connected;
+
+ /** mutex protecting remaining elements */
+ gpr_mu mu;
+
+ /** active connection */
+ connection *active;
+ /** are we connecting */
+ int connecting;
+ /** closures waiting for a connection */
+ grpc_iomgr_closure *waiting;
};
struct grpc_subchannel_call {
- grpc_subchannel *subchannel;
+ connection *connection;
gpr_refcount refs;
};
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) (((grpc_call_stack *)(call)) + 1)
+static grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_op *initial_op);
+
/*
* grpc_subchannel implementation
*/
-void grpc_subchannel_ref(grpc_subchannel *channel) { gpr_ref(&channel->refs); }
+void grpc_subchannel_ref(grpc_subchannel *c) { gpr_ref(&c->refs); }
-void grpc_subchannel_unref(grpc_subchannel *channel) {
- if (gpr_unref(&channel->refs)) {
- gpr_free(channel);
+void grpc_subchannel_unref(grpc_subchannel *c) {
+ if (gpr_unref(&c->refs)) {
+ gpr_free(c->filters);
+ grpc_channel_args_destroy(c->args);
+ gpr_free(c->addr);
+ gpr_free(c);
}
}
@@ -84,9 +112,39 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
c->addr = gpr_malloc(args->addr_len);
memcpy(c->addr, args->addr, args->addr_len);
c->addr_len = args->addr_len;
+ c->args = grpc_channel_args_copy(args->args);
+ gpr_mu_init(&c->mu);
return c;
}
+void grpc_subchannel_create_call(grpc_subchannel *c,
+ grpc_mdctx *mdctx,
+ grpc_transport_stream_op *initial_op,
+ grpc_subchannel_call **target,
+ grpc_iomgr_closure *notify) {
+ connection *con;
+ gpr_mu_lock(&c->mu);
+ if (c->active != NULL) {
+ con = c->active;
+ gpr_ref(&con->refs);
+ gpr_mu_unlock(&c->mu);
+
+ *target = create_call(con, initial_op);
+ notify->cb(notify->cb_arg, 1);
+ } else {
+ notify->next = c->waiting;
+ c->waiting = notify;
+ if (!c->connecting) {
+ c->connecting = 1;
+ gpr_mu_unlock(&c->mu);
+
+ grpc_connector_connect(c->connector, c->args, mdctx, &c->connecting_transport, &c->connected);
+ } else {
+ gpr_mu_unlock(&c->mu);
+ }
+ }
+}
+
/*
* grpc_subchannel_call implementation
*/
@@ -98,7 +156,9 @@ void grpc_subchannel_call_ref(grpc_subchannel_call *call) {
void grpc_subchannel_call_unref(grpc_subchannel_call *call) {
if (gpr_unref(&call->refs)) {
grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(call));
- grpc_subchannel_unref(call->subchannel);
+ if (gpr_unref(&call->connection->refs)) {
+ gpr_free(call->connection);
+ }
gpr_free(call);
}
}
@@ -109,3 +169,8 @@ void grpc_subchannel_call_process_op(grpc_subchannel_call *call,
grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
top_elem->filter->start_transport_stream_op(top_elem, op);
}
+
+grpc_subchannel_call *create_call(connection *con, grpc_transport_stream_op *initial_op) {
+ abort();
+ return NULL;
+}