-rw-r--r--  BUILD | 2
-rw-r--r--  CMakeLists.txt | 2
-rw-r--r--  Makefile | 2
-rw-r--r--  README.md | 2
-rw-r--r--  bazel/grpc_build_system.bzl | 6
-rw-r--r--  binding.gyp | 61
-rw-r--r--  build.yaml | 4
-rw-r--r--  gRPC-Core.podspec | 2
-rwxr-xr-x  grpc.gemspec | 1
-rw-r--r--  package.xml | 1
-rw-r--r--  src/compiler/php_generator.cc | 12
-rw-r--r--  src/compiler/php_generator.h | 3
-rw-r--r--  src/compiler/php_generator_helpers.h | 17
-rw-r--r--  src/compiler/php_plugin.cc | 5
-rw-r--r--  src/core/ext/filters/client_channel/http_proxy.c | 110
-rw-r--r--  src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c | 5
-rw-r--r--  src/core/ext/transport/chttp2/transport/parsing.c | 4
-rw-r--r--  src/core/ext/transport/inproc/inproc_transport.c | 7
-rw-r--r--  src/core/lib/iomgr/ev_epoll1_linux.c | 282
-rw-r--r--  src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c | 8
-rw-r--r--  src/core/lib/iomgr/ev_epoll_thread_pool_linux.c | 8
-rw-r--r--  src/core/lib/iomgr/ev_epollex_linux.c | 12
-rw-r--r--  src/core/lib/iomgr/ev_epollsig_linux.c | 10
-rw-r--r--  src/core/lib/iomgr/iomgr_uv.c | 8
-rw-r--r--  src/core/lib/iomgr/iomgr_uv.h | 37
-rw-r--r--  src/core/lib/iomgr/lockfree_event.c | 14
-rw-r--r--  src/core/lib/iomgr/lockfree_event.h | 5
-rw-r--r--  src/core/lib/iomgr/pollset_uv.c | 7
-rw-r--r--  src/core/lib/iomgr/resolve_address_uv.c | 21
-rw-r--r--  src/core/lib/iomgr/sockaddr_utils.c | 5
-rw-r--r--  src/core/lib/iomgr/sockaddr_utils.h | 2
-rw-r--r--  src/core/lib/iomgr/tcp_client_uv.c | 3
-rw-r--r--  src/core/lib/iomgr/tcp_server_uv.c | 129
-rw-r--r--  src/core/lib/iomgr/tcp_uv.c | 7
-rw-r--r--  src/core/lib/iomgr/timer_uv.c | 4
-rw-r--r--  src/core/lib/support/env.h | 6
-rw-r--r--  src/core/lib/support/env_linux.c | 32
-rw-r--r--  src/core/lib/support/env_posix.c | 5
-rw-r--r--  src/core/lib/support/env_windows.c | 5
-rw-r--r--  src/core/lib/support/log.c | 8
-rw-r--r--  src/cpp/server/create_default_thread_pool.cc | 11
-rw-r--r--  src/cpp/server/server_builder.cc | 18
-rw-r--r--  src/cpp/server/thread_pool_interface.h | 4
-rw-r--r--  src/proto/grpc/testing/control.proto | 5
-rw-r--r--  src/ruby/lib/grpc/generic/active_call.rb | 170
-rw-r--r--  src/ruby/lib/grpc/generic/bidi_call.rb | 62
-rw-r--r--  src/ruby/lib/grpc/generic/rpc_desc.rb | 4
-rw-r--r--  src/ruby/lib/grpc/generic/rpc_server.rb | 1
-rw-r--r--  src/ruby/spec/client_auth_spec.rb | 137
-rw-r--r--  src/ruby/spec/generic/active_call_spec.rb | 4
-rw-r--r--  src/ruby/spec/generic/client_stub_spec.rb | 355
-rw-r--r--  src/ruby/spec/generic/rpc_desc_spec.rb | 10
-rw-r--r--  src/ruby/spec/generic/rpc_server_spec.rb | 145
-rw-r--r--  src/ruby/spec/testdata/client.key | 16
-rw-r--r--  src/ruby/spec/testdata/client.pem | 14
-rw-r--r--  templates/binding.gyp.template | 46
-rw-r--r--  test/core/end2end/end2end_nosec_tests.c | 8
-rw-r--r--  test/core/end2end/end2end_tests.c | 8
-rw-r--r--  test/core/end2end/fixtures/h2_http_proxy.c | 19
-rw-r--r--  test/core/end2end/fixtures/http_proxy_fixture.c | 52
-rw-r--r--  test/core/end2end/fixtures/http_proxy_fixture.h | 19
-rwxr-xr-x  test/core/end2end/gen_build_yaml.py | 14
-rwxr-xr-x  test/core/end2end/generate_tests.bzl | 17
-rw-r--r--  test/core/end2end/tests/proxy_auth.c | 235
-rw-r--r--  test/core/surface/completion_queue_threading_test.c | 3
-rw-r--r--  test/cpp/qps/BUILD | 4
-rw-r--r--  test/cpp/qps/client.h | 14
-rw-r--r--  test/cpp/qps/driver.cc | 39
-rw-r--r--  test/cpp/qps/driver.h | 5
-rw-r--r--  test/cpp/qps/qps_json_driver.cc | 9
-rw-r--r--  test/cpp/qps/qps_openloop_test.cc | 5
-rw-r--r--  test/cpp/qps/qps_worker.cc | 8
-rw-r--r--  test/cpp/qps/qps_worker.h | 4
-rw-r--r--  test/cpp/qps/report.cc | 21
-rw-r--r--  test/cpp/qps/report.h | 7
-rw-r--r--  test/cpp/qps/secure_sync_unary_ping_pong_test.cc | 5
-rw-r--r--  test/cpp/qps/server.h | 15
-rw-r--r--  test/cpp/qps/worker.cc | 5
-rw-r--r--  test/cpp/util/create_test_channel.cc | 40
-rw-r--r--  test/cpp/util/create_test_channel.h | 6
-rw-r--r--  tools/distrib/python/grpcio_tools/setup.py | 4
-rw-r--r--  tools/doxygen/Doxyfile.core.internal | 1
-rwxr-xr-x  tools/internal_ci/linux/grpc_build_artifacts.sh | 9
-rw-r--r--  tools/internal_ci/windows/grpc_build_artifacts.bat | 3
-rw-r--r--  tools/interop_matrix/client_matrix.py | 26
-rwxr-xr-x  tools/interop_matrix/run_interop_matrix_tests.py | 2
-rwxr-xr-x  tools/mkowners/mkowners.py | 2
-rwxr-xr-x  tools/profiling/microbenchmarks/bm_diff/bm_diff.py | 13
-rwxr-xr-x  tools/profiling/microbenchmarks/bm_diff/bm_main.py | 14
-rwxr-xr-x  tools/profiling/microbenchmarks/bm_diff/bm_run.py | 21
-rw-r--r--  tools/run_tests/artifacts/build_artifact_python.bat | 4
-rw-r--r--  tools/run_tests/generated/sources_and_headers.json | 4
-rw-r--r--  tools/run_tests/generated/tests.json | 55
-rwxr-xr-x  tools/run_tests/run_tests.py | 2
-rwxr-xr-x  tools/run_tests/sanity/core_banned_functions.py | 2
-rw-r--r--  vsprojects/vcxproj/grpc/grpc.vcxproj | 1
-rw-r--r--  vsprojects/vcxproj/grpc/grpc.vcxproj.filters | 3
-rw-r--r--  vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj | 1
-rw-r--r--  vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters | 3
-rw-r--r--  vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj | 1
-rw-r--r--  vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters | 3
-rw-r--r--  vsprojects/vcxproj/test/end2end/tests/end2end_nosec_tests/end2end_nosec_tests.vcxproj | 2
-rw-r--r--  vsprojects/vcxproj/test/end2end/tests/end2end_nosec_tests/end2end_nosec_tests.vcxproj.filters | 3
-rw-r--r--  vsprojects/vcxproj/test/end2end/tests/end2end_tests/end2end_tests.vcxproj | 2
-rw-r--r--  vsprojects/vcxproj/test/end2end/tests/end2end_tests/end2end_tests.vcxproj.filters | 3
105 files changed, 2177 insertions(+), 435 deletions(-)
diff --git a/BUILD b/BUILD
index 083639c856..6dcc76eb7a 100644
--- a/BUILD
+++ b/BUILD
@@ -721,6 +721,7 @@ grpc_cc_library(
"src/core/lib/iomgr/iomgr.h",
"src/core/lib/iomgr/iomgr_internal.h",
"src/core/lib/iomgr/iomgr_posix.h",
+ "src/core/lib/iomgr/iomgr_uv.h",
"src/core/lib/iomgr/is_epollexclusive_available.h",
"src/core/lib/iomgr/load_file.h",
"src/core/lib/iomgr/lockfree_event.h",
@@ -1543,6 +1544,7 @@ grpc_cc_library(
":grpc++",
"//src/proto/grpc/reflection/v1alpha:reflection_proto",
],
+ alwayslink = 1,
)
grpc_cc_library(
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 0f87beb02b..266f2c0774 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -4354,6 +4354,7 @@ add_library(end2end_tests
test/core/end2end/tests/payload.c
test/core/end2end/tests/ping.c
test/core/end2end/tests/ping_pong_streaming.c
+ test/core/end2end/tests/proxy_auth.c
test/core/end2end/tests/registered_call.c
test/core/end2end/tests/request_with_flags.c
test/core/end2end/tests/request_with_payload.c
@@ -4453,6 +4454,7 @@ add_library(end2end_nosec_tests
test/core/end2end/tests/payload.c
test/core/end2end/tests/ping.c
test/core/end2end/tests/ping_pong_streaming.c
+ test/core/end2end/tests/proxy_auth.c
test/core/end2end/tests/registered_call.c
test/core/end2end/tests/request_with_flags.c
test/core/end2end/tests/request_with_payload.c
diff --git a/Makefile b/Makefile
index 2ae0229716..7b53024b6c 100644
--- a/Makefile
+++ b/Makefile
@@ -7952,6 +7952,7 @@ LIBEND2END_TESTS_SRC = \
test/core/end2end/tests/payload.c \
test/core/end2end/tests/ping.c \
test/core/end2end/tests/ping_pong_streaming.c \
+ test/core/end2end/tests/proxy_auth.c \
test/core/end2end/tests/registered_call.c \
test/core/end2end/tests/request_with_flags.c \
test/core/end2end/tests/request_with_payload.c \
@@ -8046,6 +8047,7 @@ LIBEND2END_NOSEC_TESTS_SRC = \
test/core/end2end/tests/payload.c \
test/core/end2end/tests/ping.c \
test/core/end2end/tests/ping_pong_streaming.c \
+ test/core/end2end/tests/proxy_auth.c \
test/core/end2end/tests/registered_call.c \
test/core/end2end/tests/request_with_flags.c \
test/core/end2end/tests/request_with_payload.c \
diff --git a/README.md b/README.md
index 0edea88518..995f877219 100644
--- a/README.md
+++ b/README.md
@@ -17,7 +17,7 @@ See [INSTALL](INSTALL.md) for installation instructions for various platforms.
See [tools/run_tests](tools/run_tests) for more guidance on how to run various test suites (e.g. unit tests, interop tests, benchmarks)
-See [Performance dashboard](http://performance-dot-grpc-testing.appspot.com/explore?dashboard=5712453606309888) for the performance numbers for v1.0.x.
+See [Performance dashboard](http://performance-dot-grpc-testing.appspot.com/explore?dashboard=5636470266134528) for the performance numbers for the latest released version.
# Repository Structure & Status
diff --git a/bazel/grpc_build_system.bzl b/bazel/grpc_build_system.bzl
index 8057f27a12..f793cae56d 100644
--- a/bazel/grpc_build_system.bzl
+++ b/bazel/grpc_build_system.bzl
@@ -25,7 +25,8 @@
def grpc_cc_library(name, srcs = [], public_hdrs = [], hdrs = [],
external_deps = [], deps = [], standalone = False,
- language = "C++", testonly = False, visibility = None):
+ language = "C++", testonly = False, visibility = None,
+ alwayslink = 0):
copts = []
if language.upper() == "C":
copts = ["-std=c99"]
@@ -40,7 +41,8 @@ def grpc_cc_library(name, srcs = [], public_hdrs = [], hdrs = [],
linkopts = ["-pthread"],
includes = [
"include"
- ]
+ ],
+ alwayslink = alwayslink,
)
def grpc_proto_plugin(name, srcs = [], deps = []):
diff --git a/binding.gyp b/binding.gyp
index d11a60a68a..d5902cc68f 100644
--- a/binding.gyp
+++ b/binding.gyp
@@ -175,21 +175,28 @@
}],
['OS == "mac"', {
'xcode_settings': {
- 'MACOSX_DEPLOYMENT_TARGET': '10.9'
+ 'OTHER_CFLAGS': [
+ '-g',
+ '-Wall',
+ '-Wextra',
+ '-Werror',
+ '-Wno-long-long',
+ '-Wno-unused-parameter',
+ '-DOSATOMIC_USE_INLINED=1',
+ ],
+ 'OTHER_CPLUSPLUSFLAGS': [
+ '-g',
+ '-Wall',
+ '-Wextra',
+ '-Werror',
+ '-Wno-long-long',
+ '-Wno-unused-parameter',
+ '-DOSATOMIC_USE_INLINED=1',
+ '-stdlib=libc++',
+ '-std=c++11',
+ '-Wno-error=deprecated-declarations'
+ ],
},
- 'OTHER_CFLAGS': [
- '-g',
- '-Wall',
- '-Wextra',
- '-Werror',
- '-Wno-long-long',
- '-Wno-unused-parameter',
- '-DOSATOMIC_USE_INLINED=1',
- ],
- 'OTHER_CPLUSPLUSFLAGS': [
- '-stdlib=libc++',
- '-std=c++11'
- ],
}]
]
},
@@ -508,6 +515,13 @@
'third_party/boringssl/ssl/tls_method.c',
'third_party/boringssl/ssl/tls_record.c',
],
+ 'conditions': [
+ ['OS == "mac"', {
+ 'xcode_settings': {
+ 'MACOSX_DEPLOYMENT_TARGET': '10.9'
+ }
+ }]
+ ]
},
],
}],
@@ -626,6 +640,13 @@
'src/core/lib/support/tmpfile_windows.c',
'src/core/lib/support/wrap_memcpy.c',
],
+ 'conditions': [
+ ['OS == "mac"', {
+ 'xcode_settings': {
+ 'MACOSX_DEPLOYMENT_TARGET': '10.9'
+ }
+ }]
+ ]
},
{
'target_name': 'grpc',
@@ -889,6 +910,13 @@
'src/core/ext/filters/workarounds/workaround_utils.c',
'src/core/plugin_registry/grpc_plugin_registry.c',
],
+ 'conditions': [
+ ['OS == "mac"', {
+ 'xcode_settings': {
+ 'MACOSX_DEPLOYMENT_TARGET': '10.9'
+ }
+ }]
+ ]
},
{
'include_dirs': [
@@ -914,6 +942,11 @@
'ldflags': [
'-Wl,-wrap,memcpy'
]
+ }],
+ ['OS == "mac"', {
+ 'xcode_settings': {
+ 'MACOSX_DEPLOYMENT_TARGET': '10.9'
+ }
}]
],
"target_name": "grpc_node",
diff --git a/build.yaml b/build.yaml
index 5dac770614..198467bf5a 100644
--- a/build.yaml
+++ b/build.yaml
@@ -214,6 +214,7 @@ filegroups:
- src/core/lib/iomgr/iomgr.h
- src/core/lib/iomgr/iomgr_internal.h
- src/core/lib/iomgr/iomgr_posix.h
+ - src/core/lib/iomgr/iomgr_uv.h
- src/core/lib/iomgr/is_epollexclusive_available.h
- src/core/lib/iomgr/load_file.h
- src/core/lib/iomgr/lockfree_event.h
@@ -2386,6 +2387,8 @@ targets:
- grpc
- gpr_test_util
- gpr
+ exclude_iomgrs:
+ - uv
platforms:
- linux
secure: true
@@ -4493,6 +4496,7 @@ targets:
- grpc
- gpr_test_util
- gpr
+ timeout_seconds: 1200
- name: writes_per_rpc_test
gtest: true
cpu_cost: 0.5
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index 57b6f92d1e..90580c5fad 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -277,6 +277,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/iomgr.h',
'src/core/lib/iomgr/iomgr_internal.h',
'src/core/lib/iomgr/iomgr_posix.h',
+ 'src/core/lib/iomgr/iomgr_uv.h',
'src/core/lib/iomgr/is_epollexclusive_available.h',
'src/core/lib/iomgr/load_file.h',
'src/core/lib/iomgr/lockfree_event.h',
@@ -761,6 +762,7 @@ Pod::Spec.new do |s|
'src/core/lib/iomgr/iomgr.h',
'src/core/lib/iomgr/iomgr_internal.h',
'src/core/lib/iomgr/iomgr_posix.h',
+ 'src/core/lib/iomgr/iomgr_uv.h',
'src/core/lib/iomgr/is_epollexclusive_available.h',
'src/core/lib/iomgr/load_file.h',
'src/core/lib/iomgr/lockfree_event.h',
diff --git a/grpc.gemspec b/grpc.gemspec
index dedee020b7..a166839888 100755
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -209,6 +209,7 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/iomgr/iomgr.h )
s.files += %w( src/core/lib/iomgr/iomgr_internal.h )
s.files += %w( src/core/lib/iomgr/iomgr_posix.h )
+ s.files += %w( src/core/lib/iomgr/iomgr_uv.h )
s.files += %w( src/core/lib/iomgr/is_epollexclusive_available.h )
s.files += %w( src/core/lib/iomgr/load_file.h )
s.files += %w( src/core/lib/iomgr/lockfree_event.h )
diff --git a/package.xml b/package.xml
index 38f51424e6..b31514b99f 100644
--- a/package.xml
+++ b/package.xml
@@ -223,6 +223,7 @@
<file baseinstalldir="/" name="src/core/lib/iomgr/iomgr.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/iomgr_internal.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/iomgr_posix.h" role="src" />
+ <file baseinstalldir="/" name="src/core/lib/iomgr/iomgr_uv.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/is_epollexclusive_available.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/load_file.h" role="src" />
<file baseinstalldir="/" name="src/core/lib/iomgr/lockfree_event.h" role="src" />
diff --git a/src/compiler/php_generator.cc b/src/compiler/php_generator.cc
index 6d34761fdf..38ec46e656 100644
--- a/src/compiler/php_generator.cc
+++ b/src/compiler/php_generator.cc
@@ -97,13 +97,14 @@ void PrintMethod(const MethodDescriptor *method, Printer *out) {
}
// Prints out the service descriptor object
-void PrintService(const ServiceDescriptor *service, Printer *out) {
+void PrintService(const ServiceDescriptor *service,
+ const grpc::string &parameter, Printer *out) {
map<grpc::string, grpc::string> vars;
out->Print("/**\n");
out->Print(GetPHPComments(service, " *").c_str());
out->Print(" */\n");
- vars["name"] = service->name();
- out->Print(vars, "class $name$Client extends \\Grpc\\BaseStub {\n\n");
+ vars["name"] = GetPHPServiceClassname(service, parameter);
+ out->Print(vars, "class $name$ extends \\Grpc\\BaseStub {\n\n");
out->Indent();
out->Indent();
out->Print(
@@ -131,7 +132,8 @@ void PrintService(const ServiceDescriptor *service, Printer *out) {
}
grpc::string GenerateFile(const FileDescriptor *file,
- const ServiceDescriptor *service) {
+ const ServiceDescriptor *service,
+ const grpc::string &parameter) {
grpc::string output;
{
StringOutputStream output_stream(&output);
@@ -150,7 +152,7 @@ grpc::string GenerateFile(const FileDescriptor *file,
vars["package"] = MessageIdentifierName(file->package());
out.Print(vars, "namespace $package$;\n\n");
- PrintService(service, &out);
+ PrintService(service, parameter, &out);
}
return output;
}
diff --git a/src/compiler/php_generator.h b/src/compiler/php_generator.h
index 4518bc24f9..9a04bd33d7 100644
--- a/src/compiler/php_generator.h
+++ b/src/compiler/php_generator.h
@@ -24,7 +24,8 @@
namespace grpc_php_generator {
grpc::string GenerateFile(const grpc::protobuf::FileDescriptor *file,
- const grpc::protobuf::ServiceDescriptor *service);
+ const grpc::protobuf::ServiceDescriptor *service,
+ const grpc::string &parameter);
} // namespace grpc_php_generator
diff --git a/src/compiler/php_generator_helpers.h b/src/compiler/php_generator_helpers.h
index 3a5c08b3e6..5edebf6290 100644
--- a/src/compiler/php_generator_helpers.h
+++ b/src/compiler/php_generator_helpers.h
@@ -26,9 +26,22 @@
namespace grpc_php_generator {
+inline grpc::string GetPHPServiceClassname(
+ const grpc::protobuf::ServiceDescriptor *service,
+ const grpc::string &parameter) {
+ grpc::string suffix;
+ if (parameter == "") {
+ suffix = "Client";
+ } else {
+ suffix = parameter;
+ }
+ return service->name() + suffix;
+}
+
inline grpc::string GetPHPServiceFilename(
const grpc::protobuf::FileDescriptor *file,
- const grpc::protobuf::ServiceDescriptor *service) {
+ const grpc::protobuf::ServiceDescriptor *service,
+ const grpc::string &parameter) {
std::vector<grpc::string> tokens =
grpc_generator::tokenize(file->package(), ".");
std::ostringstream oss;
@@ -36,7 +49,7 @@ inline grpc::string GetPHPServiceFilename(
oss << (i == 0 ? "" : "/")
<< grpc_generator::CapitalizeFirstLetter(tokens[i]);
}
- return oss.str() + "/" + service->name() + "Client.php";
+ return oss.str() + "/" + GetPHPServiceClassname(service, parameter) + ".php";
}
// ReplaceAll replaces all instances of search with replace in s.
diff --git a/src/compiler/php_plugin.cc b/src/compiler/php_plugin.cc
index 7a581fd7bc..bbe91656d5 100644
--- a/src/compiler/php_plugin.cc
+++ b/src/compiler/php_plugin.cc
@@ -41,10 +41,11 @@ class PHPGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator {
}
for (int i = 0; i < file->service_count(); i++) {
- grpc::string code = GenerateFile(file, file->service(i));
+ grpc::string code = GenerateFile(file, file->service(i), parameter);
// Get output file name
- grpc::string file_name = GetPHPServiceFilename(file, file->service(i));
+ grpc::string file_name =
+ GetPHPServiceFilename(file, file->service(i), parameter);
std::unique_ptr<grpc::protobuf::io::ZeroCopyOutputStream> output(
context->Open(file_name));
diff --git a/src/core/ext/filters/client_channel/http_proxy.c b/src/core/ext/filters/client_channel/http_proxy.c
index cfb5ec6f00..ef3512ed83 100644
--- a/src/core/ext/filters/client_channel/http_proxy.c
+++ b/src/core/ext/filters/client_channel/http_proxy.c
@@ -22,6 +22,7 @@
#include <string.h>
#include <grpc/support/alloc.h>
+#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
@@ -29,14 +30,23 @@
#include "src/core/ext/filters/client_channel/proxy_mapper_registry.h"
#include "src/core/ext/filters/client_channel/uri_parser.h"
#include "src/core/lib/channel/channel_args.h"
+#include "src/core/lib/slice/b64.h"
#include "src/core/lib/support/env.h"
+#include "src/core/lib/support/string.h"
-static char* grpc_get_http_proxy_server(grpc_exec_ctx* exec_ctx) {
+/**
+ * Parses the 'http_proxy' env var and returns the proxy hostname to resolve or
+ * NULL on error. Also sets 'user_cred' to user credentials if present in the
+ * 'http_proxy' env var, otherwise leaves it unchanged. It is caller's
+ * responsibility to gpr_free user_cred.
+ */
+static char* get_http_proxy_server(grpc_exec_ctx* exec_ctx, char** user_cred) {
+ GPR_ASSERT(user_cred != NULL);
+ char* proxy_name = NULL;
char* uri_str = gpr_getenv("http_proxy");
if (uri_str == NULL) return NULL;
grpc_uri* uri =
grpc_uri_parse(exec_ctx, uri_str, false /* suppress_errors */);
- char* proxy_name = NULL;
if (uri == NULL || uri->authority == NULL) {
gpr_log(GPR_ERROR, "cannot parse value of 'http_proxy' env var");
goto done;
@@ -45,11 +55,27 @@ static char* grpc_get_http_proxy_server(grpc_exec_ctx* exec_ctx) {
gpr_log(GPR_ERROR, "'%s' scheme not supported in proxy URI", uri->scheme);
goto done;
}
- if (strchr(uri->authority, '@') != NULL) {
- gpr_log(GPR_ERROR, "userinfo not supported in proxy URI");
- goto done;
+ /* Split on '@' to separate user credentials from host */
+ char** authority_strs = NULL;
+ size_t authority_nstrs;
+ gpr_string_split(uri->authority, "@", &authority_strs, &authority_nstrs);
+ GPR_ASSERT(authority_nstrs != 0); /* should have at least 1 string */
+ if (authority_nstrs == 1) {
+ /* User cred not present in authority */
+ proxy_name = authority_strs[0];
+ } else if (authority_nstrs == 2) {
+ /* User cred found */
+ *user_cred = authority_strs[0];
+ proxy_name = authority_strs[1];
+ gpr_log(GPR_DEBUG, "userinfo found in proxy URI");
+ } else {
+ /* Bad authority */
+ for (size_t i = 0; i < authority_nstrs; i++) {
+ gpr_free(authority_strs[i]);
+ }
+ proxy_name = NULL;
}
- proxy_name = gpr_strdup(uri->authority);
+ gpr_free(authority_strs);
done:
gpr_free(uri_str);
grpc_uri_destroy(uri);
@@ -62,7 +88,8 @@ static bool proxy_mapper_map_name(grpc_exec_ctx* exec_ctx,
const grpc_channel_args* args,
char** name_to_resolve,
grpc_channel_args** new_args) {
- *name_to_resolve = grpc_get_http_proxy_server(exec_ctx);
+ char* user_cred = NULL;
+ *name_to_resolve = get_http_proxy_server(exec_ctx, &user_cred);
if (*name_to_resolve == NULL) return false;
grpc_uri* uri =
grpc_uri_parse(exec_ctx, server_uri, false /* suppress_errors */);
@@ -71,19 +98,82 @@ static bool proxy_mapper_map_name(grpc_exec_ctx* exec_ctx,
"'http_proxy' environment variable set, but cannot "
"parse server URI '%s' -- not using proxy",
server_uri);
- if (uri != NULL) grpc_uri_destroy(uri);
+ if (uri != NULL) {
+ gpr_free(user_cred);
+ grpc_uri_destroy(uri);
+ }
return false;
}
if (strcmp(uri->scheme, "unix") == 0) {
gpr_log(GPR_INFO, "not using proxy for Unix domain socket '%s'",
server_uri);
+ gpr_free(user_cred);
grpc_uri_destroy(uri);
return false;
}
- grpc_arg new_arg = grpc_channel_arg_string_create(
+ char* no_proxy_str = gpr_getenv("no_proxy");
+ if (no_proxy_str != NULL) {
+ static const char* NO_PROXY_SEPARATOR = ",";
+ bool use_proxy = true;
+ char* server_host;
+ char* server_port;
+ if (!gpr_split_host_port(uri->path[0] == '/' ? uri->path + 1 : uri->path,
+ &server_host, &server_port)) {
+ gpr_log(GPR_INFO,
+ "unable to split host and port, not checking no_proxy list for "
+ "host '%s'",
+ server_uri);
+ } else {
+ size_t uri_len = strlen(server_host);
+ char** no_proxy_hosts;
+ size_t num_no_proxy_hosts;
+ gpr_string_split(no_proxy_str, NO_PROXY_SEPARATOR, &no_proxy_hosts,
+ &num_no_proxy_hosts);
+ for (size_t i = 0; i < num_no_proxy_hosts; i++) {
+ char* no_proxy_entry = no_proxy_hosts[i];
+ size_t no_proxy_len = strlen(no_proxy_entry);
+ if (no_proxy_len <= uri_len &&
+ gpr_stricmp(no_proxy_entry, &server_host[uri_len - no_proxy_len]) ==
+ 0) {
+ gpr_log(GPR_INFO, "not using proxy for host in no_proxy list '%s'",
+ server_uri);
+ use_proxy = false;
+ break;
+ }
+ }
+ for (size_t i = 0; i < num_no_proxy_hosts; i++) {
+ gpr_free(no_proxy_hosts[i]);
+ }
+ gpr_free(no_proxy_hosts);
+ gpr_free(server_host);
+ gpr_free(server_port);
+ if (!use_proxy) {
+ grpc_uri_destroy(uri);
+ gpr_free(*name_to_resolve);
+ *name_to_resolve = NULL;
+ return false;
+ }
+ }
+ }
+ grpc_arg args_to_add[2];
+ args_to_add[0] = grpc_channel_arg_string_create(
GRPC_ARG_HTTP_CONNECT_SERVER,
uri->path[0] == '/' ? uri->path + 1 : uri->path);
- *new_args = grpc_channel_args_copy_and_add(args, &new_arg, 1);
+ if (user_cred != NULL) {
+ /* Use base64 encoding for user credentials as stated in RFC 7617 */
+ char* encoded_user_cred =
+ grpc_base64_encode(user_cred, strlen(user_cred), 0, 0);
+ char* header;
+ gpr_asprintf(&header, "Proxy-Authorization:Basic %s", encoded_user_cred);
+ gpr_free(encoded_user_cred);
+ args_to_add[1] =
+ grpc_channel_arg_string_create(GRPC_ARG_HTTP_CONNECT_HEADERS, header);
+ *new_args = grpc_channel_args_copy_and_add(args, args_to_add, 2);
+ gpr_free(header);
+ } else {
+ *new_args = grpc_channel_args_copy_and_add(args, args_to_add, 1);
+ }
+ gpr_free(user_cred);
grpc_uri_destroy(uri);
return true;
}
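
The http_proxy.c hunk above adds two behaviors to the HTTP CONNECT proxy mapper: userinfo parsed from the 'http_proxy' URI is base64-encoded (per RFC 7617) into a "Proxy-Authorization: Basic ..." header carried via the GRPC_ARG_HTTP_CONNECT_HEADERS channel arg, and hosts listed in the comma-separated 'no_proxy' env var bypass the proxy when an entry is a case-insensitive suffix of the server host. A minimal stand-alone sketch of that suffix check, using strcasecmp in place of gpr_stricmp and illustrative names:

#include <stdbool.h>
#include <stdio.h>
#include <string.h>
#include <strings.h>

/* An entry matches when it is a case-insensitive suffix of the server host. */
static bool matches_no_proxy_entry(const char *server_host, const char *entry) {
  size_t host_len = strlen(server_host);
  size_t entry_len = strlen(entry);
  return entry_len <= host_len &&
         strcasecmp(entry, server_host + host_len - entry_len) == 0;
}

int main(void) {
  printf("%d\n", matches_no_proxy_entry("api.example.com", "example.com")); /* 1: proxy skipped */
  printf("%d\n", matches_no_proxy_entry("example.org", "example.com"));     /* 0: proxy used */
  return 0;
}
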
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
index cccc3e871a..fdb18f687f 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.c
@@ -1705,7 +1705,6 @@ static void lb_on_server_status_received_locked(grpc_exec_ctx *exec_ctx,
static void glb_update_locked(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
const grpc_lb_policy_args *args) {
glb_lb_policy *glb_policy = (glb_lb_policy *)policy;
-
if (glb_policy->updating_lb_channel) {
if (GRPC_TRACER_ON(grpc_lb_glb_trace)) {
gpr_log(GPR_INFO,
@@ -1813,9 +1812,11 @@ static void glb_lb_channel_on_connectivity_changed_cb(grpc_exec_ctx *exec_ctx,
// lb_on_server_status_received will pick up the cancel and reinit
// lb_call.
if (glb_policy->pending_update_args != NULL) {
- const grpc_lb_policy_args *args = glb_policy->pending_update_args;
+ grpc_lb_policy_args *args = glb_policy->pending_update_args;
glb_policy->pending_update_args = NULL;
glb_update_locked(exec_ctx, &glb_policy->base, args);
+ grpc_channel_args_destroy(exec_ctx, args->args);
+ gpr_free(args);
}
} else if (glb_policy->started_picking && !glb_policy->shutting_down) {
if (glb_policy->retry_timer_active) {
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index 7861dc0b11..dc2bf3f9cf 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -658,6 +658,10 @@ static grpc_error *init_header_frame_parser(grpc_exec_ctx *exec_ctx,
"ignoring grpc_chttp2_stream with non-client generated index %d",
t->incoming_stream_id));
return init_skip_frame_parser(exec_ctx, t, 1);
+ } else if (grpc_chttp2_stream_map_size(&t->stream_map) >=
+ t->settings[GRPC_ACKED_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS]) {
+ return GRPC_ERROR_CREATE_FROM_STATIC_STRING("Max stream count exceeded");
}
t->last_new_stream_id = t->incoming_stream_id;
s = t->incoming_stream =
diff --git a/src/core/ext/transport/inproc/inproc_transport.c b/src/core/ext/transport/inproc/inproc_transport.c
index 4df64d81e2..14498021eb 100644
--- a/src/core/ext/transport/inproc/inproc_transport.c
+++ b/src/core/ext/transport/inproc/inproc_transport.c
@@ -190,8 +190,11 @@ typedef struct inproc_stream {
static bool inproc_slice_byte_stream_next(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *bs, size_t max,
grpc_closure *on_complete) {
- inproc_slice_byte_stream *stream = (inproc_slice_byte_stream *)bs;
- return (stream->le->sb.count != 0);
+ // Because inproc transport always provides the entire message atomically,
+ // the byte stream always has data available when this function is called.
+ // Thus, this function always returns true (unlike other transports) and
+ // there is never any need to schedule a closure
+ return true;
}
static grpc_error *inproc_slice_byte_stream_pull(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.c b/src/core/lib/iomgr/ev_epoll1_linux.c
index 66ba601adb..b89b8af15a 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.c
+++ b/src/core/lib/iomgr/ev_epoll1_linux.c
@@ -45,6 +45,7 @@
#include "src/core/lib/iomgr/wakeup_fd_posix.h"
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/support/block_annotate.h"
+#include "src/core/lib/support/string.h"
static grpc_wakeup_fd global_wakeup_fd;
static int g_epfd;
@@ -77,8 +78,21 @@ static void fd_global_shutdown(void);
typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;
+static const char *kick_state_string(kick_state st) {
+ switch (st) {
+ case UNKICKED:
+ return "UNKICKED";
+ case KICKED:
+ return "KICKED";
+ case DESIGNATED_POLLER:
+ return "DESIGNATED_POLLER";
+ }
+ GPR_UNREACHABLE_CODE(return "UNKNOWN");
+}
+
struct grpc_pollset_worker {
kick_state kick_state;
+ int kick_state_mutator; // which line of code last changed kick state
bool initialized_cv;
grpc_pollset_worker *next;
grpc_pollset_worker *prev;
@@ -86,6 +100,12 @@ struct grpc_pollset_worker {
grpc_closure_list schedule_on_end_work;
};
+#define SET_KICK_STATE(worker, state) \
+ do { \
+ (worker)->kick_state = (state); \
+ (worker)->kick_state_mutator = __LINE__; \
+ } while (false)
+
#define MAX_NEIGHBOURHOODS 1024
typedef struct pollset_neighbourhood {
@@ -100,10 +120,15 @@ struct grpc_pollset {
bool reassigning_neighbourhood;
grpc_pollset_worker *root_worker;
bool kicked_without_poller;
+
+ /* Set to true if the pollset is observed to have no workers available to
+ * poll */
bool seen_inactive;
- bool shutting_down; /* Is the pollset shutting down ? */
- bool finish_shutdown_called; /* Is the 'finish_shutdown_locked()' called ? */
+ bool shutting_down; /* Is the pollset shutting down ? */
grpc_closure *shutdown_closure; /* Called after after shutdown is complete */
+
+ /* Number of workers who are *about-to* attach themselves to the pollset
+ * worker list */
int begin_refs;
grpc_pollset *next;
@@ -263,29 +288,23 @@ static bool fd_is_shutdown(grpc_fd *fd) {
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure);
+ grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
}
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
+ grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write");
}
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_pollset *notifier) {
- grpc_lfev_set_ready(exec_ctx, &fd->read_closure);
-
- /* Note, it is possible that fd_become_readable might be called twice with
- different 'notifier's when an fd becomes readable and it is in two epoll
- sets (This can happen briefly during polling island merges). In such cases
- it does not really matter which notifer is set as the read_notifier_pollset
- (They would both point to the same polling island anyway) */
+ grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read");
/* Use release store to match with acquire load in fd_get_read_notifier */
gpr_atm_rel_store(&fd->read_notifier_pollset, (gpr_atm)notifier);
}
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
+ grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
}
/*******************************************************************************
@@ -410,18 +429,28 @@ static grpc_error *pollset_kick_all(grpc_pollset *pollset) {
if (pollset->root_worker != NULL) {
grpc_pollset_worker *worker = pollset->root_worker;
do {
- if (worker->initialized_cv) {
- worker->kick_state = KICKED;
- gpr_cv_signal(&worker->cv);
- } else {
- worker->kick_state = KICKED;
- append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
- "pollset_shutdown");
+ switch (worker->kick_state) {
+ case KICKED:
+ break;
+ case UNKICKED:
+ SET_KICK_STATE(worker, KICKED);
+ if (worker->initialized_cv) {
+ gpr_cv_signal(&worker->cv);
+ }
+ break;
+ case DESIGNATED_POLLER:
+ SET_KICK_STATE(worker, KICKED);
+ append_error(&error, grpc_wakeup_fd_wakeup(&global_wakeup_fd),
+ "pollset_kick_all");
+ break;
}
worker = worker->next;
} while (worker != pollset->root_worker);
}
+ // TODO: sreek. Check if we need to set 'kicked_without_poller' to true here
+ // in the else case
+
return error;
}
@@ -437,7 +466,9 @@ static void pollset_maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure) {
GPR_ASSERT(pollset->shutdown_closure == NULL);
+ GPR_ASSERT(!pollset->shutting_down);
pollset->shutdown_closure = closure;
+ pollset->shutting_down = true;
GRPC_LOG_IF_ERROR("pollset_shutdown", pollset_kick_all(pollset));
pollset_maybe_finish_shutdown(exec_ctx, pollset);
}
@@ -510,10 +541,14 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
gpr_timespec deadline) {
if (worker_hdl != NULL) *worker_hdl = worker;
worker->initialized_cv = false;
- worker->kick_state = UNKICKED;
+ SET_KICK_STATE(worker, UNKICKED);
worker->schedule_on_end_work = (grpc_closure_list)GRPC_CLOSURE_LIST_INIT;
pollset->begin_refs++;
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, "PS:%p BEGIN_STARTS:%p", pollset, worker);
+ }
+
if (pollset->seen_inactive) {
// pollset has been observed to be inactive, we need to move back to the
// active list
@@ -529,6 +564,11 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
retry_lock_neighbourhood:
gpr_mu_lock(&neighbourhood->mu);
gpr_mu_lock(&pollset->mu);
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, "PS:%p BEGIN_REORG:%p kick_state=%s is_reassigning=%d",
+ pollset, worker, kick_state_string(worker->kick_state),
+ is_reassigning);
+ }
if (pollset->seen_inactive) {
if (neighbourhood != pollset->neighbourhood) {
gpr_mu_unlock(&neighbourhood->mu);
@@ -539,8 +579,14 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
pollset->seen_inactive = false;
if (neighbourhood->active_root == NULL) {
neighbourhood->active_root = pollset->next = pollset->prev = pollset;
- if (gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) {
- worker->kick_state = DESIGNATED_POLLER;
+ /* TODO: sreek. Why would this worker state be other than UNKICKED
+ * here ? (since the worker isn't added to the pollset yet, there is no
+ * way it can be "found" by other threads to get kicked). */
+
+ /* If there is no designated poller, make this the designated poller */
+ if (worker->kick_state == UNKICKED &&
+ gpr_atm_no_barrier_cas(&g_active_poller, 0, (gpr_atm)worker)) {
+ SET_KICK_STATE(worker, DESIGNATED_POLLER);
}
} else {
pollset->next = neighbourhood->active_root;
@@ -554,24 +600,53 @@ static bool begin_worker(grpc_pollset *pollset, grpc_pollset_worker *worker,
}
gpr_mu_unlock(&neighbourhood->mu);
}
+
worker_insert(pollset, worker);
pollset->begin_refs--;
- if (worker->kick_state == UNKICKED) {
+ if (worker->kick_state == UNKICKED && !pollset->kicked_without_poller) {
GPR_ASSERT(gpr_atm_no_barrier_load(&g_active_poller) != (gpr_atm)worker);
worker->initialized_cv = true;
gpr_cv_init(&worker->cv);
- while (worker->kick_state == UNKICKED &&
- pollset->shutdown_closure == NULL) {
+ while (worker->kick_state == UNKICKED && !pollset->shutting_down) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, "PS:%p BEGIN_WAIT:%p kick_state=%s shutdown=%d",
+ pollset, worker, kick_state_string(worker->kick_state),
+ pollset->shutting_down);
+ }
+
if (gpr_cv_wait(&worker->cv, &pollset->mu, deadline) &&
worker->kick_state == UNKICKED) {
- worker->kick_state = KICKED;
+ /* If gpr_cv_wait returns true (i.e a timeout), pretend that the worker
+ received a kick */
+ SET_KICK_STATE(worker, KICKED);
}
}
*now = gpr_now(now->clock_type);
}
- return worker->kick_state == DESIGNATED_POLLER &&
- pollset->shutdown_closure == NULL;
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR,
+ "PS:%p BEGIN_DONE:%p kick_state=%s shutdown=%d "
+ "kicked_without_poller: %d",
+ pollset, worker, kick_state_string(worker->kick_state),
+ pollset->shutting_down, pollset->kicked_without_poller);
+ }
+
+ /* We release pollset lock in this function at a couple of places:
+ * 1. Briefly when assigning pollset to a neighbourhood
+ * 2. When doing gpr_cv_wait()
+ * It is possible that 'kicked_without_poller' was set to true during (1) and
+ * 'shutting_down' is set to true during (1) or (2). If either of them is
+ * true, this worker cannot do polling */
+ /* TODO(sreek): Perhaps there is a better way to handle kicked_without_poller
+ * case; especially when the worker is the DESIGNATED_POLLER */
+
+ if (pollset->kicked_without_poller) {
+ pollset->kicked_without_poller = false;
+ return false;
+ }
+
+ return worker->kick_state == DESIGNATED_POLLER && !pollset->shutting_down;
}
static bool check_neighbourhood_for_available_poller(
@@ -591,10 +666,18 @@ static bool check_neighbourhood_for_available_poller(
case UNKICKED:
if (gpr_atm_no_barrier_cas(&g_active_poller, 0,
(gpr_atm)inspect_worker)) {
- inspect_worker->kick_state = DESIGNATED_POLLER;
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, " .. choose next poller to be %p",
+ inspect_worker);
+ }
+ SET_KICK_STATE(inspect_worker, DESIGNATED_POLLER);
if (inspect_worker->initialized_cv) {
gpr_cv_signal(&inspect_worker->cv);
}
+ } else {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, " .. beaten to choose next poller");
+ }
}
// even if we didn't win the cas, there's a worker, we can stop
found_worker = true;
@@ -607,9 +690,12 @@ static bool check_neighbourhood_for_available_poller(
break;
}
inspect_worker = inspect_worker->next;
- } while (inspect_worker != inspect->root_worker);
+ } while (!found_worker && inspect_worker != inspect->root_worker);
}
if (!found_worker) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, " .. mark pollset %p inactive", inspect);
+ }
inspect->seen_inactive = true;
if (inspect == neighbourhood->active_root) {
neighbourhood->active_root =
@@ -627,15 +713,22 @@ static bool check_neighbourhood_for_available_poller(
static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker *worker,
grpc_pollset_worker **worker_hdl) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, "PS:%p END_WORKER:%p", pollset, worker);
+ }
if (worker_hdl != NULL) *worker_hdl = NULL;
- worker->kick_state = KICKED;
+ /* Make sure we appear kicked */
+ SET_KICK_STATE(worker, KICKED);
grpc_closure_list_move(&worker->schedule_on_end_work,
&exec_ctx->closure_list);
if (gpr_atm_no_barrier_load(&g_active_poller) == (gpr_atm)worker) {
if (worker->next != worker && worker->next->kick_state == UNKICKED) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, " .. choose next poller to be peer %p", worker);
+ }
GPR_ASSERT(worker->next->initialized_cv);
gpr_atm_no_barrier_store(&g_active_poller, (gpr_atm)worker->next);
- worker->next->kick_state = DESIGNATED_POLLER;
+ SET_KICK_STATE(worker->next, DESIGNATED_POLLER);
gpr_cv_signal(&worker->next->cv);
if (grpc_exec_ctx_has_work(exec_ctx)) {
gpr_mu_unlock(&pollset->mu);
@@ -644,9 +737,9 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
} else {
gpr_atm_no_barrier_store(&g_active_poller, 0);
- gpr_mu_unlock(&pollset->mu);
size_t poller_neighbourhood_idx =
(size_t)(pollset->neighbourhood - g_neighbourhoods);
+ gpr_mu_unlock(&pollset->mu);
bool found_worker = false;
bool scan_state[MAX_NEIGHBOURHOODS];
for (size_t i = 0; !found_worker && i < g_num_neighbourhoods; i++) {
@@ -682,6 +775,9 @@ static void end_worker(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
if (worker->initialized_cv) {
gpr_cv_destroy(&worker->cv);
}
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_DEBUG, " .. remove worker");
+ }
if (EMPTIED == worker_remove(pollset, worker)) {
pollset_maybe_finish_shutdown(exec_ctx, pollset);
}
@@ -702,16 +798,18 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
pollset->kicked_without_poller = false;
return GRPC_ERROR_NONE;
}
- gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
if (begin_worker(pollset, &worker, worker_hdl, &now, deadline)) {
+ gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
- GPR_ASSERT(!pollset->shutdown_closure);
+ GPR_ASSERT(!pollset->shutting_down);
GPR_ASSERT(!pollset->seen_inactive);
gpr_mu_unlock(&pollset->mu);
append_error(&error, pollset_epoll(exec_ctx, pollset, now, deadline),
err_desc);
gpr_mu_lock(&pollset->mu);
gpr_tls_set(&g_current_thread_worker, 0);
+ } else {
+ gpr_tls_set(&g_current_thread_pollset, (intptr_t)pollset);
}
end_worker(exec_ctx, pollset, &worker, worker_hdl);
gpr_tls_set(&g_current_thread_pollset, 0);
@@ -720,46 +818,136 @@ static grpc_error *pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
static grpc_error *pollset_kick(grpc_pollset *pollset,
grpc_pollset_worker *specific_worker) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_strvec log;
+ gpr_strvec_init(&log);
+ char *tmp;
+ gpr_asprintf(
+ &tmp, "PS:%p KICK:%p curps=%p curworker=%p root=%p", pollset,
+ specific_worker, (void *)gpr_tls_get(&g_current_thread_pollset),
+ (void *)gpr_tls_get(&g_current_thread_worker), pollset->root_worker);
+ gpr_strvec_add(&log, tmp);
+ if (pollset->root_worker != NULL) {
+ gpr_asprintf(&tmp, " {kick_state=%s next=%p {kick_state=%s}}",
+ kick_state_string(pollset->root_worker->kick_state),
+ pollset->root_worker->next,
+ kick_state_string(pollset->root_worker->next->kick_state));
+ gpr_strvec_add(&log, tmp);
+ }
+ if (specific_worker != NULL) {
+ gpr_asprintf(&tmp, " worker_kick_state=%s",
+ kick_state_string(specific_worker->kick_state));
+ gpr_strvec_add(&log, tmp);
+ }
+ tmp = gpr_strvec_flatten(&log, NULL);
+ gpr_strvec_destroy(&log);
+ gpr_log(GPR_ERROR, "%s", tmp);
+ gpr_free(tmp);
+ }
if (specific_worker == NULL) {
if (gpr_tls_get(&g_current_thread_pollset) != (intptr_t)pollset) {
grpc_pollset_worker *root_worker = pollset->root_worker;
if (root_worker == NULL) {
pollset->kicked_without_poller = true;
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. kicked_without_poller");
+ }
return GRPC_ERROR_NONE;
}
grpc_pollset_worker *next_worker = root_worker->next;
- if (root_worker == next_worker &&
- root_worker == (grpc_pollset_worker *)gpr_atm_no_barrier_load(
- &g_active_poller)) {
- root_worker->kick_state = KICKED;
+ if (root_worker->kick_state == KICKED) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. already kicked %p", root_worker);
+ }
+ SET_KICK_STATE(root_worker, KICKED);
+ return GRPC_ERROR_NONE;
+ } else if (next_worker->kick_state == KICKED) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. already kicked %p", next_worker);
+ }
+ SET_KICK_STATE(next_worker, KICKED);
+ return GRPC_ERROR_NONE;
+ } else if (root_worker ==
+ next_worker && // only try and wake up a poller if
+ // there is no next worker
+ root_worker == (grpc_pollset_worker *)gpr_atm_no_barrier_load(
+ &g_active_poller)) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. kicked %p", root_worker);
+ }
+ SET_KICK_STATE(root_worker, KICKED);
return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
} else if (next_worker->kick_state == UNKICKED) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. kicked %p", next_worker);
+ }
GPR_ASSERT(next_worker->initialized_cv);
- next_worker->kick_state = KICKED;
+ SET_KICK_STATE(next_worker, KICKED);
gpr_cv_signal(&next_worker->cv);
return GRPC_ERROR_NONE;
+ } else if (next_worker->kick_state == DESIGNATED_POLLER) {
+ if (root_worker->kick_state != DESIGNATED_POLLER) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(
+ GPR_ERROR,
+ " .. kicked root non-poller %p (initialized_cv=%d) (poller=%p)",
+ root_worker, root_worker->initialized_cv, next_worker);
+ }
+ SET_KICK_STATE(root_worker, KICKED);
+ if (root_worker->initialized_cv) {
+ gpr_cv_signal(&root_worker->cv);
+ }
+ return GRPC_ERROR_NONE;
+ } else {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. non-root poller %p (root=%p)", next_worker,
+ root_worker);
+ }
+ SET_KICK_STATE(next_worker, KICKED);
+ return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
+ }
} else {
+ GPR_ASSERT(next_worker->kick_state == KICKED);
+ SET_KICK_STATE(next_worker, KICKED);
return GRPC_ERROR_NONE;
}
} else {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. kicked while waking up");
+ }
return GRPC_ERROR_NONE;
}
} else if (specific_worker->kick_state == KICKED) {
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. specific worker already kicked");
+ }
return GRPC_ERROR_NONE;
} else if (gpr_tls_get(&g_current_thread_worker) ==
(intptr_t)specific_worker) {
- specific_worker->kick_state = KICKED;
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. mark %p kicked", specific_worker);
+ }
+ SET_KICK_STATE(specific_worker, KICKED);
return GRPC_ERROR_NONE;
} else if (specific_worker ==
(grpc_pollset_worker *)gpr_atm_no_barrier_load(&g_active_poller)) {
- specific_worker->kick_state = KICKED;
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. kick active poller");
+ }
+ SET_KICK_STATE(specific_worker, KICKED);
return grpc_wakeup_fd_wakeup(&global_wakeup_fd);
} else if (specific_worker->initialized_cv) {
- specific_worker->kick_state = KICKED;
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. kick waiting worker");
+ }
+ SET_KICK_STATE(specific_worker, KICKED);
gpr_cv_signal(&specific_worker->cv);
return GRPC_ERROR_NONE;
} else {
- specific_worker->kick_state = KICKED;
+ if (GRPC_TRACER_ON(grpc_polling_trace)) {
+ gpr_log(GPR_ERROR, " .. kick non-waiting worker");
+ }
+ SET_KICK_STATE(specific_worker, KICKED);
return GRPC_ERROR_NONE;
}
}
@@ -805,6 +993,7 @@ static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
static void shutdown_engine(void) {
fd_global_shutdown();
pollset_global_shutdown();
+ close(g_epfd);
}
static const grpc_event_engine_vtable vtable = {
@@ -841,9 +1030,6 @@ static const grpc_event_engine_vtable vtable = {
/* It is possible that GLIBC has epoll but the underlying kernel doesn't.
* Create a dummy epoll_fd to make sure epoll support is available */
const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
- /* TODO(ctiller): temporary, until this stabilizes */
- if (!explicit_request) return NULL;
-
if (!grpc_has_wakeup_fd()) {
return NULL;
}
@@ -862,6 +1048,8 @@ const grpc_event_engine_vtable *grpc_init_epoll1_linux(bool explicit_request) {
return NULL;
}
+ gpr_log(GPR_ERROR, "grpc epoll fd: %d", g_epfd);
+
return &vtable;
}
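
Most of the ev_epoll1_linux.c hunk is tracing, but the structural change is that every kick-state transition now goes through SET_KICK_STATE, which also records the source line of the mutation in kick_state_mutator so a log dump or crash shows which code path last changed a worker's state. A stand-alone sketch of that pattern, with illustrative names:

#include <stdio.h>

typedef enum { UNKICKED, KICKED, DESIGNATED_POLLER } kick_state;

typedef struct {
  kick_state state;
  int state_mutator; /* __LINE__ of the code that last changed 'state' */
} worker;

#define SET_STATE(w, s)            \
  do {                             \
    (w)->state = (s);              \
    (w)->state_mutator = __LINE__; \
  } while (0)

int main(void) {
  worker w = {UNKICKED, 0};
  SET_STATE(&w, KICKED);
  printf("state=%d, last mutated at line %d\n", (int)w.state, w.state_mutator);
  return 0;
}
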
diff --git a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
index 27b4892d1d..5166dc2ac2 100644
--- a/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_limited_pollers_linux.c
@@ -1007,12 +1007,12 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure);
+ grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
}
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
+ grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write");
}
/*******************************************************************************
@@ -1223,7 +1223,7 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_pollset *notifier) {
- grpc_lfev_set_ready(exec_ctx, &fd->read_closure);
+ grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read");
/* Note, it is possible that fd_become_readable might be called twice with
different 'notifier's when an fd becomes readable and it is in two epoll
@@ -1235,7 +1235,7 @@ static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
}
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
+ grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
}
static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
index 49be72c03e..1058f69a83 100644
--- a/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
+++ b/src/core/lib/iomgr/ev_epoll_thread_pool_linux.c
@@ -560,12 +560,12 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure);
+ grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
}
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
+ grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write");
}
/*******************************************************************************
@@ -696,11 +696,11 @@ static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
}
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- grpc_lfev_set_ready(exec_ctx, &fd->read_closure);
+ grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read");
}
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
+ grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
}
static void pollset_release_epoll_set(grpc_exec_ctx *exec_ctx, grpc_pollset *ps,
diff --git a/src/core/lib/iomgr/ev_epollex_linux.c b/src/core/lib/iomgr/ev_epollex_linux.c
index 5690431759..a2fa4ad9a5 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.c
+++ b/src/core/lib/iomgr/ev_epollex_linux.c
@@ -438,12 +438,12 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure);
+ grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
}
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
+ grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write");
}
/*******************************************************************************
@@ -710,7 +710,7 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_pollset *notifier) {
- grpc_lfev_set_ready(exec_ctx, &fd->read_closure);
+ grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read");
/* Note, it is possible that fd_become_readable might be called twice with
different 'notifier's when an fd becomes readable and it is in two epoll
@@ -722,7 +722,7 @@ static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
}
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
+ grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
}
static grpc_error *fd_become_pollable_locked(grpc_fd *fd) {
@@ -1049,8 +1049,8 @@ static grpc_error *pollset_add_fd_locked(grpc_exec_ctx *exec_ctx,
/* Introduce a spurious completion.
If we do not, then it may be that the fd-specific epoll set consumed
a completion without being polled, leading to a missed edge going up. */
- grpc_lfev_set_ready(exec_ctx, &had_fd->read_closure);
- grpc_lfev_set_ready(exec_ctx, &had_fd->write_closure);
+ grpc_lfev_set_ready(exec_ctx, &had_fd->read_closure, "read");
+ grpc_lfev_set_ready(exec_ctx, &had_fd->write_closure, "write");
pollset_kick_all(exec_ctx, pollset);
pollset->current_pollable = &pollset->pollable;
if (append_error(&error, pollable_materialize(&pollset->pollable),
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.c b/src/core/lib/iomgr/ev_epollsig_linux.c
index 0d969dccce..a84c208e1a 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.c
+++ b/src/core/lib/iomgr/ev_epollsig_linux.c
@@ -933,12 +933,12 @@ static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_error *why) {
static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure);
+ grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
}
static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_closure *closure) {
- grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure);
+ grpc_lfev_notify_on(exec_ctx, &fd->write_closure, closure, "write");
}
/*******************************************************************************
@@ -1115,7 +1115,7 @@ static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
grpc_pollset *notifier) {
- grpc_lfev_set_ready(exec_ctx, &fd->read_closure);
+ grpc_lfev_set_ready(exec_ctx, &fd->read_closure, "read");
/* Note, it is possible that fd_become_readable might be called twice with
different 'notifier's when an fd becomes readable and it is in two epoll
@@ -1127,7 +1127,7 @@ static void fd_become_readable(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
}
static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
- grpc_lfev_set_ready(exec_ctx, &fd->write_closure);
+ grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
}
static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx,
@@ -1731,7 +1731,7 @@ const grpc_event_engine_vtable *grpc_init_epollsig_linux(
if (!is_grpc_wakeup_signal_initialized) {
/* TODO(ctiller): when other epoll engines are ready, remove the true || to
* force this to be explitly chosen if needed */
- if (true || explicit_request) {
+ if (explicit_request) {
grpc_use_signal(SIGRTMIN + 6);
} else {
return NULL;
diff --git a/src/core/lib/iomgr/iomgr_uv.c b/src/core/lib/iomgr/iomgr_uv.c
index 8b1245c640..df5d23af3b 100644
--- a/src/core/lib/iomgr/iomgr_uv.c
+++ b/src/core/lib/iomgr/iomgr_uv.c
@@ -21,12 +21,20 @@
#ifdef GRPC_UV
#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/iomgr/executor.h"
+#include "src/core/lib/iomgr/iomgr_uv.h"
#include "src/core/lib/iomgr/pollset_uv.h"
#include "src/core/lib/iomgr/tcp_uv.h"
+gpr_thd_id g_init_thread;
+
void grpc_iomgr_platform_init(void) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_pollset_global_init();
grpc_register_tracer(&grpc_tcp_trace);
+ grpc_executor_set_threading(&exec_ctx, false);
+ g_init_thread = gpr_thd_currentid();
+ grpc_exec_ctx_finish(&exec_ctx);
}
void grpc_iomgr_platform_flush(void) {}
void grpc_iomgr_platform_shutdown(void) { grpc_pollset_global_shutdown(); }
diff --git a/src/core/lib/iomgr/iomgr_uv.h b/src/core/lib/iomgr/iomgr_uv.h
new file mode 100644
index 0000000000..3b4daaa73b
--- /dev/null
+++ b/src/core/lib/iomgr/iomgr_uv.h
@@ -0,0 +1,37 @@
+/*
+ *
+ * Copyright 2017 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_IOMGR_UV_H
+#define GRPC_CORE_LIB_IOMGR_IOMGR_UV_H
+
+#include "src/core/lib/iomgr/iomgr_internal.h"
+
+#include <grpc/support/thd.h>
+
+/* The thread ID of the thread on which grpc was initialized. Used to verify
+ * that all calls into libuv are made on that same thread */
+extern gpr_thd_id g_init_thread;
+
+#ifdef GRPC_UV_THREAD_CHECK
+#define GRPC_UV_ASSERT_SAME_THREAD() \
+ GPR_ASSERT(gpr_thd_currentid() == g_init_thread)
+#else
+#define GRPC_UV_ASSERT_SAME_THREAD()
+#endif /* GRPC_UV_THREAD_CHECK */
+
+#endif /* GRPC_CORE_LIB_IOMGR_IOMGR_UV_H */
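
The new iomgr_uv.h header pairs with the iomgr_uv.c change above: grpc_iomgr_platform_init() records the initializing thread in g_init_thread, and when GRPC_UV_THREAD_CHECK is defined, GRPC_UV_ASSERT_SAME_THREAD() asserts that libuv-backed entry points (pollset, resolver, tcp) run on that same thread, since libuv loops and handles are not thread-safe. A stand-alone sketch of the same guard using pthreads rather than gpr_thd:

#include <assert.h>
#include <pthread.h>
#include <stdio.h>

static pthread_t g_init_thread;

/* Remember the thread that initialized the event loop. */
static void iomgr_platform_init(void) { g_init_thread = pthread_self(); }

#define UV_ASSERT_SAME_THREAD() \
  assert(pthread_equal(pthread_self(), g_init_thread) && "called off the libuv thread")

static void pollset_kick(void) {
  UV_ASSERT_SAME_THREAD(); /* libuv handles must only be touched from the loop thread */
}

int main(void) {
  iomgr_platform_init();
  pollset_kick();
  printf("ok\n");
  return 0;
}
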
diff --git a/src/core/lib/iomgr/lockfree_event.c b/src/core/lib/iomgr/lockfree_event.c
index c2ceecb3c5..f967b22ba9 100644
--- a/src/core/lib/iomgr/lockfree_event.c
+++ b/src/core/lib/iomgr/lockfree_event.c
@@ -79,12 +79,12 @@ bool grpc_lfev_is_shutdown(gpr_atm *state) {
}
void grpc_lfev_notify_on(grpc_exec_ctx *exec_ctx, gpr_atm *state,
- grpc_closure *closure) {
+ grpc_closure *closure, const char *variable) {
while (true) {
gpr_atm curr = gpr_atm_no_barrier_load(state);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, "lfev_notify_on: %p curr=%p closure=%p", state,
- (void *)curr, closure);
+ gpr_log(GPR_ERROR, "lfev_notify_on[%s]: %p curr=%p closure=%p", variable,
+ state, (void *)curr, closure);
}
switch (curr) {
case CLOSURE_NOT_READY: {
@@ -149,7 +149,7 @@ bool grpc_lfev_set_shutdown(grpc_exec_ctx *exec_ctx, gpr_atm *state,
while (true) {
gpr_atm curr = gpr_atm_no_barrier_load(state);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, "lfev_set_shutdown: %p curr=%p err=%s", state,
+ gpr_log(GPR_ERROR, "lfev_set_shutdown: %p curr=%p err=%s", state,
(void *)curr, grpc_error_string(shutdown_err));
}
switch (curr) {
@@ -193,12 +193,14 @@ bool grpc_lfev_set_shutdown(grpc_exec_ctx *exec_ctx, gpr_atm *state,
GPR_UNREACHABLE_CODE(return false);
}
-void grpc_lfev_set_ready(grpc_exec_ctx *exec_ctx, gpr_atm *state) {
+void grpc_lfev_set_ready(grpc_exec_ctx *exec_ctx, gpr_atm *state,
+ const char *variable) {
while (true) {
gpr_atm curr = gpr_atm_no_barrier_load(state);
if (GRPC_TRACER_ON(grpc_polling_trace)) {
- gpr_log(GPR_DEBUG, "lfev_set_ready: %p curr=%p", state, (void *)curr);
+ gpr_log(GPR_ERROR, "lfev_set_ready[%s]: %p curr=%p", variable, state,
+ (void *)curr);
}
switch (curr) {
diff --git a/src/core/lib/iomgr/lockfree_event.h b/src/core/lib/iomgr/lockfree_event.h
index ef3844a07a..6a14a0f3b2 100644
--- a/src/core/lib/iomgr/lockfree_event.h
+++ b/src/core/lib/iomgr/lockfree_event.h
@@ -30,10 +30,11 @@ void grpc_lfev_destroy(gpr_atm *state);
bool grpc_lfev_is_shutdown(gpr_atm *state);
void grpc_lfev_notify_on(grpc_exec_ctx *exec_ctx, gpr_atm *state,
- grpc_closure *closure);
+ grpc_closure *closure, const char *variable);
/* Returns true on first successful shutdown */
bool grpc_lfev_set_shutdown(grpc_exec_ctx *exec_ctx, gpr_atm *state,
grpc_error *shutdown_err);
-void grpc_lfev_set_ready(grpc_exec_ctx *exec_ctx, gpr_atm *state);
+void grpc_lfev_set_ready(grpc_exec_ctx *exec_ctx, gpr_atm *state,
+ const char *variable);
#endif /* GRPC_CORE_LIB_IOMGR_LOCKFREE_EVENT_H */
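The new const char *variable argument is purely a label that shows up in the polling trace output above. A sketch of how a poller implementation might tag the events it tracks (the fd struct and helper names here are illustrative, not the real ev_epoll* code):

#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/lockfree_event.h"

typedef struct {
  gpr_atm read_closure;
  gpr_atm write_closure;
} sketch_fd;

static void sketch_notify_on_read(grpc_exec_ctx *exec_ctx, sketch_fd *fd,
                                  grpc_closure *closure) {
  /* trace lines will read "lfev_notify_on[read]: ..." */
  grpc_lfev_notify_on(exec_ctx, &fd->read_closure, closure, "read");
}

static void sketch_set_write_ready(grpc_exec_ctx *exec_ctx, sketch_fd *fd) {
  /* trace lines will read "lfev_set_ready[write]: ..." */
  grpc_lfev_set_ready(exec_ctx, &fd->write_closure, "write");
}
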
diff --git a/src/core/lib/iomgr/pollset_uv.c b/src/core/lib/iomgr/pollset_uv.c
index 946f0c8c60..a79fe89d3e 100644
--- a/src/core/lib/iomgr/pollset_uv.c
+++ b/src/core/lib/iomgr/pollset_uv.c
@@ -28,6 +28,7 @@
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
+#include "src/core/lib/iomgr/iomgr_uv.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/pollset_uv.h"
@@ -70,6 +71,7 @@ void grpc_pollset_global_init(void) {
}
void grpc_pollset_global_shutdown(void) {
+ GRPC_UV_ASSERT_SAME_THREAD();
gpr_mu_destroy(&grpc_polling_mu);
uv_close((uv_handle_t *)dummy_uv_handle, dummy_handle_close_cb);
}
@@ -79,6 +81,7 @@ static void timer_run_cb(uv_timer_t *timer) {}
static void timer_close_cb(uv_handle_t *handle) { handle->data = (void *)1; }
void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
+ GRPC_UV_ASSERT_SAME_THREAD();
*mu = &grpc_polling_mu;
uv_timer_init(uv_default_loop(), &pollset->timer);
pollset->shutting_down = 0;
@@ -87,6 +90,7 @@ void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_closure *closure) {
GPR_ASSERT(!pollset->shutting_down);
+ GRPC_UV_ASSERT_SAME_THREAD();
pollset->shutting_down = 1;
if (grpc_pollset_work_run_loop) {
// Drain any pending UV callbacks without blocking
@@ -99,6 +103,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
}
void grpc_pollset_destroy(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
+ GRPC_UV_ASSERT_SAME_THREAD();
uv_close((uv_handle_t *)&pollset->timer, timer_close_cb);
// timer.data is a boolean indicating that the timer has finished closing
pollset->timer.data = (void *)0;
@@ -113,6 +118,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker_hdl,
gpr_timespec now, gpr_timespec deadline) {
uint64_t timeout;
+ GRPC_UV_ASSERT_SAME_THREAD();
gpr_mu_unlock(&grpc_polling_mu);
if (grpc_pollset_work_run_loop) {
if (gpr_time_cmp(deadline, now) >= 0) {
@@ -141,6 +147,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_error *grpc_pollset_kick(grpc_pollset *pollset,
grpc_pollset_worker *specific_worker) {
+ GRPC_UV_ASSERT_SAME_THREAD();
uv_timer_start(dummy_uv_handle, dummy_timer_cb, 0, 0);
return GRPC_ERROR_NONE;
}
diff --git a/src/core/lib/iomgr/resolve_address_uv.c b/src/core/lib/iomgr/resolve_address_uv.c
index a98b8e62db..2d438e8b48 100644
--- a/src/core/lib/iomgr/resolve_address_uv.c
+++ b/src/core/lib/iomgr/resolve_address_uv.c
@@ -30,6 +30,7 @@
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/iomgr_uv.h"
#include "src/core/lib/iomgr/resolve_address.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
@@ -114,11 +115,14 @@ static void getaddrinfo_callback(uv_getaddrinfo_t *req, int status,
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_error *error;
int retry_status;
+ char *port = r->port;
gpr_free(req);
retry_status = retry_named_port_failure(status, r, getaddrinfo_callback);
if (retry_status == 0) {
- // The request is being retried. Nothing should be done here
+ /* The request is being retried. It is using its own port string, so we free
+ * the original one */
+ gpr_free(port);
return;
}
/* Either no retry was attempted, or the retry failed. Either way, the
@@ -171,6 +175,8 @@ static grpc_error *blocking_resolve_address_impl(
grpc_error *err;
int retry_status;
+ GRPC_UV_ASSERT_SAME_THREAD();
+
req.addrinfo = NULL;
err = try_split_host_port(name, default_port, &host, &port);
@@ -218,16 +224,19 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
grpc_pollset_set *interested_parties,
grpc_closure *on_done,
grpc_resolved_addresses **addrs) {
- uv_getaddrinfo_t *req;
- request *r;
- struct addrinfo *hints;
- char *host;
- char *port;
+ uv_getaddrinfo_t *req = NULL;
+ request *r = NULL;
+ struct addrinfo *hints = NULL;
+ char *host = NULL;
+ char *port = NULL;
grpc_error *err;
int s;
+ GRPC_UV_ASSERT_SAME_THREAD();
err = try_split_host_port(name, default_port, &host, &port);
if (err != GRPC_ERROR_NONE) {
GRPC_CLOSURE_SCHED(exec_ctx, on_done, err);
+ gpr_free(host);
+ gpr_free(port);
return;
}
r = gpr_malloc(sizeof(request));
diff --git a/src/core/lib/iomgr/sockaddr_utils.c b/src/core/lib/iomgr/sockaddr_utils.c
index 99dc2f1c78..3f4145d104 100644
--- a/src/core/lib/iomgr/sockaddr_utils.c
+++ b/src/core/lib/iomgr/sockaddr_utils.c
@@ -220,6 +220,11 @@ const char *grpc_sockaddr_get_uri_scheme(
return NULL;
}
+int grpc_sockaddr_get_family(const grpc_resolved_address *resolved_addr) {
+ const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr;
+ return addr->sa_family;
+}
+
int grpc_sockaddr_get_port(const grpc_resolved_address *resolved_addr) {
const struct sockaddr *addr = (const struct sockaddr *)resolved_addr->addr;
switch (addr->sa_family) {
diff --git a/src/core/lib/iomgr/sockaddr_utils.h b/src/core/lib/iomgr/sockaddr_utils.h
index 7692b969f2..a589a19705 100644
--- a/src/core/lib/iomgr/sockaddr_utils.h
+++ b/src/core/lib/iomgr/sockaddr_utils.h
@@ -75,4 +75,6 @@ char *grpc_sockaddr_to_uri(const grpc_resolved_address *addr);
/* Returns the URI scheme corresponding to \a addr */
const char *grpc_sockaddr_get_uri_scheme(const grpc_resolved_address *addr);
+int grpc_sockaddr_get_family(const grpc_resolved_address *resolved_addr);
+
#endif /* GRPC_CORE_LIB_IOMGR_SOCKADDR_UTILS_H */
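grpc_sockaddr_get_family just exposes sa_family from a resolved address; the tcp_server_uv.c change below uses it so libuv can create the socket with the right family up front. A minimal sketch of that usage (error handling elided, helper name hypothetical):

#include <uv.h>
#include "src/core/lib/iomgr/sockaddr_utils.h"

static int sketch_init_uv_handle(const grpc_resolved_address *addr,
                                 uv_tcp_t *handle) {
  int family = grpc_sockaddr_get_family(addr);
  /* uv_tcp_init_ex creates the underlying socket immediately, so
     socket options can be applied before bind */
  return uv_tcp_init_ex(uv_default_loop(), handle, (unsigned int)family);
}
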
diff --git a/src/core/lib/iomgr/tcp_client_uv.c b/src/core/lib/iomgr/tcp_client_uv.c
index 2f1d237c07..786c456b73 100644
--- a/src/core/lib/iomgr/tcp_client_uv.c
+++ b/src/core/lib/iomgr/tcp_client_uv.c
@@ -26,6 +26,7 @@
#include <grpc/support/log.h>
#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/iomgr_uv.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/iomgr/tcp_uv.h"
@@ -124,6 +125,8 @@ static void tcp_client_connect_impl(grpc_exec_ctx *exec_ctx,
(void)channel_args;
(void)interested_parties;
+ GRPC_UV_ASSERT_SAME_THREAD();
+
if (channel_args != NULL) {
for (size_t i = 0; i < channel_args->num_args; i++) {
if (0 == strcmp(channel_args->args[i].key, GRPC_ARG_RESOURCE_QUOTA)) {
diff --git a/src/core/lib/iomgr/tcp_server_uv.c b/src/core/lib/iomgr/tcp_server_uv.c
index 2ab836cc34..3b9332321f 100644
--- a/src/core/lib/iomgr/tcp_server_uv.c
+++ b/src/core/lib/iomgr/tcp_server_uv.c
@@ -20,6 +20,7 @@
#ifdef GRPC_UV
+#include <assert.h>
#include <string.h>
#include <grpc/support/alloc.h>
@@ -27,6 +28,7 @@
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/iomgr/iomgr_uv.h"
#include "src/core/lib/iomgr/sockaddr.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
#include "src/core/lib/iomgr/tcp_server.h"
@@ -43,6 +45,8 @@ struct grpc_tcp_listener {
struct grpc_tcp_listener *next;
bool closed;
+
+ bool has_pending_connection;
};
struct grpc_tcp_server {
@@ -104,6 +108,7 @@ grpc_error *grpc_tcp_server_create(grpc_exec_ctx *exec_ctx,
}
grpc_tcp_server *grpc_tcp_server_ref(grpc_tcp_server *s) {
+ GRPC_UV_ASSERT_SAME_THREAD();
gpr_ref(&s->refs);
return s;
}
@@ -168,6 +173,7 @@ static void tcp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
}
void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
+ GRPC_UV_ASSERT_SAME_THREAD();
if (gpr_unref(&s->refs)) {
/* Complete shutdown_starting work before destroying. */
grpc_exec_ctx local_exec_ctx = GRPC_EXEC_CTX_INIT;
@@ -183,18 +189,49 @@ void grpc_tcp_server_unref(grpc_exec_ctx *exec_ctx, grpc_tcp_server *s) {
}
}
-static void accepted_connection_close_cb(uv_handle_t *handle) {
- gpr_free(handle);
-}
-
-static void on_connect(uv_stream_t *server, int status) {
- grpc_tcp_listener *sp = (grpc_tcp_listener *)server->data;
+static void finish_accept(grpc_exec_ctx *exec_ctx, grpc_tcp_listener *sp) {
+ grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor));
uv_tcp_t *client;
grpc_endpoint *ep = NULL;
- grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_resolved_address peer_name;
char *peer_name_string;
int err;
+ uv_tcp_t *server = sp->handle;
+
+ client = gpr_malloc(sizeof(uv_tcp_t));
+ uv_tcp_init(uv_default_loop(), client);
+ // UV documentation says this is guaranteed to succeed
+ uv_accept((uv_stream_t *)server, (uv_stream_t *)client);
+ peer_name_string = NULL;
+ memset(&peer_name, 0, sizeof(grpc_resolved_address));
+ peer_name.len = sizeof(struct sockaddr_storage);
+ err = uv_tcp_getpeername(client, (struct sockaddr *)&peer_name.addr,
+ (int *)&peer_name.len);
+ if (err == 0) {
+ peer_name_string = grpc_sockaddr_to_uri(&peer_name);
+ } else {
+ gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s", uv_strerror(err));
+ }
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ if (peer_name_string) {
+ gpr_log(GPR_DEBUG, "SERVER_CONNECT: %p accepted connection: %s",
+ sp->server, peer_name_string);
+ } else {
+ gpr_log(GPR_DEBUG, "SERVER_CONNECT: %p accepted connection", sp->server);
+ }
+ }
+ ep = grpc_tcp_create(client, sp->server->resource_quota, peer_name_string);
+ acceptor->from_server = sp->server;
+ acceptor->port_index = sp->port_index;
+ acceptor->fd_index = 0;
+ sp->server->on_accept_cb(exec_ctx, sp->server->on_accept_cb_arg, ep, NULL,
+ acceptor);
+ gpr_free(peer_name_string);
+}
+
+static void on_connect(uv_stream_t *server, int status) {
+ grpc_tcp_listener *sp = (grpc_tcp_listener *)server->data;
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
if (status < 0) {
switch (status) {
@@ -207,35 +244,19 @@ static void on_connect(uv_stream_t *server, int status) {
}
}
- client = gpr_malloc(sizeof(uv_tcp_t));
- uv_tcp_init(uv_default_loop(), client);
- // UV documentation says this is guaranteed to succeed
- uv_accept((uv_stream_t *)server, (uv_stream_t *)client);
- // If the server has not been started, we discard incoming connections
- if (sp->server->on_accept_cb == NULL) {
- uv_close((uv_handle_t *)client, accepted_connection_close_cb);
+ GPR_ASSERT(!sp->has_pending_connection);
+
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "SERVER_CONNECT: %p incoming connection", sp->server);
+ }
+
+ // Create acceptor.
+ if (sp->server->on_accept_cb) {
+ finish_accept(&exec_ctx, sp);
} else {
- peer_name_string = NULL;
- memset(&peer_name, 0, sizeof(grpc_resolved_address));
- peer_name.len = sizeof(struct sockaddr_storage);
- err = uv_tcp_getpeername(client, (struct sockaddr *)&peer_name.addr,
- (int *)&peer_name.len);
- if (err == 0) {
- peer_name_string = grpc_sockaddr_to_uri(&peer_name);
- } else {
- gpr_log(GPR_INFO, "uv_tcp_getpeername error: %s", uv_strerror(status));
- }
- ep = grpc_tcp_create(client, sp->server->resource_quota, peer_name_string);
- // Create acceptor.
- grpc_tcp_server_acceptor *acceptor = gpr_malloc(sizeof(*acceptor));
- acceptor->from_server = sp->server;
- acceptor->port_index = sp->port_index;
- acceptor->fd_index = 0;
- sp->server->on_accept_cb(&exec_ctx, sp->server->on_accept_cb_arg, ep, NULL,
- acceptor);
- grpc_exec_ctx_finish(&exec_ctx);
- gpr_free(peer_name_string);
+ sp->has_pending_connection = true;
}
+ grpc_exec_ctx_finish(&exec_ctx);
}
static grpc_error *add_socket_to_server(grpc_tcp_server *s, uv_tcp_t *handle,
@@ -282,7 +303,7 @@ static grpc_error *add_socket_to_server(grpc_tcp_server *s, uv_tcp_t *handle,
GPR_ASSERT(port >= 0);
GPR_ASSERT(!s->on_accept_cb && "must add ports before starting server");
- sp = gpr_malloc(sizeof(grpc_tcp_listener));
+ sp = gpr_zalloc(sizeof(grpc_tcp_listener));
sp->next = NULL;
if (s->head == NULL) {
s->head = sp;
@@ -316,6 +337,9 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s,
unsigned port_index = 0;
int status;
grpc_error *error = GRPC_ERROR_NONE;
+ int family;
+
+ GRPC_UV_ASSERT_SAME_THREAD();
if (s->tail != NULL) {
port_index = s->tail->port_index + 1;
@@ -353,7 +377,18 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s,
}
handle = gpr_malloc(sizeof(uv_tcp_t));
- status = uv_tcp_init(uv_default_loop(), handle);
+
+ family = grpc_sockaddr_get_family(addr);
+ status = uv_tcp_init_ex(uv_default_loop(), handle, (unsigned int)family);
+#if defined(GPR_LINUX) && defined(SO_REUSEPORT)
+ if (family == AF_INET || family == AF_INET6) {
+ int fd;
+ uv_fileno((uv_handle_t *)handle, &fd);
+ int enable = 1;
+ setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable));
+ }
+#endif /* GPR_LINUX && SO_REUSEPORT */
+
if (status == 0) {
error = add_socket_to_server(s, handle, addr, port_index, &sp);
} else {
@@ -366,6 +401,18 @@ grpc_error *grpc_tcp_server_add_port(grpc_tcp_server *s,
gpr_free(allocated_addr);
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ char *port_string;
+ grpc_sockaddr_to_string(&port_string, addr, 0);
+ const char *str = grpc_error_string(error);
+ if (port_string) {
+ gpr_log(GPR_DEBUG, "SERVER %p add_port %s error=%s", s, port_string, str);
+ gpr_free(port_string);
+ } else {
+ gpr_log(GPR_DEBUG, "SERVER %p add_port error=%s", s, str);
+ }
+ }
+
if (error != GRPC_ERROR_NONE) {
grpc_error *error_out = GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
"Failed to add port to server", &error, 1);
@@ -385,13 +432,19 @@ void grpc_tcp_server_start(grpc_exec_ctx *exec_ctx, grpc_tcp_server *server,
grpc_tcp_listener *sp;
(void)pollsets;
(void)pollset_count;
+ GRPC_UV_ASSERT_SAME_THREAD();
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ gpr_log(GPR_DEBUG, "SERVER_START %p", server);
+ }
GPR_ASSERT(on_accept_cb);
GPR_ASSERT(!server->on_accept_cb);
server->on_accept_cb = on_accept_cb;
server->on_accept_cb_arg = cb_arg;
for (sp = server->head; sp; sp = sp->next) {
- GPR_ASSERT(uv_listen((uv_stream_t *)sp->handle, SOMAXCONN, on_connect) ==
- 0);
+ if (sp->has_pending_connection) {
+ finish_accept(exec_ctx, sp);
+ sp->has_pending_connection = false;
+ }
}
}
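The SO_REUSEPORT block in grpc_tcp_server_add_port above relies on uv_tcp_init_ex having already created the socket. A standalone sketch of that option setup (Linux-oriented; uv_os_fd_t is a plain int on POSIX, and unlike the diff this sketch also checks the uv_fileno return value):

#include <sys/socket.h>
#include <uv.h>

static void sketch_try_enable_reuseport(uv_tcp_t *handle) {
#if defined(SO_REUSEPORT)
  uv_os_fd_t fd;
  if (uv_fileno((uv_handle_t *)handle, &fd) == 0) {
    int enable = 1;
    setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable));
  }
#else
  (void)handle;
#endif
}
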
diff --git a/src/core/lib/iomgr/tcp_uv.c b/src/core/lib/iomgr/tcp_uv.c
index 7c6a9b85f5..a05c19b4ac 100644
--- a/src/core/lib/iomgr/tcp_uv.c
+++ b/src/core/lib/iomgr/tcp_uv.c
@@ -30,6 +30,7 @@
#include <grpc/support/string_util.h>
#include "src/core/lib/iomgr/error.h"
+#include "src/core/lib/iomgr/iomgr_uv.h"
#include "src/core/lib/iomgr/network_status_tracker.h"
#include "src/core/lib/iomgr/resource_quota.h"
#include "src/core/lib/iomgr/tcp_uv.h"
@@ -183,6 +184,7 @@ static void uv_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_tcp *tcp = (grpc_tcp *)ep;
int status;
grpc_error *error = GRPC_ERROR_NONE;
+ GRPC_UV_ASSERT_SAME_THREAD();
GPR_ASSERT(tcp->read_cb == NULL);
tcp->read_cb = cb;
tcp->read_slices = read_slices;
@@ -236,6 +238,7 @@ static void uv_endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
unsigned int i;
grpc_slice *slice;
uv_write_t *write_req;
+ GRPC_UV_ASSERT_SAME_THREAD();
if (GRPC_TRACER_ON(grpc_tcp_trace)) {
size_t j;
@@ -307,6 +310,10 @@ static void uv_endpoint_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
grpc_error *why) {
grpc_tcp *tcp = (grpc_tcp *)ep;
if (!tcp->shutting_down) {
+ if (GRPC_TRACER_ON(grpc_tcp_trace)) {
+ const char *str = grpc_error_string(why);
+ gpr_log(GPR_DEBUG, "TCP %p shutdown why=%s", tcp->handle, str);
+ }
tcp->shutting_down = true;
uv_shutdown_t *req = &tcp->shutdown_req;
uv_shutdown(req, (uv_stream_t *)tcp->handle, shutdown_callback);
diff --git a/src/core/lib/iomgr/timer_uv.c b/src/core/lib/iomgr/timer_uv.c
index 1ab82ef1d5..70f49bcbe8 100644
--- a/src/core/lib/iomgr/timer_uv.c
+++ b/src/core/lib/iomgr/timer_uv.c
@@ -24,6 +24,7 @@
#include <grpc/support/log.h>
#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/iomgr/iomgr_uv.h"
#include "src/core/lib/iomgr/timer.h"
#include <uv.h>
@@ -43,6 +44,7 @@ static void stop_uv_timer(uv_timer_t *handle) {
void run_expired_timer(uv_timer_t *handle) {
grpc_timer *timer = (grpc_timer *)handle->data;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ GRPC_UV_ASSERT_SAME_THREAD();
GPR_ASSERT(timer->pending);
timer->pending = 0;
GRPC_CLOSURE_SCHED(&exec_ctx, timer->closure, GRPC_ERROR_NONE);
@@ -55,6 +57,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
gpr_timespec now) {
uint64_t timeout;
uv_timer_t *uv_timer;
+ GRPC_UV_ASSERT_SAME_THREAD();
timer->closure = closure;
if (gpr_time_cmp(deadline, now) <= 0) {
timer->pending = 0;
@@ -75,6 +78,7 @@ void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
}
void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
+ GRPC_UV_ASSERT_SAME_THREAD();
if (timer->pending) {
timer->pending = 0;
GRPC_CLOSURE_SCHED(exec_ctx, timer->closure, GRPC_ERROR_CANCELLED);
diff --git a/src/core/lib/support/env.h b/src/core/lib/support/env.h
index 18bc08ac62..e2c012a728 100644
--- a/src/core/lib/support/env.h
+++ b/src/core/lib/support/env.h
@@ -36,6 +36,12 @@ char *gpr_getenv(const char *name);
/* Sets the environment with the specified name to the specified value. */
void gpr_setenv(const char *name, const char *value);
+/* A version of gpr_getenv that produces no log output if it has to fall back
+   to an insecure environment read. It exists ONLY so that logging can read the
+   env variable that controls its own verbosity before the logger is configured.
+   DO NOT USE IT anywhere else. */
+const char *gpr_getenv_silent(const char *name, char **dst);
+
#ifdef __cplusplus
}
#endif
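The point of the split is that gpr_getenv_silent reports the name of the insecure read function it had to fall back to (or NULL) instead of logging immediately, so its one legitimate caller can defer the warning until logging is configured. A sketch of that pattern, mirroring gpr_log_verbosity_init further down (the wrapper function itself is hypothetical):

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include "src/core/lib/support/env.h"

static void sketch_read_verbosity_env(void) {
  char *value = NULL;
  const char *insecure_fn = gpr_getenv_silent("GRPC_VERBOSITY", &value);
  /* ... use value to configure verbosity before logging anything ... */
  if (insecure_fn != NULL) {
    gpr_log(GPR_DEBUG, "Warning: insecure environment read function '%s' used",
            insecure_fn);
  }
  gpr_free(value);
}
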
diff --git a/src/core/lib/support/env_linux.c b/src/core/lib/support/env_linux.c
index 0c79a2c401..4c45a977ca 100644
--- a/src/core/lib/support/env_linux.c
+++ b/src/core/lib/support/env_linux.c
@@ -38,7 +38,9 @@
#include "src/core/lib/support/string.h"
-char *gpr_getenv(const char *name) {
+const char *gpr_getenv_silent(const char *name, char **dst) {
+ const char *insecure_func_used = NULL;
+ char *result = NULL;
#if defined(GPR_BACKWARDS_COMPATIBILITY_MODE)
typedef char *(*getenv_type)(const char *);
static getenv_type getenv_func = NULL;
@@ -48,22 +50,28 @@ char *gpr_getenv(const char *name) {
for (size_t i = 0; getenv_func == NULL && i < GPR_ARRAY_SIZE(names); i++) {
getenv_func = (getenv_type)dlsym(RTLD_DEFAULT, names[i]);
if (getenv_func != NULL && strstr(names[i], "secure") == NULL) {
- gpr_log(GPR_DEBUG,
- "Warning: insecure environment read function '%s' used",
- names[i]);
+ insecure_func_used = names[i];
}
}
- char *result = getenv_func(name);
- return result == NULL ? result : gpr_strdup(result);
+ result = getenv_func(name);
#elif __GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 17)
- char *result = secure_getenv(name);
- return result == NULL ? result : gpr_strdup(result);
+ result = secure_getenv(name);
#else
- gpr_log(GPR_DEBUG, "Warning: insecure environment read function '%s' used",
- "getenv");
- char *result = getenv(name);
- return result == NULL ? result : gpr_strdup(result);
+ result = getenv(name);
+ insecure_func_used = "getenv";
#endif
+ *dst = result == NULL ? result : gpr_strdup(result);
+ return insecure_func_used;
+}
+
+char *gpr_getenv(const char *name) {
+ char *result = NULL;
+ const char *insecure_func_used = gpr_getenv_silent(name, &result);
+ if (insecure_func_used != NULL) {
+ gpr_log(GPR_DEBUG, "Warning: insecure environment read function '%s' used",
+ insecure_func_used);
+ }
+ return result;
}
void gpr_setenv(const char *name, const char *value) {
diff --git a/src/core/lib/support/env_posix.c b/src/core/lib/support/env_posix.c
index bdbc4da95a..b88822ca02 100644
--- a/src/core/lib/support/env_posix.c
+++ b/src/core/lib/support/env_posix.c
@@ -29,6 +29,11 @@
#include <grpc/support/string_util.h>
#include "src/core/lib/support/string.h"
+const char *gpr_getenv_silent(const char *name, char **dst) {
+ *dst = gpr_getenv(name);
+ return NULL;
+}
+
char *gpr_getenv(const char *name) {
char *result = getenv(name);
return result == NULL ? result : gpr_strdup(result);
diff --git a/src/core/lib/support/env_windows.c b/src/core/lib/support/env_windows.c
index c1d557e219..652eeb61c6 100644
--- a/src/core/lib/support/env_windows.c
+++ b/src/core/lib/support/env_windows.c
@@ -30,6 +30,11 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
+const char *gpr_getenv_silent(const char *name, char **dst) {
+ *dst = gpr_getenv(name);
+ return NULL;
+}
+
char *gpr_getenv(const char *name) {
char *result = NULL;
DWORD size;
diff --git a/src/core/lib/support/log.c b/src/core/lib/support/log.c
index bcc336b8ae..fadb4d9a2c 100644
--- a/src/core/lib/support/log.c
+++ b/src/core/lib/support/log.c
@@ -64,7 +64,8 @@ void gpr_set_log_verbosity(gpr_log_severity min_severity_to_print) {
}
void gpr_log_verbosity_init() {
- char *verbosity = gpr_getenv("GRPC_VERBOSITY");
+ char *verbosity = NULL;
+ const char *insecure_getenv = gpr_getenv_silent("GRPC_VERBOSITY", &verbosity);
gpr_atm min_severity_to_print = GPR_LOG_SEVERITY_ERROR;
if (verbosity != NULL) {
@@ -81,6 +82,11 @@ void gpr_log_verbosity_init() {
GPR_LOG_VERBOSITY_UNSET) {
gpr_atm_no_barrier_store(&g_min_severity_to_print, min_severity_to_print);
}
+
+ if (insecure_getenv != NULL) {
+ gpr_log(GPR_DEBUG, "Warning: insecure environment read function '%s' used",
+ insecure_getenv);
+ }
}
void gpr_set_log_function(gpr_log_func f) {
diff --git a/src/cpp/server/create_default_thread_pool.cc b/src/cpp/server/create_default_thread_pool.cc
index 17ad331c9c..8ca3e32c2f 100644
--- a/src/cpp/server/create_default_thread_pool.cc
+++ b/src/cpp/server/create_default_thread_pool.cc
@@ -23,13 +23,22 @@
#ifndef GRPC_CUSTOM_DEFAULT_THREAD_POOL
namespace grpc {
+namespace {
-ThreadPoolInterface* CreateDefaultThreadPool() {
+ThreadPoolInterface* CreateDefaultThreadPoolImpl() {
int cores = gpr_cpu_num_cores();
if (!cores) cores = 4;
return new DynamicThreadPool(cores);
}
+CreateThreadPoolFunc g_ctp_impl = CreateDefaultThreadPoolImpl;
+
+} // namespace
+
+ThreadPoolInterface* CreateDefaultThreadPool() { return g_ctp_impl(); }
+
+void SetCreateThreadPool(CreateThreadPoolFunc func) { g_ctp_impl = func; }
+
} // namespace grpc
#endif // !GRPC_CUSTOM_DEFAULT_THREAD_POOL
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index c90f96c0b7..200e477822 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -250,14 +250,6 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
has_sync_methods && num_frequently_polled_cqs > 0;
if (has_sync_methods) {
- // This is a Sync server
- gpr_log(GPR_INFO,
- "Synchronous server. Num CQs: %d, Min pollers: %d, Max Pollers: "
- "%d, CQ timeout (msec): %d",
- sync_server_settings_.num_cqs, sync_server_settings_.min_pollers,
- sync_server_settings_.max_pollers,
- sync_server_settings_.cq_timeout_msec);
-
grpc_cq_polling_type polling_type =
is_hybrid_server ? GRPC_CQ_NON_POLLING : GRPC_CQ_DEFAULT_POLLING;
@@ -272,6 +264,16 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
sync_server_settings_.min_pollers, sync_server_settings_.max_pollers,
sync_server_settings_.cq_timeout_msec));
+ if (has_sync_methods) {
+ // This is a Sync server
+ gpr_log(GPR_INFO,
+ "Synchronous server. Num CQs: %d, Min pollers: %d, Max Pollers: "
+ "%d, CQ timeout (msec): %d",
+ sync_server_settings_.num_cqs, sync_server_settings_.min_pollers,
+ sync_server_settings_.max_pollers,
+ sync_server_settings_.cq_timeout_msec);
+ }
+
ServerInitializer* initializer = server->initializer();
// Register all the completion queues with the server. i.e
diff --git a/src/cpp/server/thread_pool_interface.h b/src/cpp/server/thread_pool_interface.h
index 4f4fc7eaaa..028842a776 100644
--- a/src/cpp/server/thread_pool_interface.h
+++ b/src/cpp/server/thread_pool_interface.h
@@ -32,6 +32,10 @@ class ThreadPoolInterface {
virtual void Add(const std::function<void()>& callback) = 0;
};
+// Allows different codebases to use their own thread pool impls
+typedef ThreadPoolInterface* (*CreateThreadPoolFunc)(void);
+void SetCreateThreadPool(CreateThreadPoolFunc func);
+
ThreadPoolInterface* CreateDefaultThreadPool();
} // namespace grpc
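SetCreateThreadPool lets an embedder swap in its own pool for synchronous servers; it must be called before any server is built so that CreateDefaultThreadPool picks up the replacement. A minimal sketch (InlinePool is hypothetical and runs callbacks inline, which a real pool would not do):

#include <functional>
#include "src/cpp/server/thread_pool_interface.h"

namespace {
class InlinePool final : public grpc::ThreadPoolInterface {
 public:
  void Add(const std::function<void()>& callback) override {
    callback();  // placeholder: a real pool would hand this off to a worker
  }
};

grpc::ThreadPoolInterface* CreateInlinePool() { return new InlinePool; }
}  // namespace

void InstallInlinePool() { grpc::SetCreateThreadPool(CreateInlinePool); }
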
diff --git a/src/proto/grpc/testing/control.proto b/src/proto/grpc/testing/control.proto
index 48016642f7..2ff2e4e8a2 100644
--- a/src/proto/grpc/testing/control.proto
+++ b/src/proto/grpc/testing/control.proto
@@ -64,6 +64,7 @@ message LoadParams {
message SecurityParams {
bool use_test_ca = 1;
string server_host_override = 2;
+ string cred_type = 3;
}
message ChannelArg {
@@ -240,6 +241,10 @@ message ScenarioResultSummary
// Number of polls called inside completion queue per request
double client_polls_per_request = 15;
double server_polls_per_request = 16;
+
+ // Queries per CPU-sec over all servers or clients
+ double server_queries_per_cpu_sec = 17;
+ double client_queries_per_cpu_sec = 18;
}
// Results of a single benchmark scenario.
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index 67c984ab49..87b29c26ea 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -40,13 +40,13 @@ end
module GRPC
# The ActiveCall class provides simple methods for sending marshallable
# data to a call
- class ActiveCall
+ class ActiveCall # rubocop:disable Metrics/ClassLength
include Core::TimeConsts
include Core::CallOps
extend Forwardable
- attr_reader :deadline, :metadata_sent, :metadata_to_send
+ attr_reader :deadline, :metadata_sent, :metadata_to_send, :peer, :peer_cert
def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=,
- :peer, :peer_cert, :trailing_metadata
+ :trailing_metadata, :status
# client_invoke begins a client invocation.
#
@@ -100,6 +100,18 @@ module GRPC
fail(ArgumentError, 'Already sent md') if started && metadata_to_send
@metadata_to_send = metadata_to_send || {} unless started
@send_initial_md_mutex = Mutex.new
+
+ @output_stream_done = false
+ @input_stream_done = false
+ @call_finished = false
+ @call_finished_mu = Mutex.new
+
+ @client_call_executed = false
+ @client_call_executed_mu = Mutex.new
+
+ # set the peer now so that the accessor can still function
+ # after the server closes the call
+ @peer = call.peer
end
# Sends the initial metadata that has yet to be sent.
@@ -142,11 +154,9 @@ module GRPC
Operation.new(self)
end
- # finished waits until a client call is completed.
- #
- # It blocks until the remote endpoint acknowledges by sending a status.
- def finished
+ def receive_and_check_status
batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
+ set_input_stream_done
attach_status_results_and_complete_call(batch_result)
end
@@ -155,8 +165,6 @@ module GRPC
@call.trailing_metadata = recv_status_batch_result.status.metadata
end
@call.status = recv_status_batch_result.status
- @call.close
- op_is_done
# The RECV_STATUS in run_batch always succeeds
# Check the status for a bad status or failed run batch
@@ -193,9 +201,19 @@ module GRPC
}
ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished
@call.run_batch(ops)
+ set_output_stream_done
+
nil
end
+ # Intended for use on server-side calls when a single request from
+ # the client is expected (i.e., unary and server-streaming RPC types).
+ def read_unary_request
+ req = remote_read
+ set_input_stream_done
+ req
+ end
+
def server_unary_response(req, trailing_metadata: {},
code: Core::StatusCodes::OK, details: 'OK')
ops = {}
@@ -211,6 +229,7 @@ module GRPC
ops[RECV_CLOSE_ON_SERVER] = nil
@call.run_batch(ops)
+ set_output_stream_done
end
# remote_read reads a response from the remote endpoint.
@@ -241,6 +260,8 @@ module GRPC
# each_remote_read passes each response to the given block or returns an
    # enumerator of the responses if no block is given.
+ # Used to generate the request enumerable for
+    # server-side client-streaming RPCs.
#
# == Enumerator ==
#
@@ -258,10 +279,14 @@ module GRPC
# @return [Enumerator] if no block was given
def each_remote_read
return enum_for(:each_remote_read) unless block_given?
- loop do
- resp = remote_read
- break if resp.nil? # the last response was received
- yield resp
+ begin
+ loop do
+ resp = remote_read
+ break if resp.nil? # the last response was received
+ yield resp
+ end
+ ensure
+ set_input_stream_done
end
end
@@ -287,13 +312,17 @@ module GRPC
# @return [Enumerator] if no block was given
def each_remote_read_then_finish
return enum_for(:each_remote_read_then_finish) unless block_given?
- loop do
- resp = remote_read
- if resp.nil? # the last response was received, but not finished yet
- finished
- break
+ begin
+ loop do
+ resp = remote_read
+ if resp.nil? # the last response was received
+ receive_and_check_status
+ break
+ end
+ yield resp
end
- yield resp
+ ensure
+ set_input_stream_done
end
end
@@ -305,6 +334,7 @@ module GRPC
# a list, multiple metadata for its key are sent
# @return [Object] the response received from the server
def request_response(req, metadata: {})
+ raise_error_if_already_executed
ops = {
SEND_MESSAGE => @marshal.call(req),
SEND_CLOSE_FROM_CLIENT => nil,
@@ -319,7 +349,15 @@ module GRPC
end
@metadata_sent = true
end
- batch_result = @call.run_batch(ops)
+
+ begin
+ batch_result = @call.run_batch(ops)
+ # no need to check for cancellation after a CallError because this
+ # batch contains a RECV_STATUS op
+ ensure
+ set_input_stream_done
+ set_output_stream_done
+ end
@call.metadata = batch_result.metadata
attach_status_results_and_complete_call(batch_result)
@@ -339,10 +377,20 @@ module GRPC
# a list, multiple metadata for its key are sent
# @return [Object] the response received from the server
def client_streamer(requests, metadata: {})
- # Metadata might have already been sent if this is an operation view
- merge_metadata_and_send_if_not_already_sent(metadata)
+ raise_error_if_already_executed
+ begin
+ merge_metadata_and_send_if_not_already_sent(metadata)
+ requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) }
+ rescue GRPC::Core::CallError => e
+ receive_and_check_status # check for Cancelled
+ raise e
+ rescue => e
+ set_input_stream_done
+ raise e
+ ensure
+ set_output_stream_done
+ end
- requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) }
batch_result = @call.run_batch(
SEND_CLOSE_FROM_CLIENT => nil,
RECV_INITIAL_METADATA => nil,
@@ -350,12 +398,11 @@ module GRPC
RECV_STATUS_ON_CLIENT => nil
)
+ set_input_stream_done
+
@call.metadata = batch_result.metadata
attach_status_results_and_complete_call(batch_result)
get_message_from_batch_result(batch_result)
- rescue GRPC::Core::CallError => e
- finished # checks for Cancelled
- raise e
end
# server_streamer sends one request to the GRPC server, which yields a
@@ -373,6 +420,7 @@ module GRPC
# a list, multiple metadata for its key are sent
# @return [Enumerator|nil] a response Enumerator
def server_streamer(req, metadata: {})
+ raise_error_if_already_executed
ops = {
SEND_MESSAGE => @marshal.call(req),
SEND_CLOSE_FROM_CLIENT => nil
@@ -384,13 +432,22 @@ module GRPC
end
@metadata_sent = true
end
- @call.run_batch(ops)
+
+ begin
+ @call.run_batch(ops)
+ rescue GRPC::Core::CallError => e
+ receive_and_check_status # checks for Cancelled
+ raise e
+ rescue => e
+ set_input_stream_done
+ raise e
+ ensure
+ set_output_stream_done
+ end
+
replies = enum_for(:each_remote_read_then_finish)
return replies unless block_given?
replies.each { |r| yield r }
- rescue GRPC::Core::CallError => e
- finished # checks for Cancelled
- raise e
end
# bidi_streamer sends a stream of requests to the GRPC server, and yields
@@ -421,6 +478,7 @@ module GRPC
# a list, multiple metadata for its key are sent
# @return [Enumerator, nil] a response Enumerator
def bidi_streamer(requests, metadata: {}, &blk)
+ raise_error_if_already_executed
# Metadata might have already been sent if this is an operation view
merge_metadata_and_send_if_not_already_sent(metadata)
bd = BidiCall.new(@call,
@@ -428,7 +486,10 @@ module GRPC
@unmarshal,
metadata_received: @metadata_received)
- bd.run_on_client(requests, @op_notifier, &blk)
+ bd.run_on_client(requests,
+ proc { set_input_stream_done },
+ proc { set_output_stream_done },
+ &blk)
end
# run_server_bidi orchestrates a BiDi stream processing on a server.
@@ -449,7 +510,7 @@ module GRPC
metadata_received: @metadata_received,
req_view: MultiReqView.new(self))
- bd.run_on_server(gen_each_reply)
+ bd.run_on_server(gen_each_reply, proc { set_input_stream_done })
end
# Waits till an operation completes
@@ -459,7 +520,8 @@ module GRPC
@op_notifier.wait
end
- # Signals that an operation is done
+ # Signals that an operation is done.
+ # Only relevant on the client-side (this is a no-op on the server-side)
def op_is_done
return if @op_notifier.nil?
@op_notifier.notify(self)
@@ -484,8 +546,40 @@ module GRPC
end
end
+ def attach_peer_cert(peer_cert)
+ @peer_cert = peer_cert
+ end
+
private
+    # To be called once the "input stream" has been completely
+    # read through (i.e., done reading from the client or the status
+    # has been received). Note that this is idempotent.
+ def set_input_stream_done
+ @call_finished_mu.synchronize do
+ @input_stream_done = true
+ maybe_finish_and_close_call_locked
+ end
+ end
+
+  # To be called once the "output stream" has been completely
+  # sent through (i.e., done sending from the client or the status
+  # has been sent). Note that this is idempotent.
+ def set_output_stream_done
+ @call_finished_mu.synchronize do
+ @output_stream_done = true
+ maybe_finish_and_close_call_locked
+ end
+ end
+
+ def maybe_finish_and_close_call_locked
+ return unless @output_stream_done && @input_stream_done
+ return if @call_finished
+ @call_finished = true
+ op_is_done
+ @call.close
+ end
+
# Starts the call if not already started
# @param metadata [Hash] metadata to be sent to the server. If a value is
# a list, multiple metadata for its key are sent
@@ -493,6 +587,15 @@ module GRPC
merge_metadata_to_send(metadata) && send_initial_metadata
end
+ def raise_error_if_already_executed
+ @client_call_executed_mu.synchronize do
+ if @client_call_executed
+ fail GRPC::Core::CallError, 'attempting to re-run a call'
+ end
+ @client_call_executed = true
+ end
+ end
+
def self.view_class(*visible_methods)
Class.new do
extend ::Forwardable
@@ -518,6 +621,7 @@ module GRPC
# server client_streamer handlers.
MultiReqView = view_class(:cancelled?, :deadline,
:each_remote_read, :metadata, :output_metadata,
+ :peer, :peer_cert,
:send_initial_metadata,
:metadata_to_send,
:merge_metadata_to_send,
diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb
index e54cf78969..9e125cd986 100644
--- a/src/ruby/lib/grpc/generic/bidi_call.rb
+++ b/src/ruby/lib/grpc/generic/bidi_call.rb
@@ -62,12 +62,19 @@ module GRPC
# block that can be invoked with each response.
#
# @param requests the Enumerable of requests to send
- # @param op_notifier a Notifier used to signal completion
+ # @param set_input_stream_done [Proc] called back when we're done
+ # reading the input stream
+  # @param set_output_stream_done [Proc] called back when we're done
+ # sending data on the output stream
# @return an Enumerator of requests to yield
- def run_on_client(requests, op_notifier, &blk)
- @op_notifier = op_notifier
- @enq_th = Thread.new { write_loop(requests) }
- read_loop(&blk)
+ def run_on_client(requests,
+ set_input_stream_done,
+ set_output_stream_done,
+ &blk)
+ @enq_th = Thread.new do
+ write_loop(requests, set_output_stream_done: set_output_stream_done)
+ end
+ read_loop(set_input_stream_done, &blk)
end
# Begins orchestration of the Bidi stream for a server generating replies.
@@ -81,12 +88,17 @@ module GRPC
# produced by gen_each_reply could ignore the received_msgs
#
# @param gen_each_reply [Proc] generates the BiDi stream replies.
- def run_on_server(gen_each_reply)
+  # @param set_input_stream_done [Proc] callback to call when
+  #   the input stream has been completely read through.
+ def run_on_server(gen_each_reply, set_input_stream_done)
# Pass in the optional call object parameter if possible
if gen_each_reply.arity == 1
- replys = gen_each_reply.call(read_loop(is_client: false))
+ replys = gen_each_reply.call(
+ read_loop(set_input_stream_done, is_client: false))
elsif gen_each_reply.arity == 2
- replys = gen_each_reply.call(read_loop(is_client: false), @req_view)
+ replys = gen_each_reply.call(
+ read_loop(set_input_stream_done, is_client: false),
+ @req_view)
else
fail 'Illegal arity of reply generator'
end
@@ -99,22 +111,6 @@ module GRPC
END_OF_READS = :end_of_reads
END_OF_WRITES = :end_of_writes
- # signals that bidi operation is complete
- def notify_done
- return unless @op_notifier
- GRPC.logger.debug("bidi-notify-done: notifying #{@op_notifier}")
- @op_notifier.notify(self)
- end
-
- # signals that a bidi operation is complete (read + write)
- def finished
- @done_mutex.synchronize do
- return unless @reads_complete && @writes_complete && !@complete
- @call.close
- @complete = true
- end
- end
-
# performs a read using @call.run_batch, ensures metadata is set up
def read_using_run_batch
ops = { RECV_MESSAGE => nil }
@@ -127,7 +123,8 @@ module GRPC
batch_result
end
- def write_loop(requests, is_client: true)
+  # set_output_stream_done is only relevant on the client side
+ def write_loop(requests, is_client: true, set_output_stream_done: nil)
GRPC.logger.debug('bidi-write-loop: starting')
count = 0
requests.each do |req|
@@ -151,23 +148,20 @@ module GRPC
GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting")
@call.run_batch(SEND_CLOSE_FROM_CLIENT => nil)
GRPC.logger.debug('bidi-write-loop: done')
- notify_done
- @writes_complete = true
- finished
end
GRPC.logger.debug('bidi-write-loop: finished')
rescue StandardError => e
GRPC.logger.warn('bidi-write-loop: failed')
GRPC.logger.warn(e)
- notify_done
- @writes_complete = true
- finished
raise e
+ ensure
+ set_output_stream_done.call if is_client
end
# Provides an enumerator that yields results of remote reads
- def read_loop(is_client: true)
+ def read_loop(set_input_stream_done, is_client: true)
return enum_for(:read_loop,
+ set_input_stream_done,
is_client: is_client) unless block_given?
GRPC.logger.debug('bidi-read-loop: starting')
begin
@@ -201,10 +195,10 @@ module GRPC
GRPC.logger.warn('bidi: read-loop failed')
GRPC.logger.warn(e)
raise e
+ ensure
+ set_input_stream_done.call
end
GRPC.logger.debug('bidi-read-loop: finished')
- @reads_complete = true
- finished
    # Make sure that the write loop is done before finishing the call.
# Note that blocking is ok at this point because we've already received
# a status
diff --git a/src/ruby/lib/grpc/generic/rpc_desc.rb b/src/ruby/lib/grpc/generic/rpc_desc.rb
index ce0097573a..89cf8ff6a0 100644
--- a/src/ruby/lib/grpc/generic/rpc_desc.rb
+++ b/src/ruby/lib/grpc/generic/rpc_desc.rb
@@ -48,7 +48,7 @@ module GRPC
end
def handle_request_response(active_call, mth)
- req = active_call.remote_read
+ req = active_call.read_unary_request
resp = mth.call(req, active_call.single_req_view)
active_call.server_unary_response(
resp, trailing_metadata: active_call.output_metadata)
@@ -61,7 +61,7 @@ module GRPC
end
def handle_server_streamer(active_call, mth)
- req = active_call.remote_read
+ req = active_call.read_unary_request
replys = mth.call(req, active_call.single_req_view)
replys.each { |r| active_call.remote_send(r) }
send_status(active_call, OK, 'OK', active_call.output_metadata)
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index ef2cc0ce91..33b3cea1fc 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -418,6 +418,7 @@ module GRPC
metadata_received: true,
started: false,
metadata_to_send: connect_md)
+ c.attach_peer_cert(an_rpc.call.peer_cert)
mth = an_rpc.method.to_sym
[c, mth]
end
diff --git a/src/ruby/spec/client_auth_spec.rb b/src/ruby/spec/client_auth_spec.rb
new file mode 100644
index 0000000000..79c9192aa5
--- /dev/null
+++ b/src/ruby/spec/client_auth_spec.rb
@@ -0,0 +1,137 @@
+# Copyright 2015 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require 'grpc'
+
+def create_channel_creds
+ test_root = File.join(File.dirname(__FILE__), 'testdata')
+ files = ['ca.pem', 'client.key', 'client.pem']
+ creds = files.map { |f| File.open(File.join(test_root, f)).read }
+ GRPC::Core::ChannelCredentials.new(creds[0], creds[1], creds[2])
+end
+
+def client_cert
+ test_root = File.join(File.dirname(__FILE__), 'testdata')
+ cert = File.open(File.join(test_root, 'client.pem')).read
+ fail unless cert.is_a?(String)
+ cert
+end
+
+def create_server_creds
+ test_root = File.join(File.dirname(__FILE__), 'testdata')
+ p "test root: #{test_root}"
+ files = ['ca.pem', 'server1.key', 'server1.pem']
+ creds = files.map { |f| File.open(File.join(test_root, f)).read }
+ GRPC::Core::ServerCredentials.new(
+ creds[0],
+ [{ private_key: creds[1], cert_chain: creds[2] }],
+ true) # force client auth
+end
+
+# A test message
+class EchoMsg
+ def self.marshal(_o)
+ ''
+ end
+
+ def self.unmarshal(_o)
+ EchoMsg.new
+ end
+end
+
+# a test service that checks the cert of its peer
+class SslTestService
+ include GRPC::GenericService
+ rpc :an_rpc, EchoMsg, EchoMsg
+ rpc :a_client_streaming_rpc, stream(EchoMsg), EchoMsg
+ rpc :a_server_streaming_rpc, EchoMsg, stream(EchoMsg)
+ rpc :a_bidi_rpc, stream(EchoMsg), stream(EchoMsg)
+
+ def check_peer_cert(call)
+ error_msg = "want:\n#{client_cert}\n\ngot:\n#{call.peer_cert}"
+ fail(error_msg) unless call.peer_cert == client_cert
+ end
+
+ def an_rpc(req, call)
+ check_peer_cert(call)
+ req
+ end
+
+ def a_client_streaming_rpc(call)
+ check_peer_cert(call)
+ call.each_remote_read.each { |r| p r }
+ EchoMsg.new
+ end
+
+ def a_server_streaming_rpc(_, call)
+ check_peer_cert(call)
+ [EchoMsg.new, EchoMsg.new]
+ end
+
+ def a_bidi_rpc(requests, call)
+ check_peer_cert(call)
+ requests.each { |r| p r }
+ [EchoMsg.new, EchoMsg.new]
+ end
+end
+
+SslTestServiceStub = SslTestService.rpc_stub_class
+
+describe 'client-server auth' do
+ RpcServer = GRPC::RpcServer
+
+ before(:all) do
+ server_opts = {
+ poll_period: 1
+ }
+ @srv = RpcServer.new(**server_opts)
+ port = @srv.add_http2_port('0.0.0.0:0', create_server_creds)
+ @srv.handle(SslTestService)
+ @srv_thd = Thread.new { @srv.run }
+ @srv.wait_till_running
+
+ client_opts = {
+ channel_args: {
+ GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr'
+ }
+ }
+ @stub = SslTestServiceStub.new("localhost:#{port}",
+ create_channel_creds,
+ **client_opts)
+ end
+
+ after(:all) do
+ expect(@srv.stopped?).to be(false)
+ @srv.stop
+ @srv_thd.join
+ end
+
+ it 'client-server auth with unary RPCs' do
+ @stub.an_rpc(EchoMsg.new)
+ end
+
+ it 'client-server auth with client streaming RPCs' do
+ @stub.a_client_streaming_rpc([EchoMsg.new, EchoMsg.new])
+ end
+
+ it 'client-server auth with server streaming RPCs' do
+ responses = @stub.a_server_streaming_rpc(EchoMsg.new)
+ responses.each { |r| p r }
+ end
+
+ it 'client-server auth with bidi RPCs' do
+ responses = @stub.a_bidi_rpc([EchoMsg.new, EchoMsg.new])
+ responses.each { |r| p r }
+ end
+end
diff --git a/src/ruby/spec/generic/active_call_spec.rb b/src/ruby/spec/generic/active_call_spec.rb
index 72e55ebcce..ec0c294174 100644
--- a/src/ruby/spec/generic/active_call_spec.rb
+++ b/src/ruby/spec/generic/active_call_spec.rb
@@ -473,7 +473,7 @@ describe GRPC::ActiveCall do
server_call.remote_send('server_response')
expect(client_call.remote_read).to eq('server_response')
server_call.send_status(OK, 'status code is OK')
- expect { client_call.finished }.to_not raise_error
+ expect { client_call.receive_and_check_status }.to_not raise_error
end
it 'finishes ok if the server sends an early status response' do
@@ -490,7 +490,7 @@ describe GRPC::ActiveCall do
expect do
call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
end.to_not raise_error
- expect { client_call.finished }.to_not raise_error
+ expect { client_call.receive_and_check_status }.to_not raise_error
end
it 'finishes ok if SEND_CLOSE and RECV_STATUS has been sent' do
diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb
index 09b88c7cef..a8653e73cf 100644
--- a/src/ruby/spec/generic/client_stub_spec.rb
+++ b/src/ruby/spec/generic/client_stub_spec.rb
@@ -36,6 +36,53 @@ include GRPC::Core::StatusCodes
include GRPC::Core::TimeConsts
include GRPC::Core::CallOps
+# check that methods on a finished/closed call don't crash
+def check_op_view_of_finished_client_call(op_view,
+ expected_metadata,
+ expected_trailing_metadata)
+  # use the block given to this helper to attempt to iterate
+  # through a possible response stream
+ fail('need something to attempt reads') unless block_given?
+ expect do
+ resp = op_view.execute
+ yield resp
+ end.to raise_error(GRPC::Core::CallError)
+
+ expect { op_view.start_call }.to raise_error(RuntimeError)
+
+ sanity_check_values_of_accessors(op_view,
+ expected_metadata,
+ expected_trailing_metadata)
+
+ expect do
+ op_view.wait
+ op_view.cancel
+ op_view.write_flag = 1
+ end.to_not raise_error
+end
+
+def sanity_check_values_of_accessors(op_view,
+ expected_metadata,
+ expected_trailing_metadata)
+ expected_status = Struct::Status.new
+ expected_status.code = 0
+ expected_status.details = 'OK'
+ expected_status.metadata = expected_trailing_metadata
+
+ expect(op_view.status).to eq(expected_status)
+ expect(op_view.metadata).to eq(expected_metadata)
+ expect(op_view.trailing_metadata).to eq(expected_trailing_metadata)
+
+ expect(op_view.cancelled?).to be(false)
+ expect(op_view.write_flag).to be(nil)
+
+ # The deadline attribute of a call can be either
+ # a GRPC::Core::TimeSpec or a Time, which are mutually exclusive.
+ # TODO: fix so that the accessor always returns the same type.
+ expect(op_view.deadline.is_a?(GRPC::Core::TimeSpec) ||
+ op_view.deadline.is_a?(Time)).to be(true)
+end
+
describe 'ClientStub' do
let(:noop) { proc { |x| x } }
@@ -45,6 +92,7 @@ describe 'ClientStub' do
@method = 'an_rpc_method'
@pass = OK
@fail = INTERNAL
+ @metadata = { k1: 'v1', k2: 'v2' }
end
after(:each) do
@@ -107,7 +155,7 @@ describe 'ClientStub' do
end
end
- describe '#request_response' do
+ describe '#request_response', request_response: true do
before(:each) do
@sent_msg, @resp = 'a_msg', 'a_reply'
end
@@ -126,7 +174,7 @@ describe 'ClientStub' do
server_port = create_test_server
host = "localhost:#{server_port}"
th = run_request_response(@sent_msg, @resp, @pass,
- k1: 'v1', k2: 'v2')
+ expected_metadata: { k1: 'v1', k2: 'v2' })
stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
expect(get_response(stub)).to eq(@resp)
th.join
@@ -187,13 +235,24 @@ describe 'ClientStub' do
# Kill the server thread so tests can complete
th.kill
end
+
+ it 'should raise ArgumentError if metadata contains invalid values' do
+ @metadata.merge!(k3: 3)
+ server_port = create_test_server
+ host = "localhost:#{server_port}"
+ stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
+ expect do
+ get_response(stub)
+ end.to raise_error(ArgumentError,
+ /Header values must be of type string or array/)
+ end
end
describe 'without a call operation' do
def get_response(stub, credentials: nil)
puts credentials.inspect
stub.request_response(@method, @sent_msg, noop, noop,
- metadata: { k1: 'v1', k2: 'v2' },
+ metadata: @metadata,
credentials: credentials)
end
@@ -201,40 +260,62 @@ describe 'ClientStub' do
end
describe 'via a call operation' do
+ after(:each) do
+ # make sure op.wait doesn't hang, even if there's a bad status
+ @op.wait
+ end
def get_response(stub, run_start_call_first: false, credentials: nil)
- op = stub.request_response(@method, @sent_msg, noop, noop,
- return_op: true,
- metadata: { k1: 'v1', k2: 'v2' },
- deadline: from_relative_time(2),
- credentials: credentials)
- expect(op).to be_a(GRPC::ActiveCall::Operation)
- op.start_call if run_start_call_first
- result = op.execute
- op.wait # make sure wait doesn't hang
+ @op = stub.request_response(@method, @sent_msg, noop, noop,
+ return_op: true,
+ metadata: @metadata,
+ deadline: from_relative_time(2),
+ credentials: credentials)
+ expect(@op).to be_a(GRPC::ActiveCall::Operation)
+ @op.start_call if run_start_call_first
+ result = @op.execute
result
end
it_behaves_like 'request response'
- it 'sends metadata to the server ok when running start_call first' do
+ def run_op_view_metadata_test(run_start_call_first)
server_port = create_test_server
host = "localhost:#{server_port}"
- th = run_request_response(@sent_msg, @resp, @pass,
- k1: 'v1', k2: 'v2')
+
+ @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
+ @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
+ th = run_request_response(
+ @sent_msg, @resp, @pass,
+ expected_metadata: @metadata,
+ server_initial_md: @server_initial_md,
+ server_trailing_md: @server_trailing_md)
stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
- expect(get_response(stub)).to eq(@resp)
+ expect(
+ get_response(stub,
+ run_start_call_first: run_start_call_first)).to eq(@resp)
th.join
end
+
+ it 'sends metadata to the server ok when running start_call first' do
+ run_op_view_metadata_test(true)
+ check_op_view_of_finished_client_call(
+ @op, @server_initial_md, @server_trailing_md) { |r| p r }
+ end
+
+ it 'does not crash when used after the call has been finished' do
+ run_op_view_metadata_test(false)
+ check_op_view_of_finished_client_call(
+ @op, @server_initial_md, @server_trailing_md) { |r| p r }
+ end
end
end
- describe '#client_streamer' do
+ describe '#client_streamer', client_streamer: true do
before(:each) do
Thread.abort_on_exception = true
server_port = create_test_server
host = "localhost:#{server_port}"
@stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
- @metadata = { k1: 'v1', k2: 'v2' }
@sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
@resp = 'a_reply'
end
@@ -247,7 +328,8 @@ describe 'ClientStub' do
end
it 'should send metadata to the server ok' do
- th = run_client_streamer(@sent_msgs, @resp, @pass, **@metadata)
+ th = run_client_streamer(@sent_msgs, @resp, @pass,
+ expected_metadata: @metadata)
expect(get_response(@stub)).to eq(@resp)
th.join
end
@@ -278,27 +360,50 @@ describe 'ClientStub' do
end
describe 'via a call operation' do
+ after(:each) do
+ # make sure op.wait doesn't hang, even if there's a bad status
+ @op.wait
+ end
def get_response(stub, run_start_call_first: false)
- op = stub.client_streamer(@method, @sent_msgs, noop, noop,
- return_op: true, metadata: @metadata)
- expect(op).to be_a(GRPC::ActiveCall::Operation)
- op.start_call if run_start_call_first
- result = op.execute
- op.wait # make sure wait doesn't hang
+ @op = stub.client_streamer(@method, @sent_msgs, noop, noop,
+ return_op: true, metadata: @metadata)
+ expect(@op).to be_a(GRPC::ActiveCall::Operation)
+ @op.start_call if run_start_call_first
+ result = @op.execute
result
end
it_behaves_like 'client streaming'
- it 'sends metadata to the server ok when running start_call first' do
- th = run_client_streamer(@sent_msgs, @resp, @pass, **@metadata)
- expect(get_response(@stub, run_start_call_first: true)).to eq(@resp)
+ def run_op_view_metadata_test(run_start_call_first)
+ @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
+ @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
+ th = run_client_streamer(
+ @sent_msgs, @resp, @pass,
+ expected_metadata: @metadata,
+ server_initial_md: @server_initial_md,
+ server_trailing_md: @server_trailing_md)
+ expect(
+ get_response(@stub,
+ run_start_call_first: run_start_call_first)).to eq(@resp)
th.join
end
+
+ it 'sends metadata to the server ok when running start_call first' do
+ run_op_view_metadata_test(true)
+ check_op_view_of_finished_client_call(
+ @op, @server_initial_md, @server_trailing_md) { |r| p r }
+ end
+
+ it 'does not crash when used after the call has been finished' do
+ run_op_view_metadata_test(false)
+ check_op_view_of_finished_client_call(
+ @op, @server_initial_md, @server_trailing_md) { |r| p r }
+ end
end
end
- describe '#server_streamer' do
+ describe '#server_streamer', server_streamer: true do
before(:each) do
@sent_msg = 'a_msg'
@replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
@@ -328,18 +433,42 @@ describe 'ClientStub' do
server_port = create_test_server
host = "localhost:#{server_port}"
th = run_server_streamer(@sent_msg, @replys, @fail,
- k1: 'v1', k2: 'v2')
+ expected_metadata: { k1: 'v1', k2: 'v2' })
stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
e = get_responses(stub)
expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
th.join
end
+
+ it 'should raise ArgumentError if metadata contains invalid values' do
+ @metadata.merge!(k3: 3)
+ server_port = create_test_server
+ host = "localhost:#{server_port}"
+ stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
+ expect do
+ get_responses(stub)
+ end.to raise_error(ArgumentError,
+ /Header values must be of type string or array/)
+ end
+
+ it 'the call terminates when there is an unmarshalling error' do
+ server_port = create_test_server
+ host = "localhost:#{server_port}"
+ th = run_server_streamer(@sent_msg, @replys, @pass)
+ stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
+
+ unmarshal = proc { fail(ArgumentError, 'test unmarshalling error') }
+ expect do
+ get_responses(stub, unmarshal: unmarshal).collect { |r| r }
+ end.to raise_error(ArgumentError, 'test unmarshalling error')
+ th.join
+ end
end
describe 'without a call operation' do
- def get_responses(stub)
- e = stub.server_streamer(@method, @sent_msg, noop, noop,
- metadata: { k1: 'v1', k2: 'v2' })
+ def get_responses(stub, unmarshal: noop)
+ e = stub.server_streamer(@method, @sent_msg, noop, unmarshal,
+ metadata: @metadata)
expect(e).to be_a(Enumerator)
e
end
@@ -351,10 +480,10 @@ describe 'ClientStub' do
after(:each) do
@op.wait # make sure wait doesn't hang
end
- def get_responses(stub, run_start_call_first: false)
- @op = stub.server_streamer(@method, @sent_msg, noop, noop,
+ def get_responses(stub, run_start_call_first: false, unmarshal: noop)
+ @op = stub.server_streamer(@method, @sent_msg, noop, unmarshal,
return_op: true,
- metadata: { k1: 'v1', k2: 'v2' })
+ metadata: @metadata)
expect(@op).to be_a(GRPC::ActiveCall::Operation)
@op.start_call if run_start_call_first
e = @op.execute
@@ -364,20 +493,41 @@ describe 'ClientStub' do
it_behaves_like 'server streaming'
- it 'should send metadata to the server ok when start_call is run first' do
+ def run_op_view_metadata_test(run_start_call_first)
server_port = create_test_server
host = "localhost:#{server_port}"
- th = run_server_streamer(@sent_msg, @replys, @fail,
- k1: 'v1', k2: 'v2')
+ @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
+ @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
+ th = run_server_streamer(
+ @sent_msg, @replys, @pass,
+ expected_metadata: @metadata,
+ server_initial_md: @server_initial_md,
+ server_trailing_md: @server_trailing_md)
stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
- e = get_responses(stub, run_start_call_first: true)
- expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
+ e = get_responses(stub, run_start_call_first: run_start_call_first)
+ expect(e.collect { |r| r }).to eq(@replys)
th.join
end
+
+ it 'should send metadata to the server ok when start_call is run first' do
+ run_op_view_metadata_test(true)
+ check_op_view_of_finished_client_call(
+ @op, @server_initial_md, @server_trailing_md) do |responses|
+ responses.each { |r| p r }
+ end
+ end
+
+ it 'does not crash when used after the call has been finished' do
+ run_op_view_metadata_test(false)
+ check_op_view_of_finished_client_call(
+ @op, @server_initial_md, @server_trailing_md) do |responses|
+ responses.each { |r| p r }
+ end
+ end
end
end
- describe '#bidi_streamer' do
+ describe '#bidi_streamer', bidi: true do
before(:each) do
@sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
@replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
@@ -386,7 +536,7 @@ describe 'ClientStub' do
end
shared_examples 'bidi streaming' do
- it 'supports sending all the requests first', bidi: true do
+ it 'supports sending all the requests first' do
th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys,
@pass)
stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
@@ -395,7 +545,7 @@ describe 'ClientStub' do
th.join
end
- it 'supports client-initiated ping pong', bidi: true do
+ it 'supports client-initiated ping pong' do
th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true)
stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
e = get_responses(stub)
@@ -403,18 +553,39 @@ describe 'ClientStub' do
th.join
end
- it 'supports a server-initiated ping pong', bidi: true do
+ it 'supports a server-initiated ping pong' do
th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, false)
stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
e = get_responses(stub)
expect(e.collect { |r| r }).to eq(@sent_msgs)
th.join
end
+
+ it 'should raise an error if the status is not ok' do
+ th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @fail, false)
+ stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
+ e = get_responses(stub)
+ expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
+ th.join
+ end
+
+ # TODO: add a test for metadata-related ArgumentError in a bidi call once
+ # the issue mentioned in https://github.com/grpc/grpc/issues/10526 is fixed
+
+ it 'should send metadata to the server ok' do
+ th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true,
+ expected_metadata: @metadata)
+ stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
+ e = get_responses(stub)
+ expect(e.collect { |r| r }).to eq(@sent_msgs)
+ th.join
+ end
end
describe 'without a call operation' do
def get_responses(stub)
- e = stub.bidi_streamer(@method, @sent_msgs, noop, noop)
+ e = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
+ metadata: @metadata)
expect(e).to be_a(Enumerator)
e
end
@@ -428,7 +599,8 @@ describe 'ClientStub' do
end
def get_responses(stub, run_start_call_first: false)
@op = stub.bidi_streamer(@method, @sent_msgs, noop, noop,
- return_op: true)
+ return_op: true,
+ metadata: @metadata)
expect(@op).to be_a(GRPC::ActiveCall::Operation)
@op.start_call if run_start_call_first
e = @op.execute
@@ -438,27 +610,53 @@ describe 'ClientStub' do
it_behaves_like 'bidi streaming'
- it 'can run start_call before executing the call' do
- th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys,
- @pass)
+ def run_op_view_metadata_test(run_start_call_first)
+ @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
+ @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
+ th = run_bidi_streamer_echo_ping_pong(
+ @sent_msgs, @pass, true,
+ expected_metadata: @metadata,
+ server_initial_md: @server_initial_md,
+ server_trailing_md: @server_trailing_md)
stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
- e = get_responses(stub, run_start_call_first: true)
- expect(e.collect { |r| r }).to eq(@replys)
+ e = get_responses(stub, run_start_call_first: run_start_call_first)
+ expect(e.collect { |r| r }).to eq(@sent_msgs)
th.join
end
+
+ it 'can run start_call before executing the call' do
+ run_op_view_metadata_test(true)
+ check_op_view_of_finished_client_call(
+ @op, @server_initial_md, @server_trailing_md) do |responses|
+ responses.each { |r| p r }
+ end
+ end
+
+ it 'does not crash when the op view is used after the call has finished' do
+ run_op_view_metadata_test(false)
+ check_op_view_of_finished_client_call(
+ @op, @server_initial_md, @server_trailing_md) do |responses|
+ responses.each { |r| p r }
+ end
+ end
end
end
- def run_server_streamer(expected_input, replys, status, **kw)
- wanted_metadata = kw.clone
+ def run_server_streamer(expected_input, replys, status,
+ expected_metadata: {},
+ server_initial_md: {},
+ server_trailing_md: {})
+ wanted_metadata = expected_metadata.clone
wakey_thread do |notifier|
- c = expect_server_to_be_invoked(notifier)
+ c = expect_server_to_be_invoked(
+ notifier, metadata_to_send: server_initial_md)
wanted_metadata.each do |k, v|
expect(c.metadata[k.to_s]).to eq(v)
end
expect(c.remote_read).to eq(expected_input)
replys.each { |r| c.remote_send(r) }
- c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
+ c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
+ metadata: server_trailing_md)
end
end
@@ -472,9 +670,17 @@ describe 'ClientStub' do
end
end
- def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts)
+ def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts,
+ expected_metadata: {},
+ server_initial_md: {},
+ server_trailing_md: {})
+ wanted_metadata = expected_metadata.clone
wakey_thread do |notifier|
- c = expect_server_to_be_invoked(notifier)
+ c = expect_server_to_be_invoked(
+ notifier, metadata_to_send: server_initial_md)
+ wanted_metadata.each do |k, v|
+ expect(c.metadata[k.to_s]).to eq(v)
+ end
expected_inputs.each do |i|
if client_starts
expect(c.remote_read).to eq(i)
@@ -484,33 +690,44 @@ describe 'ClientStub' do
expect(c.remote_read).to eq(i)
end
end
- c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
+ c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
+ metadata: server_trailing_md)
end
end
- def run_client_streamer(expected_inputs, resp, status, **kw)
- wanted_metadata = kw.clone
+ def run_client_streamer(expected_inputs, resp, status,
+ expected_metadata: {},
+ server_initial_md: {},
+ server_trailing_md: {})
+ wanted_metadata = expected_metadata.clone
wakey_thread do |notifier|
- c = expect_server_to_be_invoked(notifier)
+ c = expect_server_to_be_invoked(
+ notifier, metadata_to_send: server_initial_md)
expected_inputs.each { |i| expect(c.remote_read).to eq(i) }
wanted_metadata.each do |k, v|
expect(c.metadata[k.to_s]).to eq(v)
end
c.remote_send(resp)
- c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
+ c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
+ metadata: server_trailing_md)
end
end
- def run_request_response(expected_input, resp, status, **kw)
- wanted_metadata = kw.clone
+ def run_request_response(expected_input, resp, status,
+ expected_metadata: {},
+ server_initial_md: {},
+ server_trailing_md: {})
+ wanted_metadata = expected_metadata.clone
wakey_thread do |notifier|
- c = expect_server_to_be_invoked(notifier)
+ c = expect_server_to_be_invoked(
+ notifier, metadata_to_send: server_initial_md)
expect(c.remote_read).to eq(expected_input)
wanted_metadata.each do |k, v|
expect(c.metadata[k.to_s]).to eq(v)
end
c.remote_send(resp)
- c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
+ c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
+ metadata: server_trailing_md)
end
end
@@ -528,13 +745,13 @@ describe 'ClientStub' do
@server.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
end
- def expect_server_to_be_invoked(notifier)
+ def expect_server_to_be_invoked(notifier, metadata_to_send: nil)
@server.start
notifier.notify(nil)
recvd_rpc = @server.request_call
recvd_call = recvd_rpc.call
recvd_call.metadata = recvd_rpc.metadata
- recvd_call.run_batch(SEND_INITIAL_METADATA => nil)
+ recvd_call.run_batch(SEND_INITIAL_METADATA => metadata_to_send)
GRPC::ActiveCall.new(recvd_call, noop, noop, INFINITE_FUTURE,
metadata_received: true)
end
diff --git a/src/ruby/spec/generic/rpc_desc_spec.rb b/src/ruby/spec/generic/rpc_desc_spec.rb
index 100e9e8487..be578c40d3 100644
--- a/src/ruby/spec/generic/rpc_desc_spec.rb
+++ b/src/ruby/spec/generic/rpc_desc_spec.rb
@@ -38,14 +38,14 @@ describe GRPC::RpcDesc do
shared_examples 'it handles errors' do
it 'sends the specified status if BadStatus is raised' do
- expect(@call).to receive(:remote_read).once.and_return(Object.new)
+ expect(@call).to receive(:read_unary_request).once.and_return(Object.new)
expect(@call).to receive(:send_status).once.with(@bs_code, 'NOK', false,
metadata: {})
this_desc.run_server_method(@call, method(:bad_status))
end
it 'sends status UNKNOWN if other StandardErrors are raised' do
- expect(@call).to receive(:remote_read).once.and_return(Object.new)
+ expect(@call).to receive(:read_unary_request).once.and_return(Object.new)
expect(@call).to receive(:send_status).once.with(UNKNOWN,
arg_error_msg,
false, metadata: {})
@@ -53,7 +53,7 @@ describe GRPC::RpcDesc do
end
it 'absorbs CallError with no further action' do
- expect(@call).to receive(:remote_read).once.and_raise(CallError)
+ expect(@call).to receive(:read_unary_request).once.and_raise(CallError)
blk = proc do
this_desc.run_server_method(@call, method(:fake_reqresp))
end
@@ -75,7 +75,7 @@ describe GRPC::RpcDesc do
 it 'sends a response and closes the stream if there are no errors' do
req = Object.new
- expect(@call).to receive(:remote_read).once.and_return(req)
+ expect(@call).to receive(:read_unary_request).once.and_return(req)
expect(@call).to receive(:output_metadata).once.and_return(fake_md)
expect(@call).to receive(:server_unary_response).once
.with(@ok_response, trailing_metadata: fake_md)
@@ -133,7 +133,7 @@ describe GRPC::RpcDesc do
 it 'sends a response and closes the stream if there are no errors' do
req = Object.new
- expect(@call).to receive(:remote_read).once.and_return(req)
+ expect(@call).to receive(:read_unary_request).once.and_return(req)
expect(@call).to receive(:remote_send).twice.with(@ok_response)
expect(@call).to receive(:output_metadata).and_return(fake_md)
expect(@call).to receive(:send_status).once.with(OK, 'OK', true,
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb
index 9633a828a2..e0646f4599 100644
--- a/src/ruby/spec/generic/rpc_server_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_spec.rb
@@ -111,6 +111,47 @@ end
SlowStub = SlowService.rpc_stub_class
+# A test service that holds onto call objects and uses them
+# after the server-side call has finished
+class CheckCallAfterFinishedService
+ include GRPC::GenericService
+ rpc :an_rpc, EchoMsg, EchoMsg
+ rpc :a_client_streaming_rpc, stream(EchoMsg), EchoMsg
+ rpc :a_server_streaming_rpc, EchoMsg, stream(EchoMsg)
+ rpc :a_bidi_rpc, stream(EchoMsg), stream(EchoMsg)
+ attr_reader :server_side_call
+
+ def an_rpc(req, call)
+ fail 'should not reuse service' unless @server_side_call.nil?
+ @server_side_call = call
+ req
+ end
+
+ def a_client_streaming_rpc(call)
+ fail 'should not reuse service' unless @server_side_call.nil?
+ @server_side_call = call
+ # iterate through requests so call can complete
+ call.each_remote_read.each { |r| p r }
+ EchoMsg.new
+ end
+
+ def a_server_streaming_rpc(_, call)
+ fail 'should not reuse service' unless @server_side_call.nil?
+ @server_side_call = call
+ [EchoMsg.new, EchoMsg.new]
+ end
+
+ def a_bidi_rpc(requests, call)
+ fail 'should not reuse service' unless @server_side_call.nil?
+ @server_side_call = call
+ requests.each { |r| p r }
+ [EchoMsg.new, EchoMsg.new]
+ end
+end
+
+CheckCallAfterFinishedServiceStub = CheckCallAfterFinishedService.rpc_stub_class
+
describe GRPC::RpcServer do
RpcServer = GRPC::RpcServer
StatusCodes = GRPC::Core::StatusCodes
@@ -505,5 +546,109 @@ describe GRPC::RpcServer do
t.join
end
end
+
+ context 'when call objects are used after calls have completed' do
+ before(:each) do
+ server_opts = {
+ poll_period: 1
+ }
+ @srv = RpcServer.new(**server_opts)
+ alt_port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
+ @alt_host = "0.0.0.0:#{alt_port}"
+
+ @service = CheckCallAfterFinishedService.new
+ @srv.handle(@service)
+ @srv_thd = Thread.new { @srv.run }
+ @srv.wait_till_running
+ end
+
+ # check that the server-side call is still in a usable state even
+ # after it has finished
+ def check_single_req_view_of_finished_call(call)
+ common_check_of_finished_server_call(call)
+
+ expect(call.peer).to be_a(String)
+ expect(call.peer_cert).to be(nil)
+ end
+
+ def check_multi_req_view_of_finished_call(call)
+ common_check_of_finished_server_call(call)
+
+ expect do
+ call.each_remote_read.each { |r| p r }
+ end.to raise_error(GRPC::Core::CallError)
+ end
+
+ def common_check_of_finished_server_call(call)
+ expect do
+ call.merge_metadata_to_send({})
+ end.to raise_error(RuntimeError)
+
+ expect do
+ call.send_initial_metadata
+ end.to_not raise_error
+
+ expect(call.cancelled?).to be(false)
+ expect(call.metadata).to be_a(Hash)
+ expect(call.metadata['user-agent']).to be_a(String)
+
+ expect(call.metadata_sent).to be(true)
+ expect(call.output_metadata).to eq({})
+ expect(call.metadata_to_send).to eq({})
+ expect(call.deadline.is_a?(Time)).to be(true)
+ end
+
+ it 'should not crash when call used after a unary call is finished' do
+ req = EchoMsg.new
+ stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
+ :this_channel_is_insecure)
+ resp = stub.an_rpc(req)
+ expect(resp).to be_a(EchoMsg)
+ @srv.stop
+ @srv_thd.join
+
+ check_single_req_view_of_finished_call(@service.server_side_call)
+ end
+
+ it 'should not crash when call used after client streaming finished' do
+ requests = [EchoMsg.new, EchoMsg.new]
+ stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
+ :this_channel_is_insecure)
+ resp = stub.a_client_streaming_rpc(requests)
+ expect(resp).to be_a(EchoMsg)
+ @srv.stop
+ @srv_thd.join
+
+ check_multi_req_view_of_finished_call(@service.server_side_call)
+ end
+
+ it 'should not crash when call used after server streaming finished' do
+ req = EchoMsg.new
+ stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
+ :this_channel_is_insecure)
+ responses = stub.a_server_streaming_rpc(req)
+ responses.each do |r|
+ expect(r).to be_a(EchoMsg)
+ end
+ @srv.stop
+ @srv_thd.join
+
+ check_single_req_view_of_finished_call(@service.server_side_call)
+ end
+
+ it 'should not crash when call used after a bidi call is finished' do
+ requests = [EchoMsg.new, EchoMsg.new]
+ stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
+ :this_channel_is_insecure)
+ responses = stub.a_bidi_rpc(requests)
+ responses.each do |r|
+ expect(r).to be_a(EchoMsg)
+ end
+ @srv.stop
+ @srv_thd.join
+
+ check_multi_req_view_of_finished_call(@service.server_side_call)
+ end
+ end
end
end
diff --git a/src/ruby/spec/testdata/client.key b/src/ruby/spec/testdata/client.key
new file mode 100644
index 0000000000..f48d0735d9
--- /dev/null
+++ b/src/ruby/spec/testdata/client.key
@@ -0,0 +1,16 @@
+-----BEGIN PRIVATE KEY-----
+MIICeQIBADANBgkqhkiG9w0BAQEFAASCAmMwggJfAgEAAoGBAOxUR9uhvhbeVUIM
+s5WbH0px0mehl2+6sZpNjzvE2KimZpHzMJHukVH0Ffkvhs0b8+S5Ut9VNUAqd3IM
+JCCAEGtRNoQhM1t9Yr2zAckSvbRacp+FL/Cj9eDmyo00KsVGaeefA4Dh4OW+ZhkT
+NKcldXqkSuj1sEf244JZYuqZp6/tAgMBAAECgYEAi2NSVqpZMafE5YYUTcMGe6QS
+k2jtpsqYgggI2RnLJ/2tNZwYI5pwP8QVSbnMaiF4gokD5hGdrNDfTnb2v+yIwYEH
+0w8+oG7Z81KodsiZSIDJfTGsAZhVNwOz9y0VD8BBZZ1/274Zh52AUKLjZS/ZwIbS
+W2ywya855dPnH/wj+0ECQQD9X8D920kByTNHhBG18biAEZ4pxs9f0OAG8333eVcI
+w2lJDLsYDZrCB2ocgA3lUdozlzPC7YDYw8reg0tkiRY5AkEA7sdNzOeQsQRn7++5
+0bP9DtT/iON1gbfxRzCfCfXdoOtfQWIzTePWtURt9X/5D9NofI0Rg5W2oGy/MLe5
+/sXHVQJBAIup5XrJDkQywNZyAUU2ecn2bCWBFjwtqd+LBmuMciI9fOKsZtEKZrz/
+U0lkeMRoSwvXE8wmGLjjrAbdfohrXFkCQQDZEx/LtIl6JINJQiswVe0tWr6k+ASP
+1WXoTm+HYpoF/XUvv9LccNF1IazFj34hwRQwhx7w/V52Ieb+p0jUMYGxAkEAjDhd
+9pBO1fKXWiXzi9ZKfoyTNcUq3eBSVKwPG2nItg5ycXengjT5sgcWDnciIzW7BIVI
+JiqOszq9GWESErAatg==
+-----END PRIVATE KEY-----
diff --git a/src/ruby/spec/testdata/client.pem b/src/ruby/spec/testdata/client.pem
new file mode 100644
index 0000000000..e332091019
--- /dev/null
+++ b/src/ruby/spec/testdata/client.pem
@@ -0,0 +1,14 @@
+-----BEGIN CERTIFICATE-----
+MIICHzCCAYgCAQEwDQYJKoZIhvcNAQEFBQAwVjELMAkGA1UEBhMCQVUxEzARBgNV
+BAgMClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0
+ZDEPMA0GA1UEAwwGdGVzdGNhMB4XDTE0MDcxNzIzNTYwMloXDTI0MDcxNDIzNTYw
+MlowWjELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
+GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDETMBEGA1UEAwwKdGVzdGNsaWVudDCB
+nzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEA7FRH26G+Ft5VQgyzlZsfSnHSZ6GX
+b7qxmk2PO8TYqKZmkfMwke6RUfQV+S+GzRvz5LlS31U1QCp3cgwkIIAQa1E2hCEz
+W31ivbMByRK9tFpyn4Uv8KP14ObKjTQqxUZp558DgOHg5b5mGRM0pyV1eqRK6PWw
+R/bjglli6pmnr+0CAwEAATANBgkqhkiG9w0BAQUFAAOBgQAStSm5PM7ubROiKK6/
+T2FkKlhiTOx+Ryenm3Eio59emq+jXl+1nhPySX5G2PQzSR5vd1dIhwgZSR4Gyttk
+tRZ57k/NI1brUW8joiEOMJA/Mr7H7asx7wIRYDE91Fs8GkKWd5LhoPAQj+qdG35C
+OO+svdkmqH0KZo320ZUqdl2ooQ==
+-----END CERTIFICATE-----
diff --git a/templates/binding.gyp.template b/templates/binding.gyp.template
index b7560fe7df..adb7d9f774 100644
--- a/templates/binding.gyp.template
+++ b/templates/binding.gyp.template
@@ -165,19 +165,22 @@
}],
['OS == "mac"', {
'xcode_settings': {
- 'MACOSX_DEPLOYMENT_TARGET': '10.9'
+ % if defaults['global'].get('CPPFLAGS', None) is not None:
+ 'OTHER_CFLAGS': [
+ % for item in defaults['global'].get('CPPFLAGS').split():
+ '${item}',
+ % endfor
+ ],
+ 'OTHER_CPLUSPLUSFLAGS': [
+ % for item in defaults['global'].get('CPPFLAGS').split():
+ '${item}',
+ % endfor
+ '-stdlib=libc++',
+ '-std=c++11',
+ '-Wno-error=deprecated-declarations'
+ ],
+ % endif
},
- % if defaults['global'].get('CPPFLAGS', None) is not None:
- 'OTHER_CFLAGS': [
- % for item in defaults['global'].get('CPPFLAGS').split():
- '${item}',
- % endfor
- ],
- 'OTHER_CPLUSPLUSFLAGS': [
- '-stdlib=libc++',
- '-std=c++11'
- ],
- % endif
}]
]
},
@@ -201,6 +204,13 @@
'${source}',
% endfor
],
+ 'conditions': [
+ ['OS == "mac"', {
+ 'xcode_settings': {
+ 'MACOSX_DEPLOYMENT_TARGET': '10.9'
+ }
+ }]
+ ]
},
% endif
% endfor
@@ -282,6 +292,13 @@
'${source}',
% endfor
],
+ 'conditions': [
+ ['OS == "mac"', {
+ 'xcode_settings': {
+ 'MACOSX_DEPLOYMENT_TARGET': '10.9'
+ }
+ }]
+ ]
},
% endif
% endfor
@@ -317,6 +334,11 @@
'ldflags': [
'-Wl,-wrap,memcpy'
]
+ }],
+ ['OS == "mac"', {
+ 'xcode_settings': {
+ 'MACOSX_DEPLOYMENT_TARGET': '10.9'
+ }
}]
],
"target_name": "${module.name}",
diff --git a/test/core/end2end/end2end_nosec_tests.c b/test/core/end2end/end2end_nosec_tests.c
index ae1db54f1a..6a061a4e2d 100644
--- a/test/core/end2end/end2end_nosec_tests.c
+++ b/test/core/end2end/end2end_nosec_tests.c
@@ -106,6 +106,8 @@ extern void ping(grpc_end2end_test_config config);
extern void ping_pre_init(void);
extern void ping_pong_streaming(grpc_end2end_test_config config);
extern void ping_pong_streaming_pre_init(void);
+extern void proxy_auth(grpc_end2end_test_config config);
+extern void proxy_auth_pre_init(void);
extern void registered_call(grpc_end2end_test_config config);
extern void registered_call_pre_init(void);
extern void request_with_flags(grpc_end2end_test_config config);
@@ -181,6 +183,7 @@ void grpc_end2end_tests_pre_init(void) {
payload_pre_init();
ping_pre_init();
ping_pong_streaming_pre_init();
+ proxy_auth_pre_init();
registered_call_pre_init();
request_with_flags_pre_init();
request_with_payload_pre_init();
@@ -244,6 +247,7 @@ void grpc_end2end_tests(int argc, char **argv,
payload(config);
ping(config);
ping_pong_streaming(config);
+ proxy_auth(config);
registered_call(config);
request_with_flags(config);
request_with_payload(config);
@@ -416,6 +420,10 @@ void grpc_end2end_tests(int argc, char **argv,
ping_pong_streaming(config);
continue;
}
+ if (0 == strcmp("proxy_auth", argv[i])) {
+ proxy_auth(config);
+ continue;
+ }
if (0 == strcmp("registered_call", argv[i])) {
registered_call(config);
continue;
diff --git a/test/core/end2end/end2end_tests.c b/test/core/end2end/end2end_tests.c
index d18dd9c7b6..3fc7c3fb6c 100644
--- a/test/core/end2end/end2end_tests.c
+++ b/test/core/end2end/end2end_tests.c
@@ -108,6 +108,8 @@ extern void ping(grpc_end2end_test_config config);
extern void ping_pre_init(void);
extern void ping_pong_streaming(grpc_end2end_test_config config);
extern void ping_pong_streaming_pre_init(void);
+extern void proxy_auth(grpc_end2end_test_config config);
+extern void proxy_auth_pre_init(void);
extern void registered_call(grpc_end2end_test_config config);
extern void registered_call_pre_init(void);
extern void request_with_flags(grpc_end2end_test_config config);
@@ -184,6 +186,7 @@ void grpc_end2end_tests_pre_init(void) {
payload_pre_init();
ping_pre_init();
ping_pong_streaming_pre_init();
+ proxy_auth_pre_init();
registered_call_pre_init();
request_with_flags_pre_init();
request_with_payload_pre_init();
@@ -248,6 +251,7 @@ void grpc_end2end_tests(int argc, char **argv,
payload(config);
ping(config);
ping_pong_streaming(config);
+ proxy_auth(config);
registered_call(config);
request_with_flags(config);
request_with_payload(config);
@@ -424,6 +428,10 @@ void grpc_end2end_tests(int argc, char **argv,
ping_pong_streaming(config);
continue;
}
+ if (0 == strcmp("proxy_auth", argv[i])) {
+ proxy_auth(config);
+ continue;
+ }
if (0 == strcmp("registered_call", argv[i])) {
registered_call(config);
continue;
diff --git a/test/core/end2end/fixtures/h2_http_proxy.c b/test/core/end2end/fixtures/h2_http_proxy.c
index f8c88e5953..6145892365 100644
--- a/test/core/end2end/fixtures/h2_http_proxy.c
+++ b/test/core/end2end/fixtures/h2_http_proxy.c
@@ -47,11 +47,13 @@ static grpc_end2end_test_fixture chttp2_create_fixture_fullstack(
grpc_channel_args *client_args, grpc_channel_args *server_args) {
grpc_end2end_test_fixture f;
memset(&f, 0, sizeof(f));
-
fullstack_fixture_data *ffd = gpr_malloc(sizeof(fullstack_fixture_data));
const int server_port = grpc_pick_unused_port_or_die();
gpr_join_host_port(&ffd->server_addr, "localhost", server_port);
- ffd->proxy = grpc_end2end_http_proxy_create();
+
+ /* Pass client_args to the proxy so it can check for proxy auth credentials */
+ ffd->proxy = grpc_end2end_http_proxy_create(client_args);
f.fixture_data = ffd;
f.cq = grpc_completion_queue_create_for_next(NULL);
@@ -64,8 +66,17 @@ void chttp2_init_client_fullstack(grpc_end2end_test_fixture *f,
grpc_channel_args *client_args) {
fullstack_fixture_data *ffd = f->fixture_data;
char *proxy_uri;
- gpr_asprintf(&proxy_uri, "http://%s",
- grpc_end2end_http_proxy_get_proxy_name(ffd->proxy));
+
+ /* If testing proxy auth, add the credentials to the proxy URI */
+ const grpc_arg *proxy_auth_arg =
+ grpc_channel_args_find(client_args, GRPC_ARG_HTTP_PROXY_AUTH_CREDS);
+ if (proxy_auth_arg == NULL || proxy_auth_arg->type != GRPC_ARG_STRING) {
+ gpr_asprintf(&proxy_uri, "http://%s",
+ grpc_end2end_http_proxy_get_proxy_name(ffd->proxy));
+ } else {
+ gpr_asprintf(&proxy_uri, "http://%s@%s", proxy_auth_arg->value.string,
+ grpc_end2end_http_proxy_get_proxy_name(ffd->proxy));
+ }
gpr_setenv("http_proxy", proxy_uri);
gpr_free(proxy_uri);
f->client = grpc_insecure_channel_create(ffd->server_addr, client_args, NULL);
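
With this change the fixture consults the client channel args: when GRPC_ARG_HTTP_PROXY_AUTH_CREDS is set, the credentials are embedded as the userinfo part of the URI exported through the http_proxy environment variable, and the client-side proxy support is then expected to turn that userinfo into a Proxy-Authorization header on the HTTP CONNECT request, which the proxy fixture below verifies. A minimal sketch of the two URI forms follows; build_proxy_uri is a hypothetical helper using plain snprintf rather than gpr_asprintf, shown for illustration only.

#include <stdio.h>
#include <string.h>

/* Illustrative only: build the proxy URI the fixture exports via the
 * http_proxy environment variable. With no credentials the URI is a plain
 * "http://host:port"; otherwise the credentials become the userinfo part,
 * e.g. "http://aladdin:opensesame@localhost:12345". */
static void build_proxy_uri(char *buf, size_t len, const char *creds,
                            const char *proxy_name) {
  if (creds == NULL) {
    snprintf(buf, len, "http://%s", proxy_name);
  } else {
    snprintf(buf, len, "http://%s@%s", creds, proxy_name);
  }
}

int main(void) {
  char uri[256];
  build_proxy_uri(uri, sizeof(uri), "aladdin:opensesame", "localhost:12345");
  printf("%s\n", uri); /* http://aladdin:opensesame@localhost:12345 */
  return 0;
}
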
diff --git a/test/core/end2end/fixtures/http_proxy_fixture.c b/test/core/end2end/fixtures/http_proxy_fixture.c
index 54693c4900..a4cfc77bcb 100644
--- a/test/core/end2end/fixtures/http_proxy_fixture.c
+++ b/test/core/end2end/fixtures/http_proxy_fixture.c
@@ -22,6 +22,7 @@
#include <string.h>
+#include <grpc/grpc.h>
#include <grpc/slice_buffer.h>
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
@@ -46,7 +47,9 @@
#include "src/core/lib/iomgr/tcp_client.h"
#include "src/core/lib/iomgr/tcp_server.h"
#include "src/core/lib/iomgr/timer.h"
+#include "src/core/lib/slice/b64.h"
#include "src/core/lib/slice/slice_internal.h"
+#include "src/core/lib/support/string.h"
#include "test/core/util/port.h"
struct grpc_end2end_http_proxy {
@@ -304,6 +307,28 @@ static void on_server_connect_done(grpc_exec_ctx* exec_ctx, void* arg,
&conn->on_write_response_done);
}
+/**
+ * Parses the proxy auth header value and checks whether it matches
+ * "Basic <base64_encoded_expected_cred>".
+ * Returns true if it matches, false otherwise.
+ */
+static bool proxy_auth_header_matches(grpc_exec_ctx* exec_ctx,
+ char* proxy_auth_header_val,
+ char* expected_cred) {
+ GPR_ASSERT(proxy_auth_header_val != NULL);
+ GPR_ASSERT(expected_cred != NULL);
+ if (strncmp(proxy_auth_header_val, "Basic ", 6) != 0) {
+ return false;
+ }
+ proxy_auth_header_val += 6;
+ grpc_slice decoded_slice =
+ grpc_base64_decode(exec_ctx, proxy_auth_header_val, 0);
+ const bool header_matches =
+ grpc_slice_str_cmp(decoded_slice, expected_cred) == 0;
+ grpc_slice_unref_internal(exec_ctx, decoded_slice);
+ return header_matches;
+}
+
// Callback to read the HTTP CONNECT request.
// TODO(roth): Technically, for any of the failure modes handled by this
// function, we should handle the error by returning an HTTP response to
@@ -352,6 +377,28 @@ static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg,
GRPC_ERROR_UNREF(error);
return;
}
+ // If proxy auth is being used, check if the header is present and as expected
+ const grpc_arg* proxy_auth_arg = grpc_channel_args_find(
+ conn->proxy->channel_args, GRPC_ARG_HTTP_PROXY_AUTH_CREDS);
+ if (proxy_auth_arg != NULL && proxy_auth_arg->type == GRPC_ARG_STRING) {
+ bool client_authenticated = false;
+ for (size_t i = 0; i < conn->http_request.hdr_count; i++) {
+ if (strcmp(conn->http_request.hdrs[i].key, "Proxy-Authorization") == 0) {
+ client_authenticated = proxy_auth_header_matches(
+ exec_ctx, conn->http_request.hdrs[i].value,
+ proxy_auth_arg->value.string);
+ break;
+ }
+ }
+ if (!client_authenticated) {
+ const char* msg = "HTTP Connect could not verify authentication";
+ error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(msg);
+ proxy_connection_failed(exec_ctx, conn, true /* is_client */,
+ "HTTP proxy read request", error);
+ GRPC_ERROR_UNREF(error);
+ return;
+ }
+ }
// Resolve address.
grpc_resolved_addresses* resolved_addresses = NULL;
error = grpc_blocking_resolve_address(conn->http_request.path, "80",
@@ -436,7 +483,8 @@ static void thread_main(void* arg) {
grpc_exec_ctx_finish(&exec_ctx);
}
-grpc_end2end_http_proxy* grpc_end2end_http_proxy_create(void) {
+grpc_end2end_http_proxy* grpc_end2end_http_proxy_create(
+ grpc_channel_args* args) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_end2end_http_proxy* proxy =
(grpc_end2end_http_proxy*)gpr_malloc(sizeof(*proxy));
@@ -448,7 +496,7 @@ grpc_end2end_http_proxy* grpc_end2end_http_proxy_create(void) {
gpr_join_host_port(&proxy->proxy_name, "localhost", proxy_port);
gpr_log(GPR_INFO, "Proxy address: %s", proxy->proxy_name);
// Create TCP server.
- proxy->channel_args = grpc_channel_args_copy(NULL);
+ proxy->channel_args = grpc_channel_args_copy(args);
grpc_error* error = grpc_tcp_server_create(
&exec_ctx, NULL, proxy->channel_args, &proxy->server);
GPR_ASSERT(error == GRPC_ERROR_NONE);
diff --git a/test/core/end2end/fixtures/http_proxy_fixture.h b/test/core/end2end/fixtures/http_proxy_fixture.h
index a72162e846..103bd08196 100644
--- a/test/core/end2end/fixtures/http_proxy_fixture.h
+++ b/test/core/end2end/fixtures/http_proxy_fixture.h
@@ -16,11 +16,28 @@
*
*/
+#ifndef GRPC_TEST_CORE_END2END_FIXTURES_HTTP_PROXY_FIXTURE_H
+#define GRPC_TEST_CORE_END2END_FIXTURES_HTTP_PROXY_FIXTURE_H
+
+#include <grpc/grpc.h>
+
+/* The test credentials being used for HTTP Proxy Authorization */
+#define GRPC_TEST_HTTP_PROXY_AUTH_CREDS "aladdin:opensesame"
+
+/* A channel arg key used to indicate that the channel uses proxy authorization.
+ * The value (string) should be the proxy auth credentials that should be
+ * checked.
+ */
+#define GRPC_ARG_HTTP_PROXY_AUTH_CREDS "grpc.test.proxy_auth"
+
typedef struct grpc_end2end_http_proxy grpc_end2end_http_proxy;
-grpc_end2end_http_proxy* grpc_end2end_http_proxy_create();
+grpc_end2end_http_proxy* grpc_end2end_http_proxy_create(
+ grpc_channel_args* args);
void grpc_end2end_http_proxy_destroy(grpc_end2end_http_proxy* proxy);
const char* grpc_end2end_http_proxy_get_proxy_name(
grpc_end2end_http_proxy* proxy);
+
+#endif /* GRPC_TEST_CORE_END2END_FIXTURES_HTTP_PROXY_FIXTURE_H */
diff --git a/test/core/end2end/gen_build_yaml.py b/test/core/end2end/gen_build_yaml.py
index 6878964c4f..18bae63a8a 100755
--- a/test/core/end2end/gen_build_yaml.py
+++ b/test/core/end2end/gen_build_yaml.py
@@ -24,9 +24,9 @@ import hashlib
FixtureOptions = collections.namedtuple(
'FixtureOptions',
- 'fullstack includes_proxy dns_resolver name_resolution secure platforms ci_mac tracing exclude_configs exclude_iomgrs large_writes enables_compression supports_compression is_inproc is_http2')
+ 'fullstack includes_proxy dns_resolver name_resolution secure platforms ci_mac tracing exclude_configs exclude_iomgrs large_writes enables_compression supports_compression is_inproc is_http2 supports_proxy_auth')
default_unsecure_fixture_options = FixtureOptions(
- True, False, True, True, False, ['windows', 'linux', 'mac', 'posix'], True, False, [], [], True, False, True, False, True)
+ True, False, True, True, False, ['windows', 'linux', 'mac', 'posix'], True, False, [], [], True, False, True, False, True, False)
socketpair_unsecure_fixture_options = default_unsecure_fixture_options._replace(fullstack=False, dns_resolver=False)
default_secure_fixture_options = default_unsecure_fixture_options._replace(secure=True)
uds_fixture_options = default_unsecure_fixture_options._replace(dns_resolver=False, platforms=['linux', 'mac', 'posix'], exclude_iomgrs=['uv'])
@@ -47,7 +47,7 @@ END2END_FIXTURES = {
'h2_full+trace': default_unsecure_fixture_options._replace(tracing=True),
'h2_full+workarounds': default_unsecure_fixture_options,
'h2_http_proxy': default_unsecure_fixture_options._replace(
- ci_mac=False, exclude_iomgrs=['uv']),
+ ci_mac=False, exclude_iomgrs=['uv'], supports_proxy_auth=True),
'h2_oauth2': default_secure_fixture_options._replace(
ci_mac=False, exclude_iomgrs=['uv']),
'h2_proxy': default_unsecure_fixture_options._replace(
@@ -69,8 +69,8 @@ END2END_FIXTURES = {
TestOptions = collections.namedtuple(
'TestOptions',
- 'needs_fullstack needs_dns needs_names proxyable secure traceable cpu_cost exclude_iomgrs large_writes flaky allows_compression needs_compression exclude_inproc needs_http2')
-default_test_options = TestOptions(False, False, False, True, False, True, 1.0, [], False, False, True, False, False, False)
+ 'needs_fullstack needs_dns needs_names proxyable secure traceable cpu_cost exclude_iomgrs large_writes flaky allows_compression needs_compression exclude_inproc needs_http2 needs_proxy_auth')
+default_test_options = TestOptions(False, False, False, True, False, True, 1.0, [], False, False, True, False, False, False, False)
connectivity_test_options = default_test_options._replace(needs_fullstack=True)
LOWCPU = 0.1
@@ -128,6 +128,7 @@ END2END_TESTS = {
'load_reporting_hook': default_test_options,
'ping_pong_streaming': default_test_options._replace(cpu_cost=LOWCPU),
'ping': connectivity_test_options._replace(proxyable=False, cpu_cost=LOWCPU),
+ 'proxy_auth': default_test_options._replace(needs_proxy_auth=True),
'registered_call': default_test_options,
'request_with_flags': default_test_options._replace(
proxyable=False, cpu_cost=LOWCPU),
@@ -178,6 +179,9 @@ def compatible(f, t):
if END2END_TESTS[t].needs_http2:
if not END2END_FIXTURES[f].is_http2:
return False
+ if END2END_TESTS[t].needs_proxy_auth:
+ if not END2END_FIXTURES[f].supports_proxy_auth:
+ return False
return True
diff --git a/test/core/end2end/generate_tests.bzl b/test/core/end2end/generate_tests.bzl
index ea9ad03513..6d1917c0ff 100755
--- a/test/core/end2end/generate_tests.bzl
+++ b/test/core/end2end/generate_tests.bzl
@@ -21,7 +21,7 @@ load("//bazel:grpc_build_system.bzl", "grpc_sh_test", "grpc_cc_binary", "grpc_cc
def fixture_options(fullstack=True, includes_proxy=False, dns_resolver=True,
name_resolution=True, secure=True, tracing=False,
platforms=['windows', 'linux', 'mac', 'posix'],
- is_inproc=False, is_http2=True):
+ is_inproc=False, is_http2=True, supports_proxy_auth=False):
return struct(
fullstack=fullstack,
includes_proxy=includes_proxy,
@@ -30,7 +30,8 @@ def fixture_options(fullstack=True, includes_proxy=False, dns_resolver=True,
secure=secure,
tracing=tracing,
is_inproc=is_inproc,
- is_http2=is_http2
+ is_http2=is_http2,
+ supports_proxy_auth=supports_proxy_auth
#platforms=platforms
)
@@ -47,7 +48,7 @@ END2END_FIXTURES = {
'h2_full+pipe': fixture_options(platforms=['linux']),
'h2_full+trace': fixture_options(tracing=True),
'h2_full+workarounds': fixture_options(),
- 'h2_http_proxy': fixture_options(),
+ 'h2_http_proxy': fixture_options(supports_proxy_auth=True),
'h2_oauth2': fixture_options(),
'h2_proxy': fixture_options(includes_proxy=True),
'h2_sockpair_1byte': fixture_options(fullstack=False, dns_resolver=False),
@@ -67,7 +68,8 @@ END2END_FIXTURES = {
def test_options(needs_fullstack=False, needs_dns=False, needs_names=False,
proxyable=True, secure=False, traceable=False,
- exclude_inproc=False, needs_http2=False):
+ exclude_inproc=False, needs_http2=False,
+ needs_proxy_auth=False):
return struct(
needs_fullstack=needs_fullstack,
needs_dns=needs_dns,
@@ -76,7 +78,8 @@ def test_options(needs_fullstack=False, needs_dns=False, needs_names=False,
secure=secure,
traceable=traceable,
exclude_inproc=exclude_inproc,
- needs_http2=needs_http2
+ needs_http2=needs_http2,
+ needs_proxy_auth=needs_proxy_auth
)
@@ -123,6 +126,7 @@ END2END_TESTS = {
'load_reporting_hook': test_options(),
'ping_pong_streaming': test_options(),
'ping': test_options(needs_fullstack=True, proxyable=False),
+ 'proxy_auth': test_options(needs_proxy_auth=True),
'registered_call': test_options(),
'request_with_flags': test_options(proxyable=False),
'request_with_payload': test_options(),
@@ -165,6 +169,9 @@ def compatible(fopt, topt):
if topt.needs_http2:
if not fopt.is_http2:
return False
+ if topt.needs_proxy_auth:
+ if not fopt.supports_proxy_auth:
+ return False
return True
diff --git a/test/core/end2end/tests/proxy_auth.c b/test/core/end2end/tests/proxy_auth.c
new file mode 100644
index 0000000000..d922049bcb
--- /dev/null
+++ b/test/core/end2end/tests/proxy_auth.c
@@ -0,0 +1,235 @@
+/*
+ *
+ * Copyright 2015 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * This test checks that proxy authentication works with HTTP CONNECT.
+ */
+#include "test/core/end2end/end2end_tests.h"
+#include "test/core/end2end/fixtures/http_proxy_fixture.h"
+
+#include <stdio.h>
+#include <string.h>
+
+#include <grpc/byte_buffer.h>
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/time.h>
+#include <grpc/support/useful.h>
+#include "src/core/lib/support/string.h"
+#include "test/core/end2end/cq_verifier.h"
+
+static void *tag(intptr_t t) { return (void *)t; }
+
+static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
+ const char *test_name,
+ grpc_channel_args *client_args,
+ grpc_channel_args *server_args) {
+ grpc_end2end_test_fixture f;
+ gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name);
+ f = config.create_fixture(client_args, server_args);
+ config.init_server(&f, server_args);
+ config.init_client(&f, client_args);
+ return f;
+}
+
+static gpr_timespec n_seconds_from_now(int n) {
+ return grpc_timeout_seconds_to_deadline(n);
+}
+
+static gpr_timespec five_seconds_from_now(void) {
+ return n_seconds_from_now(5);
+}
+
+static void drain_cq(grpc_completion_queue *cq) {
+ grpc_event ev;
+ do {
+ ev = grpc_completion_queue_next(cq, five_seconds_from_now(), NULL);
+ } while (ev.type != GRPC_QUEUE_SHUTDOWN);
+}
+
+static void shutdown_server(grpc_end2end_test_fixture *f) {
+ if (!f->server) return;
+ grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000));
+ GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000),
+ grpc_timeout_seconds_to_deadline(5),
+ NULL)
+ .type == GRPC_OP_COMPLETE);
+ grpc_server_destroy(f->server);
+ f->server = NULL;
+}
+
+static void shutdown_client(grpc_end2end_test_fixture *f) {
+ if (!f->client) return;
+ grpc_channel_destroy(f->client);
+ f->client = NULL;
+}
+
+static void end_test(grpc_end2end_test_fixture *f) {
+ shutdown_server(f);
+ shutdown_client(f);
+
+ grpc_completion_queue_shutdown(f->cq);
+ drain_cq(f->cq);
+ grpc_completion_queue_destroy(f->cq);
+ grpc_completion_queue_destroy(f->shutdown_cq);
+}
+
+static void simple_request_body(grpc_end2end_test_config config,
+ grpc_end2end_test_fixture f) {
+ grpc_call *c;
+ grpc_call *s;
+ cq_verifier *cqv = cq_verifier_create(f.cq);
+ grpc_op ops[6];
+ grpc_op *op;
+ grpc_metadata_array initial_metadata_recv;
+ grpc_metadata_array trailing_metadata_recv;
+ grpc_metadata_array request_metadata_recv;
+ grpc_call_details call_details;
+ grpc_status_code status;
+ grpc_call_error error;
+ grpc_slice details;
+ int was_cancelled = 2;
+ char *peer;
+
+ gpr_timespec deadline = five_seconds_from_now();
+ c = grpc_channel_create_call(
+ f.client, NULL, GRPC_PROPAGATE_DEFAULTS, f.cq,
+ grpc_slice_from_static_string("/foo"),
+ get_host_override_slice("foo.test.google.fr:1234", config), deadline,
+ NULL);
+ GPR_ASSERT(c);
+
+ peer = grpc_call_get_peer(c);
+ GPR_ASSERT(peer != NULL);
+ gpr_log(GPR_DEBUG, "client_peer_before_call=%s", peer);
+ gpr_free(peer);
+
+ grpc_metadata_array_init(&initial_metadata_recv);
+ grpc_metadata_array_init(&trailing_metadata_recv);
+ grpc_metadata_array_init(&request_metadata_recv);
+ grpc_call_details_init(&call_details);
+
+ memset(ops, 0, sizeof(ops));
+ op = ops;
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ op->data.send_initial_metadata.count = 0;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_RECV_INITIAL_METADATA;
+ op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+ op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
+ op->data.recv_status_on_client.status = &status;
+ op->data.recv_status_on_client.status_details = &details;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error = grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(1), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ error =
+ grpc_server_request_call(f.server, &s, &call_details,
+ &request_metadata_recv, f.cq, f.cq, tag(101));
+ GPR_ASSERT(GRPC_CALL_OK == error);
+ CQ_EXPECT_COMPLETION(cqv, tag(101), 1);
+ cq_verify(cqv);
+
+ peer = grpc_call_get_peer(s);
+ GPR_ASSERT(peer != NULL);
+ gpr_log(GPR_DEBUG, "server_peer=%s", peer);
+ gpr_free(peer);
+ peer = grpc_call_get_peer(c);
+ GPR_ASSERT(peer != NULL);
+ gpr_log(GPR_DEBUG, "client_peer=%s", peer);
+ gpr_free(peer);
+
+ memset(ops, 0, sizeof(ops));
+ op = ops;
+ op->op = GRPC_OP_SEND_INITIAL_METADATA;
+ op->data.send_initial_metadata.count = 0;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
+ op->data.send_status_from_server.trailing_metadata_count = 0;
+ op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED;
+ grpc_slice status_details = grpc_slice_from_static_string("xyz");
+ op->data.send_status_from_server.status_details = &status_details;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
+ op->data.recv_close_on_server.cancelled = &was_cancelled;
+ op->flags = 0;
+ op->reserved = NULL;
+ op++;
+ error = grpc_call_start_batch(s, ops, (size_t)(op - ops), tag(102), NULL);
+ GPR_ASSERT(GRPC_CALL_OK == error);
+
+ CQ_EXPECT_COMPLETION(cqv, tag(102), 1);
+ CQ_EXPECT_COMPLETION(cqv, tag(1), 1);
+ cq_verify(cqv);
+
+ GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED);
+ GPR_ASSERT(0 == grpc_slice_str_cmp(details, "xyz"));
+ GPR_ASSERT(0 == grpc_slice_str_cmp(call_details.method, "/foo"));
+ validate_host_override_string("foo.test.google.fr:1234", call_details.host,
+ config);
+ GPR_ASSERT(0 == call_details.flags);
+ GPR_ASSERT(was_cancelled == 1);
+
+ grpc_slice_unref(details);
+ grpc_metadata_array_destroy(&initial_metadata_recv);
+ grpc_metadata_array_destroy(&trailing_metadata_recv);
+ grpc_metadata_array_destroy(&request_metadata_recv);
+ grpc_call_details_destroy(&call_details);
+
+ grpc_call_unref(c);
+ grpc_call_unref(s);
+
+ cq_verifier_destroy(cqv);
+}
+
+static void test_invoke_proxy_auth(grpc_end2end_test_config config) {
+ /* Indicate that the proxy requires user auth */
+ grpc_arg client_arg = {.type = GRPC_ARG_STRING,
+ .key = GRPC_ARG_HTTP_PROXY_AUTH_CREDS,
+ .value.string = GRPC_TEST_HTTP_PROXY_AUTH_CREDS};
+ grpc_channel_args client_args = {.num_args = 1, .args = &client_arg};
+ grpc_end2end_test_fixture f =
+ begin_test(config, "test_invoke_proxy_auth", &client_args, NULL);
+ simple_request_body(config, f);
+ end_test(&f);
+ config.tear_down_data(&f);
+}
+
+void proxy_auth(grpc_end2end_test_config config) {
+ test_invoke_proxy_auth(config);
+}
+
+void proxy_auth_pre_init(void) {}
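
Note that the call exercised above is an ordinary unary RPC; the only proxy-auth-specific piece is the GRPC_ARG_HTTP_PROXY_AUTH_CREDS client arg. Authentication happens during the HTTP CONNECT exchange with the proxy, before any HTTP/2 frames flow, so a missing or wrong Proxy-Authorization header would cause the tunnel, and therefore the whole batch, to fail rather than the RPC completing with UNIMPLEMENTED. A rough, assumed shape of that exchange as seen by the fixture; the exact header set and status line are assumptions, only the Proxy-Authorization check and the 200 response matter.

#include <stdio.h>

/* Illustrative only: a plausible tunnel handshake between the client and the
 * test proxy when proxy auth is enabled. <server_port> is a placeholder. */
static const char *kConnectRequest =
    "CONNECT localhost:<server_port> HTTP/1.1\r\n"
    "Proxy-Authorization: Basic YWxhZGRpbjpvcGVuc2VzYW1l\r\n"
    "\r\n";
static const char *kConnectResponse = "HTTP/1.1 200 connected\r\n\r\n";

int main(void) {
  printf("%s%s", kConnectRequest, kConnectResponse);
  return 0;
}
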
diff --git a/test/core/surface/completion_queue_threading_test.c b/test/core/surface/completion_queue_threading_test.c
index 366565fb35..99d0fa4980 100644
--- a/test/core/surface/completion_queue_threading_test.c
+++ b/test/core/surface/completion_queue_threading_test.c
@@ -190,7 +190,8 @@ static void consumer_thread(void *arg) {
gpr_log(GPR_INFO, "consumer %d phase 2", opt->id);
for (;;) {
- ev = grpc_completion_queue_next(opt->cc, ten_seconds_time(), NULL);
+ ev = grpc_completion_queue_next(opt->cc,
+ gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL);
switch (ev.type) {
case GRPC_OP_COMPLETE:
GPR_ASSERT(ev.success);
diff --git a/test/cpp/qps/BUILD b/test/cpp/qps/BUILD
index 93fb53446c..b3348b76fa 100644
--- a/test/cpp/qps/BUILD
+++ b/test/cpp/qps/BUILD
@@ -81,6 +81,7 @@ grpc_cc_library(
"//src/proto/grpc/testing:services_proto",
"//test/core/util:gpr_test_util",
"//test/core/util:grpc_test_util",
+ "//test/cpp/util:test_util",
],
)
@@ -148,6 +149,7 @@ grpc_cc_binary(
":driver_impl",
"//:grpc++",
"//test/cpp/util:test_config",
+ "//test/cpp/util:test_util",
],
external_deps = [
"gflags",
@@ -162,6 +164,7 @@ grpc_cc_test(
":driver_impl",
":qps_worker_impl",
"//test/cpp/util:test_config",
+ "//test/cpp/util:test_util",
],
)
@@ -173,6 +176,7 @@ grpc_cc_test(
":driver_impl",
"//:grpc++",
"//test/cpp/util:test_config",
+ "//test/cpp/util:test_util",
],
)
diff --git a/test/cpp/qps/client.h b/test/cpp/qps/client.h
index ecbf9c31fa..6c4d92e859 100644
--- a/test/cpp/qps/client.h
+++ b/test/cpp/qps/client.h
@@ -39,6 +39,7 @@
#include "test/cpp/qps/interarrival.h"
#include "test/cpp/qps/usage_timer.h"
#include "test/cpp/util/create_test_channel.h"
+#include "test/cpp/util/test_credentials_provider.h"
namespace grpc {
namespace testing {
@@ -405,9 +406,18 @@ class ClientImpl : public Client {
ChannelArguments args;
args.SetInt("shard_to_ensure_no_subchannel_merges", shard);
set_channel_args(config, &args);
+
+ grpc::string type;
+ if (config.has_security_params() &&
+ config.security_params().cred_type().empty()) {
+ type = kTlsCredentialsType;
+ } else {
+ type = config.security_params().cred_type();
+ }
+
channel_ = CreateTestChannel(
- target, config.security_params().server_host_override(),
- config.has_security_params(), !config.security_params().use_test_ca(),
+ target, type, config.security_params().server_host_override(),
+ !config.security_params().use_test_ca(),
std::shared_ptr<CallCredentials>(), args);
gpr_log(GPR_INFO, "Connecting to %s", target.c_str());
GPR_ASSERT(channel_->WaitForConnected(
diff --git a/test/cpp/qps/driver.cc b/test/cpp/qps/driver.cc
index fbd8d1b1e7..4458e389e7 100644
--- a/test/cpp/qps/driver.cc
+++ b/test/cpp/qps/driver.cc
@@ -40,6 +40,7 @@
#include "test/cpp/qps/histogram.h"
#include "test/cpp/qps/qps_worker.h"
#include "test/cpp/qps/stats.h"
+#include "test/cpp/util/test_credentials_provider.h"
using std::list;
using std::thread;
@@ -172,13 +173,26 @@ static void postprocess_scenario_result(ScenarioResult* result) {
sum(result->client_stats(), CliPollCount) / histogram.Count());
result->mutable_summary()->set_server_polls_per_request(
sum(result->server_stats(), SvrPollCount) / histogram.Count());
+
+ auto server_queries_per_cpu_sec =
+ histogram.Count() / (sum(result->server_stats(), ServerSystemTime) +
+ sum(result->server_stats(), ServerUserTime));
+ auto client_queries_per_cpu_sec =
+ histogram.Count() / (sum(result->client_stats(), SystemTime) +
+ sum(result->client_stats(), UserTime));
+
+ result->mutable_summary()->set_server_queries_per_cpu_sec(
+ server_queries_per_cpu_sec);
+ result->mutable_summary()->set_client_queries_per_cpu_sec(
+ client_queries_per_cpu_sec);
}
std::unique_ptr<ScenarioResult> RunScenario(
const ClientConfig& initial_client_config, size_t num_clients,
const ServerConfig& initial_server_config, size_t num_servers,
int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count,
- const char* qps_server_target_override) {
+ const grpc::string& qps_server_target_override,
+ const grpc::string& credential_type) {
// Log everything from the driver
gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
@@ -214,7 +228,7 @@ std::unique_ptr<ScenarioResult> RunScenario(
}
int driver_port = grpc_pick_unused_port_or_die();
- local_workers.emplace_back(new QpsWorker(driver_port));
+ local_workers.emplace_back(new QpsWorker(driver_port, 0, credential_type));
char addr[256];
sprintf(addr, "localhost:%d", driver_port);
if (spawn_local_worker_count < 0) {
@@ -246,12 +260,14 @@ std::unique_ptr<ScenarioResult> RunScenario(
};
std::vector<ServerData> servers(num_servers);
std::unordered_map<string, std::deque<int>> hosts_cores;
+ ChannelArguments channel_args;
for (size_t i = 0; i < num_servers; i++) {
gpr_log(GPR_INFO, "Starting server on %s (worker #%" PRIuPTR ")",
workers[i].c_str(), i);
- servers[i].stub = WorkerService::NewStub(
- CreateChannel(workers[i], InsecureChannelCredentials()));
+ servers[i].stub = WorkerService::NewStub(CreateChannel(
+ workers[i], GetCredentialsProvider()->GetChannelCredentials(
+ credential_type, &channel_args)));
ServerConfig server_config = initial_server_config;
if (server_config.core_limit() != 0) {
@@ -269,8 +285,7 @@ std::unique_ptr<ScenarioResult> RunScenario(
if (!servers[i].stream->Read(&init_status)) {
gpr_log(GPR_ERROR, "Server %zu did not yield initial status", i);
}
- if (qps_server_target_override != NULL &&
- strlen(qps_server_target_override) > 0) {
+ if (qps_server_target_override.length() > 0) {
// overriding the qps server target only works if there is 1 server
GPR_ASSERT(num_servers == 1);
client_config.add_server_targets(qps_server_target_override);
@@ -298,7 +313,8 @@ std::unique_ptr<ScenarioResult> RunScenario(
gpr_log(GPR_INFO, "Starting client on %s (worker #%" PRIuPTR ")",
worker.c_str(), i + num_servers);
clients[i].stub = WorkerService::NewStub(
- CreateChannel(worker, InsecureChannelCredentials()));
+ CreateChannel(worker, GetCredentialsProvider()->GetChannelCredentials(
+ credential_type, &channel_args)));
ClientConfig per_client_config = client_config;
if (initial_client_config.core_limit() != 0) {
@@ -483,16 +499,19 @@ std::unique_ptr<ScenarioResult> RunScenario(
return result;
}
-bool RunQuit() {
+bool RunQuit(const grpc::string& credential_type) {
// Get client, server lists
bool result = true;
auto workers = get_workers("QPS_WORKERS");
if (workers.size() == 0) {
return false;
}
+
+ ChannelArguments channel_args;
for (size_t i = 0; i < workers.size(); i++) {
- auto stub = WorkerService::NewStub(
- CreateChannel(workers[i], InsecureChannelCredentials()));
+ auto stub = WorkerService::NewStub(CreateChannel(
+ workers[i], GetCredentialsProvider()->GetChannelCredentials(
+ credential_type, &channel_args)));
Void dummy;
grpc::ClientContext ctx;
ctx.set_wait_for_ready(true);
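
Besides routing worker channels through the credentials provider, postprocess_scenario_result now derives a CPU-normalized throughput figure: the histogram's total query count divided by the summed user plus system CPU seconds reported by all server workers, and separately by all client workers. A small arithmetic sketch with made-up numbers:

#include <stdio.h>

/* Illustrative only: queries per CPU-second as computed in
 * postprocess_scenario_result. All values below are invented. */
int main(void) {
  double histogram_count = 1200000.0;  /* total queries in the run           */
  double server_user_time = 95.0;      /* sum over all server workers (sec)  */
  double server_system_time = 25.0;    /* sum over all server workers (sec)  */
  double server_queries_per_cpu_sec =
      histogram_count / (server_system_time + server_user_time);
  printf("server queries/CPU-sec: %.2f\n", server_queries_per_cpu_sec);
  /* 1200000 / (25 + 95) = 10000 queries per CPU-second */
  return 0;
}
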
diff --git a/test/cpp/qps/driver.h b/test/cpp/qps/driver.h
index def32c6f0e..29f2776d79 100644
--- a/test/cpp/qps/driver.h
+++ b/test/cpp/qps/driver.h
@@ -31,9 +31,10 @@ std::unique_ptr<ScenarioResult> RunScenario(
const grpc::testing::ClientConfig& client_config, size_t num_clients,
const grpc::testing::ServerConfig& server_config, size_t num_servers,
int warmup_seconds, int benchmark_seconds, int spawn_local_worker_count,
- const char* qps_server_target_override = "");
+ const grpc::string& qps_server_target_override,
+ const grpc::string& credential_type);
-bool RunQuit();
+bool RunQuit(const grpc::string& credential_type);
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/qps/qps_json_driver.cc b/test/cpp/qps/qps_json_driver.cc
index e1e5802d13..cca59f64d8 100644
--- a/test/cpp/qps/qps_json_driver.cc
+++ b/test/cpp/qps/qps_json_driver.cc
@@ -31,6 +31,7 @@
#include "test/cpp/qps/parse_json.h"
#include "test/cpp/qps/report.h"
#include "test/cpp/util/test_config.h"
+#include "test/cpp/util/test_credentials_provider.h"
DEFINE_string(scenarios_file, "",
"JSON file containing an array of Scenario objects");
@@ -61,6 +62,9 @@ DEFINE_string(qps_server_target_override, "",
DEFINE_string(json_file_out, "", "File to write the JSON output to.");
+DEFINE_string(credential_type, grpc::testing::kInsecureCredentialsType,
+ "Credential type for communication with workers");
+
namespace grpc {
namespace testing {
@@ -72,7 +76,7 @@ static std::unique_ptr<ScenarioResult> RunAndReport(const Scenario& scenario,
scenario.server_config(), scenario.num_servers(),
scenario.warmup_seconds(), scenario.benchmark_seconds(),
scenario.spawn_local_worker_count(),
- FLAGS_qps_server_target_override.c_str());
+ FLAGS_qps_server_target_override, FLAGS_credential_type);
// Amend the result with scenario config. Eventually we should adjust
// RunScenario contract so we don't need to touch the result here.
@@ -84,6 +88,7 @@ static std::unique_ptr<ScenarioResult> RunAndReport(const Scenario& scenario,
GetReporter()->ReportTimes(*result);
GetReporter()->ReportCpuUsage(*result);
GetReporter()->ReportPollCount(*result);
+ GetReporter()->ReportQueriesPerCpuSec(*result);
for (int i = 0; *success && i < result->client_success_size(); i++) {
*success = result->client_success(i);
@@ -185,7 +190,7 @@ static bool QpsDriver() {
} else if (scjson) {
json = FLAGS_scenarios_json.c_str();
} else if (FLAGS_quit) {
- return RunQuit();
+ return RunQuit(FLAGS_credential_type);
}
// Parse into an array of scenarios
diff --git a/test/cpp/qps/qps_openloop_test.cc b/test/cpp/qps/qps_openloop_test.cc
index 2f8a3d75f0..069b3fa076 100644
--- a/test/cpp/qps/qps_openloop_test.cc
+++ b/test/cpp/qps/qps_openloop_test.cc
@@ -25,6 +25,7 @@
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/report.h"
#include "test/cpp/util/test_config.h"
+#include "test/cpp/util/test_credentials_provider.h"
namespace grpc {
namespace testing {
@@ -48,8 +49,8 @@ static void RunQPS() {
server_config.set_server_type(ASYNC_SERVER);
server_config.set_async_server_threads(8);
- const auto result =
- RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
+ const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP,
+ BENCHMARK, -2, "", kInsecureCredentialsType);
GetReporter()->ReportQPSPerCore(*result);
GetReporter()->ReportLatency(*result);
diff --git a/test/cpp/qps/qps_worker.cc b/test/cpp/qps/qps_worker.cc
index 10bc5422e1..d20bc1b074 100644
--- a/test/cpp/qps/qps_worker.cc
+++ b/test/cpp/qps/qps_worker.cc
@@ -41,6 +41,7 @@
#include "test/cpp/qps/client.h"
#include "test/cpp/qps/server.h"
#include "test/cpp/util/create_test_channel.h"
+#include "test/cpp/util/test_credentials_provider.h"
namespace grpc {
namespace testing {
@@ -263,7 +264,8 @@ class WorkerServiceImpl final : public WorkerService::Service {
QpsWorker* worker_;
};
-QpsWorker::QpsWorker(int driver_port, int server_port) {
+QpsWorker::QpsWorker(int driver_port, int server_port,
+ const grpc::string& credential_type) {
impl_.reset(new WorkerServiceImpl(server_port, this));
gpr_atm_rel_store(&done_, static_cast<gpr_atm>(0));
@@ -271,7 +273,9 @@ QpsWorker::QpsWorker(int driver_port, int server_port) {
gpr_join_host_port(&server_address, "::", driver_port);
ServerBuilder builder;
- builder.AddListeningPort(server_address, InsecureServerCredentials());
+ builder.AddListeningPort(
+ server_address,
+ GetCredentialsProvider()->GetServerCredentials(credential_type));
builder.RegisterService(impl_.get());
gpr_free(server_address);
diff --git a/test/cpp/qps/qps_worker.h b/test/cpp/qps/qps_worker.h
index c8a7be9360..360125fb17 100644
--- a/test/cpp/qps/qps_worker.h
+++ b/test/cpp/qps/qps_worker.h
@@ -21,6 +21,7 @@
#include <memory>
+#include <grpc++/support/config.h>
#include <grpc/support/atm.h>
namespace grpc {
@@ -33,7 +34,8 @@ class WorkerServiceImpl;
class QpsWorker {
public:
- explicit QpsWorker(int driver_port, int server_port = 0);
+ explicit QpsWorker(int driver_port, int server_port,
+ const grpc::string& credential_type);
~QpsWorker();
bool Done() const;
diff --git a/test/cpp/qps/report.cc b/test/cpp/qps/report.cc
index 809c563044..a45b10bcb8 100644
--- a/test/cpp/qps/report.cc
+++ b/test/cpp/qps/report.cc
@@ -71,6 +71,12 @@ void CompositeReporter::ReportPollCount(const ScenarioResult& result) {
}
}
+void CompositeReporter::ReportQueriesPerCpuSec(const ScenarioResult& result) {
+ for (size_t i = 0; i < reporters_.size(); ++i) {
+ reporters_[i]->ReportQueriesPerCpuSec(result);
+ }
+}
+
void GprLogReporter::ReportQPS(const ScenarioResult& result) {
gpr_log(GPR_INFO, "QPS: %.1f", result.summary().qps());
if (result.summary().failed_requests_per_second() > 0) {
@@ -119,6 +125,13 @@ void GprLogReporter::ReportPollCount(const ScenarioResult& result) {
result.summary().server_polls_per_request());
}
+void GprLogReporter::ReportQueriesPerCpuSec(const ScenarioResult& result) {
+ gpr_log(GPR_INFO, "Server Queries/CPU-sec: %.2f",
+ result.summary().server_queries_per_cpu_sec());
+ gpr_log(GPR_INFO, "Client Queries/CPU-sec: %.2f",
+ result.summary().client_queries_per_cpu_sec());
+}
+
void JsonReporter::ReportQPS(const ScenarioResult& result) {
grpc::string json_string =
SerializeJson(result, "type.googleapis.com/grpc.testing.ScenarioResult");
@@ -147,6 +160,10 @@ void JsonReporter::ReportPollCount(const ScenarioResult& result) {
// NOP - all reporting is handled by ReportQPS.
}
+void JsonReporter::ReportQueriesPerCpuSec(const ScenarioResult& result) {
+ // NOP - all reporting is handled by ReportQPS.
+}
+
void RpcReporter::ReportQPS(const ScenarioResult& result) {
grpc::ClientContext context;
grpc::Status status;
@@ -183,5 +200,9 @@ void RpcReporter::ReportPollCount(const ScenarioResult& result) {
// NOP - all reporting is handled by ReportQPS.
}
+void RpcReporter::ReportQueriesPerCpuSec(const ScenarioResult& result) {
+ // NOP - all reporting is handled by ReportQPS.
+}
+
} // namespace testing
} // namespace grpc
diff --git a/test/cpp/qps/report.h b/test/cpp/qps/report.h
index 0bd398fd2a..321be2a97f 100644
--- a/test/cpp/qps/report.h
+++ b/test/cpp/qps/report.h
@@ -64,6 +64,9 @@ class Reporter {
/** Reports client and server poll usage inside completion queue. */
virtual void ReportPollCount(const ScenarioResult& result) = 0;
+ /** Reports queries per cpu-sec. */
+ virtual void ReportQueriesPerCpuSec(const ScenarioResult& result) = 0;
+
private:
const string name_;
};
@@ -82,6 +85,7 @@ class CompositeReporter : public Reporter {
void ReportTimes(const ScenarioResult& result) override;
void ReportCpuUsage(const ScenarioResult& result) override;
void ReportPollCount(const ScenarioResult& result) override;
+ void ReportQueriesPerCpuSec(const ScenarioResult& result) override;
private:
std::vector<std::unique_ptr<Reporter> > reporters_;
@@ -99,6 +103,7 @@ class GprLogReporter : public Reporter {
void ReportTimes(const ScenarioResult& result) override;
void ReportCpuUsage(const ScenarioResult& result) override;
void ReportPollCount(const ScenarioResult& result) override;
+ void ReportQueriesPerCpuSec(const ScenarioResult& result) override;
};
/** Dumps the report to a JSON file. */
@@ -114,6 +119,7 @@ class JsonReporter : public Reporter {
void ReportTimes(const ScenarioResult& result) override;
void ReportCpuUsage(const ScenarioResult& result) override;
void ReportPollCount(const ScenarioResult& result) override;
+ void ReportQueriesPerCpuSec(const ScenarioResult& result) override;
const string report_file_;
};
@@ -130,6 +136,7 @@ class RpcReporter : public Reporter {
void ReportTimes(const ScenarioResult& result) override;
void ReportCpuUsage(const ScenarioResult& result) override;
void ReportPollCount(const ScenarioResult& result) override;
+ void ReportQueriesPerCpuSec(const ScenarioResult& result) override;
std::unique_ptr<ReportQpsScenarioService::Stub> stub_;
};
diff --git a/test/cpp/qps/secure_sync_unary_ping_pong_test.cc b/test/cpp/qps/secure_sync_unary_ping_pong_test.cc
index 1ee6e37474..137b33ee25 100644
--- a/test/cpp/qps/secure_sync_unary_ping_pong_test.cc
+++ b/test/cpp/qps/secure_sync_unary_ping_pong_test.cc
@@ -24,6 +24,7 @@
#include "test/cpp/qps/driver.h"
#include "test/cpp/qps/report.h"
#include "test/cpp/util/test_config.h"
+#include "test/cpp/util/test_credentials_provider.h"
namespace grpc {
namespace testing {
@@ -51,8 +52,8 @@ static void RunSynchronousUnaryPingPong() {
client_config.mutable_security_params()->CopyFrom(security);
server_config.mutable_security_params()->CopyFrom(security);
- const auto result =
- RunScenario(client_config, 1, server_config, 1, WARMUP, BENCHMARK, -2);
+ const auto result = RunScenario(client_config, 1, server_config, 1, WARMUP,
+ BENCHMARK, -2, "", kInsecureCredentialsType);
GetReporter()->ReportQPS(*result);
GetReporter()->ReportLatency(*result);
diff --git a/test/cpp/qps/server.h b/test/cpp/qps/server.h
index 4b699e0708..c0dac96d8b 100644
--- a/test/cpp/qps/server.h
+++ b/test/cpp/qps/server.h
@@ -32,6 +32,7 @@
#include "test/core/end2end/data/ssl_test_data.h"
#include "test/core/util/port.h"
#include "test/cpp/qps/usage_timer.h"
+#include "test/cpp/util/test_credentials_provider.h"
namespace grpc {
namespace testing {
@@ -89,12 +90,14 @@ class Server {
static std::shared_ptr<ServerCredentials> CreateServerCredentials(
const ServerConfig& config) {
if (config.has_security_params()) {
- SslServerCredentialsOptions::PemKeyCertPair pkcp = {test_server1_key,
- test_server1_cert};
- SslServerCredentialsOptions ssl_opts;
- ssl_opts.pem_root_certs = "";
- ssl_opts.pem_key_cert_pairs.push_back(pkcp);
- return SslServerCredentials(ssl_opts);
+ grpc::string type;
+ if (config.security_params().cred_type().empty()) {
+ type = kTlsCredentialsType;
+ } else {
+ type = config.security_params().cred_type();
+ }
+
+ return GetCredentialsProvider()->GetServerCredentials(type);
} else {
return InsecureServerCredentials();
}
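For clarity, CreateServerCredentials above now defers to the shared test credentials provider, keyed by the new SecurityParams cred_type string and defaulting to kTlsCredentialsType when the field is empty. A hedged sketch of a scenario opting in explicitly (set_cred_type() is the assumed generated setter for the field read above; the hostname override value is illustrative):

    // Sketch: build client/server configs that request TLS via cred_type.
    // Leaving cred_type empty keeps the previous behavior (TLS with the
    // bundled test certificates).
    static void ConfigureTlsScenario(ClientConfig* client_config,
                                     ServerConfig* server_config) {
      SecurityParams security;
      security.set_use_test_ca(true);
      security.set_server_host_override("foo.test.google.fr");
      security.set_cred_type(kTlsCredentialsType);  // assumed proto setter
      client_config->mutable_security_params()->CopyFrom(security);
      server_config->mutable_security_params()->CopyFrom(security);
    }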
diff --git a/test/cpp/qps/worker.cc b/test/cpp/qps/worker.cc
index fd51d32db0..27010b7315 100644
--- a/test/cpp/qps/worker.cc
+++ b/test/cpp/qps/worker.cc
@@ -27,9 +27,12 @@
#include "test/cpp/qps/qps_worker.h"
#include "test/cpp/util/test_config.h"
+#include "test/cpp/util/test_credentials_provider.h"
DEFINE_int32(driver_port, 0, "Port for communication with driver");
DEFINE_int32(server_port, 0, "Port for operation as a server");
+DEFINE_string(credential_type, grpc::testing::kInsecureCredentialsType,
+ "Credential type for communication with driver");
static bool got_sigint = false;
@@ -39,7 +42,7 @@ namespace grpc {
namespace testing {
static void RunServer() {
- QpsWorker worker(FLAGS_driver_port, FLAGS_server_port);
+ QpsWorker worker(FLAGS_driver_port, FLAGS_server_port, FLAGS_credential_type);
while (!got_sigint && !worker.Done()) {
gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
diff --git a/test/cpp/util/create_test_channel.cc b/test/cpp/util/create_test_channel.cc
index 68c6fe4622..34b6d60d01 100644
--- a/test/cpp/util/create_test_channel.cc
+++ b/test/cpp/util/create_test_channel.cc
@@ -51,29 +51,31 @@ void AddProdSslType() {
} // namespace
-// When ssl is enabled, if server is empty, override_hostname is used to
+// When cred_type is 'ssl', if server is empty, override_hostname is used to
// create channel. Otherwise, connect to server and override hostname if
// override_hostname is provided.
-// When ssl is not enabled, override_hostname is ignored.
+// When cred_type is not 'ssl', override_hostname is ignored.
// Set use_prod_root to true to use the SSL root for connecting to google.
// In this case, path to the roots pem file must be set via environment variable
// GRPC_DEFAULT_SSL_ROOTS_FILE_PATH.
// Otherwise, root for test SSL cert will be used.
-// creds will be used to create a channel when enable_ssl is true.
+// creds will be used to create a channel when cred_type is 'ssl'.
// Use examples:
// CreateTestChannel(
-// "1.1.1.1:12345", "override.hostname.com", true, false, creds);
-// CreateTestChannel("test.google.com:443", "", true, true, creds);
+// "1.1.1.1:12345", "ssl", "override.hostname.com", false, creds);
+// CreateTestChannel("test.google.com:443", "ssl", "", true, creds);
// same as above
-// CreateTestChannel("", "test.google.com:443", true, true, creds);
+// CreateTestChannel("", "ssl", "test.google.com:443", true, creds);
std::shared_ptr<Channel> CreateTestChannel(
- const grpc::string& server, const grpc::string& override_hostname,
- bool enable_ssl, bool use_prod_roots,
+ const grpc::string& server, const grpc::string& cred_type,
+ const grpc::string& override_hostname, bool use_prod_roots,
const std::shared_ptr<CallCredentials>& creds,
const ChannelArguments& args) {
ChannelArguments channel_args(args);
std::shared_ptr<ChannelCredentials> channel_creds;
- if (enable_ssl) {
+ if (cred_type.empty()) {
+ return CreateChannel(server, InsecureChannelCredentials());
+ } else if (cred_type == testing::kTlsCredentialsType) { // cred_type == "ssl"
if (use_prod_roots) {
gpr_once_init(&g_once_init_add_prod_ssl_provider, &AddProdSslType);
channel_creds = testing::GetCredentialsProvider()->GetChannelCredentials(
@@ -95,13 +97,31 @@ std::shared_ptr<Channel> CreateTestChannel(
}
return CreateCustomChannel(connect_to, channel_creds, channel_args);
} else {
- return CreateChannel(server, InsecureChannelCredentials());
+ channel_creds = testing::GetCredentialsProvider()->GetChannelCredentials(
+ cred_type, &channel_args);
+ GPR_ASSERT(channel_creds != nullptr);
+
+ return CreateChannel(server, channel_creds);
}
}
std::shared_ptr<Channel> CreateTestChannel(
const grpc::string& server, const grpc::string& override_hostname,
bool enable_ssl, bool use_prod_roots,
+ const std::shared_ptr<CallCredentials>& creds,
+ const ChannelArguments& args) {
+ grpc::string type;
+ if (enable_ssl) {
+ type = testing::kTlsCredentialsType;
+ }
+
+ return CreateTestChannel(server, type, override_hostname, use_prod_roots,
+ creds, args);
+}
+
+std::shared_ptr<Channel> CreateTestChannel(
+ const grpc::string& server, const grpc::string& override_hostname,
+ bool enable_ssl, bool use_prod_roots,
const std::shared_ptr<CallCredentials>& creds) {
return CreateTestChannel(server, override_hostname, enable_ssl,
use_prod_roots, creds, ChannelArguments());
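Taken together, the new CreateTestChannel overload above dispatches on cred_type: an empty string yields an insecure channel, kTlsCredentialsType ("ssl") keeps the existing SSL/override-hostname path, and any other value is resolved through the test credentials provider. A sketch of the three call shapes (the "custom_plugin" type and the addresses are illustrative; a non-builtin type would first have to be registered with the provider):

    #include "test/cpp/util/create_test_channel.h"
    #include "test/cpp/util/test_credentials_provider.h"

    void CreateTestChannelExamples() {
      grpc::ChannelArguments args;
      std::shared_ptr<grpc::CallCredentials> creds;  // may stay null

      // Empty cred_type -> InsecureChannelCredentials().
      auto insecure =
          grpc::CreateTestChannel("localhost:50051", "", "", false, creds, args);

      // kTlsCredentialsType ("ssl") -> test SSL roots, honoring the override.
      auto ssl = grpc::CreateTestChannel(
          "1.1.1.1:12345", grpc::testing::kTlsCredentialsType,
          "override.hostname.com", false, creds, args);

      // Any other type -> looked up via GetCredentialsProvider().
      auto custom = grpc::CreateTestChannel("localhost:50051", "custom_plugin",
                                            "", false, creds, args);
    }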
diff --git a/test/cpp/util/create_test_channel.h b/test/cpp/util/create_test_channel.h
index 9b4b09171e..e2ca8f99b4 100644
--- a/test/cpp/util/create_test_channel.h
+++ b/test/cpp/util/create_test_channel.h
@@ -45,6 +45,12 @@ std::shared_ptr<Channel> CreateTestChannel(
const ChannelArguments& args);
std::shared_ptr<Channel> CreateTestChannel(
+ const grpc::string& server, const grpc::string& cred_type,
+ const grpc::string& override_hostname, bool use_prod_roots,
+ const std::shared_ptr<CallCredentials>& creds,
+ const ChannelArguments& args);
+
+std::shared_ptr<Channel> CreateTestChannel(
const grpc::string& server, const grpc::string& credential_type,
const std::shared_ptr<CallCredentials>& creds);
diff --git a/tools/distrib/python/grpcio_tools/setup.py b/tools/distrib/python/grpcio_tools/setup.py
index bc24e4c462..ddbb348e0c 100644
--- a/tools/distrib/python/grpcio_tools/setup.py
+++ b/tools/distrib/python/grpcio_tools/setup.py
@@ -62,9 +62,9 @@ if EXTRA_ENV_COMPILE_ARGS is None:
# envvars) without adding yet more GRPC-specific envvars.
# See https://sourceforge.net/p/mingw-w64/bugs/363/
if '32' in platform.architecture()[0]:
- EXTRA_ENV_COMPILE_ARGS += ' -D_ftime=_ftime32 -D_timeb=__timeb32 -D_ftime_s=_ftime32_s'
+ EXTRA_ENV_COMPILE_ARGS += ' -D_ftime=_ftime32 -D_timeb=__timeb32 -D_ftime_s=_ftime32_s -D_hypot=hypot'
else:
- EXTRA_ENV_COMPILE_ARGS += ' -D_ftime=_ftime64 -D_timeb=__timeb64'
+ EXTRA_ENV_COMPILE_ARGS += ' -D_ftime=_ftime64 -D_timeb=__timeb64 -D_hypot=hypot'
else:
# We need to statically link the C++ Runtime, only the C runtime is
# available dynamically
diff --git a/tools/doxygen/Doxyfile.core.internal b/tools/doxygen/Doxyfile.core.internal
index 4ad8a0ddb4..b747da3a67 100644
--- a/tools/doxygen/Doxyfile.core.internal
+++ b/tools/doxygen/Doxyfile.core.internal
@@ -1123,6 +1123,7 @@ src/core/lib/iomgr/iomgr_internal.h \
src/core/lib/iomgr/iomgr_posix.c \
src/core/lib/iomgr/iomgr_posix.h \
src/core/lib/iomgr/iomgr_uv.c \
+src/core/lib/iomgr/iomgr_uv.h \
src/core/lib/iomgr/iomgr_windows.c \
src/core/lib/iomgr/is_epollexclusive_available.c \
src/core/lib/iomgr/is_epollexclusive_available.h \
diff --git a/tools/internal_ci/linux/grpc_build_artifacts.sh b/tools/internal_ci/linux/grpc_build_artifacts.sh
index bc29014ab5..3997a13087 100755
--- a/tools/internal_ci/linux/grpc_build_artifacts.sh
+++ b/tools/internal_ci/linux/grpc_build_artifacts.sh
@@ -20,9 +20,10 @@ cd $(dirname $0)/../../..
source tools/internal_ci/helper_scripts/prepare_build_linux_rc
-# TODO(jtattermusch): install ruby on the internal_ci worker
-gpg --keyserver hkp://keys.gnupg.net --recv-keys 409B6B1796C275462A1703113804BB82D39DC0E3
-# TODO(jtattermusch): grep works around https://github.com/rvm/rvm/issues/4068
-curl -sSL https://get.rvm.io | grep -v __rvm_print_headline | bash -s stable --ruby
+set +ex
+[[ -s /etc/profile.d/rvm.sh ]] && . /etc/profile.d/rvm.sh
+set -e # rvm commands are very verbose
+rvm --default use ruby-2.4.1
+set -ex
tools/run_tests/task_runner.py -f artifact linux
diff --git a/tools/internal_ci/windows/grpc_build_artifacts.bat b/tools/internal_ci/windows/grpc_build_artifacts.bat
index d5ade5a83d..8cfa2fb0be 100644
--- a/tools/internal_ci/windows/grpc_build_artifacts.bat
+++ b/tools/internal_ci/windows/grpc_build_artifacts.bat
@@ -19,10 +19,13 @@ rename C:\Python34_32bit Python34_32bits
rename C:\Python35_32bit Python35_32bits
rename C:\Python36_32bit Python36_32bits
+pacman -S --noconfirm mingw64/mingw-w64-x86_64-gcc mingw32/mingw-w64-i686-gcc
+
@rem make sure msys binaries are preferred over cygwin binaries
@rem set path to python 2.7
set PATH=C:\tools\msys64\usr\bin;C:\Python27;%PATH%
+
@rem enter repo root
cd /d %~dp0\..\..\..
diff --git a/tools/interop_matrix/client_matrix.py b/tools/interop_matrix/client_matrix.py
index 4394e32c08..4d1b5f0169 100644
--- a/tools/interop_matrix/client_matrix.py
+++ b/tools/interop_matrix/client_matrix.py
@@ -30,9 +30,27 @@ LANG_RUNTIME_MATRIX = {
}
# Dictionary of releases per language. For each language, we need to provide
-# a tuple of release tag (used as the tag for the GCR image) and also github hash.
+# a release tag pointing to the latest build of the branch.
LANG_RELEASE_MATRIX = {
- 'cxx': ['v1.0.1', 'v1.1.2'],
- 'go': ['v1.0.1-GA', 'v1.3.0'],
- 'java': ['v1.0.3', 'v1.1.2'],
+ 'cxx': [
+ 'v1.0.1',
+ 'v1.1.4',
+ 'v1.2.5',
+ 'v1.3.9',
+ 'v1.4.2',
+ ],
+ 'go': [
+ 'v1.0.5',
+ 'v1.2.1',
+ 'v1.3.0',
+ 'v1.4.2',
+ ],
+ 'java': [
+ 'v1.0.3',
+ 'v1.1.2',
+ 'v1.2.0',
+ 'v1.3.1',
+ 'v1.4.0',
+ 'v1.5.0',
+ ],
}
diff --git a/tools/interop_matrix/run_interop_matrix_tests.py b/tools/interop_matrix/run_interop_matrix_tests.py
index 28126ddf5c..4315c8277d 100755
--- a/tools/interop_matrix/run_interop_matrix_tests.py
+++ b/tools/interop_matrix/run_interop_matrix_tests.py
@@ -168,7 +168,7 @@ def run_tests_for_lang(lang, runtime, images):
_xml_report_tree,
resultset,
'grpc_interop_matrix',
- '%s__%s:%s'%(lang,runtime,release),
+ '%s__%s %s'%(lang,runtime,release),
str(uuid.uuid4()))
_docker_images_cleanup = []
diff --git a/tools/mkowners/mkowners.py b/tools/mkowners/mkowners.py
index 2ccedfcfb8..e0ad998bdc 100755
--- a/tools/mkowners/mkowners.py
+++ b/tools/mkowners/mkowners.py
@@ -164,7 +164,6 @@ def expand_directives(root, directives):
if intersect:
for f in sorted(files_add): # sorted to ensure merge stability
if f not in intersect:
- print("X", root, glob_add, glob_have)
out_globs[os.path.relpath(f, start=root)] = who_add
for who in who_have:
if who not in out_globs[glob_add]:
@@ -185,7 +184,6 @@ def add_parent_to_globs(parent, globs, globs_dir):
if intersect:
for f in sorted(files_child): # sorted to ensure merge stability
if f not in intersect:
- print("Y", full_dir(owners.dir, oglob), full_dir(globs_dir, gglob))
who = gglob_who_orig.copy()
globs[os.path.relpath(f, start=globs_dir)] = who
for who in oglob_who:
diff --git a/tools/profiling/microbenchmarks/bm_diff/bm_diff.py b/tools/profiling/microbenchmarks/bm_diff/bm_diff.py
index 809817a1a8..a41d0f0552 100755
--- a/tools/profiling/microbenchmarks/bm_diff/bm_diff.py
+++ b/tools/profiling/microbenchmarks/bm_diff/bm_diff.py
@@ -67,6 +67,12 @@ def _args():
default=20,
help='Number of times to loop the benchmarks. Must match what was passed to bm_run.py'
)
+ argp.add_argument(
+ '-r',
+ '--regex',
+ type=str,
+ default="",
+ help='Regex to filter benchmarks run')
argp.add_argument('--counters', dest='counters', action='store_true')
argp.add_argument('--no-counters', dest='counters', action='store_false')
argp.set_defaults(counters=True)
@@ -144,7 +150,7 @@ def _read_json(filename, badjson_files, nonexistant_files):
def fmt_dict(d):
return ''.join([" " + k + ": " + str(d[k]) + "\n" for k in d])
-def diff(bms, loops, track, old, new, counters):
+def diff(bms, loops, regex, track, old, new, counters):
benchmarks = collections.defaultdict(Benchmark)
badjson_files = {}
@@ -153,7 +159,8 @@ def diff(bms, loops, track, old, new, counters):
for loop in range(0, loops):
for line in subprocess.check_output(
['bm_diff_%s/opt/%s' % (old, bm),
- '--benchmark_list_tests']).splitlines():
+ '--benchmark_list_tests',
+ '--benchmark_filter=%s' % regex]).splitlines():
stripped_line = line.strip().replace("/", "_").replace(
"<", "_").replace(">", "_").replace(", ", "_")
js_new_opt = _read_json('%s.%s.opt.%s.%d.json' %
@@ -211,6 +218,6 @@ def diff(bms, loops, track, old, new, counters):
if __name__ == '__main__':
args = _args()
- diff, note = diff(args.benchmarks, args.loops, args.track, args.old,
+ diff, note = diff(args.benchmarks, args.loops, args.regex, args.track, args.old,
args.new, args.counters)
print('%s\n%s' % (note, diff if diff else "No performance differences"))
diff --git a/tools/profiling/microbenchmarks/bm_diff/bm_main.py b/tools/profiling/microbenchmarks/bm_diff/bm_main.py
index 8b4e0cb69a..5aa11ac391 100755
--- a/tools/profiling/microbenchmarks/bm_diff/bm_main.py
+++ b/tools/profiling/microbenchmarks/bm_diff/bm_main.py
@@ -63,10 +63,10 @@ def _args():
help='Name of baseline run to compare to. Usually just called "old"')
argp.add_argument(
'-r',
- '--repetitions',
- type=int,
- default=1,
- help='Number of repetitions to pass to the benchmarks')
+ '--regex',
+ type=str,
+ default="",
+ help='Regex to filter benchmarks run')
argp.add_argument(
'-l',
'--loops',
@@ -125,10 +125,10 @@ def main(args):
subprocess.check_call(['git', 'checkout', where_am_i])
subprocess.check_call(['git', 'submodule', 'update'])
- bm_run.run('new', args.benchmarks, args.jobs, args.loops, args.repetitions, args.counters)
- bm_run.run(old, args.benchmarks, args.jobs, args.loops, args.repetitions, args.counters)
+ bm_run.run('new', args.benchmarks, args.jobs, args.loops, args.regex, args.counters)
+ bm_run.run(old, args.benchmarks, args.jobs, args.loops, args.regex, args.counters)
- diff, note = bm_diff.diff(args.benchmarks, args.loops, args.track, old,
+ diff, note = bm_diff.diff(args.benchmarks, args.loops, args.regex, args.track, old,
'new', args.counters)
if diff:
text = '[%s] Performance differences noted:\n%s' % (args.pr_comment_name, diff)
diff --git a/tools/profiling/microbenchmarks/bm_diff/bm_run.py b/tools/profiling/microbenchmarks/bm_diff/bm_run.py
index 72b3d3cf10..206f7c5845 100755
--- a/tools/profiling/microbenchmarks/bm_diff/bm_run.py
+++ b/tools/profiling/microbenchmarks/bm_diff/bm_run.py
@@ -56,10 +56,10 @@ def _args():
)
argp.add_argument(
'-r',
- '--repetitions',
- type=int,
- default=1,
- help='Number of repetitions to pass to the benchmarks')
+ '--regex',
+ type=str,
+ default="",
+ help='Regex to filter benchmarks run')
argp.add_argument(
'-l',
'--loops',
@@ -77,18 +77,17 @@ def _args():
return args
-def _collect_bm_data(bm, cfg, name, reps, idx, loops):
+def _collect_bm_data(bm, cfg, name, regex, idx, loops):
jobs_list = []
for line in subprocess.check_output(
['bm_diff_%s/%s/%s' % (name, cfg, bm),
- '--benchmark_list_tests']).splitlines():
+ '--benchmark_list_tests', '--benchmark_filter=%s' % regex]).splitlines():
stripped_line = line.strip().replace("/", "_").replace(
"<", "_").replace(">", "_").replace(", ", "_")
cmd = [
'bm_diff_%s/%s/%s' % (name, cfg, bm), '--benchmark_filter=^%s$' %
line, '--benchmark_out=%s.%s.%s.%s.%d.json' %
(bm, stripped_line, cfg, name, idx), '--benchmark_out_format=json',
- '--benchmark_repetitions=%d' % (reps)
]
jobs_list.append(
jobset.JobSpec(
@@ -100,13 +99,13 @@ def _collect_bm_data(bm, cfg, name, reps, idx, loops):
return jobs_list
-def run(name, benchmarks, jobs, loops, reps, counters):
+def run(name, benchmarks, jobs, loops, regex, counters):
jobs_list = []
for loop in range(0, loops):
for bm in benchmarks:
- jobs_list += _collect_bm_data(bm, 'opt', name, reps, loop, loops)
+ jobs_list += _collect_bm_data(bm, 'opt', name, regex, loop, loops)
if counters:
- jobs_list += _collect_bm_data(bm, 'counters', name, reps, loop,
+ jobs_list += _collect_bm_data(bm, 'counters', name, regex, loop,
loops)
random.shuffle(jobs_list, random.SystemRandom().random)
jobset.run(jobs_list, maxjobs=jobs)
@@ -114,4 +113,4 @@ def run(name, benchmarks, jobs, loops, reps, counters):
if __name__ == '__main__':
args = _args()
- run(args.name, args.benchmarks, args.jobs, args.loops, args.repetitions, args.counters)
+ run(args.name, args.benchmarks, args.jobs, args.loops, args.regex, args.counters)
diff --git a/tools/run_tests/artifacts/build_artifact_python.bat b/tools/run_tests/artifacts/build_artifact_python.bat
index 4c34fc50f0..3a0564129c 100644
--- a/tools/run_tests/artifacts/build_artifact_python.bat
+++ b/tools/run_tests/artifacts/build_artifact_python.bat
@@ -12,8 +12,8 @@
@rem See the License for the specific language governing permissions and
@rem limitations under the License.
-
-set PATH=C:\%1;C:\%1\scripts;C:\msys64\mingw%2\bin;%PATH%
+@rem set path to python & mingw compiler
+set PATH=C:\%1;C:\%1\scripts;C:\msys64\mingw%2\bin;C:\tools\msys64\mingw%2\bin;%PATH%
pip install --upgrade six
pip install --upgrade setuptools
diff --git a/tools/run_tests/generated/sources_and_headers.json b/tools/run_tests/generated/sources_and_headers.json
index 8993e9340a..24e7ff2b7b 100644
--- a/tools/run_tests/generated/sources_and_headers.json
+++ b/tools/run_tests/generated/sources_and_headers.json
@@ -7339,6 +7339,7 @@
"test/core/end2end/tests/payload.c",
"test/core/end2end/tests/ping.c",
"test/core/end2end/tests/ping_pong_streaming.c",
+ "test/core/end2end/tests/proxy_auth.c",
"test/core/end2end/tests/registered_call.c",
"test/core/end2end/tests/request_with_flags.c",
"test/core/end2end/tests/request_with_payload.c",
@@ -7416,6 +7417,7 @@
"test/core/end2end/tests/payload.c",
"test/core/end2end/tests/ping.c",
"test/core/end2end/tests/ping_pong_streaming.c",
+ "test/core/end2end/tests/proxy_auth.c",
"test/core/end2end/tests/registered_call.c",
"test/core/end2end/tests/request_with_flags.c",
"test/core/end2end/tests/request_with_payload.c",
@@ -7738,6 +7740,7 @@
"src/core/lib/iomgr/iomgr.h",
"src/core/lib/iomgr/iomgr_internal.h",
"src/core/lib/iomgr/iomgr_posix.h",
+ "src/core/lib/iomgr/iomgr_uv.h",
"src/core/lib/iomgr/is_epollexclusive_available.h",
"src/core/lib/iomgr/load_file.h",
"src/core/lib/iomgr/lockfree_event.h",
@@ -7898,6 +7901,7 @@
"src/core/lib/iomgr/iomgr_posix.c",
"src/core/lib/iomgr/iomgr_posix.h",
"src/core/lib/iomgr/iomgr_uv.c",
+ "src/core/lib/iomgr/iomgr_uv.h",
"src/core/lib/iomgr/iomgr_windows.c",
"src/core/lib/iomgr/is_epollexclusive_available.c",
"src/core/lib/iomgr/is_epollexclusive_available.h",
diff --git a/tools/run_tests/generated/tests.json b/tools/run_tests/generated/tests.json
index 7ce8b3b9d1..ebf30717d4 100644
--- a/tools/run_tests/generated/tests.json
+++ b/tools/run_tests/generated/tests.json
@@ -1390,7 +1390,9 @@
],
"cpu_cost": 1.0,
"exclude_configs": [],
- "exclude_iomgrs": [],
+ "exclude_iomgrs": [
+ "uv"
+ ],
"flaky": false,
"gtest": false,
"language": "c",
@@ -3991,7 +3993,8 @@
"mac",
"posix",
"windows"
- ]
+ ],
+ "timeout_seconds": 1200
},
{
"args": [],
@@ -16351,6 +16354,30 @@
},
{
"args": [
+ "proxy_auth"
+ ],
+ "ci_platforms": [
+ "windows",
+ "linux",
+ "posix"
+ ],
+ "cpu_cost": 1.0,
+ "exclude_configs": [],
+ "exclude_iomgrs": [
+ "uv"
+ ],
+ "flaky": false,
+ "language": "c",
+ "name": "h2_http_proxy_test",
+ "platforms": [
+ "windows",
+ "linux",
+ "mac",
+ "posix"
+ ]
+ },
+ {
+ "args": [
"registered_call"
],
"ci_platforms": [
@@ -38864,6 +38891,30 @@
},
{
"args": [
+ "proxy_auth"
+ ],
+ "ci_platforms": [
+ "windows",
+ "linux",
+ "posix"
+ ],
+ "cpu_cost": 1.0,
+ "exclude_configs": [],
+ "exclude_iomgrs": [
+ "uv"
+ ],
+ "flaky": false,
+ "language": "c",
+ "name": "h2_http_proxy_nosec_test",
+ "platforms": [
+ "windows",
+ "linux",
+ "mac",
+ "posix"
+ ]
+ },
+ {
+ "args": [
"registered_call"
],
"ci_platforms": [
diff --git a/tools/run_tests/run_tests.py b/tools/run_tests/run_tests.py
index 611868ce5a..50eed6256c 100755
--- a/tools/run_tests/run_tests.py
+++ b/tools/run_tests/run_tests.py
@@ -245,7 +245,7 @@ class CLanguage(object):
self._docker_distro, self._make_options = self._compiler_options(self.args.use_docker,
self.args.compiler)
if args.iomgr_platform == "uv":
- cflags = '-DGRPC_UV '
+ cflags = '-DGRPC_UV -DGRPC_UV_THREAD_CHECK'
try:
cflags += subprocess.check_output(['pkg-config', '--cflags', 'libuv']).strip() + ' '
except (subprocess.CalledProcessError, OSError):
diff --git a/tools/run_tests/sanity/core_banned_functions.py b/tools/run_tests/sanity/core_banned_functions.py
index b394bbbeaf..1f13905484 100755
--- a/tools/run_tests/sanity/core_banned_functions.py
+++ b/tools/run_tests/sanity/core_banned_functions.py
@@ -41,6 +41,8 @@ BANNED_EXCEPT = {
'grpc_closure_sched(' : ['src/core/lib/iomgr/closure.c'],
'grpc_closure_run(' : ['src/core/lib/iomgr/closure.c'],
'grpc_closure_list_sched(' : ['src/core/lib/iomgr/closure.c'],
+ 'gpr_getenv_silent(' : ['src/core/lib/support/log.c', 'src/core/lib/support/env_linux.c',
+ 'src/core/lib/support/env_posix.c', 'src/core/lib/support/env_windows.c'],
}
errors = 0
diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj b/vsprojects/vcxproj/grpc/grpc.vcxproj
index 41fe1b222b..57c0ac7bc4 100644
--- a/vsprojects/vcxproj/grpc/grpc.vcxproj
+++ b/vsprojects/vcxproj/grpc/grpc.vcxproj
@@ -334,6 +334,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr_internal.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr_posix.h" />
+ <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr_uv.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\is_epollexclusive_available.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\load_file.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\lockfree_event.h" />
diff --git a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters
index 00b8705826..e9bb52defa 100644
--- a/vsprojects/vcxproj/grpc/grpc.vcxproj.filters
+++ b/vsprojects/vcxproj/grpc/grpc.vcxproj.filters
@@ -959,6 +959,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr_posix.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
+ <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr_uv.h">
+ <Filter>src\core\lib\iomgr</Filter>
+ </ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\is_epollexclusive_available.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
diff --git a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj
index 32809ca589..64bf54e557 100644
--- a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj
+++ b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj
@@ -229,6 +229,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr_internal.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr_posix.h" />
+ <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr_uv.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\is_epollexclusive_available.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\load_file.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\lockfree_event.h" />
diff --git a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters
index def8de4202..35fd87a4c5 100644
--- a/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters
+++ b/vsprojects/vcxproj/grpc_test_util/grpc_test_util.vcxproj.filters
@@ -683,6 +683,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr_posix.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
+ <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr_uv.h">
+ <Filter>src\core\lib\iomgr</Filter>
+ </ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\is_epollexclusive_available.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj
index ac98dc0540..4ed59780a4 100644
--- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj
+++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj
@@ -324,6 +324,7 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr_internal.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr_posix.h" />
+ <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr_uv.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\is_epollexclusive_available.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\load_file.h" />
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\lockfree_event.h" />
diff --git a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters
index fea68c51dc..2c3319beae 100644
--- a/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters
+++ b/vsprojects/vcxproj/grpc_unsecure/grpc_unsecure.vcxproj.filters
@@ -866,6 +866,9 @@
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr_posix.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
+ <ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\iomgr_uv.h">
+ <Filter>src\core\lib\iomgr</Filter>
+ </ClInclude>
<ClInclude Include="$(SolutionDir)\..\src\core\lib\iomgr\is_epollexclusive_available.h">
<Filter>src\core\lib\iomgr</Filter>
</ClInclude>
diff --git a/vsprojects/vcxproj/test/end2end/tests/end2end_nosec_tests/end2end_nosec_tests.vcxproj b/vsprojects/vcxproj/test/end2end/tests/end2end_nosec_tests/end2end_nosec_tests.vcxproj
index 4d02c88082..249d99b526 100644
--- a/vsprojects/vcxproj/test/end2end/tests/end2end_nosec_tests/end2end_nosec_tests.vcxproj
+++ b/vsprojects/vcxproj/test/end2end/tests/end2end_nosec_tests/end2end_nosec_tests.vcxproj
@@ -231,6 +231,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\test\core\end2end\tests\ping_pong_streaming.c">
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\test\core\end2end\tests\proxy_auth.c">
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\test\core\end2end\tests\registered_call.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\test\core\end2end\tests\request_with_flags.c">
diff --git a/vsprojects/vcxproj/test/end2end/tests/end2end_nosec_tests/end2end_nosec_tests.vcxproj.filters b/vsprojects/vcxproj/test/end2end/tests/end2end_nosec_tests/end2end_nosec_tests.vcxproj.filters
index 89316bc535..3a2105ebe8 100644
--- a/vsprojects/vcxproj/test/end2end/tests/end2end_nosec_tests/end2end_nosec_tests.vcxproj.filters
+++ b/vsprojects/vcxproj/test/end2end/tests/end2end_nosec_tests/end2end_nosec_tests.vcxproj.filters
@@ -121,6 +121,9 @@
<ClCompile Include="$(SolutionDir)\..\test\core\end2end\tests\ping_pong_streaming.c">
<Filter>test\core\end2end\tests</Filter>
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\test\core\end2end\tests\proxy_auth.c">
+ <Filter>test\core\end2end\tests</Filter>
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\test\core\end2end\tests\registered_call.c">
<Filter>test\core\end2end\tests</Filter>
</ClCompile>
diff --git a/vsprojects/vcxproj/test/end2end/tests/end2end_tests/end2end_tests.vcxproj b/vsprojects/vcxproj/test/end2end/tests/end2end_tests/end2end_tests.vcxproj
index 9573111452..b7a2ecd27b 100644
--- a/vsprojects/vcxproj/test/end2end/tests/end2end_tests/end2end_tests.vcxproj
+++ b/vsprojects/vcxproj/test/end2end/tests/end2end_tests/end2end_tests.vcxproj
@@ -233,6 +233,8 @@
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\test\core\end2end\tests\ping_pong_streaming.c">
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\test\core\end2end\tests\proxy_auth.c">
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\test\core\end2end\tests\registered_call.c">
</ClCompile>
<ClCompile Include="$(SolutionDir)\..\test\core\end2end\tests\request_with_flags.c">
diff --git a/vsprojects/vcxproj/test/end2end/tests/end2end_tests/end2end_tests.vcxproj.filters b/vsprojects/vcxproj/test/end2end/tests/end2end_tests/end2end_tests.vcxproj.filters
index 7d02fc3fa0..1626b77d14 100644
--- a/vsprojects/vcxproj/test/end2end/tests/end2end_tests/end2end_tests.vcxproj.filters
+++ b/vsprojects/vcxproj/test/end2end/tests/end2end_tests/end2end_tests.vcxproj.filters
@@ -124,6 +124,9 @@
<ClCompile Include="$(SolutionDir)\..\test\core\end2end\tests\ping_pong_streaming.c">
<Filter>test\core\end2end\tests</Filter>
</ClCompile>
+ <ClCompile Include="$(SolutionDir)\..\test\core\end2end\tests\proxy_auth.c">
+ <Filter>test\core\end2end\tests</Filter>
+ </ClCompile>
<ClCompile Include="$(SolutionDir)\..\test\core\end2end\tests\registered_call.c">
<Filter>test\core\end2end\tests</Filter>
</ClCompile>