aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/channel
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/channel')
-rw-r--r--src/core/channel/client_channel.c94
-rw-r--r--src/core/channel/connectivity_state.c3
2 files changed, 39 insertions, 58 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index 4d082aceb8..19700a90a6 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -38,6 +38,7 @@
#include "src/core/channel/channel_args.h"
#include "src/core/channel/connected_channel.h"
+#include "src/core/channel/connectivity_state.h"
#include "src/core/iomgr/iomgr.h"
#include "src/core/iomgr/pollset_set.h"
#include "src/core/support/string.h"
@@ -68,8 +69,7 @@ typedef struct {
/** resolver callback */
grpc_iomgr_closure on_config_changed;
/** connectivity state being tracked */
- grpc_iomgr_closure *on_connectivity_state_change;
- grpc_connectivity_state *connectivity_state;
+ grpc_connectivity_state_tracker state_tracker;
} channel_data;
typedef enum {
@@ -98,60 +98,6 @@ struct call_data {
grpc_linked_mdelem details;
};
-#if 0
-static int prepare_activate(grpc_call_element *elem,
- grpc_child_channel *on_child) {
- call_data *calld = elem->call_data;
- channel_data *chand = elem->channel_data;
- if (calld->state == CALL_CANCELLED) return 0;
-
- /* no more access to calld->s.waiting allowed */
- GPR_ASSERT(calld->state == CALL_WAITING);
-
- if (calld->waiting_op.bind_pollset) {
- grpc_transport_setup_del_interested_party(chand->transport_setup,
- calld->waiting_op.bind_pollset);
- }
-
- calld->state = CALL_ACTIVE;
-
- /* create a child call */
- /* TODO(ctiller): pass the waiting op down here */
- calld->s.active.child_call =
- grpc_child_channel_create_call(on_child, elem, NULL);
-
- return 1;
-}
-
-static void complete_activate(grpc_call_element *elem, grpc_transport_stream_op *op) {
- call_data *calld = elem->call_data;
- grpc_call_element *child_elem =
- grpc_child_call_get_top_element(calld->s.active.child_call);
-
- GPR_ASSERT(calld->state == CALL_ACTIVE);
-
- /* continue the start call down the stack, this nees to happen after metadata
- are flushed*/
- child_elem->filter->start_transport_op(child_elem, op);
-}
-
-static void remove_waiting_child(channel_data *chand, call_data *calld) {
- size_t new_count;
- size_t i;
- for (i = 0, new_count = 0; i < chand->waiting_child_count; i++) {
- if (chand->waiting_children[i] == calld) {
- grpc_transport_setup_del_interested_party(
- chand->transport_setup, calld->waiting_op.bind_pollset);
- continue;
- }
- chand->waiting_children[new_count++] = chand->waiting_children[i];
- }
- GPR_ASSERT(new_count == chand->waiting_child_count - 1 ||
- new_count == chand->waiting_child_count);
- chand->waiting_child_count = new_count;
-}
-#endif
-
static void handle_op_after_cancellation(grpc_call_element *elem,
grpc_transport_stream_op *op) {
call_data *calld = elem->call_data;
@@ -426,7 +372,39 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
}
}
-static void cc_start_transport_op(grpc_channel_element *elem, grpc_transport_op *op) {}
+static void cc_start_transport_op(grpc_channel_element *elem, grpc_transport_op *op) {
+ grpc_lb_policy *lb_policy = NULL;
+ channel_data *chand = elem->channel_data;
+ grpc_iomgr_closure *on_consumed = op->on_consumed;
+ op->on_consumed = NULL;
+
+ GPR_ASSERT(op->set_accept_stream == NULL);
+ GPR_ASSERT(op->bind_pollset == NULL);
+
+ gpr_mu_lock(&chand->mu_config);
+ if (op->on_connectivity_state_change != NULL) {
+ grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, op->connectivity_state, op->on_connectivity_state_change);
+ op->on_connectivity_state_change = NULL;
+ op->connectivity_state = NULL;
+ }
+
+ if (!is_empty(op, sizeof(*op))) {
+ lb_policy = chand->lb_policy;
+ if (lb_policy) {
+ grpc_lb_policy_ref(lb_policy);
+ }
+ }
+ gpr_mu_unlock(&chand->mu_config);
+
+ if (lb_policy) {
+ grpc_lb_policy_broadcast(lb_policy, op);
+ grpc_lb_policy_unref(lb_policy);
+ }
+
+ if (on_consumed) {
+ grpc_iomgr_add_callback(on_consumed);
+ }
+}
/* Constructor for call_data */
static void init_call_elem(grpc_call_element *elem,
@@ -458,7 +436,7 @@ static void destroy_call_elem(grpc_call_element *elem) {
case CALL_ACTIVE:
subchannel_call = calld->subchannel_call;
gpr_mu_unlock(&calld->mu_state);
- grpc_subchannel_call_unref(subchannel_call);
+ GRPC_SUBCHANNEL_CALL_UNREF(subchannel_call, "channel");
break;
case CALL_CREATED:
case CALL_CANCELLED:
diff --git a/src/core/channel/connectivity_state.c b/src/core/channel/connectivity_state.c
index 566a2c3344..0ee268ee59 100644
--- a/src/core/channel/connectivity_state.c
+++ b/src/core/channel/connectivity_state.c
@@ -75,6 +75,9 @@ int grpc_connectivity_state_notify_on_state_change(grpc_connectivity_state_track
void grpc_connectivity_state_set(grpc_connectivity_state_tracker *tracker, grpc_connectivity_state state) {
grpc_connectivity_state_watcher *new = NULL;
grpc_connectivity_state_watcher *w;
+ if (tracker->current_state == state) {
+ return;
+ }
tracker->current_state = state;
while ((w = tracker->watchers)) {
tracker->watchers = w->next;