aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/core/iomgr/tcp_posix_test.c
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-09-22 12:33:20 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-09-22 12:33:20 -0700
commita82950e68318a6aab6fe894fa39f7fa616c4647b (patch)
tree7d02bd1e9e1cbae1f14ad4ad1e06d3ae81a96dfe /test/core/iomgr/tcp_posix_test.c
parent8af4c337181322cc4fb396199c90f574cfb4163f (diff)
clang-format all core files
Diffstat (limited to 'test/core/iomgr/tcp_posix_test.c')
-rw-r--r--test/core/iomgr/tcp_posix_test.c509
1 files changed, 233 insertions, 276 deletions
diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c
index 2ed6157479..d2471c9668 100644
--- a/test/core/iomgr/tcp_posix_test.c
+++ b/test/core/iomgr/tcp_posix_test.c
@@ -66,70 +66,55 @@ static grpc_pollset g_pollset;
*/
-static void
-create_sockets (int sv[2])
-{
+static void create_sockets(int sv[2]) {
int flags;
- GPR_ASSERT (socketpair (AF_UNIX, SOCK_STREAM, 0, sv) == 0);
- flags = fcntl (sv[0], F_GETFL, 0);
- GPR_ASSERT (fcntl (sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
- flags = fcntl (sv[1], F_GETFL, 0);
- GPR_ASSERT (fcntl (sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
+ GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
+ flags = fcntl(sv[0], F_GETFL, 0);
+ GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
+ flags = fcntl(sv[1], F_GETFL, 0);
+ GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
}
-static ssize_t
-fill_socket (int fd)
-{
+static ssize_t fill_socket(int fd) {
ssize_t write_bytes;
ssize_t total_bytes = 0;
int i;
unsigned char buf[256];
- for (i = 0; i < 256; ++i)
- {
- buf[i] = (gpr_uint8) i;
+ for (i = 0; i < 256; ++i) {
+ buf[i] = (gpr_uint8)i;
+ }
+ do {
+ write_bytes = write(fd, buf, 256);
+ if (write_bytes > 0) {
+ total_bytes += write_bytes;
}
- do
- {
- write_bytes = write (fd, buf, 256);
- if (write_bytes > 0)
- {
- total_bytes += write_bytes;
- }
- }
- while (write_bytes >= 0 || errno == EINTR);
- GPR_ASSERT (errno == EAGAIN);
+ } while (write_bytes >= 0 || errno == EINTR);
+ GPR_ASSERT(errno == EAGAIN);
return total_bytes;
}
-static size_t
-fill_socket_partial (int fd, size_t bytes)
-{
+static size_t fill_socket_partial(int fd, size_t bytes) {
ssize_t write_bytes;
size_t total_bytes = 0;
- unsigned char *buf = malloc (bytes);
+ unsigned char *buf = malloc(bytes);
unsigned i;
- for (i = 0; i < bytes; ++i)
- {
- buf[i] = (gpr_uint8) (i % 256);
- }
-
- do
- {
- write_bytes = write (fd, buf, bytes - total_bytes);
- if (write_bytes > 0)
- {
- total_bytes += (size_t) write_bytes;
- }
+ for (i = 0; i < bytes; ++i) {
+ buf[i] = (gpr_uint8)(i % 256);
+ }
+
+ do {
+ write_bytes = write(fd, buf, bytes - total_bytes);
+ if (write_bytes > 0) {
+ total_bytes += (size_t)write_bytes;
}
- while ((write_bytes >= 0 || errno == EINTR) && bytes > total_bytes);
+ } while ((write_bytes >= 0 || errno == EINTR) && bytes > total_bytes);
- gpr_free (buf);
+ gpr_free(buf);
return total_bytes;
}
-struct read_socket_state
-{
+struct read_socket_state {
grpc_endpoint *ep;
size_t read_bytes;
size_t target_read_bytes;
@@ -137,191 +122,176 @@ struct read_socket_state
grpc_closure read_cb;
};
-static size_t
-count_slices (gpr_slice * slices, size_t nslices, int *current_data)
-{
+static size_t count_slices(gpr_slice *slices, size_t nslices,
+ int *current_data) {
size_t num_bytes = 0;
unsigned i, j;
unsigned char *buf;
- for (i = 0; i < nslices; ++i)
- {
- buf = GPR_SLICE_START_PTR (slices[i]);
- for (j = 0; j < GPR_SLICE_LENGTH (slices[i]); ++j)
- {
- GPR_ASSERT (buf[j] == *current_data);
- *current_data = (*current_data + 1) % 256;
- }
- num_bytes += GPR_SLICE_LENGTH (slices[i]);
+ for (i = 0; i < nslices; ++i) {
+ buf = GPR_SLICE_START_PTR(slices[i]);
+ for (j = 0; j < GPR_SLICE_LENGTH(slices[i]); ++j) {
+ GPR_ASSERT(buf[j] == *current_data);
+ *current_data = (*current_data + 1) % 256;
}
+ num_bytes += GPR_SLICE_LENGTH(slices[i]);
+ }
return num_bytes;
}
-static void
-read_cb (grpc_exec_ctx * exec_ctx, void *user_data, int success)
-{
- struct read_socket_state *state = (struct read_socket_state *) user_data;
+static void read_cb(grpc_exec_ctx *exec_ctx, void *user_data, int success) {
+ struct read_socket_state *state = (struct read_socket_state *)user_data;
size_t read_bytes;
int current_data;
- GPR_ASSERT (success);
+ GPR_ASSERT(success);
- gpr_mu_lock (GRPC_POLLSET_MU (&g_pollset));
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
current_data = state->read_bytes % 256;
- read_bytes = count_slices (state->incoming.slices, state->incoming.count, &current_data);
+ read_bytes = count_slices(state->incoming.slices, state->incoming.count,
+ &current_data);
state->read_bytes += read_bytes;
- gpr_log (GPR_INFO, "Read %d bytes of %d", read_bytes, state->target_read_bytes);
- if (state->read_bytes >= state->target_read_bytes)
- {
- gpr_mu_unlock (GRPC_POLLSET_MU (&g_pollset));
- }
- else
- {
- grpc_endpoint_read (exec_ctx, state->ep, &state->incoming, &state->read_cb);
- gpr_mu_unlock (GRPC_POLLSET_MU (&g_pollset));
- }
+ gpr_log(GPR_INFO, "Read %d bytes of %d", read_bytes,
+ state->target_read_bytes);
+ if (state->read_bytes >= state->target_read_bytes) {
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ } else {
+ grpc_endpoint_read(exec_ctx, state->ep, &state->incoming, &state->read_cb);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ }
}
/* Write to a socket, then read from it using the grpc_tcp API. */
-static void
-read_test (size_t num_bytes, size_t slice_size)
-{
+static void read_test(size_t num_bytes, size_t slice_size) {
int sv[2];
grpc_endpoint *ep;
struct read_socket_state state;
size_t written_bytes;
- gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE (20);
+ gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- gpr_log (GPR_INFO, "Read test of size %d, slice size %d", num_bytes, slice_size);
+ gpr_log(GPR_INFO, "Read test of size %d, slice size %d", num_bytes,
+ slice_size);
- create_sockets (sv);
+ create_sockets(sv);
- ep = grpc_tcp_create (grpc_fd_create (sv[1], "read_test"), slice_size, "test");
- grpc_endpoint_add_to_pollset (&exec_ctx, ep, &g_pollset);
+ ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), slice_size, "test");
+ grpc_endpoint_add_to_pollset(&exec_ctx, ep, &g_pollset);
- written_bytes = fill_socket_partial (sv[0], num_bytes);
- gpr_log (GPR_INFO, "Wrote %d bytes", written_bytes);
+ written_bytes = fill_socket_partial(sv[0], num_bytes);
+ gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes);
state.ep = ep;
state.read_bytes = 0;
state.target_read_bytes = written_bytes;
- gpr_slice_buffer_init (&state.incoming);
- grpc_closure_init (&state.read_cb, read_cb, &state);
-
- grpc_endpoint_read (&exec_ctx, ep, &state.incoming, &state.read_cb);
-
- gpr_mu_lock (GRPC_POLLSET_MU (&g_pollset));
- while (state.read_bytes < state.target_read_bytes)
- {
- grpc_pollset_worker worker;
- grpc_pollset_work (&g_pollset, &worker, gpr_now (&exec_ctx, GPR_CLOCK_MONOTONIC), deadline);
- gpr_mu_unlock (GRPC_POLLSET_MU (&g_pollset));
- grpc_exec_ctx_finish (&exec_ctx);
- gpr_mu_lock (GRPC_POLLSET_MU (&g_pollset));
- }
- GPR_ASSERT (state.read_bytes == state.target_read_bytes);
- gpr_mu_unlock (GRPC_POLLSET_MU (&g_pollset));
-
- gpr_slice_buffer_destroy (&state.incoming);
- grpc_endpoint_destroy (&exec_ctx, ep);
- grpc_exec_ctx_finish (&exec_ctx);
+ gpr_slice_buffer_init(&state.incoming);
+ grpc_closure_init(&state.read_cb, read_cb, &state);
+
+ grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb);
+
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
+ while (state.read_bytes < state.target_read_bytes) {
+ grpc_pollset_worker worker;
+ grpc_pollset_work(&g_pollset, &worker,
+ gpr_now(&exec_ctx, GPR_CLOCK_MONOTONIC), deadline);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_exec_ctx_finish(&exec_ctx);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
+ }
+ GPR_ASSERT(state.read_bytes == state.target_read_bytes);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+
+ gpr_slice_buffer_destroy(&state.incoming);
+ grpc_endpoint_destroy(&exec_ctx, ep);
+ grpc_exec_ctx_finish(&exec_ctx);
}
/* Write to a socket until it fills up, then read from it using the grpc_tcp
API. */
-static void
-large_read_test (size_t slice_size)
-{
+static void large_read_test(size_t slice_size) {
int sv[2];
grpc_endpoint *ep;
struct read_socket_state state;
ssize_t written_bytes;
- gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE (20);
+ gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- gpr_log (GPR_INFO, "Start large read test, slice size %d", slice_size);
+ gpr_log(GPR_INFO, "Start large read test, slice size %d", slice_size);
- create_sockets (sv);
+ create_sockets(sv);
- ep = grpc_tcp_create (grpc_fd_create (sv[1], "large_read_test"), slice_size, "test");
- grpc_endpoint_add_to_pollset (&exec_ctx, ep, &g_pollset);
+ ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test"), slice_size,
+ "test");
+ grpc_endpoint_add_to_pollset(&exec_ctx, ep, &g_pollset);
- written_bytes = fill_socket (sv[0]);
- gpr_log (GPR_INFO, "Wrote %d bytes", written_bytes);
+ written_bytes = fill_socket(sv[0]);
+ gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes);
state.ep = ep;
state.read_bytes = 0;
- state.target_read_bytes = (size_t) written_bytes;
- gpr_slice_buffer_init (&state.incoming);
- grpc_closure_init (&state.read_cb, read_cb, &state);
-
- grpc_endpoint_read (&exec_ctx, ep, &state.incoming, &state.read_cb);
-
- gpr_mu_lock (GRPC_POLLSET_MU (&g_pollset));
- while (state.read_bytes < state.target_read_bytes)
- {
- grpc_pollset_worker worker;
- grpc_pollset_work (&g_pollset, &worker, gpr_now (&exec_ctx, GPR_CLOCK_MONOTONIC), deadline);
- gpr_mu_unlock (GRPC_POLLSET_MU (&g_pollset));
- grpc_exec_ctx_finish (&exec_ctx);
- gpr_mu_lock (GRPC_POLLSET_MU (&g_pollset));
- }
- GPR_ASSERT (state.read_bytes == state.target_read_bytes);
- gpr_mu_unlock (GRPC_POLLSET_MU (&g_pollset));
-
- gpr_slice_buffer_destroy (&state.incoming);
- grpc_endpoint_destroy (&exec_ctx, ep);
- grpc_exec_ctx_finish (&exec_ctx);
+ state.target_read_bytes = (size_t)written_bytes;
+ gpr_slice_buffer_init(&state.incoming);
+ grpc_closure_init(&state.read_cb, read_cb, &state);
+
+ grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb);
+
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
+ while (state.read_bytes < state.target_read_bytes) {
+ grpc_pollset_worker worker;
+ grpc_pollset_work(&g_pollset, &worker,
+ gpr_now(&exec_ctx, GPR_CLOCK_MONOTONIC), deadline);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_exec_ctx_finish(&exec_ctx);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
+ }
+ GPR_ASSERT(state.read_bytes == state.target_read_bytes);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+
+ gpr_slice_buffer_destroy(&state.incoming);
+ grpc_endpoint_destroy(&exec_ctx, ep);
+ grpc_exec_ctx_finish(&exec_ctx);
}
-struct write_socket_state
-{
+struct write_socket_state {
grpc_endpoint *ep;
int write_done;
};
-static gpr_slice *
-allocate_blocks (size_t num_bytes, size_t slice_size, size_t * num_blocks, gpr_uint8 * current_data)
-{
+static gpr_slice *allocate_blocks(size_t num_bytes, size_t slice_size,
+ size_t *num_blocks, gpr_uint8 *current_data) {
size_t nslices = num_bytes / slice_size + (num_bytes % slice_size ? 1u : 0u);
- gpr_slice *slices = gpr_malloc (sizeof (gpr_slice) * nslices);
+ gpr_slice *slices = gpr_malloc(sizeof(gpr_slice) * nslices);
size_t num_bytes_left = num_bytes;
unsigned i, j;
unsigned char *buf;
*num_blocks = nslices;
- for (i = 0; i < nslices; ++i)
- {
- slices[i] = gpr_slice_malloc (slice_size > num_bytes_left ? num_bytes_left : slice_size);
- num_bytes_left -= GPR_SLICE_LENGTH (slices[i]);
- buf = GPR_SLICE_START_PTR (slices[i]);
- for (j = 0; j < GPR_SLICE_LENGTH (slices[i]); ++j)
- {
- buf[j] = *current_data;
- (*current_data)++;
- }
+ for (i = 0; i < nslices; ++i) {
+ slices[i] = gpr_slice_malloc(slice_size > num_bytes_left ? num_bytes_left
+ : slice_size);
+ num_bytes_left -= GPR_SLICE_LENGTH(slices[i]);
+ buf = GPR_SLICE_START_PTR(slices[i]);
+ for (j = 0; j < GPR_SLICE_LENGTH(slices[i]); ++j) {
+ buf[j] = *current_data;
+ (*current_data)++;
}
- GPR_ASSERT (num_bytes_left == 0);
+ }
+ GPR_ASSERT(num_bytes_left == 0);
return slices;
}
-static void
-write_done (void *user_data /* write_socket_state */ , int success,
- grpc_closure_list * closure_list)
-{
- struct write_socket_state *state = (struct write_socket_state *) user_data;
- gpr_log (GPR_INFO, "Write done callback called");
- gpr_mu_lock (GRPC_POLLSET_MU (&g_pollset));
- gpr_log (GPR_INFO, "Signalling write done");
+static void write_done(void *user_data /* write_socket_state */, int success,
+ grpc_closure_list *closure_list) {
+ struct write_socket_state *state = (struct write_socket_state *)user_data;
+ gpr_log(GPR_INFO, "Write done callback called");
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
+ gpr_log(GPR_INFO, "Signalling write done");
state->write_done = 1;
- grpc_pollset_kick (&g_pollset, NULL);
- gpr_mu_unlock (GRPC_POLLSET_MU (&g_pollset));
+ grpc_pollset_kick(&g_pollset, NULL);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
-void
-drain_socket_blocking (int fd, size_t num_bytes, size_t read_size)
-{
- unsigned char *buf = malloc (read_size);
+void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) {
+ unsigned char *buf = malloc(read_size);
ssize_t bytes_read;
size_t bytes_left = num_bytes;
int flags;
@@ -329,43 +299,38 @@ drain_socket_blocking (int fd, size_t num_bytes, size_t read_size)
int i;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- flags = fcntl (fd, F_GETFL, 0);
- GPR_ASSERT (fcntl (fd, F_SETFL, flags & ~O_NONBLOCK) == 0);
-
- for (;;)
- {
- grpc_pollset_worker worker;
- gpr_mu_lock (GRPC_POLLSET_MU (&g_pollset));
- grpc_pollset_work (&g_pollset, &worker, gpr_now (GPR_CLOCK_MONOTONIC), GRPC_TIMEOUT_MILLIS_TO_DEADLINE (&exec_ctx, 10));
- gpr_mu_unlock (GRPC_POLLSET_MU (&g_pollset));
- grpc_exec_ctx_finish (&exec_ctx);
- do
- {
- bytes_read = read (fd, buf, bytes_left > read_size ? read_size : bytes_left);
- }
- while (bytes_read < 0 && errno == EINTR);
- GPR_ASSERT (bytes_read >= 0);
- for (i = 0; i < bytes_read; ++i)
- {
- GPR_ASSERT (buf[i] == current);
- current = (current + 1) % 256;
- }
- bytes_left -= (size_t) bytes_read;
- if (bytes_left == 0)
- break;
+ flags = fcntl(fd, F_GETFL, 0);
+ GPR_ASSERT(fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) == 0);
+
+ for (;;) {
+ grpc_pollset_worker worker;
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+ GRPC_TIMEOUT_MILLIS_TO_DEADLINE(&exec_ctx, 10));
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_exec_ctx_finish(&exec_ctx);
+ do {
+ bytes_read =
+ read(fd, buf, bytes_left > read_size ? read_size : bytes_left);
+ } while (bytes_read < 0 && errno == EINTR);
+ GPR_ASSERT(bytes_read >= 0);
+ for (i = 0; i < bytes_read; ++i) {
+ GPR_ASSERT(buf[i] == current);
+ current = (current + 1) % 256;
}
- flags = fcntl (fd, F_GETFL, 0);
- GPR_ASSERT (fcntl (fd, F_SETFL, flags | O_NONBLOCK) == 0);
+ bytes_left -= (size_t)bytes_read;
+ if (bytes_left == 0) break;
+ }
+ flags = fcntl(fd, F_GETFL, 0);
+ GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
- gpr_free (buf);
+ gpr_free(buf);
}
/* Write to a socket using the grpc_tcp API, then drain it directly.
Note that if the write does not complete immediately we need to drain the
socket in parallel with the read. */
-static void
-write_test (size_t num_bytes, size_t slice_size)
-{
+static void write_test(size_t num_bytes, size_t slice_size) {
int sv[2];
grpc_endpoint *ep;
struct write_socket_state state;
@@ -374,119 +339,111 @@ write_test (size_t num_bytes, size_t slice_size)
gpr_uint8 current_data = 0;
gpr_slice_buffer outgoing;
grpc_closure write_done_closure;
- gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE (20);
+ gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- gpr_log (GPR_INFO, "Start write test with %d bytes, slice size %d", num_bytes, slice_size);
+ gpr_log(GPR_INFO, "Start write test with %d bytes, slice size %d", num_bytes,
+ slice_size);
- create_sockets (sv);
+ create_sockets(sv);
- ep = grpc_tcp_create (grpc_fd_create (sv[1], "write_test"), GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "test");
- grpc_endpoint_add_to_pollset (&exec_ctx, ep, &g_pollset);
+ ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test"),
+ GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "test");
+ grpc_endpoint_add_to_pollset(&exec_ctx, ep, &g_pollset);
state.ep = ep;
state.write_done = 0;
- slices = allocate_blocks (num_bytes, slice_size, &num_blocks, &current_data);
-
- gpr_slice_buffer_init (&outgoing);
- gpr_slice_buffer_addn (&outgoing, slices, num_blocks);
- grpc_closure_init (&write_done_closure, write_done, &state);
-
- grpc_endpoint_write (&exec_ctx, ep, &outgoing, &write_done_closure);
- drain_socket_blocking (sv[0], num_bytes, num_bytes);
- gpr_mu_lock (GRPC_POLLSET_MU (&g_pollset));
- for (;;)
- {
- grpc_pollset_worker worker;
- if (state.write_done)
- {
- break;
- }
- grpc_pollset_work (&g_pollset, &worker, gpr_now (&exec_ctx, GPR_CLOCK_MONOTONIC), deadline);
- gpr_mu_unlock (GRPC_POLLSET_MU (&g_pollset));
- grpc_exec_ctx_finish (&exec_ctx);
- gpr_mu_lock (GRPC_POLLSET_MU (&g_pollset));
- }
- gpr_mu_unlock (GRPC_POLLSET_MU (&g_pollset));
+ slices = allocate_blocks(num_bytes, slice_size, &num_blocks, &current_data);
+
+ gpr_slice_buffer_init(&outgoing);
+ gpr_slice_buffer_addn(&outgoing, slices, num_blocks);
+ grpc_closure_init(&write_done_closure, write_done, &state);
- gpr_slice_buffer_destroy (&outgoing);
- grpc_endpoint_destroy (&exec_ctx, ep);
- gpr_free (slices);
- grpc_exec_ctx_finish (&exec_ctx);
+ grpc_endpoint_write(&exec_ctx, ep, &outgoing, &write_done_closure);
+ drain_socket_blocking(sv[0], num_bytes, num_bytes);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
+ for (;;) {
+ grpc_pollset_worker worker;
+ if (state.write_done) {
+ break;
+ }
+ grpc_pollset_work(&g_pollset, &worker,
+ gpr_now(&exec_ctx, GPR_CLOCK_MONOTONIC), deadline);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_exec_ctx_finish(&exec_ctx);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
+ }
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+
+ gpr_slice_buffer_destroy(&outgoing);
+ grpc_endpoint_destroy(&exec_ctx, ep);
+ gpr_free(slices);
+ grpc_exec_ctx_finish(&exec_ctx);
}
-void
-run_tests (void)
-{
+void run_tests(void) {
size_t i = 0;
- read_test (100, 8192);
- read_test (10000, 8192);
- read_test (10000, 137);
- read_test (10000, 1);
- large_read_test (8192);
- large_read_test (1);
-
- write_test (100, 8192);
- write_test (100, 1);
- write_test (100000, 8192);
- write_test (100000, 1);
- write_test (100000, 137);
-
- for (i = 1; i < 1000; i = GPR_MAX (i + 1, i * 5 / 4))
- {
- write_test (40320, i);
- }
+ read_test(100, 8192);
+ read_test(10000, 8192);
+ read_test(10000, 137);
+ read_test(10000, 1);
+ large_read_test(8192);
+ large_read_test(1);
+
+ write_test(100, 8192);
+ write_test(100, 1);
+ write_test(100000, 8192);
+ write_test(100000, 1);
+ write_test(100000, 137);
+
+ for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) {
+ write_test(40320, i);
+ }
}
-static void
-clean_up (void)
-{
-}
+static void clean_up(void) {}
-static grpc_endpoint_test_fixture
-create_fixture_tcp_socketpair (size_t slice_size)
-{
+static grpc_endpoint_test_fixture create_fixture_tcp_socketpair(
+ size_t slice_size) {
int sv[2];
grpc_endpoint_test_fixture f;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- create_sockets (sv);
- f.client_ep = grpc_tcp_create (grpc_fd_create (sv[0], "fixture:client"), slice_size, "test");
- f.server_ep = grpc_tcp_create (grpc_fd_create (sv[1], "fixture:server"), slice_size, "test");
- grpc_endpoint_add_to_pollset (&exec_ctx, f.client_ep, &g_pollset);
- grpc_endpoint_add_to_pollset (&exec_ctx, f.server_ep, &g_pollset);
+ create_sockets(sv);
+ f.client_ep = grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client"),
+ slice_size, "test");
+ f.server_ep = grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server"),
+ slice_size, "test");
+ grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, &g_pollset);
+ grpc_endpoint_add_to_pollset(&exec_ctx, f.server_ep, &g_pollset);
- grpc_exec_ctx_finish (&exec_ctx);
+ grpc_exec_ctx_finish(&exec_ctx);
return f;
}
static grpc_endpoint_test_config configs[] = {
- {"tcp/tcp_socketpair", create_fixture_tcp_socketpair, clean_up},
+ {"tcp/tcp_socketpair", create_fixture_tcp_socketpair, clean_up},
};
-static void
-destroy_pollset (grpc_exec_ctx * exec_ctx, void *p, int success)
-{
- grpc_pollset_destroy (p);
+static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, int success) {
+ grpc_pollset_destroy(p);
}
-int
-main (int argc, char **argv)
-{
+int main(int argc, char **argv) {
grpc_closure destroyed;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
- grpc_test_init (argc, argv);
- grpc_init ();
- grpc_pollset_init (&g_pollset);
- run_tests ();
- grpc_endpoint_tests (configs[0], &g_pollset);
- grpc_closure_init (&destroyed, destroy_pollset, &g_pollset);
- grpc_pollset_shutdown (&exec_ctx, &g_pollset, &destroyed);
- grpc_exec_ctx_finish (&exec_ctx);
- grpc_shutdown ();
+ grpc_test_init(argc, argv);
+ grpc_init();
+ grpc_pollset_init(&g_pollset);
+ run_tests();
+ grpc_endpoint_tests(configs[0], &g_pollset);
+ grpc_closure_init(&destroyed, destroy_pollset, &g_pollset);
+ grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed);
+ grpc_exec_ctx_finish(&exec_ctx);
+ grpc_shutdown();
return 0;
}