aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/channel/client_channel.c25
-rw-r--r--src/core/client_config/subchannel.c17
-rw-r--r--src/core/client_config/subchannel.h22
3 files changed, 44 insertions, 20 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 8e7cb27cfd..ed73b042db 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -196,13 +196,12 @@ static int is_empty(void *p, int len) {
return 1;
}
-static void started_call(grpc_exec_ctx *exec_ctx, void *arg,
- int iomgr_success) {
+static void started_call_locked(grpc_exec_ctx *exec_ctx, void *arg,
+ int iomgr_success) {
call_data *calld = arg;
grpc_transport_stream_op op;
int have_waiting;
- gpr_mu_lock(&calld->mu_state);
if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) {
memset(&op, 0, sizeof(op));
op.cancel_with_status = GRPC_STATUS_CANCELLED;
@@ -230,10 +229,18 @@ static void started_call(grpc_exec_ctx *exec_ctx, void *arg,
}
}
+static void started_call(grpc_exec_ctx *exec_ctx, void *arg,
+ int iomgr_success) {
+ call_data *calld = arg;
+ gpr_mu_lock(&calld->mu_state);
+ started_call_locked(exec_ctx, arg, iomgr_success);
+}
+
static void picked_target(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
call_data *calld = arg;
grpc_pollset *pollset;
+ grpc_subchannel_call_create_status call_creation_status;
if (calld->picked_channel == NULL) {
/* treat this like a cancellation */
@@ -248,11 +255,15 @@ static void picked_target(grpc_exec_ctx *exec_ctx, void *arg,
GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK);
calld->state = CALL_WAITING_FOR_CALL;
pollset = calld->waiting_op.bind_pollset;
- gpr_mu_unlock(&calld->mu_state);
grpc_closure_init(&calld->async_setup_task, started_call, calld);
- grpc_subchannel_create_call(exec_ctx, calld->picked_channel, pollset,
- &calld->subchannel_call,
- &calld->async_setup_task);
+ call_creation_status = grpc_subchannel_create_call(
+ exec_ctx, calld->picked_channel, pollset, &calld->subchannel_call,
+ &calld->async_setup_task);
+ if (call_creation_status == GRPC_SUBCHANNEL_CALL_CREATE_READY) {
+ started_call_locked(exec_ctx, calld, iomgr_success);
+ } else {
+ gpr_mu_unlock(&calld->mu_state);
+ }
}
}
}
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index a2c521a20d..5e84dec0ca 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -335,18 +335,20 @@ static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
static void continue_creating_call(grpc_exec_ctx *exec_ctx, void *arg,
int iomgr_success) {
+ grpc_subchannel_call_create_status call_creation_status;
waiting_for_connect *w4c = arg;
grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, w4c->pollset);
- grpc_subchannel_create_call(exec_ctx, w4c->subchannel, w4c->pollset,
- w4c->target, w4c->notify);
+ call_creation_status = grpc_subchannel_create_call(
+ exec_ctx, w4c->subchannel, w4c->pollset, w4c->target, w4c->notify);
+ GPR_ASSERT(call_creation_status == GRPC_SUBCHANNEL_CALL_CREATE_READY);
+ w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, iomgr_success);
GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect");
gpr_free(w4c);
}
-void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
- grpc_pollset *pollset,
- grpc_subchannel_call **target,
- grpc_closure *notify) {
+grpc_subchannel_call_create_status grpc_subchannel_create_call(
+ grpc_exec_ctx *exec_ctx, grpc_subchannel *c, grpc_pollset *pollset,
+ grpc_subchannel_call **target, grpc_closure *notify) {
connection *con;
gpr_mu_lock(&c->mu);
if (c->active != NULL) {
@@ -355,7 +357,7 @@ void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
gpr_mu_unlock(&c->mu);
*target = create_call(exec_ctx, con);
- notify->cb(exec_ctx, notify->cb_arg, 1);
+ return GRPC_SUBCHANNEL_CALL_CREATE_READY;
} else {
waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c));
w4c->next = c->waiting;
@@ -380,6 +382,7 @@ void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
} else {
gpr_mu_unlock(&c->mu);
}
+ return GRPC_SUBCHANNEL_CALL_CREATE_PENDING;
}
}
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h
index 86b7fa5851..a26d08f02e 100644
--- a/src/core/client_config/subchannel.h
+++ b/src/core/client_config/subchannel.h
@@ -75,12 +75,22 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call *call
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
-/** construct a call (possibly asynchronously) */
-void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx,
- grpc_subchannel *subchannel,
- grpc_pollset *pollset,
- grpc_subchannel_call **target,
- grpc_closure *notify);
+typedef enum {
+ GRPC_SUBCHANNEL_CALL_CREATE_READY,
+ GRPC_SUBCHANNEL_CALL_CREATE_PENDING
+} grpc_subchannel_call_create_status;
+
+/** construct a subchannel call (possibly asynchronously).
+ *
+ * If the returned status is \a GRPC_SUBCHANNEL_CALL_CREATE_READY, the call will
+ * return immediately and \a target will point to a connected \a subchannel_call
+ * instance. Note that \a notify will \em not be invoked in this case.
+ * Otherwise, if the returned status is GRPC_SUBCHANNEL_CALL_CREATE_PENDING, the
+ * subchannel call will be created asynchronously, invoking the \a notify
+ * callback upon completion. */
+grpc_subchannel_call_create_status grpc_subchannel_create_call(
+ grpc_exec_ctx *exec_ctx, grpc_subchannel *subchannel, grpc_pollset *pollset,
+ grpc_subchannel_call **target, grpc_closure *notify);
/** process a transport level op */
void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx,