aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/client_config/lb_policies/pick_first.c
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-06-27 11:48:42 -0700
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-06-27 11:48:42 -0700
commitc7b5f7605e7210e2bfa1915f585a9ccfa44bbb30 (patch)
treeeb741452bfacc1d942ddf59ba978f2a0bfc202ca /src/core/client_config/lb_policies/pick_first.c
parent7874e17d77f262f682a88f38ca24cf9a7b45542f (diff)
Factor out channel state watching
Diffstat (limited to 'src/core/client_config/lb_policies/pick_first.c')
-rw-r--r--src/core/client_config/lb_policies/pick_first.c44
1 files changed, 43 insertions, 1 deletions
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index f3a6d21eb5..4fda07a63f 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -36,6 +36,7 @@
#include <string.h>
#include <grpc/support/alloc.h>
+#include "src/core/channel/connectivity_state.h"
typedef struct pending_pick {
struct pending_pick *next;
@@ -69,6 +70,9 @@ typedef struct {
grpc_connectivity_state checking_connectivity;
/** list of picks that are waiting on connectivity */
pending_pick *pending_picks;
+
+ /** our connectivity state tracker */
+ grpc_connectivity_state_tracker state_tracker;
} pick_first_lb_policy;
void pf_ref(grpc_lb_policy *pol) {
@@ -184,8 +188,46 @@ loop:
}
}
+static void pf_broadcast(grpc_lb_policy *pol, grpc_transport_op *op) {
+ pick_first_lb_policy *p = (pick_first_lb_policy*)pol;
+ size_t i;
+ size_t n;
+ grpc_subchannel **subchannels;
+
+ gpr_mu_lock(&p->mu);
+ n = p->num_subchannels;
+ subchannels = gpr_malloc(n * sizeof(*subchannels));
+ for (i = 0; i < n; i++) {
+ subchannels[i] = p->subchannels[i];
+ grpc_subchannel_ref(subchannels[i]);
+ }
+ gpr_mu_unlock(&p->mu);
+
+ for (i = 0; i < n; i++) {
+ grpc_subchannel_process_transport_op(subchannels[i], op);
+ grpc_subchannel_unref(subchannels[i]);
+ }
+ gpr_free(subchannels);
+}
+
+static grpc_connectivity_state pf_check_connectivity(grpc_lb_policy *pol) {
+ pick_first_lb_policy *p = (pick_first_lb_policy*)pol;
+ grpc_connectivity_state st;
+ gpr_mu_lock(&p->mu);
+ st = grpc_connectivity_state_check(&p->state_tracker);
+ gpr_mu_unlock(&p->mu);
+ return st;
+}
+
+static void pf_notify_on_state_change(grpc_lb_policy *pol, grpc_connectivity_state *current, grpc_iomgr_closure *notify) {
+ pick_first_lb_policy *p = (pick_first_lb_policy*)pol;
+ gpr_mu_lock(&p->mu);
+ grpc_connectivity_state_notify_on_state_change(&p->state_tracker, current, notify);
+ gpr_mu_unlock(&p->mu);
+}
+
static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
- pf_ref, pf_unref, pf_shutdown, pf_pick};
+ pf_ref, pf_unref, pf_shutdown, pf_pick, pf_broadcast, pf_check_connectivity, pf_notify_on_state_change};
grpc_lb_policy *grpc_create_pick_first_lb_policy(grpc_subchannel **subchannels,
size_t num_subchannels) {