From 60ab7ef00ac0a988ee2672c636d946c964e6fa41 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Mon, 23 Jan 2017 23:00:35 -0800 Subject: Dynamically enable/disable packet coalecsing and test it --- src/core/ext/transport/cronet/client/secure/cronet_channel_create.c | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'src/core/ext/transport/cronet/client/secure/cronet_channel_create.c') diff --git a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c index 477cf07f45..2e40020ae0 100644 --- a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c +++ b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c @@ -51,6 +51,8 @@ typedef struct cronet_transport { extern grpc_transport_vtable grpc_cronet_vtable; +bool grpc_cronet_packet_coalescing_enabled = true; + GRPCAPI grpc_channel *grpc_cronet_secure_channel_create( void *engine, const char *target, const grpc_channel_args *args, void *reserved) { @@ -67,3 +69,7 @@ GRPCAPI grpc_channel *grpc_cronet_secure_channel_create( return grpc_channel_create(&exec_ctx, target, args, GRPC_CLIENT_DIRECT_CHANNEL, (grpc_transport *)ct); } + +GRPCAPI void grpc_cronet_use_packet_coalescing(bool use_coalescing) { + grpc_cronet_packet_coalescing_enabled = use_coalescing; +} -- cgit v1.2.3 From eb5ee45eec0c6f88bcf649f383060f3dc34f2084 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Wed, 1 Feb 2017 11:57:33 -0800 Subject: Revert "Dynamically enable/disable packet coalecsing and test it" This reverts commit 60ab7ef00ac0a988ee2672c636d946c964e6fa41. --- include/grpc/grpc_cronet.h | 2 - .../cronet/client/secure/cronet_channel_create.c | 6 -- .../transport/cronet/transport/cronet_transport.c | 72 ++++++++++++---------- .../tests/CronetUnitTests/CronetUnitTests.m | 13 +--- src/objective-c/tests/Podfile | 1 + 5 files changed, 42 insertions(+), 52 deletions(-) (limited to 'src/core/ext/transport/cronet/client/secure/cronet_channel_create.c') diff --git a/include/grpc/grpc_cronet.h b/include/grpc/grpc_cronet.h index 566c34a388..295e0f55e8 100644 --- a/include/grpc/grpc_cronet.h +++ b/include/grpc/grpc_cronet.h @@ -44,8 +44,6 @@ GRPCAPI grpc_channel *grpc_cronet_secure_channel_create( void *engine, const char *target, const grpc_channel_args *args, void *reserved); -GRPCAPI void grpc_cronet_use_packet_coalescing(bool use_coalescing); - #ifdef __cplusplus } #endif diff --git a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c index 2e40020ae0..477cf07f45 100644 --- a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c +++ b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c @@ -51,8 +51,6 @@ typedef struct cronet_transport { extern grpc_transport_vtable grpc_cronet_vtable; -bool grpc_cronet_packet_coalescing_enabled = true; - GRPCAPI grpc_channel *grpc_cronet_secure_channel_create( void *engine, const char *target, const grpc_channel_args *args, void *reserved) { @@ -69,7 +67,3 @@ GRPCAPI grpc_channel *grpc_cronet_secure_channel_create( return grpc_channel_create(&exec_ctx, target, args, GRPC_CLIENT_DIRECT_CHANNEL, (grpc_transport *)ct); } - -GRPCAPI void grpc_cronet_use_packet_coalescing(bool use_coalescing) { - grpc_cronet_packet_coalescing_enabled = use_coalescing; -} diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 5429eb32e3..447f3f31ec 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -61,8 +61,6 @@ /* TODO (makdharma): Hook up into the wider tracing mechanism */ int grpc_cronet_trace = 0; -extern bool grpc_cronet_packet_coalescing_enabled; - enum e_op_result { ACTION_TAKEN_WITH_CALLBACK, ACTION_TAKEN_NO_CALLBACK, @@ -152,13 +150,12 @@ struct op_state { bool state_callback_received[OP_NUM_OPS]; bool fail_state; bool flush_read; +#ifdef GRPC_CRONET_WITH_PACKET_COALESCING bool flush_cronet_when_ready; bool pending_write_for_trailer; +#endif bool unprocessed_send_message; grpc_error *cancel_error; - - /* Whether packet coalescing is enabled */ - bool packet_coalescing_enabled; /* data structure for storing data coming from server */ struct read_state rs; /* data structure for storing data going to the server */ @@ -428,10 +425,12 @@ static void on_stream_ready(bidirectional_stream *stream) { } /* Send the initial metadata on wire if there is no SEND_MESSAGE or * SEND_TRAILING_METADATA ops pending */ - if (s->state.packet_coalescing_enabled && s->state.flush_cronet_when_ready) { +#ifdef GRPC_CRONET_WITH_PACKET_COALESCING + if (s->state.flush_cronet_when_ready) { CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs); bidirectional_stream_flush(stream); } +#endif gpr_mu_unlock(&s->mu); execute_from_storage(s); } @@ -569,10 +568,10 @@ static void on_response_trailers_received( CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs); s->state.state_callback_received[OP_SEND_MESSAGE] = false; bidirectional_stream_write(s->cbs, "", 0, true); - if (s->state.packet_coalescing_enabled) { - CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs); - bidirectional_stream_flush(s->cbs); - } +#ifdef GRPC_CRONET_WITH_PACKET_COALESCING + CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs); + bidirectional_stream_flush(s->cbs); +#endif s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true; gpr_mu_unlock(&s->mu); @@ -769,9 +768,11 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op, result = false; /* we haven't got on_write_completed for the send yet */ else if (stream_state->state_op_done[OP_SEND_MESSAGE] && - !stream_state->state_callback_received[OP_SEND_MESSAGE] && - !(stream_state->packet_coalescing_enabled && - stream_state->pending_write_for_trailer)) + !stream_state->state_callback_received[OP_SEND_MESSAGE] +#ifdef GRPC_CRONET_WITH_PACKET_COALESCING + && !stream_state->pending_write_for_trailer +#endif + ) result = false; } else if (op_id == OP_CANCEL_ERROR) { /* already executed */ @@ -857,10 +858,10 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, s->cbs = bidirectional_stream_create(s->curr_ct.engine, s->curr_gs, &cronet_callbacks); CRONET_LOG(GPR_DEBUG, "%p = bidirectional_stream_create()", s->cbs); - if (stream_state->packet_coalescing_enabled) { - bidirectional_stream_disable_auto_flush(s->cbs, true); - bidirectional_stream_delay_request_headers_until_flush(s->cbs, true); - } +#ifdef GRPC_CRONET_WITH_PACKET_COALESCING + bidirectional_stream_disable_auto_flush(s->cbs, true); + bidirectional_stream_delay_request_headers_until_flush(s->cbs, true); +#endif char *url = NULL; const char *method = "POST"; s->header_array.headers = NULL; @@ -871,10 +872,11 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs, url); bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, false); stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true; - if (stream_state->packet_coalescing_enabled && !stream_op->send_message && - !stream_op->send_trailing_metadata) { +#ifdef GRPC_CRONET_WITH_PACKET_COALESCING + if (!stream_op->send_message && !stream_op->send_trailing_metadata) { s->state.flush_cronet_when_ready = true; } +#endif result = ACTION_TAKEN_WITH_CALLBACK; } else if (stream_op->send_message && op_can_be_run(stream_op, stream_state, &oas->state, @@ -911,18 +913,19 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->state_callback_received[OP_SEND_MESSAGE] = false; bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer, (int)write_buffer_size, false); - if (stream_state->packet_coalescing_enabled) { - if (!stream_op->send_trailing_metadata) { - CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs); - bidirectional_stream_flush(s->cbs); - result = ACTION_TAKEN_WITH_CALLBACK; - } else { - stream_state->pending_write_for_trailer = true; - result = ACTION_TAKEN_NO_CALLBACK; - } - } else { +#ifdef GRPC_CRONET_WITH_PACKET_COALESCING + if (!stream_op->send_trailing_metadata) { + CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", + s->cbs); + bidirectional_stream_flush(s->cbs); result = ACTION_TAKEN_WITH_CALLBACK; + } else { + stream_state->pending_write_for_trailer = true; + result = ACTION_TAKEN_NO_CALLBACK; } +#else + result = ACTION_TAKEN_WITH_CALLBACK; +#endif } else { result = NO_ACTION_POSSIBLE; } @@ -941,10 +944,10 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, s->cbs); stream_state->state_callback_received[OP_SEND_MESSAGE] = false; bidirectional_stream_write(s->cbs, "", 0, true); - if (stream_state->packet_coalescing_enabled) { - CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs); - bidirectional_stream_flush(s->cbs); - } +#ifdef GRPC_CRONET_WITH_PACKET_COALESCING + CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs); + bidirectional_stream_flush(s->cbs); +#endif result = ACTION_TAKEN_WITH_CALLBACK; } stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true; @@ -1173,9 +1176,10 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, sizeof(s->state.state_callback_received)); s->state.fail_state = s->state.flush_read = false; s->state.cancel_error = NULL; +#ifdef GRPC_CRONET_WITH_PACKET_COALESCING s->state.flush_cronet_when_ready = s->state.pending_write_for_trailer = false; +#endif s->state.unprocessed_send_message = false; - s->state.packet_coalescing_enabled = grpc_cronet_packet_coalescing_enabled; gpr_mu_init(&s->mu); return 0; } diff --git a/src/objective-c/tests/CronetUnitTests/CronetUnitTests.m b/src/objective-c/tests/CronetUnitTests/CronetUnitTests.m index 9bbf3cdb11..dcd7f2fa8d 100644 --- a/src/objective-c/tests/CronetUnitTests/CronetUnitTests.m +++ b/src/objective-c/tests/CronetUnitTests/CronetUnitTests.m @@ -269,9 +269,7 @@ unsigned int parse_h2_length(const char *field) { grpc_completion_queue_destroy(cq); } -- (void)PacketCoalescing:(bool)use_coalescing { - grpc_cronet_use_packet_coalescing(use_coalescing); - +- (void)testPacketCoalescing { grpc_call *c; grpc_slice request_payload_slice = grpc_slice_from_copied_string("hello world"); @@ -381,7 +379,7 @@ unsigned int parse_h2_length(const char *field) { long len; bool coalesced = false; while ((len = SSL_read(ssl, buf, sizeof(buf))) > 0) { - gpr_log(GPR_DEBUG, "Read len: %ld", len); + NSLog(@"Read len: %ld", len); // Analyze the HTTP/2 frames in the same TLS PDU to identify if // coalescing is successful @@ -406,7 +404,7 @@ unsigned int parse_h2_length(const char *field) { } } - XCTAssert(coalesced == use_coalescing); + XCTAssert(coalesced); SSL_free(ssl); SSL_CTX_free(ctx); close(s); @@ -435,9 +433,4 @@ unsigned int parse_h2_length(const char *field) { grpc_completion_queue_destroy(cq); } -- (void)testPacketCoalescing { - [self PacketCoalescing:false]; - [self PacketCoalescing:true]; -} - @end diff --git a/src/objective-c/tests/Podfile b/src/objective-c/tests/Podfile index 3760330be9..462c6a8e0e 100644 --- a/src/objective-c/tests/Podfile +++ b/src/objective-c/tests/Podfile @@ -97,6 +97,7 @@ post_install do |installer| # GPR_UNREACHABLE_CODE causes "Control may reach end of non-void # function" warning config.build_settings['GCC_WARN_ABOUT_RETURN_TYPE'] = 'NO' + config.build_settings['GCC_PREPROCESSOR_DEFINITIONS'] = '$(inherited) COCOAPODS=1 GRPC_CRONET_WITH_PACKET_COALESCING=1' end end -- cgit v1.2.3 From 0a2fae9aedeef98b39c5f825cb28b5ca32ecc6a2 Mon Sep 17 00:00:00 2001 From: Muxi Yan Date: Wed, 1 Feb 2017 14:49:03 -0800 Subject: Dynamically enable packet coalescing by channel args --- BUILD | 1 + build.yaml | 1 + gRPC-Core.podspec | 3 +- include/grpc/impl/codegen/grpc_types.h | 4 + .../cronet/client/secure/cronet_channel_create.c | 13 +- .../transport/cronet/transport/cronet_transport.c | 181 +++++++++++++-------- .../transport/cronet/transport/cronet_transport.h | 43 +++++ .../tests/CronetUnitTests/CronetUnitTests.m | 20 ++- src/objective-c/tests/Podfile | 1 - templates/gRPC-Core.podspec.template | 3 +- tools/run_tests/generated/sources_and_headers.json | 4 +- 11 files changed, 186 insertions(+), 88 deletions(-) create mode 100644 src/core/ext/transport/cronet/transport/cronet_transport.h (limited to 'src/core/ext/transport/cronet/client/secure/cronet_channel_create.c') diff --git a/BUILD b/BUILD index 54192514cc..e5c2d20d25 100644 --- a/BUILD +++ b/BUILD @@ -1028,6 +1028,7 @@ grpc_cc_library( ], hdrs = [ "third_party/Cronet/bidirectional_stream_c.h", + "src/core/ext/transport/cronet/transport/cronet_transport.h", ], language = "c", public_hdrs = [ diff --git a/build.yaml b/build.yaml index 23e2659ea1..63c05b76c6 100644 --- a/build.yaml +++ b/build.yaml @@ -684,6 +684,7 @@ filegroups: - include/grpc/grpc_security.h - include/grpc/grpc_security_constants.h headers: + - src/core/ext/transport/cronet/transport/cronet_transport.h - third_party/Cronet/bidirectional_stream_c.h src: - src/core/ext/transport/cronet/client/secure/cronet_channel_create.c diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec index 1eb178931d..57816d066d 100644 --- a/gRPC-Core.podspec +++ b/gRPC-Core.podspec @@ -848,7 +848,8 @@ Pod::Spec.new do |s| s.subspec 'Cronet-Interface' do |ss| ss.header_mappings_dir = 'include/grpc' - ss.source_files = 'include/grpc/grpc_cronet.h' + ss.source_files = 'include/grpc/grpc_cronet.h', + 'src/core/ext/transport/cronet/transport/cronet_transport.h' end s.subspec 'Cronet-Implementation' do |ss| diff --git a/include/grpc/impl/codegen/grpc_types.h b/include/grpc/impl/codegen/grpc_types.h index ee8101aab8..74be7d0ccf 100644 --- a/include/grpc/impl/codegen/grpc_types.h +++ b/include/grpc/impl/codegen/grpc_types.h @@ -217,6 +217,10 @@ typedef struct { #define GRPC_ARG_LB_POLICY_NAME "grpc.lb_policy_name" /** The grpc_socket_mutator instance that set the socket options. A pointer. */ #define GRPC_ARG_SOCKET_MUTATOR "grpc.socket_mutator" +/** If non-zero, Cronet transport will coalesce packets to fewer frames when + * possible. */ +#define GRPC_ARG_USE_CRONET_PACKET_COALESCING \ + "grpc.use_cronet_packet_coalescing" /** \} */ /** Result of a grpc call. If the caller satisfies the prerequisites of a diff --git a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c index 477cf07f45..b6e9e845df 100644 --- a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c +++ b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c @@ -39,6 +39,7 @@ #include #include +#include "src/core/ext/transport/cronet/transport/cronet_transport.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/transport/transport_impl.h" @@ -54,16 +55,14 @@ extern grpc_transport_vtable grpc_cronet_vtable; GRPCAPI grpc_channel *grpc_cronet_secure_channel_create( void *engine, const char *target, const grpc_channel_args *args, void *reserved) { - cronet_transport *ct = gpr_malloc(sizeof(cronet_transport)); - ct->base.vtable = &grpc_cronet_vtable; - ct->engine = engine; - ct->host = gpr_malloc(strlen(target) + 1); - strcpy(ct->host, target); gpr_log(GPR_DEBUG, "grpc_create_cronet_transport: stream_engine = %p, target=%s", engine, - ct->host); + target); + + grpc_transport *ct = + grpc_create_cronet_transport(engine, target, args, reserved); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; return grpc_channel_create(&exec_ctx, target, args, - GRPC_CLIENT_DIRECT_CHANNEL, (grpc_transport *)ct); + GRPC_CLIENT_DIRECT_CHANNEL, ct); } diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 447f3f31ec..cba5365bad 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -112,6 +112,7 @@ struct grpc_cronet_transport { grpc_transport base; /* must be first element in this structure */ stream_engine *engine; char *host; + bool use_packet_coalescing; }; typedef struct grpc_cronet_transport grpc_cronet_transport; @@ -150,10 +151,8 @@ struct op_state { bool state_callback_received[OP_NUM_OPS]; bool fail_state; bool flush_read; -#ifdef GRPC_CRONET_WITH_PACKET_COALESCING bool flush_cronet_when_ready; bool pending_write_for_trailer; -#endif bool unprocessed_send_message; grpc_error *cancel_error; /* data structure for storing data coming from server */ @@ -178,7 +177,7 @@ struct op_storage { struct stream_obj { struct op_and_state *oas; grpc_transport_stream_op *curr_op; - grpc_cronet_transport curr_ct; + grpc_cronet_transport *curr_ct; grpc_stream *curr_gs; bidirectional_stream *cbs; bidirectional_stream_header_array header_array; @@ -415,6 +414,7 @@ static void on_succeeded(bidirectional_stream *stream) { static void on_stream_ready(bidirectional_stream *stream) { CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream); stream_obj *s = (stream_obj *)stream->annotation; + grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct; gpr_mu_lock(&s->mu); s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true; s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true; @@ -425,12 +425,12 @@ static void on_stream_ready(bidirectional_stream *stream) { } /* Send the initial metadata on wire if there is no SEND_MESSAGE or * SEND_TRAILING_METADATA ops pending */ -#ifdef GRPC_CRONET_WITH_PACKET_COALESCING - if (s->state.flush_cronet_when_ready) { - CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs); - bidirectional_stream_flush(stream); + if (t->use_packet_coalescing) { + if (s->state.flush_cronet_when_ready) { + CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs); + bidirectional_stream_flush(stream); + } } -#endif gpr_mu_unlock(&s->mu); execute_from_storage(s); } @@ -540,6 +540,7 @@ static void on_response_trailers_received( CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream, trailers); stream_obj *s = (stream_obj *)stream->annotation; + grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct; gpr_mu_lock(&s->mu); memset(&s->state.rs.trailing_metadata, 0, sizeof(s->state.rs.trailing_metadata)); @@ -568,10 +569,10 @@ static void on_response_trailers_received( CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs); s->state.state_callback_received[OP_SEND_MESSAGE] = false; bidirectional_stream_write(s->cbs, "", 0, true); -#ifdef GRPC_CRONET_WITH_PACKET_COALESCING - CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs); - bidirectional_stream_flush(s->cbs); -#endif + if (t->use_packet_coalescing) { + CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs); + bidirectional_stream_flush(s->cbs); + } s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true; gpr_mu_unlock(&s->mu); @@ -695,8 +696,10 @@ static bool header_has_authority(grpc_linked_mdelem *head) { executed. This is the heart of the state machine. */ static bool op_can_be_run(grpc_transport_stream_op *curr_op, - struct op_state *stream_state, - struct op_state *op_state, enum e_op_id op_id) { + struct stream_obj *s, struct op_state *op_state, + enum e_op_id op_id) { + struct op_state *stream_state = &s->state; + grpc_cronet_transport *t = s->curr_ct; bool result = true; /* When call is canceled, every op can be run, except under following conditions @@ -768,11 +771,9 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op, result = false; /* we haven't got on_write_completed for the send yet */ else if (stream_state->state_op_done[OP_SEND_MESSAGE] && - !stream_state->state_callback_received[OP_SEND_MESSAGE] -#ifdef GRPC_CRONET_WITH_PACKET_COALESCING - && !stream_state->pending_write_for_trailer -#endif - ) + !stream_state->state_callback_received[OP_SEND_MESSAGE] && + !(t->use_packet_coalescing && + stream_state->pending_write_for_trailer)) result = false; } else if (op_id == OP_CANCEL_ERROR) { /* already executed */ @@ -845,42 +846,41 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, struct op_and_state *oas) { grpc_transport_stream_op *stream_op = &oas->op; struct stream_obj *s = oas->s; + grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct; struct op_state *stream_state = &s->state; enum e_op_result result = NO_ACTION_POSSIBLE; if (stream_op->send_initial_metadata && - op_can_be_run(stream_op, stream_state, &oas->state, - OP_SEND_INITIAL_METADATA)) { + op_can_be_run(stream_op, s, &oas->state, OP_SEND_INITIAL_METADATA)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas); /* Start new cronet stream. It is destroyed in on_succeeded, on_canceled, * on_failed */ GPR_ASSERT(s->cbs == NULL); GPR_ASSERT(!stream_state->state_op_done[OP_SEND_INITIAL_METADATA]); - s->cbs = bidirectional_stream_create(s->curr_ct.engine, s->curr_gs, + s->cbs = bidirectional_stream_create(t->engine, s->curr_gs, &cronet_callbacks); CRONET_LOG(GPR_DEBUG, "%p = bidirectional_stream_create()", s->cbs); -#ifdef GRPC_CRONET_WITH_PACKET_COALESCING - bidirectional_stream_disable_auto_flush(s->cbs, true); - bidirectional_stream_delay_request_headers_until_flush(s->cbs, true); -#endif + if (t->use_packet_coalescing) { + bidirectional_stream_disable_auto_flush(s->cbs, true); + bidirectional_stream_delay_request_headers_until_flush(s->cbs, true); + } char *url = NULL; const char *method = "POST"; s->header_array.headers = NULL; convert_metadata_to_cronet_headers( - stream_op->send_initial_metadata->list.head, s->curr_ct.host, &url, + stream_op->send_initial_metadata->list.head, t->host, &url, &s->header_array.headers, &s->header_array.count, &method); s->header_array.capacity = s->header_array.count; CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs, url); bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, false); stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true; -#ifdef GRPC_CRONET_WITH_PACKET_COALESCING - if (!stream_op->send_message && !stream_op->send_trailing_metadata) { - s->state.flush_cronet_when_ready = true; + if (t->use_packet_coalescing) { + if (!stream_op->send_message && !stream_op->send_trailing_metadata) { + s->state.flush_cronet_when_ready = true; + } } -#endif result = ACTION_TAKEN_WITH_CALLBACK; } else if (stream_op->send_message && - op_can_be_run(stream_op, stream_state, &oas->state, - OP_SEND_MESSAGE)) { + op_can_be_run(stream_op, s, &oas->state, OP_SEND_MESSAGE)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas); stream_state->unprocessed_send_message = false; if (stream_state->state_callback_received[OP_FAILED]) { @@ -913,19 +913,18 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->state_callback_received[OP_SEND_MESSAGE] = false; bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer, (int)write_buffer_size, false); -#ifdef GRPC_CRONET_WITH_PACKET_COALESCING - if (!stream_op->send_trailing_metadata) { - CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", - s->cbs); - bidirectional_stream_flush(s->cbs); - result = ACTION_TAKEN_WITH_CALLBACK; + if (t->use_packet_coalescing) { + if (!stream_op->send_trailing_metadata) { + CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs); + bidirectional_stream_flush(s->cbs); + result = ACTION_TAKEN_WITH_CALLBACK; + } else { + stream_state->pending_write_for_trailer = true; + result = ACTION_TAKEN_NO_CALLBACK; + } } else { - stream_state->pending_write_for_trailer = true; - result = ACTION_TAKEN_NO_CALLBACK; + result = ACTION_TAKEN_WITH_CALLBACK; } -#else - result = ACTION_TAKEN_WITH_CALLBACK; -#endif } else { result = NO_ACTION_POSSIBLE; } @@ -933,7 +932,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->state_op_done[OP_SEND_MESSAGE] = true; oas->state.state_op_done[OP_SEND_MESSAGE] = true; } else if (stream_op->send_trailing_metadata && - op_can_be_run(stream_op, stream_state, &oas->state, + op_can_be_run(stream_op, s, &oas->state, OP_SEND_TRAILING_METADATA)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas); if (stream_state->state_callback_received[OP_FAILED]) { @@ -944,15 +943,15 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, s->cbs); stream_state->state_callback_received[OP_SEND_MESSAGE] = false; bidirectional_stream_write(s->cbs, "", 0, true); -#ifdef GRPC_CRONET_WITH_PACKET_COALESCING - CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs); - bidirectional_stream_flush(s->cbs); -#endif + if (t->use_packet_coalescing) { + CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs); + bidirectional_stream_flush(s->cbs); + } result = ACTION_TAKEN_WITH_CALLBACK; } stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true; } else if (stream_op->recv_initial_metadata && - op_can_be_run(stream_op, stream_state, &oas->state, + op_can_be_run(stream_op, s, &oas->state, OP_RECV_INITIAL_METADATA)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { @@ -971,8 +970,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true; result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_op->recv_message && - op_can_be_run(stream_op, stream_state, &oas->state, - OP_RECV_MESSAGE)) { + op_can_be_run(stream_op, s, &oas->state, OP_RECV_MESSAGE)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { CRONET_LOG(GPR_DEBUG, "Stream is cancelled."); @@ -1084,7 +1082,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, result = ACTION_TAKEN_NO_CALLBACK; } } else if (stream_op->recv_trailing_metadata && - op_can_be_run(stream_op, stream_state, &oas->state, + op_can_be_run(stream_op, s, &oas->state, OP_RECV_TRAILING_METADATA)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas); if (oas->s->state.rs.trailing_metadata_valid) { @@ -1096,8 +1094,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true; result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_op->cancel_error && - op_can_be_run(stream_op, stream_state, &oas->state, - OP_CANCEL_ERROR)) { + op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas); CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs); if (s->cbs) { @@ -1111,8 +1108,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->cancel_error = GRPC_ERROR_REF(stream_op->cancel_error); } } else if (stream_op->on_complete && - op_can_be_run(stream_op, stream_state, &oas->state, - OP_ON_COMPLETE)) { + op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { grpc_closure_sched(exec_ctx, stream_op->on_complete, @@ -1176,10 +1172,12 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, sizeof(s->state.state_callback_received)); s->state.fail_state = s->state.flush_read = false; s->state.cancel_error = NULL; -#ifdef GRPC_CRONET_WITH_PACKET_COALESCING s->state.flush_cronet_when_ready = s->state.pending_write_for_trailer = false; -#endif s->state.unprocessed_send_message = false; + + s->curr_gs = gs; + s->curr_ct = (grpc_cronet_transport *)gt; + gpr_mu_init(&s->mu); return 0; } @@ -1195,8 +1193,6 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_transport_stream_op *op) { CRONET_LOG(GPR_DEBUG, "perform_stream_op"); stream_obj *s = (stream_obj *)gs; - s->curr_gs = gs; - memcpy(&s->curr_ct, gt, sizeof(grpc_cronet_transport)); add_to_storage(s, op); if (op->send_initial_metadata && header_has_authority(op->send_initial_metadata->list.head)) { @@ -1244,14 +1240,55 @@ static grpc_endpoint *get_endpoint(grpc_exec_ctx *exec_ctx, static void perform_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_transport_op *op) {} -const grpc_transport_vtable grpc_cronet_vtable = {sizeof(stream_obj), - "cronet_http", - init_stream, - set_pollset_do_nothing, - set_pollset_set_do_nothing, - perform_stream_op, - perform_op, - destroy_stream, - destroy_transport, - get_peer, - get_endpoint}; +static const grpc_transport_vtable grpc_cronet_vtable = { + sizeof(stream_obj), + "cronet_http", + init_stream, + set_pollset_do_nothing, + set_pollset_set_do_nothing, + perform_stream_op, + perform_op, + destroy_stream, + destroy_transport, + get_peer, + get_endpoint}; + +grpc_transport *grpc_create_cronet_transport(void *engine, const char *target, + const grpc_channel_args *args, + void *reserved) { + grpc_cronet_transport *ct = gpr_malloc(sizeof(grpc_cronet_transport)); + if (!ct) { + goto error; + } + ct->base.vtable = &grpc_cronet_vtable; + ct->engine = engine; + ct->host = gpr_malloc(strlen(target) + 1); + if (!ct->host) { + goto error; + } + strcpy(ct->host, target); + + ct->use_packet_coalescing = true; + for (size_t i = 0; i < args->num_args; i++) { + if (0 == strcmp(args->args[i].key, GRPC_ARG_USE_CRONET_PACKET_COALESCING)) { + if (args->args[i].type != GRPC_ARG_INTEGER) { + gpr_log(GPR_ERROR, "%s ignored: it must be an integer", + GRPC_ARG_USE_CRONET_PACKET_COALESCING); + } else { + ct->use_packet_coalescing = (args->args[i].value.integer != 0); + } + } + } + + return &ct->base; + +error: + if (ct) { + if (ct->host) { + gpr_free(ct->host); + } + gpr_free(ct); + } + + return NULL; +} \ No newline at end of file diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.h b/src/core/ext/transport/cronet/transport/cronet_transport.h new file mode 100644 index 0000000000..169ce31fd7 --- /dev/null +++ b/src/core/ext/transport/cronet/transport/cronet_transport.h @@ -0,0 +1,43 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_TRANSPORT_CRONET_TRANSPORT_CRONET_TRANSPORT_H +#define GRPC_CORE_EXT_TRANSPORT_CRONET_TRANSPORT_CRONET_TRANSPORT_H + +#include "src/core/lib/transport/transport.h" + +grpc_transport *grpc_create_cronet_transport(void *engine, const char *target, + const grpc_channel_args *args, + void *reserved); + +#endif /* GRPC_CORE_EXT_TRANSPORT_CRONET_TRANSPORT_CRONET_TRANSPORT_H */ diff --git a/src/objective-c/tests/CronetUnitTests/CronetUnitTests.m b/src/objective-c/tests/CronetUnitTests/CronetUnitTests.m index dcd7f2fa8d..d06fe7767f 100644 --- a/src/objective-c/tests/CronetUnitTests/CronetUnitTests.m +++ b/src/objective-c/tests/CronetUnitTests/CronetUnitTests.m @@ -269,7 +269,12 @@ unsigned int parse_h2_length(const char *field) { grpc_completion_queue_destroy(cq); } -- (void)testPacketCoalescing { +- (void)PacketCoalescing:(bool)use_coalescing { + grpc_arg arg; + arg.key = GRPC_ARG_USE_CRONET_PACKET_COALESCING; + arg.type = GRPC_ARG_INTEGER; + arg.value.integer = use_coalescing ? 1:0; + grpc_channel_args *args = grpc_channel_args_copy_and_add(NULL, &arg, 1); grpc_call *c; grpc_slice request_payload_slice = grpc_slice_from_copied_string("hello world"); @@ -285,8 +290,8 @@ unsigned int parse_h2_length(const char *field) { gpr_join_host_port(&addr, "127.0.0.1", port); grpc_completion_queue *cq = grpc_completion_queue_create(NULL); stream_engine *cronetEngine = [Cronet getGlobalEngine]; - grpc_channel *client = grpc_cronet_secure_channel_create(cronetEngine, addr, - NULL, NULL); + grpc_channel *client = + grpc_cronet_secure_channel_create(cronetEngine, addr, args, NULL); cq_verifier *cqv = cq_verifier_create(cq); grpc_op ops[6]; @@ -379,7 +384,7 @@ unsigned int parse_h2_length(const char *field) { long len; bool coalesced = false; while ((len = SSL_read(ssl, buf, sizeof(buf))) > 0) { - NSLog(@"Read len: %ld", len); + gpr_log(GPR_DEBUG, "Read len: %ld", len); // Analyze the HTTP/2 frames in the same TLS PDU to identify if // coalescing is successful @@ -404,7 +409,7 @@ unsigned int parse_h2_length(const char *field) { } } - XCTAssert(coalesced); + XCTAssert(coalesced == use_coalescing); SSL_free(ssl); SSL_CTX_free(ctx); close(s); @@ -433,4 +438,9 @@ unsigned int parse_h2_length(const char *field) { grpc_completion_queue_destroy(cq); } +- (void)testPacketCoalescing { + [self PacketCoalescing:true]; + [self PacketCoalescing:false]; +} + @end diff --git a/src/objective-c/tests/Podfile b/src/objective-c/tests/Podfile index 462c6a8e0e..3760330be9 100644 --- a/src/objective-c/tests/Podfile +++ b/src/objective-c/tests/Podfile @@ -97,7 +97,6 @@ post_install do |installer| # GPR_UNREACHABLE_CODE causes "Control may reach end of non-void # function" warning config.build_settings['GCC_WARN_ABOUT_RETURN_TYPE'] = 'NO' - config.build_settings['GCC_PREPROCESSOR_DEFINITIONS'] = '$(inherited) COCOAPODS=1 GRPC_CRONET_WITH_PACKET_COALESCING=1' end end diff --git a/templates/gRPC-Core.podspec.template b/templates/gRPC-Core.podspec.template index 1b97d18f16..0738b7221b 100644 --- a/templates/gRPC-Core.podspec.template +++ b/templates/gRPC-Core.podspec.template @@ -161,7 +161,8 @@ s.subspec 'Cronet-Interface' do |ss| ss.header_mappings_dir = 'include/grpc' - ss.source_files = 'include/grpc/grpc_cronet.h' + ss.source_files = 'include/grpc/grpc_cronet.h', + 'src/core/ext/transport/cronet/transport/cronet_transport.h' end s.subspec 'Cronet-Implementation' do |ss| diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json index 9bc82486d2..7f59411019 100644 --- a/tools/run_tests/generated/sources_and_headers.json +++ b/tools/run_tests/generated/sources_and_headers.json @@ -7873,6 +7873,7 @@ "include/grpc/grpc_cronet.h", "include/grpc/grpc_security.h", "include/grpc/grpc_security_constants.h", + "src/core/ext/transport/cronet/transport/cronet_transport.h", "third_party/Cronet/bidirectional_stream_c.h" ], "is_filegroup": true, @@ -7884,7 +7885,8 @@ "include/grpc/grpc_security_constants.h", "src/core/ext/transport/cronet/client/secure/cronet_channel_create.c", "src/core/ext/transport/cronet/transport/cronet_api_dummy.c", - "src/core/ext/transport/cronet/transport/cronet_transport.c" + "src/core/ext/transport/cronet/transport/cronet_transport.c", + "src/core/ext/transport/cronet/transport/cronet_transport.h" ], "third_party": false, "type": "filegroup" -- cgit v1.2.3