aboutsummaryrefslogtreecommitdiffhomepage
path: root/test
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-09-18 17:29:00 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-09-18 17:29:00 -0700
commitd1bec03fa148344b8eac2b59517252d86e4ca858 (patch)
treef359e48f9151ab7ceff72cd624ad6c7a59e4d304 /test
parent33825118df7157219cec15382beb006d3462ad96 (diff)
Call list progress
Diffstat (limited to 'test')
-rw-r--r--test/core/bad_client/bad_client.c40
-rw-r--r--test/core/end2end/fixtures/h2_sockpair+trace.c17
-rw-r--r--test/core/end2end/fixtures/h2_sockpair.c17
-rw-r--r--test/core/end2end/fixtures/h2_sockpair_1byte.c17
-rw-r--r--test/core/iomgr/endpoint_tests.c58
-rw-r--r--test/core/security/oauth2_utils.c18
-rw-r--r--test/core/util/port_posix.c35
-rw-r--r--test/core/util/reconnect_server.c24
8 files changed, 120 insertions, 106 deletions
diff --git a/test/core/bad_client/bad_client.c b/test/core/bad_client/bad_client.c
index b9c0c98f69..e89d5b3d39 100644
--- a/test/core/bad_client/bad_client.c
+++ b/test/core/bad_client/bad_client.c
@@ -59,7 +59,7 @@ static void thd_func(void *arg) {
gpr_event_set(&a->done_thd, (void *)1);
}
-static void done_write(void *arg, int success) {
+static void done_write(void *arg, int success, grpc_call_list *call_list) {
thd_args *a = arg;
gpr_event_set(&a->done_write, (void *)1);
}
@@ -89,6 +89,7 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator,
gpr_slice_buffer outgoing;
grpc_closure done_write_closure;
grpc_workqueue *workqueue;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
hex = gpr_dump(client_payload, client_payload_length,
GPR_DUMP_HEX | GPR_DUMP_ASCII);
@@ -101,10 +102,11 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator,
/* Init grpc */
grpc_init();
- workqueue = grpc_workqueue_create();
+ workqueue = grpc_workqueue_create(&call_list);
+ grpc_call_list_run(&call_list);
/* Create endpoints */
- sfd = grpc_iomgr_create_endpoint_pair("fixture", 65536, workqueue);
+ sfd = grpc_iomgr_create_endpoint_pair("fixture", 65536);
/* Create server, completion events */
a.server = grpc_server_create_from_filters(NULL, 0, NULL);
@@ -114,14 +116,13 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator,
a.validator = validator;
grpc_server_register_completion_queue(a.server, a.cq, NULL);
grpc_server_start(a.server);
- transport =
- grpc_create_chttp2_transport(NULL, sfd.server, mdctx, workqueue, 0);
+ transport = grpc_create_chttp2_transport(NULL, sfd.server, mdctx, 0);
server_setup_transport(&a, transport, mdctx, workqueue);
grpc_chttp2_transport_start_reading(transport, NULL, 0);
/* Bind everything into the same pollset */
- grpc_endpoint_add_to_pollset(sfd.client, grpc_cq_pollset(a.cq));
- grpc_endpoint_add_to_pollset(sfd.server, grpc_cq_pollset(a.cq));
+ grpc_endpoint_add_to_pollset(sfd.client, grpc_cq_pollset(a.cq), &call_list);
+ grpc_endpoint_add_to_pollset(sfd.server, grpc_cq_pollset(a.cq), &call_list);
/* Check a ground truth */
GPR_ASSERT(grpc_server_has_open_connections(a.server));
@@ -134,24 +135,17 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator,
grpc_closure_init(&done_write_closure, done_write, &a);
/* Write data */
- switch (grpc_endpoint_write(sfd.client, &outgoing, &done_write_closure)) {
- case GRPC_ENDPOINT_DONE:
- done_write(&a, 1);
- break;
- case GRPC_ENDPOINT_PENDING:
- break;
- case GRPC_ENDPOINT_ERROR:
- done_write(&a, 0);
- break;
- }
+ grpc_endpoint_write(sfd.client, &outgoing, &done_write_closure, &call_list);
+ grpc_call_list_run(&call_list);
/* Await completion */
GPR_ASSERT(
gpr_event_wait(&a.done_write, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5)));
if (flags & GRPC_BAD_CLIENT_DISCONNECT) {
- grpc_endpoint_shutdown(sfd.client);
- grpc_endpoint_destroy(sfd.client);
+ grpc_endpoint_shutdown(sfd.client, &call_list);
+ grpc_endpoint_destroy(sfd.client, &call_list);
+ grpc_call_list_run(&call_list);
sfd.client = NULL;
}
@@ -159,8 +153,9 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator,
/* Shutdown */
if (sfd.client) {
- grpc_endpoint_shutdown(sfd.client);
- grpc_endpoint_destroy(sfd.client);
+ grpc_endpoint_shutdown(sfd.client, &call_list);
+ grpc_endpoint_destroy(sfd.client, &call_list);
+ grpc_call_list_run(&call_list);
}
grpc_server_shutdown_and_notify(a.server, a.cq, NULL);
GPR_ASSERT(grpc_completion_queue_pluck(
@@ -170,6 +165,7 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator,
grpc_completion_queue_destroy(a.cq);
gpr_slice_buffer_destroy(&outgoing);
- GRPC_WORKQUEUE_UNREF(workqueue, "destroy");
+ GRPC_WORKQUEUE_UNREF(workqueue, "destroy", &call_list);
+ grpc_call_list_run(&call_list);
grpc_shutdown();
}
diff --git a/test/core/end2end/fixtures/h2_sockpair+trace.c b/test/core/end2end/fixtures/h2_sockpair+trace.c
index 210aa68fe1..04dbf48522 100644
--- a/test/core/end2end/fixtures/h2_sockpair+trace.c
+++ b/test/core/end2end/fixtures/h2_sockpair+trace.c
@@ -101,7 +101,7 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair(
f.fixture_data = sfd;
f.cq = grpc_completion_queue_create(NULL);
- *sfd = grpc_iomgr_create_endpoint_pair("fixture", 65536, g_workqueue);
+ *sfd = grpc_iomgr_create_endpoint_pair("fixture", 65536);
return f;
}
@@ -114,8 +114,7 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture *f,
sp_client_setup cs;
cs.client_args = client_args;
cs.f = f;
- transport = grpc_create_chttp2_transport(client_args, sfd->client, mdctx,
- g_workqueue, 1);
+ transport = grpc_create_chttp2_transport(client_args, sfd->client, mdctx, 1);
client_setup_transport(&cs, transport, mdctx);
GPR_ASSERT(f->client);
grpc_chttp2_transport_start_reading(transport, NULL, 0);
@@ -130,8 +129,7 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f,
f->server = grpc_server_create_from_filters(NULL, 0, server_args);
grpc_server_register_completion_queue(f->server, f->cq, NULL);
grpc_server_start(f->server);
- transport = grpc_create_chttp2_transport(server_args, sfd->server, mdctx,
- g_workqueue, 0);
+ transport = grpc_create_chttp2_transport(server_args, sfd->server, mdctx, 0);
server_setup_transport(f, transport, mdctx);
grpc_chttp2_transport_start_reading(transport, NULL, 0);
}
@@ -149,6 +147,7 @@ static grpc_end2end_test_config configs[] = {
int main(int argc, char **argv) {
size_t i;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
/* force tracing on, with a value to force many
code paths in trace.c to be taken */
@@ -161,7 +160,8 @@ int main(int argc, char **argv) {
grpc_test_init(argc, argv);
grpc_init();
- g_workqueue = grpc_workqueue_create();
+ g_workqueue = grpc_workqueue_create(&call_list);
+ grpc_call_list_run(&call_list);
GPR_ASSERT(0 == grpc_tracer_set_enabled("also-doesnt-exist", 0));
GPR_ASSERT(1 == grpc_tracer_set_enabled("http", 1));
@@ -171,8 +171,9 @@ int main(int argc, char **argv) {
grpc_end2end_tests(configs[i]);
}
- grpc_workqueue_flush(g_workqueue);
- GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy");
+ grpc_workqueue_flush(g_workqueue, &call_list);
+ GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy", &call_list);
+ grpc_call_list_run(&call_list);
grpc_shutdown();
return 0;
diff --git a/test/core/end2end/fixtures/h2_sockpair.c b/test/core/end2end/fixtures/h2_sockpair.c
index e2bba5d1dd..b661dbfd60 100644
--- a/test/core/end2end/fixtures/h2_sockpair.c
+++ b/test/core/end2end/fixtures/h2_sockpair.c
@@ -100,7 +100,7 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair(
f.fixture_data = sfd;
f.cq = grpc_completion_queue_create(NULL);
- *sfd = grpc_iomgr_create_endpoint_pair("fixture", 65536, g_workqueue);
+ *sfd = grpc_iomgr_create_endpoint_pair("fixture", 65536);
return f;
}
@@ -113,8 +113,7 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture *f,
sp_client_setup cs;
cs.client_args = client_args;
cs.f = f;
- transport = grpc_create_chttp2_transport(client_args, sfd->client, mdctx,
- g_workqueue, 1);
+ transport = grpc_create_chttp2_transport(client_args, sfd->client, mdctx, 1);
client_setup_transport(&cs, transport, mdctx);
GPR_ASSERT(f->client);
grpc_chttp2_transport_start_reading(transport, NULL, 0);
@@ -129,8 +128,7 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f,
f->server = grpc_server_create_from_filters(NULL, 0, server_args);
grpc_server_register_completion_queue(f->server, f->cq, NULL);
grpc_server_start(f->server);
- transport = grpc_create_chttp2_transport(server_args, sfd->server, mdctx,
- g_workqueue, 0);
+ transport = grpc_create_chttp2_transport(server_args, sfd->server, mdctx, 0);
server_setup_transport(f, transport, mdctx);
grpc_chttp2_transport_start_reading(transport, NULL, 0);
}
@@ -148,17 +146,20 @@ static grpc_end2end_test_config configs[] = {
int main(int argc, char **argv) {
size_t i;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_test_init(argc, argv);
grpc_init();
- g_workqueue = grpc_workqueue_create();
+ g_workqueue = grpc_workqueue_create(&call_list);
+ grpc_call_list_run(&call_list);
for (i = 0; i < sizeof(configs) / sizeof(*configs); i++) {
grpc_end2end_tests(configs[i]);
}
- grpc_workqueue_flush(g_workqueue);
- GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy");
+ grpc_workqueue_flush(g_workqueue, &call_list);
+ GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy", &call_list);
+ grpc_call_list_run(&call_list);
grpc_shutdown();
return 0;
diff --git a/test/core/end2end/fixtures/h2_sockpair_1byte.c b/test/core/end2end/fixtures/h2_sockpair_1byte.c
index 2db0669dba..6cf656bcde 100644
--- a/test/core/end2end/fixtures/h2_sockpair_1byte.c
+++ b/test/core/end2end/fixtures/h2_sockpair_1byte.c
@@ -100,7 +100,7 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair(
f.fixture_data = sfd;
f.cq = grpc_completion_queue_create(NULL);
- *sfd = grpc_iomgr_create_endpoint_pair("fixture", 1, g_workqueue);
+ *sfd = grpc_iomgr_create_endpoint_pair("fixture", 1);
return f;
}
@@ -113,8 +113,7 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture *f,
sp_client_setup cs;
cs.client_args = client_args;
cs.f = f;
- transport = grpc_create_chttp2_transport(client_args, sfd->client, mdctx,
- g_workqueue, 1);
+ transport = grpc_create_chttp2_transport(client_args, sfd->client, mdctx, 1);
client_setup_transport(&cs, transport, mdctx);
GPR_ASSERT(f->client);
grpc_chttp2_transport_start_reading(transport, NULL, 0);
@@ -129,8 +128,7 @@ static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f,
f->server = grpc_server_create_from_filters(NULL, 0, server_args);
grpc_server_register_completion_queue(f->server, f->cq, NULL);
grpc_server_start(f->server);
- transport = grpc_create_chttp2_transport(server_args, sfd->server, mdctx,
- g_workqueue, 0);
+ transport = grpc_create_chttp2_transport(server_args, sfd->server, mdctx, 0);
server_setup_transport(f, transport, mdctx);
grpc_chttp2_transport_start_reading(transport, NULL, 0);
}
@@ -148,17 +146,20 @@ static grpc_end2end_test_config configs[] = {
int main(int argc, char **argv) {
size_t i;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
grpc_test_init(argc, argv);
grpc_init();
- g_workqueue = grpc_workqueue_create();
+ g_workqueue = grpc_workqueue_create(&call_list);
+ grpc_call_list_run(&call_list);
for (i = 0; i < sizeof(configs) / sizeof(*configs); i++) {
grpc_end2end_tests(configs[i]);
}
- grpc_workqueue_flush(g_workqueue);
- GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy");
+ grpc_workqueue_flush(g_workqueue, &call_list);
+ GRPC_WORKQUEUE_UNREF(g_workqueue, "destroy", &call_list);
+ grpc_call_list_run(&call_list);
grpc_shutdown();
return 0;
diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c
index bf9c2d844e..a5079f17a5 100644
--- a/test/core/iomgr/endpoint_tests.c
+++ b/test/core/iomgr/endpoint_tests.c
@@ -126,10 +126,10 @@ struct read_and_write_test_state {
grpc_closure done_write;
};
-static void read_and_write_test_read_handler(void *data, int success) {
+static void read_and_write_test_read_handler(void *data, int success,
+ grpc_call_list *call_list) {
struct read_and_write_test_state *state = data;
-loop:
state->bytes_read += count_slices(
state->incoming.slices, state->incoming.count, &state->current_read_data);
if (state->bytes_read == state->target_bytes || !success) {
@@ -139,25 +139,16 @@ loop:
grpc_pollset_kick(g_pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
} else if (success) {
- switch (grpc_endpoint_read(state->read_ep, &state->incoming,
- &state->done_read)) {
- case GRPC_ENDPOINT_ERROR:
- success = 0;
- goto loop;
- case GRPC_ENDPOINT_DONE:
- success = 1;
- goto loop;
- case GRPC_ENDPOINT_PENDING:
- break;
- }
+ grpc_endpoint_read(state->read_ep, &state->incoming, &state->done_read,
+ call_list);
}
}
-static void read_and_write_test_write_handler(void *data, int success) {
+static void read_and_write_test_write_handler(void *data, int success,
+ grpc_call_list *call_list) {
struct read_and_write_test_state *state = data;
gpr_slice *slices = NULL;
size_t nslices;
- grpc_endpoint_op_status write_status;
if (success) {
for (;;) {
@@ -176,19 +167,13 @@ static void read_and_write_test_write_handler(void *data, int success) {
&state->current_write_data);
gpr_slice_buffer_reset_and_unref(&state->outgoing);
gpr_slice_buffer_addn(&state->outgoing, slices, nslices);
- write_status = grpc_endpoint_write(state->write_ep, &state->outgoing,
- &state->done_write);
+ grpc_endpoint_write(state->write_ep, &state->outgoing, &state->done_write,
+ call_list);
free(slices);
- if (write_status == GRPC_ENDPOINT_PENDING) {
- return;
- } else if (write_status == GRPC_ENDPOINT_ERROR) {
- goto cleanup;
- }
}
GPR_ASSERT(state->bytes_written == state->target_bytes);
}
-cleanup:
gpr_log(GPR_INFO, "Write handler done");
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
state->write_done = 1 + success;
@@ -207,6 +192,7 @@ static void read_and_write_test(grpc_endpoint_test_config config,
gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
grpc_endpoint_test_fixture f =
begin_test(config, "read_and_write_test", slice_size);
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
gpr_log(GPR_DEBUG, "num_bytes=%d write_size=%d slice_size=%d shutdown=%d",
num_bytes, write_size, slice_size, shutdown);
@@ -238,26 +224,19 @@ static void read_and_write_test(grpc_endpoint_test_config config,
for the first iteration as for later iterations. It does the right thing
even when bytes_written is unsigned. */
state.bytes_written -= state.current_write_size;
- read_and_write_test_write_handler(&state, 1);
+ read_and_write_test_write_handler(&state, 1, &call_list);
+ grpc_call_list_run(&call_list);
- switch (
- grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read)) {
- case GRPC_ENDPOINT_PENDING:
- break;
- case GRPC_ENDPOINT_ERROR:
- read_and_write_test_read_handler(&state, 0);
- break;
- case GRPC_ENDPOINT_DONE:
- read_and_write_test_read_handler(&state, 1);
- break;
- }
+ grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read,
+ &call_list);
if (shutdown) {
gpr_log(GPR_DEBUG, "shutdown read");
- grpc_endpoint_shutdown(state.read_ep);
+ grpc_endpoint_shutdown(state.read_ep, &call_list);
gpr_log(GPR_DEBUG, "shutdown write");
- grpc_endpoint_shutdown(state.write_ep);
+ grpc_endpoint_shutdown(state.write_ep, &call_list);
}
+ grpc_call_list_run(&call_list);
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
while (!state.read_done || !state.write_done) {
@@ -271,8 +250,9 @@ static void read_and_write_test(grpc_endpoint_test_config config,
end_test(config);
gpr_slice_buffer_destroy(&state.outgoing);
gpr_slice_buffer_destroy(&state.incoming);
- grpc_endpoint_destroy(state.read_ep);
- grpc_endpoint_destroy(state.write_ep);
+ grpc_endpoint_destroy(state.read_ep, &call_list);
+ grpc_endpoint_destroy(state.write_ep, &call_list);
+ grpc_call_list_run(&call_list);
}
void grpc_endpoint_tests(grpc_endpoint_test_config config,
diff --git a/test/core/security/oauth2_utils.c b/test/core/security/oauth2_utils.c
index 7df6fade6b..45beb05c1c 100644
--- a/test/core/security/oauth2_utils.c
+++ b/test/core/security/oauth2_utils.c
@@ -51,7 +51,8 @@ typedef struct {
} oauth2_request;
static void on_oauth2_response(void *user_data, grpc_credentials_md *md_elems,
- size_t num_md, grpc_credentials_status status) {
+ size_t num_md, grpc_credentials_status status,
+ grpc_call_list *call_list) {
oauth2_request *request = user_data;
char *token = NULL;
gpr_slice token_slice;
@@ -72,15 +73,21 @@ static void on_oauth2_response(void *user_data, grpc_credentials_md *md_elems,
gpr_mu_unlock(GRPC_POLLSET_MU(&request->pollset));
}
-static void do_nothing(void *unused) {}
+static void do_nothing(void *unused, int success, grpc_call_list *call_list) {}
char *grpc_test_fetch_oauth2_token_with_credentials(grpc_credentials *creds) {
oauth2_request request;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
+ grpc_closure do_nothing_closure;
grpc_pollset_init(&request.pollset);
request.is_done = 0;
- grpc_credentials_get_request_metadata(creds, &request.pollset, "",
- on_oauth2_response, &request);
+ grpc_closure_init(&do_nothing_closure, do_nothing, NULL);
+
+ grpc_credentials_get_request_metadata(
+ creds, &request.pollset, "", on_oauth2_response, &request, &call_list);
+
+ grpc_call_list_run(&call_list);
gpr_mu_lock(GRPC_POLLSET_MU(&request.pollset));
while (!request.is_done) {
@@ -90,7 +97,8 @@ char *grpc_test_fetch_oauth2_token_with_credentials(grpc_credentials *creds) {
}
gpr_mu_unlock(GRPC_POLLSET_MU(&request.pollset));
- grpc_pollset_shutdown(&request.pollset, do_nothing, NULL);
+ grpc_pollset_shutdown(&request.pollset, &do_nothing_closure, &call_list);
+ grpc_call_list_run(&call_list);
grpc_pollset_destroy(&request.pollset);
return request.token;
}
diff --git a/test/core/util/port_posix.c b/test/core/util/port_posix.c
index 03f4c4912e..fa3798aa3c 100644
--- a/test/core/util/port_posix.c
+++ b/test/core/util/port_posix.c
@@ -72,13 +72,15 @@ typedef struct freereq {
int done;
} freereq;
-static void destroy_pollset_and_shutdown(void *p) {
+static void destroy_pollset_and_shutdown(void *p, int success,
+ grpc_call_list *call_list) {
grpc_pollset_destroy(p);
grpc_shutdown();
}
static void freed_port_from_server(void *arg,
- const grpc_httpcli_response *response) {
+ const grpc_httpcli_response *response,
+ grpc_call_list *call_list) {
freereq *pr = arg;
gpr_mu_lock(GRPC_POLLSET_MU(&pr->pollset));
pr->done = 1;
@@ -91,12 +93,16 @@ static void free_port_using_server(char *server, int port) {
grpc_httpcli_request req;
freereq pr;
char *path;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
+ grpc_closure shutdown_closure;
grpc_init();
memset(&pr, 0, sizeof(pr));
memset(&req, 0, sizeof(req));
grpc_pollset_init(&pr.pollset);
+ grpc_closure_init(&shutdown_closure, destroy_pollset_and_shutdown,
+ &pr.pollset);
req.host = server;
gpr_asprintf(&path, "/drop/%d", port);
@@ -105,7 +111,7 @@ static void free_port_using_server(char *server, int port) {
grpc_httpcli_context_init(&context);
grpc_httpcli_get(&context, &pr.pollset, &req,
GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10), freed_port_from_server,
- &pr);
+ &pr, &call_list);
gpr_mu_lock(GRPC_POLLSET_MU(&pr.pollset));
while (!pr.done) {
grpc_pollset_worker worker;
@@ -115,7 +121,9 @@ static void free_port_using_server(char *server, int port) {
gpr_mu_unlock(GRPC_POLLSET_MU(&pr.pollset));
grpc_httpcli_context_destroy(&context);
- grpc_pollset_shutdown(&pr.pollset, destroy_pollset_and_shutdown, &pr.pollset);
+ grpc_call_list_run(&call_list);
+ grpc_pollset_shutdown(&pr.pollset, &shutdown_closure, &call_list);
+ grpc_call_list_run(&call_list);
gpr_free(path);
}
@@ -201,10 +209,12 @@ typedef struct portreq {
} portreq;
static void got_port_from_server(void *arg,
- const grpc_httpcli_response *response) {
+ const grpc_httpcli_response *response,
+ grpc_call_list *call_list) {
size_t i;
int port = 0;
portreq *pr = arg;
+
if (!response || response->status != 200) {
grpc_httpcli_request req;
memset(&req, 0, sizeof(req));
@@ -214,8 +224,9 @@ static void got_port_from_server(void *arg,
req.path = "/get";
gpr_log(GPR_DEBUG, "failed port pick from server: retrying");
sleep(1);
- grpc_httpcli_get(pr->ctx, &pr->pollset, &req, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10),
- got_port_from_server, pr);
+ grpc_httpcli_get(pr->ctx, &pr->pollset, &req,
+ GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10), got_port_from_server,
+ pr, call_list);
return;
}
GPR_ASSERT(response);
@@ -235,12 +246,16 @@ static int pick_port_using_server(char *server) {
grpc_httpcli_context context;
grpc_httpcli_request req;
portreq pr;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
+ grpc_closure shutdown_closure;
grpc_init();
memset(&pr, 0, sizeof(pr));
memset(&req, 0, sizeof(req));
grpc_pollset_init(&pr.pollset);
+ grpc_closure_init(&shutdown_closure, destroy_pollset_and_shutdown,
+ &pr.pollset);
pr.port = -1;
pr.server = server;
pr.ctx = &context;
@@ -251,7 +266,8 @@ static int pick_port_using_server(char *server) {
grpc_httpcli_context_init(&context);
grpc_httpcli_get(&context, &pr.pollset, &req,
GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10), got_port_from_server,
- &pr);
+ &pr, &call_list);
+ grpc_call_list_run(&call_list);
gpr_mu_lock(GRPC_POLLSET_MU(&pr.pollset));
while (pr.port == -1) {
grpc_pollset_worker worker;
@@ -261,7 +277,8 @@ static int pick_port_using_server(char *server) {
gpr_mu_unlock(GRPC_POLLSET_MU(&pr.pollset));
grpc_httpcli_context_destroy(&context);
- grpc_pollset_shutdown(&pr.pollset, destroy_pollset_and_shutdown, &pr.pollset);
+ grpc_pollset_shutdown(&pr.pollset, &shutdown_closure, &call_list);
+ grpc_call_list_run(&call_list);
return pr.port;
}
diff --git a/test/core/util/reconnect_server.c b/test/core/util/reconnect_server.c
index 71fb69b54f..b4eb7ed916 100644
--- a/test/core/util/reconnect_server.c
+++ b/test/core/util/reconnect_server.c
@@ -65,15 +65,16 @@ static void pretty_print_backoffs(reconnect_server *server) {
}
}
-static void on_connect(void *arg, grpc_endpoint *tcp) {
+static void on_connect(void *arg, grpc_endpoint *tcp,
+ grpc_call_list *call_list) {
char *peer;
char *last_colon;
reconnect_server *server = (reconnect_server *)arg;
gpr_timespec now = gpr_now(GPR_CLOCK_REALTIME);
timestamp_list *new_tail;
peer = grpc_endpoint_get_peer(tcp);
- grpc_endpoint_shutdown(tcp);
- grpc_endpoint_destroy(tcp);
+ grpc_endpoint_shutdown(tcp, call_list);
+ grpc_endpoint_destroy(tcp, call_list);
if (peer) {
last_colon = strrchr(peer, ':');
if (server->peer == NULL) {
@@ -114,6 +115,7 @@ void reconnect_server_init(reconnect_server *server) {
void reconnect_server_start(reconnect_server *server, int port) {
struct sockaddr_in addr;
int port_added;
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
addr.sin_family = AF_INET;
addr.sin_port = htons((gpr_uint16)port);
@@ -125,8 +127,10 @@ void reconnect_server_start(reconnect_server *server, int port) {
GPR_ASSERT(port_added == port);
grpc_tcp_server_start(server->tcp_server, server->pollsets, 1, on_connect,
- server);
+ server, &call_list);
gpr_log(GPR_INFO, "reconnect tcp server listening on 0.0.0.0:%d", port);
+
+ grpc_call_list_run(&call_list);
}
void reconnect_server_poll(reconnect_server *server, int seconds) {
@@ -152,12 +156,18 @@ void reconnect_server_clear_timestamps(reconnect_server *server) {
server->peer = NULL;
}
-static void do_nothing(void *ignored) {}
+static void do_nothing(void *ignored, int success, grpc_call_list *call_list) {}
void reconnect_server_destroy(reconnect_server *server) {
- grpc_tcp_server_destroy(server->tcp_server, do_nothing, NULL);
+ grpc_call_list call_list = GRPC_CALL_LIST_INIT;
+ grpc_closure do_nothing_closure[2];
+ grpc_closure_init(&do_nothing_closure[0], do_nothing, NULL);
+ grpc_closure_init(&do_nothing_closure[1], do_nothing, NULL);
+ grpc_tcp_server_destroy(server->tcp_server, &do_nothing_closure[0],
+ &call_list);
reconnect_server_clear_timestamps(server);
- grpc_pollset_shutdown(&server->pollset, do_nothing, NULL);
+ grpc_pollset_shutdown(&server->pollset, &do_nothing_closure[1], &call_list);
+ grpc_call_list_run(&call_list);
grpc_pollset_destroy(&server->pollset);
grpc_shutdown();
}