diff options
author | Sree Kuchibhotla <sreek@google.com> | 2017-03-21 10:28:40 -0700 |
---|---|---|
committer | Sree Kuchibhotla <sreek@google.com> | 2017-03-21 10:28:40 -0700 |
commit | 36c370793ba250fc423dcd0947fc8a07759c4d08 (patch) | |
tree | a82d809fb9ae41011e5f47c11df208d7d1db5736 /src/core/ext | |
parent | 89da88c7b44cb0f1765f6216faf2d5ec3d16f403 (diff) | |
parent | e50c7bdcc83638544eed01f20c19b89648d78fe9 (diff) |
Merge branch 'master' into cq_create_api_changes
Diffstat (limited to 'src/core/ext')
-rw-r--r-- | src/core/ext/census/grpc_filter.c | 4 | ||||
-rw-r--r-- | src/core/ext/client_channel/client_channel.c | 173 | ||||
-rw-r--r-- | src/core/ext/client_channel/parse_address.c | 5 | ||||
-rw-r--r-- | src/core/ext/client_channel/proxy_mapper_registry.c | 12 | ||||
-rw-r--r-- | src/core/ext/client_channel/subchannel.c | 49 | ||||
-rw-r--r-- | src/core/ext/client_channel/subchannel.h | 18 | ||||
-rw-r--r-- | src/core/ext/load_reporting/load_reporting_filter.c | 2 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/chttp2_transport.c | 41 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/frame_ping.c | 2 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/hpack_parser.c | 17 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/incoming_metadata.c | 65 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/incoming_metadata.h | 18 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/internal.h | 2 | ||||
-rw-r--r-- | src/core/ext/transport/chttp2/transport/parsing.c | 60 | ||||
-rw-r--r-- | src/core/ext/transport/cronet/transport/cronet_transport.c | 176 |
15 files changed, 356 insertions, 288 deletions
diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c index b80d831557..fc29dbd454 100644 --- a/src/core/ext/census/grpc_filter.c +++ b/src/core/ext/census/grpc_filter.c @@ -138,7 +138,7 @@ static grpc_error *client_init_call_elem(grpc_exec_ctx *exec_ctx, static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) { + grpc_closure *ignored) { call_data *d = elem->call_data; GPR_ASSERT(d != NULL); /* TODO(hongyu): record rpc client stats and census_rpc_end_op here */ @@ -160,7 +160,7 @@ static grpc_error *server_init_call_elem(grpc_exec_ctx *exec_ctx, static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) { + grpc_closure *ignored) { call_data *d = elem->call_data; GPR_ASSERT(d != NULL); /* TODO(hongyu): record rpc server stats and census_tracing_end_op here */ diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index bf64f84772..960d00e815 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -71,7 +71,8 @@ */ typedef enum { - WAIT_FOR_READY_UNSET, + /* zero so it can be default initialized */ + WAIT_FOR_READY_UNSET = 0, WAIT_FOR_READY_FALSE, WAIT_FOR_READY_TRUE } wait_for_ready_value; @@ -631,7 +632,8 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, #define CANCELLED_CALL ((grpc_subchannel_call *)1) typedef enum { - GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING, + /* zero so that it can be default-initialized */ + GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING = 0, GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL } subchannel_creation_phase; @@ -653,13 +655,13 @@ typedef struct client_channel_call_data { gpr_timespec call_start_time; gpr_timespec deadline; method_parameters *method_params; - grpc_closure read_service_config; grpc_error *cancel_error; /** either 0 for no call, 1 for cancelled, or a pointer to a grpc_subchannel_call */ gpr_atm subchannel_call; + gpr_arena *arena; subchannel_creation_phase creation_phase; grpc_connected_subchannel *connected_subchannel; @@ -726,6 +728,47 @@ static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) { gpr_free(ops); } +// Sets calld->method_params. +// If the method params specify a timeout, populates +// *per_method_deadline and returns true. +static bool set_call_method_params_from_service_config_locked( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + gpr_timespec *per_method_deadline) { + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + if (chand->method_params_table != NULL) { + calld->method_params = grpc_method_config_table_get( + exec_ctx, chand->method_params_table, calld->path); + if (calld->method_params != NULL) { + method_parameters_ref(calld->method_params); + if (gpr_time_cmp(calld->method_params->timeout, + gpr_time_0(GPR_TIMESPAN)) != 0) { + *per_method_deadline = + gpr_time_add(calld->call_start_time, calld->method_params->timeout); + return true; + } + } + } + return false; +} + +static void apply_final_configuration_locked(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { + /* apply service-config level configuration to the call (now that we're + * certain it exists) */ + call_data *calld = elem->call_data; + gpr_timespec per_method_deadline; + if (set_call_method_params_from_service_config_locked(exec_ctx, elem, + &per_method_deadline)) { + // If the deadline from the service config is shorter than the one + // from the client API, reset the deadline timer. + if (gpr_time_cmp(per_method_deadline, calld->deadline) < 0) { + calld->deadline = per_method_deadline; + grpc_deadline_state_reset(exec_ctx, elem, calld->deadline); + } + } +} + static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_call_element *elem = arg; @@ -754,9 +797,14 @@ static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, } else { /* Create call on subchannel. */ grpc_subchannel_call *subchannel_call = NULL; + const grpc_connected_subchannel_call_args call_args = { + .pollent = calld->pollent, + .path = calld->path, + .start_time = calld->call_start_time, + .deadline = calld->deadline, + .arena = calld->arena}; grpc_error *new_error = grpc_connected_subchannel_create_call( - exec_ctx, calld->connected_subchannel, calld->pollent, calld->path, - calld->call_start_time, calld->deadline, &subchannel_call); + exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); if (new_error != GRPC_ERROR_NONE) { new_error = grpc_error_add_child(new_error, error); subchannel_call = CANCELLED_CALL; @@ -851,6 +899,7 @@ static bool pick_subchannel_locked( } GPR_ASSERT(error == GRPC_ERROR_NONE); if (chand->lb_policy != NULL) { + apply_final_configuration_locked(exec_ctx, elem); grpc_lb_policy *lb_policy = chand->lb_policy; GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel"); // If the application explicitly set wait_for_ready, use that. @@ -982,9 +1031,14 @@ static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING && calld->connected_subchannel != NULL) { grpc_subchannel_call *subchannel_call = NULL; + const grpc_connected_subchannel_call_args call_args = { + .pollent = calld->pollent, + .path = calld->path, + .start_time = calld->call_start_time, + .deadline = calld->deadline, + .arena = calld->arena}; grpc_error *error = grpc_connected_subchannel_create_call( - exec_ctx, calld->connected_subchannel, calld->pollent, calld->path, - calld->call_start_time, calld->deadline, &subchannel_call); + exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); if (error != GRPC_ERROR_NONE) { subchannel_call = CANCELLED_CALL; fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); @@ -1060,114 +1114,19 @@ static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, GPR_TIMER_END("cc_start_transport_stream_op", 0); } -// Sets calld->method_params. -// If the method params specify a timeout, populates -// *per_method_deadline and returns true. -static bool set_call_method_params_from_service_config_locked( - grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - gpr_timespec *per_method_deadline) { - channel_data *chand = elem->channel_data; - call_data *calld = elem->call_data; - if (chand->method_params_table != NULL) { - calld->method_params = grpc_method_config_table_get( - exec_ctx, chand->method_params_table, calld->path); - if (calld->method_params != NULL) { - method_parameters_ref(calld->method_params); - if (gpr_time_cmp(calld->method_params->timeout, - gpr_time_0(GPR_TIMESPAN)) != 0) { - *per_method_deadline = - gpr_time_add(calld->call_start_time, calld->method_params->timeout); - return true; - } - } - } - return false; -} - -// Gets data from the service config. Invoked when the resolver returns -// its initial result. -static void read_service_config_locked(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_call_element *elem = arg; - call_data *calld = elem->call_data; - // If this is an error, there's no point in looking at the service config. - if (error == GRPC_ERROR_NONE) { - gpr_timespec per_method_deadline; - if (set_call_method_params_from_service_config_locked( - exec_ctx, elem, &per_method_deadline)) { - // If the deadline from the service config is shorter than the one - // from the client API, reset the deadline timer. - if (gpr_time_cmp(per_method_deadline, calld->deadline) < 0) { - calld->deadline = per_method_deadline; - grpc_deadline_state_reset(exec_ctx, elem, calld->deadline); - } - } - } - GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "read_service_config"); -} - -static void initial_read_service_config_locked(grpc_exec_ctx *exec_ctx, - void *arg, - grpc_error *error_ignored) { - grpc_call_element *elem = arg; - channel_data *chand = elem->channel_data; - call_data *calld = elem->call_data; - // If the resolver has already returned results, then we can access - // the service config parameters immediately. Otherwise, we need to - // defer that work until the resolver returns an initial result. - if (chand->lb_policy != NULL) { - // We already have a resolver result, so check for service config. - gpr_timespec per_method_deadline; - if (set_call_method_params_from_service_config_locked( - exec_ctx, elem, &per_method_deadline)) { - calld->deadline = gpr_time_min(calld->deadline, per_method_deadline); - } - } else { - // We don't yet have a resolver result, so register a callback to - // get the service config data once the resolver returns. - // Take a reference to the call stack to be owned by the callback. - GRPC_CALL_STACK_REF(calld->owning_call, "read_service_config"); - grpc_closure_init(&calld->read_service_config, read_service_config_locked, - elem, grpc_combiner_scheduler(chand->combiner, false)); - grpc_closure_list_append(&chand->waiting_for_config_closures, - &calld->read_service_config, GRPC_ERROR_NONE); - } - // Start the deadline timer with the current deadline value. If we - // do not yet have service config data, then the timer may be reset - // later. - grpc_deadline_state_start(exec_ctx, elem, calld->deadline); - GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, - "initial_read_service_config"); -} - /* Constructor for call_data */ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_element_args *args) { - channel_data *chand = elem->channel_data; call_data *calld = elem->call_data; // Initialize data members. grpc_deadline_state_init(exec_ctx, elem, args->call_stack); calld->path = grpc_slice_ref_internal(args->path); calld->call_start_time = args->start_time; calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC); - calld->method_params = NULL; - calld->cancel_error = GRPC_ERROR_NONE; - gpr_atm_rel_store(&calld->subchannel_call, 0); - calld->connected_subchannel = NULL; - calld->waiting_ops = NULL; - calld->waiting_ops_count = 0; - calld->waiting_ops_capacity = 0; - calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; calld->owning_call = args->call_stack; - calld->pollent = NULL; - GRPC_CALL_STACK_REF(calld->owning_call, "initial_read_service_config"); - grpc_closure_sched( - exec_ctx, - grpc_closure_init(&calld->read_service_config, - initial_read_service_config_locked, elem, - grpc_combiner_scheduler(chand->combiner, false)), - GRPC_ERROR_NONE); + calld->arena = args->arena; + grpc_deadline_state_start(exec_ctx, elem, calld->deadline); return GRPC_ERROR_NONE; } @@ -1175,7 +1134,7 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *and_free_memory) { + grpc_closure *then_schedule_closure) { call_data *calld = elem->call_data; grpc_deadline_state_destroy(exec_ctx, elem); grpc_slice_unref_internal(exec_ctx, calld->path); @@ -1185,6 +1144,8 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, GRPC_ERROR_UNREF(calld->cancel_error); grpc_subchannel_call *call = GET_CALL(calld); if (call != NULL && call != CANCELLED_CALL) { + grpc_subchannel_call_set_cleanup_closure(call, then_schedule_closure); + then_schedule_closure = NULL; GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call"); } GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING); @@ -1194,7 +1155,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, "picked"); } gpr_free(calld->waiting_ops); - gpr_free(and_free_memory); + grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); } static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, diff --git a/src/core/ext/client_channel/parse_address.c b/src/core/ext/client_channel/parse_address.c index 8ae15fc72b..cd1b2cd80c 100644 --- a/src/core/ext/client_channel/parse_address.c +++ b/src/core/ext/client_channel/parse_address.c @@ -128,6 +128,7 @@ int parse_ipv6(grpc_uri *uri, grpc_resolved_address *resolved_addr) { GPR_ASSERT(host_end >= host); char host_without_scope[INET6_ADDRSTRLEN]; size_t host_without_scope_len = (size_t)(host_end - host); + uint32_t sin6_scope_id = 0; strncpy(host_without_scope, host, host_without_scope_len); host_without_scope[host_without_scope_len] = '\0'; if (inet_pton(AF_INET6, host_without_scope, &in6->sin6_addr) == 0) { @@ -136,10 +137,12 @@ int parse_ipv6(grpc_uri *uri, grpc_resolved_address *resolved_addr) { } if (gpr_parse_bytes_to_uint32(host_end + 1, strlen(host) - host_without_scope_len - 1, - &in6->sin6_scope_id) == 0) { + &sin6_scope_id) == 0) { gpr_log(GPR_ERROR, "invalid ipv6 scope id: '%s'", host_end + 1); goto done; } + // Handle "sin6_scope_id" being type "u_long". See grpc issue ##10027. + in6->sin6_scope_id = sin6_scope_id; } else { if (inet_pton(AF_INET6, host, &in6->sin6_addr) == 0) { gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host); diff --git a/src/core/ext/client_channel/proxy_mapper_registry.c b/src/core/ext/client_channel/proxy_mapper_registry.c index 2c44b9d490..0935ddbdbd 100644 --- a/src/core/ext/client_channel/proxy_mapper_registry.c +++ b/src/core/ext/client_channel/proxy_mapper_registry.c @@ -94,6 +94,14 @@ static void grpc_proxy_mapper_list_destroy(grpc_proxy_mapper_list* list) { grpc_proxy_mapper_destroy(list->list[i]); } gpr_free(list->list); + // Clean up in case we re-initialze later. + // TODO(ctiller): This should ideally live in + // grpc_proxy_mapper_registry_init(). However, if we did this there, + // then we would do it AFTER we start registering proxy mappers from + // third-party plugins, so they'd never show up (and would leak memory). + // We probably need some sort of dependency system for plugins to fix + // this. + memset(list, 0, sizeof(*list)); } // @@ -102,9 +110,7 @@ static void grpc_proxy_mapper_list_destroy(grpc_proxy_mapper_list* list) { static grpc_proxy_mapper_list g_proxy_mapper_list; -void grpc_proxy_mapper_registry_init() { - memset(&g_proxy_mapper_list, 0, sizeof(g_proxy_mapper_list)); -} +void grpc_proxy_mapper_registry_init() {} void grpc_proxy_mapper_registry_shutdown() { grpc_proxy_mapper_list_destroy(&g_proxy_mapper_list); diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index 5df0a9060d..ed5029ea9a 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -148,6 +148,7 @@ struct grpc_subchannel { struct grpc_subchannel_call { grpc_connected_subchannel *connection; + grpc_closure *schedule_closure_after_destroy; }; #define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1)) @@ -340,17 +341,15 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, GPR_ASSERT(new_address != NULL); gpr_free(addr); addr = new_address; - if (new_args != NULL) c->args = new_args; - } - if (c->args == NULL) { - static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS}; - grpc_arg new_arg = grpc_create_subchannel_address_arg(addr); - c->args = grpc_channel_args_copy_and_add_and_remove( - args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &new_arg, - 1); - gpr_free(new_arg.value.string); } + static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS}; + grpc_arg new_arg = grpc_create_subchannel_address_arg(addr); gpr_free(addr); + c->args = grpc_channel_args_copy_and_add_and_remove( + new_args != NULL ? new_args : args->args, keys_to_remove, + GPR_ARRAY_SIZE(keys_to_remove), &new_arg, 1); + gpr_free(new_arg.value.string); + if (new_args != NULL) grpc_channel_args_destroy(exec_ctx, new_args); c->root_external_state_watcher.next = c->root_external_state_watcher.prev = &c->root_external_state_watcher; grpc_closure_init(&c->connected, subchannel_connected, c, @@ -719,13 +718,22 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call, grpc_error *error) { grpc_subchannel_call *c = call; + GPR_ASSERT(c->schedule_closure_after_destroy != NULL); GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0); grpc_connected_subchannel *connection = c->connection; - grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), NULL, c); + grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), NULL, + c->schedule_closure_after_destroy); GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, connection, "subchannel_call"); GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0); } +void grpc_subchannel_call_set_cleanup_closure(grpc_subchannel_call *call, + grpc_closure *closure) { + GPR_ASSERT(call->schedule_closure_after_destroy == NULL); + GPR_ASSERT(closure != NULL); + call->schedule_closure_after_destroy = closure; +} + void grpc_subchannel_call_ref( grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON); @@ -761,15 +769,22 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( grpc_error *grpc_connected_subchannel_create_call( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, - grpc_polling_entity *pollent, grpc_slice path, gpr_timespec start_time, - gpr_timespec deadline, grpc_subchannel_call **call) { + const grpc_connected_subchannel_call_args *args, + grpc_subchannel_call **call) { grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); - *call = gpr_zalloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size); + *call = gpr_arena_alloc( + args->arena, sizeof(grpc_subchannel_call) + chanstk->call_stack_size); grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); (*call)->connection = con; // Ref is added below. - grpc_error *error = - grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, *call, - NULL, NULL, path, start_time, deadline, callstk); + const grpc_call_element_args call_args = {.call_stack = callstk, + .server_transport_data = NULL, + .context = NULL, + .path = args->path, + .start_time = args->start_time, + .deadline = args->deadline, + .arena = args->arena}; + grpc_error *error = grpc_call_stack_init( + exec_ctx, chanstk, 1, subchannel_call_destroy, *call, &call_args); if (error != GRPC_ERROR_NONE) { const char *error_string = grpc_error_string(error); gpr_log(GPR_ERROR, "error: %s", error_string); @@ -778,7 +793,7 @@ grpc_error *grpc_connected_subchannel_create_call( return error; } GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call"); - grpc_call_stack_set_pollset_or_pollset_set(exec_ctx, callstk, pollent); + grpc_call_stack_set_pollset_or_pollset_set(exec_ctx, callstk, args->pollent); return GRPC_ERROR_NONE; } diff --git a/src/core/ext/client_channel/subchannel.h b/src/core/ext/client_channel/subchannel.h index 6a70a76467..3e64a2507c 100644 --- a/src/core/ext/client_channel/subchannel.h +++ b/src/core/ext/client_channel/subchannel.h @@ -37,6 +37,7 @@ #include "src/core/ext/client_channel/connector.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/iomgr/polling_entity.h" +#include "src/core/lib/support/arena.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/metadata.h" @@ -112,10 +113,18 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, GRPC_SUBCHANNEL_REF_EXTRA_ARGS); /** construct a subchannel call */ +typedef struct { + grpc_polling_entity *pollent; + grpc_slice path; + gpr_timespec start_time; + gpr_timespec deadline; + gpr_arena *arena; +} grpc_connected_subchannel_call_args; + grpc_error *grpc_connected_subchannel_create_call( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel, - grpc_polling_entity *pollent, grpc_slice path, gpr_timespec start_time, - gpr_timespec deadline, grpc_subchannel_call **subchannel_call); + const grpc_connected_subchannel_call_args *args, + grpc_subchannel_call **subchannel_call); /** process a transport level op */ void grpc_connected_subchannel_process_transport_op( @@ -154,6 +163,11 @@ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx, char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *subchannel_call); +/** Must be called once per call. Sets the 'then_schedule_closure' argument for + call stack destruction. */ +void grpc_subchannel_call_set_cleanup_closure( + grpc_subchannel_call *subchannel_call, grpc_closure *closure); + grpc_call_stack *grpc_subchannel_call_get_call_stack( grpc_subchannel_call *subchannel_call); diff --git a/src/core/ext/load_reporting/load_reporting_filter.c b/src/core/ext/load_reporting/load_reporting_filter.c index c2750634a5..4ed832671d 100644 --- a/src/core/ext/load_reporting/load_reporting_filter.c +++ b/src/core/ext/load_reporting/load_reporting_filter.c @@ -123,7 +123,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, /* Destructor for call_data */ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) { + grpc_closure *ignored) { call_data *calld = elem->call_data; /* TODO(dgq): do something with the data diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index da4c7dc7b2..082078c72f 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -511,6 +511,10 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error) { if (!t->closed) { + if (!grpc_error_has_clear_grpc_status(error)) { + error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_UNAVAILABLE); + } if (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE) { if (t->close_transport_on_writes_finished == NULL) { t->close_transport_on_writes_finished = @@ -520,10 +524,6 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, grpc_error_add_child(t->close_transport_on_writes_finished, error); return; } - if (!grpc_error_has_clear_grpc_status(error)) { - error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_UNAVAILABLE); - } t->closed = 1; connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "close_transport"); @@ -575,7 +575,7 @@ void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s) { static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_stream_refcount *refcount, - const void *server_data) { + const void *server_data, gpr_arena *arena) { GPR_TIMER_BEGIN("init_stream", 0); grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; @@ -588,8 +588,8 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, gpr_ref_init(&s->active_streams, 1); GRPC_CHTTP2_STREAM_REF(s, "chttp2"); - grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0]); - grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1]); + grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena); + grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1], arena); grpc_chttp2_data_parser_init(&s->data_parser); grpc_slice_buffer_init(&s->flow_controlled_buffer); s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); @@ -665,16 +665,17 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, GPR_TIMER_END("destroy_stream", 0); - gpr_free(s->destroy_stream_arg); + grpc_closure_sched(exec_ctx, s->destroy_stream_arg, GRPC_ERROR_NONE); } static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, void *and_free_memory) { + grpc_stream *gs, + grpc_closure *then_schedule_closure) { GPR_TIMER_BEGIN("destroy_stream", 0); grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; - s->destroy_stream_arg = and_free_memory; + s->destroy_stream_arg = then_schedule_closure; grpc_closure_sched( exec_ctx, grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s, grpc_combiner_scheduler(t->combiner, false)), @@ -1629,15 +1630,19 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, s->recv_trailing_metadata_finished != NULL) { char status_string[GPR_LTOA_MIN_BUFSIZE]; gpr_ltoa(status, status_string); - grpc_chttp2_incoming_metadata_buffer_replace_or_add( - exec_ctx, &s->metadata_buffer[1], - grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_STATUS, - grpc_slice_from_copied_string(status_string))); + GRPC_LOG_IF_ERROR("add_status", + grpc_chttp2_incoming_metadata_buffer_replace_or_add( + exec_ctx, &s->metadata_buffer[1], + grpc_mdelem_from_slices( + exec_ctx, GRPC_MDSTR_GRPC_STATUS, + grpc_slice_from_copied_string(status_string)))); if (msg != NULL) { - grpc_chttp2_incoming_metadata_buffer_replace_or_add( - exec_ctx, &s->metadata_buffer[1], - grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_MESSAGE, - grpc_slice_from_copied_string(msg))); + GRPC_LOG_IF_ERROR( + "add_status_message", + grpc_chttp2_incoming_metadata_buffer_replace_or_add( + exec_ctx, &s->metadata_buffer[1], + grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_MESSAGE, + grpc_slice_from_copied_string(msg)))); } s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE; grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.c b/src/core/ext/transport/chttp2/transport/frame_ping.c index f487533c41..9b4b1a7b84 100644 --- a/src/core/ext/transport/chttp2/transport/frame_ping.c +++ b/src/core/ext/transport/chttp2/transport/frame_ping.c @@ -91,7 +91,7 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, grpc_chttp2_ping_parser *p = parser; while (p->byte != 8 && cur != end) { - p->opaque_8bytes |= (((uint64_t)*cur) << (8 * p->byte)); + p->opaque_8bytes |= (((uint64_t)*cur) << (56 - 8 * p->byte)); cur++; p->byte++; } diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c index 40f5120308..1865b997b7 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.c +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c @@ -1620,13 +1620,18 @@ void grpc_chttp2_hpack_parser_destroy(grpc_exec_ctx *exec_ctx, grpc_error *grpc_chttp2_hpack_parser_parse(grpc_exec_ctx *exec_ctx, grpc_chttp2_hpack_parser *p, grpc_slice slice) { - /* TODO(ctiller): limit the distance of end from beg, and perform multiple - steps in the event of a large chunk of data to limit - stack space usage when no tail call optimization is - available */ +/* max number of bytes to parse at a time... limits call stack depth on + * compilers without TCO */ +#define MAX_PARSE_LENGTH 1024 p->current_slice_refcount = slice.refcount; - grpc_error *error = p->state(exec_ctx, p, GRPC_SLICE_START_PTR(slice), - GRPC_SLICE_END_PTR(slice)); + uint8_t *start = GRPC_SLICE_START_PTR(slice); + uint8_t *end = GRPC_SLICE_END_PTR(slice); + grpc_error *error = GRPC_ERROR_NONE; + while (start != end && error == GRPC_ERROR_NONE) { + uint8_t *target = start + GPR_MIN(MAX_PARSE_LENGTH, end - start); + error = p->state(exec_ctx, p, start, target); + start = target; + } p->current_slice_refcount = NULL; return error; } diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.c b/src/core/ext/transport/chttp2/transport/incoming_metadata.c index c91b019aa0..da0a34d32a 100644 --- a/src/core/ext/transport/chttp2/transport/incoming_metadata.c +++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.c @@ -41,69 +41,48 @@ #include <grpc/support/log.h> void grpc_chttp2_incoming_metadata_buffer_init( - grpc_chttp2_incoming_metadata_buffer *buffer) { - buffer->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + grpc_chttp2_incoming_metadata_buffer *buffer, gpr_arena *arena) { + buffer->arena = arena; + grpc_metadata_batch_init(&buffer->batch); + buffer->batch.deadline = gpr_inf_future(GPR_CLOCK_REALTIME); } void grpc_chttp2_incoming_metadata_buffer_destroy( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer) { - size_t i; - if (!buffer->published) { - for (i = 0; i < buffer->count; i++) { - GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md); - } - } - gpr_free(buffer->elems); + grpc_metadata_batch_destroy(exec_ctx, &buffer->batch); } -void grpc_chttp2_incoming_metadata_buffer_add( - grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem elem) { - GPR_ASSERT(!buffer->published); - if (buffer->capacity == buffer->count) { - buffer->capacity = GPR_MAX(8, 2 * buffer->capacity); - buffer->elems = - gpr_realloc(buffer->elems, sizeof(*buffer->elems) * buffer->capacity); - } - buffer->elems[buffer->count++].md = elem; +grpc_error *grpc_chttp2_incoming_metadata_buffer_add( + grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, + grpc_mdelem elem) { buffer->size += GRPC_MDELEM_LENGTH(elem); + return grpc_metadata_batch_add_tail( + exec_ctx, &buffer->batch, + gpr_arena_alloc(buffer->arena, sizeof(grpc_linked_mdelem)), elem); } -void grpc_chttp2_incoming_metadata_buffer_replace_or_add( +grpc_error *grpc_chttp2_incoming_metadata_buffer_replace_or_add( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem elem) { - for (size_t i = 0; i < buffer->count; i++) { - if (grpc_slice_eq(GRPC_MDKEY(buffer->elems[i].md), GRPC_MDKEY(elem))) { - GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md); - buffer->elems[i].md = elem; - return; + for (grpc_linked_mdelem *l = buffer->batch.list.head; l != NULL; + l = l->next) { + if (grpc_slice_eq(GRPC_MDKEY(l->md), GRPC_MDKEY(elem))) { + GRPC_MDELEM_UNREF(exec_ctx, l->md); + l->md = elem; + return GRPC_ERROR_NONE; } } - grpc_chttp2_incoming_metadata_buffer_add(buffer, elem); + return grpc_chttp2_incoming_metadata_buffer_add(exec_ctx, buffer, elem); } void grpc_chttp2_incoming_metadata_buffer_set_deadline( grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline) { - GPR_ASSERT(!buffer->published); - buffer->deadline = deadline; + buffer->batch.deadline = deadline; } void grpc_chttp2_incoming_metadata_buffer_publish( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, grpc_metadata_batch *batch) { - GPR_ASSERT(!buffer->published); - buffer->published = 1; - if (buffer->count > 0) { - size_t i; - for (i = 0; i < buffer->count; i++) { - /* TODO(ctiller): do something better here */ - if (!GRPC_LOG_IF_ERROR("grpc_chttp2_incoming_metadata_buffer_publish", - grpc_metadata_batch_link_tail( - exec_ctx, batch, &buffer->elems[i]))) { - GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md); - } - } - } else { - batch->list.head = batch->list.tail = NULL; - } - batch->deadline = buffer->deadline; + *batch = buffer->batch; + grpc_metadata_batch_init(&buffer->batch); } diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.h b/src/core/ext/transport/chttp2/transport/incoming_metadata.h index 1eac6fc150..288c917e65 100644 --- a/src/core/ext/transport/chttp2/transport/incoming_metadata.h +++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.h @@ -37,28 +37,26 @@ #include "src/core/lib/transport/transport.h" typedef struct { - grpc_linked_mdelem *elems; - size_t count; - size_t capacity; - gpr_timespec deadline; - int published; + gpr_arena *arena; + grpc_metadata_batch batch; size_t size; // total size of metadata } grpc_chttp2_incoming_metadata_buffer; /** assumes everything initially zeroed */ void grpc_chttp2_incoming_metadata_buffer_init( - grpc_chttp2_incoming_metadata_buffer *buffer); + grpc_chttp2_incoming_metadata_buffer *buffer, gpr_arena *arena); void grpc_chttp2_incoming_metadata_buffer_destroy( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer); void grpc_chttp2_incoming_metadata_buffer_publish( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, grpc_metadata_batch *batch); -void grpc_chttp2_incoming_metadata_buffer_add( - grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem elem); -void grpc_chttp2_incoming_metadata_buffer_replace_or_add( +grpc_error *grpc_chttp2_incoming_metadata_buffer_add( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, - grpc_mdelem elem); + grpc_mdelem elem) GRPC_MUST_USE_RESULT; +grpc_error *grpc_chttp2_incoming_metadata_buffer_replace_or_add( + grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, + grpc_mdelem elem) GRPC_MUST_USE_RESULT; void grpc_chttp2_incoming_metadata_buffer_set_deadline( grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline); diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index d26812ad6b..3c56c21599 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -425,7 +425,7 @@ struct grpc_chttp2_stream { grpc_stream_refcount *refcount; grpc_closure destroy_stream; - void *destroy_stream_arg; + grpc_closure *destroy_stream_arg; grpc_chttp2_stream_link links[STREAM_LIST_COUNT]; uint8_t included[STREAM_LIST_COUNT]; diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index 7ed00522c3..7efc8c63c9 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -381,16 +381,38 @@ static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx, s->incoming_window_delta + t->settings[GRPC_ACKED_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]) { - char *msg; - gpr_asprintf(&msg, - "frame of size %d overflows incoming window of %" PRId64, - t->incoming_frame_size, - s->incoming_window_delta + - t->settings[GRPC_ACKED_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); - grpc_error *err = GRPC_ERROR_CREATE(msg); - gpr_free(msg); - return err; + if (incoming_frame_size <= + s->incoming_window_delta + + t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]) { + gpr_log( + GPR_ERROR, + "Incoming frame of size %d exceeds incoming window size of %" PRId64 + ".\n" + "The (un-acked, future) window size would be %" PRId64 + " which is not exceeded.\n" + "This would usually cause a disconnection, but allowing it due to " + "broken HTTP2 implementations in the wild.\n" + "See (for example) https://github.com/netty/netty/issues/6520.", + t->incoming_frame_size, + s->incoming_window_delta + + t->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], + s->incoming_window_delta + + t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); + } else { + char *msg; + gpr_asprintf(&msg, + "frame of size %d overflows incoming window of %" PRId64, + t->incoming_frame_size, + s->incoming_window_delta + + t->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); + grpc_error *err = GRPC_ERROR_CREATE(msg); + gpr_free(msg); + return err; + } } GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA("parse", t, s, @@ -526,7 +548,14 @@ static void on_initial_header(grpc_exec_ctx *exec_ctx, void *tp, s->seen_error = true; GRPC_MDELEM_UNREF(exec_ctx, md); } else { - grpc_chttp2_incoming_metadata_buffer_add(&s->metadata_buffer[0], md); + grpc_error *error = grpc_chttp2_incoming_metadata_buffer_add( + exec_ctx, &s->metadata_buffer[0], md); + if (error != GRPC_ERROR_NONE) { + grpc_chttp2_cancel_stream(exec_ctx, t, s, error); + grpc_chttp2_parsing_become_skip_parser(exec_ctx, t); + s->seen_error = true; + GRPC_MDELEM_UNREF(exec_ctx, md); + } } } @@ -576,7 +605,14 @@ static void on_trailing_header(grpc_exec_ctx *exec_ctx, void *tp, s->seen_error = true; GRPC_MDELEM_UNREF(exec_ctx, md); } else { - grpc_chttp2_incoming_metadata_buffer_add(&s->metadata_buffer[1], md); + grpc_error *error = grpc_chttp2_incoming_metadata_buffer_add( + exec_ctx, &s->metadata_buffer[1], md); + if (error != GRPC_ERROR_NONE) { + grpc_chttp2_cancel_stream(exec_ctx, t, s, error); + grpc_chttp2_parsing_become_skip_parser(exec_ctx, t); + s->seen_error = true; + GRPC_MDELEM_UNREF(exec_ctx, md); + } } GPR_TIMER_END("on_trailing_header", 0); diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 01a03533da..450d9ab23a 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -54,6 +54,7 @@ #include "third_party/objective_c/Cronet/bidirectional_stream_c.h" #define GRPC_HEADER_SIZE_IN_BYTES 5 +#define GRPC_FLUSH_READ_SIZE 4096 #define CRONET_LOG(...) \ do { \ @@ -151,11 +152,17 @@ struct write_state { struct op_state { bool state_op_done[OP_NUM_OPS]; bool state_callback_received[OP_NUM_OPS]; + /* A non-zero gRPC status code has been seen */ bool fail_state; + /* Transport is discarding all buffered messages */ bool flush_read; bool flush_cronet_when_ready; bool pending_write_for_trailer; - bool unprocessed_send_message; + bool pending_send_message; + /* User requested RECV_TRAILING_METADATA */ + bool pending_recv_trailing_metadata; + /* Cronet has not issued a callback of a bidirectional read */ + bool pending_read_from_cronet; grpc_error *cancel_error; /* data structure for storing data coming from server */ struct read_state rs; @@ -177,6 +184,7 @@ struct op_storage { }; struct stream_obj { + gpr_arena *arena; struct op_and_state *oas; grpc_transport_stream_op *curr_op; grpc_cronet_transport *curr_ct; @@ -248,11 +256,35 @@ static const char *op_id_string(enum e_op_id i) { return "UNKNOWN"; } -static void free_read_buffer(stream_obj *s) { +static void null_and_maybe_free_read_buffer(stream_obj *s) { if (s->state.rs.read_buffer && s->state.rs.read_buffer != s->state.rs.grpc_header_bytes) { gpr_free(s->state.rs.read_buffer); - s->state.rs.read_buffer = NULL; + } + s->state.rs.read_buffer = NULL; +} + +static void maybe_flush_read(stream_obj *s) { + /* To enter flush read state (discarding all the buffered messages in + * transport layer), two conditions must be satisfied: 1) non-zero grpc status + * has been received, and 2) an op requesting the status code + * (RECV_TRAILING_METADATA) is issued by the user. (See + * doc/status_ordering.md) */ + /* Whenever the evaluation of any of the two condition is changed, we check + * whether we should enter the flush read state. */ + if (s->state.pending_recv_trailing_metadata && s->state.fail_state) { + if (!s->state.flush_read && !s->state.rs.read_stream_closed) { + CRONET_LOG(GPR_DEBUG, "%p: Flush read", s); + s->state.flush_read = true; + null_and_maybe_free_read_buffer(s); + s->state.rs.read_buffer = gpr_malloc(GRPC_FLUSH_READ_SIZE); + if (!s->state.pending_read_from_cronet) { + CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); + bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, + GRPC_FLUSH_READ_SIZE); + s->state.pending_read_from_cronet = true; + } + } } } @@ -279,7 +311,11 @@ static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op *op) { storage->head = new_op; storage->num_pending_ops++; if (op->send_message) { - s->state.unprocessed_send_message = true; + s->state.pending_send_message = true; + } + if (op->recv_trailing_metadata) { + s->state.pending_recv_trailing_metadata = true; + maybe_flush_read(s); } CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op, storage->num_pending_ops); @@ -367,7 +403,7 @@ static void on_failed(bidirectional_stream *stream, int net_error) { gpr_free(s->state.ws.write_buffer); s->state.ws.write_buffer = NULL; } - free_read_buffer(s); + null_and_maybe_free_read_buffer(s); gpr_mu_unlock(&s->mu); execute_from_storage(s); } @@ -390,7 +426,7 @@ static void on_canceled(bidirectional_stream *stream) { gpr_free(s->state.ws.write_buffer); s->state.ws.write_buffer = NULL; } - free_read_buffer(s); + null_and_maybe_free_read_buffer(s); gpr_mu_unlock(&s->mu); execute_from_storage(s); } @@ -405,7 +441,7 @@ static void on_succeeded(bidirectional_stream *stream) { bidirectional_stream_destroy(s->cbs); s->state.state_callback_received[OP_SUCCEEDED] = true; s->cbs = NULL; - free_read_buffer(s); + null_and_maybe_free_read_buffer(s); gpr_mu_unlock(&s->mu); execute_from_storage(s); } @@ -451,15 +487,18 @@ static void on_response_headers_received( gpr_mu_lock(&s->mu); memset(&s->state.rs.initial_metadata, 0, sizeof(s->state.rs.initial_metadata)); - grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata); + grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata, + s->arena); for (size_t i = 0; i < headers->count; i++) { - grpc_chttp2_incoming_metadata_buffer_add( - &s->state.rs.initial_metadata, - grpc_mdelem_from_slices( - &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string( - headers->headers[i].key)), - grpc_slice_intern( - grpc_slice_from_static_string(headers->headers[i].value)))); + GRPC_LOG_IF_ERROR( + "on_response_headers_received", + grpc_chttp2_incoming_metadata_buffer_add( + &exec_ctx, &s->state.rs.initial_metadata, + grpc_mdelem_from_slices( + &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string( + headers->headers[i].key)), + grpc_slice_intern(grpc_slice_from_static_string( + headers->headers[i].value))))); } s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true; if (!(s->state.state_op_done[OP_CANCEL_ERROR] || @@ -473,6 +512,7 @@ static void on_response_headers_received( CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, s->state.rs.remaining_bytes); + s->state.pending_read_from_cronet = true; } gpr_mu_unlock(&s->mu); grpc_exec_ctx_finish(&exec_ctx); @@ -504,10 +544,13 @@ static void on_read_completed(bidirectional_stream *stream, char *data, CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data, count); gpr_mu_lock(&s->mu); + s->state.pending_read_from_cronet = false; s->state.state_callback_received[OP_RECV_MESSAGE] = true; if (count > 0 && s->state.flush_read) { CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); - bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, 4096); + bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, + GRPC_FLUSH_READ_SIZE); + s->state.pending_read_from_cronet = true; gpr_mu_unlock(&s->mu); } else if (count > 0) { s->state.rs.received_bytes += count; @@ -518,16 +561,14 @@ static void on_read_completed(bidirectional_stream *stream, char *data, bidirectional_stream_read( s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes, s->state.rs.remaining_bytes); + s->state.pending_read_from_cronet = true; gpr_mu_unlock(&s->mu); } else { gpr_mu_unlock(&s->mu); execute_from_storage(s); } } else { - if (s->state.flush_read) { - gpr_free(s->state.rs.read_buffer); - s->state.rs.read_buffer = NULL; - } + null_and_maybe_free_read_buffer(s); s->state.rs.read_stream_closed = true; gpr_mu_unlock(&s->mu); execute_from_storage(s); @@ -549,21 +590,25 @@ static void on_response_trailers_received( memset(&s->state.rs.trailing_metadata, 0, sizeof(s->state.rs.trailing_metadata)); s->state.rs.trailing_metadata_valid = false; - grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.trailing_metadata); + grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.trailing_metadata, + s->arena); for (size_t i = 0; i < trailers->count; i++) { CRONET_LOG(GPR_DEBUG, "trailer key=%s, value=%s", trailers->headers[i].key, trailers->headers[i].value); - grpc_chttp2_incoming_metadata_buffer_add( - &s->state.rs.trailing_metadata, - grpc_mdelem_from_slices( - &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string( - trailers->headers[i].key)), - grpc_slice_intern( - grpc_slice_from_static_string(trailers->headers[i].value)))); + GRPC_LOG_IF_ERROR( + "on_response_trailers_received", + grpc_chttp2_incoming_metadata_buffer_add( + &exec_ctx, &s->state.rs.trailing_metadata, + grpc_mdelem_from_slices( + &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string( + trailers->headers[i].key)), + grpc_slice_intern(grpc_slice_from_static_string( + trailers->headers[i].value))))); s->state.rs.trailing_metadata_valid = true; if (0 == strcmp(trailers->headers[i].key, "grpc-status") && 0 != strcmp(trailers->headers[i].value, "0")) { s->state.fail_state = true; + maybe_flush_read(s); } } s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true; @@ -778,7 +823,7 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op, else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) result = false; /* we haven't sent message yet */ - else if (stream_state->unprocessed_send_message && + else if (stream_state->pending_send_message && !stream_state->state_op_done[OP_SEND_MESSAGE]) result = false; /* we haven't got on_write_completed for the send yet */ @@ -900,7 +945,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, } else if (stream_op->send_message && op_can_be_run(stream_op, s, &oas->state, OP_SEND_MESSAGE)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas); - stream_state->unprocessed_send_message = false; + stream_state->pending_send_message = false; if (stream_state->state_callback_received[OP_FAILED]) { result = NO_ACTION_POSSIBLE; CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed."); @@ -1009,6 +1054,13 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; + } else if (stream_state->flush_read) { + CRONET_LOG(GPR_DEBUG, "flush read"); + grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, + GRPC_ERROR_NONE); + stream_state->state_op_done[OP_RECV_MESSAGE] = true; + oas->state.state_op_done[OP_RECV_MESSAGE] = true; + result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->rs.length_field_received == false) { if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES && stream_state->rs.remaining_bytes == 0) { @@ -1029,6 +1081,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, true; /* Indicates that at least one read request has been made */ bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, stream_state->rs.remaining_bytes); + stream_state->pending_read_from_cronet = true; result = ACTION_TAKEN_WITH_CALLBACK; } else { stream_state->rs.remaining_bytes = 0; @@ -1047,11 +1100,13 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes; stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; stream_state->rs.received_bytes = 0; + stream_state->rs.length_field_received = false; CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); stream_state->state_op_done[OP_READ_REQ_MADE] = true; /* Indicates that at least one read request has been made */ bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, stream_state->rs.remaining_bytes); + stream_state->pending_read_from_cronet = true; result = ACTION_TAKEN_NO_CALLBACK; } } else if (stream_state->rs.remaining_bytes == 0) { @@ -1064,6 +1119,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, true; /* Indicates that at least one read request has been made */ bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, stream_state->rs.remaining_bytes); + stream_state->pending_read_from_cronet = true; result = ACTION_TAKEN_WITH_CALLBACK; } else { result = NO_ACTION_POSSIBLE; @@ -1075,7 +1131,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, uint8_t *dst_p = GRPC_SLICE_START_PTR(read_data_slice); memcpy(dst_p, stream_state->rs.read_buffer, (size_t)stream_state->rs.length_field); - free_read_buffer(s); + null_and_maybe_free_read_buffer(s); grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer); grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer, read_data_slice); @@ -1096,6 +1152,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, stream_state->rs.remaining_bytes); + stream_state->pending_read_from_cronet = true; result = ACTION_TAKEN_NO_CALLBACK; } } else if (stream_op->recv_trailing_metadata && @@ -1153,15 +1210,6 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, make a note */ if (stream_op->recv_message) stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true; - } else if (stream_state->fail_state && !stream_state->flush_read) { - CRONET_LOG(GPR_DEBUG, "running: %p flush read", oas); - if (stream_state->rs.read_buffer && - stream_state->rs.read_buffer != stream_state->rs.grpc_header_bytes) { - gpr_free(stream_state->rs.read_buffer); - stream_state->rs.read_buffer = NULL; - } - stream_state->rs.read_buffer = gpr_malloc(4096); - stream_state->flush_read = true; } else { result = NO_ACTION_POSSIBLE; } @@ -1174,7 +1222,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_stream_refcount *refcount, - const void *server_data) { + const void *server_data, gpr_arena *arena) { stream_obj *s = (stream_obj *)gs; memset(&s->storage, 0, sizeof(s->storage)); s->storage.head = NULL; @@ -1190,10 +1238,13 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, s->state.fail_state = s->state.flush_read = false; s->state.cancel_error = NULL; s->state.flush_cronet_when_ready = s->state.pending_write_for_trailer = false; - s->state.unprocessed_send_message = false; + s->state.pending_send_message = false; + s->state.pending_recv_trailing_metadata = false; + s->state.pending_read_from_cronet = false; s->curr_gs = gs; s->curr_ct = (grpc_cronet_transport *)gt; + s->arena = arena; gpr_mu_init(&s->mu); return 0; @@ -1209,38 +1260,33 @@ static void set_pollset_set_do_nothing(grpc_exec_ctx *exec_ctx, static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_transport_stream_op *op) { CRONET_LOG(GPR_DEBUG, "perform_stream_op"); - stream_obj *s = (stream_obj *)gs; - add_to_storage(s, op); if (op->send_initial_metadata && header_has_authority(op->send_initial_metadata->list.head)) { /* Cronet does not support :authority header field. We cancel the call when - this field is present in metadata */ - bidirectional_stream_header_array header_array; - bidirectional_stream_header *header; - bidirectional_stream cbs; - CRONET_LOG(GPR_DEBUG, - ":authority header is provided but not supported;" - " cancel operations"); - /* Notify application that operation is cancelled by forging trailers */ - header_array.count = 1; - header_array.capacity = 1; - header_array.headers = gpr_malloc(sizeof(bidirectional_stream_header)); - header = (bidirectional_stream_header *)header_array.headers; - header->key = "grpc-status"; - header->value = "1"; /* Return status GRPC_STATUS_CANCELLED */ - cbs.annotation = (void *)s; - s->state.state_op_done[OP_CANCEL_ERROR] = true; - on_response_trailers_received(&cbs, &header_array); - gpr_free(header_array.headers); - } else { - execute_from_storage(s); + this field is present in metadata */ + if (op->recv_initial_metadata_ready) { + grpc_closure_sched(exec_ctx, op->recv_initial_metadata_ready, + GRPC_ERROR_CANCELLED); + } + if (op->recv_message_ready) { + grpc_closure_sched(exec_ctx, op->recv_message_ready, + GRPC_ERROR_CANCELLED); + } + grpc_closure_sched(exec_ctx, op->on_complete, GRPC_ERROR_CANCELLED); + return; } + stream_obj *s = (stream_obj *)gs; + add_to_storage(s, op); + execute_from_storage(s); } static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, void *and_free_memory) { + grpc_stream *gs, + grpc_closure *then_schedule_closure) { stream_obj *s = (stream_obj *)gs; + null_and_maybe_free_read_buffer(s); GRPC_ERROR_UNREF(s->state.cancel_error); + grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); } static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {} |