diff options
Diffstat (limited to 'src/core/ext')
44 files changed, 1757 insertions, 1281 deletions
diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c index 65cfe1fa90..fc29dbd454 100644 --- a/src/core/ext/census/grpc_filter.c +++ b/src/core/ext/census/grpc_filter.c @@ -127,7 +127,7 @@ static void server_start_transport_op(grpc_exec_ctx *exec_ctx, static grpc_error *client_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_call_element_args *args) { + const grpc_call_element_args *args) { call_data *d = elem->call_data; GPR_ASSERT(d != NULL); memset(d, 0, sizeof(*d)); @@ -138,7 +138,7 @@ static grpc_error *client_init_call_elem(grpc_exec_ctx *exec_ctx, static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) { + grpc_closure *ignored) { call_data *d = elem->call_data; GPR_ASSERT(d != NULL); /* TODO(hongyu): record rpc client stats and census_rpc_end_op here */ @@ -146,7 +146,7 @@ static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx, static grpc_error *server_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_call_element_args *args) { + const grpc_call_element_args *args) { call_data *d = elem->call_data; GPR_ASSERT(d != NULL); memset(d, 0, sizeof(*d)); @@ -160,7 +160,7 @@ static grpc_error *server_init_call_elem(grpc_exec_ctx *exec_ctx, static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) { + grpc_closure *ignored) { call_data *d = elem->call_data; GPR_ASSERT(d != NULL); /* TODO(hongyu): record rpc server stats and census_tracing_end_op here */ diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index 06038bb5ba..960d00e815 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -51,6 +51,7 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/connected_channel.h" #include "src/core/lib/channel/deadline_filter.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/polling_entity.h" #include "src/core/lib/profiling/timers.h" @@ -70,29 +71,88 @@ */ typedef enum { - WAIT_FOR_READY_UNSET, + /* zero so it can be default initialized */ + WAIT_FOR_READY_UNSET = 0, WAIT_FOR_READY_FALSE, WAIT_FOR_READY_TRUE } wait_for_ready_value; -typedef struct method_parameters { +typedef struct { + gpr_refcount refs; gpr_timespec timeout; wait_for_ready_value wait_for_ready; } method_parameters; +static method_parameters *method_parameters_ref( + method_parameters *method_params) { + gpr_ref(&method_params->refs); + return method_params; +} + +static void method_parameters_unref(method_parameters *method_params) { + if (gpr_unref(&method_params->refs)) { + gpr_free(method_params); + } +} + static void *method_parameters_copy(void *value) { - void *new_value = gpr_malloc(sizeof(method_parameters)); - memcpy(new_value, value, sizeof(method_parameters)); - return new_value; + return method_parameters_ref(value); } -static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *p) { - gpr_free(p); +static void method_parameters_free(grpc_exec_ctx *exec_ctx, void *value) { + method_parameters_unref(value); } static const grpc_slice_hash_table_vtable method_parameters_vtable = { method_parameters_free, method_parameters_copy}; +static bool parse_wait_for_ready(grpc_json *field, + wait_for_ready_value *wait_for_ready) { + if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) { + return false; + } + *wait_for_ready = field->type == GRPC_JSON_TRUE ? WAIT_FOR_READY_TRUE + : WAIT_FOR_READY_FALSE; + return true; +} + +static bool parse_timeout(grpc_json *field, gpr_timespec *timeout) { + if (field->type != GRPC_JSON_STRING) return false; + size_t len = strlen(field->value); + if (field->value[len - 1] != 's') return false; + char *buf = gpr_strdup(field->value); + buf[len - 1] = '\0'; // Remove trailing 's'. + char *decimal_point = strchr(buf, '.'); + if (decimal_point != NULL) { + *decimal_point = '\0'; + timeout->tv_nsec = gpr_parse_nonnegative_int(decimal_point + 1); + if (timeout->tv_nsec == -1) { + gpr_free(buf); + return false; + } + // There should always be exactly 3, 6, or 9 fractional digits. + int multiplier = 1; + switch (strlen(decimal_point + 1)) { + case 9: + break; + case 6: + multiplier *= 1000; + break; + case 3: + multiplier *= 1000000; + break; + default: // Unsupported number of digits. + gpr_free(buf); + return false; + } + timeout->tv_nsec *= multiplier; + } + timeout->tv_sec = gpr_parse_nonnegative_int(buf); + gpr_free(buf); + if (timeout->tv_sec == -1) return false; + return true; +} + static void *method_parameters_create_from_json(const grpc_json *json) { wait_for_ready_value wait_for_ready = WAIT_FOR_READY_UNSET; gpr_timespec timeout = {0, 0, GPR_TIMESPAN}; @@ -100,49 +160,14 @@ static void *method_parameters_create_from_json(const grpc_json *json) { if (field->key == NULL) continue; if (strcmp(field->key, "waitForReady") == 0) { if (wait_for_ready != WAIT_FOR_READY_UNSET) return NULL; // Duplicate. - if (field->type != GRPC_JSON_TRUE && field->type != GRPC_JSON_FALSE) { - return NULL; - } - wait_for_ready = field->type == GRPC_JSON_TRUE ? WAIT_FOR_READY_TRUE - : WAIT_FOR_READY_FALSE; + if (!parse_wait_for_ready(field, &wait_for_ready)) return NULL; } else if (strcmp(field->key, "timeout") == 0) { if (timeout.tv_sec > 0 || timeout.tv_nsec > 0) return NULL; // Duplicate. - if (field->type != GRPC_JSON_STRING) return NULL; - size_t len = strlen(field->value); - if (field->value[len - 1] != 's') return NULL; - char *buf = gpr_strdup(field->value); - buf[len - 1] = '\0'; // Remove trailing 's'. - char *decimal_point = strchr(buf, '.'); - if (decimal_point != NULL) { - *decimal_point = '\0'; - timeout.tv_nsec = gpr_parse_nonnegative_int(decimal_point + 1); - if (timeout.tv_nsec == -1) { - gpr_free(buf); - return NULL; - } - // There should always be exactly 3, 6, or 9 fractional digits. - int multiplier = 1; - switch (strlen(decimal_point + 1)) { - case 9: - break; - case 6: - multiplier *= 1000; - break; - case 3: - multiplier *= 1000000; - break; - default: // Unsupported number of digits. - gpr_free(buf); - return NULL; - } - timeout.tv_nsec *= multiplier; - } - timeout.tv_sec = gpr_parse_nonnegative_int(buf); - if (timeout.tv_sec == -1) return NULL; - gpr_free(buf); + if (!parse_timeout(field, &timeout)) return NULL; } } method_parameters *value = gpr_malloc(sizeof(method_parameters)); + gpr_ref_init(&value->refs, 1); value->timeout = timeout; value->wait_for_ready = wait_for_ready; return value; @@ -160,13 +185,10 @@ typedef struct client_channel_channel_data { /** client channel factory */ grpc_client_channel_factory *client_channel_factory; - /** mutex protecting all variables below in this data structure */ - gpr_mu mu; + /** combiner protecting all variables below in this data structure */ + grpc_combiner *combiner; /** currently active load balancer */ - char *lb_policy_name; grpc_lb_policy *lb_policy; - /** service config in JSON form */ - char *service_config_json; /** maps method names to method_parameters structs */ grpc_slice_hash_table *method_params_table; /** incoming resolver result - set by resolver.next() */ @@ -183,6 +205,13 @@ typedef struct client_channel_channel_data { grpc_channel_stack *owning_stack; /** interested parties (owned) */ grpc_pollset_set *interested_parties; + + /* the following properties are guarded by a mutex since API's require them + to be instantaneously available */ + gpr_mu info_mu; + char *info_lb_policy_name; + /** service config in JSON form */ + char *info_service_config_json; } channel_data; /** We create one watcher for each new lb_policy that is returned from a @@ -195,9 +224,9 @@ typedef struct { grpc_lb_policy *lb_policy; } lb_policy_connectivity_watcher; -static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, - grpc_lb_policy *lb_policy, - grpc_connectivity_state current_state); +static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand, + grpc_lb_policy *lb_policy, + grpc_connectivity_state current_state); static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx, channel_data *chand, @@ -208,7 +237,7 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx, state == GRPC_CHANNEL_SHUTDOWN) && chand->lb_policy != NULL) { /* cancel picks with wait_for_ready=false */ - grpc_lb_policy_cancel_picks( + grpc_lb_policy_cancel_picks_locked( exec_ctx, chand->lb_policy, /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY, /* check= */ 0, GRPC_ERROR_REF(error)); @@ -218,54 +247,45 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx, } static void on_lb_policy_state_changed_locked(grpc_exec_ctx *exec_ctx, - lb_policy_connectivity_watcher *w, - grpc_error *error) { + void *arg, grpc_error *error) { + lb_policy_connectivity_watcher *w = arg; grpc_connectivity_state publish_state = w->state; - /* check if the notification is for a stale policy */ - if (w->lb_policy != w->chand->lb_policy) return; - - if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) { - publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE; - grpc_resolver_channel_saw_error(exec_ctx, w->chand->resolver); - GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel"); - w->chand->lb_policy = NULL; - } - set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state, - GRPC_ERROR_REF(error), "lb_changed"); - if (w->state != GRPC_CHANNEL_SHUTDOWN) { - watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state); + /* check if the notification is for the latest policy */ + if (w->lb_policy == w->chand->lb_policy) { + if (publish_state == GRPC_CHANNEL_SHUTDOWN && w->chand->resolver != NULL) { + publish_state = GRPC_CHANNEL_TRANSIENT_FAILURE; + grpc_resolver_channel_saw_error_locked(exec_ctx, w->chand->resolver); + GRPC_LB_POLICY_UNREF(exec_ctx, w->chand->lb_policy, "channel"); + w->chand->lb_policy = NULL; + } + set_channel_connectivity_state_locked(exec_ctx, w->chand, publish_state, + GRPC_ERROR_REF(error), "lb_changed"); + if (w->state != GRPC_CHANNEL_SHUTDOWN) { + watch_lb_policy_locked(exec_ctx, w->chand, w->lb_policy, w->state); + } } -} - -static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - lb_policy_connectivity_watcher *w = arg; - - gpr_mu_lock(&w->chand->mu); - on_lb_policy_state_changed_locked(exec_ctx, w, error); - gpr_mu_unlock(&w->chand->mu); GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "watch_lb_policy"); gpr_free(w); } -static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, - grpc_lb_policy *lb_policy, - grpc_connectivity_state current_state) { +static void watch_lb_policy_locked(grpc_exec_ctx *exec_ctx, channel_data *chand, + grpc_lb_policy *lb_policy, + grpc_connectivity_state current_state) { lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w)); GRPC_CHANNEL_STACK_REF(chand->owning_stack, "watch_lb_policy"); w->chand = chand; - grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w, - grpc_schedule_on_exec_ctx); + grpc_closure_init(&w->on_changed, on_lb_policy_state_changed_locked, w, + grpc_combiner_scheduler(chand->combiner, false)); w->state = current_state; w->lb_policy = lb_policy; - grpc_lb_policy_notify_on_state_change(exec_ctx, lb_policy, &w->state, - &w->on_changed); + grpc_lb_policy_notify_on_state_change_locked(exec_ctx, lb_policy, &w->state, + &w->on_changed); } -static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void on_resolver_result_changed_locked(grpc_exec_ctx *exec_ctx, + void *arg, grpc_error *error) { channel_data *chand = arg; char *lb_policy_name = NULL; grpc_lb_policy *lb_policy = NULL; @@ -317,13 +337,14 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_lb_policy_args lb_policy_args; lb_policy_args.args = chand->resolver_result; lb_policy_args.client_channel_factory = chand->client_channel_factory; + lb_policy_args.combiner = chand->combiner; lb_policy = grpc_lb_policy_create(exec_ctx, lb_policy_name, &lb_policy_args); if (lb_policy != NULL) { GRPC_LB_POLICY_REF(lb_policy, "config_change"); GRPC_ERROR_UNREF(state_error); - state = - grpc_lb_policy_check_connectivity(exec_ctx, lb_policy, &state_error); + state = grpc_lb_policy_check_connectivity_locked(exec_ctx, lb_policy, + &state_error); } // Find service config. channel_arg = @@ -353,17 +374,18 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, chand->interested_parties); } - gpr_mu_lock(&chand->mu); + gpr_mu_lock(&chand->info_mu); if (lb_policy_name != NULL) { - gpr_free(chand->lb_policy_name); - chand->lb_policy_name = lb_policy_name; + gpr_free(chand->info_lb_policy_name); + chand->info_lb_policy_name = lb_policy_name; } old_lb_policy = chand->lb_policy; chand->lb_policy = lb_policy; if (service_config_json != NULL) { - gpr_free(chand->service_config_json); - chand->service_config_json = service_config_json; + gpr_free(chand->info_service_config_json); + chand->info_service_config_json = service_config_json; } + gpr_mu_unlock(&chand->info_mu); if (chand->method_params_table != NULL) { grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table); } @@ -386,15 +408,15 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, set_channel_connectivity_state_locked( exec_ctx, chand, state, GRPC_ERROR_REF(state_error), "new_lb+resolver"); if (lb_policy != NULL) { - watch_lb_policy(exec_ctx, chand, lb_policy, state); + watch_lb_policy_locked(exec_ctx, chand, lb_policy, state); } GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result, - &chand->on_resolver_result_changed); - gpr_mu_unlock(&chand->mu); + grpc_resolver_next_locked(exec_ctx, chand->resolver, + &chand->resolver_result, + &chand->on_resolver_result_changed); } else { if (chand->resolver != NULL) { - grpc_resolver_shutdown(exec_ctx, chand->resolver); + grpc_resolver_shutdown_locked(exec_ctx, chand->resolver); GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); chand->resolver = NULL; } @@ -404,11 +426,10 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, GRPC_ERROR_CREATE_REFERENCING("Got config after disconnection", refs, GPR_ARRAY_SIZE(refs)), "resolver_gone"); - gpr_mu_unlock(&chand->mu); } if (exit_idle) { - grpc_lb_policy_exit_idle(exec_ctx, lb_policy); + grpc_lb_policy_exit_idle_locked(exec_ctx, lb_policy); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle"); } @@ -426,20 +447,12 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, GRPC_ERROR_UNREF(state_error); } -static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, - grpc_channel_element *elem, - grpc_transport_op *op) { +static void start_transport_op_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error_ignored) { + grpc_transport_op *op = arg; + grpc_channel_element *elem = op->transport_private.args[0]; channel_data *chand = elem->channel_data; - grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); - - GPR_ASSERT(op->set_accept_stream == false); - if (op->bind_pollset != NULL) { - grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, - op->bind_pollset); - } - - gpr_mu_lock(&chand->mu); if (op->on_connectivity_state_change != NULL) { grpc_connectivity_state_notify_on_state_change( exec_ctx, &chand->state_tracker, op->connectivity_state, @@ -453,7 +466,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_closure_sched(exec_ctx, op->send_ping, GRPC_ERROR_CREATE("Ping with no load balancing")); } else { - grpc_lb_policy_ping_one(exec_ctx, chand->lb_policy, op->send_ping); + grpc_lb_policy_ping_one_locked(exec_ctx, chand->lb_policy, op->send_ping); op->bind_pollset = NULL; } op->send_ping = NULL; @@ -464,7 +477,7 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, set_channel_connectivity_state_locked( exec_ctx, chand, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(op->disconnect_with_error), "disconnect"); - grpc_resolver_shutdown(exec_ctx, chand->resolver); + grpc_resolver_shutdown_locked(exec_ctx, chand->resolver); GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); chand->resolver = NULL; if (!chand->started_resolving) { @@ -482,25 +495,48 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, } GRPC_ERROR_UNREF(op->disconnect_with_error); } - gpr_mu_unlock(&chand->mu); + GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "start_transport_op"); + + grpc_closure_sched(exec_ctx, op->on_consumed, GRPC_ERROR_NONE); +} + +static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + grpc_transport_op *op) { + channel_data *chand = elem->channel_data; + + GPR_ASSERT(op->set_accept_stream == false); + if (op->bind_pollset != NULL) { + grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, + op->bind_pollset); + } + + op->transport_private.args[0] = elem; + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "start_transport_op"); + grpc_closure_sched( + exec_ctx, grpc_closure_init( + &op->transport_private.closure, start_transport_op_locked, + op, grpc_combiner_scheduler(chand->combiner, false)), + GRPC_ERROR_NONE); } static void cc_get_channel_info(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, const grpc_channel_info *info) { channel_data *chand = elem->channel_data; - gpr_mu_lock(&chand->mu); + gpr_mu_lock(&chand->info_mu); if (info->lb_policy_name != NULL) { - *info->lb_policy_name = chand->lb_policy_name == NULL + *info->lb_policy_name = chand->info_lb_policy_name == NULL ? NULL - : gpr_strdup(chand->lb_policy_name); + : gpr_strdup(chand->info_lb_policy_name); } if (info->service_config_json != NULL) { - *info->service_config_json = chand->service_config_json == NULL - ? NULL - : gpr_strdup(chand->service_config_json); + *info->service_config_json = + chand->info_service_config_json == NULL + ? NULL + : gpr_strdup(chand->info_service_config_json); } - gpr_mu_unlock(&chand->mu); + gpr_mu_unlock(&chand->info_mu); } /* Constructor for channel_data */ @@ -508,15 +544,15 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_channel_element_args *args) { channel_data *chand = elem->channel_data; - memset(chand, 0, sizeof(*chand)); GPR_ASSERT(args->is_last); GPR_ASSERT(elem->filter == &grpc_client_channel_filter); // Initialize data members. - gpr_mu_init(&chand->mu); + chand->combiner = grpc_combiner_create(NULL); + gpr_mu_init(&chand->info_mu); chand->owning_stack = args->channel_stack; grpc_closure_init(&chand->on_resolver_result_changed, - on_resolver_result_changed, chand, - grpc_schedule_on_exec_ctx); + on_resolver_result_changed_locked, chand, + grpc_combiner_scheduler(chand->combiner, false)); chand->interested_parties = grpc_pollset_set_create(); grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel"); @@ -539,7 +575,7 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, chand->resolver = grpc_resolver_create( exec_ctx, proxy_name != NULL ? proxy_name : arg->value.string, new_args != NULL ? new_args : args->channel_args, - chand->interested_parties); + chand->interested_parties, chand->combiner); if (proxy_name != NULL) gpr_free(proxy_name); if (new_args != NULL) grpc_channel_args_destroy(exec_ctx, new_args); if (chand->resolver == NULL) { @@ -548,13 +584,23 @@ static grpc_error *cc_init_channel_elem(grpc_exec_ctx *exec_ctx, return GRPC_ERROR_NONE; } +static void shutdown_resolver_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_resolver *resolver = arg; + grpc_resolver_shutdown_locked(exec_ctx, resolver); + GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel"); +} + /* Destructor for channel_data */ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem) { channel_data *chand = elem->channel_data; if (chand->resolver != NULL) { - grpc_resolver_shutdown(exec_ctx, chand->resolver); - GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel"); + grpc_closure_sched( + exec_ctx, + grpc_closure_create(shutdown_resolver_locked, chand->resolver, + grpc_combiner_scheduler(chand->combiner, false)), + GRPC_ERROR_NONE); } if (chand->client_channel_factory != NULL) { grpc_client_channel_factory_unref(exec_ctx, chand->client_channel_factory); @@ -565,14 +611,15 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); } - gpr_free(chand->lb_policy_name); - gpr_free(chand->service_config_json); + gpr_free(chand->info_lb_policy_name); + gpr_free(chand->info_service_config_json); if (chand->method_params_table != NULL) { grpc_slice_hash_table_unref(exec_ctx, chand->method_params_table); } grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); - grpc_pollset_set_destroy(chand->interested_parties); - gpr_mu_destroy(&chand->mu); + grpc_pollset_set_destroy(exec_ctx, chand->interested_parties); + GRPC_COMBINER_UNREF(exec_ctx, chand->combiner, "client_channel"); + gpr_mu_destroy(&chand->info_mu); } /************************************************************************* @@ -585,7 +632,8 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, #define CANCELLED_CALL ((grpc_subchannel_call *)1) typedef enum { - GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING, + /* zero so that it can be default-initialized */ + GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING = 0, GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL } subchannel_creation_phase; @@ -606,16 +654,14 @@ typedef struct client_channel_call_data { grpc_slice path; // Request path. gpr_timespec call_start_time; gpr_timespec deadline; - wait_for_ready_value wait_for_ready_from_service_config; - grpc_closure read_service_config; + method_parameters *method_params; grpc_error *cancel_error; /** either 0 for no call, 1 for cancelled, or a pointer to a grpc_subchannel_call */ gpr_atm subchannel_call; - - gpr_mu mu; + gpr_arena *arena; subchannel_creation_phase creation_phase; grpc_connected_subchannel *connected_subchannel; @@ -661,52 +707,73 @@ static void fail_locked(grpc_exec_ctx *exec_ctx, call_data *calld, GRPC_ERROR_UNREF(error); } -typedef struct { - grpc_transport_stream_op **ops; - size_t nops; - grpc_subchannel_call *call; -} retry_ops_args; - -static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, grpc_error *error) { - retry_ops_args *a = args; - size_t i; - for (i = 0; i < a->nops; i++) { - grpc_subchannel_call_process_op(exec_ctx, a->call, a->ops[i]); - } - GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, a->call, "retry_ops"); - gpr_free(a->ops); - gpr_free(a); -} - static void retry_waiting_locked(grpc_exec_ctx *exec_ctx, call_data *calld) { if (calld->waiting_ops_count == 0) { return; } - retry_ops_args *a = gpr_malloc(sizeof(*a)); - a->ops = calld->waiting_ops; - a->nops = calld->waiting_ops_count; - a->call = GET_CALL(calld); - if (a->call == CANCELLED_CALL) { - gpr_free(a); + grpc_subchannel_call *call = GET_CALL(calld); + grpc_transport_stream_op **ops = calld->waiting_ops; + size_t nops = calld->waiting_ops_count; + if (call == CANCELLED_CALL) { fail_locked(exec_ctx, calld, GRPC_ERROR_CANCELLED); return; } calld->waiting_ops = NULL; calld->waiting_ops_count = 0; calld->waiting_ops_capacity = 0; - GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops"); - grpc_closure_sched( - exec_ctx, grpc_closure_create(retry_ops, a, grpc_schedule_on_exec_ctx), - GRPC_ERROR_NONE); + for (size_t i = 0; i < nops; i++) { + grpc_subchannel_call_process_op(exec_ctx, call, ops[i]); + } + gpr_free(ops); +} + +// Sets calld->method_params. +// If the method params specify a timeout, populates +// *per_method_deadline and returns true. +static bool set_call_method_params_from_service_config_locked( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + gpr_timespec *per_method_deadline) { + channel_data *chand = elem->channel_data; + call_data *calld = elem->call_data; + if (chand->method_params_table != NULL) { + calld->method_params = grpc_method_config_table_get( + exec_ctx, chand->method_params_table, calld->path); + if (calld->method_params != NULL) { + method_parameters_ref(calld->method_params); + if (gpr_time_cmp(calld->method_params->timeout, + gpr_time_0(GPR_TIMESPAN)) != 0) { + *per_method_deadline = + gpr_time_add(calld->call_start_time, calld->method_params->timeout); + return true; + } + } + } + return false; +} + +static void apply_final_configuration_locked(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem) { + /* apply service-config level configuration to the call (now that we're + * certain it exists) */ + call_data *calld = elem->call_data; + gpr_timespec per_method_deadline; + if (set_call_method_params_from_service_config_locked(exec_ctx, elem, + &per_method_deadline)) { + // If the deadline from the service config is shorter than the one + // from the client API, reset the deadline timer. + if (gpr_time_cmp(per_method_deadline, calld->deadline) < 0) { + calld->deadline = per_method_deadline; + grpc_deadline_state_reset(exec_ctx, elem, calld->deadline); + } + } } -static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void subchannel_ready_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { grpc_call_element *elem = arg; call_data *calld = elem->call_data; channel_data *chand = elem->channel_data; - gpr_mu_lock(&calld->mu); GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL); grpc_polling_entity_del_from_pollset_set(exec_ctx, calld->pollent, @@ -730,9 +797,14 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, } else { /* Create call on subchannel. */ grpc_subchannel_call *subchannel_call = NULL; + const grpc_connected_subchannel_call_args call_args = { + .pollent = calld->pollent, + .path = calld->path, + .start_time = calld->call_start_time, + .deadline = calld->deadline, + .arena = calld->arena}; grpc_error *new_error = grpc_connected_subchannel_create_call( - exec_ctx, calld->connected_subchannel, calld->pollent, calld->path, - calld->call_start_time, calld->deadline, &subchannel_call); + exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); if (new_error != GRPC_ERROR_NONE) { new_error = grpc_error_add_child(new_error, error); subchannel_call = CANCELLED_CALL; @@ -742,7 +814,6 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, (gpr_atm)(uintptr_t)subchannel_call); retry_waiting_locked(exec_ctx, calld); } - gpr_mu_unlock(&calld->mu); GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); } @@ -768,37 +839,35 @@ typedef struct { /** Return true if subchannel is available immediately (in which case on_ready should not be called), or false otherwise (in which case on_ready should be called when the subchannel is available). */ -static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_metadata_batch *initial_metadata, - uint32_t initial_metadata_flags, - grpc_connected_subchannel **connected_subchannel, - grpc_closure *on_ready, grpc_error *error); - -static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static bool pick_subchannel_locked( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, + grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready, + grpc_error *error); + +static void continue_picking_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { continue_picking_args *cpa = arg; if (cpa->connected_subchannel == NULL) { /* cancelled, do nothing */ } else if (error != GRPC_ERROR_NONE) { grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_REF(error)); } else { - call_data *calld = cpa->elem->call_data; - gpr_mu_lock(&calld->mu); - if (pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata, - cpa->initial_metadata_flags, cpa->connected_subchannel, - cpa->on_ready, GRPC_ERROR_NONE)) { + if (pick_subchannel_locked(exec_ctx, cpa->elem, cpa->initial_metadata, + cpa->initial_metadata_flags, + cpa->connected_subchannel, cpa->on_ready, + GRPC_ERROR_NONE)) { grpc_closure_sched(exec_ctx, cpa->on_ready, GRPC_ERROR_NONE); } - gpr_mu_unlock(&calld->mu); } gpr_free(cpa); } -static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_metadata_batch *initial_metadata, - uint32_t initial_metadata_flags, - grpc_connected_subchannel **connected_subchannel, - grpc_closure *on_ready, grpc_error *error) { +static bool pick_subchannel_locked( + grpc_exec_ctx *exec_ctx, grpc_call_element *elem, + grpc_metadata_batch *initial_metadata, uint32_t initial_metadata_flags, + grpc_connected_subchannel **connected_subchannel, grpc_closure *on_ready, + grpc_error *error) { GPR_TIMER_BEGIN("pick_subchannel", 0); channel_data *chand = elem->channel_data; @@ -808,11 +877,11 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, GPR_ASSERT(connected_subchannel); - gpr_mu_lock(&chand->mu); if (initial_metadata == NULL) { if (chand->lb_policy != NULL) { - grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy, - connected_subchannel, GRPC_ERROR_REF(error)); + grpc_lb_policy_cancel_pick_locked(exec_ctx, chand->lb_policy, + connected_subchannel, + GRPC_ERROR_REF(error)); } for (closure = chand->waiting_for_config_closures.head; closure != NULL; closure = closure->next_data.next) { @@ -824,16 +893,15 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, GRPC_ERROR_CREATE_REFERENCING("Pick cancelled", &error, 1)); } } - gpr_mu_unlock(&chand->mu); GPR_TIMER_END("pick_subchannel", 0); GRPC_ERROR_UNREF(error); return true; } GPR_ASSERT(error == GRPC_ERROR_NONE); if (chand->lb_policy != NULL) { + apply_final_configuration_locked(exec_ctx, elem); grpc_lb_policy *lb_policy = chand->lb_policy; GRPC_LB_POLICY_REF(lb_policy, "pick_subchannel"); - gpr_mu_unlock(&chand->mu); // If the application explicitly set wait_for_ready, use that. // Otherwise, if the service config specified a value for this // method, use that. @@ -841,10 +909,11 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, initial_metadata_flags & GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET; const bool wait_for_ready_set_from_service_config = - calld->wait_for_ready_from_service_config != WAIT_FOR_READY_UNSET; + calld->method_params != NULL && + calld->method_params->wait_for_ready != WAIT_FOR_READY_UNSET; if (!wait_for_ready_set_from_api && wait_for_ready_set_from_service_config) { - if (calld->wait_for_ready_from_service_config == WAIT_FOR_READY_TRUE) { + if (calld->method_params->wait_for_ready == WAIT_FOR_READY_TRUE) { initial_metadata_flags |= GRPC_INITIAL_METADATA_WAIT_FOR_READY; } else { initial_metadata_flags &= ~GRPC_INITIAL_METADATA_WAIT_FOR_READY; @@ -853,7 +922,7 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_lb_policy_pick_args inputs = { initial_metadata, initial_metadata_flags, &calld->lb_token_mdelem, gpr_inf_future(GPR_CLOCK_MONOTONIC)}; - const bool result = grpc_lb_policy_pick( + const bool result = grpc_lb_policy_pick_locked( exec_ctx, lb_policy, &inputs, connected_subchannel, NULL, on_ready); GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick_subchannel"); GPR_TIMER_END("pick_subchannel", 0); @@ -862,8 +931,9 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, if (chand->resolver != NULL && !chand->started_resolving) { chand->started_resolving = true; GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result, - &chand->on_resolver_result_changed); + grpc_resolver_next_locked(exec_ctx, chand->resolver, + &chand->resolver_result, + &chand->on_resolver_result_changed); } if (chand->resolver != NULL) { cpa = gpr_malloc(sizeof(*cpa)); @@ -872,88 +942,66 @@ static bool pick_subchannel(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, cpa->connected_subchannel = connected_subchannel; cpa->on_ready = on_ready; cpa->elem = elem; - grpc_closure_init(&cpa->closure, continue_picking, cpa, - grpc_schedule_on_exec_ctx); + grpc_closure_init(&cpa->closure, continue_picking_locked, cpa, + grpc_combiner_scheduler(chand->combiner, true)); grpc_closure_list_append(&chand->waiting_for_config_closures, &cpa->closure, GRPC_ERROR_NONE); } else { grpc_closure_sched(exec_ctx, on_ready, GRPC_ERROR_CREATE("Disconnected")); } - gpr_mu_unlock(&chand->mu); GPR_TIMER_END("pick_subchannel", 0); return false; } -// The logic here is fairly complicated, due to (a) the fact that we -// need to handle the case where we receive the send op before the -// initial metadata op, and (b) the need for efficiency, especially in -// the streaming case. -// TODO(ctiller): Explain this more thoroughly. -static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, - grpc_call_element *elem, - grpc_transport_stream_op *op) { - call_data *calld = elem->call_data; +static void start_transport_stream_op_locked_inner(grpc_exec_ctx *exec_ctx, + grpc_transport_stream_op *op, + grpc_call_element *elem) { channel_data *chand = elem->channel_data; - GRPC_CALL_LOG_OP(GPR_INFO, elem, op); - grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op); - /* try to (atomically) get the call */ - grpc_subchannel_call *call = GET_CALL(calld); - GPR_TIMER_BEGIN("cc_start_transport_stream_op", 0); - if (call == CANCELLED_CALL) { - grpc_transport_stream_op_finish_with_failure( - exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); - GPR_TIMER_END("cc_start_transport_stream_op", 0); - return; - } - if (call != NULL) { - grpc_subchannel_call_process_op(exec_ctx, call, op); - GPR_TIMER_END("cc_start_transport_stream_op", 0); - return; - } - /* we failed; lock and figure out what to do */ - gpr_mu_lock(&calld->mu); -retry: + call_data *calld = elem->call_data; + grpc_subchannel_call *call; + /* need to recheck that another thread hasn't set the call */ call = GET_CALL(calld); if (call == CANCELLED_CALL) { - gpr_mu_unlock(&calld->mu); grpc_transport_stream_op_finish_with_failure( exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); - GPR_TIMER_END("cc_start_transport_stream_op", 0); + /* early out */ return; } if (call != NULL) { - gpr_mu_unlock(&calld->mu); grpc_subchannel_call_process_op(exec_ctx, call, op); - GPR_TIMER_END("cc_start_transport_stream_op", 0); + /* early out */ return; } /* if this is a cancellation, then we can raise our cancelled flag */ if (op->cancel_error != GRPC_ERROR_NONE) { if (!gpr_atm_rel_cas(&calld->subchannel_call, 0, (gpr_atm)(uintptr_t)CANCELLED_CALL)) { - goto retry; + /* recurse to retry */ + start_transport_stream_op_locked_inner(exec_ctx, op, elem); + /* early out */ + return; } else { - // Stash a copy of cancel_error in our call data, so that we can use - // it for subsequent operations. This ensures that if the call is - // cancelled before any ops are passed down (e.g., if the deadline - // is in the past when the call starts), we can return the right - // error to the caller when the first op does get passed down. + /* Stash a copy of cancel_error in our call data, so that we can use + it for subsequent operations. This ensures that if the call is + cancelled before any ops are passed down (e.g., if the deadline + is in the past when the call starts), we can return the right + error to the caller when the first op does get passed down. */ calld->cancel_error = GRPC_ERROR_REF(op->cancel_error); switch (calld->creation_phase) { case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING: fail_locked(exec_ctx, calld, GRPC_ERROR_REF(op->cancel_error)); break; case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL: - pick_subchannel(exec_ctx, elem, NULL, 0, &calld->connected_subchannel, - NULL, GRPC_ERROR_REF(op->cancel_error)); + pick_subchannel_locked(exec_ctx, elem, NULL, 0, + &calld->connected_subchannel, NULL, + GRPC_ERROR_REF(op->cancel_error)); break; } - gpr_mu_unlock(&calld->mu); grpc_transport_stream_op_finish_with_failure( exec_ctx, op, GRPC_ERROR_REF(op->cancel_error)); - GPR_TIMER_END("cc_start_transport_stream_op", 0); + /* early out */ return; } } @@ -962,16 +1010,16 @@ retry: calld->connected_subchannel == NULL && op->send_initial_metadata != NULL) { calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL; - grpc_closure_init(&calld->next_step, subchannel_ready, elem, - grpc_schedule_on_exec_ctx); + grpc_closure_init(&calld->next_step, subchannel_ready_locked, elem, + grpc_combiner_scheduler(chand->combiner, true)); GRPC_CALL_STACK_REF(calld->owning_call, "pick_subchannel"); /* If a subchannel is not available immediately, the polling entity from call_data should be provided to channel_data's interested_parties, so that IO of the lb_policy and resolver could be done under it. */ - if (pick_subchannel(exec_ctx, elem, op->send_initial_metadata, - op->send_initial_metadata_flags, - &calld->connected_subchannel, &calld->next_step, - GRPC_ERROR_NONE)) { + if (pick_subchannel_locked(exec_ctx, elem, op->send_initial_metadata, + op->send_initial_metadata_flags, + &calld->connected_subchannel, &calld->next_step, + GRPC_ERROR_NONE)) { calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "pick_subchannel"); } else { @@ -983,9 +1031,14 @@ retry: if (calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING && calld->connected_subchannel != NULL) { grpc_subchannel_call *subchannel_call = NULL; + const grpc_connected_subchannel_call_args call_args = { + .pollent = calld->pollent, + .path = calld->path, + .start_time = calld->call_start_time, + .deadline = calld->deadline, + .arena = calld->arena}; grpc_error *error = grpc_connected_subchannel_create_call( - exec_ctx, calld->connected_subchannel, calld->pollent, calld->path, - calld->call_start_time, calld->deadline, &subchannel_call); + exec_ctx, calld->connected_subchannel, &call_args, &subchannel_call); if (error != GRPC_ERROR_NONE) { subchannel_call = CANCELLED_CALL; fail_locked(exec_ctx, calld, GRPC_ERROR_REF(error)); @@ -994,130 +1047,85 @@ retry: gpr_atm_rel_store(&calld->subchannel_call, (gpr_atm)(uintptr_t)subchannel_call); retry_waiting_locked(exec_ctx, calld); - goto retry; + /* recurse to retry */ + start_transport_stream_op_locked_inner(exec_ctx, op, elem); + /* early out */ + return; } /* nothing to be done but wait */ add_waiting_locked(calld, op); - gpr_mu_unlock(&calld->mu); - GPR_TIMER_END("cc_start_transport_stream_op", 0); } -// Gets data from the service config. Invoked when the resolver returns -// its initial result. -static void read_service_config(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - grpc_call_element *elem = arg; - channel_data *chand = elem->channel_data; +static void start_transport_stream_op_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error_ignored) { + GPR_TIMER_BEGIN("start_transport_stream_op_locked", 0); + + grpc_transport_stream_op *op = arg; + grpc_call_element *elem = op->handler_private.args[0]; call_data *calld = elem->call_data; - // If this is an error, there's no point in looking at the service config. - if (error == GRPC_ERROR_NONE) { - // Get the method config table from channel data. - gpr_mu_lock(&chand->mu); - grpc_slice_hash_table *method_params_table = NULL; - if (chand->method_params_table != NULL) { - method_params_table = - grpc_slice_hash_table_ref(chand->method_params_table); - } - gpr_mu_unlock(&chand->mu); - // If the method config table was present, use it. - if (method_params_table != NULL) { - const method_parameters *method_params = grpc_method_config_table_get( - exec_ctx, method_params_table, calld->path); - if (method_params != NULL) { - const bool have_method_timeout = - gpr_time_cmp(method_params->timeout, gpr_time_0(GPR_TIMESPAN)) != 0; - if (have_method_timeout || - method_params->wait_for_ready != WAIT_FOR_READY_UNSET) { - gpr_mu_lock(&calld->mu); - if (have_method_timeout) { - const gpr_timespec per_method_deadline = - gpr_time_add(calld->call_start_time, method_params->timeout); - if (gpr_time_cmp(per_method_deadline, calld->deadline) < 0) { - calld->deadline = per_method_deadline; - // Reset deadline timer. - grpc_deadline_state_reset(exec_ctx, elem, calld->deadline); - } - } - if (method_params->wait_for_ready != WAIT_FOR_READY_UNSET) { - calld->wait_for_ready_from_service_config = - method_params->wait_for_ready; - } - gpr_mu_unlock(&calld->mu); - } - } - grpc_slice_hash_table_unref(exec_ctx, method_params_table); - } + + start_transport_stream_op_locked_inner(exec_ctx, op, elem); + + GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, + "start_transport_stream_op"); + GPR_TIMER_END("start_transport_stream_op_locked", 0); +} + +/* The logic here is fairly complicated, due to (a) the fact that we + need to handle the case where we receive the send op before the + initial metadata op, and (b) the need for efficiency, especially in + the streaming case. + + We use double-checked locking to initially see if initialization has been + performed. If it has not, we acquire the combiner and perform initialization. + If it has, we proceed on the fast path. */ +static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx, + grpc_call_element *elem, + grpc_transport_stream_op *op) { + call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; + GRPC_CALL_LOG_OP(GPR_INFO, elem, op); + grpc_deadline_state_client_start_transport_stream_op(exec_ctx, elem, op); + /* try to (atomically) get the call */ + grpc_subchannel_call *call = GET_CALL(calld); + GPR_TIMER_BEGIN("cc_start_transport_stream_op", 0); + if (call == CANCELLED_CALL) { + grpc_transport_stream_op_finish_with_failure( + exec_ctx, op, GRPC_ERROR_REF(calld->cancel_error)); + GPR_TIMER_END("cc_start_transport_stream_op", 0); + /* early out */ + return; + } + if (call != NULL) { + grpc_subchannel_call_process_op(exec_ctx, call, op); + GPR_TIMER_END("cc_start_transport_stream_op", 0); + /* early out */ + return; } - GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "read_service_config"); + /* we failed; lock and figure out what to do */ + GRPC_CALL_STACK_REF(calld->owning_call, "start_transport_stream_op"); + op->handler_private.args[0] = elem; + grpc_closure_sched( + exec_ctx, + grpc_closure_init(&op->handler_private.closure, + start_transport_stream_op_locked, op, + grpc_combiner_scheduler(chand->combiner, false)), + GRPC_ERROR_NONE); + GPR_TIMER_END("cc_start_transport_stream_op", 0); } /* Constructor for call_data */ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_call_element_args *args) { - channel_data *chand = elem->channel_data; + const grpc_call_element_args *args) { call_data *calld = elem->call_data; // Initialize data members. grpc_deadline_state_init(exec_ctx, elem, args->call_stack); calld->path = grpc_slice_ref_internal(args->path); calld->call_start_time = args->start_time; calld->deadline = gpr_convert_clock_type(args->deadline, GPR_CLOCK_MONOTONIC); - calld->wait_for_ready_from_service_config = WAIT_FOR_READY_UNSET; - calld->cancel_error = GRPC_ERROR_NONE; - gpr_atm_rel_store(&calld->subchannel_call, 0); - gpr_mu_init(&calld->mu); - calld->connected_subchannel = NULL; - calld->waiting_ops = NULL; - calld->waiting_ops_count = 0; - calld->waiting_ops_capacity = 0; - calld->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING; calld->owning_call = args->call_stack; - calld->pollent = NULL; - // If the resolver has already returned results, then we can access - // the service config parameters immediately. Otherwise, we need to - // defer that work until the resolver returns an initial result. - // TODO(roth): This code is almost but not quite identical to the code - // in read_service_config() above. It would be nice to find a way to - // combine them, to avoid having to maintain it twice. - gpr_mu_lock(&chand->mu); - if (chand->lb_policy != NULL) { - // We already have a resolver result, so check for service config. - if (chand->method_params_table != NULL) { - grpc_slice_hash_table *method_params_table = - grpc_slice_hash_table_ref(chand->method_params_table); - gpr_mu_unlock(&chand->mu); - method_parameters *method_params = grpc_method_config_table_get( - exec_ctx, method_params_table, args->path); - if (method_params != NULL) { - if (gpr_time_cmp(method_params->timeout, - gpr_time_0(GPR_CLOCK_MONOTONIC)) != 0) { - gpr_timespec per_method_deadline = - gpr_time_add(calld->call_start_time, method_params->timeout); - calld->deadline = gpr_time_min(calld->deadline, per_method_deadline); - } - if (method_params->wait_for_ready != WAIT_FOR_READY_UNSET) { - calld->wait_for_ready_from_service_config = - method_params->wait_for_ready; - } - } - grpc_slice_hash_table_unref(exec_ctx, method_params_table); - } else { - gpr_mu_unlock(&chand->mu); - } - } else { - // We don't yet have a resolver result, so register a callback to - // get the service config data once the resolver returns. - // Take a reference to the call stack to be owned by the callback. - GRPC_CALL_STACK_REF(calld->owning_call, "read_service_config"); - grpc_closure_init(&calld->read_service_config, read_service_config, elem, - grpc_schedule_on_exec_ctx); - grpc_closure_list_append(&chand->waiting_for_config_closures, - &calld->read_service_config, GRPC_ERROR_NONE); - gpr_mu_unlock(&chand->mu); - } - // Start the deadline timer with the current deadline value. If we - // do not yet have service config data, then the timer may be reset - // later. + calld->arena = args->arena; grpc_deadline_state_start(exec_ctx, elem, calld->deadline); return GRPC_ERROR_NONE; } @@ -1126,24 +1134,28 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *and_free_memory) { + grpc_closure *then_schedule_closure) { call_data *calld = elem->call_data; grpc_deadline_state_destroy(exec_ctx, elem); grpc_slice_unref_internal(exec_ctx, calld->path); + if (calld->method_params != NULL) { + method_parameters_unref(calld->method_params); + } GRPC_ERROR_UNREF(calld->cancel_error); grpc_subchannel_call *call = GET_CALL(calld); if (call != NULL && call != CANCELLED_CALL) { + grpc_subchannel_call_set_cleanup_closure(call, then_schedule_closure); + then_schedule_closure = NULL; GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "client_channel_destroy_call"); } GPR_ASSERT(calld->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING); - gpr_mu_destroy(&calld->mu); GPR_ASSERT(calld->waiting_ops_count == 0); if (calld->connected_subchannel != NULL) { GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, calld->connected_subchannel, "picked"); } gpr_free(calld->waiting_ops); - gpr_free(and_free_memory); + grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); } static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx, @@ -1172,26 +1184,37 @@ const grpc_channel_filter grpc_client_channel_filter = { "client-channel", }; +static void try_to_connect_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error_ignored) { + channel_data *chand = arg; + if (chand->lb_policy != NULL) { + grpc_lb_policy_exit_idle_locked(exec_ctx, chand->lb_policy); + } else { + chand->exit_idle_when_lb_policy_arrives = true; + if (!chand->started_resolving && chand->resolver != NULL) { + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); + chand->started_resolving = true; + grpc_resolver_next_locked(exec_ctx, chand->resolver, + &chand->resolver_result, + &chand->on_resolver_result_changed); + } + } + GRPC_CHANNEL_STACK_UNREF(exec_ctx, chand->owning_stack, "try_to_connect"); +} + grpc_connectivity_state grpc_client_channel_check_connectivity_state( grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) { channel_data *chand = elem->channel_data; - grpc_connectivity_state out; - gpr_mu_lock(&chand->mu); - out = grpc_connectivity_state_check(&chand->state_tracker, NULL); + grpc_connectivity_state out = + grpc_connectivity_state_check(&chand->state_tracker); if (out == GRPC_CHANNEL_IDLE && try_to_connect) { - if (chand->lb_policy != NULL) { - grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy); - } else { - chand->exit_idle_when_lb_policy_arrives = true; - if (!chand->started_resolving && chand->resolver != NULL) { - GRPC_CHANNEL_STACK_REF(chand->owning_stack, "resolver"); - chand->started_resolving = true; - grpc_resolver_next(exec_ctx, chand->resolver, &chand->resolver_result, - &chand->on_resolver_result_changed); - } - } + GRPC_CHANNEL_STACK_REF(chand->owning_stack, "try_to_connect"); + grpc_closure_sched( + exec_ctx, + grpc_closure_create(try_to_connect_locked, chand, + grpc_combiner_scheduler(chand->combiner, false)), + GRPC_ERROR_NONE); } - gpr_mu_unlock(&chand->mu); return out; } @@ -1199,6 +1222,7 @@ typedef struct { channel_data *chand; grpc_pollset *pollset; grpc_closure *on_complete; + grpc_connectivity_state *state; grpc_closure my_closure; } external_connectivity_watcher; @@ -1211,7 +1235,16 @@ static void on_external_watch_complete(grpc_exec_ctx *exec_ctx, void *arg, GRPC_CHANNEL_STACK_UNREF(exec_ctx, w->chand->owning_stack, "external_connectivity_watcher"); gpr_free(w); - follow_up->cb(exec_ctx, follow_up->cb_arg, error); + grpc_closure_run(exec_ctx, follow_up, GRPC_ERROR_REF(error)); +} + +static void watch_connectivity_state_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error_ignored) { + external_connectivity_watcher *w = arg; + grpc_closure_init(&w->my_closure, on_external_watch_complete, w, + grpc_schedule_on_exec_ctx); + grpc_connectivity_state_notify_on_state_change( + exec_ctx, &w->chand->state_tracker, w->state, &w->my_closure); } void grpc_client_channel_watch_connectivity_state( @@ -1222,13 +1255,13 @@ void grpc_client_channel_watch_connectivity_state( w->chand = chand; w->pollset = pollset; w->on_complete = on_complete; + w->state = state; grpc_pollset_set_add_pollset(exec_ctx, chand->interested_parties, pollset); - grpc_closure_init(&w->my_closure, on_external_watch_complete, w, - grpc_schedule_on_exec_ctx); GRPC_CHANNEL_STACK_REF(w->chand->owning_stack, "external_connectivity_watcher"); - gpr_mu_lock(&chand->mu); - grpc_connectivity_state_notify_on_state_change( - exec_ctx, &chand->state_tracker, state, &w->my_closure); - gpr_mu_unlock(&chand->mu); + grpc_closure_sched( + exec_ctx, + grpc_closure_init(&w->my_closure, watch_connectivity_state_locked, w, + grpc_combiner_scheduler(chand->combiner, true)), + GRPC_ERROR_NONE); } diff --git a/src/core/ext/client_channel/client_channel_plugin.c b/src/core/ext/client_channel/client_channel_plugin.c index 6f9df3e386..28d3b63f99 100644 --- a/src/core/ext/client_channel/client_channel_plugin.c +++ b/src/core/ext/client_channel/client_channel_plugin.c @@ -64,7 +64,7 @@ static bool set_default_host_if_unset(grpc_exec_ctx *exec_ctx, } } char *default_authority = grpc_get_default_authority( - grpc_channel_stack_builder_get_target(builder)); + exec_ctx, grpc_channel_stack_builder_get_target(builder)); if (default_authority != NULL) { grpc_arg arg; arg.type = GRPC_ARG_STRING; diff --git a/src/core/ext/client_channel/http_proxy.c b/src/core/ext/client_channel/http_proxy.c index 7daa071495..e280cef101 100644 --- a/src/core/ext/client_channel/http_proxy.c +++ b/src/core/ext/client_channel/http_proxy.c @@ -46,10 +46,11 @@ #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/support/env.h" -static char* grpc_get_http_proxy_server() { +static char* grpc_get_http_proxy_server(grpc_exec_ctx* exec_ctx) { char* uri_str = gpr_getenv("http_proxy"); if (uri_str == NULL) return NULL; - grpc_uri* uri = grpc_uri_parse(uri_str, false /* suppress_errors */); + grpc_uri* uri = + grpc_uri_parse(exec_ctx, uri_str, false /* suppress_errors */); char* proxy_name = NULL; if (uri == NULL || uri->authority == NULL) { gpr_log(GPR_ERROR, "cannot parse value of 'http_proxy' env var"); @@ -76,9 +77,10 @@ static bool proxy_mapper_map_name(grpc_exec_ctx* exec_ctx, const grpc_channel_args* args, char** name_to_resolve, grpc_channel_args** new_args) { - *name_to_resolve = grpc_get_http_proxy_server(); + *name_to_resolve = grpc_get_http_proxy_server(exec_ctx); if (*name_to_resolve == NULL) return false; - grpc_uri* uri = grpc_uri_parse(server_uri, false /* suppress_errors */); + grpc_uri* uri = + grpc_uri_parse(exec_ctx, server_uri, false /* suppress_errors */); if (uri == NULL || uri->path[0] == '\0') { gpr_log(GPR_ERROR, "'http_proxy' environment variable set, but cannot " @@ -87,6 +89,12 @@ static bool proxy_mapper_map_name(grpc_exec_ctx* exec_ctx, if (uri != NULL) grpc_uri_destroy(uri); return false; } + if (strcmp(uri->scheme, "unix") == 0) { + gpr_log(GPR_INFO, "not using proxy for Unix domain socket '%s'", + server_uri); + grpc_uri_destroy(uri); + return false; + } grpc_arg new_arg; new_arg.key = GRPC_ARG_HTTP_CONNECT_SERVER; new_arg.type = GRPC_ARG_STRING; diff --git a/src/core/ext/client_channel/lb_policy.c b/src/core/ext/client_channel/lb_policy.c index 45ee72e2f0..aba51add53 100644 --- a/src/core/ext/client_channel/lb_policy.c +++ b/src/core/ext/client_channel/lb_policy.c @@ -32,14 +32,17 @@ */ #include "src/core/ext/client_channel/lb_policy.h" +#include "src/core/lib/iomgr/combiner.h" #define WEAK_REF_BITS 16 void grpc_lb_policy_init(grpc_lb_policy *policy, - const grpc_lb_policy_vtable *vtable) { + const grpc_lb_policy_vtable *vtable, + grpc_combiner *combiner) { policy->vtable = vtable; gpr_atm_no_barrier_store(&policy->ref_pair, 1 << WEAK_REF_BITS); policy->interested_parties = grpc_pollset_set_create(); + policy->combiner = GRPC_COMBINER_REF(combiner, "lb_policy"); } #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG @@ -71,6 +74,13 @@ void grpc_lb_policy_ref(grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { ref_mutate(policy, 1 << WEAK_REF_BITS, 0 REF_MUTATE_PASS_ARGS("STRONG_REF")); } +static void shutdown_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_lb_policy *policy = arg; + policy->vtable->shutdown_locked(exec_ctx, policy); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, policy, "strong-unref"); +} + void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { gpr_atm old_val = @@ -79,10 +89,15 @@ void grpc_lb_policy_unref(grpc_exec_ctx *exec_ctx, gpr_atm mask = ~(gpr_atm)((1 << WEAK_REF_BITS) - 1); gpr_atm check = 1 << WEAK_REF_BITS; if ((old_val & mask) == check) { - policy->vtable->shutdown(exec_ctx, policy); + grpc_closure_sched( + exec_ctx, + grpc_closure_create(shutdown_locked, policy, + grpc_combiner_scheduler(policy->combiner, false)), + GRPC_ERROR_NONE); + } else { + grpc_lb_policy_weak_unref(exec_ctx, + policy REF_FUNC_PASS_ARGS("strong-unref")); } - grpc_lb_policy_weak_unref(exec_ctx, - policy REF_FUNC_PASS_ARGS("strong-unref")); } void grpc_lb_policy_weak_ref(grpc_lb_policy *policy REF_FUNC_EXTRA_ARGS) { @@ -94,53 +109,59 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, gpr_atm old_val = ref_mutate(policy, -(gpr_atm)1, 1 REF_MUTATE_PASS_ARGS("WEAK_UNREF")); if (old_val == 1) { - grpc_pollset_set_destroy(policy->interested_parties); + grpc_pollset_set_destroy(exec_ctx, policy->interested_parties); + grpc_combiner *combiner = policy->combiner; policy->vtable->destroy(exec_ctx, policy); + GRPC_COMBINER_UNREF(exec_ctx, combiner, "lb_policy"); } } -int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, void **user_data, - grpc_closure *on_complete) { - return policy->vtable->pick(exec_ctx, policy, pick_args, target, user_data, - on_complete); +int grpc_lb_policy_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + const grpc_lb_policy_pick_args *pick_args, + grpc_connected_subchannel **target, + void **user_data, grpc_closure *on_complete) { + return policy->vtable->pick_locked(exec_ctx, policy, pick_args, target, + user_data, on_complete); } -void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_connected_subchannel **target, - grpc_error *error) { - policy->vtable->cancel_pick(exec_ctx, policy, target, error); +void grpc_lb_policy_cancel_pick_locked(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *policy, + grpc_connected_subchannel **target, + grpc_error *error) { + policy->vtable->cancel_pick_locked(exec_ctx, policy, target, error); } -void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx, - grpc_lb_policy *policy, - uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq, - grpc_error *error) { - policy->vtable->cancel_picks(exec_ctx, policy, initial_metadata_flags_mask, - initial_metadata_flags_eq, error); +void grpc_lb_policy_cancel_picks_locked(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *policy, + uint32_t initial_metadata_flags_mask, + uint32_t initial_metadata_flags_eq, + grpc_error *error) { + policy->vtable->cancel_picks_locked(exec_ctx, policy, + initial_metadata_flags_mask, + initial_metadata_flags_eq, error); } -void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy) { - policy->vtable->exit_idle(exec_ctx, policy); +void grpc_lb_policy_exit_idle_locked(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *policy) { + policy->vtable->exit_idle_locked(exec_ctx, policy); } -void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_closure *closure) { - policy->vtable->ping_one(exec_ctx, policy, closure); +void grpc_lb_policy_ping_one_locked(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *policy, + grpc_closure *closure) { + policy->vtable->ping_one_locked(exec_ctx, policy, closure); } -void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx, - grpc_lb_policy *policy, - grpc_connectivity_state *state, - grpc_closure *closure) { - policy->vtable->notify_on_state_change(exec_ctx, policy, state, closure); +void grpc_lb_policy_notify_on_state_change_locked( + grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + grpc_connectivity_state *state, grpc_closure *closure) { + policy->vtable->notify_on_state_change_locked(exec_ctx, policy, state, + closure); } -grpc_connectivity_state grpc_lb_policy_check_connectivity( +grpc_connectivity_state grpc_lb_policy_check_connectivity_locked( grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_error **connectivity_error) { - return policy->vtable->check_connectivity(exec_ctx, policy, - connectivity_error); + return policy->vtable->check_connectivity_locked(exec_ctx, policy, + connectivity_error); } diff --git a/src/core/ext/client_channel/lb_policy.h b/src/core/ext/client_channel/lb_policy.h index 120c641edc..3405709c2c 100644 --- a/src/core/ext/client_channel/lb_policy.h +++ b/src/core/ext/client_channel/lb_policy.h @@ -51,6 +51,8 @@ struct grpc_lb_policy { gpr_atm ref_pair; /* owned pointer to interested parties in load balancing decisions */ grpc_pollset_set *interested_parties; + /* combiner under which lb_policy actions take place */ + grpc_combiner *combiner; }; /** Extra arguments for an LB pick */ @@ -69,42 +71,44 @@ typedef struct grpc_lb_policy_pick_args { struct grpc_lb_policy_vtable { void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); - void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); + void (*shutdown_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); /** \see grpc_lb_policy_pick */ - int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, void **user_data, - grpc_closure *on_complete); + int (*pick_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + const grpc_lb_policy_pick_args *pick_args, + grpc_connected_subchannel **target, void **user_data, + grpc_closure *on_complete); /** \see grpc_lb_policy_cancel_pick */ - void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_connected_subchannel **target, grpc_error *error); + void (*cancel_pick_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + grpc_connected_subchannel **target, + grpc_error *error); /** \see grpc_lb_policy_cancel_picks */ - void (*cancel_picks)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq, grpc_error *error); + void (*cancel_picks_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + uint32_t initial_metadata_flags_mask, + uint32_t initial_metadata_flags_eq, + grpc_error *error); /** \see grpc_lb_policy_ping_one */ - void (*ping_one)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_closure *closure); + void (*ping_one_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + grpc_closure *closure); /** Try to enter a READY connectivity state */ - void (*exit_idle)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); + void (*exit_idle_locked)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); /** check the current connectivity of the lb_policy */ - grpc_connectivity_state (*check_connectivity)( + grpc_connectivity_state (*check_connectivity_locked)( grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_error **connectivity_error); /** call notify when the connectivity state of a channel changes from *state. Updates *state with the new state of the policy. Calling with a NULL \a state cancels the subscription. */ - void (*notify_on_state_change)(grpc_exec_ctx *exec_ctx, - grpc_lb_policy *policy, - grpc_connectivity_state *state, - grpc_closure *closure); + void (*notify_on_state_change_locked)(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *policy, + grpc_connectivity_state *state, + grpc_closure *closure); }; /*#define GRPC_LB_POLICY_REFCOUNT_DEBUG*/ @@ -144,7 +148,8 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); /** called by concrete implementations to initialize the base struct */ void grpc_lb_policy_init(grpc_lb_policy *policy, - const grpc_lb_policy_vtable *vtable); + const grpc_lb_policy_vtable *vtable, + grpc_combiner *combiner); /** Finds an appropriate subchannel for a call, based on \a pick_args. @@ -159,43 +164,45 @@ void grpc_lb_policy_init(grpc_lb_policy *policy, Any IO should be done under the \a interested_parties \a grpc_pollset_set in the \a grpc_lb_policy struct. */ -int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, void **user_data, - grpc_closure *on_complete); +int grpc_lb_policy_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + const grpc_lb_policy_pick_args *pick_args, + grpc_connected_subchannel **target, + void **user_data, grpc_closure *on_complete); /** Perform a connected subchannel ping (see \a grpc_connected_subchannel_ping) against one of the connected subchannels managed by \a policy. */ -void grpc_lb_policy_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_closure *closure); +void grpc_lb_policy_ping_one_locked(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *policy, + grpc_closure *closure); /** Cancel picks for \a target. The \a on_complete callback of the pending picks will be invoked with \a *target set to NULL. */ -void grpc_lb_policy_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, - grpc_connected_subchannel **target, - grpc_error *error); +void grpc_lb_policy_cancel_pick_locked(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *policy, + grpc_connected_subchannel **target, + grpc_error *error); /** Cancel all pending picks for which their \a initial_metadata_flags (as given in the call to \a grpc_lb_policy_pick) matches \a initial_metadata_flags_eq when AND'd with \a initial_metadata_flags_mask */ -void grpc_lb_policy_cancel_picks(grpc_exec_ctx *exec_ctx, - grpc_lb_policy *policy, - uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq, - grpc_error *error); +void grpc_lb_policy_cancel_picks_locked(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *policy, + uint32_t initial_metadata_flags_mask, + uint32_t initial_metadata_flags_eq, + grpc_error *error); /** Try to enter a READY connectivity state */ -void grpc_lb_policy_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); +void grpc_lb_policy_exit_idle_locked(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *policy); /* Call notify when the connectivity state of a channel changes from \a *state. * Updates \a *state with the new state of the policy */ -void grpc_lb_policy_notify_on_state_change(grpc_exec_ctx *exec_ctx, - grpc_lb_policy *policy, - grpc_connectivity_state *state, - grpc_closure *closure); +void grpc_lb_policy_notify_on_state_change_locked( + grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, + grpc_connectivity_state *state, grpc_closure *closure); -grpc_connectivity_state grpc_lb_policy_check_connectivity( +grpc_connectivity_state grpc_lb_policy_check_connectivity_locked( grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, grpc_error **connectivity_error); diff --git a/src/core/ext/client_channel/lb_policy_factory.h b/src/core/ext/client_channel/lb_policy_factory.h index 9b8b03f982..27c12c0d73 100644 --- a/src/core/ext/client_channel/lb_policy_factory.h +++ b/src/core/ext/client_channel/lb_policy_factory.h @@ -107,6 +107,7 @@ grpc_arg grpc_lb_addresses_create_channel_arg( typedef struct grpc_lb_policy_args { grpc_client_channel_factory *client_channel_factory; grpc_channel_args *args; + grpc_combiner *combiner; } grpc_lb_policy_args; struct grpc_lb_policy_factory_vtable { diff --git a/src/core/ext/client_channel/parse_address.c b/src/core/ext/client_channel/parse_address.c index b1d55ad0f5..cd1b2cd80c 100644 --- a/src/core/ext/client_channel/parse_address.c +++ b/src/core/ext/client_channel/parse_address.c @@ -44,16 +44,18 @@ #include <grpc/support/host_port.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> +#include "src/core/lib/support/string.h" #ifdef GRPC_HAVE_UNIX_SOCKET int parse_unix(grpc_uri *uri, grpc_resolved_address *resolved_addr) { struct sockaddr_un *un = (struct sockaddr_un *)resolved_addr->addr; - + const size_t maxlen = sizeof(un->sun_path); + const size_t path_len = strnlen(uri->path, maxlen); + if (path_len == maxlen) return 0; un->sun_family = AF_UNIX; strcpy(un->sun_path, uri->path); - resolved_addr->len = strlen(un->sun_path) + sizeof(un->sun_family) + 1; - + resolved_addr->len = sizeof(*un); return 1; } @@ -119,9 +121,33 @@ int parse_ipv6(grpc_uri *uri, grpc_resolved_address *resolved_addr) { memset(in6, 0, sizeof(*in6)); resolved_addr->len = sizeof(*in6); in6->sin6_family = AF_INET6; - if (inet_pton(AF_INET6, host, &in6->sin6_addr) == 0) { - gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host); - goto done; + + /* Handle the RFC6874 syntax for IPv6 zone identifiers. */ + char *host_end = (char *)gpr_memrchr(host, '%', strlen(host)); + if (host_end != NULL) { + GPR_ASSERT(host_end >= host); + char host_without_scope[INET6_ADDRSTRLEN]; + size_t host_without_scope_len = (size_t)(host_end - host); + uint32_t sin6_scope_id = 0; + strncpy(host_without_scope, host, host_without_scope_len); + host_without_scope[host_without_scope_len] = '\0'; + if (inet_pton(AF_INET6, host_without_scope, &in6->sin6_addr) == 0) { + gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host_without_scope); + goto done; + } + if (gpr_parse_bytes_to_uint32(host_end + 1, + strlen(host) - host_without_scope_len - 1, + &sin6_scope_id) == 0) { + gpr_log(GPR_ERROR, "invalid ipv6 scope id: '%s'", host_end + 1); + goto done; + } + // Handle "sin6_scope_id" being type "u_long". See grpc issue ##10027. + in6->sin6_scope_id = sin6_scope_id; + } else { + if (inet_pton(AF_INET6, host, &in6->sin6_addr) == 0) { + gpr_log(GPR_ERROR, "invalid ipv6 address: '%s'", host); + goto done; + } } if (port != NULL) { diff --git a/src/core/ext/client_channel/proxy_mapper_registry.c b/src/core/ext/client_channel/proxy_mapper_registry.c index 2c44b9d490..0935ddbdbd 100644 --- a/src/core/ext/client_channel/proxy_mapper_registry.c +++ b/src/core/ext/client_channel/proxy_mapper_registry.c @@ -94,6 +94,14 @@ static void grpc_proxy_mapper_list_destroy(grpc_proxy_mapper_list* list) { grpc_proxy_mapper_destroy(list->list[i]); } gpr_free(list->list); + // Clean up in case we re-initialze later. + // TODO(ctiller): This should ideally live in + // grpc_proxy_mapper_registry_init(). However, if we did this there, + // then we would do it AFTER we start registering proxy mappers from + // third-party plugins, so they'd never show up (and would leak memory). + // We probably need some sort of dependency system for plugins to fix + // this. + memset(list, 0, sizeof(*list)); } // @@ -102,9 +110,7 @@ static void grpc_proxy_mapper_list_destroy(grpc_proxy_mapper_list* list) { static grpc_proxy_mapper_list g_proxy_mapper_list; -void grpc_proxy_mapper_registry_init() { - memset(&g_proxy_mapper_list, 0, sizeof(g_proxy_mapper_list)); -} +void grpc_proxy_mapper_registry_init() {} void grpc_proxy_mapper_registry_shutdown() { grpc_proxy_mapper_list_destroy(&g_proxy_mapper_list); diff --git a/src/core/ext/client_channel/resolver.c b/src/core/ext/client_channel/resolver.c index 2ae4fe862e..b1a1faa6c9 100644 --- a/src/core/ext/client_channel/resolver.c +++ b/src/core/ext/client_channel/resolver.c @@ -32,10 +32,13 @@ */ #include "src/core/ext/client_channel/resolver.h" +#include "src/core/lib/iomgr/combiner.h" void grpc_resolver_init(grpc_resolver *resolver, - const grpc_resolver_vtable *vtable) { + const grpc_resolver_vtable *vtable, + grpc_combiner *combiner) { resolver->vtable = vtable; + resolver->combiner = GRPC_COMBINER_REF(combiner, "resolver"); gpr_ref_init(&resolver->refs, 1); } @@ -62,20 +65,24 @@ void grpc_resolver_unref(grpc_resolver *resolver, void grpc_resolver_unref(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { #endif if (gpr_unref(&resolver->refs)) { + grpc_combiner *combiner = resolver->combiner; resolver->vtable->destroy(exec_ctx, resolver); + GRPC_COMBINER_UNREF(exec_ctx, combiner, "resolver"); } } -void grpc_resolver_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { - resolver->vtable->shutdown(exec_ctx, resolver); +void grpc_resolver_shutdown_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { + resolver->vtable->shutdown_locked(exec_ctx, resolver); } -void grpc_resolver_channel_saw_error(grpc_exec_ctx *exec_ctx, - grpc_resolver *resolver) { - resolver->vtable->channel_saw_error(exec_ctx, resolver); +void grpc_resolver_channel_saw_error_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { + resolver->vtable->channel_saw_error_locked(exec_ctx, resolver); } -void grpc_resolver_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, - grpc_channel_args **result, grpc_closure *on_complete) { - resolver->vtable->next(exec_ctx, resolver, result, on_complete); +void grpc_resolver_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, + grpc_channel_args **result, + grpc_closure *on_complete) { + resolver->vtable->next_locked(exec_ctx, resolver, result, on_complete); } diff --git a/src/core/ext/client_channel/resolver.h b/src/core/ext/client_channel/resolver.h index 96ece92b9d..bbba424ca5 100644 --- a/src/core/ext/client_channel/resolver.h +++ b/src/core/ext/client_channel/resolver.h @@ -44,14 +44,16 @@ typedef struct grpc_resolver_vtable grpc_resolver_vtable; struct grpc_resolver { const grpc_resolver_vtable *vtable; gpr_refcount refs; + grpc_combiner *combiner; }; struct grpc_resolver_vtable { void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver); - void (*shutdown)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver); - void (*channel_saw_error)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver); - void (*next)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, - grpc_channel_args **result, grpc_closure *on_complete); + void (*shutdown_locked)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver); + void (*channel_saw_error_locked)(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver); + void (*next_locked)(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, + grpc_channel_args **result, grpc_closure *on_complete); }; #ifdef GRPC_RESOLVER_REFCOUNT_DEBUG @@ -70,21 +72,30 @@ void grpc_resolver_unref(grpc_exec_ctx *exec_ctx, grpc_resolver *policy); #endif void grpc_resolver_init(grpc_resolver *resolver, - const grpc_resolver_vtable *vtable); + const grpc_resolver_vtable *vtable, + grpc_combiner *combiner); -void grpc_resolver_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver); +void grpc_resolver_shutdown_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver); /** Notification that the channel has seen an error on some address. - Can be used as a hint that re-resolution is desirable soon. */ -void grpc_resolver_channel_saw_error(grpc_exec_ctx *exec_ctx, - grpc_resolver *resolver); + Can be used as a hint that re-resolution is desirable soon. + + Must be called from the combiner passed as a resolver_arg at construction + time.*/ +void grpc_resolver_channel_saw_error_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver); /** Get the next result from the resolver. Expected to set \a *result with new channel args and then schedule \a on_complete for execution. If resolution is fatally broken, set \a *result to NULL and - schedule \a on_complete. */ -void grpc_resolver_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, - grpc_channel_args **result, grpc_closure *on_complete); + schedule \a on_complete. + + Must be called from the combiner passed as a resolver_arg at construction + time.*/ +void grpc_resolver_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, + grpc_channel_args **result, + grpc_closure *on_complete); #endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_H */ diff --git a/src/core/ext/client_channel/resolver_factory.h b/src/core/ext/client_channel/resolver_factory.h index 3792ddca18..e3cd99ec5a 100644 --- a/src/core/ext/client_channel/resolver_factory.h +++ b/src/core/ext/client_channel/resolver_factory.h @@ -50,6 +50,7 @@ typedef struct grpc_resolver_args { grpc_uri *uri; const grpc_channel_args *args; grpc_pollset_set *pollset_set; + grpc_combiner *combiner; } grpc_resolver_args; struct grpc_resolver_factory_vtable { diff --git a/src/core/ext/client_channel/resolver_registry.c b/src/core/ext/client_channel/resolver_registry.c index 5110a7cad9..3c5a6fb3ff 100644 --- a/src/core/ext/client_channel/resolver_registry.c +++ b/src/core/ext/client_channel/resolver_registry.c @@ -108,22 +108,23 @@ static grpc_resolver_factory *lookup_factory_by_uri(grpc_uri *uri) { return lookup_factory(uri->scheme); } -static grpc_resolver_factory *resolve_factory(const char *target, +static grpc_resolver_factory *resolve_factory(grpc_exec_ctx *exec_ctx, + const char *target, grpc_uri **uri, char **canonical_target) { grpc_resolver_factory *factory = NULL; GPR_ASSERT(uri != NULL); - *uri = grpc_uri_parse(target, 1); + *uri = grpc_uri_parse(exec_ctx, target, 1); factory = lookup_factory_by_uri(*uri); if (factory == NULL) { grpc_uri_destroy(*uri); gpr_asprintf(canonical_target, "%s%s", g_default_resolver_prefix, target); - *uri = grpc_uri_parse(*canonical_target, 1); + *uri = grpc_uri_parse(exec_ctx, *canonical_target, 1); factory = lookup_factory_by_uri(*uri); if (factory == NULL) { - grpc_uri_destroy(grpc_uri_parse(target, 0)); - grpc_uri_destroy(grpc_uri_parse(*canonical_target, 0)); + grpc_uri_destroy(grpc_uri_parse(exec_ctx, target, 0)); + grpc_uri_destroy(grpc_uri_parse(exec_ctx, *canonical_target, 0)); gpr_log(GPR_ERROR, "don't know how to resolve '%s' or '%s'", target, *canonical_target); } @@ -133,17 +134,19 @@ static grpc_resolver_factory *resolve_factory(const char *target, grpc_resolver *grpc_resolver_create(grpc_exec_ctx *exec_ctx, const char *target, const grpc_channel_args *args, - grpc_pollset_set *pollset_set) { + grpc_pollset_set *pollset_set, + grpc_combiner *combiner) { grpc_uri *uri = NULL; char *canonical_target = NULL; grpc_resolver_factory *factory = - resolve_factory(target, &uri, &canonical_target); + resolve_factory(exec_ctx, target, &uri, &canonical_target); grpc_resolver *resolver; grpc_resolver_args resolver_args; memset(&resolver_args, 0, sizeof(resolver_args)); resolver_args.uri = uri; resolver_args.args = args; resolver_args.pollset_set = pollset_set; + resolver_args.combiner = combiner; resolver = grpc_resolver_factory_create_resolver(exec_ctx, factory, &resolver_args); grpc_uri_destroy(uri); @@ -151,21 +154,22 @@ grpc_resolver *grpc_resolver_create(grpc_exec_ctx *exec_ctx, const char *target, return resolver; } -char *grpc_get_default_authority(const char *target) { +char *grpc_get_default_authority(grpc_exec_ctx *exec_ctx, const char *target) { grpc_uri *uri = NULL; char *canonical_target = NULL; grpc_resolver_factory *factory = - resolve_factory(target, &uri, &canonical_target); + resolve_factory(exec_ctx, target, &uri, &canonical_target); char *authority = grpc_resolver_factory_get_default_authority(factory, uri); grpc_uri_destroy(uri); gpr_free(canonical_target); return authority; } -char *grpc_resolver_factory_add_default_prefix_if_needed(const char *target) { +char *grpc_resolver_factory_add_default_prefix_if_needed( + grpc_exec_ctx *exec_ctx, const char *target) { grpc_uri *uri = NULL; char *canonical_target = NULL; - resolve_factory(target, &uri, &canonical_target); + resolve_factory(exec_ctx, target, &uri, &canonical_target); grpc_uri_destroy(uri); return canonical_target == NULL ? gpr_strdup(target) : canonical_target; } diff --git a/src/core/ext/client_channel/resolver_registry.h b/src/core/ext/client_channel/resolver_registry.h index a4606463eb..1a3ebee25a 100644 --- a/src/core/ext/client_channel/resolver_registry.h +++ b/src/core/ext/client_channel/resolver_registry.h @@ -65,7 +65,8 @@ void grpc_register_resolver_type(grpc_resolver_factory *factory); should not be NULL. */ grpc_resolver *grpc_resolver_create(grpc_exec_ctx *exec_ctx, const char *target, const grpc_channel_args *args, - grpc_pollset_set *pollset_set); + grpc_pollset_set *pollset_set, + grpc_combiner *combiner); /** Find a resolver factory given a name and return an (owned-by-the-caller) * reference to it */ @@ -73,10 +74,11 @@ grpc_resolver_factory *grpc_resolver_factory_lookup(const char *name); /** Given a target, return a (freshly allocated with gpr_malloc) string representing the default authority to pass from a client. */ -char *grpc_get_default_authority(const char *target); +char *grpc_get_default_authority(grpc_exec_ctx *exec_ctx, const char *target); /** Returns a newly allocated string containing \a target, adding the default prefix if needed. */ -char *grpc_resolver_factory_add_default_prefix_if_needed(const char *target); +char *grpc_resolver_factory_add_default_prefix_if_needed( + grpc_exec_ctx *exec_ctx, const char *target); #endif /* GRPC_CORE_EXT_CLIENT_CHANNEL_RESOLVER_REGISTRY_H */ diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index 15021a240a..e886fbc0cc 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -144,6 +144,7 @@ struct grpc_subchannel { struct grpc_subchannel_call { grpc_connected_subchannel *connection; + grpc_closure *schedule_closure_after_destroy; }; #define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1)) @@ -212,7 +213,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_channel_args_destroy(exec_ctx, c->args); grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker); grpc_connector_unref(exec_ctx, c->connector); - grpc_pollset_set_destroy(c->pollset_set); + grpc_pollset_set_destroy(exec_ctx, c->pollset_set); grpc_subchannel_key_destroy(exec_ctx, c->key); gpr_free(c); } @@ -311,8 +312,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, return c; } - c = gpr_malloc(sizeof(*c)); - memset(c, 0, sizeof(*c)); + c = gpr_zalloc(sizeof(*c)); c->key = key; gpr_atm_no_barrier_store(&c->ref_pair, 1 << INTERNAL_REF_BITS); c->connector = connector; @@ -327,7 +327,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, } c->pollset_set = grpc_pollset_set_create(); grpc_resolved_address *addr = gpr_malloc(sizeof(*addr)); - grpc_get_subchannel_address_arg(args->args, addr); + grpc_get_subchannel_address_arg(exec_ctx, args->args, addr); grpc_resolved_address *new_address = NULL; grpc_channel_args *new_args = NULL; if (grpc_proxy_mappers_map_address(exec_ctx, addr, args->args, &new_address, @@ -335,17 +335,15 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, GPR_ASSERT(new_address != NULL); gpr_free(addr); addr = new_address; - if (new_args != NULL) c->args = new_args; - } - if (c->args == NULL) { - static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS}; - grpc_arg new_arg = grpc_create_subchannel_address_arg(addr); - c->args = grpc_channel_args_copy_and_add_and_remove( - args->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &new_arg, - 1); - gpr_free(new_arg.value.string); } + static const char *keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS}; + grpc_arg new_arg = grpc_create_subchannel_address_arg(addr); gpr_free(addr); + c->args = grpc_channel_args_copy_and_add_and_remove( + new_args != NULL ? new_args : args->args, keys_to_remove, + GPR_ARRAY_SIZE(keys_to_remove), &new_arg, 1); + gpr_free(new_arg.value.string); + if (new_args != NULL) grpc_channel_args_destroy(exec_ctx, new_args); c->root_external_state_watcher.next = c->root_external_state_watcher.prev = &c->root_external_state_watcher; grpc_closure_init(&c->connected, subchannel_connected, c, @@ -412,7 +410,7 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c, grpc_error **error) { grpc_connectivity_state state; gpr_mu_lock(&c->mu); - state = grpc_connectivity_state_check(&c->state_tracker, error); + state = grpc_connectivity_state_get(&c->state_tracker, error); gpr_mu_unlock(&c->mu); return state; } @@ -431,7 +429,7 @@ static void on_external_state_watcher_done(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_unlock(&w->subchannel->mu); GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, w->subchannel, "external_state_watcher"); gpr_free(w); - follow_up->cb(exec_ctx, follow_up->cb_arg, error); + grpc_closure_run(exec_ctx, follow_up, GRPC_ERROR_REF(error)); } static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { @@ -713,13 +711,22 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call, grpc_error *error) { grpc_subchannel_call *c = call; + GPR_ASSERT(c->schedule_closure_after_destroy != NULL); GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0); grpc_connected_subchannel *connection = c->connection; - grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), NULL, c); + grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), NULL, + c->schedule_closure_after_destroy); GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, connection, "subchannel_call"); GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0); } +void grpc_subchannel_call_set_cleanup_closure(grpc_subchannel_call *call, + grpc_closure *closure) { + GPR_ASSERT(call->schedule_closure_after_destroy == NULL); + GPR_ASSERT(closure != NULL); + call->schedule_closure_after_destroy = closure; +} + void grpc_subchannel_call_ref( grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { GRPC_CALL_STACK_REF(SUBCHANNEL_CALL_TO_CALL_STACK(c), REF_REASON); @@ -755,15 +762,22 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel( grpc_error *grpc_connected_subchannel_create_call( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con, - grpc_polling_entity *pollent, grpc_slice path, gpr_timespec start_time, - gpr_timespec deadline, grpc_subchannel_call **call) { + const grpc_connected_subchannel_call_args *args, + grpc_subchannel_call **call) { grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con); - *call = gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size); + *call = gpr_arena_alloc( + args->arena, sizeof(grpc_subchannel_call) + chanstk->call_stack_size); grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(*call); (*call)->connection = con; // Ref is added below. - grpc_error *error = - grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, *call, - NULL, NULL, path, start_time, deadline, callstk); + const grpc_call_element_args call_args = {.call_stack = callstk, + .server_transport_data = NULL, + .context = NULL, + .path = args->path, + .start_time = args->start_time, + .deadline = args->deadline, + .arena = args->arena}; + grpc_error *error = grpc_call_stack_init( + exec_ctx, chanstk, 1, subchannel_call_destroy, *call, &call_args); if (error != GRPC_ERROR_NONE) { const char *error_string = grpc_error_string(error); gpr_log(GPR_ERROR, "error: %s", error_string); @@ -772,7 +786,7 @@ grpc_error *grpc_connected_subchannel_create_call( return error; } GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call"); - grpc_call_stack_set_pollset_or_pollset_set(exec_ctx, callstk, pollent); + grpc_call_stack_set_pollset_or_pollset_set(exec_ctx, callstk, args->pollent); return GRPC_ERROR_NONE; } @@ -781,9 +795,9 @@ grpc_call_stack *grpc_subchannel_call_get_call_stack( return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call); } -static void grpc_uri_to_sockaddr(const char *uri_str, +static void grpc_uri_to_sockaddr(grpc_exec_ctx *exec_ctx, const char *uri_str, grpc_resolved_address *addr) { - grpc_uri *uri = grpc_uri_parse(uri_str, 0 /* suppress_errors */); + grpc_uri *uri = grpc_uri_parse(exec_ctx, uri_str, 0 /* suppress_errors */); GPR_ASSERT(uri != NULL); if (strcmp(uri->scheme, "ipv4") == 0) { GPR_ASSERT(parse_ipv4(uri, addr)); @@ -795,12 +809,13 @@ static void grpc_uri_to_sockaddr(const char *uri_str, grpc_uri_destroy(uri); } -void grpc_get_subchannel_address_arg(const grpc_channel_args *args, +void grpc_get_subchannel_address_arg(grpc_exec_ctx *exec_ctx, + const grpc_channel_args *args, grpc_resolved_address *addr) { const char *addr_uri_str = grpc_get_subchannel_address_uri_arg(args); memset(addr, 0, sizeof(*addr)); if (*addr_uri_str != '\0') { - grpc_uri_to_sockaddr(addr_uri_str, addr); + grpc_uri_to_sockaddr(exec_ctx, addr_uri_str, addr); } } diff --git a/src/core/ext/client_channel/subchannel.h b/src/core/ext/client_channel/subchannel.h index 26ce954487..3e64a2507c 100644 --- a/src/core/ext/client_channel/subchannel.h +++ b/src/core/ext/client_channel/subchannel.h @@ -37,6 +37,7 @@ #include "src/core/ext/client_channel/connector.h" #include "src/core/lib/channel/channel_stack.h" #include "src/core/lib/iomgr/polling_entity.h" +#include "src/core/lib/support/arena.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/metadata.h" @@ -112,10 +113,18 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx, GRPC_SUBCHANNEL_REF_EXTRA_ARGS); /** construct a subchannel call */ +typedef struct { + grpc_polling_entity *pollent; + grpc_slice path; + gpr_timespec start_time; + gpr_timespec deadline; + gpr_arena *arena; +} grpc_connected_subchannel_call_args; + grpc_error *grpc_connected_subchannel_create_call( grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel, - grpc_polling_entity *pollent, grpc_slice path, gpr_timespec start_time, - gpr_timespec deadline, grpc_subchannel_call **subchannel_call); + const grpc_connected_subchannel_call_args *args, + grpc_subchannel_call **subchannel_call); /** process a transport level op */ void grpc_connected_subchannel_process_transport_op( @@ -154,6 +163,11 @@ void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx, char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx, grpc_subchannel_call *subchannel_call); +/** Must be called once per call. Sets the 'then_schedule_closure' argument for + call stack destruction. */ +void grpc_subchannel_call_set_cleanup_closure( + grpc_subchannel_call *subchannel_call, grpc_closure *closure); + grpc_call_stack *grpc_subchannel_call_get_call_stack( grpc_subchannel_call *subchannel_call); @@ -175,7 +189,8 @@ grpc_subchannel *grpc_subchannel_create(grpc_exec_ctx *exec_ctx, const grpc_subchannel_args *args); /// Sets \a addr from \a args. -void grpc_get_subchannel_address_arg(const grpc_channel_args *args, +void grpc_get_subchannel_address_arg(grpc_exec_ctx *exec_ctx, + const grpc_channel_args *args, grpc_resolved_address *addr); /// Returns the URI string for the address to connect to. diff --git a/src/core/ext/client_channel/uri_parser.c b/src/core/ext/client_channel/uri_parser.c index f8c946b275..d385db0801 100644 --- a/src/core/ext/client_channel/uri_parser.c +++ b/src/core/ext/client_channel/uri_parser.c @@ -35,13 +35,15 @@ #include <string.h> -#include <grpc/slice.h> #include <grpc/slice_buffer.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/port_platform.h> #include <grpc/support/string_util.h> +#include "src/core/lib/slice/percent_encoding.h" +#include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/slice/slice_string_helpers.h" #include "src/core/lib/support/string.h" /** a size_t default value... maps to all 1's */ @@ -68,11 +70,16 @@ static grpc_uri *bad_uri(const char *uri_text, size_t pos, const char *section, return NULL; } -/** Returns a copy of \a src[begin, end) */ -static char *copy_component(const char *src, size_t begin, size_t end) { - char *out = gpr_malloc(end - begin + 1); - memcpy(out, src + begin, end - begin); - out[end - begin] = 0; +/** Returns a copy of percent decoded \a src[begin, end) */ +static char *decode_and_copy_component(grpc_exec_ctx *exec_ctx, const char *src, + size_t begin, size_t end) { + grpc_slice component = + grpc_slice_from_copied_buffer(src + begin, end - begin); + grpc_slice decoded_component = + grpc_permissive_percent_decode_slice(component); + char *out = grpc_dump_slice(decoded_component, GPR_DUMP_ASCII); + grpc_slice_unref_internal(exec_ctx, component); + grpc_slice_unref_internal(exec_ctx, decoded_component); return out; } @@ -175,7 +182,8 @@ static void parse_query_parts(grpc_uri *uri) { } } -grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors) { +grpc_uri *grpc_uri_parse(grpc_exec_ctx *exec_ctx, const char *uri_text, + int suppress_errors) { grpc_uri *uri; size_t scheme_begin = 0; size_t scheme_end = NOT_SET; @@ -262,13 +270,17 @@ grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors) { fragment_end = i; } - uri = gpr_malloc(sizeof(*uri)); - memset(uri, 0, sizeof(*uri)); - uri->scheme = copy_component(uri_text, scheme_begin, scheme_end); - uri->authority = copy_component(uri_text, authority_begin, authority_end); - uri->path = copy_component(uri_text, path_begin, path_end); - uri->query = copy_component(uri_text, query_begin, query_end); - uri->fragment = copy_component(uri_text, fragment_begin, fragment_end); + uri = gpr_zalloc(sizeof(*uri)); + uri->scheme = + decode_and_copy_component(exec_ctx, uri_text, scheme_begin, scheme_end); + uri->authority = decode_and_copy_component(exec_ctx, uri_text, + authority_begin, authority_end); + uri->path = + decode_and_copy_component(exec_ctx, uri_text, path_begin, path_end); + uri->query = + decode_and_copy_component(exec_ctx, uri_text, query_begin, query_end); + uri->fragment = decode_and_copy_component(exec_ctx, uri_text, fragment_begin, + fragment_end); parse_query_parts(uri); return uri; diff --git a/src/core/ext/client_channel/uri_parser.h b/src/core/ext/client_channel/uri_parser.h index 5fe0e8f35e..efd4302c1c 100644 --- a/src/core/ext/client_channel/uri_parser.h +++ b/src/core/ext/client_channel/uri_parser.h @@ -35,6 +35,7 @@ #define GRPC_CORE_EXT_CLIENT_CHANNEL_URI_PARSER_H #include <stddef.h> +#include "src/core/lib/iomgr/exec_ctx.h" typedef struct { char *scheme; @@ -51,7 +52,8 @@ typedef struct { } grpc_uri; /** parse a uri, return NULL on failure */ -grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors); +grpc_uri *grpc_uri_parse(grpc_exec_ctx *exec_ctx, const char *uri_text, + int suppress_errors); /** return the part of a query string after the '=' in "?key=xxx&...", or NULL * if key is not present */ diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index ab62e5ed6a..d612591f2e 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -115,6 +115,7 @@ #include "src/core/ext/lb_policy/grpclb/grpclb_channel.h" #include "src/core/ext/lb_policy/grpclb/load_balancer_api.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/iomgr/timer.h" @@ -237,9 +238,7 @@ static void add_pending_pick(pending_pick **root, const grpc_lb_policy_pick_args *pick_args, grpc_connected_subchannel **target, grpc_closure *on_complete) { - pending_pick *pp = gpr_malloc(sizeof(*pp)); - memset(pp, 0, sizeof(pending_pick)); - memset(&pp->wrapped_on_complete_arg, 0, sizeof(wrapped_rr_closure_arg)); + pending_pick *pp = gpr_zalloc(sizeof(*pp)); pp->next = *root; pp->pick_args = *pick_args; pp->target = target; @@ -264,9 +263,7 @@ typedef struct pending_ping { } pending_ping; static void add_pending_ping(pending_ping **root, grpc_closure *notify) { - pending_ping *pping = gpr_malloc(sizeof(*pping)); - memset(pping, 0, sizeof(pending_ping)); - memset(&pping->wrapped_notify_arg, 0, sizeof(wrapped_rr_closure_arg)); + pending_ping *pping = gpr_zalloc(sizeof(*pping)); pping->wrapped_notify_arg.wrapped_closure = notify; pping->wrapped_notify_arg.free_when_done = pping; pping->next = *root; @@ -285,9 +282,6 @@ typedef struct glb_lb_policy { /** base policy: must be first */ grpc_lb_policy base; - /** mutex protecting remaining members */ - gpr_mu mu; - /** who the client is trying to communicate with */ const char *server_name; grpc_client_channel_factory *cc_factory; @@ -492,9 +486,8 @@ static grpc_lb_addresses *process_serverlist_locked( static bool update_lb_connectivity_status_locked( grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, grpc_connectivity_state new_rr_state, grpc_error *new_rr_state_error) { - grpc_error *curr_state_error; - const grpc_connectivity_state curr_glb_state = grpc_connectivity_state_check( - &glb_policy->state_tracker, &curr_state_error); + const grpc_connectivity_state curr_glb_state = + grpc_connectivity_state_check(&glb_policy->state_tracker); /* The new connectivity status is a function of the previous one and the new * input coming from the status of the RR policy. @@ -558,9 +551,9 @@ static bool pick_from_internal_rr_locked( const grpc_lb_policy_pick_args *pick_args, grpc_connected_subchannel **target, wrapped_rr_closure_arg *wc_arg) { GPR_ASSERT(rr_policy != NULL); - const bool pick_done = - grpc_lb_policy_pick(exec_ctx, rr_policy, pick_args, target, - (void **)&wc_arg->lb_token, &wc_arg->wrapper_closure); + const bool pick_done = grpc_lb_policy_pick_locked( + exec_ctx, rr_policy, pick_args, target, (void **)&wc_arg->lb_token, + &wc_arg->wrapper_closure); if (pick_done) { /* synchronous grpc_lb_policy_pick call. Unref the RR policy. */ if (grpc_lb_glb_trace) { @@ -591,6 +584,7 @@ static grpc_lb_policy *create_rr_locked( grpc_lb_policy_args args; memset(&args, 0, sizeof(args)); args.client_channel_factory = glb_policy->cc_factory; + args.combiner = glb_policy->base.combiner; grpc_lb_addresses *addresses = process_serverlist_locked(exec_ctx, serverlist); @@ -609,8 +603,8 @@ static grpc_lb_policy *create_rr_locked( return rr; } -static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); +static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, + void *arg, grpc_error *error); /* glb_policy->rr_policy may be NULL (initial handover) */ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) { @@ -634,8 +628,8 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, grpc_error *new_rr_state_error = NULL; const grpc_connectivity_state new_rr_state = - grpc_lb_policy_check_connectivity(exec_ctx, new_rr_policy, - &new_rr_state_error); + grpc_lb_policy_check_connectivity_locked(exec_ctx, new_rr_policy, + &new_rr_state_error); /* Connectivity state is a function of the new RR policy just created */ const bool replace_old_rr = update_lb_connectivity_status_locked( exec_ctx, glb_policy, new_rr_state, new_rr_state_error); @@ -676,19 +670,19 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, /* Allocate the data for the tracking of the new RR policy's connectivity. * It'll be deallocated in glb_rr_connectivity_changed() */ rr_connectivity_data *rr_connectivity = - gpr_malloc(sizeof(rr_connectivity_data)); - memset(rr_connectivity, 0, sizeof(rr_connectivity_data)); - grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed, - rr_connectivity, grpc_schedule_on_exec_ctx); + gpr_zalloc(sizeof(rr_connectivity_data)); + grpc_closure_init(&rr_connectivity->on_change, + glb_rr_connectivity_changed_locked, rr_connectivity, + grpc_combiner_scheduler(glb_policy->base.combiner, false)); rr_connectivity->glb_policy = glb_policy; rr_connectivity->state = new_rr_state; /* Subscribe to changes to the connectivity of the new RR */ GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_cb"); - grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, - &rr_connectivity->state, - &rr_connectivity->on_change); - grpc_lb_policy_exit_idle(exec_ctx, glb_policy->rr_policy); + grpc_lb_policy_notify_on_state_change_locked(exec_ctx, glb_policy->rr_policy, + &rr_connectivity->state, + &rr_connectivity->on_change); + grpc_lb_policy_exit_idle_locked(exec_ctx, glb_policy->rr_policy); /* Update picks and pings in wait */ pending_pick *pp; @@ -714,17 +708,16 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, gpr_log(GPR_INFO, "Pending ping about to PING from 0x%" PRIxPTR "", (intptr_t)glb_policy->rr_policy); } - grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy, - &pping->wrapped_notify_arg.wrapper_closure); + grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy, + &pping->wrapped_notify_arg.wrapper_closure); } } -static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void glb_rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, + void *arg, grpc_error *error) { rr_connectivity_data *rr_connectivity = arg; glb_lb_policy *glb_policy = rr_connectivity->glb_policy; - gpr_mu_lock(&glb_policy->mu); const bool shutting_down = glb_policy->shutting_down; bool unref_needed = false; GRPC_ERROR_REF(error); @@ -741,11 +734,10 @@ static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, update_lb_connectivity_status_locked(exec_ctx, glb_policy, rr_connectivity->state, error); /* Resubscribe. Reuse the "rr_connectivity_cb" weak ref. */ - grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, - &rr_connectivity->state, - &rr_connectivity->on_change); + grpc_lb_policy_notify_on_state_change_locked( + exec_ctx, glb_policy->rr_policy, &rr_connectivity->state, + &rr_connectivity->on_change); } - gpr_mu_unlock(&glb_policy->mu); if (unref_needed) { GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "rr_connectivity_cb"); @@ -863,14 +855,13 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, } if (num_grpclb_addrs == 0) return NULL; - glb_lb_policy *glb_policy = gpr_malloc(sizeof(*glb_policy)); - memset(glb_policy, 0, sizeof(*glb_policy)); + glb_lb_policy *glb_policy = gpr_zalloc(sizeof(*glb_policy)); /* Get server name. */ arg = grpc_channel_args_find(args->args, GRPC_ARG_SERVER_URI); GPR_ASSERT(arg != NULL); GPR_ASSERT(arg->type == GRPC_ARG_STRING); - grpc_uri *uri = grpc_uri_parse(arg->value.string, true); + grpc_uri *uri = grpc_uri_parse(exec_ctx, arg->value.string, true); GPR_ASSERT(uri->path[0] != '\0'); glb_policy->server_name = gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path); @@ -900,8 +891,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, gpr_free(glb_policy); return NULL; } - grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable); - gpr_mu_init(&glb_policy->mu); + grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable, args->combiner); grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE, "grpclb"); return &glb_policy->base; @@ -919,13 +909,11 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { if (glb_policy->serverlist != NULL) { grpc_grpclb_destroy_serverlist(glb_policy->serverlist); } - gpr_mu_destroy(&glb_policy->mu); gpr_free(glb_policy); } -static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { +static void glb_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; - gpr_mu_lock(&glb_policy->mu); glb_policy->shutting_down = true; pending_pick *pp = glb_policy->pending_picks; @@ -942,7 +930,6 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { * while holding glb_policy->mu: lb_on_server_status_received, invoked due to * the cancel, needs to acquire that same lock */ grpc_call *lb_call = glb_policy->lb_call; - gpr_mu_unlock(&glb_policy->mu); /* glb_policy->lb_call and this local lb_call must be consistent at this point * because glb_policy->lb_call is only assigned in lb_call_init_locked as part @@ -968,11 +955,10 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } } -static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_connected_subchannel **target, - grpc_error *error) { +static void glb_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_connected_subchannel **target, + grpc_error *error) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; - gpr_mu_lock(&glb_policy->mu); pending_pick *pp = glb_policy->pending_picks; glb_policy->pending_picks = NULL; while (pp != NULL) { @@ -988,16 +974,15 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } pp = next; } - gpr_mu_unlock(&glb_policy->mu); GRPC_ERROR_UNREF(error); } -static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq, - grpc_error *error) { +static void glb_cancel_picks_locked(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *pol, + uint32_t initial_metadata_flags_mask, + uint32_t initial_metadata_flags_eq, + grpc_error *error) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; - gpr_mu_lock(&glb_policy->mu); pending_pick *pp = glb_policy->pending_picks; glb_policy->pending_picks = NULL; while (pp != NULL) { @@ -1013,7 +998,6 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } pp = next; } - gpr_mu_unlock(&glb_policy->mu); GRPC_ERROR_UNREF(error); } @@ -1026,19 +1010,17 @@ static void start_picking_locked(grpc_exec_ctx *exec_ctx, query_for_backends_locked(exec_ctx, glb_policy); } -static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { +static void glb_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; - gpr_mu_lock(&glb_policy->mu); if (!glb_policy->started_picking) { start_picking_locked(exec_ctx, glb_policy); } - gpr_mu_unlock(&glb_policy->mu); } -static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, void **user_data, - grpc_closure *on_complete) { +static int glb_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + const grpc_lb_policy_pick_args *pick_args, + grpc_connected_subchannel **target, void **user_data, + grpc_closure *on_complete) { if (pick_args->lb_token_mdelem_storage == NULL) { *target = NULL; grpc_closure_sched( @@ -1049,7 +1031,6 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } glb_lb_policy *glb_policy = (glb_lb_policy *)pol; - gpr_mu_lock(&glb_policy->mu); glb_policy->deadline = pick_args->deadline; bool pick_done; @@ -1060,8 +1041,7 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick"); - wrapped_rr_closure_arg *wc_arg = gpr_malloc(sizeof(wrapped_rr_closure_arg)); - memset(wc_arg, 0, sizeof(wrapped_rr_closure_arg)); + wrapped_rr_closure_arg *wc_arg = gpr_zalloc(sizeof(wrapped_rr_closure_arg)); grpc_closure_init(&wc_arg->wrapper_closure, wrapped_rr_closure, wc_arg, grpc_schedule_on_exec_ctx); @@ -1088,53 +1068,43 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } pick_done = false; } - gpr_mu_unlock(&glb_policy->mu); return pick_done; } -static grpc_connectivity_state glb_check_connectivity( +static grpc_connectivity_state glb_check_connectivity_locked( grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_error **connectivity_error) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; - grpc_connectivity_state st; - gpr_mu_lock(&glb_policy->mu); - st = grpc_connectivity_state_check(&glb_policy->state_tracker, + return grpc_connectivity_state_get(&glb_policy->state_tracker, connectivity_error); - gpr_mu_unlock(&glb_policy->mu); - return st; } -static void glb_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_closure *closure) { +static void glb_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_closure *closure) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; - gpr_mu_lock(&glb_policy->mu); if (glb_policy->rr_policy) { - grpc_lb_policy_ping_one(exec_ctx, glb_policy->rr_policy, closure); + grpc_lb_policy_ping_one_locked(exec_ctx, glb_policy->rr_policy, closure); } else { add_pending_ping(&glb_policy->pending_pings, closure); if (!glb_policy->started_picking) { start_picking_locked(exec_ctx, glb_policy); } } - gpr_mu_unlock(&glb_policy->mu); } -static void glb_notify_on_state_change(grpc_exec_ctx *exec_ctx, - grpc_lb_policy *pol, - grpc_connectivity_state *current, - grpc_closure *notify) { +static void glb_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *pol, + grpc_connectivity_state *current, + grpc_closure *notify) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; - gpr_mu_lock(&glb_policy->mu); grpc_connectivity_state_notify_on_state_change( exec_ctx, &glb_policy->state_tracker, current, notify); - - gpr_mu_unlock(&glb_policy->mu); } -static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); -static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); +static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, + void *arg, grpc_error *error); +static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) { GPR_ASSERT(glb_policy->server_name != NULL); @@ -1163,11 +1133,11 @@ static void lb_call_init_locked(grpc_exec_ctx *exec_ctx, grpc_grpclb_request_destroy(request); grpc_closure_init(&glb_policy->lb_on_server_status_received, - lb_on_server_status_received, glb_policy, - grpc_schedule_on_exec_ctx); + lb_on_server_status_received_locked, glb_policy, + grpc_combiner_scheduler(glb_policy->base.combiner, false)); grpc_closure_init(&glb_policy->lb_on_response_received, - lb_on_response_received, glb_policy, - grpc_schedule_on_exec_ctx); + lb_on_response_received_locked, glb_policy, + grpc_combiner_scheduler(glb_policy->base.combiner, false)); gpr_backoff_init(&glb_policy->lb_call_backoff_state, GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS, @@ -1262,14 +1232,13 @@ static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, GPR_ASSERT(GRPC_CALL_OK == call_error); } -static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void lb_on_response_received_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { glb_lb_policy *glb_policy = arg; grpc_op ops[2]; memset(ops, 0, sizeof(ops)); grpc_op *op = ops; - gpr_mu_lock(&glb_policy->mu); if (glb_policy->lb_response_payload != NULL) { gpr_backoff_reset(&glb_policy->lb_call_backoff_state); /* Received data from the LB server. Look inside @@ -1343,20 +1312,17 @@ static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, &glb_policy->lb_on_response_received); /* loop */ GPR_ASSERT(GRPC_CALL_OK == call_error); } - gpr_mu_unlock(&glb_policy->mu); } else { /* empty payload: call cancelled. */ /* dispose of the "lb_on_response_received" weak ref taken in * query_for_backends_locked() and reused in every reception loop */ - gpr_mu_unlock(&glb_policy->mu); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "lb_on_response_received_empty_payload"); } } -static void lb_call_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void lb_call_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { glb_lb_policy *glb_policy = arg; - gpr_mu_lock(&glb_policy->mu); if (!glb_policy->shutting_down) { if (grpc_lb_glb_trace) { @@ -1366,15 +1332,13 @@ static void lb_call_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg, GPR_ASSERT(glb_policy->lb_call == NULL); query_for_backends_locked(exec_ctx, glb_policy); } - gpr_mu_unlock(&glb_policy->mu); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "grpclb_on_retry_timer"); } -static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx, + void *arg, grpc_error *error) { glb_lb_policy *glb_policy = arg; - gpr_mu_lock(&glb_policy->mu); GPR_ASSERT(glb_policy->lb_call != NULL); @@ -1409,21 +1373,27 @@ static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg, } } GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer"); - grpc_closure_init(&glb_policy->lb_on_call_retry, lb_call_on_retry_timer, - glb_policy, grpc_schedule_on_exec_ctx); + grpc_closure_init( + &glb_policy->lb_on_call_retry, lb_call_on_retry_timer_locked, + glb_policy, grpc_combiner_scheduler(glb_policy->base.combiner, false)); grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try, &glb_policy->lb_on_call_retry, now); } - gpr_mu_unlock(&glb_policy->mu); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, "lb_on_server_status_received"); } /* Code wiring the policy with the rest of the core */ static const grpc_lb_policy_vtable glb_lb_policy_vtable = { - glb_destroy, glb_shutdown, glb_pick, - glb_cancel_pick, glb_cancel_picks, glb_ping_one, - glb_exit_idle, glb_check_connectivity, glb_notify_on_state_change}; + glb_destroy, + glb_shutdown_locked, + glb_pick_locked, + glb_cancel_pick_locked, + glb_cancel_picks_locked, + glb_ping_one_locked, + glb_exit_idle_locked, + glb_check_connectivity_locked, + glb_notify_on_state_change_locked}; static void glb_factory_ref(grpc_lb_policy_factory *factory) {} diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/lb_policy/grpclb/load_balancer_api.c index 837e9c1113..3c4604c402 100644 --- a/src/core/ext/lb_policy/grpclb/load_balancer_api.c +++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.c @@ -62,8 +62,7 @@ static bool decode_serverlist(pb_istream_t *stream, const pb_field_t *field, } dec_arg->num_servers++; } else { /* second pass. Actually decode. */ - grpc_grpclb_server *server = gpr_malloc(sizeof(grpc_grpclb_server)); - memset(server, 0, sizeof(grpc_grpclb_server)); + grpc_grpclb_server *server = gpr_zalloc(sizeof(grpc_grpclb_server)); GPR_ASSERT(dec_arg->num_servers > 0); if (dec_arg->decoding_idx == 0) { /* first iteration of second pass */ dec_arg->servers = @@ -160,8 +159,7 @@ grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist( return NULL; } - grpc_grpclb_serverlist *sl = gpr_malloc(sizeof(grpc_grpclb_serverlist)); - memset(sl, 0, sizeof(*sl)); + grpc_grpclb_serverlist *sl = gpr_zalloc(sizeof(grpc_grpclb_serverlist)); sl->num_servers = arg.num_servers; sl->servers = arg.servers; if (res.server_list.has_expiration_interval) { @@ -183,8 +181,7 @@ void grpc_grpclb_destroy_serverlist(grpc_grpclb_serverlist *serverlist) { grpc_grpclb_serverlist *grpc_grpclb_serverlist_copy( const grpc_grpclb_serverlist *sl) { - grpc_grpclb_serverlist *copy = gpr_malloc(sizeof(grpc_grpclb_serverlist)); - memset(copy, 0, sizeof(grpc_grpclb_serverlist)); + grpc_grpclb_serverlist *copy = gpr_zalloc(sizeof(grpc_grpclb_serverlist)); copy->num_servers = sl->num_servers; memcpy(©->expiration_interval, &sl->expiration_interval, sizeof(grpc_grpclb_duration)); diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c index 9f2aa461be..e2a66d16bd 100644 --- a/src/core/ext/lb_policy/pick_first/pick_first.c +++ b/src/core/ext/lb_policy/pick_first/pick_first.c @@ -38,6 +38,7 @@ #include "src/core/ext/client_channel/lb_policy_registry.h" #include "src/core/ext/client_channel/subchannel.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/transport/connectivity_state.h" @@ -57,11 +58,11 @@ typedef struct { grpc_closure connectivity_changed; - /** the selected channel (a grpc_connected_subchannel) */ - gpr_atm selected; + /** remaining members are protected by the combiner */ + + /** the selected channel */ + grpc_connected_subchannel *selected; - /** mutex protecting remaining members */ - gpr_mu mu; /** have we started picking? */ int started_picking; /** are we shut down? */ @@ -77,32 +78,24 @@ typedef struct { grpc_connectivity_state_tracker state_tracker; } pick_first_lb_policy; -#define GET_SELECTED(p) \ - ((grpc_connected_subchannel *)gpr_atm_acq_load(&(p)->selected)) - static void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - grpc_connected_subchannel *selected = GET_SELECTED(p); size_t i; GPR_ASSERT(p->pending_picks == NULL); for (i = 0; i < p->num_subchannels; i++) { GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first"); } - if (selected != NULL) { - GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, selected, "picked_first"); + if (p->selected != NULL) { + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first"); } grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); gpr_free(p->subchannels); - gpr_mu_destroy(&p->mu); gpr_free(p); } -static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { +static void pf_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; - grpc_connected_subchannel *selected; - gpr_mu_lock(&p->mu); - selected = GET_SELECTED(p); p->shutdown = 1; pp = p->pending_picks; p->pending_picks = NULL; @@ -110,15 +103,14 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_CREATE("Channel shutdown"), "shutdown"); /* cancel subscription */ - if (selected != NULL) { + if (p->selected != NULL) { grpc_connected_subchannel_notify_on_state_change( - exec_ctx, selected, NULL, NULL, &p->connectivity_changed); + exec_ctx, p->selected, NULL, NULL, &p->connectivity_changed); } else if (p->num_subchannels > 0) { grpc_subchannel_notify_on_state_change( exec_ctx, p->subchannels[p->checking_subchannel], NULL, NULL, &p->connectivity_changed); } - gpr_mu_unlock(&p->mu); while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; @@ -128,12 +120,11 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } } -static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_connected_subchannel **target, - grpc_error *error) { +static void pf_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_connected_subchannel **target, + grpc_error *error) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; - gpr_mu_lock(&p->mu); pp = p->pending_picks; p->pending_picks = NULL; while (pp != NULL) { @@ -150,17 +141,15 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } pp = next; } - gpr_mu_unlock(&p->mu); GRPC_ERROR_UNREF(error); } -static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq, - grpc_error *error) { +static void pf_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + uint32_t initial_metadata_flags_mask, + uint32_t initial_metadata_flags_eq, + grpc_error *error) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; - gpr_mu_lock(&p->mu); pp = p->pending_picks; p->pending_picks = NULL; while (pp != NULL) { @@ -177,7 +166,6 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } pp = next; } - gpr_mu_unlock(&p->mu); GRPC_ERROR_UNREF(error); } @@ -192,63 +180,48 @@ static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) { &p->connectivity_changed); } -static void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { +static void pf_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - gpr_mu_lock(&p->mu); if (!p->started_picking) { start_picking(exec_ctx, p); } - gpr_mu_unlock(&p->mu); } -static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, void **user_data, - grpc_closure *on_complete) { +static int pf_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + const grpc_lb_policy_pick_args *pick_args, + grpc_connected_subchannel **target, void **user_data, + grpc_closure *on_complete) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; /* Check atomically for a selected channel */ - grpc_connected_subchannel *selected = GET_SELECTED(p); - if (selected != NULL) { - *target = GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked"); + if (p->selected != NULL) { + *target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked"); return 1; } - /* No subchannel selected yet, so acquire lock and then attempt again */ - gpr_mu_lock(&p->mu); - selected = GET_SELECTED(p); - if (selected) { - gpr_mu_unlock(&p->mu); - *target = GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked"); - return 1; - } else { - if (!p->started_picking) { - start_picking(exec_ctx, p); - } - pp = gpr_malloc(sizeof(*pp)); - pp->next = p->pending_picks; - pp->target = target; - pp->initial_metadata_flags = pick_args->initial_metadata_flags; - pp->on_complete = on_complete; - p->pending_picks = pp; - gpr_mu_unlock(&p->mu); - return 0; + /* No subchannel selected yet, so try again */ + if (!p->started_picking) { + start_picking(exec_ctx, p); } + pp = gpr_malloc(sizeof(*pp)); + pp->next = p->pending_picks; + pp->target = target; + pp->initial_metadata_flags = pick_args->initial_metadata_flags; + pp->on_complete = on_complete; + p->pending_picks = pp; + return 0; } -static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - pick_first_lb_policy *p = arg; +static void destroy_subchannels_locked(grpc_exec_ctx *exec_ctx, + pick_first_lb_policy *p) { size_t i; size_t num_subchannels = p->num_subchannels; grpc_subchannel **subchannels; - gpr_mu_lock(&p->mu); subchannels = p->subchannels; p->num_subchannels = 0; p->subchannels = NULL; - gpr_mu_unlock(&p->mu); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "destroy_subchannels"); for (i = 0; i < num_subchannels; i++) { @@ -258,25 +231,19 @@ static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg, gpr_free(subchannels); } -static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void pf_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { pick_first_lb_policy *p = arg; grpc_subchannel *selected_subchannel; pending_pick *pp; - grpc_connected_subchannel *selected; GRPC_ERROR_REF(error); - gpr_mu_lock(&p->mu); - - selected = GET_SELECTED(p); - if (p->shutdown) { - gpr_mu_unlock(&p->mu); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); GRPC_ERROR_UNREF(error); return; - } else if (selected != NULL) { + } else if (p->selected != NULL) { if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { /* if the selected channel goes bad, we're done */ p->checking_connectivity = GRPC_CHANNEL_SHUTDOWN; @@ -286,7 +253,7 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, "selected_changed"); if (p->checking_connectivity != GRPC_CHANNEL_SHUTDOWN) { grpc_connected_subchannel_notify_on_state_change( - exec_ctx, selected, p->base.interested_parties, + exec_ctx, p->selected, p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); } else { GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "pick_first_connectivity"); @@ -301,26 +268,21 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "connecting_ready"); selected_subchannel = p->subchannels[p->checking_subchannel]; - selected = - grpc_subchannel_get_connected_subchannel(selected_subchannel); - GPR_ASSERT(selected != NULL); - GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked_first"); + p->selected = GRPC_CONNECTED_SUBCHANNEL_REF( + grpc_subchannel_get_connected_subchannel(selected_subchannel), + "picked_first"); /* drop the pick list: we are connected now */ GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels"); - gpr_atm_rel_store(&p->selected, (gpr_atm)selected); - grpc_closure_sched(exec_ctx, - grpc_closure_create(destroy_subchannels, p, - grpc_schedule_on_exec_ctx), - GRPC_ERROR_NONE); + destroy_subchannels_locked(exec_ctx, p); /* update any calls that were waiting for a pick */ while ((pp = p->pending_picks)) { p->pending_picks = pp->next; - *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked"); + *pp->target = GRPC_CONNECTED_SUBCHANNEL_REF(p->selected, "picked"); grpc_closure_sched(exec_ctx, pp->on_complete, GRPC_ERROR_NONE); gpr_free(pp); } grpc_connected_subchannel_notify_on_state_change( - exec_ctx, selected, p->base.interested_parties, + exec_ctx, p->selected, p->base.interested_parties, &p->checking_connectivity, &p->connectivity_changed); break; case GRPC_CHANNEL_TRANSIENT_FAILURE: @@ -387,48 +349,44 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, } } - gpr_mu_unlock(&p->mu); - GRPC_ERROR_UNREF(error); } -static grpc_connectivity_state pf_check_connectivity(grpc_exec_ctx *exec_ctx, - grpc_lb_policy *pol, - grpc_error **error) { +static grpc_connectivity_state pf_check_connectivity_locked( + grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_error **error) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - grpc_connectivity_state st; - gpr_mu_lock(&p->mu); - st = grpc_connectivity_state_check(&p->state_tracker, error); - gpr_mu_unlock(&p->mu); - return st; + return grpc_connectivity_state_get(&p->state_tracker, error); } -static void pf_notify_on_state_change(grpc_exec_ctx *exec_ctx, - grpc_lb_policy *pol, - grpc_connectivity_state *current, - grpc_closure *notify) { +static void pf_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *pol, + grpc_connectivity_state *current, + grpc_closure *notify) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - gpr_mu_lock(&p->mu); grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker, current, notify); - gpr_mu_unlock(&p->mu); } -static void pf_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_closure *closure) { +static void pf_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_closure *closure) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; - grpc_connected_subchannel *selected = GET_SELECTED(p); - if (selected) { - grpc_connected_subchannel_ping(exec_ctx, selected, closure); + if (p->selected) { + grpc_connected_subchannel_ping(exec_ctx, p->selected, closure); } else { grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE("Not connected")); } } static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { - pf_destroy, pf_shutdown, pf_pick, - pf_cancel_pick, pf_cancel_picks, pf_ping_one, - pf_exit_idle, pf_check_connectivity, pf_notify_on_state_change}; + pf_destroy, + pf_shutdown_locked, + pf_pick_locked, + pf_cancel_pick_locked, + pf_cancel_picks_locked, + pf_ping_one_locked, + pf_exit_idle_locked, + pf_check_connectivity_locked, + pf_notify_on_state_change_locked}; static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {} @@ -451,11 +409,9 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, } if (num_addrs == 0) return NULL; - pick_first_lb_policy *p = gpr_malloc(sizeof(*p)); - memset(p, 0, sizeof(*p)); + pick_first_lb_policy *p = gpr_zalloc(sizeof(*p)); - p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * num_addrs); - memset(p->subchannels, 0, sizeof(*p->subchannels) * num_addrs); + p->subchannels = gpr_zalloc(sizeof(grpc_subchannel *) * num_addrs); grpc_subchannel_args sc_args; size_t subchannel_idx = 0; for (size_t i = 0; i < addresses->num_addresses; i++) { @@ -489,10 +445,9 @@ static grpc_lb_policy *create_pick_first(grpc_exec_ctx *exec_ctx, } p->num_subchannels = subchannel_idx; - grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable); - grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed, p, - grpc_schedule_on_exec_ctx); - gpr_mu_init(&p->mu); + grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable, args->combiner); + grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed_locked, p, + grpc_combiner_scheduler(args->combiner, false)); return &p->base; } diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 3e060d189a..f2d1d46179 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -67,6 +67,7 @@ #include "src/core/ext/client_channel/subchannel.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/debug/trace.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/static_metadata.h" @@ -134,7 +135,6 @@ typedef struct { struct round_robin_lb_policy { /** base policy: must be first */ grpc_lb_policy base; - gpr_mu mu; /** total number of addresses received at creation time */ size_t num_addresses; @@ -213,8 +213,7 @@ static void advance_last_picked_locked(round_robin_lb_policy *p) { * csc to the list of ready subchannels. */ static ready_list *add_connected_sc_locked(round_robin_lb_policy *p, subchannel_data *sd) { - ready_list *new_elem = gpr_malloc(sizeof(ready_list)); - memset(new_elem, 0, sizeof(ready_list)); + ready_list *new_elem = gpr_zalloc(sizeof(ready_list)); new_elem->subchannel = sd->subchannel; new_elem->user_data = sd->user_data; if (p->ready_list.prev == NULL) { @@ -293,7 +292,6 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker); gpr_free(p->subchannels); - gpr_mu_destroy(&p->mu); elem = p->ready_list.next; while (elem != NULL && elem != &p->ready_list) { @@ -309,12 +307,11 @@ static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { gpr_free(p); } -static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { +static void rr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; size_t i; - gpr_mu_lock(&p->mu); if (grpc_lb_round_robin_trace) { gpr_log(GPR_DEBUG, "Shutting down Round Robin policy at %p", (void *)pol); } @@ -335,15 +332,13 @@ static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL, &sd->connectivity_changed_closure); } - gpr_mu_unlock(&p->mu); } -static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_connected_subchannel **target, - grpc_error *error) { +static void rr_cancel_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_connected_subchannel **target, + grpc_error *error) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; - gpr_mu_lock(&p->mu); pp = p->pending_picks; p->pending_picks = NULL; while (pp != NULL) { @@ -360,17 +355,15 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } pp = next; } - gpr_mu_unlock(&p->mu); GRPC_ERROR_UNREF(error); } -static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - uint32_t initial_metadata_flags_mask, - uint32_t initial_metadata_flags_eq, - grpc_error *error) { +static void rr_cancel_picks_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + uint32_t initial_metadata_flags_mask, + uint32_t initial_metadata_flags_eq, + grpc_error *error) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; - gpr_mu_lock(&p->mu); pp = p->pending_picks; p->pending_picks = NULL; while (pp != NULL) { @@ -388,11 +381,11 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } pp = next; } - gpr_mu_unlock(&p->mu); GRPC_ERROR_UNREF(error); } -static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { +static void start_picking_locked(grpc_exec_ctx *exec_ctx, + round_robin_lb_policy *p) { size_t i; p->started_picking = 1; @@ -411,23 +404,20 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) { } } -static void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { +static void rr_exit_idle_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - gpr_mu_lock(&p->mu); if (!p->started_picking) { - start_picking(exec_ctx, p); + start_picking_locked(exec_ctx, p); } - gpr_mu_unlock(&p->mu); } -static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - const grpc_lb_policy_pick_args *pick_args, - grpc_connected_subchannel **target, void **user_data, - grpc_closure *on_complete) { +static int rr_pick_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + const grpc_lb_policy_pick_args *pick_args, + grpc_connected_subchannel **target, void **user_data, + grpc_closure *on_complete) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; pending_pick *pp; ready_list *selected; - gpr_mu_lock(&p->mu); if (grpc_lb_round_robin_trace) { gpr_log(GPR_INFO, "Round Robin %p trying to pick", (void *)pol); @@ -449,12 +439,11 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } /* only advance the last picked pointer if the selection was used */ advance_last_picked_locked(p); - gpr_mu_unlock(&p->mu); return 1; } else { /* no pick currently available. Save for later in list of pending picks */ if (!p->started_picking) { - start_picking(exec_ctx, p); + start_picking_locked(exec_ctx, p); } pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; @@ -463,7 +452,6 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pp->initial_metadata_flags = pick_args->initial_metadata_flags; pp->user_data = user_data; p->pending_picks = pp; - gpr_mu_unlock(&p->mu); return 0; } } @@ -538,17 +526,15 @@ static grpc_connectivity_state update_lb_connectivity_status( return sd->curr_connectivity_state; } -static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void rr_connectivity_changed_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { subchannel_data *sd = arg; round_robin_lb_policy *p = sd->policy; pending_pick *pp; GRPC_ERROR_REF(error); - gpr_mu_lock(&p->mu); if (p->shutdown) { - gpr_mu_unlock(&p->mu); GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity"); GRPC_ERROR_UNREF(error); return; @@ -645,56 +631,51 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &p->base, "rr_connectivity"); break; } - gpr_mu_unlock(&p->mu); GRPC_ERROR_UNREF(error); } -static grpc_connectivity_state rr_check_connectivity(grpc_exec_ctx *exec_ctx, - grpc_lb_policy *pol, - grpc_error **error) { +static grpc_connectivity_state rr_check_connectivity_locked( + grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_error **error) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - grpc_connectivity_state st; - gpr_mu_lock(&p->mu); - st = grpc_connectivity_state_check(&p->state_tracker, error); - gpr_mu_unlock(&p->mu); - return st; + return grpc_connectivity_state_get(&p->state_tracker, error); } -static void rr_notify_on_state_change(grpc_exec_ctx *exec_ctx, - grpc_lb_policy *pol, - grpc_connectivity_state *current, - grpc_closure *notify) { +static void rr_notify_on_state_change_locked(grpc_exec_ctx *exec_ctx, + grpc_lb_policy *pol, + grpc_connectivity_state *current, + grpc_closure *notify) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; - gpr_mu_lock(&p->mu); grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker, current, notify); - gpr_mu_unlock(&p->mu); } -static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, - grpc_closure *closure) { +static void rr_ping_one_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, + grpc_closure *closure) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; ready_list *selected; grpc_connected_subchannel *target; - gpr_mu_lock(&p->mu); if ((selected = peek_next_connected_locked(p))) { - gpr_mu_unlock(&p->mu); target = GRPC_CONNECTED_SUBCHANNEL_REF( grpc_subchannel_get_connected_subchannel(selected->subchannel), "rr_picked"); grpc_connected_subchannel_ping(exec_ctx, target, closure); GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "rr_picked"); } else { - gpr_mu_unlock(&p->mu); grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_CREATE("Round Robin not connected")); } } static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = { - rr_destroy, rr_shutdown, rr_pick, - rr_cancel_pick, rr_cancel_picks, rr_ping_one, - rr_exit_idle, rr_check_connectivity, rr_notify_on_state_change}; + rr_destroy, + rr_shutdown_locked, + rr_pick_locked, + rr_cancel_pick_locked, + rr_cancel_picks_locked, + rr_ping_one_locked, + rr_exit_idle_locked, + rr_check_connectivity_locked, + rr_notify_on_state_change_locked}; static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {} @@ -717,12 +698,10 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, } if (num_addrs == 0) return NULL; - round_robin_lb_policy *p = gpr_malloc(sizeof(*p)); - memset(p, 0, sizeof(*p)); + round_robin_lb_policy *p = gpr_zalloc(sizeof(*p)); p->num_addresses = num_addrs; - p->subchannels = gpr_malloc(sizeof(*p->subchannels) * num_addrs); - memset(p->subchannels, 0, sizeof(*p->subchannels) * num_addrs); + p->subchannels = gpr_zalloc(sizeof(*p->subchannels) * num_addrs); grpc_subchannel_args sc_args; size_t subchannel_idx = 0; @@ -749,8 +728,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, grpc_channel_args_destroy(exec_ctx, new_args); if (subchannel != NULL) { - subchannel_data *sd = gpr_malloc(sizeof(*sd)); - memset(sd, 0, sizeof(*sd)); + subchannel_data *sd = gpr_zalloc(sizeof(*sd)); p->subchannels[subchannel_idx] = sd; sd->policy = p; sd->index = subchannel_idx; @@ -762,7 +740,8 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, } ++subchannel_idx; grpc_closure_init(&sd->connectivity_changed_closure, - rr_connectivity_changed, sd, grpc_schedule_on_exec_ctx); + rr_connectivity_changed_locked, sd, + grpc_combiner_scheduler(args->combiner, false)); } } if (subchannel_idx == 0) { @@ -779,7 +758,7 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, p->ready_list.next = NULL; p->ready_list_last_pick = &p->ready_list; - grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable); + grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable, args->combiner); grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "round_robin"); @@ -787,7 +766,6 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, gpr_log(GPR_DEBUG, "Created RR policy at %p with %lu subchannels", (void *)p, (unsigned long)p->num_subchannels); } - gpr_mu_init(&p->mu); return &p->base; } diff --git a/src/core/ext/load_reporting/load_reporting.c b/src/core/ext/load_reporting/load_reporting.c index 37b06a737f..942aea4fd1 100644 --- a/src/core/ext/load_reporting/load_reporting.c +++ b/src/core/ext/load_reporting/load_reporting.c @@ -34,14 +34,34 @@ #include <limits.h> #include <string.h> +#include <grpc/load_reporting.h> #include <grpc/support/alloc.h> #include <grpc/support/sync.h> #include "src/core/ext/load_reporting/load_reporting.h" #include "src/core/ext/load_reporting/load_reporting_filter.h" #include "src/core/lib/channel/channel_stack_builder.h" +#include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel_init.h" +static void destroy_lr_cost_context(void *c) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_load_reporting_cost_context *cost_ctx = c; + for (size_t i = 0; i < cost_ctx->values_count; ++i) { + grpc_slice_unref_internal(&exec_ctx, cost_ctx->values[i]); + } + grpc_exec_ctx_finish(&exec_ctx); + gpr_free(cost_ctx->values); + gpr_free(cost_ctx); +} + +void grpc_call_set_load_reporting_cost_context( + grpc_call *call, grpc_load_reporting_cost_context *ctx) { + grpc_call_context_set(call, GRPC_CONTEXT_LR_COST, ctx, + destroy_lr_cost_context); +} + static bool is_load_reporting_enabled(const grpc_channel_args *a) { if (a == NULL) return false; for (size_t i = 0; i < a->num_args; i++) { diff --git a/src/core/ext/load_reporting/load_reporting.h b/src/core/ext/load_reporting/load_reporting.h index a157844734..22859a599a 100644 --- a/src/core/ext/load_reporting/load_reporting.h +++ b/src/core/ext/load_reporting/load_reporting.h @@ -35,23 +35,8 @@ #define GRPC_CORE_EXT_LOAD_REPORTING_LOAD_REPORTING_H #include <grpc/impl/codegen/grpc_types.h> -#include "src/core/lib/channel/channel_stack.h" - -/** Metadata key for the gRPC LB load balancer token. - * - * The value corresponding to this key is an opaque token that is given to the - * frontend as part of each pick; the frontend sends this token to the backend - * in each request it sends when using that pick. The token is used by the - * backend to verify the request and to allow the backend to report load to the - * gRPC LB system. */ -#define GRPC_LB_TOKEN_MD_KEY "lb-token" -/** Metadata key for gRPC LB cost reporting. - * - * The value corresponding to this key is an opaque binary blob reported by the - * backend as part of its trailing metadata containing cost information for the - * call. */ -#define GRPC_LB_COST_MD_KEY "lb-cost-bin" +#include "src/core/lib/channel/channel_stack.h" /** Identifiers for the invocation point of the users LR callback */ typedef enum grpc_load_reporting_source { diff --git a/src/core/ext/load_reporting/load_reporting_filter.c b/src/core/ext/load_reporting/load_reporting_filter.c index 8af6191c3b..4ed832671d 100644 --- a/src/core/ext/load_reporting/load_reporting_filter.c +++ b/src/core/ext/load_reporting/load_reporting_filter.c @@ -31,11 +31,13 @@ * */ +#include <string.h> + +#include <grpc/load_reporting.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> #include <grpc/support/sync.h> -#include <string.h> #include "src/core/ext/load_reporting/load_reporting.h" #include "src/core/ext/load_reporting/load_reporting_filter.h" @@ -46,8 +48,6 @@ typedef struct call_data { intptr_t id; /**< an id unique to the call */ - bool have_trailing_md_string; - grpc_slice trailing_md_string; bool have_initial_md_string; grpc_slice initial_md_string; bool have_service_method; @@ -100,10 +100,8 @@ static void on_initial_md_ready(grpc_exec_ctx *exec_ctx, void *user_data, /* Constructor for call_data */ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, - grpc_call_element_args *args) { + const grpc_call_element_args *args) { call_data *calld = elem->call_data; - memset(calld, 0, sizeof(call_data)); - calld->id = (intptr_t)args->call_stack; grpc_closure_init(&calld->on_initial_md_ready, on_initial_md_ready, elem, grpc_schedule_on_exec_ctx); @@ -125,7 +123,7 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, /* Destructor for call_data */ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) { + grpc_closure *ignored) { call_data *calld = elem->call_data; /* TODO(dgq): do something with the data @@ -142,9 +140,6 @@ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, if (calld->have_initial_md_string) { grpc_slice_unref_internal(exec_ctx, calld->initial_md_string); } - if (calld->have_trailing_md_string) { - grpc_slice_unref_internal(exec_ctx, calld->trailing_md_string); - } if (calld->have_service_method) { grpc_slice_unref_internal(exec_ctx, calld->service_method); } @@ -157,8 +152,6 @@ static grpc_error *init_channel_elem(grpc_exec_ctx *exec_ctx, GPR_ASSERT(!args->is_last); channel_data *chand = elem->channel_data; - memset(chand, 0, sizeof(channel_data)); - chand->id = (intptr_t)args->channel_stack; /* TODO(dgq): do something with the data @@ -201,15 +194,6 @@ static void lr_start_transport_stream_op(grpc_exec_ctx *exec_ctx, /* substitute our callback for the higher callback */ calld->ops_recv_initial_metadata_ready = op->recv_initial_metadata_ready; op->recv_initial_metadata_ready = &calld->on_initial_md_ready; - } else if (op->send_trailing_metadata) { - if (op->send_trailing_metadata->idx.named.lb_cost_bin != NULL) { - calld->trailing_md_string = grpc_slice_ref_internal( - GRPC_MDVALUE(op->send_trailing_metadata->idx.named.lb_cost_bin->md)); - calld->have_trailing_md_string = true; - grpc_metadata_batch_remove( - exec_ctx, op->send_trailing_metadata, - op->send_trailing_metadata->idx.named.lb_cost_bin); - } } grpc_call_next_op(exec_ctx, elem, op); diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c index 2c9623211b..d227c19c43 100644 --- a/src/core/ext/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -40,6 +40,7 @@ #include "src/core/ext/client_channel/lb_policy_registry.h" #include "src/core/ext/client_channel/resolver_registry.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/support/backoff.h" @@ -63,8 +64,6 @@ typedef struct { /** pollset_set to drive the name resolution process */ grpc_pollset_set *interested_parties; - /** mutex guarding the rest of the state */ - gpr_mu mu; /** are we currently resolving? */ bool resolving; /** which version of the result have we published? */ @@ -95,18 +94,20 @@ static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx, static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, dns_resolver *r); -static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *r); -static void dns_channel_saw_error(grpc_exec_ctx *exec_ctx, grpc_resolver *r); -static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *r, - grpc_channel_args **target_result, - grpc_closure *on_complete); +static void dns_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *r); +static void dns_channel_saw_error_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *r); +static void dns_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *r, + grpc_channel_args **target_result, + grpc_closure *on_complete); static const grpc_resolver_vtable dns_resolver_vtable = { - dns_destroy, dns_shutdown, dns_channel_saw_error, dns_next}; + dns_destroy, dns_shutdown_locked, dns_channel_saw_error_locked, + dns_next_locked}; -static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { +static void dns_shutdown_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { dns_resolver *r = (dns_resolver *)resolver; - gpr_mu_lock(&r->mu); if (r->have_retry_timer) { grpc_timer_cancel(exec_ctx, &r->retry_timer); } @@ -116,25 +117,21 @@ static void dns_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver) { GRPC_ERROR_CREATE("Resolver Shutdown")); r->next_completion = NULL; } - gpr_mu_unlock(&r->mu); } -static void dns_channel_saw_error(grpc_exec_ctx *exec_ctx, - grpc_resolver *resolver) { +static void dns_channel_saw_error_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { dns_resolver *r = (dns_resolver *)resolver; - gpr_mu_lock(&r->mu); if (!r->resolving) { gpr_backoff_reset(&r->backoff_state); dns_start_resolving_locked(exec_ctx, r); } - gpr_mu_unlock(&r->mu); } -static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, - grpc_channel_args **target_result, - grpc_closure *on_complete) { +static void dns_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, + grpc_channel_args **target_result, + grpc_closure *on_complete) { dns_resolver *r = (dns_resolver *)resolver; - gpr_mu_lock(&r->mu); GPR_ASSERT(!r->next_completion); r->next_completion = on_complete; r->target_result = target_result; @@ -144,30 +141,26 @@ static void dns_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, } else { dns_maybe_finish_next_locked(exec_ctx, r); } - gpr_mu_unlock(&r->mu); } -static void dns_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void dns_on_retry_timer_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { dns_resolver *r = arg; - gpr_mu_lock(&r->mu); r->have_retry_timer = false; if (error == GRPC_ERROR_NONE) { if (!r->resolving) { dns_start_resolving_locked(exec_ctx, r); } } - gpr_mu_unlock(&r->mu); GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "retry-timer"); } -static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { +static void dns_on_resolved_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { dns_resolver *r = arg; grpc_channel_args *result = NULL; - gpr_mu_lock(&r->mu); GPR_ASSERT(r->resolving); r->resolving = false; if (r->addresses != NULL) { @@ -198,8 +191,8 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, } else { gpr_log(GPR_DEBUG, "retrying immediately"); } - grpc_closure_init(&r->on_retry, dns_on_retry_timer, r, - grpc_schedule_on_exec_ctx); + grpc_closure_init(&r->on_retry, dns_on_retry_timer_locked, r, + grpc_combiner_scheduler(r->base.combiner, false)); grpc_timer_init(exec_ctx, &r->retry_timer, next_try, &r->on_retry, now); } if (r->resolved_result != NULL) { @@ -208,7 +201,6 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, r->resolved_result = result; r->resolved_version++; dns_maybe_finish_next_locked(exec_ctx, r); - gpr_mu_unlock(&r->mu); GRPC_RESOLVER_UNREF(exec_ctx, &r->base, "dns-resolving"); } @@ -221,7 +213,8 @@ static void dns_start_resolving_locked(grpc_exec_ctx *exec_ctx, r->addresses = NULL; grpc_resolve_address( exec_ctx, r->name_to_resolve, r->default_port, r->interested_parties, - grpc_closure_create(dns_on_resolved, r, grpc_schedule_on_exec_ctx), + grpc_closure_create(dns_on_resolved_locked, r, + grpc_combiner_scheduler(r->base.combiner, false)), &r->addresses); } @@ -240,11 +233,10 @@ static void dns_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { dns_resolver *r = (dns_resolver *)gr; - gpr_mu_destroy(&r->mu); if (r->resolved_result != NULL) { grpc_channel_args_destroy(exec_ctx, r->resolved_result); } - grpc_pollset_set_destroy(r->interested_parties); + grpc_pollset_set_destroy(exec_ctx, r->interested_parties); gpr_free(r->name_to_resolve); gpr_free(r->default_port); grpc_channel_args_destroy(exec_ctx, r->channel_args); @@ -262,10 +254,8 @@ static grpc_resolver *dns_create(grpc_exec_ctx *exec_ctx, char *path = args->uri->path; if (path[0] == '/') ++path; // Create resolver. - dns_resolver *r = gpr_malloc(sizeof(dns_resolver)); - memset(r, 0, sizeof(*r)); - gpr_mu_init(&r->mu); - grpc_resolver_init(&r->base, &dns_resolver_vtable); + dns_resolver *r = gpr_zalloc(sizeof(dns_resolver)); + grpc_resolver_init(&r->base, &dns_resolver_vtable, args->combiner); r->name_to_resolve = gpr_strdup(path); r->default_port = gpr_strdup(default_port); r->channel_args = grpc_channel_args_copy(args->args); diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c index a1365f6465..da5923d26f 100644 --- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c +++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c @@ -45,6 +45,7 @@ #include "src/core/ext/client_channel/parse_address.h" #include "src/core/ext/client_channel/resolver_registry.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" #include "src/core/lib/slice/slice_internal.h" @@ -58,8 +59,6 @@ typedef struct { grpc_lb_addresses *addresses; /** channel args */ grpc_channel_args *channel_args; - /** mutex guarding the rest of the state */ - gpr_mu mu; /** have we published? */ bool published; /** pending next completion, or NULL */ @@ -73,48 +72,43 @@ static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *r); static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, sockaddr_resolver *r); -static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx, grpc_resolver *r); -static void sockaddr_channel_saw_error(grpc_exec_ctx *exec_ctx, - grpc_resolver *r); -static void sockaddr_next(grpc_exec_ctx *exec_ctx, grpc_resolver *r, - grpc_channel_args **target_result, - grpc_closure *on_complete); +static void sockaddr_shutdown_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *r); +static void sockaddr_channel_saw_error_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *r); +static void sockaddr_next_locked(grpc_exec_ctx *exec_ctx, grpc_resolver *r, + grpc_channel_args **target_result, + grpc_closure *on_complete); static const grpc_resolver_vtable sockaddr_resolver_vtable = { - sockaddr_destroy, sockaddr_shutdown, sockaddr_channel_saw_error, - sockaddr_next}; + sockaddr_destroy, sockaddr_shutdown_locked, + sockaddr_channel_saw_error_locked, sockaddr_next_locked}; -static void sockaddr_shutdown(grpc_exec_ctx *exec_ctx, - grpc_resolver *resolver) { +static void sockaddr_shutdown_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { sockaddr_resolver *r = (sockaddr_resolver *)resolver; - gpr_mu_lock(&r->mu); if (r->next_completion != NULL) { *r->target_result = NULL; grpc_closure_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE); r->next_completion = NULL; } - gpr_mu_unlock(&r->mu); } -static void sockaddr_channel_saw_error(grpc_exec_ctx *exec_ctx, - grpc_resolver *resolver) { +static void sockaddr_channel_saw_error_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver) { sockaddr_resolver *r = (sockaddr_resolver *)resolver; - gpr_mu_lock(&r->mu); r->published = false; sockaddr_maybe_finish_next_locked(exec_ctx, r); - gpr_mu_unlock(&r->mu); } -static void sockaddr_next(grpc_exec_ctx *exec_ctx, grpc_resolver *resolver, - grpc_channel_args **target_result, - grpc_closure *on_complete) { +static void sockaddr_next_locked(grpc_exec_ctx *exec_ctx, + grpc_resolver *resolver, + grpc_channel_args **target_result, + grpc_closure *on_complete) { sockaddr_resolver *r = (sockaddr_resolver *)resolver; - gpr_mu_lock(&r->mu); GPR_ASSERT(!r->next_completion); r->next_completion = on_complete; r->target_result = target_result; sockaddr_maybe_finish_next_locked(exec_ctx, r); - gpr_mu_unlock(&r->mu); } static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, @@ -131,7 +125,6 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { sockaddr_resolver *r = (sockaddr_resolver *)gr; - gpr_mu_destroy(&r->mu); grpc_lb_addresses_destroy(exec_ctx, r->addresses); grpc_channel_args_destroy(exec_ctx, r->channel_args); gpr_free(r); @@ -197,12 +190,10 @@ static grpc_resolver *sockaddr_create(grpc_exec_ctx *exec_ctx, return NULL; } /* Instantiate resolver. */ - sockaddr_resolver *r = gpr_malloc(sizeof(sockaddr_resolver)); - memset(r, 0, sizeof(*r)); + sockaddr_resolver *r = gpr_zalloc(sizeof(sockaddr_resolver)); r->addresses = addresses; r->channel_args = grpc_channel_args_copy(args->args); - gpr_mu_init(&r->mu); - grpc_resolver_init(&r->base, &sockaddr_resolver_vtable); + grpc_resolver_init(&r->base, &sockaddr_resolver_vtable, args->combiner); return &r->base; } diff --git a/src/core/ext/transport/chttp2/client/chttp2_connector.c b/src/core/ext/transport/chttp2/client/chttp2_connector.c index e7d6801a44..d49c32b671 100644 --- a/src/core/ext/transport/chttp2/client/chttp2_connector.c +++ b/src/core/ext/transport/chttp2/client/chttp2_connector.c @@ -191,7 +191,7 @@ static void chttp2_connector_connect(grpc_exec_ctx *exec_ctx, grpc_closure *notify) { chttp2_connector *c = (chttp2_connector *)con; grpc_resolved_address addr; - grpc_get_subchannel_address_arg(args->channel_args, &addr); + grpc_get_subchannel_address_arg(exec_ctx, args->channel_args, &addr); gpr_mu_lock(&c->mu); GPR_ASSERT(c->notify == NULL); c->notify = notify; @@ -213,8 +213,7 @@ static const grpc_connector_vtable chttp2_connector_vtable = { chttp2_connector_connect}; grpc_connector *grpc_chttp2_connector_create() { - chttp2_connector *c = gpr_malloc(sizeof(*c)); - memset(c, 0, sizeof(*c)); + chttp2_connector *c = gpr_zalloc(sizeof(*c)); c->base.vtable = &chttp2_connector_vtable; gpr_mu_init(&c->mu); gpr_ref_init(&c->refs, 1); diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index 490a0c560e..067ac35a5a 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -72,8 +72,11 @@ static grpc_channel *client_channel_factory_create_channel( grpc_arg arg; arg.type = GRPC_ARG_STRING; arg.key = GRPC_ARG_SERVER_URI; - arg.value.string = grpc_resolver_factory_add_default_prefix_if_needed(target); - grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1); + arg.value.string = + grpc_resolver_factory_add_default_prefix_if_needed(exec_ctx, target); + const char *to_remove[] = {GRPC_ARG_SERVER_URI}; + grpc_channel_args *new_args = + grpc_channel_args_copy_and_add_and_remove(args, to_remove, 1, &arg, 1); gpr_free(arg.value.string); grpc_channel *channel = grpc_channel_create(exec_ctx, target, new_args, GRPC_CLIENT_CHANNEL, NULL); diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index d8c18eb122..f0c241d68e 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -83,7 +83,7 @@ static grpc_subchannel_args *get_secure_naming_subchannel_args( const char *server_uri_str = server_uri_arg->value.string; GPR_ASSERT(server_uri_str != NULL); grpc_uri *server_uri = - grpc_uri_parse(server_uri_str, true /* supress errors */); + grpc_uri_parse(exec_ctx, server_uri_str, true /* supress errors */); GPR_ASSERT(server_uri != NULL); const char *server_uri_path; server_uri_path = @@ -96,7 +96,7 @@ static grpc_subchannel_args *get_secure_naming_subchannel_args( const char *target_uri_str = grpc_get_subchannel_address_uri_arg(args->args); grpc_uri *target_uri = - grpc_uri_parse(target_uri_str, false /* suppress errors */); + grpc_uri_parse(exec_ctx, target_uri_str, false /* suppress errors */); GPR_ASSERT(target_uri != NULL); if (target_uri->path[0] != '\0') { // "path" may be empty const grpc_slice key = grpc_slice_from_static_string( @@ -181,8 +181,11 @@ static grpc_channel *client_channel_factory_create_channel( grpc_arg arg; arg.type = GRPC_ARG_STRING; arg.key = GRPC_ARG_SERVER_URI; - arg.value.string = grpc_resolver_factory_add_default_prefix_if_needed(target); - grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1); + arg.value.string = + grpc_resolver_factory_add_default_prefix_if_needed(exec_ctx, target); + const char *to_remove[] = {GRPC_ARG_SERVER_URI}; + grpc_channel_args *new_args = + grpc_channel_args_copy_and_add_and_remove(args, to_remove, 1, &arg, 1); gpr_free(arg.value.string); grpc_channel *channel = grpc_channel_create(exec_ctx, target, new_args, GRPC_CLIENT_CHANNEL, NULL); diff --git a/src/core/ext/transport/chttp2/server/chttp2_server.c b/src/core/ext/transport/chttp2/server/chttp2_server.c index ae2c3838ed..837b9fd03d 100644 --- a/src/core/ext/transport/chttp2/server/chttp2_server.c +++ b/src/core/ext/transport/chttp2/server/chttp2_server.c @@ -55,11 +55,6 @@ #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/server.h" -typedef struct pending_handshake_manager_node { - grpc_handshake_manager *handshake_mgr; - struct pending_handshake_manager_node *next; -} pending_handshake_manager_node; - typedef struct { grpc_server *server; grpc_tcp_server *tcp_server; @@ -68,7 +63,7 @@ typedef struct { bool shutdown; grpc_closure tcp_server_shutdown_complete; grpc_closure *server_destroy_listener_done; - pending_handshake_manager_node *pending_handshake_mgrs; + grpc_handshake_manager *pending_handshake_mgrs; } server_state; typedef struct { @@ -78,44 +73,6 @@ typedef struct { grpc_handshake_manager *handshake_mgr; } server_connection_state; -static void pending_handshake_manager_add_locked( - server_state *state, grpc_handshake_manager *handshake_mgr) { - pending_handshake_manager_node *node = gpr_malloc(sizeof(*node)); - node->handshake_mgr = handshake_mgr; - node->next = state->pending_handshake_mgrs; - state->pending_handshake_mgrs = node; -} - -static void pending_handshake_manager_remove_locked( - server_state *state, grpc_handshake_manager *handshake_mgr) { - pending_handshake_manager_node **prev_node = &state->pending_handshake_mgrs; - for (pending_handshake_manager_node *node = state->pending_handshake_mgrs; - node != NULL; node = node->next) { - if (node->handshake_mgr == handshake_mgr) { - *prev_node = node->next; - gpr_free(node); - break; - } - prev_node = &node->next; - } -} - -static void pending_handshake_manager_shutdown_locked(grpc_exec_ctx *exec_ctx, - server_state *state, - grpc_error *why) { - pending_handshake_manager_node *prev_node = NULL; - for (pending_handshake_manager_node *node = state->pending_handshake_mgrs; - node != NULL; node = node->next) { - grpc_handshake_manager_shutdown(exec_ctx, node->handshake_mgr, - GRPC_ERROR_REF(why)); - gpr_free(prev_node); - prev_node = node; - } - gpr_free(prev_node); - state->pending_handshake_mgrs = NULL; - GRPC_ERROR_UNREF(why); -} - static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_handshaker_args *args = arg; @@ -153,8 +110,9 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, grpc_channel_args_destroy(exec_ctx, args->args); } } - pending_handshake_manager_remove_locked(connection_state->server_state, - connection_state->handshake_mgr); + grpc_handshake_manager_pending_list_remove( + &connection_state->server_state->pending_handshake_mgrs, + connection_state->handshake_mgr); gpr_mu_unlock(&connection_state->server_state->mu); grpc_handshake_manager_destroy(exec_ctx, connection_state->handshake_mgr); grpc_tcp_server_unref(exec_ctx, connection_state->server_state->tcp_server); @@ -174,7 +132,8 @@ static void on_accept(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, return; } grpc_handshake_manager *handshake_mgr = grpc_handshake_manager_create(); - pending_handshake_manager_add_locked(state, handshake_mgr); + grpc_handshake_manager_pending_list_add(&state->pending_handshake_mgrs, + handshake_mgr); gpr_mu_unlock(&state->mu); grpc_tcp_server_ref(state->tcp_server); server_connection_state *connection_state = @@ -213,8 +172,8 @@ static void tcp_server_shutdown_complete(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_lock(&state->mu); grpc_closure *destroy_done = state->server_destroy_listener_done; GPR_ASSERT(state->shutdown); - pending_handshake_manager_shutdown_locked(exec_ctx, state, - GRPC_ERROR_REF(error)); + grpc_handshake_manager_pending_list_shutdown_all( + exec_ctx, state->pending_handshake_mgrs, GRPC_ERROR_REF(error)); gpr_mu_unlock(&state->mu); // Flush queued work before destroying handshaker factory, since that // may do a synchronous unref. @@ -263,8 +222,7 @@ grpc_error *grpc_chttp2_server_add_port(grpc_exec_ctx *exec_ctx, if (err != GRPC_ERROR_NONE) { goto error; } - state = gpr_malloc(sizeof(*state)); - memset(state, 0, sizeof(*state)); + state = gpr_zalloc(sizeof(*state)); grpc_closure_init(&state->tcp_server_shutdown_complete, tcp_server_shutdown_complete, state, grpc_schedule_on_exec_ctx); diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index 8a9eaa8b6a..082078c72f 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -48,16 +48,19 @@ #include "src/core/ext/transport/chttp2/transport/varint.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/http/parser.h" +#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" +#include "src/core/lib/support/env.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/error_utils.h" #include "src/core/lib/transport/http2_errors.h" #include "src/core/lib/transport/static_metadata.h" #include "src/core/lib/transport/status_conversion.h" #include "src/core/lib/transport/timeout_encoding.h" +#include "src/core/lib/transport/transport.h" #include "src/core/lib/transport/transport_impl.h" #define DEFAULT_WINDOW 65535 @@ -66,6 +69,10 @@ #define MAX_WRITE_BUFFER_SIZE (64 * 1024 * 1024) #define DEFAULT_MAX_HEADER_LIST_SIZE (16 * 1024) +#define DEFAULT_KEEPALIVE_TIME_SECOND INT_MAX +#define DEFAULT_KEEPALIVE_TIMEOUT_SECOND 20 +#define DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS false + #define MAX_CLIENT_STREAM_ID 0x7fffffffu int grpc_http_trace = 0; int grpc_flowctl_trace = 0; @@ -139,6 +146,16 @@ static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, #define DEFAULT_MIN_TIME_BETWEEN_PINGS_MS 0 #define DEFAULT_MAX_PINGS_BETWEEN_DATA 3 +/** keepalive-relevant functions */ +static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); +static void start_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); +static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); +static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); + /******************************************************************************* * CONSTRUCTION/DESTRUCTION/REFCOUNTING */ @@ -168,7 +185,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_map_destroy(&t->stream_map); grpc_connectivity_state_destroy(exec_ctx, &t->channel_callback.state_tracker); - grpc_combiner_destroy(exec_ctx, t->combiner); + GRPC_COMBINER_UNREF(exec_ctx, t->combiner, "chttp2_transport"); cancel_pings(exec_ctx, t, GRPC_ERROR_CREATE("Transport destroyed")); @@ -218,8 +235,6 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) == GRPC_CHTTP2_CLIENT_CONNECT_STRLEN); - memset(t, 0, sizeof(*t)); - t->base.vtable = &vtable; t->ep = ep; /* one ref is for destroy */ @@ -255,6 +270,17 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_combiner_scheduler(t->combiner, false)); grpc_closure_init(&t->finish_bdp_ping_locked, finish_bdp_ping_locked, t, grpc_combiner_scheduler(t->combiner, false)); + grpc_closure_init(&t->init_keepalive_ping_locked, init_keepalive_ping_locked, + t, grpc_combiner_scheduler(t->combiner, false)); + grpc_closure_init(&t->start_keepalive_ping_locked, + start_keepalive_ping_locked, t, + grpc_combiner_scheduler(t->combiner, false)); + grpc_closure_init(&t->finish_keepalive_ping_locked, + finish_keepalive_ping_locked, t, + grpc_combiner_scheduler(t->combiner, false)); + grpc_closure_init(&t->keepalive_watchdog_fired_locked, + keepalive_watchdog_fired_locked, t, + grpc_combiner_scheduler(t->combiner, false)); grpc_bdp_estimator_init(&t->bdp_estimator, t->peer_string); t->last_pid_update = gpr_now(GPR_CLOCK_MONOTONIC); @@ -265,7 +291,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, .gain_d = 0, .initial_control_value = log2(DEFAULT_WINDOW), .min_control_value = -1, - .max_control_value = 22, + .max_control_value = 25, .integral_range = 10}); grpc_chttp2_goaway_parser_init(&t->goaway_parser); @@ -316,6 +342,18 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, gpr_time_from_millis(DEFAULT_MIN_TIME_BETWEEN_PINGS_MS, GPR_TIMESPAN), }; + /* client-side keepalive setting */ + t->keepalive_time = + DEFAULT_KEEPALIVE_TIME_SECOND == INT_MAX + ? gpr_inf_future(GPR_TIMESPAN) + : gpr_time_from_seconds(DEFAULT_KEEPALIVE_TIME_SECOND, GPR_TIMESPAN); + t->keepalive_timeout = + DEFAULT_KEEPALIVE_TIMEOUT_SECOND == INT_MAX + ? gpr_inf_future(GPR_TIMESPAN) + : gpr_time_from_seconds(DEFAULT_KEEPALIVE_TIMEOUT_SECOND, + GPR_TIMESPAN); + t->keepalive_permit_without_calls = DEFAULT_KEEPALIVE_PERMIT_WITHOUT_CALLS; + if (channel_args) { for (i = 0; i < channel_args->num_args; i++) { if (0 == strcmp(channel_args->args[i].key, @@ -363,6 +401,28 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, strcmp(channel_args->args[i].key, GRPC_ARG_HTTP2_BDP_PROBE)) { t->enable_bdp_probe = grpc_channel_arg_get_integer( &channel_args->args[i], (grpc_integer_options){1, 0, 1}); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_KEEPALIVE_TIME)) { + const int value = grpc_channel_arg_get_integer( + &channel_args->args[i], + (grpc_integer_options){DEFAULT_KEEPALIVE_TIME_SECOND, 1, INT_MAX}); + t->keepalive_time = value == INT_MAX + ? gpr_inf_future(GPR_TIMESPAN) + : gpr_time_from_seconds(value, GPR_TIMESPAN); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_KEEPALIVE_TIMEOUT)) { + const int value = grpc_channel_arg_get_integer( + &channel_args->args[i], + (grpc_integer_options){DEFAULT_KEEPALIVE_TIMEOUT_SECOND, 0, + INT_MAX}); + t->keepalive_timeout = value == INT_MAX + ? gpr_inf_future(GPR_TIMESPAN) + : gpr_time_from_seconds(value, GPR_TIMESPAN); + } else if (0 == strcmp(channel_args->args[i].key, + GRPC_ARG_HTTP2_KEEPALIVE_PERMIT_WITHOUT_CALLS)) { + t->keepalive_permit_without_calls = + (uint32_t)grpc_channel_arg_get_integer( + &channel_args->args[i], (grpc_integer_options){0, 0, 1}); } else { static const struct { const char *channel_arg_name; @@ -414,6 +474,16 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, t->ping_state.pings_before_data_required = t->ping_policy.max_pings_without_data; + /** Start client-side keepalive pings */ + if (t->is_client) { + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; + GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); + grpc_timer_init( + exec_ctx, &t->keepalive_ping_timer, + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time), + &t->init_keepalive_ping_locked, gpr_now(GPR_CLOCK_MONOTONIC)); + } + grpc_chttp2_initiate_write(exec_ctx, t, false, "init"); post_benign_reclaimer(exec_ctx, t); } @@ -441,6 +511,10 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error) { if (!t->closed) { + if (!grpc_error_has_clear_grpc_status(error)) { + error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_UNAVAILABLE); + } if (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE) { if (t->close_transport_on_writes_finished == NULL) { t->close_transport_on_writes_finished = @@ -450,14 +524,26 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, grpc_error_add_child(t->close_transport_on_writes_finished, error); return; } - if (!grpc_error_has_clear_grpc_status(error)) { - error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS, - GRPC_STATUS_UNAVAILABLE); - } t->closed = 1; connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "close_transport"); grpc_endpoint_shutdown(exec_ctx, t->ep, GRPC_ERROR_REF(error)); + if (t->is_client) { + switch (t->keepalive_state) { + case GRPC_CHTTP2_KEEPALIVE_STATE_WAITING: { + grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer); + break; + } + case GRPC_CHTTP2_KEEPALIVE_STATE_PINGING: { + grpc_timer_cancel(exec_ctx, &t->keepalive_ping_timer); + grpc_timer_cancel(exec_ctx, &t->keepalive_watchdog_timer); + break; + } + case GRPC_CHTTP2_KEEPALIVE_STATE_DYING: { + break; + } + } + } /* flush writable stream list to avoid dangling references */ grpc_chttp2_stream *s; @@ -489,13 +575,11 @@ void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream *s) { static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_stream_refcount *refcount, - const void *server_data) { + const void *server_data, gpr_arena *arena) { GPR_TIMER_BEGIN("init_stream", 0); grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; - memset(s, 0, sizeof(*s)); - s->t = t; s->refcount = refcount; /* We reserve one 'active stream' that's dropped when the stream is @@ -504,8 +588,8 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, gpr_ref_init(&s->active_streams, 1); GRPC_CHTTP2_STREAM_REF(s, "chttp2"); - grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0]); - grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1]); + grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[0], arena); + grpc_chttp2_incoming_metadata_buffer_init(&s->metadata_buffer[1], arena); grpc_chttp2_data_parser_init(&s->data_parser); grpc_slice_buffer_init(&s->flow_controlled_buffer); s->deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC); @@ -569,20 +653,29 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, void *sp, GRPC_ERROR_UNREF(s->read_closed_error); GRPC_ERROR_UNREF(s->write_closed_error); + if (s->incoming_window_delta > 0) { + GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA( + "destroy", t, s, s->incoming_window_delta); + } else if (s->incoming_window_delta < 0) { + GRPC_CHTTP2_FLOW_CREDIT_STREAM_INCOMING_WINDOW_DELTA( + "destroy", t, s, -s->incoming_window_delta); + } + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "stream"); GPR_TIMER_END("destroy_stream", 0); - gpr_free(s->destroy_stream_arg); + grpc_closure_sched(exec_ctx, s->destroy_stream_arg, GRPC_ERROR_NONE); } static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, void *and_free_memory) { + grpc_stream *gs, + grpc_closure *then_schedule_closure) { GPR_TIMER_BEGIN("destroy_stream", 0); grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt; grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs; - s->destroy_stream_arg = and_free_memory; + s->destroy_stream_arg = then_schedule_closure; grpc_closure_sched( exec_ctx, grpc_closure_init(&s->destroy_stream, destroy_stream_locked, s, grpc_combiner_scheduler(t->combiner, false)), @@ -1033,8 +1126,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, GPR_TIMER_BEGIN("perform_stream_op_locked", 0); grpc_transport_stream_op *op = stream_op; - grpc_chttp2_transport *t = op->transport_private.args[0]; - grpc_chttp2_stream *s = op->transport_private.args[1]; + grpc_chttp2_transport *t = op->handler_private.args[0]; + grpc_chttp2_stream *s = op->handler_private.args[1]; if (grpc_http_trace) { char *str = grpc_transport_stream_op_string(op); @@ -1106,8 +1199,11 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, void *stream_op, grpc_chttp2_list_add_waiting_for_concurrency(t, s); maybe_start_some_streams(exec_ctx, t); } else { - grpc_chttp2_cancel_stream(exec_ctx, t, s, - GRPC_ERROR_CREATE("Transport closed")); + grpc_chttp2_cancel_stream( + exec_ctx, t, s, + grpc_error_set_int(GRPC_ERROR_CREATE("Transport closed"), + GRPC_ERROR_INT_GRPC_STATUS, + GRPC_STATUS_UNAVAILABLE)); } } else { GPR_ASSERT(s->id != 0); @@ -1255,13 +1351,13 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, gpr_free(str); } - op->transport_private.args[0] = gt; - op->transport_private.args[1] = gs; + op->handler_private.args[0] = gt; + op->handler_private.args[1] = gs; GRPC_CHTTP2_STREAM_REF(s, "perform_stream_op"); grpc_closure_sched( exec_ctx, grpc_closure_init( - &op->transport_private.closure, perform_stream_op_locked, op, + &op->handler_private.closure, perform_stream_op_locked, op, grpc_combiner_scheduler(t->combiner, op->covered_by_poller)), GRPC_ERROR_NONE); GPR_TIMER_END("perform_stream_op", 0); @@ -1534,15 +1630,19 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, s->recv_trailing_metadata_finished != NULL) { char status_string[GPR_LTOA_MIN_BUFSIZE]; gpr_ltoa(status, status_string); - grpc_chttp2_incoming_metadata_buffer_replace_or_add( - exec_ctx, &s->metadata_buffer[1], - grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_STATUS, - grpc_slice_from_copied_string(status_string))); + GRPC_LOG_IF_ERROR("add_status", + grpc_chttp2_incoming_metadata_buffer_replace_or_add( + exec_ctx, &s->metadata_buffer[1], + grpc_mdelem_from_slices( + exec_ctx, GRPC_MDSTR_GRPC_STATUS, + grpc_slice_from_copied_string(status_string)))); if (msg != NULL) { - grpc_chttp2_incoming_metadata_buffer_replace_or_add( - exec_ctx, &s->metadata_buffer[1], - grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_MESSAGE, - grpc_slice_from_copied_string(msg))); + GRPC_LOG_IF_ERROR( + "add_status_message", + grpc_chttp2_incoming_metadata_buffer_replace_or_add( + exec_ctx, &s->metadata_buffer[1], + grpc_mdelem_from_slices(exec_ctx, GRPC_MDSTR_GRPC_MESSAGE, + grpc_slice_from_copied_string(msg)))); } s->published_metadata[1] = GRPC_METADATA_SYNTHESIZED_FROM_FAKE; grpc_chttp2_maybe_complete_recv_trailing_metadata(exec_ctx, t, s); @@ -1801,13 +1901,13 @@ static void update_bdp(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, if (delta == 0 || (bdp != 0 && delta > -1024 && delta < 1024)) { return; } + if (grpc_bdp_estimator_trace) { + gpr_log(GPR_DEBUG, "%s: update initial window size to %d", t->peer_string, + (int)bdp); + } push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, bdp); } -/******************************************************************************* - * INPUT PROCESSING - PARSING - */ - static grpc_error *try_http_parsing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { grpc_http_parser parser; @@ -1976,6 +2076,77 @@ static void finish_bdp_ping_locked(grpc_exec_ctx *exec_ctx, void *tp, GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "bdp_ping"); } +static void init_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_transport *t = arg; + GPR_ASSERT(t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_WAITING); + if (error == GRPC_ERROR_NONE && !(t->destroying || t->closed)) { + if (t->keepalive_permit_without_calls || t->stream_map.count > 0) { + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_PINGING; + GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive ping end"); + send_ping_locked(exec_ctx, t, GRPC_CHTTP2_PING_ON_NEXT_WRITE, + &t->start_keepalive_ping_locked, + &t->finish_keepalive_ping_locked); + } else { + GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); + grpc_timer_init( + exec_ctx, &t->keepalive_ping_timer, + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time), + &t->init_keepalive_ping_locked, gpr_now(GPR_CLOCK_MONOTONIC)); + } + } + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "init keepalive ping"); +} + +static void start_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_transport *t = arg; + GRPC_CHTTP2_REF_TRANSPORT(t, "keepalive watchdog"); + grpc_timer_init( + exec_ctx, &t->keepalive_watchdog_timer, + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_timeout), + &t->keepalive_watchdog_fired_locked, gpr_now(GPR_CLOCK_MONOTONIC)); +} + +static void finish_keepalive_ping_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_transport *t = arg; + if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { + if (error == GRPC_ERROR_NONE) { + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_WAITING; + grpc_timer_cancel(exec_ctx, &t->keepalive_watchdog_timer); + GRPC_CHTTP2_REF_TRANSPORT(t, "init keepalive ping"); + grpc_timer_init( + exec_ctx, &t->keepalive_ping_timer, + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), t->keepalive_time), + grpc_closure_create(init_keepalive_ping_locked, t, + grpc_combiner_scheduler(t->combiner, false)), + gpr_now(GPR_CLOCK_MONOTONIC)); + } + } + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keepalive ping end"); +} + +static void keepalive_watchdog_fired_locked(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_transport *t = arg; + if (t->keepalive_state == GRPC_CHTTP2_KEEPALIVE_STATE_PINGING) { + if (error == GRPC_ERROR_NONE) { + t->keepalive_state = GRPC_CHTTP2_KEEPALIVE_STATE_DYING; + close_transport_locked(exec_ctx, t, + GRPC_ERROR_CREATE("keepalive watchdog timeout")); + } + } else { + /** The watchdog timer should have been cancelled by + finish_keepalive_ping_locked. */ + if (error != GRPC_ERROR_CANCELLED) { + gpr_log(GPR_ERROR, "keepalive_ping_end state error: %d (expect: %d)", + t->keepalive_state, GRPC_CHTTP2_KEEPALIVE_STATE_PINGING); + } + } + GRPC_CHTTP2_UNREF_TRANSPORT(exec_ctx, t, "keepalive watchdog"); +} + /******************************************************************************* * CALLBACK LOOP */ @@ -2054,8 +2225,8 @@ static void incoming_byte_stream_update_flow_control(grpc_exec_ctx *exec_ctx, (int64_t)have_already) { write_type = GRPC_CHTTP2_STREAM_WRITE_INITIATE_COVERED; } - GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, incoming_window_delta, - add_max_recv_bytes); + GRPC_CHTTP2_FLOW_CREDIT_STREAM_INCOMING_WINDOW_DELTA("op", t, s, + add_max_recv_bytes); GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", t, s, announce_window, add_max_recv_bytes); if ((int64_t)s->incoming_window_delta + (int64_t)initial_window_size - @@ -2417,7 +2588,7 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream), grpc_transport *grpc_create_chttp2_transport( grpc_exec_ctx *exec_ctx, const grpc_channel_args *channel_args, grpc_endpoint *ep, int is_client) { - grpc_chttp2_transport *t = gpr_malloc(sizeof(grpc_chttp2_transport)); + grpc_chttp2_transport *t = gpr_zalloc(sizeof(grpc_chttp2_transport)); init_transport(exec_ctx, t, channel_args, ep, is_client != 0); return &t->base; } diff --git a/src/core/ext/transport/chttp2/transport/frame_ping.c b/src/core/ext/transport/chttp2/transport/frame_ping.c index f487533c41..9b4b1a7b84 100644 --- a/src/core/ext/transport/chttp2/transport/frame_ping.c +++ b/src/core/ext/transport/chttp2/transport/frame_ping.c @@ -91,7 +91,7 @@ grpc_error *grpc_chttp2_ping_parser_parse(grpc_exec_ctx *exec_ctx, void *parser, grpc_chttp2_ping_parser *p = parser; while (p->byte != 8 && cur != end) { - p->opaque_8bytes |= (((uint64_t)*cur) << (8 * p->byte)); + p->opaque_8bytes |= (((uint64_t)*cur) << (56 - 8 * p->byte)); cur++; p->byte++; } diff --git a/src/core/ext/transport/chttp2/transport/frame_settings.h b/src/core/ext/transport/chttp2/transport/frame_settings.h index a29dc82106..44137798c0 100644 --- a/src/core/ext/transport/chttp2/transport/frame_settings.h +++ b/src/core/ext/transport/chttp2/transport/frame_settings.h @@ -87,7 +87,7 @@ extern const grpc_chttp2_setting_parameters grpc_chttp2_settings_parameters[GRPC_CHTTP2_NUM_SETTINGS]; /* Create a settings frame by diffing old & new, and updating old to be new */ -grpc_slice grpc_chttp2_settings_create(uint32_t *old, const uint32_t *new, +grpc_slice grpc_chttp2_settings_create(uint32_t *old, const uint32_t *newval, uint32_t force_mask, size_t count); /* Create an ack settings frame */ grpc_slice grpc_chttp2_settings_ack_create(void); diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.c b/src/core/ext/transport/chttp2/transport/hpack_encoder.c index 63df8e135f..84586cd998 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_encoder.c +++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.c @@ -173,6 +173,7 @@ static void add_header_data(framer_state *st, grpc_slice slice) { static uint8_t *add_tiny_header_data(framer_state *st, size_t len) { ensure_space(st, len); + st->stats->header_bytes += len; return grpc_slice_buffer_tiny_add(st->output, len); } diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.c b/src/core/ext/transport/chttp2/transport/incoming_metadata.c index c91b019aa0..da0a34d32a 100644 --- a/src/core/ext/transport/chttp2/transport/incoming_metadata.c +++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.c @@ -41,69 +41,48 @@ #include <grpc/support/log.h> void grpc_chttp2_incoming_metadata_buffer_init( - grpc_chttp2_incoming_metadata_buffer *buffer) { - buffer->deadline = gpr_inf_future(GPR_CLOCK_REALTIME); + grpc_chttp2_incoming_metadata_buffer *buffer, gpr_arena *arena) { + buffer->arena = arena; + grpc_metadata_batch_init(&buffer->batch); + buffer->batch.deadline = gpr_inf_future(GPR_CLOCK_REALTIME); } void grpc_chttp2_incoming_metadata_buffer_destroy( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer) { - size_t i; - if (!buffer->published) { - for (i = 0; i < buffer->count; i++) { - GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md); - } - } - gpr_free(buffer->elems); + grpc_metadata_batch_destroy(exec_ctx, &buffer->batch); } -void grpc_chttp2_incoming_metadata_buffer_add( - grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem elem) { - GPR_ASSERT(!buffer->published); - if (buffer->capacity == buffer->count) { - buffer->capacity = GPR_MAX(8, 2 * buffer->capacity); - buffer->elems = - gpr_realloc(buffer->elems, sizeof(*buffer->elems) * buffer->capacity); - } - buffer->elems[buffer->count++].md = elem; +grpc_error *grpc_chttp2_incoming_metadata_buffer_add( + grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, + grpc_mdelem elem) { buffer->size += GRPC_MDELEM_LENGTH(elem); + return grpc_metadata_batch_add_tail( + exec_ctx, &buffer->batch, + gpr_arena_alloc(buffer->arena, sizeof(grpc_linked_mdelem)), elem); } -void grpc_chttp2_incoming_metadata_buffer_replace_or_add( +grpc_error *grpc_chttp2_incoming_metadata_buffer_replace_or_add( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem elem) { - for (size_t i = 0; i < buffer->count; i++) { - if (grpc_slice_eq(GRPC_MDKEY(buffer->elems[i].md), GRPC_MDKEY(elem))) { - GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md); - buffer->elems[i].md = elem; - return; + for (grpc_linked_mdelem *l = buffer->batch.list.head; l != NULL; + l = l->next) { + if (grpc_slice_eq(GRPC_MDKEY(l->md), GRPC_MDKEY(elem))) { + GRPC_MDELEM_UNREF(exec_ctx, l->md); + l->md = elem; + return GRPC_ERROR_NONE; } } - grpc_chttp2_incoming_metadata_buffer_add(buffer, elem); + return grpc_chttp2_incoming_metadata_buffer_add(exec_ctx, buffer, elem); } void grpc_chttp2_incoming_metadata_buffer_set_deadline( grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline) { - GPR_ASSERT(!buffer->published); - buffer->deadline = deadline; + buffer->batch.deadline = deadline; } void grpc_chttp2_incoming_metadata_buffer_publish( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, grpc_metadata_batch *batch) { - GPR_ASSERT(!buffer->published); - buffer->published = 1; - if (buffer->count > 0) { - size_t i; - for (i = 0; i < buffer->count; i++) { - /* TODO(ctiller): do something better here */ - if (!GRPC_LOG_IF_ERROR("grpc_chttp2_incoming_metadata_buffer_publish", - grpc_metadata_batch_link_tail( - exec_ctx, batch, &buffer->elems[i]))) { - GRPC_MDELEM_UNREF(exec_ctx, buffer->elems[i].md); - } - } - } else { - batch->list.head = batch->list.tail = NULL; - } - batch->deadline = buffer->deadline; + *batch = buffer->batch; + grpc_metadata_batch_init(&buffer->batch); } diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.h b/src/core/ext/transport/chttp2/transport/incoming_metadata.h index 1eac6fc150..288c917e65 100644 --- a/src/core/ext/transport/chttp2/transport/incoming_metadata.h +++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.h @@ -37,28 +37,26 @@ #include "src/core/lib/transport/transport.h" typedef struct { - grpc_linked_mdelem *elems; - size_t count; - size_t capacity; - gpr_timespec deadline; - int published; + gpr_arena *arena; + grpc_metadata_batch batch; size_t size; // total size of metadata } grpc_chttp2_incoming_metadata_buffer; /** assumes everything initially zeroed */ void grpc_chttp2_incoming_metadata_buffer_init( - grpc_chttp2_incoming_metadata_buffer *buffer); + grpc_chttp2_incoming_metadata_buffer *buffer, gpr_arena *arena); void grpc_chttp2_incoming_metadata_buffer_destroy( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer); void grpc_chttp2_incoming_metadata_buffer_publish( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, grpc_metadata_batch *batch); -void grpc_chttp2_incoming_metadata_buffer_add( - grpc_chttp2_incoming_metadata_buffer *buffer, grpc_mdelem elem); -void grpc_chttp2_incoming_metadata_buffer_replace_or_add( +grpc_error *grpc_chttp2_incoming_metadata_buffer_add( grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, - grpc_mdelem elem); + grpc_mdelem elem) GRPC_MUST_USE_RESULT; +grpc_error *grpc_chttp2_incoming_metadata_buffer_replace_or_add( + grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer *buffer, + grpc_mdelem elem) GRPC_MUST_USE_RESULT; void grpc_chttp2_incoming_metadata_buffer_set_deadline( grpc_chttp2_incoming_metadata_buffer *buffer, gpr_timespec deadline); diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 075d421dd4..3c56c21599 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -50,6 +50,7 @@ #include "src/core/ext/transport/chttp2/transport/stream_map.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/endpoint.h" +#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/transport/bdp_estimator.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/pid_controller.h" @@ -208,6 +209,12 @@ struct grpc_chttp2_incoming_byte_stream { grpc_closure finished_action; }; +typedef enum { + GRPC_CHTTP2_KEEPALIVE_STATE_WAITING, + GRPC_CHTTP2_KEEPALIVE_STATE_PINGING, + GRPC_CHTTP2_KEEPALIVE_STATE_DYING, +} grpc_chttp2_keepalive_state; + struct grpc_chttp2_transport { grpc_transport base; /* must be first */ gpr_refcount refs; @@ -271,10 +278,6 @@ struct grpc_chttp2_transport { /** data to write next write */ grpc_slice_buffer qbuf; - /** window available to announce to peer */ - int64_t announce_incoming_window; - /** how much window would we like to have for incoming_window */ - uint32_t connection_window_target; /** how much data are we willing to buffer when the WRITE_BUFFER_HINT is set? */ uint32_t write_buffer_size; @@ -328,6 +331,16 @@ struct grpc_chttp2_transport { /** window available for peer to send to us */ int64_t incoming_window; + /** calculating what we should give for incoming window: + we track the total amount of flow control over initial window size + across all streams: this is data that we want to receive right now (it + has an outstanding read) + and the total amount of flow control under initial window size across all + streams: this is data we've read early + we want to adjust incoming_window such that: + incoming_window = total_over - max(bdp - total_under, 0) */ + int64_t stream_total_over_incoming_window; + int64_t stream_total_under_incoming_window; /* deframing */ grpc_chttp2_deframe_transport_state deframe_state; @@ -376,6 +389,28 @@ struct grpc_chttp2_transport { grpc_closure benign_reclaimer_locked; /** destructive cleanup closure */ grpc_closure destructive_reclaimer_locked; + + /* keep-alive ping support */ + /** Closure to initialize a keepalive ping */ + grpc_closure init_keepalive_ping_locked; + /** Closure to run when the keepalive ping is sent */ + grpc_closure start_keepalive_ping_locked; + /** Cousure to run when the keepalive ping ack is received */ + grpc_closure finish_keepalive_ping_locked; + /** Closrue to run when the keepalive ping timeouts */ + grpc_closure keepalive_watchdog_fired_locked; + /** timer to initiate ping events */ + grpc_timer keepalive_ping_timer; + /** watchdog to kill the transport when waiting for the keepalive ping */ + grpc_timer keepalive_watchdog_timer; + /** time duration in between pings */ + gpr_timespec keepalive_time; + /** grace period for a ping to complete before watchdog kicks in */ + gpr_timespec keepalive_timeout; + /** if keepalive pings are allowed when there's no outstanding streams */ + bool keepalive_permit_without_calls; + /** keep-alive state machine state */ + grpc_chttp2_keepalive_state keepalive_state; }; typedef enum { @@ -390,7 +425,7 @@ struct grpc_chttp2_stream { grpc_stream_refcount *refcount; grpc_closure destroy_stream; - void *destroy_stream_arg; + grpc_closure *destroy_stream_arg; grpc_chttp2_stream_link links[STREAM_LIST_COUNT]; uint8_t included[STREAM_LIST_COUNT]; @@ -634,6 +669,44 @@ typedef enum { GRPC_CHTTP2_FLOW_CREDIT_COMMON(phase, dst_context, 0, dst_context, dst_var, \ amount) +#define GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_PREUPDATE( \ + phase, transport, dst_context) \ + if (dst_context->incoming_window_delta < 0) { \ + transport->stream_total_under_incoming_window += \ + dst_context->incoming_window_delta; \ + } else if (dst_context->incoming_window_delta > 0) { \ + transport->stream_total_over_incoming_window -= \ + dst_context->incoming_window_delta; \ + } + +#define GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_POSTUPDATE( \ + phase, transport, dst_context) \ + if (dst_context->incoming_window_delta < 0) { \ + transport->stream_total_under_incoming_window -= \ + dst_context->incoming_window_delta; \ + } else if (dst_context->incoming_window_delta > 0) { \ + transport->stream_total_over_incoming_window += \ + dst_context->incoming_window_delta; \ + } + +#define GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA( \ + phase, transport, dst_context, amount) \ + GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_PREUPDATE(phase, transport, \ + dst_context); \ + GRPC_CHTTP2_FLOW_DEBIT_STREAM(phase, transport, dst_context, \ + incoming_window_delta, amount); \ + GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_POSTUPDATE(phase, transport, \ + dst_context); + +#define GRPC_CHTTP2_FLOW_CREDIT_STREAM_INCOMING_WINDOW_DELTA( \ + phase, transport, dst_context, amount) \ + GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_PREUPDATE(phase, transport, \ + dst_context); \ + GRPC_CHTTP2_FLOW_CREDIT_STREAM(phase, transport, dst_context, \ + incoming_window_delta, amount); \ + GRPC_CHTTP2_FLOW_STREAM_INCOMING_WINDOW_DELTA_POSTUPDATE(phase, transport, \ + dst_context); + #define GRPC_CHTTP2_FLOW_DEBIT_COMMON(phase, transport, id, dst_context, \ dst_var, amount) \ do { \ @@ -752,4 +825,6 @@ void grpc_chttp2_fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s, grpc_error *error); +uint32_t grpc_chttp2_target_incoming_window(grpc_chttp2_transport *t); + #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */ diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index 24bd93067b..7efc8c63c9 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -376,34 +376,47 @@ static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx, return err; } - uint32_t target_incoming_window = GPR_MAX( - t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], - 1024); - GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("parse", t, incoming_window, - incoming_frame_size); - if (t->incoming_window <= target_incoming_window / 2) { - grpc_chttp2_initiate_write(exec_ctx, t, false, "flow_control"); - } - if (s != NULL) { if (incoming_frame_size > s->incoming_window_delta + t->settings[GRPC_ACKED_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]) { - char *msg; - gpr_asprintf(&msg, - "frame of size %d overflows incoming window of %" PRId64, - t->incoming_frame_size, - s->incoming_window_delta + - t->settings[GRPC_ACKED_SETTINGS] - [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); - grpc_error *err = GRPC_ERROR_CREATE(msg); - gpr_free(msg); - return err; + if (incoming_frame_size <= + s->incoming_window_delta + + t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]) { + gpr_log( + GPR_ERROR, + "Incoming frame of size %d exceeds incoming window size of %" PRId64 + ".\n" + "The (un-acked, future) window size would be %" PRId64 + " which is not exceeded.\n" + "This would usually cause a disconnection, but allowing it due to " + "broken HTTP2 implementations in the wild.\n" + "See (for example) https://github.com/netty/netty/issues/6520.", + t->incoming_frame_size, + s->incoming_window_delta + + t->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], + s->incoming_window_delta + + t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); + } else { + char *msg; + gpr_asprintf(&msg, + "frame of size %d overflows incoming window of %" PRId64, + t->incoming_frame_size, + s->incoming_window_delta + + t->settings[GRPC_ACKED_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE]); + grpc_error *err = GRPC_ERROR_CREATE(msg); + gpr_free(msg); + return err; + } } - GRPC_CHTTP2_FLOW_DEBIT_STREAM("parse", t, s, incoming_window_delta, - incoming_frame_size); + GRPC_CHTTP2_FLOW_DEBIT_STREAM_INCOMING_WINDOW_DELTA("parse", t, s, + incoming_frame_size); if ((int64_t)t->settings[GRPC_SENT_SETTINGS] [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] + (int64_t)s->incoming_window_delta - (int64_t)s->announce_window <= @@ -417,6 +430,13 @@ static grpc_error *update_incoming_window(grpc_exec_ctx *exec_ctx, s->received_bytes += incoming_frame_size; } + uint32_t target_incoming_window = grpc_chttp2_target_incoming_window(t); + GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("parse", t, incoming_window, + incoming_frame_size); + if (t->incoming_window <= target_incoming_window / 2) { + grpc_chttp2_initiate_write(exec_ctx, t, false, "flow_control"); + } + return GRPC_ERROR_NONE; } @@ -528,7 +548,14 @@ static void on_initial_header(grpc_exec_ctx *exec_ctx, void *tp, s->seen_error = true; GRPC_MDELEM_UNREF(exec_ctx, md); } else { - grpc_chttp2_incoming_metadata_buffer_add(&s->metadata_buffer[0], md); + grpc_error *error = grpc_chttp2_incoming_metadata_buffer_add( + exec_ctx, &s->metadata_buffer[0], md); + if (error != GRPC_ERROR_NONE) { + grpc_chttp2_cancel_stream(exec_ctx, t, s, error); + grpc_chttp2_parsing_become_skip_parser(exec_ctx, t); + s->seen_error = true; + GRPC_MDELEM_UNREF(exec_ctx, md); + } } } @@ -578,7 +605,14 @@ static void on_trailing_header(grpc_exec_ctx *exec_ctx, void *tp, s->seen_error = true; GRPC_MDELEM_UNREF(exec_ctx, md); } else { - grpc_chttp2_incoming_metadata_buffer_add(&s->metadata_buffer[1], md); + grpc_error *error = grpc_chttp2_incoming_metadata_buffer_add( + exec_ctx, &s->metadata_buffer[1], md); + if (error != GRPC_ERROR_NONE) { + grpc_chttp2_cancel_stream(exec_ctx, t, s, error); + grpc_chttp2_parsing_become_skip_parser(exec_ctx, t); + s->seen_error = true; + GRPC_MDELEM_UNREF(exec_ctx, md); + } } GPR_TIMER_END("on_trailing_header", 0); diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index 05e6f59947..2b9d93cae7 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -152,6 +152,17 @@ static bool stream_ref_if_not_destroyed(gpr_refcount *r) { return true; } +uint32_t grpc_chttp2_target_incoming_window(grpc_chttp2_transport *t) { + return (uint32_t)GPR_MAX( + (int64_t)((1u << 31) - 1), + t->stream_total_over_incoming_window + + (int64_t)GPR_MAX( + t->settings[GRPC_SENT_SETTINGS] + [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE] - + t->stream_total_under_incoming_window, + 0)); +} + bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { grpc_chttp2_stream *s; @@ -310,13 +321,12 @@ bool grpc_chttp2_begin_write(grpc_exec_ctx *exec_ctx, /* if the grpc_chttp2_transport is ready to send a window update, do so here also; 3/4 is a magic number that will likely get tuned soon */ - uint32_t target_incoming_window = GPR_MAX( - t->settings[GRPC_SENT_SETTINGS][GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE], - 1024); + uint32_t target_incoming_window = grpc_chttp2_target_incoming_window(t); uint32_t threshold_to_send_transport_window_update = t->outbuf.count > 0 ? 3 * target_incoming_window / 4 : target_incoming_window / 2; - if (t->incoming_window <= threshold_to_send_transport_window_update) { + if (t->incoming_window <= threshold_to_send_transport_window_update && + t->incoming_window != target_incoming_window) { maybe_initiate_ping(exec_ctx, t, GRPC_CHTTP2_PING_BEFORE_TRANSPORT_WINDOW_UPDATE); uint32_t announced = (uint32_t)GPR_CLAMP( diff --git a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c index 477cf07f45..b6e9e845df 100644 --- a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c +++ b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c @@ -39,6 +39,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include "src/core/ext/transport/cronet/transport/cronet_transport.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/transport/transport_impl.h" @@ -54,16 +55,14 @@ extern grpc_transport_vtable grpc_cronet_vtable; GRPCAPI grpc_channel *grpc_cronet_secure_channel_create( void *engine, const char *target, const grpc_channel_args *args, void *reserved) { - cronet_transport *ct = gpr_malloc(sizeof(cronet_transport)); - ct->base.vtable = &grpc_cronet_vtable; - ct->engine = engine; - ct->host = gpr_malloc(strlen(target) + 1); - strcpy(ct->host, target); gpr_log(GPR_DEBUG, "grpc_create_cronet_transport: stream_engine = %p, target=%s", engine, - ct->host); + target); + + grpc_transport *ct = + grpc_create_cronet_transport(engine, target, args, reserved); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; return grpc_channel_create(&exec_ctx, target, args, - GRPC_CLIENT_DIRECT_CHANNEL, (grpc_transport *)ct); + GRPC_CLIENT_DIRECT_CHANNEL, ct); } diff --git a/src/core/ext/transport/cronet/transport/cronet_api_dummy.c b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c index da6c0b4fbc..0dc6a5152f 100644 --- a/src/core/ext/transport/cronet/transport/cronet_api_dummy.c +++ b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c @@ -80,4 +80,16 @@ void bidirectional_stream_cancel(bidirectional_stream* stream) { GPR_ASSERT(0); } +void bidirectional_stream_disable_auto_flush(bidirectional_stream* stream, + bool disable_auto_flush) { + GPR_ASSERT(0); +} + +void bidirectional_stream_delay_request_headers_until_flush( + bidirectional_stream* stream, bool delay_headers_until_flush) { + GPR_ASSERT(0); +} + +void bidirectional_stream_flush(bidirectional_stream* stream) { GPR_ASSERT(0); } + #endif /* GRPC_COMPILE_WITH_CRONET */ diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index d755b1f147..450d9ab23a 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -54,6 +54,7 @@ #include "third_party/objective_c/Cronet/bidirectional_stream_c.h" #define GRPC_HEADER_SIZE_IN_BYTES 5 +#define GRPC_FLUSH_READ_SIZE 4096 #define CRONET_LOG(...) \ do { \ @@ -88,7 +89,7 @@ enum e_op_id { /* Cronet callbacks. See cronet_c_for_grpc.h for documentation for each. */ -static void on_request_headers_sent(bidirectional_stream *); +static void on_stream_ready(bidirectional_stream *); static void on_response_headers_received( bidirectional_stream *, const bidirectional_stream_header_array *, const char *); @@ -100,7 +101,7 @@ static void on_succeeded(bidirectional_stream *); static void on_failed(bidirectional_stream *, int); static void on_canceled(bidirectional_stream *); static bidirectional_stream_callback cronet_callbacks = { - on_request_headers_sent, + on_stream_ready, on_response_headers_received, on_read_completed, on_write_completed, @@ -114,6 +115,7 @@ struct grpc_cronet_transport { grpc_transport base; /* must be first element in this structure */ stream_engine *engine; char *host; + bool use_packet_coalescing; }; typedef struct grpc_cronet_transport grpc_cronet_transport; @@ -150,8 +152,17 @@ struct write_state { struct op_state { bool state_op_done[OP_NUM_OPS]; bool state_callback_received[OP_NUM_OPS]; + /* A non-zero gRPC status code has been seen */ bool fail_state; + /* Transport is discarding all buffered messages */ bool flush_read; + bool flush_cronet_when_ready; + bool pending_write_for_trailer; + bool pending_send_message; + /* User requested RECV_TRAILING_METADATA */ + bool pending_recv_trailing_metadata; + /* Cronet has not issued a callback of a bidirectional read */ + bool pending_read_from_cronet; grpc_error *cancel_error; /* data structure for storing data coming from server */ struct read_state rs; @@ -173,9 +184,10 @@ struct op_storage { }; struct stream_obj { + gpr_arena *arena; struct op_and_state *oas; grpc_transport_stream_op *curr_op; - grpc_cronet_transport curr_ct; + grpc_cronet_transport *curr_ct; grpc_stream *curr_gs; bidirectional_stream *cbs; bidirectional_stream_header_array header_array; @@ -244,11 +256,35 @@ static const char *op_id_string(enum e_op_id i) { return "UNKNOWN"; } -static void free_read_buffer(stream_obj *s) { +static void null_and_maybe_free_read_buffer(stream_obj *s) { if (s->state.rs.read_buffer && s->state.rs.read_buffer != s->state.rs.grpc_header_bytes) { gpr_free(s->state.rs.read_buffer); - s->state.rs.read_buffer = NULL; + } + s->state.rs.read_buffer = NULL; +} + +static void maybe_flush_read(stream_obj *s) { + /* To enter flush read state (discarding all the buffered messages in + * transport layer), two conditions must be satisfied: 1) non-zero grpc status + * has been received, and 2) an op requesting the status code + * (RECV_TRAILING_METADATA) is issued by the user. (See + * doc/status_ordering.md) */ + /* Whenever the evaluation of any of the two condition is changed, we check + * whether we should enter the flush read state. */ + if (s->state.pending_recv_trailing_metadata && s->state.fail_state) { + if (!s->state.flush_read && !s->state.rs.read_stream_closed) { + CRONET_LOG(GPR_DEBUG, "%p: Flush read", s); + s->state.flush_read = true; + null_and_maybe_free_read_buffer(s); + s->state.rs.read_buffer = gpr_malloc(GRPC_FLUSH_READ_SIZE); + if (!s->state.pending_read_from_cronet) { + CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); + bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, + GRPC_FLUSH_READ_SIZE); + s->state.pending_read_from_cronet = true; + } + } } } @@ -274,6 +310,13 @@ static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op *op) { new_op->next = storage->head; storage->head = new_op; storage->num_pending_ops++; + if (op->send_message) { + s->state.pending_send_message = true; + } + if (op->recv_trailing_metadata) { + s->state.pending_recv_trailing_metadata = true; + maybe_flush_read(s); + } CRONET_LOG(GPR_DEBUG, "adding new op %p. %d in the queue.", new_op, storage->num_pending_ops); gpr_mu_unlock(&s->mu); @@ -360,7 +403,7 @@ static void on_failed(bidirectional_stream *stream, int net_error) { gpr_free(s->state.ws.write_buffer); s->state.ws.write_buffer = NULL; } - free_read_buffer(s); + null_and_maybe_free_read_buffer(s); gpr_mu_unlock(&s->mu); execute_from_storage(s); } @@ -383,7 +426,7 @@ static void on_canceled(bidirectional_stream *stream) { gpr_free(s->state.ws.write_buffer); s->state.ws.write_buffer = NULL; } - free_read_buffer(s); + null_and_maybe_free_read_buffer(s); gpr_mu_unlock(&s->mu); execute_from_storage(s); } @@ -398,7 +441,7 @@ static void on_succeeded(bidirectional_stream *stream) { bidirectional_stream_destroy(s->cbs); s->state.state_callback_received[OP_SUCCEEDED] = true; s->cbs = NULL; - free_read_buffer(s); + null_and_maybe_free_read_buffer(s); gpr_mu_unlock(&s->mu); execute_from_storage(s); } @@ -406,9 +449,10 @@ static void on_succeeded(bidirectional_stream *stream) { /* Cronet callback */ -static void on_request_headers_sent(bidirectional_stream *stream) { - CRONET_LOG(GPR_DEBUG, "W: on_request_headers_sent(%p)", stream); +static void on_stream_ready(bidirectional_stream *stream) { + CRONET_LOG(GPR_DEBUG, "W: on_stream_ready(%p)", stream); stream_obj *s = (stream_obj *)stream->annotation; + grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct; gpr_mu_lock(&s->mu); s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true; s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true; @@ -417,6 +461,14 @@ static void on_request_headers_sent(bidirectional_stream *stream) { gpr_free(s->header_array.headers); s->header_array.headers = NULL; } + /* Send the initial metadata on wire if there is no SEND_MESSAGE or + * SEND_TRAILING_METADATA ops pending */ + if (t->use_packet_coalescing) { + if (s->state.flush_cronet_when_ready) { + CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_flush (%p)", s->cbs); + bidirectional_stream_flush(stream); + } + } gpr_mu_unlock(&s->mu); execute_from_storage(s); } @@ -435,15 +487,18 @@ static void on_response_headers_received( gpr_mu_lock(&s->mu); memset(&s->state.rs.initial_metadata, 0, sizeof(s->state.rs.initial_metadata)); - grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata); + grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata, + s->arena); for (size_t i = 0; i < headers->count; i++) { - grpc_chttp2_incoming_metadata_buffer_add( - &s->state.rs.initial_metadata, - grpc_mdelem_from_slices( - &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string( - headers->headers[i].key)), - grpc_slice_intern( - grpc_slice_from_static_string(headers->headers[i].value)))); + GRPC_LOG_IF_ERROR( + "on_response_headers_received", + grpc_chttp2_incoming_metadata_buffer_add( + &exec_ctx, &s->state.rs.initial_metadata, + grpc_mdelem_from_slices( + &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string( + headers->headers[i].key)), + grpc_slice_intern(grpc_slice_from_static_string( + headers->headers[i].value))))); } s->state.state_callback_received[OP_RECV_INITIAL_METADATA] = true; if (!(s->state.state_op_done[OP_CANCEL_ERROR] || @@ -457,6 +512,7 @@ static void on_response_headers_received( CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, s->state.rs.remaining_bytes); + s->state.pending_read_from_cronet = true; } gpr_mu_unlock(&s->mu); grpc_exec_ctx_finish(&exec_ctx); @@ -488,10 +544,13 @@ static void on_read_completed(bidirectional_stream *stream, char *data, CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data, count); gpr_mu_lock(&s->mu); + s->state.pending_read_from_cronet = false; s->state.state_callback_received[OP_RECV_MESSAGE] = true; if (count > 0 && s->state.flush_read) { CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); - bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, 4096); + bidirectional_stream_read(s->cbs, s->state.rs.read_buffer, + GRPC_FLUSH_READ_SIZE); + s->state.pending_read_from_cronet = true; gpr_mu_unlock(&s->mu); } else if (count > 0) { s->state.rs.received_bytes += count; @@ -502,16 +561,14 @@ static void on_read_completed(bidirectional_stream *stream, char *data, bidirectional_stream_read( s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes, s->state.rs.remaining_bytes); + s->state.pending_read_from_cronet = true; gpr_mu_unlock(&s->mu); } else { gpr_mu_unlock(&s->mu); execute_from_storage(s); } } else { - if (s->state.flush_read) { - gpr_free(s->state.rs.read_buffer); - s->state.rs.read_buffer = NULL; - } + null_and_maybe_free_read_buffer(s); s->state.rs.read_stream_closed = true; gpr_mu_unlock(&s->mu); execute_from_storage(s); @@ -528,25 +585,30 @@ static void on_response_trailers_received( CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream, trailers); stream_obj *s = (stream_obj *)stream->annotation; + grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct; gpr_mu_lock(&s->mu); memset(&s->state.rs.trailing_metadata, 0, sizeof(s->state.rs.trailing_metadata)); s->state.rs.trailing_metadata_valid = false; - grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.trailing_metadata); + grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.trailing_metadata, + s->arena); for (size_t i = 0; i < trailers->count; i++) { CRONET_LOG(GPR_DEBUG, "trailer key=%s, value=%s", trailers->headers[i].key, trailers->headers[i].value); - grpc_chttp2_incoming_metadata_buffer_add( - &s->state.rs.trailing_metadata, - grpc_mdelem_from_slices( - &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string( - trailers->headers[i].key)), - grpc_slice_intern( - grpc_slice_from_static_string(trailers->headers[i].value)))); + GRPC_LOG_IF_ERROR( + "on_response_trailers_received", + grpc_chttp2_incoming_metadata_buffer_add( + &exec_ctx, &s->state.rs.trailing_metadata, + grpc_mdelem_from_slices( + &exec_ctx, grpc_slice_intern(grpc_slice_from_static_string( + trailers->headers[i].key)), + grpc_slice_intern(grpc_slice_from_static_string( + trailers->headers[i].value))))); s->state.rs.trailing_metadata_valid = true; if (0 == strcmp(trailers->headers[i].key, "grpc-status") && 0 != strcmp(trailers->headers[i].value, "0")) { s->state.fail_state = true; + maybe_flush_read(s); } } s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true; @@ -558,6 +620,10 @@ static void on_response_trailers_received( CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs); s->state.state_callback_received[OP_SEND_MESSAGE] = false; bidirectional_stream_write(s->cbs, "", 0, true); + if (t->use_packet_coalescing) { + CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs); + bidirectional_stream_flush(s->cbs); + } s->state.state_op_done[OP_SEND_TRAILING_METADATA] = true; gpr_mu_unlock(&s->mu); @@ -607,7 +673,7 @@ static void convert_metadata_to_cronet_headers( curr = curr->next; num_headers_available++; } - /* Allocate enough memory. It is freed in the on_request_headers_sent callback + /* Allocate enough memory. It is freed in the on_stream_ready callback */ bidirectional_stream_header *headers = (bidirectional_stream_header *)gpr_malloc( @@ -687,8 +753,10 @@ static bool header_has_authority(grpc_linked_mdelem *head) { executed. This is the heart of the state machine. */ static bool op_can_be_run(grpc_transport_stream_op *curr_op, - struct op_state *stream_state, - struct op_state *op_state, enum e_op_id op_id) { + struct stream_obj *s, struct op_state *op_state, + enum e_op_id op_id) { + struct op_state *stream_state = &s->state; + grpc_cronet_transport *t = s->curr_ct; bool result = true; /* When call is canceled, every op can be run, except under following conditions @@ -755,12 +823,14 @@ static bool op_can_be_run(grpc_transport_stream_op *curr_op, else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) result = false; /* we haven't sent message yet */ - else if (curr_op->send_message && + else if (stream_state->pending_send_message && !stream_state->state_op_done[OP_SEND_MESSAGE]) result = false; /* we haven't got on_write_completed for the send yet */ else if (stream_state->state_op_done[OP_SEND_MESSAGE] && - !stream_state->state_callback_received[OP_SEND_MESSAGE]) + !stream_state->state_callback_received[OP_SEND_MESSAGE] && + !(t->use_packet_coalescing && + stream_state->pending_write_for_trailer)) result = false; } else if (op_id == OP_CANCEL_ERROR) { /* already executed */ @@ -833,24 +903,28 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, struct op_and_state *oas) { grpc_transport_stream_op *stream_op = &oas->op; struct stream_obj *s = oas->s; + grpc_cronet_transport *t = (grpc_cronet_transport *)s->curr_ct; struct op_state *stream_state = &s->state; enum e_op_result result = NO_ACTION_POSSIBLE; if (stream_op->send_initial_metadata && - op_can_be_run(stream_op, stream_state, &oas->state, - OP_SEND_INITIAL_METADATA)) { + op_can_be_run(stream_op, s, &oas->state, OP_SEND_INITIAL_METADATA)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas); /* Start new cronet stream. It is destroyed in on_succeeded, on_canceled, * on_failed */ GPR_ASSERT(s->cbs == NULL); GPR_ASSERT(!stream_state->state_op_done[OP_SEND_INITIAL_METADATA]); - s->cbs = bidirectional_stream_create(s->curr_ct.engine, s->curr_gs, - &cronet_callbacks); + s->cbs = + bidirectional_stream_create(t->engine, s->curr_gs, &cronet_callbacks); CRONET_LOG(GPR_DEBUG, "%p = bidirectional_stream_create()", s->cbs); + if (t->use_packet_coalescing) { + bidirectional_stream_disable_auto_flush(s->cbs, true); + bidirectional_stream_delay_request_headers_until_flush(s->cbs, true); + } char *url = NULL; const char *method = "POST"; s->header_array.headers = NULL; convert_metadata_to_cronet_headers( - stream_op->send_initial_metadata->list.head, s->curr_ct.host, &url, + stream_op->send_initial_metadata->list.head, t->host, &url, &s->header_array.headers, &s->header_array.count, &method); s->header_array.capacity = s->header_array.count; CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs, url); @@ -862,30 +936,16 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, gpr_free((void *)s->header_array.headers[header_index].value); } stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true; - result = ACTION_TAKEN_WITH_CALLBACK; - } else if (stream_op->recv_initial_metadata && - op_can_be_run(stream_op, stream_state, &oas->state, - OP_RECV_INITIAL_METADATA)) { - CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas); - if (stream_state->state_op_done[OP_CANCEL_ERROR]) { - grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, - GRPC_ERROR_NONE); - } else if (stream_state->state_callback_received[OP_FAILED]) { - grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, - GRPC_ERROR_NONE); - } else { - grpc_chttp2_incoming_metadata_buffer_publish( - exec_ctx, &oas->s->state.rs.initial_metadata, - stream_op->recv_initial_metadata); - grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, - GRPC_ERROR_NONE); + if (t->use_packet_coalescing) { + if (!stream_op->send_message && !stream_op->send_trailing_metadata) { + s->state.flush_cronet_when_ready = true; + } } - stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true; - result = ACTION_TAKEN_NO_CALLBACK; + result = ACTION_TAKEN_WITH_CALLBACK; } else if (stream_op->send_message && - op_can_be_run(stream_op, stream_state, &oas->state, - OP_SEND_MESSAGE)) { + op_can_be_run(stream_op, s, &oas->state, OP_SEND_MESSAGE)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas); + stream_state->pending_send_message = false; if (stream_state->state_callback_received[OP_FAILED]) { result = NO_ACTION_POSSIBLE; CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed."); @@ -916,16 +976,63 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->state_callback_received[OP_SEND_MESSAGE] = false; bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer, (int)write_buffer_size, false); - result = ACTION_TAKEN_WITH_CALLBACK; + if (t->use_packet_coalescing) { + if (!stream_op->send_trailing_metadata) { + CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs); + bidirectional_stream_flush(s->cbs); + result = ACTION_TAKEN_WITH_CALLBACK; + } else { + stream_state->pending_write_for_trailer = true; + result = ACTION_TAKEN_NO_CALLBACK; + } + } else { + result = ACTION_TAKEN_WITH_CALLBACK; + } } else { result = NO_ACTION_POSSIBLE; } } stream_state->state_op_done[OP_SEND_MESSAGE] = true; oas->state.state_op_done[OP_SEND_MESSAGE] = true; + } else if (stream_op->send_trailing_metadata && + op_can_be_run(stream_op, s, &oas->state, + OP_SEND_TRAILING_METADATA)) { + CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas); + if (stream_state->state_callback_received[OP_FAILED]) { + result = NO_ACTION_POSSIBLE; + CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed."); + } else { + CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs); + stream_state->state_callback_received[OP_SEND_MESSAGE] = false; + bidirectional_stream_write(s->cbs, "", 0, true); + if (t->use_packet_coalescing) { + CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs); + bidirectional_stream_flush(s->cbs); + } + result = ACTION_TAKEN_WITH_CALLBACK; + } + stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true; + } else if (stream_op->recv_initial_metadata && + op_can_be_run(stream_op, s, &oas->state, + OP_RECV_INITIAL_METADATA)) { + CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas); + if (stream_state->state_op_done[OP_CANCEL_ERROR]) { + grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, + GRPC_ERROR_NONE); + } else if (stream_state->state_callback_received[OP_FAILED]) { + grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, + GRPC_ERROR_NONE); + } else { + grpc_chttp2_incoming_metadata_buffer_publish( + exec_ctx, &oas->s->state.rs.initial_metadata, + stream_op->recv_initial_metadata); + grpc_closure_sched(exec_ctx, stream_op->recv_initial_metadata_ready, + GRPC_ERROR_NONE); + } + stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true; + result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_op->recv_message && - op_can_be_run(stream_op, stream_state, &oas->state, - OP_RECV_MESSAGE)) { + op_can_be_run(stream_op, s, &oas->state, OP_RECV_MESSAGE)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { CRONET_LOG(GPR_DEBUG, "Stream is cancelled."); @@ -947,6 +1054,13 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; result = ACTION_TAKEN_NO_CALLBACK; + } else if (stream_state->flush_read) { + CRONET_LOG(GPR_DEBUG, "flush read"); + grpc_closure_sched(exec_ctx, stream_op->recv_message_ready, + GRPC_ERROR_NONE); + stream_state->state_op_done[OP_RECV_MESSAGE] = true; + oas->state.state_op_done[OP_RECV_MESSAGE] = true; + result = ACTION_TAKEN_NO_CALLBACK; } else if (stream_state->rs.length_field_received == false) { if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES && stream_state->rs.remaining_bytes == 0) { @@ -967,6 +1081,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, true; /* Indicates that at least one read request has been made */ bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, stream_state->rs.remaining_bytes); + stream_state->pending_read_from_cronet = true; result = ACTION_TAKEN_WITH_CALLBACK; } else { stream_state->rs.remaining_bytes = 0; @@ -980,6 +1095,18 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, GRPC_ERROR_NONE); stream_state->state_op_done[OP_RECV_MESSAGE] = true; oas->state.state_op_done[OP_RECV_MESSAGE] = true; + + /* Extra read to trigger on_succeed */ + stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes; + stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES; + stream_state->rs.received_bytes = 0; + stream_state->rs.length_field_received = false; + CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); + stream_state->state_op_done[OP_READ_REQ_MADE] = + true; /* Indicates that at least one read request has been made */ + bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, + stream_state->rs.remaining_bytes); + stream_state->pending_read_from_cronet = true; result = ACTION_TAKEN_NO_CALLBACK; } } else if (stream_state->rs.remaining_bytes == 0) { @@ -992,6 +1119,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, true; /* Indicates that at least one read request has been made */ bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, stream_state->rs.remaining_bytes); + stream_state->pending_read_from_cronet = true; result = ACTION_TAKEN_WITH_CALLBACK; } else { result = NO_ACTION_POSSIBLE; @@ -1003,7 +1131,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, uint8_t *dst_p = GRPC_SLICE_START_PTR(read_data_slice); memcpy(dst_p, stream_state->rs.read_buffer, (size_t)stream_state->rs.length_field); - free_read_buffer(s); + null_and_maybe_free_read_buffer(s); grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer); grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer, read_data_slice); @@ -1024,10 +1152,11 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs); bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer, stream_state->rs.remaining_bytes); + stream_state->pending_read_from_cronet = true; result = ACTION_TAKEN_NO_CALLBACK; } } else if (stream_op->recv_trailing_metadata && - op_can_be_run(stream_op, stream_state, &oas->state, + op_can_be_run(stream_op, s, &oas->state, OP_RECV_TRAILING_METADATA)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas); if (oas->s->state.rs.trailing_metadata_valid) { @@ -1038,23 +1167,8 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, } stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true; result = ACTION_TAKEN_NO_CALLBACK; - } else if (stream_op->send_trailing_metadata && - op_can_be_run(stream_op, stream_state, &oas->state, - OP_SEND_TRAILING_METADATA)) { - CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas); - if (stream_state->state_callback_received[OP_FAILED]) { - result = NO_ACTION_POSSIBLE; - CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed."); - } else { - CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs); - stream_state->state_callback_received[OP_SEND_MESSAGE] = false; - bidirectional_stream_write(s->cbs, "", 0, true); - result = ACTION_TAKEN_WITH_CALLBACK; - } - stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true; } else if (stream_op->cancel_error && - op_can_be_run(stream_op, stream_state, &oas->state, - OP_CANCEL_ERROR)) { + op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas); CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs); if (s->cbs) { @@ -1068,8 +1182,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, stream_state->cancel_error = GRPC_ERROR_REF(stream_op->cancel_error); } } else if (stream_op->on_complete && - op_can_be_run(stream_op, stream_state, &oas->state, - OP_ON_COMPLETE)) { + op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) { CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas); if (stream_state->state_op_done[OP_CANCEL_ERROR]) { grpc_closure_sched(exec_ctx, stream_op->on_complete, @@ -1097,15 +1210,6 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, make a note */ if (stream_op->recv_message) stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true; - } else if (stream_state->fail_state && !stream_state->flush_read) { - CRONET_LOG(GPR_DEBUG, "running: %p flush read", oas); - if (stream_state->rs.read_buffer && - stream_state->rs.read_buffer != stream_state->rs.grpc_header_bytes) { - gpr_free(stream_state->rs.read_buffer); - stream_state->rs.read_buffer = NULL; - } - stream_state->rs.read_buffer = gpr_malloc(4096); - stream_state->flush_read = true; } else { result = NO_ACTION_POSSIBLE; } @@ -1118,7 +1222,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_stream_refcount *refcount, - const void *server_data) { + const void *server_data, gpr_arena *arena) { stream_obj *s = (stream_obj *)gs; memset(&s->storage, 0, sizeof(s->storage)); s->storage.head = NULL; @@ -1133,6 +1237,15 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, sizeof(s->state.state_callback_received)); s->state.fail_state = s->state.flush_read = false; s->state.cancel_error = NULL; + s->state.flush_cronet_when_ready = s->state.pending_write_for_trailer = false; + s->state.pending_send_message = false; + s->state.pending_recv_trailing_metadata = false; + s->state.pending_read_from_cronet = false; + + s->curr_gs = gs; + s->curr_ct = (grpc_cronet_transport *)gt; + s->arena = arena; + gpr_mu_init(&s->mu); return 0; } @@ -1147,40 +1260,33 @@ static void set_pollset_set_do_nothing(grpc_exec_ctx *exec_ctx, static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_stream *gs, grpc_transport_stream_op *op) { CRONET_LOG(GPR_DEBUG, "perform_stream_op"); - stream_obj *s = (stream_obj *)gs; - s->curr_gs = gs; - memcpy(&s->curr_ct, gt, sizeof(grpc_cronet_transport)); - add_to_storage(s, op); if (op->send_initial_metadata && header_has_authority(op->send_initial_metadata->list.head)) { /* Cronet does not support :authority header field. We cancel the call when - this field is present in metadata */ - bidirectional_stream_header_array header_array; - bidirectional_stream_header *header; - bidirectional_stream cbs; - CRONET_LOG(GPR_DEBUG, - ":authority header is provided but not supported;" - " cancel operations"); - /* Notify application that operation is cancelled by forging trailers */ - header_array.count = 1; - header_array.capacity = 1; - header_array.headers = gpr_malloc(sizeof(bidirectional_stream_header)); - header = (bidirectional_stream_header *)header_array.headers; - header->key = "grpc-status"; - header->value = "1"; /* Return status GRPC_STATUS_CANCELLED */ - cbs.annotation = (void *)s; - s->state.state_op_done[OP_CANCEL_ERROR] = true; - on_response_trailers_received(&cbs, &header_array); - gpr_free(header_array.headers); - } else { - execute_from_storage(s); + this field is present in metadata */ + if (op->recv_initial_metadata_ready) { + grpc_closure_sched(exec_ctx, op->recv_initial_metadata_ready, + GRPC_ERROR_CANCELLED); + } + if (op->recv_message_ready) { + grpc_closure_sched(exec_ctx, op->recv_message_ready, + GRPC_ERROR_CANCELLED); + } + grpc_closure_sched(exec_ctx, op->on_complete, GRPC_ERROR_CANCELLED); + return; } + stream_obj *s = (stream_obj *)gs; + add_to_storage(s, op); + execute_from_storage(s); } static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt, - grpc_stream *gs, void *and_free_memory) { + grpc_stream *gs, + grpc_closure *then_schedule_closure) { stream_obj *s = (stream_obj *)gs; + null_and_maybe_free_read_buffer(s); GRPC_ERROR_UNREF(s->state.cancel_error); + grpc_closure_sched(exec_ctx, then_schedule_closure, GRPC_ERROR_NONE); } static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {} @@ -1197,14 +1303,58 @@ static grpc_endpoint *get_endpoint(grpc_exec_ctx *exec_ctx, static void perform_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, grpc_transport_op *op) {} -const grpc_transport_vtable grpc_cronet_vtable = {sizeof(stream_obj), - "cronet_http", - init_stream, - set_pollset_do_nothing, - set_pollset_set_do_nothing, - perform_stream_op, - perform_op, - destroy_stream, - destroy_transport, - get_peer, - get_endpoint}; +static const grpc_transport_vtable grpc_cronet_vtable = { + sizeof(stream_obj), + "cronet_http", + init_stream, + set_pollset_do_nothing, + set_pollset_set_do_nothing, + perform_stream_op, + perform_op, + destroy_stream, + destroy_transport, + get_peer, + get_endpoint}; + +grpc_transport *grpc_create_cronet_transport(void *engine, const char *target, + const grpc_channel_args *args, + void *reserved) { + grpc_cronet_transport *ct = gpr_malloc(sizeof(grpc_cronet_transport)); + if (!ct) { + goto error; + } + ct->base.vtable = &grpc_cronet_vtable; + ct->engine = engine; + ct->host = gpr_malloc(strlen(target) + 1); + if (!ct->host) { + goto error; + } + strcpy(ct->host, target); + + ct->use_packet_coalescing = true; + if (args) { + for (size_t i = 0; i < args->num_args; i++) { + if (0 == + strcmp(args->args[i].key, GRPC_ARG_USE_CRONET_PACKET_COALESCING)) { + if (args->args[i].type != GRPC_ARG_INTEGER) { + gpr_log(GPR_ERROR, "%s ignored: it must be an integer", + GRPC_ARG_USE_CRONET_PACKET_COALESCING); + } else { + ct->use_packet_coalescing = (args->args[i].value.integer != 0); + } + } + } + } + + return &ct->base; + +error: + if (ct) { + if (ct->host) { + gpr_free(ct->host); + } + gpr_free(ct); + } + + return NULL; +} diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.h b/src/core/ext/transport/cronet/transport/cronet_transport.h new file mode 100644 index 0000000000..169ce31fd7 --- /dev/null +++ b/src/core/ext/transport/cronet/transport/cronet_transport.h @@ -0,0 +1,43 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_EXT_TRANSPORT_CRONET_TRANSPORT_CRONET_TRANSPORT_H +#define GRPC_CORE_EXT_TRANSPORT_CRONET_TRANSPORT_CRONET_TRANSPORT_H + +#include "src/core/lib/transport/transport.h" + +grpc_transport *grpc_create_cronet_transport(void *engine, const char *target, + const grpc_channel_args *args, + void *reserved); + +#endif /* GRPC_CORE_EXT_TRANSPORT_CRONET_TRANSPORT_CRONET_TRANSPORT_H */ |