diff options
author | Muxi Yan <mxyan@google.com> | 2016-10-21 13:59:14 -0700 |
---|---|---|
committer | Muxi Yan <mxyan@google.com> | 2017-01-03 21:17:52 -0800 |
commit | 740ae63a8a816369b18b8ac9602a9acc7cc0b30d (patch) | |
tree | 2c7e62ebbd3d637208c8a2d5e8ead6dc6e7ae5b4 /src | |
parent | 6ff14349ff1c7c16eff295aab21e9c370eb52af7 (diff) |
Packet coalescing transport layer and end2end test changes
Diffstat (limited to 'src')
5 files changed, 112 insertions, 43 deletions
diff --git a/src/core/ext/transport/cronet/transport/cronet_api_dummy.c b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c index 687026c9fd..38755604b9 100644 --- a/src/core/ext/transport/cronet/transport/cronet_api_dummy.c +++ b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c @@ -77,9 +77,8 @@ int cronet_bidirectional_stream_write(cronet_bidirectional_stream* stream, return 0; } -int cronet_bidirectional_stream_cancel(cronet_bidirectional_stream* stream) { +void cronet_bidirectional_stream_cancel(cronet_bidirectional_stream* stream) { GPR_ASSERT(0); - return 0; } #endif /* GRPC_COMPILE_WITH_CRONET */ diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index afc59f4b12..4063dcaefc 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -86,7 +86,7 @@ enum e_op_id { /* Cronet callbacks. See cronet_c_for_grpc.h for documentation for each. */ -static void on_request_headers_sent(cronet_bidirectional_stream *); +static void on_stream_ready(cronet_bidirectional_stream *); static void on_response_headers_received( cronet_bidirectional_stream *, const cronet_bidirectional_stream_header_array *, const char *); @@ -99,7 +99,7 @@ static void on_succeeded(cronet_bidirectional_stream *); static void on_failed(cronet_bidirectional_stream *, int); static void on_canceled(cronet_bidirectional_stream *); static cronet_bidirectional_stream_callback cronet_callbacks = { - on_request_headers_sent, + on_stream_ready, on_response_headers_received, on_read_completed, on_write_completed, @@ -151,6 +151,11 @@ struct op_state { bool state_callback_received[OP_NUM_OPS]; bool fail_state; bool flush_read; +#ifdef GRPC_CRONET_WITH_PACKET_COALESCING + bool flush_cronet_when_ready; + bool pending_write_for_trailer; +#endif + bool unprocessed_send_message; grpc_error *cancel_error; /* data structure for storing data coming from server */ struct read_state rs; @@ -273,6 +278,9 @@ static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op *op) { new_op->next = storage->head; storage->head = new_op; storage->num_pending_ops++; + if (op->send_message) { + s->state.unprocessed_send_message = true; + } CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op, storage->num_pending_ops); gpr_mu_unlock(&s->mu); @@ -405,8 +413,8 @@ static void on_succeeded(cronet_bidirectional_stream *stream) { /* Cronet callback */ -static void on_request_headers_sent(cronet_bidirectional_stream *stream) { - CRONET_LOG(GPR_DEBUG, "W: on_request_headers_sent(%p)", stream); +static void on_stream_ready(cronet_bidirectional_stream *stream) { + CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream); stream_obj *s = (stream_obj *)stream->annotation; gpr_mu_lock(&s->mu); s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true; @@ -416,6 +424,14 @@ static void on_request_headers_sent(cronet_bidirectional_stream *stream) { gpr_free(s->header_array.headers); s->header_array.headers = NULL; } +/* Send the initial metadata on wire if there is no SEND_MESSAGE or + * SEND_TRAILING_METADATA ops pending */ +#ifdef GRPC_CRONET_WITH_PACKET_COALESCING + if (s->state.flush_cronet_when_ready) { + CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs); + cronet_bidirectional_stream_flush(stream); + } +#endif gpr_mu_unlock(&s->mu); execute_from_storage(s); } @@ -551,6 +567,10 @@ static void on_response_trailers_received( CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, 0)", s->cbs); s->state.state_callback_received[OP_SEND_MESSAGE] = false; cronet_bidirectional_stream_write(s->cbs, "", 0, true); +#ifdef GRPC_CRONET_WITH_PACKET_COALESCING + CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs); + cronet_bidirectional_stream_flush(s->cbs); +#endif s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true; gpr_mu_unlock(&s->mu); @@ -598,7 +618,7 @@ static void convert_metadata_to_cronet_headers( curr = curr->next; num_headers_available++; } - /* Allocate enough memory. It is freed in the on_request_headers_sent callback + /* Allocate enough memory. It is freed in the on_stream_ready callback */ cronet_bidirectional_stream_header *headers = (cronet_bidirectional_stream_header *)gpr_malloc( @@ -740,12 +760,16 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op, else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) result = false; /* we haven't sent message yet */ - else if (curr_op->send_message && + else if (stream_state->unprocessed_send_message && !stream_state->state_op_done[OP_SEND_MESSAGE]) result = false; /* we haven't got on_write_completed for the send yet */ else if (stream_state->state_op_done[OP_SEND_MESSAGE] && - !stream_state->state_callback_received[OP_SEND_MESSAGE]) + !stream_state->state_callback_received[OP_SEND_MESSAGE] +#ifdef GRPC_CRONET_WITH_PACKET_COALESCING + && !stream_state->pending_write_for_trailer +#endif + ) result = false; } else if (op_id == OP_CANCEL_ERROR) { /* already executed */ @@ -831,6 +855,10 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, s->cbs = cronet_bidirectional_stream_create(s->curr_ct.engine, s->curr_gs, &cronet_callbacks); CRONET_LOG(GPR_DEBUG, "%p = cronet_bidirectional_stream_create()", s->cbs); +#ifdef GRPC_CRONET_WITH_PACKET_COALESCING + cronet_bidirectional_stream_disable_auto_flush(s->cbs, true); + cronet_bidirectional_stream_delay_request_headers_until_flush(s->cbs, true); +#endif char *url = NULL; const char *method = "POST"; s->header_array.headers = NULL; @@ -843,30 +871,17 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, cronet_bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, false); stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true; - result = ACTION_TAKEN_WITH_CALLBACK; - } else if (stream_op->recv_initial_metadata && - op_can_be_run(stream_op, stream_state, &oas->state, - OP_RECV_INITIAL_METADATA)) { - CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas); - if (stream_state->state_op_done[OP_CANCEL_ERROR]) { - grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready, - GRPC_ERROR_CANCELLED, NULL); - } else if (stream_state->state_callback_received[OP_FAILED]) { - grpc_exec_ctx_sched( - exec_ctx, stream_op->recv_initial_metadata_ready, - make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL); - } else { - grpc_chttp2_incoming_metadata_buffer_publish( - &oas->s->state.rs.initial_metadata, stream_op->recv_initial_metadata); - grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready, - GRPC_ERROR_NONE, NULL); +#ifdef GRPC_CRONET_WITH_PACKET_COALESCING + if (!stream_op->send_message && !stream_op->send_trailing_metadata) { + s->state.flush_cronet_when_ready = true; } - stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true; - result = ACTION_TAKEN_NO_CALLBACK; +#endif + result = ACTION_TAKEN_WITH_CALLBACK; } else if (stream_op->send_message && op_can_be_run(stream_op, stream_state, &oas->state, OP_SEND_MESSAGE)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas); + stream_state->unprocessed_send_message = false; if (stream_state->state_callback_received[OP_FAILED]) { result = NO_ACTION_POSSIBLE; CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed."); @@ -897,13 +912,63 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->state_callback_received[OP_SEND_MESSAGE] = false; cronet_bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer, (int)write_buffer_size, false); +#ifdef GRPC_CRONET_WITH_PACKET_COALESCING + if (!stream_op->send_trailing_metadata) { + CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", + s->cbs); + cronet_bidirectional_stream_flush(s->cbs); + result = ACTION_TAKEN_WITH_CALLBACK; + } else { + stream_state->pending_write_for_trailer = true; + result = ACTION_TAKEN_NO_CALLBACK; + } +#else result = ACTION_TAKEN_WITH_CALLBACK; +#endif } else { result = NO_ACTION_POSSIBLE; } } stream_state->state_op_done[OP_SEND_MESSAGE] = true; oas->state.state_op_done[OP_SEND_MESSAGE] = true; + } else if (stream_op->send_trailing_metadata && + op_can_be_run(stream_op, stream_state, &oas->state, + OP_SEND_TRAILING_METADATA)) { + CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas); + if (stream_state->state_callback_received[OP_FAILED]) { + result = NO_ACTION_POSSIBLE; + CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed."); + } else { + CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, 0)", + s->cbs); + stream_state->state_callback_received[OP_SEND_MESSAGE] = false; + cronet_bidirectional_stream_write(s->cbs, "", 0, true); +#ifdef GRPC_CRONET_WITH_PACKET_COALESCING + CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs); + cronet_bidirectional_stream_flush(s->cbs); +#endif + result = ACTION_TAKEN_WITH_CALLBACK; + } + stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true; + } else if (stream_op->recv_initial_metadata && + op_can_be_run(stream_op, stream_state, &oas->state, + OP_RECV_INITIAL_METADATA)) { + CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas); + if (stream_state->state_op_done[OP_CANCEL_ERROR]) { + grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready, + GRPC_ERROR_CANCELLED, NULL); + } else if (stream_state->state_callback_received[OP_FAILED]) { + grpc_exec_ctx_sched( + exec_ctx, stream_op->recv_initial_metadata_ready, + make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."), NULL); + } else { + grpc_chttp2_incoming_metadata_buffer_publish( + &oas->s->state.rs.initial_metadata, stream_op->recv_initial_metadata); + grpc_exec_ctx_sched(exec_ctx, stream_op->recv_initial_metadata_ready, + GRPC_ERROR_NONE, NULL); + } + stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true; + result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_op->recv_message && op_can_be_run(stream_op, stream_state, &oas->state, OP_RECV_MESSAGE)) { @@ -962,6 +1027,16 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, GRPC_ERROR_NONE, NULL); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; + + /* Extra read to trigger on_succeed */ + stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes; + stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; + stream_state->rs.received_bytes = 0; + CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read(%p)", s->cbs); + stream_state->state_op_done[OP_READ_REQ_MADE] = + true; /* Indicates that at least one read request has been made */ + cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, + stream_state->rs.remaining_bytes); result = ACTION_TAKEN_NO_CALLBACK; } } else if (stream_state->rs.remaining_bytes == 0) { @@ -1020,21 +1095,6 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, } stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true; result = ACTION_TAKEN_NO_CALLBACK; - } else if (stream_op->send_trailing_metadata && - op_can_be_run(stream_op, stream_state, &oas->state, - OP_SEND_TRAILING_METADATA)) { - CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas); - if (stream_state->state_callback_received[OP_FAILED]) { - result = NO_ACTION_POSSIBLE; - CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed."); - } else { - CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p, 0)", - s->cbs); - stream_state->state_callback_received[OP_SEND_MESSAGE] = false; - cronet_bidirectional_stream_write(s->cbs, "", 0, true); - result = ACTION_TAKEN_WITH_CALLBACK; - } - stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true; } else if (stream_op->cancel_error && op_can_be_run(stream_op, stream_state, &oas->state, OP_CANCEL_ERROR)) { @@ -1117,6 +1177,10 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, sizeof(s->state.state_callback_received)); s->state.fail_state = s->state.flush_read = false; s->state.cancel_error = NULL; +#ifdef GRPC_CRONET_WITH_PACKET_COALESCING + s->state.flush_cronet_when_ready = s->state.pending_write_for_trailer = false; +#endif + s->state.unprocessed_send_message = false; gpr_mu_init(&s->mu); return 0; } diff --git a/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.m b/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.m index 4ba7badd86..fe9a7c2247 100644 --- a/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.m +++ b/src/objective-c/tests/CoreCronetEnd2EndTests/CoreCronetEnd2EndTests.m @@ -346,6 +346,10 @@ static char *roots_filename; [self testIndividualCase:"no_op"]; } +- (void)testPacketCoalescing { + [self testIndividualCase:"packet_coalescing"]; +} + - (void)testPayload { [self testIndividualCase:"payload"]; } diff --git a/src/objective-c/tests/Podfile b/src/objective-c/tests/Podfile index 5785b976f2..d1ef0886fe 100644 --- a/src/objective-c/tests/Podfile +++ b/src/objective-c/tests/Podfile @@ -92,6 +92,7 @@ post_install do |installer| # GPR_UNREACHABLE_CODE causes "Control may reach end of non-void # function" warning config.build_settings['GCC_WARN_ABOUT_RETURN_TYPE'] = 'NO' + config.build_settings['GCC_PREPROCESSOR_DEFINITIONS'] = '$(inherited) COCOAPODS=1 GRPC_CRONET_WITH_PACKET_COALESCING=1' end end diff --git a/src/objective-c/tests/Tests.xcodeproj/project.pbxproj b/src/objective-c/tests/Tests.xcodeproj/project.pbxproj index c4a6567ae0..8455e71b02 100644 --- a/src/objective-c/tests/Tests.xcodeproj/project.pbxproj +++ b/src/objective-c/tests/Tests.xcodeproj/project.pbxproj @@ -1296,6 +1296,7 @@ "$(inherited)", "GPB_USE_PROTOBUF_FRAMEWORK_IMPORTS=1", "GRPC_COMPILE_WITH_CRONET=1", + "GRPC_CRONET_WITH_PACKET_COALESCING=1", ); INFOPLIST_FILE = InteropTestsRemoteWithCronet/Info.plist; IPHONEOS_DEPLOYMENT_TARGET = 9.3; |