Diffstat (limited to 'test/core/iomgr')
-rw-r--r--  test/core/iomgr/endpoint_tests.c  | 204
-rw-r--r--  test/core/iomgr/tcp_posix_test.c  | 148
2 files changed, 174 insertions(+), 178 deletions(-)
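
This patch moves both test files off the slice-buffer/closure endpoint API
(grpc_endpoint_read()/grpc_endpoint_write() taking a gpr_slice_buffer and a
grpc_iomgr_closure) and onto the callback-style endpoint API. The shapes below
are a sketch reconstructed from the call sites in this diff, not copied from
the real endpoint header, so exact declarations and enumerator values are
assumptions:

/* Completion statuses as used by the handlers in this diff. */
typedef enum { GRPC_ENDPOINT_CB_OK = 0, GRPC_ENDPOINT_CB_SHUTDOWN,
               GRPC_ENDPOINT_CB_ERROR } grpc_endpoint_cb_status;
typedef enum { GRPC_ENDPOINT_WRITE_DONE, GRPC_ENDPOINT_WRITE_PENDING,
               GRPC_ENDPOINT_WRITE_ERROR } grpc_endpoint_write_status;

/* Read: the endpoint hands slice ownership to the callback, which must
   gpr_slice_unref() every slice it receives. */
typedef void (*grpc_endpoint_read_cb)(void *user_data, gpr_slice *slices,
                                      size_t nslices,
                                      grpc_endpoint_cb_status error);
void grpc_endpoint_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
                                  void *user_data);

/* Write: may finish synchronously (WRITE_DONE, callback not invoked) or stay
   pending (WRITE_PENDING, callback invoked later with the final status). */
typedef void (*grpc_endpoint_write_cb)(void *user_data,
                                       grpc_endpoint_cb_status error);
grpc_endpoint_write_status grpc_endpoint_write(grpc_endpoint *ep,
                                               gpr_slice *slices,
                                               size_t nslices,
                                               grpc_endpoint_write_cb cb,
                                               void *user_data);
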
diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c
index ef673747a1..6ef8e9ca3b 100644
--- a/test/core/iomgr/endpoint_tests.c
+++ b/test/core/iomgr/endpoint_tests.c
@@ -59,7 +59,8 @@
static grpc_pollset *g_pollset;
-size_t count_slices(gpr_slice *slices, size_t nslices, int *current_data) {
+size_t count_and_unref_slices(gpr_slice *slices, size_t nslices,
+ int *current_data) {
size_t num_bytes = 0;
size_t i;
size_t j;
@@ -71,6 +72,7 @@ size_t count_slices(gpr_slice *slices, size_t nslices, int *current_data) {
*current_data = (*current_data + 1) % 256;
}
num_bytes += GPR_SLICE_LENGTH(slices[i]);
+ gpr_slice_unref(slices[i]);
}
return num_bytes;
}
@@ -119,76 +121,86 @@ struct read_and_write_test_state {
int current_write_data;
int read_done;
int write_done;
- gpr_slice_buffer incoming;
- gpr_slice_buffer outgoing;
- grpc_iomgr_closure done_read;
- grpc_iomgr_closure done_write;
};
-static void read_and_write_test_read_handler(void *data, int success) {
+static void read_and_write_test_read_handler(void *data, gpr_slice *slices,
+ size_t nslices,
+ grpc_endpoint_cb_status error) {
struct read_and_write_test_state *state = data;
+ GPR_ASSERT(error != GRPC_ENDPOINT_CB_ERROR);
+ if (error == GRPC_ENDPOINT_CB_SHUTDOWN) {
+ gpr_log(GPR_INFO, "Read handler shutdown");
+ gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
+ state->read_done = 1;
+ grpc_pollset_kick(g_pollset, NULL);
+ gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
+ return;
+ }
- state->bytes_read += count_slices(
- state->incoming.slices, state->incoming.count, &state->current_read_data);
- if (state->bytes_read == state->target_bytes || !success) {
+ state->bytes_read +=
+ count_and_unref_slices(slices, nslices, &state->current_read_data);
+ if (state->bytes_read == state->target_bytes) {
gpr_log(GPR_INFO, "Read handler done");
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
- state->read_done = 1 + success;
+ state->read_done = 1;
grpc_pollset_kick(g_pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
- } else if (success) {
- switch (grpc_endpoint_read(state->read_ep, &state->incoming,
- &state->done_read)) {
- case GRPC_ENDPOINT_ERROR:
- read_and_write_test_read_handler(data, 0);
- break;
- case GRPC_ENDPOINT_DONE:
- read_and_write_test_read_handler(data, 1);
- break;
- case GRPC_ENDPOINT_PENDING:
- break;
- }
+ } else {
+ grpc_endpoint_notify_on_read(state->read_ep,
+ read_and_write_test_read_handler, data);
}
}
-static void read_and_write_test_write_handler(void *data, int success) {
+static void read_and_write_test_write_handler(void *data,
+ grpc_endpoint_cb_status error) {
struct read_and_write_test_state *state = data;
gpr_slice *slices = NULL;
size_t nslices;
- grpc_endpoint_op_status write_status;
-
- if (success) {
- for (;;) {
- /* Need to do inline writes until they don't succeed synchronously or we
- finish writing */
- state->bytes_written += state->current_write_size;
- if (state->target_bytes - state->bytes_written <
- state->current_write_size) {
- state->current_write_size = state->target_bytes - state->bytes_written;
- }
- if (state->current_write_size == 0) {
- break;
- }
-
- slices = allocate_blocks(state->current_write_size, 8192, &nslices,
- &state->current_write_data);
- gpr_slice_buffer_reset_and_unref(&state->outgoing);
- gpr_slice_buffer_addn(&state->outgoing, slices, nslices);
- write_status = grpc_endpoint_write(state->write_ep, &state->outgoing,
- &state->done_write);
- gpr_log(GPR_DEBUG, "write_status=%d", write_status);
- GPR_ASSERT(write_status != GRPC_ENDPOINT_ERROR);
- free(slices);
- if (write_status == GRPC_ENDPOINT_PENDING) {
- return;
- }
+ grpc_endpoint_write_status write_status;
+
+ GPR_ASSERT(error != GRPC_ENDPOINT_CB_ERROR);
+
+ gpr_log(GPR_DEBUG, "%s: error=%d", "read_and_write_test_write_handler",
+ error);
+
+ if (error == GRPC_ENDPOINT_CB_SHUTDOWN) {
+ gpr_log(GPR_INFO, "Write handler shutdown");
+ gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
+ state->write_done = 1;
+ grpc_pollset_kick(g_pollset, NULL);
+ gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
+ return;
+ }
+
+ for (;;) {
+ /* Need to do inline writes until they don't succeed synchronously or we
+ finish writing */
+ state->bytes_written += state->current_write_size;
+ if (state->target_bytes - state->bytes_written <
+ state->current_write_size) {
+ state->current_write_size = state->target_bytes - state->bytes_written;
+ }
+ if (state->current_write_size == 0) {
+ break;
+ }
+
+ slices = allocate_blocks(state->current_write_size, 8192, &nslices,
+ &state->current_write_data);
+ write_status =
+ grpc_endpoint_write(state->write_ep, slices, nslices,
+ read_and_write_test_write_handler, state);
+ gpr_log(GPR_DEBUG, "write_status=%d", write_status);
+ GPR_ASSERT(write_status != GRPC_ENDPOINT_WRITE_ERROR);
+ free(slices);
+ if (write_status == GRPC_ENDPOINT_WRITE_PENDING) {
+ return;
}
- GPR_ASSERT(state->bytes_written == state->target_bytes);
}
+ GPR_ASSERT(state->bytes_written == state->target_bytes);
gpr_log(GPR_INFO, "Write handler done");
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
- state->write_done = 1 + success;
+ state->write_done = 1;
grpc_pollset_kick(g_pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
}
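
The inline-write loop in the handler above relies on the synchronous-completion
convention inferred from this patch: a GRPC_ENDPOINT_WRITE_DONE return means the
completion callback will not fire, so the handler must keep writing; only
GRPC_ENDPOINT_WRITE_PENDING hands control back to the event loop. A minimal
sketch of that caller pattern (my_state, next_chunk, and on_write_done are
illustrative names, not from this patch):

/* Write until the endpoint reports a pending write or we run out of data. */
static void keep_writing(struct my_state *s) {
  gpr_slice *slices;
  size_t nslices;
  grpc_endpoint_write_status st;
  for (;;) {
    if (!next_chunk(s, &slices, &nslices)) break; /* nothing left to send */
    st = grpc_endpoint_write(s->ep, slices, nslices, on_write_done, s);
    free(slices); /* free only the array; the slices were handed to the endpoint */
    GPR_ASSERT(st != GRPC_ENDPOINT_WRITE_ERROR);
    if (st == GRPC_ENDPOINT_WRITE_PENDING) return; /* on_write_done continues */
  }
  /* every chunk was accepted synchronously; no callback will fire */
}
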
@@ -222,31 +234,16 @@ static void read_and_write_test(grpc_endpoint_test_config config,
state.write_done = 0;
state.current_read_data = 0;
state.current_write_data = 0;
- grpc_iomgr_closure_init(&state.done_read, read_and_write_test_read_handler,
- &state);
- grpc_iomgr_closure_init(&state.done_write, read_and_write_test_write_handler,
- &state);
- gpr_slice_buffer_init(&state.outgoing);
- gpr_slice_buffer_init(&state.incoming);
/* Get started by pretending an initial write completed */
/* NOTE: Sets up initial conditions so we can have the same write handler
for the first iteration as for later iterations. It does the right thing
even when bytes_written is unsigned. */
state.bytes_written -= state.current_write_size;
- read_and_write_test_write_handler(&state, 1);
+ read_and_write_test_write_handler(&state, GRPC_ENDPOINT_CB_OK);
- switch (
- grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read)) {
- case GRPC_ENDPOINT_PENDING:
- break;
- case GRPC_ENDPOINT_ERROR:
- read_and_write_test_read_handler(&state, 0);
- break;
- case GRPC_ENDPOINT_DONE:
- read_and_write_test_read_handler(&state, 1);
- break;
- }
+ grpc_endpoint_notify_on_read(state.read_ep, read_and_write_test_read_handler,
+ &state);
if (shutdown) {
gpr_log(GPR_DEBUG, "shutdown read");
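
The NOTE in the hunk above (pretending an initial write completed by
subtracting current_write_size from bytes_written) is safe even though
bytes_written is unsigned, because the write handler's first action is to add
current_write_size back, so the modular wrap-around cancels before the value is
compared with anything. In isolation (using size_t for illustration):

size_t bytes_written = 0;
size_t current_write_size = 8192;
bytes_written -= current_write_size; /* wraps to SIZE_MAX - 8191 (well defined) */
bytes_written += current_write_size; /* wraps back to 0 before any comparison */
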
@@ -266,8 +263,6 @@ static void read_and_write_test(grpc_endpoint_test_config config,
grpc_endpoint_destroy(state.read_ep);
grpc_endpoint_destroy(state.write_ep);
- gpr_slice_buffer_destroy(&state.outgoing);
- gpr_slice_buffer_destroy(&state.incoming);
end_test(config);
}
@@ -278,40 +273,36 @@ struct timeout_test_state {
typedef struct {
int done;
grpc_endpoint *ep;
- gpr_slice_buffer incoming;
- grpc_iomgr_closure done_read;
} shutdown_during_write_test_state;
-static void shutdown_during_write_test_read_handler(void *user_data,
- int success) {
+static void shutdown_during_write_test_read_handler(
+ void *user_data, gpr_slice *slices, size_t nslices,
+ grpc_endpoint_cb_status error) {
+ size_t i;
shutdown_during_write_test_state *st = user_data;
- if (!success) {
+ for (i = 0; i < nslices; i++) {
+ gpr_slice_unref(slices[i]);
+ }
+
+ if (error != GRPC_ENDPOINT_CB_OK) {
grpc_endpoint_destroy(st->ep);
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
- st->done = 1;
+ st->done = error;
grpc_pollset_kick(g_pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
} else {
- switch (grpc_endpoint_read(st->ep, &st->incoming, &st->done_read)) {
- case GRPC_ENDPOINT_PENDING:
- break;
- case GRPC_ENDPOINT_ERROR:
- shutdown_during_write_test_read_handler(user_data, 0);
- break;
- case GRPC_ENDPOINT_DONE:
- shutdown_during_write_test_read_handler(user_data, 1);
- break;
- }
+ grpc_endpoint_notify_on_read(
+ st->ep, shutdown_during_write_test_read_handler, user_data);
}
}
-static void shutdown_during_write_test_write_handler(void *user_data,
- int success) {
+static void shutdown_during_write_test_write_handler(
+ void *user_data, grpc_endpoint_cb_status error) {
shutdown_during_write_test_state *st = user_data;
- gpr_log(GPR_INFO, "shutdown_during_write_test_write_handler: success = %d",
- success);
- if (success) {
+ gpr_log(GPR_INFO, "shutdown_during_write_test_write_handler: error = %d",
+ error);
+ if (error == 0) {
/* This happens about 0.5% of the time when run under TSAN, and is entirely
legitimate, but means we aren't testing the path we think we are. */
/* TODO(klempner): Change this test to retry the write in that case */
@@ -334,8 +325,6 @@ static void shutdown_during_write_test(grpc_endpoint_test_config config,
shutdown_during_write_test_state read_st;
shutdown_during_write_test_state write_st;
gpr_slice *slices;
- gpr_slice_buffer outgoing;
- grpc_iomgr_closure done_write;
grpc_endpoint_test_fixture f =
begin_test(config, "shutdown_during_write_test", slice_size);
@@ -346,26 +335,19 @@ static void shutdown_during_write_test(grpc_endpoint_test_config config,
read_st.done = 0;
write_st.done = 0;
- grpc_iomgr_closure_init(&done_write, shutdown_during_write_test_write_handler,
- &write_st);
- grpc_iomgr_closure_init(&read_st.done_read,
- shutdown_during_write_test_read_handler, &read_st);
- gpr_slice_buffer_init(&read_st.incoming);
- gpr_slice_buffer_init(&outgoing);
-
- GPR_ASSERT(grpc_endpoint_read(read_st.ep, &read_st.incoming,
- &read_st.done_read) == GRPC_ENDPOINT_PENDING);
+ grpc_endpoint_notify_on_read(
+ read_st.ep, shutdown_during_write_test_read_handler, &read_st);
for (size = 1;; size *= 2) {
slices = allocate_blocks(size, 1, &nblocks, &current_data);
- gpr_slice_buffer_reset_and_unref(&outgoing);
- gpr_slice_buffer_addn(&outgoing, slices, nblocks);
- switch (grpc_endpoint_write(write_st.ep, &outgoing, &done_write)) {
- case GRPC_ENDPOINT_DONE:
+ switch (grpc_endpoint_write(write_st.ep, slices, nblocks,
+ shutdown_during_write_test_write_handler,
+ &write_st)) {
+ case GRPC_ENDPOINT_WRITE_DONE:
break;
- case GRPC_ENDPOINT_ERROR:
+ case GRPC_ENDPOINT_WRITE_ERROR:
gpr_log(GPR_ERROR, "error writing");
abort();
- case GRPC_ENDPOINT_PENDING:
+ case GRPC_ENDPOINT_WRITE_PENDING:
grpc_endpoint_shutdown(write_st.ep);
deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10);
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
@@ -386,8 +368,6 @@ static void shutdown_during_write_test(grpc_endpoint_test_config config,
}
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
gpr_free(slices);
- gpr_slice_buffer_destroy(&read_st.incoming);
- gpr_slice_buffer_destroy(&outgoing);
end_test(config);
return;
}
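
Both read handlers in endpoint_tests.c re-register themselves after a partial
read, which reflects the one-shot nature of grpc_endpoint_notify_on_read():
each registration delivers at most once (an inference from how this patch uses
the call, not a quote from the header). The bare-bones shape of such a handler,
with my_read_state as an illustrative type:

static void my_read_cb(void *user_data, gpr_slice *slices, size_t nslices,
                       grpc_endpoint_cb_status error) {
  struct my_read_state *st = user_data;
  size_t i;
  for (i = 0; i < nslices; i++) {
    st->bytes_seen += GPR_SLICE_LENGTH(slices[i]);
    gpr_slice_unref(slices[i]); /* the callback owns every delivered slice */
  }
  if (error == GRPC_ENDPOINT_CB_OK && st->bytes_seen < st->bytes_wanted) {
    /* must re-register to receive the next batch */
    grpc_endpoint_notify_on_read(st->ep, my_read_cb, st);
  }
}
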
diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c
index 8acaa433bb..6ad832231f 100644
--- a/test/core/iomgr/tcp_posix_test.c
+++ b/test/core/iomgr/tcp_posix_test.c
@@ -118,12 +118,10 @@ struct read_socket_state {
grpc_endpoint *ep;
ssize_t read_bytes;
ssize_t target_read_bytes;
- gpr_slice_buffer incoming;
- grpc_iomgr_closure read_cb;
};
-static ssize_t count_slices(gpr_slice *slices, size_t nslices,
- int *current_data) {
+static ssize_t count_and_unref_slices(gpr_slice *slices, size_t nslices,
+ int *current_data) {
ssize_t num_bytes = 0;
unsigned i, j;
unsigned char *buf;
@@ -134,41 +132,31 @@ static ssize_t count_slices(gpr_slice *slices, size_t nslices,
*current_data = (*current_data + 1) % 256;
}
num_bytes += GPR_SLICE_LENGTH(slices[i]);
+ gpr_slice_unref(slices[i]);
}
return num_bytes;
}
-static void read_cb(void *user_data, int success) {
+static void read_cb(void *user_data, gpr_slice *slices, size_t nslices,
+ grpc_endpoint_cb_status error) {
struct read_socket_state *state = (struct read_socket_state *)user_data;
ssize_t read_bytes;
int current_data;
- GPR_ASSERT(success);
+ GPR_ASSERT(error == GRPC_ENDPOINT_CB_OK);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
current_data = state->read_bytes % 256;
- read_bytes = count_slices(state->incoming.slices, state->incoming.count,
- &current_data);
+ read_bytes = count_and_unref_slices(slices, nslices, &current_data);
state->read_bytes += read_bytes;
gpr_log(GPR_INFO, "Read %d bytes of %d", read_bytes,
state->target_read_bytes);
if (state->read_bytes >= state->target_read_bytes) {
- gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ /* empty */
} else {
- switch (grpc_endpoint_read(state->ep, &state->incoming, &state->read_cb)) {
- case GRPC_ENDPOINT_DONE:
- gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
- read_cb(user_data, 1);
- break;
- case GRPC_ENDPOINT_ERROR:
- gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
- read_cb(user_data, 0);
- break;
- case GRPC_ENDPOINT_PENDING:
- gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
- break;
- }
+ grpc_endpoint_notify_on_read(state->ep, read_cb, state);
}
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
/* Write to a socket, then read from it using the grpc_tcp API. */
@@ -193,19 +181,8 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) {
state.ep = ep;
state.read_bytes = 0;
state.target_read_bytes = written_bytes;
- gpr_slice_buffer_init(&state.incoming);
- grpc_iomgr_closure_init(&state.read_cb, read_cb, &state);
- switch (grpc_endpoint_read(ep, &state.incoming, &state.read_cb)) {
- case GRPC_ENDPOINT_DONE:
- read_cb(&state, 1);
- break;
- case GRPC_ENDPOINT_ERROR:
- read_cb(&state, 0);
- break;
- case GRPC_ENDPOINT_PENDING:
- break;
- }
+ grpc_endpoint_notify_on_read(ep, read_cb, &state);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (state.read_bytes < state.target_read_bytes) {
@@ -216,7 +193,6 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) {
GPR_ASSERT(state.read_bytes == state.target_read_bytes);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
- gpr_slice_buffer_destroy(&state.incoming);
grpc_endpoint_destroy(ep);
}
@@ -243,19 +219,8 @@ static void large_read_test(ssize_t slice_size) {
state.ep = ep;
state.read_bytes = 0;
state.target_read_bytes = written_bytes;
- gpr_slice_buffer_init(&state.incoming);
- grpc_iomgr_closure_init(&state.read_cb, read_cb, &state);
- switch (grpc_endpoint_read(ep, &state.incoming, &state.read_cb)) {
- case GRPC_ENDPOINT_DONE:
- read_cb(&state, 1);
- break;
- case GRPC_ENDPOINT_ERROR:
- read_cb(&state, 0);
- break;
- case GRPC_ENDPOINT_PENDING:
- break;
- }
+ grpc_endpoint_notify_on_read(ep, read_cb, &state);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (state.read_bytes < state.target_read_bytes) {
@@ -266,7 +231,6 @@ static void large_read_test(ssize_t slice_size) {
GPR_ASSERT(state.read_bytes == state.target_read_bytes);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
- gpr_slice_buffer_destroy(&state.incoming);
grpc_endpoint_destroy(ep);
}
@@ -298,7 +262,8 @@ static gpr_slice *allocate_blocks(ssize_t num_bytes, ssize_t slice_size,
return slices;
}
-static void write_done(void *user_data /* write_socket_state */, int success) {
+static void write_done(void *user_data /* write_socket_state */,
+ grpc_endpoint_cb_status error) {
struct write_socket_state *state = (struct write_socket_state *)user_data;
gpr_log(GPR_INFO, "Write done callback called");
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
@@ -374,8 +339,6 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) {
size_t num_blocks;
gpr_slice *slices;
int current_data = 0;
- gpr_slice_buffer outgoing;
- grpc_iomgr_closure write_done_closure;
gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
gpr_log(GPR_INFO, "Start write test with %d bytes, slice size %d", num_bytes,
@@ -392,21 +355,74 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) {
slices = allocate_blocks(num_bytes, slice_size, &num_blocks, &current_data);
- gpr_slice_buffer_init(&outgoing);
- gpr_slice_buffer_addn(&outgoing, slices, num_blocks);
- grpc_iomgr_closure_init(&write_done_closure, write_done, &state);
+ if (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state) ==
+ GRPC_ENDPOINT_WRITE_DONE) {
+ /* Write completed immediately */
+ read_bytes = drain_socket(sv[0]);
+ GPR_ASSERT(read_bytes == num_bytes);
+ } else {
+ drain_socket_blocking(sv[0], num_bytes, num_bytes);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
+ for (;;) {
+ grpc_pollset_worker worker;
+ if (state.write_done) {
+ break;
+ }
+ grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+ deadline);
+ }
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ }
+
+ grpc_endpoint_destroy(ep);
+ gpr_free(slices);
+}
+
+static void read_done_for_write_error(void *ud, gpr_slice *slices,
+ size_t nslices,
+ grpc_endpoint_cb_status error) {
+ GPR_ASSERT(error != GRPC_ENDPOINT_CB_OK);
+ GPR_ASSERT(nslices == 0);
+}
+
+/* Write to a socket using the grpc_tcp API, then drain it directly.
+ Note that if the write does not complete immediately we need to drain the
+ socket in parallel with the read. */
+static void write_error_test(ssize_t num_bytes, ssize_t slice_size) {
+ int sv[2];
+ grpc_endpoint *ep;
+ struct write_socket_state state;
+ size_t num_blocks;
+ gpr_slice *slices;
+ int current_data = 0;
+ grpc_pollset_worker worker;
+ gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
+
+ gpr_log(GPR_INFO, "Start write error test with %d bytes, slice size %d",
+ num_bytes, slice_size);
+
+ create_sockets(sv);
- switch (grpc_endpoint_write(ep, &outgoing, &write_done_closure)) {
- case GRPC_ENDPOINT_DONE:
+ ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_error_test"),
+ GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "test");
+ grpc_endpoint_add_to_pollset(ep, &g_pollset);
+
+ close(sv[0]);
+
+ state.ep = ep;
+ state.write_done = 0;
+
+ slices = allocate_blocks(num_bytes, slice_size, &num_blocks, &current_data);
+
+ switch (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state)) {
+ case GRPC_ENDPOINT_WRITE_DONE:
+ case GRPC_ENDPOINT_WRITE_ERROR:
/* Write completed immediately */
- read_bytes = drain_socket(sv[0]);
- GPR_ASSERT(read_bytes == num_bytes);
break;
- case GRPC_ENDPOINT_PENDING:
- drain_socket_blocking(sv[0], num_bytes, num_bytes);
+ case GRPC_ENDPOINT_WRITE_PENDING:
+ grpc_endpoint_notify_on_read(ep, read_done_for_write_error, NULL);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
for (;;) {
- grpc_pollset_worker worker;
if (state.write_done) {
break;
}
@@ -415,14 +431,10 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) {
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
break;
- case GRPC_ENDPOINT_ERROR:
- gpr_log(GPR_ERROR, "endpoint got error");
- abort();
}
- gpr_slice_buffer_destroy(&outgoing);
grpc_endpoint_destroy(ep);
- gpr_free(slices);
+ free(slices);
}
void run_tests(void) {
@@ -442,6 +454,10 @@ void run_tests(void) {
write_test(100000, 137);
for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) {
+ write_error_test(40320, i);
+ }
+
+ for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) {
write_test(40320, i);
}
}
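
The new write_error_test added above injects a failure by closing the other end
of the socket pair created by create_sockets() (close(sv[0])) before issuing
the write, so the write either fails immediately or goes pending and then fails,
exercising GRPC_ENDPOINT_WRITE_ERROR and the error path of the write callback.
The same trick in isolation, assuming an AF_UNIX socketpair and plain POSIX
(not part of the patch):

#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

int main(void) {
  int sv[2];
  char buf[4096] = {0};
  signal(SIGPIPE, SIG_IGN); /* otherwise the failed write raises SIGPIPE */
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0) return 1;
  close(sv[0]); /* peer gone: subsequent writes must fail (EPIPE on Linux) */
  if (write(sv[1], buf, sizeof(buf)) < 0) {
    printf("write failed as expected: errno=%d (%s)\n", errno, strerror(errno));
  }
  close(sv[1]);
  return 0;
}
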