aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/client_config
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-09-18 17:29:00 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-09-18 17:29:00 -0700
commitd1bec03fa148344b8eac2b59517252d86e4ca858 (patch)
treef359e48f9151ab7ceff72cd624ad6c7a59e4d304 /src/core/client_config
parent33825118df7157219cec15382beb006d3462ad96 (diff)
Call list progress
Diffstat (limited to 'src/core/client_config')
-rw-r--r--src/core/client_config/client_config.c9
-rw-r--r--src/core/client_config/client_config.h3
-rw-r--r--src/core/client_config/connector.c9
-rw-r--r--src/core/client_config/connector.h18
-rw-r--r--src/core/client_config/lb_policies/pick_first.c86
-rw-r--r--src/core/client_config/lb_policy.c8
-rw-r--r--src/core/client_config/lb_policy.h14
-rw-r--r--src/core/client_config/lb_policy_factory.h2
-rw-r--r--src/core/client_config/resolver.c26
-rw-r--r--src/core/client_config/resolver.h27
-rw-r--r--src/core/client_config/resolver_factory.h2
-rw-r--r--src/core/client_config/resolver_registry.c6
-rw-r--r--src/core/client_config/resolver_registry.h5
-rw-r--r--src/core/client_config/resolvers/dns_resolver.c67
-rw-r--r--src/core/client_config/resolvers/sockaddr_resolver.c50
-rw-r--r--src/core/client_config/resolvers/zookeeper_resolver.c6
-rw-r--r--src/core/client_config/subchannel.c158
-rw-r--r--src/core/client_config/subchannel.h38
-rw-r--r--src/core/client_config/subchannel_factory.h6
19 files changed, 253 insertions, 287 deletions
diff --git a/src/core/client_config/client_config.c b/src/core/client_config/client_config.c
index 4453824148..0780880473 100644
--- a/src/core/client_config/client_config.c
+++ b/src/core/client_config/client_config.c
@@ -51,21 +51,20 @@ grpc_client_config *grpc_client_config_create() {
void grpc_client_config_ref(grpc_client_config *c) { gpr_ref(&c->refs); }
-void grpc_client_config_unref(grpc_client_config *c) {
+void grpc_client_config_unref(grpc_client_config *c,
+ grpc_call_list *call_list) {
if (gpr_unref(&c->refs)) {
- GRPC_LB_POLICY_UNREF(c->lb_policy, "client_config");
+ GRPC_LB_POLICY_UNREF(c->lb_policy, "client_config", call_list);
gpr_free(c);
}
}
void grpc_client_config_set_lb_policy(grpc_client_config *c,
grpc_lb_policy *lb_policy) {
+ GPR_ASSERT(c->lb_policy == NULL);
if (lb_policy) {
GRPC_LB_POLICY_REF(lb_policy, "client_config");
}
- if (c->lb_policy) {
- GRPC_LB_POLICY_UNREF(c->lb_policy, "client_config");
- }
c->lb_policy = lb_policy;
}
diff --git a/src/core/client_config/client_config.h b/src/core/client_config/client_config.h
index 47612da42c..76a5c66594 100644
--- a/src/core/client_config/client_config.h
+++ b/src/core/client_config/client_config.h
@@ -42,7 +42,8 @@ typedef struct grpc_client_config grpc_client_config;
grpc_client_config *grpc_client_config_create();
void grpc_client_config_ref(grpc_client_config *client_config);
-void grpc_client_config_unref(grpc_client_config *client_config);
+void grpc_client_config_unref(grpc_client_config *client_config,
+ grpc_call_list *call_list);
void grpc_client_config_set_lb_policy(grpc_client_config *client_config,
grpc_lb_policy *lb_policy);
diff --git a/src/core/client_config/connector.c b/src/core/client_config/connector.c
index 6252d57271..31f0b84efe 100644
--- a/src/core/client_config/connector.c
+++ b/src/core/client_config/connector.c
@@ -44,10 +44,11 @@ void grpc_connector_unref(grpc_connector *connector) {
void grpc_connector_connect(grpc_connector *connector,
const grpc_connect_in_args *in_args,
grpc_connect_out_args *out_args,
- grpc_closure *notify) {
- connector->vtable->connect(connector, in_args, out_args, notify);
+ grpc_closure *notify, grpc_call_list *call_list) {
+ connector->vtable->connect(connector, in_args, out_args, notify, call_list);
}
-void grpc_connector_shutdown(grpc_connector *connector) {
- connector->vtable->shutdown(connector);
+void grpc_connector_shutdown(grpc_connector *connector,
+ grpc_call_list *call_list) {
+ connector->vtable->shutdown(connector, call_list);
}
diff --git a/src/core/client_config/connector.h b/src/core/client_config/connector.h
index fc1f6708de..388f656c44 100644
--- a/src/core/client_config/connector.h
+++ b/src/core/client_config/connector.h
@@ -55,10 +55,6 @@ typedef struct {
gpr_timespec deadline;
/** channel arguments (to be passed to transport) */
const grpc_channel_args *channel_args;
- /** metadata context */
- grpc_mdctx *metadata_context;
- /** workqueue */
- grpc_workqueue *workqueue;
} grpc_connect_in_args;
typedef struct {
@@ -71,23 +67,25 @@ typedef struct {
struct grpc_connector_vtable {
void (*ref)(grpc_connector *connector);
- void (*unref)(grpc_connector *connector);
+ void (*unref)(grpc_connector *connector, grpc_call_list *call_list);
/** Implementation of grpc_connector_shutdown */
- void (*shutdown)(grpc_connector *connector);
+ void (*shutdown)(grpc_connector *connector, grpc_call_list *call_list);
/** Implementation of grpc_connector_connect */
void (*connect)(grpc_connector *connector,
const grpc_connect_in_args *in_args,
- grpc_connect_out_args *out_args, grpc_closure *notify);
+ grpc_connect_out_args *out_args, grpc_closure *notify,
+ grpc_call_list *call_list);
};
void grpc_connector_ref(grpc_connector *connector);
-void grpc_connector_unref(grpc_connector *connector);
+void grpc_connector_unref(grpc_connector *connector, grpc_call_list *call_list);
/** Connect using the connector: max one outstanding call at a time */
void grpc_connector_connect(grpc_connector *connector,
const grpc_connect_in_args *in_args,
grpc_connect_out_args *out_args,
- grpc_closure *notify);
+ grpc_closure *notify, grpc_call_list *call_list);
/** Cancel any pending connection */
-void grpc_connector_shutdown(grpc_connector *connector);
+void grpc_connector_shutdown(grpc_connector *connector,
+ grpc_call_list *call_list);
#endif
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index 852eed310d..6dc52f43ce 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -52,8 +52,6 @@ typedef struct {
/** all our subchannels */
grpc_subchannel **subchannels;
size_t num_subchannels;
- /** workqueue for async work */
- grpc_workqueue *workqueue;
grpc_closure connectivity_changed;
@@ -78,33 +76,34 @@ typedef struct {
grpc_connectivity_state_tracker state_tracker;
} pick_first_lb_policy;
-static void del_interested_parties_locked(pick_first_lb_policy *p) {
+static void del_interested_parties_locked(pick_first_lb_policy *p,
+ grpc_call_list *call_list) {
pending_pick *pp;
for (pp = p->pending_picks; pp; pp = pp->next) {
grpc_subchannel_del_interested_party(p->subchannels[p->checking_subchannel],
- pp->pollset);
+ pp->pollset, call_list);
}
}
-static void add_interested_parties_locked(pick_first_lb_policy *p) {
+static void add_interested_parties_locked(pick_first_lb_policy *p,
+ grpc_call_list *call_list) {
pending_pick *pp;
for (pp = p->pending_picks; pp; pp = pp->next) {
grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel],
- pp->pollset);
+ pp->pollset, call_list);
}
}
-void pf_destroy(grpc_lb_policy *pol) {
+void pf_destroy(grpc_lb_policy *pol, grpc_call_list *call_list) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
size_t i;
- del_interested_parties_locked(p);
+ GPR_ASSERT(p->shutdown);
for (i = 0; i < p->num_subchannels; i++) {
- GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "pick_first");
+ GRPC_SUBCHANNEL_UNREF(p->subchannels[i], "pick_first", call_list);
}
grpc_connectivity_state_destroy(&p->state_tracker);
gpr_free(p->subchannels);
gpr_mu_destroy(&p->mu);
- GRPC_WORKQUEUE_UNREF(p->workqueue, "pick_first");
gpr_free(p);
}
@@ -112,7 +111,7 @@ void pf_shutdown(grpc_lb_policy *pol, grpc_call_list *call_list) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
pending_pick *pp;
gpr_mu_lock(&p->mu);
- del_interested_parties_locked(p);
+ del_interested_parties_locked(p, call_list);
p->shutdown = 1;
pp = p->pending_picks;
p->pending_picks = NULL;
@@ -156,13 +155,13 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
if (p->selected) {
gpr_mu_unlock(&p->mu);
*target = p->selected;
- on_complete->cb(on_complete->cb_arg, 1);
+ grpc_call_list_add(call_list, on_complete, 1);
} else {
if (!p->started_picking) {
start_picking(p, call_list);
}
grpc_subchannel_add_interested_party(p->subchannels[p->checking_subchannel],
- pollset);
+ pollset, call_list);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
pp->pollset = pollset;
@@ -173,58 +172,58 @@ void pf_pick(grpc_lb_policy *pol, grpc_pollset *pollset,
}
}
-static void pf_connectivity_changed(void *arg, int iomgr_success) {
+static void pf_connectivity_changed(void *arg, int iomgr_success,
+ grpc_call_list *call_list) {
pick_first_lb_policy *p = arg;
pending_pick *pp;
- int unref = 0;
- grpc_call_list call_list = GRPC_CALL_LIST_INIT;
gpr_mu_lock(&p->mu);
if (p->shutdown) {
- unref = 1;
+ GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity", call_list);
} else if (p->selected != NULL) {
grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity,
- "selected_changed", &call_list);
+ "selected_changed", call_list);
if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) {
grpc_subchannel_notify_on_state_change(
p->selected, &p->checking_connectivity, &p->connectivity_changed,
- &call_list);
+ call_list);
} else {
- unref = 1;
+ GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity", call_list);
}
} else {
loop:
switch (p->checking_connectivity) {
case GRPC_CHANNEL_READY:
grpc_connectivity_state_set(&p->state_tracker, GRPC_CHANNEL_READY,
- "connecting_ready", &call_list);
+ "connecting_ready", call_list);
p->selected = p->subchannels[p->checking_subchannel];
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = p->selected;
- grpc_subchannel_del_interested_party(p->selected, pp->pollset);
- grpc_call_list_add(&call_list, pp->on_complete, 1);
+ grpc_subchannel_del_interested_party(p->selected, pp->pollset,
+ call_list);
+ grpc_call_list_add(call_list, pp->on_complete, 1);
gpr_free(pp);
}
grpc_subchannel_notify_on_state_change(
p->selected, &p->checking_connectivity, &p->connectivity_changed,
- &call_list);
+ call_list);
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
grpc_connectivity_state_set(&p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
- "connecting_transient_failure", &call_list);
- del_interested_parties_locked(p);
+ "connecting_transient_failure", call_list);
+ del_interested_parties_locked(p, call_list);
p->checking_subchannel =
(p->checking_subchannel + 1) % p->num_subchannels;
p->checking_connectivity = grpc_subchannel_check_connectivity(
p->subchannels[p->checking_subchannel]);
- add_interested_parties_locked(p);
+ add_interested_parties_locked(p, call_list);
if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
grpc_subchannel_notify_on_state_change(
p->subchannels[p->checking_subchannel], &p->checking_connectivity,
- &p->connectivity_changed, &call_list);
+ &p->connectivity_changed, call_list);
} else {
goto loop;
}
@@ -232,48 +231,43 @@ static void pf_connectivity_changed(void *arg, int iomgr_success) {
case GRPC_CHANNEL_CONNECTING:
case GRPC_CHANNEL_IDLE:
grpc_connectivity_state_set(&p->state_tracker, p->checking_connectivity,
- "connecting_changed", &call_list);
+ "connecting_changed", call_list);
grpc_subchannel_notify_on_state_change(
p->subchannels[p->checking_subchannel], &p->checking_connectivity,
- &p->connectivity_changed, &call_list);
+ &p->connectivity_changed, call_list);
break;
case GRPC_CHANNEL_FATAL_FAILURE:
- del_interested_parties_locked(p);
+ del_interested_parties_locked(p, call_list);
GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel],
p->subchannels[p->num_subchannels - 1]);
p->num_subchannels--;
- GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first");
+ GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first",
+ call_list);
if (p->num_subchannels == 0) {
grpc_connectivity_state_set(&p->state_tracker,
GRPC_CHANNEL_FATAL_FAILURE,
- "no_more_channels", &call_list);
+ "no_more_channels", call_list);
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = NULL;
- grpc_call_list_add(&call_list, pp->on_complete, 1);
+ grpc_call_list_add(call_list, pp->on_complete, 1);
gpr_free(pp);
}
- unref = 1;
+ GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity", call_list);
} else {
grpc_connectivity_state_set(&p->state_tracker,
GRPC_CHANNEL_TRANSIENT_FAILURE,
- "subchannel_failed", &call_list);
+ "subchannel_failed", call_list);
p->checking_subchannel %= p->num_subchannels;
p->checking_connectivity = grpc_subchannel_check_connectivity(
p->subchannels[p->checking_subchannel]);
- add_interested_parties_locked(p);
+ add_interested_parties_locked(p, call_list);
goto loop;
}
}
}
gpr_mu_unlock(&p->mu);
-
- grpc_call_list_run(call_list);
-
- if (unref) {
- GRPC_LB_POLICY_UNREF(&p->base, "pick_first_connectivity");
- }
}
static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op,
@@ -293,8 +287,8 @@ static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op,
gpr_mu_unlock(&p->mu);
for (i = 0; i < n; i++) {
- grpc_subchannel_process_transport_op(subchannels[i], op);
- GRPC_SUBCHANNEL_UNREF(subchannels[i], "pf_broadcast");
+ grpc_subchannel_process_transport_op(subchannels[i], op, call_list);
+ GRPC_SUBCHANNEL_UNREF(subchannels[i], "pf_broadcast", call_list);
}
gpr_free(subchannels);
}
@@ -341,8 +335,6 @@ static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory,
grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable);
p->subchannels = gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels);
p->num_subchannels = args->num_subchannels;
- p->workqueue = args->workqueue;
- GRPC_WORKQUEUE_REF(p->workqueue, "pick_first");
grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
"pick_first");
memcpy(p->subchannels, args->subchannels,
diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c
index 95ab29cd88..84a033d583 100644
--- a/src/core/client_config/lb_policy.c
+++ b/src/core/client_config/lb_policy.c
@@ -51,15 +51,15 @@ void grpc_lb_policy_ref(grpc_lb_policy *policy) {
}
#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
-void grpc_lb_policy_unref(grpc_lb_policy *policy, const char *file, int line,
- const char *reason) {
+void grpc_lb_policy_unref(grpc_lb_policy *policy, grpc_call_list *call_list,
+ const char *file, int line, const char *reason) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "LB_POLICY:%p unref %d -> %d %s",
policy, (int)policy->refs.count, (int)policy->refs.count - 1, reason);
#else
-void grpc_lb_policy_unref(grpc_lb_policy *policy) {
+void grpc_lb_policy_unref(grpc_lb_policy *policy, grpc_call_list *call_list) {
#endif
if (gpr_unref(&policy->refs)) {
- policy->vtable->destroy(policy);
+ policy->vtable->destroy(policy, call_list);
}
}
diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h
index e445d2cded..f9430b4250 100644
--- a/src/core/client_config/lb_policy.h
+++ b/src/core/client_config/lb_policy.h
@@ -51,7 +51,7 @@ struct grpc_lb_policy {
};
struct grpc_lb_policy_vtable {
- void (*destroy)(grpc_lb_policy *policy);
+ void (*destroy)(grpc_lb_policy *policy, grpc_call_list *call_list);
void (*shutdown)(grpc_lb_policy *policy, grpc_call_list *call_list);
@@ -82,17 +82,17 @@ struct grpc_lb_policy_vtable {
#ifdef GRPC_LB_POLICY_REFCOUNT_DEBUG
#define GRPC_LB_POLICY_REF(p, r) \
grpc_lb_policy_ref((p), __FILE__, __LINE__, (r))
-#define GRPC_LB_POLICY_UNREF(p, r) \
- grpc_lb_policy_unref((p), __FILE__, __LINE__, (r))
+#define GRPC_LB_POLICY_UNREF(p, r, cl) \
+ grpc_lb_policy_unref((p), (cl), __FILE__, __LINE__, (r))
void grpc_lb_policy_ref(grpc_lb_policy *policy, const char *file, int line,
const char *reason);
-void grpc_lb_policy_unref(grpc_lb_policy *policy, const char *file, int line,
- const char *reason);
+void grpc_lb_policy_unref(grpc_lb_policy *policy, grpc_call_list *call_list,
+ const char *file, int line, const char *reason);
#else
#define GRPC_LB_POLICY_REF(p, r) grpc_lb_policy_ref((p))
-#define GRPC_LB_POLICY_UNREF(p, r) grpc_lb_policy_unref((p))
+#define GRPC_LB_POLICY_UNREF(p, r, cl) grpc_lb_policy_unref((p), (cl))
void grpc_lb_policy_ref(grpc_lb_policy *policy);
-void grpc_lb_policy_unref(grpc_lb_policy *policy);
+void grpc_lb_policy_unref(grpc_lb_policy *policy, grpc_call_list *call_list);
#endif
/** called by concrete implementations to initialize the base struct */
diff --git a/src/core/client_config/lb_policy_factory.h b/src/core/client_config/lb_policy_factory.h
index 755faf8813..04610316ee 100644
--- a/src/core/client_config/lb_policy_factory.h
+++ b/src/core/client_config/lb_policy_factory.h
@@ -36,7 +36,6 @@
#include "src/core/client_config/lb_policy.h"
#include "src/core/client_config/subchannel.h"
-#include "src/core/iomgr/workqueue.h"
typedef struct grpc_lb_policy_factory grpc_lb_policy_factory;
typedef struct grpc_lb_policy_factory_vtable grpc_lb_policy_factory_vtable;
@@ -50,7 +49,6 @@ struct grpc_lb_policy_factory {
typedef struct grpc_lb_policy_args {
grpc_subchannel **subchannels;
size_t num_subchannels;
- grpc_workqueue *workqueue;
} grpc_lb_policy_args;
struct grpc_lb_policy_factory_vtable {
diff --git a/src/core/client_config/resolver.c b/src/core/client_config/resolver.c
index 6a7d73c54f..eae8ec6ddf 100644
--- a/src/core/client_config/resolver.c
+++ b/src/core/client_config/resolver.c
@@ -40,8 +40,8 @@ void grpc_resolver_init(grpc_resolver *resolver,
}
#ifdef GRPC_RESOLVER_REFCOUNT_DEBUG
-void grpc_resolver_ref(grpc_resolver *resolver, const char *file, int line,
- const char *reason) {
+void grpc_resolver_ref(grpc_resolver *resolver, grpc_call_list *call_list,
+ const char *file, int line, const char *reason) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "RESOLVER:%p ref %d -> %d %s",
resolver, (int)resolver->refs.count, (int)resolver->refs.count + 1,
reason);
@@ -52,32 +52,34 @@ void grpc_resolver_ref(grpc_resolver *resolver) {
}
#ifdef GRPC_RESOLVER_REFCOUNT_DEBUG
-void grpc_resolver_unref(grpc_resolver *resolver, const char *file, int line,
- const char *reason) {
+void grpc_resolver_unref(grpc_resolver *resolver, grpc_call_list *call_list,
+ const char *file, int line, const char *reason) {
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "RESOLVER:%p unref %d -> %d %s",
resolver, (int)resolver->refs.count, (int)resolver->refs.count - 1,
reason);
#else
-void grpc_resolver_unref(grpc_resolver *resolver) {
+void grpc_resolver_unref(grpc_resolver *resolver, grpc_call_list *call_list) {
#endif
if (gpr_unref(&resolver->refs)) {
- resolver->vtable->destroy(resolver);
+ resolver->vtable->destroy(resolver, call_list);
}
}
-void grpc_resolver_shutdown(grpc_resolver *resolver) {
- resolver->vtable->shutdown(resolver);
+void grpc_resolver_shutdown(grpc_resolver *resolver,
+ grpc_call_list *call_list) {
+ resolver->vtable->shutdown(resolver, call_list);
}
void grpc_resolver_channel_saw_error(grpc_resolver *resolver,
struct sockaddr *failing_address,
- int failing_address_len) {
+ int failing_address_len,
+ grpc_call_list *call_list) {
resolver->vtable->channel_saw_error(resolver, failing_address,
- failing_address_len);
+ failing_address_len, call_list);
}
void grpc_resolver_next(grpc_resolver *resolver,
grpc_client_config **target_config,
- grpc_closure *on_complete) {
- resolver->vtable->next(resolver, target_config, on_complete);
+ grpc_closure *on_complete, grpc_call_list *call_list) {
+ resolver->vtable->next(resolver, target_config, on_complete, call_list);
}
diff --git a/src/core/client_config/resolver.h b/src/core/client_config/resolver.h
index 217d061abe..7960bcb92f 100644
--- a/src/core/client_config/resolver.h
+++ b/src/core/client_config/resolver.h
@@ -49,40 +49,41 @@ struct grpc_resolver {
};
struct grpc_resolver_vtable {
- void (*destroy)(grpc_resolver *resolver);
- void (*shutdown)(grpc_resolver *resolver);
+ void (*destroy)(grpc_resolver *resolver, grpc_call_list *call_list);
+ void (*shutdown)(grpc_resolver *resolver, grpc_call_list *call_list);
void (*channel_saw_error)(grpc_resolver *resolver,
struct sockaddr *failing_address,
- int failing_address_len);
+ int failing_address_len, grpc_call_list *call_list);
void (*next)(grpc_resolver *resolver, grpc_client_config **target_config,
- grpc_closure *on_complete);
+ grpc_closure *on_complete, grpc_call_list *call_list);
};
#ifdef GRPC_RESOLVER_REFCOUNT_DEBUG
#define GRPC_RESOLVER_REF(p, r) grpc_resolver_ref((p), __FILE__, __LINE__, (r))
-#define GRPC_RESOLVER_UNREF(p, r) \
- grpc_resolver_unref((p), __FILE__, __LINE__, (r))
+#define GRPC_RESOLVER_UNREF(p, r, cl) \
+ grpc_resolver_unref((p), (cl), __FILE__, __LINE__, (r))
void grpc_resolver_ref(grpc_resolver *policy, const char *file, int line,
const char *reason);
-void grpc_resolver_unref(grpc_resolver *policy, const char *file, int line,
- const char *reason);
+void grpc_resolver_unref(grpc_resolver *policy, grpc_call_list *call_list,
+ const char *file, int line, const char *reason);
#else
#define GRPC_RESOLVER_REF(p, r) grpc_resolver_ref((p))
-#define GRPC_RESOLVER_UNREF(p, r) grpc_resolver_unref((p))
+#define GRPC_RESOLVER_UNREF(p, r, cl) grpc_resolver_unref((p), (cl))
void grpc_resolver_ref(grpc_resolver *policy);
-void grpc_resolver_unref(grpc_resolver *policy);
+void grpc_resolver_unref(grpc_resolver *policy, grpc_call_list *call_list);
#endif
void grpc_resolver_init(grpc_resolver *resolver,
const grpc_resolver_vtable *vtable);
-void grpc_resolver_shutdown(grpc_resolver *resolver);
+void grpc_resolver_shutdown(grpc_resolver *resolver, grpc_call_list *call_list);
/** Notification that the channel has seen an error on some address.
Can be used as a hint that re-resolution is desirable soon. */
void grpc_resolver_channel_saw_error(grpc_resolver *resolver,
struct sockaddr *failing_address,
- int failing_address_len);
+ int failing_address_len,
+ grpc_call_list *call_list);
/** Get the next client config. Called by the channel to fetch a new
configuration. Expected to set *target_config with a new configuration,
@@ -92,6 +93,6 @@ void grpc_resolver_channel_saw_error(grpc_resolver *resolver,
schedule on_complete. */
void grpc_resolver_next(grpc_resolver *resolver,
grpc_client_config **target_config,
- grpc_closure *on_complete);
+ grpc_closure *on_complete, grpc_call_list *call_list);
#endif /* GRPC_INTERNAL_CORE_CONFIG_RESOLVER_H */
diff --git a/src/core/client_config/resolver_factory.h b/src/core/client_config/resolver_factory.h
index 24da84d89d..4c4df353f7 100644
--- a/src/core/client_config/resolver_factory.h
+++ b/src/core/client_config/resolver_factory.h
@@ -37,7 +37,6 @@
#include "src/core/client_config/resolver.h"
#include "src/core/client_config/subchannel_factory.h"
#include "src/core/client_config/uri_parser.h"
-#include "src/core/iomgr/workqueue.h"
typedef struct grpc_resolver_factory grpc_resolver_factory;
typedef struct grpc_resolver_factory_vtable grpc_resolver_factory_vtable;
@@ -51,7 +50,6 @@ struct grpc_resolver_factory {
typedef struct grpc_resolver_args {
grpc_uri *uri;
grpc_subchannel_factory *subchannel_factory;
- grpc_workqueue *workqueue;
} grpc_resolver_args;
struct grpc_resolver_factory_vtable {
diff --git a/src/core/client_config/resolver_registry.c b/src/core/client_config/resolver_registry.c
index 3611252bda..89a945c2d3 100644
--- a/src/core/client_config/resolver_registry.c
+++ b/src/core/client_config/resolver_registry.c
@@ -114,9 +114,8 @@ static grpc_resolver_factory *resolve_factory(const char *target,
return factory;
}
-grpc_resolver *grpc_resolver_create(const char *target,
- grpc_subchannel_factory *subchannel_factory,
- grpc_workqueue *workqueue) {
+grpc_resolver *grpc_resolver_create(
+ const char *target, grpc_subchannel_factory *subchannel_factory) {
grpc_uri *uri = NULL;
grpc_resolver_factory *factory = resolve_factory(target, &uri);
grpc_resolver *resolver;
@@ -124,7 +123,6 @@ grpc_resolver *grpc_resolver_create(const char *target,
memset(&args, 0, sizeof(args));
args.uri = uri;
args.subchannel_factory = subchannel_factory;
- args.workqueue = workqueue;
resolver = grpc_resolver_factory_create_resolver(factory, &args);
grpc_uri_destroy(uri);
return resolver;
diff --git a/src/core/client_config/resolver_registry.h b/src/core/client_config/resolver_registry.h
index 9a9a628262..5a7193b7ae 100644
--- a/src/core/client_config/resolver_registry.h
+++ b/src/core/client_config/resolver_registry.h
@@ -55,9 +55,8 @@ void grpc_register_resolver_type(grpc_resolver_factory *factory);
If a resolver factory was found, use it to instantiate a resolver and
return it.
If a resolver factory was not found, return NULL. */
-grpc_resolver *grpc_resolver_create(const char *target,
- grpc_subchannel_factory *subchannel_factory,
- grpc_workqueue *workqueue);
+grpc_resolver *grpc_resolver_create(
+ const char *target, grpc_subchannel_factory *subchannel_factory);
/** Given a target, return a (freshly allocated with gpr_malloc) string
representing the default authority to pass from a client. */
diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c
index e251664e03..9e9b3d4917 100644
--- a/src/core/client_config/resolvers/dns_resolver.c
+++ b/src/core/client_config/resolvers/dns_resolver.c
@@ -49,8 +49,6 @@ typedef struct {
grpc_resolver base;
/** refcount */
gpr_refcount refs;
- /** workqueue */
- grpc_workqueue *workqueue;
/** name to resolve */
char *name;
/** default port to use */
@@ -76,37 +74,36 @@ typedef struct {
grpc_client_config *resolved_config;
} dns_resolver;
-static void dns_destroy(grpc_resolver *r);
+static void dns_destroy(grpc_resolver *r, grpc_call_list *call_list);
static void dns_start_resolving_locked(dns_resolver *r);
-static grpc_closure *dns_maybe_finish_next_locked(dns_resolver *r)
- GRPC_MUST_USE_RESULT;
+static void dns_maybe_finish_next_locked(dns_resolver *r,
+ grpc_call_list *call_list);
-static void dns_shutdown(grpc_resolver *r);
+static void dns_shutdown(grpc_resolver *r, grpc_call_list *call_list);
static void dns_channel_saw_error(grpc_resolver *r,
struct sockaddr *failing_address,
- int failing_address_len);
+ int failing_address_len,
+ grpc_call_list *call_list);
static void dns_next(grpc_resolver *r, grpc_client_config **target_config,
- grpc_closure *on_complete);
+ grpc_closure *on_complete, grpc_call_list *call_list);
static const grpc_resolver_vtable dns_resolver_vtable = {
dns_destroy, dns_shutdown, dns_channel_saw_error, dns_next};
-static void dns_shutdown(grpc_resolver *resolver) {
+static void dns_shutdown(grpc_resolver *resolver, grpc_call_list *call_list) {
dns_resolver *r = (dns_resolver *)resolver;
- grpc_closure *next_completion;
gpr_mu_lock(&r->mu);
- next_completion = r->next_completion;
- r->next_completion = NULL;
- gpr_mu_unlock(&r->mu);
- if (next_completion != NULL) {
+ if (r->next_completion != NULL) {
*r->target_config = NULL;
- next_completion->cb(next_completion->cb_arg, 1);
+ grpc_call_list_add(call_list, r->next_completion, 1);
+ r->next_completion = NULL;
}
+ gpr_mu_unlock(&r->mu);
}
static void dns_channel_saw_error(grpc_resolver *resolver, struct sockaddr *sa,
- int len) {
+ int len, grpc_call_list *call_list) {
dns_resolver *r = (dns_resolver *)resolver;
gpr_mu_lock(&r->mu);
if (!r->resolving) {
@@ -117,9 +114,8 @@ static void dns_channel_saw_error(grpc_resolver *resolver, struct sockaddr *sa,
static void dns_next(grpc_resolver *resolver,
grpc_client_config **target_config,
- grpc_closure *on_complete) {
+ grpc_closure *on_complete, grpc_call_list *call_list) {
dns_resolver *r = (dns_resolver *)resolver;
- grpc_closure *call = NULL;
gpr_mu_lock(&r->mu);
GPR_ASSERT(!r->next_completion);
r->next_completion = on_complete;
@@ -127,21 +123,18 @@ static void dns_next(grpc_resolver *resolver,
if (r->resolved_version == 0 && !r->resolving) {
dns_start_resolving_locked(r);
} else {
- call = dns_maybe_finish_next_locked(r);
+ dns_maybe_finish_next_locked(r, call_list);
}
gpr_mu_unlock(&r->mu);
- if (call) {
- call->cb(call->cb_arg, 1);
- }
}
-static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) {
+static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses,
+ grpc_call_list *call_list) {
dns_resolver *r = arg;
grpc_client_config *config = NULL;
grpc_subchannel **subchannels;
grpc_subchannel_args args;
grpc_lb_policy *lb_policy;
- grpc_closure *call;
size_t i;
if (addresses) {
grpc_lb_policy_args lb_policy_args;
@@ -157,10 +150,9 @@ static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) {
memset(&lb_policy_args, 0, sizeof(lb_policy_args));
lb_policy_args.subchannels = subchannels;
lb_policy_args.num_subchannels = addresses->naddrs;
- lb_policy_args.workqueue = r->workqueue;
lb_policy = grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args);
grpc_client_config_set_lb_policy(config, lb_policy);
- GRPC_LB_POLICY_UNREF(lb_policy, "construction");
+ GRPC_LB_POLICY_UNREF(lb_policy, "construction", call_list);
grpc_resolved_addresses_destroy(addresses);
gpr_free(subchannels);
}
@@ -168,17 +160,14 @@ static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) {
GPR_ASSERT(r->resolving);
r->resolving = 0;
if (r->resolved_config) {
- grpc_client_config_unref(r->resolved_config);
+ grpc_client_config_unref(r->resolved_config, call_list);
}
r->resolved_config = config;
r->resolved_version++;
- call = dns_maybe_finish_next_locked(r);
+ dns_maybe_finish_next_locked(r, call_list);
gpr_mu_unlock(&r->mu);
- if (call) {
- call->cb(call->cb_arg, 1);
- }
- GRPC_RESOLVER_UNREF(&r->base, "dns-resolving");
+ GRPC_RESOLVER_UNREF(&r->base, "dns-resolving", call_list);
}
static void dns_start_resolving_locked(dns_resolver *r) {
@@ -188,29 +177,27 @@ static void dns_start_resolving_locked(dns_resolver *r) {
grpc_resolve_address(r->name, r->default_port, dns_on_resolved, r);
}
-static grpc_closure *dns_maybe_finish_next_locked(dns_resolver *r) {
- grpc_closure *ret = NULL;
+static void dns_maybe_finish_next_locked(dns_resolver *r,
+ grpc_call_list *call_list) {
if (r->next_completion != NULL &&
r->resolved_version != r->published_version) {
*r->target_config = r->resolved_config;
if (r->resolved_config) {
grpc_client_config_ref(r->resolved_config);
}
- ret = r->next_completion;
+ grpc_call_list_add(call_list, r->next_completion, 1);
r->next_completion = NULL;
r->published_version = r->resolved_version;
}
- return ret;
}
-static void dns_destroy(grpc_resolver *gr) {
+static void dns_destroy(grpc_resolver *gr, grpc_call_list *call_list) {
dns_resolver *r = (dns_resolver *)gr;
gpr_mu_destroy(&r->mu);
if (r->resolved_config) {
- grpc_client_config_unref(r->resolved_config);
+ grpc_client_config_unref(r->resolved_config, call_list);
}
grpc_subchannel_factory_unref(r->subchannel_factory);
- GRPC_WORKQUEUE_UNREF(r->workqueue, "dns");
gpr_free(r->name);
gpr_free(r->default_port);
gpr_free(r->lb_policy_name);
@@ -239,8 +226,6 @@ static grpc_resolver *dns_create(grpc_resolver_args *args,
r->default_port = gpr_strdup(default_port);
r->subchannel_factory = args->subchannel_factory;
grpc_subchannel_factory_ref(r->subchannel_factory);
- r->workqueue = args->workqueue;
- GRPC_WORKQUEUE_REF(r->workqueue, "dns");
r->lb_policy_name = gpr_strdup(lb_policy_name);
return &r->base;
}
diff --git a/src/core/client_config/resolvers/sockaddr_resolver.c b/src/core/client_config/resolvers/sockaddr_resolver.c
index 47fe5fad4a..15eb60b93a 100644
--- a/src/core/client_config/resolvers/sockaddr_resolver.c
+++ b/src/core/client_config/resolvers/sockaddr_resolver.c
@@ -56,8 +56,6 @@ typedef struct {
gpr_refcount refs;
/** subchannel factory */
grpc_subchannel_factory *subchannel_factory;
- /** workqueue */
- grpc_workqueue *workqueue;
/** load balancing policy name */
char *lb_policy_name;
@@ -78,61 +76,59 @@ typedef struct {
grpc_client_config **target_config;
} sockaddr_resolver;
-static void sockaddr_destroy(grpc_resolver *r);
+static void sockaddr_destroy(grpc_resolver *r, grpc_call_list *call_list);
-static grpc_closure *sockaddr_maybe_finish_next_locked(sockaddr_resolver *r)
- GRPC_MUST_USE_RESULT;
+static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r,
+ grpc_call_list *call_list);
-static void sockaddr_shutdown(grpc_resolver *r);
+static void sockaddr_shutdown(grpc_resolver *r, grpc_call_list *call_list);
static void sockaddr_channel_saw_error(grpc_resolver *r,
struct sockaddr *failing_address,
- int failing_address_len);
+ int failing_address_len,
+ grpc_call_list *call_list);
static void sockaddr_next(grpc_resolver *r, grpc_client_config **target_config,
- grpc_closure *on_complete);
+ grpc_closure *on_complete, grpc_call_list *call_list);
static const grpc_resolver_vtable sockaddr_resolver_vtable = {
sockaddr_destroy, sockaddr_shutdown, sockaddr_channel_saw_error,
sockaddr_next};
-static void sockaddr_shutdown(grpc_resolver *resolver) {
+static void sockaddr_shutdown(grpc_resolver *resolver,
+ grpc_call_list *call_list) {
sockaddr_resolver *r = (sockaddr_resolver *)resolver;
- grpc_closure *call = NULL;
gpr_mu_lock(&r->mu);
if (r->next_completion != NULL) {
*r->target_config = NULL;
- call = r->next_completion;
+ grpc_call_list_add(call_list, r->next_completion, 1);
r->next_completion = NULL;
}
gpr_mu_unlock(&r->mu);
- if (call) {
- call->cb(call->cb_arg, 1);
- }
}
static void sockaddr_channel_saw_error(grpc_resolver *resolver,
- struct sockaddr *sa, int len) {}
+ struct sockaddr *sa, int len,
+ grpc_call_list *call_list) {}
static void sockaddr_next(grpc_resolver *resolver,
grpc_client_config **target_config,
- grpc_closure *on_complete) {
+ grpc_closure *on_complete,
+ grpc_call_list *call_list) {
sockaddr_resolver *r = (sockaddr_resolver *)resolver;
- grpc_closure *call = NULL;
gpr_mu_lock(&r->mu);
GPR_ASSERT(!r->next_completion);
r->next_completion = on_complete;
r->target_config = target_config;
- call = sockaddr_maybe_finish_next_locked(r);
+ sockaddr_maybe_finish_next_locked(r, call_list);
gpr_mu_unlock(&r->mu);
- if (call) call->cb(call->cb_arg, 1);
}
-static grpc_closure *sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) {
+static void sockaddr_maybe_finish_next_locked(sockaddr_resolver *r,
+ grpc_call_list *call_list) {
grpc_client_config *cfg;
grpc_lb_policy *lb_policy;
grpc_lb_policy_args lb_policy_args;
grpc_subchannel **subchannels;
grpc_subchannel_args args;
- grpc_closure *call = NULL;
if (r->next_completion != NULL && !r->published) {
size_t i;
@@ -148,26 +144,22 @@ static grpc_closure *sockaddr_maybe_finish_next_locked(sockaddr_resolver *r) {
memset(&lb_policy_args, 0, sizeof(lb_policy_args));
lb_policy_args.subchannels = subchannels;
lb_policy_args.num_subchannels = r->num_addrs;
- lb_policy_args.workqueue = r->workqueue;
lb_policy =
grpc_lb_policy_create(r->lb_policy_name, &lb_policy_args);
gpr_free(subchannels);
grpc_client_config_set_lb_policy(cfg, lb_policy);
- GRPC_LB_POLICY_UNREF(lb_policy, "unix");
+ GRPC_LB_POLICY_UNREF(lb_policy, "sockaddr", call_list);
r->published = 1;
*r->target_config = cfg;
- call = r->next_completion;
+ grpc_call_list_add(call_list, r->next_completion, 1);
r->next_completion = NULL;
}
-
- return call;
}
-static void sockaddr_destroy(grpc_resolver *gr) {
+static void sockaddr_destroy(grpc_resolver *gr, grpc_call_list *call_list) {
sockaddr_resolver *r = (sockaddr_resolver *)gr;
gpr_mu_destroy(&r->mu);
grpc_subchannel_factory_unref(r->subchannel_factory);
- GRPC_WORKQUEUE_UNREF(r->workqueue, "sockaddr");
gpr_free(r->addrs);
gpr_free(r->addrs_len);
gpr_free(r->lb_policy_name);
@@ -340,8 +332,6 @@ static grpc_resolver *sockaddr_create(
grpc_resolver_init(&r->base, &sockaddr_resolver_vtable);
r->subchannel_factory = args->subchannel_factory;
grpc_subchannel_factory_ref(r->subchannel_factory);
- r->workqueue = args->workqueue;
- GRPC_WORKQUEUE_REF(r->workqueue, "sockaddr");
r->lb_policy_name = gpr_strdup(lb_policy_name);
return &r->base;
diff --git a/src/core/client_config/resolvers/zookeeper_resolver.c b/src/core/client_config/resolvers/zookeeper_resolver.c
index 5632f0425e..c785e76939 100644
--- a/src/core/client_config/resolvers/zookeeper_resolver.c
+++ b/src/core/client_config/resolvers/zookeeper_resolver.c
@@ -61,8 +61,6 @@ typedef struct {
grpc_subchannel_factory *subchannel_factory;
/** load balancing policy name */
char *lb_policy_name;
- /** work queue */
- grpc_workqueue *workqueue;
/** mutex guarding the rest of the state */
gpr_mu mu;
@@ -436,7 +434,6 @@ static void zookeeper_destroy(grpc_resolver *gr) {
grpc_client_config_unref(r->resolved_config);
}
grpc_subchannel_factory_unref(r->subchannel_factory);
- grpc_workqueue_unref(r->workqueue);
gpr_free(r->name);
gpr_free(r->lb_policy_name);
gpr_free(r);
@@ -466,9 +463,6 @@ static grpc_resolver *zookeeper_create(grpc_resolver_args *args,
grpc_resolver_init(&r->base, &zookeeper_resolver_vtable);
r->name = gpr_strdup(path);
- r->workqueue = args->workqueue;
- grpc_workqueue_ref(r->workqueue);
-
r->subchannel_factory = args->subchannel_factory;
grpc_subchannel_factory_ref(r->subchannel_factory);
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 99310e02e5..937b3cd71c 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -76,7 +76,6 @@ typedef struct waiting_for_connect {
struct grpc_subchannel {
grpc_connector *connector;
- grpc_workqueue *workqueue;
/** non-transport related channel filters */
const grpc_channel_filter **filters;
@@ -150,7 +149,8 @@ static void connectivity_state_changed_locked(grpc_subchannel *c,
grpc_call_list *call_list);
static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c);
static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
-static void subchannel_connected(void *subchannel, int iomgr_success);
+static void subchannel_connected(void *subchannel, int iomgr_success,
+ grpc_call_list *call_list);
static void subchannel_ref_locked(
grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
@@ -158,8 +158,9 @@ static int subchannel_unref_locked(
grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT;
static void connection_ref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
static grpc_subchannel *connection_unref_locked(
- connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT;
-static void subchannel_destroy(grpc_subchannel *c);
+ connection *c, grpc_call_list *call_list GRPC_SUBCHANNEL_REF_EXTRA_ARGS)
+ GRPC_MUST_USE_RESULT;
+static void subchannel_destroy(grpc_subchannel *c, grpc_call_list *call_list);
#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
#define SUBCHANNEL_REF_LOCKED(p, r) \
@@ -168,8 +169,8 @@ static void subchannel_destroy(grpc_subchannel *c);
subchannel_unref_locked((p), __FILE__, __LINE__, (r))
#define CONNECTION_REF_LOCKED(p, r) \
connection_ref_locked((p), __FILE__, __LINE__, (r))
-#define CONNECTION_UNREF_LOCKED(p, r) \
- connection_unref_locked((p), __FILE__, __LINE__, (r))
+#define CONNECTION_UNREF_LOCKED(p, r, cl) \
+ connection_unref_locked((p), (cl), __FILE__, __LINE__, (r))
#define REF_PASS_ARGS , file, line, reason
#define REF_LOG(name, p) \
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", \
@@ -181,7 +182,7 @@ static void subchannel_destroy(grpc_subchannel *c);
#define SUBCHANNEL_REF_LOCKED(p, r) subchannel_ref_locked((p))
#define SUBCHANNEL_UNREF_LOCKED(p, r) subchannel_unref_locked((p))
#define CONNECTION_REF_LOCKED(p, r) connection_ref_locked((p))
-#define CONNECTION_UNREF_LOCKED(p, r) connection_unref_locked((p))
+#define CONNECTION_UNREF_LOCKED(p, r, cl) connection_unref_locked((p), (cl))
#define REF_PASS_ARGS
#define REF_LOG(name, p) \
do { \
@@ -195,9 +196,9 @@ static void subchannel_destroy(grpc_subchannel *c);
* connection implementation
*/
-static void connection_destroy(connection *c) {
+static void connection_destroy(connection *c, grpc_call_list *call_list) {
GPR_ASSERT(c->refs == 0);
- grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c));
+ grpc_channel_stack_destroy(CHANNEL_STACK_FROM_CONNECTION(c), call_list);
gpr_free(c);
}
@@ -209,14 +210,14 @@ static void connection_ref_locked(
}
static grpc_subchannel *connection_unref_locked(
- connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+ connection *c, grpc_call_list *call_list GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
grpc_subchannel *destroy = NULL;
UNREF_LOG("CONNECTION", c);
if (subchannel_unref_locked(c->subchannel REF_PASS_ARGS)) {
destroy = c->subchannel;
}
if (--c->refs == 0 && c->subchannel->active != c) {
- connection_destroy(c);
+ connection_destroy(c, call_list);
}
return destroy;
}
@@ -243,17 +244,19 @@ void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
gpr_mu_unlock(&c->mu);
}
-void grpc_subchannel_unref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+void grpc_subchannel_unref(grpc_subchannel *c,
+ grpc_call_list *call_list
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
int destroy;
gpr_mu_lock(&c->mu);
destroy = subchannel_unref_locked(c REF_PASS_ARGS);
gpr_mu_unlock(&c->mu);
- if (destroy) subchannel_destroy(c);
+ if (destroy) subchannel_destroy(c, call_list);
}
-static void subchannel_destroy(grpc_subchannel *c) {
+static void subchannel_destroy(grpc_subchannel *c, grpc_call_list *call_list) {
if (c->active != NULL) {
- connection_destroy(c->active);
+ connection_destroy(c->active, call_list);
}
gpr_free(c->filters);
grpc_channel_args_destroy(c->args);
@@ -261,18 +264,19 @@ static void subchannel_destroy(grpc_subchannel *c) {
grpc_mdctx_unref(c->mdctx);
grpc_connectivity_state_destroy(&c->state_tracker);
grpc_connector_unref(c->connector);
- GRPC_WORKQUEUE_UNREF(c->workqueue, "subchannel");
gpr_free(c);
}
void grpc_subchannel_add_interested_party(grpc_subchannel *c,
- grpc_pollset *pollset) {
- grpc_pollset_set_add_pollset(c->pollset_set, pollset);
+ grpc_pollset *pollset,
+ grpc_call_list *call_list) {
+ grpc_pollset_set_add_pollset(c->pollset_set, pollset, call_list);
}
void grpc_subchannel_del_interested_party(grpc_subchannel *c,
- grpc_pollset *pollset) {
- grpc_pollset_set_del_pollset(c->pollset_set, pollset);
+ grpc_pollset *pollset,
+ grpc_call_list *call_list) {
+ grpc_pollset_set_del_pollset(c->pollset_set, pollset, call_list);
}
static gpr_uint32 random_seed() {
@@ -298,8 +302,6 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
c->args = grpc_channel_args_copy(args->args);
c->mdctx = args->mdctx;
c->master = args->master;
- c->workqueue = grpc_channel_get_workqueue(c->master);
- GRPC_WORKQUEUE_REF(c->workqueue, "subchannel");
c->pollset_set = grpc_client_channel_get_connecting_pollset_set(parent_elem);
c->random = random_seed();
grpc_mdctx_ref(c->mdctx);
@@ -310,7 +312,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
return c;
}
-static void continue_connect(grpc_subchannel *c) {
+static void continue_connect(grpc_subchannel *c, grpc_call_list *call_list) {
grpc_connect_in_args args;
args.interested_parties = c->pollset_set;
@@ -321,24 +323,25 @@ static void continue_connect(grpc_subchannel *c) {
args.metadata_context = c->mdctx;
grpc_connector_connect(c->connector, &args, &c->connecting_result,
- &c->connected);
+ &c->connected, call_list);
}
-static void start_connect(grpc_subchannel *c) {
+static void start_connect(grpc_subchannel *c, grpc_call_list *call_list) {
c->backoff_delta = gpr_time_from_seconds(
GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS, GPR_TIMESPAN);
c->next_attempt =
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta);
- continue_connect(c);
+ continue_connect(c, call_list);
}
-static void continue_creating_call(void *arg, int iomgr_success) {
+static void continue_creating_call(void *arg, int iomgr_success,
+ grpc_call_list *call_list) {
waiting_for_connect *w4c = arg;
- grpc_call_list call_list = GRPC_CALL_LIST_INIT;
- grpc_subchannel_del_interested_party(w4c->subchannel, w4c->pollset);
+ grpc_subchannel_del_interested_party(w4c->subchannel, w4c->pollset,
+ call_list);
grpc_subchannel_create_call(w4c->subchannel, w4c->pollset, w4c->target,
- w4c->notify, &call_list);
- GRPC_SUBCHANNEL_UNREF(w4c->subchannel, "waiting_for_connect");
+ w4c->notify, call_list);
+ GRPC_SUBCHANNEL_UNREF(w4c->subchannel, "waiting_for_connect", call_list);
gpr_free(w4c);
}
@@ -354,7 +357,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset,
gpr_mu_unlock(&c->mu);
*target = create_call(con);
- notify->cb(notify->cb_arg, 1);
+ notify->cb(notify->cb_arg, 1, call_list);
} else {
waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c));
w4c->next = c->waiting;
@@ -366,7 +369,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset,
SUBCHANNEL_REF_LOCKED(c, "waiting_for_connect");
grpc_closure_init(&w4c->continuation, continue_creating_call, w4c);
c->waiting = w4c;
- grpc_subchannel_add_interested_party(c, pollset);
+ grpc_subchannel_add_interested_party(c, pollset, call_list);
if (!c->connecting) {
c->connecting = 1;
connectivity_state_changed_locked(c, "create_call", call_list);
@@ -375,7 +378,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c, grpc_pollset *pollset,
GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
gpr_mu_unlock(&c->mu);
- start_connect(c);
+ start_connect(c, call_list);
} else {
gpr_mu_unlock(&c->mu);
}
@@ -408,16 +411,16 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
gpr_mu_unlock(&c->mu);
if (do_connect) {
- start_connect(c);
+ start_connect(c, call_list);
}
}
void grpc_subchannel_process_transport_op(grpc_subchannel *c,
- grpc_transport_op *op) {
+ grpc_transport_op *op,
+ grpc_call_list *call_list) {
connection *con = NULL;
grpc_subchannel *destroy;
int cancel_alarm = 0;
- grpc_call_list call_list = GRPC_CALL_LIST_INIT;
gpr_mu_lock(&c->mu);
if (c->active != NULL) {
con = c->active;
@@ -425,7 +428,7 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c,
}
if (op->disconnect) {
c->disconnected = 1;
- connectivity_state_changed_locked(c, "disconnect", &call_list);
+ connectivity_state_changed_locked(c, "disconnect", call_list);
if (c->have_alarm) {
cancel_alarm = 1;
}
@@ -436,28 +439,27 @@ void grpc_subchannel_process_transport_op(grpc_subchannel *c,
grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
grpc_channel_element *top_elem =
grpc_channel_stack_element(channel_stack, 0);
- top_elem->filter->start_transport_op(top_elem, op);
+ top_elem->filter->start_transport_op(top_elem, op, call_list);
gpr_mu_lock(&c->mu);
- destroy = CONNECTION_UNREF_LOCKED(con, "transport-op");
+ destroy = CONNECTION_UNREF_LOCKED(con, "transport-op", call_list);
gpr_mu_unlock(&c->mu);
if (destroy) {
- subchannel_destroy(destroy);
+ subchannel_destroy(destroy, call_list);
}
}
if (cancel_alarm) {
- grpc_alarm_cancel(&c->alarm);
+ grpc_alarm_cancel(&c->alarm, call_list);
}
if (op->disconnect) {
- grpc_connector_shutdown(c->connector);
+ grpc_connector_shutdown(c->connector, call_list);
}
-
- grpc_call_list_run(call_list);
}
-static void on_state_changed(void *p, int iomgr_success) {
+static void on_state_changed(void *p, int iomgr_success,
+ grpc_call_list *call_list) {
state_watcher *sw = p;
grpc_subchannel *c = sw->subchannel;
gpr_mu *mu = &c->mu;
@@ -465,7 +467,6 @@ static void on_state_changed(void *p, int iomgr_success) {
grpc_transport_op op;
grpc_channel_element *elem;
connection *destroy_connection = NULL;
- grpc_call_list call_list = GRPC_CALL_LIST_INIT;
gpr_mu_lock(mu);
@@ -485,7 +486,7 @@ static void on_state_changed(void *p, int iomgr_success) {
op.on_connectivity_state_change = &sw->closure;
elem = grpc_channel_stack_element(
CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
- elem->filter->start_transport_op(elem, &op);
+ elem->filter->start_transport_op(elem, &op, call_list);
/* early out */
gpr_mu_unlock(mu);
return;
@@ -499,22 +500,21 @@ static void on_state_changed(void *p, int iomgr_success) {
grpc_connectivity_state_set(
&c->state_tracker, c->disconnected ? GRPC_CHANNEL_FATAL_FAILURE
: GRPC_CHANNEL_TRANSIENT_FAILURE,
- "connection_failed", &call_list);
+ "connection_failed", call_list);
break;
}
done:
- connectivity_state_changed_locked(c, "transport_state_changed", &call_list);
+ connectivity_state_changed_locked(c, "transport_state_changed", call_list);
destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher");
gpr_free(sw);
gpr_mu_unlock(mu);
if (destroy) {
- subchannel_destroy(c);
+ subchannel_destroy(c, call_list);
}
if (destroy_connection != NULL) {
- connection_destroy(destroy_connection);
+ connection_destroy(destroy_connection, call_list);
}
- grpc_call_list_run(call_list);
}
static void publish_transport(grpc_subchannel *c, grpc_call_list *call_list) {
@@ -544,7 +544,7 @@ static void publish_transport(grpc_subchannel *c, grpc_call_list *call_list) {
con->refs = 0;
con->subchannel = c;
grpc_channel_stack_init(filters, num_filters, c->master, c->args, c->mdctx,
- stk);
+ stk, call_list);
grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
gpr_free(c->connecting_result.filters);
memset(&c->connecting_result, 0, sizeof(c->connecting_result));
@@ -561,9 +561,9 @@ static void publish_transport(grpc_subchannel *c, grpc_call_list *call_list) {
gpr_mu_unlock(&c->mu);
gpr_free(sw);
gpr_free(filters);
- grpc_channel_stack_destroy(stk);
+ grpc_channel_stack_destroy(stk, call_list);
GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting");
- GRPC_SUBCHANNEL_UNREF(c, "connecting");
+ GRPC_SUBCHANNEL_UNREF(c, "connecting", call_list);
return;
}
@@ -587,7 +587,7 @@ static void publish_transport(grpc_subchannel *c, grpc_call_list *call_list) {
GPR_ASSERT(!SUBCHANNEL_UNREF_LOCKED(c, "connecting"));
elem =
grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
- elem->filter->start_transport_op(elem, &op);
+ elem->filter->start_transport_op(elem, &op, call_list);
/* signal completion */
connectivity_state_changed_locked(c, "connected", call_list);
@@ -605,7 +605,7 @@ static void publish_transport(grpc_subchannel *c, grpc_call_list *call_list) {
gpr_free(filters);
if (destroy_connection != NULL) {
- connection_destroy(destroy_connection);
+ connection_destroy(destroy_connection, call_list);
}
}
@@ -638,41 +638,38 @@ static void update_reconnect_parameters(grpc_subchannel *c) {
gpr_time_add(c->next_attempt, gpr_time_from_millis(jitter, GPR_TIMESPAN));
}
-static void on_alarm(void *arg, int iomgr_success) {
+static void on_alarm(void *arg, int iomgr_success, grpc_call_list *call_list) {
grpc_subchannel *c = arg;
- grpc_call_list call_list = GRPC_CALL_LIST_INIT;
gpr_mu_lock(&c->mu);
c->have_alarm = 0;
if (c->disconnected) {
iomgr_success = 0;
}
- connectivity_state_changed_locked(c, "alarm", &call_list);
+ connectivity_state_changed_locked(c, "alarm", call_list);
gpr_mu_unlock(&c->mu);
if (iomgr_success) {
update_reconnect_parameters(c);
- continue_connect(c);
+ continue_connect(c, call_list);
} else {
GRPC_CHANNEL_INTERNAL_UNREF(c->master, "connecting");
- GRPC_SUBCHANNEL_UNREF(c, "connecting");
+ GRPC_SUBCHANNEL_UNREF(c, "connecting", call_list);
}
- grpc_call_list_run(call_list);
}
-static void subchannel_connected(void *arg, int iomgr_success) {
+static void subchannel_connected(void *arg, int iomgr_success,
+ grpc_call_list *call_list) {
grpc_subchannel *c = arg;
- grpc_call_list call_list = GRPC_CALL_LIST_INIT;
if (c->connecting_result.transport != NULL) {
- publish_transport(c, &call_list);
+ publish_transport(c, call_list);
} else {
gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
gpr_mu_lock(&c->mu);
GPR_ASSERT(!c->have_alarm);
c->have_alarm = 1;
- connectivity_state_changed_locked(c, "connect_failed", &call_list);
- grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, now);
+ connectivity_state_changed_locked(c, "connect_failed", call_list);
+ grpc_alarm_init(&c->alarm, c->next_attempt, on_alarm, c, now, call_list);
gpr_mu_unlock(&c->mu);
}
- grpc_call_list_run(call_list);
}
static gpr_timespec compute_connect_deadline(grpc_subchannel *c) {
@@ -718,33 +715,36 @@ void grpc_subchannel_call_ref(
gpr_ref(&c->refs);
}
-void grpc_subchannel_call_unref(
- grpc_subchannel_call *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
+void grpc_subchannel_call_unref(grpc_subchannel_call *c,
+ grpc_call_list *call_list
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
if (gpr_unref(&c->refs)) {
gpr_mu *mu = &c->connection->subchannel->mu;
grpc_subchannel *destroy;
- grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c));
+ grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c), call_list);
gpr_mu_lock(mu);
- destroy = CONNECTION_UNREF_LOCKED(c->connection, "call");
+ destroy = CONNECTION_UNREF_LOCKED(c->connection, "call", call_list);
gpr_mu_unlock(mu);
gpr_free(c);
if (destroy != NULL) {
- subchannel_destroy(destroy);
+ subchannel_destroy(destroy, call_list);
}
}
}
-char *grpc_subchannel_call_get_peer(grpc_subchannel_call *call) {
+char *grpc_subchannel_call_get_peer(grpc_subchannel_call *call,
+ grpc_call_list *call_list) {
grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
- return top_elem->filter->get_peer(top_elem);
+ return top_elem->filter->get_peer(top_elem, call_list);
}
void grpc_subchannel_call_process_op(grpc_subchannel_call *call,
- grpc_transport_stream_op *op) {
+ grpc_transport_stream_op *op,
+ grpc_call_list *call_list) {
grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
- top_elem->filter->start_transport_stream_op(top_elem, op);
+ top_elem->filter->start_transport_stream_op(top_elem, op, call_list);
}
grpc_subchannel_call *create_call(connection *con) {
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h
index 189e531d35..0ebfc7c29a 100644
--- a/src/core/client_config/subchannel.h
+++ b/src/core/client_config/subchannel.h
@@ -47,30 +47,33 @@ typedef struct grpc_subchannel_args grpc_subchannel_args;
#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
#define GRPC_SUBCHANNEL_REF(p, r) \
grpc_subchannel_ref((p), __FILE__, __LINE__, (r))
-#define GRPC_SUBCHANNEL_UNREF(p, r) \
- grpc_subchannel_unref((p), __FILE__, __LINE__, (r))
+#define GRPC_SUBCHANNEL_UNREF(p, r, cl) \
+ grpc_subchannel_unref((p), (cl), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_CALL_REF(p, r) \
grpc_subchannel_call_ref((p), __FILE__, __LINE__, (r))
-#define GRPC_SUBCHANNEL_CALL_UNREF(p, r) \
- grpc_subchannel_call_unref((p), __FILE__, __LINE__, (r))
+#define GRPC_SUBCHANNEL_CALL_UNREF(p, r, cl) \
+ grpc_subchannel_call_unref((p), (cl), __FILE__, __LINE__, (r))
#define GRPC_SUBCHANNEL_REF_EXTRA_ARGS \
, const char *file, int line, const char *reason
#else
#define GRPC_SUBCHANNEL_REF(p, r) grpc_subchannel_ref((p))
-#define GRPC_SUBCHANNEL_UNREF(p, r) grpc_subchannel_unref((p))
+#define GRPC_SUBCHANNEL_UNREF(p, r, cl) grpc_subchannel_unref((p), (cl))
#define GRPC_SUBCHANNEL_CALL_REF(p, r) grpc_subchannel_call_ref((p))
-#define GRPC_SUBCHANNEL_CALL_UNREF(p, r) grpc_subchannel_call_unref((p))
+#define GRPC_SUBCHANNEL_CALL_UNREF(p, r, cl) \
+ grpc_subchannel_call_unref((p), (cl))
#define GRPC_SUBCHANNEL_REF_EXTRA_ARGS
#endif
void grpc_subchannel_ref(
grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
-void grpc_subchannel_unref(
- grpc_subchannel *channel GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+void grpc_subchannel_unref(grpc_subchannel *channel,
+ grpc_call_list *call_list
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
void grpc_subchannel_call_ref(
grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
-void grpc_subchannel_call_unref(
- grpc_subchannel_call *call GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
+void grpc_subchannel_call_unref(grpc_subchannel_call *call,
+ grpc_call_list *call_list
+ GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
/** construct a call (possibly asynchronously) */
void grpc_subchannel_create_call(grpc_subchannel *subchannel,
@@ -81,7 +84,8 @@ void grpc_subchannel_create_call(grpc_subchannel *subchannel,
/** process a transport level op */
void grpc_subchannel_process_transport_op(grpc_subchannel *subchannel,
- grpc_transport_op *op);
+ grpc_transport_op *op,
+ grpc_call_list *call_list);
/** poll the current connectivity state of a channel */
grpc_connectivity_state grpc_subchannel_check_connectivity(
@@ -96,17 +100,21 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *channel,
/** express interest in \a channel's activities through \a pollset. */
void grpc_subchannel_add_interested_party(grpc_subchannel *channel,
- grpc_pollset *pollset);
+ grpc_pollset *pollset,
+ grpc_call_list *call_list);
/** stop following \a channel's activity through \a pollset. */
void grpc_subchannel_del_interested_party(grpc_subchannel *channel,
- grpc_pollset *pollset);
+ grpc_pollset *pollset,
+ grpc_call_list *call_list);
/** continue processing a transport op */
void grpc_subchannel_call_process_op(grpc_subchannel_call *subchannel_call,
- grpc_transport_stream_op *op);
+ grpc_transport_stream_op *op,
+ grpc_call_list *call_list);
/** continue querying for peer */
-char *grpc_subchannel_call_get_peer(grpc_subchannel_call *subchannel_call);
+char *grpc_subchannel_call_get_peer(grpc_subchannel_call *subchannel_call,
+ grpc_call_list *call_list);
struct grpc_subchannel_args {
/** Channel filters for this channel - wrapped factories will likely
diff --git a/src/core/client_config/subchannel_factory.h b/src/core/client_config/subchannel_factory.h
index d7eae1c964..b588580edb 100644
--- a/src/core/client_config/subchannel_factory.h
+++ b/src/core/client_config/subchannel_factory.h
@@ -50,7 +50,8 @@ struct grpc_subchannel_factory_vtable {
void (*ref)(grpc_subchannel_factory *factory);
void (*unref)(grpc_subchannel_factory *factory);
grpc_subchannel *(*create_subchannel)(grpc_subchannel_factory *factory,
- grpc_subchannel_args *args);
+ grpc_subchannel_args *args,
+ grpc_call_list *call_list);
};
void grpc_subchannel_factory_ref(grpc_subchannel_factory *factory);
@@ -58,6 +59,7 @@ void grpc_subchannel_factory_unref(grpc_subchannel_factory *factory);
/** Create a new grpc_subchannel */
grpc_subchannel *grpc_subchannel_factory_create_subchannel(
- grpc_subchannel_factory *factory, grpc_subchannel_args *args);
+ grpc_subchannel_factory *factory, grpc_subchannel_args *args,
+ grpc_call_list *call_list);
#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_SUBCHANNEL_FACTORY_H */