diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/core/channel/client_channel.c | 94 | ||||
-rw-r--r-- | src/core/channel/connectivity_state.c | 3 | ||||
-rw-r--r-- | src/core/client_config/lb_policies/pick_first.c | 6 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.c | 4 | ||||
-rw-r--r-- | src/core/client_config/lb_policy.h | 2 | ||||
-rw-r--r-- | src/core/client_config/subchannel.c | 94 | ||||
-rw-r--r-- | src/core/client_config/subchannel.h | 22 | ||||
-rw-r--r-- | src/core/surface/channel.c | 28 | ||||
-rw-r--r-- | src/core/surface/channel.h | 2 |
9 files changed, 149 insertions, 106 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 4d082aceb8..19700a90a6 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -38,6 +38,7 @@ #include "src/core/channel/channel_args.h" #include "src/core/channel/connected_channel.h" +#include "src/core/channel/connectivity_state.h" #include "src/core/iomgr/iomgr.h" #include "src/core/iomgr/pollset_set.h" #include "src/core/support/string.h" @@ -68,8 +69,7 @@ typedef struct { /** resolver callback */ grpc_iomgr_closure on_config_changed; /** connectivity state being tracked */ - grpc_iomgr_closure *on_connectivity_state_change; - grpc_connectivity_state *connectivity_state; + grpc_connectivity_state_tracker state_tracker; } channel_data; typedef enum { @@ -98,60 +98,6 @@ struct call_data { grpc_linked_mdelem details; }; -#if 0 -static int prepare_activate(grpc_call_element *elem, - grpc_child_channel *on_child) { - call_data *calld = elem->call_data; - channel_data *chand = elem->channel_data; - if (calld->state == CALL_CANCELLED) return 0; - - /* no more access to calld->s.waiting allowed */ - GPR_ASSERT(calld->state == CALL_WAITING); - - if (calld->waiting_op.bind_pollset) { - grpc_transport_setup_del_interested_party(chand->transport_setup, - calld->waiting_op.bind_pollset); - } - - calld->state = CALL_ACTIVE; - - /* create a child call */ - /* TODO(ctiller): pass the waiting op down here */ - calld->s.active.child_call = - grpc_child_channel_create_call(on_child, elem, NULL); - - return 1; -} - -static void complete_activate(grpc_call_element *elem, grpc_transport_stream_op *op) { - call_data *calld = elem->call_data; - grpc_call_element *child_elem = - grpc_child_call_get_top_element(calld->s.active.child_call); - - GPR_ASSERT(calld->state == CALL_ACTIVE); - - /* continue the start call down the stack, this nees to happen after metadata - are flushed*/ - child_elem->filter->start_transport_op(child_elem, op); -} - -static void remove_waiting_child(channel_data *chand, call_data *calld) { - size_t new_count; - size_t i; - for (i = 0, new_count = 0; i < chand->waiting_child_count; i++) { - if (chand->waiting_children[i] == calld) { - grpc_transport_setup_del_interested_party( - chand->transport_setup, calld->waiting_op.bind_pollset); - continue; - } - chand->waiting_children[new_count++] = chand->waiting_children[i]; - } - GPR_ASSERT(new_count == chand->waiting_child_count - 1 || - new_count == chand->waiting_child_count); - chand->waiting_child_count = new_count; -} -#endif - static void handle_op_after_cancellation(grpc_call_element *elem, grpc_transport_stream_op *op) { call_data *calld = elem->call_data; @@ -426,7 +372,39 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { } } -static void cc_start_transport_op(grpc_channel_element *elem, grpc_transport_op *op) {} +static void cc_start_transport_op(grpc_channel_element *elem, grpc_transport_op *op) { + grpc_lb_policy *lb_policy = NULL; + channel_data *chand = elem->channel_data; + grpc_iomgr_closure *on_consumed = op->on_consumed; + op->on_consumed = NULL; + + GPR_ASSERT(op->set_accept_stream == NULL); + GPR_ASSERT(op->bind_pollset == NULL); + + gpr_mu_lock(&chand->mu_config); + if (op->on_connectivity_state_change != NULL) { + grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, op->connectivity_state, op->on_connectivity_state_change); + op->on_connectivity_state_change = NULL; + op->connectivity_state = NULL; + } + + if (!is_empty(op, sizeof(*op))) { + lb_policy = chand->lb_policy; + if (lb_policy) { + grpc_lb_policy_ref(lb_policy); + } + } + gpr_mu_unlock(&chand->mu_config); + + if (lb_policy) { + grpc_lb_policy_broadcast(lb_policy, op); + grpc_lb_policy_unref(lb_policy); + } + + if (on_consumed) { + grpc_iomgr_add_callback(on_consumed); + } +} /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, @@ -458,7 +436,7 @@ static void destroy_call_elem(grpc_call_element *elem) { case CALL_ACTIVE: subchannel_call = calld->subchannel_call; gpr_mu_unlock(&calld->mu_state); - grpc_subchannel_call_unref(subchannel_call); + GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "channel"); break; case CALL_CREATED: case CALL_CANCELLED: diff --git a/src/core/channel/connectivity_state.c b/src/core/channel/connectivity_state.c index 566a2c3344..0ee268ee59 100644 --- a/src/core/channel/connectivity_state.c +++ b/src/core/channel/connectivity_state.c @@ -75,6 +75,9 @@ int grpc_connectivity_state_notify_on_state_change(grpc_connectivity_state_track void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state) { grpc_connectivity_state_watcher *new = NULL; grpc_connectivity_state_watcher *w; + if (tracker->current_state == state) { + return; + } tracker->current_state = state; while ((w = tracker->watchers)) { tracker->watchers = w->next; diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index 4fda07a63f..e6e743fa7a 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -173,7 +173,7 @@ loop: p->checking_subchannel %= p->num_subchannels; p->checking_connectivity = grpc_subchannel_check_connectivity(p->subchannels[p->checking_subchannel]); p->num_subchannels--; - grpc_subchannel_unref(p->subchannels[p->num_subchannels]); + GRPC_SUBCHANNEL_UNREF(p->subchannels[p->num_subchannels], "pick_first"); add_interested_parties_locked(p); if (p->num_subchannels == 0) { abort(); @@ -199,13 +199,13 @@ static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) { subchannels = gpr_malloc(n * sizeof(*subchannels)); for (i = 0; i < n; i++) { subchannels[i] = p->subchannels[i]; - grpc_subchannel_ref(subchannels[i]); + GRPC_SUBCHANNEL_REF(subchannels[i], "broadcast"); } gpr_mu_unlock(&p->mu); for (i = 0; i < n; i++) { grpc_subchannel_process_transport_op(subchannels[i], op); - grpc_subchannel_unref(subchannels[i]); + GRPC_SUBCHANNEL_UNREF(subchannels[i], "broadcast"); } gpr_free(subchannels); } diff --git a/src/core/client_config/lb_policy.c b/src/core/client_config/lb_policy.c index 5a2b70b94e..2daba14c2a 100644 --- a/src/core/client_config/lb_policy.c +++ b/src/core/client_config/lb_policy.c @@ -49,3 +49,7 @@ void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset, grpc_iomgr_closure *on_complete) { policy->vtable->pick(policy, pollset, initial_metadata, target, on_complete); } + +void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op) { + policy->vtable->broadcast(policy, op); +} diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h index 42be9152cb..dcefa6e27e 100644 --- a/src/core/client_config/lb_policy.h +++ b/src/core/client_config/lb_policy.h @@ -85,4 +85,6 @@ void grpc_lb_policy_pick(grpc_lb_policy *policy, grpc_pollset *pollset, grpc_subchannel **target, grpc_iomgr_closure *on_complete); +void grpc_lb_policy_broadcast(grpc_lb_policy *policy, grpc_transport_op *op); + #endif /* GRPC_INTERNAL_CORE_CONFIG_LB_POLICY_H */ diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 5cbb5d9971..e441befc0c 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -106,13 +106,67 @@ static gpr_timespec compute_connect_deadline(grpc_subchannel *c); static void subchannel_connected(void *subchannel, int iomgr_success); /* + * connection implementation + */ + +#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG +#define CONNECTION_REF(c, r) connection_ref((c), __FILE__, __LINE__, (r)) +#define CONNECTION_UNREF(c, r) connection_unref((c), __FILE__, __LINE__, (r)) +#else +#define CONNECTION_REF(c, r) connection_ref((c)) +#define CONNECTION_UNREF(c, r) connection_unref((c)) +#endif + +#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG +static void connection_ref(connection *c, const char *file, int line, const char *reason) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCONN:%p ref %d -> %d %s", + c, (int)c->refs.count, (int)c->refs.count + 1, + reason); +#else +static void connection_ref(connection *c) { +#endif + gpr_ref(&c->refs); +} + +#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG +static void connection_unref(connection *c, const char *file, int line, const char *reason) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCONN:%p unref %d -> %d %s", + c, (int)c->refs.count, (int)c->refs.count - 1, + reason); +#else +static void connection_unref(connection *c) { +#endif + if (gpr_unref(&c->refs)) { + GRPC_SUBCHANNEL_UNREF(c->subchannel, "connection"); + gpr_free(c); + } +} + +/* * grpc_subchannel implementation */ -void grpc_subchannel_ref(grpc_subchannel *c) { gpr_ref(&c->refs); } +#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG +void grpc_subchannel_ref(grpc_subchannel *c, const char *file, int line, const char *reason) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCHAN:%p ref %d -> %d %s", + c, (int)c->refs.count, (int)c->refs.count + 1, + reason); +#else +void grpc_subchannel_ref(grpc_subchannel *c) { +#endif + gpr_ref(&c->refs); +} +#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG +void grpc_subchannel_unref(grpc_subchannel *c, const char *file, int line, const char *reason) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCHAN:%p unref %d -> %d %s", + c, (int)c->refs.count, (int)c->refs.count - 1, + reason); +#else void grpc_subchannel_unref(grpc_subchannel *c) { +#endif if (gpr_unref(&c->refs)) { + if (c->active != NULL) CONNECTION_UNREF(c->active, "subchannel"); gpr_free(c->filters); grpc_channel_args_destroy(c->args); gpr_free(c->addr); @@ -178,7 +232,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c, gpr_mu_lock(&c->mu); if (c->active != NULL) { con = c->active; - gpr_ref(&con->refs); + CONNECTION_REF(con, "call"); gpr_mu_unlock(&c->mu); *target = create_call(con, initial_op); @@ -194,7 +248,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c, if (!c->connecting) { c->connecting = 1; connectivity_state_changed_locked(c); - grpc_subchannel_ref(c); + GRPC_SUBCHANNEL_REF(c, "connection"); gpr_mu_unlock(&c->mu); start_connect(c); @@ -220,7 +274,7 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *c, if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state, notify)) { do_connect = 1; c->connecting = 1; - grpc_subchannel_ref(c); + GRPC_SUBCHANNEL_REF(c, "connection"); grpc_connectivity_state_set(&c->state_tracker, compute_connectivity_locked(c)); } gpr_mu_unlock(&c->mu); @@ -275,7 +329,7 @@ static void subchannel_connected(void *arg, int iomgr_success) { if (c->connecting_result.transport) { publish_transport(c); } else { - grpc_subchannel_unref(c); + GRPC_SUBCHANNEL_UNREF(c, "connection"); /* TODO(ctiller): retry after sleeping */ abort(); } @@ -304,17 +358,29 @@ static void connectivity_state_changed_locked(grpc_subchannel *c) { * grpc_subchannel_call implementation */ -void grpc_subchannel_call_ref(grpc_subchannel_call *call) { - gpr_ref(&call->refs); +#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG +void grpc_subchannel_call_ref(grpc_subchannel_call *c, const char *file, int line, const char *reason) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCALL:%p ref %d -> %d %s", + c, (int)c->refs.count, (int)c->refs.count + 1, + reason); +#else +void grpc_subchannel_call_ref(grpc_subchannel_call *c) { +#endif + gpr_ref(&c->refs); } -void grpc_subchannel_call_unref(grpc_subchannel_call *call) { - if (gpr_unref(&call->refs)) { - grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(call)); - if (gpr_unref(&call->connection->refs)) { - gpr_free(call->connection); - } - gpr_free(call); +#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG +void grpc_subchannel_call_unref(grpc_subchannel_call *c, const char *file, int line, const char *reason) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SUBCALL:%p unref %d -> %d %s", + c, (int)c->refs.count, (int)c->refs.count - 1, + reason); +#else +void grpc_subchannel_call_unref(grpc_subchannel_call *c) { +#endif + if (gpr_unref(&c->refs)) { + grpc_call_stack_destroy(SUBCHANNEL_CALL_TO_CALL_STACK(c)); + CONNECTION_UNREF(c->connection, "call"); + gpr_free(c); } } diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h index 60b95d3d8f..7cdcccce8f 100644 --- a/src/core/client_config/subchannel.h +++ b/src/core/client_config/subchannel.h @@ -37,14 +37,33 @@ #include "src/core/channel/channel_stack.h" #include "src/core/client_config/connector.h" +#define GRPC_SUBCHANNEL_REFCOUNT_DEBUG + /** A (sub-)channel that knows how to connect to exactly one target address. Provides a target for load balancing. */ typedef struct grpc_subchannel grpc_subchannel; typedef struct grpc_subchannel_call grpc_subchannel_call; typedef struct grpc_subchannel_args grpc_subchannel_args; +#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG +#define GRPC_SUBCHANNEL_REF(c, r) grpc_subchannel_ref((c), __FILE__, __LINE__, (r)) +#define GRPC_SUBCHANNEL_UNREF(c, r) grpc_subchannel_unref((c), __FILE__, __LINE__, (r)) +#define GRPC_SUBCHANNEL_CALL_REF(c, r) grpc_subchannel_call_ref((c), __FILE__, __LINE__, (r)) +#define GRPC_SUBCHANNEL_CALL_UNREF(c, r) grpc_subchannel_call_unref((c), __FILE__, __LINE__, (r)) +void grpc_subchannel_ref(grpc_subchannel *channel, const char *file, int line, const char *reason); +void grpc_subchannel_unref(grpc_subchannel *channel, const char *file, int line, const char *reason); +void grpc_subchannel_call_ref(grpc_subchannel_call *call, const char *file, int line, const char *reason); +void grpc_subchannel_call_unref(grpc_subchannel_call *call, const char *file, int line, const char *reason); +#else +#define GRPC_SUBCHANNEL_REF(c, r) grpc_subchannel_ref((c)) +#define GRPC_SUBCHANNEL_UNREF(c, r) grpc_subchannel_unref((c)) +#define GRPC_SUBCHANNEL_CALL_REF(c, r) grpc_subchannel_call_ref((c)) +#define GRPC_SUBCHANNEL_CALL_UNREF(c, r) grpc_subchannel_call_unref((c)) void grpc_subchannel_ref(grpc_subchannel *channel); void grpc_subchannel_unref(grpc_subchannel *channel); +void grpc_subchannel_call_ref(grpc_subchannel_call *call); +void grpc_subchannel_call_unref(grpc_subchannel_call *call); +#endif /** construct a call (possibly asynchronously) */ void grpc_subchannel_create_call(grpc_subchannel *subchannel, @@ -55,9 +74,6 @@ void grpc_subchannel_create_call(grpc_subchannel *subchannel, /** process a transport level op */ void grpc_subchannel_process_transport_op(grpc_subchannel *subchannel, grpc_transport_op *op); -void grpc_subchannel_call_ref(grpc_subchannel_call *call); -void grpc_subchannel_call_unref(grpc_subchannel_call *call); - /** poll the current connectivity state of a channel */ grpc_connectivity_state grpc_subchannel_check_connectivity( grpc_subchannel *channel); diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c index 8c136af841..4857912b4f 100644 --- a/src/core/surface/channel.c +++ b/src/core/surface/channel.c @@ -93,9 +93,8 @@ grpc_channel *grpc_channel_create_from_filters( grpc_channel *channel = gpr_malloc(size); GPR_ASSERT(grpc_is_initialized() && "call grpc_init()"); channel->is_client = is_client; - /* decremented by grpc_channel_destroy, and grpc_client_channel_closed if - * is_client */ - gpr_ref_init(&channel->refs, 1 + is_client); + /* decremented by grpc_channel_destroy */ + gpr_ref_init(&channel->refs, 1); channel->metadata_context = mdctx; channel->grpc_status_string = grpc_mdstr_from_string(mdctx, "grpc-status"); channel->grpc_compression_level_string = @@ -237,33 +236,10 @@ void grpc_channel_internal_unref(grpc_channel *channel) { } } -static void default_consumed(void *arg, int iomgr_success) { - grpc_channel *channel = arg; - GRPC_CHANNEL_INTERNAL_UNREF(channel, "op"); -} - -static void execute_op(grpc_channel *channel, grpc_transport_op *op) { - grpc_channel_element *elem = grpc_channel_stack_element(CHANNEL_STACK_FROM_CHANNEL(channel), 0); - if (op->on_consumed == NULL) { - GRPC_CHANNEL_INTERNAL_REF(channel, "op"); - op->on_consumed = gpr_malloc(sizeof(*op->on_consumed)); - grpc_iomgr_closure_init(op->on_consumed, default_consumed, channel); - } - elem->filter->start_transport_op(elem, op); -} - void grpc_channel_destroy(grpc_channel *channel) { - grpc_transport_op op; - memset(&op, 0, sizeof(op)); - op.disconnect = 1; - execute_op(channel, &op); GRPC_CHANNEL_INTERNAL_UNREF(channel, "channel"); } -void grpc_client_channel_closed(grpc_channel_element *elem) { - GRPC_CHANNEL_INTERNAL_UNREF(CHANNEL_FROM_TOP_ELEM(elem), "closed"); -} - grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel) { return CHANNEL_STACK_FROM_CHANNEL(channel); } diff --git a/src/core/surface/channel.h b/src/core/surface/channel.h index 516c0ac559..71f8a55731 100644 --- a/src/core/surface/channel.h +++ b/src/core/surface/channel.h @@ -58,8 +58,6 @@ grpc_mdstr *grpc_channel_get_compresssion_level_string(grpc_channel *channel); grpc_mdstr *grpc_channel_get_message_string(grpc_channel *channel); gpr_uint32 grpc_channel_get_max_message_length(grpc_channel *channel); -void grpc_client_channel_closed(grpc_channel_element *elem); - #ifdef GRPC_CHANNEL_REF_COUNT_DEBUG void grpc_channel_internal_ref(grpc_channel *channel, const char *reason); void grpc_channel_internal_unref(grpc_channel *channel, const char *reason); |