aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/core/eventmanager
diff options
context:
space:
mode:
Diffstat (limited to 'test/core/eventmanager')
-rw-r--r--test/core/eventmanager/em_pipe_test.c200
-rw-r--r--test/core/eventmanager/em_test.c725
2 files changed, 925 insertions, 0 deletions
diff --git a/test/core/eventmanager/em_pipe_test.c b/test/core/eventmanager/em_pipe_test.c
new file mode 100644
index 0000000000..5411142c89
--- /dev/null
+++ b/test/core/eventmanager/em_pipe_test.c
@@ -0,0 +1,200 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/* Test grpc_em_fd with pipe. The test creates a pipe with non-blocking mode,
+ sends a stream of bytes through the pipe, and verifies that all bytes are
+ received. */
+#include "src/core/eventmanager/em.h"
+
+#include <errno.h>
+#include <fcntl.h>
+#include <pthread.h>
+#include <string.h>
+#include <stdio.h>
+#include <unistd.h>
+
+#include <grpc/support/log.h>
+#include "test/core/util/test_config.h"
+
+/* Operation for fcntl() to set pipe buffer size. */
+#ifndef F_SETPIPE_SZ
+#define F_SETPIPE_SZ (1024 + 7)
+#endif
+
+#define TOTAL_WRITE 3 /* total number of times that the write buffer is full. \
+ */
+#define BUF_SIZE 1024
+char read_buf[BUF_SIZE];
+char write_buf[BUF_SIZE];
+
+typedef struct {
+ int fd[2];
+ grpc_em em;
+ grpc_em_fd read_em_fd;
+ grpc_em_fd write_em_fd;
+ int num_write; /* number of times that the write buffer is full*/
+ ssize_t bytes_written_total; /* total number of bytes written to the pipe */
+ ssize_t bytes_read_total; /* total number of bytes read from the pipe */
+ pthread_mutex_t mu; /* protect cv and done */
+ pthread_cond_t cv; /* signaled when read finished */
+ int done; /* set to 1 when read finished */
+} async_pipe;
+
+void write_shutdown_cb(void *arg, /*async_pipe*/
+ enum grpc_em_cb_status status) {
+ async_pipe *ap = arg;
+ close(ap->fd[1]);
+ grpc_em_fd_destroy(&ap->write_em_fd);
+}
+
+void write_cb(void *arg, /*async_pipe*/ enum grpc_em_cb_status status) {
+ async_pipe *ap = arg;
+ ssize_t bytes_written = 0;
+
+ if (status == GRPC_CALLBACK_CANCELLED) {
+ write_shutdown_cb(arg, GRPC_CALLBACK_SUCCESS);
+ return;
+ }
+
+ do {
+ bytes_written = write(ap->fd[1], write_buf, BUF_SIZE);
+ if (bytes_written > 0) ap->bytes_written_total += bytes_written;
+ } while (bytes_written > 0);
+
+ if (errno == EAGAIN) {
+ if (ap->num_write < TOTAL_WRITE) {
+ ap->num_write++;
+ grpc_em_fd_notify_on_write(&ap->write_em_fd, write_cb, ap,
+ gpr_inf_future);
+ } else {
+ /* Note that this could just shut down directly; doing a trip through the
+ shutdown path serves only a demonstration of the API. */
+ grpc_em_fd_shutdown(&ap->write_em_fd);
+ grpc_em_fd_notify_on_write(&ap->write_em_fd, write_cb, ap,
+ gpr_inf_future);
+ }
+ } else {
+ GPR_ASSERT(0 && strcat("unknown errno: ", strerror(errno)));
+ }
+}
+
+void read_shutdown_cb(void *arg, /*async_pipe*/ enum grpc_em_cb_status status) {
+ async_pipe *ap = arg;
+ close(ap->fd[0]);
+ grpc_em_fd_destroy(&ap->read_em_fd);
+ pthread_mutex_lock(&ap->mu);
+ if (ap->done == 0) {
+ ap->done = 1;
+ pthread_cond_signal(&ap->cv);
+ }
+ pthread_mutex_unlock(&ap->mu);
+}
+
+void read_cb(void *arg, /*async_pipe*/ enum grpc_em_cb_status status) {
+ async_pipe *ap = arg;
+ ssize_t bytes_read = 0;
+
+ if (status == GRPC_CALLBACK_CANCELLED) {
+ read_shutdown_cb(arg, GRPC_CALLBACK_SUCCESS);
+ return;
+ }
+
+ do {
+ bytes_read = read(ap->fd[0], read_buf, BUF_SIZE);
+ if (bytes_read > 0) ap->bytes_read_total += bytes_read;
+ } while (bytes_read > 0);
+
+ if (bytes_read == 0) {
+ /* Note that this could just shut down directly; doing a trip through the
+ shutdown path serves only a demonstration of the API. */
+ grpc_em_fd_shutdown(&ap->read_em_fd);
+ grpc_em_fd_notify_on_read(&ap->read_em_fd, read_cb, ap, gpr_inf_future);
+ } else if (bytes_read == -1) {
+ if (errno == EAGAIN) {
+ grpc_em_fd_notify_on_read(&ap->read_em_fd, read_cb, ap, gpr_inf_future);
+ } else {
+ GPR_ASSERT(0 && strcat("unknown errno: ", strerror(errno)));
+ }
+ }
+}
+
+void dummy_cb(void *arg, /*async_pipe*/ enum grpc_em_cb_status status) {}
+
+void async_pipe_init(async_pipe *ap) {
+ int i;
+
+ ap->num_write = 0;
+ ap->bytes_written_total = 0;
+ ap->bytes_read_total = 0;
+
+ pthread_mutex_init(&ap->mu, NULL);
+ pthread_cond_init(&ap->cv, NULL);
+ ap->done = 0;
+
+ GPR_ASSERT(0 == pipe(ap->fd));
+ for (i = 0; i < 2; i++) {
+ int flags = fcntl(ap->fd[i], F_GETFL, 0);
+ GPR_ASSERT(fcntl(ap->fd[i], F_SETFL, flags | O_NONBLOCK) == 0);
+ GPR_ASSERT(fcntl(ap->fd[i], F_SETPIPE_SZ, 4096) == 4096);
+ }
+
+ grpc_em_init(&ap->em);
+ grpc_em_fd_init(&ap->read_em_fd, &ap->em, ap->fd[0]);
+ grpc_em_fd_init(&ap->write_em_fd, &ap->em, ap->fd[1]);
+}
+
+static void async_pipe_start(async_pipe *ap) {
+ grpc_em_fd_notify_on_read(&ap->read_em_fd, read_cb, ap, gpr_inf_future);
+ grpc_em_fd_notify_on_write(&ap->write_em_fd, write_cb, ap, gpr_inf_future);
+}
+
+static void async_pipe_wait_destroy(async_pipe *ap) {
+ pthread_mutex_lock(&ap->mu);
+ while (!ap->done) pthread_cond_wait(&ap->cv, &ap->mu);
+ pthread_mutex_unlock(&ap->mu);
+ pthread_mutex_destroy(&ap->mu);
+ pthread_cond_destroy(&ap->cv);
+
+ grpc_em_destroy(&ap->em);
+}
+
+int main(int argc, char **argv) {
+ async_pipe ap;
+ grpc_test_init(argc, argv);
+ async_pipe_init(&ap);
+ async_pipe_start(&ap);
+ async_pipe_wait_destroy(&ap);
+ GPR_ASSERT(ap.bytes_read_total == ap.bytes_written_total);
+ gpr_log(GPR_INFO, "read total bytes %d", ap.bytes_read_total);
+ return 0;
+}
diff --git a/test/core/eventmanager/em_test.c b/test/core/eventmanager/em_test.c
new file mode 100644
index 0000000000..2bcfe86c3b
--- /dev/null
+++ b/test/core/eventmanager/em_test.c
@@ -0,0 +1,725 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/* Test gRPC event manager with a simple TCP upload server and client. */
+#include "src/core/eventmanager/em.h"
+
+#include <ctype.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <netinet/in.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/time.h>
+#include <unistd.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+#include <grpc/support/time.h>
+#include "test/core/util/test_config.h"
+
+/* buffer size used to send and receive data.
+ 1024 is the minimal value to set TCP send and receive buffer. */
+#define BUF_SIZE 1024
+
+/* Create a test socket with the right properties for testing.
+ port is the TCP port to listen or connect to.
+ Return a socket FD and sockaddr_in. */
+static void create_test_socket(int port, int *socket_fd,
+ struct sockaddr_in *sin) {
+ int fd;
+ int one = 1;
+ int buf_size = BUF_SIZE;
+ int flags;
+
+ fd = socket(AF_INET, SOCK_STREAM, 0);
+ setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
+ /* Reset the size of socket send buffer to the minimal value to facilitate
+ buffer filling up and triggering notify_on_write */
+ GPR_ASSERT(
+ setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buf_size, sizeof(buf_size)) != -1);
+ GPR_ASSERT(
+ setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &buf_size, sizeof(buf_size)) != -1);
+ /* Make fd non-blocking */
+ flags = fcntl(fd, F_GETFL, 0);
+ GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
+ *socket_fd = fd;
+
+ /* Use local address for test */
+ sin->sin_family = AF_INET;
+ sin->sin_addr.s_addr = 0;
+ sin->sin_port = htons(port);
+}
+
+/* Dummy gRPC callback */
+void no_op_cb(void *arg, enum grpc_em_cb_status status) {}
+
+/* =======An upload server to test notify_on_read===========
+ The server simply reads and counts a stream of bytes. */
+
+/* An upload server. */
+typedef struct {
+ grpc_em em; /* event manger used by the sever */
+ grpc_em_fd em_fd; /* listening fd */
+ ssize_t read_bytes_total; /* total number of received bytes */
+ gpr_mu mu; /* protect done and done_cv */
+ gpr_cv done_cv; /* signaled when a server finishes serving */
+ int done; /* set to 1 when a server finishes serving */
+} server;
+
+static void server_init(server *sv) {
+ GPR_ASSERT(grpc_em_init(&sv->em) == GRPC_EM_OK);
+ sv->read_bytes_total = 0;
+ gpr_mu_init(&sv->mu);
+ gpr_cv_init(&sv->done_cv);
+ sv->done = 0;
+}
+
+/* An upload session.
+ Created when a new upload request arrives in the server. */
+typedef struct {
+ server *sv; /* not owned by a single session */
+ grpc_em_fd em_fd; /* fd to read upload bytes */
+ char read_buf[BUF_SIZE]; /* buffer to store upload bytes */
+} session;
+
+/* Called when an upload session can be safely shutdown.
+ Close session FD and start to shutdown listen FD. */
+static void session_shutdown_cb(void *arg, /*session*/
+ enum grpc_em_cb_status status) {
+ session *se = arg;
+ server *sv = se->sv;
+ grpc_em_fd_destroy(&se->em_fd);
+ gpr_free(se);
+ /* Start to shutdown listen fd. */
+ grpc_em_fd_shutdown(&sv->em_fd);
+}
+
+/* Called when data become readable in a session. */
+static void session_read_cb(void *arg, /*session*/
+ enum grpc_em_cb_status status) {
+ session *se = arg;
+ int fd = grpc_em_fd_get(&se->em_fd);
+
+ ssize_t read_once = 0;
+ ssize_t read_total = 0;
+
+ if (status == GRPC_CALLBACK_CANCELLED) {
+ close(fd);
+ session_shutdown_cb(arg, GRPC_CALLBACK_SUCCESS);
+ return;
+ }
+
+ do {
+ read_once = read(fd, se->read_buf, BUF_SIZE);
+ if (read_once > 0) read_total += read_once;
+ } while (read_once > 0);
+ se->sv->read_bytes_total += read_total;
+
+ /* read() returns 0 to indicate the TCP connection was closed by the client.
+ read(fd, read_buf, 0) also returns 0 which should never be called as such.
+ It is possible to read nothing due to spurious edge event or data has
+ been drained, In such a case, read() returns -1 and set errno to EAGAIN. */
+ if (read_once == 0) {
+ grpc_em_fd_shutdown(&se->em_fd);
+ grpc_em_fd_notify_on_read(&se->em_fd, session_read_cb, se, gpr_inf_future);
+ } else if (read_once == -1) {
+ if (errno == EAGAIN) {
+ /* An edge triggered event is cached in the kernel until next poll.
+ In the current single thread implementation, session_read_cb is called
+ in the polling thread, such that polling only happens after this
+ callback, and will catch read edge event if data is available again
+ before notify_on_read.
+ TODO(chenw): in multi-threaded version, callback and polling can be
+ run in different threads. polling may catch a persist read edge event
+ before notify_on_read is called. */
+ GPR_ASSERT(grpc_em_fd_notify_on_read(&se->em_fd, session_read_cb, se,
+ gpr_inf_future) == GRPC_EM_OK);
+ } else {
+ gpr_log(GPR_ERROR, "Unhandled read error %s", strerror(errno));
+ GPR_ASSERT(0);
+ }
+ }
+}
+
+/* Called when the listen FD can be safely shutdown.
+ Close listen FD and signal that server can be shutdown. */
+static void listen_shutdown_cb(void *arg /*server*/,
+ enum grpc_em_cb_status status) {
+ server *sv = arg;
+
+ close(grpc_em_fd_get(&sv->em_fd));
+ grpc_em_fd_destroy(&sv->em_fd);
+
+ gpr_mu_lock(&sv->mu);
+ sv->done = 1;
+ gpr_cv_signal(&sv->done_cv);
+ gpr_mu_unlock(&sv->mu);
+}
+
+/* Called when a new TCP connection request arrives in the listening port. */
+static void listen_cb(void *arg, /*=sv_arg*/
+ enum grpc_em_cb_status status) {
+ server *sv = arg;
+ int fd;
+ int flags;
+ session *se;
+ struct sockaddr_storage ss;
+ socklen_t slen = sizeof(ss);
+ struct grpc_em_fd *listen_em_fd = &sv->em_fd;
+
+ if (status == GRPC_CALLBACK_CANCELLED) {
+ listen_shutdown_cb(arg, GRPC_CALLBACK_SUCCESS);
+ return;
+ }
+
+ fd = accept(grpc_em_fd_get(listen_em_fd), (struct sockaddr *)&ss, &slen);
+ GPR_ASSERT(fd >= 0);
+ GPR_ASSERT(fd < FD_SETSIZE);
+ flags = fcntl(fd, F_GETFL, 0);
+ fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+ se = gpr_malloc(sizeof(*se));
+ se->sv = sv;
+ GPR_ASSERT(grpc_em_fd_init(&se->em_fd, &sv->em, fd) == GRPC_EM_OK);
+ GPR_ASSERT(grpc_em_fd_notify_on_read(&se->em_fd, session_read_cb, se,
+ gpr_inf_future) == GRPC_EM_OK);
+
+ GPR_ASSERT(grpc_em_fd_notify_on_read(listen_em_fd, listen_cb, sv,
+ gpr_inf_future) == GRPC_EM_OK);
+}
+
+/* Max number of connections pending to be accepted by listen(). */
+#define MAX_NUM_FD 1024
+
+/* Start a test server, return the TCP listening port bound to listen_fd.
+ listen_cb() is registered to be interested in reading from listen_fd.
+ When connection request arrives, listen_cb() is called to accept the
+ connection request. */
+static int server_start(server *sv) {
+ int port = 0;
+ int fd;
+ struct sockaddr_in sin;
+ socklen_t addr_len;
+
+ create_test_socket(port, &fd, &sin);
+ addr_len = sizeof(sin);
+ GPR_ASSERT(bind(fd, (struct sockaddr *)&sin, addr_len) == 0);
+ GPR_ASSERT(getsockname(fd, (struct sockaddr *)&sin, &addr_len) == GRPC_EM_OK);
+ port = ntohs(sin.sin_port);
+ GPR_ASSERT(listen(fd, MAX_NUM_FD) == 0);
+
+ GPR_ASSERT(grpc_em_fd_init(&sv->em_fd, &sv->em, fd) == GRPC_EM_OK);
+ /* Register to be interested in reading from listen_fd. */
+ GPR_ASSERT(grpc_em_fd_notify_on_read(&sv->em_fd, listen_cb, sv,
+ gpr_inf_future) == GRPC_EM_OK);
+
+ return port;
+}
+
+/* Wait and shutdown a sever. */
+static void server_wait_and_shutdown(server *sv) {
+ gpr_mu_lock(&sv->mu);
+ while (!sv->done) gpr_cv_wait(&sv->done_cv, &sv->mu, gpr_inf_future);
+ gpr_mu_unlock(&sv->mu);
+
+ gpr_mu_destroy(&sv->mu);
+ gpr_cv_destroy(&sv->done_cv);
+
+ GPR_ASSERT(grpc_em_destroy(&sv->em) == GRPC_EM_OK);
+}
+
+/* ===An upload client to test notify_on_write=== */
+
+/* Client write buffer size */
+#define CLIENT_WRITE_BUF_SIZE 10
+/* Total number of times that the client fills up the write buffer */
+#define CLIENT_TOTAL_WRITE_CNT 3
+
+/* An upload client. */
+typedef struct {
+ grpc_em em;
+ grpc_em_fd em_fd;
+ char write_buf[CLIENT_WRITE_BUF_SIZE];
+ ssize_t write_bytes_total;
+ /* Number of times that the client fills up the write buffer and calls
+ notify_on_write to schedule another write. */
+ int client_write_cnt;
+
+ gpr_mu mu; /* protect done and done_cv */
+ gpr_cv done_cv; /* signaled when a client finishes sending */
+ int done; /* set to 1 when a client finishes sending */
+} client;
+
+static void client_init(client *cl) {
+ GPR_ASSERT(grpc_em_init(&cl->em) == GRPC_EM_OK);
+ memset(cl->write_buf, 0, sizeof(cl->write_buf));
+ cl->write_bytes_total = 0;
+ cl->client_write_cnt = 0;
+ gpr_mu_init(&cl->mu);
+ gpr_cv_init(&cl->done_cv);
+ cl->done = 0;
+}
+
+/* Called when a client upload session is ready to shutdown. */
+static void client_session_shutdown_cb(void *arg /*client*/,
+ enum grpc_em_cb_status status) {
+ client *cl = arg;
+ grpc_em_fd_destroy(&cl->em_fd);
+ gpr_mu_lock(&cl->mu);
+ cl->done = 1;
+ gpr_cv_signal(&cl->done_cv);
+ gpr_mu_unlock(&cl->mu);
+}
+
+/* Write as much as possible, then register notify_on_write. */
+static void client_session_write(void *arg, /*client*/
+ enum grpc_em_cb_status status) {
+ client *cl = arg;
+ int fd = grpc_em_fd_get(&cl->em_fd);
+ ssize_t write_once = 0;
+
+ if (status == GRPC_CALLBACK_CANCELLED) {
+ client_session_shutdown_cb(arg, GRPC_CALLBACK_SUCCESS);
+ return;
+ }
+
+ do {
+ write_once = write(fd, cl->write_buf, CLIENT_WRITE_BUF_SIZE);
+ if (write_once > 0) cl->write_bytes_total += write_once;
+ } while (write_once > 0);
+
+ if (errno == EAGAIN) {
+ gpr_mu_lock(&cl->mu);
+ if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) {
+ GPR_ASSERT(grpc_em_fd_notify_on_write(&cl->em_fd, client_session_write,
+ cl, gpr_inf_future) == GRPC_EM_OK);
+ cl->client_write_cnt++;
+ } else {
+ close(fd);
+ grpc_em_fd_shutdown(&cl->em_fd);
+ grpc_em_fd_notify_on_write(&cl->em_fd, client_session_write, cl,
+ gpr_inf_future);
+ }
+ gpr_mu_unlock(&cl->mu);
+ } else {
+ gpr_log(GPR_ERROR, "unknown errno %s", strerror(errno));
+ GPR_ASSERT(0);
+ }
+}
+
+/* Start a client to send a stream of bytes. */
+static void client_start(client *cl, int port) {
+ int fd;
+ struct sockaddr_in sin;
+ create_test_socket(port, &fd, &sin);
+ if (connect(fd, (struct sockaddr *)&sin, sizeof(sin)) == -1 &&
+ errno != EINPROGRESS) {
+ gpr_log(GPR_ERROR, "Failed to connect to the server");
+ GPR_ASSERT(0);
+ }
+
+ GPR_ASSERT(grpc_em_fd_init(&cl->em_fd, &cl->em, fd) == GRPC_EM_OK);
+
+ client_session_write(cl, GRPC_CALLBACK_SUCCESS);
+}
+
+/* Wait for the signal to shutdown a client. */
+static void client_wait_and_shutdown(client *cl) {
+ gpr_mu_lock(&cl->mu);
+ while (!cl->done) gpr_cv_wait(&cl->done_cv, &cl->mu, gpr_inf_future);
+ gpr_mu_unlock(&cl->mu);
+
+ gpr_mu_destroy(&cl->mu);
+ gpr_cv_destroy(&cl->done_cv);
+
+ GPR_ASSERT(grpc_em_destroy(&cl->em) == GRPC_EM_OK);
+}
+
+/* Test grpc_em_fd. Start an upload server and client, upload a stream of
+ bytes from the client to the server, and verify that the total number of
+ sent bytes is equal to the total number of received bytes. */
+static void test_grpc_em_fd() {
+ server sv;
+ client cl;
+ int port;
+
+ server_init(&sv);
+ port = server_start(&sv);
+ client_init(&cl);
+ client_start(&cl, port);
+ client_wait_and_shutdown(&cl);
+ server_wait_and_shutdown(&sv);
+ GPR_ASSERT(sv.read_bytes_total == cl.write_bytes_total);
+ gpr_log(GPR_INFO, "Total read bytes %d", sv.read_bytes_total);
+}
+
+typedef struct fd_change_data {
+ gpr_mu mu;
+ gpr_cv cv;
+ void (*cb_that_ran)(void *, enum grpc_em_cb_status);
+} fd_change_data;
+
+void init_change_data(fd_change_data *fdc) {
+ gpr_mu_init(&fdc->mu);
+ gpr_cv_init(&fdc->cv);
+ fdc->cb_that_ran = NULL;
+}
+
+void destroy_change_data(fd_change_data *fdc) {
+ gpr_mu_destroy(&fdc->mu);
+ gpr_cv_destroy(&fdc->cv);
+}
+
+static void first_read_callback(void *arg /* fd_change_data */,
+ enum grpc_em_cb_status status) {
+ fd_change_data *fdc = arg;
+
+ gpr_mu_lock(&fdc->mu);
+ fdc->cb_that_ran = first_read_callback;
+ gpr_cv_signal(&fdc->cv);
+ gpr_mu_unlock(&fdc->mu);
+}
+
+static void second_read_callback(void *arg /* fd_change_data */,
+ enum grpc_em_cb_status status) {
+ fd_change_data *fdc = arg;
+
+ gpr_mu_lock(&fdc->mu);
+ fdc->cb_that_ran = second_read_callback;
+ gpr_cv_signal(&fdc->cv);
+ gpr_mu_unlock(&fdc->mu);
+}
+
+/* Test that changing the callback we use for notify_on_read actually works.
+ Note that we have two different but almost identical callbacks above -- the
+ point is to have two different function pointers and two different data
+ pointers and make sure that changing both really works. */
+static void test_grpc_em_fd_change() {
+ grpc_em em;
+ grpc_em_fd em_fd;
+ fd_change_data a, b;
+ int flags;
+ int sv[2];
+ char data;
+ int result;
+
+ init_change_data(&a);
+ init_change_data(&b);
+
+ GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
+ flags = fcntl(sv[0], F_GETFL, 0);
+ GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
+ flags = fcntl(sv[1], F_GETFL, 0);
+ GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
+
+ grpc_em_init(&em);
+ grpc_em_fd_init(&em_fd, &em, sv[0]);
+
+ /* Register the first callback, then make its FD readable */
+ grpc_em_fd_notify_on_read(&em_fd, first_read_callback, &a, gpr_inf_future);
+ data = 0;
+ result = write(sv[1], &data, 1);
+ GPR_ASSERT(result == 1);
+
+ /* And now wait for it to run. */
+ gpr_mu_lock(&a.mu);
+ while (a.cb_that_ran == NULL) {
+ gpr_cv_wait(&a.cv, &a.mu, gpr_inf_future);
+ }
+ GPR_ASSERT(a.cb_that_ran == first_read_callback);
+ gpr_mu_unlock(&a.mu);
+
+ /* And drain the socket so we can generate a new read edge */
+ result = read(sv[0], &data, 1);
+ GPR_ASSERT(result == 1);
+
+ /* Now register a second callback with distinct change data, and do the same
+ thing again. */
+ grpc_em_fd_notify_on_read(&em_fd, second_read_callback, &b, gpr_inf_future);
+ data = 0;
+ result = write(sv[1], &data, 1);
+ GPR_ASSERT(result == 1);
+
+ gpr_mu_lock(&b.mu);
+ while (b.cb_that_ran == NULL) {
+ gpr_cv_wait(&b.cv, &b.mu, gpr_inf_future);
+ }
+ /* Except now we verify that second_read_callback ran instead */
+ GPR_ASSERT(b.cb_that_ran == second_read_callback);
+ gpr_mu_unlock(&b.mu);
+
+ grpc_em_fd_destroy(&em_fd);
+ grpc_em_destroy(&em);
+ destroy_change_data(&a);
+ destroy_change_data(&b);
+ close(sv[0]);
+ close(sv[1]);
+}
+
+void timeout_callback(void *arg, enum grpc_em_cb_status status) {
+ if (status == GRPC_CALLBACK_TIMED_OUT) {
+ gpr_event_set(arg, (void *)1);
+ } else {
+ gpr_event_set(arg, (void *)2);
+ }
+}
+
+void test_grpc_em_fd_notify_timeout() {
+ grpc_em em;
+ grpc_em_fd em_fd;
+ gpr_event ev;
+ int flags;
+ int sv[2];
+ gpr_timespec timeout;
+ gpr_timespec deadline;
+
+ gpr_event_init(&ev);
+
+ GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
+ flags = fcntl(sv[0], F_GETFL, 0);
+ GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
+ flags = fcntl(sv[1], F_GETFL, 0);
+ GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
+
+ grpc_em_init(&em);
+ grpc_em_fd_init(&em_fd, &em, sv[0]);
+
+ timeout = gpr_time_from_micros(1000000);
+ deadline = gpr_time_add(gpr_now(), timeout);
+
+ grpc_em_fd_notify_on_read(&em_fd, timeout_callback, &ev, deadline);
+
+ GPR_ASSERT(gpr_event_wait(&ev, gpr_time_add(deadline, timeout)));
+
+ GPR_ASSERT(gpr_event_get(&ev) == (void *)1);
+ grpc_em_fd_destroy(&em_fd);
+ grpc_em_destroy(&em);
+ close(sv[0]);
+ close(sv[1]);
+}
+
+typedef struct {
+ grpc_em *em;
+ gpr_cv cv;
+ gpr_mu mu;
+ int counter;
+ int done_success_ctr;
+ int done_cancel_ctr;
+ int done;
+ gpr_event fcb_arg;
+ grpc_em_cb_status status;
+} alarm_arg;
+
+static void followup_cb(void *arg, grpc_em_cb_status status) {
+ gpr_event_set((gpr_event *)arg, arg);
+}
+
+/* Called when an alarm expires. */
+static void alarm_cb(void *arg /* alarm_arg */, grpc_em_cb_status status) {
+ alarm_arg *a = arg;
+ gpr_mu_lock(&a->mu);
+ if (status == GRPC_CALLBACK_SUCCESS) {
+ a->counter++;
+ a->done_success_ctr++;
+ } else if (status == GRPC_CALLBACK_CANCELLED) {
+ a->done_cancel_ctr++;
+ } else {
+ GPR_ASSERT(0);
+ }
+ a->done = 1;
+ a->status = status;
+ gpr_cv_signal(&a->cv);
+ gpr_mu_unlock(&a->mu);
+ grpc_em_add_callback(a->em, followup_cb, &a->fcb_arg);
+}
+
+/* Test grpc_em_alarm add and cancel. */
+static void test_grpc_em_alarm() {
+ struct grpc_em em;
+ struct grpc_em_alarm alarm;
+ struct grpc_em_alarm alarm_to_cancel;
+ gpr_timespec tv0 = {0, 1};
+ /* Timeout on the alarm cond. var, so make big enough to absorb time
+ deviations. Otherwise, operations after wait will not be properly ordered
+ */
+ gpr_timespec tv1 = gpr_time_from_micros(200000);
+ gpr_timespec tv2 = {0, 1};
+ gpr_timespec alarm_deadline;
+ gpr_timespec followup_deadline;
+
+ alarm_arg *cancel_arg = NULL;
+ alarm_arg arg;
+ alarm_arg arg2;
+ void *fdone;
+
+ GPR_ASSERT(grpc_em_init(&em) == GRPC_EM_OK);
+
+ arg.em = &em;
+ arg.counter = 0;
+ arg.status = GRPC_CALLBACK_DO_NOT_USE;
+ arg.done_success_ctr = 0;
+ arg.done_cancel_ctr = 0;
+ arg.done = 0;
+ gpr_mu_init(&arg.mu);
+ gpr_cv_init(&arg.cv);
+ gpr_event_init(&arg.fcb_arg);
+
+ GPR_ASSERT(grpc_em_alarm_init(&alarm, &em, alarm_cb, &arg) == GRPC_EM_OK);
+ GPR_ASSERT(grpc_em_alarm_add(&alarm, gpr_time_add(tv0, gpr_now())) ==
+ GRPC_EM_OK);
+
+ alarm_deadline = gpr_time_add(gpr_now(), tv1);
+ gpr_mu_lock(&arg.mu);
+ while (arg.done == 0) {
+ gpr_cv_wait(&arg.cv, &arg.mu, alarm_deadline);
+ }
+ gpr_mu_unlock(&arg.mu);
+
+ followup_deadline = gpr_time_add(gpr_now(), tv1);
+ fdone = gpr_event_wait(&arg.fcb_arg, followup_deadline);
+
+ if (arg.counter != 1) {
+ gpr_log(GPR_ERROR, "Alarm callback not called");
+ GPR_ASSERT(0);
+ } else if (arg.done_success_ctr != 1) {
+ gpr_log(GPR_ERROR, "Alarm done callback not called with success");
+ GPR_ASSERT(0);
+ } else if (arg.done_cancel_ctr != 0) {
+ gpr_log(GPR_ERROR, "Alarm done callback called with cancel");
+ GPR_ASSERT(0);
+ } else if (arg.status == GRPC_CALLBACK_DO_NOT_USE) {
+ gpr_log(GPR_ERROR, "Alarm callback without status");
+ GPR_ASSERT(0);
+ } else {
+ gpr_log(GPR_INFO, "Alarm callback called successfully");
+ }
+
+ if (fdone != (void *)&arg.fcb_arg) {
+ gpr_log(GPR_ERROR, "Followup callback #1 not invoked properly %p %p", fdone,
+ &arg.fcb_arg);
+ GPR_ASSERT(0);
+ }
+ gpr_cv_destroy(&arg.cv);
+ gpr_mu_destroy(&arg.mu);
+
+ arg2.em = &em;
+ arg2.counter = 0;
+ arg2.status = GRPC_CALLBACK_DO_NOT_USE;
+ arg2.done_success_ctr = 0;
+ arg2.done_cancel_ctr = 0;
+ arg2.done = 0;
+ gpr_mu_init(&arg2.mu);
+ gpr_cv_init(&arg2.cv);
+ gpr_event_init(&arg2.fcb_arg);
+
+ GPR_ASSERT(grpc_em_alarm_init(&alarm_to_cancel, &em, alarm_cb, &arg2) ==
+ GRPC_EM_OK);
+ GPR_ASSERT(grpc_em_alarm_add(&alarm_to_cancel,
+ gpr_time_add(tv2, gpr_now())) == GRPC_EM_OK);
+ switch (grpc_em_alarm_cancel(&alarm_to_cancel, (void **)&cancel_arg)) {
+ case GRPC_EM_OK:
+ gpr_log(GPR_INFO, "Alarm cancel succeeded");
+ break;
+ case GRPC_EM_ERROR:
+ gpr_log(GPR_ERROR, "Alarm cancel failed");
+ GPR_ASSERT(0);
+ break;
+ case GRPC_EM_INVALID_ARGUMENTS:
+ gpr_log(GPR_ERROR, "Alarm cancel failed with bad response code");
+ gpr_log(GPR_ERROR, "Current value of triggered is %d\n",
+ (int)alarm_to_cancel.triggered);
+ GPR_ASSERT(0);
+ break;
+ }
+
+ alarm_deadline = gpr_time_add(gpr_now(), tv1);
+ gpr_mu_lock(&arg2.mu);
+ while (arg2.done == 0) {
+ gpr_cv_wait(&arg2.cv, &arg2.mu, alarm_deadline);
+ }
+ gpr_mu_unlock(&arg2.mu);
+
+ followup_deadline = gpr_time_add(gpr_now(), tv1);
+ fdone = gpr_event_wait(&arg2.fcb_arg, followup_deadline);
+
+ if (arg2.counter != arg2.done_success_ctr) {
+ gpr_log(GPR_ERROR, "Alarm callback called but didn't lead to done success");
+ GPR_ASSERT(0);
+ } else if (arg2.done_success_ctr && arg2.done_cancel_ctr) {
+ gpr_log(GPR_ERROR, "Alarm done callback called with success and cancel");
+ GPR_ASSERT(0);
+ } else if (arg2.done_cancel_ctr + arg2.done_success_ctr != 1) {
+ gpr_log(GPR_ERROR, "Alarm done callback called incorrect number of times");
+ GPR_ASSERT(0);
+ } else if (arg2.status == GRPC_CALLBACK_DO_NOT_USE) {
+ gpr_log(GPR_ERROR, "Alarm callback without status");
+ GPR_ASSERT(0);
+ } else if (arg2.done_success_ctr) {
+ gpr_log(GPR_INFO, "Alarm callback executed before cancel");
+ gpr_log(GPR_INFO, "Current value of triggered is %d\n",
+ (int)alarm_to_cancel.triggered);
+ } else if (arg2.done_cancel_ctr) {
+ gpr_log(GPR_INFO, "Alarm callback canceled");
+ gpr_log(GPR_INFO, "Current value of triggered is %d\n",
+ (int)alarm_to_cancel.triggered);
+ } else {
+ gpr_log(GPR_ERROR, "Alarm cancel test should not be here");
+ GPR_ASSERT(0);
+ }
+
+ if (cancel_arg != &arg2) {
+ gpr_log(GPR_ERROR, "Alarm cancel arg address wrong");
+ GPR_ASSERT(0);
+ }
+ if (fdone != (void *)&arg2.fcb_arg) {
+ gpr_log(GPR_ERROR, "Followup callback #2 not invoked properly %p %p", fdone,
+ &arg2.fcb_arg);
+ GPR_ASSERT(0);
+ }
+ gpr_cv_destroy(&arg2.cv);
+ gpr_mu_destroy(&arg2.mu);
+
+ GPR_ASSERT(grpc_em_destroy(&em) == GRPC_EM_OK);
+}
+
+int main(int argc, char **argv) {
+ grpc_test_init(argc, argv);
+ test_grpc_em_alarm();
+ test_grpc_em_fd();
+ test_grpc_em_fd_change();
+ test_grpc_em_fd_notify_timeout();
+ return 0;
+}