diff options
author | Craig Tiller <ctiller@google.com> | 2015-09-18 17:29:00 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-09-18 17:29:00 -0700 |
commit | d1bec03fa148344b8eac2b59517252d86e4ca858 (patch) | |
tree | f359e48f9151ab7ceff72cd624ad6c7a59e4d304 /src/core/client_config | |
parent | 33825118df7157219cec15382beb006d3462ad96 (diff) |
Call list progress
Diffstat (limited to 'src/core/client_config')
-rw-r--r-- | src/core/client_config/client_config.c | 9 | ||||
-rw-r--r-- | src/core/client_config/client_config.h | 3 | ||||
-rw-r--r-- | src/core/client_config/connector.c | 9 | ||||
-rw-r--r-- | src/core/client_config/connector.h | 18 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 86 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.c | 8 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.h | 14 | ||||
-rw-r--r-- | src/core/client_config/lb_policy_factory.h | 2 | ||||
-rw-r--r-- | src/core/client_config/resolver.c | 26 | ||||
-rw-r--r-- | src/core/client_config/resolver.h | 27 | ||||
-rw-r--r-- | src/core/client_config/resolver_factory.h | 2 | ||||
-rw-r--r-- | src/core/client_config/resolver_registry.c | 6 | ||||
-rw-r--r-- | src/core/client_config/resolver_registry.h | 5 | ||||
-rw-r--r-- | src/core/client_config/resolvers/dns_resolver.c | 67 | ||||
-rw-r--r-- | src/core/client_config/resolvers/sockaddr_resolver.c | 50 | ||||
-rw-r--r-- | src/core/client_config/resolvers/zookeeper_resolver.c | 6 | ||||
-rw-r--r-- | src/core/client_config/subchannel.c | 158 | ||||
-rw-r--r-- | src/core/client_config/subchannel.h | 38 | ||||
-rw-r--r-- | src/core/client_config/subchannel_factory.h | 6 |
19 files changed, 253 insertions, 287 deletions
diff --git a/src/core/client_config/client_config.c b/src/core/client_config/client_config.c index 4453824148..0780880473 100644 --- a/src/core/client_config/client_config.c +++ b/src/core/client_config/client_config.c @@ -51,21 +51,20 @@ grpc_client_config *grpc_client_config_create() { void grpc_client_config_ref(grpc_client_config *c) { gpr_ref(&c->refs); } -void grpc_client_config_unref(grpc_client_config *c) { +void grpc_client_config_unref(grpc_client_config *c, + grpc_call_list *call_list) { if (gpr_unref(&c->refs)) { - GRPC_LB_POLICY_UNREF(c->lb_policy, "client_config"); + GRPC_LB_POLICY_UNREF(c->lb_policy, "client_config", call_list); gpr_free(c); } } void grpc_client_config_set_lb_policy(grpc_client_config *c, grpc_lb_policy *lb_policy) { + GPR_ASSERT(c->lb_policy == NULL); if (lb_policy) { GRPC_LB_POLICY_REF(lb_policy, "client_config"); } - if (c->lb_policy) { - GRPC_LB_POLICY_UNREF(c->lb_policy, "client_config"); - } c->lb_policy = lb_policy; } diff --git a/src/core/client_config/client_config.h b/src/core/client_config/client_config.h index 47612da42c..76a5c66594 100644 --- a/src/core/client_config/client_config.h +++ b/src/core/client_config/client_config.h @@ -42,7 +42,8 @@ typedef struct grpc_client_config grpc_client_config; grpc_client_config *grpc_client_config_create(); void grpc_client_config_ref(grpc_client_config *client_config); -void grpc_client_config_unref(grpc_client_config *client_config); +void grpc_client_config_unref(grpc_client_config *client_config, + grpc_call_list *call_list); void grpc_client_config_set_lb_policy(grpc_client_config *client_config, grpc_lb_policy *lb_policy); diff --git a/src/core/client_config/connector.c b/src/core/client_config/connector.c index 6252d57271..31f0b84efe 100644 --- a/src/core/client_config/connector.c +++ b/src/core/client_config/connector.c @@ -44,10 +44,11 @@ void grpc_connector_unref(grpc_connector *connector) { void grpc_connector_connect(grpc_connector *connector, const grpc_connect_in_args *in_args, grpc_connect_out_args *out_args, - grpc_closure *notify) { - connector->vtable->connect(connector, in_args, out_args, notify); + grpc_closure *notify, grpc_call_list *call_list) { + connector->vtable->connect(connector, in_args, out_args, notify, call_list); } -void grpc_connector_shutdown(grpc_connector *connector) { - connector->vtable->shutdown(connector); +void grpc_connector_shutdown(grpc_connector *connector, + grpc_call_list *call_list) { + connector->vtable->shutdown(connector, call_list); } diff --git a/src/core/client_config/connector.h b/src/core/client_config/connector.h index fc1f6708de..388f656c44 100644 --- a/src/core/client_config/connector.h +++ b/src/core/client_config/connector.h @@ -55,10 +55,6 @@ typedef struct { gpr_timespec deadline; /** channel arguments (to be passed to transport) */ const grpc_channel_args *channel_args; - /** metadata context */ - grpc_mdctx *metadata_context; - /** workqueue */ - grpc_workqueue *workqueue; } grpc_connect_in_args; typedef struct { @@ -71,23 +67,25 @@ typedef struct { struct grpc_connector_vtable { void (*ref)(grpc_connector *connector); - void (*unref)(grpc_connector *connector); + void (*unref)(grpc_connector *connector, grpc_call_list *call_list); /** Implementation of grpc_connector_shutdown */ - void (*shutdown)(grpc_connector *connector); + void (*shutdown)(grpc_connector *connector, grpc_call_list *call_list); /** Implementation of grpc_connector_connect */ void (*connect)(grpc_connector *connector, const grpc_connect_in_args *in_args, - grpc_connect_out_args *out_args, grpc_closure *notify); + grpc_connect_out_args *out_args, grpc_closure *notify, + grpc_call_list *call_list); }; void grpc_connector_ref(grpc_connector *connector); -void grpc_connector_unref(grpc_connector *connector); +void grpc_connector_unref(grpc_connector *connector, grpc_call_list *call_list); /** Connect using the connector: max one outstanding call at a time */ void grpc_connector_connect(grpc_connector *connector, const grpc_connect_in_args *in_args, grpc_connect_out_args *out_args, - grpc_closure *notify); + grpc_closure *notify, grpc_call_list *call_list); /** Cancel any pending connection */ -void grpc_connector_shutdown(grpc_connector *connector); +void grpc_connector_shutdown(grpc_connector *connector, + grpc_call_list *call_list); #endif diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 852eed310d..6dc52f43ce 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -52,8 +52,6 @@ typedef struct { /** all our subchannels */ grpc_subchannel **subchannels; size_t num_subchannels; - /** workqueue for async work */ - grpc_workqueue *workqueue; grpc_closure connectivity_changed; @@ -78,33 +76,34 @@ typedef struct { grpc_connectivity_state_tracker state_tracker; } pick_first_lb_policy; -static void del_interested_parties_locked(pick_first_lb_policy *p) { +static void del_interested_parties_locked(pick_first_lb_policy *p, + grpc_call_list *call_list) { pending_pick *pp; for (pp = p->pending_picks; pp; pp = pp->next) { grpc_subchannel_del_interested_party(p->subchannels[p->checking_subchannel], - pp->pollset); + pp->pollset, call_list); } } -static void add_interested_parties_locked(pick_first_lb_policy *p) { +static void add_interested_parties_locked(pick_first_lb_policy *p, + grpc_call_list *call_list) { pending_pick *pp; for (pp = p->pending_picks; pp; pp = pp->next) { grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel], - pp->pollset); + pp->pollset, call_list); } } -void pf_destroy(grpc_lb_policy *pol) { +void pf_destroy(grpc_lb_policy *pol, grpc_call_list *call_list) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; size_t i; - del_interested_parties_locked(p); + GPR_ASSERT(p->shutdown); for (i = 0; i < p->num_subchannels; i++) { - GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "pick_first"); + GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "pick_first", call_list); } grpc_connectivity_state_destroy(&p->state_tracker); gpr_free(p->subchannels); gpr_mu_destroy(&p->mu); - GRPC_WORKQUEUE_UNREF(p->workqueue, "pick_first"); gpr_free(p); } @@ -112,7 +111,7 @@ void pf_shutdown(grpc_lb_policy *pol, grpc_call_list *call_list) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; pending_pick *pp; gpr_mu_lock(&p->mu); - del_interested_parties_locked(p); + del_interested_parties_locked(p, call_list); p->shutdown = 1; pp = p->pending_picks; p->pending_picks = NULL; @@ -156,13 +155,13 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset, if (p->selected) { gpr_mu_unlock(&p->mu); *target = p->selected; - on_complete->cb(on_complete->cb_arg, 1); + grpc_call_list_add(call_list, on_complete, 1); } else { if (!p->started_picking) { start_picking(p, call_list); } grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel], - pollset); + pollset, call_list); pp = gpr_malloc(sizeof(*pp)); pp->next = p->pending_picks; pp->pollset = pollset; @@ -173,58 +172,58 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset, } } -static void pf_connectivity_changed(void *arg, int iomgr_success) { +static void pf_connectivity_changed(void *arg, int iomgr_success, + grpc_call_list *call_list) { pick_first_lb_policy *p = arg; pending_pick *pp; - int unref = 0; - grpc_call_list call_list = GRPC_CALL_LIST_INIT; gpr_mu_lock(&p->mu); if (p->shutdown) { - unref = 1; + GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity", call_list); } else if (p->selected != NULL) { grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity, - "selected_changed", &call_list); + "selected_changed", call_list); if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) { grpc_subchannel_notify_on_state_change( p->selected, &p->checking_connectivity, &p->connectivity_changed, - &call_list); + call_list); } else { - unref = 1; + GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity", call_list); } } else { loop: switch (p->checking_connectivity) { case GRPC_CHANNEL_READY: grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY, - "connecting_ready", &call_list); + "connecting_ready", call_list); p->selected = p->subchannels[p->checking_subchannel]; while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = p->selected; - grpc_subchannel_del_interested_party(p->selected, pp->pollset); - grpc_call_list_add(&call_list, pp->on_complete, 1); + grpc_subchannel_del_interested_party(p->selected, pp->pollset, + call_list); + grpc_call_list_add(call_list, pp->on_complete, 1); gpr_free(pp); } grpc_subchannel_notify_on_state_change( p->selected, &p->checking_connectivity, &p->connectivity_changed, - &call_list); + call_list); break; case GRPC_CHANNEL_TRANSIENT_FAILURE: grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - "connecting_transient_failure", &call_list); - del_interested_parties_locked(p); + "connecting_transient_failure", call_list); + del_interested_parties_locked(p, call_list); p->checking_subchannel = (p->checking_subchannel + 1) % p->num_subchannels; p->checking_connectivity = grpc_subchannel_check_connectivity( p->subchannels[p->checking_subchannel]); - add_interested_parties_locked(p); + add_interested_parties_locked(p, call_list); if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) { grpc_subchannel_notify_on_state_change( p->subchannels[p->checking_subchannel], &p->checking_connectivity, - &p->connectivity_changed, &call_list); + &p->connectivity_changed, call_list); } else { goto loop; } @@ -232,48 +231,43 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) { case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_IDLE: grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity, - "connecting_changed", &call_list); + "connecting_changed", call_list); grpc_subchannel_notify_on_state_change( p->subchannels[p->checking_subchannel], &p->checking_connectivity, - &p->connectivity_changed, &call_list); + &p->connectivity_changed, call_list); break; case GRPC_CHANNEL_FATAL_FAILURE: - del_interested_parties_locked(p); + del_interested_parties_locked(p, call_list); GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel], p->subchannels[p->num_subchannels - 1]); p->num_subchannels--; - GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first"); + GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first", + call_list); if (p->num_subchannels == 0) { grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_FATAL_FAILURE, - "no_more_channels", &call_list); + "no_more_channels", call_list); while ((pp = p->pending_picks)) { p->pending_picks = pp->next; *pp->target = NULL; - grpc_call_list_add(&call_list, pp->on_complete, 1); + grpc_call_list_add(call_list, pp->on_complete, 1); gpr_free(pp); } - unref = 1; + GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity", call_list); } else { grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE, - "subchannel_failed", &call_list); + "subchannel_failed", call_list); p->checking_subchannel %= p->num_subchannels; p->checking_connectivity = grpc_subchannel_check_connectivity( p->subchannels[p->checking_subchannel]); - add_interested_parties_locked(p); + add_interested_parties_locked(p, call_list); goto loop; } } } gpr_mu_unlock(&p->mu); - - grpc_call_list_run(call_list); - - if (unref) { - GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity"); - } } static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op, @@ -293,8 +287,8 @@ static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op, gpr_mu_unlock(&p->mu); for (i = 0; i < n; i++) { - grpc_subchannel_process_transport_op(subchannels[i], op); - GRPC_SUBCHANNEL_UNREF(subchannels[i], "pf_broadcast"); + grpc_subchannel_process_transport_op(subchannels[i], op, call_list); + GRPC_SUBCHANNEL_UNREF(subchannels[i], "pf_broadcast", call_list); } gpr_free(subchannels); } @@ -341,8 +335,6 @@ static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory, grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable); p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels); p->num_subchannels = args->num_subchannels; - p->workqueue = args->workqueue; - GRPC_WORKQUEUE_REF(p->workqueue, "pick_first"); grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE, "pick_first"); memcpy(p->subchannels, args->subchannels, diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index 95ab29cd88..84a033d583 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -51,15 +51,15 @@ void grpc_lb_policy_ref(grpc_lb_policy *policy) { } #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG -void grpc_lb_policy_unref(grpc_lb_policy *policy, const char *file, int line, - const char *reason) { +void grpc_lb_policy_unref(grpc_lb_policy *policy, grpc_call_list *call_list, + const char *file, int line, const char *reason) { gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY:%p unref %d -> %d %s", policy, (int)policy->refs.count, (int)policy->refs.count - 1, reason); #else -void grpc_lb_policy_unref(grpc_lb_policy *policy) { +void grpc_lb_policy_unref(grpc_lb_policy *policy, grpc_call_list *call_list) { #endif if (gpr_unref(&policy->refs)) { - policy->vtable->destroy(policy); + policy->vtable->destroy(policy, call_list); } } diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index e445d2cded..f9430b4250 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -51,7 +51,7 @@ struct grpc_lb_policy { }; struct grpc_lb_policy_vtable { - void (*destroy)(grpc_lb_policy *policy); + void (*destroy)(grpc_lb_policy *policy, grpc_call_list *call_list); void (*shutdown)(grpc_lb_policy *policy, grpc_call_list *call_list); @@ -82,17 +82,17 @@ struct grpc_lb_policy_vtable { #ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG #define GRPC_LB_POLICY_REF(p, r) \ grpc_lb_policy_ref((p), __FILE__, __LINE__, (r)) -#define GRPC_LB_POLICY_UNREF(p, r) \ - grpc_lb_policy_unref((p), __FILE__, __LINE__, (r)) +#define GRPC_LB_POLICY_UNREF(p, r, cl) \ + grpc_lb_policy_unref((p), (cl), __FILE__, __LINE__, (r)) void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line, const char *reason); -void grpc_lb_policy_unref(grpc_lb_policy *policy, const char *file, int line, - const char *reason); +void grpc_lb_policy_unref(grpc_lb_policy *policy, grpc_call_list *call_list, + const char *file, int line, const char *reason); #else #define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p)) -#define GRPC_LB_POLICY_UNREF(p, r) grpc_lb_policy_unref((p)) +#define GRPC_LB_POLICY_UNREF(p, r, cl) grpc_lb_policy_unref((p), (cl)) void grpc_lb_policy_ref(grpc_lb_policy *policy); -void grpc_lb_policy_unref(grpc_lb_policy *policy); +void grpc_lb_policy_unref(grpc_lb_policy *policy, grpc_call_list *call_list); #endif /** called by concrete implementations to initialize the base struct */ diff --git a/src/core/client_config/lb_policy_factory.h b/src/core/client_config/lb_policy_factory.h index 755faf8813..04610316ee 100644 --- a/src/core/client_config/lb_policy_factory.h +++ b/src/core/client_config/lb_policy_factory.h @@ -36,7 +36,6 @@ #include "src/core/client_config/lb_policy.h" #include "src/core/client_config/subchannel.h" -#include "src/core/iomgr/workqueue.h" typedef struct grpc_lb_policy_factory grpc_lb_policy_factory; typedef struct grpc_lb_policy_factory_vtable grpc_lb_policy_factory_vtable; @@ -50,7 +49,6 @@ struct grpc_lb_policy_factory { typedef struct grpc_lb_policy_args { grpc_subchannel **subchannels; size_t num_subchannels; - grpc_workqueue *workqueue; } grpc_lb_policy_args; struct grpc_lb_policy_factory_vtable { diff --git a/src/core/client_config/resolver.c b/src/core/client_config/resolver.c index 6a7d73c54f..eae8ec6ddf 100644 --- a/src/core/client_config/resolver.c +++ b/src/core/client_config/resolver.c @@ -40,8 +40,8 @@ void grpc_resolver_init(grpc_resolver *resolver, } #ifdef GRPC_RESOLVER_REFCOUNT_DEBUG -void grpc_resolver_ref(grpc_resolver *resolver, const char *file, int line, - const char *reason) { +void grpc_resolver_ref(grpc_resolver *resolver, grpc_call_list *call_list, + const char *file, int line, const char *reason) { gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "RESOLVER:%p ref %d -> %d %s", resolver, (int)resolver->refs.count, (int)resolver->refs.count + 1, reason); @@ -52,32 +52,34 @@ void grpc_resolver_ref(grpc_resolver *resolver) { } #ifdef GRPC_RESOLVER_REFCOUNT_DEBUG -void grpc_resolver_unref(grpc_resolver *resolver, const char *file, int line, - const char *reason) { +void grpc_resolver_unref(grpc_resolver *resolver, grpc_call_list *call_list, + const char *file, int line, const char *reason) { gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "RESOLVER:%p unref %d -> %d %s", resolver, (int)resolver->refs.count, (int)resolver->refs.count - 1, reason); #else -void grpc_resolver_unref(grpc_resolver *resolver) { +void grpc_resolver_unref(grpc_resolver *resolver, grpc_call_list *call_list) { #endif if (gpr_unref(&resolver->refs)) { - resolver->vtable->destroy(resolver); + resolver->vtable->destroy(resolver, call_list); } } -void grpc_resolver_shutdown(grpc_resolver *resolver) { - resolver->vtable->shutdown(resolver); +void grpc_resolver_shutdown(grpc_resolver *resolver, + grpc_call_list *call_list) { + resolver->vtable->shutdown(resolver, call_list); } void grpc_resolver_channel_saw_error(grpc_resolver *resolver, struct sockaddr *failing_address, - int failing_address_len) { + int failing_address_len, + grpc_call_list *call_list) { resolver->vtable->channel_saw_error(resolver, failing_address, - failing_address_len); + failing_address_len, call_list); } void grpc_resolver_next(grpc_resolver *resolver, grpc_client_config **target_config, - grpc_closure *on_complete) { - resolver->vtable->next(resolver, target_config, on_complete); + grpc_closure *on_complete, grpc_call_list *call_list) { + resolver->vtable->next(resolver, target_config, on_complete, call_list); } diff --git a/src/core/client_config/resolver.h b/src/core/client_config/resolver.h index 217d061abe..7960bcb92f 100644 --- a/src/core/client_config/resolver.h +++ b/src/core/client_config/resolver.h @@ -49,40 +49,41 @@ struct grpc_resolver { }; struct grpc_resolver_vtable { - void (*destroy)(grpc_resolver *resolver); - void (*shutdown)(grpc_resolver *resolver); + void (*destroy)(grpc_resolver *resolver, grpc_call_list *call_list); + void (*shutdown)(grpc_resolver *resolver, grpc_call_list *call_list); void (*channel_saw_error)(grpc_resolver *resolver, struct sockaddr *failing_address, - int failing_address_len); + int failing_address_len, grpc_call_list *call_list); void (*next)(grpc_resolver *resolver, grpc_client_config **target_config, - grpc_closure *on_complete); + grpc_closure *on_complete, grpc_call_list *call_list); }; #ifdef GRPC_RESOLVER_REFCOUNT_DEBUG #define GRPC_RESOLVER_REF(p, r) grpc_resolver_ref((p), __FILE__, __LINE__, (r)) -#define GRPC_RESOLVER_UNREF(p, r) \ - grpc_resolver_unref((p), __FILE__, __LINE__, (r)) +#define GRPC_RESOLVER_UNREF(p, r, cl) \ + grpc_resolver_unref((p), (cl), __FILE__, __LINE__, (r)) void grpc_resolver_ref(grpc_resolver *policy, const char *file, int line, const char *reason); -void grpc_resolver_unref(grpc_resolver *policy, const char *file, int line, - const char *reason); +void grpc_resolver_unref(grpc_resolver *policy, grpc_call_list *call_list, + const char *file, int line, const char *reason); #else #define GRPC_RESOLVER_REF(p, r) grpc_resolver_ref((p)) -#define GRPC_RESOLVER_UNREF(p, r) grpc_resolver_unref((p)) +#define GRPC_RESOLVER_UNREF(p, r, cl) grpc_resolver_unref((p), (cl)) void grpc_resolver_ref(grpc_resolver *policy); -void grpc_resolver_unref(grpc_resolver *policy); +void grpc_resolver_unref(grpc_resolver *policy, grpc_call_list *call_list); #endif void grpc_resolver_init(grpc_resolver *resolver, const grpc_resolver_vtable *vtable); -void grpc_resolver_shutdown(grpc_resolver *resolver); +void grpc_resolver_shutdown(grpc_resolver *resolver, grpc_call_list *call_list); /** Notification that the channel has seen an error on some address. Can be used as a hint that re-resolution is desirable soon. */ void grpc_resolver_channel_saw_error(grpc_resolver *resolver, struct sockaddr *failing_address, - int failing_address_len); + int failing_address_len, + grpc_call_list *call_list); /** Get the next client config. Called by the channel to fetch a new configuration. Expected to set *target_config with a new configuration, @@ -92,6 +93,6 @@ void grpc_resolver_channel_saw_error(grpc_resolver *resolver, schedule on_complete. */ void grpc_resolver_next(grpc_resolver *resolver, grpc_client_config **target_config, - grpc_closure *on_complete); + grpc_closure *on_complete, grpc_call_list *call_list); #endif /* GRPC_INTERNAL_CORE_CONFIG_RESOLVER_H */ diff --git a/src/core/client_config/resolver_factory.h b/src/core/client_config/resolver_factory.h index 24da84d89d..4c4df353f7 100644 --- a/src/core/client_config/resolver_factory.h +++ b/src/core/client_config/resolver_factory.h @@ -37,7 +37,6 @@ #include "src/core/client_config/resolver.h" #include "src/core/client_config/subchannel_factory.h" #include "src/core/client_config/uri_parser.h" -#include "src/core/iomgr/workqueue.h" typedef struct grpc_resolver_factory grpc_resolver_factory; typedef struct grpc_resolver_factory_vtable grpc_resolver_factory_vtable; @@ -51,7 +50,6 @@ struct grpc_resolver_factory { typedef struct grpc_resolver_args { grpc_uri *uri; grpc_subchannel_factory *subchannel_factory; - grpc_workqueue *workqueue; } grpc_resolver_args; struct grpc_resolver_factory_vtable { diff --git a/src/core/client_config/resolver_registry.c b/src/core/client_config/resolver_registry.c index 3611252bda..89a945c2d3 100644 --- a/src/core/client_config/resolver_registry.c +++ b/src/core/client_config/resolver_registry.c @@ -114,9 +114,8 @@ static grpc_resolver_factory *resolve_factory(const char *target, return factory; } -grpc_resolver *grpc_resolver_create(const char *target, - grpc_subchannel_factory *subchannel_factory, - grpc_workqueue *workqueue) { +grpc_resolver *grpc_resolver_create( + const char *target, grpc_subchannel_factory *subchannel_factory) { grpc_uri *uri = NULL; grpc_resolver_factory *factory = resolve_factory(target, &uri); grpc_resolver *resolver; @@ -124,7 +123,6 @@ grpc_resolver *grpc_resolver_create(const char *target, memset(&args, 0, sizeof(args)); args.uri = uri; args.subchannel_factory = subchannel_factory; - args.workqueue = workqueue; resolver = grpc_resolver_factory_create_resolver(factory, &args); grpc_uri_destroy(uri); return resolver; diff --git a/src/core/client_config/resolver_registry.h b/src/core/client_config/resolver_registry.h index 9a9a628262..5a7193b7ae 100644 --- a/src/core/client_config/resolver_registry.h +++ b/src/core/client_config/resolver_registry.h @@ -55,9 +55,8 @@ void grpc_register_resolver_type(grpc_resolver_factory *factory); If a resolver factory was found, use it to instantiate a resolver and return it. If a resolver factory was not found, return NULL. */ -grpc_resolver *grpc_resolver_create(const char *target, - grpc_subchannel_factory *subchannel_factory, - grpc_workqueue *workqueue); +grpc_resolver *grpc_resolver_create( + const char *target, grpc_subchannel_factory *subchannel_factory); /** Given a target, return a (freshly allocated with gpr_malloc) string representing the default authority to pass from a client. */ diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c index e251664e03..9e9b3d4917 100644 --- a/src/core/client_config/resolvers/dns_resolver.c +++ b/src/core/client_config/resolvers/dns_resolver.c @@ -49,8 +49,6 @@ typedef struct { grpc_resolver base; /** refcount */ gpr_refcount refs; - /** workqueue */ - grpc_workqueue *workqueue; /** name to resolve */ char *name; /** default port to use */ @@ -76,37 +74,36 @@ typedef struct { grpc_client_config *resolved_config; } dns_resolver; -static void dns_destroy(grpc_resolver *r); +static void dns_destroy(grpc_resolver *r, grpc_call_list *call_list); static void dns_start_resolving_locked(dns_resolver *r); -static grpc_closure *dns_maybe_finish_next_locked(dns_resolver *r) - GRPC_MUST_USE_RESULT; +static void dns_maybe_finish_next_locked(dns_resolver *r, + grpc_call_list *call_list); -static void dns_shutdown(grpc_resolver *r); +static void dns_shutdown(grpc_resolver *r, grpc_call_list *call_list); static void dns_channel_saw_error(grpc_resolver *r, struct sockaddr *failing_address, - int failing_address_len); + int failing_address_len, + grpc_call_list *call_list); static void dns_next(grpc_resolver *r, grpc_client_config **target_config, - grpc_closure *on_complete); + grpc_closure *on_complete, grpc_call_list *call_list); static const grpc_resolver_vtable dns_resolver_vtable = { dns_destroy, dns_shutdown, dns_channel_saw_error, dns_next}; -static void dns_shutdown(grpc_resolver *resolver) { +static void dns_shutdown(grpc_resolver *resolver, grpc_call_list *call_list) { dns_resolver *r = (dns_resolver *)resolver; - grpc_closure *next_completion; gpr_mu_lock(&r->mu); - next_completion = r->next_completion; - r->next_completion = NULL; - gpr_mu_unlock(&r->mu); - if (next_completion != NULL) { + if (r->next_completion != NULL) { *r->target_config = NULL; - next_completion->cb(next_completion->cb_arg, 1); + grpc_call_list_add(call_list, r->next_completion, 1); + r->next_completion = NULL; } + gpr_mu_unlock(&r->mu); } static void dns_channel_saw_error(grpc_resolver *resolver, struct sockaddr *sa, - int len) { + int len, grpc_call_list *call_list) { dns_resolver *r = (dns_resolver *)resolver; gpr_mu_lock(&r->mu); if (!r->resolving) { @@ -117,9 +114,8 @@ static void dns_channel_saw_error(grpc_resolver *resolver, struct sockaddr *sa, static void dns_next(grpc_resolver *resolver, grpc_client_config **target_config, - grpc_closure *on_complete) { + grpc_closure *on_complete, grpc_call_list *call_list) { dns_resolver *r = (dns_resolver *)resolver; - grpc_closure *call = NULL; gpr_mu_lock(&r->mu); GPR_ASSERT(!r->next_completion); r->next_completion = on_complete; @@ -127,21 +123,18 @@ static void dns_next(grpc_resolver *resolver, if (r->resolved_version == 0 && !r->resolving) { dns_start_resolving_locked(r); } else { - call = dns_maybe_finish_next_locked(r); + dns_maybe_finish_next_locked(r, call_list); } gpr_mu_unlock(&r->mu); - if (call) { - call->cb(call->cb_arg, 1); - } } -static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) { +static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses, + grpc_call_list *call_list) { dns_resolver *r = arg; grpc_client_config *config = NULL; grpc_subchannel **subchannels; grpc_subchannel_args args; grpc_lb_policy *lb_policy; - grpc_closure *call; size_t i; if (addresses) { grpc_lb_policy_args lb_policy_args; @@ -157,10 +150,9 @@ static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) { memset(&lb_policy_args, 0, sizeof(lb_policy_args)); lb_policy_args.subchannels = subchannels; lb_policy_args.num_subchannels = addresses->naddrs; - lb_policy_args.workqueue = r->workqueue; lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args); grpc_client_config_set_lb_policy(config, lb_policy); - GRPC_LB_POLICY_UNREF(lb_policy, "construction"); + GRPC_LB_POLICY_UNREF(lb_policy, "construction", call_list); grpc_resolved_addresses_destroy(addresses); gpr_free(subchannels); } @@ -168,17 +160,14 @@ static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) { GPR_ASSERT(r->resolving); r->resolving = 0; if (r->resolved_config) { - grpc_client_config_unref(r->resolved_config); + grpc_client_config_unref(r->resolved_config, call_list); } r->resolved_config = config; r->resolved_version++; - call = dns_maybe_finish_next_locked(r); + dns_maybe_finish_next_locked(r, call_list); gpr_mu_unlock(&r->mu); - if (call) { - call->cb(call->cb_arg, 1); - } - GRPC_RESOLVER_UNREF(&r->base, "dns-resolving"); + GRPC_RESOLVER_UNREF(&r->base, "dns-resolving", call_list); } static void dns_start_resolving_locked(dns_resolver *r) { @@ -188,29 +177,27 @@ static void dns_start_resolving_locked(dns_resolver *r) { grpc_resolve_address(r->name, r->default_port, dns_on_resolved, r); } -static grpc_closure *dns_maybe_finish_next_locked(dns_resolver *r) { - grpc_closure *ret = NULL; +static void dns_maybe_finish_next_locked(dns_resolver *r, + grpc_call_list *call_list) { if (r->next_completion != NULL && r->resolved_version != r->published_version) { *r->target_config = r->resolved_config; if (r->resolved_config) { grpc_client_config_ref(r->resolved_config); } - ret = r->next_completion; + grpc_call_list_add(call_list, r->next_completion, 1); r->next_completion = NULL; r->published_version = r->resolved_version; } - return ret; } -static void dns_destroy(grpc_resolver *gr) { +static void dns_destroy(grpc_resolver *gr, grpc_call_list *call_list) { dns_resolver *r = (dns_resolver *)gr; gpr_mu_destroy(&r->mu); if (r->resolved_config) { - grpc_client_config_unref(r->resolved_config); + grpc_client_config_unref(r->resolved_config, call_list); } grpc_subchannel_factory_unref(r->subchannel_factory); - GRPC_WORKQUEUE_UNREF(r->workqueue, "dns"); gpr_free(r->name); gpr_free(r->default_port); gpr_free(r->lb_policy_name); @@ -239,8 +226,6 @@ static grpc_resolver *dns_create(grpc_resolver_args *args, r->default_port = gpr_strdup(default_port); r->subchannel_factory = args->subchannel_factory; grpc_subchannel_factory_ref(r->subchannel_factory); - r->workqueue = args->workqueue; - GRPC_WORKQUEUE_REF(r->workqueue, "dns"); r->lb_policy_name = gpr_strdup(lb_policy_name); return &r->base; } diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c index 47fe5fad4a..15eb60b93a 100644 --- a/src/core/client_config/resolvers/sockaddr_resolver.c +++ b/src/core/client_config/resolvers/sockaddr_resolver.c @@ -56,8 +56,6 @@ typedef struct { gpr_refcount refs; /** subchannel factory */ grpc_subchannel_factory *subchannel_factory; - /** workqueue */ - grpc_workqueue *workqueue; /** load balancing policy name */ char *lb_policy_name; @@ -78,61 +76,59 @@ typedef struct { grpc_client_config **target_config; } sockaddr_resolver; -static void sockaddr_destroy(grpc_resolver *r); +static void sockaddr_destroy(grpc_resolver *r, grpc_call_list *call_list); -static grpc_closure *sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) - GRPC_MUST_USE_RESULT; +static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r, + grpc_call_list *call_list); -static void sockaddr_shutdown(grpc_resolver *r); +static void sockaddr_shutdown(grpc_resolver *r, grpc_call_list *call_list); static void sockaddr_channel_saw_error(grpc_resolver *r, struct sockaddr *failing_address, - int failing_address_len); + int failing_address_len, + grpc_call_list *call_list); static void sockaddr_next(grpc_resolver *r, grpc_client_config **target_config, - grpc_closure *on_complete); + grpc_closure *on_complete, grpc_call_list *call_list); static const grpc_resolver_vtable sockaddr_resolver_vtable = { sockaddr_destroy, sockaddr_shutdown, sockaddr_channel_saw_error, sockaddr_next}; -static void sockaddr_shutdown(grpc_resolver *resolver) { +static void sockaddr_shutdown(grpc_resolver *resolver, + grpc_call_list *call_list) { sockaddr_resolver *r = (sockaddr_resolver *)resolver; - grpc_closure *call = NULL; gpr_mu_lock(&r->mu); if (r->next_completion != NULL) { *r->target_config = NULL; - call = r->next_completion; + grpc_call_list_add(call_list, r->next_completion, 1); r->next_completion = NULL; } gpr_mu_unlock(&r->mu); - if (call) { - call->cb(call->cb_arg, 1); - } } static void sockaddr_channel_saw_error(grpc_resolver *resolver, - struct sockaddr *sa, int len) {} + struct sockaddr *sa, int len, + grpc_call_list *call_list) {} static void sockaddr_next(grpc_resolver *resolver, grpc_client_config **target_config, - grpc_closure *on_complete) { + grpc_closure *on_complete, + grpc_call_list *call_list) { sockaddr_resolver *r = (sockaddr_resolver *)resolver; - grpc_closure *call = NULL; gpr_mu_lock(&r->mu); GPR_ASSERT(!r->next_completion); r->next_completion = on_complete; r->target_config = target_config; - call = sockaddr_maybe_finish_next_locked(r); + sockaddr_maybe_finish_next_locked(r, call_list); gpr_mu_unlock(&r->mu); - if (call) call->cb(call->cb_arg, 1); } -static grpc_closure *sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) { +static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r, + grpc_call_list *call_list) { grpc_client_config *cfg; grpc_lb_policy *lb_policy; grpc_lb_policy_args lb_policy_args; grpc_subchannel **subchannels; grpc_subchannel_args args; - grpc_closure *call = NULL; if (r->next_completion != NULL && !r->published) { size_t i; @@ -148,26 +144,22 @@ static grpc_closure *sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) { memset(&lb_policy_args, 0, sizeof(lb_policy_args)); lb_policy_args.subchannels = subchannels; lb_policy_args.num_subchannels = r->num_addrs; - lb_policy_args.workqueue = r->workqueue; lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args); gpr_free(subchannels); grpc_client_config_set_lb_policy(cfg, lb_policy); - GRPC_LB_POLICY_UNREF(lb_policy, "unix"); + GRPC_LB_POLICY_UNREF(lb_policy, "sockaddr", call_list); r->published = 1; *r->target_config = cfg; - call = r->next_completion; + grpc_call_list_add(call_list, r->next_completion, 1); r->next_completion = NULL; } - - return call; } -static void sockaddr_destroy(grpc_resolver *gr) { +static void sockaddr_destroy(grpc_resolver *gr, grpc_call_list *call_list) { sockaddr_resolver *r = (sockaddr_resolver *)gr; gpr_mu_destroy(&r->mu); grpc_subchannel_factory_unref(r->subchannel_factory); - GRPC_WORKQUEUE_UNREF(r->workqueue, "sockaddr"); gpr_free(r->addrs); gpr_free(r->addrs_len); gpr_free(r->lb_policy_name); @@ -340,8 +332,6 @@ static grpc_resolver *sockaddr_create( grpc_resolver_init(&r->base, &sockaddr_resolver_vtable); r->subchannel_factory = args->subchannel_factory; grpc_subchannel_factory_ref(r->subchannel_factory); - r->workqueue = args->workqueue; - GRPC_WORKQUEUE_REF(r->workqueue, "sockaddr"); r->lb_policy_name = gpr_strdup(lb_policy_name); return &r->base; diff --git a/src/core/client_config/resolvers/zookeeper_resolver.c b/src/core/client_config/resolvers/zookeeper_resolver.c index 5632f0425e..c785e76939 100644 --- a/src/core/client_config/resolvers/zookeeper_resolver.c +++ b/src/core/client_config/resolvers/zookeeper_resolver.c @@ -61,8 +61,6 @@ typedef struct { grpc_subchannel_factory *subchannel_factory; /** load balancing policy name */ char *lb_policy_name; - /** work queue */ - grpc_workqueue *workqueue; /** mutex guarding the rest of the state */ gpr_mu mu; @@ -436,7 +434,6 @@ static void zookeeper_destroy(grpc_resolver *gr) { grpc_client_config_unref(r->resolved_config); } grpc_subchannel_factory_unref(r->subchannel_factory); - grpc_workqueue_unref(r->workqueue); gpr_free(r->name); gpr_free(r->lb_policy_name); gpr_free(r); @@ -466,9 +463,6 @@ static grpc_resolver *zookeeper_create(grpc_resolver_args *args, grpc_resolver_init(&r->base, &zookeeper_resolver_vtable); r->name = gpr_strdup(path); - r->workqueue = args->workqueue; - grpc_workqueue_ref(r->workqueue); - r->subchannel_factory = args->subchannel_factory; grpc_subchannel_factory_ref(r->subchannel_factory); diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 99310e02e5..937b3cd71c 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -76,7 +76,6 @@ typedef struct waiting_for_connect { struct grpc_subchannel { grpc_connector *connector; - grpc_workqueue *workqueue; /** non-transport related channel filters */ const grpc_channel_filter **filters; @@ -150,7 +149,8 @@ static void connectivity_state_changed_locked(grpc_subchannel *c, grpc_call_list *call_list); static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c); static gpr_timespec compute_connect_deadline(grpc_subchannel *c); -static void subchannel_connected(void *subchannel, int iomgr_success); +static void subchannel_connected(void *subchannel, int iomgr_success, + grpc_call_list *call_list); static void subchannel_ref_locked( grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS); @@ -158,8 +158,9 @@ static int subchannel_unref_locked( grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT; static void connection_ref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS); static grpc_subchannel *connection_unref_locked( - connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT; -static void subchannel_destroy(grpc_subchannel *c); + connection *c, grpc_call_list *call_list GRPC_SUBCHANNEL_REF_EXTRA_ARGS) + GRPC_MUST_USE_RESULT; +static void subchannel_destroy(grpc_subchannel *c, grpc_call_list *call_list); #ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG #define SUBCHANNEL_REF_LOCKED(p, r) \ @@ -168,8 +169,8 @@ static void subchannel_destroy(grpc_subchannel *c); subchannel_unref_locked((p), __FILE__, __LINE__, (r)) #define CONNECTION_REF_LOCKED(p, r) \ connection_ref_locked((p), __FILE__, __LINE__, (r)) -#define CONNECTION_UNREF_LOCKED(p, r) \ - connection_unref_locked((p), __FILE__, __LINE__, (r)) +#define CONNECTION_UNREF_LOCKED(p, r, cl) \ + connection_unref_locked((p), (cl), __FILE__, __LINE__, (r)) #define REF_PASS_ARGS , file, line, reason #define REF_LOG(name, p) \ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", \ @@ -181,7 +182,7 @@ static void subchannel_destroy(grpc_subchannel *c); #define SUBCHANNEL_REF_LOCKED(p, r) subchannel_ref_locked((p)) #define SUBCHANNEL_UNREF_LOCKED(p, r) subchannel_unref_locked((p)) #define CONNECTION_REF_LOCKED(p, r) connection_ref_locked((p)) -#define CONNECTION_UNREF_LOCKED(p, r) connection_unref_locked((p)) +#define CONNECTION_UNREF_LOCKED(p, r, cl) connection_unref_locked((p), (cl)) #define REF_PASS_ARGS #define REF_LOG(name, p) \ do { \ @@ -195,9 +196,9 @@ static void subchannel_destroy(grpc_subchannel *c); * connection implementation */ -static void connection_destroy(connection *c) { +static void connection_destroy(connection *c, grpc_call_list *call_list) { GPR_ASSERT(c->refs == 0); - grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c)); + grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c), call_list); gpr_free(c); } @@ -209,14 +210,14 @@ static void connection_ref_locked( } static grpc_subchannel *connection_unref_locked( - connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { + connection *c, grpc_call_list *call_list GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { grpc_subchannel *destroy = NULL; UNREF_LOG("CONNECTION", c); if (subchannel_unref_locked(c->subchannel REF_PASS_ARGS)) { destroy = c->subchannel; } if (--c->refs == 0 && c->subchannel->active != c) { - connection_destroy(c); + connection_destroy(c, call_list); } return destroy; } @@ -243,17 +244,19 @@ void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { gpr_mu_unlock(&c->mu); } -void grpc_subchannel_unref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +void grpc_subchannel_unref(grpc_subchannel *c, + grpc_call_list *call_list + GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { int destroy; gpr_mu_lock(&c->mu); destroy = subchannel_unref_locked(c REF_PASS_ARGS); gpr_mu_unlock(&c->mu); - if (destroy) subchannel_destroy(c); + if (destroy) subchannel_destroy(c, call_list); } -static void subchannel_destroy(grpc_subchannel *c) { +static void subchannel_destroy(grpc_subchannel *c, grpc_call_list *call_list) { if (c->active != NULL) { - connection_destroy(c->active); + connection_destroy(c->active, call_list); } gpr_free(c->filters); grpc_channel_args_destroy(c->args); @@ -261,18 +264,19 @@ static void subchannel_destroy(grpc_subchannel *c) { grpc_mdctx_unref(c->mdctx); grpc_connectivity_state_destroy(&c->state_tracker); grpc_connector_unref(c->connector); - GRPC_WORKQUEUE_UNREF(c->workqueue, "subchannel"); gpr_free(c); } void grpc_subchannel_add_interested_party(grpc_subchannel *c, - grpc_pollset *pollset) { - grpc_pollset_set_add_pollset(c->pollset_set, pollset); + grpc_pollset *pollset, + grpc_call_list *call_list) { + grpc_pollset_set_add_pollset(c->pollset_set, pollset, call_list); } void grpc_subchannel_del_interested_party(grpc_subchannel *c, - grpc_pollset *pollset) { - grpc_pollset_set_del_pollset(c->pollset_set, pollset); + grpc_pollset *pollset, + grpc_call_list *call_list) { + grpc_pollset_set_del_pollset(c->pollset_set, pollset, call_list); } static gpr_uint32 random_seed() { @@ -298,8 +302,6 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, c->args = grpc_channel_args_copy(args->args); c->mdctx = args->mdctx; c->master = args->master; - c->workqueue = grpc_channel_get_workqueue(c->master); - GRPC_WORKQUEUE_REF(c->workqueue, "subchannel"); c->pollset_set = grpc_client_channel_get_connecting_pollset_set(parent_elem); c->random = random_seed(); grpc_mdctx_ref(c->mdctx); @@ -310,7 +312,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, return c; } -static void continue_connect(grpc_subchannel *c) { +static void continue_connect(grpc_subchannel *c, grpc_call_list *call_list) { grpc_connect_in_args args; args.interested_parties = c->pollset_set; @@ -321,24 +323,25 @@ static void continue_connect(grpc_subchannel *c) { args.metadata_context = c->mdctx; grpc_connector_connect(c->connector, &args, &c->connecting_result, - &c->connected); + &c->connected, call_list); } -static void start_connect(grpc_subchannel *c) { +static void start_connect(grpc_subchannel *c, grpc_call_list *call_list) { c->backoff_delta = gpr_time_from_seconds( GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS, GPR_TIMESPAN); c->next_attempt = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta); - continue_connect(c); + continue_connect(c, call_list); } -static void continue_creating_call(void *arg, int iomgr_success) { +static void continue_creating_call(void *arg, int iomgr_success, + grpc_call_list *call_list) { waiting_for_connect *w4c = arg; - grpc_call_list call_list = GRPC_CALL_LIST_INIT; - grpc_subchannel_del_interested_party(w4c->subchannel, w4c->pollset); + grpc_subchannel_del_interested_party(w4c->subchannel, w4c->pollset, + call_list); grpc_subchannel_create_call(w4c->subchannel, w4c->pollset, w4c->target, - w4c->notify, &call_list); - GRPC_SUBCHANNEL_UNREF(w4c->subchannel, "waiting_for_connect"); + w4c->notify, call_list); + GRPC_SUBCHANNEL_UNREF(w4c->subchannel, "waiting_for_connect", call_list); gpr_free(w4c); } @@ -354,7 +357,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset, gpr_mu_unlock(&c->mu); *target = create_call(con); - notify->cb(notify->cb_arg, 1); + notify->cb(notify->cb_arg, 1, call_list); } else { waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c)); w4c->next = c->waiting; @@ -366,7 +369,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset, SUBCHANNEL_REF_LOCKED(c, "waiting_for_connect"); grpc_closure_init(&w4c->continuation, continue_creating_call, w4c); c->waiting = w4c; - grpc_subchannel_add_interested_party(c, pollset); + grpc_subchannel_add_interested_party(c, pollset, call_list); if (!c->connecting) { c->connecting = 1; connectivity_state_changed_locked(c, "create_call", call_list); @@ -375,7 +378,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset, GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting"); gpr_mu_unlock(&c->mu); - start_connect(c); + start_connect(c, call_list); } else { gpr_mu_unlock(&c->mu); } @@ -408,16 +411,16 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *c, gpr_mu_unlock(&c->mu); if (do_connect) { - start_connect(c); + start_connect(c, call_list); } } void grpc_subchannel_process_transport_op(grpc_subchannel *c, - grpc_transport_op *op) { + grpc_transport_op *op, + grpc_call_list *call_list) { connection *con = NULL; grpc_subchannel *destroy; int cancel_alarm = 0; - grpc_call_list call_list = GRPC_CALL_LIST_INIT; gpr_mu_lock(&c->mu); if (c->active != NULL) { con = c->active; @@ -425,7 +428,7 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c, } if (op->disconnect) { c->disconnected = 1; - connectivity_state_changed_locked(c, "disconnect", &call_list); + connectivity_state_changed_locked(c, "disconnect", call_list); if (c->have_alarm) { cancel_alarm = 1; } @@ -436,28 +439,27 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c, grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con); grpc_channel_element *top_elem = grpc_channel_stack_element(channel_stack, 0); - top_elem->filter->start_transport_op(top_elem, op); + top_elem->filter->start_transport_op(top_elem, op, call_list); gpr_mu_lock(&c->mu); - destroy = CONNECTION_UNREF_LOCKED(con, "transport-op"); + destroy = CONNECTION_UNREF_LOCKED(con, "transport-op", call_list); gpr_mu_unlock(&c->mu); if (destroy) { - subchannel_destroy(destroy); + subchannel_destroy(destroy, call_list); } } if (cancel_alarm) { - grpc_alarm_cancel(&c->alarm); + grpc_alarm_cancel(&c->alarm, call_list); } if (op->disconnect) { - grpc_connector_shutdown(c->connector); + grpc_connector_shutdown(c->connector, call_list); } - - grpc_call_list_run(call_list); } -static void on_state_changed(void *p, int iomgr_success) { +static void on_state_changed(void *p, int iomgr_success, + grpc_call_list *call_list) { state_watcher *sw = p; grpc_subchannel *c = sw->subchannel; gpr_mu *mu = &c->mu; @@ -465,7 +467,6 @@ static void on_state_changed(void *p, int iomgr_success) { grpc_transport_op op; grpc_channel_element *elem; connection *destroy_connection = NULL; - grpc_call_list call_list = GRPC_CALL_LIST_INIT; gpr_mu_lock(mu); @@ -485,7 +486,7 @@ static void on_state_changed(void *p, int iomgr_success) { op.on_connectivity_state_change = &sw->closure; elem = grpc_channel_stack_element( CHANNEL_STACK_FROM_CONNECTION(c->active), 0); - elem->filter->start_transport_op(elem, &op); + elem->filter->start_transport_op(elem, &op, call_list); /* early out */ gpr_mu_unlock(mu); return; @@ -499,22 +500,21 @@ static void on_state_changed(void *p, int iomgr_success) { grpc_connectivity_state_set( &c->state_tracker, c->disconnected ? GRPC_CHANNEL_FATAL_FAILURE : GRPC_CHANNEL_TRANSIENT_FAILURE, - "connection_failed", &call_list); + "connection_failed", call_list); break; } done: - connectivity_state_changed_locked(c, "transport_state_changed", &call_list); + connectivity_state_changed_locked(c, "transport_state_changed", call_list); destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher"); gpr_free(sw); gpr_mu_unlock(mu); if (destroy) { - subchannel_destroy(c); + subchannel_destroy(c, call_list); } if (destroy_connection != NULL) { - connection_destroy(destroy_connection); + connection_destroy(destroy_connection, call_list); } - grpc_call_list_run(call_list); } static void publish_transport(grpc_subchannel *c, grpc_call_list *call_list) { @@ -544,7 +544,7 @@ static void publish_transport(grpc_subchannel *c, grpc_call_list *call_list) { con->refs = 0; con->subchannel = c; grpc_channel_stack_init(filters, num_filters, c->master, c->args, c->mdctx, - stk); + stk, call_list); grpc_connected_channel_bind_transport(stk, c->connecting_result.transport); gpr_free(c->connecting_result.filters); memset(&c->connecting_result, 0, sizeof(c->connecting_result)); @@ -561,9 +561,9 @@ static void publish_transport(grpc_subchannel *c, grpc_call_list *call_list) { gpr_mu_unlock(&c->mu); gpr_free(sw); gpr_free(filters); - grpc_channel_stack_destroy(stk); + grpc_channel_stack_destroy(stk, call_list); GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting"); - GRPC_SUBCHANNEL_UNREF(c, "connecting"); + GRPC_SUBCHANNEL_UNREF(c, "connecting", call_list); return; } @@ -587,7 +587,7 @@ static void publish_transport(grpc_subchannel *c, grpc_call_list *call_list) { GPR_ASSERT(!SUBCHANNEL_UNREF_LOCKED(c, "connecting")); elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0); - elem->filter->start_transport_op(elem, &op); + elem->filter->start_transport_op(elem, &op, call_list); /* signal completion */ connectivity_state_changed_locked(c, "connected", call_list); @@ -605,7 +605,7 @@ static void publish_transport(grpc_subchannel *c, grpc_call_list *call_list) { gpr_free(filters); if (destroy_connection != NULL) { - connection_destroy(destroy_connection); + connection_destroy(destroy_connection, call_list); } } @@ -638,41 +638,38 @@ static void update_reconnect_parameters(grpc_subchannel *c) { gpr_time_add(c->next_attempt, gpr_time_from_millis(jitter, GPR_TIMESPAN)); } -static void on_alarm(void *arg, int iomgr_success) { +static void on_alarm(void *arg, int iomgr_success, grpc_call_list *call_list) { grpc_subchannel *c = arg; - grpc_call_list call_list = GRPC_CALL_LIST_INIT; gpr_mu_lock(&c->mu); c->have_alarm = 0; if (c->disconnected) { iomgr_success = 0; } - connectivity_state_changed_locked(c, "alarm", &call_list); + connectivity_state_changed_locked(c, "alarm", call_list); gpr_mu_unlock(&c->mu); if (iomgr_success) { update_reconnect_parameters(c); - continue_connect(c); + continue_connect(c, call_list); } else { GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting"); - GRPC_SUBCHANNEL_UNREF(c, "connecting"); + GRPC_SUBCHANNEL_UNREF(c, "connecting", call_list); } - grpc_call_list_run(call_list); } -static void subchannel_connected(void *arg, int iomgr_success) { +static void subchannel_connected(void *arg, int iomgr_success, + grpc_call_list *call_list) { grpc_subchannel *c = arg; - grpc_call_list call_list = GRPC_CALL_LIST_INIT; if (c->connecting_result.transport != NULL) { - publish_transport(c, &call_list); + publish_transport(c, call_list); } else { gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_mu_lock(&c->mu); GPR_ASSERT(!c->have_alarm); c->have_alarm = 1; - connectivity_state_changed_locked(c, "connect_failed", &call_list); - grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, now); + connectivity_state_changed_locked(c, "connect_failed", call_list); + grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, now, call_list); gpr_mu_unlock(&c->mu); } - grpc_call_list_run(call_list); } static gpr_timespec compute_connect_deadline(grpc_subchannel *c) { @@ -718,33 +715,36 @@ void grpc_subchannel_call_ref( gpr_ref(&c->refs); } -void grpc_subchannel_call_unref( - grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { +void grpc_subchannel_call_unref(grpc_subchannel_call *c, + grpc_call_list *call_list + GRPC_SUBCHANNEL_REF_EXTRA_ARGS) { if (gpr_unref(&c->refs)) { gpr_mu *mu = &c->connection->subchannel->mu; grpc_subchannel *destroy; - grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c)); + grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), call_list); gpr_mu_lock(mu); - destroy = CONNECTION_UNREF_LOCKED(c->connection, "call"); + destroy = CONNECTION_UNREF_LOCKED(c->connection, "call", call_list); gpr_mu_unlock(mu); gpr_free(c); if (destroy != NULL) { - subchannel_destroy(destroy); + subchannel_destroy(destroy, call_list); } } } -char *grpc_subchannel_call_get_peer(grpc_subchannel_call *call) { +char *grpc_subchannel_call_get_peer(grpc_subchannel_call *call, + grpc_call_list *call_list) { grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call); grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0); - return top_elem->filter->get_peer(top_elem); + return top_elem->filter->get_peer(top_elem, call_list); } void grpc_subchannel_call_process_op(grpc_subchannel_call *call, - grpc_transport_stream_op *op) { + grpc_transport_stream_op *op, + grpc_call_list *call_list) { grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call); grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0); - top_elem->filter->start_transport_stream_op(top_elem, op); + top_elem->filter->start_transport_stream_op(top_elem, op, call_list); } grpc_subchannel_call *create_call(connection *con) { diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 189e531d35..0ebfc7c29a 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -47,30 +47,33 @@ typedef struct grpc_subchannel_args grpc_subchannel_args; #ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG #define GRPC_SUBCHANNEL_REF(p, r) \ grpc_subchannel_ref((p), __FILE__, __LINE__, (r)) -#define GRPC_SUBCHANNEL_UNREF(p, r) \ - grpc_subchannel_unref((p), __FILE__, __LINE__, (r)) +#define GRPC_SUBCHANNEL_UNREF(p, r, cl) \ + grpc_subchannel_unref((p), (cl), __FILE__, __LINE__, (r)) #define GRPC_SUBCHANNEL_CALL_REF(p, r) \ grpc_subchannel_call_ref((p), __FILE__, __LINE__, (r)) -#define GRPC_SUBCHANNEL_CALL_UNREF(p, r) \ - grpc_subchannel_call_unref((p), __FILE__, __LINE__, (r)) +#define GRPC_SUBCHANNEL_CALL_UNREF(p, r, cl) \ + grpc_subchannel_call_unref((p), (cl), __FILE__, __LINE__, (r)) #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS \ , const char *file, int line, const char *reason #else #define GRPC_SUBCHANNEL_REF(p, r) grpc_subchannel_ref((p)) -#define GRPC_SUBCHANNEL_UNREF(p, r) grpc_subchannel_unref((p)) +#define GRPC_SUBCHANNEL_UNREF(p, r, cl) grpc_subchannel_unref((p), (cl)) #define GRPC_SUBCHANNEL_CALL_REF(p, r) grpc_subchannel_call_ref((p)) -#define GRPC_SUBCHANNEL_CALL_UNREF(p, r) grpc_subchannel_call_unref((p)) +#define GRPC_SUBCHANNEL_CALL_UNREF(p, r, cl) \ + grpc_subchannel_call_unref((p), (cl)) #define GRPC_SUBCHANNEL_REF_EXTRA_ARGS #endif void grpc_subchannel_ref( grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -void grpc_subchannel_unref( - grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +void grpc_subchannel_unref(grpc_subchannel *channel, + grpc_call_list *call_list + GRPC_SUBCHANNEL_REF_EXTRA_ARGS); void grpc_subchannel_call_ref( grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); -void grpc_subchannel_call_unref( - grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS); +void grpc_subchannel_call_unref(grpc_subchannel_call *call, + grpc_call_list *call_list + GRPC_SUBCHANNEL_REF_EXTRA_ARGS); /** construct a call (possibly asynchronously) */ void grpc_subchannel_create_call(grpc_subchannel *subchannel, @@ -81,7 +84,8 @@ void grpc_subchannel_create_call(grpc_subchannel *subchannel, /** process a transport level op */ void grpc_subchannel_process_transport_op(grpc_subchannel *subchannel, - grpc_transport_op *op); + grpc_transport_op *op, + grpc_call_list *call_list); /** poll the current connectivity state of a channel */ grpc_connectivity_state grpc_subchannel_check_connectivity( @@ -96,17 +100,21 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *channel, /** express interest in \a channel's activities through \a pollset. */ void grpc_subchannel_add_interested_party(grpc_subchannel *channel, - grpc_pollset *pollset); + grpc_pollset *pollset, + grpc_call_list *call_list); /** stop following \a channel's activity through \a pollset. */ void grpc_subchannel_del_interested_party(grpc_subchannel *channel, - grpc_pollset *pollset); + grpc_pollset *pollset, + grpc_call_list *call_list); /** continue processing a transport op */ void grpc_subchannel_call_process_op(grpc_subchannel_call *subchannel_call, - grpc_transport_stream_op *op); + grpc_transport_stream_op *op, + grpc_call_list *call_list); /** continue querying for peer */ -char *grpc_subchannel_call_get_peer(grpc_subchannel_call *subchannel_call); +char *grpc_subchannel_call_get_peer(grpc_subchannel_call *subchannel_call, + grpc_call_list *call_list); struct grpc_subchannel_args { /** Channel filters for this channel - wrapped factories will likely diff --git a/src/core/client_config/subchannel_factory.h b/src/core/client_config/subchannel_factory.h index d7eae1c964..b588580edb 100644 --- a/src/core/client_config/subchannel_factory.h +++ b/src/core/client_config/subchannel_factory.h @@ -50,7 +50,8 @@ struct grpc_subchannel_factory_vtable { void (*ref)(grpc_subchannel_factory *factory); void (*unref)(grpc_subchannel_factory *factory); grpc_subchannel *(*create_subchannel)(grpc_subchannel_factory *factory, - grpc_subchannel_args *args); + grpc_subchannel_args *args, + grpc_call_list *call_list); }; void grpc_subchannel_factory_ref(grpc_subchannel_factory *factory); @@ -58,6 +59,7 @@ void grpc_subchannel_factory_unref(grpc_subchannel_factory *factory); /** Create a new grpc_subchannel */ grpc_subchannel *grpc_subchannel_factory_create_subchannel( - grpc_subchannel_factory *factory, grpc_subchannel_args *args); + grpc_subchannel_factory *factory, grpc_subchannel_args *args, + grpc_call_list *call_list); #endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H */ |