diff options
Diffstat (limited to 'src/core/channel')
-rw-r--r-- | src/core/channel/client_channel.c | 94 | ||||
-rw-r--r-- | src/core/channel/connectivity_state.c | 3 |
2 files changed, 39 insertions, 58 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 4d082aceb8..19700a90a6 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -38,6 +38,7 @@ #include "src/core/channel/channel_args.h" #include "src/core/channel/connected_channel.h" +#include "src/core/channel/connectivity_state.h" #include "src/core/iomgr/iomgr.h" #include "src/core/iomgr/pollset_set.h" #include "src/core/support/string.h" @@ -68,8 +69,7 @@ typedef struct { /** resolver callback */ grpc_iomgr_closure on_config_changed; /** connectivity state being tracked */ - grpc_iomgr_closure *on_connectivity_state_change; - grpc_connectivity_state *connectivity_state; + grpc_connectivity_state_tracker state_tracker; } channel_data; typedef enum { @@ -98,60 +98,6 @@ struct call_data { grpc_linked_mdelem details; }; -#if 0 -static int prepare_activate(grpc_call_element *elem, - grpc_child_channel *on_child) { - call_data *calld = elem->call_data; - channel_data *chand = elem->channel_data; - if (calld->state == CALL_CANCELLED) return 0; - - /* no more access to calld->s.waiting allowed */ - GPR_ASSERT(calld->state == CALL_WAITING); - - if (calld->waiting_op.bind_pollset) { - grpc_transport_setup_del_interested_party(chand->transport_setup, - calld->waiting_op.bind_pollset); - } - - calld->state = CALL_ACTIVE; - - /* create a child call */ - /* TODO(ctiller): pass the waiting op down here */ - calld->s.active.child_call = - grpc_child_channel_create_call(on_child, elem, NULL); - - return 1; -} - -static void complete_activate(grpc_call_element *elem, grpc_transport_stream_op *op) { - call_data *calld = elem->call_data; - grpc_call_element *child_elem = - grpc_child_call_get_top_element(calld->s.active.child_call); - - GPR_ASSERT(calld->state == CALL_ACTIVE); - - /* continue the start call down the stack, this nees to happen after metadata - are flushed*/ - child_elem->filter->start_transport_op(child_elem, op); -} - -static void remove_waiting_child(channel_data *chand, call_data *calld) { - size_t new_count; - size_t i; - for (i = 0, new_count = 0; i < chand->waiting_child_count; i++) { - if (chand->waiting_children[i] == calld) { - grpc_transport_setup_del_interested_party( - chand->transport_setup, calld->waiting_op.bind_pollset); - continue; - } - chand->waiting_children[new_count++] = chand->waiting_children[i]; - } - GPR_ASSERT(new_count == chand->waiting_child_count - 1 || - new_count == chand->waiting_child_count); - chand->waiting_child_count = new_count; -} -#endif - static void handle_op_after_cancellation(grpc_call_element *elem, grpc_transport_stream_op *op) { call_data *calld = elem->call_data; @@ -426,7 +372,39 @@ static void cc_on_config_changed(void *arg, int iomgr_success) { } } -static void cc_start_transport_op(grpc_channel_element *elem, grpc_transport_op *op) {} +static void cc_start_transport_op(grpc_channel_element *elem, grpc_transport_op *op) { + grpc_lb_policy *lb_policy = NULL; + channel_data *chand = elem->channel_data; + grpc_iomgr_closure *on_consumed = op->on_consumed; + op->on_consumed = NULL; + + GPR_ASSERT(op->set_accept_stream == NULL); + GPR_ASSERT(op->bind_pollset == NULL); + + gpr_mu_lock(&chand->mu_config); + if (op->on_connectivity_state_change != NULL) { + grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, op->connectivity_state, op->on_connectivity_state_change); + op->on_connectivity_state_change = NULL; + op->connectivity_state = NULL; + } + + if (!is_empty(op, sizeof(*op))) { + lb_policy = chand->lb_policy; + if (lb_policy) { + grpc_lb_policy_ref(lb_policy); + } + } + gpr_mu_unlock(&chand->mu_config); + + if (lb_policy) { + grpc_lb_policy_broadcast(lb_policy, op); + grpc_lb_policy_unref(lb_policy); + } + + if (on_consumed) { + grpc_iomgr_add_callback(on_consumed); + } +} /* Constructor for call_data */ static void init_call_elem(grpc_call_element *elem, @@ -458,7 +436,7 @@ static void destroy_call_elem(grpc_call_element *elem) { case CALL_ACTIVE: subchannel_call = calld->subchannel_call; gpr_mu_unlock(&calld->mu_state); - grpc_subchannel_call_unref(subchannel_call); + GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "channel"); break; case CALL_CREATED: case CALL_CANCELLED: diff --git a/src/core/channel/connectivity_state.c b/src/core/channel/connectivity_state.c index 566a2c3344..0ee268ee59 100644 --- a/src/core/channel/connectivity_state.c +++ b/src/core/channel/connectivity_state.c @@ -75,6 +75,9 @@ int grpc_connectivity_state_notify_on_state_change(grpc_connectivity_state_track void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state) { grpc_connectivity_state_watcher *new = NULL; grpc_connectivity_state_watcher *w; + if (tracker->current_state == state) { + return; + } tracker->current_state = state; while ((w = tracker->watchers)) { tracker->watchers = w->next; |