diff options
author | Craig Tiller <ctiller@google.com> | 2015-09-18 17:29:00 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-09-18 17:29:00 -0700 |
commit | d1bec03fa148344b8eac2b59517252d86e4ca858 (patch) | |
tree | f359e48f9151ab7ceff72cd624ad6c7a59e4d304 /test | |
parent | 33825118df7157219cec15382beb006d3462ad96 (diff) |
Call list progress
Diffstat (limited to 'test')
-rw-r--r-- | test/core/bad_client/bad_client.c | 40 | ||||
-rw-r--r-- | test/core/end2end/fixtures/h2_sockpair+trace.c | 17 | ||||
-rw-r--r-- | test/core/end2end/fixtures/h2_sockpair.c | 17 | ||||
-rw-r--r-- | test/core/end2end/fixtures/h2_sockpair_1byte.c | 17 | ||||
-rw-r--r-- | test/core/iomgr/endpoint_tests.c | 58 | ||||
-rw-r--r-- | test/core/security/oauth2_utils.c | 18 | ||||
-rw-r--r-- | test/core/util/port_posix.c | 35 | ||||
-rw-r--r-- | test/core/util/reconnect_server.c | 24 |
8 files changed, 120 insertions, 106 deletions
diff --git a/test/core/bad_client/bad_client.c b/test/core/bad_client/bad_client.c index b9c0c98f69..e89d5b3d39 100644 --- a/test/core/bad_client/bad_client.c +++ b/test/core/bad_client/bad_client.c @@ -59,7 +59,7 @@ static void thd_func(void *arg) { gpr_event_set(&a->done_thd, (void *)1); } -static void done_write(void *arg, int success) { +static void done_write(void *arg, int success, grpc_call_list *call_list) { thd_args *a = arg; gpr_event_set(&a->done_write, (void *)1); } @@ -89,6 +89,7 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator, gpr_slice_buffer outgoing; grpc_closure done_write_closure; grpc_workqueue *workqueue; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; hex = gpr_dump(client_payload, client_payload_length, GPR_DUMP_HEX | GPR_DUMP_ASCII); @@ -101,10 +102,11 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator, /* Init grpc */ grpc_init(); - workqueue = grpc_workqueue_create(); + workqueue = grpc_workqueue_create(&call_list); + grpc_call_list_run(&call_list); /* Create endpoints */ - sfd = grpc_iomgr_create_endpoint_pair("fixture", 65536, workqueue); + sfd = grpc_iomgr_create_endpoint_pair("fixture", 65536); /* Create server, completion events */ a.server = grpc_server_create_from_filters(NULL, 0, NULL); @@ -114,14 +116,13 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator, a.validator = validator; grpc_server_register_completion_queue(a.server, a.cq, NULL); grpc_server_start(a.server); - transport = - grpc_create_chttp2_transport(NULL, sfd.server, mdctx, workqueue, 0); + transport = grpc_create_chttp2_transport(NULL, sfd.server, mdctx, 0); server_setup_transport(&a, transport, mdctx, workqueue); grpc_chttp2_transport_start_reading(transport, NULL, 0); /* Bind everything into the same pollset */ - grpc_endpoint_add_to_pollset(sfd.client, grpc_cq_pollset(a.cq)); - grpc_endpoint_add_to_pollset(sfd.server, grpc_cq_pollset(a.cq)); + grpc_endpoint_add_to_pollset(sfd.client, grpc_cq_pollset(a.cq), &call_list); + grpc_endpoint_add_to_pollset(sfd.server, grpc_cq_pollset(a.cq), &call_list); /* Check a ground truth */ GPR_ASSERT(grpc_server_has_open_connections(a.server)); @@ -134,24 +135,17 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator, grpc_closure_init(&done_write_closure, done_write, &a); /* Write data */ - switch (grpc_endpoint_write(sfd.client, &outgoing, &done_write_closure)) { - case GRPC_ENDPOINT_DONE: - done_write(&a, 1); - break; - case GRPC_ENDPOINT_PENDING: - break; - case GRPC_ENDPOINT_ERROR: - done_write(&a, 0); - break; - } + grpc_endpoint_write(sfd.client, &outgoing, &done_write_closure, &call_list); + grpc_call_list_run(&call_list); /* Await completion */ GPR_ASSERT( gpr_event_wait(&a.done_write, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5))); if (flags & GRPC_BAD_CLIENT_DISCONNECT) { - grpc_endpoint_shutdown(sfd.client); - grpc_endpoint_destroy(sfd.client); + grpc_endpoint_shutdown(sfd.client, &call_list); + grpc_endpoint_destroy(sfd.client, &call_list); + grpc_call_list_run(&call_list); sfd.client = NULL; } @@ -159,8 +153,9 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator, /* Shutdown */ if (sfd.client) { - grpc_endpoint_shutdown(sfd.client); - grpc_endpoint_destroy(sfd.client); + grpc_endpoint_shutdown(sfd.client, &call_list); + grpc_endpoint_destroy(sfd.client, &call_list); + grpc_call_list_run(&call_list); } grpc_server_shutdown_and_notify(a.server, a.cq, NULL); GPR_ASSERT(grpc_completion_queue_pluck( @@ -170,6 +165,7 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator, grpc_completion_queue_destroy(a.cq); gpr_slice_buffer_destroy(&outgoing); - GRPC_WORKQUEUE_UNREF(workqueue, "destroy"); + GRPC_WORKQUEUE_UNREF(workqueue, "destroy", &call_list); + grpc_call_list_run(&call_list); grpc_shutdown(); } diff --git a/test/core/end2end/fixtures/h2_sockpair+trace.c b/test/core/end2end/fixtures/h2_sockpair+trace.c index 210aa68fe1..04dbf48522 100644 --- a/test/core/end2end/fixtures/h2_sockpair+trace.c +++ b/test/core/end2end/fixtures/h2_sockpair+trace.c @@ -101,7 +101,7 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair( f.fixture_data = sfd; f.cq = grpc_completion_queue_create(NULL); - *sfd = grpc_iomgr_create_endpoint_pair("fixture", 65536, g_workqueue); + *sfd = grpc_iomgr_create_endpoint_pair("fixture", 65536); return f; } @@ -114,8 +114,7 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture *f, sp_client_setup cs; cs.client_args = client_args; cs.f = f; - transport = grpc_create_chttp2_transport(client_args, sfd->client, mdctx, - g_workqueue, 1); + transport = grpc_create_chttp2_transport(client_args, sfd->client, mdctx, 1); client_setup_transport(&cs, transport, mdctx); GPR_ASSERT(f->client); grpc_chttp2_transport_start_reading(transport, NULL, 0); @@ -130,8 +129,7 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f, f->server = grpc_server_create_from_filters(NULL, 0, server_args); grpc_server_register_completion_queue(f->server, f->cq, NULL); grpc_server_start(f->server); - transport = grpc_create_chttp2_transport(server_args, sfd->server, mdctx, - g_workqueue, 0); + transport = grpc_create_chttp2_transport(server_args, sfd->server, mdctx, 0); server_setup_transport(f, transport, mdctx); grpc_chttp2_transport_start_reading(transport, NULL, 0); } @@ -149,6 +147,7 @@ static grpc_end2end_test_config configs[] = { int main(int argc, char **argv) { size_t i; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; /* force tracing on, with a value to force many code paths in trace.c to be taken */ @@ -161,7 +160,8 @@ int main(int argc, char **argv) { grpc_test_init(argc, argv); grpc_init(); - g_workqueue = grpc_workqueue_create(); + g_workqueue = grpc_workqueue_create(&call_list); + grpc_call_list_run(&call_list); GPR_ASSERT(0 == grpc_tracer_set_enabled("also-doesnt-exist", 0)); GPR_ASSERT(1 == grpc_tracer_set_enabled("http", 1)); @@ -171,8 +171,9 @@ int main(int argc, char **argv) { grpc_end2end_tests(configs[i]); } - grpc_workqueue_flush(g_workqueue); - GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy"); + grpc_workqueue_flush(g_workqueue, &call_list); + GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy", &call_list); + grpc_call_list_run(&call_list); grpc_shutdown(); return 0; diff --git a/test/core/end2end/fixtures/h2_sockpair.c b/test/core/end2end/fixtures/h2_sockpair.c index e2bba5d1dd..b661dbfd60 100644 --- a/test/core/end2end/fixtures/h2_sockpair.c +++ b/test/core/end2end/fixtures/h2_sockpair.c @@ -100,7 +100,7 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair( f.fixture_data = sfd; f.cq = grpc_completion_queue_create(NULL); - *sfd = grpc_iomgr_create_endpoint_pair("fixture", 65536, g_workqueue); + *sfd = grpc_iomgr_create_endpoint_pair("fixture", 65536); return f; } @@ -113,8 +113,7 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture *f, sp_client_setup cs; cs.client_args = client_args; cs.f = f; - transport = grpc_create_chttp2_transport(client_args, sfd->client, mdctx, - g_workqueue, 1); + transport = grpc_create_chttp2_transport(client_args, sfd->client, mdctx, 1); client_setup_transport(&cs, transport, mdctx); GPR_ASSERT(f->client); grpc_chttp2_transport_start_reading(transport, NULL, 0); @@ -129,8 +128,7 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f, f->server = grpc_server_create_from_filters(NULL, 0, server_args); grpc_server_register_completion_queue(f->server, f->cq, NULL); grpc_server_start(f->server); - transport = grpc_create_chttp2_transport(server_args, sfd->server, mdctx, - g_workqueue, 0); + transport = grpc_create_chttp2_transport(server_args, sfd->server, mdctx, 0); server_setup_transport(f, transport, mdctx); grpc_chttp2_transport_start_reading(transport, NULL, 0); } @@ -148,17 +146,20 @@ static grpc_end2end_test_config configs[] = { int main(int argc, char **argv) { size_t i; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_test_init(argc, argv); grpc_init(); - g_workqueue = grpc_workqueue_create(); + g_workqueue = grpc_workqueue_create(&call_list); + grpc_call_list_run(&call_list); for (i = 0; i < sizeof(configs) / sizeof(*configs); i++) { grpc_end2end_tests(configs[i]); } - grpc_workqueue_flush(g_workqueue); - GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy"); + grpc_workqueue_flush(g_workqueue, &call_list); + GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy", &call_list); + grpc_call_list_run(&call_list); grpc_shutdown(); return 0; diff --git a/test/core/end2end/fixtures/h2_sockpair_1byte.c b/test/core/end2end/fixtures/h2_sockpair_1byte.c index 2db0669dba..6cf656bcde 100644 --- a/test/core/end2end/fixtures/h2_sockpair_1byte.c +++ b/test/core/end2end/fixtures/h2_sockpair_1byte.c @@ -100,7 +100,7 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair( f.fixture_data = sfd; f.cq = grpc_completion_queue_create(NULL); - *sfd = grpc_iomgr_create_endpoint_pair("fixture", 1, g_workqueue); + *sfd = grpc_iomgr_create_endpoint_pair("fixture", 1); return f; } @@ -113,8 +113,7 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture *f, sp_client_setup cs; cs.client_args = client_args; cs.f = f; - transport = grpc_create_chttp2_transport(client_args, sfd->client, mdctx, - g_workqueue, 1); + transport = grpc_create_chttp2_transport(client_args, sfd->client, mdctx, 1); client_setup_transport(&cs, transport, mdctx); GPR_ASSERT(f->client); grpc_chttp2_transport_start_reading(transport, NULL, 0); @@ -129,8 +128,7 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f, f->server = grpc_server_create_from_filters(NULL, 0, server_args); grpc_server_register_completion_queue(f->server, f->cq, NULL); grpc_server_start(f->server); - transport = grpc_create_chttp2_transport(server_args, sfd->server, mdctx, - g_workqueue, 0); + transport = grpc_create_chttp2_transport(server_args, sfd->server, mdctx, 0); server_setup_transport(f, transport, mdctx); grpc_chttp2_transport_start_reading(transport, NULL, 0); } @@ -148,17 +146,20 @@ static grpc_end2end_test_config configs[] = { int main(int argc, char **argv) { size_t i; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; grpc_test_init(argc, argv); grpc_init(); - g_workqueue = grpc_workqueue_create(); + g_workqueue = grpc_workqueue_create(&call_list); + grpc_call_list_run(&call_list); for (i = 0; i < sizeof(configs) / sizeof(*configs); i++) { grpc_end2end_tests(configs[i]); } - grpc_workqueue_flush(g_workqueue); - GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy"); + grpc_workqueue_flush(g_workqueue, &call_list); + GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy", &call_list); + grpc_call_list_run(&call_list); grpc_shutdown(); return 0; diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c index bf9c2d844e..a5079f17a5 100644 --- a/test/core/iomgr/endpoint_tests.c +++ b/test/core/iomgr/endpoint_tests.c @@ -126,10 +126,10 @@ struct read_and_write_test_state { grpc_closure done_write; }; -static void read_and_write_test_read_handler(void *data, int success) { +static void read_and_write_test_read_handler(void *data, int success, + grpc_call_list *call_list) { struct read_and_write_test_state *state = data; -loop: state->bytes_read += count_slices( state->incoming.slices, state->incoming.count, &state->current_read_data); if (state->bytes_read == state->target_bytes || !success) { @@ -139,25 +139,16 @@ loop: grpc_pollset_kick(g_pollset, NULL); gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); } else if (success) { - switch (grpc_endpoint_read(state->read_ep, &state->incoming, - &state->done_read)) { - case GRPC_ENDPOINT_ERROR: - success = 0; - goto loop; - case GRPC_ENDPOINT_DONE: - success = 1; - goto loop; - case GRPC_ENDPOINT_PENDING: - break; - } + grpc_endpoint_read(state->read_ep, &state->incoming, &state->done_read, + call_list); } } -static void read_and_write_test_write_handler(void *data, int success) { +static void read_and_write_test_write_handler(void *data, int success, + grpc_call_list *call_list) { struct read_and_write_test_state *state = data; gpr_slice *slices = NULL; size_t nslices; - grpc_endpoint_op_status write_status; if (success) { for (;;) { @@ -176,19 +167,13 @@ static void read_and_write_test_write_handler(void *data, int success) { &state->current_write_data); gpr_slice_buffer_reset_and_unref(&state->outgoing); gpr_slice_buffer_addn(&state->outgoing, slices, nslices); - write_status = grpc_endpoint_write(state->write_ep, &state->outgoing, - &state->done_write); + grpc_endpoint_write(state->write_ep, &state->outgoing, &state->done_write, + call_list); free(slices); - if (write_status == GRPC_ENDPOINT_PENDING) { - return; - } else if (write_status == GRPC_ENDPOINT_ERROR) { - goto cleanup; - } } GPR_ASSERT(state->bytes_written == state->target_bytes); } -cleanup: gpr_log(GPR_INFO, "Write handler done"); gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); state->write_done = 1 + success; @@ -207,6 +192,7 @@ static void read_and_write_test(grpc_endpoint_test_config config, gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20); grpc_endpoint_test_fixture f = begin_test(config, "read_and_write_test", slice_size); + grpc_call_list call_list = GRPC_CALL_LIST_INIT; gpr_log(GPR_DEBUG, "num_bytes=%d write_size=%d slice_size=%d shutdown=%d", num_bytes, write_size, slice_size, shutdown); @@ -238,26 +224,19 @@ static void read_and_write_test(grpc_endpoint_test_config config, for the first iteration as for later iterations. It does the right thing even when bytes_written is unsigned. */ state.bytes_written -= state.current_write_size; - read_and_write_test_write_handler(&state, 1); + read_and_write_test_write_handler(&state, 1, &call_list); + grpc_call_list_run(&call_list); - switch ( - grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read)) { - case GRPC_ENDPOINT_PENDING: - break; - case GRPC_ENDPOINT_ERROR: - read_and_write_test_read_handler(&state, 0); - break; - case GRPC_ENDPOINT_DONE: - read_and_write_test_read_handler(&state, 1); - break; - } + grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read, + &call_list); if (shutdown) { gpr_log(GPR_DEBUG, "shutdown read"); - grpc_endpoint_shutdown(state.read_ep); + grpc_endpoint_shutdown(state.read_ep, &call_list); gpr_log(GPR_DEBUG, "shutdown write"); - grpc_endpoint_shutdown(state.write_ep); + grpc_endpoint_shutdown(state.write_ep, &call_list); } + grpc_call_list_run(&call_list); gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); while (!state.read_done || !state.write_done) { @@ -271,8 +250,9 @@ static void read_and_write_test(grpc_endpoint_test_config config, end_test(config); gpr_slice_buffer_destroy(&state.outgoing); gpr_slice_buffer_destroy(&state.incoming); - grpc_endpoint_destroy(state.read_ep); - grpc_endpoint_destroy(state.write_ep); + grpc_endpoint_destroy(state.read_ep, &call_list); + grpc_endpoint_destroy(state.write_ep, &call_list); + grpc_call_list_run(&call_list); } void grpc_endpoint_tests(grpc_endpoint_test_config config, diff --git a/test/core/security/oauth2_utils.c b/test/core/security/oauth2_utils.c index 7df6fade6b..45beb05c1c 100644 --- a/test/core/security/oauth2_utils.c +++ b/test/core/security/oauth2_utils.c @@ -51,7 +51,8 @@ typedef struct { } oauth2_request; static void on_oauth2_response(void *user_data, grpc_credentials_md *md_elems, - size_t num_md, grpc_credentials_status status) { + size_t num_md, grpc_credentials_status status, + grpc_call_list *call_list) { oauth2_request *request = user_data; char *token = NULL; gpr_slice token_slice; @@ -72,15 +73,21 @@ static void on_oauth2_response(void *user_data, grpc_credentials_md *md_elems, gpr_mu_unlock(GRPC_POLLSET_MU(&request->pollset)); } -static void do_nothing(void *unused) {} +static void do_nothing(void *unused, int success, grpc_call_list *call_list) {} char *grpc_test_fetch_oauth2_token_with_credentials(grpc_credentials *creds) { oauth2_request request; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; + grpc_closure do_nothing_closure; grpc_pollset_init(&request.pollset); request.is_done = 0; - grpc_credentials_get_request_metadata(creds, &request.pollset, "", - on_oauth2_response, &request); + grpc_closure_init(&do_nothing_closure, do_nothing, NULL); + + grpc_credentials_get_request_metadata( + creds, &request.pollset, "", on_oauth2_response, &request, &call_list); + + grpc_call_list_run(&call_list); gpr_mu_lock(GRPC_POLLSET_MU(&request.pollset)); while (!request.is_done) { @@ -90,7 +97,8 @@ char *grpc_test_fetch_oauth2_token_with_credentials(grpc_credentials *creds) { } gpr_mu_unlock(GRPC_POLLSET_MU(&request.pollset)); - grpc_pollset_shutdown(&request.pollset, do_nothing, NULL); + grpc_pollset_shutdown(&request.pollset, &do_nothing_closure, &call_list); + grpc_call_list_run(&call_list); grpc_pollset_destroy(&request.pollset); return request.token; } diff --git a/test/core/util/port_posix.c b/test/core/util/port_posix.c index 03f4c4912e..fa3798aa3c 100644 --- a/test/core/util/port_posix.c +++ b/test/core/util/port_posix.c @@ -72,13 +72,15 @@ typedef struct freereq { int done; } freereq; -static void destroy_pollset_and_shutdown(void *p) { +static void destroy_pollset_and_shutdown(void *p, int success, + grpc_call_list *call_list) { grpc_pollset_destroy(p); grpc_shutdown(); } static void freed_port_from_server(void *arg, - const grpc_httpcli_response *response) { + const grpc_httpcli_response *response, + grpc_call_list *call_list) { freereq *pr = arg; gpr_mu_lock(GRPC_POLLSET_MU(&pr->pollset)); pr->done = 1; @@ -91,12 +93,16 @@ static void free_port_using_server(char *server, int port) { grpc_httpcli_request req; freereq pr; char *path; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; + grpc_closure shutdown_closure; grpc_init(); memset(&pr, 0, sizeof(pr)); memset(&req, 0, sizeof(req)); grpc_pollset_init(&pr.pollset); + grpc_closure_init(&shutdown_closure, destroy_pollset_and_shutdown, + &pr.pollset); req.host = server; gpr_asprintf(&path, "/drop/%d", port); @@ -105,7 +111,7 @@ static void free_port_using_server(char *server, int port) { grpc_httpcli_context_init(&context); grpc_httpcli_get(&context, &pr.pollset, &req, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10), freed_port_from_server, - &pr); + &pr, &call_list); gpr_mu_lock(GRPC_POLLSET_MU(&pr.pollset)); while (!pr.done) { grpc_pollset_worker worker; @@ -115,7 +121,9 @@ static void free_port_using_server(char *server, int port) { gpr_mu_unlock(GRPC_POLLSET_MU(&pr.pollset)); grpc_httpcli_context_destroy(&context); - grpc_pollset_shutdown(&pr.pollset, destroy_pollset_and_shutdown, &pr.pollset); + grpc_call_list_run(&call_list); + grpc_pollset_shutdown(&pr.pollset, &shutdown_closure, &call_list); + grpc_call_list_run(&call_list); gpr_free(path); } @@ -201,10 +209,12 @@ typedef struct portreq { } portreq; static void got_port_from_server(void *arg, - const grpc_httpcli_response *response) { + const grpc_httpcli_response *response, + grpc_call_list *call_list) { size_t i; int port = 0; portreq *pr = arg; + if (!response || response->status != 200) { grpc_httpcli_request req; memset(&req, 0, sizeof(req)); @@ -214,8 +224,9 @@ static void got_port_from_server(void *arg, req.path = "/get"; gpr_log(GPR_DEBUG, "failed port pick from server: retrying"); sleep(1); - grpc_httpcli_get(pr->ctx, &pr->pollset, &req, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10), - got_port_from_server, pr); + grpc_httpcli_get(pr->ctx, &pr->pollset, &req, + GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10), got_port_from_server, + pr, call_list); return; } GPR_ASSERT(response); @@ -235,12 +246,16 @@ static int pick_port_using_server(char *server) { grpc_httpcli_context context; grpc_httpcli_request req; portreq pr; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; + grpc_closure shutdown_closure; grpc_init(); memset(&pr, 0, sizeof(pr)); memset(&req, 0, sizeof(req)); grpc_pollset_init(&pr.pollset); + grpc_closure_init(&shutdown_closure, destroy_pollset_and_shutdown, + &pr.pollset); pr.port = -1; pr.server = server; pr.ctx = &context; @@ -251,7 +266,8 @@ static int pick_port_using_server(char *server) { grpc_httpcli_context_init(&context); grpc_httpcli_get(&context, &pr.pollset, &req, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10), got_port_from_server, - &pr); + &pr, &call_list); + grpc_call_list_run(&call_list); gpr_mu_lock(GRPC_POLLSET_MU(&pr.pollset)); while (pr.port == -1) { grpc_pollset_worker worker; @@ -261,7 +277,8 @@ static int pick_port_using_server(char *server) { gpr_mu_unlock(GRPC_POLLSET_MU(&pr.pollset)); grpc_httpcli_context_destroy(&context); - grpc_pollset_shutdown(&pr.pollset, destroy_pollset_and_shutdown, &pr.pollset); + grpc_pollset_shutdown(&pr.pollset, &shutdown_closure, &call_list); + grpc_call_list_run(&call_list); return pr.port; } diff --git a/test/core/util/reconnect_server.c b/test/core/util/reconnect_server.c index 71fb69b54f..b4eb7ed916 100644 --- a/test/core/util/reconnect_server.c +++ b/test/core/util/reconnect_server.c @@ -65,15 +65,16 @@ static void pretty_print_backoffs(reconnect_server *server) { } } -static void on_connect(void *arg, grpc_endpoint *tcp) { +static void on_connect(void *arg, grpc_endpoint *tcp, + grpc_call_list *call_list) { char *peer; char *last_colon; reconnect_server *server = (reconnect_server *)arg; gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME); timestamp_list *new_tail; peer = grpc_endpoint_get_peer(tcp); - grpc_endpoint_shutdown(tcp); - grpc_endpoint_destroy(tcp); + grpc_endpoint_shutdown(tcp, call_list); + grpc_endpoint_destroy(tcp, call_list); if (peer) { last_colon = strrchr(peer, ':'); if (server->peer == NULL) { @@ -114,6 +115,7 @@ void reconnect_server_init(reconnect_server *server) { void reconnect_server_start(reconnect_server *server, int port) { struct sockaddr_in addr; int port_added; + grpc_call_list call_list = GRPC_CALL_LIST_INIT; addr.sin_family = AF_INET; addr.sin_port = htons((gpr_uint16)port); @@ -125,8 +127,10 @@ void reconnect_server_start(reconnect_server *server, int port) { GPR_ASSERT(port_added == port); grpc_tcp_server_start(server->tcp_server, server->pollsets, 1, on_connect, - server); + server, &call_list); gpr_log(GPR_INFO, "reconnect tcp server listening on 0.0.0.0:%d", port); + + grpc_call_list_run(&call_list); } void reconnect_server_poll(reconnect_server *server, int seconds) { @@ -152,12 +156,18 @@ void reconnect_server_clear_timestamps(reconnect_server *server) { server->peer = NULL; } -static void do_nothing(void *ignored) {} +static void do_nothing(void *ignored, int success, grpc_call_list *call_list) {} void reconnect_server_destroy(reconnect_server *server) { - grpc_tcp_server_destroy(server->tcp_server, do_nothing, NULL); + grpc_call_list call_list = GRPC_CALL_LIST_INIT; + grpc_closure do_nothing_closure[2]; + grpc_closure_init(&do_nothing_closure[0], do_nothing, NULL); + grpc_closure_init(&do_nothing_closure[1], do_nothing, NULL); + grpc_tcp_server_destroy(server->tcp_server, &do_nothing_closure[0], + &call_list); reconnect_server_clear_timestamps(server); - grpc_pollset_shutdown(&server->pollset, do_nothing, NULL); + grpc_pollset_shutdown(&server->pollset, &do_nothing_closure[1], &call_list); + grpc_call_list_run(&call_list); grpc_pollset_destroy(&server->pollset); grpc_shutdown(); } |