aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Julien Boeuf <jboeuf@google.com>2016-05-18 22:12:43 -0700
committerGravatar Julien Boeuf <jboeuf@google.com>2016-05-18 22:12:43 -0700
commita4880374f9e49ce648cdf3b7512804613280075e (patch)
tree8ee37af5eab830def6f5f196e5f79b55169f12f5 /src
parentfc1ed17fdea8ba574586cfc479f77de3b200f1c6 (diff)
parenta71b0afdc2916e85c03d73eb9fbedd3d9070676a (diff)
Merge branch 'master' of github.com:grpc/grpc into credentials_refactoring
Diffstat (limited to 'src')
-rw-r--r--src/compiler/python_generator.cc110
-rw-r--r--src/core/ext/client_config/subchannel_call_holder.c1
-rw-r--r--src/core/ext/transport/cronet/client/secure/cronet_channel_create.c (renamed from src/ruby/ext/grpc/rb_signal.c)57
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_api_dummy.c85
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c640
-rw-r--r--src/core/lib/channel/channel_args.c9
-rw-r--r--src/core/lib/channel/compress_filter.c12
-rw-r--r--src/core/lib/channel/compress_filter.h2
-rw-r--r--src/core/lib/http/parser.c3
-rw-r--r--src/core/lib/iomgr/ev_poll_and_epoll_posix.c2
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c1212
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.h (renamed from src/ruby/ext/grpc/rb_signal.h)12
-rw-r--r--src/core/lib/iomgr/ev_posix.c93
-rw-r--r--src/core/lib/iomgr/exec_ctx.c17
-rw-r--r--src/core/lib/iomgr/exec_ctx.h29
-rw-r--r--src/core/lib/iomgr/iomgr_posix.c6
-rw-r--r--src/core/lib/iomgr/udp_server.c18
-rw-r--r--src/core/lib/iomgr/udp_server.h6
-rw-r--r--src/core/lib/support/string_util_win32.c2
-rw-r--r--src/core/lib/surface/byte_buffer_reader.c19
-rw-r--r--src/core/lib/surface/call.c32
-rw-r--r--src/core/lib/surface/init.c2
-rw-r--r--src/cpp/common/channel_arguments.cc2
-rw-r--r--src/cpp/server/server.cc19
-rw-r--r--src/cpp/server/server_builder.cc38
-rw-r--r--src/csharp/Grpc.Core.Tests/ChannelTest.cs39
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs51
-rw-r--r--src/csharp/Grpc.Core/Channel.cs34
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs15
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallBase.cs2
-rw-r--r--src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs2
-rw-r--r--src/csharp/Grpc.Examples/MathExamples.cs38
-rw-r--r--src/csharp/Grpc.Examples/MathServiceImpl.cs29
-rw-r--r--src/csharp/Grpc.IntegrationTesting/InteropClient.cs4
-rw-r--r--src/csharp/Grpc.IntegrationTesting/StressTestClient.cs2
-rw-r--r--src/csharp/build_packages.bat29
-rw-r--r--src/csharp/buildall.bat29
-rwxr-xr-xsrc/node/tools/bin/protoc.js7
-rwxr-xr-xsrc/node/tools/bin/protoc_plugin.js8
-rw-r--r--src/php/ext/grpc/call.c11
-rw-r--r--src/php/ext/grpc/call.h4
-rw-r--r--src/php/ext/grpc/call_credentials.c12
-rw-r--r--src/php/ext/grpc/channel.c4
-rwxr-xr-xsrc/php/ext/grpc/channel.h2
-rw-r--r--src/php/ext/grpc/channel_credentials.c8
-rw-r--r--src/php/ext/grpc/server.c8
-rw-r--r--src/php/ext/grpc/server_credentials.c4
-rw-r--r--src/php/ext/grpc/timeval.c14
-rwxr-xr-xsrc/php/ext/grpc/timeval.h2
-rw-r--r--src/proto/grpc/reflection/v1alpha/reflection.proto151
-rw-r--r--src/proto/grpc/testing/echo_messages.proto7
-rw-r--r--src/python/grpcio/grpc/_adapter/_low.py20
-rw-r--r--src/python/grpcio/grpc/_adapter/_types.py4
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi38
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi4
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi74
-rw-r--r--src/python/grpcio/grpc/_cython/imports.generated.c2
-rw-r--r--src/python/grpcio/grpc/_cython/imports.generated.h13
-rw-r--r--src/python/grpcio/grpc/beta/interfaces.py2
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py4
-rw-r--r--src/python/grpcio/tests/health_check/__init__.py (renamed from src/ruby/lib/grpc/signals.rb)41
-rw-r--r--src/python/grpcio/tests/health_check/_health_servicer_test.py75
-rw-r--r--src/python/grpcio/tests/protoc_plugin/beta_python_plugin_test.py295
-rw-r--r--src/python/grpcio/tests/protoc_plugin/protos/payload/test_payload.proto51
-rw-r--r--src/python/grpcio/tests/protoc_plugin/protos/requests/r/test_requests.proto77
-rw-r--r--src/python/grpcio/tests/protoc_plugin/protos/responses/test_responses.proto (renamed from src/python/grpcio_health_checking/grpc/health/v1/health.proto)22
-rw-r--r--src/python/grpcio/tests/protoc_plugin/protos/service/test_service.proto (renamed from src/python/grpcio/tests/protoc_plugin/protoc_plugin_test.proto)85
-rw-r--r--src/python/grpcio/tests/qps/benchmark_client.py60
-rw-r--r--src/python/grpcio/tests/qps/client_runner.py2
-rw-r--r--src/python/grpcio/tests/qps/worker_server.py5
-rw-r--r--src/python/grpcio/tests/tests.json5
-rw-r--r--src/python/grpcio/tests/unit/_cython/_channel_test.py2
-rw-r--r--src/python/grpcio/tests/unit/_cython/cygrpc_test.py58
-rw-r--r--src/python/grpcio/tests/unit/framework/common/test_constants.py9
-rw-r--r--src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py10
-rw-r--r--src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py8
-rw-r--r--src/python/grpcio_health_checking/.gitignore5
-rw-r--r--src/python/grpcio_health_checking/MANIFEST.in5
-rw-r--r--src/python/grpcio_health_checking/grpc/health/v1/health.py129
-rw-r--r--src/python/grpcio_health_checking/grpc_health/__init__.py (renamed from src/python/grpcio_health_checking/grpc/__init__.py)0
-rw-r--r--src/python/grpcio_health_checking/grpc_health/health/__init__.py (renamed from src/python/grpcio_health_checking/grpc/health/__init__.py)0
-rw-r--r--src/python/grpcio_health_checking/grpc_health/health/v1/__init__.py (renamed from src/python/grpcio_health_checking/grpc/health/v1/__init__.py)0
-rw-r--r--src/python/grpcio_health_checking/grpc_health/health/v1/health.py66
-rw-r--r--src/python/grpcio_health_checking/health_commands.py (renamed from src/python/grpcio_health_checking/commands.py)30
-rw-r--r--src/python/grpcio_health_checking/setup.py15
-rw-r--r--src/ruby/ext/grpc/rb_completion_queue.c52
-rw-r--r--src/ruby/ext/grpc/rb_completion_queue.h2
-rw-r--r--src/ruby/ext/grpc/rb_grpc.c2
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.c2
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.h4
-rw-r--r--src/ruby/ext/grpc/rb_server.c2
-rw-r--r--src/ruby/lib/grpc.rb3
-rw-r--r--src/ruby/lib/grpc/generic/active_call.rb5
-rw-r--r--src/ruby/lib/grpc/generic/client_stub.rb7
-rw-r--r--src/ruby/lib/grpc/generic/rpc_server.rb4
-rw-r--r--src/ruby/tools/grpc-tools.gemspec2
96 files changed, 3535 insertions, 710 deletions
diff --git a/src/compiler/python_generator.cc b/src/compiler/python_generator.cc
index 8e76e6dce6..cd5ddd8832 100644
--- a/src/compiler/python_generator.cc
+++ b/src/compiler/python_generator.cc
@@ -147,7 +147,8 @@ class IndentScope {
// END FORMATTING BOILERPLATE //
////////////////////////////////
-// TODO(protobuf team): Export `ModuleName` from protobuf's
+// TODO(https://github.com/google/protobuf/issues/888):
+// Export `ModuleName` from protobuf's
// `src/google/protobuf/compiler/python/python_generator.cc` file.
grpc::string ModuleName(const grpc::string& filename) {
grpc::string basename = StripProto(filename);
@@ -156,8 +157,23 @@ grpc::string ModuleName(const grpc::string& filename) {
return basename + "_pb2";
}
+// TODO(https://github.com/google/protobuf/issues/888):
+// Export `ModuleAlias` from protobuf's
+// `src/google/protobuf/compiler/python/python_generator.cc` file.
+grpc::string ModuleAlias(const grpc::string& filename) {
+ grpc::string module_name = ModuleName(filename);
+ // We can't have dots in the module name, so we replace each with _dot_.
+ // But that could lead to a collision between a.b and a_dot_b, so we also
+ // duplicate each underscore.
+ module_name = StringReplace(module_name, "_", "__");
+ module_name = StringReplace(module_name, ".", "_dot_");
+ return module_name;
+}
+
+
bool GetModuleAndMessagePath(const Descriptor* type,
- pair<grpc::string, grpc::string>* out) {
+ const ServiceDescriptor* service,
+ grpc::string* out) {
const Descriptor* path_elem_type = type;
vector<const Descriptor*> message_path;
do {
@@ -170,7 +186,9 @@ bool GetModuleAndMessagePath(const Descriptor* type,
file_name.find_last_of(".proto") == file_name.size() - 1)) {
return false;
}
- grpc::string module = ModuleName(file_name);
+ grpc::string service_file_name = service->file()->name();
+ grpc::string module = service_file_name == file_name ?
+ "" : ModuleAlias(file_name) + ".";
grpc::string message_type;
for (auto path_iter = message_path.rbegin();
path_iter != message_path.rend(); ++path_iter) {
@@ -178,7 +196,7 @@ bool GetModuleAndMessagePath(const Descriptor* type,
}
// no pop_back prior to C++11
message_type.resize(message_type.size() - 1);
- *out = make_pair(module, message_type);
+ *out = module + message_type;
return true;
}
@@ -210,7 +228,7 @@ static void PrintAllComments(const DescriptorType* desc, Printer* printer) {
bool PrintBetaServicer(const ServiceDescriptor* service,
Printer* out) {
- out->Print("\n");
+ out->Print("\n\n");
out->Print("class Beta$Service$Servicer(object):\n", "Service",
service->name());
{
@@ -234,7 +252,7 @@ bool PrintBetaServicer(const ServiceDescriptor* service,
bool PrintBetaStub(const ServiceDescriptor* service,
Printer* out) {
- out->Print("\n");
+ out->Print("\n\n");
out->Print("class Beta$Service$Stub(object):\n", "Service", service->name());
{
IndentScope raii_class_indent(out);
@@ -244,7 +262,7 @@ bool PrintBetaStub(const ServiceDescriptor* service,
grpc::string arg_name = meth->client_streaming() ?
"request_iterator" : "request";
auto methdict = ListToDict({"Method", meth->name(), "ArgName", arg_name});
- out->Print(methdict, "def $Method$(self, $ArgName$, timeout):\n");
+ out->Print(methdict, "def $Method$(self, $ArgName$, timeout, metadata=None, with_call=False, protocol_options=None):\n");
{
IndentScope raii_method_indent(out);
PrintAllComments(meth, out);
@@ -260,38 +278,31 @@ bool PrintBetaStub(const ServiceDescriptor* service,
bool PrintBetaServerFactory(const grpc::string& package_qualified_service_name,
const ServiceDescriptor* service, Printer* out) {
- out->Print("\n");
+ out->Print("\n\n");
out->Print("def beta_create_$Service$_server(servicer, pool=None, "
"pool_size=None, default_timeout=None, maximum_timeout=None):\n",
"Service", service->name());
{
IndentScope raii_create_server_indent(out);
map<grpc::string, grpc::string> method_implementation_constructors;
- map<grpc::string, pair<grpc::string, grpc::string>>
- input_message_modules_and_classes;
- map<grpc::string, pair<grpc::string, grpc::string>>
- output_message_modules_and_classes;
+ map<grpc::string, grpc::string> input_message_modules_and_classes;
+ map<grpc::string, grpc::string> output_message_modules_and_classes;
for (int i = 0; i < service->method_count(); ++i) {
const MethodDescriptor* method = service->method(i);
const grpc::string method_implementation_constructor =
grpc::string(method->client_streaming() ? "stream_" : "unary_") +
grpc::string(method->server_streaming() ? "stream_" : "unary_") +
"inline";
- pair<grpc::string, grpc::string> input_message_module_and_class;
- if (!GetModuleAndMessagePath(method->input_type(),
+ grpc::string input_message_module_and_class;
+ if (!GetModuleAndMessagePath(method->input_type(), service,
&input_message_module_and_class)) {
return false;
}
- pair<grpc::string, grpc::string> output_message_module_and_class;
- if (!GetModuleAndMessagePath(method->output_type(),
+ grpc::string output_message_module_and_class;
+ if (!GetModuleAndMessagePath(method->output_type(), service,
&output_message_module_and_class)) {
return false;
}
- // Import the modules that define the messages used in RPCs.
- out->Print("import $Module$\n", "Module",
- input_message_module_and_class.first);
- out->Print("import $Module$\n", "Module",
- output_message_module_and_class.first);
method_implementation_constructors.insert(
make_pair(method->name(), method_implementation_constructor));
input_message_modules_and_classes.insert(
@@ -307,13 +318,11 @@ bool PrintBetaServerFactory(const grpc::string& package_qualified_service_name,
name_and_input_module_class_pair++) {
IndentScope raii_indent(out);
out->Print("(\'$PackageQualifiedServiceName$\', \'$MethodName$\'): "
- "$InputTypeModule$.$InputTypeClass$.FromString,\n",
+ "$InputTypeModuleAndClass$.FromString,\n",
"PackageQualifiedServiceName", package_qualified_service_name,
"MethodName", name_and_input_module_class_pair->first,
- "InputTypeModule",
- name_and_input_module_class_pair->second.first,
- "InputTypeClass",
- name_and_input_module_class_pair->second.second);
+ "InputTypeModuleAndClass",
+ name_and_input_module_class_pair->second);
}
out->Print("}\n");
out->Print("response_serializers = {\n");
@@ -324,13 +333,11 @@ bool PrintBetaServerFactory(const grpc::string& package_qualified_service_name,
name_and_output_module_class_pair++) {
IndentScope raii_indent(out);
out->Print("(\'$PackageQualifiedServiceName$\', \'$MethodName$\'): "
- "$OutputTypeModule$.$OutputTypeClass$.SerializeToString,\n",
+ "$OutputTypeModuleAndClass$.SerializeToString,\n",
"PackageQualifiedServiceName", package_qualified_service_name,
"MethodName", name_and_output_module_class_pair->first,
- "OutputTypeModule",
- name_and_output_module_class_pair->second.first,
- "OutputTypeClass",
- name_and_output_module_class_pair->second.second);
+ "OutputTypeModuleAndClass",
+ name_and_output_module_class_pair->second);
}
out->Print("}\n");
out->Print("method_implementations = {\n");
@@ -366,37 +373,30 @@ bool PrintBetaStubFactory(const grpc::string& package_qualified_service_name,
map<grpc::string, grpc::string> dict = ListToDict({
"Service", service->name(),
});
- out->Print("\n");
+ out->Print("\n\n");
out->Print(dict, "def beta_create_$Service$_stub(channel, host=None,"
" metadata_transformer=None, pool=None, pool_size=None):\n");
{
IndentScope raii_create_server_indent(out);
map<grpc::string, grpc::string> method_cardinalities;
- map<grpc::string, pair<grpc::string, grpc::string>>
- input_message_modules_and_classes;
- map<grpc::string, pair<grpc::string, grpc::string>>
- output_message_modules_and_classes;
+ map<grpc::string, grpc::string> input_message_modules_and_classes;
+ map<grpc::string, grpc::string> output_message_modules_and_classes;
for (int i = 0; i < service->method_count(); ++i) {
const MethodDescriptor* method = service->method(i);
const grpc::string method_cardinality =
grpc::string(method->client_streaming() ? "STREAM" : "UNARY") +
"_" +
- grpc::string(method->server_streaming() ? "STREAM" : "UNARY");
- pair<grpc::string, grpc::string> input_message_module_and_class;
- if (!GetModuleAndMessagePath(method->input_type(),
+ grpc::string(method->server_streaming() ? "STREAM" : "UNARY");
+ grpc::string input_message_module_and_class;
+ if (!GetModuleAndMessagePath(method->input_type(), service,
&input_message_module_and_class)) {
return false;
}
- pair<grpc::string, grpc::string> output_message_module_and_class;
- if (!GetModuleAndMessagePath(method->output_type(),
+ grpc::string output_message_module_and_class;
+ if (!GetModuleAndMessagePath(method->output_type(), service,
&output_message_module_and_class)) {
return false;
}
- // Import the modules that define the messages used in RPCs.
- out->Print("import $Module$\n", "Module",
- input_message_module_and_class.first);
- out->Print("import $Module$\n", "Module",
- output_message_module_and_class.first);
method_cardinalities.insert(
make_pair(method->name(), method_cardinality));
input_message_modules_and_classes.insert(
@@ -412,13 +412,11 @@ bool PrintBetaStubFactory(const grpc::string& package_qualified_service_name,
name_and_input_module_class_pair++) {
IndentScope raii_indent(out);
out->Print("(\'$PackageQualifiedServiceName$\', \'$MethodName$\'): "
- "$InputTypeModule$.$InputTypeClass$.SerializeToString,\n",
+ "$InputTypeModuleAndClass$.SerializeToString,\n",
"PackageQualifiedServiceName", package_qualified_service_name,
"MethodName", name_and_input_module_class_pair->first,
- "InputTypeModule",
- name_and_input_module_class_pair->second.first,
- "InputTypeClass",
- name_and_input_module_class_pair->second.second);
+ "InputTypeModuleAndClass",
+ name_and_input_module_class_pair->second);
}
out->Print("}\n");
out->Print("response_deserializers = {\n");
@@ -429,13 +427,11 @@ bool PrintBetaStubFactory(const grpc::string& package_qualified_service_name,
name_and_output_module_class_pair++) {
IndentScope raii_indent(out);
out->Print("(\'$PackageQualifiedServiceName$\', \'$MethodName$\'): "
- "$OutputTypeModule$.$OutputTypeClass$.FromString,\n",
+ "$OutputTypeModuleAndClass$.FromString,\n",
"PackageQualifiedServiceName", package_qualified_service_name,
"MethodName", name_and_output_module_class_pair->first,
- "OutputTypeModule",
- name_and_output_module_class_pair->second.first,
- "OutputTypeClass",
- name_and_output_module_class_pair->second.second);
+ "OutputTypeModuleAndClass",
+ name_and_output_module_class_pair->second);
}
out->Print("}\n");
out->Print("cardinalities = {\n");
@@ -463,8 +459,6 @@ bool PrintBetaStubFactory(const grpc::string& package_qualified_service_name,
bool PrintPreamble(const FileDescriptor* file,
const GeneratorConfiguration& config, Printer* out) {
- out->Print("import abc\n");
- out->Print("import six\n");
out->Print("from $Package$ import implementations as beta_implementations\n",
"Package", config.beta_package_root);
out->Print("from $Package$ import interfaces as beta_interfaces\n",
diff --git a/src/core/ext/client_config/subchannel_call_holder.c b/src/core/ext/client_config/subchannel_call_holder.c
index 9918fbdcb4..91fa917661 100644
--- a/src/core/ext/client_config/subchannel_call_holder.c
+++ b/src/core/ext/client_config/subchannel_call_holder.c
@@ -174,6 +174,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL);
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
if (holder->connected_subchannel == NULL) {
+ gpr_atm_no_barrier_store(&holder->subchannel_call, 1);
fail_locked(exec_ctx, holder);
} else if (1 == gpr_atm_acq_load(&holder->subchannel_call)) {
/* already cancelled before subchannel became ready */
diff --git a/src/ruby/ext/grpc/rb_signal.c b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c
index a9e512374b..df1acddcc0 100644
--- a/src/ruby/ext/grpc/rb_signal.c
+++ b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c
@@ -31,40 +31,39 @@
*
*/
-#include <ruby/ruby.h>
-#include <signal.h>
-#include <stdbool.h>
+#include <grpc/impl/codegen/port_platform.h>
-#include <grpc/support/log.h>
+#include <stdio.h>
+#include <string.h>
-#include "rb_grpc.h"
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
-static void (*old_sigint_handler)(int);
-static void (*old_sigterm_handler)(int);
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/transport/transport_impl.h"
-static volatile bool signal_received = false;
+// Cronet transport object
+typedef struct cronet_transport {
+ grpc_transport base; // must be first element in this structure
+ void *engine;
+ char *host;
+} cronet_transport;
-/* This has to be handled at the C level instead of Ruby, because Ruby signal
- * handlers are constrained to run in the main interpreter thread. If that main
- * thread is blocked on grpc_completion_queue_pluck, the signal handlers will
- * never run */
-static void handle_signal(int signum) {
- signal_received = true;
- if (signum == SIGINT) {
- old_sigint_handler(signum);
- } else if (signum == SIGTERM) {
- old_sigterm_handler(signum);
- }
-}
+extern grpc_transport_vtable grpc_cronet_vtable;
-static VALUE grpc_rb_signal_received(VALUE self) {
- (void)self;
- return signal_received ? Qtrue : Qfalse;
-}
+GRPCAPI grpc_channel *grpc_cronet_secure_channel_create(
+ void *engine, const char *target, const grpc_channel_args *args,
+ void *reserved) {
+ cronet_transport *ct = gpr_malloc(sizeof(cronet_transport));
+ ct->base.vtable = &grpc_cronet_vtable;
+ ct->engine = engine;
+ ct->host = gpr_malloc(strlen(target) + 1);
+ strcpy(ct->host, target);
+ gpr_log(GPR_DEBUG,
+ "grpc_create_cronet_transport: cronet_engine = %p, target=%s", engine,
+ ct->host);
-void Init_grpc_signals() {
- old_sigint_handler = signal(SIGINT, handle_signal);
- old_sigterm_handler = signal(SIGTERM, handle_signal);
- rb_define_singleton_method(grpc_rb_mGrpcCore, "signal_received?",
- grpc_rb_signal_received, 0);
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ return grpc_channel_create(&exec_ctx, target, args,
+ GRPC_CLIENT_DIRECT_CHANNEL, (grpc_transport *)ct);
}
diff --git a/src/core/ext/transport/cronet/transport/cronet_api_dummy.c b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c
new file mode 100644
index 0000000000..687026c9fd
--- /dev/null
+++ b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c
@@ -0,0 +1,85 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/* This file has empty implementation of all the functions exposed by the cronet
+library, so we can build it in all environments */
+
+#include <stdbool.h>
+
+#include <grpc/support/log.h>
+
+#include "third_party/objective_c/Cronet/cronet_c_for_grpc.h"
+
+#ifdef GRPC_COMPILE_WITH_CRONET
+/* link with the real CRONET library in the build system */
+#else
+/* Dummy implementation of cronet API just to test for build-ability */
+cronet_bidirectional_stream* cronet_bidirectional_stream_create(
+ cronet_engine* engine, void* annotation,
+ cronet_bidirectional_stream_callback* callback) {
+ GPR_ASSERT(0);
+ return NULL;
+}
+
+int cronet_bidirectional_stream_destroy(cronet_bidirectional_stream* stream) {
+ GPR_ASSERT(0);
+ return 0;
+}
+
+int cronet_bidirectional_stream_start(
+ cronet_bidirectional_stream* stream, const char* url, int priority,
+ const char* method, const cronet_bidirectional_stream_header_array* headers,
+ bool end_of_stream) {
+ GPR_ASSERT(0);
+ return 0;
+}
+
+int cronet_bidirectional_stream_read(cronet_bidirectional_stream* stream,
+ char* buffer, int capacity) {
+ GPR_ASSERT(0);
+ return 0;
+}
+
+int cronet_bidirectional_stream_write(cronet_bidirectional_stream* stream,
+ const char* buffer, int count,
+ bool end_of_stream) {
+ GPR_ASSERT(0);
+ return 0;
+}
+
+int cronet_bidirectional_stream_cancel(cronet_bidirectional_stream* stream) {
+ GPR_ASSERT(0);
+ return 0;
+}
+
+#endif /* GRPC_COMPILE_WITH_CRONET */
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
new file mode 100644
index 0000000000..5bb085195c
--- /dev/null
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -0,0 +1,640 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <string.h>
+
+#include <grpc/impl/codegen/port_platform.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/host_port.h>
+#include <grpc/support/log.h>
+#include <grpc/support/slice_buffer.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/useful.h>
+
+#include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/support/string.h"
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/transport/metadata_batch.h"
+#include "src/core/lib/transport/transport_impl.h"
+#include "third_party/objective_c/Cronet/cronet_c_for_grpc.h"
+
+#define GRPC_HEADER_SIZE_IN_BYTES 5
+
+// Global flag that gets set with GRPC_TRACE env variable
+int grpc_cronet_trace = 1;
+
+// Cronet transport object
+struct grpc_cronet_transport {
+ grpc_transport base; /* must be first element in this structure */
+ cronet_engine *engine;
+ char *host;
+};
+
+typedef struct grpc_cronet_transport grpc_cronet_transport;
+
+enum send_state {
+ CRONET_SEND_IDLE = 0,
+ CRONET_REQ_STARTED,
+ CRONET_SEND_HEADER,
+ CRONET_WRITE,
+ CRONET_WRITE_COMPLETED,
+};
+
+enum recv_state {
+ CRONET_RECV_IDLE = 0,
+ CRONET_RECV_READ_LENGTH,
+ CRONET_RECV_READ_DATA,
+ CRONET_RECV_CLOSED,
+};
+
+static const char *recv_state_name[] = {
+ "CRONET_RECV_IDLE", "CRONET_RECV_READ_LENGTH", "CRONET_RECV_READ_DATA,",
+ "CRONET_RECV_CLOSED"};
+
+// Enum that identifies calling function.
+enum e_caller {
+ PERFORM_STREAM_OP,
+ ON_READ_COMPLETE,
+ ON_RESPONSE_HEADERS_RECEIVED,
+ ON_RESPONSE_TRAILERS_RECEIVED
+};
+
+enum callback_id {
+ CB_SEND_INITIAL_METADATA = 0,
+ CB_SEND_MESSAGE,
+ CB_SEND_TRAILING_METADATA,
+ CB_RECV_MESSAGE,
+ CB_RECV_INITIAL_METADATA,
+ CB_RECV_TRAILING_METADATA,
+ CB_NUM_CALLBACKS
+};
+
+struct stream_obj {
+ // we store received bytes here as they trickle in.
+ gpr_slice_buffer write_slice_buffer;
+ cronet_bidirectional_stream *cbs;
+ gpr_slice slice;
+ gpr_slice_buffer read_slice_buffer;
+ struct grpc_slice_buffer_stream sbs;
+ char *read_buffer;
+ int remaining_read_bytes;
+ int total_read_bytes;
+
+ char *write_buffer;
+ size_t write_buffer_size;
+
+ // Hold the URL
+ char *url;
+
+ bool response_headers_received;
+ bool read_requested;
+ bool response_trailers_received;
+ bool read_closed;
+
+ // Recv message stuff
+ grpc_byte_buffer **recv_message;
+ // Initial metadata stuff
+ grpc_metadata_batch *recv_initial_metadata;
+ // Trailing metadata stuff
+ grpc_metadata_batch *recv_trailing_metadata;
+ grpc_chttp2_incoming_metadata_buffer imb;
+
+ // This mutex protects receive state machine execution
+ gpr_mu recv_mu;
+ // we can queue up up to 2 callbacks for each OP
+ grpc_closure *callback_list[CB_NUM_CALLBACKS][2];
+
+ // storage for header
+ cronet_bidirectional_stream_header *headers;
+ uint32_t num_headers;
+ cronet_bidirectional_stream_header_array header_array;
+ // state tracking
+ enum recv_state cronet_recv_state;
+ enum send_state cronet_send_state;
+};
+
+typedef struct stream_obj stream_obj;
+
+static void next_send_step(stream_obj *s);
+static void next_recv_step(stream_obj *s, enum e_caller caller);
+
+static void set_pollset_do_nothing(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, grpc_pollset *pollset) {}
+
+static void enqueue_callbacks(grpc_closure *callback_list[]) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ if (callback_list[0]) {
+ grpc_exec_ctx_enqueue(&exec_ctx, callback_list[0], true, NULL);
+ callback_list[0] = NULL;
+ }
+ if (callback_list[1]) {
+ grpc_exec_ctx_enqueue(&exec_ctx, callback_list[1], true, NULL);
+ callback_list[1] = NULL;
+ }
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void on_canceled(cronet_bidirectional_stream *stream) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "on_canceled %p", stream);
+ }
+}
+
+static void on_failed(cronet_bidirectional_stream *stream, int net_error) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "on_failed %p, error = %d", stream, net_error);
+ }
+}
+
+static void on_succeeded(cronet_bidirectional_stream *stream) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "on_succeeded %p", stream);
+ }
+}
+
+static void on_response_trailers_received(
+ cronet_bidirectional_stream *stream,
+ const cronet_bidirectional_stream_header_array *trailers) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "R: on_response_trailers_received");
+ }
+ stream_obj *s = (stream_obj *)stream->annotation;
+
+ memset(&s->imb, 0, sizeof(s->imb));
+ grpc_chttp2_incoming_metadata_buffer_init(&s->imb);
+ unsigned int i = 0;
+ for (i = 0; i < trailers->count; i++) {
+ grpc_chttp2_incoming_metadata_buffer_add(
+ &s->imb, grpc_mdelem_from_metadata_strings(
+ grpc_mdstr_from_string(trailers->headers[i].key),
+ grpc_mdstr_from_string(trailers->headers[i].value)));
+ }
+ s->response_trailers_received = true;
+ next_recv_step(s, ON_RESPONSE_TRAILERS_RECEIVED);
+}
+
+static void on_write_completed(cronet_bidirectional_stream *stream,
+ const char *data) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "W: on_write_completed");
+ }
+ stream_obj *s = (stream_obj *)stream->annotation;
+ enqueue_callbacks(s->callback_list[CB_SEND_MESSAGE]);
+ s->cronet_send_state = CRONET_WRITE_COMPLETED;
+ next_send_step(s);
+}
+
+static void process_recv_message(stream_obj *s, const uint8_t *recv_data) {
+ gpr_slice read_data_slice = gpr_slice_malloc((uint32_t)s->total_read_bytes);
+ uint8_t *dst_p = GPR_SLICE_START_PTR(read_data_slice);
+ memcpy(dst_p, recv_data, (size_t)s->total_read_bytes);
+ gpr_slice_buffer_add(&s->read_slice_buffer, read_data_slice);
+ grpc_slice_buffer_stream_init(&s->sbs, &s->read_slice_buffer, 0);
+ *s->recv_message = (grpc_byte_buffer *)&s->sbs;
+}
+
+static int parse_grpc_header(const uint8_t *data) {
+ const uint8_t *p = data + 1;
+ int length = 0;
+ length |= ((uint8_t)*p++) << 24;
+ length |= ((uint8_t)*p++) << 16;
+ length |= ((uint8_t)*p++) << 8;
+ length |= ((uint8_t)*p++);
+ return length;
+}
+
+static void on_read_completed(cronet_bidirectional_stream *stream, char *data,
+ int count) {
+ stream_obj *s = (stream_obj *)stream->annotation;
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "R: on_read_completed count=%d, total=%d, remaining=%d",
+ count, s->total_read_bytes, s->remaining_read_bytes);
+ }
+ if (count > 0) {
+ GPR_ASSERT(s->recv_message);
+ s->remaining_read_bytes -= count;
+ next_recv_step(s, ON_READ_COMPLETE);
+ } else {
+ s->read_closed = true;
+ next_recv_step(s, ON_READ_COMPLETE);
+ }
+}
+
+static void on_response_headers_received(
+ cronet_bidirectional_stream *stream,
+ const cronet_bidirectional_stream_header_array *headers,
+ const char *negotiated_protocol) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "R: on_response_headers_received");
+ }
+ stream_obj *s = (stream_obj *)stream->annotation;
+ enqueue_callbacks(s->callback_list[CB_RECV_INITIAL_METADATA]);
+ s->response_headers_received = true;
+ next_recv_step(s, ON_RESPONSE_HEADERS_RECEIVED);
+}
+
+static void on_request_headers_sent(cronet_bidirectional_stream *stream) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "W: on_request_headers_sent");
+ }
+ stream_obj *s = (stream_obj *)stream->annotation;
+ enqueue_callbacks(s->callback_list[CB_SEND_INITIAL_METADATA]);
+ s->cronet_send_state = CRONET_SEND_HEADER;
+ next_send_step(s);
+}
+
+// Callback function pointers (invoked by cronet in response to events)
+static cronet_bidirectional_stream_callback callbacks = {
+ on_request_headers_sent,
+ on_response_headers_received,
+ on_read_completed,
+ on_write_completed,
+ on_response_trailers_received,
+ on_succeeded,
+ on_failed,
+ on_canceled};
+
+static void invoke_closing_callback(stream_obj *s) {
+ grpc_chttp2_incoming_metadata_buffer_publish(&s->imb,
+ s->recv_trailing_metadata);
+ if (s->callback_list[CB_RECV_TRAILING_METADATA]) {
+ enqueue_callbacks(s->callback_list[CB_RECV_TRAILING_METADATA]);
+ }
+}
+
+static void set_recv_state(stream_obj *s, enum recv_state state) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "next_state = %s", recv_state_name[state]);
+ }
+ s->cronet_recv_state = state;
+}
+
+// This is invoked from perform_stream_op, and all on_xxxx callbacks.
+static void next_recv_step(stream_obj *s, enum e_caller caller) {
+ gpr_mu_lock(&s->recv_mu);
+ switch (s->cronet_recv_state) {
+ case CRONET_RECV_IDLE:
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_IDLE");
+ }
+ if (caller == PERFORM_STREAM_OP ||
+ caller == ON_RESPONSE_HEADERS_RECEIVED) {
+ if (s->read_closed && s->response_trailers_received) {
+ invoke_closing_callback(s);
+ set_recv_state(s, CRONET_RECV_CLOSED);
+ } else if (s->response_headers_received == true &&
+ s->read_requested == true) {
+ set_recv_state(s, CRONET_RECV_READ_LENGTH);
+ s->total_read_bytes = s->remaining_read_bytes =
+ GRPC_HEADER_SIZE_IN_BYTES;
+ GPR_ASSERT(s->read_buffer);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()");
+ }
+ cronet_bidirectional_stream_read(s->cbs, s->read_buffer,
+ s->remaining_read_bytes);
+ }
+ }
+ break;
+ case CRONET_RECV_READ_LENGTH:
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_READ_LENGTH");
+ }
+ if (caller == ON_READ_COMPLETE) {
+ if (s->read_closed) {
+ invoke_closing_callback(s);
+ enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]);
+ set_recv_state(s, CRONET_RECV_CLOSED);
+ } else {
+ GPR_ASSERT(s->remaining_read_bytes == 0);
+ set_recv_state(s, CRONET_RECV_READ_DATA);
+ s->total_read_bytes = s->remaining_read_bytes =
+ parse_grpc_header((const uint8_t *)s->read_buffer);
+ s->read_buffer =
+ gpr_realloc(s->read_buffer, (uint32_t)s->remaining_read_bytes);
+ GPR_ASSERT(s->read_buffer);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()");
+ }
+ cronet_bidirectional_stream_read(s->cbs, (char *)s->read_buffer,
+ s->remaining_read_bytes);
+ }
+ }
+ break;
+ case CRONET_RECV_READ_DATA:
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_READ_DATA");
+ }
+ if (caller == ON_READ_COMPLETE) {
+ if (s->remaining_read_bytes > 0) {
+ int offset = s->total_read_bytes - s->remaining_read_bytes;
+ GPR_ASSERT(s->read_buffer);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()");
+ }
+ cronet_bidirectional_stream_read(
+ s->cbs, (char *)s->read_buffer + offset, s->remaining_read_bytes);
+ } else {
+ gpr_slice_buffer_init(&s->read_slice_buffer);
+ uint8_t *p = (uint8_t *)s->read_buffer;
+ process_recv_message(s, p);
+ set_recv_state(s, CRONET_RECV_IDLE);
+ enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]);
+ }
+ }
+ break;
+ case CRONET_RECV_CLOSED:
+ break;
+ default:
+ GPR_ASSERT(0); // Should not reach here
+ break;
+ }
+ gpr_mu_unlock(&s->recv_mu);
+}
+
+// This function takes the data from s->write_slice_buffer and assembles into
+// a contiguous byte stream with 5 byte gRPC header prepended.
+static void create_grpc_frame(stream_obj *s) {
+ gpr_slice slice = gpr_slice_buffer_take_first(&s->write_slice_buffer);
+ uint8_t *raw_data = GPR_SLICE_START_PTR(slice);
+ size_t length = GPR_SLICE_LENGTH(slice);
+ s->write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES;
+ s->write_buffer = gpr_realloc(s->write_buffer, s->write_buffer_size);
+ uint8_t *p = (uint8_t *)s->write_buffer;
+ // Append 5 byte header
+ *p++ = 0;
+ *p++ = (uint8_t)(length >> 24);
+ *p++ = (uint8_t)(length >> 16);
+ *p++ = (uint8_t)(length >> 8);
+ *p++ = (uint8_t)(length);
+ // append actual data
+ memcpy(p, raw_data, length);
+}
+
+static void do_write(stream_obj *s) {
+ gpr_slice_buffer *sb = &s->write_slice_buffer;
+ GPR_ASSERT(sb->count <= 1);
+ if (sb->count > 0) {
+ create_grpc_frame(s);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_write");
+ }
+ cronet_bidirectional_stream_write(s->cbs, s->write_buffer,
+ (int)s->write_buffer_size, false);
+ }
+}
+
+//
+static void next_send_step(stream_obj *s) {
+ switch (s->cronet_send_state) {
+ case CRONET_SEND_IDLE:
+ GPR_ASSERT(
+ s->cbs); // cronet_bidirectional_stream is not initialized yet.
+ s->cronet_send_state = CRONET_REQ_STARTED;
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_start to %s", s->url);
+ }
+ cronet_bidirectional_stream_start(s->cbs, s->url, 0, "POST",
+ &s->header_array, false);
+ // we no longer need the memory that was allocated earlier.
+ gpr_free(s->header_array.headers);
+ break;
+ case CRONET_SEND_HEADER:
+ do_write(s);
+ s->cronet_send_state = CRONET_WRITE;
+ break;
+ case CRONET_WRITE_COMPLETED:
+ do_write(s);
+ break;
+ default:
+ GPR_ASSERT(0);
+ break;
+ }
+}
+
+static void convert_metadata_to_cronet_headers(grpc_linked_mdelem *head,
+ const char *host,
+ stream_obj *s) {
+ grpc_linked_mdelem *curr = head;
+ // Walk the linked list and get number of header fields
+ uint32_t num_headers_available = 0;
+ while (curr != NULL) {
+ curr = curr->next;
+ num_headers_available++;
+ }
+ // Allocate enough memory
+ s->headers = (cronet_bidirectional_stream_header *)gpr_malloc(
+ sizeof(cronet_bidirectional_stream_header) * num_headers_available);
+
+ // Walk the linked list again, this time copying the header fields.
+ // s->num_headers
+ // can be less than num_headers_available, as some headers are not used for
+ // cronet
+ curr = head;
+ s->num_headers = 0;
+ while (s->num_headers < num_headers_available) {
+ grpc_mdelem *mdelem = curr->md;
+ curr = curr->next;
+ const char *key = grpc_mdstr_as_c_string(mdelem->key);
+ const char *value = grpc_mdstr_as_c_string(mdelem->value);
+ if (strcmp(key, ":scheme") == 0 || strcmp(key, ":method") == 0 ||
+ strcmp(key, ":authority") == 0) {
+ // Cronet populates these fields on its own.
+ continue;
+ }
+ if (strcmp(key, ":path") == 0) {
+ // Create URL by appending :path value to the hostname
+ gpr_asprintf(&s->url, "https://%s%s", host, value);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "extracted URL = %s", s->url);
+ }
+ continue;
+ }
+ s->headers[s->num_headers].key = key;
+ s->headers[s->num_headers].value = value;
+ s->num_headers++;
+ if (curr == NULL) {
+ break;
+ }
+ }
+}
+
+static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, grpc_transport_stream_op *op) {
+ grpc_cronet_transport *ct = (grpc_cronet_transport *)gt;
+ GPR_ASSERT(ct->engine);
+ stream_obj *s = (stream_obj *)gs;
+ if (op->recv_trailing_metadata) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG,
+ "perform_stream_op - recv_trailing_metadata: on_complete=%p",
+ op->on_complete);
+ }
+ s->recv_trailing_metadata = op->recv_trailing_metadata;
+ GPR_ASSERT(!s->callback_list[CB_RECV_TRAILING_METADATA][0]);
+ s->callback_list[CB_RECV_TRAILING_METADATA][0] = op->on_complete;
+ }
+ if (op->recv_message) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "perform_stream_op - recv_message: on_complete=%p",
+ op->on_complete);
+ }
+ s->recv_message = (grpc_byte_buffer **)op->recv_message;
+ GPR_ASSERT(!s->callback_list[CB_RECV_MESSAGE][0]);
+ GPR_ASSERT(!s->callback_list[CB_RECV_MESSAGE][1]);
+ s->callback_list[CB_RECV_MESSAGE][0] = op->recv_message_ready;
+ s->callback_list[CB_RECV_MESSAGE][1] = op->on_complete;
+ s->read_requested = true;
+ next_recv_step(s, PERFORM_STREAM_OP);
+ }
+ if (op->recv_initial_metadata) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "perform_stream_op - recv_initial_metadata:=%p",
+ op->on_complete);
+ }
+ s->recv_initial_metadata = op->recv_initial_metadata;
+ GPR_ASSERT(!s->callback_list[CB_RECV_INITIAL_METADATA][0]);
+ GPR_ASSERT(!s->callback_list[CB_RECV_INITIAL_METADATA][1]);
+ s->callback_list[CB_RECV_INITIAL_METADATA][0] =
+ op->recv_initial_metadata_ready;
+ s->callback_list[CB_RECV_INITIAL_METADATA][1] = op->on_complete;
+ }
+ if (op->send_initial_metadata) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG,
+ "perform_stream_op - send_initial_metadata: on_complete=%p",
+ op->on_complete);
+ }
+ s->num_headers = 0;
+ convert_metadata_to_cronet_headers(op->send_initial_metadata->list.head,
+ ct->host, s);
+ s->header_array.count = s->num_headers;
+ s->header_array.capacity = s->num_headers;
+ s->header_array.headers = s->headers;
+ GPR_ASSERT(!s->callback_list[CB_SEND_INITIAL_METADATA][0]);
+ s->callback_list[CB_SEND_INITIAL_METADATA][0] = op->on_complete;
+ }
+ if (op->send_message) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "perform_stream_op - send_message: on_complete=%p",
+ op->on_complete);
+ }
+ grpc_byte_stream_next(exec_ctx, op->send_message, &s->slice,
+ op->send_message->length, NULL);
+ // Check that compression flag is not ON. We don't support compression yet.
+ // TODO (makdharma): add compression support
+ GPR_ASSERT(op->send_message->flags == 0);
+ gpr_slice_buffer_add(&s->write_slice_buffer, s->slice);
+ if (s->cbs == NULL) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_create");
+ }
+ s->cbs = cronet_bidirectional_stream_create(ct->engine, s, &callbacks);
+ GPR_ASSERT(s->cbs);
+ s->read_closed = false;
+ s->response_trailers_received = false;
+ s->response_headers_received = false;
+ s->cronet_send_state = CRONET_SEND_IDLE;
+ s->cronet_recv_state = CRONET_RECV_IDLE;
+ }
+ GPR_ASSERT(!s->callback_list[CB_SEND_MESSAGE][0]);
+ s->callback_list[CB_SEND_MESSAGE][0] = op->on_complete;
+ next_send_step(s);
+ }
+ if (op->send_trailing_metadata) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG,
+ "perform_stream_op - send_trailing_metadata: on_complete=%p",
+ op->on_complete);
+ }
+ GPR_ASSERT(!s->callback_list[CB_SEND_TRAILING_METADATA][0]);
+ s->callback_list[CB_SEND_TRAILING_METADATA][0] = op->on_complete;
+ if (s->cbs) {
+ // Send an "empty" write to the far end to signal that we're done.
+ // This will induce the server to send down trailers.
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_write");
+ }
+ cronet_bidirectional_stream_write(s->cbs, "abc", 0, true);
+ } else {
+ // We never created a stream. This was probably an empty request.
+ invoke_closing_callback(s);
+ }
+ }
+}
+
+static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, grpc_stream_refcount *refcount,
+ const void *server_data) {
+ stream_obj *s = (stream_obj *)gs;
+ memset(s->callback_list, 0, sizeof(s->callback_list));
+ s->cbs = NULL;
+ gpr_mu_init(&s->recv_mu);
+ s->read_buffer = gpr_malloc(GRPC_HEADER_SIZE_IN_BYTES);
+ s->write_buffer = gpr_malloc(GRPC_HEADER_SIZE_IN_BYTES);
+ gpr_slice_buffer_init(&s->write_slice_buffer);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "cronet_transport - init_stream");
+ }
+ return 0;
+}
+
+static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, void *and_free_memory) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "Destroy stream");
+ }
+ stream_obj *s = (stream_obj *)gs;
+ s->cbs = NULL;
+ gpr_free(s->read_buffer);
+ gpr_free(s->write_buffer);
+ gpr_free(s->url);
+ gpr_mu_destroy(&s->recv_mu);
+ if (and_free_memory) {
+ gpr_free(and_free_memory);
+ }
+}
+
+static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
+ grpc_cronet_transport *ct = (grpc_cronet_transport *)gt;
+ gpr_free(ct->host);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "Destroy transport");
+ }
+}
+
+const grpc_transport_vtable grpc_cronet_vtable = {
+ sizeof(stream_obj), "cronet_http", init_stream,
+ set_pollset_do_nothing, perform_stream_op, NULL,
+ destroy_stream, destroy_transport, NULL};
diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c
index 28d2d78d00..893cf0700e 100644
--- a/src/core/lib/channel/channel_args.c
+++ b/src/core/lib/channel/channel_args.c
@@ -170,7 +170,7 @@ grpc_compression_algorithm grpc_channel_args_get_compression_algorithm(
if (a == NULL) return 0;
for (i = 0; i < a->num_args; ++i) {
if (a->args[i].type == GRPC_ARG_INTEGER &&
- !strcmp(GRPC_COMPRESSION_ALGORITHM_ARG, a->args[i].key)) {
+ !strcmp(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, a->args[i].key)) {
return (grpc_compression_algorithm)a->args[i].value.integer;
break;
}
@@ -182,7 +182,7 @@ grpc_channel_args *grpc_channel_args_set_compression_algorithm(
grpc_channel_args *a, grpc_compression_algorithm algorithm) {
grpc_arg tmp;
tmp.type = GRPC_ARG_INTEGER;
- tmp.key = GRPC_COMPRESSION_ALGORITHM_ARG;
+ tmp.key = GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM;
tmp.value.integer = algorithm;
return grpc_channel_args_copy_and_add(a, &tmp, 1);
}
@@ -196,7 +196,8 @@ static int find_compression_algorithm_states_bitset(const grpc_channel_args *a,
size_t i;
for (i = 0; i < a->num_args; ++i) {
if (a->args[i].type == GRPC_ARG_INTEGER &&
- !strcmp(GRPC_COMPRESSION_ALGORITHM_STATE_ARG, a->args[i].key)) {
+ !strcmp(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET,
+ a->args[i].key)) {
*states_arg = &a->args[i].value.integer;
return 1; /* GPR_TRUE */
}
@@ -222,7 +223,7 @@ grpc_channel_args *grpc_channel_args_compression_algorithm_set_state(
/* create a new arg */
grpc_arg tmp;
tmp.type = GRPC_ARG_INTEGER;
- tmp.key = GRPC_COMPRESSION_ALGORITHM_STATE_ARG;
+ tmp.key = GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET;
/* all enabled by default */
tmp.value.integer = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1;
if (state != 0) {
diff --git a/src/core/lib/channel/compress_filter.c b/src/core/lib/channel/compress_filter.c
index 5510c79b18..0e548c61b8 100644
--- a/src/core/lib/channel/compress_filter.c
+++ b/src/core/lib/channel/compress_filter.c
@@ -47,7 +47,7 @@
#include "src/core/lib/support/string.h"
#include "src/core/lib/transport/static_metadata.h"
-int grpc_compress_filter_trace = 0;
+int grpc_compression_trace = 0;
typedef struct call_data {
gpr_slice_buffer slices; /**< Buffers up input slices to be compressed */
@@ -171,7 +171,7 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
did_compress =
grpc_msg_compress(calld->compression_algorithm, &calld->slices, &tmp);
if (did_compress) {
- if (grpc_compress_filter_trace) {
+ if (grpc_compression_trace) {
char *algo_name;
const size_t before_size = calld->slices.length;
const size_t after_size = tmp.length;
@@ -185,12 +185,14 @@ static void finish_send_message(grpc_exec_ctx *exec_ctx,
gpr_slice_buffer_swap(&calld->slices, &tmp);
calld->send_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
} else {
- if (grpc_compress_filter_trace) {
+ if (grpc_compression_trace) {
char *algo_name;
GPR_ASSERT(grpc_compression_algorithm_name(calld->compression_algorithm,
&algo_name));
- gpr_log(GPR_DEBUG, "Algorithm '%s' enabled but decided not to compress.",
- algo_name);
+ gpr_log(
+ GPR_DEBUG,
+ "Algorithm '%s' enabled but decided not to compress. Input size: %d",
+ algo_name, calld->slices.length);
}
}
diff --git a/src/core/lib/channel/compress_filter.h b/src/core/lib/channel/compress_filter.h
index cf5879d82e..0ce5d08837 100644
--- a/src/core/lib/channel/compress_filter.h
+++ b/src/core/lib/channel/compress_filter.h
@@ -38,7 +38,7 @@
#define GRPC_COMPRESS_REQUEST_ALGORITHM_KEY "grpc-internal-encoding-request"
-extern int grpc_compress_filter_trace;
+extern int grpc_compression_trace;
/** Compression filter for outgoing data.
*
diff --git a/src/core/lib/http/parser.c b/src/core/lib/http/parser.c
index a7efb5e73e..09b2ed40d1 100644
--- a/src/core/lib/http/parser.c
+++ b/src/core/lib/http/parser.c
@@ -161,8 +161,9 @@ static int add_header(grpc_http_parser *parser) {
cur++;
}
if (cur == end) {
- if (grpc_http1_trace)
+ if (grpc_http1_trace) {
gpr_log(GPR_ERROR, "Didn't find ':' in header string");
+ }
goto error;
}
GPR_ASSERT(cur >= beg);
diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
index 3c8127e1a8..aeb6e28665 100644
--- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
@@ -790,7 +790,6 @@ static void pollset_kick(grpc_pollset *p,
static void pollset_global_init(void) {
gpr_tls_init(&g_current_thread_poller);
gpr_tls_init(&g_current_thread_worker);
- grpc_wakeup_fd_global_init();
grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
}
@@ -798,7 +797,6 @@ static void pollset_global_shutdown(void) {
grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
gpr_tls_destroy(&g_current_thread_poller);
gpr_tls_destroy(&g_current_thread_worker);
- grpc_wakeup_fd_global_destroy();
}
static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); }
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
new file mode 100644
index 0000000000..e91ae40212
--- /dev/null
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -0,0 +1,1212 @@
+/*
+ *
+ * Copyright 2015-2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_POSIX_SOCKET
+
+#include "src/core/lib/iomgr/ev_poll_posix.h"
+
+#include <assert.h>
+#include <errno.h>
+#include <poll.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/tls.h>
+#include <grpc/support/useful.h>
+
+#include "src/core/lib/iomgr/iomgr_internal.h"
+#include "src/core/lib/iomgr/wakeup_fd_posix.h"
+#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/support/block_annotate.h"
+
+/*******************************************************************************
+ * FD declarations
+ */
+
+typedef struct grpc_fd_watcher {
+ struct grpc_fd_watcher *next;
+ struct grpc_fd_watcher *prev;
+ grpc_pollset *pollset;
+ grpc_pollset_worker *worker;
+ grpc_fd *fd;
+} grpc_fd_watcher;
+
+struct grpc_fd {
+ int fd;
+ /* refst format:
+ bit0: 1=active/0=orphaned
+ bit1-n: refcount
+ meaning that mostly we ref by two to avoid altering the orphaned bit,
+ and just unref by 1 when we're ready to flag the object as orphaned */
+ gpr_atm refst;
+
+ gpr_mu mu;
+ int shutdown;
+ int closed;
+ int released;
+
+ /* The watcher list.
+
+ The following watcher related fields are protected by watcher_mu.
+
+ An fd_watcher is an ephemeral object created when an fd wants to
+ begin polling, and destroyed after the poll.
+
+ It denotes the fd's interest in whether to read poll or write poll
+ or both or neither on this fd.
+
+ If a watcher is asked to poll for reads or writes, the read_watcher
+ or write_watcher fields are set respectively. A watcher may be asked
+ to poll for both, in which case both fields will be set.
+
+ read_watcher and write_watcher may be NULL if no watcher has been
+ asked to poll for reads or writes.
+
+ If an fd_watcher is not asked to poll for reads or writes, it's added
+ to a linked list of inactive watchers, rooted at inactive_watcher_root.
+ If at a later time there becomes need of a poller to poll, one of
+ the inactive pollers may be kicked out of their poll loops to take
+ that responsibility. */
+ grpc_fd_watcher inactive_watcher_root;
+ grpc_fd_watcher *read_watcher;
+ grpc_fd_watcher *write_watcher;
+
+ grpc_closure *read_closure;
+ grpc_closure *write_closure;
+
+ grpc_closure *on_done_closure;
+
+ grpc_iomgr_object iomgr_object;
+};
+
+/* Begin polling on an fd.
+ Registers that the given pollset is interested in this fd - so that if read
+ or writability interest changes, the pollset can be kicked to pick up that
+ new interest.
+ Return value is:
+ (fd_needs_read? read_mask : 0) | (fd_needs_write? write_mask : 0)
+ i.e. a combination of read_mask and write_mask determined by the fd's current
+ interest in said events.
+ Polling strategies that do not need to alter their behavior depending on the
+ fd's current interest (such as epoll) do not need to call this function.
+ MUST NOT be called with a pollset lock taken */
+static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
+ grpc_pollset_worker *worker, uint32_t read_mask,
+ uint32_t write_mask, grpc_fd_watcher *rec);
+/* Complete polling previously started with fd_begin_poll
+ MUST NOT be called with a pollset lock taken
+ if got_read or got_write are 1, also does the become_{readable,writable} as
+ appropriate. */
+static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec,
+ int got_read, int got_write);
+
+/* Return 1 if this fd is orphaned, 0 otherwise */
+static bool fd_is_orphaned(grpc_fd *fd);
+
+/* Reference counting for fds */
+/*#define GRPC_FD_REF_COUNT_DEBUG*/
+#ifdef GRPC_FD_REF_COUNT_DEBUG
+static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
+static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
+ int line);
+#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
+#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
+#else
+static void fd_ref(grpc_fd *fd);
+static void fd_unref(grpc_fd *fd);
+#define GRPC_FD_REF(fd, reason) fd_ref(fd)
+#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
+#endif
+
+#define CLOSURE_NOT_READY ((grpc_closure *)0)
+#define CLOSURE_READY ((grpc_closure *)1)
+
+/*******************************************************************************
+ * pollset declarations
+ */
+
+typedef struct grpc_cached_wakeup_fd {
+ grpc_wakeup_fd fd;
+ struct grpc_cached_wakeup_fd *next;
+} grpc_cached_wakeup_fd;
+
+struct grpc_pollset_worker {
+ grpc_cached_wakeup_fd *wakeup_fd;
+ int reevaluate_polling_on_wakeup;
+ int kicked_specifically;
+ struct grpc_pollset_worker *next;
+ struct grpc_pollset_worker *prev;
+};
+
+struct grpc_pollset {
+ gpr_mu mu;
+ grpc_pollset_worker root_worker;
+ int in_flight_cbs;
+ int shutting_down;
+ int called_shutdown;
+ int kicked_without_pollers;
+ grpc_closure *shutdown_done;
+ grpc_closure_list idle_jobs;
+ /* all polled fds */
+ size_t fd_count;
+ size_t fd_capacity;
+ grpc_fd **fds;
+ /* fds that have been removed from the pollset explicitly */
+ size_t del_count;
+ size_t del_capacity;
+ grpc_fd **dels;
+ /* Local cache of eventfds for workers */
+ grpc_cached_wakeup_fd *local_wakeup_cache;
+};
+
+/* Add an fd to a pollset */
+static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ struct grpc_fd *fd);
+
+static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *pollset_set, grpc_fd *fd);
+
+/* Convert a timespec to milliseconds:
+ - very small or negative poll times are clamped to zero to do a
+ non-blocking poll (which becomes spin polling)
+ - other small values are rounded up to one millisecond
+ - longer than a millisecond polls are rounded up to the next nearest
+ millisecond to avoid spinning
+ - infinite timeouts are converted to -1 */
+static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
+ gpr_timespec now);
+
+/* Allow kick to wakeup the currently polling worker */
+#define GRPC_POLLSET_CAN_KICK_SELF 1
+/* Force the wakee to repoll when awoken */
+#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2
+/* As per pollset_kick, with an extended set of flags (defined above)
+ -- mostly for fd_posix's use. */
+static void pollset_kick_ext(grpc_pollset *p,
+ grpc_pollset_worker *specific_worker,
+ uint32_t flags);
+
+/* Return 1 if the pollset has active threads in pollset_work (pollset must
+ * be locked) */
+static int pollset_has_workers(grpc_pollset *pollset);
+
+/*******************************************************************************
+ * pollset_set definitions
+ */
+
+struct grpc_pollset_set {
+ gpr_mu mu;
+
+ size_t pollset_count;
+ size_t pollset_capacity;
+ grpc_pollset **pollsets;
+
+ size_t pollset_set_count;
+ size_t pollset_set_capacity;
+ struct grpc_pollset_set **pollset_sets;
+
+ size_t fd_count;
+ size_t fd_capacity;
+ grpc_fd **fds;
+};
+
+/*******************************************************************************
+ * fd_posix.c
+ */
+
+#ifdef GRPC_FD_REF_COUNT_DEBUG
+#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
+#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
+static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
+ int line) {
+ gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
+ gpr_atm_no_barrier_load(&fd->refst),
+ gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
+#else
+#define REF_BY(fd, n, reason) ref_by(fd, n)
+#define UNREF_BY(fd, n, reason) unref_by(fd, n)
+static void ref_by(grpc_fd *fd, int n) {
+#endif
+ GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
+}
+
+#ifdef GRPC_FD_REF_COUNT_DEBUG
+static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
+ int line) {
+ gpr_atm old;
+ gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
+ gpr_atm_no_barrier_load(&fd->refst),
+ gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
+#else
+static void unref_by(grpc_fd *fd, int n) {
+ gpr_atm old;
+#endif
+ old = gpr_atm_full_fetch_add(&fd->refst, -n);
+ if (old == n) {
+ gpr_mu_destroy(&fd->mu);
+ grpc_iomgr_unregister_object(&fd->iomgr_object);
+ gpr_free(fd);
+ } else {
+ GPR_ASSERT(old > n);
+ }
+}
+
+static grpc_fd *fd_create(int fd, const char *name) {
+ grpc_fd *r = gpr_malloc(sizeof(*r));
+ gpr_mu_init(&r->mu);
+ gpr_atm_rel_store(&r->refst, 1);
+ r->shutdown = 0;
+ r->read_closure = CLOSURE_NOT_READY;
+ r->write_closure = CLOSURE_NOT_READY;
+ r->fd = fd;
+ r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
+ &r->inactive_watcher_root;
+ r->read_watcher = r->write_watcher = NULL;
+ r->on_done_closure = NULL;
+ r->closed = 0;
+ r->released = 0;
+
+ char *name2;
+ gpr_asprintf(&name2, "%s fd=%d", name, fd);
+ grpc_iomgr_register_object(&r->iomgr_object, name2);
+ gpr_free(name2);
+#ifdef GRPC_FD_REF_COUNT_DEBUG
+ gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, r, name);
+#endif
+ return r;
+}
+
+static bool fd_is_orphaned(grpc_fd *fd) {
+ return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
+}
+
+static void pollset_kick_locked(grpc_fd_watcher *watcher) {
+ gpr_mu_lock(&watcher->pollset->mu);
+ GPR_ASSERT(watcher->worker);
+ pollset_kick_ext(watcher->pollset, watcher->worker,
+ GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
+ gpr_mu_unlock(&watcher->pollset->mu);
+}
+
+static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
+ if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
+ pollset_kick_locked(fd->inactive_watcher_root.next);
+ } else if (fd->read_watcher) {
+ pollset_kick_locked(fd->read_watcher);
+ } else if (fd->write_watcher) {
+ pollset_kick_locked(fd->write_watcher);
+ }
+}
+
+static void wake_all_watchers_locked(grpc_fd *fd) {
+ grpc_fd_watcher *watcher;
+ for (watcher = fd->inactive_watcher_root.next;
+ watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
+ pollset_kick_locked(watcher);
+ }
+ if (fd->read_watcher) {
+ pollset_kick_locked(fd->read_watcher);
+ }
+ if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
+ pollset_kick_locked(fd->write_watcher);
+ }
+}
+
+static int has_watchers(grpc_fd *fd) {
+ return fd->read_watcher != NULL || fd->write_watcher != NULL ||
+ fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
+}
+
+static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
+ fd->closed = 1;
+ if (!fd->released) {
+ close(fd->fd);
+ }
+ grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL);
+}
+
+static int fd_wrapped_fd(grpc_fd *fd) {
+ if (fd->released || fd->closed) {
+ return -1;
+ } else {
+ return fd->fd;
+ }
+}
+
+static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_closure *on_done, int *release_fd,
+ const char *reason) {
+ fd->on_done_closure = on_done;
+ fd->released = release_fd != NULL;
+ if (!fd->released) {
+ shutdown(fd->fd, SHUT_RDWR);
+ } else {
+ *release_fd = fd->fd;
+ }
+ gpr_mu_lock(&fd->mu);
+ REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
+ if (!has_watchers(fd)) {
+ close_fd_locked(exec_ctx, fd);
+ } else {
+ wake_all_watchers_locked(fd);
+ }
+ gpr_mu_unlock(&fd->mu);
+ UNREF_BY(fd, 2, reason); /* drop the reference */
+}
+
+/* increment refcount by two to avoid changing the orphan bit */
+#ifdef GRPC_FD_REF_COUNT_DEBUG
+static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
+ int line) {
+ ref_by(fd, 2, reason, file, line);
+}
+
+static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
+ int line) {
+ unref_by(fd, 2, reason, file, line);
+}
+#else
+static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
+
+static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
+#endif
+
+static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_closure **st, grpc_closure *closure) {
+ if (*st == CLOSURE_NOT_READY) {
+ /* not ready ==> switch to a waiting state by setting the closure */
+ *st = closure;
+ } else if (*st == CLOSURE_READY) {
+ /* already ready ==> queue the closure to run immediately */
+ *st = CLOSURE_NOT_READY;
+ grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL);
+ maybe_wake_one_watcher_locked(fd);
+ } else {
+ /* upcallptr was set to a different closure. This is an error! */
+ gpr_log(GPR_ERROR,
+ "User called a notify_on function with a previous callback still "
+ "pending");
+ abort();
+ }
+}
+
+/* returns 1 if state becomes not ready */
+static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_closure **st) {
+ if (*st == CLOSURE_READY) {
+ /* duplicate ready ==> ignore */
+ return 0;
+ } else if (*st == CLOSURE_NOT_READY) {
+ /* not ready, and not waiting ==> flag ready */
+ *st = CLOSURE_READY;
+ return 0;
+ } else {
+ /* waiting ==> queue closure */
+ grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown, NULL);
+ *st = CLOSURE_NOT_READY;
+ return 1;
+ }
+}
+
+static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
+ gpr_mu_lock(&fd->mu);
+ GPR_ASSERT(!fd->shutdown);
+ fd->shutdown = 1;
+ set_ready_locked(exec_ctx, fd, &fd->read_closure);
+ set_ready_locked(exec_ctx, fd, &fd->write_closure);
+ gpr_mu_unlock(&fd->mu);
+}
+
+static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_closure *closure) {
+ gpr_mu_lock(&fd->mu);
+ notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
+ gpr_mu_unlock(&fd->mu);
+}
+
+static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_closure *closure) {
+ gpr_mu_lock(&fd->mu);
+ notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
+ gpr_mu_unlock(&fd->mu);
+}
+
+static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
+ grpc_pollset_worker *worker, uint32_t read_mask,
+ uint32_t write_mask, grpc_fd_watcher *watcher) {
+ uint32_t mask = 0;
+ grpc_closure *cur;
+ int requested;
+ /* keep track of pollers that have requested our events, in case they change
+ */
+ GRPC_FD_REF(fd, "poll");
+
+ gpr_mu_lock(&fd->mu);
+
+ /* if we are shutdown, then don't add to the watcher set */
+ if (fd->shutdown) {
+ watcher->fd = NULL;
+ watcher->pollset = NULL;
+ watcher->worker = NULL;
+ gpr_mu_unlock(&fd->mu);
+ GRPC_FD_UNREF(fd, "poll");
+ return 0;
+ }
+
+ /* if there is nobody polling for read, but we need to, then start doing so */
+ cur = fd->read_closure;
+ requested = cur != CLOSURE_READY;
+ if (read_mask && fd->read_watcher == NULL && requested) {
+ fd->read_watcher = watcher;
+ mask |= read_mask;
+ }
+ /* if there is nobody polling for write, but we need to, then start doing so
+ */
+ cur = fd->write_closure;
+ requested = cur != CLOSURE_READY;
+ if (write_mask && fd->write_watcher == NULL && requested) {
+ fd->write_watcher = watcher;
+ mask |= write_mask;
+ }
+ /* if not polling, remember this watcher in case we need someone to later */
+ if (mask == 0 && worker != NULL) {
+ watcher->next = &fd->inactive_watcher_root;
+ watcher->prev = watcher->next->prev;
+ watcher->next->prev = watcher->prev->next = watcher;
+ }
+ watcher->pollset = pollset;
+ watcher->worker = worker;
+ watcher->fd = fd;
+ gpr_mu_unlock(&fd->mu);
+
+ return mask;
+}
+
+static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
+ int got_read, int got_write) {
+ int was_polling = 0;
+ int kick = 0;
+ grpc_fd *fd = watcher->fd;
+
+ if (fd == NULL) {
+ return;
+ }
+
+ gpr_mu_lock(&fd->mu);
+
+ if (watcher == fd->read_watcher) {
+ /* remove read watcher, kick if we still need a read */
+ was_polling = 1;
+ if (!got_read) {
+ kick = 1;
+ }
+ fd->read_watcher = NULL;
+ }
+ if (watcher == fd->write_watcher) {
+ /* remove write watcher, kick if we still need a write */
+ was_polling = 1;
+ if (!got_write) {
+ kick = 1;
+ }
+ fd->write_watcher = NULL;
+ }
+ if (!was_polling && watcher->worker != NULL) {
+ /* remove from inactive list */
+ watcher->next->prev = watcher->prev;
+ watcher->prev->next = watcher->next;
+ }
+ if (got_read) {
+ if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) {
+ kick = 1;
+ }
+ }
+ if (got_write) {
+ if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) {
+ kick = 1;
+ }
+ }
+ if (kick) {
+ maybe_wake_one_watcher_locked(fd);
+ }
+ if (fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) {
+ close_fd_locked(exec_ctx, fd);
+ }
+ gpr_mu_unlock(&fd->mu);
+
+ GRPC_FD_UNREF(fd, "poll");
+}
+
+/*******************************************************************************
+ * pollset_posix.c
+ */
+
+GPR_TLS_DECL(g_current_thread_poller);
+GPR_TLS_DECL(g_current_thread_worker);
+
+static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
+ worker->prev->next = worker->next;
+ worker->next->prev = worker->prev;
+}
+
+static int pollset_has_workers(grpc_pollset *p) {
+ return p->root_worker.next != &p->root_worker;
+}
+
+static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
+ if (pollset_has_workers(p)) {
+ grpc_pollset_worker *w = p->root_worker.next;
+ remove_worker(p, w);
+ return w;
+ } else {
+ return NULL;
+ }
+}
+
+static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
+ worker->next = &p->root_worker;
+ worker->prev = worker->next->prev;
+ worker->prev->next = worker->next->prev = worker;
+}
+
+static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
+ worker->prev = &p->root_worker;
+ worker->next = worker->prev->next;
+ worker->prev->next = worker->next->prev = worker;
+}
+
+static void pollset_kick_ext(grpc_pollset *p,
+ grpc_pollset_worker *specific_worker,
+ uint32_t flags) {
+ GPR_TIMER_BEGIN("pollset_kick_ext", 0);
+
+ /* pollset->mu already held */
+ if (specific_worker != NULL) {
+ if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
+ GPR_TIMER_BEGIN("pollset_kick_ext.broadcast", 0);
+ GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
+ for (specific_worker = p->root_worker.next;
+ specific_worker != &p->root_worker;
+ specific_worker = specific_worker->next) {
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
+ }
+ p->kicked_without_pollers = 1;
+ GPR_TIMER_END("pollset_kick_ext.broadcast", 0);
+ } else if (gpr_tls_get(&g_current_thread_worker) !=
+ (intptr_t)specific_worker) {
+ GPR_TIMER_MARK("different_thread_worker", 0);
+ if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
+ specific_worker->reevaluate_polling_on_wakeup = 1;
+ }
+ specific_worker->kicked_specifically = 1;
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
+ } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
+ GPR_TIMER_MARK("kick_yoself", 0);
+ if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
+ specific_worker->reevaluate_polling_on_wakeup = 1;
+ }
+ specific_worker->kicked_specifically = 1;
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
+ }
+ } else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) {
+ GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
+ GPR_TIMER_MARK("kick_anonymous", 0);
+ specific_worker = pop_front_worker(p);
+ if (specific_worker != NULL) {
+ if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) {
+ GPR_TIMER_MARK("kick_anonymous_not_self", 0);
+ push_back_worker(p, specific_worker);
+ specific_worker = pop_front_worker(p);
+ if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
+ gpr_tls_get(&g_current_thread_worker) ==
+ (intptr_t)specific_worker) {
+ push_back_worker(p, specific_worker);
+ specific_worker = NULL;
+ }
+ }
+ if (specific_worker != NULL) {
+ GPR_TIMER_MARK("finally_kick", 0);
+ push_back_worker(p, specific_worker);
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
+ }
+ } else {
+ GPR_TIMER_MARK("kicked_no_pollers", 0);
+ p->kicked_without_pollers = 1;
+ }
+ }
+
+ GPR_TIMER_END("pollset_kick_ext", 0);
+}
+
+static void pollset_kick(grpc_pollset *p,
+ grpc_pollset_worker *specific_worker) {
+ pollset_kick_ext(p, specific_worker, 0);
+}
+
+/* global state management */
+
+static void pollset_global_init(void) {
+ gpr_tls_init(&g_current_thread_poller);
+ gpr_tls_init(&g_current_thread_worker);
+ grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
+}
+
+static void pollset_global_shutdown(void) {
+ grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
+ gpr_tls_destroy(&g_current_thread_poller);
+ gpr_tls_destroy(&g_current_thread_worker);
+}
+
+static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); }
+
+/* main interface */
+
+static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
+ gpr_mu_init(&pollset->mu);
+ *mu = &pollset->mu;
+ pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
+ pollset->in_flight_cbs = 0;
+ pollset->shutting_down = 0;
+ pollset->called_shutdown = 0;
+ pollset->kicked_without_pollers = 0;
+ pollset->idle_jobs.head = pollset->idle_jobs.tail = NULL;
+ pollset->local_wakeup_cache = NULL;
+ pollset->kicked_without_pollers = 0;
+ pollset->fd_count = 0;
+ pollset->fd_capacity = 0;
+ pollset->del_count = 0;
+ pollset->del_capacity = 0;
+ pollset->fds = NULL;
+ pollset->dels = NULL;
+}
+
+static void pollset_destroy(grpc_pollset *pollset) {
+ GPR_ASSERT(pollset->in_flight_cbs == 0);
+ GPR_ASSERT(!pollset_has_workers(pollset));
+ GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
+ while (pollset->local_wakeup_cache) {
+ grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next;
+ grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
+ gpr_free(pollset->local_wakeup_cache);
+ pollset->local_wakeup_cache = next;
+ }
+ gpr_free(pollset->fds);
+ gpr_free(pollset->dels);
+ gpr_mu_destroy(&pollset->mu);
+}
+
+static void pollset_reset(grpc_pollset *pollset) {
+ GPR_ASSERT(pollset->shutting_down);
+ GPR_ASSERT(pollset->in_flight_cbs == 0);
+ GPR_ASSERT(!pollset_has_workers(pollset));
+ GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
+ GPR_ASSERT(pollset->fd_count == 0);
+ GPR_ASSERT(pollset->del_count == 0);
+ pollset->shutting_down = 0;
+ pollset->called_shutdown = 0;
+ pollset->kicked_without_pollers = 0;
+}
+
+static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_fd *fd) {
+ gpr_mu_lock(&pollset->mu);
+ size_t i;
+ /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */
+ for (i = 0; i < pollset->fd_count; i++) {
+ if (pollset->fds[i] == fd) goto exit;
+ }
+ if (pollset->fd_count == pollset->fd_capacity) {
+ pollset->fd_capacity =
+ GPR_MAX(pollset->fd_capacity + 8, pollset->fd_count * 3 / 2);
+ pollset->fds =
+ gpr_realloc(pollset->fds, sizeof(grpc_fd *) * pollset->fd_capacity);
+ }
+ pollset->fds[pollset->fd_count++] = fd;
+ GRPC_FD_REF(fd, "multipoller");
+ pollset_kick(pollset, NULL);
+exit:
+ gpr_mu_unlock(&pollset->mu);
+}
+
+static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
+ GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs));
+ size_t i;
+ for (i = 0; i < pollset->fd_count; i++) {
+ GRPC_FD_UNREF(pollset->fds[i], "multipoller");
+ }
+ for (i = 0; i < pollset->del_count; i++) {
+ GRPC_FD_UNREF(pollset->dels[i], "multipoller_del");
+ }
+ pollset->fd_count = 0;
+ pollset->del_count = 0;
+ grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL);
+}
+
+static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_pollset_worker **worker_hdl, gpr_timespec now,
+ gpr_timespec deadline) {
+ grpc_pollset_worker worker;
+ *worker_hdl = &worker;
+
+ /* pollset->mu already held */
+ int added_worker = 0;
+ int locked = 1;
+ int queued_work = 0;
+ int keep_polling = 0;
+ GPR_TIMER_BEGIN("pollset_work", 0);
+ /* this must happen before we (potentially) drop pollset->mu */
+ worker.next = worker.prev = NULL;
+ worker.reevaluate_polling_on_wakeup = 0;
+ if (pollset->local_wakeup_cache != NULL) {
+ worker.wakeup_fd = pollset->local_wakeup_cache;
+ pollset->local_wakeup_cache = worker.wakeup_fd->next;
+ } else {
+ worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd));
+ grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
+ }
+ worker.kicked_specifically = 0;
+ /* If there's work waiting for the pollset to be idle, and the
+ pollset is idle, then do that work */
+ if (!pollset_has_workers(pollset) &&
+ !grpc_closure_list_empty(pollset->idle_jobs)) {
+ GPR_TIMER_MARK("pollset_work.idle_jobs", 0);
+ grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
+ goto done;
+ }
+ /* If we're shutting down then we don't execute any extended work */
+ if (pollset->shutting_down) {
+ GPR_TIMER_MARK("pollset_work.shutting_down", 0);
+ goto done;
+ }
+ /* Give do_promote priority so we don't starve it out */
+ if (pollset->in_flight_cbs) {
+ GPR_TIMER_MARK("pollset_work.in_flight_cbs", 0);
+ gpr_mu_unlock(&pollset->mu);
+ locked = 0;
+ goto done;
+ }
+ /* Start polling, and keep doing so while we're being asked to
+ re-evaluate our pollers (this allows poll() based pollers to
+ ensure they don't miss wakeups) */
+ keep_polling = 1;
+ gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset);
+ while (keep_polling) {
+ keep_polling = 0;
+ if (!pollset->kicked_without_pollers) {
+ if (!added_worker) {
+ push_front_worker(pollset, &worker);
+ added_worker = 1;
+ gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
+ }
+ GPR_TIMER_BEGIN("maybe_work_and_unlock", 0);
+#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
+#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)
+
+ int timeout;
+ int r;
+ size_t i, j, fd_count;
+ nfds_t pfd_count;
+ /* TODO(ctiller): inline some elements to avoid an allocation */
+ grpc_fd_watcher *watchers;
+ struct pollfd *pfds;
+
+ timeout = poll_deadline_to_millis_timeout(deadline, now);
+ /* TODO(ctiller): perform just one malloc here if we exceed the inline
+ * case */
+ pfds = gpr_malloc(sizeof(*pfds) * (pollset->fd_count + 2));
+ watchers = gpr_malloc(sizeof(*watchers) * (pollset->fd_count + 2));
+ fd_count = 0;
+ pfd_count = 2;
+ pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd);
+ pfds[0].events = POLLIN;
+ pfds[0].revents = 0;
+ pfds[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker.wakeup_fd->fd);
+ pfds[1].events = POLLIN;
+ pfds[1].revents = 0;
+ for (i = 0; i < pollset->fd_count; i++) {
+ int remove = fd_is_orphaned(pollset->fds[i]);
+ for (j = 0; !remove && j < pollset->del_count; j++) {
+ if (pollset->fds[i] == pollset->dels[j]) remove = 1;
+ }
+ if (remove) {
+ GRPC_FD_UNREF(pollset->fds[i], "multipoller");
+ } else {
+ pollset->fds[fd_count++] = pollset->fds[i];
+ watchers[pfd_count].fd = pollset->fds[i];
+ GRPC_FD_REF(watchers[pfd_count].fd, "multipoller_start");
+ pfds[pfd_count].fd = pollset->fds[i]->fd;
+ pfds[pfd_count].revents = 0;
+ pfd_count++;
+ }
+ }
+ for (j = 0; j < pollset->del_count; j++) {
+ GRPC_FD_UNREF(pollset->dels[j], "multipoller_del");
+ }
+ pollset->del_count = 0;
+ pollset->fd_count = fd_count;
+ gpr_mu_unlock(&pollset->mu);
+
+ for (i = 2; i < pfd_count; i++) {
+ grpc_fd *fd = watchers[i].fd;
+ pfds[i].events = (short)fd_begin_poll(fd, pollset, &worker, POLLIN,
+ POLLOUT, &watchers[i]);
+ GRPC_FD_UNREF(fd, "multipoller_start");
+ }
+
+ /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
+ even going into the blocking annotation if possible */
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
+ r = grpc_poll_function(pfds, pfd_count, timeout);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
+
+ if (r < 0) {
+ if (errno != EINTR) {
+ gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
+ }
+ for (i = 2; i < pfd_count; i++) {
+ fd_end_poll(exec_ctx, &watchers[i], 0, 0);
+ }
+ } else if (r == 0) {
+ for (i = 2; i < pfd_count; i++) {
+ fd_end_poll(exec_ctx, &watchers[i], 0, 0);
+ }
+ } else {
+ if (pfds[0].revents & POLLIN_CHECK) {
+ grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
+ }
+ if (pfds[1].revents & POLLIN_CHECK) {
+ grpc_wakeup_fd_consume_wakeup(&worker.wakeup_fd->fd);
+ }
+ for (i = 2; i < pfd_count; i++) {
+ if (watchers[i].fd == NULL) {
+ fd_end_poll(exec_ctx, &watchers[i], 0, 0);
+ } else {
+ fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK,
+ pfds[i].revents & POLLOUT_CHECK);
+ }
+ }
+ }
+
+ gpr_free(pfds);
+ gpr_free(watchers);
+ GPR_TIMER_END("maybe_work_and_unlock", 0);
+ locked = 0;
+ } else {
+ GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
+ pollset->kicked_without_pollers = 0;
+ }
+ /* Finished execution - start cleaning up.
+ Note that we may arrive here from outside the enclosing while() loop.
+ In that case we won't loop though as we haven't added worker to the
+ worker list, which means nobody could ask us to re-evaluate polling). */
+ done:
+ if (!locked) {
+ queued_work |= grpc_exec_ctx_flush(exec_ctx);
+ gpr_mu_lock(&pollset->mu);
+ locked = 1;
+ }
+ /* If we're forced to re-evaluate polling (via pollset_kick with
+ GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
+ a loop */
+ if (worker.reevaluate_polling_on_wakeup) {
+ worker.reevaluate_polling_on_wakeup = 0;
+ pollset->kicked_without_pollers = 0;
+ if (queued_work || worker.kicked_specifically) {
+ /* If there's queued work on the list, then set the deadline to be
+ immediate so we get back out of the polling loop quickly */
+ deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
+ }
+ keep_polling = 1;
+ }
+ if (keep_polling) {
+ now = gpr_now(now.clock_type);
+ }
+ }
+ gpr_tls_set(&g_current_thread_poller, 0);
+ if (added_worker) {
+ remove_worker(pollset, &worker);
+ gpr_tls_set(&g_current_thread_worker, 0);
+ }
+ /* release wakeup fd to the local pool */
+ worker.wakeup_fd->next = pollset->local_wakeup_cache;
+ pollset->local_wakeup_cache = worker.wakeup_fd;
+ /* check shutdown conditions */
+ if (pollset->shutting_down) {
+ if (pollset_has_workers(pollset)) {
+ pollset_kick(pollset, NULL);
+ } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) {
+ pollset->called_shutdown = 1;
+ gpr_mu_unlock(&pollset->mu);
+ finish_shutdown(exec_ctx, pollset);
+ grpc_exec_ctx_flush(exec_ctx);
+ /* Continuing to access pollset here is safe -- it is the caller's
+ * responsibility to not destroy when it has outstanding calls to
+ * pollset_work.
+ * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
+ gpr_mu_lock(&pollset->mu);
+ } else if (!grpc_closure_list_empty(pollset->idle_jobs)) {
+ grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
+ gpr_mu_unlock(&pollset->mu);
+ grpc_exec_ctx_flush(exec_ctx);
+ gpr_mu_lock(&pollset->mu);
+ }
+ }
+ *worker_hdl = NULL;
+ GPR_TIMER_END("pollset_work", 0);
+}
+
+static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_closure *closure) {
+ GPR_ASSERT(!pollset->shutting_down);
+ pollset->shutting_down = 1;
+ pollset->shutdown_done = closure;
+ pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
+ if (!pollset_has_workers(pollset)) {
+ grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
+ }
+ if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 &&
+ !pollset_has_workers(pollset)) {
+ pollset->called_shutdown = 1;
+ finish_shutdown(exec_ctx, pollset);
+ }
+}
+
+static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
+ gpr_timespec now) {
+ gpr_timespec timeout;
+ static const int64_t max_spin_polling_us = 10;
+ if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
+ return -1;
+ }
+ if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
+ max_spin_polling_us,
+ GPR_TIMESPAN))) <= 0) {
+ return 0;
+ }
+ timeout = gpr_time_sub(deadline, now);
+ return gpr_time_to_millis(gpr_time_add(
+ timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
+}
+
+/*******************************************************************************
+ * pollset_set_posix.c
+ */
+
+static grpc_pollset_set *pollset_set_create(void) {
+ grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
+ memset(pollset_set, 0, sizeof(*pollset_set));
+ gpr_mu_init(&pollset_set->mu);
+ return pollset_set;
+}
+
+static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
+ size_t i;
+ gpr_mu_destroy(&pollset_set->mu);
+ for (i = 0; i < pollset_set->fd_count; i++) {
+ GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
+ }
+ gpr_free(pollset_set->pollsets);
+ gpr_free(pollset_set->pollset_sets);
+ gpr_free(pollset_set->fds);
+ gpr_free(pollset_set);
+}
+
+static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *pollset_set,
+ grpc_pollset *pollset) {
+ size_t i, j;
+ gpr_mu_lock(&pollset_set->mu);
+ if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
+ pollset_set->pollset_capacity =
+ GPR_MAX(8, 2 * pollset_set->pollset_capacity);
+ pollset_set->pollsets =
+ gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
+ sizeof(*pollset_set->pollsets));
+ }
+ pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
+ for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
+ if (fd_is_orphaned(pollset_set->fds[i])) {
+ GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
+ } else {
+ pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
+ pollset_set->fds[j++] = pollset_set->fds[i];
+ }
+ }
+ pollset_set->fd_count = j;
+ gpr_mu_unlock(&pollset_set->mu);
+}
+
+static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *pollset_set,
+ grpc_pollset *pollset) {
+ size_t i;
+ gpr_mu_lock(&pollset_set->mu);
+ for (i = 0; i < pollset_set->pollset_count; i++) {
+ if (pollset_set->pollsets[i] == pollset) {
+ pollset_set->pollset_count--;
+ GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
+ pollset_set->pollsets[pollset_set->pollset_count]);
+ break;
+ }
+ }
+ gpr_mu_unlock(&pollset_set->mu);
+}
+
+static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *bag,
+ grpc_pollset_set *item) {
+ size_t i, j;
+ gpr_mu_lock(&bag->mu);
+ if (bag->pollset_set_count == bag->pollset_set_capacity) {
+ bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
+ bag->pollset_sets =
+ gpr_realloc(bag->pollset_sets,
+ bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
+ }
+ bag->pollset_sets[bag->pollset_set_count++] = item;
+ for (i = 0, j = 0; i < bag->fd_count; i++) {
+ if (fd_is_orphaned(bag->fds[i])) {
+ GRPC_FD_UNREF(bag->fds[i], "pollset_set");
+ } else {
+ pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
+ bag->fds[j++] = bag->fds[i];
+ }
+ }
+ bag->fd_count = j;
+ gpr_mu_unlock(&bag->mu);
+}
+
+static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *bag,
+ grpc_pollset_set *item) {
+ size_t i;
+ gpr_mu_lock(&bag->mu);
+ for (i = 0; i < bag->pollset_set_count; i++) {
+ if (bag->pollset_sets[i] == item) {
+ bag->pollset_set_count--;
+ GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
+ bag->pollset_sets[bag->pollset_set_count]);
+ break;
+ }
+ }
+ gpr_mu_unlock(&bag->mu);
+}
+
+static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *pollset_set, grpc_fd *fd) {
+ size_t i;
+ gpr_mu_lock(&pollset_set->mu);
+ if (pollset_set->fd_count == pollset_set->fd_capacity) {
+ pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
+ pollset_set->fds = gpr_realloc(
+ pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
+ }
+ GRPC_FD_REF(fd, "pollset_set");
+ pollset_set->fds[pollset_set->fd_count++] = fd;
+ for (i = 0; i < pollset_set->pollset_count; i++) {
+ pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
+ }
+ for (i = 0; i < pollset_set->pollset_set_count; i++) {
+ pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
+ }
+ gpr_mu_unlock(&pollset_set->mu);
+}
+
+static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *pollset_set, grpc_fd *fd) {
+ size_t i;
+ gpr_mu_lock(&pollset_set->mu);
+ for (i = 0; i < pollset_set->fd_count; i++) {
+ if (pollset_set->fds[i] == fd) {
+ pollset_set->fd_count--;
+ GPR_SWAP(grpc_fd *, pollset_set->fds[i],
+ pollset_set->fds[pollset_set->fd_count]);
+ GRPC_FD_UNREF(fd, "pollset_set");
+ break;
+ }
+ }
+ for (i = 0; i < pollset_set->pollset_set_count; i++) {
+ pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
+ }
+ gpr_mu_unlock(&pollset_set->mu);
+}
+
+/*******************************************************************************
+ * event engine binding
+ */
+
+static void shutdown_engine(void) { pollset_global_shutdown(); }
+
+static const grpc_event_engine_vtable vtable = {
+ .pollset_size = sizeof(grpc_pollset),
+
+ .fd_create = fd_create,
+ .fd_wrapped_fd = fd_wrapped_fd,
+ .fd_orphan = fd_orphan,
+ .fd_shutdown = fd_shutdown,
+ .fd_notify_on_read = fd_notify_on_read,
+ .fd_notify_on_write = fd_notify_on_write,
+
+ .pollset_init = pollset_init,
+ .pollset_shutdown = pollset_shutdown,
+ .pollset_reset = pollset_reset,
+ .pollset_destroy = pollset_destroy,
+ .pollset_work = pollset_work,
+ .pollset_kick = pollset_kick,
+ .pollset_add_fd = pollset_add_fd,
+
+ .pollset_set_create = pollset_set_create,
+ .pollset_set_destroy = pollset_set_destroy,
+ .pollset_set_add_pollset = pollset_set_add_pollset,
+ .pollset_set_del_pollset = pollset_set_del_pollset,
+ .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
+ .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
+ .pollset_set_add_fd = pollset_set_add_fd,
+ .pollset_set_del_fd = pollset_set_del_fd,
+
+ .kick_poller = kick_poller,
+
+ .shutdown_engine = shutdown_engine,
+};
+
+const grpc_event_engine_vtable *grpc_init_poll_posix(void) {
+ pollset_global_init();
+ return &vtable;
+}
+
+#endif
diff --git a/src/ruby/ext/grpc/rb_signal.h b/src/core/lib/iomgr/ev_poll_posix.h
index 07e49c0a8b..291736a2db 100644
--- a/src/ruby/ext/grpc/rb_signal.h
+++ b/src/core/lib/iomgr/ev_poll_posix.h
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2016, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -31,9 +31,11 @@
*
*/
-#ifndef GRPC_RB_SIGNAL_H_
-#define GRPC_RB_SIGNAL_H_
+#ifndef GRPC_CORE_LIB_IOMGR_EV_POLL_POSIX_H
+#define GRPC_CORE_LIB_IOMGR_EV_POLL_POSIX_H
-void Init_grpc_signals();
+#include "src/core/lib/iomgr/ev_posix.h"
-#endif /* GRPC_RB_SIGNAL_H_ */
+const grpc_event_engine_vtable *grpc_init_poll_posix(void);
+
+#endif /* GRPC_CORE_LIB_IOMGR_EV_POLL_POSIX_H */
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index 7df1751352..a7dfc9552d 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -37,23 +37,104 @@
#include "src/core/lib/iomgr/ev_posix.h"
+#include <string.h>
+
+#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/useful.h>
#include "src/core/lib/iomgr/ev_poll_and_epoll_posix.h"
+#include "src/core/lib/iomgr/ev_poll_posix.h"
+#include "src/core/lib/support/env.h"
+
+/** Default poll() function - a pointer so that it can be overridden by some
+ * tests */
+grpc_poll_function_type grpc_poll_function = poll;
static const grpc_event_engine_vtable *g_event_engine;
-grpc_poll_function_type grpc_poll_function = poll;
+typedef const grpc_event_engine_vtable *(*event_engine_factory_fn)(void);
+
+typedef struct {
+ const char *name;
+ event_engine_factory_fn factory;
+} event_engine_factory;
+
+static const event_engine_factory g_factories[] = {
+ {"poll", grpc_init_poll_posix}, {"legacy", grpc_init_poll_and_epoll_posix},
+};
+
+static void add(const char *beg, const char *end, char ***ss, size_t *ns) {
+ size_t n = *ns;
+ size_t np = n + 1;
+ char *s;
+ size_t len;
+ GPR_ASSERT(end >= beg);
+ len = (size_t)(end - beg);
+ s = gpr_malloc(len + 1);
+ memcpy(s, beg, len);
+ s[len] = 0;
+ *ss = gpr_realloc(*ss, sizeof(char **) * np);
+ (*ss)[n] = s;
+ *ns = np;
+}
+
+static void split(const char *s, char ***ss, size_t *ns) {
+ const char *c = strchr(s, ',');
+ if (c == NULL) {
+ add(s, s + strlen(s), ss, ns);
+ } else {
+ add(s, c, ss, ns);
+ split(c + 1, ss, ns);
+ }
+}
+
+static bool is(const char *want, const char *have) {
+ return 0 == strcmp(want, "all") || 0 == strcmp(want, have);
+}
+
+static void try_engine(const char *engine) {
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(g_factories); i++) {
+ if (is(engine, g_factories[i].name)) {
+ if ((g_event_engine = g_factories[i].factory())) {
+ gpr_log(GPR_DEBUG, "Using polling engine: %s", g_factories[i].name);
+ return;
+ }
+ }
+ }
+}
void grpc_event_engine_init(void) {
- if ((g_event_engine = grpc_init_poll_and_epoll_posix())) {
- return;
+ char *s = gpr_getenv("GRPC_POLL_STRATEGY");
+ if (s == NULL) {
+ s = gpr_strdup("all");
+ }
+
+ char **strings = NULL;
+ size_t nstrings = 0;
+ split(s, &strings, &nstrings);
+
+ for (size_t i = 0; g_event_engine == NULL && i < nstrings; i++) {
+ try_engine(strings[i]);
+ }
+
+ for (size_t i = 0; i < nstrings; i++) {
+ gpr_free(strings[i]);
+ }
+ gpr_free(strings);
+ gpr_free(s);
+
+ if (g_event_engine == NULL) {
+ gpr_log(GPR_ERROR, "No event engine could be initialized");
+ abort();
}
- gpr_log(GPR_ERROR, "No event engine could be initialized");
- abort();
}
-void grpc_event_engine_shutdown(void) { g_event_engine->shutdown_engine(); }
+void grpc_event_engine_shutdown(void) {
+ g_event_engine->shutdown_engine();
+ g_event_engine = NULL;
+}
grpc_fd *grpc_fd_create(int fd, const char *name) {
return g_event_engine->fd_create(fd, name);
diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c
index 2146c7dd1f..e451479073 100644
--- a/src/core/lib/iomgr/exec_ctx.c
+++ b/src/core/lib/iomgr/exec_ctx.c
@@ -39,6 +39,22 @@
#include "src/core/lib/profiling/timers.h"
+bool grpc_exec_ctx_ready_to_finish(grpc_exec_ctx *exec_ctx) {
+ if (!exec_ctx->cached_ready_to_finish) {
+ exec_ctx->cached_ready_to_finish = exec_ctx->check_ready_to_finish(
+ exec_ctx, exec_ctx->check_ready_to_finish_arg);
+ }
+ return exec_ctx->cached_ready_to_finish;
+}
+
+bool grpc_never_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored) {
+ return false;
+}
+
+bool grpc_always_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored) {
+ return true;
+}
+
#ifndef GRPC_EXECUTION_CONTEXT_SANITIZER
bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
bool did_something = 0;
@@ -61,6 +77,7 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx) {
}
void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) {
+ exec_ctx->cached_ready_to_finish = true;
grpc_exec_ctx_flush(exec_ctx);
}
diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h
index 976cc40347..9d47a262f8 100644
--- a/src/core/lib/iomgr/exec_ctx.h
+++ b/src/core/lib/iomgr/exec_ctx.h
@@ -53,6 +53,9 @@ typedef struct grpc_workqueue grpc_workqueue;
* - track a list of work that needs to be delayed until the top of the
* call stack (this provides a convenient mechanism to run callbacks
* without worrying about locking issues)
+ * - provide a decision maker (via grpc_exec_ctx_ready_to_finish) that provides
+ * signal as to whether a borrowed thread should continue to do work or
+ * should actively try to finish up and get this thread back to its owner
*
* CONVENTIONS:
* Instance of this must ALWAYS be constructed on the stack, never
@@ -63,18 +66,26 @@ typedef struct grpc_workqueue grpc_workqueue;
*/
struct grpc_exec_ctx {
grpc_closure_list closure_list;
+ bool cached_ready_to_finish;
+ void *check_ready_to_finish_arg;
+ bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg);
};
-#define GRPC_EXEC_CTX_INIT \
- { GRPC_CLOSURE_LIST_INIT }
+#define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \
+ { GRPC_CLOSURE_LIST_INIT, false, finish_check_arg, finish_check }
#else
struct grpc_exec_ctx {
- int unused;
+ bool cached_ready_to_finish;
+ void *check_ready_to_finish_arg;
+ bool (*check_ready_to_finish)(grpc_exec_ctx *exec_ctx, void *arg);
};
-#define GRPC_EXEC_CTX_INIT \
- { 0 }
+#define GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(finish_check, finish_check_arg) \
+ { false, finish_check_arg, finish_check }
#endif
+#define GRPC_EXEC_CTX_INIT \
+ GRPC_EXEC_CTX_INIT_WITH_FINISH_CHECK(grpc_never_ready_to_finish, NULL)
+
/** Flush any work that has been enqueued onto this grpc_exec_ctx.
* Caller must guarantee that no interfering locks are held.
* Returns true if work was performed, false otherwise. */
@@ -86,6 +97,14 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx);
void grpc_exec_ctx_enqueue(grpc_exec_ctx *exec_ctx, grpc_closure *closure,
bool success,
grpc_workqueue *offload_target_or_null);
+/** Returns true if we'd like to leave this execution context as soon as
+ possible: useful for deciding whether to do something more or not depending
+ on outside context */
+bool grpc_exec_ctx_ready_to_finish(grpc_exec_ctx *exec_ctx);
+/** A finish check that is never ready to finish */
+bool grpc_never_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored);
+/** A finish check that is always ready to finish */
+bool grpc_always_ready_to_finish(grpc_exec_ctx *exec_ctx, void *arg_ignored);
/** Add a list of closures to be executed at the next flush/finish point.
* Leaves \a list empty. */
void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx,
diff --git a/src/core/lib/iomgr/iomgr_posix.c b/src/core/lib/iomgr/iomgr_posix.c
index 016c501f75..cede97f4c6 100644
--- a/src/core/lib/iomgr/iomgr_posix.c
+++ b/src/core/lib/iomgr/iomgr_posix.c
@@ -41,12 +41,16 @@
#include "src/core/lib/iomgr/tcp_posix.h"
void grpc_iomgr_platform_init(void) {
+ grpc_wakeup_fd_global_init();
grpc_event_engine_init();
grpc_register_tracer("tcp", &grpc_tcp_trace);
}
void grpc_iomgr_platform_flush(void) {}
-void grpc_iomgr_platform_shutdown(void) { grpc_event_engine_shutdown(); }
+void grpc_iomgr_platform_shutdown(void) {
+ grpc_event_engine_shutdown();
+ grpc_wakeup_fd_global_destroy();
+}
#endif /* GRPC_POSIX_SOCKET */
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index df6cf956d9..98ffccd59b 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -81,6 +81,7 @@ typedef struct {
grpc_closure read_closure;
grpc_closure destroyed_closure;
grpc_udp_server_read_cb read_cb;
+ grpc_udp_server_orphan_cb orphan_cb;
} server_port;
/* the overall server */
@@ -168,6 +169,10 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
server_port *sp = &s->ports[i];
sp->destroyed_closure.cb = destroyed_port;
sp->destroyed_closure.cb_arg = s;
+
+ GPR_ASSERT(sp->orphan_cb);
+ sp->orphan_cb(sp->emfd);
+
grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL,
"udp_listener_shutdown");
}
@@ -268,7 +273,8 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
static int add_socket_to_server(grpc_udp_server *s, int fd,
const struct sockaddr *addr, size_t addr_len,
- grpc_udp_server_read_cb read_cb) {
+ grpc_udp_server_read_cb read_cb,
+ grpc_udp_server_orphan_cb orphan_cb) {
server_port *sp;
int port;
char *addr_str;
@@ -292,6 +298,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd,
memcpy(sp->addr.untyped, addr, addr_len);
sp->addr_len = addr_len;
sp->read_cb = read_cb;
+ sp->orphan_cb = orphan_cb;
GPR_ASSERT(sp->emfd);
gpr_mu_unlock(&s->mu);
gpr_free(name);
@@ -301,7 +308,8 @@ static int add_socket_to_server(grpc_udp_server *s, int fd,
}
int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
- size_t addr_len, grpc_udp_server_read_cb read_cb) {
+ size_t addr_len, grpc_udp_server_read_cb read_cb,
+ grpc_udp_server_orphan_cb orphan_cb) {
int allocated_port1 = -1;
int allocated_port2 = -1;
unsigned i;
@@ -348,7 +356,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
addr = (struct sockaddr *)&wild6;
addr_len = sizeof(wild6);
fd = grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode);
- allocated_port1 = add_socket_to_server(s, fd, addr, addr_len, read_cb);
+ allocated_port1 =
+ add_socket_to_server(s, fd, addr, addr_len, read_cb, orphan_cb);
if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
goto done;
}
@@ -370,7 +379,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
addr = (struct sockaddr *)&addr4_copy;
addr_len = sizeof(addr4_copy);
}
- allocated_port2 = add_socket_to_server(s, fd, addr, addr_len, read_cb);
+ allocated_port2 =
+ add_socket_to_server(s, fd, addr, addr_len, read_cb, orphan_cb);
done:
gpr_free(allocated_addr);
diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h
index d8cf957a22..33c5ce11cd 100644
--- a/src/core/lib/iomgr/udp_server.h
+++ b/src/core/lib/iomgr/udp_server.h
@@ -48,6 +48,9 @@ typedef struct grpc_udp_server grpc_udp_server;
typedef void (*grpc_udp_server_read_cb)(grpc_exec_ctx *exec_ctx, grpc_fd *emfd,
struct grpc_server *server);
+/* Called when the grpc_fd is about to be orphaned (and the FD closed). */
+typedef void (*grpc_udp_server_orphan_cb)(grpc_fd *emfd);
+
/* Create a server, initially not bound to any ports */
grpc_udp_server *grpc_udp_server_create(void);
@@ -69,7 +72,8 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index);
/* TODO(ctiller): deprecate this, and make grpc_udp_server_add_ports to handle
all of the multiple socket port matching logic in one place */
int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
- size_t addr_len, grpc_udp_server_read_cb read_cb);
+ size_t addr_len, grpc_udp_server_read_cb read_cb,
+ grpc_udp_server_orphan_cb orphan_cb);
void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *server,
grpc_closure *on_done);
diff --git a/src/core/lib/support/string_util_win32.c b/src/core/lib/support/string_util_win32.c
index f3cb0c050f..0d7bcdb5aa 100644
--- a/src/core/lib/support/string_util_win32.c
+++ b/src/core/lib/support/string_util_win32.c
@@ -83,7 +83,7 @@ char *gpr_format_message(int messageid) {
DWORD status = FormatMessage(
FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM |
FORMAT_MESSAGE_IGNORE_INSERTS,
- NULL, (DWORD)messageid, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
+ NULL, (DWORD)messageid, MAKELANGID(LANG_ENGLISH, SUBLANG_DEFAULT),
(LPTSTR)(&tmessage), 0, NULL);
if (status == 0) return gpr_strdup("Unable to retrieve error string");
message = gpr_tchar_to_char(tmessage);
diff --git a/src/core/lib/surface/byte_buffer_reader.c b/src/core/lib/surface/byte_buffer_reader.c
index 809fd5f1fa..c97079f638 100644
--- a/src/core/lib/surface/byte_buffer_reader.c
+++ b/src/core/lib/surface/byte_buffer_reader.c
@@ -62,12 +62,19 @@ void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
case GRPC_BB_RAW:
gpr_slice_buffer_init(&decompressed_slices_buffer);
if (is_compressed(reader->buffer_in)) {
- grpc_msg_decompress(reader->buffer_in->data.raw.compression,
- &reader->buffer_in->data.raw.slice_buffer,
- &decompressed_slices_buffer);
- reader->buffer_out =
- grpc_raw_byte_buffer_create(decompressed_slices_buffer.slices,
- decompressed_slices_buffer.count);
+ if (grpc_msg_decompress(reader->buffer_in->data.raw.compression,
+ &reader->buffer_in->data.raw.slice_buffer,
+ &decompressed_slices_buffer) == 0) {
+ gpr_log(GPR_ERROR,
+ "Unexpected error decompressing data for algorithm with enum "
+ "value '%d'. Reading data as if it were uncompressed.",
+ reader->buffer_in->data.raw.compression);
+ reader->buffer_out = reader->buffer_in;
+ } else { /* all fine */
+ reader->buffer_out =
+ grpc_raw_byte_buffer_create(decompressed_slices_buffer.slices,
+ decompressed_slices_buffer.count);
+ }
gpr_slice_buffer_destroy(&decompressed_slices_buffer);
} else { /* not compressed, use the input buffer as output */
reader->buffer_out = reader->buffer_in;
diff --git a/src/core/lib/surface/call.c b/src/core/lib/surface/call.c
index 9b2b94eedf..c8728fa278 100644
--- a/src/core/lib/surface/call.c
+++ b/src/core/lib/surface/call.c
@@ -261,6 +261,8 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
call->channel = channel;
call->cq = cq;
call->parent = parent_call;
+ /* Always support no compression */
+ GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
call->is_client = server_transport_data == NULL;
if (call->is_client) {
GPR_ASSERT(add_initial_metadata_count < MAX_SEND_EXTRA_METADATA_COUNT);
@@ -408,6 +410,7 @@ static void set_status_code(grpc_call *call, status_source source,
static void set_compression_algorithm(grpc_call *call,
grpc_compression_algorithm algo) {
+ GPR_ASSERT(algo < GRPC_COMPRESS_ALGORITHMS_COUNT);
call->compression_algorithm = algo;
}
@@ -828,12 +831,16 @@ static uint32_t decode_status(grpc_mdelem *md) {
return status;
}
-static uint32_t decode_compression(grpc_mdelem *md) {
+static grpc_compression_algorithm decode_compression(grpc_mdelem *md) {
grpc_compression_algorithm algorithm =
grpc_compression_algorithm_from_mdstr(md->value);
if (algorithm == GRPC_COMPRESS_ALGORITHMS_COUNT) {
const char *md_c_str = grpc_mdstr_as_c_string(md->value);
- gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str);
+ gpr_log(GPR_ERROR,
+ "Invalid incoming compression algorithm: '%s'. Interpreting "
+ "incoming data as uncompressed.",
+ md_c_str);
+ return GRPC_COMPRESS_NONE;
}
return algorithm;
}
@@ -1087,6 +1094,24 @@ static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
grpc_metadata_batch_filter(md, recv_initial_filter, call);
+ /* make sure the received grpc-encoding is amongst the ones listed in
+ * grpc-accept-encoding */
+
+ GPR_ASSERT(call->encodings_accepted_by_peer != 0);
+ if (!GPR_BITGET(call->encodings_accepted_by_peer,
+ call->compression_algorithm)) {
+ extern int grpc_compression_trace;
+ if (grpc_compression_trace) {
+ char *algo_name;
+ grpc_compression_algorithm_name(call->compression_algorithm,
+ &algo_name);
+ gpr_log(GPR_ERROR,
+ "Compression algorithm (grpc-encoding = '%s') not present in "
+ "the bitset of accepted encodings (grpc-accept-encodings: "
+ "'0x%x')",
+ algo_name, call->encodings_accepted_by_peer);
+ }
+ }
if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
0 &&
!call->is_client) {
@@ -1474,7 +1499,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
grpc_call_error err;
GRPC_API_TRACE(
- "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, reserved=%p)",
+ "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, "
+ "reserved=%p)",
5, (call, ops, (unsigned long)nops, tag, reserved));
if (reserved != NULL) {
diff --git a/src/core/lib/surface/init.c b/src/core/lib/surface/init.c
index 57c6897626..1c8b709015 100644
--- a/src/core/lib/surface/init.c
+++ b/src/core/lib/surface/init.c
@@ -164,7 +164,7 @@ void grpc_init(void) {
grpc_register_tracer("channel_stack_builder",
&grpc_trace_channel_stack_builder);
grpc_register_tracer("http1", &grpc_http1_trace);
- grpc_register_tracer("compression", &grpc_compress_filter_trace);
+ grpc_register_tracer("compression", &grpc_compression_trace);
grpc_security_pre_init();
grpc_iomgr_init();
grpc_executor_init();
diff --git a/src/cpp/common/channel_arguments.cc b/src/cpp/common/channel_arguments.cc
index db3558f192..f297ae8587 100644
--- a/src/cpp/common/channel_arguments.cc
+++ b/src/cpp/common/channel_arguments.cc
@@ -85,7 +85,7 @@ void ChannelArguments::Swap(ChannelArguments& other) {
void ChannelArguments::SetCompressionAlgorithm(
grpc_compression_algorithm algorithm) {
- SetInt(GRPC_COMPRESSION_ALGORITHM_ARG, algorithm);
+ SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, algorithm);
}
// Note: a second call to this will add in front the result of the first call.
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index fafe31e84c..f955a31494 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -33,6 +33,7 @@
#include <grpc++/server.h>
+#include <sstream>
#include <utility>
#include <grpc++/completion_queue.h>
@@ -41,6 +42,7 @@
#include <grpc++/impl/grpc_library.h>
#include <grpc++/impl/method_handler_impl.h>
#include <grpc++/impl/rpc_service_method.h>
+#include <grpc++/impl/server_initializer.h>
#include <grpc++/impl/service_type.h>
#include <grpc++/security/server_credentials.h>
#include <grpc++/server_context.h>
@@ -284,7 +286,8 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
has_generic_service_(false),
server_(nullptr),
thread_pool_(thread_pool),
- thread_pool_owned_(thread_pool_owned) {
+ thread_pool_owned_(thread_pool_owned),
+ server_initializer_(new ServerInitializer(this)) {
g_gli_initializer.summon();
gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks);
global_callbacks_ = g_callbacks;
@@ -341,6 +344,7 @@ bool Server::RegisterService(const grpc::string* host, Service* service) {
"Can only register an asynchronous service against one server.");
service->server_ = this;
}
+ const char* method_name = nullptr;
for (auto it = service->methods_.begin(); it != service->methods_.end();
++it) {
if (it->get() == nullptr) { // Handled by generic service if any.
@@ -360,6 +364,17 @@ bool Server::RegisterService(const grpc::string* host, Service* service) {
} else {
sync_methods_->emplace_back(method, tag);
}
+ method_name = method->name();
+ }
+
+ // Parse service name.
+ if (method_name != nullptr) {
+ std::stringstream ss(method_name);
+ grpc::string service_name;
+ if (std::getline(ss, service_name, '/') &&
+ std::getline(ss, service_name, '/')) {
+ services_.push_back(service_name);
+ }
}
return true;
}
@@ -598,4 +613,6 @@ void Server::RunRpc() {
}
}
+ServerInitializer* Server::initializer() { return server_initializer_.get(); }
+
} // namespace grpc
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 68cc38258c..61f0f6ae2a 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -41,9 +41,23 @@
namespace grpc {
+static std::vector<std::unique_ptr<ServerBuilderPlugin> (*)()>*
+ g_plugin_factory_list;
+static gpr_once once_init_plugin_list = GPR_ONCE_INIT;
+
+static void do_plugin_list_init(void) {
+ g_plugin_factory_list =
+ new std::vector<std::unique_ptr<ServerBuilderPlugin> (*)()>();
+}
+
ServerBuilder::ServerBuilder()
: max_message_size_(-1), generic_service_(nullptr) {
grpc_compression_options_init(&compression_options_);
+ gpr_once_init(&once_init_plugin_list, do_plugin_list_init);
+ for (auto factory : (*g_plugin_factory_list)) {
+ std::unique_ptr<ServerBuilderPlugin> plugin = factory();
+ plugins_[plugin->name()] = std::move(plugin);
+ }
}
std::unique_ptr<ServerCompletionQueue> ServerBuilder::AddCompletionQueue() {
@@ -96,14 +110,24 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
ChannelArguments args;
for (auto option = options_.begin(); option != options_.end(); ++option) {
(*option)->UpdateArguments(&args);
+ (*option)->UpdatePlugins(&plugins_);
+ }
+ if (thread_pool == nullptr) {
+ for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
+ if ((*plugin).second->has_sync_methods()) {
+ thread_pool.reset(CreateDefaultThreadPool());
+ break;
+ }
+ }
}
if (max_message_size_ > 0) {
args.SetInt(GRPC_ARG_MAX_MESSAGE_LENGTH, max_message_size_);
}
- args.SetInt(GRPC_COMPRESSION_ALGORITHM_STATE_ARG,
+ args.SetInt(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET,
compression_options_.enabled_algorithms_bitset);
std::unique_ptr<Server> server(
new Server(thread_pool.release(), true, max_message_size_, &args));
+ ServerInitializer* initializer = server->initializer();
for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
grpc_server_register_completion_queue(server->server_, (*cq)->cq(),
nullptr);
@@ -114,6 +138,9 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
return nullptr;
}
}
+ for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
+ (*plugin).second->InitServer(initializer);
+ }
if (generic_service_) {
server->RegisterAsyncGenericService(generic_service_);
} else {
@@ -137,7 +164,16 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
if (!server->Start(cqs_data, cqs_.size())) {
return nullptr;
}
+ for (auto plugin = plugins_.begin(); plugin != plugins_.end(); plugin++) {
+ (*plugin).second->Finish(initializer);
+ }
return server;
}
+void ServerBuilder::InternalAddPluginFactory(
+ std::unique_ptr<ServerBuilderPlugin> (*CreatePlugin)()) {
+ gpr_once_init(&once_init_plugin_list, do_plugin_list_init);
+ (*g_plugin_factory_list).push_back(CreatePlugin);
+}
+
} // namespace grpc
diff --git a/src/csharp/Grpc.Core.Tests/ChannelTest.cs b/src/csharp/Grpc.Core.Tests/ChannelTest.cs
index 6330f50fae..850d70ce92 100644
--- a/src/csharp/Grpc.Core.Tests/ChannelTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ChannelTest.cs
@@ -32,6 +32,7 @@
#endregion
using System;
+using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Core.Internal;
using Grpc.Core.Utils;
@@ -89,5 +90,43 @@ namespace Grpc.Core.Tests
channel.ShutdownAsync().Wait();
Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await channel.ShutdownAsync());
}
+
+ [Test]
+ public async Task ShutdownTokenCancelledAfterShutdown()
+ {
+ var channel = new Channel("localhost", ChannelCredentials.Insecure);
+ Assert.IsFalse(channel.ShutdownToken.IsCancellationRequested);
+ var shutdownTask = channel.ShutdownAsync();
+ Assert.IsTrue(channel.ShutdownToken.IsCancellationRequested);
+ await shutdownTask;
+ }
+
+ [Test]
+ public async Task StateIsFatalFailureAfterShutdown()
+ {
+ var channel = new Channel("localhost", ChannelCredentials.Insecure);
+ await channel.ShutdownAsync();
+ Assert.AreEqual(ChannelState.FatalFailure, channel.State);
+ }
+
+ [Test]
+ public async Task ShutdownFinishesWaitForStateChangedAsync()
+ {
+ var channel = new Channel("localhost", ChannelCredentials.Insecure);
+ var stateChangedTask = channel.WaitForStateChangedAsync(ChannelState.Idle);
+ var shutdownTask = channel.ShutdownAsync();
+ await stateChangedTask;
+ await shutdownTask;
+ }
+
+ [Test]
+ public async Task OperationsThrowAfterShutdown()
+ {
+ var channel = new Channel("localhost", ChannelCredentials.Insecure);
+ await channel.ShutdownAsync();
+ Assert.ThrowsAsync(typeof(ObjectDisposedException), async () => await channel.WaitForStateChangedAsync(ChannelState.Idle));
+ Assert.Throws(typeof(ObjectDisposedException), () => { var x = channel.ResolvedTarget; });
+ Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await channel.ConnectAsync());
+ }
}
}
diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
index abe9d4a2e6..777a1c8c50 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
@@ -181,13 +181,14 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
- public void ClientStreaming_WriteFailure()
+ public void ClientStreaming_WriteCompletionFailure()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
var writeTask = requestStream.WriteAsync("request1");
fakeCall.SendCompletionHandler(false);
+ // TODO: maybe IOException or waiting for RPCException is more appropriate here.
Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await writeTask);
fakeCall.UnaryResponseClientHandler(true,
@@ -199,7 +200,7 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
- public void ClientStreaming_WriteAfterReceivingStatusFails()
+ public void ClientStreaming_WriteAfterReceivingStatusThrowsRpcException()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
@@ -210,7 +211,44 @@ namespace Grpc.Core.Internal.Tests
new Metadata());
AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
+ var ex = Assert.Throws<RpcException>(() => requestStream.WriteAsync("request1"));
+ Assert.AreEqual(Status.DefaultSuccess, ex.Status);
+ }
+
+ [Test]
+ public void ClientStreaming_WriteAfterReceivingStatusThrowsRpcException2()
+ {
+ var resultTask = asyncCall.ClientStreamingCallAsync();
+ var requestStream = new ClientRequestStream<string, string>(asyncCall);
+
+ fakeCall.UnaryResponseClientHandler(true,
+ new ClientSideStatus(new Status(StatusCode.OutOfRange, ""), new Metadata()),
+ CreateResponsePayload(),
+ new Metadata());
+
+ AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.OutOfRange);
+ var ex = Assert.Throws<RpcException>(() => requestStream.WriteAsync("request1"));
+ Assert.AreEqual(StatusCode.OutOfRange, ex.Status.StatusCode);
+ }
+
+ [Test]
+ public void ClientStreaming_WriteAfterCompleteThrowsInvalidOperationException()
+ {
+ var resultTask = asyncCall.ClientStreamingCallAsync();
+ var requestStream = new ClientRequestStream<string, string>(asyncCall);
+
+ requestStream.CompleteAsync();
+
Assert.Throws(typeof(InvalidOperationException), () => requestStream.WriteAsync("request1"));
+
+ fakeCall.SendCompletionHandler(true);
+
+ fakeCall.UnaryResponseClientHandler(true,
+ new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
+ CreateResponsePayload(),
+ new Metadata());
+
+ AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
}
[Test]
@@ -229,7 +267,7 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
- public void ClientStreaming_WriteAfterCancellationRequestFails()
+ public void ClientStreaming_WriteAfterCancellationRequestThrowsOperationCancelledException()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
@@ -340,7 +378,7 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
- public void DuplexStreaming_WriteAfterReceivingStatusFails()
+ public void DuplexStreaming_WriteAfterReceivingStatusThrowsRpcException()
{
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
@@ -352,7 +390,8 @@ namespace Grpc.Core.Internal.Tests
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
- Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await requestStream.WriteAsync("request1"));
+ var ex = Assert.ThrowsAsync<RpcException>(async () => await requestStream.WriteAsync("request1"));
+ Assert.AreEqual(Status.DefaultSuccess, ex.Status);
}
[Test]
@@ -372,7 +411,7 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
- public void DuplexStreaming_WriteAfterCancellationRequestFails()
+ public void DuplexStreaming_WriteAfterCancellationRequestThrowsOperationCancelledException()
{
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
diff --git a/src/csharp/Grpc.Core/Channel.cs b/src/csharp/Grpc.Core/Channel.cs
index 89981b1849..93a6e6a3d9 100644
--- a/src/csharp/Grpc.Core/Channel.cs
+++ b/src/csharp/Grpc.Core/Channel.cs
@@ -32,6 +32,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
+using System.Threading;
using System.Threading.Tasks;
using Grpc.Core.Internal;
@@ -51,6 +52,7 @@ namespace Grpc.Core
readonly object myLock = new object();
readonly AtomicCounter activeCallCounter = new AtomicCounter();
+ readonly CancellationTokenSource shutdownTokenSource = new CancellationTokenSource();
readonly string target;
readonly GrpcEnvironment environment;
@@ -101,12 +103,13 @@ namespace Grpc.Core
/// <summary>
/// Gets current connectivity state of this channel.
+ /// After channel is has been shutdown, <c>ChannelState.FatalFailure</c> will be returned.
/// </summary>
public ChannelState State
{
get
{
- return handle.CheckConnectivityState(false);
+ return GetConnectivityState(false);
}
}
@@ -155,6 +158,17 @@ namespace Grpc.Core
}
/// <summary>
+ /// Returns a token that gets cancelled once <c>ShutdownAsync</c> is invoked.
+ /// </summary>
+ public CancellationToken ShutdownToken
+ {
+ get
+ {
+ return this.shutdownTokenSource.Token;
+ }
+ }
+
+ /// <summary>
/// Allows explicitly requesting channel to connect without starting an RPC.
/// Returned task completes once state Ready was seen. If the deadline is reached,
/// or channel enters the FatalFailure state, the task is cancelled.
@@ -164,7 +178,7 @@ namespace Grpc.Core
/// <param name="deadline">The deadline. <c>null</c> indicates no deadline.</param>
public async Task ConnectAsync(DateTime? deadline = null)
{
- var currentState = handle.CheckConnectivityState(true);
+ var currentState = GetConnectivityState(true);
while (currentState != ChannelState.Ready)
{
if (currentState == ChannelState.FatalFailure)
@@ -172,7 +186,7 @@ namespace Grpc.Core
throw new OperationCanceledException("Channel has reached FatalFailure state.");
}
await WaitForStateChangedAsync(currentState, deadline).ConfigureAwait(false);
- currentState = handle.CheckConnectivityState(false);
+ currentState = GetConnectivityState(false);
}
}
@@ -188,6 +202,8 @@ namespace Grpc.Core
shutdownRequested = true;
}
+ shutdownTokenSource.Cancel();
+
var activeCallCount = activeCallCounter.Count;
if (activeCallCount > 0)
{
@@ -231,6 +247,18 @@ namespace Grpc.Core
activeCallCounter.Decrement();
}
+ private ChannelState GetConnectivityState(bool tryToConnect)
+ {
+ try
+ {
+ return handle.CheckConnectivityState(tryToConnect);
+ }
+ catch (ObjectDisposedException)
+ {
+ return ChannelState.FatalFailure;
+ }
+ }
+
private static void EnsureUserAgentChannelOption(Dictionary<string, ChannelOption> options)
{
var key = ChannelOptions.PrimaryUserAgentString;
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index f522174bd0..55351869b5 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -57,7 +57,7 @@ namespace Grpc.Core.Internal
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
- // Indicates that steaming call has finished.
+ // Indicates that response streaming call has finished.
TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompletionSource<object>();
// Response headers set here once received.
@@ -443,6 +443,19 @@ namespace Grpc.Core.Internal
}
}
+ protected override void CheckSendingAllowed(bool allowFinished)
+ {
+ base.CheckSendingAllowed(true);
+
+ // throwing RpcException if we already received status on client
+ // side makes the most sense.
+ // Note that this throws even for StatusCode.OK.
+ if (!allowFinished && finishedStatus.HasValue)
+ {
+ throw new RpcException(finishedStatus.Value.Status);
+ }
+ }
+
/// <summary>
/// Handles receive status completion for calls with streaming response.
/// </summary>
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index 42234dcac2..4de23706b2 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -213,7 +213,7 @@ namespace Grpc.Core.Internal
{
}
- protected void CheckSendingAllowed(bool allowFinished)
+ protected virtual void CheckSendingAllowed(bool allowFinished)
{
GrpcPreconditions.CheckState(started);
CheckNotCancelled();
diff --git a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
index 875202b950..ee11105efe 100644
--- a/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
+++ b/src/csharp/Grpc.Examples.Tests/MathClientServerTests.cs
@@ -92,7 +92,7 @@ namespace Math.Tests
public void DivByZero()
{
var ex = Assert.Throws<RpcException>(() => client.Div(new DivArgs { Dividend = 0, Divisor = 0 }));
- Assert.AreEqual(StatusCode.Unknown, ex.Status.StatusCode);
+ Assert.AreEqual(StatusCode.InvalidArgument, ex.Status.StatusCode);
}
[Test]
diff --git a/src/csharp/Grpc.Examples/MathExamples.cs b/src/csharp/Grpc.Examples/MathExamples.cs
index 6075420974..d260830b94 100644
--- a/src/csharp/Grpc.Examples/MathExamples.cs
+++ b/src/csharp/Grpc.Examples/MathExamples.cs
@@ -32,6 +32,7 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
+using Grpc.Core;
using Grpc.Core.Utils;
namespace Math
@@ -109,5 +110,42 @@ namespace Math
DivReply result = await client.DivAsync(new DivArgs { Dividend = sum.Num_, Divisor = numbers.Count });
Console.WriteLine("Avg Result: " + result);
}
+
+ /// <summary>
+ /// Shows how to handle a call ending with non-OK status.
+ /// </summary>
+ public static async Task HandleErrorExample(Math.MathClient client)
+ {
+ try
+ {
+ DivReply result = await client.DivAsync(new DivArgs { Dividend = 5, Divisor = 0 });
+ }
+ catch (RpcException ex)
+ {
+ Console.WriteLine(string.Format("RPC ended with status {0}", ex.Status));
+ }
+ }
+
+ /// <summary>
+ /// Shows how to send request headers and how to access response headers
+ /// and response trailers.
+ /// </summary>
+ public static async Task MetadataExample(Math.MathClient client)
+ {
+ var requestHeaders = new Metadata
+ {
+ { "custom-header", "custom-value" }
+ };
+
+ var call = client.DivAsync(new DivArgs { Dividend = 5, Divisor = 0 }, requestHeaders);
+
+ // Get response headers
+ Metadata responseHeaders = await call.ResponseHeadersAsync;
+
+ var result = await call;
+
+ // Get response trailers after the call has finished.
+ Metadata responseTrailers = call.GetTrailers();
+ }
}
}
diff --git a/src/csharp/Grpc.Examples/MathServiceImpl.cs b/src/csharp/Grpc.Examples/MathServiceImpl.cs
index 79c56e57a8..a28020f62f 100644
--- a/src/csharp/Grpc.Examples/MathServiceImpl.cs
+++ b/src/csharp/Grpc.Examples/MathServiceImpl.cs
@@ -52,23 +52,15 @@ namespace Math
public override async Task Fib(FibArgs request, IServerStreamWriter<Num> responseStream, ServerCallContext context)
{
- if (request.Limit <= 0)
- {
- // keep streaming the sequence until cancelled.
- IEnumerator<Num> fibEnumerator = FibInternal(long.MaxValue).GetEnumerator();
- while (!context.CancellationToken.IsCancellationRequested && fibEnumerator.MoveNext())
- {
- await responseStream.WriteAsync(fibEnumerator.Current);
- await Task.Delay(100);
- }
- }
+ var limit = request.Limit > 0 ? request.Limit : long.MaxValue;
+ var fibEnumerator = FibInternal(limit).GetEnumerator();
- if (request.Limit > 0)
+ // Keep streaming the sequence until the call is cancelled.
+ // Use CancellationToken from ServerCallContext to detect the cancellation.
+ while (!context.CancellationToken.IsCancellationRequested && fibEnumerator.MoveNext())
{
- foreach (var num in FibInternal(request.Limit))
- {
- await responseStream.WriteAsync(num);
- }
+ await responseStream.WriteAsync(fibEnumerator.Current);
+ await Task.Delay(100);
}
}
@@ -89,6 +81,13 @@ namespace Math
static DivReply DivInternal(DivArgs args)
{
+ if (args.Divisor == 0)
+ {
+ // One can finish the RPC with non-ok status by throwing RpcException instance.
+ // Alternatively, resulting status can be set using ServerCallContext.Status
+ throw new RpcException(new Status(StatusCode.InvalidArgument, "Division by zero"));
+ }
+
long quotient = args.Dividend / args.Divisor;
long remainder = args.Dividend % args.Divisor;
return new DivReply { Quotient = quotient, Remainder = remainder };
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
index b3b1abf1bc..cff8508631 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
@@ -492,6 +492,10 @@ namespace Grpc.IntegrationTesting
{
// Deadline was reached before write has started. Eat the exception and continue.
}
+ catch (RpcException)
+ {
+ // Deadline was reached before write has started. Eat the exception and continue.
+ }
var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.MoveNext());
// We can't guarantee the status code always DeadlineExceeded. See issue #2685.
diff --git a/src/csharp/Grpc.IntegrationTesting/StressTestClient.cs b/src/csharp/Grpc.IntegrationTesting/StressTestClient.cs
index 8db691cb04..4d6ca7ece5 100644
--- a/src/csharp/Grpc.IntegrationTesting/StressTestClient.cs
+++ b/src/csharp/Grpc.IntegrationTesting/StressTestClient.cs
@@ -311,7 +311,7 @@ namespace Grpc.IntegrationTesting
var snapshot = histogram.GetSnapshot(true);
var elapsedSnapshot = wallClockStopwatch.GetElapsedSnapshot(true);
- return (long) (snapshot.Count / elapsedSnapshot.Seconds);
+ return (long) (snapshot.Count / elapsedSnapshot.TotalSeconds);
}
}
}
diff --git a/src/csharp/build_packages.bat b/src/csharp/build_packages.bat
index 7520b0f81a..28e4262121 100644
--- a/src/csharp/build_packages.bat
+++ b/src/csharp/build_packages.bat
@@ -1,3 +1,32 @@
+@rem Copyright 2016, Google Inc.
+@rem All rights reserved.
+@rem
+@rem Redistribution and use in source and binary forms, with or without
+@rem modification, are permitted provided that the following conditions are
+@rem met:
+@rem
+@rem * Redistributions of source code must retain the above copyright
+@rem notice, this list of conditions and the following disclaimer.
+@rem * Redistributions in binary form must reproduce the above
+@rem copyright notice, this list of conditions and the following disclaimer
+@rem in the documentation and/or other materials provided with the
+@rem distribution.
+@rem * Neither the name of Google Inc. nor the names of its
+@rem contributors may be used to endorse or promote products derived from
+@rem this software without specific prior written permission.
+@rem
+@rem THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+@rem "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+@rem LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+@rem A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+@rem OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+@rem SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+@rem LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+@rem DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+@rem THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+@rem (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+@rem OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
@rem Builds gRPC NuGet packages
@rem Current package versions
diff --git a/src/csharp/buildall.bat b/src/csharp/buildall.bat
index f800756dfe..0beb30c198 100644
--- a/src/csharp/buildall.bat
+++ b/src/csharp/buildall.bat
@@ -1,3 +1,32 @@
+@rem Copyright 2016, Google Inc.
+@rem All rights reserved.
+@rem
+@rem Redistribution and use in source and binary forms, with or without
+@rem modification, are permitted provided that the following conditions are
+@rem met:
+@rem
+@rem * Redistributions of source code must retain the above copyright
+@rem notice, this list of conditions and the following disclaimer.
+@rem * Redistributions in binary form must reproduce the above
+@rem copyright notice, this list of conditions and the following disclaimer
+@rem in the documentation and/or other materials provided with the
+@rem distribution.
+@rem * Neither the name of Google Inc. nor the names of its
+@rem contributors may be used to endorse or promote products derived from
+@rem this software without specific prior written permission.
+@rem
+@rem THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+@rem "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+@rem LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+@rem A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+@rem OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+@rem SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+@rem LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+@rem DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+@rem THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+@rem (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+@rem OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
@rem Convenience script to build gRPC C# from command line
setlocal
diff --git a/src/node/tools/bin/protoc.js b/src/node/tools/bin/protoc.js
index 4d50c94b0f..53fc5dc428 100755
--- a/src/node/tools/bin/protoc.js
+++ b/src/node/tools/bin/protoc.js
@@ -47,10 +47,11 @@ var exe_ext = process.platform === 'win32' ? '.exe' : '';
var protoc = path.resolve(__dirname, 'protoc' + exe_ext);
-execFile(protoc, process.argv.slice(2), function(error, stdout, stderr) {
+var child_process = execFile(protoc, process.argv.slice(2), function(error, stdout, stderr) {
if (error) {
throw error;
}
- console.log(stdout);
- console.log(stderr);
});
+
+child_process.stdout.pipe(process.stdout);
+child_process.stderr.pipe(process.stderr);
diff --git a/src/node/tools/bin/protoc_plugin.js b/src/node/tools/bin/protoc_plugin.js
index 281ec0d85e..857882e1c3 100755
--- a/src/node/tools/bin/protoc_plugin.js
+++ b/src/node/tools/bin/protoc_plugin.js
@@ -47,10 +47,12 @@ var exe_ext = process.platform === 'win32' ? '.exe' : '';
var plugin = path.resolve(__dirname, 'grpc_node_plugin' + exe_ext);
-execFile(plugin, process.argv.slice(2), function(error, stdout, stderr) {
+var child_process = execFile(plugin, process.argv.slice(2), {encoding: 'buffer'}, function(error, stdout, stderr) {
if (error) {
throw error;
}
- console.log(stdout);
- console.log(stderr);
});
+
+process.stdin.pipe(child_process.stdin);
+child_process.stdout.pipe(process.stdout);
+child_process.stderr.pipe(process.stderr);
diff --git a/src/php/ext/grpc/call.c b/src/php/ext/grpc/call.c
index a0f3d160c6..884130e7d4 100644
--- a/src/php/ext/grpc/call.c
+++ b/src/php/ext/grpc/call.c
@@ -89,19 +89,20 @@ zend_object_value create_wrapped_grpc_call(zend_class_entry *class_type
/* Wraps a grpc_call struct in a PHP object. Owned indicates whether the struct
should be destroyed at the end of the object's lifecycle */
-zval *grpc_php_wrap_call(grpc_call *wrapped, bool owned) {
+zval *grpc_php_wrap_call(grpc_call *wrapped, bool owned TSRMLS_DC) {
zval *call_object;
MAKE_STD_ZVAL(call_object);
object_init_ex(call_object, grpc_ce_call);
wrapped_grpc_call *call =
(wrapped_grpc_call *)zend_object_store_get_object(call_object TSRMLS_CC);
call->wrapped = wrapped;
+ call->owned = owned;
return call_object;
}
/* Creates and returns a PHP array object with the data in a
* grpc_metadata_array. Returns NULL on failure */
-zval *grpc_parse_metadata_array(grpc_metadata_array *metadata_array) {
+zval *grpc_parse_metadata_array(grpc_metadata_array *metadata_array TSRMLS_DC) {
int count = metadata_array->count;
grpc_metadata *elements = metadata_array->metadata;
int i;
@@ -126,7 +127,7 @@ zval *grpc_parse_metadata_array(grpc_metadata_array *metadata_array) {
if (zend_hash_find(array_hash, str_key, key_len, (void **)data) ==
SUCCESS) {
if (Z_TYPE_P(*data) != IS_ARRAY) {
- zend_throw_exception(zend_exception_get_default(),
+ zend_throw_exception(zend_exception_get_default(TSRMLS_C),
"Metadata hash somehow contains wrong types.",
1 TSRMLS_CC);
efree(str_key);
@@ -453,7 +454,7 @@ PHP_METHOD(Call, startBatch) {
add_property_bool(result, "send_status", true);
break;
case GRPC_OP_RECV_INITIAL_METADATA:
- array = grpc_parse_metadata_array(&recv_metadata);
+ array = grpc_parse_metadata_array(&recv_metadata TSRMLS_CC);
add_property_zval(result, "metadata", array);
Z_DELREF_P(array);
break;
@@ -469,7 +470,7 @@ PHP_METHOD(Call, startBatch) {
case GRPC_OP_RECV_STATUS_ON_CLIENT:
MAKE_STD_ZVAL(recv_status);
object_init(recv_status);
- array = grpc_parse_metadata_array(&recv_trailing_metadata);
+ array = grpc_parse_metadata_array(&recv_trailing_metadata TSRMLS_CC);
add_property_zval(recv_status, "metadata", array);
Z_DELREF_P(array);
add_property_long(recv_status, "code", status);
diff --git a/src/php/ext/grpc/call.h b/src/php/ext/grpc/call.h
index 73efadae35..36c5f2d272 100644
--- a/src/php/ext/grpc/call.h
+++ b/src/php/ext/grpc/call.h
@@ -60,11 +60,11 @@ typedef struct wrapped_grpc_call {
void grpc_init_call(TSRMLS_D);
/* Creates a Call object that wraps the given grpc_call struct */
-zval *grpc_php_wrap_call(grpc_call *wrapped, bool owned);
+zval *grpc_php_wrap_call(grpc_call *wrapped, bool owned TSRMLS_DC);
/* Creates and returns a PHP associative array of metadata from a C array of
* call metadata */
-zval *grpc_parse_metadata_array(grpc_metadata_array *metadata_array);
+zval *grpc_parse_metadata_array(grpc_metadata_array *metadata_array TSRMLS_DC);
/* Populates a grpc_metadata_array with the data in a PHP array object.
Returns true on success and false on failure */
diff --git a/src/php/ext/grpc/call_credentials.c b/src/php/ext/grpc/call_credentials.c
index 285c4e7c85..ec0e6b9181 100644
--- a/src/php/ext/grpc/call_credentials.c
+++ b/src/php/ext/grpc/call_credentials.c
@@ -83,7 +83,7 @@ zend_object_value create_wrapped_grpc_call_credentials(
return retval;
}
-zval *grpc_php_wrap_call_credentials(grpc_call_credentials *wrapped) {
+zval *grpc_php_wrap_call_credentials(grpc_call_credentials *wrapped TSRMLS_DC) {
zval *credentials_object;
MAKE_STD_ZVAL(credentials_object);
object_init_ex(credentials_object, grpc_ce_call_credentials);
@@ -122,7 +122,7 @@ PHP_METHOD(CallCredentials, createComposite) {
grpc_call_credentials *creds =
grpc_composite_call_credentials_create(cred1->wrapped, cred2->wrapped,
NULL);
- zval *creds_object = grpc_php_wrap_call_credentials(creds);
+ zval *creds_object = grpc_php_wrap_call_credentials(creds TSRMLS_CC);
RETURN_DESTROY_ZVAL(creds_object);
}
@@ -141,7 +141,7 @@ PHP_METHOD(CallCredentials, createFromPlugin) {
memset(fci_cache, 0, sizeof(zend_fcall_info_cache));
/* "f" == 1 function */
- if (zend_parse_parameters(ZEND_NUM_ARGS(), "f", fci,
+ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "f", fci,
fci_cache,
fci->params,
fci->param_count) == FAILURE) {
@@ -167,7 +167,7 @@ PHP_METHOD(CallCredentials, createFromPlugin) {
grpc_call_credentials *creds = grpc_metadata_credentials_create_from_plugin(
plugin, NULL);
- zval *creds_object = grpc_php_wrap_call_credentials(creds);
+ zval *creds_object = grpc_php_wrap_call_credentials(creds TSRMLS_CC);
RETURN_DESTROY_ZVAL(creds_object);
}
@@ -175,6 +175,8 @@ PHP_METHOD(CallCredentials, createFromPlugin) {
void plugin_get_metadata(void *ptr, grpc_auth_metadata_context context,
grpc_credentials_plugin_metadata_cb cb,
void *user_data) {
+ TSRMLS_FETCH();
+
plugin_state *state = (plugin_state *)ptr;
/* prepare to call the user callback function with info from the
@@ -192,7 +194,7 @@ void plugin_get_metadata(void *ptr, grpc_auth_metadata_context context,
state->fci->retval_ptr_ptr = &retval;
/* call the user callback function */
- zend_call_function(state->fci, state->fci_cache);
+ zend_call_function(state->fci, state->fci_cache TSRMLS_CC);
if (Z_TYPE_P(retval) != IS_ARRAY) {
zend_throw_exception(spl_ce_InvalidArgumentException,
diff --git a/src/php/ext/grpc/channel.c b/src/php/ext/grpc/channel.c
index eba2c81424..9f0431908f 100644
--- a/src/php/ext/grpc/channel.c
+++ b/src/php/ext/grpc/channel.c
@@ -84,7 +84,7 @@ zend_object_value create_wrapped_grpc_channel(zend_class_entry *class_type
return retval;
}
-void php_grpc_read_args_array(zval *args_array, grpc_channel_args *args) {
+void php_grpc_read_args_array(zval *args_array, grpc_channel_args *args TSRMLS_DC) {
HashTable *array_hash;
HashPosition array_pointer;
int args_index;
@@ -168,7 +168,7 @@ PHP_METHOD(Channel, __construct) {
zend_hash_del(array_hash, "credentials", 12);
}
}
- php_grpc_read_args_array(args_array, &args);
+ php_grpc_read_args_array(args_array, &args TSRMLS_CC);
if (creds == NULL) {
channel->wrapped = grpc_insecure_channel_create(target, &args, NULL);
} else {
diff --git a/src/php/ext/grpc/channel.h b/src/php/ext/grpc/channel.h
index 78a16ed0c9..cc5823ee7f 100755
--- a/src/php/ext/grpc/channel.h
+++ b/src/php/ext/grpc/channel.h
@@ -59,6 +59,6 @@ typedef struct wrapped_grpc_channel {
void grpc_init_channel(TSRMLS_D);
/* Iterates through a PHP array and populates args with the contents */
-void php_grpc_read_args_array(zval *args_array, grpc_channel_args *args);
+void php_grpc_read_args_array(zval *args_array, grpc_channel_args *args TSRMLS_DC);
#endif /* NET_GRPC_PHP_GRPC_CHANNEL_H_ */
diff --git a/src/php/ext/grpc/channel_credentials.c b/src/php/ext/grpc/channel_credentials.c
index ae9a9897fc..5c537378a6 100644
--- a/src/php/ext/grpc/channel_credentials.c
+++ b/src/php/ext/grpc/channel_credentials.c
@@ -82,7 +82,7 @@ zend_object_value create_wrapped_grpc_channel_credentials(
return retval;
}
-zval *grpc_php_wrap_channel_credentials(grpc_channel_credentials *wrapped) {
+zval *grpc_php_wrap_channel_credentials(grpc_channel_credentials *wrapped TSRMLS_DC) {
zval *credentials_object;
MAKE_STD_ZVAL(credentials_object);
object_init_ex(credentials_object, grpc_ce_channel_credentials);
@@ -99,7 +99,7 @@ zval *grpc_php_wrap_channel_credentials(grpc_channel_credentials *wrapped) {
*/
PHP_METHOD(ChannelCredentials, createDefault) {
grpc_channel_credentials *creds = grpc_google_default_credentials_create();
- zval *creds_object = grpc_php_wrap_channel_credentials(creds);
+ zval *creds_object = grpc_php_wrap_channel_credentials(creds TSRMLS_CC);
RETURN_DESTROY_ZVAL(creds_object);
}
@@ -134,7 +134,7 @@ PHP_METHOD(ChannelCredentials, createSsl) {
grpc_channel_credentials *creds = grpc_ssl_credentials_create(
pem_root_certs,
pem_key_cert_pair.private_key == NULL ? NULL : &pem_key_cert_pair, NULL);
- zval *creds_object = grpc_php_wrap_channel_credentials(creds);
+ zval *creds_object = grpc_php_wrap_channel_credentials(creds TSRMLS_CC);
RETURN_DESTROY_ZVAL(creds_object);
}
@@ -165,7 +165,7 @@ PHP_METHOD(ChannelCredentials, createComposite) {
grpc_channel_credentials *creds =
grpc_composite_channel_credentials_create(cred1->wrapped, cred2->wrapped,
NULL);
- zval *creds_object = grpc_php_wrap_channel_credentials(creds);
+ zval *creds_object = grpc_php_wrap_channel_credentials(creds TSRMLS_CC);
RETURN_DESTROY_ZVAL(creds_object);
}
diff --git a/src/php/ext/grpc/server.c b/src/php/ext/grpc/server.c
index ca129e76ca..6df2e4f978 100644
--- a/src/php/ext/grpc/server.c
+++ b/src/php/ext/grpc/server.c
@@ -111,7 +111,7 @@ PHP_METHOD(Server, __construct) {
if (args_array == NULL) {
server->wrapped = grpc_server_create(NULL, NULL);
} else {
- php_grpc_read_args_array(args_array, &args);
+ php_grpc_read_args_array(args_array, &args TSRMLS_CC);
server->wrapped = grpc_server_create(&args, NULL);
efree(args.args);
}
@@ -154,12 +154,12 @@ PHP_METHOD(Server, requestCall) {
1 TSRMLS_CC);
goto cleanup;
}
- add_property_zval(result, "call", grpc_php_wrap_call(call, true));
+ add_property_zval(result, "call", grpc_php_wrap_call(call, true TSRMLS_CC));
add_property_string(result, "method", details.method, true);
add_property_string(result, "host", details.host, true);
add_property_zval(result, "absolute_deadline",
- grpc_php_wrap_timeval(details.deadline));
- add_property_zval(result, "metadata", grpc_parse_metadata_array(&metadata));
+ grpc_php_wrap_timeval(details.deadline TSRMLS_CC));
+ add_property_zval(result, "metadata", grpc_parse_metadata_array(&metadata TSRMLS_CC));
cleanup:
grpc_call_details_destroy(&details);
grpc_metadata_array_destroy(&metadata);
diff --git a/src/php/ext/grpc/server_credentials.c b/src/php/ext/grpc/server_credentials.c
index f3951b31fe..505da10a28 100644
--- a/src/php/ext/grpc/server_credentials.c
+++ b/src/php/ext/grpc/server_credentials.c
@@ -81,7 +81,7 @@ zend_object_value create_wrapped_grpc_server_credentials(
return retval;
}
-zval *grpc_php_wrap_server_credentials(grpc_server_credentials *wrapped) {
+zval *grpc_php_wrap_server_credentials(grpc_server_credentials *wrapped TSRMLS_DC) {
zval *server_credentials_object;
MAKE_STD_ZVAL(server_credentials_object);
object_init_ex(server_credentials_object, grpc_ce_server_credentials);
@@ -120,7 +120,7 @@ PHP_METHOD(ServerCredentials, createSsl) {
grpc_server_credentials *creds = grpc_ssl_server_credentials_create_ex(
pem_root_certs, &pem_key_cert_pair, 1,
GRPC_SSL_DONT_REQUEST_CLIENT_CERTIFICATE, NULL);
- zval *creds_object = grpc_php_wrap_server_credentials(creds);
+ zval *creds_object = grpc_php_wrap_server_credentials(creds TSRMLS_CC);
RETURN_DESTROY_ZVAL(creds_object);
}
diff --git a/src/php/ext/grpc/timeval.c b/src/php/ext/grpc/timeval.c
index 4fd069e19a..5e242162a8 100644
--- a/src/php/ext/grpc/timeval.c
+++ b/src/php/ext/grpc/timeval.c
@@ -72,7 +72,7 @@ zend_object_value create_wrapped_grpc_timeval(zend_class_entry *class_type
return retval;
}
-zval *grpc_php_wrap_timeval(gpr_timespec wrapped) {
+zval *grpc_php_wrap_timeval(gpr_timespec wrapped TSRMLS_DC) {
zval *timeval_object;
MAKE_STD_ZVAL(timeval_object);
object_init_ex(timeval_object, grpc_ce_timeval);
@@ -122,7 +122,7 @@ PHP_METHOD(Timeval, add) {
wrapped_grpc_timeval *other =
(wrapped_grpc_timeval *)zend_object_store_get_object(other_obj TSRMLS_CC);
zval *sum =
- grpc_php_wrap_timeval(gpr_time_add(self->wrapped, other->wrapped));
+ grpc_php_wrap_timeval(gpr_time_add(self->wrapped, other->wrapped) TSRMLS_CC);
RETURN_DESTROY_ZVAL(sum);
}
@@ -146,7 +146,7 @@ PHP_METHOD(Timeval, subtract) {
wrapped_grpc_timeval *other =
(wrapped_grpc_timeval *)zend_object_store_get_object(other_obj TSRMLS_CC);
zval *diff =
- grpc_php_wrap_timeval(gpr_time_sub(self->wrapped, other->wrapped));
+ grpc_php_wrap_timeval(gpr_time_sub(self->wrapped, other->wrapped) TSRMLS_CC);
RETURN_DESTROY_ZVAL(diff);
}
@@ -208,7 +208,7 @@ PHP_METHOD(Timeval, similar) {
* @return Timeval The current time
*/
PHP_METHOD(Timeval, now) {
- zval *now = grpc_php_wrap_timeval(gpr_now(GPR_CLOCK_REALTIME));
+ zval *now = grpc_php_wrap_timeval(gpr_now(GPR_CLOCK_REALTIME) TSRMLS_CC);
RETURN_DESTROY_ZVAL(now);
}
@@ -218,7 +218,7 @@ PHP_METHOD(Timeval, now) {
*/
PHP_METHOD(Timeval, zero) {
zval *grpc_php_timeval_zero =
- grpc_php_wrap_timeval(gpr_time_0(GPR_CLOCK_REALTIME));
+ grpc_php_wrap_timeval(gpr_time_0(GPR_CLOCK_REALTIME) TSRMLS_CC);
RETURN_ZVAL(grpc_php_timeval_zero,
false, /* Copy original before returning? */
true /* Destroy original before returning */);
@@ -230,7 +230,7 @@ PHP_METHOD(Timeval, zero) {
*/
PHP_METHOD(Timeval, infFuture) {
zval *grpc_php_timeval_inf_future =
- grpc_php_wrap_timeval(gpr_inf_future(GPR_CLOCK_REALTIME));
+ grpc_php_wrap_timeval(gpr_inf_future(GPR_CLOCK_REALTIME) TSRMLS_CC);
RETURN_DESTROY_ZVAL(grpc_php_timeval_inf_future);
}
@@ -240,7 +240,7 @@ PHP_METHOD(Timeval, infFuture) {
*/
PHP_METHOD(Timeval, infPast) {
zval *grpc_php_timeval_inf_past =
- grpc_php_wrap_timeval(gpr_inf_past(GPR_CLOCK_REALTIME));
+ grpc_php_wrap_timeval(gpr_inf_past(GPR_CLOCK_REALTIME) TSRMLS_CC);
RETURN_DESTROY_ZVAL(grpc_php_timeval_inf_past);
}
diff --git a/src/php/ext/grpc/timeval.h b/src/php/ext/grpc/timeval.h
index 07cef037cb..7456eb6d58 100755
--- a/src/php/ext/grpc/timeval.h
+++ b/src/php/ext/grpc/timeval.h
@@ -63,6 +63,6 @@ void grpc_init_timeval(TSRMLS_D);
void grpc_shutdown_timeval(TSRMLS_D);
/* Creates a Timeval object that wraps the given timeval struct */
-zval *grpc_php_wrap_timeval(gpr_timespec wrapped);
+zval *grpc_php_wrap_timeval(gpr_timespec wrapped TSRMLS_DC);
#endif /* NET_GRPC_PHP_GRPC_TIMEVAL_H_ */
diff --git a/src/proto/grpc/reflection/v1alpha/reflection.proto b/src/proto/grpc/reflection/v1alpha/reflection.proto
new file mode 100644
index 0000000000..276ff0e255
--- /dev/null
+++ b/src/proto/grpc/reflection/v1alpha/reflection.proto
@@ -0,0 +1,151 @@
+// Copyright 2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+// Service exported by server reflection
+
+syntax = "proto3";
+
+package grpc.reflection.v1alpha;
+
+service ServerReflection {
+ // The reflection service is structured as a bidirectional stream, ensuring
+ // all related requests go to a single server.
+ rpc ServerReflectionInfo(stream ServerReflectionRequest)
+ returns (stream ServerReflectionResponse);
+}
+
+// The message sent by the client when calling ServerReflectionInfo method.
+message ServerReflectionRequest {
+ string host = 1;
+ // To use reflection service, the client should set one of the following
+ // fields in message_request. The server distinguishes requests by their
+ // defined field and then handles them using corresponding methods.
+ oneof message_request {
+ // Find a proto file by the file name.
+ string file_by_filename = 3;
+
+ // Find the proto file that declares the given fully-qualified symbol name.
+ // This field should be a fully-qualified symbol name
+ // (e.g. <package>.<service>[.<method>] or <package>.<type>).
+ string file_containing_symbol = 4;
+
+ // Find the proto file which defines an extension extending the given
+ // message type with the given field number.
+ ExtensionRequest file_containing_extension = 5;
+
+ // Finds the tag numbers used by all known extensions of extendee_type, and
+ // appends them to ExtensionNumberResponse in an undefined order.
+ // Its corresponding method is best-effort: it's not guaranteed that the
+ // reflection service will implement this method, and it's not guaranteed
+ // that this method will provide all extensions. Returns
+ // StatusCode::UNIMPLEMENTED if it's not implemented.
+ // This field should be a fully-qualified type name. The format is
+ // <package>.<type>
+ string all_extension_numbers_of_type = 6;
+
+ // List the full names of registered services. The content will not be
+ // checked.
+ string list_services = 7;
+ }
+}
+
+// The type name and extension number sent by the client when requesting
+// file_containing_extension.
+message ExtensionRequest {
+ // Fully-qualified type name. The format should be <package>.<type>
+ string containing_type = 1;
+ int32 extension_number = 2;
+}
+
+// The message sent by the server to answer ServerReflectionInfo method.
+message ServerReflectionResponse {
+ string valid_host = 1;
+ ServerReflectionRequest original_request = 2;
+ // The server set one of the following fields accroding to the message_request
+ // in the request.
+ oneof message_response {
+ // This message is used to answer file_by_filename, file_containing_symbol,
+ // file_containing_extension requests with transitive dependencies. As
+ // the repeated label is not allowed in oneof fields, we use a
+ // FileDescriptorResponse message to encapsulate the repeated fields.
+ // The reflection service is allowed to avoid sending FileDescriptorProtos
+ // that were previously sent in response to earlier requests in the stream.
+ FileDescriptorResponse file_descriptor_response = 4;
+
+ // This message is used to answer all_extension_numbers_of_type requst.
+ ExtensionNumberResponse all_extension_numbers_response = 5;
+
+ // This message is used to answer list_services request.
+ ListServiceResponse list_services_response = 6;
+
+ // This message is used when an error occurs.
+ ErrorResponse error_response = 7;
+ }
+}
+
+// Serialized FileDescriptorProto messages sent by the server answering
+// a file_by_filename, file_containing_symbol, or file_containing_extension
+// request.
+message FileDescriptorResponse {
+ // Serialized FileDescriptorProto messages. We avoid taking a dependency on
+ // descriptor.proto, which uses proto2 only features, by making them opaque
+ // bytes instead.
+ repeated bytes file_descriptor_proto = 1;
+}
+
+// A list of extension numbers sent by the server answering
+// all_extension_numbers_of_type request.
+message ExtensionNumberResponse {
+ // Full name of the base type, including the package name. The format
+ // is <package>.<type>
+ string base_type_name = 1;
+ repeated int32 extension_number = 2;
+}
+
+// A list of ServiceResponse sent by the server answering list_services request.
+message ListServiceResponse {
+ // The information of each service may be expanded in the future, so we use
+ // ServiceResponse message to encapsulate it.
+ repeated ServiceResponse service = 1;
+}
+
+// The information of a single service used by ListServiceResponse to answer
+// list_services request.
+message ServiceResponse {
+ // Full name of a registered service, including its package name. The format
+ // is <package>.<service>
+ string name = 1;
+}
+
+// The error code and error message sent by the server when an error occurs.
+message ErrorResponse {
+ // This field uses the error codes defined in grpc::StatusCode.
+ int32 error_code = 1;
+ string error_message = 2;
+}
diff --git a/src/proto/grpc/testing/echo_messages.proto b/src/proto/grpc/testing/echo_messages.proto
index 1be1966f10..b405acf043 100644
--- a/src/proto/grpc/testing/echo_messages.proto
+++ b/src/proto/grpc/testing/echo_messages.proto
@@ -32,6 +32,12 @@ syntax = "proto3";
package grpc.testing;
+// Message to be echoed back serialized in trailer.
+message DebugInfo {
+ repeated string stack_entries = 1;
+ string detail = 2;
+}
+
message RequestParams {
bool echo_deadline = 1;
int32 client_cancel_after_us = 2;
@@ -43,6 +49,7 @@ message RequestParams {
string expected_client_identity = 8; // will force check_auth_context.
bool skip_cancelled_check = 9;
string expected_transport_security_type = 10;
+ DebugInfo debug_info = 11;
}
message EchoRequest {
diff --git a/src/python/grpcio/grpc/_adapter/_low.py b/src/python/grpcio/grpc/_adapter/_low.py
index b13d8dd9dd..00788bd4cf 100644
--- a/src/python/grpcio/grpc/_adapter/_low.py
+++ b/src/python/grpcio/grpc/_adapter/_low.py
@@ -195,26 +195,30 @@ class Call(_types.Call):
translated_op = cygrpc.operation_send_initial_metadata(
cygrpc.Metadata(
cygrpc.Metadatum(key, value)
- for key, value in op.initial_metadata))
+ for key, value in op.initial_metadata),
+ op.flags)
elif op.type == _types.OpType.SEND_MESSAGE:
- translated_op = cygrpc.operation_send_message(op.message)
+ translated_op = cygrpc.operation_send_message(op.message, op.flags)
elif op.type == _types.OpType.SEND_CLOSE_FROM_CLIENT:
- translated_op = cygrpc.operation_send_close_from_client()
+ translated_op = cygrpc.operation_send_close_from_client(op.flags)
elif op.type == _types.OpType.SEND_STATUS_FROM_SERVER:
translated_op = cygrpc.operation_send_status_from_server(
cygrpc.Metadata(
cygrpc.Metadatum(key, value)
for key, value in op.trailing_metadata),
op.status.code,
- op.status.details)
+ op.status.details,
+ op.flags)
elif op.type == _types.OpType.RECV_INITIAL_METADATA:
- translated_op = cygrpc.operation_receive_initial_metadata()
+ translated_op = cygrpc.operation_receive_initial_metadata(
+ op.flags)
elif op.type == _types.OpType.RECV_MESSAGE:
- translated_op = cygrpc.operation_receive_message()
+ translated_op = cygrpc.operation_receive_message(op.flags)
elif op.type == _types.OpType.RECV_STATUS_ON_CLIENT:
- translated_op = cygrpc.operation_receive_status_on_client()
+ translated_op = cygrpc.operation_receive_status_on_client(
+ op.flags)
elif op.type == _types.OpType.RECV_CLOSE_ON_SERVER:
- translated_op = cygrpc.operation_receive_close_on_server()
+ translated_op = cygrpc.operation_receive_close_on_server(op.flags)
else:
raise ValueError('unexpected operation type {}'.format(op.type))
translated_ops.append(translated_op)
diff --git a/src/python/grpcio/grpc/_adapter/_types.py b/src/python/grpcio/grpc/_adapter/_types.py
index 8ca7ff4b60..f8405949d4 100644
--- a/src/python/grpcio/grpc/_adapter/_types.py
+++ b/src/python/grpcio/grpc/_adapter/_types.py
@@ -152,7 +152,7 @@ class OpArgs(collections.namedtuple(
'trailing_metadata',
'message',
'status',
- 'write_flags',
+ 'flags',
])):
"""Arguments passed into a GRPC operation.
@@ -165,7 +165,7 @@ class OpArgs(collections.namedtuple(
message (bytes): Only valid if type == OpType.SEND_MESSAGE, else is None.
status (Status): Only valid if type == OpType.SEND_STATUS_FROM_SERVER, else
is None.
- write_flags (int): a bit OR'ing of 0 or more OpWriteFlags values.
+ flags (int): a bitwise OR'ing of 0 or more OpWriteFlags values.
"""
@staticmethod
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
index 3d158a7707..66e6e6b549 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
@@ -140,6 +140,9 @@ cdef extern from "grpc/_cython/loader.h":
const char *GRPC_ARG_PRIMARY_USER_AGENT_STRING
const char *GRPC_ARG_SECONDARY_USER_AGENT_STRING
const char *GRPC_SSL_TARGET_NAME_OVERRIDE_ARG
+ const char *GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM
+ const char *GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL
+ const char *GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET
const int GRPC_WRITE_BUFFER_HINT
const int GRPC_WRITE_NO_COMPRESS
@@ -425,3 +428,38 @@ cdef extern from "grpc/_cython/loader.h":
grpc_call_credentials *grpc_metadata_credentials_create_from_plugin(
grpc_metadata_credentials_plugin plugin, void *reserved) nogil
+
+ ctypedef enum grpc_compression_algorithm:
+ GRPC_COMPRESS_NONE
+ GRPC_COMPRESS_DEFLATE
+ GRPC_COMPRESS_GZIP
+ GRPC_COMPRESS_ALGORITHMS_COUNT
+
+ ctypedef enum grpc_compression_level:
+ GRPC_COMPRESS_LEVEL_NONE
+ GRPC_COMPRESS_LEVEL_LOW
+ GRPC_COMPRESS_LEVEL_MED
+ GRPC_COMPRESS_LEVEL_HIGH
+ GRPC_COMPRESS_LEVEL_COUNT
+
+ ctypedef struct grpc_compression_options:
+ uint32_t enabled_algorithms_bitset
+ grpc_compression_algorithm default_compression_algorithm
+
+ int grpc_compression_algorithm_parse(
+ const char *name, size_t name_length,
+ grpc_compression_algorithm *algorithm) nogil
+ int grpc_compression_algorithm_name(grpc_compression_algorithm algorithm,
+ char **name) nogil
+ grpc_compression_algorithm grpc_compression_algorithm_for_level(
+ grpc_compression_level level, uint32_t accepted_encodings) nogil
+ void grpc_compression_options_init(grpc_compression_options *opts) nogil
+ void grpc_compression_options_enable_algorithm(
+ grpc_compression_options *opts,
+ grpc_compression_algorithm algorithm) nogil
+ void grpc_compression_options_disable_algorithm(
+ grpc_compression_options *opts,
+ grpc_compression_algorithm algorithm) nogil
+ int grpc_compression_options_is_algorithm_enabled(
+ const grpc_compression_options *opts,
+ grpc_compression_algorithm algorithm) nogil
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
index 30397818a1..0474697af8 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
@@ -124,3 +124,7 @@ cdef class Operations:
cdef size_t c_nops
cdef list operations
+
+cdef class CompressionOptions:
+
+ cdef grpc_compression_options c_options
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
index c2202bdab2..c7539f0d49 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
@@ -103,6 +103,19 @@ class OperationType:
receive_close_on_server = GRPC_OP_RECV_CLOSE_ON_SERVER
+class CompressionAlgorithm:
+ none = GRPC_COMPRESS_NONE
+ deflate = GRPC_COMPRESS_DEFLATE
+ gzip = GRPC_COMPRESS_GZIP
+
+
+class CompressionLevel:
+ none = GRPC_COMPRESS_LEVEL_NONE
+ low = GRPC_COMPRESS_LEVEL_LOW
+ medium = GRPC_COMPRESS_LEVEL_MED
+ high = GRPC_COMPRESS_LEVEL_HIGH
+
+
cdef class Timespec:
def __cinit__(self, time):
@@ -473,6 +486,10 @@ cdef class Operation:
return self.c_op.type
@property
+ def flags(self):
+ return self.c_op.flags
+
+ @property
def has_status(self):
return self.c_op.type == GRPC_OP_RECV_STATUS_ON_CLIENT
@@ -553,9 +570,10 @@ cdef class Operation:
with nogil:
gpr_free(self._received_status_details)
-def operation_send_initial_metadata(Metadata metadata):
+def operation_send_initial_metadata(Metadata metadata, int flags):
cdef Operation op = Operation()
op.c_op.type = GRPC_OP_SEND_INITIAL_METADATA
+ op.c_op.flags = flags
op.c_op.data.send_initial_metadata.count = metadata.c_metadata_array.count
op.c_op.data.send_initial_metadata.metadata = (
metadata.c_metadata_array.metadata)
@@ -563,23 +581,25 @@ def operation_send_initial_metadata(Metadata metadata):
op.is_valid = True
return op
-def operation_send_message(data):
+def operation_send_message(data, int flags):
cdef Operation op = Operation()
op.c_op.type = GRPC_OP_SEND_MESSAGE
+ op.c_op.flags = flags
byte_buffer = ByteBuffer(data)
op.c_op.data.send_message = byte_buffer.c_byte_buffer
op.references.append(byte_buffer)
op.is_valid = True
return op
-def operation_send_close_from_client():
+def operation_send_close_from_client(int flags):
cdef Operation op = Operation()
op.c_op.type = GRPC_OP_SEND_CLOSE_FROM_CLIENT
+ op.c_op.flags = flags
op.is_valid = True
return op
def operation_send_status_from_server(
- Metadata metadata, grpc_status_code code, details):
+ Metadata metadata, grpc_status_code code, details, int flags):
if isinstance(details, bytes):
pass
elif isinstance(details, basestring):
@@ -588,6 +608,7 @@ def operation_send_status_from_server(
raise TypeError("expected a str or bytes object for details")
cdef Operation op = Operation()
op.c_op.type = GRPC_OP_SEND_STATUS_FROM_SERVER
+ op.c_op.flags = flags
op.c_op.data.send_status_from_server.trailing_metadata_count = (
metadata.c_metadata_array.count)
op.c_op.data.send_status_from_server.trailing_metadata = (
@@ -599,18 +620,20 @@ def operation_send_status_from_server(
op.is_valid = True
return op
-def operation_receive_initial_metadata():
+def operation_receive_initial_metadata(int flags):
cdef Operation op = Operation()
op.c_op.type = GRPC_OP_RECV_INITIAL_METADATA
+ op.c_op.flags = flags
op._received_metadata = Metadata([])
op.c_op.data.receive_initial_metadata = (
&op._received_metadata.c_metadata_array)
op.is_valid = True
return op
-def operation_receive_message():
+def operation_receive_message(int flags):
cdef Operation op = Operation()
op.c_op.type = GRPC_OP_RECV_MESSAGE
+ op.c_op.flags = flags
op._received_message = ByteBuffer(None)
# n.b. the c_op.data.receive_message field needs to be deleted by us,
# anyway, so we just let that be handled by the ByteBuffer() we allocated
@@ -619,9 +642,10 @@ def operation_receive_message():
op.is_valid = True
return op
-def operation_receive_status_on_client():
+def operation_receive_status_on_client(int flags):
cdef Operation op = Operation()
op.c_op.type = GRPC_OP_RECV_STATUS_ON_CLIENT
+ op.c_op.flags = flags
op._received_metadata = Metadata([])
op.c_op.data.receive_status_on_client.trailing_metadata = (
&op._received_metadata.c_metadata_array)
@@ -634,9 +658,10 @@ def operation_receive_status_on_client():
op.is_valid = True
return op
-def operation_receive_close_on_server():
+def operation_receive_close_on_server(int flags):
cdef Operation op = Operation()
op.c_op.type = GRPC_OP_RECV_CLOSE_ON_SERVER
+ op.c_op.flags = flags
op.c_op.data.receive_close_on_server.cancelled = &op._received_cancelled
op.is_valid = True
return op
@@ -692,3 +717,36 @@ cdef class Operations:
def __iter__(self):
return _OperationsIterator(self)
+
+cdef class CompressionOptions:
+
+ def __cinit__(self):
+ with nogil:
+ grpc_compression_options_init(&self.c_options)
+
+ def enable_algorithm(self, grpc_compression_algorithm algorithm):
+ with nogil:
+ grpc_compression_options_enable_algorithm(&self.c_options, algorithm)
+
+ def disable_algorithm(self, grpc_compression_algorithm algorithm):
+ with nogil:
+ grpc_compression_options_disable_algorithm(&self.c_options, algorithm)
+
+ def is_algorithm_enabled(self, grpc_compression_algorithm algorithm):
+ cdef int result
+ with nogil:
+ result = grpc_compression_options_is_algorithm_enabled(
+ &self.c_options, algorithm)
+ return result
+
+ def to_channel_arg(self):
+ return ChannelArg(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET,
+ self.c_options.enabled_algorithms_bitset)
+
+
+def compression_algorithm_name(grpc_compression_algorithm algorithm):
+ cdef char* name
+ with nogil:
+ grpc_compression_algorithm_name(algorithm, &name)
+ # Let Cython do the right thing with string casting
+ return name
diff --git a/src/python/grpcio/grpc/_cython/imports.generated.c b/src/python/grpcio/grpc/_cython/imports.generated.c
index f0a40dbb35..09551472b5 100644
--- a/src/python/grpcio/grpc/_cython/imports.generated.c
+++ b/src/python/grpcio/grpc/_cython/imports.generated.c
@@ -125,6 +125,7 @@ grpc_header_key_is_legal_type grpc_header_key_is_legal_import;
grpc_header_nonbin_value_is_legal_type grpc_header_nonbin_value_is_legal_import;
grpc_is_binary_header_type grpc_is_binary_header_import;
grpc_call_error_to_string_type grpc_call_error_to_string_import;
+grpc_cronet_secure_channel_create_type grpc_cronet_secure_channel_create_import;
grpc_auth_property_iterator_next_type grpc_auth_property_iterator_next_import;
grpc_auth_context_property_iterator_type grpc_auth_context_property_iterator_import;
grpc_auth_context_peer_identity_type grpc_auth_context_peer_identity_import;
@@ -395,6 +396,7 @@ void pygrpc_load_imports(HMODULE library) {
grpc_header_nonbin_value_is_legal_import = (grpc_header_nonbin_value_is_legal_type) GetProcAddress(library, "grpc_header_nonbin_value_is_legal");
grpc_is_binary_header_import = (grpc_is_binary_header_type) GetProcAddress(library, "grpc_is_binary_header");
grpc_call_error_to_string_import = (grpc_call_error_to_string_type) GetProcAddress(library, "grpc_call_error_to_string");
+ grpc_cronet_secure_channel_create_import = (grpc_cronet_secure_channel_create_type) GetProcAddress(library, "grpc_cronet_secure_channel_create");
grpc_auth_property_iterator_next_import = (grpc_auth_property_iterator_next_type) GetProcAddress(library, "grpc_auth_property_iterator_next");
grpc_auth_context_property_iterator_import = (grpc_auth_context_property_iterator_type) GetProcAddress(library, "grpc_auth_context_property_iterator");
grpc_auth_context_peer_identity_import = (grpc_auth_context_peer_identity_type) GetProcAddress(library, "grpc_auth_context_peer_identity");
diff --git a/src/python/grpcio/grpc/_cython/imports.generated.h b/src/python/grpcio/grpc/_cython/imports.generated.h
index d5e810b7cf..6de295414a 100644
--- a/src/python/grpcio/grpc/_cython/imports.generated.h
+++ b/src/python/grpcio/grpc/_cython/imports.generated.h
@@ -43,6 +43,7 @@
#include <grpc/census.h>
#include <grpc/compression.h>
#include <grpc/grpc.h>
+#include <grpc/grpc_cronet.h>
#include <grpc/grpc_security.h>
#include <grpc/impl/codegen/alloc.h>
#include <grpc/impl/codegen/byte_buffer.h>
@@ -325,6 +326,9 @@ extern grpc_is_binary_header_type grpc_is_binary_header_import;
typedef const char *(*grpc_call_error_to_string_type)(grpc_call_error error);
extern grpc_call_error_to_string_type grpc_call_error_to_string_import;
#define grpc_call_error_to_string grpc_call_error_to_string_import
+typedef grpc_channel *(*grpc_cronet_secure_channel_create_type)(void *engine, const char *target, const grpc_channel_args *args, void *reserved);
+extern grpc_cronet_secure_channel_create_type grpc_cronet_secure_channel_create_import;
+#define grpc_cronet_secure_channel_create grpc_cronet_secure_channel_create_import
typedef const grpc_auth_property *(*grpc_auth_property_iterator_next_type)(grpc_auth_property_iterator *it);
extern grpc_auth_property_iterator_next_type grpc_auth_property_iterator_next_import;
#define grpc_auth_property_iterator_next grpc_auth_property_iterator_next_import
@@ -866,14 +870,15 @@ void pygrpc_load_imports(HMODULE library);
#else /* !GPR_WIN32 */
-#include <grpc/support/alloc.h>
-#include <grpc/support/slice.h>
-#include <grpc/support/time.h>
-#include <grpc/status.h>
#include <grpc/byte_buffer.h>
#include <grpc/byte_buffer_reader.h>
+#include <grpc/compression.h>
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/slice.h>
+#include <grpc/support/time.h>
+#include <grpc/status.h>
#endif /* !GPR_WIN32 */
diff --git a/src/python/grpcio/grpc/beta/interfaces.py b/src/python/grpcio/grpc/beta/interfaces.py
index 33ca45ac5b..24de9ad1a8 100644
--- a/src/python/grpcio/grpc/beta/interfaces.py
+++ b/src/python/grpcio/grpc/beta/interfaces.py
@@ -235,7 +235,7 @@ class Server(six.with_metaclass(abc.ABCMeta)):
This method may be called at any time and is idempotent. Passing a smaller
grace value than has been passed in a previous call will have the effect of
stopping the Server sooner. Passing a larger grace value than has been
- passed in a previous call will not have the effect of stopping the sooner
+ passed in a previous call will not have the effect of stopping the server
later.
Args:
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index a411f45bbd..9c067add0a 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -95,6 +95,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/endpoint_pair_posix.c',
'src/core/lib/iomgr/endpoint_pair_windows.c',
'src/core/lib/iomgr/ev_poll_and_epoll_posix.c',
+ 'src/core/lib/iomgr/ev_poll_posix.c',
'src/core/lib/iomgr/ev_posix.c',
'src/core/lib/iomgr/exec_ctx.c',
'src/core/lib/iomgr/executor.c',
@@ -230,6 +231,9 @@ CORE_SOURCE_FILES = [
'src/core/ext/client_config/uri_parser.c',
'src/core/ext/transport/chttp2/server/insecure/server_chttp2.c',
'src/core/ext/transport/chttp2/client/insecure/channel_create.c',
+ 'src/core/ext/transport/cronet/client/secure/cronet_channel_create.c',
+ 'src/core/ext/transport/cronet/transport/cronet_api_dummy.c',
+ 'src/core/ext/transport/cronet/transport/cronet_transport.c',
'src/core/ext/lb_policy/grpclb/load_balancer_api.c',
'src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c',
'third_party/nanopb/pb_common.c',
diff --git a/src/ruby/lib/grpc/signals.rb b/src/python/grpcio/tests/health_check/__init__.py
index 2ab85c8bb1..100a624dc9 100644
--- a/src/ruby/lib/grpc/signals.rb
+++ b/src/python/grpcio/tests/health_check/__init__.py
@@ -26,44 +26,3 @@
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-require 'thread'
-require_relative 'grpc'
-
-# GRPC contains the General RPC module.
-module GRPC
- # Signals contains gRPC functions related to signal handling
- module Signals
- @interpreter_exiting = false
- @signal_handlers = []
- @handlers_mutex = Mutex.new
-
- def register_handler(&handler)
- @handlers_mutex.synchronize do
- @signal_handlers.push(handler)
- handler.call if @exit_signal_received
- end
- # Returns a function to remove the handler
- lambda do
- @handlers_mutex.synchronize { @signal_handlers.delete(handler) }
- end
- end
- module_function :register_handler
-
- def wait_for_signals
- t = Thread.new do
- sleep 0.1 until GRPC::Core.signal_received? || @interpreter_exiting
- unless @interpreter_exiting
- @handlers_mutex.synchronize do
- @signal_handlers.each(&:call)
- end
- end
- end
- at_exit do
- @interpreter_exiting = true
- t.join
- end
- end
- module_function :wait_for_signals
- end
-end
diff --git a/src/python/grpcio/tests/health_check/_health_servicer_test.py b/src/python/grpcio/tests/health_check/_health_servicer_test.py
new file mode 100644
index 0000000000..1b63388663
--- /dev/null
+++ b/src/python/grpcio/tests/health_check/_health_servicer_test.py
@@ -0,0 +1,75 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tests of grpc_health.health.v1.health."""
+
+import unittest
+
+from grpc_health.health.v1 import health
+from grpc_health.health.v1 import health_pb2
+
+
+class HealthServicerTest(unittest.TestCase):
+
+ def setUp(self):
+ self.servicer = health.HealthServicer()
+ self.servicer.set('', health_pb2.HealthCheckResponse.SERVING)
+ self.servicer.set('grpc.test.TestServiceServing',
+ health_pb2.HealthCheckResponse.SERVING)
+ self.servicer.set('grpc.test.TestServiceUnknown',
+ health_pb2.HealthCheckResponse.UNKNOWN)
+ self.servicer.set('grpc.test.TestServiceNotServing',
+ health_pb2.HealthCheckResponse.NOT_SERVING)
+
+ def test_empty_service(self):
+ request = health_pb2.HealthCheckRequest()
+ resp = self.servicer.Check(request, None)
+ self.assertEqual(resp.status, health_pb2.HealthCheckResponse.SERVING)
+
+ def test_serving_service(self):
+ request = health_pb2.HealthCheckRequest(
+ service='grpc.test.TestServiceServing')
+ resp = self.servicer.Check(request, None)
+ self.assertEqual(resp.status, health_pb2.HealthCheckResponse.SERVING)
+
+ def test_unknown_serivce(self):
+ request = health_pb2.HealthCheckRequest(
+ service='grpc.test.TestServiceUnknown')
+ resp = self.servicer.Check(request, None)
+ self.assertEqual(resp.status, health_pb2.HealthCheckResponse.UNKNOWN)
+
+ def test_not_serving_service(self):
+ request = health_pb2.HealthCheckRequest(
+ service='grpc.test.TestServiceNotServing')
+ resp = self.servicer.Check(request, None)
+ self.assertEqual(resp.status, health_pb2.HealthCheckResponse.NOT_SERVING)
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio/tests/protoc_plugin/beta_python_plugin_test.py b/src/python/grpcio/tests/protoc_plugin/beta_python_plugin_test.py
index 3dc3042e38..7466f88059 100644
--- a/src/python/grpcio/tests/protoc_plugin/beta_python_plugin_test.py
+++ b/src/python/grpcio/tests/protoc_plugin/beta_python_plugin_test.py
@@ -59,11 +59,12 @@ STUB_FACTORY_IDENTIFIER = 'beta_create_TestService_stub'
class _ServicerMethods(object):
- def __init__(self, test_pb2):
+ def __init__(self, response_pb2, payload_pb2):
self._condition = threading.Condition()
self._paused = False
self._fail = False
- self._test_pb2 = test_pb2
+ self._response_pb2 = response_pb2
+ self._payload_pb2 = payload_pb2
@contextlib.contextmanager
def pause(self): # pylint: disable=invalid-name
@@ -90,22 +91,22 @@ class _ServicerMethods(object):
self._condition.wait()
def UnaryCall(self, request, unused_rpc_context):
- response = self._test_pb2.SimpleResponse()
- response.payload.payload_type = self._test_pb2.COMPRESSABLE
+ response = self._response_pb2.SimpleResponse()
+ response.payload.payload_type = self._payload_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * request.response_size
self._control()
return response
def StreamingOutputCall(self, request, unused_rpc_context):
for parameter in request.response_parameters:
- response = self._test_pb2.StreamingOutputCallResponse()
- response.payload.payload_type = self._test_pb2.COMPRESSABLE
+ response = self._response_pb2.StreamingOutputCallResponse()
+ response.payload.payload_type = self._payload_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * parameter.size
self._control()
yield response
def StreamingInputCall(self, request_iter, unused_rpc_context):
- response = self._test_pb2.StreamingInputCallResponse()
+ response = self._response_pb2.StreamingInputCallResponse()
aggregated_payload_size = 0
for request in request_iter:
aggregated_payload_size += len(request.payload.payload_compressable)
@@ -116,8 +117,8 @@ class _ServicerMethods(object):
def FullDuplexCall(self, request_iter, unused_rpc_context):
for request in request_iter:
for parameter in request.response_parameters:
- response = self._test_pb2.StreamingOutputCallResponse()
- response.payload.payload_type = self._test_pb2.COMPRESSABLE
+ response = self._response_pb2.StreamingOutputCallResponse()
+ response.payload.payload_type = self._payload_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * parameter.size
self._control()
yield response
@@ -126,8 +127,8 @@ class _ServicerMethods(object):
responses = []
for request in request_iter:
for parameter in request.response_parameters:
- response = self._test_pb2.StreamingOutputCallResponse()
- response.payload.payload_type = self._test_pb2.COMPRESSABLE
+ response = self._response_pb2.StreamingOutputCallResponse()
+ response.payload.payload_type = self._payload_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * parameter.size
self._control()
responses.append(response)
@@ -136,23 +137,25 @@ class _ServicerMethods(object):
@contextlib.contextmanager
-def _CreateService(test_pb2):
+def _CreateService(service_pb2, response_pb2, payload_pb2):
"""Provides a servicer backend and a stub.
The servicer is just the implementation of the actual servicer passed to the
face player of the python RPC implementation; the two are detached.
Args:
- test_pb2: The test_pb2 module generated by this test.
+ service_pb2: The service_pb2 module generated by this test.
+ response_pb2: The response_pb2 module generated by this test
+ payload_pb2: The payload_pb2 module generated by this test
Yields:
A (servicer_methods, stub) pair where servicer_methods is the back-end of
the service bound to the stub and and stub is the stub on which to invoke
RPCs.
"""
- servicer_methods = _ServicerMethods(test_pb2)
+ servicer_methods = _ServicerMethods(response_pb2, payload_pb2)
- class Servicer(getattr(test_pb2, SERVICER_IDENTIFIER)):
+ class Servicer(getattr(service_pb2, SERVICER_IDENTIFIER)):
def UnaryCall(self, request, context):
return servicer_methods.UnaryCall(request, context)
@@ -170,55 +173,52 @@ def _CreateService(test_pb2):
return servicer_methods.HalfDuplexCall(request_iter, context)
servicer = Servicer()
- server = getattr(test_pb2, SERVER_FACTORY_IDENTIFIER)(servicer)
+ server = getattr(service_pb2, SERVER_FACTORY_IDENTIFIER)(servicer)
port = server.add_insecure_port('[::]:0')
server.start()
channel = implementations.insecure_channel('localhost', port)
- stub = getattr(test_pb2, STUB_FACTORY_IDENTIFIER)(channel)
- yield servicer_methods, stub
+ stub = getattr(service_pb2, STUB_FACTORY_IDENTIFIER)(channel)
+ yield (servicer_methods, stub)
server.stop(0)
@contextlib.contextmanager
-def _CreateIncompleteService(test_pb2):
+def _CreateIncompleteService(service_pb2):
"""Provides a servicer backend that fails to implement methods and its stub.
The servicer is just the implementation of the actual servicer passed to the
face player of the python RPC implementation; the two are detached.
-
Args:
- test_pb2: The test_pb2 module generated by this test.
-
+ service_pb2: The service_pb2 module generated by this test.
Yields:
A (servicer_methods, stub) pair where servicer_methods is the back-end of
the service bound to the stub and and stub is the stub on which to invoke
RPCs.
"""
- servicer_methods = _ServicerMethods(test_pb2)
- class Servicer(getattr(test_pb2, SERVICER_IDENTIFIER)):
+ class Servicer(getattr(service_pb2, SERVICER_IDENTIFIER)):
pass
servicer = Servicer()
- server = getattr(test_pb2, SERVER_FACTORY_IDENTIFIER)(servicer)
+ server = getattr(service_pb2, SERVER_FACTORY_IDENTIFIER)(servicer)
port = server.add_insecure_port('[::]:0')
server.start()
channel = implementations.insecure_channel('localhost', port)
- stub = getattr(test_pb2, STUB_FACTORY_IDENTIFIER)(channel)
- yield servicer_methods, stub
+ stub = getattr(service_pb2, STUB_FACTORY_IDENTIFIER)(channel)
+ yield None, stub
server.stop(0)
-def _streaming_input_request_iterator(test_pb2):
+def _streaming_input_request_iterator(request_pb2, payload_pb2):
for _ in range(3):
- request = test_pb2.StreamingInputCallRequest()
- request.payload.payload_type = test_pb2.COMPRESSABLE
+ request = request_pb2.StreamingInputCallRequest()
+ request.payload.payload_type = payload_pb2.COMPRESSABLE
request.payload.payload_compressable = 'a'
yield request
-def _streaming_output_request(test_pb2):
- request = test_pb2.StreamingOutputCallRequest()
+def _streaming_output_request(request_pb2):
+ request = request_pb2.StreamingOutputCallRequest()
sizes = [1, 2, 3]
request.response_parameters.add(size=sizes[0], interval_us=0)
request.response_parameters.add(size=sizes[1], interval_us=0)
@@ -226,11 +226,11 @@ def _streaming_output_request(test_pb2):
return request
-def _full_duplex_request_iterator(test_pb2):
- request = test_pb2.StreamingOutputCallRequest()
+def _full_duplex_request_iterator(request_pb2):
+ request = request_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=1, interval_us=0)
yield request
- request = test_pb2.StreamingOutputCallRequest()
+ request = request_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=2, interval_us=0)
request.response_parameters.add(size=3, interval_us=0)
yield request
@@ -250,8 +250,6 @@ class PythonPluginTest(unittest.TestCase):
protoc_command = 'protoc'
protoc_plugin_filename = distutils.spawn.find_executable(
'grpc_python_plugin')
- test_proto_filename = pkg_resources.resource_filename(
- 'tests.protoc_plugin', 'protoc_plugin_test.proto')
if not os.path.isfile(protoc_command):
# Assume that if we haven't built protoc that it's on the system.
protoc_command = 'protoc'
@@ -259,19 +257,44 @@ class PythonPluginTest(unittest.TestCase):
# Ensure that the output directory exists.
self.outdir = tempfile.mkdtemp()
+ # Find all proto files
+ paths = []
+ root_dir = os.path.dirname(os.path.realpath(__file__))
+ proto_dir = os.path.join(root_dir, 'protos')
+ for walk_root, _, filenames in os.walk(proto_dir):
+ for filename in filenames:
+ if filename.endswith('.proto'):
+ path = os.path.join(walk_root, filename)
+ paths.append(path)
+
# Invoke protoc with the plugin.
cmd = [
protoc_command,
'--plugin=protoc-gen-python-grpc=%s' % protoc_plugin_filename,
- '-I .',
+ '-I %s' % root_dir,
'--python_out=%s' % self.outdir,
- '--python-grpc_out=%s' % self.outdir,
- os.path.basename(test_proto_filename),
- ]
+ '--python-grpc_out=%s' % self.outdir
+ ] + paths
subprocess.check_call(' '.join(cmd), shell=True, env=os.environ,
- cwd=os.path.dirname(test_proto_filename))
+ cwd=os.path.dirname(os.path.realpath(__file__)))
+
+ # Generated proto directories dont include __init__.py, but
+ # these are needed for python package resolution
+ for walk_root, _, _ in os.walk(os.path.join(self.outdir, 'protos')):
+ path = os.path.join(walk_root, '__init__.py')
+ open(path, 'a').close()
+
sys.path.insert(0, self.outdir)
+ import protos.payload.test_payload_pb2 as payload_pb2 # pylint: disable=g-import-not-at-top
+ import protos.requests.r.test_requests_pb2 as request_pb2 # pylint: disable=g-import-not-at-top
+ import protos.responses.test_responses_pb2 as response_pb2 # pylint: disable=g-import-not-at-top
+ import protos.service.test_service_pb2 as service_pb2 # pylint: disable=g-import-not-at-top
+ self._payload_pb2 = payload_pb2
+ self._request_pb2 = request_pb2
+ self._response_pb2 = response_pb2
+ self._service_pb2 = service_pb2
+
def tearDown(self):
try:
shutil.rmtree(self.outdir)
@@ -282,43 +305,40 @@ class PythonPluginTest(unittest.TestCase):
def testImportAttributes(self):
# check that we can access the generated module and its members.
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- self.assertIsNotNone(getattr(test_pb2, SERVICER_IDENTIFIER, None))
- self.assertIsNotNone(getattr(test_pb2, STUB_IDENTIFIER, None))
- self.assertIsNotNone(getattr(test_pb2, SERVER_FACTORY_IDENTIFIER, None))
- self.assertIsNotNone(getattr(test_pb2, STUB_FACTORY_IDENTIFIER, None))
+ self.assertIsNotNone(
+ getattr(self._service_pb2, SERVICER_IDENTIFIER, None))
+ self.assertIsNotNone(
+ getattr(self._service_pb2, STUB_IDENTIFIER, None))
+ self.assertIsNotNone(
+ getattr(self._service_pb2, SERVER_FACTORY_IDENTIFIER, None))
+ self.assertIsNotNone(
+ getattr(self._service_pb2, STUB_FACTORY_IDENTIFIER, None))
def testUpDown(self):
- import protoc_plugin_test_pb2 as test_pb2
- moves.reload_module(test_pb2)
- with _CreateService(test_pb2) as (servicer, stub):
- request = test_pb2.SimpleRequest(response_size=13)
+ with _CreateService(
+ self._service_pb2, self._response_pb2, self._payload_pb2):
+ self._request_pb2.SimpleRequest(response_size=13)
def testIncompleteServicer(self):
- import protoc_plugin_test_pb2 as test_pb2
- moves.reload_module(test_pb2)
- with _CreateIncompleteService(test_pb2) as (servicer, stub):
- request = test_pb2.SimpleRequest(response_size=13)
+ with _CreateIncompleteService(self._service_pb2) as (_, stub):
+ request = self._request_pb2.SimpleRequest(response_size=13)
try:
- response = stub.UnaryCall(request, test_constants.LONG_TIMEOUT)
+ stub.UnaryCall(request, test_constants.LONG_TIMEOUT)
except face.AbortionError as error:
self.assertEqual(interfaces.StatusCode.UNIMPLEMENTED, error.code)
def testUnaryCall(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
- request = test_pb2.SimpleRequest(response_size=13)
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
+ request = self._request_pb2.SimpleRequest(response_size=13)
response = stub.UnaryCall(request, test_constants.LONG_TIMEOUT)
expected_response = methods.UnaryCall(request, 'not a real context!')
self.assertEqual(expected_response, response)
def testUnaryCallFuture(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- request = test_pb2.SimpleRequest(response_size=13)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
+ request = self._request_pb2.SimpleRequest(response_size=13)
# Check that the call does not block waiting for the server to respond.
with methods.pause():
response_future = stub.UnaryCall.future(
@@ -328,10 +348,9 @@ class PythonPluginTest(unittest.TestCase):
self.assertEqual(expected_response, response)
def testUnaryCallFutureExpired(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
- request = test_pb2.SimpleRequest(response_size=13)
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
+ request = self._request_pb2.SimpleRequest(response_size=13)
with methods.pause():
response_future = stub.UnaryCall.future(
request, test_constants.SHORT_TIMEOUT)
@@ -339,30 +358,27 @@ class PythonPluginTest(unittest.TestCase):
response_future.result()
def testUnaryCallFutureCancelled(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- request = test_pb2.SimpleRequest(response_size=13)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
+ request = self._request_pb2.SimpleRequest(response_size=13)
with methods.pause():
response_future = stub.UnaryCall.future(request, 1)
response_future.cancel()
self.assertTrue(response_future.cancelled())
def testUnaryCallFutureFailed(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- request = test_pb2.SimpleRequest(response_size=13)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
+ request = self._request_pb2.SimpleRequest(response_size=13)
with methods.fail():
response_future = stub.UnaryCall.future(
request, test_constants.LONG_TIMEOUT)
self.assertIsNotNone(response_future.exception())
def testStreamingOutputCall(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- request = _streaming_output_request(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
+ request = _streaming_output_request(self._request_pb2)
responses = stub.StreamingOutputCall(
request, test_constants.LONG_TIMEOUT)
expected_responses = methods.StreamingOutputCall(
@@ -372,10 +388,9 @@ class PythonPluginTest(unittest.TestCase):
self.assertEqual(expected_response, response)
def testStreamingOutputCallExpired(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- request = _streaming_output_request(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
+ request = _streaming_output_request(self._request_pb2)
with methods.pause():
responses = stub.StreamingOutputCall(
request, test_constants.SHORT_TIMEOUT)
@@ -383,10 +398,9 @@ class PythonPluginTest(unittest.TestCase):
list(responses)
def testStreamingOutputCallCancelled(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- request = _streaming_output_request(test_pb2)
- with _CreateService(test_pb2) as (unused_methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
+ request = _streaming_output_request(self._request_pb2)
responses = stub.StreamingOutputCall(
request, test_constants.LONG_TIMEOUT)
next(responses)
@@ -395,10 +409,9 @@ class PythonPluginTest(unittest.TestCase):
next(responses)
def testStreamingOutputCallFailed(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- request = _streaming_output_request(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
+ request = _streaming_output_request(self._request_pb2)
with methods.fail():
responses = stub.StreamingOutputCall(request, 1)
self.assertIsNotNone(responses)
@@ -406,36 +419,38 @@ class PythonPluginTest(unittest.TestCase):
next(responses)
def testStreamingInputCall(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
response = stub.StreamingInputCall(
- _streaming_input_request_iterator(test_pb2),
+ _streaming_input_request_iterator(
+ self._request_pb2, self._payload_pb2),
test_constants.LONG_TIMEOUT)
expected_response = methods.StreamingInputCall(
- _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!')
+ _streaming_input_request_iterator(self._request_pb2, self._payload_pb2),
+ 'not a real RpcContext!')
self.assertEqual(expected_response, response)
def testStreamingInputCallFuture(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
with methods.pause():
response_future = stub.StreamingInputCall.future(
- _streaming_input_request_iterator(test_pb2),
+ _streaming_input_request_iterator(
+ self._request_pb2, self._payload_pb2),
test_constants.LONG_TIMEOUT)
response = response_future.result()
expected_response = methods.StreamingInputCall(
- _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!')
+ _streaming_input_request_iterator(self._request_pb2, self._payload_pb2),
+ 'not a real RpcContext!')
self.assertEqual(expected_response, response)
def testStreamingInputCallFutureExpired(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
with methods.pause():
response_future = stub.StreamingInputCall.future(
- _streaming_input_request_iterator(test_pb2),
+ _streaming_input_request_iterator(
+ self._request_pb2, self._payload_pb2),
test_constants.SHORT_TIMEOUT)
with self.assertRaises(face.ExpirationError):
response_future.result()
@@ -443,12 +458,12 @@ class PythonPluginTest(unittest.TestCase):
response_future.exception(), face.ExpirationError)
def testStreamingInputCallFutureCancelled(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
with methods.pause():
response_future = stub.StreamingInputCall.future(
- _streaming_input_request_iterator(test_pb2),
+ _streaming_input_request_iterator(
+ self._request_pb2, self._payload_pb2),
test_constants.LONG_TIMEOUT)
response_future.cancel()
self.assertTrue(response_future.cancelled())
@@ -456,32 +471,32 @@ class PythonPluginTest(unittest.TestCase):
response_future.result()
def testStreamingInputCallFutureFailed(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
with methods.fail():
response_future = stub.StreamingInputCall.future(
- _streaming_input_request_iterator(test_pb2),
+ _streaming_input_request_iterator(
+ self._request_pb2, self._payload_pb2),
test_constants.LONG_TIMEOUT)
self.assertIsNotNone(response_future.exception())
def testFullDuplexCall(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
responses = stub.FullDuplexCall(
- _full_duplex_request_iterator(test_pb2), test_constants.LONG_TIMEOUT)
+ _full_duplex_request_iterator(self._request_pb2),
+ test_constants.LONG_TIMEOUT)
expected_responses = methods.FullDuplexCall(
- _full_duplex_request_iterator(test_pb2), 'not a real RpcContext!')
+ _full_duplex_request_iterator(self._request_pb2),
+ 'not a real RpcContext!')
for expected_response, response in moves.zip_longest(
expected_responses, responses):
self.assertEqual(expected_response, response)
def testFullDuplexCallExpired(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- request_iterator = _full_duplex_request_iterator(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ request_iterator = _full_duplex_request_iterator(self._request_pb2)
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
with methods.pause():
responses = stub.FullDuplexCall(
request_iterator, test_constants.SHORT_TIMEOUT)
@@ -489,10 +504,9 @@ class PythonPluginTest(unittest.TestCase):
list(responses)
def testFullDuplexCallCancelled(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
- request_iterator = _full_duplex_request_iterator(test_pb2)
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
+ request_iterator = _full_duplex_request_iterator(self._request_pb2)
responses = stub.FullDuplexCall(
request_iterator, test_constants.LONG_TIMEOUT)
next(responses)
@@ -501,10 +515,9 @@ class PythonPluginTest(unittest.TestCase):
next(responses)
def testFullDuplexCallFailed(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- request_iterator = _full_duplex_request_iterator(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ request_iterator = _full_duplex_request_iterator(self._request_pb2)
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
with methods.fail():
responses = stub.FullDuplexCall(
request_iterator, test_constants.LONG_TIMEOUT)
@@ -513,14 +526,13 @@ class PythonPluginTest(unittest.TestCase):
next(responses)
def testHalfDuplexCall(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
def half_duplex_request_iterator():
- request = test_pb2.StreamingOutputCallRequest()
+ request = self._request_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=1, interval_us=0)
yield request
- request = test_pb2.StreamingOutputCallRequest()
+ request = self._request_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=2, interval_us=0)
request.response_parameters.add(size=3, interval_us=0)
yield request
@@ -533,8 +545,6 @@ class PythonPluginTest(unittest.TestCase):
self.assertEqual(expected_response, response)
def testHalfDuplexCallWedged(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
condition = threading.Condition()
wait_cell = [False]
@contextlib.contextmanager
@@ -547,13 +557,14 @@ class PythonPluginTest(unittest.TestCase):
wait_cell[0] = False
condition.notify_all()
def half_duplex_request_iterator():
- request = test_pb2.StreamingOutputCallRequest()
+ request = self._request_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=1, interval_us=0)
yield request
with condition:
while wait_cell[0]:
condition.wait()
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
with wait():
responses = stub.HalfDuplexCall(
half_duplex_request_iterator(), test_constants.SHORT_TIMEOUT)
@@ -563,5 +574,5 @@ class PythonPluginTest(unittest.TestCase):
if __name__ == '__main__':
- os.chdir(os.path.dirname(sys.argv[0]))
+ #os.chdir(os.path.dirname(sys.argv[0]))
unittest.main(verbosity=2)
diff --git a/src/python/grpcio/tests/protoc_plugin/protos/payload/test_payload.proto b/src/python/grpcio/tests/protoc_plugin/protos/payload/test_payload.proto
new file mode 100644
index 0000000000..457543aa79
--- /dev/null
+++ b/src/python/grpcio/tests/protoc_plugin/protos/payload/test_payload.proto
@@ -0,0 +1,51 @@
+// Copyright 2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+syntax = "proto3";
+
+package grpc_protoc_plugin;
+
+enum PayloadType {
+ // Compressable text format.
+ COMPRESSABLE= 0;
+
+ // Uncompressable binary format.
+ UNCOMPRESSABLE = 1;
+
+ // Randomly chosen from all other formats defined in this enum.
+ RANDOM = 2;
+}
+
+message Payload {
+ PayloadType payload_type = 1;
+ oneof payload_body {
+ string payload_compressable = 2;
+ bytes payload_uncompressable = 3;
+ }
+}
diff --git a/src/python/grpcio/tests/protoc_plugin/protos/requests/r/test_requests.proto b/src/python/grpcio/tests/protoc_plugin/protos/requests/r/test_requests.proto
new file mode 100644
index 0000000000..54105df6a5
--- /dev/null
+++ b/src/python/grpcio/tests/protoc_plugin/protos/requests/r/test_requests.proto
@@ -0,0 +1,77 @@
+// Copyright 2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+syntax = "proto3";
+
+import "protos/payload/test_payload.proto";
+
+package grpc_protoc_plugin;
+
+message SimpleRequest {
+ // Desired payload type in the response from the server.
+ // If response_type is RANDOM, server randomly chooses one from other formats.
+ PayloadType response_type = 1;
+
+ // Desired payload size in the response from the server.
+ // If response_type is COMPRESSABLE, this denotes the size before compression.
+ int32 response_size = 2;
+
+ // input payload sent along with the request.
+ Payload payload = 3;
+}
+
+message StreamingInputCallRequest {
+ // input payload sent along with the request.
+ Payload payload = 1;
+
+ // Not expecting any payload from the response.
+}
+
+message ResponseParameters {
+ // Desired payload sizes in responses from the server.
+ // If response_type is COMPRESSABLE, this denotes the size before compression.
+ int32 size = 1;
+
+ // Desired interval between consecutive responses in the response stream in
+ // microseconds.
+ int32 interval_us = 2;
+}
+
+message StreamingOutputCallRequest {
+ // Desired payload type in the response from the server.
+ // If response_type is RANDOM, the payload from each response in the stream
+ // might be of different types. This is to simulate a mixed type of payload
+ // stream.
+ PayloadType response_type = 1;
+
+ repeated ResponseParameters response_parameters = 2;
+
+ // input payload sent along with the request.
+ Payload payload = 3;
+}
diff --git a/src/python/grpcio_health_checking/grpc/health/v1/health.proto b/src/python/grpcio/tests/protoc_plugin/protos/responses/test_responses.proto
index b0bac54be9..734fbda86e 100644
--- a/src/python/grpcio_health_checking/grpc/health/v1/health.proto
+++ b/src/python/grpcio/tests/protoc_plugin/protos/responses/test_responses.proto
@@ -29,21 +29,19 @@
syntax = "proto3";
-package grpc.health.v1;
+import "protos/payload/test_payload.proto";
-message HealthCheckRequest {
- string service = 1;
+package grpc_protoc_plugin;
+
+message SimpleResponse {
+ Payload payload = 1;
}
-message HealthCheckResponse {
- enum ServingStatus {
- UNKNOWN = 0;
- SERVING = 1;
- NOT_SERVING = 2;
- }
- ServingStatus status = 1;
+message StreamingInputCallResponse {
+ // Aggregated size of payloads received from the client.
+ int32 aggregated_payload_size = 1;
}
-service Health {
- rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
+message StreamingOutputCallResponse {
+ Payload payload = 1;
}
diff --git a/src/python/grpcio/tests/protoc_plugin/protoc_plugin_test.proto b/src/python/grpcio/tests/protoc_plugin/protos/service/test_service.proto
index 6762a8e7f3..fe715ee7f9 100644
--- a/src/python/grpcio/tests/protoc_plugin/protoc_plugin_test.proto
+++ b/src/python/grpcio/tests/protoc_plugin/protos/service/test_service.proto
@@ -1,4 +1,4 @@
-// Copyright 2015, Google Inc.
+// Copyright 2016, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
@@ -27,87 +27,12 @@
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-// An integration test service that covers all the method signature permutations
-// of unary/streaming requests/responses.
-// This file is duplicated around the code base. See GitHub issue #526.
-syntax = "proto2";
+syntax = "proto3";
-package grpc_protoc_plugin;
-
-enum PayloadType {
- // Compressable text format.
- COMPRESSABLE= 1;
-
- // Uncompressable binary format.
- UNCOMPRESSABLE = 2;
-
- // Randomly chosen from all other formats defined in this enum.
- RANDOM = 3;
-}
-
-message Payload {
- required PayloadType payload_type = 1;
- oneof payload_body {
- string payload_compressable = 2;
- bytes payload_uncompressable = 3;
- }
-}
-
-message SimpleRequest {
- // Desired payload type in the response from the server.
- // If response_type is RANDOM, server randomly chooses one from other formats.
- optional PayloadType response_type = 1 [default=COMPRESSABLE];
-
- // Desired payload size in the response from the server.
- // If response_type is COMPRESSABLE, this denotes the size before compression.
- optional int32 response_size = 2;
-
- // Optional input payload sent along with the request.
- optional Payload payload = 3;
-}
-
-message SimpleResponse {
- optional Payload payload = 1;
-}
-
-message StreamingInputCallRequest {
- // Optional input payload sent along with the request.
- optional Payload payload = 1;
+import "protos/requests/r/test_requests.proto";
+import "protos/responses/test_responses.proto";
- // Not expecting any payload from the response.
-}
-
-message StreamingInputCallResponse {
- // Aggregated size of payloads received from the client.
- optional int32 aggregated_payload_size = 1;
-}
-
-message ResponseParameters {
- // Desired payload sizes in responses from the server.
- // If response_type is COMPRESSABLE, this denotes the size before compression.
- required int32 size = 1;
-
- // Desired interval between consecutive responses in the response stream in
- // microseconds.
- required int32 interval_us = 2;
-}
-
-message StreamingOutputCallRequest {
- // Desired payload type in the response from the server.
- // If response_type is RANDOM, the payload from each response in the stream
- // might be of different types. This is to simulate a mixed type of payload
- // stream.
- optional PayloadType response_type = 1 [default=COMPRESSABLE];
-
- repeated ResponseParameters response_parameters = 2;
-
- // Optional input payload sent along with the request.
- optional Payload payload = 3;
-}
-
-message StreamingOutputCallResponse {
- optional Payload payload = 1;
-}
+package grpc_protoc_plugin;
service TestService {
// One request followed by one response.
diff --git a/src/python/grpcio/tests/qps/benchmark_client.py b/src/python/grpcio/tests/qps/benchmark_client.py
index eed0b0c6da..b372ea01ad 100644
--- a/src/python/grpcio/tests/qps/benchmark_client.py
+++ b/src/python/grpcio/tests/qps/benchmark_client.py
@@ -39,6 +39,7 @@ except ImportError:
from concurrent import futures
from grpc.beta import implementations
+from grpc.framework.interfaces.face import face
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import services_pb2
from tests.unit import resources
@@ -141,10 +142,10 @@ class UnaryAsyncBenchmarkClient(BenchmarkClient):
self._stub = None
-class StreamingAsyncBenchmarkClient(BenchmarkClient):
+class StreamingSyncBenchmarkClient(BenchmarkClient):
def __init__(self, server, config, hist):
- super(StreamingAsyncBenchmarkClient, self).__init__(server, config, hist)
+ super(StreamingSyncBenchmarkClient, self).__init__(server, config, hist)
self._is_streaming = False
self._pool = futures.ThreadPoolExecutor(max_workers=1)
# Use a thread-safe queue to put requests on the stream
@@ -167,12 +168,12 @@ class StreamingAsyncBenchmarkClient(BenchmarkClient):
def _request_stream(self):
self._is_streaming = True
if self._generic:
- response_stream = self._stub.inline_stream_stream(
- 'grpc.testing.BenchmarkService', 'StreamingCall',
- self._request_generator(), _TIMEOUT)
+ stream_callable = self._stub.stream_stream(
+ 'grpc.testing.BenchmarkService', 'StreamingCall')
else:
- response_stream = self._stub.StreamingCall(self._request_generator(),
- _TIMEOUT)
+ stream_callable = self._stub.StreamingCall
+
+ response_stream = stream_callable(self._request_generator(), _TIMEOUT)
for _ in response_stream:
end_time = time.time()
self._handle_response(end_time - self._send_time_queue.get_nowait())
@@ -184,3 +185,48 @@ class StreamingAsyncBenchmarkClient(BenchmarkClient):
yield request
except queue.Empty:
pass
+
+
+class AsyncReceiver(face.ResponseReceiver):
+ """Receiver for async stream responses."""
+
+ def __init__(self, send_time_queue, response_handler):
+ self._send_time_queue = send_time_queue
+ self._response_handler = response_handler
+
+ def initial_metadata(self, initial_mdetadata):
+ pass
+
+ def response(self, response):
+ end_time = time.time()
+ self._response_handler(end_time - self._send_time_queue.get_nowait())
+
+ def complete(self, terminal_metadata, code, details):
+ pass
+
+
+class StreamingAsyncBenchmarkClient(BenchmarkClient):
+
+ def __init__(self, server, config, hist):
+ super(StreamingAsyncBenchmarkClient, self).__init__(server, config, hist)
+ self._send_time_queue = queue.Queue()
+ self._receiver = AsyncReceiver(self._send_time_queue, self._handle_response)
+ self._rendezvous = None
+
+ def send_request(self):
+ if self._rendezvous is not None:
+ self._send_time_queue.put(time.time())
+ self._rendezvous.consume(self._request)
+
+ def start(self):
+ if self._generic:
+ stream_callable = self._stub.stream_stream(
+ 'grpc.testing.BenchmarkService', 'StreamingCall')
+ else:
+ stream_callable = self._stub.StreamingCall
+ self._rendezvous = stream_callable.event(
+ self._receiver, lambda *args: None, _TIMEOUT)
+
+ def stop(self):
+ self._rendezvous.terminate()
+ self._rendezvous = None
diff --git a/src/python/grpcio/tests/qps/client_runner.py b/src/python/grpcio/tests/qps/client_runner.py
index a36c30ccc0..1ede7d2af1 100644
--- a/src/python/grpcio/tests/qps/client_runner.py
+++ b/src/python/grpcio/tests/qps/client_runner.py
@@ -89,9 +89,9 @@ class ClosedLoopClientRunner(ClientRunner):
def start(self):
self._is_running = True
+ self._client.start()
for _ in xrange(self._request_count):
self._client.send_request()
- self._client.start()
def stop(self):
self._is_running = False
diff --git a/src/python/grpcio/tests/qps/worker_server.py b/src/python/grpcio/tests/qps/worker_server.py
index 0b3acc14e7..1f9af5482c 100644
--- a/src/python/grpcio/tests/qps/worker_server.py
+++ b/src/python/grpcio/tests/qps/worker_server.py
@@ -146,8 +146,9 @@ class WorkerServer(services_pb2.BetaWorkerServiceServicer):
if config.rpc_type == control_pb2.UNARY:
client = benchmark_client.UnarySyncBenchmarkClient(
server, config, qps_data)
- else:
- raise Exception('STREAMING SYNC client not supported')
+ elif config.rpc_type == control_pb2.STREAMING:
+ client = benchmark_client.StreamingSyncBenchmarkClient(
+ server, config, qps_data)
elif config.client_type == control_pb2.ASYNC_CLIENT:
if config.rpc_type == control_pb2.UNARY:
client = benchmark_client.UnaryAsyncBenchmarkClient(
diff --git a/src/python/grpcio/tests/tests.json b/src/python/grpcio/tests/tests.json
index 84870aaa5c..691062f25a 100644
--- a/src/python/grpcio/tests/tests.json
+++ b/src/python/grpcio/tests/tests.json
@@ -28,7 +28,8 @@
"_face_interface_test.GenericInvokerBlockingInvocationInlineServiceTest",
"_face_interface_test.GenericInvokerFutureInvocationAsynchronousEventServiceTest",
"_face_interface_test.MultiCallableInvokerBlockingInvocationInlineServiceTest",
- "_face_interface_test.MultiCallableInvokerFutureInvocationAsynchronousEventServiceTest",
+ "_face_interface_test.MultiCallableInvokerFutureInvocationAsynchronousEventServiceTest",
+ "_health_servicer_test.HealthServicerTest",
"_implementations_test.ChannelCredentialsTest",
"_insecure_interop_test.InsecureInteropTest",
"_intermediary_low_test.CancellationTest",
@@ -50,4 +51,4 @@
"cygrpc_test.InsecureServerInsecureClient",
"cygrpc_test.SecureServerSecureClient",
"cygrpc_test.TypeSmokeTest"
-] \ No newline at end of file
+]
diff --git a/src/python/grpcio/tests/unit/_cython/_channel_test.py b/src/python/grpcio/tests/unit/_cython/_channel_test.py
index 931cd9083e..3dc7a246ae 100644
--- a/src/python/grpcio/tests/unit/_cython/_channel_test.py
+++ b/src/python/grpcio/tests/unit/_cython/_channel_test.py
@@ -60,7 +60,7 @@ def _create_loop_destroy():
def _in_parallel(behavior, arguments):
threads = tuple(
threading.Thread(target=behavior, args=arguments)
- for _ in range(test_constants.PARALLELISM))
+ for _ in range(test_constants.THREAD_CONCURRENCY))
for thread in threads:
thread.start()
for thread in threads:
diff --git a/src/python/grpcio/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio/tests/unit/_cython/cygrpc_test.py
index 876da88de9..0a511101f0 100644
--- a/src/python/grpcio/tests/unit/_cython/cygrpc_test.py
+++ b/src/python/grpcio/tests/unit/_cython/cygrpc_test.py
@@ -40,6 +40,7 @@ from tests.unit import resources
_SSL_HOST_OVERRIDE = 'foo.test.google.fr'
_CALL_CREDENTIALS_METADATA_KEY = 'call-creds-key'
_CALL_CREDENTIALS_METADATA_VALUE = 'call-creds-value'
+_EMPTY_FLAGS = 0
def _metadata_plugin_callback(context, callback):
callback(cygrpc.Metadata(
@@ -76,7 +77,7 @@ class TypeSmokeTest(unittest.TestCase):
def testOperationsIteration(self):
operations = cygrpc.Operations([
- cygrpc.operation_send_message('asdf')])
+ cygrpc.operation_send_message('asdf', _EMPTY_FLAGS)])
iterator = iter(operations)
operation = next(iterator)
self.assertIsInstance(operation, cygrpc.Operation)
@@ -85,6 +86,11 @@ class TypeSmokeTest(unittest.TestCase):
with self.assertRaises(StopIteration):
next(iterator)
+ def testOperationFlags(self):
+ operation = cygrpc.operation_send_message('asdf',
+ cygrpc.WriteFlag.no_compress)
+ self.assertEqual(cygrpc.WriteFlag.no_compress, operation.flags)
+
def testTimespec(self):
now = time.time()
timespec = cygrpc.Timespec(now)
@@ -188,12 +194,13 @@ class InsecureServerInsecureClient(unittest.TestCase):
CLIENT_METADATA_ASCII_VALUE),
cygrpc.Metadatum(CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE)])
client_start_batch_result = client_call.start_batch(cygrpc.Operations([
- cygrpc.operation_send_initial_metadata(client_initial_metadata),
- cygrpc.operation_send_message(REQUEST),
- cygrpc.operation_send_close_from_client(),
- cygrpc.operation_receive_initial_metadata(),
- cygrpc.operation_receive_message(),
- cygrpc.operation_receive_status_on_client()
+ cygrpc.operation_send_initial_metadata(client_initial_metadata,
+ _EMPTY_FLAGS),
+ cygrpc.operation_send_message(REQUEST, _EMPTY_FLAGS),
+ cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
+ cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
+ cygrpc.operation_receive_message(_EMPTY_FLAGS),
+ cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS)
]), client_call_tag)
self.assertEqual(cygrpc.CallError.ok, client_start_batch_result)
client_event_future = test_utilities.CompletionQueuePollFuture(
@@ -223,12 +230,14 @@ class InsecureServerInsecureClient(unittest.TestCase):
cygrpc.Metadatum(SERVER_TRAILING_METADATA_KEY,
SERVER_TRAILING_METADATA_VALUE)])
server_start_batch_result = server_call.start_batch([
- cygrpc.operation_send_initial_metadata(server_initial_metadata),
- cygrpc.operation_receive_message(),
- cygrpc.operation_send_message(RESPONSE),
- cygrpc.operation_receive_close_on_server(),
+ cygrpc.operation_send_initial_metadata(server_initial_metadata,
+ _EMPTY_FLAGS),
+ cygrpc.operation_receive_message(_EMPTY_FLAGS),
+ cygrpc.operation_send_message(RESPONSE, _EMPTY_FLAGS),
+ cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
cygrpc.operation_send_status_from_server(
- server_trailing_metadata, SERVER_STATUS_CODE, SERVER_STATUS_DETAILS)
+ server_trailing_metadata, SERVER_STATUS_CODE,
+ SERVER_STATUS_DETAILS, _EMPTY_FLAGS)
], server_call_tag)
self.assertEqual(cygrpc.CallError.ok, server_start_batch_result)
@@ -349,12 +358,13 @@ class SecureServerSecureClient(unittest.TestCase):
CLIENT_METADATA_ASCII_VALUE),
cygrpc.Metadatum(CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE)])
client_start_batch_result = client_call.start_batch(cygrpc.Operations([
- cygrpc.operation_send_initial_metadata(client_initial_metadata),
- cygrpc.operation_send_message(REQUEST),
- cygrpc.operation_send_close_from_client(),
- cygrpc.operation_receive_initial_metadata(),
- cygrpc.operation_receive_message(),
- cygrpc.operation_receive_status_on_client()
+ cygrpc.operation_send_initial_metadata(client_initial_metadata,
+ _EMPTY_FLAGS),
+ cygrpc.operation_send_message(REQUEST, _EMPTY_FLAGS),
+ cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
+ cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
+ cygrpc.operation_receive_message(_EMPTY_FLAGS),
+ cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS)
]), client_call_tag)
self.assertEqual(cygrpc.CallError.ok, client_start_batch_result)
client_event_future = test_utilities.CompletionQueuePollFuture(
@@ -387,12 +397,14 @@ class SecureServerSecureClient(unittest.TestCase):
cygrpc.Metadatum(SERVER_TRAILING_METADATA_KEY,
SERVER_TRAILING_METADATA_VALUE)])
server_start_batch_result = server_call.start_batch([
- cygrpc.operation_send_initial_metadata(server_initial_metadata),
- cygrpc.operation_receive_message(),
- cygrpc.operation_send_message(RESPONSE),
- cygrpc.operation_receive_close_on_server(),
+ cygrpc.operation_send_initial_metadata(server_initial_metadata,
+ _EMPTY_FLAGS),
+ cygrpc.operation_receive_message(_EMPTY_FLAGS),
+ cygrpc.operation_send_message(RESPONSE, _EMPTY_FLAGS),
+ cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
cygrpc.operation_send_status_from_server(
- server_trailing_metadata, SERVER_STATUS_CODE, SERVER_STATUS_DETAILS)
+ server_trailing_metadata, SERVER_STATUS_CODE,
+ SERVER_STATUS_DETAILS, _EMPTY_FLAGS)
], server_call_tag)
self.assertEqual(cygrpc.CallError.ok, server_start_batch_result)
diff --git a/src/python/grpcio/tests/unit/framework/common/test_constants.py b/src/python/grpcio/tests/unit/framework/common/test_constants.py
index 8d89101e09..b6682d396c 100644
--- a/src/python/grpcio/tests/unit/framework/common/test_constants.py
+++ b/src/python/grpcio/tests/unit/framework/common/test_constants.py
@@ -49,8 +49,13 @@ STREAM_LENGTH = 200
# The size of payloads to transmit in tests.
PAYLOAD_SIZE = 256 * 1024 + 17
-# The parallelism to use in tests of parallel RPCs.
-PARALLELISM = 200
+# The concurrency to use in tests of concurrent RPCs that will not create as
+# many threads as RPCs.
+RPC_CONCURRENCY = 200
+
+# The concurrency to use in tests of concurrent RPCs that will create as many
+# threads as RPCs.
+THREAD_CONCURRENCY = 25
# The size of thread pools to use in tests.
POOL_SIZE = 10
diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
index 649892463a..e338aaa396 100644
--- a/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
+++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
@@ -146,13 +146,13 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.
test_messages.verify(second_request, second_response, self)
def testParallelInvocations(self):
- pool = logging_pool.pool(test_constants.PARALLELISM)
+ pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
for (group, method), test_messages_sequence in (
six.iteritems(self._digest.unary_unary_messages_sequences)):
for test_messages in test_messages_sequence:
requests = []
response_futures = []
- for _ in range(test_constants.PARALLELISM):
+ for _ in range(test_constants.THREAD_CONCURRENCY):
request = test_messages.request()
response_future = pool.submit(
self._invoker.blocking(group, method), request,
@@ -168,13 +168,13 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.
pool.shutdown(wait=True)
def testWaitingForSomeButNotAllParallelInvocations(self):
- pool = logging_pool.pool(test_constants.PARALLELISM)
+ pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
for (group, method), test_messages_sequence in (
six.iteritems(self._digest.unary_unary_messages_sequences)):
for test_messages in test_messages_sequence:
requests = []
response_futures_to_indices = {}
- for index in range(test_constants.PARALLELISM):
+ for index in range(test_constants.THREAD_CONCURRENCY):
request = test_messages.request()
response_future = pool.submit(
self._invoker.blocking(group, method), request,
@@ -184,7 +184,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.
some_completed_response_futures_iterator = itertools.islice(
futures.as_completed(response_futures_to_indices),
- test_constants.PARALLELISM // 2)
+ test_constants.THREAD_CONCURRENCY // 2)
for response_future in some_completed_response_futures_iterator:
index = response_futures_to_indices[response_future]
test_messages.verify(requests[index], response_future.result(), self)
diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
index c3813d5f3a..791620307b 100644
--- a/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
+++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
@@ -249,7 +249,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.
for test_messages in test_messages_sequence:
requests = []
response_futures = []
- for _ in range(test_constants.PARALLELISM):
+ for _ in range(test_constants.THREAD_CONCURRENCY):
request = test_messages.request()
response_future = self._invoker.future(group, method)(
request, test_constants.LONG_TIMEOUT)
@@ -263,13 +263,13 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.
test_messages.verify(request, response, self)
def testWaitingForSomeButNotAllParallelInvocations(self):
- pool = logging_pool.pool(test_constants.PARALLELISM)
+ pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY)
for (group, method), test_messages_sequence in (
six.iteritems(self._digest.unary_unary_messages_sequences)):
for test_messages in test_messages_sequence:
requests = []
response_futures_to_indices = {}
- for index in range(test_constants.PARALLELISM):
+ for index in range(test_constants.THREAD_CONCURRENCY):
request = test_messages.request()
inner_response_future = self._invoker.future(group, method)(
request, test_constants.LONG_TIMEOUT)
@@ -279,7 +279,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest.
some_completed_response_futures_iterator = itertools.islice(
futures.as_completed(response_futures_to_indices),
- test_constants.PARALLELISM // 2)
+ test_constants.THREAD_CONCURRENCY // 2)
for response_future in some_completed_response_futures_iterator:
index = response_futures_to_indices[response_future]
test_messages.verify(requests[index], response_future.result(), self)
diff --git a/src/python/grpcio_health_checking/.gitignore b/src/python/grpcio_health_checking/.gitignore
new file mode 100644
index 0000000000..85af466886
--- /dev/null
+++ b/src/python/grpcio_health_checking/.gitignore
@@ -0,0 +1,5 @@
+*.proto
+*_pb2.py
+build/
+grpcio_health_checking.egg-info/
+dist/
diff --git a/src/python/grpcio_health_checking/MANIFEST.in b/src/python/grpcio_health_checking/MANIFEST.in
index 498b55f20a..7d26647697 100644
--- a/src/python/grpcio_health_checking/MANIFEST.in
+++ b/src/python/grpcio_health_checking/MANIFEST.in
@@ -1,2 +1,3 @@
-graft grpc
-include commands.py
+include health_commands.py
+graft grpc_health
+global-exclude *.pyc
diff --git a/src/python/grpcio_health_checking/grpc/health/v1/health.py b/src/python/grpcio_health_checking/grpc/health/v1/health.py
deleted file mode 100644
index 4b5af15aa6..0000000000
--- a/src/python/grpcio_health_checking/grpc/health/v1/health.py
+++ /dev/null
@@ -1,129 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Reference implementation for health checking in gRPC Python."""
-
-import abc
-import enum
-import threading
-
-from grpc.health.v1 import health_pb2
-
-
-@enum.unique
-class HealthStatus(enum.Enum):
- """Statuses for a service mirroring the reference health.proto's values."""
- UNKNOWN = health_pb2.HealthCheckResponse.UNKNOWN
- SERVING = health_pb2.HealthCheckResponse.SERVING
- NOT_SERVING = health_pb2.HealthCheckResponse.NOT_SERVING
-
-
-class _HealthServicer(health_pb2.EarlyAdopterHealthServicer):
- """Servicer handling RPCs for service statuses."""
-
- def __init__(self):
- self._server_status_lock = threading.Lock()
- self._server_status = {}
-
- def Check(self, request, context):
- with self._server_status_lock:
- if request.service not in self._server_status:
- # TODO(atash): once the Python API has a way of setting the server
- # status, bring us into conformance with the health check spec by
- # returning the NOT_FOUND status here.
- raise NotImplementedError()
- else:
- return health_pb2.HealthCheckResponse(
- status=self._server_status[request.service].value)
-
- def set(service, status):
- if not isinstance(status, HealthStatus):
- raise TypeError('expected grpc.health.v1.health.HealthStatus '
- 'for argument `status` but got {}'.format(status))
- with self._server_status_lock:
- self._server_status[service] = status
-
-
-class HealthServer(health_pb2.EarlyAdopterHealthServer):
- """Interface for the reference gRPC Python health server."""
- __metaclass__ = abc.ABCMeta
-
- @abc.abstractmethod
- def start(self):
- raise NotImplementedError()
-
- @abc.abstractmethod
- def stop(self):
- raise NotImplementedError()
-
- @abc.abstractmethod
- def set(self, service, status):
- """Set the status of the given service.
-
- Args:
- service (str): service name of the service to set the reported status of
- status (HealthStatus): status to set for the specified service
- """
- raise NotImplementedError()
-
-
-class _HealthServerImplementation(HealthServer):
- """Implementation for the reference gRPC Python health server."""
-
- def __init__(self, server, servicer):
- self._server = server
- self._servicer = servicer
-
- def start(self):
- self._server.start()
-
- def stop(self):
- self._server.stop()
-
- def set(self, service, status):
- self._servicer.set(service, status)
-
-
-def create_Health_server(port, private_key=None, certificate_chain=None):
- """Get a HealthServer instance.
-
- Args:
- port (int): port number passed through to health_pb2 server creation
- routine.
- private_key (str): to-be-created server's desired private key
- certificate_chain (str): to-be-created server's desired certificate chain
-
- Returns:
- An instance of HealthServer (conforming thus to
- EarlyAdopterHealthServer and providing a method to set server status)."""
- servicer = _HealthServicer()
- server = health_pb2.early_adopter_create_Health_server(
- servicer, port=port, private_key=private_key,
- certificate_chain=certificate_chain)
- return _HealthServerImplementation(server, servicer)
diff --git a/src/python/grpcio_health_checking/grpc/__init__.py b/src/python/grpcio_health_checking/grpc_health/__init__.py
index 7086519106..7086519106 100644
--- a/src/python/grpcio_health_checking/grpc/__init__.py
+++ b/src/python/grpcio_health_checking/grpc_health/__init__.py
diff --git a/src/python/grpcio_health_checking/grpc/health/__init__.py b/src/python/grpcio_health_checking/grpc_health/health/__init__.py
index 7086519106..7086519106 100644
--- a/src/python/grpcio_health_checking/grpc/health/__init__.py
+++ b/src/python/grpcio_health_checking/grpc_health/health/__init__.py
diff --git a/src/python/grpcio_health_checking/grpc/health/v1/__init__.py b/src/python/grpcio_health_checking/grpc_health/health/v1/__init__.py
index 7086519106..7086519106 100644
--- a/src/python/grpcio_health_checking/grpc/health/v1/__init__.py
+++ b/src/python/grpcio_health_checking/grpc_health/health/v1/__init__.py
diff --git a/src/python/grpcio_health_checking/grpc_health/health/v1/health.py b/src/python/grpcio_health_checking/grpc_health/health/v1/health.py
new file mode 100644
index 0000000000..8da60c70cb
--- /dev/null
+++ b/src/python/grpcio_health_checking/grpc_health/health/v1/health.py
@@ -0,0 +1,66 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Reference implementation for health checking in gRPC Python."""
+
+import threading
+
+from grpc_health.health.v1 import health_pb2
+
+
+class HealthServicer(health_pb2.BetaHealthServicer):
+ """Servicer handling RPCs for service statuses."""
+
+ def __init__(self):
+ self._server_status_lock = threading.Lock()
+ self._server_status = {}
+
+ def Check(self, request, context):
+ with self._server_status_lock:
+ if request.service not in self._server_status:
+ # TODO(atash): once the Python API has a way of setting the server
+ # status, bring us into conformance with the health check spec by
+ # returning the NOT_FOUND status here.
+ raise NotImplementedError()
+ else:
+ return health_pb2.HealthCheckResponse(
+ status=self._server_status[request.service])
+
+ def set(self, service, status):
+ """Sets the status of a service.
+
+ Args:
+ service: string, the name of the service.
+ NOTE, '' must be set.
+ status: HealthCheckResponse.status enum value indicating
+ the status of the service
+ """
+ with self._server_status_lock:
+ self._server_status[service] = status
+
diff --git a/src/python/grpcio_health_checking/commands.py b/src/python/grpcio_health_checking/health_commands.py
index 3f4ea6e22f..631066f331 100644
--- a/src/python/grpcio_health_checking/commands.py
+++ b/src/python/grpcio_health_checking/health_commands.py
@@ -33,11 +33,16 @@ import distutils
import glob
import os
import os.path
+import shutil
import subprocess
import sys
import setuptools
from setuptools.command import build_py
+from setuptools.command import sdist
+
+ROOT_DIR = os.path.abspath(os.path.dirname(os.path.abspath(__file__)))
+HEALTH_PROTO = os.path.join(ROOT_DIR, '../../proto/grpc/health/v1/health.proto')
class BuildProtoModules(setuptools.Command):
@@ -76,9 +81,34 @@ class BuildProtoModules(setuptools.Command):
raise Exception('{}\nOutput:\n{}'.format(e.message, e.output))
+class CopyProtoModules(setuptools.Command):
+ """Command to copy proto modules from grpc/src/proto."""
+
+ def initialize_options(self):
+ pass
+
+ def finalize_options(self):
+ pass
+
+ def run(self):
+ if os.path.isfile(HEALTH_PROTO):
+ shutil.copyfile(
+ HEALTH_PROTO,
+ os.path.join(ROOT_DIR, 'grpc_health/health/v1/health.proto'))
+
+
class BuildPy(build_py.build_py):
"""Custom project build command."""
def run(self):
+ self.run_command('copy_proto_modules')
self.run_command('build_proto_modules')
build_py.build_py.run(self)
+
+
+class SDist(sdist.sdist):
+ """Custom project build command."""
+
+ def run(self):
+ self.run_command('copy_proto_modules')
+ sdist.sdist.run(self)
diff --git a/src/python/grpcio_health_checking/setup.py b/src/python/grpcio_health_checking/setup.py
index 35253ba312..d68a7ced8e 100644
--- a/src/python/grpcio_health_checking/setup.py
+++ b/src/python/grpcio_health_checking/setup.py
@@ -40,7 +40,7 @@ import setuptools
os.chdir(os.path.dirname(os.path.abspath(__file__)))
# Break import-style to ensure we can actually find our commands module.
-import commands
+import health_commands
_PACKAGES = (
setuptools.find_packages('.')
@@ -51,22 +51,21 @@ _PACKAGE_DIRECTORIES = {
}
_INSTALL_REQUIRES = (
- 'grpcio>=0.11.0b0',
+ 'grpcio>=0.13.1',
)
-_SETUP_REQUIRES = _INSTALL_REQUIRES
-
_COMMAND_CLASS = {
- 'build_proto_modules': commands.BuildProtoModules,
- 'build_py': commands.BuildPy,
+ 'copy_proto_modules': health_commands.CopyProtoModules,
+ 'build_proto_modules': health_commands.BuildProtoModules,
+ 'build_py': health_commands.BuildPy,
+ 'sdist': health_commands.SDist,
}
setuptools.setup(
name='grpcio_health_checking',
- version='0.11.0b0',
+ version='0.14.0b0',
packages=list(_PACKAGES),
package_dir=_PACKAGE_DIRECTORIES,
install_requires=_INSTALL_REQUIRES,
- setup_requires=_SETUP_REQUIRES,
cmdclass=_COMMAND_CLASS
)
diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c
index 4bb615f8be..b6ddbe88dc 100644
--- a/src/ruby/ext/grpc/rb_completion_queue.c
+++ b/src/ruby/ext/grpc/rb_completion_queue.c
@@ -52,21 +52,41 @@ typedef struct next_call_stack {
grpc_event event;
gpr_timespec timeout;
void *tag;
+ volatile int interrupted;
} next_call_stack;
/* Calls grpc_completion_queue_next without holding the ruby GIL */
static void *grpc_rb_completion_queue_next_no_gil(void *param) {
next_call_stack *const next_call = (next_call_stack*)param;
- next_call->event =
- grpc_completion_queue_next(next_call->cq, next_call->timeout, NULL);
+ gpr_timespec increment = gpr_time_from_millis(20, GPR_TIMESPAN);
+ gpr_timespec deadline;
+ do {
+ deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), increment);
+ next_call->event = grpc_completion_queue_next(next_call->cq,
+ deadline, NULL);
+ if (next_call->event.type != GRPC_QUEUE_TIMEOUT ||
+ gpr_time_cmp(deadline, next_call->timeout) > 0) {
+ break;
+ }
+ } while (!next_call->interrupted);
return NULL;
}
/* Calls grpc_completion_queue_pluck without holding the ruby GIL */
static void *grpc_rb_completion_queue_pluck_no_gil(void *param) {
next_call_stack *const next_call = (next_call_stack*)param;
- next_call->event = grpc_completion_queue_pluck(next_call->cq, next_call->tag,
- next_call->timeout, NULL);
+ gpr_timespec increment = gpr_time_from_millis(20, GPR_TIMESPAN);
+ gpr_timespec deadline;
+ do {
+ deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), increment);
+ next_call->event = grpc_completion_queue_pluck(next_call->cq,
+ next_call->tag,
+ deadline, NULL);
+ if (next_call->event.type != GRPC_QUEUE_TIMEOUT ||
+ gpr_time_cmp(deadline, next_call->timeout) > 0) {
+ break;
+ }
+ } while (!next_call->interrupted);
return NULL;
}
@@ -139,6 +159,11 @@ static VALUE grpc_rb_completion_queue_alloc(VALUE cls) {
return TypedData_Wrap_Struct(cls, &grpc_rb_completion_queue_data_type, cq);
}
+static void unblock_func(void *param) {
+ next_call_stack *const next_call = (next_call_stack*)param;
+ next_call->interrupted = 1;
+}
+
/* Blocks until the next event for given tag is available, and returns the
* event. */
grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag,
@@ -158,8 +183,23 @@ grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag,
next_call.tag = ROBJECT(tag);
}
next_call.event.type = GRPC_QUEUE_TIMEOUT;
- rb_thread_call_without_gvl(grpc_rb_completion_queue_pluck_no_gil,
- (void *)&next_call, NULL, NULL);
+ /* Loop until we finish a pluck without an interruption. The internal
+ pluck function runs either until it is interrupted or it gets an
+ event, or time runs out.
+
+ The basic reason we need this relatively complicated construction is that
+ we need to re-acquire the GVL when an interrupt comes in, so that the ruby
+ interpreter can do what it needs to do with the interrupt. But we also need
+ to get back to plucking when the interrupt has been handled. */
+ do {
+ next_call.interrupted = 0;
+ rb_thread_call_without_gvl(grpc_rb_completion_queue_pluck_no_gil,
+ (void *)&next_call, unblock_func,
+ (void *)&next_call);
+ /* If an interrupt prevented pluck from returning useful information, then
+ any plucks that did complete must have timed out */
+ } while (next_call.interrupted &&
+ next_call.event.type == GRPC_QUEUE_TIMEOUT);
return next_call.event;
}
diff --git a/src/ruby/ext/grpc/rb_completion_queue.h b/src/ruby/ext/grpc/rb_completion_queue.h
index 6cc4e96589..42de43c3fb 100644
--- a/src/ruby/ext/grpc/rb_completion_queue.h
+++ b/src/ruby/ext/grpc/rb_completion_queue.h
@@ -46,7 +46,7 @@ grpc_completion_queue *grpc_rb_get_wrapped_completion_queue(VALUE v);
*
* This avoids having code that holds the GIL repeated at multiple sites.
*/
-grpc_event grpc_rb_completion_queue_pluck_event(VALUE cqueue, VALUE tag,
+grpc_event grpc_rb_completion_queue_pluck_event(VALUE self, VALUE tag,
VALUE timeout);
/* Initializes the CompletionQueue class. */
diff --git a/src/ruby/ext/grpc/rb_grpc.c b/src/ruby/ext/grpc/rb_grpc.c
index 5277148fc9..06a07ac646 100644
--- a/src/ruby/ext/grpc/rb_grpc.c
+++ b/src/ruby/ext/grpc/rb_grpc.c
@@ -50,7 +50,6 @@
#include "rb_loader.h"
#include "rb_server.h"
#include "rb_server_credentials.h"
-#include "rb_signal.h"
static VALUE grpc_rb_cTimeVal = Qnil;
@@ -333,7 +332,6 @@ void Init_grpc_c() {
Init_grpc_channel_credentials();
Init_grpc_server();
Init_grpc_server_credentials();
- Init_grpc_signals();
Init_grpc_status_codes();
Init_grpc_time_consts();
}
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
index bc43f9d36b..cebbe8c40f 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
@@ -125,6 +125,7 @@ grpc_header_key_is_legal_type grpc_header_key_is_legal_import;
grpc_header_nonbin_value_is_legal_type grpc_header_nonbin_value_is_legal_import;
grpc_is_binary_header_type grpc_is_binary_header_import;
grpc_call_error_to_string_type grpc_call_error_to_string_import;
+grpc_cronet_secure_channel_create_type grpc_cronet_secure_channel_create_import;
grpc_auth_property_iterator_next_type grpc_auth_property_iterator_next_import;
grpc_auth_context_property_iterator_type grpc_auth_context_property_iterator_import;
grpc_auth_context_peer_identity_type grpc_auth_context_peer_identity_import;
@@ -391,6 +392,7 @@ void grpc_rb_load_imports(HMODULE library) {
grpc_header_nonbin_value_is_legal_import = (grpc_header_nonbin_value_is_legal_type) GetProcAddress(library, "grpc_header_nonbin_value_is_legal");
grpc_is_binary_header_import = (grpc_is_binary_header_type) GetProcAddress(library, "grpc_is_binary_header");
grpc_call_error_to_string_import = (grpc_call_error_to_string_type) GetProcAddress(library, "grpc_call_error_to_string");
+ grpc_cronet_secure_channel_create_import = (grpc_cronet_secure_channel_create_type) GetProcAddress(library, "grpc_cronet_secure_channel_create");
grpc_auth_property_iterator_next_import = (grpc_auth_property_iterator_next_type) GetProcAddress(library, "grpc_auth_property_iterator_next");
grpc_auth_context_property_iterator_import = (grpc_auth_context_property_iterator_type) GetProcAddress(library, "grpc_auth_context_property_iterator");
grpc_auth_context_peer_identity_import = (grpc_auth_context_peer_identity_type) GetProcAddress(library, "grpc_auth_context_peer_identity");
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
index b67361ca25..d7ea6c574c 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
@@ -43,6 +43,7 @@
#include <grpc/census.h>
#include <grpc/compression.h>
#include <grpc/grpc.h>
+#include <grpc/grpc_cronet.h>
#include <grpc/grpc_security.h>
#include <grpc/impl/codegen/alloc.h>
#include <grpc/impl/codegen/byte_buffer.h>
@@ -325,6 +326,9 @@ extern grpc_is_binary_header_type grpc_is_binary_header_import;
typedef const char *(*grpc_call_error_to_string_type)(grpc_call_error error);
extern grpc_call_error_to_string_type grpc_call_error_to_string_import;
#define grpc_call_error_to_string grpc_call_error_to_string_import
+typedef grpc_channel *(*grpc_cronet_secure_channel_create_type)(void *engine, const char *target, const grpc_channel_args *args, void *reserved);
+extern grpc_cronet_secure_channel_create_type grpc_cronet_secure_channel_create_import;
+#define grpc_cronet_secure_channel_create grpc_cronet_secure_channel_create_import
typedef const grpc_auth_property *(*grpc_auth_property_iterator_next_type)(grpc_auth_property_iterator *it);
extern grpc_auth_property_iterator_next_type grpc_auth_property_iterator_next_import;
#define grpc_auth_property_iterator_next grpc_auth_property_iterator_next_import
diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c
index 2b3acaaf59..0899feb685 100644
--- a/src/ruby/ext/grpc/rb_server.c
+++ b/src/ruby/ext/grpc/rb_server.c
@@ -60,6 +60,7 @@ typedef struct grpc_rb_server {
VALUE mark;
/* The actual server */
grpc_server *wrapped;
+ grpc_completion_queue *queue;
} grpc_rb_server;
/* Destroys server instances. */
@@ -145,6 +146,7 @@ static VALUE grpc_rb_server_init(VALUE self, VALUE cqueue, VALUE channel_args) {
}
grpc_server_register_completion_queue(srv, cq, NULL);
wrapper->wrapped = srv;
+ wrapper->queue = cq;
/* Add the cq as the server's mark object. This ensures the ruby cq can't be
GCed before the server */
diff --git a/src/ruby/lib/grpc.rb b/src/ruby/lib/grpc.rb
index 7c9aae30e9..79fa705b1c 100644
--- a/src/ruby/lib/grpc.rb
+++ b/src/ruby/lib/grpc.rb
@@ -33,7 +33,6 @@ require_relative 'grpc/errors'
require_relative 'grpc/grpc'
require_relative 'grpc/logconfig'
require_relative 'grpc/notifier'
-require_relative 'grpc/signals'
require_relative 'grpc/version'
require_relative 'grpc/core/time_consts'
require_relative 'grpc/generic/active_call'
@@ -48,5 +47,3 @@ begin
ensure
file.close
end
-
-GRPC::Signals.wait_for_signals
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index fd20a86144..7fe588bd4c 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -30,7 +30,6 @@
require 'forwardable'
require 'weakref'
require_relative 'bidi_call'
-require_relative '../signals'
class Struct
# BatchResult is the struct returned by calls to call#start_batch.
@@ -123,10 +122,6 @@ module GRPC
@unmarshal = unmarshal
@metadata_tag = metadata_tag
@op_notifier = nil
- weak_self = WeakRef.new(self)
- remove_handler = GRPC::Signals.register_handler(&weak_self
- .method(:cancel))
- ObjectSpace.define_finalizer(self, remove_handler)
end
# output_metadata are provides access to hash that can be used to
diff --git a/src/ruby/lib/grpc/generic/client_stub.rb b/src/ruby/lib/grpc/generic/client_stub.rb
index 68e167a69f..12946fe819 100644
--- a/src/ruby/lib/grpc/generic/client_stub.rb
+++ b/src/ruby/lib/grpc/generic/client_stub.rb
@@ -49,7 +49,12 @@ module GRPC
fail(TypeError, '!Channel') unless alt_chan.is_a?(Core::Channel)
return alt_chan
end
- kw['grpc.primary_user_agent'] = "grpc-ruby/#{VERSION}"
+ if kw['grpc.primary_user_agent'].nil?
+ kw['grpc.primary_user_agent'] = ''
+ else
+ kw['grpc.primary_user_agent'] += ' '
+ end
+ kw['grpc.primary_user_agent'] += "grpc-ruby/#{VERSION}"
unless creds.is_a?(Core::ChannelCredentials) || creds.is_a?(Symbol)
fail(TypeError, '!ChannelCredentials or Symbol')
end
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index 238aaa9656..e1496d491a 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -28,7 +28,6 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
require_relative '../grpc'
-require_relative '../signals'
require_relative 'active_call'
require_relative 'service'
require 'thread'
@@ -353,10 +352,7 @@ module GRPC
transition_running_state(:running)
@run_cond.broadcast
end
- remove_signal_handler = GRPC::Signals.register_handler { stop }
loop_handle_server_calls
- # Remove signal handler when server stops
- remove_signal_handler.call
end
alias_method :run_till_terminated, :run
diff --git a/src/ruby/tools/grpc-tools.gemspec b/src/ruby/tools/grpc-tools.gemspec
index af904de4a9..9fa4b66392 100644
--- a/src/ruby/tools/grpc-tools.gemspec
+++ b/src/ruby/tools/grpc-tools.gemspec
@@ -18,5 +18,5 @@ Gem::Specification.new do |s|
s.platform = Gem::Platform::RUBY
- s.executables = %w( protoc.rb protoc_grpc_ruby_plugin.rb )
+ s.executables = %w( grpc_tools_ruby_protoc.rb grpc_tools_ruby_protoc_plugin.rb )
end