diff options
Diffstat (limited to 'src/core/client_config')
-rw-r--r-- | src/core/client_config/connector.c | 2 | ||||
-rw-r--r-- | src/core/client_config/connector.h | 4 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 33 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.c | 17 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.h | 35 | ||||
-rw-r--r-- | src/core/client_config/resolver.c | 2 | ||||
-rw-r--r-- | src/core/client_config/resolver.h | 4 | ||||
-rw-r--r-- | src/core/client_config/resolvers/dns_resolver.c | 18 | ||||
-rw-r--r-- | src/core/client_config/resolvers/sockaddr_resolver.c | 19 | ||||
-rw-r--r-- | src/core/client_config/resolvers/zookeeper_resolver.c | 21 | ||||
-rw-r--r-- | src/core/client_config/subchannel.c | 49 | ||||
-rw-r--r-- | src/core/client_config/subchannel.h | 8 |
12 files changed, 102 insertions, 110 deletions
diff --git a/src/core/client_config/connector.c b/src/core/client_config/connector.c index c1e583e4a5..6252d57271 100644 --- a/src/core/client_config/connector.c +++ b/src/core/client_config/connector.c @@ -44,7 +44,7 @@ void grpc_connector_unref(grpc_connector *connector) { void grpc_connector_connect(grpc_connector *connector, const grpc_connect_in_args *in_args, grpc_connect_out_args *out_args, - grpc_iomgr_closure *notify) { + grpc_closure *notify) { connector->vtable->connect(connector, in_args, out_args, notify); } diff --git a/src/core/client_config/connector.h b/src/core/client_config/connector.h index ad0b567553..fc1f6708de 100644 --- a/src/core/client_config/connector.h +++ b/src/core/client_config/connector.h @@ -77,7 +77,7 @@ struct grpc_connector_vtable { /** Implementation of grpc_connector_connect */ void (*connect)(grpc_connector *connector, const grpc_connect_in_args *in_args, - grpc_connect_out_args *out_args, grpc_iomgr_closure *notify); + grpc_connect_out_args *out_args, grpc_closure *notify); }; void grpc_connector_ref(grpc_connector *connector); @@ -86,7 +86,7 @@ void grpc_connector_unref(grpc_connector *connector); void grpc_connector_connect(grpc_connector *connector, const grpc_connect_in_args *in_args, grpc_connect_out_args *out_args, - grpc_iomgr_closure *notify); + grpc_closure *notify); /** Cancel any pending connection */ void grpc_connector_shutdown(grpc_connector *connector); diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 8fd8dd7b67..852eed310d 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -43,7 +43,7 @@ typedef struct pending_pick { struct pending_pick *next; grpc_pollset *pollset; grpc_subchannel **target; - grpc_iomgr_closure *on_complete; + grpc_closure *on_complete; } pending_pick; typedef struct { @@ -55,7 +55,7 @@ typedef struct { /** workqueue for async work */ grpc_workqueue *workqueue; - grpc_iomgr_closure connectivity_changed; + grpc_closure connectivity_changed; /** mutex protecting remaining members */ gpr_mu mu; @@ -108,7 +108,7 @@ void pf_destroy(grpc_lb_policy *pol) { gpr_free(p); } -void pf_shutdown(grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) { +void pf_shutdown(grpc_lb_policy *pol, grpc_call_list *call_list) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); @@ -122,14 +122,13 @@ void pf_shutdown(grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) { while (pp != NULL) { pending_pick *next = pp->next; *pp->target = NULL; - grpc_iomgr_call_list_add(call_list, pp->on_complete, 1); + grpc_call_list_add(call_list, pp->on_complete, 1); gpr_free(pp); pp = next; } } -static void start_picking(pick_first_lb_policy *p, - grpc_iomgr_call_list *call_list) { +static void start_picking(pick_first_lb_policy *p, grpc_call_list *call_list) { p->started_picking = 1; p->checking_subchannel = 0; p->checking_connectivity = GRPC_CHANNEL_IDLE; @@ -139,7 +138,7 @@ static void start_picking(pick_first_lb_policy *p, &p->connectivity_changed, call_list); } -void pf_exit_idle(grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) { +void pf_exit_idle(grpc_lb_policy *pol, grpc_call_list *call_list) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; gpr_mu_lock(&p->mu); if (!p->started_picking) { @@ -150,7 +149,7 @@ void pf_exit_idle(grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) { void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, grpc_subchannel **target, - grpc_iomgr_closure *on_complete, grpc_iomgr_call_list *call_list) { + grpc_closure *on_complete, grpc_call_list *call_list) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); @@ -178,7 +177,7 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { pick_first_lb_policy *p = arg; pending_pick *pp; int unref = 0; - grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; gpr_mu_lock(&p->mu); @@ -205,7 +204,7 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { p->pending_picks = pp->next; *pp->target = p->selected; grpc_subchannel_del_interested_party(p->selected, pp->pollset); - grpc_iomgr_call_list_add(&call_list, pp->on_complete, 1); + grpc_call_list_add(&call_list, pp->on_complete, 1); gpr_free(pp); } grpc_subchannel_notify_on_state_change( @@ -251,7 +250,7 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_iomgr_call_list_add(&call_list, pp->on_complete, 1); + grpc_call_list_add(&call_list, pp->on_complete, 1); gpr_free(pp); } unref = 1; @@ -270,7 +269,7 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { gpr_mu_unlock(&p->mu); - grpc_iomgr_call_list_run(call_list); + grpc_call_list_run(call_list); if (unref) { GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity"); @@ -278,7 +277,7 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { } static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op, - grpc_iomgr_call_list *call_list) { + grpc_call_list *call_list) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; size_t i; size_t n; @@ -301,7 +300,7 @@ static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op, } static grpc_connectivity_state pf_check_connectivity( - grpc_lb_policy *pol, grpc_iomgr_call_list *call_list) { + grpc_lb_policy *pol, grpc_call_list *call_list) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; grpc_connectivity_state st; gpr_mu_lock(&p->mu); @@ -312,8 +311,8 @@ static grpc_connectivity_state pf_check_connectivity( void pf_notify_on_state_change(grpc_lb_policy *pol, grpc_connectivity_state *current, - grpc_iomgr_closure *notify, - grpc_iomgr_call_list *call_list) { + grpc_closure *notify, + grpc_call_list *call_list) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; gpr_mu_lock(&p->mu); grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current, @@ -348,7 +347,7 @@ static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory, "pick_first"); memcpy(p->subchannels, args->subchannels, sizeof(grpc_subchannel *) * args->num_subchannels); - grpc_iomgr_closure_init(&p->connectivity_changed, pf_connectivity_changed, p); + grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed, p); gpr_mu_init(&p->mu); return &p->base; } diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index a9dc9dcca8..95ab29cd88 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -64,37 +64,36 @@ void grpc_lb_policy_unref(grpc_lb_policy *policy) { } void grpc_lb_policy_shutdown(grpc_lb_policy *policy, - grpc_iomgr_call_list *call_list) { + grpc_call_list *call_list) { policy->vtable->shutdown(policy, call_list); } void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, - grpc_subchannel **target, - grpc_iomgr_closure *on_complete, - grpc_iomgr_call_list *call_list) { + grpc_subchannel **target, grpc_closure *on_complete, + grpc_call_list *call_list) { policy->vtable->pick(policy, pollset, initial_metadata, target, on_complete, call_list); } void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op, - grpc_iomgr_call_list *call_list) { + grpc_call_list *call_list) { policy->vtable->broadcast(policy, op, call_list); } void grpc_lb_policy_exit_idle(grpc_lb_policy *policy, - grpc_iomgr_call_list *call_list) { + grpc_call_list *call_list) { policy->vtable->exit_idle(policy, call_list); } void grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy, grpc_connectivity_state *state, - grpc_iomgr_closure *closure, - grpc_iomgr_call_list *call_list) { + grpc_closure *closure, + grpc_call_list *call_list) { policy->vtable->notify_on_state_change(policy, state, closure, call_list); } grpc_connectivity_state grpc_lb_policy_check_connectivity( - grpc_lb_policy *policy, grpc_iomgr_call_list *call_list) { + grpc_lb_policy *policy, grpc_call_list *call_list) { return policy->vtable->check_connectivity(policy, call_list); } diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index 8d7eb579b5..e445d2cded 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -53,31 +53,30 @@ struct grpc_lb_policy { struct grpc_lb_policy_vtable { void (*destroy)(grpc_lb_policy *policy); - void (*shutdown)(grpc_lb_policy *policy, grpc_iomgr_call_list *call_list); + void (*shutdown)(grpc_lb_policy *policy, grpc_call_list *call_list); /** implement grpc_lb_policy_pick */ void (*pick)(grpc_lb_policy *policy, grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, grpc_subchannel **target, - grpc_iomgr_closure *on_complete, - grpc_iomgr_call_list *call_list); + grpc_closure *on_complete, grpc_call_list *call_list); /** try to enter a READY connectivity state */ - void (*exit_idle)(grpc_lb_policy *policy, grpc_iomgr_call_list *call_list); + void (*exit_idle)(grpc_lb_policy *policy, grpc_call_list *call_list); /** broadcast a transport op to all subchannels */ void (*broadcast)(grpc_lb_policy *policy, grpc_transport_op *op, - grpc_iomgr_call_list *call_list); + grpc_call_list *call_list); /** check the current connectivity of the lb_policy */ - grpc_connectivity_state (*check_connectivity)( - grpc_lb_policy *policy, grpc_iomgr_call_list *call_list); + grpc_connectivity_state (*check_connectivity)(grpc_lb_policy *policy, + grpc_call_list *call_list); /** call notify when the connectivity state of a channel changes from *state. Updates *state with the new state of the policy */ void (*notify_on_state_change)(grpc_lb_policy *policy, grpc_connectivity_state *state, - grpc_iomgr_closure *closure, - grpc_iomgr_call_list *call_list); + grpc_closure *closure, + grpc_call_list *call_list); }; #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG @@ -101,8 +100,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy, const grpc_lb_policy_vtable *vtable); /** Start shutting down (fail any pending picks) */ -void grpc_lb_policy_shutdown(grpc_lb_policy *policy, - grpc_iomgr_call_list *call_list); +void grpc_lb_policy_shutdown(grpc_lb_policy *policy, grpc_call_list *call_list); /** Given initial metadata in \a initial_metadata, find an appropriate target for this rpc, and 'return' it by calling \a on_complete after setting @@ -110,22 +108,21 @@ void grpc_lb_policy_shutdown(grpc_lb_policy *policy, Picking can be asynchronous. Any IO should be done under \a pollset. */ void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset, grpc_metadata_batch *initial_metadata, - grpc_subchannel **target, - grpc_iomgr_closure *on_complete, - grpc_iomgr_call_list *call_list); + grpc_subchannel **target, grpc_closure *on_complete, + grpc_call_list *call_list); void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op, - grpc_iomgr_call_list *call_list); + grpc_call_list *call_list); void grpc_lb_policy_exit_idle(grpc_lb_policy *policy, - grpc_iomgr_call_list *call_list); + grpc_call_list *call_list); void grpc_lb_policy_notify_on_state_change(grpc_lb_policy *policy, grpc_connectivity_state *state, - grpc_iomgr_closure *closure, - grpc_iomgr_call_list *call_list); + grpc_closure *closure, + grpc_call_list *call_list); grpc_connectivity_state grpc_lb_policy_check_connectivity( - grpc_lb_policy *policy, grpc_iomgr_call_list *call_list); + grpc_lb_policy *policy, grpc_call_list *call_list); #endif /* GRPC_INTERNAL_CORE_CONFIG_LB_POLICY_H */ diff --git a/src/core/client_config/resolver.c b/src/core/client_config/resolver.c index 91e42bb684..6a7d73c54f 100644 --- a/src/core/client_config/resolver.c +++ b/src/core/client_config/resolver.c @@ -78,6 +78,6 @@ void grpc_resolver_channel_saw_error(grpc_resolver *resolver, void grpc_resolver_next(grpc_resolver *resolver, grpc_client_config **target_config, - grpc_iomgr_closure *on_complete) { + grpc_closure *on_complete) { resolver->vtable->next(resolver, target_config, on_complete); } diff --git a/src/core/client_config/resolver.h b/src/core/client_config/resolver.h index 8ad87d789b..217d061abe 100644 --- a/src/core/client_config/resolver.h +++ b/src/core/client_config/resolver.h @@ -55,7 +55,7 @@ struct grpc_resolver_vtable { struct sockaddr *failing_address, int failing_address_len); void (*next)(grpc_resolver *resolver, grpc_client_config **target_config, - grpc_iomgr_closure *on_complete); + grpc_closure *on_complete); }; #ifdef GRPC_RESOLVER_REFCOUNT_DEBUG @@ -92,6 +92,6 @@ void grpc_resolver_channel_saw_error(grpc_resolver *resolver, schedule on_complete. */ void grpc_resolver_next(grpc_resolver *resolver, grpc_client_config **target_config, - grpc_iomgr_closure *on_complete); + grpc_closure *on_complete); #endif /* GRPC_INTERNAL_CORE_CONFIG_RESOLVER_H */ diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c index 4fee543a5b..e251664e03 100644 --- a/src/core/client_config/resolvers/dns_resolver.c +++ b/src/core/client_config/resolvers/dns_resolver.c @@ -69,7 +69,7 @@ typedef struct { /** which version of resolved_config is current? */ int resolved_version; /** pending next completion, or NULL */ - grpc_iomgr_closure *next_completion; + grpc_closure *next_completion; /** target config address for next completion */ grpc_client_config **target_config; /** current (fully resolved) config */ @@ -79,7 +79,7 @@ typedef struct { static void dns_destroy(grpc_resolver *r); static void dns_start_resolving_locked(dns_resolver *r); -static grpc_iomgr_closure *dns_maybe_finish_next_locked(dns_resolver *r) +static grpc_closure *dns_maybe_finish_next_locked(dns_resolver *r) GRPC_MUST_USE_RESULT; static void dns_shutdown(grpc_resolver *r); @@ -87,14 +87,14 @@ static void dns_channel_saw_error(grpc_resolver *r, struct sockaddr *failing_address, int failing_address_len); static void dns_next(grpc_resolver *r, grpc_client_config **target_config, - grpc_iomgr_closure *on_complete); + grpc_closure *on_complete); static const grpc_resolver_vtable dns_resolver_vtable = { dns_destroy, dns_shutdown, dns_channel_saw_error, dns_next}; static void dns_shutdown(grpc_resolver *resolver) { dns_resolver *r = (dns_resolver *)resolver; - grpc_iomgr_closure *next_completion; + grpc_closure *next_completion; gpr_mu_lock(&r->mu); next_completion = r->next_completion; r->next_completion = NULL; @@ -117,9 +117,9 @@ static void dns_channel_saw_error(grpc_resolver *resolver, struct sockaddr *sa, static void dns_next(grpc_resolver *resolver, grpc_client_config **target_config, - grpc_iomgr_closure *on_complete) { + grpc_closure *on_complete) { dns_resolver *r = (dns_resolver *)resolver; - grpc_iomgr_closure *call = NULL; + grpc_closure *call = NULL; gpr_mu_lock(&r->mu); GPR_ASSERT(!r->next_completion); r->next_completion = on_complete; @@ -141,7 +141,7 @@ static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) { grpc_subchannel **subchannels; grpc_subchannel_args args; grpc_lb_policy *lb_policy; - grpc_iomgr_closure *call; + grpc_closure *call; size_t i; if (addresses) { grpc_lb_policy_args lb_policy_args; @@ -188,8 +188,8 @@ static void dns_start_resolving_locked(dns_resolver *r) { grpc_resolve_address(r->name, r->default_port, dns_on_resolved, r); } -static grpc_iomgr_closure *dns_maybe_finish_next_locked(dns_resolver *r) { - grpc_iomgr_closure *ret = NULL; +static grpc_closure *dns_maybe_finish_next_locked(dns_resolver *r) { + grpc_closure *ret = NULL; if (r->next_completion != NULL && r->resolved_version != r->published_version) { *r->target_config = r->resolved_config; diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c index 220915853c..47fe5fad4a 100644 --- a/src/core/client_config/resolvers/sockaddr_resolver.c +++ b/src/core/client_config/resolvers/sockaddr_resolver.c @@ -73,22 +73,22 @@ typedef struct { /** have we published? */ int published; /** pending next completion, or NULL */ - grpc_iomgr_closure *next_completion; + grpc_closure *next_completion; /** target config address for next completion */ grpc_client_config **target_config; } sockaddr_resolver; static void sockaddr_destroy(grpc_resolver *r); -static grpc_iomgr_closure *sockaddr_maybe_finish_next_locked( - sockaddr_resolver *r) GRPC_MUST_USE_RESULT; +static grpc_closure *sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) + GRPC_MUST_USE_RESULT; static void sockaddr_shutdown(grpc_resolver *r); static void sockaddr_channel_saw_error(grpc_resolver *r, struct sockaddr *failing_address, int failing_address_len); static void sockaddr_next(grpc_resolver *r, grpc_client_config **target_config, - grpc_iomgr_closure *on_complete); + grpc_closure *on_complete); static const grpc_resolver_vtable sockaddr_resolver_vtable = { sockaddr_destroy, sockaddr_shutdown, sockaddr_channel_saw_error, @@ -96,7 +96,7 @@ static const grpc_resolver_vtable sockaddr_resolver_vtable = { static void sockaddr_shutdown(grpc_resolver *resolver) { sockaddr_resolver *r = (sockaddr_resolver *)resolver; - grpc_iomgr_closure *call = NULL; + grpc_closure *call = NULL; gpr_mu_lock(&r->mu); if (r->next_completion != NULL) { *r->target_config = NULL; @@ -114,9 +114,9 @@ static void sockaddr_channel_saw_error(grpc_resolver *resolver, static void sockaddr_next(grpc_resolver *resolver, grpc_client_config **target_config, - grpc_iomgr_closure *on_complete) { + grpc_closure *on_complete) { sockaddr_resolver *r = (sockaddr_resolver *)resolver; - grpc_iomgr_closure *call = NULL; + grpc_closure *call = NULL; gpr_mu_lock(&r->mu); GPR_ASSERT(!r->next_completion); r->next_completion = on_complete; @@ -126,14 +126,13 @@ static void sockaddr_next(grpc_resolver *resolver, if (call) call->cb(call->cb_arg, 1); } -static grpc_iomgr_closure *sockaddr_maybe_finish_next_locked( - sockaddr_resolver *r) { +static grpc_closure *sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) { grpc_client_config *cfg; grpc_lb_policy *lb_policy; grpc_lb_policy_args lb_policy_args; grpc_subchannel **subchannels; grpc_subchannel_args args; - grpc_iomgr_closure *call = NULL; + grpc_closure *call = NULL; if (r->next_completion != NULL && !r->published) { size_t i; diff --git a/src/core/client_config/resolvers/zookeeper_resolver.c b/src/core/client_config/resolvers/zookeeper_resolver.c index 94ec12dd16..5632f0425e 100644 --- a/src/core/client_config/resolvers/zookeeper_resolver.c +++ b/src/core/client_config/resolvers/zookeeper_resolver.c @@ -73,7 +73,7 @@ typedef struct { /** which version of resolved_config is current? */ int resolved_version; /** pending next completion, or NULL */ - grpc_iomgr_closure *next_completion; + grpc_closure *next_completion; /** target config address for next completion */ grpc_client_config **target_config; /** current (fully resolved) config */ @@ -92,15 +92,15 @@ typedef struct { static void zookeeper_destroy(grpc_resolver *r); static void zookeeper_start_resolving_locked(zookeeper_resolver *r); -static grpc_iomgr_closure *zookeeper_maybe_finish_next_locked( - zookeeper_resolver *r) GRPC_MUST_USE_RESULT; +static grpc_closure *zookeeper_maybe_finish_next_locked(zookeeper_resolver *r) + GRPC_MUST_USE_RESULT; static void zookeeper_shutdown(grpc_resolver *r); static void zookeeper_channel_saw_error(grpc_resolver *r, struct sockaddr *failing_address, int failing_address_len); static void zookeeper_next(grpc_resolver *r, grpc_client_config **target_config, - grpc_iomgr_closure *on_complete); + grpc_closure *on_complete); static const grpc_resolver_vtable zookeeper_resolver_vtable = { zookeeper_destroy, zookeeper_shutdown, zookeeper_channel_saw_error, @@ -108,7 +108,7 @@ static const grpc_resolver_vtable zookeeper_resolver_vtable = { static void zookeeper_shutdown(grpc_resolver *resolver) { zookeeper_resolver *r = (zookeeper_resolver *)resolver; - grpc_iomgr_closure *call = NULL; + grpc_closure *call = NULL; gpr_mu_lock(&r->mu); if (r->next_completion != NULL) { *r->target_config = NULL; @@ -134,9 +134,9 @@ static void zookeeper_channel_saw_error(grpc_resolver *resolver, static void zookeeper_next(grpc_resolver *resolver, grpc_client_config **target_config, - grpc_iomgr_closure *on_complete) { + grpc_closure *on_complete) { zookeeper_resolver *r = (zookeeper_resolver *)resolver; - grpc_iomgr_closure *call; + grpc_closure *call; gpr_mu_lock(&r->mu); GPR_ASSERT(r->next_completion == NULL); r->next_completion = on_complete; @@ -189,7 +189,7 @@ static void zookeeper_on_resolved(void *arg, grpc_subchannel **subchannels; grpc_subchannel_args args; grpc_lb_policy *lb_policy; - grpc_iomgr_closure *call; + grpc_closure *call; size_t i; if (addresses != NULL) { grpc_lb_policy_args lb_policy_args; @@ -414,9 +414,8 @@ static void zookeeper_start_resolving_locked(zookeeper_resolver *r) { zookeeper_resolve_address(r); } -static grpc_iomgr_closure *zookeeper_maybe_finish_next_locked( - zookeeper_resolver *r) { - grpc_iomgr_closure *call = NULL; +static grpc_closure *zookeeper_maybe_finish_next_locked(zookeeper_resolver *r) { + grpc_closure *call = NULL; if (r->next_completion != NULL && r->resolved_version != r->published_version) { *r->target_config = r->resolved_config; diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index b15acf826a..99310e02e5 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -59,7 +59,7 @@ typedef struct { } connection; typedef struct { - grpc_iomgr_closure closure; + grpc_closure closure; size_t version; grpc_subchannel *subchannel; grpc_connectivity_state connectivity_state; @@ -67,11 +67,11 @@ typedef struct { typedef struct waiting_for_connect { struct waiting_for_connect *next; - grpc_iomgr_closure *notify; + grpc_closure *notify; grpc_pollset *pollset; grpc_subchannel_call **target; grpc_subchannel *subchannel; - grpc_iomgr_closure continuation; + grpc_closure continuation; } waiting_for_connect; struct grpc_subchannel { @@ -100,7 +100,7 @@ struct grpc_subchannel { grpc_connect_out_args connecting_result; /** callback for connection finishing */ - grpc_iomgr_closure connected; + grpc_closure connected; /** pollset_set tracking who's interested in a connection being setup - owned by the master channel (in particular the @@ -147,7 +147,7 @@ struct grpc_subchannel_call { static grpc_subchannel_call *create_call(connection *con); static void connectivity_state_changed_locked(grpc_subchannel *c, const char *reason, - grpc_iomgr_call_list *call_list); + grpc_call_list *call_list); static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c); static gpr_timespec compute_connect_deadline(grpc_subchannel *c); static void subchannel_connected(void *subchannel, int iomgr_success); @@ -303,7 +303,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, c->pollset_set = grpc_client_channel_get_connecting_pollset_set(parent_elem); c->random = random_seed(); grpc_mdctx_ref(c->mdctx); - grpc_iomgr_closure_init(&c->connected, subchannel_connected, c); + grpc_closure_init(&c->connected, subchannel_connected, c); grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE, "subchannel"); gpr_mu_init(&c->mu); @@ -334,7 +334,7 @@ static void start_connect(grpc_subchannel *c) { static void continue_creating_call(void *arg, int iomgr_success) { waiting_for_connect *w4c = arg; - grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_subchannel_del_interested_party(w4c->subchannel, w4c->pollset); grpc_subchannel_create_call(w4c->subchannel, w4c->pollset, w4c->target, w4c->notify, &call_list); @@ -344,8 +344,8 @@ static void continue_creating_call(void *arg, int iomgr_success) { void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset, grpc_subchannel_call **target, - grpc_iomgr_closure *notify, - grpc_iomgr_call_list *call_list) { + grpc_closure *notify, + grpc_call_list *call_list) { connection *con; gpr_mu_lock(&c->mu); if (c->active != NULL) { @@ -364,7 +364,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset, w4c->subchannel = c; /* released when clearing w4c */ SUBCHANNEL_REF_LOCKED(c, "waiting_for_connect"); - grpc_iomgr_closure_init(&w4c->continuation, continue_creating_call, w4c); + grpc_closure_init(&w4c->continuation, continue_creating_call, w4c); c->waiting = w4c; grpc_subchannel_add_interested_party(c, pollset); if (!c->connecting) { @@ -392,8 +392,8 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) { void grpc_subchannel_notify_on_state_change(grpc_subchannel *c, grpc_connectivity_state *state, - grpc_iomgr_closure *notify, - grpc_iomgr_call_list *call_list) { + grpc_closure *notify, + grpc_call_list *call_list) { int do_connect = 0; gpr_mu_lock(&c->mu); if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state, @@ -417,7 +417,7 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c, connection *con = NULL; grpc_subchannel *destroy; int cancel_alarm = 0; - grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; gpr_mu_lock(&c->mu); if (c->active != NULL) { con = c->active; @@ -454,7 +454,7 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c, grpc_connector_shutdown(c->connector); } - grpc_iomgr_call_list_run(call_list); + grpc_call_list_run(call_list); } static void on_state_changed(void *p, int iomgr_success) { @@ -465,7 +465,7 @@ static void on_state_changed(void *p, int iomgr_success) { grpc_transport_op op; grpc_channel_element *elem; connection *destroy_connection = NULL; - grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; gpr_mu_lock(mu); @@ -514,11 +514,10 @@ done: if (destroy_connection != NULL) { connection_destroy(destroy_connection); } - grpc_iomgr_call_list_run(call_list); + grpc_call_list_run(call_list); } -static void publish_transport(grpc_subchannel *c, - grpc_iomgr_call_list *call_list) { +static void publish_transport(grpc_subchannel *c, grpc_call_list *call_list) { size_t channel_stack_size; connection *con; grpc_channel_stack *stk; @@ -552,7 +551,7 @@ static void publish_transport(grpc_subchannel *c, /* initialize state watcher */ sw = gpr_malloc(sizeof(*sw)); - grpc_iomgr_closure_init(&sw->closure, on_state_changed, sw); + grpc_closure_init(&sw->closure, on_state_changed, sw); sw->subchannel = c; sw->connectivity_state = GRPC_CHANNEL_READY; @@ -599,7 +598,7 @@ static void publish_transport(grpc_subchannel *c, while (w4c != NULL) { waiting_for_connect *next = w4c; - grpc_iomgr_call_list_add(call_list, &w4c->continuation, 1); + grpc_call_list_add(call_list, &w4c->continuation, 1); w4c = next; } @@ -641,7 +640,7 @@ static void update_reconnect_parameters(grpc_subchannel *c) { static void on_alarm(void *arg, int iomgr_success) { grpc_subchannel *c = arg; - grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; gpr_mu_lock(&c->mu); c->have_alarm = 0; if (c->disconnected) { @@ -656,12 +655,12 @@ static void on_alarm(void *arg, int iomgr_success) { GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting"); GRPC_SUBCHANNEL_UNREF(c, "connecting"); } - grpc_iomgr_call_list_run(call_list); + grpc_call_list_run(call_list); } static void subchannel_connected(void *arg, int iomgr_success) { grpc_subchannel *c = arg; - grpc_iomgr_call_list call_list = GRPC_IOMGR_CALL_LIST_INIT; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; if (c->connecting_result.transport != NULL) { publish_transport(c, &call_list); } else { @@ -673,7 +672,7 @@ static void subchannel_connected(void *arg, int iomgr_success) { grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, now); gpr_mu_unlock(&c->mu); } - grpc_iomgr_call_list_run(call_list); + grpc_call_list_run(call_list); } static gpr_timespec compute_connect_deadline(grpc_subchannel *c) { @@ -705,7 +704,7 @@ static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) { static void connectivity_state_changed_locked(grpc_subchannel *c, const char *reason, - grpc_iomgr_call_list *call_list) { + grpc_call_list *call_list) { grpc_connectivity_state current = compute_connectivity_locked(c); grpc_connectivity_state_set(&c->state_tracker, current, reason, call_list); } diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 7c00ff172d..189e531d35 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -76,8 +76,8 @@ void grpc_subchannel_call_unref( void grpc_subchannel_create_call(grpc_subchannel *subchannel, grpc_pollset *pollset, grpc_subchannel_call **target, - grpc_iomgr_closure *notify, - grpc_iomgr_call_list *call_list); + grpc_closure *notify, + grpc_call_list *call_list); /** process a transport level op */ void grpc_subchannel_process_transport_op(grpc_subchannel *subchannel, @@ -91,8 +91,8 @@ grpc_connectivity_state grpc_subchannel_check_connectivity( Updates *state with the new state of the channel */ void grpc_subchannel_notify_on_state_change(grpc_subchannel *channel, grpc_connectivity_state *state, - grpc_iomgr_closure *notify, - grpc_iomgr_call_list *call_list); + grpc_closure *notify, + grpc_call_list *call_list); /** express interest in \a channel's activities through \a pollset. */ void grpc_subchannel_add_interested_party(grpc_subchannel *channel, |