aboutsummaryrefslogtreecommitdiffhomepage
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/core/end2end/dualstack_socket_test.c33
-rw-r--r--test/core/end2end/fixtures/chttp2_fake_security.c5
-rw-r--r--test/core/end2end/fixtures/chttp2_fullstack.c2
-rw-r--r--test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c5
-rw-r--r--test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c6
-rw-r--r--test/core/end2end/fixtures/chttp2_socket_pair.c43
-rw-r--r--test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c43
-rw-r--r--test/core/endpoint/secure_endpoint_test.c36
-rw-r--r--test/core/eventmanager/em_pipe_test.c198
-rw-r--r--test/core/httpcli/httpcli_test.c13
-rw-r--r--test/core/iomgr/alarm_test.c219
-rw-r--r--test/core/iomgr/fd_posix_test.c (renamed from test/core/eventmanager/em_test.c)289
-rw-r--r--test/core/iomgr/resolve_address_test.c (renamed from test/core/endpoint/resolve_address_test.c)2
-rw-r--r--test/core/iomgr/sockaddr_utils_test.c (renamed from test/core/endpoint/socket_utils_test.c)2
-rw-r--r--test/core/iomgr/tcp_client_posix_test.c176
-rw-r--r--test/core/iomgr/tcp_posix_test.c (renamed from test/core/endpoint/tcp_test.c)34
-rw-r--r--test/core/iomgr/tcp_server_posix_test.c (renamed from test/core/endpoint/tcp_server_test.c)20
-rw-r--r--test/core/network_benchmarks/low_level_ping_pong.c2
-rw-r--r--test/core/surface/completion_queue_test.c6
-rw-r--r--test/core/transport/chttp2_transport_end2end_test.c29
20 files changed, 526 insertions, 637 deletions
diff --git a/test/core/end2end/dualstack_socket_test.c b/test/core/end2end/dualstack_socket_test.c
index 4813672104..e127c61b25 100644
--- a/test/core/end2end/dualstack_socket_test.c
+++ b/test/core/end2end/dualstack_socket_test.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/endpoint/socket_utils.h"
+#include "src/core/iomgr/socket_utils_posix.h"
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
@@ -165,16 +165,10 @@ void test_connect(const char *server_host, const char *client_host, int port,
grpc_completion_queue_shutdown(server_cq);
drain_cq(server_cq);
grpc_completion_queue_destroy(server_cq);
- /* TODO(klempner): We need to give the EM time to actually close the listening
- socket, or later tests will fail to bind to this port. We should fix this
- by adding an API to EM to get notified when this happens and having it
- prevent listener teardown. */
- gpr_sleep_until(gpr_time_add(gpr_now(), gpr_time_from_millis(250)));
}
int main(int argc, char **argv) {
int i;
- int port = grpc_pick_unused_port_or_die();
grpc_test_init(argc, argv);
grpc_init();
@@ -184,20 +178,21 @@ int main(int argc, char **argv) {
grpc_forbid_dualstack_sockets_for_testing = i;
/* :: and 0.0.0.0 are handled identically. */
- test_connect("::", "127.0.0.1", port, 1);
- test_connect("::", "::1", port, 1);
- test_connect("::", "::ffff:127.0.0.1", port, 1);
- test_connect("::", "localhost", port, 1);
- test_connect("0.0.0.0", "127.0.0.1", port, 1);
- test_connect("0.0.0.0", "::1", port, 1);
- test_connect("0.0.0.0", "::ffff:127.0.0.1", port, 1);
- test_connect("0.0.0.0", "localhost", port, 1);
+ test_connect("::", "127.0.0.1", grpc_pick_unused_port_or_die(), 1);
+ test_connect("::", "::1", grpc_pick_unused_port_or_die(), 1);
+ test_connect("::", "::ffff:127.0.0.1", grpc_pick_unused_port_or_die(), 1);
+ test_connect("::", "localhost", grpc_pick_unused_port_or_die(), 1);
+ test_connect("0.0.0.0", "127.0.0.1", grpc_pick_unused_port_or_die(), 1);
+ test_connect("0.0.0.0", "::1", grpc_pick_unused_port_or_die(), 1);
+ test_connect("0.0.0.0", "::ffff:127.0.0.1", grpc_pick_unused_port_or_die(),
+ 1);
+ test_connect("0.0.0.0", "localhost", grpc_pick_unused_port_or_die(), 1);
/* These only work when the families agree. */
- test_connect("::1", "::1", port, 1);
- test_connect("::1", "127.0.0.1", port, 0);
- test_connect("127.0.0.1", "127.0.0.1", port, 1);
- test_connect("127.0.0.1", "::1", port, 0);
+ test_connect("::1", "::1", grpc_pick_unused_port_or_die(), 1);
+ test_connect("::1", "127.0.0.1", grpc_pick_unused_port_or_die(), 0);
+ test_connect("127.0.0.1", "127.0.0.1", grpc_pick_unused_port_or_die(), 1);
+ test_connect("127.0.0.1", "::1", grpc_pick_unused_port_or_die(), 0);
}
diff --git a/test/core/end2end/fixtures/chttp2_fake_security.c b/test/core/end2end/fixtures/chttp2_fake_security.c
index aaca56336f..ff249ce7aa 100644
--- a/test/core/end2end/fixtures/chttp2_fake_security.c
+++ b/test/core/end2end/fixtures/chttp2_fake_security.c
@@ -37,7 +37,6 @@
#include <string.h>
#include "src/core/channel/channel_args.h"
-#include "src/core/eventmanager/em.h"
#include "src/core/security/credentials.h"
#include "src/core/security/security_context.h"
#include <grpc/support/alloc.h>
@@ -47,8 +46,6 @@
#include "test/core/util/port.h"
#include "test/core/end2end/data/ssl_test_data.h"
-static grpc_em em;
-
typedef struct fullstack_secure_fixture_data {
char *localaddr;
} fullstack_secure_fixture_data;
@@ -124,13 +121,11 @@ int main(int argc, char **argv) {
grpc_test_init(argc, argv);
grpc_init();
- grpc_em_init(&em);
for (i = 0; i < sizeof(configs) / sizeof(*configs); i++) {
grpc_end2end_tests(configs[i]);
}
- GPR_ASSERT(grpc_em_destroy(&em) == GRPC_EM_OK);
grpc_shutdown();
return 0;
diff --git a/test/core/end2end/fixtures/chttp2_fullstack.c b/test/core/end2end/fixtures/chttp2_fullstack.c
index da75d61e66..169032f6ba 100644
--- a/test/core/end2end/fixtures/chttp2_fullstack.c
+++ b/test/core/end2end/fixtures/chttp2_fullstack.c
@@ -46,11 +46,9 @@
#include "src/core/channel/connected_channel.h"
#include "src/core/channel/http_filter.h"
#include "src/core/channel/http_server_filter.h"
-#include "src/core/eventmanager/em.h"
#include "src/core/surface/channel.h"
#include "src/core/surface/client.h"
#include "src/core/surface/server.h"
-#include "src/core/surface/surface_em.h"
#include "src/core/transport/chttp2_transport.h"
#include <grpc/support/alloc.h>
#include <grpc/support/host_port.h>
diff --git a/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c b/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c
index 57c9141d95..7b0adb2e8c 100644
--- a/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c
+++ b/test/core/end2end/fixtures/chttp2_simple_ssl_fullstack.c
@@ -37,7 +37,6 @@
#include <string.h>
#include "src/core/channel/channel_args.h"
-#include "src/core/eventmanager/em.h"
#include "src/core/security/credentials.h"
#include "src/core/security/security_context.h"
#include <grpc/support/alloc.h>
@@ -47,8 +46,6 @@
#include "test/core/util/port.h"
#include "test/core/end2end/data/ssl_test_data.h"
-static grpc_em em;
-
typedef struct fullstack_secure_fixture_data {
char *localaddr;
} fullstack_secure_fixture_data;
@@ -131,13 +128,11 @@ int main(int argc, char **argv) {
grpc_test_init(argc, argv);
grpc_init();
- grpc_em_init(&em);
for (i = 0; i < sizeof(configs) / sizeof(*configs); i++) {
grpc_end2end_tests(configs[i]);
}
- GPR_ASSERT(grpc_em_destroy(&em) == GRPC_EM_OK);
grpc_shutdown();
return 0;
diff --git a/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c b/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c
index 8d5585312a..04a8795b38 100644
--- a/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c
+++ b/test/core/end2end/fixtures/chttp2_simple_ssl_with_oauth2_fullstack.c
@@ -37,7 +37,7 @@
#include <string.h>
#include "src/core/channel/channel_args.h"
-#include "src/core/eventmanager/em.h"
+#include "src/core/iomgr/iomgr.h"
#include "src/core/security/credentials.h"
#include "src/core/security/security_context.h"
#include <grpc/support/alloc.h>
@@ -47,8 +47,6 @@
#include "test/core/util/port.h"
#include "test/core/end2end/data/ssl_test_data.h"
-static grpc_em em;
-
typedef struct fullstack_secure_fixture_data {
char *localaddr;
} fullstack_secure_fixture_data;
@@ -138,13 +136,11 @@ int main(int argc, char **argv) {
grpc_test_init(argc, argv);
grpc_init();
- grpc_em_init(&em);
for (i = 0; i < sizeof(configs) / sizeof(*configs); i++) {
grpc_end2end_tests(configs[i]);
}
- GPR_ASSERT(grpc_em_destroy(&em) == GRPC_EM_OK);
grpc_shutdown();
return 0;
diff --git a/test/core/end2end/fixtures/chttp2_socket_pair.c b/test/core/end2end/fixtures/chttp2_socket_pair.c
index 593ff78ba8..7ec17e3cc5 100644
--- a/test/core/end2end/fixtures/chttp2_socket_pair.c
+++ b/test/core/end2end/fixtures/chttp2_socket_pair.c
@@ -32,25 +32,15 @@
*/
#include "test/core/end2end/end2end_tests.h"
-
-#include <errno.h>
-#include <fcntl.h>
-#include <string.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <unistd.h>
-#include <stdlib.h>
-#include <stdio.h>
-
#include "src/core/channel/client_channel.h"
#include "src/core/channel/connected_channel.h"
#include "src/core/channel/http_filter.h"
#include "src/core/channel/http_server_filter.h"
-#include "src/core/eventmanager/em.h"
+#include "src/core/iomgr/endpoint_pair.h"
+#include "src/core/iomgr/iomgr.h"
#include "src/core/surface/channel.h"
#include "src/core/surface/client.h"
#include "src/core/surface/server.h"
-#include "src/core/surface/surface_em.h"
#include "src/core/transport/chttp2_transport.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -60,15 +50,6 @@
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
-static void create_sockets(int sv[2]) {
- int flags;
- GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
- flags = fcntl(sv[0], F_GETFL, 0);
- GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
- flags = fcntl(sv[1], F_GETFL, 0);
- GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
-}
-
/* chttp2 transport that is immediately available (used for testing
connected_channel without a client_channel */
@@ -102,11 +83,9 @@ static grpc_transport_setup_result client_setup_transport(
grpc_channel_get_channel_stack(channel), transport);
}
-typedef struct socketpair_fixture_data { int sv[2]; } socketpair_fixture_data;
-
static grpc_end2end_test_fixture chttp2_create_fixture_socketpair(
grpc_channel_args *client_args, grpc_channel_args *server_args) {
- socketpair_fixture_data *sfd = gpr_malloc(sizeof(socketpair_fixture_data));
+ grpc_endpoint_pair *sfd = gpr_malloc(sizeof(grpc_endpoint_pair));
grpc_end2end_test_fixture f;
f.fixture_data = sfd;
@@ -115,31 +94,27 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair(
f.server = grpc_server_create_from_filters(f.server_cq, NULL, 0, server_args);
f.client = NULL;
- create_sockets(sfd->sv);
+ *sfd = grpc_iomgr_create_endpoint_pair(65536);
return f;
}
static void chttp2_init_client_socketpair(grpc_end2end_test_fixture *f,
grpc_channel_args *client_args) {
- socketpair_fixture_data *sfd = f->fixture_data;
- grpc_endpoint *cli_tcp;
+ grpc_endpoint_pair *sfd = f->fixture_data;
sp_client_setup cs;
cs.client_args = client_args;
cs.f = f;
- cli_tcp = grpc_tcp_create_dbg(sfd->sv[0], grpc_surface_em(), 65536);
grpc_create_chttp2_transport(client_setup_transport, &cs, client_args,
- cli_tcp, NULL, 0, grpc_mdctx_create(), 1);
+ sfd->client, NULL, 0, grpc_mdctx_create(), 1);
GPR_ASSERT(f->client);
}
static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f,
grpc_channel_args *server_args) {
- socketpair_fixture_data *sfd = f->fixture_data;
- grpc_endpoint *svr_tcp;
- svr_tcp = grpc_tcp_create_dbg(sfd->sv[1], grpc_surface_em(), 65536);
- grpc_create_chttp2_transport(server_setup_transport, f, server_args, svr_tcp,
- NULL, 0, grpc_mdctx_create(), 0);
+ grpc_endpoint_pair *sfd = f->fixture_data;
+ grpc_create_chttp2_transport(server_setup_transport, f, server_args,
+ sfd->server, NULL, 0, grpc_mdctx_create(), 0);
}
static void chttp2_tear_down_socketpair(grpc_end2end_test_fixture *f) {
diff --git a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c
index 9287364c48..3e18de9b91 100644
--- a/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c
+++ b/test/core/end2end/fixtures/chttp2_socket_pair_one_byte_at_a_time.c
@@ -32,25 +32,15 @@
*/
#include "test/core/end2end/end2end_tests.h"
-
-#include <errno.h>
-#include <fcntl.h>
-#include <string.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <unistd.h>
-#include <stdlib.h>
-#include <stdio.h>
-
#include "src/core/channel/client_channel.h"
#include "src/core/channel/connected_channel.h"
#include "src/core/channel/http_filter.h"
#include "src/core/channel/http_server_filter.h"
-#include "src/core/eventmanager/em.h"
+#include "src/core/iomgr/endpoint_pair.h"
+#include "src/core/iomgr/iomgr.h"
#include "src/core/surface/channel.h"
#include "src/core/surface/client.h"
#include "src/core/surface/server.h"
-#include "src/core/surface/surface_em.h"
#include "src/core/transport/chttp2_transport.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -60,15 +50,6 @@
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
-static void create_sockets(int sv[2]) {
- int flags;
- GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
- flags = fcntl(sv[0], F_GETFL, 0);
- GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
- flags = fcntl(sv[1], F_GETFL, 0);
- GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
-}
-
/* chttp2 transport that is immediately available (used for testing
connected_channel without a client_channel */
@@ -102,11 +83,9 @@ static grpc_transport_setup_result client_setup_transport(
grpc_channel_get_channel_stack(channel), transport);
}
-typedef struct socketpair_fixture_data { int sv[2]; } socketpair_fixture_data;
-
static grpc_end2end_test_fixture chttp2_create_fixture_socketpair(
grpc_channel_args *client_args, grpc_channel_args *server_args) {
- socketpair_fixture_data *sfd = gpr_malloc(sizeof(socketpair_fixture_data));
+ grpc_endpoint_pair *sfd = gpr_malloc(sizeof(grpc_endpoint_pair));
grpc_end2end_test_fixture f;
f.fixture_data = sfd;
@@ -115,31 +94,27 @@ static grpc_end2end_test_fixture chttp2_create_fixture_socketpair(
f.server = grpc_server_create_from_filters(f.server_cq, NULL, 0, server_args);
f.client = NULL;
- create_sockets(sfd->sv);
+ *sfd = grpc_iomgr_create_endpoint_pair(1);
return f;
}
static void chttp2_init_client_socketpair(grpc_end2end_test_fixture *f,
grpc_channel_args *client_args) {
- socketpair_fixture_data *sfd = f->fixture_data;
- grpc_endpoint *cli_tcp;
+ grpc_endpoint_pair *sfd = f->fixture_data;
sp_client_setup cs;
cs.client_args = client_args;
cs.f = f;
- cli_tcp = grpc_tcp_create_dbg(sfd->sv[0], grpc_surface_em(), 1);
grpc_create_chttp2_transport(client_setup_transport, &cs, client_args,
- cli_tcp, NULL, 0, grpc_mdctx_create(), 1);
+ sfd->client, NULL, 0, grpc_mdctx_create(), 1);
GPR_ASSERT(f->client);
}
static void chttp2_init_server_socketpair(grpc_end2end_test_fixture *f,
grpc_channel_args *server_args) {
- socketpair_fixture_data *sfd = f->fixture_data;
- grpc_endpoint *svr_tcp;
- svr_tcp = grpc_tcp_create_dbg(sfd->sv[1], grpc_surface_em(), 1);
- grpc_create_chttp2_transport(server_setup_transport, f, server_args, svr_tcp,
- NULL, 0, grpc_mdctx_create(), 0);
+ grpc_endpoint_pair *sfd = f->fixture_data;
+ grpc_create_chttp2_transport(server_setup_transport, f, server_args,
+ sfd->server, NULL, 0, grpc_mdctx_create(), 0);
}
static void chttp2_tear_down_socketpair(grpc_end2end_test_fixture *f) {
diff --git a/test/core/endpoint/secure_endpoint_test.c b/test/core/endpoint/secure_endpoint_test.c
index 4fd5dee677..18a33b5f52 100644
--- a/test/core/endpoint/secure_endpoint_test.c
+++ b/test/core/endpoint/secure_endpoint_test.c
@@ -39,41 +39,25 @@
#include <unistd.h>
#include "src/core/endpoint/secure_endpoint.h"
-#include "src/core/endpoint/tcp.h"
-#include "src/core/eventmanager/em.h"
-#include "src/core/tsi/fake_transport_security.h"
+#include "src/core/iomgr/endpoint_pair.h"
+#include "src/core/iomgr/iomgr.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "test/core/util/test_config.h"
-
-grpc_em g_em;
-
-static void create_sockets(int sv[2]) {
- int flags;
- GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
- flags = fcntl(sv[0], F_GETFL, 0);
- GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
- flags = fcntl(sv[1], F_GETFL, 0);
- GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
-}
+#include "src/core/tsi/fake_transport_security.h"
static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair(
size_t slice_size, gpr_slice *leftover_slices, size_t leftover_nslices) {
- int sv[2];
tsi_frame_protector *fake_read_protector = tsi_create_fake_protector(NULL);
tsi_frame_protector *fake_write_protector = tsi_create_fake_protector(NULL);
grpc_endpoint_test_fixture f;
- grpc_endpoint *tcp_read;
- grpc_endpoint *tcp_write;
+ grpc_endpoint_pair tcp;
- create_sockets(sv);
- grpc_em_init(&g_em);
- tcp_read = grpc_tcp_create_dbg(sv[0], &g_em, slice_size);
- tcp_write = grpc_tcp_create(sv[1], &g_em);
+ tcp = grpc_iomgr_create_endpoint_pair(slice_size);
if (leftover_nslices == 0) {
f.client_ep =
- grpc_secure_endpoint_create(fake_read_protector, tcp_read, NULL, 0);
+ grpc_secure_endpoint_create(fake_read_protector, tcp.client, NULL, 0);
} else {
int i;
tsi_result result;
@@ -115,14 +99,14 @@ static grpc_endpoint_test_fixture secure_endpoint_create_fixture_tcp_socketpair(
} while (still_pending_size > 0);
encrypted_leftover = gpr_slice_from_copied_buffer(
(const char *)encrypted_buffer, total_buffer_size - buffer_size);
- f.client_ep = grpc_secure_endpoint_create(fake_read_protector, tcp_read,
+ f.client_ep = grpc_secure_endpoint_create(fake_read_protector, tcp.client,
&encrypted_leftover, 1);
gpr_slice_unref(encrypted_leftover);
gpr_free(encrypted_buffer);
}
f.server_ep =
- grpc_secure_endpoint_create(fake_write_protector, tcp_write, NULL, 0);
+ grpc_secure_endpoint_create(fake_write_protector, tcp.server, NULL, 0);
return f;
}
@@ -141,7 +125,7 @@ secure_endpoint_create_fixture_tcp_socketpair_leftover(size_t slice_size) {
return f;
}
-static void clean_up() { grpc_em_destroy(&g_em); }
+static void clean_up() {}
static grpc_endpoint_test_config configs[] = {
{"secure_ep/tcp_socketpair",
@@ -213,9 +197,11 @@ static void test_destroy_ep_early(grpc_endpoint_test_config config,
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
+ grpc_iomgr_init();
grpc_endpoint_tests(configs[0]);
test_leftover(configs[1], 1);
test_destroy_ep_early(configs[1], 1);
+ grpc_iomgr_shutdown();
return 0;
}
diff --git a/test/core/eventmanager/em_pipe_test.c b/test/core/eventmanager/em_pipe_test.c
deleted file mode 100644
index f2414c42b1..0000000000
--- a/test/core/eventmanager/em_pipe_test.c
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-/* Test grpc_em_fd with pipe. The test creates a pipe with non-blocking mode,
- sends a stream of bytes through the pipe, and verifies that all bytes are
- received. */
-#include "src/core/eventmanager/em.h"
-
-#include <errno.h>
-#include <fcntl.h>
-#include <pthread.h>
-#include <string.h>
-#include <stdio.h>
-#include <unistd.h>
-
-#include <grpc/support/log.h>
-#include "test/core/util/test_config.h"
-
-/* Operation for fcntl() to set pipe buffer size. */
-#ifndef F_SETPIPE_SZ
-#define F_SETPIPE_SZ (1024 + 7)
-#endif
-
-#define TOTAL_WRITE 3 /* total number of times that the write buffer is full. \
- */
-#define BUF_SIZE 1024
-char read_buf[BUF_SIZE];
-char write_buf[BUF_SIZE];
-
-typedef struct {
- int fd[2];
- grpc_em em;
- grpc_em_fd read_em_fd;
- grpc_em_fd write_em_fd;
- int num_write; /* number of times that the write buffer is full*/
- ssize_t bytes_written_total; /* total number of bytes written to the pipe */
- ssize_t bytes_read_total; /* total number of bytes read from the pipe */
- pthread_mutex_t mu; /* protect cv and done */
- pthread_cond_t cv; /* signaled when read finished */
- int done; /* set to 1 when read finished */
-} async_pipe;
-
-void write_shutdown_cb(void *arg, /*async_pipe*/
- enum grpc_em_cb_status status) {
- async_pipe *ap = arg;
- grpc_em_fd_destroy(&ap->write_em_fd);
-}
-
-void write_cb(void *arg, /*async_pipe*/ enum grpc_em_cb_status status) {
- async_pipe *ap = arg;
- ssize_t bytes_written = 0;
-
- if (status == GRPC_CALLBACK_CANCELLED) {
- write_shutdown_cb(arg, GRPC_CALLBACK_SUCCESS);
- return;
- }
-
- do {
- bytes_written = write(ap->fd[1], write_buf, BUF_SIZE);
- if (bytes_written > 0) ap->bytes_written_total += bytes_written;
- } while (bytes_written > 0);
-
- if (errno == EAGAIN) {
- if (ap->num_write < TOTAL_WRITE) {
- ap->num_write++;
- grpc_em_fd_notify_on_write(&ap->write_em_fd, write_cb, ap,
- gpr_inf_future);
- } else {
- /* Note that this could just shut down directly; doing a trip through the
- shutdown path serves only a demonstration of the API. */
- grpc_em_fd_shutdown(&ap->write_em_fd);
- grpc_em_fd_notify_on_write(&ap->write_em_fd, write_cb, ap,
- gpr_inf_future);
- }
- } else {
- GPR_ASSERT(0 && strcat("unknown errno: ", strerror(errno)));
- }
-}
-
-void read_shutdown_cb(void *arg, /*async_pipe*/ enum grpc_em_cb_status status) {
- async_pipe *ap = arg;
- grpc_em_fd_destroy(&ap->read_em_fd);
- pthread_mutex_lock(&ap->mu);
- if (ap->done == 0) {
- ap->done = 1;
- pthread_cond_signal(&ap->cv);
- }
- pthread_mutex_unlock(&ap->mu);
-}
-
-void read_cb(void *arg, /*async_pipe*/ enum grpc_em_cb_status status) {
- async_pipe *ap = arg;
- ssize_t bytes_read = 0;
-
- if (status == GRPC_CALLBACK_CANCELLED) {
- read_shutdown_cb(arg, GRPC_CALLBACK_SUCCESS);
- return;
- }
-
- do {
- bytes_read = read(ap->fd[0], read_buf, BUF_SIZE);
- if (bytes_read > 0) ap->bytes_read_total += bytes_read;
- } while (bytes_read > 0);
-
- if (bytes_read == 0) {
- /* Note that this could just shut down directly; doing a trip through the
- shutdown path serves only a demonstration of the API. */
- grpc_em_fd_shutdown(&ap->read_em_fd);
- grpc_em_fd_notify_on_read(&ap->read_em_fd, read_cb, ap, gpr_inf_future);
- } else if (bytes_read == -1) {
- if (errno == EAGAIN) {
- grpc_em_fd_notify_on_read(&ap->read_em_fd, read_cb, ap, gpr_inf_future);
- } else {
- GPR_ASSERT(0 && strcat("unknown errno: ", strerror(errno)));
- }
- }
-}
-
-void dummy_cb(void *arg, /*async_pipe*/ enum grpc_em_cb_status status) {}
-
-void async_pipe_init(async_pipe *ap) {
- int i;
-
- ap->num_write = 0;
- ap->bytes_written_total = 0;
- ap->bytes_read_total = 0;
-
- pthread_mutex_init(&ap->mu, NULL);
- pthread_cond_init(&ap->cv, NULL);
- ap->done = 0;
-
- GPR_ASSERT(0 == pipe(ap->fd));
- for (i = 0; i < 2; i++) {
- int flags = fcntl(ap->fd[i], F_GETFL, 0);
- GPR_ASSERT(fcntl(ap->fd[i], F_SETFL, flags | O_NONBLOCK) == 0);
- GPR_ASSERT(fcntl(ap->fd[i], F_SETPIPE_SZ, 4096) == 4096);
- }
-
- grpc_em_init(&ap->em);
- grpc_em_fd_init(&ap->read_em_fd, &ap->em, ap->fd[0]);
- grpc_em_fd_init(&ap->write_em_fd, &ap->em, ap->fd[1]);
-}
-
-static void async_pipe_start(async_pipe *ap) {
- grpc_em_fd_notify_on_read(&ap->read_em_fd, read_cb, ap, gpr_inf_future);
- grpc_em_fd_notify_on_write(&ap->write_em_fd, write_cb, ap, gpr_inf_future);
-}
-
-static void async_pipe_wait_destroy(async_pipe *ap) {
- pthread_mutex_lock(&ap->mu);
- while (!ap->done) pthread_cond_wait(&ap->cv, &ap->mu);
- pthread_mutex_unlock(&ap->mu);
- pthread_mutex_destroy(&ap->mu);
- pthread_cond_destroy(&ap->cv);
-
- grpc_em_destroy(&ap->em);
-}
-
-int main(int argc, char **argv) {
- async_pipe ap;
- grpc_test_init(argc, argv);
- async_pipe_init(&ap);
- async_pipe_start(&ap);
- async_pipe_wait_destroy(&ap);
- GPR_ASSERT(ap.bytes_read_total == ap.bytes_written_total);
- gpr_log(GPR_INFO, "read total bytes %d", ap.bytes_read_total);
- return 0;
-}
diff --git a/test/core/httpcli/httpcli_test.c b/test/core/httpcli/httpcli_test.c
index 5c0d87c427..c901e595f6 100644
--- a/test/core/httpcli/httpcli_test.c
+++ b/test/core/httpcli/httpcli_test.c
@@ -35,11 +35,12 @@
#include <string.h>
+#include "src/core/iomgr/iomgr.h"
#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
#include "test/core/util/test_config.h"
static gpr_event g_done;
-static grpc_em g_em;
static gpr_timespec n_seconds_time(int seconds) {
return gpr_time_add(gpr_now(), gpr_time_from_micros(seconds * 1000000));
@@ -55,7 +56,7 @@ static void on_finish(void *arg, const grpc_httpcli_response *response) {
static void test_get(int use_ssl) {
grpc_httpcli_request req;
- gpr_log(GPR_INFO, "running %s with use_ssl=%d.", __FUNCTION__, (int)use_ssl);
+ gpr_log(GPR_INFO, "running %s with use_ssl=%d.", __FUNCTION__, use_ssl);
gpr_event_init(&g_done);
memset(&req, 0, sizeof(req));
@@ -63,7 +64,7 @@ static void test_get(int use_ssl) {
req.path = "/";
req.use_ssl = use_ssl;
- grpc_httpcli_get(&req, n_seconds_time(15), &g_em, on_finish, (void *)42);
+ grpc_httpcli_get(&req, n_seconds_time(15), on_finish, (void *)42);
GPR_ASSERT(gpr_event_wait(&g_done, n_seconds_time(20)));
}
@@ -79,7 +80,7 @@ static void test_post(int use_ssl) {
req.path = "/1eamwr21";
req.use_ssl = use_ssl;
- grpc_httpcli_post(&req, NULL, 0, n_seconds_time(15), &g_em, on_finish,
+ grpc_httpcli_post(&req, NULL, 0, n_seconds_time(15), on_finish,
(void *)42);
GPR_ASSERT(gpr_event_wait(&g_done, n_seconds_time(20)));
}
@@ -87,7 +88,7 @@ static void test_post(int use_ssl) {
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
- grpc_em_init(&g_em);
+ grpc_iomgr_init();
test_get(0);
test_get(1);
@@ -95,7 +96,7 @@ int main(int argc, char **argv) {
/* test_post(0); */
/* test_post(1); */
- grpc_em_destroy(&g_em);
+ grpc_iomgr_shutdown();
return 0;
}
diff --git a/test/core/iomgr/alarm_test.c b/test/core/iomgr/alarm_test.c
new file mode 100644
index 0000000000..0dcd21463d
--- /dev/null
+++ b/test/core/iomgr/alarm_test.c
@@ -0,0 +1,219 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/* Test gRPC event manager with a simple TCP upload server and client. */
+#include "src/core/iomgr/alarm.h"
+
+#include <ctype.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <netinet/in.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/time.h>
+#include <unistd.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/time.h>
+#include "test/core/util/test_config.h"
+
+/* Dummy gRPC callback */
+void no_op_cb(void *arg, grpc_iomgr_cb_status status) {}
+
+typedef struct {
+ gpr_cv cv;
+ gpr_mu mu;
+ int counter;
+ int done_success_ctr;
+ int done_cancel_ctr;
+ int done;
+ gpr_event fcb_arg;
+ grpc_iomgr_cb_status status;
+} alarm_arg;
+
+static void followup_cb(void *arg, grpc_iomgr_cb_status status) {
+ gpr_event_set((gpr_event *)arg, arg);
+}
+
+/* Called when an alarm expires. */
+static void alarm_cb(void *arg /* alarm_arg */, grpc_iomgr_cb_status status) {
+ alarm_arg *a = arg;
+ gpr_mu_lock(&a->mu);
+ if (status == GRPC_CALLBACK_SUCCESS) {
+ a->counter++;
+ a->done_success_ctr++;
+ } else if (status == GRPC_CALLBACK_CANCELLED) {
+ a->done_cancel_ctr++;
+ } else {
+ GPR_ASSERT(0);
+ }
+ a->done = 1;
+ a->status = status;
+ gpr_cv_signal(&a->cv);
+ gpr_mu_unlock(&a->mu);
+ grpc_iomgr_add_callback(followup_cb, &a->fcb_arg);
+}
+
+/* Test grpc_alarm add and cancel. */
+static void test_grpc_alarm() {
+ grpc_alarm alarm;
+ grpc_alarm alarm_to_cancel;
+ gpr_timespec tv0 = {0, 1};
+ /* Timeout on the alarm cond. var, so make big enough to absorb time
+ deviations. Otherwise, operations after wait will not be properly ordered
+ */
+ gpr_timespec tv1 = gpr_time_from_micros(200000);
+ gpr_timespec tv2 = {0, 1};
+ gpr_timespec alarm_deadline;
+ gpr_timespec followup_deadline;
+
+ alarm_arg arg;
+ alarm_arg arg2;
+ void *fdone;
+
+ grpc_iomgr_init();
+
+ arg.counter = 0;
+ arg.status = GRPC_CALLBACK_DO_NOT_USE;
+ arg.done_success_ctr = 0;
+ arg.done_cancel_ctr = 0;
+ arg.done = 0;
+ gpr_mu_init(&arg.mu);
+ gpr_cv_init(&arg.cv);
+ gpr_event_init(&arg.fcb_arg);
+
+ grpc_alarm_init(&alarm, alarm_cb, &arg);
+ grpc_alarm_add(&alarm, gpr_time_add(tv0, gpr_now()));
+
+ alarm_deadline = gpr_time_add(gpr_now(), tv1);
+ gpr_mu_lock(&arg.mu);
+ while (arg.done == 0) {
+ gpr_cv_wait(&arg.cv, &arg.mu, alarm_deadline);
+ }
+ gpr_mu_unlock(&arg.mu);
+
+ followup_deadline = gpr_time_add(gpr_now(), tv1);
+ fdone = gpr_event_wait(&arg.fcb_arg, followup_deadline);
+
+ if (arg.counter != 1) {
+ gpr_log(GPR_ERROR, "Alarm callback not called");
+ GPR_ASSERT(0);
+ } else if (arg.done_success_ctr != 1) {
+ gpr_log(GPR_ERROR, "Alarm done callback not called with success");
+ GPR_ASSERT(0);
+ } else if (arg.done_cancel_ctr != 0) {
+ gpr_log(GPR_ERROR, "Alarm done callback called with cancel");
+ GPR_ASSERT(0);
+ } else if (arg.status == GRPC_CALLBACK_DO_NOT_USE) {
+ gpr_log(GPR_ERROR, "Alarm callback without status");
+ GPR_ASSERT(0);
+ } else {
+ gpr_log(GPR_INFO, "Alarm callback called successfully");
+ }
+
+ if (fdone != (void *)&arg.fcb_arg) {
+ gpr_log(GPR_ERROR, "Followup callback #1 not invoked properly %p %p", fdone,
+ &arg.fcb_arg);
+ GPR_ASSERT(0);
+ }
+ gpr_cv_destroy(&arg.cv);
+ gpr_mu_destroy(&arg.mu);
+
+ arg2.counter = 0;
+ arg2.status = GRPC_CALLBACK_DO_NOT_USE;
+ arg2.done_success_ctr = 0;
+ arg2.done_cancel_ctr = 0;
+ arg2.done = 0;
+ gpr_mu_init(&arg2.mu);
+ gpr_cv_init(&arg2.cv);
+ gpr_event_init(&arg2.fcb_arg);
+
+ grpc_alarm_init(&alarm_to_cancel, alarm_cb, &arg2);
+ grpc_alarm_add(&alarm_to_cancel, gpr_time_add(tv2, gpr_now()));
+ grpc_alarm_cancel(&alarm_to_cancel);
+
+ alarm_deadline = gpr_time_add(gpr_now(), tv1);
+ gpr_mu_lock(&arg2.mu);
+ while (arg2.done == 0) {
+ gpr_cv_wait(&arg2.cv, &arg2.mu, alarm_deadline);
+ }
+ gpr_mu_unlock(&arg2.mu);
+
+ followup_deadline = gpr_time_add(gpr_now(), tv1);
+ fdone = gpr_event_wait(&arg2.fcb_arg, followup_deadline);
+
+ if (arg2.counter != arg2.done_success_ctr) {
+ gpr_log(GPR_ERROR, "Alarm callback called but didn't lead to done success");
+ GPR_ASSERT(0);
+ } else if (arg2.done_success_ctr && arg2.done_cancel_ctr) {
+ gpr_log(GPR_ERROR, "Alarm done callback called with success and cancel");
+ GPR_ASSERT(0);
+ } else if (arg2.done_cancel_ctr + arg2.done_success_ctr != 1) {
+ gpr_log(GPR_ERROR, "Alarm done callback called incorrect number of times");
+ GPR_ASSERT(0);
+ } else if (arg2.status == GRPC_CALLBACK_DO_NOT_USE) {
+ gpr_log(GPR_ERROR, "Alarm callback without status");
+ GPR_ASSERT(0);
+ } else if (arg2.done_success_ctr) {
+ gpr_log(GPR_INFO, "Alarm callback executed before cancel");
+ gpr_log(GPR_INFO, "Current value of triggered is %d\n",
+ (int)alarm_to_cancel.triggered);
+ } else if (arg2.done_cancel_ctr) {
+ gpr_log(GPR_INFO, "Alarm callback canceled");
+ gpr_log(GPR_INFO, "Current value of triggered is %d\n",
+ (int)alarm_to_cancel.triggered);
+ } else {
+ gpr_log(GPR_ERROR, "Alarm cancel test should not be here");
+ GPR_ASSERT(0);
+ }
+
+ if (fdone != (void *)&arg2.fcb_arg) {
+ gpr_log(GPR_ERROR, "Followup callback #2 not invoked properly %p %p", fdone,
+ &arg2.fcb_arg);
+ GPR_ASSERT(0);
+ }
+ gpr_cv_destroy(&arg2.cv);
+ gpr_mu_destroy(&arg2.mu);
+
+ grpc_iomgr_shutdown();
+}
+
+int main(int argc, char **argv) {
+ grpc_test_init(argc, argv);
+ test_grpc_alarm();
+ return 0;
+}
diff --git a/test/core/eventmanager/em_test.c b/test/core/iomgr/fd_posix_test.c
index 274edc3231..4d4461ec6c 100644
--- a/test/core/eventmanager/em_test.c
+++ b/test/core/iomgr/fd_posix_test.c
@@ -32,7 +32,7 @@
*/
/* Test gRPC event manager with a simple TCP upload server and client. */
-#include "src/core/eventmanager/em.h"
+#include "src/core/iomgr/iomgr_libevent.h"
#include <ctype.h>
#include <errno.h>
@@ -92,8 +92,7 @@ void no_op_cb(void *arg, enum grpc_em_cb_status status) {}
/* An upload server. */
typedef struct {
- grpc_em em; /* event manger used by the sever */
- grpc_em_fd em_fd; /* listening fd */
+ grpc_fd *em_fd; /* listening fd */
ssize_t read_bytes_total; /* total number of received bytes */
gpr_mu mu; /* protect done and done_cv */
gpr_cv done_cv; /* signaled when a server finishes serving */
@@ -101,7 +100,6 @@ typedef struct {
} server;
static void server_init(server *sv) {
- GPR_ASSERT(grpc_em_init(&sv->em) == GRPC_EM_OK);
sv->read_bytes_total = 0;
gpr_mu_init(&sv->mu);
gpr_cv_init(&sv->done_cv);
@@ -112,7 +110,7 @@ static void server_init(server *sv) {
Created when a new upload request arrives in the server. */
typedef struct {
server *sv; /* not owned by a single session */
- grpc_em_fd em_fd; /* fd to read upload bytes */
+ grpc_fd *em_fd; /* fd to read upload bytes */
char read_buf[BUF_SIZE]; /* buffer to store upload bytes */
} session;
@@ -122,17 +120,17 @@ static void session_shutdown_cb(void *arg, /*session*/
enum grpc_em_cb_status status) {
session *se = arg;
server *sv = se->sv;
- grpc_em_fd_destroy(&se->em_fd);
+ grpc_fd_destroy(se->em_fd);
gpr_free(se);
/* Start to shutdown listen fd. */
- grpc_em_fd_shutdown(&sv->em_fd);
+ grpc_fd_shutdown(sv->em_fd);
}
/* Called when data become readable in a session. */
static void session_read_cb(void *arg, /*session*/
enum grpc_em_cb_status status) {
session *se = arg;
- int fd = grpc_em_fd_get(&se->em_fd);
+ int fd = grpc_fd_get(se->em_fd);
ssize_t read_once = 0;
ssize_t read_total = 0;
@@ -153,8 +151,8 @@ static void session_read_cb(void *arg, /*session*/
It is possible to read nothing due to spurious edge event or data has
been drained, In such a case, read() returns -1 and set errno to EAGAIN. */
if (read_once == 0) {
- grpc_em_fd_shutdown(&se->em_fd);
- grpc_em_fd_notify_on_read(&se->em_fd, session_read_cb, se, gpr_inf_future);
+ grpc_fd_shutdown(se->em_fd);
+ grpc_fd_notify_on_read(se->em_fd, session_read_cb, se, gpr_inf_future);
} else if (read_once == -1) {
if (errno == EAGAIN) {
/* An edge triggered event is cached in the kernel until next poll.
@@ -165,8 +163,8 @@ static void session_read_cb(void *arg, /*session*/
TODO(chenw): in multi-threaded version, callback and polling can be
run in different threads. polling may catch a persist read edge event
before notify_on_read is called. */
- GPR_ASSERT(grpc_em_fd_notify_on_read(&se->em_fd, session_read_cb, se,
- gpr_inf_future) == GRPC_EM_OK);
+ GPR_ASSERT(grpc_fd_notify_on_read(se->em_fd, session_read_cb, se,
+ gpr_inf_future));
} else {
gpr_log(GPR_ERROR, "Unhandled read error %s", strerror(errno));
GPR_ASSERT(0);
@@ -180,7 +178,7 @@ static void listen_shutdown_cb(void *arg /*server*/,
enum grpc_em_cb_status status) {
server *sv = arg;
- grpc_em_fd_destroy(&sv->em_fd);
+ grpc_fd_destroy(sv->em_fd);
gpr_mu_lock(&sv->mu);
sv->done = 1;
@@ -197,26 +195,26 @@ static void listen_cb(void *arg, /*=sv_arg*/
session *se;
struct sockaddr_storage ss;
socklen_t slen = sizeof(ss);
- struct grpc_em_fd *listen_em_fd = &sv->em_fd;
+ struct grpc_fd *listen_em_fd = sv->em_fd;
if (status == GRPC_CALLBACK_CANCELLED) {
listen_shutdown_cb(arg, GRPC_CALLBACK_SUCCESS);
return;
}
- fd = accept(grpc_em_fd_get(listen_em_fd), (struct sockaddr *)&ss, &slen);
+ fd = accept(grpc_fd_get(listen_em_fd), (struct sockaddr *)&ss, &slen);
GPR_ASSERT(fd >= 0);
GPR_ASSERT(fd < FD_SETSIZE);
flags = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
se = gpr_malloc(sizeof(*se));
se->sv = sv;
- GPR_ASSERT(grpc_em_fd_init(&se->em_fd, &sv->em, fd) == GRPC_EM_OK);
- GPR_ASSERT(grpc_em_fd_notify_on_read(&se->em_fd, session_read_cb, se,
- gpr_inf_future) == GRPC_EM_OK);
+ se->em_fd = grpc_fd_create(fd);
+ GPR_ASSERT(
+ grpc_fd_notify_on_read(se->em_fd, session_read_cb, se, gpr_inf_future));
- GPR_ASSERT(grpc_em_fd_notify_on_read(listen_em_fd, listen_cb, sv,
- gpr_inf_future) == GRPC_EM_OK);
+ GPR_ASSERT(
+ grpc_fd_notify_on_read(listen_em_fd, listen_cb, sv, gpr_inf_future));
}
/* Max number of connections pending to be accepted by listen(). */
@@ -235,14 +233,13 @@ static int server_start(server *sv) {
create_test_socket(port, &fd, &sin);
addr_len = sizeof(sin);
GPR_ASSERT(bind(fd, (struct sockaddr *)&sin, addr_len) == 0);
- GPR_ASSERT(getsockname(fd, (struct sockaddr *)&sin, &addr_len) == GRPC_EM_OK);
+ GPR_ASSERT(getsockname(fd, (struct sockaddr *)&sin, &addr_len) == 0);
port = ntohs(sin.sin_port);
GPR_ASSERT(listen(fd, MAX_NUM_FD) == 0);
- GPR_ASSERT(grpc_em_fd_init(&sv->em_fd, &sv->em, fd) == GRPC_EM_OK);
+ sv->em_fd = grpc_fd_create(fd);
/* Register to be interested in reading from listen_fd. */
- GPR_ASSERT(grpc_em_fd_notify_on_read(&sv->em_fd, listen_cb, sv,
- gpr_inf_future) == GRPC_EM_OK);
+ GPR_ASSERT(grpc_fd_notify_on_read(sv->em_fd, listen_cb, sv, gpr_inf_future));
return port;
}
@@ -255,8 +252,6 @@ static void server_wait_and_shutdown(server *sv) {
gpr_mu_destroy(&sv->mu);
gpr_cv_destroy(&sv->done_cv);
-
- GPR_ASSERT(grpc_em_destroy(&sv->em) == GRPC_EM_OK);
}
/* ===An upload client to test notify_on_write=== */
@@ -268,8 +263,7 @@ static void server_wait_and_shutdown(server *sv) {
/* An upload client. */
typedef struct {
- grpc_em em;
- grpc_em_fd em_fd;
+ grpc_fd *em_fd;
char write_buf[CLIENT_WRITE_BUF_SIZE];
ssize_t write_bytes_total;
/* Number of times that the client fills up the write buffer and calls
@@ -282,7 +276,6 @@ typedef struct {
} client;
static void client_init(client *cl) {
- GPR_ASSERT(grpc_em_init(&cl->em) == GRPC_EM_OK);
memset(cl->write_buf, 0, sizeof(cl->write_buf));
cl->write_bytes_total = 0;
cl->client_write_cnt = 0;
@@ -295,7 +288,7 @@ static void client_init(client *cl) {
static void client_session_shutdown_cb(void *arg /*client*/,
enum grpc_em_cb_status status) {
client *cl = arg;
- grpc_em_fd_destroy(&cl->em_fd);
+ grpc_fd_destroy(cl->em_fd);
gpr_mu_lock(&cl->mu);
cl->done = 1;
gpr_cv_signal(&cl->done_cv);
@@ -306,7 +299,7 @@ static void client_session_shutdown_cb(void *arg /*client*/,
static void client_session_write(void *arg, /*client*/
enum grpc_em_cb_status status) {
client *cl = arg;
- int fd = grpc_em_fd_get(&cl->em_fd);
+ int fd = grpc_fd_get(cl->em_fd);
ssize_t write_once = 0;
if (status == GRPC_CALLBACK_CANCELLED) {
@@ -322,14 +315,14 @@ static void client_session_write(void *arg, /*client*/
if (errno == EAGAIN) {
gpr_mu_lock(&cl->mu);
if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) {
- GPR_ASSERT(grpc_em_fd_notify_on_write(&cl->em_fd, client_session_write,
- cl, gpr_inf_future) == GRPC_EM_OK);
+ GPR_ASSERT(grpc_fd_notify_on_write(cl->em_fd, client_session_write, cl,
+ gpr_inf_future));
cl->client_write_cnt++;
} else {
close(fd);
- grpc_em_fd_shutdown(&cl->em_fd);
- grpc_em_fd_notify_on_write(&cl->em_fd, client_session_write, cl,
- gpr_inf_future);
+ grpc_fd_shutdown(cl->em_fd);
+ grpc_fd_notify_on_write(cl->em_fd, client_session_write, cl,
+ gpr_inf_future);
}
gpr_mu_unlock(&cl->mu);
} else {
@@ -349,7 +342,7 @@ static void client_start(client *cl, int port) {
GPR_ASSERT(0);
}
- GPR_ASSERT(grpc_em_fd_init(&cl->em_fd, &cl->em, fd) == GRPC_EM_OK);
+ cl->em_fd = grpc_fd_create(fd);
client_session_write(cl, GRPC_CALLBACK_SUCCESS);
}
@@ -362,14 +355,12 @@ static void client_wait_and_shutdown(client *cl) {
gpr_mu_destroy(&cl->mu);
gpr_cv_destroy(&cl->done_cv);
-
- GPR_ASSERT(grpc_em_destroy(&cl->em) == GRPC_EM_OK);
}
-/* Test grpc_em_fd. Start an upload server and client, upload a stream of
+/* Test grpc_fd. Start an upload server and client, upload a stream of
bytes from the client to the server, and verify that the total number of
sent bytes is equal to the total number of received bytes. */
-static void test_grpc_em_fd() {
+static void test_grpc_fd() {
server sv;
client cl;
int port;
@@ -425,9 +416,8 @@ static void second_read_callback(void *arg /* fd_change_data */,
Note that we have two different but almost identical callbacks above -- the
point is to have two different function pointers and two different data
pointers and make sure that changing both really works. */
-static void test_grpc_em_fd_change() {
- grpc_em em;
- grpc_em_fd em_fd;
+static void test_grpc_fd_change() {
+ grpc_fd *em_fd;
fd_change_data a, b;
int flags;
int sv[2];
@@ -443,11 +433,10 @@ static void test_grpc_em_fd_change() {
flags = fcntl(sv[1], F_GETFL, 0);
GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
- grpc_em_init(&em);
- grpc_em_fd_init(&em_fd, &em, sv[0]);
+ em_fd = grpc_fd_create(sv[0]);
/* Register the first callback, then make its FD readable */
- grpc_em_fd_notify_on_read(&em_fd, first_read_callback, &a, gpr_inf_future);
+ grpc_fd_notify_on_read(em_fd, first_read_callback, &a, gpr_inf_future);
data = 0;
result = write(sv[1], &data, 1);
GPR_ASSERT(result == 1);
@@ -466,7 +455,7 @@ static void test_grpc_em_fd_change() {
/* Now register a second callback with distinct change data, and do the same
thing again. */
- grpc_em_fd_notify_on_read(&em_fd, second_read_callback, &b, gpr_inf_future);
+ grpc_fd_notify_on_read(em_fd, second_read_callback, &b, gpr_inf_future);
data = 0;
result = write(sv[1], &data, 1);
GPR_ASSERT(result == 1);
@@ -479,8 +468,7 @@ static void test_grpc_em_fd_change() {
GPR_ASSERT(b.cb_that_ran == second_read_callback);
gpr_mu_unlock(&b.mu);
- grpc_em_fd_destroy(&em_fd);
- grpc_em_destroy(&em);
+ grpc_fd_destroy(em_fd);
destroy_change_data(&a);
destroy_change_data(&b);
close(sv[0]);
@@ -495,9 +483,8 @@ void timeout_callback(void *arg, enum grpc_em_cb_status status) {
}
}
-void test_grpc_em_fd_notify_timeout() {
- grpc_em em;
- grpc_em_fd em_fd;
+void test_grpc_fd_notify_timeout() {
+ grpc_fd *em_fd;
gpr_event ev;
int flags;
int sv[2];
@@ -512,206 +499,26 @@ void test_grpc_em_fd_notify_timeout() {
flags = fcntl(sv[1], F_GETFL, 0);
GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
- grpc_em_init(&em);
- grpc_em_fd_init(&em_fd, &em, sv[0]);
+ em_fd = grpc_fd_create(sv[0]);
timeout = gpr_time_from_micros(1000000);
deadline = gpr_time_add(gpr_now(), timeout);
- grpc_em_fd_notify_on_read(&em_fd, timeout_callback, &ev, deadline);
+ grpc_fd_notify_on_read(em_fd, timeout_callback, &ev, deadline);
GPR_ASSERT(gpr_event_wait(&ev, gpr_time_add(deadline, timeout)));
GPR_ASSERT(gpr_event_get(&ev) == (void *)1);
- grpc_em_fd_destroy(&em_fd);
- grpc_em_destroy(&em);
+ grpc_fd_destroy(em_fd);
close(sv[1]);
}
-typedef struct {
- grpc_em *em;
- gpr_cv cv;
- gpr_mu mu;
- int counter;
- int done_success_ctr;
- int done_cancel_ctr;
- int done;
- gpr_event fcb_arg;
- grpc_em_cb_status status;
-} alarm_arg;
-
-static void followup_cb(void *arg, grpc_em_cb_status status) {
- gpr_event_set((gpr_event *)arg, arg);
-}
-
-/* Called when an alarm expires. */
-static void alarm_cb(void *arg /* alarm_arg */, grpc_em_cb_status status) {
- alarm_arg *a = arg;
- gpr_mu_lock(&a->mu);
- if (status == GRPC_CALLBACK_SUCCESS) {
- a->counter++;
- a->done_success_ctr++;
- } else if (status == GRPC_CALLBACK_CANCELLED) {
- a->done_cancel_ctr++;
- } else {
- GPR_ASSERT(0);
- }
- a->done = 1;
- a->status = status;
- gpr_cv_signal(&a->cv);
- gpr_mu_unlock(&a->mu);
- grpc_em_add_callback(a->em, followup_cb, &a->fcb_arg);
-}
-
-/* Test grpc_em_alarm add and cancel. */
-static void test_grpc_em_alarm() {
- struct grpc_em em;
- struct grpc_em_alarm alarm;
- struct grpc_em_alarm alarm_to_cancel;
- gpr_timespec tv0 = {0, 1};
- /* Timeout on the alarm cond. var, so make big enough to absorb time
- deviations. Otherwise, operations after wait will not be properly ordered
- */
- gpr_timespec tv1 = gpr_time_from_micros(200000);
- gpr_timespec tv2 = {0, 1};
- gpr_timespec alarm_deadline;
- gpr_timespec followup_deadline;
-
- alarm_arg arg;
- alarm_arg arg2;
- void *fdone;
-
- GPR_ASSERT(grpc_em_init(&em) == GRPC_EM_OK);
-
- arg.em = &em;
- arg.counter = 0;
- arg.status = GRPC_CALLBACK_DO_NOT_USE;
- arg.done_success_ctr = 0;
- arg.done_cancel_ctr = 0;
- arg.done = 0;
- gpr_mu_init(&arg.mu);
- gpr_cv_init(&arg.cv);
- gpr_event_init(&arg.fcb_arg);
-
- GPR_ASSERT(grpc_em_alarm_init(&alarm, &em, alarm_cb, &arg) == GRPC_EM_OK);
- GPR_ASSERT(grpc_em_alarm_add(&alarm, gpr_time_add(tv0, gpr_now())) ==
- GRPC_EM_OK);
-
- alarm_deadline = gpr_time_add(gpr_now(), tv1);
- gpr_mu_lock(&arg.mu);
- while (arg.done == 0) {
- gpr_cv_wait(&arg.cv, &arg.mu, alarm_deadline);
- }
- gpr_mu_unlock(&arg.mu);
-
- followup_deadline = gpr_time_add(gpr_now(), tv1);
- fdone = gpr_event_wait(&arg.fcb_arg, followup_deadline);
-
- if (arg.counter != 1) {
- gpr_log(GPR_ERROR, "Alarm callback not called");
- GPR_ASSERT(0);
- } else if (arg.done_success_ctr != 1) {
- gpr_log(GPR_ERROR, "Alarm done callback not called with success");
- GPR_ASSERT(0);
- } else if (arg.done_cancel_ctr != 0) {
- gpr_log(GPR_ERROR, "Alarm done callback called with cancel");
- GPR_ASSERT(0);
- } else if (arg.status == GRPC_CALLBACK_DO_NOT_USE) {
- gpr_log(GPR_ERROR, "Alarm callback without status");
- GPR_ASSERT(0);
- } else {
- gpr_log(GPR_INFO, "Alarm callback called successfully");
- }
-
- if (fdone != (void *)&arg.fcb_arg) {
- gpr_log(GPR_ERROR, "Followup callback #1 not invoked properly %p %p", fdone,
- &arg.fcb_arg);
- GPR_ASSERT(0);
- }
- gpr_cv_destroy(&arg.cv);
- gpr_mu_destroy(&arg.mu);
-
- arg2.em = &em;
- arg2.counter = 0;
- arg2.status = GRPC_CALLBACK_DO_NOT_USE;
- arg2.done_success_ctr = 0;
- arg2.done_cancel_ctr = 0;
- arg2.done = 0;
- gpr_mu_init(&arg2.mu);
- gpr_cv_init(&arg2.cv);
- gpr_event_init(&arg2.fcb_arg);
-
- GPR_ASSERT(grpc_em_alarm_init(&alarm_to_cancel, &em, alarm_cb, &arg2) ==
- GRPC_EM_OK);
- GPR_ASSERT(grpc_em_alarm_add(&alarm_to_cancel,
- gpr_time_add(tv2, gpr_now())) == GRPC_EM_OK);
- switch (grpc_em_alarm_cancel(&alarm_to_cancel)) {
- case GRPC_EM_OK:
- gpr_log(GPR_INFO, "Alarm cancel succeeded");
- break;
- case GRPC_EM_ERROR:
- gpr_log(GPR_ERROR, "Alarm cancel failed");
- GPR_ASSERT(0);
- break;
- case GRPC_EM_INVALID_ARGUMENTS:
- gpr_log(GPR_ERROR, "Alarm cancel failed with bad response code");
- gpr_log(GPR_ERROR, "Current value of triggered is %d\n",
- (int)alarm_to_cancel.triggered);
- GPR_ASSERT(0);
- break;
- }
-
- alarm_deadline = gpr_time_add(gpr_now(), tv1);
- gpr_mu_lock(&arg2.mu);
- while (arg2.done == 0) {
- gpr_cv_wait(&arg2.cv, &arg2.mu, alarm_deadline);
- }
- gpr_mu_unlock(&arg2.mu);
-
- followup_deadline = gpr_time_add(gpr_now(), tv1);
- fdone = gpr_event_wait(&arg2.fcb_arg, followup_deadline);
-
- if (arg2.counter != arg2.done_success_ctr) {
- gpr_log(GPR_ERROR, "Alarm callback called but didn't lead to done success");
- GPR_ASSERT(0);
- } else if (arg2.done_success_ctr && arg2.done_cancel_ctr) {
- gpr_log(GPR_ERROR, "Alarm done callback called with success and cancel");
- GPR_ASSERT(0);
- } else if (arg2.done_cancel_ctr + arg2.done_success_ctr != 1) {
- gpr_log(GPR_ERROR, "Alarm done callback called incorrect number of times");
- GPR_ASSERT(0);
- } else if (arg2.status == GRPC_CALLBACK_DO_NOT_USE) {
- gpr_log(GPR_ERROR, "Alarm callback without status");
- GPR_ASSERT(0);
- } else if (arg2.done_success_ctr) {
- gpr_log(GPR_INFO, "Alarm callback executed before cancel");
- gpr_log(GPR_INFO, "Current value of triggered is %d\n",
- (int)alarm_to_cancel.triggered);
- } else if (arg2.done_cancel_ctr) {
- gpr_log(GPR_INFO, "Alarm callback canceled");
- gpr_log(GPR_INFO, "Current value of triggered is %d\n",
- (int)alarm_to_cancel.triggered);
- } else {
- gpr_log(GPR_ERROR, "Alarm cancel test should not be here");
- GPR_ASSERT(0);
- }
-
- if (fdone != (void *)&arg2.fcb_arg) {
- gpr_log(GPR_ERROR, "Followup callback #2 not invoked properly %p %p", fdone,
- &arg2.fcb_arg);
- GPR_ASSERT(0);
- }
- gpr_cv_destroy(&arg2.cv);
- gpr_mu_destroy(&arg2.mu);
-
- GPR_ASSERT(grpc_em_destroy(&em) == GRPC_EM_OK);
-}
-
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
- test_grpc_em_alarm();
- test_grpc_em_fd();
- test_grpc_em_fd_change();
- test_grpc_em_fd_notify_timeout();
+ grpc_iomgr_init();
+ test_grpc_fd();
+ test_grpc_fd_change();
+ test_grpc_fd_notify_timeout();
+ grpc_iomgr_shutdown();
return 0;
}
diff --git a/test/core/endpoint/resolve_address_test.c b/test/core/iomgr/resolve_address_test.c
index 1e208d3699..99e3119581 100644
--- a/test/core/endpoint/resolve_address_test.c
+++ b/test/core/iomgr/resolve_address_test.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/endpoint/resolve_address.h"
+#include "src/core/iomgr/resolve_address.h"
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
diff --git a/test/core/endpoint/socket_utils_test.c b/test/core/iomgr/sockaddr_utils_test.c
index ef6ac32c22..8cd9fb660f 100644
--- a/test/core/endpoint/socket_utils_test.c
+++ b/test/core/iomgr/sockaddr_utils_test.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/endpoint/socket_utils.h"
+#include "src/core/iomgr/sockaddr_utils.h"
#include <errno.h>
#include <netinet/in.h>
diff --git a/test/core/iomgr/tcp_client_posix_test.c b/test/core/iomgr/tcp_client_posix_test.c
new file mode 100644
index 0000000000..cb1cd0bc16
--- /dev/null
+++ b/test/core/iomgr/tcp_client_posix_test.c
@@ -0,0 +1,176 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/iomgr/tcp_client.h"
+
+#include <errno.h>
+#include <netinet/in.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+#include "src/core/iomgr/iomgr.h"
+#include <grpc/support/log.h>
+#include <grpc/support/time.h>
+
+static gpr_timespec test_deadline() {
+ return gpr_time_add(gpr_now(), gpr_time_from_micros(1000000));
+}
+
+static void must_succeed(void *arg, grpc_endpoint *tcp) {
+ GPR_ASSERT(tcp);
+ grpc_endpoint_shutdown(tcp);
+ grpc_endpoint_destroy(tcp);
+ gpr_event_set(arg, (void *)1);
+}
+
+static void must_fail(void *arg, grpc_endpoint *tcp) {
+ GPR_ASSERT(!tcp);
+ gpr_event_set(arg, (void *)1);
+}
+
+void test_succeeds() {
+ struct sockaddr_in addr;
+ socklen_t addr_len = sizeof(addr);
+ int svr_fd;
+ int r;
+ gpr_event ev;
+
+ gpr_event_init(&ev);
+
+ memset(&addr, 0, sizeof(addr));
+ addr.sin_family = AF_INET;
+
+ /* create a dummy server */
+ svr_fd = socket(AF_INET, SOCK_STREAM, 0);
+ GPR_ASSERT(svr_fd >= 0);
+ GPR_ASSERT(0 == bind(svr_fd, (struct sockaddr *)&addr, addr_len));
+ GPR_ASSERT(0 == listen(svr_fd, 1));
+
+ /* connect to it */
+ GPR_ASSERT(getsockname(svr_fd, (struct sockaddr *)&addr, &addr_len) == 0);
+ grpc_tcp_client_connect(must_succeed, &ev, (struct sockaddr *)&addr, addr_len,
+ gpr_inf_future);
+
+ /* await the connection */
+ do {
+ addr_len = sizeof(addr);
+ r = accept(svr_fd, (struct sockaddr *)&addr, &addr_len);
+ } while (r == -1 && errno == EINTR);
+ GPR_ASSERT(r >= 0);
+ close(r);
+
+ /* wait for the connection callback to finish */
+ GPR_ASSERT(gpr_event_wait(&ev, test_deadline()));
+}
+
+void test_fails() {
+ struct sockaddr_in addr;
+ socklen_t addr_len = sizeof(addr);
+ gpr_event ev;
+
+ gpr_event_init(&ev);
+
+ memset(&addr, 0, sizeof(addr));
+ addr.sin_family = AF_INET;
+
+ /* connect to a broken address */
+ grpc_tcp_client_connect(must_fail, &ev, (struct sockaddr *)&addr, addr_len,
+ gpr_inf_future);
+
+ /* wait for the connection callback to finish */
+ GPR_ASSERT(gpr_event_wait(&ev, test_deadline()));
+}
+
+void test_times_out() {
+ struct sockaddr_in addr;
+ socklen_t addr_len = sizeof(addr);
+ int svr_fd;
+#define NUM_CLIENT_CONNECTS 10
+ int client_fd[NUM_CLIENT_CONNECTS];
+ int i;
+ int r;
+ gpr_event ev;
+ gpr_timespec connect_deadline;
+
+ gpr_event_init(&ev);
+
+ memset(&addr, 0, sizeof(addr));
+ addr.sin_family = AF_INET;
+
+ /* create a dummy server */
+ svr_fd = socket(AF_INET, SOCK_STREAM, 0);
+ GPR_ASSERT(svr_fd >= 0);
+ GPR_ASSERT(0 == bind(svr_fd, (struct sockaddr *)&addr, addr_len));
+ GPR_ASSERT(0 == listen(svr_fd, 1));
+ /* Get its address */
+ GPR_ASSERT(getsockname(svr_fd, (struct sockaddr *)&addr, &addr_len) == 0);
+
+ /* tie up the listen buffer, which is somewhat arbitrarily sized. */
+ for (i = 0; i < NUM_CLIENT_CONNECTS; ++i) {
+ client_fd[i] = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
+ do {
+ r = connect(client_fd[i], (struct sockaddr *)&addr, addr_len);
+ } while (r == -1 && errno == EINTR);
+ GPR_ASSERT(r < 0);
+ GPR_ASSERT(errno == EWOULDBLOCK || errno == EINPROGRESS);
+ }
+
+ /* connect to dummy server address */
+
+ connect_deadline = gpr_time_add(gpr_now(), gpr_time_from_micros(1000000));
+
+ grpc_tcp_client_connect(must_fail, &ev, (struct sockaddr *)&addr, addr_len,
+ connect_deadline);
+ /* Make sure the event doesn't trigger early */
+ GPR_ASSERT(!gpr_event_wait(
+ &ev, gpr_time_add(gpr_now(), gpr_time_from_micros(500000))));
+ /* Now wait until it should have triggered */
+ sleep(1);
+
+ /* wait for the connection callback to finish */
+ GPR_ASSERT(gpr_event_wait(&ev, test_deadline()));
+ close(svr_fd);
+ for (i = 0; i < NUM_CLIENT_CONNECTS; ++i) {
+ close(client_fd[i]);
+ }
+}
+
+int main(void) {
+ grpc_iomgr_init();
+ test_succeeds();
+ test_fails();
+ test_times_out();
+ grpc_iomgr_shutdown();
+ return 0;
+}
diff --git a/test/core/endpoint/tcp_test.c b/test/core/iomgr/tcp_posix_test.c
index c703f92f68..52856b6b8a 100644
--- a/test/core/endpoint/tcp_test.c
+++ b/test/core/iomgr/tcp_posix_test.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/endpoint/tcp.h"
+#include "src/core/iomgr/tcp_posix.h"
#include <errno.h>
#include <fcntl.h>
@@ -41,7 +41,6 @@
#include <sys/socket.h>
#include <unistd.h>
-#include "src/core/eventmanager/em.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
@@ -65,8 +64,6 @@
*/
-grpc_em g_em;
-
static void create_sockets(int sv[2]) {
int flags;
GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
@@ -165,7 +162,6 @@ static void read_cb(void *user_data, gpr_slice *slices, size_t nslices,
/* Write to a socket, then read from it using the grpc_tcp API. */
static void read_test(ssize_t num_bytes, ssize_t slice_size) {
int sv[2];
- grpc_em em;
grpc_endpoint *ep;
struct read_socket_state state;
ssize_t written_bytes;
@@ -176,9 +172,8 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) {
slice_size);
create_sockets(sv);
- grpc_em_init(&em);
- ep = grpc_tcp_create_dbg(sv[1], &em, slice_size);
+ ep = grpc_tcp_create(grpc_fd_create(sv[1]), slice_size);
written_bytes = fill_socket_partial(sv[0], num_bytes);
gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes);
@@ -202,7 +197,6 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) {
grpc_endpoint_destroy(ep);
- grpc_em_destroy(&em);
gpr_mu_destroy(&state.mu);
gpr_cv_destroy(&state.cv);
}
@@ -211,7 +205,6 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) {
API. */
static void large_read_test(ssize_t slice_size) {
int sv[2];
- grpc_em em;
grpc_endpoint *ep;
struct read_socket_state state;
ssize_t written_bytes;
@@ -221,9 +214,8 @@ static void large_read_test(ssize_t slice_size) {
gpr_log(GPR_INFO, "Start large read test, slice size %d", slice_size);
create_sockets(sv);
- grpc_em_init(&em);
- ep = grpc_tcp_create_dbg(sv[1], &em, slice_size);
+ ep = grpc_tcp_create(grpc_fd_create(sv[1]), slice_size);
written_bytes = fill_socket(sv[0]);
gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes);
@@ -247,7 +239,6 @@ static void large_read_test(ssize_t slice_size) {
grpc_endpoint_destroy(ep);
- grpc_em_destroy(&em);
gpr_mu_destroy(&state.mu);
gpr_cv_destroy(&state.cv);
}
@@ -349,7 +340,6 @@ static ssize_t drain_socket(int fd) {
socket in parallel with the read. */
static void write_test(ssize_t num_bytes, ssize_t slice_size) {
int sv[2];
- grpc_em em;
grpc_endpoint *ep;
struct write_socket_state state;
ssize_t read_bytes;
@@ -363,9 +353,8 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) {
slice_size);
create_sockets(sv);
- grpc_em_init(&em);
- ep = grpc_tcp_create(sv[1], &em);
+ ep = grpc_tcp_create(grpc_fd_create(sv[1]), GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
gpr_mu_init(&state.mu);
gpr_cv_init(&state.cv);
@@ -392,7 +381,6 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) {
}
grpc_endpoint_destroy(ep);
- grpc_em_destroy(&em);
gpr_mu_destroy(&state.mu);
gpr_cv_destroy(&state.cv);
gpr_free(slices);
@@ -410,7 +398,6 @@ static void read_done_for_write_error(void *ud, gpr_slice *slices,
socket in parallel with the read. */
static void write_error_test(ssize_t num_bytes, ssize_t slice_size) {
int sv[2];
- grpc_em em;
grpc_endpoint *ep;
struct write_socket_state state;
size_t num_blocks;
@@ -423,9 +410,8 @@ static void write_error_test(ssize_t num_bytes, ssize_t slice_size) {
num_bytes, slice_size);
create_sockets(sv);
- grpc_em_init(&em);
- ep = grpc_tcp_create(sv[1], &em);
+ ep = grpc_tcp_create(grpc_fd_create(sv[1]), GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
close(sv[0]);
gpr_mu_init(&state.mu);
@@ -456,7 +442,6 @@ static void write_error_test(ssize_t num_bytes, ssize_t slice_size) {
}
grpc_endpoint_destroy(ep);
- grpc_em_destroy(&em);
gpr_mu_destroy(&state.mu);
gpr_cv_destroy(&state.cv);
free(slices);
@@ -487,7 +472,7 @@ void run_tests() {
}
}
-static void clean_up() { grpc_em_destroy(&g_em); }
+static void clean_up() {}
static grpc_endpoint_test_fixture create_fixture_tcp_socketpair(
size_t slice_size) {
@@ -495,9 +480,8 @@ static grpc_endpoint_test_fixture create_fixture_tcp_socketpair(
grpc_endpoint_test_fixture f;
create_sockets(sv);
- grpc_em_init(&g_em);
- f.client_ep = grpc_tcp_create_dbg(sv[0], &g_em, slice_size);
- f.server_ep = grpc_tcp_create(sv[1], &g_em);
+ f.client_ep = grpc_tcp_create(grpc_fd_create(sv[0]), slice_size);
+ f.server_ep = grpc_tcp_create(grpc_fd_create(sv[1]), slice_size);
return f;
}
@@ -508,10 +492,12 @@ static grpc_endpoint_test_config configs[] = {
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
+ grpc_iomgr_init();
/* disable SIGPIPE */
signal(SIGPIPE, SIG_IGN);
run_tests();
grpc_endpoint_tests(configs[0]);
+ grpc_iomgr_shutdown();
return 0;
}
diff --git a/test/core/endpoint/tcp_server_test.c b/test/core/iomgr/tcp_server_posix_test.c
index 62089152d0..cb77a88062 100644
--- a/test/core/endpoint/tcp_server_test.c
+++ b/test/core/iomgr/tcp_server_posix_test.c
@@ -31,8 +31,8 @@
*
*/
-#include "src/core/endpoint/tcp_server.h"
-#include "src/core/eventmanager/em.h"
+#include "src/core/iomgr/tcp_server.h"
+#include "src/core/iomgr/iomgr.h"
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
@@ -44,8 +44,6 @@
#define LOG_TEST() gpr_log(GPR_INFO, "%s", __FUNCTION__)
-static grpc_em em;
-
static gpr_mu mu;
static gpr_cv cv;
static int nconnects = 0;
@@ -61,12 +59,12 @@ static void on_connect(void *arg, grpc_endpoint *tcp) {
}
static void test_no_op() {
- grpc_tcp_server *s = grpc_tcp_server_create(&em);
+ grpc_tcp_server *s = grpc_tcp_server_create();
grpc_tcp_server_destroy(s);
}
static void test_no_op_with_start() {
- grpc_tcp_server *s = grpc_tcp_server_create(&em);
+ grpc_tcp_server *s = grpc_tcp_server_create();
LOG_TEST();
grpc_tcp_server_start(s, on_connect, NULL);
grpc_tcp_server_destroy(s);
@@ -74,7 +72,7 @@ static void test_no_op_with_start() {
static void test_no_op_with_port() {
struct sockaddr_in addr;
- grpc_tcp_server *s = grpc_tcp_server_create(&em);
+ grpc_tcp_server *s = grpc_tcp_server_create();
LOG_TEST();
memset(&addr, 0, sizeof(addr));
@@ -87,7 +85,7 @@ static void test_no_op_with_port() {
static void test_no_op_with_port_and_start() {
struct sockaddr_in addr;
- grpc_tcp_server *s = grpc_tcp_server_create(&em);
+ grpc_tcp_server *s = grpc_tcp_server_create();
LOG_TEST();
memset(&addr, 0, sizeof(addr));
@@ -104,7 +102,7 @@ static void test_connect(int n) {
struct sockaddr_storage addr;
socklen_t addr_len = sizeof(addr);
int svrfd, clifd;
- grpc_tcp_server *s = grpc_tcp_server_create(&em);
+ grpc_tcp_server *s = grpc_tcp_server_create();
int nconnects_before;
gpr_timespec deadline;
int i;
@@ -151,7 +149,7 @@ static void test_connect(int n) {
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
- grpc_em_init(&em);
+ grpc_iomgr_init();
gpr_mu_init(&mu);
gpr_cv_init(&cv);
@@ -162,7 +160,7 @@ int main(int argc, char **argv) {
test_connect(1);
test_connect(10);
- grpc_em_destroy(&em);
+ grpc_iomgr_shutdown();
gpr_mu_destroy(&mu);
gpr_cv_destroy(&cv);
return 0;
diff --git a/test/core/network_benchmarks/low_level_ping_pong.c b/test/core/network_benchmarks/low_level_ping_pong.c
index 93c66a9ecb..543fb27ec1 100644
--- a/test/core/network_benchmarks/low_level_ping_pong.c
+++ b/test/core/network_benchmarks/low_level_ping_pong.c
@@ -49,7 +49,7 @@
#endif
#include <sys/socket.h>
-#include "src/core/endpoint/socket_utils.h"
+#include "src/core/iomgr/socket_utils_posix.h"
#include <grpc/support/cmdline.h>
#include <grpc/support/histogram.h>
#include <grpc/support/log.h>
diff --git a/test/core/surface/completion_queue_test.c b/test/core/surface/completion_queue_test.c
index 6df159f697..77decdacea 100644
--- a/test/core/surface/completion_queue_test.c
+++ b/test/core/surface/completion_queue_test.c
@@ -33,12 +33,12 @@
#include "src/core/surface/completion_queue.h"
+#include "src/core/iomgr/iomgr.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
-#include "src/core/surface/surface_em.h"
#include "test/core/util/test_config.h"
#define LOG_TEST() gpr_log(GPR_INFO, "%s", __FUNCTION__)
@@ -417,7 +417,7 @@ static void test_threading(int producers, int consumers) {
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
- grpc_surface_em_init();
+ grpc_iomgr_init();
test_no_op();
test_wait_empty();
test_cq_end_read();
@@ -430,6 +430,6 @@ int main(int argc, char **argv) {
test_threading(1, 10);
test_threading(10, 1);
test_threading(10, 10);
- grpc_surface_em_shutdown();
+ grpc_iomgr_shutdown();
return 0;
}
diff --git a/test/core/transport/chttp2_transport_end2end_test.c b/test/core/transport/chttp2_transport_end2end_test.c
index 4a16789fbf..30d2a17440 100644
--- a/test/core/transport/chttp2_transport_end2end_test.c
+++ b/test/core/transport/chttp2_transport_end2end_test.c
@@ -42,38 +42,23 @@
#include <unistd.h>
#include "test/core/util/test_config.h"
-#include "src/core/eventmanager/em.h"
+#include "src/core/iomgr/iomgr.h"
+#include "src/core/iomgr/endpoint_pair.h"
#include "src/core/transport/chttp2_transport.h"
#include <grpc/support/log.h>
-static grpc_em em;
-
-static void create_sockets(int sv[2]) {
- int flags;
- GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
- flags = fcntl(sv[0], F_GETFL, 0);
- GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
- flags = fcntl(sv[1], F_GETFL, 0);
- GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
-}
-
/* Wrapper to create an http2 transport pair */
static int create_http2_transport_for_test(
grpc_transport_setup_callback client_setup_transport,
void *client_setup_arg,
grpc_transport_setup_callback server_setup_transport,
void *server_setup_arg, size_t slice_size, grpc_mdctx *mdctx) {
- int sv[2];
- grpc_endpoint *svr_ep, *cli_ep;
-
- create_sockets(sv);
- svr_ep = grpc_tcp_create_dbg(sv[1], &em, slice_size);
- cli_ep = grpc_tcp_create_dbg(sv[0], &em, slice_size);
+ grpc_endpoint_pair p = grpc_iomgr_create_endpoint_pair(1);
grpc_create_chttp2_transport(client_setup_transport, client_setup_arg, NULL,
- cli_ep, NULL, 0, mdctx, 1);
+ p.client, NULL, 0, mdctx, 1);
grpc_create_chttp2_transport(server_setup_transport, server_setup_arg, NULL,
- svr_ep, NULL, 0, mdctx, 0);
+ p.server, NULL, 0, mdctx, 0);
return 0;
}
@@ -126,13 +111,13 @@ int main(int argc, char **argv) {
signal(SIGPIPE, SIG_IGN);
grpc_test_init(argc, argv);
- grpc_em_init(&em);
+ grpc_iomgr_init();
for (i = 0; i < sizeof(fixture_configs) / sizeof(*fixture_configs); i++) {
grpc_transport_end2end_tests(&fixture_configs[i]);
}
- grpc_em_destroy(&em);
+ grpc_iomgr_shutdown();
gpr_log(GPR_INFO, "exiting");
return 0;