author    ncteisen <ncteisen@gmail.com>  2018-08-28 12:53:57 -0700
committer ncteisen <ncteisen@gmail.com>  2018-08-28 12:53:57 -0700
commit    4b5b019d5644affef122e06c6898811286850b8d (patch)
tree      73c601dc31c436e3d800a846d08b9ace7ff05235 /test
parent    b8f030bc0b507903e9d156fb44d161015273d0c6 (diff)
parent    ee65a5eaee45a765d1d4176a797bb8288faeb180 (diff)
Merge branch 'master' of https://github.com/grpc/grpc into channelz-subchannels
Diffstat (limited to 'test')
-rw-r--r--  test/core/bad_client/bad_client.cc                        |   2
-rw-r--r--  test/core/channel/minimal_stack_is_minimal_test.cc        |  16
-rw-r--r--  test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc | 117
-rw-r--r--  test/core/client_channel/resolvers/fake_resolver_test.cc  |  58
-rw-r--r--  test/core/end2end/BUILD                                   |  13
-rw-r--r--  test/core/end2end/bad_server_response_test.cc             |   2
-rwxr-xr-x  test/core/end2end/end2end_test.sh                         |   2
-rw-r--r--  test/core/end2end/fixtures/http_proxy_fixture.cc          |  10
-rw-r--r--  test/core/end2end/inproc_callback_test.cc                 | 498
-rw-r--r--  test/core/end2end/tests/filter_call_init_fails.cc         |  15
-rw-r--r--  test/core/end2end/tests/filter_causes_close.cc            |   5
-rw-r--r--  test/core/end2end/tests/filter_latency.cc                 |  19
-rw-r--r--  test/core/end2end/tests/filter_status_code.cc             |  19
-rw-r--r--  test/core/iomgr/BUILD                                     |  13
-rw-r--r--  test/core/iomgr/buffer_list_test.cc                       | 111
-rw-r--r--  test/core/iomgr/endpoint_tests.cc                         |   7
-rw-r--r--  test/core/iomgr/ios/CFStreamTests/Podfile                 |   1
-rw-r--r--  test/core/iomgr/resource_quota_test.cc                    |  97
-rw-r--r--  test/core/iomgr/tcp_posix_test.cc                         | 109
-rw-r--r--  test/core/iomgr/timer_list_test.cc                        | 115
-rw-r--r--  test/core/security/credentials_test.cc                    |   4
-rw-r--r--  test/core/surface/completion_queue_test.cc                |  80
-rw-r--r--  test/core/surface/public_headers_must_be_c89.c            |   3
-rw-r--r--  test/core/tsi/ssl_transport_security_test.cc              |   6
-rw-r--r--  test/core/util/mock_endpoint.cc                           |   2
-rw-r--r--  test/core/util/passthru_endpoint.cc                       |   2
-rw-r--r--  test/core/util/trickle_endpoint.cc                        |   5
-rw-r--r--  test/cpp/common/channel_filter_test.cc                    |   3
-rw-r--r--  test/cpp/end2end/client_lb_end2end_test.cc                | 136
-rw-r--r--  test/cpp/end2end/filter_end2end_test.cc                   |   3
-rw-r--r--  test/cpp/end2end/grpclb_end2end_test.cc                   |  19
-rw-r--r--  test/cpp/end2end/health_service_end2end_test.cc           |  76
-rw-r--r--  test/cpp/end2end/thread_stress_test.cc                    | 117
-rw-r--r--  test/cpp/microbenchmarks/bm_chttp2_transport.cc           |   2
-rw-r--r--  test/cpp/microbenchmarks/bm_cq_multiple_threads.cc        |  83
-rw-r--r--  test/cpp/qps/client.h                                     |  45
-rw-r--r--  test/cpp/qps/driver.cc                                    |   6
-rw-r--r--  test/cpp/qps/driver.h                                     |   3
-rw-r--r--  test/cpp/qps/histogram.h                                  |   5
-rw-r--r--  test/cpp/qps/inproc_sync_unary_ping_pong_test.cc          |   2
-rw-r--r--  test/cpp/qps/qps_json_driver.cc                           |  19
-rw-r--r--  test/cpp/qps/qps_openloop_test.cc                         |   2
-rw-r--r--  test/cpp/qps/secure_sync_unary_ping_pong_test.cc          |   2
-rw-r--r--  test/cpp/thread_manager/thread_manager_test.cc            | 153
-rw-r--r--  test/cpp/util/BUILD                                       |  18
45 files changed, 1632 insertions, 393 deletions
diff --git a/test/core/bad_client/bad_client.cc b/test/core/bad_client/bad_client.cc
index c03ebcf409..ade23133c5 100644
--- a/test/core/bad_client/bad_client.cc
+++ b/test/core/bad_client/bad_client.cc
@@ -115,7 +115,7 @@ void grpc_run_client_side_validator(grpc_bad_client_arg* arg, uint32_t flags,
grpc_schedule_on_exec_ctx);
/* Write data */
- grpc_endpoint_write(sfd->client, &outgoing, &done_write_closure);
+ grpc_endpoint_write(sfd->client, &outgoing, &done_write_closure, nullptr);
grpc_core::ExecCtx::Get()->Flush();
/* Await completion, unless the request is large and write may not finish
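Note on a pattern repeated throughout the test changes below: grpc_endpoint_write appears to have gained a trailing opaque-pointer parameter in this merge (plausibly the per-write argument used by the new write-timestamp machinery that shows up later in this diff), and every test call site simply passes nullptr to opt out. A minimal sketch of the assumed updated signature (the parameter name is a guess, not taken from this diff):

    // Assumed post-merge signature: the trailing void* is an opaque
    // per-write argument (nullptr disables any per-write tracing).
    void grpc_endpoint_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
                             grpc_closure* cb, void* arg);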
diff --git a/test/core/channel/minimal_stack_is_minimal_test.cc b/test/core/channel/minimal_stack_is_minimal_test.cc
index 5b651ed39b..e5953acedc 100644
--- a/test/core/channel/minimal_stack_is_minimal_test.cc
+++ b/test/core/channel/minimal_stack_is_minimal_test.cc
@@ -85,21 +85,21 @@ int main(int argc, char** argv) {
// tests with a default stack
errors +=
- CHECK_STACK("unknown", nullptr, GRPC_CLIENT_DIRECT_CHANNEL, "deadline",
- "authority", "message_size", "connected", NULL);
+ CHECK_STACK("unknown", nullptr, GRPC_CLIENT_DIRECT_CHANNEL, "authority",
+ "message_size", "deadline", "connected", NULL);
errors += CHECK_STACK("unknown", nullptr, GRPC_CLIENT_SUBCHANNEL, "authority",
"message_size", "connected", NULL);
errors += CHECK_STACK("unknown", nullptr, GRPC_SERVER_CHANNEL, "server",
- "deadline", "message_size", "connected", NULL);
+ "message_size", "deadline", "connected", NULL);
errors += CHECK_STACK("chttp2", nullptr, GRPC_CLIENT_DIRECT_CHANNEL,
- "deadline", "authority", "message_size",
- "message_compress", "http-client", "connected", NULL);
+ "authority", "message_size", "deadline", "http-client",
+ "message_compress", "connected", NULL);
errors += CHECK_STACK("chttp2", nullptr, GRPC_CLIENT_SUBCHANNEL, "authority",
- "message_size", "message_compress", "http-client",
+ "message_size", "http-client", "message_compress",
"connected", NULL);
errors += CHECK_STACK("chttp2", nullptr, GRPC_SERVER_CHANNEL, "server",
- "deadline", "message_size", "message_compress",
- "http-server", "connected", NULL);
+ "message_size", "deadline", "http-server",
+ "message_compress", "connected", NULL);
errors += CHECK_STACK(nullptr, nullptr, GRPC_CLIENT_CHANNEL, "client-channel",
NULL);
diff --git a/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc b/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc
index b1f3a1c08a..1a7db40f59 100644
--- a/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc
+++ b/test/core/client_channel/resolvers/dns_resolver_cooldown_test.cc
@@ -28,12 +28,16 @@
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "test/core/util/test_config.h"
+constexpr int kMinResolutionPeriodMs = 1000;
+// Provide some slack when checking intervals, to allow for test timing issues.
+constexpr int kMinResolutionPeriodForCheckMs = 900;
+
extern grpc_address_resolver_vtable* grpc_resolve_address_impl;
static grpc_address_resolver_vtable* default_resolve_address;
static grpc_combiner* g_combiner;
-grpc_ares_request* (*g_default_dns_lookup_ares_locked)(
+static grpc_ares_request* (*g_default_dns_lookup_ares_locked)(
const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
@@ -43,7 +47,7 @@ grpc_ares_request* (*g_default_dns_lookup_ares_locked)(
// times a system-level resolution has happened.
static int g_resolution_count;
-struct iomgr_args {
+static struct iomgr_args {
gpr_event ev;
gpr_atm done_atm;
gpr_mu* mu;
@@ -61,6 +65,16 @@ static void test_resolve_address_impl(const char* name,
default_resolve_address->resolve_address(
name, default_port, g_iomgr_args.pollset_set, on_done, addrs);
++g_resolution_count;
+ static grpc_millis last_resolution_time = 0;
+ if (last_resolution_time == 0) {
+ last_resolution_time =
+ grpc_timespec_to_millis_round_up(gpr_now(GPR_CLOCK_MONOTONIC));
+ } else {
+ grpc_millis now =
+ grpc_timespec_to_millis_round_up(gpr_now(GPR_CLOCK_MONOTONIC));
+ GPR_ASSERT(now - last_resolution_time >= kMinResolutionPeriodForCheckMs);
+ last_resolution_time = now;
+ }
}
static grpc_error* test_blocking_resolve_address_impl(
@@ -73,7 +87,7 @@ static grpc_error* test_blocking_resolve_address_impl(
static grpc_address_resolver_vtable test_resolver = {
test_resolve_address_impl, test_blocking_resolve_address_impl};
-grpc_ares_request* test_dns_lookup_ares_locked(
+static grpc_ares_request* test_dns_lookup_ares_locked(
const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
@@ -82,6 +96,16 @@ grpc_ares_request* test_dns_lookup_ares_locked(
dns_server, name, default_port, g_iomgr_args.pollset_set, on_done, addrs,
check_grpclb, service_config_json, combiner);
++g_resolution_count;
+ static grpc_millis last_resolution_time = 0;
+ if (last_resolution_time == 0) {
+ last_resolution_time =
+ grpc_timespec_to_millis_round_up(gpr_now(GPR_CLOCK_MONOTONIC));
+ } else {
+ grpc_millis now =
+ grpc_timespec_to_millis_round_up(gpr_now(GPR_CLOCK_MONOTONIC));
+ GPR_ASSERT(now - last_resolution_time >= kMinResolutionPeriodForCheckMs);
+ last_resolution_time = now;
+ }
return result;
}
@@ -91,7 +115,7 @@ static gpr_timespec test_deadline(void) {
static void do_nothing(void* arg, grpc_error* error) {}
-void iomgr_args_init(iomgr_args* args) {
+static void iomgr_args_init(iomgr_args* args) {
gpr_event_init(&args->ev);
args->pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
grpc_pollset_init(args->pollset, &args->mu);
@@ -100,7 +124,7 @@ void iomgr_args_init(iomgr_args* args) {
gpr_atm_rel_store(&args->done_atm, 0);
}
-void iomgr_args_finish(iomgr_args* args) {
+static void iomgr_args_finish(iomgr_args* args) {
GPR_ASSERT(gpr_event_wait(&args->ev, test_deadline()));
grpc_pollset_set_del_pollset(args->pollset_set, args->pollset);
grpc_pollset_set_destroy(args->pollset_set);
@@ -146,29 +170,19 @@ struct OnResolutionCallbackArg {
const char* uri_str = nullptr;
grpc_core::OrphanablePtr<grpc_core::Resolver> resolver;
grpc_channel_args* result = nullptr;
- grpc_millis delay_before_second_resolution = 0;
};
-// Counter for the number of times a resolution notification callback has been
-// invoked.
-static int g_on_resolution_invocations_count;
-
// Set to true by the last callback in the resolution chain.
-bool g_all_callbacks_invoked;
+static bool g_all_callbacks_invoked;
-void on_fourth_resolution(void* arg, grpc_error* error) {
+static void on_second_resolution(void* arg, grpc_error* error) {
OnResolutionCallbackArg* cb_arg = static_cast<OnResolutionCallbackArg*>(arg);
grpc_channel_args_destroy(cb_arg->result);
GPR_ASSERT(error == GRPC_ERROR_NONE);
- ++g_on_resolution_invocations_count;
- gpr_log(GPR_INFO,
- "4th: g_on_resolution_invocations_count: %d, g_resolution_count: %d",
- g_on_resolution_invocations_count, g_resolution_count);
- // In this case we expect to have incurred in another system-level resolution
- // because on_third_resolution slept for longer than the min resolution
- // period.
- GPR_ASSERT(g_on_resolution_invocations_count == 4);
- GPR_ASSERT(g_resolution_count == 3);
+ gpr_log(GPR_INFO, "2nd: g_resolution_count: %d", g_resolution_count);
+ // The resolution callback was not invoked until new data was
+ // available, which was delayed until after the cooldown period.
+ GPR_ASSERT(g_resolution_count == 2);
cb_arg->resolver.reset();
gpr_atm_rel_store(&g_iomgr_args.done_atm, 1);
gpr_mu_lock(g_iomgr_args.mu);
@@ -179,67 +193,13 @@ void on_fourth_resolution(void* arg, grpc_error* error) {
g_all_callbacks_invoked = true;
}
-void on_third_resolution(void* arg, grpc_error* error) {
- OnResolutionCallbackArg* cb_arg = static_cast<OnResolutionCallbackArg*>(arg);
- grpc_channel_args_destroy(cb_arg->result);
- GPR_ASSERT(error == GRPC_ERROR_NONE);
- ++g_on_resolution_invocations_count;
- gpr_log(GPR_INFO,
- "3rd: g_on_resolution_invocations_count: %d, g_resolution_count: %d",
- g_on_resolution_invocations_count, g_resolution_count);
- // The timer set because of the previous re-resolution request fires, so a new
- // system-level resolution happened.
- GPR_ASSERT(g_on_resolution_invocations_count == 3);
- GPR_ASSERT(g_resolution_count == 2);
- grpc_core::ExecCtx::Get()->TestOnlySetNow(
- cb_arg->delay_before_second_resolution * 2);
- cb_arg->resolver->NextLocked(
- &cb_arg->result,
- GRPC_CLOSURE_CREATE(on_fourth_resolution, arg,
- grpc_combiner_scheduler(g_combiner)));
- cb_arg->resolver->RequestReresolutionLocked();
- gpr_mu_lock(g_iomgr_args.mu);
- GRPC_LOG_IF_ERROR("pollset_kick",
- grpc_pollset_kick(g_iomgr_args.pollset, nullptr));
- gpr_mu_unlock(g_iomgr_args.mu);
-}
-
-void on_second_resolution(void* arg, grpc_error* error) {
- OnResolutionCallbackArg* cb_arg = static_cast<OnResolutionCallbackArg*>(arg);
- grpc_channel_args_destroy(cb_arg->result);
- GPR_ASSERT(error == GRPC_ERROR_NONE);
- ++g_on_resolution_invocations_count;
- gpr_log(GPR_INFO,
- "2nd: g_on_resolution_invocations_count: %d, g_resolution_count: %d",
- g_on_resolution_invocations_count, g_resolution_count);
- // The resolution request for which this function is the callback happened
- // before the min resolution period. Therefore, no new system-level
- // resolutions happened, as indicated by g_resolution_count. But a resolution
- // timer was set to fire when the cooldown finishes.
- GPR_ASSERT(g_on_resolution_invocations_count == 2);
- GPR_ASSERT(g_resolution_count == 1);
- // Register a new callback to capture the timer firing.
- cb_arg->resolver->NextLocked(
- &cb_arg->result,
- GRPC_CLOSURE_CREATE(on_third_resolution, arg,
- grpc_combiner_scheduler(g_combiner)));
- gpr_mu_lock(g_iomgr_args.mu);
- GRPC_LOG_IF_ERROR("pollset_kick",
- grpc_pollset_kick(g_iomgr_args.pollset, nullptr));
- gpr_mu_unlock(g_iomgr_args.mu);
-}
-
-void on_first_resolution(void* arg, grpc_error* error) {
+static void on_first_resolution(void* arg, grpc_error* error) {
OnResolutionCallbackArg* cb_arg = static_cast<OnResolutionCallbackArg*>(arg);
grpc_channel_args_destroy(cb_arg->result);
GPR_ASSERT(error == GRPC_ERROR_NONE);
- ++g_on_resolution_invocations_count;
- gpr_log(GPR_INFO,
- "1st: g_on_resolution_invocations_count: %d, g_resolution_count: %d",
- g_on_resolution_invocations_count, g_resolution_count);
+ gpr_log(GPR_INFO, "1st: g_resolution_count: %d", g_resolution_count);
// There's one initial system-level resolution and one invocation of a
// notification callback (the current function).
- GPR_ASSERT(g_on_resolution_invocations_count == 1);
GPR_ASSERT(g_resolution_count == 1);
cb_arg->resolver->NextLocked(
&cb_arg->result,
@@ -265,9 +225,7 @@ static void start_test_under_combiner(void* arg, grpc_error* error) {
grpc_core::ResolverArgs args;
args.uri = uri;
args.combiner = g_combiner;
- g_on_resolution_invocations_count = 0;
g_resolution_count = 0;
- constexpr int kMinResolutionPeriodMs = 1000;
grpc_arg cooldown_arg;
cooldown_arg.key =
@@ -280,7 +238,6 @@ static void start_test_under_combiner(void* arg, grpc_error* error) {
res_cb_arg->resolver = factory->CreateResolver(args);
grpc_channel_args_destroy(cooldown_channel_args);
GPR_ASSERT(res_cb_arg->resolver != nullptr);
- res_cb_arg->delay_before_second_resolution = kMinResolutionPeriodMs;
// First resolution; this will incur a system-level resolution.
res_cb_arg->resolver->NextLocked(
&res_cb_arg->result,
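The interval check above is now duplicated verbatim in both resolver hooks (test_resolve_address_impl and test_dns_lookup_ares_locked). A shared helper along these lines, hypothetical and not part of this patch, would capture the intent once:

    // Hypothetical helper (not in this patch): asserts that successive
    // system-level resolutions are spaced at least the cooldown apart,
    // allowing the 100ms slack built into kMinResolutionPeriodForCheckMs.
    static void assert_cooldown_respected() {
      static grpc_millis last_resolution_time = 0;
      grpc_millis now =
          grpc_timespec_to_millis_round_up(gpr_now(GPR_CLOCK_MONOTONIC));
      if (last_resolution_time != 0) {
        GPR_ASSERT(now - last_resolution_time >=
                   kMinResolutionPeriodForCheckMs);
      }
      last_resolution_time = now;
    }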
diff --git a/test/core/client_channel/resolvers/fake_resolver_test.cc b/test/core/client_channel/resolvers/fake_resolver_test.cc
index 14caa3ea5d..f6696bf127 100644
--- a/test/core/client_channel/resolvers/fake_resolver_test.cc
+++ b/test/core/client_channel/resolvers/fake_resolver_test.cc
@@ -124,8 +124,8 @@ static void test_fake_resolver() {
build_fake_resolver(combiner, response_generator.get());
GPR_ASSERT(resolver.get() != nullptr);
// Test 1: normal resolution.
- // next_results != NULL, reresolution_results == NULL, last_used_results ==
- // NULL. Expected response is next_results.
+ // next_results != NULL, reresolution_results == NULL.
+ // Expected response is next_results.
grpc_channel_args* results = create_new_resolver_result();
on_resolution_arg on_res_arg = create_on_resolution_arg(results);
grpc_closure* on_resolution = GRPC_CLOSURE_CREATE(
@@ -137,10 +137,9 @@ static void test_fake_resolver() {
GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
// Test 2: update resolution.
- // next_results != NULL, reresolution_results == NULL, last_used_results !=
- // NULL. Expected response is next_results.
+ // next_results != NULL, reresolution_results == NULL.
+ // Expected response is next_results.
results = create_new_resolver_result();
- grpc_channel_args* last_used_results = grpc_channel_args_copy(results);
on_res_arg = create_on_resolution_arg(results);
on_resolution = GRPC_CLOSURE_CREATE(on_resolution_cb, &on_res_arg,
grpc_combiner_scheduler(combiner));
@@ -150,21 +149,9 @@ static void test_fake_resolver() {
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
- // Test 3: fallback re-resolution.
- // next_results == NULL, reresolution_results == NULL, last_used_results !=
- // NULL. Expected response is last_used_results.
- on_res_arg = create_on_resolution_arg(last_used_results);
- on_resolution = GRPC_CLOSURE_CREATE(on_resolution_cb, &on_res_arg,
- grpc_combiner_scheduler(combiner));
- resolver->NextLocked(&on_res_arg.resolver_result, on_resolution);
- // Trigger a re-resolution.
- resolver->RequestReresolutionLocked();
- grpc_core::ExecCtx::Get()->Flush();
- GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
- grpc_timeout_seconds_to_deadline(5)) != nullptr);
- // Test 4: normal re-resolution.
- // next_results == NULL, reresolution_results != NULL, last_used_results !=
- // NULL. Expected response is reresolution_results.
+ // Test 3: normal re-resolution.
+ // next_results == NULL, reresolution_results != NULL.
+ // Expected response is reresolution_results.
grpc_channel_args* reresolution_results = create_new_resolver_result();
on_res_arg =
create_on_resolution_arg(grpc_channel_args_copy(reresolution_results));
@@ -180,9 +167,9 @@ static void test_fake_resolver() {
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
- // Test 5: repeat re-resolution.
- // next_results == NULL, reresolution_results != NULL, last_used_results !=
- // NULL. Expected response is reresolution_results.
+ // Test 4: repeat re-resolution.
+ // next_results == NULL, reresolution_results != NULL.
+ // Expected response is reresolution_results.
on_res_arg = create_on_resolution_arg(reresolution_results);
on_resolution = GRPC_CLOSURE_CREATE(on_resolution_cb, &on_res_arg,
grpc_combiner_scheduler(combiner));
@@ -192,11 +179,10 @@ static void test_fake_resolver() {
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
- // Test 6: normal resolution.
- // next_results != NULL, reresolution_results != NULL, last_used_results !=
- // NULL. Expected response is next_results.
+ // Test 5: normal resolution.
+ // next_results != NULL, reresolution_results != NULL.
+ // Expected response is next_results.
results = create_new_resolver_result();
- last_used_results = grpc_channel_args_copy(results);
on_res_arg = create_on_resolution_arg(results);
on_resolution = GRPC_CLOSURE_CREATE(on_resolution_cb, &on_res_arg,
grpc_combiner_scheduler(combiner));
@@ -206,23 +192,7 @@ static void test_fake_resolver() {
grpc_core::ExecCtx::Get()->Flush();
GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
grpc_timeout_seconds_to_deadline(5)) != nullptr);
- // Test 7: fallback re-resolution.
- // next_results == NULL, reresolution_results == NULL, last_used_results !=
- // NULL. Expected response is last_used_results.
- on_res_arg = create_on_resolution_arg(last_used_results);
- on_resolution = GRPC_CLOSURE_CREATE(on_resolution_cb, &on_res_arg,
- grpc_combiner_scheduler(combiner));
- resolver->NextLocked(&on_res_arg.resolver_result, on_resolution);
- // Reset reresolution_results.
- response_generator->SetReresolutionResponse(nullptr);
- // Flush here to guarantee that reresolution_results has been reset.
- grpc_core::ExecCtx::Get()->Flush();
- // Trigger a re-resolution.
- resolver->RequestReresolutionLocked();
- grpc_core::ExecCtx::Get()->Flush();
- GPR_ASSERT(gpr_event_wait(&on_res_arg.ev,
- grpc_timeout_seconds_to_deadline(5)) != nullptr);
- // Test 8: no-op.
+ // Test 6: no-op.
// Requesting a new resolution without setting the response shouldn't trigger
// the resolution callback.
memset(&on_res_arg, 0, sizeof(on_res_arg));
diff --git a/test/core/end2end/BUILD b/test/core/end2end/BUILD
index dd16694204..37999a98d1 100644
--- a/test/core/end2end/BUILD
+++ b/test/core/end2end/BUILD
@@ -124,6 +124,19 @@ grpc_cc_test(
)
grpc_cc_test(
+ name = "inproc_callback_test",
+ srcs = ["inproc_callback_test.cc"],
+ language = "C++",
+ deps = [
+ ':end2end_tests',
+ "//:gpr",
+ "//:grpc",
+ "//test/core/util:gpr_test_util",
+ "//test/core/util:grpc_test_util",
+ ],
+)
+
+grpc_cc_test(
name = "invalid_call_argument_test",
srcs = ["invalid_call_argument_test.cc"],
language = "C++",
diff --git a/test/core/end2end/bad_server_response_test.cc b/test/core/end2end/bad_server_response_test.cc
index 3d133cfc18..f7396a1684 100644
--- a/test/core/end2end/bad_server_response_test.cc
+++ b/test/core/end2end/bad_server_response_test.cc
@@ -104,7 +104,7 @@ static void handle_write() {
grpc_slice_buffer_reset_and_unref(&state.outgoing_buffer);
grpc_slice_buffer_add(&state.outgoing_buffer, slice);
- grpc_endpoint_write(state.tcp, &state.outgoing_buffer, &on_write);
+ grpc_endpoint_write(state.tcp, &state.outgoing_buffer, &on_write, nullptr);
}
static void handle_read(void* arg, grpc_error* error) {
diff --git a/test/core/end2end/end2end_test.sh b/test/core/end2end/end2end_test.sh
index 5bfb253090..6b23d848be 100755
--- a/test/core/end2end/end2end_test.sh
+++ b/test/core/end2end/end2end_test.sh
@@ -15,7 +15,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-if [ -z "$3" ]
+if [ -n "$3" ]
then
export GRPC_POLL_STRATEGY=$3
fi
diff --git a/test/core/end2end/fixtures/http_proxy_fixture.cc b/test/core/end2end/fixtures/http_proxy_fixture.cc
index f02fa9d998..ea9c000efb 100644
--- a/test/core/end2end/fixtures/http_proxy_fixture.cc
+++ b/test/core/end2end/fixtures/http_proxy_fixture.cc
@@ -201,7 +201,7 @@ static void on_client_write_done(void* arg, grpc_error* error) {
&conn->client_write_buffer);
conn->client_is_writing = true;
grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer,
- &conn->on_client_write_done);
+ &conn->on_client_write_done, nullptr);
} else {
// No more writes. Unref the connection.
proxy_connection_unref(conn, "write_done");
@@ -226,7 +226,7 @@ static void on_server_write_done(void* arg, grpc_error* error) {
&conn->server_write_buffer);
conn->server_is_writing = true;
grpc_endpoint_write(conn->server_endpoint, &conn->server_write_buffer,
- &conn->on_server_write_done);
+ &conn->on_server_write_done, nullptr);
} else {
// No more writes. Unref the connection.
proxy_connection_unref(conn, "server_write");
@@ -257,7 +257,7 @@ static void on_client_read_done(void* arg, grpc_error* error) {
proxy_connection_ref(conn, "client_read");
conn->server_is_writing = true;
grpc_endpoint_write(conn->server_endpoint, &conn->server_write_buffer,
- &conn->on_server_write_done);
+ &conn->on_server_write_done, nullptr);
}
// Read more data.
grpc_endpoint_read(conn->client_endpoint, &conn->client_read_buffer,
@@ -288,7 +288,7 @@ static void on_server_read_done(void* arg, grpc_error* error) {
proxy_connection_ref(conn, "server_read");
conn->client_is_writing = true;
grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer,
- &conn->on_client_write_done);
+ &conn->on_client_write_done, nullptr);
}
// Read more data.
grpc_endpoint_read(conn->server_endpoint, &conn->server_read_buffer,
@@ -340,7 +340,7 @@ static void on_server_connect_done(void* arg, grpc_error* error) {
grpc_slice_buffer_add(&conn->client_write_buffer, slice);
conn->client_is_writing = true;
grpc_endpoint_write(conn->client_endpoint, &conn->client_write_buffer,
- &conn->on_write_response_done);
+ &conn->on_write_response_done, nullptr);
}
/**
diff --git a/test/core/end2end/inproc_callback_test.cc b/test/core/end2end/inproc_callback_test.cc
new file mode 100644
index 0000000000..0d6c7c75a8
--- /dev/null
+++ b/test/core/end2end/inproc_callback_test.cc
@@ -0,0 +1,498 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "test/core/end2end/end2end_tests.h"
+
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/sync.h>
+
+#include "src/core/ext/transport/inproc/inproc_transport.h"
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/surface/completion_queue.h"
+#include "src/core/lib/surface/server.h"
+#include "test/core/util/port.h"
+#include "test/core/util/test_config.h"
+
+typedef struct inproc_fixture_data {
+ bool dummy; // reserved for future expansion. Struct can't be empty
+} inproc_fixture_data;
+
+namespace {
+template <typename F>
+class CQDeletingCallback : public grpc_core::CQCallbackInterface {
+ public:
+ explicit CQDeletingCallback(F f) : func_(f) {}
+ ~CQDeletingCallback() override {}
+ void Run(bool ok) override {
+ func_(ok);
+ grpc_core::Delete(this);
+ }
+
+ private:
+ F func_;
+};
+
+template <typename F>
+grpc_core::CQCallbackInterface* NewDeletingCallback(F f) {
+ return grpc_core::New<CQDeletingCallback<F>>(f);
+}
+
+class ShutdownCallback : public grpc_core::CQCallbackInterface {
+ public:
+ ShutdownCallback() : done_(false) {
+ gpr_mu_init(&mu_);
+ gpr_cv_init(&cv_);
+ }
+ ~ShutdownCallback() override {}
+ void Run(bool ok) override {
+ gpr_log(GPR_DEBUG, "CQ shutdown notification invoked");
+ gpr_mu_lock(&mu_);
+ done_ = true;
+ gpr_cv_broadcast(&cv_);
+ gpr_mu_unlock(&mu_);
+ }
+ // The Wait function waits for a specified amount of
+ // time for the completion of the shutdown and returns
+ // whether it was successfully shut down
+ bool Wait(gpr_timespec deadline) {
+ gpr_mu_lock(&mu_);
+ while (!done_ && !gpr_cv_wait(&cv_, &mu_, deadline)) {
+ }
+ bool ret = done_;
+ gpr_mu_unlock(&mu_);
+ return ret;
+ }
+
+ private:
+ bool done_;
+ gpr_mu mu_;
+ gpr_cv cv_;
+};
+
+ShutdownCallback* g_shutdown_callback;
+} // namespace
+
+// The following global structure is the tag collection. It holds
+// all information related to tags expected and tags received
+// during the execution, with each callback setting a tag.
+// The tag sets are implemented and checked using arrays and
+// linear lookups (rather than maps) so that this test doesn't
+// need the C++ standard library.
+static gpr_mu tags_mu;
+static gpr_cv tags_cv;
+const size_t kAvailableTags = 4;
+bool tags[kAvailableTags];
+bool tags_valid[kAvailableTags];
+bool tags_expected[kAvailableTags];
+bool tags_needed[kAvailableTags];
+
+// Mark that a tag is expected; this function must be executed in the
+// main thread only while there are no other threads altering the
+// expectation set (e.g., by calling expect_tag or verify_tags)
+static void expect_tag(intptr_t tag, bool ok) {
+ size_t idx = static_cast<size_t>(tag);
+ GPR_ASSERT(idx < kAvailableTags);
+ tags_needed[idx] = true;
+ tags_expected[idx] = ok;
+}
+
+// Check that the expected tags have arrived, within a certain
+// deadline. This must also be executed only on the main thread while
+// there are no other threads altering the expectation set (e.g., by
+// calling expect_tag or verify_tags). The tag verifier doesn't have
+// to drive the CQ at all (unlike the next-based end2end tests)
+// because the tags will get set when the callbacks are executed,
+// which happens when a particular batch related to a callback is
+// complete.
+static void verify_tags(gpr_timespec deadline) {
+ bool done = false;
+
+ gpr_mu_lock(&tags_mu);
+ while (!done) {
+ done = gpr_time_cmp(gpr_now(GPR_CLOCK_MONOTONIC), deadline) > 0;
+ for (size_t i = 0; i < kAvailableTags; i++) {
+ if (tags_needed[i]) {
+ if (tags_valid[i]) {
+ gpr_log(GPR_DEBUG, "Verifying tag %d", static_cast<int>(i));
+ if (tags[i] != tags_expected[i]) {
+ gpr_log(GPR_ERROR, "Got wrong result (%d instead of %d) for tag %d",
+ tags[i], tags_expected[i], static_cast<int>(i));
+ GPR_ASSERT(false);
+ }
+ tags_valid[i] = false;
+ tags_needed[i] = false;
+ } else if (done) {
+ gpr_log(GPR_ERROR, "Didn't get tag %d", static_cast<int>(i));
+ GPR_ASSERT(false);
+ }
+ }
+ }
+ bool empty = true;
+ for (size_t i = 0; i < kAvailableTags; i++) {
+ if (tags_needed[i]) {
+ empty = false;
+ }
+ }
+ done = done || empty;
+ if (done) {
+ for (size_t i = 0; i < kAvailableTags; i++) {
+ if (tags_valid[i]) {
+ gpr_log(GPR_ERROR, "Got unexpected tag %d and result %d",
+ static_cast<int>(i), tags[i]);
+ GPR_ASSERT(false);
+ }
+ tags_valid[i] = false;
+ }
+ } else {
+ gpr_cv_wait(&tags_cv, &tags_mu, deadline);
+ }
+ }
+ gpr_mu_unlock(&tags_mu);
+}
+
+// This function creates a callback functor that emits the
+// desired tag into the global tag set
+static grpc_core::CQCallbackInterface* tag(intptr_t t) {
+ auto func = [t](bool ok) {
+ gpr_mu_lock(&tags_mu);
+ gpr_log(GPR_DEBUG, "Completing operation %" PRIdPTR, t);
+ bool was_empty = true;
+ for (size_t i = 0; i < kAvailableTags; i++) {
+ if (tags_valid[i]) {
+ was_empty = false;
+ }
+ }
+ size_t idx = static_cast<size_t>(t);
+ tags[idx] = ok;
+ tags_valid[idx] = true;
+ if (was_empty) {
+ gpr_cv_signal(&tags_cv);
+ }
+ gpr_mu_unlock(&tags_mu);
+ };
+ auto cb = NewDeletingCallback(func);
+ return cb;
+}
+
+static grpc_end2end_test_fixture inproc_create_fixture(
+ grpc_channel_args* client_args, grpc_channel_args* server_args) {
+ grpc_end2end_test_fixture f;
+ inproc_fixture_data* ffd = static_cast<inproc_fixture_data*>(
+ gpr_malloc(sizeof(inproc_fixture_data)));
+ memset(&f, 0, sizeof(f));
+
+ f.fixture_data = ffd;
+ g_shutdown_callback = grpc_core::New<ShutdownCallback>();
+ f.cq =
+ grpc_completion_queue_create_for_callback(g_shutdown_callback, nullptr);
+ f.shutdown_cq = grpc_completion_queue_create_for_pluck(nullptr);
+
+ return f;
+}
+
+void inproc_init_client(grpc_end2end_test_fixture* f,
+ grpc_channel_args* client_args) {
+ f->client = grpc_inproc_channel_create(f->server, client_args, nullptr);
+ GPR_ASSERT(f->client);
+}
+
+void inproc_init_server(grpc_end2end_test_fixture* f,
+ grpc_channel_args* server_args) {
+ if (f->server) {
+ grpc_server_destroy(f->server);
+ }
+ f->server = grpc_server_create(server_args, nullptr);
+ grpc_server_register_completion_queue(f->server, f->cq, nullptr);
+ grpc_server_start(f->server);
+}
+
+void inproc_tear_down(grpc_end2end_test_fixture* f) {
+ inproc_fixture_data* ffd = static_cast<inproc_fixture_data*>(f->fixture_data);
+ gpr_free(ffd);
+}
+
+static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
+ const char* test_name,
+ grpc_channel_args* client_args,
+ grpc_channel_args* server_args) {
+ grpc_end2end_test_fixture f;
+ gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name);
+ f = config.create_fixture(client_args, server_args);
+ config.init_server(&f, server_args);
+ config.init_client(&f, client_args);
+ return f;
+}
+
+static gpr_timespec n_seconds_from_now(int n) {
+ return grpc_timeout_seconds_to_deadline(n);
+}
+
+static gpr_timespec five_seconds_from_now() { return n_seconds_from_now(5); }
+
+static void drain_cq(grpc_completion_queue* cq) {
+ // Wait for the shutdown callback to arrive, or fail the test
+ GPR_ASSERT(g_shutdown_callback->Wait(five_seconds_from_now()));
+ gpr_log(GPR_DEBUG, "CQ shutdown wait complete");
+ grpc_core::Delete(g_shutdown_callback);
+}
+
+static void shutdown_server(grpc_end2end_test_fixture* f) {
+ if (!f->server) return;
+ grpc_server_shutdown_and_notify(
+ f->server, f->shutdown_cq,
+ reinterpret_cast<void*>(static_cast<intptr_t>(1000)));
+ GPR_ASSERT(
+ grpc_completion_queue_pluck(f->shutdown_cq, (void*)((intptr_t)1000),
+ grpc_timeout_seconds_to_deadline(5), nullptr)
+ .type == GRPC_OP_COMPLETE);
+ grpc_server_destroy(f->server);
+ f->server = nullptr;
+}
+
+static void shutdown_client(grpc_end2end_test_fixture* f) {
+ if (!f->client) return;
+ grpc_channel_destroy(f->client);
+ f->client = nullptr;
+}
+
+static void end_test(grpc_end2end_test_fixture* f) {
+ shutdown_server(f);
+ shutdown_client(f);
+
+ grpc_completion_queue_shutdown(f->cq);
+ drain_cq(f->cq);
+ grpc_completion_queue_destroy(f->cq);
+ grpc_completion_queue_destroy(f->shutdown_cq);
+}
+
+static void simple_request_body(grpc_end2end_test_config config,
+ grpc_end2end_test_fixture f) {
+ grpc_call* c;
+ grpc_call* s;
+ grpc_op ops[6];
+ grpc_op* op;
+ grpc_metadata_array initial_metadata_recv;
+ grpc_metadata_array trailing_metadata_recv;
+ grpc_metadata_array request_metadata_recv;
+ grpc_call_details call_details;
+ grpc_status_code status;
+ const char* error_string;
+ grpc_call_error error;
+ grpc_slice details;
+ int was_cancelled = 2;
+ char* peer;
+ gpr_timespec deadline = five_seconds_from_now();
+
+ c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
+ grpc_slice_from_static_string("/foo"), nullptr,
+ deadline, nullptr);
+ GPR_ASSERT(c);
+
+ peer = grpc_call_get_peer(c);
+ GPR_ASSERT(peer != nullptr);
+ gpr_log(GPR_DEBUG, "client_peer_before_call=%s", peer);
+ gpr_free(peer);
+
+ grpc_metadata_array_init(&initial_metadata_recv);
+ grpc_metadata_array_init(&trailing_metadata_recv);
+ grpc_metadata_array_init(&request_metadata_recv);
+ grpc_call_details_init(&call_details);
+
+ // Create a basic client unary request batch (no payload)
+ memset(ops, 0, sizeof(ops));
+ op = ops;
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ op->data.send_initial_metadata.count = 0;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ op->op = GRPC_OP_RECV_INITIAL_METADATA;
+ op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+ op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
+ op->data.recv_status_on_client.status = &status;
+ op->data.recv_status_on_client.status_details = &details;
+ op->data.recv_status_on_client.error_string = &error_string;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(1),
+ nullptr);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ // Register a call at the server-side to match the incoming client call
+ error = grpc_server_request_call(f.server, &s, &call_details,
+ &request_metadata_recv, f.cq, f.cq, tag(2));
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ // We expect that the server call creation callback (and no others) will
+ // execute now since no other batch should be complete.
+ expect_tag(2, true);
+ verify_tags(deadline);
+
+ peer = grpc_call_get_peer(s);
+ GPR_ASSERT(peer != nullptr);
+ gpr_log(GPR_DEBUG, "server_peer=%s", peer);
+ gpr_free(peer);
+ peer = grpc_call_get_peer(c);
+ GPR_ASSERT(peer != nullptr);
+ gpr_log(GPR_DEBUG, "client_peer=%s", peer);
+ gpr_free(peer);
+
+ // Create the server response batch (no payload)
+ memset(ops, 0, sizeof(ops));
+ op = ops;
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ op->data.send_initial_metadata.count = 0;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
+ op->data.send_status_from_server.trailing_metadata_count = 0;
+ op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED;
+ grpc_slice status_details = grpc_slice_from_static_string("xyz");
+ op->data.send_status_from_server.status_details = &status_details;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
+ op->data.recv_close_on_server.cancelled = &was_cancelled;
+ op->flags = 0;
+ op->reserved = nullptr;
+ op++;
+ error = grpc_call_start_batch(s, ops, static_cast<size_t>(op - ops), tag(3),
+ nullptr);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ // Both the client request and server response batches should get complete
+ // now and we should see that their callbacks have been executed
+ expect_tag(3, true);
+ expect_tag(1, true);
+ verify_tags(deadline);
+
+ GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED);
+ GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
+ // the following sanity check makes sure that the requested error string is
+ // correctly populated by the core. It looks for certain substrings that are
+ // not likely to change much. Some parts of the error, like time created,
+ // obviously are not checked.
+ GPR_ASSERT(nullptr != strstr(error_string, "xyz"));
+ GPR_ASSERT(nullptr != strstr(error_string, "description"));
+ GPR_ASSERT(nullptr != strstr(error_string, "Error received from peer"));
+ GPR_ASSERT(nullptr != strstr(error_string, "grpc_message"));
+ GPR_ASSERT(nullptr != strstr(error_string, "grpc_status"));
+ GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo"));
+ GPR_ASSERT(0 == call_details.flags);
+ GPR_ASSERT(was_cancelled == 1);
+
+ grpc_slice_unref(details);
+ gpr_free(static_cast<void*>(const_cast<char*>(error_string)));
+ grpc_metadata_array_destroy(&initial_metadata_recv);
+ grpc_metadata_array_destroy(&trailing_metadata_recv);
+ grpc_metadata_array_destroy(&request_metadata_recv);
+ grpc_call_details_destroy(&call_details);
+
+ grpc_call_unref(c);
+ grpc_call_unref(s);
+
+ int expected_calls = 1;
+ if (config.feature_mask & FEATURE_MASK_SUPPORTS_REQUEST_PROXYING) {
+ expected_calls *= 2;
+ }
+}
+
+static void test_invoke_simple_request(grpc_end2end_test_config config) {
+ grpc_end2end_test_fixture f;
+
+ f = begin_test(config, "test_invoke_simple_request", nullptr, nullptr);
+ simple_request_body(config, f);
+ end_test(&f);
+ config.tear_down_data(&f);
+}
+
+static void test_invoke_10_simple_requests(grpc_end2end_test_config config) {
+ int i;
+ grpc_end2end_test_fixture f =
+ begin_test(config, "test_invoke_10_simple_requests", nullptr, nullptr);
+ for (i = 0; i < 10; i++) {
+ simple_request_body(config, f);
+ gpr_log(GPR_INFO, "Running test: Passed simple request %d", i);
+ }
+ end_test(&f);
+ config.tear_down_data(&f);
+}
+
+static void test_invoke_many_simple_requests(grpc_end2end_test_config config) {
+ int i;
+ const int many = 1000;
+ grpc_end2end_test_fixture f =
+ begin_test(config, "test_invoke_many_simple_requests", nullptr, nullptr);
+ gpr_timespec t1 = gpr_now(GPR_CLOCK_MONOTONIC);
+ for (i = 0; i < many; i++) {
+ simple_request_body(config, f);
+ }
+ double us =
+ gpr_timespec_to_micros(gpr_time_sub(gpr_now(GPR_CLOCK_MONOTONIC), t1)) /
+ many;
+ gpr_log(GPR_INFO, "Time per ping %f us", us);
+ end_test(&f);
+ config.tear_down_data(&f);
+}
+
+static void simple_request(grpc_end2end_test_config config) {
+ int i;
+ for (i = 0; i < 10; i++) {
+ test_invoke_simple_request(config);
+ }
+ test_invoke_10_simple_requests(config);
+ test_invoke_many_simple_requests(config);
+}
+
+static void simple_request_pre_init() {
+ gpr_mu_init(&tags_mu);
+ gpr_cv_init(&tags_cv);
+}
+
+/* All test configurations */
+static grpc_end2end_test_config configs[] = {
+ {"inproc-callback", FEATURE_MASK_SUPPORTS_AUTHORITY_HEADER, nullptr,
+ inproc_create_fixture, inproc_init_client, inproc_init_server,
+ inproc_tear_down},
+};
+
+int main(int argc, char** argv) {
+ grpc_test_init(argc, argv);
+ grpc_init();
+
+ simple_request_pre_init();
+ simple_request(configs[0]);
+
+ grpc_shutdown();
+
+ return 0;
+}
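The new test above is built entirely on the experimental callback completion queue: the CQ is created with grpc_completion_queue_create_for_callback, batch tags are CQCallbackInterface objects rather than opaque pointers, and CQ shutdown is observed through the registered ShutdownCallback instead of a drain loop. Condensed from the file itself (no API beyond what the test already uses; LoggingCallback and start_batch are illustrative names):

    // Condensed from inproc_callback_test.cc: completions run a functor
    // instead of being pulled via grpc_completion_queue_next/pluck.
    class LoggingCallback : public grpc_core::CQCallbackInterface {
     public:
      void Run(bool ok) override {
        gpr_log(GPR_INFO, "batch complete: ok=%d", ok);
        grpc_core::Delete(this);  // callbacks own themselves, as in tag()
      }
    };

    void start_batch(grpc_call* call, grpc_op* ops, size_t nops) {
      // The callback object goes where a void* tag would normally go.
      grpc_call_start_batch(call, ops, nops,
                            grpc_core::New<LoggingCallback>(), nullptr);
    }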
diff --git a/test/core/end2end/tests/filter_call_init_fails.cc b/test/core/end2end/tests/filter_call_init_fails.cc
index 07e1421446..ab96879fe4 100644
--- a/test/core/end2end/tests/filter_call_init_fails.cc
+++ b/test/core/end2end/tests/filter_call_init_fails.cc
@@ -438,6 +438,7 @@ static bool maybe_add_server_channel_filter(grpc_channel_stack_builder* builder,
// must be the last one. So we add it right before the last one.
grpc_channel_stack_builder_iterator* it =
grpc_channel_stack_builder_create_iterator_at_last(builder);
+ GPR_ASSERT(grpc_channel_stack_builder_move_prev(it));
const bool retval = grpc_channel_stack_builder_add_filter_before(
it, &test_filter, nullptr, nullptr);
grpc_channel_stack_builder_iterator_destroy(it);
@@ -456,6 +457,7 @@ static bool maybe_add_client_channel_filter(grpc_channel_stack_builder* builder,
// must be the last one. So we add it right before the last one.
grpc_channel_stack_builder_iterator* it =
grpc_channel_stack_builder_create_iterator_at_last(builder);
+ GPR_ASSERT(grpc_channel_stack_builder_move_prev(it));
const bool retval = grpc_channel_stack_builder_add_filter_before(
it, &test_filter, nullptr, nullptr);
grpc_channel_stack_builder_iterator_destroy(it);
@@ -474,6 +476,7 @@ static bool maybe_add_client_subchannel_filter(
// must be the last one. So we add it right before the last one.
grpc_channel_stack_builder_iterator* it =
grpc_channel_stack_builder_create_iterator_at_last(builder);
+ GPR_ASSERT(grpc_channel_stack_builder_move_prev(it));
const bool retval = grpc_channel_stack_builder_add_filter_before(
it, &test_filter, nullptr, nullptr);
grpc_channel_stack_builder_iterator_destroy(it);
@@ -484,17 +487,13 @@ static bool maybe_add_client_subchannel_filter(
}
static void init_plugin(void) {
- grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL,
- GRPC_CHANNEL_INIT_PRIORITY_MAX,
+ grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX,
maybe_add_server_channel_filter, nullptr);
- grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL,
- GRPC_CHANNEL_INIT_PRIORITY_MAX,
+ grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MAX,
maybe_add_client_channel_filter, nullptr);
- grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
- GRPC_CHANNEL_INIT_PRIORITY_MAX,
+ grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL, INT_MAX,
maybe_add_client_subchannel_filter, nullptr);
- grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL,
- GRPC_CHANNEL_INIT_PRIORITY_MAX,
+ grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL, INT_MAX,
maybe_add_client_channel_filter, nullptr);
}
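Two mechanical changes recur in this and the following filter tests. First, GRPC_CHANNEL_INIT_PRIORITY_MAX is replaced by a plain INT_MAX, suggesting the named priority constants were dropped in this merge. Second, each helper now asserts grpc_channel_stack_builder_move_prev(it) after creating the iterator, which implies create_iterator_at_last now yields a position past the terminal filter and must be stepped back before inserting. The resulting idiom, condensed from the hunks above:

    // Position past the end, step back once so the add-before lands ahead
    // of the terminal (connected) filter, and assert the step succeeded.
    grpc_channel_stack_builder_iterator* it =
        grpc_channel_stack_builder_create_iterator_at_last(builder);
    GPR_ASSERT(grpc_channel_stack_builder_move_prev(it));
    const bool retval = grpc_channel_stack_builder_add_filter_before(
        it, &test_filter, nullptr, nullptr);
    grpc_channel_stack_builder_iterator_destroy(it);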
diff --git a/test/core/end2end/tests/filter_causes_close.cc b/test/core/end2end/tests/filter_causes_close.cc
index 891c1b8c1f..a7f4268803 100644
--- a/test/core/end2end/tests/filter_causes_close.cc
+++ b/test/core/end2end/tests/filter_causes_close.cc
@@ -261,9 +261,8 @@ static bool maybe_add_filter(grpc_channel_stack_builder* builder, void* arg) {
}
static void init_plugin(void) {
- grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL,
- GRPC_CHANNEL_INIT_PRIORITY_HIGH,
- maybe_add_filter, nullptr);
+ grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, 0, maybe_add_filter,
+ nullptr);
}
static void destroy_plugin(void) {}
diff --git a/test/core/end2end/tests/filter_latency.cc b/test/core/end2end/tests/filter_latency.cc
index 02a4d07927..a89db7b094 100644
--- a/test/core/end2end/tests/filter_latency.cc
+++ b/test/core/end2end/tests/filter_latency.cc
@@ -314,6 +314,7 @@ static bool maybe_add_filter(grpc_channel_stack_builder* builder, void* arg) {
// must be the last one. So we add it right before the last one.
grpc_channel_stack_builder_iterator* it =
grpc_channel_stack_builder_create_iterator_at_last(builder);
+ GPR_ASSERT(grpc_channel_stack_builder_move_prev(it));
const bool retval = grpc_channel_stack_builder_add_filter_before(
it, filter, nullptr, nullptr);
grpc_channel_stack_builder_iterator_destroy(it);
@@ -325,15 +326,15 @@ static bool maybe_add_filter(grpc_channel_stack_builder* builder, void* arg) {
static void init_plugin(void) {
gpr_mu_init(&g_mu);
- grpc_channel_init_register_stage(
- GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_PRIORITY_MAX, maybe_add_filter,
- (void*)&test_client_filter);
- grpc_channel_init_register_stage(
- GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_PRIORITY_MAX,
- maybe_add_filter, (void*)&test_client_filter);
- grpc_channel_init_register_stage(
- GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_PRIORITY_MAX, maybe_add_filter,
- (void*)&test_server_filter);
+ grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MAX,
+ maybe_add_filter,
+ (void*)&test_client_filter);
+ grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL, INT_MAX,
+ maybe_add_filter,
+ (void*)&test_client_filter);
+ grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX,
+ maybe_add_filter,
+ (void*)&test_server_filter);
}
static void destroy_plugin(void) { gpr_mu_destroy(&g_mu); }
diff --git a/test/core/end2end/tests/filter_status_code.cc b/test/core/end2end/tests/filter_status_code.cc
index 6ed1de15c6..ba3cbfa6d1 100644
--- a/test/core/end2end/tests/filter_status_code.cc
+++ b/test/core/end2end/tests/filter_status_code.cc
@@ -333,6 +333,7 @@ static bool maybe_add_filter(grpc_channel_stack_builder* builder, void* arg) {
// So we add it right before the last one.
grpc_channel_stack_builder_iterator* it =
grpc_channel_stack_builder_create_iterator_at_last(builder);
+ GPR_ASSERT(grpc_channel_stack_builder_move_prev(it));
const bool retval = grpc_channel_stack_builder_add_filter_before(
it, filter, nullptr, nullptr);
grpc_channel_stack_builder_iterator_destroy(it);
@@ -349,15 +350,15 @@ static void init_plugin(void) {
g_client_code_recv = false;
g_server_code_recv = false;
- grpc_channel_init_register_stage(
- GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_PRIORITY_MAX, maybe_add_filter,
- (void*)&test_client_filter);
- grpc_channel_init_register_stage(
- GRPC_CLIENT_DIRECT_CHANNEL, GRPC_CHANNEL_INIT_PRIORITY_MAX,
- maybe_add_filter, (void*)&test_client_filter);
- grpc_channel_init_register_stage(
- GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_PRIORITY_MAX, maybe_add_filter,
- (void*)&test_server_filter);
+ grpc_channel_init_register_stage(GRPC_CLIENT_CHANNEL, INT_MAX,
+ maybe_add_filter,
+ (void*)&test_client_filter);
+ grpc_channel_init_register_stage(GRPC_CLIENT_DIRECT_CHANNEL, INT_MAX,
+ maybe_add_filter,
+ (void*)&test_client_filter);
+ grpc_channel_init_register_stage(GRPC_SERVER_CHANNEL, INT_MAX,
+ maybe_add_filter,
+ (void*)&test_server_filter);
}
static void destroy_plugin(void) {
diff --git a/test/core/iomgr/BUILD b/test/core/iomgr/BUILD
index 002671a5fa..675d9e6278 100644
--- a/test/core/iomgr/BUILD
+++ b/test/core/iomgr/BUILD
@@ -247,6 +247,19 @@ grpc_cc_test(
)
grpc_cc_test(
+ name = "buffer_list_test",
+ srcs = ["buffer_list_test.cc"],
+ language = "C++",
+ deps = [
+ "//:gpr",
+ "//:grpc",
+ "//test/core/util:gpr_test_util",
+ "//test/core/util:grpc_test_util",
+ ],
+)
+
+
+grpc_cc_test(
name = "tcp_server_posix_test",
srcs = ["tcp_server_posix_test.cc"],
language = "C++",
diff --git a/test/core/iomgr/buffer_list_test.cc b/test/core/iomgr/buffer_list_test.cc
new file mode 100644
index 0000000000..c7f30fa092
--- /dev/null
+++ b/test/core/iomgr/buffer_list_test.cc
@@ -0,0 +1,111 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "src/core/lib/iomgr/port.h"
+
+#include "src/core/lib/iomgr/buffer_list.h"
+
+#include <grpc/grpc.h>
+
+#include "test/core/util/test_config.h"
+
+#ifdef GRPC_LINUX_ERRQUEUE
+
+static void TestShutdownFlushesListVerifier(void* arg,
+ grpc_core::Timestamps* ts,
+ grpc_error* error) {
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ GPR_ASSERT(arg != nullptr);
+ gpr_atm* done = reinterpret_cast<gpr_atm*>(arg);
+ gpr_atm_rel_store(done, static_cast<gpr_atm>(1));
+}
+
+/** Tests that all TracedBuffer elements in the list are flushed out on
+ * shutdown.
+ * Also tests that arg is passed correctly.
+ */
+static void TestShutdownFlushesList() {
+ grpc_core::grpc_tcp_set_write_timestamps_callback(
+ TestShutdownFlushesListVerifier);
+ grpc_core::TracedBuffer* list = nullptr;
+#define NUM_ELEM 5
+ gpr_atm verifier_called[NUM_ELEM];
+ for (auto i = 0; i < NUM_ELEM; i++) {
+ gpr_atm_rel_store(&verifier_called[i], static_cast<gpr_atm>(0));
+ grpc_core::TracedBuffer::AddNewEntry(
+ &list, i, static_cast<void*>(&verifier_called[i]));
+ }
+ grpc_core::TracedBuffer::Shutdown(&list, GRPC_ERROR_NONE);
+ GPR_ASSERT(list == nullptr);
+ for (auto i = 0; i < NUM_ELEM; i++) {
+ GPR_ASSERT(gpr_atm_acq_load(&verifier_called[i]) ==
+ static_cast<gpr_atm>(1));
+ }
+}
+
+static void TestVerifierCalledOnAckVerifier(void* arg,
+ grpc_core::Timestamps* ts,
+ grpc_error* error) {
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ GPR_ASSERT(arg != nullptr);
+ GPR_ASSERT(ts->acked_time.clock_type == GPR_CLOCK_REALTIME);
+ GPR_ASSERT(ts->acked_time.tv_sec == 123);
+ GPR_ASSERT(ts->acked_time.tv_nsec == 456);
+ gpr_atm* done = reinterpret_cast<gpr_atm*>(arg);
+ gpr_atm_rel_store(done, static_cast<gpr_atm>(1));
+}
+
+/** Tests that the timestamp verifier is called on an ACK timestamp.
+ */
+static void TestVerifierCalledOnAck() {
+ struct sock_extended_err serr;
+ serr.ee_data = 213;
+ serr.ee_info = grpc_core::SCM_TSTAMP_ACK;
+ struct grpc_core::scm_timestamping tss;
+ tss.ts[0].tv_sec = 123;
+ tss.ts[0].tv_nsec = 456;
+ grpc_core::grpc_tcp_set_write_timestamps_callback(
+ TestVerifierCalledOnAckVerifier);
+ grpc_core::TracedBuffer* list = nullptr;
+ gpr_atm verifier_called;
+ gpr_atm_rel_store(&verifier_called, static_cast<gpr_atm>(0));
+ grpc_core::TracedBuffer::AddNewEntry(&list, 213, &verifier_called);
+ grpc_core::TracedBuffer::ProcessTimestamp(&list, &serr, &tss);
+ GPR_ASSERT(gpr_atm_acq_load(&verifier_called) == static_cast<gpr_atm>(1));
+ GPR_ASSERT(list == nullptr);
+ grpc_core::TracedBuffer::Shutdown(&list, GRPC_ERROR_NONE);
+}
+
+static void TestTcpBufferList() {
+ TestVerifierCalledOnAck();
+ TestShutdownFlushesList();
+}
+
+int main(int argc, char** argv) {
+ grpc_test_init(argc, argv);
+ grpc_init();
+ TestTcpBufferList();
+ grpc_shutdown();
+ return 0;
+}
+
+#else /* GRPC_LINUX_ERRQUEUE */
+
+int main(int argc, char** argv) { return 0; }
+
+#endif /* GRPC_LINUX_ERRQUEUE */
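For orientation, the TracedBuffer lifecycle the new test drives, condensed from the code above (every call appears verbatim in the test):

    grpc_core::TracedBuffer* list = nullptr;
    gpr_atm done;
    gpr_atm_rel_store(&done, static_cast<gpr_atm>(0));
    // 1. Register the verifier invoked once timestamps (or shutdown) arrive.
    grpc_core::grpc_tcp_set_write_timestamps_callback(
        TestShutdownFlushesListVerifier);
    // 2. Track an outgoing write by sequence number, with an opaque arg.
    grpc_core::TracedBuffer::AddNewEntry(&list, 1, &done);
    // 3. ProcessTimestamp() matches a kernel ACK to an entry; Shutdown()
    //    flushes every entry still pending with the given error.
    grpc_core::TracedBuffer::Shutdown(&list, GRPC_ERROR_NONE);
    GPR_ASSERT(gpr_atm_acq_load(&done) == static_cast<gpr_atm>(1));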
diff --git a/test/core/iomgr/endpoint_tests.cc b/test/core/iomgr/endpoint_tests.cc
index 8db8ac5ed6..a9e8ba86c5 100644
--- a/test/core/iomgr/endpoint_tests.cc
+++ b/test/core/iomgr/endpoint_tests.cc
@@ -150,8 +150,8 @@ static void read_and_write_test_write_handler(void* data, grpc_error* error) {
&state->current_write_data);
grpc_slice_buffer_reset_and_unref(&state->outgoing);
grpc_slice_buffer_addn(&state->outgoing, slices, nslices);
- grpc_endpoint_write(state->write_ep, &state->outgoing,
- &state->done_write);
+ grpc_endpoint_write(state->write_ep, &state->outgoing, &state->done_write,
+ nullptr);
gpr_free(slices);
return;
}
@@ -294,7 +294,8 @@ static void multiple_shutdown_test(grpc_endpoint_test_config config) {
grpc_slice_buffer_add(&slice_buffer, grpc_slice_from_copied_string("a"));
grpc_endpoint_write(f.client_ep, &slice_buffer,
GRPC_CLOSURE_CREATE(inc_on_failure, &fail_count,
- grpc_schedule_on_exec_ctx));
+ grpc_schedule_on_exec_ctx),
+ nullptr);
wait_for_fail_count(&fail_count, 3);
grpc_endpoint_shutdown(f.client_ep,
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Test Shutdown"));
diff --git a/test/core/iomgr/ios/CFStreamTests/Podfile b/test/core/iomgr/ios/CFStreamTests/Podfile
index 630168a363..e6ec66d549 100644
--- a/test/core/iomgr/ios/CFStreamTests/Podfile
+++ b/test/core/iomgr/ios/CFStreamTests/Podfile
@@ -9,6 +9,7 @@ GRPC_LOCAL_SRC = '../../../../..'
# Install the dependencies in the main target plus all test targets.
target 'CFStreamTests' do
pod 'gRPC-Core/CFStream-Implementation', :path => GRPC_LOCAL_SRC
+ pod 'BoringSSL-GRPC', :podspec => "#{GRPC_LOCAL_SRC}/src/objective-c", :inhibit_warnings => true
end
pre_install do |installer|
diff --git a/test/core/iomgr/resource_quota_test.cc b/test/core/iomgr/resource_quota_test.cc
index 059ff7b5f8..f3b35fed32 100644
--- a/test/core/iomgr/resource_quota_test.cc
+++ b/test/core/iomgr/resource_quota_test.cc
@@ -798,6 +798,98 @@ static void test_negative_rq_free_pool(void) {
}
}
+// Simple test to check resource quota thread limits
+static void test_thread_limit() {
+ grpc_core::ExecCtx exec_ctx;
+
+ grpc_resource_quota* rq = grpc_resource_quota_create("test_thread_limit");
+ grpc_resource_user* ru1 = grpc_resource_user_create(rq, "ru1");
+ grpc_resource_user* ru2 = grpc_resource_user_create(rq, "ru2");
+
+ // Max threads = 100
+ grpc_resource_quota_set_max_threads(rq, 100);
+
+ // Request quota for 100 threads (50 for ru1, 50 for ru2)
+ GPR_ASSERT(grpc_resource_user_allocate_threads(ru1, 10));
+ GPR_ASSERT(grpc_resource_user_allocate_threads(ru2, 10));
+ GPR_ASSERT(grpc_resource_user_allocate_threads(ru1, 40));
+ GPR_ASSERT(grpc_resource_user_allocate_threads(ru2, 40));
+
+ // Threads exhausted. Next request must fail
+ GPR_ASSERT(!grpc_resource_user_allocate_threads(ru2, 20));
+
+ // Free 20 threads from two different users
+ grpc_resource_user_free_threads(ru1, 10);
+ grpc_resource_user_free_threads(ru2, 10);
+
+ // Next request to 20 threads must succeed
+ GPR_ASSERT(grpc_resource_user_allocate_threads(ru2, 20));
+
+ // No more thread quota again
+ GPR_ASSERT(!grpc_resource_user_allocate_threads(ru1, 20));
+
+ // Free 10 more
+ grpc_resource_user_free_threads(ru1, 10);
+
+ GPR_ASSERT(grpc_resource_user_allocate_threads(ru1, 5));
+ GPR_ASSERT(
+ !grpc_resource_user_allocate_threads(ru2, 10)); // Only 5 available
+ GPR_ASSERT(grpc_resource_user_allocate_threads(ru2, 5));
+
+ // Teardown (ru1 and ru2 release all the quota back to rq)
+ grpc_resource_user_unref(ru1);
+ grpc_resource_user_unref(ru2);
+ grpc_resource_quota_unref(rq);
+}
+
+// Change max quota in either direction dynamically
+static void test_thread_maxquota_change() {
+ grpc_core::ExecCtx exec_ctx;
+
+ grpc_resource_quota* rq =
+ grpc_resource_quota_create("test_thread_maxquota_change");
+ grpc_resource_user* ru1 = grpc_resource_user_create(rq, "ru1");
+ grpc_resource_user* ru2 = grpc_resource_user_create(rq, "ru2");
+
+ // Max threads = 100
+ grpc_resource_quota_set_max_threads(rq, 100);
+
+ // Request quota for 100 threads (50 for ru1, 50 for ru2)
+ GPR_ASSERT(grpc_resource_user_allocate_threads(ru1, 50));
+ GPR_ASSERT(grpc_resource_user_allocate_threads(ru2, 50));
+
+ // Threads exhausted. Next request must fail
+ GPR_ASSERT(!grpc_resource_user_allocate_threads(ru2, 20));
+
+ // Increase maxquota and retry
+ // Max threads = 150;
+ grpc_resource_quota_set_max_threads(rq, 150);
+ GPR_ASSERT(grpc_resource_user_allocate_threads(ru2, 20)); // ru2=70, ru1=50
+
+ // Decrease maxquota (Note: Quota already given to ru1 and ru2 is unaffected)
+ // Max threads = 10;
+ grpc_resource_quota_set_max_threads(rq, 10);
+
+ // New requests will fail until quota is available
+ GPR_ASSERT(!grpc_resource_user_allocate_threads(ru1, 10));
+
+ // Make quota available
+ grpc_resource_user_free_threads(ru1, 50); // ru1 now has 0
+ GPR_ASSERT(!grpc_resource_user_allocate_threads(ru1, 10)); // not enough
+
+ grpc_resource_user_free_threads(ru2, 70); // ru2 now has 0
+
+ // Now we can get quota up-to 10, the current max
+ GPR_ASSERT(grpc_resource_user_allocate_threads(ru2, 10));
+ // No more thread quota again
+ GPR_ASSERT(!grpc_resource_user_allocate_threads(ru1, 10));
+
+ // Teardown (ru1 and ru2 release all the quota back to rq)
+ grpc_resource_user_unref(ru1);
+ grpc_resource_user_unref(ru2);
+ grpc_resource_quota_unref(rq);
+}
+
int main(int argc, char** argv) {
grpc_test_init(argc, argv);
grpc_init();
@@ -827,6 +919,11 @@ int main(int argc, char** argv) {
test_negative_rq_free_pool();
gpr_mu_destroy(&g_mu);
gpr_cv_destroy(&g_cv);
+
+ // Resource quota thread related
+ test_thread_limit();
+ test_thread_maxquota_change();
+
grpc_shutdown();
return 0;
}
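Taken together, the two new tests pin down the thread-quota semantics: allocations are all-or-nothing against the remaining budget, lowering the maximum never revokes quota already granted, and frees return capacity immediately. A minimal usage sketch condensed from the tests above:

    grpc_core::ExecCtx exec_ctx;
    grpc_resource_quota* rq = grpc_resource_quota_create("sketch");
    grpc_resource_user* ru = grpc_resource_user_create(rq, "ru");
    grpc_resource_quota_set_max_threads(rq, 10);
    GPR_ASSERT(grpc_resource_user_allocate_threads(ru, 10));  // budget: 0
    GPR_ASSERT(!grpc_resource_user_allocate_threads(ru, 1));  // rejected whole
    grpc_resource_user_free_threads(ru, 5);                   // budget: 5
    GPR_ASSERT(grpc_resource_user_allocate_threads(ru, 5));
    grpc_resource_user_unref(ru);
    grpc_resource_quota_unref(rq);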
diff --git a/test/core/iomgr/tcp_posix_test.cc b/test/core/iomgr/tcp_posix_test.cc
index 3e87831e44..6447cc234d 100644
--- a/test/core/iomgr/tcp_posix_test.cc
+++ b/test/core/iomgr/tcp_posix_test.cc
@@ -36,6 +36,9 @@
#include <grpc/support/time.h>
#include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/iomgr/buffer_list.h"
+#include "src/core/lib/iomgr/ev_posix.h"
+#include "src/core/lib/iomgr/sockaddr_posix.h"
#include "src/core/lib/slice/slice_internal.h"
#include "test/core/iomgr/endpoint_tests.h"
#include "test/core/util/test_config.h"
@@ -68,6 +71,43 @@ static void create_sockets(int sv[2]) {
GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
}
+static void create_inet_sockets(int sv[2]) {
+ /* Prepare listening socket */
+ struct sockaddr_in addr;
+ memset(&addr, 0, sizeof(struct sockaddr_in));
+ addr.sin_family = AF_INET;
+ int sock = socket(AF_INET, SOCK_STREAM, 0);
+ GPR_ASSERT(sock);
+ GPR_ASSERT(bind(sock, (sockaddr*)&addr, sizeof(sockaddr_in)) == 0);
+ listen(sock, 1);
+
+ /* Prepare client socket and connect to server */
+ socklen_t len = sizeof(sockaddr_in);
+ GPR_ASSERT(getsockname(sock, (sockaddr*)&addr, &len) == 0);
+
+ int client = socket(AF_INET, SOCK_STREAM, 0);
+ GPR_ASSERT(client >= 0);
+ int ret;
+ do {
+ ret = connect(client, (sockaddr*)&addr, sizeof(sockaddr_in));
+ } while (ret == -1 && errno == EINTR);
+ GPR_ASSERT(ret == 0);
+
+ /* Accept client connection */
+ len = sizeof(addr);
+ int server;
+ do {
+ server = accept(sock, (sockaddr*)&addr, &len);
+ } while (server == -1 && errno == EINTR);
+ GPR_ASSERT(server != -1);
+
+ sv[0] = server;
+ sv[1] = client;
+ int flags = fcntl(sv[0], F_GETFL, 0);
+ GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
+ flags = fcntl(sv[1], F_GETFL, 0);
+ GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
+}
+
static ssize_t fill_socket(int fd) {
ssize_t write_bytes;
ssize_t total_bytes = 0;
@@ -289,11 +329,10 @@ static grpc_slice* allocate_blocks(size_t num_bytes, size_t slice_size,
static void write_done(void* user_data /* write_socket_state */,
grpc_error* error) {
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
struct write_socket_state* state =
static_cast<struct write_socket_state*>(user_data);
- gpr_log(GPR_INFO, "Write done callback called");
gpr_mu_lock(g_mu);
- gpr_log(GPR_INFO, "Signalling write done");
state->write_done = 1;
GPR_ASSERT(
GRPC_LOG_IF_ERROR("pollset_kick", grpc_pollset_kick(g_pollset, nullptr)));
@@ -340,10 +379,24 @@ void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) {
gpr_free(buf);
}
+/* Verifier callback for the timestamps collected by write_test */
+void timestamps_verifier(void* arg, grpc_core::Timestamps* ts,
+ grpc_error* error) {
+ GPR_ASSERT(error == GRPC_ERROR_NONE);
+ GPR_ASSERT(arg != nullptr);
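+ /* The polling engine reports kernel TCP timestamps (SO_TIMESTAMPING)
+ against the realtime clock, so each collected timestamp should carry
+ GPR_CLOCK_REALTIME. */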
+ GPR_ASSERT(ts->sendmsg_time.clock_type == GPR_CLOCK_REALTIME);
+ GPR_ASSERT(ts->scheduled_time.clock_type == GPR_CLOCK_REALTIME);
+ GPR_ASSERT(ts->acked_time.clock_type == GPR_CLOCK_REALTIME);
+ gpr_atm* done_timestamps = (gpr_atm*)arg;
+ gpr_atm_rel_store(done_timestamps, static_cast<gpr_atm>(1));
+}
+
/* Write to a socket using the grpc_tcp API, then drain it directly.
Note that if the write does not complete immediately we need to drain the
- socket in parallel with the read. */
-static void write_test(size_t num_bytes, size_t slice_size) {
+ socket in parallel with the read. If collect_timestamps is true, the test
+ also tries to collect timestamps for the write. */
+static void write_test(size_t num_bytes, size_t slice_size,
+ bool collect_timestamps) {
int sv[2];
grpc_endpoint* ep;
struct write_socket_state state;
@@ -356,19 +409,27 @@ static void write_test(size_t num_bytes, size_t slice_size) {
grpc_timespec_to_millis_round_up(grpc_timeout_seconds_to_deadline(20));
grpc_core::ExecCtx exec_ctx;
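+ /* Timestamp collection relies on the polling engine's error-tracking
+ path; skip the timestamped variant when that path is unavailable. */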
+ if (collect_timestamps && !grpc_event_engine_can_track_errors()) {
+ return;
+ }
+
gpr_log(GPR_INFO,
"Start write test with %" PRIuPTR " bytes, slice size %" PRIuPTR,
num_bytes, slice_size);
- create_sockets(sv);
+ if (collect_timestamps) {
+ create_inet_sockets(sv);
+ } else {
+ create_sockets(sv);
+ }
grpc_arg a[1];
a[0].key = const_cast<char*>(GRPC_ARG_TCP_READ_CHUNK_SIZE);
a[0].type = GRPC_ARG_INTEGER,
a[0].value.integer = static_cast<int>(slice_size);
grpc_channel_args args = {GPR_ARRAY_SIZE(a), a};
- ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test", false), &args,
- "test");
+ ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test", collect_timestamps),
+ &args, "test");
grpc_endpoint_add_to_pollset(ep, g_pollset);
state.ep = ep;
@@ -381,18 +442,26 @@ static void write_test(size_t num_bytes, size_t slice_size) {
GRPC_CLOSURE_INIT(&write_done_closure, write_done, &state,
grpc_schedule_on_exec_ctx);
- grpc_endpoint_write(ep, &outgoing, &write_done_closure);
+ gpr_atm done_timestamps;
+ gpr_atm_rel_store(&done_timestamps, static_cast<gpr_atm>(0));
+ grpc_endpoint_write(ep, &outgoing, &write_done_closure,
+ grpc_event_engine_can_track_errors() && collect_timestamps
+ ? (void*)&done_timestamps
+ : nullptr);
drain_socket_blocking(sv[0], num_bytes, num_bytes);
+ exec_ctx.Flush();
gpr_mu_lock(g_mu);
for (;;) {
grpc_pollset_worker* worker = nullptr;
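+ // Exit only once the write has completed and, when collecting
+ // timestamps, the timestamps verifier has also run.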
- if (state.write_done) {
+ if (state.write_done &&
+ (!(grpc_event_engine_can_track_errors() && collect_timestamps) ||
+ gpr_atm_acq_load(&done_timestamps) == static_cast<gpr_atm>(1))) {
break;
}
GPR_ASSERT(GRPC_LOG_IF_ERROR(
"pollset_work", grpc_pollset_work(g_pollset, &worker, deadline)));
gpr_mu_unlock(g_mu);
-
+ exec_ctx.Flush();
gpr_mu_lock(g_mu);
}
gpr_mu_unlock(g_mu);
@@ -497,14 +566,21 @@ void run_tests(void) {
large_read_test(8192);
large_read_test(1);
- write_test(100, 8192);
- write_test(100, 1);
- write_test(100000, 8192);
- write_test(100000, 1);
- write_test(100000, 137);
+ write_test(100, 8192, false);
+ write_test(100, 1, false);
+ write_test(100000, 8192, false);
+ write_test(100000, 1, false);
+ write_test(100000, 137, false);
+
+ write_test(100, 8192, true);
+ write_test(100, 1, true);
+ write_test(100000, 8192, true);
+ write_test(100000, 1, true);
+ write_test(100, 137, true);
for (i = 1; i < 1000; i = GPR_MAX(i + 1, i * 5 / 4)) {
- write_test(40320, i);
+ write_test(40320, i, false);
+ write_test(40320, i, true);
}
release_fd_test(100, 8192);
@@ -549,6 +625,7 @@ int main(int argc, char** argv) {
grpc_closure destroyed;
grpc_test_init(argc, argv);
grpc_init();
+ grpc_core::grpc_tcp_set_write_timestamps_callback(timestamps_verifier);
{
grpc_core::ExecCtx exec_ctx;
g_pollset = static_cast<grpc_pollset*>(gpr_zalloc(grpc_pollset_size()));
diff --git a/test/core/iomgr/timer_list_test.cc b/test/core/iomgr/timer_list_test.cc
index b1d919b292..feedf3f149 100644
--- a/test/core/iomgr/timer_list_test.cc
+++ b/test/core/iomgr/timer_list_test.cc
@@ -38,6 +38,8 @@ extern grpc_core::TraceFlag grpc_timer_trace;
extern grpc_core::TraceFlag grpc_timer_check_trace;
static int cb_called[MAX_CB][2];
+static const int64_t kMillisIn25Days = 2160000000;
+static const int64_t kHoursIn25Days = 600;
static void cb(void* arg, grpc_error* error) {
cb_called[(intptr_t)arg][error == GRPC_ERROR_NONE]++;
@@ -151,17 +153,112 @@ void destruction_test(void) {
GPR_ASSERT(1 == cb_called[2][0]);
}
-int main(int argc, char** argv) {
- grpc_test_init(argc, argv);
- grpc_core::ExecCtx::GlobalInit();
+/* Cleans up a timer list that still contains pending timers, simulating a
+ long-running service. This test does the following:
+ 1) Moves the simulated grpc server start time 25 days into the past
+ (completed in `main` using TestOnlyGlobalInit())
+ 2) Creates 4 timers - one with a deadline 25 days in the future, one just
+ 3 milliseconds in the future, one far in the future, and one whose
+ deadline of 25 days in the future is computed via
+ grpc_timespec_to_millis_round_up
+ 3) Simulates 4 milliseconds of elapsed time by advancing the cached `now`
+ to `now + 4`
+ 4) Shuts down the timer list
+ https://github.com/grpc/grpc/issues/15904 */
+void long_running_service_cleanup_test(void) {
+ grpc_timer timers[4];
grpc_core::ExecCtx exec_ctx;
- grpc_determine_iomgr_platform();
- grpc_iomgr_platform_init();
- gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
- add_test();
- destruction_test();
- grpc_iomgr_platform_shutdown();
+
+ gpr_log(GPR_INFO, "long_running_service_cleanup_test");
+
+ grpc_millis now = grpc_core::ExecCtx::Get()->Now();
+ GPR_ASSERT(now >= kMillisIn25Days);
+ grpc_timer_list_init();
+ grpc_core::testing::grpc_tracer_enable_flag(&grpc_timer_trace);
+ grpc_core::testing::grpc_tracer_enable_flag(&grpc_timer_check_trace);
+ memset(cb_called, 0, sizeof(cb_called));
+
+ grpc_timer_init(
+ &timers[0], now + kMillisIn25Days,
+ GRPC_CLOSURE_CREATE(cb, (void*)(intptr_t)0, grpc_schedule_on_exec_ctx));
+ grpc_timer_init(
+ &timers[1], now + 3,
+ GRPC_CLOSURE_CREATE(cb, (void*)(intptr_t)1, grpc_schedule_on_exec_ctx));
+ grpc_timer_init(
+ &timers[2], GRPC_MILLIS_INF_FUTURE - 1,
+ GRPC_CLOSURE_CREATE(cb, (void*)(intptr_t)2, grpc_schedule_on_exec_ctx));
+
+ gpr_timespec deadline_spec = grpc_millis_to_timespec(
+ now + kMillisIn25Days, gpr_clock_type::GPR_CLOCK_MONOTONIC);
+
+ /* grpc_timespec_to_millis_round_up is how users usually compute the
+ millisecond deadline passed to grpc_timer_init, so we mimic that here */
+ grpc_timer_init(
+ &timers[3], grpc_timespec_to_millis_round_up(deadline_spec),
+ GRPC_CLOSURE_CREATE(cb, (void*)(intptr_t)3, grpc_schedule_on_exec_ctx));
+
+ grpc_core::ExecCtx::Get()->TestOnlySetNow(now + 4);
+ GPR_ASSERT(grpc_timer_check(nullptr) == GRPC_TIMERS_FIRED);
+ grpc_core::ExecCtx::Get()->Flush();
+ GPR_ASSERT(0 == cb_called[0][0]); // Timer 0 not called
+ GPR_ASSERT(0 == cb_called[0][1]);
+ GPR_ASSERT(0 == cb_called[1][0]);
+ GPR_ASSERT(1 == cb_called[1][1]); // Timer 1 fired
+ GPR_ASSERT(0 == cb_called[2][0]); // Timer 2 not called
+ GPR_ASSERT(0 == cb_called[2][1]);
+ GPR_ASSERT(0 == cb_called[3][0]); // Timer 3 not called
+ GPR_ASSERT(0 == cb_called[3][1]);
+
+ grpc_timer_list_shutdown();
+ grpc_core::ExecCtx::Get()->Flush();
+ /* Timers 0, 2, and 3 were fired with an error during cleanup */
+ GPR_ASSERT(1 == cb_called[0][0]);
+ GPR_ASSERT(0 == cb_called[1][0]);
+ GPR_ASSERT(1 == cb_called[2][0]);
+ GPR_ASSERT(1 == cb_called[3][0]);
+}
+
+int main(int argc, char** argv) {
+ /* Tests with default g_start_time */
+ {
+ grpc_test_init(argc, argv);
+ grpc_core::ExecCtx::GlobalInit();
+ grpc_core::ExecCtx exec_ctx;
+ grpc_determine_iomgr_platform();
+ grpc_iomgr_platform_init();
+ gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
+ add_test();
+ destruction_test();
+ grpc_iomgr_platform_shutdown();
+ }
grpc_core::ExecCtx::GlobalShutdown();
+
+ /* Begin long running service tests */
+ {
+ grpc_test_init(argc, argv);
+ /* Set g_start_time back 25 days. */
+ /* We set g_start_time here in case there are any initialization
+ dependencies that use g_start_time. */
+ gpr_timespec new_start =
+ gpr_time_sub(gpr_now(gpr_clock_type::GPR_CLOCK_MONOTONIC),
+ gpr_time_from_hours(kHoursIn25Days,
+ gpr_clock_type::GPR_CLOCK_MONOTONIC));
+ grpc_core::ExecCtx::TestOnlyGlobalInit(new_start);
+ grpc_core::ExecCtx exec_ctx;
+ grpc_determine_iomgr_platform();
+ grpc_iomgr_platform_init();
+ gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
+#ifndef GPR_WINDOWS
+ /* Skip this test on Windows until we figure out why it fails */
+ /* https://github.com/grpc/grpc/issues/16417 */
+ long_running_service_cleanup_test();
+#endif // !GPR_WINDOWS
+ add_test();
+ destruction_test();
+ grpc_iomgr_platform_shutdown();
+ }
+ grpc_core::ExecCtx::GlobalShutdown();
+
return 0;
}
diff --git a/test/core/security/credentials_test.cc b/test/core/security/credentials_test.cc
index 8a793e4bb2..97156761bd 100644
--- a/test/core/security/credentials_test.cc
+++ b/test/core/security/credentials_test.cc
@@ -610,7 +610,7 @@ static void test_compute_engine_creds_failure(void) {
grpc_core::ExecCtx exec_ctx;
request_metadata_state* state = make_request_metadata_state(
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "Error occured when fetching oauth2 token."),
+ "Error occurred when fetching oauth2 token."),
nullptr, 0);
grpc_auth_metadata_context auth_md_ctx = {test_service_url, test_method,
nullptr, nullptr};
@@ -699,7 +699,7 @@ static void test_refresh_token_creds_failure(void) {
grpc_core::ExecCtx exec_ctx;
request_metadata_state* state = make_request_metadata_state(
GRPC_ERROR_CREATE_FROM_STATIC_STRING(
- "Error occured when fetching oauth2 token."),
+ "Error occurred when fetching oauth2 token."),
nullptr, 0);
grpc_auth_metadata_context auth_md_ctx = {test_service_url, test_method,
nullptr, nullptr};
diff --git a/test/core/surface/completion_queue_test.cc b/test/core/surface/completion_queue_test.cc
index 68129146cc..b889fd0fc6 100644
--- a/test/core/surface/completion_queue_test.cc
+++ b/test/core/surface/completion_queue_test.cc
@@ -22,6 +22,7 @@
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/core/lib/gpr/useful.h"
+#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "test/core/util/test_config.h"
@@ -41,11 +42,18 @@ static void shutdown_and_destroy(grpc_completion_queue* cc) {
case GRPC_CQ_NEXT: {
ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME),
nullptr);
+ GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
break;
}
case GRPC_CQ_PLUCK: {
ev = grpc_completion_queue_pluck(
cc, create_test_tag(), gpr_inf_past(GPR_CLOCK_REALTIME), nullptr);
+ GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
+ break;
+ }
+ case GRPC_CQ_CALLBACK: {
+ // Nothing to do here. The shutdown callback will be invoked when
+ // possible.
break;
}
default: {
@@ -54,7 +62,6 @@ static void shutdown_and_destroy(grpc_completion_queue* cc) {
}
}
- GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
grpc_completion_queue_destroy(cc);
}
@@ -350,6 +357,76 @@ static void test_pluck_after_shutdown(void) {
}
}
+static void test_callback(void) {
+ grpc_completion_queue* cc;
+ void* tags[128];
+ grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
+ grpc_cq_polling_type polling_types[] = {
+ GRPC_CQ_DEFAULT_POLLING, GRPC_CQ_NON_LISTENING, GRPC_CQ_NON_POLLING};
+ grpc_completion_queue_attributes attr;
+ unsigned i;
+
+ LOG_TEST("test_callback");
+
+ bool got_shutdown = false;
+ class ShutdownCallback : public grpc_core::CQCallbackInterface {
+ public:
+ ShutdownCallback(bool* done) : done_(done) {}
+ ~ShutdownCallback() {}
+ void Run(bool ok) override { *done_ = ok; }
+
+ private:
+ bool* done_;
+ };
+ ShutdownCallback shutdown_cb(&got_shutdown);
+
+ attr.version = 2;
+ attr.cq_completion_type = GRPC_CQ_CALLBACK;
+ attr.cq_shutdown_cb = &shutdown_cb;
+
+ for (size_t pidx = 0; pidx < GPR_ARRAY_SIZE(polling_types); pidx++) {
+ grpc_core::ExecCtx exec_ctx; // reset exec_ctx
+ attr.cq_polling_type = polling_types[pidx];
+ cc = grpc_completion_queue_create(
+ grpc_completion_queue_factory_lookup(&attr), &attr, nullptr);
+
+ int counter = 0;
+ class TagCallback : public grpc_core::CQCallbackInterface {
+ public:
+ TagCallback(int* counter, int tag) : counter_(counter), tag_(tag) {}
+ ~TagCallback() {}
+ void Run(bool ok) override {
+ GPR_ASSERT(ok);
+ *counter_ += tag_;
+ grpc_core::Delete(this);
+ }
+
+ private:
+ int* counter_;
+ int tag_;
+ };
+
+ int sumtags = 0;
+ for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
+ tags[i] = static_cast<void*>(grpc_core::New<TagCallback>(&counter, i));
+ sumtags += i;
+ }
+
+ for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
+ GPR_ASSERT(grpc_cq_begin_op(cc, tags[i]));
+ grpc_cq_end_op(cc, tags[i], GRPC_ERROR_NONE, do_nothing_end_completion,
+ nullptr, &completions[i]);
+ }
+
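+ // With a callback CQ there is nothing to poll: each TagCallback::Run is
+ // expected to have executed by the time grpc_cq_end_op returns, so every
+ // tag has already been summed into counter.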
+ GPR_ASSERT(sumtags == counter);
+
+ shutdown_and_destroy(cc);
+
+ GPR_ASSERT(got_shutdown);
+ got_shutdown = false;
+ }
+}
+
struct thread_state {
grpc_completion_queue* cc;
void* tag;
@@ -368,6 +445,7 @@ int main(int argc, char** argv) {
test_pluck_after_shutdown();
test_cq_tls_cache_full();
test_cq_tls_cache_empty();
+ test_callback();
grpc_shutdown();
return 0;
}
diff --git a/test/core/surface/public_headers_must_be_c89.c b/test/core/surface/public_headers_must_be_c89.c
index 429497df82..a4d2a5bf62 100644
--- a/test/core/surface/public_headers_must_be_c89.c
+++ b/test/core/surface/public_headers_must_be_c89.c
@@ -77,11 +77,13 @@ int main(int argc, char **argv) {
printf("%lx", (unsigned long) grpc_register_plugin);
printf("%lx", (unsigned long) grpc_init);
printf("%lx", (unsigned long) grpc_shutdown);
+ printf("%lx", (unsigned long) grpc_is_initialized);
printf("%lx", (unsigned long) grpc_version_string);
printf("%lx", (unsigned long) grpc_g_stands_for);
printf("%lx", (unsigned long) grpc_completion_queue_factory_lookup);
printf("%lx", (unsigned long) grpc_completion_queue_create_for_next);
printf("%lx", (unsigned long) grpc_completion_queue_create_for_pluck);
+ printf("%lx", (unsigned long) grpc_completion_queue_create_for_callback);
printf("%lx", (unsigned long) grpc_completion_queue_create);
printf("%lx", (unsigned long) grpc_completion_queue_next);
printf("%lx", (unsigned long) grpc_completion_queue_pluck);
@@ -131,6 +133,7 @@ int main(int argc, char **argv) {
printf("%lx", (unsigned long) grpc_resource_quota_ref);
printf("%lx", (unsigned long) grpc_resource_quota_unref);
printf("%lx", (unsigned long) grpc_resource_quota_resize);
+ printf("%lx", (unsigned long) grpc_resource_quota_set_max_threads);
printf("%lx", (unsigned long) grpc_resource_quota_arg_vtable);
printf("%lx", (unsigned long) grpc_channelz_get_top_channels);
printf("%lx", (unsigned long) grpc_channelz_get_channel);
diff --git a/test/core/tsi/ssl_transport_security_test.cc b/test/core/tsi/ssl_transport_security_test.cc
index b477904d60..baffad6ea3 100644
--- a/test/core/tsi/ssl_transport_security_test.cc
+++ b/test/core/tsi/ssl_transport_security_test.cc
@@ -208,9 +208,11 @@ static void check_session_reusage(ssl_tsi_test_fixture* ssl_fixture,
tsi_peer_get_property_by_name(peer, TSI_SSL_SESSION_REUSED_PEER_PROPERTY);
GPR_ASSERT(session_reused != nullptr);
if (ssl_fixture->session_reused) {
- GPR_ASSERT(strcmp(session_reused->value.data, "true") == 0);
+ GPR_ASSERT(strncmp(session_reused->value.data, "true",
+ session_reused->value.length) == 0);
} else {
- GPR_ASSERT(strcmp(session_reused->value.data, "false") == 0);
+ GPR_ASSERT(strncmp(session_reused->value.data, "false",
+ session_reused->value.length) == 0);
}
}
diff --git a/test/core/util/mock_endpoint.cc b/test/core/util/mock_endpoint.cc
index 1156cd5fc5..ef6fd62b51 100644
--- a/test/core/util/mock_endpoint.cc
+++ b/test/core/util/mock_endpoint.cc
@@ -55,7 +55,7 @@ static void me_read(grpc_endpoint* ep, grpc_slice_buffer* slices,
}
static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb) {
+ grpc_closure* cb, void* arg) {
mock_endpoint* m = reinterpret_cast<mock_endpoint*>(ep);
for (size_t i = 0; i < slices->count; i++) {
m->on_write(slices->slices[i]);
diff --git a/test/core/util/passthru_endpoint.cc b/test/core/util/passthru_endpoint.cc
index 5958216747..3cc8ad6fe1 100644
--- a/test/core/util/passthru_endpoint.cc
+++ b/test/core/util/passthru_endpoint.cc
@@ -76,7 +76,7 @@ static half* other_half(half* h) {
}
static void me_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb) {
+ grpc_closure* cb, void* arg) {
half* m = other_half(reinterpret_cast<half*>(ep));
gpr_mu_lock(&m->parent->mu);
grpc_error* error = GRPC_ERROR_NONE;
diff --git a/test/core/util/trickle_endpoint.cc b/test/core/util/trickle_endpoint.cc
index f2efb049b4..62ed72a629 100644
--- a/test/core/util/trickle_endpoint.cc
+++ b/test/core/util/trickle_endpoint.cc
@@ -62,7 +62,7 @@ static void maybe_call_write_cb_locked(trickle_endpoint* te) {
}
static void te_write(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb) {
+ grpc_closure* cb, void* arg) {
trickle_endpoint* te = reinterpret_cast<trickle_endpoint*>(ep);
gpr_mu_lock(&te->mu);
GPR_ASSERT(te->write_cb == nullptr);
@@ -186,7 +186,8 @@ size_t grpc_trickle_endpoint_trickle(grpc_endpoint* ep) {
te->last_write = now;
grpc_endpoint_write(
te->wrapped, &te->writing_buffer,
- GRPC_CLOSURE_CREATE(te_finish_write, te, grpc_schedule_on_exec_ctx));
+ GRPC_CLOSURE_CREATE(te_finish_write, te, grpc_schedule_on_exec_ctx),
+ nullptr);
maybe_call_write_cb_locked(te);
}
}
diff --git a/test/cpp/common/channel_filter_test.cc b/test/cpp/common/channel_filter_test.cc
index 9b603ca5b4..7bdd53f9e7 100644
--- a/test/cpp/common/channel_filter_test.cc
+++ b/test/cpp/common/channel_filter_test.cc
@@ -50,8 +50,7 @@ class MyCallData : public CallData {
// C-core, we don't accidentally break the C++ filter API.
TEST(ChannelFilterTest, RegisterChannelFilter) {
grpc::RegisterChannelFilter<MyChannelData, MyCallData>(
- "myfilter", GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_PRIORITY_LOW, true,
- nullptr);
+ "myfilter", GRPC_CLIENT_CHANNEL, INT_MAX, nullptr);
}
// TODO(roth): When we have time, add tests for all methods of the
diff --git a/test/cpp/end2end/client_lb_end2end_test.cc b/test/cpp/end2end/client_lb_end2end_test.cc
index 7fe0da8aae..e5d6132012 100644
--- a/test/cpp/end2end/client_lb_end2end_test.cc
+++ b/test/cpp/end2end/client_lb_end2end_test.cc
@@ -129,12 +129,23 @@ class ClientLbEnd2endTest : public ::testing::Test {
}
}
- void StartServers(size_t num_servers,
- std::vector<int> ports = std::vector<int>()) {
+ void CreateServers(size_t num_servers,
+ std::vector<int> ports = std::vector<int>()) {
+ servers_.clear();
for (size_t i = 0; i < num_servers; ++i) {
int port = 0;
if (ports.size() == num_servers) port = ports[i];
- servers_.emplace_back(new ServerData(server_host_, port));
+ servers_.emplace_back(new ServerData(port));
+ }
+ }
+
+ void StartServer(size_t index) { servers_[index]->Start(server_host_); }
+
+ void StartServers(size_t num_servers,
+ std::vector<int> ports = std::vector<int>()) {
+ CreateServers(num_servers, ports);
+ for (size_t i = 0; i < num_servers; ++i) {
+ StartServer(i);
}
}
@@ -240,20 +251,23 @@ class ClientLbEnd2endTest : public ::testing::Test {
std::unique_ptr<std::thread> thread_;
bool server_ready_ = false;
- explicit ServerData(const grpc::string& server_host, int port = 0) {
+ explicit ServerData(int port = 0) {
port_ = port > 0 ? port : grpc_pick_unused_port_or_die();
+ }
+
+ void Start(const grpc::string& server_host) {
gpr_log(GPR_INFO, "starting server on port %d", port_);
std::mutex mu;
std::unique_lock<std::mutex> lock(mu);
std::condition_variable cond;
thread_.reset(new std::thread(
- std::bind(&ServerData::Start, this, server_host, &mu, &cond)));
+ std::bind(&ServerData::Serve, this, server_host, &mu, &cond)));
cond.wait(lock, [this] { return server_ready_; });
server_ready_ = false;
gpr_log(GPR_INFO, "server startup complete");
}
- void Start(const grpc::string& server_host, std::mutex* mu,
+ void Serve(const grpc::string& server_host, std::mutex* mu,
std::condition_variable* cond) {
std::ostringstream server_address;
server_address << server_host << ":" << port_;
@@ -291,6 +305,17 @@ class ClientLbEnd2endTest : public ::testing::Test {
ResetCounters();
}
+ bool WaitForChannelNotReady(Channel* channel, int timeout_seconds = 5) {
+ const gpr_timespec deadline =
+ grpc_timeout_seconds_to_deadline(timeout_seconds);
+ grpc_connectivity_state state;
+ while ((state = channel->GetState(false /* try_to_connect */)) ==
+ GRPC_CHANNEL_READY) {
+ if (!channel->WaitForStateChange(state, deadline)) return false;
+ }
+ return true;
+ }
+
bool SeenAllServers() {
for (const auto& server : servers_) {
if (server->service_.request_count() == 0) return false;
@@ -353,6 +378,23 @@ TEST_F(ClientLbEnd2endTest, PickFirst) {
EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
}
+TEST_F(ClientLbEnd2endTest, PickFirstProcessPending) {
+ StartServers(1); // Single server
+ auto channel = BuildChannel(""); // test that pick first is the default.
+ auto stub = BuildStub(channel);
+ SetNextResolution({servers_[0]->port_});
+ WaitForServer(stub, 0, DEBUG_LOCATION);
+ // Create a new channel and its corresponding PF LB policy, which will pick
+ // the subchannels in READY state from the previous RPC against the same
+ // target (even if it happened over a different channel, because subchannels
+ // are globally reused). Progress should happen without any transition from
+ // this READY state.
+ auto second_channel = BuildChannel("");
+ auto second_stub = BuildStub(second_channel);
+ SetNextResolution({servers_[0]->port_});
+ CheckRpcSendOk(second_stub, DEBUG_LOCATION);
+}
+
TEST_F(ClientLbEnd2endTest, PickFirstBackOffInitialReconnect) {
ChannelArguments args;
constexpr int kInitialBackOffMs = 100;
@@ -573,6 +615,83 @@ TEST_F(ClientLbEnd2endTest, PickFirstReresolutionNoSelected) {
EXPECT_EQ("pick_first", channel->GetLoadBalancingPolicyName());
}
+TEST_F(ClientLbEnd2endTest, PickFirstReconnectWithoutNewResolverResult) {
+ std::vector<int> ports = {grpc_pick_unused_port_or_die()};
+ StartServers(1, ports);
+ auto channel = BuildChannel("pick_first");
+ auto stub = BuildStub(channel);
+ SetNextResolution(ports);
+ gpr_log(GPR_INFO, "****** INITIAL CONNECTION *******");
+ WaitForServer(stub, 0, DEBUG_LOCATION);
+ gpr_log(GPR_INFO, "****** STOPPING SERVER ******");
+ servers_[0]->Shutdown();
+ EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
+ gpr_log(GPR_INFO, "****** RESTARTING SERVER ******");
+ StartServers(1, ports);
+ WaitForServer(stub, 0, DEBUG_LOCATION);
+}
+
+TEST_F(ClientLbEnd2endTest,
+ PickFirstReconnectWithoutNewResolverResultStartsFromTopOfList) {
+ std::vector<int> ports = {grpc_pick_unused_port_or_die(),
+ grpc_pick_unused_port_or_die()};
+ CreateServers(2, ports);
+ StartServer(1);
+ auto channel = BuildChannel("pick_first");
+ auto stub = BuildStub(channel);
+ SetNextResolution(ports);
+ gpr_log(GPR_INFO, "****** INITIAL CONNECTION *******");
+ WaitForServer(stub, 1, DEBUG_LOCATION);
+ gpr_log(GPR_INFO, "****** STOPPING SERVER ******");
+ servers_[1]->Shutdown();
+ EXPECT_TRUE(WaitForChannelNotReady(channel.get()));
+ gpr_log(GPR_INFO, "****** STARTING BOTH SERVERS ******");
+ StartServers(2, ports);
+ WaitForServer(stub, 0, DEBUG_LOCATION);
+}
+
+TEST_F(ClientLbEnd2endTest, PickFirstCheckStateBeforeStartWatch) {
+ std::vector<int> ports = {grpc_pick_unused_port_or_die()};
+ StartServers(1, ports);
+ auto channel_1 = BuildChannel("pick_first");
+ auto stub_1 = BuildStub(channel_1);
+ SetNextResolution(ports);
+ gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 1 *******");
+ WaitForServer(stub_1, 0, DEBUG_LOCATION);
+ gpr_log(GPR_INFO, "****** CHANNEL 1 CONNECTED *******");
+ servers_[0]->Shutdown();
+ // Channel 1 will receive a re-resolution containing the same server. It will
+ // create a new subchannel and hold a ref to it.
+ StartServers(1, ports);
+ gpr_log(GPR_INFO, "****** SERVER RESTARTED *******");
+ auto channel_2 = BuildChannel("pick_first");
+ auto stub_2 = BuildStub(channel_2);
+ // TODO(juanlishen): This resolution result will only be visible to channel 2
+ // since the response generator is only associated with channel 2 now. We
+ // should change the response generator to be able to deliver updates to
+ // multiple channels at once.
+ SetNextResolution(ports);
+ gpr_log(GPR_INFO, "****** RESOLUTION SET FOR CHANNEL 2 *******");
+ WaitForServer(stub_2, 0, DEBUG_LOCATION, true);
+ gpr_log(GPR_INFO, "****** CHANNEL 2 CONNECTED *******");
+ servers_[0]->Shutdown();
+ // Wait until the disconnection has triggered the connectivity notification.
+ // Otherwise, the subchannel may be picked for the next call but fail soon.
+ EXPECT_TRUE(WaitForChannelNotReady(channel_2.get()));
+ // Channel 2 will also receive a re-resolution containing the same server.
+ // Both channels will ref the same subchannel that failed.
+ StartServers(1, ports);
+ gpr_log(GPR_INFO, "****** SERVER RESTARTED AGAIN *******");
+ gpr_log(GPR_INFO, "****** CHANNEL 2 STARTING A CALL *******");
+ // The first call after the server restart will succeed.
+ CheckRpcSendOk(stub_2, DEBUG_LOCATION);
+ gpr_log(GPR_INFO, "****** CHANNEL 2 FINISHED A CALL *******");
+ // Check LB policy name for the channel.
+ EXPECT_EQ("pick_first", channel_1->GetLoadBalancingPolicyName());
+ // Check LB policy name for the channel.
+ EXPECT_EQ("pick_first", channel_2->GetLoadBalancingPolicyName());
+}
+
TEST_F(ClientLbEnd2endTest, RoundRobin) {
// Start servers and send one RPC per server.
const int kNumServers = 3;
@@ -788,7 +907,7 @@ TEST_F(ClientLbEnd2endTest, RoundRobinReresolve) {
// Kill all servers
gpr_log(GPR_INFO, "****** ABOUT TO KILL SERVERS *******");
for (size_t i = 0; i < servers_.size(); ++i) {
- servers_[i]->Shutdown(false);
+ servers_[i]->Shutdown(true);
}
gpr_log(GPR_INFO, "****** SERVERS KILLED *******");
gpr_log(GPR_INFO, "****** SENDING DOOMED REQUESTS *******");
@@ -849,7 +968,8 @@ TEST_F(ClientLbEnd2endTest, RoundRobinSingleReconnect) {
// No requests have gone to the deceased server.
EXPECT_EQ(pre_death, post_death);
// Bring the first server back up.
- servers_[0].reset(new ServerData(server_host_, ports[0]));
+ servers_[0].reset(new ServerData(ports[0]));
+ StartServer(0);
// Requests should start arriving at the first server either right away (if
// the server managed to start before the RR policy retried the subchannel) or
// after the subchannel retry delay otherwise (RR's subchannel retried before
diff --git a/test/cpp/end2end/filter_end2end_test.cc b/test/cpp/end2end/filter_end2end_test.cc
index a8022823b1..88f8f380c3 100644
--- a/test/cpp/end2end/filter_end2end_test.cc
+++ b/test/cpp/end2end/filter_end2end_test.cc
@@ -323,8 +323,7 @@ TEST_F(FilterEnd2endTest, SimpleBidiStreaming) {
void RegisterFilter() {
grpc::RegisterChannelFilter<ChannelDataImpl, CallDataImpl>(
- "test-filter", GRPC_SERVER_CHANNEL, GRPC_CHANNEL_INIT_PRIORITY_LOW, true,
- nullptr);
+ "test-filter", GRPC_SERVER_CHANNEL, INT_MAX, nullptr);
}
} // namespace
diff --git a/test/cpp/end2end/grpclb_end2end_test.cc b/test/cpp/end2end/grpclb_end2end_test.cc
index 28f9ae6f40..b69b861fcf 100644
--- a/test/cpp/end2end/grpclb_end2end_test.cc
+++ b/test/cpp/end2end/grpclb_end2end_test.cc
@@ -734,6 +734,25 @@ TEST_F(SingleBalancerTest, InitiallyEmptyServerlist) {
EXPECT_EQ(2U, balancer_servers_[0].service_->response_count());
}
+TEST_F(SingleBalancerTest, AllServersUnreachableFailFast) {
+ SetNextResolutionAllBalancers();
+ const size_t kNumUnreachableServers = 5;
+ std::vector<int> ports;
+ for (size_t i = 0; i < kNumUnreachableServers; ++i) {
+ ports.push_back(grpc_pick_unused_port_or_die());
+ }
+ ScheduleResponseForBalancer(
+ 0, BalancerServiceImpl::BuildResponseForBackends(ports, {}), 0);
+ const Status status = SendRpc();
+ // The error shouldn't be DEADLINE_EXCEEDED.
+ EXPECT_EQ(StatusCode::UNAVAILABLE, status.error_code());
+ balancers_[0]->NotifyDoneWithServerlists();
+ // The balancer got a single request.
+ EXPECT_EQ(1U, balancer_servers_[0].service_->request_count());
+ // and sent a single response.
+ EXPECT_EQ(1U, balancer_servers_[0].service_->response_count());
+}
+
TEST_F(SingleBalancerTest, Fallback) {
SetNextResolutionAllBalancers();
const int kFallbackTimeoutMs = 200 * grpc_test_slowdown_factor();
diff --git a/test/cpp/end2end/health_service_end2end_test.cc b/test/cpp/end2end/health_service_end2end_test.cc
index 1c48b9d151..fca65dfc13 100644
--- a/test/cpp/end2end/health_service_end2end_test.cc
+++ b/test/cpp/end2end/health_service_end2end_test.cc
@@ -64,6 +64,29 @@ class HealthCheckServiceImpl : public ::grpc::health::v1::Health::Service {
return Status::OK;
}
+ Status Watch(ServerContext* context, const HealthCheckRequest* request,
+ ::grpc::ServerWriter<HealthCheckResponse>* writer) override {
+ auto last_state = HealthCheckResponse::UNKNOWN;
+ while (!context->IsCancelled()) {
+ {
+ std::lock_guard<std::mutex> lock(mu_);
+ HealthCheckResponse response;
+ auto iter = status_map_.find(request->service());
+ if (iter == status_map_.end()) {
+ response.set_status(response.SERVICE_UNKNOWN);
+ } else {
+ response.set_status(iter->second);
+ }
+ if (response.status() != last_state) {
+ writer->Write(response, ::grpc::WriteOptions());
+ last_state = response.status();
+ }
+ }
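+ // Poll for status changes roughly once a second; fine for a test
+ // double (a real implementation would wait on a condition variable).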
+ gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
+ gpr_time_from_millis(1000, GPR_TIMESPAN)));
+ }
+ return Status::OK;
+ }
+
void SetStatus(const grpc::string& service_name,
HealthCheckResponse::ServingStatus status) {
std::lock_guard<std::mutex> lock(mu_);
@@ -106,14 +129,6 @@ class CustomHealthCheckService : public HealthCheckServiceInterface {
HealthCheckServiceImpl* impl_; // not owned
};
-void LoopCompletionQueue(ServerCompletionQueue* cq) {
- void* tag;
- bool ok;
- while (cq->Next(&tag, &ok)) {
- abort(); // Nothing should come out of the cq.
- }
-}
-
class HealthServiceEnd2endTest : public ::testing::Test {
protected:
HealthServiceEnd2endTest() {}
@@ -218,6 +233,33 @@ class HealthServiceEnd2endTest : public ::testing::Test {
Status(StatusCode::NOT_FOUND, ""));
}
+ void VerifyHealthCheckServiceStreaming() {
+ const grpc::string kServiceName("service_name");
+ HealthCheckServiceInterface* service = server_->GetHealthCheckService();
+ // Start Watch for service.
+ ClientContext context;
+ HealthCheckRequest request;
+ request.set_service(kServiceName);
+ std::unique_ptr<::grpc::ClientReaderInterface<HealthCheckResponse>> reader =
+ hc_stub_->Watch(&context, request);
+ // Initial response will be SERVICE_UNKNOWN.
+ HealthCheckResponse response;
+ EXPECT_TRUE(reader->Read(&response));
+ EXPECT_EQ(response.SERVICE_UNKNOWN, response.status());
+ response.Clear();
+ // Now set service to NOT_SERVING and make sure we get an update.
+ service->SetServingStatus(kServiceName, false);
+ EXPECT_TRUE(reader->Read(&response));
+ EXPECT_EQ(response.NOT_SERVING, response.status());
+ response.Clear();
+ // Now set service to SERVING and make sure we get another update.
+ service->SetServingStatus(kServiceName, true);
+ EXPECT_TRUE(reader->Read(&response));
+ EXPECT_EQ(response.SERVING, response.status());
+ // Finish call.
+ context.TryCancel();
+ }
+
TestServiceImpl echo_test_service_;
HealthCheckServiceImpl health_check_service_impl_;
std::unique_ptr<Health::Stub> hc_stub_;
@@ -245,6 +287,7 @@ TEST_F(HealthServiceEnd2endTest, DefaultHealthService) {
EXPECT_TRUE(DefaultHealthCheckServiceEnabled());
SetUpServer(true, false, false, nullptr);
VerifyHealthCheckService();
+ VerifyHealthCheckServiceStreaming();
// The default service has a size limit of the service name.
const grpc::string kTooLongServiceName(201, 'x');
@@ -252,22 +295,6 @@ TEST_F(HealthServiceEnd2endTest, DefaultHealthService) {
Status(StatusCode::INVALID_ARGUMENT, ""));
}
-// The server has no sync service.
-TEST_F(HealthServiceEnd2endTest, DefaultHealthServiceAsyncOnly) {
- EnableDefaultHealthCheckService(true);
- EXPECT_TRUE(DefaultHealthCheckServiceEnabled());
- SetUpServer(false, true, false, nullptr);
- cq_thread_ = std::thread(LoopCompletionQueue, cq_.get());
-
- HealthCheckServiceInterface* default_service =
- server_->GetHealthCheckService();
- EXPECT_TRUE(default_service == nullptr);
-
- ResetStubs();
-
- SendHealthCheckRpc("", Status(StatusCode::UNIMPLEMENTED, ""));
-}
-
// Provide an empty service to disable the default service.
TEST_F(HealthServiceEnd2endTest, ExplicitlyDisableViaOverride) {
EnableDefaultHealthCheckService(true);
@@ -296,6 +323,7 @@ TEST_F(HealthServiceEnd2endTest, ExplicitlyOverride) {
ResetStubs();
VerifyHealthCheckService();
+ VerifyHealthCheckServiceStreaming();
}
} // namespace
diff --git a/test/cpp/end2end/thread_stress_test.cc b/test/cpp/end2end/thread_stress_test.cc
index ccf8400a87..1a5ed28a2c 100644
--- a/test/cpp/end2end/thread_stress_test.cc
+++ b/test/cpp/end2end/thread_stress_test.cc
@@ -16,6 +16,7 @@
*
*/
+#include <cinttypes>
#include <mutex>
#include <thread>
@@ -24,6 +25,7 @@
#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
+#include <grpcpp/resource_quota.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>
@@ -51,63 +53,13 @@ namespace testing {
class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
public:
- TestServiceImpl() : signal_client_(false) {}
+ TestServiceImpl() {}
Status Echo(ServerContext* context, const EchoRequest* request,
EchoResponse* response) override {
response->set_message(request->message());
return Status::OK;
}
-
- // Unimplemented is left unimplemented to test the returned error.
-
- Status RequestStream(ServerContext* context,
- ServerReader<EchoRequest>* reader,
- EchoResponse* response) override {
- EchoRequest request;
- response->set_message("");
- while (reader->Read(&request)) {
- response->mutable_message()->append(request.message());
- }
- return Status::OK;
- }
-
- // Return 3 messages.
- // TODO(yangg) make it generic by adding a parameter into EchoRequest
- Status ResponseStream(ServerContext* context, const EchoRequest* request,
- ServerWriter<EchoResponse>* writer) override {
- EchoResponse response;
- response.set_message(request->message() + "0");
- writer->Write(response);
- response.set_message(request->message() + "1");
- writer->Write(response);
- response.set_message(request->message() + "2");
- writer->Write(response);
-
- return Status::OK;
- }
-
- Status BidiStream(
- ServerContext* context,
- ServerReaderWriter<EchoResponse, EchoRequest>* stream) override {
- EchoRequest request;
- EchoResponse response;
- while (stream->Read(&request)) {
- gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
- response.set_message(request.message());
- stream->Write(response);
- }
- return Status::OK;
- }
-
- bool signal_client() {
- std::unique_lock<std::mutex> lock(mu_);
- return signal_client_;
- }
-
- private:
- bool signal_client_;
- std::mutex mu_;
};
template <class Service>
@@ -118,6 +70,7 @@ class CommonStressTest {
virtual void SetUp() = 0;
virtual void TearDown() = 0;
virtual void ResetStub() = 0;
+ virtual bool AllowExhaustion() = 0;
grpc::testing::EchoTestService::Stub* GetStub() { return stub_.get(); }
protected:
@@ -146,6 +99,7 @@ class CommonStressTestInsecure : public CommonStressTest<Service> {
CreateChannel(server_address_.str(), InsecureChannelCredentials());
this->stub_ = grpc::testing::EchoTestService::NewStub(channel);
}
+ bool AllowExhaustion() override { return false; }
protected:
void SetUpStart(ServerBuilder* builder, Service* service) override {
@@ -161,7 +115,7 @@ class CommonStressTestInsecure : public CommonStressTest<Service> {
std::ostringstream server_address_;
};
-template <class Service>
+template <class Service, bool allow_resource_exhaustion>
class CommonStressTestInproc : public CommonStressTest<Service> {
public:
void ResetStub() override {
@@ -169,6 +123,7 @@ class CommonStressTestInproc : public CommonStressTest<Service> {
std::shared_ptr<Channel> channel = this->server_->InProcessChannel(args);
this->stub_ = grpc::testing::EchoTestService::NewStub(channel);
}
+ bool AllowExhaustion() override { return allow_resource_exhaustion; }
protected:
void SetUpStart(ServerBuilder* builder, Service* service) override {
@@ -194,6 +149,26 @@ class CommonStressTestSyncServer : public BaseClass {
};
template <class BaseClass>
+class CommonStressTestSyncServerLowThreadCount : public BaseClass {
+ public:
+ void SetUp() override {
+ ServerBuilder builder;
+ ResourceQuota quota;
+ this->SetUpStart(&builder, &service_);
+ quota.SetMaxThreads(4);
+ builder.SetResourceQuota(quota);
+ this->SetUpEnd(&builder);
+ }
+ void TearDown() override {
+ this->TearDownStart();
+ this->TearDownEnd();
+ }
+
+ private:
+ TestServiceImpl service_;
+};
+
+template <class BaseClass>
class CommonStressTestAsyncServer : public BaseClass {
public:
CommonStressTestAsyncServer() : contexts_(kNumAsyncServerThreads * 100) {}
@@ -293,7 +268,8 @@ class End2endTest : public ::testing::Test {
Common common_;
};
-static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) {
+static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs,
+ bool allow_exhaustion, gpr_atm* errors) {
EchoRequest request;
EchoResponse response;
request.set_message("Hello");
@@ -301,34 +277,53 @@ static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs) {
for (int i = 0; i < num_rpcs; ++i) {
ClientContext context;
Status s = stub->Echo(&context, request, &response);
- EXPECT_EQ(response.message(), request.message());
+ EXPECT_TRUE(s.ok() || (allow_exhaustion &&
+ s.error_code() == StatusCode::RESOURCE_EXHAUSTED));
if (!s.ok()) {
- gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(),
- s.error_message().c_str());
+ if (!(allow_exhaustion &&
+ s.error_code() == StatusCode::RESOURCE_EXHAUSTED)) {
+ gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(),
+ s.error_message().c_str());
+ }
+ gpr_atm_no_barrier_fetch_add(errors, static_cast<gpr_atm>(1));
+ } else {
+ EXPECT_EQ(response.message(), request.message());
}
- ASSERT_TRUE(s.ok());
}
}
typedef ::testing::Types<
CommonStressTestSyncServer<CommonStressTestInsecure<TestServiceImpl>>,
- CommonStressTestSyncServer<CommonStressTestInproc<TestServiceImpl>>,
+ CommonStressTestSyncServer<CommonStressTestInproc<TestServiceImpl, false>>,
+ CommonStressTestSyncServerLowThreadCount<
+ CommonStressTestInproc<TestServiceImpl, true>>,
CommonStressTestAsyncServer<
CommonStressTestInsecure<grpc::testing::EchoTestService::AsyncService>>,
- CommonStressTestAsyncServer<
- CommonStressTestInproc<grpc::testing::EchoTestService::AsyncService>>>
+ CommonStressTestAsyncServer<CommonStressTestInproc<
+ grpc::testing::EchoTestService::AsyncService, false>>>
CommonTypes;
TYPED_TEST_CASE(End2endTest, CommonTypes);
TYPED_TEST(End2endTest, ThreadStress) {
this->common_.ResetStub();
std::vector<std::thread> threads;
+ gpr_atm errors;
+ gpr_atm_rel_store(&errors, static_cast<gpr_atm>(0));
threads.reserve(kNumThreads);
for (int i = 0; i < kNumThreads; ++i) {
- threads.emplace_back(SendRpc, this->common_.GetStub(), kNumRpcs);
+ threads.emplace_back(SendRpc, this->common_.GetStub(), kNumRpcs,
+ this->common_.AllowExhaustion(), &errors);
}
for (int i = 0; i < kNumThreads; ++i) {
threads[i].join();
}
+ uint64_t error_cnt = static_cast<uint64_t>(gpr_atm_no_barrier_load(&errors));
+ if (error_cnt != 0) {
+ gpr_log(GPR_INFO, "RPC error count: %" PRIu64, error_cnt);
+ }
+ // If this test allows resource exhaustion, expect that some RPCs actually
+ // failed with RESOURCE_EXHAUSTED
+ if (this->common_.AllowExhaustion()) {
+ EXPECT_GT(error_cnt, static_cast<uint64_t>(0));
+ }
}
template <class Common>
diff --git a/test/cpp/microbenchmarks/bm_chttp2_transport.cc b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
index 1e9bd273aa..189923a841 100644
--- a/test/cpp/microbenchmarks/bm_chttp2_transport.cc
+++ b/test/cpp/microbenchmarks/bm_chttp2_transport.cc
@@ -96,7 +96,7 @@ class DummyEndpoint : public grpc_endpoint {
}
static void write(grpc_endpoint* ep, grpc_slice_buffer* slices,
- grpc_closure* cb) {
+ grpc_closure* cb, void* arg) {
GRPC_CLOSURE_SCHED(cb, GRPC_ERROR_NONE);
}
diff --git a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
index da095c3e68..85767c8758 100644
--- a/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
+++ b/test/cpp/microbenchmarks/bm_cq_multiple_threads.cc
@@ -34,15 +34,15 @@ struct grpc_pollset {
gpr_mu mu;
};
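+// g_mu/g_cv order setup and teardown across the benchmark threads: thread 0
+// runs setup()/teardown() while the remaining threads wait on g_cv.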
+static gpr_mu g_mu;
+static gpr_cv g_cv;
+static int g_threads_active;
+static bool g_active;
+
namespace grpc {
namespace testing {
-
-auto& force_library_initialization = Library::get();
-
-static void* g_tag = (void*)static_cast<intptr_t>(10); // Some random number
static grpc_completion_queue* g_cq;
static grpc_event_engine_vtable g_vtable;
-static const grpc_event_engine_vtable* g_old_vtable;
static void pollset_shutdown(grpc_pollset* ps, grpc_closure* closure) {
GRPC_CLOSURE_SCHED(closure, GRPC_ERROR_NONE);
@@ -74,16 +74,18 @@ static grpc_error* pollset_work(grpc_pollset* ps, grpc_pollset_worker** worker,
}
gpr_mu_unlock(&ps->mu);
- GPR_ASSERT(grpc_cq_begin_op(g_cq, g_tag));
+
+ void* tag = (void*)static_cast<intptr_t>(10); // Some random number
+ GPR_ASSERT(grpc_cq_begin_op(g_cq, tag));
grpc_cq_end_op(
- g_cq, g_tag, GRPC_ERROR_NONE, cq_done_cb, nullptr,
+ g_cq, tag, GRPC_ERROR_NONE, cq_done_cb, nullptr,
static_cast<grpc_cq_completion*>(gpr_malloc(sizeof(grpc_cq_completion))));
grpc_core::ExecCtx::Get()->Flush();
gpr_mu_lock(&ps->mu);
return GRPC_ERROR_NONE;
}
-static void init_engine_vtable() {
+static const grpc_event_engine_vtable* init_engine_vtable(bool) {
memset(&g_vtable, 0, sizeof(g_vtable));
g_vtable.pollset_size = sizeof(grpc_pollset);
@@ -92,17 +94,23 @@ static void init_engine_vtable() {
g_vtable.pollset_destroy = pollset_destroy;
g_vtable.pollset_work = pollset_work;
g_vtable.pollset_kick = pollset_kick;
+ g_vtable.shutdown_engine = [] {};
+
+ return &g_vtable;
}
static void setup() {
- grpc_init();
+ // This test should only ever be run with the "none" or "any" polling
+ // engine. Override the "none" engine with our test vtable and also
+ // register it as a custom engine named "bm_cq_multiple_threads".
+ grpc_register_event_engine_factory("none", init_engine_vtable, false);
+ grpc_register_event_engine_factory("bm_cq_multiple_threads",
+ init_engine_vtable, true);
- /* Override the event engine with our test event engine (g_vtable); but before
- * that, save the current event engine in g_old_vtable. We will have to set
- * g_old_vtable back before calling grpc_shutdown() */
- init_engine_vtable();
- g_old_vtable = grpc_get_event_engine_test_only();
- grpc_set_event_engine_test_only(&g_vtable);
+ grpc_init();
+ GPR_ASSERT(strcmp(grpc_get_poll_strategy_name(), "none") == 0 ||
+ strcmp(grpc_get_poll_strategy_name(), "bm_cq_multiple_threads") ==
+ 0);
g_cq = grpc_completion_queue_create_for_next(nullptr);
}
@@ -118,9 +126,6 @@ static void teardown() {
}
grpc_completion_queue_destroy(g_cq);
-
- /* Restore the old event engine before calling grpc_shutdown */
- grpc_set_event_engine_test_only(g_old_vtable);
grpc_shutdown();
}
@@ -137,14 +142,33 @@ static void teardown() {
code (i.e the code between two successive calls of state.KeepRunning()) if
state.KeepRunning() returns false. So it is safe to do the teardown in one
of the threads after state.keepRunning() returns false.
+
+ However, our use requires synchronization because each thread does extra
+ work that must be specifically ordered: TrackCounters has to be constructed
+ after grpc_init (it needs the core count, which grpc initializes), and its
+ Finish call has to happen before grpc_shutdown so that it can still use
+ grpc_stats.
*/
static void BM_Cq_Throughput(benchmark::State& state) {
- TrackCounters track_counters;
gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
+ auto thd_idx = state.thread_index;
- if (state.thread_index == 0) {
+ gpr_mu_lock(&g_mu);
+ g_threads_active++;
+ if (thd_idx == 0) {
setup();
+ g_active = true;
+ gpr_cv_broadcast(&g_cv);
+ } else {
+ while (!g_active) {
+ gpr_cv_wait(&g_cv, &g_mu, deadline);
+ }
}
+ gpr_mu_unlock(&g_mu);
+
+ // Use a TrackCounters object to monitor the gRPC performance statistics
+ // (optionally including low-level counters) before and after the test
+ TrackCounters track_counters;
while (state.KeepRunning()) {
GPR_ASSERT(grpc_completion_queue_next(g_cq, deadline, nullptr).type ==
@@ -152,12 +176,23 @@ static void BM_Cq_Throughput(benchmark::State& state) {
}
state.SetItemsProcessed(state.iterations());
+ track_counters.Finish(state);
- if (state.thread_index == 0) {
- teardown();
+ gpr_mu_lock(&g_mu);
+ g_threads_active--;
+ if (g_threads_active == 0) {
+ gpr_cv_broadcast(&g_cv);
+ } else {
+ while (g_threads_active > 0) {
+ gpr_cv_wait(&g_cv, &g_mu, deadline);
+ }
}
+ gpr_mu_unlock(&g_mu);
- track_counters.Finish(state);
+ if (thd_idx == 0) {
+ teardown();
+ g_active = false;
+ }
}
BENCHMARK(BM_Cq_Throughput)->ThreadRange(1, 16)->UseRealTime();
@@ -172,6 +207,8 @@ void RunTheBenchmarksNamespaced() { RunSpecifiedBenchmarks(); }
} // namespace benchmark
int main(int argc, char** argv) {
+ gpr_mu_init(&g_mu);
+ gpr_cv_init(&g_cv);
::benchmark::Initialize(&argc, argv);
::grpc::testing::InitTest(&argc, &argv, false);
benchmark::RunTheBenchmarksNamespaced();
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index 9d7469c9b5..0b4b2ff0a9 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -180,6 +180,19 @@ class Client {
timer_result = timer_->Mark();
}
+ // Print the median latency per interval for one thread.
+ // If the number of warmup seconds is x, then the first x + 1 numbers in the
+ // vector are from the warmup period and should be discarded.
+ if (median_latency_collection_interval_seconds_ > 0) {
+ std::vector<double> medians_per_interval =
+ threads_[0]->GetMedianPerIntervalList();
+ gpr_log(GPR_INFO, "Num threads: %ld", threads_.size());
+ gpr_log(GPR_INFO, "Number of medians: %ld", medians_per_interval.size());
+ for (size_t j = 0; j < medians_per_interval.size(); j++) {
+ gpr_log(GPR_INFO, "%f", medians_per_interval[j]);
+ }
+ }
+
grpc_stats_data core_stats;
grpc_stats_collect(&core_stats);
@@ -210,6 +223,12 @@ class Client {
}
}
+ // Returns the interval (in seconds) between collecting latency medians. If 0,
+ // no periodic median latencies will be collected.
+ double GetLatencyCollectionIntervalInSeconds() {
+ return median_latency_collection_interval_seconds_;
+ }
+
virtual int GetPollCount() {
// For sync client.
return 0;
@@ -218,6 +237,7 @@ class Client {
protected:
bool closed_loop_;
gpr_atm thread_pool_done_;
+ double median_latency_collection_interval_seconds_;
void StartThreads(size_t num_threads) {
gpr_atm_rel_store(&thread_pool_done_, static_cast<gpr_atm>(false));
@@ -299,10 +319,27 @@ class Client {
MergeStatusHistogram(statuses_, s);
}
+ std::vector<double> GetMedianPerIntervalList() {
+ return medians_each_interval_list_;
+ }
+
void UpdateHistogram(HistogramEntry* entry) {
std::lock_guard<std::mutex> g(mu_);
if (entry->value_used()) {
histogram_.Add(entry->value());
+ if (client_->GetLatencyCollectionIntervalInSeconds() > 0) {
+ histogram_per_interval_.Add(entry->value());
+ double now = UsageTimer::Now();
+ if ((now - interval_start_time_) >=
+ client_->GetLatencyCollectionIntervalInSeconds()) {
+ // Record the median latency of requests from the last interval.
+ // Divide by 1e3 to get microseconds.
+ medians_each_interval_list_.push_back(
+ histogram_per_interval_.Percentile(50) / 1e3);
+ histogram_per_interval_.Reset();
+ interval_start_time_ = now;
+ }
+ }
}
if (entry->status_used()) {
statuses_[entry->status()]++;
@@ -334,6 +371,11 @@ class Client {
Client* client_;
const size_t idx_;
std::thread impl_;
+ // The following are used only if
+ // median_latency_collection_interval_seconds_ is greater than 0
+ Histogram histogram_per_interval_;
+ std::vector<double> medians_each_interval_list_;
+ double interval_start_time_;
};
bool ThreadCompleted() {
@@ -392,7 +434,8 @@ class ClientImpl : public Client {
for (auto& t : connecting_threads) {
t->join();
}
-
+ median_latency_collection_interval_seconds_ =
+ config.median_latency_collection_interval_millis() / 1e3;
ClientRequestCreator<RequestType> create_req(&request_,
config.payload_config());
}
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index cabbd51843..11cfb4aa05 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -198,7 +198,8 @@ std::unique_ptr<ScenarioResult> RunScenario(
const ServerConfig& initial_server_config, size_t num_servers,
int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count,
const grpc::string& qps_server_target_override,
- const grpc::string& credential_type, bool run_inproc) {
+ const grpc::string& credential_type, bool run_inproc,
+ int32_t median_latency_collection_interval_millis) {
if (run_inproc) {
g_inproc_servers = new std::vector<grpc::testing::Server*>;
}
@@ -317,6 +318,9 @@ std::unique_ptr<ScenarioResult> RunScenario(
}
}
+ client_config.set_median_latency_collection_interval_millis(
+ median_latency_collection_interval_millis);
+
// Targets are all set by now
result_client_config = client_config;
// Start clients
diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h
index fede4d8045..cda89f7ddf 100644
--- a/test/cpp/qps/driver.h
+++ b/test/cpp/qps/driver.h
@@ -32,7 +32,8 @@ std::unique_ptr<ScenarioResult> RunScenario(
const grpc::testing::ServerConfig& server_config, size_t num_servers,
int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count,
const grpc::string& qps_server_target_override,
- const grpc::string& credential_type, bool run_inproc);
+ const grpc::string& credential_type, bool run_inproc,
+ int32_t median_latency_collection_interval_millis);
bool RunQuit(const grpc::string& credential_type);
} // namespace testing
diff --git a/test/cpp/qps/histogram.h b/test/cpp/qps/histogram.h
index ba72b5b332..6275128f34 100644
--- a/test/cpp/qps/histogram.h
+++ b/test/cpp/qps/histogram.h
@@ -34,6 +34,11 @@ class Histogram {
~Histogram() {
if (impl_) grpc_histogram_destroy(impl_);
}
+ void Reset() {
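+ // Recreate the underlying C histogram from scratch; the C API does not
+ // expose a clear/reset operation.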
+ if (impl_) grpc_histogram_destroy(impl_);
+ impl_ = grpc_histogram_create(default_resolution(), default_max_possible());
+ }
+
Histogram(Histogram&& other) : impl_(other.impl_) { other.impl_ = nullptr; }
void Merge(const Histogram& h) { grpc_histogram_merge(impl_, h.impl_); }
diff --git a/test/cpp/qps/inproc_sync_unary_ping_pong_test.cc b/test/cpp/qps/inproc_sync_unary_ping_pong_test.cc
index f2e977d48b..56d1730252 100644
--- a/test/cpp/qps/inproc_sync_unary_ping_pong_test.cc
+++ b/test/cpp/qps/inproc_sync_unary_ping_pong_test.cc
@@ -48,7 +48,7 @@ static void RunSynchronousUnaryPingPong() {
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2, "",
- kInsecureCredentialsType, true);
+ kInsecureCredentialsType, true, 0);
GetReporter()->ReportQPS(*result);
GetReporter()->ReportLatency(*result);
diff --git a/test/cpp/qps/qps_json_driver.cc b/test/cpp/qps/qps_json_driver.cc
index 0ff692255c..eaa0dd992c 100644
--- a/test/cpp/qps/qps_json_driver.cc
+++ b/test/cpp/qps/qps_json_driver.cc
@@ -66,6 +66,11 @@ DEFINE_string(json_file_out, "", "File to write the JSON output to.");
DEFINE_string(credential_type, grpc::testing::kInsecureCredentialsType,
"Credential type for communication with workers");
DEFINE_bool(run_inproc, false, "Perform an in-process transport test");
+DEFINE_int32(
+ median_latency_collection_interval_millis, 0,
+ "Specifies the period between gathering latency medians in "
+ "milliseconds. The medians will be logged out on the client at the "
+ "end of the benchmark run. If 0, this periodic collection is disabled.");
namespace grpc {
namespace testing {
@@ -73,13 +78,13 @@ namespace testing {
static std::unique_ptr<ScenarioResult> RunAndReport(const Scenario& scenario,
bool* success) {
std::cerr << "RUNNING SCENARIO: " << scenario.name() << "\n";
- auto result =
- RunScenario(scenario.client_config(), scenario.num_clients(),
- scenario.server_config(), scenario.num_servers(),
- scenario.warmup_seconds(), scenario.benchmark_seconds(),
- !FLAGS_run_inproc ? scenario.spawn_local_worker_count() : -2,
- FLAGS_qps_server_target_override, FLAGS_credential_type,
- FLAGS_run_inproc);
+ auto result = RunScenario(
+ scenario.client_config(), scenario.num_clients(),
+ scenario.server_config(), scenario.num_servers(),
+ scenario.warmup_seconds(), scenario.benchmark_seconds(),
+ !FLAGS_run_inproc ? scenario.spawn_local_worker_count() : -2,
+ FLAGS_qps_server_target_override, FLAGS_credential_type, FLAGS_run_inproc,
+ FLAGS_median_latency_collection_interval_millis);
// Amend the result with scenario config. Eventually we should adjust
// RunScenario contract so we don't need to touch the result here.
diff --git a/test/cpp/qps/qps_openloop_test.cc b/test/cpp/qps/qps_openloop_test.cc
index df929b9811..6044f4265a 100644
--- a/test/cpp/qps/qps_openloop_test.cc
+++ b/test/cpp/qps/qps_openloop_test.cc
@@ -52,7 +52,7 @@ static void RunQPS() {
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2, "",
- kInsecureCredentialsType, false);
+ kInsecureCredentialsType, false, 0);
GetReporter()->ReportQPSPerCore(*result);
GetReporter()->ReportLatency(*result);
diff --git a/test/cpp/qps/secure_sync_unary_ping_pong_test.cc b/test/cpp/qps/secure_sync_unary_ping_pong_test.cc
index bb415e9d63..a559c82cc8 100644
--- a/test/cpp/qps/secure_sync_unary_ping_pong_test.cc
+++ b/test/cpp/qps/secure_sync_unary_ping_pong_test.cc
@@ -55,7 +55,7 @@ static void RunSynchronousUnaryPingPong() {
const auto result =
RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2, "",
- kInsecureCredentialsType, false);
+ kInsecureCredentialsType, false, 0);
GetReporter()->ReportQPS(*result);
GetReporter()->ReportLatency(*result);
diff --git a/test/cpp/thread_manager/thread_manager_test.cc b/test/cpp/thread_manager/thread_manager_test.cc
index 7a95a9f17d..99de5a3e01 100644
--- a/test/cpp/thread_manager/thread_manager_test.cc
+++ b/test/cpp/thread_manager/thread_manager_test.cc
@@ -30,30 +30,44 @@
#include "test/cpp/util/test_config.h"
namespace grpc {
+
+struct ThreadManagerTestSettings {
+ // The min number of pollers that SHOULD be active in ThreadManager
+ int min_pollers;
+ // The max number of pollers that could be active in ThreadManager
+ int max_pollers;
+ // The sleep duration in the PollForWork() function to simulate "polling"
+ int poll_duration_ms;
+ // The sleep duration in the DoWork() function to simulate "work"
+ int work_duration_ms;
+ // Max number of times PollForWork() is called before shutting down
+ int max_poll_calls;
+};
+
class ThreadManagerTest final : public grpc::ThreadManager {
public:
- ThreadManagerTest()
- : ThreadManager(kMinPollers, kMaxPollers),
+ ThreadManagerTest(const char* name, grpc_resource_quota* rq,
+ const ThreadManagerTestSettings& settings)
+ : ThreadManager(name, rq, settings.min_pollers, settings.max_pollers),
+ settings_(settings),
num_do_work_(0),
num_poll_for_work_(0),
num_work_found_(0) {}
grpc::ThreadManager::WorkStatus PollForWork(void** tag, bool* ok) override;
- void DoWork(void* tag, bool ok) override;
- void PerformTest();
+ void DoWork(void* tag, bool ok, bool resources) override;
+
+ // Get number of times PollForWork() returned WORK_FOUND
+ int GetNumWorkFound();
+ // Get number of times DoWork() was called
+ int GetNumDoWork();
private:
void SleepForMs(int sleep_time_ms);
- static const int kMinPollers = 2;
- static const int kMaxPollers = 10;
-
- static const int kPollingTimeoutMsec = 10;
- static const int kDoWorkDurationMsec = 1;
-
- // PollForWork will return SHUTDOWN after these many number of invocations
- static const int kMaxNumPollForWork = 50;
+ ThreadManagerTestSettings settings_;
+ // Counters
gpr_atm num_do_work_; // Number of calls to DoWork
gpr_atm num_poll_for_work_; // Number of calls to PollForWork
gpr_atm num_work_found_; // Number of times WORK_FOUND was returned
@@ -69,54 +83,117 @@ void ThreadManagerTest::SleepForMs(int duration_ms) {
grpc::ThreadManager::WorkStatus ThreadManagerTest::PollForWork(void** tag,
bool* ok) {
int call_num = gpr_atm_no_barrier_fetch_add(&num_poll_for_work_, 1);
-
- if (call_num >= kMaxNumPollForWork) {
+ if (call_num >= settings_.max_poll_calls) {
Shutdown();
return SHUTDOWN;
}
- // Simulate "polling for work" by sleeping for sometime
- SleepForMs(kPollingTimeoutMsec);
-
+ SleepForMs(settings_.poll_duration_ms); // Simulate "polling" duration
*tag = nullptr;
*ok = true;
- // Return timeout roughly 1 out of every 3 calls
+ // Return timeout roughly 1 out of every 3 calls just to make the test a bit
+ // more interesting
if (call_num % 3 == 0) {
return TIMEOUT;
- } else {
- gpr_atm_no_barrier_fetch_add(&num_work_found_, 1);
- return WORK_FOUND;
}
+
+ gpr_atm_no_barrier_fetch_add(&num_work_found_, 1);
+ return WORK_FOUND;
}
-void ThreadManagerTest::DoWork(void* tag, bool ok) {
+void ThreadManagerTest::DoWork(void* tag, bool ok, bool resources) {
gpr_atm_no_barrier_fetch_add(&num_do_work_, 1);
- SleepForMs(kDoWorkDurationMsec); // Simulate doing work by sleeping
+ SleepForMs(settings_.work_duration_ms); // Simulate work by sleeping
}
-void ThreadManagerTest::PerformTest() {
- // Initialize() starts the ThreadManager
- Initialize();
-
- // Wait for all the threads to gracefully terminate
- Wait();
+int ThreadManagerTest::GetNumWorkFound() {
+ return static_cast<int>(gpr_atm_no_barrier_load(&num_work_found_));
+}
- // The number of times DoWork() was called is equal to the number of times
- // WORK_FOUND was returned
- gpr_log(GPR_DEBUG, "DoWork() called %" PRIdPTR " times",
- gpr_atm_no_barrier_load(&num_do_work_));
- GPR_ASSERT(gpr_atm_no_barrier_load(&num_do_work_) ==
- gpr_atm_no_barrier_load(&num_work_found_));
+int ThreadManagerTest::GetNumDoWork() {
+ return static_cast<int>(gpr_atm_no_barrier_load(&num_do_work_));
}
} // namespace grpc
+// Test that the number of times DoWork() is called is equal to the number of
+// times PollForWork() returned WORK_FOUND
+static void TestPollAndWork() {
+ grpc_resource_quota* rq = grpc_resource_quota_create("Test-poll-and-work");
+ grpc::ThreadManagerTestSettings settings = {
+ 2 /* min_pollers */, 10 /* max_pollers */, 10 /* poll_duration_ms */,
+ 1 /* work_duration_ms */, 50 /* max_poll_calls */};
+
+ grpc::ThreadManagerTest test_thread_mgr("TestThreadManager", rq, settings);
+ grpc_resource_quota_unref(rq);
+
+ test_thread_mgr.Initialize(); // Start the thread manager
+ test_thread_mgr.Wait(); // Wait for all threads to finish
+
+ // Verify that the number of times DoWork() was called is equal to the
+ // number of times WORK_FOUND was returned
+ gpr_log(GPR_DEBUG, "DoWork() called %d times",
+ test_thread_mgr.GetNumDoWork());
+ GPR_ASSERT(test_thread_mgr.GetNumDoWork() ==
+ test_thread_mgr.GetNumWorkFound());
+}
+
+static void TestThreadQuota() {
+ const int kMaxNumThreads = 3;
+ grpc_resource_quota* rq = grpc_resource_quota_create("Test-thread-quota");
+ grpc_resource_quota_set_max_threads(rq, kMaxNumThreads);
+
+ // Set work_duration_ms to be much greater than poll_duration_ms. This way,
+ // the thread manager will be forced to create more 'polling' threads to
+ // honor the min_pollers guarantee
+ grpc::ThreadManagerTestSettings settings = {
+ 1 /* min_pollers */, 1 /* max_pollers */, 1 /* poll_duration_ms */,
+ 10 /* work_duration_ms */, 50 /* max_poll_calls */};
+
+ // Create two thread managers (but with the same resource quota). This means
+ // that the max number of active threads across BOTH thread managers cannot
+ // be greater than kMaxNumThreads
+ grpc::ThreadManagerTest test_thread_mgr_1("TestThreadManager-1", rq,
+ settings);
+ grpc::ThreadManagerTest test_thread_mgr_2("TestThreadManager-2", rq,
+ settings);
+ // It is okay to unref the resource quota before starting the thread managers.
+ grpc_resource_quota_unref(rq);
+
+ // Start both thread managers
+ test_thread_mgr_1.Initialize();
+ test_thread_mgr_2.Initialize();
+
+ // Wait for both to finish
+ test_thread_mgr_1.Wait();
+ test_thread_mgr_2.Wait();
+
+ // Now verify that the number of active threads in each thread manager
+ // never exceeded kMaxNumThreads
+ //
+ // NOTE: Actually, the total number of active threads across *both* thread
+ // managers at any point in time never exceeds kMaxNumThreads, but
+ // unfortunately there is no easy way to verify that here (i.e., we can't
+ // just check (max1 + max2 <= kMaxNumThreads)). It is okay not to test this
+ // case here; the resource quota c-core tests provide enough coverage of the
+ // resource quota object with multiple resource users
+ int max1 = test_thread_mgr_1.GetMaxActiveThreadsSoFar();
+ int max2 = test_thread_mgr_2.GetMaxActiveThreadsSoFar();
+ gpr_log(
+ GPR_DEBUG,
+ "MaxActiveThreads in TestThreadManager_1: %d, TestThreadManager_2: %d",
+ max1, max2);
+ GPR_ASSERT(max1 <= kMaxNumThreads && max2 <= kMaxNumThreads);
+}
+
int main(int argc, char** argv) {
std::srand(std::time(nullptr));
-
grpc::testing::InitTest(&argc, &argv, true);
- grpc::ThreadManagerTest test_rpc_manager;
- test_rpc_manager.PerformTest();
+ grpc_init();
+
+ TestPollAndWork();
+ TestThreadQuota();
+ grpc_shutdown();
return 0;
}
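TestThreadQuota() relies on GetMaxActiveThreadsSoFar(), whose implementation is not part of this diff (it presumably lives in the grpc::ThreadManager base class). A plausible sketch of such a high-water-mark counter, with member names that are illustrative assumptions only:

#include <algorithm>
#include <mutex>

// Illustrative high-water-mark counter. The real accounting lives inside
// grpc::ThreadManager / the resource quota; the names here are assumptions.
class ThreadHighWaterMark {
 public:
  void OnThreadStarted() {
    std::lock_guard<std::mutex> lock(mu_);
    ++active_;
    max_so_far_ = std::max(max_so_far_, active_);
  }
  void OnThreadFinished() {
    std::lock_guard<std::mutex> lock(mu_);
    --active_;
  }
  int MaxActiveThreadsSoFar() {
    std::lock_guard<std::mutex> lock(mu_);
    return max_so_far_;
  }

 private:
  std::mutex mu_;
  int active_ = 0;
  int max_so_far_ = 0;
};

Each worker thread would call OnThreadStarted()/OnThreadFinished() around its lifetime, so MaxActiveThreadsSoFar() reports the peak concurrency even after all threads have exited.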
diff --git a/test/cpp/util/BUILD b/test/cpp/util/BUILD
index c3bfeb7615..477862a0ee 100644
--- a/test/cpp/util/BUILD
+++ b/test/cpp/util/BUILD
@@ -269,27 +269,15 @@ grpc_cc_test(
grpc_cc_binary(
name = "grpc_cli",
srcs = [
- "cli_call.cc",
- "cli_call.h",
- "cli_credentials.cc",
- "cli_credentials.h",
- "config_grpc_cli.h",
"grpc_cli.cc",
- "grpc_tool.cc",
- "grpc_tool.h",
- "proto_file_parser.cc",
- "proto_file_parser.h",
- "proto_reflection_descriptor_database.cc",
- "proto_reflection_descriptor_database.h",
- "service_describer.cc",
- "service_describer.h",
- "test_config.h",
- "test_config_cc.cc",
],
external_deps = [
"gflags",
],
deps = [
+ ":grpc_cli_libs",
+ ":grpc++_proto_reflection_desc_db",
+ ":test_config",
"//:grpc++",
"//src/proto/grpc/reflection/v1alpha:reflection_proto",
],