aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-06-29 14:36:42 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-06-29 14:36:42 -0700
commit98465035671778ea65891a28bc2c01776a6418cc (patch)
tree3c3de904fc24b462552a00e5081e0f52d93aef5a /src/core
parentcb22184aef0b344719468317078b869fe26b83c6 (diff)
Debugging
Diffstat (limited to 'src/core')
-rw-r--r--src/core/channel/channel_stack.c4
-rw-r--r--src/core/channel/channel_stack.h3
-rw-r--r--src/core/channel/client_channel.c62
-rw-r--r--src/core/channel/connected_channel.c2
-rw-r--r--src/core/channel/http_client_filter.c2
-rw-r--r--src/core/channel/http_server_filter.c2
-rw-r--r--src/core/channel/noop_filter.c2
-rw-r--r--src/core/client_config/resolver.c26
-rw-r--r--src/core/client_config/resolver.h25
-rw-r--r--src/core/client_config/resolvers/dns_resolver.c33
-rw-r--r--src/core/client_config/resolvers/unix_resolver_posix.c23
-rw-r--r--src/core/client_config/subchannel.c13
-rw-r--r--src/core/client_config/subchannel.h2
-rw-r--r--src/core/iomgr/iomgr.c11
-rw-r--r--src/core/security/client_auth_filter.c2
-rw-r--r--src/core/security/server_auth_filter.c2
-rw-r--r--src/core/surface/channel.c12
-rw-r--r--src/core/surface/channel_create.c4
-rw-r--r--src/core/surface/lame_client.c2
-rw-r--r--src/core/surface/server.c2
-rw-r--r--src/core/transport/connectivity_state.c4
21 files changed, 160 insertions, 78 deletions
diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c
index ff1077ce4c..0810a61cd0 100644
--- a/src/core/channel/channel_stack.c
+++ b/src/core/channel/channel_stack.c
@@ -102,7 +102,7 @@ grpc_call_element *grpc_call_stack_element(grpc_call_stack *call_stack,
}
void grpc_channel_stack_init(const grpc_channel_filter **filters,
- size_t filter_count, const grpc_channel_args *args,
+ size_t filter_count, grpc_channel *master, const grpc_channel_args *args,
grpc_mdctx *metadata_context,
grpc_channel_stack *stack) {
size_t call_size =
@@ -122,7 +122,7 @@ void grpc_channel_stack_init(const grpc_channel_filter **filters,
for (i = 0; i < filter_count; i++) {
elems[i].filter = filters[i];
elems[i].channel_data = user_data;
- elems[i].filter->init_channel_elem(&elems[i], args, metadata_context,
+ elems[i].filter->init_channel_elem(&elems[i], master, args, metadata_context,
i == 0, i == (filter_count - 1));
user_data += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_channel_data);
call_size += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_call_data);
diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h
index 5ac2372f3a..6db98815df 100644
--- a/src/core/channel/channel_stack.h
+++ b/src/core/channel/channel_stack.h
@@ -97,6 +97,7 @@ typedef struct {
useful for asserting correct configuration by upper layer code.
The filter does not need to do any chaining */
void (*init_channel_elem)(grpc_channel_element *elem,
+ grpc_channel *master,
const grpc_channel_args *args,
grpc_mdctx *metadata_context, int is_first,
int is_last);
@@ -151,7 +152,7 @@ size_t grpc_channel_stack_size(const grpc_channel_filter **filters,
size_t filter_count);
/* Initialize a channel stack given some filters */
void grpc_channel_stack_init(const grpc_channel_filter **filters,
- size_t filter_count, const grpc_channel_args *args,
+ size_t filter_count, grpc_channel *master,const grpc_channel_args *args,
grpc_mdctx *metadata_context,
grpc_channel_stack *stack);
/* Destroy a channel stack */
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index e2f8debdfa..ee0d2cd9bd 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -38,6 +38,7 @@
#include "src/core/channel/channel_args.h"
#include "src/core/channel/connected_channel.h"
+#include "src/core/surface/channel.h"
#include "src/core/iomgr/iomgr.h"
#include "src/core/iomgr/pollset_set.h"
#include "src/core/support/string.h"
@@ -56,6 +57,8 @@ typedef struct {
grpc_mdctx *mdctx;
/** resolver for this channel */
grpc_resolver *resolver;
+ /** master channel */
+ grpc_channel *master;
/** mutex protecting client configuration, resolution state */
gpr_mu mu_config;
@@ -321,10 +324,6 @@ static void cc_start_transport_stream_op(grpc_call_element *elem,
perform_transport_stream_op(elem, op, 0);
}
-static void update_state_locked(channel_data *chand) {
- gpr_log(GPR_ERROR, "update_state_locked not implemented");
-}
-
static void cc_on_config_changed(void *arg, int iomgr_success) {
channel_data *chand = arg;
grpc_lb_policy *lb_policy = NULL;
@@ -350,31 +349,42 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
}
gpr_mu_unlock(&chand->mu_config);
- while (wakeup_closures) {
- grpc_iomgr_closure *next = wakeup_closures->next;
- grpc_iomgr_add_callback(wakeup_closures);
- wakeup_closures = next;
- }
-
if (old_lb_policy) {
GRPC_LB_POLICY_UNREF(old_lb_policy, "channel");
}
- if (iomgr_success) {
+ gpr_mu_lock(&chand->mu_config);
+ if (iomgr_success && chand->resolver) {
+ grpc_resolver *resolver = chand->resolver;
+ GRPC_RESOLVER_REF(resolver, "channel-next");
+ gpr_mu_unlock(&chand->mu_config);
+ GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
grpc_resolver_next(chand->resolver, &chand->incoming_configuration, &chand->on_config_changed);
+ GRPC_RESOLVER_UNREF(resolver, "channel-next");
} else {
- gpr_mu_lock(&chand->mu_config);
old_resolver = chand->resolver;
chand->resolver = NULL;
- update_state_locked(chand);
+ grpc_connectivity_state_set(&chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE);
gpr_mu_unlock(&chand->mu_config);
- grpc_resolver_unref(old_resolver);
+ if (old_resolver != NULL) {
+ grpc_resolver_shutdown(old_resolver);
+ GRPC_RESOLVER_UNREF(old_resolver, "channel");
+ }
+ }
+
+ while (wakeup_closures) {
+ grpc_iomgr_closure *next = wakeup_closures->next;
+ grpc_iomgr_add_callback(wakeup_closures);
+ wakeup_closures = next;
}
+
+ GRPC_CHANNEL_INTERNAL_UNREF(chand->master, "resolver");
}
static void cc_start_transport_op(grpc_channel_element *elem, grpc_transport_op *op) {
grpc_lb_policy *lb_policy = NULL;
channel_data *chand = elem->channel_data;
+ grpc_resolver *destroy_resolver = NULL;
grpc_iomgr_closure *on_consumed = op->on_consumed;
op->on_consumed = NULL;
@@ -388,6 +398,13 @@ static void cc_start_transport_op(grpc_channel_element *elem, grpc_transport_op
op->connectivity_state = NULL;
}
+ if (op->disconnect && chand->resolver != NULL) {
+ grpc_connectivity_state_set(&chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE);
+ destroy_resolver = chand->resolver;
+ chand->resolver = NULL;
+ op->disconnect = 0;
+ }
+
if (!is_empty(op, sizeof(*op))) {
lb_policy = chand->lb_policy;
if (lb_policy) {
@@ -396,6 +413,11 @@ static void cc_start_transport_op(grpc_channel_element *elem, grpc_transport_op
}
gpr_mu_unlock(&chand->mu_config);
+ if (destroy_resolver) {
+ grpc_resolver_shutdown(destroy_resolver);
+ GRPC_RESOLVER_UNREF(destroy_resolver, "channel");
+ }
+
if (lb_policy) {
grpc_lb_policy_broadcast(lb_policy, op);
GRPC_LB_POLICY_UNREF(lb_policy, "broadcast");
@@ -432,6 +454,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
remove it from the in-flight requests tracked by the child_entry we
picked */
gpr_mu_lock(&calld->mu_state);
+ gpr_log(GPR_DEBUG, "call_elem destroy @ state %d", calld->state);
switch (calld->state) {
case CALL_ACTIVE:
subchannel_call = calld->subchannel_call;
@@ -452,7 +475,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
}
/* Constructor for channel_data */
-static void init_channel_elem(grpc_channel_element *elem,
+static void init_channel_elem(grpc_channel_element *elem,grpc_channel *master,
const grpc_channel_args *args,
grpc_mdctx *metadata_context, int is_first,
int is_last) {
@@ -465,7 +488,10 @@ static void init_channel_elem(grpc_channel_element *elem,
gpr_mu_init(&chand->mu_config);
chand->mdctx = metadata_context;
+ chand->master = master;
grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed, chand);
+
+ grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE);
}
/* Destructor for channel_data */
@@ -473,7 +499,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
channel_data *chand = elem->channel_data;
if (chand->resolver != NULL) {
- grpc_resolver_unref(chand->resolver);
+ grpc_resolver_shutdown(chand->resolver);
+ GRPC_RESOLVER_UNREF(chand->resolver, "channel");
}
if (chand->lb_policy != NULL) {
GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
@@ -494,6 +521,7 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
channel_data *chand = elem->channel_data;
GPR_ASSERT(!chand->resolver);
chand->resolver = resolver;
- grpc_resolver_ref(resolver);
+ GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
+ GRPC_RESOLVER_REF(resolver, "channel");
grpc_resolver_next(resolver, &chand->incoming_configuration, &chand->on_config_changed);
}
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
index 84caecb6b3..99c8a643f6 100644
--- a/src/core/channel/connected_channel.c
+++ b/src/core/channel/connected_channel.c
@@ -103,7 +103,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
}
/* Constructor for channel_data */
-static void init_channel_elem(grpc_channel_element *elem,
+static void init_channel_elem(grpc_channel_element *elem,grpc_channel *master,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
channel_data *cd = (channel_data *)elem->channel_data;
diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c
index 5dec734c8c..3d1fc6a020 100644
--- a/src/core/channel/http_client_filter.c
+++ b/src/core/channel/http_client_filter.c
@@ -170,7 +170,7 @@ static const char *scheme_from_args(const grpc_channel_args *args) {
}
/* Constructor for channel_data */
-static void init_channel_elem(grpc_channel_element *elem,
+static void init_channel_elem(grpc_channel_element *elem,grpc_channel *master,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
/* grab pointers to our data from the channel element */
diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c
index dac53e9bf1..3b1128bef9 100644
--- a/src/core/channel/http_server_filter.c
+++ b/src/core/channel/http_server_filter.c
@@ -229,7 +229,7 @@ static void init_call_elem(grpc_call_element *elem,
static void destroy_call_elem(grpc_call_element *elem) {}
/* Constructor for channel_data */
-static void init_channel_elem(grpc_channel_element *elem,
+static void init_channel_elem(grpc_channel_element *elem,grpc_channel *master,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
/* grab pointers to our data from the channel element */
diff --git a/src/core/channel/noop_filter.c b/src/core/channel/noop_filter.c
index 1478f04a3c..0d9c2e82a8 100644
--- a/src/core/channel/noop_filter.c
+++ b/src/core/channel/noop_filter.c
@@ -95,7 +95,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
}
/* Constructor for channel_data */
-static void init_channel_elem(grpc_channel_element *elem,
+static void init_channel_elem(grpc_channel_element *elem,grpc_channel *master,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
/* grab pointers to our data from the channel element */
diff --git a/src/core/client_config/resolver.c b/src/core/client_config/resolver.c
index 11ba27e58d..bbc0ec4e81 100644
--- a/src/core/client_config/resolver.c
+++ b/src/core/client_config/resolver.c
@@ -33,12 +33,34 @@
#include "src/core/client_config/resolver.h"
+void grpc_resolver_init(grpc_resolver *resolver,
+ const grpc_resolver_vtable *vtable) {
+ resolver->vtable = vtable;
+ gpr_ref_init(&resolver->refs, 1);
+}
+
+#ifdef GRPC_RESOLVER_REFCOUNT_DEBUG
+void grpc_resolver_ref(grpc_resolver *resolver, const char *file, int line,
+ const char *reason) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "RESOLVER:%p ref %d -> %d %s",
+ resolver, (int)resolver->refs.count, (int)resolver->refs.count + 1, reason);
+#else
void grpc_resolver_ref(grpc_resolver *resolver) {
- resolver->vtable->ref(resolver);
+#endif
+ gpr_ref(&resolver->refs);
}
+#ifdef GRPC_RESOLVER_REFCOUNT_DEBUG
+void grpc_resolver_unref(grpc_resolver *resolver, const char *file, int line,
+ const char *reason) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "RESOLVER:%p unref %d -> %d %s",
+ resolver, (int)resolver->refs.count, (int)resolver->refs.count - 1, reason);
+#else
void grpc_resolver_unref(grpc_resolver *resolver) {
- resolver->vtable->unref(resolver);
+#endif
+ if (gpr_unref(&resolver->refs)) {
+ resolver->vtable->destroy(resolver);
+ }
}
void grpc_resolver_shutdown(grpc_resolver *resolver) {
diff --git a/src/core/client_config/resolver.h b/src/core/client_config/resolver.h
index 7776870c08..16b5964eb6 100644
--- a/src/core/client_config/resolver.h
+++ b/src/core/client_config/resolver.h
@@ -45,11 +45,11 @@ typedef struct grpc_resolver_vtable grpc_resolver_vtable;
objects */
struct grpc_resolver {
const grpc_resolver_vtable *vtable;
+ gpr_refcount refs;
};
struct grpc_resolver_vtable {
- void (*ref)(grpc_resolver *resolver);
- void (*unref)(grpc_resolver *resolver);
+ void (*destroy)(grpc_resolver *resolver);
void (*shutdown)(grpc_resolver *resolver);
void (*channel_saw_error)(grpc_resolver *resolver,
struct sockaddr *failing_address,
@@ -58,8 +58,25 @@ struct grpc_resolver_vtable {
grpc_iomgr_closure *on_complete);
};
-void grpc_resolver_ref(grpc_resolver *resolver);
-void grpc_resolver_unref(grpc_resolver *resolver);
+#ifdef GRPC_RESOLVER_REFCOUNT_DEBUG
+#define GRPC_RESOLVER_REF(p, r) \
+ grpc_resolver_ref((p), __FILE__, __LINE__, (r))
+#define GRPC_RESOLVER_UNREF(p, r) \
+ grpc_resolver_unref((p), __FILE__, __LINE__, (r))
+void grpc_resolver_ref(grpc_resolver *policy, const char *file, int line,
+ const char *reason);
+void grpc_resolver_unref(grpc_resolver *policy, const char *file, int line,
+ const char *reason);
+#else
+#define GRPC_RESOLVER_REF(p, r) grpc_resolver_ref((p))
+#define GRPC_RESOLVER_UNREF(p, r) grpc_resolver_unref((p))
+void grpc_resolver_ref(grpc_resolver *policy);
+void grpc_resolver_unref(grpc_resolver *policy);
+#endif
+
+void grpc_resolver_init(grpc_resolver *resolver,
+ const grpc_resolver_vtable *vtable);
+
void grpc_resolver_shutdown(grpc_resolver *resolver);
/** Notification that the channel has seen an error on some address.
diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c
index 8693bcf5eb..c64491ae51 100644
--- a/src/core/client_config/resolvers/dns_resolver.c
+++ b/src/core/client_config/resolvers/dns_resolver.c
@@ -73,13 +73,11 @@ typedef struct {
grpc_client_config *resolved_config;
} dns_resolver;
-static void dns_destroy(dns_resolver *r);
+static void dns_destroy(grpc_resolver *r);
static void dns_start_resolving_locked(dns_resolver *r);
static void dns_maybe_finish_next_locked(dns_resolver *r);
-static void dns_ref(grpc_resolver *r);
-static void dns_unref(grpc_resolver *r);
static void dns_shutdown(grpc_resolver *r);
static void dns_channel_saw_error(grpc_resolver *r,
struct sockaddr *failing_address,
@@ -88,26 +86,13 @@ static void dns_next(grpc_resolver *r, grpc_client_config **target_config,
grpc_iomgr_closure *on_complete);
static const grpc_resolver_vtable dns_resolver_vtable = {
- dns_ref, dns_unref, dns_shutdown, dns_channel_saw_error, dns_next};
-
-static void dns_ref(grpc_resolver *resolver) {
- dns_resolver *r = (dns_resolver *)resolver;
- gpr_ref(&r->refs);
-}
-
-static void dns_unref(grpc_resolver *resolver) {
- dns_resolver *r = (dns_resolver *)resolver;
- if (gpr_unref(&r->refs)) {
- dns_destroy(r);
- }
-}
+ dns_destroy, dns_shutdown, dns_channel_saw_error, dns_next};
static void dns_shutdown(grpc_resolver *resolver) {
dns_resolver *r = (dns_resolver *)resolver;
gpr_mu_lock(&r->mu);
if (r->next_completion != NULL) {
*r->target_config = NULL;
- /* TODO(ctiller): add delayed callback */
grpc_iomgr_add_callback(r->next_completion);
r->next_completion = NULL;
}
@@ -160,8 +145,12 @@ static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) {
lb_policy = r->lb_policy_factory(subchannels, addresses->naddrs);
grpc_client_config_set_lb_policy(config, lb_policy);
GRPC_LB_POLICY_UNREF(lb_policy, "construction");
+ grpc_resolved_addresses_destroy(addresses);
+ gpr_free(subchannels);
}
gpr_mu_lock(&r->mu);
+ GPR_ASSERT(r->resolving);
+ r->resolving = 0;
if (r->resolved_config) {
grpc_client_config_unref(r->resolved_config);
}
@@ -170,11 +159,12 @@ static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) {
dns_maybe_finish_next_locked(r);
gpr_mu_unlock(&r->mu);
- dns_unref(&r->base);
+ GRPC_RESOLVER_UNREF(&r->base, "dns-resolving");
}
static void dns_start_resolving_locked(dns_resolver *r) {
- dns_ref(&r->base);
+ GRPC_RESOLVER_REF(&r->base, "dns-resolving");
+ GPR_ASSERT(!r->resolving);
r->resolving = 1;
grpc_resolve_address(r->name, r->default_port, dns_on_resolved, r);
}
@@ -190,7 +180,8 @@ static void dns_maybe_finish_next_locked(dns_resolver *r) {
}
}
-static void dns_destroy(dns_resolver *r) {
+static void dns_destroy(grpc_resolver *gr) {
+ dns_resolver *r = (dns_resolver *)gr;
gpr_mu_destroy(&r->mu);
if (r->resolved_config) {
grpc_client_config_unref(r->resolved_config);
@@ -220,7 +211,7 @@ static grpc_resolver *dns_create(
memset(r, 0, sizeof(*r));
gpr_ref_init(&r->refs, 1);
gpr_mu_init(&r->mu);
- r->base.vtable = &dns_resolver_vtable;
+ grpc_resolver_init(&r->base, &dns_resolver_vtable);
r->name = gpr_strdup(path);
r->default_port = gpr_strdup(default_port);
r->subchannel_factory = subchannel_factory;
diff --git a/src/core/client_config/resolvers/unix_resolver_posix.c b/src/core/client_config/resolvers/unix_resolver_posix.c
index f7498548b1..7f2008685c 100644
--- a/src/core/client_config/resolvers/unix_resolver_posix.c
+++ b/src/core/client_config/resolvers/unix_resolver_posix.c
@@ -71,12 +71,10 @@ typedef struct {
grpc_client_config **target_config;
} unix_resolver;
-static void unix_destroy(unix_resolver *r);
+static void unix_destroy(grpc_resolver *r);
static void unix_maybe_finish_next_locked(unix_resolver *r);
-static void unix_ref(grpc_resolver *r);
-static void unix_unref(grpc_resolver *r);
static void unix_shutdown(grpc_resolver *r);
static void unix_channel_saw_error(grpc_resolver *r,
struct sockaddr *failing_address,
@@ -85,19 +83,7 @@ static void unix_next(grpc_resolver *r, grpc_client_config **target_config,
grpc_iomgr_closure *on_complete);
static const grpc_resolver_vtable unix_resolver_vtable = {
- unix_ref, unix_unref, unix_shutdown, unix_channel_saw_error, unix_next};
-
-static void unix_ref(grpc_resolver *resolver) {
- unix_resolver *r = (unix_resolver *)resolver;
- gpr_ref(&r->refs);
-}
-
-static void unix_unref(grpc_resolver *resolver) {
- unix_resolver *r = (unix_resolver *)resolver;
- if (gpr_unref(&r->refs)) {
- unix_destroy(r);
- }
-}
+ unix_destroy, unix_shutdown, unix_channel_saw_error, unix_next};
static void unix_shutdown(grpc_resolver *resolver) {
unix_resolver *r = (unix_resolver *)resolver;
@@ -149,7 +135,8 @@ static void unix_maybe_finish_next_locked(unix_resolver *r) {
}
}
-static void unix_destroy(unix_resolver *r) {
+static void unix_destroy(grpc_resolver *gr) {
+ unix_resolver *r = (unix_resolver*)gr;
gpr_mu_destroy(&r->mu);
grpc_subchannel_factory_unref(r->subchannel_factory);
gpr_free(r);
@@ -171,7 +158,7 @@ static grpc_resolver *unix_create(
memset(r, 0, sizeof(*r));
gpr_ref_init(&r->refs, 1);
gpr_mu_init(&r->mu);
- r->base.vtable = &unix_resolver_vtable;
+ grpc_resolver_init(&r->base, &unix_resolver_vtable);
r->subchannel_factory = subchannel_factory;
r->lb_policy_factory = lb_policy_factory;
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index a0d21d99eb..6f4bf2ebe8 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -78,6 +78,8 @@ struct grpc_subchannel {
size_t addr_len;
/** metadata context */
grpc_mdctx *mdctx;
+ /** master channel */
+ grpc_channel *master;
/** set during connection */
grpc_connect_out_args connecting_result;
@@ -217,6 +219,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
c->addr_len = args->addr_len;
c->args = grpc_channel_args_copy(args->args);
c->mdctx = args->mdctx;
+ c->master = args->master;
grpc_mdctx_ref(c->mdctx);
grpc_pollset_set_init(&c->pollset_set);
grpc_iomgr_closure_init(&c->connected, subchannel_connected, c);
@@ -267,6 +270,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c,
w4c->initial_op = *initial_op;
w4c->target = target;
w4c->subchannel = c;
+ /* released when clearing w4c */
subchannel_ref_locked(c);
grpc_iomgr_closure_init(&w4c->continuation, continue_creating_call, w4c);
c->waiting = w4c;
@@ -274,6 +278,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c,
if (!c->connecting) {
c->connecting = 1;
connectivity_state_changed_locked(c);
+ /* released by connection */
subchannel_ref_locked(c);
gpr_mu_unlock(&c->mu);
@@ -301,6 +306,7 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
notify)) {
do_connect = 1;
c->connecting = 1;
+ /* released by connection */
subchannel_ref_locked(c);
grpc_connectivity_state_set(&c->state_tracker,
compute_connectivity_locked(c));
@@ -313,7 +319,8 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
void grpc_subchannel_process_transport_op(grpc_subchannel *c,
grpc_transport_op *op) {
- abort(); /* not implemented */
+ gpr_log(GPR_ERROR, "grpc_subchannel_process_transport_op not implemented");
+ abort();
}
static void on_state_changed(void *p, int iomgr_success) {
@@ -357,6 +364,7 @@ static void on_state_changed(void *p, int iomgr_success) {
break;
case GRPC_CHANNEL_TRANSIENT_FAILURE:
/* things are starting to go wrong, reconnect but don't deactivate */
+ /* released by connection */
subchannel_ref_locked(c);
do_connect = 1;
c->connecting = 1;
@@ -406,8 +414,9 @@ static void publish_transport(grpc_subchannel *c) {
stk = (grpc_channel_stack *)(con + 1);
con->refs = 0;
con->subchannel = c;
- grpc_channel_stack_init(filters, num_filters, c->args, c->mdctx, stk);
+ grpc_channel_stack_init(filters, num_filters, c->master, c->args, c->mdctx, stk);
grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
+ gpr_free(c->connecting_result.filters);
memset(&c->connecting_result, 0, sizeof(c->connecting_result));
/* initialize state watcher */
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h
index b777e51d20..766258846a 100644
--- a/src/core/client_config/subchannel.h
+++ b/src/core/client_config/subchannel.h
@@ -90,6 +90,8 @@ struct grpc_subchannel_args {
size_t addr_len;
/** metadata context to use */
grpc_mdctx *mdctx;
+ /** master channel */
+ grpc_channel *master;
};
/** create a subchannel given a connector */
diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c
index 2765706de8..8fbddd73b0 100644
--- a/src/core/iomgr/iomgr.c
+++ b/src/core/iomgr/iomgr.c
@@ -201,10 +201,21 @@ void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb,
closure->next = NULL;
}
+static void assert_not_scheduled_locked(grpc_iomgr_closure *closure) {
+#ifndef NDEBUG
+ grpc_iomgr_closure *c;
+
+ for (c = g_cbs_head; c; c = c->next) {
+ GPR_ASSERT(c != closure);
+ }
+#endif
+}
+
void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *closure, int success) {
closure->success = success;
GPR_ASSERT(closure->cb);
gpr_mu_lock(&g_mu);
+ assert_not_scheduled_locked(closure);
closure->next = NULL;
if (!g_cbs_tail) {
g_cbs_head = g_cbs_tail = closure;
diff --git a/src/core/security/client_auth_filter.c b/src/core/security/client_auth_filter.c
index 2c05f73df5..0bd370e457 100644
--- a/src/core/security/client_auth_filter.c
+++ b/src/core/security/client_auth_filter.c
@@ -280,7 +280,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
}
/* Constructor for channel_data */
-static void init_channel_elem(grpc_channel_element *elem,
+static void init_channel_elem(grpc_channel_element *elem,grpc_channel *master,
const grpc_channel_args *args,
grpc_mdctx *metadata_context, int is_first,
int is_last) {
diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c
index cf5ce4010e..a92b46c85f 100644
--- a/src/core/security/server_auth_filter.c
+++ b/src/core/security/server_auth_filter.c
@@ -88,7 +88,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
}
/* Constructor for channel_data */
-static void init_channel_elem(grpc_channel_element *elem,
+static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
grpc_security_connector *sc = grpc_find_security_connector_in_args(args);
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 4857912b4f..e85eaf2c05 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -109,8 +109,6 @@ grpc_channel *grpc_channel_create_from_filters(
}
channel->path_string = grpc_mdstr_from_string(mdctx, ":path");
channel->authority_string = grpc_mdstr_from_string(mdctx, ":authority");
- grpc_channel_stack_init(filters, num_filters, args, channel->metadata_context,
- CHANNEL_STACK_FROM_CHANNEL(channel));
gpr_mu_init(&channel->registered_call_mu);
channel->registered_calls = NULL;
@@ -131,6 +129,9 @@ grpc_channel *grpc_channel_create_from_filters(
}
}
+ grpc_channel_stack_init(filters, num_filters, channel, args, channel->metadata_context,
+ CHANNEL_STACK_FROM_CHANNEL(channel));
+
return channel;
}
@@ -237,6 +238,13 @@ void grpc_channel_internal_unref(grpc_channel *channel) {
}
void grpc_channel_destroy(grpc_channel *channel) {
+ grpc_transport_op op;
+ grpc_channel_element *elem;
+ memset(&op, 0, sizeof(op));
+ op.disconnect = 1;
+ elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0);
+ elem->filter->start_transport_op(elem, &op);
+
GRPC_CHANNEL_INTERNAL_UNREF(channel, "channel");
}
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index 494a44725a..0d756a131e 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -155,6 +155,7 @@ grpc_channel *grpc_channel_create(const char *target,
f = gpr_malloc(sizeof(*f));
f->base.vtable = &subchannel_factory_vtable;
gpr_ref_init(&f->refs, 1);
+ grpc_mdctx_ref(mdctx);
f->mdctx = mdctx;
resolver = grpc_resolver_create(target, &f->base);
if (!resolver) {
@@ -163,7 +164,8 @@ grpc_channel *grpc_channel_create(const char *target,
channel = grpc_channel_create_from_filters(filters, n, args, mdctx, 1);
grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel), resolver);
- grpc_resolver_unref(resolver);
+ GRPC_RESOLVER_UNREF(resolver, "create");
+ grpc_subchannel_factory_unref(&f->base);
return channel;
}
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index 5235d3f7f4..c6ac679871 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -105,7 +105,7 @@ static void init_call_elem(grpc_call_element *elem,
static void destroy_call_elem(grpc_call_element *elem) {}
-static void init_channel_elem(grpc_channel_element *elem,
+static void init_channel_elem(grpc_channel_element *elem,grpc_channel *master,
const grpc_channel_args *args, grpc_mdctx *mdctx,
int is_first, int is_last) {
channel_data *chand = elem->channel_data;
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 6d06725bf3..f7d385c7af 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -709,7 +709,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
server_unref(chand->server);
}
-static void init_channel_elem(grpc_channel_element *elem,
+static void init_channel_elem(grpc_channel_element *elem,grpc_channel *master,
const grpc_channel_args *args,
grpc_mdctx *metadata_context, int is_first,
int is_last) {
diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c
index 5cbd67ef3c..8df08af32f 100644
--- a/src/core/transport/connectivity_state.c
+++ b/src/core/transport/connectivity_state.c
@@ -33,15 +33,18 @@
#include "src/core/transport/connectivity_state.h"
#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state init_state) {
tracker->current_state = init_state;
tracker->watchers = NULL;
+ /*gpr_log(GPR_DEBUG, "CS:%p:init:%d", tracker, init_state);*/
}
void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) {
grpc_connectivity_state_watcher *w;
+ /*gpr_log(GPR_DEBUG, "CS:%p:destroy", tracker);*/
while ((w = tracker->watchers)) {
tracker->watchers = w->next;
@@ -80,6 +83,7 @@ void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker,
grpc_connectivity_state state) {
grpc_connectivity_state_watcher *new = NULL;
grpc_connectivity_state_watcher *w;
+ /*gpr_log(GPR_DEBUG, "CS:%p:set:%d", tracker, state);*/
if (tracker->current_state == state) {
return;
}