aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Yang Gao <yangg@google.com>2017-02-27 12:40:04 -0800
committerGravatar GitHub <noreply@github.com>2017-02-27 12:40:04 -0800
commitdc720ca6bf27181c040cefcdb298d9dee8bf3058 (patch)
treef8cd06c6311c3f314c1c8d2781ddf64127805b1d /src/core
parent07e4c13a68d8bb54463eb693afd16e29aec22e28 (diff)
parent23f777df084f15df8d0aac3684c5f5c88399c44d (diff)
Merge pull request #9846 from yang-g/max_message_size
Stop receiving and add an error to batch when there is an error from lower level.
Diffstat (limited to 'src/core')
-rw-r--r--src/core/lib/surface/call.c17
-rw-r--r--src/core/lib/surface/server.c3
2 files changed, 13 insertions, 7 deletions
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 3c563bcc6f..af32c5b378 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -112,7 +112,7 @@ static received_status unpack_received_status(gpr_atm atm) {
.error = (grpc_error *)(atm & ~(gpr_atm)1)};
}
-#define MAX_ERRORS_PER_BATCH 3
+#define MAX_ERRORS_PER_BATCH 4
typedef struct batch_control {
grpc_call *call;
@@ -254,7 +254,7 @@ static void set_status_from_error(grpc_exec_ctx *exec_ctx, grpc_call *call,
static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl);
static void post_batch_completion(grpc_exec_ctx *exec_ctx, batch_control *bctl);
static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl,
- grpc_error *error);
+ grpc_error *error, bool has_cancelled);
static void add_init_error(grpc_error **composite, grpc_error *new) {
if (new == GRPC_ERROR_NONE) return;
@@ -1223,6 +1223,11 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_call *call = bctl->call;
gpr_mu_lock(&bctl->call->mu);
if (error != GRPC_ERROR_NONE) {
+ if (call->receiving_stream != NULL) {
+ grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
+ call->receiving_stream = NULL;
+ }
+ add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), true);
cancel_with_error(exec_ctx, call, STATUS_FROM_SURFACE,
GRPC_ERROR_REF(error));
}
@@ -1289,10 +1294,10 @@ static void validate_filtered_metadata(grpc_exec_ctx *exec_ctx,
}
static void add_batch_error(grpc_exec_ctx *exec_ctx, batch_control *bctl,
- grpc_error *error) {
+ grpc_error *error, bool has_cancelled) {
if (error == GRPC_ERROR_NONE) return;
int idx = (int)gpr_atm_no_barrier_fetch_add(&bctl->num_errors, 1);
- if (idx == 0) {
+ if (idx == 0 && !has_cancelled) {
cancel_with_error(exec_ctx, bctl->call, STATUS_FROM_CORE,
GRPC_ERROR_REF(error));
}
@@ -1306,7 +1311,7 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
gpr_mu_lock(&call->mu);
- add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error));
+ add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
if (error == GRPC_ERROR_NONE) {
grpc_metadata_batch *md =
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
@@ -1343,7 +1348,7 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp,
grpc_error *error) {
batch_control *bctl = bctlp;
- add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error));
+ add_batch_error(exec_ctx, bctl, GRPC_ERROR_REF(error), false);
finish_batch_step(exec_ctx, bctl);
}
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 7210c69fb0..fdb230b3e2 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -540,7 +540,8 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg,
&calld->kill_zombie_closure, kill_zombie,
grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0),
grpc_schedule_on_exec_ctx);
- grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure, error);
+ grpc_closure_sched(exec_ctx, &calld->kill_zombie_closure,
+ GRPC_ERROR_REF(error));
return;
}