diff options
Diffstat (limited to 'test/core')
-rw-r--r-- | test/core/bad_client/bad_client.c | 17 | ||||
-rwxr-xr-x | test/core/end2end/gen_build_yaml.py | 4 | ||||
-rw-r--r-- | test/core/iomgr/endpoint_pair_test.c | 77 | ||||
-rw-r--r-- | test/core/iomgr/endpoint_tests.c | 263 | ||||
-rw-r--r-- | test/core/iomgr/tcp_posix_test.c | 148 | ||||
-rw-r--r-- | test/core/security/secure_endpoint_test.c | 55 | ||||
-rw-r--r-- | test/core/util/port_posix.c | 14 | ||||
-rw-r--r-- | test/core/util/port_windows.c | 91 |
8 files changed, 336 insertions, 333 deletions
diff --git a/test/core/bad_client/bad_client.c b/test/core/bad_client/bad_client.c index 24bf5d3625..1d98879662 100644 --- a/test/core/bad_client/bad_client.c +++ b/test/core/bad_client/bad_client.c @@ -59,7 +59,7 @@ static void thd_func(void *arg) { gpr_event_set(&a->done_thd, (void *)1); } -static void done_write(void *arg, grpc_endpoint_cb_status status) { +static void done_write(void *arg, int success) { thd_args *a = arg; gpr_event_set(&a->done_write, (void *)1); } @@ -85,6 +85,8 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator, grpc_mdctx *mdctx = grpc_mdctx_create(); gpr_slice slice = gpr_slice_from_copied_buffer(client_payload, client_payload_length); + gpr_slice_buffer outgoing; + grpc_iomgr_closure done_write_closure; hex = gpr_dump(client_payload, client_payload_length, GPR_DUMP_HEX | GPR_DUMP_ASCII); @@ -122,14 +124,18 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator, /* Start validator */ gpr_thd_new(&id, thd_func, &a, NULL); + gpr_slice_buffer_init(&outgoing); + gpr_slice_buffer_add(&outgoing, slice); + grpc_iomgr_closure_init(&done_write_closure, done_write, &a); + /* Write data */ - switch (grpc_endpoint_write(sfd.client, &slice, 1, done_write, &a)) { - case GRPC_ENDPOINT_WRITE_DONE: + switch (grpc_endpoint_write(sfd.client, &outgoing, &done_write_closure)) { + case GRPC_ENDPOINT_DONE: done_write(&a, 1); break; - case GRPC_ENDPOINT_WRITE_PENDING: + case GRPC_ENDPOINT_PENDING: break; - case GRPC_ENDPOINT_WRITE_ERROR: + case GRPC_ENDPOINT_ERROR: done_write(&a, 0); break; } @@ -155,6 +161,7 @@ void grpc_run_bad_client_test(grpc_bad_client_server_side_validator validator, .type == GRPC_OP_COMPLETE); grpc_server_destroy(a.server); grpc_completion_queue_destroy(a.cq); + gpr_slice_buffer_destroy(&outgoing); grpc_shutdown(); } diff --git a/test/core/end2end/gen_build_yaml.py b/test/core/end2end/gen_build_yaml.py index 3fe2909493..46cdb80c86 100755 --- a/test/core/end2end/gen_build_yaml.py +++ b/test/core/end2end/gen_build_yaml.py @@ -144,7 +144,7 @@ def main(): 'name': 'end2end_fixture_%s' % f, 'build': 'private', 'language': 'c', - 'secure': 'check' if END2END_FIXTURES[f].secure else 'no', + 'secure': 'check' if END2END_FIXTURES[f].secure else False, 'src': ['test/core/end2end/fixtures/%s.c' % f], 'platforms': [ 'linux', 'mac', 'posix' ] if f.endswith('_posix') else END2END_FIXTURES[f].platforms, 'deps': sec_deps if END2END_FIXTURES[f].secure else unsec_deps, @@ -156,7 +156,7 @@ def main(): 'name': 'end2end_test_%s' % t, 'build': 'private', 'language': 'c', - 'secure': 'check' if END2END_TESTS[t].secure else 'no', + 'secure': 'check' if END2END_TESTS[t].secure else False, 'src': ['test/core/end2end/tests/%s.c' % t], 'headers': ['test/core/end2end/tests/cancel_test_helpers.h', 'test/core/end2end/end2end_tests.h'], diff --git a/test/core/iomgr/endpoint_pair_test.c b/test/core/iomgr/endpoint_pair_test.c new file mode 100644 index 0000000000..3abde5ac35 --- /dev/null +++ b/test/core/iomgr/endpoint_pair_test.c @@ -0,0 +1,77 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/iomgr/tcp_posix.h" + +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/time.h> +#include <grpc/support/useful.h> +#include "src/core/iomgr/endpoint_pair.h" +#include "test/core/util/test_config.h" +#include "test/core/iomgr/endpoint_tests.h" + +static grpc_pollset g_pollset; + +static void clean_up(void) {} + +static grpc_endpoint_test_fixture create_fixture_endpoint_pair( + size_t slice_size) { + grpc_endpoint_test_fixture f; + grpc_endpoint_pair p = grpc_iomgr_create_endpoint_pair("test", slice_size); + + f.client_ep = p.client; + f.server_ep = p.server; + grpc_endpoint_add_to_pollset(f.client_ep, &g_pollset); + grpc_endpoint_add_to_pollset(f.server_ep, &g_pollset); + + return f; +} + +static grpc_endpoint_test_config configs[] = { + {"tcp/tcp_socketpair", create_fixture_endpoint_pair, clean_up}, +}; + +static void destroy_pollset(void *p) { grpc_pollset_destroy(p); } + +int main(int argc, char **argv) { + grpc_test_init(argc, argv); + grpc_init(); + grpc_pollset_init(&g_pollset); + grpc_endpoint_tests(configs[0], &g_pollset); + grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset); + grpc_shutdown(); + + return 0; +} diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c index 6ef8e9ca3b..27123eb216 100644 --- a/test/core/iomgr/endpoint_tests.c +++ b/test/core/iomgr/endpoint_tests.c @@ -39,6 +39,7 @@ #include <grpc/support/slice.h> #include <grpc/support/log.h> #include <grpc/support/time.h> +#include <grpc/support/useful.h> #include "test/core/util/test_config.h" /* @@ -59,8 +60,7 @@ static grpc_pollset *g_pollset; -size_t count_and_unref_slices(gpr_slice *slices, size_t nslices, - int *current_data) { +size_t count_slices(gpr_slice *slices, size_t nslices, int *current_data) { size_t num_bytes = 0; size_t i; size_t j; @@ -72,7 +72,6 @@ size_t count_and_unref_slices(gpr_slice *slices, size_t nslices, *current_data = (*current_data + 1) % 256; } num_bytes += GPR_SLICE_LENGTH(slices[i]); - gpr_slice_unref(slices[i]); } return num_bytes; } @@ -121,86 +120,78 @@ struct read_and_write_test_state { int current_write_data; int read_done; int write_done; + gpr_slice_buffer incoming; + gpr_slice_buffer outgoing; + grpc_iomgr_closure done_read; + grpc_iomgr_closure done_write; }; -static void read_and_write_test_read_handler(void *data, gpr_slice *slices, - size_t nslices, - grpc_endpoint_cb_status error) { +static void read_and_write_test_read_handler(void *data, int success) { struct read_and_write_test_state *state = data; - GPR_ASSERT(error != GRPC_ENDPOINT_CB_ERROR); - if (error == GRPC_ENDPOINT_CB_SHUTDOWN) { - gpr_log(GPR_INFO, "Read handler shutdown"); - gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); - state->read_done = 1; - grpc_pollset_kick(g_pollset, NULL); - gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); - return; - } - state->bytes_read += - count_and_unref_slices(slices, nslices, &state->current_read_data); - if (state->bytes_read == state->target_bytes) { +loop: + state->bytes_read += count_slices( + state->incoming.slices, state->incoming.count, &state->current_read_data); + if (state->bytes_read == state->target_bytes || !success) { gpr_log(GPR_INFO, "Read handler done"); gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); - state->read_done = 1; + state->read_done = 1 + success; grpc_pollset_kick(g_pollset, NULL); gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); - } else { - grpc_endpoint_notify_on_read(state->read_ep, - read_and_write_test_read_handler, data); + } else if (success) { + switch (grpc_endpoint_read(state->read_ep, &state->incoming, + &state->done_read)) { + case GRPC_ENDPOINT_ERROR: + success = 0; + goto loop; + case GRPC_ENDPOINT_DONE: + success = 1; + goto loop; + case GRPC_ENDPOINT_PENDING: + break; + } } } -static void read_and_write_test_write_handler(void *data, - grpc_endpoint_cb_status error) { +static void read_and_write_test_write_handler(void *data, int success) { struct read_and_write_test_state *state = data; gpr_slice *slices = NULL; size_t nslices; - grpc_endpoint_write_status write_status; - - GPR_ASSERT(error != GRPC_ENDPOINT_CB_ERROR); - - gpr_log(GPR_DEBUG, "%s: error=%d", "read_and_write_test_write_handler", - error); - - if (error == GRPC_ENDPOINT_CB_SHUTDOWN) { - gpr_log(GPR_INFO, "Write handler shutdown"); - gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); - state->write_done = 1; - grpc_pollset_kick(g_pollset, NULL); - gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); - return; - } - - for (;;) { - /* Need to do inline writes until they don't succeed synchronously or we - finish writing */ - state->bytes_written += state->current_write_size; - if (state->target_bytes - state->bytes_written < - state->current_write_size) { - state->current_write_size = state->target_bytes - state->bytes_written; - } - if (state->current_write_size == 0) { - break; - } - - slices = allocate_blocks(state->current_write_size, 8192, &nslices, - &state->current_write_data); - write_status = - grpc_endpoint_write(state->write_ep, slices, nslices, - read_and_write_test_write_handler, state); - gpr_log(GPR_DEBUG, "write_status=%d", write_status); - GPR_ASSERT(write_status != GRPC_ENDPOINT_WRITE_ERROR); - free(slices); - if (write_status == GRPC_ENDPOINT_WRITE_PENDING) { - return; + grpc_endpoint_op_status write_status; + + if (success) { + for (;;) { + /* Need to do inline writes until they don't succeed synchronously or we + finish writing */ + state->bytes_written += state->current_write_size; + if (state->target_bytes - state->bytes_written < + state->current_write_size) { + state->current_write_size = state->target_bytes - state->bytes_written; + } + if (state->current_write_size == 0) { + break; + } + + slices = allocate_blocks(state->current_write_size, 8192, &nslices, + &state->current_write_data); + gpr_slice_buffer_reset_and_unref(&state->outgoing); + gpr_slice_buffer_addn(&state->outgoing, slices, nslices); + write_status = grpc_endpoint_write(state->write_ep, &state->outgoing, + &state->done_write); + free(slices); + if (write_status == GRPC_ENDPOINT_PENDING) { + return; + } else if (write_status == GRPC_ENDPOINT_ERROR) { + goto cleanup; + } } + GPR_ASSERT(state->bytes_written == state->target_bytes); } - GPR_ASSERT(state->bytes_written == state->target_bytes); +cleanup: gpr_log(GPR_INFO, "Write handler done"); gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); - state->write_done = 1; + state->write_done = 1 + success; grpc_pollset_kick(g_pollset, NULL); gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); } @@ -216,6 +207,8 @@ static void read_and_write_test(grpc_endpoint_test_config config, gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20); grpc_endpoint_test_fixture f = begin_test(config, "read_and_write_test", slice_size); + gpr_log(GPR_DEBUG, "num_bytes=%d write_size=%d slice_size=%d shutdown=%d", + num_bytes, write_size, slice_size, shutdown); if (shutdown) { gpr_log(GPR_INFO, "Start read and write shutdown test"); @@ -234,16 +227,31 @@ static void read_and_write_test(grpc_endpoint_test_config config, state.write_done = 0; state.current_read_data = 0; state.current_write_data = 0; + grpc_iomgr_closure_init(&state.done_read, read_and_write_test_read_handler, + &state); + grpc_iomgr_closure_init(&state.done_write, read_and_write_test_write_handler, + &state); + gpr_slice_buffer_init(&state.outgoing); + gpr_slice_buffer_init(&state.incoming); /* Get started by pretending an initial write completed */ /* NOTE: Sets up initial conditions so we can have the same write handler for the first iteration as for later iterations. It does the right thing even when bytes_written is unsigned. */ state.bytes_written -= state.current_write_size; - read_and_write_test_write_handler(&state, GRPC_ENDPOINT_CB_OK); + read_and_write_test_write_handler(&state, 1); - grpc_endpoint_notify_on_read(state.read_ep, read_and_write_test_read_handler, - &state); + switch ( + grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read)) { + case GRPC_ENDPOINT_PENDING: + break; + case GRPC_ENDPOINT_ERROR: + read_and_write_test_read_handler(&state, 0); + break; + case GRPC_ENDPOINT_DONE: + read_and_write_test_read_handler(&state, 1); + break; + } if (shutdown) { gpr_log(GPR_DEBUG, "shutdown read"); @@ -261,129 +269,22 @@ static void read_and_write_test(grpc_endpoint_test_config config, } gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); + end_test(config); + gpr_slice_buffer_destroy(&state.outgoing); + gpr_slice_buffer_destroy(&state.incoming); grpc_endpoint_destroy(state.read_ep); grpc_endpoint_destroy(state.write_ep); - end_test(config); -} - -struct timeout_test_state { - int io_done; -}; - -typedef struct { - int done; - grpc_endpoint *ep; -} shutdown_during_write_test_state; - -static void shutdown_during_write_test_read_handler( - void *user_data, gpr_slice *slices, size_t nslices, - grpc_endpoint_cb_status error) { - size_t i; - shutdown_during_write_test_state *st = user_data; - - for (i = 0; i < nslices; i++) { - gpr_slice_unref(slices[i]); - } - - if (error != GRPC_ENDPOINT_CB_OK) { - grpc_endpoint_destroy(st->ep); - gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); - st->done = error; - grpc_pollset_kick(g_pollset, NULL); - gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); - } else { - grpc_endpoint_notify_on_read( - st->ep, shutdown_during_write_test_read_handler, user_data); - } -} - -static void shutdown_during_write_test_write_handler( - void *user_data, grpc_endpoint_cb_status error) { - shutdown_during_write_test_state *st = user_data; - gpr_log(GPR_INFO, "shutdown_during_write_test_write_handler: error = %d", - error); - if (error == 0) { - /* This happens about 0.5% of the time when run under TSAN, and is entirely - legitimate, but means we aren't testing the path we think we are. */ - /* TODO(klempner): Change this test to retry the write in that case */ - gpr_log(GPR_ERROR, - "shutdown_during_write_test_write_handler completed unexpectedly"); - } - gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); - st->done = 1; - grpc_pollset_kick(g_pollset, NULL); - gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); -} - -static void shutdown_during_write_test(grpc_endpoint_test_config config, - size_t slice_size) { - /* test that shutdown with a pending write creates no leaks */ - gpr_timespec deadline; - size_t size; - size_t nblocks; - int current_data = 1; - shutdown_during_write_test_state read_st; - shutdown_during_write_test_state write_st; - gpr_slice *slices; - grpc_endpoint_test_fixture f = - begin_test(config, "shutdown_during_write_test", slice_size); - - gpr_log(GPR_INFO, "testing shutdown during a write"); - - read_st.ep = f.client_ep; - write_st.ep = f.server_ep; - read_st.done = 0; - write_st.done = 0; - - grpc_endpoint_notify_on_read( - read_st.ep, shutdown_during_write_test_read_handler, &read_st); - for (size = 1;; size *= 2) { - slices = allocate_blocks(size, 1, &nblocks, ¤t_data); - switch (grpc_endpoint_write(write_st.ep, slices, nblocks, - shutdown_during_write_test_write_handler, - &write_st)) { - case GRPC_ENDPOINT_WRITE_DONE: - break; - case GRPC_ENDPOINT_WRITE_ERROR: - gpr_log(GPR_ERROR, "error writing"); - abort(); - case GRPC_ENDPOINT_WRITE_PENDING: - grpc_endpoint_shutdown(write_st.ep); - deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10); - gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); - while (!write_st.done) { - grpc_pollset_worker worker; - GPR_ASSERT(gpr_time_cmp(gpr_now(deadline.clock_type), deadline) < 0); - grpc_pollset_work(g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), - deadline); - } - gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); - grpc_endpoint_destroy(write_st.ep); - gpr_mu_lock(GRPC_POLLSET_MU(g_pollset)); - while (!read_st.done) { - grpc_pollset_worker worker; - GPR_ASSERT(gpr_time_cmp(gpr_now(deadline.clock_type), deadline) < 0); - grpc_pollset_work(g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), - deadline); - } - gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset)); - gpr_free(slices); - end_test(config); - return; - } - gpr_free(slices); - } - - gpr_log(GPR_ERROR, "should never reach here"); - abort(); } void grpc_endpoint_tests(grpc_endpoint_test_config config, grpc_pollset *pollset) { + size_t i; g_pollset = pollset; read_and_write_test(config, 10000000, 100000, 8192, 0); read_and_write_test(config, 1000000, 100000, 1, 0); read_and_write_test(config, 100000000, 100000, 1, 1); - shutdown_during_write_test(config, 1000); + for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) { + read_and_write_test(config, 40320, i, i, 0); + } g_pollset = NULL; } diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c index 6ad832231f..8acaa433bb 100644 --- a/test/core/iomgr/tcp_posix_test.c +++ b/test/core/iomgr/tcp_posix_test.c @@ -118,10 +118,12 @@ struct read_socket_state { grpc_endpoint *ep; ssize_t read_bytes; ssize_t target_read_bytes; + gpr_slice_buffer incoming; + grpc_iomgr_closure read_cb; }; -static ssize_t count_and_unref_slices(gpr_slice *slices, size_t nslices, - int *current_data) { +static ssize_t count_slices(gpr_slice *slices, size_t nslices, + int *current_data) { ssize_t num_bytes = 0; unsigned i, j; unsigned char *buf; @@ -132,31 +134,41 @@ static ssize_t count_and_unref_slices(gpr_slice *slices, size_t nslices, *current_data = (*current_data + 1) % 256; } num_bytes += GPR_SLICE_LENGTH(slices[i]); - gpr_slice_unref(slices[i]); } return num_bytes; } -static void read_cb(void *user_data, gpr_slice *slices, size_t nslices, - grpc_endpoint_cb_status error) { +static void read_cb(void *user_data, int success) { struct read_socket_state *state = (struct read_socket_state *)user_data; ssize_t read_bytes; int current_data; - GPR_ASSERT(error == GRPC_ENDPOINT_CB_OK); + GPR_ASSERT(success); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); current_data = state->read_bytes % 256; - read_bytes = count_and_unref_slices(slices, nslices, ¤t_data); + read_bytes = count_slices(state->incoming.slices, state->incoming.count, + ¤t_data); state->read_bytes += read_bytes; gpr_log(GPR_INFO, "Read %d bytes of %d", read_bytes, state->target_read_bytes); if (state->read_bytes >= state->target_read_bytes) { - /* empty */ + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } else { - grpc_endpoint_notify_on_read(state->ep, read_cb, state); + switch (grpc_endpoint_read(state->ep, &state->incoming, &state->read_cb)) { + case GRPC_ENDPOINT_DONE: + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + read_cb(user_data, 1); + break; + case GRPC_ENDPOINT_ERROR: + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + read_cb(user_data, 0); + break; + case GRPC_ENDPOINT_PENDING: + gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + break; + } } - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); } /* Write to a socket, then read from it using the grpc_tcp API. */ @@ -181,8 +193,19 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) { state.ep = ep; state.read_bytes = 0; state.target_read_bytes = written_bytes; + gpr_slice_buffer_init(&state.incoming); + grpc_iomgr_closure_init(&state.read_cb, read_cb, &state); - grpc_endpoint_notify_on_read(ep, read_cb, &state); + switch (grpc_endpoint_read(ep, &state.incoming, &state.read_cb)) { + case GRPC_ENDPOINT_DONE: + read_cb(&state, 1); + break; + case GRPC_ENDPOINT_ERROR: + read_cb(&state, 0); + break; + case GRPC_ENDPOINT_PENDING: + break; + } gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (state.read_bytes < state.target_read_bytes) { @@ -193,6 +216,7 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) { GPR_ASSERT(state.read_bytes == state.target_read_bytes); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_slice_buffer_destroy(&state.incoming); grpc_endpoint_destroy(ep); } @@ -219,8 +243,19 @@ static void large_read_test(ssize_t slice_size) { state.ep = ep; state.read_bytes = 0; state.target_read_bytes = written_bytes; + gpr_slice_buffer_init(&state.incoming); + grpc_iomgr_closure_init(&state.read_cb, read_cb, &state); - grpc_endpoint_notify_on_read(ep, read_cb, &state); + switch (grpc_endpoint_read(ep, &state.incoming, &state.read_cb)) { + case GRPC_ENDPOINT_DONE: + read_cb(&state, 1); + break; + case GRPC_ENDPOINT_ERROR: + read_cb(&state, 0); + break; + case GRPC_ENDPOINT_PENDING: + break; + } gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); while (state.read_bytes < state.target_read_bytes) { @@ -231,6 +266,7 @@ static void large_read_test(ssize_t slice_size) { GPR_ASSERT(state.read_bytes == state.target_read_bytes); gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); + gpr_slice_buffer_destroy(&state.incoming); grpc_endpoint_destroy(ep); } @@ -262,8 +298,7 @@ static gpr_slice *allocate_blocks(ssize_t num_bytes, ssize_t slice_size, return slices; } -static void write_done(void *user_data /* write_socket_state */, - grpc_endpoint_cb_status error) { +static void write_done(void *user_data /* write_socket_state */, int success) { struct write_socket_state *state = (struct write_socket_state *)user_data; gpr_log(GPR_INFO, "Write done callback called"); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); @@ -339,6 +374,8 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) { size_t num_blocks; gpr_slice *slices; int current_data = 0; + gpr_slice_buffer outgoing; + grpc_iomgr_closure write_done_closure; gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20); gpr_log(GPR_INFO, "Start write test with %d bytes, slice size %d", num_bytes, @@ -355,74 +392,21 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) { slices = allocate_blocks(num_bytes, slice_size, &num_blocks, ¤t_data); - if (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state) == - GRPC_ENDPOINT_WRITE_DONE) { - /* Write completed immediately */ - read_bytes = drain_socket(sv[0]); - GPR_ASSERT(read_bytes == num_bytes); - } else { - drain_socket_blocking(sv[0], num_bytes, num_bytes); - gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); - for (;;) { - grpc_pollset_worker worker; - if (state.write_done) { - break; - } - grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), - deadline); - } - gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); - } - - grpc_endpoint_destroy(ep); - gpr_free(slices); -} - -static void read_done_for_write_error(void *ud, gpr_slice *slices, - size_t nslices, - grpc_endpoint_cb_status error) { - GPR_ASSERT(error != GRPC_ENDPOINT_CB_OK); - GPR_ASSERT(nslices == 0); -} - -/* Write to a socket using the grpc_tcp API, then drain it directly. - Note that if the write does not complete immediately we need to drain the - socket in parallel with the read. */ -static void write_error_test(ssize_t num_bytes, ssize_t slice_size) { - int sv[2]; - grpc_endpoint *ep; - struct write_socket_state state; - size_t num_blocks; - gpr_slice *slices; - int current_data = 0; - grpc_pollset_worker worker; - gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20); - - gpr_log(GPR_INFO, "Start write error test with %d bytes, slice size %d", - num_bytes, slice_size); - - create_sockets(sv); + gpr_slice_buffer_init(&outgoing); + gpr_slice_buffer_addn(&outgoing, slices, num_blocks); + grpc_iomgr_closure_init(&write_done_closure, write_done, &state); - ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_error_test"), - GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "test"); - grpc_endpoint_add_to_pollset(ep, &g_pollset); - - close(sv[0]); - - state.ep = ep; - state.write_done = 0; - - slices = allocate_blocks(num_bytes, slice_size, &num_blocks, ¤t_data); - - switch (grpc_endpoint_write(ep, slices, num_blocks, write_done, &state)) { - case GRPC_ENDPOINT_WRITE_DONE: - case GRPC_ENDPOINT_WRITE_ERROR: + switch (grpc_endpoint_write(ep, &outgoing, &write_done_closure)) { + case GRPC_ENDPOINT_DONE: /* Write completed immediately */ + read_bytes = drain_socket(sv[0]); + GPR_ASSERT(read_bytes == num_bytes); break; - case GRPC_ENDPOINT_WRITE_PENDING: - grpc_endpoint_notify_on_read(ep, read_done_for_write_error, NULL); + case GRPC_ENDPOINT_PENDING: + drain_socket_blocking(sv[0], num_bytes, num_bytes); gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset)); for (;;) { + grpc_pollset_worker worker; if (state.write_done) { break; } @@ -431,10 +415,14 @@ static void write_error_test(ssize_t num_bytes, ssize_t slice_size) { } gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset)); break; + case GRPC_ENDPOINT_ERROR: + gpr_log(GPR_ERROR, "endpoint got error"); + abort(); } + gpr_slice_buffer_destroy(&outgoing); grpc_endpoint_destroy(ep); - free(slices); + gpr_free(slices); } void run_tests(void) { @@ -454,10 +442,6 @@ void run_tests(void) { write_test(100000, 137); for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) { - write_error_test(40320, i); - } - - for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) { write_test(40320, i); } } diff --git a/test/core/security/secure_endpoint_test.c b/test/core/security/secure_endpoint_test.c index a8368fc842..c76ddcd194 100644 --- a/test/core/security/secure_endpoint_test.c +++ b/test/core/security/secure_endpoint_test.c @@ -135,62 +135,26 @@ static grpc_endpoint_test_config configs[] = { secure_endpoint_create_fixture_tcp_socketpair_leftover, clean_up}, }; -static void verify_leftover(void *user_data, gpr_slice *slices, size_t nslices, - grpc_endpoint_cb_status error) { - gpr_slice s = - gpr_slice_from_copied_string("hello world 12345678900987654321"); - - GPR_ASSERT(error == GRPC_ENDPOINT_CB_OK); - GPR_ASSERT(nslices == 1); - - GPR_ASSERT(0 == gpr_slice_cmp(s, slices[0])); - gpr_slice_unref(slices[0]); - gpr_slice_unref(s); - *(int *)user_data = 1; -} - static void test_leftover(grpc_endpoint_test_config config, size_t slice_size) { grpc_endpoint_test_fixture f = config.create_fixture(slice_size); - int verified = 0; + gpr_slice_buffer incoming; + gpr_slice s = + gpr_slice_from_copied_string("hello world 12345678900987654321"); gpr_log(GPR_INFO, "Start test left over"); - grpc_endpoint_notify_on_read(f.client_ep, verify_leftover, &verified); - GPR_ASSERT(verified == 1); + gpr_slice_buffer_init(&incoming); + GPR_ASSERT(grpc_endpoint_read(f.client_ep, &incoming, NULL) == + GRPC_ENDPOINT_DONE); + GPR_ASSERT(incoming.count == 1); + GPR_ASSERT(0 == gpr_slice_cmp(s, incoming.slices[0])); grpc_endpoint_shutdown(f.client_ep); grpc_endpoint_shutdown(f.server_ep); grpc_endpoint_destroy(f.client_ep); grpc_endpoint_destroy(f.server_ep); - clean_up(); -} - -static void destroy_early(void *user_data, gpr_slice *slices, size_t nslices, - grpc_endpoint_cb_status error) { - grpc_endpoint_test_fixture *f = user_data; - gpr_slice s = - gpr_slice_from_copied_string("hello world 12345678900987654321"); - - GPR_ASSERT(error == GRPC_ENDPOINT_CB_OK); - GPR_ASSERT(nslices == 1); - - grpc_endpoint_shutdown(f->client_ep); - grpc_endpoint_destroy(f->client_ep); - - GPR_ASSERT(0 == gpr_slice_cmp(s, slices[0])); - gpr_slice_unref(slices[0]); gpr_slice_unref(s); -} + gpr_slice_buffer_destroy(&incoming); -/* test which destroys the ep before finishing reading */ -static void test_destroy_ep_early(grpc_endpoint_test_config config, - size_t slice_size) { - grpc_endpoint_test_fixture f = config.create_fixture(slice_size); - gpr_log(GPR_INFO, "Start test destroy early"); - - grpc_endpoint_notify_on_read(f.client_ep, destroy_early, &f); - - grpc_endpoint_shutdown(f.server_ep); - grpc_endpoint_destroy(f.server_ep); clean_up(); } @@ -203,7 +167,6 @@ int main(int argc, char **argv) { grpc_pollset_init(&g_pollset); grpc_endpoint_tests(configs[0], &g_pollset); test_leftover(configs[1], 1); - test_destroy_ep_early(configs[1], 1); grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset); grpc_iomgr_shutdown(); diff --git a/test/core/util/port_posix.c b/test/core/util/port_posix.c index 836e62a541..4781d334e2 100644 --- a/test/core/util/port_posix.c +++ b/test/core/util/port_posix.c @@ -198,14 +198,13 @@ int grpc_pick_unused_port(void) { races with other processes on kernels that want to reuse the same port numbers over and over. */ - /* In alternating iterations we try UDP ports before TCP ports UDP + /* In alternating iterations we trial UDP ports before TCP ports UDP ports -- it could be the case that this machine has been using up UDP ports and they are scarcer. */ /* Type of port to first pick in next iteration */ int is_tcp = 1; - int try - = 0; + int trial = 0; char *env = gpr_getenv("GRPC_TEST_PORT_SERVER"); if (env) { @@ -218,11 +217,10 @@ int grpc_pick_unused_port(void) { for (;;) { int port; - try - ++; - if (try == 1) { + trial++; + if (trial == 1) { port = getpid() % (65536 - 30000) + 30000; - } else if (try <= NUM_RANDOM_PORTS_TO_PICK) { + } else if (trial <= NUM_RANDOM_PORTS_TO_PICK) { port = rand() % (65536 - 30000) + 30000; } else { port = 0; @@ -239,7 +237,7 @@ int grpc_pick_unused_port(void) { GPR_ASSERT(port > 0); /* Check that the port # is free for the other type of socket also */ if (!is_port_available(&port, !is_tcp)) { - /* In the next iteration try to bind to the other type first + /* In the next iteration trial to bind to the other type first because perhaps it is more rare. */ is_tcp = !is_tcp; continue; diff --git a/test/core/util/port_windows.c b/test/core/util/port_windows.c index 5b072f805a..2f64626cf3 100644 --- a/test/core/util/port_windows.c +++ b/test/core/util/port_windows.c @@ -35,7 +35,6 @@ #include "test/core/util/test_config.h" #if defined(GPR_WINSOCK_SOCKET) && defined(GRPC_TEST_PICK_PORT) -#include "src/core/iomgr/sockaddr_utils.h" #include "test/core/util/port.h" #include <process.h> @@ -43,8 +42,14 @@ #include <errno.h> #include <string.h> +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include "src/core/support/env.h" +#include "src/core/httpcli/httpcli.h" +#include "src/core/iomgr/sockaddr_utils.h" + #define NUM_RANDOM_PORTS_TO_PICK 100 static int is_port_available(int *port, int is_tcp) { @@ -99,6 +104,67 @@ static int is_port_available(int *port, int is_tcp) { return 1; } +typedef struct portreq { + grpc_pollset pollset; + int port; +} portreq; + +static void got_port_from_server(void *arg, + const grpc_httpcli_response *response) { + size_t i; + int port = 0; + portreq *pr = arg; + GPR_ASSERT(response); + GPR_ASSERT(response->status == 200); + for (i = 0; i < response->body_length; i++) { + GPR_ASSERT(response->body[i] >= '0' && response->body[i] <= '9'); + port = port * 10 + response->body[i] - '0'; + } + GPR_ASSERT(port > 1024); + gpr_mu_lock(GRPC_POLLSET_MU(&pr->pollset)); + pr->port = port; + grpc_pollset_kick(&pr->pollset, NULL); + gpr_mu_unlock(GRPC_POLLSET_MU(&pr->pollset)); +} + +static void destroy_pollset_and_shutdown(void *p) { + grpc_pollset_destroy(p); + grpc_shutdown(); +} + +static int pick_port_using_server(char *server) { + grpc_httpcli_context context; + grpc_httpcli_request req; + portreq pr; + + grpc_init(); + + memset(&pr, 0, sizeof(pr)); + memset(&req, 0, sizeof(req)); + grpc_pollset_init(&pr.pollset); + pr.port = -1; + + req.host = server; + req.path = "/get"; + + grpc_httpcli_context_init(&context); + grpc_httpcli_get(&context, &pr.pollset, &req, + GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10), got_port_from_server, + &pr); + gpr_mu_lock(GRPC_POLLSET_MU(&pr.pollset)); + while (pr.port == -1) { + grpc_pollset_worker worker; + grpc_pollset_work(&pr.pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC), + GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1)); + } + gpr_mu_unlock(GRPC_POLLSET_MU(&pr.pollset)); + + grpc_httpcli_context_destroy(&context); + grpc_pollset_shutdown(&pr.pollset, destroy_pollset_and_shutdown, &pr.pollset); + + return pr.port; +} + int grpc_pick_unused_port(void) { /* We repeatedly pick a port and then see whether or not it is available for use both as a TCP socket and a UDP socket. First, we @@ -108,22 +174,29 @@ int grpc_pick_unused_port(void) { races with other processes on kernels that want to reuse the same port numbers over and over. */ - /* In alternating iterations we try UDP ports before TCP ports UDP + /* In alternating iterations we trial UDP ports before TCP ports UDP ports -- it could be the case that this machine has been using up UDP ports and they are scarcer. */ /* Type of port to first pick in next iteration */ int is_tcp = 1; - int try - = 0; + int trial = 0; + + char *env = gpr_getenv("GRPC_TEST_PORT_SERVER"); + if (env) { + int port = pick_port_using_server(env); + gpr_free(env); + if (port != 0) { + return port; + } + } for (;;) { int port; - try - ++; - if (try == 1) { + trial++; + if (trial == 1) { port = _getpid() % (65536 - 30000) + 30000; - } else if (try <= NUM_RANDOM_PORTS_TO_PICK) { + } else if (trial <= NUM_RANDOM_PORTS_TO_PICK) { port = rand() % (65536 - 30000) + 30000; } else { port = 0; @@ -136,7 +209,7 @@ int grpc_pick_unused_port(void) { GPR_ASSERT(port > 0); /* Check that the port # is free for the other type of socket also */ if (!is_port_available(&port, !is_tcp)) { - /* In the next iteration try to bind to the other type first + /* In the next iteration trial to bind to the other type first because perhaps it is more rare. */ is_tcp = !is_tcp; continue; |