aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/core/ext/census/grpc_filter.c14
-rw-r--r--src/core/ext/client_config/client_channel.c6
-rw-r--r--src/core/ext/client_config/subchannel.c7
-rw-r--r--src/core/ext/load_reporting/load_reporting_filter.c6
-rw-r--r--src/core/lib/channel/channel_stack.c19
-rw-r--r--src/core/lib/channel/channel_stack.h18
-rw-r--r--src/core/lib/channel/compress_filter.c7
-rw-r--r--src/core/lib/channel/connected_channel.c13
-rw-r--r--src/core/lib/channel/http_client_filter.c6
-rw-r--r--src/core/lib/channel/http_server_filter.c6
-rw-r--r--src/core/lib/security/transport/client_auth_filter.c6
-rw-r--r--src/core/lib/security/transport/server_auth_filter.c7
-rw-r--r--src/core/lib/surface/call.c9
-rw-r--r--src/core/lib/surface/lame_client.c7
-rw-r--r--src/core/lib/surface/server.c6
-rw-r--r--test/core/channel/channel_stack_test.c12
-rw-r--r--test/core/end2end/tests/filter_causes_close.c7
17 files changed, 100 insertions, 56 deletions
diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c
index 72e4e5427e..dd2e2e0124 100644
--- a/src/core/ext/census/grpc_filter.c
+++ b/src/core/ext/census/grpc_filter.c
@@ -124,13 +124,14 @@ static void server_start_transport_op(grpc_exec_ctx *exec_ctx,
grpc_call_next_op(exec_ctx, elem, op);
}
-static void client_init_call_elem(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_call_element_args *args) {
+static grpc_error* client_init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
call_data *d = elem->call_data;
GPR_ASSERT(d != NULL);
memset(d, 0, sizeof(*d));
d->start_ts = gpr_now(GPR_CLOCK_REALTIME);
+ return GRPC_ERROR_NONE;
}
static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx,
@@ -142,15 +143,16 @@ static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx,
/* TODO(hongyu): record rpc client stats and census_rpc_end_op here */
}
-static void server_init_call_elem(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_call_element_args *args) {
+static grpc_error* server_init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
call_data *d = elem->call_data;
GPR_ASSERT(d != NULL);
memset(d, 0, sizeof(*d));
d->start_ts = gpr_now(GPR_CLOCK_REALTIME);
/* TODO(hongyu): call census_tracing_start_op here. */
grpc_closure_init(&d->finish_recv, server_on_done_recv, elem);
+ return GRPC_ERROR_NONE;
}
static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index 1d5a7d5224..b0a09dab93 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -430,10 +430,12 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
}
/* Constructor for call_data */
-static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_call_element_args *args) {
+static grpc_error* init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
grpc_subchannel_call_holder_init(elem->call_data, cc_pick_subchannel, elem,
args->call_stack);
+ return GRPC_ERROR_NONE;
}
/* Destructor for call_data */
diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c
index 468067ea57..2e9cf7d7d4 100644
--- a/src/core/ext/client_config/subchannel.c
+++ b/src/core/ext/client_config/subchannel.c
@@ -709,8 +709,11 @@ grpc_subchannel_call *grpc_connected_subchannel_create_call(
grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call);
call->connection = con;
GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call");
- grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, call,
- NULL, NULL, callstk);
+ grpc_error* error = grpc_call_stack_init(exec_ctx, chanstk, 1,
+ subchannel_call_destroy, call,
+ NULL, NULL, callstk);
+// FIXME: handle error (probably requires changing this function's API)
+GPR_ASSERT(error == GRPC_ERROR_NONE);
grpc_call_stack_set_pollset_or_pollset_set(exec_ctx, callstk, pollent);
return call;
}
diff --git a/src/core/ext/load_reporting/load_reporting_filter.c b/src/core/ext/load_reporting/load_reporting_filter.c
index f372f88c3a..cfe752d6e2 100644
--- a/src/core/ext/load_reporting/load_reporting_filter.c
+++ b/src/core/ext/load_reporting/load_reporting_filter.c
@@ -56,10 +56,12 @@ static void invoke_lr_fn_locked(grpc_load_reporting_config *lrc,
}
/* Constructor for call_data */
-static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_call_element_args *args) {
+static grpc_error* init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
call_data *calld = elem->call_data;
memset(calld, 0, sizeof(call_data));
+ return GRPC_ERROR_NONE;
}
/* Destructor for call_data */
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
index bbba85d80b..0613f94c29 100644
--- a/src/core/lib/channel/channel_stack.c
+++ b/src/core/lib/channel/channel_stack.c
@@ -157,12 +157,13 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
}
}
-void grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
- grpc_channel_stack *channel_stack, int initial_refs,
- grpc_iomgr_cb_func destroy, void *destroy_arg,
- grpc_call_context_element *context,
- const void *transport_server_data,
- grpc_call_stack *call_stack) {
+grpc_error* grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack *channel_stack,
+ int initial_refs, grpc_iomgr_cb_func destroy,
+ void *destroy_arg,
+ grpc_call_context_element *context,
+ const void *transport_server_data,
+ grpc_call_stack *call_stack) {
grpc_channel_element *channel_elems = CHANNEL_ELEMS_FROM_STACK(channel_stack);
grpc_call_element_args args;
size_t count = channel_stack->count;
@@ -185,10 +186,14 @@ void grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
call_elems[i].filter = channel_elems[i].filter;
call_elems[i].channel_data = channel_elems[i].channel_data;
call_elems[i].call_data = user_data;
- call_elems[i].filter->init_call_elem(exec_ctx, &call_elems[i], &args);
+ grpc_error* error = call_elems[i].filter->init_call_elem(
+ exec_ctx, &call_elems[i], &args);
+ if (error != GRPC_ERROR_NONE)
+ return error;
user_data +=
ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data);
}
+ return GRPC_ERROR_NONE;
}
void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h
index 41dd4a0d8a..f56c654067 100644
--- a/src/core/lib/channel/channel_stack.h
+++ b/src/core/lib/channel/channel_stack.h
@@ -110,8 +110,9 @@ typedef struct {
on a client; if it is non-NULL, then it points to memory owned by the
transport and is on the server. Most filters want to ignore this
argument. */
- void (*init_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_call_element_args *args);
+ grpc_error* (*init_call_elem)(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args);
void (*set_pollset_or_pollset_set)(grpc_exec_ctx *exec_ctx,
grpc_call_element *elem,
grpc_polling_entity *pollent);
@@ -209,12 +210,13 @@ void grpc_channel_stack_destroy(grpc_exec_ctx *exec_ctx,
/* Initialize a call stack given a channel stack. transport_server_data is
expected to be NULL on a client, or an opaque transport owned pointer on the
server. */
-void grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
- grpc_channel_stack *channel_stack, int initial_refs,
- grpc_iomgr_cb_func destroy, void *destroy_arg,
- grpc_call_context_element *context,
- const void *transport_server_data,
- grpc_call_stack *call_stack);
+grpc_error* grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
+ grpc_channel_stack *channel_stack,
+ int initial_refs, grpc_iomgr_cb_func destroy,
+ void *destroy_arg,
+ grpc_call_context_element *context,
+ const void *transport_server_data,
+ grpc_call_stack *call_stack);
/* Set a pollset or a pollset_set for a call stack: must occur before the first
* op is started */
void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c
index 32ebe53ee6..bcc8e17a9d 100644
--- a/src/core/lib/channel/compress_filter.c
+++ b/src/core/lib/channel/compress_filter.c
@@ -256,8 +256,9 @@ static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
}
/* Constructor for call_data */
-static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_call_element_args *args) {
+static grpc_error* init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
@@ -266,6 +267,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
calld->has_compression_algorithm = 0;
grpc_closure_init(&calld->got_slice, got_slice, elem);
grpc_closure_init(&calld->send_done, send_done, elem);
+
+ return GRPC_ERROR_NONE;
}
/* Destructor for call_data */
diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c
index 0a7d27a1dc..8dde688df2 100644
--- a/src/core/lib/channel/connected_channel.c
+++ b/src/core/lib/channel/connected_channel.c
@@ -81,16 +81,17 @@ static void con_start_transport_op(grpc_exec_ctx *exec_ctx,
}
/* Constructor for call_data */
-static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_call_element_args *args) {
+static grpc_error* init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- int r;
-
- r = grpc_transport_init_stream(
+ int r = grpc_transport_init_stream(
exec_ctx, chand->transport, TRANSPORT_STREAM_FROM_CALL_DATA(calld),
&args->call_stack->refcount, args->server_transport_data);
- GPR_ASSERT(r == 0);
+ return r == 0
+ ? GRPC_ERROR_NONE
+ : GRPC_ERROR_CREATE("transport initialization failed");
}
static void set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c
index ab6c6c9ef0..30f4ed1f6f 100644
--- a/src/core/lib/channel/http_client_filter.c
+++ b/src/core/lib/channel/http_client_filter.c
@@ -169,11 +169,13 @@ static void hc_start_transport_op(grpc_exec_ctx *exec_ctx,
}
/* Constructor for call_data */
-static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_call_element_args *args) {
+static grpc_error* init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
call_data *calld = elem->call_data;
calld->on_done_recv = NULL;
grpc_closure_init(&calld->hc_on_recv, hc_on_recv, elem);
+ return GRPC_ERROR_NONE;
}
/* Destructor for call_data */
diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c
index d0beebd817..960d1ce45e 100644
--- a/src/core/lib/channel/http_server_filter.c
+++ b/src/core/lib/channel/http_server_filter.c
@@ -224,13 +224,15 @@ static void hs_start_transport_op(grpc_exec_ctx *exec_ctx,
}
/* Constructor for call_data */
-static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_call_element_args *args) {
+static grpc_error* init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
/* initialize members */
memset(calld, 0, sizeof(*calld));
grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem);
+ return GRPC_ERROR_NONE;
}
/* Destructor for call_data */
diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c
index 76be2acd72..6179b9e18c 100644
--- a/src/core/lib/security/transport/client_auth_filter.c
+++ b/src/core/lib/security/transport/client_auth_filter.c
@@ -264,10 +264,12 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx,
}
/* Constructor for call_data */
-static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_call_element_args *args) {
+static grpc_error* init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
call_data *calld = elem->call_data;
memset(calld, 0, sizeof(*calld));
+ return GRPC_ERROR_NONE;
}
static void set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c
index 12e789bde9..379247131b 100644
--- a/src/core/lib/security/transport/server_auth_filter.c
+++ b/src/core/lib/security/transport/server_auth_filter.c
@@ -199,8 +199,9 @@ static void auth_start_transport_op(grpc_exec_ctx *exec_ctx,
}
/* Constructor for call_data */
-static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_call_element_args *args) {
+static grpc_error* init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
/* grab pointers to our data from the call element */
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
@@ -222,6 +223,8 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
args->context[GRPC_CONTEXT_SECURITY].value = server_ctx;
args->context[GRPC_CONTEXT_SECURITY].destroy =
grpc_server_security_context_destroy;
+
+ return GRPC_ERROR_NONE;
}
/* Destructor for call_data */
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 04291b0ee0..d64ca64a15 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -262,9 +262,12 @@ grpc_call *grpc_call_create(
call->send_deadline = send_deadline;
GRPC_CHANNEL_INTERNAL_REF(channel, "call");
/* initial refcount dropped by grpc_call_destroy */
- grpc_call_stack_init(&exec_ctx, channel_stack, 1, destroy_call, call,
- call->context, server_transport_data,
- CALL_STACK_FROM_CALL(call));
+ grpc_error* error = grpc_call_stack_init(&exec_ctx, channel_stack, 1,
+ destroy_call, call, call->context,
+ server_transport_data,
+ CALL_STACK_FROM_CALL(call));
+// FIXME: handle error (probably requires changing this function's API)
+GPR_ASSERT(error == GRPC_ERROR_NONE);
if (cq != NULL) {
GPR_ASSERT(
pollset_set_alternative == NULL &&
diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c
index 5ea4cba5d1..7d9168f2e0 100644
--- a/src/core/lib/surface/lame_client.c
+++ b/src/core/lib/surface/lame_client.c
@@ -107,8 +107,11 @@ static void lame_start_transport_op(grpc_exec_ctx *exec_ctx,
GRPC_ERROR_UNREF(op->disconnect_with_error);
}
-static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_call_element_args *args) {}
+static grpc_error* init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
+ return GRPC_ERROR_NONE;
+}
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_stats *stats,
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index def6e5068b..0a0c224b4d 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -848,8 +848,9 @@ static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd,
}
}
-static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_call_element_args *args) {
+static grpc_error* init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
memset(calld, 0, sizeof(call_data));
@@ -861,6 +862,7 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
server_on_recv_initial_metadata, elem);
server_ref(chand->server);
+ return GRPC_ERROR_NONE;
}
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
diff --git a/test/core/channel/channel_stack_test.c b/test/core/channel/channel_stack_test.c
index f9561bed70..9c6a47eb52 100644
--- a/test/core/channel/channel_stack_test.c
+++ b/test/core/channel/channel_stack_test.c
@@ -53,10 +53,12 @@ static void channel_init_func(grpc_exec_ctx *exec_ctx,
*(int *)(elem->channel_data) = 0;
}
-static void call_init_func(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_call_element_args *args) {
+static grpc_error* call_init_func(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
++*(int *)(elem->channel_data);
*(int *)(elem->call_data) = 0;
+ return GRPC_ERROR_NONE;
}
static void channel_destroy_func(grpc_exec_ctx *exec_ctx,
@@ -132,8 +134,10 @@ static void test_create_channel_stack(void) {
GPR_ASSERT(*channel_data == 0);
call_stack = gpr_malloc(channel_stack->call_stack_size);
- grpc_call_stack_init(&exec_ctx, channel_stack, 1, free_call, call_stack, NULL,
- NULL, call_stack);
+ grpc_error* error = grpc_call_stack_init(&exec_ctx, channel_stack, 1,
+ free_call, call_stack, NULL, NULL,
+ call_stack);
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
GPR_ASSERT(call_stack->count == 1);
call_elem = grpc_call_stack_element(call_stack, 0);
GPR_ASSERT(call_elem->filter == channel_elem->filter);
diff --git a/test/core/end2end/tests/filter_causes_close.c b/test/core/end2end/tests/filter_causes_close.c
index 526c05ca3e..3ff0abed63 100644
--- a/test/core/end2end/tests/filter_causes_close.c
+++ b/test/core/end2end/tests/filter_causes_close.c
@@ -233,8 +233,11 @@ static void start_transport_stream_op(grpc_exec_ctx *exec_ctx,
grpc_call_next_op(exec_ctx, elem, op);
}
-static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_call_element_args *args) {}
+static grpc_error* init_call_elem(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_call_element_args *args) {
+ return GRPC_ERROR_NONE;
+}
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_stats *stats,