diff options
-rw-r--r-- | src/core/ext/transport/cronet/transport/cronet_api_dummy.c | 3 | ||||
-rw-r--r-- | src/core/ext/transport/cronet/transport/cronet_transport.c | 146 | ||||
-rw-r--r-- | src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.m | 11 | ||||
-rw-r--r-- | src/objective-c/tests/Podfile | 1 | ||||
-rw-r--r-- | src/objective-c/tests/Tests.xcodeproj/project.pbxproj | 1 | ||||
-rwxr-xr-x | test/core/end2end/gen_build_yaml.py | 2 | ||||
-rw-r--r-- | test/core/end2end/tests/packet_coalescing.c | 284 | ||||
-rw-r--r-- | third_party/objective_c/Cronet/cronet_c_for_grpc.h | 115 |
8 files changed, 482 insertions, 81 deletions
diff --git a/src/core/ext/transport/cronet/transport/cronet_api_dummy.c b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c index 687026c9fd..38755604b9 100644 --- a/src/core/ext/transport/cronet/transport/cronet_api_dummy.c +++ b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c @@ -77,9 +77,8 @@ int cronet_bidirectional_stream_write(cronet_bidirectional_stream* stream, return 0; } -int cronet_bidirectional_stream_cancel(cronet_bidirectional_stream* stream) { +void cronet_bidirectional_stream_cancel(cronet_bidirectional_stream* stream) { GPR_ASSERT(0); - return 0; } #endif /* GRPC_COMPILE_WITH_CRONET */ diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 296c406324..96ff6ef772 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -86,7 +86,7 @@ enum e_op_id { /* Cronet callbacks. See cronet_c_for_grpc.h for documentation for each. */ -static void on_request_headers_sent(cronet_bidirectional_stream *); +static void on_stream_ready(cronet_bidirectional_stream *); static void on_response_headers_received( cronet_bidirectional_stream *, const cronet_bidirectional_stream_header_array *, const char *); @@ -99,7 +99,7 @@ static void on_succeeded(cronet_bidirectional_stream *); static void on_failed(cronet_bidirectional_stream *, int); static void on_canceled(cronet_bidirectional_stream *); static cronet_bidirectional_stream_callback cronet_callbacks = { - on_request_headers_sent, + on_stream_ready, on_response_headers_received, on_read_completed, on_write_completed, @@ -151,6 +151,11 @@ struct op_state { bool state_callback_received[OP_NUM_OPS]; bool fail_state; bool flush_read; +#ifdef GRPC_CRONET_WITH_PACKET_COALESCING + bool flush_cronet_when_ready; + bool pending_write_for_trailer; +#endif + bool unprocessed_send_message; grpc_error *cancel_error; /* data structure for storing data coming from server */ struct read_state rs; @@ -273,6 +278,9 @@ static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op *op) { new_op->next = storage->head; storage->head = new_op; storage->num_pending_ops++; + if (op->send_message) { + s->state.unprocessed_send_message = true; + } CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op, storage->num_pending_ops); gpr_mu_unlock(&s->mu); @@ -405,8 +413,8 @@ static void on_succeeded(cronet_bidirectional_stream *stream) { /* Cronet callback */ -static void on_request_headers_sent(cronet_bidirectional_stream *stream) { - CRONET_LOG(GPR_DEBUG, "W: on_request_headers_sent(%p)", stream); +static void on_stream_ready(cronet_bidirectional_stream *stream) { + CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream); stream_obj *s = (stream_obj *)stream->annotation; gpr_mu_lock(&s->mu); s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true; @@ -416,6 +424,14 @@ static void on_request_headers_sent(cronet_bidirectional_stream *stream) { gpr_free(s->header_array.headers); s->header_array.headers = NULL; } +/* Send the initial metadata on wire if there is no SEND_MESSAGE or + * SEND_TRAILING_METADATA ops pending */ +#ifdef GRPC_CRONET_WITH_PACKET_COALESCING + if (s->state.flush_cronet_when_ready) { + CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs); + cronet_bidirectional_stream_flush(stream); + } +#endif gpr_mu_unlock(&s->mu); execute_from_storage(s); } @@ -554,6 +570,10 @@ static void on_response_trailers_received( CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, 0)", s->cbs); s->state.state_callback_received[OP_SEND_MESSAGE] = false; cronet_bidirectional_stream_write(s->cbs, "", 0, true); +#ifdef GRPC_CRONET_WITH_PACKET_COALESCING + CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs); + cronet_bidirectional_stream_flush(s->cbs); +#endif s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true; gpr_mu_unlock(&s->mu); @@ -603,7 +623,7 @@ static void convert_metadata_to_cronet_headers( curr = curr->next; num_headers_available++; } - /* Allocate enough memory. It is freed in the on_request_headers_sent callback + /* Allocate enough memory. It is freed in the on_stream_ready callback */ cronet_bidirectional_stream_header *headers = (cronet_bidirectional_stream_header *)gpr_malloc( @@ -745,12 +765,16 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op, else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) result = false; /* we haven't sent message yet */ - else if (curr_op->send_message && + else if (stream_state->unprocessed_send_message && !stream_state->state_op_done[OP_SEND_MESSAGE]) result = false; /* we haven't got on_write_completed for the send yet */ else if (stream_state->state_op_done[OP_SEND_MESSAGE] && - !stream_state->state_callback_received[OP_SEND_MESSAGE]) + !stream_state->state_callback_received[OP_SEND_MESSAGE] +#ifdef GRPC_CRONET_WITH_PACKET_COALESCING + && !stream_state->pending_write_for_trailer +#endif + ) result = false; } else if (op_id == OP_CANCEL_ERROR) { /* already executed */ @@ -836,6 +860,10 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, s->cbs = cronet_bidirectional_stream_create(s->curr_ct.engine, s->curr_gs, &cronet_callbacks); CRONET_LOG(GPR_DEBUG, "%p = cronet_bidirectional_stream_create()", s->cbs); +#ifdef GRPC_CRONET_WITH_PACKET_COALESCING + cronet_bidirectional_stream_disable_auto_flush(s->cbs, true); + cronet_bidirectional_stream_delay_request_headers_until_flush(s->cbs, true); +#endif char *url = NULL; const char *method = "POST"; s->header_array.headers = NULL; @@ -848,30 +876,17 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, cronet_bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, false); stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true; - result = ACTION_TAKEN_WITH_CALLBACK; - } else if (stream_op->recv_initial_metadata && - op_can_be_run(stream_op, stream_state, &oas->state, - OP_RECV_INITIAL_METADATA)) { - CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas); - if (stream_state->state_op_done[OP_CANCEL_ERROR]) { - grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, - GRPC_ERROR_CANCELLED); - } else if (stream_state->state_callback_received[OP_FAILED]) { - grpc_closure_sched( - exec_ctx, stream_op->recv_initial_metadata_ready, - make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.")); - } else { - grpc_chttp2_incoming_metadata_buffer_publish( - &oas->s->state.rs.initial_metadata, stream_op->recv_initial_metadata); - grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, - GRPC_ERROR_NONE); +#ifdef GRPC_CRONET_WITH_PACKET_COALESCING + if (!stream_op->send_message && !stream_op->send_trailing_metadata) { + s->state.flush_cronet_when_ready = true; } - stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true; - result = ACTION_TAKEN_NO_CALLBACK; +#endif + result = ACTION_TAKEN_WITH_CALLBACK; } else if (stream_op->send_message && op_can_be_run(stream_op, stream_state, &oas->state, OP_SEND_MESSAGE)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas); + stream_state->unprocessed_send_message = false; if (stream_state->state_callback_received[OP_FAILED]) { result = NO_ACTION_POSSIBLE; CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed."); @@ -902,13 +917,63 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->state_callback_received[OP_SEND_MESSAGE] = false; cronet_bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer, (int)write_buffer_size, false); +#ifdef GRPC_CRONET_WITH_PACKET_COALESCING + if (!stream_op->send_trailing_metadata) { + CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", + s->cbs); + cronet_bidirectional_stream_flush(s->cbs); + result = ACTION_TAKEN_WITH_CALLBACK; + } else { + stream_state->pending_write_for_trailer = true; + result = ACTION_TAKEN_NO_CALLBACK; + } +#else result = ACTION_TAKEN_WITH_CALLBACK; +#endif } else { result = NO_ACTION_POSSIBLE; } } stream_state->state_op_done[OP_SEND_MESSAGE] = true; oas->state.state_op_done[OP_SEND_MESSAGE] = true; + } else if (stream_op->send_trailing_metadata && + op_can_be_run(stream_op, stream_state, &oas->state, + OP_SEND_TRAILING_METADATA)) { + CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas); + if (stream_state->state_callback_received[OP_FAILED]) { + result = NO_ACTION_POSSIBLE; + CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed."); + } else { + CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, 0)", + s->cbs); + stream_state->state_callback_received[OP_SEND_MESSAGE] = false; + cronet_bidirectional_stream_write(s->cbs, "", 0, true); +#ifdef GRPC_CRONET_WITH_PACKET_COALESCING + CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs); + cronet_bidirectional_stream_flush(s->cbs); +#endif + result = ACTION_TAKEN_WITH_CALLBACK; + } + stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true; + } else if (stream_op->recv_initial_metadata && + op_can_be_run(stream_op, stream_state, &oas->state, + OP_RECV_INITIAL_METADATA)) { + CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas); + if (stream_state->state_op_done[OP_CANCEL_ERROR]) { + grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, + GRPC_ERROR_CANCELLED); + } else if (stream_state->state_callback_received[OP_FAILED]) { + grpc_closure_sched( + exec_ctx, stream_op->recv_initial_metadata_ready, + make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.")); + } else { + grpc_chttp2_incoming_metadata_buffer_publish( + &oas->s->state.rs.initial_metadata, stream_op->recv_initial_metadata); + grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, + GRPC_ERROR_NONE); + } + stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true; + result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_op->recv_message && op_can_be_run(stream_op, stream_state, &oas->state, OP_RECV_MESSAGE)) { @@ -967,6 +1032,16 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; + + /* Extra read to trigger on_succeed */ + stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes; + stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; + stream_state->rs.received_bytes = 0; + CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs); + stream_state->state_op_done[OP_READ_REQ_MADE] = + true; /* Indicates that at least one read request has been made */ + cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, + stream_state->rs.remaining_bytes); result = ACTION_TAKEN_NO_CALLBACK; } } else if (stream_state->rs.remaining_bytes == 0) { @@ -1025,21 +1100,6 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, } stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true; result = ACTION_TAKEN_NO_CALLBACK; - } else if (stream_op->send_trailing_metadata && - op_can_be_run(stream_op, stream_state, &oas->state, - OP_SEND_TRAILING_METADATA)) { - CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas); - if (stream_state->state_callback_received[OP_FAILED]) { - result = NO_ACTION_POSSIBLE; - CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed."); - } else { - CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, 0)", - s->cbs); - stream_state->state_callback_received[OP_SEND_MESSAGE] = false; - cronet_bidirectional_stream_write(s->cbs, "", 0, true); - result = ACTION_TAKEN_WITH_CALLBACK; - } - stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true; } else if (stream_op->cancel_error && op_can_be_run(stream_op, stream_state, &oas->state, OP_CANCEL_ERROR)) { @@ -1121,6 +1181,10 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, sizeof(s->state.state_callback_received)); s->state.fail_state = s->state.flush_read = false; s->state.cancel_error = NULL; +#ifdef GRPC_CRONET_WITH_PACKET_COALESCING + s->state.flush_cronet_when_ready = s->state.pending_write_for_trailer = false; +#endif + s->state.unprocessed_send_message = false; gpr_mu_init(&s->mu); return 0; } diff --git a/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.m b/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.m index 4ba7badd86..315d942484 100644 --- a/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.m +++ b/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.m @@ -168,6 +168,9 @@ static grpc_end2end_test_config configs[] = { chttp2_tear_down_secure_fullstack}, }; +extern void packet_coalescing(grpc_end2end_test_config config); +extern void packet_coalescing_pre_init(void); + static char *roots_filename; @interface CoreCronetEnd2EndTests : XCTestCase @@ -346,6 +349,14 @@ static char *roots_filename; [self testIndividualCase:"no_op"]; } +- (void)testPacketCoalescing { + // Directly invoke the test function since the test is for Cronet only and thus not included in + // end2end_tests.c + // TODO (mxyan): Do the same to all test cases so that this file will no longer depend on + // end2end_tests.c + packet_coalescing(configs[0]); +} + - (void)testPayload { [self testIndividualCase:"payload"]; } diff --git a/src/objective-c/tests/Podfile b/src/objective-c/tests/Podfile index 5785b976f2..d1ef0886fe 100644 --- a/src/objective-c/tests/Podfile +++ b/src/objective-c/tests/Podfile @@ -92,6 +92,7 @@ post_install do |installer| # GPR_UNREACHABLE_CODE causes "Control may reach end of non-void # function" warning config.build_settings['GCC_WARN_ABOUT_RETURN_TYPE'] = 'NO' + config.build_settings['GCC_PREPROCESSOR_DEFINITIONS'] = '$(inherited) COCOAPODS=1 GRPC_CRONET_WITH_PACKET_COALESCING=1' end end diff --git a/src/objective-c/tests/Tests.xcodeproj/project.pbxproj b/src/objective-c/tests/Tests.xcodeproj/project.pbxproj index c4a6567ae0..8455e71b02 100644 --- a/src/objective-c/tests/Tests.xcodeproj/project.pbxproj +++ b/src/objective-c/tests/Tests.xcodeproj/project.pbxproj @@ -1296,6 +1296,7 @@ "$(inherited)", "GPB_USE_PROTOBUF_FRAMEWORK_IMPORTS=1", "GRPC_COMPILE_WITH_CRONET=1", + "GRPC_CRONET_WITH_PACKET_COALESCING=1", ); INFOPLIST_FILE = InteropTestsRemoteWithCronet/Info.plist; IPHONEOS_DEPLOYMENT_TARGET = 9.3; diff --git a/test/core/end2end/gen_build_yaml.py b/test/core/end2end/gen_build_yaml.py index 201a92a1fd..78779908a9 100755 --- a/test/core/end2end/gen_build_yaml.py +++ b/test/core/end2end/gen_build_yaml.py @@ -91,6 +91,7 @@ LOWCPU = 0.1 # maps test names to options END2END_TESTS = { + 'authority_not_supported': default_test_options, 'bad_hostname': default_test_options, 'binary_metadata': default_test_options, 'resource_quota_server': default_test_options._replace(large_writes=True, @@ -142,7 +143,6 @@ END2END_TESTS = { 'simple_request': default_test_options, 'streaming_error_response': default_test_options, 'trailing_metadata': default_test_options, - 'authority_not_supported': default_test_options, } diff --git a/test/core/end2end/tests/packet_coalescing.c b/test/core/end2end/tests/packet_coalescing.c new file mode 100644 index 0000000000..336f765a13 --- /dev/null +++ b/test/core/end2end/tests/packet_coalescing.c @@ -0,0 +1,284 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ +#include "test/core/end2end/end2end_tests.h" + +#include <stdio.h> +#include <string.h> + +#include <grpc/byte_buffer.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/time.h> +#include <grpc/support/useful.h> +#include "test/core/end2end/cq_verifier.h" + +extern void gpr_default_log(gpr_log_func_args *args); + +static void *tag(intptr_t t) { return (void *)t; } + +static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config, + const char *test_name, + grpc_channel_args *client_args, + grpc_channel_args *server_args) { + grpc_end2end_test_fixture f; + gpr_log(GPR_INFO, "%s/%s", test_name, config.name); + f = config.create_fixture(client_args, server_args); + config.init_server(&f, server_args); + config.init_client(&f, client_args); + return f; +} + +static gpr_timespec n_seconds_time(int n) { + return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(n); +} + +static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); } + +static void drain_cq(grpc_completion_queue *cq) { + grpc_event ev; + do { + ev = grpc_completion_queue_next(cq, five_seconds_time(), NULL); + } while (ev.type != GRPC_QUEUE_SHUTDOWN); +} + +static void shutdown_server(grpc_end2end_test_fixture *f) { + if (!f->server) return; + grpc_server_shutdown_and_notify(f->server, f->cq, tag(1000)); + GPR_ASSERT(grpc_completion_queue_pluck( + f->cq, tag(1000), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), NULL) + .type == GRPC_OP_COMPLETE); + grpc_server_destroy(f->server); + f->server = NULL; +} + +static void shutdown_client(grpc_end2end_test_fixture *f) { + if (!f->client) return; + grpc_channel_destroy(f->client); + f->client = NULL; +} + +static void end_test(grpc_end2end_test_fixture *f) { + shutdown_server(f); + shutdown_client(f); + + grpc_completion_queue_shutdown(f->cq); + drain_cq(f->cq); + grpc_completion_queue_destroy(f->cq); +} + +static bool coalesced_message_and_eos; + +static void log_processor(gpr_log_func_args *args) { + const int file_len = (int)strlen(args->file); + const char suffix1[] = "secure_endpoint.c"; + const int suffix1_len = sizeof(suffix1) - 1; + const char suffix2[] = "tcp_posix.c"; + const int suffix2_len = sizeof(suffix2) - 1; + const char prefix[] = "READ"; + const int prefix_len = sizeof(prefix) - 1; + if (((file_len >= suffix1_len && + 0 == strcmp(suffix1, &args->file[file_len - suffix1_len])) || + (file_len >= suffix2_len && + 0 == strcmp(suffix2, &args->file[file_len - suffix2_len]))) && + 0 == strncmp(prefix, args->message, (size_t)prefix_len) && + strstr(args->message, + "00 00 10 00 01 00 00 00 01 00 00 00 00 0b 68 65 6c 6c 6f 20 77 " + "6f 72 6c 64")) { + fprintf(stderr, "%s, %s\n", args->file, args->message); + coalesced_message_and_eos = true; + } +} + +/* Request/response with metadata and payload.*/ +static void test_request_response_with_metadata_and_payload( + grpc_end2end_test_config config) { + grpc_call *c; + grpc_call *s; + grpc_slice request_payload_slice = + grpc_slice_from_copied_string("hello world"); + grpc_byte_buffer *request_payload = + grpc_raw_byte_buffer_create(&request_payload_slice, 1); + gpr_timespec deadline = five_seconds_time(); + grpc_metadata meta_c[2] = { + {"key1", "val1", 4, 0, {{NULL, NULL, NULL, NULL}}}, + {"key2", "val2", 4, 0, {{NULL, NULL, NULL, NULL}}}}; + grpc_metadata meta_s[2] = { + {"key3", "val3", 4, 0, {{NULL, NULL, NULL, NULL}}}, + {"key4", "val4", 4, 0, {{NULL, NULL, NULL, NULL}}}}; + grpc_end2end_test_fixture f = begin_test( + config, "test_request_response_with_metadata_and_payload", NULL, NULL); + cq_verifier *cqv = cq_verifier_create(f.cq); + grpc_op ops[6]; + grpc_op *op; + grpc_metadata_array initial_metadata_recv; + grpc_metadata_array trailing_metadata_recv; + grpc_metadata_array request_metadata_recv; + grpc_byte_buffer *request_payload_recv = NULL; + grpc_call_details call_details; + grpc_status_code status; + grpc_call_error error; + char *details = NULL; + size_t details_capacity = 0; + int was_cancelled = 2; + coalesced_message_and_eos = false; + + c = grpc_channel_create_call( + f.client, NULL, GRPC_PROPAGATE_DEFAULTS, f.cq, "/foo", + get_host_override_string("foo.test.google.fr:1234", config), deadline, + NULL); + GPR_ASSERT(c); + + grpc_metadata_array_init(&initial_metadata_recv); + grpc_metadata_array_init(&trailing_metadata_recv); + grpc_metadata_array_init(&request_metadata_recv); + grpc_call_details_init(&call_details); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 2; + op->data.send_initial_metadata.metadata = meta_c; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_SEND_MESSAGE; + op->data.send_message = request_payload; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata = &initial_metadata_recv; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv; + op->data.recv_status_on_client.status = &status; + op->data.recv_status_on_client.status_details = &details; + op->data.recv_status_on_client.status_details_capacity = &details_capacity; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(1), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + error = + grpc_server_request_call(f.server, &s, &call_details, + &request_metadata_recv, f.cq, f.cq, tag(101)); + GPR_ASSERT(GRPC_CALL_OK == error); + CQ_EXPECT_COMPLETION(cqv, tag(101), 1); + cq_verify(cqv); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 2; + op->data.send_initial_metadata.metadata = meta_s; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_RECV_MESSAGE; + op->data.recv_message = &request_payload_recv; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + CQ_EXPECT_COMPLETION(cqv, tag(102), 1); + cq_verify(cqv); + + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_RECV_CLOSE_ON_SERVER; + op->data.recv_close_on_server.cancelled = &was_cancelled; + op->flags = 0; + op->reserved = NULL; + op++; + op->op = GRPC_OP_SEND_STATUS_FROM_SERVER; + op->data.send_status_from_server.trailing_metadata_count = 0; + op->data.send_status_from_server.status = GRPC_STATUS_OK; + op->data.send_status_from_server.status_details = "xyz"; + op->flags = 0; + op->reserved = NULL; + op++; + error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(103), NULL); + GPR_ASSERT(GRPC_CALL_OK == error); + + CQ_EXPECT_COMPLETION(cqv, tag(103), 1); + CQ_EXPECT_COMPLETION(cqv, tag(1), 1); + cq_verify(cqv); + + GPR_ASSERT(status == GRPC_STATUS_OK); + GPR_ASSERT(0 == strcmp(details, "xyz")); + GPR_ASSERT(0 == strcmp(call_details.method, "/foo")); + validate_host_override_string("foo.test.google.fr:1234", call_details.host, + config); + GPR_ASSERT(was_cancelled == 0); + GPR_ASSERT(byte_buffer_eq_string(request_payload_recv, "hello world")); + GPR_ASSERT(contains_metadata(&request_metadata_recv, "key1", "val1")); + GPR_ASSERT(contains_metadata(&request_metadata_recv, "key2", "val2")); + GPR_ASSERT(coalesced_message_and_eos); + + gpr_free(details); + grpc_metadata_array_destroy(&initial_metadata_recv); + grpc_metadata_array_destroy(&trailing_metadata_recv); + grpc_metadata_array_destroy(&request_metadata_recv); + grpc_call_details_destroy(&call_details); + + grpc_call_destroy(c); + grpc_call_destroy(s); + + cq_verifier_destroy(cqv); + + grpc_byte_buffer_destroy(request_payload); + grpc_byte_buffer_destroy(request_payload_recv); + + end_test(&f); + config.tear_down_data(&f); +} + +void packet_coalescing(grpc_end2end_test_config config) { + gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG); + grpc_tracer_set_enabled("all", 1); + gpr_set_log_function(log_processor); + test_request_response_with_metadata_and_payload(config); + gpr_set_log_function(gpr_default_log); + grpc_tracer_set_enabled("all", 0); +} + +void packet_coalescing_pre_init(void) {} diff --git a/third_party/objective_c/Cronet/cronet_c_for_grpc.h b/third_party/objective_c/Cronet/cronet_c_for_grpc.h index 15a511aebd..3d58a8370e 100644 --- a/third_party/objective_c/Cronet/cronet_c_for_grpc.h +++ b/third_party/objective_c/Cronet/cronet_c_for_grpc.h @@ -5,6 +5,8 @@ #ifndef COMPONENTS_CRONET_IOS_CRONET_C_FOR_GRPC_H_ #define COMPONENTS_CRONET_IOS_CRONET_C_FOR_GRPC_H_ +#define CRONET_EXPORT __attribute__((visibility("default"))) + #ifdef __cplusplus extern "C" { #endif @@ -15,12 +17,10 @@ extern "C" { /* Opaque object representing Cronet Engine. Created and configured outside * of this API to facilitate sharing with other components */ -typedef struct cronet_engine { void* obj; } cronet_engine; - -void cronet_engine_add_quic_hint(cronet_engine* engine, - const char* host, - int port, - int alternate_port); +typedef struct cronet_engine { + void* obj; + void* annotation; +} cronet_engine; /* Cronet Bidirectional Stream API */ @@ -45,11 +45,12 @@ typedef struct cronet_bidirectional_stream_header_array { /* Set of callbacks used to receive callbacks from bidirectional stream. */ typedef struct cronet_bidirectional_stream_callback { - /* Invoked when request headers are sent. Indicates that stream has initiated - * the request. Consumer may call cronet_bidirectional_stream_write() to start - * writing data. + /* Invoked when the stream is ready for reading and writing. + * Consumer may call cronet_bidirectional_stream_read() to start reading data. + * Consumer may call cronet_bidirectional_stream_write() to start writing + * data. */ - void (*on_request_headers_sent)(cronet_bidirectional_stream* stream); + void (*on_stream_ready)(cronet_bidirectional_stream* stream); /* Invoked when initial response headers are received. * Consumer must call cronet_bidirectional_stream_read() to start reading. @@ -67,20 +68,19 @@ typedef struct cronet_bidirectional_stream_callback { * It may be invoked after on_response_trailers_received()}, if there was * pending read data before trailers were received. * - * If count is 0, it means the remote side has signaled that it will send no - * more data; future calls to cronet_bidirectional_stream_read() will result - * in the on_data_read() callback or on_succeded() callback if + * If |bytes_read| is 0, it means the remote side has signaled that it will + * send no more data; future calls to cronet_bidirectional_stream_read() + * will result in the on_data_read() callback or on_succeded() callback if * cronet_bidirectional_stream_write() was invoked with end_of_stream set to * true. */ void (*on_read_completed)(cronet_bidirectional_stream* stream, char* data, - int count); + int bytes_read); /** * Invoked when all data passed to cronet_bidirectional_stream_write() is - * sent. - * To continue writing, call cronet_bidirectional_stream_write(). + * sent. To continue writing, call cronet_bidirectional_stream_write(). */ void (*on_write_completed)(cronet_bidirectional_stream* stream, const char* data); @@ -117,7 +117,7 @@ typedef struct cronet_bidirectional_stream_callback { void (*on_canceled)(cronet_bidirectional_stream* stream); } cronet_bidirectional_stream_callback; -/* Create a new stream object that uses |engine| and |callback|. All stream +/* Creates a new stream object that uses |engine| and |callback|. All stream * tasks are performed asynchronously on the |engine| network thread. |callback| * methods are invoked synchronously on the |engine| network thread, but must * not run tasks on the current thread to prevent blocking networking operations @@ -129,6 +129,7 @@ typedef struct cronet_bidirectional_stream_callback { * * Both |calback| and |engine| must remain valid until stream is destroyed. */ +CRONET_EXPORT cronet_bidirectional_stream* cronet_bidirectional_stream_create( cronet_engine* engine, void* annotation, @@ -136,15 +137,40 @@ cronet_bidirectional_stream* cronet_bidirectional_stream_create( /* TBD: The following methods return int. Should it be a custom type? */ -/* Destroy stream object. Destroy could be called from any thread, including +/* Destroys stream object. Destroy could be called from any thread, including * network thread, but is posted, so |stream| is valid until calling task is * complete. */ +CRONET_EXPORT int cronet_bidirectional_stream_destroy(cronet_bidirectional_stream* stream); -/* Start the stream by sending request to |url| using |method| and |headers|. If - * |end_of_stream| is true, then no data is expected to be written. +/** + * Disables or enables auto flush. By default, data is flushed after + * every cronet_bidirectional_stream_write(). If the auto flush is disabled, + * the client should explicitly call cronet_bidirectional_stream_flush to flush + * the data. + */ +CRONET_EXPORT void cronet_bidirectional_stream_disable_auto_flush( + cronet_bidirectional_stream* stream, + bool disable_auto_flush); + +/** + * Delays sending request headers until cronet_bidirectional_stream_flush() + * is called. This flag is currently only respected when QUIC is negotiated. + * When true, QUIC will send request header frame along with data frame(s) + * as a single packet when possible. + */ +CRONET_EXPORT +void cronet_bidirectional_stream_delay_request_headers_until_flush( + cronet_bidirectional_stream* stream, + bool delay_headers_until_flush); + +/* Starts the stream by sending request to |url| using |method| and |headers|. + * If |end_of_stream| is true, then no data is expected to be written. The + * |method| is HTTP verb, with PUT having a special meaning to mark idempotent + * request, which could use QUIC 0-RTT. */ +CRONET_EXPORT int cronet_bidirectional_stream_start( cronet_bidirectional_stream* stream, const char* url, @@ -153,46 +179,61 @@ int cronet_bidirectional_stream_start( const cronet_bidirectional_stream_header_array* headers, bool end_of_stream); -/* Read response data into |buffer| of |capacity| length. Must only be called at - * most once in response to each invocation of the - * on_response_headers_received() and on_read_completed() methods of the - * cronet_bidirectional_stream_callback. - * Each call will result in an invocation of one of the callback's - * on_read_completed method if data is read, its on_succeeded() method if - * the stream is closed, or its on_failed() method if there's an error. +/* Reads response data into |buffer| of |capacity| length. Must only be called + * at most once in response to each invocation of the + * on_stream_ready()/on_response_headers_received() and on_read_completed() + * methods of the cronet_bidirectional_stream_callback. + * Each call will result in an invocation of the callback's + * on_read_completed() method if data is read, or its on_failed() method if + * there's an error. The callback's on_succeeded() method is also invoked if + * there is no more data to read and |end_of_stream| was previously sent. */ +CRONET_EXPORT int cronet_bidirectional_stream_read(cronet_bidirectional_stream* stream, char* buffer, int capacity); -/* Read response data into |buffer| of |capacity| length. Must only be called at - * most once in response to each invocation of the - * on_response_headers_received() and on_read_completed() methods of the - * cronet_bidirectional_stream_callback. - * Each call will result in an invocation of one of the callback's - * on_read_completed method if data is read, its on_succeeded() method if - * the stream is closed, or its on_failed() method if there's an error. +/* Writes request data from |buffer| of |buffer_length| length. If auto flush is + * disabled, data will be sent only after cronet_bidirectional_stream_flush() is + * called. + * Each call will result in an invocation the callback's on_write_completed() + * method if data is sent, or its on_failed() method if there's an error. + * The callback's on_succeeded() method is also invoked if |end_of_stream| is + * set and all response data has been read. */ +CRONET_EXPORT int cronet_bidirectional_stream_write(cronet_bidirectional_stream* stream, const char* buffer, - int count, + int buffer_length, bool end_of_stream); +/** + * Flushes pending writes. This method should not be called before invocation of + * on_stream_ready() method of the cronet_bidirectional_stream_callback. + * For each previously called cronet_bidirectional_stream_write() + * a corresponding on_write_completed() callback will be invoked when the buffer + * is sent. + */ +CRONET_EXPORT +void cronet_bidirectional_stream_flush(cronet_bidirectional_stream* stream); + /* Cancels the stream. Can be called at any time after * cronet_bidirectional_stream_start(). The on_canceled() method of * cronet_bidirectional_stream_callback will be invoked when cancelation * is complete and no further callback methods will be invoked. If the * stream has completed or has not started, calling * cronet_bidirectional_stream_cancel() has no effect and on_canceled() will not - * be invoked. At most one callback method may be invoked after + * be invoked. At most one callback method may be invoked after * cronet_bidirectional_stream_cancel() has completed. */ -int cronet_bidirectional_stream_cancel(cronet_bidirectional_stream* stream); +CRONET_EXPORT +void cronet_bidirectional_stream_cancel(cronet_bidirectional_stream* stream); /* Returns true if the |stream| was successfully started and is now done * (succeeded, canceled, or failed). * Returns false if the |stream| stream is not yet started or is in progress. */ +CRONET_EXPORT bool cronet_bidirectional_stream_is_done(cronet_bidirectional_stream* stream); #ifdef __cplusplus |