Diffstat (limited to 'test/core/iomgr')
-rw-r--r-- | test/core/iomgr/alarm_list_test.c            |  63
-rw-r--r-- | test/core/iomgr/alarm_test.c                 | 225
-rw-r--r-- | test/core/iomgr/endpoint_pair_test.c         |  16
-rw-r--r-- | test/core/iomgr/endpoint_tests.c             |  94
-rw-r--r-- | test/core/iomgr/fd_conservation_posix_test.c |   7
-rw-r--r-- | test/core/iomgr/fd_posix_test.c              | 124
-rw-r--r-- | test/core/iomgr/resolve_address_test.c       |  18
-rw-r--r-- | test/core/iomgr/sockaddr_utils_test.c        |   2
-rw-r--r-- | test/core/iomgr/tcp_client_posix_test.c      |  81
-rw-r--r-- | test/core/iomgr/tcp_posix_test.c             | 161
-rw-r--r-- | test/core/iomgr/tcp_server_posix_test.c      |  49
-rw-r--r-- | test/core/iomgr/time_averaged_stats_test.c   |   2
-rw-r--r-- | test/core/iomgr/udp_server_test.c            |  43
-rw-r--r-- | test/core/iomgr/workqueue_test.c             |  93
14 files changed, 456 insertions, 522 deletions
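The change these hunks share is mechanical: iomgr callbacks move from bare void (*)(void *, int) functions and grpc_iomgr_closure to grpc_closure, with an explicit grpc_exec_ctx threaded through every call. A minimal sketch of the calling convention, under the assumption that the type lives in the internal header src/core/iomgr/exec_ctx.h (the header's location is not shown in this diff):

#include "src/core/iomgr/exec_ctx.h" /* assumed header path */

/* Sketch only: the exec_ctx pattern adopted throughout the hunks below.
   A grpc_exec_ctx is a stack-allocated queue of closures; iomgr calls
   enqueue work on it instead of invoking callbacks inline. */
static void exec_ctx_pattern(void) {
  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
  /* ...call iomgr APIs, passing &exec_ctx as the first argument... */
  grpc_exec_ctx_finish(&exec_ctx); /* drains the queue, running callbacks */
}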
diff --git a/test/core/iomgr/alarm_list_test.c b/test/core/iomgr/alarm_list_test.c
index 56d662e61a..6656a8fa3b 100644
--- a/test/core/iomgr/alarm_list_test.c
+++ b/test/core/iomgr/alarm_list_test.c
@@ -42,11 +42,8 @@
 #define MAX_CB 30

 static int cb_called[MAX_CB][2];
-static int kicks;

-void grpc_kick_poller(void) { ++kicks; }
-
-static void cb(void *arg, int success) {
+static void cb(grpc_exec_ctx *exec_ctx, void *arg, int success) {
   cb_called[(gpr_intptr)arg][success]++;
 }

@@ -54,54 +51,59 @@ static void add_test(void) {
   gpr_timespec start = gpr_now(GPR_CLOCK_REALTIME);
   int i;
   grpc_alarm alarms[20];
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;

   grpc_alarm_list_init(start);
   memset(cb_called, 0, sizeof(cb_called));

   /* 10 ms alarms. will expire in the current epoch */
   for (i = 0; i < 10; i++) {
-    grpc_alarm_init(&alarms[i],
+    grpc_alarm_init(&exec_ctx, &alarms[i],
                     gpr_time_add(start, gpr_time_from_millis(10, GPR_TIMESPAN)),
                     cb, (void *)(gpr_intptr)i, start);
   }

   /* 1010 ms alarms. will expire in the next epoch */
   for (i = 10; i < 20; i++) {
-    grpc_alarm_init(&alarms[i], gpr_time_add(start, gpr_time_from_millis(
-                                    1010, GPR_TIMESPAN)),
-                    cb, (void *)(gpr_intptr)i, start);
+    grpc_alarm_init(
+        &exec_ctx, &alarms[i],
+        gpr_time_add(start, gpr_time_from_millis(1010, GPR_TIMESPAN)), cb,
+        (void *)(gpr_intptr)i, start);
   }

   /* collect alarms. Only the first batch should be ready. */
-  GPR_ASSERT(10 == grpc_alarm_check(NULL,
+  GPR_ASSERT(10 == grpc_alarm_check(&exec_ctx,
                                     gpr_time_add(start, gpr_time_from_millis(
                                                             500, GPR_TIMESPAN)),
                                     NULL));
+  grpc_exec_ctx_finish(&exec_ctx);
   for (i = 0; i < 20; i++) {
     GPR_ASSERT(cb_called[i][1] == (i < 10));
     GPR_ASSERT(cb_called[i][0] == 0);
   }

-  GPR_ASSERT(0 == grpc_alarm_check(
-                      NULL, gpr_time_add(
-                                start, gpr_time_from_millis(600, GPR_TIMESPAN)),
-                      NULL));
+  GPR_ASSERT(0 == grpc_alarm_check(&exec_ctx,
+                                   gpr_time_add(start, gpr_time_from_millis(
+                                                           600, GPR_TIMESPAN)),
+                                   NULL));
+  grpc_exec_ctx_finish(&exec_ctx);
   for (i = 0; i < 30; i++) {
     GPR_ASSERT(cb_called[i][1] == (i < 10));
     GPR_ASSERT(cb_called[i][0] == 0);
   }

   /* collect the rest of the alarms */
-  GPR_ASSERT(
-      10 == grpc_alarm_check(NULL, gpr_time_add(start, gpr_time_from_millis(
-                                                           1500, GPR_TIMESPAN)),
-                             NULL));
+  GPR_ASSERT(10 == grpc_alarm_check(
+                       &exec_ctx, gpr_time_add(start, gpr_time_from_millis(
                                                           1500, GPR_TIMESPAN)),
+                       NULL));
+  grpc_exec_ctx_finish(&exec_ctx);
   for (i = 0; i < 30; i++) {
     GPR_ASSERT(cb_called[i][1] == (i < 20));
     GPR_ASSERT(cb_called[i][0] == 0);
   }

-  GPR_ASSERT(0 == grpc_alarm_check(NULL,
+  GPR_ASSERT(0 == grpc_alarm_check(&exec_ctx,
                                    gpr_time_add(start, gpr_time_from_millis(
                                                            1600, GPR_TIMESPAN)),
                                    NULL));
@@ -110,7 +112,8 @@ static void add_test(void) {
     GPR_ASSERT(cb_called[i][0] == 0);
   }

-  grpc_alarm_list_shutdown();
+  grpc_alarm_list_shutdown(&exec_ctx);
+  grpc_exec_ctx_finish(&exec_ctx);
 }

 static gpr_timespec tfm(int m) {
@@ -122,28 +125,32 @@ static gpr_timespec tfm(int m) {
 /* Cleaning up a list with pending alarms. */
 void destruction_test(void) {
   grpc_alarm alarms[5];
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;

   grpc_alarm_list_init(gpr_time_0(GPR_CLOCK_REALTIME));
   memset(cb_called, 0, sizeof(cb_called));

-  grpc_alarm_init(&alarms[0], tfm(100), cb, (void *)(gpr_intptr)0,
+  grpc_alarm_init(&exec_ctx, &alarms[0], tfm(100), cb, (void *)(gpr_intptr)0,
                   gpr_time_0(GPR_CLOCK_REALTIME));
-  grpc_alarm_init(&alarms[1], tfm(3), cb, (void *)(gpr_intptr)1,
+  grpc_alarm_init(&exec_ctx, &alarms[1], tfm(3), cb, (void *)(gpr_intptr)1,
                   gpr_time_0(GPR_CLOCK_REALTIME));
-  grpc_alarm_init(&alarms[2], tfm(100), cb, (void *)(gpr_intptr)2,
+  grpc_alarm_init(&exec_ctx, &alarms[2], tfm(100), cb, (void *)(gpr_intptr)2,
                   gpr_time_0(GPR_CLOCK_REALTIME));
-  grpc_alarm_init(&alarms[3], tfm(3), cb, (void *)(gpr_intptr)3,
+  grpc_alarm_init(&exec_ctx, &alarms[3], tfm(3), cb, (void *)(gpr_intptr)3,
                   gpr_time_0(GPR_CLOCK_REALTIME));
-  grpc_alarm_init(&alarms[4], tfm(1), cb, (void *)(gpr_intptr)4,
+  grpc_alarm_init(&exec_ctx, &alarms[4], tfm(1), cb, (void *)(gpr_intptr)4,
                   gpr_time_0(GPR_CLOCK_REALTIME));
-  GPR_ASSERT(1 == grpc_alarm_check(NULL, tfm(2), NULL));
+  GPR_ASSERT(1 == grpc_alarm_check(&exec_ctx, tfm(2), NULL));
+  grpc_exec_ctx_finish(&exec_ctx);
   GPR_ASSERT(1 == cb_called[4][1]);
-  grpc_alarm_cancel(&alarms[0]);
-  grpc_alarm_cancel(&alarms[3]);
+  grpc_alarm_cancel(&exec_ctx, &alarms[0]);
+  grpc_alarm_cancel(&exec_ctx, &alarms[3]);
+  grpc_exec_ctx_finish(&exec_ctx);
   GPR_ASSERT(1 == cb_called[0][0]);
   GPR_ASSERT(1 == cb_called[3][0]);
-  grpc_alarm_list_shutdown();
+  grpc_alarm_list_shutdown(&exec_ctx);
+  grpc_exec_ctx_finish(&exec_ctx);
   GPR_ASSERT(1 == cb_called[1][0]);
   GPR_ASSERT(1 == cb_called[2][0]);
 }
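Worth noting in the hunk above: the kicks counter and the grpc_kick_poller stub became dead code because alarm callbacks no longer fire from inside the poller; they are queued on the exec_ctx, and grpc_exec_ctx_finish runs them synchronously, which is why the cb_called asserts are safe immediately afterwards. A hedged sketch of that sequencing, reusing this test's own cb and cb_called (and assuming grpc_alarm_list_init has already run, as in add_test):

/* Sketch: callback timing under the new API; signatures taken from the
   hunk above. This is illustrative, not a test in the file. */
static void sequencing_sketch(gpr_timespec start) {
  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
  grpc_alarm alarm;
  grpc_alarm_init(&exec_ctx, &alarm,
                  gpr_time_add(start, gpr_time_from_millis(10, GPR_TIMESPAN)),
                  cb, (void *)(gpr_intptr)0, start);
  /* checking past the deadline queues cb on the exec_ctx... */
  GPR_ASSERT(1 == grpc_alarm_check(
                      &exec_ctx,
                      gpr_time_add(start, gpr_time_from_millis(20, GPR_TIMESPAN)),
                      NULL));
  grpc_exec_ctx_finish(&exec_ctx);
  /* ...and finish() has run it, so the side effect is visible right here */
  GPR_ASSERT(1 == cb_called[0][1]);
}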
diff --git a/test/core/iomgr/alarm_test.c b/test/core/iomgr/alarm_test.c
deleted file mode 100644
index 55aa517529..0000000000
--- a/test/core/iomgr/alarm_test.c
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- *     * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- *     * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- *     * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-/* Test gRPC event manager with a simple TCP upload server and client. */
-#include "src/core/iomgr/alarm.h"
-
-#include <ctype.h>
-#include <errno.h>
-#include <fcntl.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-
-#include <grpc/grpc.h>
-#include <grpc/support/alloc.h>
-#include <grpc/support/log.h>
-#include <grpc/support/sync.h>
-#include <grpc/support/time.h>
-#include "test/core/util/test_config.h"
-
-#define SUCCESS_NOT_SET (-1)
-
-/* Dummy gRPC callback */
-void no_op_cb(void *arg, int success) {}
-
-typedef struct {
-  gpr_cv cv;
-  gpr_mu mu;
-  grpc_iomgr_closure *followup_closure;
-  int counter;
-  int done_success_ctr;
-  int done_cancel_ctr;
-  int done;
-  gpr_event fcb_arg;
-  int success;
-} alarm_arg;
-
-static void followup_cb(void *arg, int success) {
-  gpr_event_set((gpr_event *)arg, arg);
-}
-
-/* Called when an alarm expires. */
-static void alarm_cb(void *arg /* alarm_arg */, int success) {
-  alarm_arg *a = arg;
-  gpr_mu_lock(&a->mu);
-  if (success) {
-    a->counter++;
-    a->done_success_ctr++;
-  } else {
-    a->done_cancel_ctr++;
-  }
-  a->done = 1;
-  a->success = success;
-  gpr_cv_signal(&a->cv);
-  gpr_mu_unlock(&a->mu);
-  grpc_iomgr_closure_init(a->followup_closure, followup_cb, &a->fcb_arg);
-  grpc_iomgr_add_callback(a->followup_closure);
-}
-
-/* Test grpc_alarm add and cancel. */
-static void test_grpc_alarm(void) {
-  grpc_alarm alarm;
-  grpc_alarm alarm_to_cancel;
-  /* Timeout on the alarm cond. var, so make big enough to absorb time
-     deviations. Otherwise, operations after wait will not be properly ordered
-     */
-  gpr_timespec alarm_deadline;
-  gpr_timespec followup_deadline;
-
-  alarm_arg arg;
-  alarm_arg arg2;
-  void *fdone;
-
-  grpc_init();
-
-  arg.counter = 0;
-  arg.success = SUCCESS_NOT_SET;
-  arg.done_success_ctr = 0;
-  arg.done_cancel_ctr = 0;
-  arg.done = 0;
-  gpr_mu_init(&arg.mu);
-  gpr_cv_init(&arg.cv);
-  arg.followup_closure = gpr_malloc(sizeof(grpc_iomgr_closure));
-  gpr_event_init(&arg.fcb_arg);
-
-  grpc_alarm_init(&alarm, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(100), alarm_cb, &arg,
-                  gpr_now(GPR_CLOCK_MONOTONIC));
-
-  alarm_deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1);
-  gpr_mu_lock(&arg.mu);
-  while (arg.done == 0) {
-    if (gpr_cv_wait(&arg.cv, &arg.mu, alarm_deadline)) {
-      gpr_log(GPR_ERROR, "alarm deadline exceeded");
-      break;
-    }
-  }
-  gpr_mu_unlock(&arg.mu);
-
-  followup_deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5);
-  fdone = gpr_event_wait(&arg.fcb_arg, followup_deadline);
-
-  if (arg.counter != 1) {
-    gpr_log(GPR_ERROR, "Alarm callback not called");
-    GPR_ASSERT(0);
-  } else if (arg.done_success_ctr != 1) {
-    gpr_log(GPR_ERROR, "Alarm done callback not called with success");
-    GPR_ASSERT(0);
-  } else if (arg.done_cancel_ctr != 0) {
-    gpr_log(GPR_ERROR, "Alarm done callback called with cancel");
-    GPR_ASSERT(0);
-  } else if (arg.success == SUCCESS_NOT_SET) {
-    gpr_log(GPR_ERROR, "Alarm callback without status");
-    GPR_ASSERT(0);
-  } else {
-    gpr_log(GPR_INFO, "Alarm callback called successfully");
-  }
-
-  if (fdone != (void *)&arg.fcb_arg) {
-    gpr_log(GPR_ERROR, "Followup callback #1 not invoked properly %p %p", fdone,
-            &arg.fcb_arg);
-    GPR_ASSERT(0);
-  }
-  gpr_cv_destroy(&arg.cv);
-  gpr_mu_destroy(&arg.mu);
-  gpr_free(arg.followup_closure);
-
-  arg2.counter = 0;
-  arg2.success = SUCCESS_NOT_SET;
-  arg2.done_success_ctr = 0;
-  arg2.done_cancel_ctr = 0;
-  arg2.done = 0;
-  gpr_mu_init(&arg2.mu);
-  gpr_cv_init(&arg2.cv);
-  arg2.followup_closure = gpr_malloc(sizeof(grpc_iomgr_closure));
-
-  gpr_event_init(&arg2.fcb_arg);
-
-  grpc_alarm_init(&alarm_to_cancel, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(100),
-                  alarm_cb, &arg2, gpr_now(GPR_CLOCK_MONOTONIC));
-  grpc_alarm_cancel(&alarm_to_cancel);
-
-  alarm_deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1);
-  gpr_mu_lock(&arg2.mu);
-  while (arg2.done == 0) {
-    gpr_cv_wait(&arg2.cv, &arg2.mu, alarm_deadline);
-  }
-  gpr_mu_unlock(&arg2.mu);
-
-  gpr_log(GPR_INFO, "alarm done = %d", arg2.done);
-
-  followup_deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5);
-  fdone = gpr_event_wait(&arg2.fcb_arg, followup_deadline);
-
-  if (arg2.counter != arg2.done_success_ctr) {
-    gpr_log(GPR_ERROR, "Alarm callback called but didn't lead to done success");
-    GPR_ASSERT(0);
-  } else if (arg2.done_success_ctr && arg2.done_cancel_ctr) {
-    gpr_log(GPR_ERROR, "Alarm done callback called with success and cancel");
-    GPR_ASSERT(0);
-  } else if (arg2.done_cancel_ctr + arg2.done_success_ctr != 1) {
-    gpr_log(GPR_ERROR, "Alarm done callback called incorrect number of times");
-    GPR_ASSERT(0);
-  } else if (arg2.success == SUCCESS_NOT_SET) {
-    gpr_log(GPR_ERROR, "Alarm callback without status");
-    GPR_ASSERT(0);
-  } else if (arg2.done_success_ctr) {
-    gpr_log(GPR_INFO, "Alarm callback executed before cancel");
-    gpr_log(GPR_INFO, "Current value of triggered is %d\n",
-            alarm_to_cancel.triggered);
-  } else if (arg2.done_cancel_ctr) {
-    gpr_log(GPR_INFO, "Alarm callback canceled");
-    gpr_log(GPR_INFO, "Current value of triggered is %d\n",
-            alarm_to_cancel.triggered);
-  } else {
-    gpr_log(GPR_ERROR, "Alarm cancel test should not be here");
-    GPR_ASSERT(0);
-  }
-
-  if (fdone != (void *)&arg2.fcb_arg) {
-    gpr_log(GPR_ERROR, "Followup callback #2 not invoked properly %p %p", fdone,
-            &arg2.fcb_arg);
-    GPR_ASSERT(0);
-  }
-  gpr_cv_destroy(&arg2.cv);
-  gpr_mu_destroy(&arg2.mu);
-  gpr_free(arg2.followup_closure);
-
-  grpc_shutdown();
-}
-
-int main(int argc, char **argv) {
-  grpc_test_init(argc, argv);
-  test_grpc_alarm();
-  return 0;
-}
diff --git a/test/core/iomgr/endpoint_pair_test.c b/test/core/iomgr/endpoint_pair_test.c
index 3abde5ac35..ff590cf2d5 100644
--- a/test/core/iomgr/endpoint_pair_test.c
+++ b/test/core/iomgr/endpoint_pair_test.c
@@ -48,13 +48,15 @@ static void clean_up(void) {}

 static grpc_endpoint_test_fixture create_fixture_endpoint_pair(
     size_t slice_size) {
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   grpc_endpoint_test_fixture f;
   grpc_endpoint_pair p = grpc_iomgr_create_endpoint_pair("test", slice_size);

   f.client_ep = p.client;
   f.server_ep = p.server;
-  grpc_endpoint_add_to_pollset(f.client_ep, &g_pollset);
-  grpc_endpoint_add_to_pollset(f.server_ep, &g_pollset);
+  grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, &g_pollset);
+  grpc_endpoint_add_to_pollset(&exec_ctx, f.server_ep, &g_pollset);
+  grpc_exec_ctx_finish(&exec_ctx);

   return f;
 }
@@ -63,14 +65,20 @@ static grpc_endpoint_test_config configs[] = {
     {"tcp/tcp_socketpair", create_fixture_endpoint_pair, clean_up},
 };

-static void destroy_pollset(void *p) { grpc_pollset_destroy(p); }
+static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, int success) {
+  grpc_pollset_destroy(p);
+}

 int main(int argc, char **argv) {
+  grpc_closure destroyed;
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   grpc_test_init(argc, argv);
   grpc_init();
   grpc_pollset_init(&g_pollset);
   grpc_endpoint_tests(configs[0], &g_pollset);
-  grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset);
+  grpc_closure_init(&destroyed, destroy_pollset, &g_pollset);
+  grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed);
+  grpc_exec_ctx_finish(&exec_ctx);
   grpc_shutdown();

   return 0;
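grpc_pollset_shutdown now takes a grpc_closure instead of a bare callback-plus-argument pair, and the destroy callback picks up the new three-argument shape. Every main() in this directory converges on the same teardown idiom; in sketch form, reusing the tests' own destroy_pollset helper:

/* Sketch of the shared teardown idiom: wrap pollset destruction in a
   grpc_closure and flush the exec_ctx so the closure actually runs
   (in these tests it runs during this flush). */
static void shutdown_test_pollset(grpc_pollset *pollset) {
  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
  grpc_closure destroyed;
  grpc_closure_init(&destroyed, destroy_pollset, pollset);
  grpc_pollset_shutdown(&exec_ctx, pollset, &destroyed);
  grpc_exec_ctx_finish(&exec_ctx);
}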
diff --git a/test/core/iomgr/endpoint_tests.c b/test/core/iomgr/endpoint_tests.c
index 853b9a32c2..4b72590819 100644
--- a/test/core/iomgr/endpoint_tests.c
+++ b/test/core/iomgr/endpoint_tests.c
@@ -122,14 +122,14 @@ struct read_and_write_test_state {
   int write_done;
   gpr_slice_buffer incoming;
   gpr_slice_buffer outgoing;
-  grpc_iomgr_closure done_read;
-  grpc_iomgr_closure done_write;
+  grpc_closure done_read;
+  grpc_closure done_write;
 };

-static void read_and_write_test_read_handler(void *data, int success) {
+static void read_and_write_test_read_handler(grpc_exec_ctx *exec_ctx,
+                                             void *data, int success) {
   struct read_and_write_test_state *state = data;

-loop:
   state->bytes_read += count_slices(
       state->incoming.slices, state->incoming.count, &state->current_read_data);
   if (state->bytes_read == state->target_bytes || !success) {
@@ -139,56 +139,35 @@ loop:
     grpc_pollset_kick(g_pollset, NULL);
     gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
   } else if (success) {
-    switch (grpc_endpoint_read(state->read_ep, &state->incoming,
-                               &state->done_read)) {
-      case GRPC_ENDPOINT_ERROR:
-        success = 0;
-        goto loop;
-      case GRPC_ENDPOINT_DONE:
-        success = 1;
-        goto loop;
-      case GRPC_ENDPOINT_PENDING:
-        break;
-    }
+    grpc_endpoint_read(exec_ctx, state->read_ep, &state->incoming,
+                       &state->done_read);
   }
 }

-static void read_and_write_test_write_handler(void *data, int success) {
+static void read_and_write_test_write_handler(grpc_exec_ctx *exec_ctx,
+                                              void *data, int success) {
   struct read_and_write_test_state *state = data;
   gpr_slice *slices = NULL;
   size_t nslices;
-  grpc_endpoint_op_status write_status;

   if (success) {
-    for (;;) {
-      /* Need to do inline writes until they don't succeed synchronously or we
-         finish writing */
-      state->bytes_written += state->current_write_size;
-      if (state->target_bytes - state->bytes_written <
-          state->current_write_size) {
-        state->current_write_size = state->target_bytes - state->bytes_written;
-      }
-      if (state->current_write_size == 0) {
-        break;
-      }
-
+    state->bytes_written += state->current_write_size;
+    if (state->target_bytes - state->bytes_written <
+        state->current_write_size) {
+      state->current_write_size = state->target_bytes - state->bytes_written;
+    }
+    if (state->current_write_size != 0) {
       slices = allocate_blocks(state->current_write_size, 8192, &nslices,
                                &state->current_write_data);
       gpr_slice_buffer_reset_and_unref(&state->outgoing);
       gpr_slice_buffer_addn(&state->outgoing, slices, nslices);
-      write_status = grpc_endpoint_write(state->write_ep, &state->outgoing,
-                                         &state->done_write);
+      grpc_endpoint_write(exec_ctx, state->write_ep, &state->outgoing,
+                          &state->done_write);
       free(slices);
-      if (write_status == GRPC_ENDPOINT_PENDING) {
-        return;
-      } else if (write_status == GRPC_ENDPOINT_ERROR) {
-        goto cleanup;
-      }
+      return;
     }
-    GPR_ASSERT(state->bytes_written == state->target_bytes);
   }

-cleanup:
   gpr_log(GPR_INFO, "Write handler done");
   gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
   state->write_done = 1 + success;
@@ -207,6 +186,7 @@ static void read_and_write_test(grpc_endpoint_test_config config,
   gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
   grpc_endpoint_test_fixture f =
       begin_test(config, "read_and_write_test", slice_size);
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;

   gpr_log(GPR_DEBUG, "num_bytes=%d write_size=%d slice_size=%d shutdown=%d",
           num_bytes, write_size, slice_size, shutdown);
@@ -227,10 +207,9 @@ static void read_and_write_test(grpc_endpoint_test_config config,
   state.write_done = 0;
   state.current_read_data = 0;
   state.current_write_data = 0;
-  grpc_iomgr_closure_init(&state.done_read, read_and_write_test_read_handler,
-                          &state);
-  grpc_iomgr_closure_init(&state.done_write, read_and_write_test_write_handler,
-                          &state);
+  grpc_closure_init(&state.done_read, read_and_write_test_read_handler, &state);
+  grpc_closure_init(&state.done_write, read_and_write_test_write_handler,
+                    &state);

   gpr_slice_buffer_init(&state.outgoing);
   gpr_slice_buffer_init(&state.incoming);
@@ -239,41 +218,36 @@ static void read_and_write_test(grpc_endpoint_test_config config,
      for the first iteration as for later iterations. It does the right thing
      even when bytes_written is unsigned. */
   state.bytes_written -= state.current_write_size;
-  read_and_write_test_write_handler(&state, 1);
+  read_and_write_test_write_handler(&exec_ctx, &state, 1);
+  grpc_exec_ctx_finish(&exec_ctx);

-  switch (
-      grpc_endpoint_read(state.read_ep, &state.incoming, &state.done_read)) {
-    case GRPC_ENDPOINT_PENDING:
-      break;
-    case GRPC_ENDPOINT_ERROR:
-      read_and_write_test_read_handler(&state, 0);
-      break;
-    case GRPC_ENDPOINT_DONE:
-      read_and_write_test_read_handler(&state, 1);
-      break;
-  }
+  grpc_endpoint_read(&exec_ctx, state.read_ep, &state.incoming,
+                     &state.done_read);

   if (shutdown) {
     gpr_log(GPR_DEBUG, "shutdown read");
-    grpc_endpoint_shutdown(state.read_ep);
+    grpc_endpoint_shutdown(&exec_ctx, state.read_ep);
     gpr_log(GPR_DEBUG, "shutdown write");
-    grpc_endpoint_shutdown(state.write_ep);
+    grpc_endpoint_shutdown(&exec_ctx, state.write_ep);
   }
+  grpc_exec_ctx_finish(&exec_ctx);

   gpr_mu_lock(GRPC_POLLSET_MU(g_pollset));
   while (!state.read_done || !state.write_done) {
     grpc_pollset_worker worker;
     GPR_ASSERT(gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) < 0);
-    grpc_pollset_work(g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
-                      deadline);
+    grpc_pollset_work(&exec_ctx, g_pollset, &worker,
+                      gpr_now(GPR_CLOCK_MONOTONIC), deadline);
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(g_pollset));
+  grpc_exec_ctx_finish(&exec_ctx);

   end_test(config);
   gpr_slice_buffer_destroy(&state.outgoing);
   gpr_slice_buffer_destroy(&state.incoming);
-  grpc_endpoint_destroy(state.read_ep);
-  grpc_endpoint_destroy(state.write_ep);
+  grpc_endpoint_destroy(&exec_ctx, state.read_ep);
+  grpc_endpoint_destroy(&exec_ctx, state.write_ep);
+  grpc_exec_ctx_finish(&exec_ctx);
 }

 void grpc_endpoint_tests(grpc_endpoint_test_config config,
diff --git a/test/core/iomgr/fd_conservation_posix_test.c b/test/core/iomgr/fd_conservation_posix_test.c
index 8327c681b8..401bf70a9e 100644
--- a/test/core/iomgr/fd_conservation_posix_test.c
+++ b/test/core/iomgr/fd_conservation_posix_test.c
@@ -43,6 +43,7 @@ int main(int argc, char **argv) {
   int i;
   struct rlimit rlim;
   grpc_endpoint_pair p;
+
   grpc_test_init(argc, argv);
   grpc_iomgr_init();

@@ -53,9 +54,11 @@ int main(int argc, char **argv) {
   GPR_ASSERT(0 == setrlimit(RLIMIT_NOFILE, &rlim));

   for (i = 0; i < 100; i++) {
+    grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
     p = grpc_iomgr_create_endpoint_pair("test", 1);
-    grpc_endpoint_destroy(p.client);
-    grpc_endpoint_destroy(p.server);
+    grpc_endpoint_destroy(&exec_ctx, p.client);
+    grpc_endpoint_destroy(&exec_ctx, p.server);
+    grpc_exec_ctx_finish(&exec_ctx);
   }

   grpc_iomgr_shutdown();
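The endpoint_tests rewrite above is the clearest view of the endpoint API shift: grpc_endpoint_read and grpc_endpoint_write used to return GRPC_ENDPOINT_DONE / PENDING / ERROR, forcing callers into the loop:/cleanup: goto structure just deleted. They now return void and always complete through the closure, so a handler that wants more data simply re-arms itself. A reduced sketch of that shape (reader_state here is a stand-in for the test's read_and_write_test_state, not a type in the tree):

/* Sketch: a self-re-arming read handler under the closure-only API. */
typedef struct {
  grpc_endpoint *ep;
  gpr_slice_buffer incoming;
  size_t bytes_read;
  size_t target_bytes;
  grpc_closure on_read_closure;
} reader_state;

static void on_read(grpc_exec_ctx *exec_ctx, void *arg, int success) {
  reader_state *s = arg;
  /* account for s->incoming here, as the test does with count_slices() */
  if (success && s->bytes_read < s->target_bytes) {
    /* no status code to branch on: completion, synchronous or not,
       always arrives back through this same closure */
    grpc_endpoint_read(exec_ctx, s->ep, &s->incoming, &s->on_read_closure);
  }
}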
diff --git a/test/core/iomgr/fd_posix_test.c b/test/core/iomgr/fd_posix_test.c
index 75959069c0..f592f63ba9 100644
--- a/test/core/iomgr/fd_posix_test.c
+++ b/test/core/iomgr/fd_posix_test.c
@@ -98,7 +98,7 @@ typedef struct {
   grpc_fd *em_fd;           /* listening fd */
   ssize_t read_bytes_total; /* total number of received bytes */
   int done;                 /* set to 1 when a server finishes serving */
-  grpc_iomgr_closure listen_closure;
+  grpc_closure listen_closure;
 } server;

 static void server_init(server *sv) {
@@ -112,23 +112,23 @@ typedef struct {
   server *sv;              /* not owned by a single session */
   grpc_fd *em_fd;          /* fd to read upload bytes */
   char read_buf[BUF_SIZE]; /* buffer to store upload bytes */
-  grpc_iomgr_closure session_read_closure;
+  grpc_closure session_read_closure;
 } session;

 /* Called when an upload session can be safely shutdown.
    Close session FD and start to shutdown listen FD. */
-static void session_shutdown_cb(void *arg, /*session*/
+static void session_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg, /*session */
                                 int success) {
   session *se = arg;
   server *sv = se->sv;
-  grpc_fd_orphan(se->em_fd, NULL, "a");
+  grpc_fd_orphan(exec_ctx, se->em_fd, NULL, "a");
   gpr_free(se);
   /* Start to shutdown listen fd. */
-  grpc_fd_shutdown(sv->em_fd);
+  grpc_fd_shutdown(exec_ctx, sv->em_fd);
 }

 /* Called when data become readable in a session. */
-static void session_read_cb(void *arg, /*session*/
+static void session_read_cb(grpc_exec_ctx *exec_ctx, void *arg, /*session */
                             int success) {
   session *se = arg;
   int fd = se->em_fd->fd;
@@ -137,7 +137,7 @@ static void session_read_cb(void *arg, /*session*/
   ssize_t read_total = 0;

   if (!success) {
-    session_shutdown_cb(arg, 1);
+    session_shutdown_cb(exec_ctx, arg, 1);
     return;
   }
@@ -152,7 +152,7 @@ static void session_read_cb(void *arg, /*session*/
      It is possible to read nothing due to spurious edge event or data has
      been drained, In such a case, read() returns -1 and set errno to EAGAIN. */
   if (read_once == 0) {
-    session_shutdown_cb(arg, 1);
+    session_shutdown_cb(exec_ctx, arg, 1);
   } else if (read_once == -1) {
     if (errno == EAGAIN) {
       /* An edge triggered event is cached in the kernel until next poll.
@@ -163,7 +163,7 @@ static void session_read_cb(void *arg, /*session*/
          TODO(chenw): in multi-threaded version, callback and polling can be
          run in different threads. polling may catch a persist read edge event
          before notify_on_read is called. */
-      grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
+      grpc_fd_notify_on_read(exec_ctx, se->em_fd, &se->session_read_closure);
     } else {
       gpr_log(GPR_ERROR, "Unhandled read error %s", strerror(errno));
       abort();
@@ -173,10 +173,11 @@ static void session_read_cb(void *arg, /*session*/

 /* Called when the listen FD can be safely shutdown.
    Close listen FD and signal that server can be shutdown. */
-static void listen_shutdown_cb(void *arg /*server*/, int success) {
+static void listen_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg /*server */,
+                               int success) {
   server *sv = arg;

-  grpc_fd_orphan(sv->em_fd, NULL, "b");
+  grpc_fd_orphan(exec_ctx, sv->em_fd, NULL, "b");

   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   sv->done = 1;
@@ -185,7 +186,7 @@ static void listen_shutdown_cb(void *arg /*server*/, int success) {
 }

 /* Called when a new TCP connection request arrives in the listening port. */
-static void listen_cb(void *arg, /*=sv_arg*/
+static void listen_cb(grpc_exec_ctx *exec_ctx, void *arg, /*=sv_arg*/
                       int success) {
   server *sv = arg;
   int fd;
@@ -196,7 +197,7 @@ static void listen_cb(void *arg, /*=sv_arg*/
   grpc_fd *listen_em_fd = sv->em_fd;

   if (!success) {
-    listen_shutdown_cb(arg, 1);
+    listen_shutdown_cb(exec_ctx, arg, 1);
     return;
   }
@@ -208,12 +209,12 @@ static void listen_cb(void *arg, /*=sv_arg*/
   se = gpr_malloc(sizeof(*se));
   se->sv = sv;
   se->em_fd = grpc_fd_create(fd, "listener");
-  grpc_pollset_add_fd(&g_pollset, se->em_fd);
+  grpc_pollset_add_fd(exec_ctx, &g_pollset, se->em_fd);
   se->session_read_closure.cb = session_read_cb;
   se->session_read_closure.cb_arg = se;
-  grpc_fd_notify_on_read(se->em_fd, &se->session_read_closure);
+  grpc_fd_notify_on_read(exec_ctx, se->em_fd, &se->session_read_closure);

-  grpc_fd_notify_on_read(listen_em_fd, &sv->listen_closure);
+  grpc_fd_notify_on_read(exec_ctx, listen_em_fd, &sv->listen_closure);
 }

 /* Max number of connections pending to be accepted by listen(). */
@@ -223,7 +224,7 @@ static void listen_cb(void *arg, /*=sv_arg*/
    listen_cb() is registered to be interested in reading from listen_fd.
    When connection request arrives, listen_cb() is called to accept the
    connection request. */
-static int server_start(server *sv) {
+static int server_start(grpc_exec_ctx *exec_ctx, server *sv) {
   int port = 0;
   int fd;
   struct sockaddr_in sin;
@@ -237,11 +238,11 @@ static int server_start(server *sv) {
   GPR_ASSERT(listen(fd, MAX_NUM_FD) == 0);

   sv->em_fd = grpc_fd_create(fd, "server");
-  grpc_pollset_add_fd(&g_pollset, sv->em_fd);
+  grpc_pollset_add_fd(exec_ctx, &g_pollset, sv->em_fd);
   /* Register to be interested in reading from listen_fd. */
   sv->listen_closure.cb = listen_cb;
   sv->listen_closure.cb_arg = sv;
-  grpc_fd_notify_on_read(sv->em_fd, &sv->listen_closure);
+  grpc_fd_notify_on_read(exec_ctx, sv->em_fd, &sv->listen_closure);

   return port;
 }
@@ -250,9 +251,14 @@ static int server_start(server *sv) {
 static void server_wait_and_shutdown(server *sv) {
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   while (!sv->done) {
+    grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+    grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
+                      gpr_now(GPR_CLOCK_MONOTONIC),
                       gpr_inf_future(GPR_CLOCK_MONOTONIC));
+    gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+    grpc_exec_ctx_finish(&exec_ctx);
+    gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
 }
@@ -274,7 +280,7 @@ typedef struct {
   int client_write_cnt;

   int done; /* set to 1 when a client finishes sending */
-  grpc_iomgr_closure write_closure;
+  grpc_closure write_closure;
 } client;

 static void client_init(client *cl) {
@@ -285,15 +291,16 @@ static void client_init(client *cl) {
 }

 /* Called when a client upload session is ready to shutdown. */
-static void client_session_shutdown_cb(void *arg /*client*/, int success) {
+static void client_session_shutdown_cb(grpc_exec_ctx *exec_ctx,
+                                       void *arg /*client */, int success) {
   client *cl = arg;
-  grpc_fd_orphan(cl->em_fd, NULL, "c");
+  grpc_fd_orphan(exec_ctx, cl->em_fd, NULL, "c");
   cl->done = 1;
   grpc_pollset_kick(&g_pollset, NULL);
 }

 /* Write as much as possible, then register notify_on_write. */
-static void client_session_write(void *arg, /*client*/
+static void client_session_write(grpc_exec_ctx *exec_ctx, void *arg, /*client */
                                  int success) {
   client *cl = arg;
   int fd = cl->em_fd->fd;
@@ -301,7 +308,7 @@ static void client_session_write(void *arg, /*client*/

   if (!success) {
     gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
-    client_session_shutdown_cb(arg, 1);
+    client_session_shutdown_cb(exec_ctx, arg, 1);
     gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
     return;
   }
@@ -316,10 +323,10 @@ static void client_session_write(void *arg, /*client*/
     if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) {
       cl->write_closure.cb = client_session_write;
       cl->write_closure.cb_arg = cl;
-      grpc_fd_notify_on_write(cl->em_fd, &cl->write_closure);
+      grpc_fd_notify_on_write(exec_ctx, cl->em_fd, &cl->write_closure);
       cl->client_write_cnt++;
     } else {
-      client_session_shutdown_cb(arg, 1);
+      client_session_shutdown_cb(exec_ctx, arg, 1);
     }
     gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
   } else {
@@ -329,7 +336,7 @@ static void client_session_write(void *arg, /*client*/
 }

 /* Start a client to send a stream of bytes. */
-static void client_start(client *cl, int port) {
+static void client_start(grpc_exec_ctx *exec_ctx, client *cl, int port) {
   int fd;
   struct sockaddr_in sin;
   create_test_socket(port, &fd, &sin);
@@ -350,9 +357,9 @@ static void client_start(client *cl, int port) {
   }

   cl->em_fd = grpc_fd_create(fd, "client");
-  grpc_pollset_add_fd(&g_pollset, cl->em_fd);
+  grpc_pollset_add_fd(exec_ctx, &g_pollset, cl->em_fd);

-  client_session_write(cl, 1);
+  client_session_write(exec_ctx, cl, 1);
 }

 /* Wait for the signal to shutdown a client. */
@@ -360,8 +367,13 @@ static void client_wait_and_shutdown(client *cl) {
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   while (!cl->done) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+    grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+    grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
+                      gpr_now(GPR_CLOCK_MONOTONIC),
                       gpr_inf_future(GPR_CLOCK_MONOTONIC));
+    gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+    grpc_exec_ctx_finish(&exec_ctx);
+    gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
 }
@@ -373,11 +385,13 @@ static void test_grpc_fd(void) {
   server sv;
   client cl;
   int port;
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;

   server_init(&sv);
-  port = server_start(&sv);
+  port = server_start(&exec_ctx, &sv);
   client_init(&cl);
-  client_start(&cl, port);
+  client_start(&exec_ctx, &cl, port);
+  grpc_exec_ctx_finish(&exec_ctx);
   client_wait_and_shutdown(&cl);
   server_wait_and_shutdown(&sv);
   GPR_ASSERT(sv.read_bytes_total == cl.write_bytes_total);
@@ -385,14 +399,15 @@ static void test_grpc_fd(void) {
 }

 typedef struct fd_change_data {
-  void (*cb_that_ran)(void *, int success);
+  void (*cb_that_ran)(grpc_exec_ctx *exec_ctx, void *, int success);
 } fd_change_data;

 void init_change_data(fd_change_data *fdc) { fdc->cb_that_ran = NULL; }

 void destroy_change_data(fd_change_data *fdc) {}

-static void first_read_callback(void *arg /* fd_change_data */, int success) {
+static void first_read_callback(grpc_exec_ctx *exec_ctx,
+                                void *arg /* fd_change_data */, int success) {
   fd_change_data *fdc = arg;

   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
@@ -401,7 +416,8 @@ static void first_read_callback(void *arg /* fd_change_data */, int success) {
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
 }

-static void second_read_callback(void *arg /* fd_change_data */, int success) {
+static void second_read_callback(grpc_exec_ctx *exec_ctx,
+                                 void *arg /* fd_change_data */, int success) {
   fd_change_data *fdc = arg;

   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
@@ -421,8 +437,9 @@ static void test_grpc_fd_change(void) {
   int sv[2];
   char data;
   ssize_t result;
-  grpc_iomgr_closure first_closure;
-  grpc_iomgr_closure second_closure;
+  grpc_closure first_closure;
+  grpc_closure second_closure;
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;

   first_closure.cb = first_read_callback;
   first_closure.cb_arg = &a;
@@ -439,10 +456,10 @@ static void test_grpc_fd_change(void) {
   GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);

   em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change");
-  grpc_pollset_add_fd(&g_pollset, em_fd);
+  grpc_pollset_add_fd(&exec_ctx, &g_pollset, em_fd);

   /* Register the first callback, then make its FD readable */
-  grpc_fd_notify_on_read(em_fd, &first_closure);
+  grpc_fd_notify_on_read(&exec_ctx, em_fd, &first_closure);
   data = 0;
   result = write(sv[1], &data, 1);
   GPR_ASSERT(result == 1);
@@ -451,8 +468,12 @@ static void test_grpc_fd_change(void) {
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   while (a.cb_that_ran == NULL) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+    grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
+                      gpr_now(GPR_CLOCK_MONOTONIC),
                       gpr_inf_future(GPR_CLOCK_MONOTONIC));
+    gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+    grpc_exec_ctx_finish(&exec_ctx);
+    gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   }
   GPR_ASSERT(a.cb_that_ran == first_read_callback);
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
@@ -463,7 +484,7 @@ static void test_grpc_fd_change(void) {

   /* Now register a second callback with distinct change data, and do the same
      thing again. */
-  grpc_fd_notify_on_read(em_fd, &second_closure);
+  grpc_fd_notify_on_read(&exec_ctx, em_fd, &second_closure);
   data = 0;
   result = write(sv[1], &data, 1);
   GPR_ASSERT(result == 1);
@@ -471,28 +492,39 @@ static void test_grpc_fd_change(void) {
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   while (b.cb_that_ran == NULL) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+    grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
+                      gpr_now(GPR_CLOCK_MONOTONIC),
                       gpr_inf_future(GPR_CLOCK_MONOTONIC));
+    gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+    grpc_exec_ctx_finish(&exec_ctx);
+    gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   }
   /* Except now we verify that second_read_callback ran instead */
   GPR_ASSERT(b.cb_that_ran == second_read_callback);
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));

-  grpc_fd_orphan(em_fd, NULL, "d");
+  grpc_fd_orphan(&exec_ctx, em_fd, NULL, "d");
+  grpc_exec_ctx_finish(&exec_ctx);
   destroy_change_data(&a);
   destroy_change_data(&b);
   close(sv[1]);
 }

-static void destroy_pollset(void *p) { grpc_pollset_destroy(p); }
+static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, int success) {
+  grpc_pollset_destroy(p);
+}

 int main(int argc, char **argv) {
+  grpc_closure destroyed;
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   grpc_test_init(argc, argv);
   grpc_iomgr_init();
   grpc_pollset_init(&g_pollset);
   test_grpc_fd();
   test_grpc_fd_change();
-  grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset);
+  grpc_closure_init(&destroyed, destroy_pollset, &g_pollset);
+  grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed);
+  grpc_exec_ctx_finish(&exec_ctx);
   grpc_iomgr_shutdown();
   return 0;
 }
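fd_posix_test also introduces the wait-loop shape that recurs in every polling test below: grpc_pollset_work can leave closures queued on the exec_ctx, and those closures may need the pollset mutex themselves, so each iteration drops the lock, flushes, and re-acquires. In sketch form, with g_pollset standing for the tests' global pollset:

/* Sketch of the recurring wait loop: poll, then flush queued closures
   with the pollset lock released so they can take it themselves. */
static void wait_for_flag(grpc_exec_ctx *exec_ctx, int *done) {
  gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
  while (!*done) {
    grpc_pollset_worker worker;
    grpc_pollset_work(exec_ctx, &g_pollset, &worker,
                      gpr_now(GPR_CLOCK_MONOTONIC),
                      gpr_inf_future(GPR_CLOCK_MONOTONIC));
    gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
    grpc_exec_ctx_finish(exec_ctx); /* run callbacks without the lock held */
    gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
  }
  gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}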
diff --git a/test/core/iomgr/resolve_address_test.c b/test/core/iomgr/resolve_address_test.c
index 668c5399f9..1c95a9960e 100644
--- a/test/core/iomgr/resolve_address_test.c
+++ b/test/core/iomgr/resolve_address_test.c
@@ -42,16 +42,18 @@ static gpr_timespec test_deadline(void) {
   return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(100);
 }

-static void must_succeed(void* evp, grpc_resolved_addresses* p) {
+static void must_succeed(grpc_exec_ctx *exec_ctx, void *evp,
+                         grpc_resolved_addresses *p) {
   GPR_ASSERT(p);
   GPR_ASSERT(p->naddrs >= 1);
   grpc_resolved_addresses_destroy(p);
-  gpr_event_set(evp, (void*)1);
+  gpr_event_set(evp, (void *)1);
 }

-static void must_fail(void* evp, grpc_resolved_addresses* p) {
+static void must_fail(grpc_exec_ctx *exec_ctx, void *evp,
+                      grpc_resolved_addresses *p) {
   GPR_ASSERT(!p);
-  gpr_event_set(evp, (void*)1);
+  gpr_event_set(evp, (void *)1);
 }

 static void test_localhost(void) {
@@ -83,7 +85,7 @@ static void test_ipv6_with_port(void) {
 }

 static void test_ipv6_without_port(void) {
-  const char* const kCases[] = {
+  const char *const kCases[] = {
       "2001:db8::1", "2001:db8::1.2.3.4", "[2001:db8::1]",
   };
   unsigned i;
@@ -96,7 +98,7 @@ static void test_ipv6_without_port(void) {
 }

 static void test_invalid_ip_addresses(void) {
-  const char* const kCases[] = {
+  const char *const kCases[] = {
       "293.283.1238.3:1", "[2001:db8::11111]:1",
   };
   unsigned i;
@@ -109,7 +111,7 @@ static void test_invalid_ip_addresses(void) {
 }

 static void test_unparseable_hostports(void) {
-  const char* const kCases[] = {
+  const char *const kCases[] = {
       "[", "[::1", "[::1]bad", "[1.2.3.4]", "[localhost]", "[localhost]:1",
   };
   unsigned i;
@@ -121,7 +123,7 @@ static void test_unparseable_hostports(void) {
   }
 }

-int main(int argc, char** argv) {
+int main(int argc, char **argv) {
   grpc_test_init(argc, argv);
   grpc_iomgr_init();
   test_localhost();
diff --git a/test/core/iomgr/sockaddr_utils_test.c b/test/core/iomgr/sockaddr_utils_test.c
index 72a0f71835..5009a641ea 100644
--- a/test/core/iomgr/sockaddr_utils_test.c
+++ b/test/core/iomgr/sockaddr_utils_test.c
@@ -63,9 +63,11 @@ static struct sockaddr_in6 make_addr6(const gpr_uint8 *data, size_t data_len) {

 static const gpr_uint8 kMapped[] = {0, 0, 0, 0, 0, 0, 0, 0,
                                     0, 0, 0xff, 0xff, 192, 0, 2, 1};
+
 static const gpr_uint8 kNotQuiteMapped[] = {0, 0, 0, 0, 0, 0, 0, 0,
                                             0, 0, 0xff, 0xfe, 192, 0, 2, 99};
 static const gpr_uint8 kIPv4[] = {192, 0, 2, 1};
+
 static const gpr_uint8 kIPv6[] = {0x20, 0x01, 0x0d, 0xb8, 0, 0, 0, 0,
                                   0, 0, 0, 0, 0, 0, 0, 1};
diff --git a/test/core/iomgr/tcp_client_posix_test.c b/test/core/iomgr/tcp_client_posix_test.c
index f0e2de24d9..a61cccdb02 100644
--- a/test/core/iomgr/tcp_client_posix_test.c
+++ b/test/core/iomgr/tcp_client_posix_test.c
@@ -50,6 +50,7 @@
 static grpc_pollset_set g_pollset_set;
 static grpc_pollset g_pollset;
 static int g_connections_complete = 0;
+static grpc_endpoint *g_connecting = NULL;

 static gpr_timespec test_deadline(void) {
   return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10);
@@ -62,15 +63,18 @@ static void finish_connection() {
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
 }

-static void must_succeed(void *arg, grpc_endpoint *tcp) {
-  GPR_ASSERT(tcp);
-  grpc_endpoint_shutdown(tcp);
-  grpc_endpoint_destroy(tcp);
+static void must_succeed(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+  GPR_ASSERT(g_connecting != NULL);
+  GPR_ASSERT(success);
+  grpc_endpoint_shutdown(exec_ctx, g_connecting);
+  grpc_endpoint_destroy(exec_ctx, g_connecting);
+  g_connecting = NULL;
   finish_connection();
 }

-static void must_fail(void *arg, grpc_endpoint *tcp) {
-  GPR_ASSERT(!tcp);
+static void must_fail(grpc_exec_ctx *exec_ctx, void *arg, int success) {
+  GPR_ASSERT(g_connecting == NULL);
+  GPR_ASSERT(!success);
   finish_connection();
 }

@@ -80,6 +84,8 @@ void test_succeeds(void) {
   int svr_fd;
   int r;
   int connections_complete_before;
+  grpc_closure done;
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;

   gpr_log(GPR_DEBUG, "test_succeeds");

@@ -98,7 +104,8 @@ void test_succeeds(void) {
   /* connect to it */
   GPR_ASSERT(getsockname(svr_fd, (struct sockaddr *)&addr, &addr_len) == 0);
-  grpc_tcp_client_connect(must_succeed, NULL, &g_pollset_set,
+  grpc_closure_init(&done, must_succeed, NULL);
+  grpc_tcp_client_connect(&exec_ctx, &done, &g_connecting, &g_pollset_set,
                           (struct sockaddr *)&addr, addr_len,
                           gpr_inf_future(GPR_CLOCK_REALTIME));

@@ -114,8 +121,12 @@ void test_succeeds(void) {

   while (g_connections_complete == connections_complete_before) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+    grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
+                      gpr_now(GPR_CLOCK_MONOTONIC),
                       GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5));
+    gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+    grpc_exec_ctx_finish(&exec_ctx);
+    gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   }

   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
@@ -125,6 +136,8 @@ void test_fails(void) {
   struct sockaddr_in addr;
   socklen_t addr_len = sizeof(addr);
   int connections_complete_before;
+  grpc_closure done;
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;

   gpr_log(GPR_DEBUG, "test_fails");

@@ -136,7 +149,8 @@ void test_fails(void) {
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));

   /* connect to a broken address */
-  grpc_tcp_client_connect(must_fail, NULL, &g_pollset_set,
+  grpc_closure_init(&done, must_fail, NULL);
+  grpc_tcp_client_connect(&exec_ctx, &done, &g_connecting, &g_pollset_set,
                           (struct sockaddr *)&addr, addr_len,
                           gpr_inf_future(GPR_CLOCK_REALTIME));

@@ -145,8 +159,11 @@ void test_fails(void) {
   /* wait for the connection callback to finish */
   while (g_connections_complete == connections_complete_before) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
-                      test_deadline());
+    grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
+                      gpr_now(GPR_CLOCK_MONOTONIC), test_deadline());
+    gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+    grpc_exec_ctx_finish(&exec_ctx);
+    gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   }

   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
@@ -162,6 +179,8 @@ void test_times_out(void) {
   int r;
   int connections_complete_before;
   gpr_timespec connect_deadline;
+  grpc_closure done;
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;

   gpr_log(GPR_DEBUG, "test_times_out");

@@ -195,7 +214,8 @@ void test_times_out(void) {
   connections_complete_before = g_connections_complete;
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));

-  grpc_tcp_client_connect(must_fail, NULL, &g_pollset_set,
+  grpc_closure_init(&done, must_fail, NULL);
+  grpc_tcp_client_connect(&exec_ctx, &done, &g_connecting, &g_pollset_set,
                           (struct sockaddr *)&addr, addr_len, connect_deadline);

   /* Make sure the event doesn't trigger early */
@@ -203,25 +223,31 @@ void test_times_out(void) {
   for (;;) {
     grpc_pollset_worker worker;
     gpr_timespec now = gpr_now(connect_deadline.clock_type);
-    gpr_timespec continue_verifying_time = gpr_time_from_seconds(2, GPR_TIMESPAN);
-    gpr_timespec grace_time = gpr_time_from_seconds(1, GPR_TIMESPAN);
-    gpr_timespec finish_time = gpr_time_add(connect_deadline, continue_verifying_time);
-    gpr_timespec restart_verifying_time = gpr_time_add(connect_deadline, grace_time);
+    gpr_timespec continue_verifying_time =
+        gpr_time_from_seconds(5, GPR_TIMESPAN);
+    gpr_timespec grace_time = gpr_time_from_seconds(3, GPR_TIMESPAN);
+    gpr_timespec finish_time =
+        gpr_time_add(connect_deadline, continue_verifying_time);
+    gpr_timespec restart_verifying_time =
+        gpr_time_add(connect_deadline, grace_time);
     int is_after_deadline = gpr_time_cmp(now, connect_deadline) > 0;
     if (gpr_time_cmp(now, finish_time) > 0) {
       break;
     }
-    gpr_log(GPR_DEBUG, "now=%d.%09d connect_deadline=%d.%09d",
-            now.tv_sec, now.tv_nsec, connect_deadline.tv_sec, connect_deadline.tv_nsec);
-    if (is_after_deadline &&
-        gpr_time_cmp(now, restart_verifying_time) <= 0) {
+    gpr_log(GPR_DEBUG, "now=%d.%09d connect_deadline=%d.%09d", now.tv_sec,
+            now.tv_nsec, connect_deadline.tv_sec, connect_deadline.tv_nsec);
+    if (is_after_deadline && gpr_time_cmp(now, restart_verifying_time) <= 0) {
       /* allow some slack before insisting that things be done */
     } else {
       GPR_ASSERT(g_connections_complete ==
                  connections_complete_before + is_after_deadline);
     }
-    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+    grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
+                      gpr_now(GPR_CLOCK_MONOTONIC),
                       GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10));
+    gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+    grpc_exec_ctx_finish(&exec_ctx);
+    gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   }
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
@@ -231,20 +257,27 @@ void test_times_out(void) {
   }
 }

-static void destroy_pollset(void *p) { grpc_pollset_destroy(p); }
+static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, int success) {
+  grpc_pollset_destroy(p);
+}

 int main(int argc, char **argv) {
+  grpc_closure destroyed;
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   grpc_test_init(argc, argv);
   grpc_init();
   grpc_pollset_set_init(&g_pollset_set);
   grpc_pollset_init(&g_pollset);
-  grpc_pollset_set_add_pollset(&g_pollset_set, &g_pollset);
+  grpc_pollset_set_add_pollset(&exec_ctx, &g_pollset_set, &g_pollset);
+  grpc_exec_ctx_finish(&exec_ctx);
   test_succeeds();
   gpr_log(GPR_ERROR, "End of first test");
   test_fails();
   test_times_out();
   grpc_pollset_set_destroy(&g_pollset_set);
-  grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset);
+  grpc_closure_init(&destroyed, destroy_pollset, &g_pollset);
+  grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed);
+  grpc_exec_ctx_finish(&exec_ctx);
   grpc_shutdown();
   return 0;
 }
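grpc_tcp_client_connect changes shape as well: the old void (*cb)(void *, grpc_endpoint *) callback is replaced by a completion closure plus an endpoint out-parameter (the test's g_connecting), with the closure's success flag carrying what the presence or absence of the endpoint used to signal. A sketch of a caller under that reading; the globals mirror the test's, and the closure is static because it must outlive the asynchronous connect:

static grpc_endpoint *g_connecting = NULL; /* written before the closure runs */
static grpc_closure g_done;                /* must outlive the async connect */

static void on_connected(grpc_exec_ctx *exec_ctx, void *arg, int success) {
  if (success) {
    /* the endpoint arrives via the out-parameter, not a callback argument */
    grpc_endpoint_shutdown(exec_ctx, g_connecting);
    grpc_endpoint_destroy(exec_ctx, g_connecting);
    g_connecting = NULL;
  }
}

static void start_connect(grpc_exec_ctx *exec_ctx, struct sockaddr *addr,
                          socklen_t addr_len) {
  grpc_closure_init(&g_done, on_connected, NULL);
  grpc_tcp_client_connect(exec_ctx, &g_done, &g_connecting, &g_pollset_set,
                          addr, addr_len, gpr_inf_future(GPR_CLOCK_REALTIME));
}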
diff --git a/test/core/iomgr/tcp_posix_test.c b/test/core/iomgr/tcp_posix_test.c
index 59c498edff..f676454b7f 100644
--- a/test/core/iomgr/tcp_posix_test.c
+++ b/test/core/iomgr/tcp_posix_test.c
@@ -119,7 +119,7 @@ struct read_socket_state {
   size_t read_bytes;
   size_t target_read_bytes;
   gpr_slice_buffer incoming;
-  grpc_iomgr_closure read_cb;
+  grpc_closure read_cb;
 };

 static size_t count_slices(gpr_slice *slices, size_t nslices,
@@ -138,7 +138,7 @@ static size_t count_slices(gpr_slice *slices, size_t nslices,
   return num_bytes;
 }

-static void read_cb(void *user_data, int success) {
+static void read_cb(grpc_exec_ctx *exec_ctx, void *user_data, int success) {
   struct read_socket_state *state = (struct read_socket_state *)user_data;
   size_t read_bytes;
   int current_data;
@@ -155,19 +155,8 @@ static void read_cb(void *user_data, int success) {
   if (state->read_bytes >= state->target_read_bytes) {
     gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
   } else {
-    switch (grpc_endpoint_read(state->ep, &state->incoming, &state->read_cb)) {
-      case GRPC_ENDPOINT_DONE:
-        gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
-        read_cb(user_data, 1);
-        break;
-      case GRPC_ENDPOINT_ERROR:
-        gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
-        read_cb(user_data, 0);
-        break;
-      case GRPC_ENDPOINT_PENDING:
-        gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
-        break;
-    }
+    grpc_endpoint_read(exec_ctx, state->ep, &state->incoming, &state->read_cb);
+    gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
   }
 }

@@ -178,6 +167,7 @@ static void read_test(size_t num_bytes, size_t slice_size) {
   struct read_socket_state state;
   size_t written_bytes;
   gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;

   gpr_log(GPR_INFO, "Read test of size %d, slice size %d", num_bytes,
           slice_size);
@@ -185,7 +175,7 @@ static void read_test(size_t num_bytes, size_t slice_size) {
   create_sockets(sv);

   ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), slice_size, "test");
-  grpc_endpoint_add_to_pollset(ep, &g_pollset);
+  grpc_endpoint_add_to_pollset(&exec_ctx, ep, &g_pollset);

   written_bytes = fill_socket_partial(sv[0], num_bytes);
   gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes);
@@ -194,30 +184,25 @@ static void read_test(size_t num_bytes, size_t slice_size) {
   state.read_bytes = 0;
   state.target_read_bytes = written_bytes;
   gpr_slice_buffer_init(&state.incoming);
-  grpc_iomgr_closure_init(&state.read_cb, read_cb, &state);
+  grpc_closure_init(&state.read_cb, read_cb, &state);

-  switch (grpc_endpoint_read(ep, &state.incoming, &state.read_cb)) {
-    case GRPC_ENDPOINT_DONE:
-      read_cb(&state, 1);
-      break;
-    case GRPC_ENDPOINT_ERROR:
-      read_cb(&state, 0);
-      break;
-    case GRPC_ENDPOINT_PENDING:
-      break;
-  }
+  grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb);

   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   while (state.read_bytes < state.target_read_bytes) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
-                      deadline);
+    grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
+                      gpr_now(GPR_CLOCK_MONOTONIC), deadline);
+    gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+    grpc_exec_ctx_finish(&exec_ctx);
+    gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   }
   GPR_ASSERT(state.read_bytes == state.target_read_bytes);
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));

   gpr_slice_buffer_destroy(&state.incoming);
-  grpc_endpoint_destroy(ep);
+  grpc_endpoint_destroy(&exec_ctx, ep);
+  grpc_exec_ctx_finish(&exec_ctx);
 }

 /* Write to a socket until it fills up, then read from it using the grpc_tcp
@@ -228,6 +213,7 @@ static void large_read_test(size_t slice_size) {
   struct read_socket_state state;
   ssize_t written_bytes;
   gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;

   gpr_log(GPR_INFO, "Start large read test, slice size %d", slice_size);

@@ -235,7 +221,7 @@ static void large_read_test(size_t slice_size) {

   ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test"), slice_size,
                        "test");
-  grpc_endpoint_add_to_pollset(ep, &g_pollset);
+  grpc_endpoint_add_to_pollset(&exec_ctx, ep, &g_pollset);

   written_bytes = fill_socket(sv[0]);
   gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes);
@@ -244,30 +230,25 @@ static void large_read_test(size_t slice_size) {
   state.read_bytes = 0;
   state.target_read_bytes = (size_t)written_bytes;
   gpr_slice_buffer_init(&state.incoming);
-  grpc_iomgr_closure_init(&state.read_cb, read_cb, &state);
+  grpc_closure_init(&state.read_cb, read_cb, &state);

-  switch (grpc_endpoint_read(ep, &state.incoming, &state.read_cb)) {
-    case GRPC_ENDPOINT_DONE:
-      read_cb(&state, 1);
-      break;
-    case GRPC_ENDPOINT_ERROR:
-      read_cb(&state, 0);
-      break;
-    case GRPC_ENDPOINT_PENDING:
-      break;
-  }
+  grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb);

   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   while (state.read_bytes < state.target_read_bytes) {
     grpc_pollset_worker worker;
-    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
-                      deadline);
+    grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
+                      gpr_now(GPR_CLOCK_MONOTONIC), deadline);
+    gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+    grpc_exec_ctx_finish(&exec_ctx);
+    gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   }
   GPR_ASSERT(state.read_bytes == state.target_read_bytes);
   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));

   gpr_slice_buffer_destroy(&state.incoming);
-  grpc_endpoint_destroy(ep);
+  grpc_endpoint_destroy(&exec_ctx, ep);
+  grpc_exec_ctx_finish(&exec_ctx);
 }

 struct write_socket_state {
@@ -298,7 +279,8 @@ static gpr_slice *allocate_blocks(size_t num_bytes, size_t slice_size,
   return slices;
 }

-static void write_done(void *user_data /* write_socket_state */, int success) {
+static void write_done(grpc_exec_ctx *exec_ctx,
+                       void *user_data /* write_socket_state */, int success) {
   struct write_socket_state *state = (struct write_socket_state *)user_data;
   gpr_log(GPR_INFO, "Write done callback called");
   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
@@ -315,6 +297,7 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) {
   int flags;
   int current = 0;
   int i;
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;

   flags = fcntl(fd, F_GETFL, 0);
   GPR_ASSERT(fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) == 0);
@@ -322,9 +305,11 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) {
   for (;;) {
     grpc_pollset_worker worker;
     gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
-    grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
+    grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
+                      gpr_now(GPR_CLOCK_MONOTONIC),
                       GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10));
     gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+    grpc_exec_ctx_finish(&exec_ctx);
     do {
       bytes_read =
           read(fd, buf, bytes_left > read_size ? read_size : bytes_left);
@@ -343,26 +328,6 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) {
   gpr_free(buf);
 }

-static ssize_t drain_socket(int fd) {
-  ssize_t read_bytes;
-  ssize_t total_bytes = 0;
-  unsigned char buf[256];
-  int current = 0;
-  int i;
-  do {
-    read_bytes = read(fd, buf, 256);
-    if (read_bytes > 0) {
-      total_bytes += read_bytes;
-      for (i = 0; i < read_bytes; ++i) {
-        GPR_ASSERT(buf[i] == current);
-        current = (current + 1) % 256;
-      }
-    }
-  } while (read_bytes >= 0 || errno == EINTR);
-  GPR_ASSERT(errno == EAGAIN);
-  return total_bytes;
-}
-
 /* Write to a socket using the grpc_tcp API, then drain it directly.
    Note that if the write does not complete immediately we need to drain the
    socket in parallel with the read. */
@@ -370,13 +335,13 @@ static void write_test(size_t num_bytes, size_t slice_size) {
   int sv[2];
   grpc_endpoint *ep;
   struct write_socket_state state;
-  ssize_t read_bytes;
   size_t num_blocks;
   gpr_slice *slices;
   gpr_uint8 current_data = 0;
   gpr_slice_buffer outgoing;
-  grpc_iomgr_closure write_done_closure;
+  grpc_closure write_done_closure;
   gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;

   gpr_log(GPR_INFO, "Start write test with %d bytes, slice size %d", num_bytes,
           slice_size);
@@ -385,7 +350,7 @@ static void write_test(size_t num_bytes, size_t slice_size) {

   ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test"),
                        GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "test");
-  grpc_endpoint_add_to_pollset(ep, &g_pollset);
+  grpc_endpoint_add_to_pollset(&exec_ctx, ep, &g_pollset);

   state.ep = ep;
   state.write_done = 0;
@@ -394,35 +359,28 @@ static void write_test(size_t num_bytes, size_t slice_size) {
   gpr_slice_buffer_init(&outgoing);
   gpr_slice_buffer_addn(&outgoing, slices, num_blocks);
-  grpc_iomgr_closure_init(&write_done_closure, write_done, &state);
+  grpc_closure_init(&write_done_closure, write_done, &state);

-  switch (grpc_endpoint_write(ep, &outgoing, &write_done_closure)) {
-    case GRPC_ENDPOINT_DONE:
-      /* Write completed immediately */
-      read_bytes = drain_socket(sv[0]);
-      GPR_ASSERT((size_t)read_bytes == num_bytes);
-      break;
-    case GRPC_ENDPOINT_PENDING:
-      drain_socket_blocking(sv[0], num_bytes, num_bytes);
-      gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
-      for (;;) {
-        grpc_pollset_worker worker;
-        if (state.write_done) {
-          break;
-        }
-        grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
-                          deadline);
-      }
-      gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+  grpc_endpoint_write(&exec_ctx, ep, &outgoing, &write_done_closure);
+  drain_socket_blocking(sv[0], num_bytes, num_bytes);
+  gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
+  for (;;) {
+    grpc_pollset_worker worker;
+    if (state.write_done) {
       break;
-    case GRPC_ENDPOINT_ERROR:
-      gpr_log(GPR_ERROR, "endpoint got error");
-      abort();
+    }
+    grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
+                      gpr_now(GPR_CLOCK_MONOTONIC), deadline);
+    gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+    grpc_exec_ctx_finish(&exec_ctx);
+    gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
   }
+  gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));

   gpr_slice_buffer_destroy(&outgoing);
-  grpc_endpoint_destroy(ep);
+  grpc_endpoint_destroy(&exec_ctx, ep);
   gpr_free(slices);
+  grpc_exec_ctx_finish(&exec_ctx);
 }

 void run_tests(void) {
@@ -452,14 +410,17 @@ static grpc_endpoint_test_fixture create_fixture_tcp_socketpair(
     size_t slice_size) {
   int sv[2];
   grpc_endpoint_test_fixture f;
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;

   create_sockets(sv);
   f.client_ep = grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client"),
                                 slice_size, "test");
   f.server_ep = grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server"),
                                 slice_size, "test");
-  grpc_endpoint_add_to_pollset(f.client_ep, &g_pollset);
-  grpc_endpoint_add_to_pollset(f.server_ep, &g_pollset);
+  grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, &g_pollset);
+  grpc_endpoint_add_to_pollset(&exec_ctx, f.server_ep, &g_pollset);
+
+  grpc_exec_ctx_finish(&exec_ctx);

   return f;
 }
@@ -468,15 +429,21 @@ static grpc_endpoint_test_config configs[] = {
     {"tcp/tcp_socketpair", create_fixture_tcp_socketpair, clean_up},
 };

-static void destroy_pollset(void *p) { grpc_pollset_destroy(p); }
+static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, int success) {
+  grpc_pollset_destroy(p);
+}

 int main(int argc, char **argv) {
+  grpc_closure destroyed;
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   grpc_test_init(argc, argv);
   grpc_init();
   grpc_pollset_init(&g_pollset);
   run_tests();
   grpc_endpoint_tests(configs[0], &g_pollset);
-  grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset);
+  grpc_closure_init(&destroyed, destroy_pollset, &g_pollset);
+  grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed);
+  grpc_exec_ctx_finish(&exec_ctx);
   grpc_shutdown();

   return 0;
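On the write side the same collapse shows up: the old code handled GRPC_ENDPOINT_DONE (drain synchronously via the now-deleted drain_socket), PENDING (drain concurrently while polling), and ERROR as three separate paths; there is now a single path that always drains the peer in parallel and waits for the closure. Sketched with the test's own write_done and drain_socket_blocking helpers (the wrapper function itself is hypothetical):

/* Sketch: the single write path after the change. */
static void write_then_wait(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
                            gpr_slice_buffer *outgoing,
                            struct write_socket_state *state, int fd,
                            size_t num_bytes, gpr_timespec deadline) {
  grpc_closure write_done_closure;
  grpc_closure_init(&write_done_closure, write_done, state);
  /* always completes via the closure; no tri-state return to branch on */
  grpc_endpoint_write(exec_ctx, ep, outgoing, &write_done_closure);
  drain_socket_blocking(fd, num_bytes, num_bytes); /* drain in parallel */
  gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
  while (!state->write_done) {
    grpc_pollset_worker worker;
    grpc_pollset_work(exec_ctx, &g_pollset, &worker,
                      gpr_now(GPR_CLOCK_MONOTONIC), deadline);
    gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
    grpc_exec_ctx_finish(exec_ctx);
    gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
  }
  gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}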
diff --git a/test/core/iomgr/time_averaged_stats_test.c b/test/core/iomgr/time_averaged_stats_test.c
index 4206a1c58f..cb006d152a 100644
--- a/test/core/iomgr/time_averaged_stats_test.c
+++ b/test/core/iomgr/time_averaged_stats_test.c
@@ -189,7 +189,7 @@ static void some_regress_some_persist_test(void) {
   grpc_time_averaged_stats_add_sample(&tas, 4944.32);
   grpc_time_averaged_stats_update_average(&tas);
   /* (1 * 4944.32 + 0.6 * 2.384 * 2200 + 0.4 * 1000) /
-     (1 + 0.6 * 2.384 + 0.4) */
+   (1 + 0.6 * 2.384 + 0.4) */
   EXPECT_DOUBLE_EQ(3000, tas.aggregate_weighted_avg);
   EXPECT_DOUBLE_EQ(2.8304, tas.aggregate_total_weight);
 }
diff --git a/test/core/iomgr/udp_server_test.c b/test/core/iomgr/udp_server_test.c
index 24b6b1c717..d56500707c 100644
--- a/test/core/iomgr/udp_server_test.c
+++ b/test/core/iomgr/udp_server_test.c
@@ -64,18 +64,23 @@ static void on_read(int fd) {
 }

 static void test_no_op(void) {
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   grpc_udp_server *s = grpc_udp_server_create();
-  grpc_udp_server_destroy(s, NULL, NULL);
+  grpc_udp_server_destroy(&exec_ctx, s, NULL);
+  grpc_exec_ctx_finish(&exec_ctx);
 }

 static void test_no_op_with_start(void) {
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   grpc_udp_server *s = grpc_udp_server_create();
   LOG_TEST("test_no_op_with_start");
-  grpc_udp_server_start(s, NULL, 0);
-  grpc_udp_server_destroy(s, NULL, NULL);
+  grpc_udp_server_start(&exec_ctx, s, NULL, 0);
+  grpc_udp_server_destroy(&exec_ctx, s, NULL);
+  grpc_exec_ctx_finish(&exec_ctx);
 }

 static void test_no_op_with_port(void) {
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   struct sockaddr_in addr;
   grpc_udp_server *s = grpc_udp_server_create();
   LOG_TEST("test_no_op_with_port");
@@ -85,10 +90,12 @@ static void test_no_op_with_port(void) {
   GPR_ASSERT(grpc_udp_server_add_port(s, (struct sockaddr *)&addr,
                                       sizeof(addr), on_read));

-  grpc_udp_server_destroy(s, NULL, NULL);
+  grpc_udp_server_destroy(&exec_ctx, s, NULL);
+  grpc_exec_ctx_finish(&exec_ctx);
 }

 static void test_no_op_with_port_and_start(void) {
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   struct sockaddr_in addr;
   grpc_udp_server *s = grpc_udp_server_create();
   LOG_TEST("test_no_op_with_port_and_start");
@@ -98,12 +105,14 @@ static void test_no_op_with_port_and_start(void) {
   GPR_ASSERT(grpc_udp_server_add_port(s, (struct sockaddr *)&addr,
                                       sizeof(addr), on_read));

-  grpc_udp_server_start(s, NULL, 0);
+  grpc_udp_server_start(&exec_ctx, s, NULL, 0);

-  grpc_udp_server_destroy(s, NULL, NULL);
+  grpc_udp_server_destroy(&exec_ctx, s, NULL);
+  grpc_exec_ctx_finish(&exec_ctx);
 }

 static void test_receive(int number_of_clients) {
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   struct sockaddr_storage addr;
   socklen_t addr_len = sizeof(addr);
   int clifd, svrfd;
@@ -128,7 +137,7 @@ static void test_receive(int number_of_clients) {
   GPR_ASSERT(addr_len <= sizeof(addr));

   pollsets[0] = &g_pollset;
-  grpc_udp_server_start(s, pollsets, 1);
+  grpc_udp_server_start(&exec_ctx, s, pollsets, 1);

   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));

@@ -144,8 +153,11 @@ static void test_receive(int number_of_clients) {
     while (g_number_of_reads == number_of_reads_before &&
            gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0) {
       grpc_pollset_worker worker;
-      grpc_pollset_work(&g_pollset, &worker, gpr_now(GPR_CLOCK_MONOTONIC),
-                        deadline);
+      grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
+                        gpr_now(GPR_CLOCK_MONOTONIC), deadline);
+      gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+      grpc_exec_ctx_finish(&exec_ctx);
+      gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
     }
     GPR_ASSERT(g_number_of_reads == number_of_reads_before + 1);
     close(clifd);
@@ -154,12 +166,17 @@ static void test_receive(int number_of_clients) {

   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));

-  grpc_udp_server_destroy(s, NULL, NULL);
+  grpc_udp_server_destroy(&exec_ctx, s, NULL);
+  grpc_exec_ctx_finish(&exec_ctx);
 }

-static void destroy_pollset(void *p) { grpc_pollset_destroy(p); }
+static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, int success) {
+  grpc_pollset_destroy(p);
+}

 int main(int argc, char **argv) {
+  grpc_closure destroyed;
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
   grpc_test_init(argc, argv);
   grpc_iomgr_init();
   grpc_pollset_init(&g_pollset);
@@ -171,7 +188,9 @@ int main(int argc, char **argv) {
   test_receive(1);
   test_receive(10);

-  grpc_pollset_shutdown(&g_pollset, destroy_pollset, &g_pollset);
+  grpc_closure_init(&destroyed, destroy_pollset, &g_pollset);
+  grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed);
+  grpc_exec_ctx_finish(&exec_ctx);
   grpc_iomgr_shutdown();
   return 0;
 }
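Note: the shutdown paths in this file and the previous one replace the old (callback, arg) argument pair with a grpc_closure prepared by grpc_closure_init, so completion of teardown is reported through the same closure machinery as I/O. A toy model of that handoff, with illustrative toy_* names rather than real gRPC types:

    #include <stdio.h>

    /* A toy closure bundling callback and argument, as grpc_closure does. */
    typedef struct {
      void (*cb)(void *exec_ctx, void *arg, int success);
      void *arg;
    } toy_closure;

    static void toy_closure_init(toy_closure *c,
                                 void (*cb)(void *, void *, int), void *arg) {
      c->cb = cb;
      c->arg = arg;
    }

    /* Shutdown now takes one closure to invoke when teardown completes,
       instead of a separate callback pointer and user argument. */
    static void toy_server_destroy(void *exec_ctx, toy_closure *on_destroyed) {
      if (on_destroyed != NULL) on_destroyed->cb(exec_ctx, on_destroyed->arg, 1);
    }

    static void destroy_done(void *exec_ctx, void *p, int success) {
      (void)exec_ctx;
      printf("resource %p destroyed (success=%d)\n", p, success);
    }

    int main(void) {
      toy_closure destroyed;
      int resource = 0; /* stands in for g_pollset */
      toy_closure_init(&destroyed, destroy_done, &resource);
      toy_server_destroy(NULL, &destroyed);
      return 0;
    }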
diff --git a/test/core/iomgr/workqueue_test.c b/test/core/iomgr/workqueue_test.c
new file mode 100644
index 0000000000..90f7ba7a83
--- /dev/null
+++ b/test/core/iomgr/workqueue_test.c
@@ -0,0 +1,93 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *     * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/core/iomgr/workqueue.h"
+
+#include <grpc/grpc.h>
+#include <grpc/support/log.h>
+
+#include "test/core/util/test_config.h"
+
+static grpc_pollset g_pollset;
+
+static void must_succeed(grpc_exec_ctx *exec_ctx, void *p, int success) {
+  GPR_ASSERT(success == 1);
+  gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
+  *(int *)p = 1;
+  grpc_pollset_kick(&g_pollset, NULL);
+  gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+}
+
+static void test_add_closure(void) {
+  grpc_closure c;
+  int done = 0;
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+  grpc_workqueue *wq = grpc_workqueue_create(&exec_ctx);
+  gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5);
+  grpc_pollset_worker worker;
+  grpc_closure_init(&c, must_succeed, &done);
+
+  grpc_workqueue_push(wq, &c, 1);
+  grpc_workqueue_add_to_pollset(&exec_ctx, wq, &g_pollset);
+
+  gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
+  GPR_ASSERT(!done);
+  grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
+                    gpr_now(deadline.clock_type), deadline);
+  gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
+  grpc_exec_ctx_finish(&exec_ctx);
+  GPR_ASSERT(done);
+
+  GRPC_WORKQUEUE_UNREF(&exec_ctx, wq, "destroy");
+  grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, int success) {
+  grpc_pollset_destroy(p);
+}
+
+int main(int argc, char **argv) {
+  grpc_closure destroyed;
+  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+  grpc_test_init(argc, argv);
+  grpc_init();
+  grpc_pollset_init(&g_pollset);
+
+  test_add_closure();
+
+  grpc_closure_init(&destroyed, destroy_pollset, &g_pollset);
+  grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed);
+  grpc_exec_ctx_finish(&exec_ctx);
+  grpc_shutdown();
+  return 0;
+}
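Note: the new test waits for the pushed closure by polling the pollset until must_succeed flips the done flag and kicks the poller. The same wait-for-flag handshake rendered portably below, with a condition variable standing in for the pollset kick; this is an analogy to the control flow above, not the gRPC implementation:

    #include <pthread.h>
    #include <stdbool.h>

    static pthread_mutex_t g_mu = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t g_cv = PTHREAD_COND_INITIALIZER;
    static bool g_done = false;

    /* Runs off the main thread, like must_succeed() on the workqueue. */
    static void *must_succeed(void *unused) {
      (void)unused;
      pthread_mutex_lock(&g_mu);
      g_done = true;
      pthread_cond_signal(&g_cv); /* like grpc_pollset_kick(&g_pollset, NULL) */
      pthread_mutex_unlock(&g_mu);
      return NULL;
    }

    int main(void) {
      pthread_t thd;
      /* Scheduling the callback, like grpc_workqueue_push. */
      pthread_create(&thd, NULL, must_succeed, NULL);
      pthread_mutex_lock(&g_mu);
      /* Blocking until woken, like grpc_pollset_work under its mutex. */
      while (!g_done) pthread_cond_wait(&g_cv, &g_mu);
      pthread_mutex_unlock(&g_mu);
      pthread_join(thd, NULL);
      return 0;
    }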