aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/core/network_benchmarks/low_level_ping_pong.c
diff options
context:
space:
mode:
Diffstat (limited to 'test/core/network_benchmarks/low_level_ping_pong.c')
-rw-r--r--test/core/network_benchmarks/low_level_ping_pong.c626
1 files changed, 626 insertions, 0 deletions
diff --git a/test/core/network_benchmarks/low_level_ping_pong.c b/test/core/network_benchmarks/low_level_ping_pong.c
new file mode 100644
index 0000000000..93c66a9ecb
--- /dev/null
+++ b/test/core/network_benchmarks/low_level_ping_pong.c
@@ -0,0 +1,626 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/*
+ Basic I/O ping-pong benchmarks.
+
+ The goal here is to establish lower bounds on how fast the stack could get by
+ measuring the cost of using various I/O strategies to do a basic
+ request-response loop.
+ */
+
+#include <errno.h>
+#include <netinet/ip.h>
+#include <poll.h>
+#include <stdio.h>
+#include <string.h>
+#ifdef __linux__
+#include <sys/epoll.h>
+#endif
+#include <sys/socket.h>
+
+#include "src/core/endpoint/socket_utils.h"
+#include <grpc/support/cmdline.h>
+#include <grpc/support/histogram.h>
+#include <grpc/support/log.h>
+#include <grpc/support/thd.h>
+#include <grpc/support/time.h>
+
+typedef struct fd_pair {
+ int read_fd;
+ int write_fd;
+} fd_pair;
+
+typedef struct thread_args {
+ fd_pair fds;
+ size_t msg_size;
+ int (*read_bytes)(struct thread_args *args, char *buf);
+ int (*write_bytes)(struct thread_args *args, char *buf);
+ int (*setup)(struct thread_args *args);
+ int epoll_fd;
+} thread_args;
+
+/*
+ Read strategies
+
+ There are a number of read strategies, each of which has a blocking and
+ non-blocking version.
+ */
+
+/* Basic call to read() */
+static int read_bytes(int fd, char *buf, size_t read_size, int spin) {
+ int bytes_read = 0;
+ int err;
+ do {
+ err = read(fd, buf + bytes_read, read_size - bytes_read);
+ if (err < 0) {
+ if (errno == EINTR) {
+ continue;
+ } else {
+ if (errno == EAGAIN && spin == 1) {
+ continue;
+ }
+ gpr_log(GPR_ERROR, "Read failed: %s", strerror(errno));
+ return -1;
+ }
+ } else {
+ bytes_read += err;
+ }
+ } while (bytes_read < read_size);
+ return 0;
+}
+
+static int blocking_read_bytes(thread_args *args, char *buf) {
+ return read_bytes(args->fds.read_fd, buf, args->msg_size, 0);
+}
+
+static int spin_read_bytes(thread_args *args, char *buf) {
+ return read_bytes(args->fds.read_fd, buf, args->msg_size, 1);
+}
+
+/* Call poll() to monitor a non-blocking fd */
+static int poll_read_bytes(int fd, char *buf, size_t read_size, int spin) {
+ struct pollfd pfd;
+ size_t bytes_read = 0;
+ int err;
+
+ pfd.fd = fd;
+ pfd.events = POLLIN;
+ do {
+ err = poll(&pfd, 1, spin ? 0 : -1);
+ if (err < 0) {
+ if (errno == EINTR) {
+ continue;
+ } else {
+ gpr_log(GPR_ERROR, "Poll failed: %s", strerror(errno));
+ return -1;
+ }
+ }
+ if (err == 0 && spin) continue;
+ GPR_ASSERT(err == 1);
+ GPR_ASSERT(pfd.revents == POLLIN);
+ do {
+ err = read(fd, buf + bytes_read, read_size - bytes_read);
+ } while (err < 0 && errno == EINTR);
+ if (err < 0 && errno != EAGAIN) {
+ gpr_log(GPR_ERROR, "Read failed: %s", strerror(errno));
+ return -1;
+ }
+ bytes_read += err;
+ } while (bytes_read < read_size);
+ return 0;
+}
+
+static int poll_read_bytes_blocking(struct thread_args *args, char *buf) {
+ return poll_read_bytes(args->fds.read_fd, buf, args->msg_size, 0);
+}
+
+static int poll_read_bytes_spin(struct thread_args *args, char *buf) {
+ return poll_read_bytes(args->fds.read_fd, buf, args->msg_size, 1);
+}
+
+#ifdef __linux__
+/* Call epoll_wait() to monitor a non-blocking fd */
+static int epoll_read_bytes(struct thread_args *args, char *buf, int spin) {
+ struct epoll_event ev;
+ size_t bytes_read = 0;
+ int err;
+ size_t read_size = args->msg_size;
+
+ do {
+ err = epoll_wait(args->epoll_fd, &ev, 1, spin ? 0 : -1);
+ if (err < 0) {
+ if (errno == EINTR) continue;
+ gpr_log(GPR_ERROR, "epoll_wait failed: %s", strerror(errno));
+ return -1;
+ }
+ if (err == 0 && spin) continue;
+ GPR_ASSERT(err == 1);
+ GPR_ASSERT(ev.events & EPOLLIN);
+ GPR_ASSERT(ev.data.fd == args->fds.read_fd);
+ do {
+ do {
+ err = read(args->fds.read_fd, buf + bytes_read, read_size - bytes_read);
+ } while (err < 0 && errno == EINTR);
+ if (errno == EAGAIN) break;
+ bytes_read += err;
+ /* TODO(klempner): This should really be doing an extra call after we are
+ done to ensure we see an EAGAIN */
+ } while (bytes_read < read_size);
+ } while (bytes_read < read_size);
+ GPR_ASSERT(bytes_read == read_size);
+ return 0;
+}
+
+static int epoll_read_bytes_blocking(struct thread_args *args, char *buf) {
+ return epoll_read_bytes(args, buf, 0);
+}
+
+static int epoll_read_bytes_spin(struct thread_args *args, char *buf) {
+ return epoll_read_bytes(args, buf, 1);
+}
+#endif /* __linux__ */
+
+/* Write out bytes.
+ At this point we only have one strategy, since in the common case these
+ writes go directly out to the kernel.
+ */
+static int blocking_write_bytes(struct thread_args *args, char *buf) {
+ int bytes_written = 0;
+ int err;
+ size_t write_size = args->msg_size;
+ do {
+ err = write(args->fds.write_fd, buf + bytes_written,
+ write_size - bytes_written);
+ if (err < 0) {
+ if (errno == EINTR) {
+ continue;
+ } else {
+ gpr_log(GPR_ERROR, "Read failed: %s", strerror(errno));
+ return -1;
+ }
+ } else {
+ bytes_written += err;
+ }
+ } while (bytes_written < write_size);
+ return 0;
+}
+
+/*
+ Initialization code
+
+ These are called at the beginning of the client and server thread, depending
+ on the scenario we're using.
+ */
+static int set_socket_nonblocking(thread_args *args) {
+ if (!grpc_set_socket_nonblocking(args->fds.read_fd, 1)) {
+ gpr_log(GPR_ERROR, "Unable to set socket nonblocking: %s", strerror(errno));
+ return -1;
+ }
+ if (!grpc_set_socket_nonblocking(args->fds.write_fd, 1)) {
+ gpr_log(GPR_ERROR, "Unable to set socket nonblocking: %s", strerror(errno));
+ return -1;
+ }
+ return 0;
+}
+
+static int do_nothing(thread_args *args) { return 0; }
+
+/* Special case for epoll, where we need to create the fd ahead of time. */
+static int epoll_setup(thread_args *args) {
+ int epoll_fd;
+ struct epoll_event ev;
+ set_socket_nonblocking(args);
+ epoll_fd = epoll_create(1);
+ if (epoll_fd < 0) {
+ gpr_log(GPR_ERROR, "epoll_create: %s", strerror(errno));
+ return -1;
+ }
+
+ args->epoll_fd = epoll_fd;
+
+ ev.events = EPOLLIN | EPOLLET;
+ ev.data.fd = args->fds.read_fd;
+ if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, args->fds.read_fd, &ev) < 0) {
+ gpr_log(GPR_ERROR, "epoll_ctl: %s", strerror(errno));
+ }
+ return 0;
+}
+
+static void server_thread(thread_args *args) {
+ char *buf = malloc(args->msg_size);
+ if (args->setup(args) < 0) {
+ gpr_log(GPR_ERROR, "Setup failed");
+ }
+ for (;;) {
+ if (args->read_bytes(args, buf) < 0) {
+ gpr_log(GPR_ERROR, "Server read failed");
+ free(buf);
+ return;
+ }
+ if (args->write_bytes(args, buf) < 0) {
+ gpr_log(GPR_ERROR, "Server write failed");
+ free(buf);
+ return;
+ }
+ }
+}
+
+static void server_thread_wrap(void *arg) {
+ thread_args *args = arg;
+ server_thread(args);
+}
+
+static void print_histogram(gpr_histogram *histogram) {
+ /* TODO(klempner): Print more detailed information, such as detailed histogram
+ buckets */
+ gpr_log(GPR_INFO, "latency (50/95/99/99.9): %f/%f/%f/%f",
+ gpr_histogram_percentile(histogram, 50),
+ gpr_histogram_percentile(histogram, 95),
+ gpr_histogram_percentile(histogram, 99),
+ gpr_histogram_percentile(histogram, 99.9));
+}
+
+static double now() {
+ gpr_timespec tv = gpr_now();
+ return 1e9 * tv.tv_sec + tv.tv_nsec;
+}
+
+static void client_thread(thread_args *args) {
+ char *buf = calloc(args->msg_size, sizeof(char));
+ gpr_histogram *histogram = gpr_histogram_create(0.01, 60e9);
+ double start_time;
+ double end_time;
+ double interval;
+ const int kNumIters = 100000;
+ int i;
+
+ if (args->setup(args) < 0) {
+ gpr_log(GPR_ERROR, "Setup failed");
+ }
+ for (i = 0; i < kNumIters; ++i) {
+ start_time = now();
+ if (args->write_bytes(args, buf) < 0) {
+ gpr_log(GPR_ERROR, "Client write failed");
+ goto error;
+ }
+ if (args->read_bytes(args, buf) < 0) {
+ gpr_log(GPR_ERROR, "Client read failed");
+ goto error;
+ }
+ end_time = now();
+ if (i > kNumIters / 2) {
+ interval = end_time - start_time;
+ gpr_histogram_add(histogram, interval);
+ }
+ }
+ print_histogram(histogram);
+error:
+ free(buf);
+ gpr_histogram_destroy(histogram);
+}
+
+/* This roughly matches tcp_server's create_listening_socket */
+static int create_listening_socket(struct sockaddr *port, socklen_t len) {
+ int fd = socket(port->sa_family, SOCK_STREAM, 0);
+ if (fd < 0) {
+ gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno));
+ goto error;
+ }
+
+ if (!grpc_set_socket_cloexec(fd, 1) || !grpc_set_socket_low_latency(fd, 1) ||
+ !grpc_set_socket_reuse_addr(fd, 1)) {
+ gpr_log(GPR_ERROR, "Unable to configure socket %d: %s", fd,
+ strerror(errno));
+ goto error;
+ }
+
+ if (bind(fd, port, len) < 0) {
+ gpr_log(GPR_ERROR, "bind: %s", strerror(errno));
+ goto error;
+ }
+
+ if (listen(fd, 1) < 0) {
+ gpr_log(GPR_ERROR, "listen: %s", strerror(errno));
+ goto error;
+ }
+
+ if (getsockname(fd, port, &len) < 0) {
+ gpr_log(GPR_ERROR, "getsockname: %s", strerror(errno));
+ goto error;
+ }
+
+ return fd;
+
+error:
+ if (fd >= 0) {
+ close(fd);
+ }
+ return -1;
+}
+
+static int connect_client(struct sockaddr *addr, int len) {
+ int fd = socket(addr->sa_family, SOCK_STREAM, 0);
+ int err;
+ if (fd < 0) {
+ gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno));
+ goto error;
+ }
+
+ if (!grpc_set_socket_cloexec(fd, 1) || !grpc_set_socket_low_latency(fd, 1)) {
+ gpr_log(GPR_ERROR, "Failed to configure socket");
+ goto error;
+ }
+
+ do {
+ err = connect(fd, addr, len);
+ } while (err < 0 && errno == EINTR);
+
+ if (err < 0) {
+ gpr_log(GPR_ERROR, "connect error: %s", strerror(errno));
+ goto error;
+ }
+ return fd;
+
+error:
+ if (fd >= 0) {
+ close(fd);
+ }
+ return -1;
+}
+
+static int accept_server(int listen_fd) {
+ int fd = accept(listen_fd, NULL, NULL);
+ if (fd < 0) {
+ gpr_log(GPR_ERROR, "Accept failed: %s", strerror(errno));
+ return -1;
+ }
+ return fd;
+}
+
+static int create_sockets_tcp(fd_pair *client_fds, fd_pair *server_fds) {
+ int listen_fd = -1;
+ int client_fd = -1;
+ int server_fd = -1;
+
+ struct sockaddr_in port;
+ struct sockaddr *sa_port = (struct sockaddr *)&port;
+
+ port.sin_family = AF_INET;
+ port.sin_port = 0;
+ port.sin_addr.s_addr = INADDR_ANY;
+
+ listen_fd = create_listening_socket(sa_port, sizeof(port));
+ if (listen_fd == -1) {
+ gpr_log(GPR_ERROR, "Listen failed");
+ goto error;
+ }
+
+ client_fd = connect_client(sa_port, sizeof(port));
+ if (client_fd == -1) {
+ gpr_log(GPR_ERROR, "Connect failed");
+ goto error;
+ }
+
+ server_fd = accept_server(listen_fd);
+ if (server_fd == -1) {
+ gpr_log(GPR_ERROR, "Accept failed");
+ goto error;
+ }
+
+ client_fds->read_fd = client_fd;
+ client_fds->write_fd = client_fd;
+ server_fds->read_fd = server_fd;
+ server_fds->write_fd = server_fd;
+ close(listen_fd);
+ return 0;
+
+error:
+ if (listen_fd != -1) {
+ close(listen_fd);
+ }
+ if (client_fd != -1) {
+ close(client_fd);
+ }
+ if (server_fd != -1) {
+ close(server_fd);
+ }
+ return -1;
+}
+
+static int create_sockets_socketpair(fd_pair *client_fds, fd_pair *server_fds) {
+ int fds[2];
+ if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) {
+ gpr_log(GPR_ERROR, "socketpair: %s", strerror(errno));
+ return -1;
+ }
+
+ client_fds->read_fd = fds[0];
+ client_fds->write_fd = fds[0];
+ server_fds->read_fd = fds[1];
+ server_fds->write_fd = fds[1];
+ return 0;
+}
+
+static int create_sockets_pipe(fd_pair *client_fds, fd_pair *server_fds) {
+ int cfds[2];
+ int sfds[2];
+ if (pipe(cfds) < 0) {
+ gpr_log(GPR_ERROR, "pipe: %s", strerror(errno));
+ return -1;
+ }
+
+ if (pipe(sfds) < 0) {
+ gpr_log(GPR_ERROR, "pipe: %s", strerror(errno));
+ return -1;
+ }
+
+ client_fds->read_fd = cfds[0];
+ client_fds->write_fd = cfds[1];
+ server_fds->read_fd = sfds[0];
+ server_fds->write_fd = sfds[1];
+ return 0;
+}
+
+static const char *read_strategy_usage =
+ "Strategy for doing reads, which is one of:\n"
+ " blocking: blocking read calls\n"
+ " same_thread_poll: poll() call on same thread \n"
+#ifdef __linux__
+ " same_thread_epoll: epoll_wait() on same thread \n"
+#endif
+ " spin_read: spinning non-blocking read() calls \n"
+ " spin_poll: spinning 0 timeout poll() calls \n"
+#ifdef __linux__
+ " spin_epoll: spinning 0 timeout epoll_wait() calls \n"
+#endif
+ "";
+
+static const char *socket_type_usage =
+ "Type of socket used, one of:\n"
+ " tcp: fds are endpoints of a TCP connection\n"
+ " socketpair: fds come from socketpair()\n"
+ " pipe: fds come from pipe()\n";
+
+void print_usage(char *argv0) {
+ fprintf(stderr, "%s usage:\n\n", argv0);
+ fprintf(stderr, "%s read_strategy socket_type msg_size\n\n", argv0);
+ fprintf(stderr, "where read_strategy is one of:\n");
+ fprintf(stderr, " blocking: blocking read calls\n");
+ fprintf(stderr, " same_thread_poll: poll() call on same thread \n");
+#ifdef __linux__
+ fprintf(stderr, " same_thread_epoll: epoll_wait() on same thread \n");
+#endif
+ fprintf(stderr, " spin_read: spinning non-blocking read() calls \n");
+ fprintf(stderr, " spin_poll: spinning 0 timeout poll() calls \n");
+#ifdef __linux__
+ fprintf(stderr, " spin_epoll: spinning 0 timeout epoll_wait() calls \n");
+#endif
+ fprintf(stderr, "and socket_type is one of:\n");
+ fprintf(stderr, " tcp: fds are endpoints of a TCP connection\n");
+ fprintf(stderr, " socketpair: fds come from socketpair()\n");
+ fprintf(stderr, " pipe: fds come from pipe()\n");
+}
+
+typedef struct test_strategy {
+ char *name;
+ int (*read_strategy)(struct thread_args *args, char *buf);
+ int (*setup)(struct thread_args *args);
+} test_strategy;
+
+static test_strategy test_strategies[] = {
+ {"blocking", blocking_read_bytes, do_nothing},
+ {"same_thread_poll", poll_read_bytes_blocking, set_socket_nonblocking},
+#ifdef __linux__
+ {"same_thread_epoll", epoll_read_bytes_blocking, epoll_setup},
+ {"spin_epoll", epoll_read_bytes_spin, epoll_setup},
+#endif /* __linux__ */
+ {"spin_read", spin_read_bytes, set_socket_nonblocking},
+ {"spin_poll", poll_read_bytes_spin, set_socket_nonblocking}};
+
+int main(int argc, char **argv) {
+ gpr_thd_id tid;
+ thread_args *client_args = malloc(sizeof(thread_args));
+ thread_args *server_args = malloc(sizeof(thread_args));
+ int msg_size = -1;
+ char *read_strategy = NULL;
+ char *socket_type = NULL;
+ int i;
+ const test_strategy *test_strategy = NULL;
+
+ gpr_cmdline *cmdline =
+ gpr_cmdline_create("low_level_ping_pong network benchmarking tool");
+
+ gpr_cmdline_add_int(cmdline, "msg_size", "Size of sent messages", &msg_size);
+ gpr_cmdline_add_string(cmdline, "read_strategy", read_strategy_usage,
+ &read_strategy);
+ gpr_cmdline_add_string(cmdline, "socket_type", socket_type_usage,
+ &socket_type);
+
+ gpr_cmdline_parse(cmdline, argc, argv);
+
+ if (read_strategy == NULL) {
+ read_strategy = "blocking";
+ }
+ if (socket_type == NULL) {
+ socket_type = "tcp";
+ }
+ if (msg_size == -1) {
+ msg_size = 50;
+ }
+
+ for (i = 0; i < sizeof(test_strategies) / sizeof(struct test_strategy); ++i) {
+ if (!strcmp(test_strategies[i].name, read_strategy)) {
+ test_strategy = &test_strategies[i];
+ }
+ }
+ if (test_strategy == NULL) {
+ fprintf(stderr, "Invalid read strategy %s\n", read_strategy);
+ return -1;
+ }
+
+ client_args->read_bytes = test_strategy->read_strategy;
+ client_args->write_bytes = blocking_write_bytes;
+ client_args->setup = test_strategy->setup;
+ server_args->read_bytes = test_strategy->read_strategy;
+ server_args->write_bytes = blocking_write_bytes;
+ server_args->setup = test_strategy->setup;
+
+ if (strcmp(socket_type, "tcp") == 0) {
+ create_sockets_tcp(&client_args->fds, &server_args->fds);
+ } else if (strcmp(socket_type, "socketpair") == 0) {
+ create_sockets_socketpair(&client_args->fds, &server_args->fds);
+ } else if (strcmp(socket_type, "pipe") == 0) {
+ create_sockets_pipe(&client_args->fds, &server_args->fds);
+ } else {
+ fprintf(stderr, "Invalid socket type %s\n", socket_type);
+ return -1;
+ }
+
+ if (msg_size <= 0) {
+ fprintf(stderr, "msg_size must be > 0\n");
+ print_usage(argv[0]);
+ return -1;
+ }
+
+ server_args->msg_size = msg_size;
+ client_args->msg_size = msg_size;
+
+ gpr_log(GPR_INFO, "Starting test %s %s %d", read_strategy, socket_type,
+ msg_size);
+
+ gpr_thd_new(&tid, server_thread_wrap, server_args, NULL);
+ client_thread(client_args);
+ gpr_cmdline_destroy(cmdline);
+ return 0;
+}