diff options
Diffstat (limited to 'src/core')
65 files changed, 1105 insertions, 754 deletions
diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c index 5a283de468..397dbc40a8 100644 --- a/src/core/ext/census/grpc_filter.c +++ b/src/core/ext/census/grpc_filter.c @@ -191,6 +191,7 @@ const grpc_channel_filter grpc_client_census_filter = { init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + grpc_channel_next_get_info, "census-client"}; const grpc_channel_filter grpc_server_census_filter = { @@ -204,4 +205,5 @@ const grpc_channel_filter grpc_server_census_filter = { init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + grpc_channel_next_get_info, "census-server"}; diff --git a/src/core/ext/client_channel/client_channel.c b/src/core/ext/client_channel/client_channel.c index ff773ac334..7954fcfb8b 100644 --- a/src/core/ext/client_channel/client_channel.c +++ b/src/core/ext/client_channel/client_channel.c @@ -39,6 +39,7 @@ #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> #include <grpc/support/sync.h> #include <grpc/support/useful.h> @@ -91,8 +92,12 @@ static int method_parameters_cmp(void *value1, void *value2) { return 0; } +static void method_parameters_del(grpc_exec_ctx *exec_ctx, void *p) { + gpr_free(p); +} + static const grpc_mdstr_hash_table_vtable method_parameters_vtable = { - gpr_free, method_parameters_copy, method_parameters_cmp}; + method_parameters_del, method_parameters_copy, method_parameters_cmp}; static void *method_config_convert_value( const grpc_method_config *method_config) { @@ -123,6 +128,7 @@ typedef struct client_channel_channel_data { /** mutex protecting all variables below in this data structure */ gpr_mu mu; /** currently active load balancer */ + char *lb_policy_name; grpc_lb_policy *lb_policy; /** maps method names to method_parameters structs */ grpc_mdstr_hash_table *method_params_table; @@ -223,6 +229,7 @@ static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand, static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { channel_data *chand = arg; + char *lb_policy_name = NULL; grpc_lb_policy *lb_policy = NULL; grpc_lb_policy *old_lb_policy; grpc_mdstr_hash_table *method_params_table = NULL; @@ -236,7 +243,6 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, lb_policy_args.client_channel_factory = chand->client_channel_factory; // Find LB policy name. - const char *lb_policy_name = NULL; const grpc_arg *channel_arg = grpc_channel_args_find(lb_policy_args.args, GRPC_ARG_LB_POLICY_NAME); if (channel_arg != NULL) { @@ -286,10 +292,14 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, if (channel_arg != NULL) { GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER); method_params_table = grpc_method_config_table_convert( - (grpc_method_config_table *)channel_arg->value.pointer.p, + exec_ctx, (grpc_method_config_table *)channel_arg->value.pointer.p, method_config_convert_value, &method_parameters_vtable); } - grpc_channel_args_destroy(chand->resolver_result); + // Before we clean up, save a copy of lb_policy_name, since it might + // be pointing to data inside chand->resolver_result. + // The copy will be saved in chand->lb_policy_name below. + lb_policy_name = gpr_strdup(lb_policy_name); + grpc_channel_args_destroy(exec_ctx, chand->resolver_result); chand->resolver_result = NULL; } @@ -299,10 +309,14 @@ static void on_resolver_result_changed(grpc_exec_ctx *exec_ctx, void *arg, } gpr_mu_lock(&chand->mu); + if (lb_policy_name != NULL) { + gpr_free(chand->lb_policy_name); + chand->lb_policy_name = lb_policy_name; + } old_lb_policy = chand->lb_policy; chand->lb_policy = lb_policy; if (chand->method_params_table != NULL) { - grpc_mdstr_hash_table_unref(chand->method_params_table); + grpc_mdstr_hash_table_unref(exec_ctx, chand->method_params_table); } chand->method_params_table = method_params_table; if (lb_policy != NULL) { @@ -426,6 +440,19 @@ static void cc_start_transport_op(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(&chand->mu); } +static void cc_get_channel_info(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + const grpc_channel_info *info) { + channel_data *chand = elem->channel_data; + gpr_mu_lock(&chand->mu); + if (info->lb_policy_name != NULL) { + *info->lb_policy_name = chand->lb_policy_name == NULL + ? NULL + : gpr_strdup(chand->lb_policy_name); + } + gpr_mu_unlock(&chand->mu); +} + /* Constructor for channel_data */ static void cc_init_channel_elem(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, @@ -465,8 +492,9 @@ static void cc_destroy_channel_elem(grpc_exec_ctx *exec_ctx, chand->interested_parties); GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel"); } + gpr_free(chand->lb_policy_name); if (chand->method_params_table != NULL) { - grpc_mdstr_hash_table_unref(chand->method_params_table); + grpc_mdstr_hash_table_unref(exec_ctx, chand->method_params_table); } grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker); grpc_pollset_set_destroy(chand->interested_parties); @@ -906,8 +934,8 @@ static void read_service_config(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_unlock(&chand->mu); // If the method config table was present, use it. if (method_params_table != NULL) { - const method_parameters *method_params = - grpc_method_config_table_get(method_params_table, calld->path); + const method_parameters *method_params = grpc_method_config_table_get( + exec_ctx, method_params_table, calld->path); if (method_params != NULL) { const bool have_method_timeout = gpr_time_cmp(method_params->timeout, gpr_time_0(GPR_TIMESPAN)) != 0; @@ -930,7 +958,7 @@ static void read_service_config(grpc_exec_ctx *exec_ctx, void *arg, gpr_mu_unlock(&calld->mu); } } - grpc_mdstr_hash_table_unref(method_params_table); + grpc_mdstr_hash_table_unref(exec_ctx, method_params_table); } } GRPC_CALL_STACK_UNREF(exec_ctx, calld->owning_call, "read_service_config"); @@ -971,8 +999,8 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_mdstr_hash_table *method_params_table = grpc_mdstr_hash_table_ref(chand->method_params_table); gpr_mu_unlock(&chand->mu); - method_parameters *method_params = - grpc_method_config_table_get(method_params_table, args->path); + method_parameters *method_params = grpc_method_config_table_get( + exec_ctx, method_params_table, args->path); if (method_params != NULL) { if (gpr_time_cmp(method_params->timeout, gpr_time_0(GPR_CLOCK_MONOTONIC)) != 0) { @@ -985,7 +1013,7 @@ static grpc_error *cc_init_call_elem(grpc_exec_ctx *exec_ctx, method_params->wait_for_ready; } } - grpc_mdstr_hash_table_unref(method_params_table); + grpc_mdstr_hash_table_unref(exec_ctx, method_params_table); } else { gpr_mu_unlock(&chand->mu); } @@ -1013,7 +1041,7 @@ static void cc_destroy_call_elem(grpc_exec_ctx *exec_ctx, void *and_free_memory) { call_data *calld = elem->call_data; grpc_deadline_state_destroy(exec_ctx, elem); - GRPC_MDSTR_UNREF(calld->path); + GRPC_MDSTR_UNREF(exec_ctx, calld->path); GRPC_ERROR_UNREF(calld->cancel_error); grpc_subchannel_call *call = GET_CALL(calld); if (call != NULL && call != CANCELLED_CALL) { @@ -1052,6 +1080,7 @@ const grpc_channel_filter grpc_client_channel_filter = { cc_init_channel_elem, cc_destroy_channel_elem, cc_get_peer, + cc_get_channel_info, "client-channel", }; diff --git a/src/core/ext/client_channel/client_channel_plugin.c b/src/core/ext/client_channel/client_channel_plugin.c index a3e5079843..988b7a1d5c 100644 --- a/src/core/ext/client_channel/client_channel_plugin.c +++ b/src/core/ext/client_channel/client_channel_plugin.c @@ -43,12 +43,14 @@ #include "src/core/ext/client_channel/subchannel_index.h" #include "src/core/lib/surface/channel_init.h" -static bool append_filter(grpc_channel_stack_builder *builder, void *arg) { +static bool append_filter(grpc_exec_ctx *exec_ctx, + grpc_channel_stack_builder *builder, void *arg) { return grpc_channel_stack_builder_append_filter( builder, (const grpc_channel_filter *)arg, NULL, NULL); } -static bool set_default_host_if_unset(grpc_channel_stack_builder *builder, +static bool set_default_host_if_unset(grpc_exec_ctx *exec_ctx, + grpc_channel_stack_builder *builder, void *unused) { const grpc_channel_args *args = grpc_channel_stack_builder_get_channel_arguments(builder); @@ -66,9 +68,10 @@ static bool set_default_host_if_unset(grpc_channel_stack_builder *builder, arg.key = GRPC_ARG_DEFAULT_AUTHORITY; arg.value.string = default_authority; grpc_channel_args *new_args = grpc_channel_args_copy_and_add(args, &arg, 1); - grpc_channel_stack_builder_set_channel_arguments(builder, new_args); + grpc_channel_stack_builder_set_channel_arguments(exec_ctx, builder, + new_args); gpr_free(default_authority); - grpc_channel_args_destroy(new_args); + grpc_channel_args_destroy(exec_ctx, new_args); } return true; } diff --git a/src/core/ext/client_channel/http_connect_handshaker.c b/src/core/ext/client_channel/http_connect_handshaker.c index 6ce1953209..b1d3eb56a5 100644 --- a/src/core/ext/client_channel/http_connect_handshaker.c +++ b/src/core/ext/client_channel/http_connect_handshaker.c @@ -44,6 +44,7 @@ #include "src/core/lib/http/format_request.h" #include "src/core/lib/http/parser.h" #include "src/core/lib/iomgr/timer.h" +#include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/support/env.h" typedef struct http_connect_handshaker { @@ -72,7 +73,8 @@ typedef struct http_connect_handshaker { } http_connect_handshaker; // Unref and clean up handshaker. -static void http_connect_handshaker_unref(http_connect_handshaker* handshaker) { +static void http_connect_handshaker_unref(grpc_exec_ctx* exec_ctx, + http_connect_handshaker* handshaker) { if (gpr_unref(&handshaker->refcount)) { gpr_free(handshaker->proxy_server); gpr_free(handshaker->server_name); @@ -89,7 +91,7 @@ static void on_timeout(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { if (error == GRPC_ERROR_NONE) { // Timer fired, rather than being cancelled. grpc_endpoint_shutdown(exec_ctx, handshaker->endpoint); } - http_connect_handshaker_unref(handshaker); + http_connect_handshaker_unref(exec_ctx, handshaker); } // Callback invoked when finished writing HTTP CONNECT request. @@ -159,7 +161,8 @@ static void on_read_done(grpc_exec_ctx* exec_ctx, void* arg, // complete (e.g., handling chunked transfer encoding or looking // at the Content-Length: header). if (handshaker->http_parser.state != GRPC_HTTP_BODY) { - grpc_slice_buffer_reset_and_unref_internal(exec_ctx, handshaker->read_buffer); + grpc_slice_buffer_reset_and_unref_internal(exec_ctx, + handshaker->read_buffer); grpc_endpoint_read(exec_ctx, handshaker->endpoint, handshaker->read_buffer, &handshaker->response_read_closure); return; @@ -186,7 +189,7 @@ done: static void http_connect_handshaker_destroy(grpc_exec_ctx* exec_ctx, grpc_handshaker* handshaker_in) { http_connect_handshaker* handshaker = (http_connect_handshaker*)handshaker_in; - http_connect_handshaker_unref(handshaker); + http_connect_handshaker_unref(exec_ctx, handshaker); } static void http_connect_handshaker_shutdown(grpc_exec_ctx* exec_ctx, diff --git a/src/core/ext/client_channel/lb_policy.h b/src/core/ext/client_channel/lb_policy.h index 54ad779792..120c641edc 100644 --- a/src/core/ext/client_channel/lb_policy.h +++ b/src/core/ext/client_channel/lb_policy.h @@ -109,10 +109,16 @@ struct grpc_lb_policy_vtable { /*#define GRPC_LB_POLICY_REFCOUNT_DEBUG*/ #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG + +/* Strong references: the policy will shutdown when they reach zero */ #define GRPC_LB_POLICY_REF(p, r) \ grpc_lb_policy_ref((p), __FILE__, __LINE__, (r)) #define GRPC_LB_POLICY_UNREF(exec_ctx, p, r) \ grpc_lb_policy_unref((exec_ctx), (p), __FILE__, __LINE__, (r)) + +/* Weak references: they don't prevent the shutdown of the LB policy. When no + * strong references are left but there are still weak ones, shutdown is called. + * Once the weak reference also reaches zero, the LB policy is destroyed. */ #define GRPC_LB_POLICY_WEAK_REF(p, r) \ grpc_lb_policy_weak_ref((p), __FILE__, __LINE__, (r)) #define GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, p, r) \ diff --git a/src/core/ext/client_channel/lb_policy_factory.c b/src/core/ext/client_channel/lb_policy_factory.c index 8a474c8818..7af9bb0411 100644 --- a/src/core/ext/client_channel/lb_policy_factory.c +++ b/src/core/ext/client_channel/lb_policy_factory.c @@ -112,11 +112,13 @@ int grpc_lb_addresses_cmp(const grpc_lb_addresses* addresses1, return 0; } -void grpc_lb_addresses_destroy(grpc_lb_addresses* addresses) { +void grpc_lb_addresses_destroy(grpc_exec_ctx* exec_ctx, + grpc_lb_addresses* addresses) { for (size_t i = 0; i < addresses->num_addresses; ++i) { gpr_free(addresses->addresses[i].balancer_name); if (addresses->addresses[i].user_data != NULL) { - addresses->user_data_vtable->destroy(addresses->addresses[i].user_data); + addresses->user_data_vtable->destroy(exec_ctx, + addresses->addresses[i].user_data); } } gpr_free(addresses->addresses); @@ -126,8 +128,8 @@ void grpc_lb_addresses_destroy(grpc_lb_addresses* addresses) { static void* lb_addresses_copy(void* addresses) { return grpc_lb_addresses_copy(addresses); } -static void lb_addresses_destroy(void* addresses) { - grpc_lb_addresses_destroy(addresses); +static void lb_addresses_destroy(grpc_exec_ctx* exec_ctx, void* addresses) { + grpc_lb_addresses_destroy(exec_ctx, addresses); } static int lb_addresses_cmp(void* addresses1, void* addresses2) { return grpc_lb_addresses_cmp(addresses1, addresses2); diff --git a/src/core/ext/client_channel/lb_policy_factory.h b/src/core/ext/client_channel/lb_policy_factory.h index e2b8080a32..ceee3efbc2 100644 --- a/src/core/ext/client_channel/lb_policy_factory.h +++ b/src/core/ext/client_channel/lb_policy_factory.h @@ -61,7 +61,7 @@ typedef struct grpc_lb_address { typedef struct grpc_lb_user_data_vtable { void *(*copy)(void *); - void (*destroy)(void *); + void (*destroy)(grpc_exec_ctx *exec_ctx, void *); int (*cmp)(void *, void *); } grpc_lb_user_data_vtable; @@ -93,7 +93,8 @@ int grpc_lb_addresses_cmp(const grpc_lb_addresses *addresses1, const grpc_lb_addresses *addresses2); /** Destroys \a addresses. */ -void grpc_lb_addresses_destroy(grpc_lb_addresses *addresses); +void grpc_lb_addresses_destroy(grpc_exec_ctx *exec_ctx, + grpc_lb_addresses *addresses); /** Returns a channel arg containing \a addresses. */ grpc_arg grpc_lb_addresses_create_channel_arg( diff --git a/src/core/ext/client_channel/resolver_factory.c b/src/core/ext/client_channel/resolver_factory.c index 7c3d644257..00bbb92dd0 100644 --- a/src/core/ext/client_channel/resolver_factory.c +++ b/src/core/ext/client_channel/resolver_factory.c @@ -43,9 +43,10 @@ void grpc_resolver_factory_unref(grpc_resolver_factory* factory) { /** Create a resolver instance for a name */ grpc_resolver* grpc_resolver_factory_create_resolver( - grpc_resolver_factory* factory, grpc_resolver_args* args) { + grpc_exec_ctx* exec_ctx, grpc_resolver_factory* factory, + grpc_resolver_args* args) { if (factory == NULL) return NULL; - return factory->vtable->create_resolver(factory, args); + return factory->vtable->create_resolver(exec_ctx, factory, args); } char* grpc_resolver_factory_get_default_authority( diff --git a/src/core/ext/client_channel/resolver_registry.c b/src/core/ext/client_channel/resolver_registry.c index d0f0fc3f33..9feba14e58 100644 --- a/src/core/ext/client_channel/resolver_registry.c +++ b/src/core/ext/client_channel/resolver_registry.c @@ -131,7 +131,7 @@ static grpc_resolver_factory *resolve_factory(const char *target, return factory; } -grpc_resolver *grpc_resolver_create(const char *target, +grpc_resolver *grpc_resolver_create(grpc_exec_ctx *exec_ctx, const char *target, const grpc_channel_args *args) { grpc_uri *uri = NULL; grpc_resolver_factory *factory = resolve_factory(target, &uri); @@ -140,7 +140,8 @@ grpc_resolver *grpc_resolver_create(const char *target, memset(&resolver_args, 0, sizeof(resolver_args)); resolver_args.uri = uri; resolver_args.args = args; - resolver = grpc_resolver_factory_create_resolver(factory, &resolver_args); + resolver = + grpc_resolver_factory_create_resolver(exec_ctx, factory, &resolver_args); grpc_uri_destroy(uri); return resolver; } diff --git a/src/core/ext/client_channel/resolver_registry.h b/src/core/ext/client_channel/resolver_registry.h index 2a95a669f0..4f6aba0ba5 100644 --- a/src/core/ext/client_channel/resolver_registry.h +++ b/src/core/ext/client_channel/resolver_registry.h @@ -60,7 +60,7 @@ void grpc_register_resolver_type(grpc_resolver_factory *factory); If a resolver factory was not found, return NULL. \a args is a set of channel arguments to be included in the result (typically the set of arguments passed in from the client API). */ -grpc_resolver *grpc_resolver_create(const char *target, +grpc_resolver *grpc_resolver_create(grpc_exec_ctx *exec_ctx, const char *target, const grpc_channel_args *args); /** Find a resolver factory given a name and return an (owned-by-the-caller) diff --git a/src/core/ext/client_channel/subchannel.c b/src/core/ext/client_channel/subchannel.c index 2175d2094e..bcb3082267 100644 --- a/src/core/ext/client_channel/subchannel.c +++ b/src/core/ext/client_channel/subchannel.c @@ -205,7 +205,7 @@ static void subchannel_destroy(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { grpc_subchannel *c = arg; gpr_free((void *)c->filters); - grpc_channel_args_destroy(c->args); + grpc_channel_args_destroy(exec_ctx, c->args); gpr_free(c->addr); grpc_slice_unref_internal(exec_ctx, c->initial_connect_string); grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker); @@ -539,7 +539,7 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx, /* construct channel stack */ grpc_channel_stack_builder *builder = grpc_channel_stack_builder_create(); grpc_channel_stack_builder_set_channel_arguments( - builder, c->connecting_result.channel_args); + exec_ctx, builder, c->connecting_result.channel_args); grpc_channel_stack_builder_set_transport(builder, c->connecting_result.transport); @@ -548,7 +548,7 @@ static void publish_transport_locked(grpc_exec_ctx *exec_ctx, con = grpc_channel_stack_builder_finish(exec_ctx, builder, 0, 1, connection_destroy, NULL); } else { - grpc_channel_stack_builder_destroy(builder); + grpc_channel_stack_builder_destroy(exec_ctx, builder); abort(); /* TODO(ctiller): what to do here (previously we just crashed) */ } stk = CHANNEL_STACK_FROM_CONNECTION(con); @@ -651,7 +651,7 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg, } gpr_mu_unlock(&c->mu); GRPC_SUBCHANNEL_WEAK_UNREF(exec_ctx, c, "connecting"); - grpc_channel_args_destroy(delete_channel_args); + grpc_channel_args_destroy(exec_ctx, delete_channel_args); } /* diff --git a/src/core/ext/client_channel/subchannel_index.c b/src/core/ext/client_channel/subchannel_index.c index 227013a7d7..25fdfcf3d4 100644 --- a/src/core/ext/client_channel/subchannel_index.c +++ b/src/core/ext/client_channel/subchannel_index.c @@ -131,7 +131,7 @@ void grpc_subchannel_key_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel_key *k) { grpc_connector_unref(exec_ctx, k->connector); gpr_free((grpc_channel_args *)k->args.filters); - grpc_channel_args_destroy((grpc_channel_args *)k->args.args); + grpc_channel_args_destroy(exec_ctx, (grpc_channel_args *)k->args.args); gpr_free((void *)k->args.server_name); gpr_free(k->args.addr); gpr_free(k); diff --git a/src/core/ext/client_channel/uri_parser.c b/src/core/ext/client_channel/uri_parser.c index bcb6a1dee4..f8c946b275 100644 --- a/src/core/ext/client_channel/uri_parser.c +++ b/src/core/ext/client_channel/uri_parser.c @@ -35,11 +35,11 @@ #include <string.h> +#include <grpc/slice.h> +#include <grpc/slice_buffer.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/port_platform.h> -#include <grpc/support/slice.h> -#include <grpc/support/slice_buffer.h> #include <grpc/support/string_util.h> #include "src/core/lib/support/string.h" @@ -137,7 +137,6 @@ static int parse_fragment_or_query(const char *uri_text, size_t *i) { return 1; } -static void do_nothing(void *ignored) {} static void parse_query_parts(grpc_uri *uri) { static const char *QUERY_PARTS_SEPARATOR = "&"; static const char *QUERY_PARTS_VALUE_SEPARATOR = "="; @@ -148,38 +147,32 @@ static void parse_query_parts(grpc_uri *uri) { uri->num_query_parts = 0; return; } - gpr_slice query_slice = - gpr_slice_new(uri->query, strlen(uri->query), do_nothing); - gpr_slice_buffer query_parts; /* the &-separated elements of the query */ - gpr_slice_buffer query_param_parts; /* the =-separated subelements */ - gpr_slice_buffer_init(&query_parts); - gpr_slice_buffer_init(&query_param_parts); - - gpr_slice_split(query_slice, QUERY_PARTS_SEPARATOR, &query_parts); - uri->query_parts = gpr_malloc(query_parts.count * sizeof(char *)); - uri->query_parts_values = gpr_malloc(query_parts.count * sizeof(char *)); - uri->num_query_parts = query_parts.count; - for (size_t i = 0; i < query_parts.count; i++) { - gpr_slice_split(query_parts.slices[i], QUERY_PARTS_VALUE_SEPARATOR, - &query_param_parts); - GPR_ASSERT(query_param_parts.count > 0); - uri->query_parts[i] = - gpr_dump_slice(query_param_parts.slices[0], GPR_DUMP_ASCII); - if (query_param_parts.count > 1) { + gpr_string_split(uri->query, QUERY_PARTS_SEPARATOR, &uri->query_parts, + &uri->num_query_parts); + uri->query_parts_values = gpr_malloc(uri->num_query_parts * sizeof(char **)); + for (size_t i = 0; i < uri->num_query_parts; i++) { + char **query_param_parts; + size_t num_query_param_parts; + char *full = uri->query_parts[i]; + gpr_string_split(full, QUERY_PARTS_VALUE_SEPARATOR, &query_param_parts, + &num_query_param_parts); + GPR_ASSERT(num_query_param_parts > 0); + uri->query_parts[i] = query_param_parts[0]; + if (num_query_param_parts > 1) { /* TODO(dgq): only the first value after the separator is considered. * Perhaps all chars after the first separator for the query part should * be included, even if they include the separator. */ - uri->query_parts_values[i] = - gpr_dump_slice(query_param_parts.slices[1], GPR_DUMP_ASCII); + uri->query_parts_values[i] = query_param_parts[1]; } else { uri->query_parts_values[i] = NULL; } - gpr_slice_buffer_reset_and_unref(&query_param_parts); + for (size_t j = 2; j < num_query_param_parts; j++) { + gpr_free(query_param_parts[j]); + } + gpr_free(query_param_parts); + gpr_free(full); } - gpr_slice_buffer_destroy(&query_parts); - gpr_slice_buffer_destroy(&query_param_parts); - gpr_slice_unref(query_slice); } grpc_uri *grpc_uri_parse(const char *uri_text, int suppress_errors) { diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 00c7468326..c81c0fb332 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -43,30 +43,23 @@ * policy to select from this list of LB server backends. * * The first time the policy gets a request for a pick, a ping, or to exit the - * idle state, \a query_for_backends() is called. It creates an instance of \a - * lb_client_data, an internal struct meant to contain the data associated with - * the internal communication with the LB server. This instance is created via - * \a lb_client_data_create(). There, the call over lb_channel to pick-first - * from {a1..an} is created, the \a LoadBalancingRequest message is assembled - * and all necessary callbacks for the progress of the internal call configured. + * idle state, \a query_for_backends_locked() is called. This function sets up + * and initiates the internal communication with the LB server. In particular, + * it's responsible for instantiating the internal *streaming* call to the LB + * server (whichever address from {a1..an} pick-first chose). This call is + * serviced by two callbacks, \a lb_on_server_status_received and \a + * lb_on_response_received. The former will be called when the call to the LB + * server completes. This can happen if the LB server closes the connection or + * if this policy itself cancels the call (for example because it's shutting + * down). If the internal call times out, the usual behavior of pick-first + * applies, continuing to pick from the list {a1..an}. * - * Back in \a query_for_backends(), the internal *streaming* call to the LB - * server (whichever address from {a1..an} pick-first chose) is kicked off. - * It'll progress over the callbacks configured in \a lb_client_data_create() - * (see the field docstrings of \a lb_client_data for more details). - * - * If the call fails with UNIMPLEMENTED, the original call will also fail. - * There's a misconfiguration somewhere: at least one of {a1..an} isn't a LB - * server, which contradicts the LB bit being set. If the internal call times - * out, the usual behavior of pick-first applies, continuing to pick from the - * list {a1..an}. - * - * Upon sucesss, a \a LoadBalancingResponse is expected in \a res_recv_cb. An - * invalid one results in the termination of the streaming call. A new streaming - * call should be created if possible, failing the original call otherwise. - * For a valid \a LoadBalancingResponse, the server list of actual backends is - * extracted. A Round Robin policy will be created from this list. There are two - * possible scenarios: + * Upon sucesss, the incoming \a LoadBalancingResponse is processed by \a + * res_recv. An invalid one results in the termination of the streaming call. A + * new streaming call should be created if possible, failing the original call + * otherwise. For a valid \a LoadBalancingResponse, the server list of actual + * backends is extracted. A Round Robin policy will be created from this list. + * There are two possible scenarios: * * 1. This is the first server list received. There was no previous instance of * the Round Robin policy. \a rr_handover_locked() will instantiate the RR @@ -84,10 +77,10 @@ * Once a RR policy instance is in place (and getting updated as described), * calls to for a pick, a ping or a cancellation will be serviced right away by * forwarding them to the RR instance. Any time there's no RR policy available - * (ie, right after the creation of the gRPCLB policy, if an empty serverlist - * is received, etc), pick/ping requests are added to a list of pending - * picks/pings to be flushed and serviced as part of \a rr_handover_locked() the - * moment the RR policy instance becomes available. + * (ie, right after the creation of the gRPCLB policy, if an empty serverlist is + * received, etc), pick/ping requests are added to a list of pending picks/pings + * to be flushed and serviced as part of \a rr_handover_locked() the moment the + * RR policy instance becomes available. * * \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the * high level design and details. */ @@ -120,14 +113,21 @@ #include "src/core/ext/lb_policy/grpclb/grpclb.h" #include "src/core/ext/lb_policy/grpclb/load_balancer_api.h" #include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/iomgr/sockaddr_utils.h" -#include "src/core/lib/slice/slice_internal.h" +#include "src/core/lib/iomgr/timer.h" #include "src/core/lib/slice/slice_string_helpers.h" +#include "src/core/lib/support/backoff.h" #include "src/core/lib/support/string.h" #include "src/core/lib/surface/call.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/transport/static_metadata.h" +#define BACKOFF_MULTIPLIER 1.6 +#define BACKOFF_JITTER 0.2 +#define BACKOFF_MIN_SECONDS 10 +#define BACKOFF_MAX_SECONDS 60 + int grpc_lb_glb_trace = 0; /* add lb_token of selected subchannel (address) to the call's initial @@ -176,13 +176,12 @@ typedef struct wrapped_rr_closure_arg { static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { wrapped_rr_closure_arg *wc_arg = arg; - if (wc_arg->rr_policy != NULL) { - if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")", - (intptr_t)wc_arg->rr_policy); - } - GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure"); + GPR_ASSERT(wc_arg->wrapped_closure != NULL); + grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error), + NULL); + + if (wc_arg->rr_policy != NULL) { /* if target is NULL, no pick has been made by the RR policy (eg, all * addresses failed to connect). There won't be any user_data/token * available */ @@ -191,10 +190,12 @@ static void wrapped_rr_closure(grpc_exec_ctx *exec_ctx, void *arg, wc_arg->lb_token_mdelem_storage, GRPC_MDELEM_REF(wc_arg->lb_token)); } + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")", + (intptr_t)wc_arg->rr_policy); + } + GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "wrapped_rr_closure"); } - GPR_ASSERT(wc_arg->wrapped_closure != NULL); - grpc_exec_ctx_sched(exec_ctx, wc_arg->wrapped_closure, GRPC_ERROR_REF(error), - NULL); GPR_ASSERT(wc_arg->free_when_done != NULL); gpr_free(wc_arg->free_when_done); } @@ -266,7 +267,6 @@ static void add_pending_ping(pending_ping **root, grpc_closure *notify) { * glb_lb_policy */ typedef struct rr_connectivity_data rr_connectivity_data; -struct lb_client_data; static const grpc_lb_policy_vtable glb_lb_policy_vtable; typedef struct glb_lb_policy { /** base policy: must be first */ @@ -298,20 +298,47 @@ typedef struct glb_lb_policy { * response has arrived. */ grpc_grpclb_serverlist *serverlist; - /** addresses from \a serverlist */ - grpc_lb_addresses *addresses; - /** list of picks that are waiting on RR's policy connectivity */ pending_pick *pending_picks; /** list of pings that are waiting on RR's policy connectivity */ pending_ping *pending_pings; - /** client data associated with the LB server communication */ - struct lb_client_data *lb_client; + bool shutting_down; + + /************************************************************/ + /* client data associated with the LB server communication */ + /************************************************************/ + /* Status from the LB server has been received. This signals the end of the LB + * call. */ + grpc_closure lb_on_server_status_received; + + /* A response from the LB server has been received. Process it */ + grpc_closure lb_on_response_received; + + grpc_call *lb_call; /* streaming call to the LB server, */ + + grpc_metadata_array lb_initial_metadata_recv; /* initial MD from LB server */ + grpc_metadata_array + lb_trailing_metadata_recv; /* trailing MD from LB server */ + + /* what's being sent to the LB server. Note that its value may vary if the LB + * server indicates a redirect. */ + grpc_byte_buffer *lb_request_payload; + + /* response the LB server, if any. Processed in lb_on_response_received() */ + grpc_byte_buffer *lb_response_payload; + + /* call status code and details, set in lb_on_server_status_received() */ + grpc_status_code lb_call_status; + char *lb_call_status_details; + size_t lb_call_status_details_capacity; - /** for tracking of the RR connectivity */ - rr_connectivity_data *rr_connectivity; + /** LB call retry backoff state */ + gpr_backoff lb_call_backoff_state; + + /** LB call retry timer */ + grpc_timer lb_call_retry_timer; } glb_lb_policy; /* Keeps track and reacts to changes in connectivity of the RR instance */ @@ -349,8 +376,8 @@ static bool is_server_valid(const grpc_grpclb_server *server, size_t idx, static void *lb_token_copy(void *token) { return token == NULL ? NULL : GRPC_MDELEM_REF(token); } -static void lb_token_destroy(void *token) { - if (token != NULL) GRPC_MDELEM_UNREF(token); +static void lb_token_destroy(grpc_exec_ctx *exec_ctx, void *token) { + if (token != NULL) GRPC_MDELEM_UNREF(exec_ctx, token); } static int lb_token_cmp(void *token1, void *token2) { if (token1 > token2) return 1; @@ -360,9 +387,31 @@ static int lb_token_cmp(void *token1, void *token2) { static const grpc_lb_user_data_vtable lb_token_vtable = { lb_token_copy, lb_token_destroy, lb_token_cmp}; +static void parse_server(const grpc_grpclb_server *server, + grpc_resolved_address *addr) { + const uint16_t netorder_port = htons((uint16_t)server->port); + /* the addresses are given in binary format (a in(6)_addr struct) in + * server->ip_address.bytes. */ + const grpc_grpclb_ip_address *ip = &server->ip_address; + memset(addr, 0, sizeof(*addr)); + if (ip->size == 4) { + addr->len = sizeof(struct sockaddr_in); + struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr->addr; + addr4->sin_family = AF_INET; + memcpy(&addr4->sin_addr, ip->bytes, ip->size); + addr4->sin_port = netorder_port; + } else if (ip->size == 16) { + addr->len = sizeof(struct sockaddr_in6); + struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)&addr->addr; + addr6->sin6_family = AF_INET6; + memcpy(&addr6->sin6_addr, ip->bytes, ip->size); + addr6->sin6_port = netorder_port; + } +} + /* Returns addresses extracted from \a serverlist. */ static grpc_lb_addresses *process_serverlist( - const grpc_grpclb_serverlist *serverlist) { + grpc_exec_ctx *exec_ctx, const grpc_grpclb_serverlist *serverlist) { size_t num_valid = 0; /* first pass: count how many are valid in order to allocate the necessary * memory in a single block */ @@ -386,35 +435,20 @@ static grpc_lb_addresses *process_serverlist( if (!is_server_valid(serverlist->servers[sl_idx], sl_idx, false)) continue; /* address processing */ - const uint16_t netorder_port = htons((uint16_t)server->port); - /* the addresses are given in binary format (a in(6)_addr struct) in - * server->ip_address.bytes. */ - const grpc_grpclb_ip_address *ip = &server->ip_address; grpc_resolved_address addr; - memset(&addr, 0, sizeof(addr)); - if (ip->size == 4) { - addr.len = sizeof(struct sockaddr_in); - struct sockaddr_in *addr4 = (struct sockaddr_in *)&addr.addr; - addr4->sin_family = AF_INET; - memcpy(&addr4->sin_addr, ip->bytes, ip->size); - addr4->sin_port = netorder_port; - } else if (ip->size == 16) { - addr.len = sizeof(struct sockaddr_in6); - struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)&addr.addr; - addr6->sin6_family = AF_INET; - memcpy(&addr6->sin6_addr, ip->bytes, ip->size); - addr6->sin6_port = netorder_port; - } + parse_server(server, &addr); /* lb token processing */ void *user_data; if (server->has_load_balance_token) { - const size_t lb_token_size = - GPR_ARRAY_SIZE(server->load_balance_token) - 1; + const size_t lb_token_max_length = + GPR_ARRAY_SIZE(server->load_balance_token); + const size_t lb_token_length = + strnlen(server->load_balance_token, lb_token_max_length); grpc_mdstr *lb_token_mdstr = grpc_mdstr_from_buffer( - (uint8_t *)server->load_balance_token, lb_token_size); - user_data = grpc_mdelem_from_metadata_strings(GRPC_MDSTR_LB_TOKEN, - lb_token_mdstr); + (uint8_t *)server->load_balance_token, lb_token_length); + user_data = grpc_mdelem_from_metadata_strings( + exec_ctx, GRPC_MDSTR_LB_TOKEN, lb_token_mdstr); } else { gpr_log(GPR_ERROR, "Missing LB token for backend address '%s'. The empty token will " @@ -429,7 +463,6 @@ static grpc_lb_addresses *process_serverlist( ++addr_idx; } GPR_ASSERT(addr_idx == num_valid); - return lb_addresses; } @@ -450,7 +483,7 @@ static bool pick_from_internal_rr_locked( gpr_log(GPR_INFO, "Unreffing RR (0x%" PRIxPTR ")", (intptr_t)wc_arg->rr_policy); } - GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick"); + GRPC_LB_POLICY_UNREF(exec_ctx, wc_arg->rr_policy, "glb_pick_sync"); /* add the load reporting initial metadata */ initial_metadata_add_lb_token(pick_args->initial_metadata, @@ -463,7 +496,6 @@ static bool pick_from_internal_rr_locked( * pending pick list inside the RR policy (glb_policy->rr_policy). * Eventually, wrapped_on_complete will be called, which will -among other * things- add the LB token to the call's initial metadata */ - return pick_done; } @@ -472,54 +504,70 @@ static grpc_lb_policy *create_rr_locked( glb_lb_policy *glb_policy) { GPR_ASSERT(serverlist != NULL && serverlist->num_servers > 0); - if (glb_policy->addresses != NULL) { - /* dispose of the previous version */ - grpc_lb_addresses_destroy(glb_policy->addresses); - } - glb_policy->addresses = process_serverlist(serverlist); - grpc_lb_policy_args args; memset(&args, 0, sizeof(args)); args.client_channel_factory = glb_policy->cc_factory; + grpc_lb_addresses *addresses = process_serverlist(exec_ctx, serverlist); // Replace the LB addresses in the channel args that we pass down to // the subchannel. static const char *keys_to_remove[] = {GRPC_ARG_LB_ADDRESSES}; - const grpc_arg arg = - grpc_lb_addresses_create_channel_arg(glb_policy->addresses); + const grpc_arg arg = grpc_lb_addresses_create_channel_arg(addresses); args.args = grpc_channel_args_copy_and_add_and_remove( glb_policy->args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), &arg, 1); grpc_lb_policy *rr = grpc_lb_policy_create(exec_ctx, "round_robin", &args); - grpc_channel_args_destroy(args.args); - + GPR_ASSERT(rr != NULL); + grpc_lb_addresses_destroy(exec_ctx, addresses); + grpc_channel_args_destroy(exec_ctx, args.args); return rr; } +static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); +/* glb_policy->rr_policy may be NULL (initial handover) */ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy, grpc_error *error) { GPR_ASSERT(glb_policy->serverlist != NULL && glb_policy->serverlist->num_servers > 0); + + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, "RR handover. Old RR: %p", (void *)glb_policy->rr_policy); + } + if (glb_policy->rr_policy != NULL) { + /* if we are phasing out an existing RR instance, unref it. */ + GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "rr_handover"); + } + glb_policy->rr_policy = create_rr_locked(exec_ctx, glb_policy->serverlist, glb_policy); - if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, "Created RR policy (0x%" PRIxPTR ")", - (intptr_t)glb_policy->rr_policy); + gpr_log(GPR_INFO, "Created RR policy (%p)", (void *)glb_policy->rr_policy); } + GPR_ASSERT(glb_policy->rr_policy != NULL); grpc_pollset_set_add_pollset_set(exec_ctx, glb_policy->rr_policy->interested_parties, glb_policy->base.interested_parties); - glb_policy->rr_connectivity->state = grpc_lb_policy_check_connectivity( + + rr_connectivity_data *rr_connectivity = + gpr_malloc(sizeof(rr_connectivity_data)); + memset(rr_connectivity, 0, sizeof(rr_connectivity_data)); + grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed, + rr_connectivity); + rr_connectivity->glb_policy = glb_policy; + rr_connectivity->state = grpc_lb_policy_check_connectivity( exec_ctx, glb_policy->rr_policy, &error); - grpc_lb_policy_notify_on_state_change( - exec_ctx, glb_policy->rr_policy, &glb_policy->rr_connectivity->state, - &glb_policy->rr_connectivity->on_change); + grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, - glb_policy->rr_connectivity->state, - GRPC_ERROR_REF(error), "rr_handover"); + rr_connectivity->state, GRPC_ERROR_REF(error), + "rr_handover"); + /* subscribe */ + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "rr_connectivity_cb"); + grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, + &rr_connectivity->state, + &rr_connectivity->on_change); grpc_lb_policy_exit_idle(exec_ctx, glb_policy->rr_policy); /* flush pending ops */ @@ -553,35 +601,27 @@ static void rr_handover_locked(grpc_exec_ctx *exec_ctx, static void glb_rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + /* If shutdown or error free the arg. Rely on the rest of the code to set the + * right grpclb status. */ rr_connectivity_data *rr_conn_data = arg; glb_lb_policy *glb_policy = rr_conn_data->glb_policy; - if (rr_conn_data->state == GRPC_CHANNEL_SHUTDOWN) { - if (glb_policy->serverlist != NULL) { - /* a RR policy is shutting down but there's a serverlist available -> - * perform a handover */ - gpr_mu_lock(&glb_policy->mu); - rr_handover_locked(exec_ctx, glb_policy, error); - gpr_mu_unlock(&glb_policy->mu); - } else { - /* shutting down and no new serverlist available. Bail out. */ - gpr_free(rr_conn_data); - } + if (rr_conn_data->state != GRPC_CHANNEL_SHUTDOWN && + !glb_policy->shutting_down) { + gpr_mu_lock(&glb_policy->mu); + /* RR not shutting down. Mimic the RR's policy state */ + grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, + rr_conn_data->state, GRPC_ERROR_REF(error), + "rr_connectivity_cb"); + /* resubscribe. Reuse the "rr_connectivity_cb" weak ref. */ + grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, + &rr_conn_data->state, + &rr_conn_data->on_change); + gpr_mu_unlock(&glb_policy->mu); } else { - if (error == GRPC_ERROR_NONE) { - gpr_mu_lock(&glb_policy->mu); - /* RR not shutting down. Mimic the RR's policy state */ - grpc_connectivity_state_set(exec_ctx, &glb_policy->state_tracker, - rr_conn_data->state, GRPC_ERROR_REF(error), - "glb_rr_connectivity_changed"); - /* resubscribe */ - grpc_lb_policy_notify_on_state_change(exec_ctx, glb_policy->rr_policy, - &rr_conn_data->state, - &rr_conn_data->on_change); - gpr_mu_unlock(&glb_policy->mu); - } else { /* error */ - gpr_free(rr_conn_data); - } + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + "rr_connectivity_cb"); + gpr_free(rr_conn_data); } } @@ -671,7 +711,7 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, glb_policy->lb_channel = grpc_client_channel_factory_create_channel( exec_ctx, glb_policy->cc_factory, target_uri_str, GRPC_CLIENT_CHANNEL_TYPE_LOAD_BALANCING, new_args); - grpc_channel_args_destroy(new_args); + grpc_channel_args_destroy(exec_ctx, new_args); gpr_free(target_uri_str); for (size_t i = 0; i < num_grpclb_addrs; i++) { @@ -684,18 +724,11 @@ static grpc_lb_policy *glb_create(grpc_exec_ctx *exec_ctx, return NULL; } - rr_connectivity_data *rr_connectivity = - gpr_malloc(sizeof(rr_connectivity_data)); - memset(rr_connectivity, 0, sizeof(rr_connectivity_data)); - grpc_closure_init(&rr_connectivity->on_change, glb_rr_connectivity_changed, - rr_connectivity); - rr_connectivity->glb_policy = glb_policy; - glb_policy->rr_connectivity = rr_connectivity; - grpc_lb_policy_init(&glb_policy->base, &glb_lb_policy_vtable); gpr_mu_init(&glb_policy->mu); grpc_connectivity_state_init(&glb_policy->state_tracker, GRPC_CHANNEL_IDLE, "grpclb"); + return &glb_policy->base; } @@ -704,7 +737,7 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { GPR_ASSERT(glb_policy->pending_picks == NULL); GPR_ASSERT(glb_policy->pending_pings == NULL); gpr_free((void *)glb_policy->server_name); - grpc_channel_args_destroy(glb_policy->args); + grpc_channel_args_destroy(exec_ctx, glb_policy->args); grpc_channel_destroy(glb_policy->lb_channel); glb_policy->lb_channel = NULL; grpc_connectivity_state_destroy(exec_ctx, &glb_policy->state_tracker); @@ -712,19 +745,30 @@ static void glb_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { grpc_grpclb_destroy_serverlist(glb_policy->serverlist); } gpr_mu_destroy(&glb_policy->mu); - grpc_lb_addresses_destroy(glb_policy->addresses); gpr_free(glb_policy); } -static void lb_client_data_destroy(struct lb_client_data *lb_client); static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; gpr_mu_lock(&glb_policy->mu); + glb_policy->shutting_down = true; pending_pick *pp = glb_policy->pending_picks; glb_policy->pending_picks = NULL; pending_ping *pping = glb_policy->pending_pings; glb_policy->pending_pings = NULL; + if (glb_policy->rr_policy) { + GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown"); + } + if (glb_policy->started_picking) { + if (glb_policy->lb_call != NULL) { + grpc_call_cancel(glb_policy->lb_call, NULL); + /* lb_on_server_status_received will pick up the cancel and clean up */ + } + } + grpc_connectivity_state_set( + exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN, + GRPC_ERROR_CREATE("Channel Shutdown"), "glb_shutdown"); gpr_mu_unlock(&glb_policy->mu); while (pp != NULL) { @@ -741,21 +785,6 @@ static void glb_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { GRPC_ERROR_NONE, NULL); pping = next; } - - if (glb_policy->rr_policy) { - /* unsubscribe */ - grpc_lb_policy_notify_on_state_change( - exec_ctx, glb_policy->rr_policy, NULL, - &glb_policy->rr_connectivity->on_change); - GRPC_LB_POLICY_UNREF(exec_ctx, glb_policy->rr_policy, "glb_shutdown"); - } - - lb_client_data_destroy(glb_policy->lb_client); - glb_policy->lb_client = NULL; - - grpc_connectivity_state_set( - exec_ctx, &glb_policy->state_tracker, GRPC_CHANNEL_SHUTDOWN, - GRPC_ERROR_CREATE("Channel Shutdown"), "glb_shutdown"); } static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, @@ -782,17 +811,12 @@ static void glb_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, GRPC_ERROR_UNREF(error); } -static grpc_call *lb_client_data_get_call(struct lb_client_data *lb_client); static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, uint32_t initial_metadata_flags_mask, uint32_t initial_metadata_flags_eq, grpc_error *error) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; gpr_mu_lock(&glb_policy->mu); - if (glb_policy->lb_client != NULL) { - /* cancel the call to the load balancer service, if any */ - grpc_call_cancel(lb_client_data_get_call(glb_policy->lb_client), NULL); - } pending_pick *pp = glb_policy->pending_picks; glb_policy->pending_picks = NULL; while (pp != NULL) { @@ -812,18 +836,20 @@ static void glb_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, GRPC_ERROR_UNREF(error); } -static void query_for_backends(grpc_exec_ctx *exec_ctx, - glb_lb_policy *glb_policy); -static void start_picking(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) { +static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, + glb_lb_policy *glb_policy); +static void start_picking_locked(grpc_exec_ctx *exec_ctx, + glb_lb_policy *glb_policy) { glb_policy->started_picking = true; - query_for_backends(exec_ctx, glb_policy); + gpr_backoff_reset(&glb_policy->lb_call_backoff_state); + query_for_backends_locked(exec_ctx, glb_policy); } static void glb_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { glb_lb_policy *glb_policy = (glb_lb_policy *)pol; gpr_mu_lock(&glb_policy->mu); if (!glb_policy->started_picking) { - start_picking(exec_ctx, glb_policy); + start_picking_locked(exec_ctx, glb_policy); } gpr_mu_unlock(&glb_policy->mu); } @@ -849,8 +875,8 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, if (glb_policy->rr_policy != NULL) { if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, "about to PICK from 0x%" PRIxPTR "", - (intptr_t)glb_policy->rr_policy); + gpr_log(GPR_INFO, "grpclb %p about to PICK from RR %p", + (void *)glb_policy, (void *)glb_policy->rr_policy); } GRPC_LB_POLICY_REF(glb_policy->rr_policy, "glb_pick"); @@ -867,11 +893,17 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pick_done = pick_from_internal_rr_locked(exec_ctx, glb_policy->rr_policy, pick_args, target, wc_arg); } else { + if (grpc_lb_glb_trace) { + gpr_log(GPR_DEBUG, + "No RR policy in grpclb instance %p. Adding to grpclb's pending " + "picks", + (void *)(glb_policy)); + } add_pending_pick(&glb_policy->pending_picks, pick_args, target, on_complete); if (!glb_policy->started_picking) { - start_picking(exec_ctx, glb_policy); + start_picking_locked(exec_ctx, glb_policy); } pick_done = false; } @@ -900,7 +932,7 @@ static void glb_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, } else { add_pending_ping(&glb_policy->pending_pings, closure); if (!glb_policy->started_picking) { - start_picking(exec_ctx, glb_policy); + start_picking_locked(exec_ctx, glb_policy); } } gpr_mu_unlock(&glb_policy->mu); @@ -918,251 +950,182 @@ static void glb_notify_on_state_change(grpc_exec_ctx *exec_ctx, gpr_mu_unlock(&glb_policy->mu); } -/* - * lb_client_data - * - * Used internally for the client call to the LB */ -typedef struct lb_client_data { - gpr_mu mu; - - /* called once initial metadata's been sent */ - grpc_closure md_sent; - - /* called once the LoadBalanceRequest has been sent to the LB server. See - * src/proto/grpc/.../load_balancer.proto */ - grpc_closure req_sent; - - /* A response from the LB server has been received (or error). Process it */ - grpc_closure res_rcvd; - - /* After the client has sent a close to the LB server */ - grpc_closure close_sent; - - /* ... and the status from the LB server has been received */ - grpc_closure srv_status_rcvd; - - grpc_call *lb_call; /* streaming call to the LB server, */ - gpr_timespec deadline; /* for the streaming call to the LB server */ - - grpc_metadata_array initial_metadata_recv; /* initial MD from LB server */ - grpc_metadata_array trailing_metadata_recv; /* trailing MD from LB server */ - - /* what's being sent to the LB server. Note that its value may vary if the LB - * server indicates a redirect. */ - grpc_byte_buffer *request_payload; - - /* response from the LB server, if any. Processed in res_recv_cb() */ - grpc_byte_buffer *response_payload; - - /* the call's status and status detailset in srv_status_rcvd_cb() */ - grpc_status_code status; - char *status_details; - size_t status_details_capacity; - - /* pointer back to the enclosing policy */ - glb_lb_policy *glb_policy; -} lb_client_data; - -static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error); -static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); -static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error); - -static lb_client_data *lb_client_data_create(grpc_exec_ctx *exec_ctx, - glb_lb_policy *glb_policy) { +static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); +static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error); +static void lb_call_init(grpc_exec_ctx *exec_ctx, glb_lb_policy *glb_policy) { GPR_ASSERT(glb_policy->server_name != NULL); GPR_ASSERT(glb_policy->server_name[0] != '\0'); - lb_client_data *lb_client = gpr_malloc(sizeof(lb_client_data)); - memset(lb_client, 0, sizeof(lb_client_data)); - - gpr_mu_init(&lb_client->mu); - grpc_closure_init(&lb_client->md_sent, md_sent_cb, lb_client); - - grpc_closure_init(&lb_client->req_sent, req_sent_cb, lb_client); - grpc_closure_init(&lb_client->res_rcvd, res_recv_cb, lb_client); - grpc_closure_init(&lb_client->close_sent, close_sent_cb, lb_client); - grpc_closure_init(&lb_client->srv_status_rcvd, srv_status_rcvd_cb, lb_client); - - lb_client->deadline = glb_policy->deadline; - /* Note the following LB call progresses every time there's activity in \a * glb_policy->base.interested_parties, which is comprised of the polling * entities from \a client_channel. */ - lb_client->lb_call = grpc_channel_create_pollset_set_call( - glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS, + glb_policy->lb_call = grpc_channel_create_pollset_set_call( + exec_ctx, glb_policy->lb_channel, NULL, GRPC_PROPAGATE_DEFAULTS, glb_policy->base.interested_parties, "/grpc.lb.v1.LoadBalancer/BalanceLoad", glb_policy->server_name, - lb_client->deadline, NULL); + glb_policy->deadline, NULL); - grpc_metadata_array_init(&lb_client->initial_metadata_recv); - grpc_metadata_array_init(&lb_client->trailing_metadata_recv); + grpc_metadata_array_init(&glb_policy->lb_initial_metadata_recv); + grpc_metadata_array_init(&glb_policy->lb_trailing_metadata_recv); grpc_grpclb_request *request = grpc_grpclb_request_create(glb_policy->server_name); grpc_slice request_payload_slice = grpc_grpclb_request_encode(request); - lb_client->request_payload = + glb_policy->lb_request_payload = grpc_raw_byte_buffer_create(&request_payload_slice, 1); - grpc_slice_unref_internal(exec_ctx, request_payload_slice); + grpc_slice_unref(request_payload_slice); grpc_grpclb_request_destroy(request); - lb_client->status_details = NULL; - lb_client->status_details_capacity = 0; - lb_client->glb_policy = glb_policy; - return lb_client; + glb_policy->lb_call_status_details = NULL; + glb_policy->lb_call_status_details_capacity = 0; + + grpc_closure_init(&glb_policy->lb_on_server_status_received, + lb_on_server_status_received, glb_policy); + grpc_closure_init(&glb_policy->lb_on_response_received, + lb_on_response_received, glb_policy); + + gpr_backoff_init(&glb_policy->lb_call_backoff_state, BACKOFF_MULTIPLIER, + BACKOFF_JITTER, BACKOFF_MIN_SECONDS * 1000, + BACKOFF_MAX_SECONDS * 1000); } -static void lb_client_data_destroy(lb_client_data *lb_client) { - grpc_call_destroy(lb_client->lb_call); - grpc_metadata_array_destroy(&lb_client->initial_metadata_recv); - grpc_metadata_array_destroy(&lb_client->trailing_metadata_recv); +static void lb_call_destroy_locked(glb_lb_policy *glb_policy) { + GPR_ASSERT(glb_policy->lb_call != NULL); + grpc_call_destroy(glb_policy->lb_call); + glb_policy->lb_call = NULL; - grpc_byte_buffer_destroy(lb_client->request_payload); + grpc_metadata_array_destroy(&glb_policy->lb_initial_metadata_recv); + grpc_metadata_array_destroy(&glb_policy->lb_trailing_metadata_recv); - gpr_free(lb_client->status_details); - gpr_mu_destroy(&lb_client->mu); - gpr_free(lb_client); -} -static grpc_call *lb_client_data_get_call(lb_client_data *lb_client) { - return lb_client->lb_call; + grpc_byte_buffer_destroy(glb_policy->lb_request_payload); + gpr_free(glb_policy->lb_call_status_details); } /* * Auxiliary functions and LB client callbacks. */ -static void query_for_backends(grpc_exec_ctx *exec_ctx, - glb_lb_policy *glb_policy) { +static void query_for_backends_locked(grpc_exec_ctx *exec_ctx, + glb_lb_policy *glb_policy) { GPR_ASSERT(glb_policy->lb_channel != NULL); + lb_call_init(exec_ctx, glb_policy); + + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, "Query for backends (grpclb: %p, lb_call: %p)", + (void *)glb_policy, (void *)glb_policy->lb_call); + } + GPR_ASSERT(glb_policy->lb_call != NULL); - glb_policy->lb_client = lb_client_data_create(exec_ctx, glb_policy); grpc_call_error call_error; - grpc_op ops[1]; + grpc_op ops[4]; memset(ops, 0, sizeof(ops)); + grpc_op *op = ops; op->op = GRPC_OP_SEND_INITIAL_METADATA; op->data.send_initial_metadata.count = 0; op->flags = 0; op->reserved = NULL; op++; - call_error = grpc_call_start_batch_and_execute( - exec_ctx, glb_policy->lb_client->lb_call, ops, (size_t)(op - ops), - &glb_policy->lb_client->md_sent); - GPR_ASSERT(GRPC_CALL_OK == call_error); - op = ops; - op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; - op->data.recv_status_on_client.trailing_metadata = - &glb_policy->lb_client->trailing_metadata_recv; - op->data.recv_status_on_client.status = &glb_policy->lb_client->status; - op->data.recv_status_on_client.status_details = - &glb_policy->lb_client->status_details; - op->data.recv_status_on_client.status_details_capacity = - &glb_policy->lb_client->status_details_capacity; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata = &glb_policy->lb_initial_metadata_recv; op->flags = 0; op->reserved = NULL; op++; - call_error = grpc_call_start_batch_and_execute( - exec_ctx, glb_policy->lb_client->lb_call, ops, (size_t)(op - ops), - &glb_policy->lb_client->srv_status_rcvd); - GPR_ASSERT(GRPC_CALL_OK == call_error); -} - -static void md_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - lb_client_data *lb_client = arg; - GPR_ASSERT(lb_client->lb_call); - grpc_op ops[1]; - memset(ops, 0, sizeof(ops)); - grpc_op *op = ops; + GPR_ASSERT(glb_policy->lb_request_payload != NULL); op->op = GRPC_OP_SEND_MESSAGE; - op->data.send_message = lb_client->request_payload; + op->data.send_message = glb_policy->lb_request_payload; op->flags = 0; op->reserved = NULL; op++; - grpc_call_error call_error = grpc_call_start_batch_and_execute( - exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops), - &lb_client->req_sent); - GPR_ASSERT(GRPC_CALL_OK == call_error); -} - -static void req_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - lb_client_data *lb_client = arg; - GPR_ASSERT(lb_client->lb_call); - grpc_op ops[2]; - memset(ops, 0, sizeof(ops)); - grpc_op *op = ops; - - op->op = GRPC_OP_RECV_INITIAL_METADATA; - op->data.recv_initial_metadata = &lb_client->initial_metadata_recv; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = + &glb_policy->lb_trailing_metadata_recv; + op->data.recv_status_on_client.status = &glb_policy->lb_call_status; + op->data.recv_status_on_client.status_details = + &glb_policy->lb_call_status_details; + op->data.recv_status_on_client.status_details_capacity = + &glb_policy->lb_call_status_details_capacity; op->flags = 0; op->reserved = NULL; op++; + /* take a weak ref (won't prevent calling of \a glb_shutdown if the strong ref + * count goes to zero) to be unref'd in lb_on_server_status_received */ + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_server_status_received"); + call_error = grpc_call_start_batch_and_execute( + exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), + &glb_policy->lb_on_server_status_received); + GPR_ASSERT(GRPC_CALL_OK == call_error); + op = ops; op->op = GRPC_OP_RECV_MESSAGE; - op->data.recv_message = &lb_client->response_payload; + op->data.recv_message = &glb_policy->lb_response_payload; op->flags = 0; op->reserved = NULL; op++; - grpc_call_error call_error = grpc_call_start_batch_and_execute( - exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops), - &lb_client->res_rcvd); + /* take another weak ref to be unref'd in lb_on_response_received */ + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "lb_on_response_received"); + call_error = grpc_call_start_batch_and_execute( + exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), + &glb_policy->lb_on_response_received); GPR_ASSERT(GRPC_CALL_OK == call_error); } -static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { - lb_client_data *lb_client = arg; +static void lb_on_response_received(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + glb_lb_policy *glb_policy = arg; + grpc_op ops[2]; memset(ops, 0, sizeof(ops)); grpc_op *op = ops; - if (lb_client->response_payload != NULL) { + if (glb_policy->lb_response_payload != NULL) { + gpr_backoff_reset(&glb_policy->lb_call_backoff_state); /* Received data from the LB server. Look inside - * lb_client->response_payload, for a serverlist. */ + * glb_policy->lb_response_payload, for a serverlist. */ grpc_byte_buffer_reader bbr; - grpc_byte_buffer_reader_init(&bbr, lb_client->response_payload); + grpc_byte_buffer_reader_init(&bbr, glb_policy->lb_response_payload); grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr); - grpc_byte_buffer_destroy(lb_client->response_payload); + grpc_byte_buffer_destroy(glb_policy->lb_response_payload); grpc_grpclb_serverlist *serverlist = grpc_grpclb_response_parse_serverlist(response_slice); if (serverlist != NULL) { - grpc_slice_unref_internal(exec_ctx, response_slice); + GPR_ASSERT(glb_policy->lb_call != NULL); + grpc_slice_unref(response_slice); if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Serverlist with %lu servers received", (unsigned long)serverlist->num_servers); + for (size_t i = 0; i < serverlist->num_servers; ++i) { + grpc_resolved_address addr; + parse_server(serverlist->servers[i], &addr); + char *ipport; + grpc_sockaddr_to_string(&ipport, &addr, false); + gpr_log(GPR_INFO, "Serverlist[%lu]: %s", (unsigned long)i, ipport); + gpr_free(ipport); + } } /* update serverlist */ if (serverlist->num_servers > 0) { - gpr_mu_lock(&lb_client->glb_policy->mu); - if (grpc_grpclb_serverlist_equals(lb_client->glb_policy->serverlist, - serverlist)) { + gpr_mu_lock(&glb_policy->mu); + if (grpc_grpclb_serverlist_equals(glb_policy->serverlist, serverlist)) { if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, "Incoming server list identical to current, ignoring."); } } else { /* new serverlist */ - if (lb_client->glb_policy->serverlist != NULL) { + if (glb_policy->serverlist != NULL) { /* dispose of the old serverlist */ - grpc_grpclb_destroy_serverlist(lb_client->glb_policy->serverlist); + grpc_grpclb_destroy_serverlist(glb_policy->serverlist); } /* and update the copy in the glb_lb_policy instance */ - lb_client->glb_policy->serverlist = serverlist; - } - if (lb_client->glb_policy->rr_policy == NULL) { - /* initial "handover", in this case from a null RR policy, meaning - * it'll just create the first RR policy instance */ - rr_handover_locked(exec_ctx, lb_client->glb_policy, error); - } else { - /* unref the RR policy, eventually leading to its substitution with a - * new one constructed from the received serverlist (see - * glb_rr_connectivity_changed) */ - GRPC_LB_POLICY_UNREF(exec_ctx, lb_client->glb_policy->rr_policy, - "serverlist_received"); + glb_policy->serverlist = serverlist; + + rr_handover_locked(exec_ctx, glb_policy, error); } - gpr_mu_unlock(&lb_client->glb_policy->mu); + gpr_mu_unlock(&glb_policy->mu); } else { if (grpc_lb_glb_trace) { gpr_log(GPR_INFO, @@ -1170,60 +1133,94 @@ static void res_recv_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { "response with > 0 servers is received"); } } + } else { /* serverlist == NULL */ + gpr_log(GPR_ERROR, "Invalid LB response received: '%s'. Ignoring.", + grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX)); + grpc_slice_unref(response_slice); + } + if (!glb_policy->shutting_down) { /* keep listening for serverlist updates */ op->op = GRPC_OP_RECV_MESSAGE; - op->data.recv_message = &lb_client->response_payload; + op->data.recv_message = &glb_policy->lb_response_payload; op->flags = 0; op->reserved = NULL; op++; + /* reuse the "lb_on_response_received" weak ref taken in + * query_for_backends_locked() */ const grpc_call_error call_error = grpc_call_start_batch_and_execute( - exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops), - &lb_client->res_rcvd); /* loop */ + exec_ctx, glb_policy->lb_call, ops, (size_t)(op - ops), + &glb_policy->lb_on_response_received); /* loop */ GPR_ASSERT(GRPC_CALL_OK == call_error); - return; } - - GPR_ASSERT(serverlist == NULL); - gpr_log(GPR_ERROR, "Invalid LB response received: '%s'", - grpc_dump_slice(response_slice, GPR_DUMP_ASCII)); - grpc_slice_unref_internal(exec_ctx, response_slice); - - /* Disconnect from server returning invalid response. */ - op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; - op->flags = 0; - op->reserved = NULL; - op++; - grpc_call_error call_error = grpc_call_start_batch_and_execute( - exec_ctx, lb_client->lb_call, ops, (size_t)(op - ops), - &lb_client->close_sent); - GPR_ASSERT(GRPC_CALL_OK == call_error); + } else { /* empty payload: call cancelled. */ + /* dispose of the "lb_on_response_received" weak ref taken in + * query_for_backends_locked() and reused in every reception loop */ + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + "lb_on_response_received_empty_payload"); } - /* empty payload: call cancelled by server. Cleanups happening in - * srv_status_rcvd_cb */ } -static void close_sent_cb(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, - "Close from LB client sent. Waiting from server status now"); +static void lb_call_on_retry_timer(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + glb_lb_policy *glb_policy = arg; + gpr_mu_lock(&glb_policy->mu); + + if (!glb_policy->shutting_down) { + if (grpc_lb_glb_trace) { + gpr_log(GPR_INFO, "Restaring call to LB server (grpclb %p)", + (void *)glb_policy); + } + GPR_ASSERT(glb_policy->lb_call == NULL); + query_for_backends_locked(exec_ctx, glb_policy); } + gpr_mu_unlock(&glb_policy->mu); + + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + "grpclb_on_retry_timer"); } -static void srv_status_rcvd_cb(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - lb_client_data *lb_client = arg; +static void lb_on_server_status_received(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + glb_lb_policy *glb_policy = arg; + gpr_mu_lock(&glb_policy->mu); + + GPR_ASSERT(glb_policy->lb_call != NULL); + if (grpc_lb_glb_trace) { - gpr_log(GPR_INFO, - "status from lb server received. Status = %d, Details = '%s', " - "Capacity " - "= %lu", - lb_client->status, lb_client->status_details, - (unsigned long)lb_client->status_details_capacity); + gpr_log(GPR_DEBUG, + "Status from LB server received. Status = %d, Details = '%s', " + "(call: %p)", + glb_policy->lb_call_status, glb_policy->lb_call_status_details, + (void *)glb_policy->lb_call); } - /* TODO(dgq): deal with stream termination properly (fire up another one? - * fail the original call?) */ + + /* We need to performe cleanups no matter what. */ + lb_call_destroy_locked(glb_policy); + + if (!glb_policy->shutting_down) { + /* if we aren't shutting down, restart the LB client call after some time */ + gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); + gpr_timespec next_try = + gpr_backoff_step(&glb_policy->lb_call_backoff_state, now); + if (grpc_lb_glb_trace) { + gpr_log(GPR_DEBUG, "Connection to LB server lost (grpclb: %p)...", + (void *)glb_policy); + gpr_timespec timeout = gpr_time_sub(next_try, now); + if (gpr_time_cmp(timeout, gpr_time_0(timeout.clock_type)) > 0) { + gpr_log(GPR_DEBUG, "... retrying in %" PRId64 ".%09d seconds.", + timeout.tv_sec, timeout.tv_nsec); + } else { + gpr_log(GPR_DEBUG, "... retrying immediately."); + } + } + GRPC_LB_POLICY_WEAK_REF(&glb_policy->base, "grpclb_retry_timer"); + grpc_timer_init(exec_ctx, &glb_policy->lb_call_retry_timer, next_try, + lb_call_on_retry_timer, glb_policy, now); + } + gpr_mu_unlock(&glb_policy->mu); + GRPC_LB_POLICY_WEAK_UNREF(exec_ctx, &glb_policy->base, + "lb_on_server_status_received"); } /* Code wiring the policy with the rest of the core */ diff --git a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h index 53fed22bae..e36d0966f8 100644 --- a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h +++ b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h @@ -77,7 +77,7 @@ typedef struct _grpc_lb_v1_Server { bool has_port; int32_t port; bool has_load_balance_token; - char load_balance_token[65]; + char load_balance_token[50]; bool has_drop_request; bool drop_request; /* @@protoc_insertion_point(struct:grpc_lb_v1_Server) */ @@ -172,7 +172,7 @@ extern const pb_field_t grpc_lb_v1_Server_fields[5]; #define grpc_lb_v1_LoadBalanceResponse_size (98 + grpc_lb_v1_ServerList_size) #define grpc_lb_v1_InitialLoadBalanceResponse_size 90 /* grpc_lb_v1_ServerList_size depends on runtime parameters */ -#define grpc_lb_v1_Server_size 98 +#define grpc_lb_v1_Server_size 83 /* Message IDs (where set with "msgid" option) */ #ifdef PB_MSGID diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c index 37a9b18b97..e101c0369c 100644 --- a/src/core/ext/lb_policy/round_robin/round_robin.c +++ b/src/core/ext/lb_policy/round_robin/round_robin.c @@ -120,6 +120,8 @@ typedef struct { grpc_connectivity_state connectivity_state; /** the subchannel's target user data */ void *user_data; + /** vtable to operate over \a user_data */ + const grpc_lb_user_data_vtable *user_data_vtable; } subchannel_data; struct round_robin_lb_policy { @@ -186,9 +188,13 @@ static void advance_last_picked_locked(round_robin_lb_policy *p) { } if (grpc_lb_round_robin_trace) { - gpr_log(GPR_DEBUG, "[READYLIST] ADVANCED LAST PICK. NOW AT NODE %p (SC %p)", - (void *)p->ready_list_last_pick, - (void *)p->ready_list_last_pick->subchannel); + gpr_log(GPR_DEBUG, + "[READYLIST, RR: %p] ADVANCED LAST PICK. NOW AT NODE %p (SC %p, " + "CSC %p)", + (void *)p, (void *)p->ready_list_last_pick, + (void *)p->ready_list_last_pick->subchannel, + (void *)grpc_subchannel_get_connected_subchannel( + p->ready_list_last_pick->subchannel)); } } @@ -255,9 +261,18 @@ static void remove_disconnected_sc_locked(round_robin_lb_policy *p, static void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { round_robin_lb_policy *p = (round_robin_lb_policy *)pol; ready_list *elem; + + if (grpc_lb_round_robin_trace) { + gpr_log(GPR_DEBUG, "Destroying Round Robin policy at %p", (void *)pol); + } + for (size_t i = 0; i < p->num_subchannels; i++) { subchannel_data *sd = p->subchannels[i]; - GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin"); + GRPC_SUBCHANNEL_UNREF(exec_ctx, sd->subchannel, "round_robin_destroy"); + if (sd->user_data != NULL) { + GPR_ASSERT(sd->user_data_vtable != NULL); + sd->user_data_vtable->destroy(exec_ctx, sd->user_data); + } gpr_free(sd); } @@ -285,6 +300,9 @@ static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { size_t i; gpr_mu_lock(&p->mu); + if (grpc_lb_round_robin_trace) { + gpr_log(GPR_DEBUG, "Shutting down Round Robin policy at %p", (void *)pol); + } p->shutdown = 1; while ((pp = p->pending_picks)) { @@ -296,7 +314,7 @@ static void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { } grpc_connectivity_state_set( exec_ctx, &p->state_tracker, GRPC_CHANNEL_SHUTDOWN, - GRPC_ERROR_CREATE("Channel Shutdown"), "shutdown"); + GRPC_ERROR_CREATE("Channel Shutdown"), "rr_shutdown"); for (i = 0; i < p->num_subchannels; i++) { subchannel_data *sd = p->subchannels[i]; grpc_subchannel_notify_on_state_change(exec_ctx, sd->subchannel, NULL, NULL, @@ -395,6 +413,11 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, pending_pick *pp; ready_list *selected; gpr_mu_lock(&p->mu); + + if (grpc_lb_round_robin_trace) { + gpr_log(GPR_INFO, "Round Robin %p trying to pick", (void *)pol); + } + if ((selected = peek_next_connected_locked(p))) { /* readily available, report right away */ *target = GRPC_CONNECTED_SUBCHANNEL_REF( @@ -435,7 +458,6 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, subchannel_data *sd = arg; round_robin_lb_policy *p = sd->policy; pending_pick *pp; - ready_list *selected; int unref = 0; @@ -456,12 +478,14 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, /* at this point we know there's at least one suitable subchannel. Go * ahead and pick one and notify the pending suitors in * p->pending_picks. This preemtively replicates rr_pick()'s actions. */ - selected = peek_next_connected_locked(p); + ready_list *selected = peek_next_connected_locked(p); + GPR_ASSERT(selected != NULL); if (p->pending_picks != NULL) { /* if the selected subchannel is going to be used for the pending * picks, update the last picked pointer */ advance_last_picked_locked(p); } + while ((pp = p->pending_picks)) { p->pending_picks = pp->next; @@ -585,6 +609,7 @@ static void rr_ping_one(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_subchannel_get_connected_subchannel(selected->subchannel), "picked"); grpc_connected_subchannel_ping(exec_ctx, target, closure); + GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, target, "picked"); } else { gpr_mu_unlock(&p->mu); grpc_exec_ctx_sched(exec_ctx, closure, @@ -653,7 +678,11 @@ static grpc_lb_policy *round_robin_create(grpc_exec_ctx *exec_ctx, sd->policy = p; sd->index = subchannel_idx; sd->subchannel = subchannel; - sd->user_data = addresses->addresses[i].user_data; + sd->user_data_vtable = addresses->user_data_vtable; + if (sd->user_data_vtable != NULL) { + sd->user_data = + sd->user_data_vtable->copy(addresses->addresses[i].user_data); + } ++subchannel_idx; grpc_closure_init(&sd->connectivity_changed_closure, rr_connectivity_changed, sd); diff --git a/src/core/ext/load_reporting/load_reporting.c b/src/core/ext/load_reporting/load_reporting.c index df1ea0ec9a..37b06a737f 100644 --- a/src/core/ext/load_reporting/load_reporting.c +++ b/src/core/ext/load_reporting/load_reporting.c @@ -53,7 +53,8 @@ static bool is_load_reporting_enabled(const grpc_channel_args *a) { return false; } -static bool maybe_add_load_reporting_filter(grpc_channel_stack_builder *builder, +static bool maybe_add_load_reporting_filter(grpc_exec_ctx *exec_ctx, + grpc_channel_stack_builder *builder, void *arg) { const grpc_channel_args *args = grpc_channel_stack_builder_get_channel_arguments(builder); diff --git a/src/core/ext/load_reporting/load_reporting_filter.c b/src/core/ext/load_reporting/load_reporting_filter.c index eeae2400fb..2b08c4efec 100644 --- a/src/core/ext/load_reporting/load_reporting_filter.c +++ b/src/core/ext/load_reporting/load_reporting_filter.c @@ -92,8 +92,8 @@ static void on_initial_md_ready(grpc_exec_ctx *exec_ctx, void *user_data, recv_md_filter_args a; a.elem = elem; a.exec_ctx = exec_ctx; - grpc_metadata_batch_filter(calld->recv_initial_metadata, recv_md_filter, - &a); + grpc_metadata_batch_filter(exec_ctx, calld->recv_initial_metadata, + recv_md_filter, &a); if (calld->service_method == NULL) { err = grpc_error_add_child(err, GRPC_ERROR_CREATE("Missing :path header")); @@ -213,7 +213,7 @@ static void lr_start_transport_stream_op(grpc_exec_ctx *exec_ctx, calld->ops_recv_initial_metadata_ready = op->recv_initial_metadata_ready; op->recv_initial_metadata_ready = &calld->on_initial_md_ready; } else if (op->send_trailing_metadata) { - grpc_metadata_batch_filter(op->send_trailing_metadata, + grpc_metadata_batch_filter(exec_ctx, op->send_trailing_metadata, lr_trailing_md_filter, elem); } grpc_call_next_op(exec_ctx, elem, op); @@ -232,4 +232,5 @@ const grpc_channel_filter grpc_load_reporting_filter = { init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + grpc_channel_next_get_info, "load_reporting"}; diff --git a/src/core/ext/resolver/dns/native/dns_resolver.c b/src/core/ext/resolver/dns/native/dns_resolver.c index 958b8af8b2..052bfd4bcc 100644 --- a/src/core/ext/resolver/dns/native/dns_resolver.c +++ b/src/core/ext/resolver/dns/native/dns_resolver.c @@ -179,7 +179,7 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, grpc_arg new_arg = grpc_lb_addresses_create_channel_arg(addresses); result = grpc_channel_args_copy_and_add(r->channel_args, &new_arg, 1); grpc_resolved_addresses_destroy(r->addresses); - grpc_lb_addresses_destroy(addresses); + grpc_lb_addresses_destroy(exec_ctx, addresses); } else { gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_timespec next_try = gpr_backoff_step(&r->backoff_state, now); @@ -190,7 +190,7 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, GPR_ASSERT(!r->have_retry_timer); r->have_retry_timer = true; GRPC_RESOLVER_REF(&r->base, "retry-timer"); - if (gpr_time_cmp(timeout, gpr_time_0(timeout.clock_type)) <= 0) { + if (gpr_time_cmp(timeout, gpr_time_0(timeout.clock_type)) > 0) { gpr_log(GPR_DEBUG, "retrying in %" PRId64 ".%09d seconds", timeout.tv_sec, timeout.tv_nsec); } else { @@ -200,7 +200,7 @@ static void dns_on_resolved(grpc_exec_ctx *exec_ctx, void *arg, now); } if (r->resolved_result != NULL) { - grpc_channel_args_destroy(r->resolved_result); + grpc_channel_args_destroy(exec_ctx, r->resolved_result); } r->resolved_result = result; r->resolved_version++; @@ -237,11 +237,11 @@ static void dns_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { dns_resolver *r = (dns_resolver *)gr; gpr_mu_destroy(&r->mu); if (r->resolved_result != NULL) { - grpc_channel_args_destroy(r->resolved_result); + grpc_channel_args_destroy(exec_ctx, r->resolved_result); } gpr_free(r->name_to_resolve); gpr_free(r->default_port); - grpc_channel_args_destroy(r->channel_args); + grpc_channel_args_destroy(exec_ctx, r->channel_args); gpr_free(r); } @@ -283,7 +283,8 @@ static void dns_factory_ref(grpc_resolver_factory *factory) {} static void dns_factory_unref(grpc_resolver_factory *factory) {} static grpc_resolver *dns_factory_create_resolver( - grpc_resolver_factory *factory, grpc_resolver_args *args) { + grpc_exec_ctx *exec_ctx, grpc_resolver_factory *factory, + grpc_resolver_args *args) { return dns_create(args, "https"); } diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c index a3f13cd61f..55f8c071f2 100644 --- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c +++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c @@ -132,8 +132,8 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { sockaddr_resolver *r = (sockaddr_resolver *)gr; gpr_mu_destroy(&r->mu); - grpc_lb_addresses_destroy(r->addresses); - grpc_channel_args_destroy(r->channel_args); + grpc_lb_addresses_destroy(exec_ctx, r->addresses); + grpc_channel_args_destroy(exec_ctx, r->channel_args); gpr_free(r); } @@ -193,7 +193,7 @@ static grpc_resolver *sockaddr_create(grpc_exec_ctx *exec_ctx, grpc_slice_buffer_destroy_internal(exec_ctx, &path_parts); grpc_slice_unref_internal(exec_ctx, path_slice); if (errors_found) { - grpc_lb_addresses_destroy(addresses); + grpc_lb_addresses_destroy(exec_ctx, addresses); return NULL; } /* Instantiate resolver. */ diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create.c b/src/core/ext/transport/chttp2/client/insecure/channel_create.c index 8e03fd82c1..ad1dd29241 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create.c @@ -98,7 +98,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, grpc_error *error) { connector *c = user_data; if (error != GRPC_ERROR_NONE) { - grpc_channel_args_destroy(args); + grpc_channel_args_destroy(exec_ctx, args); gpr_free(read_buffer); } else { c->result->transport = @@ -197,7 +197,7 @@ static grpc_channel *client_channel_factory_create_channel( const grpc_channel_args *args) { grpc_channel *channel = grpc_channel_create(exec_ctx, target, args, GRPC_CLIENT_CHANNEL, NULL); - grpc_resolver *resolver = grpc_resolver_create(target, args); + grpc_resolver *resolver = grpc_resolver_create(exec_ctx, target, args); if (!resolver) { GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, channel, "client_channel_factory_create_channel"); diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c index 1e5b1c22e3..f1069e2c57 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.c @@ -74,7 +74,7 @@ grpc_channel *grpc_insecure_channel_create_from_fd( GPR_ASSERT(transport); grpc_channel *channel = grpc_channel_create( &exec_ctx, target, final_args, GRPC_CLIENT_DIRECT_CHANNEL, transport); - grpc_channel_args_destroy(final_args); + grpc_channel_args_destroy(&exec_ctx, final_args); grpc_chttp2_transport_start_reading(&exec_ctx, transport, NULL); grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index 22d7ab0d14..92e7fb17f9 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -91,7 +91,7 @@ static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) { connector *c = (connector *)con; if (gpr_unref(&c->refs)) { /* c->initial_string_buffer does not need to be destroyed */ - grpc_channel_args_destroy(c->tmp_args); + grpc_channel_args_destroy(exec_ctx, c->tmp_args); grpc_handshake_manager_destroy(exec_ctx, c->handshake_mgr); gpr_free(c); } @@ -240,7 +240,7 @@ static void client_channel_factory_unref( grpc_exec_ctx *exec_ctx, grpc_client_channel_factory *cc_factory) { client_channel_factory *f = (client_channel_factory *)cc_factory; if (gpr_unref(&f->refs)) { - GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base, + GRPC_SECURITY_CONNECTOR_UNREF(exec_ctx, &f->security_connector->base, "client_channel_factory"); gpr_free(f); } @@ -276,7 +276,7 @@ static grpc_channel *client_channel_factory_create_channel( client_channel_factory *f = (client_channel_factory *)cc_factory; grpc_channel *channel = grpc_channel_create(exec_ctx, target, args, GRPC_CLIENT_CHANNEL, NULL); - grpc_resolver *resolver = grpc_resolver_create(target, args); + grpc_resolver *resolver = grpc_resolver_create(exec_ctx, target, args); if (resolver != NULL) { grpc_client_channel_finish_initialization( exec_ctx, grpc_channel_get_channel_stack(channel), resolver, &f->base); @@ -320,8 +320,8 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds, grpc_channel_security_connector *security_connector; grpc_channel_args *new_args_from_connector; if (grpc_channel_credentials_create_security_connector( - creds, target, args, &security_connector, &new_args_from_connector) != - GRPC_SECURITY_OK) { + &exec_ctx, creds, target, args, &security_connector, + &new_args_from_connector) != GRPC_SECURITY_OK) { grpc_exec_ctx_finish(&exec_ctx); return grpc_lame_client_channel_create( target, GRPC_STATUS_INTERNAL, "Failed to create security connector."); @@ -332,7 +332,7 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds, new_args_from_connector != NULL ? new_args_from_connector : args, &connector_arg, 1); if (new_args_from_connector != NULL) { - grpc_channel_args_destroy(new_args_from_connector); + grpc_channel_args_destroy(&exec_ctx, new_args_from_connector); } // Create client channel factory. client_channel_factory *f = gpr_malloc(sizeof(*f)); @@ -346,9 +346,9 @@ grpc_channel *grpc_secure_channel_create(grpc_channel_credentials *creds, grpc_channel *channel = client_channel_factory_create_channel( &exec_ctx, &f->base, target, GRPC_CLIENT_CHANNEL_TYPE_REGULAR, new_args); // Clean up. - GRPC_SECURITY_CONNECTOR_UNREF(&f->security_connector->base, - "client_channel_factory_create_channel"); - grpc_channel_args_destroy(new_args); + GRPC_SECURITY_CONNECTOR_UNREF(&exec_ctx, &f->security_connector->base, + "secure_client_channel_factory_create_channel"); + grpc_channel_args_destroy(&exec_ctx, new_args); grpc_client_channel_factory_unref(&exec_ctx, &f->base); grpc_exec_ctx_finish(&exec_ctx); return channel; /* may be NULL */ diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c index c18d618f96..ce7cd7586b 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2.c @@ -80,7 +80,7 @@ static void on_handshake_done(grpc_exec_ctx *exec_ctx, grpc_endpoint *endpoint, grpc_chttp2_transport_start_reading(exec_ctx, transport, read_buffer); } // Clean up. - grpc_channel_args_destroy(args); + grpc_channel_args_destroy(exec_ctx, args); grpc_handshake_manager_destroy(exec_ctx, state->handshake_mgr); gpr_free(state); } diff --git a/src/core/ext/transport/chttp2/transport/hpack_parser.c b/src/core/ext/transport/chttp2/transport/hpack_parser.c index f69fbd48a9..f4e69df12d 100644 --- a/src/core/ext/transport/chttp2/transport/hpack_parser.c +++ b/src/core/ext/transport/chttp2/transport/hpack_parser.c @@ -50,6 +50,7 @@ #include <grpc/support/useful.h> #include "src/core/ext/transport/chttp2/transport/bin_encoder.h" +#include "src/core/ext/transport/chttp2/transport/http2_errors.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/string.h" @@ -1580,6 +1581,20 @@ static const maybe_complete_func_type maybe_complete_funcs[] = { grpc_chttp2_maybe_complete_recv_initial_metadata, grpc_chttp2_maybe_complete_recv_trailing_metadata}; +static void force_client_rst_stream(grpc_exec_ctx *exec_ctx, void *sp, + grpc_error *error) { + grpc_chttp2_stream *s = sp; + grpc_chttp2_transport *t = s->t; + if (!s->write_closed) { + grpc_slice_buffer_add( + &t->qbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_CHTTP2_NO_ERROR, + &s->stats.outgoing)); + grpc_chttp2_initiate_write(exec_ctx, t, false, "force_rst_stream"); + grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, true, GRPC_ERROR_NONE); + } + GRPC_CHTTP2_STREAM_UNREF(exec_ctx, s, "final_rst"); +} + grpc_error *grpc_chttp2_header_parser_parse(grpc_exec_ctx *exec_ctx, void *hpack_parser, grpc_chttp2_transport *t, @@ -1615,6 +1630,17 @@ grpc_error *grpc_chttp2_header_parser_parse(grpc_exec_ctx *exec_ctx, s->header_frames_received++; } if (parser->is_eof) { + if (t->is_client && !s->write_closed) { + /* server eof ==> complete closure; we may need to forcefully close + the stream. Wait until the combiner lock is ready to be released + however -- it might be that we receive a RST_STREAM following this + and can avoid the extra write */ + GRPC_CHTTP2_STREAM_REF(s, "final_rst"); + grpc_combiner_execute_finally( + exec_ctx, t->combiner, + grpc_closure_create(force_client_rst_stream, s), GRPC_ERROR_NONE, + false); + } grpc_chttp2_mark_stream_closed(exec_ctx, t, s, true, false, GRPC_ERROR_NONE); } diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c index c32e7e6277..947dff4cb3 100644 --- a/src/core/lib/channel/channel_stack.c +++ b/src/core/lib/channel/channel_stack.c @@ -255,6 +255,13 @@ char *grpc_call_next_get_peer(grpc_exec_ctx *exec_ctx, return next_elem->filter->get_peer(exec_ctx, next_elem); } +void grpc_channel_next_get_info(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + const grpc_channel_info *channel_info) { + grpc_channel_element *next_elem = elem + 1; + next_elem->filter->get_channel_info(exec_ctx, next_elem, channel_info); +} + void grpc_channel_next_op(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_transport_op *op) { grpc_channel_element *next_elem = elem + 1; diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h index 0d58994726..c3b662c969 100644 --- a/src/core/lib/channel/channel_stack.h +++ b/src/core/lib/channel/channel_stack.h @@ -156,6 +156,10 @@ typedef struct { /* Implement grpc_call_get_peer() */ char *(*get_peer)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem); + /* Implement grpc_channel_get_info() */ + void (*get_channel_info)(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, + const grpc_channel_info *channel_info); + /* The name of this filter */ const char *name; } grpc_channel_filter; @@ -273,6 +277,10 @@ void grpc_channel_next_op(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_transport_op *op); /* Pass through a request to get_peer to the next child element */ char *grpc_call_next_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem); +/* Pass through a request to get_channel_info() to the next child element */ +void grpc_channel_next_get_info(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + const grpc_channel_info *channel_info); /* Given the top element of a channel stack, get the channel stack itself */ grpc_channel_stack *grpc_channel_stack_from_top_element( diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c index 9cb52627ce..dd496ee095 100644 --- a/src/core/lib/channel/compress_filter.c +++ b/src/core/lib/channel/compress_filter.c @@ -112,9 +112,13 @@ static grpc_mdelem *compression_md_filter(void *user_data, grpc_mdelem *md) { return md; } -static int skip_compression(grpc_call_element *elem) { +static int skip_compression(grpc_call_element *elem, uint32_t flags) { call_data *calld = elem->call_data; channel_data *channeld = elem->channel_data; + + if (flags & (GRPC_WRITE_NO_COMPRESS | GRPC_WRITE_INTERNAL_COMPRESS)) { + return 1; + } if (calld->has_compression_algorithm) { if (calld->compression_algorithm == GRPC_COMPRESS_NONE) { return 1; @@ -244,8 +248,8 @@ static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx, if (op->send_initial_metadata) { process_send_initial_metadata(exec_ctx, elem, op->send_initial_metadata); } - if (op->send_message != NULL && !skip_compression(elem) && - 0 == (op->send_message->flags & GRPC_WRITE_NO_COMPRESS)) { + if (op->send_message != NULL && + !skip_compression(elem, op->send_message->flags)) { calld->send_op = op; calld->send_length = op->send_message->length; calld->send_flags = op->send_message->flags; @@ -331,4 +335,5 @@ const grpc_channel_filter grpc_compress_filter = { init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + grpc_channel_next_get_info, "compress"}; diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c index 0e62d58475..ba2c7fcb0a 100644 --- a/src/core/lib/channel/connected_channel.c +++ b/src/core/lib/channel/connected_channel.c @@ -134,6 +134,11 @@ static char *con_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { return grpc_transport_get_peer(exec_ctx, chand->transport); } +/* No-op. */ +static void con_get_channel_info(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + const grpc_channel_info *channel_info) {} + static const grpc_channel_filter connected_channel_filter = { con_start_transport_stream_op, con_start_transport_op, @@ -145,6 +150,7 @@ static const grpc_channel_filter connected_channel_filter = { init_channel_elem, destroy_channel_elem, con_get_peer, + con_get_channel_info, "connected", }; @@ -164,7 +170,8 @@ static void bind_transport(grpc_channel_stack *channel_stack, channel_stack->call_stack_size += grpc_transport_stream_size(t); } -bool grpc_add_connected_filter(grpc_channel_stack_builder *builder, +bool grpc_add_connected_filter(grpc_exec_ctx *exec_ctx, + grpc_channel_stack_builder *builder, void *arg_must_be_null) { GPR_ASSERT(arg_must_be_null == NULL); grpc_transport *t = grpc_channel_stack_builder_get_transport(builder); diff --git a/src/core/lib/channel/connected_channel.h b/src/core/lib/channel/connected_channel.h index 3142d647b7..3585c0ecbc 100644 --- a/src/core/lib/channel/connected_channel.h +++ b/src/core/lib/channel/connected_channel.h @@ -36,7 +36,8 @@ #include "src/core/lib/channel/channel_stack_builder.h" -bool grpc_add_connected_filter(grpc_channel_stack_builder *builder, +bool grpc_add_connected_filter(grpc_exec_ctx *exec_ctx, + grpc_channel_stack_builder *builder, void *arg_must_be_null); #endif /* GRPC_CORE_LIB_CHANNEL_CONNECTED_CHANNEL_H */ diff --git a/src/core/lib/channel/deadline_filter.c b/src/core/lib/channel/deadline_filter.c index 3b24e52ff4..0293c34237 100644 --- a/src/core/lib/channel/deadline_filter.c +++ b/src/core/lib/channel/deadline_filter.c @@ -317,6 +317,7 @@ const grpc_channel_filter grpc_client_deadline_filter = { init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + grpc_channel_next_get_info, "deadline", }; @@ -331,5 +332,6 @@ const grpc_channel_filter grpc_server_deadline_filter = { init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + grpc_channel_next_get_info, "deadline", }; diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c index 026e4d486e..63afa4bf09 100644 --- a/src/core/lib/channel/http_client_filter.c +++ b/src/core/lib/channel/http_client_filter.c @@ -449,4 +449,5 @@ const grpc_channel_filter grpc_http_client_filter = { init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + grpc_channel_next_get_info, "http-client"}; diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c index d09a2b13ee..da31176ce9 100644 --- a/src/core/lib/channel/http_server_filter.c +++ b/src/core/lib/channel/http_server_filter.c @@ -163,7 +163,6 @@ static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) { /* Retrieve the payload from the value of the 'grpc-internal-payload-bin' header field */ calld->seen_payload_bin = 1; - grpc_slice_buffer_init(&calld->read_slice_buffer); grpc_slice_buffer_add(&calld->read_slice_buffer, grpc_slice_ref_internal(md->value->slice)); grpc_slice_buffer_stream_init(&calld->read_stream, @@ -316,13 +315,17 @@ static grpc_error *init_call_elem(grpc_exec_ctx *exec_ctx, grpc_closure_init(&calld->hs_on_recv, hs_on_recv, elem); grpc_closure_init(&calld->hs_on_complete, hs_on_complete, elem); grpc_closure_init(&calld->hs_recv_message_ready, hs_recv_message_ready, elem); + grpc_slice_buffer_init(&calld->read_slice_buffer); return GRPC_ERROR_NONE; } /* Destructor for call_data */ static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem, const grpc_call_final_info *final_info, - void *ignored) {} + void *ignored) { + call_data *calld = elem->call_data; + grpc_slice_buffer_destroy(&calld->read_slice_buffer); +} /* Constructor for channel_data */ static void init_channel_elem(grpc_exec_ctx *exec_ctx, @@ -346,4 +349,5 @@ const grpc_channel_filter grpc_http_server_filter = { init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + grpc_channel_next_get_info, "http-server"}; diff --git a/src/core/lib/channel/message_size_filter.c b/src/core/lib/channel/message_size_filter.c index 50118b52fd..1655d843d5 100644 --- a/src/core/lib/channel/message_size_filter.c +++ b/src/core/lib/channel/message_size_filter.c @@ -250,4 +250,5 @@ const grpc_channel_filter grpc_message_size_filter = { init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + grpc_channel_next_get_info, "message_size"}; diff --git a/src/core/lib/iomgr/network_status_tracker.c b/src/core/lib/iomgr/network_status_tracker.c index b4bb7e3cf7..a5ca9ed2c3 100644 --- a/src/core/lib/iomgr/network_status_tracker.c +++ b/src/core/lib/iomgr/network_status_tracker.c @@ -46,7 +46,7 @@ static gpr_mu g_endpoint_mutex; void grpc_network_status_shutdown(void) { if (head != NULL) { gpr_log(GPR_ERROR, - "Memory leaked as all network endpoints were not shut down"); + "Memory leaked as not all network endpoints were shut down"); } gpr_mu_destroy(&g_endpoint_mutex); } diff --git a/src/core/lib/iomgr/resource_quota.c b/src/core/lib/iomgr/resource_quota.c index 0d6665de51..efcb0f8174 100644 --- a/src/core/lib/iomgr/resource_quota.c +++ b/src/core/lib/iomgr/resource_quota.c @@ -44,6 +44,81 @@ int grpc_resource_quota_trace = 0; +/* Internal linked list pointers for a resource user */ +typedef struct { + grpc_resource_user *next; + grpc_resource_user *prev; +} grpc_resource_user_link; + +/* Resource users are kept in (potentially) several intrusive linked lists + at once. These are the list names. */ +typedef enum { + /* Resource users that are waiting for an allocation */ + GRPC_RULIST_AWAITING_ALLOCATION, + /* Resource users that have free memory available for internal reclamation */ + GRPC_RULIST_NON_EMPTY_FREE_POOL, + /* Resource users that have published a benign reclamation is available */ + GRPC_RULIST_RECLAIMER_BENIGN, + /* Resource users that have published a destructive reclamation is + available */ + GRPC_RULIST_RECLAIMER_DESTRUCTIVE, + /* Number of lists: must be last */ + GRPC_RULIST_COUNT +} grpc_rulist; + +struct grpc_resource_user { + /* The quota this resource user consumes from */ + grpc_resource_quota *resource_quota; + + /* Closure to schedule an allocation under the resource quota combiner lock */ + grpc_closure allocate_closure; + /* Closure to publish a non empty free pool under the resource quota combiner + lock */ + grpc_closure add_to_free_pool_closure; + + /* one ref for each ref call (released by grpc_resource_user_unref), and one + ref for each byte allocated (released by grpc_resource_user_free) */ + gpr_atm refs; + /* is this resource user unlocked? starts at 0, increases for each shutdown + call */ + gpr_atm shutdown; + + gpr_mu mu; + /* The amount of memory (in bytes) this user has cached for its own use: to + avoid quota contention, each resource user can keep some memory in + addition to what it is immediately using (e.g., for caching), and the quota + can pull it back under memory pressure. + This value can become negative if more memory has been requested than + existed in the free pool, at which point the quota is consulted to bring + this value non-negative (asynchronously). */ + int64_t free_pool; + /* A list of closures to call once free_pool becomes non-negative - ie when + all outstanding allocations have been granted. */ + grpc_closure_list on_allocated; + /* True if we are currently trying to allocate from the quota, false if not */ + bool allocating; + /* True if we are currently trying to add ourselves to the non-free quota + list, false otherwise */ + bool added_to_free_pool; + + /* Reclaimers: index 0 is the benign reclaimer, 1 is the destructive reclaimer + */ + grpc_closure *reclaimers[2]; + /* Trampoline closures to finish reclamation and re-enter the quota combiner + lock */ + grpc_closure post_reclaimer_closure[2]; + + /* Closure to execute under the quota combiner to de-register and shutdown the + resource user */ + grpc_closure destroy_closure; + + /* Links in the various grpc_rulist lists */ + grpc_resource_user_link links[GRPC_RULIST_COUNT]; + + /* The name of this resource user, for debugging/tracing */ + char *name; +}; + struct grpc_resource_quota { /* refcount */ gpr_refcount refs; @@ -362,9 +437,19 @@ static void ru_post_destructive_reclaimer(grpc_exec_ctx *exec_ctx, void *ru, rulist_add_tail(resource_user, GRPC_RULIST_RECLAIMER_DESTRUCTIVE); } +static void ru_shutdown(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { + grpc_resource_user *resource_user = ru; + grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[0], + GRPC_ERROR_CANCELLED, NULL); + grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[1], + GRPC_ERROR_CANCELLED, NULL); + resource_user->reclaimers[0] = NULL; + resource_user->reclaimers[1] = NULL; +} + static void ru_destroy(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { grpc_resource_user *resource_user = ru; - GPR_ASSERT(resource_user->allocated == 0); + GPR_ASSERT(gpr_atm_no_barrier_load(&resource_user->refs) == 0); for (int i = 0; i < GRPC_RULIST_COUNT; i++) { rulist_remove(resource_user, (grpc_rulist)i); } @@ -372,13 +457,14 @@ static void ru_destroy(grpc_exec_ctx *exec_ctx, void *ru, grpc_error *error) { GRPC_ERROR_CANCELLED, NULL); grpc_exec_ctx_sched(exec_ctx, resource_user->reclaimers[1], GRPC_ERROR_CANCELLED, NULL); - grpc_exec_ctx_sched(exec_ctx, (grpc_closure *)gpr_atm_no_barrier_load( - &resource_user->on_done_destroy_closure), - GRPC_ERROR_NONE, NULL); if (resource_user->free_pool != 0) { resource_user->resource_quota->free_pool += resource_user->free_pool; rq_step_sched(exec_ctx, resource_user->resource_quota); } + grpc_resource_quota_unref_internal(exec_ctx, resource_user->resource_quota); + gpr_mu_destroy(&resource_user->mu); + gpr_free(resource_user->name); + gpr_free(resource_user); } static void ru_allocated_slices(grpc_exec_ctx *exec_ctx, void *arg, @@ -530,9 +616,9 @@ const grpc_arg_pointer_vtable *grpc_resource_quota_arg_vtable(void) { * grpc_resource_user api */ -void grpc_resource_user_init(grpc_resource_user *resource_user, - grpc_resource_quota *resource_quota, - const char *name) { +grpc_resource_user *grpc_resource_user_create( + grpc_resource_quota *resource_quota, const char *name) { + grpc_resource_user *resource_user = gpr_malloc(sizeof(*resource_user)); resource_user->resource_quota = grpc_resource_quota_ref_internal(resource_quota); grpc_closure_init(&resource_user->allocate_closure, &ru_allocate, @@ -546,12 +632,12 @@ void grpc_resource_user_init(grpc_resource_user *resource_user, grpc_closure_init(&resource_user->destroy_closure, &ru_destroy, resource_user); gpr_mu_init(&resource_user->mu); - resource_user->allocated = 0; + gpr_atm_rel_store(&resource_user->refs, 1); + gpr_atm_rel_store(&resource_user->shutdown, 0); resource_user->free_pool = 0; grpc_closure_list_init(&resource_user->on_allocated); resource_user->allocating = false; resource_user->added_to_free_pool = false; - gpr_atm_no_barrier_store(&resource_user->on_done_destroy_closure, 0); resource_user->reclaimers[0] = NULL; resource_user->reclaimers[1] = NULL; for (int i = 0; i < GRPC_RULIST_COUNT; i++) { @@ -563,56 +649,54 @@ void grpc_resource_user_init(grpc_resource_user *resource_user, gpr_asprintf(&resource_user->name, "anonymous_resource_user_%" PRIxPTR, (intptr_t)resource_user); } + return resource_user; } -void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx, - grpc_resource_user *resource_user, - grpc_closure *on_done) { - gpr_mu_lock(&resource_user->mu); - GPR_ASSERT(gpr_atm_no_barrier_load(&resource_user->on_done_destroy_closure) == - 0); - gpr_atm_no_barrier_store(&resource_user->on_done_destroy_closure, - (gpr_atm)on_done); - if (resource_user->allocated == 0) { +static void ru_ref_by(grpc_resource_user *resource_user, gpr_atm amount) { + GPR_ASSERT(amount > 0); + GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&resource_user->refs, amount) != 0); +} + +static void ru_unref_by(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user, gpr_atm amount) { + GPR_ASSERT(amount > 0); + gpr_atm old = gpr_atm_full_fetch_add(&resource_user->refs, -amount); + GPR_ASSERT(old >= amount); + if (old == amount) { grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner, &resource_user->destroy_closure, GRPC_ERROR_NONE, false); } - gpr_mu_unlock(&resource_user->mu); } -void grpc_resource_user_destroy(grpc_exec_ctx *exec_ctx, - grpc_resource_user *resource_user) { - grpc_resource_quota_unref_internal(exec_ctx, resource_user->resource_quota); - gpr_mu_destroy(&resource_user->mu); - gpr_free(resource_user->name); +void grpc_resource_user_ref(grpc_resource_user *resource_user) { + ru_ref_by(resource_user, 1); +} + +void grpc_resource_user_unref(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user) { + ru_unref_by(exec_ctx, resource_user, 1); +} + +void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user) { + if (gpr_atm_full_fetch_add(&resource_user->shutdown, 1) == 0) { + grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner, + grpc_closure_create(ru_shutdown, resource_user), + GRPC_ERROR_NONE, false); + } } void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx, grpc_resource_user *resource_user, size_t size, grpc_closure *optional_on_done) { gpr_mu_lock(&resource_user->mu); - grpc_closure *on_done_destroy = (grpc_closure *)gpr_atm_no_barrier_load( - &resource_user->on_done_destroy_closure); - if (on_done_destroy != NULL) { - /* already shutdown */ - if (grpc_resource_quota_trace) { - gpr_log(GPR_DEBUG, "RQ %s %s: alloc %" PRIdPTR " after shutdown", - resource_user->resource_quota->name, resource_user->name, size); - } - grpc_exec_ctx_sched( - exec_ctx, optional_on_done, - GRPC_ERROR_CREATE("Buffer pool user is already shutdown"), NULL); - gpr_mu_unlock(&resource_user->mu); - return; - } - resource_user->allocated += (int64_t)size; + ru_ref_by(resource_user, (gpr_atm)size); resource_user->free_pool -= (int64_t)size; if (grpc_resource_quota_trace) { - gpr_log(GPR_DEBUG, "RQ %s %s: alloc %" PRIdPTR "; allocated -> %" PRId64 - ", free_pool -> %" PRId64, + gpr_log(GPR_DEBUG, "RQ %s %s: alloc %" PRIdPTR "; free_pool -> %" PRId64, resource_user->resource_quota->name, resource_user->name, size, - resource_user->allocated, resource_user->free_pool); + resource_user->free_pool); } if (resource_user->free_pool < 0) { grpc_closure_list_append(&resource_user->on_allocated, optional_on_done, @@ -632,15 +716,12 @@ void grpc_resource_user_alloc(grpc_exec_ctx *exec_ctx, void grpc_resource_user_free(grpc_exec_ctx *exec_ctx, grpc_resource_user *resource_user, size_t size) { gpr_mu_lock(&resource_user->mu); - GPR_ASSERT(resource_user->allocated >= (int64_t)size); bool was_zero_or_negative = resource_user->free_pool <= 0; resource_user->free_pool += (int64_t)size; - resource_user->allocated -= (int64_t)size; if (grpc_resource_quota_trace) { - gpr_log(GPR_DEBUG, "RQ %s %s: free %" PRIdPTR "; allocated -> %" PRId64 - ", free_pool -> %" PRId64, + gpr_log(GPR_DEBUG, "RQ %s %s: free %" PRIdPTR "; free_pool -> %" PRId64, resource_user->resource_quota->name, resource_user->name, size, - resource_user->allocated, resource_user->free_pool); + resource_user->free_pool); } bool is_bigger_than_zero = resource_user->free_pool > 0; if (is_bigger_than_zero && was_zero_or_negative && @@ -650,29 +731,23 @@ void grpc_resource_user_free(grpc_exec_ctx *exec_ctx, &resource_user->add_to_free_pool_closure, GRPC_ERROR_NONE, false); } - grpc_closure *on_done_destroy = (grpc_closure *)gpr_atm_no_barrier_load( - &resource_user->on_done_destroy_closure); - if (on_done_destroy != NULL && resource_user->allocated == 0) { - grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner, - &resource_user->destroy_closure, GRPC_ERROR_NONE, - false); - } gpr_mu_unlock(&resource_user->mu); + ru_unref_by(exec_ctx, resource_user, (gpr_atm)size); } void grpc_resource_user_post_reclaimer(grpc_exec_ctx *exec_ctx, grpc_resource_user *resource_user, bool destructive, grpc_closure *closure) { - if (gpr_atm_acq_load(&resource_user->on_done_destroy_closure) == 0) { - GPR_ASSERT(resource_user->reclaimers[destructive] == NULL); - resource_user->reclaimers[destructive] = closure; - grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner, - &resource_user->post_reclaimer_closure[destructive], - GRPC_ERROR_NONE, false); - } else { + GPR_ASSERT(resource_user->reclaimers[destructive] == NULL); + if (gpr_atm_acq_load(&resource_user->shutdown) > 0) { grpc_exec_ctx_sched(exec_ctx, closure, GRPC_ERROR_CANCELLED, NULL); + return; } + resource_user->reclaimers[destructive] = closure; + grpc_combiner_execute(exec_ctx, resource_user->resource_quota->combiner, + &resource_user->post_reclaimer_closure[destructive], + GRPC_ERROR_NONE, false); } void grpc_resource_user_finish_reclamation(grpc_exec_ctx *exec_ctx, @@ -706,3 +781,10 @@ void grpc_resource_user_alloc_slices( grpc_resource_user_alloc(exec_ctx, slice_allocator->resource_user, count * length, &slice_allocator->on_allocated); } + +grpc_slice grpc_resource_user_slice_malloc(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user, + size_t size) { + grpc_resource_user_alloc(exec_ctx, resource_user, size, NULL); + return ru_slice_create(resource_user, size); +} diff --git a/src/core/lib/iomgr/resource_quota.h b/src/core/lib/iomgr/resource_quota.h index f1da73933e..ef286c2fce 100644 --- a/src/core/lib/iomgr/resource_quota.h +++ b/src/core/lib/iomgr/resource_quota.h @@ -84,91 +84,15 @@ void grpc_resource_quota_unref_internal(grpc_exec_ctx *exec_ctx, grpc_resource_quota *grpc_resource_quota_from_channel_args( const grpc_channel_args *channel_args); -/* Resource users are kept in (potentially) several intrusive linked lists - at once. These are the list names. */ -typedef enum { - /* Resource users that are waiting for an allocation */ - GRPC_RULIST_AWAITING_ALLOCATION, - /* Resource users that have free memory available for internal reclamation */ - GRPC_RULIST_NON_EMPTY_FREE_POOL, - /* Resource users that have published a benign reclamation is available */ - GRPC_RULIST_RECLAIMER_BENIGN, - /* Resource users that have published a destructive reclamation is - available */ - GRPC_RULIST_RECLAIMER_DESTRUCTIVE, - /* Number of lists: must be last */ - GRPC_RULIST_COUNT -} grpc_rulist; - typedef struct grpc_resource_user grpc_resource_user; -/* Internal linked list pointers for a resource user */ -typedef struct { - grpc_resource_user *next; - grpc_resource_user *prev; -} grpc_resource_user_link; - -struct grpc_resource_user { - /* The quota this resource user consumes from */ - grpc_resource_quota *resource_quota; - - /* Closure to schedule an allocation under the resource quota combiner lock */ - grpc_closure allocate_closure; - /* Closure to publish a non empty free pool under the resource quota combiner - lock */ - grpc_closure add_to_free_pool_closure; - - gpr_mu mu; - /* Total allocated memory outstanding by this resource user in bytes; - always positive */ - int64_t allocated; - /* The amount of memory (in bytes) this user has cached for its own use: to - avoid quota contention, each resource user can keep some memory in - addition to what it is immediately using (e.g., for caching), and the quota - can pull it back under memory pressure. - This value can become negative if more memory has been requested than - existed in the free pool, at which point the quota is consulted to bring - this value non-negative (asynchronously). */ - int64_t free_pool; - /* A list of closures to call once free_pool becomes non-negative - ie when - all outstanding allocations have been granted. */ - grpc_closure_list on_allocated; - /* True if we are currently trying to allocate from the quota, false if not */ - bool allocating; - /* True if we are currently trying to add ourselves to the non-free quota - list, false otherwise */ - bool added_to_free_pool; - - /* Reclaimers: index 0 is the benign reclaimer, 1 is the destructive reclaimer - */ - grpc_closure *reclaimers[2]; - /* Trampoline closures to finish reclamation and re-enter the quota combiner - lock */ - grpc_closure post_reclaimer_closure[2]; - - /* Closure to execute under the quota combiner to de-register and shutdown the - resource user */ - grpc_closure destroy_closure; - /* User supplied closure to call once the user has finished shutting down AND - all outstanding allocations have been freed. Real type is grpc_closure*, - but it's stored as an atomic to avoid a mutex on some fast paths. */ - gpr_atm on_done_destroy_closure; - - /* Links in the various grpc_rulist lists */ - grpc_resource_user_link links[GRPC_RULIST_COUNT]; - - /* The name of this resource user, for debugging/tracing */ - char *name; -}; - -void grpc_resource_user_init(grpc_resource_user *resource_user, - grpc_resource_quota *resource_quota, - const char *name); +grpc_resource_user *grpc_resource_user_create( + grpc_resource_quota *resource_quota, const char *name); +void grpc_resource_user_ref(grpc_resource_user *resource_user); +void grpc_resource_user_unref(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user); void grpc_resource_user_shutdown(grpc_exec_ctx *exec_ctx, - grpc_resource_user *resource_user, - grpc_closure *on_done); -void grpc_resource_user_destroy(grpc_exec_ctx *exec_ctx, - grpc_resource_user *resource_user); + grpc_resource_user *resource_user); /* Allocate from the resource user (and its quota). If optional_on_done is NULL, then allocate immediately. This may push the @@ -221,4 +145,9 @@ void grpc_resource_user_alloc_slices( grpc_resource_user_slice_allocator *slice_allocator, size_t length, size_t count, grpc_slice_buffer *dest); +/* Allocate one slice of length \a size synchronously. */ +grpc_slice grpc_resource_user_slice_malloc(grpc_exec_ctx *exec_ctx, + grpc_resource_user *resource_user, + size_t size); + #endif /* GRPC_CORE_LIB_IOMGR_RESOURCE_QUOTA_H */ diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c index 6274667042..b07f9ceffa 100644 --- a/src/core/lib/iomgr/tcp_client_uv.c +++ b/src/core/lib/iomgr/tcp_client_uv.c @@ -54,9 +54,12 @@ typedef struct grpc_uv_tcp_connect { grpc_endpoint **endpoint; int refs; char *addr_name; + grpc_resource_quota *resource_quota; } grpc_uv_tcp_connect; -static void uv_tcp_connect_cleanup(grpc_uv_tcp_connect *connect) { +static void uv_tcp_connect_cleanup(grpc_exec_ctx *exec_ctx, + grpc_uv_tcp_connect *connect) { + grpc_resource_quota_internal_unref(exec_ctx, connect->resource_quota); gpr_free(connect); } @@ -74,7 +77,7 @@ static void uv_tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp, } done = (--connect->refs == 0); if (done) { - uv_tcp_connect_cleanup(connect); + uv_tcp_connect_cleanup(exec_ctx, connect); } } @@ -86,8 +89,8 @@ static void uv_tc_on_connect(uv_connect_t *req, int status) { grpc_closure *closure = connect->closure; grpc_timer_cancel(&exec_ctx, &connect->alarm); if (status == 0) { - *connect->endpoint = - grpc_tcp_create(connect->tcp_handle, connect->addr_name); + *connect->endpoint = grpc_tcp_create( + connect->tcp_handle, connect->resource_quota, connect->addr_name); } else { error = GRPC_ERROR_CREATE("Failed to connect to remote host"); error = grpc_error_set_int(error, GRPC_ERROR_INT_ERRNO, -status); @@ -105,7 +108,7 @@ static void uv_tc_on_connect(uv_connect_t *req, int status) { } done = (--connect->refs == 0); if (done) { - uv_tcp_connect_cleanup(connect); + uv_tcp_connect_cleanup(&exec_ctx, connect); } grpc_exec_ctx_sched(&exec_ctx, closure, error, NULL); grpc_exec_ctx_finish(&exec_ctx); @@ -114,16 +117,31 @@ static void uv_tc_on_connect(uv_connect_t *req, int status) { static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, + const grpc_channel_args *channel_args, const grpc_resolved_address *resolved_addr, gpr_timespec deadline) { grpc_uv_tcp_connect *connect; + grpc_resource_quota *resource_quota = grpc_resource_quota_create(NULL); + (void)channel_args; (void)interested_parties; + + if (channel_args != NULL) { + for (size_t i = 0; i < channel_args->num_args; i++) { + if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) { + grpc_resource_quota_internal_unref(exec_ctx, resource_quota); + resource_quota = grpc_resource_quota_internal_ref( + channel_args->args[i].value.pointer.p); + } + } + } + connect = gpr_malloc(sizeof(grpc_uv_tcp_connect)); memset(connect, 0, sizeof(grpc_uv_tcp_connect)); connect->closure = closure; connect->endpoint = ep; connect->tcp_handle = gpr_malloc(sizeof(uv_tcp_t)); connect->addr_name = grpc_sockaddr_to_uri(resolved_addr); + connect->resource_quota = resource_quota; uv_tcp_init(uv_default_loop(), connect->tcp_handle); connect->connect_req.data = connect; // TODO(murgatroid99): figure out what the return value here means @@ -138,16 +156,18 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx, // overridden by api_fuzzer.c void (*grpc_tcp_client_connect_impl)( grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, - grpc_pollset_set *interested_parties, const grpc_resolved_address *addr, + grpc_pollset_set *interested_parties, const grpc_channel_args *channel_args, + const grpc_resolved_address *addr, gpr_timespec deadline) = tcp_client_connect_impl; void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, + const grpc_channel_args *channel_args, const grpc_resolved_address *addr, gpr_timespec deadline) { - grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, addr, - deadline); + grpc_tcp_client_connect_impl(exec_ctx, closure, ep, interested_parties, + channel_args, addr, deadline); } #endif /* GRPC_UV */ diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 4bf13bee27..e27a762f20 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -104,7 +104,7 @@ typedef struct { char *peer_string; - grpc_resource_user resource_user; + grpc_resource_user *resource_user; grpc_resource_user_slice_allocator slice_allocator; } grpc_tcp; @@ -112,28 +112,18 @@ static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, grpc_error *error); static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, grpc_error *error); -static void tcp_unref_closure(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, - grpc_error *error); - -static void tcp_maybe_shutdown_resource_user(grpc_exec_ctx *exec_ctx, - grpc_tcp *tcp) { - if (gpr_atm_full_fetch_add(&tcp->shutdown_count, 1) == 0) { - grpc_resource_user_shutdown(exec_ctx, &tcp->resource_user, - grpc_closure_create(tcp_unref_closure, tcp)); - } -} static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; - tcp_maybe_shutdown_resource_user(exec_ctx, tcp); grpc_fd_shutdown(exec_ctx, tcp->em_fd); + grpc_resource_user_shutdown(exec_ctx, tcp->resource_user); } static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { grpc_fd_orphan(exec_ctx, tcp->em_fd, tcp->release_fd_cb, tcp->release_fd, "tcp_unref_orphan"); grpc_slice_buffer_destroy_internal(exec_ctx, &tcp->last_read_buffer); - grpc_resource_user_destroy(exec_ctx, &tcp->resource_user); + grpc_resource_user_unref(exec_ctx, tcp->resource_user); gpr_free(tcp->peer_string); gpr_free(tcp); } @@ -170,15 +160,9 @@ static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } #endif -static void tcp_unref_closure(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - TCP_UNREF(exec_ctx, arg, "resource_user"); -} - static void tcp_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_network_status_unregister_endpoint(ep); grpc_tcp *tcp = (grpc_tcp *)ep; - tcp_maybe_shutdown_resource_user(exec_ctx, tcp); grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &tcp->last_read_buffer); TCP_UNREF(exec_ctx, tcp, "destroy"); } @@ -520,7 +504,7 @@ static grpc_workqueue *tcp_get_workqueue(grpc_endpoint *ep) { static grpc_resource_user *tcp_get_resource_user(grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; - return &tcp->resource_user; + return tcp->resource_user; } static const grpc_endpoint_vtable vtable = {tcp_read, @@ -548,9 +532,8 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, tcp->slice_size = slice_size; tcp->iov_size = 1; tcp->finished_edge = true; - /* paired with unref in grpc_tcp_destroy, and with the shutdown for our - * resource_user */ - gpr_ref_init(&tcp->refcount, 2); + /* paired with unref in grpc_tcp_destroy */ + gpr_ref_init(&tcp->refcount, 1); gpr_atm_no_barrier_store(&tcp->shutdown_count, 0); tcp->em_fd = em_fd; tcp->read_closure.cb = tcp_handle_read; @@ -558,10 +541,9 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, tcp->write_closure.cb = tcp_handle_write; tcp->write_closure.cb_arg = tcp; grpc_slice_buffer_init(&tcp->last_read_buffer); - grpc_resource_user_init(&tcp->resource_user, resource_quota, peer_string); - grpc_resource_user_slice_allocator_init(&tcp->slice_allocator, - &tcp->resource_user, - tcp_read_allocation_done, tcp); + tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string); + grpc_resource_user_slice_allocator_init( + &tcp->slice_allocator, tcp->resource_user, tcp_read_allocation_done, tcp); /* Tell network status tracker about new endpoint */ grpc_network_status_register_endpoint(&tcp->base); @@ -581,7 +563,6 @@ void grpc_tcp_destroy_and_release_fd(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, GPR_ASSERT(ep->vtable == &vtable); tcp->release_fd = fd; tcp->release_fd_cb = done; - tcp_maybe_shutdown_resource_user(exec_ctx, tcp); grpc_slice_buffer_reset_and_unref_internal(exec_ctx, &tcp->last_read_buffer); TCP_UNREF(exec_ctx, tcp, "destroy"); } diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index 1a753d1231..c8401d442e 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -657,41 +657,46 @@ done: } } +/* Return listener at port_index or NULL. Should only be called with s->mu + locked. */ +static grpc_tcp_listener *get_port_index(grpc_tcp_server *s, + unsigned port_index) { + unsigned num_ports = 0; + grpc_tcp_listener *sp; + for (sp = s->head; sp; sp = sp->next) { + if (!sp->is_sibling) { + if (++num_ports > port_index) { + return sp; + } + } + } + return NULL; +} + unsigned grpc_tcp_server_port_fd_count(grpc_tcp_server *s, unsigned port_index) { unsigned num_fds = 0; - grpc_tcp_listener *sp; gpr_mu_lock(&s->mu); - for (sp = s->head; sp && port_index != 0; sp = sp->next) { - if (!sp->is_sibling) { - --port_index; - } + grpc_tcp_listener *sp = get_port_index(s, port_index); + for (; sp; sp = sp->sibling) { + ++num_fds; } - for (; sp; sp = sp->sibling, ++num_fds) - ; gpr_mu_unlock(&s->mu); return num_fds; } int grpc_tcp_server_port_fd(grpc_tcp_server *s, unsigned port_index, unsigned fd_index) { - grpc_tcp_listener *sp; - int fd; gpr_mu_lock(&s->mu); - for (sp = s->head; sp && port_index != 0; sp = sp->next) { - if (!sp->is_sibling) { - --port_index; + grpc_tcp_listener *sp = get_port_index(s, port_index); + for (; sp; sp = sp->sibling, --fd_index) { + if (fd_index == 0) { + gpr_mu_unlock(&s->mu); + return sp->fd; } } - for (; sp && fd_index != 0; sp = sp->sibling, --fd_index) - ; - if (sp) { - fd = sp->fd; - } else { - fd = -1; - } gpr_mu_unlock(&s->mu); - return fd; + return -1; } void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s, diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c index 73e4db3d65..b5b9b92a20 100644 --- a/src/core/lib/iomgr/tcp_server_uv.c +++ b/src/core/lib/iomgr/tcp_server_uv.c @@ -76,13 +76,30 @@ struct grpc_tcp_server { /* shutdown callback */ grpc_closure *shutdown_complete; + + grpc_resource_quota *resource_quota; }; -grpc_error *grpc_tcp_server_create(grpc_closure *shutdown_complete, +grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx, + grpc_closure *shutdown_complete, const grpc_channel_args *args, grpc_tcp_server **server) { grpc_tcp_server *s = gpr_malloc(sizeof(grpc_tcp_server)); - (void)args; + s->resource_quota = grpc_resource_quota_create(NULL); + for (size_t i = 0; i < (args == NULL ? 0 : args->num_args); i++) { + if (0 == strcmp(GRPC_ARG_RESOURCE_QUOTA, args->args[i].key)) { + if (args->args[i].type == GRPC_ARG_POINTER) { + grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota); + s->resource_quota = + grpc_resource_quota_internal_ref(args->args[i].value.pointer.p); + } else { + grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota); + gpr_free(s); + return GRPC_ERROR_CREATE(GRPC_ARG_RESOURCE_QUOTA + " must be a pointer to a buffer pool"); + } + } + } gpr_ref_init(&s->refs, 1); s->on_accept_cb = NULL; s->on_accept_cb_arg = NULL; @@ -119,6 +136,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) { gpr_free(sp->handle); gpr_free(sp); } + grpc_resource_quota_internal_unref(exec_ctx, s->resource_quota); gpr_free(s); } @@ -201,7 +219,7 @@ static void on_connect(uv_stream_t *server, int status) { } else { gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s", uv_strerror(status)); } - ep = grpc_tcp_create(client, peer_name_string); + ep = grpc_tcp_create(client, sp->server->resource_quota, peer_name_string); sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL, &acceptor); grpc_exec_ctx_finish(&exec_ctx); diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c index 6f54ae5383..8b58c04ff5 100644 --- a/src/core/lib/iomgr/tcp_uv.c +++ b/src/core/lib/iomgr/tcp_uv.c @@ -54,6 +54,9 @@ typedef struct { grpc_endpoint base; gpr_refcount refcount; + uv_write_t write_req; + uv_shutdown_t shutdown_req; + uv_tcp_t *handle; grpc_closure *read_cb; @@ -64,14 +67,23 @@ typedef struct { GRPC_SLICE_buffer *write_slices; uv_buf_t *write_buffers; + grpc_resource_user resource_user; + bool shutting_down; + bool resource_user_shutting_down; + char *peer_string; grpc_pollset *pollset; } grpc_tcp; static void uv_close_callback(uv_handle_t *handle) { gpr_free(handle); } -static void tcp_free(grpc_tcp *tcp) { gpr_free(tcp); } +static void tcp_free(grpc_tcp *tcp) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_resource_user_destroy(&exec_ctx, &tcp->resource_user); + gpr_free(tcp); + grpc_exec_ctx_finish(&exec_ctx); +} /*#define GRPC_TCP_REFCOUNT_DEBUG*/ #ifdef GRPC_TCP_REFCOUNT_DEBUG @@ -106,11 +118,14 @@ static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } static void alloc_uv_buf(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_tcp *tcp = handle->data; (void)suggested_size; - tcp->read_slice = GRPC_SLICE_malloc(GRPC_TCP_DEFAULT_READ_SLICE_SIZE); + tcp->read_slice = grpc_resource_user_slice_malloc( + &exec_ctx, &tcp->resource_user, GRPC_TCP_DEFAULT_READ_SLICE_SIZE); buf->base = (char *)GRPC_SLICE_START_PTR(tcp->read_slice); buf->len = GRPC_SLICE_LENGTH(tcp->read_slice); + grpc_exec_ctx_finish(&exec_ctx); } static void read_callback(uv_stream_t *stream, ssize_t nread, @@ -198,7 +213,8 @@ static void write_callback(uv_write_t *req, int status) { gpr_log(GPR_DEBUG, "write complete on %p: error=%s", tcp, str); } gpr_free(tcp->write_buffers); - gpr_free(req); + grpc_resource_user_free(&exec_ctx, &tcp->resource_user, + sizeof(uv_buf_t) * tcp->write_slices->count); grpc_exec_ctx_sched(&exec_ctx, cb, error, NULL); grpc_exec_ctx_finish(&exec_ctx); } @@ -243,12 +259,15 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, tcp->write_cb = cb; buffer_count = (unsigned int)tcp->write_slices->count; buffers = gpr_malloc(sizeof(uv_buf_t) * buffer_count); + grpc_resource_user_alloc(exec_ctx, &tcp->resource_user, + sizeof(uv_buf_t) * buffer_count, NULL); for (i = 0; i < buffer_count; i++) { slice = &tcp->write_slices->slices[i]; buffers[i].base = (char *)GRPC_SLICE_START_PTR(*slice); buffers[i].len = GRPC_SLICE_LENGTH(*slice); } - write_req = gpr_malloc(sizeof(uv_write_t)); + tcp->write_buffers = buffers; + write_req = &tcp->write_req; write_req->data = tcp; TCP_REF(tcp, "write"); // TODO(murgatroid99): figure out what the return value here means @@ -274,13 +293,29 @@ static void uv_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, (void)pollset; } -static void shutdown_callback(uv_shutdown_t *req, int status) { gpr_free(req); } +static void shutdown_callback(uv_shutdown_t *req, int status) {} + +static void resource_user_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + TCP_UNREF(arg, "resource_user"); +} + +static void uv_resource_user_maybe_shutdown(grpc_exec_ctx *exec_ctx, + grpc_tcp *tcp) { + if (!tcp->resource_user_shutting_down) { + tcp->resource_user_shutting_down = true; + TCP_REF(tcp, "resource_user"); + grpc_resource_user_shutdown( + exec_ctx, &tcp->resource_user, + grpc_closure_create(resource_user_shutdown_done, tcp)); + } +} static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; if (!tcp->shutting_down) { tcp->shutting_down = true; - uv_shutdown_t *req = gpr_malloc(sizeof(uv_shutdown_t)); + uv_shutdown_t *req = &tcp->shutdown_req; uv_shutdown(req, (uv_stream_t *)tcp->handle, shutdown_callback); } } @@ -289,6 +324,7 @@ static void uv_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_network_status_unregister_endpoint(ep); grpc_tcp *tcp = (grpc_tcp *)ep; uv_close((uv_handle_t *)tcp->handle, uv_close_callback); + uv_resource_user_maybe_shutdown(exec_ctx, tcp); TCP_UNREF(tcp, "destroy"); } @@ -297,18 +333,21 @@ static char *uv_get_peer(grpc_endpoint *ep) { return gpr_strdup(tcp->peer_string); } +static grpc_resource_user *uv_get_resource_user(grpc_endpoint *ep) { + grpc_tcp *tcp = (grpc_tcp *)ep; + return &tcp->resource_user; +} + static grpc_workqueue *uv_get_workqueue(grpc_endpoint *ep) { return NULL; } -static grpc_endpoint_vtable vtable = {uv_endpoint_read, - uv_endpoint_write, - uv_get_workqueue, - uv_add_to_pollset, - uv_add_to_pollset_set, - uv_endpoint_shutdown, - uv_destroy, - uv_get_peer}; +static grpc_endpoint_vtable vtable = { + uv_endpoint_read, uv_endpoint_write, uv_get_workqueue, + uv_add_to_pollset, uv_add_to_pollset_set, uv_endpoint_shutdown, + uv_destroy, uv_get_resource_user, uv_get_peer}; -grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, char *peer_string) { +grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, + grpc_resource_quota *resource_quota, + char *peer_string) { grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); if (grpc_tcp_trace) { @@ -325,6 +364,8 @@ grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, char *peer_string) { gpr_ref_init(&tcp->refcount, 1); tcp->peer_string = gpr_strdup(peer_string); tcp->shutting_down = false; + tcp->resource_user_shutting_down = false; + grpc_resource_user_init(&tcp->resource_user, resource_quota, peer_string); /* Tell network status tracking code about the new endpoint */ grpc_network_status_register_endpoint(&tcp->base); diff --git a/src/core/lib/iomgr/tcp_uv.h b/src/core/lib/iomgr/tcp_uv.h index eed41151ea..970fcafe4a 100644 --- a/src/core/lib/iomgr/tcp_uv.h +++ b/src/core/lib/iomgr/tcp_uv.h @@ -52,6 +52,8 @@ extern int grpc_tcp_trace; #define GRPC_TCP_DEFAULT_READ_SLICE_SIZE 8192 -grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, char *peer_string); +grpc_endpoint *grpc_tcp_create(uv_tcp_t *handle, + grpc_resource_quota *resource_quota, + char *peer_string); #endif /* GRPC_CORE_LIB_IOMGR_TCP_UV_H */ diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c index a97b1b21fe..e5fa360094 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.c @@ -109,46 +109,35 @@ typedef struct grpc_tcp { grpc_slice_buffer *write_slices; grpc_slice_buffer *read_slices; - grpc_resource_user resource_user; + grpc_resource_user *resource_user; /* The IO Completion Port runs from another thread. We need some mechanism to protect ourselves when requesting a shutdown. */ gpr_mu mu; int shutting_down; - gpr_atm resource_user_shutdown_count; - char *peer_string; } grpc_tcp; -static void win_unref_closure(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */, - grpc_error *error); - -static void win_maybe_shutdown_resource_user(grpc_exec_ctx *exec_ctx, - grpc_tcp *tcp) { - if (gpr_atm_full_fetch_add(&tcp->resource_user_shutdown_count, 1) == 0) { - grpc_resource_user_shutdown(exec_ctx, &tcp->resource_user, - grpc_closure_create(win_unref_closure, tcp)); - } -} - -static void tcp_free(grpc_tcp *tcp) { +static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { grpc_winsocket_destroy(tcp->socket); gpr_mu_destroy(&tcp->mu); gpr_free(tcp->peer_string); + grpc_resource_user_unref(exec_ctx, tcp->resource_user); gpr_free(tcp); } /*#define GRPC_TCP_REFCOUNT_DEBUG*/ #ifdef GRPC_TCP_REFCOUNT_DEBUG -#define TCP_UNREF(tcp, reason) tcp_unref((tcp), (reason), __FILE__, __LINE__) +#define TCP_UNREF(exec_ctx, tcp, reason) \ + tcp_unref((exec_ctx), (tcp), (reason), __FILE__, __LINE__) #define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__) -static void tcp_unref(grpc_tcp *tcp, const char *reason, const char *file, - int line) { +static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, + const char *reason, const char *file, int line) { gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp, reason, tcp->refcount.count, tcp->refcount.count - 1); if (gpr_unref(&tcp->refcount)) { - tcp_free(tcp); + tcp_free(exec_ctx, tcp); } } @@ -159,22 +148,17 @@ static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file, gpr_ref(&tcp->refcount); } #else -#define TCP_UNREF(tcp, reason) tcp_unref((tcp)) +#define TCP_UNREF(exec_ctx, tcp, reason) tcp_unref((exec_ctx), (tcp)) #define TCP_REF(tcp, reason) tcp_ref((tcp)) -static void tcp_unref(grpc_tcp *tcp) { +static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) { if (gpr_unref(&tcp->refcount)) { - tcp_free(tcp); + tcp_free(exec_ctx, tcp); } } static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); } #endif -static void win_unref_closure(grpc_exec_ctx *exec_ctx, void *arg, - grpc_error *error) { - TCP_UNREF(arg, "resource_user"); -} - /* Asynchronous callback from the IOCP, or the background thread. */ static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) { grpc_tcp *tcp = tcpp; @@ -203,7 +187,7 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) { } tcp->read_cb = NULL; - TCP_UNREF(tcp, "read"); + TCP_UNREF(exec_ctx, tcp, "read"); grpc_exec_ctx_sched(exec_ctx, cb, error, NULL); } @@ -287,7 +271,7 @@ static void on_write(grpc_exec_ctx *exec_ctx, void *tcpp, grpc_error *error) { } } - TCP_UNREF(tcp, "write"); + TCP_UNREF(exec_ctx, tcp, "write"); grpc_exec_ctx_sched(exec_ctx, cb, error, NULL); } @@ -355,7 +339,7 @@ static void win_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, if (status != 0) { int wsa_error = WSAGetLastError(); if (wsa_error != WSA_IO_PENDING) { - TCP_UNREF(tcp, "write"); + TCP_UNREF(exec_ctx, tcp, "write"); grpc_exec_ctx_sched(exec_ctx, cb, GRPC_WSA_ERROR(wsa_error, "WSASend"), NULL); return; @@ -396,15 +380,14 @@ static void win_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { callback. See the comments in on_read and on_write. */ tcp->shutting_down = 1; grpc_winsocket_shutdown(tcp->socket); - win_maybe_shutdown_resource_user(exec_ctx, tcp); gpr_mu_unlock(&tcp->mu); + grpc_resource_user_shutdown(exec_ctx, tcp->resource_user); } static void win_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) { grpc_network_status_unregister_endpoint(ep); grpc_tcp *tcp = (grpc_tcp *)ep; - win_maybe_shutdown_resource_user(exec_ctx, tcp); - TCP_UNREF(tcp, "destroy"); + TCP_UNREF(exec_ctx, tcp, "destroy"); } static char *win_get_peer(grpc_endpoint *ep) { @@ -416,7 +399,7 @@ static grpc_workqueue *win_get_workqueue(grpc_endpoint *ep) { return NULL; } static grpc_resource_user *win_get_resource_user(grpc_endpoint *ep) { grpc_tcp *tcp = (grpc_tcp *)ep; - return &tcp->resource_user; + return tcp->resource_user; } static grpc_endpoint_vtable vtable = {win_read, @@ -441,7 +424,7 @@ grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, grpc_closure_init(&tcp->on_read, on_read, tcp); grpc_closure_init(&tcp->on_write, on_write, tcp); tcp->peer_string = gpr_strdup(peer_string); - grpc_resource_user_init(&tcp->resource_user, resource_quota, peer_string); + tcp->resource_user = grpc_resource_user_create(resource_quota, peer_string); /* Tell network status tracking code about the new endpoint */ grpc_network_status_register_endpoint(&tcp->base); diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.c b/src/core/lib/security/credentials/jwt/jwt_verifier.c index 05c4f4cd77..0281db385b 100644 --- a/src/core/lib/security/credentials/jwt/jwt_verifier.c +++ b/src/core/lib/security/credentials/jwt/jwt_verifier.c @@ -90,7 +90,7 @@ static grpc_json *parse_json_part_from_jwt(grpc_exec_ctx *exec_ctx, grpc_slice *buffer) { grpc_json *json; - *buffer = grpc_base64_decode_with_len(str, len, 1); + *buffer = grpc_base64_decode_with_len(exec_ctx, str, len, 1); if (GRPC_SLICE_IS_EMPTY(*buffer)) { gpr_log(GPR_ERROR, "Invalid base64."); return NULL; @@ -456,7 +456,7 @@ static BIGNUM *bignum_from_base64(grpc_exec_ctx *exec_ctx, const char *b64) { grpc_slice bin; if (b64 == NULL) return NULL; - bin = grpc_base64_decode(b64, 1); + bin = grpc_base64_decode(exec_ctx, b64, 1); if (GRPC_SLICE_IS_EMPTY(bin)) { gpr_log(GPR_ERROR, "Invalid base64 for big num."); return NULL; @@ -833,7 +833,7 @@ void grpc_jwt_verifier_verify(grpc_exec_ctx *exec_ctx, signed_jwt_len = (size_t)(dot - jwt); cur = dot + 1; - signature = grpc_base64_decode(cur, 1); + signature = grpc_base64_decode(exec_ctx, cur, 1); if (GRPC_SLICE_IS_EMPTY(signature)) goto error; retrieve_key_and_verify( exec_ctx, diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c index 285f96aa9e..ae40bb499c 100644 --- a/src/core/lib/security/transport/client_auth_filter.c +++ b/src/core/lib/security/transport/client_auth_filter.c @@ -344,14 +344,8 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx, GRPC_AUTH_CONTEXT_UNREF(chand->auth_context, "client_auth_filter"); } -const grpc_channel_filter grpc_client_auth_filter = {auth_start_transport_op, - grpc_channel_next_op, - sizeof(call_data), - init_call_elem, - set_pollset_or_pollset_set, - destroy_call_elem, - sizeof(channel_data), - init_channel_elem, - destroy_channel_elem, - grpc_call_next_get_peer, - "client-auth"}; +const grpc_channel_filter grpc_client_auth_filter = { + auth_start_transport_op, grpc_channel_next_op, sizeof(call_data), + init_call_elem, set_pollset_or_pollset_set, destroy_call_elem, + sizeof(channel_data), init_channel_elem, destroy_channel_elem, + grpc_call_next_get_peer, grpc_channel_next_get_info, "client-auth"}; diff --git a/src/core/lib/security/transport/security_connector.c b/src/core/lib/security/transport/security_connector.c index 6b2569f646..7192f228cd 100644 --- a/src/core/lib/security/transport/security_connector.c +++ b/src/core/lib/security/transport/security_connector.c @@ -212,11 +212,11 @@ void grpc_security_connector_unref(grpc_exec_ctx *exec_ctx, } static void connector_pointer_arg_destroy(grpc_exec_ctx *exec_ctx, void *p) { - GRPC_SECURITY_CONNECTOR_UNREF(exec_ctx, p, "connector_pointer_arg"); + GRPC_SECURITY_CONNECTOR_UNREF(exec_ctx, p, "connector_pointer_arg_destroy"); } static void *connector_pointer_arg_copy(void *p) { - return GRPC_SECURITY_CONNECTOR_REF(p, "connector_pointer_arg"); + return GRPC_SECURITY_CONNECTOR_REF(p, "connector_pointer_arg_copy"); } static int connector_pointer_cmp(void *a, void *b) { return GPR_ICMP(a, b); } diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c index d5fb48b38f..246ca35bc6 100644 --- a/src/core/lib/security/transport/server_auth_filter.c +++ b/src/core/lib/security/transport/server_auth_filter.c @@ -278,4 +278,5 @@ const grpc_channel_filter grpc_server_auth_filter = { init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + grpc_channel_next_get_info, "server-auth"}; diff --git a/src/core/lib/security/util/b64.c b/src/core/lib/security/util/b64.c index c227889726..bbd7e335a6 100644 --- a/src/core/lib/security/util/b64.c +++ b/src/core/lib/security/util/b64.c @@ -40,6 +40,8 @@ #include <grpc/support/log.h> #include <grpc/support/useful.h> +#include "src/core/lib/slice/slice_internal.h" + /* --- Constants. --- */ static const int8_t base64_bytes[] = { @@ -120,8 +122,9 @@ char *grpc_base64_encode(const void *vdata, size_t data_size, int url_safe, return result; } -grpc_slice grpc_base64_decode(const char *b64, int url_safe) { - return grpc_base64_decode_with_len(b64, strlen(b64), url_safe); +grpc_slice grpc_base64_decode(grpc_exec_ctx *exec_ctx, const char *b64, + int url_safe) { + return grpc_base64_decode_with_len(exec_ctx, b64, strlen(b64), url_safe); } static void decode_one_char(const unsigned char *codes, unsigned char *result, @@ -182,8 +185,8 @@ static int decode_group(const unsigned char *codes, size_t num_codes, return 1; } -grpc_slice grpc_base64_decode_with_len(const char *b64, size_t b64_len, - int url_safe) { +grpc_slice grpc_base64_decode_with_len(grpc_exec_ctx *exec_ctx, const char *b64, + size_t b64_len, int url_safe) { grpc_slice result = grpc_slice_malloc(b64_len); unsigned char *current = GRPC_SLICE_START_PTR(result); size_t result_size = 0; diff --git a/src/core/lib/security/util/b64.h b/src/core/lib/security/util/b64.h index 6ea0b5365b..d42a136f61 100644 --- a/src/core/lib/security/util/b64.h +++ b/src/core/lib/security/util/b64.h @@ -43,10 +43,11 @@ char *grpc_base64_encode(const void *data, size_t data_size, int url_safe, /* Decodes data according to the base64 specification. Returns an empty slice in case of failure. */ -grpc_slice grpc_base64_decode(const char *b64, int url_safe); +grpc_slice grpc_base64_decode(grpc_exec_ctx *exec_ctx, const char *b64, + int url_safe); /* Same as above except that the length is provided by the caller. */ -grpc_slice grpc_base64_decode_with_len(const char *b64, size_t b64_len, - int url_safe); +grpc_slice grpc_base64_decode_with_len(grpc_exec_ctx *exec_ctx, const char *b64, + size_t b64_len, int url_safe); #endif /* GRPC_CORE_LIB_SECURITY_UTIL_B64_H */ diff --git a/src/core/lib/support/env.h b/src/core/lib/support/env.h index 1a24216656..6ada5d9390 100644 --- a/src/core/lib/support/env.h +++ b/src/core/lib/support/env.h @@ -36,8 +36,6 @@ #include <stdio.h> -#include <grpc/slice.h> - #ifdef __cplusplus extern "C" { #endif diff --git a/src/core/lib/support/string.h b/src/core/lib/support/string.h index 13891d0b9e..81a12ae476 100644 --- a/src/core/lib/support/string.h +++ b/src/core/lib/support/string.h @@ -95,6 +95,9 @@ char *gpr_strjoin(const char **strs, size_t nstrs, size_t *total_length); char *gpr_strjoin_sep(const char **strs, size_t nstrs, const char *sep, size_t *total_length); +void gpr_string_split(const char *input, const char *sep, char ***strs, + size_t *nstrs); + /* A vector of strings... for building up a final string one piece at a time */ typedef struct { char **strs; diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c index be568feba1..9be13d84fe 100644 --- a/src/core/lib/surface/call.c +++ b/src/core/lib/surface/call.c @@ -1473,6 +1473,12 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx, grpc_slice_buffer_stream_init( &call->sending_stream, &op->data.send_message->data.raw.slice_buffer, op->flags); + /* If the outgoing buffer is already compressed, mark it as so in the + flags. These will be picked up by the compression filter and further + (wasteful) attempts at compression skipped. */ + if (op->data.send_message->data.raw.compression > GRPC_COMPRESS_NONE) { + call->sending_stream.base.flags |= GRPC_WRITE_INTERNAL_COMPRESS; + } stream_op->send_message = &call->sending_stream.base; break; case GRPC_OP_SEND_CLOSE_FROM_CLIENT: diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c index 82617390bb..7e87f05531 100644 --- a/src/core/lib/surface/channel.c +++ b/src/core/lib/surface/channel.c @@ -176,6 +176,15 @@ char *grpc_channel_get_target(grpc_channel *channel) { return gpr_strdup(channel->target); } +void grpc_channel_get_info(grpc_channel *channel, + const grpc_channel_info *channel_info) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_channel_element *elem = + grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0); + elem->filter->get_channel_info(&exec_ctx, elem, channel_info); + grpc_exec_ctx_finish(&exec_ctx); +} + static grpc_call *grpc_channel_create_call_internal( grpc_exec_ctx *exec_ctx, grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask, grpc_completion_queue *cq, diff --git a/src/core/lib/surface/channel_init.c b/src/core/lib/surface/channel_init.c index 0627b34479..7acb444d9b 100644 --- a/src/core/lib/surface/channel_init.c +++ b/src/core/lib/surface/channel_init.c @@ -131,7 +131,7 @@ bool grpc_channel_init_create_stack(grpc_exec_ctx *exec_ctx, for (size_t i = 0; i < g_slots[type].num_slots; i++) { const stage_slot *slot = &g_slots[type].slots[i]; - if (!slot->fn(builder, slot->arg)) { + if (!slot->fn(exec_ctx, builder, slot->arg)) { return false; } } diff --git a/src/core/lib/surface/channel_init.h b/src/core/lib/surface/channel_init.h index b53f2aefb9..411f5eae18 100644 --- a/src/core/lib/surface/channel_init.h +++ b/src/core/lib/surface/channel_init.h @@ -51,7 +51,8 @@ extern "C" { /// One stage of mutation: call functions against \a builder to influence the /// finally constructed channel stack -typedef bool (*grpc_channel_init_stage)(grpc_channel_stack_builder *builder, +typedef bool (*grpc_channel_init_stage)(grpc_exec_ctx *exec_ctx, + grpc_channel_stack_builder *builder, void *arg); /// Global initialization of the system diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c index 8c82f38c77..d3b602cf2a 100644 --- a/src/core/lib/surface/init.c +++ b/src/core/lib/surface/init.c @@ -80,17 +80,20 @@ static void do_basic_init(void) { g_initializations = 0; } -static bool append_filter(grpc_channel_stack_builder *builder, void *arg) { +static bool append_filter(grpc_exec_ctx *exec_ctx, + grpc_channel_stack_builder *builder, void *arg) { return grpc_channel_stack_builder_append_filter( builder, (const grpc_channel_filter *)arg, NULL, NULL); } -static bool prepend_filter(grpc_channel_stack_builder *builder, void *arg) { +static bool prepend_filter(grpc_exec_ctx *exec_ctx, + grpc_channel_stack_builder *builder, void *arg) { return grpc_channel_stack_builder_prepend_filter( builder, (const grpc_channel_filter *)arg, NULL, NULL); } -static bool maybe_add_http_filter(grpc_channel_stack_builder *builder, +static bool maybe_add_http_filter(grpc_exec_ctx *exec_ctx, + grpc_channel_stack_builder *builder, void *arg) { grpc_transport *t = grpc_channel_stack_builder_get_transport(builder); if (t && strstr(t->vtable->name, "http")) { diff --git a/src/core/lib/surface/init_secure.c b/src/core/lib/surface/init_secure.c index 7ee7b51568..520a8aa84f 100644 --- a/src/core/lib/surface/init_secure.c +++ b/src/core/lib/surface/init_secure.c @@ -50,7 +50,7 @@ void grpc_security_pre_init(void) { } static bool maybe_prepend_client_auth_filter( - grpc_channel_stack_builder *builder, void *arg) { + grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder, void *arg) { const grpc_channel_args *args = grpc_channel_stack_builder_get_channel_arguments(builder); if (args) { @@ -65,7 +65,7 @@ static bool maybe_prepend_client_auth_filter( } static bool maybe_prepend_server_auth_filter( - grpc_channel_stack_builder *builder, void *arg) { + grpc_exec_ctx *exec_ctx, grpc_channel_stack_builder *builder, void *arg) { const grpc_channel_args *args = grpc_channel_stack_builder_get_channel_arguments(builder); if (args) { diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c index 1b57c5cd01..d606f70c2f 100644 --- a/src/core/lib/surface/lame_client.c +++ b/src/core/lib/surface/lame_client.c @@ -89,6 +89,10 @@ static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) { return NULL; } +static void lame_get_channel_info(grpc_exec_ctx *exec_ctx, + grpc_channel_element *elem, + const grpc_channel_info *channel_info) {} + static void lame_start_transport_op(grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, grpc_transport_op *op) { @@ -141,6 +145,7 @@ const grpc_channel_filter grpc_lame_filter = { init_channel_elem, destroy_channel_elem, lame_get_peer, + lame_get_channel_info, "lame-client", }; diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index 6d9d3a92ab..530e5ed46c 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -967,6 +967,7 @@ const grpc_channel_filter grpc_server_top_filter = { init_channel_elem, destroy_channel_elem, grpc_call_next_get_peer, + grpc_channel_next_get_info, "server", }; diff --git a/src/core/lib/surface/version.c b/src/core/lib/surface/version.c index 41242684da..0db8b41aa9 100644 --- a/src/core/lib/surface/version.c +++ b/src/core/lib/surface/version.c @@ -36,6 +36,6 @@ #include <grpc/grpc.h> -const char *grpc_version_string(void) { return "1.1.0-dev"; } +const char *grpc_version_string(void) { return "2.0.0-dev"; } const char *grpc_g_stands_for(void) { return "good"; } diff --git a/src/core/lib/transport/metadata.c b/src/core/lib/transport/metadata.c index ef5fd32b52..9b5d8099c7 100644 --- a/src/core/lib/transport/metadata.c +++ b/src/core/lib/transport/metadata.c @@ -695,6 +695,11 @@ size_t grpc_mdstr_length(const grpc_mdstr *s) { return GRPC_MDSTR_LENGTH(s); } grpc_mdstr *grpc_mdstr_ref(grpc_mdstr *gs DEBUG_ARGS) { internal_string *s = (internal_string *)gs; if (is_mdstr_static(gs)) return gs; +#ifdef GRPC_METADATA_REFCOUNT_DEBUG + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "STR REF:%p:%zu->%zu: '%s'", + (void *)s, gpr_atm_no_barrier_load(&s->refcnt), + gpr_atm_no_barrier_load(&s->refcnt) + 1, grpc_mdstr_as_c_string(gs)); +#endif GPR_ASSERT(gpr_atm_full_fetch_add(&s->refcnt, 1) > 0); return gs; } @@ -702,6 +707,11 @@ grpc_mdstr *grpc_mdstr_ref(grpc_mdstr *gs DEBUG_ARGS) { void grpc_mdstr_unref(grpc_exec_ctx *exec_ctx, grpc_mdstr *gs DEBUG_ARGS) { internal_string *s = (internal_string *)gs; if (is_mdstr_static(gs)) return; +#ifdef GRPC_METADATA_REFCOUNT_DEBUG + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "STR UNREF:%p:%zu->%zu: '%s'", + (void *)s, gpr_atm_no_barrier_load(&s->refcnt), + gpr_atm_no_barrier_load(&s->refcnt) - 1, grpc_mdstr_as_c_string(gs)); +#endif if (1 == gpr_atm_full_fetch_add(&s->refcnt, -1)) { strtab_shard *shard = &g_strtab_shard[SHARD_IDX(s->hash, LOG2_STRTAB_SHARD_COUNT)]; diff --git a/src/core/lib/transport/metadata.h b/src/core/lib/transport/metadata.h index cf77753692..8a64be7025 100644 --- a/src/core/lib/transport/metadata.h +++ b/src/core/lib/transport/metadata.h @@ -37,6 +37,8 @@ #include <grpc/slice.h> #include <grpc/support/useful.h> +#include "src/core/lib/iomgr/exec_ctx.h" + #ifdef __cplusplus extern "C" { #endif diff --git a/src/core/lib/transport/pid_controller.c b/src/core/lib/transport/pid_controller.c new file mode 100644 index 0000000000..3cef225d4b --- /dev/null +++ b/src/core/lib/transport/pid_controller.c @@ -0,0 +1,57 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/lib/transport/pid_controller.h" + +void grpc_pid_controller_init(grpc_pid_controller *pid_controller, + double gain_p, double gain_i, double gain_d) { + pid_controller->gain_p = gain_p; + pid_controller->gain_i = gain_i; + pid_controller->gain_d = gain_d; + grpc_pid_controller_reset(pid_controller); +} + +void grpc_pid_controller_reset(grpc_pid_controller *pid_controller) { + pid_controller->last_error = 0.0; + pid_controller->error_integral = 0.0; +} + +double grpc_pid_controller_update(grpc_pid_controller *pid_controller, + double error, double dt) { + pid_controller->error_integral += error * dt; + double diff_error = (error - pid_controller->last_error) / dt; + pid_controller->last_error = error; + return dt * (pid_controller->gain_p * error + + pid_controller->gain_i * pid_controller->error_integral + + pid_controller->gain_d * diff_error); +} diff --git a/src/core/lib/transport/pid_controller.h b/src/core/lib/transport/pid_controller.h new file mode 100644 index 0000000000..059b5b0834 --- /dev/null +++ b/src/core/lib/transport/pid_controller.h @@ -0,0 +1,64 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef GRPC_CORE_LIB_TRANSPORT_PID_CONTROLLER_H +#define GRPC_CORE_LIB_TRANSPORT_PID_CONTROLLER_H + +/* \file Simple PID controller. + Implements a proportional-integral-derivative controller. + Used when we want to iteratively control a variable to converge some other + observed value to a 'set-point'. + Gains can be set to adjust sensitivity to current error (p), the integral + of error (i), and the derivative of error (d). */ + +typedef struct { + double gain_p; + double gain_i; + double gain_d; + double last_error; + double error_integral; +} grpc_pid_controller; + +/** Initialize the controller */ +void grpc_pid_controller_init(grpc_pid_controller *pid_controller, + double gain_p, double gain_i, double gain_d); + +/** Reset the controller: useful when things have changed significantly */ +void grpc_pid_controller_reset(grpc_pid_controller *pid_controller); + +/** Update the controller: given a current error estimate, and the time since + the last update, returns a delta to the control value */ +double grpc_pid_controller_update(grpc_pid_controller *pid_controller, + double error, double dt); + +#endif |