author     yang-g <yangg@google.com>  2015-07-06 14:05:54 -0700
committer  yang-g <yangg@google.com>  2015-07-06 14:05:54 -0700
commit     5ea46ab2482c3724fbc7fd0aab55f324fb65999c
tree       55eebc4aae8f06f931c8f75ddf84d56595f99fa1
parent     3abe60b9d08ff5a784a39f7c4a10c631547c3526
parent     d426864934ac60f46e538ba81932e405fa8949b1
merge with upstream and resolve conflicts
Diffstat (limited to 'test/core/iomgr')
-rw-r--r--  test/core/iomgr/endpoint_tests.c             |  90
-rw-r--r--  test/core/iomgr/endpoint_tests.h             |   5
-rw-r--r--  test/core/iomgr/fd_conservation_posix_test.c |  63
-rw-r--r--  test/core/iomgr/fd_posix_test.c              | 103
-rw-r--r--  test/core/iomgr/poll_kick_posix_test.c       |  44
-rw-r--r--  test/core/iomgr/tcp_client_posix_test.c      |  96
-rw-r--r--  test/core/iomgr/tcp_posix_test.c             |  88
-rw-r--r--  test/core/iomgr/tcp_server_posix_test.c      |  49
8 files changed, 322 insertions(+), 216 deletions(-)
diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c
index f9c5282f19..8198c24752 100644
--- a/test/core/iomgr/endpoint_tests.c
+++ b/test/core/iomgr/endpoint_tests.c
@@ -57,6 +57,8 @@
*/
+static grpc_pollset *g_pollset;
+
size_t count_and_unref_slices(gpr_slice *slices, size_t nslices,
int *current_data) {
size_t num_bytes = 0;
@@ -111,8 +113,6 @@ static gpr_slice *allocate_blocks(size_t num_bytes, size_t slice_size,
struct read_and_write_test_state {
grpc_endpoint *read_ep;
grpc_endpoint *write_ep;
- gpr_mu mu;
- gpr_cv cv;
size_t target_bytes;
size_t bytes_read;
size_t current_write_size;
@@ -130,10 +130,10 @@ static void read_and_write_test_read_handler(void *data, gpr_slice *slices,
GPR_ASSERT(error != GRPC_ENDPOINT_CB_ERROR);
if (error == GRPC_ENDPOINT_CB_SHUTDOWN) {
gpr_log(GPR_INFO, "Read handler shutdown");
- gpr_mu_lock(&state->mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
state->read_done = 1;
- gpr_cv_signal(&state->cv);
- gpr_mu_unlock(&state->mu);
+ grpc_pollset_kick(g_pollset);
+ gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
return;
}
@@ -141,10 +141,10 @@ static void read_and_write_test_read_handler(void *data, gpr_slice *slices,
count_and_unref_slices(slices, nslices, &state->current_read_data);
if (state->bytes_read == state->target_bytes) {
gpr_log(GPR_INFO, "Read handler done");
- gpr_mu_lock(&state->mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
state->read_done = 1;
- gpr_cv_signal(&state->cv);
- gpr_mu_unlock(&state->mu);
+ grpc_pollset_kick(g_pollset);
+ gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
} else {
grpc_endpoint_notify_on_read(state->read_ep,
read_and_write_test_read_handler, data);
@@ -160,14 +160,15 @@ static void read_and_write_test_write_handler(void *data,
GPR_ASSERT(error != GRPC_ENDPOINT_CB_ERROR);
- gpr_log(GPR_DEBUG, "%s: error=%d", "read_and_write_test_write_handler", error);
+ gpr_log(GPR_DEBUG, "%s: error=%d", "read_and_write_test_write_handler",
+ error);
if (error == GRPC_ENDPOINT_CB_SHUTDOWN) {
gpr_log(GPR_INFO, "Write handler shutdown");
- gpr_mu_lock(&state->mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
state->write_done = 1;
- gpr_cv_signal(&state->cv);
- gpr_mu_unlock(&state->mu);
+ grpc_pollset_kick(g_pollset);
+ gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
return;
}
@@ -198,10 +199,10 @@ static void read_and_write_test_write_handler(void *data,
GPR_ASSERT(state->bytes_written == state->target_bytes);
gpr_log(GPR_INFO, "Write handler done");
- gpr_mu_lock(&state->mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
state->write_done = 1;
- gpr_cv_signal(&state->cv);
- gpr_mu_unlock(&state->mu);
+ grpc_pollset_kick(g_pollset);
+ gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
}
/* Do both reading and writing using the grpc_endpoint API.
@@ -213,7 +214,8 @@ static void read_and_write_test(grpc_endpoint_test_config config,
size_t slice_size, int shutdown) {
struct read_and_write_test_state state;
gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
- grpc_endpoint_test_fixture f = begin_test(config, "read_and_write_test", slice_size);
+ grpc_endpoint_test_fixture f =
+ begin_test(config, "read_and_write_test", slice_size);
if (shutdown) {
gpr_log(GPR_INFO, "Start read and write shutdown test");
@@ -222,9 +224,6 @@ static void read_and_write_test(grpc_endpoint_test_config config,
num_bytes, slice_size);
}
- gpr_mu_init(&state.mu);
- gpr_cv_init(&state.cv);
-
state.read_ep = f.client_ep;
state.write_ep = f.server_ep;
state.target_bytes = num_bytes;
@@ -253,29 +252,24 @@ static void read_and_write_test(grpc_endpoint_test_config config,
grpc_endpoint_shutdown(state.write_ep);
}
- gpr_mu_lock(&state.mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
while (!state.read_done || !state.write_done) {
- if (gpr_cv_wait(&state.cv, &state.mu, deadline)) {
- gpr_log(GPR_ERROR, "timeout: read_done=%d, write_done=%d",
- state.read_done, state.write_done);
- abort();
- }
+ GPR_ASSERT(gpr_time_cmp(gpr_now(), deadline) < 0);
+ grpc_pollset_work(g_pollset, deadline);
}
- gpr_mu_unlock(&state.mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
grpc_endpoint_destroy(state.read_ep);
grpc_endpoint_destroy(state.write_ep);
- gpr_mu_destroy(&state.mu);
- gpr_cv_destroy(&state.cv);
end_test(config);
}
struct timeout_test_state {
- gpr_event io_done;
+ int io_done;
};
typedef struct {
- gpr_event ev;
+ int done;
grpc_endpoint *ep;
} shutdown_during_write_test_state;
@@ -291,7 +285,10 @@ static void shutdown_during_write_test_read_handler(
if (error != GRPC_ENDPOINT_CB_OK) {
grpc_endpoint_destroy(st->ep);
- gpr_event_set(&st->ev, (void *)(gpr_intptr) error);
+ gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
+ st->done = error;
+ grpc_pollset_kick(g_pollset);
+ gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
} else {
grpc_endpoint_notify_on_read(
st->ep, shutdown_during_write_test_read_handler, user_data);
@@ -310,7 +307,10 @@ static void shutdown_during_write_test_write_handler(
gpr_log(GPR_ERROR,
"shutdown_during_write_test_write_handler completed unexpectedly");
}
- gpr_event_set(&st->ev, (void *)(gpr_intptr) 1);
+ gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
+ st->done = 1;
+ grpc_pollset_kick(g_pollset);
+ gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
}
static void shutdown_during_write_test(grpc_endpoint_test_config config,
@@ -323,14 +323,15 @@ static void shutdown_during_write_test(grpc_endpoint_test_config config,
shutdown_during_write_test_state read_st;
shutdown_during_write_test_state write_st;
gpr_slice *slices;
- grpc_endpoint_test_fixture f = begin_test(config, "shutdown_during_write_test", slice_size);
+ grpc_endpoint_test_fixture f =
+ begin_test(config, "shutdown_during_write_test", slice_size);
gpr_log(GPR_INFO, "testing shutdown during a write");
read_st.ep = f.client_ep;
write_st.ep = f.server_ep;
- gpr_event_init(&read_st.ev);
- gpr_event_init(&write_st.ev);
+ read_st.done = 0;
+ write_st.done = 0;
grpc_endpoint_notify_on_read(
read_st.ep, shutdown_during_write_test_read_handler, &read_st);
@@ -347,9 +348,19 @@ static void shutdown_during_write_test(grpc_endpoint_test_config config,
case GRPC_ENDPOINT_WRITE_PENDING:
grpc_endpoint_shutdown(write_st.ep);
deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10);
- GPR_ASSERT(gpr_event_wait(&write_st.ev, deadline));
+ gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
+ while (!write_st.done) {
+ GPR_ASSERT(gpr_time_cmp(gpr_now(), deadline) < 0);
+ grpc_pollset_work(g_pollset, deadline);
+ }
+ gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
grpc_endpoint_destroy(write_st.ep);
- GPR_ASSERT(gpr_event_wait(&read_st.ev, deadline));
+ gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
+ while (!read_st.done) {
+ GPR_ASSERT(gpr_time_cmp(gpr_now(), deadline) < 0);
+ grpc_pollset_work(g_pollset, deadline);
+ }
+ gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
gpr_free(slices);
end_test(config);
return;
@@ -361,9 +372,12 @@ static void shutdown_during_write_test(grpc_endpoint_test_config config,
abort();
}
-void grpc_endpoint_tests(grpc_endpoint_test_config config) {
+void grpc_endpoint_tests(grpc_endpoint_test_config config,
+ grpc_pollset *pollset) {
+ g_pollset = pollset;
read_and_write_test(config, 10000000, 100000, 8192, 0);
read_and_write_test(config, 1000000, 100000, 1, 0);
read_and_write_test(config, 100000000, 100000, 1, 1);
shutdown_during_write_test(config, 1000);
+ g_pollset = NULL;
}
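
The hunks above replace every per-test gpr_mu/gpr_cv pair with the test pollset's own mutex: completion callbacks record their result and kick the pollset, and the test thread drives I/O itself by polling in a loop. A minimal sketch of the pattern, using only calls that appear in this diff (g_pollset, done, and deadline stand in for the per-test state):

    /* completion callback (runs on an iomgr thread) */
    gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
    state->done = 1;
    grpc_pollset_kick(g_pollset); /* wake a thread blocked in grpc_pollset_work */
    gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));

    /* test thread: wait for completion while driving the pollset */
    gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
    while (!state.done) {
      GPR_ASSERT(gpr_time_cmp(gpr_now(), deadline) < 0); /* fail, don't hang */
      grpc_pollset_work(g_pollset, deadline);            /* runs pending callbacks */
    }
    gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));

Note that the callback locks the same mutex the waiting thread holds, which only works because grpc_pollset_work releases the pollset mutex internally while it polls.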
diff --git a/test/core/iomgr/endpoint_tests.h b/test/core/iomgr/endpoint_tests.h
index 1679d7bd4f..700f854891 100644
--- a/test/core/iomgr/endpoint_tests.h
+++ b/test/core/iomgr/endpoint_tests.h
@@ -52,6 +52,7 @@ struct grpc_endpoint_test_config {
void (*clean_up)();
};
-void grpc_endpoint_tests(grpc_endpoint_test_config config);
+void grpc_endpoint_tests(grpc_endpoint_test_config config,
+ grpc_pollset *pollset);
-#endif /* GRPC_TEST_CORE_IOMGR_ENDPOINT_TESTS_H */
+#endif  /* GRPC_TEST_CORE_IOMGR_ENDPOINT_TESTS_H */
diff --git a/test/core/iomgr/fd_conservation_posix_test.c b/test/core/iomgr/fd_conservation_posix_test.c
new file mode 100644
index 0000000000..aa4551f2f1
--- /dev/null
+++ b/test/core/iomgr/fd_conservation_posix_test.c
@@ -0,0 +1,63 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <sys/resource.h>
+
+#include <grpc/support/log.h>
+
+#include "test/core/util/test_config.h"
+#include "src/core/iomgr/endpoint_pair.h"
+#include "src/core/iomgr/iomgr.h"
+
+int main(int argc, char **argv) {
+ int i;
+ struct rlimit rlim;
+ grpc_endpoint_pair p;
+ grpc_test_init(argc, argv);
+ grpc_iomgr_init();
+
+ /* set max # of file descriptors to a low value, and
+ verify we can create and destroy many more than this number
+ of descriptors */
+ rlim.rlim_cur = rlim.rlim_max = 10;
+ GPR_ASSERT(0 == setrlimit(RLIMIT_NOFILE, &rlim));
+
+ for (i = 0; i < 100; i++) {
+ p = grpc_iomgr_create_endpoint_pair("test", 1);
+ grpc_endpoint_destroy(p.client);
+ grpc_endpoint_destroy(p.server);
+ }
+
+ grpc_iomgr_shutdown();
+ return 0;
+}
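
The new test caps the process at 10 file descriptors and then creates and destroys 100 endpoint pairs, so it only passes if each destroyed endpoint promptly returns its descriptors to the OS. A hedged, standalone sketch of how such an rlimit cap behaves (plain POSIX, not part of the test above):

    #include <sys/resource.h>
    #include <fcntl.h>
    #include <errno.h>
    #include <assert.h>

    /* With rlim_cur == 10 and fds 0-2 taken by stdio, open() should
       succeed at most 7 more times, then fail with EMFILE. */
    static int count_available_fds(void) {
      struct rlimit rlim;
      int n = 0;
      rlim.rlim_cur = rlim.rlim_max = 10;
      assert(setrlimit(RLIMIT_NOFILE, &rlim) == 0);
      while (open("/dev/null", O_RDONLY) >= 0) n++;
      assert(errno == EMFILE);
      return n;
    }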
diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c
index 2c8a89e4cd..fe08ec495f 100644
--- a/test/core/iomgr/fd_posix_test.c
+++ b/test/core/iomgr/fd_posix_test.c
@@ -51,6 +51,8 @@
#include <grpc/support/time.h>
#include "test/core/util/test_config.h"
+static grpc_pollset g_pollset;
+
/* buffer size used to send and receive data.
1024 is the minimal value to set TCP send and receive buffer. */
#define BUF_SIZE 1024
@@ -94,16 +96,12 @@ void no_op_cb(void *arg, int success) {}
typedef struct {
grpc_fd *em_fd; /* listening fd */
ssize_t read_bytes_total; /* total number of received bytes */
- gpr_mu mu; /* protect done and done_cv */
- gpr_cv done_cv; /* signaled when a server finishes serving */
int done; /* set to 1 when a server finishes serving */
grpc_iomgr_closure listen_closure;
} server;
static void server_init(server *sv) {
sv->read_bytes_total = 0;
- gpr_mu_init(&sv->mu);
- gpr_cv_init(&sv->done_cv);
sv->done = 0;
}
@@ -122,7 +120,7 @@ static void session_shutdown_cb(void *arg, /*session*/
int success) {
session *se = arg;
server *sv = se->sv;
- grpc_fd_orphan(se->em_fd, NULL, NULL);
+ grpc_fd_orphan(se->em_fd, NULL, "a");
gpr_free(se);
/* Start to shutdown listen fd. */
grpc_fd_shutdown(sv->em_fd);
@@ -177,12 +175,12 @@ static void session_read_cb(void *arg, /*session*/
static void listen_shutdown_cb(void *arg /*server*/, int success) {
server *sv = arg;
- grpc_fd_orphan(sv->em_fd, NULL, NULL);
+ grpc_fd_orphan(sv->em_fd, NULL, "b");
- gpr_mu_lock(&sv->mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
sv->done = 1;
- gpr_cv_signal(&sv->done_cv);
- gpr_mu_unlock(&sv->mu);
+ grpc_pollset_kick(&g_pollset);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
/* Called when a new TCP connection request arrives in the listening port. */
@@ -209,6 +207,7 @@ static void listen_cb(void *arg, /*=sv_arg*/
se = gpr_malloc(sizeof(*se));
se->sv = sv;
se->em_fd = grpc_fd_create(fd, "listener");
+ grpc_pollset_add_fd(&g_pollset, se->em_fd);
se->session_read_closure.cb = session_read_cb;
se->session_read_closure.cb_arg = se;
grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
@@ -237,6 +236,7 @@ static int server_start(server *sv) {
GPR_ASSERT(listen(fd, MAX_NUM_FD) == 0);
sv->em_fd = grpc_fd_create(fd, "server");
+ grpc_pollset_add_fd(&g_pollset, sv->em_fd);
/* Register to be interested in reading from listen_fd. */
sv->listen_closure.cb = listen_cb;
sv->listen_closure.cb_arg = sv;
@@ -247,12 +247,11 @@ static int server_start(server *sv) {
/* Wait for and shut down a server. */
static void server_wait_and_shutdown(server *sv) {
- gpr_mu_lock(&sv->mu);
- while (!sv->done) gpr_cv_wait(&sv->done_cv, &sv->mu, gpr_inf_future);
- gpr_mu_unlock(&sv->mu);
-
- gpr_mu_destroy(&sv->mu);
- gpr_cv_destroy(&sv->done_cv);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
+ while (!sv->done) {
+ grpc_pollset_work(&g_pollset, gpr_inf_future);
+ }
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
/* ===An upload client to test notify_on_write=== */
@@ -271,9 +270,7 @@ typedef struct {
notify_on_write to schedule another write. */
int client_write_cnt;
- gpr_mu mu; /* protect done and done_cv */
- gpr_cv done_cv; /* signaled when a client finishes sending */
- int done; /* set to 1 when a client finishes sending */
+ int done; /* set to 1 when a client finishes sending */
grpc_iomgr_closure write_closure;
} client;
@@ -281,17 +278,15 @@ static void client_init(client *cl) {
memset(cl->write_buf, 0, sizeof(cl->write_buf));
cl->write_bytes_total = 0;
cl->client_write_cnt = 0;
- gpr_mu_init(&cl->mu);
- gpr_cv_init(&cl->done_cv);
cl->done = 0;
}
/* Called when a client upload session is ready to shutdown. */
static void client_session_shutdown_cb(void *arg /*client*/, int success) {
client *cl = arg;
- grpc_fd_orphan(cl->em_fd, NULL, NULL);
+ grpc_fd_orphan(cl->em_fd, NULL, "c");
cl->done = 1;
- gpr_cv_signal(&cl->done_cv);
+ grpc_pollset_kick(&g_pollset);
}
/* Write as much as possible, then register notify_on_write. */
@@ -302,9 +297,9 @@ static void client_session_write(void *arg, /*client*/
ssize_t write_once = 0;
if (!success) {
- gpr_mu_lock(&cl->mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
client_session_shutdown_cb(arg, 1);
- gpr_mu_unlock(&cl->mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
return;
}
@@ -314,7 +309,7 @@ static void client_session_write(void *arg, /*client*/
} while (write_once > 0);
if (errno == EAGAIN) {
- gpr_mu_lock(&cl->mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) {
cl->write_closure.cb = client_session_write;
cl->write_closure.cb_arg = cl;
@@ -323,7 +318,7 @@ static void client_session_write(void *arg, /*client*/
} else {
client_session_shutdown_cb(arg, 1);
}
- gpr_mu_unlock(&cl->mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
} else {
gpr_log(GPR_ERROR, "unknown errno %s", strerror(errno));
abort();
@@ -352,18 +347,18 @@ static void client_start(client *cl, int port) {
}
cl->em_fd = grpc_fd_create(fd, "client");
+ grpc_pollset_add_fd(&g_pollset, cl->em_fd);
client_session_write(cl, 1);
}
/* Wait for the signal to shutdown a client. */
static void client_wait_and_shutdown(client *cl) {
- gpr_mu_lock(&cl->mu);
- while (!cl->done) gpr_cv_wait(&cl->done_cv, &cl->mu, gpr_inf_future);
- gpr_mu_unlock(&cl->mu);
-
- gpr_mu_destroy(&cl->mu);
- gpr_cv_destroy(&cl->done_cv);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
+ while (!cl->done) {
+ grpc_pollset_work(&g_pollset, gpr_inf_future);
+ }
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
/* Test grpc_fd. Start an upload server and client, upload a stream of
@@ -385,38 +380,29 @@ static void test_grpc_fd(void) {
}
typedef struct fd_change_data {
- gpr_mu mu;
- gpr_cv cv;
void (*cb_that_ran)(void *, int success);
} fd_change_data;
-void init_change_data(fd_change_data *fdc) {
- gpr_mu_init(&fdc->mu);
- gpr_cv_init(&fdc->cv);
- fdc->cb_that_ran = NULL;
-}
+void init_change_data(fd_change_data *fdc) { fdc->cb_that_ran = NULL; }
-void destroy_change_data(fd_change_data *fdc) {
- gpr_mu_destroy(&fdc->mu);
- gpr_cv_destroy(&fdc->cv);
-}
+void destroy_change_data(fd_change_data *fdc) {}
static void first_read_callback(void *arg /* fd_change_data */, int success) {
fd_change_data *fdc = arg;
- gpr_mu_lock(&fdc->mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
fdc->cb_that_ran = first_read_callback;
- gpr_cv_signal(&fdc->cv);
- gpr_mu_unlock(&fdc->mu);
+ grpc_pollset_kick(&g_pollset);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
static void second_read_callback(void *arg /* fd_change_data */, int success) {
fd_change_data *fdc = arg;
- gpr_mu_lock(&fdc->mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
fdc->cb_that_ran = second_read_callback;
- gpr_cv_signal(&fdc->cv);
- gpr_mu_unlock(&fdc->mu);
+ grpc_pollset_kick(&g_pollset);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
/* Test that changing the callback we use for notify_on_read actually works.
@@ -448,6 +434,7 @@ static void test_grpc_fd_change(void) {
GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change");
+ grpc_pollset_add_fd(&g_pollset, em_fd);
/* Register the first callback, then make its FD readable */
grpc_fd_notify_on_read(em_fd, &first_closure);
@@ -456,12 +443,12 @@ static void test_grpc_fd_change(void) {
GPR_ASSERT(result == 1);
/* And now wait for it to run. */
- gpr_mu_lock(&a.mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (a.cb_that_ran == NULL) {
- gpr_cv_wait(&a.cv, &a.mu, gpr_inf_future);
+ grpc_pollset_work(&g_pollset, gpr_inf_future);
}
GPR_ASSERT(a.cb_that_ran == first_read_callback);
- gpr_mu_unlock(&a.mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
/* And drain the socket so we can generate a new read edge */
result = read(sv[0], &data, 1);
@@ -474,25 +461,29 @@ static void test_grpc_fd_change(void) {
result = write(sv[1], &data, 1);
GPR_ASSERT(result == 1);
- gpr_mu_lock(&b.mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (b.cb_that_ran == NULL) {
- gpr_cv_wait(&b.cv, &b.mu, gpr_inf_future);
+ grpc_pollset_work(&g_pollset, gpr_inf_future);
}
/* Except now we verify that second_read_callback ran instead */
GPR_ASSERT(b.cb_that_ran == second_read_callback);
- gpr_mu_unlock(&b.mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
- grpc_fd_orphan(em_fd, NULL, NULL);
+ grpc_fd_orphan(em_fd, NULL, "d");
destroy_change_data(&a);
destroy_change_data(&b);
close(sv[1]);
}
+static void destroy_pollset(void *p) { grpc_pollset_destroy(p); }
+
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
grpc_iomgr_init();
+ grpc_pollset_init(&g_pollset);
test_grpc_fd();
test_grpc_fd_change();
+ grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset);
grpc_iomgr_shutdown();
return 0;
}
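
Note the teardown order at the bottom of main: pollset shutdown is asynchronous, so destruction is deferred to a callback that runs once the pollset has quiesced, and it must happen before grpc_iomgr_shutdown. The idiom, exactly as the tests now use it:

    static void destroy_pollset(void *p) { grpc_pollset_destroy(p); }

    /* ... at the end of main ... */
    grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset);
    grpc_iomgr_shutdown();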
diff --git a/test/core/iomgr/poll_kick_posix_test.c b/test/core/iomgr/poll_kick_posix_test.c
index 2c5b444d3a..3aa6807806 100644
--- a/test/core/iomgr/poll_kick_posix_test.c
+++ b/test/core/iomgr/poll_kick_posix_test.c
@@ -31,7 +31,7 @@
*
*/
-#include "src/core/iomgr/pollset_kick.h"
+#include "src/core/iomgr/pollset_kick_posix.h"
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
@@ -45,31 +45,31 @@ static void test_allocation(void) {
static void test_non_kick(void) {
grpc_pollset_kick_state state;
- int fd;
+ grpc_kick_fd_info *kfd;
grpc_pollset_kick_init(&state);
- fd = grpc_pollset_kick_pre_poll(&state);
- GPR_ASSERT(fd >= 0);
+ kfd = grpc_pollset_kick_pre_poll(&state);
+ GPR_ASSERT(kfd != NULL);
- grpc_pollset_kick_post_poll(&state);
+ grpc_pollset_kick_post_poll(&state, kfd);
grpc_pollset_kick_destroy(&state);
}
static void test_basic_kick(void) {
/* Kicked during poll */
grpc_pollset_kick_state state;
- int fd;
+ grpc_kick_fd_info *kfd;
grpc_pollset_kick_init(&state);
- fd = grpc_pollset_kick_pre_poll(&state);
- GPR_ASSERT(fd >= 0);
+ kfd = grpc_pollset_kick_pre_poll(&state);
+ GPR_ASSERT(kfd != NULL);
grpc_pollset_kick_kick(&state);
/* Now hypothetically we polled and found that we were kicked */
- grpc_pollset_kick_consume(&state);
+ grpc_pollset_kick_consume(&state, kfd);
- grpc_pollset_kick_post_poll(&state);
+ grpc_pollset_kick_post_poll(&state, kfd);
grpc_pollset_kick_destroy(&state);
}
@@ -77,13 +77,13 @@ static void test_basic_kick(void) {
static void test_non_poll_kick(void) {
/* Kick before entering poll */
grpc_pollset_kick_state state;
- int fd;
+ grpc_kick_fd_info *kfd;
grpc_pollset_kick_init(&state);
grpc_pollset_kick_kick(&state);
- fd = grpc_pollset_kick_pre_poll(&state);
- GPR_ASSERT(fd < 0);
+ kfd = grpc_pollset_kick_pre_poll(&state);
+ GPR_ASSERT(kfd == NULL);
grpc_pollset_kick_destroy(&state);
}
@@ -92,20 +92,20 @@ static void test_non_poll_kick(void) {
static void test_over_free(void) {
/* Check high watermark pipe free logic */
int i;
- struct grpc_pollset_kick_state *kick_state =
- gpr_malloc(sizeof(grpc_pollset_kick_state) * GRPC_MAX_CACHED_PIPES);
+ grpc_kick_fd_info **kfds =
+ gpr_malloc(sizeof(grpc_kick_fd_info *) * GRPC_MAX_CACHED_PIPES);
+ grpc_pollset_kick_state state;
+ grpc_pollset_kick_init(&state);
for (i = 0; i < GRPC_MAX_CACHED_PIPES; ++i) {
- int fd;
- grpc_pollset_kick_init(&kick_state[i]);
- fd = grpc_pollset_kick_pre_poll(&kick_state[i]);
- GPR_ASSERT(fd >= 0);
+ kfds[i] = grpc_pollset_kick_pre_poll(&state);
+ GPR_ASSERT(kfds[i] != NULL);
}
for (i = 0; i < GRPC_MAX_CACHED_PIPES; ++i) {
- grpc_pollset_kick_post_poll(&kick_state[i]);
- grpc_pollset_kick_destroy(&kick_state[i]);
+ grpc_pollset_kick_post_poll(&state, kfds[i]);
}
- gpr_free(kick_state);
+ grpc_pollset_kick_destroy(&state);
+ gpr_free(kfds);
}
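
This test now exercises the reworked kick API from pollset_kick_posix.h: grpc_pollset_kick_pre_poll returns a grpc_kick_fd_info* instead of a raw fd (NULL meaning a kick already arrived before polling started), grpc_pollset_kick_consume acknowledges a kick observed while polling, and grpc_pollset_kick_post_poll returns the fd to the cache. A minimal round trip, assuming nothing beyond the calls shown in this file:

    grpc_pollset_kick_state state;
    grpc_kick_fd_info *kfd;
    grpc_pollset_kick_init(&state);
    kfd = grpc_pollset_kick_pre_poll(&state); /* arm before entering poll */
    if (kfd == NULL) {
      /* kicked before polling: no need to poll at all */
    } else {
      /* ... poll(), watching the kick fd along with the real fds ... */
      grpc_pollset_kick_consume(&state, kfd);   /* the kick fd fired */
      grpc_pollset_kick_post_poll(&state, kfd); /* recycle the fd */
    }
    grpc_pollset_kick_destroy(&state);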
static void run_tests(void) {
diff --git a/test/core/iomgr/tcp_client_posix_test.c b/test/core/iomgr/tcp_client_posix_test.c
index 3c4d8fed4f..b673c032b2 100644
--- a/test/core/iomgr/tcp_client_posix_test.c
+++ b/test/core/iomgr/tcp_client_posix_test.c
@@ -45,20 +45,31 @@
#include <grpc/support/time.h>
#include "test/core/util/test_config.h"
+static grpc_pollset_set g_pollset_set;
+static grpc_pollset g_pollset;
+static int g_connections_complete = 0;
+
static gpr_timespec test_deadline(void) {
return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10);
}
+static void finish_connection() {
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
+ g_connections_complete++;
+ grpc_pollset_kick(&g_pollset);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+}
+
static void must_succeed(void *arg, grpc_endpoint *tcp) {
GPR_ASSERT(tcp);
grpc_endpoint_shutdown(tcp);
grpc_endpoint_destroy(tcp);
- gpr_event_set(arg, (void *)1);
+ finish_connection();
}
static void must_fail(void *arg, grpc_endpoint *tcp) {
GPR_ASSERT(!tcp);
- gpr_event_set(arg, (void *)1);
+ finish_connection();
}
void test_succeeds(void) {
@@ -66,9 +77,7 @@ void test_succeeds(void) {
socklen_t addr_len = sizeof(addr);
int svr_fd;
int r;
- gpr_event ev;
-
- gpr_event_init(&ev);
+ int connections_complete_before;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
@@ -79,10 +88,14 @@ void test_succeeds(void) {
GPR_ASSERT(0 == bind(svr_fd, (struct sockaddr *)&addr, addr_len));
GPR_ASSERT(0 == listen(svr_fd, 1));
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
+ connections_complete_before = g_connections_complete;
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+
/* connect to it */
GPR_ASSERT(getsockname(svr_fd, (struct sockaddr *)&addr, &addr_len) == 0);
- grpc_tcp_client_connect(must_succeed, &ev, (struct sockaddr *)&addr, addr_len,
- gpr_inf_future);
+ grpc_tcp_client_connect(must_succeed, NULL, &g_pollset_set,
+ (struct sockaddr *)&addr, addr_len, gpr_inf_future);
/* await the connection */
do {
@@ -92,26 +105,39 @@ void test_succeeds(void) {
GPR_ASSERT(r >= 0);
close(r);
- /* wait for the connection callback to finish */
- GPR_ASSERT(gpr_event_wait(&ev, test_deadline()));
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
+
+ while (g_connections_complete == connections_complete_before) {
+ grpc_pollset_work(&g_pollset, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5));
+ }
+
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
void test_fails(void) {
struct sockaddr_in addr;
socklen_t addr_len = sizeof(addr);
- gpr_event ev;
-
- gpr_event_init(&ev);
+ int connections_complete_before;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
+ connections_complete_before = g_connections_complete;
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+
/* connect to a broken address */
- grpc_tcp_client_connect(must_fail, &ev, (struct sockaddr *)&addr, addr_len,
- gpr_inf_future);
+ grpc_tcp_client_connect(must_fail, NULL, &g_pollset_set,
+ (struct sockaddr *)&addr, addr_len, gpr_inf_future);
+
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
/* wait for the connection callback to finish */
- GPR_ASSERT(gpr_event_wait(&ev, test_deadline()));
+ while (g_connections_complete == connections_complete_before) {
+ grpc_pollset_work(&g_pollset, test_deadline());
+ }
+
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
void test_times_out(void) {
@@ -122,11 +148,9 @@ void test_times_out(void) {
int client_fd[NUM_CLIENT_CONNECTS];
int i;
int r;
- gpr_event ev;
+ int connections_complete_before;
gpr_timespec connect_deadline;
- gpr_event_init(&ev);
-
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
@@ -153,28 +177,50 @@ void test_times_out(void) {
connect_deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1);
- grpc_tcp_client_connect(must_fail, &ev, (struct sockaddr *)&addr, addr_len,
- connect_deadline);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
+ connections_complete_before = g_connections_complete;
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+
+ grpc_tcp_client_connect(must_fail, NULL, &g_pollset_set,
+ (struct sockaddr *)&addr, addr_len, connect_deadline);
+
/* Make sure the event doesn't trigger early */
- GPR_ASSERT(!gpr_event_wait(&ev, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(500)));
- /* Now wait until it should have triggered */
- sleep(1);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
+ while (gpr_time_cmp(gpr_time_add(connect_deadline, gpr_time_from_seconds(2)),
+ gpr_now()) > 0) {
+ int is_after_deadline = gpr_time_cmp(connect_deadline, gpr_now()) <= 0;
+ if (is_after_deadline &&
+ gpr_time_cmp(gpr_time_add(connect_deadline, gpr_time_from_seconds(1)),
+ gpr_now()) > 0) {
+ /* allow some slack before insisting that things be done */
+ } else {
+ GPR_ASSERT(g_connections_complete ==
+ connections_complete_before + is_after_deadline);
+ }
+ grpc_pollset_work(&g_pollset, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10));
+ }
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
- /* wait for the connection callback to finish */
- GPR_ASSERT(gpr_event_wait(&ev, test_deadline()));
close(svr_fd);
for (i = 0; i < NUM_CLIENT_CONNECTS; ++i) {
close(client_fd[i]);
}
}
+static void destroy_pollset(void *p) { grpc_pollset_destroy(p); }
+
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
grpc_iomgr_init();
+ grpc_pollset_set_init(&g_pollset_set);
+ grpc_pollset_init(&g_pollset);
+ grpc_pollset_set_add_pollset(&g_pollset_set, &g_pollset);
test_succeeds();
gpr_log(GPR_ERROR, "End of first test");
test_fails();
test_times_out();
+ grpc_pollset_set_destroy(&g_pollset_set);
+ grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset);
grpc_iomgr_shutdown();
return 0;
}
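
grpc_tcp_client_connect loses its user-supplied gpr_event argument and instead takes a grpc_pollset_set: the set of pollsets that may need to poll the in-flight connection. Completion is then observed by polling rather than by gpr_event_wait. The new call shape, using only names from this diff:

    grpc_pollset_set_init(&g_pollset_set);
    grpc_pollset_init(&g_pollset);
    grpc_pollset_set_add_pollset(&g_pollset_set, &g_pollset);

    grpc_tcp_client_connect(must_succeed, NULL /* cb arg */, &g_pollset_set,
                            (struct sockaddr *)&addr, addr_len, gpr_inf_future);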
diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c
index 2cfcc8311c..a23c64928e 100644
--- a/test/core/iomgr/tcp_posix_test.c
+++ b/test/core/iomgr/tcp_posix_test.c
@@ -48,6 +48,8 @@
#include "test/core/util/test_config.h"
#include "test/core/iomgr/endpoint_tests.h"
+static grpc_pollset g_pollset;
+
/*
General test notes:
@@ -114,8 +116,6 @@ static size_t fill_socket_partial(int fd, size_t bytes) {
struct read_socket_state {
grpc_endpoint *ep;
- gpr_mu mu;
- gpr_cv cv;
ssize_t read_bytes;
ssize_t target_read_bytes;
};
@@ -145,18 +145,18 @@ static void read_cb(void *user_data, gpr_slice *slices, size_t nslices,
GPR_ASSERT(error == GRPC_ENDPOINT_CB_OK);
- gpr_mu_lock(&state->mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
current_data = state->read_bytes % 256;
read_bytes = count_and_unref_slices(slices, nslices, &current_data);
state->read_bytes += read_bytes;
gpr_log(GPR_INFO, "Read %d bytes of %d", read_bytes,
state->target_read_bytes);
if (state->read_bytes >= state->target_read_bytes) {
- gpr_cv_signal(&state->cv);
+ /* empty */
} else {
grpc_endpoint_notify_on_read(state->ep, read_cb, state);
}
- gpr_mu_unlock(&state->mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
/* Write to a socket, then read from it using the grpc_tcp API. */
@@ -173,31 +173,25 @@ static void read_test(ssize_t num_bytes, ssize_t slice_size) {
create_sockets(sv);
ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), slice_size);
+ grpc_endpoint_add_to_pollset(ep, &g_pollset);
+
written_bytes = fill_socket_partial(sv[0], num_bytes);
gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes);
- gpr_mu_init(&state.mu);
- gpr_cv_init(&state.cv);
state.ep = ep;
state.read_bytes = 0;
state.target_read_bytes = written_bytes;
grpc_endpoint_notify_on_read(ep, read_cb, &state);
- gpr_mu_lock(&state.mu);
- for (;;) {
- GPR_ASSERT(gpr_cv_wait(&state.cv, &state.mu, deadline) == 0);
- if (state.read_bytes >= state.target_read_bytes) {
- break;
- }
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
+ while (state.read_bytes < state.target_read_bytes) {
+ grpc_pollset_work(&g_pollset, deadline);
}
GPR_ASSERT(state.read_bytes == state.target_read_bytes);
- gpr_mu_unlock(&state.mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
grpc_endpoint_destroy(ep);
-
- gpr_mu_destroy(&state.mu);
- gpr_cv_destroy(&state.cv);
}
/* Write to a socket until it fills up, then read from it using the grpc_tcp
@@ -214,37 +208,29 @@ static void large_read_test(ssize_t slice_size) {
create_sockets(sv);
ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test"), slice_size);
+ grpc_endpoint_add_to_pollset(ep, &g_pollset);
+
written_bytes = fill_socket(sv[0]);
gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes);
- gpr_mu_init(&state.mu);
- gpr_cv_init(&state.cv);
state.ep = ep;
state.read_bytes = 0;
state.target_read_bytes = written_bytes;
grpc_endpoint_notify_on_read(ep, read_cb, &state);
- gpr_mu_lock(&state.mu);
- for (;;) {
- GPR_ASSERT(gpr_cv_wait(&state.cv, &state.mu, deadline) == 0);
- if (state.read_bytes >= state.target_read_bytes) {
- break;
- }
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
+ while (state.read_bytes < state.target_read_bytes) {
+ grpc_pollset_work(&g_pollset, deadline);
}
GPR_ASSERT(state.read_bytes == state.target_read_bytes);
- gpr_mu_unlock(&state.mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
grpc_endpoint_destroy(ep);
-
- gpr_mu_destroy(&state.mu);
- gpr_cv_destroy(&state.cv);
}
struct write_socket_state {
grpc_endpoint *ep;
- gpr_mu mu;
- gpr_cv cv;
int write_done;
};
@@ -275,11 +261,11 @@ static void write_done(void *user_data /* write_socket_state */,
grpc_endpoint_cb_status error) {
struct write_socket_state *state = (struct write_socket_state *)user_data;
gpr_log(GPR_INFO, "Write done callback called");
- gpr_mu_lock(&state->mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
gpr_log(GPR_INFO, "Signalling write done");
state->write_done = 1;
- gpr_cv_signal(&state->cv);
- gpr_mu_unlock(&state->mu);
+ grpc_pollset_kick(&g_pollset);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) {
@@ -294,6 +280,9 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) {
GPR_ASSERT(fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) == 0);
for (;;) {
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_pollset_work(&g_pollset, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10));
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
do {
bytes_read =
read(fd, buf, bytes_left > read_size ? read_size : bytes_left);
@@ -352,9 +341,8 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) {
ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test"),
GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
+ grpc_endpoint_add_to_pollset(ep, &g_pollset);
- gpr_mu_init(&state.mu);
- gpr_cv_init(&state.cv);
state.ep = ep;
state.write_done = 0;
@@ -367,19 +355,17 @@ static void write_test(ssize_t num_bytes, ssize_t slice_size) {
GPR_ASSERT(read_bytes == num_bytes);
} else {
drain_socket_blocking(sv[0], num_bytes, num_bytes);
- gpr_mu_lock(&state.mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
for (;;) {
if (state.write_done) {
break;
}
- GPR_ASSERT(gpr_cv_wait(&state.cv, &state.mu, deadline) == 0);
+ grpc_pollset_work(&g_pollset, deadline);
}
- gpr_mu_unlock(&state.mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
grpc_endpoint_destroy(ep);
- gpr_mu_destroy(&state.mu);
- gpr_cv_destroy(&state.cv);
gpr_free(slices);
}
@@ -409,10 +395,10 @@ static void write_error_test(ssize_t num_bytes, ssize_t slice_size) {
ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_error_test"),
GRPC_TCP_DEFAULT_READ_SLICE_SIZE);
+ grpc_endpoint_add_to_pollset(ep, &g_pollset);
+
close(sv[0]);
- gpr_mu_init(&state.mu);
- gpr_cv_init(&state.cv);
state.ep = ep;
state.write_done = 0;
@@ -425,20 +411,18 @@ static void write_error_test(ssize_t num_bytes, ssize_t slice_size) {
break;
case GRPC_ENDPOINT_WRITE_PENDING:
grpc_endpoint_notify_on_read(ep, read_done_for_write_error, NULL);
- gpr_mu_lock(&state.mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
for (;;) {
if (state.write_done) {
break;
}
- GPR_ASSERT(gpr_cv_wait(&state.cv, &state.mu, deadline) == 0);
+ grpc_pollset_work(&g_pollset, deadline);
}
- gpr_mu_unlock(&state.mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
break;
}
grpc_endpoint_destroy(ep);
- gpr_mu_destroy(&state.mu);
- gpr_cv_destroy(&state.cv);
free(slices);
}
@@ -479,6 +463,8 @@ static grpc_endpoint_test_fixture create_fixture_tcp_socketpair(
grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client"), slice_size);
f.server_ep =
grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server"), slice_size);
+ grpc_endpoint_add_to_pollset(f.client_ep, &g_pollset);
+ grpc_endpoint_add_to_pollset(f.server_ep, &g_pollset);
return f;
}
@@ -487,11 +473,15 @@ static grpc_endpoint_test_config configs[] = {
{"tcp/tcp_socketpair", create_fixture_tcp_socketpair, clean_up},
};
+static void destroy_pollset(void *p) { grpc_pollset_destroy(p); }
+
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
grpc_init();
+ grpc_pollset_init(&g_pollset);
run_tests();
- grpc_endpoint_tests(configs[0]);
+ grpc_endpoint_tests(configs[0], &g_pollset);
+ grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset);
grpc_shutdown();
return 0;
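
The fixture change is what makes the rest of this file work: an endpoint no longer polls by itself, so each endpoint is registered with the global pollset right after creation, and grpc_endpoint_tests now receives that pollset so its wait loops can call grpc_pollset_work. The registration step, as the fixtures now do it:

    ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), slice_size);
    grpc_endpoint_add_to_pollset(ep, &g_pollset);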
diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c
index 328b19f68a..fb262711c0 100644
--- a/test/core/iomgr/tcp_server_posix_test.c
+++ b/test/core/iomgr/tcp_server_posix_test.c
@@ -45,18 +45,17 @@
#define LOG_TEST(x) gpr_log(GPR_INFO, "%s", #x)
-static gpr_mu mu;
-static gpr_cv cv;
-static int nconnects = 0;
+static grpc_pollset g_pollset;
+static int g_nconnects = 0;
static void on_connect(void *arg, grpc_endpoint *tcp) {
grpc_endpoint_shutdown(tcp);
grpc_endpoint_destroy(tcp);
- gpr_mu_lock(&mu);
- nconnects++;
- gpr_cv_broadcast(&cv);
- gpr_mu_unlock(&mu);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
+ g_nconnects++;
+ grpc_pollset_kick(&g_pollset);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
static void test_no_op(void) {
@@ -106,12 +105,11 @@ static void test_connect(int n) {
grpc_tcp_server *s = grpc_tcp_server_create();
int nconnects_before;
gpr_timespec deadline;
+ grpc_pollset *pollsets[1];
int i;
LOG_TEST("test_connect");
gpr_log(GPR_INFO, "clients=%d", n);
- gpr_mu_lock(&mu);
-
memset(&addr, 0, sizeof(addr));
addr.ss_family = AF_INET;
GPR_ASSERT(grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, addr_len));
@@ -121,38 +119,42 @@ static void test_connect(int n) {
GPR_ASSERT(getsockname(svrfd, (struct sockaddr *)&addr, &addr_len) == 0);
GPR_ASSERT(addr_len <= sizeof(addr));
- grpc_tcp_server_start(s, NULL, 0, on_connect, NULL);
+ pollsets[0] = &g_pollset;
+ grpc_tcp_server_start(s, pollsets, 1, on_connect, NULL);
+
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
for (i = 0; i < n; i++) {
- deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1);
+ deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(4000);
- nconnects_before = nconnects;
+ nconnects_before = g_nconnects;
clifd = socket(addr.ss_family, SOCK_STREAM, 0);
GPR_ASSERT(clifd >= 0);
+ gpr_log(GPR_DEBUG, "start connect");
GPR_ASSERT(connect(clifd, (struct sockaddr *)&addr, addr_len) == 0);
- while (nconnects == nconnects_before) {
- GPR_ASSERT(gpr_cv_wait(&cv, &mu, deadline) == 0);
+ gpr_log(GPR_DEBUG, "wait");
+ while (g_nconnects == nconnects_before &&
+ gpr_time_cmp(deadline, gpr_now()) > 0) {
+ grpc_pollset_work(&g_pollset, deadline);
}
+ gpr_log(GPR_DEBUG, "wait done");
- GPR_ASSERT(nconnects == nconnects_before + 1);
+ GPR_ASSERT(g_nconnects == nconnects_before + 1);
close(clifd);
-
- if (i != n - 1) {
- sleep(1);
- }
}
- gpr_mu_unlock(&mu);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
grpc_tcp_server_destroy(s, NULL, NULL);
}
+static void destroy_pollset(void *p) { grpc_pollset_destroy(p); }
+
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
grpc_iomgr_init();
- gpr_mu_init(&mu);
- gpr_cv_init(&cv);
+ grpc_pollset_init(&g_pollset);
test_no_op();
test_no_op_with_start();
@@ -161,8 +163,7 @@ int main(int argc, char **argv) {
test_connect(1);
test_connect(10);
+ grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset);
grpc_iomgr_shutdown();
- gpr_mu_destroy(&mu);
- gpr_cv_destroy(&cv);
return 0;
}
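
grpc_tcp_server_start similarly takes the pollsets that should poll the listening descriptors (previously NULL, 0), and with connections now driven through grpc_pollset_work the per-iteration sleep(1) is dropped. The start call, as the test now issues it:

    grpc_pollset *pollsets[1];
    pollsets[0] = &g_pollset;
    grpc_tcp_server_start(s, pollsets, 1, on_connect, NULL);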