aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/ext/census/grpc_filter.c14
-rw-r--r--src/core/ext/client_config/client_channel.c6
-rw-r--r--src/core/ext/client_config/subchannel.c25
-rw-r--r--src/core/ext/client_config/subchannel.h4
-rw-r--r--src/core/ext/client_config/subchannel_call_holder.c38
-rw-r--r--src/core/ext/load_reporting/load_reporting_filter.c7
-rw-r--r--src/core/lib/channel/channel_stack.c25
-rw-r--r--src/core/lib/channel/channel_stack.h18
-rw-r--r--src/core/lib/channel/compress_filter.c7
-rw-r--r--src/core/lib/channel/connected_channel.c12
-rw-r--r--src/core/lib/channel/http_client_filter.c6
-rw-r--r--src/core/lib/channel/http_server_filter.c6
-rw-r--r--src/core/lib/iomgr/error.h2
-rw-r--r--src/core/lib/security/transport/client_auth_filter.c6
-rw-r--r--src/core/lib/security/transport/server_auth_filter.c7
-rw-r--r--src/core/lib/surface/call.c16
-rw-r--r--src/core/lib/surface/lame_client.c7
-rw-r--r--src/core/lib/surface/server.c6
18 files changed, 144 insertions, 68 deletions
diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c
index 3004a1fc97..9dacc17eb4 100644
--- a/src/core/ext/census/grpc_filter.c
+++ b/src/core/ext/census/grpc_filter.c
@@ -127,13 +127,14 @@ static void server_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_call_next_op(exec_ctx, elem, op);
}
-static void client_init_call_elem(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_call_element_args *args) {
+static grpc_error *client_init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
call_data *d = elem->call_data;
GPR_ASSERT(d != NULL);
memset(d, 0, sizeof(*d));
d->start_ts = gpr_now(GPR_CLOCK_REALTIME);
+ return GRPC_ERROR_NONE;
}
static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx,
@@ -145,15 +146,16 @@ static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx,
/* TODO(hongyu): record rpc client stats and census_rpc_end_op here */
}
-static void server_init_call_elem(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_call_element_args *args) {
+static grpc_error *server_init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
call_data *d = elem->call_data;
GPR_ASSERT(d != NULL);
memset(d, 0, sizeof(*d));
d->start_ts = gpr_now(GPR_CLOCK_REALTIME);
/* TODO(hongyu): call census_tracing_start_op here. */
grpc_closure_init(&d->finish_recv, server_on_done_recv, elem);
+ return GRPC_ERROR_NONE;
}
static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index 739487a06b..2c0c4abffc 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -436,10 +436,12 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
}
/* Constructor for call_data */
-static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_call_element_args *args) {
+static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
grpc_subchannel_call_holder_init(elem->call_data, cc_pick_subchannel, elem,
args->call_stack);
+ return GRPC_ERROR_NONE;
}
/* Destructor for call_data */
diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c
index d089cd4399..df35904b85 100644
--- a/src/core/ext/client_config/subchannel.c
+++ b/src/core/ext/client_config/subchannel.c
@@ -702,19 +702,26 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
return GET_CONNECTED_SUBCHANNEL(c, acq);
}
-grpc_subchannel_call *grpc_connected_subchannel_create_call(
+grpc_error *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
- grpc_polling_entity *pollent) {
+ grpc_polling_entity *pollent, grpc_subchannel_call **call) {
grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
- grpc_subchannel_call *call =
- gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
- grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call);
- call->connection = con;
+ *call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
+ grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call);
+ (*call)->connection = con; // Ref is added below.
+ grpc_error *error =
+ grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, *call,
+ NULL, NULL, callstk);
+ if (error != GRPC_ERROR_NONE) {
+ const char *error_string = grpc_error_string(error);
+ gpr_log(GPR_ERROR, "error: %s", error_string);
+ grpc_error_free_string(error_string);
+ gpr_free(*call);
+ return error;
+ }
GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call");
- grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, call,
- NULL, NULL, callstk);
grpc_call_stack_set_pollset_or_pollset_set(exec_ctx, callstk, pollent);
- return call;
+ return GRPC_ERROR_NONE;
}
grpc_call_stack *grpc_subchannel_call_get_call_stack(
diff --git a/src/core/ext/client_config/subchannel.h b/src/core/ext/client_config/subchannel.h
index b6d39f5dc5..ae1d96e640 100644
--- a/src/core/ext/client_config/subchannel.h
+++ b/src/core/ext/client_config/subchannel.h
@@ -108,9 +108,9 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
/** construct a subchannel call */
-grpc_subchannel_call *grpc_connected_subchannel_create_call(
+grpc_error *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel,
- grpc_polling_entity *pollent);
+ grpc_polling_entity *pollent, grpc_subchannel_call **subchannel_call);
/** process a transport level op */
void grpc_connected_subchannel_process_transport_op(
diff --git a/src/core/ext/client_config/subchannel_call_holder.c b/src/core/ext/client_config/subchannel_call_holder.c
index b96a0ad093..be6d054af4 100644
--- a/src/core/ext/client_config/subchannel_call_holder.c
+++ b/src/core/ext/client_config/subchannel_call_holder.c
@@ -84,6 +84,11 @@ void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx,
gpr_free(holder->waiting_ops);
}
+// The logic here is fairly complicated, due to (a) the fact that we
+// need to handle the case where we receive the send op before the
+// initial metadata op, and (b) the need for efficiency, especially in
+// the streaming case.
+// TODO(ctiller): Explain this more thoroughly.
void grpc_subchannel_call_holder_perform_op(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call_holder *holder,
grpc_transport_stream_op *op) {
@@ -121,7 +126,8 @@ retry:
}
/* if this is a cancellation, then we can raise our cancelled flag */
if (op->cancel_error != GRPC_ERROR_NONE) {
- if (!gpr_atm_rel_cas(&holder->subchannel_call, 0, 1)) {
+ if (!gpr_atm_rel_cas(&holder->subchannel_call, 0,
+ (gpr_atm)(uintptr_t)CANCELLED_CALL)) {
goto retry;
} else {
switch (holder->creation_phase) {
@@ -158,10 +164,17 @@ retry:
/* if we've got a subchannel, then let's ask it to create a call */
if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
holder->connected_subchannel != NULL) {
- gpr_atm_rel_store(
- &holder->subchannel_call,
- (gpr_atm)(uintptr_t)grpc_connected_subchannel_create_call(
- exec_ctx, holder->connected_subchannel, holder->pollent));
+ grpc_subchannel_call *subchannel_call = NULL;
+ grpc_error *error = grpc_connected_subchannel_create_call(
+ exec_ctx, holder->connected_subchannel, holder->pollent,
+ &subchannel_call);
+ if (error != GRPC_ERROR_NONE) {
+ subchannel_call = CANCELLED_CALL;
+ fail_locked(exec_ctx, holder, GRPC_ERROR_REF(error));
+ grpc_transport_stream_op_finish_with_failure(exec_ctx, op, error);
+ }
+ gpr_atm_rel_store(&holder->subchannel_call,
+ (gpr_atm)(uintptr_t)subchannel_call);
retry_waiting_locked(exec_ctx, holder);
goto retry;
}
@@ -189,10 +202,17 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_ERROR_CREATE_REFERENCING(
"Cancelled before creating subchannel", &error, 1));
} else {
- gpr_atm_rel_store(
- &holder->subchannel_call,
- (gpr_atm)(uintptr_t)grpc_connected_subchannel_create_call(
- exec_ctx, holder->connected_subchannel, holder->pollent));
+ grpc_subchannel_call *subchannel_call = NULL;
+ grpc_error *new_error = grpc_connected_subchannel_create_call(
+ exec_ctx, holder->connected_subchannel, holder->pollent,
+ &subchannel_call);
+ if (new_error != GRPC_ERROR_NONE) {
+ new_error = grpc_error_add_child(new_error, error);
+ subchannel_call = CANCELLED_CALL;
+ fail_locked(exec_ctx, holder, new_error);
+ }
+ gpr_atm_rel_store(&holder->subchannel_call,
+ (gpr_atm)(uintptr_t)subchannel_call);
retry_waiting_locked(exec_ctx, holder);
}
gpr_mu_unlock(&holder->mu);
diff --git a/src/core/ext/load_reporting/load_reporting_filter.c b/src/core/ext/load_reporting/load_reporting_filter.c
index 99b560ae27..394f0cb832 100644
--- a/src/core/ext/load_reporting/load_reporting_filter.c
+++ b/src/core/ext/load_reporting/load_reporting_filter.c
@@ -107,8 +107,9 @@ static void on_initial_md_ready(grpc_exec_ctx *exec_ctx, void *user_data,
}
/* Constructor for call_data */
-static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_call_element_args *args) {
+static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
call_data *calld = elem->call_data;
memset(calld, 0, sizeof(call_data));
@@ -125,6 +126,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
NULL,
NULL};
*/
+
+ return GRPC_ERROR_NONE;
}
/* Destructor for call_data */
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
index f9b7347b89..98f304f2da 100644
--- a/src/core/lib/channel/channel_stack.c
+++ b/src/core/lib/channel/channel_stack.c
@@ -157,12 +157,13 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
}
}
-void grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
- grpc_channel_stack *channel_stack, int initial_refs,
- grpc_iomgr_cb_func destroy, void *destroy_arg,
- grpc_call_context_element *context,
- const void *transport_server_data,
- grpc_call_stack *call_stack) {
+grpc_error *grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack *channel_stack,
+ int initial_refs, grpc_iomgr_cb_func destroy,
+ void *destroy_arg,
+ grpc_call_context_element *context,
+ const void *transport_server_data,
+ grpc_call_stack *call_stack) {
grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
grpc_call_element_args args;
size_t count = channel_stack->count;
@@ -178,6 +179,7 @@ void grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
ROUND_UP_TO_ALIGNMENT_SIZE(count * sizeof(grpc_call_element));
/* init per-filter data */
+ grpc_error *first_error = GRPC_ERROR_NONE;
for (i = 0; i < count; i++) {
args.call_stack = call_stack;
args.server_transport_data = transport_server_data;
@@ -185,10 +187,19 @@ void grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
call_elems[i].filter = channel_elems[i].filter;
call_elems[i].channel_data = channel_elems[i].channel_data;
call_elems[i].call_data = user_data;
- call_elems[i].filter->init_call_elem(exec_ctx, &call_elems[i], &args);
+ grpc_error *error =
+ call_elems[i].filter->init_call_elem(exec_ctx, &call_elems[i], &args);
+ if (error != GRPC_ERROR_NONE) {
+ if (first_error == GRPC_ERROR_NONE) {
+ first_error = error;
+ } else {
+ GRPC_ERROR_UNREF(error);
+ }
+ }
user_data +=
ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data);
}
+ return first_error;
}
void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h
index 19d18ccf93..ee68701456 100644
--- a/src/core/lib/channel/channel_stack.h
+++ b/src/core/lib/channel/channel_stack.h
@@ -115,8 +115,9 @@ typedef struct {
on a client; if it is non-NULL, then it points to memory owned by the
transport and is on the server. Most filters want to ignore this
argument. */
- void (*init_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_call_element_args *args);
+ grpc_error *(*init_call_elem)(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args);
void (*set_pollset_or_pollset_set)(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_polling_entity *pollent);
@@ -215,12 +216,13 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
/* Initialize a call stack given a channel stack. transport_server_data is
expected to be NULL on a client, or an opaque transport owned pointer on the
server. */
-void grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
- grpc_channel_stack *channel_stack, int initial_refs,
- grpc_iomgr_cb_func destroy, void *destroy_arg,
- grpc_call_context_element *context,
- const void *transport_server_data,
- grpc_call_stack *call_stack);
+grpc_error *grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack *channel_stack,
+ int initial_refs, grpc_iomgr_cb_func destroy,
+ void *destroy_arg,
+ grpc_call_context_element *context,
+ const void *transport_server_data,
+ grpc_call_stack *call_stack);
/* Set a pollset or a pollset_set for a call stack: must occur before the first
* op is started */
void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c
index af21ed794d..134180e619 100644
--- a/src/core/lib/channel/compress_filter.c
+++ b/src/core/lib/channel/compress_filter.c
@@ -256,8 +256,9 @@ static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
}
/* Constructor for call_data */
-static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_call_element_args *args) {
+static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
@@ -266,6 +267,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
calld->has_compression_algorithm = 0;
grpc_closure_init(&calld->got_slice, got_slice, elem);
grpc_closure_init(&calld->send_done, send_done, elem);
+
+ return GRPC_ERROR_NONE;
}
/* Destructor for call_data */
diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c
index 73714369cd..918379c845 100644
--- a/src/core/lib/channel/connected_channel.c
+++ b/src/core/lib/channel/connected_channel.c
@@ -81,16 +81,16 @@ static void con_start_transport_op(grpc_exec_ctx *exec_ctx,
}
/* Constructor for call_data */
-static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_call_element_args *args) {
+static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- int r;
-
- r = grpc_transport_init_stream(
+ int r = grpc_transport_init_stream(
exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld),
&args->call_stack->refcount, args->server_transport_data);
- GPR_ASSERT(r == 0);
+ return r == 0 ? GRPC_ERROR_NONE
+ : GRPC_ERROR_CREATE("transport stream initialization failed");
}
static void set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c
index f1ed22c0ad..a7a775cc53 100644
--- a/src/core/lib/channel/http_client_filter.c
+++ b/src/core/lib/channel/http_client_filter.c
@@ -175,11 +175,13 @@ static void hc_start_transport_op(grpc_exec_ctx *exec_ctx,
}
/* Constructor for call_data */
-static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_call_element_args *args) {
+static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
call_data *calld = elem->call_data;
calld->on_done_recv = NULL;
grpc_closure_init(&calld->hc_on_recv, hc_on_recv, elem);
+ return GRPC_ERROR_NONE;
}
/* Destructor for call_data */
diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c
index d52cc7d018..5ce51f9016 100644
--- a/src/core/lib/channel/http_server_filter.c
+++ b/src/core/lib/channel/http_server_filter.c
@@ -224,13 +224,15 @@ static void hs_start_transport_op(grpc_exec_ctx *exec_ctx,
}
/* Constructor for call_data */
-static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_call_element_args *args) {
+static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
/* initialize members */
memset(calld, 0, sizeof(*calld));
grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem);
+ return GRPC_ERROR_NONE;
}
/* Destructor for call_data */
diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h
index 13f898e31a..bc7781250e 100644
--- a/src/core/lib/iomgr/error.h
+++ b/src/core/lib/iomgr/error.h
@@ -171,6 +171,8 @@ grpc_error *grpc_error_set_time(grpc_error *src, grpc_error_times which,
gpr_timespec value) GRPC_MUST_USE_RESULT;
grpc_error *grpc_error_set_str(grpc_error *src, grpc_error_strs which,
const char *value) GRPC_MUST_USE_RESULT;
+/// Returns NULL if the specified string is not set.
+/// Caller does NOT own return value.
const char *grpc_error_get_str(grpc_error *error, grpc_error_strs which);
/// Add a child error: an error that is believed to have contributed to this
/// error occurring. Allows root causing high level errors from lower level
diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c
index 382d30756a..2a1bf4d4e3 100644
--- a/src/core/lib/security/transport/client_auth_filter.c
+++ b/src/core/lib/security/transport/client_auth_filter.c
@@ -267,10 +267,12 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx,
}
/* Constructor for call_data */
-static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_call_element_args *args) {
+static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
call_data *calld = elem->call_data;
memset(calld, 0, sizeof(*calld));
+ return GRPC_ERROR_NONE;
}
static void set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c
index 5f3d0dcd6e..def16c8229 100644
--- a/src/core/lib/security/transport/server_auth_filter.c
+++ b/src/core/lib/security/transport/server_auth_filter.c
@@ -199,8 +199,9 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx,
}
/* Constructor for call_data */
-static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_call_element_args *args) {
+static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
@@ -222,6 +223,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
args->context[GRPC_CONTEXT_SECURITY].value = server_ctx;
args->context[GRPC_CONTEXT_SECURITY].destroy =
grpc_server_security_context_destroy;
+
+ return GRPC_ERROR_NONE;
}
/* Destructor for call_data */
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 59295f47f0..772681109a 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -264,9 +264,19 @@ grpc_call *grpc_call_create(
gpr_convert_clock_type(send_deadline, GPR_CLOCK_MONOTONIC);
GRPC_CHANNEL_INTERNAL_REF(channel, "call");
/* initial refcount dropped by grpc_call_destroy */
- grpc_call_stack_init(&exec_ctx, channel_stack, 1, destroy_call, call,
- call->context, server_transport_data,
- CALL_STACK_FROM_CALL(call));
+ grpc_error *error = grpc_call_stack_init(
+ &exec_ctx, channel_stack, 1, destroy_call, call, call->context,
+ server_transport_data, CALL_STACK_FROM_CALL(call));
+ if (error != GRPC_ERROR_NONE) {
+ intptr_t status;
+ if (!grpc_error_get_int(error, GRPC_ERROR_INT_GRPC_STATUS, &status))
+ status = GRPC_STATUS_UNKNOWN;
+ const char *error_str =
+ grpc_error_get_str(error, GRPC_ERROR_STR_DESCRIPTION);
+ close_with_status(&exec_ctx, call, (grpc_status_code)status,
+ error_str == NULL ? "unknown error" : error_str);
+ GRPC_ERROR_UNREF(error);
+ }
if (cq != NULL) {
GPR_ASSERT(
pollset_set_alternative == NULL &&
diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c
index 0d3168e56a..19b78369dd 100644
--- a/src/core/lib/surface/lame_client.c
+++ b/src/core/lib/surface/lame_client.c
@@ -107,8 +107,11 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_UNREF(op->disconnect_with_error);
}
-static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_call_element_args *args) {}
+static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
+ return GRPC_ERROR_NONE;
+}
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_final_info *final_info,
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index a482ba43d8..1d61ec3bbb 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -856,8 +856,9 @@ static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd,
}
}
-static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_call_element_args *args) {
+static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
memset(calld, 0, sizeof(call_data));
@@ -869,6 +870,7 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
server_on_recv_initial_metadata, elem);
server_ref(chand->server);
+ return GRPC_ERROR_NONE;
}
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,