aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/client_config
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/client_config')
-rw-r--r--src/core/client_config/lb_policies/pick_first.c44
-rw-r--r--src/core/client_config/lb_policy.h10
-rw-r--r--src/core/client_config/subchannel.c58
-rw-r--r--src/core/client_config/subchannel.h15
4 files changed, 77 insertions, 50 deletions
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index f3a6d21eb5..4fda07a63f 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -36,6 +36,7 @@
#include <string.h>
#include <grpc/support/alloc.h>
+#include "src/core/channel/connectivity_state.h"
typedef struct pending_pick {
struct pending_pick *next;
@@ -69,6 +70,9 @@ typedef struct {
grpc_connectivity_state checking_connectivity;
/** list of picks that are waiting on connectivity */
pending_pick *pending_picks;
+
+ /** our connectivity state tracker */
+ grpc_connectivity_state_tracker state_tracker;
} pick_first_lb_policy;
void pf_ref(grpc_lb_policy *pol) {
@@ -184,8 +188,46 @@ loop:
}
}
+static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) {
+ pick_first_lb_policy *p = (pick_first_lb_policy*)pol;
+ size_t i;
+ size_t n;
+ grpc_subchannel **subchannels;
+
+ gpr_mu_lock(&p->mu);
+ n = p->num_subchannels;
+ subchannels = gpr_malloc(n * sizeof(*subchannels));
+ for (i = 0; i < n; i++) {
+ subchannels[i] = p->subchannels[i];
+ grpc_subchannel_ref(subchannels[i]);
+ }
+ gpr_mu_unlock(&p->mu);
+
+ for (i = 0; i < n; i++) {
+ grpc_subchannel_process_transport_op(subchannels[i], op);
+ grpc_subchannel_unref(subchannels[i]);
+ }
+ gpr_free(subchannels);
+}
+
+static grpc_connectivity_state pf_check_connectivity(grpc_lb_policy *pol) {
+ pick_first_lb_policy *p = (pick_first_lb_policy*)pol;
+ grpc_connectivity_state st;
+ gpr_mu_lock(&p->mu);
+ st = grpc_connectivity_state_check(&p->state_tracker);
+ gpr_mu_unlock(&p->mu);
+ return st;
+}
+
+static void pf_notify_on_state_change(grpc_lb_policy *pol, grpc_connectivity_state *current, grpc_iomgr_closure *notify) {
+ pick_first_lb_policy *p = (pick_first_lb_policy*)pol;
+ gpr_mu_lock(&p->mu);
+ grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current, notify);
+ gpr_mu_unlock(&p->mu);
+}
+
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
- pf_ref, pf_unref, pf_shutdown, pf_pick};
+ pf_ref, pf_unref, pf_shutdown, pf_pick, pf_broadcast, pf_check_connectivity, pf_notify_on_state_change};
grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels,
size_t num_subchannels) {
diff --git a/src/core/client_config/lb_policy.h b/src/core/client_config/lb_policy.h
index 42929e933b..42be9152cb 100644
--- a/src/core/client_config/lb_policy.h
+++ b/src/core/client_config/lb_policy.h
@@ -58,6 +58,16 @@ struct grpc_lb_policy_vtable {
void (*pick)(grpc_lb_policy *policy, grpc_pollset *pollset,
grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
grpc_iomgr_closure *on_complete);
+
+ /** broadcast a transport op to all subchannels */
+ void (*broadcast)(grpc_lb_policy *policy, grpc_transport_op *op);
+
+ /** check the current connectivity of the lb_policy */
+ grpc_connectivity_state (*check_connectivity)(grpc_lb_policy *policy);
+
+ /** call notify when the connectivity state of a channel changes from *state.
+ Updates *state with the new state of the policy */
+ void (*notify_on_state_change)(grpc_lb_policy *policy, grpc_connectivity_state *state, grpc_iomgr_closure *closure);
};
void grpc_lb_policy_ref(grpc_lb_policy *policy);
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index b489ce04a5..5cbb5d9971 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -39,6 +39,7 @@
#include "src/core/channel/channel_args.h"
#include "src/core/channel/connected_channel.h"
+#include "src/core/channel/connectivity_state.h"
typedef struct {
gpr_refcount refs;
@@ -52,12 +53,6 @@ typedef struct waiting_for_connect {
grpc_subchannel_call **target;
} waiting_for_connect;
-typedef struct connectivity_state_watcher {
- struct connectivity_state_watcher *next;
- grpc_iomgr_closure *notify;
- grpc_connectivity_state *current;
-} connectivity_state_watcher;
-
struct grpc_subchannel {
gpr_refcount refs;
grpc_connector *connector;
@@ -92,8 +87,8 @@ struct grpc_subchannel {
int connecting;
/** things waiting for a connection */
waiting_for_connect *waiting;
- /** things watching the connectivity state */
- connectivity_state_watcher *watchers;
+ /** connectivity state tracking */
+ grpc_connectivity_state_tracker state_tracker;
};
struct grpc_subchannel_call {
@@ -123,6 +118,7 @@ void grpc_subchannel_unref(grpc_subchannel *c) {
gpr_free(c->addr);
grpc_mdctx_unref(c->mdctx);
grpc_pollset_set_destroy(&c->pollset_set);
+ grpc_connectivity_state_destroy(&c->state_tracker);
gpr_free(c);
}
}
@@ -156,6 +152,7 @@ grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
grpc_mdctx_ref(c->mdctx);
grpc_pollset_set_init(&c->pollset_set);
grpc_iomgr_closure_init(&c->connected, subchannel_connected, c);
+ grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE);
gpr_mu_init(&c->mu);
return c;
}
@@ -210,7 +207,7 @@ void grpc_subchannel_create_call(grpc_subchannel *c,
grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
grpc_connectivity_state state;
gpr_mu_lock(&c->mu);
- state = compute_connectivity_locked(c);
+ state = grpc_connectivity_state_check(&c->state_tracker);
gpr_mu_unlock(&c->mu);
return state;
}
@@ -218,35 +215,24 @@ grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
void grpc_subchannel_notify_on_state_change(grpc_subchannel *c,
grpc_connectivity_state *state,
grpc_iomgr_closure *notify) {
- grpc_connectivity_state current;
int do_connect = 0;
- connectivity_state_watcher *w = gpr_malloc(sizeof(*w));
- w->current = state;
- w->notify = notify;
gpr_mu_lock(&c->mu);
- current = compute_connectivity_locked(c);
- if (current == GRPC_CHANNEL_IDLE) {
- current = GRPC_CHANNEL_CONNECTING;
+ if (grpc_connectivity_state_notify_on_state_change(&c->state_tracker, state, notify)) {
+ do_connect = 1;
c->connecting = 1;
- do_connect = 1;
grpc_subchannel_ref(c);
- connectivity_state_changed_locked(c);
- }
- if (current != *state) {
- gpr_mu_unlock(&c->mu);
- *state = current;
- grpc_iomgr_add_callback(notify);
- gpr_free(w);
- } else {
- w->next = c->watchers;
- c->watchers = w;
- gpr_mu_unlock(&c->mu);
+ grpc_connectivity_state_set(&c->state_tracker, compute_connectivity_locked(c));
}
+ gpr_mu_unlock(&c->mu);
if (do_connect) {
start_connect(c);
}
}
+void grpc_subchannel_process_transport_op(grpc_subchannel *c, grpc_transport_op *op) {
+ abort();
+}
+
static void publish_transport(grpc_subchannel *c) {
size_t channel_stack_size;
connection *con;
@@ -311,21 +297,7 @@ static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) {
static void connectivity_state_changed_locked(grpc_subchannel *c) {
grpc_connectivity_state current = compute_connectivity_locked(c);
- connectivity_state_watcher *new = NULL;
- connectivity_state_watcher *w;
- while ((w = c->watchers)) {
- c->watchers = w->next;
-
- if (current != *w->current) {
- *w->current = current;
- grpc_iomgr_add_callback(w->notify);
- gpr_free(w);
- } else {
- w->next = new;
- new = w;
- }
- }
- c->watchers = new;
+ grpc_connectivity_state_set(&c->state_tracker, current);
}
/*
diff --git a/src/core/client_config/subchannel.h b/src/core/client_config/subchannel.h
index 8836f9b09c..60b95d3d8f 100644
--- a/src/core/client_config/subchannel.h
+++ b/src/core/client_config/subchannel.h
@@ -46,6 +46,15 @@ typedef struct grpc_subchannel_args grpc_subchannel_args;
void grpc_subchannel_ref(grpc_subchannel *channel);
void grpc_subchannel_unref(grpc_subchannel *channel);
+/** construct a call (possibly asynchronously) */
+void grpc_subchannel_create_call(grpc_subchannel *subchannel,
+ grpc_transport_stream_op *initial_op,
+ grpc_subchannel_call **target,
+ grpc_iomgr_closure *notify);
+
+/** process a transport level op */
+void grpc_subchannel_process_transport_op(grpc_subchannel *subchannel, grpc_transport_op *op);
+
void grpc_subchannel_call_ref(grpc_subchannel_call *call);
void grpc_subchannel_call_unref(grpc_subchannel_call *call);
@@ -62,12 +71,6 @@ void grpc_subchannel_notify_on_state_change(grpc_subchannel *channel,
void grpc_subchannel_add_interested_party(grpc_subchannel *channel, grpc_pollset *pollset);
void grpc_subchannel_del_interested_party(grpc_subchannel *channel, grpc_pollset *pollset);
-/** construct a call (possibly asynchronously) */
-void grpc_subchannel_create_call(grpc_subchannel *subchannel,
- grpc_transport_stream_op *initial_op,
- grpc_subchannel_call **target,
- grpc_iomgr_closure *notify);
-
/** continue processing a transport op */
void grpc_subchannel_call_process_op(grpc_subchannel_call *subchannel_call,
grpc_transport_stream_op *op);