aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/core/iomgr/tcp_posix_test.c
diff options
context:
space:
mode:
Diffstat (limited to 'test/core/iomgr/tcp_posix_test.c')
-rw-r--r--test/core/iomgr/tcp_posix_test.c148
1 files changed, 66 insertions, 82 deletions
diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c
index 6ad832231f..8acaa433bb 100644
--- a/test/core/iomgr/tcp_posix_test.c
+++ b/test/core/iomgr/tcp_posix_test.c
@@ -118,10 +118,12 @@ struct read_socket_state {
grpc_endpoint *ep;
ssize_t read_bytes;
ssize_t target_read_bytes;
+ gpr_slice_buffer incoming;
+ grpc_iomgr_closure read_cb;
};
-static ssize_t count_and_unref_slices(gpr_slice *slices, size_t nslices,
- int *current_data) {
+static ssize_t count_slices(gpr_slice *slices, size_t nslices,
+ int *current_data) {
ssize_t num_bytes = 0;
unsigned i, j;
unsigned char *buf;
@@ -132,31 +134,41 @@ static ssize_t count_and_unref_slices(gpr_slice *slices, size_t nslices,
*current_data = (*current_data + 1) % 256;
}
num_bytes += GPR_SLICE_LENGTH(slices[i]);
- gpr_slice_unref(slices[i]);
}
return num_bytes;
}
-static void read_cb(void *user_data, gpr_slice *slices, size_t nslices,
- grpc_endpoint_cb_status error) {
+static void read_cb(void *user_data, int success) {
struct read_socket_state *state = (struct read_socket_state *)user_data;
ssize_t read_bytes;
int current_data;
- GPR_ASSERT(error == GRPC_ENDPOINT_CB_OK);
+ GPR_ASSERT(success);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
current_data = state->read_bytes % 256;
- read_bytes = count_and_unref_slices(slices, nslices, &current_data);
+ read_bytes = count_slices(state->incoming.slices, state->incoming.count,
+ &current_data);
state->read_bytes += read_bytes;
gpr_log(GPR_INFO, "Read %d bytes of %d", read_bytes,
state->target_read_bytes);
if (state->read_bytes >= state->target_read_bytes) {
- /* empty */
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
} else {
- grpc_endpoint_notify_on_read(state->ep, read_cb, state);
+ switch (grpc_endpoint_read(state->ep, &state->incoming, &state->read_cb)) {
+ case GRPC_ENDPOINT_DONE:
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ read_cb(user_data, 1);
+ break;
+ case GRPC_ENDPOINT_ERROR:
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ read_cb(user_data, 0);
+ break;
+ case GRPC_ENDPOINT_PENDING:
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ break;
+ }
}
- gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
/* Write to a socket, then read from it using the grpc_tcp API. */
@@ -181,8 +193,19 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) {
state.ep = ep;
state.read_bytes = 0;
state.target_read_bytes = written_bytes;
+ gpr_slice_buffer_init(&state.incoming);
+ grpc_iomgr_closure_init(&state.read_cb, read_cb, &state);
- grpc_endpoint_notify_on_read(ep, read_cb, &state);
+ switch (grpc_endpoint_read(ep, &state.incoming, &state.read_cb)) {
+ case GRPC_ENDPOINT_DONE:
+ read_cb(&state, 1);
+ break;
+ case GRPC_ENDPOINT_ERROR:
+ read_cb(&state, 0);
+ break;
+ case GRPC_ENDPOINT_PENDING:
+ break;
+ }
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (state.read_bytes < state.target_read_bytes) {
@@ -193,6 +216,7 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) {
GPR_ASSERT(state.read_bytes == state.target_read_bytes);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ gpr_slice_buffer_destroy(&state.incoming);
grpc_endpoint_destroy(ep);
}
@@ -219,8 +243,19 @@ static void large_read_test(ssize_t slice_size) {
state.ep = ep;
state.read_bytes = 0;
state.target_read_bytes = written_bytes;
+ gpr_slice_buffer_init(&state.incoming);
+ grpc_iomgr_closure_init(&state.read_cb, read_cb, &state);
- grpc_endpoint_notify_on_read(ep, read_cb, &state);
+ switch (grpc_endpoint_read(ep, &state.incoming, &state.read_cb)) {
+ case GRPC_ENDPOINT_DONE:
+ read_cb(&state, 1);
+ break;
+ case GRPC_ENDPOINT_ERROR:
+ read_cb(&state, 0);
+ break;
+ case GRPC_ENDPOINT_PENDING:
+ break;
+ }
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (state.read_bytes < state.target_read_bytes) {
@@ -231,6 +266,7 @@ static void large_read_test(ssize_t slice_size) {
GPR_ASSERT(state.read_bytes == state.target_read_bytes);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ gpr_slice_buffer_destroy(&state.incoming);
grpc_endpoint_destroy(ep);
}
@@ -262,8 +298,7 @@ static gpr_slice *allocate_blocks(ssize_t num_bytes, ssize_t slice_size,
return slices;
}
-static void write_done(void *user_data /* write_socket_state */,
- grpc_endpoint_cb_status error) {
+static void write_done(void *user_data /* write_socket_state */, int success) {
struct write_socket_state *state = (struct write_socket_state *)user_data;
gpr_log(GPR_INFO, "Write done callback called");
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
@@ -339,6 +374,8 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) {
size_t num_blocks;
gpr_slice *slices;
int current_data = 0;
+ gpr_slice_buffer outgoing;
+ grpc_iomgr_closure write_done_closure;
gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
gpr_log(GPR_INFO, "Start write test with %d bytes, slice size %d", num_bytes,
@@ -355,74 +392,21 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) {
slices = allocate_blocks(num_bytes, slice_size, &num_blocks, &current_data);
- if (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state) ==
- GRPC_ENDPOINT_WRITE_DONE) {
- /* Write completed immediately */
- read_bytes = drain_socket(sv[0]);
- GPR_ASSERT(read_bytes == num_bytes);
- } else {
- drain_socket_blocking(sv[0], num_bytes, num_bytes);
- gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
- for (;;) {
- grpc_pollset_worker worker;
- if (state.write_done) {
- break;
- }
- grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
- deadline);
- }
- gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
- }
-
- grpc_endpoint_destroy(ep);
- gpr_free(slices);
-}
-
-static void read_done_for_write_error(void *ud, gpr_slice *slices,
- size_t nslices,
- grpc_endpoint_cb_status error) {
- GPR_ASSERT(error != GRPC_ENDPOINT_CB_OK);
- GPR_ASSERT(nslices == 0);
-}
-
-/* Write to a socket using the grpc_tcp API, then drain it directly.
- Note that if the write does not complete immediately we need to drain the
- socket in parallel with the read. */
-static void write_error_test(ssize_t num_bytes, ssize_t slice_size) {
- int sv[2];
- grpc_endpoint *ep;
- struct write_socket_state state;
- size_t num_blocks;
- gpr_slice *slices;
- int current_data = 0;
- grpc_pollset_worker worker;
- gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
-
- gpr_log(GPR_INFO, "Start write error test with %d bytes, slice size %d",
- num_bytes, slice_size);
-
- create_sockets(sv);
+ gpr_slice_buffer_init(&outgoing);
+ gpr_slice_buffer_addn(&outgoing, slices, num_blocks);
+ grpc_iomgr_closure_init(&write_done_closure, write_done, &state);
- ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_error_test"),
- GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "test");
- grpc_endpoint_add_to_pollset(ep, &g_pollset);
-
- close(sv[0]);
-
- state.ep = ep;
- state.write_done = 0;
-
- slices = allocate_blocks(num_bytes, slice_size, &num_blocks, &current_data);
-
- switch (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state)) {
- case GRPC_ENDPOINT_WRITE_DONE:
- case GRPC_ENDPOINT_WRITE_ERROR:
+ switch (grpc_endpoint_write(ep, &outgoing, &write_done_closure)) {
+ case GRPC_ENDPOINT_DONE:
/* Write completed immediately */
+ read_bytes = drain_socket(sv[0]);
+ GPR_ASSERT(read_bytes == num_bytes);
break;
- case GRPC_ENDPOINT_WRITE_PENDING:
- grpc_endpoint_notify_on_read(ep, read_done_for_write_error, NULL);
+ case GRPC_ENDPOINT_PENDING:
+ drain_socket_blocking(sv[0], num_bytes, num_bytes);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
for (;;) {
+ grpc_pollset_worker worker;
if (state.write_done) {
break;
}
@@ -431,10 +415,14 @@ static void write_error_test(ssize_t num_bytes, ssize_t slice_size) {
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
break;
+ case GRPC_ENDPOINT_ERROR:
+ gpr_log(GPR_ERROR, "endpoint got error");
+ abort();
}
+ gpr_slice_buffer_destroy(&outgoing);
grpc_endpoint_destroy(ep);
- free(slices);
+ gpr_free(slices);
}
void run_tests(void) {
@@ -454,10 +442,6 @@ void run_tests(void) {
write_test(100000, 137);
for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) {
- write_error_test(40320, i);
- }
-
- for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) {
write_test(40320, i);
}
}