diff options
author | Craig Tiller <ctiller@google.com> | 2015-09-21 14:39:57 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-09-21 14:39:57 -0700 |
commit | dfff1b8126a1f83833fd99626517f28d1e68453a (patch) | |
tree | 40a9aa8126c08a11fb1a5cdd4058f504e05dca43 /test | |
parent | 3ffd8220a17fd2fdf64adc66b03e4e254880471b (diff) |
Call list progress
Diffstat (limited to 'test')
21 files changed, 471 insertions, 261 deletions
diff --git a/test/core/bad_client/bad_client.c b/test/core/bad_client/bad_client.c index e89d5b3d39..fd37fce976 100644 --- a/test/core/bad_client/bad_client.c +++ b/test/core/bad_client/bad_client.c @@ -70,9 +70,11 @@ static void server_setup_transport(void *ts, grpc_transport *transport, thd_args *a = ts; static grpc_channel_filter const *extra_filters[] = { &grpc_http_server_filter}; - grpc_server_setup_transport(a->server, transport, extra_filters, - GPR_ARRAY_SIZE(extra_filters), mdctx, workqueue, - grpc_server_get_channel_args(a->server)); + grpc_call_list call_list = GRPC_CALL_LIST_INIT; + grpc_server_setup_transport( + a->server, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters), mdctx, + workqueue, grpc_server_get_channel_args(a->server), &call_list); + grpc_call_list_run(&call_list); } void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator, @@ -116,9 +118,11 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator, a.validator = validator; grpc_server_register_completion_queue(a.server, a.cq, NULL); grpc_server_start(a.server); - transport = grpc_create_chttp2_transport(NULL, sfd.server, mdctx, 0); + transport = + grpc_create_chttp2_transport(NULL, sfd.server, mdctx, 0, &call_list); server_setup_transport(&a, transport, mdctx, workqueue); - grpc_chttp2_transport_start_reading(transport, NULL, 0); + grpc_chttp2_transport_start_reading(transport, NULL, 0, &call_list); + grpc_call_list_run(&call_list); /* Bind everything into the same pollset */ grpc_endpoint_add_to_pollset(sfd.client, grpc_cq_pollset(a.cq), &call_list); diff --git a/test/core/channel/channel_stack_test.c b/test/core/channel/channel_stack_test.c index 60129afc43..07df921270 100644 --- a/test/core/channel/channel_stack_test.c +++ b/test/core/channel/channel_stack_test.c @@ -44,7 +44,7 @@ static void channel_init_func(grpc_channel_element *elem, grpc_channel *master, const grpc_channel_args *args, grpc_mdctx *metadata_context, int is_first, - int is_last) { + int is_last, grpc_call_list *call_list) { GPR_ASSERT(args->num_args == 1); GPR_ASSERT(args->args[0].type == GRPC_ARG_INTEGER); GPR_ASSERT(0 == strcmp(args->args[0].key, "test_key")); @@ -56,26 +56,33 @@ static void channel_init_func(grpc_channel_element *elem, grpc_channel *master, static void call_init_func(grpc_call_element *elem, const void *server_transport_data, - grpc_transport_stream_op *initial_op) { + grpc_transport_stream_op *initial_op, + grpc_call_list *call_list) { ++*(int *)(elem->channel_data); *(int *)(elem->call_data) = 0; } -static void channel_destroy_func(grpc_channel_element *elem) {} +static void channel_destroy_func(grpc_channel_element *elem, + grpc_call_list *call_list) {} -static void call_destroy_func(grpc_call_element *elem) { +static void call_destroy_func(grpc_call_element *elem, + grpc_call_list *call_list) { ++*(int *)(elem->channel_data); } -static void call_func(grpc_call_element *elem, grpc_transport_stream_op *op) { +static void call_func(grpc_call_element *elem, grpc_transport_stream_op *op, + grpc_call_list *call_list) { ++*(int *)(elem->call_data); } -static void channel_func(grpc_channel_element *elem, grpc_transport_op *op) { +static void channel_func(grpc_channel_element *elem, grpc_transport_op *op, + grpc_call_list *call_list) { ++*(int *)(elem->channel_data); } -static char *get_peer(grpc_call_element *elem) { return gpr_strdup("peer"); } +static char *get_peer(grpc_call_element *elem, grpc_call_list *call_list) { + return gpr_strdup("peer"); +} static void test_create_channel_stack(void) { const grpc_channel_filter filter = {call_func, channel_func, @@ -93,6 +100,7 @@ static void test_create_channel_stack(void) { grpc_mdctx *metadata_context; int *channel_data; int *call_data; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; metadata_context = grpc_mdctx_create(); @@ -105,14 +113,14 @@ static void test_create_channel_stack(void) { channel_stack = gpr_malloc(grpc_channel_stack_size(&filters, 1)); grpc_channel_stack_init(&filters, 1, NULL, &chan_args, metadata_context, - channel_stack); + channel_stack, &call_list); GPR_ASSERT(channel_stack->count == 1); channel_elem = grpc_channel_stack_element(channel_stack, 0); channel_data = (int *)channel_elem->channel_data; GPR_ASSERT(*channel_data == 0); call_stack = gpr_malloc(channel_stack->call_stack_size); - grpc_call_stack_init(channel_stack, NULL, NULL, call_stack); + grpc_call_stack_init(channel_stack, NULL, NULL, call_stack, &call_list); GPR_ASSERT(call_stack->count == 1); call_elem = grpc_call_stack_element(call_stack, 0); GPR_ASSERT(call_elem->filter == channel_elem->filter); @@ -121,14 +129,16 @@ static void test_create_channel_stack(void) { GPR_ASSERT(*call_data == 0); GPR_ASSERT(*channel_data == 1); - grpc_call_stack_destroy(call_stack); + grpc_call_stack_destroy(call_stack, &call_list); gpr_free(call_stack); GPR_ASSERT(*channel_data == 2); - grpc_channel_stack_destroy(channel_stack); + grpc_channel_stack_destroy(channel_stack, &call_list); gpr_free(channel_stack); grpc_mdctx_unref(metadata_context); + + GPR_ASSERT(grpc_call_list_empty(call_list)); } int main(int argc, char **argv) { diff --git a/test/core/end2end/fixtures/h2_sockpair+trace.c b/test/core/end2end/fixtures/h2_sockpair+trace.c index 04dbf48522..be8132bc96 100644 --- a/test/core/end2end/fixtures/h2_sockpair+trace.c +++ b/test/core/end2end/fixtures/h2_sockpair+trace.c @@ -64,9 +64,11 @@ static void server_setup_transport(void *ts, grpc_transport *transport, grpc_end2end_test_fixture *f = ts; static grpc_channel_filter const *extra_filters[] = { &grpc_http_server_filter}; - grpc_server_setup_transport(f->server, transport, extra_filters, - GPR_ARRAY_SIZE(extra_filters), mdctx, g_workqueue, - grpc_server_get_channel_args(f->server)); + grpc_call_list call_list = GRPC_CALL_LIST_INIT; + grpc_server_setup_transport( + f->server, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters), mdctx, + g_workqueue, grpc_server_get_channel_args(f->server), &call_list); + grpc_call_list_run(&call_list); } typedef struct { @@ -75,16 +77,17 @@ typedef struct { } sp_client_setup; static void client_setup_transport(void *ts, grpc_transport *transport, - grpc_mdctx *mdctx) { + grpc_mdctx *mdctx, + grpc_call_list *call_list) { sp_client_setup *cs = ts; const grpc_channel_filter *filters[] = {&grpc_http_client_filter, &grpc_compress_filter, &grpc_connected_channel_filter}; size_t nfilters = sizeof(filters) / sizeof(*filters); - grpc_channel *channel = - grpc_channel_create_from_filters("socketpair-target", filters, nfilters, - cs->client_args, mdctx, g_workqueue, 1); + grpc_channel *channel = grpc_channel_create_from_filters( + "socketpair-target", filters, nfilters, cs->client_args, mdctx, + g_workqueue, 1, call_list); cs->f->client = channel; @@ -108,20 +111,24 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair( static void chttp2_init_client_socketpair(grpc_end2end_test_fixture *f, grpc_channel_args *client_args) { + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_endpoint_pair *sfd = f->fixture_data; grpc_transport *transport; grpc_mdctx *mdctx = grpc_mdctx_create(); sp_client_setup cs; cs.client_args = client_args; cs.f = f; - transport = grpc_create_chttp2_transport(client_args, sfd->client, mdctx, 1); - client_setup_transport(&cs, transport, mdctx); + transport = grpc_create_chttp2_transport(client_args, sfd->client, mdctx, 1, + &call_list); + client_setup_transport(&cs, transport, mdctx, &call_list); GPR_ASSERT(f->client); - grpc_chttp2_transport_start_reading(transport, NULL, 0); + grpc_chttp2_transport_start_reading(transport, NULL, 0, &call_list); + grpc_call_list_run(&call_list); } static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f, grpc_channel_args *server_args) { + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_endpoint_pair *sfd = f->fixture_data; grpc_mdctx *mdctx = grpc_mdctx_create(); grpc_transport *transport; @@ -129,9 +136,11 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f, f->server = grpc_server_create_from_filters(NULL, 0, server_args); grpc_server_register_completion_queue(f->server, f->cq, NULL); grpc_server_start(f->server); - transport = grpc_create_chttp2_transport(server_args, sfd->server, mdctx, 0); + transport = grpc_create_chttp2_transport(server_args, sfd->server, mdctx, 0, + &call_list); server_setup_transport(f, transport, mdctx); - grpc_chttp2_transport_start_reading(transport, NULL, 0); + grpc_chttp2_transport_start_reading(transport, NULL, 0, &call_list); + grpc_call_list_run(&call_list); } static void chttp2_tear_down_socketpair(grpc_end2end_test_fixture *f) { diff --git a/test/core/end2end/fixtures/h2_sockpair.c b/test/core/end2end/fixtures/h2_sockpair.c index b661dbfd60..08f918cbad 100644 --- a/test/core/end2end/fixtures/h2_sockpair.c +++ b/test/core/end2end/fixtures/h2_sockpair.c @@ -63,9 +63,11 @@ static void server_setup_transport(void *ts, grpc_transport *transport, grpc_end2end_test_fixture *f = ts; static grpc_channel_filter const *extra_filters[] = { &grpc_http_server_filter}; - grpc_server_setup_transport(f->server, transport, extra_filters, - GPR_ARRAY_SIZE(extra_filters), mdctx, g_workqueue, - grpc_server_get_channel_args(f->server)); + grpc_call_list call_list = GRPC_CALL_LIST_INIT; + grpc_server_setup_transport( + f->server, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters), mdctx, + g_workqueue, grpc_server_get_channel_args(f->server), &call_list); + grpc_call_list_run(&call_list); } typedef struct { @@ -74,16 +76,17 @@ typedef struct { } sp_client_setup; static void client_setup_transport(void *ts, grpc_transport *transport, - grpc_mdctx *mdctx) { + grpc_mdctx *mdctx, + grpc_call_list *call_list) { sp_client_setup *cs = ts; const grpc_channel_filter *filters[] = {&grpc_http_client_filter, &grpc_compress_filter, &grpc_connected_channel_filter}; size_t nfilters = sizeof(filters) / sizeof(*filters); - grpc_channel *channel = - grpc_channel_create_from_filters("socketpair-target", filters, nfilters, - cs->client_args, mdctx, g_workqueue, 1); + grpc_channel *channel = grpc_channel_create_from_filters( + "socketpair-target", filters, nfilters, cs->client_args, mdctx, + g_workqueue, 1, call_list); cs->f->client = channel; @@ -107,20 +110,24 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair( static void chttp2_init_client_socketpair(grpc_end2end_test_fixture *f, grpc_channel_args *client_args) { + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_endpoint_pair *sfd = f->fixture_data; grpc_transport *transport; grpc_mdctx *mdctx = grpc_mdctx_create(); sp_client_setup cs; cs.client_args = client_args; cs.f = f; - transport = grpc_create_chttp2_transport(client_args, sfd->client, mdctx, 1); - client_setup_transport(&cs, transport, mdctx); + transport = grpc_create_chttp2_transport(client_args, sfd->client, mdctx, 1, + &call_list); + client_setup_transport(&cs, transport, mdctx, &call_list); GPR_ASSERT(f->client); - grpc_chttp2_transport_start_reading(transport, NULL, 0); + grpc_chttp2_transport_start_reading(transport, NULL, 0, &call_list); + grpc_call_list_run(&call_list); } static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f, grpc_channel_args *server_args) { + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_endpoint_pair *sfd = f->fixture_data; grpc_mdctx *mdctx = grpc_mdctx_create(); grpc_transport *transport; @@ -128,9 +135,11 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f, f->server = grpc_server_create_from_filters(NULL, 0, server_args); grpc_server_register_completion_queue(f->server, f->cq, NULL); grpc_server_start(f->server); - transport = grpc_create_chttp2_transport(server_args, sfd->server, mdctx, 0); + transport = grpc_create_chttp2_transport(server_args, sfd->server, mdctx, 0, + &call_list); server_setup_transport(f, transport, mdctx); - grpc_chttp2_transport_start_reading(transport, NULL, 0); + grpc_chttp2_transport_start_reading(transport, NULL, 0, &call_list); + grpc_call_list_run(&call_list); } static void chttp2_tear_down_socketpair(grpc_end2end_test_fixture *f) { diff --git a/test/core/end2end/fixtures/h2_sockpair_1byte.c b/test/core/end2end/fixtures/h2_sockpair_1byte.c index 6cf656bcde..46ffaff745 100644 --- a/test/core/end2end/fixtures/h2_sockpair_1byte.c +++ b/test/core/end2end/fixtures/h2_sockpair_1byte.c @@ -63,9 +63,11 @@ static void server_setup_transport(void *ts, grpc_transport *transport, grpc_end2end_test_fixture *f = ts; static grpc_channel_filter const *extra_filters[] = { &grpc_http_server_filter}; - grpc_server_setup_transport(f->server, transport, extra_filters, - GPR_ARRAY_SIZE(extra_filters), mdctx, g_workqueue, - grpc_server_get_channel_args(f->server)); + grpc_call_list call_list = GRPC_CALL_LIST_INIT; + grpc_server_setup_transport( + f->server, transport, extra_filters, GPR_ARRAY_SIZE(extra_filters), mdctx, + g_workqueue, grpc_server_get_channel_args(f->server), &call_list); + grpc_call_list_run(&call_list); } typedef struct { @@ -74,16 +76,17 @@ typedef struct { } sp_client_setup; static void client_setup_transport(void *ts, grpc_transport *transport, - grpc_mdctx *mdctx) { + grpc_mdctx *mdctx, + grpc_call_list *call_list) { sp_client_setup *cs = ts; const grpc_channel_filter *filters[] = {&grpc_http_client_filter, &grpc_compress_filter, &grpc_connected_channel_filter}; size_t nfilters = sizeof(filters) / sizeof(*filters); - grpc_channel *channel = - grpc_channel_create_from_filters("socketpair-target", filters, nfilters, - cs->client_args, mdctx, g_workqueue, 1); + grpc_channel *channel = grpc_channel_create_from_filters( + "socketpair-target", filters, nfilters, cs->client_args, mdctx, + g_workqueue, 1, call_list); cs->f->client = channel; @@ -107,20 +110,24 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair( static void chttp2_init_client_socketpair(grpc_end2end_test_fixture *f, grpc_channel_args *client_args) { + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_endpoint_pair *sfd = f->fixture_data; grpc_transport *transport; grpc_mdctx *mdctx = grpc_mdctx_create(); sp_client_setup cs; cs.client_args = client_args; cs.f = f; - transport = grpc_create_chttp2_transport(client_args, sfd->client, mdctx, 1); - client_setup_transport(&cs, transport, mdctx); + transport = grpc_create_chttp2_transport(client_args, sfd->client, mdctx, 1, + &call_list); + client_setup_transport(&cs, transport, mdctx, &call_list); GPR_ASSERT(f->client); - grpc_chttp2_transport_start_reading(transport, NULL, 0); + grpc_chttp2_transport_start_reading(transport, NULL, 0, &call_list); + grpc_call_list_run(&call_list); } static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f, grpc_channel_args *server_args) { + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_endpoint_pair *sfd = f->fixture_data; grpc_mdctx *mdctx = grpc_mdctx_create(); grpc_transport *transport; @@ -128,9 +135,11 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f, f->server = grpc_server_create_from_filters(NULL, 0, server_args); grpc_server_register_completion_queue(f->server, f->cq, NULL); grpc_server_start(f->server); - transport = grpc_create_chttp2_transport(server_args, sfd->server, mdctx, 0); + transport = grpc_create_chttp2_transport(server_args, sfd->server, mdctx, 0, + &call_list); server_setup_transport(f, transport, mdctx); - grpc_chttp2_transport_start_reading(transport, NULL, 0); + grpc_chttp2_transport_start_reading(transport, NULL, 0, &call_list); + grpc_call_list_run(&call_list); } static void chttp2_tear_down_socketpair(grpc_end2end_test_fixture *f) { diff --git a/test/core/httpcli/httpcli_test.c b/test/core/httpcli/httpcli_test.c index cf2b10c021..bcb544e502 100644 --- a/test/core/httpcli/httpcli_test.c +++ b/test/core/httpcli/httpcli_test.c @@ -53,7 +53,8 @@ static gpr_timespec n_seconds_time(int seconds) { return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(seconds); } -static void on_finish(void *arg, const grpc_httpcli_response *response) { +static void on_finish(void *arg, const grpc_httpcli_response *response, + grpc_call_list *call_list) { const char *expect = "<html><head><title>Hello world!</title></head>" "<body><p>This is a test</p></body></html>"; @@ -71,6 +72,7 @@ static void on_finish(void *arg, const grpc_httpcli_response *response) { static void test_get(int use_ssl, int port) { grpc_httpcli_request req; char *host; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; g_done = 0; gpr_log(GPR_INFO, "running %s with use_ssl=%d.", "test_get", use_ssl); @@ -84,12 +86,15 @@ static void test_get(int use_ssl, int port) { req.handshaker = use_ssl ? &grpc_httpcli_ssl : &grpc_httpcli_plaintext; grpc_httpcli_get(&g_context, &g_pollset, &req, n_seconds_time(15), on_finish, - (void *)42); + (void *)42, &call_list); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (!g_done) { grpc_pollset_worker worker; grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), - n_seconds_time(20)); + n_seconds_time(20), &call_list); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_call_list_run(&call_list); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); gpr_free(host); @@ -98,6 +103,7 @@ static void test_get(int use_ssl, int port) { static void test_post(int use_ssl, int port) { grpc_httpcli_request req; char *host; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; g_done = 0; gpr_log(GPR_INFO, "running %s with use_ssl=%d.", "test_post", (int)use_ssl); @@ -111,20 +117,27 @@ static void test_post(int use_ssl, int port) { req.handshaker = use_ssl ? &grpc_httpcli_ssl : &grpc_httpcli_plaintext; grpc_httpcli_post(&g_context, &g_pollset, &req, "hello", 5, - n_seconds_time(15), on_finish, (void *)42); + n_seconds_time(15), on_finish, (void *)42, &call_list); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (!g_done) { grpc_pollset_worker worker; grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), - n_seconds_time(20)); + n_seconds_time(20), &call_list); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_call_list_run(&call_list); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); gpr_free(host); } -static void destroy_pollset(void *ignored) { grpc_pollset_destroy(&g_pollset); } +static void destroy_pollset(void *p, int success, grpc_call_list *call_list) { + grpc_pollset_destroy(p); +} int main(int argc, char **argv) { + grpc_closure destroyed; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; gpr_subprocess *server; char *me = argv[0]; char *lslash = strrchr(me, '/'); @@ -161,7 +174,9 @@ int main(int argc, char **argv) { test_post(0, port); grpc_httpcli_context_destroy(&g_context); - grpc_pollset_shutdown(&g_pollset, destroy_pollset, NULL); + grpc_closure_init(&destroyed, destroy_pollset, &g_pollset); + grpc_pollset_shutdown(&g_pollset, &destroyed, &call_list); + grpc_call_list_run(&call_list); grpc_shutdown(); gpr_subprocess_destroy(server); diff --git a/test/core/iomgr/alarm_list_test.c b/test/core/iomgr/alarm_list_test.c index 56d662e61a..8082532434 100644 --- a/test/core/iomgr/alarm_list_test.c +++ b/test/core/iomgr/alarm_list_test.c @@ -42,11 +42,8 @@ #define MAX_CB 30 static int cb_called[MAX_CB][2]; -static int kicks; -void grpc_kick_poller(void) { ++kicks; } - -static void cb(void *arg, int success) { +static void cb(void *arg, int success, grpc_call_list *call_list) { cb_called[(gpr_intptr)arg][success]++; } @@ -54,6 +51,7 @@ static void add_test(void) { gpr_timespec start = gpr_now(GPR_CLOCK_REALTIME); int i; grpc_alarm alarms[20]; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_alarm_list_init(start); memset(cb_called, 0, sizeof(cb_called)); @@ -62,55 +60,56 @@ static void add_test(void) { for (i = 0; i < 10; i++) { grpc_alarm_init(&alarms[i], gpr_time_add(start, gpr_time_from_millis(10, GPR_TIMESPAN)), - cb, (void *)(gpr_intptr)i, start); + cb, (void *)(gpr_intptr)i, start, &call_list); } /* 1010 ms alarms. will expire in the next epoch */ for (i = 10; i < 20; i++) { grpc_alarm_init(&alarms[i], gpr_time_add(start, gpr_time_from_millis( 1010, GPR_TIMESPAN)), - cb, (void *)(gpr_intptr)i, start); + cb, (void *)(gpr_intptr)i, start, &call_list); } /* collect alarms. Only the first batch should be ready. */ - GPR_ASSERT(10 == grpc_alarm_check(NULL, - gpr_time_add(start, gpr_time_from_millis( + GPR_ASSERT(10 == grpc_alarm_check(gpr_time_add(start, gpr_time_from_millis( 500, GPR_TIMESPAN)), - NULL)); + NULL, &call_list)); + grpc_call_list_run(&call_list); for (i = 0; i < 20; i++) { GPR_ASSERT(cb_called[i][1] == (i < 10)); GPR_ASSERT(cb_called[i][0] == 0); } - GPR_ASSERT(0 == grpc_alarm_check( - NULL, gpr_time_add( - start, gpr_time_from_millis(600, GPR_TIMESPAN)), - NULL)); + GPR_ASSERT(0 == grpc_alarm_check(gpr_time_add(start, gpr_time_from_millis( + 600, GPR_TIMESPAN)), + NULL, &call_list)); + grpc_call_list_run(&call_list); for (i = 0; i < 30; i++) { GPR_ASSERT(cb_called[i][1] == (i < 10)); GPR_ASSERT(cb_called[i][0] == 0); } /* collect the rest of the alarms */ - GPR_ASSERT( - 10 == grpc_alarm_check(NULL, gpr_time_add(start, gpr_time_from_millis( - 1500, GPR_TIMESPAN)), - NULL)); + GPR_ASSERT(10 == + grpc_alarm_check( + gpr_time_add(start, gpr_time_from_millis(1500, GPR_TIMESPAN)), + NULL, &call_list)); + grpc_call_list_run(&call_list); for (i = 0; i < 30; i++) { GPR_ASSERT(cb_called[i][1] == (i < 20)); GPR_ASSERT(cb_called[i][0] == 0); } - GPR_ASSERT(0 == grpc_alarm_check(NULL, - gpr_time_add(start, gpr_time_from_millis( + GPR_ASSERT(0 == grpc_alarm_check(gpr_time_add(start, gpr_time_from_millis( 1600, GPR_TIMESPAN)), - NULL)); + NULL, &call_list)); for (i = 0; i < 30; i++) { GPR_ASSERT(cb_called[i][1] == (i < 20)); GPR_ASSERT(cb_called[i][0] == 0); } - grpc_alarm_list_shutdown(); + grpc_alarm_list_shutdown(&call_list); + grpc_call_list_run(&call_list); } static gpr_timespec tfm(int m) { @@ -122,28 +121,32 @@ static gpr_timespec tfm(int m) { /* Cleaning up a list with pending alarms. */ void destruction_test(void) { grpc_alarm alarms[5]; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_alarm_list_init(gpr_time_0(GPR_CLOCK_REALTIME)); memset(cb_called, 0, sizeof(cb_called)); grpc_alarm_init(&alarms[0], tfm(100), cb, (void *)(gpr_intptr)0, - gpr_time_0(GPR_CLOCK_REALTIME)); + gpr_time_0(GPR_CLOCK_REALTIME), &call_list); grpc_alarm_init(&alarms[1], tfm(3), cb, (void *)(gpr_intptr)1, - gpr_time_0(GPR_CLOCK_REALTIME)); + gpr_time_0(GPR_CLOCK_REALTIME), &call_list); grpc_alarm_init(&alarms[2], tfm(100), cb, (void *)(gpr_intptr)2, - gpr_time_0(GPR_CLOCK_REALTIME)); + gpr_time_0(GPR_CLOCK_REALTIME), &call_list); grpc_alarm_init(&alarms[3], tfm(3), cb, (void *)(gpr_intptr)3, - gpr_time_0(GPR_CLOCK_REALTIME)); + gpr_time_0(GPR_CLOCK_REALTIME), &call_list); grpc_alarm_init(&alarms[4], tfm(1), cb, (void *)(gpr_intptr)4, - gpr_time_0(GPR_CLOCK_REALTIME)); - GPR_ASSERT(1 == grpc_alarm_check(NULL, tfm(2), NULL)); + gpr_time_0(GPR_CLOCK_REALTIME), &call_list); + GPR_ASSERT(1 == grpc_alarm_check(tfm(2), NULL, &call_list)); + grpc_call_list_run(&call_list); GPR_ASSERT(1 == cb_called[4][1]); - grpc_alarm_cancel(&alarms[0]); - grpc_alarm_cancel(&alarms[3]); + grpc_alarm_cancel(&alarms[0], &call_list); + grpc_alarm_cancel(&alarms[3], &call_list); + grpc_call_list_run(&call_list); GPR_ASSERT(1 == cb_called[0][0]); GPR_ASSERT(1 == cb_called[3][0]); - grpc_alarm_list_shutdown(); + grpc_alarm_list_shutdown(&call_list); + grpc_call_list_run(&call_list); GPR_ASSERT(1 == cb_called[1][0]); GPR_ASSERT(1 == cb_called[2][0]); } diff --git a/test/core/iomgr/alarm_test.c b/test/core/iomgr/alarm_test.c index eafac0dc46..bfced83409 100644 --- a/test/core/iomgr/alarm_test.c +++ b/test/core/iomgr/alarm_test.c @@ -65,7 +65,8 @@ typedef struct { } alarm_arg; /* Called when an alarm expires. */ -static void alarm_cb(void *arg /* alarm_arg */, int success) { +static void alarm_cb(void *arg /* alarm_arg */, int success, + grpc_call_list *call_list) { alarm_arg *a = arg; gpr_mu_lock(&a->mu); if (success) { @@ -90,6 +91,7 @@ static void test_grpc_alarm(void) { */ gpr_timespec alarm_deadline; gpr_timespec followup_deadline; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; alarm_arg arg; alarm_arg arg2; @@ -107,7 +109,7 @@ static void test_grpc_alarm(void) { gpr_event_init(&arg.fcb_arg); grpc_alarm_init(&alarm, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(100), alarm_cb, &arg, - gpr_now(GPR_CLOCK_MONOTONIC)); + gpr_now(GPR_CLOCK_MONOTONIC), &call_list); alarm_deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1); gpr_mu_lock(&arg.mu); @@ -157,8 +159,8 @@ static void test_grpc_alarm(void) { gpr_event_init(&arg2.fcb_arg); grpc_alarm_init(&alarm_to_cancel, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(100), - alarm_cb, &arg2, gpr_now(GPR_CLOCK_MONOTONIC)); - grpc_alarm_cancel(&alarm_to_cancel); + alarm_cb, &arg2, gpr_now(GPR_CLOCK_MONOTONIC), &call_list); + grpc_alarm_cancel(&alarm_to_cancel, &call_list); alarm_deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1); gpr_mu_lock(&arg2.mu); diff --git a/test/core/iomgr/endpoint_pair_test.c b/test/core/iomgr/endpoint_pair_test.c index 9fe54771bd..491effd5ae 100644 --- a/test/core/iomgr/endpoint_pair_test.c +++ b/test/core/iomgr/endpoint_pair_test.c @@ -43,20 +43,20 @@ #include "test/core/iomgr/endpoint_tests.h" static grpc_pollset g_pollset; -static grpc_workqueue *g_workqueue; static void clean_up(void) {} static grpc_endpoint_test_fixture create_fixture_endpoint_pair( size_t slice_size) { + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_endpoint_test_fixture f; - grpc_endpoint_pair p = - grpc_iomgr_create_endpoint_pair("test", slice_size, g_workqueue); + grpc_endpoint_pair p = grpc_iomgr_create_endpoint_pair("test", slice_size); f.client_ep = p.client; f.server_ep = p.server; - grpc_endpoint_add_to_pollset(f.client_ep, &g_pollset); - grpc_endpoint_add_to_pollset(f.server_ep, &g_pollset); + grpc_endpoint_add_to_pollset(f.client_ep, &g_pollset, &call_list); + grpc_endpoint_add_to_pollset(f.server_ep, &g_pollset, &call_list); + grpc_call_list_run(&call_list); return f; } @@ -65,16 +65,20 @@ static grpc_endpoint_test_config configs[] = { {"tcp/tcp_socketpair", create_fixture_endpoint_pair, clean_up}, }; -static void destroy_pollset(void *p) { grpc_pollset_destroy(p); } +static void destroy_pollset(void *p, int success, grpc_call_list *call_list) { + grpc_pollset_destroy(p); +} int main(int argc, char **argv) { + grpc_closure destroyed; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_test_init(argc, argv); grpc_init(); grpc_pollset_init(&g_pollset); - g_workqueue = grpc_workqueue_create(); grpc_endpoint_tests(configs[0], &g_pollset); - grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset); - GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy"); + grpc_closure_init(&destroyed, destroy_pollset, &g_pollset); + grpc_pollset_shutdown(&g_pollset, &destroyed, &call_list); + grpc_call_list_run(&call_list); grpc_shutdown(); return 0; diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c index a5079f17a5..02618dfd66 100644 --- a/test/core/iomgr/endpoint_tests.c +++ b/test/core/iomgr/endpoint_tests.c @@ -243,9 +243,10 @@ static void read_and_write_test(grpc_endpoint_test_config config, grpc_pollset_worker worker; GPR_ASSERT(gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0); grpc_pollset_work(g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), - deadline); + deadline, &call_list); } gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); + grpc_call_list_run(&call_list); end_test(config); gpr_slice_buffer_destroy(&state.outgoing); diff --git a/test/core/iomgr/fd_conservation_posix_test.c b/test/core/iomgr/fd_conservation_posix_test.c index 30852050ce..a1b1171b07 100644 --- a/test/core/iomgr/fd_conservation_posix_test.c +++ b/test/core/iomgr/fd_conservation_posix_test.c @@ -43,11 +43,9 @@ int main(int argc, char **argv) { int i; struct rlimit rlim; grpc_endpoint_pair p; - grpc_workqueue *workqueue; grpc_test_init(argc, argv); grpc_iomgr_init(); - workqueue = grpc_workqueue_create(); /* set max # of file descriptors to a low value, and verify we can create and destroy many more than this number @@ -56,12 +54,13 @@ int main(int argc, char **argv) { GPR_ASSERT(0 == setrlimit(RLIMIT_NOFILE, &rlim)); for (i = 0; i < 100; i++) { - p = grpc_iomgr_create_endpoint_pair("test", 1, workqueue); - grpc_endpoint_destroy(p.client); - grpc_endpoint_destroy(p.server); + grpc_call_list call_list = GRPC_CALL_LIST_INIT; + p = grpc_iomgr_create_endpoint_pair("test", 1); + grpc_endpoint_destroy(p.client, &call_list); + grpc_endpoint_destroy(p.server, &call_list); + grpc_call_list_run(&call_list); } - GRPC_WORKQUEUE_UNREF(workqueue, "destroy"); grpc_iomgr_shutdown(); return 0; } diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c index ae6b56da77..f89d6c7824 100644 --- a/test/core/iomgr/fd_posix_test.c +++ b/test/core/iomgr/fd_posix_test.c @@ -52,7 +52,6 @@ #include "test/core/util/test_config.h" static grpc_pollset g_pollset; -static grpc_workqueue *g_workqueue; /* buffer size used to send and receive data. 1024 is the minimal value to set TCP send and receive buffer. */ @@ -119,18 +118,18 @@ typedef struct { /* Called when an upload session can be safely shutdown. Close session FD and start to shutdown listen FD. */ static void session_shutdown_cb(void *arg, /*session*/ - int success) { + int success, grpc_call_list *call_list) { session *se = arg; server *sv = se->sv; - grpc_fd_orphan(se->em_fd, NULL, "a"); + grpc_fd_orphan(se->em_fd, NULL, "a", call_list); gpr_free(se); /* Start to shutdown listen fd. */ - grpc_fd_shutdown(sv->em_fd); + grpc_fd_shutdown(sv->em_fd, call_list); } /* Called when data become readable in a session. */ static void session_read_cb(void *arg, /*session*/ - int success) { + int success, grpc_call_list *call_list) { session *se = arg; int fd = se->em_fd->fd; @@ -138,7 +137,7 @@ static void session_read_cb(void *arg, /*session*/ ssize_t read_total = 0; if (!success) { - session_shutdown_cb(arg, 1); + session_shutdown_cb(arg, 1, call_list); return; } @@ -153,7 +152,7 @@ static void session_read_cb(void *arg, /*session*/ It is possible to read nothing due to spurious edge event or data has been drained, In such a case, read() returns -1 and set errno to EAGAIN. */ if (read_once == 0) { - session_shutdown_cb(arg, 1); + session_shutdown_cb(arg, 1, call_list); } else if (read_once == -1) { if (errno == EAGAIN) { /* An edge triggered event is cached in the kernel until next poll. @@ -164,7 +163,7 @@ static void session_read_cb(void *arg, /*session*/ TODO(chenw): in multi-threaded version, callback and polling can be run in different threads. polling may catch a persist read edge event before notify_on_read is called. */ - grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure); + grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure, call_list); } else { gpr_log(GPR_ERROR, "Unhandled read error %s", strerror(errno)); abort(); @@ -174,10 +173,11 @@ static void session_read_cb(void *arg, /*session*/ /* Called when the listen FD can be safely shutdown. Close listen FD and signal that server can be shutdown. */ -static void listen_shutdown_cb(void *arg /*server*/, int success) { +static void listen_shutdown_cb(void *arg /*server*/, int success, + grpc_call_list *call_list) { server *sv = arg; - grpc_fd_orphan(sv->em_fd, NULL, "b"); + grpc_fd_orphan(sv->em_fd, NULL, "b", call_list); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); sv->done = 1; @@ -187,7 +187,7 @@ static void listen_shutdown_cb(void *arg /*server*/, int success) { /* Called when a new TCP connection request arrives in the listening port. */ static void listen_cb(void *arg, /*=sv_arg*/ - int success) { + int success, grpc_call_list *call_list) { server *sv = arg; int fd; int flags; @@ -197,7 +197,7 @@ static void listen_cb(void *arg, /*=sv_arg*/ grpc_fd *listen_em_fd = sv->em_fd; if (!success) { - listen_shutdown_cb(arg, 1); + listen_shutdown_cb(arg, 1, call_list); return; } @@ -208,13 +208,13 @@ static void listen_cb(void *arg, /*=sv_arg*/ fcntl(fd, F_SETFL, flags | O_NONBLOCK); se = gpr_malloc(sizeof(*se)); se->sv = sv; - se->em_fd = grpc_fd_create(fd, g_workqueue, "listener"); - grpc_pollset_add_fd(&g_pollset, se->em_fd); + se->em_fd = grpc_fd_create(fd, "listener"); + grpc_pollset_add_fd(&g_pollset, se->em_fd, call_list); se->session_read_closure.cb = session_read_cb; se->session_read_closure.cb_arg = se; - grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure); + grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure, call_list); - grpc_fd_notify_on_read(listen_em_fd, &sv->listen_closure); + grpc_fd_notify_on_read(listen_em_fd, &sv->listen_closure, call_list); } /* Max number of connections pending to be accepted by listen(). */ @@ -224,7 +224,7 @@ static void listen_cb(void *arg, /*=sv_arg*/ listen_cb() is registered to be interested in reading from listen_fd. When connection request arrives, listen_cb() is called to accept the connection request. */ -static int server_start(server *sv) { +static int server_start(server *sv, grpc_call_list *call_list) { int port = 0; int fd; struct sockaddr_in sin; @@ -237,12 +237,12 @@ static int server_start(server *sv) { port = ntohs(sin.sin_port); GPR_ASSERT(listen(fd, MAX_NUM_FD) == 0); - sv->em_fd = grpc_fd_create(fd, g_workqueue, "server"); - grpc_pollset_add_fd(&g_pollset, sv->em_fd); + sv->em_fd = grpc_fd_create(fd, "server"); + grpc_pollset_add_fd(&g_pollset, sv->em_fd, call_list); /* Register to be interested in reading from listen_fd. */ sv->listen_closure.cb = listen_cb; sv->listen_closure.cb_arg = sv; - grpc_fd_notify_on_read(sv->em_fd, &sv->listen_closure); + grpc_fd_notify_on_read(sv->em_fd, &sv->listen_closure, call_list); return port; } @@ -251,9 +251,13 @@ static int server_start(server *sv) { static void server_wait_and_shutdown(server *sv) { gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (!sv->done) { + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_pollset_worker worker; grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), - gpr_inf_future(GPR_CLOCK_MONOTONIC)); + gpr_inf_future(GPR_CLOCK_MONOTONIC), &call_list); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_call_list_run(&call_list); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } @@ -286,23 +290,24 @@ static void client_init(client *cl) { } /* Called when a client upload session is ready to shutdown. */ -static void client_session_shutdown_cb(void *arg /*client*/, int success) { +static void client_session_shutdown_cb(void *arg /*client*/, int success, + grpc_call_list *call_list) { client *cl = arg; - grpc_fd_orphan(cl->em_fd, NULL, "c"); + grpc_fd_orphan(cl->em_fd, NULL, "c", call_list); cl->done = 1; grpc_pollset_kick(&g_pollset, NULL); } /* Write as much as possible, then register notify_on_write. */ static void client_session_write(void *arg, /*client*/ - int success) { + int success, grpc_call_list *call_list) { client *cl = arg; int fd = cl->em_fd->fd; ssize_t write_once = 0; if (!success) { gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); - client_session_shutdown_cb(arg, 1); + client_session_shutdown_cb(arg, 1, call_list); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); return; } @@ -317,10 +322,10 @@ static void client_session_write(void *arg, /*client*/ if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) { cl->write_closure.cb = client_session_write; cl->write_closure.cb_arg = cl; - grpc_fd_notify_on_write(cl->em_fd, &cl->write_closure); + grpc_fd_notify_on_write(cl->em_fd, &cl->write_closure, call_list); cl->client_write_cnt++; } else { - client_session_shutdown_cb(arg, 1); + client_session_shutdown_cb(arg, 1, call_list); } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } else { @@ -330,7 +335,7 @@ static void client_session_write(void *arg, /*client*/ } /* Start a client to send a stream of bytes. */ -static void client_start(client *cl, int port) { +static void client_start(client *cl, int port, grpc_call_list *call_list) { int fd; struct sockaddr_in sin; create_test_socket(port, &fd, &sin); @@ -350,10 +355,10 @@ static void client_start(client *cl, int port) { } } - cl->em_fd = grpc_fd_create(fd, g_workqueue, "client"); - grpc_pollset_add_fd(&g_pollset, cl->em_fd); + cl->em_fd = grpc_fd_create(fd, "client"); + grpc_pollset_add_fd(&g_pollset, cl->em_fd, call_list); - client_session_write(cl, 1); + client_session_write(cl, 1, call_list); } /* Wait for the signal to shutdown a client. */ @@ -361,8 +366,12 @@ static void client_wait_and_shutdown(client *cl) { gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (!cl->done) { grpc_pollset_worker worker; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), - gpr_inf_future(GPR_CLOCK_MONOTONIC)); + gpr_inf_future(GPR_CLOCK_MONOTONIC), &call_list); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_call_list_run(&call_list); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } @@ -374,11 +383,13 @@ static void test_grpc_fd(void) { server sv; client cl; int port; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; server_init(&sv); - port = server_start(&sv); + port = server_start(&sv, &call_list); client_init(&cl); - client_start(&cl, port); + client_start(&cl, port, &call_list); + grpc_call_list_run(&call_list); client_wait_and_shutdown(&cl); server_wait_and_shutdown(&sv); GPR_ASSERT(sv.read_bytes_total == cl.write_bytes_total); @@ -386,14 +397,15 @@ static void test_grpc_fd(void) { } typedef struct fd_change_data { - void (*cb_that_ran)(void *, int success); + void (*cb_that_ran)(void *, int success, grpc_call_list *call_list); } fd_change_data; void init_change_data(fd_change_data *fdc) { fdc->cb_that_ran = NULL; } void destroy_change_data(fd_change_data *fdc) {} -static void first_read_callback(void *arg /* fd_change_data */, int success) { +static void first_read_callback(void *arg /* fd_change_data */, int success, + grpc_call_list *call_list) { fd_change_data *fdc = arg; gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); @@ -402,7 +414,8 @@ static void first_read_callback(void *arg /* fd_change_data */, int success) { gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } -static void second_read_callback(void *arg /* fd_change_data */, int success) { +static void second_read_callback(void *arg /* fd_change_data */, int success, + grpc_call_list *call_list) { fd_change_data *fdc = arg; gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); @@ -424,6 +437,7 @@ static void test_grpc_fd_change(void) { ssize_t result; grpc_closure first_closure; grpc_closure second_closure; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; first_closure.cb = first_read_callback; first_closure.cb_arg = &a; @@ -439,11 +453,11 @@ static void test_grpc_fd_change(void) { flags = fcntl(sv[1], F_GETFL, 0); GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); - em_fd = grpc_fd_create(sv[0], g_workqueue, "test_grpc_fd_change"); - grpc_pollset_add_fd(&g_pollset, em_fd); + em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change"); + grpc_pollset_add_fd(&g_pollset, em_fd, &call_list); /* Register the first callback, then make its FD readable */ - grpc_fd_notify_on_read(em_fd, &first_closure); + grpc_fd_notify_on_read(em_fd, &first_closure, &call_list); data = 0; result = write(sv[1], &data, 1); GPR_ASSERT(result == 1); @@ -453,7 +467,10 @@ static void test_grpc_fd_change(void) { while (a.cb_that_ran == NULL) { grpc_pollset_worker worker; grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), - gpr_inf_future(GPR_CLOCK_MONOTONIC)); + gpr_inf_future(GPR_CLOCK_MONOTONIC), &call_list); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_call_list_run(&call_list); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } GPR_ASSERT(a.cb_that_ran == first_read_callback); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); @@ -464,7 +481,7 @@ static void test_grpc_fd_change(void) { /* Now register a second callback with distinct change data, and do the same thing again. */ - grpc_fd_notify_on_read(em_fd, &second_closure); + grpc_fd_notify_on_read(em_fd, &second_closure, &call_list); data = 0; result = write(sv[1], &data, 1); GPR_ASSERT(result == 1); @@ -473,29 +490,37 @@ static void test_grpc_fd_change(void) { while (b.cb_that_ran == NULL) { grpc_pollset_worker worker; grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), - gpr_inf_future(GPR_CLOCK_MONOTONIC)); + gpr_inf_future(GPR_CLOCK_MONOTONIC), &call_list); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_call_list_run(&call_list); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } /* Except now we verify that second_read_callback ran instead */ GPR_ASSERT(b.cb_that_ran == second_read_callback); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); - grpc_fd_orphan(em_fd, NULL, "d"); + grpc_fd_orphan(em_fd, NULL, "d", &call_list); + grpc_call_list_run(&call_list); destroy_change_data(&a); destroy_change_data(&b); close(sv[1]); } -static void destroy_pollset(void *p) { grpc_pollset_destroy(p); } +static void destroy_pollset(void *p, int success, grpc_call_list *call_list) { + grpc_pollset_destroy(p); +} int main(int argc, char **argv) { + grpc_closure destroyed; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_test_init(argc, argv); grpc_iomgr_init(); grpc_pollset_init(&g_pollset); - g_workqueue = grpc_workqueue_create(); test_grpc_fd(); test_grpc_fd_change(); - grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset); - GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy"); + grpc_closure_init(&destroyed, destroy_pollset, &g_pollset); + grpc_pollset_shutdown(&g_pollset, &destroyed, &call_list); + grpc_call_list_run(&call_list); grpc_iomgr_shutdown(); return 0; } diff --git a/test/core/iomgr/resolve_address_test.c b/test/core/iomgr/resolve_address_test.c index 668c5399f9..2ea3b56ee6 100644 --- a/test/core/iomgr/resolve_address_test.c +++ b/test/core/iomgr/resolve_address_test.c @@ -42,14 +42,16 @@ static gpr_timespec test_deadline(void) { return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(100); } -static void must_succeed(void* evp, grpc_resolved_addresses* p) { +static void must_succeed(void* evp, grpc_resolved_addresses* p, + grpc_call_list* call_list) { GPR_ASSERT(p); GPR_ASSERT(p->naddrs >= 1); grpc_resolved_addresses_destroy(p); gpr_event_set(evp, (void*)1); } -static void must_fail(void* evp, grpc_resolved_addresses* p) { +static void must_fail(void* evp, grpc_resolved_addresses* p, + grpc_call_list* call_list) { GPR_ASSERT(!p); gpr_event_set(evp, (void*)1); } diff --git a/test/core/iomgr/tcp_client_posix_test.c b/test/core/iomgr/tcp_client_posix_test.c index 4a6786c0a2..a5a3d0ea7e 100644 --- a/test/core/iomgr/tcp_client_posix_test.c +++ b/test/core/iomgr/tcp_client_posix_test.c @@ -49,8 +49,8 @@ static grpc_pollset_set g_pollset_set; static grpc_pollset g_pollset; -static grpc_workqueue *g_workqueue; static int g_connections_complete = 0; +static grpc_endpoint *g_connecting = NULL; static gpr_timespec test_deadline(void) { return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10); @@ -63,15 +63,18 @@ static void finish_connection() { gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } -static void must_succeed(void *arg, grpc_endpoint *tcp) { - GPR_ASSERT(tcp); - grpc_endpoint_shutdown(tcp); - grpc_endpoint_destroy(tcp); +static void must_succeed(void *arg, int success, grpc_call_list *call_list) { + GPR_ASSERT(g_connecting != NULL); + GPR_ASSERT(success); + grpc_endpoint_shutdown(g_connecting, call_list); + grpc_endpoint_destroy(g_connecting, call_list); + g_connecting = NULL; finish_connection(); } -static void must_fail(void *arg, grpc_endpoint *tcp) { - GPR_ASSERT(!tcp); +static void must_fail(void *arg, int success, grpc_call_list *call_list) { + GPR_ASSERT(g_connecting == NULL); + GPR_ASSERT(!success); finish_connection(); } @@ -81,6 +84,8 @@ void test_succeeds(void) { int svr_fd; int r; int connections_complete_before; + grpc_closure done; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; gpr_log(GPR_DEBUG, "test_succeeds"); @@ -99,9 +104,10 @@ void test_succeeds(void) { /* connect to it */ GPR_ASSERT(getsockname(svr_fd, (struct sockaddr *)&addr, &addr_len) == 0); - grpc_tcp_client_connect(must_succeed, NULL, &g_pollset_set, g_workqueue, + grpc_closure_init(&done, must_succeed, NULL); + grpc_tcp_client_connect(&done, &g_connecting, &g_pollset_set, (struct sockaddr *)&addr, addr_len, - gpr_inf_future(GPR_CLOCK_REALTIME)); + gpr_inf_future(GPR_CLOCK_REALTIME), &call_list); /* await the connection */ do { @@ -116,7 +122,10 @@ void test_succeeds(void) { while (g_connections_complete == connections_complete_before) { grpc_pollset_worker worker; grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), - GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)); + GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5), &call_list); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_call_list_run(&call_list); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); @@ -126,6 +135,8 @@ void test_fails(void) { struct sockaddr_in addr; socklen_t addr_len = sizeof(addr); int connections_complete_before; + grpc_closure done; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; gpr_log(GPR_DEBUG, "test_fails"); @@ -137,9 +148,10 @@ void test_fails(void) { gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); /* connect to a broken address */ - grpc_tcp_client_connect(must_fail, NULL, &g_pollset_set, g_workqueue, + grpc_closure_init(&done, must_fail, NULL); + grpc_tcp_client_connect(&done, &g_connecting, &g_pollset_set, (struct sockaddr *)&addr, addr_len, - gpr_inf_future(GPR_CLOCK_REALTIME)); + gpr_inf_future(GPR_CLOCK_REALTIME), &call_list); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); @@ -147,7 +159,10 @@ void test_fails(void) { while (g_connections_complete == connections_complete_before) { grpc_pollset_worker worker; grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), - test_deadline()); + test_deadline(), &call_list); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_call_list_run(&call_list); + gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); @@ -163,6 +178,8 @@ void test_times_out(void) { int r; int connections_complete_before; gpr_timespec connect_deadline; + grpc_closure done; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; gpr_log(GPR_DEBUG, "test_times_out"); @@ -196,8 +213,10 @@ void test_times_out(void) { connections_complete_before = g_connections_complete; gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); - grpc_tcp_client_connect(must_fail, NULL, &g_pollset_set, g_workqueue, - (struct sockaddr *)&addr, addr_len, connect_deadline); + grpc_closure_init(&done, must_fail, NULL); + grpc_tcp_client_connect(&done, &g_connecting, &g_pollset_set, + (struct sockaddr *)&addr, addr_len, connect_deadline, + &call_list); /* Make sure the event doesn't trigger early */ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); @@ -222,7 +241,10 @@ void test_times_out(void) { connections_complete_before + is_after_deadline); } grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), - GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10)); + GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10), &call_list); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + grpc_call_list_run(&call_list); + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); @@ -232,22 +254,27 @@ void test_times_out(void) { } } -static void destroy_pollset(void *p) { grpc_pollset_destroy(p); } +static void destroy_pollset(void *p, int success, grpc_call_list *call_list) { + grpc_pollset_destroy(p); +} int main(int argc, char **argv) { + grpc_closure destroyed; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_test_init(argc, argv); grpc_init(); grpc_pollset_set_init(&g_pollset_set); grpc_pollset_init(&g_pollset); - grpc_pollset_set_add_pollset(&g_pollset_set, &g_pollset); - g_workqueue = grpc_workqueue_create(); + grpc_pollset_set_add_pollset(&g_pollset_set, &g_pollset, &call_list); + grpc_call_list_run(&call_list); test_succeeds(); gpr_log(GPR_ERROR, "End of first test"); test_fails(); test_times_out(); grpc_pollset_set_destroy(&g_pollset_set); - grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset); - GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy"); + grpc_closure_init(&destroyed, destroy_pollset, &g_pollset); + grpc_pollset_shutdown(&g_pollset, &destroyed, &call_list); + grpc_call_list_run(&call_list); grpc_shutdown(); return 0; } diff --git a/test/core/security/credentials_test.c b/test/core/security/credentials_test.c index 97ebaa0570..3d0ed094c9 100644 --- a/test/core/security/credentials_test.c +++ b/test/core/security/credentials_test.c @@ -318,7 +318,8 @@ static void check_metadata(expected_md *expected, grpc_credentials_md *md_elems, static void check_google_iam_metadata(void *user_data, grpc_credentials_md *md_elems, size_t num_md, - grpc_credentials_status status) { + grpc_credentials_status status, + grpc_call_list *call_list) { grpc_credentials *c = (grpc_credentials *)user_data; expected_md emd[] = {{GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY, test_google_iam_authorization_token}, @@ -331,19 +332,23 @@ static void check_google_iam_metadata(void *user_data, } static void test_google_iam_creds(void) { + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_credentials *creds = grpc_google_iam_credentials_create( test_google_iam_authorization_token, test_google_iam_authority_selector, NULL); GPR_ASSERT(grpc_credentials_has_request_metadata(creds)); GPR_ASSERT(grpc_credentials_has_request_metadata_only(creds)); grpc_credentials_get_request_metadata(creds, NULL, test_service_url, - check_google_iam_metadata, creds); + check_google_iam_metadata, creds, + &call_list); + grpc_call_list_run(&call_list); } static void check_access_token_metadata(void *user_data, grpc_credentials_md *md_elems, size_t num_md, - grpc_credentials_status status) { + grpc_credentials_status status, + grpc_call_list *call_list) { grpc_credentials *c = (grpc_credentials *)user_data; expected_md emd[] = {{GRPC_AUTHORIZATION_METADATA_KEY, "Bearer blah"}}; GPR_ASSERT(status == GRPC_CREDENTIALS_OK); @@ -353,17 +358,22 @@ static void check_access_token_metadata(void *user_data, } static void test_access_token_creds(void) { + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_credentials *creds = grpc_access_token_credentials_create("blah", NULL); GPR_ASSERT(grpc_credentials_has_request_metadata(creds)); GPR_ASSERT(grpc_credentials_has_request_metadata_only(creds)); GPR_ASSERT(strcmp(creds->type, GRPC_CREDENTIALS_TYPE_OAUTH2) == 0); grpc_credentials_get_request_metadata(creds, NULL, test_service_url, - check_access_token_metadata, creds); + check_access_token_metadata, creds, + &call_list); + grpc_call_list_run(&call_list); } -static void check_ssl_oauth2_composite_metadata( - void *user_data, grpc_credentials_md *md_elems, size_t num_md, - grpc_credentials_status status) { +static void check_ssl_oauth2_composite_metadata(void *user_data, + grpc_credentials_md *md_elems, + size_t num_md, + grpc_credentials_status status, + grpc_call_list *call_list) { grpc_credentials *c = (grpc_credentials *)user_data; expected_md emd[] = { {GRPC_AUTHORIZATION_METADATA_KEY, test_oauth2_bearer_token}}; @@ -374,6 +384,7 @@ static void check_ssl_oauth2_composite_metadata( } static void test_ssl_oauth2_composite_creds(void) { + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_credentials *ssl_creds = grpc_ssl_credentials_create(test_root_cert, NULL, NULL); const grpc_credentials_array *creds_array; @@ -395,7 +406,8 @@ static void test_ssl_oauth2_composite_creds(void) { GRPC_CREDENTIALS_TYPE_OAUTH2) == 0); grpc_credentials_get_request_metadata(composite_creds, NULL, test_service_url, check_ssl_oauth2_composite_metadata, - composite_creds); + composite_creds, &call_list); + grpc_call_list_run(&call_list); } void test_ssl_fake_transport_security_composite_creds_failure(void) { @@ -412,7 +424,7 @@ void test_ssl_fake_transport_security_composite_creds_failure(void) { static void check_ssl_oauth2_google_iam_composite_metadata( void *user_data, grpc_credentials_md *md_elems, size_t num_md, - grpc_credentials_status status) { + grpc_credentials_status status, grpc_call_list *call_list) { grpc_credentials *c = (grpc_credentials *)user_data; expected_md emd[] = { {GRPC_AUTHORIZATION_METADATA_KEY, test_oauth2_bearer_token}, @@ -427,6 +439,7 @@ static void check_ssl_oauth2_google_iam_composite_metadata( } static void test_ssl_oauth2_google_iam_composite_creds(void) { + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_credentials *ssl_creds = grpc_ssl_credentials_create(test_root_cert, NULL, NULL); const grpc_credentials_array *creds_array; @@ -457,12 +470,16 @@ static void test_ssl_oauth2_google_iam_composite_creds(void) { GRPC_CREDENTIALS_TYPE_IAM) == 0); grpc_credentials_get_request_metadata( composite_creds, NULL, test_service_url, - check_ssl_oauth2_google_iam_composite_metadata, composite_creds); + check_ssl_oauth2_google_iam_composite_metadata, composite_creds, + &call_list); + grpc_call_list_run(&call_list); } -static void on_oauth2_creds_get_metadata_success( - void *user_data, grpc_credentials_md *md_elems, size_t num_md, - grpc_credentials_status status) { +static void on_oauth2_creds_get_metadata_success(void *user_data, + grpc_credentials_md *md_elems, + size_t num_md, + grpc_credentials_status status, + grpc_call_list *call_list) { GPR_ASSERT(status == GRPC_CREDENTIALS_OK); GPR_ASSERT(num_md == 1); GPR_ASSERT(gpr_slice_str_cmp(md_elems[0].key, "Authorization") == 0); @@ -473,9 +490,11 @@ static void on_oauth2_creds_get_metadata_success( GPR_ASSERT(strcmp((const char *)user_data, test_user_data) == 0); } -static void on_oauth2_creds_get_metadata_failure( - void *user_data, grpc_credentials_md *md_elems, size_t num_md, - grpc_credentials_status status) { +static void on_oauth2_creds_get_metadata_failure(void *user_data, + grpc_credentials_md *md_elems, + size_t num_md, + grpc_credentials_status status, + grpc_call_list *call_list) { GPR_ASSERT(status == GRPC_CREDENTIALS_ERROR); GPR_ASSERT(num_md == 0); GPR_ASSERT(user_data != NULL); @@ -497,39 +516,44 @@ static void validate_compute_engine_http_request( static int compute_engine_httpcli_get_success_override( const grpc_httpcli_request *request, gpr_timespec deadline, - grpc_httpcli_response_cb on_response, void *user_data) { + grpc_httpcli_response_cb on_response, void *user_data, + grpc_call_list *call_list) { grpc_httpcli_response response = http_response(200, valid_oauth2_json_response); validate_compute_engine_http_request(request); - on_response(user_data, &response); + on_response(user_data, &response, call_list); return 1; } static int compute_engine_httpcli_get_failure_override( const grpc_httpcli_request *request, gpr_timespec deadline, - grpc_httpcli_response_cb on_response, void *user_data) { + grpc_httpcli_response_cb on_response, void *user_data, + grpc_call_list *call_list) { grpc_httpcli_response response = http_response(403, "Not Authorized."); validate_compute_engine_http_request(request); - on_response(user_data, &response); + on_response(user_data, &response, call_list); return 1; } static int httpcli_post_should_not_be_called( const grpc_httpcli_request *request, const char *body_bytes, size_t body_size, gpr_timespec deadline, - grpc_httpcli_response_cb on_response, void *user_data) { + grpc_httpcli_response_cb on_response, void *user_data, + grpc_call_list *call_list) { GPR_ASSERT("HTTP POST should not be called" == NULL); return 1; } static int httpcli_get_should_not_be_called( const grpc_httpcli_request *request, gpr_timespec deadline, - grpc_httpcli_response_cb on_response, void *user_data) { + grpc_httpcli_response_cb on_response, void *user_data, + grpc_call_list *call_list) { GPR_ASSERT("HTTP GET should not be called" == NULL); return 1; } static void test_compute_engine_creds_success(void) { + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_credentials *compute_engine_creds = grpc_google_compute_engine_credentials_create(NULL); GPR_ASSERT(grpc_credentials_has_request_metadata(compute_engine_creds)); @@ -540,20 +564,23 @@ static void test_compute_engine_creds_success(void) { httpcli_post_should_not_be_called); grpc_credentials_get_request_metadata( compute_engine_creds, NULL, test_service_url, - on_oauth2_creds_get_metadata_success, (void *)test_user_data); + on_oauth2_creds_get_metadata_success, (void *)test_user_data, &call_list); + grpc_call_list_run(&call_list); /* Second request: the cached token should be served directly. */ grpc_httpcli_set_override(httpcli_get_should_not_be_called, httpcli_post_should_not_be_called); grpc_credentials_get_request_metadata( compute_engine_creds, NULL, test_service_url, - on_oauth2_creds_get_metadata_success, (void *)test_user_data); + on_oauth2_creds_get_metadata_success, (void *)test_user_data, &call_list); + grpc_call_list_run(&call_list); grpc_credentials_unref(compute_engine_creds); grpc_httpcli_set_override(NULL, NULL); } static void test_compute_engine_creds_failure(void) { + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_credentials *compute_engine_creds = grpc_google_compute_engine_credentials_create(NULL); grpc_httpcli_set_override(compute_engine_httpcli_get_failure_override, @@ -562,9 +589,10 @@ static void test_compute_engine_creds_failure(void) { GPR_ASSERT(grpc_credentials_has_request_metadata_only(compute_engine_creds)); grpc_credentials_get_request_metadata( compute_engine_creds, NULL, test_service_url, - on_oauth2_creds_get_metadata_failure, (void *)test_user_data); + on_oauth2_creds_get_metadata_failure, (void *)test_user_data, &call_list); grpc_credentials_unref(compute_engine_creds); grpc_httpcli_set_override(NULL, NULL); + grpc_call_list_run(&call_list); } static void validate_refresh_token_http_request( @@ -592,25 +620,26 @@ static void validate_refresh_token_http_request( static int refresh_token_httpcli_post_success( const grpc_httpcli_request *request, const char *body, size_t body_size, gpr_timespec deadline, grpc_httpcli_response_cb on_response, - void *user_data) { + void *user_data, grpc_call_list *call_list) { grpc_httpcli_response response = http_response(200, valid_oauth2_json_response); validate_refresh_token_http_request(request, body, body_size); - on_response(user_data, &response); + on_response(user_data, &response, call_list); return 1; } static int refresh_token_httpcli_post_failure( const grpc_httpcli_request *request, const char *body, size_t body_size, gpr_timespec deadline, grpc_httpcli_response_cb on_response, - void *user_data) { + void *user_data, grpc_call_list *call_list) { grpc_httpcli_response response = http_response(403, "Not Authorized."); validate_refresh_token_http_request(request, body, body_size); - on_response(user_data, &response); + on_response(user_data, &response, call_list); return 1; } static void test_refresh_token_creds_success(void) { + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_credentials *refresh_token_creds = grpc_google_refresh_token_credentials_create(test_refresh_token_str, NULL); @@ -622,20 +651,24 @@ static void test_refresh_token_creds_success(void) { refresh_token_httpcli_post_success); grpc_credentials_get_request_metadata( refresh_token_creds, NULL, test_service_url, - on_oauth2_creds_get_metadata_success, (void *)test_user_data); + on_oauth2_creds_get_metadata_success, (void *)test_user_data, &call_list); + grpc_call_list_run(&call_list); /* Second request: the cached token should be served directly. */ grpc_httpcli_set_override(httpcli_get_should_not_be_called, httpcli_post_should_not_be_called); grpc_credentials_get_request_metadata( refresh_token_creds, NULL, test_service_url, - on_oauth2_creds_get_metadata_success, (void *)test_user_data); + on_oauth2_creds_get_metadata_success, (void *)test_user_data, &call_list); + grpc_call_list_run(&call_list); grpc_credentials_unref(refresh_token_creds); grpc_httpcli_set_override(NULL, NULL); + grpc_call_list_run(&call_list); } static void test_refresh_token_creds_failure(void) { + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_credentials *refresh_token_creds = grpc_google_refresh_token_credentials_create(test_refresh_token_str, NULL); @@ -645,9 +678,10 @@ static void test_refresh_token_creds_failure(void) { GPR_ASSERT(grpc_credentials_has_request_metadata_only(refresh_token_creds)); grpc_credentials_get_request_metadata( refresh_token_creds, NULL, test_service_url, - on_oauth2_creds_get_metadata_failure, (void *)test_user_data); + on_oauth2_creds_get_metadata_failure, (void *)test_user_data, &call_list); grpc_credentials_unref(refresh_token_creds); grpc_httpcli_set_override(NULL, NULL); + grpc_call_list_run(&call_list); } static void validate_jwt_encode_and_sign_params( @@ -698,7 +732,8 @@ static char *encode_and_sign_jwt_should_not_be_called( static void on_jwt_creds_get_metadata_success(void *user_data, grpc_credentials_md *md_elems, size_t num_md, - grpc_credentials_status status) { + grpc_credentials_status status, + grpc_call_list *call_list) { char *expected_md_value; gpr_asprintf(&expected_md_value, "Bearer %s", test_signed_jwt); GPR_ASSERT(status == GRPC_CREDENTIALS_OK); @@ -713,7 +748,8 @@ static void on_jwt_creds_get_metadata_success(void *user_data, static void on_jwt_creds_get_metadata_failure(void *user_data, grpc_credentials_md *md_elems, size_t num_md, - grpc_credentials_status status) { + grpc_credentials_status status, + grpc_call_list *call_list) { GPR_ASSERT(status == GRPC_CREDENTIALS_ERROR); GPR_ASSERT(num_md == 0); GPR_ASSERT(user_data != NULL); @@ -722,6 +758,7 @@ static void on_jwt_creds_get_metadata_failure(void *user_data, static void test_jwt_creds_success(void) { char *json_key_string = test_json_key_str(); + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_credentials *jwt_creds = grpc_service_account_jwt_access_credentials_create( json_key_string, grpc_max_auth_token_lifetime, NULL); @@ -732,21 +769,24 @@ static void test_jwt_creds_success(void) { grpc_jwt_encode_and_sign_set_override(encode_and_sign_jwt_success); grpc_credentials_get_request_metadata(jwt_creds, NULL, test_service_url, on_jwt_creds_get_metadata_success, - (void *)test_user_data); + (void *)test_user_data, &call_list); + grpc_call_list_run(&call_list); /* Second request: the cached token should be served directly. */ grpc_jwt_encode_and_sign_set_override( encode_and_sign_jwt_should_not_be_called); grpc_credentials_get_request_metadata(jwt_creds, NULL, test_service_url, on_jwt_creds_get_metadata_success, - (void *)test_user_data); + (void *)test_user_data, &call_list); + grpc_call_list_run(&call_list); /* Third request: Different service url so jwt_encode_and_sign should be called again (no caching). */ grpc_jwt_encode_and_sign_set_override(encode_and_sign_jwt_success); grpc_credentials_get_request_metadata(jwt_creds, NULL, other_test_service_url, on_jwt_creds_get_metadata_success, - (void *)test_user_data); + (void *)test_user_data, &call_list); + grpc_call_list_run(&call_list); gpr_free(json_key_string); grpc_credentials_unref(jwt_creds); @@ -755,6 +795,7 @@ static void test_jwt_creds_success(void) { static void test_jwt_creds_signing_failure(void) { char *json_key_string = test_json_key_str(); + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_credentials *jwt_creds = grpc_service_account_jwt_access_credentials_create( json_key_string, grpc_max_auth_token_lifetime, NULL); @@ -764,11 +805,12 @@ static void test_jwt_creds_signing_failure(void) { grpc_jwt_encode_and_sign_set_override(encode_and_sign_jwt_failure); grpc_credentials_get_request_metadata(jwt_creds, NULL, test_service_url, on_jwt_creds_get_metadata_failure, - (void *)test_user_data); + (void *)test_user_data, &call_list); gpr_free(json_key_string); grpc_credentials_unref(jwt_creds); grpc_jwt_encode_and_sign_set_override(NULL); + grpc_call_list_run(&call_list); } static void set_google_default_creds_env_var_with_file_contents( diff --git a/test/core/security/jwt_verifier_test.c b/test/core/security/jwt_verifier_test.c index 5cc8b2e9be..00a079f54e 100644 --- a/test/core/security/jwt_verifier_test.c +++ b/test/core/security/jwt_verifier_test.c @@ -276,14 +276,16 @@ static grpc_httpcli_response http_response(int status, char *body) { static int httpcli_post_should_not_be_called( const grpc_httpcli_request *request, const char *body_bytes, size_t body_size, gpr_timespec deadline, - grpc_httpcli_response_cb on_response, void *user_data) { + grpc_httpcli_response_cb on_response, void *user_data, + grpc_call_list *call_list) { GPR_ASSERT("HTTP POST should not be called" == NULL); return 1; } static int httpcli_get_google_keys_for_email( const grpc_httpcli_request *request, gpr_timespec deadline, - grpc_httpcli_response_cb on_response, void *user_data) { + grpc_httpcli_response_cb on_response, void *user_data, + grpc_call_list *call_list) { grpc_httpcli_response response = http_response(200, good_google_email_keys()); GPR_ASSERT(request->handshaker == &grpc_httpcli_ssl); GPR_ASSERT(strcmp(request->host, "www.googleapis.com") == 0); @@ -291,7 +293,7 @@ static int httpcli_get_google_keys_for_email( "/robot/v1/metadata/x509/" "777-abaslkan11hlb6nmim3bpspl31ud@developer." "gserviceaccount.com") == 0); - on_response(user_data, &response); + on_response(user_data, &response, call_list); gpr_free(response.body); return 1; } @@ -307,6 +309,7 @@ static void on_verification_success(void *user_data, } static void test_jwt_verifier_google_email_issuer_success(void) { + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_jwt_verifier *verifier = grpc_jwt_verifier_create(NULL, 0); char *jwt = NULL; char *key_str = json_key_str(json_key_str_part3_for_google_email_issuer); @@ -320,25 +323,29 @@ static void test_jwt_verifier_google_email_issuer_success(void) { grpc_auth_json_key_destruct(&key); GPR_ASSERT(jwt != NULL); grpc_jwt_verifier_verify(verifier, NULL, jwt, expected_audience, - on_verification_success, (void *)expected_user_data); + on_verification_success, (void *)expected_user_data, + &call_list); gpr_free(jwt); grpc_jwt_verifier_destroy(verifier); grpc_httpcli_set_override(NULL, NULL); + grpc_call_list_run(&call_list); } static int httpcli_get_custom_keys_for_email( const grpc_httpcli_request *request, gpr_timespec deadline, - grpc_httpcli_response_cb on_response, void *user_data) { + grpc_httpcli_response_cb on_response, void *user_data, + grpc_call_list *call_list) { grpc_httpcli_response response = http_response(200, gpr_strdup(good_jwk_set)); GPR_ASSERT(request->handshaker == &grpc_httpcli_ssl); GPR_ASSERT(strcmp(request->host, "keys.bar.com") == 0); GPR_ASSERT(strcmp(request->path, "/jwk/foo@bar.com") == 0); - on_response(user_data, &response); + on_response(user_data, &response, call_list); gpr_free(response.body); return 1; } static void test_jwt_verifier_custom_email_issuer_success(void) { + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_jwt_verifier *verifier = grpc_jwt_verifier_create(&custom_mapping, 1); char *jwt = NULL; char *key_str = json_key_str(json_key_str_part3_for_custom_email_issuer); @@ -352,21 +359,23 @@ static void test_jwt_verifier_custom_email_issuer_success(void) { grpc_auth_json_key_destruct(&key); GPR_ASSERT(jwt != NULL); grpc_jwt_verifier_verify(verifier, NULL, jwt, expected_audience, - on_verification_success, (void *)expected_user_data); + on_verification_success, (void *)expected_user_data, + &call_list); gpr_free(jwt); grpc_jwt_verifier_destroy(verifier); grpc_httpcli_set_override(NULL, NULL); + grpc_call_list_run(&call_list); } static int httpcli_get_jwk_set(const grpc_httpcli_request *request, gpr_timespec deadline, grpc_httpcli_response_cb on_response, - void *user_data) { + void *user_data, grpc_call_list *call_list) { grpc_httpcli_response response = http_response(200, gpr_strdup(good_jwk_set)); GPR_ASSERT(request->handshaker == &grpc_httpcli_ssl); GPR_ASSERT(strcmp(request->host, "www.googleapis.com") == 0); GPR_ASSERT(strcmp(request->path, "/oauth2/v3/certs") == 0); - on_response(user_data, &response); + on_response(user_data, &response, call_list); gpr_free(response.body); return 1; } @@ -374,7 +383,8 @@ static int httpcli_get_jwk_set(const grpc_httpcli_request *request, static int httpcli_get_openid_config(const grpc_httpcli_request *request, gpr_timespec deadline, grpc_httpcli_response_cb on_response, - void *user_data) { + void *user_data, + grpc_call_list *call_list) { grpc_httpcli_response response = http_response(200, gpr_strdup(good_openid_config)); GPR_ASSERT(request->handshaker == &grpc_httpcli_ssl); @@ -382,12 +392,13 @@ static int httpcli_get_openid_config(const grpc_httpcli_request *request, GPR_ASSERT(strcmp(request->path, GRPC_OPENID_CONFIG_URL_SUFFIX) == 0); grpc_httpcli_set_override(httpcli_get_jwk_set, httpcli_post_should_not_be_called); - on_response(user_data, &response); + on_response(user_data, &response, call_list); gpr_free(response.body); return 1; } static void test_jwt_verifier_url_issuer_success(void) { + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_jwt_verifier *verifier = grpc_jwt_verifier_create(NULL, 0); char *jwt = NULL; char *key_str = json_key_str(json_key_str_part3_for_url_issuer); @@ -401,10 +412,12 @@ static void test_jwt_verifier_url_issuer_success(void) { grpc_auth_json_key_destruct(&key); GPR_ASSERT(jwt != NULL); grpc_jwt_verifier_verify(verifier, NULL, jwt, expected_audience, - on_verification_success, (void *)expected_user_data); + on_verification_success, (void *)expected_user_data, + &call_list); gpr_free(jwt); grpc_jwt_verifier_destroy(verifier); grpc_httpcli_set_override(NULL, NULL); + grpc_call_list_run(&call_list); } static void on_verification_key_retrieval_error(void *user_data, @@ -418,16 +431,17 @@ static void on_verification_key_retrieval_error(void *user_data, static int httpcli_get_bad_json(const grpc_httpcli_request *request, gpr_timespec deadline, grpc_httpcli_response_cb on_response, - void *user_data) { + void *user_data, grpc_call_list *call_list) { grpc_httpcli_response response = http_response(200, gpr_strdup("{\"bad\": \"stuff\"}")); GPR_ASSERT(request->handshaker == &grpc_httpcli_ssl); - on_response(user_data, &response); + on_response(user_data, &response, call_list); gpr_free(response.body); return 1; } static void test_jwt_verifier_url_issuer_bad_config(void) { + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_jwt_verifier *verifier = grpc_jwt_verifier_create(NULL, 0); char *jwt = NULL; char *key_str = json_key_str(json_key_str_part3_for_url_issuer); @@ -442,13 +456,15 @@ static void test_jwt_verifier_url_issuer_bad_config(void) { GPR_ASSERT(jwt != NULL); grpc_jwt_verifier_verify(verifier, NULL, jwt, expected_audience, on_verification_key_retrieval_error, - (void *)expected_user_data); + (void *)expected_user_data, &call_list); gpr_free(jwt); grpc_jwt_verifier_destroy(verifier); grpc_httpcli_set_override(NULL, NULL); + grpc_call_list_run(&call_list); } static void test_jwt_verifier_bad_json_key(void) { + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_jwt_verifier *verifier = grpc_jwt_verifier_create(NULL, 0); char *jwt = NULL; char *key_str = json_key_str(json_key_str_part3_for_google_email_issuer); @@ -463,10 +479,11 @@ static void test_jwt_verifier_bad_json_key(void) { GPR_ASSERT(jwt != NULL); grpc_jwt_verifier_verify(verifier, NULL, jwt, expected_audience, on_verification_key_retrieval_error, - (void *)expected_user_data); + (void *)expected_user_data, &call_list); gpr_free(jwt); grpc_jwt_verifier_destroy(verifier); grpc_httpcli_set_override(NULL, NULL); + grpc_call_list_run(&call_list); } static void corrupt_jwt_sig(char *jwt) { @@ -495,6 +512,7 @@ static void on_verification_bad_signature(void *user_data, } static void test_jwt_verifier_bad_signature(void) { + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_jwt_verifier *verifier = grpc_jwt_verifier_create(NULL, 0); char *jwt = NULL; char *key_str = json_key_str(json_key_str_part3_for_url_issuer); @@ -510,15 +528,17 @@ static void test_jwt_verifier_bad_signature(void) { GPR_ASSERT(jwt != NULL); grpc_jwt_verifier_verify(verifier, NULL, jwt, expected_audience, on_verification_bad_signature, - (void *)expected_user_data); + (void *)expected_user_data, &call_list); gpr_free(jwt); grpc_jwt_verifier_destroy(verifier); grpc_httpcli_set_override(NULL, NULL); + grpc_call_list_run(&call_list); } static int httpcli_get_should_not_be_called( const grpc_httpcli_request *request, gpr_timespec deadline, - grpc_httpcli_response_cb on_response, void *user_data) { + grpc_httpcli_response_cb on_response, void *user_data, + grpc_call_list *call_list) { GPR_ASSERT(0); return 1; } @@ -532,14 +552,16 @@ static void on_verification_bad_format(void *user_data, } static void test_jwt_verifier_bad_format(void) { + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_jwt_verifier *verifier = grpc_jwt_verifier_create(NULL, 0); grpc_httpcli_set_override(httpcli_get_should_not_be_called, httpcli_post_should_not_be_called); grpc_jwt_verifier_verify(verifier, NULL, "bad jwt", expected_audience, on_verification_bad_format, - (void *)expected_user_data); + (void *)expected_user_data, &call_list); grpc_jwt_verifier_destroy(verifier); grpc_httpcli_set_override(NULL, NULL); + grpc_call_list_run(&call_list); } /* find verification key: bad jks, cannot find key in jks */ diff --git a/test/core/security/oauth2_utils.c b/test/core/security/oauth2_utils.c index 45beb05c1c..fad52d0c59 100644 --- a/test/core/security/oauth2_utils.c +++ b/test/core/security/oauth2_utils.c @@ -93,7 +93,7 @@ char *grpc_test_fetch_oauth2_token_with_credentials(grpc_credentials *creds) { while (!request.is_done) { grpc_pollset_worker worker; grpc_pollset_work(&request.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), - gpr_inf_future(GPR_CLOCK_MONOTONIC)); + gpr_inf_future(GPR_CLOCK_MONOTONIC), &call_list); } gpr_mu_unlock(GRPC_POLLSET_MU(&request.pollset)); diff --git a/test/core/security/secure_endpoint_test.c b/test/core/security/secure_endpoint_test.c index e0bdea527a..f7948b56af 100644 --- a/test/core/security/secure_endpoint_test.c +++ b/test/core/security/secure_endpoint_test.c @@ -46,18 +46,18 @@ #include "src/core/tsi/fake_transport_security.h" static grpc_pollset g_pollset; -static grpc_workqueue *g_workqueue; static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair( size_t slice_size, gpr_slice *leftover_slices, size_t leftover_nslices) { + grpc_call_list call_list = GRPC_CALL_LIST_INIT; tsi_frame_protector *fake_read_protector = tsi_create_fake_protector(NULL); tsi_frame_protector *fake_write_protector = tsi_create_fake_protector(NULL); grpc_endpoint_test_fixture f; grpc_endpoint_pair tcp; - tcp = grpc_iomgr_create_endpoint_pair("fixture", slice_size, g_workqueue); - grpc_endpoint_add_to_pollset(tcp.client, &g_pollset); - grpc_endpoint_add_to_pollset(tcp.server, &g_pollset); + tcp = grpc_iomgr_create_endpoint_pair("fixture", slice_size); + grpc_endpoint_add_to_pollset(tcp.client, &g_pollset, &call_list); + grpc_endpoint_add_to_pollset(tcp.server, &g_pollset, &call_list); if (leftover_nslices == 0) { f.client_ep = @@ -110,6 +110,7 @@ static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair( f.server_ep = grpc_secure_endpoint_create(fake_write_protector, tcp.server, NULL, 0); + grpc_call_list_run(&call_list); return f; } @@ -137,41 +138,56 @@ static grpc_endpoint_test_config configs[] = { secure_endpoint_create_fixture_tcp_socketpair_leftover, clean_up}, }; +static void inc_call_ctr(void *arg, int success, grpc_call_list *call_list) { + ++*(int *)arg; + ; +} + static void test_leftover(grpc_endpoint_test_config config, size_t slice_size) { grpc_endpoint_test_fixture f = config.create_fixture(slice_size); gpr_slice_buffer incoming; gpr_slice s = gpr_slice_from_copied_string("hello world 12345678900987654321"); + grpc_call_list call_list = GRPC_CALL_LIST_INIT; + int n = 0; + grpc_closure done_closure; gpr_log(GPR_INFO, "Start test left over"); gpr_slice_buffer_init(&incoming); - GPR_ASSERT(grpc_endpoint_read(f.client_ep, &incoming, NULL) == - GRPC_ENDPOINT_DONE); + grpc_closure_init(&done_closure, inc_call_ctr, &n); + grpc_endpoint_read(f.client_ep, &incoming, NULL, &call_list); + grpc_call_list_run(&call_list); + GPR_ASSERT(n == 1); GPR_ASSERT(incoming.count == 1); GPR_ASSERT(0 == gpr_slice_cmp(s, incoming.slices[0])); - grpc_endpoint_shutdown(f.client_ep); - grpc_endpoint_shutdown(f.server_ep); - grpc_endpoint_destroy(f.client_ep); - grpc_endpoint_destroy(f.server_ep); + grpc_endpoint_shutdown(f.client_ep, &call_list); + grpc_endpoint_shutdown(f.server_ep, &call_list); + grpc_endpoint_destroy(f.client_ep, &call_list); + grpc_endpoint_destroy(f.server_ep, &call_list); + grpc_call_list_run(&call_list); gpr_slice_unref(s); gpr_slice_buffer_destroy(&incoming); clean_up(); } -static void destroy_pollset(void *p) { grpc_pollset_destroy(p); } +static void destroy_pollset(void *p, int success, grpc_call_list *call_list) { + grpc_pollset_destroy(p); +} int main(int argc, char **argv) { + grpc_closure destroyed; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_test_init(argc, argv); grpc_init(); - g_workqueue = grpc_workqueue_create(); grpc_pollset_init(&g_pollset); grpc_endpoint_tests(configs[0], &g_pollset); test_leftover(configs[1], 1); - GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy"); - grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset); + grpc_closure_init(&destroyed, destroy_pollset, &g_pollset); + grpc_pollset_shutdown(&g_pollset, &destroyed, &call_list); + grpc_call_list_run(&call_list); grpc_shutdown(); return 0; diff --git a/test/core/surface/completion_queue_test.c b/test/core/surface/completion_queue_test.c index 0eeb5dac45..06bab3279e 100644 --- a/test/core/surface/completion_queue_test.c +++ b/test/core/surface/completion_queue_test.c @@ -75,12 +75,14 @@ static void test_wait_empty(void) { shutdown_and_destroy(cc); } -static void do_nothing_end_completion(void *arg, grpc_cq_completion *c) {} +static void do_nothing_end_completion(void *arg, grpc_cq_completion *c, + grpc_call_list *call_list) {} static void test_cq_end_op(void) { grpc_event ev; grpc_completion_queue *cc; grpc_cq_completion completion; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; void *tag = create_test_tag(); LOG_TEST("test_cq_end_op"); @@ -88,7 +90,8 @@ static void test_cq_end_op(void) { cc = grpc_completion_queue_create(NULL); grpc_cq_begin_op(cc); - grpc_cq_end_op(cc, tag, 1, do_nothing_end_completion, NULL, &completion); + grpc_cq_end_op(cc, tag, 1, do_nothing_end_completion, NULL, &completion, + &call_list); ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), NULL); GPR_ASSERT(ev.type == GRPC_OP_COMPLETE); @@ -96,6 +99,7 @@ static void test_cq_end_op(void) { GPR_ASSERT(ev.success); shutdown_and_destroy(cc); + GPR_ASSERT(grpc_call_list_empty(call_list)); } static void test_shutdown_then_next_polling(void) { @@ -129,6 +133,7 @@ static void test_pluck(void) { grpc_completion_queue *cc; void *tags[128]; grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)]; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; unsigned i, j; LOG_TEST("test_pluck"); @@ -145,7 +150,7 @@ static void test_pluck(void) { for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { grpc_cq_begin_op(cc); grpc_cq_end_op(cc, tags[i], 1, do_nothing_end_completion, NULL, - &completions[i]); + &completions[i], &call_list); } for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { @@ -157,7 +162,7 @@ static void test_pluck(void) { for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { grpc_cq_begin_op(cc); grpc_cq_end_op(cc, tags[i], 1, do_nothing_end_completion, NULL, - &completions[i]); + &completions[i], &call_list); } for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) { @@ -167,6 +172,7 @@ static void test_pluck(void) { } shutdown_and_destroy(cc); + GPR_ASSERT(grpc_call_list_empty(call_list)); } #define TEST_THREAD_EVENTS 10000 @@ -186,13 +192,15 @@ gpr_timespec ten_seconds_time(void) { return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10); } -static void free_completion(void *arg, grpc_cq_completion *completion) { +static void free_completion(void *arg, grpc_cq_completion *completion, + grpc_call_list *call_list) { gpr_free(completion); } static void producer_thread(void *arg) { test_thread_options *opt = arg; int i; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; gpr_log(GPR_INFO, "producer %d started", opt->id); gpr_event_set(&opt->on_started, (void *)(gpr_intptr)1); @@ -210,12 +218,13 @@ static void producer_thread(void *arg) { gpr_log(GPR_INFO, "producer %d phase 2", opt->id); for (i = 0; i < TEST_THREAD_EVENTS; i++) { grpc_cq_end_op(opt->cc, (void *)(gpr_intptr)1, 1, free_completion, NULL, - gpr_malloc(sizeof(grpc_cq_completion))); + gpr_malloc(sizeof(grpc_cq_completion)), &call_list); opt->events_triggered++; } gpr_log(GPR_INFO, "producer %d phase 2 done", opt->id); gpr_event_set(&opt->on_finished, (void *)(gpr_intptr)1); + GPR_ASSERT(grpc_call_list_empty(call_list)); } static void consumer_thread(void *arg) { diff --git a/test/core/util/port_posix.c b/test/core/util/port_posix.c index fa3798aa3c..98b5686e91 100644 --- a/test/core/util/port_posix.c +++ b/test/core/util/port_posix.c @@ -116,7 +116,7 @@ static void free_port_using_server(char *server, int port) { while (!pr.done) { grpc_pollset_worker worker; grpc_pollset_work(&pr.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), - GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1)); + GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1), &call_list); } gpr_mu_unlock(GRPC_POLLSET_MU(&pr.pollset)); @@ -272,7 +272,7 @@ static int pick_port_using_server(char *server) { while (pr.port == -1) { grpc_pollset_worker worker; grpc_pollset_work(&pr.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), - GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1)); + GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1), &call_list); } gpr_mu_unlock(GRPC_POLLSET_MU(&pr.pollset)); diff --git a/test/core/util/reconnect_server.c b/test/core/util/reconnect_server.c index b4eb7ed916..eeb3ecb55a 100644 --- a/test/core/util/reconnect_server.c +++ b/test/core/util/reconnect_server.c @@ -138,10 +138,12 @@ void reconnect_server_poll(reconnect_server *server, int seconds) { gpr_timespec deadline = gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), gpr_time_from_seconds(seconds, GPR_TIMESPAN)); + grpc_call_list call_list = GRPC_CALL_LIST_INIT; gpr_mu_lock(GRPC_POLLSET_MU(&server->pollset)); grpc_pollset_work(&server->pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), - deadline); + deadline, &call_list); gpr_mu_unlock(GRPC_POLLSET_MU(&server->pollset)); + grpc_call_list_run(&call_list); } void reconnect_server_clear_timestamps(reconnect_server *server) { |