diff options
author | Vijay Pai <vpai@google.com> | 2016-02-25 18:10:24 -0800 |
---|---|---|
committer | Vijay Pai <vpai@google.com> | 2016-02-25 18:10:24 -0800 |
commit | e9ef53645150f7a0400a5f9be770eb2b4ba335b5 (patch) | |
tree | 34a8b7094b851589eec500b2ca419b64c9051d89 /test | |
parent | e3cd25684051de21e4b1ba93173c06e72fa5ffad (diff) |
Revert "Add an implementation firewall against pollset_set"
Diffstat (limited to 'test')
24 files changed, 361 insertions, 432 deletions
diff --git a/test/core/client_config/set_initial_connect_string_test.c b/test/core/client_config/set_initial_connect_string_test.c index 3cf267fb3b..bcd1f26123 100644 --- a/test/core/client_config/set_initial_connect_string_test.c +++ b/test/core/client_config/set_initial_connect_string_test.c @@ -85,7 +85,7 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, gpr_slice_buffer_init(&state.incoming_buffer); gpr_slice_buffer_init(&state.temp_incoming_buffer); state.tcp = tcp; - grpc_endpoint_add_to_pollset(exec_ctx, tcp, server->pollset); + grpc_endpoint_add_to_pollset(exec_ctx, tcp, &server->pollset); grpc_endpoint_read(exec_ctx, tcp, &state.temp_incoming_buffer, &on_read); } diff --git a/test/core/end2end/fixtures/h2_full+poll+pipe.c b/test/core/end2end/fixtures/h2_full+poll+pipe.c index 682598fbe2..d475a7bb55 100644 --- a/test/core/end2end/fixtures/h2_full+poll+pipe.c +++ b/test/core/end2end/fixtures/h2_full+poll+pipe.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015-2016, Google Inc. + * Copyright 2015, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -35,23 +35,21 @@ #include <string.h> -#include <grpc/support/alloc.h> -#include <grpc/support/host_port.h> -#include <grpc/support/log.h> -#include <grpc/support/sync.h> -#include <grpc/support/thd.h> -#include <grpc/support/useful.h> - #include "src/core/channel/client_channel.h" #include "src/core/channel/connected_channel.h" #include "src/core/channel/http_server_filter.h" -#include "src/core/iomgr/pollset_posix.h" -#include "src/core/iomgr/wakeup_fd_posix.h" #include "src/core/surface/channel.h" #include "src/core/surface/server.h" #include "src/core/transport/chttp2_transport.h" +#include <grpc/support/alloc.h> +#include <grpc/support/host_port.h> +#include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/thd.h> +#include <grpc/support/useful.h> #include "test/core/util/port.h" #include "test/core/util/test_config.h" +#include "src/core/iomgr/wakeup_fd_posix.h" typedef struct fullstack_fixture_data { char *localaddr; diff --git a/test/core/end2end/fixtures/h2_full+poll.c b/test/core/end2end/fixtures/h2_full+poll.c index 5a0b2ef495..3f5e6096f6 100644 --- a/test/core/end2end/fixtures/h2_full+poll.c +++ b/test/core/end2end/fixtures/h2_full+poll.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015-2016, Google Inc. + * Copyright 2015, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -35,20 +35,18 @@ #include <string.h> -#include <grpc/support/alloc.h> -#include <grpc/support/host_port.h> -#include <grpc/support/log.h> -#include <grpc/support/sync.h> -#include <grpc/support/thd.h> -#include <grpc/support/useful.h> - #include "src/core/channel/client_channel.h" #include "src/core/channel/connected_channel.h" #include "src/core/channel/http_server_filter.h" -#include "src/core/iomgr/pollset_posix.h" #include "src/core/surface/channel.h" #include "src/core/surface/server.h" #include "src/core/transport/chttp2_transport.h" +#include <grpc/support/alloc.h> +#include <grpc/support/host_port.h> +#include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/thd.h> +#include <grpc/support/useful.h> #include "test/core/util/port.h" #include "test/core/util/test_config.h" diff --git a/test/core/end2end/fixtures/h2_ssl+poll.c b/test/core/end2end/fixtures/h2_ssl+poll.c index 66268c77d5..0b88876d13 100644 --- a/test/core/end2end/fixtures/h2_ssl+poll.c +++ b/test/core/end2end/fixtures/h2_ssl+poll.c @@ -41,7 +41,6 @@ #include <grpc/support/log.h> #include "src/core/channel/channel_args.h" -#include "src/core/iomgr/pollset_posix.h" #include "src/core/security/credentials.h" #include "src/core/support/env.h" #include "src/core/support/tmpfile.h" diff --git a/test/core/end2end/fixtures/h2_uchannel.c b/test/core/end2end/fixtures/h2_uchannel.c index 87bbd64d09..33055561cb 100644 --- a/test/core/end2end/fixtures/h2_uchannel.c +++ b/test/core/end2end/fixtures/h2_uchannel.c @@ -35,13 +35,6 @@ #include <string.h> -#include <grpc/support/alloc.h> -#include <grpc/support/host_port.h> -#include <grpc/support/log.h> -#include <grpc/support/string_util.h> -#include <grpc/support/sync.h> -#include <grpc/support/thd.h> -#include <grpc/support/useful.h> #include "src/core/channel/channel_args.h" #include "src/core/channel/client_channel.h" #include "src/core/channel/client_uchannel.h" @@ -53,6 +46,13 @@ #include "src/core/surface/channel.h" #include "src/core/surface/server.h" #include "src/core/transport/chttp2_transport.h" +#include <grpc/support/alloc.h> +#include <grpc/support/host_port.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/sync.h> +#include <grpc/support/thd.h> +#include <grpc/support/useful.h> #include "test/core/util/port.h" #include "test/core/util/test_config.h" @@ -238,12 +238,12 @@ static grpc_end2end_test_fixture chttp2_create_fixture_micro_fullstack( } grpc_connectivity_state g_state = GRPC_CHANNEL_IDLE; -grpc_pollset_set *g_interested_parties; +grpc_pollset_set g_interested_parties; static void state_changed(grpc_exec_ctx *exec_ctx, void *arg, bool success) { if (g_state != GRPC_CHANNEL_READY) { grpc_subchannel_notify_on_state_change( - exec_ctx, arg, g_interested_parties, &g_state, + exec_ctx, arg, &g_interested_parties, &g_state, grpc_closure_create(state_changed, arg)); } } @@ -253,31 +253,30 @@ static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *arg, bool success) { } static grpc_connected_subchannel *connect_subchannel(grpc_subchannel *c) { - gpr_mu *mu; - grpc_pollset *pollset = gpr_malloc(grpc_pollset_size()); + grpc_pollset pollset; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_pollset_init(pollset, &mu); - g_interested_parties = grpc_pollset_set_create(); - grpc_pollset_set_add_pollset(&exec_ctx, g_interested_parties, pollset); - grpc_subchannel_notify_on_state_change(&exec_ctx, c, g_interested_parties, + grpc_pollset_init(&pollset); + grpc_pollset_set_init(&g_interested_parties); + grpc_pollset_set_add_pollset(&exec_ctx, &g_interested_parties, &pollset); + grpc_subchannel_notify_on_state_change(&exec_ctx, c, &g_interested_parties, &g_state, grpc_closure_create(state_changed, c)); grpc_exec_ctx_flush(&exec_ctx); - gpr_mu_lock(mu); + gpr_mu_lock(GRPC_POLLSET_MU(&pollset)); while (g_state != GRPC_CHANNEL_READY) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), + grpc_pollset_work(&exec_ctx, &pollset, &worker, + gpr_now(GPR_CLOCK_MONOTONIC), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1)); - gpr_mu_unlock(mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&pollset)); grpc_exec_ctx_flush(&exec_ctx); - gpr_mu_lock(mu); + gpr_mu_lock(GRPC_POLLSET_MU(&pollset)); } - grpc_pollset_shutdown(&exec_ctx, pollset, - grpc_closure_create(destroy_pollset, pollset)); - grpc_pollset_set_destroy(g_interested_parties); - gpr_mu_unlock(mu); + grpc_pollset_shutdown(&exec_ctx, &pollset, + grpc_closure_create(destroy_pollset, &pollset)); + grpc_pollset_set_destroy(&g_interested_parties); + gpr_mu_unlock(GRPC_POLLSET_MU(&pollset)); grpc_exec_ctx_finish(&exec_ctx); - gpr_free(pollset); return grpc_subchannel_get_connected_subchannel(c); } diff --git a/test/core/end2end/fixtures/h2_uds+poll.c b/test/core/end2end/fixtures/h2_uds+poll.c index c3a855ff88..155017c887 100644 --- a/test/core/end2end/fixtures/h2_uds+poll.c +++ b/test/core/end2end/fixtures/h2_uds+poll.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015-2016, Google Inc. + * Copyright 2015, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -37,22 +37,20 @@ #include <string.h> #include <unistd.h> -#include <grpc/support/alloc.h> -#include <grpc/support/host_port.h> -#include <grpc/support/log.h> -#include <grpc/support/string_util.h> -#include <grpc/support/sync.h> -#include <grpc/support/thd.h> -#include <grpc/support/useful.h> - #include "src/core/channel/client_channel.h" #include "src/core/channel/connected_channel.h" #include "src/core/channel/http_server_filter.h" -#include "src/core/iomgr/pollset_posix.h" #include "src/core/support/string.h" #include "src/core/surface/channel.h" #include "src/core/surface/server.h" #include "src/core/transport/chttp2_transport.h" +#include <grpc/support/alloc.h> +#include <grpc/support/host_port.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/sync.h> +#include <grpc/support/thd.h> +#include <grpc/support/useful.h> #include "test/core/util/port.h" #include "test/core/util/test_config.h" diff --git a/test/core/httpcli/httpcli_test.c b/test/core/httpcli/httpcli_test.c index da1463329d..e954a920b0 100644 --- a/test/core/httpcli/httpcli_test.c +++ b/test/core/httpcli/httpcli_test.c @@ -36,19 +36,18 @@ #include <string.h> #include <grpc/grpc.h> +#include "src/core/iomgr/iomgr.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> #include <grpc/support/subprocess.h> #include <grpc/support/sync.h> -#include "src/core/iomgr/iomgr.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" static int g_done = 0; static grpc_httpcli_context g_context; -static gpr_mu *g_mu; -static grpc_pollset *g_pollset; +static grpc_pollset g_pollset; static gpr_timespec n_seconds_time(int seconds) { return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(seconds); @@ -64,10 +63,10 @@ static void on_finish(grpc_exec_ctx *exec_ctx, void *arg, GPR_ASSERT(response->status == 200); GPR_ASSERT(response->body_length == strlen(expect)); GPR_ASSERT(0 == memcmp(expect, response->body, response->body_length)); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); g_done = 1; - grpc_pollset_kick(g_pollset, NULL); - gpr_mu_unlock(g_mu); + grpc_pollset_kick(&g_pollset, NULL); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } static void test_get(int port) { @@ -86,18 +85,18 @@ static void test_get(int port) { req.path = "/get"; req.handshaker = &grpc_httpcli_plaintext; - grpc_httpcli_get(&exec_ctx, &g_context, g_pollset, &req, n_seconds_time(15), + grpc_httpcli_get(&exec_ctx, &g_context, &g_pollset, &req, n_seconds_time(15), on_finish, (void *)42); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (!g_done) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, g_pollset, &worker, + grpc_pollset_work(&exec_ctx, &g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), n_seconds_time(20)); - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); gpr_free(host); } @@ -117,18 +116,18 @@ static void test_post(int port) { req.path = "/post"; req.handshaker = &grpc_httpcli_plaintext; - grpc_httpcli_post(&exec_ctx, &g_context, g_pollset, &req, "hello", 5, + grpc_httpcli_post(&exec_ctx, &g_context, &g_pollset, &req, "hello", 5, n_seconds_time(15), on_finish, (void *)42); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (!g_done) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, g_pollset, &worker, + grpc_pollset_work(&exec_ctx, &g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), n_seconds_time(20)); - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); gpr_free(host); } @@ -176,20 +175,17 @@ int main(int argc, char **argv) { grpc_test_init(argc, argv); grpc_init(); grpc_httpcli_context_init(&g_context); - g_pollset = gpr_malloc(grpc_pollset_size()); - grpc_pollset_init(g_pollset, &g_mu); + grpc_pollset_init(&g_pollset); test_get(port); test_post(port); grpc_httpcli_context_destroy(&g_context); - grpc_closure_init(&destroyed, destroy_pollset, g_pollset); - grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); + grpc_closure_init(&destroyed, destroy_pollset, &g_pollset); + grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed); grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); - gpr_free(g_pollset); - gpr_subprocess_destroy(server); return 0; diff --git a/test/core/httpcli/httpscli_test.c b/test/core/httpcli/httpscli_test.c index 7f765bc614..9f31eae278 100644 --- a/test/core/httpcli/httpscli_test.c +++ b/test/core/httpcli/httpscli_test.c @@ -36,19 +36,18 @@ #include <string.h> #include <grpc/grpc.h> +#include "src/core/iomgr/iomgr.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/string_util.h> #include <grpc/support/subprocess.h> #include <grpc/support/sync.h> -#include "src/core/iomgr/iomgr.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" static int g_done = 0; static grpc_httpcli_context g_context; -static gpr_mu *g_mu; -static grpc_pollset *g_pollset; +static grpc_pollset g_pollset; static gpr_timespec n_seconds_time(int seconds) { return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(seconds); @@ -64,10 +63,10 @@ static void on_finish(grpc_exec_ctx *exec_ctx, void *arg, GPR_ASSERT(response->status == 200); GPR_ASSERT(response->body_length == strlen(expect)); GPR_ASSERT(0 == memcmp(expect, response->body, response->body_length)); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); g_done = 1; - grpc_pollset_kick(g_pollset, NULL); - gpr_mu_unlock(g_mu); + grpc_pollset_kick(&g_pollset, NULL); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } static void test_get(int port) { @@ -87,18 +86,18 @@ static void test_get(int port) { req.path = "/get"; req.handshaker = &grpc_httpcli_ssl; - grpc_httpcli_get(&exec_ctx, &g_context, g_pollset, &req, n_seconds_time(15), + grpc_httpcli_get(&exec_ctx, &g_context, &g_pollset, &req, n_seconds_time(15), on_finish, (void *)42); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (!g_done) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, g_pollset, &worker, + grpc_pollset_work(&exec_ctx, &g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), n_seconds_time(20)); - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); gpr_free(host); } @@ -119,18 +118,18 @@ static void test_post(int port) { req.path = "/post"; req.handshaker = &grpc_httpcli_ssl; - grpc_httpcli_post(&exec_ctx, &g_context, g_pollset, &req, "hello", 5, + grpc_httpcli_post(&exec_ctx, &g_context, &g_pollset, &req, "hello", 5, n_seconds_time(15), on_finish, (void *)42); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (!g_done) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, g_pollset, &worker, + grpc_pollset_work(&exec_ctx, &g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), n_seconds_time(20)); - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); gpr_free(host); } @@ -179,20 +178,17 @@ int main(int argc, char **argv) { grpc_test_init(argc, argv); grpc_init(); grpc_httpcli_context_init(&g_context); - g_pollset = gpr_malloc(grpc_pollset_size()); - grpc_pollset_init(g_pollset, &g_mu); + grpc_pollset_init(&g_pollset); test_get(port); test_post(port); grpc_httpcli_context_destroy(&g_context); - grpc_closure_init(&destroyed, destroy_pollset, g_pollset); - grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); + grpc_closure_init(&destroyed, destroy_pollset, &g_pollset); + grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed); grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); - gpr_free(g_pollset); - gpr_subprocess_destroy(server); return 0; diff --git a/test/core/iomgr/endpoint_pair_test.c b/test/core/iomgr/endpoint_pair_test.c index c3a91088a5..7e266ebfb9 100644 --- a/test/core/iomgr/endpoint_pair_test.c +++ b/test/core/iomgr/endpoint_pair_test.c @@ -39,11 +39,10 @@ #include <grpc/support/time.h> #include <grpc/support/useful.h> #include "src/core/iomgr/endpoint_pair.h" -#include "test/core/iomgr/endpoint_tests.h" #include "test/core/util/test_config.h" +#include "test/core/iomgr/endpoint_tests.h" -static gpr_mu *g_mu; -static grpc_pollset *g_pollset; +static grpc_pollset g_pollset; static void clean_up(void) {} @@ -55,8 +54,8 @@ static grpc_endpoint_test_fixture create_fixture_endpoint_pair( f.client_ep = p.client; f.server_ep = p.server; - grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, g_pollset); - grpc_endpoint_add_to_pollset(&exec_ctx, f.server_ep, g_pollset); + grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, &g_pollset); + grpc_endpoint_add_to_pollset(&exec_ctx, f.server_ep, &g_pollset); grpc_exec_ctx_finish(&exec_ctx); return f; @@ -75,14 +74,12 @@ int main(int argc, char **argv) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_test_init(argc, argv); grpc_init(); - g_pollset = gpr_malloc(grpc_pollset_size()); - grpc_pollset_init(g_pollset, &g_mu); - grpc_endpoint_tests(configs[0], g_pollset, g_mu); - grpc_closure_init(&destroyed, destroy_pollset, g_pollset); - grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); + grpc_pollset_init(&g_pollset); + grpc_endpoint_tests(configs[0], &g_pollset); + grpc_closure_init(&destroyed, destroy_pollset, &g_pollset); + grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed); grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); - gpr_free(g_pollset); return 0; } diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c index f689e4ba7f..6ba67df3b1 100644 --- a/test/core/iomgr/endpoint_tests.c +++ b/test/core/iomgr/endpoint_tests.c @@ -36,8 +36,8 @@ #include <sys/types.h> #include <grpc/support/alloc.h> -#include <grpc/support/log.h> #include <grpc/support/slice.h> +#include <grpc/support/log.h> #include <grpc/support/time.h> #include <grpc/support/useful.h> #include "test/core/util/test_config.h" @@ -58,7 +58,6 @@ */ -static gpr_mu *g_mu; static grpc_pollset *g_pollset; size_t count_slices(gpr_slice *slices, size_t nslices, int *current_data) { @@ -135,10 +134,10 @@ static void read_and_write_test_read_handler(grpc_exec_ctx *exec_ctx, state->incoming.slices, state->incoming.count, &state->current_read_data); if (state->bytes_read == state->target_bytes || !success) { gpr_log(GPR_INFO, "Read handler done"); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); state->read_done = 1 + success; grpc_pollset_kick(g_pollset, NULL); - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); } else if (success) { grpc_endpoint_read(exec_ctx, state->read_ep, &state->incoming, &state->done_read); @@ -170,10 +169,10 @@ static void read_and_write_test_write_handler(grpc_exec_ctx *exec_ctx, } gpr_log(GPR_INFO, "Write handler done"); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); state->write_done = 1 + success; grpc_pollset_kick(g_pollset, NULL); - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); } /* Do both reading and writing using the grpc_endpoint API. @@ -233,14 +232,14 @@ static void read_and_write_test(grpc_endpoint_test_config config, } grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); while (!state.read_done || !state.write_done) { grpc_pollset_worker *worker = NULL; GPR_ASSERT(gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0); grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), deadline); } - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); grpc_exec_ctx_finish(&exec_ctx); end_test(config); @@ -252,10 +251,9 @@ static void read_and_write_test(grpc_endpoint_test_config config, } void grpc_endpoint_tests(grpc_endpoint_test_config config, - grpc_pollset *pollset, gpr_mu *mu) { + grpc_pollset *pollset) { size_t i; g_pollset = pollset; - g_mu = mu; read_and_write_test(config, 10000000, 100000, 8192, 0); read_and_write_test(config, 1000000, 100000, 1, 0); read_and_write_test(config, 100000000, 100000, 1, 1); diff --git a/test/core/iomgr/endpoint_tests.h b/test/core/iomgr/endpoint_tests.h index 8ea47e345c..700f854891 100644 --- a/test/core/iomgr/endpoint_tests.h +++ b/test/core/iomgr/endpoint_tests.h @@ -1,6 +1,6 @@ /* * - * Copyright 2015-2016, Google Inc. + * Copyright 2015, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -53,6 +53,6 @@ struct grpc_endpoint_test_config { }; void grpc_endpoint_tests(grpc_endpoint_test_config config, - grpc_pollset *pollset, gpr_mu *mu); + grpc_pollset *pollset); #endif /* GRPC_TEST_CORE_IOMGR_ENDPOINT_TESTS_H */ diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c index 99689ebcc3..07761b6576 100644 --- a/test/core/iomgr/fd_posix_test.c +++ b/test/core/iomgr/fd_posix_test.c @@ -49,12 +49,9 @@ #include <grpc/support/log.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> - -#include "src/core/iomgr/pollset_posix.h" #include "test/core/util/test_config.h" -static gpr_mu *g_mu; -static grpc_pollset *g_pollset; +static grpc_pollset g_pollset; /* buffer size used to send and receive data. 1024 is the minimal value to set TCP send and receive buffer. */ @@ -182,10 +179,10 @@ static void listen_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg /*server */, grpc_fd_orphan(exec_ctx, sv->em_fd, NULL, NULL, "b"); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); sv->done = 1; - grpc_pollset_kick(g_pollset, NULL); - gpr_mu_unlock(g_mu); + grpc_pollset_kick(&g_pollset, NULL); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } /* Called when a new TCP connection request arrives in the listening port. */ @@ -212,7 +209,7 @@ static void listen_cb(grpc_exec_ctx *exec_ctx, void *arg, /*=sv_arg*/ se = gpr_malloc(sizeof(*se)); se->sv = sv; se->em_fd = grpc_fd_create(fd, "listener"); - grpc_pollset_add_fd(exec_ctx, g_pollset, se->em_fd); + grpc_pollset_add_fd(exec_ctx, &g_pollset, se->em_fd); se->session_read_closure.cb = session_read_cb; se->session_read_closure.cb_arg = se; grpc_fd_notify_on_read(exec_ctx, se->em_fd, &se->session_read_closure); @@ -241,7 +238,7 @@ static int server_start(grpc_exec_ctx *exec_ctx, server *sv) { GPR_ASSERT(listen(fd, MAX_NUM_FD) == 0); sv->em_fd = grpc_fd_create(fd, "server"); - grpc_pollset_add_fd(exec_ctx, g_pollset, sv->em_fd); + grpc_pollset_add_fd(exec_ctx, &g_pollset, sv->em_fd); /* Register to be interested in reading from listen_fd. */ sv->listen_closure.cb = listen_cb; sv->listen_closure.cb_arg = sv; @@ -252,18 +249,18 @@ static int server_start(grpc_exec_ctx *exec_ctx, server *sv) { /* Wait and shutdown a sever. */ static void server_wait_and_shutdown(server *sv) { - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (!sv->done) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, g_pollset, &worker, + grpc_pollset_work(&exec_ctx, &g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), gpr_inf_future(GPR_CLOCK_MONOTONIC)); - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } /* ===An upload client to test notify_on_write=== */ @@ -299,7 +296,7 @@ static void client_session_shutdown_cb(grpc_exec_ctx *exec_ctx, client *cl = arg; grpc_fd_orphan(exec_ctx, cl->em_fd, NULL, NULL, "c"); cl->done = 1; - grpc_pollset_kick(g_pollset, NULL); + grpc_pollset_kick(&g_pollset, NULL); } /* Write as much as possible, then register notify_on_write. */ @@ -310,9 +307,9 @@ static void client_session_write(grpc_exec_ctx *exec_ctx, void *arg, /*client */ ssize_t write_once = 0; if (!success) { - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); client_session_shutdown_cb(exec_ctx, arg, 1); - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); return; } @@ -322,7 +319,7 @@ static void client_session_write(grpc_exec_ctx *exec_ctx, void *arg, /*client */ } while (write_once > 0); if (errno == EAGAIN) { - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) { cl->write_closure.cb = client_session_write; cl->write_closure.cb_arg = cl; @@ -331,7 +328,7 @@ static void client_session_write(grpc_exec_ctx *exec_ctx, void *arg, /*client */ } else { client_session_shutdown_cb(exec_ctx, arg, 1); } - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } else { gpr_log(GPR_ERROR, "unknown errno %s", strerror(errno)); abort(); @@ -360,25 +357,25 @@ static void client_start(grpc_exec_ctx *exec_ctx, client *cl, int port) { } cl->em_fd = grpc_fd_create(fd, "client"); - grpc_pollset_add_fd(exec_ctx, g_pollset, cl->em_fd); + grpc_pollset_add_fd(exec_ctx, &g_pollset, cl->em_fd); client_session_write(exec_ctx, cl, 1); } /* Wait for the signal to shutdown a client. */ static void client_wait_and_shutdown(client *cl) { - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (!cl->done) { grpc_pollset_worker *worker = NULL; grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - grpc_pollset_work(&exec_ctx, g_pollset, &worker, + grpc_pollset_work(&exec_ctx, &g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), gpr_inf_future(GPR_CLOCK_MONOTONIC)); - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } /* Test grpc_fd. Start an upload server and client, upload a stream of @@ -413,20 +410,20 @@ static void first_read_callback(grpc_exec_ctx *exec_ctx, void *arg /* fd_change_data */, bool success) { fd_change_data *fdc = arg; - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); fdc->cb_that_ran = first_read_callback; - grpc_pollset_kick(g_pollset, NULL); - gpr_mu_unlock(g_mu); + grpc_pollset_kick(&g_pollset, NULL); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } static void second_read_callback(grpc_exec_ctx *exec_ctx, void *arg /* fd_change_data */, bool success) { fd_change_data *fdc = arg; - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); fdc->cb_that_ran = second_read_callback; - grpc_pollset_kick(g_pollset, NULL); - gpr_mu_unlock(g_mu); + grpc_pollset_kick(&g_pollset, NULL); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } /* Test that changing the callback we use for notify_on_read actually works. @@ -459,7 +456,7 @@ static void test_grpc_fd_change(void) { GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change"); - grpc_pollset_add_fd(&exec_ctx, g_pollset, em_fd); + grpc_pollset_add_fd(&exec_ctx, &g_pollset, em_fd); /* Register the first callback, then make its FD readable */ grpc_fd_notify_on_read(&exec_ctx, em_fd, &first_closure); @@ -468,18 +465,18 @@ static void test_grpc_fd_change(void) { GPR_ASSERT(result == 1); /* And now wait for it to run. */ - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (a.cb_that_ran == NULL) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, g_pollset, &worker, + grpc_pollset_work(&exec_ctx, &g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), gpr_inf_future(GPR_CLOCK_MONOTONIC)); - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } GPR_ASSERT(a.cb_that_ran == first_read_callback); - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); /* And drain the socket so we can generate a new read edge */ result = read(sv[0], &data, 1); @@ -492,19 +489,19 @@ static void test_grpc_fd_change(void) { result = write(sv[1], &data, 1); GPR_ASSERT(result == 1); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (b.cb_that_ran == NULL) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, g_pollset, &worker, + grpc_pollset_work(&exec_ctx, &g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), gpr_inf_future(GPR_CLOCK_MONOTONIC)); - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } /* Except now we verify that second_read_callback ran instead */ GPR_ASSERT(b.cb_that_ran == second_read_callback); - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); grpc_fd_orphan(&exec_ctx, em_fd, NULL, NULL, "d"); grpc_exec_ctx_finish(&exec_ctx); @@ -522,14 +519,12 @@ int main(int argc, char **argv) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_test_init(argc, argv); grpc_iomgr_init(); - g_pollset = gpr_malloc(grpc_pollset_size()); - grpc_pollset_init(g_pollset, &g_mu); + grpc_pollset_init(&g_pollset); test_grpc_fd(); test_grpc_fd_change(); - grpc_closure_init(&destroyed, destroy_pollset, g_pollset); - grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); + grpc_closure_init(&destroyed, destroy_pollset, &g_pollset); + grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed); grpc_exec_ctx_finish(&exec_ctx); - gpr_free(g_pollset); grpc_iomgr_shutdown(); return 0; } diff --git a/test/core/iomgr/tcp_client_posix_test.c b/test/core/iomgr/tcp_client_posix_test.c index 1e6fa5d45a..cdd5b096fb 100644 --- a/test/core/iomgr/tcp_client_posix_test.c +++ b/test/core/iomgr/tcp_client_posix_test.c @@ -40,7 +40,6 @@ #include <unistd.h> #include <grpc/grpc.h> -#include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/time.h> @@ -49,9 +48,8 @@ #include "src/core/iomgr/timer.h" #include "test/core/util/test_config.h" -static grpc_pollset_set *g_pollset_set; -static gpr_mu *g_mu; -static grpc_pollset *g_pollset; +static grpc_pollset_set g_pollset_set; +static grpc_pollset g_pollset; static int g_connections_complete = 0; static grpc_endpoint *g_connecting = NULL; @@ -60,10 +58,10 @@ static gpr_timespec test_deadline(void) { } static void finish_connection() { - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); g_connections_complete++; - grpc_pollset_kick(g_pollset, NULL); - gpr_mu_unlock(g_mu); + grpc_pollset_kick(&g_pollset, NULL); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } static void must_succeed(grpc_exec_ctx *exec_ctx, void *arg, bool success) { @@ -101,14 +99,14 @@ void test_succeeds(void) { GPR_ASSERT(0 == bind(svr_fd, (struct sockaddr *)&addr, addr_len)); GPR_ASSERT(0 == listen(svr_fd, 1)); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); connections_complete_before = g_connections_complete; - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); /* connect to it */ GPR_ASSERT(getsockname(svr_fd, (struct sockaddr *)&addr, &addr_len) == 0); grpc_closure_init(&done, must_succeed, NULL); - grpc_tcp_client_connect(&exec_ctx, &done, &g_connecting, g_pollset_set, + grpc_tcp_client_connect(&exec_ctx, &done, &g_connecting, &g_pollset_set, (struct sockaddr *)&addr, addr_len, gpr_inf_future(GPR_CLOCK_REALTIME)); @@ -120,19 +118,19 @@ void test_succeeds(void) { GPR_ASSERT(r >= 0); close(r); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (g_connections_complete == connections_complete_before) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, g_pollset, &worker, + grpc_pollset_work(&exec_ctx, &g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)); - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); grpc_exec_ctx_flush(&exec_ctx); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); grpc_exec_ctx_finish(&exec_ctx); } @@ -149,17 +147,17 @@ void test_fails(void) { memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); connections_complete_before = g_connections_complete; - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); /* connect to a broken address */ grpc_closure_init(&done, must_fail, NULL); - grpc_tcp_client_connect(&exec_ctx, &done, &g_connecting, g_pollset_set, + grpc_tcp_client_connect(&exec_ctx, &done, &g_connecting, &g_pollset_set, (struct sockaddr *)&addr, addr_len, gpr_inf_future(GPR_CLOCK_REALTIME)); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); /* wait for the connection callback to finish */ while (g_connections_complete == connections_complete_before) { @@ -167,14 +165,14 @@ void test_fails(void) { gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC); gpr_timespec polling_deadline = test_deadline(); if (!grpc_timer_check(&exec_ctx, now, &polling_deadline)) { - grpc_pollset_work(&exec_ctx, g_pollset, &worker, now, polling_deadline); + grpc_pollset_work(&exec_ctx, &g_pollset, &worker, now, polling_deadline); } - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); grpc_exec_ctx_flush(&exec_ctx); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); grpc_exec_ctx_finish(&exec_ctx); } @@ -219,16 +217,16 @@ void test_times_out(void) { connect_deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); connections_complete_before = g_connections_complete; - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); grpc_closure_init(&done, must_fail, NULL); - grpc_tcp_client_connect(&exec_ctx, &done, &g_connecting, g_pollset_set, + grpc_tcp_client_connect(&exec_ctx, &done, &g_connecting, &g_pollset_set, (struct sockaddr *)&addr, addr_len, connect_deadline); /* Make sure the event doesn't trigger early */ - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); for (;;) { grpc_pollset_worker *worker = NULL; gpr_timespec now = gpr_now(connect_deadline.clock_type); @@ -254,13 +252,13 @@ void test_times_out(void) { } gpr_timespec polling_deadline = GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10); if (!grpc_timer_check(&exec_ctx, now, &polling_deadline)) { - grpc_pollset_work(&exec_ctx, g_pollset, &worker, now, polling_deadline); + grpc_pollset_work(&exec_ctx, &g_pollset, &worker, now, polling_deadline); } - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); grpc_exec_ctx_flush(&exec_ctx); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); grpc_exec_ctx_finish(&exec_ctx); @@ -279,20 +277,18 @@ int main(int argc, char **argv) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_test_init(argc, argv); grpc_init(); - g_pollset_set = grpc_pollset_set_create(); - g_pollset = gpr_malloc(grpc_pollset_size()); - grpc_pollset_init(g_pollset, &g_mu); - grpc_pollset_set_add_pollset(&exec_ctx, g_pollset_set, g_pollset); + grpc_pollset_set_init(&g_pollset_set); + grpc_pollset_init(&g_pollset); + grpc_pollset_set_add_pollset(&exec_ctx, &g_pollset_set, &g_pollset); grpc_exec_ctx_finish(&exec_ctx); test_succeeds(); gpr_log(GPR_ERROR, "End of first test"); test_fails(); test_times_out(); - grpc_pollset_set_destroy(g_pollset_set); - grpc_closure_init(&destroyed, destroy_pollset, g_pollset); - grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); + grpc_pollset_set_destroy(&g_pollset_set); + grpc_closure_init(&destroyed, destroy_pollset, &g_pollset); + grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed); grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); - gpr_free(g_pollset); return 0; } diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c index 4351642ab6..20b0b9e13b 100644 --- a/test/core/iomgr/tcp_posix_test.c +++ b/test/core/iomgr/tcp_posix_test.c @@ -36,8 +36,8 @@ #include <errno.h> #include <fcntl.h> #include <string.h> -#include <sys/socket.h> #include <sys/types.h> +#include <sys/socket.h> #include <unistd.h> #include <grpc/grpc.h> @@ -45,11 +45,10 @@ #include <grpc/support/log.h> #include <grpc/support/time.h> #include <grpc/support/useful.h> -#include "test/core/iomgr/endpoint_tests.h" #include "test/core/util/test_config.h" +#include "test/core/iomgr/endpoint_tests.h" -static gpr_mu *g_mu; -static grpc_pollset *g_pollset; +static grpc_pollset g_pollset; /* General test notes: @@ -146,7 +145,7 @@ static void read_cb(grpc_exec_ctx *exec_ctx, void *user_data, bool success) { GPR_ASSERT(success); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); current_data = state->read_bytes % 256; read_bytes = count_slices(state->incoming.slices, state->incoming.count, ¤t_data); @@ -154,10 +153,10 @@ static void read_cb(grpc_exec_ctx *exec_ctx, void *user_data, bool success) { gpr_log(GPR_INFO, "Read %d bytes of %d", read_bytes, state->target_read_bytes); if (state->read_bytes >= state->target_read_bytes) { - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } else { grpc_endpoint_read(exec_ctx, state->ep, &state->incoming, &state->read_cb); - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } } @@ -176,7 +175,7 @@ static void read_test(size_t num_bytes, size_t slice_size) { create_sockets(sv); ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), slice_size, "test"); - grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset); + grpc_endpoint_add_to_pollset(&exec_ctx, ep, &g_pollset); written_bytes = fill_socket_partial(sv[0], num_bytes); gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes); @@ -189,17 +188,17 @@ static void read_test(size_t num_bytes, size_t slice_size) { grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (state.read_bytes < state.target_read_bytes) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, g_pollset, &worker, + grpc_pollset_work(&exec_ctx, &g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), deadline); - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } GPR_ASSERT(state.read_bytes == state.target_read_bytes); - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); gpr_slice_buffer_destroy(&state.incoming); grpc_endpoint_destroy(&exec_ctx, ep); @@ -222,7 +221,7 @@ static void large_read_test(size_t slice_size) { ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test"), slice_size, "test"); - grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset); + grpc_endpoint_add_to_pollset(&exec_ctx, ep, &g_pollset); written_bytes = fill_socket(sv[0]); gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes); @@ -235,17 +234,17 @@ static void large_read_test(size_t slice_size) { grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (state.read_bytes < state.target_read_bytes) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, g_pollset, &worker, + grpc_pollset_work(&exec_ctx, &g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), deadline); - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } GPR_ASSERT(state.read_bytes == state.target_read_bytes); - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); gpr_slice_buffer_destroy(&state.incoming); grpc_endpoint_destroy(&exec_ctx, ep); @@ -284,11 +283,11 @@ static void write_done(grpc_exec_ctx *exec_ctx, void *user_data /* write_socket_state */, bool success) { struct write_socket_state *state = (struct write_socket_state *)user_data; gpr_log(GPR_INFO, "Write done callback called"); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); gpr_log(GPR_INFO, "Signalling write done"); state->write_done = 1; - grpc_pollset_kick(g_pollset, NULL); - gpr_mu_unlock(g_mu); + grpc_pollset_kick(&g_pollset, NULL); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) { @@ -305,11 +304,11 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) { for (;;) { grpc_pollset_worker *worker = NULL; - gpr_mu_lock(g_mu); - grpc_pollset_work(&exec_ctx, g_pollset, &worker, + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); + grpc_pollset_work(&exec_ctx, &g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10)); - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); grpc_exec_ctx_finish(&exec_ctx); do { bytes_read = @@ -351,7 +350,7 @@ static void write_test(size_t num_bytes, size_t slice_size) { ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test"), GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "test"); - grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset); + grpc_endpoint_add_to_pollset(&exec_ctx, ep, &g_pollset); state.ep = ep; state.write_done = 0; @@ -364,19 +363,19 @@ static void write_test(size_t num_bytes, size_t slice_size) { grpc_endpoint_write(&exec_ctx, ep, &outgoing, &write_done_closure); drain_socket_blocking(sv[0], num_bytes, num_bytes); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); for (;;) { grpc_pollset_worker *worker = NULL; if (state.write_done) { break; } - grpc_pollset_work(&exec_ctx, g_pollset, &worker, + grpc_pollset_work(&exec_ctx, &g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), deadline); - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); gpr_slice_buffer_destroy(&outgoing); grpc_endpoint_destroy(&exec_ctx, ep); @@ -387,7 +386,7 @@ static void write_test(size_t num_bytes, size_t slice_size) { void on_fd_released(grpc_exec_ctx *exec_ctx, void *arg, bool success) { int *done = arg; *done = 1; - grpc_pollset_kick(g_pollset, NULL); + grpc_pollset_kick(&g_pollset, NULL); } /* Do a read_test, then release fd and try to read/write again. Verify that @@ -411,7 +410,7 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), slice_size, "test"); GPR_ASSERT(grpc_tcp_fd(ep) == sv[1] && sv[1] >= 0); - grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset); + grpc_endpoint_add_to_pollset(&exec_ctx, ep, &g_pollset); written_bytes = fill_socket_partial(sv[0], num_bytes); gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes); @@ -424,27 +423,27 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (state.read_bytes < state.target_read_bytes) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, g_pollset, &worker, + grpc_pollset_work(&exec_ctx, &g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), deadline); - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } GPR_ASSERT(state.read_bytes == state.target_read_bytes); - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); gpr_slice_buffer_destroy(&state.incoming); grpc_tcp_destroy_and_release_fd(&exec_ctx, ep, &fd, &fd_released_cb); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (!fd_released_done) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, g_pollset, &worker, + grpc_pollset_work(&exec_ctx, &g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), deadline); } - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); GPR_ASSERT(fd_released_done == 1); GPR_ASSERT(fd == sv[1]); grpc_exec_ctx_finish(&exec_ctx); @@ -492,8 +491,8 @@ static grpc_endpoint_test_fixture create_fixture_tcp_socketpair( slice_size, "test"); f.server_ep = grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server"), slice_size, "test"); - grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, g_pollset); - grpc_endpoint_add_to_pollset(&exec_ctx, f.server_ep, g_pollset); + grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, &g_pollset); + grpc_endpoint_add_to_pollset(&exec_ctx, f.server_ep, &g_pollset); grpc_exec_ctx_finish(&exec_ctx); @@ -513,15 +512,13 @@ int main(int argc, char **argv) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_test_init(argc, argv); grpc_init(); - g_pollset = gpr_malloc(grpc_pollset_size()); - grpc_pollset_init(g_pollset, &g_mu); + grpc_pollset_init(&g_pollset); run_tests(); - grpc_endpoint_tests(configs[0], g_pollset, g_mu); - grpc_closure_init(&destroyed, destroy_pollset, g_pollset); - grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); + grpc_endpoint_tests(configs[0], &g_pollset); + grpc_closure_init(&destroyed, destroy_pollset, &g_pollset); + grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed); grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); - gpr_free(g_pollset); return 0; } diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c index 7933468355..43180b1698 100644 --- a/test/core/iomgr/tcp_server_posix_test.c +++ b/test/core/iomgr/tcp_server_posix_test.c @@ -32,28 +32,24 @@ */ #include "src/core/iomgr/tcp_server.h" - -#include <errno.h> -#include <netinet/in.h> -#include <string.h> -#include <sys/socket.h> -#include <unistd.h> - +#include "src/core/iomgr/iomgr.h" +#include "src/core/iomgr/sockaddr_utils.h" #include <grpc/grpc.h> -#include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> - -#include "src/core/iomgr/iomgr.h" -#include "src/core/iomgr/sockaddr_utils.h" #include "test/core/util/port.h" #include "test/core/util/test_config.h" +#include <errno.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <string.h> +#include <unistd.h> + #define LOG_TEST(x) gpr_log(GPR_INFO, "%s", #x) -static gpr_mu *g_mu; -static grpc_pollset *g_pollset; +static grpc_pollset g_pollset; static int g_nconnects = 0; typedef struct on_connect_result { @@ -117,11 +113,11 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp, grpc_endpoint_shutdown(exec_ctx, tcp); grpc_endpoint_destroy(exec_ctx, tcp); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); on_connect_result_set(&g_result, acceptor); g_nconnects++; - grpc_pollset_kick(g_pollset, NULL); - gpr_mu_unlock(g_mu); + grpc_pollset_kick(&g_pollset, NULL); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } static void test_no_op(void) { @@ -178,7 +174,7 @@ static void tcp_connect(grpc_exec_ctx *exec_ctx, const struct sockaddr *remote, int clifd = socket(remote->sa_family, SOCK_STREAM, 0); int nconnects_before; - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); nconnects_before = g_nconnects; on_connect_result_init(&g_result); GPR_ASSERT(clifd >= 0); @@ -188,18 +184,18 @@ static void tcp_connect(grpc_exec_ctx *exec_ctx, const struct sockaddr *remote, while (g_nconnects == nconnects_before && gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(exec_ctx, g_pollset, &worker, + grpc_pollset_work(exec_ctx, &g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), deadline); - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); grpc_exec_ctx_finish(exec_ctx); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } gpr_log(GPR_DEBUG, "wait done"); GPR_ASSERT(g_nconnects == nconnects_before + 1); close(clifd); *result = g_result; - gpr_mu_unlock(g_mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } /* Tests a tcp server with multiple ports. TODO(daniel-j-born): Multiple fds for @@ -214,6 +210,7 @@ static void test_connect(unsigned n) { unsigned svr1_fd_count; int svr1_port; grpc_tcp_server *s = grpc_tcp_server_create(NULL); + grpc_pollset *pollsets[1]; unsigned i; server_weak_ref weak_ref; server_weak_ref_init(&weak_ref); @@ -262,7 +259,8 @@ static void test_connect(unsigned n) { } } - grpc_tcp_server_start(&exec_ctx, s, &g_pollset, 1, on_connect, NULL); + pollsets[0] = &g_pollset; + grpc_tcp_server_start(&exec_ctx, s, pollsets, 1, on_connect, NULL); for (i = 0; i < n; i++) { on_connect_result result; @@ -314,8 +312,7 @@ int main(int argc, char **argv) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_test_init(argc, argv); grpc_init(); - g_pollset = gpr_malloc(grpc_pollset_size()); - grpc_pollset_init(g_pollset, &g_mu); + grpc_pollset_init(&g_pollset); test_no_op(); test_no_op_with_start(); @@ -324,10 +321,9 @@ int main(int argc, char **argv) { test_connect(1); test_connect(10); - grpc_closure_init(&destroyed, destroy_pollset, g_pollset); - grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); + grpc_closure_init(&destroyed, destroy_pollset, &g_pollset); + grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed); grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); - gpr_free(g_pollset); return 0; } diff --git a/test/core/iomgr/workqueue_test.c b/test/core/iomgr/workqueue_test.c index 8a1faf6303..a024915256 100644 --- a/test/core/iomgr/workqueue_test.c +++ b/test/core/iomgr/workqueue_test.c @@ -34,20 +34,18 @@ #include "src/core/iomgr/workqueue.h" #include <grpc/grpc.h> -#include <grpc/support/alloc.h> #include <grpc/support/log.h> #include "test/core/util/test_config.h" -static gpr_mu *g_mu; -static grpc_pollset *g_pollset; +static grpc_pollset g_pollset; static void must_succeed(grpc_exec_ctx *exec_ctx, void *p, bool success) { GPR_ASSERT(success == 1); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); *(int *)p = 1; - grpc_pollset_kick(g_pollset, NULL); - gpr_mu_unlock(g_mu); + grpc_pollset_kick(&g_pollset, NULL); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } static void test_ref_unref(void) { @@ -69,13 +67,13 @@ static void test_add_closure(void) { grpc_closure_init(&c, must_succeed, &done); grpc_workqueue_push(wq, &c, 1); - grpc_workqueue_add_to_pollset(&exec_ctx, wq, g_pollset); + grpc_workqueue_add_to_pollset(&exec_ctx, wq, &g_pollset); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); GPR_ASSERT(!done); - grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(deadline.clock_type), - deadline); - gpr_mu_unlock(g_mu); + grpc_pollset_work(&exec_ctx, &g_pollset, &worker, + gpr_now(deadline.clock_type), deadline); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(done); @@ -94,13 +92,13 @@ static void test_flush(void) { grpc_exec_ctx_enqueue(&exec_ctx, &c, true, NULL); grpc_workqueue_flush(&exec_ctx, wq); - grpc_workqueue_add_to_pollset(&exec_ctx, wq, g_pollset); + grpc_workqueue_add_to_pollset(&exec_ctx, wq, &g_pollset); - gpr_mu_lock(g_mu); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); GPR_ASSERT(!done); - grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(deadline.clock_type), - deadline); - gpr_mu_unlock(g_mu); + grpc_pollset_work(&exec_ctx, &g_pollset, &worker, + gpr_now(deadline.clock_type), deadline); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); grpc_exec_ctx_finish(&exec_ctx); GPR_ASSERT(done); @@ -117,18 +115,15 @@ int main(int argc, char **argv) { grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_test_init(argc, argv); grpc_init(); - g_pollset = gpr_malloc(grpc_pollset_size()); - grpc_pollset_init(g_pollset, &g_mu); + grpc_pollset_init(&g_pollset); test_ref_unref(); test_add_closure(); test_flush(); - grpc_closure_init(&destroyed, destroy_pollset, g_pollset); - grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); + grpc_closure_init(&destroyed, destroy_pollset, &g_pollset); + grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed); grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); - - gpr_free(g_pollset); return 0; } diff --git a/test/core/security/oauth2_utils.c b/test/core/security/oauth2_utils.c index 9b70afffe1..4dd595df95 100644 --- a/test/core/security/oauth2_utils.c +++ b/test/core/security/oauth2_utils.c @@ -45,8 +45,7 @@ #include "src/core/security/credentials.h" typedef struct { - gpr_mu *mu; - grpc_pollset *pollset; + grpc_pollset pollset; int is_done; char *token; } oauth2_request; @@ -67,11 +66,11 @@ static void on_oauth2_response(grpc_exec_ctx *exec_ctx, void *user_data, GPR_SLICE_LENGTH(token_slice)); token[GPR_SLICE_LENGTH(token_slice)] = '\0'; } - gpr_mu_lock(request->mu); + gpr_mu_lock(GRPC_POLLSET_MU(&request->pollset)); request->is_done = 1; request->token = token; - grpc_pollset_kick(request->pollset, NULL); - gpr_mu_unlock(request->mu); + grpc_pollset_kick(&request->pollset, NULL); + gpr_mu_unlock(GRPC_POLLSET_MU(&request->pollset)); } static void do_nothing(grpc_exec_ctx *exec_ctx, void *unused, bool success) {} @@ -83,30 +82,28 @@ char *grpc_test_fetch_oauth2_token_with_credentials( grpc_closure do_nothing_closure; grpc_auth_metadata_context null_ctx = {"", "", NULL, NULL}; - request.pollset = gpr_malloc(grpc_pollset_size()); - grpc_pollset_init(request.pollset, &request.mu); + grpc_pollset_init(&request.pollset); request.is_done = 0; grpc_closure_init(&do_nothing_closure, do_nothing, NULL); - grpc_call_credentials_get_request_metadata(&exec_ctx, creds, request.pollset, + grpc_call_credentials_get_request_metadata(&exec_ctx, creds, &request.pollset, null_ctx, on_oauth2_response, &request); grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(request.mu); + gpr_mu_lock(GRPC_POLLSET_MU(&request.pollset)); while (!request.is_done) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, request.pollset, &worker, + grpc_pollset_work(&exec_ctx, &request.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), gpr_inf_future(GPR_CLOCK_MONOTONIC)); } - gpr_mu_unlock(request.mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&request.pollset)); - grpc_pollset_shutdown(&exec_ctx, request.pollset, &do_nothing_closure); + grpc_pollset_shutdown(&exec_ctx, &request.pollset, &do_nothing_closure); grpc_exec_ctx_finish(&exec_ctx); - grpc_pollset_destroy(request.pollset); - gpr_free(request.pollset); + grpc_pollset_destroy(&request.pollset); return request.token; } diff --git a/test/core/security/print_google_default_creds_token.c b/test/core/security/print_google_default_creds_token.c index 09673f362c..6043bf5420 100644 --- a/test/core/security/print_google_default_creds_token.c +++ b/test/core/security/print_google_default_creds_token.c @@ -34,6 +34,8 @@ #include <stdio.h> #include <string.h> +#include "src/core/security/credentials.h" +#include "src/core/support/string.h" #include <grpc/grpc.h> #include <grpc/grpc_security.h> #include <grpc/support/alloc.h> @@ -42,12 +44,8 @@ #include <grpc/support/slice.h> #include <grpc/support/sync.h> -#include "src/core/security/credentials.h" -#include "src/core/support/string.h" - typedef struct { - gpr_mu *mu; - grpc_pollset *pollset; + grpc_pollset pollset; int is_done; } synchronizer; @@ -64,10 +62,10 @@ static void on_metadata_response(grpc_exec_ctx *exec_ctx, void *user_data, printf("\nGot token: %s\n\n", token); gpr_free(token); } - gpr_mu_lock(sync->mu); + gpr_mu_lock(GRPC_POLLSET_MU(&sync->pollset)); sync->is_done = 1; - grpc_pollset_kick(sync->pollset, NULL); - gpr_mu_unlock(sync->mu); + grpc_pollset_kick(&sync->pollset, NULL); + gpr_mu_unlock(GRPC_POLLSET_MU(&sync->pollset)); } int main(int argc, char **argv) { @@ -93,30 +91,26 @@ int main(int argc, char **argv) { goto end; } - sync.pollset = gpr_malloc(grpc_pollset_size()); - grpc_pollset_init(sync.pollset, &sync.mu); + grpc_pollset_init(&sync.pollset); sync.is_done = 0; grpc_call_credentials_get_request_metadata( &exec_ctx, ((grpc_composite_channel_credentials *)creds)->call_creds, - sync.pollset, context, on_metadata_response, &sync); + &sync.pollset, context, on_metadata_response, &sync); - gpr_mu_lock(sync.mu); + gpr_mu_lock(GRPC_POLLSET_MU(&sync.pollset)); while (!sync.is_done) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, sync.pollset, &worker, + grpc_pollset_work(&exec_ctx, &sync.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), gpr_inf_future(GPR_CLOCK_MONOTONIC)); - gpr_mu_unlock(sync.mu); - grpc_exec_ctx_flush(&exec_ctx); - gpr_mu_lock(sync.mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&sync.pollset)); + grpc_exec_ctx_finish(&exec_ctx); + gpr_mu_lock(GRPC_POLLSET_MU(&sync.pollset)); } - gpr_mu_unlock(sync.mu); - - grpc_exec_ctx_finish(&exec_ctx); + gpr_mu_unlock(GRPC_POLLSET_MU(&sync.pollset)); grpc_channel_credentials_release(creds); - gpr_free(sync.pollset); end: gpr_cmdline_destroy(cl); diff --git a/test/core/security/secure_endpoint_test.c b/test/core/security/secure_endpoint_test.c index 0e8c38a53e..fb4bd30e2d 100644 --- a/test/core/security/secure_endpoint_test.c +++ b/test/core/security/secure_endpoint_test.c @@ -36,17 +36,16 @@ #include <fcntl.h> #include <sys/types.h> +#include "src/core/security/secure_endpoint.h" +#include "src/core/iomgr/endpoint_pair.h" +#include "src/core/iomgr/iomgr.h" #include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> -#include "src/core/iomgr/endpoint_pair.h" -#include "src/core/iomgr/iomgr.h" -#include "src/core/security/secure_endpoint.h" -#include "src/core/tsi/fake_transport_security.h" #include "test/core/util/test_config.h" +#include "src/core/tsi/fake_transport_security.h" -static gpr_mu *g_mu; -static grpc_pollset *g_pollset; +static grpc_pollset g_pollset; static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair( size_t slice_size, gpr_slice *leftover_slices, size_t leftover_nslices) { @@ -57,8 +56,8 @@ static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair( grpc_endpoint_pair tcp; tcp = grpc_iomgr_create_endpoint_pair("fixture", slice_size); - grpc_endpoint_add_to_pollset(&exec_ctx, tcp.client, g_pollset); - grpc_endpoint_add_to_pollset(&exec_ctx, tcp.server, g_pollset); + grpc_endpoint_add_to_pollset(&exec_ctx, tcp.client, &g_pollset); + grpc_endpoint_add_to_pollset(&exec_ctx, tcp.server, &g_pollset); if (leftover_nslices == 0) { f.client_ep = @@ -182,16 +181,13 @@ int main(int argc, char **argv) { grpc_test_init(argc, argv); grpc_init(); - g_pollset = gpr_malloc(grpc_pollset_size()); - grpc_pollset_init(g_pollset, &g_mu); - grpc_endpoint_tests(configs[0], g_pollset, g_mu); + grpc_pollset_init(&g_pollset); + grpc_endpoint_tests(configs[0], &g_pollset); test_leftover(configs[1], 1); - grpc_closure_init(&destroyed, destroy_pollset, g_pollset); - grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); + grpc_closure_init(&destroyed, destroy_pollset, &g_pollset); + grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed); grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); - gpr_free(g_pollset); - return 0; } diff --git a/test/core/security/verify_jwt.c b/test/core/security/verify_jwt.c index eb86589681..5070cf0492 100644 --- a/test/core/security/verify_jwt.c +++ b/test/core/security/verify_jwt.c @@ -34,6 +34,7 @@ #include <stdio.h> #include <string.h> +#include "src/core/security/jwt_verifier.h" #include <grpc/grpc.h> #include <grpc/grpc_security.h> #include <grpc/support/alloc.h> @@ -42,11 +43,8 @@ #include <grpc/support/slice.h> #include <grpc/support/sync.h> -#include "src/core/security/jwt_verifier.h" - typedef struct { - grpc_pollset *pollset; - gpr_mu *mu; + grpc_pollset pollset; int is_done; int success; } synchronizer; @@ -79,10 +77,10 @@ static void on_jwt_verification_done(void *user_data, grpc_jwt_verifier_status_to_string(status)); } - gpr_mu_lock(sync->mu); + gpr_mu_lock(GRPC_POLLSET_MU(&sync->pollset)); sync->is_done = 1; - grpc_pollset_kick(sync->pollset, NULL); - gpr_mu_unlock(sync->mu); + grpc_pollset_kick(&sync->pollset, NULL); + gpr_mu_unlock(GRPC_POLLSET_MU(&sync->pollset)); } int main(int argc, char **argv) { @@ -105,26 +103,23 @@ int main(int argc, char **argv) { grpc_init(); - sync.pollset = gpr_malloc(grpc_pollset_size()); - grpc_pollset_init(sync.pollset, &sync.mu); + grpc_pollset_init(&sync.pollset); sync.is_done = 0; - grpc_jwt_verifier_verify(&exec_ctx, verifier, sync.pollset, jwt, aud, + grpc_jwt_verifier_verify(&exec_ctx, verifier, &sync.pollset, jwt, aud, on_jwt_verification_done, &sync); - gpr_mu_lock(sync.mu); + gpr_mu_lock(GRPC_POLLSET_MU(&sync.pollset)); while (!sync.is_done) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, sync.pollset, &worker, + grpc_pollset_work(&exec_ctx, &sync.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), gpr_inf_future(GPR_CLOCK_MONOTONIC)); - gpr_mu_unlock(sync.mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&sync.pollset)); grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(sync.mu); + gpr_mu_lock(GRPC_POLLSET_MU(&sync.pollset)); } - gpr_mu_unlock(sync.mu); - - gpr_free(sync.pollset); + gpr_mu_unlock(GRPC_POLLSET_MU(&sync.pollset)); grpc_jwt_verifier_destroy(verifier); gpr_cmdline_destroy(cl); diff --git a/test/core/util/port_posix.c b/test/core/util/port_posix.c index ba382d242a..ad21fe0f53 100644 --- a/test/core/util/port_posix.c +++ b/test/core/util/port_posix.c @@ -69,8 +69,7 @@ static int has_port_been_chosen(int port) { } typedef struct freereq { - gpr_mu *mu; - grpc_pollset *pollset; + grpc_pollset pollset; int done; } freereq; @@ -83,10 +82,10 @@ static void destroy_pollset_and_shutdown(grpc_exec_ctx *exec_ctx, void *p, static void freed_port_from_server(grpc_exec_ctx *exec_ctx, void *arg, const grpc_httpcli_response *response) { freereq *pr = arg; - gpr_mu_lock(pr->mu); + gpr_mu_lock(GRPC_POLLSET_MU(&pr->pollset)); pr->done = 1; - grpc_pollset_kick(pr->pollset, NULL); - gpr_mu_unlock(pr->mu); + grpc_pollset_kick(&pr->pollset, NULL); + gpr_mu_unlock(GRPC_POLLSET_MU(&pr->pollset)); } static void free_port_using_server(char *server, int port) { @@ -101,34 +100,31 @@ static void free_port_using_server(char *server, int port) { memset(&pr, 0, sizeof(pr)); memset(&req, 0, sizeof(req)); - - pr.pollset = gpr_malloc(grpc_pollset_size()); - grpc_pollset_init(pr.pollset, &pr.mu); + grpc_pollset_init(&pr.pollset); grpc_closure_init(&shutdown_closure, destroy_pollset_and_shutdown, - pr.pollset); + &pr.pollset); req.host = server; gpr_asprintf(&path, "/drop/%d", port); req.path = path; grpc_httpcli_context_init(&context); - grpc_httpcli_get(&exec_ctx, &context, pr.pollset, &req, + grpc_httpcli_get(&exec_ctx, &context, &pr.pollset, &req, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10), freed_port_from_server, &pr); - gpr_mu_lock(pr.mu); + gpr_mu_lock(GRPC_POLLSET_MU(&pr.pollset)); while (!pr.done) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, pr.pollset, &worker, + grpc_pollset_work(&exec_ctx, &pr.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1)); } - gpr_mu_unlock(pr.mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&pr.pollset)); grpc_httpcli_context_destroy(&context); grpc_exec_ctx_finish(&exec_ctx); - grpc_pollset_shutdown(&exec_ctx, pr.pollset, &shutdown_closure); + grpc_pollset_shutdown(&exec_ctx, &pr.pollset, &shutdown_closure); grpc_exec_ctx_finish(&exec_ctx); - gpr_free(pr.pollset); gpr_free(path); } @@ -206,8 +202,7 @@ static int is_port_available(int *port, int is_tcp) { } typedef struct portreq { - gpr_mu *mu; - grpc_pollset *pollset; + grpc_pollset pollset; int port; int retries; char *server; @@ -239,7 +234,7 @@ static void got_port_from_server(grpc_exec_ctx *exec_ctx, void *arg, pr->retries++; req.host = pr->server; req.path = "/get"; - grpc_httpcli_get(exec_ctx, pr->ctx, pr->pollset, &req, + grpc_httpcli_get(exec_ctx, pr->ctx, &pr->pollset, &req, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10), got_port_from_server, pr); return; @@ -251,10 +246,10 @@ static void got_port_from_server(grpc_exec_ctx *exec_ctx, void *arg, port = port * 10 + response->body[i] - '0'; } GPR_ASSERT(port > 1024); - gpr_mu_lock(pr->mu); + gpr_mu_lock(GRPC_POLLSET_MU(&pr->pollset)); pr->port = port; - grpc_pollset_kick(pr->pollset, NULL); - gpr_mu_unlock(pr->mu); + grpc_pollset_kick(&pr->pollset, NULL); + gpr_mu_unlock(GRPC_POLLSET_MU(&pr->pollset)); } static int pick_port_using_server(char *server) { @@ -268,10 +263,9 @@ static int pick_port_using_server(char *server) { memset(&pr, 0, sizeof(pr)); memset(&req, 0, sizeof(req)); - pr.pollset = gpr_malloc(grpc_pollset_size()); - grpc_pollset_init(pr.pollset, &pr.mu); + grpc_pollset_init(&pr.pollset); grpc_closure_init(&shutdown_closure, destroy_pollset_and_shutdown, - pr.pollset); + &pr.pollset); pr.port = -1; pr.server = server; pr.ctx = &context; @@ -280,23 +274,22 @@ static int pick_port_using_server(char *server) { req.path = "/get"; grpc_httpcli_context_init(&context); - grpc_httpcli_get(&exec_ctx, &context, pr.pollset, &req, + grpc_httpcli_get(&exec_ctx, &context, &pr.pollset, &req, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10), got_port_from_server, &pr); grpc_exec_ctx_finish(&exec_ctx); - gpr_mu_lock(pr.mu); + gpr_mu_lock(GRPC_POLLSET_MU(&pr.pollset)); while (pr.port == -1) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, pr.pollset, &worker, + grpc_pollset_work(&exec_ctx, &pr.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1)); } - gpr_mu_unlock(pr.mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&pr.pollset)); grpc_httpcli_context_destroy(&context); - grpc_pollset_shutdown(&exec_ctx, pr.pollset, &shutdown_closure); + grpc_pollset_shutdown(&exec_ctx, &pr.pollset, &shutdown_closure); grpc_exec_ctx_finish(&exec_ctx); - gpr_free(pr.pollset); return pr.port; } diff --git a/test/core/util/port_windows.c b/test/core/util/port_windows.c index 3b20aeb718..b5bd0168ad 100644 --- a/test/core/util/port_windows.c +++ b/test/core/util/port_windows.c @@ -129,8 +129,7 @@ static int is_port_available(int *port, int is_tcp) { } typedef struct portreq { - grpc_pollset *pollset; - gpr_mu *mu; + grpc_pollset pollset; int port; } portreq; @@ -146,10 +145,10 @@ static void got_port_from_server(grpc_exec_ctx *exec_ctx, void *arg, port = port * 10 + response->body[i] - '0'; } GPR_ASSERT(port > 1024); - gpr_mu_lock(pr->mu); + gpr_mu_lock(GRPC_POLLSET_MU(&pr->pollset)); pr->port = port; - grpc_pollset_kick(pr->pollset, NULL); - gpr_mu_unlock(pr->mu); + grpc_pollset_kick(&pr->pollset, NULL); + gpr_mu_unlock(GRPC_POLLSET_MU(&pr->pollset)); } static void destroy_pollset_and_shutdown(grpc_exec_ctx *exec_ctx, void *p, @@ -169,34 +168,32 @@ static int pick_port_using_server(char *server) { memset(&pr, 0, sizeof(pr)); memset(&req, 0, sizeof(req)); - pr.pollset = gpr_malloc(grpc_pollset_size()); - grpc_pollset_init(pr.pollset, &pr.mu); + grpc_pollset_init(&pr.pollset); pr.port = -1; req.host = server; req.path = "/get"; grpc_httpcli_context_init(&context); - grpc_httpcli_get(&exec_ctx, &context, pr.pollset, &req, + grpc_httpcli_get(&exec_ctx, &context, &pr.pollset, &req, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10), got_port_from_server, &pr); - gpr_mu_lock(pr.mu); + gpr_mu_lock(GRPC_POLLSET_MU(&pr.pollset)); while (pr.port == -1) { grpc_pollset_worker *worker = NULL; - grpc_pollset_work(&exec_ctx, pr.pollset, &worker, + grpc_pollset_work(&exec_ctx, &pr.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1)); - gpr_mu_unlock(pr.mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&pr.pollset)); grpc_exec_ctx_flush(&exec_ctx); - gpr_mu_lock(pr.mu); + gpr_mu_lock(GRPC_POLLSET_MU(&pr.pollset)); } - gpr_mu_unlock(pr.mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&pr.pollset)); grpc_httpcli_context_destroy(&context); grpc_closure_init(&destroy_pollset_closure, destroy_pollset_and_shutdown, &pr.pollset); - grpc_pollset_shutdown(&exec_ctx, pr.pollset, &destroy_pollset_closure); - gpr_free(pr.pollset); + grpc_pollset_shutdown(&exec_ctx, &pr.pollset, &destroy_pollset_closure); grpc_exec_ctx_finish(&exec_ctx); return pr.port; diff --git a/test/core/util/test_tcp_server.c b/test/core/util/test_tcp_server.c index ab379441d8..e99d5dcffd 100644 --- a/test/core/util/test_tcp_server.c +++ b/test/core/util/test_tcp_server.c @@ -57,8 +57,8 @@ void test_tcp_server_init(test_tcp_server *server, server->tcp_server = NULL; grpc_closure_init(&server->shutdown_complete, on_server_destroyed, server); server->shutdown = 0; - server->pollset = gpr_malloc(grpc_pollset_size()); - grpc_pollset_init(server->pollset, &server->mu); + grpc_pollset_init(&server->pollset); + server->pollsets[0] = &server->pollset; server->on_connect = on_connect; server->cb_data = user_data; } @@ -77,7 +77,7 @@ void test_tcp_server_start(test_tcp_server *server, int port) { grpc_tcp_server_add_port(server->tcp_server, &addr, sizeof(addr)); GPR_ASSERT(port_added == port); - grpc_tcp_server_start(&exec_ctx, server->tcp_server, &server->pollset, 1, + grpc_tcp_server_start(&exec_ctx, server->tcp_server, server->pollsets, 1, server->on_connect, server->cb_data); gpr_log(GPR_INFO, "test tcp server listening on 0.0.0.0:%d", port); @@ -90,10 +90,10 @@ void test_tcp_server_poll(test_tcp_server *server, int seconds) { gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(seconds, GPR_TIMESPAN)); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; - gpr_mu_lock(server->mu); - grpc_pollset_work(&exec_ctx, server->pollset, &worker, + gpr_mu_lock(GRPC_POLLSET_MU(&server->pollset)); + grpc_pollset_work(&exec_ctx, &server->pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), deadline); - gpr_mu_unlock(server->mu); + gpr_mu_unlock(GRPC_POLLSET_MU(&server->pollset)); grpc_exec_ctx_finish(&exec_ctx); } @@ -111,9 +111,8 @@ void test_tcp_server_destroy(test_tcp_server *server) { gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), shutdown_deadline) < 0) { test_tcp_server_poll(server, 1); } - grpc_pollset_shutdown(&exec_ctx, server->pollset, &do_nothing_cb); + grpc_pollset_shutdown(&exec_ctx, &server->pollset, &do_nothing_cb); grpc_exec_ctx_finish(&exec_ctx); - grpc_pollset_destroy(server->pollset); - gpr_free(server->pollset); + grpc_pollset_destroy(&server->pollset); grpc_shutdown(); } diff --git a/test/core/util/test_tcp_server.h b/test/core/util/test_tcp_server.h index 15fcb4fb87..51119cf6c8 100644 --- a/test/core/util/test_tcp_server.h +++ b/test/core/util/test_tcp_server.h @@ -41,8 +41,8 @@ typedef struct test_tcp_server { grpc_tcp_server *tcp_server; grpc_closure shutdown_complete; int shutdown; - gpr_mu *mu; - grpc_pollset *pollset; + grpc_pollset pollset; + grpc_pollset *pollsets[1]; grpc_tcp_server_cb on_connect; void *cb_data; } test_tcp_server; |