diff options
author | Hongwei Wang <hongweiw@google.com> | 2015-07-24 11:14:23 -0700 |
---|---|---|
committer | Hongwei Wang <hongweiw@google.com> | 2015-07-24 11:14:23 -0700 |
commit | 35d5a0fd64a73b49811b06c5cadfc001f7968658 (patch) | |
tree | f913a8794e9f349dceb05ebfeed6f5fcbddf52bf /src/core/client_config/resolvers | |
parent | cd62075eb6e9c4aba6d9de9be9e2a9615844c3d3 (diff) |
Formatting
Diffstat (limited to 'src/core/client_config/resolvers')
-rw-r--r-- | src/core/client_config/resolvers/zookeeper_resolver.c | 122 | ||||
-rw-r--r-- | src/core/client_config/resolvers/zookeeper_resolver.h | 4 |
2 files changed, 75 insertions, 51 deletions
diff --git a/src/core/client_config/resolvers/zookeeper_resolver.c b/src/core/client_config/resolvers/zookeeper_resolver.c index 71a60b73ed..ad161142c2 100644 --- a/src/core/client_config/resolvers/zookeeper_resolver.c +++ b/src/core/client_config/resolvers/zookeeper_resolver.c @@ -47,9 +47,10 @@ #include "src/core/support/string.h" #include "src/core/json/json.h" -#define GRPC_MAX_ZOOKEEPER_BUFFER_SIZE 1024 -#define GRPC_ZOOKEEPER_TIMEOUT 15000 -#define GRPC_ZOOKEEPER_WATCH 0 +/** Zookeeper session expiration time in milliseconds */ +#define GRPC_ZOOKEEPER_SESSION_TIMEOUT 15000 +/** Set zookeeper watch */ +#define GRPC_ZOOKEEPER_WATCH 1 typedef struct { /** base class: must be first */ @@ -121,7 +122,7 @@ static void zookeeper_channel_saw_error(grpc_resolver *resolver, struct sockaddr *sa, int len) { zookeeper_resolver *r = (zookeeper_resolver *)resolver; gpr_mu_lock(&r->mu); - if (!r->resolving) { + if (r->resolving == 0) { zookeeper_start_resolving_locked(r); } gpr_mu_unlock(&r->mu); @@ -132,10 +133,10 @@ static void zookeeper_next(grpc_resolver *resolver, grpc_iomgr_closure *on_complete) { zookeeper_resolver *r = (zookeeper_resolver *)resolver; gpr_mu_lock(&r->mu); - GPR_ASSERT(!r->next_completion); + GPR_ASSERT(r->next_completion == NULL); r->next_completion = on_complete; r->target_config = target_config; - if (r->resolved_version == 0 && !r->resolving) { + if (r->resolved_version == 0 && r->resolving == 0) { zookeeper_start_resolving_locked(r); } else { zookeeper_maybe_finish_next_locked(r); @@ -151,7 +152,7 @@ static void zookeeper_on_resolved(void *arg, grpc_subchannel_args args; grpc_lb_policy *lb_policy; size_t i; - if (addresses) { + if (addresses != NULL) { config = grpc_client_config_create(); subchannels = gpr_malloc(sizeof(grpc_subchannel *) * addresses->naddrs); for (i = 0; i < addresses->naddrs; i++) { @@ -168,9 +169,9 @@ static void zookeeper_on_resolved(void *arg, gpr_free(subchannels); } gpr_mu_lock(&r->mu); - GPR_ASSERT(r->resolving); + GPR_ASSERT(r->resolving == 1); r->resolving = 0; - if (r->resolved_config) { + if (r->resolved_config != NULL) { grpc_client_config_unref(r->resolved_config); } r->resolved_config = config; @@ -181,11 +182,13 @@ static void zookeeper_on_resolved(void *arg, GRPC_RESOLVER_UNREF(&r->base, "zookeeper-resolving"); } -/* Callback function for each DNS resolved address */ +/** Callback function for each DNS resolved address */ static void zookeeper_dns_resolved(void *arg, grpc_resolved_addresses *addresses) { size_t i; zookeeper_resolver *r = arg; + int resolve_all = 0; + gpr_mu_lock(&r->mu); r->resolved_num++; r->resolved_addrs->addrs = @@ -202,19 +205,18 @@ static void zookeeper_dns_resolved(void *arg, r->resolved_addrs->naddrs += addresses->naddrs; grpc_resolved_addresses_destroy(addresses); - /* Wait for all addresses to be resolved */ - if (r->resolved_num == r->resolved_total) { - gpr_mu_unlock(&r->mu); + /** Wait for all addresses to be resolved */ + resolve_all = (r->resolved_num == r->resolved_total); + gpr_mu_unlock(&r->mu); + if (resolve_all) { zookeeper_on_resolved(r, r->resolved_addrs); - } else { - gpr_mu_unlock(&r->mu); } } -/* Parse json format address of a zookeeper node */ +/** Parse json format address of a zookeeper node */ static char *zookeeper_parse_address(char *buffer, int buffer_len) { - char *host; - char *port; + const char *host; + const char *port; char *address; grpc_json *json; grpc_json *cur; @@ -226,11 +228,15 @@ static char *zookeeper_parse_address(char *buffer, int buffer_len) { port = NULL; for (cur = json->child; cur != NULL; cur = cur->next) { if (!strcmp(cur->key, "host")) { - host = (char *)cur->value; - if (port != NULL) break; + host = cur->value; + if (port != NULL) { + break; + } } else if (!strcmp(cur->key, "port")) { - port = (char *)cur->value; - if (host != NULL) break; + port = cur->value; + if (host != NULL) { + break; + } } } if (host != NULL && port != NULL) { @@ -251,38 +257,43 @@ static void zookeeper_get_children_node_completion(int rc, const char *value, const struct Stat *stat, const void *arg) { char *address = NULL; + char *buffer = NULL; zookeeper_resolver *r = (zookeeper_resolver *)arg; + int resolve_all = 0; - if (rc) { + if (rc != 0) { gpr_log(GPR_ERROR, "Error in getting a child node of %s", r->name); return; } - address = zookeeper_parse_address((char *)value, value_len); + buffer = gpr_malloc(value_len); + memcpy(buffer, value, value_len); + address = zookeeper_parse_address(buffer, value_len); + gpr_free(buffer); if (address != NULL) { - /* Further resolve address by DNS */ + /** Further resolve address by DNS */ grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r); gpr_free(address); } else { gpr_mu_lock(&r->mu); r->resolved_total--; - if (r->resolved_num == r->resolved_total) { - gpr_mu_unlock(&r->mu); + resolve_all = (r->resolved_num == r->resolved_total); + gpr_mu_unlock(&r->mu); + if (resolve_all) { zookeeper_on_resolved(r, r->resolved_addrs); - } else { - gpr_mu_unlock(&r->mu); } } } static void zookeeper_get_children_completion( int rc, const struct String_vector *children, const void *arg) { + char *path; + int path_length; int status; - char path[GRPC_MAX_ZOOKEEPER_BUFFER_SIZE]; int i; zookeeper_resolver *r = (zookeeper_resolver *)arg; - if (rc) { + if (rc != 0) { gpr_log(GPR_ERROR, "Error in getting zookeeper children of %s", r->name); return; } @@ -298,13 +309,18 @@ static void zookeeper_get_children_completion( r->resolved_total = children->count; for (i = 0; i < children->count; i++) { - memset(path, 0, GRPC_MAX_ZOOKEEPER_BUFFER_SIZE); + path_length = strlen(r->name) + strlen(children->data[i]) + 2; + path = gpr_malloc(path_length); + memset(path, 0, path_length); strcat(path, r->name); strcat(path, "/"); strcat(path, children->data[i]); status = zoo_aget(r->zookeeper_handle, path, GRPC_ZOOKEEPER_WATCH, zookeeper_get_children_node_completion, r); - if (status) gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", path); + gpr_free(path); + if (status != 0) { + gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", path); + } } } @@ -314,27 +330,30 @@ static void zookeeper_get_node_completion(int rc, const char *value, const void *arg) { int status; char *address = NULL; + char *buffer = NULL; zookeeper_resolver *r = (zookeeper_resolver *)arg; r->resolved_addrs = NULL; r->resolved_total = 0; r->resolved_num = 0; - if (rc) { + if (rc != 0) { gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", r->name); return; } - /* If zookeeper node of path r->name does not have address (i.e. service - node), - get its children */ - address = zookeeper_parse_address((char *)value, value_len); + /** If zookeeper node of path r->name does not have address + (i.e. service node), get its children */ + buffer = gpr_malloc(value_len); + memcpy(buffer, value, value_len); + address = zookeeper_parse_address(buffer, value_len); + gpr_free(buffer); if (address != NULL) { r->resolved_addrs = gpr_malloc(sizeof(grpc_resolved_addresses)); r->resolved_addrs->addrs = NULL; r->resolved_addrs->naddrs = 0; r->resolved_total = 1; - /* Further resolve address by DNS */ + /** Further resolve address by DNS */ grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r); gpr_free(address); return; @@ -342,20 +361,23 @@ static void zookeeper_get_node_completion(int rc, const char *value, status = zoo_aget_children(r->zookeeper_handle, r->name, GRPC_ZOOKEEPER_WATCH, zookeeper_get_children_completion, r); - if (status) + if (status != 0) { gpr_log(GPR_ERROR, "Error in getting zookeeper children of %s", r->name); + } } static void zookeeper_resolve_address(zookeeper_resolver *r) { int status; status = zoo_aget(r->zookeeper_handle, r->name, GRPC_ZOOKEEPER_WATCH, zookeeper_get_node_completion, r); - if (status) gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", r->name); + if (status != 0) { + gpr_log(GPR_ERROR, "Error in getting zookeeper node %s", r->name); + } } static void zookeeper_start_resolving_locked(zookeeper_resolver *r) { GRPC_RESOLVER_REF(&r->base, "zookeeper-resolving"); - GPR_ASSERT(!r->resolving); + GPR_ASSERT(r->resolving == 0); r->resolving = 1; zookeeper_resolve_address(r); @@ -365,7 +387,7 @@ static void zookeeper_maybe_finish_next_locked(zookeeper_resolver *r) { if (r->next_completion != NULL && r->resolved_version != r->published_version) { *r->target_config = r->resolved_config; - if (r->resolved_config) { + if (r->resolved_config != NULL) { grpc_client_config_ref(r->resolved_config); } grpc_iomgr_add_callback(r->next_completion); @@ -377,7 +399,7 @@ static void zookeeper_maybe_finish_next_locked(zookeeper_resolver *r) { static void zookeeper_destroy(grpc_resolver *gr) { zookeeper_resolver *r = (zookeeper_resolver *)gr; gpr_mu_destroy(&r->mu); - if (r->resolved_config) { + if (r->resolved_config != NULL) { grpc_client_config_unref(r->resolved_config); } grpc_subchannel_factory_unref(r->subchannel_factory); @@ -385,9 +407,11 @@ static void zookeeper_destroy(grpc_resolver *gr) { gpr_free(r); } -/* Zookeeper watcher function - handle updates to any watched nodes */ +/** Zookeeper watcher function - handle updates to any watched nodes */ static void zookeeper_watcher(zhandle_t *zookeeper_handle, int type, int state, - const char *path, void *watcher_ctx) {} + const char *path, void *watcher_ctx) { + +} static grpc_resolver *zookeeper_create( grpc_uri *uri, @@ -414,10 +438,10 @@ static grpc_resolver *zookeeper_create( r->lb_policy_factory = lb_policy_factory; grpc_subchannel_factory_ref(subchannel_factory); - /* Initialize zookeeper client */ + /** Initialize zookeeper client */ zoo_set_debug_level(ZOO_LOG_LEVEL_WARN); r->zookeeper_handle = zookeeper_init(uri->authority, zookeeper_watcher, - GRPC_ZOOKEEPER_TIMEOUT, 0, 0, 0); + GRPC_ZOOKEEPER_SESSION_TIMEOUT, 0, 0, 0); if (r->zookeeper_handle == NULL) { gpr_log(GPR_ERROR, "Unable to connect to zookeeper server"); return NULL; @@ -458,4 +482,4 @@ static grpc_resolver_factory zookeeper_resolver_factory = { grpc_resolver_factory *grpc_zookeeper_resolver_factory_create() { return &zookeeper_resolver_factory; -}
\ No newline at end of file +} diff --git a/src/core/client_config/resolvers/zookeeper_resolver.h b/src/core/client_config/resolvers/zookeeper_resolver.h index 825b64e539..a6f002dd6d 100644 --- a/src/core/client_config/resolvers/zookeeper_resolver.h +++ b/src/core/client_config/resolvers/zookeeper_resolver.h @@ -36,7 +36,7 @@ #include "src/core/client_config/resolver_factory.h" -/** Create a zookeeper resolver for \a name */ +/** Create a zookeeper resolver factory */ grpc_resolver_factory *grpc_zookeeper_resolver_factory_create(void); -#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_RESOLVERS_ZOOKEEPER_RESOLVER_H */
\ No newline at end of file +#endif /* GRPC_INTERNAL_CORE_CLIENT_CONFIG_RESOLVERS_ZOOKEEPER_RESOLVER_H */ |