aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/compiler/config.h11
-rw-r--r--src/compiler/csharp_generator.cc116
-rw-r--r--src/core/ext/census/grpc_filter.c4
-rw-r--r--src/core/ext/client_config/client_channel.c11
-rw-r--r--src/core/ext/client_config/lb_policy.c4
-rw-r--r--src/core/ext/client_config/lb_policy.h6
-rw-r--r--src/core/ext/client_config/subchannel.c4
-rw-r--r--src/core/ext/client_config/subchannel.h3
-rw-r--r--src/core/ext/client_config/subchannel_call_holder.c5
-rw-r--r--src/core/ext/client_config/subchannel_call_holder.h3
-rw-r--r--src/core/ext/lb_policy/pick_first/pick_first.c26
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c22
-rw-r--r--src/core/ext/load_reporting/load_reporting_filter.c2
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c8
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c18
-rw-r--r--src/core/lib/channel/channel_stack.c15
-rw-r--r--src/core/lib/channel/channel_stack.h25
-rw-r--r--src/core/lib/channel/compress_filter.c2
-rw-r--r--src/core/lib/channel/connected_channel.c11
-rw-r--r--src/core/lib/channel/http_client_filter.c2
-rw-r--r--src/core/lib/channel/http_server_filter.c2
-rw-r--r--src/core/lib/http/httpcli.c23
-rw-r--r--src/core/lib/http/httpcli.h5
-rw-r--r--src/core/lib/iomgr/polling_entity.c104
-rw-r--r--src/core/lib/iomgr/polling_entity.h81
-rw-r--r--src/core/lib/iomgr/pollset_set_windows.c5
-rw-r--r--src/core/lib/iomgr/timer.c3
-rw-r--r--src/core/lib/security/credentials/composite/composite_credentials.c13
-rw-r--r--src/core/lib/security/credentials/credentials.c4
-rw-r--r--src/core/lib/security/credentials/credentials.h6
-rw-r--r--src/core/lib/security/credentials/fake/fake_credentials.c2
-rw-r--r--src/core/lib/security/credentials/google_default/google_default_credentials.c25
-rw-r--r--src/core/lib/security/credentials/iam/iam_credentials.c2
-rw-r--r--src/core/lib/security/credentials/jwt/jwt_credentials.c2
-rw-r--r--src/core/lib/security/credentials/jwt/jwt_verifier.c11
-rw-r--r--src/core/lib/security/credentials/oauth2/oauth2_credentials.c14
-rw-r--r--src/core/lib/security/credentials/oauth2/oauth2_credentials.h2
-rw-r--r--src/core/lib/security/credentials/plugin/plugin_credentials.c2
-rw-r--r--src/core/lib/security/transport/client_auth_filter.c37
-rw-r--r--src/core/lib/security/transport/server_auth_filter.c18
-rw-r--r--src/core/lib/surface/call.c38
-rw-r--r--src/core/lib/surface/call.h2
-rw-r--r--src/core/lib/surface/channel.c30
-rw-r--r--src/core/lib/surface/channel.h5
-rw-r--r--src/core/lib/surface/lame_client.c2
-rw-r--r--src/core/lib/surface/server.c8
-rw-r--r--src/core/lib/transport/transport.c17
-rw-r--r--src/core/lib/transport/transport.h6
-rw-r--r--src/core/lib/transport/transport_impl.h4
-rw-r--r--src/core/plugin_registry/grpc_cronet_plugin_registry.c46
-rw-r--r--src/csharp/Grpc.Core.Tests/AppDomainUnloadTest.cs90
-rw-r--r--src/csharp/Grpc.Core.Tests/ChannelTest.cs6
-rw-r--r--src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj4
-rw-r--r--src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs12
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs4
-rw-r--r--src/csharp/Grpc.Core.Tests/MockServiceHelper.cs2
-rw-r--r--src/csharp/Grpc.Core.Tests/PInvokeTest.cs2
-rw-r--r--src/csharp/Grpc.Core.Tests/ServerTest.cs2
-rw-r--r--src/csharp/Grpc.Core.Tests/ShutdownHookClientTest.cs57
-rw-r--r--src/csharp/Grpc.Core.Tests/ShutdownHookPendingCallTest.cs69
-rw-r--r--src/csharp/Grpc.Core.Tests/ShutdownHookServerTest.cs58
-rw-r--r--src/csharp/Grpc.Core/AsyncClientStreamingCall.cs4
-rw-r--r--src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs4
-rw-r--r--src/csharp/Grpc.Core/AsyncServerStreamingCall.cs4
-rw-r--r--src/csharp/Grpc.Core/AsyncUnaryCall.cs4
-rw-r--r--src/csharp/Grpc.Core/CallOptions.cs8
-rw-r--r--src/csharp/Grpc.Core/Channel.cs55
-rw-r--r--src/csharp/Grpc.Core/ChannelState.cs2
-rw-r--r--src/csharp/Grpc.Core/GrpcEnvironment.cs121
-rw-r--r--src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs28
-rw-r--r--src/csharp/Grpc.Core/Server.cs99
-rw-r--r--src/csharp/Grpc.Core/ServerServiceDefinition.cs10
-rw-r--r--src/csharp/Grpc.Examples/MathGrpc.cs137
-rw-r--r--src/csharp/Grpc.HealthCheck/HealthGrpc.cs34
-rw-r--r--src/csharp/Grpc.IntegrationTesting/GenericService.cs2
-rw-r--r--src/csharp/Grpc.IntegrationTesting/MetricsGrpc.cs65
-rw-r--r--src/csharp/Grpc.IntegrationTesting/ServicesGrpc.cs196
-rw-r--r--src/csharp/Grpc.IntegrationTesting/TestGrpc.cs232
-rw-r--r--src/csharp/tests.json4
-rw-r--r--src/objective-c/GRPCClient/GRPCCall+Cronet.h2
-rw-r--r--src/objective-c/GRPCClient/GRPCCall+Cronet.m2
-rw-r--r--src/objective-c/GRPCClient/private/GRPCChannel.h3
-rw-r--r--src/objective-c/GRPCClient/private/GRPCChannel.m8
-rw-r--r--src/objective-c/GRPCClient/private/GRPCHost.m9
-rw-r--r--src/proto/math/math.proto6
-rw-r--r--src/python/grpcio/grpc/__init__.py15
-rw-r--r--src/python/grpcio/grpc/_adapter/_implementations.py48
-rw-r--r--src/python/grpcio/grpc/_adapter/_low.py76
-rw-r--r--src/python/grpcio/grpc/_auth.py (renamed from src/python/grpcio/grpc/beta/_auth.py)6
-rw-r--r--src/python/grpcio/grpc/_cython/imports.generated.c2
-rw-r--r--src/python/grpcio/grpc/_cython/imports.generated.h4
-rw-r--r--src/python/grpcio/grpc/_server.py41
-rw-r--r--src/python/grpcio/grpc/beta/_client_adaptations.py566
-rw-r--r--src/python/grpcio/grpc/beta/_server_adaptations.py359
-rw-r--r--src/python/grpcio/grpc/beta/implementations.py220
-rw-r--r--src/python/grpcio/grpc/beta/interfaces.py89
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py4
-rw-r--r--src/python/grpcio/tests/tests.json1
-rw-r--r--src/python/grpcio/tests/unit/_auth_test.py (renamed from src/python/grpcio/tests/unit/beta/_auth_test.py)2
-rw-r--r--src/python/grpcio/tests/unit/_cython/_read_some_but_not_all_responses_test.py251
-rw-r--r--src/python/grpcio/tests/unit/beta/test_utilities.py13
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.c2
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.h4
103 files changed, 2437 insertions, 1428 deletions
diff --git a/src/compiler/config.h b/src/compiler/config.h
index d8b95818db..f2abb85186 100644
--- a/src/compiler/config.h
+++ b/src/compiler/config.h
@@ -34,8 +34,7 @@
#ifndef SRC_COMPILER_CONFIG_H
#define SRC_COMPILER_CONFIG_H
-#include <grpc++/support/config.h>
-#include <grpc++/support/config_protobuf.h>
+#include <grpc++/impl/codegen/config_protobuf.h>
#ifndef GRPC_CUSTOM_DESCRIPTOR
#include <google/protobuf/descriptor.h>
@@ -75,7 +74,15 @@
#define GRPC_CUSTOM_PARSEGENERATORPARAMETER ::google::protobuf::compiler::ParseGeneratorParameter
#endif
+#ifndef GRPC_CUSTOM_STRING
+#include <string>
+#define GRPC_CUSTOM_STRING std::string
+#endif
+
namespace grpc {
+
+typedef GRPC_CUSTOM_STRING string;
+
namespace protobuf {
typedef GRPC_CUSTOM_DESCRIPTOR Descriptor;
typedef GRPC_CUSTOM_FILEDESCRIPTOR FileDescriptor;
diff --git a/src/compiler/csharp_generator.cc b/src/compiler/csharp_generator.cc
index 484fa3cdcc..fc8feaf0fc 100644
--- a/src/compiler/csharp_generator.cc
+++ b/src/compiler/csharp_generator.cc
@@ -119,18 +119,10 @@ std::string GetServiceClassName(const ServiceDescriptor* service) {
return service->name();
}
-std::string GetClientInterfaceName(const ServiceDescriptor* service) {
- return "I" + service->name() + "Client";
-}
-
std::string GetClientClassName(const ServiceDescriptor* service) {
return service->name() + "Client";
}
-std::string GetServerInterfaceName(const ServiceDescriptor* service) {
- return "I" + service->name();
-}
-
std::string GetServerClassName(const ServiceDescriptor* service) {
return service->name() + "Base";
}
@@ -302,86 +294,6 @@ void GenerateServiceDescriptorProperty(Printer* out, const ServiceDescriptor *se
out->Print("\n");
}
-void GenerateClientInterface(Printer* out, const ServiceDescriptor *service) {
- out->Print("/// <summary>Client for $servicename$</summary>\n",
- "servicename", GetServiceClassName(service));
- out->Print("[System.Obsolete(\"Client side interfaced will be removed "
- "in the next release. Use client class directly.\")]\n");
- out->Print("public interface $name$\n", "name",
- GetClientInterfaceName(service));
- out->Print("{\n");
- out->Indent();
- for (int i = 0; i < service->method_count(); i++) {
- const MethodDescriptor *method = service->method(i);
- MethodType method_type = GetMethodType(method);
-
- if (method_type == METHODTYPE_NO_STREAMING) {
- // unary calls have an extra synchronous stub method
- GenerateDocCommentBody(out, method);
- out->Print(
- "$response$ $methodname$($request$ request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));\n",
- "methodname", method->name(), "request",
- GetClassName(method->input_type()), "response",
- GetClassName(method->output_type()));
-
- // overload taking CallOptions as a param
- GenerateDocCommentBody(out, method);
- out->Print(
- "$response$ $methodname$($request$ request, CallOptions options);\n",
- "methodname", method->name(), "request",
- GetClassName(method->input_type()), "response",
- GetClassName(method->output_type()));
- }
-
- std::string method_name = method->name();
- if (method_type == METHODTYPE_NO_STREAMING) {
- method_name += "Async"; // prevent name clash with synchronous method.
- }
- GenerateDocCommentBody(out, method);
- out->Print(
- "$returntype$ $methodname$($request_maybe$Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));\n",
- "methodname", method_name, "request_maybe",
- GetMethodRequestParamMaybe(method), "returntype",
- GetMethodReturnTypeClient(method));
-
- // overload taking CallOptions as a param
- GenerateDocCommentBody(out, method);
- out->Print(
- "$returntype$ $methodname$($request_maybe$CallOptions options);\n",
- "methodname", method_name, "request_maybe",
- GetMethodRequestParamMaybe(method), "returntype",
- GetMethodReturnTypeClient(method));
- }
- out->Outdent();
- out->Print("}\n");
- out->Print("\n");
-}
-
-void GenerateServerInterface(Printer* out, const ServiceDescriptor *service) {
- out->Print("/// <summary>Interface of server-side implementations of $servicename$</summary>\n",
- "servicename", GetServiceClassName(service));
- out->Print("[System.Obsolete(\"Service implementations should inherit"
- " from the generated abstract base class instead.\")]\n");
- out->Print("public interface $name$\n", "name",
- GetServerInterfaceName(service));
- out->Print("{\n");
- out->Indent();
- for (int i = 0; i < service->method_count(); i++) {
- const MethodDescriptor *method = service->method(i);
- GenerateDocCommentBody(out, method);
- out->Print(
- "$returntype$ $methodname$($request$$response_stream_maybe$, "
- "ServerCallContext context);\n",
- "methodname", method->name(), "returntype",
- GetMethodReturnTypeServer(method), "request",
- GetMethodRequestParamServer(method), "response_stream_maybe",
- GetMethodResponseStreamMaybe(method));
- }
- out->Outdent();
- out->Print("}\n");
- out->Print("\n");
-}
-
void GenerateServerClass(Printer* out, const ServiceDescriptor *service) {
out->Print("/// <summary>Base class for server-side implementations of $servicename$</summary>\n",
"servicename", GetServiceClassName(service));
@@ -414,12 +326,9 @@ void GenerateServerClass(Printer* out, const ServiceDescriptor *service) {
void GenerateClientStub(Printer* out, const ServiceDescriptor *service) {
out->Print("/// <summary>Client for $servicename$</summary>\n",
"servicename", GetServiceClassName(service));
- out->Print("#pragma warning disable 0618\n");
out->Print(
- "public class $name$ : ClientBase<$name$>, $interface$\n",
- "name", GetClientClassName(service),
- "interface", GetClientInterfaceName(service));
- out->Print("#pragma warning restore 0618\n");
+ "public class $name$ : ClientBase<$name$>\n",
+ "name", GetClientClassName(service));
out->Print("{\n");
out->Indent();
@@ -546,22 +455,16 @@ void GenerateClientStub(Printer* out, const ServiceDescriptor *service) {
out->Print("\n");
}
-void GenerateBindServiceMethod(Printer* out, const ServiceDescriptor *service,
- bool use_server_class) {
+void GenerateBindServiceMethod(Printer* out, const ServiceDescriptor *service) {
out->Print(
"/// <summary>Creates service definition that can be registered with a server</summary>\n");
- out->Print("#pragma warning disable 0618\n");
out->Print(
- "public static ServerServiceDefinition BindService($interface$ serviceImpl)\n",
- "interface", use_server_class ? GetServerClassName(service) :
- GetServerInterfaceName(service));
- out->Print("#pragma warning restore 0618\n");
+ "public static ServerServiceDefinition BindService($implclass$ serviceImpl)\n",
+ "implclass", GetServerClassName(service));
out->Print("{\n");
out->Indent();
- out->Print(
- "return ServerServiceDefinition.CreateBuilder($servicenamefield$)\n",
- "servicenamefield", GetServiceNameFieldName());
+ out->Print("return ServerServiceDefinition.CreateBuilder()\n");
out->Indent();
out->Indent();
for (int i = 0; i < service->method_count(); i++) {
@@ -616,11 +519,7 @@ void GenerateService(Printer* out, const ServiceDescriptor *service,
}
GenerateServiceDescriptorProperty(out, service);
- if (generate_client) {
- GenerateClientInterface(out, service);
- }
if (generate_server) {
- GenerateServerInterface(out, service);
GenerateServerClass(out, service);
}
if (generate_client) {
@@ -628,8 +527,7 @@ void GenerateService(Printer* out, const ServiceDescriptor *service,
GenerateNewStubMethods(out, service);
}
if (generate_server) {
- GenerateBindServiceMethod(out, service, false);
- GenerateBindServiceMethod(out, service, true);
+ GenerateBindServiceMethod(out, service);
}
out->Outdent();
diff --git a/src/core/ext/census/grpc_filter.c b/src/core/ext/census/grpc_filter.c
index 8c4c17ad9a..86ec2ae87a 100644
--- a/src/core/ext/census/grpc_filter.c
+++ b/src/core/ext/census/grpc_filter.c
@@ -180,7 +180,7 @@ const grpc_channel_filter grpc_client_census_filter = {
grpc_channel_next_op,
sizeof(call_data),
client_init_call_elem,
- grpc_call_stack_ignore_set_pollset,
+ grpc_call_stack_ignore_set_pollset_or_pollset_set,
client_destroy_call_elem,
sizeof(channel_data),
init_channel_elem,
@@ -193,7 +193,7 @@ const grpc_channel_filter grpc_server_census_filter = {
grpc_channel_next_op,
sizeof(call_data),
server_init_call_elem,
- grpc_call_stack_ignore_set_pollset,
+ grpc_call_stack_ignore_set_pollset_or_pollset_set,
server_destroy_call_elem,
sizeof(channel_data),
init_channel_elem,
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c
index 713c2c7814..e297dfa0ef 100644
--- a/src/core/ext/client_config/client_channel.c
+++ b/src/core/ext/client_config/client_channel.c
@@ -376,7 +376,7 @@ static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
int r;
GRPC_LB_POLICY_REF(lb_policy, "cc_pick_subchannel");
gpr_mu_unlock(&chand->mu_config);
- r = grpc_lb_policy_pick(exec_ctx, lb_policy, calld->pollset,
+ r = grpc_lb_policy_pick(exec_ctx, lb_policy, calld->pollent,
initial_metadata, initial_metadata_flags,
connected_subchannel, on_ready);
GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "cc_pick_subchannel");
@@ -461,10 +461,11 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
gpr_mu_destroy(&chand->mu_config);
}
-static void cc_set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_pollset *pollset) {
+static void cc_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_polling_entity *pollent) {
call_data *calld = elem->call_data;
- calld->pollset = pollset;
+ calld->pollent = pollent;
}
const grpc_channel_filter grpc_client_channel_filter = {
@@ -472,7 +473,7 @@ const grpc_channel_filter grpc_client_channel_filter = {
cc_start_transport_op,
sizeof(call_data),
init_call_elem,
- cc_set_pollset,
+ cc_set_pollset_or_pollset_set,
destroy_call_elem,
sizeof(channel_data),
init_channel_elem,
diff --git a/src/core/ext/client_config/lb_policy.c b/src/core/ext/client_config/lb_policy.c
index a7ad9842dc..20535398d6 100644
--- a/src/core/ext/client_config/lb_policy.c
+++ b/src/core/ext/client_config/lb_policy.c
@@ -99,12 +99,12 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx,
}
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_pollset *pollset,
+ grpc_polling_entity *pollent,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
- return policy->vtable->pick(exec_ctx, policy, pollset, initial_metadata,
+ return policy->vtable->pick(exec_ctx, policy, pollent, initial_metadata,
initial_metadata_flags, target, on_complete);
}
diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h
index 0384e0b2eb..56fa11198b 100644
--- a/src/core/ext/client_config/lb_policy.h
+++ b/src/core/ext/client_config/lb_policy.h
@@ -35,6 +35,7 @@
#define GRPC_CORE_EXT_CLIENT_CONFIG_LB_POLICY_H
#include "src/core/ext/client_config/subchannel.h"
+#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/transport/connectivity_state.h"
/** A load balancing policy: specified by a vtable and a struct (which
@@ -59,7 +60,8 @@ struct grpc_lb_policy_vtable {
/** implement grpc_lb_policy_pick */
int (*pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
+ grpc_polling_entity *pollent,
+ grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target, grpc_closure *on_complete);
void (*cancel_pick)(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
@@ -124,7 +126,7 @@ void grpc_lb_policy_init(grpc_lb_policy *policy,
\a target.
Picking can be asynchronous. Any IO should be done under \a pollset. */
int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy,
- grpc_pollset *pollset,
+ grpc_polling_entity *pollent,
grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c
index 3a635be02f..85183b0564 100644
--- a/src/core/ext/client_config/subchannel.c
+++ b/src/core/ext/client_config/subchannel.c
@@ -683,7 +683,7 @@ grpc_connected_subchannel *grpc_subchannel_get_connected_subchannel(
grpc_subchannel_call *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *con,
- grpc_pollset *pollset) {
+ grpc_polling_entity *pollent) {
grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
grpc_subchannel_call *call =
gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
@@ -692,7 +692,7 @@ grpc_subchannel_call *grpc_connected_subchannel_create_call(
GRPC_CONNECTED_SUBCHANNEL_REF(con, "subchannel_call");
grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, call,
NULL, NULL, callstk);
- grpc_call_stack_set_pollset(exec_ctx, callstk, pollset);
+ grpc_call_stack_set_pollset_or_pollset_set(exec_ctx, callstk, pollent);
return call;
}
diff --git a/src/core/ext/client_config/subchannel.h b/src/core/ext/client_config/subchannel.h
index 0765a544e8..525f854a44 100644
--- a/src/core/ext/client_config/subchannel.h
+++ b/src/core/ext/client_config/subchannel.h
@@ -36,6 +36,7 @@
#include "src/core/ext/client_config/connector.h"
#include "src/core/lib/channel/channel_stack.h"
+#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/transport/connectivity_state.h"
/** A (sub-)channel that knows how to connect to exactly one target
@@ -109,7 +110,7 @@ void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
/** construct a subchannel call */
grpc_subchannel_call *grpc_connected_subchannel_create_call(
grpc_exec_ctx *exec_ctx, grpc_connected_subchannel *connected_subchannel,
- grpc_pollset *pollset);
+ grpc_polling_entity *pollent);
/** process a transport level op */
void grpc_connected_subchannel_process_transport_op(
diff --git a/src/core/ext/client_config/subchannel_call_holder.c b/src/core/ext/client_config/subchannel_call_holder.c
index 91fa917661..3df1f254d6 100644
--- a/src/core/ext/client_config/subchannel_call_holder.c
+++ b/src/core/ext/client_config/subchannel_call_holder.c
@@ -68,6 +68,7 @@ void grpc_subchannel_call_holder_init(
holder->waiting_ops_capacity = 0;
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
holder->owning_call = owning_call;
+ holder->pollent = NULL;
}
void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx,
@@ -157,7 +158,7 @@ retry:
gpr_atm_rel_store(
&holder->subchannel_call,
(gpr_atm)(uintptr_t)grpc_connected_subchannel_create_call(
- exec_ctx, holder->connected_subchannel, holder->pollset));
+ exec_ctx, holder->connected_subchannel, holder->pollent));
retry_waiting_locked(exec_ctx, holder);
goto retry;
}
@@ -183,7 +184,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
gpr_atm_rel_store(
&holder->subchannel_call,
(gpr_atm)(uintptr_t)grpc_connected_subchannel_create_call(
- exec_ctx, holder->connected_subchannel, holder->pollset));
+ exec_ctx, holder->connected_subchannel, holder->pollent));
retry_waiting_locked(exec_ctx, holder);
}
gpr_mu_unlock(&holder->mu);
diff --git a/src/core/ext/client_config/subchannel_call_holder.h b/src/core/ext/client_config/subchannel_call_holder.h
index 9299908788..8d2deb02f3 100644
--- a/src/core/ext/client_config/subchannel_call_holder.h
+++ b/src/core/ext/client_config/subchannel_call_holder.h
@@ -35,6 +35,7 @@
#define GRPC_CORE_EXT_CLIENT_CONFIG_SUBCHANNEL_CALL_HOLDER_H
#include "src/core/ext/client_config/subchannel.h"
+#include "src/core/lib/iomgr/polling_entity.h"
/** Pick a subchannel for grpc_subchannel_call_holder;
Return 1 if subchannel is available immediately (in which case on_ready
@@ -71,7 +72,7 @@ typedef struct grpc_subchannel_call_holder {
grpc_subchannel_call_holder_creation_phase creation_phase;
grpc_connected_subchannel *connected_subchannel;
- grpc_pollset *pollset;
+ grpc_polling_entity *pollent;
grpc_transport_stream_op *waiting_ops;
size_t waiting_ops_count;
diff --git a/src/core/ext/lb_policy/pick_first/pick_first.c b/src/core/ext/lb_policy/pick_first/pick_first.c
index b5797da832..cc559eb2da 100644
--- a/src/core/ext/lb_policy/pick_first/pick_first.c
+++ b/src/core/ext/lb_policy/pick_first/pick_first.c
@@ -39,7 +39,7 @@
typedef struct pending_pick {
struct pending_pick *next;
- grpc_pollset *pollset;
+ grpc_polling_entity *pollent;
uint32_t initial_metadata_flags;
grpc_connected_subchannel **target;
grpc_closure *on_complete;
@@ -118,8 +118,8 @@ static void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
while (pp != NULL) {
pending_pick *next = pp->next;
*pp->target = NULL;
- grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
- pp->pollset);
+ grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
+ p->base.interested_parties);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
gpr_free(pp);
pp = next;
@@ -136,8 +136,8 @@ static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
while (pp != NULL) {
pending_pick *next = pp->next;
if (pp->target == target) {
- grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
- pp->pollset);
+ grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
+ p->base.interested_parties);
*target = NULL;
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
gpr_free(pp);
@@ -162,8 +162,8 @@ static void pf_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
- grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
- pp->pollset);
+ grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
+ p->base.interested_parties);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
gpr_free(pp);
} else {
@@ -196,7 +196,8 @@ static void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
+ grpc_polling_entity *pollent,
+ grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
@@ -221,10 +222,11 @@ static int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
- grpc_pollset_set_add_pollset(exec_ctx, p->base.interested_parties, pollset);
+ grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent,
+ p->base.interested_parties);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
- pp->pollset = pollset;
+ pp->pollent = pollent;
pp->target = target;
pp->initial_metadata_flags = initial_metadata_flags;
pp->on_complete = on_complete;
@@ -304,8 +306,8 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
while ((pp = p->pending_picks)) {
p->pending_picks = pp->next;
*pp->target = selected;
- grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
- pp->pollset);
+ grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
+ p->base.interested_parties);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
gpr_free(pp);
}
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index 7ab913b645..8645333c8e 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -48,7 +48,7 @@ int grpc_lb_round_robin_trace = 0;
* Once a pick is available, \a target is updated and \a on_complete called. */
typedef struct pending_pick {
struct pending_pick *next;
- grpc_pollset *pollset;
+ grpc_polling_entity *pollent;
uint32_t initial_metadata_flags;
grpc_connected_subchannel **target;
grpc_closure *on_complete;
@@ -262,8 +262,8 @@ static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
while (pp != NULL) {
pending_pick *next = pp->next;
if (pp->target == target) {
- grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
- pp->pollset);
+ grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
+ p->base.interested_parties);
*target = NULL;
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
gpr_free(pp);
@@ -288,8 +288,8 @@ static void rr_cancel_picks(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
pending_pick *next = pp->next;
if ((pp->initial_metadata_flags & initial_metadata_flags_mask) ==
initial_metadata_flags_eq) {
- grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
- pp->pollset);
+ grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
+ p->base.interested_parties);
*pp->target = NULL;
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, false, NULL);
gpr_free(pp);
@@ -331,7 +331,8 @@ static void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
}
static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
- grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
+ grpc_polling_entity *pollent,
+ grpc_metadata_batch *initial_metadata,
uint32_t initial_metadata_flags,
grpc_connected_subchannel **target,
grpc_closure *on_complete) {
@@ -354,10 +355,11 @@ static int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
if (!p->started_picking) {
start_picking(exec_ctx, p);
}
- grpc_pollset_set_add_pollset(exec_ctx, p->base.interested_parties, pollset);
+ grpc_polling_entity_add_to_pollset_set(exec_ctx, pollent,
+ p->base.interested_parties);
pp = gpr_malloc(sizeof(*pp));
pp->next = p->pending_picks;
- pp->pollset = pollset;
+ pp->pollent = pollent;
pp->target = target;
pp->on_complete = on_complete;
pp->initial_metadata_flags = initial_metadata_flags;
@@ -406,8 +408,8 @@ static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
"[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
selected->subchannel, selected);
}
- grpc_pollset_set_del_pollset(exec_ctx, p->base.interested_parties,
- pp->pollset);
+ grpc_polling_entity_del_from_pollset_set(exec_ctx, pp->pollent,
+ p->base.interested_parties);
grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, true, NULL);
gpr_free(pp);
}
diff --git a/src/core/ext/load_reporting/load_reporting_filter.c b/src/core/ext/load_reporting/load_reporting_filter.c
index f49730fac3..f372f88c3a 100644
--- a/src/core/ext/load_reporting/load_reporting_filter.c
+++ b/src/core/ext/load_reporting/load_reporting_filter.c
@@ -142,7 +142,7 @@ const grpc_channel_filter grpc_load_reporting_filter = {
grpc_channel_next_op,
sizeof(call_data),
init_call_elem,
- grpc_call_stack_ignore_set_pollset,
+ grpc_call_stack_ignore_set_pollset_or_pollset_set,
destroy_call_elem,
sizeof(channel_data),
init_channel_elem,
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index 1abc49f435..046b395001 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -1734,6 +1734,13 @@ static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
add_to_pollset_locked, pollset, 0);
}
+static void set_pollset_set(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, grpc_pollset_set *pollset_set) {
+ grpc_chttp2_run_with_global_lock(exec_ctx, (grpc_chttp2_transport *)gt,
+ (grpc_chttp2_stream *)gs,
+ add_to_pollset_set_locked, pollset_set, 0);
+}
+
/*******************************************************************************
* BYTE STREAM
*/
@@ -2055,6 +2062,7 @@ static const grpc_transport_vtable vtable = {sizeof(grpc_chttp2_stream),
"chttp2",
init_stream,
set_pollset,
+ set_pollset_set,
perform_stream_op,
perform_transport_op,
destroy_stream,
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index df160aaaf6..1bb53b756c 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -152,6 +152,10 @@ static void next_recv_step(stream_obj *s, enum e_caller caller);
static void set_pollset_do_nothing(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_pollset *pollset) {}
+static void set_pollset_set_do_nothing(grpc_exec_ctx *exec_ctx,
+ grpc_transport *gt, grpc_stream *gs,
+ grpc_pollset_set *pollset_set) {}
+
static void enqueue_callbacks(grpc_closure *callback_list[]) {
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
if (callback_list[0]) {
@@ -646,7 +650,13 @@ static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
}
}
-const grpc_transport_vtable grpc_cronet_vtable = {
- sizeof(stream_obj), "cronet_http", init_stream,
- set_pollset_do_nothing, perform_stream_op, NULL,
- destroy_stream, destroy_transport, NULL};
+const grpc_transport_vtable grpc_cronet_vtable = {sizeof(stream_obj),
+ "cronet_http",
+ init_stream,
+ set_pollset_do_nothing,
+ set_pollset_set_do_nothing,
+ perform_stream_op,
+ NULL,
+ destroy_stream,
+ destroy_transport,
+ NULL};
diff --git a/src/core/lib/channel/channel_stack.c b/src/core/lib/channel/channel_stack.c
index 4892ed283d..5c161652ac 100644
--- a/src/core/lib/channel/channel_stack.c
+++ b/src/core/lib/channel/channel_stack.c
@@ -189,9 +189,9 @@ void grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
}
}
-void grpc_call_stack_set_pollset(grpc_exec_ctx *exec_ctx,
- grpc_call_stack *call_stack,
- grpc_pollset *pollset) {
+void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_call_stack *call_stack,
+ grpc_polling_entity *pollent) {
size_t count = call_stack->count;
grpc_call_element *call_elems;
char *user_data;
@@ -203,15 +203,16 @@ void grpc_call_stack_set_pollset(grpc_exec_ctx *exec_ctx,
/* init per-filter data */
for (i = 0; i < count; i++) {
- call_elems[i].filter->set_pollset(exec_ctx, &call_elems[i], pollset);
+ call_elems[i].filter->set_pollset_or_pollset_set(exec_ctx, &call_elems[i],
+ pollent);
user_data +=
ROUND_UP_TO_ALIGNMENT_SIZE(call_elems[i].filter->sizeof_call_data);
}
}
-void grpc_call_stack_ignore_set_pollset(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_pollset *pollset) {}
+void grpc_call_stack_ignore_set_pollset_or_pollset_set(
+ grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_polling_entity *pollent) {}
void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack,
const grpc_call_stats *call_stats,
diff --git a/src/core/lib/channel/channel_stack.h b/src/core/lib/channel/channel_stack.h
index 2040002269..3ca643c893 100644
--- a/src/core/lib/channel/channel_stack.h
+++ b/src/core/lib/channel/channel_stack.h
@@ -48,6 +48,7 @@
#include <grpc/support/time.h>
#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/transport/transport.h"
typedef struct grpc_channel_element grpc_channel_element;
@@ -109,8 +110,9 @@ typedef struct {
argument. */
void (*init_call_elem)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_call_element_args *args);
- void (*set_pollset)(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_pollset *pollset);
+ void (*set_pollset_or_pollset_set)(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_polling_entity *pollent);
/* Destroy per call data.
The filter does not need to do any chaining.
The bottom filter of a stack will be passed a non-NULL pointer to
@@ -210,10 +212,11 @@ void grpc_call_stack_init(grpc_exec_ctx *exec_ctx,
grpc_call_context_element *context,
const void *transport_server_data,
grpc_call_stack *call_stack);
-/* Set a pollset for a call stack: must occur before the first op is started */
-void grpc_call_stack_set_pollset(grpc_exec_ctx *exec_ctx,
- grpc_call_stack *call_stack,
- grpc_pollset *pollset);
+/* Set a pollset or a pollset_set for a call stack: must occur before the first
+ * op is started */
+void grpc_call_stack_set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_call_stack *call_stack,
+ grpc_polling_entity *pollent);
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
#define GRPC_CALL_STACK_REF(call_stack, reason) \
@@ -240,11 +243,11 @@ void grpc_call_stack_destroy(grpc_exec_ctx *exec_ctx, grpc_call_stack *stack,
const grpc_call_stats *call_stats,
void *and_free_memory);
-/* Ignore set pollset - used by filters to implement the set_pollset method
- if they don't care about pollsets at all. Does nothing. */
-void grpc_call_stack_ignore_set_pollset(grpc_exec_ctx *exec_ctx,
- grpc_call_element *elem,
- grpc_pollset *pollset);
+/* Ignore set pollset{_set} - used by filters if they don't care about pollsets
+ * at all. Does nothing. */
+void grpc_call_stack_ignore_set_pollset_or_pollset_set(
+ grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
+ grpc_polling_entity *pollent);
/* Call the next operation in a call stack */
void grpc_call_next_op(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_transport_stream_op *op);
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c
index 79aa5da2e9..4fe9a7f045 100644
--- a/src/core/lib/channel/compress_filter.c
+++ b/src/core/lib/channel/compress_filter.c
@@ -318,7 +318,7 @@ const grpc_channel_filter grpc_compress_filter = {
grpc_channel_next_op,
sizeof(call_data),
init_call_elem,
- grpc_call_stack_ignore_set_pollset,
+ grpc_call_stack_ignore_set_pollset_or_pollset_set,
destroy_call_elem,
sizeof(channel_data),
init_channel_elem,
diff --git a/src/core/lib/channel/connected_channel.c b/src/core/lib/channel/connected_channel.c
index 06e87b005f..0a7d27a1dc 100644
--- a/src/core/lib/channel/connected_channel.c
+++ b/src/core/lib/channel/connected_channel.c
@@ -93,12 +93,13 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
GPR_ASSERT(r == 0);
}
-static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_pollset *pollset) {
+static void set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_polling_entity *pollent) {
call_data *calld = elem->call_data;
channel_data *chand = elem->channel_data;
- grpc_transport_set_pollset(exec_ctx, chand->transport,
- TRANSPORT_STREAM_FROM_CALL_DATA(calld), pollset);
+ grpc_transport_set_pops(exec_ctx, chand->transport,
+ TRANSPORT_STREAM_FROM_CALL_DATA(calld), pollent);
}
/* Destructor for call_data */
@@ -138,7 +139,7 @@ static const grpc_channel_filter connected_channel_filter = {
con_start_transport_op,
sizeof(call_data),
init_call_elem,
- set_pollset,
+ set_pollset_or_pollset_set,
destroy_call_elem,
sizeof(channel_data),
init_channel_elem,
diff --git a/src/core/lib/channel/http_client_filter.c b/src/core/lib/channel/http_client_filter.c
index cd9e6e894b..7cb66797eb 100644
--- a/src/core/lib/channel/http_client_filter.c
+++ b/src/core/lib/channel/http_client_filter.c
@@ -250,7 +250,7 @@ const grpc_channel_filter grpc_http_client_filter = {
grpc_channel_next_op,
sizeof(call_data),
init_call_elem,
- grpc_call_stack_ignore_set_pollset,
+ grpc_call_stack_ignore_set_pollset_or_pollset_set,
destroy_call_elem,
sizeof(channel_data),
init_channel_elem,
diff --git a/src/core/lib/channel/http_server_filter.c b/src/core/lib/channel/http_server_filter.c
index 43d71af473..aa8e377571 100644
--- a/src/core/lib/channel/http_server_filter.c
+++ b/src/core/lib/channel/http_server_filter.c
@@ -244,7 +244,7 @@ const grpc_channel_filter grpc_http_server_filter = {
grpc_channel_next_op,
sizeof(call_data),
init_call_elem,
- grpc_call_stack_ignore_set_pollset,
+ grpc_call_stack_ignore_set_pollset_or_pollset_set,
destroy_call_elem,
sizeof(channel_data),
init_channel_elem,
diff --git a/src/core/lib/http/httpcli.c b/src/core/lib/http/httpcli.c
index f22721ac8f..e8957bfe89 100644
--- a/src/core/lib/http/httpcli.c
+++ b/src/core/lib/http/httpcli.c
@@ -62,7 +62,7 @@ typedef struct {
grpc_httpcli_response_cb on_response;
void *user_data;
grpc_httpcli_context *context;
- grpc_pollset *pollset;
+ grpc_polling_entity *pollent;
grpc_iomgr_object iomgr_obj;
gpr_slice_buffer incoming;
gpr_slice_buffer outgoing;
@@ -97,8 +97,8 @@ static void next_address(grpc_exec_ctx *exec_ctx, internal_request *req);
static void finish(grpc_exec_ctx *exec_ctx, internal_request *req,
int success) {
- grpc_pollset_set_del_pollset(exec_ctx, req->context->pollset_set,
- req->pollset);
+ grpc_polling_entity_del_from_pollset_set(exec_ctx, req->pollent,
+ req->context->pollset_set);
req->on_response(exec_ctx, req->user_data,
success ? &req->parser.http.response : NULL);
grpc_http_parser_destroy(&req->parser);
@@ -222,7 +222,7 @@ static void on_resolved(grpc_exec_ctx *exec_ctx, void *arg,
static void internal_request_begin(
grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context,
- grpc_pollset *pollset, const grpc_httpcli_request *request,
+ grpc_polling_entity *pollent, const grpc_httpcli_request *request,
gpr_timespec deadline, grpc_httpcli_response_cb on_response,
void *user_data, const char *name, gpr_slice request_text) {
internal_request *req = gpr_malloc(sizeof(internal_request));
@@ -235,7 +235,7 @@ static void internal_request_begin(
req->handshaker =
request->handshaker ? request->handshaker : &grpc_httpcli_plaintext;
req->context = context;
- req->pollset = pollset;
+ req->pollent = pollent;
grpc_closure_init(&req->on_read, on_read, req);
grpc_closure_init(&req->done_write, done_write, req);
gpr_slice_buffer_init(&req->incoming);
@@ -244,14 +244,15 @@ static void internal_request_begin(
req->host = gpr_strdup(request->host);
req->ssl_host_override = gpr_strdup(request->ssl_host_override);
- grpc_pollset_set_add_pollset(exec_ctx, req->context->pollset_set,
- req->pollset);
+ GPR_ASSERT(pollent);
+ grpc_polling_entity_add_to_pollset_set(exec_ctx, req->pollent,
+ req->context->pollset_set);
grpc_resolve_address(exec_ctx, request->host, req->handshaker->default_port,
on_resolved, req);
}
void grpc_httpcli_get(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context,
- grpc_pollset *pollset,
+ grpc_polling_entity *pollent,
const grpc_httpcli_request *request,
gpr_timespec deadline,
grpc_httpcli_response_cb on_response, void *user_data) {
@@ -261,14 +262,14 @@ void grpc_httpcli_get(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context,
return;
}
gpr_asprintf(&name, "HTTP:GET:%s:%s", request->host, request->http.path);
- internal_request_begin(exec_ctx, context, pollset, request, deadline,
+ internal_request_begin(exec_ctx, context, pollent, request, deadline,
on_response, user_data, name,
grpc_httpcli_format_get_request(request));
gpr_free(name);
}
void grpc_httpcli_post(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context,
- grpc_pollset *pollset,
+ grpc_polling_entity *pollent,
const grpc_httpcli_request *request,
const char *body_bytes, size_t body_size,
gpr_timespec deadline,
@@ -281,7 +282,7 @@ void grpc_httpcli_post(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context,
}
gpr_asprintf(&name, "HTTP:POST:%s:%s", request->host, request->http.path);
internal_request_begin(
- exec_ctx, context, pollset, request, deadline, on_response, user_data,
+ exec_ctx, context, pollent, request, deadline, on_response, user_data,
name, grpc_httpcli_format_post_request(request, body_bytes, body_size));
gpr_free(name);
}
diff --git a/src/core/lib/http/httpcli.h b/src/core/lib/http/httpcli.h
index 11a32a125c..7e7784f1ab 100644
--- a/src/core/lib/http/httpcli.h
+++ b/src/core/lib/http/httpcli.h
@@ -41,6 +41,7 @@
#include "src/core/lib/http/parser.h"
#include "src/core/lib/iomgr/endpoint.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
+#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/pollset_set.h"
/* User agent this library reports */
@@ -100,7 +101,7 @@ void grpc_httpcli_context_destroy(grpc_httpcli_context *context);
'on_response' is a callback to report results to (and 'user_data' is a user
supplied pointer to pass to said call) */
void grpc_httpcli_get(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context,
- grpc_pollset *pollset,
+ grpc_polling_entity *pollent,
const grpc_httpcli_request *request,
gpr_timespec deadline,
grpc_httpcli_response_cb on_response, void *user_data);
@@ -121,7 +122,7 @@ void grpc_httpcli_get(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context,
supplied pointer to pass to said call)
Does not support ?var1=val1&var2=val2 in the path. */
void grpc_httpcli_post(grpc_exec_ctx *exec_ctx, grpc_httpcli_context *context,
- grpc_pollset *pollset,
+ grpc_polling_entity *pollent,
const grpc_httpcli_request *request,
const char *body_bytes, size_t body_size,
gpr_timespec deadline,
diff --git a/src/core/lib/iomgr/polling_entity.c b/src/core/lib/iomgr/polling_entity.c
new file mode 100644
index 0000000000..d1686aa12f
--- /dev/null
+++ b/src/core/lib/iomgr/polling_entity.c
@@ -0,0 +1,104 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+#include "src/core/lib/iomgr/polling_entity.h"
+
+grpc_polling_entity grpc_polling_entity_create_from_pollset_set(
+ grpc_pollset_set *pollset_set) {
+ grpc_polling_entity pollent;
+ pollent.pollent.pollset_set = pollset_set;
+ pollent.tag = POPS_POLLSET_SET;
+ return pollent;
+}
+
+grpc_polling_entity grpc_polling_entity_create_from_pollset(
+ grpc_pollset *pollset) {
+ grpc_polling_entity pollent;
+ pollent.pollent.pollset = pollset;
+ pollent.tag = POPS_POLLSET;
+ return pollent;
+}
+
+grpc_pollset *grpc_polling_entity_pollset(grpc_polling_entity *pollent) {
+ if (pollent->tag == POPS_POLLSET) {
+ return pollent->pollent.pollset;
+ }
+ return NULL;
+}
+
+grpc_pollset_set *grpc_polling_entity_pollset_set(
+ grpc_polling_entity *pollent) {
+ if (pollent->tag == POPS_POLLSET_SET) {
+ return pollent->pollent.pollset_set;
+ }
+ return NULL;
+}
+
+bool grpc_polling_entity_is_empty(const grpc_polling_entity *pollent) {
+ return pollent->tag == POPS_NONE;
+}
+
+void grpc_polling_entity_add_to_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_polling_entity *pollent,
+ grpc_pollset_set *pss_dst) {
+ if (pollent->tag == POPS_POLLSET) {
+ GPR_ASSERT(pollent->pollent.pollset != NULL);
+ grpc_pollset_set_add_pollset(exec_ctx, pss_dst, pollent->pollent.pollset);
+ } else if (pollent->tag == POPS_POLLSET_SET) {
+ GPR_ASSERT(pollent->pollent.pollset_set != NULL);
+ grpc_pollset_set_add_pollset_set(exec_ctx, pss_dst,
+ pollent->pollent.pollset_set);
+ } else {
+ gpr_log(GPR_ERROR, "Invalid grpc_polling_entity tag '%d'", pollent->tag);
+ abort();
+ }
+}
+
+void grpc_polling_entity_del_from_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_polling_entity *pollent,
+ grpc_pollset_set *pss_dst) {
+ if (pollent->tag == POPS_POLLSET) {
+ GPR_ASSERT(pollent->pollent.pollset != NULL);
+ grpc_pollset_set_del_pollset(exec_ctx, pss_dst, pollent->pollent.pollset);
+ } else if (pollent->tag == POPS_POLLSET_SET) {
+ GPR_ASSERT(pollent->pollent.pollset_set != NULL);
+ grpc_pollset_set_del_pollset_set(exec_ctx, pss_dst,
+ pollent->pollent.pollset_set);
+ } else {
+ gpr_log(GPR_ERROR, "Invalid grpc_polling_entity tag '%d'", pollent->tag);
+ abort();
+ }
+}
diff --git a/src/core/lib/iomgr/polling_entity.h b/src/core/lib/iomgr/polling_entity.h
new file mode 100644
index 0000000000..e81531053c
--- /dev/null
+++ b/src/core/lib/iomgr/polling_entity.h
@@ -0,0 +1,81 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_POLLING_ENTITY_H
+#define GRPC_CORE_LIB_IOMGR_POLLING_ENTITY_H
+
+#include "src/core/lib/iomgr/pollset.h"
+#include "src/core/lib/iomgr/pollset_set.h"
+
+/* A grpc_polling_entity is a pollset-or-pollset_set container. It allows
+ * functions that
+ * accept a pollset XOR a pollset_set to do so through an abstract interface.
+ * No ownership is taken. */
+
+typedef struct grpc_polling_entity {
+ union {
+ grpc_pollset *pollset;
+ grpc_pollset_set *pollset_set;
+ } pollent;
+ enum pops_tag { POPS_NONE, POPS_POLLSET, POPS_POLLSET_SET } tag;
+} grpc_polling_entity;
+
+grpc_polling_entity grpc_polling_entity_create_from_pollset_set(
+ grpc_pollset_set *pollset_set);
+grpc_polling_entity grpc_polling_entity_create_from_pollset(
+ grpc_pollset *pollset);
+
+/** If \a pollent contains a pollset, return it. Otherwise, return NULL */
+grpc_pollset *grpc_polling_entity_pollset(grpc_polling_entity *pollent);
+
+/** If \a pollent contains a pollset_set, return it. Otherwise, return NULL */
+grpc_pollset_set *grpc_polling_entity_pollset_set(grpc_polling_entity *pollent);
+
+bool grpc_polling_entity_is_empty(const grpc_polling_entity *pollent);
+
+/** Add the pollset or pollset_set in \a pollent to the destination pollset_set
+ * \a
+ * pss_dst */
+void grpc_polling_entity_add_to_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_polling_entity *pollent,
+ grpc_pollset_set *pss_dst);
+
+/** Delete the pollset or pollset_set in \a pollent from the destination
+ * pollset_set \a
+ * pss_dst */
+void grpc_polling_entity_del_from_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_polling_entity *pollent,
+ grpc_pollset_set *pss_dst);
+/* pollset_set specific */
+
+#endif /* GRPC_CORE_LIB_IOMGR_POLLING_ENTITY_H */
diff --git a/src/core/lib/iomgr/pollset_set_windows.c b/src/core/lib/iomgr/pollset_set_windows.c
index 89f60b92fb..a35a9766fc 100644
--- a/src/core/lib/iomgr/pollset_set_windows.c
+++ b/src/core/lib/iomgr/pollset_set_windows.c
@@ -32,12 +32,15 @@
*/
#include <grpc/support/port_platform.h>
+#include <stdint.h>
#ifdef GPR_WINSOCK_SOCKET
#include "src/core/lib/iomgr/pollset_set_windows.h"
-grpc_pollset_set* grpc_pollset_set_create(void) { return NULL; }
+grpc_pollset_set* grpc_pollset_set_create(void) {
+ return (grpc_pollset_set*)((intptr_t)0xdeafbeef);
+}
void grpc_pollset_set_destroy(grpc_pollset_set* pollset_set) {}
diff --git a/src/core/lib/iomgr/timer.c b/src/core/lib/iomgr/timer.c
index acb5b26c87..05a2fc104f 100644
--- a/src/core/lib/iomgr/timer.c
+++ b/src/core/lib/iomgr/timer.c
@@ -278,7 +278,8 @@ static int refill_queue(shard_type *shard, gpr_timespec now) {
return !grpc_timer_heap_is_empty(&shard->heap);
}
-/* This pops the next non-cancelled timer with deadline <= now from the queue,
+/* This pollent the next non-cancelled timer with deadline <= now from the
+ queue,
or returns NULL if there isn't one.
REQUIRES: shard->mu locked */
static grpc_timer *pop_one(shard_type *shard, gpr_timespec now) {
diff --git a/src/core/lib/security/credentials/composite/composite_credentials.c b/src/core/lib/security/credentials/composite/composite_credentials.c
index 18189a8fb8..07db8bfd75 100644
--- a/src/core/lib/security/credentials/composite/composite_credentials.c
+++ b/src/core/lib/security/credentials/composite/composite_credentials.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -35,6 +35,7 @@
#include <string.h>
+#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/surface/api_trace.h"
#include <grpc/support/alloc.h>
@@ -49,7 +50,7 @@ typedef struct {
grpc_credentials_md_store *md_elems;
grpc_auth_metadata_context auth_md_context;
void *user_data;
- grpc_pollset *pollset;
+ grpc_polling_entity *pollent;
grpc_credentials_metadata_cb cb;
} grpc_composite_call_credentials_metadata_context;
@@ -93,7 +94,7 @@ static void composite_call_metadata_cb(grpc_exec_ctx *exec_ctx, void *user_data,
grpc_call_credentials *inner_creds =
ctx->composite_creds->inner.creds_array[ctx->creds_index++];
grpc_call_credentials_get_request_metadata(
- exec_ctx, inner_creds, ctx->pollset, ctx->auth_md_context,
+ exec_ctx, inner_creds, ctx->pollent, ctx->auth_md_context,
composite_call_metadata_cb, ctx);
return;
}
@@ -106,7 +107,7 @@ static void composite_call_metadata_cb(grpc_exec_ctx *exec_ctx, void *user_data,
static void composite_call_get_request_metadata(
grpc_exec_ctx *exec_ctx, grpc_call_credentials *creds,
- grpc_pollset *pollset, grpc_auth_metadata_context auth_md_context,
+ grpc_polling_entity *pollent, grpc_auth_metadata_context auth_md_context,
grpc_credentials_metadata_cb cb, void *user_data) {
grpc_composite_call_credentials *c = (grpc_composite_call_credentials *)creds;
grpc_composite_call_credentials_metadata_context *ctx;
@@ -117,10 +118,10 @@ static void composite_call_get_request_metadata(
ctx->user_data = user_data;
ctx->cb = cb;
ctx->composite_creds = c;
- ctx->pollset = pollset;
+ ctx->pollent = pollent;
ctx->md_elems = grpc_credentials_md_store_create(c->inner.num_creds);
grpc_call_credentials_get_request_metadata(
- exec_ctx, c->inner.creds_array[ctx->creds_index++], pollset,
+ exec_ctx, c->inner.creds_array[ctx->creds_index++], ctx->pollent,
auth_md_context, composite_call_metadata_cb, ctx);
}
diff --git a/src/core/lib/security/credentials/credentials.c b/src/core/lib/security/credentials/credentials.c
index 3dde6e587d..f45a8d8ff6 100644
--- a/src/core/lib/security/credentials/credentials.c
+++ b/src/core/lib/security/credentials/credentials.c
@@ -111,7 +111,7 @@ void grpc_call_credentials_release(grpc_call_credentials *creds) {
void grpc_call_credentials_get_request_metadata(
grpc_exec_ctx *exec_ctx, grpc_call_credentials *creds,
- grpc_pollset *pollset, grpc_auth_metadata_context context,
+ grpc_polling_entity *pollent, grpc_auth_metadata_context context,
grpc_credentials_metadata_cb cb, void *user_data) {
if (creds == NULL || creds->vtable->get_request_metadata == NULL) {
if (cb != NULL) {
@@ -119,7 +119,7 @@ void grpc_call_credentials_get_request_metadata(
}
return;
}
- creds->vtable->get_request_metadata(exec_ctx, creds, pollset, context, cb,
+ creds->vtable->get_request_metadata(exec_ctx, creds, pollent, context, cb,
user_data);
}
diff --git a/src/core/lib/security/credentials/credentials.h b/src/core/lib/security/credentials/credentials.h
index 5f44c7c3e3..15dcfe473d 100644
--- a/src/core/lib/security/credentials/credentials.h
+++ b/src/core/lib/security/credentials/credentials.h
@@ -41,6 +41,7 @@
#include "src/core/lib/http/httpcli.h"
#include "src/core/lib/http/parser.h"
+#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/security/transport/security_connector.h"
struct grpc_http_response;
@@ -164,7 +165,8 @@ typedef void (*grpc_credentials_metadata_cb)(grpc_exec_ctx *exec_ctx,
typedef struct {
void (*destruct)(grpc_call_credentials *c);
void (*get_request_metadata)(grpc_exec_ctx *exec_ctx,
- grpc_call_credentials *c, grpc_pollset *pollset,
+ grpc_call_credentials *c,
+ grpc_polling_entity *pollent,
grpc_auth_metadata_context context,
grpc_credentials_metadata_cb cb,
void *user_data);
@@ -180,7 +182,7 @@ grpc_call_credentials *grpc_call_credentials_ref(grpc_call_credentials *creds);
void grpc_call_credentials_unref(grpc_call_credentials *creds);
void grpc_call_credentials_get_request_metadata(
grpc_exec_ctx *exec_ctx, grpc_call_credentials *creds,
- grpc_pollset *pollset, grpc_auth_metadata_context context,
+ grpc_polling_entity *pollent, grpc_auth_metadata_context context,
grpc_credentials_metadata_cb cb, void *user_data);
/* Metadata-only credentials with the specified key and value where
diff --git a/src/core/lib/security/credentials/fake/fake_credentials.c b/src/core/lib/security/credentials/fake/fake_credentials.c
index 54d7cf2581..005777d3c6 100644
--- a/src/core/lib/security/credentials/fake/fake_credentials.c
+++ b/src/core/lib/security/credentials/fake/fake_credentials.c
@@ -106,7 +106,7 @@ static void on_simulated_token_fetch_done(grpc_exec_ctx *exec_ctx,
static void md_only_test_get_request_metadata(
grpc_exec_ctx *exec_ctx, grpc_call_credentials *creds,
- grpc_pollset *pollset, grpc_auth_metadata_context context,
+ grpc_polling_entity *pollent, grpc_auth_metadata_context context,
grpc_credentials_metadata_cb cb, void *user_data) {
grpc_md_only_test_credentials *c = (grpc_md_only_test_credentials *)creds;
diff --git a/src/core/lib/security/credentials/google_default/google_default_credentials.c b/src/core/lib/security/credentials/google_default/google_default_credentials.c
index a521d95abc..98df68e7b3 100644
--- a/src/core/lib/security/credentials/google_default/google_default_credentials.c
+++ b/src/core/lib/security/credentials/google_default/google_default_credentials.c
@@ -41,6 +41,7 @@
#include "src/core/lib/http/httpcli.h"
#include "src/core/lib/http/parser.h"
+#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/security/credentials/jwt/jwt_credentials.h"
#include "src/core/lib/security/credentials/oauth2/oauth2_credentials.h"
#include "src/core/lib/support/env.h"
@@ -62,7 +63,7 @@ static gpr_once g_once = GPR_ONCE_INIT;
static void init_default_credentials(void) { gpr_mu_init(&g_state_mu); }
typedef struct {
- grpc_pollset *pollset;
+ grpc_polling_entity pollent;
int is_done;
int success;
} compute_engine_detector;
@@ -86,7 +87,7 @@ static void on_compute_engine_detection_http_response(
}
gpr_mu_lock(g_polling_mu);
detector->is_done = 1;
- grpc_pollset_kick(detector->pollset, NULL);
+ grpc_pollset_kick(grpc_polling_entity_pollset(&detector->pollent), NULL);
gpr_mu_unlock(g_polling_mu);
}
@@ -105,8 +106,9 @@ static int is_stack_running_on_compute_engine(void) {
on compute engine. */
gpr_timespec max_detection_delay = gpr_time_from_seconds(1, GPR_TIMESPAN);
- detector.pollset = gpr_malloc(grpc_pollset_size());
- grpc_pollset_init(detector.pollset, &g_polling_mu);
+ grpc_pollset *pollset = gpr_malloc(grpc_pollset_size());
+ grpc_pollset_init(pollset, &g_polling_mu);
+ detector.pollent = grpc_polling_entity_create_from_pollset(pollset);
detector.is_done = 0;
detector.success = 0;
@@ -117,7 +119,7 @@ static int is_stack_running_on_compute_engine(void) {
grpc_httpcli_context_init(&context);
grpc_httpcli_get(
- &exec_ctx, &context, detector.pollset, &request,
+ &exec_ctx, &context, &detector.pollent, &request,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), max_detection_delay),
on_compute_engine_detection_http_response, &detector);
@@ -128,19 +130,22 @@ static int is_stack_running_on_compute_engine(void) {
gpr_mu_lock(g_polling_mu);
while (!detector.is_done) {
grpc_pollset_worker *worker = NULL;
- grpc_pollset_work(&exec_ctx, detector.pollset, &worker,
- gpr_now(GPR_CLOCK_MONOTONIC),
+ grpc_pollset_work(&exec_ctx, grpc_polling_entity_pollset(&detector.pollent),
+ &worker, gpr_now(GPR_CLOCK_MONOTONIC),
gpr_inf_future(GPR_CLOCK_MONOTONIC));
}
gpr_mu_unlock(g_polling_mu);
grpc_httpcli_context_destroy(&context);
- grpc_closure_init(&destroy_closure, destroy_pollset, detector.pollset);
- grpc_pollset_shutdown(&exec_ctx, detector.pollset, &destroy_closure);
+ grpc_closure_init(&destroy_closure, destroy_pollset,
+ grpc_polling_entity_pollset(&detector.pollent));
+ grpc_pollset_shutdown(&exec_ctx,
+ grpc_polling_entity_pollset(&detector.pollent),
+ &destroy_closure);
grpc_exec_ctx_finish(&exec_ctx);
g_polling_mu = NULL;
- gpr_free(detector.pollset);
+ gpr_free(grpc_polling_entity_pollset(&detector.pollent));
return detector.success;
}
diff --git a/src/core/lib/security/credentials/iam/iam_credentials.c b/src/core/lib/security/credentials/iam/iam_credentials.c
index 89defa7c60..64d5871844 100644
--- a/src/core/lib/security/credentials/iam/iam_credentials.c
+++ b/src/core/lib/security/credentials/iam/iam_credentials.c
@@ -49,7 +49,7 @@ static void iam_destruct(grpc_call_credentials *creds) {
static void iam_get_request_metadata(grpc_exec_ctx *exec_ctx,
grpc_call_credentials *creds,
- grpc_pollset *pollset,
+ grpc_polling_entity *pollent,
grpc_auth_metadata_context context,
grpc_credentials_metadata_cb cb,
void *user_data) {
diff --git a/src/core/lib/security/credentials/jwt/jwt_credentials.c b/src/core/lib/security/credentials/jwt/jwt_credentials.c
index 8755a96af4..973fb75eaa 100644
--- a/src/core/lib/security/credentials/jwt/jwt_credentials.c
+++ b/src/core/lib/security/credentials/jwt/jwt_credentials.c
@@ -64,7 +64,7 @@ static void jwt_destruct(grpc_call_credentials *creds) {
static void jwt_get_request_metadata(grpc_exec_ctx *exec_ctx,
grpc_call_credentials *creds,
- grpc_pollset *pollset,
+ grpc_polling_entity *pollent,
grpc_auth_metadata_context context,
grpc_credentials_metadata_cb cb,
void *user_data) {
diff --git a/src/core/lib/security/credentials/jwt/jwt_verifier.c b/src/core/lib/security/credentials/jwt/jwt_verifier.c
index cd6c7ce392..1f67db103c 100644
--- a/src/core/lib/security/credentials/jwt/jwt_verifier.c
+++ b/src/core/lib/security/credentials/jwt/jwt_verifier.c
@@ -37,6 +37,7 @@
#include <string.h>
#include "src/core/lib/http/httpcli.h"
+#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/security/util/b64.h"
#include "src/core/lib/tsi/ssl_types.h"
@@ -321,7 +322,7 @@ grpc_jwt_verifier_status grpc_jwt_claims_check(const grpc_jwt_claims *claims,
typedef struct {
grpc_jwt_verifier *verifier;
- grpc_pollset *pollset;
+ grpc_polling_entity pollent;
jose_header *header;
grpc_jwt_claims *claims;
char *audience;
@@ -337,10 +338,11 @@ static verifier_cb_ctx *verifier_cb_ctx_create(
grpc_jwt_claims *claims, const char *audience, gpr_slice signature,
const char *signed_jwt, size_t signed_jwt_len, void *user_data,
grpc_jwt_verification_done_cb cb) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
verifier_cb_ctx *ctx = gpr_malloc(sizeof(verifier_cb_ctx));
memset(ctx, 0, sizeof(verifier_cb_ctx));
ctx->verifier = verifier;
- ctx->pollset = pollset;
+ ctx->pollent = grpc_polling_entity_create_from_pollset(pollset);
ctx->header = header;
ctx->audience = gpr_strdup(audience);
ctx->claims = claims;
@@ -348,6 +350,7 @@ static verifier_cb_ctx *verifier_cb_ctx_create(
ctx->signed_data = gpr_slice_from_copied_buffer(signed_jwt, signed_jwt_len);
ctx->user_data = user_data;
ctx->user_cb = cb;
+ grpc_exec_ctx_finish(&exec_ctx);
return ctx;
}
@@ -642,7 +645,7 @@ static void on_openid_config_retrieved(grpc_exec_ctx *exec_ctx, void *user_data,
*(req.host + (req.http.path - jwks_uri)) = '\0';
}
grpc_httpcli_get(
- exec_ctx, &ctx->verifier->http_ctx, ctx->pollset, &req,
+ exec_ctx, &ctx->verifier->http_ctx, &ctx->pollent, &req,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay),
on_keys_retrieved, ctx);
grpc_json_destroy(json);
@@ -745,7 +748,7 @@ static void retrieve_key_and_verify(grpc_exec_ctx *exec_ctx,
}
grpc_httpcli_get(
- exec_ctx, &ctx->verifier->http_ctx, ctx->pollset, &req,
+ exec_ctx, &ctx->verifier->http_ctx, &ctx->pollent, &req,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), grpc_jwt_verifier_max_delay),
http_cb, ctx);
gpr_free(req.host);
diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c
index 973c6e1d17..268026b9f0 100644
--- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.c
+++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.c
@@ -244,7 +244,7 @@ static void on_oauth2_token_fetcher_http_response(
static void oauth2_token_fetcher_get_request_metadata(
grpc_exec_ctx *exec_ctx, grpc_call_credentials *creds,
- grpc_pollset *pollset, grpc_auth_metadata_context context,
+ grpc_polling_entity *pollent, grpc_auth_metadata_context context,
grpc_credentials_metadata_cb cb, void *user_data) {
grpc_oauth2_token_fetcher_credentials *c =
(grpc_oauth2_token_fetcher_credentials *)creds;
@@ -270,7 +270,7 @@ static void oauth2_token_fetcher_get_request_metadata(
c->fetch_func(
exec_ctx,
grpc_credentials_metadata_request_create(creds, cb, user_data),
- &c->httpcli_context, pollset, on_oauth2_token_fetcher_http_response,
+ &c->httpcli_context, pollent, on_oauth2_token_fetcher_http_response,
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), refresh_threshold));
}
}
@@ -295,7 +295,7 @@ static grpc_call_credentials_vtable compute_engine_vtable = {
static void compute_engine_fetch_oauth2(
grpc_exec_ctx *exec_ctx, grpc_credentials_metadata_request *metadata_req,
- grpc_httpcli_context *httpcli_context, grpc_pollset *pollset,
+ grpc_httpcli_context *httpcli_context, grpc_polling_entity *pollent,
grpc_httpcli_response_cb response_cb, gpr_timespec deadline) {
grpc_http_header header = {"Metadata-Flavor", "Google"};
grpc_httpcli_request request;
@@ -304,7 +304,7 @@ static void compute_engine_fetch_oauth2(
request.http.path = GRPC_COMPUTE_ENGINE_METADATA_TOKEN_PATH;
request.http.hdr_count = 1;
request.http.hdrs = &header;
- grpc_httpcli_get(exec_ctx, httpcli_context, pollset, &request, deadline,
+ grpc_httpcli_get(exec_ctx, httpcli_context, pollent, &request, deadline,
response_cb, metadata_req);
}
@@ -336,7 +336,7 @@ static grpc_call_credentials_vtable refresh_token_vtable = {
static void refresh_token_fetch_oauth2(
grpc_exec_ctx *exec_ctx, grpc_credentials_metadata_request *metadata_req,
- grpc_httpcli_context *httpcli_context, grpc_pollset *pollset,
+ grpc_httpcli_context *httpcli_context, grpc_polling_entity *pollent,
grpc_httpcli_response_cb response_cb, gpr_timespec deadline) {
grpc_google_refresh_token_credentials *c =
(grpc_google_refresh_token_credentials *)metadata_req->creds;
@@ -353,7 +353,7 @@ static void refresh_token_fetch_oauth2(
request.http.hdr_count = 1;
request.http.hdrs = &header;
request.handshaker = &grpc_httpcli_ssl;
- grpc_httpcli_post(exec_ctx, httpcli_context, pollset, &request, body,
+ grpc_httpcli_post(exec_ctx, httpcli_context, pollent, &request, body,
strlen(body), deadline, response_cb, metadata_req);
gpr_free(body);
}
@@ -396,7 +396,7 @@ static void access_token_destruct(grpc_call_credentials *creds) {
static void access_token_get_request_metadata(
grpc_exec_ctx *exec_ctx, grpc_call_credentials *creds,
- grpc_pollset *pollset, grpc_auth_metadata_context context,
+ grpc_polling_entity *pollent, grpc_auth_metadata_context context,
grpc_credentials_metadata_cb cb, void *user_data) {
grpc_access_token_credentials *c = (grpc_access_token_credentials *)creds;
cb(exec_ctx, user_data, c->access_token_md->entries, 1, GRPC_CREDENTIALS_OK);
diff --git a/src/core/lib/security/credentials/oauth2/oauth2_credentials.h b/src/core/lib/security/credentials/oauth2/oauth2_credentials.h
index 04915b333f..3bc8360a41 100644
--- a/src/core/lib/security/credentials/oauth2/oauth2_credentials.h
+++ b/src/core/lib/security/credentials/oauth2/oauth2_credentials.h
@@ -70,7 +70,7 @@ void grpc_auth_refresh_token_destruct(grpc_auth_refresh_token *refresh_token);
typedef void (*grpc_fetch_oauth2_func)(grpc_exec_ctx *exec_ctx,
grpc_credentials_metadata_request *req,
grpc_httpcli_context *http_context,
- grpc_pollset *pollset,
+ grpc_polling_entity *pollent,
grpc_httpcli_response_cb response_cb,
gpr_timespec deadline);
typedef struct {
diff --git a/src/core/lib/security/credentials/plugin/plugin_credentials.c b/src/core/lib/security/credentials/plugin/plugin_credentials.c
index bae357321e..9fb55e8466 100644
--- a/src/core/lib/security/credentials/plugin/plugin_credentials.c
+++ b/src/core/lib/security/credentials/plugin/plugin_credentials.c
@@ -94,7 +94,7 @@ static void plugin_md_request_metadata_ready(void *request,
static void plugin_get_request_metadata(grpc_exec_ctx *exec_ctx,
grpc_call_credentials *creds,
- grpc_pollset *pollset,
+ grpc_polling_entity *pollent,
grpc_auth_metadata_context context,
grpc_credentials_metadata_cb cb,
void *user_data) {
diff --git a/src/core/lib/security/transport/client_auth_filter.c b/src/core/lib/security/transport/client_auth_filter.c
index 27208ebb60..76be2acd72 100644
--- a/src/core/lib/security/transport/client_auth_filter.c
+++ b/src/core/lib/security/transport/client_auth_filter.c
@@ -54,11 +54,11 @@ typedef struct {
grpc_call_credentials *creds;
grpc_mdstr *host;
grpc_mdstr *method;
- /* pollset bound to this call; if we need to make external
- network requests, they should be done under this pollset
- so that work can progress when this call wants work to
- progress */
- grpc_pollset *pollset;
+ /* pollset{_set} bound to this call; if we need to make external
+ network requests, they should be done under a pollset added to this
+ pollset_set so that work can progress when this call wants work to progress
+ */
+ grpc_polling_entity *pollent;
grpc_transport_stream_op op;
uint8_t security_context_set;
grpc_linked_mdelem md_links[MAX_CREDENTIALS_METADATA_COUNT];
@@ -184,9 +184,9 @@ static void send_security_metadata(grpc_exec_ctx *exec_ctx,
build_auth_metadata_context(&chand->security_connector->base,
chand->auth_context, calld);
calld->op = *op; /* Copy op (originates from the caller's stack). */
- GPR_ASSERT(calld->pollset);
+ GPR_ASSERT(calld->pollent != NULL);
grpc_call_credentials_get_request_metadata(
- exec_ctx, calld->creds, calld->pollset, calld->auth_md_context,
+ exec_ctx, calld->creds, calld->pollent, calld->auth_md_context,
on_credentials_metadata, elem);
}
@@ -270,10 +270,11 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
memset(calld, 0, sizeof(*calld));
}
-static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_pollset *pollset) {
+static void set_pollset_or_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_call_element *elem,
+ grpc_polling_entity *pollent) {
call_data *calld = elem->call_data;
- calld->pollset = pollset;
+ calld->pollent = pollent;
}
/* Destructor for call_data */
@@ -329,8 +330,14 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
GRPC_AUTH_CONTEXT_UNREF(chand->auth_context, "client_auth_filter");
}
-const grpc_channel_filter grpc_client_auth_filter = {
- auth_start_transport_op, grpc_channel_next_op, sizeof(call_data),
- init_call_elem, set_pollset, destroy_call_elem,
- sizeof(channel_data), init_channel_elem, destroy_channel_elem,
- grpc_call_next_get_peer, "client-auth"};
+const grpc_channel_filter grpc_client_auth_filter = {auth_start_transport_op,
+ grpc_channel_next_op,
+ sizeof(call_data),
+ init_call_elem,
+ set_pollset_or_pollset_set,
+ destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ grpc_call_next_get_peer,
+ "client-auth"};
diff --git a/src/core/lib/security/transport/server_auth_filter.c b/src/core/lib/security/transport/server_auth_filter.c
index 714e0adfc9..b36be127fd 100644
--- a/src/core/lib/security/transport/server_auth_filter.c
+++ b/src/core/lib/security/transport/server_auth_filter.c
@@ -220,9 +220,6 @@ static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
grpc_server_security_context_destroy;
}
-static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
- grpc_pollset *pollset) {}
-
/* Destructor for call_data */
static void destroy_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
const grpc_call_stats *stats, void *ignored) {}
@@ -258,7 +255,14 @@ static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
}
const grpc_channel_filter grpc_server_auth_filter = {
- auth_start_transport_op, grpc_channel_next_op, sizeof(call_data),
- init_call_elem, set_pollset, destroy_call_elem,
- sizeof(channel_data), init_channel_elem, destroy_channel_elem,
- grpc_call_next_get_peer, "server-auth"};
+ auth_start_transport_op,
+ grpc_channel_next_op,
+ sizeof(call_data),
+ init_call_elem,
+ grpc_call_stack_ignore_set_pollset_or_pollset_set,
+ destroy_call_elem,
+ sizeof(channel_data),
+ init_channel_elem,
+ destroy_channel_elem,
+ grpc_call_next_get_peer,
+ "server-auth"};
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 54c93f8829..a9b1e25a77 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -113,6 +113,7 @@ typedef struct batch_control {
struct grpc_call {
grpc_completion_queue *cq;
+ grpc_polling_entity pollent;
grpc_channel *channel;
grpc_call *parent;
grpc_call *first_child;
@@ -224,13 +225,11 @@ static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack,
static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
bool success);
-grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
- uint32_t propagation_mask,
- grpc_completion_queue *cq,
- const void *server_transport_data,
- grpc_mdelem **add_initial_metadata,
- size_t add_initial_metadata_count,
- gpr_timespec send_deadline) {
+grpc_call *grpc_call_create(
+ grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask,
+ grpc_completion_queue *cq, grpc_pollset_set *pollset_set_alternative,
+ const void *server_transport_data, grpc_mdelem **add_initial_metadata,
+ size_t add_initial_metadata_count, gpr_timespec send_deadline) {
size_t i, j;
grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel);
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
@@ -267,9 +266,20 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
call->context, server_transport_data,
CALL_STACK_FROM_CALL(call));
if (cq != NULL) {
+ GPR_ASSERT(
+ pollset_set_alternative == NULL &&
+ "Only one of 'cq' and 'pollset_set_alternative' should be non-NULL.");
GRPC_CQ_INTERNAL_REF(cq, "bind");
- grpc_call_stack_set_pollset(&exec_ctx, CALL_STACK_FROM_CALL(call),
- grpc_cq_pollset(cq));
+ call->pollent =
+ grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
+ }
+ if (pollset_set_alternative != NULL) {
+ call->pollent =
+ grpc_polling_entity_create_from_pollset_set(pollset_set_alternative);
+ }
+ if (!grpc_polling_entity_is_empty(&call->pollent)) {
+ grpc_call_stack_set_pollset_or_pollset_set(
+ &exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent);
}
if (parent_call != NULL) {
GRPC_CALL_INTERNAL_REF(parent_call, "child");
@@ -324,10 +334,16 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
void grpc_call_set_completion_queue(grpc_exec_ctx *exec_ctx, grpc_call *call,
grpc_completion_queue *cq) {
GPR_ASSERT(cq);
+
+ if (grpc_polling_entity_pollset_set(&call->pollent) != NULL) {
+ gpr_log(GPR_ERROR, "A pollset_set is already registered for this call.");
+ abort();
+ }
call->cq = cq;
GRPC_CQ_INTERNAL_REF(cq, "bind");
- grpc_call_stack_set_pollset(exec_ctx, CALL_STACK_FROM_CALL(call),
- grpc_cq_pollset(cq));
+ call->pollent = grpc_polling_entity_create_from_pollset(grpc_cq_pollset(cq));
+ grpc_call_stack_set_pollset_or_pollset_set(
+ exec_ctx, CALL_STACK_FROM_CALL(call), &call->pollent);
}
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
diff --git a/src/core/lib/surface/call.h b/src/core/lib/surface/call.h
index 2725e060b8..b640345c21 100644
--- a/src/core/lib/surface/call.h
+++ b/src/core/lib/surface/call.h
@@ -53,6 +53,8 @@ typedef void (*grpc_ioreq_completion_func)(grpc_exec_ctx *exec_ctx,
grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
uint32_t propagation_mask,
grpc_completion_queue *cq,
+ /* if not NULL, it'll be used in lieu of \a cq */
+ grpc_pollset_set *pollset_set_alternative,
const void *server_transport_data,
grpc_mdelem **add_initial_metadata,
size_t add_initial_metadata_count,
diff --git a/src/core/lib/surface/channel.c b/src/core/lib/surface/channel.c
index a71190f3c0..f0b3c2e15d 100644
--- a/src/core/lib/surface/channel.c
+++ b/src/core/lib/surface/channel.c
@@ -191,12 +191,14 @@ char *grpc_channel_get_target(grpc_channel *channel) {
static grpc_call *grpc_channel_create_call_internal(
grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask,
- grpc_completion_queue *cq, grpc_mdelem *path_mdelem,
- grpc_mdelem *authority_mdelem, gpr_timespec deadline) {
+ grpc_completion_queue *cq, grpc_pollset_set *pollset_set_alternative,
+ grpc_mdelem *path_mdelem, grpc_mdelem *authority_mdelem,
+ gpr_timespec deadline) {
grpc_mdelem *send_metadata[2];
size_t num_metadata = 0;
GPR_ASSERT(channel->is_client);
+ GPR_ASSERT(!(cq != NULL && pollset_set_alternative != NULL));
send_metadata[num_metadata++] = path_mdelem;
if (authority_mdelem != NULL) {
@@ -205,8 +207,9 @@ static grpc_call *grpc_channel_create_call_internal(
send_metadata[num_metadata++] = GRPC_MDELEM_REF(channel->default_authority);
}
- return grpc_call_create(channel, parent_call, propagation_mask, cq, NULL,
- send_metadata, num_metadata, deadline);
+ return grpc_call_create(channel, parent_call, propagation_mask, cq,
+ pollset_set_alternative, NULL, send_metadata,
+ num_metadata, deadline);
}
grpc_call *grpc_channel_create_call(grpc_channel *channel,
@@ -226,7 +229,22 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel,
(int)deadline.clock_type, reserved));
GPR_ASSERT(!reserved);
return grpc_channel_create_call_internal(
- channel, parent_call, propagation_mask, cq,
+ channel, parent_call, propagation_mask, cq, NULL,
+ grpc_mdelem_from_metadata_strings(GRPC_MDSTR_PATH,
+ grpc_mdstr_from_string(method)),
+ host ? grpc_mdelem_from_metadata_strings(GRPC_MDSTR_AUTHORITY,
+ grpc_mdstr_from_string(host))
+ : NULL,
+ deadline);
+}
+
+grpc_call *grpc_channel_create_pollset_set_call(
+ grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask,
+ grpc_pollset_set *pollset_set, const char *method, const char *host,
+ gpr_timespec deadline, void *reserved) {
+ GPR_ASSERT(!reserved);
+ return grpc_channel_create_call_internal(
+ channel, parent_call, propagation_mask, NULL, pollset_set,
grpc_mdelem_from_metadata_strings(GRPC_MDSTR_PATH,
grpc_mdstr_from_string(method)),
host ? grpc_mdelem_from_metadata_strings(GRPC_MDSTR_AUTHORITY,
@@ -270,7 +288,7 @@ grpc_call *grpc_channel_create_registered_call(
(int)deadline.tv_nsec, (int)deadline.clock_type, reserved));
GPR_ASSERT(!reserved);
return grpc_channel_create_call_internal(
- channel, parent_call, propagation_mask, completion_queue,
+ channel, parent_call, propagation_mask, completion_queue, NULL,
GRPC_MDELEM_REF(rc->path),
rc->authority ? GRPC_MDELEM_REF(rc->authority) : NULL, deadline);
}
diff --git a/src/core/lib/surface/channel.h b/src/core/lib/surface/channel.h
index c3279fef41..7eff7b8883 100644
--- a/src/core/lib/surface/channel.h
+++ b/src/core/lib/surface/channel.h
@@ -42,6 +42,11 @@ grpc_channel *grpc_channel_create(grpc_exec_ctx *exec_ctx, const char *target,
grpc_channel_stack_type channel_stack_type,
grpc_transport *optional_transport);
+grpc_call *grpc_channel_create_pollset_set_call(
+ grpc_channel *channel, grpc_call *parent_call, uint32_t propagation_mask,
+ grpc_pollset_set *pollset_set, const char *method, const char *host,
+ gpr_timespec deadline, void *reserved);
+
/** Get a (borrowed) pointer to this channels underlying channel stack */
grpc_channel_stack *grpc_channel_get_channel_stack(grpc_channel *channel);
diff --git a/src/core/lib/surface/lame_client.c b/src/core/lib/surface/lame_client.c
index eef862787f..8223555382 100644
--- a/src/core/lib/surface/lame_client.c
+++ b/src/core/lib/surface/lame_client.c
@@ -128,7 +128,7 @@ const grpc_channel_filter grpc_lame_filter = {
lame_start_transport_op,
sizeof(call_data),
init_call_elem,
- grpc_call_stack_ignore_set_pollset,
+ grpc_call_stack_ignore_set_pollset_or_pollset_set,
destroy_call_elem,
sizeof(channel_data),
init_channel_elem,
diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c
index 9532e090a4..88d898bc1c 100644
--- a/src/core/lib/surface/server.c
+++ b/src/core/lib/surface/server.c
@@ -793,9 +793,9 @@ static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd,
const void *transport_server_data) {
channel_data *chand = cd;
/* create a call */
- grpc_call *call =
- grpc_call_create(chand->channel, NULL, 0, NULL, transport_server_data,
- NULL, 0, gpr_inf_future(GPR_CLOCK_MONOTONIC));
+ grpc_call *call = grpc_call_create(chand->channel, NULL, 0, NULL, NULL,
+ transport_server_data, NULL, 0,
+ gpr_inf_future(GPR_CLOCK_MONOTONIC));
grpc_call_element *elem =
grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
call_data *calld = elem->call_data;
@@ -910,7 +910,7 @@ const grpc_channel_filter grpc_server_top_filter = {
grpc_channel_next_op,
sizeof(call_data),
init_call_elem,
- grpc_call_stack_ignore_set_pollset,
+ grpc_call_stack_ignore_set_pollset_or_pollset_set,
destroy_call_elem,
sizeof(channel_data),
init_channel_elem,
diff --git a/src/core/lib/transport/transport.c b/src/core/lib/transport/transport.c
index e6d524abe6..b65e157a02 100644
--- a/src/core/lib/transport/transport.c
+++ b/src/core/lib/transport/transport.c
@@ -125,10 +125,19 @@ void grpc_transport_perform_op(grpc_exec_ctx *exec_ctx,
transport->vtable->perform_op(exec_ctx, transport, op);
}
-void grpc_transport_set_pollset(grpc_exec_ctx *exec_ctx,
- grpc_transport *transport, grpc_stream *stream,
- grpc_pollset *pollset) {
- transport->vtable->set_pollset(exec_ctx, transport, stream, pollset);
+void grpc_transport_set_pops(grpc_exec_ctx *exec_ctx, grpc_transport *transport,
+ grpc_stream *stream,
+ grpc_polling_entity *pollent) {
+ grpc_pollset *pollset;
+ grpc_pollset_set *pollset_set;
+ if ((pollset = grpc_polling_entity_pollset(pollent)) != NULL) {
+ transport->vtable->set_pollset(exec_ctx, transport, stream, pollset);
+ } else if ((pollset_set = grpc_polling_entity_pollset_set(pollent)) != NULL) {
+ transport->vtable->set_pollset_set(exec_ctx, transport, stream,
+ pollset_set);
+ } else {
+ abort();
+ }
}
void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/transport/transport.h b/src/core/lib/transport/transport.h
index 482a9d1791..ed06fc3ed2 100644
--- a/src/core/lib/transport/transport.h
+++ b/src/core/lib/transport/transport.h
@@ -37,6 +37,7 @@
#include <stddef.h>
#include "src/core/lib/channel/context.h"
+#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/pollset.h"
#include "src/core/lib/iomgr/pollset_set.h"
#include "src/core/lib/transport/byte_stream.h"
@@ -197,9 +198,8 @@ int grpc_transport_init_stream(grpc_exec_ctx *exec_ctx,
grpc_stream_refcount *refcount,
const void *server_data);
-void grpc_transport_set_pollset(grpc_exec_ctx *exec_ctx,
- grpc_transport *transport, grpc_stream *stream,
- grpc_pollset *pollset);
+void grpc_transport_set_pops(grpc_exec_ctx *exec_ctx, grpc_transport *transport,
+ grpc_stream *stream, grpc_polling_entity *pollent);
/* Destroy transport data for a stream.
diff --git a/src/core/lib/transport/transport_impl.h b/src/core/lib/transport/transport_impl.h
index 956155eec8..fc7140671b 100644
--- a/src/core/lib/transport/transport_impl.h
+++ b/src/core/lib/transport/transport_impl.h
@@ -53,6 +53,10 @@ typedef struct grpc_transport_vtable {
void (*set_pollset)(grpc_exec_ctx *exec_ctx, grpc_transport *self,
grpc_stream *stream, grpc_pollset *pollset);
+ /* implementation of grpc_transport_set_pollset */
+ void (*set_pollset_set)(grpc_exec_ctx *exec_ctx, grpc_transport *self,
+ grpc_stream *stream, grpc_pollset_set *pollset_set);
+
/* implementation of grpc_transport_perform_stream_op */
void (*perform_stream_op)(grpc_exec_ctx *exec_ctx, grpc_transport *self,
grpc_stream *stream, grpc_transport_stream_op *op);
diff --git a/src/core/plugin_registry/grpc_cronet_plugin_registry.c b/src/core/plugin_registry/grpc_cronet_plugin_registry.c
new file mode 100644
index 0000000000..d0b5f5c702
--- /dev/null
+++ b/src/core/plugin_registry/grpc_cronet_plugin_registry.c
@@ -0,0 +1,46 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/grpc.h>
+
+extern void grpc_chttp2_plugin_init(void);
+extern void grpc_chttp2_plugin_shutdown(void);
+extern void grpc_client_config_init(void);
+extern void grpc_client_config_shutdown(void);
+
+void grpc_register_built_in_plugins(void) {
+ grpc_register_plugin(grpc_chttp2_plugin_init,
+ grpc_chttp2_plugin_shutdown);
+ grpc_register_plugin(grpc_client_config_init,
+ grpc_client_config_shutdown);
+}
diff --git a/src/csharp/Grpc.Core.Tests/AppDomainUnloadTest.cs b/src/csharp/Grpc.Core.Tests/AppDomainUnloadTest.cs
new file mode 100644
index 0000000000..e605a310f9
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/AppDomainUnloadTest.cs
@@ -0,0 +1,90 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Diagnostics;
+using System.Linq;
+using System.Reflection;
+using System.Threading;
+using System.Threading.Tasks;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+namespace Grpc.Core.Tests
+{
+ public class AppDomainUnloadTest
+ {
+ [Test]
+ public void AppDomainUnloadHookCanCleanupAbandonedCall()
+ {
+ var setup = new AppDomainSetup
+ {
+ ApplicationBase = AppDomain.CurrentDomain.BaseDirectory
+ };
+ var childDomain = AppDomain.CreateDomain("test", null, setup);
+ var remoteObj = childDomain.CreateInstance(typeof(AppDomainTestClass).Assembly.GetName().Name, typeof(AppDomainTestClass).FullName);
+
+ // Try to unload the appdomain once we've created a server and a channel inside the appdomain.
+ AppDomain.Unload(childDomain);
+ }
+
+ public class AppDomainTestClass
+ {
+ const string Host = "127.0.0.1";
+
+ /// <summary>
+ /// Creates a server and a channel and initiates a call. The code is invoked from inside of an AppDomain
+ /// to test if AppDomain.Unload() work if Grpc is being used.
+ /// </summary>
+ public AppDomainTestClass()
+ {
+ var helper = new MockServiceHelper(Host);
+ var server = helper.GetServer();
+ server.Start();
+ var channel = helper.GetChannel();
+
+ var readyToShutdown = new TaskCompletionSource<object>();
+ helper.DuplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) =>
+ {
+ readyToShutdown.SetResult(null);
+ await requestStream.ToListAsync();
+ });
+
+ var call = Calls.AsyncDuplexStreamingCall(helper.CreateDuplexStreamingCall());
+ readyToShutdown.Task.Wait(); // make sure handler is running
+ }
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core.Tests/ChannelTest.cs b/src/csharp/Grpc.Core.Tests/ChannelTest.cs
index 850d70ce92..db0ef3a4cd 100644
--- a/src/csharp/Grpc.Core.Tests/ChannelTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ChannelTest.cs
@@ -71,7 +71,7 @@ namespace Grpc.Core.Tests
public void WaitForStateChangedAsync_InvalidArgument()
{
var channel = new Channel("localhost", ChannelCredentials.Insecure);
- Assert.ThrowsAsync(typeof(ArgumentException), async () => await channel.WaitForStateChangedAsync(ChannelState.FatalFailure));
+ Assert.ThrowsAsync(typeof(ArgumentException), async () => await channel.WaitForStateChangedAsync(ChannelState.Shutdown));
channel.ShutdownAsync().Wait();
}
@@ -102,11 +102,11 @@ namespace Grpc.Core.Tests
}
[Test]
- public async Task StateIsFatalFailureAfterShutdown()
+ public async Task StateIsShutdownAfterShutdown()
{
var channel = new Channel("localhost", ChannelCredentials.Insecure);
await channel.ShutdownAsync();
- Assert.AreEqual(ChannelState.FatalFailure, channel.State);
+ Assert.AreEqual(ChannelState.Shutdown, channel.State);
}
[Test]
diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
index 47131fc454..074c9603dc 100644
--- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
+++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
@@ -86,6 +86,10 @@
<Compile Include="NUnitMain.cs" />
<Compile Include="Internal\FakeNativeCall.cs" />
<Compile Include="Internal\AsyncCallServerTest.cs" />
+ <Compile Include="ShutdownHookServerTest.cs" />
+ <Compile Include="ShutdownHookPendingCallTest.cs" />
+ <Compile Include="ShutdownHookClientTest.cs" />
+ <Compile Include="AppDomainUnloadTest.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>
diff --git a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
index 6fe382751a..3ec2cf48cd 100644
--- a/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
+++ b/src/csharp/Grpc.Core.Tests/GrpcEnvironmentTest.cs
@@ -49,7 +49,7 @@ namespace Grpc.Core.Tests
{
Assert.IsNotNull(env.CompletionQueues.ElementAt(i));
}
- GrpcEnvironment.Release();
+ GrpcEnvironment.ReleaseAsync().Wait();
}
[Test]
@@ -58,8 +58,8 @@ namespace Grpc.Core.Tests
var env1 = GrpcEnvironment.AddRef();
var env2 = GrpcEnvironment.AddRef();
Assert.AreSame(env1, env2);
- GrpcEnvironment.Release();
- GrpcEnvironment.Release();
+ GrpcEnvironment.ReleaseAsync().Wait();
+ GrpcEnvironment.ReleaseAsync().Wait();
}
[Test]
@@ -68,10 +68,10 @@ namespace Grpc.Core.Tests
Assert.AreEqual(0, GrpcEnvironment.GetRefCount());
var env1 = GrpcEnvironment.AddRef();
- GrpcEnvironment.Release();
+ GrpcEnvironment.ReleaseAsync().Wait();
var env2 = GrpcEnvironment.AddRef();
- GrpcEnvironment.Release();
+ GrpcEnvironment.ReleaseAsync().Wait();
Assert.AreNotSame(env1, env2);
}
@@ -80,7 +80,7 @@ namespace Grpc.Core.Tests
public void ReleaseWithoutAddRef()
{
Assert.AreEqual(0, GrpcEnvironment.GetRefCount());
- Assert.Throws(typeof(InvalidOperationException), () => GrpcEnvironment.Release());
+ Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await GrpcEnvironment.ReleaseAsync());
}
[Test]
diff --git a/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs b/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs
index 195119f920..e9ec59eb3d 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/CompletionQueueSafeHandleTest.cs
@@ -48,7 +48,7 @@ namespace Grpc.Core.Internal.Tests
GrpcEnvironment.AddRef();
var cq = CompletionQueueSafeHandle.Create();
cq.Dispose();
- GrpcEnvironment.Release();
+ GrpcEnvironment.ReleaseAsync().Wait();
}
[Test]
@@ -59,7 +59,7 @@ namespace Grpc.Core.Internal.Tests
cq.Shutdown();
var ev = cq.Next();
cq.Dispose();
- GrpcEnvironment.Release();
+ GrpcEnvironment.ReleaseAsync().Wait();
Assert.AreEqual(CompletionQueueEvent.CompletionType.Shutdown, ev.type);
Assert.AreNotEqual(IntPtr.Zero, ev.success);
Assert.AreEqual(IntPtr.Zero, ev.tag);
diff --git a/src/csharp/Grpc.Core.Tests/MockServiceHelper.cs b/src/csharp/Grpc.Core.Tests/MockServiceHelper.cs
index 3047314345..4d90470056 100644
--- a/src/csharp/Grpc.Core.Tests/MockServiceHelper.cs
+++ b/src/csharp/Grpc.Core.Tests/MockServiceHelper.cs
@@ -102,7 +102,7 @@ namespace Grpc.Core.Tests
marshaller,
marshaller);
- serviceDefinition = ServerServiceDefinition.CreateBuilder(ServiceName)
+ serviceDefinition = ServerServiceDefinition.CreateBuilder()
.AddMethod(unaryMethod, (request, context) => unaryHandler(request, context))
.AddMethod(clientStreamingMethod, (requestStream, context) => clientStreamingHandler(requestStream, context))
.AddMethod(serverStreamingMethod, (request, responseStream, context) => serverStreamingHandler(request, responseStream, context))
diff --git a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
index d2b2fc6a66..d3735c7880 100644
--- a/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
+++ b/src/csharp/Grpc.Core.Tests/PInvokeTest.cs
@@ -65,7 +65,7 @@ namespace Grpc.Core.Tests
cq.Dispose();
});
- GrpcEnvironment.Release();
+ GrpcEnvironment.ReleaseAsync().Wait();
}
/// <summary>
diff --git a/src/csharp/Grpc.Core.Tests/ServerTest.cs b/src/csharp/Grpc.Core.Tests/ServerTest.cs
index b40508accc..fa693162ad 100644
--- a/src/csharp/Grpc.Core.Tests/ServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ServerTest.cs
@@ -89,7 +89,7 @@ namespace Grpc.Core.Tests
};
server.Start();
Assert.Throws(typeof(InvalidOperationException), () => server.Ports.Add("localhost", 9999, ServerCredentials.Insecure));
- Assert.Throws(typeof(InvalidOperationException), () => server.Services.Add(ServerServiceDefinition.CreateBuilder("serviceName").Build()));
+ Assert.Throws(typeof(InvalidOperationException), () => server.Services.Add(ServerServiceDefinition.CreateBuilder().Build()));
server.ShutdownAsync().Wait();
}
diff --git a/src/csharp/Grpc.Core.Tests/ShutdownHookClientTest.cs b/src/csharp/Grpc.Core.Tests/ShutdownHookClientTest.cs
new file mode 100644
index 0000000000..12b8452f64
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/ShutdownHookClientTest.cs
@@ -0,0 +1,57 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Diagnostics;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+namespace Grpc.Core.Tests
+{
+ public class ShutdownHookClientTest
+ {
+ const string Host = "127.0.0.1";
+
+ [Test]
+ public void ProcessExitHookCanCleanupAbandonedChannels()
+ {
+ var channel = new Channel(Host, 1000, ChannelCredentials.Insecure);
+ var channel2 = new Channel(Host, 1001, ChannelCredentials.Insecure);
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core.Tests/ShutdownHookPendingCallTest.cs b/src/csharp/Grpc.Core.Tests/ShutdownHookPendingCallTest.cs
new file mode 100644
index 0000000000..175233840d
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/ShutdownHookPendingCallTest.cs
@@ -0,0 +1,69 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Diagnostics;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+namespace Grpc.Core.Tests
+{
+ public class ShutdownHookPendingCallTest
+ {
+ const string Host = "127.0.0.1";
+
+ [Test]
+ public void ProcessExitHookCanCleanupAbandonedCall()
+ {
+ var helper = new MockServiceHelper(Host);
+ var server = helper.GetServer();
+ server.Start();
+ var channel = helper.GetChannel();
+
+ var readyToShutdown = new TaskCompletionSource<object>();
+ helper.DuplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) =>
+ {
+ readyToShutdown.SetResult(null);
+ await requestStream.ToListAsync();
+ });
+
+ var call = Calls.AsyncDuplexStreamingCall(helper.CreateDuplexStreamingCall());
+ readyToShutdown.Task.Wait(); // make sure handler is running
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core.Tests/ShutdownHookServerTest.cs b/src/csharp/Grpc.Core.Tests/ShutdownHookServerTest.cs
new file mode 100644
index 0000000000..e7ea7a0bf5
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/ShutdownHookServerTest.cs
@@ -0,0 +1,58 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Diagnostics;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Grpc.Core;
+using Grpc.Core.Internal;
+using Grpc.Core.Utils;
+using NUnit.Framework;
+
+namespace Grpc.Core.Tests
+{
+ public class ShutdownHookServerTest
+ {
+ const string Host = "127.0.0.1";
+
+ [Test]
+ public void ProcessExitHookCanCleanupAbandonedServers()
+ {
+ var helper = new MockServiceHelper(Host);
+ var server = helper.GetServer();
+ server.Start();
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
index 5646fed3d9..02b08d2a10 100644
--- a/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
+++ b/src/csharp/Grpc.Core/AsyncClientStreamingCall.cs
@@ -127,6 +127,10 @@ namespace Grpc.Core
/// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call.
/// As a result, all resources being used by the call should be released eventually.
/// </summary>
+ /// <remarks>
+ /// Normally, there is no need for you to dispose the call unless you want to utilize the
+ /// "Cancel" semantics of invoking <c>Dispose</c>.
+ /// </remarks>
public void Dispose()
{
disposeAction.Invoke();
diff --git a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
index e75108c7e5..68fd6d0b05 100644
--- a/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
+++ b/src/csharp/Grpc.Core/AsyncDuplexStreamingCall.cs
@@ -117,6 +117,10 @@ namespace Grpc.Core
/// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call.
/// As a result, all resources being used by the call should be released eventually.
/// </summary>
+ /// <remarks>
+ /// Normally, there is no need for you to dispose the call unless you want to utilize the
+ /// "Cancel" semantics of invoking <c>Dispose</c>.
+ /// </remarks>
public void Dispose()
{
disposeAction.Invoke();
diff --git a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
index f953091984..5777c72615 100644
--- a/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
+++ b/src/csharp/Grpc.Core/AsyncServerStreamingCall.cs
@@ -103,6 +103,10 @@ namespace Grpc.Core
/// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call.
/// As a result, all resources being used by the call should be released eventually.
/// </summary>
+ /// <remarks>
+ /// Normally, there is no need for you to dispose the call unless you want to utilize the
+ /// "Cancel" semantics of invoking <c>Dispose</c>.
+ /// </remarks>
public void Dispose()
{
disposeAction.Invoke();
diff --git a/src/csharp/Grpc.Core/AsyncUnaryCall.cs b/src/csharp/Grpc.Core/AsyncUnaryCall.cs
index 97df8f5e91..d180c27922 100644
--- a/src/csharp/Grpc.Core/AsyncUnaryCall.cs
+++ b/src/csharp/Grpc.Core/AsyncUnaryCall.cs
@@ -112,6 +112,10 @@ namespace Grpc.Core
/// Otherwise, requests cancellation of the call which should terminate all pending async operations associated with the call.
/// As a result, all resources being used by the call should be released eventually.
/// </summary>
+ /// <remarks>
+ /// Normally, there is no need for you to dispose the call unless you want to utilize the
+ /// "Cancel" semantics of invoking <c>Dispose</c>.
+ /// </remarks>
public void Dispose()
{
disposeAction.Invoke();
diff --git a/src/csharp/Grpc.Core/CallOptions.cs b/src/csharp/Grpc.Core/CallOptions.cs
index 9ca88849ee..35548cfc96 100644
--- a/src/csharp/Grpc.Core/CallOptions.cs
+++ b/src/csharp/Grpc.Core/CallOptions.cs
@@ -88,7 +88,13 @@ namespace Grpc.Core
}
/// <summary>
- /// Token that can be used for cancelling the call.
+ /// Token that can be used for cancelling the call on the client side.
+ /// Cancelling the token will request cancellation
+ /// of the remote call. Best effort will be made to deliver the cancellation
+ /// notification to the server and interaction of the call with the server side
+ /// will be terminated. Unless the call finishes before the cancellation could
+ /// happen (there is an inherent race),
+ /// the call will finish with <c>StatusCode.Cancelled</c> status.
/// </summary>
public CancellationToken CancellationToken
{
diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs
index 886adfec33..4f29c35b32 100644
--- a/src/csharp/Grpc.Core/Channel.cs
+++ b/src/csharp/Grpc.Core/Channel.cs
@@ -67,8 +67,19 @@ namespace Grpc.Core
/// </summary>
/// <param name="target">Target of the channel.</param>
/// <param name="credentials">Credentials to secure the channel.</param>
+ public Channel(string target, ChannelCredentials credentials) :
+ this(target, credentials, null)
+ {
+ }
+
+ /// <summary>
+ /// Creates a channel that connects to a specific host.
+ /// Port will default to 80 for an unsecure channel and to 443 for a secure channel.
+ /// </summary>
+ /// <param name="target">Target of the channel.</param>
+ /// <param name="credentials">Credentials to secure the channel.</param>
/// <param name="options">Channel options.</param>
- public Channel(string target, ChannelCredentials credentials, IEnumerable<ChannelOption> options = null)
+ public Channel(string target, ChannelCredentials credentials, IEnumerable<ChannelOption> options)
{
this.target = GrpcPreconditions.CheckNotNull(target, "target");
this.options = CreateOptionsDictionary(options);
@@ -88,6 +99,18 @@ namespace Grpc.Core
this.handle = ChannelSafeHandle.CreateInsecure(target, nativeChannelArgs);
}
}
+ GrpcEnvironment.RegisterChannel(this);
+ }
+
+ /// <summary>
+ /// Creates a channel that connects to a specific host and port.
+ /// </summary>
+ /// <param name="host">The name or IP address of the host.</param>
+ /// <param name="port">The port.</param>
+ /// <param name="credentials">Credentials to secure the channel.</param>
+ public Channel(string host, int port, ChannelCredentials credentials) :
+ this(host, port, credentials, null)
+ {
}
/// <summary>
@@ -97,14 +120,14 @@ namespace Grpc.Core
/// <param name="port">The port.</param>
/// <param name="credentials">Credentials to secure the channel.</param>
/// <param name="options">Channel options.</param>
- public Channel(string host, int port, ChannelCredentials credentials, IEnumerable<ChannelOption> options = null) :
+ public Channel(string host, int port, ChannelCredentials credentials, IEnumerable<ChannelOption> options) :
this(string.Format("{0}:{1}", host, port), credentials, options)
{
}
/// <summary>
/// Gets current connectivity state of this channel.
- /// After channel is has been shutdown, <c>ChannelState.FatalFailure</c> will be returned.
+ /// After channel is has been shutdown, <c>ChannelState.Shutdown</c> will be returned.
/// </summary>
public ChannelState State
{
@@ -121,8 +144,8 @@ namespace Grpc.Core
/// </summary>
public Task WaitForStateChangedAsync(ChannelState lastObservedState, DateTime? deadline = null)
{
- GrpcPreconditions.CheckArgument(lastObservedState != ChannelState.FatalFailure,
- "FatalFailure is a terminal state. No further state changes can occur.");
+ GrpcPreconditions.CheckArgument(lastObservedState != ChannelState.Shutdown,
+ "Shutdown is a terminal state. No further state changes can occur.");
var tcs = new TaskCompletionSource<object>();
var deadlineTimespec = deadline.HasValue ? Timespec.FromDateTime(deadline.Value) : Timespec.InfFuture;
var handler = new BatchCompletionDelegate((success, ctx) =>
@@ -172,7 +195,7 @@ namespace Grpc.Core
/// <summary>
/// Allows explicitly requesting channel to connect without starting an RPC.
/// Returned task completes once state Ready was seen. If the deadline is reached,
- /// or channel enters the FatalFailure state, the task is cancelled.
+ /// or channel enters the Shutdown state, the task is cancelled.
/// There is no need to call this explicitly unless your use case requires that.
/// Starting an RPC on a new channel will request connection implicitly.
/// </summary>
@@ -182,9 +205,9 @@ namespace Grpc.Core
var currentState = GetConnectivityState(true);
while (currentState != ChannelState.Ready)
{
- if (currentState == ChannelState.FatalFailure)
+ if (currentState == ChannelState.Shutdown)
{
- throw new OperationCanceledException("Channel has reached FatalFailure state.");
+ throw new OperationCanceledException("Channel has reached Shutdown state.");
}
await WaitForStateChangedAsync(currentState, deadline).ConfigureAwait(false);
currentState = GetConnectivityState(false);
@@ -192,9 +215,16 @@ namespace Grpc.Core
}
/// <summary>
- /// Waits until there are no more active calls for this channel and then cleans up
- /// resources used by this channel.
+ /// Shuts down the channel cleanly. It is strongly recommended to shutdown
+ /// all previously created channels before exiting from the process.
/// </summary>
+ /// <remarks>
+ /// This method doesn't wait for all calls on this channel to finish (nor does
+ /// it explicitly cancel all outstanding calls). It is user's responsibility to make sure
+ /// all the calls on this channel have finished (successfully or with an error)
+ /// before shutting down the channel to ensure channel shutdown won't impact
+ /// the outcome of those remote calls.
+ /// </remarks>
public async Task ShutdownAsync()
{
lock (myLock)
@@ -202,6 +232,7 @@ namespace Grpc.Core
GrpcPreconditions.CheckState(!shutdownRequested);
shutdownRequested = true;
}
+ GrpcEnvironment.UnregisterChannel(this);
shutdownTokenSource.Cancel();
@@ -213,7 +244,7 @@ namespace Grpc.Core
handle.Dispose();
- await Task.Run(() => GrpcEnvironment.Release()).ConfigureAwait(false);
+ await GrpcEnvironment.ReleaseAsync().ConfigureAwait(false);
}
internal ChannelSafeHandle Handle
@@ -264,7 +295,7 @@ namespace Grpc.Core
}
catch (ObjectDisposedException)
{
- return ChannelState.FatalFailure;
+ return ChannelState.Shutdown;
}
}
diff --git a/src/csharp/Grpc.Core/ChannelState.cs b/src/csharp/Grpc.Core/ChannelState.cs
index d293b98f75..a6c3b2a488 100644
--- a/src/csharp/Grpc.Core/ChannelState.cs
+++ b/src/csharp/Grpc.Core/ChannelState.cs
@@ -64,6 +64,6 @@ namespace Grpc.Core
/// <summary>
/// Channel has seen a failure that it cannot recover from
/// </summary>
- FatalFailure
+ Shutdown
}
}
diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs
index 18af1099f1..0359d9092a 100644
--- a/src/csharp/Grpc.Core/GrpcEnvironment.cs
+++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs
@@ -35,6 +35,7 @@ using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
+using System.Threading.Tasks;
using Grpc.Core.Internal;
using Grpc.Core.Logging;
using Grpc.Core.Utils;
@@ -53,12 +54,16 @@ namespace Grpc.Core
static int refCount;
static int? customThreadPoolSize;
static int? customCompletionQueueCount;
+ static readonly HashSet<Channel> registeredChannels = new HashSet<Channel>();
+ static readonly HashSet<Server> registeredServers = new HashSet<Server>();
static ILogger logger = new ConsoleLogger();
+ readonly object myLock = new object();
readonly GrpcThreadPool threadPool;
readonly DebugStats debugStats = new DebugStats();
readonly AtomicCounter cqPickerCounter = new AtomicCounter();
+
bool isClosed;
/// <summary>
@@ -67,6 +72,8 @@ namespace Grpc.Core
/// </summary>
internal static GrpcEnvironment AddRef()
{
+ ShutdownHooks.Register();
+
lock (staticLock)
{
refCount++;
@@ -79,21 +86,26 @@ namespace Grpc.Core
}
/// <summary>
- /// Decrements the reference count for currently active environment and shuts down the gRPC environment if reference count drops to zero.
- /// (and blocks until the environment has been fully shutdown).
+ /// Decrements the reference count for currently active environment and asynchronously shuts down the gRPC environment if reference count drops to zero.
/// </summary>
- internal static void Release()
+ internal static async Task ReleaseAsync()
{
+ GrpcEnvironment instanceToShutdown = null;
lock (staticLock)
{
GrpcPreconditions.CheckState(refCount > 0);
refCount--;
if (refCount == 0)
{
- instance.Close();
+ instanceToShutdown = instance;
instance = null;
}
}
+
+ if (instanceToShutdown != null)
+ {
+ await instanceToShutdown.ShutdownAsync();
+ }
}
internal static int GetRefCount()
@@ -104,6 +116,68 @@ namespace Grpc.Core
}
}
+ internal static void RegisterChannel(Channel channel)
+ {
+ lock (staticLock)
+ {
+ GrpcPreconditions.CheckNotNull(channel);
+ registeredChannels.Add(channel);
+ }
+ }
+
+ internal static void UnregisterChannel(Channel channel)
+ {
+ lock (staticLock)
+ {
+ GrpcPreconditions.CheckNotNull(channel);
+ GrpcPreconditions.CheckArgument(registeredChannels.Remove(channel), "Channel not found in the registered channels set.");
+ }
+ }
+
+ internal static void RegisterServer(Server server)
+ {
+ lock (staticLock)
+ {
+ GrpcPreconditions.CheckNotNull(server);
+ registeredServers.Add(server);
+ }
+ }
+
+ internal static void UnregisterServer(Server server)
+ {
+ lock (staticLock)
+ {
+ GrpcPreconditions.CheckNotNull(server);
+ GrpcPreconditions.CheckArgument(registeredServers.Remove(server), "Server not found in the registered servers set.");
+ }
+ }
+
+ /// <summary>
+ /// Requests shutdown of all channels created by the current process.
+ /// </summary>
+ public static Task ShutdownChannelsAsync()
+ {
+ HashSet<Channel> snapshot = null;
+ lock (staticLock)
+ {
+ snapshot = new HashSet<Channel>(registeredChannels);
+ }
+ return Task.WhenAll(snapshot.Select((channel) => channel.ShutdownAsync()));
+ }
+
+ /// <summary>
+ /// Requests immediate shutdown of all servers created by the current process.
+ /// </summary>
+ public static Task KillServersAsync()
+ {
+ HashSet<Server> snapshot = null;
+ lock (staticLock)
+ {
+ snapshot = new HashSet<Server>(registeredServers);
+ }
+ return Task.WhenAll(snapshot.Select((server) => server.KillAsync()));
+ }
+
/// <summary>
/// Gets application-wide logger used by gRPC.
/// </summary>
@@ -180,6 +254,14 @@ namespace Grpc.Core
}
}
+ internal bool IsAlive
+ {
+ get
+ {
+ return this.threadPool.IsAlive;
+ }
+ }
+
/// <summary>
/// Picks a completion queue in a round-robin fashion.
/// Shouldn't be invoked on a per-call basis (used at per-channel basis).
@@ -223,13 +305,13 @@ namespace Grpc.Core
/// <summary>
/// Shuts down this environment.
/// </summary>
- private void Close()
+ private async Task ShutdownAsync()
{
if (isClosed)
{
throw new InvalidOperationException("Close has already been called");
}
- threadPool.Stop();
+ await threadPool.StopAsync().ConfigureAwait(false);
GrpcNativeShutdown();
isClosed = true;
@@ -257,5 +339,32 @@ namespace Grpc.Core
// by default, create a completion queue for each thread
return GetThreadPoolSizeOrDefault();
}
+
+ private static class ShutdownHooks
+ {
+ static object staticLock = new object();
+ static bool hooksRegistered;
+
+ public static void Register()
+ {
+ lock (staticLock)
+ {
+ if (!hooksRegistered)
+ {
+ AppDomain.CurrentDomain.ProcessExit += ShutdownHookHandler;
+ AppDomain.CurrentDomain.DomainUnload += ShutdownHookHandler;
+ }
+ hooksRegistered = true;
+ }
+ }
+
+ /// <summary>
+ /// Handler for AppDomain.DomainUnload and AppDomain.ProcessExit hooks.
+ /// </summary>
+ private static void ShutdownHookHandler(object sender, EventArgs e)
+ {
+ Task.WaitAll(GrpcEnvironment.ShutdownChannelsAsync(), GrpcEnvironment.KillServersAsync());
+ }
+ }
}
}
diff --git a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
index 4de543bef7..a446c1f99f 100644
--- a/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
+++ b/src/csharp/Grpc.Core/Internal/GrpcThreadPool.cs
@@ -35,6 +35,7 @@ using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
+using System.Threading.Tasks;
using Grpc.Core.Logging;
using Grpc.Core.Utils;
@@ -53,6 +54,8 @@ namespace Grpc.Core.Internal
readonly int poolSize;
readonly int completionQueueCount;
+ bool stopRequested;
+
IReadOnlyCollection<CompletionQueueSafeHandle> completionQueues;
/// <summary>
@@ -84,15 +87,21 @@ namespace Grpc.Core.Internal
}
}
- public void Stop()
+ public Task StopAsync()
{
lock (myLock)
{
+ GrpcPreconditions.CheckState(!stopRequested, "Stop already requested.");
+ stopRequested = true;
+
foreach (var cq in completionQueues)
{
cq.Shutdown();
}
+ }
+ return Task.Run(() =>
+ {
foreach (var thread in threads)
{
thread.Join();
@@ -102,6 +111,21 @@ namespace Grpc.Core.Internal
{
cq.Dispose();
}
+ });
+ }
+
+ /// <summary>
+ /// Returns true if there is at least one thread pool thread that hasn't
+ /// already stopped.
+ /// Threads can either stop because all completion queues shut down or
+ /// because all foreground threads have already shutdown and process is
+ /// going to exit.
+ /// </summary>
+ internal bool IsAlive
+ {
+ get
+ {
+ return threads.Any(t => t.ThreadState != ThreadState.Stopped);
}
}
@@ -119,7 +143,7 @@ namespace Grpc.Core.Internal
var cq = completionQueues.ElementAt(cqIndex);
var thread = new Thread(new ThreadStart(() => RunHandlerLoop(cq)));
- thread.IsBackground = false;
+ thread.IsBackground = true;
thread.Name = string.Format("grpc {0} (cq {1})", threadIndex, cqIndex);
thread.Start();
diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs
index 069185e13a..ae7a8c9a9a 100644
--- a/src/csharp/Grpc.Core/Server.cs
+++ b/src/csharp/Grpc.Core/Server.cs
@@ -67,11 +67,19 @@ namespace Grpc.Core
bool startRequested;
volatile bool shutdownRequested;
+
+ /// <summary>
+ /// Creates a new server.
+ /// </summary>
+ public Server() : this(null)
+ {
+ }
+
/// <summary>
- /// Create a new server.
+ /// Creates a new server.
/// </summary>
/// <param name="options">Channel options.</param>
- public Server(IEnumerable<ChannelOption> options = null)
+ public Server(IEnumerable<ChannelOption> options)
{
this.serviceDefinitions = new ServiceDefinitionCollection(this);
this.ports = new ServerPortCollection(this);
@@ -86,6 +94,7 @@ namespace Grpc.Core
{
this.handle.RegisterCompletionQueue(cq);
}
+ GrpcEnvironment.RegisterServer(this);
}
/// <summary>
@@ -152,43 +161,24 @@ namespace Grpc.Core
/// cleans up used resources. The returned task finishes when shutdown procedure
/// is complete.
/// </summary>
- public async Task ShutdownAsync()
+ /// <remarks>
+ /// It is strongly recommended to shutdown all previously created servers before exiting from the process.
+ /// </remarks>
+ public Task ShutdownAsync()
{
- lock (myLock)
- {
- GrpcPreconditions.CheckState(startRequested);
- GrpcPreconditions.CheckState(!shutdownRequested);
- shutdownRequested = true;
- }
-
- var cq = environment.CompletionQueues.First(); // any cq will do
- handle.ShutdownAndNotify(HandleServerShutdown, cq);
- await shutdownTcs.Task.ConfigureAwait(false);
- DisposeHandle();
-
- await Task.Run(() => GrpcEnvironment.Release()).ConfigureAwait(false);
+ return ShutdownInternalAsync(false);
}
/// <summary>
/// Requests server shutdown while cancelling all the in-progress calls.
/// The returned task finishes when shutdown procedure is complete.
/// </summary>
- public async Task KillAsync()
+ /// <remarks>
+ /// It is strongly recommended to shutdown all previously created servers before exiting from the process.
+ /// </remarks>
+ public Task KillAsync()
{
- lock (myLock)
- {
- GrpcPreconditions.CheckState(startRequested);
- GrpcPreconditions.CheckState(!shutdownRequested);
- shutdownRequested = true;
- }
-
- var cq = environment.CompletionQueues.First(); // any cq will do
- handle.ShutdownAndNotify(HandleServerShutdown, cq);
- handle.CancelAllCalls();
- await shutdownTcs.Task.ConfigureAwait(false);
- DisposeHandle();
-
- await Task.Run(() => GrpcEnvironment.Release()).ConfigureAwait(false);
+ return ShutdownInternalAsync(true);
}
internal void AddCallReference(object call)
@@ -207,6 +197,53 @@ namespace Grpc.Core
}
/// <summary>
+ /// Shuts down the server.
+ /// </summary>
+ private async Task ShutdownInternalAsync(bool kill)
+ {
+ lock (myLock)
+ {
+ GrpcPreconditions.CheckState(startRequested);
+ GrpcPreconditions.CheckState(!shutdownRequested);
+ shutdownRequested = true;
+ }
+ GrpcEnvironment.UnregisterServer(this);
+
+ var cq = environment.CompletionQueues.First(); // any cq will do
+ handle.ShutdownAndNotify(HandleServerShutdown, cq);
+ if (kill)
+ {
+ handle.CancelAllCalls();
+ }
+
+ await ShutdownCompleteOrEnvironmentDeadAsync().ConfigureAwait(false);
+
+ DisposeHandle();
+
+ await GrpcEnvironment.ReleaseAsync().ConfigureAwait(false);
+ }
+
+ /// <summary>
+ /// In case the environment's threadpool becomes dead, the shutdown completion will
+ /// never be delivered, but we need to release the environment's handle anyway.
+ /// </summary>
+ private async Task ShutdownCompleteOrEnvironmentDeadAsync()
+ {
+ while (true)
+ {
+ var task = await Task.WhenAny(shutdownTcs.Task, Task.Delay(20)).ConfigureAwait(false);
+ if (shutdownTcs.Task == task)
+ {
+ return;
+ }
+ if (!environment.IsAlive)
+ {
+ return;
+ }
+ }
+ }
+
+ /// <summary>
/// Adds a service definition.
/// </summary>
private void AddServiceDefinitionInternal(ServerServiceDefinition serviceDefinition)
diff --git a/src/csharp/Grpc.Core/ServerServiceDefinition.cs b/src/csharp/Grpc.Core/ServerServiceDefinition.cs
index deb1431ca3..ac08c04bf6 100644
--- a/src/csharp/Grpc.Core/ServerServiceDefinition.cs
+++ b/src/csharp/Grpc.Core/ServerServiceDefinition.cs
@@ -63,11 +63,10 @@ namespace Grpc.Core
/// <summary>
/// Creates a new builder object for <c>ServerServiceDefinition</c>.
/// </summary>
- /// <param name="serviceName">The service name.</param>
/// <returns>The builder object.</returns>
- public static Builder CreateBuilder(string serviceName)
+ public static Builder CreateBuilder()
{
- return new Builder(serviceName);
+ return new Builder();
}
/// <summary>
@@ -75,16 +74,13 @@ namespace Grpc.Core
/// </summary>
public class Builder
{
- readonly string serviceName;
readonly Dictionary<string, IServerCallHandler> callHandlers = new Dictionary<string, IServerCallHandler>();
/// <summary>
/// Creates a new instance of builder.
/// </summary>
- /// <param name="serviceName">The service name.</param>
- public Builder(string serviceName)
+ public Builder()
{
- this.serviceName = serviceName;
}
/// <summary>
diff --git a/src/csharp/Grpc.Examples/MathGrpc.cs b/src/csharp/Grpc.Examples/MathGrpc.cs
index d700a18778..4bbefcbe01 100644
--- a/src/csharp/Grpc.Examples/MathGrpc.cs
+++ b/src/csharp/Grpc.Examples/MathGrpc.cs
@@ -81,103 +81,12 @@ namespace Math {
get { return global::Math.MathReflection.Descriptor.Services[0]; }
}
- /// <summary>Client for Math</summary>
- [System.Obsolete("Client side interfaced will be removed in the next release. Use client class directly.")]
- public interface IMathClient
- {
- /// <summary>
- /// Div divides args.dividend by args.divisor and returns the quotient and
- /// remainder.
- /// </summary>
- global::Math.DivReply Div(global::Math.DivArgs request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
- /// <summary>
- /// Div divides args.dividend by args.divisor and returns the quotient and
- /// remainder.
- /// </summary>
- global::Math.DivReply Div(global::Math.DivArgs request, CallOptions options);
- /// <summary>
- /// Div divides args.dividend by args.divisor and returns the quotient and
- /// remainder.
- /// </summary>
- AsyncUnaryCall<global::Math.DivReply> DivAsync(global::Math.DivArgs request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
- /// <summary>
- /// Div divides args.dividend by args.divisor and returns the quotient and
- /// remainder.
- /// </summary>
- AsyncUnaryCall<global::Math.DivReply> DivAsync(global::Math.DivArgs request, CallOptions options);
- /// <summary>
- /// DivMany accepts an arbitrary number of division args from the client stream
- /// and sends back the results in the reply stream. The stream continues until
- /// the client closes its end; the server does the same after sending all the
- /// replies. The stream ends immediately if either end aborts.
- /// </summary>
- AsyncDuplexStreamingCall<global::Math.DivArgs, global::Math.DivReply> DivMany(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
- /// <summary>
- /// DivMany accepts an arbitrary number of division args from the client stream
- /// and sends back the results in the reply stream. The stream continues until
- /// the client closes its end; the server does the same after sending all the
- /// replies. The stream ends immediately if either end aborts.
- /// </summary>
- AsyncDuplexStreamingCall<global::Math.DivArgs, global::Math.DivReply> DivMany(CallOptions options);
- /// <summary>
- /// Fib generates numbers in the Fibonacci sequence. If args.limit > 0, Fib
- /// generates up to limit numbers; otherwise it continues until the call is
- /// canceled. Unlike Fib above, Fib has no final FibReply.
- /// </summary>
- AsyncServerStreamingCall<global::Math.Num> Fib(global::Math.FibArgs request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
- /// <summary>
- /// Fib generates numbers in the Fibonacci sequence. If args.limit > 0, Fib
- /// generates up to limit numbers; otherwise it continues until the call is
- /// canceled. Unlike Fib above, Fib has no final FibReply.
- /// </summary>
- AsyncServerStreamingCall<global::Math.Num> Fib(global::Math.FibArgs request, CallOptions options);
- /// <summary>
- /// Sum sums a stream of numbers, returning the final result once the stream
- /// is closed.
- /// </summary>
- AsyncClientStreamingCall<global::Math.Num, global::Math.Num> Sum(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
- /// <summary>
- /// Sum sums a stream of numbers, returning the final result once the stream
- /// is closed.
- /// </summary>
- AsyncClientStreamingCall<global::Math.Num, global::Math.Num> Sum(CallOptions options);
- }
-
- /// <summary>Interface of server-side implementations of Math</summary>
- [System.Obsolete("Service implementations should inherit from the generated abstract base class instead.")]
- public interface IMath
- {
- /// <summary>
- /// Div divides args.dividend by args.divisor and returns the quotient and
- /// remainder.
- /// </summary>
- global::System.Threading.Tasks.Task<global::Math.DivReply> Div(global::Math.DivArgs request, ServerCallContext context);
- /// <summary>
- /// DivMany accepts an arbitrary number of division args from the client stream
- /// and sends back the results in the reply stream. The stream continues until
- /// the client closes its end; the server does the same after sending all the
- /// replies. The stream ends immediately if either end aborts.
- /// </summary>
- global::System.Threading.Tasks.Task DivMany(IAsyncStreamReader<global::Math.DivArgs> requestStream, IServerStreamWriter<global::Math.DivReply> responseStream, ServerCallContext context);
- /// <summary>
- /// Fib generates numbers in the Fibonacci sequence. If args.limit > 0, Fib
- /// generates up to limit numbers; otherwise it continues until the call is
- /// canceled. Unlike Fib above, Fib has no final FibReply.
- /// </summary>
- global::System.Threading.Tasks.Task Fib(global::Math.FibArgs request, IServerStreamWriter<global::Math.Num> responseStream, ServerCallContext context);
- /// <summary>
- /// Sum sums a stream of numbers, returning the final result once the stream
- /// is closed.
- /// </summary>
- global::System.Threading.Tasks.Task<global::Math.Num> Sum(IAsyncStreamReader<global::Math.Num> requestStream, ServerCallContext context);
- }
-
/// <summary>Base class for server-side implementations of Math</summary>
public abstract class MathBase
{
/// <summary>
- /// Div divides args.dividend by args.divisor and returns the quotient and
- /// remainder.
+ /// Div divides DivArgs.dividend by DivArgs.divisor and returns the quotient
+ /// and remainder.
/// </summary>
public virtual global::System.Threading.Tasks.Task<global::Math.DivReply> Div(global::Math.DivArgs request, ServerCallContext context)
{
@@ -196,7 +105,7 @@ namespace Math {
}
/// <summary>
- /// Fib generates numbers in the Fibonacci sequence. If args.limit > 0, Fib
+ /// Fib generates numbers in the Fibonacci sequence. If FibArgs.limit > 0, Fib
/// generates up to limit numbers; otherwise it continues until the call is
/// canceled. Unlike Fib above, Fib has no final FibReply.
/// </summary>
@@ -217,9 +126,7 @@ namespace Math {
}
/// <summary>Client for Math</summary>
- #pragma warning disable 0618
- public class MathClient : ClientBase<MathClient>, IMathClient
- #pragma warning restore 0618
+ public class MathClient : ClientBase<MathClient>
{
public MathClient(Channel channel) : base(channel)
{
@@ -237,32 +144,32 @@ namespace Math {
}
/// <summary>
- /// Div divides args.dividend by args.divisor and returns the quotient and
- /// remainder.
+ /// Div divides DivArgs.dividend by DivArgs.divisor and returns the quotient
+ /// and remainder.
/// </summary>
public virtual global::Math.DivReply Div(global::Math.DivArgs request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken))
{
return Div(request, new CallOptions(headers, deadline, cancellationToken));
}
/// <summary>
- /// Div divides args.dividend by args.divisor and returns the quotient and
- /// remainder.
+ /// Div divides DivArgs.dividend by DivArgs.divisor and returns the quotient
+ /// and remainder.
/// </summary>
public virtual global::Math.DivReply Div(global::Math.DivArgs request, CallOptions options)
{
return CallInvoker.BlockingUnaryCall(__Method_Div, null, options, request);
}
/// <summary>
- /// Div divides args.dividend by args.divisor and returns the quotient and
- /// remainder.
+ /// Div divides DivArgs.dividend by DivArgs.divisor and returns the quotient
+ /// and remainder.
/// </summary>
public virtual AsyncUnaryCall<global::Math.DivReply> DivAsync(global::Math.DivArgs request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken))
{
return DivAsync(request, new CallOptions(headers, deadline, cancellationToken));
}
/// <summary>
- /// Div divides args.dividend by args.divisor and returns the quotient and
- /// remainder.
+ /// Div divides DivArgs.dividend by DivArgs.divisor and returns the quotient
+ /// and remainder.
/// </summary>
public virtual AsyncUnaryCall<global::Math.DivReply> DivAsync(global::Math.DivArgs request, CallOptions options)
{
@@ -289,7 +196,7 @@ namespace Math {
return CallInvoker.AsyncDuplexStreamingCall(__Method_DivMany, null, options);
}
/// <summary>
- /// Fib generates numbers in the Fibonacci sequence. If args.limit > 0, Fib
+ /// Fib generates numbers in the Fibonacci sequence. If FibArgs.limit > 0, Fib
/// generates up to limit numbers; otherwise it continues until the call is
/// canceled. Unlike Fib above, Fib has no final FibReply.
/// </summary>
@@ -298,7 +205,7 @@ namespace Math {
return Fib(request, new CallOptions(headers, deadline, cancellationToken));
}
/// <summary>
- /// Fib generates numbers in the Fibonacci sequence. If args.limit > 0, Fib
+ /// Fib generates numbers in the Fibonacci sequence. If FibArgs.limit > 0, Fib
/// generates up to limit numbers; otherwise it continues until the call is
/// canceled. Unlike Fib above, Fib has no final FibReply.
/// </summary>
@@ -335,23 +242,9 @@ namespace Math {
}
/// <summary>Creates service definition that can be registered with a server</summary>
- #pragma warning disable 0618
- public static ServerServiceDefinition BindService(IMath serviceImpl)
- #pragma warning restore 0618
- {
- return ServerServiceDefinition.CreateBuilder(__ServiceName)
- .AddMethod(__Method_Div, serviceImpl.Div)
- .AddMethod(__Method_DivMany, serviceImpl.DivMany)
- .AddMethod(__Method_Fib, serviceImpl.Fib)
- .AddMethod(__Method_Sum, serviceImpl.Sum).Build();
- }
-
- /// <summary>Creates service definition that can be registered with a server</summary>
- #pragma warning disable 0618
public static ServerServiceDefinition BindService(MathBase serviceImpl)
- #pragma warning restore 0618
{
- return ServerServiceDefinition.CreateBuilder(__ServiceName)
+ return ServerServiceDefinition.CreateBuilder()
.AddMethod(__Method_Div, serviceImpl.Div)
.AddMethod(__Method_DivMany, serviceImpl.DivMany)
.AddMethod(__Method_Fib, serviceImpl.Fib)
diff --git a/src/csharp/Grpc.HealthCheck/HealthGrpc.cs b/src/csharp/Grpc.HealthCheck/HealthGrpc.cs
index 51c6a39b1d..d0ade7d02b 100644
--- a/src/csharp/Grpc.HealthCheck/HealthGrpc.cs
+++ b/src/csharp/Grpc.HealthCheck/HealthGrpc.cs
@@ -58,23 +58,6 @@ namespace Grpc.Health.V1 {
get { return global::Grpc.Health.V1.HealthReflection.Descriptor.Services[0]; }
}
- /// <summary>Client for Health</summary>
- [System.Obsolete("Client side interfaced will be removed in the next release. Use client class directly.")]
- public interface IHealthClient
- {
- global::Grpc.Health.V1.HealthCheckResponse Check(global::Grpc.Health.V1.HealthCheckRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
- global::Grpc.Health.V1.HealthCheckResponse Check(global::Grpc.Health.V1.HealthCheckRequest request, CallOptions options);
- AsyncUnaryCall<global::Grpc.Health.V1.HealthCheckResponse> CheckAsync(global::Grpc.Health.V1.HealthCheckRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
- AsyncUnaryCall<global::Grpc.Health.V1.HealthCheckResponse> CheckAsync(global::Grpc.Health.V1.HealthCheckRequest request, CallOptions options);
- }
-
- /// <summary>Interface of server-side implementations of Health</summary>
- [System.Obsolete("Service implementations should inherit from the generated abstract base class instead.")]
- public interface IHealth
- {
- global::System.Threading.Tasks.Task<global::Grpc.Health.V1.HealthCheckResponse> Check(global::Grpc.Health.V1.HealthCheckRequest request, ServerCallContext context);
- }
-
/// <summary>Base class for server-side implementations of Health</summary>
public abstract class HealthBase
{
@@ -86,9 +69,7 @@ namespace Grpc.Health.V1 {
}
/// <summary>Client for Health</summary>
- #pragma warning disable 0618
- public class HealthClient : ClientBase<HealthClient>, IHealthClient
- #pragma warning restore 0618
+ public class HealthClient : ClientBase<HealthClient>
{
public HealthClient(Channel channel) : base(channel)
{
@@ -134,20 +115,9 @@ namespace Grpc.Health.V1 {
}
/// <summary>Creates service definition that can be registered with a server</summary>
- #pragma warning disable 0618
- public static ServerServiceDefinition BindService(IHealth serviceImpl)
- #pragma warning restore 0618
- {
- return ServerServiceDefinition.CreateBuilder(__ServiceName)
- .AddMethod(__Method_Check, serviceImpl.Check).Build();
- }
-
- /// <summary>Creates service definition that can be registered with a server</summary>
- #pragma warning disable 0618
public static ServerServiceDefinition BindService(HealthBase serviceImpl)
- #pragma warning restore 0618
{
- return ServerServiceDefinition.CreateBuilder(__ServiceName)
+ return ServerServiceDefinition.CreateBuilder()
.AddMethod(__Method_Check, serviceImpl.Check).Build();
}
diff --git a/src/csharp/Grpc.IntegrationTesting/GenericService.cs b/src/csharp/Grpc.IntegrationTesting/GenericService.cs
index c6128264ac..53fa1ee5f6 100644
--- a/src/csharp/Grpc.IntegrationTesting/GenericService.cs
+++ b/src/csharp/Grpc.IntegrationTesting/GenericService.cs
@@ -64,7 +64,7 @@ namespace Grpc.IntegrationTesting
public static ServerServiceDefinition BindHandler(DuplexStreamingServerMethod<byte[], byte[]> handler)
{
- return ServerServiceDefinition.CreateBuilder(StreamingCallMethod.ServiceName)
+ return ServerServiceDefinition.CreateBuilder()
.AddMethod(StreamingCallMethod, handler).Build();
}
}
diff --git a/src/csharp/Grpc.IntegrationTesting/MetricsGrpc.cs b/src/csharp/Grpc.IntegrationTesting/MetricsGrpc.cs
index 9d31d1c514..22bd27ec0a 100644
--- a/src/csharp/Grpc.IntegrationTesting/MetricsGrpc.cs
+++ b/src/csharp/Grpc.IntegrationTesting/MetricsGrpc.cs
@@ -72,53 +72,6 @@ namespace Grpc.Testing {
get { return global::Grpc.Testing.MetricsReflection.Descriptor.Services[0]; }
}
- /// <summary>Client for MetricsService</summary>
- [System.Obsolete("Client side interfaced will be removed in the next release. Use client class directly.")]
- public interface IMetricsServiceClient
- {
- /// <summary>
- /// Returns the values of all the gauges that are currently being maintained by
- /// the service
- /// </summary>
- AsyncServerStreamingCall<global::Grpc.Testing.GaugeResponse> GetAllGauges(global::Grpc.Testing.EmptyMessage request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
- /// <summary>
- /// Returns the values of all the gauges that are currently being maintained by
- /// the service
- /// </summary>
- AsyncServerStreamingCall<global::Grpc.Testing.GaugeResponse> GetAllGauges(global::Grpc.Testing.EmptyMessage request, CallOptions options);
- /// <summary>
- /// Returns the value of one gauge
- /// </summary>
- global::Grpc.Testing.GaugeResponse GetGauge(global::Grpc.Testing.GaugeRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
- /// <summary>
- /// Returns the value of one gauge
- /// </summary>
- global::Grpc.Testing.GaugeResponse GetGauge(global::Grpc.Testing.GaugeRequest request, CallOptions options);
- /// <summary>
- /// Returns the value of one gauge
- /// </summary>
- AsyncUnaryCall<global::Grpc.Testing.GaugeResponse> GetGaugeAsync(global::Grpc.Testing.GaugeRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
- /// <summary>
- /// Returns the value of one gauge
- /// </summary>
- AsyncUnaryCall<global::Grpc.Testing.GaugeResponse> GetGaugeAsync(global::Grpc.Testing.GaugeRequest request, CallOptions options);
- }
-
- /// <summary>Interface of server-side implementations of MetricsService</summary>
- [System.Obsolete("Service implementations should inherit from the generated abstract base class instead.")]
- public interface IMetricsService
- {
- /// <summary>
- /// Returns the values of all the gauges that are currently being maintained by
- /// the service
- /// </summary>
- global::System.Threading.Tasks.Task GetAllGauges(global::Grpc.Testing.EmptyMessage request, IServerStreamWriter<global::Grpc.Testing.GaugeResponse> responseStream, ServerCallContext context);
- /// <summary>
- /// Returns the value of one gauge
- /// </summary>
- global::System.Threading.Tasks.Task<global::Grpc.Testing.GaugeResponse> GetGauge(global::Grpc.Testing.GaugeRequest request, ServerCallContext context);
- }
-
/// <summary>Base class for server-side implementations of MetricsService</summary>
public abstract class MetricsServiceBase
{
@@ -142,9 +95,7 @@ namespace Grpc.Testing {
}
/// <summary>Client for MetricsService</summary>
- #pragma warning disable 0618
- public class MetricsServiceClient : ClientBase<MetricsServiceClient>, IMetricsServiceClient
- #pragma warning restore 0618
+ public class MetricsServiceClient : ClientBase<MetricsServiceClient>
{
public MetricsServiceClient(Channel channel) : base(channel)
{
@@ -218,21 +169,9 @@ namespace Grpc.Testing {
}
/// <summary>Creates service definition that can be registered with a server</summary>
- #pragma warning disable 0618
- public static ServerServiceDefinition BindService(IMetricsService serviceImpl)
- #pragma warning restore 0618
- {
- return ServerServiceDefinition.CreateBuilder(__ServiceName)
- .AddMethod(__Method_GetAllGauges, serviceImpl.GetAllGauges)
- .AddMethod(__Method_GetGauge, serviceImpl.GetGauge).Build();
- }
-
- /// <summary>Creates service definition that can be registered with a server</summary>
- #pragma warning disable 0618
public static ServerServiceDefinition BindService(MetricsServiceBase serviceImpl)
- #pragma warning restore 0618
{
- return ServerServiceDefinition.CreateBuilder(__ServiceName)
+ return ServerServiceDefinition.CreateBuilder()
.AddMethod(__Method_GetAllGauges, serviceImpl.GetAllGauges)
.AddMethod(__Method_GetGauge, serviceImpl.GetGauge).Build();
}
diff --git a/src/csharp/Grpc.IntegrationTesting/ServicesGrpc.cs b/src/csharp/Grpc.IntegrationTesting/ServicesGrpc.cs
index f7071ebf6b..9c99296115 100644
--- a/src/csharp/Grpc.IntegrationTesting/ServicesGrpc.cs
+++ b/src/csharp/Grpc.IntegrationTesting/ServicesGrpc.cs
@@ -67,58 +67,6 @@ namespace Grpc.Testing {
get { return global::Grpc.Testing.ServicesReflection.Descriptor.Services[0]; }
}
- /// <summary>Client for BenchmarkService</summary>
- [System.Obsolete("Client side interfaced will be removed in the next release. Use client class directly.")]
- public interface IBenchmarkServiceClient
- {
- /// <summary>
- /// One request followed by one response.
- /// The server returns the client payload as-is.
- /// </summary>
- global::Grpc.Testing.SimpleResponse UnaryCall(global::Grpc.Testing.SimpleRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
- /// <summary>
- /// One request followed by one response.
- /// The server returns the client payload as-is.
- /// </summary>
- global::Grpc.Testing.SimpleResponse UnaryCall(global::Grpc.Testing.SimpleRequest request, CallOptions options);
- /// <summary>
- /// One request followed by one response.
- /// The server returns the client payload as-is.
- /// </summary>
- AsyncUnaryCall<global::Grpc.Testing.SimpleResponse> UnaryCallAsync(global::Grpc.Testing.SimpleRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
- /// <summary>
- /// One request followed by one response.
- /// The server returns the client payload as-is.
- /// </summary>
- AsyncUnaryCall<global::Grpc.Testing.SimpleResponse> UnaryCallAsync(global::Grpc.Testing.SimpleRequest request, CallOptions options);
- /// <summary>
- /// One request followed by one response.
- /// The server returns the client payload as-is.
- /// </summary>
- AsyncDuplexStreamingCall<global::Grpc.Testing.SimpleRequest, global::Grpc.Testing.SimpleResponse> StreamingCall(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
- /// <summary>
- /// One request followed by one response.
- /// The server returns the client payload as-is.
- /// </summary>
- AsyncDuplexStreamingCall<global::Grpc.Testing.SimpleRequest, global::Grpc.Testing.SimpleResponse> StreamingCall(CallOptions options);
- }
-
- /// <summary>Interface of server-side implementations of BenchmarkService</summary>
- [System.Obsolete("Service implementations should inherit from the generated abstract base class instead.")]
- public interface IBenchmarkService
- {
- /// <summary>
- /// One request followed by one response.
- /// The server returns the client payload as-is.
- /// </summary>
- global::System.Threading.Tasks.Task<global::Grpc.Testing.SimpleResponse> UnaryCall(global::Grpc.Testing.SimpleRequest request, ServerCallContext context);
- /// <summary>
- /// One request followed by one response.
- /// The server returns the client payload as-is.
- /// </summary>
- global::System.Threading.Tasks.Task StreamingCall(IAsyncStreamReader<global::Grpc.Testing.SimpleRequest> requestStream, IServerStreamWriter<global::Grpc.Testing.SimpleResponse> responseStream, ServerCallContext context);
- }
-
/// <summary>Base class for server-side implementations of BenchmarkService</summary>
public abstract class BenchmarkServiceBase
{
@@ -143,9 +91,7 @@ namespace Grpc.Testing {
}
/// <summary>Client for BenchmarkService</summary>
- #pragma warning disable 0618
- public class BenchmarkServiceClient : ClientBase<BenchmarkServiceClient>, IBenchmarkServiceClient
- #pragma warning restore 0618
+ public class BenchmarkServiceClient : ClientBase<BenchmarkServiceClient>
{
public BenchmarkServiceClient(Channel channel) : base(channel)
{
@@ -223,21 +169,9 @@ namespace Grpc.Testing {
}
/// <summary>Creates service definition that can be registered with a server</summary>
- #pragma warning disable 0618
- public static ServerServiceDefinition BindService(IBenchmarkService serviceImpl)
- #pragma warning restore 0618
- {
- return ServerServiceDefinition.CreateBuilder(__ServiceName)
- .AddMethod(__Method_UnaryCall, serviceImpl.UnaryCall)
- .AddMethod(__Method_StreamingCall, serviceImpl.StreamingCall).Build();
- }
-
- /// <summary>Creates service definition that can be registered with a server</summary>
- #pragma warning disable 0618
public static ServerServiceDefinition BindService(BenchmarkServiceBase serviceImpl)
- #pragma warning restore 0618
{
- return ServerServiceDefinition.CreateBuilder(__ServiceName)
+ return ServerServiceDefinition.CreateBuilder()
.AddMethod(__Method_UnaryCall, serviceImpl.UnaryCall)
.AddMethod(__Method_StreamingCall, serviceImpl.StreamingCall).Build();
}
@@ -289,112 +223,6 @@ namespace Grpc.Testing {
get { return global::Grpc.Testing.ServicesReflection.Descriptor.Services[1]; }
}
- /// <summary>Client for WorkerService</summary>
- [System.Obsolete("Client side interfaced will be removed in the next release. Use client class directly.")]
- public interface IWorkerServiceClient
- {
- /// <summary>
- /// Start server with specified workload.
- /// First request sent specifies the ServerConfig followed by ServerStatus
- /// response. After that, a "Mark" can be sent anytime to request the latest
- /// stats. Closing the stream will initiate shutdown of the test server
- /// and once the shutdown has finished, the OK status is sent to terminate
- /// this RPC.
- /// </summary>
- AsyncDuplexStreamingCall<global::Grpc.Testing.ServerArgs, global::Grpc.Testing.ServerStatus> RunServer(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
- /// <summary>
- /// Start server with specified workload.
- /// First request sent specifies the ServerConfig followed by ServerStatus
- /// response. After that, a "Mark" can be sent anytime to request the latest
- /// stats. Closing the stream will initiate shutdown of the test server
- /// and once the shutdown has finished, the OK status is sent to terminate
- /// this RPC.
- /// </summary>
- AsyncDuplexStreamingCall<global::Grpc.Testing.ServerArgs, global::Grpc.Testing.ServerStatus> RunServer(CallOptions options);
- /// <summary>
- /// Start client with specified workload.
- /// First request sent specifies the ClientConfig followed by ClientStatus
- /// response. After that, a "Mark" can be sent anytime to request the latest
- /// stats. Closing the stream will initiate shutdown of the test client
- /// and once the shutdown has finished, the OK status is sent to terminate
- /// this RPC.
- /// </summary>
- AsyncDuplexStreamingCall<global::Grpc.Testing.ClientArgs, global::Grpc.Testing.ClientStatus> RunClient(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
- /// <summary>
- /// Start client with specified workload.
- /// First request sent specifies the ClientConfig followed by ClientStatus
- /// response. After that, a "Mark" can be sent anytime to request the latest
- /// stats. Closing the stream will initiate shutdown of the test client
- /// and once the shutdown has finished, the OK status is sent to terminate
- /// this RPC.
- /// </summary>
- AsyncDuplexStreamingCall<global::Grpc.Testing.ClientArgs, global::Grpc.Testing.ClientStatus> RunClient(CallOptions options);
- /// <summary>
- /// Just return the core count - unary call
- /// </summary>
- global::Grpc.Testing.CoreResponse CoreCount(global::Grpc.Testing.CoreRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
- /// <summary>
- /// Just return the core count - unary call
- /// </summary>
- global::Grpc.Testing.CoreResponse CoreCount(global::Grpc.Testing.CoreRequest request, CallOptions options);
- /// <summary>
- /// Just return the core count - unary call
- /// </summary>
- AsyncUnaryCall<global::Grpc.Testing.CoreResponse> CoreCountAsync(global::Grpc.Testing.CoreRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
- /// <summary>
- /// Just return the core count - unary call
- /// </summary>
- AsyncUnaryCall<global::Grpc.Testing.CoreResponse> CoreCountAsync(global::Grpc.Testing.CoreRequest request, CallOptions options);
- /// <summary>
- /// Quit this worker
- /// </summary>
- global::Grpc.Testing.Void QuitWorker(global::Grpc.Testing.Void request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
- /// <summary>
- /// Quit this worker
- /// </summary>
- global::Grpc.Testing.Void QuitWorker(global::Grpc.Testing.Void request, CallOptions options);
- /// <summary>
- /// Quit this worker
- /// </summary>
- AsyncUnaryCall<global::Grpc.Testing.Void> QuitWorkerAsync(global::Grpc.Testing.Void request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
- /// <summary>
- /// Quit this worker
- /// </summary>
- AsyncUnaryCall<global::Grpc.Testing.Void> QuitWorkerAsync(global::Grpc.Testing.Void request, CallOptions options);
- }
-
- /// <summary>Interface of server-side implementations of WorkerService</summary>
- [System.Obsolete("Service implementations should inherit from the generated abstract base class instead.")]
- public interface IWorkerService
- {
- /// <summary>
- /// Start server with specified workload.
- /// First request sent specifies the ServerConfig followed by ServerStatus
- /// response. After that, a "Mark" can be sent anytime to request the latest
- /// stats. Closing the stream will initiate shutdown of the test server
- /// and once the shutdown has finished, the OK status is sent to terminate
- /// this RPC.
- /// </summary>
- global::System.Threading.Tasks.Task RunServer(IAsyncStreamReader<global::Grpc.Testing.ServerArgs> requestStream, IServerStreamWriter<global::Grpc.Testing.ServerStatus> responseStream, ServerCallContext context);
- /// <summary>
- /// Start client with specified workload.
- /// First request sent specifies the ClientConfig followed by ClientStatus
- /// response. After that, a "Mark" can be sent anytime to request the latest
- /// stats. Closing the stream will initiate shutdown of the test client
- /// and once the shutdown has finished, the OK status is sent to terminate
- /// this RPC.
- /// </summary>
- global::System.Threading.Tasks.Task RunClient(IAsyncStreamReader<global::Grpc.Testing.ClientArgs> requestStream, IServerStreamWriter<global::Grpc.Testing.ClientStatus> responseStream, ServerCallContext context);
- /// <summary>
- /// Just return the core count - unary call
- /// </summary>
- global::System.Threading.Tasks.Task<global::Grpc.Testing.CoreResponse> CoreCount(global::Grpc.Testing.CoreRequest request, ServerCallContext context);
- /// <summary>
- /// Quit this worker
- /// </summary>
- global::System.Threading.Tasks.Task<global::Grpc.Testing.Void> QuitWorker(global::Grpc.Testing.Void request, ServerCallContext context);
- }
-
/// <summary>Base class for server-side implementations of WorkerService</summary>
public abstract class WorkerServiceBase
{
@@ -443,9 +271,7 @@ namespace Grpc.Testing {
}
/// <summary>Client for WorkerService</summary>
- #pragma warning disable 0618
- public class WorkerServiceClient : ClientBase<WorkerServiceClient>, IWorkerServiceClient
- #pragma warning restore 0618
+ public class WorkerServiceClient : ClientBase<WorkerServiceClient>
{
public WorkerServiceClient(Channel channel) : base(channel)
{
@@ -579,23 +405,9 @@ namespace Grpc.Testing {
}
/// <summary>Creates service definition that can be registered with a server</summary>
- #pragma warning disable 0618
- public static ServerServiceDefinition BindService(IWorkerService serviceImpl)
- #pragma warning restore 0618
- {
- return ServerServiceDefinition.CreateBuilder(__ServiceName)
- .AddMethod(__Method_RunServer, serviceImpl.RunServer)
- .AddMethod(__Method_RunClient, serviceImpl.RunClient)
- .AddMethod(__Method_CoreCount, serviceImpl.CoreCount)
- .AddMethod(__Method_QuitWorker, serviceImpl.QuitWorker).Build();
- }
-
- /// <summary>Creates service definition that can be registered with a server</summary>
- #pragma warning disable 0618
public static ServerServiceDefinition BindService(WorkerServiceBase serviceImpl)
- #pragma warning restore 0618
{
- return ServerServiceDefinition.CreateBuilder(__ServiceName)
+ return ServerServiceDefinition.CreateBuilder()
.AddMethod(__Method_RunServer, serviceImpl.RunServer)
.AddMethod(__Method_RunClient, serviceImpl.RunClient)
.AddMethod(__Method_CoreCount, serviceImpl.CoreCount)
diff --git a/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs b/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs
index cf43a77118..6c252013f8 100644
--- a/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs
+++ b/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs
@@ -105,127 +105,6 @@ namespace Grpc.Testing {
get { return global::Grpc.Testing.TestReflection.Descriptor.Services[0]; }
}
- /// <summary>Client for TestService</summary>
- [System.Obsolete("Client side interfaced will be removed in the next release. Use client class directly.")]
- public interface ITestServiceClient
- {
- /// <summary>
- /// One empty request followed by one empty response.
- /// </summary>
- global::Grpc.Testing.Empty EmptyCall(global::Grpc.Testing.Empty request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
- /// <summary>
- /// One empty request followed by one empty response.
- /// </summary>
- global::Grpc.Testing.Empty EmptyCall(global::Grpc.Testing.Empty request, CallOptions options);
- /// <summary>
- /// One empty request followed by one empty response.
- /// </summary>
- AsyncUnaryCall<global::Grpc.Testing.Empty> EmptyCallAsync(global::Grpc.Testing.Empty request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
- /// <summary>
- /// One empty request followed by one empty response.
- /// </summary>
- AsyncUnaryCall<global::Grpc.Testing.Empty> EmptyCallAsync(global::Grpc.Testing.Empty request, CallOptions options);
- /// <summary>
- /// One request followed by one response.
- /// </summary>
- global::Grpc.Testing.SimpleResponse UnaryCall(global::Grpc.Testing.SimpleRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
- /// <summary>
- /// One request followed by one response.
- /// </summary>
- global::Grpc.Testing.SimpleResponse UnaryCall(global::Grpc.Testing.SimpleRequest request, CallOptions options);
- /// <summary>
- /// One request followed by one response.
- /// </summary>
- AsyncUnaryCall<global::Grpc.Testing.SimpleResponse> UnaryCallAsync(global::Grpc.Testing.SimpleRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
- /// <summary>
- /// One request followed by one response.
- /// </summary>
- AsyncUnaryCall<global::Grpc.Testing.SimpleResponse> UnaryCallAsync(global::Grpc.Testing.SimpleRequest request, CallOptions options);
- /// <summary>
- /// One request followed by a sequence of responses (streamed download).
- /// The server returns the payload with client desired type and sizes.
- /// </summary>
- AsyncServerStreamingCall<global::Grpc.Testing.StreamingOutputCallResponse> StreamingOutputCall(global::Grpc.Testing.StreamingOutputCallRequest request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
- /// <summary>
- /// One request followed by a sequence of responses (streamed download).
- /// The server returns the payload with client desired type and sizes.
- /// </summary>
- AsyncServerStreamingCall<global::Grpc.Testing.StreamingOutputCallResponse> StreamingOutputCall(global::Grpc.Testing.StreamingOutputCallRequest request, CallOptions options);
- /// <summary>
- /// A sequence of requests followed by one response (streamed upload).
- /// The server returns the aggregated size of client payload as the result.
- /// </summary>
- AsyncClientStreamingCall<global::Grpc.Testing.StreamingInputCallRequest, global::Grpc.Testing.StreamingInputCallResponse> StreamingInputCall(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
- /// <summary>
- /// A sequence of requests followed by one response (streamed upload).
- /// The server returns the aggregated size of client payload as the result.
- /// </summary>
- AsyncClientStreamingCall<global::Grpc.Testing.StreamingInputCallRequest, global::Grpc.Testing.StreamingInputCallResponse> StreamingInputCall(CallOptions options);
- /// <summary>
- /// A sequence of requests with each request served by the server immediately.
- /// As one request could lead to multiple responses, this interface
- /// demonstrates the idea of full duplexing.
- /// </summary>
- AsyncDuplexStreamingCall<global::Grpc.Testing.StreamingOutputCallRequest, global::Grpc.Testing.StreamingOutputCallResponse> FullDuplexCall(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
- /// <summary>
- /// A sequence of requests with each request served by the server immediately.
- /// As one request could lead to multiple responses, this interface
- /// demonstrates the idea of full duplexing.
- /// </summary>
- AsyncDuplexStreamingCall<global::Grpc.Testing.StreamingOutputCallRequest, global::Grpc.Testing.StreamingOutputCallResponse> FullDuplexCall(CallOptions options);
- /// <summary>
- /// A sequence of requests followed by a sequence of responses.
- /// The server buffers all the client requests and then serves them in order. A
- /// stream of responses are returned to the client when the server starts with
- /// first request.
- /// </summary>
- AsyncDuplexStreamingCall<global::Grpc.Testing.StreamingOutputCallRequest, global::Grpc.Testing.StreamingOutputCallResponse> HalfDuplexCall(Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
- /// <summary>
- /// A sequence of requests followed by a sequence of responses.
- /// The server buffers all the client requests and then serves them in order. A
- /// stream of responses are returned to the client when the server starts with
- /// first request.
- /// </summary>
- AsyncDuplexStreamingCall<global::Grpc.Testing.StreamingOutputCallRequest, global::Grpc.Testing.StreamingOutputCallResponse> HalfDuplexCall(CallOptions options);
- }
-
- /// <summary>Interface of server-side implementations of TestService</summary>
- [System.Obsolete("Service implementations should inherit from the generated abstract base class instead.")]
- public interface ITestService
- {
- /// <summary>
- /// One empty request followed by one empty response.
- /// </summary>
- global::System.Threading.Tasks.Task<global::Grpc.Testing.Empty> EmptyCall(global::Grpc.Testing.Empty request, ServerCallContext context);
- /// <summary>
- /// One request followed by one response.
- /// </summary>
- global::System.Threading.Tasks.Task<global::Grpc.Testing.SimpleResponse> UnaryCall(global::Grpc.Testing.SimpleRequest request, ServerCallContext context);
- /// <summary>
- /// One request followed by a sequence of responses (streamed download).
- /// The server returns the payload with client desired type and sizes.
- /// </summary>
- global::System.Threading.Tasks.Task StreamingOutputCall(global::Grpc.Testing.StreamingOutputCallRequest request, IServerStreamWriter<global::Grpc.Testing.StreamingOutputCallResponse> responseStream, ServerCallContext context);
- /// <summary>
- /// A sequence of requests followed by one response (streamed upload).
- /// The server returns the aggregated size of client payload as the result.
- /// </summary>
- global::System.Threading.Tasks.Task<global::Grpc.Testing.StreamingInputCallResponse> StreamingInputCall(IAsyncStreamReader<global::Grpc.Testing.StreamingInputCallRequest> requestStream, ServerCallContext context);
- /// <summary>
- /// A sequence of requests with each request served by the server immediately.
- /// As one request could lead to multiple responses, this interface
- /// demonstrates the idea of full duplexing.
- /// </summary>
- global::System.Threading.Tasks.Task FullDuplexCall(IAsyncStreamReader<global::Grpc.Testing.StreamingOutputCallRequest> requestStream, IServerStreamWriter<global::Grpc.Testing.StreamingOutputCallResponse> responseStream, ServerCallContext context);
- /// <summary>
- /// A sequence of requests followed by a sequence of responses.
- /// The server buffers all the client requests and then serves them in order. A
- /// stream of responses are returned to the client when the server starts with
- /// first request.
- /// </summary>
- global::System.Threading.Tasks.Task HalfDuplexCall(IAsyncStreamReader<global::Grpc.Testing.StreamingOutputCallRequest> requestStream, IServerStreamWriter<global::Grpc.Testing.StreamingOutputCallResponse> responseStream, ServerCallContext context);
- }
-
/// <summary>Base class for server-side implementations of TestService</summary>
public abstract class TestServiceBase
{
@@ -287,9 +166,7 @@ namespace Grpc.Testing {
}
/// <summary>Client for TestService</summary>
- #pragma warning disable 0618
- public class TestServiceClient : ClientBase<TestServiceClient>, ITestServiceClient
- #pragma warning restore 0618
+ public class TestServiceClient : ClientBase<TestServiceClient>
{
public TestServiceClient(Channel channel) : base(channel)
{
@@ -445,25 +322,9 @@ namespace Grpc.Testing {
}
/// <summary>Creates service definition that can be registered with a server</summary>
- #pragma warning disable 0618
- public static ServerServiceDefinition BindService(ITestService serviceImpl)
- #pragma warning restore 0618
- {
- return ServerServiceDefinition.CreateBuilder(__ServiceName)
- .AddMethod(__Method_EmptyCall, serviceImpl.EmptyCall)
- .AddMethod(__Method_UnaryCall, serviceImpl.UnaryCall)
- .AddMethod(__Method_StreamingOutputCall, serviceImpl.StreamingOutputCall)
- .AddMethod(__Method_StreamingInputCall, serviceImpl.StreamingInputCall)
- .AddMethod(__Method_FullDuplexCall, serviceImpl.FullDuplexCall)
- .AddMethod(__Method_HalfDuplexCall, serviceImpl.HalfDuplexCall).Build();
- }
-
- /// <summary>Creates service definition that can be registered with a server</summary>
- #pragma warning disable 0618
public static ServerServiceDefinition BindService(TestServiceBase serviceImpl)
- #pragma warning restore 0618
{
- return ServerServiceDefinition.CreateBuilder(__ServiceName)
+ return ServerServiceDefinition.CreateBuilder()
.AddMethod(__Method_EmptyCall, serviceImpl.EmptyCall)
.AddMethod(__Method_UnaryCall, serviceImpl.UnaryCall)
.AddMethod(__Method_StreamingOutputCall, serviceImpl.StreamingOutputCall)
@@ -496,38 +357,6 @@ namespace Grpc.Testing {
get { return global::Grpc.Testing.TestReflection.Descriptor.Services[1]; }
}
- /// <summary>Client for UnimplementedService</summary>
- [System.Obsolete("Client side interfaced will be removed in the next release. Use client class directly.")]
- public interface IUnimplementedServiceClient
- {
- /// <summary>
- /// A call that no server should implement
- /// </summary>
- global::Grpc.Testing.Empty UnimplementedCall(global::Grpc.Testing.Empty request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
- /// <summary>
- /// A call that no server should implement
- /// </summary>
- global::Grpc.Testing.Empty UnimplementedCall(global::Grpc.Testing.Empty request, CallOptions options);
- /// <summary>
- /// A call that no server should implement
- /// </summary>
- AsyncUnaryCall<global::Grpc.Testing.Empty> UnimplementedCallAsync(global::Grpc.Testing.Empty request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
- /// <summary>
- /// A call that no server should implement
- /// </summary>
- AsyncUnaryCall<global::Grpc.Testing.Empty> UnimplementedCallAsync(global::Grpc.Testing.Empty request, CallOptions options);
- }
-
- /// <summary>Interface of server-side implementations of UnimplementedService</summary>
- [System.Obsolete("Service implementations should inherit from the generated abstract base class instead.")]
- public interface IUnimplementedService
- {
- /// <summary>
- /// A call that no server should implement
- /// </summary>
- global::System.Threading.Tasks.Task<global::Grpc.Testing.Empty> UnimplementedCall(global::Grpc.Testing.Empty request, ServerCallContext context);
- }
-
/// <summary>Base class for server-side implementations of UnimplementedService</summary>
public abstract class UnimplementedServiceBase
{
@@ -542,9 +371,7 @@ namespace Grpc.Testing {
}
/// <summary>Client for UnimplementedService</summary>
- #pragma warning disable 0618
- public class UnimplementedServiceClient : ClientBase<UnimplementedServiceClient>, IUnimplementedServiceClient
- #pragma warning restore 0618
+ public class UnimplementedServiceClient : ClientBase<UnimplementedServiceClient>
{
public UnimplementedServiceClient(Channel channel) : base(channel)
{
@@ -602,20 +429,9 @@ namespace Grpc.Testing {
}
/// <summary>Creates service definition that can be registered with a server</summary>
- #pragma warning disable 0618
- public static ServerServiceDefinition BindService(IUnimplementedService serviceImpl)
- #pragma warning restore 0618
- {
- return ServerServiceDefinition.CreateBuilder(__ServiceName)
- .AddMethod(__Method_UnimplementedCall, serviceImpl.UnimplementedCall).Build();
- }
-
- /// <summary>Creates service definition that can be registered with a server</summary>
- #pragma warning disable 0618
public static ServerServiceDefinition BindService(UnimplementedServiceBase serviceImpl)
- #pragma warning restore 0618
{
- return ServerServiceDefinition.CreateBuilder(__ServiceName)
+ return ServerServiceDefinition.CreateBuilder()
.AddMethod(__Method_UnimplementedCall, serviceImpl.UnimplementedCall).Build();
}
@@ -651,28 +467,6 @@ namespace Grpc.Testing {
get { return global::Grpc.Testing.TestReflection.Descriptor.Services[2]; }
}
- /// <summary>Client for ReconnectService</summary>
- [System.Obsolete("Client side interfaced will be removed in the next release. Use client class directly.")]
- public interface IReconnectServiceClient
- {
- global::Grpc.Testing.Empty Start(global::Grpc.Testing.ReconnectParams request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
- global::Grpc.Testing.Empty Start(global::Grpc.Testing.ReconnectParams request, CallOptions options);
- AsyncUnaryCall<global::Grpc.Testing.Empty> StartAsync(global::Grpc.Testing.ReconnectParams request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
- AsyncUnaryCall<global::Grpc.Testing.Empty> StartAsync(global::Grpc.Testing.ReconnectParams request, CallOptions options);
- global::Grpc.Testing.ReconnectInfo Stop(global::Grpc.Testing.Empty request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
- global::Grpc.Testing.ReconnectInfo Stop(global::Grpc.Testing.Empty request, CallOptions options);
- AsyncUnaryCall<global::Grpc.Testing.ReconnectInfo> StopAsync(global::Grpc.Testing.Empty request, Metadata headers = null, DateTime? deadline = null, CancellationToken cancellationToken = default(CancellationToken));
- AsyncUnaryCall<global::Grpc.Testing.ReconnectInfo> StopAsync(global::Grpc.Testing.Empty request, CallOptions options);
- }
-
- /// <summary>Interface of server-side implementations of ReconnectService</summary>
- [System.Obsolete("Service implementations should inherit from the generated abstract base class instead.")]
- public interface IReconnectService
- {
- global::System.Threading.Tasks.Task<global::Grpc.Testing.Empty> Start(global::Grpc.Testing.ReconnectParams request, ServerCallContext context);
- global::System.Threading.Tasks.Task<global::Grpc.Testing.ReconnectInfo> Stop(global::Grpc.Testing.Empty request, ServerCallContext context);
- }
-
/// <summary>Base class for server-side implementations of ReconnectService</summary>
public abstract class ReconnectServiceBase
{
@@ -689,9 +483,7 @@ namespace Grpc.Testing {
}
/// <summary>Client for ReconnectService</summary>
- #pragma warning disable 0618
- public class ReconnectServiceClient : ClientBase<ReconnectServiceClient>, IReconnectServiceClient
- #pragma warning restore 0618
+ public class ReconnectServiceClient : ClientBase<ReconnectServiceClient>
{
public ReconnectServiceClient(Channel channel) : base(channel)
{
@@ -753,21 +545,9 @@ namespace Grpc.Testing {
}
/// <summary>Creates service definition that can be registered with a server</summary>
- #pragma warning disable 0618
- public static ServerServiceDefinition BindService(IReconnectService serviceImpl)
- #pragma warning restore 0618
- {
- return ServerServiceDefinition.CreateBuilder(__ServiceName)
- .AddMethod(__Method_Start, serviceImpl.Start)
- .AddMethod(__Method_Stop, serviceImpl.Stop).Build();
- }
-
- /// <summary>Creates service definition that can be registered with a server</summary>
- #pragma warning disable 0618
public static ServerServiceDefinition BindService(ReconnectServiceBase serviceImpl)
- #pragma warning restore 0618
{
- return ServerServiceDefinition.CreateBuilder(__ServiceName)
+ return ServerServiceDefinition.CreateBuilder()
.AddMethod(__Method_Start, serviceImpl.Start)
.AddMethod(__Method_Stop, serviceImpl.Stop).Build();
}
diff --git a/src/csharp/tests.json b/src/csharp/tests.json
index f6af3408d5..7e7aee1093 100644
--- a/src/csharp/tests.json
+++ b/src/csharp/tests.json
@@ -7,6 +7,7 @@
"Grpc.Core.Internal.Tests.CompletionQueueSafeHandleTest",
"Grpc.Core.Internal.Tests.MetadataArraySafeHandleTest",
"Grpc.Core.Internal.Tests.TimespecTest",
+ "Grpc.Core.Tests.AppDomainUnloadTest",
"Grpc.Core.Tests.CallCredentialsTest",
"Grpc.Core.Tests.CallOptionsTest",
"Grpc.Core.Tests.ChannelCredentialsTest",
@@ -25,6 +26,9 @@
"Grpc.Core.Tests.ResponseHeadersTest",
"Grpc.Core.Tests.SanityTest",
"Grpc.Core.Tests.ServerTest",
+ "Grpc.Core.Tests.ShutdownHookClientTest",
+ "Grpc.Core.Tests.ShutdownHookPendingCallTest",
+ "Grpc.Core.Tests.ShutdownHookServerTest",
"Grpc.Core.Tests.ShutdownTest",
"Grpc.Core.Tests.TimeoutsTest",
"Grpc.Core.Tests.UserAgentStringTest"
diff --git a/src/objective-c/GRPCClient/GRPCCall+Cronet.h b/src/objective-c/GRPCClient/GRPCCall+Cronet.h
index 7f7fc6c2a9..2d8f7ac8fb 100644
--- a/src/objective-c/GRPCClient/GRPCCall+Cronet.h
+++ b/src/objective-c/GRPCClient/GRPCCall+Cronet.h
@@ -30,6 +30,7 @@
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
+#ifdef GRPC_COMPILE_WITH_CRONET
#import <Cronet/Cronet.h>
#import "GRPCCall.h"
@@ -53,3 +54,4 @@
+(BOOL)isUsingCronet;
@end
+#endif
diff --git a/src/objective-c/GRPCClient/GRPCCall+Cronet.m b/src/objective-c/GRPCClient/GRPCCall+Cronet.m
index 69a410e95a..76ca1a2537 100644
--- a/src/objective-c/GRPCClient/GRPCCall+Cronet.m
+++ b/src/objective-c/GRPCClient/GRPCCall+Cronet.m
@@ -33,6 +33,7 @@
#import "GRPCCall+Cronet.h"
+#ifdef GRPC_COMPILE_WITH_CRONET
static BOOL useCronet = NO;
static cronet_engine *globalCronetEngine;
@@ -52,3 +53,4 @@ static cronet_engine *globalCronetEngine;
}
@end
+#endif
diff --git a/src/objective-c/GRPCClient/private/GRPCChannel.h b/src/objective-c/GRPCClient/private/GRPCChannel.h
index 3219835d02..40e78a92d6 100644
--- a/src/objective-c/GRPCClient/private/GRPCChannel.h
+++ b/src/objective-c/GRPCClient/private/GRPCChannel.h
@@ -58,9 +58,10 @@ struct grpc_channel_credentials;
/**
* Creates a secure channel to the specified @c host using Cronet as a transport mechanism.
*/
+#ifdef GRPC_COMPILE_WITH_CRONET
+ (nullable GRPCChannel *)secureCronetChannelWithHost:(NSString *)host
channelArgs:(NSDictionary *)channelArgs;
-
+#endif
/**
* Creates a secure channel to the specified @c host using the specified @c credentials and
* @c channelArgs. Only in tests should @c GRPC_SSL_TARGET_NAME_OVERRIDE_ARG channel arg be set.
diff --git a/src/objective-c/GRPCClient/private/GRPCChannel.m b/src/objective-c/GRPCClient/private/GRPCChannel.m
index e4e0dbe6d2..d3192c983d 100644
--- a/src/objective-c/GRPCClient/private/GRPCChannel.m
+++ b/src/objective-c/GRPCClient/private/GRPCChannel.m
@@ -34,13 +34,17 @@
#import "GRPCChannel.h"
#include <grpc/grpc_security.h>
+#ifdef GRPC_COMPILE_WITH_CRONET
#include <grpc/grpc_cronet.h>
+#endif
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
+#ifdef GRPC_COMPILE_WITH_CRONET
#import <Cronet/Cronet.h>
#import <GRPCClient/GRPCCall+Cronet.h>
+#endif
#import "GRPCCompletionQueue.h"
void freeChannelArgs(grpc_channel_args *channel_args) {
@@ -102,6 +106,7 @@ grpc_channel_args * buildChannelArgs(NSDictionary *dictionary) {
grpc_channel_args *_channelArgs;
}
+#ifdef GRPC_COMPILE_WITH_CRONET
- (instancetype)initWithHost:(NSString *)host
cronetEngine:(cronet_engine *)cronetEngine
channelArgs:(NSDictionary *)channelArgs {
@@ -118,6 +123,7 @@ grpc_channel_args * buildChannelArgs(NSDictionary *dictionary) {
return self;
}
+#endif
- (instancetype)initWithHost:(NSString *)host
secure:(BOOL)secure
@@ -152,6 +158,7 @@ grpc_channel_args * buildChannelArgs(NSDictionary *dictionary) {
freeChannelArgs(_channelArgs);
}
+#ifdef GRPC_COMPILE_WITH_CRONET
+ (GRPCChannel *)secureCronetChannelWithHost:(NSString *)host
channelArgs:(NSDictionary *)channelArgs {
cronet_engine *engine = [GRPCCall cronetEngine];
@@ -162,6 +169,7 @@ grpc_channel_args * buildChannelArgs(NSDictionary *dictionary) {
}
return [[GRPCChannel alloc] initWithHost:host cronetEngine:engine channelArgs:channelArgs];
}
+#endif
+ (GRPCChannel *)secureChannelWithHost:(NSString *)host {
return [[GRPCChannel alloc] initWithHost:host secure:YES credentials:NULL channelArgs:NULL];
diff --git a/src/objective-c/GRPCClient/private/GRPCHost.m b/src/objective-c/GRPCClient/private/GRPCHost.m
index 7da508810c..fef6385cea 100644
--- a/src/objective-c/GRPCClient/private/GRPCHost.m
+++ b/src/objective-c/GRPCClient/private/GRPCHost.m
@@ -36,8 +36,10 @@
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#import <GRPCClient/GRPCCall.h>
+#ifdef GRPC_COMPILE_WITH_CRONET
#import <GRPCClient/GRPCCall+ChannelArg.h>
#import <GRPCClient/GRPCCall+Cronet.h>
+#endif
#import "GRPCChannel.h"
#import "GRPCCompletionQueue.h"
@@ -201,17 +203,22 @@ NS_ASSUME_NONNULL_BEGIN
- (GRPCChannel *)newChannel {
NSDictionary *args = [self channelArgs];
+#ifdef GRPC_COMPILE_WITH_CRONET
BOOL useCronet = [GRPCCall isUsingCronet];
+#endif
if (_secure) {
GRPCChannel *channel;
@synchronized(self) {
if (_channelCreds == nil) {
[self setTLSPEMRootCerts:nil withPrivateKey:nil withCertChain:nil error:nil];
}
+#ifdef GRPC_COMPILE_WITH_CRONET
if (useCronet) {
channel = [GRPCChannel secureCronetChannelWithHost:_address
channelArgs:args];
- } else {
+ } else
+#endif
+ {
channel = [GRPCChannel secureChannelWithHost:_address
credentials:_channelCreds
channelArgs:args];
diff --git a/src/proto/math/math.proto b/src/proto/math/math.proto
index 311e148c02..269c60bde8 100644
--- a/src/proto/math/math.proto
+++ b/src/proto/math/math.proto
@@ -55,8 +55,8 @@ message FibReply {
}
service Math {
- // Div divides args.dividend by args.divisor and returns the quotient and
- // remainder.
+ // Div divides DivArgs.dividend by DivArgs.divisor and returns the quotient
+ // and remainder.
rpc Div (DivArgs) returns (DivReply) {
}
@@ -67,7 +67,7 @@ service Math {
rpc DivMany (stream DivArgs) returns (stream DivReply) {
}
- // Fib generates numbers in the Fibonacci sequence. If args.limit > 0, Fib
+ // Fib generates numbers in the Fibonacci sequence. If FibArgs.limit > 0, Fib
// generates up to limit numbers; otherwise it continues until the call is
// canceled. Unlike Fib above, Fib has no final FibReply.
rpc Fib (FibArgs) returns (stream Num) {
diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py
index bbf04ad03e..5ba5a4e1fd 100644
--- a/src/python/grpcio/grpc/__init__.py
+++ b/src/python/grpcio/grpc/__init__.py
@@ -947,6 +947,21 @@ def metadata_call_credentials(metadata_plugin, name=None):
metadata_plugin, effective_name))
+def access_token_call_credentials(access_token):
+ """Construct CallCredentials from an access token.
+
+ Args:
+ access_token: A string to place directly in the http request
+ authorization header, ie "Authorization: Bearer <access_token>".
+
+ Returns:
+ A CallCredentials.
+ """
+ from grpc import _auth
+ return metadata_call_credentials(
+ _auth.AccessTokenCallCredentials(access_token))
+
+
def composite_call_credentials(call_credentials, additional_call_credentials):
"""Compose two CallCredentials to make a new one.
diff --git a/src/python/grpcio/grpc/_adapter/_implementations.py b/src/python/grpcio/grpc/_adapter/_implementations.py
deleted file mode 100644
index b85f228bf6..0000000000
--- a/src/python/grpcio/grpc/_adapter/_implementations.py
+++ /dev/null
@@ -1,48 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import collections
-
-from grpc.beta import interfaces
-
-class AuthMetadataContext(collections.namedtuple(
- 'AuthMetadataContext', [
- 'service_url',
- 'method_name'
- ]), interfaces.GRPCAuthMetadataContext):
- pass
-
-
-class AuthMetadataPluginCallback(interfaces.GRPCAuthMetadataContext):
-
- def __init__(self, callback):
- self._callback = callback
-
- def __call__(self, metadata, error):
- self._callback(metadata, error)
diff --git a/src/python/grpcio/grpc/_adapter/_low.py b/src/python/grpcio/grpc/_adapter/_low.py
index 00788bd4cf..48410167a0 100644
--- a/src/python/grpcio/grpc/_adapter/_low.py
+++ b/src/python/grpcio/grpc/_adapter/_low.py
@@ -30,8 +30,8 @@
import threading
from grpc import _grpcio_metadata
+from grpc import _plugin_wrapping
from grpc._cython import cygrpc
-from grpc._adapter import _implementations
from grpc._adapter import _types
_USER_AGENT = 'Python-gRPC-{}'.format(_grpcio_metadata.__version__)
@@ -57,78 +57,8 @@ def channel_credentials_ssl(
return cygrpc.channel_credentials_ssl(root_certificates, pair)
-class _WrappedCygrpcCallback(object):
-
- def __init__(self, cygrpc_callback):
- self.is_called = False
- self.error = None
- self.is_called_lock = threading.Lock()
- self.cygrpc_callback = cygrpc_callback
-
- def _invoke_failure(self, error):
- # TODO(atash) translate different Exception superclasses into different
- # status codes.
- self.cygrpc_callback(
- cygrpc.Metadata([]), cygrpc.StatusCode.internal, error.message)
-
- def _invoke_success(self, metadata):
- try:
- cygrpc_metadata = cygrpc.Metadata(
- cygrpc.Metadatum(key, value)
- for key, value in metadata)
- except Exception as error:
- self._invoke_failure(error)
- return
- self.cygrpc_callback(cygrpc_metadata, cygrpc.StatusCode.ok, '')
-
- def __call__(self, metadata, error):
- with self.is_called_lock:
- if self.is_called:
- raise RuntimeError('callback should only ever be invoked once')
- if self.error:
- self._invoke_failure(self.error)
- return
- self.is_called = True
- if error is None:
- self._invoke_success(metadata)
- else:
- self._invoke_failure(error)
-
- def notify_failure(self, error):
- with self.is_called_lock:
- if not self.is_called:
- self.error = error
-
-
-class _WrappedPlugin(object):
-
- def __init__(self, plugin):
- self.plugin = plugin
-
- def __call__(self, context, cygrpc_callback):
- wrapped_cygrpc_callback = _WrappedCygrpcCallback(cygrpc_callback)
- wrapped_context = _implementations.AuthMetadataContext(context.service_url,
- context.method_name)
- try:
- self.plugin(
- wrapped_context,
- _implementations.AuthMetadataPluginCallback(wrapped_cygrpc_callback))
- except Exception as error:
- wrapped_cygrpc_callback.notify_failure(error)
- raise
-
-
-def call_credentials_metadata_plugin(plugin, name):
- """
- Args:
- plugin: A callable accepting a _types.AuthMetadataContext
- object and a callback (itself accepting a list of metadata key/value
- 2-tuples and a None-able exception value). The callback must be eventually
- called, but need not be called in plugin's invocation.
- plugin's invocation must be non-blocking.
- """
- return cygrpc.call_credentials_metadata_plugin(
- cygrpc.CredentialsMetadataPlugin(_WrappedPlugin(plugin), name))
+call_credentials_metadata_plugin = (
+ _plugin_wrapping.call_credentials_metadata_plugin)
class CompletionQueue(_types.CompletionQueue):
diff --git a/src/python/grpcio/grpc/beta/_auth.py b/src/python/grpcio/grpc/_auth.py
index 553d4b9991..3ae00ca23a 100644
--- a/src/python/grpcio/grpc/beta/_auth.py
+++ b/src/python/grpcio/grpc/_auth.py
@@ -31,7 +31,7 @@
from concurrent import futures
-from grpc.beta import interfaces
+import grpc
def _sign_request(callback, token, error):
@@ -39,7 +39,7 @@ def _sign_request(callback, token, error):
callback(metadata, error)
-class GoogleCallCredentials(interfaces.GRPCAuthMetadataPlugin):
+class GoogleCallCredentials(grpc.AuthMetadataPlugin):
"""Metadata wrapper for GoogleCredentials from the oauth2client library."""
def __init__(self, credentials):
@@ -63,7 +63,7 @@ class GoogleCallCredentials(interfaces.GRPCAuthMetadataPlugin):
self._pool.shutdown(wait=False)
-class AccessTokenCallCredentials(interfaces.GRPCAuthMetadataPlugin):
+class AccessTokenCallCredentials(grpc.AuthMetadataPlugin):
"""Metadata wrapper for raw access token credentials."""
def __init__(self, access_token):
diff --git a/src/python/grpcio/grpc/_cython/imports.generated.c b/src/python/grpcio/grpc/_cython/imports.generated.c
index 5d09329680..c80ee66c06 100644
--- a/src/python/grpcio/grpc/_cython/imports.generated.c
+++ b/src/python/grpcio/grpc/_cython/imports.generated.c
@@ -126,7 +126,6 @@ grpc_header_key_is_legal_type grpc_header_key_is_legal_import;
grpc_header_nonbin_value_is_legal_type grpc_header_nonbin_value_is_legal_import;
grpc_is_binary_header_type grpc_is_binary_header_import;
grpc_call_error_to_string_type grpc_call_error_to_string_import;
-grpc_cronet_secure_channel_create_type grpc_cronet_secure_channel_create_import;
grpc_auth_property_iterator_next_type grpc_auth_property_iterator_next_import;
grpc_auth_context_property_iterator_type grpc_auth_context_property_iterator_import;
grpc_auth_context_peer_identity_type grpc_auth_context_peer_identity_import;
@@ -398,7 +397,6 @@ void pygrpc_load_imports(HMODULE library) {
grpc_header_nonbin_value_is_legal_import = (grpc_header_nonbin_value_is_legal_type) GetProcAddress(library, "grpc_header_nonbin_value_is_legal");
grpc_is_binary_header_import = (grpc_is_binary_header_type) GetProcAddress(library, "grpc_is_binary_header");
grpc_call_error_to_string_import = (grpc_call_error_to_string_type) GetProcAddress(library, "grpc_call_error_to_string");
- grpc_cronet_secure_channel_create_import = (grpc_cronet_secure_channel_create_type) GetProcAddress(library, "grpc_cronet_secure_channel_create");
grpc_auth_property_iterator_next_import = (grpc_auth_property_iterator_next_type) GetProcAddress(library, "grpc_auth_property_iterator_next");
grpc_auth_context_property_iterator_import = (grpc_auth_context_property_iterator_type) GetProcAddress(library, "grpc_auth_context_property_iterator");
grpc_auth_context_peer_identity_import = (grpc_auth_context_peer_identity_type) GetProcAddress(library, "grpc_auth_context_peer_identity");
diff --git a/src/python/grpcio/grpc/_cython/imports.generated.h b/src/python/grpcio/grpc/_cython/imports.generated.h
index cfadad0527..7b8e98d9bf 100644
--- a/src/python/grpcio/grpc/_cython/imports.generated.h
+++ b/src/python/grpcio/grpc/_cython/imports.generated.h
@@ -43,7 +43,6 @@
#include <grpc/census.h>
#include <grpc/compression.h>
#include <grpc/grpc.h>
-#include <grpc/grpc_cronet.h>
#include <grpc/grpc_security.h>
#include <grpc/impl/codegen/alloc.h>
#include <grpc/impl/codegen/byte_buffer.h>
@@ -329,9 +328,6 @@ extern grpc_is_binary_header_type grpc_is_binary_header_import;
typedef const char *(*grpc_call_error_to_string_type)(grpc_call_error error);
extern grpc_call_error_to_string_type grpc_call_error_to_string_import;
#define grpc_call_error_to_string grpc_call_error_to_string_import
-typedef grpc_channel *(*grpc_cronet_secure_channel_create_type)(void *engine, const char *target, const grpc_channel_args *args, void *reserved);
-extern grpc_cronet_secure_channel_create_type grpc_cronet_secure_channel_create_import;
-#define grpc_cronet_secure_channel_create grpc_cronet_secure_channel_create_import
typedef const grpc_auth_property *(*grpc_auth_property_iterator_next_type)(grpc_auth_property_iterator *it);
extern grpc_auth_property_iterator_next_type grpc_auth_property_iterator_next_import;
#define grpc_auth_property_iterator_next grpc_auth_property_iterator_next_import
diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py
index c65070f1b3..aae9f48ae6 100644
--- a/src/python/grpcio/grpc/_server.py
+++ b/src/python/grpcio/grpc/_server.py
@@ -65,12 +65,23 @@ def _serialized_request(request_event):
return request_event.batch_operations[0].received_message.bytes()
-def _code(state):
+def _application_code(code):
+ cygrpc_code = _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE.get(code)
+ return cygrpc.StatusCode.unknown if cygrpc_code is None else cygrpc_code
+
+
+def _completion_code(state):
if state.code is None:
return cygrpc.StatusCode.ok
else:
- code = _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE.get(state.code)
- return cygrpc.StatusCode.unknown if code is None else code
+ return _application_code(state.code)
+
+
+def _abortion_code(state, code):
+ if state.code is None:
+ return code
+ else:
+ return _application_code(state.code)
def _details(state):
@@ -126,20 +137,22 @@ def _send_status_from_server(state, token):
def _abort(state, call, code, details):
if state.client is not _CANCELLED:
+ effective_code = _abortion_code(state, code)
+ effective_details = details if state.details is None else state.details
if state.initial_metadata_allowed:
operations = (
cygrpc.operation_send_initial_metadata(
_EMPTY_METADATA, _EMPTY_FLAGS),
cygrpc.operation_send_status_from_server(
- _common.metadata(state.trailing_metadata), code, details,
- _EMPTY_FLAGS),
+ _common.metadata(state.trailing_metadata), effective_code,
+ effective_details, _EMPTY_FLAGS),
)
token = _SEND_INITIAL_METADATA_AND_SEND_STATUS_FROM_SERVER_TOKEN
else:
operations = (
cygrpc.operation_send_status_from_server(
- _common.metadata(state.trailing_metadata), code, details,
- _EMPTY_FLAGS),
+ _common.metadata(state.trailing_metadata), effective_code,
+ effective_details, _EMPTY_FLAGS),
)
token = _SEND_STATUS_FROM_SERVER_TOKEN
call.start_batch(
@@ -346,7 +359,7 @@ def _unary_request(rpc_event, state, request_deserializer):
def _call_behavior(rpc_event, state, behavior, argument, request_deserializer):
context = _Context(rpc_event, state, request_deserializer)
try:
- return behavior(argument, context)
+ return behavior(argument, context), True
except Exception as e: # pylint: disable=broad-except
with state.condition:
if e not in state.rpc_errors:
@@ -354,7 +367,7 @@ def _call_behavior(rpc_event, state, behavior, argument, request_deserializer):
logging.exception(details)
_abort(
state, rpc_event.operation_call, cygrpc.StatusCode.unknown, details)
- return None
+ return None, False
def _take_response_from_response_iterator(rpc_event, state, response_iterator):
@@ -415,7 +428,7 @@ def _status(rpc_event, state, serialized_response):
with state.condition:
if state.client is not _CANCELLED:
trailing_metadata = _common.metadata(state.trailing_metadata)
- code = _code(state)
+ code = _completion_code(state)
details = _details(state)
operations = [
cygrpc.operation_send_status_from_server(
@@ -440,9 +453,9 @@ def _unary_response_in_pool(
response_serializer):
argument = argument_thunk()
if argument is not None:
- response = _call_behavior(
+ response, proceed = _call_behavior(
rpc_event, state, behavior, argument, request_deserializer)
- if response is not None:
+ if proceed:
serialized_response = _serialize_response(
rpc_event, state, response, response_serializer)
if serialized_response is not None:
@@ -455,9 +468,9 @@ def _stream_response_in_pool(
response_serializer):
argument = argument_thunk()
if argument is not None:
- response_iterator = _call_behavior(
+ response_iterator, proceed = _call_behavior(
rpc_event, state, behavior, argument, request_deserializer)
- if response_iterator is not None:
+ if proceed:
while True:
response, proceed = _take_response_from_response_iterator(
rpc_event, state, response_iterator)
diff --git a/src/python/grpcio/grpc/beta/_client_adaptations.py b/src/python/grpcio/grpc/beta/_client_adaptations.py
new file mode 100644
index 0000000000..621fcf2174
--- /dev/null
+++ b/src/python/grpcio/grpc/beta/_client_adaptations.py
@@ -0,0 +1,566 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Translates gRPC's client-side API into gRPC's client-side Beta API."""
+
+import grpc
+from grpc._cython import cygrpc
+from grpc.beta import interfaces
+from grpc.framework.common import cardinality
+from grpc.framework.foundation import future
+from grpc.framework.interfaces.face import face
+
+_STATUS_CODE_TO_ABORTION_KIND_AND_ABORTION_ERROR_CLASS = {
+ grpc.StatusCode.CANCELLED: (
+ face.Abortion.Kind.CANCELLED, face.CancellationError),
+ grpc.StatusCode.UNKNOWN: (
+ face.Abortion.Kind.REMOTE_FAILURE, face.RemoteError),
+ grpc.StatusCode.DEADLINE_EXCEEDED: (
+ face.Abortion.Kind.EXPIRED, face.ExpirationError),
+ grpc.StatusCode.UNIMPLEMENTED: (
+ face.Abortion.Kind.LOCAL_FAILURE, face.LocalError),
+}
+
+
+def _fully_qualified_method(group, method):
+ return b'/{}/{}'.format(group, method)
+
+
+def _effective_metadata(metadata, metadata_transformer):
+ non_none_metadata = () if metadata is None else metadata
+ if metadata_transformer is None:
+ return non_none_metadata
+ else:
+ return metadata_transformer(non_none_metadata)
+
+
+def _credentials(grpc_call_options):
+ return None if grpc_call_options is None else grpc_call_options.credentials
+
+
+def _abortion(rpc_error_call):
+ code = rpc_error_call.code()
+ pair = _STATUS_CODE_TO_ABORTION_KIND_AND_ABORTION_ERROR_CLASS.get(code)
+ error_kind = face.Abortion.Kind.LOCAL_FAILURE if pair is None else pair[0]
+ return face.Abortion(
+ error_kind, rpc_error_call.initial_metadata(),
+ rpc_error_call.trailing_metadata(), code, rpc_error_code.details())
+
+
+def _abortion_error(rpc_error_call):
+ code = rpc_error_call.code()
+ pair = _STATUS_CODE_TO_ABORTION_KIND_AND_ABORTION_ERROR_CLASS.get(code)
+ exception_class = face.AbortionError if pair is None else pair[1]
+ return exception_class(
+ rpc_error_call.initial_metadata(), rpc_error_call.trailing_metadata(),
+ code, rpc_error_call.details())
+
+
+class _InvocationProtocolContext(interfaces.GRPCInvocationContext):
+
+ def disable_next_request_compression(self):
+ pass # TODO(https://github.com/grpc/grpc/issues/4078): design, implement.
+
+
+class _Rendezvous(future.Future, face.Call):
+
+ def __init__(self, response_future, response_iterator, call):
+ self._future = response_future
+ self._iterator = response_iterator
+ self._call = call
+
+ def cancel(self):
+ return self._call.cancel()
+
+ def cancelled(self):
+ return self._future.cancelled()
+
+ def running(self):
+ return self._future.running()
+
+ def done(self):
+ return self._future.done()
+
+ def result(self, timeout=None):
+ try:
+ return self._future.result(timeout=timeout)
+ except grpc.RpcError as rpc_error_call:
+ raise _abortion_error(rpc_error_call)
+ except grpc.FutureTimeoutError:
+ raise future.TimeoutError()
+ except grpc.FutureCancelledError:
+ raise future.CancelledError()
+
+ def exception(self, timeout=None):
+ try:
+ rpc_error_call = self._future.exception(timeout=timeout)
+ return _abortion_error(rpc_error_call)
+ except grpc.FutureTimeoutError:
+ raise future.TimeoutError()
+ except grpc.FutureCancelledError:
+ raise future.CancelledError()
+
+ def traceback(self, timeout=None):
+ try:
+ return self._future.traceback(timeout=timeout)
+ except grpc.FutureTimeoutError:
+ raise future.TimeoutError()
+ except grpc.FutureCancelledError:
+ raise future.CancelledError()
+
+ def add_done_callback(self, fn):
+ self._future.add_done_callback(lambda ignored_callback: fn(self))
+
+ def __iter__(self):
+ return self
+
+ def _next(self):
+ try:
+ return next(self._iterator)
+ except grpc.RpcError as rpc_error_call:
+ raise _abortion_error(rpc_error_call)
+
+ def __next__(self):
+ return self._next()
+
+ def next(self):
+ return self._next()
+
+ def is_active(self):
+ return self._call.is_active()
+
+ def time_remaining(self):
+ return self._call.time_remaining()
+
+ def add_abortion_callback(self, abortion_callback):
+ registered = self._call.add_callback(
+ lambda: abortion_callback(_abortion(self._call)))
+ return None if registered else _abortion(self._call)
+
+ def protocol_context(self):
+ return _InvocationProtocolContext()
+
+ def initial_metadata(self):
+ return self._call.initial_metadata()
+
+ def terminal_metadata(self):
+ return self._call.terminal_metadata()
+
+ def code(self):
+ return self._call.code()
+
+ def details(self):
+ return self._call.details()
+
+
+def _blocking_unary_unary(
+ channel, group, method, timeout, with_call, protocol_options, metadata,
+ metadata_transformer, request, request_serializer, response_deserializer):
+ try:
+ multi_callable = channel.unary_unary(
+ _fully_qualified_method(group, method),
+ request_serializer=request_serializer,
+ response_deserializer=response_deserializer)
+ effective_metadata = _effective_metadata(metadata, metadata_transformer)
+ if with_call:
+ response, call = multi_callable(
+ request, timeout=timeout, metadata=effective_metadata,
+ credentials=_credentials(protocol_options), with_call=True)
+ return response, _Rendezvous(None, None, call)
+ else:
+ return multi_callable(
+ request, timeout=timeout, metadata=effective_metadata,
+ credentials=_credentials(protocol_options))
+ except grpc.RpcError as rpc_error_call:
+ raise _abortion_error(rpc_error_call)
+
+
+def _future_unary_unary(
+ channel, group, method, timeout, protocol_options, metadata,
+ metadata_transformer, request, request_serializer, response_deserializer):
+ multi_callable = channel.unary_unary(
+ _fully_qualified_method(group, method),
+ request_serializer=request_serializer,
+ response_deserializer=response_deserializer)
+ effective_metadata = _effective_metadata(metadata, metadata_transformer)
+ response_future = multi_callable.future(
+ request, timeout=timeout, metadata=effective_metadata,
+ credentials=_credentials(protocol_options))
+ return _Rendezvous(response_future, None, response_future)
+
+
+def _unary_stream(
+ channel, group, method, timeout, protocol_options, metadata,
+ metadata_transformer, request, request_serializer, response_deserializer):
+ multi_callable = channel.unary_stream(
+ _fully_qualified_method(group, method),
+ request_serializer=request_serializer,
+ response_deserializer=response_deserializer)
+ effective_metadata = _effective_metadata(metadata, metadata_transformer)
+ response_iterator = multi_callable(
+ request, timeout=timeout, metadata=effective_metadata,
+ credentials=_credentials(protocol_options))
+ return _Rendezvous(None, response_iterator, response_iterator)
+
+
+def _blocking_stream_unary(
+ channel, group, method, timeout, with_call, protocol_options, metadata,
+ metadata_transformer, request_iterator, request_serializer,
+ response_deserializer):
+ try:
+ multi_callable = channel.stream_unary(
+ _fully_qualified_method(group, method),
+ request_serializer=request_serializer,
+ response_deserializer=response_deserializer)
+ effective_metadata = _effective_metadata(metadata, metadata_transformer)
+ if with_call:
+ response, call = multi_callable(
+ request_iterator, timeout=timeout, metadata=effective_metadata,
+ credentials=_credentials(protocol_options), with_call=True)
+ return response, _Rendezvous(None, None, call)
+ else:
+ return multi_callable(
+ request_iterator, timeout=timeout, metadata=effective_metadata,
+ credentials=_credentials(protocol_options))
+ except grpc.RpcError as rpc_error_call:
+ raise _abortion_error(rpc_error_call)
+
+
+def _future_stream_unary(
+ channel, group, method, timeout, protocol_options, metadata,
+ metadata_transformer, request_iterator, request_serializer,
+ response_deserializer):
+ multi_callable = channel.stream_unary(
+ _fully_qualified_method(group, method),
+ request_serializer=request_serializer,
+ response_deserializer=response_deserializer)
+ effective_metadata = _effective_metadata(metadata, metadata_transformer)
+ response_future = multi_callable.future(
+ request_iterator, timeout=timeout, metadata=effective_metadata,
+ credentials=_credentials(protocol_options))
+ return _Rendezvous(response_future, None, response_future)
+
+
+def _stream_stream(
+ channel, group, method, timeout, protocol_options, metadata,
+ metadata_transformer, request_iterator, request_serializer,
+ response_deserializer):
+ multi_callable = channel.stream_stream(
+ _fully_qualified_method(group, method),
+ request_serializer=request_serializer,
+ response_deserializer=response_deserializer)
+ effective_metadata = _effective_metadata(metadata, metadata_transformer)
+ response_iterator = multi_callable(
+ request_iterator, timeout=timeout, metadata=effective_metadata,
+ credentials=_credentials(protocol_options))
+ return _Rendezvous(None, response_iterator, response_iterator)
+
+
+class _UnaryUnaryMultiCallable(face.UnaryUnaryMultiCallable):
+
+ def __init__(
+ self, channel, group, method, metadata_transformer, request_serializer,
+ response_deserializer):
+ self._channel = channel
+ self._group = group
+ self._method = method
+ self._metadata_transformer = metadata_transformer
+ self._request_serializer = request_serializer
+ self._response_deserializer = response_deserializer
+
+ def __call__(
+ self, request, timeout, metadata=None, with_call=False,
+ protocol_options=None):
+ return _blocking_unary_unary(
+ self._channel, self._group, self._method, timeout, with_call,
+ protocol_options, metadata, self._metadata_transformer, request,
+ self._request_serializer, self._response_deserializer)
+
+ def future(self, request, timeout, metadata=None, protocol_options=None):
+ return _future_unary_unary(
+ self._channel, self._group, self._method, timeout, protocol_options,
+ metadata, self._metadata_transformer, request, self._request_serializer,
+ self._response_deserializer)
+
+ def event(
+ self, request, receiver, abortion_callback, timeout,
+ metadata=None, protocol_options=None):
+ raise NotImplementedError()
+
+
+class _UnaryStreamMultiCallable(face.UnaryStreamMultiCallable):
+
+ def __init__(
+ self, channel, group, method, metadata_transformer, request_serializer,
+ response_deserializer):
+ self._channel = channel
+ self._group = group
+ self._method = method
+ self._metadata_transformer = metadata_transformer
+ self._request_serializer = request_serializer
+ self._response_deserializer = response_deserializer
+
+ def __call__(self, request, timeout, metadata=None, protocol_options=None):
+ return _unary_stream(
+ self._channel, self._group, self._method, timeout, protocol_options,
+ metadata, self._metadata_transformer, request, self._request_serializer,
+ self._response_deserializer)
+
+ def event(
+ self, request, receiver, abortion_callback, timeout,
+ metadata=None, protocol_options=None):
+ raise NotImplementedError()
+
+
+class _StreamUnaryMultiCallable(face.StreamUnaryMultiCallable):
+
+ def __init__(
+ self, channel, group, method, metadata_transformer, request_serializer,
+ response_deserializer):
+ self._channel = channel
+ self._group = group
+ self._method = method
+ self._metadata_transformer = metadata_transformer
+ self._request_serializer = request_serializer
+ self._response_deserializer = response_deserializer
+
+ def __call__(
+ self, request_iterator, timeout, metadata=None, with_call=False,
+ protocol_options=None):
+ return _blocking_stream_unary(
+ self._channel, self._group, self._method, timeout, with_call,
+ protocol_options, metadata, self._metadata_transformer,
+ request_iterator, self._request_serializer, self._response_deserializer)
+
+ def future(
+ self, request_iterator, timeout, metadata=None, protocol_options=None):
+ return _future_stream_unary(
+ self._channel, self._group, self._method, timeout, protocol_options,
+ metadata, self._metadata_transformer, request_iterator,
+ self._request_serializer, self._response_deserializer)
+
+ def event(
+ self, receiver, abortion_callback, timeout, metadata=None,
+ protocol_options=None):
+ raise NotImplementedError()
+
+
+class _StreamStreamMultiCallable(face.StreamStreamMultiCallable):
+
+ def __init__(
+ self, channel, group, method, metadata_transformer, request_serializer,
+ response_deserializer):
+ self._channel = channel
+ self._group = group
+ self._method = method
+ self._metadata_transformer = metadata_transformer
+ self._request_serializer = request_serializer
+ self._response_deserializer = response_deserializer
+
+ def __call__(
+ self, request_iterator, timeout, metadata=None, protocol_options=None):
+ return _stream_stream(
+ self._channel, self._group, self._method, timeout, protocol_options,
+ metadata, self._metadata_transformer, request_iterator,
+ self._request_serializer, self._response_deserializer)
+
+ def event(
+ self, receiver, abortion_callback, timeout, metadata=None,
+ protocol_options=None):
+ raise NotImplementedError()
+
+
+class _GenericStub(face.GenericStub):
+
+ def __init__(
+ self, channel, metadata_transformer, request_serializers,
+ response_deserializers):
+ self._channel = channel
+ self._metadata_transformer = metadata_transformer
+ self._request_serializers = request_serializers or {}
+ self._response_deserializers = response_deserializers or {}
+
+ def blocking_unary_unary(
+ self, group, method, request, timeout, metadata=None,
+ with_call=None, protocol_options=None):
+ request_serializer = self._request_serializers.get((group, method,))
+ response_deserializer = self._response_deserializers.get((group, method,))
+ return _blocking_unary_unary(
+ self._channel, group, method, timeout, with_call, protocol_options,
+ metadata, self._metadata_transformer, request, request_serializer,
+ response_deserializer)
+
+ def future_unary_unary(
+ self, group, method, request, timeout, metadata=None,
+ protocol_options=None):
+ request_serializer = self._request_serializers.get((group, method,))
+ response_deserializer = self._response_deserializers.get((group, method,))
+ return _future_unary_unary(
+ self._channel, group, method, timeout, protocol_options, metadata,
+ self._metadata_transformer, request, request_serializer,
+ response_deserializer)
+
+ def inline_unary_stream(
+ self, group, method, request, timeout, metadata=None,
+ protocol_options=None):
+ request_serializer = self._request_serializers.get((group, method,))
+ response_deserializer = self._response_deserializers.get((group, method,))
+ return _unary_stream(
+ self._channel, group, method, timeout, protocol_options, metadata,
+ self._metadata_transformer, request, request_serializer,
+ response_deserializer)
+
+ def blocking_stream_unary(
+ self, group, method, request_iterator, timeout, metadata=None,
+ with_call=None, protocol_options=None):
+ request_serializer = self._request_serializers.get((group, method,))
+ response_deserializer = self._response_deserializers.get((group, method,))
+ return _blocking_stream_unary(
+ self._channel, group, method, timeout, with_call, protocol_options,
+ metadata, self._metadata_transformer, request_iterator,
+ request_serializer, response_deserializer)
+
+ def future_stream_unary(
+ self, group, method, request_iterator, timeout, metadata=None,
+ protocol_options=None):
+ request_serializer = self._request_serializers.get((group, method,))
+ response_deserializer = self._response_deserializers.get((group, method,))
+ return _future_stream_unary(
+ self._channel, group, method, timeout, protocol_options, metadata,
+ self._metadata_transformer, request_iterator, request_serializer,
+ response_deserializer)
+
+ def inline_stream_stream(
+ self, group, method, request_iterator, timeout, metadata=None,
+ protocol_options=None):
+ request_serializer = self._request_serializers.get((group, method,))
+ response_deserializer = self._response_deserializers.get((group, method,))
+ return _stream_stream(
+ self._channel, group, method, timeout, protocol_options, metadata,
+ self._metadata_transformer, request_iterator, request_serializer,
+ response_deserializer)
+
+ def event_unary_unary(
+ self, group, method, request, receiver, abortion_callback, timeout,
+ metadata=None, protocol_options=None):
+ raise NotImplementedError()
+
+ def event_unary_stream(
+ self, group, method, request, receiver, abortion_callback, timeout,
+ metadata=None, protocol_options=None):
+ raise NotImplementedError()
+
+ def event_stream_unary(
+ self, group, method, receiver, abortion_callback, timeout,
+ metadata=None, protocol_options=None):
+ raise NotImplementedError()
+
+ def event_stream_stream(
+ self, group, method, receiver, abortion_callback, timeout,
+ metadata=None, protocol_options=None):
+ raise NotImplementedError()
+
+ def unary_unary(self, group, method):
+ request_serializer = self._request_serializers.get((group, method,))
+ response_deserializer = self._response_deserializers.get((group, method,))
+ return _UnaryUnaryMultiCallable(
+ self._channel, group, method, self._metadata_transformer,
+ request_serializer, response_deserializer)
+
+ def unary_stream(self, group, method):
+ request_serializer = self._request_serializers.get((group, method,))
+ response_deserializer = self._response_deserializers.get((group, method,))
+ return _UnaryStreamMultiCallable(
+ self._channel, group, method, self._metadata_transformer,
+ request_serializer, response_deserializer)
+
+ def stream_unary(self, group, method):
+ request_serializer = self._request_serializers.get((group, method,))
+ response_deserializer = self._response_deserializers.get((group, method,))
+ return _StreamUnaryMultiCallable(
+ self._channel, group, method, self._metadata_transformer,
+ request_serializer, response_deserializer)
+
+ def stream_stream(self, group, method):
+ request_serializer = self._request_serializers.get((group, method,))
+ response_deserializer = self._response_deserializers.get((group, method,))
+ return _StreamStreamMultiCallable(
+ self._channel, group, method, self._metadata_transformer,
+ request_serializer, response_deserializer)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ return False
+
+
+class _DynamicStub(face.DynamicStub):
+
+ def __init__(self, generic_stub, group, cardinalities):
+ self._generic_stub = generic_stub
+ self._group = group
+ self._cardinalities = cardinalities
+
+ def __getattr__(self, attr):
+ method_cardinality = self._cardinalities.get(attr)
+ if method_cardinality is cardinality.Cardinality.UNARY_UNARY:
+ return self._generic_stub.unary_unary(self._group, attr)
+ elif method_cardinality is cardinality.Cardinality.UNARY_STREAM:
+ return self._generic_stub.unary_stream(self._group, attr)
+ elif method_cardinality is cardinality.Cardinality.STREAM_UNARY:
+ return self._generic_stub.stream_unary(self._group, attr)
+ elif method_cardinality is cardinality.Cardinality.STREAM_STREAM:
+ return self._generic_stub.stream_stream(self._group, attr)
+ else:
+ raise AttributeError('_DynamicStub object has no attribute "%s"!' % attr)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ return False
+
+
+def generic_stub(
+ channel, host, metadata_transformer, request_serializers,
+ response_deserializers):
+ return _GenericStub(
+ channel, metadata_transformer, request_serializers,
+ response_deserializers)
+
+
+def dynamic_stub(
+ channel, service, cardinalities, host, metadata_transformer,
+ request_serializers, response_deserializers):
+ return _DynamicStub(
+ _GenericStub(
+ channel, metadata_transformer, request_serializers,
+ response_deserializers),
+ service, cardinalities)
diff --git a/src/python/grpcio/grpc/beta/_server_adaptations.py b/src/python/grpcio/grpc/beta/_server_adaptations.py
new file mode 100644
index 0000000000..52eadf2315
--- /dev/null
+++ b/src/python/grpcio/grpc/beta/_server_adaptations.py
@@ -0,0 +1,359 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Translates gRPC's server-side API into gRPC's server-side Beta API."""
+
+import collections
+import threading
+
+import grpc
+from grpc.beta import interfaces
+from grpc.framework.common import cardinality
+from grpc.framework.common import style
+from grpc.framework.foundation import abandonment
+from grpc.framework.foundation import logging_pool
+from grpc.framework.foundation import stream
+from grpc.framework.interfaces.face import face
+
+_DEFAULT_POOL_SIZE = 8
+
+
+class _ServerProtocolContext(interfaces.GRPCServicerContext):
+
+ def __init__(self, servicer_context):
+ self._servicer_context = servicer_context
+
+ def peer(self):
+ return self._servicer_context.peer()
+
+ def disable_next_response_compression(self):
+ pass # TODO(https://github.com/grpc/grpc/issues/4078): design, implement.
+
+
+class _FaceServicerContext(face.ServicerContext):
+
+ def __init__(self, servicer_context):
+ self._servicer_context = servicer_context
+
+ def is_active(self):
+ return self._servicer_context.is_active()
+
+ def time_remaining(self):
+ return self._servicer_context.time_remaining()
+
+ def add_abortion_callback(self, abortion_callback):
+ raise NotImplementedError(
+ 'add_abortion_callback no longer supported server-side!')
+
+ def cancel(self):
+ self._servicer_context.cancel()
+
+ def protocol_context(self):
+ return _ServerProtocolContext(self._servicer_context)
+
+ def invocation_metadata(self):
+ return self._servicer_context.invocation_metadata()
+
+ def initial_metadata(self, initial_metadata):
+ self._servicer_context.send_initial_metadata(initial_metadata)
+
+ def terminal_metadata(self, terminal_metadata):
+ self._servicer_context.set_terminal_metadata(terminal_metadata)
+
+ def code(self, code):
+ self._servicer_context.set_code(code)
+
+ def details(self, details):
+ self._servicer_context.set_details(details)
+
+
+def _adapt_unary_request_inline(unary_request_inline):
+ def adaptation(request, servicer_context):
+ return unary_request_inline(request, _FaceServicerContext(servicer_context))
+ return adaptation
+
+
+def _adapt_stream_request_inline(stream_request_inline):
+ def adaptation(request_iterator, servicer_context):
+ return stream_request_inline(
+ request_iterator, _FaceServicerContext(servicer_context))
+ return adaptation
+
+
+class _Callback(stream.Consumer):
+
+ def __init__(self):
+ self._condition = threading.Condition()
+ self._values = []
+ self._terminated = False
+ self._cancelled = False
+
+ def consume(self, value):
+ with self._condition:
+ self._values.append(value)
+ self._condition.notify_all()
+
+ def terminate(self):
+ with self._condition:
+ self._terminated = True
+ self._condition.notify_all()
+
+ def consume_and_terminate(self, value):
+ with self._condition:
+ self._values.append(value)
+ self._terminated = True
+ self._condition.notify_all()
+
+ def cancel(self):
+ with self._condition:
+ self._cancelled = True
+ self._condition.notify_all()
+
+ def draw_one_value(self):
+ with self._condition:
+ while True:
+ if self._cancelled:
+ raise abandonment.Abandoned()
+ elif self._values:
+ return self._values.pop(0)
+ elif self._terminated:
+ return None
+ else:
+ self._condition.wait()
+
+ def draw_all_values(self):
+ with self._condition:
+ while True:
+ if self._cancelled:
+ raise abandonment.Abandoned()
+ elif self._terminated:
+ all_values = tuple(self._values)
+ self._values = None
+ return all_values
+ else:
+ self._condition.wait()
+
+
+def _pipe_requests(request_iterator, request_consumer, servicer_context):
+ for request in request_iterator:
+ if not servicer_context.is_active():
+ return
+ request_consumer.consume(request)
+ if not servicer_context.is_active():
+ return
+ request_consumer.terminate()
+
+
+def _adapt_unary_unary_event(unary_unary_event):
+ def adaptation(request, servicer_context):
+ callback = _Callback()
+ if not servicer_context.add_callback(callback.cancel):
+ raise abandonment.Abandoned()
+ unary_unary_event(
+ request, callback.consume_and_terminate,
+ _FaceServicerContext(servicer_context))
+ return callback.draw_all_values()[0]
+ return adaptation
+
+
+def _adapt_unary_stream_event(unary_stream_event):
+ def adaptation(request, servicer_context):
+ callback = _Callback()
+ if not servicer_context.add_callback(callback.cancel):
+ raise abandonment.Abandoned()
+ unary_stream_event(
+ request, callback, _FaceServicerContext(servicer_context))
+ while True:
+ response = callback.draw_one_value()
+ if response is None:
+ return
+ else:
+ yield response
+ return adaptation
+
+
+def _adapt_stream_unary_event(stream_unary_event):
+ def adaptation(request_iterator, servicer_context):
+ callback = _Callback()
+ if not servicer_context.add_callback(callback.cancel):
+ raise abandonment.Abandoned()
+ request_consumer = stream_unary_event(
+ callback.consume_and_terminate, _FaceServicerContext(servicer_context))
+ request_pipe_thread = threading.Thread(
+ target=_pipe_requests,
+ args=(request_iterator, request_consumer, servicer_context,))
+ request_pipe_thread.start()
+ return callback.draw_all_values()[0]
+ return adaptation
+
+
+def _adapt_stream_stream_event(stream_stream_event):
+ def adaptation(request_iterator, servicer_context):
+ callback = _Callback()
+ if not servicer_context.add_callback(callback.cancel):
+ raise abandonment.Abandoned()
+ request_consumer = stream_stream_event(
+ callback, _FaceServicerContext(servicer_context))
+ request_pipe_thread = threading.Thread(
+ target=_pipe_requests,
+ args=(request_iterator, request_consumer, servicer_context,))
+ request_pipe_thread.start()
+ while True:
+ response = callback.draw_one_value()
+ if response is None:
+ return
+ else:
+ yield response
+ return adaptation
+
+
+class _SimpleMethodHandler(
+ collections.namedtuple(
+ '_MethodHandler',
+ ('request_streaming', 'response_streaming', 'request_deserializer',
+ 'response_serializer', 'unary_unary', 'unary_stream', 'stream_unary',
+ 'stream_stream',)),
+ grpc.RpcMethodHandler):
+ pass
+
+
+def _simple_method_handler(
+ implementation, request_deserializer, response_serializer):
+ if implementation.style is style.Service.INLINE:
+ if implementation.cardinality is cardinality.Cardinality.UNARY_UNARY:
+ return _SimpleMethodHandler(
+ False, False, request_deserializer, response_serializer,
+ _adapt_unary_request_inline(implementation.unary_unary_inline), None,
+ None, None)
+ elif implementation.cardinality is cardinality.Cardinality.UNARY_STREAM:
+ return _SimpleMethodHandler(
+ False, True, request_deserializer, response_serializer, None,
+ _adapt_unary_request_inline(implementation.unary_stream_inline), None,
+ None)
+ elif implementation.cardinality is cardinality.Cardinality.STREAM_UNARY:
+ return _SimpleMethodHandler(
+ True, False, request_deserializer, response_serializer, None, None,
+ _adapt_stream_request_inline(implementation.stream_unary_inline),
+ None)
+ elif implementation.cardinality is cardinality.Cardinality.STREAM_STREAM:
+ return _SimpleMethodHandler(
+ True, True, request_deserializer, response_serializer, None, None,
+ None,
+ _adapt_stream_request_inline(implementation.stream_stream_inline))
+ elif implementation.style is style.Service.EVENT:
+ if implementation.cardinality is cardinality.Cardinality.UNARY_UNARY:
+ return _SimpleMethodHandler(
+ False, False, request_deserializer, response_serializer,
+ _adapt_unary_unary_event(implementation.unary_unary_event), None,
+ None, None)
+ elif implementation.cardinality is cardinality.Cardinality.UNARY_STREAM:
+ return _SimpleMethodHandler(
+ False, True, request_deserializer, response_serializer, None,
+ _adapt_unary_stream_event(implementation.unary_stream_event), None,
+ None)
+ elif implementation.cardinality is cardinality.Cardinality.STREAM_UNARY:
+ return _SimpleMethodHandler(
+ True, False, request_deserializer, response_serializer, None, None,
+ _adapt_stream_unary_event(implementation.stream_unary_event), None)
+ elif implementation.cardinality is cardinality.Cardinality.STREAM_STREAM:
+ return _SimpleMethodHandler(
+ True, True, request_deserializer, response_serializer, None, None,
+ None, _adapt_stream_stream_event(implementation.stream_stream_event))
+
+
+class _GenericRpcHandler(grpc.GenericRpcHandler):
+
+ def __init__(
+ self, method_implementations, multi_method_implementation,
+ request_deserializers, response_serializers):
+ self._method_implementations = method_implementations
+ self._multi_method_implementation = multi_method_implementation
+ self._request_deserializers = request_deserializers or {}
+ self._response_serializers = response_serializers or {}
+
+ def service(self, handler_call_details):
+ try:
+ group_name, method_name = handler_call_details.method.split(b'/')[1:3]
+ except ValueError:
+ return None
+ else:
+ method_implementation = self._method_implementations.get(
+ (group_name, method_name,))
+ if method_implementation is not None:
+ return _simple_method_handler(
+ method_implementation,
+ self._request_deserializers.get((group_name, method_name,)),
+ self._response_serializers.get((group_name, method_name,)))
+ elif self._multi_method_implementation is None:
+ return None
+ else:
+ try:
+ return None #TODO(nathaniel): call the multimethod.
+ except face.NoSuchMethodError:
+ return None
+
+
+class _Server(interfaces.Server):
+
+ def __init__(self, server):
+ self._server = server
+
+ def add_insecure_port(self, address):
+ return self._server.add_insecure_port(address)
+
+ def add_secure_port(self, address, server_credentials):
+ return self._server.add_secure_port(address, server_credentials)
+
+ def start(self):
+ self._server.start()
+
+ def stop(self, grace):
+ return self._server.stop(grace)
+
+ def __enter__(self):
+ self._server.start()
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self._server.stop(None)
+ return False
+
+
+def server(
+ service_implementations, multi_method_implementation, request_deserializers,
+ response_serializers, thread_pool, thread_pool_size):
+ generic_rpc_handler = _GenericRpcHandler(
+ service_implementations, multi_method_implementation,
+ request_deserializers, response_serializers)
+ if thread_pool is None:
+ effective_thread_pool = logging_pool.pool(
+ _DEFAULT_POOL_SIZE if thread_pool_size is None else thread_pool_size)
+ else:
+ effective_thread_pool = thread_pool
+ return _Server(grpc.server((generic_rpc_handler,), effective_thread_pool))
diff --git a/src/python/grpcio/grpc/beta/implementations.py b/src/python/grpcio/grpc/beta/implementations.py
index d8c32dd2f5..4ae6e7d675 100644
--- a/src/python/grpcio/grpc/beta/implementations.py
+++ b/src/python/grpcio/grpc/beta/implementations.py
@@ -35,83 +35,20 @@ import enum
import threading # pylint: disable=unused-import
# cardinality and face are referenced from specification in this module.
-from grpc._adapter import _intermediary_low
-from grpc._adapter import _low
+import grpc
+from grpc import _auth
from grpc._adapter import _types
-from grpc.beta import _auth
-from grpc.beta import _connectivity_channel
-from grpc.beta import _server
-from grpc.beta import _stub
+from grpc.beta import _client_adaptations
+from grpc.beta import _server_adaptations
from grpc.beta import interfaces
from grpc.framework.common import cardinality # pylint: disable=unused-import
from grpc.framework.interfaces.face import face # pylint: disable=unused-import
-_CHANNEL_SUBSCRIPTION_CALLBACK_ERROR_LOG_MESSAGE = (
- 'Exception calling channel subscription callback!')
-
-class ChannelCredentials(object):
- """A value encapsulating the data required to create a secure Channel.
-
- This class and its instances have no supported interface - it exists to define
- the type of its instances and its instances exist to be passed to other
- functions.
- """
-
- def __init__(self, low_credentials):
- self._low_credentials = low_credentials
-
-
-def ssl_channel_credentials(root_certificates=None, private_key=None,
- certificate_chain=None):
- """Creates a ChannelCredentials for use with an SSL-enabled Channel.
-
- Args:
- root_certificates: The PEM-encoded root certificates or unset to ask for
- them to be retrieved from a default location.
- private_key: The PEM-encoded private key to use or unset if no private key
- should be used.
- certificate_chain: The PEM-encoded certificate chain to use or unset if no
- certificate chain should be used.
-
- Returns:
- A ChannelCredentials for use with an SSL-enabled Channel.
- """
- return ChannelCredentials(_low.channel_credentials_ssl(
- root_certificates, private_key, certificate_chain))
-
-
-class CallCredentials(object):
- """A value encapsulating data asserting an identity over an *established*
- channel. May be composed with ChannelCredentials to always assert identity for
- every call over that channel.
-
- This class and its instances have no supported interface - it exists to define
- the type of its instances and its instances exist to be passed to other
- functions.
- """
-
- def __init__(self, low_credentials):
- self._low_credentials = low_credentials
-
-
-def metadata_call_credentials(metadata_plugin, name=None):
- """Construct CallCredentials from an interfaces.GRPCAuthMetadataPlugin.
-
- Args:
- metadata_plugin: An interfaces.GRPCAuthMetadataPlugin to use in constructing
- the CallCredentials object.
-
- Returns:
- A CallCredentials object for use in a GRPCCallOptions object.
- """
- if name is None:
- try:
- name = metadata_plugin.__name__
- except AttributeError:
- name = metadata_plugin.__class__.__name__
- return CallCredentials(
- _low.call_credentials_metadata_plugin(metadata_plugin, name))
+ChannelCredentials = grpc.ChannelCredentials
+ssl_channel_credentials = grpc.ssl_channel_credentials
+CallCredentials = grpc.CallCredentials
+metadata_call_credentials = grpc.metadata_call_credentials
def google_call_credentials(credentials):
@@ -125,53 +62,9 @@ def google_call_credentials(credentials):
"""
return metadata_call_credentials(_auth.GoogleCallCredentials(credentials))
-
-def access_token_call_credentials(access_token):
- """Construct CallCredentials from an access token.
-
- Args:
- access_token: A string to place directly in the http request
- authorization header, ie "Authorization: Bearer <access_token>".
-
- Returns:
- A CallCredentials object for use in a GRPCCallOptions object.
- """
- return metadata_call_credentials(
- _auth.AccessTokenCallCredentials(access_token))
-
-
-def composite_call_credentials(call_credentials, additional_call_credentials):
- """Compose two CallCredentials to make a new one.
-
- Args:
- call_credentials: A CallCredentials object.
- additional_call_credentials: Another CallCredentials object to compose on
- top of call_credentials.
-
- Returns:
- A CallCredentials object for use in a GRPCCallOptions object.
- """
- return CallCredentials(
- _low.call_credentials_composite(
- call_credentials._low_credentials,
- additional_call_credentials._low_credentials))
-
-def composite_channel_credentials(channel_credentials,
- additional_call_credentials):
- """Compose ChannelCredentials on top of client credentials to make a new one.
-
- Args:
- channel_credentials: A ChannelCredentials object.
- additional_call_credentials: A CallCredentials object to compose on
- top of channel_credentials.
-
- Returns:
- A ChannelCredentials object for use in a GRPCCallOptions object.
- """
- return ChannelCredentials(
- _low.channel_credentials_composite(
- channel_credentials._low_credentials,
- additional_call_credentials._low_credentials))
+access_token_call_credentials = grpc.access_token_call_credentials
+composite_call_credentials = grpc.composite_call_credentials
+composite_channel_credentials = grpc.composite_channel_credentials
class Channel(object):
@@ -182,11 +75,8 @@ class Channel(object):
unsupported.
"""
- def __init__(self, low_channel, intermediary_low_channel):
- self._low_channel = low_channel
- self._intermediary_low_channel = intermediary_low_channel
- self._connectivity_channel = _connectivity_channel.ConnectivityChannel(
- low_channel)
+ def __init__(self, channel):
+ self._channel = channel
def subscribe(self, callback, try_to_connect=None):
"""Subscribes to this Channel's connectivity.
@@ -201,7 +91,7 @@ class Channel(object):
attempt to connect if it is not already connected and ready to conduct
RPCs.
"""
- self._connectivity_channel.subscribe(callback, try_to_connect)
+ self._channel.subscribe(callback, try_to_connect=try_to_connect)
def unsubscribe(self, callback):
"""Unsubscribes a callback from this Channel's connectivity.
@@ -210,7 +100,7 @@ class Channel(object):
callback: A callable previously registered with this Channel from having
been passed to its "subscribe" method.
"""
- self._connectivity_channel.unsubscribe(callback)
+ self._channel.unsubscribe(callback)
def insecure_channel(host, port):
@@ -224,9 +114,9 @@ def insecure_channel(host, port):
Returns:
A Channel to the remote host through which RPCs may be conducted.
"""
- intermediary_low_channel = _intermediary_low.Channel(
- '%s:%d' % (host, port) if port else host, None)
- return Channel(intermediary_low_channel._internal, intermediary_low_channel) # pylint: disable=protected-access
+ channel = grpc.insecure_channel(
+ host if port is None else '%s:%d' % (host, port))
+ return Channel(channel)
def secure_channel(host, port, channel_credentials):
@@ -241,10 +131,9 @@ def secure_channel(host, port, channel_credentials):
Returns:
A secure Channel to the remote host through which RPCs may be conducted.
"""
- intermediary_low_channel = _intermediary_low.Channel(
- '%s:%d' % (host, port) if port else host,
- channel_credentials._low_credentials)
- return Channel(intermediary_low_channel._internal, intermediary_low_channel) # pylint: disable=protected-access
+ channel = grpc.secure_channel(
+ host if port is None else '%s:%d' % (host, port), channel_credentials)
+ return Channel(channel)
class StubOptions(object):
@@ -308,12 +197,11 @@ def generic_stub(channel, options=None):
A face.GenericStub on which RPCs can be made.
"""
effective_options = _EMPTY_STUB_OPTIONS if options is None else options
- return _stub.generic_stub(
- channel._intermediary_low_channel, effective_options.host, # pylint: disable=protected-access
- effective_options.metadata_transformer,
+ return _client_adaptations.generic_stub(
+ channel._channel, # pylint: disable=protected-access
+ effective_options.host, effective_options.metadata_transformer,
effective_options.request_serializers,
- effective_options.response_deserializers, effective_options.thread_pool,
- effective_options.thread_pool_size)
+ effective_options.response_deserializers)
def dynamic_stub(channel, service, cardinalities, options=None):
@@ -331,55 +219,16 @@ def dynamic_stub(channel, service, cardinalities, options=None):
A face.DynamicStub with which RPCs can be invoked.
"""
effective_options = StubOptions() if options is None else options
- return _stub.dynamic_stub(
- channel._intermediary_low_channel, effective_options.host, service, # pylint: disable=protected-access
- cardinalities, effective_options.metadata_transformer,
+ return _client_adaptations.dynamic_stub(
+ channel._channel, # pylint: disable=protected-access
+ service, cardinalities, effective_options.host,
+ effective_options.metadata_transformer,
effective_options.request_serializers,
- effective_options.response_deserializers, effective_options.thread_pool,
- effective_options.thread_pool_size)
-
-
-class ServerCredentials(object):
- """A value encapsulating the data required to open a secure port on a Server.
+ effective_options.response_deserializers)
- This class and its instances have no supported interface - it exists to define
- the type of its instances and its instances exist to be passed to other
- functions.
- """
- def __init__(self, low_credentials):
- self._low_credentials = low_credentials
-
-
-def ssl_server_credentials(
- private_key_certificate_chain_pairs, root_certificates=None,
- require_client_auth=False):
- """Creates a ServerCredentials for use with an SSL-enabled Server.
-
- Args:
- private_key_certificate_chain_pairs: A nonempty sequence each element of
- which is a pair the first element of which is a PEM-encoded private key
- and the second element of which is the corresponding PEM-encoded
- certificate chain.
- root_certificates: PEM-encoded client root certificates to be used for
- verifying authenticated clients. If omitted, require_client_auth must also
- be omitted or be False.
- require_client_auth: A boolean indicating whether or not to require clients
- to be authenticated. May only be True if root_certificates is not None.
-
- Returns:
- A ServerCredentials for use with an SSL-enabled Server.
- """
- if len(private_key_certificate_chain_pairs) == 0:
- raise ValueError(
- 'At least one private key-certificate chain pairis required!')
- elif require_client_auth and root_certificates is None:
- raise ValueError(
- 'Illegal to require client auth without providing root certificates!')
- else:
- return ServerCredentials(_low.server_credentials_ssl(
- root_certificates, private_key_certificate_chain_pairs,
- require_client_auth))
+ServerCredentials = grpc.ServerCredentials
+ssl_server_credentials = grpc.ssl_server_credentials
class ServerOptions(object):
@@ -452,9 +301,8 @@ def server(service_implementations, options=None):
An interfaces.Server with which RPCs can be serviced.
"""
effective_options = _EMPTY_SERVER_OPTIONS if options is None else options
- return _server.server(
+ return _server_adaptations.server(
service_implementations, effective_options.multi_method_implementation,
effective_options.request_deserializers,
effective_options.response_serializers, effective_options.thread_pool,
- effective_options.thread_pool_size, effective_options.default_timeout,
- effective_options.maximum_timeout)
+ effective_options.thread_pool_size)
diff --git a/src/python/grpcio/grpc/beta/interfaces.py b/src/python/grpcio/grpc/beta/interfaces.py
index 24de9ad1a8..4343b6c4b5 100644
--- a/src/python/grpcio/grpc/beta/interfaces.py
+++ b/src/python/grpcio/grpc/beta/interfaces.py
@@ -30,53 +30,13 @@
"""Constants and interfaces of the Beta API of gRPC Python."""
import abc
-import enum
import six
-from grpc._adapter import _types
+import grpc
-
-@enum.unique
-class ChannelConnectivity(enum.Enum):
- """Mirrors grpc_connectivity_state in the gRPC Core.
-
- Attributes:
- IDLE: The channel is idle.
- CONNECTING: The channel is connecting.
- READY: The channel is ready to conduct RPCs.
- TRANSIENT_FAILURE: The channel has seen a failure from which it expects to
- recover.
- FATAL_FAILURE: The channel has seen a failure from which it cannot recover.
- """
- IDLE = (_types.ConnectivityState.IDLE, 'idle',)
- CONNECTING = (_types.ConnectivityState.CONNECTING, 'connecting',)
- READY = (_types.ConnectivityState.READY, 'ready',)
- TRANSIENT_FAILURE = (
- _types.ConnectivityState.TRANSIENT_FAILURE, 'transient failure',)
- FATAL_FAILURE = (_types.ConnectivityState.FATAL_FAILURE, 'fatal failure',)
-
-
-@enum.unique
-class StatusCode(enum.Enum):
- """Mirrors grpc_status_code in the C core."""
- OK = 0
- CANCELLED = 1
- UNKNOWN = 2
- INVALID_ARGUMENT = 3
- DEADLINE_EXCEEDED = 4
- NOT_FOUND = 5
- ALREADY_EXISTS = 6
- PERMISSION_DENIED = 7
- RESOURCE_EXHAUSTED = 8
- FAILED_PRECONDITION = 9
- ABORTED = 10
- OUT_OF_RANGE = 11
- UNIMPLEMENTED = 12
- INTERNAL = 13
- UNAVAILABLE = 14
- DATA_LOSS = 15
- UNAUTHENTICATED = 16
+ChannelConnectivity = grpc.ChannelConnectivity
+StatusCode = grpc.StatusCode
class GRPCCallOptions(object):
@@ -106,46 +66,9 @@ def grpc_call_options(disable_compression=False, credentials=None):
"""
return GRPCCallOptions(disable_compression, None, credentials)
-
-class GRPCAuthMetadataContext(six.with_metaclass(abc.ABCMeta)):
- """Provides information to call credentials metadata plugins.
-
- Attributes:
- service_url: A string URL of the service being called into.
- method_name: A string of the fully qualified method name being called.
- """
-
-
-class GRPCAuthMetadataPluginCallback(six.with_metaclass(abc.ABCMeta)):
- """Callback object received by a metadata plugin."""
-
- def __call__(self, metadata, error):
- """Inform the gRPC runtime of the metadata to construct a CallCredentials.
-
- Args:
- metadata: An iterable of 2-sequences (e.g. tuples) of metadata key/value
- pairs.
- error: An Exception to indicate error or None to indicate success.
- """
- raise NotImplementedError()
-
-
-class GRPCAuthMetadataPlugin(six.with_metaclass(abc.ABCMeta)):
- """
- """
-
- def __call__(self, context, callback):
- """Invoke the plugin.
-
- Must not block. Need only be called by the gRPC runtime.
-
- Args:
- context: A GRPCAuthMetadataContext providing information on what the
- plugin is being used for.
- callback: A GRPCAuthMetadataPluginCallback to be invoked either
- synchronously or asynchronously.
- """
- raise NotImplementedError()
+GRPCAuthMetadataContext = grpc.AuthMetadataContext
+GRPCAuthMetadataPluginCallback = grpc.AuthMetadataPluginCallback
+GRPCAuthMetadataPlugin = grpc.AuthMetadataPlugin
class GRPCServicerContext(six.with_metaclass(abc.ABCMeta)):
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 5b053ac2c4..6a1aa3f22a 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -103,6 +103,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/iomgr.c',
'src/core/lib/iomgr/iomgr_posix.c',
'src/core/lib/iomgr/iomgr_windows.c',
+ 'src/core/lib/iomgr/polling_entity.c',
'src/core/lib/iomgr/pollset_set_windows.c',
'src/core/lib/iomgr/pollset_windows.c',
'src/core/lib/iomgr/resolve_address_posix.c',
@@ -231,9 +232,6 @@ CORE_SOURCE_FILES = [
'src/core/ext/client_config/uri_parser.c',
'src/core/ext/transport/chttp2/server/insecure/server_chttp2.c',
'src/core/ext/transport/chttp2/client/insecure/channel_create.c',
- 'src/core/ext/transport/cronet/client/secure/cronet_channel_create.c',
- 'src/core/ext/transport/cronet/transport/cronet_api_dummy.c',
- 'src/core/ext/transport/cronet/transport/cronet_transport.c',
'src/core/ext/lb_policy/grpclb/load_balancer_api.c',
'src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c',
'third_party/nanopb/pb_common.c',
diff --git a/src/python/grpcio/tests/tests.json b/src/python/grpcio/tests/tests.json
index 81458b11da..8dc47bf69d 100644
--- a/src/python/grpcio/tests/tests.json
+++ b/src/python/grpcio/tests/tests.json
@@ -48,6 +48,7 @@
"_low_test.HangingServerShutdown",
"_low_test.InsecureServerInsecureClient",
"_not_found_test.NotFoundTest",
+ "_read_some_but_not_all_responses_test.ReadSomeButNotAllResponsesTest",
"_rpc_test.RPCTest",
"_sanity_test.Sanity",
"_secure_interop_test.SecureInteropTest",
diff --git a/src/python/grpcio/tests/unit/beta/_auth_test.py b/src/python/grpcio/tests/unit/_auth_test.py
index 694928a91b..c31f7b06f7 100644
--- a/src/python/grpcio/tests/unit/beta/_auth_test.py
+++ b/src/python/grpcio/tests/unit/_auth_test.py
@@ -33,7 +33,7 @@ import collections
import threading
import unittest
-from grpc.beta import _auth
+from grpc import _auth
class MockGoogleCreds(object):
diff --git a/src/python/grpcio/tests/unit/_cython/_read_some_but_not_all_responses_test.py b/src/python/grpcio/tests/unit/_cython/_read_some_but_not_all_responses_test.py
new file mode 100644
index 0000000000..6ae7a90fbe
--- /dev/null
+++ b/src/python/grpcio/tests/unit/_cython/_read_some_but_not_all_responses_test.py
@@ -0,0 +1,251 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Test a corner-case at the level of the Cython API."""
+
+import threading
+import unittest
+
+from grpc._cython import cygrpc
+
+_INFINITE_FUTURE = cygrpc.Timespec(float('+inf'))
+_EMPTY_FLAGS = 0
+_EMPTY_METADATA = cygrpc.Metadata(())
+
+
+class _ServerDriver(object):
+
+ def __init__(self, completion_queue, shutdown_tag):
+ self._condition = threading.Condition()
+ self._completion_queue = completion_queue
+ self._shutdown_tag = shutdown_tag
+ self._events = []
+ self._saw_shutdown_tag = False
+
+ def start(self):
+ def in_thread():
+ while True:
+ event = self._completion_queue.poll()
+ with self._condition:
+ self._events.append(event)
+ self._condition.notify()
+ if event.tag is self._shutdown_tag:
+ self._saw_shutdown_tag = True
+ break
+ thread = threading.Thread(target=in_thread)
+ thread.start()
+
+ def done(self):
+ with self._condition:
+ return self._saw_shutdown_tag
+
+ def first_event(self):
+ with self._condition:
+ while not self._events:
+ self._condition.wait()
+ return self._events[0]
+
+ def events(self):
+ with self._condition:
+ while not self._saw_shutdown_tag:
+ self._condition.wait()
+ return tuple(self._events)
+
+
+class _QueueDriver(object):
+
+ def __init__(self, condition, completion_queue, due):
+ self._condition = condition
+ self._completion_queue = completion_queue
+ self._due = due
+ self._events = []
+ self._returned = False
+
+ def start(self):
+ def in_thread():
+ while True:
+ event = self._completion_queue.poll()
+ with self._condition:
+ self._events.append(event)
+ self._due.remove(event.tag)
+ self._condition.notify_all()
+ if not self._due:
+ self._returned = True
+ return
+ thread = threading.Thread(target=in_thread)
+ thread.start()
+
+ def done(self):
+ with self._condition:
+ return self._returned
+
+ def event_with_tag(self, tag):
+ with self._condition:
+ while True:
+ for event in self._events:
+ if event.tag is tag:
+ return event
+ self._condition.wait()
+
+ def events(self):
+ with self._condition:
+ while not self._returned:
+ self._condition.wait()
+ return tuple(self._events)
+
+
+class ReadSomeButNotAllResponsesTest(unittest.TestCase):
+
+ def testReadSomeButNotAllResponses(self):
+ server_completion_queue = cygrpc.CompletionQueue()
+ server = cygrpc.Server()
+ server.register_completion_queue(server_completion_queue)
+ port = server.add_http2_port('[::]:0')
+ server.start()
+ channel = cygrpc.Channel('localhost:{}'.format(port))
+
+ server_shutdown_tag = 'server_shutdown_tag'
+ server_driver = _ServerDriver(server_completion_queue, server_shutdown_tag)
+ server_driver.start()
+
+ client_condition = threading.Condition()
+ client_due = set()
+ client_completion_queue = cygrpc.CompletionQueue()
+ client_driver = _QueueDriver(
+ client_condition, client_completion_queue, client_due)
+ client_driver.start()
+
+ server_call_condition = threading.Condition()
+ server_send_initial_metadata_tag = 'server_send_initial_metadata_tag'
+ server_send_first_message_tag = 'server_send_first_message_tag'
+ server_send_second_message_tag = 'server_send_second_message_tag'
+ server_complete_rpc_tag = 'server_complete_rpc_tag'
+ server_call_due = set((
+ server_send_initial_metadata_tag,
+ server_send_first_message_tag,
+ server_send_second_message_tag,
+ server_complete_rpc_tag,
+ ))
+ server_call_completion_queue = cygrpc.CompletionQueue()
+ server_call_driver = _QueueDriver(
+ server_call_condition, server_call_completion_queue, server_call_due)
+ server_call_driver.start()
+
+ server_rpc_tag = 'server_rpc_tag'
+ request_call_result = server.request_call(
+ server_call_completion_queue, server_completion_queue, server_rpc_tag)
+
+ client_call = channel.create_call(
+ None, _EMPTY_FLAGS, client_completion_queue, b'/twinkies', None,
+ _INFINITE_FUTURE)
+ client_receive_initial_metadata_tag = 'client_receive_initial_metadata_tag'
+ client_complete_rpc_tag = 'client_complete_rpc_tag'
+ with client_condition:
+ client_receive_initial_metadata_start_batch_result = (
+ client_call.start_batch(cygrpc.Operations([
+ cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
+ ]), client_receive_initial_metadata_tag))
+ client_due.add(client_receive_initial_metadata_tag)
+ client_complete_rpc_start_batch_result = (
+ client_call.start_batch(cygrpc.Operations([
+ cygrpc.operation_send_initial_metadata(
+ _EMPTY_METADATA, _EMPTY_FLAGS),
+ cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
+ cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),
+ ]), client_complete_rpc_tag))
+ client_due.add(client_complete_rpc_tag)
+
+ server_rpc_event = server_driver.first_event()
+
+ with server_call_condition:
+ server_send_initial_metadata_start_batch_result = (
+ server_rpc_event.operation_call.start_batch(cygrpc.Operations([
+ cygrpc.operation_send_initial_metadata(
+ _EMPTY_METADATA, _EMPTY_FLAGS),
+ ]), server_send_initial_metadata_tag))
+ server_send_first_message_start_batch_result = (
+ server_rpc_event.operation_call.start_batch(cygrpc.Operations([
+ cygrpc.operation_send_message(b'\x07', _EMPTY_FLAGS),
+ ]), server_send_first_message_tag))
+ server_send_initial_metadata_event = server_call_driver.event_with_tag(
+ server_send_initial_metadata_tag)
+ server_send_first_message_event = server_call_driver.event_with_tag(
+ server_send_first_message_tag)
+ with server_call_condition:
+ server_send_second_message_start_batch_result = (
+ server_rpc_event.operation_call.start_batch(cygrpc.Operations([
+ cygrpc.operation_send_message(b'\x07', _EMPTY_FLAGS),
+ ]), server_send_second_message_tag))
+ server_complete_rpc_start_batch_result = (
+ server_rpc_event.operation_call.start_batch(cygrpc.Operations([
+ cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
+ cygrpc.operation_send_status_from_server(
+ cygrpc.Metadata(()), cygrpc.StatusCode.ok, b'test details',
+ _EMPTY_FLAGS),
+ ]), server_complete_rpc_tag))
+ server_send_second_message_event = server_call_driver.event_with_tag(
+ server_send_second_message_tag)
+ server_complete_rpc_event = server_call_driver.event_with_tag(
+ server_complete_rpc_tag)
+ server_call_driver.events()
+
+ with client_condition:
+ client_receive_first_message_tag = 'client_receive_first_message_tag'
+ client_receive_first_message_start_batch_result = (
+ client_call.start_batch(cygrpc.Operations([
+ cygrpc.operation_receive_message(_EMPTY_FLAGS),
+ ]), client_receive_first_message_tag))
+ client_due.add(client_receive_first_message_tag)
+ client_receive_first_message_event = client_driver.event_with_tag(
+ client_receive_first_message_tag)
+
+ client_call_cancel_result = client_call.cancel()
+ client_driver.events()
+
+ server.shutdown(server_completion_queue, server_shutdown_tag)
+ server.cancel_all_calls()
+ server_driver.events()
+
+ self.assertEqual(cygrpc.CallError.ok, request_call_result)
+ self.assertEqual(
+ cygrpc.CallError.ok, server_send_initial_metadata_start_batch_result)
+ self.assertEqual(
+ cygrpc.CallError.ok, client_receive_initial_metadata_start_batch_result)
+ self.assertEqual(
+ cygrpc.CallError.ok, client_complete_rpc_start_batch_result)
+ self.assertEqual(cygrpc.CallError.ok, client_call_cancel_result)
+ self.assertIs(server_rpc_tag, server_rpc_event.tag)
+ self.assertEqual(
+ cygrpc.CompletionType.operation_complete, server_rpc_event.type)
+ self.assertIsInstance(server_rpc_event.operation_call, cygrpc.Call)
+ self.assertEqual(0, len(server_rpc_event.batch_operations))
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio/tests/unit/beta/test_utilities.py b/src/python/grpcio/tests/unit/beta/test_utilities.py
index 0313e06a93..66b5f72897 100644
--- a/src/python/grpcio/tests/unit/beta/test_utilities.py
+++ b/src/python/grpcio/tests/unit/beta/test_utilities.py
@@ -29,7 +29,7 @@
"""Test-appropriate entry points into the gRPC Python Beta API."""
-from grpc._adapter import _intermediary_low
+import grpc
from grpc.beta import implementations
@@ -48,9 +48,8 @@ def not_really_secure_channel(
An implementations.Channel to the remote host through which RPCs may be
conducted.
"""
- hostport = '%s:%d' % (host, port)
- intermediary_low_channel = _intermediary_low.Channel(
- hostport, channel_credentials._low_credentials,
- server_host_override=server_host_override)
- return implementations.Channel(
- intermediary_low_channel._internal, intermediary_low_channel)
+ target = '%s:%d' % (host, port)
+ channel = grpc.secure_channel(
+ target, ((b'grpc.ssl_target_name_override', server_host_override,),),
+ channel_credentials._credentials)
+ return implementations.Channel(channel)
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
index 1510191e78..f76462649d 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
@@ -126,7 +126,6 @@ grpc_header_key_is_legal_type grpc_header_key_is_legal_import;
grpc_header_nonbin_value_is_legal_type grpc_header_nonbin_value_is_legal_import;
grpc_is_binary_header_type grpc_is_binary_header_import;
grpc_call_error_to_string_type grpc_call_error_to_string_import;
-grpc_cronet_secure_channel_create_type grpc_cronet_secure_channel_create_import;
grpc_auth_property_iterator_next_type grpc_auth_property_iterator_next_import;
grpc_auth_context_property_iterator_type grpc_auth_context_property_iterator_import;
grpc_auth_context_peer_identity_type grpc_auth_context_peer_identity_import;
@@ -394,7 +393,6 @@ void grpc_rb_load_imports(HMODULE library) {
grpc_header_nonbin_value_is_legal_import = (grpc_header_nonbin_value_is_legal_type) GetProcAddress(library, "grpc_header_nonbin_value_is_legal");
grpc_is_binary_header_import = (grpc_is_binary_header_type) GetProcAddress(library, "grpc_is_binary_header");
grpc_call_error_to_string_import = (grpc_call_error_to_string_type) GetProcAddress(library, "grpc_call_error_to_string");
- grpc_cronet_secure_channel_create_import = (grpc_cronet_secure_channel_create_type) GetProcAddress(library, "grpc_cronet_secure_channel_create");
grpc_auth_property_iterator_next_import = (grpc_auth_property_iterator_next_type) GetProcAddress(library, "grpc_auth_property_iterator_next");
grpc_auth_context_property_iterator_import = (grpc_auth_context_property_iterator_type) GetProcAddress(library, "grpc_auth_context_property_iterator");
grpc_auth_context_peer_identity_import = (grpc_auth_context_peer_identity_type) GetProcAddress(library, "grpc_auth_context_peer_identity");
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
index dfaabf1d47..5d690a915d 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
@@ -43,7 +43,6 @@
#include <grpc/census.h>
#include <grpc/compression.h>
#include <grpc/grpc.h>
-#include <grpc/grpc_cronet.h>
#include <grpc/grpc_security.h>
#include <grpc/impl/codegen/alloc.h>
#include <grpc/impl/codegen/byte_buffer.h>
@@ -329,9 +328,6 @@ extern grpc_is_binary_header_type grpc_is_binary_header_import;
typedef const char *(*grpc_call_error_to_string_type)(grpc_call_error error);
extern grpc_call_error_to_string_type grpc_call_error_to_string_import;
#define grpc_call_error_to_string grpc_call_error_to_string_import
-typedef grpc_channel *(*grpc_cronet_secure_channel_create_type)(void *engine, const char *target, const grpc_channel_args *args, void *reserved);
-extern grpc_cronet_secure_channel_create_type grpc_cronet_secure_channel_create_import;
-#define grpc_cronet_secure_channel_create grpc_cronet_secure_channel_create_import
typedef const grpc_auth_property *(*grpc_auth_property_iterator_next_type)(grpc_auth_property_iterator *it);
extern grpc_auth_property_iterator_next_type grpc_auth_property_iterator_next_import;
#define grpc_auth_property_iterator_next grpc_auth_property_iterator_next_import