/*
 *
 * Copyright 2015, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */

/* Basic I/O ping-pong benchmarks.

   The goal here is to establish lower bounds on how fast the stack could get
   by measuring the cost of using various I/O strategies to do a basic
   request-response loop. */

#include <errno.h>
#include <netinet/in.h>
#include <poll.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#ifdef __linux__
#include <sys/epoll.h>
#endif
#include <sys/socket.h>

#include <grpc/support/cmdline.h>
#include <grpc/support/histogram.h>
#include <grpc/support/log.h>
#include <grpc/support/thd.h>
#include <grpc/support/time.h>
#include <grpc/support/useful.h>
#include "src/core/lib/iomgr/socket_utils_posix.h"

typedef struct fd_pair {
  int read_fd;
  int write_fd;
} fd_pair;

typedef struct thread_args {
  fd_pair fds;
  size_t msg_size;
  int (*read_bytes)(struct thread_args *args, char *buf);
  int (*write_bytes)(struct thread_args *args, char *buf);
  int (*setup)(struct thread_args *args);
  int epoll_fd;
  char *strategy_name;
} thread_args;

/* Read strategies

   There are a number of read strategies, each of which has a blocking and
   non-blocking version. */

/* Basic call to read() */
static int read_bytes(int fd, char *buf, size_t read_size, int spin) {
  size_t bytes_read = 0;
  ssize_t err;
  do {
    err = read(fd, buf + bytes_read, read_size - bytes_read);
    if (err < 0) {
      if (errno == EINTR) {
        continue;
      } else {
        if (errno == EAGAIN && spin == 1) {
          continue;
        }
        gpr_log(GPR_ERROR, "Read failed: %s", strerror(errno));
        return -1;
      }
    } else {
      bytes_read += (size_t)err;
    }
  } while (bytes_read < read_size);
  return 0;
}

static int blocking_read_bytes(thread_args *args, char *buf) {
  return read_bytes(args->fds.read_fd, buf, args->msg_size, 0);
}

static int spin_read_bytes(thread_args *args, char *buf) {
  return read_bytes(args->fds.read_fd, buf, args->msg_size, 1);
}
/* Call poll() to monitor a non-blocking fd */
static int poll_read_bytes(int fd, char *buf, size_t read_size, int spin) {
  struct pollfd pfd;
  size_t bytes_read = 0;
  int err;
  ssize_t err2;

  pfd.fd = fd;
  pfd.events = POLLIN;
  do {
    err = poll(&pfd, 1, spin ? 0 : -1);
    if (err < 0) {
      if (errno == EINTR) {
        continue;
      } else {
        gpr_log(GPR_ERROR, "Poll failed: %s", strerror(errno));
        return -1;
      }
    }
    if (err == 0 && spin) continue;
    GPR_ASSERT(err == 1);
    GPR_ASSERT(pfd.revents == POLLIN);

    do {
      err2 = read(fd, buf + bytes_read, read_size - bytes_read);
    } while (err2 < 0 && errno == EINTR);
    if (err2 < 0) {
      if (errno == EAGAIN) {
        /* Spurious readiness on a non-blocking fd; poll again rather than
           treating -1 as a byte count. */
        continue;
      }
      gpr_log(GPR_ERROR, "Read failed: %s", strerror(errno));
      return -1;
    }
    bytes_read += (size_t)err2;
  } while (bytes_read < read_size);
  return 0;
}

static int poll_read_bytes_blocking(struct thread_args *args, char *buf) {
  return poll_read_bytes(args->fds.read_fd, buf, args->msg_size, 0);
}

static int poll_read_bytes_spin(struct thread_args *args, char *buf) {
  return poll_read_bytes(args->fds.read_fd, buf, args->msg_size, 1);
}

#ifdef __linux__
/* Call epoll_wait() to monitor a non-blocking fd */
static int epoll_read_bytes(struct thread_args *args, char *buf, int spin) {
  struct epoll_event ev;
  size_t bytes_read = 0;
  int err;
  ssize_t err2;
  size_t read_size = args->msg_size;

  do {
    err = epoll_wait(args->epoll_fd, &ev, 1, spin ? 0 : -1);
    if (err < 0) {
      if (errno == EINTR) continue;
      gpr_log(GPR_ERROR, "epoll_wait failed: %s", strerror(errno));
      return -1;
    }
    if (err == 0 && spin) continue;
    GPR_ASSERT(err == 1);
    GPR_ASSERT(ev.events & EPOLLIN);
    GPR_ASSERT(ev.data.fd == args->fds.read_fd);
    do {
      do {
        err2 =
            read(args->fds.read_fd, buf + bytes_read, read_size - bytes_read);
      } while (err2 < 0 && errno == EINTR);
      if (err2 < 0) {
        /* Only consult errno when read() actually failed. */
        if (errno == EAGAIN) break;
        gpr_log(GPR_ERROR, "Read failed: %s", strerror(errno));
        return -1;
      }
      bytes_read += (size_t)err2;
      /* TODO(klempner): This should really be doing an extra call after we
         are done to ensure we see an EAGAIN */
    } while (bytes_read < read_size);
  } while (bytes_read < read_size);
  GPR_ASSERT(bytes_read == read_size);
  return 0;
}

static int epoll_read_bytes_blocking(struct thread_args *args, char *buf) {
  return epoll_read_bytes(args, buf, 0);
}

static int epoll_read_bytes_spin(struct thread_args *args, char *buf) {
  return epoll_read_bytes(args, buf, 1);
}
#endif /* __linux__ */

/* Write out bytes.

   At this point we only have one strategy, since in the common case these
   writes go directly out to the kernel. */
static int blocking_write_bytes(struct thread_args *args, char *buf) {
  size_t bytes_written = 0;
  ssize_t err;
  size_t write_size = args->msg_size;
  do {
    err = write(args->fds.write_fd, buf + bytes_written,
                write_size - bytes_written);
    if (err < 0) {
      if (errno == EINTR) {
        continue;
      } else {
        gpr_log(GPR_ERROR, "Write failed: %s", strerror(errno));
        return -1;
      }
    } else {
      bytes_written += (size_t)err;
    }
  } while (bytes_written < write_size);
  return 0;
}

/* Initialization code

   These are called at the beginning of the client and server thread, depending
   on the scenario we're using. */
static int set_socket_nonblocking(thread_args *args) {
  if (!grpc_set_socket_nonblocking(args->fds.read_fd, 1)) {
    gpr_log(GPR_ERROR, "Unable to set socket nonblocking: %s",
            strerror(errno));
    return -1;
  }
  if (!grpc_set_socket_nonblocking(args->fds.write_fd, 1)) {
    gpr_log(GPR_ERROR, "Unable to set socket nonblocking: %s",
            strerror(errno));
    return -1;
  }
  return 0;
}

static int do_nothing(thread_args *args) { return 0; }
#ifdef __linux__
/* Special case for epoll, where we need to create the fd ahead of time. */
static int epoll_setup(thread_args *args) {
  int epoll_fd;
  struct epoll_event ev;
  if (set_socket_nonblocking(args) < 0) {
    return -1;
  }
  epoll_fd = epoll_create(1);
  if (epoll_fd < 0) {
    gpr_log(GPR_ERROR, "epoll_create: %s", strerror(errno));
    return -1;
  }
  args->epoll_fd = epoll_fd;

  ev.events = EPOLLIN | EPOLLET;
  ev.data.fd = args->fds.read_fd;
  if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, args->fds.read_fd, &ev) < 0) {
    gpr_log(GPR_ERROR, "epoll_ctl: %s", strerror(errno));
    return -1;
  }
  return 0;
}
#endif

static void server_thread(thread_args *args) {
  char *buf = malloc(args->msg_size);
  if (args->setup(args) < 0) {
    gpr_log(GPR_ERROR, "Setup failed");
  }
  for (;;) {
    if (args->read_bytes(args, buf) < 0) {
      gpr_log(GPR_ERROR, "Server read failed");
      free(buf);
      return;
    }
    if (args->write_bytes(args, buf) < 0) {
      gpr_log(GPR_ERROR, "Server write failed");
      free(buf);
      return;
    }
  }
}

static void server_thread_wrap(void *arg) {
  thread_args *args = arg;
  server_thread(args);
}

static void print_histogram(gpr_histogram *histogram) {
  /* TODO(klempner): Print more detailed information, such as detailed
     histogram buckets */
  gpr_log(GPR_INFO, "latency (50/95/99/99.9): %f/%f/%f/%f",
          gpr_histogram_percentile(histogram, 50),
          gpr_histogram_percentile(histogram, 95),
          gpr_histogram_percentile(histogram, 99),
          gpr_histogram_percentile(histogram, 99.9));
}

/* Current REALTIME clock reading in nanoseconds, so the histogram records
   round-trip latencies in ns. */
static double now(void) {
  gpr_timespec tv = gpr_now(GPR_CLOCK_REALTIME);
  return 1e9 * (double)tv.tv_sec + (double)tv.tv_nsec;
}

static void client_thread(thread_args *args) {
  char *buf = calloc(args->msg_size, sizeof(char));
  gpr_histogram *histogram = gpr_histogram_create(0.01, 60e9);
  double start_time;
  double end_time;
  double interval;
  const int kNumIters = 100000;
  int i;

  if (args->setup(args) < 0) {
    gpr_log(GPR_ERROR, "Setup failed");
  }
  for (i = 0; i < kNumIters; ++i) {
    start_time = now();
    if (args->write_bytes(args, buf) < 0) {
      gpr_log(GPR_ERROR, "Client write failed");
      goto error;
    }
    if (args->read_bytes(args, buf) < 0) {
      gpr_log(GPR_ERROR, "Client read failed");
      goto error;
    }
    end_time = now();
    /* Only the second half of the iterations is recorded; the first half
       serves as warmup. */
    if (i > kNumIters / 2) {
      interval = end_time - start_time;
      gpr_histogram_add(histogram, interval);
    }
  }
  print_histogram(histogram);
error:
  free(buf);
  gpr_histogram_destroy(histogram);
}

/* This roughly matches tcp_server's create_listening_socket */
static int create_listening_socket(struct sockaddr *port, socklen_t len) {
  int fd = socket(port->sa_family, SOCK_STREAM, 0);
  if (fd < 0) {
    gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno));
    goto error;
  }

  if (!grpc_set_socket_cloexec(fd, 1) || !grpc_set_socket_low_latency(fd, 1) ||
      !grpc_set_socket_reuse_addr(fd, 1)) {
    gpr_log(GPR_ERROR, "Unable to configure socket %d: %s", fd,
            strerror(errno));
    goto error;
  }

  if (bind(fd, port, len) < 0) {
    gpr_log(GPR_ERROR, "bind: %s", strerror(errno));
    goto error;
  }

  if (listen(fd, 1) < 0) {
    gpr_log(GPR_ERROR, "listen: %s", strerror(errno));
    goto error;
  }

  if (getsockname(fd, port, &len) < 0) {
    gpr_log(GPR_ERROR, "getsockname: %s", strerror(errno));
    goto error;
  }

  return fd;

error:
  if (fd >= 0) {
    close(fd);
  }
  return -1;
}

static int connect_client(struct sockaddr *addr, socklen_t len) {
  int fd = socket(addr->sa_family, SOCK_STREAM, 0);
  int err;
  if (fd < 0) {
    gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno));
    goto error;
  }

  if (!grpc_set_socket_cloexec(fd, 1) || !grpc_set_socket_low_latency(fd, 1)) {
    gpr_log(GPR_ERROR, "Failed to configure socket");
    goto error;
  }

  do {
    err = connect(fd, addr, len);
  } while (err < 0 && errno == EINTR);

  if (err < 0) {
    gpr_log(GPR_ERROR, "connect error: %s", strerror(errno));
    goto error;
  }
  return fd;

error:
  if (fd >= 0) {
    close(fd);
  }
  return -1;
}
static int accept_server(int listen_fd) {
  int fd = accept(listen_fd, NULL, NULL);
  if (fd < 0) {
    gpr_log(GPR_ERROR, "Accept failed: %s", strerror(errno));
    return -1;
  }
  return fd;
}

static int create_sockets_tcp(fd_pair *client_fds, fd_pair *server_fds) {
  int listen_fd = -1;
  int client_fd = -1;
  int server_fd = -1;

  struct sockaddr_in port;
  struct sockaddr *sa_port = (struct sockaddr *)&port;

  port.sin_family = AF_INET;
  port.sin_port = 0;
  port.sin_addr.s_addr = INADDR_ANY;

  listen_fd = create_listening_socket(sa_port, sizeof(port));
  if (listen_fd == -1) {
    gpr_log(GPR_ERROR, "Listen failed");
    goto error;
  }

  client_fd = connect_client(sa_port, sizeof(port));
  if (client_fd == -1) {
    gpr_log(GPR_ERROR, "Connect failed");
    goto error;
  }

  server_fd = accept_server(listen_fd);
  if (server_fd == -1) {
    gpr_log(GPR_ERROR, "Accept failed");
    goto error;
  }

  client_fds->read_fd = client_fd;
  client_fds->write_fd = client_fd;
  server_fds->read_fd = server_fd;
  server_fds->write_fd = server_fd;
  close(listen_fd);
  return 0;

error:
  if (listen_fd != -1) {
    close(listen_fd);
  }
  if (client_fd != -1) {
    close(client_fd);
  }
  if (server_fd != -1) {
    close(server_fd);
  }
  return -1;
}

static int create_sockets_socketpair(fd_pair *client_fds, fd_pair *server_fds) {
  int fds[2];
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) {
    gpr_log(GPR_ERROR, "socketpair: %s", strerror(errno));
    return -1;
  }

  client_fds->read_fd = fds[0];
  client_fds->write_fd = fds[0];
  server_fds->read_fd = fds[1];
  server_fds->write_fd = fds[1];
  return 0;
}

static int create_sockets_pipe(fd_pair *client_fds, fd_pair *server_fds) {
  int cfds[2];
  int sfds[2];
  if (pipe(cfds) < 0) {
    gpr_log(GPR_ERROR, "pipe: %s", strerror(errno));
    return -1;
  }

  if (pipe(sfds) < 0) {
    gpr_log(GPR_ERROR, "pipe: %s", strerror(errno));
    return -1;
  }

  client_fds->read_fd = cfds[0];
  client_fds->write_fd = cfds[1];
  server_fds->read_fd = sfds[0];
  server_fds->write_fd = sfds[1];
  return 0;
}

static const char *read_strategy_usage =
    "Strategy for doing reads, which is one of:\n"
    "  blocking: blocking read calls\n"
    "  same_thread_poll: poll() call on same thread \n"
#ifdef __linux__
    "  same_thread_epoll: epoll_wait() on same thread \n"
#endif
    "  spin_read: spinning non-blocking read() calls \n"
    "  spin_poll: spinning 0 timeout poll() calls \n"
#ifdef __linux__
    "  spin_epoll: spinning 0 timeout epoll_wait() calls \n"
#endif
    "";

static const char *socket_type_usage =
    "Type of socket used, one of:\n"
    "  tcp: fds are endpoints of a TCP connection\n"
    "  socketpair: fds come from socketpair()\n"
    "  pipe: fds come from pipe()\n";

void print_usage(char *argv0) {
  fprintf(stderr, "%s usage:\n\n", argv0);
  fprintf(stderr, "%s read_strategy socket_type msg_size\n\n", argv0);
  fprintf(stderr, "where read_strategy is one of:\n");
  fprintf(stderr, "  blocking: blocking read calls\n");
  fprintf(stderr, "  same_thread_poll: poll() call on same thread \n");
#ifdef __linux__
  fprintf(stderr, "  same_thread_epoll: epoll_wait() on same thread \n");
#endif
  fprintf(stderr, "  spin_read: spinning non-blocking read() calls \n");
  fprintf(stderr, "  spin_poll: spinning 0 timeout poll() calls \n");
#ifdef __linux__
  fprintf(stderr, "  spin_epoll: spinning 0 timeout epoll_wait() calls \n");
#endif
  fprintf(stderr, "and socket_type is one of:\n");
  fprintf(stderr, "  tcp: fds are endpoints of a TCP connection\n");
  fprintf(stderr, "  socketpair: fds come from socketpair()\n");
  fprintf(stderr, "  pipe: fds come from pipe()\n");
}
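/* Example invocation (illustrative only; the binary name is a placeholder,
   but the flag names correspond to the gpr_cmdline options registered in
   main() below):

     ./low_level_ping_pong --read_strategy=same_thread_poll \
                           --socket_type=tcp --msg_size=50

   Running with no read_strategy argument executes every strategy against
   every socket type with the default 50-byte message. */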
typedef struct test_strategy {
  char *name;
  int (*read_strategy)(struct thread_args *args, char *buf);
  int (*setup)(struct thread_args *args);
} test_strategy;

static test_strategy test_strategies[] = {
    {"blocking", blocking_read_bytes, do_nothing},
    {"same_thread_poll", poll_read_bytes_blocking, set_socket_nonblocking},
#ifdef __linux__
    {"same_thread_epoll", epoll_read_bytes_blocking, epoll_setup},
    {"spin_epoll", epoll_read_bytes_spin, epoll_setup},
#endif /* __linux__ */
    {"spin_read", spin_read_bytes, set_socket_nonblocking},
    {"spin_poll", poll_read_bytes_spin, set_socket_nonblocking}};

static char *socket_types[] = {"tcp", "socketpair", "pipe"};

int create_socket(char *socket_type, fd_pair *client_fds, fd_pair *server_fds) {
  if (strcmp(socket_type, "tcp") == 0) {
    return create_sockets_tcp(client_fds, server_fds);
  } else if (strcmp(socket_type, "socketpair") == 0) {
    return create_sockets_socketpair(client_fds, server_fds);
  } else if (strcmp(socket_type, "pipe") == 0) {
    return create_sockets_pipe(client_fds, server_fds);
  } else {
    fprintf(stderr, "Invalid socket type %s\n", socket_type);
    return -1;
  }
}

static int run_benchmark(char *socket_type, thread_args *client_args,
                         thread_args *server_args) {
  gpr_thd_id tid;
  int rv = 0;

  rv = create_socket(socket_type, &client_args->fds, &server_args->fds);
  if (rv < 0) {
    return rv;
  }

  gpr_log(GPR_INFO, "Starting test %s %s %d", client_args->strategy_name,
          socket_type, (int)client_args->msg_size);

  gpr_thd_new(&tid, server_thread_wrap, server_args, NULL);
  client_thread(client_args);
  return 0;
}

static int run_all_benchmarks(size_t msg_size) {
  int error = 0;
  size_t i;
  for (i = 0; i < GPR_ARRAY_SIZE(test_strategies); ++i) {
    test_strategy *strategy = &test_strategies[i];
    size_t j;
    for (j = 0; j < GPR_ARRAY_SIZE(socket_types); ++j) {
      thread_args *client_args = malloc(sizeof(thread_args));
      thread_args *server_args = malloc(sizeof(thread_args));
      char *socket_type = socket_types[j];

      client_args->read_bytes = strategy->read_strategy;
      client_args->write_bytes = blocking_write_bytes;
      client_args->setup = strategy->setup;
      client_args->msg_size = msg_size;
      client_args->strategy_name = strategy->name;
      server_args->read_bytes = strategy->read_strategy;
      server_args->write_bytes = blocking_write_bytes;
      server_args->setup = strategy->setup;
      server_args->msg_size = msg_size;
      server_args->strategy_name = strategy->name;
      error = run_benchmark(socket_type, client_args, server_args);
      if (error < 0) {
        return error;
      }
    }
  }
  return error;
}

int main(int argc, char **argv) {
  thread_args *client_args = malloc(sizeof(thread_args));
  thread_args *server_args = malloc(sizeof(thread_args));
  int msg_size = -1;
  char *read_strategy = NULL;
  char *socket_type = NULL;
  size_t i;
  const test_strategy *strategy = NULL;
  int error = 0;

  gpr_cmdline *cmdline =
      gpr_cmdline_create("low_level_ping_pong network benchmarking tool");

  gpr_cmdline_add_int(cmdline, "msg_size", "Size of sent messages", &msg_size);
  gpr_cmdline_add_string(cmdline, "read_strategy", read_strategy_usage,
                         &read_strategy);
  gpr_cmdline_add_string(cmdline, "socket_type", socket_type_usage,
                         &socket_type);

  gpr_cmdline_parse(cmdline, argc, argv);

  if (msg_size == -1) {
    msg_size = 50;
  }

  if (read_strategy == NULL) {
    gpr_log(GPR_INFO, "No strategy specified, running all benchmarks");
    return run_all_benchmarks((size_t)msg_size);
  }

  if (socket_type == NULL) {
    socket_type = "tcp";
  }
  if (msg_size <= 0) {
    fprintf(stderr, "msg_size must be > 0\n");
    print_usage(argv[0]);
    return -1;
  }
  for (i = 0; i < GPR_ARRAY_SIZE(test_strategies); ++i) {
    if (strcmp(test_strategies[i].name, read_strategy) == 0) {
      strategy = &test_strategies[i];
    }
  }
  if (strategy == NULL) {
    fprintf(stderr, "Invalid read strategy %s\n", read_strategy);
    return -1;
  }

  client_args->read_bytes = strategy->read_strategy;
  client_args->write_bytes = blocking_write_bytes;
  client_args->setup = strategy->setup;
  client_args->msg_size = (size_t)msg_size;
  client_args->strategy_name = read_strategy;
  server_args->read_bytes = strategy->read_strategy;
  server_args->write_bytes = blocking_write_bytes;
  server_args->setup = strategy->setup;
  server_args->msg_size = (size_t)msg_size;
  server_args->strategy_name = read_strategy;

  error = run_benchmark(socket_type, client_args, server_args);

  gpr_cmdline_destroy(cmdline);
  return error;
}