From c6d61c4dd6deb80477cb5977b79b00614b20b409 Mon Sep 17 00:00:00 2001 From: ctiller Date: Mon, 15 Dec 2014 14:52:08 -0800 Subject: GOAWAY & Reconnection support. Clients stay connected to a server after it shutdowns until all active calls have completed, and then they drop. After a GOAWAY or a disconnect, clients will attempt to re-resolve and reconnect to the server. Change on 2014/12/15 by ctiller ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=82178515 --- test/core/end2end/dualstack_socket_test.c | 26 ++-- test/core/end2end/fixtures/chttp2_fake_security.c | 6 +- test/core/end2end/fixtures/chttp2_fullstack.c | 13 +- .../end2end/fixtures/chttp2_simple_ssl_fullstack.c | 6 +- .../chttp2_simple_ssl_with_oauth2_fullstack.c | 6 +- test/core/end2end/fixtures/chttp2_socket_pair.c | 9 +- .../chttp2_socket_pair_one_byte_at_a_time.c | 9 +- test/core/end2end/gen_build_json.py | 1 + test/core/end2end/tests/disappearing_server.c | 168 +++++++++++++++++++++ test/core/iomgr/fd_posix_test.c | 10 +- 10 files changed, 221 insertions(+), 33 deletions(-) create mode 100644 test/core/end2end/tests/disappearing_server.c (limited to 'test') diff --git a/test/core/end2end/dualstack_socket_test.c b/test/core/end2end/dualstack_socket_test.c index f2dcc80afe..55c4bb25b2 100644 --- a/test/core/end2end/dualstack_socket_test.c +++ b/test/core/end2end/dualstack_socket_test.c @@ -171,6 +171,7 @@ void test_connect(const char *server_host, const char *client_host, int port, int main(int argc, char **argv) { int do_ipv6 = 1; int i; + int port = grpc_pick_unused_port_or_die(); grpc_test_init(argc, argv); grpc_init(); @@ -185,24 +186,23 @@ int main(int argc, char **argv) { grpc_forbid_dualstack_sockets_for_testing = i; /* :: and 0.0.0.0 are handled identically. */ - test_connect("::", "127.0.0.1", grpc_pick_unused_port_or_die(), 1); - test_connect("::", "::ffff:127.0.0.1", grpc_pick_unused_port_or_die(), 1); - test_connect("::", "localhost", grpc_pick_unused_port_or_die(), 1); - test_connect("0.0.0.0", "127.0.0.1", grpc_pick_unused_port_or_die(), 1); - test_connect("0.0.0.0", "::ffff:127.0.0.1", grpc_pick_unused_port_or_die(), - 1); - test_connect("0.0.0.0", "localhost", grpc_pick_unused_port_or_die(), 1); + test_connect("::", "127.0.0.1", port, 1); + test_connect("::", "::ffff:127.0.0.1", port, 1); + test_connect("::", "localhost", port, 1); + test_connect("0.0.0.0", "127.0.0.1", port, 1); + test_connect("0.0.0.0", "::ffff:127.0.0.1", port, 1); + test_connect("0.0.0.0", "localhost", port, 1); if (do_ipv6) { - test_connect("::", "::1", grpc_pick_unused_port_or_die(), 1); - test_connect("0.0.0.0", "::1", grpc_pick_unused_port_or_die(), 1); + test_connect("::", "::1", port, 1); + test_connect("0.0.0.0", "::1", port, 1); } /* These only work when the families agree. */ - test_connect("127.0.0.1", "127.0.0.1", grpc_pick_unused_port_or_die(), 1); + test_connect("127.0.0.1", "127.0.0.1", port, 1); if (do_ipv6) { - test_connect("::1", "::1", grpc_pick_unused_port_or_die(), 1); - test_connect("::1", "127.0.0.1", grpc_pick_unused_port_or_die(), 0); - test_connect("127.0.0.1", "::1", grpc_pick_unused_port_or_die(), 0); + test_connect("::1", "::1", port, 1); + test_connect("::1", "127.0.0.1", port, 0); + test_connect("127.0.0.1", "::1", port, 0); } } diff --git a/test/core/end2end/fixtures/chttp2_fake_security.c b/test/core/end2end/fixtures/chttp2_fake_security.c index ff249ce7aa..e170a55db5 100644 --- a/test/core/end2end/fixtures/chttp2_fake_security.c +++ b/test/core/end2end/fixtures/chttp2_fake_security.c @@ -57,6 +57,7 @@ static grpc_end2end_test_fixture chttp2_create_fixture_secure_fullstack( fullstack_secure_fixture_data *ffd = gpr_malloc(sizeof(fullstack_secure_fixture_data)); + memset(&f, 0, sizeof(f)); gpr_join_host_port(&ffd->localaddr, "localhost", port); f.fixture_data = ffd; @@ -79,10 +80,13 @@ static void chttp2_init_server_secure_fullstack( grpc_end2end_test_fixture *f, grpc_channel_args *server_args, grpc_server_credentials *server_creds) { fullstack_secure_fixture_data *ffd = f->fixture_data; + if (f->server) { + grpc_server_destroy(f->server); + } f->server = grpc_secure_server_create(server_creds, f->server_cq, server_args); grpc_server_credentials_release(server_creds); - grpc_server_add_secure_http2_port(f->server, ffd->localaddr); + GPR_ASSERT(grpc_server_add_secure_http2_port(f->server, ffd->localaddr)); grpc_server_start(f->server); } diff --git a/test/core/end2end/fixtures/chttp2_fullstack.c b/test/core/end2end/fixtures/chttp2_fullstack.c index 169032f6ba..86d5dd708b 100644 --- a/test/core/end2end/fixtures/chttp2_fullstack.c +++ b/test/core/end2end/fixtures/chttp2_fullstack.c @@ -33,14 +33,7 @@ #include "test/core/end2end/end2end_tests.h" -#include -#include #include -#include -#include -#include -#include -#include #include "src/core/channel/client_channel.h" #include "src/core/channel/connected_channel.h" @@ -68,6 +61,7 @@ static grpc_end2end_test_fixture chttp2_create_fixture_fullstack( grpc_end2end_test_fixture f; int port = grpc_pick_unused_port_or_die(); fullstack_fixture_data *ffd = gpr_malloc(sizeof(fullstack_fixture_data)); + memset(&f, 0, sizeof(f)); gpr_join_host_port(&ffd->localaddr, "localhost", port); @@ -87,8 +81,11 @@ void chttp2_init_client_fullstack(grpc_end2end_test_fixture *f, void chttp2_init_server_fullstack(grpc_end2end_test_fixture *f, grpc_channel_args *server_args) { fullstack_fixture_data *ffd = f->fixture_data; + if (f->server) { + grpc_server_destroy(f->server); + } f->server = grpc_server_create(f->server_cq, server_args); - grpc_server_add_http2_port(f->server, ffd->localaddr); + GPR_ASSERT(grpc_server_add_http2_port(f->server, ffd->localaddr)); grpc_server_start(f->server); } diff --git a/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c b/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c index 7b0adb2e8c..047cb801c7 100644 --- a/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c +++ b/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c @@ -56,6 +56,7 @@ static grpc_end2end_test_fixture chttp2_create_fixture_secure_fullstack( int port = grpc_pick_unused_port_or_die(); fullstack_secure_fixture_data *ffd = gpr_malloc(sizeof(fullstack_secure_fixture_data)); + memset(&f, 0, sizeof(f)); gpr_join_host_port(&ffd->localaddr, "localhost", port); @@ -79,10 +80,13 @@ static void chttp2_init_server_secure_fullstack( grpc_end2end_test_fixture *f, grpc_channel_args *server_args, grpc_server_credentials *server_creds) { fullstack_secure_fixture_data *ffd = f->fixture_data; + if (f->server) { + grpc_server_destroy(f->server); + } f->server = grpc_secure_server_create(server_creds, f->server_cq, server_args); grpc_server_credentials_release(server_creds); - grpc_server_add_secure_http2_port(f->server, ffd->localaddr); + GPR_ASSERT(grpc_server_add_secure_http2_port(f->server, ffd->localaddr)); grpc_server_start(f->server); } diff --git a/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c b/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c index 04a8795b38..5c7d61eefe 100644 --- a/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c +++ b/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c @@ -57,6 +57,7 @@ static grpc_end2end_test_fixture chttp2_create_fixture_secure_fullstack( int port = grpc_pick_unused_port_or_die(); fullstack_secure_fixture_data *ffd = gpr_malloc(sizeof(fullstack_secure_fixture_data)); + memset(&f, 0, sizeof(f)); gpr_join_host_port(&ffd->localaddr, "localhost", port); @@ -80,10 +81,13 @@ static void chttp2_init_server_secure_fullstack( grpc_end2end_test_fixture *f, grpc_channel_args *server_args, grpc_server_credentials *server_creds) { fullstack_secure_fixture_data *ffd = f->fixture_data; + if (f->server) { + grpc_server_destroy(f->server); + } f->server = grpc_secure_server_create(server_creds, f->server_cq, server_args); grpc_server_credentials_release(server_creds); - grpc_server_add_secure_http2_port(f->server, ffd->localaddr); + GPR_ASSERT(grpc_server_add_secure_http2_port(f->server, ffd->localaddr)); grpc_server_start(f->server); } diff --git a/test/core/end2end/fixtures/chttp2_socket_pair.c b/test/core/end2end/fixtures/chttp2_socket_pair.c index 7ec17e3cc5..1410477134 100644 --- a/test/core/end2end/fixtures/chttp2_socket_pair.c +++ b/test/core/end2end/fixtures/chttp2_socket_pair.c @@ -32,6 +32,9 @@ */ #include "test/core/end2end/end2end_tests.h" + +#include + #include "src/core/channel/client_channel.h" #include "src/core/channel/connected_channel.h" #include "src/core/channel/http_filter.h" @@ -88,11 +91,10 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair( grpc_endpoint_pair *sfd = gpr_malloc(sizeof(grpc_endpoint_pair)); grpc_end2end_test_fixture f; + memset(&f, 0, sizeof(f)); f.fixture_data = sfd; f.client_cq = grpc_completion_queue_create(); f.server_cq = grpc_completion_queue_create(); - f.server = grpc_server_create_from_filters(f.server_cq, NULL, 0, server_args); - f.client = NULL; *sfd = grpc_iomgr_create_endpoint_pair(65536); @@ -113,6 +115,9 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture *f, static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f, grpc_channel_args *server_args) { grpc_endpoint_pair *sfd = f->fixture_data; + GPR_ASSERT(!f->server); + f->server = + grpc_server_create_from_filters(f->server_cq, NULL, 0, server_args); grpc_create_chttp2_transport(server_setup_transport, f, server_args, sfd->server, NULL, 0, grpc_mdctx_create(), 0); } diff --git a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c index 3e18de9b91..bbca425451 100644 --- a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c +++ b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c @@ -32,6 +32,9 @@ */ #include "test/core/end2end/end2end_tests.h" + +#include + #include "src/core/channel/client_channel.h" #include "src/core/channel/connected_channel.h" #include "src/core/channel/http_filter.h" @@ -88,11 +91,10 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair( grpc_endpoint_pair *sfd = gpr_malloc(sizeof(grpc_endpoint_pair)); grpc_end2end_test_fixture f; + memset(&f, 0, sizeof(f)); f.fixture_data = sfd; f.client_cq = grpc_completion_queue_create(); f.server_cq = grpc_completion_queue_create(); - f.server = grpc_server_create_from_filters(f.server_cq, NULL, 0, server_args); - f.client = NULL; *sfd = grpc_iomgr_create_endpoint_pair(1); @@ -113,6 +115,9 @@ static void chttp2_init_client_socketpair(grpc_end2end_test_fixture *f, static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f, grpc_channel_args *server_args) { grpc_endpoint_pair *sfd = f->fixture_data; + GPR_ASSERT(!f->server); + f->server = + grpc_server_create_from_filters(f->server_cq, NULL, 0, server_args); grpc_create_chttp2_transport(server_setup_transport, f, server_args, sfd->server, NULL, 0, grpc_mdctx_create(), 0); } diff --git a/test/core/end2end/gen_build_json.py b/test/core/end2end/gen_build_json.py index ebbb2dae12..a6b5a0c827 100755 --- a/test/core/end2end/gen_build_json.py +++ b/test/core/end2end/gen_build_json.py @@ -21,6 +21,7 @@ END2END_TESTS = [ 'cancel_after_invoke', 'cancel_before_invoke', 'cancel_in_a_vacuum', + 'disappearing_server', 'early_server_shutdown_finishes_inflight_calls', 'early_server_shutdown_finishes_tags', 'invoke_large_request', diff --git a/test/core/end2end/tests/disappearing_server.c b/test/core/end2end/tests/disappearing_server.c new file mode 100644 index 0000000000..4d3738a365 --- /dev/null +++ b/test/core/end2end/tests/disappearing_server.c @@ -0,0 +1,168 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "test/core/end2end/end2end_tests.h" + +#include +#include +#include + +#include +#include +#include +#include +#include +#include "test/core/end2end/cq_verifier.h" + +enum { TIMEOUT = 200000 }; + +static void *tag(gpr_intptr t) { return (void *)t; } + +static gpr_timespec n_seconds_time(int n) { + return gpr_time_add(gpr_now(), gpr_time_from_micros(GPR_US_PER_SEC * n)); +} + +static gpr_timespec five_seconds_time() { return n_seconds_time(5); } + +static void drain_cq(grpc_completion_queue *cq) { + grpc_event *ev; + grpc_completion_type type; + do { + ev = grpc_completion_queue_next(cq, five_seconds_time()); + GPR_ASSERT(ev); + type = ev->type; + grpc_event_finish(ev); + } while (type != GRPC_QUEUE_SHUTDOWN); +} + +static void shutdown_server(grpc_end2end_test_fixture *f) { + if (!f->server) return; + grpc_server_shutdown(f->server); + grpc_server_destroy(f->server); + f->server = NULL; +} + +static void shutdown_client(grpc_end2end_test_fixture *f) { + if (!f->client) return; + grpc_channel_destroy(f->client); + f->client = NULL; +} + +static void end_test(grpc_end2end_test_fixture *f) { + shutdown_server(f); + shutdown_client(f); + + grpc_completion_queue_shutdown(f->server_cq); + drain_cq(f->server_cq); + grpc_completion_queue_destroy(f->server_cq); + grpc_completion_queue_shutdown(f->client_cq); + drain_cq(f->client_cq); + grpc_completion_queue_destroy(f->client_cq); +} + +static void do_request_and_shutdown_server(grpc_end2end_test_fixture *f, + cq_verifier *v_client, + cq_verifier *v_server) { + grpc_call *c; + grpc_call *s; + grpc_status send_status = {GRPC_STATUS_UNIMPLEMENTED, "xyz"}; + gpr_timespec deadline = five_seconds_time(); + + c = grpc_channel_create_call(f->client, "/foo", "test.google.com", deadline); + GPR_ASSERT(c); + + GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_invoke(c, f->client_cq, tag(1), + tag(2), tag(3), 0)); + cq_expect_invoke_accepted(v_client, tag(1), GRPC_OP_OK); + + cq_verify(v_client); + + GPR_ASSERT(GRPC_CALL_OK == grpc_call_writes_done(c, tag(4))); + cq_expect_finish_accepted(v_client, tag(4), GRPC_OP_OK); + cq_verify(v_client); + + GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(f->server, tag(100))); + cq_expect_server_rpc_new(v_server, &s, tag(100), "/foo", "test.google.com", + deadline, NULL); + cq_verify(v_server); + + GPR_ASSERT(GRPC_CALL_OK == grpc_call_accept(s, f->server_cq, tag(102), 0)); + cq_expect_client_metadata_read(v_client, tag(2), NULL); + cq_verify(v_client); + + /* should be able to shut down the server early + - and still complete the request */ + grpc_server_shutdown(f->server); + + GPR_ASSERT(GRPC_CALL_OK == + grpc_call_start_write_status(s, send_status, tag(5))); + cq_expect_finished_with_status(v_client, tag(3), send_status, NULL); + cq_verify(v_client); + + cq_expect_finish_accepted(v_server, tag(5), GRPC_OP_OK); + cq_expect_finished(v_server, tag(102), NULL); + cq_verify(v_server); + + grpc_call_destroy(c); + grpc_call_destroy(s); +} + +static void disappearing_server_test(grpc_end2end_test_config config) { + grpc_end2end_test_fixture f = config.create_fixture(NULL, NULL); + cq_verifier *v_client = cq_verifier_create(f.client_cq); + cq_verifier *v_server = cq_verifier_create(f.server_cq); + + gpr_log(GPR_INFO, "%s/%s", __FUNCTION__, config.name); + + config.init_client(&f, NULL); + config.init_server(&f, NULL); + + do_request_and_shutdown_server(&f, v_client, v_server); + + /* now destroy and recreate the server */ + config.init_server(&f, NULL); + + do_request_and_shutdown_server(&f, v_client, v_server); + + cq_verifier_destroy(v_client); + cq_verifier_destroy(v_server); + + end_test(&f); + config.tear_down_data(&f); +} + +void grpc_end2end_tests(grpc_end2end_test_config config) { + if (config.feature_mask & FEATURE_MASK_SUPPORTS_DELAYED_CONNECTION) { + disappearing_server_test(config); + } +} diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c index 4d4461ec6c..c3a0afdb25 100644 --- a/test/core/iomgr/fd_posix_test.c +++ b/test/core/iomgr/fd_posix_test.c @@ -120,7 +120,7 @@ static void session_shutdown_cb(void *arg, /*session*/ enum grpc_em_cb_status status) { session *se = arg; server *sv = se->sv; - grpc_fd_destroy(se->em_fd); + grpc_fd_destroy(se->em_fd, NULL, NULL); gpr_free(se); /* Start to shutdown listen fd. */ grpc_fd_shutdown(sv->em_fd); @@ -178,7 +178,7 @@ static void listen_shutdown_cb(void *arg /*server*/, enum grpc_em_cb_status status) { server *sv = arg; - grpc_fd_destroy(sv->em_fd); + grpc_fd_destroy(sv->em_fd, NULL, NULL); gpr_mu_lock(&sv->mu); sv->done = 1; @@ -288,7 +288,7 @@ static void client_init(client *cl) { static void client_session_shutdown_cb(void *arg /*client*/, enum grpc_em_cb_status status) { client *cl = arg; - grpc_fd_destroy(cl->em_fd); + grpc_fd_destroy(cl->em_fd, NULL, NULL); gpr_mu_lock(&cl->mu); cl->done = 1; gpr_cv_signal(&cl->done_cv); @@ -468,7 +468,7 @@ static void test_grpc_fd_change() { GPR_ASSERT(b.cb_that_ran == second_read_callback); gpr_mu_unlock(&b.mu); - grpc_fd_destroy(em_fd); + grpc_fd_destroy(em_fd, NULL, NULL); destroy_change_data(&a); destroy_change_data(&b); close(sv[0]); @@ -509,7 +509,7 @@ void test_grpc_fd_notify_timeout() { GPR_ASSERT(gpr_event_wait(&ev, gpr_time_add(deadline, timeout))); GPR_ASSERT(gpr_event_get(&ev) == (void *)1); - grpc_fd_destroy(em_fd); + grpc_fd_destroy(em_fd, NULL, NULL); close(sv[1]); } -- cgit v1.2.3