diff options
Diffstat (limited to 'test/core/network_benchmarks/low_level_ping_pong.c')
-rw-r--r-- | test/core/network_benchmarks/low_level_ping_pong.c | 626 |
1 files changed, 626 insertions, 0 deletions
diff --git a/test/core/network_benchmarks/low_level_ping_pong.c b/test/core/network_benchmarks/low_level_ping_pong.c new file mode 100644 index 0000000000..93c66a9ecb --- /dev/null +++ b/test/core/network_benchmarks/low_level_ping_pong.c @@ -0,0 +1,626 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/* + Basic I/O ping-pong benchmarks. + + The goal here is to establish lower bounds on how fast the stack could get by + measuring the cost of using various I/O strategies to do a basic + request-response loop. + */ + +#include <errno.h> +#include <netinet/ip.h> +#include <poll.h> +#include <stdio.h> +#include <string.h> +#ifdef __linux__ +#include <sys/epoll.h> +#endif +#include <sys/socket.h> + +#include "src/core/endpoint/socket_utils.h" +#include <grpc/support/cmdline.h> +#include <grpc/support/histogram.h> +#include <grpc/support/log.h> +#include <grpc/support/thd.h> +#include <grpc/support/time.h> + +typedef struct fd_pair { + int read_fd; + int write_fd; +} fd_pair; + +typedef struct thread_args { + fd_pair fds; + size_t msg_size; + int (*read_bytes)(struct thread_args *args, char *buf); + int (*write_bytes)(struct thread_args *args, char *buf); + int (*setup)(struct thread_args *args); + int epoll_fd; +} thread_args; + +/* + Read strategies + + There are a number of read strategies, each of which has a blocking and + non-blocking version. + */ + +/* Basic call to read() */ +static int read_bytes(int fd, char *buf, size_t read_size, int spin) { + int bytes_read = 0; + int err; + do { + err = read(fd, buf + bytes_read, read_size - bytes_read); + if (err < 0) { + if (errno == EINTR) { + continue; + } else { + if (errno == EAGAIN && spin == 1) { + continue; + } + gpr_log(GPR_ERROR, "Read failed: %s", strerror(errno)); + return -1; + } + } else { + bytes_read += err; + } + } while (bytes_read < read_size); + return 0; +} + +static int blocking_read_bytes(thread_args *args, char *buf) { + return read_bytes(args->fds.read_fd, buf, args->msg_size, 0); +} + +static int spin_read_bytes(thread_args *args, char *buf) { + return read_bytes(args->fds.read_fd, buf, args->msg_size, 1); +} + +/* Call poll() to monitor a non-blocking fd */ +static int poll_read_bytes(int fd, char *buf, size_t read_size, int spin) { + struct pollfd pfd; + size_t bytes_read = 0; + int err; + + pfd.fd = fd; + pfd.events = POLLIN; + do { + err = poll(&pfd, 1, spin ? 0 : -1); + if (err < 0) { + if (errno == EINTR) { + continue; + } else { + gpr_log(GPR_ERROR, "Poll failed: %s", strerror(errno)); + return -1; + } + } + if (err == 0 && spin) continue; + GPR_ASSERT(err == 1); + GPR_ASSERT(pfd.revents == POLLIN); + do { + err = read(fd, buf + bytes_read, read_size - bytes_read); + } while (err < 0 && errno == EINTR); + if (err < 0 && errno != EAGAIN) { + gpr_log(GPR_ERROR, "Read failed: %s", strerror(errno)); + return -1; + } + bytes_read += err; + } while (bytes_read < read_size); + return 0; +} + +static int poll_read_bytes_blocking(struct thread_args *args, char *buf) { + return poll_read_bytes(args->fds.read_fd, buf, args->msg_size, 0); +} + +static int poll_read_bytes_spin(struct thread_args *args, char *buf) { + return poll_read_bytes(args->fds.read_fd, buf, args->msg_size, 1); +} + +#ifdef __linux__ +/* Call epoll_wait() to monitor a non-blocking fd */ +static int epoll_read_bytes(struct thread_args *args, char *buf, int spin) { + struct epoll_event ev; + size_t bytes_read = 0; + int err; + size_t read_size = args->msg_size; + + do { + err = epoll_wait(args->epoll_fd, &ev, 1, spin ? 0 : -1); + if (err < 0) { + if (errno == EINTR) continue; + gpr_log(GPR_ERROR, "epoll_wait failed: %s", strerror(errno)); + return -1; + } + if (err == 0 && spin) continue; + GPR_ASSERT(err == 1); + GPR_ASSERT(ev.events & EPOLLIN); + GPR_ASSERT(ev.data.fd == args->fds.read_fd); + do { + do { + err = read(args->fds.read_fd, buf + bytes_read, read_size - bytes_read); + } while (err < 0 && errno == EINTR); + if (errno == EAGAIN) break; + bytes_read += err; + /* TODO(klempner): This should really be doing an extra call after we are + done to ensure we see an EAGAIN */ + } while (bytes_read < read_size); + } while (bytes_read < read_size); + GPR_ASSERT(bytes_read == read_size); + return 0; +} + +static int epoll_read_bytes_blocking(struct thread_args *args, char *buf) { + return epoll_read_bytes(args, buf, 0); +} + +static int epoll_read_bytes_spin(struct thread_args *args, char *buf) { + return epoll_read_bytes(args, buf, 1); +} +#endif /* __linux__ */ + +/* Write out bytes. + At this point we only have one strategy, since in the common case these + writes go directly out to the kernel. + */ +static int blocking_write_bytes(struct thread_args *args, char *buf) { + int bytes_written = 0; + int err; + size_t write_size = args->msg_size; + do { + err = write(args->fds.write_fd, buf + bytes_written, + write_size - bytes_written); + if (err < 0) { + if (errno == EINTR) { + continue; + } else { + gpr_log(GPR_ERROR, "Read failed: %s", strerror(errno)); + return -1; + } + } else { + bytes_written += err; + } + } while (bytes_written < write_size); + return 0; +} + +/* + Initialization code + + These are called at the beginning of the client and server thread, depending + on the scenario we're using. + */ +static int set_socket_nonblocking(thread_args *args) { + if (!grpc_set_socket_nonblocking(args->fds.read_fd, 1)) { + gpr_log(GPR_ERROR, "Unable to set socket nonblocking: %s", strerror(errno)); + return -1; + } + if (!grpc_set_socket_nonblocking(args->fds.write_fd, 1)) { + gpr_log(GPR_ERROR, "Unable to set socket nonblocking: %s", strerror(errno)); + return -1; + } + return 0; +} + +static int do_nothing(thread_args *args) { return 0; } + +/* Special case for epoll, where we need to create the fd ahead of time. */ +static int epoll_setup(thread_args *args) { + int epoll_fd; + struct epoll_event ev; + set_socket_nonblocking(args); + epoll_fd = epoll_create(1); + if (epoll_fd < 0) { + gpr_log(GPR_ERROR, "epoll_create: %s", strerror(errno)); + return -1; + } + + args->epoll_fd = epoll_fd; + + ev.events = EPOLLIN | EPOLLET; + ev.data.fd = args->fds.read_fd; + if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, args->fds.read_fd, &ev) < 0) { + gpr_log(GPR_ERROR, "epoll_ctl: %s", strerror(errno)); + } + return 0; +} + +static void server_thread(thread_args *args) { + char *buf = malloc(args->msg_size); + if (args->setup(args) < 0) { + gpr_log(GPR_ERROR, "Setup failed"); + } + for (;;) { + if (args->read_bytes(args, buf) < 0) { + gpr_log(GPR_ERROR, "Server read failed"); + free(buf); + return; + } + if (args->write_bytes(args, buf) < 0) { + gpr_log(GPR_ERROR, "Server write failed"); + free(buf); + return; + } + } +} + +static void server_thread_wrap(void *arg) { + thread_args *args = arg; + server_thread(args); +} + +static void print_histogram(gpr_histogram *histogram) { + /* TODO(klempner): Print more detailed information, such as detailed histogram + buckets */ + gpr_log(GPR_INFO, "latency (50/95/99/99.9): %f/%f/%f/%f", + gpr_histogram_percentile(histogram, 50), + gpr_histogram_percentile(histogram, 95), + gpr_histogram_percentile(histogram, 99), + gpr_histogram_percentile(histogram, 99.9)); +} + +static double now() { + gpr_timespec tv = gpr_now(); + return 1e9 * tv.tv_sec + tv.tv_nsec; +} + +static void client_thread(thread_args *args) { + char *buf = calloc(args->msg_size, sizeof(char)); + gpr_histogram *histogram = gpr_histogram_create(0.01, 60e9); + double start_time; + double end_time; + double interval; + const int kNumIters = 100000; + int i; + + if (args->setup(args) < 0) { + gpr_log(GPR_ERROR, "Setup failed"); + } + for (i = 0; i < kNumIters; ++i) { + start_time = now(); + if (args->write_bytes(args, buf) < 0) { + gpr_log(GPR_ERROR, "Client write failed"); + goto error; + } + if (args->read_bytes(args, buf) < 0) { + gpr_log(GPR_ERROR, "Client read failed"); + goto error; + } + end_time = now(); + if (i > kNumIters / 2) { + interval = end_time - start_time; + gpr_histogram_add(histogram, interval); + } + } + print_histogram(histogram); +error: + free(buf); + gpr_histogram_destroy(histogram); +} + +/* This roughly matches tcp_server's create_listening_socket */ +static int create_listening_socket(struct sockaddr *port, socklen_t len) { + int fd = socket(port->sa_family, SOCK_STREAM, 0); + if (fd < 0) { + gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno)); + goto error; + } + + if (!grpc_set_socket_cloexec(fd, 1) || !grpc_set_socket_low_latency(fd, 1) || + !grpc_set_socket_reuse_addr(fd, 1)) { + gpr_log(GPR_ERROR, "Unable to configure socket %d: %s", fd, + strerror(errno)); + goto error; + } + + if (bind(fd, port, len) < 0) { + gpr_log(GPR_ERROR, "bind: %s", strerror(errno)); + goto error; + } + + if (listen(fd, 1) < 0) { + gpr_log(GPR_ERROR, "listen: %s", strerror(errno)); + goto error; + } + + if (getsockname(fd, port, &len) < 0) { + gpr_log(GPR_ERROR, "getsockname: %s", strerror(errno)); + goto error; + } + + return fd; + +error: + if (fd >= 0) { + close(fd); + } + return -1; +} + +static int connect_client(struct sockaddr *addr, int len) { + int fd = socket(addr->sa_family, SOCK_STREAM, 0); + int err; + if (fd < 0) { + gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno)); + goto error; + } + + if (!grpc_set_socket_cloexec(fd, 1) || !grpc_set_socket_low_latency(fd, 1)) { + gpr_log(GPR_ERROR, "Failed to configure socket"); + goto error; + } + + do { + err = connect(fd, addr, len); + } while (err < 0 && errno == EINTR); + + if (err < 0) { + gpr_log(GPR_ERROR, "connect error: %s", strerror(errno)); + goto error; + } + return fd; + +error: + if (fd >= 0) { + close(fd); + } + return -1; +} + +static int accept_server(int listen_fd) { + int fd = accept(listen_fd, NULL, NULL); + if (fd < 0) { + gpr_log(GPR_ERROR, "Accept failed: %s", strerror(errno)); + return -1; + } + return fd; +} + +static int create_sockets_tcp(fd_pair *client_fds, fd_pair *server_fds) { + int listen_fd = -1; + int client_fd = -1; + int server_fd = -1; + + struct sockaddr_in port; + struct sockaddr *sa_port = (struct sockaddr *)&port; + + port.sin_family = AF_INET; + port.sin_port = 0; + port.sin_addr.s_addr = INADDR_ANY; + + listen_fd = create_listening_socket(sa_port, sizeof(port)); + if (listen_fd == -1) { + gpr_log(GPR_ERROR, "Listen failed"); + goto error; + } + + client_fd = connect_client(sa_port, sizeof(port)); + if (client_fd == -1) { + gpr_log(GPR_ERROR, "Connect failed"); + goto error; + } + + server_fd = accept_server(listen_fd); + if (server_fd == -1) { + gpr_log(GPR_ERROR, "Accept failed"); + goto error; + } + + client_fds->read_fd = client_fd; + client_fds->write_fd = client_fd; + server_fds->read_fd = server_fd; + server_fds->write_fd = server_fd; + close(listen_fd); + return 0; + +error: + if (listen_fd != -1) { + close(listen_fd); + } + if (client_fd != -1) { + close(client_fd); + } + if (server_fd != -1) { + close(server_fd); + } + return -1; +} + +static int create_sockets_socketpair(fd_pair *client_fds, fd_pair *server_fds) { + int fds[2]; + if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) { + gpr_log(GPR_ERROR, "socketpair: %s", strerror(errno)); + return -1; + } + + client_fds->read_fd = fds[0]; + client_fds->write_fd = fds[0]; + server_fds->read_fd = fds[1]; + server_fds->write_fd = fds[1]; + return 0; +} + +static int create_sockets_pipe(fd_pair *client_fds, fd_pair *server_fds) { + int cfds[2]; + int sfds[2]; + if (pipe(cfds) < 0) { + gpr_log(GPR_ERROR, "pipe: %s", strerror(errno)); + return -1; + } + + if (pipe(sfds) < 0) { + gpr_log(GPR_ERROR, "pipe: %s", strerror(errno)); + return -1; + } + + client_fds->read_fd = cfds[0]; + client_fds->write_fd = cfds[1]; + server_fds->read_fd = sfds[0]; + server_fds->write_fd = sfds[1]; + return 0; +} + +static const char *read_strategy_usage = + "Strategy for doing reads, which is one of:\n" + " blocking: blocking read calls\n" + " same_thread_poll: poll() call on same thread \n" +#ifdef __linux__ + " same_thread_epoll: epoll_wait() on same thread \n" +#endif + " spin_read: spinning non-blocking read() calls \n" + " spin_poll: spinning 0 timeout poll() calls \n" +#ifdef __linux__ + " spin_epoll: spinning 0 timeout epoll_wait() calls \n" +#endif + ""; + +static const char *socket_type_usage = + "Type of socket used, one of:\n" + " tcp: fds are endpoints of a TCP connection\n" + " socketpair: fds come from socketpair()\n" + " pipe: fds come from pipe()\n"; + +void print_usage(char *argv0) { + fprintf(stderr, "%s usage:\n\n", argv0); + fprintf(stderr, "%s read_strategy socket_type msg_size\n\n", argv0); + fprintf(stderr, "where read_strategy is one of:\n"); + fprintf(stderr, " blocking: blocking read calls\n"); + fprintf(stderr, " same_thread_poll: poll() call on same thread \n"); +#ifdef __linux__ + fprintf(stderr, " same_thread_epoll: epoll_wait() on same thread \n"); +#endif + fprintf(stderr, " spin_read: spinning non-blocking read() calls \n"); + fprintf(stderr, " spin_poll: spinning 0 timeout poll() calls \n"); +#ifdef __linux__ + fprintf(stderr, " spin_epoll: spinning 0 timeout epoll_wait() calls \n"); +#endif + fprintf(stderr, "and socket_type is one of:\n"); + fprintf(stderr, " tcp: fds are endpoints of a TCP connection\n"); + fprintf(stderr, " socketpair: fds come from socketpair()\n"); + fprintf(stderr, " pipe: fds come from pipe()\n"); +} + +typedef struct test_strategy { + char *name; + int (*read_strategy)(struct thread_args *args, char *buf); + int (*setup)(struct thread_args *args); +} test_strategy; + +static test_strategy test_strategies[] = { + {"blocking", blocking_read_bytes, do_nothing}, + {"same_thread_poll", poll_read_bytes_blocking, set_socket_nonblocking}, +#ifdef __linux__ + {"same_thread_epoll", epoll_read_bytes_blocking, epoll_setup}, + {"spin_epoll", epoll_read_bytes_spin, epoll_setup}, +#endif /* __linux__ */ + {"spin_read", spin_read_bytes, set_socket_nonblocking}, + {"spin_poll", poll_read_bytes_spin, set_socket_nonblocking}}; + +int main(int argc, char **argv) { + gpr_thd_id tid; + thread_args *client_args = malloc(sizeof(thread_args)); + thread_args *server_args = malloc(sizeof(thread_args)); + int msg_size = -1; + char *read_strategy = NULL; + char *socket_type = NULL; + int i; + const test_strategy *test_strategy = NULL; + + gpr_cmdline *cmdline = + gpr_cmdline_create("low_level_ping_pong network benchmarking tool"); + + gpr_cmdline_add_int(cmdline, "msg_size", "Size of sent messages", &msg_size); + gpr_cmdline_add_string(cmdline, "read_strategy", read_strategy_usage, + &read_strategy); + gpr_cmdline_add_string(cmdline, "socket_type", socket_type_usage, + &socket_type); + + gpr_cmdline_parse(cmdline, argc, argv); + + if (read_strategy == NULL) { + read_strategy = "blocking"; + } + if (socket_type == NULL) { + socket_type = "tcp"; + } + if (msg_size == -1) { + msg_size = 50; + } + + for (i = 0; i < sizeof(test_strategies) / sizeof(struct test_strategy); ++i) { + if (!strcmp(test_strategies[i].name, read_strategy)) { + test_strategy = &test_strategies[i]; + } + } + if (test_strategy == NULL) { + fprintf(stderr, "Invalid read strategy %s\n", read_strategy); + return -1; + } + + client_args->read_bytes = test_strategy->read_strategy; + client_args->write_bytes = blocking_write_bytes; + client_args->setup = test_strategy->setup; + server_args->read_bytes = test_strategy->read_strategy; + server_args->write_bytes = blocking_write_bytes; + server_args->setup = test_strategy->setup; + + if (strcmp(socket_type, "tcp") == 0) { + create_sockets_tcp(&client_args->fds, &server_args->fds); + } else if (strcmp(socket_type, "socketpair") == 0) { + create_sockets_socketpair(&client_args->fds, &server_args->fds); + } else if (strcmp(socket_type, "pipe") == 0) { + create_sockets_pipe(&client_args->fds, &server_args->fds); + } else { + fprintf(stderr, "Invalid socket type %s\n", socket_type); + return -1; + } + + if (msg_size <= 0) { + fprintf(stderr, "msg_size must be > 0\n"); + print_usage(argv[0]); + return -1; + } + + server_args->msg_size = msg_size; + client_args->msg_size = msg_size; + + gpr_log(GPR_INFO, "Starting test %s %s %d", read_strategy, socket_type, + msg_size); + + gpr_thd_new(&tid, server_thread_wrap, server_args, NULL); + client_thread(client_args); + gpr_cmdline_destroy(cmdline); + return 0; +} |