diff options
author | 2017-12-06 09:05:05 -0800 | |
---|---|---|
committer | 2017-12-06 09:05:05 -0800 | |
commit | ad4d2dde0052efbbf49d64b0843c45f0381cfeb3 (patch) | |
tree | 6a657f8c6179d873b34505cdc24bce9462ca68eb /test/core/iomgr/tcp_posix_test.cc | |
parent | a3df36cc2505a89c2f481eea4a66a87b3002844a (diff) |
Revert "All instances of exec_ctx being passed around in src/core removed"
Diffstat (limited to 'test/core/iomgr/tcp_posix_test.cc')
-rw-r--r-- | test/core/iomgr/tcp_posix_test.cc | 153 |
1 files changed, 84 insertions, 69 deletions
diff --git a/test/core/iomgr/tcp_posix_test.cc b/test/core/iomgr/tcp_posix_test.cc index f4acba8302..7986dc2b19 100644 --- a/test/core/iomgr/tcp_posix_test.cc +++ b/test/core/iomgr/tcp_posix_test.cc @@ -131,7 +131,8 @@ static size_t count_slices(grpc_slice* slices, size_t nslices, return num_bytes; } -static void read_cb(void* user_data, grpc_error* error) { +static void read_cb(grpc_exec_ctx* exec_ctx, void* user_data, + grpc_error* error) { struct read_socket_state* state = (struct read_socket_state*)user_data; size_t read_bytes; int current_data; @@ -146,11 +147,11 @@ static void read_cb(void* user_data, grpc_error* error) { gpr_log(GPR_INFO, "Read %" PRIuPTR " bytes of %" PRIuPTR, read_bytes, state->target_read_bytes); if (state->read_bytes >= state->target_read_bytes) { - GPR_ASSERT( - GRPC_LOG_IF_ERROR("kick", grpc_pollset_kick(g_pollset, nullptr))); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr))); gpr_mu_unlock(g_mu); } else { - grpc_endpoint_read(state->ep, &state->incoming, &state->read_cb); + grpc_endpoint_read(exec_ctx, state->ep, &state->incoming, &state->read_cb); gpr_mu_unlock(g_mu); } } @@ -163,7 +164,7 @@ static void read_test(size_t num_bytes, size_t slice_size) { size_t written_bytes; grpc_millis deadline = grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20)); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; gpr_log(GPR_INFO, "Read test of size %" PRIuPTR ", slice size %" PRIuPTR, num_bytes, slice_size); @@ -174,8 +175,9 @@ static void read_test(size_t num_bytes, size_t slice_size) { a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE); a[0].type = GRPC_ARG_INTEGER, a[0].value.integer = (int)slice_size; grpc_channel_args args = {GPR_ARRAY_SIZE(a), a}; - ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), &args, "test"); - grpc_endpoint_add_to_pollset(ep, g_pollset); + ep = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[1], "read_test"), &args, + "test"); + grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset); written_bytes = fill_socket_partial(sv[0], num_bytes); gpr_log(GPR_INFO, "Wrote %" PRIuPTR " bytes", written_bytes); @@ -186,22 +188,24 @@ static void read_test(size_t num_bytes, size_t slice_size) { grpc_slice_buffer_init(&state.incoming); GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx); - grpc_endpoint_read(ep, &state.incoming, &state.read_cb); + grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb); gpr_mu_lock(g_mu); while (state.read_bytes < state.target_read_bytes) { grpc_pollset_worker* worker = nullptr; GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline))); + "pollset_work", + grpc_pollset_work(&exec_ctx, g_pollset, &worker, deadline))); gpr_mu_unlock(g_mu); - + grpc_exec_ctx_finish(&exec_ctx); gpr_mu_lock(g_mu); } GPR_ASSERT(state.read_bytes == state.target_read_bytes); gpr_mu_unlock(g_mu); - grpc_slice_buffer_destroy_internal(&state.incoming); - grpc_endpoint_destroy(ep); + grpc_slice_buffer_destroy_internal(&exec_ctx, &state.incoming); + grpc_endpoint_destroy(&exec_ctx, ep); + grpc_exec_ctx_finish(&exec_ctx); } /* Write to a socket until it fills up, then read from it using the grpc_tcp @@ -213,7 +217,7 @@ static void large_read_test(size_t slice_size) { ssize_t written_bytes; grpc_millis deadline = grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20)); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; gpr_log(GPR_INFO, "Start large read test, slice size %" PRIuPTR, slice_size); @@ -224,8 +228,9 @@ static void large_read_test(size_t slice_size) { a[0].type = GRPC_ARG_INTEGER; a[0].value.integer = (int)slice_size; grpc_channel_args args = {GPR_ARRAY_SIZE(a), a}; - ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test"), &args, "test"); - grpc_endpoint_add_to_pollset(ep, g_pollset); + ep = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[1], "large_read_test"), + &args, "test"); + grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset); written_bytes = fill_socket(sv[0]); gpr_log(GPR_INFO, "Wrote %" PRIuPTR " bytes", written_bytes); @@ -236,22 +241,24 @@ static void large_read_test(size_t slice_size) { grpc_slice_buffer_init(&state.incoming); GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx); - grpc_endpoint_read(ep, &state.incoming, &state.read_cb); + grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb); gpr_mu_lock(g_mu); while (state.read_bytes < state.target_read_bytes) { grpc_pollset_worker* worker = nullptr; GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline))); + "pollset_work", + grpc_pollset_work(&exec_ctx, g_pollset, &worker, deadline))); gpr_mu_unlock(g_mu); - + grpc_exec_ctx_finish(&exec_ctx); gpr_mu_lock(g_mu); } GPR_ASSERT(state.read_bytes == state.target_read_bytes); gpr_mu_unlock(g_mu); - grpc_slice_buffer_destroy_internal(&state.incoming); - grpc_endpoint_destroy(ep); + grpc_slice_buffer_destroy_internal(&exec_ctx, &state.incoming); + grpc_endpoint_destroy(&exec_ctx, ep); + grpc_exec_ctx_finish(&exec_ctx); } struct write_socket_state { @@ -282,15 +289,16 @@ static grpc_slice* allocate_blocks(size_t num_bytes, size_t slice_size, return slices; } -static void write_done(void* user_data /* write_socket_state */, +static void write_done(grpc_exec_ctx* exec_ctx, + void* user_data /* write_socket_state */, grpc_error* error) { struct write_socket_state* state = (struct write_socket_state*)user_data; gpr_log(GPR_INFO, "Write done callback called"); gpr_mu_lock(g_mu); gpr_log(GPR_INFO, "Signalling write done"); state->write_done = 1; - GPR_ASSERT( - GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr))); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr))); gpr_mu_unlock(g_mu); } @@ -301,7 +309,7 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) { int flags; int current = 0; int i; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; flags = fcntl(fd, F_GETFL, 0); GPR_ASSERT(fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) == 0); @@ -311,11 +319,11 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) { gpr_mu_lock(g_mu); GPR_ASSERT(GRPC_LOG_IF_ERROR( "pollset_work", - grpc_pollset_work(g_pollset, &worker, + grpc_pollset_work(&exec_ctx, g_pollset, &worker, grpc_timespec_to_millis_round_up( grpc_timeout_milliseconds_to_deadline(10))))); gpr_mu_unlock(g_mu); - + grpc_exec_ctx_finish(&exec_ctx); do { bytes_read = read(fd, buf, bytes_left > read_size ? read_size : bytes_left); @@ -348,7 +356,7 @@ static void write_test(size_t num_bytes, size_t slice_size) { grpc_closure write_done_closure; grpc_millis deadline = grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20)); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; gpr_log(GPR_INFO, "Start write test with %" PRIuPTR " bytes, slice size %" PRIuPTR, @@ -360,8 +368,9 @@ static void write_test(size_t num_bytes, size_t slice_size) { a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE); a[0].type = GRPC_ARG_INTEGER, a[0].value.integer = (int)slice_size; grpc_channel_args args = {GPR_ARRAY_SIZE(a), a}; - ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test"), &args, "test"); - grpc_endpoint_add_to_pollset(ep, g_pollset); + ep = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[1], "write_test"), &args, + "test"); + grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset); state.ep = ep; state.write_done = 0; @@ -373,7 +382,7 @@ static void write_test(size_t num_bytes, size_t slice_size) { GRPC_CLOSURE_INIT(&write_done_closure, write_done, &state, grpc_schedule_on_exec_ctx); - grpc_endpoint_write(ep, &outgoing, &write_done_closure); + grpc_endpoint_write(&exec_ctx, ep, &outgoing, &write_done_closure); drain_socket_blocking(sv[0], num_bytes, num_bytes); gpr_mu_lock(g_mu); for (;;) { @@ -382,23 +391,25 @@ static void write_test(size_t num_bytes, size_t slice_size) { break; } GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline))); + "pollset_work", + grpc_pollset_work(&exec_ctx, g_pollset, &worker, deadline))); gpr_mu_unlock(g_mu); - + grpc_exec_ctx_finish(&exec_ctx); gpr_mu_lock(g_mu); } gpr_mu_unlock(g_mu); - grpc_slice_buffer_destroy_internal(&outgoing); - grpc_endpoint_destroy(ep); + grpc_slice_buffer_destroy_internal(&exec_ctx, &outgoing); + grpc_endpoint_destroy(&exec_ctx, ep); gpr_free(slices); + grpc_exec_ctx_finish(&exec_ctx); } -void on_fd_released(void* arg, grpc_error* errors) { +void on_fd_released(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* errors) { int* done = (int*)arg; *done = 1; - GPR_ASSERT( - GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr))); + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_kick", grpc_pollset_kick(exec_ctx, g_pollset, nullptr))); } /* Do a read_test, then release fd and try to read/write again. Verify that @@ -411,7 +422,7 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { int fd; grpc_millis deadline = grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20)); - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_closure fd_released_cb; int fd_released_done = 0; GRPC_CLOSURE_INIT(&fd_released_cb, &on_fd_released, &fd_released_done, @@ -428,9 +439,10 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { a[0].type = GRPC_ARG_INTEGER; a[0].value.integer = (int)slice_size; grpc_channel_args args = {GPR_ARRAY_SIZE(a), a}; - ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), &args, "test"); + ep = grpc_tcp_create(&exec_ctx, grpc_fd_create(sv[1], "read_test"), &args, + "test"); GPR_ASSERT(grpc_tcp_fd(ep) == sv[1] && sv[1] >= 0); - grpc_endpoint_add_to_pollset(ep, g_pollset); + grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset); written_bytes = fill_socket_partial(sv[0], num_bytes); gpr_log(GPR_INFO, "Wrote %" PRIuPTR " bytes", written_bytes); @@ -441,35 +453,38 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { grpc_slice_buffer_init(&state.incoming); GRPC_CLOSURE_INIT(&state.read_cb, read_cb, &state, grpc_schedule_on_exec_ctx); - grpc_endpoint_read(ep, &state.incoming, &state.read_cb); + grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb); gpr_mu_lock(g_mu); while (state.read_bytes < state.target_read_bytes) { grpc_pollset_worker* worker = nullptr; GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline))); + "pollset_work", + grpc_pollset_work(&exec_ctx, g_pollset, &worker, deadline))); gpr_log(GPR_DEBUG, "wakeup: read=%" PRIdPTR " target=%" PRIdPTR, state.read_bytes, state.target_read_bytes); gpr_mu_unlock(g_mu); - grpc_core::ExecCtx::Get()->Flush(); + grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(g_mu); } GPR_ASSERT(state.read_bytes == state.target_read_bytes); gpr_mu_unlock(g_mu); - grpc_slice_buffer_destroy_internal(&state.incoming); - grpc_tcp_destroy_and_release_fd(ep, &fd, &fd_released_cb); - grpc_core::ExecCtx::Get()->Flush(); + grpc_slice_buffer_destroy_internal(&exec_ctx, &state.incoming); + grpc_tcp_destroy_and_release_fd(&exec_ctx, ep, &fd, &fd_released_cb); + grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(g_mu); while (!fd_released_done) { grpc_pollset_worker* worker = nullptr; GPR_ASSERT(GRPC_LOG_IF_ERROR( - "pollset_work", grpc_pollset_work(g_pollset, &worker, deadline))); + "pollset_work", + grpc_pollset_work(&exec_ctx, g_pollset, &worker, deadline))); gpr_log(GPR_DEBUG, "wakeup: fd_released_done=%d", fd_released_done); } gpr_mu_unlock(g_mu); GPR_ASSERT(fd_released_done == 1); GPR_ASSERT(fd == sv[1]); + grpc_exec_ctx_finish(&exec_ctx); written_bytes = fill_socket_partial(sv[0], num_bytes); drain_socket_blocking(fd, written_bytes, written_bytes); @@ -507,7 +522,7 @@ static grpc_endpoint_test_fixture create_fixture_tcp_socketpair( size_t slice_size) { int sv[2]; grpc_endpoint_test_fixture f; - grpc_core::ExecCtx exec_ctx; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; create_sockets(sv); grpc_resource_quota* resource_quota = @@ -517,13 +532,15 @@ static grpc_endpoint_test_fixture create_fixture_tcp_socketpair( a[0].type = GRPC_ARG_INTEGER; a[0].value.integer = (int)slice_size; grpc_channel_args args = {GPR_ARRAY_SIZE(a), a}; - f.client_ep = - grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client"), &args, "test"); - f.server_ep = - grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server"), &args, "test"); - grpc_resource_quota_unref_internal(resource_quota); - grpc_endpoint_add_to_pollset(f.client_ep, g_pollset); - grpc_endpoint_add_to_pollset(f.server_ep, g_pollset); + f.client_ep = grpc_tcp_create( + &exec_ctx, grpc_fd_create(sv[0], "fixture:client"), &args, "test"); + f.server_ep = grpc_tcp_create( + &exec_ctx, grpc_fd_create(sv[1], "fixture:server"), &args, "test"); + grpc_resource_quota_unref_internal(&exec_ctx, resource_quota); + grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, g_pollset); + grpc_endpoint_add_to_pollset(&exec_ctx, f.server_ep, g_pollset); + + grpc_exec_ctx_finish(&exec_ctx); return f; } @@ -532,26 +549,24 @@ static grpc_endpoint_test_config configs[] = { {"tcp/tcp_socketpair", create_fixture_tcp_socketpair, clean_up}, }; -static void destroy_pollset(void* p, grpc_error* error) { - grpc_pollset_destroy((grpc_pollset*)p); +static void destroy_pollset(grpc_exec_ctx* exec_ctx, void* p, + grpc_error* error) { + grpc_pollset_destroy(exec_ctx, (grpc_pollset*)p); } int main(int argc, char** argv) { grpc_closure destroyed; + grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; grpc_test_init(argc, argv); grpc_init(); - { - grpc_core::ExecCtx exec_ctx; - g_pollset = (grpc_pollset*)gpr_zalloc(grpc_pollset_size()); - grpc_pollset_init(g_pollset, &g_mu); - grpc_endpoint_tests(configs[0], g_pollset, g_mu); - run_tests(); - GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset, - grpc_schedule_on_exec_ctx); - grpc_pollset_shutdown(g_pollset, &destroyed); - - grpc_core::ExecCtx::Get()->Flush(); - } + g_pollset = (grpc_pollset*)gpr_zalloc(grpc_pollset_size()); + grpc_pollset_init(g_pollset, &g_mu); + grpc_endpoint_tests(configs[0], g_pollset, g_mu); + run_tests(); + GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset, + grpc_schedule_on_exec_ctx); + grpc_pollset_shutdown(&exec_ctx, g_pollset, &destroyed); + grpc_exec_ctx_finish(&exec_ctx); grpc_shutdown(); gpr_free(g_pollset); |