aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar hcaseyal <hcaseyal@gmail.com>2018-12-03 10:16:35 -0800
committerGravatar GitHub <noreply@github.com>2018-12-03 10:16:35 -0800
commitcf14bd13fbbd2d0069db35de51b940d9f0a458f2 (patch)
treea6bcfc779b60b4ea436f04875bdc7658046b0080 /src
parentc12aabc6a7d67e9f786125480ed6b17e25278c98 (diff)
parent1e1d4c262134c2641329a42f262c1687d8113f0e (diff)
Merge branch 'master' into callback_test_coverage_1
Diffstat (limited to 'src')
-rw-r--r--src/compiler/cpp_generator.cc147
-rw-r--r--src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc84
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc10
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc36
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h1
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc12
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h4
-rw-r--r--src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc3
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.cc1
-rw-r--r--src/core/ext/transport/chttp2/transport/context_list.cc1
-rw-r--r--src/core/ext/transport/chttp2/transport/context_list.h2
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h6
-rw-r--r--src/core/ext/transport/chttp2/transport/writing.cc7
-rw-r--r--src/core/lib/gprpp/inlined_vector.h44
-rw-r--r--src/core/lib/iomgr/buffer_list.cc3
-rw-r--r--src/core/lib/iomgr/buffer_list.h4
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.cc4
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.cc4
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.cc4
-rw-r--r--src/core/lib/iomgr/ev_posix.cc8
-rw-r--r--src/core/lib/iomgr/ev_posix.h10
-rw-r--r--src/core/lib/iomgr/fork_posix.cc2
-rw-r--r--src/core/lib/iomgr/iomgr.cc4
-rw-r--r--src/core/lib/iomgr/iomgr.h4
-rw-r--r--src/core/lib/iomgr/iomgr_custom.cc4
-rw-r--r--src/core/lib/iomgr/iomgr_internal.cc4
-rw-r--r--src/core/lib/iomgr/iomgr_internal.h4
-rw-r--r--src/core/lib/iomgr/iomgr_posix.cc7
-rw-r--r--src/core/lib/iomgr/iomgr_posix_cfstream.cc7
-rw-r--r--src/core/lib/iomgr/iomgr_windows.cc5
-rw-r--r--src/core/lib/iomgr/resolve_address.h2
-rw-r--r--src/core/lib/iomgr/tcp_posix.cc36
-rw-r--r--src/core/lib/surface/init.cc1
-rw-r--r--src/core/lib/transport/transport.cc3
-rw-r--r--src/core/lib/transport/transport.h6
-rw-r--r--src/core/tsi/alts/handshaker/alts_handshaker_client.cc11
-rw-r--r--src/cpp/client/generic_stub.cc9
-rw-r--r--src/cpp/server/server_cc.cc7
-rw-r--r--src/cpp/server/server_context.cc47
-rw-r--r--src/csharp/Grpc.Core.Tests/AppDomainUnloadTest.cs2
-rwxr-xr-xsrc/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj2
-rw-r--r--src/csharp/Grpc.Core.Tests/NUnitMain.cs2
-rw-r--r--src/csharp/Grpc.Core.Tests/SanityTest.cs4
-rw-r--r--src/csharp/Grpc.Core/Internal/NativeExtension.cs4
-rwxr-xr-xsrc/csharp/Grpc.Examples.MathClient/Grpc.Examples.MathClient.csproj2
-rwxr-xr-xsrc/csharp/Grpc.Examples.MathServer/Grpc.Examples.MathServer.csproj2
-rwxr-xr-xsrc/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj2
-rw-r--r--src/csharp/Grpc.Examples.Tests/NUnitMain.cs2
-rwxr-xr-xsrc/csharp/Grpc.Examples/Grpc.Examples.csproj2
-rwxr-xr-xsrc/csharp/Grpc.HealthCheck.Tests/Grpc.HealthCheck.Tests.csproj2
-rw-r--r--src/csharp/Grpc.HealthCheck.Tests/NUnitMain.cs2
-rwxr-xr-xsrc/csharp/Grpc.IntegrationTesting.Client/Grpc.IntegrationTesting.Client.csproj2
-rwxr-xr-xsrc/csharp/Grpc.IntegrationTesting.QpsWorker/Grpc.IntegrationTesting.QpsWorker.csproj2
-rwxr-xr-xsrc/csharp/Grpc.IntegrationTesting.Server/Grpc.IntegrationTesting.Server.csproj4
-rwxr-xr-xsrc/csharp/Grpc.IntegrationTesting.StressClient/Grpc.IntegrationTesting.StressClient.csproj2
-rwxr-xr-xsrc/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj2
-rw-r--r--src/csharp/Grpc.IntegrationTesting/NUnitMain.cs2
-rw-r--r--src/csharp/Grpc.Microbenchmarks/Grpc.Microbenchmarks.csproj2
-rwxr-xr-xsrc/csharp/Grpc.Reflection.Tests/Grpc.Reflection.Tests.csproj2
-rw-r--r--src/csharp/Grpc.Reflection.Tests/NUnitMain.cs2
-rw-r--r--src/csharp/Grpc.Tools.Tests/Grpc.Tools.Tests.csproj2
-rw-r--r--src/csharp/Grpc.Tools.Tests/NUnitMain.cs2
-rw-r--r--src/php/ext/grpc/php_grpc.c4
-rw-r--r--src/proto/grpc/channelz/BUILD7
-rw-r--r--src/proto/grpc/health/v1/BUILD1
-rw-r--r--src/proto/grpc/testing/compiler_test.proto3
-rw-r--r--src/python/grpcio/grpc/__init__.py2
-rw-r--r--src/python/grpcio/grpc/_cython/BUILD.bazel1
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/channelz.pyx.pxi69
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi10
-rw-r--r--src/python/grpcio/grpc/_cython/cygrpc.pyx1
-rw-r--r--src/python/grpcio_channelz/.gitignore6
-rw-r--r--src/python/grpcio_channelz/MANIFEST.in3
-rw-r--r--src/python/grpcio_channelz/README.rst9
-rw-r--r--src/python/grpcio_channelz/channelz_commands.py63
-rw-r--r--src/python/grpcio_channelz/grpc_channelz/__init__.py13
-rw-r--r--src/python/grpcio_channelz/grpc_channelz/v1/BUILD.bazel38
-rw-r--r--src/python/grpcio_channelz/grpc_channelz/v1/__init__.py13
-rw-r--r--src/python/grpcio_channelz/grpc_channelz/v1/channelz.py141
-rw-r--r--src/python/grpcio_channelz/grpc_version.py17
-rw-r--r--src/python/grpcio_channelz/setup.py96
-rw-r--r--src/python/grpcio_tests/commands.py7
-rw-r--r--src/python/grpcio_tests/setup.py1
-rw-r--r--src/python/grpcio_tests/tests/channelz/BUILD.bazel15
-rw-r--r--src/python/grpcio_tests/tests/channelz/__init__.py13
-rw-r--r--src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py494
-rw-r--r--src/python/grpcio_tests/tests/tests.json1
-rw-r--r--src/ruby/pb/grpc/health/checker.rb5
88 files changed, 1466 insertions, 177 deletions
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc
index 7986aca696..b004687250 100644
--- a/src/compiler/cpp_generator.cc
+++ b/src/compiler/cpp_generator.cc
@@ -132,6 +132,7 @@ grpc::string GetHeaderIncludes(grpc_generator::File* file,
"grpcpp/impl/codegen/async_generic_service.h",
"grpcpp/impl/codegen/async_stream.h",
"grpcpp/impl/codegen/async_unary_call.h",
+ "grpcpp/impl/codegen/client_callback.h",
"grpcpp/impl/codegen/method_handler_impl.h",
"grpcpp/impl/codegen/proto_utils.h",
"grpcpp/impl/codegen/rpc_method.h",
@@ -580,11 +581,22 @@ void PrintHeaderClientMethodCallbackInterfaces(
"const $Request$* request, $Response$* response, "
"std::function<void(::grpc::Status)>) = 0;\n");
} else if (ClientOnlyStreaming(method)) {
- // TODO(vjpai): Add support for client-side streaming
+ printer->Print(*vars,
+ "virtual void $Method$(::grpc::ClientContext* context, "
+ "$Response$* response, "
+ "::grpc::experimental::ClientWriteReactor< $Request$>* "
+ "reactor) = 0;\n");
} else if (ServerOnlyStreaming(method)) {
- // TODO(vjpai): Add support for server-side streaming
+ printer->Print(*vars,
+ "virtual void $Method$(::grpc::ClientContext* context, "
+ "$Request$* request, "
+ "::grpc::experimental::ClientReadReactor< $Response$>* "
+ "reactor) = 0;\n");
} else if (method->BidiStreaming()) {
- // TODO(vjpai): Add support for bidi streaming
+ printer->Print(*vars,
+ "virtual void $Method$(::grpc::ClientContext* context, "
+ "::grpc::experimental::ClientBidiReactor< "
+ "$Request$,$Response$>* reactor) = 0;\n");
}
}
@@ -631,11 +643,23 @@ void PrintHeaderClientMethodCallback(grpc_generator::Printer* printer,
"const $Request$* request, $Response$* response, "
"std::function<void(::grpc::Status)>) override;\n");
} else if (ClientOnlyStreaming(method)) {
- // TODO(vjpai): Add support for client-side streaming
+ printer->Print(*vars,
+ "void $Method$(::grpc::ClientContext* context, "
+ "$Response$* response, "
+ "::grpc::experimental::ClientWriteReactor< $Request$>* "
+ "reactor) override;\n");
} else if (ServerOnlyStreaming(method)) {
- // TODO(vjpai): Add support for server-side streaming
+ printer->Print(*vars,
+ "void $Method$(::grpc::ClientContext* context, "
+ "$Request$* request, "
+ "::grpc::experimental::ClientReadReactor< $Response$>* "
+ "reactor) override;\n");
+
} else if (method->BidiStreaming()) {
- // TODO(vjpai): Add support for bidi streaming
+ printer->Print(*vars,
+ "void $Method$(::grpc::ClientContext* context, "
+ "::grpc::experimental::ClientBidiReactor< "
+ "$Request$,$Response$>* reactor) override;\n");
}
}
@@ -865,6 +889,11 @@ void PrintHeaderServerCallbackMethodsHelper(
" abort();\n"
" return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n"
"}\n");
+ printer->Print(*vars,
+ "virtual ::grpc::experimental::ServerReadReactor< "
+ "$RealRequest$, $RealResponse$>* $Method$() {\n"
+ " return new ::grpc::internal::UnimplementedReadReactor<\n"
+ " $RealRequest$, $RealResponse$>;}\n");
} else if (ServerOnlyStreaming(method)) {
printer->Print(
*vars,
@@ -876,6 +905,11 @@ void PrintHeaderServerCallbackMethodsHelper(
" abort();\n"
" return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n"
"}\n");
+ printer->Print(*vars,
+ "virtual ::grpc::experimental::ServerWriteReactor< "
+ "$RealRequest$, $RealResponse$>* $Method$() {\n"
+ " return new ::grpc::internal::UnimplementedWriteReactor<\n"
+ " $RealRequest$, $RealResponse$>;}\n");
} else if (method->BidiStreaming()) {
printer->Print(
*vars,
@@ -887,6 +921,11 @@ void PrintHeaderServerCallbackMethodsHelper(
" abort();\n"
" return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, \"\");\n"
"}\n");
+ printer->Print(*vars,
+ "virtual ::grpc::experimental::ServerBidiReactor< "
+ "$RealRequest$, $RealResponse$>* $Method$() {\n"
+ " return new ::grpc::internal::UnimplementedBidiReactor<\n"
+ " $RealRequest$, $RealResponse$>;}\n");
}
}
@@ -915,22 +954,36 @@ void PrintHeaderServerMethodCallback(
*vars,
" ::grpc::Service::experimental().MarkMethodCallback($Idx$,\n"
" new ::grpc::internal::CallbackUnaryHandler< "
- "ExperimentalWithCallbackMethod_$Method$<BaseClass>, $RealRequest$, "
- "$RealResponse$>(\n"
+ "$RealRequest$, $RealResponse$>(\n"
" [this](::grpc::ServerContext* context,\n"
" const $RealRequest$* request,\n"
" $RealResponse$* response,\n"
" ::grpc::experimental::ServerCallbackRpcController* "
"controller) {\n"
- " this->$"
+ " return this->$"
"Method$(context, request, response, controller);\n"
- " }, this));\n");
+ " }));\n");
} else if (ClientOnlyStreaming(method)) {
- // TODO(vjpai): Add in code generation for all streaming methods
+ printer->Print(
+ *vars,
+ " ::grpc::Service::experimental().MarkMethodCallback($Idx$,\n"
+ " new ::grpc::internal::CallbackClientStreamingHandler< "
+ "$RealRequest$, $RealResponse$>(\n"
+ " [this] { return this->$Method$(); }));\n");
} else if (ServerOnlyStreaming(method)) {
- // TODO(vjpai): Add in code generation for all streaming methods
+ printer->Print(
+ *vars,
+ " ::grpc::Service::experimental().MarkMethodCallback($Idx$,\n"
+ " new ::grpc::internal::CallbackServerStreamingHandler< "
+ "$RealRequest$, $RealResponse$>(\n"
+ " [this] { return this->$Method$(); }));\n");
} else if (method->BidiStreaming()) {
- // TODO(vjpai): Add in code generation for all streaming methods
+ printer->Print(
+ *vars,
+ " ::grpc::Service::experimental().MarkMethodCallback($Idx$,\n"
+ " new ::grpc::internal::CallbackBidiHandler< "
+ "$RealRequest$, $RealResponse$>(\n"
+ " [this] { return this->$Method$(); }));\n");
}
printer->Print(*vars, "}\n");
printer->Print(*vars,
@@ -967,8 +1020,7 @@ void PrintHeaderServerMethodRawCallback(
*vars,
" ::grpc::Service::experimental().MarkMethodRawCallback($Idx$,\n"
" new ::grpc::internal::CallbackUnaryHandler< "
- "ExperimentalWithRawCallbackMethod_$Method$<BaseClass>, $RealRequest$, "
- "$RealResponse$>(\n"
+ "$RealRequest$, $RealResponse$>(\n"
" [this](::grpc::ServerContext* context,\n"
" const $RealRequest$* request,\n"
" $RealResponse$* response,\n"
@@ -976,13 +1028,28 @@ void PrintHeaderServerMethodRawCallback(
"controller) {\n"
" this->$"
"Method$(context, request, response, controller);\n"
- " }, this));\n");
+ " }));\n");
} else if (ClientOnlyStreaming(method)) {
- // TODO(vjpai): Add in code generation for all streaming methods
+ printer->Print(
+ *vars,
+ " ::grpc::Service::experimental().MarkMethodRawCallback($Idx$,\n"
+ " new ::grpc::internal::CallbackClientStreamingHandler< "
+ "$RealRequest$, $RealResponse$>(\n"
+ " [this] { return this->$Method$(); }));\n");
} else if (ServerOnlyStreaming(method)) {
- // TODO(vjpai): Add in code generation for all streaming methods
+ printer->Print(
+ *vars,
+ " ::grpc::Service::experimental().MarkMethodRawCallback($Idx$,\n"
+ " new ::grpc::internal::CallbackServerStreamingHandler< "
+ "$RealRequest$, $RealResponse$>(\n"
+ " [this] { return this->$Method$(); }));\n");
} else if (method->BidiStreaming()) {
- // TODO(vjpai): Add in code generation for all streaming methods
+ printer->Print(
+ *vars,
+ " ::grpc::Service::experimental().MarkMethodRawCallback($Idx$,\n"
+ " new ::grpc::internal::CallbackBidiHandler< "
+ "$RealRequest$, $RealResponse$>(\n"
+ " [this] { return this->$Method$(); }));\n");
}
printer->Print(*vars, "}\n");
printer->Print(*vars,
@@ -1607,7 +1674,19 @@ void PrintSourceClientMethod(grpc_generator::Printer* printer,
"context, response);\n"
"}\n\n");
- // TODO(vjpai): Add callback version
+ printer->Print(
+ *vars,
+ "void $ns$$Service$::"
+ "Stub::experimental_async::$Method$(::grpc::ClientContext* context, "
+ "$Response$* response, "
+ "::grpc::experimental::ClientWriteReactor< $Request$>* reactor) {\n");
+ printer->Print(*vars,
+ " ::grpc::internal::ClientCallbackWriterFactory< "
+ "$Request$>::Create("
+ "stub_->channel_.get(), "
+ "stub_->rpcmethod_$Method$_, "
+ "context, response, reactor);\n"
+ "}\n\n");
for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
@@ -1641,7 +1720,19 @@ void PrintSourceClientMethod(grpc_generator::Printer* printer,
"context, request);\n"
"}\n\n");
- // TODO(vjpai): Add callback version
+ printer->Print(
+ *vars,
+ "void $ns$$Service$::Stub::experimental_async::$Method$(::grpc::"
+ "ClientContext* context, "
+ "$Request$* request, "
+ "::grpc::experimental::ClientReadReactor< $Response$>* reactor) {\n");
+ printer->Print(*vars,
+ " ::grpc::internal::ClientCallbackReaderFactory< "
+ "$Response$>::Create("
+ "stub_->channel_.get(), "
+ "stub_->rpcmethod_$Method$_, "
+ "context, request, reactor);\n"
+ "}\n\n");
for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
@@ -1675,7 +1766,19 @@ void PrintSourceClientMethod(grpc_generator::Printer* printer,
"context);\n"
"}\n\n");
- // TODO(vjpai): Add callback version
+ printer->Print(
+ *vars,
+ "void $ns$$Service$::Stub::experimental_async::$Method$(::grpc::"
+ "ClientContext* context, "
+ "::grpc::experimental::ClientBidiReactor< $Request$,$Response$>* "
+ "reactor) {\n");
+ printer->Print(*vars,
+ " ::grpc::internal::ClientCallbackReaderWriterFactory< "
+ "$Request$,$Response$>::Create("
+ "stub_->channel_.get(), "
+ "stub_->rpcmethod_$Method$_, "
+ "context, reactor);\n"
+ "}\n\n");
for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index dc0e1f89ce..7e70f1c28b 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -749,7 +749,7 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
void* arg, grpc_error* error) {
BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
- // Empty payload means the LB call was cancelled.
+ // Null payload means the LB call was cancelled.
if (lb_calld != grpclb_policy->lb_calld_.get() ||
lb_calld->recv_message_payload_ == nullptr) {
lb_calld->Unref(DEBUG_LOCATION, "on_message_received");
@@ -803,54 +803,45 @@ void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
gpr_free(ipport);
}
}
- /* update serverlist */
- if (serverlist->num_servers > 0) {
- // Start sending client load report only after we start using the
- // serverlist returned from the current LB call.
- if (lb_calld->client_stats_report_interval_ > 0 &&
- lb_calld->client_stats_ == nullptr) {
- lb_calld->client_stats_.reset(New<GrpcLbClientStats>());
- // TODO(roth): We currently track this ref manually. Once the
- // ClosureRef API is ready, we should pass the RefCountedPtr<> along
- // with the callback.
- auto self = lb_calld->Ref(DEBUG_LOCATION, "client_load_report");
- self.release();
- lb_calld->ScheduleNextClientLoadReportLocked();
- }
- if (grpc_grpclb_serverlist_equals(grpclb_policy->serverlist_,
- serverlist)) {
- if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO,
- "[grpclb %p] Incoming server list identical to current, "
- "ignoring.",
- grpclb_policy);
- }
- grpc_grpclb_destroy_serverlist(serverlist);
- } else { /* new serverlist */
- if (grpclb_policy->serverlist_ != nullptr) {
- /* dispose of the old serverlist */
- grpc_grpclb_destroy_serverlist(grpclb_policy->serverlist_);
- } else {
- /* or dispose of the fallback */
- grpc_lb_addresses_destroy(grpclb_policy->fallback_backend_addresses_);
- grpclb_policy->fallback_backend_addresses_ = nullptr;
- if (grpclb_policy->fallback_timer_callback_pending_) {
- grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
- }
- }
- // and update the copy in the GrpcLb instance. This
- // serverlist instance will be destroyed either upon the next
- // update or when the GrpcLb instance is destroyed.
- grpclb_policy->serverlist_ = serverlist;
- grpclb_policy->serverlist_index_ = 0;
- grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked();
- }
- } else {
+ // Start sending client load report only after we start using the
+ // serverlist returned from the current LB call.
+ if (lb_calld->client_stats_report_interval_ > 0 &&
+ lb_calld->client_stats_ == nullptr) {
+ lb_calld->client_stats_.reset(New<GrpcLbClientStats>());
+ // TODO(roth): We currently track this ref manually. Once the
+ // ClosureRef API is ready, we should pass the RefCountedPtr<> along
+ // with the callback.
+ auto self = lb_calld->Ref(DEBUG_LOCATION, "client_load_report");
+ self.release();
+ lb_calld->ScheduleNextClientLoadReportLocked();
+ }
+ // Check if the serverlist differs from the previous one.
+ if (grpc_grpclb_serverlist_equals(grpclb_policy->serverlist_, serverlist)) {
if (grpc_lb_glb_trace.enabled()) {
- gpr_log(GPR_INFO, "[grpclb %p] Received empty server list, ignoring.",
+ gpr_log(GPR_INFO,
+ "[grpclb %p] Incoming server list identical to current, "
+ "ignoring.",
grpclb_policy);
}
grpc_grpclb_destroy_serverlist(serverlist);
+ } else { // New serverlist.
+ if (grpclb_policy->serverlist_ != nullptr) {
+ // Dispose of the old serverlist.
+ grpc_grpclb_destroy_serverlist(grpclb_policy->serverlist_);
+ } else {
+ // Dispose of the fallback.
+ grpc_lb_addresses_destroy(grpclb_policy->fallback_backend_addresses_);
+ grpclb_policy->fallback_backend_addresses_ = nullptr;
+ if (grpclb_policy->fallback_timer_callback_pending_) {
+ grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
+ }
+ }
+ // Update the serverlist in the GrpcLb instance. This serverlist
+ // instance will be destroyed either upon the next update or when the
+ // GrpcLb instance is destroyed.
+ grpclb_policy->serverlist_ = serverlist;
+ grpclb_policy->serverlist_index_ = 0;
+ grpclb_policy->CreateOrUpdateRoundRobinPolicyLocked();
}
} else {
// No valid initial response or serverlist found.
@@ -1583,7 +1574,7 @@ void GrpcLb::AddPendingPick(PendingPick* pp) {
bool GrpcLb::PickFromRoundRobinPolicyLocked(bool force_async, PendingPick* pp,
grpc_error** error) {
// Check for drops if we are not using fallback backend addresses.
- if (serverlist_ != nullptr) {
+ if (serverlist_ != nullptr && serverlist_->num_servers > 0) {
// Look at the index into the serverlist to see if we should drop this call.
grpc_grpclb_server* server = serverlist_->servers[serverlist_index_++];
if (serverlist_index_ == serverlist_->num_servers) {
@@ -1681,7 +1672,6 @@ grpc_channel_args* GrpcLb::CreateRoundRobinPolicyArgsLocked() {
grpc_lb_addresses* addresses;
bool is_backend_from_grpclb_load_balancer = false;
if (serverlist_ != nullptr) {
- GPR_ASSERT(serverlist_->num_servers > 0);
addresses = ProcessServerlist(serverlist_);
is_backend_from_grpclb_load_balancer = true;
} else {
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
index 90bc88961d..4ebc2c8161 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
@@ -122,6 +122,8 @@ class AresDnsResolver : public Resolver {
char* service_config_json_ = nullptr;
// has shutdown been initiated
bool shutdown_initiated_ = false;
+ // timeout in milliseconds for active DNS queries
+ int query_timeout_ms_;
};
AresDnsResolver::AresDnsResolver(const ResolverArgs& args)
@@ -159,6 +161,11 @@ AresDnsResolver::AresDnsResolver(const ResolverArgs& args)
grpc_combiner_scheduler(combiner()));
GRPC_CLOSURE_INIT(&on_resolved_, OnResolvedLocked, this,
grpc_combiner_scheduler(combiner()));
+ const grpc_arg* query_timeout_ms_arg =
+ grpc_channel_args_find(channel_args_, GRPC_ARG_DNS_ARES_QUERY_TIMEOUT_MS);
+ query_timeout_ms_ = grpc_channel_arg_get_integer(
+ query_timeout_ms_arg,
+ {GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS, 0, INT_MAX});
}
AresDnsResolver::~AresDnsResolver() {
@@ -410,7 +417,8 @@ void AresDnsResolver::StartResolvingLocked() {
pending_request_ = grpc_dns_lookup_ares_locked(
dns_server_, name_to_resolve_, kDefaultPort, interested_parties_,
&on_resolved_, &lb_addresses_, true /* check_grpclb */,
- request_service_config_ ? &service_config_json_ : nullptr, combiner());
+ request_service_config_ ? &service_config_json_ : nullptr,
+ query_timeout_ms_, combiner());
last_resolution_timestamp_ = grpc_core::ExecCtx::Get()->Now();
}
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc
index fdbd07ebf5..f42b1e309d 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.cc
@@ -33,6 +33,7 @@
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/lib/iomgr/sockaddr_utils.h"
+#include "src/core/lib/iomgr/timer.h"
typedef struct fd_node {
/** the owner of this fd node */
@@ -76,6 +77,12 @@ struct grpc_ares_ev_driver {
grpc_ares_request* request;
/** Owned by the ev_driver. Creates new GrpcPolledFd's */
grpc_core::UniquePtr<grpc_core::GrpcPolledFdFactory> polled_fd_factory;
+ /** query timeout in milliseconds */
+ int query_timeout_ms;
+ /** alarm to cancel active queries */
+ grpc_timer query_timeout;
+ /** cancels queries on a timeout */
+ grpc_closure on_timeout_locked;
};
static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver);
@@ -116,8 +123,11 @@ static void fd_node_shutdown_locked(fd_node* fdn, const char* reason) {
}
}
+static void on_timeout_locked(void* arg, grpc_error* error);
+
grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver,
grpc_pollset_set* pollset_set,
+ int query_timeout_ms,
grpc_combiner* combiner,
grpc_ares_request* request) {
*ev_driver = grpc_core::New<grpc_ares_ev_driver>();
@@ -146,6 +156,9 @@ grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver,
grpc_core::NewGrpcPolledFdFactory((*ev_driver)->combiner);
(*ev_driver)
->polled_fd_factory->ConfigureAresChannelLocked((*ev_driver)->channel);
+ GRPC_CLOSURE_INIT(&(*ev_driver)->on_timeout_locked, on_timeout_locked,
+ *ev_driver, grpc_combiner_scheduler(combiner));
+ (*ev_driver)->query_timeout_ms = query_timeout_ms;
return GRPC_ERROR_NONE;
}
@@ -155,6 +168,7 @@ void grpc_ares_ev_driver_on_queries_complete_locked(
// is working, grpc_ares_notify_on_event_locked will shut down the
// fds; if it's not working, there are no fds to shut down.
ev_driver->shutting_down = true;
+ grpc_timer_cancel(&ev_driver->query_timeout);
grpc_ares_ev_driver_unref(ev_driver);
}
@@ -185,6 +199,17 @@ static fd_node* pop_fd_node_locked(fd_node** head, ares_socket_t as) {
return nullptr;
}
+static void on_timeout_locked(void* arg, grpc_error* error) {
+ grpc_ares_ev_driver* driver = static_cast<grpc_ares_ev_driver*>(arg);
+ GRPC_CARES_TRACE_LOG(
+ "ev_driver=%p on_timeout_locked. driver->shutting_down=%d. err=%s",
+ driver, driver->shutting_down, grpc_error_string(error));
+ if (!driver->shutting_down && error == GRPC_ERROR_NONE) {
+ grpc_ares_ev_driver_shutdown_locked(driver);
+ }
+ grpc_ares_ev_driver_unref(driver);
+}
+
static void on_readable_locked(void* arg, grpc_error* error) {
fd_node* fdn = static_cast<fd_node*>(arg);
grpc_ares_ev_driver* ev_driver = fdn->ev_driver;
@@ -314,6 +339,17 @@ void grpc_ares_ev_driver_start_locked(grpc_ares_ev_driver* ev_driver) {
if (!ev_driver->working) {
ev_driver->working = true;
grpc_ares_notify_on_event_locked(ev_driver);
+ grpc_millis timeout =
+ ev_driver->query_timeout_ms == 0
+ ? GRPC_MILLIS_INF_FUTURE
+ : ev_driver->query_timeout_ms + grpc_core::ExecCtx::Get()->Now();
+ GRPC_CARES_TRACE_LOG(
+ "ev_driver=%p grpc_ares_ev_driver_start_locked. timeout in %" PRId64
+ " ms",
+ ev_driver, timeout);
+ grpc_ares_ev_driver_ref(ev_driver);
+ grpc_timer_init(&ev_driver->query_timeout, timeout,
+ &ev_driver->on_timeout_locked);
}
}
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h
index 671c537fe7..b8cefd9470 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver.h
@@ -43,6 +43,7 @@ ares_channel* grpc_ares_ev_driver_get_channel_locked(
created successfully. */
grpc_error* grpc_ares_ev_driver_create_locked(grpc_ares_ev_driver** ev_driver,
grpc_pollset_set* pollset_set,
+ int query_timeout_ms,
grpc_combiner* combiner,
grpc_ares_request* request);
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
index 582e2203fc..55715869b6 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
@@ -359,7 +359,7 @@ done:
void grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked(
grpc_ares_request* r, const char* dns_server, const char* name,
const char* default_port, grpc_pollset_set* interested_parties,
- bool check_grpclb, grpc_combiner* combiner) {
+ bool check_grpclb, int query_timeout_ms, grpc_combiner* combiner) {
grpc_error* error = GRPC_ERROR_NONE;
grpc_ares_hostbyname_request* hr = nullptr;
ares_channel* channel = nullptr;
@@ -388,7 +388,7 @@ void grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked(
port = gpr_strdup(default_port);
}
error = grpc_ares_ev_driver_create_locked(&r->ev_driver, interested_parties,
- combiner, r);
+ query_timeout_ms, combiner, r);
if (error != GRPC_ERROR_NONE) goto error_cleanup;
channel = grpc_ares_ev_driver_get_channel_locked(r->ev_driver);
// If dns_server is specified, use it.
@@ -522,7 +522,7 @@ static grpc_ares_request* grpc_dns_lookup_ares_locked_impl(
const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
- grpc_combiner* combiner) {
+ int query_timeout_ms, grpc_combiner* combiner) {
grpc_ares_request* r =
static_cast<grpc_ares_request*>(gpr_zalloc(sizeof(grpc_ares_request)));
r->ev_driver = nullptr;
@@ -546,7 +546,7 @@ static grpc_ares_request* grpc_dns_lookup_ares_locked_impl(
// Look up name using c-ares lib.
grpc_dns_lookup_ares_continue_after_check_localhost_and_ip_literals_locked(
r, dns_server, name, default_port, interested_parties, check_grpclb,
- combiner);
+ query_timeout_ms, combiner);
return r;
}
@@ -554,6 +554,7 @@ grpc_ares_request* (*grpc_dns_lookup_ares_locked)(
const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
+ int query_timeout_ms,
grpc_combiner* combiner) = grpc_dns_lookup_ares_locked_impl;
static void grpc_cancel_ares_request_locked_impl(grpc_ares_request* r) {
@@ -648,7 +649,8 @@ static void grpc_resolve_address_invoke_dns_lookup_ares_locked(
r->ares_request = grpc_dns_lookup_ares_locked(
nullptr /* dns_server */, r->name, r->default_port, r->interested_parties,
&r->on_dns_lookup_done_locked, &r->lb_addrs, false /* check_grpclb */,
- nullptr /* service_config_json */, r->combiner);
+ nullptr /* service_config_json */, GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS,
+ r->combiner);
}
static void grpc_resolve_address_ares_impl(const char* name,
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
index a1231cc4e0..9acef1d0ca 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.h
@@ -26,6 +26,8 @@
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/resolve_address.h"
+#define GRPC_DNS_ARES_DEFAULT_QUERY_TIMEOUT_MS 10000
+
extern grpc_core::TraceFlag grpc_trace_cares_address_sorting;
extern grpc_core::TraceFlag grpc_trace_cares_resolver;
@@ -60,7 +62,7 @@ extern grpc_ares_request* (*grpc_dns_lookup_ares_locked)(
const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_lb_addresses** addresses, bool check_grpclb,
- char** service_config_json, grpc_combiner* combiner);
+ char** service_config_json, int query_timeout_ms, grpc_combiner* combiner);
/* Cancel the pending grpc_ares_request \a request */
extern void (*grpc_cancel_ares_request_locked)(grpc_ares_request* request);
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc
index 9f293c1ac0..fc78b18304 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper_fallback.cc
@@ -30,7 +30,7 @@ static grpc_ares_request* grpc_dns_lookup_ares_locked_impl(
const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
- grpc_combiner* combiner) {
+ int query_timeout_ms, grpc_combiner* combiner) {
return NULL;
}
@@ -38,6 +38,7 @@ grpc_ares_request* (*grpc_dns_lookup_ares_locked)(
const char* dns_server, const char* name, const char* default_port,
grpc_pollset_set* interested_parties, grpc_closure* on_done,
grpc_lb_addresses** addrs, bool check_grpclb, char** service_config_json,
+ int query_timeout_ms,
grpc_combiner* combiner) = grpc_dns_lookup_ares_locked_impl;
static void grpc_cancel_ares_request_locked_impl(grpc_ares_request* r) {}
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
index b1b8c0083b..99c675f503 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.cc
@@ -1390,6 +1390,7 @@ static void perform_stream_op_locked(void* stream_op,
GRPC_STATS_INC_HTTP2_OP_BATCHES();
s->context = op->payload->context;
+ s->traced = op->is_traced;
if (grpc_http_trace.enabled()) {
char* str = grpc_transport_stream_op_batch_string(op);
gpr_log(GPR_INFO, "perform_stream_op_locked: %s; on_complete = %p", str,
diff --git a/src/core/ext/transport/chttp2/transport/context_list.cc b/src/core/ext/transport/chttp2/transport/context_list.cc
index 4acd0c9583..f30d41c332 100644
--- a/src/core/ext/transport/chttp2/transport/context_list.cc
+++ b/src/core/ext/transport/chttp2/transport/context_list.cc
@@ -32,6 +32,7 @@ void ContextList::Execute(void* arg, grpc_core::Timestamps* ts,
while (head != nullptr) {
if (error == GRPC_ERROR_NONE && ts != nullptr) {
if (write_timestamps_callback_g) {
+ ts->byte_offset = static_cast<uint32_t>(head->byte_offset_);
write_timestamps_callback_g(head->s_->context, ts);
}
}
diff --git a/src/core/ext/transport/chttp2/transport/context_list.h b/src/core/ext/transport/chttp2/transport/context_list.h
index 68d11e94d8..d870107749 100644
--- a/src/core/ext/transport/chttp2/transport/context_list.h
+++ b/src/core/ext/transport/chttp2/transport/context_list.h
@@ -50,6 +50,7 @@ class ContextList {
/* Create a new element in the list and add it at the front */
ContextList* elem = grpc_core::New<ContextList>();
elem->s_ = s;
+ elem->byte_offset_ = s->byte_counter;
elem->next_ = *head;
*head = elem;
}
@@ -61,6 +62,7 @@ class ContextList {
private:
grpc_chttp2_stream* s_ = nullptr;
ContextList* next_ = nullptr;
+ size_t byte_offset_ = 0;
};
void grpc_http2_set_write_timestamps_callback(
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 1c79cf6353..6aa68f5d4a 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -642,10 +642,12 @@ struct grpc_chttp2_stream {
/** Whether bytes stored in unprocessed_incoming_byte_stream is decompressed
*/
bool unprocessed_incoming_frames_decompressed = false;
- /** gRPC header bytes that are already decompressed */
- size_t decompressed_header_bytes = 0;
/** Whether the bytes needs to be traced using Fathom */
bool traced = false;
+ /** gRPC header bytes that are already decompressed */
+ size_t decompressed_header_bytes = 0;
+ /** Byte counter for number of bytes written */
+ size_t byte_counter = 0;
};
/** Transport writing call flow:
diff --git a/src/core/ext/transport/chttp2/transport/writing.cc b/src/core/ext/transport/chttp2/transport/writing.cc
index 77320b496f..265d3365d3 100644
--- a/src/core/ext/transport/chttp2/transport/writing.cc
+++ b/src/core/ext/transport/chttp2/transport/writing.cc
@@ -363,6 +363,7 @@ class DataSendContext {
grpc_chttp2_encode_data(s_->id, &s_->compressed_data_buffer, send_bytes,
is_last_frame_, &s_->stats.outgoing, &t_->outbuf);
s_->flow_control->SentData(send_bytes);
+ s_->byte_counter += send_bytes;
if (s_->compressed_data_buffer.length == 0) {
s_->sending_bytes += s_->uncompressed_data_size;
}
@@ -488,9 +489,6 @@ class StreamWriteContext {
return; // early out: nothing to do
}
- if (s_->traced && grpc_endpoint_can_track_err(t_->ep)) {
- grpc_core::ContextList::Append(&t_->cl, s_);
- }
while ((s_->flow_controlled_buffer.length > 0 ||
s_->compressed_data_buffer.length > 0) &&
data_send_context.max_outgoing() > 0) {
@@ -500,6 +498,9 @@ class StreamWriteContext {
data_send_context.CompressMoreBytes();
}
}
+ if (s_->traced && grpc_endpoint_can_track_err(t_->ep)) {
+ grpc_core::ContextList::Append(&t_->cl, s_);
+ }
write_context_->ResetPingClock();
if (data_send_context.is_last_frame()) {
SentLastFrame();
diff --git a/src/core/lib/gprpp/inlined_vector.h b/src/core/lib/gprpp/inlined_vector.h
index 65c2b9634f..66dc751a56 100644
--- a/src/core/lib/gprpp/inlined_vector.h
+++ b/src/core/lib/gprpp/inlined_vector.h
@@ -100,10 +100,7 @@ class InlinedVector {
void reserve(size_t capacity) {
if (capacity > capacity_) {
T* new_dynamic = static_cast<T*>(gpr_malloc(sizeof(T) * capacity));
- for (size_t i = 0; i < size_; ++i) {
- new (&new_dynamic[i]) T(std::move(data()[i]));
- data()[i].~T();
- }
+ move_elements(data(), new_dynamic, size_);
gpr_free(dynamic_);
dynamic_ = new_dynamic;
capacity_ = capacity;
@@ -131,13 +128,25 @@ class InlinedVector {
size_--;
}
+ size_t size() const { return size_; }
+ bool empty() const { return size_ == 0; }
+
+ size_t capacity() const { return capacity_; }
+
+ void clear() {
+ destroy_elements();
+ init_data();
+ }
+
+ private:
void copy_from(const InlinedVector& v) {
- // if v is allocated, copy over the buffer.
+ // if v is allocated, make sure we have enough capacity.
if (v.dynamic_ != nullptr) {
reserve(v.capacity_);
- memcpy(dynamic_, v.dynamic_, v.size_ * sizeof(T));
- } else {
- memcpy(inline_, v.inline_, v.size_ * sizeof(T));
+ }
+ // copy over elements
+ for (size_t i = 0; i < v.size_; ++i) {
+ new (&(data()[i])) T(v[i]);
}
// copy over metadata
size_ = v.size_;
@@ -145,11 +154,12 @@ class InlinedVector {
}
void move_from(InlinedVector& v) {
- // if v is allocated, then we steal its buffer, else we copy it.
+ // if v is allocated, then we steal its dynamic array; otherwise, we
+ // move the elements individually.
if (v.dynamic_ != nullptr) {
dynamic_ = v.dynamic_;
} else {
- memcpy(inline_, v.inline_, v.size_ * sizeof(T));
+ move_elements(v.data(), data(), v.size_);
}
// copy over metadata
size_ = v.size_;
@@ -158,17 +168,13 @@ class InlinedVector {
v.init_data();
}
- size_t size() const { return size_; }
- bool empty() const { return size_ == 0; }
-
- size_t capacity() const { return capacity_; }
-
- void clear() {
- destroy_elements();
- init_data();
+ static void move_elements(T* src, T* dst, size_t num_elements) {
+ for (size_t i = 0; i < num_elements; ++i) {
+ new (&dst[i]) T(std::move(src[i]));
+ src[i].~T();
+ }
}
- private:
void init_data() {
dynamic_ = nullptr;
size_ = 0;
diff --git a/src/core/lib/iomgr/buffer_list.cc b/src/core/lib/iomgr/buffer_list.cc
index e20dab15b1..ace17a108d 100644
--- a/src/core/lib/iomgr/buffer_list.cc
+++ b/src/core/lib/iomgr/buffer_list.cc
@@ -35,6 +35,9 @@ void TracedBuffer::AddNewEntry(TracedBuffer** head, uint32_t seq_no,
TracedBuffer* new_elem = New<TracedBuffer>(seq_no, arg);
/* Store the current time as the sendmsg time. */
new_elem->ts_.sendmsg_time = gpr_now(GPR_CLOCK_REALTIME);
+ new_elem->ts_.scheduled_time = gpr_inf_past(GPR_CLOCK_REALTIME);
+ new_elem->ts_.sent_time = gpr_inf_past(GPR_CLOCK_REALTIME);
+ new_elem->ts_.acked_time = gpr_inf_past(GPR_CLOCK_REALTIME);
if (*head == nullptr) {
*head = new_elem;
return;
diff --git a/src/core/lib/iomgr/buffer_list.h b/src/core/lib/iomgr/buffer_list.h
index 87d74f9ce2..627f1bde99 100644
--- a/src/core/lib/iomgr/buffer_list.h
+++ b/src/core/lib/iomgr/buffer_list.h
@@ -37,6 +37,8 @@ struct Timestamps {
gpr_timespec scheduled_time;
gpr_timespec sent_time;
gpr_timespec acked_time;
+
+ uint32_t byte_offset; /* byte offset relative to the start of the RPC */
};
/** TracedBuffer is a class to keep track of timestamps for a specific buffer in
@@ -73,7 +75,7 @@ class TracedBuffer {
private:
GPRC_ALLOW_CLASS_TO_USE_NON_PUBLIC_NEW
- TracedBuffer(int seq_no, void* arg)
+ TracedBuffer(uint32_t seq_no, void* arg)
: seq_no_(seq_no), arg_(arg), next_(nullptr) {}
uint32_t seq_no_; /* The sequence number for the last byte in the buffer */
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc
index 38571b1957..4b8c891e9b 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.cc
+++ b/src/core/lib/iomgr/ev_epoll1_linux.cc
@@ -1242,6 +1242,8 @@ static void pollset_set_del_pollset_set(grpc_pollset_set* bag,
* Event engine binding
*/
+static void shutdown_background_closure(void) {}
+
static void shutdown_engine(void) {
fd_global_shutdown();
pollset_global_shutdown();
@@ -1255,6 +1257,7 @@ static void shutdown_engine(void) {
static const grpc_event_engine_vtable vtable = {
sizeof(grpc_pollset),
true,
+ false,
fd_create,
fd_wrapped_fd,
@@ -1284,6 +1287,7 @@ static const grpc_event_engine_vtable vtable = {
pollset_set_add_fd,
pollset_set_del_fd,
+ shutdown_background_closure,
shutdown_engine,
};
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index 06a382c556..7a4870db78 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -1604,6 +1604,8 @@ static void pollset_set_del_pollset_set(grpc_pollset_set* bag,
* Event engine binding
*/
+static void shutdown_background_closure(void) {}
+
static void shutdown_engine(void) {
fd_global_shutdown();
pollset_global_shutdown();
@@ -1612,6 +1614,7 @@ static void shutdown_engine(void) {
static const grpc_event_engine_vtable vtable = {
sizeof(grpc_pollset),
true,
+ false,
fd_create,
fd_wrapped_fd,
@@ -1641,6 +1644,7 @@ static const grpc_event_engine_vtable vtable = {
pollset_set_add_fd,
pollset_set_del_fd,
+ shutdown_background_closure,
shutdown_engine,
};
diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc
index 16562538a6..67cbfbbd02 100644
--- a/src/core/lib/iomgr/ev_poll_posix.cc
+++ b/src/core/lib/iomgr/ev_poll_posix.cc
@@ -1782,6 +1782,8 @@ static void global_cv_fd_table_shutdown() {
* event engine binding
*/
+static void shutdown_background_closure(void) {}
+
static void shutdown_engine(void) {
pollset_global_shutdown();
if (grpc_cv_wakeup_fds_enabled()) {
@@ -1796,6 +1798,7 @@ static void shutdown_engine(void) {
static const grpc_event_engine_vtable vtable = {
sizeof(grpc_pollset),
false,
+ false,
fd_create,
fd_wrapped_fd,
@@ -1825,6 +1828,7 @@ static const grpc_event_engine_vtable vtable = {
pollset_set_add_fd,
pollset_set_del_fd,
+ shutdown_background_closure,
shutdown_engine,
};
diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc
index 6f6d134ced..32d1b6c43e 100644
--- a/src/core/lib/iomgr/ev_posix.cc
+++ b/src/core/lib/iomgr/ev_posix.cc
@@ -244,6 +244,10 @@ bool grpc_event_engine_can_track_errors(void) {
return false;
}
+bool grpc_event_engine_run_in_background(void) {
+ return g_event_engine->run_in_background;
+}
+
grpc_fd* grpc_fd_create(int fd, const char* name, bool track_err) {
GRPC_POLLING_API_TRACE("fd_create(%d, %s, %d)", fd, name, track_err);
GRPC_FD_TRACE("fd_create(%d, %s, %d)", fd, name, track_err);
@@ -395,4 +399,8 @@ void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd) {
g_event_engine->pollset_set_del_fd(pollset_set, fd);
}
+void grpc_shutdown_background_closure(void) {
+ g_event_engine->shutdown_background_closure();
+}
+
#endif // GRPC_POSIX_SOCKET_EV
diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h
index b8fb8f534b..812c7a0f0f 100644
--- a/src/core/lib/iomgr/ev_posix.h
+++ b/src/core/lib/iomgr/ev_posix.h
@@ -42,6 +42,7 @@ typedef struct grpc_fd grpc_fd;
typedef struct grpc_event_engine_vtable {
size_t pollset_size;
bool can_track_err;
+ bool run_in_background;
grpc_fd* (*fd_create)(int fd, const char* name, bool track_err);
int (*fd_wrapped_fd)(grpc_fd* fd);
@@ -79,6 +80,7 @@ typedef struct grpc_event_engine_vtable {
void (*pollset_set_add_fd)(grpc_pollset_set* pollset_set, grpc_fd* fd);
void (*pollset_set_del_fd)(grpc_pollset_set* pollset_set, grpc_fd* fd);
+ void (*shutdown_background_closure)(void);
void (*shutdown_engine)(void);
} grpc_event_engine_vtable;
@@ -101,6 +103,11 @@ const char* grpc_get_poll_strategy_name();
*/
bool grpc_event_engine_can_track_errors();
+/* Returns true if polling engine runs in the background, false otherwise.
+ * Currently only 'epollbg' runs in the background.
+ */
+bool grpc_event_engine_run_in_background();
+
/* Create a wrapped file descriptor.
Requires fd is a non-blocking file descriptor.
\a track_err if true means that error events would be tracked separately
@@ -174,6 +181,9 @@ void grpc_pollset_add_fd(grpc_pollset* pollset, struct grpc_fd* fd);
void grpc_pollset_set_add_fd(grpc_pollset_set* pollset_set, grpc_fd* fd);
void grpc_pollset_set_del_fd(grpc_pollset_set* pollset_set, grpc_fd* fd);
+/* Shut down all the closures registered in the background poller. */
+void grpc_shutdown_background_closure();
+
/* override to allow tests to hook poll() usage */
typedef int (*grpc_poll_function_type)(struct pollfd*, nfds_t, int);
extern grpc_poll_function_type grpc_poll_function;
diff --git a/src/core/lib/iomgr/fork_posix.cc b/src/core/lib/iomgr/fork_posix.cc
index e957bad73d..05ecd2a49b 100644
--- a/src/core/lib/iomgr/fork_posix.cc
+++ b/src/core/lib/iomgr/fork_posix.cc
@@ -60,7 +60,7 @@ void grpc_prefork() {
}
if (strcmp(grpc_get_poll_strategy_name(), "epoll1") != 0 &&
strcmp(grpc_get_poll_strategy_name(), "poll") != 0) {
- gpr_log(GPR_ERROR,
+ gpr_log(GPR_INFO,
"Fork support is only compatible with the epoll1 and poll polling "
"strategies");
}
diff --git a/src/core/lib/iomgr/iomgr.cc b/src/core/lib/iomgr/iomgr.cc
index fd09a6863b..eb29973514 100644
--- a/src/core/lib/iomgr/iomgr.cc
+++ b/src/core/lib/iomgr/iomgr.cc
@@ -157,6 +157,10 @@ void grpc_iomgr_shutdown() {
gpr_cv_destroy(&g_rcv);
}
+void grpc_iomgr_shutdown_background_closure() {
+ grpc_iomgr_platform_shutdown_background_closure();
+}
+
void grpc_iomgr_register_object(grpc_iomgr_object* obj, const char* name) {
obj->name = gpr_strdup(name);
gpr_mu_lock(&g_mu);
diff --git a/src/core/lib/iomgr/iomgr.h b/src/core/lib/iomgr/iomgr.h
index 537ef8a6ff..8ea9289e06 100644
--- a/src/core/lib/iomgr/iomgr.h
+++ b/src/core/lib/iomgr/iomgr.h
@@ -35,6 +35,10 @@ void grpc_iomgr_start();
* exec_ctx. */
void grpc_iomgr_shutdown();
+/** Signals the intention to shutdown all the closures registered in the
+ * background poller. */
+void grpc_iomgr_shutdown_background_closure();
+
/* Exposed only for testing */
size_t grpc_iomgr_count_objects_for_testing();
diff --git a/src/core/lib/iomgr/iomgr_custom.cc b/src/core/lib/iomgr/iomgr_custom.cc
index d34c8e7cd1..4b112c9097 100644
--- a/src/core/lib/iomgr/iomgr_custom.cc
+++ b/src/core/lib/iomgr/iomgr_custom.cc
@@ -40,9 +40,11 @@ static void iomgr_platform_init(void) {
}
static void iomgr_platform_flush(void) {}
static void iomgr_platform_shutdown(void) { grpc_pollset_global_shutdown(); }
+static void iomgr_platform_shutdown_background_closure(void) {}
static grpc_iomgr_platform_vtable vtable = {
- iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown};
+ iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown,
+ iomgr_platform_shutdown_background_closure};
void grpc_custom_iomgr_init(grpc_socket_vtable* socket,
grpc_custom_resolver_vtable* resolver,
diff --git a/src/core/lib/iomgr/iomgr_internal.cc b/src/core/lib/iomgr/iomgr_internal.cc
index 32dbabb79d..b6c9211865 100644
--- a/src/core/lib/iomgr/iomgr_internal.cc
+++ b/src/core/lib/iomgr/iomgr_internal.cc
@@ -41,3 +41,7 @@ void grpc_iomgr_platform_init() { iomgr_platform_vtable->init(); }
void grpc_iomgr_platform_flush() { iomgr_platform_vtable->flush(); }
void grpc_iomgr_platform_shutdown() { iomgr_platform_vtable->shutdown(); }
+
+void grpc_iomgr_platform_shutdown_background_closure() {
+ iomgr_platform_vtable->shutdown_background_closure();
+}
diff --git a/src/core/lib/iomgr/iomgr_internal.h b/src/core/lib/iomgr/iomgr_internal.h
index b011d9c7b1..bca7409907 100644
--- a/src/core/lib/iomgr/iomgr_internal.h
+++ b/src/core/lib/iomgr/iomgr_internal.h
@@ -35,6 +35,7 @@ typedef struct grpc_iomgr_platform_vtable {
void (*init)(void);
void (*flush)(void);
void (*shutdown)(void);
+ void (*shutdown_background_closure)(void);
} grpc_iomgr_platform_vtable;
void grpc_iomgr_register_object(grpc_iomgr_object* obj, const char* name);
@@ -52,6 +53,9 @@ void grpc_iomgr_platform_flush(void);
/** tear down all platform specific global iomgr structures */
void grpc_iomgr_platform_shutdown(void);
+/** shut down all the closures registered in the background poller */
+void grpc_iomgr_platform_shutdown_background_closure(void);
+
bool grpc_iomgr_abort_on_leaks(void);
#endif /* GRPC_CORE_LIB_IOMGR_IOMGR_INTERNAL_H */
diff --git a/src/core/lib/iomgr/iomgr_posix.cc b/src/core/lib/iomgr/iomgr_posix.cc
index ca7334c9a4..9386adf060 100644
--- a/src/core/lib/iomgr/iomgr_posix.cc
+++ b/src/core/lib/iomgr/iomgr_posix.cc
@@ -51,8 +51,13 @@ static void iomgr_platform_shutdown(void) {
grpc_wakeup_fd_global_destroy();
}
+static void iomgr_platform_shutdown_background_closure(void) {
+ grpc_shutdown_background_closure();
+}
+
static grpc_iomgr_platform_vtable vtable = {
- iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown};
+ iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown,
+ iomgr_platform_shutdown_background_closure};
void grpc_set_default_iomgr_platform() {
grpc_set_tcp_client_impl(&grpc_posix_tcp_client_vtable);
diff --git a/src/core/lib/iomgr/iomgr_posix_cfstream.cc b/src/core/lib/iomgr/iomgr_posix_cfstream.cc
index 235a9e0712..552ef4309c 100644
--- a/src/core/lib/iomgr/iomgr_posix_cfstream.cc
+++ b/src/core/lib/iomgr/iomgr_posix_cfstream.cc
@@ -54,8 +54,13 @@ static void iomgr_platform_shutdown(void) {
grpc_wakeup_fd_global_destroy();
}
+static void iomgr_platform_shutdown_background_closure(void) {
+ grpc_shutdown_background_closure();
+}
+
static grpc_iomgr_platform_vtable vtable = {
- iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown};
+ iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown,
+ iomgr_platform_shutdown_background_closure};
void grpc_set_default_iomgr_platform() {
char* enable_cfstream = getenv(grpc_cfstream_env_var);
diff --git a/src/core/lib/iomgr/iomgr_windows.cc b/src/core/lib/iomgr/iomgr_windows.cc
index cdef89cbf0..24ef0dba7b 100644
--- a/src/core/lib/iomgr/iomgr_windows.cc
+++ b/src/core/lib/iomgr/iomgr_windows.cc
@@ -71,8 +71,11 @@ static void iomgr_platform_shutdown(void) {
winsock_shutdown();
}
+static void iomgr_platform_shutdown_background_closure(void) {}
+
static grpc_iomgr_platform_vtable vtable = {
- iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown};
+ iomgr_platform_init, iomgr_platform_flush, iomgr_platform_shutdown,
+ iomgr_platform_shutdown_background_closure};
void grpc_set_default_iomgr_platform() {
grpc_set_tcp_client_impl(&grpc_windows_tcp_client_vtable);
diff --git a/src/core/lib/iomgr/resolve_address.h b/src/core/lib/iomgr/resolve_address.h
index 6afe94a7a9..7016ffc31a 100644
--- a/src/core/lib/iomgr/resolve_address.h
+++ b/src/core/lib/iomgr/resolve_address.h
@@ -65,7 +65,7 @@ void grpc_set_resolver_impl(grpc_address_resolver_vtable* vtable);
/* Asynchronously resolve addr. Use default_port if a port isn't designated
in addr, otherwise use the port in addr. */
-/* TODO(ctiller): add a timeout here */
+/* TODO(apolcyn): add a timeout here */
void grpc_resolve_address(const char* addr, const char* default_port,
grpc_pollset_set* interested_parties,
grpc_closure* on_done,
diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc
index 4a5ca615e5..cfcb190d60 100644
--- a/src/core/lib/iomgr/tcp_posix.cc
+++ b/src/core/lib/iomgr/tcp_posix.cc
@@ -260,10 +260,17 @@ static void notify_on_write(grpc_tcp* tcp) {
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_INFO, "TCP:%p notify_on_write", tcp);
}
- cover_self(tcp);
- GRPC_CLOSURE_INIT(&tcp->write_done_closure,
- tcp_drop_uncovered_then_handle_write, tcp,
- grpc_schedule_on_exec_ctx);
+ if (grpc_event_engine_run_in_background()) {
+ // If there is a polling engine always running in the background, there is
+ // no need to run the backup poller.
+ GRPC_CLOSURE_INIT(&tcp->write_done_closure, tcp_handle_write, tcp,
+ grpc_schedule_on_exec_ctx);
+ } else {
+ cover_self(tcp);
+ GRPC_CLOSURE_INIT(&tcp->write_done_closure,
+ tcp_drop_uncovered_then_handle_write, tcp,
+ grpc_schedule_on_exec_ctx);
+ }
grpc_fd_notify_on_write(tcp->em_fd, &tcp->write_done_closure);
}
@@ -627,7 +634,7 @@ static bool tcp_write_with_timestamps(grpc_tcp* tcp, struct msghdr* msg,
if (sending_length == static_cast<size_t>(length)) {
gpr_mu_lock(&tcp->tb_mu);
grpc_core::TracedBuffer::AddNewEntry(
- &tcp->tb_head, static_cast<int>(tcp->bytes_counter + length),
+ &tcp->tb_head, static_cast<uint32_t>(tcp->bytes_counter + length),
tcp->outgoing_buffer_arg);
gpr_mu_unlock(&tcp->tb_mu);
tcp->outgoing_buffer_arg = nullptr;
@@ -679,11 +686,9 @@ struct cmsghdr* process_timestamp(grpc_tcp* tcp, msghdr* msg,
}
/** For linux platforms, reads the socket's error queue and processes error
- * messages from the queue. Returns true if all the errors processed were
- * timestamps. Returns false if any of the errors were not timestamps. For
- * non-linux platforms, error processing is not used/enabled currently.
+ * messages from the queue.
*/
-static bool process_errors(grpc_tcp* tcp) {
+static void process_errors(grpc_tcp* tcp) {
while (true) {
struct iovec iov;
iov.iov_base = nullptr;
@@ -712,10 +717,10 @@ static bool process_errors(grpc_tcp* tcp) {
} while (r < 0 && saved_errno == EINTR);
if (r == -1 && saved_errno == EAGAIN) {
- return true; /* No more errors to process */
+ return; /* No more errors to process */
}
if (r == -1) {
- return false;
+ return;
}
if (grpc_tcp_trace.enabled()) {
if ((msg.msg_flags & MSG_CTRUNC) == 1) {
@@ -725,8 +730,9 @@ static bool process_errors(grpc_tcp* tcp) {
if (msg.msg_controllen == 0) {
/* There was no control message found. It was probably spurious. */
- return true;
+ return;
}
+ bool seen = false;
for (auto cmsg = CMSG_FIRSTHDR(&msg); cmsg && cmsg->cmsg_len;
cmsg = CMSG_NXTHDR(&msg, cmsg)) {
if (cmsg->cmsg_level != SOL_SOCKET ||
@@ -738,9 +744,13 @@ static bool process_errors(grpc_tcp* tcp) {
"unknown control message cmsg_level:%d cmsg_type:%d",
cmsg->cmsg_level, cmsg->cmsg_type);
}
- return false;
+ return;
}
cmsg = process_timestamp(tcp, &msg, cmsg);
+ seen = true;
+ }
+ if (!seen) {
+ return;
}
}
}
diff --git a/src/core/lib/surface/init.cc b/src/core/lib/surface/init.cc
index c6198b8ae7..67cf5d89bf 100644
--- a/src/core/lib/surface/init.cc
+++ b/src/core/lib/surface/init.cc
@@ -161,6 +161,7 @@ void grpc_shutdown(void) {
if (--g_initializations == 0) {
{
grpc_core::ExecCtx exec_ctx(0);
+ grpc_iomgr_shutdown_background_closure();
{
grpc_timer_manager_set_threading(
false); // shutdown timer_manager thread
diff --git a/src/core/lib/transport/transport.cc b/src/core/lib/transport/transport.cc
index cbdb77c844..b32f9c6ec1 100644
--- a/src/core/lib/transport/transport.cc
+++ b/src/core/lib/transport/transport.cc
@@ -27,6 +27,7 @@
#include <grpc/support/log.h>
#include <grpc/support/sync.h>
+#include "src/core/lib/gpr/alloc.h"
#include "src/core/lib/gpr/string.h"
#include "src/core/lib/iomgr/executor.h"
#include "src/core/lib/slice/slice_internal.h"
@@ -149,7 +150,7 @@ void grpc_transport_move_stats(grpc_transport_stream_stats* from,
}
size_t grpc_transport_stream_size(grpc_transport* transport) {
- return transport->vtable->sizeof_stream;
+ return GPR_ROUND_UP_TO_ALIGNMENT_SIZE(transport->vtable->sizeof_stream);
}
void grpc_transport_destroy(grpc_transport* transport) {
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index edfa7030d1..5ce568834e 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -129,7 +129,8 @@ struct grpc_transport_stream_op_batch {
recv_initial_metadata(false),
recv_message(false),
recv_trailing_metadata(false),
- cancel_stream(false) {}
+ cancel_stream(false),
+ is_traced(false) {}
/** Should be scheduled when all of the non-recv operations in the batch
are complete.
@@ -167,6 +168,9 @@ struct grpc_transport_stream_op_batch {
/** Cancel this stream with the provided error */
bool cancel_stream : 1;
+ /** Is this stream traced */
+ bool is_traced : 1;
+
/***************************************************************************
* remaining fields are initialized and used at the discretion of the
* current handler of the op */
diff --git a/src/core/tsi/alts/handshaker/alts_handshaker_client.cc b/src/core/tsi/alts/handshaker/alts_handshaker_client.cc
index 1de6264183..43d0979f4b 100644
--- a/src/core/tsi/alts/handshaker/alts_handshaker_client.cc
+++ b/src/core/tsi/alts/handshaker/alts_handshaker_client.cc
@@ -116,12 +116,13 @@ void alts_handshaker_client_handle_response(alts_handshaker_client* c,
"cb is nullptr in alts_tsi_handshaker_handle_response()");
return;
}
- if (handshaker == nullptr || recv_buffer == nullptr) {
+ if (handshaker == nullptr) {
gpr_log(GPR_ERROR,
- "Invalid arguments to alts_tsi_handshaker_handle_response()");
+ "handshaker is nullptr in alts_tsi_handshaker_handle_response()");
cb(TSI_INTERNAL_ERROR, user_data, nullptr, 0, nullptr);
return;
}
+ /* TSI handshake has been shutdown. */
if (alts_tsi_handshaker_has_shutdown(handshaker)) {
gpr_log(GPR_ERROR, "TSI handshake shutdown");
cb(TSI_HANDSHAKE_SHUTDOWN, user_data, nullptr, 0, nullptr);
@@ -133,6 +134,12 @@ void alts_handshaker_client_handle_response(alts_handshaker_client* c,
cb(TSI_INTERNAL_ERROR, user_data, nullptr, 0, nullptr);
return;
}
+ if (recv_buffer == nullptr) {
+ gpr_log(GPR_ERROR,
+ "recv_buffer is nullptr in alts_tsi_handshaker_handle_response()");
+ cb(TSI_INTERNAL_ERROR, user_data, nullptr, 0, nullptr);
+ return;
+ }
grpc_gcp_handshaker_resp* resp =
alts_tsi_utils_deserialize_response(recv_buffer);
grpc_byte_buffer_destroy(client->recv_buffer);
diff --git a/src/cpp/client/generic_stub.cc b/src/cpp/client/generic_stub.cc
index 87902b26f0..f61c1b5317 100644
--- a/src/cpp/client/generic_stub.cc
+++ b/src/cpp/client/generic_stub.cc
@@ -72,4 +72,13 @@ void GenericStub::experimental_type::UnaryCall(
context, request, response, std::move(on_completion));
}
+void GenericStub::experimental_type::PrepareBidiStreamingCall(
+ ClientContext* context, const grpc::string& method,
+ experimental::ClientBidiReactor<ByteBuffer, ByteBuffer>* reactor) {
+ internal::ClientCallbackReaderWriterFactory<ByteBuffer, ByteBuffer>::Create(
+ stub_->channel_.get(),
+ internal::RpcMethod(method.c_str(), internal::RpcMethod::BIDI_STREAMING),
+ context, reactor);
+}
+
} // namespace grpc
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index 0a51cf5626..69af43a656 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -291,7 +291,7 @@ class Server::SyncRequest final : public internal::CompletionQueueTag {
void ContinueRunAfterInterception() {
{
- ctx_.BeginCompletionOp(&call_, false);
+ ctx_.BeginCompletionOp(&call_, nullptr, nullptr);
global_callbacks_->PreSynchronousRequest(&ctx_);
auto* handler = resources_ ? method_->handler()
: server_->resource_exhausted_handler_.get();
@@ -456,7 +456,6 @@ class Server::CallbackRequest final : public internal::CompletionQueueTag {
}
}
void ContinueRunAfterInterception() {
- req_->ctx_.BeginCompletionOp(call_, true);
req_->method_->handler()->RunHandler(
internal::MethodHandler::HandlerParameter(
call_, &req_->ctx_, req_->request_, req_->request_status_,
@@ -1018,7 +1017,7 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
}
}
if (*status && call_) {
- context_->BeginCompletionOp(&call_wrapper_, false);
+ context_->BeginCompletionOp(&call_wrapper_, nullptr, nullptr);
}
*tag = tag_;
if (delete_on_finalize_) {
@@ -1029,7 +1028,7 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
void ServerInterface::BaseAsyncRequest::
ContinueFinalizeResultAfterInterception() {
- context_->BeginCompletionOp(&call_wrapper_, false);
+ context_->BeginCompletionOp(&call_wrapper_, nullptr, nullptr);
// Queue a tag which will be returned immediately
grpc_core::ExecCtx exec_ctx;
grpc_cq_begin_op(notification_cq_->cq(), this);
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index 9c01f896e6..1b524bc3e8 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -17,6 +17,7 @@
*/
#include <grpcpp/server_context.h>
+#include <grpcpp/support/server_callback.h>
#include <algorithm>
#include <mutex>
@@ -41,8 +42,9 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
public:
// initial refs: one in the server context, one in the cq
// must ref the call before calling constructor and after deleting this
- CompletionOp(internal::Call* call)
+ CompletionOp(internal::Call* call, internal::ServerReactor* reactor)
: call_(*call),
+ reactor_(reactor),
has_tag_(false),
tag_(nullptr),
core_cq_tag_(this),
@@ -124,9 +126,9 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
return;
}
/* Start a dummy op so that we can return the tag */
- GPR_CODEGEN_ASSERT(GRPC_CALL_OK ==
- g_core_codegen_interface->grpc_call_start_batch(
- call_.call(), nullptr, 0, this, nullptr));
+ GPR_CODEGEN_ASSERT(
+ GRPC_CALL_OK ==
+ grpc_call_start_batch(call_.call(), nullptr, 0, core_cq_tag_, nullptr));
}
private:
@@ -136,13 +138,14 @@ class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
}
internal::Call call_;
+ internal::ServerReactor* reactor_;
bool has_tag_;
void* tag_;
void* core_cq_tag_;
std::mutex mu_;
int refs_;
bool finalized_;
- int cancelled_;
+ int cancelled_; // This is an int (not bool) because it is passed to core
bool done_intercepting_;
internal::InterceptorBatchMethodsImpl interceptor_methods_;
};
@@ -190,7 +193,16 @@ bool ServerContext::CompletionOp::FinalizeResult(void** tag, bool* status) {
}
finalized_ = true;
- if (!*status) cancelled_ = 1;
+ // If for some reason the incoming status is false, mark that as a
+ // cancellation.
+ // TODO(vjpai): does this ever happen?
+ if (!*status) {
+ cancelled_ = 1;
+ }
+
+ if (cancelled_ && (reactor_ != nullptr)) {
+ reactor_->OnCancel();
+ }
/* Release the lock since we are going to be running through interceptors now
*/
lock.unlock();
@@ -251,21 +263,25 @@ void ServerContext::Clear() {
initial_metadata_.clear();
trailing_metadata_.clear();
client_metadata_.Reset();
- if (call_) {
- grpc_call_unref(call_);
- }
if (completion_op_) {
completion_op_->Unref();
+ completion_op_ = nullptr;
completion_tag_.Clear();
}
if (rpc_info_) {
rpc_info_->Unref();
+ rpc_info_ = nullptr;
+ }
+ if (call_) {
+ auto* call = call_;
+ call_ = nullptr;
+ grpc_call_unref(call);
}
- // Don't need to clear out call_, completion_op_, or rpc_info_ because this is
- // either called from destructor or just before Setup
}
-void ServerContext::BeginCompletionOp(internal::Call* call, bool callback) {
+void ServerContext::BeginCompletionOp(internal::Call* call,
+ std::function<void(bool)> callback,
+ internal::ServerReactor* reactor) {
GPR_ASSERT(!completion_op_);
if (rpc_info_) {
rpc_info_->Ref();
@@ -273,10 +289,11 @@ void ServerContext::BeginCompletionOp(internal::Call* call, bool callback) {
grpc_call_ref(call->call());
completion_op_ =
new (grpc_call_arena_alloc(call->call(), sizeof(CompletionOp)))
- CompletionOp(call);
- if (callback) {
- completion_tag_.Set(call->call(), nullptr, completion_op_);
+ CompletionOp(call, reactor);
+ if (callback != nullptr) {
+ completion_tag_.Set(call->call(), std::move(callback), completion_op_);
completion_op_->set_core_cq_tag(&completion_tag_);
+ completion_op_->set_tag(completion_op_);
} else if (has_notify_when_done_tag_) {
completion_op_->set_tag(async_notify_when_done_tag_);
}
diff --git a/src/csharp/Grpc.Core.Tests/AppDomainUnloadTest.cs b/src/csharp/Grpc.Core.Tests/AppDomainUnloadTest.cs
index 3a161763fd..b10d7a1045 100644
--- a/src/csharp/Grpc.Core.Tests/AppDomainUnloadTest.cs
+++ b/src/csharp/Grpc.Core.Tests/AppDomainUnloadTest.cs
@@ -25,7 +25,7 @@ namespace Grpc.Core.Tests
{
public class AppDomainUnloadTest
{
-#if NETCOREAPP1_0
+#if NETCOREAPP1_1 || NETCOREAPP2_1
[Test]
[Ignore("Not supported for CoreCLR")]
public void AppDomainUnloadHookCanCleanupAbandonedCall()
diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
index d58f046824..178931a3d7 100755
--- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
+++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
@@ -4,7 +4,7 @@
<Import Project="..\Grpc.Core\Common.csproj.include" />
<PropertyGroup>
- <TargetFrameworks>net45;netcoreapp1.0</TargetFrameworks>
+ <TargetFrameworks>net45;netcoreapp1.1</TargetFrameworks>
<AssemblyName>Grpc.Core.Tests</AssemblyName>
<OutputType>Exe</OutputType>
<PackageId>Grpc.Core.Tests</PackageId>
diff --git a/src/csharp/Grpc.Core.Tests/NUnitMain.cs b/src/csharp/Grpc.Core.Tests/NUnitMain.cs
index 49cb8cd3b9..221a6b823a 100644
--- a/src/csharp/Grpc.Core.Tests/NUnitMain.cs
+++ b/src/csharp/Grpc.Core.Tests/NUnitMain.cs
@@ -34,7 +34,7 @@ namespace Grpc.Core.Tests
{
// Make logger immune to NUnit capturing stdout and stderr to workaround https://github.com/nunit/nunit/issues/1406.
GrpcEnvironment.SetLogger(new ConsoleLogger());
-#if NETCOREAPP1_0
+#if NETCOREAPP1_1 || NETCOREAPP2_1
return new AutoRun(typeof(NUnitMain).GetTypeInfo().Assembly).Execute(args, new ExtendedTextWrapper(Console.Out), Console.In);
#else
return new AutoRun().Execute(args);
diff --git a/src/csharp/Grpc.Core.Tests/SanityTest.cs b/src/csharp/Grpc.Core.Tests/SanityTest.cs
index 0904453b6e..f785f70f4c 100644
--- a/src/csharp/Grpc.Core.Tests/SanityTest.cs
+++ b/src/csharp/Grpc.Core.Tests/SanityTest.cs
@@ -31,7 +31,7 @@ namespace Grpc.Core.Tests
public class SanityTest
{
// TODO: make sanity test work for CoreCLR as well
-#if !NETCOREAPP1_0
+#if !NETCOREAPP1_1 && !NETCOREAPP2_1
/// <summary>
/// Because we depend on a native library, sometimes when things go wrong, the
/// entire NUnit test process crashes. To be able to track down problems better,
@@ -44,7 +44,7 @@ namespace Grpc.Core.Tests
public void TestsJsonUpToDate()
{
var discoveredTests = DiscoverAllTestClasses();
- var testsFromFile
+ var testsFromFile
= JsonConvert.DeserializeObject<Dictionary<string, List<string>>>(ReadTestsJson());
Assert.AreEqual(discoveredTests, testsFromFile);
diff --git a/src/csharp/Grpc.Core/Internal/NativeExtension.cs b/src/csharp/Grpc.Core/Internal/NativeExtension.cs
index f526b913af..5177b69fd9 100644
--- a/src/csharp/Grpc.Core/Internal/NativeExtension.cs
+++ b/src/csharp/Grpc.Core/Internal/NativeExtension.cs
@@ -83,13 +83,13 @@ namespace Grpc.Core.Internal
// See https://github.com/grpc/grpc/pull/7303 for one option.
var assemblyDirectory = Path.GetDirectoryName(GetAssemblyPath());
- // With old-style VS projects, the native libraries get copied using a .targets rule to the build output folder
+ // With "classic" VS projects, the native libraries get copied using a .targets rule to the build output folder
// alongside the compiled assembly.
// With dotnet cli projects targeting net45 framework, the native libraries (just the required ones)
// are similarly copied to the built output folder, through the magic of Microsoft.NETCore.Platforms.
var classicPath = Path.Combine(assemblyDirectory, GetNativeLibraryFilename());
- // With dotnet cli project targeting netcoreapp1.0, projects will use Grpc.Core assembly directly in the location where it got restored
+ // With dotnet cli project targeting netcoreappX.Y, projects will use Grpc.Core assembly directly in the location where it got restored
// by nuget. We locate the native libraries based on known structure of Grpc.Core nuget package.
// When "dotnet publish" is used, the runtimes directory is copied next to the published assemblies.
string runtimesDirectory = string.Format("runtimes/{0}/native", GetPlatformString());
diff --git a/src/csharp/Grpc.Examples.MathClient/Grpc.Examples.MathClient.csproj b/src/csharp/Grpc.Examples.MathClient/Grpc.Examples.MathClient.csproj
index db4e3ef4e3..1afcd9fba0 100755
--- a/src/csharp/Grpc.Examples.MathClient/Grpc.Examples.MathClient.csproj
+++ b/src/csharp/Grpc.Examples.MathClient/Grpc.Examples.MathClient.csproj
@@ -4,7 +4,7 @@
<Import Project="..\Grpc.Core\Common.csproj.include" />
<PropertyGroup>
- <TargetFrameworks>net45;netcoreapp1.0</TargetFrameworks>
+ <TargetFrameworks>net45;netcoreapp1.1</TargetFrameworks>
<AssemblyName>Grpc.Examples.MathClient</AssemblyName>
<OutputType>Exe</OutputType>
<PackageId>Grpc.Examples.MathClient</PackageId>
diff --git a/src/csharp/Grpc.Examples.MathServer/Grpc.Examples.MathServer.csproj b/src/csharp/Grpc.Examples.MathServer/Grpc.Examples.MathServer.csproj
index b12b418d01..75ef6d1008 100755
--- a/src/csharp/Grpc.Examples.MathServer/Grpc.Examples.MathServer.csproj
+++ b/src/csharp/Grpc.Examples.MathServer/Grpc.Examples.MathServer.csproj
@@ -4,7 +4,7 @@
<Import Project="..\Grpc.Core\Common.csproj.include" />
<PropertyGroup>
- <TargetFrameworks>net45;netcoreapp1.0</TargetFrameworks>
+ <TargetFrameworks>net45;netcoreapp1.1</TargetFrameworks>
<AssemblyName>Grpc.Examples.MathServer</AssemblyName>
<OutputType>Exe</OutputType>
<PackageId>Grpc.Examples.MathServer</PackageId>
diff --git a/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj b/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj
index 7493eb8051..93d112a0c5 100755
--- a/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj
+++ b/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj
@@ -4,7 +4,7 @@
<Import Project="..\Grpc.Core\Common.csproj.include" />
<PropertyGroup>
- <TargetFrameworks>net45;netcoreapp1.0</TargetFrameworks>
+ <TargetFrameworks>net45;netcoreapp1.1</TargetFrameworks>
<AssemblyName>Grpc.Examples.Tests</AssemblyName>
<OutputType>Exe</OutputType>
<PackageId>Grpc.Examples.Tests</PackageId>
diff --git a/src/csharp/Grpc.Examples.Tests/NUnitMain.cs b/src/csharp/Grpc.Examples.Tests/NUnitMain.cs
index bcb8b46b64..0b9a5258ef 100644
--- a/src/csharp/Grpc.Examples.Tests/NUnitMain.cs
+++ b/src/csharp/Grpc.Examples.Tests/NUnitMain.cs
@@ -34,7 +34,7 @@ namespace Grpc.Examples.Tests
{
// Make logger immune to NUnit capturing stdout and stderr to workaround https://github.com/nunit/nunit/issues/1406.
GrpcEnvironment.SetLogger(new ConsoleLogger());
-#if NETCOREAPP1_0
+#if NETCOREAPP1_1 || NETCOREAPP2_1
return new AutoRun(typeof(NUnitMain).GetTypeInfo().Assembly).Execute(args, new ExtendedTextWrapper(Console.Out), Console.In);
#else
return new AutoRun().Execute(args);
diff --git a/src/csharp/Grpc.Examples/Grpc.Examples.csproj b/src/csharp/Grpc.Examples/Grpc.Examples.csproj
index baa3b4ce6c..9ce2b59d03 100755
--- a/src/csharp/Grpc.Examples/Grpc.Examples.csproj
+++ b/src/csharp/Grpc.Examples/Grpc.Examples.csproj
@@ -4,7 +4,7 @@
<Import Project="..\Grpc.Core\Common.csproj.include" />
<PropertyGroup>
- <TargetFrameworks>net45;netcoreapp1.0</TargetFrameworks>
+ <TargetFrameworks>net45;netcoreapp1.1</TargetFrameworks>
<AssemblyName>Grpc.Examples</AssemblyName>
<PackageId>Grpc.Examples</PackageId>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
diff --git a/src/csharp/Grpc.HealthCheck.Tests/Grpc.HealthCheck.Tests.csproj b/src/csharp/Grpc.HealthCheck.Tests/Grpc.HealthCheck.Tests.csproj
index 616e56df10..2a037a72e5 100755
--- a/src/csharp/Grpc.HealthCheck.Tests/Grpc.HealthCheck.Tests.csproj
+++ b/src/csharp/Grpc.HealthCheck.Tests/Grpc.HealthCheck.Tests.csproj
@@ -4,7 +4,7 @@
<Import Project="..\Grpc.Core\Common.csproj.include" />
<PropertyGroup>
- <TargetFrameworks>net45;netcoreapp1.0</TargetFrameworks>
+ <TargetFrameworks>net45;netcoreapp1.1</TargetFrameworks>
<AssemblyName>Grpc.HealthCheck.Tests</AssemblyName>
<OutputType>Exe</OutputType>
<PackageId>Grpc.HealthCheck.Tests</PackageId>
diff --git a/src/csharp/Grpc.HealthCheck.Tests/NUnitMain.cs b/src/csharp/Grpc.HealthCheck.Tests/NUnitMain.cs
index 365551e895..f5091863b8 100644
--- a/src/csharp/Grpc.HealthCheck.Tests/NUnitMain.cs
+++ b/src/csharp/Grpc.HealthCheck.Tests/NUnitMain.cs
@@ -34,7 +34,7 @@ namespace Grpc.HealthCheck.Tests
{
// Make logger immune to NUnit capturing stdout and stderr to workaround https://github.com/nunit/nunit/issues/1406.
GrpcEnvironment.SetLogger(new ConsoleLogger());
-#if NETCOREAPP1_0
+#if NETCOREAPP1_1 || NETCOREAPP2_1
return new AutoRun(typeof(NUnitMain).GetTypeInfo().Assembly).Execute(args, new ExtendedTextWrapper(Console.Out), Console.In);
#else
return new AutoRun().Execute(args);
diff --git a/src/csharp/Grpc.IntegrationTesting.Client/Grpc.IntegrationTesting.Client.csproj b/src/csharp/Grpc.IntegrationTesting.Client/Grpc.IntegrationTesting.Client.csproj
index 35713156ea..1cd4b83e1e 100755
--- a/src/csharp/Grpc.IntegrationTesting.Client/Grpc.IntegrationTesting.Client.csproj
+++ b/src/csharp/Grpc.IntegrationTesting.Client/Grpc.IntegrationTesting.Client.csproj
@@ -4,7 +4,7 @@
<Import Project="..\Grpc.Core\Common.csproj.include" />
<PropertyGroup>
- <TargetFrameworks>net45;netcoreapp1.0</TargetFrameworks>
+ <TargetFrameworks>net45;netcoreapp1.1</TargetFrameworks>
<AssemblyName>Grpc.IntegrationTesting.Client</AssemblyName>
<OutputType>Exe</OutputType>
<PackageId>Grpc.IntegrationTesting.Client</PackageId>
diff --git a/src/csharp/Grpc.IntegrationTesting.QpsWorker/Grpc.IntegrationTesting.QpsWorker.csproj b/src/csharp/Grpc.IntegrationTesting.QpsWorker/Grpc.IntegrationTesting.QpsWorker.csproj
index 3ecefe3bc4..2890a7df58 100755
--- a/src/csharp/Grpc.IntegrationTesting.QpsWorker/Grpc.IntegrationTesting.QpsWorker.csproj
+++ b/src/csharp/Grpc.IntegrationTesting.QpsWorker/Grpc.IntegrationTesting.QpsWorker.csproj
@@ -4,7 +4,7 @@
<Import Project="..\Grpc.Core\Common.csproj.include" />
<PropertyGroup>
- <TargetFrameworks>net45;netcoreapp1.0</TargetFrameworks>
+ <TargetFrameworks>net45;netcoreapp1.1</TargetFrameworks>
<AssemblyName>Grpc.IntegrationTesting.QpsWorker</AssemblyName>
<OutputType>Exe</OutputType>
<PackageId>Grpc.IntegrationTesting.QpsWorker</PackageId>
diff --git a/src/csharp/Grpc.IntegrationTesting.Server/Grpc.IntegrationTesting.Server.csproj b/src/csharp/Grpc.IntegrationTesting.Server/Grpc.IntegrationTesting.Server.csproj
index 1092b2c21e..ee718958bc 100755
--- a/src/csharp/Grpc.IntegrationTesting.Server/Grpc.IntegrationTesting.Server.csproj
+++ b/src/csharp/Grpc.IntegrationTesting.Server/Grpc.IntegrationTesting.Server.csproj
@@ -4,7 +4,7 @@
<Import Project="..\Grpc.Core\Common.csproj.include" />
<PropertyGroup>
- <TargetFrameworks>net45;netcoreapp1.0</TargetFrameworks>
+ <TargetFrameworks>net45;netcoreapp1.1</TargetFrameworks>
<AssemblyName>Grpc.IntegrationTesting.Server</AssemblyName>
<OutputType>Exe</OutputType>
<PackageId>Grpc.IntegrationTesting.Server</PackageId>
@@ -19,7 +19,7 @@
<Reference Include="System" />
<Reference Include="Microsoft.CSharp" />
</ItemGroup>
-
+
<ItemGroup>
<Compile Include="..\Grpc.Core\Version.cs" />
</ItemGroup>
diff --git a/src/csharp/Grpc.IntegrationTesting.StressClient/Grpc.IntegrationTesting.StressClient.csproj b/src/csharp/Grpc.IntegrationTesting.StressClient/Grpc.IntegrationTesting.StressClient.csproj
index 22272547f6..99926497e4 100755
--- a/src/csharp/Grpc.IntegrationTesting.StressClient/Grpc.IntegrationTesting.StressClient.csproj
+++ b/src/csharp/Grpc.IntegrationTesting.StressClient/Grpc.IntegrationTesting.StressClient.csproj
@@ -4,7 +4,7 @@
<Import Project="..\Grpc.Core\Common.csproj.include" />
<PropertyGroup>
- <TargetFrameworks>net45;netcoreapp1.0</TargetFrameworks>
+ <TargetFrameworks>net45;netcoreapp1.1</TargetFrameworks>
<AssemblyName>Grpc.IntegrationTesting.StressClient</AssemblyName>
<OutputType>Exe</OutputType>
<PackageId>Grpc.IntegrationTesting.StressClient</PackageId>
diff --git a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
index 8daf3fa98b..c342f8a107 100755
--- a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
+++ b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj
@@ -4,7 +4,7 @@
<Import Project="..\Grpc.Core\Common.csproj.include" />
<PropertyGroup>
- <TargetFrameworks>net45;netcoreapp1.0</TargetFrameworks>
+ <TargetFrameworks>net45;netcoreapp1.1</TargetFrameworks>
<AssemblyName>Grpc.IntegrationTesting</AssemblyName>
<OutputType>Exe</OutputType>
<PackageId>Grpc.IntegrationTesting</PackageId>
diff --git a/src/csharp/Grpc.IntegrationTesting/NUnitMain.cs b/src/csharp/Grpc.IntegrationTesting/NUnitMain.cs
index 9d24762e0a..6265953622 100644
--- a/src/csharp/Grpc.IntegrationTesting/NUnitMain.cs
+++ b/src/csharp/Grpc.IntegrationTesting/NUnitMain.cs
@@ -34,7 +34,7 @@ namespace Grpc.IntegrationTesting
{
// Make logger immune to NUnit capturing stdout and stderr to workaround https://github.com/nunit/nunit/issues/1406.
GrpcEnvironment.SetLogger(new ConsoleLogger());
-#if NETCOREAPP1_0
+#if NETCOREAPP1_1 || NETCOREAPP2_1
return new AutoRun(typeof(NUnitMain).GetTypeInfo().Assembly).Execute(args, new ExtendedTextWrapper(Console.Out), Console.In);
#else
return new AutoRun().Execute(args);
diff --git a/src/csharp/Grpc.Microbenchmarks/Grpc.Microbenchmarks.csproj b/src/csharp/Grpc.Microbenchmarks/Grpc.Microbenchmarks.csproj
index d39d46cf1b..5b1656080a 100644
--- a/src/csharp/Grpc.Microbenchmarks/Grpc.Microbenchmarks.csproj
+++ b/src/csharp/Grpc.Microbenchmarks/Grpc.Microbenchmarks.csproj
@@ -4,7 +4,7 @@
<Import Project="..\Grpc.Core\Common.csproj.include" />
<PropertyGroup>
- <TargetFrameworks>net45;netcoreapp1.0</TargetFrameworks>
+ <TargetFrameworks>net45;netcoreapp1.1</TargetFrameworks>
<AssemblyName>Grpc.Microbenchmarks</AssemblyName>
<OutputType>Exe</OutputType>
<PackageId>Grpc.Microbenchmarks</PackageId>
diff --git a/src/csharp/Grpc.Reflection.Tests/Grpc.Reflection.Tests.csproj b/src/csharp/Grpc.Reflection.Tests/Grpc.Reflection.Tests.csproj
index 0c12f38f25..8b586c6ecb 100755
--- a/src/csharp/Grpc.Reflection.Tests/Grpc.Reflection.Tests.csproj
+++ b/src/csharp/Grpc.Reflection.Tests/Grpc.Reflection.Tests.csproj
@@ -4,7 +4,7 @@
<Import Project="..\Grpc.Core\Common.csproj.include" />
<PropertyGroup>
- <TargetFrameworks>net45;netcoreapp1.0</TargetFrameworks>
+ <TargetFrameworks>net45;netcoreapp1.1</TargetFrameworks>
<AssemblyName>Grpc.Reflection.Tests</AssemblyName>
<OutputType>Exe</OutputType>
<PackageId>Grpc.Reflection.Tests</PackageId>
diff --git a/src/csharp/Grpc.Reflection.Tests/NUnitMain.cs b/src/csharp/Grpc.Reflection.Tests/NUnitMain.cs
index 49ed1cc8d4..1be7f6542b 100644
--- a/src/csharp/Grpc.Reflection.Tests/NUnitMain.cs
+++ b/src/csharp/Grpc.Reflection.Tests/NUnitMain.cs
@@ -34,7 +34,7 @@ namespace Grpc.Reflection.Tests
{
// Make logger immune to NUnit capturing stdout and stderr to workaround https://github.com/nunit/nunit/issues/1406.
GrpcEnvironment.SetLogger(new ConsoleLogger());
-#if NETCOREAPP1_0
+#if NETCOREAPP1_1 || NETCOREAPP2_1
return new AutoRun(typeof(NUnitMain).GetTypeInfo().Assembly).Execute(args, new ExtendedTextWrapper(Console.Out), Console.In);
#else
return new AutoRun().Execute(args);
diff --git a/src/csharp/Grpc.Tools.Tests/Grpc.Tools.Tests.csproj b/src/csharp/Grpc.Tools.Tests/Grpc.Tools.Tests.csproj
index a2d4874eec..cfb40f44ae 100644
--- a/src/csharp/Grpc.Tools.Tests/Grpc.Tools.Tests.csproj
+++ b/src/csharp/Grpc.Tools.Tests/Grpc.Tools.Tests.csproj
@@ -3,7 +3,7 @@
<Import Project="..\Grpc.Core\Version.csproj.include" />
<PropertyGroup>
- <TargetFrameworks>net45;netcoreapp1.0</TargetFrameworks>
+ <TargetFrameworks>net45;netcoreapp1.1</TargetFrameworks>
<OutputType>Exe</OutputType>
</PropertyGroup>
diff --git a/src/csharp/Grpc.Tools.Tests/NUnitMain.cs b/src/csharp/Grpc.Tools.Tests/NUnitMain.cs
index 418c33820e..13ba8b51d0 100644
--- a/src/csharp/Grpc.Tools.Tests/NUnitMain.cs
+++ b/src/csharp/Grpc.Tools.Tests/NUnitMain.cs
@@ -24,7 +24,7 @@ namespace Grpc.Tools.Tests
static class NUnitMain
{
public static int Main(string[] args) =>
-#if NETCOREAPP1_0 || NETCOREAPP1_1
+#if NETCOREAPP1_1 || NETCOREAPP2_1
new AutoRun(typeof(NUnitMain).GetTypeInfo().Assembly).Execute(args);
#else
new AutoRun().Execute(args);
diff --git a/src/php/ext/grpc/php_grpc.c b/src/php/ext/grpc/php_grpc.c
index f7d5a85876..111c6f4867 100644
--- a/src/php/ext/grpc/php_grpc.c
+++ b/src/php/ext/grpc/php_grpc.c
@@ -170,7 +170,9 @@ void prefork() {
acquire_persistent_locks();
}
-void postfork_child(TSRMLS_D) {
+void postfork_child() {
+ TSRMLS_FETCH();
+
// loop through persistant list and destroy all underlying grpc_channel objs
destroy_grpc_channels();
diff --git a/src/proto/grpc/channelz/BUILD b/src/proto/grpc/channelz/BUILD
index bdb03d5e2d..b6b485e3e8 100644
--- a/src/proto/grpc/channelz/BUILD
+++ b/src/proto/grpc/channelz/BUILD
@@ -24,3 +24,10 @@ grpc_proto_library(
has_services = True,
well_known_protos = True,
)
+
+filegroup(
+ name = "channelz_proto_file",
+ srcs = [
+ "channelz.proto",
+ ],
+)
diff --git a/src/proto/grpc/health/v1/BUILD b/src/proto/grpc/health/v1/BUILD
index 97642985c9..38a7d992e0 100644
--- a/src/proto/grpc/health/v1/BUILD
+++ b/src/proto/grpc/health/v1/BUILD
@@ -29,4 +29,3 @@ filegroup(
"health.proto",
],
)
-
diff --git a/src/proto/grpc/testing/compiler_test.proto b/src/proto/grpc/testing/compiler_test.proto
index db5ca48331..9fa5590a59 100644
--- a/src/proto/grpc/testing/compiler_test.proto
+++ b/src/proto/grpc/testing/compiler_test.proto
@@ -20,6 +20,9 @@
syntax = "proto3";
// Ignored detached comment
+// The comments in this file are not meant for readability
+// but rather to test to make sure that the code generator
+// properly preserves comments on files, services, and RPCs
// Ignored package leading comment
package grpc.testing;
diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py
index 88c5b6d5be..6022fc3ef2 100644
--- a/src/python/grpcio/grpc/__init__.py
+++ b/src/python/grpcio/grpc/__init__.py
@@ -1723,7 +1723,7 @@ def server(thread_pool,
handlers. The interceptors are given control in the order they are
specified. This is an EXPERIMENTAL API.
options: An optional list of key-value pairs (channel args in gRPC runtime)
- to configure the channel.
+ to configure the channel.
maximum_concurrent_rpcs: The maximum number of concurrent RPCs this server
will service before returning RESOURCE_EXHAUSTED status, or None to
indicate no limit.
diff --git a/src/python/grpcio/grpc/_cython/BUILD.bazel b/src/python/grpcio/grpc/_cython/BUILD.bazel
index cfd3a51d9b..e318298d0a 100644
--- a/src/python/grpcio/grpc/_cython/BUILD.bazel
+++ b/src/python/grpcio/grpc/_cython/BUILD.bazel
@@ -12,6 +12,7 @@ pyx_library(
"_cygrpc/grpc_string.pyx.pxi",
"_cygrpc/arguments.pyx.pxi",
"_cygrpc/call.pyx.pxi",
+ "_cygrpc/channelz.pyx.pxi",
"_cygrpc/channel.pyx.pxi",
"_cygrpc/credentials.pyx.pxi",
"_cygrpc/completion_queue.pyx.pxi",
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/channelz.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/channelz.pyx.pxi
new file mode 100644
index 0000000000..113f7976dd
--- /dev/null
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/channelz.pyx.pxi
@@ -0,0 +1,69 @@
+# Copyright 2018 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+def channelz_get_top_channels(start_channel_id):
+ cdef char *c_returned_str = grpc_channelz_get_top_channels(
+ start_channel_id,
+ )
+ if c_returned_str == NULL:
+ raise ValueError('Failed to get top channels, please ensure your' \
+ ' start_channel_id==%s is valid' % start_channel_id)
+ return c_returned_str
+
+def channelz_get_servers(start_server_id):
+ cdef char *c_returned_str = grpc_channelz_get_servers(start_server_id)
+ if c_returned_str == NULL:
+ raise ValueError('Failed to get servers, please ensure your' \
+ ' start_server_id==%s is valid' % start_server_id)
+ return c_returned_str
+
+def channelz_get_server(server_id):
+ cdef char *c_returned_str = grpc_channelz_get_server(server_id)
+ if c_returned_str == NULL:
+ raise ValueError('Failed to get the server, please ensure your' \
+ ' server_id==%s is valid' % server_id)
+ return c_returned_str
+
+def channelz_get_server_sockets(server_id, start_socket_id):
+ cdef char *c_returned_str = grpc_channelz_get_server_sockets(
+ server_id,
+ start_socket_id,
+ )
+ if c_returned_str == NULL:
+ raise ValueError('Failed to get server sockets, please ensure your' \
+ ' server_id==%s and start_socket_id==%s is valid' %
+ (server_id, start_socket_id))
+ return c_returned_str
+
+def channelz_get_channel(channel_id):
+ cdef char *c_returned_str = grpc_channelz_get_channel(channel_id)
+ if c_returned_str == NULL:
+ raise ValueError('Failed to get the channel, please ensure your' \
+ ' channel_id==%s is valid' % (channel_id))
+ return c_returned_str
+
+def channelz_get_subchannel(subchannel_id):
+ cdef char *c_returned_str = grpc_channelz_get_subchannel(subchannel_id)
+ if c_returned_str == NULL:
+ raise ValueError('Failed to get the subchannel, please ensure your' \
+ ' subchannel_id==%s is valid' % (subchannel_id))
+ return c_returned_str
+
+def channelz_get_socket(socket_id):
+ cdef char *c_returned_str = grpc_channelz_get_socket(socket_id)
+ if c_returned_str == NULL:
+ raise ValueError('Failed to get the socket, please ensure your' \
+ ' socket_id==%s is valid' % (socket_id))
+ return c_returned_str
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
index 5aaf31e36c..5bbc10af25 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
@@ -13,6 +13,7 @@
# limitations under the License.
cimport libc.time
+from libc.stdint cimport intptr_t
# Typedef types with approximately the same semantics to provide their names to
@@ -384,6 +385,15 @@ cdef extern from "grpc/grpc.h":
void grpc_server_cancel_all_calls(grpc_server *server) nogil
void grpc_server_destroy(grpc_server *server) nogil
+ char* grpc_channelz_get_top_channels(intptr_t start_channel_id)
+ char* grpc_channelz_get_servers(intptr_t start_server_id)
+ char* grpc_channelz_get_server(intptr_t server_id)
+ char* grpc_channelz_get_server_sockets(intptr_t server_id,
+ intptr_t start_socket_id)
+ char* grpc_channelz_get_channel(intptr_t channel_id)
+ char* grpc_channelz_get_subchannel(intptr_t subchannel_id)
+ char* grpc_channelz_get_socket(intptr_t socket_id)
+
cdef extern from "grpc/grpc_security.h":
diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx
index ae5c07bfc8..9ab919375c 100644
--- a/src/python/grpcio/grpc/_cython/cygrpc.pyx
+++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx
@@ -35,6 +35,7 @@ include "_cygrpc/server.pyx.pxi"
include "_cygrpc/tag.pyx.pxi"
include "_cygrpc/time.pyx.pxi"
include "_cygrpc/_hooks.pyx.pxi"
+include "_cygrpc/channelz.pyx.pxi"
include "_cygrpc/grpc_gevent.pyx.pxi"
diff --git a/src/python/grpcio_channelz/.gitignore b/src/python/grpcio_channelz/.gitignore
new file mode 100644
index 0000000000..0c5da6b5af
--- /dev/null
+++ b/src/python/grpcio_channelz/.gitignore
@@ -0,0 +1,6 @@
+*.proto
+*_pb2.py
+*_pb2_grpc.py
+build/
+grpcio_channelz.egg-info/
+dist/
diff --git a/src/python/grpcio_channelz/MANIFEST.in b/src/python/grpcio_channelz/MANIFEST.in
new file mode 100644
index 0000000000..5597f375ba
--- /dev/null
+++ b/src/python/grpcio_channelz/MANIFEST.in
@@ -0,0 +1,3 @@
+include grpc_version.py
+recursive-include grpc_channelz *.py
+global-exclude *.pyc
diff --git a/src/python/grpcio_channelz/README.rst b/src/python/grpcio_channelz/README.rst
new file mode 100644
index 0000000000..efeaa56064
--- /dev/null
+++ b/src/python/grpcio_channelz/README.rst
@@ -0,0 +1,9 @@
+gRPC Python Channelz package
+==============================
+
+Channelz is a live debug tool in gRPC Python.
+
+Dependencies
+------------
+
+Depends on the `grpcio` package, available from PyPI via `pip install grpcio`.
diff --git a/src/python/grpcio_channelz/channelz_commands.py b/src/python/grpcio_channelz/channelz_commands.py
new file mode 100644
index 0000000000..e9ad355034
--- /dev/null
+++ b/src/python/grpcio_channelz/channelz_commands.py
@@ -0,0 +1,63 @@
+# Copyright 2018 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Provides distutils command classes for the GRPC Python setup process."""
+
+import os
+import shutil
+
+import setuptools
+
+ROOT_DIR = os.path.abspath(os.path.dirname(os.path.abspath(__file__)))
+CHANNELZ_PROTO = os.path.join(ROOT_DIR,
+ '../../proto/grpc/channelz/channelz.proto')
+
+
+class CopyProtoModules(setuptools.Command):
+ """Command to copy proto modules from grpc/src/proto."""
+
+ description = ''
+ user_options = []
+
+ def initialize_options(self):
+ pass
+
+ def finalize_options(self):
+ pass
+
+ def run(self):
+ if os.path.isfile(CHANNELZ_PROTO):
+ shutil.copyfile(CHANNELZ_PROTO,
+ os.path.join(ROOT_DIR,
+ 'grpc_channelz/v1/channelz.proto'))
+
+
+class BuildPackageProtos(setuptools.Command):
+ """Command to generate project *_pb2.py modules from proto files."""
+
+ description = 'build grpc protobuf modules'
+ user_options = []
+
+ def initialize_options(self):
+ pass
+
+ def finalize_options(self):
+ pass
+
+ def run(self):
+ # due to limitations of the proto generator, we require that only *one*
+ # directory is provided as an 'include' directory. We assume it's the '' key
+ # to `self.distribution.package_dir` (and get a key error if it's not
+ # there).
+ from grpc_tools import command
+ command.build_package_protos(self.distribution.package_dir[''])
diff --git a/src/python/grpcio_channelz/grpc_channelz/__init__.py b/src/python/grpcio_channelz/grpc_channelz/__init__.py
new file mode 100644
index 0000000000..38fdfc9c5c
--- /dev/null
+++ b/src/python/grpcio_channelz/grpc_channelz/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2018 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/src/python/grpcio_channelz/grpc_channelz/v1/BUILD.bazel b/src/python/grpcio_channelz/grpc_channelz/v1/BUILD.bazel
new file mode 100644
index 0000000000..aae8cedb76
--- /dev/null
+++ b/src/python/grpcio_channelz/grpc_channelz/v1/BUILD.bazel
@@ -0,0 +1,38 @@
+load("@grpc_python_dependencies//:requirements.bzl", "requirement")
+load("@org_pubref_rules_protobuf//python:rules.bzl", "py_proto_library")
+
+package(default_visibility = ["//visibility:public"])
+
+genrule(
+ name = "mv_channelz_proto",
+ srcs = [
+ "//src/proto/grpc/channelz:channelz_proto_file",
+ ],
+ outs = ["channelz.proto",],
+ cmd = "cp $< $@",
+)
+
+py_proto_library(
+ name = "py_channelz_proto",
+ protos = ["mv_channelz_proto",],
+ imports = [
+ "external/com_google_protobuf/src/",
+ ],
+ inputs = [
+ "@com_google_protobuf//:well_known_protos",
+ ],
+ with_grpc = True,
+ deps = [
+ requirement('protobuf'),
+ ],
+)
+
+py_library(
+ name = "grpc_channelz",
+ srcs = ["channelz.py",],
+ deps = [
+ ":py_channelz_proto",
+ "//src/python/grpcio/grpc:grpcio",
+ ],
+ imports=["../../",],
+)
diff --git a/src/python/grpcio_channelz/grpc_channelz/v1/__init__.py b/src/python/grpcio_channelz/grpc_channelz/v1/__init__.py
new file mode 100644
index 0000000000..38fdfc9c5c
--- /dev/null
+++ b/src/python/grpcio_channelz/grpc_channelz/v1/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2018 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/src/python/grpcio_channelz/grpc_channelz/v1/channelz.py b/src/python/grpcio_channelz/grpc_channelz/v1/channelz.py
new file mode 100644
index 0000000000..573b9d0d5a
--- /dev/null
+++ b/src/python/grpcio_channelz/grpc_channelz/v1/channelz.py
@@ -0,0 +1,141 @@
+# Copyright 2018 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Channelz debug service implementation in gRPC Python."""
+
+import grpc
+from grpc._cython import cygrpc
+
+import grpc_channelz.v1.channelz_pb2 as _channelz_pb2
+import grpc_channelz.v1.channelz_pb2_grpc as _channelz_pb2_grpc
+
+from google.protobuf import json_format
+
+
+class ChannelzServicer(_channelz_pb2_grpc.ChannelzServicer):
+ """Servicer handling RPCs for service statuses."""
+
+ @staticmethod
+ def GetTopChannels(request, context):
+ try:
+ return json_format.Parse(
+ cygrpc.channelz_get_top_channels(request.start_channel_id),
+ _channelz_pb2.GetTopChannelsResponse(),
+ )
+ except (ValueError, json_format.ParseError) as e:
+ context.set_code(grpc.StatusCode.INTERNAL)
+ context.set_details(str(e))
+
+ @staticmethod
+ def GetServers(request, context):
+ try:
+ return json_format.Parse(
+ cygrpc.channelz_get_servers(request.start_server_id),
+ _channelz_pb2.GetServersResponse(),
+ )
+ except (ValueError, json_format.ParseError) as e:
+ context.set_code(grpc.StatusCode.INTERNAL)
+ context.set_details(str(e))
+
+ @staticmethod
+ def GetServer(request, context):
+ try:
+ return json_format.Parse(
+ cygrpc.channelz_get_server(request.server_id),
+ _channelz_pb2.GetServerResponse(),
+ )
+ except ValueError as e:
+ context.set_code(grpc.StatusCode.NOT_FOUND)
+ context.set_details(str(e))
+ except json_format.ParseError as e:
+ context.set_code(grpc.StatusCode.INTERNAL)
+ context.set_details(str(e))
+
+ @staticmethod
+ def GetServerSockets(request, context):
+ try:
+ return json_format.Parse(
+ cygrpc.channelz_get_server_sockets(request.server_id,
+ request.start_socket_id),
+ _channelz_pb2.GetServerSocketsResponse(),
+ )
+ except ValueError as e:
+ context.set_code(grpc.StatusCode.NOT_FOUND)
+ context.set_details(str(e))
+ except json_format.ParseError as e:
+ context.set_code(grpc.StatusCode.INTERNAL)
+ context.set_details(str(e))
+
+ @staticmethod
+ def GetChannel(request, context):
+ try:
+ return json_format.Parse(
+ cygrpc.channelz_get_channel(request.channel_id),
+ _channelz_pb2.GetChannelResponse(),
+ )
+ except ValueError as e:
+ context.set_code(grpc.StatusCode.NOT_FOUND)
+ context.set_details(str(e))
+ except json_format.ParseError as e:
+ context.set_code(grpc.StatusCode.INTERNAL)
+ context.set_details(str(e))
+
+ @staticmethod
+ def GetSubchannel(request, context):
+ try:
+ return json_format.Parse(
+ cygrpc.channelz_get_subchannel(request.subchannel_id),
+ _channelz_pb2.GetSubchannelResponse(),
+ )
+ except ValueError as e:
+ context.set_code(grpc.StatusCode.NOT_FOUND)
+ context.set_details(str(e))
+ except json_format.ParseError as e:
+ context.set_code(grpc.StatusCode.INTERNAL)
+ context.set_details(str(e))
+
+ @staticmethod
+ def GetSocket(request, context):
+ try:
+ return json_format.Parse(
+ cygrpc.channelz_get_socket(request.socket_id),
+ _channelz_pb2.GetSocketResponse(),
+ )
+ except ValueError as e:
+ context.set_code(grpc.StatusCode.NOT_FOUND)
+ context.set_details(str(e))
+ except json_format.ParseError as e:
+ context.set_code(grpc.StatusCode.INTERNAL)
+ context.set_details(str(e))
+
+
+def add_channelz_servicer(server):
+ """Add Channelz servicer to a server. Channelz servicer is in charge of
+ pulling information from C-Core for entire process. It will allow the
+ server to response to Channelz queries.
+
+ The Channelz statistic is enabled by default inside C-Core. Whether the
+ statistic is enabled or not is isolated from adding Channelz servicer.
+ That means you can query Channelz info with a Channelz-disabled channel,
+ and you can add Channelz servicer to a Channelz-disabled server.
+
+ The Channelz statistic can be enabled or disabled by channel option
+ 'grpc.enable_channelz'. Set to 1 to enable, set to 0 to disable.
+
+ This is an EXPERIMENTAL API.
+
+ Args:
+ server: grpc.Server to which Channelz service will be added.
+ """
+ _channelz_pb2_grpc.add_ChannelzServicer_to_server(ChannelzServicer(),
+ server)
diff --git a/src/python/grpcio_channelz/grpc_version.py b/src/python/grpcio_channelz/grpc_version.py
new file mode 100644
index 0000000000..16356ea402
--- /dev/null
+++ b/src/python/grpcio_channelz/grpc_version.py
@@ -0,0 +1,17 @@
+# Copyright 2018 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_channelz/grpc_version.py.template`!!!
+
+VERSION = '1.18.0.dev0'
diff --git a/src/python/grpcio_channelz/setup.py b/src/python/grpcio_channelz/setup.py
new file mode 100644
index 0000000000..a495052376
--- /dev/null
+++ b/src/python/grpcio_channelz/setup.py
@@ -0,0 +1,96 @@
+# Copyright 2018 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Setup module for the GRPC Python package's Channelz."""
+
+import os
+import sys
+
+import setuptools
+
+# Ensure we're in the proper directory whether or not we're being used by pip.
+os.chdir(os.path.dirname(os.path.abspath(__file__)))
+
+# Break import-style to ensure we can actually find our local modules.
+import grpc_version
+
+
+class _NoOpCommand(setuptools.Command):
+ """No-op command."""
+
+ description = ''
+ user_options = []
+
+ def initialize_options(self):
+ pass
+
+ def finalize_options(self):
+ pass
+
+ def run(self):
+ pass
+
+
+CLASSIFIERS = [
+ 'Development Status :: 5 - Production/Stable',
+ 'Programming Language :: Python',
+ 'Programming Language :: Python :: 2',
+ 'Programming Language :: Python :: 2.7',
+ 'Programming Language :: Python :: 3',
+ 'Programming Language :: Python :: 3.4',
+ 'Programming Language :: Python :: 3.5',
+ 'Programming Language :: Python :: 3.6',
+ 'License :: OSI Approved :: Apache Software License',
+]
+
+PACKAGE_DIRECTORIES = {
+ '': '.',
+}
+
+INSTALL_REQUIRES = (
+ 'protobuf>=3.6.0',
+ 'grpcio>={version}'.format(version=grpc_version.VERSION),
+)
+
+try:
+ import channelz_commands as _channelz_commands
+ # we are in the build environment, otherwise the above import fails
+ SETUP_REQUIRES = (
+ 'grpcio-tools=={version}'.format(version=grpc_version.VERSION),)
+ COMMAND_CLASS = {
+ # Run preprocess from the repository *before* doing any packaging!
+ 'preprocess': _channelz_commands.CopyProtoModules,
+ 'build_package_protos': _channelz_commands.BuildPackageProtos,
+ }
+except ImportError:
+ SETUP_REQUIRES = ()
+ COMMAND_CLASS = {
+ # wire up commands to no-op not to break the external dependencies
+ 'preprocess': _NoOpCommand,
+ 'build_package_protos': _NoOpCommand,
+ }
+
+setuptools.setup(
+ name='grpcio-channelz',
+ version=grpc_version.VERSION,
+ license='Apache License 2.0',
+ description='Channel Level Live Debug Information Service for gRPC',
+ author='The gRPC Authors',
+ author_email='grpc-io@googlegroups.com',
+ classifiers=CLASSIFIERS,
+ url='https://grpc.io',
+ package_dir=PACKAGE_DIRECTORIES,
+ packages=setuptools.find_packages('.'),
+ install_requires=INSTALL_REQUIRES,
+ setup_requires=SETUP_REQUIRES,
+ cmdclass=COMMAND_CLASS)
diff --git a/src/python/grpcio_tests/commands.py b/src/python/grpcio_tests/commands.py
index d163f6fb68..65e9a99950 100644
--- a/src/python/grpcio_tests/commands.py
+++ b/src/python/grpcio_tests/commands.py
@@ -132,7 +132,12 @@ class TestGevent(setuptools.Command):
'unit.beta._beta_features_test',
# TODO(https://github.com/grpc/grpc/issues/15411) unpin gevent version
# This test will stuck while running higher version of gevent
- 'unit._auth_context_test.AuthContextTest.testSessionResumption')
+ 'unit._auth_context_test.AuthContextTest.testSessionResumption',
+ # TODO(https://github.com/grpc/grpc/issues/17330) enable these three tests
+ 'channelz._channelz_servicer_test.ChannelzServicerTest.test_many_subchannels',
+ 'channelz._channelz_servicer_test.ChannelzServicerTest.test_many_subchannels_and_sockets',
+ 'channelz._channelz_servicer_test.ChannelzServicerTest.test_streaming_rpc'
+ )
description = 'run tests with gevent. Assumes grpc/gevent are installed'
user_options = []
diff --git a/src/python/grpcio_tests/setup.py b/src/python/grpcio_tests/setup.py
index 61c98fa038..f56425ac6d 100644
--- a/src/python/grpcio_tests/setup.py
+++ b/src/python/grpcio_tests/setup.py
@@ -39,6 +39,7 @@ PACKAGE_DIRECTORIES = {
INSTALL_REQUIRES = (
'coverage>=4.0', 'enum34>=1.0.4',
'grpcio>={version}'.format(version=grpc_version.VERSION),
+ 'grpcio-channelz>={version}'.format(version=grpc_version.VERSION),
'grpcio-tools>={version}'.format(version=grpc_version.VERSION),
'grpcio-health-checking>={version}'.format(version=grpc_version.VERSION),
'oauth2client>=1.4.7', 'protobuf>=3.6.0', 'six>=1.10', 'google-auth>=1.0.0',
diff --git a/src/python/grpcio_tests/tests/channelz/BUILD.bazel b/src/python/grpcio_tests/tests/channelz/BUILD.bazel
new file mode 100644
index 0000000000..63513616e7
--- /dev/null
+++ b/src/python/grpcio_tests/tests/channelz/BUILD.bazel
@@ -0,0 +1,15 @@
+package(default_visibility = ["//visibility:public"])
+
+py_test(
+ name = "channelz_servicer_test",
+ srcs = ["_channelz_servicer_test.py"],
+ main = "_channelz_servicer_test.py",
+ size = "small",
+ deps = [
+ "//src/python/grpcio/grpc:grpcio",
+ "//src/python/grpcio_channelz/grpc_channelz/v1:grpc_channelz",
+ "//src/python/grpcio_tests/tests/unit:test_common",
+ "//src/python/grpcio_tests/tests/unit/framework/common:common",
+ ],
+ imports = ["../../",],
+)
diff --git a/src/python/grpcio_tests/tests/channelz/__init__.py b/src/python/grpcio_tests/tests/channelz/__init__.py
new file mode 100644
index 0000000000..38fdfc9c5c
--- /dev/null
+++ b/src/python/grpcio_tests/tests/channelz/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2018 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py b/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py
new file mode 100644
index 0000000000..84f8594689
--- /dev/null
+++ b/src/python/grpcio_tests/tests/channelz/_channelz_servicer_test.py
@@ -0,0 +1,494 @@
+# Copyright 2018 The gRPC Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Tests of grpc_channelz.v1.channelz."""
+
+import unittest
+
+from concurrent import futures
+
+import grpc
+from grpc_channelz.v1 import channelz
+from grpc_channelz.v1 import channelz_pb2
+from grpc_channelz.v1 import channelz_pb2_grpc
+
+from tests.unit import test_common
+from tests.unit.framework.common import test_constants
+
+_SUCCESSFUL_UNARY_UNARY = '/test/SuccessfulUnaryUnary'
+_FAILED_UNARY_UNARY = '/test/FailedUnaryUnary'
+_SUCCESSFUL_STREAM_STREAM = '/test/SuccessfulStreamStream'
+
+_REQUEST = b'\x00\x00\x00'
+_RESPONSE = b'\x01\x01\x01'
+
+_DISABLE_REUSE_PORT = (('grpc.so_reuseport', 0),)
+_ENABLE_CHANNELZ = (('grpc.enable_channelz', 1),)
+_DISABLE_CHANNELZ = (('grpc.enable_channelz', 0),)
+
+
+def _successful_unary_unary(request, servicer_context):
+ return _RESPONSE
+
+
+def _failed_unary_unary(request, servicer_context):
+ servicer_context.set_code(grpc.StatusCode.INTERNAL)
+ servicer_context.set_details("Channelz Test Intended Failure")
+
+
+def _successful_stream_stream(request_iterator, servicer_context):
+ for _ in request_iterator:
+ yield _RESPONSE
+
+
+class _GenericHandler(grpc.GenericRpcHandler):
+
+ def service(self, handler_call_details):
+ if handler_call_details.method == _SUCCESSFUL_UNARY_UNARY:
+ return grpc.unary_unary_rpc_method_handler(_successful_unary_unary)
+ elif handler_call_details.method == _FAILED_UNARY_UNARY:
+ return grpc.unary_unary_rpc_method_handler(_failed_unary_unary)
+ elif handler_call_details.method == _SUCCESSFUL_STREAM_STREAM:
+ return grpc.stream_stream_rpc_method_handler(
+ _successful_stream_stream)
+ else:
+ return None
+
+
+class _ChannelServerPair(object):
+
+ def __init__(self):
+ # Server will enable channelz service
+ # Bind as attribute, so its `del` can be called explicitly, during
+ # the destruction process. Otherwise, if the removal of server
+ # rely on gc cycle, the test will become non-deterministic.
+ self._server = grpc.server(
+ futures.ThreadPoolExecutor(max_workers=3),
+ options=_DISABLE_REUSE_PORT + _ENABLE_CHANNELZ)
+ port = self._server.add_insecure_port('[::]:0')
+ self._server.add_generic_rpc_handlers((_GenericHandler(),))
+ self._server.start()
+
+ # Channel will enable channelz service...
+ self.channel = grpc.insecure_channel('localhost:%d' % port,
+ _ENABLE_CHANNELZ)
+
+ def __del__(self):
+ self._server.__del__()
+ self.channel.close()
+
+
+def _generate_channel_server_pairs(n):
+ return [_ChannelServerPair() for i in range(n)]
+
+
+def _clean_channel_server_pairs(pairs):
+ for pair in pairs:
+ pair.__del__()
+
+
+class ChannelzServicerTest(unittest.TestCase):
+
+ def _send_successful_unary_unary(self, idx):
+ _, r = self._pairs[idx].channel.unary_unary(
+ _SUCCESSFUL_UNARY_UNARY).with_call(_REQUEST)
+ self.assertEqual(r.code(), grpc.StatusCode.OK)
+
+ def _send_failed_unary_unary(self, idx):
+ try:
+ self._pairs[idx].channel.unary_unary(_FAILED_UNARY_UNARY).with_call(
+ _REQUEST)
+ except grpc.RpcError:
+ return
+ else:
+ self.fail("This call supposed to fail")
+
+ def _send_successful_stream_stream(self, idx):
+ response_iterator = self._pairs[idx].channel.stream_stream(
+ _SUCCESSFUL_STREAM_STREAM).__call__(
+ iter([_REQUEST] * test_constants.STREAM_LENGTH))
+ cnt = 0
+ for _ in response_iterator:
+ cnt += 1
+ self.assertEqual(cnt, test_constants.STREAM_LENGTH)
+
+ def _get_channel_id(self, idx):
+ """Channel id may not be consecutive"""
+ resp = self._channelz_stub.GetTopChannels(
+ channelz_pb2.GetTopChannelsRequest(start_channel_id=0))
+ self.assertGreater(len(resp.channel), idx)
+ return resp.channel[idx].ref.channel_id
+
+ def setUp(self):
+ self._pairs = []
+ # This server is for Channelz info fetching only
+ # It self should not enable Channelz
+ self._server = grpc.server(
+ futures.ThreadPoolExecutor(max_workers=3),
+ options=_DISABLE_REUSE_PORT + _DISABLE_CHANNELZ)
+ port = self._server.add_insecure_port('[::]:0')
+ channelz.add_channelz_servicer(self._server)
+ self._server.start()
+
+ # This channel is used to fetch Channelz info only
+ # Channelz should not be enabled
+ self._channel = grpc.insecure_channel('localhost:%d' % port,
+ _DISABLE_CHANNELZ)
+ self._channelz_stub = channelz_pb2_grpc.ChannelzStub(self._channel)
+
+ def tearDown(self):
+ self._server.__del__()
+ self._channel.close()
+ _clean_channel_server_pairs(self._pairs)
+
+ def test_get_top_channels_basic(self):
+ self._pairs = _generate_channel_server_pairs(1)
+ resp = self._channelz_stub.GetTopChannels(
+ channelz_pb2.GetTopChannelsRequest(start_channel_id=0))
+ self.assertEqual(len(resp.channel), 1)
+ self.assertEqual(resp.end, True)
+
+ def test_get_top_channels_high_start_id(self):
+ self._pairs = _generate_channel_server_pairs(1)
+ resp = self._channelz_stub.GetTopChannels(
+ channelz_pb2.GetTopChannelsRequest(start_channel_id=10000))
+ self.assertEqual(len(resp.channel), 0)
+ self.assertEqual(resp.end, True)
+
+ def test_successful_request(self):
+ self._pairs = _generate_channel_server_pairs(1)
+ self._send_successful_unary_unary(0)
+ resp = self._channelz_stub.GetChannel(
+ channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(0)))
+ self.assertEqual(resp.channel.data.calls_started, 1)
+ self.assertEqual(resp.channel.data.calls_succeeded, 1)
+ self.assertEqual(resp.channel.data.calls_failed, 0)
+
+ def test_failed_request(self):
+ self._pairs = _generate_channel_server_pairs(1)
+ self._send_failed_unary_unary(0)
+ resp = self._channelz_stub.GetChannel(
+ channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(0)))
+ self.assertEqual(resp.channel.data.calls_started, 1)
+ self.assertEqual(resp.channel.data.calls_succeeded, 0)
+ self.assertEqual(resp.channel.data.calls_failed, 1)
+
+ def test_many_requests(self):
+ self._pairs = _generate_channel_server_pairs(1)
+ k_success = 7
+ k_failed = 9
+ for i in range(k_success):
+ self._send_successful_unary_unary(0)
+ for i in range(k_failed):
+ self._send_failed_unary_unary(0)
+ resp = self._channelz_stub.GetChannel(
+ channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(0)))
+ self.assertEqual(resp.channel.data.calls_started, k_success + k_failed)
+ self.assertEqual(resp.channel.data.calls_succeeded, k_success)
+ self.assertEqual(resp.channel.data.calls_failed, k_failed)
+
+ def test_many_channel(self):
+ k_channels = 4
+ self._pairs = _generate_channel_server_pairs(k_channels)
+ resp = self._channelz_stub.GetTopChannels(
+ channelz_pb2.GetTopChannelsRequest(start_channel_id=0))
+ self.assertEqual(len(resp.channel), k_channels)
+
+ def test_many_requests_many_channel(self):
+ k_channels = 4
+ self._pairs = _generate_channel_server_pairs(k_channels)
+ k_success = 11
+ k_failed = 13
+ for i in range(k_success):
+ self._send_successful_unary_unary(0)
+ self._send_successful_unary_unary(2)
+ for i in range(k_failed):
+ self._send_failed_unary_unary(1)
+ self._send_failed_unary_unary(2)
+
+ # The first channel saw only successes
+ resp = self._channelz_stub.GetChannel(
+ channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(0)))
+ self.assertEqual(resp.channel.data.calls_started, k_success)
+ self.assertEqual(resp.channel.data.calls_succeeded, k_success)
+ self.assertEqual(resp.channel.data.calls_failed, 0)
+
+ # The second channel saw only failures
+ resp = self._channelz_stub.GetChannel(
+ channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(1)))
+ self.assertEqual(resp.channel.data.calls_started, k_failed)
+ self.assertEqual(resp.channel.data.calls_succeeded, 0)
+ self.assertEqual(resp.channel.data.calls_failed, k_failed)
+
+ # The third channel saw both successes and failures
+ resp = self._channelz_stub.GetChannel(
+ channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(2)))
+ self.assertEqual(resp.channel.data.calls_started, k_success + k_failed)
+ self.assertEqual(resp.channel.data.calls_succeeded, k_success)
+ self.assertEqual(resp.channel.data.calls_failed, k_failed)
+
+ # The fourth channel saw nothing
+ resp = self._channelz_stub.GetChannel(
+ channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(3)))
+ self.assertEqual(resp.channel.data.calls_started, 0)
+ self.assertEqual(resp.channel.data.calls_succeeded, 0)
+ self.assertEqual(resp.channel.data.calls_failed, 0)
+
+ def test_many_subchannels(self):
+ k_channels = 4
+ self._pairs = _generate_channel_server_pairs(k_channels)
+ k_success = 17
+ k_failed = 19
+ for i in range(k_success):
+ self._send_successful_unary_unary(0)
+ self._send_successful_unary_unary(2)
+ for i in range(k_failed):
+ self._send_failed_unary_unary(1)
+ self._send_failed_unary_unary(2)
+
+ gtc_resp = self._channelz_stub.GetTopChannels(
+ channelz_pb2.GetTopChannelsRequest(start_channel_id=0))
+ self.assertEqual(len(gtc_resp.channel), k_channels)
+ for i in range(k_channels):
+ # If no call performed in the channel, there shouldn't be any subchannel
+ if gtc_resp.channel[i].data.calls_started == 0:
+ self.assertEqual(len(gtc_resp.channel[i].subchannel_ref), 0)
+ continue
+
+ # Otherwise, the subchannel should exist
+ self.assertGreater(len(gtc_resp.channel[i].subchannel_ref), 0)
+ gsc_resp = self._channelz_stub.GetSubchannel(
+ channelz_pb2.GetSubchannelRequest(
+ subchannel_id=gtc_resp.channel[i].subchannel_ref[
+ 0].subchannel_id))
+ self.assertEqual(gtc_resp.channel[i].data.calls_started,
+ gsc_resp.subchannel.data.calls_started)
+ self.assertEqual(gtc_resp.channel[i].data.calls_succeeded,
+ gsc_resp.subchannel.data.calls_succeeded)
+ self.assertEqual(gtc_resp.channel[i].data.calls_failed,
+ gsc_resp.subchannel.data.calls_failed)
+
+ @unittest.skip('Servers in core are not guaranteed to be destroyed ' \
+ 'immediately when the reference goes out of scope, so ' \
+ 'servers from multiple test cases are not hermetic. ' \
+ 'TODO(https://github.com/grpc/grpc/issues/17258)')
+ def test_server_basic(self):
+ self._pairs = _generate_channel_server_pairs(1)
+ resp = self._channelz_stub.GetServers(
+ channelz_pb2.GetServersRequest(start_server_id=0))
+ self.assertEqual(len(resp.server), 1)
+
+ @unittest.skip('Servers in core are not guaranteed to be destroyed ' \
+ 'immediately when the reference goes out of scope, so ' \
+ 'servers from multiple test cases are not hermetic. ' \
+ 'TODO(https://github.com/grpc/grpc/issues/17258)')
+ def test_get_one_server(self):
+ self._pairs = _generate_channel_server_pairs(1)
+ gss_resp = self._channelz_stub.GetServers(
+ channelz_pb2.GetServersRequest(start_server_id=0))
+ self.assertEqual(len(gss_resp.server), 1)
+ gs_resp = self._channelz_stub.GetServer(
+ channelz_pb2.GetServerRequest(
+ server_id=gss_resp.server[0].ref.server_id))
+ self.assertEqual(gss_resp.server[0].ref.server_id,
+ gs_resp.server.ref.server_id)
+
+ @unittest.skip('Servers in core are not guaranteed to be destroyed ' \
+ 'immediately when the reference goes out of scope, so ' \
+ 'servers from multiple test cases are not hermetic. ' \
+ 'TODO(https://github.com/grpc/grpc/issues/17258)')
+ def test_server_call(self):
+ self._pairs = _generate_channel_server_pairs(1)
+ k_success = 23
+ k_failed = 29
+ for i in range(k_success):
+ self._send_successful_unary_unary(0)
+ for i in range(k_failed):
+ self._send_failed_unary_unary(0)
+
+ resp = self._channelz_stub.GetServers(
+ channelz_pb2.GetServersRequest(start_server_id=0))
+ self.assertEqual(len(resp.server), 1)
+ self.assertEqual(resp.server[0].data.calls_started,
+ k_success + k_failed)
+ self.assertEqual(resp.server[0].data.calls_succeeded, k_success)
+ self.assertEqual(resp.server[0].data.calls_failed, k_failed)
+
+ def test_many_subchannels_and_sockets(self):
+ k_channels = 4
+ self._pairs = _generate_channel_server_pairs(k_channels)
+ k_success = 3
+ k_failed = 5
+ for i in range(k_success):
+ self._send_successful_unary_unary(0)
+ self._send_successful_unary_unary(2)
+ for i in range(k_failed):
+ self._send_failed_unary_unary(1)
+ self._send_failed_unary_unary(2)
+
+ gtc_resp = self._channelz_stub.GetTopChannels(
+ channelz_pb2.GetTopChannelsRequest(start_channel_id=0))
+ self.assertEqual(len(gtc_resp.channel), k_channels)
+ for i in range(k_channels):
+ # If no call performed in the channel, there shouldn't be any subchannel
+ if gtc_resp.channel[i].data.calls_started == 0:
+ self.assertEqual(len(gtc_resp.channel[i].subchannel_ref), 0)
+ continue
+
+ # Otherwise, the subchannel should exist
+ self.assertGreater(len(gtc_resp.channel[i].subchannel_ref), 0)
+ gsc_resp = self._channelz_stub.GetSubchannel(
+ channelz_pb2.GetSubchannelRequest(
+ subchannel_id=gtc_resp.channel[i].subchannel_ref[
+ 0].subchannel_id))
+ self.assertEqual(len(gsc_resp.subchannel.socket_ref), 1)
+
+ gs_resp = self._channelz_stub.GetSocket(
+ channelz_pb2.GetSocketRequest(
+ socket_id=gsc_resp.subchannel.socket_ref[0].socket_id))
+ self.assertEqual(gsc_resp.subchannel.data.calls_started,
+ gs_resp.socket.data.streams_started)
+ self.assertEqual(gsc_resp.subchannel.data.calls_started,
+ gs_resp.socket.data.streams_succeeded)
+ # Calls started == messages sent, only valid for unary calls
+ self.assertEqual(gsc_resp.subchannel.data.calls_started,
+ gs_resp.socket.data.messages_sent)
+ # Only receive responses when the RPC was successful
+ self.assertEqual(gsc_resp.subchannel.data.calls_succeeded,
+ gs_resp.socket.data.messages_received)
+
+ def test_streaming_rpc(self):
+ self._pairs = _generate_channel_server_pairs(1)
+ # In C++, the argument for _send_successful_stream_stream is message length.
+ # Here the argument is still channel idx, to be consistent with the other two.
+ self._send_successful_stream_stream(0)
+
+ gc_resp = self._channelz_stub.GetChannel(
+ channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(0)))
+ self.assertEqual(gc_resp.channel.data.calls_started, 1)
+ self.assertEqual(gc_resp.channel.data.calls_succeeded, 1)
+ self.assertEqual(gc_resp.channel.data.calls_failed, 0)
+ # Subchannel exists
+ self.assertGreater(len(gc_resp.channel.subchannel_ref), 0)
+
+ gsc_resp = self._channelz_stub.GetSubchannel(
+ channelz_pb2.GetSubchannelRequest(
+ subchannel_id=gc_resp.channel.subchannel_ref[0].subchannel_id))
+ self.assertEqual(gsc_resp.subchannel.data.calls_started, 1)
+ self.assertEqual(gsc_resp.subchannel.data.calls_succeeded, 1)
+ self.assertEqual(gsc_resp.subchannel.data.calls_failed, 0)
+ # Socket exists
+ self.assertEqual(len(gsc_resp.subchannel.socket_ref), 1)
+
+ gs_resp = self._channelz_stub.GetSocket(
+ channelz_pb2.GetSocketRequest(
+ socket_id=gsc_resp.subchannel.socket_ref[0].socket_id))
+ self.assertEqual(gs_resp.socket.data.streams_started, 1)
+ self.assertEqual(gs_resp.socket.data.streams_succeeded, 1)
+ self.assertEqual(gs_resp.socket.data.streams_failed, 0)
+ self.assertEqual(gs_resp.socket.data.messages_sent,
+ test_constants.STREAM_LENGTH)
+ self.assertEqual(gs_resp.socket.data.messages_received,
+ test_constants.STREAM_LENGTH)
+
+ @unittest.skip('Servers in core are not guaranteed to be destroyed ' \
+ 'immediately when the reference goes out of scope, so ' \
+ 'servers from multiple test cases are not hermetic. ' \
+ 'TODO(https://github.com/grpc/grpc/issues/17258)')
+ def test_server_sockets(self):
+ self._pairs = _generate_channel_server_pairs(1)
+ self._send_successful_unary_unary(0)
+ self._send_failed_unary_unary(0)
+
+ gs_resp = self._channelz_stub.GetServers(
+ channelz_pb2.GetServersRequest(start_server_id=0))
+ self.assertEqual(len(gs_resp.server), 1)
+ self.assertEqual(gs_resp.server[0].data.calls_started, 2)
+ self.assertEqual(gs_resp.server[0].data.calls_succeeded, 1)
+ self.assertEqual(gs_resp.server[0].data.calls_failed, 1)
+
+ gss_resp = self._channelz_stub.GetServerSockets(
+ channelz_pb2.GetServerSocketsRequest(
+ server_id=gs_resp.server[0].ref.server_id, start_socket_id=0))
+ # If the RPC call failed, it will raise a grpc.RpcError
+ # So, if there is no exception raised, considered pass
+
+ @unittest.skip('Servers in core are not guaranteed to be destroyed ' \
+ 'immediately when the reference goes out of scope, so ' \
+ 'servers from multiple test cases are not hermetic. ' \
+ 'TODO(https://github.com/grpc/grpc/issues/17258)')
+ def test_server_listen_sockets(self):
+ self._pairs = _generate_channel_server_pairs(1)
+
+ gss_resp = self._channelz_stub.GetServers(
+ channelz_pb2.GetServersRequest(start_server_id=0))
+ self.assertEqual(len(gss_resp.server), 1)
+ self.assertEqual(len(gss_resp.server[0].listen_socket), 1)
+
+ gs_resp = self._channelz_stub.GetSocket(
+ channelz_pb2.GetSocketRequest(
+ socket_id=gss_resp.server[0].listen_socket[0].socket_id))
+ # If the RPC call failed, it will raise a grpc.RpcError
+ # So, if there is no exception raised, considered pass
+
+ def test_invalid_query_get_server(self):
+ try:
+ self._channelz_stub.GetServer(
+ channelz_pb2.GetServerRequest(server_id=10000))
+ except BaseException as e:
+ self.assertIn('StatusCode.NOT_FOUND', str(e))
+ else:
+ self.fail('Invalid query not detected')
+
+ def test_invalid_query_get_channel(self):
+ try:
+ self._channelz_stub.GetChannel(
+ channelz_pb2.GetChannelRequest(channel_id=10000))
+ except BaseException as e:
+ self.assertIn('StatusCode.NOT_FOUND', str(e))
+ else:
+ self.fail('Invalid query not detected')
+
+ def test_invalid_query_get_subchannel(self):
+ try:
+ self._channelz_stub.GetSubchannel(
+ channelz_pb2.GetSubchannelRequest(subchannel_id=10000))
+ except BaseException as e:
+ self.assertIn('StatusCode.NOT_FOUND', str(e))
+ else:
+ self.fail('Invalid query not detected')
+
+ def test_invalid_query_get_socket(self):
+ try:
+ self._channelz_stub.GetSocket(
+ channelz_pb2.GetSocketRequest(socket_id=10000))
+ except BaseException as e:
+ self.assertIn('StatusCode.NOT_FOUND', str(e))
+ else:
+ self.fail('Invalid query not detected')
+
+ def test_invalid_query_get_server_sockets(self):
+ try:
+ self._channelz_stub.GetServerSockets(
+ channelz_pb2.GetServerSocketsRequest(
+ server_id=10000,
+ start_socket_id=0,
+ ))
+ except BaseException as e:
+ self.assertIn('StatusCode.NOT_FOUND', str(e))
+ else:
+ self.fail('Invalid query not detected')
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json
index a3006d9afc..9cffd3df19 100644
--- a/src/python/grpcio_tests/tests/tests.json
+++ b/src/python/grpcio_tests/tests/tests.json
@@ -1,5 +1,6 @@
[
"_sanity._sanity_test.SanityTest",
+ "channelz._channelz_servicer_test.ChannelzServicerTest",
"health_check._health_servicer_test.HealthServicerTest",
"interop._insecure_intraop_test.InsecureIntraopTest",
"interop._secure_intraop_test.SecureIntraopTest",
diff --git a/src/ruby/pb/grpc/health/checker.rb b/src/ruby/pb/grpc/health/checker.rb
index c492455d8f..7ad68409dd 100644
--- a/src/ruby/pb/grpc/health/checker.rb
+++ b/src/ruby/pb/grpc/health/checker.rb
@@ -14,7 +14,6 @@
require 'grpc'
require 'grpc/health/v1/health_services_pb'
-require 'thread'
module Grpc
# Health contains classes and modules that support providing a health check
@@ -37,9 +36,9 @@ module Grpc
@status_mutex.synchronize do
status = @statuses["#{req.service}"]
end
- if status.nil?
+ if status.nil?
fail GRPC::BadStatus.new_status_exception(StatusCodes::NOT_FOUND)
- end
+ end
HealthCheckResponse.new(status: status)
end