aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext/client_config
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/ext/client_config')
-rw-r--r--src/core/ext/client_config/client_channel.c6
-rw-r--r--src/core/ext/client_config/subchannel.c25
-rw-r--r--src/core/ext/client_config/subchannel.h4
-rw-r--r--src/core/ext/client_config/subchannel_call_holder.c38
4 files changed, 51 insertions, 22 deletions
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index 739487a06b..2c0c4abffc 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -436,10 +436,12 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
}
/* Constructor for call_data */
-static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_call_element_args *args) {
+static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
grpc_subchannel_call_holder_init(elem->call_data, cc_pick_subchannel, elem,
args->call_stack);
+ return GRPC_ERROR_NONE;
}
/* Destructor for call_data */
diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c
index d089cd4399..df35904b85 100644
--- a/src/core/ext/client_config/subchannel.c
+++ b/src/core/ext/client_config/subchannel.c
@@ -702,19 +702,26 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
return GET_CONNECTED_SUBCHANNEL(c, acq);
}
-grpc_subchannel_call *grpc_connected_subchannel_create_call(
+grpc_error *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
- grpc_polling_entity *pollent) {
+ grpc_polling_entity *pollent, grpc_subchannel_call **call) {
grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
- grpc_subchannel_call *call =
- gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
- grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call);
- call->connection = con;
+ *call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
+ grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
+ (*call)->connection = con; // Ref is added below.
+ grpc_error *error =
+ grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, *call,
+ NULL, NULL, callstk);
+ if (error != GRPC_ERROR_NONE) {
+ const char *error_string = grpc_error_string(error);
+ gpr_log(GPR_ERROR, "error: %s", error_string);
+ grpc_error_free_string(error_string);
+ gpr_free(*call);
+ return error;
+ }
GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call");
- grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, call,
- NULL, NULL, callstk);
grpc_call_stack_set_pollset_or_pollset_set(exec_ctx, callstk, pollent);
- return call;
+ return GRPC_ERROR_NONE;
}
grpc_call_stack *grpc_subchannel_call_get_call_stack(
diff --git a/src/core/ext/client_config/subchannel.h b/src/core/ext/client_config/subchannel.h
index 9fbf3b6123..40d90be124 100644
--- a/src/core/ext/client_config/subchannel.h
+++ b/src/core/ext/client_config/subchannel.h
@@ -108,9 +108,9 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
/** construct a subchannel call */
-grpc_subchannel_call *grpc_connected_subchannel_create_call(
+grpc_error *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel,
- grpc_polling_entity *pollent);
+ grpc_polling_entity *pollent, grpc_subchannel_call **subchannel_call);
/** process a transport level op */
void grpc_connected_subchannel_process_transport_op(
diff --git a/src/core/ext/client_config/subchannel_call_holder.c b/src/core/ext/client_config/subchannel_call_holder.c
index b96a0ad093..be6d054af4 100644
--- a/src/core/ext/client_config/subchannel_call_holder.c
+++ b/src/core/ext/client_config/subchannel_call_holder.c
@@ -84,6 +84,11 @@ void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx,
gpr_free(holder->waiting_ops);
}
+// The logic here is fairly complicated, due to (a) the fact that we
+// need to handle the case where we receive the send op before the
+// initial metadata op, and (b) the need for efficiency, especially in
+// the streaming case.
+// TODO(ctiller): Explain this more thoroughly.
void grpc_subchannel_call_holder_perform_op(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call_holder *holder,
grpc_transport_stream_op *op) {
@@ -121,7 +126,8 @@ retry:
}
/* if this is a cancellation, then we can raise our cancelled flag */
if (op->cancel_error != GRPC_ERROR_NONE) {
- if (!gpr_atm_rel_cas(&holder->subchannel_call, 0, 1)) {
+ if (!gpr_atm_rel_cas(&holder->subchannel_call, 0,
+ (gpr_atm)(uintptr_t)CANCELLED_CALL)) {
goto retry;
} else {
switch (holder->creation_phase) {
@@ -158,10 +164,17 @@ retry:
/* if we've got a subchannel, then let's ask it to create a call */
if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
holder->connected_subchannel != NULL) {
- gpr_atm_rel_store(
- &holder->subchannel_call,
- (gpr_atm)(uintptr_t)grpc_connected_subchannel_create_call(
- exec_ctx, holder->connected_subchannel, holder->pollent));
+ grpc_subchannel_call *subchannel_call = NULL;
+ grpc_error *error = grpc_connected_subchannel_create_call(
+ exec_ctx, holder->connected_subchannel, holder->pollent,
+ &subchannel_call);
+ if (error != GRPC_ERROR_NONE) {
+ subchannel_call = CANCELLED_CALL;
+ fail_locked(exec_ctx, holder, GRPC_ERROR_REF(error));
+ grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error);
+ }
+ gpr_atm_rel_store(&holder->subchannel_call,
+ (gpr_atm)(uintptr_t)subchannel_call);
retry_waiting_locked(exec_ctx, holder);
goto retry;
}
@@ -189,10 +202,17 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_ERROR_CREATE_REFERENCING(
"Cancelled before creating subchannel", &error, 1));
} else {
- gpr_atm_rel_store(
- &holder->subchannel_call,
- (gpr_atm)(uintptr_t)grpc_connected_subchannel_create_call(
- exec_ctx, holder->connected_subchannel, holder->pollent));
+ grpc_subchannel_call *subchannel_call = NULL;
+ grpc_error *new_error = grpc_connected_subchannel_create_call(
+ exec_ctx, holder->connected_subchannel, holder->pollent,
+ &subchannel_call);
+ if (new_error != GRPC_ERROR_NONE) {
+ new_error = grpc_error_add_child(new_error, error);
+ subchannel_call = CANCELLED_CALL;
+ fail_locked(exec_ctx, holder, new_error);
+ }
+ gpr_atm_rel_store(&holder->subchannel_call,
+ (gpr_atm)(uintptr_t)subchannel_call);
retry_waiting_locked(exec_ctx, holder);
}
gpr_mu_unlock(&holder->mu);