aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/channel/client_channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/channel/client_channel.c')
-rw-r--r--src/core/channel/client_channel.c149
1 files changed, 132 insertions, 17 deletions
diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c
index ec6ca42889..2cfca399b6 100644
--- a/src/core/channel/client_channel.c
+++ b/src/core/channel/client_channel.c
@@ -40,7 +40,6 @@
#include "src/core/channel/connected_channel.h"
#include "src/core/surface/channel.h"
#include "src/core/iomgr/iomgr.h"
-#include "src/core/iomgr/pollset_set.h"
#include "src/core/support/string.h"
#include "src/core/transport/connectivity_state.h"
#include <grpc/support/alloc.h>
@@ -77,8 +76,22 @@ typedef struct {
grpc_iomgr_closure on_config_changed;
/** connectivity state being tracked */
grpc_connectivity_state_tracker state_tracker;
+ /** when an lb_policy arrives, should we try to exit idle */
+ int exit_idle_when_lb_policy_arrives;
+ /** pollset_set of interested parties in a new connection */
+ grpc_pollset_set pollset_set;
} channel_data;
+/** We create one watcher for each new lb_policy that is returned from a resolver,
+ to watch for state changes from the lb_policy. When a state change is seen, we
+ update the channel, and create a new watcher */
+typedef struct {
+ channel_data *chand;
+ grpc_iomgr_closure on_changed;
+ grpc_connectivity_state state;
+ grpc_lb_policy *lb_policy;
+} lb_policy_connectivity_watcher;
+
typedef enum {
CALL_CREATED,
CALL_WAITING_FOR_SEND,
@@ -408,16 +421,53 @@ static void cc_start_transport_stream_op(grpc_call_element *elem,
perform_transport_stream_op(elem, op, 0);
}
+static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, grpc_connectivity_state current_state);
+
+static void on_lb_policy_state_changed(void *arg, int iomgr_success) {
+ lb_policy_connectivity_watcher *w = arg;
+
+ gpr_mu_lock(&w->chand->mu_config);
+ /* check if the notification is for a stale policy */
+ if (w->lb_policy == w->chand->lb_policy) {
+ grpc_connectivity_state_set(&w->chand->state_tracker, w->state,
+ "lb_changed");
+ if (w->state != GRPC_CHANNEL_FATAL_FAILURE) {
+ watch_lb_policy(w->chand, w->lb_policy, w->state);
+ }
+ }
+ gpr_mu_unlock(&w->chand->mu_config);
+
+ GRPC_CHANNEL_INTERNAL_UNREF(w->chand->master, "watch_lb_policy");
+ gpr_free(w);
+}
+
+static void watch_lb_policy(channel_data *chand, grpc_lb_policy *lb_policy, grpc_connectivity_state current_state) {
+ lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
+ GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy");
+
+ w->chand = chand;
+ grpc_iomgr_closure_init(&w->on_changed, on_lb_policy_state_changed, w);
+ w->state = current_state;
+ w->lb_policy = lb_policy;
+ grpc_lb_policy_notify_on_state_change(lb_policy, &w->state, &w->on_changed);
+}
+
static void cc_on_config_changed(void *arg, int iomgr_success) {
channel_data *chand = arg;
grpc_lb_policy *lb_policy = NULL;
grpc_lb_policy *old_lb_policy;
grpc_resolver *old_resolver;
grpc_iomgr_closure *wakeup_closures = NULL;
+ grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
+ int exit_idle = 0;
if (chand->incoming_configuration != NULL) {
lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration);
- GRPC_LB_POLICY_REF(lb_policy, "channel");
+ if (lb_policy != NULL) {
+ GRPC_LB_POLICY_REF(lb_policy, "channel");
+ GRPC_LB_POLICY_REF(lb_policy, "config_change");
+ state = grpc_lb_policy_check_connectivity(lb_policy);
+ }
grpc_client_config_unref(chand->incoming_configuration);
}
@@ -431,13 +481,12 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
wakeup_closures = chand->waiting_for_config_closures;
chand->waiting_for_config_closures = NULL;
}
- gpr_mu_unlock(&chand->mu_config);
-
- if (old_lb_policy) {
- GRPC_LB_POLICY_UNREF(old_lb_policy, "channel");
+ if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
+ GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
+ exit_idle = 1;
+ chand->exit_idle_when_lb_policy_arrives = 0;
}
- gpr_mu_lock(&chand->mu_config);
if (iomgr_success && chand->resolver) {
grpc_resolver *resolver = chand->resolver;
GRPC_RESOLVER_REF(resolver, "channel-next");
@@ -446,11 +495,16 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
grpc_resolver_next(resolver, &chand->incoming_configuration,
&chand->on_config_changed);
GRPC_RESOLVER_UNREF(resolver, "channel-next");
+ grpc_connectivity_state_set(&chand->state_tracker, state,
+ "new_lb+resolver");
+ if (lb_policy != NULL) {
+ watch_lb_policy(chand, lb_policy, state);
+ }
} else {
old_resolver = chand->resolver;
chand->resolver = NULL;
grpc_connectivity_state_set(&chand->state_tracker,
- GRPC_CHANNEL_FATAL_FAILURE);
+ GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone");
gpr_mu_unlock(&chand->mu_config);
if (old_resolver != NULL) {
grpc_resolver_shutdown(old_resolver);
@@ -458,12 +512,24 @@ static void cc_on_config_changed(void *arg, int iomgr_success) {
}
}
+ if (exit_idle) {
+ grpc_lb_policy_exit_idle(lb_policy);
+ GRPC_LB_POLICY_UNREF(lb_policy, "exit_idle");
+ }
+
+ if (old_lb_policy != NULL) {
+ GRPC_LB_POLICY_UNREF(old_lb_policy, "channel");
+ }
+
while (wakeup_closures) {
grpc_iomgr_closure *next = wakeup_closures->next;
wakeup_closures->cb(wakeup_closures->cb_arg, 1);
wakeup_closures = next;
}
+ if (lb_policy != NULL) {
+ GRPC_LB_POLICY_UNREF(lb_policy, "config_change");
+ }
GRPC_CHANNEL_INTERNAL_UNREF(chand->master, "resolver");
}
@@ -487,20 +553,22 @@ static void cc_start_transport_op(grpc_channel_element *elem,
op->connectivity_state = NULL;
}
+ if (!is_empty(op, sizeof(*op))) {
+ lb_policy = chand->lb_policy;
+ if (lb_policy) {
+ GRPC_LB_POLICY_REF(lb_policy, "broadcast");
+ }
+ }
+
if (op->disconnect && chand->resolver != NULL) {
grpc_connectivity_state_set(&chand->state_tracker,
- GRPC_CHANNEL_FATAL_FAILURE);
+ GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
destroy_resolver = chand->resolver;
chand->resolver = NULL;
if (chand->lb_policy != NULL) {
grpc_lb_policy_shutdown(chand->lb_policy);
- }
- }
-
- if (!is_empty(op, sizeof(*op))) {
- lb_policy = chand->lb_policy;
- if (lb_policy) {
- GRPC_LB_POLICY_REF(lb_policy, "broadcast");
+ GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
+ chand->lb_policy = NULL;
}
}
gpr_mu_unlock(&chand->mu_config);
@@ -581,10 +649,11 @@ static void init_channel_elem(grpc_channel_element *elem, grpc_channel *master,
gpr_mu_init(&chand->mu_config);
chand->mdctx = metadata_context;
chand->master = master;
+ grpc_pollset_set_init(&chand->pollset_set);
grpc_iomgr_closure_init(&chand->on_config_changed, cc_on_config_changed,
chand);
- grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE);
+ grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE, "client_channel");
}
/* Destructor for channel_data */
@@ -598,6 +667,8 @@ static void destroy_channel_elem(grpc_channel_element *elem) {
if (chand->lb_policy != NULL) {
GRPC_LB_POLICY_UNREF(chand->lb_policy, "channel");
}
+ grpc_connectivity_state_destroy(&chand->state_tracker);
+ grpc_pollset_set_destroy(&chand->pollset_set);
gpr_mu_destroy(&chand->mu_config);
}
@@ -626,3 +697,47 @@ void grpc_client_channel_set_resolver(grpc_channel_stack *channel_stack,
grpc_resolver_next(resolver, &chand->incoming_configuration,
&chand->on_config_changed);
}
+
+grpc_connectivity_state grpc_client_channel_check_connectivity_state(
+ grpc_channel_element *elem, int try_to_connect) {
+ channel_data *chand = elem->channel_data;
+ grpc_connectivity_state out;
+ gpr_mu_lock(&chand->mu_config);
+ out = grpc_connectivity_state_check(&chand->state_tracker);
+ if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
+ if (chand->lb_policy != NULL) {
+ grpc_lb_policy_exit_idle(chand->lb_policy);
+ } else {
+ chand->exit_idle_when_lb_policy_arrives = 1;
+ }
+ }
+ gpr_mu_unlock(&chand->mu_config);
+ return out;
+}
+
+void grpc_client_channel_watch_connectivity_state(
+ grpc_channel_element *elem, grpc_connectivity_state *state,
+ grpc_iomgr_closure *on_complete) {
+ channel_data *chand = elem->channel_data;
+ gpr_mu_lock(&chand->mu_config);
+ grpc_connectivity_state_notify_on_state_change(&chand->state_tracker, state,
+ on_complete);
+ gpr_mu_unlock(&chand->mu_config);
+}
+
+grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(grpc_channel_element *elem) {
+ channel_data *chand = elem->channel_data;
+ return &chand->pollset_set;
+}
+
+void grpc_client_channel_add_interested_party(grpc_channel_element *elem,
+ grpc_pollset *pollset) {
+ channel_data *chand = elem->channel_data;
+ grpc_pollset_set_add_pollset(&chand->pollset_set, pollset);
+}
+
+void grpc_client_channel_del_interested_party(grpc_channel_element *elem,
+ grpc_pollset *pollset) {
+ channel_data *chand = elem->channel_data;
+ grpc_pollset_set_del_pollset(&chand->pollset_set, pollset);
+}