diff options
Diffstat (limited to 'src/core/ext')
8 files changed, 228 insertions, 89 deletions
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c index fd0fb41fb9..a50ba09bf5 100644 --- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.c @@ -296,8 +296,6 @@ static void stop_connectivity_watchers(grpc_exec_ctx *exec_ctx, static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const grpc_lb_policy_args *args) { pick_first_lb_policy *p = (pick_first_lb_policy *)policy; - /* Find the number of backend addresses. We ignore balancer - * addresses, since we don't know how to handle them. */ const grpc_arg *arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); if (arg == NULL || arg->type != GRPC_ARG_POINTER) { @@ -317,11 +315,7 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, return; } const grpc_lb_addresses *addresses = arg->value.pointer.p; - size_t num_addrs = 0; - for (size_t i = 0; i < addresses->num_addresses; i++) { - if (!addresses->addresses[i].is_balancer) ++num_addrs; - } - if (num_addrs == 0) { + if (addresses->num_addresses == 0) { // Empty update. Unsubscribe from all current subchannels and put the // channel in TRANSIENT_FAILURE. grpc_connectivity_state_set( @@ -333,9 +327,10 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, } if (GRPC_TRACER_ON(grpc_lb_pick_first_trace)) { gpr_log(GPR_INFO, "Pick First %p received update with %lu addresses", - (void *)p, (unsigned long)num_addrs); + (void *)p, (unsigned long)addresses->num_addresses); } - grpc_subchannel_args *sc_args = gpr_zalloc(sizeof(*sc_args) * num_addrs); + grpc_subchannel_args *sc_args = + gpr_zalloc(sizeof(*sc_args) * addresses->num_addresses); /* We remove the following keys in order for subchannel keys belonging to * subchannels point to the same address to match. */ static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, @@ -344,7 +339,8 @@ static void pf_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, /* Create list of subchannel args for new addresses in \a args. */ for (size_t i = 0; i < addresses->num_addresses; i++) { - if (addresses->addresses[i].is_balancer) continue; + // If there were any balancer, we would have chosen grpclb policy instead. + GPR_ASSERT(!addresses->addresses[i].is_balancer); if (addresses->addresses[i].user_data != NULL) { gpr_log(GPR_ERROR, "This LB policy doesn't support user data. It will be ignored"); diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c index a63bdd933d..866fb9a1eb 100644 --- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.c @@ -74,9 +74,6 @@ typedef struct round_robin_lb_policy { bool started_picking; /** are we shutting down? */ bool shutdown; - /** has the policy gotten into the GRPC_CHANNEL_SHUTDOWN? No picks can be - * service after this point, the policy will never transition out. */ - bool in_connectivity_shutdown; /** List of picks that are waiting on connectivity */ pending_pick *pending_picks; @@ -424,7 +421,6 @@ static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_closure *on_complete) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; GPR_ASSERT(!p->shutdown); - GPR_ASSERT(!p->in_connectivity_shutdown); if (GRPC_TRACER_ON(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, "[RR %p] Trying to pick", (void *)pol); } @@ -537,7 +533,7 @@ static grpc_connectivity_state update_lb_connectivity_status_locked( grpc_connectivity_state_set(exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "rr_shutdown"); - p->in_connectivity_shutdown = true; + p->shutdown = true; new_state = GRPC_CHANNEL_SHUTDOWN; } else if (subchannel_list->num_transient_failures == p->subchannel_list->num_subchannels) { /* 4) TRANSIENT_FAILURE */ @@ -741,8 +737,6 @@ static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const grpc_lb_policy_args *args) { round_robin_lb_policy *p = (round_robin_lb_policy *)policy; - /* Find the number of backend addresses. We ignore balancer addresses, since - * we don't know how to handle them. */ const grpc_arg *arg = grpc_channel_args_find(args->args, GRPC_ARG_LB_ADDRESSES); if (arg == NULL || arg->type != GRPC_ARG_POINTER) { @@ -761,12 +755,9 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, return; } grpc_lb_addresses *addresses = arg->value.pointer.p; - size_t num_addrs = 0; - for (size_t i = 0; i < addresses->num_addresses; i++) { - if (!addresses->addresses[i].is_balancer) ++num_addrs; - } - rr_subchannel_list *subchannel_list = rr_subchannel_list_create(p, num_addrs); - if (num_addrs == 0) { + rr_subchannel_list *subchannel_list = + rr_subchannel_list_create(p, addresses->num_addresses); + if (addresses->num_addresses == 0) { grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), @@ -798,9 +789,8 @@ static void rr_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, GRPC_ARG_LB_ADDRESSES}; /* Create subchannels for addresses in the update. */ for (size_t i = 0; i < addresses->num_addresses; i++) { - /* Skip balancer addresses, since we only know how to handle backends. */ - if (addresses->addresses[i].is_balancer) continue; - GPR_ASSERT(i < num_addrs); + // If there were any balancer, we would have chosen grpclb policy instead. + GPR_ASSERT(!addresses->addresses[i].is_balancer); memset(&sc_args, 0, sizeof(grpc_subchannel_args)); grpc_arg addr_arg = grpc_create_subchannel_address_arg(&addresses->addresses[i].address); diff --git a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c index 56ed4371a9..3ff081a514 100644 --- a/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c +++ b/src/core/ext/filters/client_channel/resolver/fake/fake_resolver.c @@ -32,6 +32,7 @@ #include "src/core/ext/filters/client_channel/parse_address.h" #include "src/core/ext/filters/client_channel/resolver_registry.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" @@ -125,7 +126,6 @@ static const grpc_resolver_vtable fake_resolver_vtable = { struct grpc_fake_resolver_response_generator { fake_resolver* resolver; // Set by the fake_resolver constructor to itself. - grpc_channel_args* next_response; gpr_refcount refcount; }; @@ -151,19 +151,26 @@ void grpc_fake_resolver_response_generator_unref( } } -static void set_response_cb(grpc_exec_ctx* exec_ctx, void* arg, - grpc_error* error) { - grpc_fake_resolver_response_generator* generator = - (grpc_fake_resolver_response_generator*)arg; +typedef struct set_response_closure_arg { + grpc_closure set_response_closure; + grpc_fake_resolver_response_generator* generator; + grpc_channel_args* next_response; +} set_response_closure_arg; + +static void set_response_closure_fn(grpc_exec_ctx* exec_ctx, void* arg, + grpc_error* error) { + set_response_closure_arg* closure_arg = arg; + grpc_fake_resolver_response_generator* generator = closure_arg->generator; fake_resolver* r = generator->resolver; if (r->next_results != NULL) { grpc_channel_args_destroy(exec_ctx, r->next_results); } - r->next_results = generator->next_response; + r->next_results = closure_arg->next_response; if (r->results_upon_error != NULL) { grpc_channel_args_destroy(exec_ctx, r->results_upon_error); } - r->results_upon_error = grpc_channel_args_copy(generator->next_response); + r->results_upon_error = grpc_channel_args_copy(closure_arg->next_response); + gpr_free(closure_arg); fake_resolver_maybe_finish_next_locked(exec_ctx, r); } @@ -171,12 +178,15 @@ void grpc_fake_resolver_response_generator_set_response( grpc_exec_ctx* exec_ctx, grpc_fake_resolver_response_generator* generator, grpc_channel_args* next_response) { GPR_ASSERT(generator->resolver != NULL); - generator->next_response = grpc_channel_args_copy(next_response); - GRPC_CLOSURE_SCHED( - exec_ctx, GRPC_CLOSURE_CREATE(set_response_cb, generator, - grpc_combiner_scheduler( - generator->resolver->base.combiner)), - GRPC_ERROR_NONE); + set_response_closure_arg* closure_arg = gpr_zalloc(sizeof(*closure_arg)); + closure_arg->generator = generator; + closure_arg->next_response = grpc_channel_args_copy(next_response); + GRPC_CLOSURE_SCHED(exec_ctx, + GRPC_CLOSURE_INIT(&closure_arg->set_response_closure, + set_response_closure_fn, closure_arg, + grpc_combiner_scheduler( + generator->resolver->base.combiner)), + GRPC_ERROR_NONE); } static void* response_generator_arg_copy(void* p) { diff --git a/src/core/ext/filters/http/message_compress/message_compress_filter.c b/src/core/ext/filters/http/message_compress/message_compress_filter.c index 20a3488115..a32819bfe4 100644 --- a/src/core/ext/filters/http/message_compress/message_compress_filter.c +++ b/src/core/ext/filters/http/message_compress/message_compress_filter.c @@ -44,7 +44,9 @@ typedef struct call_data { grpc_slice_buffer slices; /**< Buffers up input slices to be compressed */ grpc_linked_mdelem compression_algorithm_storage; + grpc_linked_mdelem stream_compression_algorithm_storage; grpc_linked_mdelem accept_encoding_storage; + grpc_linked_mdelem accept_stream_encoding_storage; uint32_t remaining_slice_bytes; /** Compression algorithm we'll try to use. It may be given by incoming * metadata, or by the channel's default compression settings. */ @@ -75,6 +77,13 @@ typedef struct channel_data { uint32_t enabled_algorithms_bitset; /** Supported compression algorithms */ uint32_t supported_compression_algorithms; + + /** The default, channel-level, stream compression algorithm */ + grpc_stream_compression_algorithm default_stream_compression_algorithm; + /** Bitset of enabled stream compression algorithms */ + uint32_t enabled_stream_compression_algorithms_bitset; + /** Supported stream compression algorithms */ + uint32_t supported_stream_compression_algorithms; } channel_data; static bool skip_compression(grpc_call_element *elem, uint32_t flags, @@ -106,31 +115,56 @@ static grpc_error *process_send_initial_metadata( call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; *has_compression_algorithm = false; - /* Parse incoming request for compression. If any, it'll be available - * at calld->compression_algorithm */ - if (initial_metadata->idx.named.grpc_internal_encoding_request != NULL) { + grpc_stream_compression_algorithm stream_compression_algorithm = + GRPC_STREAM_COMPRESS_NONE; + if (initial_metadata->idx.named.grpc_internal_stream_encoding_request != + NULL) { grpc_mdelem md = - initial_metadata->idx.named.grpc_internal_encoding_request->md; - if (!grpc_compression_algorithm_parse(GRPC_MDVALUE(md), - &calld->compression_algorithm)) { + initial_metadata->idx.named.grpc_internal_stream_encoding_request->md; + if (!grpc_stream_compression_algorithm_parse( + GRPC_MDVALUE(md), &stream_compression_algorithm)) { char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md)); gpr_log(GPR_ERROR, - "Invalid compression algorithm: '%s' (unknown). Ignoring.", val); + "Invalid stream compression algorithm: '%s' (unknown). Ignoring.", + val); gpr_free(val); - calld->compression_algorithm = GRPC_COMPRESS_NONE; + stream_compression_algorithm = GRPC_STREAM_COMPRESS_NONE; + } + if (!GPR_BITGET(channeld->enabled_stream_compression_algorithms_bitset, + stream_compression_algorithm)) { + char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md)); + gpr_log( + GPR_ERROR, + "Invalid stream compression algorithm: '%s' (previously disabled). " + "Ignoring.", + val); + gpr_free(val); + stream_compression_algorithm = GRPC_STREAM_COMPRESS_NONE; + } + *has_compression_algorithm = true; + grpc_metadata_batch_remove( + exec_ctx, initial_metadata, + initial_metadata->idx.named.grpc_internal_stream_encoding_request); + /* Disable message-wise compression */ + calld->compression_algorithm = GRPC_COMPRESS_NONE; + if (initial_metadata->idx.named.grpc_internal_encoding_request != NULL) { + grpc_metadata_batch_remove( + exec_ctx, initial_metadata, + initial_metadata->idx.named.grpc_internal_encoding_request); } - if (!GPR_BITGET(channeld->enabled_algorithms_bitset, - calld->compression_algorithm)) { + } else if (initial_metadata->idx.named.grpc_internal_encoding_request != + NULL) { + grpc_mdelem md = + initial_metadata->idx.named.grpc_internal_encoding_request->md; + if (!grpc_compression_algorithm_parse(GRPC_MDVALUE(md), + &calld->compression_algorithm)) { char *val = grpc_slice_to_c_string(GRPC_MDVALUE(md)); gpr_log(GPR_ERROR, - "Invalid compression algorithm: '%s' (previously disabled). " - "Ignoring.", - val); + "Invalid compression algorithm: '%s' (unknown). Ignoring.", val); gpr_free(val); calld->compression_algorithm = GRPC_COMPRESS_NONE; } *has_compression_algorithm = true; - grpc_metadata_batch_remove( exec_ctx, initial_metadata, initial_metadata->idx.named.grpc_internal_encoding_request); @@ -138,13 +172,25 @@ static grpc_error *process_send_initial_metadata( /* If no algorithm was found in the metadata and we aren't * exceptionally skipping compression, fall back to the channel * default */ - calld->compression_algorithm = channeld->default_compression_algorithm; + if (channeld->default_stream_compression_algorithm != + GRPC_STREAM_COMPRESS_NONE) { + stream_compression_algorithm = + channeld->default_stream_compression_algorithm; + calld->compression_algorithm = GRPC_COMPRESS_NONE; + } else { + calld->compression_algorithm = channeld->default_compression_algorithm; + } *has_compression_algorithm = true; } grpc_error *error = GRPC_ERROR_NONE; /* hint compression algorithm */ - if (calld->compression_algorithm != GRPC_COMPRESS_NONE) { + if (stream_compression_algorithm != GRPC_STREAM_COMPRESS_NONE) { + error = grpc_metadata_batch_add_tail( + exec_ctx, initial_metadata, + &calld->stream_compression_algorithm_storage, + grpc_stream_compression_encoding_mdelem(stream_compression_algorithm)); + } else if (calld->compression_algorithm != GRPC_COMPRESS_NONE) { error = grpc_metadata_batch_add_tail( exec_ctx, initial_metadata, &calld->compression_algorithm_storage, grpc_compression_encoding_mdelem(calld->compression_algorithm)); @@ -158,6 +204,16 @@ static grpc_error *process_send_initial_metadata( GRPC_MDELEM_ACCEPT_ENCODING_FOR_ALGORITHMS( channeld->supported_compression_algorithms)); + if (error != GRPC_ERROR_NONE) return error; + + /* Do not overwrite accept-encoding header if it already presents. */ + if (!initial_metadata->idx.named.accept_encoding) { + error = grpc_metadata_batch_add_tail( + exec_ctx, initial_metadata, &calld->accept_stream_encoding_storage, + GRPC_MDELEM_ACCEPT_STREAM_ENCODING_FOR_ALGORITHMS( + channeld->supported_stream_compression_algorithms)); + } + return error; } @@ -435,6 +491,7 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element_args *args) { channel_data *channeld = elem->channel_data; + /* Configuration for message compression */ channeld->enabled_algorithms_bitset = grpc_channel_args_compression_algorithm_get_states(args->channel_args); @@ -449,16 +506,32 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, channeld->default_compression_algorithm = GRPC_COMPRESS_NONE; } - channeld->supported_compression_algorithms = 1; /* always support identity */ - for (grpc_compression_algorithm algo_idx = 1; - algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) { - /* skip disabled algorithms */ - if (!GPR_BITGET(channeld->enabled_algorithms_bitset, algo_idx)) { - continue; - } - channeld->supported_compression_algorithms |= 1u << algo_idx; + channeld->supported_compression_algorithms = + (((1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1) & + channeld->enabled_algorithms_bitset) | + 1u; + + /* Configuration for stream compression */ + channeld->enabled_stream_compression_algorithms_bitset = + grpc_channel_args_stream_compression_algorithm_get_states( + args->channel_args); + + channeld->default_stream_compression_algorithm = + grpc_channel_args_get_stream_compression_algorithm(args->channel_args); + + if (!GPR_BITGET(channeld->enabled_stream_compression_algorithms_bitset, + channeld->default_stream_compression_algorithm)) { + gpr_log(GPR_DEBUG, + "stream compression algorithm %d not enabled: switching to none", + channeld->default_stream_compression_algorithm); + channeld->default_stream_compression_algorithm = GRPC_STREAM_COMPRESS_NONE; } + channeld->supported_stream_compression_algorithms = + (((1u << GRPC_STREAM_COMPRESS_ALGORITHMS_COUNT) - 1) & + channeld->enabled_stream_compression_algorithms_bitset) | + 1u; + GPR_ASSERT(!args->is_last); return GRPC_ERROR_NONE; } diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 7bad188f4e..7541bd5c92 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -1298,6 +1298,15 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, if (op->send_initial_metadata) { GPR_ASSERT(s->send_initial_metadata_finished == NULL); on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE; + + /* Identify stream compression */ + if ((s->stream_compression_send_enabled = + (op_payload->send_initial_metadata.send_initial_metadata->idx.named + .content_encoding != NULL)) == true) { + s->compressed_data_buffer = gpr_malloc(sizeof(grpc_slice_buffer)); + grpc_slice_buffer_init(s->compressed_data_buffer); + } + s->send_initial_metadata_finished = add_closure_barrier(on_complete); s->send_initial_metadata = op_payload->send_initial_metadata.send_initial_metadata; @@ -2703,6 +2712,9 @@ static grpc_error *incoming_byte_stream_pull(grpc_exec_ctx *exec_ctx, grpc_stream_compression_context_destroy(s->stream_decompression_ctx); s->stream_decompression_ctx = NULL; } + if (s->unprocessed_incoming_frames_buffer.length == 0) { + *slice = grpc_empty_slice(); + } } error = grpc_deframe_unprocessed_incoming_frames( exec_ctx, &s->data_parser, s, &s->unprocessed_incoming_frames_buffer, diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c index 7f37365558..c21d76ba71 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.c +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c @@ -1655,6 +1655,23 @@ static void force_client_rst_stream(grpc_exec_ctx *exec_ctx, void *sp, GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "final_rst"); } +static void parse_stream_compression_md(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s, + grpc_metadata_batch *initial_metadata) { + if (initial_metadata->idx.named.content_encoding != NULL) { + grpc_slice content_encoding = + GRPC_MDVALUE(initial_metadata->idx.named.content_encoding->md); + if (!grpc_slice_eq(content_encoding, GRPC_MDSTR_IDENTITY)) { + if (grpc_slice_eq(content_encoding, GRPC_MDSTR_GZIP)) { + s->stream_compression_recv_enabled = true; + s->decompressed_data_buffer = gpr_malloc(sizeof(grpc_slice_buffer)); + grpc_slice_buffer_init(s->decompressed_data_buffer); + } + } + } +} + grpc_error *grpc_chttp2_header_parser_parse(grpc_exec_ctx *exec_ctx, void *hpack_parser, grpc_chttp2_transport *t, @@ -1681,9 +1698,16 @@ grpc_error *grpc_chttp2_header_parser_parse(grpc_exec_ctx *exec_ctx, if (s != NULL) { if (parser->is_boundary) { if (s->header_frames_received == GPR_ARRAY_SIZE(s->metadata_buffer)) { + GPR_TIMER_END("grpc_chttp2_hpack_parser_parse", 0); return GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Too many trailer frames"); } + /* Process stream compression md element if it exists */ + if (s->header_frames_received == + 0) { /* Only acts on initial metadata */ + parse_stream_compression_md(exec_ctx, t, s, + &s->metadata_buffer[0].batch); + } s->published_metadata[s->header_frames_received] = GRPC_METADATA_PUBLISHED_FROM_WIRE; maybe_complete_funcs[s->header_frames_received](exec_ctx, t, s); diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index 80eb51ff0d..711938b278 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -154,17 +154,8 @@ static uint32_t target_write_size(grpc_chttp2_transport *t) { } // Returns true if initial_metadata contains only default headers. -// -// TODO(roth): The fact that we hard-code these particular headers here -// is fairly ugly. Need some better way to know which headers are -// default, maybe via a bit in the static metadata table? static bool is_default_initial_metadata(grpc_metadata_batch *initial_metadata) { - int num_default_fields = - (initial_metadata->idx.named.status != NULL) + - (initial_metadata->idx.named.content_type != NULL) + - (initial_metadata->idx.named.grpc_encoding != NULL) + - (initial_metadata->idx.named.grpc_accept_encoding != NULL); - return (size_t)num_default_fields == initial_metadata->list.count; + return initial_metadata->list.default_count == initial_metadata->list.count; } grpc_chttp2_begin_write_result grpc_chttp2_begin_write( diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index abb558982b..765c13c65e 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -187,9 +187,34 @@ struct stream_obj { /* Mutex to protect storage */ gpr_mu mu; + + /* Refcount object of the stream */ + grpc_stream_refcount *refcount; }; typedef struct stream_obj stream_obj; +#ifndef NDEBUG +#define GRPC_CRONET_STREAM_REF(stream, reason) \ + grpc_cronet_stream_ref((stream), (reason)) +#define GRPC_CRONET_STREAM_UNREF(exec_ctx, stream, reason) \ + grpc_cronet_stream_unref((exec_ctx), (stream), (reason)) +void grpc_cronet_stream_ref(stream_obj *s, const char *reason) { + grpc_stream_ref(s->refcount, reason); +} +void grpc_cronet_stream_unref(grpc_exec_ctx *exec_ctx, stream_obj *s, + const char *reason) { + grpc_stream_unref(exec_ctx, s->refcount, reason); +} +#else +#define GRPC_CRONET_STREAM_REF(stream, reason) grpc_cronet_stream_ref((stream)) +#define GRPC_CRONET_STREAM_UNREF(exec_ctx, stream, reason) \ + grpc_cronet_stream_unref((exec_ctx), (stream)) +void grpc_cronet_stream_ref(stream_obj *s) { grpc_stream_ref(s->refcount); } +void grpc_cronet_stream_unref(grpc_exec_ctx *exec_ctx, stream_obj *s) { + grpc_stream_unref(exec_ctx, s->refcount); +} +#endif + static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, struct op_and_state *oas); @@ -346,13 +371,12 @@ static void remove_from_storage(struct stream_obj *s, This can get executed from the Cronet network thread via cronet callback or on the application supplied thread via the perform_stream_op function. */ -static void execute_from_storage(stream_obj *s) { - grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; +static void execute_from_storage(grpc_exec_ctx *exec_ctx, stream_obj *s) { gpr_mu_lock(&s->mu); for (struct op_and_state *curr = s->storage.head; curr != NULL;) { CRONET_LOG(GPR_DEBUG, "calling op at %p. done = %d", curr, curr->done); GPR_ASSERT(curr->done == 0); - enum e_op_result result = execute_stream_op(&exec_ctx, curr); + enum e_op_result result = execute_stream_op(exec_ctx, curr); CRONET_LOG(GPR_DEBUG, "execute_stream_op[%p] returns %s", curr, op_result_string(result)); /* if this op is done, then remove it and free memory */ @@ -369,7 +393,6 @@ static void execute_from_storage(stream_obj *s) { } } gpr_mu_unlock(&s->mu); - grpc_exec_ctx_finish(&exec_ctx); } /* @@ -377,6 +400,8 @@ static void execute_from_storage(stream_obj *s) { */ static void on_failed(bidirectional_stream *stream, int net_error) { CRONET_LOG(GPR_DEBUG, "on_failed(%p, %d)", stream, net_error); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + stream_obj *s = (stream_obj *)stream->annotation; gpr_mu_lock(&s->mu); bidirectional_stream_destroy(s->cbs); @@ -392,7 +417,9 @@ static void on_failed(bidirectional_stream *stream, int net_error) { } null_and_maybe_free_read_buffer(s); gpr_mu_unlock(&s->mu); - execute_from_storage(s); + execute_from_storage(&exec_ctx, s); + GRPC_CRONET_STREAM_UNREF(&exec_ctx, s, "cronet transport"); + grpc_exec_ctx_finish(&exec_ctx); } /* @@ -400,6 +427,8 @@ static void on_failed(bidirectional_stream *stream, int net_error) { */ static void on_canceled(bidirectional_stream *stream) { CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + stream_obj *s = (stream_obj *)stream->annotation; gpr_mu_lock(&s->mu); bidirectional_stream_destroy(s->cbs); @@ -415,7 +444,9 @@ static void on_canceled(bidirectional_stream *stream) { } null_and_maybe_free_read_buffer(s); gpr_mu_unlock(&s->mu); - execute_from_storage(s); + execute_from_storage(&exec_ctx, s); + GRPC_CRONET_STREAM_UNREF(&exec_ctx, s, "cronet transport"); + grpc_exec_ctx_finish(&exec_ctx); } /* @@ -423,6 +454,8 @@ static void on_canceled(bidirectional_stream *stream) { */ static void on_succeeded(bidirectional_stream *stream) { CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + stream_obj *s = (stream_obj *)stream->annotation; gpr_mu_lock(&s->mu); bidirectional_stream_destroy(s->cbs); @@ -430,7 +463,9 @@ static void on_succeeded(bidirectional_stream *stream) { s->cbs = NULL; null_and_maybe_free_read_buffer(s); gpr_mu_unlock(&s->mu); - execute_from_storage(s); + execute_from_storage(&exec_ctx, s); + GRPC_CRONET_STREAM_UNREF(&exec_ctx, s, "cronet transport"); + grpc_exec_ctx_finish(&exec_ctx); } /* @@ -438,6 +473,7 @@ static void on_succeeded(bidirectional_stream *stream) { */ static void on_stream_ready(bidirectional_stream *stream) { CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream); + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; stream_obj *s = (stream_obj *)stream->annotation; grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct; gpr_mu_lock(&s->mu); @@ -457,7 +493,8 @@ static void on_stream_ready(bidirectional_stream *stream) { } } gpr_mu_unlock(&s->mu); - execute_from_storage(s); + execute_from_storage(&exec_ctx, s); + grpc_exec_ctx_finish(&exec_ctx); } /* @@ -513,14 +550,15 @@ static void on_response_headers_received( s->state.pending_read_from_cronet = true; } gpr_mu_unlock(&s->mu); + execute_from_storage(&exec_ctx, s); grpc_exec_ctx_finish(&exec_ctx); - execute_from_storage(s); } /* Cronet callback */ static void on_write_completed(bidirectional_stream *stream, const char *data) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; stream_obj *s = (stream_obj *)stream->annotation; CRONET_LOG(GPR_DEBUG, "W: on_write_completed(%p, %s)", stream, data); gpr_mu_lock(&s->mu); @@ -530,7 +568,8 @@ static void on_write_completed(bidirectional_stream *stream, const char *data) { } s->state.state_callback_received[OP_SEND_MESSAGE] = true; gpr_mu_unlock(&s->mu); - execute_from_storage(s); + execute_from_storage(&exec_ctx, s); + grpc_exec_ctx_finish(&exec_ctx); } /* @@ -538,6 +577,7 @@ static void on_write_completed(bidirectional_stream *stream, const char *data) { */ static void on_read_completed(bidirectional_stream *stream, char *data, int count) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; stream_obj *s = (stream_obj *)stream->annotation; CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data, count); @@ -563,14 +603,15 @@ static void on_read_completed(bidirectional_stream *stream, char *data, gpr_mu_unlock(&s->mu); } else { gpr_mu_unlock(&s->mu); - execute_from_storage(s); + execute_from_storage(&exec_ctx, s); } } else { null_and_maybe_free_read_buffer(s); s->state.rs.read_stream_closed = true; gpr_mu_unlock(&s->mu); - execute_from_storage(s); + execute_from_storage(&exec_ctx, s); } + grpc_exec_ctx_finish(&exec_ctx); } /* @@ -625,12 +666,11 @@ static void on_response_trailers_received( s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true; gpr_mu_unlock(&s->mu); - grpc_exec_ctx_finish(&exec_ctx); } else { gpr_mu_unlock(&s->mu); - grpc_exec_ctx_finish(&exec_ctx); - execute_from_storage(s); + execute_from_storage(&exec_ctx, s); } + grpc_exec_ctx_finish(&exec_ctx); } /* @@ -1313,6 +1353,9 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_stream_refcount *refcount, const void *server_data, gpr_arena *arena) { stream_obj *s = (stream_obj *)gs; + + s->refcount = refcount; + GRPC_CRONET_STREAM_REF(s, "cronet transport"); memset(&s->storage, 0, sizeof(s->storage)); s->storage.head = NULL; memset(&s->state, 0, sizeof(s->state)); @@ -1370,7 +1413,7 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, } stream_obj *s = (stream_obj *)gs; add_to_storage(s, op); - execute_from_storage(s); + execute_from_storage(exec_ctx, s); } static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, |