diff options
author | Craig Tiller <ctiller@google.com> | 2016-04-13 20:15:13 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2016-04-13 20:15:13 -0700 |
commit | f224c0c1fe2abd039f81f11c01793b4068350529 (patch) | |
tree | 832f373983d32b3f48311e5d57f88bd27514bff6 | |
parent | d0b2523ebebd6b025836dbabf7eaed4a1423eba3 (diff) |
Continuing connection pipeline
-rw-r--r-- | test/core/end2end/fuzzers/api_fuzzer.c | 107 |
1 files changed, 74 insertions, 33 deletions
diff --git a/test/core/end2end/fuzzers/api_fuzzer.c b/test/core/end2end/fuzzers/api_fuzzer.c index 1ba64eb58f..c00f2427ba 100644 --- a/test/core/end2end/fuzzers/api_fuzzer.c +++ b/test/core/end2end/fuzzers/api_fuzzer.c @@ -129,6 +129,8 @@ static bool is_eof(input_stream *inp) { return inp->cur == inp->end; } // global state static gpr_timespec g_now; +static grpc_server *g_server; +static grpc_channel *g_channel; extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type); @@ -152,8 +154,6 @@ static void finish_resolve(grpc_exec_ctx *exec_ctx, void *arg, bool success) { addr_req *r = arg; if (0 == strcmp(r->addr, "server")) { - wait_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_seconds(1, GPR_TIMESPAN))); grpc_resolved_addresses *addrs = gpr_malloc(sizeof(*addrs)); addrs->naddrs = 1; addrs->addrs = gpr_malloc(sizeof(*addrs->addrs)); @@ -189,12 +189,48 @@ extern void (*grpc_tcp_client_connect_impl)( grpc_pollset_set *interested_parties, const struct sockaddr *addr, size_t addr_len, gpr_timespec deadline); +static void sched_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, gpr_timespec deadline); + +typedef struct { + grpc_timer timer; + grpc_closure *closure; + grpc_endpoint **ep; + gpr_timespec deadline; +} future_connect; + +static void do_connect(grpc_exec_ctx *exec_ctx, void *arg, bool success) { + future_connect *fc = arg; + if (g_server) { + abort(); + } else { + sched_connect(exec_ctx, fc->closure, fc->ep, fc->deadline); + } + gpr_free(fc); +} + +static void sched_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, gpr_timespec deadline) { + if (gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0) { + *ep = NULL; + grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL); + return; + } + + future_connect *fc = gpr_malloc(sizeof(*fc)); + fc->closure = closure; + fc->ep = ep; + fc->deadline = deadline; + grpc_timer_init(exec_ctx, &fc->timer, + gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), + gpr_time_from_millis(1, GPR_TIMESPAN)), + do_connect, fc, gpr_now(GPR_CLOCK_MONOTONIC)); +} + static void my_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, grpc_pollset_set *interested_parties, const struct sockaddr *addr, size_t addr_len, gpr_timespec deadline) { - abort(); + sched_connect(exec_ctx, closure, ep, deadline); } //////////////////////////////////////////////////////////////////////////////// @@ -215,27 +251,28 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) { gpr_now_impl = now_impl; grpc_init(); - grpc_channel *channel = NULL; - grpc_server *server = NULL; + GPR_ASSERT(g_channel == NULL); + GPR_ASSERT(g_server == NULL); + bool server_shutdown = false; int pending_server_shutdowns = 0; grpc_completion_queue *cq = grpc_completion_queue_create(NULL); - while (!is_eof(&inp) || channel != NULL || server != NULL) { + while (!is_eof(&inp) || g_channel != NULL || g_server != NULL) { if (is_eof(&inp)) { - if (channel != NULL) { - grpc_channel_destroy(channel); - channel = NULL; + if (g_channel != NULL) { + grpc_channel_destroy(g_channel); + g_channel = NULL; } - if (server != NULL) { + if (g_server != NULL) { if (!server_shutdown) { - grpc_server_shutdown_and_notify(server, cq, tag(SERVER_SHUTDOWN)); + grpc_server_shutdown_and_notify(g_server, cq, tag(SERVER_SHUTDOWN)); server_shutdown = true; pending_server_shutdowns++; } else if (pending_server_shutdowns == 0) { - grpc_server_destroy(server); - server = NULL; + grpc_server_destroy(g_server); + g_server = NULL; } } @@ -274,13 +311,13 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) { } // create an insecure channel case 2: { - if (channel == NULL) { + if (g_channel == NULL) { char *target = read_string(&inp); char *target_uri; gpr_asprintf(&target_uri, "dns:%s", target); grpc_channel_args *args = read_args(&inp); - channel = grpc_insecure_channel_create(target_uri, args, NULL); - GPR_ASSERT(channel != NULL); + g_channel = grpc_insecure_channel_create(target_uri, args, NULL); + GPR_ASSERT(g_channel != NULL); grpc_channel_args_destroy(args); gpr_free(target_uri); gpr_free(target); @@ -289,29 +326,29 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) { } // destroy a channel case 3: { - if (channel != NULL) { - grpc_channel_destroy(channel); - channel = NULL; + if (g_channel != NULL) { + grpc_channel_destroy(g_channel); + g_channel = NULL; } break; } // bring up a server case 4: { - if (server == NULL) { + if (g_server == NULL) { grpc_channel_args *args = read_args(&inp); - server = grpc_server_create(args, NULL); - GPR_ASSERT(server != NULL); + g_server = grpc_server_create(args, NULL); + GPR_ASSERT(g_server != NULL); grpc_channel_args_destroy(args); - grpc_server_register_completion_queue(server, cq, NULL); - grpc_server_start(server); + grpc_server_register_completion_queue(g_server, cq, NULL); + grpc_server_start(g_server); server_shutdown = false; GPR_ASSERT(pending_server_shutdowns == 0); } } // begin server shutdown case 5: { - if (server != NULL) { - grpc_server_shutdown_and_notify(server, cq, tag(SERVER_SHUTDOWN)); + if (g_server != NULL) { + grpc_server_shutdown_and_notify(g_server, cq, tag(SERVER_SHUTDOWN)); pending_server_shutdowns++; server_shutdown = true; } @@ -319,30 +356,34 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) { } // cancel all calls if shutdown case 6: { - if (server != NULL && server_shutdown) { - grpc_server_cancel_all_calls(server); + if (g_server != NULL && server_shutdown) { + grpc_server_cancel_all_calls(g_server); } break; } // destroy server case 7: { - if (server != NULL && server_shutdown && + if (g_server != NULL && server_shutdown && pending_server_shutdowns == 0) { - grpc_server_destroy(server); - server = NULL; + grpc_server_destroy(g_server); + g_server = NULL; } break; } // check connectivity case 8: { - if (channel != NULL) { - grpc_channel_check_connectivity_state(channel, next_byte(&inp) > 127); + if (g_channel != NULL) { + grpc_channel_check_connectivity_state(g_channel, + next_byte(&inp) > 127); } break; } } } + GPR_ASSERT(g_channel == NULL); + GPR_ASSERT(g_server == NULL); + grpc_completion_queue_shutdown(cq); GPR_ASSERT( grpc_completion_queue_next(cq, gpr_inf_past(GPR_CLOCK_REALTIME), NULL) |