diff options
Diffstat (limited to 'src/core/client_config/resolvers')
-rw-r--r-- | src/core/client_config/resolvers/dns_resolver.c | 31 | ||||
-rw-r--r-- | src/core/client_config/resolvers/sockaddr_resolver.c | 30 |
2 files changed, 33 insertions, 28 deletions
diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c index ccec07a08c..b519ce4174 100644 --- a/src/core/client_config/resolvers/dns_resolver.c +++ b/src/core/client_config/resolvers/dns_resolver.c @@ -49,6 +49,8 @@ typedef struct { grpc_resolver base; /** refcount */ gpr_refcount refs; + /** workqueue */ + grpc_workqueue *workqueue; /** name to resolve */ char *name; /** default port to use */ @@ -94,7 +96,7 @@ static void dns_shutdown(grpc_resolver *resolver) { gpr_mu_lock(&r->mu); if (r->next_completion != NULL) { *r->target_config = NULL; - grpc_iomgr_add_callback(r->next_completion); + grpc_workqueue_push(r->workqueue, r->next_completion, 1); r->next_completion = NULL; } gpr_mu_unlock(&r->mu); @@ -180,7 +182,7 @@ static void dns_maybe_finish_next_locked(dns_resolver *r) { if (r->resolved_config) { grpc_client_config_ref(r->resolved_config); } - grpc_iomgr_add_callback(r->next_completion); + grpc_workqueue_push(r->workqueue, r->next_completion, 1); r->next_completion = NULL; r->published_version = r->resolved_version; } @@ -193,21 +195,21 @@ static void dns_destroy(grpc_resolver *gr) { grpc_client_config_unref(r->resolved_config); } grpc_subchannel_factory_unref(r->subchannel_factory); + grpc_workqueue_unref(r->workqueue); gpr_free(r->name); gpr_free(r->default_port); gpr_free(r->lb_policy_name); gpr_free(r); } -static grpc_resolver *dns_create( - grpc_uri *uri, const char *default_port, - const char* lb_policy_name, - grpc_subchannel_factory *subchannel_factory) { +static grpc_resolver *dns_create(grpc_resolver_args *args, + const char *default_port, + const char *lb_policy_name) { dns_resolver *r; - const char *path = uri->path; + const char *path = args->uri->path; - if (0 != strcmp(uri->authority, "")) { - gpr_log(GPR_ERROR, "authority based uri's not supported"); + if (0 != strcmp(args->uri->authority, "")) { + gpr_log(GPR_ERROR, "authority based dns uri's not supported"); return NULL; } @@ -220,8 +222,10 @@ static grpc_resolver *dns_create( grpc_resolver_init(&r->base, &dns_resolver_vtable); r->name = gpr_strdup(path); r->default_port = gpr_strdup(default_port); - r->subchannel_factory = subchannel_factory; - grpc_subchannel_factory_ref(subchannel_factory); + r->subchannel_factory = args->subchannel_factory; + grpc_subchannel_factory_ref(r->subchannel_factory); + r->workqueue = args->workqueue; + grpc_workqueue_ref(r->workqueue); r->lb_policy_name = gpr_strdup(lb_policy_name); return &r->base; } @@ -235,9 +239,8 @@ static void dns_factory_ref(grpc_resolver_factory *factory) {} static void dns_factory_unref(grpc_resolver_factory *factory) {} static grpc_resolver *dns_factory_create_resolver( - grpc_resolver_factory *factory, grpc_uri *uri, - grpc_subchannel_factory *subchannel_factory) { - return dns_create(uri, "https", "pick_first", subchannel_factory); + grpc_resolver_factory *factory, grpc_resolver_args *args) { + return dns_create(args, "https", "pick_first"); } char *dns_factory_get_default_host_name(grpc_resolver_factory *factory, diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c index 2900df285c..260bee27a9 100644 --- a/src/core/client_config/resolvers/sockaddr_resolver.c +++ b/src/core/client_config/resolvers/sockaddr_resolver.c @@ -56,6 +56,8 @@ typedef struct { gpr_refcount refs; /** subchannel factory */ grpc_subchannel_factory *subchannel_factory; + /** workqueue */ + grpc_workqueue *workqueue; /** load balancing policy name */ char *lb_policy_name; @@ -96,8 +98,7 @@ static void sockaddr_shutdown(grpc_resolver *resolver) { gpr_mu_lock(&r->mu); if (r->next_completion != NULL) { *r->target_config = NULL; - /* TODO(ctiller): add delayed callback */ - grpc_iomgr_add_callback(r->next_completion); + grpc_workqueue_push(r->workqueue, r->next_completion, 1); r->next_completion = NULL; } gpr_mu_unlock(&r->mu); @@ -145,7 +146,7 @@ static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) { GRPC_LB_POLICY_UNREF(lb_policy, "unix"); r->published = 1; *r->target_config = cfg; - grpc_iomgr_add_callback(r->next_completion); + grpc_workqueue_push(r->workqueue, r->next_completion, 1); r->next_completion = NULL; } } @@ -154,6 +155,7 @@ static void sockaddr_destroy(grpc_resolver *gr) { sockaddr_resolver *r = (sockaddr_resolver *)gr; gpr_mu_destroy(&r->mu); grpc_subchannel_factory_unref(r->subchannel_factory); + grpc_workqueue_unref(r->workqueue); gpr_free(r->addrs); gpr_free(r->addrs_len); gpr_free(r->lb_policy_name); @@ -278,8 +280,7 @@ done: static void do_nothing(void *ignored) {} static grpc_resolver *sockaddr_create( - grpc_uri *uri, const char *lb_policy_name, - grpc_subchannel_factory *subchannel_factory, + grpc_resolver_args *args, const char *lb_policy_name, int parse(grpc_uri *uri, struct sockaddr_storage *dst, size_t *len)) { size_t i; int errors_found = 0; /* GPR_FALSE */ @@ -287,7 +288,7 @@ static grpc_resolver *sockaddr_create( gpr_slice path_slice; gpr_slice_buffer path_parts; - if (0 != strcmp(uri->authority, "")) { + if (0 != strcmp(args->uri->authority, "")) { gpr_log(GPR_ERROR, "authority based uri's not supported"); return NULL; } @@ -295,7 +296,8 @@ static grpc_resolver *sockaddr_create( r = gpr_malloc(sizeof(sockaddr_resolver)); memset(r, 0, sizeof(*r)); - path_slice = gpr_slice_new(uri->path, strlen(uri->path), do_nothing); + path_slice = + gpr_slice_new(args->uri->path, strlen(args->uri->path), do_nothing); gpr_slice_buffer_init(&path_parts); gpr_slice_split(path_slice, ",", &path_parts); @@ -304,7 +306,7 @@ static grpc_resolver *sockaddr_create( r->addrs_len = gpr_malloc(sizeof(*r->addrs_len) * r->num_addrs); for(i = 0; i < r->num_addrs; i++) { - grpc_uri ith_uri = *uri; + grpc_uri ith_uri = *args->uri; char* part_str = gpr_dump_slice(path_parts.slices[i], GPR_DUMP_ASCII); ith_uri.path = part_str; if (!parse(&ith_uri, &r->addrs[i], &r->addrs_len[i])) { @@ -324,10 +326,12 @@ static grpc_resolver *sockaddr_create( gpr_ref_init(&r->refs, 1); gpr_mu_init(&r->mu); grpc_resolver_init(&r->base, &sockaddr_resolver_vtable); - r->subchannel_factory = subchannel_factory; + r->subchannel_factory = args->subchannel_factory; + grpc_subchannel_factory_ref(r->subchannel_factory); + r->workqueue = args->workqueue; + grpc_workqueue_ref(r->workqueue); r->lb_policy_name = gpr_strdup(lb_policy_name); - grpc_subchannel_factory_ref(subchannel_factory); return &r->base; } @@ -341,10 +345,8 @@ static void sockaddr_factory_unref(grpc_resolver_factory *factory) {} #define DECL_FACTORY(name) \ static grpc_resolver *name##_factory_create_resolver( \ - grpc_resolver_factory *factory, grpc_uri *uri, \ - grpc_subchannel_factory *subchannel_factory) { \ - return sockaddr_create(uri, "pick_first", \ - subchannel_factory, parse_##name); \ + grpc_resolver_factory *factory, grpc_resolver_args *args) { \ + return sockaddr_create(args, "pick_first", parse_##name); \ } \ static const grpc_resolver_factory_vtable name##_factory_vtable = { \ sockaddr_factory_ref, sockaddr_factory_unref, \ |