diff options
author | 2014-12-09 14:39:16 -0800 | |
---|---|---|
committer | 2014-12-09 16:20:55 -0800 | |
commit | 18b49ab914ea5a57f22ed6d77520cd7d4372749b (patch) | |
tree | c2ec5971eebd10e3ef52c0c084c797b8d06bb267 /test | |
parent | 98bffb779b8c47f4d76c72c7807d9f1b1074a795 (diff) |
Introducing iomgr.
Move eventmanager and platform dependent endpoint functionality into a single
library called 'iomgr'.
This is primarily to prepare for a Windows port - where posix socket semantics
lead to poor quality code.
Mostly this is a code movement CL, with some small changes to help prepare the
way for porting:
- em style fd objects can only be held internally in iomgr, and own their memory
- added grpc_iomgr_create_endpoint_pair() to accomodate the common pattern of
creating a tcp endpoint from the output of socketpair - this will help keep
our tests portable
- separated em alarm interface into a separate file, as this part of event
manager is needed higher up the stack
- made the eventmanager bits a true singleton, simplifying API's across the
stack as there's no longer a reason to carry a pointer there.
Initial design document is here:
https://docs.google.com/document/d/1VmafcHvvrP5kwtQkz84R5yXF7u7fW-9Pn0bkSUQHDt8/edit?disco=AAAAARNByxg
Change on 2014/12/09 by ctiller <ctiller@google.com>
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=81716456
Diffstat (limited to 'test')
20 files changed, 526 insertions, 637 deletions
diff --git a/test/core/end2end/dualstack_socket_test.c b/test/core/end2end/dualstack_socket_test.c index 4813672104..e127c61b25 100644 --- a/test/core/end2end/dualstack_socket_test.c +++ b/test/core/end2end/dualstack_socket_test.c @@ -31,7 +31,7 @@ * */ -#include "src/core/endpoint/socket_utils.h" +#include "src/core/iomgr/socket_utils_posix.h" #include <grpc/grpc.h> #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> @@ -165,16 +165,10 @@ void test_connect(const char *server_host, const char *client_host, int port, grpc_completion_queue_shutdown(server_cq); drain_cq(server_cq); grpc_completion_queue_destroy(server_cq); - /* TODO(klempner): We need to give the EM time to actually close the listening - socket, or later tests will fail to bind to this port. We should fix this - by adding an API to EM to get notified when this happens and having it - prevent listener teardown. */ - gpr_sleep_until(gpr_time_add(gpr_now(), gpr_time_from_millis(250))); } int main(int argc, char **argv) { int i; - int port = grpc_pick_unused_port_or_die(); grpc_test_init(argc, argv); grpc_init(); @@ -184,20 +178,21 @@ int main(int argc, char **argv) { grpc_forbid_dualstack_sockets_for_testing = i; /* :: and 0.0.0.0 are handled identically. */ - test_connect("::", "127.0.0.1", port, 1); - test_connect("::", "::1", port, 1); - test_connect("::", "::ffff:127.0.0.1", port, 1); - test_connect("::", "localhost", port, 1); - test_connect("0.0.0.0", "127.0.0.1", port, 1); - test_connect("0.0.0.0", "::1", port, 1); - test_connect("0.0.0.0", "::ffff:127.0.0.1", port, 1); - test_connect("0.0.0.0", "localhost", port, 1); + test_connect("::", "127.0.0.1", grpc_pick_unused_port_or_die(), 1); + test_connect("::", "::1", grpc_pick_unused_port_or_die(), 1); + test_connect("::", "::ffff:127.0.0.1", grpc_pick_unused_port_or_die(), 1); + test_connect("::", "localhost", grpc_pick_unused_port_or_die(), 1); + test_connect("0.0.0.0", "127.0.0.1", grpc_pick_unused_port_or_die(), 1); + test_connect("0.0.0.0", "::1", grpc_pick_unused_port_or_die(), 1); + test_connect("0.0.0.0", "::ffff:127.0.0.1", grpc_pick_unused_port_or_die(), + 1); + test_connect("0.0.0.0", "localhost", grpc_pick_unused_port_or_die(), 1); /* These only work when the families agree. */ - test_connect("::1", "::1", port, 1); - test_connect("::1", "127.0.0.1", port, 0); - test_connect("127.0.0.1", "127.0.0.1", port, 1); - test_connect("127.0.0.1", "::1", port, 0); + test_connect("::1", "::1", grpc_pick_unused_port_or_die(), 1); + test_connect("::1", "127.0.0.1", grpc_pick_unused_port_or_die(), 0); + test_connect("127.0.0.1", "127.0.0.1", grpc_pick_unused_port_or_die(), 1); + test_connect("127.0.0.1", "::1", grpc_pick_unused_port_or_die(), 0); } diff --git a/test/core/end2end/fixtures/chttp2_fake_security.c b/test/core/end2end/fixtures/chttp2_fake_security.c index aaca56336f..ff249ce7aa 100644 --- a/test/core/end2end/fixtures/chttp2_fake_security.c +++ b/test/core/end2end/fixtures/chttp2_fake_security.c @@ -37,7 +37,6 @@ #include <string.h> #include "src/core/channel/channel_args.h" -#include "src/core/eventmanager/em.h" #include "src/core/security/credentials.h" #include "src/core/security/security_context.h" #include <grpc/support/alloc.h> @@ -47,8 +46,6 @@ #include "test/core/util/port.h" #include "test/core/end2end/data/ssl_test_data.h" -static grpc_em em; - typedef struct fullstack_secure_fixture_data { char *localaddr; } fullstack_secure_fixture_data; @@ -124,13 +121,11 @@ int main(int argc, char **argv) { grpc_test_init(argc, argv); grpc_init(); - grpc_em_init(&em); for (i = 0; i < sizeof(configs) / sizeof(*configs); i++) { grpc_end2end_tests(configs[i]); } - GPR_ASSERT(grpc_em_destroy(&em) == GRPC_EM_OK); grpc_shutdown(); return 0; diff --git a/test/core/end2end/fixtures/chttp2_fullstack.c b/test/core/end2end/fixtures/chttp2_fullstack.c index da75d61e66..169032f6ba 100644 --- a/test/core/end2end/fixtures/chttp2_fullstack.c +++ b/test/core/end2end/fixtures/chttp2_fullstack.c @@ -46,11 +46,9 @@ #include "src/core/channel/connected_channel.h" #include "src/core/channel/http_filter.h" #include "src/core/channel/http_server_filter.h" -#include "src/core/eventmanager/em.h" #include "src/core/surface/channel.h" #include "src/core/surface/client.h" #include "src/core/surface/server.h" -#include "src/core/surface/surface_em.h" #include "src/core/transport/chttp2_transport.h" #include <grpc/support/alloc.h> #include <grpc/support/host_port.h> diff --git a/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c b/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c index 57c9141d95..7b0adb2e8c 100644 --- a/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c +++ b/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c @@ -37,7 +37,6 @@ #include <string.h> #include "src/core/channel/channel_args.h" -#include "src/core/eventmanager/em.h" #include "src/core/security/credentials.h" #include "src/core/security/security_context.h" #include <grpc/support/alloc.h> @@ -47,8 +46,6 @@ #include "test/core/util/port.h" #include "test/core/end2end/data/ssl_test_data.h" -static grpc_em em; - typedef struct fullstack_secure_fixture_data { char *localaddr; } fullstack_secure_fixture_data; @@ -131,13 +128,11 @@ int main(int argc, char **argv) { grpc_test_init(argc, argv); grpc_init(); - grpc_em_init(&em); for (i = 0; i < sizeof(configs) / sizeof(*configs); i++) { grpc_end2end_tests(configs[i]); } - GPR_ASSERT(grpc_em_destroy(&em) == GRPC_EM_OK); grpc_shutdown(); return 0; diff --git a/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c b/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c index 8d5585312a..04a8795b38 100644 --- a/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c +++ b/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c @@ -37,7 +37,7 @@ #include <string.h> #include "src/core/channel/channel_args.h" -#include "src/core/eventmanager/em.h" +#include "src/core/iomgr/iomgr.h" #include "src/core/security/credentials.h" #include "src/core/security/security_context.h" #include <grpc/support/alloc.h> @@ -47,8 +47,6 @@ #include "test/core/util/port.h" #include "test/core/end2end/data/ssl_test_data.h" -static grpc_em em; - typedef struct fullstack_secure_fixture_data { char *localaddr; } fullstack_secure_fixture_data; @@ -138,13 +136,11 @@ int main(int argc, char **argv) { grpc_test_init(argc, argv); grpc_init(); - grpc_em_init(&em); for (i = 0; i < sizeof(configs) / sizeof(*configs); i++) { grpc_end2end_tests(configs[i]); } - GPR_ASSERT(grpc_em_destroy(&em) == GRPC_EM_OK); grpc_shutdown(); return 0; diff --git a/test/core/end2end/fixtures/chttp2_socket_pair.c b/test/core/end2end/fixtures/chttp2_socket_pair.c index 593ff78ba8..7ec17e3cc5 100644 --- a/test/core/end2end/fixtures/chttp2_socket_pair.c +++ b/test/core/end2end/fixtures/chttp2_socket_pair.c @@ -32,25 +32,15 @@ */ #include "test/core/end2end/end2end_tests.h" - -#include <errno.h> -#include <fcntl.h> -#include <string.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <unistd.h> -#include <stdlib.h> -#include <stdio.h> - #include "src/core/channel/client_channel.h" #include "src/core/channel/connected_channel.h" #include "src/core/channel/http_filter.h" #include "src/core/channel/http_server_filter.h" -#include "src/core/eventmanager/em.h" +#include "src/core/iomgr/endpoint_pair.h" +#include "src/core/iomgr/iomgr.h" #include "src/core/surface/channel.h" #include "src/core/surface/client.h" #include "src/core/surface/server.h" -#include "src/core/surface/surface_em.h" #include "src/core/transport/chttp2_transport.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -60,15 +50,6 @@ #include "test/core/util/port.h" #include "test/core/util/test_config.h" -static void create_sockets(int sv[2]) { - int flags; - GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0); - flags = fcntl(sv[0], F_GETFL, 0); - GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0); - flags = fcntl(sv[1], F_GETFL, 0); - GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); -} - /* chttp2 transport that is immediately available (used for testing connected_channel without a client_channel */ @@ -102,11 +83,9 @@ static grpc_transport_setup_result client_setup_transport( grpc_channel_get_channel_stack(channel), transport); } -typedef struct socketpair_fixture_data { int sv[2]; } socketpair_fixture_data; - static grpc_end2end_test_fixture chttp2_create_fixture_socketpair( grpc_channel_args *client_args, grpc_channel_args *server_args) { - socketpair_fixture_data *sfd = gpr_malloc(sizeof(socketpair_fixture_data)); + grpc_endpoint_pair *sfd = gpr_malloc(sizeof(grpc_endpoint_pair)); grpc_end2end_test_fixture f; f.fixture_data = sfd; @@ -115,31 +94,27 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair( f.server = grpc_server_create_from_filters(f.server_cq, NULL, 0, server_args); f.client = NULL; - create_sockets(sfd->sv); + *sfd = grpc_iomgr_create_endpoint_pair(65536); return f; } static void chttp2_init_client_socketpair(grpc_end2end_test_fixture *f, grpc_channel_args *client_args) { - socketpair_fixture_data *sfd = f->fixture_data; - grpc_endpoint *cli_tcp; + grpc_endpoint_pair *sfd = f->fixture_data; sp_client_setup cs; cs.client_args = client_args; cs.f = f; - cli_tcp = grpc_tcp_create_dbg(sfd->sv[0], grpc_surface_em(), 65536); grpc_create_chttp2_transport(client_setup_transport, &cs, client_args, - cli_tcp, NULL, 0, grpc_mdctx_create(), 1); + sfd->client, NULL, 0, grpc_mdctx_create(), 1); GPR_ASSERT(f->client); } static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f, grpc_channel_args *server_args) { - socketpair_fixture_data *sfd = f->fixture_data; - grpc_endpoint *svr_tcp; - svr_tcp = grpc_tcp_create_dbg(sfd->sv[1], grpc_surface_em(), 65536); - grpc_create_chttp2_transport(server_setup_transport, f, server_args, svr_tcp, - NULL, 0, grpc_mdctx_create(), 0); + grpc_endpoint_pair *sfd = f->fixture_data; + grpc_create_chttp2_transport(server_setup_transport, f, server_args, + sfd->server, NULL, 0, grpc_mdctx_create(), 0); } static void chttp2_tear_down_socketpair(grpc_end2end_test_fixture *f) { diff --git a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c index 9287364c48..3e18de9b91 100644 --- a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c +++ b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c @@ -32,25 +32,15 @@ */ #include "test/core/end2end/end2end_tests.h" - -#include <errno.h> -#include <fcntl.h> -#include <string.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <unistd.h> -#include <stdlib.h> -#include <stdio.h> - #include "src/core/channel/client_channel.h" #include "src/core/channel/connected_channel.h" #include "src/core/channel/http_filter.h" #include "src/core/channel/http_server_filter.h" -#include "src/core/eventmanager/em.h" +#include "src/core/iomgr/endpoint_pair.h" +#include "src/core/iomgr/iomgr.h" #include "src/core/surface/channel.h" #include "src/core/surface/client.h" #include "src/core/surface/server.h" -#include "src/core/surface/surface_em.h" #include "src/core/transport/chttp2_transport.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> @@ -60,15 +50,6 @@ #include "test/core/util/port.h" #include "test/core/util/test_config.h" -static void create_sockets(int sv[2]) { - int flags; - GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0); - flags = fcntl(sv[0], F_GETFL, 0); - GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0); - flags = fcntl(sv[1], F_GETFL, 0); - GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); -} - /* chttp2 transport that is immediately available (used for testing connected_channel without a client_channel */ @@ -102,11 +83,9 @@ static grpc_transport_setup_result client_setup_transport( grpc_channel_get_channel_stack(channel), transport); } -typedef struct socketpair_fixture_data { int sv[2]; } socketpair_fixture_data; - static grpc_end2end_test_fixture chttp2_create_fixture_socketpair( grpc_channel_args *client_args, grpc_channel_args *server_args) { - socketpair_fixture_data *sfd = gpr_malloc(sizeof(socketpair_fixture_data)); + grpc_endpoint_pair *sfd = gpr_malloc(sizeof(grpc_endpoint_pair)); grpc_end2end_test_fixture f; f.fixture_data = sfd; @@ -115,31 +94,27 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair( f.server = grpc_server_create_from_filters(f.server_cq, NULL, 0, server_args); f.client = NULL; - create_sockets(sfd->sv); + *sfd = grpc_iomgr_create_endpoint_pair(1); return f; } static void chttp2_init_client_socketpair(grpc_end2end_test_fixture *f, grpc_channel_args *client_args) { - socketpair_fixture_data *sfd = f->fixture_data; - grpc_endpoint *cli_tcp; + grpc_endpoint_pair *sfd = f->fixture_data; sp_client_setup cs; cs.client_args = client_args; cs.f = f; - cli_tcp = grpc_tcp_create_dbg(sfd->sv[0], grpc_surface_em(), 1); grpc_create_chttp2_transport(client_setup_transport, &cs, client_args, - cli_tcp, NULL, 0, grpc_mdctx_create(), 1); + sfd->client, NULL, 0, grpc_mdctx_create(), 1); GPR_ASSERT(f->client); } static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f, grpc_channel_args *server_args) { - socketpair_fixture_data *sfd = f->fixture_data; - grpc_endpoint *svr_tcp; - svr_tcp = grpc_tcp_create_dbg(sfd->sv[1], grpc_surface_em(), 1); - grpc_create_chttp2_transport(server_setup_transport, f, server_args, svr_tcp, - NULL, 0, grpc_mdctx_create(), 0); + grpc_endpoint_pair *sfd = f->fixture_data; + grpc_create_chttp2_transport(server_setup_transport, f, server_args, + sfd->server, NULL, 0, grpc_mdctx_create(), 0); } static void chttp2_tear_down_socketpair(grpc_end2end_test_fixture *f) { diff --git a/test/core/endpoint/secure_endpoint_test.c b/test/core/endpoint/secure_endpoint_test.c index 4fd5dee677..18a33b5f52 100644 --- a/test/core/endpoint/secure_endpoint_test.c +++ b/test/core/endpoint/secure_endpoint_test.c @@ -39,41 +39,25 @@ #include <unistd.h> #include "src/core/endpoint/secure_endpoint.h" -#include "src/core/endpoint/tcp.h" -#include "src/core/eventmanager/em.h" -#include "src/core/tsi/fake_transport_security.h" +#include "src/core/iomgr/endpoint_pair.h" +#include "src/core/iomgr/iomgr.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include "test/core/util/test_config.h" - -grpc_em g_em; - -static void create_sockets(int sv[2]) { - int flags; - GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0); - flags = fcntl(sv[0], F_GETFL, 0); - GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0); - flags = fcntl(sv[1], F_GETFL, 0); - GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); -} +#include "src/core/tsi/fake_transport_security.h" static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair( size_t slice_size, gpr_slice *leftover_slices, size_t leftover_nslices) { - int sv[2]; tsi_frame_protector *fake_read_protector = tsi_create_fake_protector(NULL); tsi_frame_protector *fake_write_protector = tsi_create_fake_protector(NULL); grpc_endpoint_test_fixture f; - grpc_endpoint *tcp_read; - grpc_endpoint *tcp_write; + grpc_endpoint_pair tcp; - create_sockets(sv); - grpc_em_init(&g_em); - tcp_read = grpc_tcp_create_dbg(sv[0], &g_em, slice_size); - tcp_write = grpc_tcp_create(sv[1], &g_em); + tcp = grpc_iomgr_create_endpoint_pair(slice_size); if (leftover_nslices == 0) { f.client_ep = - grpc_secure_endpoint_create(fake_read_protector, tcp_read, NULL, 0); + grpc_secure_endpoint_create(fake_read_protector, tcp.client, NULL, 0); } else { int i; tsi_result result; @@ -115,14 +99,14 @@ static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair( } while (still_pending_size > 0); encrypted_leftover = gpr_slice_from_copied_buffer( (const char *)encrypted_buffer, total_buffer_size - buffer_size); - f.client_ep = grpc_secure_endpoint_create(fake_read_protector, tcp_read, + f.client_ep = grpc_secure_endpoint_create(fake_read_protector, tcp.client, &encrypted_leftover, 1); gpr_slice_unref(encrypted_leftover); gpr_free(encrypted_buffer); } f.server_ep = - grpc_secure_endpoint_create(fake_write_protector, tcp_write, NULL, 0); + grpc_secure_endpoint_create(fake_write_protector, tcp.server, NULL, 0); return f; } @@ -141,7 +125,7 @@ secure_endpoint_create_fixture_tcp_socketpair_leftover(size_t slice_size) { return f; } -static void clean_up() { grpc_em_destroy(&g_em); } +static void clean_up() {} static grpc_endpoint_test_config configs[] = { {"secure_ep/tcp_socketpair", @@ -213,9 +197,11 @@ static void test_destroy_ep_early(grpc_endpoint_test_config config, int main(int argc, char **argv) { grpc_test_init(argc, argv); + grpc_iomgr_init(); grpc_endpoint_tests(configs[0]); test_leftover(configs[1], 1); test_destroy_ep_early(configs[1], 1); + grpc_iomgr_shutdown(); return 0; } diff --git a/test/core/eventmanager/em_pipe_test.c b/test/core/eventmanager/em_pipe_test.c deleted file mode 100644 index f2414c42b1..0000000000 --- a/test/core/eventmanager/em_pipe_test.c +++ /dev/null @@ -1,198 +0,0 @@ -/* - * - * Copyright 2014, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -/* Test grpc_em_fd with pipe. The test creates a pipe with non-blocking mode, - sends a stream of bytes through the pipe, and verifies that all bytes are - received. */ -#include "src/core/eventmanager/em.h" - -#include <errno.h> -#include <fcntl.h> -#include <pthread.h> -#include <string.h> -#include <stdio.h> -#include <unistd.h> - -#include <grpc/support/log.h> -#include "test/core/util/test_config.h" - -/* Operation for fcntl() to set pipe buffer size. */ -#ifndef F_SETPIPE_SZ -#define F_SETPIPE_SZ (1024 + 7) -#endif - -#define TOTAL_WRITE 3 /* total number of times that the write buffer is full. \ - */ -#define BUF_SIZE 1024 -char read_buf[BUF_SIZE]; -char write_buf[BUF_SIZE]; - -typedef struct { - int fd[2]; - grpc_em em; - grpc_em_fd read_em_fd; - grpc_em_fd write_em_fd; - int num_write; /* number of times that the write buffer is full*/ - ssize_t bytes_written_total; /* total number of bytes written to the pipe */ - ssize_t bytes_read_total; /* total number of bytes read from the pipe */ - pthread_mutex_t mu; /* protect cv and done */ - pthread_cond_t cv; /* signaled when read finished */ - int done; /* set to 1 when read finished */ -} async_pipe; - -void write_shutdown_cb(void *arg, /*async_pipe*/ - enum grpc_em_cb_status status) { - async_pipe *ap = arg; - grpc_em_fd_destroy(&ap->write_em_fd); -} - -void write_cb(void *arg, /*async_pipe*/ enum grpc_em_cb_status status) { - async_pipe *ap = arg; - ssize_t bytes_written = 0; - - if (status == GRPC_CALLBACK_CANCELLED) { - write_shutdown_cb(arg, GRPC_CALLBACK_SUCCESS); - return; - } - - do { - bytes_written = write(ap->fd[1], write_buf, BUF_SIZE); - if (bytes_written > 0) ap->bytes_written_total += bytes_written; - } while (bytes_written > 0); - - if (errno == EAGAIN) { - if (ap->num_write < TOTAL_WRITE) { - ap->num_write++; - grpc_em_fd_notify_on_write(&ap->write_em_fd, write_cb, ap, - gpr_inf_future); - } else { - /* Note that this could just shut down directly; doing a trip through the - shutdown path serves only a demonstration of the API. */ - grpc_em_fd_shutdown(&ap->write_em_fd); - grpc_em_fd_notify_on_write(&ap->write_em_fd, write_cb, ap, - gpr_inf_future); - } - } else { - GPR_ASSERT(0 && strcat("unknown errno: ", strerror(errno))); - } -} - -void read_shutdown_cb(void *arg, /*async_pipe*/ enum grpc_em_cb_status status) { - async_pipe *ap = arg; - grpc_em_fd_destroy(&ap->read_em_fd); - pthread_mutex_lock(&ap->mu); - if (ap->done == 0) { - ap->done = 1; - pthread_cond_signal(&ap->cv); - } - pthread_mutex_unlock(&ap->mu); -} - -void read_cb(void *arg, /*async_pipe*/ enum grpc_em_cb_status status) { - async_pipe *ap = arg; - ssize_t bytes_read = 0; - - if (status == GRPC_CALLBACK_CANCELLED) { - read_shutdown_cb(arg, GRPC_CALLBACK_SUCCESS); - return; - } - - do { - bytes_read = read(ap->fd[0], read_buf, BUF_SIZE); - if (bytes_read > 0) ap->bytes_read_total += bytes_read; - } while (bytes_read > 0); - - if (bytes_read == 0) { - /* Note that this could just shut down directly; doing a trip through the - shutdown path serves only a demonstration of the API. */ - grpc_em_fd_shutdown(&ap->read_em_fd); - grpc_em_fd_notify_on_read(&ap->read_em_fd, read_cb, ap, gpr_inf_future); - } else if (bytes_read == -1) { - if (errno == EAGAIN) { - grpc_em_fd_notify_on_read(&ap->read_em_fd, read_cb, ap, gpr_inf_future); - } else { - GPR_ASSERT(0 && strcat("unknown errno: ", strerror(errno))); - } - } -} - -void dummy_cb(void *arg, /*async_pipe*/ enum grpc_em_cb_status status) {} - -void async_pipe_init(async_pipe *ap) { - int i; - - ap->num_write = 0; - ap->bytes_written_total = 0; - ap->bytes_read_total = 0; - - pthread_mutex_init(&ap->mu, NULL); - pthread_cond_init(&ap->cv, NULL); - ap->done = 0; - - GPR_ASSERT(0 == pipe(ap->fd)); - for (i = 0; i < 2; i++) { - int flags = fcntl(ap->fd[i], F_GETFL, 0); - GPR_ASSERT(fcntl(ap->fd[i], F_SETFL, flags | O_NONBLOCK) == 0); - GPR_ASSERT(fcntl(ap->fd[i], F_SETPIPE_SZ, 4096) == 4096); - } - - grpc_em_init(&ap->em); - grpc_em_fd_init(&ap->read_em_fd, &ap->em, ap->fd[0]); - grpc_em_fd_init(&ap->write_em_fd, &ap->em, ap->fd[1]); -} - -static void async_pipe_start(async_pipe *ap) { - grpc_em_fd_notify_on_read(&ap->read_em_fd, read_cb, ap, gpr_inf_future); - grpc_em_fd_notify_on_write(&ap->write_em_fd, write_cb, ap, gpr_inf_future); -} - -static void async_pipe_wait_destroy(async_pipe *ap) { - pthread_mutex_lock(&ap->mu); - while (!ap->done) pthread_cond_wait(&ap->cv, &ap->mu); - pthread_mutex_unlock(&ap->mu); - pthread_mutex_destroy(&ap->mu); - pthread_cond_destroy(&ap->cv); - - grpc_em_destroy(&ap->em); -} - -int main(int argc, char **argv) { - async_pipe ap; - grpc_test_init(argc, argv); - async_pipe_init(&ap); - async_pipe_start(&ap); - async_pipe_wait_destroy(&ap); - GPR_ASSERT(ap.bytes_read_total == ap.bytes_written_total); - gpr_log(GPR_INFO, "read total bytes %d", ap.bytes_read_total); - return 0; -} diff --git a/test/core/httpcli/httpcli_test.c b/test/core/httpcli/httpcli_test.c index 5c0d87c427..c901e595f6 100644 --- a/test/core/httpcli/httpcli_test.c +++ b/test/core/httpcli/httpcli_test.c @@ -35,11 +35,12 @@ #include <string.h> +#include "src/core/iomgr/iomgr.h" #include <grpc/support/log.h> +#include <grpc/support/sync.h> #include "test/core/util/test_config.h" static gpr_event g_done; -static grpc_em g_em; static gpr_timespec n_seconds_time(int seconds) { return gpr_time_add(gpr_now(), gpr_time_from_micros(seconds * 1000000)); @@ -55,7 +56,7 @@ static void on_finish(void *arg, const grpc_httpcli_response *response) { static void test_get(int use_ssl) { grpc_httpcli_request req; - gpr_log(GPR_INFO, "running %s with use_ssl=%d.", __FUNCTION__, (int)use_ssl); + gpr_log(GPR_INFO, "running %s with use_ssl=%d.", __FUNCTION__, use_ssl); gpr_event_init(&g_done); memset(&req, 0, sizeof(req)); @@ -63,7 +64,7 @@ static void test_get(int use_ssl) { req.path = "/"; req.use_ssl = use_ssl; - grpc_httpcli_get(&req, n_seconds_time(15), &g_em, on_finish, (void *)42); + grpc_httpcli_get(&req, n_seconds_time(15), on_finish, (void *)42); GPR_ASSERT(gpr_event_wait(&g_done, n_seconds_time(20))); } @@ -79,7 +80,7 @@ static void test_post(int use_ssl) { req.path = "/1eamwr21"; req.use_ssl = use_ssl; - grpc_httpcli_post(&req, NULL, 0, n_seconds_time(15), &g_em, on_finish, + grpc_httpcli_post(&req, NULL, 0, n_seconds_time(15), on_finish, (void *)42); GPR_ASSERT(gpr_event_wait(&g_done, n_seconds_time(20))); } @@ -87,7 +88,7 @@ static void test_post(int use_ssl) { int main(int argc, char **argv) { grpc_test_init(argc, argv); - grpc_em_init(&g_em); + grpc_iomgr_init(); test_get(0); test_get(1); @@ -95,7 +96,7 @@ int main(int argc, char **argv) { /* test_post(0); */ /* test_post(1); */ - grpc_em_destroy(&g_em); + grpc_iomgr_shutdown(); return 0; } diff --git a/test/core/iomgr/alarm_test.c b/test/core/iomgr/alarm_test.c new file mode 100644 index 0000000000..0dcd21463d --- /dev/null +++ b/test/core/iomgr/alarm_test.c @@ -0,0 +1,219 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* Test gRPC event manager with a simple TCP upload server and client. */ +#include "src/core/iomgr/alarm.h" + +#include <ctype.h> +#include <errno.h> +#include <fcntl.h> +#include <netinet/in.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/time.h> +#include <unistd.h> + +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/time.h> +#include "test/core/util/test_config.h" + +/* Dummy gRPC callback */ +void no_op_cb(void *arg, grpc_iomgr_cb_status status) {} + +typedef struct { + gpr_cv cv; + gpr_mu mu; + int counter; + int done_success_ctr; + int done_cancel_ctr; + int done; + gpr_event fcb_arg; + grpc_iomgr_cb_status status; +} alarm_arg; + +static void followup_cb(void *arg, grpc_iomgr_cb_status status) { + gpr_event_set((gpr_event *)arg, arg); +} + +/* Called when an alarm expires. */ +static void alarm_cb(void *arg /* alarm_arg */, grpc_iomgr_cb_status status) { + alarm_arg *a = arg; + gpr_mu_lock(&a->mu); + if (status == GRPC_CALLBACK_SUCCESS) { + a->counter++; + a->done_success_ctr++; + } else if (status == GRPC_CALLBACK_CANCELLED) { + a->done_cancel_ctr++; + } else { + GPR_ASSERT(0); + } + a->done = 1; + a->status = status; + gpr_cv_signal(&a->cv); + gpr_mu_unlock(&a->mu); + grpc_iomgr_add_callback(followup_cb, &a->fcb_arg); +} + +/* Test grpc_alarm add and cancel. */ +static void test_grpc_alarm() { + grpc_alarm alarm; + grpc_alarm alarm_to_cancel; + gpr_timespec tv0 = {0, 1}; + /* Timeout on the alarm cond. var, so make big enough to absorb time + deviations. Otherwise, operations after wait will not be properly ordered + */ + gpr_timespec tv1 = gpr_time_from_micros(200000); + gpr_timespec tv2 = {0, 1}; + gpr_timespec alarm_deadline; + gpr_timespec followup_deadline; + + alarm_arg arg; + alarm_arg arg2; + void *fdone; + + grpc_iomgr_init(); + + arg.counter = 0; + arg.status = GRPC_CALLBACK_DO_NOT_USE; + arg.done_success_ctr = 0; + arg.done_cancel_ctr = 0; + arg.done = 0; + gpr_mu_init(&arg.mu); + gpr_cv_init(&arg.cv); + gpr_event_init(&arg.fcb_arg); + + grpc_alarm_init(&alarm, alarm_cb, &arg); + grpc_alarm_add(&alarm, gpr_time_add(tv0, gpr_now())); + + alarm_deadline = gpr_time_add(gpr_now(), tv1); + gpr_mu_lock(&arg.mu); + while (arg.done == 0) { + gpr_cv_wait(&arg.cv, &arg.mu, alarm_deadline); + } + gpr_mu_unlock(&arg.mu); + + followup_deadline = gpr_time_add(gpr_now(), tv1); + fdone = gpr_event_wait(&arg.fcb_arg, followup_deadline); + + if (arg.counter != 1) { + gpr_log(GPR_ERROR, "Alarm callback not called"); + GPR_ASSERT(0); + } else if (arg.done_success_ctr != 1) { + gpr_log(GPR_ERROR, "Alarm done callback not called with success"); + GPR_ASSERT(0); + } else if (arg.done_cancel_ctr != 0) { + gpr_log(GPR_ERROR, "Alarm done callback called with cancel"); + GPR_ASSERT(0); + } else if (arg.status == GRPC_CALLBACK_DO_NOT_USE) { + gpr_log(GPR_ERROR, "Alarm callback without status"); + GPR_ASSERT(0); + } else { + gpr_log(GPR_INFO, "Alarm callback called successfully"); + } + + if (fdone != (void *)&arg.fcb_arg) { + gpr_log(GPR_ERROR, "Followup callback #1 not invoked properly %p %p", fdone, + &arg.fcb_arg); + GPR_ASSERT(0); + } + gpr_cv_destroy(&arg.cv); + gpr_mu_destroy(&arg.mu); + + arg2.counter = 0; + arg2.status = GRPC_CALLBACK_DO_NOT_USE; + arg2.done_success_ctr = 0; + arg2.done_cancel_ctr = 0; + arg2.done = 0; + gpr_mu_init(&arg2.mu); + gpr_cv_init(&arg2.cv); + gpr_event_init(&arg2.fcb_arg); + + grpc_alarm_init(&alarm_to_cancel, alarm_cb, &arg2); + grpc_alarm_add(&alarm_to_cancel, gpr_time_add(tv2, gpr_now())); + grpc_alarm_cancel(&alarm_to_cancel); + + alarm_deadline = gpr_time_add(gpr_now(), tv1); + gpr_mu_lock(&arg2.mu); + while (arg2.done == 0) { + gpr_cv_wait(&arg2.cv, &arg2.mu, alarm_deadline); + } + gpr_mu_unlock(&arg2.mu); + + followup_deadline = gpr_time_add(gpr_now(), tv1); + fdone = gpr_event_wait(&arg2.fcb_arg, followup_deadline); + + if (arg2.counter != arg2.done_success_ctr) { + gpr_log(GPR_ERROR, "Alarm callback called but didn't lead to done success"); + GPR_ASSERT(0); + } else if (arg2.done_success_ctr && arg2.done_cancel_ctr) { + gpr_log(GPR_ERROR, "Alarm done callback called with success and cancel"); + GPR_ASSERT(0); + } else if (arg2.done_cancel_ctr + arg2.done_success_ctr != 1) { + gpr_log(GPR_ERROR, "Alarm done callback called incorrect number of times"); + GPR_ASSERT(0); + } else if (arg2.status == GRPC_CALLBACK_DO_NOT_USE) { + gpr_log(GPR_ERROR, "Alarm callback without status"); + GPR_ASSERT(0); + } else if (arg2.done_success_ctr) { + gpr_log(GPR_INFO, "Alarm callback executed before cancel"); + gpr_log(GPR_INFO, "Current value of triggered is %d\n", + (int)alarm_to_cancel.triggered); + } else if (arg2.done_cancel_ctr) { + gpr_log(GPR_INFO, "Alarm callback canceled"); + gpr_log(GPR_INFO, "Current value of triggered is %d\n", + (int)alarm_to_cancel.triggered); + } else { + gpr_log(GPR_ERROR, "Alarm cancel test should not be here"); + GPR_ASSERT(0); + } + + if (fdone != (void *)&arg2.fcb_arg) { + gpr_log(GPR_ERROR, "Followup callback #2 not invoked properly %p %p", fdone, + &arg2.fcb_arg); + GPR_ASSERT(0); + } + gpr_cv_destroy(&arg2.cv); + gpr_mu_destroy(&arg2.mu); + + grpc_iomgr_shutdown(); +} + +int main(int argc, char **argv) { + grpc_test_init(argc, argv); + test_grpc_alarm(); + return 0; +} diff --git a/test/core/eventmanager/em_test.c b/test/core/iomgr/fd_posix_test.c index 274edc3231..4d4461ec6c 100644 --- a/test/core/eventmanager/em_test.c +++ b/test/core/iomgr/fd_posix_test.c @@ -32,7 +32,7 @@ */ /* Test gRPC event manager with a simple TCP upload server and client. */ -#include "src/core/eventmanager/em.h" +#include "src/core/iomgr/iomgr_libevent.h" #include <ctype.h> #include <errno.h> @@ -92,8 +92,7 @@ void no_op_cb(void *arg, enum grpc_em_cb_status status) {} /* An upload server. */ typedef struct { - grpc_em em; /* event manger used by the sever */ - grpc_em_fd em_fd; /* listening fd */ + grpc_fd *em_fd; /* listening fd */ ssize_t read_bytes_total; /* total number of received bytes */ gpr_mu mu; /* protect done and done_cv */ gpr_cv done_cv; /* signaled when a server finishes serving */ @@ -101,7 +100,6 @@ typedef struct { } server; static void server_init(server *sv) { - GPR_ASSERT(grpc_em_init(&sv->em) == GRPC_EM_OK); sv->read_bytes_total = 0; gpr_mu_init(&sv->mu); gpr_cv_init(&sv->done_cv); @@ -112,7 +110,7 @@ static void server_init(server *sv) { Created when a new upload request arrives in the server. */ typedef struct { server *sv; /* not owned by a single session */ - grpc_em_fd em_fd; /* fd to read upload bytes */ + grpc_fd *em_fd; /* fd to read upload bytes */ char read_buf[BUF_SIZE]; /* buffer to store upload bytes */ } session; @@ -122,17 +120,17 @@ static void session_shutdown_cb(void *arg, /*session*/ enum grpc_em_cb_status status) { session *se = arg; server *sv = se->sv; - grpc_em_fd_destroy(&se->em_fd); + grpc_fd_destroy(se->em_fd); gpr_free(se); /* Start to shutdown listen fd. */ - grpc_em_fd_shutdown(&sv->em_fd); + grpc_fd_shutdown(sv->em_fd); } /* Called when data become readable in a session. */ static void session_read_cb(void *arg, /*session*/ enum grpc_em_cb_status status) { session *se = arg; - int fd = grpc_em_fd_get(&se->em_fd); + int fd = grpc_fd_get(se->em_fd); ssize_t read_once = 0; ssize_t read_total = 0; @@ -153,8 +151,8 @@ static void session_read_cb(void *arg, /*session*/ It is possible to read nothing due to spurious edge event or data has been drained, In such a case, read() returns -1 and set errno to EAGAIN. */ if (read_once == 0) { - grpc_em_fd_shutdown(&se->em_fd); - grpc_em_fd_notify_on_read(&se->em_fd, session_read_cb, se, gpr_inf_future); + grpc_fd_shutdown(se->em_fd); + grpc_fd_notify_on_read(se->em_fd, session_read_cb, se, gpr_inf_future); } else if (read_once == -1) { if (errno == EAGAIN) { /* An edge triggered event is cached in the kernel until next poll. @@ -165,8 +163,8 @@ static void session_read_cb(void *arg, /*session*/ TODO(chenw): in multi-threaded version, callback and polling can be run in different threads. polling may catch a persist read edge event before notify_on_read is called. */ - GPR_ASSERT(grpc_em_fd_notify_on_read(&se->em_fd, session_read_cb, se, - gpr_inf_future) == GRPC_EM_OK); + GPR_ASSERT(grpc_fd_notify_on_read(se->em_fd, session_read_cb, se, + gpr_inf_future)); } else { gpr_log(GPR_ERROR, "Unhandled read error %s", strerror(errno)); GPR_ASSERT(0); @@ -180,7 +178,7 @@ static void listen_shutdown_cb(void *arg /*server*/, enum grpc_em_cb_status status) { server *sv = arg; - grpc_em_fd_destroy(&sv->em_fd); + grpc_fd_destroy(sv->em_fd); gpr_mu_lock(&sv->mu); sv->done = 1; @@ -197,26 +195,26 @@ static void listen_cb(void *arg, /*=sv_arg*/ session *se; struct sockaddr_storage ss; socklen_t slen = sizeof(ss); - struct grpc_em_fd *listen_em_fd = &sv->em_fd; + struct grpc_fd *listen_em_fd = sv->em_fd; if (status == GRPC_CALLBACK_CANCELLED) { listen_shutdown_cb(arg, GRPC_CALLBACK_SUCCESS); return; } - fd = accept(grpc_em_fd_get(listen_em_fd), (struct sockaddr *)&ss, &slen); + fd = accept(grpc_fd_get(listen_em_fd), (struct sockaddr *)&ss, &slen); GPR_ASSERT(fd >= 0); GPR_ASSERT(fd < FD_SETSIZE); flags = fcntl(fd, F_GETFL, 0); fcntl(fd, F_SETFL, flags | O_NONBLOCK); se = gpr_malloc(sizeof(*se)); se->sv = sv; - GPR_ASSERT(grpc_em_fd_init(&se->em_fd, &sv->em, fd) == GRPC_EM_OK); - GPR_ASSERT(grpc_em_fd_notify_on_read(&se->em_fd, session_read_cb, se, - gpr_inf_future) == GRPC_EM_OK); + se->em_fd = grpc_fd_create(fd); + GPR_ASSERT( + grpc_fd_notify_on_read(se->em_fd, session_read_cb, se, gpr_inf_future)); - GPR_ASSERT(grpc_em_fd_notify_on_read(listen_em_fd, listen_cb, sv, - gpr_inf_future) == GRPC_EM_OK); + GPR_ASSERT( + grpc_fd_notify_on_read(listen_em_fd, listen_cb, sv, gpr_inf_future)); } /* Max number of connections pending to be accepted by listen(). */ @@ -235,14 +233,13 @@ static int server_start(server *sv) { create_test_socket(port, &fd, &sin); addr_len = sizeof(sin); GPR_ASSERT(bind(fd, (struct sockaddr *)&sin, addr_len) == 0); - GPR_ASSERT(getsockname(fd, (struct sockaddr *)&sin, &addr_len) == GRPC_EM_OK); + GPR_ASSERT(getsockname(fd, (struct sockaddr *)&sin, &addr_len) == 0); port = ntohs(sin.sin_port); GPR_ASSERT(listen(fd, MAX_NUM_FD) == 0); - GPR_ASSERT(grpc_em_fd_init(&sv->em_fd, &sv->em, fd) == GRPC_EM_OK); + sv->em_fd = grpc_fd_create(fd); /* Register to be interested in reading from listen_fd. */ - GPR_ASSERT(grpc_em_fd_notify_on_read(&sv->em_fd, listen_cb, sv, - gpr_inf_future) == GRPC_EM_OK); + GPR_ASSERT(grpc_fd_notify_on_read(sv->em_fd, listen_cb, sv, gpr_inf_future)); return port; } @@ -255,8 +252,6 @@ static void server_wait_and_shutdown(server *sv) { gpr_mu_destroy(&sv->mu); gpr_cv_destroy(&sv->done_cv); - - GPR_ASSERT(grpc_em_destroy(&sv->em) == GRPC_EM_OK); } /* ===An upload client to test notify_on_write=== */ @@ -268,8 +263,7 @@ static void server_wait_and_shutdown(server *sv) { /* An upload client. */ typedef struct { - grpc_em em; - grpc_em_fd em_fd; + grpc_fd *em_fd; char write_buf[CLIENT_WRITE_BUF_SIZE]; ssize_t write_bytes_total; /* Number of times that the client fills up the write buffer and calls @@ -282,7 +276,6 @@ typedef struct { } client; static void client_init(client *cl) { - GPR_ASSERT(grpc_em_init(&cl->em) == GRPC_EM_OK); memset(cl->write_buf, 0, sizeof(cl->write_buf)); cl->write_bytes_total = 0; cl->client_write_cnt = 0; @@ -295,7 +288,7 @@ static void client_init(client *cl) { static void client_session_shutdown_cb(void *arg /*client*/, enum grpc_em_cb_status status) { client *cl = arg; - grpc_em_fd_destroy(&cl->em_fd); + grpc_fd_destroy(cl->em_fd); gpr_mu_lock(&cl->mu); cl->done = 1; gpr_cv_signal(&cl->done_cv); @@ -306,7 +299,7 @@ static void client_session_shutdown_cb(void *arg /*client*/, static void client_session_write(void *arg, /*client*/ enum grpc_em_cb_status status) { client *cl = arg; - int fd = grpc_em_fd_get(&cl->em_fd); + int fd = grpc_fd_get(cl->em_fd); ssize_t write_once = 0; if (status == GRPC_CALLBACK_CANCELLED) { @@ -322,14 +315,14 @@ static void client_session_write(void *arg, /*client*/ if (errno == EAGAIN) { gpr_mu_lock(&cl->mu); if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) { - GPR_ASSERT(grpc_em_fd_notify_on_write(&cl->em_fd, client_session_write, - cl, gpr_inf_future) == GRPC_EM_OK); + GPR_ASSERT(grpc_fd_notify_on_write(cl->em_fd, client_session_write, cl, + gpr_inf_future)); cl->client_write_cnt++; } else { close(fd); - grpc_em_fd_shutdown(&cl->em_fd); - grpc_em_fd_notify_on_write(&cl->em_fd, client_session_write, cl, - gpr_inf_future); + grpc_fd_shutdown(cl->em_fd); + grpc_fd_notify_on_write(cl->em_fd, client_session_write, cl, + gpr_inf_future); } gpr_mu_unlock(&cl->mu); } else { @@ -349,7 +342,7 @@ static void client_start(client *cl, int port) { GPR_ASSERT(0); } - GPR_ASSERT(grpc_em_fd_init(&cl->em_fd, &cl->em, fd) == GRPC_EM_OK); + cl->em_fd = grpc_fd_create(fd); client_session_write(cl, GRPC_CALLBACK_SUCCESS); } @@ -362,14 +355,12 @@ static void client_wait_and_shutdown(client *cl) { gpr_mu_destroy(&cl->mu); gpr_cv_destroy(&cl->done_cv); - - GPR_ASSERT(grpc_em_destroy(&cl->em) == GRPC_EM_OK); } -/* Test grpc_em_fd. Start an upload server and client, upload a stream of +/* Test grpc_fd. Start an upload server and client, upload a stream of bytes from the client to the server, and verify that the total number of sent bytes is equal to the total number of received bytes. */ -static void test_grpc_em_fd() { +static void test_grpc_fd() { server sv; client cl; int port; @@ -425,9 +416,8 @@ static void second_read_callback(void *arg /* fd_change_data */, Note that we have two different but almost identical callbacks above -- the point is to have two different function pointers and two different data pointers and make sure that changing both really works. */ -static void test_grpc_em_fd_change() { - grpc_em em; - grpc_em_fd em_fd; +static void test_grpc_fd_change() { + grpc_fd *em_fd; fd_change_data a, b; int flags; int sv[2]; @@ -443,11 +433,10 @@ static void test_grpc_em_fd_change() { flags = fcntl(sv[1], F_GETFL, 0); GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); - grpc_em_init(&em); - grpc_em_fd_init(&em_fd, &em, sv[0]); + em_fd = grpc_fd_create(sv[0]); /* Register the first callback, then make its FD readable */ - grpc_em_fd_notify_on_read(&em_fd, first_read_callback, &a, gpr_inf_future); + grpc_fd_notify_on_read(em_fd, first_read_callback, &a, gpr_inf_future); data = 0; result = write(sv[1], &data, 1); GPR_ASSERT(result == 1); @@ -466,7 +455,7 @@ static void test_grpc_em_fd_change() { /* Now register a second callback with distinct change data, and do the same thing again. */ - grpc_em_fd_notify_on_read(&em_fd, second_read_callback, &b, gpr_inf_future); + grpc_fd_notify_on_read(em_fd, second_read_callback, &b, gpr_inf_future); data = 0; result = write(sv[1], &data, 1); GPR_ASSERT(result == 1); @@ -479,8 +468,7 @@ static void test_grpc_em_fd_change() { GPR_ASSERT(b.cb_that_ran == second_read_callback); gpr_mu_unlock(&b.mu); - grpc_em_fd_destroy(&em_fd); - grpc_em_destroy(&em); + grpc_fd_destroy(em_fd); destroy_change_data(&a); destroy_change_data(&b); close(sv[0]); @@ -495,9 +483,8 @@ void timeout_callback(void *arg, enum grpc_em_cb_status status) { } } -void test_grpc_em_fd_notify_timeout() { - grpc_em em; - grpc_em_fd em_fd; +void test_grpc_fd_notify_timeout() { + grpc_fd *em_fd; gpr_event ev; int flags; int sv[2]; @@ -512,206 +499,26 @@ void test_grpc_em_fd_notify_timeout() { flags = fcntl(sv[1], F_GETFL, 0); GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); - grpc_em_init(&em); - grpc_em_fd_init(&em_fd, &em, sv[0]); + em_fd = grpc_fd_create(sv[0]); timeout = gpr_time_from_micros(1000000); deadline = gpr_time_add(gpr_now(), timeout); - grpc_em_fd_notify_on_read(&em_fd, timeout_callback, &ev, deadline); + grpc_fd_notify_on_read(em_fd, timeout_callback, &ev, deadline); GPR_ASSERT(gpr_event_wait(&ev, gpr_time_add(deadline, timeout))); GPR_ASSERT(gpr_event_get(&ev) == (void *)1); - grpc_em_fd_destroy(&em_fd); - grpc_em_destroy(&em); + grpc_fd_destroy(em_fd); close(sv[1]); } -typedef struct { - grpc_em *em; - gpr_cv cv; - gpr_mu mu; - int counter; - int done_success_ctr; - int done_cancel_ctr; - int done; - gpr_event fcb_arg; - grpc_em_cb_status status; -} alarm_arg; - -static void followup_cb(void *arg, grpc_em_cb_status status) { - gpr_event_set((gpr_event *)arg, arg); -} - -/* Called when an alarm expires. */ -static void alarm_cb(void *arg /* alarm_arg */, grpc_em_cb_status status) { - alarm_arg *a = arg; - gpr_mu_lock(&a->mu); - if (status == GRPC_CALLBACK_SUCCESS) { - a->counter++; - a->done_success_ctr++; - } else if (status == GRPC_CALLBACK_CANCELLED) { - a->done_cancel_ctr++; - } else { - GPR_ASSERT(0); - } - a->done = 1; - a->status = status; - gpr_cv_signal(&a->cv); - gpr_mu_unlock(&a->mu); - grpc_em_add_callback(a->em, followup_cb, &a->fcb_arg); -} - -/* Test grpc_em_alarm add and cancel. */ -static void test_grpc_em_alarm() { - struct grpc_em em; - struct grpc_em_alarm alarm; - struct grpc_em_alarm alarm_to_cancel; - gpr_timespec tv0 = {0, 1}; - /* Timeout on the alarm cond. var, so make big enough to absorb time - deviations. Otherwise, operations after wait will not be properly ordered - */ - gpr_timespec tv1 = gpr_time_from_micros(200000); - gpr_timespec tv2 = {0, 1}; - gpr_timespec alarm_deadline; - gpr_timespec followup_deadline; - - alarm_arg arg; - alarm_arg arg2; - void *fdone; - - GPR_ASSERT(grpc_em_init(&em) == GRPC_EM_OK); - - arg.em = &em; - arg.counter = 0; - arg.status = GRPC_CALLBACK_DO_NOT_USE; - arg.done_success_ctr = 0; - arg.done_cancel_ctr = 0; - arg.done = 0; - gpr_mu_init(&arg.mu); - gpr_cv_init(&arg.cv); - gpr_event_init(&arg.fcb_arg); - - GPR_ASSERT(grpc_em_alarm_init(&alarm, &em, alarm_cb, &arg) == GRPC_EM_OK); - GPR_ASSERT(grpc_em_alarm_add(&alarm, gpr_time_add(tv0, gpr_now())) == - GRPC_EM_OK); - - alarm_deadline = gpr_time_add(gpr_now(), tv1); - gpr_mu_lock(&arg.mu); - while (arg.done == 0) { - gpr_cv_wait(&arg.cv, &arg.mu, alarm_deadline); - } - gpr_mu_unlock(&arg.mu); - - followup_deadline = gpr_time_add(gpr_now(), tv1); - fdone = gpr_event_wait(&arg.fcb_arg, followup_deadline); - - if (arg.counter != 1) { - gpr_log(GPR_ERROR, "Alarm callback not called"); - GPR_ASSERT(0); - } else if (arg.done_success_ctr != 1) { - gpr_log(GPR_ERROR, "Alarm done callback not called with success"); - GPR_ASSERT(0); - } else if (arg.done_cancel_ctr != 0) { - gpr_log(GPR_ERROR, "Alarm done callback called with cancel"); - GPR_ASSERT(0); - } else if (arg.status == GRPC_CALLBACK_DO_NOT_USE) { - gpr_log(GPR_ERROR, "Alarm callback without status"); - GPR_ASSERT(0); - } else { - gpr_log(GPR_INFO, "Alarm callback called successfully"); - } - - if (fdone != (void *)&arg.fcb_arg) { - gpr_log(GPR_ERROR, "Followup callback #1 not invoked properly %p %p", fdone, - &arg.fcb_arg); - GPR_ASSERT(0); - } - gpr_cv_destroy(&arg.cv); - gpr_mu_destroy(&arg.mu); - - arg2.em = &em; - arg2.counter = 0; - arg2.status = GRPC_CALLBACK_DO_NOT_USE; - arg2.done_success_ctr = 0; - arg2.done_cancel_ctr = 0; - arg2.done = 0; - gpr_mu_init(&arg2.mu); - gpr_cv_init(&arg2.cv); - gpr_event_init(&arg2.fcb_arg); - - GPR_ASSERT(grpc_em_alarm_init(&alarm_to_cancel, &em, alarm_cb, &arg2) == - GRPC_EM_OK); - GPR_ASSERT(grpc_em_alarm_add(&alarm_to_cancel, - gpr_time_add(tv2, gpr_now())) == GRPC_EM_OK); - switch (grpc_em_alarm_cancel(&alarm_to_cancel)) { - case GRPC_EM_OK: - gpr_log(GPR_INFO, "Alarm cancel succeeded"); - break; - case GRPC_EM_ERROR: - gpr_log(GPR_ERROR, "Alarm cancel failed"); - GPR_ASSERT(0); - break; - case GRPC_EM_INVALID_ARGUMENTS: - gpr_log(GPR_ERROR, "Alarm cancel failed with bad response code"); - gpr_log(GPR_ERROR, "Current value of triggered is %d\n", - (int)alarm_to_cancel.triggered); - GPR_ASSERT(0); - break; - } - - alarm_deadline = gpr_time_add(gpr_now(), tv1); - gpr_mu_lock(&arg2.mu); - while (arg2.done == 0) { - gpr_cv_wait(&arg2.cv, &arg2.mu, alarm_deadline); - } - gpr_mu_unlock(&arg2.mu); - - followup_deadline = gpr_time_add(gpr_now(), tv1); - fdone = gpr_event_wait(&arg2.fcb_arg, followup_deadline); - - if (arg2.counter != arg2.done_success_ctr) { - gpr_log(GPR_ERROR, "Alarm callback called but didn't lead to done success"); - GPR_ASSERT(0); - } else if (arg2.done_success_ctr && arg2.done_cancel_ctr) { - gpr_log(GPR_ERROR, "Alarm done callback called with success and cancel"); - GPR_ASSERT(0); - } else if (arg2.done_cancel_ctr + arg2.done_success_ctr != 1) { - gpr_log(GPR_ERROR, "Alarm done callback called incorrect number of times"); - GPR_ASSERT(0); - } else if (arg2.status == GRPC_CALLBACK_DO_NOT_USE) { - gpr_log(GPR_ERROR, "Alarm callback without status"); - GPR_ASSERT(0); - } else if (arg2.done_success_ctr) { - gpr_log(GPR_INFO, "Alarm callback executed before cancel"); - gpr_log(GPR_INFO, "Current value of triggered is %d\n", - (int)alarm_to_cancel.triggered); - } else if (arg2.done_cancel_ctr) { - gpr_log(GPR_INFO, "Alarm callback canceled"); - gpr_log(GPR_INFO, "Current value of triggered is %d\n", - (int)alarm_to_cancel.triggered); - } else { - gpr_log(GPR_ERROR, "Alarm cancel test should not be here"); - GPR_ASSERT(0); - } - - if (fdone != (void *)&arg2.fcb_arg) { - gpr_log(GPR_ERROR, "Followup callback #2 not invoked properly %p %p", fdone, - &arg2.fcb_arg); - GPR_ASSERT(0); - } - gpr_cv_destroy(&arg2.cv); - gpr_mu_destroy(&arg2.mu); - - GPR_ASSERT(grpc_em_destroy(&em) == GRPC_EM_OK); -} - int main(int argc, char **argv) { grpc_test_init(argc, argv); - test_grpc_em_alarm(); - test_grpc_em_fd(); - test_grpc_em_fd_change(); - test_grpc_em_fd_notify_timeout(); + grpc_iomgr_init(); + test_grpc_fd(); + test_grpc_fd_change(); + test_grpc_fd_notify_timeout(); + grpc_iomgr_shutdown(); return 0; } diff --git a/test/core/endpoint/resolve_address_test.c b/test/core/iomgr/resolve_address_test.c index 1e208d3699..99e3119581 100644 --- a/test/core/endpoint/resolve_address_test.c +++ b/test/core/iomgr/resolve_address_test.c @@ -31,7 +31,7 @@ * */ -#include "src/core/endpoint/resolve_address.h" +#include "src/core/iomgr/resolve_address.h" #include <grpc/support/log.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> diff --git a/test/core/endpoint/socket_utils_test.c b/test/core/iomgr/sockaddr_utils_test.c index ef6ac32c22..8cd9fb660f 100644 --- a/test/core/endpoint/socket_utils_test.c +++ b/test/core/iomgr/sockaddr_utils_test.c @@ -31,7 +31,7 @@ * */ -#include "src/core/endpoint/socket_utils.h" +#include "src/core/iomgr/sockaddr_utils.h" #include <errno.h> #include <netinet/in.h> diff --git a/test/core/iomgr/tcp_client_posix_test.c b/test/core/iomgr/tcp_client_posix_test.c new file mode 100644 index 0000000000..cb1cd0bc16 --- /dev/null +++ b/test/core/iomgr/tcp_client_posix_test.c @@ -0,0 +1,176 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "src/core/iomgr/tcp_client.h" + +#include <errno.h> +#include <netinet/in.h> +#include <string.h> +#include <sys/socket.h> +#include <unistd.h> + +#include "src/core/iomgr/iomgr.h" +#include <grpc/support/log.h> +#include <grpc/support/time.h> + +static gpr_timespec test_deadline() { + return gpr_time_add(gpr_now(), gpr_time_from_micros(1000000)); +} + +static void must_succeed(void *arg, grpc_endpoint *tcp) { + GPR_ASSERT(tcp); + grpc_endpoint_shutdown(tcp); + grpc_endpoint_destroy(tcp); + gpr_event_set(arg, (void *)1); +} + +static void must_fail(void *arg, grpc_endpoint *tcp) { + GPR_ASSERT(!tcp); + gpr_event_set(arg, (void *)1); +} + +void test_succeeds() { + struct sockaddr_in addr; + socklen_t addr_len = sizeof(addr); + int svr_fd; + int r; + gpr_event ev; + + gpr_event_init(&ev); + + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + + /* create a dummy server */ + svr_fd = socket(AF_INET, SOCK_STREAM, 0); + GPR_ASSERT(svr_fd >= 0); + GPR_ASSERT(0 == bind(svr_fd, (struct sockaddr *)&addr, addr_len)); + GPR_ASSERT(0 == listen(svr_fd, 1)); + + /* connect to it */ + GPR_ASSERT(getsockname(svr_fd, (struct sockaddr *)&addr, &addr_len) == 0); + grpc_tcp_client_connect(must_succeed, &ev, (struct sockaddr *)&addr, addr_len, + gpr_inf_future); + + /* await the connection */ + do { + addr_len = sizeof(addr); + r = accept(svr_fd, (struct sockaddr *)&addr, &addr_len); + } while (r == -1 && errno == EINTR); + GPR_ASSERT(r >= 0); + close(r); + + /* wait for the connection callback to finish */ + GPR_ASSERT(gpr_event_wait(&ev, test_deadline())); +} + +void test_fails() { + struct sockaddr_in addr; + socklen_t addr_len = sizeof(addr); + gpr_event ev; + + gpr_event_init(&ev); + + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + + /* connect to a broken address */ + grpc_tcp_client_connect(must_fail, &ev, (struct sockaddr *)&addr, addr_len, + gpr_inf_future); + + /* wait for the connection callback to finish */ + GPR_ASSERT(gpr_event_wait(&ev, test_deadline())); +} + +void test_times_out() { + struct sockaddr_in addr; + socklen_t addr_len = sizeof(addr); + int svr_fd; +#define NUM_CLIENT_CONNECTS 10 + int client_fd[NUM_CLIENT_CONNECTS]; + int i; + int r; + gpr_event ev; + gpr_timespec connect_deadline; + + gpr_event_init(&ev); + + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + + /* create a dummy server */ + svr_fd = socket(AF_INET, SOCK_STREAM, 0); + GPR_ASSERT(svr_fd >= 0); + GPR_ASSERT(0 == bind(svr_fd, (struct sockaddr *)&addr, addr_len)); + GPR_ASSERT(0 == listen(svr_fd, 1)); + /* Get its address */ + GPR_ASSERT(getsockname(svr_fd, (struct sockaddr *)&addr, &addr_len) == 0); + + /* tie up the listen buffer, which is somewhat arbitrarily sized. */ + for (i = 0; i < NUM_CLIENT_CONNECTS; ++i) { + client_fd[i] = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0); + do { + r = connect(client_fd[i], (struct sockaddr *)&addr, addr_len); + } while (r == -1 && errno == EINTR); + GPR_ASSERT(r < 0); + GPR_ASSERT(errno == EWOULDBLOCK || errno == EINPROGRESS); + } + + /* connect to dummy server address */ + + connect_deadline = gpr_time_add(gpr_now(), gpr_time_from_micros(1000000)); + + grpc_tcp_client_connect(must_fail, &ev, (struct sockaddr *)&addr, addr_len, + connect_deadline); + /* Make sure the event doesn't trigger early */ + GPR_ASSERT(!gpr_event_wait( + &ev, gpr_time_add(gpr_now(), gpr_time_from_micros(500000)))); + /* Now wait until it should have triggered */ + sleep(1); + + /* wait for the connection callback to finish */ + GPR_ASSERT(gpr_event_wait(&ev, test_deadline())); + close(svr_fd); + for (i = 0; i < NUM_CLIENT_CONNECTS; ++i) { + close(client_fd[i]); + } +} + +int main(void) { + grpc_iomgr_init(); + test_succeeds(); + test_fails(); + test_times_out(); + grpc_iomgr_shutdown(); + return 0; +} diff --git a/test/core/endpoint/tcp_test.c b/test/core/iomgr/tcp_posix_test.c index c703f92f68..52856b6b8a 100644 --- a/test/core/endpoint/tcp_test.c +++ b/test/core/iomgr/tcp_posix_test.c @@ -31,7 +31,7 @@ * */ -#include "src/core/endpoint/tcp.h" +#include "src/core/iomgr/tcp_posix.h" #include <errno.h> #include <fcntl.h> @@ -41,7 +41,6 @@ #include <sys/socket.h> #include <unistd.h> -#include "src/core/eventmanager/em.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/time.h> @@ -65,8 +64,6 @@ */ -grpc_em g_em; - static void create_sockets(int sv[2]) { int flags; GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0); @@ -165,7 +162,6 @@ static void read_cb(void *user_data, gpr_slice *slices, size_t nslices, /* Write to a socket, then read from it using the grpc_tcp API. */ static void read_test(ssize_t num_bytes, ssize_t slice_size) { int sv[2]; - grpc_em em; grpc_endpoint *ep; struct read_socket_state state; ssize_t written_bytes; @@ -176,9 +172,8 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) { slice_size); create_sockets(sv); - grpc_em_init(&em); - ep = grpc_tcp_create_dbg(sv[1], &em, slice_size); + ep = grpc_tcp_create(grpc_fd_create(sv[1]), slice_size); written_bytes = fill_socket_partial(sv[0], num_bytes); gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes); @@ -202,7 +197,6 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) { grpc_endpoint_destroy(ep); - grpc_em_destroy(&em); gpr_mu_destroy(&state.mu); gpr_cv_destroy(&state.cv); } @@ -211,7 +205,6 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) { API. */ static void large_read_test(ssize_t slice_size) { int sv[2]; - grpc_em em; grpc_endpoint *ep; struct read_socket_state state; ssize_t written_bytes; @@ -221,9 +214,8 @@ static void large_read_test(ssize_t slice_size) { gpr_log(GPR_INFO, "Start large read test, slice size %d", slice_size); create_sockets(sv); - grpc_em_init(&em); - ep = grpc_tcp_create_dbg(sv[1], &em, slice_size); + ep = grpc_tcp_create(grpc_fd_create(sv[1]), slice_size); written_bytes = fill_socket(sv[0]); gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes); @@ -247,7 +239,6 @@ static void large_read_test(ssize_t slice_size) { grpc_endpoint_destroy(ep); - grpc_em_destroy(&em); gpr_mu_destroy(&state.mu); gpr_cv_destroy(&state.cv); } @@ -349,7 +340,6 @@ static ssize_t drain_socket(int fd) { socket in parallel with the read. */ static void write_test(ssize_t num_bytes, ssize_t slice_size) { int sv[2]; - grpc_em em; grpc_endpoint *ep; struct write_socket_state state; ssize_t read_bytes; @@ -363,9 +353,8 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) { slice_size); create_sockets(sv); - grpc_em_init(&em); - ep = grpc_tcp_create(sv[1], &em); + ep = grpc_tcp_create(grpc_fd_create(sv[1]), GRPC_TCP_DEFAULT_READ_SLICE_SIZE); gpr_mu_init(&state.mu); gpr_cv_init(&state.cv); @@ -392,7 +381,6 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) { } grpc_endpoint_destroy(ep); - grpc_em_destroy(&em); gpr_mu_destroy(&state.mu); gpr_cv_destroy(&state.cv); gpr_free(slices); @@ -410,7 +398,6 @@ static void read_done_for_write_error(void *ud, gpr_slice *slices, socket in parallel with the read. */ static void write_error_test(ssize_t num_bytes, ssize_t slice_size) { int sv[2]; - grpc_em em; grpc_endpoint *ep; struct write_socket_state state; size_t num_blocks; @@ -423,9 +410,8 @@ static void write_error_test(ssize_t num_bytes, ssize_t slice_size) { num_bytes, slice_size); create_sockets(sv); - grpc_em_init(&em); - ep = grpc_tcp_create(sv[1], &em); + ep = grpc_tcp_create(grpc_fd_create(sv[1]), GRPC_TCP_DEFAULT_READ_SLICE_SIZE); close(sv[0]); gpr_mu_init(&state.mu); @@ -456,7 +442,6 @@ static void write_error_test(ssize_t num_bytes, ssize_t slice_size) { } grpc_endpoint_destroy(ep); - grpc_em_destroy(&em); gpr_mu_destroy(&state.mu); gpr_cv_destroy(&state.cv); free(slices); @@ -487,7 +472,7 @@ void run_tests() { } } -static void clean_up() { grpc_em_destroy(&g_em); } +static void clean_up() {} static grpc_endpoint_test_fixture create_fixture_tcp_socketpair( size_t slice_size) { @@ -495,9 +480,8 @@ static grpc_endpoint_test_fixture create_fixture_tcp_socketpair( grpc_endpoint_test_fixture f; create_sockets(sv); - grpc_em_init(&g_em); - f.client_ep = grpc_tcp_create_dbg(sv[0], &g_em, slice_size); - f.server_ep = grpc_tcp_create(sv[1], &g_em); + f.client_ep = grpc_tcp_create(grpc_fd_create(sv[0]), slice_size); + f.server_ep = grpc_tcp_create(grpc_fd_create(sv[1]), slice_size); return f; } @@ -508,10 +492,12 @@ static grpc_endpoint_test_config configs[] = { int main(int argc, char **argv) { grpc_test_init(argc, argv); + grpc_iomgr_init(); /* disable SIGPIPE */ signal(SIGPIPE, SIG_IGN); run_tests(); grpc_endpoint_tests(configs[0]); + grpc_iomgr_shutdown(); return 0; } diff --git a/test/core/endpoint/tcp_server_test.c b/test/core/iomgr/tcp_server_posix_test.c index 62089152d0..cb77a88062 100644 --- a/test/core/endpoint/tcp_server_test.c +++ b/test/core/iomgr/tcp_server_posix_test.c @@ -31,8 +31,8 @@ * */ -#include "src/core/endpoint/tcp_server.h" -#include "src/core/eventmanager/em.h" +#include "src/core/iomgr/tcp_server.h" +#include "src/core/iomgr/iomgr.h" #include <grpc/support/log.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> @@ -44,8 +44,6 @@ #define LOG_TEST() gpr_log(GPR_INFO, "%s", __FUNCTION__) -static grpc_em em; - static gpr_mu mu; static gpr_cv cv; static int nconnects = 0; @@ -61,12 +59,12 @@ static void on_connect(void *arg, grpc_endpoint *tcp) { } static void test_no_op() { - grpc_tcp_server *s = grpc_tcp_server_create(&em); + grpc_tcp_server *s = grpc_tcp_server_create(); grpc_tcp_server_destroy(s); } static void test_no_op_with_start() { - grpc_tcp_server *s = grpc_tcp_server_create(&em); + grpc_tcp_server *s = grpc_tcp_server_create(); LOG_TEST(); grpc_tcp_server_start(s, on_connect, NULL); grpc_tcp_server_destroy(s); @@ -74,7 +72,7 @@ static void test_no_op_with_start() { static void test_no_op_with_port() { struct sockaddr_in addr; - grpc_tcp_server *s = grpc_tcp_server_create(&em); + grpc_tcp_server *s = grpc_tcp_server_create(); LOG_TEST(); memset(&addr, 0, sizeof(addr)); @@ -87,7 +85,7 @@ static void test_no_op_with_port() { static void test_no_op_with_port_and_start() { struct sockaddr_in addr; - grpc_tcp_server *s = grpc_tcp_server_create(&em); + grpc_tcp_server *s = grpc_tcp_server_create(); LOG_TEST(); memset(&addr, 0, sizeof(addr)); @@ -104,7 +102,7 @@ static void test_connect(int n) { struct sockaddr_storage addr; socklen_t addr_len = sizeof(addr); int svrfd, clifd; - grpc_tcp_server *s = grpc_tcp_server_create(&em); + grpc_tcp_server *s = grpc_tcp_server_create(); int nconnects_before; gpr_timespec deadline; int i; @@ -151,7 +149,7 @@ static void test_connect(int n) { int main(int argc, char **argv) { grpc_test_init(argc, argv); - grpc_em_init(&em); + grpc_iomgr_init(); gpr_mu_init(&mu); gpr_cv_init(&cv); @@ -162,7 +160,7 @@ int main(int argc, char **argv) { test_connect(1); test_connect(10); - grpc_em_destroy(&em); + grpc_iomgr_shutdown(); gpr_mu_destroy(&mu); gpr_cv_destroy(&cv); return 0; diff --git a/test/core/network_benchmarks/low_level_ping_pong.c b/test/core/network_benchmarks/low_level_ping_pong.c index 93c66a9ecb..543fb27ec1 100644 --- a/test/core/network_benchmarks/low_level_ping_pong.c +++ b/test/core/network_benchmarks/low_level_ping_pong.c @@ -49,7 +49,7 @@ #endif #include <sys/socket.h> -#include "src/core/endpoint/socket_utils.h" +#include "src/core/iomgr/socket_utils_posix.h" #include <grpc/support/cmdline.h> #include <grpc/support/histogram.h> #include <grpc/support/log.h> diff --git a/test/core/surface/completion_queue_test.c b/test/core/surface/completion_queue_test.c index 6df159f697..77decdacea 100644 --- a/test/core/surface/completion_queue_test.c +++ b/test/core/surface/completion_queue_test.c @@ -33,12 +33,12 @@ #include "src/core/surface/completion_queue.h" +#include "src/core/iomgr/iomgr.h" #include <grpc/support/alloc.h> #include <grpc/support/log.h> #include <grpc/support/thd.h> #include <grpc/support/time.h> #include <grpc/support/useful.h> -#include "src/core/surface/surface_em.h" #include "test/core/util/test_config.h" #define LOG_TEST() gpr_log(GPR_INFO, "%s", __FUNCTION__) @@ -417,7 +417,7 @@ static void test_threading(int producers, int consumers) { int main(int argc, char **argv) { grpc_test_init(argc, argv); - grpc_surface_em_init(); + grpc_iomgr_init(); test_no_op(); test_wait_empty(); test_cq_end_read(); @@ -430,6 +430,6 @@ int main(int argc, char **argv) { test_threading(1, 10); test_threading(10, 1); test_threading(10, 10); - grpc_surface_em_shutdown(); + grpc_iomgr_shutdown(); return 0; } diff --git a/test/core/transport/chttp2_transport_end2end_test.c b/test/core/transport/chttp2_transport_end2end_test.c index 4a16789fbf..30d2a17440 100644 --- a/test/core/transport/chttp2_transport_end2end_test.c +++ b/test/core/transport/chttp2_transport_end2end_test.c @@ -42,38 +42,23 @@ #include <unistd.h> #include "test/core/util/test_config.h" -#include "src/core/eventmanager/em.h" +#include "src/core/iomgr/iomgr.h" +#include "src/core/iomgr/endpoint_pair.h" #include "src/core/transport/chttp2_transport.h" #include <grpc/support/log.h> -static grpc_em em; - -static void create_sockets(int sv[2]) { - int flags; - GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0); - flags = fcntl(sv[0], F_GETFL, 0); - GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0); - flags = fcntl(sv[1], F_GETFL, 0); - GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); -} - /* Wrapper to create an http2 transport pair */ static int create_http2_transport_for_test( grpc_transport_setup_callback client_setup_transport, void *client_setup_arg, grpc_transport_setup_callback server_setup_transport, void *server_setup_arg, size_t slice_size, grpc_mdctx *mdctx) { - int sv[2]; - grpc_endpoint *svr_ep, *cli_ep; - - create_sockets(sv); - svr_ep = grpc_tcp_create_dbg(sv[1], &em, slice_size); - cli_ep = grpc_tcp_create_dbg(sv[0], &em, slice_size); + grpc_endpoint_pair p = grpc_iomgr_create_endpoint_pair(1); grpc_create_chttp2_transport(client_setup_transport, client_setup_arg, NULL, - cli_ep, NULL, 0, mdctx, 1); + p.client, NULL, 0, mdctx, 1); grpc_create_chttp2_transport(server_setup_transport, server_setup_arg, NULL, - svr_ep, NULL, 0, mdctx, 0); + p.server, NULL, 0, mdctx, 0); return 0; } @@ -126,13 +111,13 @@ int main(int argc, char **argv) { signal(SIGPIPE, SIG_IGN); grpc_test_init(argc, argv); - grpc_em_init(&em); + grpc_iomgr_init(); for (i = 0; i < sizeof(fixture_configs) / sizeof(*fixture_configs); i++) { grpc_transport_end2end_tests(&fixture_configs[i]); } - grpc_em_destroy(&em); + grpc_iomgr_shutdown(); gpr_log(GPR_INFO, "exiting"); return 0; |