aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/client_config/resolvers
diff options
context:
space:
mode:
authorGravatar Hongwei Wang <hongweiw@google.com>2015-07-14 17:01:49 -0700
committerGravatar Hongwei Wang <hongweiw@google.com>2015-07-14 17:01:49 -0700
commitd65009d84fc71b9a79ab6a5c291fc0da9fa68798 (patch)
tree2280f94ce42ab7cf16192d9c6336bea77622125f /src/core/client_config/resolvers
parent85fd2f7a328f11327ae9ad5730e1232828e83370 (diff)
Support JSON format of Zookeeper node.
If a node does not contain IP or port, consider its children.
Diffstat (limited to 'src/core/client_config/resolvers')
-rw-r--r--src/core/client_config/resolvers/zookeeper_resolver.c136
1 files changed, 107 insertions, 29 deletions
diff --git a/src/core/client_config/resolvers/zookeeper_resolver.c b/src/core/client_config/resolvers/zookeeper_resolver.c
index 73abc155b9..e124de93f7 100644
--- a/src/core/client_config/resolvers/zookeeper_resolver.c
+++ b/src/core/client_config/resolvers/zookeeper_resolver.c
@@ -43,10 +43,11 @@
#include "src/core/client_config/lb_policies/pick_first.h"
#include "src/core/iomgr/resolve_address.h"
#include "src/core/support/string.h"
+#include "src/core/json/json.h"
-#define GRPC_ZOOKEEPER_MAX_SIZE 128
+#define GRPC_MAX_ZOOKEEPER_SIZE 1024
#define GRPC_ZOOKEEPER_TIMEOUT 15000
-#define GRPC_ZOOKEEPER_WATCH 1
+#define GRPC_ZOOKEEPER_WATCH 0
typedef struct {
/** base class: must be first */
@@ -78,12 +79,12 @@ typedef struct {
/** zookeeper handle */
zhandle_t *zookeeper_handle;
- /** zookeeper connection state */
- int zookeeper_state;
+ /** zookeeper resolved addresses */
grpc_resolved_addresses * resolved_addrs;
+ /** zookeeper total number of addresses to be resolved */
int resolved_total;
+ /** zookeeper resolved number of addresses */
int resolved_num;
-
} zookeeper_resolver;
static void zookeeper_destroy(grpc_resolver *r);
@@ -180,9 +181,12 @@ static void zookeeper_dns_resolved(void *arg, grpc_resolved_addresses *addresses
size_t i;
zookeeper_resolver *r = arg;
r->resolved_num++;
+ gpr_log(GPR_INFO, "log");
+ gpr_log(GPR_INFO, "%d", addresses->naddrs);
+ gpr_log(GPR_INFO, "%d", r->resolved_addrs->naddrs);
r->resolved_addrs->addrs = gpr_realloc(r->resolved_addrs->addrs,
sizeof(grpc_resolved_address) * (r->resolved_addrs->naddrs + addresses->naddrs));
-
+ gpr_log(GPR_INFO, "log");
for (i = 0; i < addresses->naddrs; i++) {
memcpy(r->resolved_addrs->addrs[i + r->resolved_addrs->naddrs].addr, addresses->addrs[i].addr, addresses->addrs[i].len);
r->resolved_addrs->addrs[i + r->resolved_addrs->naddrs].len = addresses->addrs[i].len;
@@ -195,46 +199,120 @@ static void zookeeper_dns_resolved(void *arg, grpc_resolved_addresses *addresses
zookeeper_on_resolved(r, r->resolved_addrs);
}
+/** Parse json format address of a zookeeper node */
+static char *zookeeper_parse_address(char *buffer, int buffer_len) {
+ char *host;
+ char *port;
+ grpc_json *json;
+ grpc_json *cur;
+ char *address;
+
+ gpr_log(GPR_INFO, buffer);
+ address = NULL;
+ json = grpc_json_parse_string_with_len(buffer, buffer_len);
+ if (json != NULL) {
+ host = NULL;
+ port = NULL;
+ for (cur = json->child; cur != NULL; cur = cur->next) {
+ if (!strcmp(cur->key, "host")) {
+ host = (char *)cur->value;
+ }
+ if (!strcmp(cur->key, "port")) {
+ port = (char *)cur->value;
+ }
+ }
+ if (host != NULL && port != NULL) {
+ address = gpr_malloc(GRPC_MAX_SOCKADDR_SIZE);
+ memset(address, 0, GRPC_MAX_SOCKADDR_SIZE);
+ strcat(address, host);
+ strcat(address, ":");
+ strcat(address, port);
+ gpr_log(GPR_INFO, address);
+ } else {
+ gpr_log(GPR_ERROR, "Cannot resolve zookeeper address: no host or port");
+ }
+ grpc_json_destroy(json);
+ } else {
+ gpr_log(GPR_ERROR, "Cannot resolve zookeeper address: json parse error");
+ }
+ return address;
+}
+
/** Resolve address by zookeeper */
static void zookeeper_resolve_address(zookeeper_resolver *r) {
struct String_vector addresses;
int i;
int status;
- char path[GRPC_ZOOKEEPER_MAX_SIZE];
- char buffer[GRPC_ZOOKEEPER_MAX_SIZE];
+ char path[GRPC_MAX_ZOOKEEPER_SIZE];
+ char buffer[GRPC_MAX_ZOOKEEPER_SIZE];
+ char *address;
int buffer_len;
r->resolved_addrs = NULL;
r->resolved_total = 0;
r->resolved_num = 0;
gpr_log(GPR_INFO, r->name);
- status = zoo_get_children(r->zookeeper_handle, r->name, GRPC_ZOOKEEPER_WATCH, &addresses);
+
+ address = NULL;
+ memset(path, 0, GRPC_MAX_ZOOKEEPER_SIZE);
+ memset(buffer, 0, GRPC_MAX_ZOOKEEPER_SIZE);
+ buffer_len = GRPC_MAX_ZOOKEEPER_SIZE;
+
+ /** Read zookeeper node of given path r->name
+ If not containing address, read its children */
+ status = zoo_get(r->zookeeper_handle, r->name, GRPC_ZOOKEEPER_WATCH, buffer, &buffer_len, NULL);
+ status = 0;
if (!status) {
- /** Assume no children are deleted */
- r->resolved_addrs = gpr_malloc(sizeof(grpc_resolved_addresses));
- r->resolved_addrs->addrs = NULL;
- r->resolved_addrs->naddrs = 0;
- r->resolved_total = addresses.count;
- for (i = 0; i < addresses.count; i++) {
- memset(path, 0, GRPC_ZOOKEEPER_MAX_SIZE);
- strcat(path, r->name);
- strcat(path, "/");
- strcat(path, addresses.data[i]);
-
- gpr_log(GPR_INFO, path);
- memset(buffer, 0, GRPC_ZOOKEEPER_MAX_SIZE);
- status = zoo_get(r->zookeeper_handle, path, GRPC_ZOOKEEPER_WATCH, buffer, &buffer_len, NULL);
- if (!status) {
- gpr_log(GPR_INFO, buffer);
- grpc_resolve_address(buffer, NULL, zookeeper_dns_resolved, r);
- } else {
- gpr_log(GPR_ERROR, "Cannot resolve zookeeper address");
+ if (buffer_len > 0) {
+ address = zookeeper_parse_address(buffer, buffer_len);
+ if (address != NULL) {
+ r->resolved_addrs = gpr_malloc(sizeof(grpc_resolved_addresses));
+ r->resolved_addrs->addrs = NULL;
+ r->resolved_addrs->naddrs = 0;
+ r->resolved_total = 1;
+ grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r);
+ gpr_free(address);
+ return;
+ }
+ }
+
+ buffer_len = GRPC_MAX_ZOOKEEPER_SIZE;
+ status = zoo_get_children(r->zookeeper_handle, r->name, GRPC_ZOOKEEPER_WATCH, &addresses);
+ if (!status) {
+ /** Assume no children are deleted */
+ r->resolved_addrs = gpr_malloc(sizeof(grpc_resolved_addresses));
+ r->resolved_addrs->addrs = NULL;
+ r->resolved_addrs->naddrs = 0;
+ r->resolved_total = addresses.count;
+ for (i = 0; i < addresses.count; i++) {
+ memset(path, 0, GRPC_MAX_ZOOKEEPER_SIZE);
+ strcat(path, r->name);
+ strcat(path, "/");
+ strcat(path, addresses.data[i]);
+
+ gpr_log(GPR_INFO, path);
+ memset(buffer, 0, GRPC_MAX_ZOOKEEPER_SIZE);
+ status = zoo_get(r->zookeeper_handle, path, GRPC_ZOOKEEPER_WATCH, buffer, &buffer_len, NULL);
+ if (!status) {
+ if (buffer_len > 0) {
+ address = zookeeper_parse_address(buffer, buffer_len);
+ if (address != NULL) {
+ grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r);
+ }
+ }
+ } else {
+ gpr_log(GPR_ERROR, "Cannot resolve zookeeper address: read zookeeper node error");
+ }
}
+ } else {
+ gpr_log(GPR_ERROR, "Cannot resolve zookeeper address: get zookeeper children error");
}
} else {
gpr_log(GPR_ERROR, "Cannot resolve zookeeper address");
}
+
+ gpr_free(address);
}
static void zookeeper_start_resolving_locked(zookeeper_resolver *r) {
@@ -281,6 +359,7 @@ static grpc_resolver *zookeeper_create(
grpc_subchannel_factory *subchannel_factory) {
zookeeper_resolver *r;
const char *path = uri->path;
+ gpr_log(GPR_INFO, path);
if (0 == strcmp(uri->authority, "")) {
gpr_log(GPR_ERROR, "no authority specified in zookeeper uri");
@@ -300,7 +379,6 @@ static grpc_resolver *zookeeper_create(
/** Initialize zookeeper client */
zoo_set_debug_level(ZOO_LOG_LEVEL_WARN);
r->zookeeper_handle = zookeeper_init(uri->authority, zookeeper_watcher, GRPC_ZOOKEEPER_TIMEOUT, 0, 0, 0);
-
if (r->zookeeper_handle == NULL) {
gpr_log(GPR_ERROR, "Cannot connect to zookeeper servers");
return NULL;