aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/core/iomgr
diff options
context:
space:
mode:
Diffstat (limited to 'test/core/iomgr')
-rw-r--r--test/core/iomgr/alarm_list_test.c63
-rw-r--r--test/core/iomgr/alarm_test.c225
-rw-r--r--test/core/iomgr/endpoint_pair_test.c16
-rw-r--r--test/core/iomgr/endpoint_tests.c94
-rw-r--r--test/core/iomgr/fd_conservation_posix_test.c7
-rw-r--r--test/core/iomgr/fd_posix_test.c124
-rw-r--r--test/core/iomgr/resolve_address_test.c18
-rw-r--r--test/core/iomgr/sockaddr_utils_test.c2
-rw-r--r--test/core/iomgr/tcp_client_posix_test.c81
-rw-r--r--test/core/iomgr/tcp_posix_test.c161
-rw-r--r--test/core/iomgr/tcp_server_posix_test.c49
-rw-r--r--test/core/iomgr/time_averaged_stats_test.c2
-rw-r--r--test/core/iomgr/udp_server_test.c43
-rw-r--r--test/core/iomgr/workqueue_test.c93
14 files changed, 456 insertions, 522 deletions
diff --git a/test/core/iomgr/alarm_list_test.c b/test/core/iomgr/alarm_list_test.c
index 56d662e61a..6656a8fa3b 100644
--- a/test/core/iomgr/alarm_list_test.c
+++ b/test/core/iomgr/alarm_list_test.c
@@ -42,11 +42,8 @@
#define MAX_CB 30
static int cb_called[MAX_CB][2];
-static int kicks;
-void grpc_kick_poller(void) { ++kicks; }
-
-static void cb(void *arg, int success) {
+static void cb(grpc_exec_ctx *exec_ctx, void *arg, int success) {
cb_called[(gpr_intptr)arg][success]++;
}
@@ -54,54 +51,59 @@ static void add_test(void) {
gpr_timespec start = gpr_now(GPR_CLOCK_REALTIME);
int i;
grpc_alarm alarms[20];
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_alarm_list_init(start);
memset(cb_called, 0, sizeof(cb_called));
/* 10 ms alarms. will expire in the current epoch */
for (i = 0; i < 10; i++) {
- grpc_alarm_init(&alarms[i],
+ grpc_alarm_init(&exec_ctx, &alarms[i],
gpr_time_add(start, gpr_time_from_millis(10, GPR_TIMESPAN)),
cb, (void *)(gpr_intptr)i, start);
}
/* 1010 ms alarms. will expire in the next epoch */
for (i = 10; i < 20; i++) {
- grpc_alarm_init(&alarms[i], gpr_time_add(start, gpr_time_from_millis(
- 1010, GPR_TIMESPAN)),
- cb, (void *)(gpr_intptr)i, start);
+ grpc_alarm_init(
+ &exec_ctx, &alarms[i],
+ gpr_time_add(start, gpr_time_from_millis(1010, GPR_TIMESPAN)), cb,
+ (void *)(gpr_intptr)i, start);
}
/* collect alarms. Only the first batch should be ready. */
- GPR_ASSERT(10 == grpc_alarm_check(NULL,
+ GPR_ASSERT(10 == grpc_alarm_check(&exec_ctx,
gpr_time_add(start, gpr_time_from_millis(
500, GPR_TIMESPAN)),
NULL));
+ grpc_exec_ctx_finish(&exec_ctx);
for (i = 0; i < 20; i++) {
GPR_ASSERT(cb_called[i][1] == (i < 10));
GPR_ASSERT(cb_called[i][0] == 0);
}
- GPR_ASSERT(0 == grpc_alarm_check(
- NULL, gpr_time_add(
- start, gpr_time_from_millis(600, GPR_TIMESPAN)),
- NULL));
+ GPR_ASSERT(0 == grpc_alarm_check(&exec_ctx,
+ gpr_time_add(start, gpr_time_from_millis(
+ 600, GPR_TIMESPAN)),
+ NULL));
+ grpc_exec_ctx_finish(&exec_ctx);
for (i = 0; i < 30; i++) {
GPR_ASSERT(cb_called[i][1] == (i < 10));
GPR_ASSERT(cb_called[i][0] == 0);
}
/* collect the rest of the alarms */
- GPR_ASSERT(
- 10 == grpc_alarm_check(NULL, gpr_time_add(start, gpr_time_from_millis(
- 1500, GPR_TIMESPAN)),
- NULL));
+ GPR_ASSERT(10 == grpc_alarm_check(
+ &exec_ctx, gpr_time_add(start, gpr_time_from_millis(
+ 1500, GPR_TIMESPAN)),
+ NULL));
+ grpc_exec_ctx_finish(&exec_ctx);
for (i = 0; i < 30; i++) {
GPR_ASSERT(cb_called[i][1] == (i < 20));
GPR_ASSERT(cb_called[i][0] == 0);
}
- GPR_ASSERT(0 == grpc_alarm_check(NULL,
+ GPR_ASSERT(0 == grpc_alarm_check(&exec_ctx,
gpr_time_add(start, gpr_time_from_millis(
1600, GPR_TIMESPAN)),
NULL));
@@ -110,7 +112,8 @@ static void add_test(void) {
GPR_ASSERT(cb_called[i][0] == 0);
}
- grpc_alarm_list_shutdown();
+ grpc_alarm_list_shutdown(&exec_ctx);
+ grpc_exec_ctx_finish(&exec_ctx);
}
static gpr_timespec tfm(int m) {
@@ -122,28 +125,32 @@ static gpr_timespec tfm(int m) {
/* Cleaning up a list with pending alarms. */
void destruction_test(void) {
grpc_alarm alarms[5];
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_alarm_list_init(gpr_time_0(GPR_CLOCK_REALTIME));
memset(cb_called, 0, sizeof(cb_called));
- grpc_alarm_init(&alarms[0], tfm(100), cb, (void *)(gpr_intptr)0,
+ grpc_alarm_init(&exec_ctx, &alarms[0], tfm(100), cb, (void *)(gpr_intptr)0,
gpr_time_0(GPR_CLOCK_REALTIME));
- grpc_alarm_init(&alarms[1], tfm(3), cb, (void *)(gpr_intptr)1,
+ grpc_alarm_init(&exec_ctx, &alarms[1], tfm(3), cb, (void *)(gpr_intptr)1,
gpr_time_0(GPR_CLOCK_REALTIME));
- grpc_alarm_init(&alarms[2], tfm(100), cb, (void *)(gpr_intptr)2,
+ grpc_alarm_init(&exec_ctx, &alarms[2], tfm(100), cb, (void *)(gpr_intptr)2,
gpr_time_0(GPR_CLOCK_REALTIME));
- grpc_alarm_init(&alarms[3], tfm(3), cb, (void *)(gpr_intptr)3,
+ grpc_alarm_init(&exec_ctx, &alarms[3], tfm(3), cb, (void *)(gpr_intptr)3,
gpr_time_0(GPR_CLOCK_REALTIME));
- grpc_alarm_init(&alarms[4], tfm(1), cb, (void *)(gpr_intptr)4,
+ grpc_alarm_init(&exec_ctx, &alarms[4], tfm(1), cb, (void *)(gpr_intptr)4,
gpr_time_0(GPR_CLOCK_REALTIME));
- GPR_ASSERT(1 == grpc_alarm_check(NULL, tfm(2), NULL));
+ GPR_ASSERT(1 == grpc_alarm_check(&exec_ctx, tfm(2), NULL));
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(1 == cb_called[4][1]);
- grpc_alarm_cancel(&alarms[0]);
- grpc_alarm_cancel(&alarms[3]);
+ grpc_alarm_cancel(&exec_ctx, &alarms[0]);
+ grpc_alarm_cancel(&exec_ctx, &alarms[3]);
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(1 == cb_called[0][0]);
GPR_ASSERT(1 == cb_called[3][0]);
- grpc_alarm_list_shutdown();
+ grpc_alarm_list_shutdown(&exec_ctx);
+ grpc_exec_ctx_finish(&exec_ctx);
GPR_ASSERT(1 == cb_called[1][0]);
GPR_ASSERT(1 == cb_called[2][0]);
}
diff --git a/test/core/iomgr/alarm_test.c b/test/core/iomgr/alarm_test.c
deleted file mode 100644
index 55aa517529..0000000000
--- a/test/core/iomgr/alarm_test.c
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-/* Test gRPC event manager with a simple TCP upload server and client. */
-#include "src/core/iomgr/alarm.h"
-
-#include <ctype.h>
-#include <errno.h>
-#include <fcntl.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-
-#include <grpc/grpc.h>
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/sync.h>
-#include <grpc/support/time.h>
-#include "test/core/util/test_config.h"
-
-#define SUCCESS_NOT_SET (-1)
-
-/* Dummy gRPC callback */
-void no_op_cb(void *arg, int success) {}
-
-typedef struct {
- gpr_cv cv;
- gpr_mu mu;
- grpc_iomgr_closure *followup_closure;
- int counter;
- int done_success_ctr;
- int done_cancel_ctr;
- int done;
- gpr_event fcb_arg;
- int success;
-} alarm_arg;
-
-static void followup_cb(void *arg, int success) {
- gpr_event_set((gpr_event *)arg, arg);
-}
-
-/* Called when an alarm expires. */
-static void alarm_cb(void *arg /* alarm_arg */, int success) {
- alarm_arg *a = arg;
- gpr_mu_lock(&a->mu);
- if (success) {
- a->counter++;
- a->done_success_ctr++;
- } else {
- a->done_cancel_ctr++;
- }
- a->done = 1;
- a->success = success;
- gpr_cv_signal(&a->cv);
- gpr_mu_unlock(&a->mu);
- grpc_iomgr_closure_init(a->followup_closure, followup_cb, &a->fcb_arg);
- grpc_iomgr_add_callback(a->followup_closure);
-}
-
-/* Test grpc_alarm add and cancel. */
-static void test_grpc_alarm(void) {
- grpc_alarm alarm;
- grpc_alarm alarm_to_cancel;
- /* Timeout on the alarm cond. var, so make big enough to absorb time
- deviations. Otherwise, operations after wait will not be properly ordered
- */
- gpr_timespec alarm_deadline;
- gpr_timespec followup_deadline;
-
- alarm_arg arg;
- alarm_arg arg2;
- void *fdone;
-
- grpc_init();
-
- arg.counter = 0;
- arg.success = SUCCESS_NOT_SET;
- arg.done_success_ctr = 0;
- arg.done_cancel_ctr = 0;
- arg.done = 0;
- gpr_mu_init(&arg.mu);
- gpr_cv_init(&arg.cv);
- arg.followup_closure = gpr_malloc(sizeof(grpc_iomgr_closure));
- gpr_event_init(&arg.fcb_arg);
-
- grpc_alarm_init(&alarm, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(100), alarm_cb, &arg,
- gpr_now(GPR_CLOCK_MONOTONIC));
-
- alarm_deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1);
- gpr_mu_lock(&arg.mu);
- while (arg.done == 0) {
- if (gpr_cv_wait(&arg.cv, &arg.mu, alarm_deadline)) {
- gpr_log(GPR_ERROR, "alarm deadline exceeded");
- break;
- }
- }
- gpr_mu_unlock(&arg.mu);
-
- followup_deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5);
- fdone = gpr_event_wait(&arg.fcb_arg, followup_deadline);
-
- if (arg.counter != 1) {
- gpr_log(GPR_ERROR, "Alarm callback not called");
- GPR_ASSERT(0);
- } else if (arg.done_success_ctr != 1) {
- gpr_log(GPR_ERROR, "Alarm done callback not called with success");
- GPR_ASSERT(0);
- } else if (arg.done_cancel_ctr != 0) {
- gpr_log(GPR_ERROR, "Alarm done callback called with cancel");
- GPR_ASSERT(0);
- } else if (arg.success == SUCCESS_NOT_SET) {
- gpr_log(GPR_ERROR, "Alarm callback without status");
- GPR_ASSERT(0);
- } else {
- gpr_log(GPR_INFO, "Alarm callback called successfully");
- }
-
- if (fdone != (void *)&arg.fcb_arg) {
- gpr_log(GPR_ERROR, "Followup callback #1 not invoked properly %p %p", fdone,
- &arg.fcb_arg);
- GPR_ASSERT(0);
- }
- gpr_cv_destroy(&arg.cv);
- gpr_mu_destroy(&arg.mu);
- gpr_free(arg.followup_closure);
-
- arg2.counter = 0;
- arg2.success = SUCCESS_NOT_SET;
- arg2.done_success_ctr = 0;
- arg2.done_cancel_ctr = 0;
- arg2.done = 0;
- gpr_mu_init(&arg2.mu);
- gpr_cv_init(&arg2.cv);
- arg2.followup_closure = gpr_malloc(sizeof(grpc_iomgr_closure));
-
- gpr_event_init(&arg2.fcb_arg);
-
- grpc_alarm_init(&alarm_to_cancel, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(100),
- alarm_cb, &arg2, gpr_now(GPR_CLOCK_MONOTONIC));
- grpc_alarm_cancel(&alarm_to_cancel);
-
- alarm_deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1);
- gpr_mu_lock(&arg2.mu);
- while (arg2.done == 0) {
- gpr_cv_wait(&arg2.cv, &arg2.mu, alarm_deadline);
- }
- gpr_mu_unlock(&arg2.mu);
-
- gpr_log(GPR_INFO, "alarm done = %d", arg2.done);
-
- followup_deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5);
- fdone = gpr_event_wait(&arg2.fcb_arg, followup_deadline);
-
- if (arg2.counter != arg2.done_success_ctr) {
- gpr_log(GPR_ERROR, "Alarm callback called but didn't lead to done success");
- GPR_ASSERT(0);
- } else if (arg2.done_success_ctr && arg2.done_cancel_ctr) {
- gpr_log(GPR_ERROR, "Alarm done callback called with success and cancel");
- GPR_ASSERT(0);
- } else if (arg2.done_cancel_ctr + arg2.done_success_ctr != 1) {
- gpr_log(GPR_ERROR, "Alarm done callback called incorrect number of times");
- GPR_ASSERT(0);
- } else if (arg2.success == SUCCESS_NOT_SET) {
- gpr_log(GPR_ERROR, "Alarm callback without status");
- GPR_ASSERT(0);
- } else if (arg2.done_success_ctr) {
- gpr_log(GPR_INFO, "Alarm callback executed before cancel");
- gpr_log(GPR_INFO, "Current value of triggered is %d\n",
- alarm_to_cancel.triggered);
- } else if (arg2.done_cancel_ctr) {
- gpr_log(GPR_INFO, "Alarm callback canceled");
- gpr_log(GPR_INFO, "Current value of triggered is %d\n",
- alarm_to_cancel.triggered);
- } else {
- gpr_log(GPR_ERROR, "Alarm cancel test should not be here");
- GPR_ASSERT(0);
- }
-
- if (fdone != (void *)&arg2.fcb_arg) {
- gpr_log(GPR_ERROR, "Followup callback #2 not invoked properly %p %p", fdone,
- &arg2.fcb_arg);
- GPR_ASSERT(0);
- }
- gpr_cv_destroy(&arg2.cv);
- gpr_mu_destroy(&arg2.mu);
- gpr_free(arg2.followup_closure);
-
- grpc_shutdown();
-}
-
-int main(int argc, char **argv) {
- grpc_test_init(argc, argv);
- test_grpc_alarm();
- return 0;
-}
diff --git a/test/core/iomgr/endpoint_pair_test.c b/test/core/iomgr/endpoint_pair_test.c
index 3abde5ac35..ff590cf2d5 100644
--- a/test/core/iomgr/endpoint_pair_test.c
+++ b/test/core/iomgr/endpoint_pair_test.c
@@ -48,13 +48,15 @@ static void clean_up(void) {}
static grpc_endpoint_test_fixture create_fixture_endpoint_pair(
size_t slice_size) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_endpoint_test_fixture f;
grpc_endpoint_pair p = grpc_iomgr_create_endpoint_pair("test", slice_size);
f.client_ep = p.client;
f.server_ep = p.server;
- grpc_endpoint_add_to_pollset(f.client_ep, &g_pollset);
- grpc_endpoint_add_to_pollset(f.server_ep, &g_pollset);
+ grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, &g_pollset);
+ grpc_endpoint_add_to_pollset(&exec_ctx, f.server_ep, &g_pollset);
+ grpc_exec_ctx_finish(&exec_ctx);
return f;
}
@@ -63,14 +65,20 @@ static grpc_endpoint_test_config configs[] = {
{"tcp/tcp_socketpair", create_fixture_endpoint_pair, clean_up},
};
-static void destroy_pollset(void *p) { grpc_pollset_destroy(p); }
+static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, int success) {
+ grpc_pollset_destroy(p);
+}
int main(int argc, char **argv) {
+ grpc_closure destroyed;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_test_init(argc, argv);
grpc_init();
grpc_pollset_init(&g_pollset);
grpc_endpoint_tests(configs[0], &g_pollset);
- grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset);
+ grpc_closure_init(&destroyed, destroy_pollset, &g_pollset);
+ grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed);
+ grpc_exec_ctx_finish(&exec_ctx);
grpc_shutdown();
return 0;
diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c
index 853b9a32c2..4b72590819 100644
--- a/test/core/iomgr/endpoint_tests.c
+++ b/test/core/iomgr/endpoint_tests.c
@@ -122,14 +122,14 @@ struct read_and_write_test_state {
int write_done;
gpr_slice_buffer incoming;
gpr_slice_buffer outgoing;
- grpc_iomgr_closure done_read;
- grpc_iomgr_closure done_write;
+ grpc_closure done_read;
+ grpc_closure done_write;
};
-static void read_and_write_test_read_handler(void *data, int success) {
+static void read_and_write_test_read_handler(grpc_exec_ctx *exec_ctx,
+ void *data, int success) {
struct read_and_write_test_state *state = data;
-loop:
state->bytes_read += count_slices(
state->incoming.slices, state->incoming.count, &state->current_read_data);
if (state->bytes_read == state->target_bytes || !success) {
@@ -139,56 +139,35 @@ loop:
grpc_pollset_kick(g_pollset, NULL);
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
} else if (success) {
- switch (grpc_endpoint_read(state->read_ep, &state->incoming,
- &state->done_read)) {
- case GRPC_ENDPOINT_ERROR:
- success = 0;
- goto loop;
- case GRPC_ENDPOINT_DONE:
- success = 1;
- goto loop;
- case GRPC_ENDPOINT_PENDING:
- break;
- }
+ grpc_endpoint_read(exec_ctx, state->read_ep, &state->incoming,
+ &state->done_read);
}
}
-static void read_and_write_test_write_handler(void *data, int success) {
+static void read_and_write_test_write_handler(grpc_exec_ctx *exec_ctx,
+ void *data, int success) {
struct read_and_write_test_state *state = data;
gpr_slice *slices = NULL;
size_t nslices;
- grpc_endpoint_op_status write_status;
if (success) {
- for (;;) {
- /* Need to do inline writes until they don't succeed synchronously or we
- finish writing */
- state->bytes_written += state->current_write_size;
- if (state->target_bytes - state->bytes_written <
- state->current_write_size) {
- state->current_write_size = state->target_bytes - state->bytes_written;
- }
- if (state->current_write_size == 0) {
- break;
- }
-
+ state->bytes_written += state->current_write_size;
+ if (state->target_bytes - state->bytes_written <
+ state->current_write_size) {
+ state->current_write_size = state->target_bytes - state->bytes_written;
+ }
+ if (state->current_write_size != 0) {
slices = allocate_blocks(state->current_write_size, 8192, &nslices,
&state->current_write_data);
gpr_slice_buffer_reset_and_unref(&state->outgoing);
gpr_slice_buffer_addn(&state->outgoing, slices, nslices);
- write_status = grpc_endpoint_write(state->write_ep, &state->outgoing,
- &state->done_write);
+ grpc_endpoint_write(exec_ctx, state->write_ep, &state->outgoing,
+ &state->done_write);
free(slices);
- if (write_status == GRPC_ENDPOINT_PENDING) {
- return;
- } else if (write_status == GRPC_ENDPOINT_ERROR) {
- goto cleanup;
- }
+ return;
}
- GPR_ASSERT(state->bytes_written == state->target_bytes);
}
-cleanup:
gpr_log(GPR_INFO, "Write handler done");
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
state->write_done = 1 + success;
@@ -207,6 +186,7 @@ static void read_and_write_test(grpc_endpoint_test_config config,
gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
grpc_endpoint_test_fixture f =
begin_test(config, "read_and_write_test", slice_size);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_log(GPR_DEBUG, "num_bytes=%d write_size=%d slice_size=%d shutdown=%d",
num_bytes, write_size, slice_size, shutdown);
@@ -227,10 +207,9 @@ static void read_and_write_test(grpc_endpoint_test_config config,
state.write_done = 0;
state.current_read_data = 0;
state.current_write_data = 0;
- grpc_iomgr_closure_init(&state.done_read, read_and_write_test_read_handler,
- &state);
- grpc_iomgr_closure_init(&state.done_write, read_and_write_test_write_handler,
- &state);
+ grpc_closure_init(&state.done_read, read_and_write_test_read_handler, &state);
+ grpc_closure_init(&state.done_write, read_and_write_test_write_handler,
+ &state);
gpr_slice_buffer_init(&state.outgoing);
gpr_slice_buffer_init(&state.incoming);
@@ -239,41 +218,36 @@ static void read_and_write_test(grpc_endpoint_test_config config,
for the first iteration as for later iterations. It does the right thing
even when bytes_written is unsigned. */
state.bytes_written -= state.current_write_size;
- read_and_write_test_write_handler(&state, 1);
+ read_and_write_test_write_handler(&exec_ctx, &state, 1);
+ grpc_exec_ctx_finish(&exec_ctx);
- switch (
- grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read)) {
- case GRPC_ENDPOINT_PENDING:
- break;
- case GRPC_ENDPOINT_ERROR:
- read_and_write_test_read_handler(&state, 0);
- break;
- case GRPC_ENDPOINT_DONE:
- read_and_write_test_read_handler(&state, 1);
- break;
- }
+ grpc_endpoint_read(&exec_ctx, state.read_ep, &state.incoming,
+ &state.done_read);
if (shutdown) {
gpr_log(GPR_DEBUG, "shutdown read");
- grpc_endpoint_shutdown(state.read_ep);
+ grpc_endpoint_shutdown(&exec_ctx, state.read_ep);
gpr_log(GPR_DEBUG, "shutdown write");
- grpc_endpoint_shutdown(state.write_ep);
+ grpc_endpoint_shutdown(&exec_ctx, state.write_ep);
}
+ grpc_exec_ctx_finish(&exec_ctx);
gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
while (!state.read_done || !state.write_done) {
grpc_pollset_worker worker;
GPR_ASSERT(gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0);
- grpc_pollset_work(g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
- deadline);
+ grpc_pollset_work(&exec_ctx, g_pollset, &worker,
+ gpr_now(GPR_CLOCK_MONOTONIC), deadline);
}
gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
+ grpc_exec_ctx_finish(&exec_ctx);
end_test(config);
gpr_slice_buffer_destroy(&state.outgoing);
gpr_slice_buffer_destroy(&state.incoming);
- grpc_endpoint_destroy(state.read_ep);
- grpc_endpoint_destroy(state.write_ep);
+ grpc_endpoint_destroy(&exec_ctx, state.read_ep);
+ grpc_endpoint_destroy(&exec_ctx, state.write_ep);
+ grpc_exec_ctx_finish(&exec_ctx);
}
void grpc_endpoint_tests(grpc_endpoint_test_config config,
diff --git a/test/core/iomgr/fd_conservation_posix_test.c b/test/core/iomgr/fd_conservation_posix_test.c
index 8327c681b8..401bf70a9e 100644
--- a/test/core/iomgr/fd_conservation_posix_test.c
+++ b/test/core/iomgr/fd_conservation_posix_test.c
@@ -43,6 +43,7 @@ int main(int argc, char **argv) {
int i;
struct rlimit rlim;
grpc_endpoint_pair p;
+
grpc_test_init(argc, argv);
grpc_iomgr_init();
@@ -53,9 +54,11 @@ int main(int argc, char **argv) {
GPR_ASSERT(0 == setrlimit(RLIMIT_NOFILE, &rlim));
for (i = 0; i < 100; i++) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
p = grpc_iomgr_create_endpoint_pair("test", 1);
- grpc_endpoint_destroy(p.client);
- grpc_endpoint_destroy(p.server);
+ grpc_endpoint_destroy(&exec_ctx, p.client);
+ grpc_endpoint_destroy(&exec_ctx, p.server);
+ grpc_exec_ctx_finish(&exec_ctx);
}
grpc_iomgr_shutdown();
diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c
index 75959069c0..f592f63ba9 100644
--- a/test/core/iomgr/fd_posix_test.c
+++ b/test/core/iomgr/fd_posix_test.c
@@ -98,7 +98,7 @@ typedef struct {
grpc_fd *em_fd; /* listening fd */
ssize_t read_bytes_total; /* total number of received bytes */
int done; /* set to 1 when a server finishes serving */
- grpc_iomgr_closure listen_closure;
+ grpc_closure listen_closure;
} server;
static void server_init(server *sv) {
@@ -112,23 +112,23 @@ typedef struct {
server *sv; /* not owned by a single session */
grpc_fd *em_fd; /* fd to read upload bytes */
char read_buf[BUF_SIZE]; /* buffer to store upload bytes */
- grpc_iomgr_closure session_read_closure;
+ grpc_closure session_read_closure;
} session;
/* Called when an upload session can be safely shutdown.
Close session FD and start to shutdown listen FD. */
-static void session_shutdown_cb(void *arg, /*session*/
+static void session_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg, /*session */
int success) {
session *se = arg;
server *sv = se->sv;
- grpc_fd_orphan(se->em_fd, NULL, "a");
+ grpc_fd_orphan(exec_ctx, se->em_fd, NULL, "a");
gpr_free(se);
/* Start to shutdown listen fd. */
- grpc_fd_shutdown(sv->em_fd);
+ grpc_fd_shutdown(exec_ctx, sv->em_fd);
}
/* Called when data become readable in a session. */
-static void session_read_cb(void *arg, /*session*/
+static void session_read_cb(grpc_exec_ctx *exec_ctx, void *arg, /*session */
int success) {
session *se = arg;
int fd = se->em_fd->fd;
@@ -137,7 +137,7 @@ static void session_read_cb(void *arg, /*session*/
ssize_t read_total = 0;
if (!success) {
- session_shutdown_cb(arg, 1);
+ session_shutdown_cb(exec_ctx, arg, 1);
return;
}
@@ -152,7 +152,7 @@ static void session_read_cb(void *arg, /*session*/
It is possible to read nothing due to spurious edge event or data has
been drained, In such a case, read() returns -1 and set errno to EAGAIN. */
if (read_once == 0) {
- session_shutdown_cb(arg, 1);
+ session_shutdown_cb(exec_ctx, arg, 1);
} else if (read_once == -1) {
if (errno == EAGAIN) {
/* An edge triggered event is cached in the kernel until next poll.
@@ -163,7 +163,7 @@ static void session_read_cb(void *arg, /*session*/
TODO(chenw): in multi-threaded version, callback and polling can be
run in different threads. polling may catch a persist read edge event
before notify_on_read is called. */
- grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
+ grpc_fd_notify_on_read(exec_ctx, se->em_fd, &se->session_read_closure);
} else {
gpr_log(GPR_ERROR, "Unhandled read error %s", strerror(errno));
abort();
@@ -173,10 +173,11 @@ static void session_read_cb(void *arg, /*session*/
/* Called when the listen FD can be safely shutdown.
Close listen FD and signal that server can be shutdown. */
-static void listen_shutdown_cb(void *arg /*server*/, int success) {
+static void listen_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg /*server */,
+ int success) {
server *sv = arg;
- grpc_fd_orphan(sv->em_fd, NULL, "b");
+ grpc_fd_orphan(exec_ctx, sv->em_fd, NULL, "b");
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
sv->done = 1;
@@ -185,7 +186,7 @@ static void listen_shutdown_cb(void *arg /*server*/, int success) {
}
/* Called when a new TCP connection request arrives in the listening port. */
-static void listen_cb(void *arg, /*=sv_arg*/
+static void listen_cb(grpc_exec_ctx *exec_ctx, void *arg, /*=sv_arg*/
int success) {
server *sv = arg;
int fd;
@@ -196,7 +197,7 @@ static void listen_cb(void *arg, /*=sv_arg*/
grpc_fd *listen_em_fd = sv->em_fd;
if (!success) {
- listen_shutdown_cb(arg, 1);
+ listen_shutdown_cb(exec_ctx, arg, 1);
return;
}
@@ -208,12 +209,12 @@ static void listen_cb(void *arg, /*=sv_arg*/
se = gpr_malloc(sizeof(*se));
se->sv = sv;
se->em_fd = grpc_fd_create(fd, "listener");
- grpc_pollset_add_fd(&g_pollset, se->em_fd);
+ grpc_pollset_add_fd(exec_ctx, &g_pollset, se->em_fd);
se->session_read_closure.cb = session_read_cb;
se->session_read_closure.cb_arg = se;
- grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
+ grpc_fd_notify_on_read(exec_ctx, se->em_fd, &se->session_read_closure);
- grpc_fd_notify_on_read(listen_em_fd, &sv->listen_closure);
+ grpc_fd_notify_on_read(exec_ctx, listen_em_fd, &sv->listen_closure);
}
/* Max number of connections pending to be accepted by listen(). */
@@ -223,7 +224,7 @@ static void listen_cb(void *arg, /*=sv_arg*/
listen_cb() is registered to be interested in reading from listen_fd.
When connection request arrives, listen_cb() is called to accept the
connection request. */
-static int server_start(server *sv) {
+static int server_start(grpc_exec_ctx *exec_ctx, server *sv) {
int port = 0;
int fd;
struct sockaddr_in sin;
@@ -237,11 +238,11 @@ static int server_start(server *sv) {
GPR_ASSERT(listen(fd, MAX_NUM_FD) == 0);
sv->em_fd = grpc_fd_create(fd, "server");
- grpc_pollset_add_fd(&g_pollset, sv->em_fd);
+ grpc_pollset_add_fd(exec_ctx, &g_pollset, sv->em_fd);
/* Register to be interested in reading from listen_fd. */
sv->listen_closure.cb = listen_cb;
sv->listen_closure.cb_arg = sv;
- grpc_fd_notify_on_read(sv->em_fd, &sv->listen_closure);
+ grpc_fd_notify_on_read(exec_ctx, sv->em_fd, &sv->listen_closure);
return port;
}
@@ -250,9 +251,14 @@ static int server_start(server *sv) {
static void server_wait_and_shutdown(server *sv) {
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (!sv->done) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_pollset_worker worker;
- grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+ grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
+ gpr_now(GPR_CLOCK_MONOTONIC),
gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_exec_ctx_finish(&exec_ctx);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
@@ -274,7 +280,7 @@ typedef struct {
int client_write_cnt;
int done; /* set to 1 when a client finishes sending */
- grpc_iomgr_closure write_closure;
+ grpc_closure write_closure;
} client;
static void client_init(client *cl) {
@@ -285,15 +291,16 @@ static void client_init(client *cl) {
}
/* Called when a client upload session is ready to shutdown. */
-static void client_session_shutdown_cb(void *arg /*client*/, int success) {
+static void client_session_shutdown_cb(grpc_exec_ctx *exec_ctx,
+ void *arg /*client */, int success) {
client *cl = arg;
- grpc_fd_orphan(cl->em_fd, NULL, "c");
+ grpc_fd_orphan(exec_ctx, cl->em_fd, NULL, "c");
cl->done = 1;
grpc_pollset_kick(&g_pollset, NULL);
}
/* Write as much as possible, then register notify_on_write. */
-static void client_session_write(void *arg, /*client*/
+static void client_session_write(grpc_exec_ctx *exec_ctx, void *arg, /*client */
int success) {
client *cl = arg;
int fd = cl->em_fd->fd;
@@ -301,7 +308,7 @@ static void client_session_write(void *arg, /*client*/
if (!success) {
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
- client_session_shutdown_cb(arg, 1);
+ client_session_shutdown_cb(exec_ctx, arg, 1);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
return;
}
@@ -316,10 +323,10 @@ static void client_session_write(void *arg, /*client*/
if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) {
cl->write_closure.cb = client_session_write;
cl->write_closure.cb_arg = cl;
- grpc_fd_notify_on_write(cl->em_fd, &cl->write_closure);
+ grpc_fd_notify_on_write(exec_ctx, cl->em_fd, &cl->write_closure);
cl->client_write_cnt++;
} else {
- client_session_shutdown_cb(arg, 1);
+ client_session_shutdown_cb(exec_ctx, arg, 1);
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
} else {
@@ -329,7 +336,7 @@ static void client_session_write(void *arg, /*client*/
}
/* Start a client to send a stream of bytes. */
-static void client_start(client *cl, int port) {
+static void client_start(grpc_exec_ctx *exec_ctx, client *cl, int port) {
int fd;
struct sockaddr_in sin;
create_test_socket(port, &fd, &sin);
@@ -350,9 +357,9 @@ static void client_start(client *cl, int port) {
}
cl->em_fd = grpc_fd_create(fd, "client");
- grpc_pollset_add_fd(&g_pollset, cl->em_fd);
+ grpc_pollset_add_fd(exec_ctx, &g_pollset, cl->em_fd);
- client_session_write(cl, 1);
+ client_session_write(exec_ctx, cl, 1);
}
/* Wait for the signal to shutdown a client. */
@@ -360,8 +367,13 @@ static void client_wait_and_shutdown(client *cl) {
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (!cl->done) {
grpc_pollset_worker worker;
- grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
+ gpr_now(GPR_CLOCK_MONOTONIC),
gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_exec_ctx_finish(&exec_ctx);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
@@ -373,11 +385,13 @@ static void test_grpc_fd(void) {
server sv;
client cl;
int port;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
server_init(&sv);
- port = server_start(&sv);
+ port = server_start(&exec_ctx, &sv);
client_init(&cl);
- client_start(&cl, port);
+ client_start(&exec_ctx, &cl, port);
+ grpc_exec_ctx_finish(&exec_ctx);
client_wait_and_shutdown(&cl);
server_wait_and_shutdown(&sv);
GPR_ASSERT(sv.read_bytes_total == cl.write_bytes_total);
@@ -385,14 +399,15 @@ static void test_grpc_fd(void) {
}
typedef struct fd_change_data {
- void (*cb_that_ran)(void *, int success);
+ void (*cb_that_ran)(grpc_exec_ctx *exec_ctx, void *, int success);
} fd_change_data;
void init_change_data(fd_change_data *fdc) { fdc->cb_that_ran = NULL; }
void destroy_change_data(fd_change_data *fdc) {}
-static void first_read_callback(void *arg /* fd_change_data */, int success) {
+static void first_read_callback(grpc_exec_ctx *exec_ctx,
+ void *arg /* fd_change_data */, int success) {
fd_change_data *fdc = arg;
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
@@ -401,7 +416,8 @@ static void first_read_callback(void *arg /* fd_change_data */, int success) {
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
-static void second_read_callback(void *arg /* fd_change_data */, int success) {
+static void second_read_callback(grpc_exec_ctx *exec_ctx,
+ void *arg /* fd_change_data */, int success) {
fd_change_data *fdc = arg;
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
@@ -421,8 +437,9 @@ static void test_grpc_fd_change(void) {
int sv[2];
char data;
ssize_t result;
- grpc_iomgr_closure first_closure;
- grpc_iomgr_closure second_closure;
+ grpc_closure first_closure;
+ grpc_closure second_closure;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
first_closure.cb = first_read_callback;
first_closure.cb_arg = &a;
@@ -439,10 +456,10 @@ static void test_grpc_fd_change(void) {
GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change");
- grpc_pollset_add_fd(&g_pollset, em_fd);
+ grpc_pollset_add_fd(&exec_ctx, &g_pollset, em_fd);
/* Register the first callback, then make its FD readable */
- grpc_fd_notify_on_read(em_fd, &first_closure);
+ grpc_fd_notify_on_read(&exec_ctx, em_fd, &first_closure);
data = 0;
result = write(sv[1], &data, 1);
GPR_ASSERT(result == 1);
@@ -451,8 +468,12 @@ static void test_grpc_fd_change(void) {
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (a.cb_that_ran == NULL) {
grpc_pollset_worker worker;
- grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+ grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
+ gpr_now(GPR_CLOCK_MONOTONIC),
gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_exec_ctx_finish(&exec_ctx);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
}
GPR_ASSERT(a.cb_that_ran == first_read_callback);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
@@ -463,7 +484,7 @@ static void test_grpc_fd_change(void) {
/* Now register a second callback with distinct change data, and do the same
thing again. */
- grpc_fd_notify_on_read(em_fd, &second_closure);
+ grpc_fd_notify_on_read(&exec_ctx, em_fd, &second_closure);
data = 0;
result = write(sv[1], &data, 1);
GPR_ASSERT(result == 1);
@@ -471,28 +492,39 @@ static void test_grpc_fd_change(void) {
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (b.cb_that_ran == NULL) {
grpc_pollset_worker worker;
- grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+ grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
+ gpr_now(GPR_CLOCK_MONOTONIC),
gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_exec_ctx_finish(&exec_ctx);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
}
/* Except now we verify that second_read_callback ran instead */
GPR_ASSERT(b.cb_that_ran == second_read_callback);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
- grpc_fd_orphan(em_fd, NULL, "d");
+ grpc_fd_orphan(&exec_ctx, em_fd, NULL, "d");
+ grpc_exec_ctx_finish(&exec_ctx);
destroy_change_data(&a);
destroy_change_data(&b);
close(sv[1]);
}
-static void destroy_pollset(void *p) { grpc_pollset_destroy(p); }
+static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, int success) {
+ grpc_pollset_destroy(p);
+}
int main(int argc, char **argv) {
+ grpc_closure destroyed;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_test_init(argc, argv);
grpc_iomgr_init();
grpc_pollset_init(&g_pollset);
test_grpc_fd();
test_grpc_fd_change();
- grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset);
+ grpc_closure_init(&destroyed, destroy_pollset, &g_pollset);
+ grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed);
+ grpc_exec_ctx_finish(&exec_ctx);
grpc_iomgr_shutdown();
return 0;
}
diff --git a/test/core/iomgr/resolve_address_test.c b/test/core/iomgr/resolve_address_test.c
index 668c5399f9..1c95a9960e 100644
--- a/test/core/iomgr/resolve_address_test.c
+++ b/test/core/iomgr/resolve_address_test.c
@@ -42,16 +42,18 @@ static gpr_timespec test_deadline(void) {
return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(100);
}
-static void must_succeed(void* evp, grpc_resolved_addresses* p) {
+static void must_succeed(grpc_exec_ctx *exec_ctx, void *evp,
+ grpc_resolved_addresses *p) {
GPR_ASSERT(p);
GPR_ASSERT(p->naddrs >= 1);
grpc_resolved_addresses_destroy(p);
- gpr_event_set(evp, (void*)1);
+ gpr_event_set(evp, (void *)1);
}
-static void must_fail(void* evp, grpc_resolved_addresses* p) {
+static void must_fail(grpc_exec_ctx *exec_ctx, void *evp,
+ grpc_resolved_addresses *p) {
GPR_ASSERT(!p);
- gpr_event_set(evp, (void*)1);
+ gpr_event_set(evp, (void *)1);
}
static void test_localhost(void) {
@@ -83,7 +85,7 @@ static void test_ipv6_with_port(void) {
}
static void test_ipv6_without_port(void) {
- const char* const kCases[] = {
+ const char *const kCases[] = {
"2001:db8::1", "2001:db8::1.2.3.4", "[2001:db8::1]",
};
unsigned i;
@@ -96,7 +98,7 @@ static void test_ipv6_without_port(void) {
}
static void test_invalid_ip_addresses(void) {
- const char* const kCases[] = {
+ const char *const kCases[] = {
"293.283.1238.3:1", "[2001:db8::11111]:1",
};
unsigned i;
@@ -109,7 +111,7 @@ static void test_invalid_ip_addresses(void) {
}
static void test_unparseable_hostports(void) {
- const char* const kCases[] = {
+ const char *const kCases[] = {
"[", "[::1", "[::1]bad", "[1.2.3.4]", "[localhost]", "[localhost]:1",
};
unsigned i;
@@ -121,7 +123,7 @@ static void test_unparseable_hostports(void) {
}
}
-int main(int argc, char** argv) {
+int main(int argc, char **argv) {
grpc_test_init(argc, argv);
grpc_iomgr_init();
test_localhost();
diff --git a/test/core/iomgr/sockaddr_utils_test.c b/test/core/iomgr/sockaddr_utils_test.c
index 72a0f71835..5009a641ea 100644
--- a/test/core/iomgr/sockaddr_utils_test.c
+++ b/test/core/iomgr/sockaddr_utils_test.c
@@ -63,9 +63,11 @@ static struct sockaddr_in6 make_addr6(const gpr_uint8 *data, size_t data_len) {
static const gpr_uint8 kMapped[] = {0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0xff, 0xff, 192, 0, 2, 1};
+
static const gpr_uint8 kNotQuiteMapped[] = {0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0xff, 0xfe, 192, 0, 2, 99};
static const gpr_uint8 kIPv4[] = {192, 0, 2, 1};
+
static const gpr_uint8 kIPv6[] = {0x20, 0x01, 0x0d, 0xb8, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 1};
diff --git a/test/core/iomgr/tcp_client_posix_test.c b/test/core/iomgr/tcp_client_posix_test.c
index f0e2de24d9..a61cccdb02 100644
--- a/test/core/iomgr/tcp_client_posix_test.c
+++ b/test/core/iomgr/tcp_client_posix_test.c
@@ -50,6 +50,7 @@
static grpc_pollset_set g_pollset_set;
static grpc_pollset g_pollset;
static int g_connections_complete = 0;
+static grpc_endpoint *g_connecting = NULL;
static gpr_timespec test_deadline(void) {
return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10);
@@ -62,15 +63,18 @@ static void finish_connection() {
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
-static void must_succeed(void *arg, grpc_endpoint *tcp) {
- GPR_ASSERT(tcp);
- grpc_endpoint_shutdown(tcp);
- grpc_endpoint_destroy(tcp);
+static void must_succeed(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+ GPR_ASSERT(g_connecting != NULL);
+ GPR_ASSERT(success);
+ grpc_endpoint_shutdown(exec_ctx, g_connecting);
+ grpc_endpoint_destroy(exec_ctx, g_connecting);
+ g_connecting = NULL;
finish_connection();
}
-static void must_fail(void *arg, grpc_endpoint *tcp) {
- GPR_ASSERT(!tcp);
+static void must_fail(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+ GPR_ASSERT(g_connecting == NULL);
+ GPR_ASSERT(!success);
finish_connection();
}
@@ -80,6 +84,8 @@ void test_succeeds(void) {
int svr_fd;
int r;
int connections_complete_before;
+ grpc_closure done;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_log(GPR_DEBUG, "test_succeeds");
@@ -98,7 +104,8 @@ void test_succeeds(void) {
/* connect to it */
GPR_ASSERT(getsockname(svr_fd, (struct sockaddr *)&addr, &addr_len) == 0);
- grpc_tcp_client_connect(must_succeed, NULL, &g_pollset_set,
+ grpc_closure_init(&done, must_succeed, NULL);
+ grpc_tcp_client_connect(&exec_ctx, &done, &g_connecting, &g_pollset_set,
(struct sockaddr *)&addr, addr_len,
gpr_inf_future(GPR_CLOCK_REALTIME));
@@ -114,8 +121,12 @@ void test_succeeds(void) {
while (g_connections_complete == connections_complete_before) {
grpc_pollset_worker worker;
- grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+ grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
+ gpr_now(GPR_CLOCK_MONOTONIC),
GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5));
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_exec_ctx_finish(&exec_ctx);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
@@ -125,6 +136,8 @@ void test_fails(void) {
struct sockaddr_in addr;
socklen_t addr_len = sizeof(addr);
int connections_complete_before;
+ grpc_closure done;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_log(GPR_DEBUG, "test_fails");
@@ -136,7 +149,8 @@ void test_fails(void) {
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
/* connect to a broken address */
- grpc_tcp_client_connect(must_fail, NULL, &g_pollset_set,
+ grpc_closure_init(&done, must_fail, NULL);
+ grpc_tcp_client_connect(&exec_ctx, &done, &g_connecting, &g_pollset_set,
(struct sockaddr *)&addr, addr_len,
gpr_inf_future(GPR_CLOCK_REALTIME));
@@ -145,8 +159,11 @@ void test_fails(void) {
/* wait for the connection callback to finish */
while (g_connections_complete == connections_complete_before) {
grpc_pollset_worker worker;
- grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
- test_deadline());
+ grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
+ gpr_now(GPR_CLOCK_MONOTONIC), test_deadline());
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_exec_ctx_finish(&exec_ctx);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
@@ -162,6 +179,8 @@ void test_times_out(void) {
int r;
int connections_complete_before;
gpr_timespec connect_deadline;
+ grpc_closure done;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_log(GPR_DEBUG, "test_times_out");
@@ -195,7 +214,8 @@ void test_times_out(void) {
connections_complete_before = g_connections_complete;
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
- grpc_tcp_client_connect(must_fail, NULL, &g_pollset_set,
+ grpc_closure_init(&done, must_fail, NULL);
+ grpc_tcp_client_connect(&exec_ctx, &done, &g_connecting, &g_pollset_set,
(struct sockaddr *)&addr, addr_len, connect_deadline);
/* Make sure the event doesn't trigger early */
@@ -203,25 +223,31 @@ void test_times_out(void) {
for (;;) {
grpc_pollset_worker worker;
gpr_timespec now = gpr_now(connect_deadline.clock_type);
- gpr_timespec continue_verifying_time = gpr_time_from_seconds(2, GPR_TIMESPAN);
- gpr_timespec grace_time = gpr_time_from_seconds(1, GPR_TIMESPAN);
- gpr_timespec finish_time = gpr_time_add(connect_deadline, continue_verifying_time);
- gpr_timespec restart_verifying_time = gpr_time_add(connect_deadline, grace_time);
+ gpr_timespec continue_verifying_time =
+ gpr_time_from_seconds(5, GPR_TIMESPAN);
+ gpr_timespec grace_time = gpr_time_from_seconds(3, GPR_TIMESPAN);
+ gpr_timespec finish_time =
+ gpr_time_add(connect_deadline, continue_verifying_time);
+ gpr_timespec restart_verifying_time =
+ gpr_time_add(connect_deadline, grace_time);
int is_after_deadline = gpr_time_cmp(now, connect_deadline) > 0;
if (gpr_time_cmp(now, finish_time) > 0) {
break;
}
- gpr_log(GPR_DEBUG, "now=%d.%09d connect_deadline=%d.%09d",
- now.tv_sec, now.tv_nsec, connect_deadline.tv_sec, connect_deadline.tv_nsec);
- if (is_after_deadline &&
- gpr_time_cmp(now, restart_verifying_time) <= 0) {
+ gpr_log(GPR_DEBUG, "now=%d.%09d connect_deadline=%d.%09d", now.tv_sec,
+ now.tv_nsec, connect_deadline.tv_sec, connect_deadline.tv_nsec);
+ if (is_after_deadline && gpr_time_cmp(now, restart_verifying_time) <= 0) {
/* allow some slack before insisting that things be done */
} else {
GPR_ASSERT(g_connections_complete ==
connections_complete_before + is_after_deadline);
}
- grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+ grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
+ gpr_now(GPR_CLOCK_MONOTONIC),
GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10));
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_exec_ctx_finish(&exec_ctx);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
}
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
@@ -231,20 +257,27 @@ void test_times_out(void) {
}
}
-static void destroy_pollset(void *p) { grpc_pollset_destroy(p); }
+static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, int success) {
+ grpc_pollset_destroy(p);
+}
int main(int argc, char **argv) {
+ grpc_closure destroyed;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_test_init(argc, argv);
grpc_init();
grpc_pollset_set_init(&g_pollset_set);
grpc_pollset_init(&g_pollset);
- grpc_pollset_set_add_pollset(&g_pollset_set, &g_pollset);
+ grpc_pollset_set_add_pollset(&exec_ctx, &g_pollset_set, &g_pollset);
+ grpc_exec_ctx_finish(&exec_ctx);
test_succeeds();
gpr_log(GPR_ERROR, "End of first test");
test_fails();
test_times_out();
grpc_pollset_set_destroy(&g_pollset_set);
- grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset);
+ grpc_closure_init(&destroyed, destroy_pollset, &g_pollset);
+ grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed);
+ grpc_exec_ctx_finish(&exec_ctx);
grpc_shutdown();
return 0;
}
diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c
index 59c498edff..f676454b7f 100644
--- a/test/core/iomgr/tcp_posix_test.c
+++ b/test/core/iomgr/tcp_posix_test.c
@@ -119,7 +119,7 @@ struct read_socket_state {
size_t read_bytes;
size_t target_read_bytes;
gpr_slice_buffer incoming;
- grpc_iomgr_closure read_cb;
+ grpc_closure read_cb;
};
static size_t count_slices(gpr_slice *slices, size_t nslices,
@@ -138,7 +138,7 @@ static size_t count_slices(gpr_slice *slices, size_t nslices,
return num_bytes;
}
-static void read_cb(void *user_data, int success) {
+static void read_cb(grpc_exec_ctx *exec_ctx, void *user_data, int success) {
struct read_socket_state *state = (struct read_socket_state *)user_data;
size_t read_bytes;
int current_data;
@@ -155,19 +155,8 @@ static void read_cb(void *user_data, int success) {
if (state->read_bytes >= state->target_read_bytes) {
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
} else {
- switch (grpc_endpoint_read(state->ep, &state->incoming, &state->read_cb)) {
- case GRPC_ENDPOINT_DONE:
- gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
- read_cb(user_data, 1);
- break;
- case GRPC_ENDPOINT_ERROR:
- gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
- read_cb(user_data, 0);
- break;
- case GRPC_ENDPOINT_PENDING:
- gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
- break;
- }
+ grpc_endpoint_read(exec_ctx, state->ep, &state->incoming, &state->read_cb);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
}
@@ -178,6 +167,7 @@ static void read_test(size_t num_bytes, size_t slice_size) {
struct read_socket_state state;
size_t written_bytes;
gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_log(GPR_INFO, "Read test of size %d, slice size %d", num_bytes,
slice_size);
@@ -185,7 +175,7 @@ static void read_test(size_t num_bytes, size_t slice_size) {
create_sockets(sv);
ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), slice_size, "test");
- grpc_endpoint_add_to_pollset(ep, &g_pollset);
+ grpc_endpoint_add_to_pollset(&exec_ctx, ep, &g_pollset);
written_bytes = fill_socket_partial(sv[0], num_bytes);
gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes);
@@ -194,30 +184,25 @@ static void read_test(size_t num_bytes, size_t slice_size) {
state.read_bytes = 0;
state.target_read_bytes = written_bytes;
gpr_slice_buffer_init(&state.incoming);
- grpc_iomgr_closure_init(&state.read_cb, read_cb, &state);
+ grpc_closure_init(&state.read_cb, read_cb, &state);
- switch (grpc_endpoint_read(ep, &state.incoming, &state.read_cb)) {
- case GRPC_ENDPOINT_DONE:
- read_cb(&state, 1);
- break;
- case GRPC_ENDPOINT_ERROR:
- read_cb(&state, 0);
- break;
- case GRPC_ENDPOINT_PENDING:
- break;
- }
+ grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (state.read_bytes < state.target_read_bytes) {
grpc_pollset_worker worker;
- grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
- deadline);
+ grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
+ gpr_now(GPR_CLOCK_MONOTONIC), deadline);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_exec_ctx_finish(&exec_ctx);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
}
GPR_ASSERT(state.read_bytes == state.target_read_bytes);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
gpr_slice_buffer_destroy(&state.incoming);
- grpc_endpoint_destroy(ep);
+ grpc_endpoint_destroy(&exec_ctx, ep);
+ grpc_exec_ctx_finish(&exec_ctx);
}
/* Write to a socket until it fills up, then read from it using the grpc_tcp
@@ -228,6 +213,7 @@ static void large_read_test(size_t slice_size) {
struct read_socket_state state;
ssize_t written_bytes;
gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_log(GPR_INFO, "Start large read test, slice size %d", slice_size);
@@ -235,7 +221,7 @@ static void large_read_test(size_t slice_size) {
ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test"), slice_size,
"test");
- grpc_endpoint_add_to_pollset(ep, &g_pollset);
+ grpc_endpoint_add_to_pollset(&exec_ctx, ep, &g_pollset);
written_bytes = fill_socket(sv[0]);
gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes);
@@ -244,30 +230,25 @@ static void large_read_test(size_t slice_size) {
state.read_bytes = 0;
state.target_read_bytes = (size_t)written_bytes;
gpr_slice_buffer_init(&state.incoming);
- grpc_iomgr_closure_init(&state.read_cb, read_cb, &state);
+ grpc_closure_init(&state.read_cb, read_cb, &state);
- switch (grpc_endpoint_read(ep, &state.incoming, &state.read_cb)) {
- case GRPC_ENDPOINT_DONE:
- read_cb(&state, 1);
- break;
- case GRPC_ENDPOINT_ERROR:
- read_cb(&state, 0);
- break;
- case GRPC_ENDPOINT_PENDING:
- break;
- }
+ grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
while (state.read_bytes < state.target_read_bytes) {
grpc_pollset_worker worker;
- grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
- deadline);
+ grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
+ gpr_now(GPR_CLOCK_MONOTONIC), deadline);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_exec_ctx_finish(&exec_ctx);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
}
GPR_ASSERT(state.read_bytes == state.target_read_bytes);
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
gpr_slice_buffer_destroy(&state.incoming);
- grpc_endpoint_destroy(ep);
+ grpc_endpoint_destroy(&exec_ctx, ep);
+ grpc_exec_ctx_finish(&exec_ctx);
}
struct write_socket_state {
@@ -298,7 +279,8 @@ static gpr_slice *allocate_blocks(size_t num_bytes, size_t slice_size,
return slices;
}
-static void write_done(void *user_data /* write_socket_state */, int success) {
+static void write_done(grpc_exec_ctx *exec_ctx,
+ void *user_data /* write_socket_state */, int success) {
struct write_socket_state *state = (struct write_socket_state *)user_data;
gpr_log(GPR_INFO, "Write done callback called");
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
@@ -315,6 +297,7 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) {
int flags;
int current = 0;
int i;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
flags = fcntl(fd, F_GETFL, 0);
GPR_ASSERT(fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) == 0);
@@ -322,9 +305,11 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) {
for (;;) {
grpc_pollset_worker worker;
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
- grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+ grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
+ gpr_now(GPR_CLOCK_MONOTONIC),
GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10));
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_exec_ctx_finish(&exec_ctx);
do {
bytes_read =
read(fd, buf, bytes_left > read_size ? read_size : bytes_left);
@@ -343,26 +328,6 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) {
gpr_free(buf);
}
-static ssize_t drain_socket(int fd) {
- ssize_t read_bytes;
- ssize_t total_bytes = 0;
- unsigned char buf[256];
- int current = 0;
- int i;
- do {
- read_bytes = read(fd, buf, 256);
- if (read_bytes > 0) {
- total_bytes += read_bytes;
- for (i = 0; i < read_bytes; ++i) {
- GPR_ASSERT(buf[i] == current);
- current = (current + 1) % 256;
- }
- }
- } while (read_bytes >= 0 || errno == EINTR);
- GPR_ASSERT(errno == EAGAIN);
- return total_bytes;
-}
-
/* Write to a socket using the grpc_tcp API, then drain it directly.
Note that if the write does not complete immediately we need to drain the
socket in parallel with the read. */
@@ -370,13 +335,13 @@ static void write_test(size_t num_bytes, size_t slice_size) {
int sv[2];
grpc_endpoint *ep;
struct write_socket_state state;
- ssize_t read_bytes;
size_t num_blocks;
gpr_slice *slices;
gpr_uint8 current_data = 0;
gpr_slice_buffer outgoing;
- grpc_iomgr_closure write_done_closure;
+ grpc_closure write_done_closure;
gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
gpr_log(GPR_INFO, "Start write test with %d bytes, slice size %d", num_bytes,
slice_size);
@@ -385,7 +350,7 @@ static void write_test(size_t num_bytes, size_t slice_size) {
ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test"),
GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "test");
- grpc_endpoint_add_to_pollset(ep, &g_pollset);
+ grpc_endpoint_add_to_pollset(&exec_ctx, ep, &g_pollset);
state.ep = ep;
state.write_done = 0;
@@ -394,35 +359,28 @@ static void write_test(size_t num_bytes, size_t slice_size) {
gpr_slice_buffer_init(&outgoing);
gpr_slice_buffer_addn(&outgoing, slices, num_blocks);
- grpc_iomgr_closure_init(&write_done_closure, write_done, &state);
+ grpc_closure_init(&write_done_closure, write_done, &state);
- switch (grpc_endpoint_write(ep, &outgoing, &write_done_closure)) {
- case GRPC_ENDPOINT_DONE:
- /* Write completed immediately */
- read_bytes = drain_socket(sv[0]);
- GPR_ASSERT((size_t)read_bytes == num_bytes);
- break;
- case GRPC_ENDPOINT_PENDING:
- drain_socket_blocking(sv[0], num_bytes, num_bytes);
- gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
- for (;;) {
- grpc_pollset_worker worker;
- if (state.write_done) {
- break;
- }
- grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
- deadline);
- }
- gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_endpoint_write(&exec_ctx, ep, &outgoing, &write_done_closure);
+ drain_socket_blocking(sv[0], num_bytes, num_bytes);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
+ for (;;) {
+ grpc_pollset_worker worker;
+ if (state.write_done) {
break;
- case GRPC_ENDPOINT_ERROR:
- gpr_log(GPR_ERROR, "endpoint got error");
- abort();
+ }
+ grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
+ gpr_now(GPR_CLOCK_MONOTONIC), deadline);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_exec_ctx_finish(&exec_ctx);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
}
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
gpr_slice_buffer_destroy(&outgoing);
- grpc_endpoint_destroy(ep);
+ grpc_endpoint_destroy(&exec_ctx, ep);
gpr_free(slices);
+ grpc_exec_ctx_finish(&exec_ctx);
}
void run_tests(void) {
@@ -452,14 +410,17 @@ static grpc_endpoint_test_fixture create_fixture_tcp_socketpair(
size_t slice_size) {
int sv[2];
grpc_endpoint_test_fixture f;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
create_sockets(sv);
f.client_ep = grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client"),
slice_size, "test");
f.server_ep = grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server"),
slice_size, "test");
- grpc_endpoint_add_to_pollset(f.client_ep, &g_pollset);
- grpc_endpoint_add_to_pollset(f.server_ep, &g_pollset);
+ grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, &g_pollset);
+ grpc_endpoint_add_to_pollset(&exec_ctx, f.server_ep, &g_pollset);
+
+ grpc_exec_ctx_finish(&exec_ctx);
return f;
}
@@ -468,15 +429,21 @@ static grpc_endpoint_test_config configs[] = {
{"tcp/tcp_socketpair", create_fixture_tcp_socketpair, clean_up},
};
-static void destroy_pollset(void *p) { grpc_pollset_destroy(p); }
+static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, int success) {
+ grpc_pollset_destroy(p);
+}
int main(int argc, char **argv) {
+ grpc_closure destroyed;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_test_init(argc, argv);
grpc_init();
grpc_pollset_init(&g_pollset);
run_tests();
grpc_endpoint_tests(configs[0], &g_pollset);
- grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset);
+ grpc_closure_init(&destroyed, destroy_pollset, &g_pollset);
+ grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed);
+ grpc_exec_ctx_finish(&exec_ctx);
grpc_shutdown();
return 0;
diff --git a/test/core/iomgr/tcp_server_posix_test.c b/test/core/iomgr/tcp_server_posix_test.c
index 29a20cba8e..530381e37f 100644
--- a/test/core/iomgr/tcp_server_posix_test.c
+++ b/test/core/iomgr/tcp_server_posix_test.c
@@ -48,9 +48,9 @@
static grpc_pollset g_pollset;
static int g_nconnects = 0;
-static void on_connect(void *arg, grpc_endpoint *tcp) {
- grpc_endpoint_shutdown(tcp);
- grpc_endpoint_destroy(tcp);
+static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp) {
+ grpc_endpoint_shutdown(exec_ctx, tcp);
+ grpc_endpoint_destroy(exec_ctx, tcp);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
g_nconnects++;
@@ -59,18 +59,23 @@ static void on_connect(void *arg, grpc_endpoint *tcp) {
}
static void test_no_op(void) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_tcp_server *s = grpc_tcp_server_create();
- grpc_tcp_server_destroy(s, NULL, NULL);
+ grpc_tcp_server_destroy(&exec_ctx, s, NULL);
+ grpc_exec_ctx_finish(&exec_ctx);
}
static void test_no_op_with_start(void) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_tcp_server *s = grpc_tcp_server_create();
LOG_TEST("test_no_op_with_start");
- grpc_tcp_server_start(s, NULL, 0, on_connect, NULL);
- grpc_tcp_server_destroy(s, NULL, NULL);
+ grpc_tcp_server_start(&exec_ctx, s, NULL, 0, on_connect, NULL);
+ grpc_tcp_server_destroy(&exec_ctx, s, NULL);
+ grpc_exec_ctx_finish(&exec_ctx);
}
static void test_no_op_with_port(void) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
struct sockaddr_in addr;
grpc_tcp_server *s = grpc_tcp_server_create();
LOG_TEST("test_no_op_with_port");
@@ -80,10 +85,12 @@ static void test_no_op_with_port(void) {
GPR_ASSERT(
grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr)));
- grpc_tcp_server_destroy(s, NULL, NULL);
+ grpc_tcp_server_destroy(&exec_ctx, s, NULL);
+ grpc_exec_ctx_finish(&exec_ctx);
}
static void test_no_op_with_port_and_start(void) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
struct sockaddr_in addr;
grpc_tcp_server *s = grpc_tcp_server_create();
LOG_TEST("test_no_op_with_port_and_start");
@@ -93,12 +100,14 @@ static void test_no_op_with_port_and_start(void) {
GPR_ASSERT(
grpc_tcp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr)));
- grpc_tcp_server_start(s, NULL, 0, on_connect, NULL);
+ grpc_tcp_server_start(&exec_ctx, s, NULL, 0, on_connect, NULL);
- grpc_tcp_server_destroy(s, NULL, NULL);
+ grpc_tcp_server_destroy(&exec_ctx, s, NULL);
+ grpc_exec_ctx_finish(&exec_ctx);
}
static void test_connect(int n) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
struct sockaddr_storage addr;
socklen_t addr_len = sizeof(addr);
int svrfd, clifd;
@@ -120,7 +129,7 @@ static void test_connect(int n) {
GPR_ASSERT(addr_len <= sizeof(addr));
pollsets[0] = &g_pollset;
- grpc_tcp_server_start(s, pollsets, 1, on_connect, NULL);
+ grpc_tcp_server_start(&exec_ctx, s, pollsets, 1, on_connect, NULL);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
@@ -137,8 +146,11 @@ static void test_connect(int n) {
while (g_nconnects == nconnects_before &&
gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0) {
grpc_pollset_worker worker;
- grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
- deadline);
+ grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
+ gpr_now(GPR_CLOCK_MONOTONIC), deadline);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_exec_ctx_finish(&exec_ctx);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
}
gpr_log(GPR_DEBUG, "wait done");
@@ -148,12 +160,17 @@ static void test_connect(int n) {
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
- grpc_tcp_server_destroy(s, NULL, NULL);
+ grpc_tcp_server_destroy(&exec_ctx, s, NULL);
+ grpc_exec_ctx_finish(&exec_ctx);
}
-static void destroy_pollset(void *p) { grpc_pollset_destroy(p); }
+static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, int success) {
+ grpc_pollset_destroy(p);
+}
int main(int argc, char **argv) {
+ grpc_closure destroyed;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_test_init(argc, argv);
grpc_iomgr_init();
grpc_pollset_init(&g_pollset);
@@ -165,7 +182,9 @@ int main(int argc, char **argv) {
test_connect(1);
test_connect(10);
- grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset);
+ grpc_closure_init(&destroyed, destroy_pollset, &g_pollset);
+ grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed);
+ grpc_exec_ctx_finish(&exec_ctx);
grpc_iomgr_shutdown();
return 0;
}
diff --git a/test/core/iomgr/time_averaged_stats_test.c b/test/core/iomgr/time_averaged_stats_test.c
index 4206a1c58f..cb006d152a 100644
--- a/test/core/iomgr/time_averaged_stats_test.c
+++ b/test/core/iomgr/time_averaged_stats_test.c
@@ -189,7 +189,7 @@ static void some_regress_some_persist_test(void) {
grpc_time_averaged_stats_add_sample(&tas, 4944.32);
grpc_time_averaged_stats_update_average(&tas);
/* (1 * 4944.32 + 0.6 * 2.384 * 2200 + 0.4 * 1000) /
- (1 + 0.6 * 2.384 + 0.4) */
+ (1 + 0.6 * 2.384 + 0.4) */
EXPECT_DOUBLE_EQ(3000, tas.aggregate_weighted_avg);
EXPECT_DOUBLE_EQ(2.8304, tas.aggregate_total_weight);
}
diff --git a/test/core/iomgr/udp_server_test.c b/test/core/iomgr/udp_server_test.c
index 24b6b1c717..d56500707c 100644
--- a/test/core/iomgr/udp_server_test.c
+++ b/test/core/iomgr/udp_server_test.c
@@ -64,18 +64,23 @@ static void on_read(int fd) {
}
static void test_no_op(void) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_udp_server *s = grpc_udp_server_create();
- grpc_udp_server_destroy(s, NULL, NULL);
+ grpc_udp_server_destroy(&exec_ctx, s, NULL);
+ grpc_exec_ctx_finish(&exec_ctx);
}
static void test_no_op_with_start(void) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_udp_server *s = grpc_udp_server_create();
LOG_TEST("test_no_op_with_start");
- grpc_udp_server_start(s, NULL, 0);
- grpc_udp_server_destroy(s, NULL, NULL);
+ grpc_udp_server_start(&exec_ctx, s, NULL, 0);
+ grpc_udp_server_destroy(&exec_ctx, s, NULL);
+ grpc_exec_ctx_finish(&exec_ctx);
}
static void test_no_op_with_port(void) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
struct sockaddr_in addr;
grpc_udp_server *s = grpc_udp_server_create();
LOG_TEST("test_no_op_with_port");
@@ -85,10 +90,12 @@ static void test_no_op_with_port(void) {
GPR_ASSERT(grpc_udp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr),
on_read));
- grpc_udp_server_destroy(s, NULL, NULL);
+ grpc_udp_server_destroy(&exec_ctx, s, NULL);
+ grpc_exec_ctx_finish(&exec_ctx);
}
static void test_no_op_with_port_and_start(void) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
struct sockaddr_in addr;
grpc_udp_server *s = grpc_udp_server_create();
LOG_TEST("test_no_op_with_port_and_start");
@@ -98,12 +105,14 @@ static void test_no_op_with_port_and_start(void) {
GPR_ASSERT(grpc_udp_server_add_port(s, (struct sockaddr *)&addr, sizeof(addr),
on_read));
- grpc_udp_server_start(s, NULL, 0);
+ grpc_udp_server_start(&exec_ctx, s, NULL, 0);
- grpc_udp_server_destroy(s, NULL, NULL);
+ grpc_udp_server_destroy(&exec_ctx, s, NULL);
+ grpc_exec_ctx_finish(&exec_ctx);
}
static void test_receive(int number_of_clients) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
struct sockaddr_storage addr;
socklen_t addr_len = sizeof(addr);
int clifd, svrfd;
@@ -128,7 +137,7 @@ static void test_receive(int number_of_clients) {
GPR_ASSERT(addr_len <= sizeof(addr));
pollsets[0] = &g_pollset;
- grpc_udp_server_start(s, pollsets, 1);
+ grpc_udp_server_start(&exec_ctx, s, pollsets, 1);
gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
@@ -144,8 +153,11 @@ static void test_receive(int number_of_clients) {
while (g_number_of_reads == number_of_reads_before &&
gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0) {
grpc_pollset_worker worker;
- grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
- deadline);
+ grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
+ gpr_now(GPR_CLOCK_MONOTONIC), deadline);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_exec_ctx_finish(&exec_ctx);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
}
GPR_ASSERT(g_number_of_reads == number_of_reads_before + 1);
close(clifd);
@@ -154,12 +166,17 @@ static void test_receive(int number_of_clients) {
gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
- grpc_udp_server_destroy(s, NULL, NULL);
+ grpc_udp_server_destroy(&exec_ctx, s, NULL);
+ grpc_exec_ctx_finish(&exec_ctx);
}
-static void destroy_pollset(void *p) { grpc_pollset_destroy(p); }
+static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, int success) {
+ grpc_pollset_destroy(p);
+}
int main(int argc, char **argv) {
+ grpc_closure destroyed;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_test_init(argc, argv);
grpc_iomgr_init();
grpc_pollset_init(&g_pollset);
@@ -171,7 +188,9 @@ int main(int argc, char **argv) {
test_receive(1);
test_receive(10);
- grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset);
+ grpc_closure_init(&destroyed, destroy_pollset, &g_pollset);
+ grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed);
+ grpc_exec_ctx_finish(&exec_ctx);
grpc_iomgr_shutdown();
return 0;
}
diff --git a/test/core/iomgr/workqueue_test.c b/test/core/iomgr/workqueue_test.c
new file mode 100644
index 0000000000..90f7ba7a83
--- /dev/null
+++ b/test/core/iomgr/workqueue_test.c
@@ -0,0 +1,93 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/iomgr/workqueue.h"
+
+#include <grpc/grpc.h>
+#include <grpc/support/log.h>
+
+#include "test/core/util/test_config.h"
+
+static grpc_pollset g_pollset;
+
+static void must_succeed(grpc_exec_ctx *exec_ctx, void *p, int success) {
+ GPR_ASSERT(success == 1);
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
+ *(int *)p = 1;
+ grpc_pollset_kick(&g_pollset, NULL);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+}
+
+static void test_add_closure(void) {
+ grpc_closure c;
+ int done = 0;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_workqueue *wq = grpc_workqueue_create(&exec_ctx);
+ gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5);
+ grpc_pollset_worker worker;
+ grpc_closure_init(&c, must_succeed, &done);
+
+ grpc_workqueue_push(wq, &c, 1);
+ grpc_workqueue_add_to_pollset(&exec_ctx, wq, &g_pollset);
+
+ gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
+ GPR_ASSERT(!done);
+ grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
+ gpr_now(deadline.clock_type), deadline);
+ gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+ grpc_exec_ctx_finish(&exec_ctx);
+ GPR_ASSERT(done);
+
+ GRPC_WORKQUEUE_UNREF(&exec_ctx, wq, "destroy");
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, int success) {
+ grpc_pollset_destroy(p);
+}
+
+int main(int argc, char **argv) {
+ grpc_closure destroyed;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ grpc_test_init(argc, argv);
+ grpc_init();
+ grpc_pollset_init(&g_pollset);
+
+ test_add_closure();
+
+ grpc_closure_init(&destroyed, destroy_pollset, &g_pollset);
+ grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed);
+ grpc_exec_ctx_finish(&exec_ctx);
+ grpc_shutdown();
+ return 0;
+}