diff options
author | 2015-06-29 14:36:42 -0700 | |
---|---|---|
committer | 2015-06-29 14:36:42 -0700 | |
commit | 98465035671778ea65891a28bc2c01776a6418cc (patch) | |
tree | 3c3de904fc24b462552a00e5081e0f52d93aef5a /src/core | |
parent | cb22184aef0b344719468317078b869fe26b83c6 (diff) |
Debugging
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/channel/channel_stack.c | 4 | ||||
-rw-r--r-- | src/core/channel/channel_stack.h | 3 | ||||
-rw-r--r-- | src/core/channel/client_channel.c | 62 | ||||
-rw-r--r-- | src/core/channel/connected_channel.c | 2 | ||||
-rw-r--r-- | src/core/channel/http_client_filter.c | 2 | ||||
-rw-r--r-- | src/core/channel/http_server_filter.c | 2 | ||||
-rw-r--r-- | src/core/channel/noop_filter.c | 2 | ||||
-rw-r--r-- | src/core/client_config/resolver.c | 26 | ||||
-rw-r--r-- | src/core/client_config/resolver.h | 25 | ||||
-rw-r--r-- | src/core/client_config/resolvers/dns_resolver.c | 33 | ||||
-rw-r--r-- | src/core/client_config/resolvers/unix_resolver_posix.c | 23 | ||||
-rw-r--r-- | src/core/client_config/subchannel.c | 13 | ||||
-rw-r--r-- | src/core/client_config/subchannel.h | 2 | ||||
-rw-r--r-- | src/core/iomgr/iomgr.c | 11 | ||||
-rw-r--r-- | src/core/security/client_auth_filter.c | 2 | ||||
-rw-r--r-- | src/core/security/server_auth_filter.c | 2 | ||||
-rw-r--r-- | src/core/surface/channel.c | 12 | ||||
-rw-r--r-- | src/core/surface/channel_create.c | 4 | ||||
-rw-r--r-- | src/core/surface/lame_client.c | 2 | ||||
-rw-r--r-- | src/core/surface/server.c | 2 | ||||
-rw-r--r-- | src/core/transport/connectivity_state.c | 4 |
21 files changed, 160 insertions, 78 deletions
diff --git a/src/core/channel/channel_stack.c b/src/core/channel/channel_stack.c index ff1077ce4c..0810a61cd0 100644 --- a/src/core/channel/channel_stack.c +++ b/src/core/channel/channel_stack.c @@ -102,7 +102,7 @@ grpc_call_element *grpc_call_stack_element(grpc_call_stack *call_stack, } void grpc_channel_stack_init(const grpc_channel_filter **filters, - size_t filter_count, const grpc_channel_args *args, + size_t filter_count, grpc_channel *master, const grpc_channel_args *args, grpc_mdctx *metadata_context, grpc_channel_stack *stack) { size_t call_size = @@ -122,7 +122,7 @@ void grpc_channel_stack_init(const grpc_channel_filter **filters, for (i = 0; i < filter_count; i++) { elems[i].filter = filters[i]; elems[i].channel_data = user_data; - elems[i].filter->init_channel_elem(&elems[i], args, metadata_context, + elems[i].filter->init_channel_elem(&elems[i], master, args, metadata_context, i == 0, i == (filter_count - 1)); user_data += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_channel_data); call_size += ROUND_UP_TO_ALIGNMENT_SIZE(filters[i]->sizeof_call_data); diff --git a/src/core/channel/channel_stack.h b/src/core/channel/channel_stack.h index 5ac2372f3a..6db98815df 100644 --- a/src/core/channel/channel_stack.h +++ b/src/core/channel/channel_stack.h @@ -97,6 +97,7 @@ typedef struct { useful for asserting correct configuration by upper layer code. The filter does not need to do any chaining */ void (*init_channel_elem)(grpc_channel_element *elem, + grpc_channel *master, const grpc_channel_args *args, grpc_mdctx *metadata_context, int is_first, int is_last); @@ -151,7 +152,7 @@ size_t grpc_channel_stack_size(const grpc_channel_filter **filters, size_t filter_count); /* Initialize a channel stack given some filters */ void grpc_channel_stack_init(const grpc_channel_filter **filters, - size_t filter_count, const grpc_channel_args *args, + size_t filter_count, grpc_channel *master,const grpc_channel_args *args, grpc_mdctx *metadata_context, grpc_channel_stack *stack); /* Destroy a channel stack */ diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index e2f8debdfa..ee0d2cd9bd 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -38,6 +38,7 @@ #include "src/core/channel/channel_args.h" #include "src/core/channel/connected_channel.h" +#include "src/core/surface/channel.h" #include "src/core/iomgr/iomgr.h" #include "src/core/iomgr/pollset_set.h" #include "src/core/support/string.h" @@ -56,6 +57,8 @@ typedef struct { grpc_mdctx *mdctx; /** resolver for this channel */ grpc_resolver *resolver; + /** master channel */ + grpc_channel *master; /** mutex protecting client configuration, resolution state */ gpr_mu mu_config; @@ -321,10 +324,6 @@ static void cc_start_transport_stream_op(grpc_call_element *elem, perform_transport_stream_op(elem, op, 0); } -static void update_state_locked(channel_data *chand) { - gpr_log(GPR_ERROR, "update_state_locked not implemented"); -} - static void cc_on_config_changed(void *arg, int iomgr_success) { channel_data *chand = arg; grpc_lb_policy *lb_policy = NULL; @@ -350,31 +349,42 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { } gpr_mu_unlock(&chand->mu_config); - while (wakeup_closures) { - grpc_iomgr_closure *next = wakeup_closures->next; - grpc_iomgr_add_callback(wakeup_closures); - wakeup_closures = next; - } - if (old_lb_policy) { GRPC_LB_POLICY_UNREF(old_lb_policy, "channel"); } - if (iomgr_success) { + gpr_mu_lock(&chand->mu_config); + if (iomgr_success && chand->resolver) { + grpc_resolver *resolver = chand->resolver; + GRPC_RESOLVER_REF(resolver, "channel-next"); + gpr_mu_unlock(&chand->mu_config); + GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); grpc_resolver_next(chand->resolver, &chand->incoming_configuration, &chand->on_config_changed); + GRPC_RESOLVER_UNREF(resolver, "channel-next"); } else { - gpr_mu_lock(&chand->mu_config); old_resolver = chand->resolver; chand->resolver = NULL; - update_state_locked(chand); + grpc_connectivity_state_set(&chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE); gpr_mu_unlock(&chand->mu_config); - grpc_resolver_unref(old_resolver); + if (old_resolver != NULL) { + grpc_resolver_shutdown(old_resolver); + GRPC_RESOLVER_UNREF(old_resolver, "channel"); + } + } + + while (wakeup_closures) { + grpc_iomgr_closure *next = wakeup_closures->next; + grpc_iomgr_add_callback(wakeup_closures); + wakeup_closures = next; } + + GRPC_CHANNEL_INTERNAL_UNREF(chand->master, "resolver"); } static void cc_start_transport_op(grpc_channel_element *elem, grpc_transport_op *op) { grpc_lb_policy *lb_policy = NULL; channel_data *chand = elem->channel_data; + grpc_resolver *destroy_resolver = NULL; grpc_iomgr_closure *on_consumed = op->on_consumed; op->on_consumed = NULL; @@ -388,6 +398,13 @@ static void cc_start_transport_op(grpc_channel_element *elem, grpc_transport_op op->connectivity_state = NULL; } + if (op->disconnect && chand->resolver != NULL) { + grpc_connectivity_state_set(&chand->state_tracker, GRPC_CHANNEL_FATAL_FAILURE); + destroy_resolver = chand->resolver; + chand->resolver = NULL; + op->disconnect = 0; + } + if (!is_empty(op, sizeof(*op))) { lb_policy = chand->lb_policy; if (lb_policy) { @@ -396,6 +413,11 @@ static void cc_start_transport_op(grpc_channel_element *elem, grpc_transport_op } gpr_mu_unlock(&chand->mu_config); + if (destroy_resolver) { + grpc_resolver_shutdown(destroy_resolver); + GRPC_RESOLVER_UNREF(destroy_resolver, "channel"); + } + if (lb_policy) { grpc_lb_policy_broadcast(lb_policy, op); GRPC_LB_POLICY_UNREF(lb_policy, "broadcast"); @@ -432,6 +454,7 @@ static void destroy_call_elem(grpc_call_element *elem) { remove it from the in-flight requests tracked by the child_entry we picked */ gpr_mu_lock(&calld->mu_state); + gpr_log(GPR_DEBUG, "call_elem destroy @ state %d", calld->state); switch (calld->state) { case CALL_ACTIVE: subchannel_call = calld->subchannel_call; @@ -452,7 +475,7 @@ static void destroy_call_elem(grpc_call_element *elem) { } /* Constructor for channel_data */ -static void init_channel_elem(grpc_channel_element *elem, +static void init_channel_elem(grpc_channel_element *elem,grpc_channel *master, const grpc_channel_args *args, grpc_mdctx *metadata_context, int is_first, int is_last) { @@ -465,7 +488,10 @@ static void init_channel_elem(grpc_channel_element *elem, gpr_mu_init(&chand->mu_config); chand->mdctx = metadata_context; + chand->master = master; grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed, chand); + + grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE); } /* Destructor for channel_data */ @@ -473,7 +499,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) { channel_data *chand = elem->channel_data; if (chand->resolver != NULL) { - grpc_resolver_unref(chand->resolver); + grpc_resolver_shutdown(chand->resolver); + GRPC_RESOLVER_UNREF(chand->resolver, "channel"); } if (chand->lb_policy != NULL) { GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel"); @@ -494,6 +521,7 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack, channel_data *chand = elem->channel_data; GPR_ASSERT(!chand->resolver); chand->resolver = resolver; - grpc_resolver_ref(resolver); + GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver"); + GRPC_RESOLVER_REF(resolver, "channel"); grpc_resolver_next(resolver, &chand->incoming_configuration, &chand->on_config_changed); } diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index 84caecb6b3..99c8a643f6 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -103,7 +103,7 @@ static void destroy_call_elem(grpc_call_element *elem) { } /* Constructor for channel_data */ -static void init_channel_elem(grpc_channel_element *elem, +static void init_channel_elem(grpc_channel_element *elem,grpc_channel *master, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_first, int is_last) { channel_data *cd = (channel_data *)elem->channel_data; diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c index 5dec734c8c..3d1fc6a020 100644 --- a/src/core/channel/http_client_filter.c +++ b/src/core/channel/http_client_filter.c @@ -170,7 +170,7 @@ static const char *scheme_from_args(const grpc_channel_args *args) { } /* Constructor for channel_data */ -static void init_channel_elem(grpc_channel_element *elem, +static void init_channel_elem(grpc_channel_element *elem,grpc_channel *master, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_first, int is_last) { /* grab pointers to our data from the channel element */ diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c index dac53e9bf1..3b1128bef9 100644 --- a/src/core/channel/http_server_filter.c +++ b/src/core/channel/http_server_filter.c @@ -229,7 +229,7 @@ static void init_call_elem(grpc_call_element *elem, static void destroy_call_elem(grpc_call_element *elem) {} /* Constructor for channel_data */ -static void init_channel_elem(grpc_channel_element *elem, +static void init_channel_elem(grpc_channel_element *elem,grpc_channel *master, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_first, int is_last) { /* grab pointers to our data from the channel element */ diff --git a/src/core/channel/noop_filter.c b/src/core/channel/noop_filter.c index 1478f04a3c..0d9c2e82a8 100644 --- a/src/core/channel/noop_filter.c +++ b/src/core/channel/noop_filter.c @@ -95,7 +95,7 @@ static void destroy_call_elem(grpc_call_element *elem) { } /* Constructor for channel_data */ -static void init_channel_elem(grpc_channel_element *elem, +static void init_channel_elem(grpc_channel_element *elem,grpc_channel *master, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_first, int is_last) { /* grab pointers to our data from the channel element */ diff --git a/src/core/client_config/resolver.c b/src/core/client_config/resolver.c index 11ba27e58d..bbc0ec4e81 100644 --- a/src/core/client_config/resolver.c +++ b/src/core/client_config/resolver.c @@ -33,12 +33,34 @@ #include "src/core/client_config/resolver.h" +void grpc_resolver_init(grpc_resolver *resolver, + const grpc_resolver_vtable *vtable) { + resolver->vtable = vtable; + gpr_ref_init(&resolver->refs, 1); +} + +#ifdef GRPC_RESOLVER_REFCOUNT_DEBUG +void grpc_resolver_ref(grpc_resolver *resolver, const char *file, int line, + const char *reason) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "RESOLVER:%p ref %d -> %d %s", + resolver, (int)resolver->refs.count, (int)resolver->refs.count + 1, reason); +#else void grpc_resolver_ref(grpc_resolver *resolver) { - resolver->vtable->ref(resolver); +#endif + gpr_ref(&resolver->refs); } +#ifdef GRPC_RESOLVER_REFCOUNT_DEBUG +void grpc_resolver_unref(grpc_resolver *resolver, const char *file, int line, + const char *reason) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "RESOLVER:%p unref %d -> %d %s", + resolver, (int)resolver->refs.count, (int)resolver->refs.count - 1, reason); +#else void grpc_resolver_unref(grpc_resolver *resolver) { - resolver->vtable->unref(resolver); +#endif + if (gpr_unref(&resolver->refs)) { + resolver->vtable->destroy(resolver); + } } void grpc_resolver_shutdown(grpc_resolver *resolver) { diff --git a/src/core/client_config/resolver.h b/src/core/client_config/resolver.h index 7776870c08..16b5964eb6 100644 --- a/src/core/client_config/resolver.h +++ b/src/core/client_config/resolver.h @@ -45,11 +45,11 @@ typedef struct grpc_resolver_vtable grpc_resolver_vtable; objects */ struct grpc_resolver { const grpc_resolver_vtable *vtable; + gpr_refcount refs; }; struct grpc_resolver_vtable { - void (*ref)(grpc_resolver *resolver); - void (*unref)(grpc_resolver *resolver); + void (*destroy)(grpc_resolver *resolver); void (*shutdown)(grpc_resolver *resolver); void (*channel_saw_error)(grpc_resolver *resolver, struct sockaddr *failing_address, @@ -58,8 +58,25 @@ struct grpc_resolver_vtable { grpc_iomgr_closure *on_complete); }; -void grpc_resolver_ref(grpc_resolver *resolver); -void grpc_resolver_unref(grpc_resolver *resolver); +#ifdef GRPC_RESOLVER_REFCOUNT_DEBUG +#define GRPC_RESOLVER_REF(p, r) \ + grpc_resolver_ref((p), __FILE__, __LINE__, (r)) +#define GRPC_RESOLVER_UNREF(p, r) \ + grpc_resolver_unref((p), __FILE__, __LINE__, (r)) +void grpc_resolver_ref(grpc_resolver *policy, const char *file, int line, + const char *reason); +void grpc_resolver_unref(grpc_resolver *policy, const char *file, int line, + const char *reason); +#else +#define GRPC_RESOLVER_REF(p, r) grpc_resolver_ref((p)) +#define GRPC_RESOLVER_UNREF(p, r) grpc_resolver_unref((p)) +void grpc_resolver_ref(grpc_resolver *policy); +void grpc_resolver_unref(grpc_resolver *policy); +#endif + +void grpc_resolver_init(grpc_resolver *resolver, + const grpc_resolver_vtable *vtable); + void grpc_resolver_shutdown(grpc_resolver *resolver); /** Notification that the channel has seen an error on some address. diff --git a/src/core/client_config/resolvers/dns_resolver.c b/src/core/client_config/resolvers/dns_resolver.c index 8693bcf5eb..c64491ae51 100644 --- a/src/core/client_config/resolvers/dns_resolver.c +++ b/src/core/client_config/resolvers/dns_resolver.c @@ -73,13 +73,11 @@ typedef struct { grpc_client_config *resolved_config; } dns_resolver; -static void dns_destroy(dns_resolver *r); +static void dns_destroy(grpc_resolver *r); static void dns_start_resolving_locked(dns_resolver *r); static void dns_maybe_finish_next_locked(dns_resolver *r); -static void dns_ref(grpc_resolver *r); -static void dns_unref(grpc_resolver *r); static void dns_shutdown(grpc_resolver *r); static void dns_channel_saw_error(grpc_resolver *r, struct sockaddr *failing_address, @@ -88,26 +86,13 @@ static void dns_next(grpc_resolver *r, grpc_client_config **target_config, grpc_iomgr_closure *on_complete); static const grpc_resolver_vtable dns_resolver_vtable = { - dns_ref, dns_unref, dns_shutdown, dns_channel_saw_error, dns_next}; - -static void dns_ref(grpc_resolver *resolver) { - dns_resolver *r = (dns_resolver *)resolver; - gpr_ref(&r->refs); -} - -static void dns_unref(grpc_resolver *resolver) { - dns_resolver *r = (dns_resolver *)resolver; - if (gpr_unref(&r->refs)) { - dns_destroy(r); - } -} + dns_destroy, dns_shutdown, dns_channel_saw_error, dns_next}; static void dns_shutdown(grpc_resolver *resolver) { dns_resolver *r = (dns_resolver *)resolver; gpr_mu_lock(&r->mu); if (r->next_completion != NULL) { *r->target_config = NULL; - /* TODO(ctiller): add delayed callback */ grpc_iomgr_add_callback(r->next_completion); r->next_completion = NULL; } @@ -160,8 +145,12 @@ static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) { lb_policy = r->lb_policy_factory(subchannels, addresses->naddrs); grpc_client_config_set_lb_policy(config, lb_policy); GRPC_LB_POLICY_UNREF(lb_policy, "construction"); + grpc_resolved_addresses_destroy(addresses); + gpr_free(subchannels); } gpr_mu_lock(&r->mu); + GPR_ASSERT(r->resolving); + r->resolving = 0; if (r->resolved_config) { grpc_client_config_unref(r->resolved_config); } @@ -170,11 +159,12 @@ static void dns_on_resolved(void *arg, grpc_resolved_addresses *addresses) { dns_maybe_finish_next_locked(r); gpr_mu_unlock(&r->mu); - dns_unref(&r->base); + GRPC_RESOLVER_UNREF(&r->base, "dns-resolving"); } static void dns_start_resolving_locked(dns_resolver *r) { - dns_ref(&r->base); + GRPC_RESOLVER_REF(&r->base, "dns-resolving"); + GPR_ASSERT(!r->resolving); r->resolving = 1; grpc_resolve_address(r->name, r->default_port, dns_on_resolved, r); } @@ -190,7 +180,8 @@ static void dns_maybe_finish_next_locked(dns_resolver *r) { } } -static void dns_destroy(dns_resolver *r) { +static void dns_destroy(grpc_resolver *gr) { + dns_resolver *r = (dns_resolver *)gr; gpr_mu_destroy(&r->mu); if (r->resolved_config) { grpc_client_config_unref(r->resolved_config); @@ -220,7 +211,7 @@ static grpc_resolver *dns_create( memset(r, 0, sizeof(*r)); gpr_ref_init(&r->refs, 1); gpr_mu_init(&r->mu); - r->base.vtable = &dns_resolver_vtable; + grpc_resolver_init(&r->base, &dns_resolver_vtable); r->name = gpr_strdup(path); r->default_port = gpr_strdup(default_port); r->subchannel_factory = subchannel_factory; diff --git a/src/core/client_config/resolvers/unix_resolver_posix.c b/src/core/client_config/resolvers/unix_resolver_posix.c index f7498548b1..7f2008685c 100644 --- a/src/core/client_config/resolvers/unix_resolver_posix.c +++ b/src/core/client_config/resolvers/unix_resolver_posix.c @@ -71,12 +71,10 @@ typedef struct { grpc_client_config **target_config; } unix_resolver; -static void unix_destroy(unix_resolver *r); +static void unix_destroy(grpc_resolver *r); static void unix_maybe_finish_next_locked(unix_resolver *r); -static void unix_ref(grpc_resolver *r); -static void unix_unref(grpc_resolver *r); static void unix_shutdown(grpc_resolver *r); static void unix_channel_saw_error(grpc_resolver *r, struct sockaddr *failing_address, @@ -85,19 +83,7 @@ static void unix_next(grpc_resolver *r, grpc_client_config **target_config, grpc_iomgr_closure *on_complete); static const grpc_resolver_vtable unix_resolver_vtable = { - unix_ref, unix_unref, unix_shutdown, unix_channel_saw_error, unix_next}; - -static void unix_ref(grpc_resolver *resolver) { - unix_resolver *r = (unix_resolver *)resolver; - gpr_ref(&r->refs); -} - -static void unix_unref(grpc_resolver *resolver) { - unix_resolver *r = (unix_resolver *)resolver; - if (gpr_unref(&r->refs)) { - unix_destroy(r); - } -} + unix_destroy, unix_shutdown, unix_channel_saw_error, unix_next}; static void unix_shutdown(grpc_resolver *resolver) { unix_resolver *r = (unix_resolver *)resolver; @@ -149,7 +135,8 @@ static void unix_maybe_finish_next_locked(unix_resolver *r) { } } -static void unix_destroy(unix_resolver *r) { +static void unix_destroy(grpc_resolver *gr) { + unix_resolver *r = (unix_resolver*)gr; gpr_mu_destroy(&r->mu); grpc_subchannel_factory_unref(r->subchannel_factory); gpr_free(r); @@ -171,7 +158,7 @@ static grpc_resolver *unix_create( memset(r, 0, sizeof(*r)); gpr_ref_init(&r->refs, 1); gpr_mu_init(&r->mu); - r->base.vtable = &unix_resolver_vtable; + grpc_resolver_init(&r->base, &unix_resolver_vtable); r->subchannel_factory = subchannel_factory; r->lb_policy_factory = lb_policy_factory; diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index a0d21d99eb..6f4bf2ebe8 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -78,6 +78,8 @@ struct grpc_subchannel { size_t addr_len; /** metadata context */ grpc_mdctx *mdctx; + /** master channel */ + grpc_channel *master; /** set during connection */ grpc_connect_out_args connecting_result; @@ -217,6 +219,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector, c->addr_len = args->addr_len; c->args = grpc_channel_args_copy(args->args); c->mdctx = args->mdctx; + c->master = args->master; grpc_mdctx_ref(c->mdctx); grpc_pollset_set_init(&c->pollset_set); grpc_iomgr_closure_init(&c->connected, subchannel_connected, c); @@ -267,6 +270,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c, w4c->initial_op = *initial_op; w4c->target = target; w4c->subchannel = c; + /* released when clearing w4c */ subchannel_ref_locked(c); grpc_iomgr_closure_init(&w4c->continuation, continue_creating_call, w4c); c->waiting = w4c; @@ -274,6 +278,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c, if (!c->connecting) { c->connecting = 1; connectivity_state_changed_locked(c); + /* released by connection */ subchannel_ref_locked(c); gpr_mu_unlock(&c->mu); @@ -301,6 +306,7 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *c, notify)) { do_connect = 1; c->connecting = 1; + /* released by connection */ subchannel_ref_locked(c); grpc_connectivity_state_set(&c->state_tracker, compute_connectivity_locked(c)); @@ -313,7 +319,8 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *c, void grpc_subchannel_process_transport_op(grpc_subchannel *c, grpc_transport_op *op) { - abort(); /* not implemented */ + gpr_log(GPR_ERROR, "grpc_subchannel_process_transport_op not implemented"); + abort(); } static void on_state_changed(void *p, int iomgr_success) { @@ -357,6 +364,7 @@ static void on_state_changed(void *p, int iomgr_success) { break; case GRPC_CHANNEL_TRANSIENT_FAILURE: /* things are starting to go wrong, reconnect but don't deactivate */ + /* released by connection */ subchannel_ref_locked(c); do_connect = 1; c->connecting = 1; @@ -406,8 +414,9 @@ static void publish_transport(grpc_subchannel *c) { stk = (grpc_channel_stack *)(con + 1); con->refs = 0; con->subchannel = c; - grpc_channel_stack_init(filters, num_filters, c->args, c->mdctx, stk); + grpc_channel_stack_init(filters, num_filters, c->master, c->args, c->mdctx, stk); grpc_connected_channel_bind_transport(stk, c->connecting_result.transport); + gpr_free(c->connecting_result.filters); memset(&c->connecting_result, 0, sizeof(c->connecting_result)); /* initialize state watcher */ diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index b777e51d20..766258846a 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -90,6 +90,8 @@ struct grpc_subchannel_args { size_t addr_len; /** metadata context to use */ grpc_mdctx *mdctx; + /** master channel */ + grpc_channel *master; }; /** create a subchannel given a connector */ diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c index 2765706de8..8fbddd73b0 100644 --- a/src/core/iomgr/iomgr.c +++ b/src/core/iomgr/iomgr.c @@ -201,10 +201,21 @@ void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb, closure->next = NULL; } +static void assert_not_scheduled_locked(grpc_iomgr_closure *closure) { +#ifndef NDEBUG + grpc_iomgr_closure *c; + + for (c = g_cbs_head; c; c = c->next) { + GPR_ASSERT(c != closure); + } +#endif +} + void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *closure, int success) { closure->success = success; GPR_ASSERT(closure->cb); gpr_mu_lock(&g_mu); + assert_not_scheduled_locked(closure); closure->next = NULL; if (!g_cbs_tail) { g_cbs_head = g_cbs_tail = closure; diff --git a/src/core/security/client_auth_filter.c b/src/core/security/client_auth_filter.c index 2c05f73df5..0bd370e457 100644 --- a/src/core/security/client_auth_filter.c +++ b/src/core/security/client_auth_filter.c @@ -280,7 +280,7 @@ static void destroy_call_elem(grpc_call_element *elem) { } /* Constructor for channel_data */ -static void init_channel_elem(grpc_channel_element *elem, +static void init_channel_elem(grpc_channel_element *elem,grpc_channel *master, const grpc_channel_args *args, grpc_mdctx *metadata_context, int is_first, int is_last) { diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c index cf5ce4010e..a92b46c85f 100644 --- a/src/core/security/server_auth_filter.c +++ b/src/core/security/server_auth_filter.c @@ -88,7 +88,7 @@ static void destroy_call_elem(grpc_call_element *elem) { } /* Constructor for channel_data */ -static void init_channel_elem(grpc_channel_element *elem, +static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_first, int is_last) { grpc_security_connector *sc = grpc_find_security_connector_in_args(args); diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index 4857912b4f..e85eaf2c05 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -109,8 +109,6 @@ grpc_channel *grpc_channel_create_from_filters( } channel->path_string = grpc_mdstr_from_string(mdctx, ":path"); channel->authority_string = grpc_mdstr_from_string(mdctx, ":authority"); - grpc_channel_stack_init(filters, num_filters, args, channel->metadata_context, - CHANNEL_STACK_FROM_CHANNEL(channel)); gpr_mu_init(&channel->registered_call_mu); channel->registered_calls = NULL; @@ -131,6 +129,9 @@ grpc_channel *grpc_channel_create_from_filters( } } + grpc_channel_stack_init(filters, num_filters, channel, args, channel->metadata_context, + CHANNEL_STACK_FROM_CHANNEL(channel)); + return channel; } @@ -237,6 +238,13 @@ void grpc_channel_internal_unref(grpc_channel *channel) { } void grpc_channel_destroy(grpc_channel *channel) { + grpc_transport_op op; + grpc_channel_element *elem; + memset(&op, 0, sizeof(op)); + op.disconnect = 1; + elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0); + elem->filter->start_transport_op(elem, &op); + GRPC_CHANNEL_INTERNAL_UNREF(channel, "channel"); } diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c index 494a44725a..0d756a131e 100644 --- a/src/core/surface/channel_create.c +++ b/src/core/surface/channel_create.c @@ -155,6 +155,7 @@ grpc_channel *grpc_channel_create(const char *target, f = gpr_malloc(sizeof(*f)); f->base.vtable = &subchannel_factory_vtable; gpr_ref_init(&f->refs, 1); + grpc_mdctx_ref(mdctx); f->mdctx = mdctx; resolver = grpc_resolver_create(target, &f->base); if (!resolver) { @@ -163,7 +164,8 @@ grpc_channel *grpc_channel_create(const char *target, channel = grpc_channel_create_from_filters(filters, n, args, mdctx, 1); grpc_client_channel_set_resolver(grpc_channel_get_channel_stack(channel), resolver); - grpc_resolver_unref(resolver); + GRPC_RESOLVER_UNREF(resolver, "create"); + grpc_subchannel_factory_unref(&f->base); return channel; } diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c index 5235d3f7f4..c6ac679871 100644 --- a/src/core/surface/lame_client.c +++ b/src/core/surface/lame_client.c @@ -105,7 +105,7 @@ static void init_call_elem(grpc_call_element *elem, static void destroy_call_elem(grpc_call_element *elem) {} -static void init_channel_elem(grpc_channel_element *elem, +static void init_channel_elem(grpc_channel_element *elem,grpc_channel *master, const grpc_channel_args *args, grpc_mdctx *mdctx, int is_first, int is_last) { channel_data *chand = elem->channel_data; diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 6d06725bf3..f7d385c7af 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -709,7 +709,7 @@ static void destroy_call_elem(grpc_call_element *elem) { server_unref(chand->server); } -static void init_channel_elem(grpc_channel_element *elem, +static void init_channel_elem(grpc_channel_element *elem,grpc_channel *master, const grpc_channel_args *args, grpc_mdctx *metadata_context, int is_first, int is_last) { diff --git a/src/core/transport/connectivity_state.c b/src/core/transport/connectivity_state.c index 5cbd67ef3c..8df08af32f 100644 --- a/src/core/transport/connectivity_state.c +++ b/src/core/transport/connectivity_state.c @@ -33,15 +33,18 @@ #include "src/core/transport/connectivity_state.h" #include <grpc/support/alloc.h> +#include <grpc/support/log.h> void grpc_connectivity_state_init(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state init_state) { tracker->current_state = init_state; tracker->watchers = NULL; + /*gpr_log(GPR_DEBUG, "CS:%p:init:%d", tracker, init_state);*/ } void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker *tracker) { grpc_connectivity_state_watcher *w; + /*gpr_log(GPR_DEBUG, "CS:%p:destroy", tracker);*/ while ((w = tracker->watchers)) { tracker->watchers = w->next; @@ -80,6 +83,7 @@ void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state) { grpc_connectivity_state_watcher *new = NULL; grpc_connectivity_state_watcher *w; + /*gpr_log(GPR_DEBUG, "CS:%p:set:%d", tracker, state);*/ if (tracker->current_state == state) { return; } |