diff options
-rw-r--r-- | src/core/channel/client_channel.c | 25 | ||||
-rw-r--r-- | src/core/client_config/subchannel.c | 17 | ||||
-rw-r--r-- | src/core/client_config/subchannel.h | 22 |
3 files changed, 44 insertions, 20 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 8e7cb27cfd..ed73b042db 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -196,13 +196,12 @@ static int is_empty(void *p, int len) { return 1; } -static void started_call(grpc_exec_ctx *exec_ctx, void *arg, - int iomgr_success) { +static void started_call_locked(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { call_data *calld = arg; grpc_transport_stream_op op; int have_waiting; - gpr_mu_lock(&calld->mu_state); if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) { memset(&op, 0, sizeof(op)); op.cancel_with_status = GRPC_STATUS_CANCELLED; @@ -230,10 +229,18 @@ static void started_call(grpc_exec_ctx *exec_ctx, void *arg, } } +static void started_call(grpc_exec_ctx *exec_ctx, void *arg, + int iomgr_success) { + call_data *calld = arg; + gpr_mu_lock(&calld->mu_state); + started_call_locked(exec_ctx, arg, iomgr_success); +} + static void picked_target(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { call_data *calld = arg; grpc_pollset *pollset; + grpc_subchannel_call_create_status call_creation_status; if (calld->picked_channel == NULL) { /* treat this like a cancellation */ @@ -248,11 +255,15 @@ static void picked_target(grpc_exec_ctx *exec_ctx, void *arg, GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK); calld->state = CALL_WAITING_FOR_CALL; pollset = calld->waiting_op.bind_pollset; - gpr_mu_unlock(&calld->mu_state); grpc_closure_init(&calld->async_setup_task, started_call, calld); - grpc_subchannel_create_call(exec_ctx, calld->picked_channel, pollset, - &calld->subchannel_call, - &calld->async_setup_task); + call_creation_status = grpc_subchannel_create_call( + exec_ctx, calld->picked_channel, pollset, &calld->subchannel_call, + &calld->async_setup_task); + if (call_creation_status == GRPC_SUBCHANNEL_CALL_CREATE_READY) { + started_call_locked(exec_ctx, calld, iomgr_success); + } else { + gpr_mu_unlock(&calld->mu_state); + } } } } diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index a2c521a20d..5e84dec0ca 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -335,18 +335,20 @@ static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { static void continue_creating_call(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) { + grpc_subchannel_call_create_status call_creation_status; waiting_for_connect *w4c = arg; grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, w4c->pollset); - grpc_subchannel_create_call(exec_ctx, w4c->subchannel, w4c->pollset, - w4c->target, w4c->notify); + call_creation_status = grpc_subchannel_create_call( + exec_ctx, w4c->subchannel, w4c->pollset, w4c->target, w4c->notify); + GPR_ASSERT(call_creation_status == GRPC_SUBCHANNEL_CALL_CREATE_READY); + w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, iomgr_success); GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect"); gpr_free(w4c); } -void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c, - grpc_pollset *pollset, - grpc_subchannel_call **target, - grpc_closure *notify) { +grpc_subchannel_call_create_status grpc_subchannel_create_call( + grpc_exec_ctx *exec_ctx, grpc_subchannel *c, grpc_pollset *pollset, + grpc_subchannel_call **target, grpc_closure *notify) { connection *con; gpr_mu_lock(&c->mu); if (c->active != NULL) { @@ -355,7 +357,7 @@ void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c, gpr_mu_unlock(&c->mu); *target = create_call(exec_ctx, con); - notify->cb(exec_ctx, notify->cb_arg, 1); + return GRPC_SUBCHANNEL_CALL_CREATE_READY; } else { waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c)); w4c->next = c->waiting; @@ -380,6 +382,7 @@ void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c, } else { gpr_mu_unlock(&c->mu); } + return GRPC_SUBCHANNEL_CALL_CREATE_PENDING; } } diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 86b7fa5851..a26d08f02e 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -75,12 +75,22 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -/** construct a call (possibly asynchronously) */ -void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, - grpc_subchannel *subchannel, - grpc_pollset *pollset, - grpc_subchannel_call **target, - grpc_closure *notify); +typedef enum { + GRPC_SUBCHANNEL_CALL_CREATE_READY, + GRPC_SUBCHANNEL_CALL_CREATE_PENDING +} grpc_subchannel_call_create_status; + +/** construct a subchannel call (possibly asynchronously). + * + * If the returned status is \a GRPC_SUBCHANNEL_CALL_CREATE_READY, the call will + * return immediately and \a target will point to a connected \a subchannel_call + * instance. Note that \a notify will \em not be invoked in this case. + * Otherwise, if the returned status is GRPC_SUBCHANNEL_CALL_CREATE_PENDING, the + * subchannel call will be created asynchronously, invoking the \a notify + * callback upon completion. */ +grpc_subchannel_call_create_status grpc_subchannel_create_call( + grpc_exec_ctx *exec_ctx, grpc_subchannel *subchannel, grpc_pollset *pollset, + grpc_subchannel_call **target, grpc_closure *notify); /** process a transport level op */ void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx, |