diff options
-rw-r--r-- | src/core/lib/channel/message_size_filter.c | 14 | ||||
-rw-r--r-- | src/core/lib/surface/call.c | 15 | ||||
-rw-r--r-- | test/core/end2end/tests/max_message_length.c | 165 |
3 files changed, 179 insertions, 15 deletions
diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c index 02fc68fc3a..f067a3a51c 100644 --- a/src/core/lib/channel/message_size_filter.c +++ b/src/core/lib/channel/message_size_filter.c @@ -73,16 +73,22 @@ static void recv_message_ready(grpc_exec_ctx* exec_ctx, void* user_data, gpr_asprintf(&message_string, "Received message larger than max (%u vs. %d)", (*calld->recv_message)->length, chand->max_recv_size); - gpr_slice message = gpr_slice_from_copied_string(message_string); + grpc_error* new_error = grpc_error_set_int( + GRPC_ERROR_CREATE(message_string), GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_INVALID_ARGUMENT); + if (error == GRPC_ERROR_NONE) { + error = new_error; + } else { + error = grpc_error_add_child(error, new_error); + GRPC_ERROR_UNREF(new_error); + } gpr_free(message_string); - grpc_call_element_send_close_with_message( - exec_ctx, elem, GRPC_STATUS_INVALID_ARGUMENT, &message); } // Invoke the next callback. grpc_exec_ctx_sched(exec_ctx, calld->next_recv_message_ready, error, NULL); } -// Start transport op. +// Start transport stream op. static void start_transport_stream_op(grpc_exec_ctx* exec_ctx, grpc_call_element* elem, grpc_transport_stream_op* op) { diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index 5690bcab1e..b0f66f4f61 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -1103,8 +1103,8 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp, } } -static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl, - bool success) { +static void process_data_after_md(grpc_exec_ctx *exec_ctx, + batch_control *bctl) { grpc_call *call = bctl->call; if (call->receiving_stream == NULL) { *call->receiving_buffer = NULL; @@ -1124,8 +1124,6 @@ static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl, grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready, bctl); continue_receiving_slices(exec_ctx, bctl); - /* early out */ - return; } } @@ -1133,12 +1131,17 @@ static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp, grpc_error *error) { batch_control *bctl = bctlp; grpc_call *call = bctl->call; - + if (error != GRPC_ERROR_NONE) { + grpc_status_code status; + const char *msg; + grpc_error_get_status(error, &status, &msg); + close_with_status(exec_ctx, call, status, msg); + } gpr_mu_lock(&bctl->call->mu); if (bctl->call->has_initial_md_been_received || error != GRPC_ERROR_NONE || call->receiving_stream == NULL) { gpr_mu_unlock(&bctl->call->mu); - process_data_after_md(exec_ctx, bctlp, error); + process_data_after_md(exec_ctx, bctlp); } else { call->saved_receiving_stream_ready_bctlp = bctlp; gpr_mu_unlock(&bctl->call->mu); diff --git a/test/core/end2end/tests/max_message_length.c b/test/core/end2end/tests/max_message_length.c index cdca3e6748..d27ccedb4e 100644 --- a/test/core/end2end/tests/max_message_length.c +++ b/test/core/end2end/tests/max_message_length.c @@ -98,9 +98,12 @@ static void end_test(grpc_end2end_test_fixture *f) { grpc_completion_queue_destroy(f->cq); } -static void test_max_message_length(grpc_end2end_test_config config, - bool send_limit) { - gpr_log(GPR_INFO, "testing with send_limit=%d", send_limit); +// Test with request larger than the limit. +// If send_limit is true, applies send limit on client; otherwise, applies +// recv limit on server. +static void test_max_message_length_on_request(grpc_end2end_test_config config, + bool send_limit) { + gpr_log(GPR_INFO, "testing request with send_limit=%d", send_limit); grpc_end2end_test_fixture f; grpc_arg channel_arg; @@ -239,9 +242,161 @@ done: config.tear_down_data(&f); } +// Test with response larger than the limit. +// If send_limit is true, applies send limit on server; otherwise, applies +// recv limit on client. +static void test_max_message_length_on_response(grpc_end2end_test_config config, + bool send_limit) { + gpr_log(GPR_INFO, "testing response with send_limit=%d", send_limit); + + grpc_end2end_test_fixture f; + grpc_arg channel_arg; + grpc_channel_args channel_args; + grpc_call *c = NULL; + grpc_call *s = NULL; + cq_verifier *cqv; + grpc_op ops[6]; + grpc_op *op; + gpr_slice response_payload_slice = + gpr_slice_from_copied_string("hello world"); + grpc_byte_buffer *response_payload = + grpc_raw_byte_buffer_create(&response_payload_slice, 1); + grpc_byte_buffer *recv_payload = NULL; + grpc_metadata_array initial_metadata_recv; + grpc_metadata_array trailing_metadata_recv; + grpc_metadata_array request_metadata_recv; + grpc_call_details call_details; + grpc_status_code status; + grpc_call_error error; + char *details = NULL; + size_t details_capacity = 0; + int was_cancelled = 2; + + channel_arg.key = send_limit ? GRPC_ARG_MAX_SEND_MESSAGE_LENGTH + : GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH; + channel_arg.type = GRPC_ARG_INTEGER; + channel_arg.value.integer = 5; + + channel_args.num_args = 1; + channel_args.args = &channel_arg; + + f = begin_test(config, "test_max_message_length", + send_limit ? NULL : &channel_args, + send_limit ? &channel_args : NULL); + cqv = cq_verifier_create(f.cq); + + c = grpc_channel_create_call(f.client, NULL, GRPC_PROPAGATE_DEFAULTS, f.cq, + "/foo", "foo.test.google.fr:1234", + gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + GPR_ASSERT(c); + + grpc_metadata_array_init(&initial_metadata_recv); + grpc_metadata_array_init(&trailing_metadata_recv); + grpc_metadata_array_init(&request_metadata_recv); + grpc_call_details_init(&call_details); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message = &recv_payload; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; + op->data.recv_status_on_client.status = &status; + op->data.recv_status_on_client.status_details = &details; + op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(1), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + error = + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.cq, f.cq, tag(101)); + GPR_ASSERT(GRPC_CALL_OK == error); + CQ_EXPECT_COMPLETION(cqv, tag(101), 1); + cq_verify(cqv); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; + op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message = response_payload; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; + op->data.send_status_from_server.trailing_metadata_count = 0; + op->data.send_status_from_server.status = GRPC_STATUS_OK; + op->data.send_status_from_server.status_details = "xyz"; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + CQ_EXPECT_COMPLETION(cqv, tag(102), 1); + CQ_EXPECT_COMPLETION(cqv, tag(1), 1); + cq_verify(cqv); + + GPR_ASSERT(0 == strcmp(call_details.method, "/foo")); + GPR_ASSERT(0 == strcmp(call_details.host, "foo.test.google.fr:1234")); + GPR_ASSERT(was_cancelled == 0); + + GPR_ASSERT(status == GRPC_STATUS_INVALID_ARGUMENT); + GPR_ASSERT(strcmp(details, + send_limit + ? "Sent message larger than max (11 vs. 5)" + : "Received message larger than max (11 vs. 5)") == 0); + + gpr_free(details); + grpc_metadata_array_destroy(&initial_metadata_recv); + grpc_metadata_array_destroy(&trailing_metadata_recv); + grpc_metadata_array_destroy(&request_metadata_recv); + grpc_call_details_destroy(&call_details); + grpc_byte_buffer_destroy(response_payload); + grpc_byte_buffer_destroy(recv_payload); + + grpc_call_destroy(c); + if (s != NULL) grpc_call_destroy(s); + + cq_verifier_destroy(cqv); + + end_test(&f); + config.tear_down_data(&f); +} + void max_message_length(grpc_end2end_test_config config) { - test_max_message_length(config, true); - test_max_message_length(config, false); + test_max_message_length_on_request(config, false /* send_limit */); + test_max_message_length_on_request(config, true /* send_limit */); + test_max_message_length_on_response(config, false /* send_limit */); + test_max_message_length_on_response(config, true /* send_limit */); } void max_message_length_pre_init(void) {} |