aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/core/iomgr/tcp_posix_test.cc
diff options
context:
space:
mode:
authorGravatar Yash Tibrewal <yashkt@google.com>2017-12-06 09:05:05 -0800
committerGravatar GitHub <noreply@github.com>2017-12-06 09:05:05 -0800
commitad4d2dde0052efbbf49d64b0843c45f0381cfeb3 (patch)
tree6a657f8c6179d873b34505cdc24bce9462ca68eb /test/core/iomgr/tcp_posix_test.cc
parenta3df36cc2505a89c2f481eea4a66a87b3002844a (diff)
Revert "All instances of exec_ctx being passed around in src/core removed"
Diffstat (limited to 'test/core/iomgr/tcp_posix_test.cc')
-rw-r--r--test/core/iomgr/tcp_posix_test.cc153
1 files changed, 84 insertions, 69 deletions
diff --git a/test/core/iomgr/tcp_posix_test.cc b/test/core/iomgr/tcp_posix_test.cc
index f4acba8302..7986dc2b19 100644
--- a/test/core/iomgr/tcp_posix_test.cc
+++ b/test/core/iomgr/tcp_posix_test.cc
@@ -131,7 +131,8 @@ static size_t count_slices(grpc_slice* slices, size_t nslices,
return num_bytes;
}
-static void read_cb(void* user_data, grpc_error* error) {
+static void read_cb(grpc_exec_ctx* exec_ctx, void* user_data,
+ grpc_error* error) {
struct read_socket_state* state = (struct read_socket_state*)user_data;
size_t read_bytes;
int current_data;
@@ -146,11 +147,11 @@ static void read_cb(void* user_data, grpc_error* error) {
gpr_log(GPR_INFO, "Read %" PRIuPTR " bytes of %" PRIuPTR, read_bytes,
state->target_read_bytes);
if (state->read_bytes >= state->target_read_bytes) {
- GPR_ASSERT(
- GRPC_LOG_IF_ERROR("kick", grpc_pollset_kick(g_pollset, nullptr)));
+ GPR_ASSERT(GRPC_LOG_IF_ERROR(
+ "kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr)));
gpr_mu_unlock(g_mu);
} else {
- grpc_endpoint_read(state->ep, &state->incoming, &state->read_cb);
+ grpc_endpoint_read(exec_ctx, state->ep, &state->incoming, &state->read_cb);
gpr_mu_unlock(g_mu);
}
}
@@ -163,7 +164,7 @@ static void read_test(size_t num_bytes, size_t slice_size) {
size_t written_bytes;
grpc_millis deadline =
grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_log(GPR_INFO, "Read test of size %" PRIuPTR ", slice size %" PRIuPTR,
num_bytes, slice_size);
@@ -174,8 +175,9 @@ static void read_test(size_t num_bytes, size_t slice_size) {
a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
a[0].type = GRPC_ARG_INTEGER, a[0].value.integer = (int)slice_size;
grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
- ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), &args, "test");
- grpc_endpoint_add_to_pollset(ep, g_pollset);
+ ep = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[1], "read_test"), &args,
+ "test");
+ grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset);
written_bytes = fill_socket_partial(sv[0], num_bytes);
gpr_log(GPR_INFO, "Wrote %" PRIuPTR " bytes", written_bytes);
@@ -186,22 +188,24 @@ static void read_test(size_t num_bytes, size_t slice_size) {
grpc_slice_buffer_init(&state.incoming);
GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);
- grpc_endpoint_read(ep, &state.incoming, &state.read_cb);
+ grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb);
gpr_mu_lock(g_mu);
while (state.read_bytes < state.target_read_bytes) {
grpc_pollset_worker* worker = nullptr;
GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
+ "pollset_work",
+ grpc_pollset_work(&exec_ctx, g_pollset, &worker, deadline)));
gpr_mu_unlock(g_mu);
-
+ grpc_exec_ctx_finish(&exec_ctx);
gpr_mu_lock(g_mu);
}
GPR_ASSERT(state.read_bytes == state.target_read_bytes);
gpr_mu_unlock(g_mu);
- grpc_slice_buffer_destroy_internal(&state.incoming);
- grpc_endpoint_destroy(ep);
+ grpc_slice_buffer_destroy_internal(&exec_ctx, &state.incoming);
+ grpc_endpoint_destroy(&exec_ctx, ep);
+ grpc_exec_ctx_finish(&exec_ctx);
}
/* Write to a socket until it fills up, then read from it using the grpc_tcp
@@ -213,7 +217,7 @@ static void large_read_test(size_t slice_size) {
ssize_t written_bytes;
grpc_millis deadline =
grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_log(GPR_INFO, "Start large read test, slice size %" PRIuPTR, slice_size);
@@ -224,8 +228,9 @@ static void large_read_test(size_t slice_size) {
a[0].type = GRPC_ARG_INTEGER;
a[0].value.integer = (int)slice_size;
grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
- ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test"), &args, "test");
- grpc_endpoint_add_to_pollset(ep, g_pollset);
+ ep = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[1], "large_read_test"),
+ &args, "test");
+ grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset);
written_bytes = fill_socket(sv[0]);
gpr_log(GPR_INFO, "Wrote %" PRIuPTR " bytes", written_bytes);
@@ -236,22 +241,24 @@ static void large_read_test(size_t slice_size) {
grpc_slice_buffer_init(&state.incoming);
GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);
- grpc_endpoint_read(ep, &state.incoming, &state.read_cb);
+ grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb);
gpr_mu_lock(g_mu);
while (state.read_bytes < state.target_read_bytes) {
grpc_pollset_worker* worker = nullptr;
GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
+ "pollset_work",
+ grpc_pollset_work(&exec_ctx, g_pollset, &worker, deadline)));
gpr_mu_unlock(g_mu);
-
+ grpc_exec_ctx_finish(&exec_ctx);
gpr_mu_lock(g_mu);
}
GPR_ASSERT(state.read_bytes == state.target_read_bytes);
gpr_mu_unlock(g_mu);
- grpc_slice_buffer_destroy_internal(&state.incoming);
- grpc_endpoint_destroy(ep);
+ grpc_slice_buffer_destroy_internal(&exec_ctx, &state.incoming);
+ grpc_endpoint_destroy(&exec_ctx, ep);
+ grpc_exec_ctx_finish(&exec_ctx);
}
struct write_socket_state {
@@ -282,15 +289,16 @@ static grpc_slice* allocate_blocks(size_t num_bytes, size_t slice_size,
return slices;
}
-static void write_done(void* user_data /* write_socket_state */,
+static void write_done(grpc_exec_ctx* exec_ctx,
+ void* user_data /* write_socket_state */,
grpc_error* error) {
struct write_socket_state* state = (struct write_socket_state*)user_data;
gpr_log(GPR_INFO, "Write done callback called");
gpr_mu_lock(g_mu);
gpr_log(GPR_INFO, "Signalling write done");
state->write_done = 1;
- GPR_ASSERT(
- GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
+ GPR_ASSERT(GRPC_LOG_IF_ERROR(
+ "pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr)));
gpr_mu_unlock(g_mu);
}
@@ -301,7 +309,7 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) {
int flags;
int current = 0;
int i;
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
flags = fcntl(fd, F_GETFL, 0);
GPR_ASSERT(fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) == 0);
@@ -311,11 +319,11 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) {
gpr_mu_lock(g_mu);
GPR_ASSERT(GRPC_LOG_IF_ERROR(
"pollset_work",
- grpc_pollset_work(g_pollset, &worker,
+ grpc_pollset_work(&exec_ctx, g_pollset, &worker,
grpc_timespec_to_millis_round_up(
grpc_timeout_milliseconds_to_deadline(10)))));
gpr_mu_unlock(g_mu);
-
+ grpc_exec_ctx_finish(&exec_ctx);
do {
bytes_read =
read(fd, buf, bytes_left > read_size ? read_size : bytes_left);
@@ -348,7 +356,7 @@ static void write_test(size_t num_bytes, size_t slice_size) {
grpc_closure write_done_closure;
grpc_millis deadline =
grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_log(GPR_INFO,
"Start write test with %" PRIuPTR " bytes, slice size %" PRIuPTR,
@@ -360,8 +368,9 @@ static void write_test(size_t num_bytes, size_t slice_size) {
a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
a[0].type = GRPC_ARG_INTEGER, a[0].value.integer = (int)slice_size;
grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
- ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test"), &args, "test");
- grpc_endpoint_add_to_pollset(ep, g_pollset);
+ ep = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[1], "write_test"), &args,
+ "test");
+ grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset);
state.ep = ep;
state.write_done = 0;
@@ -373,7 +382,7 @@ static void write_test(size_t num_bytes, size_t slice_size) {
GRPC_CLOSURE_INIT(&write_done_closure, write_done, &state,
grpc_schedule_on_exec_ctx);
- grpc_endpoint_write(ep, &outgoing, &write_done_closure);
+ grpc_endpoint_write(&exec_ctx, ep, &outgoing, &write_done_closure);
drain_socket_blocking(sv[0], num_bytes, num_bytes);
gpr_mu_lock(g_mu);
for (;;) {
@@ -382,23 +391,25 @@ static void write_test(size_t num_bytes, size_t slice_size) {
break;
}
GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
+ "pollset_work",
+ grpc_pollset_work(&exec_ctx, g_pollset, &worker, deadline)));
gpr_mu_unlock(g_mu);
-
+ grpc_exec_ctx_finish(&exec_ctx);
gpr_mu_lock(g_mu);
}
gpr_mu_unlock(g_mu);
- grpc_slice_buffer_destroy_internal(&outgoing);
- grpc_endpoint_destroy(ep);
+ grpc_slice_buffer_destroy_internal(&exec_ctx, &outgoing);
+ grpc_endpoint_destroy(&exec_ctx, ep);
gpr_free(slices);
+ grpc_exec_ctx_finish(&exec_ctx);
}
-void on_fd_released(void* arg, grpc_error* errors) {
+void on_fd_released(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* errors) {
int* done = (int*)arg;
*done = 1;
- GPR_ASSERT(
- GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
+ GPR_ASSERT(GRPC_LOG_IF_ERROR(
+ "pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr)));
}
/* Do a read_test, then release fd and try to read/write again. Verify that
@@ -411,7 +422,7 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) {
int fd;
grpc_millis deadline =
grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_closure fd_released_cb;
int fd_released_done = 0;
GRPC_CLOSURE_INIT(&fd_released_cb, &on_fd_released, &fd_released_done,
@@ -428,9 +439,10 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) {
a[0].type = GRPC_ARG_INTEGER;
a[0].value.integer = (int)slice_size;
grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
- ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), &args, "test");
+ ep = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[1], "read_test"), &args,
+ "test");
GPR_ASSERT(grpc_tcp_fd(ep) == sv[1] && sv[1] >= 0);
- grpc_endpoint_add_to_pollset(ep, g_pollset);
+ grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset);
written_bytes = fill_socket_partial(sv[0], num_bytes);
gpr_log(GPR_INFO, "Wrote %" PRIuPTR " bytes", written_bytes);
@@ -441,35 +453,38 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) {
grpc_slice_buffer_init(&state.incoming);
GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx);
- grpc_endpoint_read(ep, &state.incoming, &state.read_cb);
+ grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb);
gpr_mu_lock(g_mu);
while (state.read_bytes < state.target_read_bytes) {
grpc_pollset_worker* worker = nullptr;
GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
+ "pollset_work",
+ grpc_pollset_work(&exec_ctx, g_pollset, &worker, deadline)));
gpr_log(GPR_DEBUG, "wakeup: read=%" PRIdPTR " target=%" PRIdPTR,
state.read_bytes, state.target_read_bytes);
gpr_mu_unlock(g_mu);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(g_mu);
}
GPR_ASSERT(state.read_bytes == state.target_read_bytes);
gpr_mu_unlock(g_mu);
- grpc_slice_buffer_destroy_internal(&state.incoming);
- grpc_tcp_destroy_and_release_fd(ep, &fd, &fd_released_cb);
- grpc_core::ExecCtx::Get()->Flush();
+ grpc_slice_buffer_destroy_internal(&exec_ctx, &state.incoming);
+ grpc_tcp_destroy_and_release_fd(&exec_ctx, ep, &fd, &fd_released_cb);
+ grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(g_mu);
while (!fd_released_done) {
grpc_pollset_worker* worker = nullptr;
GPR_ASSERT(GRPC_LOG_IF_ERROR(
- "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
+ "pollset_work",
+ grpc_pollset_work(&exec_ctx, g_pollset, &worker, deadline)));
gpr_log(GPR_DEBUG, "wakeup: fd_released_done=%d", fd_released_done);
}
gpr_mu_unlock(g_mu);
GPR_ASSERT(fd_released_done == 1);
GPR_ASSERT(fd == sv[1]);
+ grpc_exec_ctx_finish(&exec_ctx);
written_bytes = fill_socket_partial(sv[0], num_bytes);
drain_socket_blocking(fd, written_bytes, written_bytes);
@@ -507,7 +522,7 @@ static grpc_endpoint_test_fixture create_fixture_tcp_socketpair(
size_t slice_size) {
int sv[2];
grpc_endpoint_test_fixture f;
- grpc_core::ExecCtx exec_ctx;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
create_sockets(sv);
grpc_resource_quota* resource_quota =
@@ -517,13 +532,15 @@ static grpc_endpoint_test_fixture create_fixture_tcp_socketpair(
a[0].type = GRPC_ARG_INTEGER;
a[0].value.integer = (int)slice_size;
grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
- f.client_ep =
- grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client"), &args, "test");
- f.server_ep =
- grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server"), &args, "test");
- grpc_resource_quota_unref_internal(resource_quota);
- grpc_endpoint_add_to_pollset(f.client_ep, g_pollset);
- grpc_endpoint_add_to_pollset(f.server_ep, g_pollset);
+ f.client_ep = grpc_tcp_create(
+ &exec_ctx, grpc_fd_create(sv[0], "fixture:client"), &args, "test");
+ f.server_ep = grpc_tcp_create(
+ &exec_ctx, grpc_fd_create(sv[1], "fixture:server"), &args, "test");
+ grpc_resource_quota_unref_internal(&exec_ctx, resource_quota);
+ grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, g_pollset);
+ grpc_endpoint_add_to_pollset(&exec_ctx, f.server_ep, g_pollset);
+
+ grpc_exec_ctx_finish(&exec_ctx);
return f;
}
@@ -532,26 +549,24 @@ static grpc_endpoint_test_config configs[] = {
{"tcp/tcp_socketpair", create_fixture_tcp_socketpair, clean_up},
};
-static void destroy_pollset(void* p, grpc_error* error) {
- grpc_pollset_destroy((grpc_pollset*)p);
+static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* p,
+ grpc_error* error) {
+ grpc_pollset_destroy(exec_ctx, (grpc_pollset*)p);
}
int main(int argc, char** argv) {
grpc_closure destroyed;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_test_init(argc, argv);
grpc_init();
- {
- grpc_core::ExecCtx exec_ctx;
- g_pollset = (grpc_pollset*)gpr_zalloc(grpc_pollset_size());
- grpc_pollset_init(g_pollset, &g_mu);
- grpc_endpoint_tests(configs[0], g_pollset, g_mu);
- run_tests();
- GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
- grpc_schedule_on_exec_ctx);
- grpc_pollset_shutdown(g_pollset, &destroyed);
-
- grpc_core::ExecCtx::Get()->Flush();
- }
+ g_pollset = (grpc_pollset*)gpr_zalloc(grpc_pollset_size());
+ grpc_pollset_init(g_pollset, &g_mu);
+ grpc_endpoint_tests(configs[0], g_pollset, g_mu);
+ run_tests();
+ GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset,
+ grpc_schedule_on_exec_ctx);
+ grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed);
+ grpc_exec_ctx_finish(&exec_ctx);
grpc_shutdown();
gpr_free(g_pollset);