From 5287817527403a2bda28ccdc47f9d5868b91d705 Mon Sep 17 00:00:00 2001 From: nathaniel Date: Tue, 9 Dec 2014 10:17:19 -0800 Subject: Strip "_test" from the name of thread_stress_test.c. Having a BUILD target named chttp2_fullstack_thread_stress_test_test is deceptive as it is a direct test of the system rather than a test of a test of the system. Change on 2014/12/09 by nathaniel ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=81688649 --- test/core/end2end/gen_build_json.py | 2 +- test/core/end2end/tests/thread_stress.c | 325 +++++++++++++++++++++++++++ test/core/end2end/tests/thread_stress_test.c | 325 --------------------------- 3 files changed, 326 insertions(+), 326 deletions(-) create mode 100644 test/core/end2end/tests/thread_stress.c delete mode 100644 test/core/end2end/tests/thread_stress_test.c (limited to 'test/core/end2end') diff --git a/test/core/end2end/gen_build_json.py b/test/core/end2end/gen_build_json.py index a333621493..ee79118c8f 100755 --- a/test/core/end2end/gen_build_json.py +++ b/test/core/end2end/gen_build_json.py @@ -31,7 +31,7 @@ END2END_TESTS = [ 'request_response_with_payload', 'simple_delayed_request', 'simple_request', - 'thread_stress_test', + 'thread_stress', 'writes_done_hangs_with_pending_read', ] diff --git a/test/core/end2end/tests/thread_stress.c b/test/core/end2end/tests/thread_stress.c new file mode 100644 index 0000000000..29944d84f9 --- /dev/null +++ b/test/core/end2end/tests/thread_stress.c @@ -0,0 +1,325 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "test/core/end2end/end2end_tests.h" + +#include + +#include "src/core/surface/event_string.h" +#include "src/core/surface/completion_queue.h" +#include +#include +#include +#include + +#define SERVER_THREADS 16 +#define CLIENT_THREADS 16 + +static grpc_end2end_test_fixture g_fixture; +static gpr_timespec g_test_end_time; +static gpr_event g_client_done[CLIENT_THREADS]; +static gpr_event g_server_done[SERVER_THREADS]; +static gpr_mu g_mu; +static int g_active_requests; + +static gpr_timespec n_seconds_time(int n) { + return gpr_time_add(gpr_now(), gpr_time_from_micros(GPR_US_PER_SEC * n)); +} + +static gpr_timespec five_seconds_time() { return n_seconds_time(5); } + +/* Drain pending events on a completion queue until it's ready to destroy. + Does some post-processing to safely release memory on some of the events. */ +static void drain_cq(int client, grpc_completion_queue *cq) { + grpc_event *ev; + grpc_completion_type type; + char *evstr; + int done = 0; + char *name = client ? "client" : "server"; + while (!done) { + ev = grpc_completion_queue_next(cq, five_seconds_time()); + if (!ev) { + gpr_log(GPR_ERROR, "waiting for %s cq to drain", name); + grpc_cq_dump_pending_ops(cq); + continue; + } + + evstr = grpc_event_string(ev); + gpr_log(GPR_INFO, "got late %s event: %s", name, evstr); + gpr_free(evstr); + + type = ev->type; + switch (type) { + case GRPC_SERVER_RPC_NEW: + gpr_free(ev->tag); + if (ev->call) { + grpc_call_destroy(ev->call); + } + break; + case GRPC_FINISHED: + grpc_call_destroy(ev->call); + break; + case GRPC_QUEUE_SHUTDOWN: + done = 1; + break; + case GRPC_READ: + case GRPC_WRITE_ACCEPTED: + if (!client && gpr_unref(ev->tag)) { + gpr_free(ev->tag); + } + default: + break; + } + grpc_event_finish(ev); + } +} + +/* Kick off a new request - assumes g_mu taken */ +static void start_request() { + grpc_call *call = grpc_channel_create_call( + g_fixture.client, "/Foo", "test.google.com", g_test_end_time); + g_active_requests++; + GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_invoke(call, g_fixture.client_cq, + NULL, NULL, NULL, 0)); +} + +/* Async client: handle sending requests, reading responses, and starting + new requests when old ones finish */ +static void client_thread(void *p) { + int id = (gpr_intptr)p; + grpc_event *ev; + gpr_slice slice = gpr_slice_malloc(100); + grpc_byte_buffer *buf; + char *estr; + memset(GPR_SLICE_START_PTR(slice), id, GPR_SLICE_LENGTH(slice)); + + buf = grpc_byte_buffer_create(&slice, 1); + gpr_slice_unref(slice); + + for (;;) { + ev = grpc_completion_queue_next(g_fixture.client_cq, n_seconds_time(1)); + if (ev) { + switch (ev->type) { + default: + estr = grpc_event_string(ev); + gpr_log(GPR_ERROR, "unexpected event: %s", estr); + gpr_free(estr); + break; + case GRPC_INVOKE_ACCEPTED: + /* better not keep going if the invoke failed */ + if (ev->data.invoke_accepted == GRPC_OP_OK) { + GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_read(ev->call, NULL)); + GPR_ASSERT(GRPC_CALL_OK == + grpc_call_start_write(ev->call, buf, NULL, 0)); + } + break; + case GRPC_READ: + break; + case GRPC_WRITE_ACCEPTED: + GPR_ASSERT(GRPC_CALL_OK == grpc_call_writes_done(ev->call, NULL)); + break; + case GRPC_FINISH_ACCEPTED: + break; + case GRPC_CLIENT_METADATA_READ: + break; + case GRPC_FINISHED: + /* kick off a new request if the test should still be running */ + gpr_mu_lock(&g_mu); + g_active_requests--; + if (gpr_time_cmp(gpr_now(), g_test_end_time) < 0) { + start_request(); + } + gpr_mu_unlock(&g_mu); + grpc_call_destroy(ev->call); + break; + } + grpc_event_finish(ev); + } + gpr_mu_lock(&g_mu); + if (g_active_requests == 0) { + gpr_mu_unlock(&g_mu); + break; + } + gpr_mu_unlock(&g_mu); + } + + grpc_byte_buffer_destroy(buf); + gpr_event_set(&g_client_done[id], (void *)1); +} + +/* Request a new server call. We tag them with a ref-count that starts at two, + and decrements after each of: a read completes and a write completes. + When it drops to zero, we write status */ +static void request_server_call() { + gpr_refcount *rc = gpr_malloc(sizeof(gpr_refcount)); + gpr_ref_init(rc, 2); + grpc_server_request_call(g_fixture.server, rc); +} + +static void maybe_end_server_call(grpc_call *call, gpr_refcount *rc) { + grpc_status ok_status = {GRPC_STATUS_OK, NULL}; + if (gpr_unref(rc)) { + GPR_ASSERT(GRPC_CALL_OK == + grpc_call_start_write_status(call, ok_status, NULL)); + gpr_free(rc); + } +} + +static void server_thread(void *p) { + int id = (gpr_intptr)p; + grpc_event *ev; + gpr_slice slice = gpr_slice_malloc(100); + grpc_byte_buffer *buf; + char *estr; + memset(GPR_SLICE_START_PTR(slice), id, GPR_SLICE_LENGTH(slice)); + + request_server_call(); + + buf = grpc_byte_buffer_create(&slice, 1); + gpr_slice_unref(slice); + + for (;;) { + ev = grpc_completion_queue_next(g_fixture.server_cq, n_seconds_time(1)); + if (ev) { + switch (ev->type) { + default: + estr = grpc_event_string(ev); + gpr_log(GPR_ERROR, "unexpected event: %s", estr); + gpr_free(estr); + break; + case GRPC_SERVER_RPC_NEW: + if (ev->call) { + GPR_ASSERT(GRPC_CALL_OK == grpc_call_accept(ev->call, + g_fixture.server_cq, + ev->tag, 0)); + GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_read(ev->call, ev->tag)); + GPR_ASSERT(GRPC_CALL_OK == + grpc_call_start_write(ev->call, buf, ev->tag, 0)); + } else { + gpr_free(ev->tag); + } + break; + case GRPC_READ: + if (ev->data.read) { + GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_read(ev->call, ev->tag)); + } else { + maybe_end_server_call(ev->call, ev->tag); + } + break; + case GRPC_WRITE_ACCEPTED: + maybe_end_server_call(ev->call, ev->tag); + break; + case GRPC_FINISH_ACCEPTED: + break; + case GRPC_FINISHED: + grpc_call_destroy(ev->call); + request_server_call(); + break; + } + grpc_event_finish(ev); + } + gpr_mu_lock(&g_mu); + if (g_active_requests == 0) { + gpr_mu_unlock(&g_mu); + break; + } + gpr_mu_unlock(&g_mu); + } + + grpc_byte_buffer_destroy(buf); + gpr_event_set(&g_server_done[id], (void *)1); +} + +static void run_test(grpc_end2end_test_config config, int requests_in_flight) { + int i; + gpr_thd_id thd_id; + + gpr_log(GPR_INFO, "thread_stress_test/%s @ %d requests", config.name, + requests_in_flight); + + /* setup client, server */ + g_fixture = config.create_fixture(NULL, NULL); + config.init_client(&g_fixture, NULL); + config.init_server(&g_fixture, NULL); + + /* schedule end time */ + g_test_end_time = n_seconds_time(5); + + g_active_requests = 0; + gpr_mu_init(&g_mu); + + /* kick off threads */ + for (i = 0; i < CLIENT_THREADS; i++) { + gpr_event_init(&g_client_done[i]); + gpr_thd_new(&thd_id, client_thread, (void *)(gpr_intptr)i, NULL); + } + for (i = 0; i < SERVER_THREADS; i++) { + gpr_event_init(&g_server_done[i]); + gpr_thd_new(&thd_id, server_thread, (void *)(gpr_intptr)i, NULL); + } + + /* start requests */ + gpr_mu_lock(&g_mu); + for (i = 0; i < requests_in_flight; i++) { + start_request(); + } + gpr_mu_unlock(&g_mu); + + /* await completion */ + for (i = 0; i < CLIENT_THREADS; i++) { + gpr_event_wait(&g_client_done[i], gpr_inf_future); + } + for (i = 0; i < SERVER_THREADS; i++) { + gpr_event_wait(&g_server_done[i], gpr_inf_future); + } + + /* shutdown the things */ + grpc_server_shutdown(g_fixture.server); + grpc_server_destroy(g_fixture.server); + grpc_channel_destroy(g_fixture.client); + + grpc_completion_queue_shutdown(g_fixture.server_cq); + drain_cq(0, g_fixture.server_cq); + grpc_completion_queue_destroy(g_fixture.server_cq); + grpc_completion_queue_shutdown(g_fixture.client_cq); + drain_cq(1, g_fixture.client_cq); + grpc_completion_queue_destroy(g_fixture.client_cq); + + config.tear_down_data(&g_fixture); + + gpr_mu_destroy(&g_mu); +} + +void grpc_end2end_tests(grpc_end2end_test_config config) { + run_test(config, 1000); +} diff --git a/test/core/end2end/tests/thread_stress_test.c b/test/core/end2end/tests/thread_stress_test.c deleted file mode 100644 index 44b250fecb..0000000000 --- a/test/core/end2end/tests/thread_stress_test.c +++ /dev/null @@ -1,325 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include "test/core/end2end/end2end_tests.h" - -#include - -#include "src/core/surface/event_string.h" -#include "src/core/surface/completion_queue.h" -#include -#include -#include -#include - -#define SERVER_THREADS 16 -#define CLIENT_THREADS 16 - -static grpc_end2end_test_fixture g_fixture; -static gpr_timespec g_test_end_time; -static gpr_event g_client_done[CLIENT_THREADS]; -static gpr_event g_server_done[SERVER_THREADS]; -static gpr_mu g_mu; -static int g_active_requests; - -static gpr_timespec n_seconds_time(int n) { - return gpr_time_add(gpr_now(), gpr_time_from_micros(GPR_US_PER_SEC * n)); -} - -static gpr_timespec five_seconds_time() { return n_seconds_time(5); } - -/* Drain pending events on a completion queue until it's ready to destroy. - Does some post-processing to safely release memory on some of the events. */ -static void drain_cq(int client, grpc_completion_queue *cq) { - grpc_event *ev; - grpc_completion_type type; - char *evstr; - int done = 0; - char *name = client ? "client" : "server"; - while (!done) { - ev = grpc_completion_queue_next(cq, five_seconds_time()); - if (!ev) { - gpr_log(GPR_ERROR, "waiting for %s cq to drain", name); - grpc_cq_dump_pending_ops(cq); - continue; - } - - evstr = grpc_event_string(ev); - gpr_log(GPR_INFO, "got late %s event: %s", name, evstr); - gpr_free(evstr); - - type = ev->type; - switch (type) { - case GRPC_SERVER_RPC_NEW: - gpr_free(ev->tag); - if (ev->call) { - grpc_call_destroy(ev->call); - } - break; - case GRPC_FINISHED: - grpc_call_destroy(ev->call); - break; - case GRPC_QUEUE_SHUTDOWN: - done = 1; - break; - case GRPC_READ: - case GRPC_WRITE_ACCEPTED: - if (!client && gpr_unref(ev->tag)) { - gpr_free(ev->tag); - } - default: - break; - } - grpc_event_finish(ev); - } -} - -/* Kick off a new request - assumes g_mu taken */ -static void start_request() { - grpc_call *call = grpc_channel_create_call( - g_fixture.client, "/Foo", "test.google.com", g_test_end_time); - g_active_requests++; - GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_invoke(call, g_fixture.client_cq, - NULL, NULL, NULL, 0)); -} - -/* Async client: handle sending requests, reading responses, and starting - new requests when old ones finish */ -static void client_thread(void *p) { - int id = (gpr_intptr)p; - grpc_event *ev; - gpr_slice slice = gpr_slice_malloc(100); - grpc_byte_buffer *buf; - char *estr; - memset(GPR_SLICE_START_PTR(slice), id, GPR_SLICE_LENGTH(slice)); - - buf = grpc_byte_buffer_create(&slice, 1); - gpr_slice_unref(slice); - - for (;;) { - ev = grpc_completion_queue_next(g_fixture.client_cq, n_seconds_time(1)); - if (ev) { - switch (ev->type) { - default: - estr = grpc_event_string(ev); - gpr_log(GPR_ERROR, "unexpected event: %s", estr); - gpr_free(estr); - break; - case GRPC_INVOKE_ACCEPTED: - /* better not keep going if the invoke failed */ - if (ev->data.invoke_accepted == GRPC_OP_OK) { - GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_read(ev->call, NULL)); - GPR_ASSERT(GRPC_CALL_OK == - grpc_call_start_write(ev->call, buf, NULL, 0)); - } - break; - case GRPC_READ: - break; - case GRPC_WRITE_ACCEPTED: - GPR_ASSERT(GRPC_CALL_OK == grpc_call_writes_done(ev->call, NULL)); - break; - case GRPC_FINISH_ACCEPTED: - break; - case GRPC_CLIENT_METADATA_READ: - break; - case GRPC_FINISHED: - /* kick off a new request if the test should still be running */ - gpr_mu_lock(&g_mu); - g_active_requests--; - if (gpr_time_cmp(gpr_now(), g_test_end_time) < 0) { - start_request(); - } - gpr_mu_unlock(&g_mu); - grpc_call_destroy(ev->call); - break; - } - grpc_event_finish(ev); - } - gpr_mu_lock(&g_mu); - if (g_active_requests == 0) { - gpr_mu_unlock(&g_mu); - break; - } - gpr_mu_unlock(&g_mu); - } - - grpc_byte_buffer_destroy(buf); - gpr_event_set(&g_client_done[id], (void *)1); -} - -/* Request a new server call. We tag them with a ref-count that starts at two, - and decrements after each of: a read completes and a write completes. - When it drops to zero, we write status */ -static void request_server_call() { - gpr_refcount *rc = gpr_malloc(sizeof(gpr_refcount)); - gpr_ref_init(rc, 2); - grpc_server_request_call(g_fixture.server, rc); -} - -static void maybe_end_server_call(grpc_call *call, gpr_refcount *rc) { - grpc_status ok_status = {GRPC_STATUS_OK, NULL}; - if (gpr_unref(rc)) { - GPR_ASSERT(GRPC_CALL_OK == - grpc_call_start_write_status(call, ok_status, NULL)); - gpr_free(rc); - } -} - -static void server_thread(void *p) { - int id = (gpr_intptr)p; - grpc_event *ev; - gpr_slice slice = gpr_slice_malloc(100); - grpc_byte_buffer *buf; - char *estr; - memset(GPR_SLICE_START_PTR(slice), id, GPR_SLICE_LENGTH(slice)); - - request_server_call(); - - buf = grpc_byte_buffer_create(&slice, 1); - gpr_slice_unref(slice); - - for (;;) { - ev = grpc_completion_queue_next(g_fixture.server_cq, n_seconds_time(1)); - if (ev) { - switch (ev->type) { - default: - estr = grpc_event_string(ev); - gpr_log(GPR_ERROR, "unexpected event: %s", estr); - gpr_free(estr); - break; - case GRPC_SERVER_RPC_NEW: - if (ev->call) { - GPR_ASSERT(GRPC_CALL_OK == grpc_call_accept(ev->call, - g_fixture.server_cq, - ev->tag, 0)); - GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_read(ev->call, ev->tag)); - GPR_ASSERT(GRPC_CALL_OK == - grpc_call_start_write(ev->call, buf, ev->tag, 0)); - } else { - gpr_free(ev->tag); - } - break; - case GRPC_READ: - if (ev->data.read) { - GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_read(ev->call, ev->tag)); - } else { - maybe_end_server_call(ev->call, ev->tag); - } - break; - case GRPC_WRITE_ACCEPTED: - maybe_end_server_call(ev->call, ev->tag); - break; - case GRPC_FINISH_ACCEPTED: - break; - case GRPC_FINISHED: - grpc_call_destroy(ev->call); - request_server_call(); - break; - } - grpc_event_finish(ev); - } - gpr_mu_lock(&g_mu); - if (g_active_requests == 0) { - gpr_mu_unlock(&g_mu); - break; - } - gpr_mu_unlock(&g_mu); - } - - grpc_byte_buffer_destroy(buf); - gpr_event_set(&g_server_done[id], (void *)1); -} - -static void run_test(grpc_end2end_test_config config, int requests_in_flight) { - int i; - gpr_thd_id thd_id; - - gpr_log(GPR_INFO, "thread_test/%s @ %d requests", config.name, - requests_in_flight); - - /* setup client, server */ - g_fixture = config.create_fixture(NULL, NULL); - config.init_client(&g_fixture, NULL); - config.init_server(&g_fixture, NULL); - - /* schedule end time */ - g_test_end_time = n_seconds_time(5); - - g_active_requests = 0; - gpr_mu_init(&g_mu); - - /* kick off threads */ - for (i = 0; i < CLIENT_THREADS; i++) { - gpr_event_init(&g_client_done[i]); - gpr_thd_new(&thd_id, client_thread, (void *)(gpr_intptr)i, NULL); - } - for (i = 0; i < SERVER_THREADS; i++) { - gpr_event_init(&g_server_done[i]); - gpr_thd_new(&thd_id, server_thread, (void *)(gpr_intptr)i, NULL); - } - - /* start requests */ - gpr_mu_lock(&g_mu); - for (i = 0; i < requests_in_flight; i++) { - start_request(); - } - gpr_mu_unlock(&g_mu); - - /* await completion */ - for (i = 0; i < CLIENT_THREADS; i++) { - gpr_event_wait(&g_client_done[i], gpr_inf_future); - } - for (i = 0; i < SERVER_THREADS; i++) { - gpr_event_wait(&g_server_done[i], gpr_inf_future); - } - - /* shutdown the things */ - grpc_server_shutdown(g_fixture.server); - grpc_server_destroy(g_fixture.server); - grpc_channel_destroy(g_fixture.client); - - grpc_completion_queue_shutdown(g_fixture.server_cq); - drain_cq(0, g_fixture.server_cq); - grpc_completion_queue_destroy(g_fixture.server_cq); - grpc_completion_queue_shutdown(g_fixture.client_cq); - drain_cq(1, g_fixture.client_cq); - grpc_completion_queue_destroy(g_fixture.client_cq); - - config.tear_down_data(&g_fixture); - - gpr_mu_destroy(&g_mu); -} - -void grpc_end2end_tests(grpc_end2end_test_config config) { - run_test(config, 1000); -} -- cgit v1.2.3