aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Hongwei Wang <hongweiw@google.com>2015-08-13 16:13:10 -0700
committerGravatar Hongwei Wang <hongweiw@google.com>2015-08-13 16:13:10 -0700
commit85ad685e5771d53d3f2ca667995498efe4dff1dd (patch)
tree83aad9a8b606898df1c1fbbe7cb0be3d04ecd14c
parentff6097ac49277841e2cedde3268d85aa5e3a5fcf (diff)
Address comments and update grpc plugins
-rw-r--r--include/grpc/grpc.h23
-rw-r--r--include/grpc/grpc_zookeeper.h11
-rw-r--r--src/core/client_config/resolvers/zookeeper_resolver.c52
-rw-r--r--src/core/surface/init.c56
-rw-r--r--test/cpp/end2end/zookeeper_test.cc34
5 files changed, 93 insertions, 83 deletions
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h
index 943982554a..fe4d9f929f 100644
--- a/include/grpc/grpc.h
+++ b/include/grpc/grpc.h
@@ -355,12 +355,15 @@ typedef struct grpc_op {
} data;
} grpc_op;
+/** Registers a plugin to be initialized and destroyed with the library.
-/** Registers a plugin to be initialized and deinitialized with the library.
-
- It is safe to pass NULL to either argument. The initialization and
- deinitialization order isn't guaranteed. */
-void grpc_register_plugin(void (*init)(void), void (*deinit)(void));
+ The \a init and \a destroy functions will be invoked as part of
+ \a grpc_init() and \a grpc_shutdown(), respectively.
+ Note that these functions can be invoked an arbitrary number of times
+ (and hence so will \a init and \a destroy).
+ It is safe to pass NULL to either argument. Plugins are destroyed in
+ the reverse order they were initialized. */
+void grpc_register_plugin(void (*init)(void), void (*destroy)(void));
/** Frees the memory used by all the plugin information.
@@ -381,8 +384,8 @@ void grpc_unregister_all_plugins();
/* Default propagation mask: clients of the core API are encouraged to encode
deltas from this in their implementations... ie write:
- GRPC_PROPAGATE_DEFAULTS & ~GRPC_PROPAGATE_DEADLINE to disable deadline
- propagation. Doing so gives flexibility in the future to define new
+ GRPC_PROPAGATE_DEFAULTS & ~GRPC_PROPAGATE_DEADLINE to disable deadline
+ propagation. Doing so gives flexibility in the future to define new
propagation types that are default inherited or not. */
#define GRPC_PROPAGATE_DEFAULTS \
((gpr_uint32)(( \
@@ -429,8 +432,8 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cq,
otherwise a grpc_event describing the event that occurred.
Callers must not call grpc_completion_queue_next and
- grpc_completion_queue_pluck simultaneously on the same completion queue.
-
+ grpc_completion_queue_pluck simultaneously on the same completion queue.
+
Completion queues support a maximum of GRPC_MAX_COMPLETION_QUEUE_PLUCKERS
concurrently executing plucks at any time. */
grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag,
@@ -470,7 +473,7 @@ void grpc_channel_watch_connectivity_state(
completions are sent to 'completion_queue'. 'method' and 'host' need only
live through the invocation of this function.
If parent_call is non-NULL, it must be a server-side call. It will be used
- to propagate properties from the server call to this new client call.
+ to propagate properties from the server call to this new client call.
*/
grpc_call *grpc_channel_create_call(grpc_channel *channel,
grpc_call *parent_call,
diff --git a/include/grpc/grpc_zookeeper.h b/include/grpc/grpc_zookeeper.h
index d1eeff3be5..2b195c18bf 100644
--- a/include/grpc/grpc_zookeeper.h
+++ b/include/grpc/grpc_zookeeper.h
@@ -31,6 +31,17 @@
*
*/
+/** Support zookeeper as alternative name system in addition to DNS
+ * Zookeeper name in gRPC is represented as a URI:
+ * zookeeper://host:port/path/service/instance
+ *
+ * Where zookeeper is the name system scheme
+ * host:port is the address of a zookeeper server
+ * /path/service/instance is the zookeeper name to be resolved
+ *
+ * Refer doc/naming.md for more details
+ */
+
#ifndef GRPC_GRPC_ZOOKEEPER_H
#define GRPC_GRPC_ZOOKEEPER_H
diff --git a/src/core/client_config/resolvers/zookeeper_resolver.c b/src/core/client_config/resolvers/zookeeper_resolver.c
index 96c59eacf9..3a99a4228a 100644
--- a/src/core/client_config/resolvers/zookeeper_resolver.c
+++ b/src/core/client_config/resolvers/zookeeper_resolver.c
@@ -142,7 +142,8 @@ static void zookeeper_next(grpc_resolver *resolver,
gpr_mu_unlock(&r->mu);
}
-/** Zookeeper global watcher for connection management */
+/** Zookeeper global watcher for connection management
+ TODO: better connection management besides logs */
static void zookeeper_global_watcher(zhandle_t *zookeeper_handle, int type,
int state, const char *path,
void *watcher_ctx) {
@@ -155,7 +156,8 @@ static void zookeeper_global_watcher(zhandle_t *zookeeper_handle, int type,
}
}
-/** Zookeeper watcher for handling updates to watched nodes */
+/** Zookeeper watcher triggered by changes to watched nodes
+ Start to resolve again to get updated addresses */
static void zookeeper_watcher(zhandle_t *zookeeper_handle, int type, int state,
const char *path, void *watcher_ctx) {
if (watcher_ctx != NULL) {
@@ -170,6 +172,8 @@ static void zookeeper_watcher(zhandle_t *zookeeper_handle, int type, int state,
}
}
+/** Callback function after getting all resolved addresses
+ Create a subchannel for each address */
static void zookeeper_on_resolved(void *arg,
grpc_resolved_addresses *addresses) {
zookeeper_resolver *r = arg;
@@ -239,16 +243,18 @@ static void zookeeper_dns_resolved(void *arg,
}
}
-/** Parse json format address of a zookeeper node */
-static char *zookeeper_parse_address(char *buffer, int buffer_len) {
- const char *host;
- const char *port;
- char *address;
+/** Parse JSON format address of a zookeeper node */
+static char *zookeeper_parse_address(const char *value, int value_len) {
grpc_json *json;
grpc_json *cur;
+ const char *host;
+ const char *port;
+ char* buffer;
+ char *address = NULL;
- address = NULL;
- json = grpc_json_parse_string_with_len(buffer, buffer_len);
+ buffer = gpr_malloc(value_len);
+ memcpy(buffer, value, value_len);
+ json = grpc_json_parse_string_with_len(buffer, value_len);
if (json != NULL) {
host = NULL;
port = NULL;
@@ -270,6 +276,7 @@ static char *zookeeper_parse_address(char *buffer, int buffer_len) {
}
grpc_json_destroy(json);
}
+ gpr_free(buffer);
return address;
}
@@ -279,7 +286,6 @@ static void zookeeper_get_children_node_completion(int rc, const char *value,
const struct Stat *stat,
const void *arg) {
char *address = NULL;
- char *buffer = NULL;
zookeeper_resolver *r = (zookeeper_resolver *)arg;
int resolve_done = 0;
@@ -288,10 +294,7 @@ static void zookeeper_get_children_node_completion(int rc, const char *value,
return;
}
- buffer = gpr_malloc(value_len);
- memcpy(buffer, value, value_len);
- address = zookeeper_parse_address(buffer, value_len);
- gpr_free(buffer);
+ address = zookeeper_parse_address(value, value_len);
if (address != NULL) {
/** Further resolve address by DNS */
grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r);
@@ -330,6 +333,8 @@ static void zookeeper_get_children_completion(
r->resolved_addrs->naddrs = 0;
r->resolved_total = children->count;
+ /** TODO: Replace expensive heap allocation and free with stack
+ if we can get maximum allowed length of zookeeper path */
for (i = 0; i < children->count; i++) {
gpr_asprintf(&path, "%s/%s", r->name, children->data[i]);
status = zoo_awget(r->zookeeper_handle, path, zookeeper_watcher, r,
@@ -347,7 +352,6 @@ static void zookeeper_get_node_completion(int rc, const char *value,
const void *arg) {
int status;
char *address = NULL;
- char *buffer = NULL;
zookeeper_resolver *r = (zookeeper_resolver *)arg;
r->resolved_addrs = NULL;
r->resolved_total = 0;
@@ -360,10 +364,7 @@ static void zookeeper_get_node_completion(int rc, const char *value,
/** If zookeeper node of path r->name does not have address
(i.e. service node), get its children */
- buffer = gpr_malloc(value_len);
- memcpy(buffer, value, value_len);
- address = zookeeper_parse_address(buffer, value_len);
- gpr_free(buffer);
+ address = zookeeper_parse_address(value, value_len);
if (address != NULL) {
r->resolved_addrs = gpr_malloc(sizeof(grpc_resolved_addresses));
r->resolved_addrs->addrs = NULL;
@@ -428,22 +429,27 @@ static grpc_resolver *zookeeper_create(
size_t num_subchannels),
grpc_subchannel_factory *subchannel_factory) {
zookeeper_resolver *r;
- const char *path = uri->path;
+ size_t length;
+ char *path = uri->path;
if (0 == strcmp(uri->authority, "")) {
gpr_log(GPR_ERROR, "No authority specified in zookeeper uri");
return NULL;
}
+ /** Remove the trailing slash if exists */
+ length = strlen(path);
+ if (length > 1 && path[length - 1] == '/') {
+ path[length - 1] = 0;
+ }
+
r = gpr_malloc(sizeof(zookeeper_resolver));
memset(r, 0, sizeof(*r));
gpr_ref_init(&r->refs, 1);
gpr_mu_init(&r->mu);
grpc_resolver_init(&r->base, &zookeeper_resolver_vtable);
r->name = gpr_strdup(path);
- if (r->name[strlen(r->name) - 1] == '/') {
- r->name[strlen(r->name) - 1] = 0;
- }
+
r->subchannel_factory = subchannel_factory;
r->lb_policy_factory = lb_policy_factory;
grpc_subchannel_factory_ref(subchannel_factory);
diff --git a/src/core/surface/init.c b/src/core/surface/init.c
index 053878a87b..d9044549f2 100644
--- a/src/core/surface/init.c
+++ b/src/core/surface/init.c
@@ -52,6 +52,8 @@
#include "src/core/transport/chttp2_transport.h"
#include "src/core/transport/connectivity_state.h"
+#define MAX_PLUGINS 128
+
static gpr_once g_basic_init = GPR_ONCE_INIT;
static gpr_mu g_init_mu;
static int g_initializations;
@@ -63,40 +65,21 @@ static void do_basic_init(void) {
typedef struct grpc_plugin {
void (*init)();
- void (*deinit)();
- struct grpc_plugin *next;
+ void (*destroy)();
} grpc_plugin;
-static grpc_plugin *g_plugins_head = NULL;
-
-static grpc_plugin *new_plugin(void (*init)(void), void (*deinit)(void)) {
- grpc_plugin *plugin = gpr_malloc(sizeof(*plugin));
- memset(plugin, 0, sizeof(*plugin));
- plugin->init = init;
- plugin->deinit = deinit;
+static grpc_plugin g_all_of_the_plugins[MAX_PLUGINS];
+static int g_number_of_plugins = 0;
- return plugin;
-}
-
-void grpc_register_plugin(void (*init)(void), void (*deinit)(void)) {
- grpc_plugin *old_head = g_plugins_head;
- g_plugins_head = new_plugin(init, deinit);
- g_plugins_head->next = old_head;
-}
-
-void grpc_unregister_all_plugins() {
- grpc_plugin *plugin;
- grpc_plugin *next;
-
- for (plugin = g_plugins_head; plugin != NULL; plugin = next) {
- next = plugin->next;
- gpr_free(plugin);
- }
- g_plugins_head = NULL;
+void grpc_register_plugin(void (*init)(void), void (*destroy)(void)) {
+ GPR_ASSERT(g_number_of_plugins != MAX_PLUGINS);
+ g_all_of_the_plugins[g_number_of_plugins].init = init;
+ g_all_of_the_plugins[g_number_of_plugins].destroy = destroy;
+ g_number_of_plugins++;
}
void grpc_init(void) {
- grpc_plugin *plugin;
+ int i;
gpr_once_init(&g_basic_init, do_basic_init);
gpr_mu_lock(&g_init_mu);
@@ -125,9 +108,9 @@ void grpc_init(void) {
}
}
grpc_timers_global_init();
- for (plugin = g_plugins_head; plugin != NULL; plugin = plugin->next) {
- if (plugin->init) {
- plugin->init();
+ for (i = 0; i < g_number_of_plugins; i++) {
+ if (g_all_of_the_plugins[i].init != NULL) {
+ g_all_of_the_plugins[i].init();
}
}
}
@@ -135,9 +118,7 @@ void grpc_init(void) {
}
void grpc_shutdown(void) {
- grpc_plugin *plugin;
- grpc_plugin *next;
-
+ int i;
gpr_mu_lock(&g_init_mu);
if (--g_initializations == 0) {
grpc_iomgr_shutdown();
@@ -145,11 +126,10 @@ void grpc_shutdown(void) {
grpc_timers_global_destroy();
grpc_tracer_shutdown();
grpc_resolver_registry_shutdown();
- for (plugin = g_plugins_head; plugin != NULL; plugin = next) {
- if (plugin->deinit) {
- plugin->deinit();
+ for (i = 0; i < g_number_of_plugins; i++) {
+ if (g_all_of_the_plugins[i].destroy != NULL) {
+ g_all_of_the_plugins[i].destroy();
}
- next = plugin->next;
}
}
gpr_mu_unlock(&g_init_mu);
diff --git a/test/cpp/end2end/zookeeper_test.cc b/test/cpp/end2end/zookeeper_test.cc
index 27adcf9a96..a95ee95646 100644
--- a/test/cpp/end2end/zookeeper_test.cc
+++ b/test/cpp/end2end/zookeeper_test.cc
@@ -85,12 +85,12 @@ class ZookeeperTest : public ::testing::Test {
// Register service instance /test/1 in zookeeper
string value =
- "{\"host\":\"localhost\",\"port\":\"" + std::to_string(port1) + "\"}";
+ "{\"host\":\"localhost\",\"port\":\"" + to_string(port1) + "\"}";
RegisterService("/test/1", value);
// Register service instance /test/2 in zookeeper
value =
- "{\"host\":\"localhost\",\"port\":\"" + std::to_string(port2) + "\"}";
+ "{\"host\":\"localhost\",\"port\":\"" + to_string(port2) + "\"}";
RegisterService("/test/2", value);
}
@@ -115,13 +115,10 @@ class ZookeeperTest : public ::testing::Test {
// Register zookeeper name resolver in grpc
grpc_zookeeper_register();
-
- // Unregister all plugins when exit
- atexit(grpc_unregister_all_plugins);
}
- std::unique_ptr<Server> SetUpServer(int port) {
- string server_address = "localhost:" + std::to_string(port);
+ std::unique_ptr<Server> SetUpServer(const int port) {
+ string server_address = "localhost:" + to_string(port);
ServerBuilder builder;
builder.AddListeningPort(server_address, InsecureServerCredentials());
@@ -130,7 +127,7 @@ class ZookeeperTest : public ::testing::Test {
return server;
}
- void RegisterService(string name, string value) {
+ void RegisterService(const string& name, const string& value) {
char* path = (char*)gpr_malloc(name.size());
int status = zoo_exists(zookeeper_handle_, name.c_str(), 0, NULL);
@@ -146,11 +143,16 @@ class ZookeeperTest : public ::testing::Test {
GPR_ASSERT(status == 0);
}
- void DeleteService(string name) {
+ void DeleteService(const string& name) {
int status = zoo_delete(zookeeper_handle_, name.c_str(), -1);
GPR_ASSERT(status == 0);
}
+ void ChangeZookeeperState() {
+ server1_->Shutdown();
+ DeleteService("/test/1");
+ }
+
void TearDown() GRPC_OVERRIDE {
server1_->Shutdown();
server2_->Shutdown();
@@ -163,6 +165,12 @@ class ZookeeperTest : public ::testing::Test {
stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel_));
}
+ string to_string(const int number) {
+ std::stringstream strs;
+ strs << number;
+ return strs.str();
+ }
+
std::shared_ptr<ChannelInterface> channel_;
std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub_;
std::unique_ptr<Server> server1_;
@@ -173,7 +181,7 @@ class ZookeeperTest : public ::testing::Test {
};
// Test zookeeper state change between two RPCs
-// TODO(ctiller): leaked objects
+// TODO(ctiller): leaked objects in this test
TEST_F(ZookeeperTest, ZookeeperStateChangeTwoRpc) {
ResetStub();
@@ -188,8 +196,10 @@ TEST_F(ZookeeperTest, ZookeeperStateChangeTwoRpc) {
EXPECT_TRUE(s1.ok());
// Zookeeper state change
- DeleteService("/test/1");
- gpr_log(GPR_DEBUG, "Zookeeper state change");
+ gpr_log(GPR_DEBUG, "Zookeeper state change");
+ ChangeZookeeperState();
+ // Wait for re-resolving addresses
+ // TODO(ctiller): RPC will probably fail if not waiting
sleep(1);
// Second RPC