/* * * Copyright 2014, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are * met: * * * Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above * copyright notice, this list of conditions and the following disclaimer * in the documentation and/or other materials provided with the * distribution. * * Neither the name of Google Inc. nor the names of its * contributors may be used to endorse or promote products derived from * this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
* */ #include "test/core/end2end/end2end_tests.h" #include #include "src/core/surface/event_string.h" #include "src/core/surface/completion_queue.h" #include #include #include #include #define SERVER_THREADS 16 #define CLIENT_THREADS 16 static grpc_end2end_test_fixture g_fixture; static gpr_timespec g_test_end_time; static gpr_event g_client_done[CLIENT_THREADS]; static gpr_event g_server_done[SERVER_THREADS]; static gpr_mu g_mu; static int g_active_requests; static gpr_timespec n_seconds_time(int n) { return gpr_time_add(gpr_now(), gpr_time_from_micros(GPR_US_PER_SEC * n)); } static gpr_timespec five_seconds_time(void) { return n_seconds_time(5); } /* Drain pending events on a completion queue until it's ready to destroy. Does some post-processing to safely release memory on some of the events. */ static void drain_cq(int client, grpc_completion_queue *cq) { grpc_event *ev; grpc_completion_type type; char *evstr; int done = 0; char *name = client ? "client" : "server"; while (!done) { ev = grpc_completion_queue_next(cq, five_seconds_time()); if (!ev) { gpr_log(GPR_ERROR, "waiting for %s cq to drain", name); grpc_cq_dump_pending_ops(cq); continue; } evstr = grpc_event_string(ev); gpr_log(GPR_INFO, "got late %s event: %s", name, evstr); gpr_free(evstr); type = ev->type; switch (type) { case GRPC_SERVER_RPC_NEW: gpr_free(ev->tag); if (ev->call) { grpc_call_destroy(ev->call); } break; case GRPC_FINISHED: grpc_call_destroy(ev->call); break; case GRPC_QUEUE_SHUTDOWN: done = 1; break; case GRPC_READ: case GRPC_WRITE_ACCEPTED: if (!client && gpr_unref(ev->tag)) { gpr_free(ev->tag); } default: break; } grpc_event_finish(ev); } } /* Kick off a new request - assumes g_mu taken */ static void start_request(void) { grpc_call *call = grpc_channel_create_call( g_fixture.client, "/Foo", "test.google.com", g_test_end_time); g_active_requests++; GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_invoke(call, g_fixture.client_cq, NULL, NULL, NULL, 0)); } /* Async client: handle sending 
requests, reading responses, and starting new requests when old ones finish */
/* Thread body for one client worker.

   p carries the worker's index, cast to a pointer by the spawner. The index
   is used both as the fill byte of the 100-byte request payload and as the
   slot in g_client_done that is set when the worker exits.

   The worker polls the shared client completion queue with a one-second
   timeout and, after every poll (event or not), leaves the loop once
   g_active_requests has reached zero. */
static void client_thread(void *p) {
  int worker = (gpr_intptr)p;
  gpr_slice payload = gpr_slice_malloc(100);
  grpc_byte_buffer *buf;
  grpc_event *ev;
  char *description;
  int running = 1;

  memset(GPR_SLICE_START_PTR(payload), worker, GPR_SLICE_LENGTH(payload));
  buf = grpc_byte_buffer_create(&payload, 1);
  gpr_slice_unref(payload);

  while (running) {
    ev = grpc_completion_queue_next(g_fixture.client_cq, n_seconds_time(1));
    if (ev != NULL) {
      switch (ev->type) {
        case GRPC_INVOKE_ACCEPTED:
          /* better not keep going if the invoke failed */
          if (ev->data.invoke_accepted == GRPC_OP_OK) {
            GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_read(ev->call, NULL));
            GPR_ASSERT(GRPC_CALL_OK ==
                       grpc_call_start_write(ev->call, buf, NULL, 0));
          }
          break;

        case GRPC_WRITE_ACCEPTED:
          GPR_ASSERT(GRPC_CALL_OK == grpc_call_writes_done(ev->call, NULL));
          break;

        case GRPC_FINISHED:
          /* kick off a new request if the test should still be running */
          gpr_mu_lock(&g_mu);
          g_active_requests--;
          if (gpr_time_cmp(gpr_now(), g_test_end_time) < 0) {
            start_request();
          }
          gpr_mu_unlock(&g_mu);
          grpc_call_destroy(ev->call);
          break;

        case GRPC_READ:
        case GRPC_FINISH_ACCEPTED:
        case GRPC_CLIENT_METADATA_READ:
          /* expected, but nothing to do */
          break;

        default:
          description = grpc_event_string(ev);
          gpr_log(GPR_ERROR, "unexpected event: %s", description);
          gpr_free(description);
          break;
      }
      grpc_event_finish(ev);
    }

    /* sample the shared counter under the lock to decide whether to go on */
    gpr_mu_lock(&g_mu);
    running = (g_active_requests != 0);
    gpr_mu_unlock(&g_mu);
  }

  grpc_byte_buffer_destroy(buf);
  gpr_event_set(&g_client_done[worker], (void *)1);
}

/* Request a new server call. We tag them with a ref-count that starts at two,
   and decrements after each of: a read completes and a write completes.
When it drops to zero, we write status */ static void request_server_call(void) { gpr_refcount *rc = gpr_malloc(sizeof(gpr_refcount)); gpr_ref_init(rc, 2); grpc_server_request_call(g_fixture.server, rc); } static void maybe_end_server_call(grpc_call *call, gpr_refcount *rc) { if (gpr_unref(rc)) { GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_write_status(call, GRPC_STATUS_OK, NULL, NULL)); gpr_free(rc); } } static void server_thread(void *p) { int id = (gpr_intptr)p; grpc_event *ev; gpr_slice slice = gpr_slice_malloc(100); grpc_byte_buffer *buf; char *estr; memset(GPR_SLICE_START_PTR(slice), id, GPR_SLICE_LENGTH(slice)); request_server_call(); buf = grpc_byte_buffer_create(&slice, 1); gpr_slice_unref(slice); for (;;) { ev = grpc_completion_queue_next(g_fixture.server_cq, n_seconds_time(1)); if (ev) { switch (ev->type) { default: estr = grpc_event_string(ev); gpr_log(GPR_ERROR, "unexpected event: %s", estr); gpr_free(estr); break; case GRPC_SERVER_RPC_NEW: if (ev->call) { GPR_ASSERT(GRPC_CALL_OK == grpc_call_server_accept(ev->call, g_fixture.server_cq, ev->tag)); GPR_ASSERT(GRPC_CALL_OK == grpc_call_server_end_initial_metadata(ev->call, 0)); GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_read(ev->call, ev->tag)); GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_write(ev->call, buf, ev->tag, 0)); } else { gpr_free(ev->tag); } break; case GRPC_READ: if (ev->data.read) { GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_read(ev->call, ev->tag)); } else { maybe_end_server_call(ev->call, ev->tag); } break; case GRPC_WRITE_ACCEPTED: maybe_end_server_call(ev->call, ev->tag); break; case GRPC_FINISH_ACCEPTED: break; case GRPC_FINISHED: grpc_call_destroy(ev->call); request_server_call(); break; } grpc_event_finish(ev); } gpr_mu_lock(&g_mu); if (g_active_requests == 0) { gpr_mu_unlock(&g_mu); break; } gpr_mu_unlock(&g_mu); } grpc_byte_buffer_destroy(buf); gpr_event_set(&g_server_done[id], (void *)1); } static void run_test(grpc_end2end_test_config config, int requests_in_flight) { int i; 
gpr_thd_id thd_id; gpr_log(GPR_INFO, "thread_stress_test/%s @ %d requests", config.name, requests_in_flight); /* setup client, server */ g_fixture = config.create_fixture(NULL, NULL); config.init_client(&g_fixture, NULL); config.init_server(&g_fixture, NULL); /* schedule end time */ g_test_end_time = n_seconds_time(5); g_active_requests = 0; gpr_mu_init(&g_mu); /* kick off threads */ for (i = 0; i < CLIENT_THREADS; i++) { gpr_event_init(&g_client_done[i]); gpr_thd_new(&thd_id, client_thread, (void *)(gpr_intptr) i, NULL); } for (i = 0; i < SERVER_THREADS; i++) { gpr_event_init(&g_server_done[i]); gpr_thd_new(&thd_id, server_thread, (void *)(gpr_intptr) i, NULL); } /* start requests */ gpr_mu_lock(&g_mu); for (i = 0; i < requests_in_flight; i++) { start_request(); } gpr_mu_unlock(&g_mu); /* await completion */ for (i = 0; i < CLIENT_THREADS; i++) { gpr_event_wait(&g_client_done[i], gpr_inf_future); } for (i = 0; i < SERVER_THREADS; i++) { gpr_event_wait(&g_server_done[i], gpr_inf_future); } /* shutdown the things */ grpc_server_shutdown(g_fixture.server); grpc_server_destroy(g_fixture.server); grpc_channel_destroy(g_fixture.client); grpc_completion_queue_shutdown(g_fixture.server_cq); drain_cq(0, g_fixture.server_cq); grpc_completion_queue_destroy(g_fixture.server_cq); grpc_completion_queue_shutdown(g_fixture.client_cq); drain_cq(1, g_fixture.client_cq); grpc_completion_queue_destroy(g_fixture.client_cq); config.tear_down_data(&g_fixture); gpr_mu_destroy(&g_mu); } void grpc_end2end_tests(grpc_end2end_test_config config) { run_test(config, 1000); }