aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/client_config
diff options
context:
space:
mode:
authorGravatar Hongwei Wang <hongweiw@google.com>2015-07-27 16:41:54 -0700
committerGravatar Hongwei Wang <hongweiw@google.com>2015-07-27 16:41:54 -0700
commit6e732ea17f684a58773d3489074763d44d1813af (patch)
treea309ba748a14798855086a70ff289e09530e0278 /src/core/client_config
parent35d5a0fd64a73b49811b06c5cadfc001f7968658 (diff)
Add zookeeper watch
Diffstat (limited to 'src/core/client_config')
-rw-r--r--src/core/client_config/resolvers/zookeeper_resolver.c58
1 files changed, 38 insertions, 20 deletions
diff --git a/src/core/client_config/resolvers/zookeeper_resolver.c b/src/core/client_config/resolvers/zookeeper_resolver.c
index ad161142c2..cb76182123 100644
--- a/src/core/client_config/resolvers/zookeeper_resolver.c
+++ b/src/core/client_config/resolvers/zookeeper_resolver.c
@@ -49,8 +49,6 @@
/** Zookeeper session expiration time in milliseconds */
#define GRPC_ZOOKEEPER_SESSION_TIMEOUT 15000
-/** Set zookeeper watch */
-#define GRPC_ZOOKEEPER_WATCH 1
typedef struct {
/** base class: must be first */
@@ -86,7 +84,7 @@ typedef struct {
grpc_resolved_addresses *resolved_addrs;
/** total number of addresses to be resolved */
int resolved_total;
- /** resolved number of addresses */
+ /** number of addresses resolved */
int resolved_num;
} zookeeper_resolver;
@@ -132,6 +130,7 @@ static void zookeeper_next(grpc_resolver *resolver,
grpc_client_config **target_config,
grpc_iomgr_closure *on_complete) {
zookeeper_resolver *r = (zookeeper_resolver *)resolver;
+ gpr_log(GPR_INFO, "zookeeper_next");
gpr_mu_lock(&r->mu);
GPR_ASSERT(r->next_completion == NULL);
r->next_completion = on_complete;
@@ -144,6 +143,34 @@ static void zookeeper_next(grpc_resolver *resolver,
gpr_mu_unlock(&r->mu);
}
+/** Zookeeper global watcher for connection management */
+static void zookeeper_global_watcher(zhandle_t *zookeeper_handle, int type, int state,
+ const char *path, void *watcher_ctx) {
+ if (type == ZOO_SESSION_EVENT) {
+ if (state == ZOO_EXPIRED_SESSION_STATE) {
+ gpr_log(GPR_ERROR, "Zookeeper session expired");
+ } else if (state == ZOO_AUTH_FAILED_STATE) {
+ gpr_log(GPR_ERROR, "Zookeeper authentication failed");
+ }
+ }
+}
+
+/** Zookeeper watcher for handling updates to watched nodes */
+static void zookeeper_watcher(zhandle_t *zookeeper_handle, int type, int state,
+ const char *path, void *watcher_ctx) {
+ if (watcher_ctx != NULL) {
+ zookeeper_resolver *r = (zookeeper_resolver *)watcher_ctx;
+ gpr_log(GPR_INFO, "tpye = %d, state = %d", type, state);
+ if (state == ZOO_CONNECTED_STATE){
+ gpr_mu_lock(&r->mu);
+ if (r->resolving == 0) {
+ zookeeper_start_resolving_locked(r);
+ }
+ gpr_mu_unlock(&r->mu);
+ }
+ }
+}
+
static void zookeeper_on_resolved(void *arg,
grpc_resolved_addresses *addresses) {
zookeeper_resolver *r = arg;
@@ -272,6 +299,7 @@ static void zookeeper_get_children_node_completion(int rc, const char *value,
gpr_free(buffer);
if (address != NULL) {
/** Further resolve address by DNS */
+ gpr_log(GPR_INFO, address);
grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r);
gpr_free(address);
} else {
@@ -315,8 +343,7 @@ static void zookeeper_get_children_completion(
strcat(path, r->name);
strcat(path, "/");
strcat(path, children->data[i]);
- status = zoo_aget(r->zookeeper_handle, path, GRPC_ZOOKEEPER_WATCH,
- zookeeper_get_children_node_completion, r);
+ status = zoo_awget(r->zookeeper_handle, path, zookeeper_watcher, r, zookeeper_get_children_node_completion, r);
gpr_free(path);
if (status != 0) {
gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", path);
@@ -332,7 +359,6 @@ static void zookeeper_get_node_completion(int rc, const char *value,
char *address = NULL;
char *buffer = NULL;
zookeeper_resolver *r = (zookeeper_resolver *)arg;
-
r->resolved_addrs = NULL;
r->resolved_total = 0;
r->resolved_num = 0;
@@ -342,13 +368,13 @@ static void zookeeper_get_node_completion(int rc, const char *value,
return;
}
- /** If zookeeper node of path r->name does not have address
- (i.e. service node), get its children */
+ /** If zookeeper node of path r->name does not have address (i.e. service node), get its children */
buffer = gpr_malloc(value_len);
memcpy(buffer, value, value_len);
address = zookeeper_parse_address(buffer, value_len);
gpr_free(buffer);
if (address != NULL) {
+ gpr_log(GPR_INFO, address);
r->resolved_addrs = gpr_malloc(sizeof(grpc_resolved_addresses));
r->resolved_addrs->addrs = NULL;
r->resolved_addrs->naddrs = 0;
@@ -359,8 +385,7 @@ static void zookeeper_get_node_completion(int rc, const char *value,
return;
}
- status = zoo_aget_children(r->zookeeper_handle, r->name, GRPC_ZOOKEEPER_WATCH,
- zookeeper_get_children_completion, r);
+ status = zoo_awget_children(r->zookeeper_handle, r->name, zookeeper_watcher, r, zookeeper_get_children_completion, r);
if (status != 0) {
gpr_log(GPR_ERROR, "Error in getting zookeeper children of %s", r->name);
}
@@ -368,8 +393,7 @@ static void zookeeper_get_node_completion(int rc, const char *value,
static void zookeeper_resolve_address(zookeeper_resolver *r) {
int status;
- status = zoo_aget(r->zookeeper_handle, r->name, GRPC_ZOOKEEPER_WATCH,
- zookeeper_get_node_completion, r);
+ status = zoo_awget(r->zookeeper_handle, r->name, zookeeper_watcher, r, zookeeper_get_node_completion, r);
if (status != 0) {
gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", r->name);
}
@@ -379,7 +403,7 @@ static void zookeeper_start_resolving_locked(zookeeper_resolver *r) {
GRPC_RESOLVER_REF(&r->base, "zookeeper-resolving");
GPR_ASSERT(r->resolving == 0);
r->resolving = 1;
-
+ gpr_log(GPR_INFO, "zookeeper_start_resolving_locked");
zookeeper_resolve_address(r);
}
@@ -407,12 +431,6 @@ static void zookeeper_destroy(grpc_resolver *gr) {
gpr_free(r);
}
-/** Zookeeper watcher function - handle updates to any watched nodes */
-static void zookeeper_watcher(zhandle_t *zookeeper_handle, int type, int state,
- const char *path, void *watcher_ctx) {
-
-}
-
static grpc_resolver *zookeeper_create(
grpc_uri *uri,
grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels,
@@ -440,7 +458,7 @@ static grpc_resolver *zookeeper_create(
/** Initialize zookeeper client */
zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
- r->zookeeper_handle = zookeeper_init(uri->authority, zookeeper_watcher,
+ r->zookeeper_handle = zookeeper_init(uri->authority, zookeeper_global_watcher,
GRPC_ZOOKEEPER_SESSION_TIMEOUT, 0, 0, 0);
if (r->zookeeper_handle == NULL) {
gpr_log(GPR_ERROR, "Unable to connect to zookeeper server");