aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-11-04 13:14:40 -0800
committerGravatar Craig Tiller <ctiller@google.com>2015-11-04 13:14:40 -0800
commit8f7794d8dbe3dcb0ad79731d63ca42465b081d86 (patch)
treed9c93411924c889445ea18a6e3524ddae881c0b9
parentd8e265aae5f35da4c9dac160e839a0a2036a1906 (diff)
Robustness work for lb_policies_test
-rw-r--r--test/core/client_config/lb_policies_test.c116
1 files changed, 58 insertions, 58 deletions
diff --git a/test/core/client_config/lb_policies_test.c b/test/core/client_config/lb_policies_test.c
index 68e991716b..281d2b2ee1 100644
--- a/test/core/client_config/lb_policies_test.c
+++ b/test/core/client_config/lb_policies_test.c
@@ -119,14 +119,15 @@ static void test_spec_destroy(test_spec *spec) {
static void *tag(gpr_intptr t) { return (void *)t; }
-static gpr_timespec n_seconds_time(int n) {
- return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(n);
+static gpr_timespec n_millis_time(int n) {
+ return gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_millis(n, GPR_TIMESPAN));
}
static void drain_cq(grpc_completion_queue *cq) {
grpc_event ev;
do {
- ev = grpc_completion_queue_next(cq, n_seconds_time(5), NULL);
+ ev = grpc_completion_queue_next(cq, n_millis_time(5000), NULL);
} while (ev.type != GRPC_QUEUE_SHUTDOWN);
}
@@ -134,29 +135,47 @@ static void kill_server(const servers_fixture *f, size_t i) {
gpr_log(GPR_INFO, "KILLING SERVER %d", i);
GPR_ASSERT(f->servers[i] != NULL);
grpc_server_shutdown_and_notify(f->servers[i], f->cq, tag(10000));
- GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(10000),
- GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5),
- NULL).type == GRPC_OP_COMPLETE);
+ GPR_ASSERT(
+ grpc_completion_queue_pluck(f->cq, tag(10000), n_millis_time(5000), NULL)
+ .type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->servers[i]);
f->servers[i] = NULL;
}
-static void revive_server(const servers_fixture *f, size_t i) {
+typedef struct request_data {
+ grpc_metadata_array initial_metadata_recv;
+ grpc_metadata_array trailing_metadata_recv;
+ char *details;
+ size_t details_capacity;
+ grpc_status_code status;
+ grpc_call_details *call_details;
+} request_data;
+
+static void revive_server(const servers_fixture *f, request_data *rdata,
+ size_t i) {
int got_port;
gpr_log(GPR_INFO, "RAISE AGAIN SERVER %d", i);
GPR_ASSERT(f->servers[i] == NULL);
+
+ gpr_log(GPR_DEBUG, "revive: %s", f->servers_hostports[i]);
+
f->servers[i] = grpc_server_create(NULL, NULL);
grpc_server_register_completion_queue(f->servers[i], f->cq, NULL);
GPR_ASSERT((got_port = grpc_server_add_insecure_http2_port(
f->servers[i], f->servers_hostports[i])) > 0);
grpc_server_start(f->servers[i]);
+
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_server_request_call(f->servers[i], &f->server_calls[i],
+ &rdata->call_details[i],
+ &f->request_metadata_recv[i], f->cq,
+ f->cq, tag(1000 + (int)i)));
}
static servers_fixture *setup_servers(const char *server_host,
+ request_data *rdata,
const size_t num_servers) {
servers_fixture *f = gpr_malloc(sizeof(servers_fixture));
- int *ports;
- int got_port;
size_t i;
f->num_servers = num_servers;
@@ -164,23 +183,16 @@ static servers_fixture *setup_servers(const char *server_host,
f->request_metadata_recv =
gpr_malloc(sizeof(grpc_metadata_array) * num_servers);
/* Create servers. */
- ports = gpr_malloc(sizeof(int *) * num_servers);
f->servers = gpr_malloc(sizeof(grpc_server *) * num_servers);
f->servers_hostports = gpr_malloc(sizeof(char *) * num_servers);
f->cq = grpc_completion_queue_create(NULL);
for (i = 0; i < num_servers; i++) {
- ports[i] = grpc_pick_unused_port_or_die();
-
- gpr_join_host_port(&f->servers_hostports[i], server_host, ports[i]);
-
- f->servers[i] = grpc_server_create(NULL, NULL);
- grpc_server_register_completion_queue(f->servers[i], f->cq, NULL);
- GPR_ASSERT((got_port = grpc_server_add_insecure_http2_port(
- f->servers[i], f->servers_hostports[i])) > 0);
- GPR_ASSERT(ports[i] == got_port);
- grpc_server_start(f->servers[i]);
+ grpc_metadata_array_init(&f->request_metadata_recv[i]);
+ gpr_join_host_port(&f->servers_hostports[i], server_host,
+ grpc_pick_unused_port_or_die());
+ f->servers[i] = 0;
+ revive_server(f, rdata, i);
}
- gpr_free(ports);
return f;
}
@@ -191,8 +203,8 @@ static void teardown_servers(servers_fixture *f) {
if (f->servers[i] == NULL) continue;
grpc_server_shutdown_and_notify(f->servers[i], f->cq, tag(10000));
GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(10000),
- GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5),
- NULL).type == GRPC_OP_COMPLETE);
+ n_millis_time(5000), NULL)
+ .type == GRPC_OP_COMPLETE);
grpc_server_destroy(f->servers[i]);
}
grpc_completion_queue_shutdown(f->cq);
@@ -203,6 +215,7 @@ static void teardown_servers(servers_fixture *f) {
for (i = 0; i < f->num_servers; i++) {
gpr_free(f->servers_hostports[i]);
+ grpc_metadata_array_destroy(&f->request_metadata_recv[i]);
}
gpr_free(f->servers_hostports);
@@ -211,15 +224,6 @@ static void teardown_servers(servers_fixture *f) {
gpr_free(f);
}
-typedef struct request_data {
- grpc_metadata_array initial_metadata_recv;
- grpc_metadata_array trailing_metadata_recv;
- char *details;
- size_t details_capacity;
- grpc_status_code status;
- grpc_call_details *call_details;
-} request_data;
-
/** Returns connection sequence (server indices), which must be freed */
int *perform_request(servers_fixture *f, grpc_channel *client,
request_data *rdata, const test_spec *spec) {
@@ -237,7 +241,6 @@ int *perform_request(servers_fixture *f, grpc_channel *client,
int completed_client;
s_valid = gpr_malloc(sizeof(int) * f->num_servers);
- rdata->call_details = gpr_malloc(sizeof(grpc_call_details) * f->num_servers);
connection_sequence = gpr_malloc(sizeof(int) * spec->num_iters);
/* Send a trivial request. */
@@ -253,7 +256,7 @@ int *perform_request(servers_fixture *f, grpc_channel *client,
kill_server(f, i);
} else if (spec->revive_at[iter_num][i] != 0) {
/* killing takes precedence */
- revive_server(f, i);
+ revive_server(f, rdata, i);
}
}
@@ -266,7 +269,7 @@ int *perform_request(servers_fixture *f, grpc_channel *client,
}
memset(s_valid, 0, f->num_servers * sizeof(int));
- deadline = n_seconds_time(1);
+ deadline = n_millis_time(1000);
c = grpc_channel_create_call(client, NULL, GRPC_PROPAGATE_DEFAULTS, f->cq,
"/foo", "foo.test.google.fr", deadline, NULL);
GPR_ASSERT(c);
@@ -300,22 +303,9 @@ int *perform_request(servers_fixture *f, grpc_channel *client,
GPR_ASSERT(GRPC_CALL_OK ==
grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(1), NULL));
- /* "listen" on all servers */
- for (i = 0; i < f->num_servers; i++) {
- grpc_metadata_array_init(&f->request_metadata_recv[i]);
- if (f->servers[i] != NULL) {
- GPR_ASSERT(GRPC_CALL_OK ==
- grpc_server_request_call(f->servers[i], &f->server_calls[i],
- &rdata->call_details[i],
- &f->request_metadata_recv[i], f->cq,
- f->cq, tag(1000 + (int)i)));
- }
- }
-
s_idx = -1;
- while ((ev = grpc_completion_queue_next(
- f->cq, GRPC_TIMEOUT_MILLIS_TO_DEADLINE(300), NULL)).type !=
- GRPC_QUEUE_TIMEOUT) {
+ while ((ev = grpc_completion_queue_next(f->cq, n_millis_time(300), NULL))
+ .type != GRPC_QUEUE_TIMEOUT) {
GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
read_tag = ((int)(gpr_intptr)ev.tag);
gpr_log(GPR_DEBUG, "EVENT: success:%d, type:%d, tag:%d iter:%d",
@@ -327,11 +317,14 @@ int *perform_request(servers_fixture *f, grpc_channel *client,
s_valid[s_idx] = 1;
connection_sequence[iter_num] = s_idx;
} else if (read_tag == 1) {
+ gpr_log(GPR_DEBUG, "client timed out");
GPR_ASSERT(ev.success);
completed_client = 1;
}
}
+ gpr_log(GPR_DEBUG, "s_idx=%d", s_idx);
+
if (s_idx >= 0) {
op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
@@ -361,12 +354,22 @@ int *perform_request(servers_fixture *f, grpc_channel *client,
}
cq_verify(cqv);
+ gpr_log(GPR_DEBUG, "status=%d; %s", rdata->status, rdata->details);
GPR_ASSERT(rdata->status == GRPC_STATUS_UNIMPLEMENTED);
GPR_ASSERT(0 == strcmp(rdata->details, "xyz"));
GPR_ASSERT(0 == strcmp(rdata->call_details[s_idx].method, "/foo"));
GPR_ASSERT(0 ==
strcmp(rdata->call_details[s_idx].host, "foo.test.google.fr"));
GPR_ASSERT(was_cancelled == 1);
+
+ grpc_call_destroy(f->server_calls[s_idx]);
+
+ /* ask for the next request on this server */
+ GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
+ f->servers[s_idx], &f->server_calls[s_idx],
+ &rdata->call_details[s_idx],
+ &f->request_metadata_recv[s_idx], f->cq,
+ f->cq, tag(1000 + (int)s_idx)));
} else {
if (!completed_client) {
cq_expect_completion(cqv, tag(1), 1);
@@ -374,12 +377,6 @@ int *perform_request(servers_fixture *f, grpc_channel *client,
}
}
- for (i = 0; i < f->num_servers; i++) {
- if (s_valid[i] != 0) {
- grpc_call_destroy(f->server_calls[i]);
- }
- grpc_metadata_array_destroy(&f->request_metadata_recv[i]);
- }
grpc_metadata_array_destroy(&rdata->initial_metadata_recv);
grpc_metadata_array_destroy(&rdata->trailing_metadata_recv);
@@ -393,7 +390,6 @@ int *perform_request(servers_fixture *f, grpc_channel *client,
gpr_free(rdata->details);
}
- gpr_free(rdata->call_details);
gpr_free(s_valid);
return connection_sequence;
@@ -456,7 +452,10 @@ void run_spec(const test_spec *spec) {
char *servers_hostports_str;
int *actual_connection_sequence;
request_data rdata;
- servers_fixture *f = setup_servers("127.0.0.1", spec->num_servers);
+ servers_fixture *f;
+ rdata.call_details =
+ gpr_malloc(sizeof(grpc_call_details) * spec->num_servers);
+ f = setup_servers("127.0.0.1", &rdata, spec->num_servers);
/* Create client. */
servers_hostports_str = gpr_strjoin_sep((const char **)f->servers_hostports,
@@ -475,6 +474,7 @@ void run_spec(const test_spec *spec) {
gpr_free(client_hostport);
gpr_free(servers_hostports_str);
gpr_free(actual_connection_sequence);
+ gpr_free(rdata.call_details);
grpc_channel_destroy(client);
teardown_servers(f);