aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/core/iomgr/endpoint_tests.c
diff options
context:
space:
mode:
Diffstat (limited to 'test/core/iomgr/endpoint_tests.c')
-rw-r--r--test/core/iomgr/endpoint_tests.c263
1 files changed, 82 insertions, 181 deletions
diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c
index 6ef8e9ca3b..27123eb216 100644
--- a/test/core/iomgr/endpoint_tests.c
+++ b/test/core/iomgr/endpoint_tests.c
@@ -39,6 +39,7 @@
#include <grpc/support/slice.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
+#include <grpc/support/useful.h>
#include "test/core/util/test_config.h"
/*
@@ -59,8 +60,7 @@
static grpc_pollset *g_pollset;
-size_t count_and_unref_slices(gpr_slice *slices, size_t nslices,
- int *current_data) {
+size_t count_slices(gpr_slice *slices, size_t nslices, int *current_data) {
size_t num_bytes = 0;
size_t i;
size_t j;
@@ -72,7 +72,6 @@ size_t count_and_unref_slices(gpr_slice *slices, size_t nslices,
*current_data = (*current_data + 1) % 256;
}
num_bytes += GPR_SLICE_LENGTH(slices[i]);
- gpr_slice_unref(slices[i]);
}
return num_bytes;
}
@@ -121,86 +120,78 @@ struct read_and_write_test_state {
int current_write_data;
int read_done;
int write_done;
+ gpr_slice_buffer incoming;
+ gpr_slice_buffer outgoing;
+ grpc_iomgr_closure done_read;
+ grpc_iomgr_closure done_write;
};
-static void read_and_write_test_read_handler(void *data, gpr_slice *slices,
- size_t nslices,
- grpc_endpoint_cb_status error) {
+static void read_and_write_test_read_handler(void *data, int success) {
struct read_and_write_test_state *state = data;
- GPR_ASSERT(error != GRPC_ENDPOINT_CB_ERROR);
- if (error == GRPC_ENDPOINT_CB_SHUTDOWN) {
- gpr_log(GPR_INFO, "Read handler shutdown");
- gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
- state->read_done = 1;
- grpc_pollset_kick(g_pollset, NULL);
- gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
- return;
- }
- state->bytes_read +=
- count_and_unref_slices(slices, nslices, &state->current_read_data);
- if (state->bytes_read == state->target_bytes) {
+loop:
+ state->bytes_read += count_slices(
+ state->incoming.slices, state->incoming.count, &state->current_read_data);
+ if (state->bytes_read == state->target_bytes || !success) {
gpr_log(GPR_INFO, "Read handler done");
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
- state->read_done = 1;
+ state->read_done = 1 + success;
grpc_pollset_kick(g_pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
- } else {
- grpc_endpoint_notify_on_read(state->read_ep,
- read_and_write_test_read_handler, data);
+ } else if (success) {
+ switch (grpc_endpoint_read(state->read_ep, &state->incoming,
+ &state->done_read)) {
+ case GRPC_ENDPOINT_ERROR:
+ success = 0;
+ goto loop;
+ case GRPC_ENDPOINT_DONE:
+ success = 1;
+ goto loop;
+ case GRPC_ENDPOINT_PENDING:
+ break;
+ }
}
}
-static void read_and_write_test_write_handler(void *data,
- grpc_endpoint_cb_status error) {
+static void read_and_write_test_write_handler(void *data, int success) {
struct read_and_write_test_state *state = data;
gpr_slice *slices = NULL;
size_t nslices;
- grpc_endpoint_write_status write_status;
-
- GPR_ASSERT(error != GRPC_ENDPOINT_CB_ERROR);
-
- gpr_log(GPR_DEBUG, "%s: error=%d", "read_and_write_test_write_handler",
- error);
-
- if (error == GRPC_ENDPOINT_CB_SHUTDOWN) {
- gpr_log(GPR_INFO, "Write handler shutdown");
- gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
- state->write_done = 1;
- grpc_pollset_kick(g_pollset, NULL);
- gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
- return;
- }
-
- for (;;) {
- /* Need to do inline writes until they don't succeed synchronously or we
- finish writing */
- state->bytes_written += state->current_write_size;
- if (state->target_bytes - state->bytes_written <
- state->current_write_size) {
- state->current_write_size = state->target_bytes - state->bytes_written;
- }
- if (state->current_write_size == 0) {
- break;
- }
-
- slices = allocate_blocks(state->current_write_size, 8192, &nslices,
- &state->current_write_data);
- write_status =
- grpc_endpoint_write(state->write_ep, slices, nslices,
- read_and_write_test_write_handler, state);
- gpr_log(GPR_DEBUG, "write_status=%d", write_status);
- GPR_ASSERT(write_status != GRPC_ENDPOINT_WRITE_ERROR);
- free(slices);
- if (write_status == GRPC_ENDPOINT_WRITE_PENDING) {
- return;
+ grpc_endpoint_op_status write_status;
+
+ if (success) {
+ for (;;) {
+ /* Need to do inline writes until they don't succeed synchronously or we
+ finish writing */
+ state->bytes_written += state->current_write_size;
+ if (state->target_bytes - state->bytes_written <
+ state->current_write_size) {
+ state->current_write_size = state->target_bytes - state->bytes_written;
+ }
+ if (state->current_write_size == 0) {
+ break;
+ }
+
+ slices = allocate_blocks(state->current_write_size, 8192, &nslices,
+ &state->current_write_data);
+ gpr_slice_buffer_reset_and_unref(&state->outgoing);
+ gpr_slice_buffer_addn(&state->outgoing, slices, nslices);
+ write_status = grpc_endpoint_write(state->write_ep, &state->outgoing,
+ &state->done_write);
+ free(slices);
+ if (write_status == GRPC_ENDPOINT_PENDING) {
+ return;
+ } else if (write_status == GRPC_ENDPOINT_ERROR) {
+ goto cleanup;
+ }
}
+ GPR_ASSERT(state->bytes_written == state->target_bytes);
}
- GPR_ASSERT(state->bytes_written == state->target_bytes);
+cleanup:
gpr_log(GPR_INFO, "Write handler done");
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
- state->write_done = 1;
+ state->write_done = 1 + success;
grpc_pollset_kick(g_pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
}
@@ -216,6 +207,8 @@ static void read_and_write_test(grpc_endpoint_test_config config,
gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
grpc_endpoint_test_fixture f =
begin_test(config, "read_and_write_test", slice_size);
+ gpr_log(GPR_DEBUG, "num_bytes=%d write_size=%d slice_size=%d shutdown=%d",
+ num_bytes, write_size, slice_size, shutdown);
if (shutdown) {
gpr_log(GPR_INFO, "Start read and write shutdown test");
@@ -234,16 +227,31 @@ static void read_and_write_test(grpc_endpoint_test_config config,
state.write_done = 0;
state.current_read_data = 0;
state.current_write_data = 0;
+ grpc_iomgr_closure_init(&state.done_read, read_and_write_test_read_handler,
+ &state);
+ grpc_iomgr_closure_init(&state.done_write, read_and_write_test_write_handler,
+ &state);
+ gpr_slice_buffer_init(&state.outgoing);
+ gpr_slice_buffer_init(&state.incoming);
/* Get started by pretending an initial write completed */
/* NOTE: Sets up initial conditions so we can have the same write handler
for the first iteration as for later iterations. It does the right thing
even when bytes_written is unsigned. */
state.bytes_written -= state.current_write_size;
- read_and_write_test_write_handler(&state, GRPC_ENDPOINT_CB_OK);
+ read_and_write_test_write_handler(&state, 1);
- grpc_endpoint_notify_on_read(state.read_ep, read_and_write_test_read_handler,
- &state);
+ switch (
+ grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read)) {
+ case GRPC_ENDPOINT_PENDING:
+ break;
+ case GRPC_ENDPOINT_ERROR:
+ read_and_write_test_read_handler(&state, 0);
+ break;
+ case GRPC_ENDPOINT_DONE:
+ read_and_write_test_read_handler(&state, 1);
+ break;
+ }
if (shutdown) {
gpr_log(GPR_DEBUG, "shutdown read");
@@ -261,129 +269,22 @@ static void read_and_write_test(grpc_endpoint_test_config config,
}
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
+ end_test(config);
+ gpr_slice_buffer_destroy(&state.outgoing);
+ gpr_slice_buffer_destroy(&state.incoming);
grpc_endpoint_destroy(state.read_ep);
grpc_endpoint_destroy(state.write_ep);
- end_test(config);
-}
-
-struct timeout_test_state {
- int io_done;
-};
-
-typedef struct {
- int done;
- grpc_endpoint *ep;
-} shutdown_during_write_test_state;
-
-static void shutdown_during_write_test_read_handler(
- void *user_data, gpr_slice *slices, size_t nslices,
- grpc_endpoint_cb_status error) {
- size_t i;
- shutdown_during_write_test_state *st = user_data;
-
- for (i = 0; i < nslices; i++) {
- gpr_slice_unref(slices[i]);
- }
-
- if (error != GRPC_ENDPOINT_CB_OK) {
- grpc_endpoint_destroy(st->ep);
- gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
- st->done = error;
- grpc_pollset_kick(g_pollset, NULL);
- gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
- } else {
- grpc_endpoint_notify_on_read(
- st->ep, shutdown_during_write_test_read_handler, user_data);
- }
-}
-
-static void shutdown_during_write_test_write_handler(
- void *user_data, grpc_endpoint_cb_status error) {
- shutdown_during_write_test_state *st = user_data;
- gpr_log(GPR_INFO, "shutdown_during_write_test_write_handler: error = %d",
- error);
- if (error == 0) {
- /* This happens about 0.5% of the time when run under TSAN, and is entirely
- legitimate, but means we aren't testing the path we think we are. */
- /* TODO(klempner): Change this test to retry the write in that case */
- gpr_log(GPR_ERROR,
- "shutdown_during_write_test_write_handler completed unexpectedly");
- }
- gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
- st->done = 1;
- grpc_pollset_kick(g_pollset, NULL);
- gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
-}
-
-static void shutdown_during_write_test(grpc_endpoint_test_config config,
- size_t slice_size) {
- /* test that shutdown with a pending write creates no leaks */
- gpr_timespec deadline;
- size_t size;
- size_t nblocks;
- int current_data = 1;
- shutdown_during_write_test_state read_st;
- shutdown_during_write_test_state write_st;
- gpr_slice *slices;
- grpc_endpoint_test_fixture f =
- begin_test(config, "shutdown_during_write_test", slice_size);
-
- gpr_log(GPR_INFO, "testing shutdown during a write");
-
- read_st.ep = f.client_ep;
- write_st.ep = f.server_ep;
- read_st.done = 0;
- write_st.done = 0;
-
- grpc_endpoint_notify_on_read(
- read_st.ep, shutdown_during_write_test_read_handler, &read_st);
- for (size = 1;; size *= 2) {
- slices = allocate_blocks(size, 1, &nblocks, &current_data);
- switch (grpc_endpoint_write(write_st.ep, slices, nblocks,
- shutdown_during_write_test_write_handler,
- &write_st)) {
- case GRPC_ENDPOINT_WRITE_DONE:
- break;
- case GRPC_ENDPOINT_WRITE_ERROR:
- gpr_log(GPR_ERROR, "error writing");
- abort();
- case GRPC_ENDPOINT_WRITE_PENDING:
- grpc_endpoint_shutdown(write_st.ep);
- deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10);
- gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
- while (!write_st.done) {
- grpc_pollset_worker worker;
- GPR_ASSERT(gpr_time_cmp(gpr_now(deadline.clock_type), deadline) < 0);
- grpc_pollset_work(g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
- deadline);
- }
- gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
- grpc_endpoint_destroy(write_st.ep);
- gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
- while (!read_st.done) {
- grpc_pollset_worker worker;
- GPR_ASSERT(gpr_time_cmp(gpr_now(deadline.clock_type), deadline) < 0);
- grpc_pollset_work(g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
- deadline);
- }
- gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
- gpr_free(slices);
- end_test(config);
- return;
- }
- gpr_free(slices);
- }
-
- gpr_log(GPR_ERROR, "should never reach here");
- abort();
}
void grpc_endpoint_tests(grpc_endpoint_test_config config,
grpc_pollset *pollset) {
+ size_t i;
g_pollset = pollset;
read_and_write_test(config, 10000000, 100000, 8192, 0);
read_and_write_test(config, 1000000, 100000, 1, 0);
read_and_write_test(config, 100000000, 100000, 1, 1);
- shutdown_during_write_test(config, 1000);
+ for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) {
+ read_and_write_test(config, 40320, i, i, 0);
+ }
g_pollset = NULL;
}