diff options
Diffstat (limited to 'test/core/iomgr/fd_posix_test.cc')
-rw-r--r-- | test/core/iomgr/fd_posix_test.cc | 537 |
1 files changed, 537 insertions, 0 deletions
diff --git a/test/core/iomgr/fd_posix_test.cc b/test/core/iomgr/fd_posix_test.cc new file mode 100644 index 0000000000..9bf16923cb --- /dev/null +++ b/test/core/iomgr/fd_posix_test.cc @@ -0,0 +1,537 @@ +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/core/lib/iomgr/port.h" + +// This test won't work except with posix sockets enabled +#ifdef GRPC_POSIX_SOCKET + +#include "src/core/lib/iomgr/ev_posix.h" + +#include <ctype.h> +#include <errno.h> +#include <fcntl.h> +#include <netinet/in.h> +#include <poll.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/time.h> +#include <unistd.h> + +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/sync.h> +#include <grpc/support/time.h> + +#include "src/core/lib/iomgr/ev_posix.h" +#include "src/core/lib/iomgr/iomgr.h" +#include "src/core/lib/iomgr/socket_utils_posix.h" +#include "test/core/util/test_config.h" + +static gpr_mu* g_mu; +static grpc_pollset* g_pollset; + +/* buffer size used to send and receive data. + 1024 is the minimal value to set TCP send and receive buffer. */ +#define BUF_SIZE 1024 + +/* Create a test socket with the right properties for testing. + port is the TCP port to listen or connect to. + Return a socket FD and sockaddr_in. */ +static void create_test_socket(int port, int* socket_fd, + struct sockaddr_in* sin) { + int fd; + int one = 1; + int buffer_size_bytes = BUF_SIZE; + int flags; + + fd = socket(AF_INET, SOCK_STREAM, 0); + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); + /* Reset the size of socket send buffer to the minimal value to facilitate + buffer filling up and triggering notify_on_write */ + GPR_ASSERT(grpc_set_socket_sndbuf(fd, buffer_size_bytes) == GRPC_ERROR_NONE); + GPR_ASSERT(grpc_set_socket_rcvbuf(fd, buffer_size_bytes) == GRPC_ERROR_NONE); + /* Make fd non-blocking */ + flags = fcntl(fd, F_GETFL, 0); + GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0); + *socket_fd = fd; + + /* Use local address for test */ + sin->sin_family = AF_INET; + sin->sin_addr.s_addr = htonl(0x7f000001); + GPR_ASSERT(port >= 0 && port < 65536); + sin->sin_port = htons((uint16_t)port); +} + +/* Dummy gRPC callback */ +void no_op_cb(void* arg, int success) {} + +/* =======An upload server to test notify_on_read=========== + The server simply reads and counts a stream of bytes. */ + +/* An upload server. */ +typedef struct { + grpc_fd* em_fd; /* listening fd */ + ssize_t read_bytes_total; /* total number of received bytes */ + int done; /* set to 1 when a server finishes serving */ + grpc_closure listen_closure; +} server; + +static void server_init(server* sv) { + sv->read_bytes_total = 0; + sv->done = 0; +} + +/* An upload session. + Created when a new upload request arrives in the server. */ +typedef struct { + server* sv; /* not owned by a single session */ + grpc_fd* em_fd; /* fd to read upload bytes */ + char read_buf[BUF_SIZE]; /* buffer to store upload bytes */ + grpc_closure session_read_closure; +} session; + +/* Called when an upload session can be safely shutdown. + Close session FD and start to shutdown listen FD. */ +static void session_shutdown_cb(void* arg, /*session */ + bool success) { + session* se = static_cast<session*>(arg); + server* sv = se->sv; + grpc_fd_orphan(se->em_fd, NULL, NULL, false /* already_closed */, "a"); + gpr_free(se); + /* Start to shutdown listen fd. */ + grpc_fd_shutdown(sv->em_fd, + GRPC_ERROR_CREATE_FROM_STATIC_STRING("session_shutdown_cb")); +} + +/* Called when data become readable in a session. */ +static void session_read_cb(void* arg, /*session */ + grpc_error* error) { + session* se = static_cast<session*>(arg); + int fd = grpc_fd_wrapped_fd(se->em_fd); + + ssize_t read_once = 0; + ssize_t read_total = 0; + + if (error != GRPC_ERROR_NONE) { + session_shutdown_cb(arg, 1); + return; + } + + do { + read_once = read(fd, se->read_buf, BUF_SIZE); + if (read_once > 0) read_total += read_once; + } while (read_once > 0); + se->sv->read_bytes_total += read_total; + + /* read() returns 0 to indicate the TCP connection was closed by the client. + read(fd, read_buf, 0) also returns 0 which should never be called as such. + It is possible to read nothing due to spurious edge event or data has + been drained, In such a case, read() returns -1 and set errno to EAGAIN. */ + if (read_once == 0) { + session_shutdown_cb(arg, 1); + } else if (read_once == -1) { + if (errno == EAGAIN) { + /* An edge triggered event is cached in the kernel until next poll. + In the current single thread implementation, session_read_cb is called + in the polling thread, such that polling only happens after this + callback, and will catch read edge event if data is available again + before notify_on_read. + TODO(chenw): in multi-threaded version, callback and polling can be + run in different threads. polling may catch a persist read edge event + before notify_on_read is called. */ + grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure); + } else { + gpr_log(GPR_ERROR, "Unhandled read error %s", strerror(errno)); + abort(); + } + } +} + +/* Called when the listen FD can be safely shutdown. + Close listen FD and signal that server can be shutdown. */ +static void listen_shutdown_cb(void* arg /*server */, int success) { + server* sv = static_cast<server*>(arg); + + grpc_fd_orphan(sv->em_fd, NULL, NULL, false /* already_closed */, "b"); + + gpr_mu_lock(g_mu); + sv->done = 1; + GPR_ASSERT( + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL))); + gpr_mu_unlock(g_mu); +} + +/* Called when a new TCP connection request arrives in the listening port. */ +static void listen_cb(void* arg, /*=sv_arg*/ + grpc_error* error) { + server* sv = static_cast<server*>(arg); + int fd; + int flags; + session* se; + struct sockaddr_storage ss; + socklen_t slen = sizeof(ss); + grpc_fd* listen_em_fd = sv->em_fd; + + if (error != GRPC_ERROR_NONE) { + listen_shutdown_cb(arg, 1); + return; + } + + fd = accept(grpc_fd_wrapped_fd(listen_em_fd), (struct sockaddr*)&ss, &slen); + GPR_ASSERT(fd >= 0); + GPR_ASSERT(fd < FD_SETSIZE); + flags = fcntl(fd, F_GETFL, 0); + fcntl(fd, F_SETFL, flags | O_NONBLOCK); + se = static_cast<session*>(gpr_malloc(sizeof(*se))); + se->sv = sv; + se->em_fd = grpc_fd_create(fd, "listener"); + grpc_pollset_add_fd(g_pollset, se->em_fd); + GRPC_CLOSURE_INIT(&se->session_read_closure, session_read_cb, se, + grpc_schedule_on_exec_ctx); + grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure); + + grpc_fd_notify_on_read(listen_em_fd, &sv->listen_closure); +} + +/* Max number of connections pending to be accepted by listen(). */ +#define MAX_NUM_FD 1024 + +/* Start a test server, return the TCP listening port bound to listen_fd. + listen_cb() is registered to be interested in reading from listen_fd. + When connection request arrives, listen_cb() is called to accept the + connection request. */ +static int server_start(server* sv) { + int port = 0; + int fd; + struct sockaddr_in sin; + socklen_t addr_len; + + create_test_socket(port, &fd, &sin); + addr_len = sizeof(sin); + GPR_ASSERT(bind(fd, (struct sockaddr*)&sin, addr_len) == 0); + GPR_ASSERT(getsockname(fd, (struct sockaddr*)&sin, &addr_len) == 0); + port = ntohs(sin.sin_port); + GPR_ASSERT(listen(fd, MAX_NUM_FD) == 0); + + sv->em_fd = grpc_fd_create(fd, "server"); + grpc_pollset_add_fd(g_pollset, sv->em_fd); + /* Register to be interested in reading from listen_fd. */ + GRPC_CLOSURE_INIT(&sv->listen_closure, listen_cb, sv, + grpc_schedule_on_exec_ctx); + grpc_fd_notify_on_read(sv->em_fd, &sv->listen_closure); + + return port; +} + +/* Wait and shutdown a sever. */ +static void server_wait_and_shutdown(server* sv) { + gpr_mu_lock(g_mu); + while (!sv->done) { + ExecCtx _local_exec_ctx; + grpc_pollset_worker* worker = NULL; + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE))); + gpr_mu_unlock(g_mu); + grpc_exec_ctx_finish(); + gpr_mu_lock(g_mu); + } + gpr_mu_unlock(g_mu); +} + +/* ===An upload client to test notify_on_write=== */ + +/* Client write buffer size */ +#define CLIENT_WRITE_BUF_SIZE 10 +/* Total number of times that the client fills up the write buffer */ +#define CLIENT_TOTAL_WRITE_CNT 3 + +/* An upload client. */ +typedef struct { + grpc_fd* em_fd; + char write_buf[CLIENT_WRITE_BUF_SIZE]; + ssize_t write_bytes_total; + /* Number of times that the client fills up the write buffer and calls + notify_on_write to schedule another write. */ + int client_write_cnt; + + int done; /* set to 1 when a client finishes sending */ + grpc_closure write_closure; +} client; + +static void client_init(client* cl) { + memset(cl->write_buf, 0, sizeof(cl->write_buf)); + cl->write_bytes_total = 0; + cl->client_write_cnt = 0; + cl->done = 0; +} + +/* Called when a client upload session is ready to shutdown. */ +static void client_session_shutdown_cb(void* arg /*client */, int success) { + client* cl = static_cast<client*>(arg); + grpc_fd_orphan(cl->em_fd, NULL, NULL, false /* already_closed */, "c"); + cl->done = 1; + GPR_ASSERT( + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL))); +} + +/* Write as much as possible, then register notify_on_write. */ +static void client_session_write(void* arg, /*client */ + grpc_error* error) { + client* cl = static_cast<client*>(arg); + int fd = grpc_fd_wrapped_fd(cl->em_fd); + ssize_t write_once = 0; + + if (error != GRPC_ERROR_NONE) { + gpr_mu_lock(g_mu); + client_session_shutdown_cb(arg, 1); + gpr_mu_unlock(g_mu); + return; + } + + do { + write_once = write(fd, cl->write_buf, CLIENT_WRITE_BUF_SIZE); + if (write_once > 0) cl->write_bytes_total += write_once; + } while (write_once > 0); + + if (errno == EAGAIN) { + gpr_mu_lock(g_mu); + if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) { + GRPC_CLOSURE_INIT(&cl->write_closure, client_session_write, cl, + grpc_schedule_on_exec_ctx); + grpc_fd_notify_on_write(cl->em_fd, &cl->write_closure); + cl->client_write_cnt++; + } else { + client_session_shutdown_cb(arg, 1); + } + gpr_mu_unlock(g_mu); + } else { + gpr_log(GPR_ERROR, "unknown errno %s", strerror(errno)); + abort(); + } +} + +/* Start a client to send a stream of bytes. */ +static void client_start(client* cl, int port) { + int fd; + struct sockaddr_in sin; + create_test_socket(port, &fd, &sin); + if (connect(fd, (struct sockaddr*)&sin, sizeof(sin)) == -1) { + if (errno == EINPROGRESS) { + struct pollfd pfd; + pfd.fd = fd; + pfd.events = POLLOUT; + pfd.revents = 0; + if (poll(&pfd, 1, -1) == -1) { + gpr_log(GPR_ERROR, "poll() failed during connect; errno=%d", errno); + abort(); + } + } else { + gpr_log(GPR_ERROR, "Failed to connect to the server (errno=%d)", errno); + abort(); + } + } + + cl->em_fd = grpc_fd_create(fd, "client"); + grpc_pollset_add_fd(g_pollset, cl->em_fd); + + client_session_write(cl, GRPC_ERROR_NONE); +} + +/* Wait for the signal to shutdown a client. */ +static void client_wait_and_shutdown(client* cl) { + gpr_mu_lock(g_mu); + while (!cl->done) { + grpc_pollset_worker* worker = NULL; + ExecCtx _local_exec_ctx; + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE))); + gpr_mu_unlock(g_mu); + grpc_exec_ctx_finish(); + gpr_mu_lock(g_mu); + } + gpr_mu_unlock(g_mu); +} + +/* Test grpc_fd. Start an upload server and client, upload a stream of + bytes from the client to the server, and verify that the total number of + sent bytes is equal to the total number of received bytes. */ +static void test_grpc_fd(void) { + server sv; + client cl; + int port; + ExecCtx _local_exec_ctx; + + server_init(&sv); + port = server_start(&sv); + client_init(&cl); + client_start(&cl, port); + grpc_exec_ctx_finish(); + client_wait_and_shutdown(&cl); + server_wait_and_shutdown(&sv); + GPR_ASSERT(sv.read_bytes_total == cl.write_bytes_total); + gpr_log(GPR_INFO, "Total read bytes %" PRIdPTR, sv.read_bytes_total); +} + +typedef struct fd_change_data { + grpc_iomgr_cb_func cb_that_ran; +} fd_change_data; + +void init_change_data(fd_change_data* fdc) { fdc->cb_that_ran = NULL; } + +void destroy_change_data(fd_change_data* fdc) {} + +static void first_read_callback(void* arg /* fd_change_data */, + grpc_error* error) { + fd_change_data* fdc = static_cast<fd_change_data*>(arg); + + gpr_mu_lock(g_mu); + fdc->cb_that_ran = first_read_callback; + GPR_ASSERT( + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL))); + gpr_mu_unlock(g_mu); +} + +static void second_read_callback(void* arg /* fd_change_data */, + grpc_error* error) { + fd_change_data* fdc = static_cast<fd_change_data*>(arg); + + gpr_mu_lock(g_mu); + fdc->cb_that_ran = second_read_callback; + GPR_ASSERT( + GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL))); + gpr_mu_unlock(g_mu); +} + +/* Test that changing the callback we use for notify_on_read actually works. + Note that we have two different but almost identical callbacks above -- the + point is to have two different function pointers and two different data + pointers and make sure that changing both really works. */ +static void test_grpc_fd_change(void) { + grpc_fd* em_fd; + fd_change_data a, b; + int flags; + int sv[2]; + char data; + ssize_t result; + grpc_closure first_closure; + grpc_closure second_closure; + ExecCtx _local_exec_ctx; + + GRPC_CLOSURE_INIT(&first_closure, first_read_callback, &a, + grpc_schedule_on_exec_ctx); + GRPC_CLOSURE_INIT(&second_closure, second_read_callback, &b, + grpc_schedule_on_exec_ctx); + + init_change_data(&a); + init_change_data(&b); + + GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0); + flags = fcntl(sv[0], F_GETFL, 0); + GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0); + flags = fcntl(sv[1], F_GETFL, 0); + GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); + + em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change"); + grpc_pollset_add_fd(g_pollset, em_fd); + + /* Register the first callback, then make its FD readable */ + grpc_fd_notify_on_read(em_fd, &first_closure); + data = 0; + result = write(sv[1], &data, 1); + GPR_ASSERT(result == 1); + + /* And now wait for it to run. */ + gpr_mu_lock(g_mu); + while (a.cb_that_ran == NULL) { + grpc_pollset_worker* worker = NULL; + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE))); + gpr_mu_unlock(g_mu); + grpc_exec_ctx_finish(); + gpr_mu_lock(g_mu); + } + GPR_ASSERT(a.cb_that_ran == first_read_callback); + gpr_mu_unlock(g_mu); + + /* And drain the socket so we can generate a new read edge */ + result = read(sv[0], &data, 1); + GPR_ASSERT(result == 1); + + /* Now register a second callback with distinct change data, and do the same + thing again. */ + grpc_fd_notify_on_read(em_fd, &second_closure); + data = 0; + result = write(sv[1], &data, 1); + GPR_ASSERT(result == 1); + + gpr_mu_lock(g_mu); + while (b.cb_that_ran == NULL) { + grpc_pollset_worker* worker = NULL; + GPR_ASSERT(GRPC_LOG_IF_ERROR( + "pollset_work", + grpc_pollset_work(g_pollset, &worker, GRPC_MILLIS_INF_FUTURE))); + gpr_mu_unlock(g_mu); + grpc_exec_ctx_finish(); + gpr_mu_lock(g_mu); + } + /* Except now we verify that second_read_callback ran instead */ + GPR_ASSERT(b.cb_that_ran == second_read_callback); + gpr_mu_unlock(g_mu); + + grpc_fd_orphan(em_fd, NULL, NULL, false /* already_closed */, "d"); + grpc_exec_ctx_finish(); + destroy_change_data(&a); + destroy_change_data(&b); + close(sv[1]); +} + +static void destroy_pollset(void* p, grpc_error* error) { + grpc_pollset_destroy(static_cast<grpc_pollset*>(p)); +} + +int main(int argc, char** argv) { + grpc_closure destroyed; + ExecCtx _local_exec_ctx; + grpc_test_init(argc, argv); + grpc_init(); + g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size())); + grpc_pollset_init(g_pollset, &g_mu); + test_grpc_fd(); + test_grpc_fd_change(); + GRPC_CLOSURE_INIT(&destroyed, destroy_pollset, g_pollset, + grpc_schedule_on_exec_ctx); + grpc_pollset_shutdown(g_pollset, &destroyed); + grpc_exec_ctx_flush(); + gpr_free(g_pollset); + grpc_exec_ctx_finish(); + grpc_shutdown(); + return 0; +} + +#else /* GRPC_POSIX_SOCKET */ + +int main(int argc, char** argv) { return 1; } + +#endif /* GRPC_POSIX_SOCKET */ |