aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/core/iomgr/fd_posix_test.c
diff options
context:
space:
mode:
Diffstat (limited to 'test/core/iomgr/fd_posix_test.c')
-rw-r--r--test/core/iomgr/fd_posix_test.c78
1 files changed, 46 insertions, 32 deletions
diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c
index f97f33712e..62dc24d85a 100644
--- a/test/core/iomgr/fd_posix_test.c
+++ b/test/core/iomgr/fd_posix_test.c
@@ -52,6 +52,7 @@
#include "src/core/lib/iomgr/ev_posix.h"
#include "src/core/lib/iomgr/iomgr.h"
+#include "src/core/lib/iomgr/socket_utils_posix.h"
#include "test/core/util/test_config.h"
static gpr_mu *g_mu;
@@ -68,17 +69,15 @@ static void create_test_socket(int port, int *socket_fd,
struct sockaddr_in *sin) {
int fd;
int one = 1;
- int buf_size = BUF_SIZE;
+ int buffer_size_bytes = BUF_SIZE;
int flags;
fd = socket(AF_INET, SOCK_STREAM, 0);
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
/* Reset the size of socket send buffer to the minimal value to facilitate
buffer filling up and triggering notify_on_write */
- GPR_ASSERT(
- setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buf_size, sizeof(buf_size)) != -1);
- GPR_ASSERT(
- setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &buf_size, sizeof(buf_size)) != -1);
+ GPR_ASSERT(grpc_set_socket_sndbuf(fd, buffer_size_bytes) == GRPC_ERROR_NONE);
+ GPR_ASSERT(grpc_set_socket_rcvbuf(fd, buffer_size_bytes) == GRPC_ERROR_NONE);
/* Make fd non-blocking */
flags = fcntl(fd, F_GETFL, 0);
GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
@@ -133,14 +132,14 @@ static void session_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg, /*session */
/* Called when data become readable in a session. */
static void session_read_cb(grpc_exec_ctx *exec_ctx, void *arg, /*session */
- bool success) {
+ grpc_error *error) {
session *se = arg;
int fd = grpc_fd_wrapped_fd(se->em_fd);
ssize_t read_once = 0;
ssize_t read_total = 0;
- if (!success) {
+ if (error != GRPC_ERROR_NONE) {
session_shutdown_cb(exec_ctx, arg, 1);
return;
}
@@ -185,13 +184,14 @@ static void listen_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg /*server */,
gpr_mu_lock(g_mu);
sv->done = 1;
- grpc_pollset_kick(g_pollset, NULL);
+ GPR_ASSERT(
+ GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL)));
gpr_mu_unlock(g_mu);
}
/* Called when a new TCP connection request arrives in the listening port. */
static void listen_cb(grpc_exec_ctx *exec_ctx, void *arg, /*=sv_arg*/
- bool success) {
+ grpc_error *error) {
server *sv = arg;
int fd;
int flags;
@@ -200,7 +200,7 @@ static void listen_cb(grpc_exec_ctx *exec_ctx, void *arg, /*=sv_arg*/
socklen_t slen = sizeof(ss);
grpc_fd *listen_em_fd = sv->em_fd;
- if (!success) {
+ if (error != GRPC_ERROR_NONE) {
listen_shutdown_cb(exec_ctx, arg, 1);
return;
}
@@ -257,9 +257,11 @@ static void server_wait_and_shutdown(server *sv) {
while (!sv->done) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_pollset_worker *worker = NULL;
- grpc_pollset_work(&exec_ctx, g_pollset, &worker,
- gpr_now(GPR_CLOCK_MONOTONIC),
- gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ GPR_ASSERT(GRPC_LOG_IF_ERROR(
+ "pollset_work",
+ grpc_pollset_work(&exec_ctx, g_pollset, &worker,
+ gpr_now(GPR_CLOCK_MONOTONIC),
+ gpr_inf_future(GPR_CLOCK_MONOTONIC))));
gpr_mu_unlock(g_mu);
grpc_exec_ctx_finish(&exec_ctx);
gpr_mu_lock(g_mu);
@@ -300,17 +302,18 @@ static void client_session_shutdown_cb(grpc_exec_ctx *exec_ctx,
client *cl = arg;
grpc_fd_orphan(exec_ctx, cl->em_fd, NULL, NULL, "c");
cl->done = 1;
- grpc_pollset_kick(g_pollset, NULL);
+ GPR_ASSERT(
+ GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL)));
}
/* Write as much as possible, then register notify_on_write. */
static void client_session_write(grpc_exec_ctx *exec_ctx, void *arg, /*client */
- bool success) {
+ grpc_error *error) {
client *cl = arg;
int fd = grpc_fd_wrapped_fd(cl->em_fd);
ssize_t write_once = 0;
- if (!success) {
+ if (error != GRPC_ERROR_NONE) {
gpr_mu_lock(g_mu);
client_session_shutdown_cb(exec_ctx, arg, 1);
gpr_mu_unlock(g_mu);
@@ -363,7 +366,7 @@ static void client_start(grpc_exec_ctx *exec_ctx, client *cl, int port) {
cl->em_fd = grpc_fd_create(fd, "client");
grpc_pollset_add_fd(exec_ctx, g_pollset, cl->em_fd);
- client_session_write(exec_ctx, cl, 1);
+ client_session_write(exec_ctx, cl, GRPC_ERROR_NONE);
}
/* Wait for the signal to shutdown a client. */
@@ -372,9 +375,11 @@ static void client_wait_and_shutdown(client *cl) {
while (!cl->done) {
grpc_pollset_worker *worker = NULL;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_pollset_work(&exec_ctx, g_pollset, &worker,
- gpr_now(GPR_CLOCK_MONOTONIC),
- gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ GPR_ASSERT(GRPC_LOG_IF_ERROR(
+ "pollset_work",
+ grpc_pollset_work(&exec_ctx, g_pollset, &worker,
+ gpr_now(GPR_CLOCK_MONOTONIC),
+ gpr_inf_future(GPR_CLOCK_MONOTONIC))));
gpr_mu_unlock(g_mu);
grpc_exec_ctx_finish(&exec_ctx);
gpr_mu_lock(g_mu);
@@ -399,7 +404,7 @@ static void test_grpc_fd(void) {
client_wait_and_shutdown(&cl);
server_wait_and_shutdown(&sv);
GPR_ASSERT(sv.read_bytes_total == cl.write_bytes_total);
- gpr_log(GPR_INFO, "Total read bytes %d", sv.read_bytes_total);
+ gpr_log(GPR_INFO, "Total read bytes %" PRIdPTR, sv.read_bytes_total);
}
typedef struct fd_change_data {
@@ -411,22 +416,26 @@ void init_change_data(fd_change_data *fdc) { fdc->cb_that_ran = NULL; }
void destroy_change_data(fd_change_data *fdc) {}
static void first_read_callback(grpc_exec_ctx *exec_ctx,
- void *arg /* fd_change_data */, bool success) {
+ void *arg /* fd_change_data */,
+ grpc_error *error) {
fd_change_data *fdc = arg;
gpr_mu_lock(g_mu);
fdc->cb_that_ran = first_read_callback;
- grpc_pollset_kick(g_pollset, NULL);
+ GPR_ASSERT(
+ GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL)));
gpr_mu_unlock(g_mu);
}
static void second_read_callback(grpc_exec_ctx *exec_ctx,
- void *arg /* fd_change_data */, bool success) {
+ void *arg /* fd_change_data */,
+ grpc_error *error) {
fd_change_data *fdc = arg;
gpr_mu_lock(g_mu);
fdc->cb_that_ran = second_read_callback;
- grpc_pollset_kick(g_pollset, NULL);
+ GPR_ASSERT(
+ GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, NULL)));
gpr_mu_unlock(g_mu);
}
@@ -472,9 +481,11 @@ static void test_grpc_fd_change(void) {
gpr_mu_lock(g_mu);
while (a.cb_that_ran == NULL) {
grpc_pollset_worker *worker = NULL;
- grpc_pollset_work(&exec_ctx, g_pollset, &worker,
- gpr_now(GPR_CLOCK_MONOTONIC),
- gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ GPR_ASSERT(GRPC_LOG_IF_ERROR(
+ "pollset_work",
+ grpc_pollset_work(&exec_ctx, g_pollset, &worker,
+ gpr_now(GPR_CLOCK_MONOTONIC),
+ gpr_inf_future(GPR_CLOCK_MONOTONIC))));
gpr_mu_unlock(g_mu);
grpc_exec_ctx_finish(&exec_ctx);
gpr_mu_lock(g_mu);
@@ -496,9 +507,11 @@ static void test_grpc_fd_change(void) {
gpr_mu_lock(g_mu);
while (b.cb_that_ran == NULL) {
grpc_pollset_worker *worker = NULL;
- grpc_pollset_work(&exec_ctx, g_pollset, &worker,
- gpr_now(GPR_CLOCK_MONOTONIC),
- gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ GPR_ASSERT(GRPC_LOG_IF_ERROR(
+ "pollset_work",
+ grpc_pollset_work(&exec_ctx, g_pollset, &worker,
+ gpr_now(GPR_CLOCK_MONOTONIC),
+ gpr_inf_future(GPR_CLOCK_MONOTONIC))));
gpr_mu_unlock(g_mu);
grpc_exec_ctx_finish(&exec_ctx);
gpr_mu_lock(g_mu);
@@ -514,7 +527,8 @@ static void test_grpc_fd_change(void) {
close(sv[1]);
}
-static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, bool success) {
+static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p,
+ grpc_error *error) {
grpc_pollset_destroy(p);
}