diff options
author | Hongwei Wang <hongweiw@google.com> | 2015-07-27 16:41:54 -0700 |
---|---|---|
committer | Hongwei Wang <hongweiw@google.com> | 2015-07-27 16:41:54 -0700 |
commit | 6e732ea17f684a58773d3489074763d44d1813af (patch) | |
tree | a309ba748a14798855086a70ff289e09530e0278 /src/core/client_config | |
parent | 35d5a0fd64a73b49811b06c5cadfc001f7968658 (diff) |
Add zookeeper watch
Diffstat (limited to 'src/core/client_config')
-rw-r--r-- | src/core/client_config/resolvers/zookeeper_resolver.c | 58 |
1 files changed, 38 insertions, 20 deletions
diff --git a/src/core/client_config/resolvers/zookeeper_resolver.c b/src/core/client_config/resolvers/zookeeper_resolver.c index ad161142c2..cb76182123 100644 --- a/src/core/client_config/resolvers/zookeeper_resolver.c +++ b/src/core/client_config/resolvers/zookeeper_resolver.c @@ -49,8 +49,6 @@ /** Zookeeper session expiration time in milliseconds */ #define GRPC_ZOOKEEPER_SESSION_TIMEOUT 15000 -/** Set zookeeper watch */ -#define GRPC_ZOOKEEPER_WATCH 1 typedef struct { /** base class: must be first */ @@ -86,7 +84,7 @@ typedef struct { grpc_resolved_addresses *resolved_addrs; /** total number of addresses to be resolved */ int resolved_total; - /** resolved number of addresses */ + /** number of addresses resolved */ int resolved_num; } zookeeper_resolver; @@ -132,6 +130,7 @@ static void zookeeper_next(grpc_resolver *resolver, grpc_client_config **target_config, grpc_iomgr_closure *on_complete) { zookeeper_resolver *r = (zookeeper_resolver *)resolver; + gpr_log(GPR_INFO, "zookeeper_next"); gpr_mu_lock(&r->mu); GPR_ASSERT(r->next_completion == NULL); r->next_completion = on_complete; @@ -144,6 +143,34 @@ static void zookeeper_next(grpc_resolver *resolver, gpr_mu_unlock(&r->mu); } +/** Zookeeper global watcher for connection management */ +static void zookeeper_global_watcher(zhandle_t *zookeeper_handle, int type, int state, + const char *path, void *watcher_ctx) { + if (type == ZOO_SESSION_EVENT) { + if (state == ZOO_EXPIRED_SESSION_STATE) { + gpr_log(GPR_ERROR, "Zookeeper session expired"); + } else if (state == ZOO_AUTH_FAILED_STATE) { + gpr_log(GPR_ERROR, "Zookeeper authentication failed"); + } + } +} + +/** Zookeeper watcher for handling updates to watched nodes */ +static void zookeeper_watcher(zhandle_t *zookeeper_handle, int type, int state, + const char *path, void *watcher_ctx) { + if (watcher_ctx != NULL) { + zookeeper_resolver *r = (zookeeper_resolver *)watcher_ctx; + gpr_log(GPR_INFO, "tpye = %d, state = %d", type, state); + if (state == ZOO_CONNECTED_STATE){ + gpr_mu_lock(&r->mu); + if (r->resolving == 0) { + zookeeper_start_resolving_locked(r); + } + gpr_mu_unlock(&r->mu); + } + } +} + static void zookeeper_on_resolved(void *arg, grpc_resolved_addresses *addresses) { zookeeper_resolver *r = arg; @@ -272,6 +299,7 @@ static void zookeeper_get_children_node_completion(int rc, const char *value, gpr_free(buffer); if (address != NULL) { /** Further resolve address by DNS */ + gpr_log(GPR_INFO, address); grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r); gpr_free(address); } else { @@ -315,8 +343,7 @@ static void zookeeper_get_children_completion( strcat(path, r->name); strcat(path, "/"); strcat(path, children->data[i]); - status = zoo_aget(r->zookeeper_handle, path, GRPC_ZOOKEEPER_WATCH, - zookeeper_get_children_node_completion, r); + status = zoo_awget(r->zookeeper_handle, path, zookeeper_watcher, r, zookeeper_get_children_node_completion, r); gpr_free(path); if (status != 0) { gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", path); @@ -332,7 +359,6 @@ static void zookeeper_get_node_completion(int rc, const char *value, char *address = NULL; char *buffer = NULL; zookeeper_resolver *r = (zookeeper_resolver *)arg; - r->resolved_addrs = NULL; r->resolved_total = 0; r->resolved_num = 0; @@ -342,13 +368,13 @@ static void zookeeper_get_node_completion(int rc, const char *value, return; } - /** If zookeeper node of path r->name does not have address - (i.e. service node), get its children */ + /** If zookeeper node of path r->name does not have address (i.e. service node), get its children */ buffer = gpr_malloc(value_len); memcpy(buffer, value, value_len); address = zookeeper_parse_address(buffer, value_len); gpr_free(buffer); if (address != NULL) { + gpr_log(GPR_INFO, address); r->resolved_addrs = gpr_malloc(sizeof(grpc_resolved_addresses)); r->resolved_addrs->addrs = NULL; r->resolved_addrs->naddrs = 0; @@ -359,8 +385,7 @@ static void zookeeper_get_node_completion(int rc, const char *value, return; } - status = zoo_aget_children(r->zookeeper_handle, r->name, GRPC_ZOOKEEPER_WATCH, - zookeeper_get_children_completion, r); + status = zoo_awget_children(r->zookeeper_handle, r->name, zookeeper_watcher, r, zookeeper_get_children_completion, r); if (status != 0) { gpr_log(GPR_ERROR, "Error in getting zookeeper children of %s", r->name); } @@ -368,8 +393,7 @@ static void zookeeper_get_node_completion(int rc, const char *value, static void zookeeper_resolve_address(zookeeper_resolver *r) { int status; - status = zoo_aget(r->zookeeper_handle, r->name, GRPC_ZOOKEEPER_WATCH, - zookeeper_get_node_completion, r); + status = zoo_awget(r->zookeeper_handle, r->name, zookeeper_watcher, r, zookeeper_get_node_completion, r); if (status != 0) { gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", r->name); } @@ -379,7 +403,7 @@ static void zookeeper_start_resolving_locked(zookeeper_resolver *r) { GRPC_RESOLVER_REF(&r->base, "zookeeper-resolving"); GPR_ASSERT(r->resolving == 0); r->resolving = 1; - + gpr_log(GPR_INFO, "zookeeper_start_resolving_locked"); zookeeper_resolve_address(r); } @@ -407,12 +431,6 @@ static void zookeeper_destroy(grpc_resolver *gr) { gpr_free(r); } -/** Zookeeper watcher function - handle updates to any watched nodes */ -static void zookeeper_watcher(zhandle_t *zookeeper_handle, int type, int state, - const char *path, void *watcher_ctx) { - -} - static grpc_resolver *zookeeper_create( grpc_uri *uri, grpc_lb_policy *(*lb_policy_factory)(grpc_subchannel **subchannels, @@ -440,7 +458,7 @@ static grpc_resolver *zookeeper_create( /** Initialize zookeeper client */ zoo_set_debug_level(ZOO_LOG_LEVEL_WARN); - r->zookeeper_handle = zookeeper_init(uri->authority, zookeeper_watcher, + r->zookeeper_handle = zookeeper_init(uri->authority, zookeeper_global_watcher, GRPC_ZOOKEEPER_SESSION_TIMEOUT, 0, 0, 0); if (r->zookeeper_handle == NULL) { gpr_log(GPR_ERROR, "Unable to connect to zookeeper server"); |