diff options
author | Hongwei Wang <hongweiw@google.com> | 2015-08-13 16:13:10 -0700 |
---|---|---|
committer | Hongwei Wang <hongweiw@google.com> | 2015-08-13 16:13:10 -0700 |
commit | 85ad685e5771d53d3f2ca667995498efe4dff1dd (patch) | |
tree | 83aad9a8b606898df1c1fbbe7cb0be3d04ecd14c | |
parent | ff6097ac49277841e2cedde3268d85aa5e3a5fcf (diff) |
Address comments and update grpc plugins
-rw-r--r-- | include/grpc/grpc.h | 23 | ||||
-rw-r--r-- | include/grpc/grpc_zookeeper.h | 11 | ||||
-rw-r--r-- | src/core/client_config/resolvers/zookeeper_resolver.c | 52 | ||||
-rw-r--r-- | src/core/surface/init.c | 56 | ||||
-rw-r--r-- | test/cpp/end2end/zookeeper_test.cc | 34 |
5 files changed, 93 insertions, 83 deletions
diff --git a/include/grpc/grpc.h b/include/grpc/grpc.h index 943982554a..fe4d9f929f 100644 --- a/include/grpc/grpc.h +++ b/include/grpc/grpc.h @@ -355,12 +355,15 @@ typedef struct grpc_op { } data; } grpc_op; +/** Registers a plugin to be initialized and destroyed with the library. -/** Registers a plugin to be initialized and deinitialized with the library. - - It is safe to pass NULL to either argument. The initialization and - deinitialization order isn't guaranteed. */ -void grpc_register_plugin(void (*init)(void), void (*deinit)(void)); + The \a init and \a destroy functions will be invoked as part of + \a grpc_init() and \a grpc_shutdown(), respectively. + Note that these functions can be invoked an arbitrary number of times + (and hence so will \a init and \a destroy). + It is safe to pass NULL to either argument. Plugins are destroyed in + the reverse order they were initialized. */ +void grpc_register_plugin(void (*init)(void), void (*destroy)(void)); /** Frees the memory used by all the plugin information. @@ -381,8 +384,8 @@ void grpc_unregister_all_plugins(); /* Default propagation mask: clients of the core API are encouraged to encode deltas from this in their implementations... ie write: - GRPC_PROPAGATE_DEFAULTS & ~GRPC_PROPAGATE_DEADLINE to disable deadline - propagation. Doing so gives flexibility in the future to define new + GRPC_PROPAGATE_DEFAULTS & ~GRPC_PROPAGATE_DEADLINE to disable deadline + propagation. Doing so gives flexibility in the future to define new propagation types that are default inherited or not. */ #define GRPC_PROPAGATE_DEFAULTS \ ((gpr_uint32)(( \ @@ -429,8 +432,8 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cq, otherwise a grpc_event describing the event that occurred. Callers must not call grpc_completion_queue_next and - grpc_completion_queue_pluck simultaneously on the same completion queue. - + grpc_completion_queue_pluck simultaneously on the same completion queue. + Completion queues support a maximum of GRPC_MAX_COMPLETION_QUEUE_PLUCKERS concurrently executing plucks at any time. */ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cq, void *tag, @@ -470,7 +473,7 @@ void grpc_channel_watch_connectivity_state( completions are sent to 'completion_queue'. 'method' and 'host' need only live through the invocation of this function. If parent_call is non-NULL, it must be a server-side call. It will be used - to propagate properties from the server call to this new client call. + to propagate properties from the server call to this new client call. */ grpc_call *grpc_channel_create_call(grpc_channel *channel, grpc_call *parent_call, diff --git a/include/grpc/grpc_zookeeper.h b/include/grpc/grpc_zookeeper.h index d1eeff3be5..2b195c18bf 100644 --- a/include/grpc/grpc_zookeeper.h +++ b/include/grpc/grpc_zookeeper.h @@ -31,6 +31,17 @@ * */ +/** Support zookeeper as alternative name system in addition to DNS + * Zookeeper name in gRPC is represented as a URI: + * zookeeper://host:port/path/service/instance + * + * Where zookeeper is the name system scheme + * host:port is the address of a zookeeper server + * /path/service/instance is the zookeeper name to be resolved + * + * Refer doc/naming.md for more details + */ + #ifndef GRPC_GRPC_ZOOKEEPER_H #define GRPC_GRPC_ZOOKEEPER_H diff --git a/src/core/client_config/resolvers/zookeeper_resolver.c b/src/core/client_config/resolvers/zookeeper_resolver.c index 96c59eacf9..3a99a4228a 100644 --- a/src/core/client_config/resolvers/zookeeper_resolver.c +++ b/src/core/client_config/resolvers/zookeeper_resolver.c @@ -142,7 +142,8 @@ static void zookeeper_next(grpc_resolver *resolver, gpr_mu_unlock(&r->mu); } -/** Zookeeper global watcher for connection management */ +/** Zookeeper global watcher for connection management + TODO: better connection management besides logs */ static void zookeeper_global_watcher(zhandle_t *zookeeper_handle, int type, int state, const char *path, void *watcher_ctx) { @@ -155,7 +156,8 @@ static void zookeeper_global_watcher(zhandle_t *zookeeper_handle, int type, } } -/** Zookeeper watcher for handling updates to watched nodes */ +/** Zookeeper watcher triggered by changes to watched nodes + Start to resolve again to get updated addresses */ static void zookeeper_watcher(zhandle_t *zookeeper_handle, int type, int state, const char *path, void *watcher_ctx) { if (watcher_ctx != NULL) { @@ -170,6 +172,8 @@ static void zookeeper_watcher(zhandle_t *zookeeper_handle, int type, int state, } } +/** Callback function after getting all resolved addresses + Create a subchannel for each address */ static void zookeeper_on_resolved(void *arg, grpc_resolved_addresses *addresses) { zookeeper_resolver *r = arg; @@ -239,16 +243,18 @@ static void zookeeper_dns_resolved(void *arg, } } -/** Parse json format address of a zookeeper node */ -static char *zookeeper_parse_address(char *buffer, int buffer_len) { - const char *host; - const char *port; - char *address; +/** Parse JSON format address of a zookeeper node */ +static char *zookeeper_parse_address(const char *value, int value_len) { grpc_json *json; grpc_json *cur; + const char *host; + const char *port; + char* buffer; + char *address = NULL; - address = NULL; - json = grpc_json_parse_string_with_len(buffer, buffer_len); + buffer = gpr_malloc(value_len); + memcpy(buffer, value, value_len); + json = grpc_json_parse_string_with_len(buffer, value_len); if (json != NULL) { host = NULL; port = NULL; @@ -270,6 +276,7 @@ static char *zookeeper_parse_address(char *buffer, int buffer_len) { } grpc_json_destroy(json); } + gpr_free(buffer); return address; } @@ -279,7 +286,6 @@ static void zookeeper_get_children_node_completion(int rc, const char *value, const struct Stat *stat, const void *arg) { char *address = NULL; - char *buffer = NULL; zookeeper_resolver *r = (zookeeper_resolver *)arg; int resolve_done = 0; @@ -288,10 +294,7 @@ static void zookeeper_get_children_node_completion(int rc, const char *value, return; } - buffer = gpr_malloc(value_len); - memcpy(buffer, value, value_len); - address = zookeeper_parse_address(buffer, value_len); - gpr_free(buffer); + address = zookeeper_parse_address(value, value_len); if (address != NULL) { /** Further resolve address by DNS */ grpc_resolve_address(address, NULL, zookeeper_dns_resolved, r); @@ -330,6 +333,8 @@ static void zookeeper_get_children_completion( r->resolved_addrs->naddrs = 0; r->resolved_total = children->count; + /** TODO: Replace expensive heap allocation and free with stack + if we can get maximum allowed length of zookeeper path */ for (i = 0; i < children->count; i++) { gpr_asprintf(&path, "%s/%s", r->name, children->data[i]); status = zoo_awget(r->zookeeper_handle, path, zookeeper_watcher, r, @@ -347,7 +352,6 @@ static void zookeeper_get_node_completion(int rc, const char *value, const void *arg) { int status; char *address = NULL; - char *buffer = NULL; zookeeper_resolver *r = (zookeeper_resolver *)arg; r->resolved_addrs = NULL; r->resolved_total = 0; @@ -360,10 +364,7 @@ static void zookeeper_get_node_completion(int rc, const char *value, /** If zookeeper node of path r->name does not have address (i.e. service node), get its children */ - buffer = gpr_malloc(value_len); - memcpy(buffer, value, value_len); - address = zookeeper_parse_address(buffer, value_len); - gpr_free(buffer); + address = zookeeper_parse_address(value, value_len); if (address != NULL) { r->resolved_addrs = gpr_malloc(sizeof(grpc_resolved_addresses)); r->resolved_addrs->addrs = NULL; @@ -428,22 +429,27 @@ static grpc_resolver *zookeeper_create( size_t num_subchannels), grpc_subchannel_factory *subchannel_factory) { zookeeper_resolver *r; - const char *path = uri->path; + size_t length; + char *path = uri->path; if (0 == strcmp(uri->authority, "")) { gpr_log(GPR_ERROR, "No authority specified in zookeeper uri"); return NULL; } + /** Remove the trailing slash if exists */ + length = strlen(path); + if (length > 1 && path[length - 1] == '/') { + path[length - 1] = 0; + } + r = gpr_malloc(sizeof(zookeeper_resolver)); memset(r, 0, sizeof(*r)); gpr_ref_init(&r->refs, 1); gpr_mu_init(&r->mu); grpc_resolver_init(&r->base, &zookeeper_resolver_vtable); r->name = gpr_strdup(path); - if (r->name[strlen(r->name) - 1] == '/') { - r->name[strlen(r->name) - 1] = 0; - } + r->subchannel_factory = subchannel_factory; r->lb_policy_factory = lb_policy_factory; grpc_subchannel_factory_ref(subchannel_factory); diff --git a/src/core/surface/init.c b/src/core/surface/init.c index 053878a87b..d9044549f2 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -52,6 +52,8 @@ #include "src/core/transport/chttp2_transport.h" #include "src/core/transport/connectivity_state.h" +#define MAX_PLUGINS 128 + static gpr_once g_basic_init = GPR_ONCE_INIT; static gpr_mu g_init_mu; static int g_initializations; @@ -63,40 +65,21 @@ static void do_basic_init(void) { typedef struct grpc_plugin { void (*init)(); - void (*deinit)(); - struct grpc_plugin *next; + void (*destroy)(); } grpc_plugin; -static grpc_plugin *g_plugins_head = NULL; - -static grpc_plugin *new_plugin(void (*init)(void), void (*deinit)(void)) { - grpc_plugin *plugin = gpr_malloc(sizeof(*plugin)); - memset(plugin, 0, sizeof(*plugin)); - plugin->init = init; - plugin->deinit = deinit; +static grpc_plugin g_all_of_the_plugins[MAX_PLUGINS]; +static int g_number_of_plugins = 0; - return plugin; -} - -void grpc_register_plugin(void (*init)(void), void (*deinit)(void)) { - grpc_plugin *old_head = g_plugins_head; - g_plugins_head = new_plugin(init, deinit); - g_plugins_head->next = old_head; -} - -void grpc_unregister_all_plugins() { - grpc_plugin *plugin; - grpc_plugin *next; - - for (plugin = g_plugins_head; plugin != NULL; plugin = next) { - next = plugin->next; - gpr_free(plugin); - } - g_plugins_head = NULL; +void grpc_register_plugin(void (*init)(void), void (*destroy)(void)) { + GPR_ASSERT(g_number_of_plugins != MAX_PLUGINS); + g_all_of_the_plugins[g_number_of_plugins].init = init; + g_all_of_the_plugins[g_number_of_plugins].destroy = destroy; + g_number_of_plugins++; } void grpc_init(void) { - grpc_plugin *plugin; + int i; gpr_once_init(&g_basic_init, do_basic_init); gpr_mu_lock(&g_init_mu); @@ -125,9 +108,9 @@ void grpc_init(void) { } } grpc_timers_global_init(); - for (plugin = g_plugins_head; plugin != NULL; plugin = plugin->next) { - if (plugin->init) { - plugin->init(); + for (i = 0; i < g_number_of_plugins; i++) { + if (g_all_of_the_plugins[i].init != NULL) { + g_all_of_the_plugins[i].init(); } } } @@ -135,9 +118,7 @@ void grpc_init(void) { } void grpc_shutdown(void) { - grpc_plugin *plugin; - grpc_plugin *next; - + int i; gpr_mu_lock(&g_init_mu); if (--g_initializations == 0) { grpc_iomgr_shutdown(); @@ -145,11 +126,10 @@ void grpc_shutdown(void) { grpc_timers_global_destroy(); grpc_tracer_shutdown(); grpc_resolver_registry_shutdown(); - for (plugin = g_plugins_head; plugin != NULL; plugin = next) { - if (plugin->deinit) { - plugin->deinit(); + for (i = 0; i < g_number_of_plugins; i++) { + if (g_all_of_the_plugins[i].destroy != NULL) { + g_all_of_the_plugins[i].destroy(); } - next = plugin->next; } } gpr_mu_unlock(&g_init_mu); diff --git a/test/cpp/end2end/zookeeper_test.cc b/test/cpp/end2end/zookeeper_test.cc index 27adcf9a96..a95ee95646 100644 --- a/test/cpp/end2end/zookeeper_test.cc +++ b/test/cpp/end2end/zookeeper_test.cc @@ -85,12 +85,12 @@ class ZookeeperTest : public ::testing::Test { // Register service instance /test/1 in zookeeper string value = - "{\"host\":\"localhost\",\"port\":\"" + std::to_string(port1) + "\"}"; + "{\"host\":\"localhost\",\"port\":\"" + to_string(port1) + "\"}"; RegisterService("/test/1", value); // Register service instance /test/2 in zookeeper value = - "{\"host\":\"localhost\",\"port\":\"" + std::to_string(port2) + "\"}"; + "{\"host\":\"localhost\",\"port\":\"" + to_string(port2) + "\"}"; RegisterService("/test/2", value); } @@ -115,13 +115,10 @@ class ZookeeperTest : public ::testing::Test { // Register zookeeper name resolver in grpc grpc_zookeeper_register(); - - // Unregister all plugins when exit - atexit(grpc_unregister_all_plugins); } - std::unique_ptr<Server> SetUpServer(int port) { - string server_address = "localhost:" + std::to_string(port); + std::unique_ptr<Server> SetUpServer(const int port) { + string server_address = "localhost:" + to_string(port); ServerBuilder builder; builder.AddListeningPort(server_address, InsecureServerCredentials()); @@ -130,7 +127,7 @@ class ZookeeperTest : public ::testing::Test { return server; } - void RegisterService(string name, string value) { + void RegisterService(const string& name, const string& value) { char* path = (char*)gpr_malloc(name.size()); int status = zoo_exists(zookeeper_handle_, name.c_str(), 0, NULL); @@ -146,11 +143,16 @@ class ZookeeperTest : public ::testing::Test { GPR_ASSERT(status == 0); } - void DeleteService(string name) { + void DeleteService(const string& name) { int status = zoo_delete(zookeeper_handle_, name.c_str(), -1); GPR_ASSERT(status == 0); } + void ChangeZookeeperState() { + server1_->Shutdown(); + DeleteService("/test/1"); + } + void TearDown() GRPC_OVERRIDE { server1_->Shutdown(); server2_->Shutdown(); @@ -163,6 +165,12 @@ class ZookeeperTest : public ::testing::Test { stub_ = std::move(grpc::cpp::test::util::TestService::NewStub(channel_)); } + string to_string(const int number) { + std::stringstream strs; + strs << number; + return strs.str(); + } + std::shared_ptr<ChannelInterface> channel_; std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub_; std::unique_ptr<Server> server1_; @@ -173,7 +181,7 @@ class ZookeeperTest : public ::testing::Test { }; // Test zookeeper state change between two RPCs -// TODO(ctiller): leaked objects +// TODO(ctiller): leaked objects in this test TEST_F(ZookeeperTest, ZookeeperStateChangeTwoRpc) { ResetStub(); @@ -188,8 +196,10 @@ TEST_F(ZookeeperTest, ZookeeperStateChangeTwoRpc) { EXPECT_TRUE(s1.ok()); // Zookeeper state change - DeleteService("/test/1"); - gpr_log(GPR_DEBUG, "Zookeeper state change"); + gpr_log(GPR_DEBUG, "Zookeeper state change"); + ChangeZookeeperState(); + // Wait for re-resolving addresses + // TODO(ctiller): RPC will probably fail if not waiting sleep(1); // Second RPC |