diff options
-rw-r--r-- | src/core/ext/client_config/method_config.c | 2 | ||||
-rw-r--r-- | src/core/ext/resolver/sockaddr/sockaddr_resolver.c | 83 | ||||
-rw-r--r-- | test/core/client_config/resolvers/sockaddr_resolver_test.c | 102 |
3 files changed, 167 insertions, 20 deletions
diff --git a/src/core/ext/client_config/method_config.c b/src/core/ext/client_config/method_config.c index 95efe492a9..1135a1c4c4 100644 --- a/src/core/ext/client_config/method_config.c +++ b/src/core/ext/client_config/method_config.c @@ -284,7 +284,7 @@ grpc_arg grpc_method_config_table_create_channel_arg( grpc_arg arg; arg.type = GRPC_ARG_POINTER; arg.key = GRPC_ARG_SERVICE_CONFIG; - arg.value.pointer.p = grpc_method_config_table_ref(table); + arg.value.pointer.p = table; arg.value.pointer.vtable = &arg_vtable; return arg; } diff --git a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c index 74d2015e5c..98e22f0f63 100644 --- a/src/core/ext/resolver/sockaddr/sockaddr_resolver.c +++ b/src/core/ext/resolver/sockaddr/sockaddr_resolver.c @@ -33,6 +33,7 @@ #include <stdbool.h> #include <stdio.h> +#include <stdlib.h> #include <string.h> #include <grpc/support/alloc.h> @@ -40,8 +41,10 @@ #include <grpc/support/port_platform.h> #include <grpc/support/string_util.h> +#include "src/core/ext/client_config/method_config.h" #include "src/core/ext/client_config/parse_address.h" #include "src/core/ext/client_config/resolver_registry.h" +#include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/unix_sockets_posix.h" #include "src/core/lib/support/string.h" @@ -53,6 +56,8 @@ typedef struct { gpr_refcount refs; /** load balancing policy name */ char *lb_policy_name; + /** method config table */ + grpc_method_config_table *method_config_table; /** the addresses that we've 'resolved' */ grpc_lb_addresses *addresses; @@ -120,9 +125,15 @@ static void sockaddr_maybe_finish_next_locked(grpc_exec_ctx *exec_ctx, sockaddr_resolver *r) { if (r->next_completion != NULL && !r->published) { r->published = true; + grpc_channel_args *lb_policy_args = NULL; + if (r->method_config_table != NULL) { + const grpc_arg arg = grpc_method_config_table_create_channel_arg( + r->method_config_table); + lb_policy_args = grpc_channel_args_copy_and_add(NULL /* src */, &arg, 1); + } *r->target_result = grpc_resolver_result_create( "", grpc_lb_addresses_copy(r->addresses, NULL /* user_data_copy */), - r->lb_policy_name, NULL); + r->lb_policy_name, lb_policy_args); grpc_exec_ctx_sched(exec_ctx, r->next_completion, GRPC_ERROR_NONE, NULL); r->next_completion = NULL; } @@ -133,6 +144,7 @@ static void sockaddr_destroy(grpc_exec_ctx *exec_ctx, grpc_resolver *gr) { gpr_mu_destroy(&r->mu); grpc_lb_addresses_destroy(r->addresses, NULL /* user_data_destroy */); gpr_free(r->lb_policy_name); + grpc_method_config_table_unref(r->method_config_table); gpr_free(r); } @@ -164,30 +176,29 @@ static void do_nothing(void *ignored) {} static grpc_resolver *sockaddr_create( grpc_resolver_args *args, const char *default_lb_policy_name, int parse(grpc_uri *uri, struct sockaddr_storage *dst, size_t *len)) { - bool errors_found = false; - sockaddr_resolver *r; - gpr_slice path_slice; - gpr_slice_buffer path_parts; - if (0 != strcmp(args->uri->authority, "")) { gpr_log(GPR_ERROR, "authority based uri's not supported by the %s scheme", args->uri->scheme); return NULL; } - r = gpr_malloc(sizeof(sockaddr_resolver)); + sockaddr_resolver *r = gpr_malloc(sizeof(sockaddr_resolver)); memset(r, 0, sizeof(*r)); + // Initialize LB policy name. r->lb_policy_name = gpr_strdup(grpc_uri_get_query_arg(args->uri, "lb_policy")); + if (r->lb_policy_name == NULL) { + r->lb_policy_name = gpr_strdup(default_lb_policy_name); + } + + // Get lb_enabled arg. const char *lb_enabled_qpart = grpc_uri_get_query_arg(args->uri, "lb_enabled"); - /* anything other than "0" is interpreted as true */ + // Anything other than "0" is interpreted as true. const bool lb_enabled = - (lb_enabled_qpart != NULL && (strcmp("0", lb_enabled_qpart) != 0)); - - if (r->lb_policy_name != NULL && strcmp("grpclb", r->lb_policy_name) == 0 && - !lb_enabled) { + lb_enabled_qpart != NULL && strcmp("0", lb_enabled_qpart) != 0; + if (strcmp("grpclb", r->lb_policy_name) == 0 && !lb_enabled) { /* we want grpclb but the "resolved" addresses aren't LB enabled. Bail * out, as this is meant mostly for tests. */ gpr_log(GPR_ERROR, @@ -196,16 +207,14 @@ static grpc_resolver *sockaddr_create( abort(); } - if (r->lb_policy_name == NULL) { - r->lb_policy_name = gpr_strdup(default_lb_policy_name); - } - - path_slice = + // Construct addresses. + gpr_slice path_slice = gpr_slice_new(args->uri->path, strlen(args->uri->path), do_nothing); + gpr_slice_buffer path_parts; gpr_slice_buffer_init(&path_parts); - gpr_slice_split(path_slice, ",", &path_parts); r->addresses = grpc_lb_addresses_create(path_parts.count); + bool errors_found = false; for (size_t i = 0; i < r->addresses->num_addresses; i++) { grpc_uri ith_uri = *args->uri; char *part_str = gpr_dump_slice(path_parts.slices[i], GPR_DUMP_ASCII); @@ -219,7 +228,6 @@ static grpc_resolver *sockaddr_create( r->addresses->addresses[i].is_balancer = lb_enabled; if (errors_found) break; } - gpr_slice_buffer_destroy(&path_parts); gpr_slice_unref(path_slice); if (errors_found) { @@ -229,6 +237,43 @@ static grpc_resolver *sockaddr_create( return NULL; } + // Construct method config table. + // We only support parameters for a single method. + const char *method_name = grpc_uri_get_query_arg(args->uri, "method_name"); + if (method_name != NULL) { + const char *wait_for_ready_str = + grpc_uri_get_query_arg(args->uri, "wait_for_ready"); + // Anything other than "0" is interpreted as true. + bool wait_for_ready = + wait_for_ready_str != NULL && strcmp("0", wait_for_ready_str) != 0; + const char* timeout_str = + grpc_uri_get_query_arg(args->uri, "timeout_seconds"); + gpr_timespec timeout = { + timeout_str == NULL ? 0 : atoi(timeout_str), 0, GPR_CLOCK_MONOTONIC}; + const char* max_request_message_bytes_str = + grpc_uri_get_query_arg(args->uri, "max_request_message_bytes"); + int32_t max_request_message_bytes = + max_request_message_bytes_str == NULL + ? 0 : atoi(max_request_message_bytes_str); + const char* max_response_message_bytes_str = + grpc_uri_get_query_arg(args->uri, "max_response_message_bytes"); + int32_t max_response_message_bytes = + max_response_message_bytes_str == NULL + ? 0 : atoi(max_response_message_bytes_str); + grpc_method_config *method_config = grpc_method_config_create( + wait_for_ready_str == NULL ? NULL : &wait_for_ready, + timeout_str == NULL ? NULL : &timeout, + max_request_message_bytes_str == NULL + ? NULL : &max_request_message_bytes, + max_response_message_bytes_str == NULL + ? NULL : &max_response_message_bytes); + grpc_method_config_table_entry entry = { + grpc_mdstr_from_string(method_name), method_config}; + r->method_config_table = grpc_method_config_table_create(1, &entry); + GRPC_MDSTR_UNREF(entry.method_name); + grpc_method_config_unref(method_config); + } + gpr_ref_init(&r->refs, 1); gpr_mu_init(&r->mu); grpc_resolver_init(&r->base, &sockaddr_resolver_vtable); diff --git a/test/core/client_config/resolvers/sockaddr_resolver_test.c b/test/core/client_config/resolvers/sockaddr_resolver_test.c index d8430d39c4..d7dac9848a 100644 --- a/test/core/client_config/resolvers/sockaddr_resolver_test.c +++ b/test/core/client_config/resolvers/sockaddr_resolver_test.c @@ -33,11 +33,66 @@ #include <string.h> +#include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include "src/core/ext/client_config/method_config.h" #include "src/core/ext/client_config/resolver_registry.h" +#include "src/core/ext/client_config/resolver_result.h" +#include "src/core/lib/channel/channel_args.h" +#include "src/core/lib/transport/metadata.h" + #include "test/core/util/test_config.h" +typedef struct on_resolution_arg { + const char *expected_method_name; + bool expected_wait_for_ready; + gpr_timespec expected_timeout; + int32_t expected_max_request_message_bytes; + int32_t expected_max_response_message_bytes; + grpc_resolver_result *resolver_result; +} on_resolution_arg; + +void on_resolution_cb(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { + on_resolution_arg *res = arg; + const grpc_channel_args *lb_policy_args = + grpc_resolver_result_get_lb_policy_args(res->resolver_result); + if (res->expected_method_name == NULL) { + GPR_ASSERT(lb_policy_args == NULL); + } else { + const grpc_arg *channel_arg = grpc_channel_args_find( + lb_policy_args, GRPC_ARG_SERVICE_CONFIG); + GPR_ASSERT(channel_arg != NULL); + GPR_ASSERT(channel_arg->type == GRPC_ARG_POINTER); + grpc_method_config_table *method_config_table = + (grpc_method_config_table *)channel_arg->value.pointer.p; + GPR_ASSERT(method_config_table != NULL); + grpc_mdstr *path = grpc_mdstr_from_string(res->expected_method_name); + grpc_method_config *method_config = + grpc_method_config_table_get_method_config(method_config_table, path); + GRPC_MDSTR_UNREF(path); + GPR_ASSERT(method_config != NULL); + bool* wait_for_ready = grpc_method_config_get_wait_for_ready(method_config); + GPR_ASSERT(wait_for_ready != NULL); + GPR_ASSERT(*wait_for_ready == res->expected_wait_for_ready); + gpr_timespec* timeout = grpc_method_config_get_timeout(method_config); + GPR_ASSERT(timeout != NULL); + GPR_ASSERT(gpr_time_cmp(*timeout, res->expected_timeout) == 0); + int32_t* max_request_message_bytes = + grpc_method_config_get_max_request_message_bytes(method_config); + GPR_ASSERT(max_request_message_bytes != NULL); + GPR_ASSERT(*max_request_message_bytes == + res->expected_max_request_message_bytes); + int32_t* max_response_message_bytes = + grpc_method_config_get_max_response_message_bytes(method_config); + GPR_ASSERT(max_response_message_bytes != NULL); + GPR_ASSERT(*max_response_message_bytes == + res->expected_max_response_message_bytes); + } + grpc_resolver_result_unref(exec_ctx, res->resolver_result); +} + static void test_succeeds(grpc_resolver_factory *factory, const char *string) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_uri *uri = grpc_uri_parse(string, 0); @@ -50,9 +105,46 @@ static void test_succeeds(grpc_resolver_factory *factory, const char *string) { args.uri = uri; resolver = grpc_resolver_factory_create_resolver(factory, &args); GPR_ASSERT(resolver != NULL); + on_resolution_arg on_res_arg; + memset(&on_res_arg, 0, sizeof(on_res_arg)); + grpc_closure *on_resolution = + grpc_closure_create(on_resolution_cb, &on_res_arg); + grpc_resolver_next(&exec_ctx, resolver, &on_res_arg.resolver_result, + on_resolution); GRPC_RESOLVER_UNREF(&exec_ctx, resolver, "test_succeeds"); + grpc_exec_ctx_finish(&exec_ctx); grpc_uri_destroy(uri); +} + +static void test_succeeds_with_service_config( + grpc_resolver_factory *factory, const char *string, + const char *method_name, bool wait_for_ready, gpr_timespec timeout, + int32_t max_request_message_bytes, int32_t max_response_message_bytes) { + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; + grpc_uri *uri = grpc_uri_parse(string, 0); + grpc_resolver_args args; + grpc_resolver *resolver; + gpr_log(GPR_DEBUG, "test: '%s' should be valid for '%s'", string, + factory->vtable->scheme); + GPR_ASSERT(uri); + memset(&args, 0, sizeof(args)); + args.uri = uri; + resolver = grpc_resolver_factory_create_resolver(factory, &args); + GPR_ASSERT(resolver != NULL); + on_resolution_arg on_res_arg; + memset(&on_res_arg, 0, sizeof(on_res_arg)); + on_res_arg.expected_method_name = method_name; + on_res_arg.expected_wait_for_ready = wait_for_ready; + on_res_arg.expected_timeout = timeout; + on_res_arg.expected_max_request_message_bytes = max_request_message_bytes; + on_res_arg.expected_max_response_message_bytes = max_response_message_bytes; + grpc_closure *on_resolution = + grpc_closure_create(on_resolution_cb, &on_res_arg); + grpc_resolver_next(&exec_ctx, resolver, &on_res_arg.resolver_result, + on_resolution); + GRPC_RESOLVER_UNREF(&exec_ctx, resolver, "test_succeeds"); grpc_exec_ctx_finish(&exec_ctx); + grpc_uri_destroy(uri); } static void test_fails(grpc_resolver_factory *factory, const char *string) { @@ -93,6 +185,16 @@ int main(int argc, char **argv) { test_fails(ipv6, "ipv6:[::]:123456"); test_fails(ipv6, "ipv6:www.google.com"); + test_succeeds_with_service_config( + ipv4, + "ipv4:127.0.0.1:1234?method_name=/service/method" + "&wait_for_ready=1" + "&timeout_seconds=7" + "&max_request_message_bytes=456" + "&max_response_message_bytes=789", + "/service/method", true /* wait_for_ready */, + (gpr_timespec){7, 0, GPR_CLOCK_MONOTONIC}, 456, 789); + grpc_resolver_factory_unref(ipv4); grpc_resolver_factory_unref(ipv6); grpc_shutdown(); |