aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/compiler/python_generator.cc110
-rw-r--r--src/core/ext/transport/cronet/client/secure/cronet_channel_create.c69
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_api_dummy.c85
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c640
-rw-r--r--src/core/lib/channel/channel_args.c9
-rw-r--r--src/core/lib/http/parser.c3
-rw-r--r--src/core/lib/iomgr/ev_poll_and_epoll_posix.c2
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.c1212
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.h41
-rw-r--r--src/core/lib/iomgr/ev_posix.c93
-rw-r--r--src/core/lib/iomgr/iomgr_posix.c6
-rw-r--r--src/core/lib/iomgr/udp_server.c18
-rw-r--r--src/core/lib/iomgr/udp_server.h6
-rw-r--r--src/cpp/common/channel_arguments.cc2
-rw-r--r--src/cpp/server/server_builder.cc2
-rw-r--r--src/csharp/build_packages.bat29
-rw-r--r--src/csharp/buildall.bat29
-rw-r--r--src/php/ext/grpc/call.c1
-rw-r--r--src/proto/grpc/reflection/v1alpha/reflection.proto151
-rw-r--r--src/python/grpcio/grpc/_cython/imports.generated.c2
-rw-r--r--src/python/grpcio/grpc/_cython/imports.generated.h4
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py4
-rw-r--r--src/python/grpcio/tests/protoc_plugin/beta_python_plugin_test.py295
-rw-r--r--src/python/grpcio/tests/protoc_plugin/protos/payload/test_payload.proto51
-rw-r--r--src/python/grpcio/tests/protoc_plugin/protos/requests/r/test_requests.proto77
-rw-r--r--src/python/grpcio/tests/protoc_plugin/protos/responses/test_responses.proto47
-rw-r--r--src/python/grpcio/tests/protoc_plugin/protos/service/test_service.proto (renamed from src/python/grpcio/tests/protoc_plugin/protoc_plugin_test.proto)85
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.c2
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.h4
29 files changed, 2778 insertions, 301 deletions
diff --git a/src/compiler/python_generator.cc b/src/compiler/python_generator.cc
index 8e76e6dce6..cd5ddd8832 100644
--- a/src/compiler/python_generator.cc
+++ b/src/compiler/python_generator.cc
@@ -147,7 +147,8 @@ class IndentScope {
// END FORMATTING BOILERPLATE //
////////////////////////////////
-// TODO(protobuf team): Export `ModuleName` from protobuf's
+// TODO(https://github.com/google/protobuf/issues/888):
+// Export `ModuleName` from protobuf's
// `src/google/protobuf/compiler/python/python_generator.cc` file.
grpc::string ModuleName(const grpc::string& filename) {
grpc::string basename = StripProto(filename);
@@ -156,8 +157,23 @@ grpc::string ModuleName(const grpc::string& filename) {
return basename + "_pb2";
}
+// TODO(https://github.com/google/protobuf/issues/888):
+// Export `ModuleAlias` from protobuf's
+// `src/google/protobuf/compiler/python/python_generator.cc` file.
+grpc::string ModuleAlias(const grpc::string& filename) {
+ grpc::string module_name = ModuleName(filename);
+ // We can't have dots in the module name, so we replace each with _dot_.
+ // But that could lead to a collision between a.b and a_dot_b, so we also
+ // duplicate each underscore.
+ module_name = StringReplace(module_name, "_", "__");
+ module_name = StringReplace(module_name, ".", "_dot_");
+ return module_name;
+}
+
+
bool GetModuleAndMessagePath(const Descriptor* type,
- pair<grpc::string, grpc::string>* out) {
+ const ServiceDescriptor* service,
+ grpc::string* out) {
const Descriptor* path_elem_type = type;
vector<const Descriptor*> message_path;
do {
@@ -170,7 +186,9 @@ bool GetModuleAndMessagePath(const Descriptor* type,
file_name.find_last_of(".proto") == file_name.size() - 1)) {
return false;
}
- grpc::string module = ModuleName(file_name);
+ grpc::string service_file_name = service->file()->name();
+ grpc::string module = service_file_name == file_name ?
+ "" : ModuleAlias(file_name) + ".";
grpc::string message_type;
for (auto path_iter = message_path.rbegin();
path_iter != message_path.rend(); ++path_iter) {
@@ -178,7 +196,7 @@ bool GetModuleAndMessagePath(const Descriptor* type,
}
// no pop_back prior to C++11
message_type.resize(message_type.size() - 1);
- *out = make_pair(module, message_type);
+ *out = module + message_type;
return true;
}
@@ -210,7 +228,7 @@ static void PrintAllComments(const DescriptorType* desc, Printer* printer) {
bool PrintBetaServicer(const ServiceDescriptor* service,
Printer* out) {
- out->Print("\n");
+ out->Print("\n\n");
out->Print("class Beta$Service$Servicer(object):\n", "Service",
service->name());
{
@@ -234,7 +252,7 @@ bool PrintBetaServicer(const ServiceDescriptor* service,
bool PrintBetaStub(const ServiceDescriptor* service,
Printer* out) {
- out->Print("\n");
+ out->Print("\n\n");
out->Print("class Beta$Service$Stub(object):\n", "Service", service->name());
{
IndentScope raii_class_indent(out);
@@ -244,7 +262,7 @@ bool PrintBetaStub(const ServiceDescriptor* service,
grpc::string arg_name = meth->client_streaming() ?
"request_iterator" : "request";
auto methdict = ListToDict({"Method", meth->name(), "ArgName", arg_name});
- out->Print(methdict, "def $Method$(self, $ArgName$, timeout):\n");
+ out->Print(methdict, "def $Method$(self, $ArgName$, timeout, metadata=None, with_call=False, protocol_options=None):\n");
{
IndentScope raii_method_indent(out);
PrintAllComments(meth, out);
@@ -260,38 +278,31 @@ bool PrintBetaStub(const ServiceDescriptor* service,
bool PrintBetaServerFactory(const grpc::string& package_qualified_service_name,
const ServiceDescriptor* service, Printer* out) {
- out->Print("\n");
+ out->Print("\n\n");
out->Print("def beta_create_$Service$_server(servicer, pool=None, "
"pool_size=None, default_timeout=None, maximum_timeout=None):\n",
"Service", service->name());
{
IndentScope raii_create_server_indent(out);
map<grpc::string, grpc::string> method_implementation_constructors;
- map<grpc::string, pair<grpc::string, grpc::string>>
- input_message_modules_and_classes;
- map<grpc::string, pair<grpc::string, grpc::string>>
- output_message_modules_and_classes;
+ map<grpc::string, grpc::string> input_message_modules_and_classes;
+ map<grpc::string, grpc::string> output_message_modules_and_classes;
for (int i = 0; i < service->method_count(); ++i) {
const MethodDescriptor* method = service->method(i);
const grpc::string method_implementation_constructor =
grpc::string(method->client_streaming() ? "stream_" : "unary_") +
grpc::string(method->server_streaming() ? "stream_" : "unary_") +
"inline";
- pair<grpc::string, grpc::string> input_message_module_and_class;
- if (!GetModuleAndMessagePath(method->input_type(),
+ grpc::string input_message_module_and_class;
+ if (!GetModuleAndMessagePath(method->input_type(), service,
&input_message_module_and_class)) {
return false;
}
- pair<grpc::string, grpc::string> output_message_module_and_class;
- if (!GetModuleAndMessagePath(method->output_type(),
+ grpc::string output_message_module_and_class;
+ if (!GetModuleAndMessagePath(method->output_type(), service,
&output_message_module_and_class)) {
return false;
}
- // Import the modules that define the messages used in RPCs.
- out->Print("import $Module$\n", "Module",
- input_message_module_and_class.first);
- out->Print("import $Module$\n", "Module",
- output_message_module_and_class.first);
method_implementation_constructors.insert(
make_pair(method->name(), method_implementation_constructor));
input_message_modules_and_classes.insert(
@@ -307,13 +318,11 @@ bool PrintBetaServerFactory(const grpc::string& package_qualified_service_name,
name_and_input_module_class_pair++) {
IndentScope raii_indent(out);
out->Print("(\'$PackageQualifiedServiceName$\', \'$MethodName$\'): "
- "$InputTypeModule$.$InputTypeClass$.FromString,\n",
+ "$InputTypeModuleAndClass$.FromString,\n",
"PackageQualifiedServiceName", package_qualified_service_name,
"MethodName", name_and_input_module_class_pair->first,
- "InputTypeModule",
- name_and_input_module_class_pair->second.first,
- "InputTypeClass",
- name_and_input_module_class_pair->second.second);
+ "InputTypeModuleAndClass",
+ name_and_input_module_class_pair->second);
}
out->Print("}\n");
out->Print("response_serializers = {\n");
@@ -324,13 +333,11 @@ bool PrintBetaServerFactory(const grpc::string& package_qualified_service_name,
name_and_output_module_class_pair++) {
IndentScope raii_indent(out);
out->Print("(\'$PackageQualifiedServiceName$\', \'$MethodName$\'): "
- "$OutputTypeModule$.$OutputTypeClass$.SerializeToString,\n",
+ "$OutputTypeModuleAndClass$.SerializeToString,\n",
"PackageQualifiedServiceName", package_qualified_service_name,
"MethodName", name_and_output_module_class_pair->first,
- "OutputTypeModule",
- name_and_output_module_class_pair->second.first,
- "OutputTypeClass",
- name_and_output_module_class_pair->second.second);
+ "OutputTypeModuleAndClass",
+ name_and_output_module_class_pair->second);
}
out->Print("}\n");
out->Print("method_implementations = {\n");
@@ -366,37 +373,30 @@ bool PrintBetaStubFactory(const grpc::string& package_qualified_service_name,
map<grpc::string, grpc::string> dict = ListToDict({
"Service", service->name(),
});
- out->Print("\n");
+ out->Print("\n\n");
out->Print(dict, "def beta_create_$Service$_stub(channel, host=None,"
" metadata_transformer=None, pool=None, pool_size=None):\n");
{
IndentScope raii_create_server_indent(out);
map<grpc::string, grpc::string> method_cardinalities;
- map<grpc::string, pair<grpc::string, grpc::string>>
- input_message_modules_and_classes;
- map<grpc::string, pair<grpc::string, grpc::string>>
- output_message_modules_and_classes;
+ map<grpc::string, grpc::string> input_message_modules_and_classes;
+ map<grpc::string, grpc::string> output_message_modules_and_classes;
for (int i = 0; i < service->method_count(); ++i) {
const MethodDescriptor* method = service->method(i);
const grpc::string method_cardinality =
grpc::string(method->client_streaming() ? "STREAM" : "UNARY") +
"_" +
- grpc::string(method->server_streaming() ? "STREAM" : "UNARY");
- pair<grpc::string, grpc::string> input_message_module_and_class;
- if (!GetModuleAndMessagePath(method->input_type(),
+ grpc::string(method->server_streaming() ? "STREAM" : "UNARY");
+ grpc::string input_message_module_and_class;
+ if (!GetModuleAndMessagePath(method->input_type(), service,
&input_message_module_and_class)) {
return false;
}
- pair<grpc::string, grpc::string> output_message_module_and_class;
- if (!GetModuleAndMessagePath(method->output_type(),
+ grpc::string output_message_module_and_class;
+ if (!GetModuleAndMessagePath(method->output_type(), service,
&output_message_module_and_class)) {
return false;
}
- // Import the modules that define the messages used in RPCs.
- out->Print("import $Module$\n", "Module",
- input_message_module_and_class.first);
- out->Print("import $Module$\n", "Module",
- output_message_module_and_class.first);
method_cardinalities.insert(
make_pair(method->name(), method_cardinality));
input_message_modules_and_classes.insert(
@@ -412,13 +412,11 @@ bool PrintBetaStubFactory(const grpc::string& package_qualified_service_name,
name_and_input_module_class_pair++) {
IndentScope raii_indent(out);
out->Print("(\'$PackageQualifiedServiceName$\', \'$MethodName$\'): "
- "$InputTypeModule$.$InputTypeClass$.SerializeToString,\n",
+ "$InputTypeModuleAndClass$.SerializeToString,\n",
"PackageQualifiedServiceName", package_qualified_service_name,
"MethodName", name_and_input_module_class_pair->first,
- "InputTypeModule",
- name_and_input_module_class_pair->second.first,
- "InputTypeClass",
- name_and_input_module_class_pair->second.second);
+ "InputTypeModuleAndClass",
+ name_and_input_module_class_pair->second);
}
out->Print("}\n");
out->Print("response_deserializers = {\n");
@@ -429,13 +427,11 @@ bool PrintBetaStubFactory(const grpc::string& package_qualified_service_name,
name_and_output_module_class_pair++) {
IndentScope raii_indent(out);
out->Print("(\'$PackageQualifiedServiceName$\', \'$MethodName$\'): "
- "$OutputTypeModule$.$OutputTypeClass$.FromString,\n",
+ "$OutputTypeModuleAndClass$.FromString,\n",
"PackageQualifiedServiceName", package_qualified_service_name,
"MethodName", name_and_output_module_class_pair->first,
- "OutputTypeModule",
- name_and_output_module_class_pair->second.first,
- "OutputTypeClass",
- name_and_output_module_class_pair->second.second);
+ "OutputTypeModuleAndClass",
+ name_and_output_module_class_pair->second);
}
out->Print("}\n");
out->Print("cardinalities = {\n");
@@ -463,8 +459,6 @@ bool PrintBetaStubFactory(const grpc::string& package_qualified_service_name,
bool PrintPreamble(const FileDescriptor* file,
const GeneratorConfiguration& config, Printer* out) {
- out->Print("import abc\n");
- out->Print("import six\n");
out->Print("from $Package$ import implementations as beta_implementations\n",
"Package", config.beta_package_root);
out->Print("from $Package$ import interfaces as beta_interfaces\n",
diff --git a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c
new file mode 100644
index 0000000000..df1acddcc0
--- /dev/null
+++ b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c
@@ -0,0 +1,69 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/impl/codegen/port_platform.h>
+
+#include <stdio.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/transport/transport_impl.h"
+
+// Cronet transport object
+typedef struct cronet_transport {
+ grpc_transport base; // must be first element in this structure
+ void *engine;
+ char *host;
+} cronet_transport;
+
+extern grpc_transport_vtable grpc_cronet_vtable;
+
+GRPCAPI grpc_channel *grpc_cronet_secure_channel_create(
+ void *engine, const char *target, const grpc_channel_args *args,
+ void *reserved) {
+ cronet_transport *ct = gpr_malloc(sizeof(cronet_transport));
+ ct->base.vtable = &grpc_cronet_vtable;
+ ct->engine = engine;
+ ct->host = gpr_malloc(strlen(target) + 1);
+ strcpy(ct->host, target);
+ gpr_log(GPR_DEBUG,
+ "grpc_create_cronet_transport: cronet_engine = %p, target=%s", engine,
+ ct->host);
+
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ return grpc_channel_create(&exec_ctx, target, args,
+ GRPC_CLIENT_DIRECT_CHANNEL, (grpc_transport *)ct);
+}
diff --git a/src/core/ext/transport/cronet/transport/cronet_api_dummy.c b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c
new file mode 100644
index 0000000000..687026c9fd
--- /dev/null
+++ b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c
@@ -0,0 +1,85 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/* This file has empty implementation of all the functions exposed by the cronet
+library, so we can build it in all environments */
+
+#include <stdbool.h>
+
+#include <grpc/support/log.h>
+
+#include "third_party/objective_c/Cronet/cronet_c_for_grpc.h"
+
+#ifdef GRPC_COMPILE_WITH_CRONET
+/* link with the real CRONET library in the build system */
+#else
+/* Dummy implementation of cronet API just to test for build-ability */
+cronet_bidirectional_stream* cronet_bidirectional_stream_create(
+ cronet_engine* engine, void* annotation,
+ cronet_bidirectional_stream_callback* callback) {
+ GPR_ASSERT(0);
+ return NULL;
+}
+
+int cronet_bidirectional_stream_destroy(cronet_bidirectional_stream* stream) {
+ GPR_ASSERT(0);
+ return 0;
+}
+
+int cronet_bidirectional_stream_start(
+ cronet_bidirectional_stream* stream, const char* url, int priority,
+ const char* method, const cronet_bidirectional_stream_header_array* headers,
+ bool end_of_stream) {
+ GPR_ASSERT(0);
+ return 0;
+}
+
+int cronet_bidirectional_stream_read(cronet_bidirectional_stream* stream,
+ char* buffer, int capacity) {
+ GPR_ASSERT(0);
+ return 0;
+}
+
+int cronet_bidirectional_stream_write(cronet_bidirectional_stream* stream,
+ const char* buffer, int count,
+ bool end_of_stream) {
+ GPR_ASSERT(0);
+ return 0;
+}
+
+int cronet_bidirectional_stream_cancel(cronet_bidirectional_stream* stream) {
+ GPR_ASSERT(0);
+ return 0;
+}
+
+#endif /* GRPC_COMPILE_WITH_CRONET */
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
new file mode 100644
index 0000000000..5bb085195c
--- /dev/null
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -0,0 +1,640 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <string.h>
+
+#include <grpc/impl/codegen/port_platform.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/host_port.h>
+#include <grpc/support/log.h>
+#include <grpc/support/slice_buffer.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/useful.h>
+
+#include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/support/string.h"
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/transport/metadata_batch.h"
+#include "src/core/lib/transport/transport_impl.h"
+#include "third_party/objective_c/Cronet/cronet_c_for_grpc.h"
+
+#define GRPC_HEADER_SIZE_IN_BYTES 5
+
+// Global flag that gets set with GRPC_TRACE env variable
+int grpc_cronet_trace = 1;
+
+// Cronet transport object
+struct grpc_cronet_transport {
+ grpc_transport base; /* must be first element in this structure */
+ cronet_engine *engine;
+ char *host;
+};
+
+typedef struct grpc_cronet_transport grpc_cronet_transport;
+
+enum send_state {
+ CRONET_SEND_IDLE = 0,
+ CRONET_REQ_STARTED,
+ CRONET_SEND_HEADER,
+ CRONET_WRITE,
+ CRONET_WRITE_COMPLETED,
+};
+
+enum recv_state {
+ CRONET_RECV_IDLE = 0,
+ CRONET_RECV_READ_LENGTH,
+ CRONET_RECV_READ_DATA,
+ CRONET_RECV_CLOSED,
+};
+
+static const char *recv_state_name[] = {
+ "CRONET_RECV_IDLE", "CRONET_RECV_READ_LENGTH", "CRONET_RECV_READ_DATA,",
+ "CRONET_RECV_CLOSED"};
+
+// Enum that identifies calling function.
+enum e_caller {
+ PERFORM_STREAM_OP,
+ ON_READ_COMPLETE,
+ ON_RESPONSE_HEADERS_RECEIVED,
+ ON_RESPONSE_TRAILERS_RECEIVED
+};
+
+enum callback_id {
+ CB_SEND_INITIAL_METADATA = 0,
+ CB_SEND_MESSAGE,
+ CB_SEND_TRAILING_METADATA,
+ CB_RECV_MESSAGE,
+ CB_RECV_INITIAL_METADATA,
+ CB_RECV_TRAILING_METADATA,
+ CB_NUM_CALLBACKS
+};
+
+struct stream_obj {
+ // we store received bytes here as they trickle in.
+ gpr_slice_buffer write_slice_buffer;
+ cronet_bidirectional_stream *cbs;
+ gpr_slice slice;
+ gpr_slice_buffer read_slice_buffer;
+ struct grpc_slice_buffer_stream sbs;
+ char *read_buffer;
+ int remaining_read_bytes;
+ int total_read_bytes;
+
+ char *write_buffer;
+ size_t write_buffer_size;
+
+ // Hold the URL
+ char *url;
+
+ bool response_headers_received;
+ bool read_requested;
+ bool response_trailers_received;
+ bool read_closed;
+
+ // Recv message stuff
+ grpc_byte_buffer **recv_message;
+ // Initial metadata stuff
+ grpc_metadata_batch *recv_initial_metadata;
+ // Trailing metadata stuff
+ grpc_metadata_batch *recv_trailing_metadata;
+ grpc_chttp2_incoming_metadata_buffer imb;
+
+ // This mutex protects receive state machine execution
+ gpr_mu recv_mu;
+ // we can queue up up to 2 callbacks for each OP
+ grpc_closure *callback_list[CB_NUM_CALLBACKS][2];
+
+ // storage for header
+ cronet_bidirectional_stream_header *headers;
+ uint32_t num_headers;
+ cronet_bidirectional_stream_header_array header_array;
+ // state tracking
+ enum recv_state cronet_recv_state;
+ enum send_state cronet_send_state;
+};
+
+typedef struct stream_obj stream_obj;
+
+static void next_send_step(stream_obj *s);
+static void next_recv_step(stream_obj *s, enum e_caller caller);
+
+static void set_pollset_do_nothing(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, grpc_pollset *pollset) {}
+
+static void enqueue_callbacks(grpc_closure *callback_list[]) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ if (callback_list[0]) {
+ grpc_exec_ctx_enqueue(&exec_ctx, callback_list[0], true, NULL);
+ callback_list[0] = NULL;
+ }
+ if (callback_list[1]) {
+ grpc_exec_ctx_enqueue(&exec_ctx, callback_list[1], true, NULL);
+ callback_list[1] = NULL;
+ }
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void on_canceled(cronet_bidirectional_stream *stream) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "on_canceled %p", stream);
+ }
+}
+
+static void on_failed(cronet_bidirectional_stream *stream, int net_error) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "on_failed %p, error = %d", stream, net_error);
+ }
+}
+
+static void on_succeeded(cronet_bidirectional_stream *stream) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "on_succeeded %p", stream);
+ }
+}
+
+static void on_response_trailers_received(
+ cronet_bidirectional_stream *stream,
+ const cronet_bidirectional_stream_header_array *trailers) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "R: on_response_trailers_received");
+ }
+ stream_obj *s = (stream_obj *)stream->annotation;
+
+ memset(&s->imb, 0, sizeof(s->imb));
+ grpc_chttp2_incoming_metadata_buffer_init(&s->imb);
+ unsigned int i = 0;
+ for (i = 0; i < trailers->count; i++) {
+ grpc_chttp2_incoming_metadata_buffer_add(
+ &s->imb, grpc_mdelem_from_metadata_strings(
+ grpc_mdstr_from_string(trailers->headers[i].key),
+ grpc_mdstr_from_string(trailers->headers[i].value)));
+ }
+ s->response_trailers_received = true;
+ next_recv_step(s, ON_RESPONSE_TRAILERS_RECEIVED);
+}
+
+static void on_write_completed(cronet_bidirectional_stream *stream,
+ const char *data) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "W: on_write_completed");
+ }
+ stream_obj *s = (stream_obj *)stream->annotation;
+ enqueue_callbacks(s->callback_list[CB_SEND_MESSAGE]);
+ s->cronet_send_state = CRONET_WRITE_COMPLETED;
+ next_send_step(s);
+}
+
+static void process_recv_message(stream_obj *s, const uint8_t *recv_data) {
+ gpr_slice read_data_slice = gpr_slice_malloc((uint32_t)s->total_read_bytes);
+ uint8_t *dst_p = GPR_SLICE_START_PTR(read_data_slice);
+ memcpy(dst_p, recv_data, (size_t)s->total_read_bytes);
+ gpr_slice_buffer_add(&s->read_slice_buffer, read_data_slice);
+ grpc_slice_buffer_stream_init(&s->sbs, &s->read_slice_buffer, 0);
+ *s->recv_message = (grpc_byte_buffer *)&s->sbs;
+}
+
+static int parse_grpc_header(const uint8_t *data) {
+ const uint8_t *p = data + 1;
+ int length = 0;
+ length |= ((uint8_t)*p++) << 24;
+ length |= ((uint8_t)*p++) << 16;
+ length |= ((uint8_t)*p++) << 8;
+ length |= ((uint8_t)*p++);
+ return length;
+}
+
+static void on_read_completed(cronet_bidirectional_stream *stream, char *data,
+ int count) {
+ stream_obj *s = (stream_obj *)stream->annotation;
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "R: on_read_completed count=%d, total=%d, remaining=%d",
+ count, s->total_read_bytes, s->remaining_read_bytes);
+ }
+ if (count > 0) {
+ GPR_ASSERT(s->recv_message);
+ s->remaining_read_bytes -= count;
+ next_recv_step(s, ON_READ_COMPLETE);
+ } else {
+ s->read_closed = true;
+ next_recv_step(s, ON_READ_COMPLETE);
+ }
+}
+
+static void on_response_headers_received(
+ cronet_bidirectional_stream *stream,
+ const cronet_bidirectional_stream_header_array *headers,
+ const char *negotiated_protocol) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "R: on_response_headers_received");
+ }
+ stream_obj *s = (stream_obj *)stream->annotation;
+ enqueue_callbacks(s->callback_list[CB_RECV_INITIAL_METADATA]);
+ s->response_headers_received = true;
+ next_recv_step(s, ON_RESPONSE_HEADERS_RECEIVED);
+}
+
+static void on_request_headers_sent(cronet_bidirectional_stream *stream) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "W: on_request_headers_sent");
+ }
+ stream_obj *s = (stream_obj *)stream->annotation;
+ enqueue_callbacks(s->callback_list[CB_SEND_INITIAL_METADATA]);
+ s->cronet_send_state = CRONET_SEND_HEADER;
+ next_send_step(s);
+}
+
+// Callback function pointers (invoked by cronet in response to events)
+static cronet_bidirectional_stream_callback callbacks = {
+ on_request_headers_sent,
+ on_response_headers_received,
+ on_read_completed,
+ on_write_completed,
+ on_response_trailers_received,
+ on_succeeded,
+ on_failed,
+ on_canceled};
+
+static void invoke_closing_callback(stream_obj *s) {
+ grpc_chttp2_incoming_metadata_buffer_publish(&s->imb,
+ s->recv_trailing_metadata);
+ if (s->callback_list[CB_RECV_TRAILING_METADATA]) {
+ enqueue_callbacks(s->callback_list[CB_RECV_TRAILING_METADATA]);
+ }
+}
+
+static void set_recv_state(stream_obj *s, enum recv_state state) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "next_state = %s", recv_state_name[state]);
+ }
+ s->cronet_recv_state = state;
+}
+
+// This is invoked from perform_stream_op, and all on_xxxx callbacks.
+static void next_recv_step(stream_obj *s, enum e_caller caller) {
+ gpr_mu_lock(&s->recv_mu);
+ switch (s->cronet_recv_state) {
+ case CRONET_RECV_IDLE:
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_IDLE");
+ }
+ if (caller == PERFORM_STREAM_OP ||
+ caller == ON_RESPONSE_HEADERS_RECEIVED) {
+ if (s->read_closed && s->response_trailers_received) {
+ invoke_closing_callback(s);
+ set_recv_state(s, CRONET_RECV_CLOSED);
+ } else if (s->response_headers_received == true &&
+ s->read_requested == true) {
+ set_recv_state(s, CRONET_RECV_READ_LENGTH);
+ s->total_read_bytes = s->remaining_read_bytes =
+ GRPC_HEADER_SIZE_IN_BYTES;
+ GPR_ASSERT(s->read_buffer);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()");
+ }
+ cronet_bidirectional_stream_read(s->cbs, s->read_buffer,
+ s->remaining_read_bytes);
+ }
+ }
+ break;
+ case CRONET_RECV_READ_LENGTH:
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_READ_LENGTH");
+ }
+ if (caller == ON_READ_COMPLETE) {
+ if (s->read_closed) {
+ invoke_closing_callback(s);
+ enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]);
+ set_recv_state(s, CRONET_RECV_CLOSED);
+ } else {
+ GPR_ASSERT(s->remaining_read_bytes == 0);
+ set_recv_state(s, CRONET_RECV_READ_DATA);
+ s->total_read_bytes = s->remaining_read_bytes =
+ parse_grpc_header((const uint8_t *)s->read_buffer);
+ s->read_buffer =
+ gpr_realloc(s->read_buffer, (uint32_t)s->remaining_read_bytes);
+ GPR_ASSERT(s->read_buffer);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()");
+ }
+ cronet_bidirectional_stream_read(s->cbs, (char *)s->read_buffer,
+ s->remaining_read_bytes);
+ }
+ }
+ break;
+ case CRONET_RECV_READ_DATA:
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_READ_DATA");
+ }
+ if (caller == ON_READ_COMPLETE) {
+ if (s->remaining_read_bytes > 0) {
+ int offset = s->total_read_bytes - s->remaining_read_bytes;
+ GPR_ASSERT(s->read_buffer);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()");
+ }
+ cronet_bidirectional_stream_read(
+ s->cbs, (char *)s->read_buffer + offset, s->remaining_read_bytes);
+ } else {
+ gpr_slice_buffer_init(&s->read_slice_buffer);
+ uint8_t *p = (uint8_t *)s->read_buffer;
+ process_recv_message(s, p);
+ set_recv_state(s, CRONET_RECV_IDLE);
+ enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]);
+ }
+ }
+ break;
+ case CRONET_RECV_CLOSED:
+ break;
+ default:
+ GPR_ASSERT(0); // Should not reach here
+ break;
+ }
+ gpr_mu_unlock(&s->recv_mu);
+}
+
+// This function takes the data from s->write_slice_buffer and assembles into
+// a contiguous byte stream with 5 byte gRPC header prepended.
+static void create_grpc_frame(stream_obj *s) {
+ gpr_slice slice = gpr_slice_buffer_take_first(&s->write_slice_buffer);
+ uint8_t *raw_data = GPR_SLICE_START_PTR(slice);
+ size_t length = GPR_SLICE_LENGTH(slice);
+ s->write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES;
+ s->write_buffer = gpr_realloc(s->write_buffer, s->write_buffer_size);
+ uint8_t *p = (uint8_t *)s->write_buffer;
+ // Append 5 byte header
+ *p++ = 0;
+ *p++ = (uint8_t)(length >> 24);
+ *p++ = (uint8_t)(length >> 16);
+ *p++ = (uint8_t)(length >> 8);
+ *p++ = (uint8_t)(length);
+ // append actual data
+ memcpy(p, raw_data, length);
+}
+
+static void do_write(stream_obj *s) {
+ gpr_slice_buffer *sb = &s->write_slice_buffer;
+ GPR_ASSERT(sb->count <= 1);
+ if (sb->count > 0) {
+ create_grpc_frame(s);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_write");
+ }
+ cronet_bidirectional_stream_write(s->cbs, s->write_buffer,
+ (int)s->write_buffer_size, false);
+ }
+}
+
+//
+static void next_send_step(stream_obj *s) {
+ switch (s->cronet_send_state) {
+ case CRONET_SEND_IDLE:
+ GPR_ASSERT(
+ s->cbs); // cronet_bidirectional_stream is not initialized yet.
+ s->cronet_send_state = CRONET_REQ_STARTED;
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_start to %s", s->url);
+ }
+ cronet_bidirectional_stream_start(s->cbs, s->url, 0, "POST",
+ &s->header_array, false);
+ // we no longer need the memory that was allocated earlier.
+ gpr_free(s->header_array.headers);
+ break;
+ case CRONET_SEND_HEADER:
+ do_write(s);
+ s->cronet_send_state = CRONET_WRITE;
+ break;
+ case CRONET_WRITE_COMPLETED:
+ do_write(s);
+ break;
+ default:
+ GPR_ASSERT(0);
+ break;
+ }
+}
+
+static void convert_metadata_to_cronet_headers(grpc_linked_mdelem *head,
+ const char *host,
+ stream_obj *s) {
+ grpc_linked_mdelem *curr = head;
+ // Walk the linked list and get number of header fields
+ uint32_t num_headers_available = 0;
+ while (curr != NULL) {
+ curr = curr->next;
+ num_headers_available++;
+ }
+ // Allocate enough memory
+ s->headers = (cronet_bidirectional_stream_header *)gpr_malloc(
+ sizeof(cronet_bidirectional_stream_header) * num_headers_available);
+
+ // Walk the linked list again, this time copying the header fields.
+ // s->num_headers
+ // can be less than num_headers_available, as some headers are not used for
+ // cronet
+ curr = head;
+ s->num_headers = 0;
+ while (s->num_headers < num_headers_available) {
+ grpc_mdelem *mdelem = curr->md;
+ curr = curr->next;
+ const char *key = grpc_mdstr_as_c_string(mdelem->key);
+ const char *value = grpc_mdstr_as_c_string(mdelem->value);
+ if (strcmp(key, ":scheme") == 0 || strcmp(key, ":method") == 0 ||
+ strcmp(key, ":authority") == 0) {
+ // Cronet populates these fields on its own.
+ continue;
+ }
+ if (strcmp(key, ":path") == 0) {
+ // Create URL by appending :path value to the hostname
+ gpr_asprintf(&s->url, "https://%s%s", host, value);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "extracted URL = %s", s->url);
+ }
+ continue;
+ }
+ s->headers[s->num_headers].key = key;
+ s->headers[s->num_headers].value = value;
+ s->num_headers++;
+ if (curr == NULL) {
+ break;
+ }
+ }
+}
+
+static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, grpc_transport_stream_op *op) {
+ grpc_cronet_transport *ct = (grpc_cronet_transport *)gt;
+ GPR_ASSERT(ct->engine);
+ stream_obj *s = (stream_obj *)gs;
+ if (op->recv_trailing_metadata) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG,
+ "perform_stream_op - recv_trailing_metadata: on_complete=%p",
+ op->on_complete);
+ }
+ s->recv_trailing_metadata = op->recv_trailing_metadata;
+ GPR_ASSERT(!s->callback_list[CB_RECV_TRAILING_METADATA][0]);
+ s->callback_list[CB_RECV_TRAILING_METADATA][0] = op->on_complete;
+ }
+ if (op->recv_message) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "perform_stream_op - recv_message: on_complete=%p",
+ op->on_complete);
+ }
+ s->recv_message = (grpc_byte_buffer **)op->recv_message;
+ GPR_ASSERT(!s->callback_list[CB_RECV_MESSAGE][0]);
+ GPR_ASSERT(!s->callback_list[CB_RECV_MESSAGE][1]);
+ s->callback_list[CB_RECV_MESSAGE][0] = op->recv_message_ready;
+ s->callback_list[CB_RECV_MESSAGE][1] = op->on_complete;
+ s->read_requested = true;
+ next_recv_step(s, PERFORM_STREAM_OP);
+ }
+ if (op->recv_initial_metadata) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "perform_stream_op - recv_initial_metadata:=%p",
+ op->on_complete);
+ }
+ s->recv_initial_metadata = op->recv_initial_metadata;
+ GPR_ASSERT(!s->callback_list[CB_RECV_INITIAL_METADATA][0]);
+ GPR_ASSERT(!s->callback_list[CB_RECV_INITIAL_METADATA][1]);
+ s->callback_list[CB_RECV_INITIAL_METADATA][0] =
+ op->recv_initial_metadata_ready;
+ s->callback_list[CB_RECV_INITIAL_METADATA][1] = op->on_complete;
+ }
+ if (op->send_initial_metadata) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG,
+ "perform_stream_op - send_initial_metadata: on_complete=%p",
+ op->on_complete);
+ }
+ s->num_headers = 0;
+ convert_metadata_to_cronet_headers(op->send_initial_metadata->list.head,
+ ct->host, s);
+ s->header_array.count = s->num_headers;
+ s->header_array.capacity = s->num_headers;
+ s->header_array.headers = s->headers;
+ GPR_ASSERT(!s->callback_list[CB_SEND_INITIAL_METADATA][0]);
+ s->callback_list[CB_SEND_INITIAL_METADATA][0] = op->on_complete;
+ }
+ if (op->send_message) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "perform_stream_op - send_message: on_complete=%p",
+ op->on_complete);
+ }
+ grpc_byte_stream_next(exec_ctx, op->send_message, &s->slice,
+ op->send_message->length, NULL);
+ // Check that compression flag is not ON. We don't support compression yet.
+ // TODO (makdharma): add compression support
+ GPR_ASSERT(op->send_message->flags == 0);
+ gpr_slice_buffer_add(&s->write_slice_buffer, s->slice);
+ if (s->cbs == NULL) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_create");
+ }
+ s->cbs = cronet_bidirectional_stream_create(ct->engine, s, &callbacks);
+ GPR_ASSERT(s->cbs);
+ s->read_closed = false;
+ s->response_trailers_received = false;
+ s->response_headers_received = false;
+ s->cronet_send_state = CRONET_SEND_IDLE;
+ s->cronet_recv_state = CRONET_RECV_IDLE;
+ }
+ GPR_ASSERT(!s->callback_list[CB_SEND_MESSAGE][0]);
+ s->callback_list[CB_SEND_MESSAGE][0] = op->on_complete;
+ next_send_step(s);
+ }
+ if (op->send_trailing_metadata) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG,
+ "perform_stream_op - send_trailing_metadata: on_complete=%p",
+ op->on_complete);
+ }
+ GPR_ASSERT(!s->callback_list[CB_SEND_TRAILING_METADATA][0]);
+ s->callback_list[CB_SEND_TRAILING_METADATA][0] = op->on_complete;
+ if (s->cbs) {
+ // Send an "empty" write to the far end to signal that we're done.
+ // This will induce the server to send down trailers.
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_write");
+ }
+ cronet_bidirectional_stream_write(s->cbs, "abc", 0, true);
+ } else {
+ // We never created a stream. This was probably an empty request.
+ invoke_closing_callback(s);
+ }
+ }
+}
+
+static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, grpc_stream_refcount *refcount,
+ const void *server_data) {
+ stream_obj *s = (stream_obj *)gs;
+ memset(s->callback_list, 0, sizeof(s->callback_list));
+ s->cbs = NULL;
+ gpr_mu_init(&s->recv_mu);
+ s->read_buffer = gpr_malloc(GRPC_HEADER_SIZE_IN_BYTES);
+ s->write_buffer = gpr_malloc(GRPC_HEADER_SIZE_IN_BYTES);
+ gpr_slice_buffer_init(&s->write_slice_buffer);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "cronet_transport - init_stream");
+ }
+ return 0;
+}
+
+static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, void *and_free_memory) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "Destroy stream");
+ }
+ stream_obj *s = (stream_obj *)gs;
+ s->cbs = NULL;
+ gpr_free(s->read_buffer);
+ gpr_free(s->write_buffer);
+ gpr_free(s->url);
+ gpr_mu_destroy(&s->recv_mu);
+ if (and_free_memory) {
+ gpr_free(and_free_memory);
+ }
+}
+
+static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
+ grpc_cronet_transport *ct = (grpc_cronet_transport *)gt;
+ gpr_free(ct->host);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "Destroy transport");
+ }
+}
+
+const grpc_transport_vtable grpc_cronet_vtable = {
+ sizeof(stream_obj), "cronet_http", init_stream,
+ set_pollset_do_nothing, perform_stream_op, NULL,
+ destroy_stream, destroy_transport, NULL};
diff --git a/src/core/lib/channel/channel_args.c b/src/core/lib/channel/channel_args.c
index 28d2d78d00..893cf0700e 100644
--- a/src/core/lib/channel/channel_args.c
+++ b/src/core/lib/channel/channel_args.c
@@ -170,7 +170,7 @@ grpc_compression_algorithm grpc_channel_args_get_compression_algorithm(
if (a == NULL) return 0;
for (i = 0; i < a->num_args; ++i) {
if (a->args[i].type == GRPC_ARG_INTEGER &&
- !strcmp(GRPC_COMPRESSION_ALGORITHM_ARG, a->args[i].key)) {
+ !strcmp(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, a->args[i].key)) {
return (grpc_compression_algorithm)a->args[i].value.integer;
break;
}
@@ -182,7 +182,7 @@ grpc_channel_args *grpc_channel_args_set_compression_algorithm(
grpc_channel_args *a, grpc_compression_algorithm algorithm) {
grpc_arg tmp;
tmp.type = GRPC_ARG_INTEGER;
- tmp.key = GRPC_COMPRESSION_ALGORITHM_ARG;
+ tmp.key = GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM;
tmp.value.integer = algorithm;
return grpc_channel_args_copy_and_add(a, &tmp, 1);
}
@@ -196,7 +196,8 @@ static int find_compression_algorithm_states_bitset(const grpc_channel_args *a,
size_t i;
for (i = 0; i < a->num_args; ++i) {
if (a->args[i].type == GRPC_ARG_INTEGER &&
- !strcmp(GRPC_COMPRESSION_ALGORITHM_STATE_ARG, a->args[i].key)) {
+ !strcmp(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET,
+ a->args[i].key)) {
*states_arg = &a->args[i].value.integer;
return 1; /* GPR_TRUE */
}
@@ -222,7 +223,7 @@ grpc_channel_args *grpc_channel_args_compression_algorithm_set_state(
/* create a new arg */
grpc_arg tmp;
tmp.type = GRPC_ARG_INTEGER;
- tmp.key = GRPC_COMPRESSION_ALGORITHM_STATE_ARG;
+ tmp.key = GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET;
/* all enabled by default */
tmp.value.integer = (1u << GRPC_COMPRESS_ALGORITHMS_COUNT) - 1;
if (state != 0) {
diff --git a/src/core/lib/http/parser.c b/src/core/lib/http/parser.c
index a7efb5e73e..09b2ed40d1 100644
--- a/src/core/lib/http/parser.c
+++ b/src/core/lib/http/parser.c
@@ -161,8 +161,9 @@ static int add_header(grpc_http_parser *parser) {
cur++;
}
if (cur == end) {
- if (grpc_http1_trace)
+ if (grpc_http1_trace) {
gpr_log(GPR_ERROR, "Didn't find ':' in header string");
+ }
goto error;
}
GPR_ASSERT(cur >= beg);
diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
index 3c8127e1a8..aeb6e28665 100644
--- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
+++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c
@@ -790,7 +790,6 @@ static void pollset_kick(grpc_pollset *p,
static void pollset_global_init(void) {
gpr_tls_init(&g_current_thread_poller);
gpr_tls_init(&g_current_thread_worker);
- grpc_wakeup_fd_global_init();
grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
}
@@ -798,7 +797,6 @@ static void pollset_global_shutdown(void) {
grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
gpr_tls_destroy(&g_current_thread_poller);
gpr_tls_destroy(&g_current_thread_worker);
- grpc_wakeup_fd_global_destroy();
}
static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); }
diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c
new file mode 100644
index 0000000000..d1752327a2
--- /dev/null
+++ b/src/core/lib/iomgr/ev_poll_posix.c
@@ -0,0 +1,1212 @@
+/*
+ *
+ * Copyright 2015-2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_POSIX_SOCKET
+
+#include "src/core/lib/iomgr/ev_poll_posix.h"
+
+#include <assert.h>
+#include <errno.h>
+#include <poll.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/tls.h>
+#include <grpc/support/useful.h>
+
+#include "src/core/lib/iomgr/iomgr_internal.h"
+#include "src/core/lib/iomgr/wakeup_fd_posix.h"
+#include "src/core/lib/profiling/timers.h"
+#include "src/core/lib/support/block_annotate.h"
+
+/*******************************************************************************
+ * FD declarations
+ */
+
+typedef struct grpc_fd_watcher {
+ struct grpc_fd_watcher *next;
+ struct grpc_fd_watcher *prev;
+ grpc_pollset *pollset;
+ grpc_pollset_worker *worker;
+ grpc_fd *fd;
+} grpc_fd_watcher;
+
+struct grpc_fd {
+ int fd;
+ /* refst format:
+ bit0: 1=active/0=orphaned
+ bit1-n: refcount
+ meaning that mostly we ref by two to avoid altering the orphaned bit,
+ and just unref by 1 when we're ready to flag the object as orphaned */
+ gpr_atm refst;
+
+ gpr_mu mu;
+ int shutdown;
+ int closed;
+ int released;
+
+ /* The watcher list.
+
+ The following watcher related fields are protected by watcher_mu.
+
+ An fd_watcher is an ephemeral object created when an fd wants to
+ begin polling, and destroyed after the poll.
+
+ It denotes the fd's interest in whether to read poll or write poll
+ or both or neither on this fd.
+
+ If a watcher is asked to poll for reads or writes, the read_watcher
+ or write_watcher fields are set respectively. A watcher may be asked
+ to poll for both, in which case both fields will be set.
+
+ read_watcher and write_watcher may be NULL if no watcher has been
+ asked to poll for reads or writes.
+
+ If an fd_watcher is not asked to poll for reads or writes, it's added
+ to a linked list of inactive watchers, rooted at inactive_watcher_root.
+ If at a later time there becomes need of a poller to poll, one of
+ the inactive pollers may be kicked out of their poll loops to take
+ that responsibility. */
+ grpc_fd_watcher inactive_watcher_root;
+ grpc_fd_watcher *read_watcher;
+ grpc_fd_watcher *write_watcher;
+
+ grpc_closure *read_closure;
+ grpc_closure *write_closure;
+
+ grpc_closure *on_done_closure;
+
+ grpc_iomgr_object iomgr_object;
+};
+
+/* Begin polling on an fd.
+ Registers that the given pollset is interested in this fd - so that if read
+ or writability interest changes, the pollset can be kicked to pick up that
+ new interest.
+ Return value is:
+ (fd_needs_read? read_mask : 0) | (fd_needs_write? write_mask : 0)
+ i.e. a combination of read_mask and write_mask determined by the fd's current
+ interest in said events.
+ Polling strategies that do not need to alter their behavior depending on the
+ fd's current interest (such as epoll) do not need to call this function.
+ MUST NOT be called with a pollset lock taken */
+static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
+ grpc_pollset_worker *worker, uint32_t read_mask,
+ uint32_t write_mask, grpc_fd_watcher *rec);
+/* Complete polling previously started with fd_begin_poll
+ MUST NOT be called with a pollset lock taken
+ if got_read or got_write are 1, also does the become_{readable,writable} as
+ appropriate. */
+static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *rec,
+ int got_read, int got_write);
+
+/* Return 1 if this fd is orphaned, 0 otherwise */
+static bool fd_is_orphaned(grpc_fd *fd);
+
+/* Reference counting for fds */
+/*#define GRPC_FD_REF_COUNT_DEBUG*/
+#ifdef GRPC_FD_REF_COUNT_DEBUG
+static void fd_ref(grpc_fd *fd, const char *reason, const char *file, int line);
+static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
+ int line);
+#define GRPC_FD_REF(fd, reason) fd_ref(fd, reason, __FILE__, __LINE__)
+#define GRPC_FD_UNREF(fd, reason) fd_unref(fd, reason, __FILE__, __LINE__)
+#else
+static void fd_ref(grpc_fd *fd);
+static void fd_unref(grpc_fd *fd);
+#define GRPC_FD_REF(fd, reason) fd_ref(fd)
+#define GRPC_FD_UNREF(fd, reason) fd_unref(fd)
+#endif
+
+#define CLOSURE_NOT_READY ((grpc_closure *)0)
+#define CLOSURE_READY ((grpc_closure *)1)
+
+/*******************************************************************************
+ * pollset declarations
+ */
+
+typedef struct grpc_cached_wakeup_fd {
+ grpc_wakeup_fd fd;
+ struct grpc_cached_wakeup_fd *next;
+} grpc_cached_wakeup_fd;
+
+struct grpc_pollset_worker {
+ grpc_cached_wakeup_fd *wakeup_fd;
+ int reevaluate_polling_on_wakeup;
+ int kicked_specifically;
+ struct grpc_pollset_worker *next;
+ struct grpc_pollset_worker *prev;
+};
+
+struct grpc_pollset {
+ gpr_mu mu;
+ grpc_pollset_worker root_worker;
+ int in_flight_cbs;
+ int shutting_down;
+ int called_shutdown;
+ int kicked_without_pollers;
+ grpc_closure *shutdown_done;
+ grpc_closure_list idle_jobs;
+ /* all polled fds */
+ size_t fd_count;
+ size_t fd_capacity;
+ grpc_fd **fds;
+ /* fds that have been removed from the pollset explicitly */
+ size_t del_count;
+ size_t del_capacity;
+ grpc_fd **dels;
+ /* Local cache of eventfds for workers */
+ grpc_cached_wakeup_fd *local_wakeup_cache;
+};
+
+/* Add an fd to a pollset */
+static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ struct grpc_fd *fd);
+
+static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *pollset_set, grpc_fd *fd);
+
+/* Convert a timespec to milliseconds:
+ - very small or negative poll times are clamped to zero to do a
+ non-blocking poll (which becomes spin polling)
+ - other small values are rounded up to one millisecond
+ - longer than a millisecond polls are rounded up to the next nearest
+ millisecond to avoid spinning
+ - infinite timeouts are converted to -1 */
+static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
+ gpr_timespec now);
+
+/* Allow kick to wakeup the currently polling worker */
+#define GRPC_POLLSET_CAN_KICK_SELF 1
+/* Force the wakee to repoll when awoken */
+#define GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP 2
+/* As per pollset_kick, with an extended set of flags (defined above)
+ -- mostly for fd_posix's use. */
+static void pollset_kick_ext(grpc_pollset *p,
+ grpc_pollset_worker *specific_worker,
+ uint32_t flags);
+
+/* Return 1 if the pollset has active threads in pollset_work (pollset must
+ * be locked) */
+static int pollset_has_workers(grpc_pollset *pollset);
+
+/*******************************************************************************
+ * pollset_set definitions
+ */
+
+struct grpc_pollset_set {
+ gpr_mu mu;
+
+ size_t pollset_count;
+ size_t pollset_capacity;
+ grpc_pollset **pollsets;
+
+ size_t pollset_set_count;
+ size_t pollset_set_capacity;
+ struct grpc_pollset_set **pollset_sets;
+
+ size_t fd_count;
+ size_t fd_capacity;
+ grpc_fd **fds;
+};
+
+/*******************************************************************************
+ * fd_posix.c
+ */
+
+#ifdef GRPC_FD_REF_COUNT_DEBUG
+#define REF_BY(fd, n, reason) ref_by(fd, n, reason, __FILE__, __LINE__)
+#define UNREF_BY(fd, n, reason) unref_by(fd, n, reason, __FILE__, __LINE__)
+static void ref_by(grpc_fd *fd, int n, const char *reason, const char *file,
+ int line) {
+ gpr_log(GPR_DEBUG, "FD %d %p ref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
+ gpr_atm_no_barrier_load(&fd->refst),
+ gpr_atm_no_barrier_load(&fd->refst) + n, reason, file, line);
+#else
+#define REF_BY(fd, n, reason) ref_by(fd, n)
+#define UNREF_BY(fd, n, reason) unref_by(fd, n)
+static void ref_by(grpc_fd *fd, int n) {
+#endif
+ GPR_ASSERT(gpr_atm_no_barrier_fetch_add(&fd->refst, n) > 0);
+}
+
+#ifdef GRPC_FD_REF_COUNT_DEBUG
+static void unref_by(grpc_fd *fd, int n, const char *reason, const char *file,
+ int line) {
+ gpr_atm old;
+ gpr_log(GPR_DEBUG, "FD %d %p unref %d %d -> %d [%s; %s:%d]", fd->fd, fd, n,
+ gpr_atm_no_barrier_load(&fd->refst),
+ gpr_atm_no_barrier_load(&fd->refst) - n, reason, file, line);
+#else
+static void unref_by(grpc_fd *fd, int n) {
+ gpr_atm old;
+#endif
+ old = gpr_atm_full_fetch_add(&fd->refst, -n);
+ if (old == n) {
+ gpr_mu_destroy(&fd->mu);
+ grpc_iomgr_unregister_object(&fd->iomgr_object);
+ gpr_free(fd);
+ } else {
+ GPR_ASSERT(old > n);
+ }
+}
+
+static grpc_fd *fd_create(int fd, const char *name) {
+ grpc_fd *r = gpr_malloc(sizeof(*r));
+ gpr_mu_init(&r->mu);
+ gpr_atm_rel_store(&r->refst, 1);
+ r->shutdown = 0;
+ r->read_closure = CLOSURE_NOT_READY;
+ r->write_closure = CLOSURE_NOT_READY;
+ r->fd = fd;
+ r->inactive_watcher_root.next = r->inactive_watcher_root.prev =
+ &r->inactive_watcher_root;
+ r->read_watcher = r->write_watcher = NULL;
+ r->on_done_closure = NULL;
+ r->closed = 0;
+ r->released = 0;
+
+ char *name2;
+ gpr_asprintf(&name2, "%s fd=%d", name, fd);
+ grpc_iomgr_register_object(&r->iomgr_object, name2);
+ gpr_free(name2);
+#ifdef GRPC_FD_REF_COUNT_DEBUG
+ gpr_log(GPR_DEBUG, "FD %d %p create %s", fd, r, name);
+#endif
+ return r;
+}
+
+static bool fd_is_orphaned(grpc_fd *fd) {
+ return (gpr_atm_acq_load(&fd->refst) & 1) == 0;
+}
+
+static void pollset_kick_locked(grpc_fd_watcher *watcher) {
+ gpr_mu_lock(&watcher->pollset->mu);
+ GPR_ASSERT(watcher->worker);
+ pollset_kick_ext(watcher->pollset, watcher->worker,
+ GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP);
+ gpr_mu_unlock(&watcher->pollset->mu);
+}
+
+static void maybe_wake_one_watcher_locked(grpc_fd *fd) {
+ if (fd->inactive_watcher_root.next != &fd->inactive_watcher_root) {
+ pollset_kick_locked(fd->inactive_watcher_root.next);
+ } else if (fd->read_watcher) {
+ pollset_kick_locked(fd->read_watcher);
+ } else if (fd->write_watcher) {
+ pollset_kick_locked(fd->write_watcher);
+ }
+}
+
+static void wake_all_watchers_locked(grpc_fd *fd) {
+ grpc_fd_watcher *watcher;
+ for (watcher = fd->inactive_watcher_root.next;
+ watcher != &fd->inactive_watcher_root; watcher = watcher->next) {
+ pollset_kick_locked(watcher);
+ }
+ if (fd->read_watcher) {
+ pollset_kick_locked(fd->read_watcher);
+ }
+ if (fd->write_watcher && fd->write_watcher != fd->read_watcher) {
+ pollset_kick_locked(fd->write_watcher);
+ }
+}
+
+static int has_watchers(grpc_fd *fd) {
+ return fd->read_watcher != NULL || fd->write_watcher != NULL ||
+ fd->inactive_watcher_root.next != &fd->inactive_watcher_root;
+}
+
+static void close_fd_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
+ fd->closed = 1;
+ if (!fd->released) {
+ close(fd->fd);
+ }
+ grpc_exec_ctx_enqueue(exec_ctx, fd->on_done_closure, true, NULL);
+}
+
+static int fd_wrapped_fd(grpc_fd *fd) {
+ if (fd->released || fd->closed) {
+ return -1;
+ } else {
+ return fd->fd;
+ }
+}
+
+static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_closure *on_done, int *release_fd,
+ const char *reason) {
+ fd->on_done_closure = on_done;
+ fd->released = release_fd != NULL;
+ if (!fd->released) {
+ shutdown(fd->fd, SHUT_RDWR);
+ } else {
+ *release_fd = fd->fd;
+ }
+ gpr_mu_lock(&fd->mu);
+ REF_BY(fd, 1, reason); /* remove active status, but keep referenced */
+ if (!has_watchers(fd)) {
+ close_fd_locked(exec_ctx, fd);
+ } else {
+ wake_all_watchers_locked(fd);
+ }
+ gpr_mu_unlock(&fd->mu);
+ UNREF_BY(fd, 2, reason); /* drop the reference */
+}
+
+/* increment refcount by two to avoid changing the orphan bit */
+#ifdef GRPC_FD_REF_COUNT_DEBUG
+static void fd_ref(grpc_fd *fd, const char *reason, const char *file,
+ int line) {
+ ref_by(fd, 2, reason, file, line);
+}
+
+static void fd_unref(grpc_fd *fd, const char *reason, const char *file,
+ int line) {
+ unref_by(fd, 2, reason, file, line);
+}
+#else
+static void fd_ref(grpc_fd *fd) { ref_by(fd, 2); }
+
+static void fd_unref(grpc_fd *fd) { unref_by(fd, 2); }
+#endif
+
+static void notify_on_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_closure **st, grpc_closure *closure) {
+ if (*st == CLOSURE_NOT_READY) {
+ /* not ready ==> switch to a waiting state by setting the closure */
+ *st = closure;
+ } else if (*st == CLOSURE_READY) {
+ /* already ready ==> queue the closure to run immediately */
+ *st = CLOSURE_NOT_READY;
+ grpc_exec_ctx_enqueue(exec_ctx, closure, !fd->shutdown, NULL);
+ maybe_wake_one_watcher_locked(fd);
+ } else {
+ /* upcallptr was set to a different closure. This is an error! */
+ gpr_log(GPR_ERROR,
+ "User called a notify_on function with a previous callback still "
+ "pending");
+ abort();
+ }
+}
+
+/* returns 1 if state becomes not ready */
+static int set_ready_locked(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_closure **st) {
+ if (*st == CLOSURE_READY) {
+ /* duplicate ready ==> ignore */
+ return 0;
+ } else if (*st == CLOSURE_NOT_READY) {
+ /* not ready, and not waiting ==> flag ready */
+ *st = CLOSURE_READY;
+ return 0;
+ } else {
+ /* waiting ==> queue closure */
+ grpc_exec_ctx_enqueue(exec_ctx, *st, !fd->shutdown, NULL);
+ *st = CLOSURE_NOT_READY;
+ return 1;
+ }
+}
+
+static void fd_shutdown(grpc_exec_ctx *exec_ctx, grpc_fd *fd) {
+ gpr_mu_lock(&fd->mu);
+ GPR_ASSERT(!fd->shutdown);
+ fd->shutdown = 1;
+ set_ready_locked(exec_ctx, fd, &fd->read_closure);
+ set_ready_locked(exec_ctx, fd, &fd->write_closure);
+ gpr_mu_unlock(&fd->mu);
+}
+
+static void fd_notify_on_read(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_closure *closure) {
+ gpr_mu_lock(&fd->mu);
+ notify_on_locked(exec_ctx, fd, &fd->read_closure, closure);
+ gpr_mu_unlock(&fd->mu);
+}
+
+static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd,
+ grpc_closure *closure) {
+ gpr_mu_lock(&fd->mu);
+ notify_on_locked(exec_ctx, fd, &fd->write_closure, closure);
+ gpr_mu_unlock(&fd->mu);
+}
+
+static uint32_t fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
+ grpc_pollset_worker *worker, uint32_t read_mask,
+ uint32_t write_mask, grpc_fd_watcher *watcher) {
+ uint32_t mask = 0;
+ grpc_closure *cur;
+ int requested;
+ /* keep track of pollers that have requested our events, in case they change
+ */
+ GRPC_FD_REF(fd, "poll");
+
+ gpr_mu_lock(&fd->mu);
+
+ /* if we are shutdown, then don't add to the watcher set */
+ if (fd->shutdown) {
+ watcher->fd = NULL;
+ watcher->pollset = NULL;
+ watcher->worker = NULL;
+ gpr_mu_unlock(&fd->mu);
+ GRPC_FD_UNREF(fd, "poll");
+ return 0;
+ }
+
+ /* if there is nobody polling for read, but we need to, then start doing so */
+ cur = fd->read_closure;
+ requested = cur != CLOSURE_READY;
+ if (read_mask && fd->read_watcher == NULL && requested) {
+ fd->read_watcher = watcher;
+ mask |= read_mask;
+ }
+ /* if there is nobody polling for write, but we need to, then start doing so
+ */
+ cur = fd->write_closure;
+ requested = cur != CLOSURE_READY;
+ if (write_mask && fd->write_watcher == NULL && requested) {
+ fd->write_watcher = watcher;
+ mask |= write_mask;
+ }
+ /* if not polling, remember this watcher in case we need someone to later */
+ if (mask == 0 && worker != NULL) {
+ watcher->next = &fd->inactive_watcher_root;
+ watcher->prev = watcher->next->prev;
+ watcher->next->prev = watcher->prev->next = watcher;
+ }
+ watcher->pollset = pollset;
+ watcher->worker = worker;
+ watcher->fd = fd;
+ gpr_mu_unlock(&fd->mu);
+
+ return mask;
+}
+
+static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher,
+ int got_read, int got_write) {
+ int was_polling = 0;
+ int kick = 0;
+ grpc_fd *fd = watcher->fd;
+
+ if (fd == NULL) {
+ return;
+ }
+
+ gpr_mu_lock(&fd->mu);
+
+ if (watcher == fd->read_watcher) {
+ /* remove read watcher, kick if we still need a read */
+ was_polling = 1;
+ if (!got_read) {
+ kick = 1;
+ }
+ fd->read_watcher = NULL;
+ }
+ if (watcher == fd->write_watcher) {
+ /* remove write watcher, kick if we still need a write */
+ was_polling = 1;
+ if (!got_write) {
+ kick = 1;
+ }
+ fd->write_watcher = NULL;
+ }
+ if (!was_polling && watcher->worker != NULL) {
+ /* remove from inactive list */
+ watcher->next->prev = watcher->prev;
+ watcher->prev->next = watcher->next;
+ }
+ if (got_read) {
+ if (set_ready_locked(exec_ctx, fd, &fd->read_closure)) {
+ kick = 1;
+ }
+ }
+ if (got_write) {
+ if (set_ready_locked(exec_ctx, fd, &fd->write_closure)) {
+ kick = 1;
+ }
+ }
+ if (kick) {
+ maybe_wake_one_watcher_locked(fd);
+ }
+ if (fd_is_orphaned(fd) && !has_watchers(fd) && !fd->closed) {
+ close_fd_locked(exec_ctx, fd);
+ }
+ gpr_mu_unlock(&fd->mu);
+
+ GRPC_FD_UNREF(fd, "poll");
+}
+
+/*******************************************************************************
+ * pollset_posix.c
+ */
+
+GPR_TLS_DECL(g_current_thread_poller);
+GPR_TLS_DECL(g_current_thread_worker);
+
+static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
+ worker->prev->next = worker->next;
+ worker->next->prev = worker->prev;
+}
+
+static int pollset_has_workers(grpc_pollset *p) {
+ return p->root_worker.next != &p->root_worker;
+}
+
+static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
+ if (pollset_has_workers(p)) {
+ grpc_pollset_worker *w = p->root_worker.next;
+ remove_worker(p, w);
+ return w;
+ } else {
+ return NULL;
+ }
+}
+
+static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
+ worker->next = &p->root_worker;
+ worker->prev = worker->next->prev;
+ worker->prev->next = worker->next->prev = worker;
+}
+
+static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
+ worker->prev = &p->root_worker;
+ worker->next = worker->prev->next;
+ worker->prev->next = worker->next->prev = worker;
+}
+
+static void pollset_kick_ext(grpc_pollset *p,
+ grpc_pollset_worker *specific_worker,
+ uint32_t flags) {
+ GPR_TIMER_BEGIN("pollset_kick_ext", 0);
+
+ /* pollset->mu already held */
+ if (specific_worker != NULL) {
+ if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
+ GPR_TIMER_BEGIN("pollset_kick_ext.broadcast", 0);
+ GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
+ for (specific_worker = p->root_worker.next;
+ specific_worker != &p->root_worker;
+ specific_worker = specific_worker->next) {
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
+ }
+ p->kicked_without_pollers = 1;
+ GPR_TIMER_END("pollset_kick_ext.broadcast", 0);
+ } else if (gpr_tls_get(&g_current_thread_worker) !=
+ (intptr_t)specific_worker) {
+ GPR_TIMER_MARK("different_thread_worker", 0);
+ if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
+ specific_worker->reevaluate_polling_on_wakeup = 1;
+ }
+ specific_worker->kicked_specifically = 1;
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
+ } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
+ GPR_TIMER_MARK("kick_yoself", 0);
+ if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
+ specific_worker->reevaluate_polling_on_wakeup = 1;
+ }
+ specific_worker->kicked_specifically = 1;
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
+ }
+ } else if (gpr_tls_get(&g_current_thread_poller) != (intptr_t)p) {
+ GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
+ GPR_TIMER_MARK("kick_anonymous", 0);
+ specific_worker = pop_front_worker(p);
+ if (specific_worker != NULL) {
+ if (gpr_tls_get(&g_current_thread_worker) == (intptr_t)specific_worker) {
+ GPR_TIMER_MARK("kick_anonymous_not_self", 0);
+ push_back_worker(p, specific_worker);
+ specific_worker = pop_front_worker(p);
+ if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
+ gpr_tls_get(&g_current_thread_worker) ==
+ (intptr_t)specific_worker) {
+ push_back_worker(p, specific_worker);
+ specific_worker = NULL;
+ }
+ }
+ if (specific_worker != NULL) {
+ GPR_TIMER_MARK("finally_kick", 0);
+ push_back_worker(p, specific_worker);
+ grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
+ }
+ } else {
+ GPR_TIMER_MARK("kicked_no_pollers", 0);
+ p->kicked_without_pollers = 1;
+ }
+ }
+
+ GPR_TIMER_END("pollset_kick_ext", 0);
+}
+
+static void pollset_kick(grpc_pollset *p,
+ grpc_pollset_worker *specific_worker) {
+ pollset_kick_ext(p, specific_worker, 0);
+}
+
+/* global state management */
+
+static void pollset_global_init(void) {
+ gpr_tls_init(&g_current_thread_poller);
+ gpr_tls_init(&g_current_thread_worker);
+ grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
+}
+
+static void pollset_global_shutdown(void) {
+ grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
+ gpr_tls_destroy(&g_current_thread_poller);
+ gpr_tls_destroy(&g_current_thread_worker);
+}
+
+static void kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); }
+
+/* main interface */
+
+static void pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
+ gpr_mu_init(&pollset->mu);
+ *mu = &pollset->mu;
+ pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
+ pollset->in_flight_cbs = 0;
+ pollset->shutting_down = 0;
+ pollset->called_shutdown = 0;
+ pollset->kicked_without_pollers = 0;
+ pollset->idle_jobs.head = pollset->idle_jobs.tail = NULL;
+ pollset->local_wakeup_cache = NULL;
+ pollset->kicked_without_pollers = 0;
+ pollset->fd_count = 0;
+ pollset->fd_capacity = 0;
+ pollset->del_count = 0;
+ pollset->del_capacity = 0;
+ pollset->fds = NULL;
+ pollset->dels = NULL;
+}
+
+static void pollset_destroy(grpc_pollset *pollset) {
+ GPR_ASSERT(pollset->in_flight_cbs == 0);
+ GPR_ASSERT(!pollset_has_workers(pollset));
+ GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
+ while (pollset->local_wakeup_cache) {
+ grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next;
+ grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
+ gpr_free(pollset->local_wakeup_cache);
+ pollset->local_wakeup_cache = next;
+ }
+ gpr_free(pollset->fds);
+ gpr_free(pollset->dels);
+ gpr_mu_destroy(&pollset->mu);
+}
+
+static void pollset_reset(grpc_pollset *pollset) {
+ GPR_ASSERT(pollset->shutting_down);
+ GPR_ASSERT(pollset->in_flight_cbs == 0);
+ GPR_ASSERT(!pollset_has_workers(pollset));
+ GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
+ GPR_ASSERT(pollset->fd_count == 0);
+ GPR_ASSERT(pollset->del_count == 0);
+ pollset->shutting_down = 0;
+ pollset->called_shutdown = 0;
+ pollset->kicked_without_pollers = 0;
+}
+
+static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_fd *fd) {
+ gpr_mu_lock(&pollset->mu);
+ size_t i;
+ /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */
+ for (i = 0; i < pollset->fd_count; i++) {
+ if (pollset->fds[i] == fd) goto exit;
+ }
+ if (pollset->fd_count == pollset->fd_capacity) {
+ pollset->fd_capacity =
+ GPR_MAX(pollset->fd_capacity + 8, pollset->fd_count * 3 / 2);
+ pollset->fds =
+ gpr_realloc(pollset->fds, sizeof(grpc_fd *) * pollset->fd_capacity);
+ }
+ pollset->fds[pollset->fd_count++] = fd;
+ GRPC_FD_REF(fd, "multipoller");
+ pollset_kick(pollset, NULL);
+exit:
+ gpr_mu_unlock(&pollset->mu);
+}
+
+static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
+ GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs));
+ size_t i;
+ for (i = 0; i < pollset->fd_count; i++) {
+ GRPC_FD_UNREF(pollset->fds[i], "multipoller");
+ }
+ for (i = 0; i < pollset->del_count; i++) {
+ GRPC_FD_UNREF(pollset->dels[i], "multipoller_del");
+ }
+ pollset->fd_count = 0;
+ pollset->del_count = 0;
+ grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, true, NULL);
+}
+
+static void pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_pollset_worker **worker_hdl, gpr_timespec now,
+ gpr_timespec deadline) {
+ grpc_pollset_worker worker;
+ *worker_hdl = &worker;
+
+ /* pollset->mu already held */
+ int added_worker = 0;
+ int locked = 1;
+ int queued_work = 0;
+ int keep_polling = 0;
+ GPR_TIMER_BEGIN("pollset_work", 0);
+ /* this must happen before we (potentially) drop pollset->mu */
+ worker.next = worker.prev = NULL;
+ worker.reevaluate_polling_on_wakeup = 0;
+ if (pollset->local_wakeup_cache != NULL) {
+ worker.wakeup_fd = pollset->local_wakeup_cache;
+ pollset->local_wakeup_cache = worker.wakeup_fd->next;
+ } else {
+ worker.wakeup_fd = gpr_malloc(sizeof(*worker.wakeup_fd));
+ grpc_wakeup_fd_init(&worker.wakeup_fd->fd);
+ }
+ worker.kicked_specifically = 0;
+ /* If there's work waiting for the pollset to be idle, and the
+ pollset is idle, then do that work */
+ if (!pollset_has_workers(pollset) &&
+ !grpc_closure_list_empty(pollset->idle_jobs)) {
+ GPR_TIMER_MARK("pollset_work.idle_jobs", 0);
+ grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
+ goto done;
+ }
+ /* If we're shutting down then we don't execute any extended work */
+ if (pollset->shutting_down) {
+ GPR_TIMER_MARK("pollset_work.shutting_down", 0);
+ goto done;
+ }
+ /* Give do_promote priority so we don't starve it out */
+ if (pollset->in_flight_cbs) {
+ GPR_TIMER_MARK("pollset_work.in_flight_cbs", 0);
+ gpr_mu_unlock(&pollset->mu);
+ locked = 0;
+ goto done;
+ }
+ /* Start polling, and keep doing so while we're being asked to
+ re-evaluate our pollers (this allows poll() based pollers to
+ ensure they don't miss wakeups) */
+ keep_polling = 1;
+ while (keep_polling) {
+ keep_polling = 0;
+ if (!pollset->kicked_without_pollers) {
+ if (!added_worker) {
+ push_front_worker(pollset, &worker);
+ added_worker = 1;
+ gpr_tls_set(&g_current_thread_worker, (intptr_t)&worker);
+ }
+ gpr_tls_set(&g_current_thread_poller, (intptr_t)pollset);
+ GPR_TIMER_BEGIN("maybe_work_and_unlock", 0);
+#define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
+#define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)
+
+ int timeout;
+ int r;
+ size_t i, j, fd_count;
+ nfds_t pfd_count;
+ /* TODO(ctiller): inline some elements to avoid an allocation */
+ grpc_fd_watcher *watchers;
+ struct pollfd *pfds;
+
+ timeout = poll_deadline_to_millis_timeout(deadline, now);
+ /* TODO(ctiller): perform just one malloc here if we exceed the inline
+ * case */
+ pfds = gpr_malloc(sizeof(*pfds) * (pollset->fd_count + 2));
+ watchers = gpr_malloc(sizeof(*watchers) * (pollset->fd_count + 2));
+ fd_count = 0;
+ pfd_count = 2;
+ pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd);
+ pfds[0].events = POLLIN;
+ pfds[0].revents = 0;
+ pfds[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker.wakeup_fd->fd);
+ pfds[1].events = POLLIN;
+ pfds[1].revents = 0;
+ for (i = 0; i < pollset->fd_count; i++) {
+ int remove = fd_is_orphaned(pollset->fds[i]);
+ for (j = 0; !remove && j < pollset->del_count; j++) {
+ if (pollset->fds[i] == pollset->dels[j]) remove = 1;
+ }
+ if (remove) {
+ GRPC_FD_UNREF(pollset->fds[i], "multipoller");
+ } else {
+ pollset->fds[fd_count++] = pollset->fds[i];
+ watchers[pfd_count].fd = pollset->fds[i];
+ GRPC_FD_REF(watchers[pfd_count].fd, "multipoller_start");
+ pfds[pfd_count].fd = pollset->fds[i]->fd;
+ pfds[pfd_count].revents = 0;
+ pfd_count++;
+ }
+ }
+ for (j = 0; j < pollset->del_count; j++) {
+ GRPC_FD_UNREF(pollset->dels[j], "multipoller_del");
+ }
+ pollset->del_count = 0;
+ pollset->fd_count = fd_count;
+ gpr_mu_unlock(&pollset->mu);
+
+ for (i = 2; i < pfd_count; i++) {
+ grpc_fd *fd = watchers[i].fd;
+ pfds[i].events = (short)fd_begin_poll(fd, pollset, &worker, POLLIN,
+ POLLOUT, &watchers[i]);
+ GRPC_FD_UNREF(fd, "multipoller_start");
+ }
+
+ /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
+ even going into the blocking annotation if possible */
+ GRPC_SCHEDULING_START_BLOCKING_REGION;
+ r = grpc_poll_function(pfds, pfd_count, timeout);
+ GRPC_SCHEDULING_END_BLOCKING_REGION;
+
+ if (r < 0) {
+ if (errno != EINTR) {
+ gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
+ }
+ for (i = 2; i < pfd_count; i++) {
+ fd_end_poll(exec_ctx, &watchers[i], 0, 0);
+ }
+ } else if (r == 0) {
+ for (i = 2; i < pfd_count; i++) {
+ fd_end_poll(exec_ctx, &watchers[i], 0, 0);
+ }
+ } else {
+ if (pfds[0].revents & POLLIN_CHECK) {
+ grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
+ }
+ if (pfds[1].revents & POLLIN_CHECK) {
+ grpc_wakeup_fd_consume_wakeup(&worker.wakeup_fd->fd);
+ }
+ for (i = 2; i < pfd_count; i++) {
+ if (watchers[i].fd == NULL) {
+ fd_end_poll(exec_ctx, &watchers[i], 0, 0);
+ } else {
+ fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK,
+ pfds[i].revents & POLLOUT_CHECK);
+ }
+ }
+ }
+
+ gpr_free(pfds);
+ gpr_free(watchers);
+ GPR_TIMER_END("maybe_work_and_unlock", 0);
+ locked = 0;
+ gpr_tls_set(&g_current_thread_poller, 0);
+ } else {
+ GPR_TIMER_MARK("pollset_work.kicked_without_pollers", 0);
+ pollset->kicked_without_pollers = 0;
+ }
+ /* Finished execution - start cleaning up.
+ Note that we may arrive here from outside the enclosing while() loop.
+ In that case we won't loop though as we haven't added worker to the
+ worker list, which means nobody could ask us to re-evaluate polling). */
+ done:
+ if (!locked) {
+ queued_work |= grpc_exec_ctx_flush(exec_ctx);
+ gpr_mu_lock(&pollset->mu);
+ locked = 1;
+ }
+ /* If we're forced to re-evaluate polling (via pollset_kick with
+ GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
+ a loop */
+ if (worker.reevaluate_polling_on_wakeup) {
+ worker.reevaluate_polling_on_wakeup = 0;
+ pollset->kicked_without_pollers = 0;
+ if (queued_work || worker.kicked_specifically) {
+ /* If there's queued work on the list, then set the deadline to be
+ immediate so we get back out of the polling loop quickly */
+ deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
+ }
+ keep_polling = 1;
+ }
+ if (keep_polling) {
+ now = gpr_now(now.clock_type);
+ }
+ }
+ if (added_worker) {
+ remove_worker(pollset, &worker);
+ gpr_tls_set(&g_current_thread_worker, 0);
+ }
+ /* release wakeup fd to the local pool */
+ worker.wakeup_fd->next = pollset->local_wakeup_cache;
+ pollset->local_wakeup_cache = worker.wakeup_fd;
+ /* check shutdown conditions */
+ if (pollset->shutting_down) {
+ if (pollset_has_workers(pollset)) {
+ pollset_kick(pollset, NULL);
+ } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) {
+ pollset->called_shutdown = 1;
+ gpr_mu_unlock(&pollset->mu);
+ finish_shutdown(exec_ctx, pollset);
+ grpc_exec_ctx_flush(exec_ctx);
+ /* Continuing to access pollset here is safe -- it is the caller's
+ * responsibility to not destroy when it has outstanding calls to
+ * pollset_work.
+ * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
+ gpr_mu_lock(&pollset->mu);
+ } else if (!grpc_closure_list_empty(pollset->idle_jobs)) {
+ grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
+ gpr_mu_unlock(&pollset->mu);
+ grpc_exec_ctx_flush(exec_ctx);
+ gpr_mu_lock(&pollset->mu);
+ }
+ }
+ *worker_hdl = NULL;
+ GPR_TIMER_END("pollset_work", 0);
+}
+
+static void pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
+ grpc_closure *closure) {
+ GPR_ASSERT(!pollset->shutting_down);
+ pollset->shutting_down = 1;
+ pollset->shutdown_done = closure;
+ pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
+ if (!pollset_has_workers(pollset)) {
+ grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs, NULL);
+ }
+ if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 &&
+ !pollset_has_workers(pollset)) {
+ pollset->called_shutdown = 1;
+ finish_shutdown(exec_ctx, pollset);
+ }
+}
+
+static int poll_deadline_to_millis_timeout(gpr_timespec deadline,
+ gpr_timespec now) {
+ gpr_timespec timeout;
+ static const int64_t max_spin_polling_us = 10;
+ if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
+ return -1;
+ }
+ if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
+ max_spin_polling_us,
+ GPR_TIMESPAN))) <= 0) {
+ return 0;
+ }
+ timeout = gpr_time_sub(deadline, now);
+ return gpr_time_to_millis(gpr_time_add(
+ timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
+}
+
+/*******************************************************************************
+ * pollset_set_posix.c
+ */
+
+static grpc_pollset_set *pollset_set_create(void) {
+ grpc_pollset_set *pollset_set = gpr_malloc(sizeof(*pollset_set));
+ memset(pollset_set, 0, sizeof(*pollset_set));
+ gpr_mu_init(&pollset_set->mu);
+ return pollset_set;
+}
+
+static void pollset_set_destroy(grpc_pollset_set *pollset_set) {
+ size_t i;
+ gpr_mu_destroy(&pollset_set->mu);
+ for (i = 0; i < pollset_set->fd_count; i++) {
+ GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
+ }
+ gpr_free(pollset_set->pollsets);
+ gpr_free(pollset_set->pollset_sets);
+ gpr_free(pollset_set->fds);
+ gpr_free(pollset_set);
+}
+
+static void pollset_set_add_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *pollset_set,
+ grpc_pollset *pollset) {
+ size_t i, j;
+ gpr_mu_lock(&pollset_set->mu);
+ if (pollset_set->pollset_count == pollset_set->pollset_capacity) {
+ pollset_set->pollset_capacity =
+ GPR_MAX(8, 2 * pollset_set->pollset_capacity);
+ pollset_set->pollsets =
+ gpr_realloc(pollset_set->pollsets, pollset_set->pollset_capacity *
+ sizeof(*pollset_set->pollsets));
+ }
+ pollset_set->pollsets[pollset_set->pollset_count++] = pollset;
+ for (i = 0, j = 0; i < pollset_set->fd_count; i++) {
+ if (fd_is_orphaned(pollset_set->fds[i])) {
+ GRPC_FD_UNREF(pollset_set->fds[i], "pollset_set");
+ } else {
+ pollset_add_fd(exec_ctx, pollset, pollset_set->fds[i]);
+ pollset_set->fds[j++] = pollset_set->fds[i];
+ }
+ }
+ pollset_set->fd_count = j;
+ gpr_mu_unlock(&pollset_set->mu);
+}
+
+static void pollset_set_del_pollset(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *pollset_set,
+ grpc_pollset *pollset) {
+ size_t i;
+ gpr_mu_lock(&pollset_set->mu);
+ for (i = 0; i < pollset_set->pollset_count; i++) {
+ if (pollset_set->pollsets[i] == pollset) {
+ pollset_set->pollset_count--;
+ GPR_SWAP(grpc_pollset *, pollset_set->pollsets[i],
+ pollset_set->pollsets[pollset_set->pollset_count]);
+ break;
+ }
+ }
+ gpr_mu_unlock(&pollset_set->mu);
+}
+
+static void pollset_set_add_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *bag,
+ grpc_pollset_set *item) {
+ size_t i, j;
+ gpr_mu_lock(&bag->mu);
+ if (bag->pollset_set_count == bag->pollset_set_capacity) {
+ bag->pollset_set_capacity = GPR_MAX(8, 2 * bag->pollset_set_capacity);
+ bag->pollset_sets =
+ gpr_realloc(bag->pollset_sets,
+ bag->pollset_set_capacity * sizeof(*bag->pollset_sets));
+ }
+ bag->pollset_sets[bag->pollset_set_count++] = item;
+ for (i = 0, j = 0; i < bag->fd_count; i++) {
+ if (fd_is_orphaned(bag->fds[i])) {
+ GRPC_FD_UNREF(bag->fds[i], "pollset_set");
+ } else {
+ pollset_set_add_fd(exec_ctx, item, bag->fds[i]);
+ bag->fds[j++] = bag->fds[i];
+ }
+ }
+ bag->fd_count = j;
+ gpr_mu_unlock(&bag->mu);
+}
+
+static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *bag,
+ grpc_pollset_set *item) {
+ size_t i;
+ gpr_mu_lock(&bag->mu);
+ for (i = 0; i < bag->pollset_set_count; i++) {
+ if (bag->pollset_sets[i] == item) {
+ bag->pollset_set_count--;
+ GPR_SWAP(grpc_pollset_set *, bag->pollset_sets[i],
+ bag->pollset_sets[bag->pollset_set_count]);
+ break;
+ }
+ }
+ gpr_mu_unlock(&bag->mu);
+}
+
+static void pollset_set_add_fd(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *pollset_set, grpc_fd *fd) {
+ size_t i;
+ gpr_mu_lock(&pollset_set->mu);
+ if (pollset_set->fd_count == pollset_set->fd_capacity) {
+ pollset_set->fd_capacity = GPR_MAX(8, 2 * pollset_set->fd_capacity);
+ pollset_set->fds = gpr_realloc(
+ pollset_set->fds, pollset_set->fd_capacity * sizeof(*pollset_set->fds));
+ }
+ GRPC_FD_REF(fd, "pollset_set");
+ pollset_set->fds[pollset_set->fd_count++] = fd;
+ for (i = 0; i < pollset_set->pollset_count; i++) {
+ pollset_add_fd(exec_ctx, pollset_set->pollsets[i], fd);
+ }
+ for (i = 0; i < pollset_set->pollset_set_count; i++) {
+ pollset_set_add_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
+ }
+ gpr_mu_unlock(&pollset_set->mu);
+}
+
+static void pollset_set_del_fd(grpc_exec_ctx *exec_ctx,
+ grpc_pollset_set *pollset_set, grpc_fd *fd) {
+ size_t i;
+ gpr_mu_lock(&pollset_set->mu);
+ for (i = 0; i < pollset_set->fd_count; i++) {
+ if (pollset_set->fds[i] == fd) {
+ pollset_set->fd_count--;
+ GPR_SWAP(grpc_fd *, pollset_set->fds[i],
+ pollset_set->fds[pollset_set->fd_count]);
+ GRPC_FD_UNREF(fd, "pollset_set");
+ break;
+ }
+ }
+ for (i = 0; i < pollset_set->pollset_set_count; i++) {
+ pollset_set_del_fd(exec_ctx, pollset_set->pollset_sets[i], fd);
+ }
+ gpr_mu_unlock(&pollset_set->mu);
+}
+
+/*******************************************************************************
+ * event engine binding
+ */
+
+static void shutdown_engine(void) { pollset_global_shutdown(); }
+
+static const grpc_event_engine_vtable vtable = {
+ .pollset_size = sizeof(grpc_pollset),
+
+ .fd_create = fd_create,
+ .fd_wrapped_fd = fd_wrapped_fd,
+ .fd_orphan = fd_orphan,
+ .fd_shutdown = fd_shutdown,
+ .fd_notify_on_read = fd_notify_on_read,
+ .fd_notify_on_write = fd_notify_on_write,
+
+ .pollset_init = pollset_init,
+ .pollset_shutdown = pollset_shutdown,
+ .pollset_reset = pollset_reset,
+ .pollset_destroy = pollset_destroy,
+ .pollset_work = pollset_work,
+ .pollset_kick = pollset_kick,
+ .pollset_add_fd = pollset_add_fd,
+
+ .pollset_set_create = pollset_set_create,
+ .pollset_set_destroy = pollset_set_destroy,
+ .pollset_set_add_pollset = pollset_set_add_pollset,
+ .pollset_set_del_pollset = pollset_set_del_pollset,
+ .pollset_set_add_pollset_set = pollset_set_add_pollset_set,
+ .pollset_set_del_pollset_set = pollset_set_del_pollset_set,
+ .pollset_set_add_fd = pollset_set_add_fd,
+ .pollset_set_del_fd = pollset_set_del_fd,
+
+ .kick_poller = kick_poller,
+
+ .shutdown_engine = shutdown_engine,
+};
+
+const grpc_event_engine_vtable *grpc_init_poll_posix(void) {
+ pollset_global_init();
+ return &vtable;
+}
+
+#endif
diff --git a/src/core/lib/iomgr/ev_poll_posix.h b/src/core/lib/iomgr/ev_poll_posix.h
new file mode 100644
index 0000000000..291736a2db
--- /dev/null
+++ b/src/core/lib/iomgr/ev_poll_posix.h
@@ -0,0 +1,41 @@
+/*
+ *
+ * Copyright 2015-2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_CORE_LIB_IOMGR_EV_POLL_POSIX_H
+#define GRPC_CORE_LIB_IOMGR_EV_POLL_POSIX_H
+
+#include "src/core/lib/iomgr/ev_posix.h"
+
+const grpc_event_engine_vtable *grpc_init_poll_posix(void);
+
+#endif /* GRPC_CORE_LIB_IOMGR_EV_POLL_POSIX_H */
diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c
index 7df1751352..a7dfc9552d 100644
--- a/src/core/lib/iomgr/ev_posix.c
+++ b/src/core/lib/iomgr/ev_posix.c
@@ -37,23 +37,104 @@
#include "src/core/lib/iomgr/ev_posix.h"
+#include <string.h>
+
+#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/useful.h>
#include "src/core/lib/iomgr/ev_poll_and_epoll_posix.h"
+#include "src/core/lib/iomgr/ev_poll_posix.h"
+#include "src/core/lib/support/env.h"
+
+/** Default poll() function - a pointer so that it can be overridden by some
+ * tests */
+grpc_poll_function_type grpc_poll_function = poll;
static const grpc_event_engine_vtable *g_event_engine;
-grpc_poll_function_type grpc_poll_function = poll;
+typedef const grpc_event_engine_vtable *(*event_engine_factory_fn)(void);
+
+typedef struct {
+ const char *name;
+ event_engine_factory_fn factory;
+} event_engine_factory;
+
+static const event_engine_factory g_factories[] = {
+ {"poll", grpc_init_poll_posix}, {"legacy", grpc_init_poll_and_epoll_posix},
+};
+
+static void add(const char *beg, const char *end, char ***ss, size_t *ns) {
+ size_t n = *ns;
+ size_t np = n + 1;
+ char *s;
+ size_t len;
+ GPR_ASSERT(end >= beg);
+ len = (size_t)(end - beg);
+ s = gpr_malloc(len + 1);
+ memcpy(s, beg, len);
+ s[len] = 0;
+ *ss = gpr_realloc(*ss, sizeof(char **) * np);
+ (*ss)[n] = s;
+ *ns = np;
+}
+
+static void split(const char *s, char ***ss, size_t *ns) {
+ const char *c = strchr(s, ',');
+ if (c == NULL) {
+ add(s, s + strlen(s), ss, ns);
+ } else {
+ add(s, c, ss, ns);
+ split(c + 1, ss, ns);
+ }
+}
+
+static bool is(const char *want, const char *have) {
+ return 0 == strcmp(want, "all") || 0 == strcmp(want, have);
+}
+
+static void try_engine(const char *engine) {
+ for (size_t i = 0; i < GPR_ARRAY_SIZE(g_factories); i++) {
+ if (is(engine, g_factories[i].name)) {
+ if ((g_event_engine = g_factories[i].factory())) {
+ gpr_log(GPR_DEBUG, "Using polling engine: %s", g_factories[i].name);
+ return;
+ }
+ }
+ }
+}
void grpc_event_engine_init(void) {
- if ((g_event_engine = grpc_init_poll_and_epoll_posix())) {
- return;
+ char *s = gpr_getenv("GRPC_POLL_STRATEGY");
+ if (s == NULL) {
+ s = gpr_strdup("all");
+ }
+
+ char **strings = NULL;
+ size_t nstrings = 0;
+ split(s, &strings, &nstrings);
+
+ for (size_t i = 0; g_event_engine == NULL && i < nstrings; i++) {
+ try_engine(strings[i]);
+ }
+
+ for (size_t i = 0; i < nstrings; i++) {
+ gpr_free(strings[i]);
+ }
+ gpr_free(strings);
+ gpr_free(s);
+
+ if (g_event_engine == NULL) {
+ gpr_log(GPR_ERROR, "No event engine could be initialized");
+ abort();
}
- gpr_log(GPR_ERROR, "No event engine could be initialized");
- abort();
}
-void grpc_event_engine_shutdown(void) { g_event_engine->shutdown_engine(); }
+void grpc_event_engine_shutdown(void) {
+ g_event_engine->shutdown_engine();
+ g_event_engine = NULL;
+}
grpc_fd *grpc_fd_create(int fd, const char *name) {
return g_event_engine->fd_create(fd, name);
diff --git a/src/core/lib/iomgr/iomgr_posix.c b/src/core/lib/iomgr/iomgr_posix.c
index 016c501f75..cede97f4c6 100644
--- a/src/core/lib/iomgr/iomgr_posix.c
+++ b/src/core/lib/iomgr/iomgr_posix.c
@@ -41,12 +41,16 @@
#include "src/core/lib/iomgr/tcp_posix.h"
void grpc_iomgr_platform_init(void) {
+ grpc_wakeup_fd_global_init();
grpc_event_engine_init();
grpc_register_tracer("tcp", &grpc_tcp_trace);
}
void grpc_iomgr_platform_flush(void) {}
-void grpc_iomgr_platform_shutdown(void) { grpc_event_engine_shutdown(); }
+void grpc_iomgr_platform_shutdown(void) {
+ grpc_event_engine_shutdown();
+ grpc_wakeup_fd_global_destroy();
+}
#endif /* GRPC_POSIX_SOCKET */
diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c
index df6cf956d9..98ffccd59b 100644
--- a/src/core/lib/iomgr/udp_server.c
+++ b/src/core/lib/iomgr/udp_server.c
@@ -81,6 +81,7 @@ typedef struct {
grpc_closure read_closure;
grpc_closure destroyed_closure;
grpc_udp_server_read_cb read_cb;
+ grpc_udp_server_orphan_cb orphan_cb;
} server_port;
/* the overall server */
@@ -168,6 +169,10 @@ static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
server_port *sp = &s->ports[i];
sp->destroyed_closure.cb = destroyed_port;
sp->destroyed_closure.cb_arg = s;
+
+ GPR_ASSERT(sp->orphan_cb);
+ sp->orphan_cb(sp->emfd);
+
grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL,
"udp_listener_shutdown");
}
@@ -268,7 +273,8 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) {
static int add_socket_to_server(grpc_udp_server *s, int fd,
const struct sockaddr *addr, size_t addr_len,
- grpc_udp_server_read_cb read_cb) {
+ grpc_udp_server_read_cb read_cb,
+ grpc_udp_server_orphan_cb orphan_cb) {
server_port *sp;
int port;
char *addr_str;
@@ -292,6 +298,7 @@ static int add_socket_to_server(grpc_udp_server *s, int fd,
memcpy(sp->addr.untyped, addr, addr_len);
sp->addr_len = addr_len;
sp->read_cb = read_cb;
+ sp->orphan_cb = orphan_cb;
GPR_ASSERT(sp->emfd);
gpr_mu_unlock(&s->mu);
gpr_free(name);
@@ -301,7 +308,8 @@ static int add_socket_to_server(grpc_udp_server *s, int fd,
}
int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
- size_t addr_len, grpc_udp_server_read_cb read_cb) {
+ size_t addr_len, grpc_udp_server_read_cb read_cb,
+ grpc_udp_server_orphan_cb orphan_cb) {
int allocated_port1 = -1;
int allocated_port2 = -1;
unsigned i;
@@ -348,7 +356,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
addr = (struct sockaddr *)&wild6;
addr_len = sizeof(wild6);
fd = grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode);
- allocated_port1 = add_socket_to_server(s, fd, addr, addr_len, read_cb);
+ allocated_port1 =
+ add_socket_to_server(s, fd, addr, addr_len, read_cb, orphan_cb);
if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
goto done;
}
@@ -370,7 +379,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
addr = (struct sockaddr *)&addr4_copy;
addr_len = sizeof(addr4_copy);
}
- allocated_port2 = add_socket_to_server(s, fd, addr, addr_len, read_cb);
+ allocated_port2 =
+ add_socket_to_server(s, fd, addr, addr_len, read_cb, orphan_cb);
done:
gpr_free(allocated_addr);
diff --git a/src/core/lib/iomgr/udp_server.h b/src/core/lib/iomgr/udp_server.h
index d8cf957a22..33c5ce11cd 100644
--- a/src/core/lib/iomgr/udp_server.h
+++ b/src/core/lib/iomgr/udp_server.h
@@ -48,6 +48,9 @@ typedef struct grpc_udp_server grpc_udp_server;
typedef void (*grpc_udp_server_read_cb)(grpc_exec_ctx *exec_ctx, grpc_fd *emfd,
struct grpc_server *server);
+/* Called when the grpc_fd is about to be orphaned (and the FD closed). */
+typedef void (*grpc_udp_server_orphan_cb)(grpc_fd *emfd);
+
/* Create a server, initially not bound to any ports */
grpc_udp_server *grpc_udp_server_create(void);
@@ -69,7 +72,8 @@ int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned index);
/* TODO(ctiller): deprecate this, and make grpc_udp_server_add_ports to handle
all of the multiple socket port matching logic in one place */
int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
- size_t addr_len, grpc_udp_server_read_cb read_cb);
+ size_t addr_len, grpc_udp_server_read_cb read_cb,
+ grpc_udp_server_orphan_cb orphan_cb);
void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *server,
grpc_closure *on_done);
diff --git a/src/cpp/common/channel_arguments.cc b/src/cpp/common/channel_arguments.cc
index db3558f192..f297ae8587 100644
--- a/src/cpp/common/channel_arguments.cc
+++ b/src/cpp/common/channel_arguments.cc
@@ -85,7 +85,7 @@ void ChannelArguments::Swap(ChannelArguments& other) {
void ChannelArguments::SetCompressionAlgorithm(
grpc_compression_algorithm algorithm) {
- SetInt(GRPC_COMPRESSION_ALGORITHM_ARG, algorithm);
+ SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM, algorithm);
}
// Note: a second call to this will add in front the result of the first call.
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index 9658a56745..61f0f6ae2a 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -123,7 +123,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
if (max_message_size_ > 0) {
args.SetInt(GRPC_ARG_MAX_MESSAGE_LENGTH, max_message_size_);
}
- args.SetInt(GRPC_COMPRESSION_ALGORITHM_STATE_ARG,
+ args.SetInt(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET,
compression_options_.enabled_algorithms_bitset);
std::unique_ptr<Server> server(
new Server(thread_pool.release(), true, max_message_size_, &args));
diff --git a/src/csharp/build_packages.bat b/src/csharp/build_packages.bat
index 7520b0f81a..28e4262121 100644
--- a/src/csharp/build_packages.bat
+++ b/src/csharp/build_packages.bat
@@ -1,3 +1,32 @@
+@rem Copyright 2016, Google Inc.
+@rem All rights reserved.
+@rem
+@rem Redistribution and use in source and binary forms, with or without
+@rem modification, are permitted provided that the following conditions are
+@rem met:
+@rem
+@rem * Redistributions of source code must retain the above copyright
+@rem notice, this list of conditions and the following disclaimer.
+@rem * Redistributions in binary form must reproduce the above
+@rem copyright notice, this list of conditions and the following disclaimer
+@rem in the documentation and/or other materials provided with the
+@rem distribution.
+@rem * Neither the name of Google Inc. nor the names of its
+@rem contributors may be used to endorse or promote products derived from
+@rem this software without specific prior written permission.
+@rem
+@rem THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+@rem "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+@rem LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+@rem A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+@rem OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+@rem SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+@rem LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+@rem DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+@rem THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+@rem (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+@rem OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
@rem Builds gRPC NuGet packages
@rem Current package versions
diff --git a/src/csharp/buildall.bat b/src/csharp/buildall.bat
index f800756dfe..0beb30c198 100644
--- a/src/csharp/buildall.bat
+++ b/src/csharp/buildall.bat
@@ -1,3 +1,32 @@
+@rem Copyright 2016, Google Inc.
+@rem All rights reserved.
+@rem
+@rem Redistribution and use in source and binary forms, with or without
+@rem modification, are permitted provided that the following conditions are
+@rem met:
+@rem
+@rem * Redistributions of source code must retain the above copyright
+@rem notice, this list of conditions and the following disclaimer.
+@rem * Redistributions in binary form must reproduce the above
+@rem copyright notice, this list of conditions and the following disclaimer
+@rem in the documentation and/or other materials provided with the
+@rem distribution.
+@rem * Neither the name of Google Inc. nor the names of its
+@rem contributors may be used to endorse or promote products derived from
+@rem this software without specific prior written permission.
+@rem
+@rem THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+@rem "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+@rem LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+@rem A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+@rem OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+@rem SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+@rem LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+@rem DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+@rem THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+@rem (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+@rem OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
@rem Convenience script to build gRPC C# from command line
setlocal
diff --git a/src/php/ext/grpc/call.c b/src/php/ext/grpc/call.c
index a0f3d160c6..a2c1c08169 100644
--- a/src/php/ext/grpc/call.c
+++ b/src/php/ext/grpc/call.c
@@ -96,6 +96,7 @@ zval *grpc_php_wrap_call(grpc_call *wrapped, bool owned) {
wrapped_grpc_call *call =
(wrapped_grpc_call *)zend_object_store_get_object(call_object TSRMLS_CC);
call->wrapped = wrapped;
+ call->owned = owned;
return call_object;
}
diff --git a/src/proto/grpc/reflection/v1alpha/reflection.proto b/src/proto/grpc/reflection/v1alpha/reflection.proto
new file mode 100644
index 0000000000..276ff0e255
--- /dev/null
+++ b/src/proto/grpc/reflection/v1alpha/reflection.proto
@@ -0,0 +1,151 @@
+// Copyright 2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+// Service exported by server reflection
+
+syntax = "proto3";
+
+package grpc.reflection.v1alpha;
+
+service ServerReflection {
+ // The reflection service is structured as a bidirectional stream, ensuring
+ // all related requests go to a single server.
+ rpc ServerReflectionInfo(stream ServerReflectionRequest)
+ returns (stream ServerReflectionResponse);
+}
+
+// The message sent by the client when calling ServerReflectionInfo method.
+message ServerReflectionRequest {
+ string host = 1;
+ // To use reflection service, the client should set one of the following
+ // fields in message_request. The server distinguishes requests by their
+ // defined field and then handles them using corresponding methods.
+ oneof message_request {
+ // Find a proto file by the file name.
+ string file_by_filename = 3;
+
+ // Find the proto file that declares the given fully-qualified symbol name.
+ // This field should be a fully-qualified symbol name
+ // (e.g. <package>.<service>[.<method>] or <package>.<type>).
+ string file_containing_symbol = 4;
+
+ // Find the proto file which defines an extension extending the given
+ // message type with the given field number.
+ ExtensionRequest file_containing_extension = 5;
+
+ // Finds the tag numbers used by all known extensions of extendee_type, and
+ // appends them to ExtensionNumberResponse in an undefined order.
+ // Its corresponding method is best-effort: it's not guaranteed that the
+ // reflection service will implement this method, and it's not guaranteed
+ // that this method will provide all extensions. Returns
+ // StatusCode::UNIMPLEMENTED if it's not implemented.
+ // This field should be a fully-qualified type name. The format is
+ // <package>.<type>
+ string all_extension_numbers_of_type = 6;
+
+ // List the full names of registered services. The content will not be
+ // checked.
+ string list_services = 7;
+ }
+}
+
+// The type name and extension number sent by the client when requesting
+// file_containing_extension.
+message ExtensionRequest {
+ // Fully-qualified type name. The format should be <package>.<type>
+ string containing_type = 1;
+ int32 extension_number = 2;
+}
+
+// The message sent by the server to answer ServerReflectionInfo method.
+message ServerReflectionResponse {
+ string valid_host = 1;
+ ServerReflectionRequest original_request = 2;
+ // The server set one of the following fields accroding to the message_request
+ // in the request.
+ oneof message_response {
+ // This message is used to answer file_by_filename, file_containing_symbol,
+ // file_containing_extension requests with transitive dependencies. As
+ // the repeated label is not allowed in oneof fields, we use a
+ // FileDescriptorResponse message to encapsulate the repeated fields.
+ // The reflection service is allowed to avoid sending FileDescriptorProtos
+ // that were previously sent in response to earlier requests in the stream.
+ FileDescriptorResponse file_descriptor_response = 4;
+
+ // This message is used to answer all_extension_numbers_of_type requst.
+ ExtensionNumberResponse all_extension_numbers_response = 5;
+
+ // This message is used to answer list_services request.
+ ListServiceResponse list_services_response = 6;
+
+ // This message is used when an error occurs.
+ ErrorResponse error_response = 7;
+ }
+}
+
+// Serialized FileDescriptorProto messages sent by the server answering
+// a file_by_filename, file_containing_symbol, or file_containing_extension
+// request.
+message FileDescriptorResponse {
+ // Serialized FileDescriptorProto messages. We avoid taking a dependency on
+ // descriptor.proto, which uses proto2 only features, by making them opaque
+ // bytes instead.
+ repeated bytes file_descriptor_proto = 1;
+}
+
+// A list of extension numbers sent by the server answering
+// all_extension_numbers_of_type request.
+message ExtensionNumberResponse {
+ // Full name of the base type, including the package name. The format
+ // is <package>.<type>
+ string base_type_name = 1;
+ repeated int32 extension_number = 2;
+}
+
+// A list of ServiceResponse sent by the server answering list_services request.
+message ListServiceResponse {
+ // The information of each service may be expanded in the future, so we use
+ // ServiceResponse message to encapsulate it.
+ repeated ServiceResponse service = 1;
+}
+
+// The information of a single service used by ListServiceResponse to answer
+// list_services request.
+message ServiceResponse {
+ // Full name of a registered service, including its package name. The format
+ // is <package>.<service>
+ string name = 1;
+}
+
+// The error code and error message sent by the server when an error occurs.
+message ErrorResponse {
+ // This field uses the error codes defined in grpc::StatusCode.
+ int32 error_code = 1;
+ string error_message = 2;
+}
diff --git a/src/python/grpcio/grpc/_cython/imports.generated.c b/src/python/grpcio/grpc/_cython/imports.generated.c
index f0a40dbb35..09551472b5 100644
--- a/src/python/grpcio/grpc/_cython/imports.generated.c
+++ b/src/python/grpcio/grpc/_cython/imports.generated.c
@@ -125,6 +125,7 @@ grpc_header_key_is_legal_type grpc_header_key_is_legal_import;
grpc_header_nonbin_value_is_legal_type grpc_header_nonbin_value_is_legal_import;
grpc_is_binary_header_type grpc_is_binary_header_import;
grpc_call_error_to_string_type grpc_call_error_to_string_import;
+grpc_cronet_secure_channel_create_type grpc_cronet_secure_channel_create_import;
grpc_auth_property_iterator_next_type grpc_auth_property_iterator_next_import;
grpc_auth_context_property_iterator_type grpc_auth_context_property_iterator_import;
grpc_auth_context_peer_identity_type grpc_auth_context_peer_identity_import;
@@ -395,6 +396,7 @@ void pygrpc_load_imports(HMODULE library) {
grpc_header_nonbin_value_is_legal_import = (grpc_header_nonbin_value_is_legal_type) GetProcAddress(library, "grpc_header_nonbin_value_is_legal");
grpc_is_binary_header_import = (grpc_is_binary_header_type) GetProcAddress(library, "grpc_is_binary_header");
grpc_call_error_to_string_import = (grpc_call_error_to_string_type) GetProcAddress(library, "grpc_call_error_to_string");
+ grpc_cronet_secure_channel_create_import = (grpc_cronet_secure_channel_create_type) GetProcAddress(library, "grpc_cronet_secure_channel_create");
grpc_auth_property_iterator_next_import = (grpc_auth_property_iterator_next_type) GetProcAddress(library, "grpc_auth_property_iterator_next");
grpc_auth_context_property_iterator_import = (grpc_auth_context_property_iterator_type) GetProcAddress(library, "grpc_auth_context_property_iterator");
grpc_auth_context_peer_identity_import = (grpc_auth_context_peer_identity_type) GetProcAddress(library, "grpc_auth_context_peer_identity");
diff --git a/src/python/grpcio/grpc/_cython/imports.generated.h b/src/python/grpcio/grpc/_cython/imports.generated.h
index d5e810b7cf..54c8aaad13 100644
--- a/src/python/grpcio/grpc/_cython/imports.generated.h
+++ b/src/python/grpcio/grpc/_cython/imports.generated.h
@@ -43,6 +43,7 @@
#include <grpc/census.h>
#include <grpc/compression.h>
#include <grpc/grpc.h>
+#include <grpc/grpc_cronet.h>
#include <grpc/grpc_security.h>
#include <grpc/impl/codegen/alloc.h>
#include <grpc/impl/codegen/byte_buffer.h>
@@ -325,6 +326,9 @@ extern grpc_is_binary_header_type grpc_is_binary_header_import;
typedef const char *(*grpc_call_error_to_string_type)(grpc_call_error error);
extern grpc_call_error_to_string_type grpc_call_error_to_string_import;
#define grpc_call_error_to_string grpc_call_error_to_string_import
+typedef grpc_channel *(*grpc_cronet_secure_channel_create_type)(void *engine, const char *target, const grpc_channel_args *args, void *reserved);
+extern grpc_cronet_secure_channel_create_type grpc_cronet_secure_channel_create_import;
+#define grpc_cronet_secure_channel_create grpc_cronet_secure_channel_create_import
typedef const grpc_auth_property *(*grpc_auth_property_iterator_next_type)(grpc_auth_property_iterator *it);
extern grpc_auth_property_iterator_next_type grpc_auth_property_iterator_next_import;
#define grpc_auth_property_iterator_next grpc_auth_property_iterator_next_import
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 4b98dc1a13..162191b06d 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -95,6 +95,7 @@ CORE_SOURCE_FILES = [
'src/core/lib/iomgr/endpoint_pair_posix.c',
'src/core/lib/iomgr/endpoint_pair_windows.c',
'src/core/lib/iomgr/ev_poll_and_epoll_posix.c',
+ 'src/core/lib/iomgr/ev_poll_posix.c',
'src/core/lib/iomgr/ev_posix.c',
'src/core/lib/iomgr/exec_ctx.c',
'src/core/lib/iomgr/executor.c',
@@ -222,6 +223,9 @@ CORE_SOURCE_FILES = [
'src/core/ext/client_config/uri_parser.c',
'src/core/ext/transport/chttp2/server/insecure/server_chttp2.c',
'src/core/ext/transport/chttp2/client/insecure/channel_create.c',
+ 'src/core/ext/transport/cronet/client/secure/cronet_channel_create.c',
+ 'src/core/ext/transport/cronet/transport/cronet_api_dummy.c',
+ 'src/core/ext/transport/cronet/transport/cronet_transport.c',
'src/core/ext/lb_policy/grpclb/load_balancer_api.c',
'src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c',
'third_party/nanopb/pb_common.c',
diff --git a/src/python/grpcio/tests/protoc_plugin/beta_python_plugin_test.py b/src/python/grpcio/tests/protoc_plugin/beta_python_plugin_test.py
index 3dc3042e38..7466f88059 100644
--- a/src/python/grpcio/tests/protoc_plugin/beta_python_plugin_test.py
+++ b/src/python/grpcio/tests/protoc_plugin/beta_python_plugin_test.py
@@ -59,11 +59,12 @@ STUB_FACTORY_IDENTIFIER = 'beta_create_TestService_stub'
class _ServicerMethods(object):
- def __init__(self, test_pb2):
+ def __init__(self, response_pb2, payload_pb2):
self._condition = threading.Condition()
self._paused = False
self._fail = False
- self._test_pb2 = test_pb2
+ self._response_pb2 = response_pb2
+ self._payload_pb2 = payload_pb2
@contextlib.contextmanager
def pause(self): # pylint: disable=invalid-name
@@ -90,22 +91,22 @@ class _ServicerMethods(object):
self._condition.wait()
def UnaryCall(self, request, unused_rpc_context):
- response = self._test_pb2.SimpleResponse()
- response.payload.payload_type = self._test_pb2.COMPRESSABLE
+ response = self._response_pb2.SimpleResponse()
+ response.payload.payload_type = self._payload_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * request.response_size
self._control()
return response
def StreamingOutputCall(self, request, unused_rpc_context):
for parameter in request.response_parameters:
- response = self._test_pb2.StreamingOutputCallResponse()
- response.payload.payload_type = self._test_pb2.COMPRESSABLE
+ response = self._response_pb2.StreamingOutputCallResponse()
+ response.payload.payload_type = self._payload_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * parameter.size
self._control()
yield response
def StreamingInputCall(self, request_iter, unused_rpc_context):
- response = self._test_pb2.StreamingInputCallResponse()
+ response = self._response_pb2.StreamingInputCallResponse()
aggregated_payload_size = 0
for request in request_iter:
aggregated_payload_size += len(request.payload.payload_compressable)
@@ -116,8 +117,8 @@ class _ServicerMethods(object):
def FullDuplexCall(self, request_iter, unused_rpc_context):
for request in request_iter:
for parameter in request.response_parameters:
- response = self._test_pb2.StreamingOutputCallResponse()
- response.payload.payload_type = self._test_pb2.COMPRESSABLE
+ response = self._response_pb2.StreamingOutputCallResponse()
+ response.payload.payload_type = self._payload_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * parameter.size
self._control()
yield response
@@ -126,8 +127,8 @@ class _ServicerMethods(object):
responses = []
for request in request_iter:
for parameter in request.response_parameters:
- response = self._test_pb2.StreamingOutputCallResponse()
- response.payload.payload_type = self._test_pb2.COMPRESSABLE
+ response = self._response_pb2.StreamingOutputCallResponse()
+ response.payload.payload_type = self._payload_pb2.COMPRESSABLE
response.payload.payload_compressable = 'a' * parameter.size
self._control()
responses.append(response)
@@ -136,23 +137,25 @@ class _ServicerMethods(object):
@contextlib.contextmanager
-def _CreateService(test_pb2):
+def _CreateService(service_pb2, response_pb2, payload_pb2):
"""Provides a servicer backend and a stub.
The servicer is just the implementation of the actual servicer passed to the
face player of the python RPC implementation; the two are detached.
Args:
- test_pb2: The test_pb2 module generated by this test.
+ service_pb2: The service_pb2 module generated by this test.
+ response_pb2: The response_pb2 module generated by this test
+ payload_pb2: The payload_pb2 module generated by this test
Yields:
A (servicer_methods, stub) pair where servicer_methods is the back-end of
the service bound to the stub and and stub is the stub on which to invoke
RPCs.
"""
- servicer_methods = _ServicerMethods(test_pb2)
+ servicer_methods = _ServicerMethods(response_pb2, payload_pb2)
- class Servicer(getattr(test_pb2, SERVICER_IDENTIFIER)):
+ class Servicer(getattr(service_pb2, SERVICER_IDENTIFIER)):
def UnaryCall(self, request, context):
return servicer_methods.UnaryCall(request, context)
@@ -170,55 +173,52 @@ def _CreateService(test_pb2):
return servicer_methods.HalfDuplexCall(request_iter, context)
servicer = Servicer()
- server = getattr(test_pb2, SERVER_FACTORY_IDENTIFIER)(servicer)
+ server = getattr(service_pb2, SERVER_FACTORY_IDENTIFIER)(servicer)
port = server.add_insecure_port('[::]:0')
server.start()
channel = implementations.insecure_channel('localhost', port)
- stub = getattr(test_pb2, STUB_FACTORY_IDENTIFIER)(channel)
- yield servicer_methods, stub
+ stub = getattr(service_pb2, STUB_FACTORY_IDENTIFIER)(channel)
+ yield (servicer_methods, stub)
server.stop(0)
@contextlib.contextmanager
-def _CreateIncompleteService(test_pb2):
+def _CreateIncompleteService(service_pb2):
"""Provides a servicer backend that fails to implement methods and its stub.
The servicer is just the implementation of the actual servicer passed to the
face player of the python RPC implementation; the two are detached.
-
Args:
- test_pb2: The test_pb2 module generated by this test.
-
+ service_pb2: The service_pb2 module generated by this test.
Yields:
A (servicer_methods, stub) pair where servicer_methods is the back-end of
the service bound to the stub and and stub is the stub on which to invoke
RPCs.
"""
- servicer_methods = _ServicerMethods(test_pb2)
- class Servicer(getattr(test_pb2, SERVICER_IDENTIFIER)):
+ class Servicer(getattr(service_pb2, SERVICER_IDENTIFIER)):
pass
servicer = Servicer()
- server = getattr(test_pb2, SERVER_FACTORY_IDENTIFIER)(servicer)
+ server = getattr(service_pb2, SERVER_FACTORY_IDENTIFIER)(servicer)
port = server.add_insecure_port('[::]:0')
server.start()
channel = implementations.insecure_channel('localhost', port)
- stub = getattr(test_pb2, STUB_FACTORY_IDENTIFIER)(channel)
- yield servicer_methods, stub
+ stub = getattr(service_pb2, STUB_FACTORY_IDENTIFIER)(channel)
+ yield None, stub
server.stop(0)
-def _streaming_input_request_iterator(test_pb2):
+def _streaming_input_request_iterator(request_pb2, payload_pb2):
for _ in range(3):
- request = test_pb2.StreamingInputCallRequest()
- request.payload.payload_type = test_pb2.COMPRESSABLE
+ request = request_pb2.StreamingInputCallRequest()
+ request.payload.payload_type = payload_pb2.COMPRESSABLE
request.payload.payload_compressable = 'a'
yield request
-def _streaming_output_request(test_pb2):
- request = test_pb2.StreamingOutputCallRequest()
+def _streaming_output_request(request_pb2):
+ request = request_pb2.StreamingOutputCallRequest()
sizes = [1, 2, 3]
request.response_parameters.add(size=sizes[0], interval_us=0)
request.response_parameters.add(size=sizes[1], interval_us=0)
@@ -226,11 +226,11 @@ def _streaming_output_request(test_pb2):
return request
-def _full_duplex_request_iterator(test_pb2):
- request = test_pb2.StreamingOutputCallRequest()
+def _full_duplex_request_iterator(request_pb2):
+ request = request_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=1, interval_us=0)
yield request
- request = test_pb2.StreamingOutputCallRequest()
+ request = request_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=2, interval_us=0)
request.response_parameters.add(size=3, interval_us=0)
yield request
@@ -250,8 +250,6 @@ class PythonPluginTest(unittest.TestCase):
protoc_command = 'protoc'
protoc_plugin_filename = distutils.spawn.find_executable(
'grpc_python_plugin')
- test_proto_filename = pkg_resources.resource_filename(
- 'tests.protoc_plugin', 'protoc_plugin_test.proto')
if not os.path.isfile(protoc_command):
# Assume that if we haven't built protoc that it's on the system.
protoc_command = 'protoc'
@@ -259,19 +257,44 @@ class PythonPluginTest(unittest.TestCase):
# Ensure that the output directory exists.
self.outdir = tempfile.mkdtemp()
+ # Find all proto files
+ paths = []
+ root_dir = os.path.dirname(os.path.realpath(__file__))
+ proto_dir = os.path.join(root_dir, 'protos')
+ for walk_root, _, filenames in os.walk(proto_dir):
+ for filename in filenames:
+ if filename.endswith('.proto'):
+ path = os.path.join(walk_root, filename)
+ paths.append(path)
+
# Invoke protoc with the plugin.
cmd = [
protoc_command,
'--plugin=protoc-gen-python-grpc=%s' % protoc_plugin_filename,
- '-I .',
+ '-I %s' % root_dir,
'--python_out=%s' % self.outdir,
- '--python-grpc_out=%s' % self.outdir,
- os.path.basename(test_proto_filename),
- ]
+ '--python-grpc_out=%s' % self.outdir
+ ] + paths
subprocess.check_call(' '.join(cmd), shell=True, env=os.environ,
- cwd=os.path.dirname(test_proto_filename))
+ cwd=os.path.dirname(os.path.realpath(__file__)))
+
+ # Generated proto directories dont include __init__.py, but
+ # these are needed for python package resolution
+ for walk_root, _, _ in os.walk(os.path.join(self.outdir, 'protos')):
+ path = os.path.join(walk_root, '__init__.py')
+ open(path, 'a').close()
+
sys.path.insert(0, self.outdir)
+ import protos.payload.test_payload_pb2 as payload_pb2 # pylint: disable=g-import-not-at-top
+ import protos.requests.r.test_requests_pb2 as request_pb2 # pylint: disable=g-import-not-at-top
+ import protos.responses.test_responses_pb2 as response_pb2 # pylint: disable=g-import-not-at-top
+ import protos.service.test_service_pb2 as service_pb2 # pylint: disable=g-import-not-at-top
+ self._payload_pb2 = payload_pb2
+ self._request_pb2 = request_pb2
+ self._response_pb2 = response_pb2
+ self._service_pb2 = service_pb2
+
def tearDown(self):
try:
shutil.rmtree(self.outdir)
@@ -282,43 +305,40 @@ class PythonPluginTest(unittest.TestCase):
def testImportAttributes(self):
# check that we can access the generated module and its members.
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- self.assertIsNotNone(getattr(test_pb2, SERVICER_IDENTIFIER, None))
- self.assertIsNotNone(getattr(test_pb2, STUB_IDENTIFIER, None))
- self.assertIsNotNone(getattr(test_pb2, SERVER_FACTORY_IDENTIFIER, None))
- self.assertIsNotNone(getattr(test_pb2, STUB_FACTORY_IDENTIFIER, None))
+ self.assertIsNotNone(
+ getattr(self._service_pb2, SERVICER_IDENTIFIER, None))
+ self.assertIsNotNone(
+ getattr(self._service_pb2, STUB_IDENTIFIER, None))
+ self.assertIsNotNone(
+ getattr(self._service_pb2, SERVER_FACTORY_IDENTIFIER, None))
+ self.assertIsNotNone(
+ getattr(self._service_pb2, STUB_FACTORY_IDENTIFIER, None))
def testUpDown(self):
- import protoc_plugin_test_pb2 as test_pb2
- moves.reload_module(test_pb2)
- with _CreateService(test_pb2) as (servicer, stub):
- request = test_pb2.SimpleRequest(response_size=13)
+ with _CreateService(
+ self._service_pb2, self._response_pb2, self._payload_pb2):
+ self._request_pb2.SimpleRequest(response_size=13)
def testIncompleteServicer(self):
- import protoc_plugin_test_pb2 as test_pb2
- moves.reload_module(test_pb2)
- with _CreateIncompleteService(test_pb2) as (servicer, stub):
- request = test_pb2.SimpleRequest(response_size=13)
+ with _CreateIncompleteService(self._service_pb2) as (_, stub):
+ request = self._request_pb2.SimpleRequest(response_size=13)
try:
- response = stub.UnaryCall(request, test_constants.LONG_TIMEOUT)
+ stub.UnaryCall(request, test_constants.LONG_TIMEOUT)
except face.AbortionError as error:
self.assertEqual(interfaces.StatusCode.UNIMPLEMENTED, error.code)
def testUnaryCall(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
- request = test_pb2.SimpleRequest(response_size=13)
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
+ request = self._request_pb2.SimpleRequest(response_size=13)
response = stub.UnaryCall(request, test_constants.LONG_TIMEOUT)
expected_response = methods.UnaryCall(request, 'not a real context!')
self.assertEqual(expected_response, response)
def testUnaryCallFuture(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- request = test_pb2.SimpleRequest(response_size=13)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
+ request = self._request_pb2.SimpleRequest(response_size=13)
# Check that the call does not block waiting for the server to respond.
with methods.pause():
response_future = stub.UnaryCall.future(
@@ -328,10 +348,9 @@ class PythonPluginTest(unittest.TestCase):
self.assertEqual(expected_response, response)
def testUnaryCallFutureExpired(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
- request = test_pb2.SimpleRequest(response_size=13)
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
+ request = self._request_pb2.SimpleRequest(response_size=13)
with methods.pause():
response_future = stub.UnaryCall.future(
request, test_constants.SHORT_TIMEOUT)
@@ -339,30 +358,27 @@ class PythonPluginTest(unittest.TestCase):
response_future.result()
def testUnaryCallFutureCancelled(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- request = test_pb2.SimpleRequest(response_size=13)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
+ request = self._request_pb2.SimpleRequest(response_size=13)
with methods.pause():
response_future = stub.UnaryCall.future(request, 1)
response_future.cancel()
self.assertTrue(response_future.cancelled())
def testUnaryCallFutureFailed(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- request = test_pb2.SimpleRequest(response_size=13)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
+ request = self._request_pb2.SimpleRequest(response_size=13)
with methods.fail():
response_future = stub.UnaryCall.future(
request, test_constants.LONG_TIMEOUT)
self.assertIsNotNone(response_future.exception())
def testStreamingOutputCall(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- request = _streaming_output_request(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
+ request = _streaming_output_request(self._request_pb2)
responses = stub.StreamingOutputCall(
request, test_constants.LONG_TIMEOUT)
expected_responses = methods.StreamingOutputCall(
@@ -372,10 +388,9 @@ class PythonPluginTest(unittest.TestCase):
self.assertEqual(expected_response, response)
def testStreamingOutputCallExpired(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- request = _streaming_output_request(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
+ request = _streaming_output_request(self._request_pb2)
with methods.pause():
responses = stub.StreamingOutputCall(
request, test_constants.SHORT_TIMEOUT)
@@ -383,10 +398,9 @@ class PythonPluginTest(unittest.TestCase):
list(responses)
def testStreamingOutputCallCancelled(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- request = _streaming_output_request(test_pb2)
- with _CreateService(test_pb2) as (unused_methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
+ request = _streaming_output_request(self._request_pb2)
responses = stub.StreamingOutputCall(
request, test_constants.LONG_TIMEOUT)
next(responses)
@@ -395,10 +409,9 @@ class PythonPluginTest(unittest.TestCase):
next(responses)
def testStreamingOutputCallFailed(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- request = _streaming_output_request(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
+ request = _streaming_output_request(self._request_pb2)
with methods.fail():
responses = stub.StreamingOutputCall(request, 1)
self.assertIsNotNone(responses)
@@ -406,36 +419,38 @@ class PythonPluginTest(unittest.TestCase):
next(responses)
def testStreamingInputCall(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
response = stub.StreamingInputCall(
- _streaming_input_request_iterator(test_pb2),
+ _streaming_input_request_iterator(
+ self._request_pb2, self._payload_pb2),
test_constants.LONG_TIMEOUT)
expected_response = methods.StreamingInputCall(
- _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!')
+ _streaming_input_request_iterator(self._request_pb2, self._payload_pb2),
+ 'not a real RpcContext!')
self.assertEqual(expected_response, response)
def testStreamingInputCallFuture(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
with methods.pause():
response_future = stub.StreamingInputCall.future(
- _streaming_input_request_iterator(test_pb2),
+ _streaming_input_request_iterator(
+ self._request_pb2, self._payload_pb2),
test_constants.LONG_TIMEOUT)
response = response_future.result()
expected_response = methods.StreamingInputCall(
- _streaming_input_request_iterator(test_pb2), 'not a real RpcContext!')
+ _streaming_input_request_iterator(self._request_pb2, self._payload_pb2),
+ 'not a real RpcContext!')
self.assertEqual(expected_response, response)
def testStreamingInputCallFutureExpired(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
with methods.pause():
response_future = stub.StreamingInputCall.future(
- _streaming_input_request_iterator(test_pb2),
+ _streaming_input_request_iterator(
+ self._request_pb2, self._payload_pb2),
test_constants.SHORT_TIMEOUT)
with self.assertRaises(face.ExpirationError):
response_future.result()
@@ -443,12 +458,12 @@ class PythonPluginTest(unittest.TestCase):
response_future.exception(), face.ExpirationError)
def testStreamingInputCallFutureCancelled(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
with methods.pause():
response_future = stub.StreamingInputCall.future(
- _streaming_input_request_iterator(test_pb2),
+ _streaming_input_request_iterator(
+ self._request_pb2, self._payload_pb2),
test_constants.LONG_TIMEOUT)
response_future.cancel()
self.assertTrue(response_future.cancelled())
@@ -456,32 +471,32 @@ class PythonPluginTest(unittest.TestCase):
response_future.result()
def testStreamingInputCallFutureFailed(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
with methods.fail():
response_future = stub.StreamingInputCall.future(
- _streaming_input_request_iterator(test_pb2),
+ _streaming_input_request_iterator(
+ self._request_pb2, self._payload_pb2),
test_constants.LONG_TIMEOUT)
self.assertIsNotNone(response_future.exception())
def testFullDuplexCall(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
responses = stub.FullDuplexCall(
- _full_duplex_request_iterator(test_pb2), test_constants.LONG_TIMEOUT)
+ _full_duplex_request_iterator(self._request_pb2),
+ test_constants.LONG_TIMEOUT)
expected_responses = methods.FullDuplexCall(
- _full_duplex_request_iterator(test_pb2), 'not a real RpcContext!')
+ _full_duplex_request_iterator(self._request_pb2),
+ 'not a real RpcContext!')
for expected_response, response in moves.zip_longest(
expected_responses, responses):
self.assertEqual(expected_response, response)
def testFullDuplexCallExpired(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- request_iterator = _full_duplex_request_iterator(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ request_iterator = _full_duplex_request_iterator(self._request_pb2)
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
with methods.pause():
responses = stub.FullDuplexCall(
request_iterator, test_constants.SHORT_TIMEOUT)
@@ -489,10 +504,9 @@ class PythonPluginTest(unittest.TestCase):
list(responses)
def testFullDuplexCallCancelled(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
- request_iterator = _full_duplex_request_iterator(test_pb2)
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
+ request_iterator = _full_duplex_request_iterator(self._request_pb2)
responses = stub.FullDuplexCall(
request_iterator, test_constants.LONG_TIMEOUT)
next(responses)
@@ -501,10 +515,9 @@ class PythonPluginTest(unittest.TestCase):
next(responses)
def testFullDuplexCallFailed(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- request_iterator = _full_duplex_request_iterator(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ request_iterator = _full_duplex_request_iterator(self._request_pb2)
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
with methods.fail():
responses = stub.FullDuplexCall(
request_iterator, test_constants.LONG_TIMEOUT)
@@ -513,14 +526,13 @@ class PythonPluginTest(unittest.TestCase):
next(responses)
def testHalfDuplexCall(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
def half_duplex_request_iterator():
- request = test_pb2.StreamingOutputCallRequest()
+ request = self._request_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=1, interval_us=0)
yield request
- request = test_pb2.StreamingOutputCallRequest()
+ request = self._request_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=2, interval_us=0)
request.response_parameters.add(size=3, interval_us=0)
yield request
@@ -533,8 +545,6 @@ class PythonPluginTest(unittest.TestCase):
self.assertEqual(expected_response, response)
def testHalfDuplexCallWedged(self):
- import protoc_plugin_test_pb2 as test_pb2 # pylint: disable=g-import-not-at-top
- moves.reload_module(test_pb2)
condition = threading.Condition()
wait_cell = [False]
@contextlib.contextmanager
@@ -547,13 +557,14 @@ class PythonPluginTest(unittest.TestCase):
wait_cell[0] = False
condition.notify_all()
def half_duplex_request_iterator():
- request = test_pb2.StreamingOutputCallRequest()
+ request = self._request_pb2.StreamingOutputCallRequest()
request.response_parameters.add(size=1, interval_us=0)
yield request
with condition:
while wait_cell[0]:
condition.wait()
- with _CreateService(test_pb2) as (methods, stub):
+ with _CreateService(self._service_pb2, self._response_pb2,
+ self._payload_pb2) as (methods, stub):
with wait():
responses = stub.HalfDuplexCall(
half_duplex_request_iterator(), test_constants.SHORT_TIMEOUT)
@@ -563,5 +574,5 @@ class PythonPluginTest(unittest.TestCase):
if __name__ == '__main__':
- os.chdir(os.path.dirname(sys.argv[0]))
+ #os.chdir(os.path.dirname(sys.argv[0]))
unittest.main(verbosity=2)
diff --git a/src/python/grpcio/tests/protoc_plugin/protos/payload/test_payload.proto b/src/python/grpcio/tests/protoc_plugin/protos/payload/test_payload.proto
new file mode 100644
index 0000000000..457543aa79
--- /dev/null
+++ b/src/python/grpcio/tests/protoc_plugin/protos/payload/test_payload.proto
@@ -0,0 +1,51 @@
+// Copyright 2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+syntax = "proto3";
+
+package grpc_protoc_plugin;
+
+enum PayloadType {
+ // Compressable text format.
+ COMPRESSABLE= 0;
+
+ // Uncompressable binary format.
+ UNCOMPRESSABLE = 1;
+
+ // Randomly chosen from all other formats defined in this enum.
+ RANDOM = 2;
+}
+
+message Payload {
+ PayloadType payload_type = 1;
+ oneof payload_body {
+ string payload_compressable = 2;
+ bytes payload_uncompressable = 3;
+ }
+}
diff --git a/src/python/grpcio/tests/protoc_plugin/protos/requests/r/test_requests.proto b/src/python/grpcio/tests/protoc_plugin/protos/requests/r/test_requests.proto
new file mode 100644
index 0000000000..54105df6a5
--- /dev/null
+++ b/src/python/grpcio/tests/protoc_plugin/protos/requests/r/test_requests.proto
@@ -0,0 +1,77 @@
+// Copyright 2016, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+syntax = "proto3";
+
+import "protos/payload/test_payload.proto";
+
+package grpc_protoc_plugin;
+
+message SimpleRequest {
+ // Desired payload type in the response from the server.
+ // If response_type is RANDOM, server randomly chooses one from other formats.
+ PayloadType response_type = 1;
+
+ // Desired payload size in the response from the server.
+ // If response_type is COMPRESSABLE, this denotes the size before compression.
+ int32 response_size = 2;
+
+ // input payload sent along with the request.
+ Payload payload = 3;
+}
+
+message StreamingInputCallRequest {
+ // input payload sent along with the request.
+ Payload payload = 1;
+
+ // Not expecting any payload from the response.
+}
+
+message ResponseParameters {
+ // Desired payload sizes in responses from the server.
+ // If response_type is COMPRESSABLE, this denotes the size before compression.
+ int32 size = 1;
+
+ // Desired interval between consecutive responses in the response stream in
+ // microseconds.
+ int32 interval_us = 2;
+}
+
+message StreamingOutputCallRequest {
+ // Desired payload type in the response from the server.
+ // If response_type is RANDOM, the payload from each response in the stream
+ // might be of different types. This is to simulate a mixed type of payload
+ // stream.
+ PayloadType response_type = 1;
+
+ repeated ResponseParameters response_parameters = 2;
+
+ // input payload sent along with the request.
+ Payload payload = 3;
+}
diff --git a/src/python/grpcio/tests/protoc_plugin/protos/responses/test_responses.proto b/src/python/grpcio/tests/protoc_plugin/protos/responses/test_responses.proto
new file mode 100644
index 0000000000..734fbda86e
--- /dev/null
+++ b/src/python/grpcio/tests/protoc_plugin/protos/responses/test_responses.proto
@@ -0,0 +1,47 @@
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+syntax = "proto3";
+
+import "protos/payload/test_payload.proto";
+
+package grpc_protoc_plugin;
+
+message SimpleResponse {
+ Payload payload = 1;
+}
+
+message StreamingInputCallResponse {
+ // Aggregated size of payloads received from the client.
+ int32 aggregated_payload_size = 1;
+}
+
+message StreamingOutputCallResponse {
+ Payload payload = 1;
+}
diff --git a/src/python/grpcio/tests/protoc_plugin/protoc_plugin_test.proto b/src/python/grpcio/tests/protoc_plugin/protos/service/test_service.proto
index 6762a8e7f3..fe715ee7f9 100644
--- a/src/python/grpcio/tests/protoc_plugin/protoc_plugin_test.proto
+++ b/src/python/grpcio/tests/protoc_plugin/protos/service/test_service.proto
@@ -1,4 +1,4 @@
-// Copyright 2015, Google Inc.
+// Copyright 2016, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
@@ -27,87 +27,12 @@
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-// An integration test service that covers all the method signature permutations
-// of unary/streaming requests/responses.
-// This file is duplicated around the code base. See GitHub issue #526.
-syntax = "proto2";
+syntax = "proto3";
-package grpc_protoc_plugin;
-
-enum PayloadType {
- // Compressable text format.
- COMPRESSABLE= 1;
-
- // Uncompressable binary format.
- UNCOMPRESSABLE = 2;
-
- // Randomly chosen from all other formats defined in this enum.
- RANDOM = 3;
-}
-
-message Payload {
- required PayloadType payload_type = 1;
- oneof payload_body {
- string payload_compressable = 2;
- bytes payload_uncompressable = 3;
- }
-}
-
-message SimpleRequest {
- // Desired payload type in the response from the server.
- // If response_type is RANDOM, server randomly chooses one from other formats.
- optional PayloadType response_type = 1 [default=COMPRESSABLE];
-
- // Desired payload size in the response from the server.
- // If response_type is COMPRESSABLE, this denotes the size before compression.
- optional int32 response_size = 2;
-
- // Optional input payload sent along with the request.
- optional Payload payload = 3;
-}
-
-message SimpleResponse {
- optional Payload payload = 1;
-}
-
-message StreamingInputCallRequest {
- // Optional input payload sent along with the request.
- optional Payload payload = 1;
+import "protos/requests/r/test_requests.proto";
+import "protos/responses/test_responses.proto";
- // Not expecting any payload from the response.
-}
-
-message StreamingInputCallResponse {
- // Aggregated size of payloads received from the client.
- optional int32 aggregated_payload_size = 1;
-}
-
-message ResponseParameters {
- // Desired payload sizes in responses from the server.
- // If response_type is COMPRESSABLE, this denotes the size before compression.
- required int32 size = 1;
-
- // Desired interval between consecutive responses in the response stream in
- // microseconds.
- required int32 interval_us = 2;
-}
-
-message StreamingOutputCallRequest {
- // Desired payload type in the response from the server.
- // If response_type is RANDOM, the payload from each response in the stream
- // might be of different types. This is to simulate a mixed type of payload
- // stream.
- optional PayloadType response_type = 1 [default=COMPRESSABLE];
-
- repeated ResponseParameters response_parameters = 2;
-
- // Optional input payload sent along with the request.
- optional Payload payload = 3;
-}
-
-message StreamingOutputCallResponse {
- optional Payload payload = 1;
-}
+package grpc_protoc_plugin;
service TestService {
// One request followed by one response.
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
index bc43f9d36b..cebbe8c40f 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
@@ -125,6 +125,7 @@ grpc_header_key_is_legal_type grpc_header_key_is_legal_import;
grpc_header_nonbin_value_is_legal_type grpc_header_nonbin_value_is_legal_import;
grpc_is_binary_header_type grpc_is_binary_header_import;
grpc_call_error_to_string_type grpc_call_error_to_string_import;
+grpc_cronet_secure_channel_create_type grpc_cronet_secure_channel_create_import;
grpc_auth_property_iterator_next_type grpc_auth_property_iterator_next_import;
grpc_auth_context_property_iterator_type grpc_auth_context_property_iterator_import;
grpc_auth_context_peer_identity_type grpc_auth_context_peer_identity_import;
@@ -391,6 +392,7 @@ void grpc_rb_load_imports(HMODULE library) {
grpc_header_nonbin_value_is_legal_import = (grpc_header_nonbin_value_is_legal_type) GetProcAddress(library, "grpc_header_nonbin_value_is_legal");
grpc_is_binary_header_import = (grpc_is_binary_header_type) GetProcAddress(library, "grpc_is_binary_header");
grpc_call_error_to_string_import = (grpc_call_error_to_string_type) GetProcAddress(library, "grpc_call_error_to_string");
+ grpc_cronet_secure_channel_create_import = (grpc_cronet_secure_channel_create_type) GetProcAddress(library, "grpc_cronet_secure_channel_create");
grpc_auth_property_iterator_next_import = (grpc_auth_property_iterator_next_type) GetProcAddress(library, "grpc_auth_property_iterator_next");
grpc_auth_context_property_iterator_import = (grpc_auth_context_property_iterator_type) GetProcAddress(library, "grpc_auth_context_property_iterator");
grpc_auth_context_peer_identity_import = (grpc_auth_context_peer_identity_type) GetProcAddress(library, "grpc_auth_context_peer_identity");
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
index b67361ca25..d7ea6c574c 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
@@ -43,6 +43,7 @@
#include <grpc/census.h>
#include <grpc/compression.h>
#include <grpc/grpc.h>
+#include <grpc/grpc_cronet.h>
#include <grpc/grpc_security.h>
#include <grpc/impl/codegen/alloc.h>
#include <grpc/impl/codegen/byte_buffer.h>
@@ -325,6 +326,9 @@ extern grpc_is_binary_header_type grpc_is_binary_header_import;
typedef const char *(*grpc_call_error_to_string_type)(grpc_call_error error);
extern grpc_call_error_to_string_type grpc_call_error_to_string_import;
#define grpc_call_error_to_string grpc_call_error_to_string_import
+typedef grpc_channel *(*grpc_cronet_secure_channel_create_type)(void *engine, const char *target, const grpc_channel_args *args, void *reserved);
+extern grpc_cronet_secure_channel_create_type grpc_cronet_secure_channel_create_import;
+#define grpc_cronet_secure_channel_create grpc_cronet_secure_channel_create_import
typedef const grpc_auth_property *(*grpc_auth_property_iterator_next_type)(grpc_auth_property_iterator *it);
extern grpc_auth_property_iterator_next_type grpc_auth_property_iterator_next_import;
#define grpc_auth_property_iterator_next grpc_auth_property_iterator_next_import