aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/lib/surface/call.c17
-rw-r--r--src/core/lib/surface/server.c3
-rw-r--r--src/proto/grpc/testing/echo_messages.proto1
3 files changed, 14 insertions, 7 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 3c563bcc6f..af32c5b378 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -112,7 +112,7 @@ static received_status unpack_received_status(gpr_atm atm) {
.error = (grpc_error *)(atm & ~(gpr_atm)1)};
}
-#define MAX_ERRORS_PER_BATCH 3
+#define MAX_ERRORS_PER_BATCH 4
typedef struct batch_control {
grpc_call *call;
@@ -254,7 +254,7 @@ static void set_status_from_error(grpc_exec_ctx *exec_ctx, grpc_call *call,
static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl);
static void post_batch_completion(grpc_exec_ctx *exec_ctx, batch_control *bctl);
static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl,
- grpc_error *error);
+ grpc_error *error, bool has_cancelled);
static void add_init_error(grpc_error **composite, grpc_error *new) {
if (new == GRPC_ERROR_NONE) return;
@@ -1223,6 +1223,11 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_call *call = bctl->call;
gpr_mu_lock(&bctl->call->mu);
if (error != GRPC_ERROR_NONE) {
+ if (call->receiving_stream != NULL) {
+ grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
+ call->receiving_stream = NULL;
+ }
+ add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), true);
cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE,
GRPC_ERROR_REF(error));
}
@@ -1289,10 +1294,10 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
}
static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl,
- grpc_error *error) {
+ grpc_error *error, bool has_cancelled) {
if (error == GRPC_ERROR_NONE) return;
int idx = (int)gpr_atm_no_barrier_fetch_add(&bctl->num_errors, 1);
- if (idx == 0) {
+ if (idx == 0 && !has_cancelled) {
cancel_with_error(exec_ctx, bctl->call, STATUS_FROM_CORE,
GRPC_ERROR_REF(error));
}
@@ -1306,7 +1311,7 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&call->mu);
- add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error));
+ add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
if (error == GRPC_ERROR_NONE) {
grpc_metadata_batch *md =
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
@@ -1343,7 +1348,7 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_error *error) {
batch_control *bctl = bctlp;
- add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error));
+ add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
finish_batch_step(exec_ctx, bctl);
}
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 7210c69fb0..fdb230b3e2 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -540,7 +540,8 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg,
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
grpc_schedule_on_exec_ctx);
- grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, error);
+ grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure,
+ GRPC_ERROR_REF(error));
return;
}
diff --git a/src/proto/grpc/testing/echo_messages.proto b/src/proto/grpc/testing/echo_messages.proto
index b405acf043..efb6f4d493 100644
--- a/src/proto/grpc/testing/echo_messages.proto
+++ b/src/proto/grpc/testing/echo_messages.proto
@@ -50,6 +50,7 @@ message RequestParams {
bool skip_cancelled_check = 9;
string expected_transport_security_type = 10;
DebugInfo debug_info = 11;
+ bool server_die = 12; // Server should not see a request with this set.
}
message EchoRequest {