aboutsummaryrefslogtreecommitdiffhomepage
path: root/test
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-09-21 14:39:57 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-09-21 14:39:57 -0700
commitdfff1b8126a1f83833fd99626517f28d1e68453a (patch)
tree40a9aa8126c08a11fb1a5cdd4058f504e05dca43 /test
parent3ffd8220a17fd2fdf64adc66b03e4e254880471b (diff)
Call list progress
Diffstat (limited to 'test')
-rw-r--r--test/core/bad_client/bad_client.c14
-rw-r--r--test/core/channel/channel_stack_test.c32
-rw-r--r--test/core/end2end/fixtures/h2_sockpair+trace.c33
-rw-r--r--test/core/end2end/fixtures/h2_sockpair.c33
-rw-r--r--test/core/end2end/fixtures/h2_sockpair_1byte.c33
-rw-r--r--test/core/httpcli/httpcli_test.c29
-rw-r--r--test/core/iomgr/alarm_list_test.c63
-rw-r--r--test/core/iomgr/alarm_test.c10
-rw-r--r--test/core/iomgr/endpoint_pair_test.c22
-rw-r--r--test/core/iomgr/endpoint_tests.c3
-rw-r--r--test/core/iomgr/fd_conservation_posix_test.c11
-rw-r--r--test/core/iomgr/fd_posix_test.c121
-rw-r--r--test/core/iomgr/resolve_address_test.c6
-rw-r--r--test/core/iomgr/tcp_client_posix_test.c69
-rw-r--r--test/core/security/credentials_test.c118
-rw-r--r--test/core/security/jwt_verifier_test.c60
-rw-r--r--test/core/security/oauth2_utils.c2
-rw-r--r--test/core/security/secure_endpoint_test.c44
-rw-r--r--test/core/surface/completion_queue_test.c21
-rw-r--r--test/core/util/port_posix.c4
-rw-r--r--test/core/util/reconnect_server.c4
21 files changed, 471 insertions, 261 deletions
diff --git a/test/core/bad_client/bad_client.c b/test/core/bad_client/bad_client.c
index e89d5b3d39..fd37fce976 100644
--- a/test/core/bad_client/bad_client.c
+++ b/test/core/bad_client/bad_client.c
@@ -70,9 +70,11 @@ static void server_setup_transport(void *ts, grpc_transport *transport,
thd_args *a = ts;
static grpc_channel_filter const *extra_filters[] = {
&grpc_http_server_filter};
- grpc_server_setup_transport(a->server, transport, extra_filters,
- GPR_ARRAY_SIZE(extra_filters), mdctx, workqueue,
- grpc_server_get_channel_args(a->server));
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
+ grpc_server_setup_transport(
+ a->server, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters), mdctx,
+ workqueue, grpc_server_get_channel_args(a->server), &call_list);
+ grpc_call_list_run(&call_list);
}
void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator,
@@ -116,9 +118,11 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator,
a.validator = validator;
grpc_server_register_completion_queue(a.server, a.cq, NULL);
grpc_server_start(a.server);
- transport = grpc_create_chttp2_transport(NULL, sfd.server, mdctx, 0);
+ transport =
+ grpc_create_chttp2_transport(NULL, sfd.server, mdctx, 0, &call_list);
server_setup_transport(&a, transport, mdctx, workqueue);
- grpc_chttp2_transport_start_reading(transport, NULL, 0);
+ grpc_chttp2_transport_start_reading(transport, NULL, 0, &call_list);
+ grpc_call_list_run(&call_list);
/* Bind everything into the same pollset */
grpc_endpoint_add_to_pollset(sfd.client, grpc_cq_pollset(a.cq), &call_list);
diff --git a/test/core/channel/channel_stack_test.c b/test/core/channel/channel_stack_test.c
index 60129afc43..07df921270 100644
--- a/test/core/channel/channel_stack_test.c
+++ b/test/core/channel/channel_stack_test.c
@@ -44,7 +44,7 @@
static void channel_init_func(grpc_channel_element *elem, grpc_channel *master,
const grpc_channel_args *args,
grpc_mdctx *metadata_context, int is_first,
- int is_last) {
+ int is_last, grpc_call_list *call_list) {
GPR_ASSERT(args->num_args == 1);
GPR_ASSERT(args->args[0].type == GRPC_ARG_INTEGER);
GPR_ASSERT(0 == strcmp(args->args[0].key, "test_key"));
@@ -56,26 +56,33 @@ static void channel_init_func(grpc_channel_element *elem, grpc_channel *master,
static void call_init_func(grpc_call_element *elem,
const void *server_transport_data,
- grpc_transport_stream_op *initial_op) {
+ grpc_transport_stream_op *initial_op,
+ grpc_call_list *call_list) {
++*(int *)(elem->channel_data);
*(int *)(elem->call_data) = 0;
}
-static void channel_destroy_func(grpc_channel_element *elem) {}
+static void channel_destroy_func(grpc_channel_element *elem,
+ grpc_call_list *call_list) {}
-static void call_destroy_func(grpc_call_element *elem) {
+static void call_destroy_func(grpc_call_element *elem,
+ grpc_call_list *call_list) {
++*(int *)(elem->channel_data);
}
-static void call_func(grpc_call_element *elem, grpc_transport_stream_op *op) {
+static void call_func(grpc_call_element *elem, grpc_transport_stream_op *op,
+ grpc_call_list *call_list) {
++*(int *)(elem->call_data);
}
-static void channel_func(grpc_channel_element *elem, grpc_transport_op *op) {
+static void channel_func(grpc_channel_element *elem, grpc_transport_op *op,
+ grpc_call_list *call_list) {
++*(int *)(elem->channel_data);
}
-static char *get_peer(grpc_call_element *elem) { return gpr_strdup("peer"); }
+static char *get_peer(grpc_call_element *elem, grpc_call_list *call_list) {
+ return gpr_strdup("peer");
+}
static void test_create_channel_stack(void) {
const grpc_channel_filter filter = {call_func, channel_func,
@@ -93,6 +100,7 @@ static void test_create_channel_stack(void) {
grpc_mdctx *metadata_context;
int *channel_data;
int *call_data;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
metadata_context = grpc_mdctx_create();
@@ -105,14 +113,14 @@ static void test_create_channel_stack(void) {
channel_stack = gpr_malloc(grpc_channel_stack_size(&filters, 1));
grpc_channel_stack_init(&filters, 1, NULL, &chan_args, metadata_context,
- channel_stack);
+ channel_stack, &call_list);
GPR_ASSERT(channel_stack->count == 1);
channel_elem = grpc_channel_stack_element(channel_stack, 0);
channel_data = (int *)channel_elem->channel_data;
GPR_ASSERT(*channel_data == 0);
call_stack = gpr_malloc(channel_stack->call_stack_size);
- grpc_call_stack_init(channel_stack, NULL, NULL, call_stack);
+ grpc_call_stack_init(channel_stack, NULL, NULL, call_stack, &call_list);
GPR_ASSERT(call_stack->count == 1);
call_elem = grpc_call_stack_element(call_stack, 0);
GPR_ASSERT(call_elem->filter == channel_elem->filter);
@@ -121,14 +129,16 @@ static void test_create_channel_stack(void) {
GPR_ASSERT(*call_data == 0);
GPR_ASSERT(*channel_data == 1);
- grpc_call_stack_destroy(call_stack);
+ grpc_call_stack_destroy(call_stack, &call_list);
gpr_free(call_stack);
GPR_ASSERT(*channel_data == 2);
- grpc_channel_stack_destroy(channel_stack);
+ grpc_channel_stack_destroy(channel_stack, &call_list);
gpr_free(channel_stack);
grpc_mdctx_unref(metadata_context);
+
+ GPR_ASSERT(grpc_call_list_empty(call_list));
}
int main(int argc, char **argv) {
diff --git a/test/core/end2end/fixtures/h2_sockpair+trace.c b/test/core/end2end/fixtures/h2_sockpair+trace.c
index 04dbf48522..be8132bc96 100644
--- a/test/core/end2end/fixtures/h2_sockpair+trace.c
+++ b/test/core/end2end/fixtures/h2_sockpair+trace.c
@@ -64,9 +64,11 @@ static void server_setup_transport(void *ts, grpc_transport *transport,
grpc_end2end_test_fixture *f = ts;
static grpc_channel_filter const *extra_filters[] = {
&grpc_http_server_filter};
- grpc_server_setup_transport(f->server, transport, extra_filters,
- GPR_ARRAY_SIZE(extra_filters), mdctx, g_workqueue,
- grpc_server_get_channel_args(f->server));
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
+ grpc_server_setup_transport(
+ f->server, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters), mdctx,
+ g_workqueue, grpc_server_get_channel_args(f->server), &call_list);
+ grpc_call_list_run(&call_list);
}
typedef struct {
@@ -75,16 +77,17 @@ typedef struct {
} sp_client_setup;
static void client_setup_transport(void *ts, grpc_transport *transport,
- grpc_mdctx *mdctx) {
+ grpc_mdctx *mdctx,
+ grpc_call_list *call_list) {
sp_client_setup *cs = ts;
const grpc_channel_filter *filters[] = {&grpc_http_client_filter,
&grpc_compress_filter,
&grpc_connected_channel_filter};
size_t nfilters = sizeof(filters) / sizeof(*filters);
- grpc_channel *channel =
- grpc_channel_create_from_filters("socketpair-target", filters, nfilters,
- cs->client_args, mdctx, g_workqueue, 1);
+ grpc_channel *channel = grpc_channel_create_from_filters(
+ "socketpair-target", filters, nfilters, cs->client_args, mdctx,
+ g_workqueue, 1, call_list);
cs->f->client = channel;
@@ -108,20 +111,24 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair(
static void chttp2_init_client_socketpair(grpc_end2end_test_fixture *f,
grpc_channel_args *client_args) {
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_endpoint_pair *sfd = f->fixture_data;
grpc_transport *transport;
grpc_mdctx *mdctx = grpc_mdctx_create();
sp_client_setup cs;
cs.client_args = client_args;
cs.f = f;
- transport = grpc_create_chttp2_transport(client_args, sfd->client, mdctx, 1);
- client_setup_transport(&cs, transport, mdctx);
+ transport = grpc_create_chttp2_transport(client_args, sfd->client, mdctx, 1,
+ &call_list);
+ client_setup_transport(&cs, transport, mdctx, &call_list);
GPR_ASSERT(f->client);
- grpc_chttp2_transport_start_reading(transport, NULL, 0);
+ grpc_chttp2_transport_start_reading(transport, NULL, 0, &call_list);
+ grpc_call_list_run(&call_list);
}
static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f,
grpc_channel_args *server_args) {
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_endpoint_pair *sfd = f->fixture_data;
grpc_mdctx *mdctx = grpc_mdctx_create();
grpc_transport *transport;
@@ -129,9 +136,11 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f,
f->server = grpc_server_create_from_filters(NULL, 0, server_args);
grpc_server_register_completion_queue(f->server, f->cq, NULL);
grpc_server_start(f->server);
- transport = grpc_create_chttp2_transport(server_args, sfd->server, mdctx, 0);
+ transport = grpc_create_chttp2_transport(server_args, sfd->server, mdctx, 0,
+ &call_list);
server_setup_transport(f, transport, mdctx);
- grpc_chttp2_transport_start_reading(transport, NULL, 0);
+ grpc_chttp2_transport_start_reading(transport, NULL, 0, &call_list);
+ grpc_call_list_run(&call_list);
}
static void chttp2_tear_down_socketpair(grpc_end2end_test_fixture *f) {
diff --git a/test/core/end2end/fixtures/h2_sockpair.c b/test/core/end2end/fixtures/h2_sockpair.c
index b661dbfd60..08f918cbad 100644
--- a/test/core/end2end/fixtures/h2_sockpair.c
+++ b/test/core/end2end/fixtures/h2_sockpair.c
@@ -63,9 +63,11 @@ static void server_setup_transport(void *ts, grpc_transport *transport,
grpc_end2end_test_fixture *f = ts;
static grpc_channel_filter const *extra_filters[] = {
&grpc_http_server_filter};
- grpc_server_setup_transport(f->server, transport, extra_filters,
- GPR_ARRAY_SIZE(extra_filters), mdctx, g_workqueue,
- grpc_server_get_channel_args(f->server));
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
+ grpc_server_setup_transport(
+ f->server, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters), mdctx,
+ g_workqueue, grpc_server_get_channel_args(f->server), &call_list);
+ grpc_call_list_run(&call_list);
}
typedef struct {
@@ -74,16 +76,17 @@ typedef struct {
} sp_client_setup;
static void client_setup_transport(void *ts, grpc_transport *transport,
- grpc_mdctx *mdctx) {
+ grpc_mdctx *mdctx,
+ grpc_call_list *call_list) {
sp_client_setup *cs = ts;
const grpc_channel_filter *filters[] = {&grpc_http_client_filter,
&grpc_compress_filter,
&grpc_connected_channel_filter};
size_t nfilters = sizeof(filters) / sizeof(*filters);
- grpc_channel *channel =
- grpc_channel_create_from_filters("socketpair-target", filters, nfilters,
- cs->client_args, mdctx, g_workqueue, 1);
+ grpc_channel *channel = grpc_channel_create_from_filters(
+ "socketpair-target", filters, nfilters, cs->client_args, mdctx,
+ g_workqueue, 1, call_list);
cs->f->client = channel;
@@ -107,20 +110,24 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair(
static void chttp2_init_client_socketpair(grpc_end2end_test_fixture *f,
grpc_channel_args *client_args) {
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_endpoint_pair *sfd = f->fixture_data;
grpc_transport *transport;
grpc_mdctx *mdctx = grpc_mdctx_create();
sp_client_setup cs;
cs.client_args = client_args;
cs.f = f;
- transport = grpc_create_chttp2_transport(client_args, sfd->client, mdctx, 1);
- client_setup_transport(&cs, transport, mdctx);
+ transport = grpc_create_chttp2_transport(client_args, sfd->client, mdctx, 1,
+ &call_list);
+ client_setup_transport(&cs, transport, mdctx, &call_list);
GPR_ASSERT(f->client);
- grpc_chttp2_transport_start_reading(transport, NULL, 0);
+ grpc_chttp2_transport_start_reading(transport, NULL, 0, &call_list);
+ grpc_call_list_run(&call_list);
}
static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f,
grpc_channel_args *server_args) {
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_endpoint_pair *sfd = f->fixture_data;
grpc_mdctx *mdctx = grpc_mdctx_create();
grpc_transport *transport;
@@ -128,9 +135,11 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f,
f->server = grpc_server_create_from_filters(NULL, 0, server_args);
grpc_server_register_completion_queue(f->server, f->cq, NULL);
grpc_server_start(f->server);
- transport = grpc_create_chttp2_transport(server_args, sfd->server, mdctx, 0);
+ transport = grpc_create_chttp2_transport(server_args, sfd->server, mdctx, 0,
+ &call_list);
server_setup_transport(f, transport, mdctx);
- grpc_chttp2_transport_start_reading(transport, NULL, 0);
+ grpc_chttp2_transport_start_reading(transport, NULL, 0, &call_list);
+ grpc_call_list_run(&call_list);
}
static void chttp2_tear_down_socketpair(grpc_end2end_test_fixture *f) {
diff --git a/test/core/end2end/fixtures/h2_sockpair_1byte.c b/test/core/end2end/fixtures/h2_sockpair_1byte.c
index 6cf656bcde..46ffaff745 100644
--- a/test/core/end2end/fixtures/h2_sockpair_1byte.c
+++ b/test/core/end2end/fixtures/h2_sockpair_1byte.c
@@ -63,9 +63,11 @@ static void server_setup_transport(void *ts, grpc_transport *transport,
grpc_end2end_test_fixture *f = ts;
static grpc_channel_filter const *extra_filters[] = {
&grpc_http_server_filter};
- grpc_server_setup_transport(f->server, transport, extra_filters,
- GPR_ARRAY_SIZE(extra_filters), mdctx, g_workqueue,
- grpc_server_get_channel_args(f->server));
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
+ grpc_server_setup_transport(
+ f->server, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters), mdctx,
+ g_workqueue, grpc_server_get_channel_args(f->server), &call_list);
+ grpc_call_list_run(&call_list);
}
typedef struct {
@@ -74,16 +76,17 @@ typedef struct {
} sp_client_setup;
static void client_setup_transport(void *ts, grpc_transport *transport,
- grpc_mdctx *mdctx) {
+ grpc_mdctx *mdctx,
+ grpc_call_list *call_list) {
sp_client_setup *cs = ts;
const grpc_channel_filter *filters[] = {&grpc_http_client_filter,
&grpc_compress_filter,
&grpc_connected_channel_filter};
size_t nfilters = sizeof(filters) / sizeof(*filters);
- grpc_channel *channel =
- grpc_channel_create_from_filters("socketpair-target", filters, nfilters,
- cs->client_args, mdctx, g_workqueue, 1);
+ grpc_channel *channel = grpc_channel_create_from_filters(
+ "socketpair-target", filters, nfilters, cs->client_args, mdctx,
+ g_workqueue, 1, call_list);
cs->f->client = channel;
@@ -107,20 +110,24 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair(
static void chttp2_init_client_socketpair(grpc_end2end_test_fixture *f,
grpc_channel_args *client_args) {
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_endpoint_pair *sfd = f->fixture_data;
grpc_transport *transport;
grpc_mdctx *mdctx = grpc_mdctx_create();
sp_client_setup cs;
cs.client_args = client_args;
cs.f = f;
- transport = grpc_create_chttp2_transport(client_args, sfd->client, mdctx, 1);
- client_setup_transport(&cs, transport, mdctx);
+ transport = grpc_create_chttp2_transport(client_args, sfd->client, mdctx, 1,
+ &call_list);
+ client_setup_transport(&cs, transport, mdctx, &call_list);
GPR_ASSERT(f->client);
- grpc_chttp2_transport_start_reading(transport, NULL, 0);
+ grpc_chttp2_transport_start_reading(transport, NULL, 0, &call_list);
+ grpc_call_list_run(&call_list);
}
static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f,
grpc_channel_args *server_args) {
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_endpoint_pair *sfd = f->fixture_data;
grpc_mdctx *mdctx = grpc_mdctx_create();
grpc_transport *transport;
@@ -128,9 +135,11 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f,
f->server = grpc_server_create_from_filters(NULL, 0, server_args);
grpc_server_register_completion_queue(f->server, f->cq, NULL);
grpc_server_start(f->server);
- transport = grpc_create_chttp2_transport(server_args, sfd->server, mdctx, 0);
+ transport = grpc_create_chttp2_transport(server_args, sfd->server, mdctx, 0,
+ &call_list);
server_setup_transport(f, transport, mdctx);
- grpc_chttp2_transport_start_reading(transport, NULL, 0);
+ grpc_chttp2_transport_start_reading(transport, NULL, 0, &call_list);
+ grpc_call_list_run(&call_list);
}
static void chttp2_tear_down_socketpair(grpc_end2end_test_fixture *f) {
diff --git a/test/core/httpcli/httpcli_test.c b/test/core/httpcli/httpcli_test.c
index cf2b10c021..bcb544e502 100644
--- a/test/core/httpcli/httpcli_test.c
+++ b/test/core/httpcli/httpcli_test.c
@@ -53,7 +53,8 @@ static gpr_timespec n_seconds_time(int seconds) {
return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(seconds);
}
-static void on_finish(void *arg, const grpc_httpcli_response *response) {
+static void on_finish(void *arg, const grpc_httpcli_response *response,
+ grpc_call_list *call_list) {
const char *expect =
"<html><head><title>Hello world!</title></head>"
"<body><p>This is a test</p></body></html>";
@@ -71,6 +72,7 @@ static void on_finish(void *arg, const grpc_httpcli_response *response) {
static void test_get(int use_ssl, int port) {
grpc_httpcli_request req;
char *host;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
g_done = 0;
gpr_log(GPR_INFO, "running %s with use_ssl=%d.", "test_get", use_ssl);
@@ -84,12 +86,15 @@ static void test_get(int use_ssl, int port) {
req.handshaker = use_ssl ? &grpc_httpcli_ssl : &grpc_httpcli_plaintext;
grpc_httpcli_get(&g_context, &g_pollset, &req, n_seconds_time(15), on_finish,
- (void *)42);
+ (void *)42, &call_list);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (!g_done) {
grpc_pollset_worker worker;
grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
- n_seconds_time(20));
+ n_seconds_time(20), &call_list);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_call_list_run(&call_list);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
gpr_free(host);
@@ -98,6 +103,7 @@ static void test_get(int use_ssl, int port) {
static void test_post(int use_ssl, int port) {
grpc_httpcli_request req;
char *host;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
g_done = 0;
gpr_log(GPR_INFO, "running %s with use_ssl=%d.", "test_post", (int)use_ssl);
@@ -111,20 +117,27 @@ static void test_post(int use_ssl, int port) {
req.handshaker = use_ssl ? &grpc_httpcli_ssl : &grpc_httpcli_plaintext;
grpc_httpcli_post(&g_context, &g_pollset, &req, "hello", 5,
- n_seconds_time(15), on_finish, (void *)42);
+ n_seconds_time(15), on_finish, (void *)42, &call_list);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (!g_done) {
grpc_pollset_worker worker;
grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
- n_seconds_time(20));
+ n_seconds_time(20), &call_list);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_call_list_run(&call_list);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
gpr_free(host);
}
-static void destroy_pollset(void *ignored) { grpc_pollset_destroy(&g_pollset); }
+static void destroy_pollset(void *p, int success, grpc_call_list *call_list) {
+ grpc_pollset_destroy(p);
+}
int main(int argc, char **argv) {
+ grpc_closure destroyed;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
gpr_subprocess *server;
char *me = argv[0];
char *lslash = strrchr(me, '/');
@@ -161,7 +174,9 @@ int main(int argc, char **argv) {
test_post(0, port);
grpc_httpcli_context_destroy(&g_context);
- grpc_pollset_shutdown(&g_pollset, destroy_pollset, NULL);
+ grpc_closure_init(&destroyed, destroy_pollset, &g_pollset);
+ grpc_pollset_shutdown(&g_pollset, &destroyed, &call_list);
+ grpc_call_list_run(&call_list);
grpc_shutdown();
gpr_subprocess_destroy(server);
diff --git a/test/core/iomgr/alarm_list_test.c b/test/core/iomgr/alarm_list_test.c
index 56d662e61a..8082532434 100644
--- a/test/core/iomgr/alarm_list_test.c
+++ b/test/core/iomgr/alarm_list_test.c
@@ -42,11 +42,8 @@
#define MAX_CB 30
static int cb_called[MAX_CB][2];
-static int kicks;
-void grpc_kick_poller(void) { ++kicks; }
-
-static void cb(void *arg, int success) {
+static void cb(void *arg, int success, grpc_call_list *call_list) {
cb_called[(gpr_intptr)arg][success]++;
}
@@ -54,6 +51,7 @@ static void add_test(void) {
gpr_timespec start = gpr_now(GPR_CLOCK_REALTIME);
int i;
grpc_alarm alarms[20];
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_alarm_list_init(start);
memset(cb_called, 0, sizeof(cb_called));
@@ -62,55 +60,56 @@ static void add_test(void) {
for (i = 0; i < 10; i++) {
grpc_alarm_init(&alarms[i],
gpr_time_add(start, gpr_time_from_millis(10, GPR_TIMESPAN)),
- cb, (void *)(gpr_intptr)i, start);
+ cb, (void *)(gpr_intptr)i, start, &call_list);
}
/* 1010 ms alarms. will expire in the next epoch */
for (i = 10; i < 20; i++) {
grpc_alarm_init(&alarms[i], gpr_time_add(start, gpr_time_from_millis(
1010, GPR_TIMESPAN)),
- cb, (void *)(gpr_intptr)i, start);
+ cb, (void *)(gpr_intptr)i, start, &call_list);
}
/* collect alarms. Only the first batch should be ready. */
- GPR_ASSERT(10 == grpc_alarm_check(NULL,
- gpr_time_add(start, gpr_time_from_millis(
+ GPR_ASSERT(10 == grpc_alarm_check(gpr_time_add(start, gpr_time_from_millis(
500, GPR_TIMESPAN)),
- NULL));
+ NULL, &call_list));
+ grpc_call_list_run(&call_list);
for (i = 0; i < 20; i++) {
GPR_ASSERT(cb_called[i][1] == (i < 10));
GPR_ASSERT(cb_called[i][0] == 0);
}
- GPR_ASSERT(0 == grpc_alarm_check(
- NULL, gpr_time_add(
- start, gpr_time_from_millis(600, GPR_TIMESPAN)),
- NULL));
+ GPR_ASSERT(0 == grpc_alarm_check(gpr_time_add(start, gpr_time_from_millis(
+ 600, GPR_TIMESPAN)),
+ NULL, &call_list));
+ grpc_call_list_run(&call_list);
for (i = 0; i < 30; i++) {
GPR_ASSERT(cb_called[i][1] == (i < 10));
GPR_ASSERT(cb_called[i][0] == 0);
}
/* collect the rest of the alarms */
- GPR_ASSERT(
- 10 == grpc_alarm_check(NULL, gpr_time_add(start, gpr_time_from_millis(
- 1500, GPR_TIMESPAN)),
- NULL));
+ GPR_ASSERT(10 ==
+ grpc_alarm_check(
+ gpr_time_add(start, gpr_time_from_millis(1500, GPR_TIMESPAN)),
+ NULL, &call_list));
+ grpc_call_list_run(&call_list);
for (i = 0; i < 30; i++) {
GPR_ASSERT(cb_called[i][1] == (i < 20));
GPR_ASSERT(cb_called[i][0] == 0);
}
- GPR_ASSERT(0 == grpc_alarm_check(NULL,
- gpr_time_add(start, gpr_time_from_millis(
+ GPR_ASSERT(0 == grpc_alarm_check(gpr_time_add(start, gpr_time_from_millis(
1600, GPR_TIMESPAN)),
- NULL));
+ NULL, &call_list));
for (i = 0; i < 30; i++) {
GPR_ASSERT(cb_called[i][1] == (i < 20));
GPR_ASSERT(cb_called[i][0] == 0);
}
- grpc_alarm_list_shutdown();
+ grpc_alarm_list_shutdown(&call_list);
+ grpc_call_list_run(&call_list);
}
static gpr_timespec tfm(int m) {
@@ -122,28 +121,32 @@ static gpr_timespec tfm(int m) {
/* Cleaning up a list with pending alarms. */
void destruction_test(void) {
grpc_alarm alarms[5];
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_alarm_list_init(gpr_time_0(GPR_CLOCK_REALTIME));
memset(cb_called, 0, sizeof(cb_called));
grpc_alarm_init(&alarms[0], tfm(100), cb, (void *)(gpr_intptr)0,
- gpr_time_0(GPR_CLOCK_REALTIME));
+ gpr_time_0(GPR_CLOCK_REALTIME), &call_list);
grpc_alarm_init(&alarms[1], tfm(3), cb, (void *)(gpr_intptr)1,
- gpr_time_0(GPR_CLOCK_REALTIME));
+ gpr_time_0(GPR_CLOCK_REALTIME), &call_list);
grpc_alarm_init(&alarms[2], tfm(100), cb, (void *)(gpr_intptr)2,
- gpr_time_0(GPR_CLOCK_REALTIME));
+ gpr_time_0(GPR_CLOCK_REALTIME), &call_list);
grpc_alarm_init(&alarms[3], tfm(3), cb, (void *)(gpr_intptr)3,
- gpr_time_0(GPR_CLOCK_REALTIME));
+ gpr_time_0(GPR_CLOCK_REALTIME), &call_list);
grpc_alarm_init(&alarms[4], tfm(1), cb, (void *)(gpr_intptr)4,
- gpr_time_0(GPR_CLOCK_REALTIME));
- GPR_ASSERT(1 == grpc_alarm_check(NULL, tfm(2), NULL));
+ gpr_time_0(GPR_CLOCK_REALTIME), &call_list);
+ GPR_ASSERT(1 == grpc_alarm_check(tfm(2), NULL, &call_list));
+ grpc_call_list_run(&call_list);
GPR_ASSERT(1 == cb_called[4][1]);
- grpc_alarm_cancel(&alarms[0]);
- grpc_alarm_cancel(&alarms[3]);
+ grpc_alarm_cancel(&alarms[0], &call_list);
+ grpc_alarm_cancel(&alarms[3], &call_list);
+ grpc_call_list_run(&call_list);
GPR_ASSERT(1 == cb_called[0][0]);
GPR_ASSERT(1 == cb_called[3][0]);
- grpc_alarm_list_shutdown();
+ grpc_alarm_list_shutdown(&call_list);
+ grpc_call_list_run(&call_list);
GPR_ASSERT(1 == cb_called[1][0]);
GPR_ASSERT(1 == cb_called[2][0]);
}
diff --git a/test/core/iomgr/alarm_test.c b/test/core/iomgr/alarm_test.c
index eafac0dc46..bfced83409 100644
--- a/test/core/iomgr/alarm_test.c
+++ b/test/core/iomgr/alarm_test.c
@@ -65,7 +65,8 @@ typedef struct {
} alarm_arg;
/* Called when an alarm expires. */
-static void alarm_cb(void *arg /* alarm_arg */, int success) {
+static void alarm_cb(void *arg /* alarm_arg */, int success,
+ grpc_call_list *call_list) {
alarm_arg *a = arg;
gpr_mu_lock(&a->mu);
if (success) {
@@ -90,6 +91,7 @@ static void test_grpc_alarm(void) {
*/
gpr_timespec alarm_deadline;
gpr_timespec followup_deadline;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
alarm_arg arg;
alarm_arg arg2;
@@ -107,7 +109,7 @@ static void test_grpc_alarm(void) {
gpr_event_init(&arg.fcb_arg);
grpc_alarm_init(&alarm, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(100), alarm_cb, &arg,
- gpr_now(GPR_CLOCK_MONOTONIC));
+ gpr_now(GPR_CLOCK_MONOTONIC), &call_list);
alarm_deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1);
gpr_mu_lock(&arg.mu);
@@ -157,8 +159,8 @@ static void test_grpc_alarm(void) {
gpr_event_init(&arg2.fcb_arg);
grpc_alarm_init(&alarm_to_cancel, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(100),
- alarm_cb, &arg2, gpr_now(GPR_CLOCK_MONOTONIC));
- grpc_alarm_cancel(&alarm_to_cancel);
+ alarm_cb, &arg2, gpr_now(GPR_CLOCK_MONOTONIC), &call_list);
+ grpc_alarm_cancel(&alarm_to_cancel, &call_list);
alarm_deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1);
gpr_mu_lock(&arg2.mu);
diff --git a/test/core/iomgr/endpoint_pair_test.c b/test/core/iomgr/endpoint_pair_test.c
index 9fe54771bd..491effd5ae 100644
--- a/test/core/iomgr/endpoint_pair_test.c
+++ b/test/core/iomgr/endpoint_pair_test.c
@@ -43,20 +43,20 @@
#include "test/core/iomgr/endpoint_tests.h"
static grpc_pollset g_pollset;
-static grpc_workqueue *g_workqueue;
static void clean_up(void) {}
static grpc_endpoint_test_fixture create_fixture_endpoint_pair(
size_t slice_size) {
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_endpoint_test_fixture f;
- grpc_endpoint_pair p =
- grpc_iomgr_create_endpoint_pair("test", slice_size, g_workqueue);
+ grpc_endpoint_pair p = grpc_iomgr_create_endpoint_pair("test", slice_size);
f.client_ep = p.client;
f.server_ep = p.server;
- grpc_endpoint_add_to_pollset(f.client_ep, &g_pollset);
- grpc_endpoint_add_to_pollset(f.server_ep, &g_pollset);
+ grpc_endpoint_add_to_pollset(f.client_ep, &g_pollset, &call_list);
+ grpc_endpoint_add_to_pollset(f.server_ep, &g_pollset, &call_list);
+ grpc_call_list_run(&call_list);
return f;
}
@@ -65,16 +65,20 @@ static grpc_endpoint_test_config configs[] = {
{"tcp/tcp_socketpair", create_fixture_endpoint_pair, clean_up},
};
-static void destroy_pollset(void *p) { grpc_pollset_destroy(p); }
+static void destroy_pollset(void *p, int success, grpc_call_list *call_list) {
+ grpc_pollset_destroy(p);
+}
int main(int argc, char **argv) {
+ grpc_closure destroyed;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_test_init(argc, argv);
grpc_init();
grpc_pollset_init(&g_pollset);
- g_workqueue = grpc_workqueue_create();
grpc_endpoint_tests(configs[0], &g_pollset);
- grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset);
- GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy");
+ grpc_closure_init(&destroyed, destroy_pollset, &g_pollset);
+ grpc_pollset_shutdown(&g_pollset, &destroyed, &call_list);
+ grpc_call_list_run(&call_list);
grpc_shutdown();
return 0;
diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c
index a5079f17a5..02618dfd66 100644
--- a/test/core/iomgr/endpoint_tests.c
+++ b/test/core/iomgr/endpoint_tests.c
@@ -243,9 +243,10 @@ static void read_and_write_test(grpc_endpoint_test_config config,
grpc_pollset_worker worker;
GPR_ASSERT(gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0);
grpc_pollset_work(g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
- deadline);
+ deadline, &call_list);
}
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
+ grpc_call_list_run(&call_list);
end_test(config);
gpr_slice_buffer_destroy(&state.outgoing);
diff --git a/test/core/iomgr/fd_conservation_posix_test.c b/test/core/iomgr/fd_conservation_posix_test.c
index 30852050ce..a1b1171b07 100644
--- a/test/core/iomgr/fd_conservation_posix_test.c
+++ b/test/core/iomgr/fd_conservation_posix_test.c
@@ -43,11 +43,9 @@ int main(int argc, char **argv) {
int i;
struct rlimit rlim;
grpc_endpoint_pair p;
- grpc_workqueue *workqueue;
grpc_test_init(argc, argv);
grpc_iomgr_init();
- workqueue = grpc_workqueue_create();
/* set max # of file descriptors to a low value, and
verify we can create and destroy many more than this number
@@ -56,12 +54,13 @@ int main(int argc, char **argv) {
GPR_ASSERT(0 == setrlimit(RLIMIT_NOFILE, &rlim));
for (i = 0; i < 100; i++) {
- p = grpc_iomgr_create_endpoint_pair("test", 1, workqueue);
- grpc_endpoint_destroy(p.client);
- grpc_endpoint_destroy(p.server);
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
+ p = grpc_iomgr_create_endpoint_pair("test", 1);
+ grpc_endpoint_destroy(p.client, &call_list);
+ grpc_endpoint_destroy(p.server, &call_list);
+ grpc_call_list_run(&call_list);
}
- GRPC_WORKQUEUE_UNREF(workqueue, "destroy");
grpc_iomgr_shutdown();
return 0;
}
diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c
index ae6b56da77..f89d6c7824 100644
--- a/test/core/iomgr/fd_posix_test.c
+++ b/test/core/iomgr/fd_posix_test.c
@@ -52,7 +52,6 @@
#include "test/core/util/test_config.h"
static grpc_pollset g_pollset;
-static grpc_workqueue *g_workqueue;
/* buffer size used to send and receive data.
1024 is the minimal value to set TCP send and receive buffer. */
@@ -119,18 +118,18 @@ typedef struct {
/* Called when an upload session can be safely shutdown.
Close session FD and start to shutdown listen FD. */
static void session_shutdown_cb(void *arg, /*session*/
- int success) {
+ int success, grpc_call_list *call_list) {
session *se = arg;
server *sv = se->sv;
- grpc_fd_orphan(se->em_fd, NULL, "a");
+ grpc_fd_orphan(se->em_fd, NULL, "a", call_list);
gpr_free(se);
/* Start to shutdown listen fd. */
- grpc_fd_shutdown(sv->em_fd);
+ grpc_fd_shutdown(sv->em_fd, call_list);
}
/* Called when data become readable in a session. */
static void session_read_cb(void *arg, /*session*/
- int success) {
+ int success, grpc_call_list *call_list) {
session *se = arg;
int fd = se->em_fd->fd;
@@ -138,7 +137,7 @@ static void session_read_cb(void *arg, /*session*/
ssize_t read_total = 0;
if (!success) {
- session_shutdown_cb(arg, 1);
+ session_shutdown_cb(arg, 1, call_list);
return;
}
@@ -153,7 +152,7 @@ static void session_read_cb(void *arg, /*session*/
It is possible to read nothing due to spurious edge event or data has
been drained, In such a case, read() returns -1 and set errno to EAGAIN. */
if (read_once == 0) {
- session_shutdown_cb(arg, 1);
+ session_shutdown_cb(arg, 1, call_list);
} else if (read_once == -1) {
if (errno == EAGAIN) {
/* An edge triggered event is cached in the kernel until next poll.
@@ -164,7 +163,7 @@ static void session_read_cb(void *arg, /*session*/
TODO(chenw): in multi-threaded version, callback and polling can be
run in different threads. polling may catch a persist read edge event
before notify_on_read is called. */
- grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
+ grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure, call_list);
} else {
gpr_log(GPR_ERROR, "Unhandled read error %s", strerror(errno));
abort();
@@ -174,10 +173,11 @@ static void session_read_cb(void *arg, /*session*/
/* Called when the listen FD can be safely shutdown.
Close listen FD and signal that server can be shutdown. */
-static void listen_shutdown_cb(void *arg /*server*/, int success) {
+static void listen_shutdown_cb(void *arg /*server*/, int success,
+ grpc_call_list *call_list) {
server *sv = arg;
- grpc_fd_orphan(sv->em_fd, NULL, "b");
+ grpc_fd_orphan(sv->em_fd, NULL, "b", call_list);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
sv->done = 1;
@@ -187,7 +187,7 @@ static void listen_shutdown_cb(void *arg /*server*/, int success) {
/* Called when a new TCP connection request arrives in the listening port. */
static void listen_cb(void *arg, /*=sv_arg*/
- int success) {
+ int success, grpc_call_list *call_list) {
server *sv = arg;
int fd;
int flags;
@@ -197,7 +197,7 @@ static void listen_cb(void *arg, /*=sv_arg*/
grpc_fd *listen_em_fd = sv->em_fd;
if (!success) {
- listen_shutdown_cb(arg, 1);
+ listen_shutdown_cb(arg, 1, call_list);
return;
}
@@ -208,13 +208,13 @@ static void listen_cb(void *arg, /*=sv_arg*/
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
se = gpr_malloc(sizeof(*se));
se->sv = sv;
- se->em_fd = grpc_fd_create(fd, g_workqueue, "listener");
- grpc_pollset_add_fd(&g_pollset, se->em_fd);
+ se->em_fd = grpc_fd_create(fd, "listener");
+ grpc_pollset_add_fd(&g_pollset, se->em_fd, call_list);
se->session_read_closure.cb = session_read_cb;
se->session_read_closure.cb_arg = se;
- grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
+ grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure, call_list);
- grpc_fd_notify_on_read(listen_em_fd, &sv->listen_closure);
+ grpc_fd_notify_on_read(listen_em_fd, &sv->listen_closure, call_list);
}
/* Max number of connections pending to be accepted by listen(). */
@@ -224,7 +224,7 @@ static void listen_cb(void *arg, /*=sv_arg*/
listen_cb() is registered to be interested in reading from listen_fd.
When connection request arrives, listen_cb() is called to accept the
connection request. */
-static int server_start(server *sv) {
+static int server_start(server *sv, grpc_call_list *call_list) {
int port = 0;
int fd;
struct sockaddr_in sin;
@@ -237,12 +237,12 @@ static int server_start(server *sv) {
port = ntohs(sin.sin_port);
GPR_ASSERT(listen(fd, MAX_NUM_FD) == 0);
- sv->em_fd = grpc_fd_create(fd, g_workqueue, "server");
- grpc_pollset_add_fd(&g_pollset, sv->em_fd);
+ sv->em_fd = grpc_fd_create(fd, "server");
+ grpc_pollset_add_fd(&g_pollset, sv->em_fd, call_list);
/* Register to be interested in reading from listen_fd. */
sv->listen_closure.cb = listen_cb;
sv->listen_closure.cb_arg = sv;
- grpc_fd_notify_on_read(sv->em_fd, &sv->listen_closure);
+ grpc_fd_notify_on_read(sv->em_fd, &sv->listen_closure, call_list);
return port;
}
@@ -251,9 +251,13 @@ static int server_start(server *sv) {
static void server_wait_and_shutdown(server *sv) {
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (!sv->done) {
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_pollset_worker worker;
grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
- gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ gpr_inf_future(GPR_CLOCK_MONOTONIC), &call_list);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_call_list_run(&call_list);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
@@ -286,23 +290,24 @@ static void client_init(client *cl) {
}
/* Called when a client upload session is ready to shutdown. */
-static void client_session_shutdown_cb(void *arg /*client*/, int success) {
+static void client_session_shutdown_cb(void *arg /*client*/, int success,
+ grpc_call_list *call_list) {
client *cl = arg;
- grpc_fd_orphan(cl->em_fd, NULL, "c");
+ grpc_fd_orphan(cl->em_fd, NULL, "c", call_list);
cl->done = 1;
grpc_pollset_kick(&g_pollset, NULL);
}
/* Write as much as possible, then register notify_on_write. */
static void client_session_write(void *arg, /*client*/
- int success) {
+ int success, grpc_call_list *call_list) {
client *cl = arg;
int fd = cl->em_fd->fd;
ssize_t write_once = 0;
if (!success) {
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
- client_session_shutdown_cb(arg, 1);
+ client_session_shutdown_cb(arg, 1, call_list);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
return;
}
@@ -317,10 +322,10 @@ static void client_session_write(void *arg, /*client*/
if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) {
cl->write_closure.cb = client_session_write;
cl->write_closure.cb_arg = cl;
- grpc_fd_notify_on_write(cl->em_fd, &cl->write_closure);
+ grpc_fd_notify_on_write(cl->em_fd, &cl->write_closure, call_list);
cl->client_write_cnt++;
} else {
- client_session_shutdown_cb(arg, 1);
+ client_session_shutdown_cb(arg, 1, call_list);
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
} else {
@@ -330,7 +335,7 @@ static void client_session_write(void *arg, /*client*/
}
/* Start a client to send a stream of bytes. */
-static void client_start(client *cl, int port) {
+static void client_start(client *cl, int port, grpc_call_list *call_list) {
int fd;
struct sockaddr_in sin;
create_test_socket(port, &fd, &sin);
@@ -350,10 +355,10 @@ static void client_start(client *cl, int port) {
}
}
- cl->em_fd = grpc_fd_create(fd, g_workqueue, "client");
- grpc_pollset_add_fd(&g_pollset, cl->em_fd);
+ cl->em_fd = grpc_fd_create(fd, "client");
+ grpc_pollset_add_fd(&g_pollset, cl->em_fd, call_list);
- client_session_write(cl, 1);
+ client_session_write(cl, 1, call_list);
}
/* Wait for the signal to shutdown a client. */
@@ -361,8 +366,12 @@ static void client_wait_and_shutdown(client *cl) {
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (!cl->done) {
grpc_pollset_worker worker;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
- gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ gpr_inf_future(GPR_CLOCK_MONOTONIC), &call_list);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_call_list_run(&call_list);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
@@ -374,11 +383,13 @@ static void test_grpc_fd(void) {
server sv;
client cl;
int port;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
server_init(&sv);
- port = server_start(&sv);
+ port = server_start(&sv, &call_list);
client_init(&cl);
- client_start(&cl, port);
+ client_start(&cl, port, &call_list);
+ grpc_call_list_run(&call_list);
client_wait_and_shutdown(&cl);
server_wait_and_shutdown(&sv);
GPR_ASSERT(sv.read_bytes_total == cl.write_bytes_total);
@@ -386,14 +397,15 @@ static void test_grpc_fd(void) {
}
typedef struct fd_change_data {
- void (*cb_that_ran)(void *, int success);
+ void (*cb_that_ran)(void *, int success, grpc_call_list *call_list);
} fd_change_data;
void init_change_data(fd_change_data *fdc) { fdc->cb_that_ran = NULL; }
void destroy_change_data(fd_change_data *fdc) {}
-static void first_read_callback(void *arg /* fd_change_data */, int success) {
+static void first_read_callback(void *arg /* fd_change_data */, int success,
+ grpc_call_list *call_list) {
fd_change_data *fdc = arg;
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
@@ -402,7 +414,8 @@ static void first_read_callback(void *arg /* fd_change_data */, int success) {
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
-static void second_read_callback(void *arg /* fd_change_data */, int success) {
+static void second_read_callback(void *arg /* fd_change_data */, int success,
+ grpc_call_list *call_list) {
fd_change_data *fdc = arg;
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
@@ -424,6 +437,7 @@ static void test_grpc_fd_change(void) {
ssize_t result;
grpc_closure first_closure;
grpc_closure second_closure;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
first_closure.cb = first_read_callback;
first_closure.cb_arg = &a;
@@ -439,11 +453,11 @@ static void test_grpc_fd_change(void) {
flags = fcntl(sv[1], F_GETFL, 0);
GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
- em_fd = grpc_fd_create(sv[0], g_workqueue, "test_grpc_fd_change");
- grpc_pollset_add_fd(&g_pollset, em_fd);
+ em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change");
+ grpc_pollset_add_fd(&g_pollset, em_fd, &call_list);
/* Register the first callback, then make its FD readable */
- grpc_fd_notify_on_read(em_fd, &first_closure);
+ grpc_fd_notify_on_read(em_fd, &first_closure, &call_list);
data = 0;
result = write(sv[1], &data, 1);
GPR_ASSERT(result == 1);
@@ -453,7 +467,10 @@ static void test_grpc_fd_change(void) {
while (a.cb_that_ran == NULL) {
grpc_pollset_worker worker;
grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
- gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ gpr_inf_future(GPR_CLOCK_MONOTONIC), &call_list);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_call_list_run(&call_list);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
}
GPR_ASSERT(a.cb_that_ran == first_read_callback);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
@@ -464,7 +481,7 @@ static void test_grpc_fd_change(void) {
/* Now register a second callback with distinct change data, and do the same
thing again. */
- grpc_fd_notify_on_read(em_fd, &second_closure);
+ grpc_fd_notify_on_read(em_fd, &second_closure, &call_list);
data = 0;
result = write(sv[1], &data, 1);
GPR_ASSERT(result == 1);
@@ -473,29 +490,37 @@ static void test_grpc_fd_change(void) {
while (b.cb_that_ran == NULL) {
grpc_pollset_worker worker;
grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
- gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ gpr_inf_future(GPR_CLOCK_MONOTONIC), &call_list);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_call_list_run(&call_list);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
}
/* Except now we verify that second_read_callback ran instead */
GPR_ASSERT(b.cb_that_ran == second_read_callback);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
- grpc_fd_orphan(em_fd, NULL, "d");
+ grpc_fd_orphan(em_fd, NULL, "d", &call_list);
+ grpc_call_list_run(&call_list);
destroy_change_data(&a);
destroy_change_data(&b);
close(sv[1]);
}
-static void destroy_pollset(void *p) { grpc_pollset_destroy(p); }
+static void destroy_pollset(void *p, int success, grpc_call_list *call_list) {
+ grpc_pollset_destroy(p);
+}
int main(int argc, char **argv) {
+ grpc_closure destroyed;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_test_init(argc, argv);
grpc_iomgr_init();
grpc_pollset_init(&g_pollset);
- g_workqueue = grpc_workqueue_create();
test_grpc_fd();
test_grpc_fd_change();
- grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset);
- GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy");
+ grpc_closure_init(&destroyed, destroy_pollset, &g_pollset);
+ grpc_pollset_shutdown(&g_pollset, &destroyed, &call_list);
+ grpc_call_list_run(&call_list);
grpc_iomgr_shutdown();
return 0;
}
diff --git a/test/core/iomgr/resolve_address_test.c b/test/core/iomgr/resolve_address_test.c
index 668c5399f9..2ea3b56ee6 100644
--- a/test/core/iomgr/resolve_address_test.c
+++ b/test/core/iomgr/resolve_address_test.c
@@ -42,14 +42,16 @@ static gpr_timespec test_deadline(void) {
return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(100);
}
-static void must_succeed(void* evp, grpc_resolved_addresses* p) {
+static void must_succeed(void* evp, grpc_resolved_addresses* p,
+ grpc_call_list* call_list) {
GPR_ASSERT(p);
GPR_ASSERT(p->naddrs >= 1);
grpc_resolved_addresses_destroy(p);
gpr_event_set(evp, (void*)1);
}
-static void must_fail(void* evp, grpc_resolved_addresses* p) {
+static void must_fail(void* evp, grpc_resolved_addresses* p,
+ grpc_call_list* call_list) {
GPR_ASSERT(!p);
gpr_event_set(evp, (void*)1);
}
diff --git a/test/core/iomgr/tcp_client_posix_test.c b/test/core/iomgr/tcp_client_posix_test.c
index 4a6786c0a2..a5a3d0ea7e 100644
--- a/test/core/iomgr/tcp_client_posix_test.c
+++ b/test/core/iomgr/tcp_client_posix_test.c
@@ -49,8 +49,8 @@
static grpc_pollset_set g_pollset_set;
static grpc_pollset g_pollset;
-static grpc_workqueue *g_workqueue;
static int g_connections_complete = 0;
+static grpc_endpoint *g_connecting = NULL;
static gpr_timespec test_deadline(void) {
return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10);
@@ -63,15 +63,18 @@ static void finish_connection() {
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
-static void must_succeed(void *arg, grpc_endpoint *tcp) {
- GPR_ASSERT(tcp);
- grpc_endpoint_shutdown(tcp);
- grpc_endpoint_destroy(tcp);
+static void must_succeed(void *arg, int success, grpc_call_list *call_list) {
+ GPR_ASSERT(g_connecting != NULL);
+ GPR_ASSERT(success);
+ grpc_endpoint_shutdown(g_connecting, call_list);
+ grpc_endpoint_destroy(g_connecting, call_list);
+ g_connecting = NULL;
finish_connection();
}
-static void must_fail(void *arg, grpc_endpoint *tcp) {
- GPR_ASSERT(!tcp);
+static void must_fail(void *arg, int success, grpc_call_list *call_list) {
+ GPR_ASSERT(g_connecting == NULL);
+ GPR_ASSERT(!success);
finish_connection();
}
@@ -81,6 +84,8 @@ void test_succeeds(void) {
int svr_fd;
int r;
int connections_complete_before;
+ grpc_closure done;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
gpr_log(GPR_DEBUG, "test_succeeds");
@@ -99,9 +104,10 @@ void test_succeeds(void) {
/* connect to it */
GPR_ASSERT(getsockname(svr_fd, (struct sockaddr *)&addr, &addr_len) == 0);
- grpc_tcp_client_connect(must_succeed, NULL, &g_pollset_set, g_workqueue,
+ grpc_closure_init(&done, must_succeed, NULL);
+ grpc_tcp_client_connect(&done, &g_connecting, &g_pollset_set,
(struct sockaddr *)&addr, addr_len,
- gpr_inf_future(GPR_CLOCK_REALTIME));
+ gpr_inf_future(GPR_CLOCK_REALTIME), &call_list);
/* await the connection */
do {
@@ -116,7 +122,10 @@ void test_succeeds(void) {
while (g_connections_complete == connections_complete_before) {
grpc_pollset_worker worker;
grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
- GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5));
+ GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), &call_list);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_call_list_run(&call_list);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
@@ -126,6 +135,8 @@ void test_fails(void) {
struct sockaddr_in addr;
socklen_t addr_len = sizeof(addr);
int connections_complete_before;
+ grpc_closure done;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
gpr_log(GPR_DEBUG, "test_fails");
@@ -137,9 +148,10 @@ void test_fails(void) {
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
/* connect to a broken address */
- grpc_tcp_client_connect(must_fail, NULL, &g_pollset_set, g_workqueue,
+ grpc_closure_init(&done, must_fail, NULL);
+ grpc_tcp_client_connect(&done, &g_connecting, &g_pollset_set,
(struct sockaddr *)&addr, addr_len,
- gpr_inf_future(GPR_CLOCK_REALTIME));
+ gpr_inf_future(GPR_CLOCK_REALTIME), &call_list);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
@@ -147,7 +159,10 @@ void test_fails(void) {
while (g_connections_complete == connections_complete_before) {
grpc_pollset_worker worker;
grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
- test_deadline());
+ test_deadline(), &call_list);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_call_list_run(&call_list);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
@@ -163,6 +178,8 @@ void test_times_out(void) {
int r;
int connections_complete_before;
gpr_timespec connect_deadline;
+ grpc_closure done;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
gpr_log(GPR_DEBUG, "test_times_out");
@@ -196,8 +213,10 @@ void test_times_out(void) {
connections_complete_before = g_connections_complete;
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
- grpc_tcp_client_connect(must_fail, NULL, &g_pollset_set, g_workqueue,
- (struct sockaddr *)&addr, addr_len, connect_deadline);
+ grpc_closure_init(&done, must_fail, NULL);
+ grpc_tcp_client_connect(&done, &g_connecting, &g_pollset_set,
+ (struct sockaddr *)&addr, addr_len, connect_deadline,
+ &call_list);
/* Make sure the event doesn't trigger early */
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
@@ -222,7 +241,10 @@ void test_times_out(void) {
connections_complete_before + is_after_deadline);
}
grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
- GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10));
+ GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10), &call_list);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_call_list_run(&call_list);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
@@ -232,22 +254,27 @@ void test_times_out(void) {
}
}
-static void destroy_pollset(void *p) { grpc_pollset_destroy(p); }
+static void destroy_pollset(void *p, int success, grpc_call_list *call_list) {
+ grpc_pollset_destroy(p);
+}
int main(int argc, char **argv) {
+ grpc_closure destroyed;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_test_init(argc, argv);
grpc_init();
grpc_pollset_set_init(&g_pollset_set);
grpc_pollset_init(&g_pollset);
- grpc_pollset_set_add_pollset(&g_pollset_set, &g_pollset);
- g_workqueue = grpc_workqueue_create();
+ grpc_pollset_set_add_pollset(&g_pollset_set, &g_pollset, &call_list);
+ grpc_call_list_run(&call_list);
test_succeeds();
gpr_log(GPR_ERROR, "End of first test");
test_fails();
test_times_out();
grpc_pollset_set_destroy(&g_pollset_set);
- grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset);
- GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy");
+ grpc_closure_init(&destroyed, destroy_pollset, &g_pollset);
+ grpc_pollset_shutdown(&g_pollset, &destroyed, &call_list);
+ grpc_call_list_run(&call_list);
grpc_shutdown();
return 0;
}
diff --git a/test/core/security/credentials_test.c b/test/core/security/credentials_test.c
index 97ebaa0570..3d0ed094c9 100644
--- a/test/core/security/credentials_test.c
+++ b/test/core/security/credentials_test.c
@@ -318,7 +318,8 @@ static void check_metadata(expected_md *expected, grpc_credentials_md *md_elems,
static void check_google_iam_metadata(void *user_data,
grpc_credentials_md *md_elems,
size_t num_md,
- grpc_credentials_status status) {
+ grpc_credentials_status status,
+ grpc_call_list *call_list) {
grpc_credentials *c = (grpc_credentials *)user_data;
expected_md emd[] = {{GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY,
test_google_iam_authorization_token},
@@ -331,19 +332,23 @@ static void check_google_iam_metadata(void *user_data,
}
static void test_google_iam_creds(void) {
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_credentials *creds = grpc_google_iam_credentials_create(
test_google_iam_authorization_token, test_google_iam_authority_selector,
NULL);
GPR_ASSERT(grpc_credentials_has_request_metadata(creds));
GPR_ASSERT(grpc_credentials_has_request_metadata_only(creds));
grpc_credentials_get_request_metadata(creds, NULL, test_service_url,
- check_google_iam_metadata, creds);
+ check_google_iam_metadata, creds,
+ &call_list);
+ grpc_call_list_run(&call_list);
}
static void check_access_token_metadata(void *user_data,
grpc_credentials_md *md_elems,
size_t num_md,
- grpc_credentials_status status) {
+ grpc_credentials_status status,
+ grpc_call_list *call_list) {
grpc_credentials *c = (grpc_credentials *)user_data;
expected_md emd[] = {{GRPC_AUTHORIZATION_METADATA_KEY, "Bearer blah"}};
GPR_ASSERT(status == GRPC_CREDENTIALS_OK);
@@ -353,17 +358,22 @@ static void check_access_token_metadata(void *user_data,
}
static void test_access_token_creds(void) {
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_credentials *creds = grpc_access_token_credentials_create("blah", NULL);
GPR_ASSERT(grpc_credentials_has_request_metadata(creds));
GPR_ASSERT(grpc_credentials_has_request_metadata_only(creds));
GPR_ASSERT(strcmp(creds->type, GRPC_CREDENTIALS_TYPE_OAUTH2) == 0);
grpc_credentials_get_request_metadata(creds, NULL, test_service_url,
- check_access_token_metadata, creds);
+ check_access_token_metadata, creds,
+ &call_list);
+ grpc_call_list_run(&call_list);
}
-static void check_ssl_oauth2_composite_metadata(
- void *user_data, grpc_credentials_md *md_elems, size_t num_md,
- grpc_credentials_status status) {
+static void check_ssl_oauth2_composite_metadata(void *user_data,
+ grpc_credentials_md *md_elems,
+ size_t num_md,
+ grpc_credentials_status status,
+ grpc_call_list *call_list) {
grpc_credentials *c = (grpc_credentials *)user_data;
expected_md emd[] = {
{GRPC_AUTHORIZATION_METADATA_KEY, test_oauth2_bearer_token}};
@@ -374,6 +384,7 @@ static void check_ssl_oauth2_composite_metadata(
}
static void test_ssl_oauth2_composite_creds(void) {
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_credentials *ssl_creds =
grpc_ssl_credentials_create(test_root_cert, NULL, NULL);
const grpc_credentials_array *creds_array;
@@ -395,7 +406,8 @@ static void test_ssl_oauth2_composite_creds(void) {
GRPC_CREDENTIALS_TYPE_OAUTH2) == 0);
grpc_credentials_get_request_metadata(composite_creds, NULL, test_service_url,
check_ssl_oauth2_composite_metadata,
- composite_creds);
+ composite_creds, &call_list);
+ grpc_call_list_run(&call_list);
}
void test_ssl_fake_transport_security_composite_creds_failure(void) {
@@ -412,7 +424,7 @@ void test_ssl_fake_transport_security_composite_creds_failure(void) {
static void check_ssl_oauth2_google_iam_composite_metadata(
void *user_data, grpc_credentials_md *md_elems, size_t num_md,
- grpc_credentials_status status) {
+ grpc_credentials_status status, grpc_call_list *call_list) {
grpc_credentials *c = (grpc_credentials *)user_data;
expected_md emd[] = {
{GRPC_AUTHORIZATION_METADATA_KEY, test_oauth2_bearer_token},
@@ -427,6 +439,7 @@ static void check_ssl_oauth2_google_iam_composite_metadata(
}
static void test_ssl_oauth2_google_iam_composite_creds(void) {
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_credentials *ssl_creds =
grpc_ssl_credentials_create(test_root_cert, NULL, NULL);
const grpc_credentials_array *creds_array;
@@ -457,12 +470,16 @@ static void test_ssl_oauth2_google_iam_composite_creds(void) {
GRPC_CREDENTIALS_TYPE_IAM) == 0);
grpc_credentials_get_request_metadata(
composite_creds, NULL, test_service_url,
- check_ssl_oauth2_google_iam_composite_metadata, composite_creds);
+ check_ssl_oauth2_google_iam_composite_metadata, composite_creds,
+ &call_list);
+ grpc_call_list_run(&call_list);
}
-static void on_oauth2_creds_get_metadata_success(
- void *user_data, grpc_credentials_md *md_elems, size_t num_md,
- grpc_credentials_status status) {
+static void on_oauth2_creds_get_metadata_success(void *user_data,
+ grpc_credentials_md *md_elems,
+ size_t num_md,
+ grpc_credentials_status status,
+ grpc_call_list *call_list) {
GPR_ASSERT(status == GRPC_CREDENTIALS_OK);
GPR_ASSERT(num_md == 1);
GPR_ASSERT(gpr_slice_str_cmp(md_elems[0].key, "Authorization") == 0);
@@ -473,9 +490,11 @@ static void on_oauth2_creds_get_metadata_success(
GPR_ASSERT(strcmp((const char *)user_data, test_user_data) == 0);
}
-static void on_oauth2_creds_get_metadata_failure(
- void *user_data, grpc_credentials_md *md_elems, size_t num_md,
- grpc_credentials_status status) {
+static void on_oauth2_creds_get_metadata_failure(void *user_data,
+ grpc_credentials_md *md_elems,
+ size_t num_md,
+ grpc_credentials_status status,
+ grpc_call_list *call_list) {
GPR_ASSERT(status == GRPC_CREDENTIALS_ERROR);
GPR_ASSERT(num_md == 0);
GPR_ASSERT(user_data != NULL);
@@ -497,39 +516,44 @@ static void validate_compute_engine_http_request(
static int compute_engine_httpcli_get_success_override(
const grpc_httpcli_request *request, gpr_timespec deadline,
- grpc_httpcli_response_cb on_response, void *user_data) {
+ grpc_httpcli_response_cb on_response, void *user_data,
+ grpc_call_list *call_list) {
grpc_httpcli_response response =
http_response(200, valid_oauth2_json_response);
validate_compute_engine_http_request(request);
- on_response(user_data, &response);
+ on_response(user_data, &response, call_list);
return 1;
}
static int compute_engine_httpcli_get_failure_override(
const grpc_httpcli_request *request, gpr_timespec deadline,
- grpc_httpcli_response_cb on_response, void *user_data) {
+ grpc_httpcli_response_cb on_response, void *user_data,
+ grpc_call_list *call_list) {
grpc_httpcli_response response = http_response(403, "Not Authorized.");
validate_compute_engine_http_request(request);
- on_response(user_data, &response);
+ on_response(user_data, &response, call_list);
return 1;
}
static int httpcli_post_should_not_be_called(
const grpc_httpcli_request *request, const char *body_bytes,
size_t body_size, gpr_timespec deadline,
- grpc_httpcli_response_cb on_response, void *user_data) {
+ grpc_httpcli_response_cb on_response, void *user_data,
+ grpc_call_list *call_list) {
GPR_ASSERT("HTTP POST should not be called" == NULL);
return 1;
}
static int httpcli_get_should_not_be_called(
const grpc_httpcli_request *request, gpr_timespec deadline,
- grpc_httpcli_response_cb on_response, void *user_data) {
+ grpc_httpcli_response_cb on_response, void *user_data,
+ grpc_call_list *call_list) {
GPR_ASSERT("HTTP GET should not be called" == NULL);
return 1;
}
static void test_compute_engine_creds_success(void) {
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_credentials *compute_engine_creds =
grpc_google_compute_engine_credentials_create(NULL);
GPR_ASSERT(grpc_credentials_has_request_metadata(compute_engine_creds));
@@ -540,20 +564,23 @@ static void test_compute_engine_creds_success(void) {
httpcli_post_should_not_be_called);
grpc_credentials_get_request_metadata(
compute_engine_creds, NULL, test_service_url,
- on_oauth2_creds_get_metadata_success, (void *)test_user_data);
+ on_oauth2_creds_get_metadata_success, (void *)test_user_data, &call_list);
+ grpc_call_list_run(&call_list);
/* Second request: the cached token should be served directly. */
grpc_httpcli_set_override(httpcli_get_should_not_be_called,
httpcli_post_should_not_be_called);
grpc_credentials_get_request_metadata(
compute_engine_creds, NULL, test_service_url,
- on_oauth2_creds_get_metadata_success, (void *)test_user_data);
+ on_oauth2_creds_get_metadata_success, (void *)test_user_data, &call_list);
+ grpc_call_list_run(&call_list);
grpc_credentials_unref(compute_engine_creds);
grpc_httpcli_set_override(NULL, NULL);
}
static void test_compute_engine_creds_failure(void) {
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_credentials *compute_engine_creds =
grpc_google_compute_engine_credentials_create(NULL);
grpc_httpcli_set_override(compute_engine_httpcli_get_failure_override,
@@ -562,9 +589,10 @@ static void test_compute_engine_creds_failure(void) {
GPR_ASSERT(grpc_credentials_has_request_metadata_only(compute_engine_creds));
grpc_credentials_get_request_metadata(
compute_engine_creds, NULL, test_service_url,
- on_oauth2_creds_get_metadata_failure, (void *)test_user_data);
+ on_oauth2_creds_get_metadata_failure, (void *)test_user_data, &call_list);
grpc_credentials_unref(compute_engine_creds);
grpc_httpcli_set_override(NULL, NULL);
+ grpc_call_list_run(&call_list);
}
static void validate_refresh_token_http_request(
@@ -592,25 +620,26 @@ static void validate_refresh_token_http_request(
static int refresh_token_httpcli_post_success(
const grpc_httpcli_request *request, const char *body, size_t body_size,
gpr_timespec deadline, grpc_httpcli_response_cb on_response,
- void *user_data) {
+ void *user_data, grpc_call_list *call_list) {
grpc_httpcli_response response =
http_response(200, valid_oauth2_json_response);
validate_refresh_token_http_request(request, body, body_size);
- on_response(user_data, &response);
+ on_response(user_data, &response, call_list);
return 1;
}
static int refresh_token_httpcli_post_failure(
const grpc_httpcli_request *request, const char *body, size_t body_size,
gpr_timespec deadline, grpc_httpcli_response_cb on_response,
- void *user_data) {
+ void *user_data, grpc_call_list *call_list) {
grpc_httpcli_response response = http_response(403, "Not Authorized.");
validate_refresh_token_http_request(request, body, body_size);
- on_response(user_data, &response);
+ on_response(user_data, &response, call_list);
return 1;
}
static void test_refresh_token_creds_success(void) {
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_credentials *refresh_token_creds =
grpc_google_refresh_token_credentials_create(test_refresh_token_str,
NULL);
@@ -622,20 +651,24 @@ static void test_refresh_token_creds_success(void) {
refresh_token_httpcli_post_success);
grpc_credentials_get_request_metadata(
refresh_token_creds, NULL, test_service_url,
- on_oauth2_creds_get_metadata_success, (void *)test_user_data);
+ on_oauth2_creds_get_metadata_success, (void *)test_user_data, &call_list);
+ grpc_call_list_run(&call_list);
/* Second request: the cached token should be served directly. */
grpc_httpcli_set_override(httpcli_get_should_not_be_called,
httpcli_post_should_not_be_called);
grpc_credentials_get_request_metadata(
refresh_token_creds, NULL, test_service_url,
- on_oauth2_creds_get_metadata_success, (void *)test_user_data);
+ on_oauth2_creds_get_metadata_success, (void *)test_user_data, &call_list);
+ grpc_call_list_run(&call_list);
grpc_credentials_unref(refresh_token_creds);
grpc_httpcli_set_override(NULL, NULL);
+ grpc_call_list_run(&call_list);
}
static void test_refresh_token_creds_failure(void) {
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_credentials *refresh_token_creds =
grpc_google_refresh_token_credentials_create(test_refresh_token_str,
NULL);
@@ -645,9 +678,10 @@ static void test_refresh_token_creds_failure(void) {
GPR_ASSERT(grpc_credentials_has_request_metadata_only(refresh_token_creds));
grpc_credentials_get_request_metadata(
refresh_token_creds, NULL, test_service_url,
- on_oauth2_creds_get_metadata_failure, (void *)test_user_data);
+ on_oauth2_creds_get_metadata_failure, (void *)test_user_data, &call_list);
grpc_credentials_unref(refresh_token_creds);
grpc_httpcli_set_override(NULL, NULL);
+ grpc_call_list_run(&call_list);
}
static void validate_jwt_encode_and_sign_params(
@@ -698,7 +732,8 @@ static char *encode_and_sign_jwt_should_not_be_called(
static void on_jwt_creds_get_metadata_success(void *user_data,
grpc_credentials_md *md_elems,
size_t num_md,
- grpc_credentials_status status) {
+ grpc_credentials_status status,
+ grpc_call_list *call_list) {
char *expected_md_value;
gpr_asprintf(&expected_md_value, "Bearer %s", test_signed_jwt);
GPR_ASSERT(status == GRPC_CREDENTIALS_OK);
@@ -713,7 +748,8 @@ static void on_jwt_creds_get_metadata_success(void *user_data,
static void on_jwt_creds_get_metadata_failure(void *user_data,
grpc_credentials_md *md_elems,
size_t num_md,
- grpc_credentials_status status) {
+ grpc_credentials_status status,
+ grpc_call_list *call_list) {
GPR_ASSERT(status == GRPC_CREDENTIALS_ERROR);
GPR_ASSERT(num_md == 0);
GPR_ASSERT(user_data != NULL);
@@ -722,6 +758,7 @@ static void on_jwt_creds_get_metadata_failure(void *user_data,
static void test_jwt_creds_success(void) {
char *json_key_string = test_json_key_str();
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_credentials *jwt_creds =
grpc_service_account_jwt_access_credentials_create(
json_key_string, grpc_max_auth_token_lifetime, NULL);
@@ -732,21 +769,24 @@ static void test_jwt_creds_success(void) {
grpc_jwt_encode_and_sign_set_override(encode_and_sign_jwt_success);
grpc_credentials_get_request_metadata(jwt_creds, NULL, test_service_url,
on_jwt_creds_get_metadata_success,
- (void *)test_user_data);
+ (void *)test_user_data, &call_list);
+ grpc_call_list_run(&call_list);
/* Second request: the cached token should be served directly. */
grpc_jwt_encode_and_sign_set_override(
encode_and_sign_jwt_should_not_be_called);
grpc_credentials_get_request_metadata(jwt_creds, NULL, test_service_url,
on_jwt_creds_get_metadata_success,
- (void *)test_user_data);
+ (void *)test_user_data, &call_list);
+ grpc_call_list_run(&call_list);
/* Third request: Different service url so jwt_encode_and_sign should be
called again (no caching). */
grpc_jwt_encode_and_sign_set_override(encode_and_sign_jwt_success);
grpc_credentials_get_request_metadata(jwt_creds, NULL, other_test_service_url,
on_jwt_creds_get_metadata_success,
- (void *)test_user_data);
+ (void *)test_user_data, &call_list);
+ grpc_call_list_run(&call_list);
gpr_free(json_key_string);
grpc_credentials_unref(jwt_creds);
@@ -755,6 +795,7 @@ static void test_jwt_creds_success(void) {
static void test_jwt_creds_signing_failure(void) {
char *json_key_string = test_json_key_str();
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_credentials *jwt_creds =
grpc_service_account_jwt_access_credentials_create(
json_key_string, grpc_max_auth_token_lifetime, NULL);
@@ -764,11 +805,12 @@ static void test_jwt_creds_signing_failure(void) {
grpc_jwt_encode_and_sign_set_override(encode_and_sign_jwt_failure);
grpc_credentials_get_request_metadata(jwt_creds, NULL, test_service_url,
on_jwt_creds_get_metadata_failure,
- (void *)test_user_data);
+ (void *)test_user_data, &call_list);
gpr_free(json_key_string);
grpc_credentials_unref(jwt_creds);
grpc_jwt_encode_and_sign_set_override(NULL);
+ grpc_call_list_run(&call_list);
}
static void set_google_default_creds_env_var_with_file_contents(
diff --git a/test/core/security/jwt_verifier_test.c b/test/core/security/jwt_verifier_test.c
index 5cc8b2e9be..00a079f54e 100644
--- a/test/core/security/jwt_verifier_test.c
+++ b/test/core/security/jwt_verifier_test.c
@@ -276,14 +276,16 @@ static grpc_httpcli_response http_response(int status, char *body) {
static int httpcli_post_should_not_be_called(
const grpc_httpcli_request *request, const char *body_bytes,
size_t body_size, gpr_timespec deadline,
- grpc_httpcli_response_cb on_response, void *user_data) {
+ grpc_httpcli_response_cb on_response, void *user_data,
+ grpc_call_list *call_list) {
GPR_ASSERT("HTTP POST should not be called" == NULL);
return 1;
}
static int httpcli_get_google_keys_for_email(
const grpc_httpcli_request *request, gpr_timespec deadline,
- grpc_httpcli_response_cb on_response, void *user_data) {
+ grpc_httpcli_response_cb on_response, void *user_data,
+ grpc_call_list *call_list) {
grpc_httpcli_response response = http_response(200, good_google_email_keys());
GPR_ASSERT(request->handshaker == &grpc_httpcli_ssl);
GPR_ASSERT(strcmp(request->host, "www.googleapis.com") == 0);
@@ -291,7 +293,7 @@ static int httpcli_get_google_keys_for_email(
"/robot/v1/metadata/x509/"
"777-abaslkan11hlb6nmim3bpspl31ud@developer."
"gserviceaccount.com") == 0);
- on_response(user_data, &response);
+ on_response(user_data, &response, call_list);
gpr_free(response.body);
return 1;
}
@@ -307,6 +309,7 @@ static void on_verification_success(void *user_data,
}
static void test_jwt_verifier_google_email_issuer_success(void) {
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_jwt_verifier *verifier = grpc_jwt_verifier_create(NULL, 0);
char *jwt = NULL;
char *key_str = json_key_str(json_key_str_part3_for_google_email_issuer);
@@ -320,25 +323,29 @@ static void test_jwt_verifier_google_email_issuer_success(void) {
grpc_auth_json_key_destruct(&key);
GPR_ASSERT(jwt != NULL);
grpc_jwt_verifier_verify(verifier, NULL, jwt, expected_audience,
- on_verification_success, (void *)expected_user_data);
+ on_verification_success, (void *)expected_user_data,
+ &call_list);
gpr_free(jwt);
grpc_jwt_verifier_destroy(verifier);
grpc_httpcli_set_override(NULL, NULL);
+ grpc_call_list_run(&call_list);
}
static int httpcli_get_custom_keys_for_email(
const grpc_httpcli_request *request, gpr_timespec deadline,
- grpc_httpcli_response_cb on_response, void *user_data) {
+ grpc_httpcli_response_cb on_response, void *user_data,
+ grpc_call_list *call_list) {
grpc_httpcli_response response = http_response(200, gpr_strdup(good_jwk_set));
GPR_ASSERT(request->handshaker == &grpc_httpcli_ssl);
GPR_ASSERT(strcmp(request->host, "keys.bar.com") == 0);
GPR_ASSERT(strcmp(request->path, "/jwk/foo@bar.com") == 0);
- on_response(user_data, &response);
+ on_response(user_data, &response, call_list);
gpr_free(response.body);
return 1;
}
static void test_jwt_verifier_custom_email_issuer_success(void) {
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_jwt_verifier *verifier = grpc_jwt_verifier_create(&custom_mapping, 1);
char *jwt = NULL;
char *key_str = json_key_str(json_key_str_part3_for_custom_email_issuer);
@@ -352,21 +359,23 @@ static void test_jwt_verifier_custom_email_issuer_success(void) {
grpc_auth_json_key_destruct(&key);
GPR_ASSERT(jwt != NULL);
grpc_jwt_verifier_verify(verifier, NULL, jwt, expected_audience,
- on_verification_success, (void *)expected_user_data);
+ on_verification_success, (void *)expected_user_data,
+ &call_list);
gpr_free(jwt);
grpc_jwt_verifier_destroy(verifier);
grpc_httpcli_set_override(NULL, NULL);
+ grpc_call_list_run(&call_list);
}
static int httpcli_get_jwk_set(const grpc_httpcli_request *request,
gpr_timespec deadline,
grpc_httpcli_response_cb on_response,
- void *user_data) {
+ void *user_data, grpc_call_list *call_list) {
grpc_httpcli_response response = http_response(200, gpr_strdup(good_jwk_set));
GPR_ASSERT(request->handshaker == &grpc_httpcli_ssl);
GPR_ASSERT(strcmp(request->host, "www.googleapis.com") == 0);
GPR_ASSERT(strcmp(request->path, "/oauth2/v3/certs") == 0);
- on_response(user_data, &response);
+ on_response(user_data, &response, call_list);
gpr_free(response.body);
return 1;
}
@@ -374,7 +383,8 @@ static int httpcli_get_jwk_set(const grpc_httpcli_request *request,
static int httpcli_get_openid_config(const grpc_httpcli_request *request,
gpr_timespec deadline,
grpc_httpcli_response_cb on_response,
- void *user_data) {
+ void *user_data,
+ grpc_call_list *call_list) {
grpc_httpcli_response response =
http_response(200, gpr_strdup(good_openid_config));
GPR_ASSERT(request->handshaker == &grpc_httpcli_ssl);
@@ -382,12 +392,13 @@ static int httpcli_get_openid_config(const grpc_httpcli_request *request,
GPR_ASSERT(strcmp(request->path, GRPC_OPENID_CONFIG_URL_SUFFIX) == 0);
grpc_httpcli_set_override(httpcli_get_jwk_set,
httpcli_post_should_not_be_called);
- on_response(user_data, &response);
+ on_response(user_data, &response, call_list);
gpr_free(response.body);
return 1;
}
static void test_jwt_verifier_url_issuer_success(void) {
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_jwt_verifier *verifier = grpc_jwt_verifier_create(NULL, 0);
char *jwt = NULL;
char *key_str = json_key_str(json_key_str_part3_for_url_issuer);
@@ -401,10 +412,12 @@ static void test_jwt_verifier_url_issuer_success(void) {
grpc_auth_json_key_destruct(&key);
GPR_ASSERT(jwt != NULL);
grpc_jwt_verifier_verify(verifier, NULL, jwt, expected_audience,
- on_verification_success, (void *)expected_user_data);
+ on_verification_success, (void *)expected_user_data,
+ &call_list);
gpr_free(jwt);
grpc_jwt_verifier_destroy(verifier);
grpc_httpcli_set_override(NULL, NULL);
+ grpc_call_list_run(&call_list);
}
static void on_verification_key_retrieval_error(void *user_data,
@@ -418,16 +431,17 @@ static void on_verification_key_retrieval_error(void *user_data,
static int httpcli_get_bad_json(const grpc_httpcli_request *request,
gpr_timespec deadline,
grpc_httpcli_response_cb on_response,
- void *user_data) {
+ void *user_data, grpc_call_list *call_list) {
grpc_httpcli_response response =
http_response(200, gpr_strdup("{\"bad\": \"stuff\"}"));
GPR_ASSERT(request->handshaker == &grpc_httpcli_ssl);
- on_response(user_data, &response);
+ on_response(user_data, &response, call_list);
gpr_free(response.body);
return 1;
}
static void test_jwt_verifier_url_issuer_bad_config(void) {
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_jwt_verifier *verifier = grpc_jwt_verifier_create(NULL, 0);
char *jwt = NULL;
char *key_str = json_key_str(json_key_str_part3_for_url_issuer);
@@ -442,13 +456,15 @@ static void test_jwt_verifier_url_issuer_bad_config(void) {
GPR_ASSERT(jwt != NULL);
grpc_jwt_verifier_verify(verifier, NULL, jwt, expected_audience,
on_verification_key_retrieval_error,
- (void *)expected_user_data);
+ (void *)expected_user_data, &call_list);
gpr_free(jwt);
grpc_jwt_verifier_destroy(verifier);
grpc_httpcli_set_override(NULL, NULL);
+ grpc_call_list_run(&call_list);
}
static void test_jwt_verifier_bad_json_key(void) {
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_jwt_verifier *verifier = grpc_jwt_verifier_create(NULL, 0);
char *jwt = NULL;
char *key_str = json_key_str(json_key_str_part3_for_google_email_issuer);
@@ -463,10 +479,11 @@ static void test_jwt_verifier_bad_json_key(void) {
GPR_ASSERT(jwt != NULL);
grpc_jwt_verifier_verify(verifier, NULL, jwt, expected_audience,
on_verification_key_retrieval_error,
- (void *)expected_user_data);
+ (void *)expected_user_data, &call_list);
gpr_free(jwt);
grpc_jwt_verifier_destroy(verifier);
grpc_httpcli_set_override(NULL, NULL);
+ grpc_call_list_run(&call_list);
}
static void corrupt_jwt_sig(char *jwt) {
@@ -495,6 +512,7 @@ static void on_verification_bad_signature(void *user_data,
}
static void test_jwt_verifier_bad_signature(void) {
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_jwt_verifier *verifier = grpc_jwt_verifier_create(NULL, 0);
char *jwt = NULL;
char *key_str = json_key_str(json_key_str_part3_for_url_issuer);
@@ -510,15 +528,17 @@ static void test_jwt_verifier_bad_signature(void) {
GPR_ASSERT(jwt != NULL);
grpc_jwt_verifier_verify(verifier, NULL, jwt, expected_audience,
on_verification_bad_signature,
- (void *)expected_user_data);
+ (void *)expected_user_data, &call_list);
gpr_free(jwt);
grpc_jwt_verifier_destroy(verifier);
grpc_httpcli_set_override(NULL, NULL);
+ grpc_call_list_run(&call_list);
}
static int httpcli_get_should_not_be_called(
const grpc_httpcli_request *request, gpr_timespec deadline,
- grpc_httpcli_response_cb on_response, void *user_data) {
+ grpc_httpcli_response_cb on_response, void *user_data,
+ grpc_call_list *call_list) {
GPR_ASSERT(0);
return 1;
}
@@ -532,14 +552,16 @@ static void on_verification_bad_format(void *user_data,
}
static void test_jwt_verifier_bad_format(void) {
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_jwt_verifier *verifier = grpc_jwt_verifier_create(NULL, 0);
grpc_httpcli_set_override(httpcli_get_should_not_be_called,
httpcli_post_should_not_be_called);
grpc_jwt_verifier_verify(verifier, NULL, "bad jwt", expected_audience,
on_verification_bad_format,
- (void *)expected_user_data);
+ (void *)expected_user_data, &call_list);
grpc_jwt_verifier_destroy(verifier);
grpc_httpcli_set_override(NULL, NULL);
+ grpc_call_list_run(&call_list);
}
/* find verification key: bad jks, cannot find key in jks */
diff --git a/test/core/security/oauth2_utils.c b/test/core/security/oauth2_utils.c
index 45beb05c1c..fad52d0c59 100644
--- a/test/core/security/oauth2_utils.c
+++ b/test/core/security/oauth2_utils.c
@@ -93,7 +93,7 @@ char *grpc_test_fetch_oauth2_token_with_credentials(grpc_credentials *creds) {
while (!request.is_done) {
grpc_pollset_worker worker;
grpc_pollset_work(&request.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
- gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ gpr_inf_future(GPR_CLOCK_MONOTONIC), &call_list);
}
gpr_mu_unlock(GRPC_POLLSET_MU(&request.pollset));
diff --git a/test/core/security/secure_endpoint_test.c b/test/core/security/secure_endpoint_test.c
index e0bdea527a..f7948b56af 100644
--- a/test/core/security/secure_endpoint_test.c
+++ b/test/core/security/secure_endpoint_test.c
@@ -46,18 +46,18 @@
#include "src/core/tsi/fake_transport_security.h"
static grpc_pollset g_pollset;
-static grpc_workqueue *g_workqueue;
static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair(
size_t slice_size, gpr_slice *leftover_slices, size_t leftover_nslices) {
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
tsi_frame_protector *fake_read_protector = tsi_create_fake_protector(NULL);
tsi_frame_protector *fake_write_protector = tsi_create_fake_protector(NULL);
grpc_endpoint_test_fixture f;
grpc_endpoint_pair tcp;
- tcp = grpc_iomgr_create_endpoint_pair("fixture", slice_size, g_workqueue);
- grpc_endpoint_add_to_pollset(tcp.client, &g_pollset);
- grpc_endpoint_add_to_pollset(tcp.server, &g_pollset);
+ tcp = grpc_iomgr_create_endpoint_pair("fixture", slice_size);
+ grpc_endpoint_add_to_pollset(tcp.client, &g_pollset, &call_list);
+ grpc_endpoint_add_to_pollset(tcp.server, &g_pollset, &call_list);
if (leftover_nslices == 0) {
f.client_ep =
@@ -110,6 +110,7 @@ static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair(
f.server_ep =
grpc_secure_endpoint_create(fake_write_protector, tcp.server, NULL, 0);
+ grpc_call_list_run(&call_list);
return f;
}
@@ -137,41 +138,56 @@ static grpc_endpoint_test_config configs[] = {
secure_endpoint_create_fixture_tcp_socketpair_leftover, clean_up},
};
+static void inc_call_ctr(void *arg, int success, grpc_call_list *call_list) {
+ ++*(int *)arg;
+ ;
+}
+
static void test_leftover(grpc_endpoint_test_config config, size_t slice_size) {
grpc_endpoint_test_fixture f = config.create_fixture(slice_size);
gpr_slice_buffer incoming;
gpr_slice s =
gpr_slice_from_copied_string("hello world 12345678900987654321");
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
+ int n = 0;
+ grpc_closure done_closure;
gpr_log(GPR_INFO, "Start test left over");
gpr_slice_buffer_init(&incoming);
- GPR_ASSERT(grpc_endpoint_read(f.client_ep, &incoming, NULL) ==
- GRPC_ENDPOINT_DONE);
+ grpc_closure_init(&done_closure, inc_call_ctr, &n);
+ grpc_endpoint_read(f.client_ep, &incoming, NULL, &call_list);
+ grpc_call_list_run(&call_list);
+ GPR_ASSERT(n == 1);
GPR_ASSERT(incoming.count == 1);
GPR_ASSERT(0 == gpr_slice_cmp(s, incoming.slices[0]));
- grpc_endpoint_shutdown(f.client_ep);
- grpc_endpoint_shutdown(f.server_ep);
- grpc_endpoint_destroy(f.client_ep);
- grpc_endpoint_destroy(f.server_ep);
+ grpc_endpoint_shutdown(f.client_ep, &call_list);
+ grpc_endpoint_shutdown(f.server_ep, &call_list);
+ grpc_endpoint_destroy(f.client_ep, &call_list);
+ grpc_endpoint_destroy(f.server_ep, &call_list);
+ grpc_call_list_run(&call_list);
gpr_slice_unref(s);
gpr_slice_buffer_destroy(&incoming);
clean_up();
}
-static void destroy_pollset(void *p) { grpc_pollset_destroy(p); }
+static void destroy_pollset(void *p, int success, grpc_call_list *call_list) {
+ grpc_pollset_destroy(p);
+}
int main(int argc, char **argv) {
+ grpc_closure destroyed;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_test_init(argc, argv);
grpc_init();
- g_workqueue = grpc_workqueue_create();
grpc_pollset_init(&g_pollset);
grpc_endpoint_tests(configs[0], &g_pollset);
test_leftover(configs[1], 1);
- GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy");
- grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset);
+ grpc_closure_init(&destroyed, destroy_pollset, &g_pollset);
+ grpc_pollset_shutdown(&g_pollset, &destroyed, &call_list);
+ grpc_call_list_run(&call_list);
grpc_shutdown();
return 0;
diff --git a/test/core/surface/completion_queue_test.c b/test/core/surface/completion_queue_test.c
index 0eeb5dac45..06bab3279e 100644
--- a/test/core/surface/completion_queue_test.c
+++ b/test/core/surface/completion_queue_test.c
@@ -75,12 +75,14 @@ static void test_wait_empty(void) {
shutdown_and_destroy(cc);
}
-static void do_nothing_end_completion(void *arg, grpc_cq_completion *c) {}
+static void do_nothing_end_completion(void *arg, grpc_cq_completion *c,
+ grpc_call_list *call_list) {}
static void test_cq_end_op(void) {
grpc_event ev;
grpc_completion_queue *cc;
grpc_cq_completion completion;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
void *tag = create_test_tag();
LOG_TEST("test_cq_end_op");
@@ -88,7 +90,8 @@ static void test_cq_end_op(void) {
cc = grpc_completion_queue_create(NULL);
grpc_cq_begin_op(cc);
- grpc_cq_end_op(cc, tag, 1, do_nothing_end_completion, NULL, &completion);
+ grpc_cq_end_op(cc, tag, 1, do_nothing_end_completion, NULL, &completion,
+ &call_list);
ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), NULL);
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
@@ -96,6 +99,7 @@ static void test_cq_end_op(void) {
GPR_ASSERT(ev.success);
shutdown_and_destroy(cc);
+ GPR_ASSERT(grpc_call_list_empty(call_list));
}
static void test_shutdown_then_next_polling(void) {
@@ -129,6 +133,7 @@ static void test_pluck(void) {
grpc_completion_queue *cc;
void *tags[128];
grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
unsigned i, j;
LOG_TEST("test_pluck");
@@ -145,7 +150,7 @@ static void test_pluck(void) {
for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
grpc_cq_begin_op(cc);
grpc_cq_end_op(cc, tags[i], 1, do_nothing_end_completion, NULL,
- &completions[i]);
+ &completions[i], &call_list);
}
for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
@@ -157,7 +162,7 @@ static void test_pluck(void) {
for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
grpc_cq_begin_op(cc);
grpc_cq_end_op(cc, tags[i], 1, do_nothing_end_completion, NULL,
- &completions[i]);
+ &completions[i], &call_list);
}
for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
@@ -167,6 +172,7 @@ static void test_pluck(void) {
}
shutdown_and_destroy(cc);
+ GPR_ASSERT(grpc_call_list_empty(call_list));
}
#define TEST_THREAD_EVENTS 10000
@@ -186,13 +192,15 @@ gpr_timespec ten_seconds_time(void) {
return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10);
}
-static void free_completion(void *arg, grpc_cq_completion *completion) {
+static void free_completion(void *arg, grpc_cq_completion *completion,
+ grpc_call_list *call_list) {
gpr_free(completion);
}
static void producer_thread(void *arg) {
test_thread_options *opt = arg;
int i;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
gpr_log(GPR_INFO, "producer %d started", opt->id);
gpr_event_set(&opt->on_started, (void *)(gpr_intptr)1);
@@ -210,12 +218,13 @@ static void producer_thread(void *arg) {
gpr_log(GPR_INFO, "producer %d phase 2", opt->id);
for (i = 0; i < TEST_THREAD_EVENTS; i++) {
grpc_cq_end_op(opt->cc, (void *)(gpr_intptr)1, 1, free_completion, NULL,
- gpr_malloc(sizeof(grpc_cq_completion)));
+ gpr_malloc(sizeof(grpc_cq_completion)), &call_list);
opt->events_triggered++;
}
gpr_log(GPR_INFO, "producer %d phase 2 done", opt->id);
gpr_event_set(&opt->on_finished, (void *)(gpr_intptr)1);
+ GPR_ASSERT(grpc_call_list_empty(call_list));
}
static void consumer_thread(void *arg) {
diff --git a/test/core/util/port_posix.c b/test/core/util/port_posix.c
index fa3798aa3c..98b5686e91 100644
--- a/test/core/util/port_posix.c
+++ b/test/core/util/port_posix.c
@@ -116,7 +116,7 @@ static void free_port_using_server(char *server, int port) {
while (!pr.done) {
grpc_pollset_worker worker;
grpc_pollset_work(&pr.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
- GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1));
+ GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1), &call_list);
}
gpr_mu_unlock(GRPC_POLLSET_MU(&pr.pollset));
@@ -272,7 +272,7 @@ static int pick_port_using_server(char *server) {
while (pr.port == -1) {
grpc_pollset_worker worker;
grpc_pollset_work(&pr.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
- GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1));
+ GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1), &call_list);
}
gpr_mu_unlock(GRPC_POLLSET_MU(&pr.pollset));
diff --git a/test/core/util/reconnect_server.c b/test/core/util/reconnect_server.c
index b4eb7ed916..eeb3ecb55a 100644
--- a/test/core/util/reconnect_server.c
+++ b/test/core/util/reconnect_server.c
@@ -138,10 +138,12 @@ void reconnect_server_poll(reconnect_server *server, int seconds) {
gpr_timespec deadline =
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
gpr_time_from_seconds(seconds, GPR_TIMESPAN));
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
gpr_mu_lock(GRPC_POLLSET_MU(&server->pollset));
grpc_pollset_work(&server->pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
- deadline);
+ deadline, &call_list);
gpr_mu_unlock(GRPC_POLLSET_MU(&server->pollset));
+ grpc_call_list_run(&call_list);
}
void reconnect_server_clear_timestamps(reconnect_server *server) {