diff options
Diffstat (limited to 'test/core/iomgr/tcp_posix_test.c')
-rw-r--r-- | test/core/iomgr/tcp_posix_test.c | 84 |
1 files changed, 52 insertions, 32 deletions
diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c index d998958744..5eafa570bb 100644 --- a/test/core/iomgr/tcp_posix_test.c +++ b/test/core/iomgr/tcp_posix_test.c @@ -124,22 +124,22 @@ struct read_socket_state { grpc_endpoint *ep; size_t read_bytes; size_t target_read_bytes; - gpr_slice_buffer incoming; + grpc_slice_buffer incoming; grpc_closure read_cb; }; -static size_t count_slices(gpr_slice *slices, size_t nslices, +static size_t count_slices(grpc_slice *slices, size_t nslices, int *current_data) { size_t num_bytes = 0; unsigned i, j; unsigned char *buf; for (i = 0; i < nslices; ++i) { - buf = GPR_SLICE_START_PTR(slices[i]); - for (j = 0; j < GPR_SLICE_LENGTH(slices[i]); ++j) { + buf = GRPC_SLICE_START_PTR(slices[i]); + for (j = 0; j < GRPC_SLICE_LENGTH(slices[i]); ++j) { GPR_ASSERT(buf[j] == *current_data); *current_data = (*current_data + 1) % 256; } - num_bytes += GPR_SLICE_LENGTH(slices[i]); + num_bytes += GRPC_SLICE_LENGTH(slices[i]); } return num_bytes; } @@ -181,7 +181,10 @@ static void read_test(size_t num_bytes, size_t slice_size) { create_sockets(sv); - ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), slice_size, "test"); + grpc_resource_quota *resource_quota = grpc_resource_quota_create("read_test"); + ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), resource_quota, + slice_size, "test"); + grpc_resource_quota_internal_unref(&exec_ctx, resource_quota); grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset); written_bytes = fill_socket_partial(sv[0], num_bytes); @@ -190,7 +193,7 @@ static void read_test(size_t num_bytes, size_t slice_size) { state.ep = ep; state.read_bytes = 0; state.target_read_bytes = written_bytes; - gpr_slice_buffer_init(&state.incoming); + grpc_slice_buffer_init(&state.incoming); grpc_closure_init(&state.read_cb, read_cb, &state); grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb); @@ -209,7 +212,7 @@ static void read_test(size_t num_bytes, size_t slice_size) { GPR_ASSERT(state.read_bytes == state.target_read_bytes); gpr_mu_unlock(g_mu); - gpr_slice_buffer_destroy(&state.incoming); + grpc_slice_buffer_destroy(&state.incoming); grpc_endpoint_destroy(&exec_ctx, ep); grpc_exec_ctx_finish(&exec_ctx); } @@ -228,8 +231,11 @@ static void large_read_test(size_t slice_size) { create_sockets(sv); - ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test"), slice_size, - "test"); + grpc_resource_quota *resource_quota = + grpc_resource_quota_create("large_read_test"); + ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test"), resource_quota, + slice_size, "test"); + grpc_resource_quota_internal_unref(&exec_ctx, resource_quota); grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset); written_bytes = fill_socket(sv[0]); @@ -238,7 +244,7 @@ static void large_read_test(size_t slice_size) { state.ep = ep; state.read_bytes = 0; state.target_read_bytes = (size_t)written_bytes; - gpr_slice_buffer_init(&state.incoming); + grpc_slice_buffer_init(&state.incoming); grpc_closure_init(&state.read_cb, read_cb, &state); grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb); @@ -257,7 +263,7 @@ static void large_read_test(size_t slice_size) { GPR_ASSERT(state.read_bytes == state.target_read_bytes); gpr_mu_unlock(g_mu); - gpr_slice_buffer_destroy(&state.incoming); + grpc_slice_buffer_destroy(&state.incoming); grpc_endpoint_destroy(&exec_ctx, ep); grpc_exec_ctx_finish(&exec_ctx); } @@ -267,21 +273,21 @@ struct write_socket_state { int write_done; }; -static gpr_slice *allocate_blocks(size_t num_bytes, size_t slice_size, - size_t *num_blocks, uint8_t *current_data) { +static grpc_slice *allocate_blocks(size_t num_bytes, size_t slice_size, + size_t *num_blocks, uint8_t *current_data) { size_t nslices = num_bytes / slice_size + (num_bytes % slice_size ? 1u : 0u); - gpr_slice *slices = gpr_malloc(sizeof(gpr_slice) * nslices); + grpc_slice *slices = gpr_malloc(sizeof(grpc_slice) * nslices); size_t num_bytes_left = num_bytes; unsigned i, j; unsigned char *buf; *num_blocks = nslices; for (i = 0; i < nslices; ++i) { - slices[i] = gpr_slice_malloc(slice_size > num_bytes_left ? num_bytes_left - : slice_size); - num_bytes_left -= GPR_SLICE_LENGTH(slices[i]); - buf = GPR_SLICE_START_PTR(slices[i]); - for (j = 0; j < GPR_SLICE_LENGTH(slices[i]); ++j) { + slices[i] = grpc_slice_malloc(slice_size > num_bytes_left ? num_bytes_left + : slice_size); + num_bytes_left -= GRPC_SLICE_LENGTH(slices[i]); + buf = GRPC_SLICE_START_PTR(slices[i]); + for (j = 0; j < GRPC_SLICE_LENGTH(slices[i]); ++j) { buf[j] = *current_data; (*current_data)++; } @@ -351,9 +357,9 @@ static void write_test(size_t num_bytes, size_t slice_size) { grpc_endpoint *ep; struct write_socket_state state; size_t num_blocks; - gpr_slice *slices; + grpc_slice *slices; uint8_t current_data = 0; - gpr_slice_buffer outgoing; + grpc_slice_buffer outgoing; grpc_closure write_done_closure; gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20); grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; @@ -364,8 +370,11 @@ static void write_test(size_t num_bytes, size_t slice_size) { create_sockets(sv); - ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test"), + grpc_resource_quota *resource_quota = + grpc_resource_quota_create("write_test"); + ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test"), resource_quota, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "test"); + grpc_resource_quota_internal_unref(&exec_ctx, resource_quota); grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset); state.ep = ep; @@ -373,8 +382,8 @@ static void write_test(size_t num_bytes, size_t slice_size) { slices = allocate_blocks(num_bytes, slice_size, &num_blocks, ¤t_data); - gpr_slice_buffer_init(&outgoing); - gpr_slice_buffer_addn(&outgoing, slices, num_blocks); + grpc_slice_buffer_init(&outgoing); + grpc_slice_buffer_addn(&outgoing, slices, num_blocks); grpc_closure_init(&write_done_closure, write_done, &state); grpc_endpoint_write(&exec_ctx, ep, &outgoing, &write_done_closure); @@ -395,7 +404,7 @@ static void write_test(size_t num_bytes, size_t slice_size) { } gpr_mu_unlock(g_mu); - gpr_slice_buffer_destroy(&outgoing); + grpc_slice_buffer_destroy(&outgoing); grpc_endpoint_destroy(&exec_ctx, ep); gpr_free(slices); grpc_exec_ctx_finish(&exec_ctx); @@ -428,8 +437,12 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { create_sockets(sv); - ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), slice_size, "test"); + grpc_resource_quota *resource_quota = + grpc_resource_quota_create("release_fd_test"); + ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), resource_quota, + slice_size, "test"); GPR_ASSERT(grpc_tcp_fd(ep) == sv[1] && sv[1] >= 0); + grpc_resource_quota_internal_unref(&exec_ctx, resource_quota); grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset); written_bytes = fill_socket_partial(sv[0], num_bytes); @@ -438,7 +451,7 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { state.ep = ep; state.read_bytes = 0; state.target_read_bytes = written_bytes; - gpr_slice_buffer_init(&state.incoming); + grpc_slice_buffer_init(&state.incoming); grpc_closure_init(&state.read_cb, read_cb, &state); grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb); @@ -450,15 +463,18 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { "pollset_work", grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), deadline))); + gpr_log(GPR_DEBUG, "wakeup: read=%" PRIdPTR " target=%" PRIdPTR, + state.read_bytes, state.target_read_bytes); gpr_mu_unlock(g_mu); - grpc_exec_ctx_finish(&exec_ctx); + grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(g_mu); } GPR_ASSERT(state.read_bytes == state.target_read_bytes); gpr_mu_unlock(g_mu); - gpr_slice_buffer_destroy(&state.incoming); + grpc_slice_buffer_destroy(&state.incoming); grpc_tcp_destroy_and_release_fd(&exec_ctx, ep, &fd, &fd_released_cb); + grpc_exec_ctx_flush(&exec_ctx); gpr_mu_lock(g_mu); while (!fd_released_done) { grpc_pollset_worker *worker = NULL; @@ -466,6 +482,7 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) { "pollset_work", grpc_pollset_work(&exec_ctx, g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), deadline))); + gpr_log(GPR_DEBUG, "wakeup: fd_released_done=%d", fd_released_done); } gpr_mu_unlock(g_mu); GPR_ASSERT(fd_released_done == 1); @@ -511,10 +528,13 @@ static grpc_endpoint_test_fixture create_fixture_tcp_socketpair( grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT; create_sockets(sv); + grpc_resource_quota *resource_quota = + grpc_resource_quota_create("tcp_posix_test_socketpair"); f.client_ep = grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client"), - slice_size, "test"); + resource_quota, slice_size, "test"); f.server_ep = grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server"), - slice_size, "test"); + resource_quota, slice_size, "test"); + grpc_resource_quota_internal_unref(&exec_ctx, resource_quota); grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, g_pollset); grpc_endpoint_add_to_pollset(&exec_ctx, f.server_ep, g_pollset); |