aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2016-04-13 20:15:13 -0700
committerGravatar Craig Tiller <ctiller@google.com>2016-04-13 20:15:13 -0700
commitf224c0c1fe2abd039f81f11c01793b4068350529 (patch)
tree832f373983d32b3f48311e5d57f88bd27514bff6
parentd0b2523ebebd6b025836dbabf7eaed4a1423eba3 (diff)
Continuing connection pipeline
-rw-r--r--test/core/end2end/fuzzers/api_fuzzer.c107
1 files changed, 74 insertions, 33 deletions
diff --git a/test/core/end2end/fuzzers/api_fuzzer.c b/test/core/end2end/fuzzers/api_fuzzer.c
index 1ba64eb58f..c00f2427ba 100644
--- a/test/core/end2end/fuzzers/api_fuzzer.c
+++ b/test/core/end2end/fuzzers/api_fuzzer.c
@@ -129,6 +129,8 @@ static bool is_eof(input_stream *inp) { return inp->cur == inp->end; }
// global state
static gpr_timespec g_now;
+static grpc_server *g_server;
+static grpc_channel *g_channel;
extern gpr_timespec (*gpr_now_impl)(gpr_clock_type clock_type);
@@ -152,8 +154,6 @@ static void finish_resolve(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
addr_req *r = arg;
if (0 == strcmp(r->addr, "server")) {
- wait_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_seconds(1, GPR_TIMESPAN)));
grpc_resolved_addresses *addrs = gpr_malloc(sizeof(*addrs));
addrs->naddrs = 1;
addrs->addrs = gpr_malloc(sizeof(*addrs->addrs));
@@ -189,12 +189,48 @@ extern void (*grpc_tcp_client_connect_impl)(
grpc_pollset_set *interested_parties, const struct sockaddr *addr,
size_t addr_len, gpr_timespec deadline);
+static void sched_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, gpr_timespec deadline);
+
+typedef struct {
+ grpc_timer timer;
+ grpc_closure *closure;
+ grpc_endpoint **ep;
+ gpr_timespec deadline;
+} future_connect;
+
+static void do_connect(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
+ future_connect *fc = arg;
+ if (g_server) {
+ abort();
+ } else {
+ sched_connect(exec_ctx, fc->closure, fc->ep, fc->deadline);
+ }
+ gpr_free(fc);
+}
+
+static void sched_connect(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_endpoint **ep, gpr_timespec deadline) {
+ if (gpr_time_cmp(deadline, gpr_now(deadline.clock_type)) > 0) {
+ *ep = NULL;
+ grpc_exec_ctx_enqueue(exec_ctx, closure, false, NULL);
+ return;
+ }
+
+ future_connect *fc = gpr_malloc(sizeof(*fc));
+ fc->closure = closure;
+ fc->ep = ep;
+ fc->deadline = deadline;
+ grpc_timer_init(exec_ctx, &fc->timer,
+ gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
+ gpr_time_from_millis(1, GPR_TIMESPAN)),
+ do_connect, fc, gpr_now(GPR_CLOCK_MONOTONIC));
+}
+
static void my_tcp_client_connect(grpc_exec_ctx *exec_ctx,
grpc_closure *closure, grpc_endpoint **ep,
grpc_pollset_set *interested_parties,
const struct sockaddr *addr, size_t addr_len,
gpr_timespec deadline) {
- abort();
+ sched_connect(exec_ctx, closure, ep, deadline);
}
////////////////////////////////////////////////////////////////////////////////
@@ -215,27 +251,28 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
gpr_now_impl = now_impl;
grpc_init();
- grpc_channel *channel = NULL;
- grpc_server *server = NULL;
+ GPR_ASSERT(g_channel == NULL);
+ GPR_ASSERT(g_server == NULL);
+
bool server_shutdown = false;
int pending_server_shutdowns = 0;
grpc_completion_queue *cq = grpc_completion_queue_create(NULL);
- while (!is_eof(&inp) || channel != NULL || server != NULL) {
+ while (!is_eof(&inp) || g_channel != NULL || g_server != NULL) {
if (is_eof(&inp)) {
- if (channel != NULL) {
- grpc_channel_destroy(channel);
- channel = NULL;
+ if (g_channel != NULL) {
+ grpc_channel_destroy(g_channel);
+ g_channel = NULL;
}
- if (server != NULL) {
+ if (g_server != NULL) {
if (!server_shutdown) {
- grpc_server_shutdown_and_notify(server, cq, tag(SERVER_SHUTDOWN));
+ grpc_server_shutdown_and_notify(g_server, cq, tag(SERVER_SHUTDOWN));
server_shutdown = true;
pending_server_shutdowns++;
} else if (pending_server_shutdowns == 0) {
- grpc_server_destroy(server);
- server = NULL;
+ grpc_server_destroy(g_server);
+ g_server = NULL;
}
}
@@ -274,13 +311,13 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
}
// create an insecure channel
case 2: {
- if (channel == NULL) {
+ if (g_channel == NULL) {
char *target = read_string(&inp);
char *target_uri;
gpr_asprintf(&target_uri, "dns:%s", target);
grpc_channel_args *args = read_args(&inp);
- channel = grpc_insecure_channel_create(target_uri, args, NULL);
- GPR_ASSERT(channel != NULL);
+ g_channel = grpc_insecure_channel_create(target_uri, args, NULL);
+ GPR_ASSERT(g_channel != NULL);
grpc_channel_args_destroy(args);
gpr_free(target_uri);
gpr_free(target);
@@ -289,29 +326,29 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
}
// destroy a channel
case 3: {
- if (channel != NULL) {
- grpc_channel_destroy(channel);
- channel = NULL;
+ if (g_channel != NULL) {
+ grpc_channel_destroy(g_channel);
+ g_channel = NULL;
}
break;
}
// bring up a server
case 4: {
- if (server == NULL) {
+ if (g_server == NULL) {
grpc_channel_args *args = read_args(&inp);
- server = grpc_server_create(args, NULL);
- GPR_ASSERT(server != NULL);
+ g_server = grpc_server_create(args, NULL);
+ GPR_ASSERT(g_server != NULL);
grpc_channel_args_destroy(args);
- grpc_server_register_completion_queue(server, cq, NULL);
- grpc_server_start(server);
+ grpc_server_register_completion_queue(g_server, cq, NULL);
+ grpc_server_start(g_server);
server_shutdown = false;
GPR_ASSERT(pending_server_shutdowns == 0);
}
}
// begin server shutdown
case 5: {
- if (server != NULL) {
- grpc_server_shutdown_and_notify(server, cq, tag(SERVER_SHUTDOWN));
+ if (g_server != NULL) {
+ grpc_server_shutdown_and_notify(g_server, cq, tag(SERVER_SHUTDOWN));
pending_server_shutdowns++;
server_shutdown = true;
}
@@ -319,30 +356,34 @@ int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) {
}
// cancel all calls if shutdown
case 6: {
- if (server != NULL && server_shutdown) {
- grpc_server_cancel_all_calls(server);
+ if (g_server != NULL && server_shutdown) {
+ grpc_server_cancel_all_calls(g_server);
}
break;
}
// destroy server
case 7: {
- if (server != NULL && server_shutdown &&
+ if (g_server != NULL && server_shutdown &&
pending_server_shutdowns == 0) {
- grpc_server_destroy(server);
- server = NULL;
+ grpc_server_destroy(g_server);
+ g_server = NULL;
}
break;
}
// check connectivity
case 8: {
- if (channel != NULL) {
- grpc_channel_check_connectivity_state(channel, next_byte(&inp) > 127);
+ if (g_channel != NULL) {
+ grpc_channel_check_connectivity_state(g_channel,
+ next_byte(&inp) > 127);
}
break;
}
}
}
+ GPR_ASSERT(g_channel == NULL);
+ GPR_ASSERT(g_server == NULL);
+
grpc_completion_queue_shutdown(cq);
GPR_ASSERT(
grpc_completion_queue_next(cq, gpr_inf_past(GPR_CLOCK_REALTIME), NULL)