aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/core/iomgr/tcp_posix_test.c
diff options
context:
space:
mode:
Diffstat (limited to 'test/core/iomgr/tcp_posix_test.c')
-rw-r--r--test/core/iomgr/tcp_posix_test.c84
1 files changed, 52 insertions, 32 deletions
diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c
index d998958744..5eafa570bb 100644
--- a/test/core/iomgr/tcp_posix_test.c
+++ b/test/core/iomgr/tcp_posix_test.c
@@ -124,22 +124,22 @@ struct read_socket_state {
grpc_endpoint *ep;
size_t read_bytes;
size_t target_read_bytes;
- gpr_slice_buffer incoming;
+ grpc_slice_buffer incoming;
grpc_closure read_cb;
};
-static size_t count_slices(gpr_slice *slices, size_t nslices,
+static size_t count_slices(grpc_slice *slices, size_t nslices,
int *current_data) {
size_t num_bytes = 0;
unsigned i, j;
unsigned char *buf;
for (i = 0; i < nslices; ++i) {
- buf = GPR_SLICE_START_PTR(slices[i]);
- for (j = 0; j < GPR_SLICE_LENGTH(slices[i]); ++j) {
+ buf = GRPC_SLICE_START_PTR(slices[i]);
+ for (j = 0; j < GRPC_SLICE_LENGTH(slices[i]); ++j) {
GPR_ASSERT(buf[j] == *current_data);
*current_data = (*current_data + 1) % 256;
}
- num_bytes += GPR_SLICE_LENGTH(slices[i]);
+ num_bytes += GRPC_SLICE_LENGTH(slices[i]);
}
return num_bytes;
}
@@ -181,7 +181,10 @@ static void read_test(size_t num_bytes, size_t slice_size) {
create_sockets(sv);
- ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), slice_size, "test");
+ grpc_resource_quota *resource_quota = grpc_resource_quota_create("read_test");
+ ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), resource_quota,
+ slice_size, "test");
+ grpc_resource_quota_internal_unref(&exec_ctx, resource_quota);
grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset);
written_bytes = fill_socket_partial(sv[0], num_bytes);
@@ -190,7 +193,7 @@ static void read_test(size_t num_bytes, size_t slice_size) {
state.ep = ep;
state.read_bytes = 0;
state.target_read_bytes = written_bytes;
- gpr_slice_buffer_init(&state.incoming);
+ grpc_slice_buffer_init(&state.incoming);
grpc_closure_init(&state.read_cb, read_cb, &state);
grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb);
@@ -209,7 +212,7 @@ static void read_test(size_t num_bytes, size_t slice_size) {
GPR_ASSERT(state.read_bytes == state.target_read_bytes);
gpr_mu_unlock(g_mu);
- gpr_slice_buffer_destroy(&state.incoming);
+ grpc_slice_buffer_destroy(&state.incoming);
grpc_endpoint_destroy(&exec_ctx, ep);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -228,8 +231,11 @@ static void large_read_test(size_t slice_size) {
create_sockets(sv);
- ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test"), slice_size,
- "test");
+ grpc_resource_quota *resource_quota =
+ grpc_resource_quota_create("large_read_test");
+ ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test"), resource_quota,
+ slice_size, "test");
+ grpc_resource_quota_internal_unref(&exec_ctx, resource_quota);
grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset);
written_bytes = fill_socket(sv[0]);
@@ -238,7 +244,7 @@ static void large_read_test(size_t slice_size) {
state.ep = ep;
state.read_bytes = 0;
state.target_read_bytes = (size_t)written_bytes;
- gpr_slice_buffer_init(&state.incoming);
+ grpc_slice_buffer_init(&state.incoming);
grpc_closure_init(&state.read_cb, read_cb, &state);
grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb);
@@ -257,7 +263,7 @@ static void large_read_test(size_t slice_size) {
GPR_ASSERT(state.read_bytes == state.target_read_bytes);
gpr_mu_unlock(g_mu);
- gpr_slice_buffer_destroy(&state.incoming);
+ grpc_slice_buffer_destroy(&state.incoming);
grpc_endpoint_destroy(&exec_ctx, ep);
grpc_exec_ctx_finish(&exec_ctx);
}
@@ -267,21 +273,21 @@ struct write_socket_state {
int write_done;
};
-static gpr_slice *allocate_blocks(size_t num_bytes, size_t slice_size,
- size_t *num_blocks, uint8_t *current_data) {
+static grpc_slice *allocate_blocks(size_t num_bytes, size_t slice_size,
+ size_t *num_blocks, uint8_t *current_data) {
size_t nslices = num_bytes / slice_size + (num_bytes % slice_size ? 1u : 0u);
- gpr_slice *slices = gpr_malloc(sizeof(gpr_slice) * nslices);
+ grpc_slice *slices = gpr_malloc(sizeof(grpc_slice) * nslices);
size_t num_bytes_left = num_bytes;
unsigned i, j;
unsigned char *buf;
*num_blocks = nslices;
for (i = 0; i < nslices; ++i) {
- slices[i] = gpr_slice_malloc(slice_size > num_bytes_left ? num_bytes_left
- : slice_size);
- num_bytes_left -= GPR_SLICE_LENGTH(slices[i]);
- buf = GPR_SLICE_START_PTR(slices[i]);
- for (j = 0; j < GPR_SLICE_LENGTH(slices[i]); ++j) {
+ slices[i] = grpc_slice_malloc(slice_size > num_bytes_left ? num_bytes_left
+ : slice_size);
+ num_bytes_left -= GRPC_SLICE_LENGTH(slices[i]);
+ buf = GRPC_SLICE_START_PTR(slices[i]);
+ for (j = 0; j < GRPC_SLICE_LENGTH(slices[i]); ++j) {
buf[j] = *current_data;
(*current_data)++;
}
@@ -351,9 +357,9 @@ static void write_test(size_t num_bytes, size_t slice_size) {
grpc_endpoint *ep;
struct write_socket_state state;
size_t num_blocks;
- gpr_slice *slices;
+ grpc_slice *slices;
uint8_t current_data = 0;
- gpr_slice_buffer outgoing;
+ grpc_slice_buffer outgoing;
grpc_closure write_done_closure;
gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@@ -364,8 +370,11 @@ static void write_test(size_t num_bytes, size_t slice_size) {
create_sockets(sv);
- ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test"),
+ grpc_resource_quota *resource_quota =
+ grpc_resource_quota_create("write_test");
+ ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test"), resource_quota,
GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "test");
+ grpc_resource_quota_internal_unref(&exec_ctx, resource_quota);
grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset);
state.ep = ep;
@@ -373,8 +382,8 @@ static void write_test(size_t num_bytes, size_t slice_size) {
slices = allocate_blocks(num_bytes, slice_size, &num_blocks, &current_data);
- gpr_slice_buffer_init(&outgoing);
- gpr_slice_buffer_addn(&outgoing, slices, num_blocks);
+ grpc_slice_buffer_init(&outgoing);
+ grpc_slice_buffer_addn(&outgoing, slices, num_blocks);
grpc_closure_init(&write_done_closure, write_done, &state);
grpc_endpoint_write(&exec_ctx, ep, &outgoing, &write_done_closure);
@@ -395,7 +404,7 @@ static void write_test(size_t num_bytes, size_t slice_size) {
}
gpr_mu_unlock(g_mu);
- gpr_slice_buffer_destroy(&outgoing);
+ grpc_slice_buffer_destroy(&outgoing);
grpc_endpoint_destroy(&exec_ctx, ep);
gpr_free(slices);
grpc_exec_ctx_finish(&exec_ctx);
@@ -428,8 +437,12 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) {
create_sockets(sv);
- ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), slice_size, "test");
+ grpc_resource_quota *resource_quota =
+ grpc_resource_quota_create("release_fd_test");
+ ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), resource_quota,
+ slice_size, "test");
GPR_ASSERT(grpc_tcp_fd(ep) == sv[1] && sv[1] >= 0);
+ grpc_resource_quota_internal_unref(&exec_ctx, resource_quota);
grpc_endpoint_add_to_pollset(&exec_ctx, ep, g_pollset);
written_bytes = fill_socket_partial(sv[0], num_bytes);
@@ -438,7 +451,7 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) {
state.ep = ep;
state.read_bytes = 0;
state.target_read_bytes = written_bytes;
- gpr_slice_buffer_init(&state.incoming);
+ grpc_slice_buffer_init(&state.incoming);
grpc_closure_init(&state.read_cb, read_cb, &state);
grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb);
@@ -450,15 +463,18 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) {
"pollset_work",
grpc_pollset_work(&exec_ctx, g_pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC), deadline)));
+ gpr_log(GPR_DEBUG, "wakeup: read=%" PRIdPTR " target=%" PRIdPTR,
+ state.read_bytes, state.target_read_bytes);
gpr_mu_unlock(g_mu);
- grpc_exec_ctx_finish(&exec_ctx);
+ grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(g_mu);
}
GPR_ASSERT(state.read_bytes == state.target_read_bytes);
gpr_mu_unlock(g_mu);
- gpr_slice_buffer_destroy(&state.incoming);
+ grpc_slice_buffer_destroy(&state.incoming);
grpc_tcp_destroy_and_release_fd(&exec_ctx, ep, &fd, &fd_released_cb);
+ grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(g_mu);
while (!fd_released_done) {
grpc_pollset_worker *worker = NULL;
@@ -466,6 +482,7 @@ static void release_fd_test(size_t num_bytes, size_t slice_size) {
"pollset_work",
grpc_pollset_work(&exec_ctx, g_pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC), deadline)));
+ gpr_log(GPR_DEBUG, "wakeup: fd_released_done=%d", fd_released_done);
}
gpr_mu_unlock(g_mu);
GPR_ASSERT(fd_released_done == 1);
@@ -511,10 +528,13 @@ static grpc_endpoint_test_fixture create_fixture_tcp_socketpair(
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
create_sockets(sv);
+ grpc_resource_quota *resource_quota =
+ grpc_resource_quota_create("tcp_posix_test_socketpair");
f.client_ep = grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client"),
- slice_size, "test");
+ resource_quota, slice_size, "test");
f.server_ep = grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server"),
- slice_size, "test");
+ resource_quota, slice_size, "test");
+ grpc_resource_quota_internal_unref(&exec_ctx, resource_quota);
grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, g_pollset);
grpc_endpoint_add_to_pollset(&exec_ctx, f.server_ep, g_pollset);