aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/compiler/python_generator.cc47
-rw-r--r--src/core/ext/client_config/resolver_registry.c8
-rw-r--r--src/core/ext/client_config/subchannel.c2
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c6
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c20
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_encoder.c6
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_table.c4
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h11
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.c21
-rw-r--r--src/core/ext/transport/cronet/client/secure/cronet_channel_create.c69
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_api_dummy.c85
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c640
-rw-r--r--src/core/lib/channel/channel_args.h4
-rw-r--r--src/core/lib/iomgr/tcp_client_windows.c29
-rw-r--r--src/core/lib/surface/version.c2
-rw-r--r--src/cpp/common/core_codegen.cc204
-rw-r--r--src/cpp/common/core_codegen.h26
-rw-r--r--src/csharp/Grpc.Core.Tests/ClientServerTest.cs63
-rw-r--r--src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj2
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs191
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs548
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs184
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs18
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallBase.cs61
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs58
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs7
-rw-r--r--src/csharp/Grpc.Core/Internal/ClientResponseStream.cs4
-rw-r--r--src/csharp/Grpc.Core/Internal/INativeCall.cs2
-rw-r--r--src/csharp/Grpc.Core/Internal/NativeMethods.cs18
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerCallHandler.cs79
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerRequestStream.cs4
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerResponseStream.cs7
-rw-r--r--src/csharp/Grpc.Core/VersionInfo.cs4
-rw-r--r--src/csharp/build_packages.bat2
-rw-r--r--src/csharp/ext/grpc_csharp_ext.c30
-rw-r--r--src/csharp/tests.json1
-rwxr-xr-xsrc/node/tools/bin/protoc.js4
-rwxr-xr-xsrc/node/tools/bin/protoc_plugin.js6
-rw-r--r--src/node/tools/package.json2
-rw-r--r--src/objective-c/GRPCClient/GRPCCall.m54
-rw-r--r--src/objective-c/tests/InteropTests.m6
-rw-r--r--src/proto/grpc/testing/echo.proto4
-rw-r--r--src/python/grpcio/README.rst1
-rw-r--r--src/python/grpcio/grpc/__init__.py3
-rw-r--r--src/python/grpcio/grpc/_cython/imports.generated.c2
-rw-r--r--src/python/grpcio/grpc/_cython/imports.generated.h4
-rw-r--r--src/python/grpcio/grpc/beta/implementations.py9
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py3
-rw-r--r--src/python/grpcio/grpc_version.py2
-rw-r--r--src/python/grpcio/tests/stress/__init__.py28
-rw-r--r--src/python/grpcio/tests/stress/client.py132
-rw-r--r--src/python/grpcio/tests/stress/metrics_server.py60
-rw-r--r--src/python/grpcio/tests/stress/test_runner.py73
-rw-r--r--src/ruby/.rubocop.yml4
-rw-r--r--src/ruby/ext/grpc/extconf.rb32
-rw-r--r--src/ruby/ext/grpc/rb_byte_buffer.c3
-rw-r--r--src/ruby/ext/grpc/rb_call.c3
-rw-r--r--src/ruby/ext/grpc/rb_call_credentials.c6
-rw-r--r--src/ruby/ext/grpc/rb_channel.c3
-rw-r--r--src/ruby/ext/grpc/rb_channel_args.c3
-rw-r--r--src/ruby/ext/grpc/rb_channel_credentials.c5
-rw-r--r--src/ruby/ext/grpc/rb_completion_queue.c2
-rw-r--r--src/ruby/ext/grpc/rb_event_thread.c2
-rw-r--r--src/ruby/ext/grpc/rb_grpc.c2
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.c2
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.h4
-rw-r--r--src/ruby/ext/grpc/rb_server.c3
-rw-r--r--src/ruby/ext/grpc/rb_server_credentials.c3
-rw-r--r--src/ruby/lib/grpc/generic/rpc_server.rb17
-rw-r--r--src/ruby/lib/grpc/version.rb2
-rw-r--r--src/ruby/tools/README.md12
-rwxr-xr-xsrc/ruby/tools/bin/protoc.rb41
-rwxr-xr-xsrc/ruby/tools/bin/protoc_grpc_ruby_plugin.rb41
-rw-r--r--src/ruby/tools/grpc-tools.gemspec22
-rw-r--r--src/ruby/tools/os_check.rb45
-rw-r--r--src/ruby/tools/version.rb34
76 files changed, 2586 insertions, 565 deletions
diff --git a/src/compiler/python_generator.cc b/src/compiler/python_generator.cc
index 59137e1c92..8e76e6dce6 100644
--- a/src/compiler/python_generator.cc
+++ b/src/compiler/python_generator.cc
@@ -182,18 +182,40 @@ bool GetModuleAndMessagePath(const Descriptor* type,
return true;
}
+// Get all comments (leading, leading_detached, trailing) and print them as a
+// docstring. Any leading space of a line will be removed, but the line wrapping
+// will not be changed.
+template <typename DescriptorType>
+static void PrintAllComments(const DescriptorType* desc, Printer* printer) {
+ std::vector<grpc::string> comments;
+ grpc_generator::GetComment(desc, grpc_generator::COMMENTTYPE_LEADING_DETACHED,
+ &comments);
+ grpc_generator::GetComment(desc, grpc_generator::COMMENTTYPE_LEADING,
+ &comments);
+ grpc_generator::GetComment(desc, grpc_generator::COMMENTTYPE_TRAILING,
+ &comments);
+ if (comments.empty()) {
+ return;
+ }
+ printer->Print("\"\"\"");
+ for (auto it = comments.begin(); it != comments.end(); ++it) {
+ size_t start_pos = it->find_first_not_of(' ');
+ if (start_pos != grpc::string::npos) {
+ printer->Print(it->c_str() + start_pos);
+ }
+ printer->Print("\n");
+ }
+ printer->Print("\"\"\"\n");
+}
+
bool PrintBetaServicer(const ServiceDescriptor* service,
Printer* out) {
- grpc::string doc = "<fill me in later!>";
- map<grpc::string, grpc::string> dict = ListToDict({
- "Service", service->name(),
- "Documentation", doc,
- });
out->Print("\n");
- out->Print(dict, "class Beta$Service$Servicer(object):\n");
+ out->Print("class Beta$Service$Servicer(object):\n", "Service",
+ service->name());
{
IndentScope raii_class_indent(out);
- out->Print(dict, "\"\"\"$Documentation$\"\"\"\n");
+ PrintAllComments(service, out);
for (int i = 0; i < service->method_count(); ++i) {
auto meth = service->method(i);
grpc::string arg_name = meth->client_streaming() ?
@@ -202,6 +224,7 @@ bool PrintBetaServicer(const ServiceDescriptor* service,
"Method", meth->name(), "ArgName", arg_name);
{
IndentScope raii_method_indent(out);
+ PrintAllComments(meth, out);
out->Print("context.code(beta_interfaces.StatusCode.UNIMPLEMENTED)\n");
}
}
@@ -211,16 +234,11 @@ bool PrintBetaServicer(const ServiceDescriptor* service,
bool PrintBetaStub(const ServiceDescriptor* service,
Printer* out) {
- grpc::string doc = "The interface to which stubs will conform.";
- map<grpc::string, grpc::string> dict = ListToDict({
- "Service", service->name(),
- "Documentation", doc,
- });
out->Print("\n");
- out->Print(dict, "class Beta$Service$Stub(object):\n");
+ out->Print("class Beta$Service$Stub(object):\n", "Service", service->name());
{
IndentScope raii_class_indent(out);
- out->Print(dict, "\"\"\"$Documentation$\"\"\"\n");
+ PrintAllComments(service, out);
for (int i = 0; i < service->method_count(); ++i) {
const MethodDescriptor* meth = service->method(i);
grpc::string arg_name = meth->client_streaming() ?
@@ -229,6 +247,7 @@ bool PrintBetaStub(const ServiceDescriptor* service,
out->Print(methdict, "def $Method$(self, $ArgName$, timeout):\n");
{
IndentScope raii_method_indent(out);
+ PrintAllComments(meth, out);
out->Print("raise NotImplementedError()\n");
}
if (!meth->server_streaming()) {
diff --git a/src/core/ext/client_config/resolver_registry.c b/src/core/ext/client_config/resolver_registry.c
index 07f29bcb27..e7a4abd568 100644
--- a/src/core/ext/client_config/resolver_registry.c
+++ b/src/core/ext/client_config/resolver_registry.c
@@ -47,7 +47,6 @@ static int g_number_of_resolvers = 0;
static char *g_default_resolver_prefix;
void grpc_resolver_registry_init(const char *default_resolver_prefix) {
- g_number_of_resolvers = 0;
g_default_resolver_prefix = gpr_strdup(default_resolver_prefix);
}
@@ -57,6 +56,13 @@ void grpc_resolver_registry_shutdown(void) {
grpc_resolver_factory_unref(g_all_of_the_resolvers[i]);
}
gpr_free(g_default_resolver_prefix);
+ // FIXME(ctiller): this should live in grpc_resolver_registry_init,
+ // however that would have the client_config plugin call this AFTER we start
+ // registering resolvers from third party plugins, and so they'd never show
+ // up.
+ // We likely need some kind of dependency system for plugins.... what form
+ // that takes is TBD.
+ g_number_of_resolvers = 0;
}
void grpc_register_resolver_type(grpc_resolver_factory *factory) {
diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c
index 3a5af9f53d..bd45d3825c 100644
--- a/src/core/ext/client_config/subchannel.c
+++ b/src/core/ext/client_config/subchannel.c
@@ -268,7 +268,7 @@ static void disconnect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
con = GET_CONNECTED_SUBCHANNEL(c, no_barrier);
if (con != NULL) {
GRPC_CONNECTED_SUBCHANNEL_UNREF(exec_ctx, con, "connection");
- gpr_atm_no_barrier_store(&c->connected_subchannel, 0xdeadbeef);
+ gpr_atm_no_barrier_store(&c->connected_subchannel, (gpr_atm)0xdeadbeef);
}
gpr_mu_unlock(&c->mu);
}
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index 3f6051b892..dcdc0c6285 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -306,8 +306,10 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
size_t i;
p->started_picking = 1;
- gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%d", p,
- p->num_subchannels);
+ if (grpc_lb_round_robin_trace) {
+ gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%d", p,
+ p->num_subchannels);
+ }
for (i = 0; i < p->num_subchannels; i++) {
subchannel_data *sd = p->subchannels[i];
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index fcf2abfe66..53633227ac 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -629,9 +629,10 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx,
check_read_ops(exec_ctx, &t->global);
gpr_mu_lock(&t->executor.mu);
- if (t->executor.pending_actions != NULL) {
- hdr = t->executor.pending_actions;
- t->executor.pending_actions = NULL;
+ if (t->executor.pending_actions_head != NULL) {
+ hdr = t->executor.pending_actions_head;
+ t->executor.pending_actions_head = t->executor.pending_actions_tail =
+ NULL;
gpr_mu_unlock(&t->executor.mu);
while (hdr != NULL) {
hdr->action(exec_ctx, t, hdr->stream, hdr->arg);
@@ -686,8 +687,14 @@ void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx,
gpr_free(hdr);
continue;
}
- hdr->next = t->executor.pending_actions;
- t->executor.pending_actions = hdr;
+ hdr->next = NULL;
+ if (t->executor.pending_actions_head != NULL) {
+ t->executor.pending_actions_tail =
+ t->executor.pending_actions_tail->next = hdr;
+ } else {
+ t->executor.pending_actions_tail = t->executor.pending_actions_head =
+ hdr;
+ }
REF_TRANSPORT(t, "pending_action");
gpr_mu_unlock(&t->executor.mu);
}
@@ -776,7 +783,8 @@ void grpc_chttp2_add_incoming_goaway(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
uint32_t goaway_error, gpr_slice goaway_text) {
char *msg = gpr_dump_slice(goaway_text, GPR_DUMP_HEX | GPR_DUMP_ASCII);
- gpr_log(GPR_DEBUG, "got goaway [%d]: %s", goaway_error, msg);
+ GRPC_CHTTP2_IF_TRACING(
+ gpr_log(GPR_DEBUG, "got goaway [%d]: %s", goaway_error, msg));
gpr_free(msg);
gpr_slice_unref(goaway_text);
transport_global->seen_goaway = 1;
diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.c b/src/core/ext/transport/chttp2/transport/hpack_encoder.c
index 555027c866..ebeee37f0d 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_encoder.c
+++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.c
@@ -63,6 +63,8 @@
/* don't consider adding anything bigger than this to the hpack table */
#define MAX_DECODER_SPACE_USAGE 512
+extern int grpc_http_trace;
+
typedef struct {
int is_first_frame;
/* number of bytes in 'output' when we started the frame - used to calculate
@@ -532,7 +534,9 @@ void grpc_chttp2_hpack_compressor_set_max_table_size(
}
}
c->advertise_table_size_change = 1;
- gpr_log(GPR_DEBUG, "set max table size from encoder to %d", max_table_size);
+ if (grpc_http_trace) {
+ gpr_log(GPR_DEBUG, "set max table size from encoder to %d", max_table_size);
+ }
}
void grpc_chttp2_encode_header(grpc_chttp2_hpack_compressor *c,
diff --git a/src/core/ext/transport/chttp2/transport/hpack_table.c b/src/core/ext/transport/chttp2/transport/hpack_table.c
index 4d64506de2..295f31c44f 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_table.c
+++ b/src/core/ext/transport/chttp2/transport/hpack_table.c
@@ -253,7 +253,9 @@ void grpc_chttp2_hptbl_set_max_bytes(grpc_chttp2_hptbl *tbl,
if (tbl->max_bytes == max_bytes) {
return;
}
- gpr_log(GPR_DEBUG, "Update hpack parser max size to %d", max_bytes);
+ if (grpc_http_trace) {
+ gpr_log(GPR_DEBUG, "Update hpack parser max size to %d", max_bytes);
+ }
while (tbl->mem_used > max_bytes) {
evict1(tbl);
}
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index 7a8084641d..8c4fa2d34a 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -236,9 +236,6 @@ struct grpc_chttp2_transport_parsing {
/** was a goaway frame received? */
uint8_t goaway_received;
- /** the last sent max_table_size setting */
- uint32_t last_sent_max_table_size;
-
/** initial window change */
int64_t initial_window_update;
@@ -272,6 +269,9 @@ struct grpc_chttp2_transport_parsing {
uint32_t incoming_frame_size;
uint32_t incoming_stream_id;
+ /* current max frame size */
+ uint32_t max_frame_size;
+
/* active parser */
void *parser_data;
grpc_chttp2_stream_parsing *incoming_stream;
@@ -282,6 +282,8 @@ struct grpc_chttp2_transport_parsing {
/* received settings */
uint32_t settings[GRPC_CHTTP2_NUM_SETTINGS];
+ /* last settings that were sent */
+ uint32_t last_sent_settings[GRPC_CHTTP2_NUM_SETTINGS];
/* goaway data */
grpc_status_code goaway_error;
@@ -321,7 +323,8 @@ struct grpc_chttp2_transport {
/** is a thread currently parsing */
bool parsing_active;
- grpc_chttp2_executor_action_header *pending_actions;
+ grpc_chttp2_executor_action_header *pending_actions_head;
+ grpc_chttp2_executor_action_header *pending_actions_tail;
} executor;
/** is the transport destroying itself? */
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index e827a43f7a..2995066e51 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -79,9 +79,12 @@ void grpc_chttp2_prepare_to_read(
GPR_TIMER_BEGIN("grpc_chttp2_prepare_to_read", 0);
transport_parsing->next_stream_id = transport_global->next_stream_id;
- transport_parsing->last_sent_max_table_size =
- transport_global->settings[GRPC_SENT_SETTINGS]
- [GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE];
+ memcpy(transport_parsing->last_sent_settings,
+ transport_global->settings[GRPC_SENT_SETTINGS],
+ sizeof(transport_parsing->last_sent_settings));
+ transport_parsing->max_frame_size =
+ transport_global->settings[GRPC_ACKED_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE];
/* update the parsing view of incoming window */
while (grpc_chttp2_list_pop_unannounced_incoming_window_available(
@@ -388,6 +391,12 @@ int grpc_chttp2_perform_read(grpc_exec_ctx *exec_ctx,
return 1;
}
goto dts_fh_0; /* loop */
+ } else if (transport_parsing->incoming_frame_size >
+ transport_parsing->max_frame_size) {
+ gpr_log(GPR_DEBUG, "Frame size %d is larger than max frame size %d",
+ transport_parsing->incoming_frame_size,
+ transport_parsing->max_frame_size);
+ return 0;
}
if (++cur == end) {
return 1;
@@ -840,7 +849,11 @@ static int init_settings_frame_parser(
transport_parsing->settings_ack_received = 1;
grpc_chttp2_hptbl_set_max_bytes(
&transport_parsing->hpack_parser.table,
- transport_parsing->last_sent_max_table_size);
+ transport_parsing
+ ->last_sent_settings[GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]);
+ transport_parsing->max_frame_size =
+ transport_parsing
+ ->last_sent_settings[GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE];
}
transport_parsing->parser = grpc_chttp2_settings_parser_parse;
transport_parsing->parser_data = &transport_parsing->simple.settings;
diff --git a/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c
new file mode 100644
index 0000000000..df1acddcc0
--- /dev/null
+++ b/src/core/ext/transport/cronet/client/secure/cronet_channel_create.c
@@ -0,0 +1,69 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <grpc/impl/codegen/port_platform.h>
+
+#include <stdio.h>
+#include <string.h>
+
+#include <grpc/support/alloc.h>
+#include <grpc/support/log.h>
+
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/transport/transport_impl.h"
+
+// Cronet transport object
+typedef struct cronet_transport {
+ grpc_transport base; // must be first element in this structure
+ void *engine;
+ char *host;
+} cronet_transport;
+
+extern grpc_transport_vtable grpc_cronet_vtable;
+
+GRPCAPI grpc_channel *grpc_cronet_secure_channel_create(
+ void *engine, const char *target, const grpc_channel_args *args,
+ void *reserved) {
+ cronet_transport *ct = gpr_malloc(sizeof(cronet_transport));
+ ct->base.vtable = &grpc_cronet_vtable;
+ ct->engine = engine;
+ ct->host = gpr_malloc(strlen(target) + 1);
+ strcpy(ct->host, target);
+ gpr_log(GPR_DEBUG,
+ "grpc_create_cronet_transport: cronet_engine = %p, target=%s", engine,
+ ct->host);
+
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ return grpc_channel_create(&exec_ctx, target, args,
+ GRPC_CLIENT_DIRECT_CHANNEL, (grpc_transport *)ct);
+}
diff --git a/src/core/ext/transport/cronet/transport/cronet_api_dummy.c b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c
new file mode 100644
index 0000000000..687026c9fd
--- /dev/null
+++ b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c
@@ -0,0 +1,85 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/* This file has empty implementation of all the functions exposed by the cronet
+library, so we can build it in all environments */
+
+#include <stdbool.h>
+
+#include <grpc/support/log.h>
+
+#include "third_party/objective_c/Cronet/cronet_c_for_grpc.h"
+
+#ifdef GRPC_COMPILE_WITH_CRONET
+/* link with the real CRONET library in the build system */
+#else
+/* Dummy implementation of cronet API just to test for build-ability */
+cronet_bidirectional_stream* cronet_bidirectional_stream_create(
+ cronet_engine* engine, void* annotation,
+ cronet_bidirectional_stream_callback* callback) {
+ GPR_ASSERT(0);
+ return NULL;
+}
+
+int cronet_bidirectional_stream_destroy(cronet_bidirectional_stream* stream) {
+ GPR_ASSERT(0);
+ return 0;
+}
+
+int cronet_bidirectional_stream_start(
+ cronet_bidirectional_stream* stream, const char* url, int priority,
+ const char* method, const cronet_bidirectional_stream_header_array* headers,
+ bool end_of_stream) {
+ GPR_ASSERT(0);
+ return 0;
+}
+
+int cronet_bidirectional_stream_read(cronet_bidirectional_stream* stream,
+ char* buffer, int capacity) {
+ GPR_ASSERT(0);
+ return 0;
+}
+
+int cronet_bidirectional_stream_write(cronet_bidirectional_stream* stream,
+ const char* buffer, int count,
+ bool end_of_stream) {
+ GPR_ASSERT(0);
+ return 0;
+}
+
+int cronet_bidirectional_stream_cancel(cronet_bidirectional_stream* stream) {
+ GPR_ASSERT(0);
+ return 0;
+}
+
+#endif /* GRPC_COMPILE_WITH_CRONET */
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
new file mode 100644
index 0000000000..5bb085195c
--- /dev/null
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -0,0 +1,640 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <string.h>
+
+#include <grpc/impl/codegen/port_platform.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/host_port.h>
+#include <grpc/support/log.h>
+#include <grpc/support/slice_buffer.h>
+#include <grpc/support/string_util.h>
+#include <grpc/support/useful.h>
+
+#include "src/core/ext/transport/chttp2/transport/incoming_metadata.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
+#include "src/core/lib/support/string.h"
+#include "src/core/lib/surface/channel.h"
+#include "src/core/lib/transport/metadata_batch.h"
+#include "src/core/lib/transport/transport_impl.h"
+#include "third_party/objective_c/Cronet/cronet_c_for_grpc.h"
+
+#define GRPC_HEADER_SIZE_IN_BYTES 5
+
+// Global flag that gets set with GRPC_TRACE env variable
+int grpc_cronet_trace = 1;
+
+// Cronet transport object
+struct grpc_cronet_transport {
+ grpc_transport base; /* must be first element in this structure */
+ cronet_engine *engine;
+ char *host;
+};
+
+typedef struct grpc_cronet_transport grpc_cronet_transport;
+
+enum send_state {
+ CRONET_SEND_IDLE = 0,
+ CRONET_REQ_STARTED,
+ CRONET_SEND_HEADER,
+ CRONET_WRITE,
+ CRONET_WRITE_COMPLETED,
+};
+
+enum recv_state {
+ CRONET_RECV_IDLE = 0,
+ CRONET_RECV_READ_LENGTH,
+ CRONET_RECV_READ_DATA,
+ CRONET_RECV_CLOSED,
+};
+
+static const char *recv_state_name[] = {
+ "CRONET_RECV_IDLE", "CRONET_RECV_READ_LENGTH", "CRONET_RECV_READ_DATA,",
+ "CRONET_RECV_CLOSED"};
+
+// Enum that identifies calling function.
+enum e_caller {
+ PERFORM_STREAM_OP,
+ ON_READ_COMPLETE,
+ ON_RESPONSE_HEADERS_RECEIVED,
+ ON_RESPONSE_TRAILERS_RECEIVED
+};
+
+enum callback_id {
+ CB_SEND_INITIAL_METADATA = 0,
+ CB_SEND_MESSAGE,
+ CB_SEND_TRAILING_METADATA,
+ CB_RECV_MESSAGE,
+ CB_RECV_INITIAL_METADATA,
+ CB_RECV_TRAILING_METADATA,
+ CB_NUM_CALLBACKS
+};
+
+struct stream_obj {
+ // we store received bytes here as they trickle in.
+ gpr_slice_buffer write_slice_buffer;
+ cronet_bidirectional_stream *cbs;
+ gpr_slice slice;
+ gpr_slice_buffer read_slice_buffer;
+ struct grpc_slice_buffer_stream sbs;
+ char *read_buffer;
+ int remaining_read_bytes;
+ int total_read_bytes;
+
+ char *write_buffer;
+ size_t write_buffer_size;
+
+ // Hold the URL
+ char *url;
+
+ bool response_headers_received;
+ bool read_requested;
+ bool response_trailers_received;
+ bool read_closed;
+
+ // Recv message stuff
+ grpc_byte_buffer **recv_message;
+ // Initial metadata stuff
+ grpc_metadata_batch *recv_initial_metadata;
+ // Trailing metadata stuff
+ grpc_metadata_batch *recv_trailing_metadata;
+ grpc_chttp2_incoming_metadata_buffer imb;
+
+ // This mutex protects receive state machine execution
+ gpr_mu recv_mu;
+ // we can queue up up to 2 callbacks for each OP
+ grpc_closure *callback_list[CB_NUM_CALLBACKS][2];
+
+ // storage for header
+ cronet_bidirectional_stream_header *headers;
+ uint32_t num_headers;
+ cronet_bidirectional_stream_header_array header_array;
+ // state tracking
+ enum recv_state cronet_recv_state;
+ enum send_state cronet_send_state;
+};
+
+typedef struct stream_obj stream_obj;
+
+static void next_send_step(stream_obj *s);
+static void next_recv_step(stream_obj *s, enum e_caller caller);
+
+static void set_pollset_do_nothing(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, grpc_pollset *pollset) {}
+
+static void enqueue_callbacks(grpc_closure *callback_list[]) {
+ grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
+ if (callback_list[0]) {
+ grpc_exec_ctx_enqueue(&exec_ctx, callback_list[0], true, NULL);
+ callback_list[0] = NULL;
+ }
+ if (callback_list[1]) {
+ grpc_exec_ctx_enqueue(&exec_ctx, callback_list[1], true, NULL);
+ callback_list[1] = NULL;
+ }
+ grpc_exec_ctx_finish(&exec_ctx);
+}
+
+static void on_canceled(cronet_bidirectional_stream *stream) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "on_canceled %p", stream);
+ }
+}
+
+static void on_failed(cronet_bidirectional_stream *stream, int net_error) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "on_failed %p, error = %d", stream, net_error);
+ }
+}
+
+static void on_succeeded(cronet_bidirectional_stream *stream) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "on_succeeded %p", stream);
+ }
+}
+
+static void on_response_trailers_received(
+ cronet_bidirectional_stream *stream,
+ const cronet_bidirectional_stream_header_array *trailers) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "R: on_response_trailers_received");
+ }
+ stream_obj *s = (stream_obj *)stream->annotation;
+
+ memset(&s->imb, 0, sizeof(s->imb));
+ grpc_chttp2_incoming_metadata_buffer_init(&s->imb);
+ unsigned int i = 0;
+ for (i = 0; i < trailers->count; i++) {
+ grpc_chttp2_incoming_metadata_buffer_add(
+ &s->imb, grpc_mdelem_from_metadata_strings(
+ grpc_mdstr_from_string(trailers->headers[i].key),
+ grpc_mdstr_from_string(trailers->headers[i].value)));
+ }
+ s->response_trailers_received = true;
+ next_recv_step(s, ON_RESPONSE_TRAILERS_RECEIVED);
+}
+
+static void on_write_completed(cronet_bidirectional_stream *stream,
+ const char *data) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "W: on_write_completed");
+ }
+ stream_obj *s = (stream_obj *)stream->annotation;
+ enqueue_callbacks(s->callback_list[CB_SEND_MESSAGE]);
+ s->cronet_send_state = CRONET_WRITE_COMPLETED;
+ next_send_step(s);
+}
+
+static void process_recv_message(stream_obj *s, const uint8_t *recv_data) {
+ gpr_slice read_data_slice = gpr_slice_malloc((uint32_t)s->total_read_bytes);
+ uint8_t *dst_p = GPR_SLICE_START_PTR(read_data_slice);
+ memcpy(dst_p, recv_data, (size_t)s->total_read_bytes);
+ gpr_slice_buffer_add(&s->read_slice_buffer, read_data_slice);
+ grpc_slice_buffer_stream_init(&s->sbs, &s->read_slice_buffer, 0);
+ *s->recv_message = (grpc_byte_buffer *)&s->sbs;
+}
+
+static int parse_grpc_header(const uint8_t *data) {
+ const uint8_t *p = data + 1;
+ int length = 0;
+ length |= ((uint8_t)*p++) << 24;
+ length |= ((uint8_t)*p++) << 16;
+ length |= ((uint8_t)*p++) << 8;
+ length |= ((uint8_t)*p++);
+ return length;
+}
+
+static void on_read_completed(cronet_bidirectional_stream *stream, char *data,
+ int count) {
+ stream_obj *s = (stream_obj *)stream->annotation;
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "R: on_read_completed count=%d, total=%d, remaining=%d",
+ count, s->total_read_bytes, s->remaining_read_bytes);
+ }
+ if (count > 0) {
+ GPR_ASSERT(s->recv_message);
+ s->remaining_read_bytes -= count;
+ next_recv_step(s, ON_READ_COMPLETE);
+ } else {
+ s->read_closed = true;
+ next_recv_step(s, ON_READ_COMPLETE);
+ }
+}
+
+static void on_response_headers_received(
+ cronet_bidirectional_stream *stream,
+ const cronet_bidirectional_stream_header_array *headers,
+ const char *negotiated_protocol) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "R: on_response_headers_received");
+ }
+ stream_obj *s = (stream_obj *)stream->annotation;
+ enqueue_callbacks(s->callback_list[CB_RECV_INITIAL_METADATA]);
+ s->response_headers_received = true;
+ next_recv_step(s, ON_RESPONSE_HEADERS_RECEIVED);
+}
+
+static void on_request_headers_sent(cronet_bidirectional_stream *stream) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "W: on_request_headers_sent");
+ }
+ stream_obj *s = (stream_obj *)stream->annotation;
+ enqueue_callbacks(s->callback_list[CB_SEND_INITIAL_METADATA]);
+ s->cronet_send_state = CRONET_SEND_HEADER;
+ next_send_step(s);
+}
+
+// Callback function pointers (invoked by cronet in response to events)
+static cronet_bidirectional_stream_callback callbacks = {
+ on_request_headers_sent,
+ on_response_headers_received,
+ on_read_completed,
+ on_write_completed,
+ on_response_trailers_received,
+ on_succeeded,
+ on_failed,
+ on_canceled};
+
+static void invoke_closing_callback(stream_obj *s) {
+ grpc_chttp2_incoming_metadata_buffer_publish(&s->imb,
+ s->recv_trailing_metadata);
+ if (s->callback_list[CB_RECV_TRAILING_METADATA]) {
+ enqueue_callbacks(s->callback_list[CB_RECV_TRAILING_METADATA]);
+ }
+}
+
+static void set_recv_state(stream_obj *s, enum recv_state state) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "next_state = %s", recv_state_name[state]);
+ }
+ s->cronet_recv_state = state;
+}
+
+// This is invoked from perform_stream_op, and all on_xxxx callbacks.
+static void next_recv_step(stream_obj *s, enum e_caller caller) {
+ gpr_mu_lock(&s->recv_mu);
+ switch (s->cronet_recv_state) {
+ case CRONET_RECV_IDLE:
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_IDLE");
+ }
+ if (caller == PERFORM_STREAM_OP ||
+ caller == ON_RESPONSE_HEADERS_RECEIVED) {
+ if (s->read_closed && s->response_trailers_received) {
+ invoke_closing_callback(s);
+ set_recv_state(s, CRONET_RECV_CLOSED);
+ } else if (s->response_headers_received == true &&
+ s->read_requested == true) {
+ set_recv_state(s, CRONET_RECV_READ_LENGTH);
+ s->total_read_bytes = s->remaining_read_bytes =
+ GRPC_HEADER_SIZE_IN_BYTES;
+ GPR_ASSERT(s->read_buffer);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()");
+ }
+ cronet_bidirectional_stream_read(s->cbs, s->read_buffer,
+ s->remaining_read_bytes);
+ }
+ }
+ break;
+ case CRONET_RECV_READ_LENGTH:
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_READ_LENGTH");
+ }
+ if (caller == ON_READ_COMPLETE) {
+ if (s->read_closed) {
+ invoke_closing_callback(s);
+ enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]);
+ set_recv_state(s, CRONET_RECV_CLOSED);
+ } else {
+ GPR_ASSERT(s->remaining_read_bytes == 0);
+ set_recv_state(s, CRONET_RECV_READ_DATA);
+ s->total_read_bytes = s->remaining_read_bytes =
+ parse_grpc_header((const uint8_t *)s->read_buffer);
+ s->read_buffer =
+ gpr_realloc(s->read_buffer, (uint32_t)s->remaining_read_bytes);
+ GPR_ASSERT(s->read_buffer);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()");
+ }
+ cronet_bidirectional_stream_read(s->cbs, (char *)s->read_buffer,
+ s->remaining_read_bytes);
+ }
+ }
+ break;
+ case CRONET_RECV_READ_DATA:
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "cronet_recv_state = CRONET_RECV_READ_DATA");
+ }
+ if (caller == ON_READ_COMPLETE) {
+ if (s->remaining_read_bytes > 0) {
+ int offset = s->total_read_bytes - s->remaining_read_bytes;
+ GPR_ASSERT(s->read_buffer);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "R: cronet_bidirectional_stream_read()");
+ }
+ cronet_bidirectional_stream_read(
+ s->cbs, (char *)s->read_buffer + offset, s->remaining_read_bytes);
+ } else {
+ gpr_slice_buffer_init(&s->read_slice_buffer);
+ uint8_t *p = (uint8_t *)s->read_buffer;
+ process_recv_message(s, p);
+ set_recv_state(s, CRONET_RECV_IDLE);
+ enqueue_callbacks(s->callback_list[CB_RECV_MESSAGE]);
+ }
+ }
+ break;
+ case CRONET_RECV_CLOSED:
+ break;
+ default:
+ GPR_ASSERT(0); // Should not reach here
+ break;
+ }
+ gpr_mu_unlock(&s->recv_mu);
+}
+
+// This function takes the data from s->write_slice_buffer and assembles into
+// a contiguous byte stream with 5 byte gRPC header prepended.
+static void create_grpc_frame(stream_obj *s) {
+ gpr_slice slice = gpr_slice_buffer_take_first(&s->write_slice_buffer);
+ uint8_t *raw_data = GPR_SLICE_START_PTR(slice);
+ size_t length = GPR_SLICE_LENGTH(slice);
+ s->write_buffer_size = length + GRPC_HEADER_SIZE_IN_BYTES;
+ s->write_buffer = gpr_realloc(s->write_buffer, s->write_buffer_size);
+ uint8_t *p = (uint8_t *)s->write_buffer;
+ // Append 5 byte header
+ *p++ = 0;
+ *p++ = (uint8_t)(length >> 24);
+ *p++ = (uint8_t)(length >> 16);
+ *p++ = (uint8_t)(length >> 8);
+ *p++ = (uint8_t)(length);
+ // append actual data
+ memcpy(p, raw_data, length);
+}
+
+static void do_write(stream_obj *s) {
+ gpr_slice_buffer *sb = &s->write_slice_buffer;
+ GPR_ASSERT(sb->count <= 1);
+ if (sb->count > 0) {
+ create_grpc_frame(s);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_write");
+ }
+ cronet_bidirectional_stream_write(s->cbs, s->write_buffer,
+ (int)s->write_buffer_size, false);
+ }
+}
+
+//
+static void next_send_step(stream_obj *s) {
+ switch (s->cronet_send_state) {
+ case CRONET_SEND_IDLE:
+ GPR_ASSERT(
+ s->cbs); // cronet_bidirectional_stream is not initialized yet.
+ s->cronet_send_state = CRONET_REQ_STARTED;
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_start to %s", s->url);
+ }
+ cronet_bidirectional_stream_start(s->cbs, s->url, 0, "POST",
+ &s->header_array, false);
+ // we no longer need the memory that was allocated earlier.
+ gpr_free(s->header_array.headers);
+ break;
+ case CRONET_SEND_HEADER:
+ do_write(s);
+ s->cronet_send_state = CRONET_WRITE;
+ break;
+ case CRONET_WRITE_COMPLETED:
+ do_write(s);
+ break;
+ default:
+ GPR_ASSERT(0);
+ break;
+ }
+}
+
+static void convert_metadata_to_cronet_headers(grpc_linked_mdelem *head,
+ const char *host,
+ stream_obj *s) {
+ grpc_linked_mdelem *curr = head;
+ // Walk the linked list and get number of header fields
+ uint32_t num_headers_available = 0;
+ while (curr != NULL) {
+ curr = curr->next;
+ num_headers_available++;
+ }
+ // Allocate enough memory
+ s->headers = (cronet_bidirectional_stream_header *)gpr_malloc(
+ sizeof(cronet_bidirectional_stream_header) * num_headers_available);
+
+ // Walk the linked list again, this time copying the header fields.
+ // s->num_headers
+ // can be less than num_headers_available, as some headers are not used for
+ // cronet
+ curr = head;
+ s->num_headers = 0;
+ while (s->num_headers < num_headers_available) {
+ grpc_mdelem *mdelem = curr->md;
+ curr = curr->next;
+ const char *key = grpc_mdstr_as_c_string(mdelem->key);
+ const char *value = grpc_mdstr_as_c_string(mdelem->value);
+ if (strcmp(key, ":scheme") == 0 || strcmp(key, ":method") == 0 ||
+ strcmp(key, ":authority") == 0) {
+ // Cronet populates these fields on its own.
+ continue;
+ }
+ if (strcmp(key, ":path") == 0) {
+ // Create URL by appending :path value to the hostname
+ gpr_asprintf(&s->url, "https://%s%s", host, value);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "extracted URL = %s", s->url);
+ }
+ continue;
+ }
+ s->headers[s->num_headers].key = key;
+ s->headers[s->num_headers].value = value;
+ s->num_headers++;
+ if (curr == NULL) {
+ break;
+ }
+ }
+}
+
+static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, grpc_transport_stream_op *op) {
+ grpc_cronet_transport *ct = (grpc_cronet_transport *)gt;
+ GPR_ASSERT(ct->engine);
+ stream_obj *s = (stream_obj *)gs;
+ if (op->recv_trailing_metadata) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG,
+ "perform_stream_op - recv_trailing_metadata: on_complete=%p",
+ op->on_complete);
+ }
+ s->recv_trailing_metadata = op->recv_trailing_metadata;
+ GPR_ASSERT(!s->callback_list[CB_RECV_TRAILING_METADATA][0]);
+ s->callback_list[CB_RECV_TRAILING_METADATA][0] = op->on_complete;
+ }
+ if (op->recv_message) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "perform_stream_op - recv_message: on_complete=%p",
+ op->on_complete);
+ }
+ s->recv_message = (grpc_byte_buffer **)op->recv_message;
+ GPR_ASSERT(!s->callback_list[CB_RECV_MESSAGE][0]);
+ GPR_ASSERT(!s->callback_list[CB_RECV_MESSAGE][1]);
+ s->callback_list[CB_RECV_MESSAGE][0] = op->recv_message_ready;
+ s->callback_list[CB_RECV_MESSAGE][1] = op->on_complete;
+ s->read_requested = true;
+ next_recv_step(s, PERFORM_STREAM_OP);
+ }
+ if (op->recv_initial_metadata) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "perform_stream_op - recv_initial_metadata:=%p",
+ op->on_complete);
+ }
+ s->recv_initial_metadata = op->recv_initial_metadata;
+ GPR_ASSERT(!s->callback_list[CB_RECV_INITIAL_METADATA][0]);
+ GPR_ASSERT(!s->callback_list[CB_RECV_INITIAL_METADATA][1]);
+ s->callback_list[CB_RECV_INITIAL_METADATA][0] =
+ op->recv_initial_metadata_ready;
+ s->callback_list[CB_RECV_INITIAL_METADATA][1] = op->on_complete;
+ }
+ if (op->send_initial_metadata) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG,
+ "perform_stream_op - send_initial_metadata: on_complete=%p",
+ op->on_complete);
+ }
+ s->num_headers = 0;
+ convert_metadata_to_cronet_headers(op->send_initial_metadata->list.head,
+ ct->host, s);
+ s->header_array.count = s->num_headers;
+ s->header_array.capacity = s->num_headers;
+ s->header_array.headers = s->headers;
+ GPR_ASSERT(!s->callback_list[CB_SEND_INITIAL_METADATA][0]);
+ s->callback_list[CB_SEND_INITIAL_METADATA][0] = op->on_complete;
+ }
+ if (op->send_message) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "perform_stream_op - send_message: on_complete=%p",
+ op->on_complete);
+ }
+ grpc_byte_stream_next(exec_ctx, op->send_message, &s->slice,
+ op->send_message->length, NULL);
+ // Check that compression flag is not ON. We don't support compression yet.
+ // TODO (makdharma): add compression support
+ GPR_ASSERT(op->send_message->flags == 0);
+ gpr_slice_buffer_add(&s->write_slice_buffer, s->slice);
+ if (s->cbs == NULL) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "cronet_bidirectional_stream_create");
+ }
+ s->cbs = cronet_bidirectional_stream_create(ct->engine, s, &callbacks);
+ GPR_ASSERT(s->cbs);
+ s->read_closed = false;
+ s->response_trailers_received = false;
+ s->response_headers_received = false;
+ s->cronet_send_state = CRONET_SEND_IDLE;
+ s->cronet_recv_state = CRONET_RECV_IDLE;
+ }
+ GPR_ASSERT(!s->callback_list[CB_SEND_MESSAGE][0]);
+ s->callback_list[CB_SEND_MESSAGE][0] = op->on_complete;
+ next_send_step(s);
+ }
+ if (op->send_trailing_metadata) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG,
+ "perform_stream_op - send_trailing_metadata: on_complete=%p",
+ op->on_complete);
+ }
+ GPR_ASSERT(!s->callback_list[CB_SEND_TRAILING_METADATA][0]);
+ s->callback_list[CB_SEND_TRAILING_METADATA][0] = op->on_complete;
+ if (s->cbs) {
+ // Send an "empty" write to the far end to signal that we're done.
+ // This will induce the server to send down trailers.
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "W: cronet_bidirectional_stream_write");
+ }
+ cronet_bidirectional_stream_write(s->cbs, "abc", 0, true);
+ } else {
+ // We never created a stream. This was probably an empty request.
+ invoke_closing_callback(s);
+ }
+ }
+}
+
+static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, grpc_stream_refcount *refcount,
+ const void *server_data) {
+ stream_obj *s = (stream_obj *)gs;
+ memset(s->callback_list, 0, sizeof(s->callback_list));
+ s->cbs = NULL;
+ gpr_mu_init(&s->recv_mu);
+ s->read_buffer = gpr_malloc(GRPC_HEADER_SIZE_IN_BYTES);
+ s->write_buffer = gpr_malloc(GRPC_HEADER_SIZE_IN_BYTES);
+ gpr_slice_buffer_init(&s->write_slice_buffer);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "cronet_transport - init_stream");
+ }
+ return 0;
+}
+
+static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
+ grpc_stream *gs, void *and_free_memory) {
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "Destroy stream");
+ }
+ stream_obj *s = (stream_obj *)gs;
+ s->cbs = NULL;
+ gpr_free(s->read_buffer);
+ gpr_free(s->write_buffer);
+ gpr_free(s->url);
+ gpr_mu_destroy(&s->recv_mu);
+ if (and_free_memory) {
+ gpr_free(and_free_memory);
+ }
+}
+
+static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
+ grpc_cronet_transport *ct = (grpc_cronet_transport *)gt;
+ gpr_free(ct->host);
+ if (grpc_cronet_trace) {
+ gpr_log(GPR_DEBUG, "Destroy transport");
+ }
+}
+
+const grpc_transport_vtable grpc_cronet_vtable = {
+ sizeof(stream_obj), "cronet_http", init_stream,
+ set_pollset_do_nothing, perform_stream_op, NULL,
+ destroy_stream, destroy_transport, NULL};
diff --git a/src/core/lib/channel/channel_args.h b/src/core/lib/channel/channel_args.h
index 0a51780a14..23c7b7b897 100644
--- a/src/core/lib/channel/channel_args.h
+++ b/src/core/lib/channel/channel_args.h
@@ -56,10 +56,6 @@ grpc_channel_args *grpc_channel_args_merge(const grpc_channel_args *a,
/** Destroy arguments created by \a grpc_channel_args_copy */
void grpc_channel_args_destroy(grpc_channel_args *a);
-/** Reads census_enabled settings from channel args. Returns 1 if census_enabled
- * is specified in channel args, otherwise returns 0. */
-int grpc_channel_args_is_census_enabled(const grpc_channel_args *a);
-
/** Returns the compression algorithm set in \a a. */
grpc_compression_algorithm grpc_channel_args_get_compression_algorithm(
const grpc_channel_args *a);
diff --git a/src/core/lib/iomgr/tcp_client_windows.c b/src/core/lib/iomgr/tcp_client_windows.c
index 7d78beb15a..66f9ff7a46 100644
--- a/src/core/lib/iomgr/tcp_client_windows.c
+++ b/src/core/lib/iomgr/tcp_client_windows.c
@@ -63,39 +63,45 @@ typedef struct {
grpc_endpoint **endpoint;
} async_connect;
-static void async_connect_unlock_and_cleanup(async_connect *ac) {
+static void async_connect_unlock_and_cleanup(async_connect *ac,
+ grpc_winsocket *socket) {
int done = (--ac->refs == 0);
gpr_mu_unlock(&ac->mu);
if (done) {
- if (ac->socket != NULL) grpc_winsocket_destroy(ac->socket);
gpr_mu_destroy(&ac->mu);
gpr_free(ac->addr_name);
gpr_free(ac);
}
+ if (socket != NULL) grpc_winsocket_destroy(socket);
}
static void on_alarm(grpc_exec_ctx *exec_ctx, void *acp, bool occured) {
async_connect *ac = acp;
gpr_mu_lock(&ac->mu);
- /* If the alarm didn't occur, it got cancelled. */
- if (ac->socket != NULL && occured) {
+ if (ac->socket != NULL) {
grpc_winsocket_shutdown(ac->socket);
}
- async_connect_unlock_and_cleanup(ac);
+ async_connect_unlock_and_cleanup(ac, ac->socket);
}
static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, bool from_iocp) {
async_connect *ac = acp;
SOCKET sock = ac->socket->socket;
grpc_endpoint **ep = ac->endpoint;
+ GPR_ASSERT(*ep == NULL);
grpc_winsocket_callback_info *info = &ac->socket->write_info;
grpc_closure *on_done = ac->on_done;
+ gpr_mu_lock(&ac->mu);
+ grpc_winsocket *socket = ac->socket;
+ ac->socket = NULL;
+ gpr_mu_unlock(&ac->mu);
+
grpc_timer_cancel(exec_ctx, &ac->alarm);
gpr_mu_lock(&ac->mu);
- if (from_iocp) {
+ if (from_iocp && socket != NULL) {
DWORD transfered_bytes = 0;
DWORD flags;
BOOL wsa_success = WSAGetOverlappedResult(sock, &info->overlapped,
@@ -107,12 +113,12 @@ static void on_connect(grpc_exec_ctx *exec_ctx, void *acp, bool from_iocp) {
ac->addr_name, utf8_message);
gpr_free(utf8_message);
} else {
- *ep = grpc_tcp_create(ac->socket, ac->addr_name);
- ac->socket = NULL;
+ *ep = grpc_tcp_create(socket, ac->addr_name);
+ socket = NULL;
}
}
- async_connect_unlock_and_cleanup(ac);
+ async_connect_unlock_and_cleanup(ac, socket);
/* If the connection was aborted, the callback was already called when
the deadline was met. */
on_done->cb(exec_ctx, on_done->cb_arg, *ep != NULL);
@@ -138,6 +144,7 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
const char *message = NULL;
char *utf8_message;
grpc_winsocket_callback_info *info;
+ int last_error;
*endpoint = NULL;
@@ -208,8 +215,10 @@ void grpc_tcp_client_connect(grpc_exec_ctx *exec_ctx, grpc_closure *on_done,
return;
failure:
- utf8_message = gpr_format_message(WSAGetLastError());
+ last_error = WSAGetLastError();
+ utf8_message = gpr_format_message(last_error);
gpr_log(GPR_ERROR, message, utf8_message);
+ gpr_log(GPR_ERROR, "last error = %d", last_error);
gpr_free(utf8_message);
if (socket != NULL) {
grpc_winsocket_destroy(socket);
diff --git a/src/core/lib/surface/version.c b/src/core/lib/surface/version.c
index fe954cbefb..aca76d2bb7 100644
--- a/src/core/lib/surface/version.c
+++ b/src/core/lib/surface/version.c
@@ -36,4 +36,4 @@
#include <grpc/grpc.h>
-const char *grpc_version_string(void) { return "0.14.0-dev"; }
+const char *grpc_version_string(void) { return "0.15.0-dev"; }
diff --git a/src/cpp/common/core_codegen.cc b/src/cpp/common/core_codegen.cc
index 33a8f755e6..8e8d42eb29 100644
--- a/src/cpp/common/core_codegen.cc
+++ b/src/cpp/common/core_codegen.cc
@@ -48,124 +48,6 @@
#include "src/core/lib/profiling/timers.h"
-namespace {
-
-const int kGrpcBufferWriterMaxBufferLength = 8192;
-
-class GrpcBufferWriter GRPC_FINAL
- : public ::grpc::protobuf::io::ZeroCopyOutputStream {
- public:
- explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size)
- : block_size_(block_size), byte_count_(0), have_backup_(false) {
- *bp = grpc_raw_byte_buffer_create(NULL, 0);
- slice_buffer_ = &(*bp)->data.raw.slice_buffer;
- }
-
- ~GrpcBufferWriter() GRPC_OVERRIDE {
- if (have_backup_) {
- gpr_slice_unref(backup_slice_);
- }
- }
-
- bool Next(void** data, int* size) GRPC_OVERRIDE {
- if (have_backup_) {
- slice_ = backup_slice_;
- have_backup_ = false;
- } else {
- slice_ = gpr_slice_malloc(block_size_);
- }
- *data = GPR_SLICE_START_PTR(slice_);
- // On win x64, int is only 32bit
- GPR_ASSERT(GPR_SLICE_LENGTH(slice_) <= INT_MAX);
- byte_count_ += * size = (int)GPR_SLICE_LENGTH(slice_);
- gpr_slice_buffer_add(slice_buffer_, slice_);
- return true;
- }
-
- void BackUp(int count) GRPC_OVERRIDE {
- gpr_slice_buffer_pop(slice_buffer_);
- if (count == block_size_) {
- backup_slice_ = slice_;
- } else {
- backup_slice_ =
- gpr_slice_split_tail(&slice_, GPR_SLICE_LENGTH(slice_) - count);
- gpr_slice_buffer_add(slice_buffer_, slice_);
- }
- have_backup_ = true;
- byte_count_ -= count;
- }
-
- grpc::protobuf::int64 ByteCount() const GRPC_OVERRIDE { return byte_count_; }
-
- private:
- const int block_size_;
- int64_t byte_count_;
- gpr_slice_buffer* slice_buffer_;
- bool have_backup_;
- gpr_slice backup_slice_;
- gpr_slice slice_;
-};
-
-class GrpcBufferReader GRPC_FINAL
- : public ::grpc::protobuf::io::ZeroCopyInputStream {
- public:
- explicit GrpcBufferReader(grpc_byte_buffer* buffer)
- : byte_count_(0), backup_count_(0) {
- grpc_byte_buffer_reader_init(&reader_, buffer);
- }
- ~GrpcBufferReader() GRPC_OVERRIDE {
- grpc_byte_buffer_reader_destroy(&reader_);
- }
-
- bool Next(const void** data, int* size) GRPC_OVERRIDE {
- if (backup_count_ > 0) {
- *data = GPR_SLICE_START_PTR(slice_) + GPR_SLICE_LENGTH(slice_) -
- backup_count_;
- GPR_ASSERT(backup_count_ <= INT_MAX);
- *size = (int)backup_count_;
- backup_count_ = 0;
- return true;
- }
- if (!grpc_byte_buffer_reader_next(&reader_, &slice_)) {
- return false;
- }
- gpr_slice_unref(slice_);
- *data = GPR_SLICE_START_PTR(slice_);
- // On win x64, int is only 32bit
- GPR_ASSERT(GPR_SLICE_LENGTH(slice_) <= INT_MAX);
- byte_count_ += * size = (int)GPR_SLICE_LENGTH(slice_);
- return true;
- }
-
- void BackUp(int count) GRPC_OVERRIDE { backup_count_ = count; }
-
- bool Skip(int count) GRPC_OVERRIDE {
- const void* data;
- int size;
- while (Next(&data, &size)) {
- if (size >= count) {
- BackUp(size - count);
- return true;
- }
- // size < count;
- count -= size;
- }
- // error or we have too large count;
- return false;
- }
-
- grpc::protobuf::int64 ByteCount() const GRPC_OVERRIDE {
- return byte_count_ - backup_count_;
- }
-
- private:
- int64_t byte_count_;
- int64_t backup_count_;
- grpc_byte_buffer_reader reader_;
- gpr_slice slice_;
-};
-} // namespace
-
namespace grpc {
grpc_completion_queue* CoreCodegen::grpc_completion_queue_create(
@@ -192,6 +74,44 @@ void CoreCodegen::grpc_byte_buffer_destroy(grpc_byte_buffer* bb) {
::grpc_byte_buffer_destroy(bb);
}
+void CoreCodegen::grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader,
+ grpc_byte_buffer* buffer) {
+ ::grpc_byte_buffer_reader_init(reader, buffer);
+}
+
+void CoreCodegen::grpc_byte_buffer_reader_destroy(
+ grpc_byte_buffer_reader* reader) {
+ ::grpc_byte_buffer_reader_destroy(reader);
+}
+
+int CoreCodegen::grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader,
+ gpr_slice* slice) {
+ return ::grpc_byte_buffer_reader_next(reader, slice);
+}
+
+grpc_byte_buffer* CoreCodegen::grpc_raw_byte_buffer_create(gpr_slice* slice,
+ size_t nslices) {
+ return ::grpc_raw_byte_buffer_create(slice, nslices);
+}
+
+gpr_slice CoreCodegen::gpr_slice_malloc(size_t length) {
+ return ::gpr_slice_malloc(length);
+}
+
+void CoreCodegen::gpr_slice_unref(gpr_slice slice) { ::gpr_slice_unref(slice); }
+
+gpr_slice CoreCodegen::gpr_slice_split_tail(gpr_slice* s, size_t split) {
+ return ::gpr_slice_split_tail(s, split);
+}
+
+void CoreCodegen::gpr_slice_buffer_add(gpr_slice_buffer* sb, gpr_slice slice) {
+ ::gpr_slice_buffer_add(sb, slice);
+}
+
+void CoreCodegen::gpr_slice_buffer_pop(gpr_slice_buffer* sb) {
+ ::gpr_slice_buffer_pop(sb);
+}
+
void CoreCodegen::grpc_metadata_array_init(grpc_metadata_array* array) {
::grpc_metadata_array_init(array);
}
@@ -200,6 +120,10 @@ void CoreCodegen::grpc_metadata_array_destroy(grpc_metadata_array* array) {
::grpc_metadata_array_destroy(array);
}
+const Status& CoreCodegen::ok() { return grpc::Status::OK; }
+
+const Status& CoreCodegen::cancelled() { return grpc::Status::CANCELLED; }
+
gpr_timespec CoreCodegen::gpr_inf_future(gpr_clock_type type) {
return ::gpr_inf_future(type);
}
@@ -209,48 +133,4 @@ void CoreCodegen::assert_fail(const char* failed_assertion) {
abort();
}
-Status CoreCodegen::SerializeProto(const grpc::protobuf::Message& msg,
- grpc_byte_buffer** bp) {
- GPR_TIMER_SCOPE("SerializeProto", 0);
- int byte_size = msg.ByteSize();
- if (byte_size <= kGrpcBufferWriterMaxBufferLength) {
- gpr_slice slice = gpr_slice_malloc(byte_size);
- GPR_ASSERT(GPR_SLICE_END_PTR(slice) ==
- msg.SerializeWithCachedSizesToArray(GPR_SLICE_START_PTR(slice)));
- *bp = grpc_raw_byte_buffer_create(&slice, 1);
- gpr_slice_unref(slice);
- return Status::OK;
- } else {
- GrpcBufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength);
- return msg.SerializeToZeroCopyStream(&writer)
- ? Status::OK
- : Status(StatusCode::INTERNAL, "Failed to serialize message");
- }
-}
-
-Status CoreCodegen::DeserializeProto(grpc_byte_buffer* buffer,
- grpc::protobuf::Message* msg,
- int max_message_size) {
- GPR_TIMER_SCOPE("DeserializeProto", 0);
- if (buffer == nullptr) {
- return Status(StatusCode::INTERNAL, "No payload");
- }
- Status result = Status::OK;
- {
- GrpcBufferReader reader(buffer);
- ::grpc::protobuf::io::CodedInputStream decoder(&reader);
- if (max_message_size > 0) {
- decoder.SetTotalBytesLimit(max_message_size, max_message_size);
- }
- if (!msg->ParseFromCodedStream(&decoder)) {
- result = Status(StatusCode::INTERNAL, msg->InitializationErrorString());
- }
- if (!decoder.ConsumedEntireMessage()) {
- result = Status(StatusCode::INTERNAL, "Did not read entire message");
- }
- }
- grpc_byte_buffer_destroy(buffer);
- return result;
-}
-
} // namespace grpc
diff --git a/src/cpp/common/core_codegen.h b/src/cpp/common/core_codegen.h
index e15cb4c34a..656b11e7e7 100644
--- a/src/cpp/common/core_codegen.h
+++ b/src/cpp/common/core_codegen.h
@@ -42,13 +42,6 @@ namespace grpc {
/// Implementation of the core codegen interface.
class CoreCodegen : public CoreCodegenInterface {
private:
- Status SerializeProto(const grpc::protobuf::Message& msg,
- grpc_byte_buffer** bp) override;
-
- Status DeserializeProto(grpc_byte_buffer* buffer,
- grpc::protobuf::Message* msg,
- int max_message_size) override;
-
grpc_completion_queue* grpc_completion_queue_create(void* reserved) override;
void grpc_completion_queue_destroy(grpc_completion_queue* cq) override;
grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag,
@@ -60,11 +53,30 @@ class CoreCodegen : public CoreCodegenInterface {
void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) override;
+ void grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader,
+ grpc_byte_buffer* buffer) override;
+ void grpc_byte_buffer_reader_destroy(
+ grpc_byte_buffer_reader* reader) override;
+ int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader* reader,
+ gpr_slice* slice) override;
+
+ grpc_byte_buffer* grpc_raw_byte_buffer_create(gpr_slice* slice,
+ size_t nslices) override;
+
+ gpr_slice gpr_slice_malloc(size_t length) override;
+ void gpr_slice_unref(gpr_slice slice) override;
+ gpr_slice gpr_slice_split_tail(gpr_slice* s, size_t split) override;
+ void gpr_slice_buffer_add(gpr_slice_buffer* sb, gpr_slice slice) override;
+ void gpr_slice_buffer_pop(gpr_slice_buffer* sb) override;
+
void grpc_metadata_array_init(grpc_metadata_array* array) override;
void grpc_metadata_array_destroy(grpc_metadata_array* array) override;
gpr_timespec gpr_inf_future(gpr_clock_type type) override;
+ virtual const Status& ok() override;
+ virtual const Status& cancelled() override;
+
void assert_fail(const char* failed_assertion) override;
};
diff --git a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
index 6c13a4fa48..d92addbf54 100644
--- a/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/ClientServerTest.cs
@@ -167,6 +167,37 @@ namespace Grpc.Core.Tests
}
[Test]
+ public async Task ServerStreamingCall_EndOfStreamIsIdempotent()
+ {
+ helper.ServerStreamingHandler = new ServerStreamingServerMethod<string, string>(async (request, responseStream, context) =>
+ {
+ });
+
+ var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), "");
+
+ Assert.IsFalse(await call.ResponseStream.MoveNext());
+ Assert.IsFalse(await call.ResponseStream.MoveNext());
+ }
+
+ [Test]
+ public async Task ServerStreamingCall_ErrorCanBeAwaitedTwice()
+ {
+ helper.ServerStreamingHandler = new ServerStreamingServerMethod<string, string>(async (request, responseStream, context) =>
+ {
+ context.Status = new Status(StatusCode.InvalidArgument, "");
+ });
+
+ var call = Calls.AsyncServerStreamingCall(helper.CreateServerStreamingCall(), "");
+
+ var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.MoveNext());
+ Assert.AreEqual(StatusCode.InvalidArgument, ex.Status.StatusCode);
+
+ // attempting MoveNext again should result in throwing the same exception.
+ var ex2 = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.MoveNext());
+ Assert.AreEqual(StatusCode.InvalidArgument, ex2.Status.StatusCode);
+ }
+
+ [Test]
public async Task DuplexStreamingCall()
{
helper.DuplexStreamingHandler = new DuplexStreamingServerMethod<string, string>(async (requestStream, responseStream, context) =>
@@ -209,6 +240,38 @@ namespace Grpc.Core.Tests
}
[Test]
+ public async Task ClientStreamingCall_ServerSideReadAfterCancelNotificationReturnsNull()
+ {
+ var handlerStartedBarrier = new TaskCompletionSource<object>();
+ var cancelNotificationReceivedBarrier = new TaskCompletionSource<object>();
+ var successTcs = new TaskCompletionSource<string>();
+
+ helper.ClientStreamingHandler = new ClientStreamingServerMethod<string, string>(async (requestStream, context) =>
+ {
+ handlerStartedBarrier.SetResult(null);
+
+ // wait for cancellation to be delivered.
+ context.CancellationToken.Register(() => cancelNotificationReceivedBarrier.SetResult(null));
+ await cancelNotificationReceivedBarrier.Task;
+
+ var moveNextResult = await requestStream.MoveNext();
+ successTcs.SetResult(!moveNextResult ? "SUCCESS" : "FAIL");
+ return "";
+ });
+
+ var cts = new CancellationTokenSource();
+ var call = Calls.AsyncClientStreamingCall(helper.CreateClientStreamingCall(new CallOptions(cancellationToken: cts.Token)));
+
+ await handlerStartedBarrier.Task;
+ cts.Cancel();
+
+ var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseAsync);
+ Assert.AreEqual(StatusCode.Cancelled, ex.Status.StatusCode);
+
+ Assert.AreEqual("SUCCESS", await successTcs.Task);
+ }
+
+ [Test]
public async Task AsyncUnaryCall_EchoMetadata()
{
helper.UnaryHandler = new UnaryServerMethod<string, string>(async (request, context) =>
diff --git a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
index 0cd059c232..47131fc454 100644
--- a/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
+++ b/src/csharp/Grpc.Core.Tests/Grpc.Core.Tests.csproj
@@ -84,6 +84,8 @@
<Compile Include="SanityTest.cs" />
<Compile Include="HalfcloseTest.cs" />
<Compile Include="NUnitMain.cs" />
+ <Compile Include="Internal\FakeNativeCall.cs" />
+ <Compile Include="Internal\AsyncCallServerTest.cs" />
</ItemGroup>
<Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
<ItemGroup>
diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
new file mode 100644
index 0000000000..0e204761f6
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
@@ -0,0 +1,191 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Collections.Generic;
+using System.Runtime.InteropServices;
+using System.Threading.Tasks;
+
+using Grpc.Core.Internal;
+using NUnit.Framework;
+
+namespace Grpc.Core.Internal.Tests
+{
+ /// <summary>
+ /// Uses fake native call to test interaction of <c>AsyncCallServer</c> wrapping code with C core in different situations.
+ /// </summary>
+ public class AsyncCallServerTest
+ {
+ Server server;
+ FakeNativeCall fakeCall;
+ AsyncCallServer<string, string> asyncCallServer;
+
+ [SetUp]
+ public void Init()
+ {
+ var environment = GrpcEnvironment.AddRef();
+
+ // Create a fake server just so we have an instance to refer to.
+ // The server won't actually be used at all.
+ server = new Server()
+ {
+ Ports = { { "localhost", 0, ServerCredentials.Insecure } }
+ };
+ server.Start();
+
+ fakeCall = new FakeNativeCall();
+ asyncCallServer = new AsyncCallServer<string, string>(
+ Marshallers.StringMarshaller.Serializer, Marshallers.StringMarshaller.Deserializer,
+ environment,
+ server);
+ asyncCallServer.InitializeForTesting(fakeCall);
+ }
+
+ [TearDown]
+ public void Cleanup()
+ {
+ server.ShutdownAsync().Wait();
+ GrpcEnvironment.Release();
+ }
+
+ [Test]
+ public void CancelNotificationAfterStartDisposes()
+ {
+ var finishedTask = asyncCallServer.ServerSideCallAsync();
+ fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
+ AssertFinished(asyncCallServer, fakeCall, finishedTask);
+ }
+
+ [Test]
+ public void CancelNotificationAfterStartDisposesAfterPendingReadFinishes()
+ {
+ var finishedTask = asyncCallServer.ServerSideCallAsync();
+ var requestStream = new ServerRequestStream<string, string>(asyncCallServer);
+
+ var moveNextTask = requestStream.MoveNext();
+
+ fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
+ fakeCall.ReceivedMessageHandler(true, null);
+ Assert.IsFalse(moveNextTask.Result);
+
+ AssertFinished(asyncCallServer, fakeCall, finishedTask);
+ }
+
+ [Test]
+ public void ReadAfterCancelNotificationCanSucceed()
+ {
+ var finishedTask = asyncCallServer.ServerSideCallAsync();
+ var requestStream = new ServerRequestStream<string, string>(asyncCallServer);
+
+ fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
+
+ // Check that starting a read after cancel notification has been processed is legal.
+ var moveNextTask = requestStream.MoveNext();
+ Assert.IsFalse(moveNextTask.Result);
+
+ AssertFinished(asyncCallServer, fakeCall, finishedTask);
+ }
+
+ [Test]
+ public void ReadCompletionFailureClosesRequestStream()
+ {
+ var finishedTask = asyncCallServer.ServerSideCallAsync();
+ var requestStream = new ServerRequestStream<string, string>(asyncCallServer);
+
+ // if a read completion's success==false, the request stream will silently finish
+ // and we rely on C core cancelling the call.
+ var moveNextTask = requestStream.MoveNext();
+ fakeCall.ReceivedMessageHandler(false, null);
+ Assert.IsFalse(moveNextTask.Result);
+
+ fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
+ AssertFinished(asyncCallServer, fakeCall, finishedTask);
+ }
+
+ [Test]
+ public void WriteAfterCancelNotificationFails()
+ {
+ var finishedTask = asyncCallServer.ServerSideCallAsync();
+ var requestStream = new ServerRequestStream<string, string>(asyncCallServer);
+ var responseStream = new ServerResponseStream<string, string>(asyncCallServer);
+
+ fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
+
+ // TODO(jtattermusch): should we throw a different exception type instead?
+ Assert.Throws(typeof(InvalidOperationException), () => responseStream.WriteAsync("request1"));
+ AssertFinished(asyncCallServer, fakeCall, finishedTask);
+ }
+
+ [Test]
+ public void WriteCompletionFailureThrows()
+ {
+ var finishedTask = asyncCallServer.ServerSideCallAsync();
+ var responseStream = new ServerResponseStream<string, string>(asyncCallServer);
+
+ var writeTask = responseStream.WriteAsync("request1");
+ fakeCall.SendCompletionHandler(false);
+ // TODO(jtattermusch): should we throw a different exception type instead?
+ Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await writeTask);
+
+ fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
+ AssertFinished(asyncCallServer, fakeCall, finishedTask);
+ }
+
+ [Test]
+ public void WriteAndWriteStatusCanRunConcurrently()
+ {
+ var finishedTask = asyncCallServer.ServerSideCallAsync();
+ var responseStream = new ServerResponseStream<string, string>(asyncCallServer);
+
+ var writeTask = responseStream.WriteAsync("request1");
+ var writeStatusTask = asyncCallServer.SendStatusFromServerAsync(Status.DefaultSuccess, new Metadata(), null);
+
+ fakeCall.SendCompletionHandler(true);
+ fakeCall.SendStatusFromServerHandler(true);
+
+ Assert.DoesNotThrowAsync(async () => await writeTask);
+ Assert.DoesNotThrowAsync(async () => await writeStatusTask);
+
+ fakeCall.ReceivedCloseOnServerHandler(true, cancelled: true);
+
+ AssertFinished(asyncCallServer, fakeCall, finishedTask);
+ }
+
+ static void AssertFinished(AsyncCallServer<string, string> asyncCallServer, FakeNativeCall fakeCall, Task finishedTask)
+ {
+ Assert.IsTrue(fakeCall.IsDisposed);
+ Assert.IsTrue(finishedTask.IsCompleted);
+ Assert.DoesNotThrow(() => finishedTask.Wait());
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
index 60530d3250..abe9d4a2e6 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
@@ -32,6 +32,7 @@
#endregion
using System;
+using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
@@ -40,6 +41,9 @@ using NUnit.Framework;
namespace Grpc.Core.Internal.Tests
{
+ /// <summary>
+ /// Uses fake native call to test interaction of <c>AsyncCall</c> wrapping code with C core in different situations.
+ /// </summary>
public class AsyncCallTest
{
Channel channel;
@@ -64,159 +68,421 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
- public void AsyncUnary_CompletionSuccess()
+ public void AsyncUnary_CanBeStartedOnlyOnce()
+ {
+ asyncCall.UnaryCallAsync("request1");
+ Assert.Throws(typeof(InvalidOperationException),
+ () => asyncCall.UnaryCallAsync("abc"));
+ }
+
+ [Test]
+ public void AsyncUnary_StreamingOperationsNotAllowed()
+ {
+ asyncCall.UnaryCallAsync("request1");
+ Assert.ThrowsAsync(typeof(InvalidOperationException),
+ async () => await asyncCall.ReadMessageAsync());
+ Assert.Throws(typeof(InvalidOperationException),
+ () => asyncCall.StartSendMessage("abc", new WriteFlags(), (x,y) => {}));
+ }
+
+ [Test]
+ public void AsyncUnary_Success()
+ {
+ var resultTask = asyncCall.UnaryCallAsync("request1");
+ fakeCall.UnaryResponseClientHandler(true,
+ new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
+ CreateResponsePayload(),
+ new Metadata());
+
+ AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
+ }
+
+ [Test]
+ public void AsyncUnary_NonSuccessStatusCode()
+ {
+ var resultTask = asyncCall.UnaryCallAsync("request1");
+ fakeCall.UnaryResponseClientHandler(true,
+ CreateClientSideStatus(StatusCode.InvalidArgument),
+ CreateResponsePayload(),
+ new Metadata());
+
+ AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument);
+ }
+
+ [Test]
+ public void AsyncUnary_NullResponsePayload()
+ {
+ var resultTask = asyncCall.UnaryCallAsync("request1");
+ fakeCall.UnaryResponseClientHandler(true,
+ new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
+ null,
+ new Metadata());
+
+ // failure to deserialize will result in InvalidArgument status.
+ AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal);
+ }
+
+ [Test]
+ public void ClientStreaming_StreamingReadNotAllowed()
+ {
+ asyncCall.ClientStreamingCallAsync();
+ Assert.ThrowsAsync(typeof(InvalidOperationException),
+ async () => await asyncCall.ReadMessageAsync());
+ }
+
+ [Test]
+ public void ClientStreaming_NoRequest_Success()
+ {
+ var resultTask = asyncCall.ClientStreamingCallAsync();
+ fakeCall.UnaryResponseClientHandler(true,
+ new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
+ CreateResponsePayload(),
+ new Metadata());
+
+ AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
+ }
+
+ [Test]
+ public void ClientStreaming_NoRequest_NonSuccessStatusCode()
+ {
+ var resultTask = asyncCall.ClientStreamingCallAsync();
+ fakeCall.UnaryResponseClientHandler(true,
+ CreateClientSideStatus(StatusCode.InvalidArgument),
+ CreateResponsePayload(),
+ new Metadata());
+
+ AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument);
+ }
+
+ [Test]
+ public void ClientStreaming_MoreRequests_Success()
+ {
+ var resultTask = asyncCall.ClientStreamingCallAsync();
+ var requestStream = new ClientRequestStream<string, string>(asyncCall);
+
+ var writeTask = requestStream.WriteAsync("request1");
+ fakeCall.SendCompletionHandler(true);
+ writeTask.Wait();
+
+ var writeTask2 = requestStream.WriteAsync("request2");
+ fakeCall.SendCompletionHandler(true);
+ writeTask2.Wait();
+
+ var completeTask = requestStream.CompleteAsync();
+ fakeCall.SendCompletionHandler(true);
+ completeTask.Wait();
+
+ fakeCall.UnaryResponseClientHandler(true,
+ new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
+ CreateResponsePayload(),
+ new Metadata());
+
+ AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
+ }
+
+ [Test]
+ public void ClientStreaming_WriteFailure()
+ {
+ var resultTask = asyncCall.ClientStreamingCallAsync();
+ var requestStream = new ClientRequestStream<string, string>(asyncCall);
+
+ var writeTask = requestStream.WriteAsync("request1");
+ fakeCall.SendCompletionHandler(false);
+ Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await writeTask);
+
+ fakeCall.UnaryResponseClientHandler(true,
+ CreateClientSideStatus(StatusCode.Internal),
+ CreateResponsePayload(),
+ new Metadata());
+
+ AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal);
+ }
+
+ [Test]
+ public void ClientStreaming_WriteAfterReceivingStatusFails()
+ {
+ var resultTask = asyncCall.ClientStreamingCallAsync();
+ var requestStream = new ClientRequestStream<string, string>(asyncCall);
+
+ fakeCall.UnaryResponseClientHandler(true,
+ new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
+ CreateResponsePayload(),
+ new Metadata());
+
+ AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
+ Assert.Throws(typeof(InvalidOperationException), () => requestStream.WriteAsync("request1"));
+ }
+
+ [Test]
+ public void ClientStreaming_CompleteAfterReceivingStatusSucceeds()
+ {
+ var resultTask = asyncCall.ClientStreamingCallAsync();
+ var requestStream = new ClientRequestStream<string, string>(asyncCall);
+
+ fakeCall.UnaryResponseClientHandler(true,
+ new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
+ CreateResponsePayload(),
+ new Metadata());
+
+ AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
+ Assert.DoesNotThrowAsync(async () => await requestStream.CompleteAsync());
+ }
+
+ [Test]
+ public void ClientStreaming_WriteAfterCancellationRequestFails()
+ {
+ var resultTask = asyncCall.ClientStreamingCallAsync();
+ var requestStream = new ClientRequestStream<string, string>(asyncCall);
+
+ asyncCall.Cancel();
+ Assert.IsTrue(fakeCall.IsCancelled);
+
+ Assert.Throws(typeof(OperationCanceledException), () => requestStream.WriteAsync("request1"));
+
+ fakeCall.UnaryResponseClientHandler(true,
+ CreateClientSideStatus(StatusCode.Cancelled),
+ CreateResponsePayload(),
+ new Metadata());
+
+ AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Cancelled);
+ }
+
+ [Test]
+ public void ServerStreaming_StreamingSendNotAllowed()
+ {
+ asyncCall.StartServerStreamingCall("request1");
+ Assert.Throws(typeof(InvalidOperationException),
+ () => asyncCall.StartSendMessage("abc", new WriteFlags(), (x,y) => {}));
+ }
+
+ [Test]
+ public void ServerStreaming_NoResponse_Success1()
+ {
+ asyncCall.StartServerStreamingCall("request1");
+ var responseStream = new ClientResponseStream<string, string>(asyncCall);
+ var readTask = responseStream.MoveNext();
+
+ fakeCall.ReceivedResponseHeadersHandler(true, new Metadata());
+ Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count);
+
+ fakeCall.ReceivedMessageHandler(true, null);
+ fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
+
+ AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
+ }
+
+ [Test]
+ public void ServerStreaming_NoResponse_Success2()
+ {
+ asyncCall.StartServerStreamingCall("request1");
+ var responseStream = new ClientResponseStream<string, string>(asyncCall);
+ var readTask = responseStream.MoveNext();
+
+ // try alternative order of completions
+ fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
+ fakeCall.ReceivedMessageHandler(true, null);
+
+ AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
+ }
+
+ [Test]
+ public void ServerStreaming_NoResponse_ReadFailure()
+ {
+ asyncCall.StartServerStreamingCall("request1");
+ var responseStream = new ClientResponseStream<string, string>(asyncCall);
+ var readTask = responseStream.MoveNext();
+
+ fakeCall.ReceivedMessageHandler(false, null); // after a failed read, we rely on C core to deliver appropriate status code.
+ fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Internal));
+
+ AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Internal);
+ }
+
+ [Test]
+ public void ServerStreaming_MoreResponses_Success()
+ {
+ asyncCall.StartServerStreamingCall("request1");
+ var responseStream = new ClientResponseStream<string, string>(asyncCall);
+
+ var readTask1 = responseStream.MoveNext();
+ fakeCall.ReceivedMessageHandler(true, CreateResponsePayload());
+ Assert.IsTrue(readTask1.Result);
+ Assert.AreEqual("response1", responseStream.Current);
+
+ var readTask2 = responseStream.MoveNext();
+ fakeCall.ReceivedMessageHandler(true, CreateResponsePayload());
+ Assert.IsTrue(readTask2.Result);
+ Assert.AreEqual("response1", responseStream.Current);
+
+ var readTask3 = responseStream.MoveNext();
+ fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
+ fakeCall.ReceivedMessageHandler(true, null);
+
+ AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask3);
+ }
+
+ [Test]
+ public void DuplexStreaming_NoRequestNoResponse_Success()
+ {
+ asyncCall.StartDuplexStreamingCall();
+ var requestStream = new ClientRequestStream<string, string>(asyncCall);
+ var responseStream = new ClientResponseStream<string, string>(asyncCall);
+
+ var writeTask1 = requestStream.CompleteAsync();
+ fakeCall.SendCompletionHandler(true);
+ Assert.DoesNotThrowAsync(async () => await writeTask1);
+
+ var readTask = responseStream.MoveNext();
+ fakeCall.ReceivedMessageHandler(true, null);
+ fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
+
+ AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
+ }
+
+ [Test]
+ public void DuplexStreaming_WriteAfterReceivingStatusFails()
+ {
+ asyncCall.StartDuplexStreamingCall();
+ var requestStream = new ClientRequestStream<string, string>(asyncCall);
+ var responseStream = new ClientResponseStream<string, string>(asyncCall);
+
+ var readTask = responseStream.MoveNext();
+ fakeCall.ReceivedMessageHandler(true, null);
+ fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
+
+ AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
+
+ Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await requestStream.WriteAsync("request1"));
+ }
+
+ [Test]
+ public void DuplexStreaming_CompleteAfterReceivingStatusFails()
+ {
+ asyncCall.StartDuplexStreamingCall();
+ var requestStream = new ClientRequestStream<string, string>(asyncCall);
+ var responseStream = new ClientResponseStream<string, string>(asyncCall);
+
+ var readTask = responseStream.MoveNext();
+ fakeCall.ReceivedMessageHandler(true, null);
+ fakeCall.ReceivedStatusOnClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
+
+ AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
+
+ Assert.DoesNotThrowAsync(async () => await requestStream.CompleteAsync());
+ }
+
+ [Test]
+ public void DuplexStreaming_WriteAfterCancellationRequestFails()
+ {
+ asyncCall.StartDuplexStreamingCall();
+ var requestStream = new ClientRequestStream<string, string>(asyncCall);
+ var responseStream = new ClientResponseStream<string, string>(asyncCall);
+
+ asyncCall.Cancel();
+ Assert.IsTrue(fakeCall.IsCancelled);
+ Assert.Throws(typeof(OperationCanceledException), () => requestStream.WriteAsync("request1"));
+
+ var readTask = responseStream.MoveNext();
+ fakeCall.ReceivedMessageHandler(true, null);
+ fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled));
+
+ AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Cancelled);
+ }
+
+ [Test]
+ public void DuplexStreaming_ReadAfterCancellationRequestCanSucceed()
+ {
+ asyncCall.StartDuplexStreamingCall();
+ var responseStream = new ClientResponseStream<string, string>(asyncCall);
+
+ asyncCall.Cancel();
+ Assert.IsTrue(fakeCall.IsCancelled);
+
+ var readTask1 = responseStream.MoveNext();
+ fakeCall.ReceivedMessageHandler(true, CreateResponsePayload());
+ Assert.IsTrue(readTask1.Result);
+ Assert.AreEqual("response1", responseStream.Current);
+
+ var readTask2 = responseStream.MoveNext();
+ fakeCall.ReceivedMessageHandler(true, null);
+ fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled));
+
+ AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled);
+ }
+
+ [Test]
+ public void DuplexStreaming_ReadStartedBeforeCancellationRequestCanSucceed()
+ {
+ asyncCall.StartDuplexStreamingCall();
+ var responseStream = new ClientResponseStream<string, string>(asyncCall);
+
+ var readTask1 = responseStream.MoveNext(); // initiate the read before cancel request
+ asyncCall.Cancel();
+ Assert.IsTrue(fakeCall.IsCancelled);
+
+ fakeCall.ReceivedMessageHandler(true, CreateResponsePayload());
+ Assert.IsTrue(readTask1.Result);
+ Assert.AreEqual("response1", responseStream.Current);
+
+ var readTask2 = responseStream.MoveNext();
+ fakeCall.ReceivedMessageHandler(true, null);
+ fakeCall.ReceivedStatusOnClientHandler(true, CreateClientSideStatus(StatusCode.Cancelled));
+
+ AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled);
+ }
+
+ ClientSideStatus CreateClientSideStatus(StatusCode statusCode)
+ {
+ return new ClientSideStatus(new Status(statusCode, ""), new Metadata());
+ }
+
+ byte[] CreateResponsePayload()
+ {
+ return Marshallers.StringMarshaller.Serializer("response1");
+ }
+
+ static void AssertUnaryResponseSuccess(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<string> resultTask)
{
- var resultTask = asyncCall.UnaryCallAsync("abc");
- fakeCall.UnaryResponseClientHandler(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()), new byte[] { 1, 2, 3 }, new Metadata());
Assert.IsTrue(resultTask.IsCompleted);
Assert.IsTrue(fakeCall.IsDisposed);
+
Assert.AreEqual(Status.DefaultSuccess, asyncCall.GetStatus());
+ Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count);
+ Assert.AreEqual(0, asyncCall.GetTrailers().Count);
+ Assert.AreEqual("response1", resultTask.Result);
}
- [Test]
- public void AsyncUnary_CompletionFailure()
+ static void AssertStreamingResponseSuccess(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<bool> moveNextTask)
{
- var resultTask = asyncCall.UnaryCallAsync("abc");
- fakeCall.UnaryResponseClientHandler(false, new ClientSideStatus(new Status(StatusCode.Internal, ""), null), new byte[] { 1, 2, 3 }, new Metadata());
+ Assert.IsTrue(moveNextTask.IsCompleted);
+ Assert.IsTrue(fakeCall.IsDisposed);
+ Assert.IsFalse(moveNextTask.Result);
+ Assert.AreEqual(Status.DefaultSuccess, asyncCall.GetStatus());
+ Assert.AreEqual(0, asyncCall.GetTrailers().Count);
+ }
+
+ static void AssertUnaryResponseError(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<string> resultTask, StatusCode expectedStatusCode)
+ {
Assert.IsTrue(resultTask.IsCompleted);
Assert.IsTrue(fakeCall.IsDisposed);
- Assert.AreEqual(StatusCode.Internal, asyncCall.GetStatus().StatusCode);
- Assert.IsNull(asyncCall.GetTrailers());
+ Assert.AreEqual(expectedStatusCode, asyncCall.GetStatus().StatusCode);
var ex = Assert.ThrowsAsync<RpcException>(async () => await resultTask);
- Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode);
- }
-
- internal class FakeNativeCall : INativeCall
- {
- public UnaryResponseClientHandler UnaryResponseClientHandler
- {
- get;
- set;
- }
-
- public ReceivedStatusOnClientHandler ReceivedStatusOnClientHandler
- {
- get;
- set;
- }
-
- public ReceivedMessageHandler ReceivedMessageHandler
- {
- get;
- set;
- }
-
- public ReceivedResponseHeadersHandler ReceivedResponseHeadersHandler
- {
- get;
- set;
- }
-
- public SendCompletionHandler SendCompletionHandler
- {
- get;
- set;
- }
-
- public ReceivedCloseOnServerHandler ReceivedCloseOnServerHandler
- {
- get;
- set;
- }
-
- public bool IsCancelled
- {
- get;
- set;
- }
-
- public bool IsDisposed
- {
- get;
- set;
- }
-
- public void Cancel()
- {
- IsCancelled = true;
- }
-
- public void CancelWithStatus(Status status)
- {
- IsCancelled = true;
- }
-
- public string GetPeer()
- {
- return "PEER";
- }
-
- public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
- {
- UnaryResponseClientHandler = callback;
- }
-
- public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
- {
- throw new NotImplementedException();
- }
-
- public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray)
- {
- UnaryResponseClientHandler = callback;
- }
-
- public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
- {
- ReceivedStatusOnClientHandler = callback;
- }
-
- public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray)
- {
- ReceivedStatusOnClientHandler = callback;
- }
-
- public void StartReceiveMessage(ReceivedMessageHandler callback)
- {
- ReceivedMessageHandler = callback;
- }
-
- public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback)
- {
- ReceivedResponseHeadersHandler = callback;
- }
-
- public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray)
- {
- SendCompletionHandler = callback;
- }
-
- public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
- {
- SendCompletionHandler = callback;
- }
-
- public void StartSendCloseFromClient(SendCompletionHandler callback)
- {
- SendCompletionHandler = callback;
- }
-
- public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata)
- {
- SendCompletionHandler = callback;
- }
-
- public void StartServerSide(ReceivedCloseOnServerHandler callback)
- {
- ReceivedCloseOnServerHandler = callback;
- }
-
- public void Dispose()
- {
- IsDisposed = true;
- }
+ Assert.AreEqual(expectedStatusCode, ex.Status.StatusCode);
+ Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count);
+ Assert.AreEqual(0, asyncCall.GetTrailers().Count);
+ }
+
+ static void AssertStreamingResponseError(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<bool> moveNextTask, StatusCode expectedStatusCode)
+ {
+ Assert.IsTrue(moveNextTask.IsCompleted);
+ Assert.IsTrue(fakeCall.IsDisposed);
+
+ var ex = Assert.ThrowsAsync<RpcException>(async () => await moveNextTask);
+ Assert.AreEqual(expectedStatusCode, ex.Status.StatusCode);
+ Assert.AreEqual(expectedStatusCode, asyncCall.GetStatus().StatusCode);
+ Assert.AreEqual(0, asyncCall.GetTrailers().Count);
}
}
}
diff --git a/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs b/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs
new file mode 100644
index 0000000000..909112a47c
--- /dev/null
+++ b/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs
@@ -0,0 +1,184 @@
+#region Copyright notice and license
+
+// Copyright 2015, Google Inc.
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+// * Neither the name of Google Inc. nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#endregion
+
+using System;
+using System.Collections.Generic;
+using System.Runtime.InteropServices;
+using System.Threading.Tasks;
+
+using Grpc.Core.Internal;
+using NUnit.Framework;
+
+namespace Grpc.Core.Internal.Tests
+{
+ /// <summary>
+ /// For testing purposes.
+ /// </summary>
+ internal class FakeNativeCall : INativeCall
+ {
+ public UnaryResponseClientHandler UnaryResponseClientHandler
+ {
+ get;
+ set;
+ }
+
+ public ReceivedStatusOnClientHandler ReceivedStatusOnClientHandler
+ {
+ get;
+ set;
+ }
+
+ public ReceivedMessageHandler ReceivedMessageHandler
+ {
+ get;
+ set;
+ }
+
+ public ReceivedResponseHeadersHandler ReceivedResponseHeadersHandler
+ {
+ get;
+ set;
+ }
+
+ public SendCompletionHandler SendCompletionHandler
+ {
+ get;
+ set;
+ }
+
+ public SendCompletionHandler SendStatusFromServerHandler
+ {
+ get;
+ set;
+ }
+
+ public ReceivedCloseOnServerHandler ReceivedCloseOnServerHandler
+ {
+ get;
+ set;
+ }
+
+ public bool IsCancelled
+ {
+ get;
+ set;
+ }
+
+ public bool IsDisposed
+ {
+ get;
+ set;
+ }
+
+ public void Cancel()
+ {
+ IsCancelled = true;
+ }
+
+ public void CancelWithStatus(Status status)
+ {
+ IsCancelled = true;
+ }
+
+ public string GetPeer()
+ {
+ return "PEER";
+ }
+
+ public void StartUnary(UnaryResponseClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
+ {
+ UnaryResponseClientHandler = callback;
+ }
+
+ public void StartUnary(BatchContextSafeHandle ctx, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
+ {
+ throw new NotImplementedException();
+ }
+
+ public void StartClientStreaming(UnaryResponseClientHandler callback, MetadataArraySafeHandle metadataArray)
+ {
+ UnaryResponseClientHandler = callback;
+ }
+
+ public void StartServerStreaming(ReceivedStatusOnClientHandler callback, byte[] payload, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags)
+ {
+ ReceivedStatusOnClientHandler = callback;
+ }
+
+ public void StartDuplexStreaming(ReceivedStatusOnClientHandler callback, MetadataArraySafeHandle metadataArray)
+ {
+ ReceivedStatusOnClientHandler = callback;
+ }
+
+ public void StartReceiveMessage(ReceivedMessageHandler callback)
+ {
+ ReceivedMessageHandler = callback;
+ }
+
+ public void StartReceiveInitialMetadata(ReceivedResponseHeadersHandler callback)
+ {
+ ReceivedResponseHeadersHandler = callback;
+ }
+
+ public void StartSendInitialMetadata(SendCompletionHandler callback, MetadataArraySafeHandle metadataArray)
+ {
+ SendCompletionHandler = callback;
+ }
+
+ public void StartSendMessage(SendCompletionHandler callback, byte[] payload, WriteFlags writeFlags, bool sendEmptyInitialMetadata)
+ {
+ SendCompletionHandler = callback;
+ }
+
+ public void StartSendCloseFromClient(SendCompletionHandler callback)
+ {
+ SendCompletionHandler = callback;
+ }
+
+ public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
+ byte[] optionalPayload, WriteFlags writeFlags)
+ {
+ SendStatusFromServerHandler = callback;
+ }
+
+ public void StartServerSide(ReceivedCloseOnServerHandler callback)
+ {
+ ReceivedCloseOnServerHandler = callback;
+ }
+
+ public void Dispose()
+ {
+ IsDisposed = true;
+ }
+ }
+}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index 016e1b8587..f522174bd0 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -241,11 +241,10 @@ namespace Grpc.Core.Internal
/// <summary>
/// Receives a streaming response. Only one pending read action is allowed at any given time.
- /// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartReadMessage(AsyncCompletionDelegate<TResponse> completionDelegate)
+ public Task<TResponse> ReadMessageAsync()
{
- StartReadMessageInternal(completionDelegate);
+ return ReadMessageInternalAsync();
}
/// <summary>
@@ -409,10 +408,13 @@ namespace Grpc.Core.Internal
/// </summary>
private void HandleUnaryResponse(bool success, ClientSideStatus receivedStatus, byte[] receivedMessage, Metadata responseHeaders)
{
+ // NOTE: because this event is a result of batch containing GRPC_OP_RECV_STATUS_ON_CLIENT,
+ // success will be always set to true.
+
using (Profilers.ForCurrentThread().NewScope("AsyncCall.HandleUnaryResponse"))
{
TResponse msg = default(TResponse);
- var deserializeException = success ? TryDeserialize(receivedMessage, out msg) : null;
+ var deserializeException = TryDeserialize(receivedMessage, out msg);
lock (myLock)
{
@@ -425,14 +427,13 @@ namespace Grpc.Core.Internal
finishedStatus = receivedStatus;
ReleaseResourcesIfPossible();
-
}
responseHeadersTcs.SetResult(responseHeaders);
var status = receivedStatus.Status;
- if (!success || status.StatusCode != StatusCode.OK)
+ if (status.StatusCode != StatusCode.OK)
{
unaryResponseTcs.SetException(new RpcException(status));
return;
@@ -447,6 +448,9 @@ namespace Grpc.Core.Internal
/// </summary>
private void HandleFinished(bool success, ClientSideStatus receivedStatus)
{
+ // NOTE: because this event is a result of batch containing GRPC_OP_RECV_STATUS_ON_CLIENT,
+ // success will be always set to true.
+
lock (myLock)
{
finished = true;
@@ -457,7 +461,7 @@ namespace Grpc.Core.Internal
var status = receivedStatus.Status;
- if (!success || status.StatusCode != StatusCode.OK)
+ if (status.StatusCode != StatusCode.OK)
{
streamingCallFinishedTcs.SetException(new RpcException(status));
return;
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index ccd047f469..42234dcac2 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -68,7 +68,8 @@ namespace Grpc.Core.Internal
protected bool cancelRequested;
protected AsyncCompletionDelegate<object> sendCompletionDelegate; // Completion of a pending send or sendclose if not null.
- protected AsyncCompletionDelegate<TRead> readCompletionDelegate; // Completion of a pending send or sendclose if not null.
+ protected TaskCompletionSource<TRead> streamingReadTcs; // Completion of a pending streaming read if not null.
+ protected TaskCompletionSource<object> sendStatusFromServerTcs;
protected bool readingDone; // True if last read (i.e. read with null payload) was already received.
protected bool halfcloseRequested; // True if send close have been initiated.
@@ -150,15 +151,25 @@ namespace Grpc.Core.Internal
/// Initiates reading a message. Only one read operation can be active at a time.
/// completionDelegate is invoked upon completion.
/// </summary>
- protected void StartReadMessageInternal(AsyncCompletionDelegate<TRead> completionDelegate)
+ protected Task<TRead> ReadMessageInternalAsync()
{
lock (myLock)
{
- GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
- CheckReadingAllowed();
+ GrpcPreconditions.CheckState(started);
+ if (readingDone)
+ {
+ // the last read that returns null or throws an exception is idempotent
+ // and maintain its state.
+ GrpcPreconditions.CheckState(streamingReadTcs != null, "Call does not support streaming reads.");
+ return streamingReadTcs.Task;
+ }
+
+ GrpcPreconditions.CheckState(streamingReadTcs == null, "Only one read can be pending at a time");
+ GrpcPreconditions.CheckState(!disposed);
call.StartReceiveMessage(HandleReadFinished);
- readCompletionDelegate = completionDelegate;
+ streamingReadTcs = new TaskCompletionSource<TRead>();
+ return streamingReadTcs.Task;
}
}
@@ -213,15 +224,6 @@ namespace Grpc.Core.Internal
GrpcPreconditions.CheckState(sendCompletionDelegate == null, "Only one write can be pending at a time");
}
- protected virtual void CheckReadingAllowed()
- {
- GrpcPreconditions.CheckState(started);
- GrpcPreconditions.CheckState(!disposed);
-
- GrpcPreconditions.CheckState(!readingDone, "Stream has already been closed.");
- GrpcPreconditions.CheckState(readCompletionDelegate == null, "Only one read can be pending at a time");
- }
-
protected void CheckNotCancelled()
{
if (cancelRequested)
@@ -322,22 +324,18 @@ namespace Grpc.Core.Internal
/// </summary>
protected void HandleSendStatusFromServerFinished(bool success)
{
- AsyncCompletionDelegate<object> origCompletionDelegate = null;
lock (myLock)
{
- origCompletionDelegate = sendCompletionDelegate;
- sendCompletionDelegate = null;
-
ReleaseResourcesIfPossible();
}
if (!success)
{
- FireCompletion(origCompletionDelegate, null, new InvalidOperationException("Error sending status from server."));
+ sendStatusFromServerTcs.SetException(new InvalidOperationException("Error sending status from server."));
}
else
{
- FireCompletion(origCompletionDelegate, null, null);
+ sendStatusFromServerTcs.SetResult(null);
}
}
@@ -346,15 +344,17 @@ namespace Grpc.Core.Internal
/// </summary>
protected void HandleReadFinished(bool success, byte[] receivedMessage)
{
+ // if success == false, received message will be null. It that case we will
+ // treat this completion as the last read an rely on C core to handle the failed
+ // read (e.g. deliver approriate statusCode on the clientside).
+
TRead msg = default(TRead);
var deserializeException = (success && receivedMessage != null) ? TryDeserialize(receivedMessage, out msg) : null;
- AsyncCompletionDelegate<TRead> origCompletionDelegate = null;
+ TaskCompletionSource<TRead> origTcs = null;
lock (myLock)
{
- origCompletionDelegate = readCompletionDelegate;
- readCompletionDelegate = null;
-
+ origTcs = streamingReadTcs;
if (receivedMessage == null)
{
// This was the last read.
@@ -364,20 +364,25 @@ namespace Grpc.Core.Internal
if (deserializeException != null && IsClient)
{
readingDone = true;
+
+ // TODO(jtattermusch): it might be too late to set the status
CancelWithStatus(DeserializeResponseFailureStatus);
}
+ if (!readingDone)
+ {
+ streamingReadTcs = null;
+ }
+
ReleaseResourcesIfPossible();
}
- // TODO: handle the case when success==false
-
if (deserializeException != null && !IsClient)
{
- FireCompletion(origCompletionDelegate, default(TRead), new IOException("Failed to deserialize request message.", deserializeException));
+ origTcs.SetException(new IOException("Failed to deserialize request message.", deserializeException));
return;
}
- FireCompletion(origCompletionDelegate, msg, null);
+ origTcs.SetResult(msg);
}
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index bea2b3660c..b1566b44a7 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -65,6 +65,15 @@ namespace Grpc.Core.Internal
}
/// <summary>
+ /// Only for testing purposes.
+ /// </summary>
+ public void InitializeForTesting(INativeCall call)
+ {
+ server.AddCallReference(this);
+ InitializeInternal(call);
+ }
+
+ /// <summary>
/// Starts a server side call.
/// </summary>
public Task ServerSideCallAsync()
@@ -91,11 +100,10 @@ namespace Grpc.Core.Internal
/// <summary>
/// Receives a streaming request. Only one pending read action is allowed at any given time.
- /// completionDelegate is called when the operation finishes.
/// </summary>
- public void StartReadMessage(AsyncCompletionDelegate<TRequest> completionDelegate)
+ public Task<TRequest> ReadMessageAsync()
{
- StartReadMessageInternal(completionDelegate);
+ return ReadMessageInternalAsync();
}
/// <summary>
@@ -128,24 +136,33 @@ namespace Grpc.Core.Internal
}
/// <summary>
- /// Sends call result status, also indicating server is done with streaming responses.
- /// Only one pending send action is allowed at any given time.
- /// completionDelegate is called when the operation finishes.
+ /// Sends call result status, indicating we are done with writes.
+ /// Sending a status different from StatusCode.OK will also implicitly cancel the call.
/// </summary>
- public void StartSendStatusFromServer(Status status, Metadata trailers, AsyncCompletionDelegate<object> completionDelegate)
+ public Task SendStatusFromServerAsync(Status status, Metadata trailers, Tuple<TResponse, WriteFlags> optionalWrite)
{
+ byte[] payload = optionalWrite != null ? UnsafeSerialize(optionalWrite.Item1) : null;
+ var writeFlags = optionalWrite != null ? optionalWrite.Item2 : default(WriteFlags);
+
lock (myLock)
{
- GrpcPreconditions.CheckNotNull(completionDelegate, "Completion delegate cannot be null");
- CheckSendingAllowed(allowFinished: false);
+ GrpcPreconditions.CheckState(started);
+ GrpcPreconditions.CheckState(!disposed);
+ GrpcPreconditions.CheckState(!halfcloseRequested, "Can only send status from server once.");
using (var metadataArray = MetadataArraySafeHandle.Create(trailers))
{
- call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent);
+ call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent,
+ payload, writeFlags);
}
halfcloseRequested = true;
- readingDone = true;
- sendCompletionDelegate = completionDelegate;
+ initialMetadataSent = true;
+ sendStatusFromServerTcs = new TaskCompletionSource<object>();
+ if (optionalWrite != null)
+ {
+ streamingWritesCounter++;
+ }
+ return sendStatusFromServerTcs.Task;
}
}
@@ -174,12 +191,6 @@ namespace Grpc.Core.Internal
get { return false; }
}
- protected override void CheckReadingAllowed()
- {
- base.CheckReadingAllowed();
- GrpcPreconditions.CheckArgument(!cancelRequested);
- }
-
protected override void OnAfterReleaseResources()
{
server.RemoveCallReference(this);
@@ -190,12 +201,21 @@ namespace Grpc.Core.Internal
/// </summary>
private void HandleFinishedServerside(bool success, bool cancelled)
{
+ // NOTE: because this event is a result of batch containing GRPC_OP_RECV_CLOSE_ON_SERVER,
+ // success will be always set to true.
lock (myLock)
{
finished = true;
+ if (streamingReadTcs == null)
+ {
+ // if there's no pending read, readingDone=true will dispose now.
+ // if there is a pending read, we will dispose once that read finishes.
+ readingDone = true;
+ streamingReadTcs = new TaskCompletionSource<TRequest>();
+ streamingReadTcs.SetResult(default(TRequest));
+ }
ReleaseResourcesIfPossible();
}
- // TODO(jtattermusch): handle error
if (cancelled)
{
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index 500653ba5d..244b97d4a4 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -135,13 +135,16 @@ namespace Grpc.Core.Internal
}
}
- public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata)
+ public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
+ byte[] optionalPayload, WriteFlags writeFlags)
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
+ var optionalPayloadLength = optionalPayload != null ? new UIntPtr((ulong)optionalPayload.Length) : UIntPtr.Zero;
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
- Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk();
+ Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata,
+ optionalPayload, optionalPayloadLength, writeFlags).CheckOk();
}
}
diff --git a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
index d6e34a0f04..ad9423ff58 100644
--- a/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ClientResponseStream.cs
@@ -68,9 +68,7 @@ namespace Grpc.Core.Internal
{
throw new InvalidOperationException("Cancellation of individual reads is not supported.");
}
- var taskSource = new AsyncCompletionTaskSource<TResponse>();
- call.StartReadMessage(taskSource.CompletionDelegate);
- var result = await taskSource.Task.ConfigureAwait(false);
+ var result = await call.ReadMessageAsync().ConfigureAwait(false);
this.current = result;
if (result == null)
diff --git a/src/csharp/Grpc.Core/Internal/INativeCall.cs b/src/csharp/Grpc.Core/Internal/INativeCall.cs
index cbef599139..cd3719cb50 100644
--- a/src/csharp/Grpc.Core/Internal/INativeCall.cs
+++ b/src/csharp/Grpc.Core/Internal/INativeCall.cs
@@ -78,7 +78,7 @@ namespace Grpc.Core.Internal
void StartSendCloseFromClient(SendCompletionHandler callback);
- void StartSendStatusFromServer(SendCompletionHandler callback, Grpc.Core.Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata);
+ void StartSendStatusFromServer(SendCompletionHandler callback, Grpc.Core.Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, byte[] optionalPayload, Grpc.Core.WriteFlags writeFlags);
void StartServerSide(ReceivedCloseOnServerHandler callback);
}
diff --git a/src/csharp/Grpc.Core/Internal/NativeMethods.cs b/src/csharp/Grpc.Core/Internal/NativeMethods.cs
index 9ee0ba3bc0..c277c73ef0 100644
--- a/src/csharp/Grpc.Core/Internal/NativeMethods.cs
+++ b/src/csharp/Grpc.Core/Internal/NativeMethods.cs
@@ -421,20 +421,21 @@ namespace Grpc.Core.Internal
public delegate GRPCCallError grpcsharp_call_cancel_delegate(CallSafeHandle call);
public delegate GRPCCallError grpcsharp_call_cancel_with_status_delegate(CallSafeHandle call, StatusCode status, string description);
public delegate GRPCCallError grpcsharp_call_start_unary_delegate(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
+ BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
public delegate GRPCCallError grpcsharp_call_start_client_streaming_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
public delegate GRPCCallError grpcsharp_call_start_server_streaming_delegate(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len,
+ BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen,
MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
public delegate GRPCCallError grpcsharp_call_start_duplex_streaming_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
public delegate GRPCCallError grpcsharp_call_send_message_delegate(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
+ BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
public delegate GRPCCallError grpcsharp_call_send_close_from_client_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx);
public delegate GRPCCallError grpcsharp_call_send_status_from_server_delegate(CallSafeHandle call,
- BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata);
+ BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
+ byte[] optionalSendBuffer, UIntPtr optionalSendBufferLen, WriteFlags writeFlags);
public delegate GRPCCallError grpcsharp_call_recv_message_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx);
public delegate GRPCCallError grpcsharp_call_recv_initial_metadata_delegate(CallSafeHandle call,
@@ -593,7 +594,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
public static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
+ BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
public static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
@@ -601,7 +602,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
public static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len,
+ BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen,
MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
@@ -610,7 +611,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
public static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
+ BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
[DllImport("grpc_csharp_ext.dll")]
public static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call,
@@ -618,7 +619,8 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
public static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call,
- BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata);
+ BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
+ byte[] optionalSendBuffer, UIntPtr optionalSendBufferLen, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
public static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index 1f83e51548..85b7a4b01e 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -75,29 +75,32 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
+ Tuple<TResponse,WriteFlags> responseTuple = null;
var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false));
var request = requestStream.Current;
- // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated.
- GrpcPreconditions.CheckArgument(!await requestStream.MoveNext().ConfigureAwait(false));
- var result = await handler(request, context).ConfigureAwait(false);
+ var response = await handler(request, context).ConfigureAwait(false);
status = context.Status;
- await responseStream.WriteAsync(result).ConfigureAwait(false);
+ responseTuple = Tuple.Create(response, HandlerUtils.GetWriteFlags(context.WriteOptions));
}
catch (Exception e)
{
- Logger.Error(e, "Exception occured in handler.");
+ if (!(e is RpcException))
+ {
+ Logger.Warning(e, "Exception occured in handler.");
+ }
status = HandlerUtils.StatusFromException(e);
}
try
{
- await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseTuple).ConfigureAwait(false);
}
- catch (OperationCanceledException)
+ catch (Exception)
{
- // Call has been already cancelled.
+ asyncCall.Cancel();
+ throw;
}
await finishedTask.ConfigureAwait(false);
}
@@ -136,24 +139,26 @@ namespace Grpc.Core.Internal
{
GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false));
var request = requestStream.Current;
- // TODO(jtattermusch): we need to read the full stream so that native callhandle gets deallocated.
- GrpcPreconditions.CheckArgument(!await requestStream.MoveNext().ConfigureAwait(false));
await handler(request, responseStream, context).ConfigureAwait(false);
status = context.Status;
}
catch (Exception e)
{
- Logger.Error(e, "Exception occured in handler.");
+ if (!(e is RpcException))
+ {
+ Logger.Warning(e, "Exception occured in handler.");
+ }
status = HandlerUtils.StatusFromException(e);
}
try
{
- await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, null).ConfigureAwait(false);
}
- catch (OperationCanceledException)
+ catch (Exception)
{
- // Call has been already cancelled.
+ asyncCall.Cancel();
+ throw;
}
await finishedTask.ConfigureAwait(false);
}
@@ -187,33 +192,31 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
+ Tuple<TResponse,WriteFlags> responseTuple = null;
var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
- var result = await handler(requestStream, context).ConfigureAwait(false);
+ var response = await handler(requestStream, context).ConfigureAwait(false);
status = context.Status;
- try
- {
- await responseStream.WriteAsync(result).ConfigureAwait(false);
- }
- catch (OperationCanceledException)
- {
- status = Status.DefaultCancelled;
- }
+ responseTuple = Tuple.Create(response, HandlerUtils.GetWriteFlags(context.WriteOptions));
}
catch (Exception e)
{
- Logger.Error(e, "Exception occured in handler.");
+ if (!(e is RpcException))
+ {
+ Logger.Warning(e, "Exception occured in handler.");
+ }
status = HandlerUtils.StatusFromException(e);
}
try
{
- await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseTuple).ConfigureAwait(false);
}
- catch (OperationCanceledException)
+ catch (Exception)
{
- // Call has been already cancelled.
+ asyncCall.Cancel();
+ throw;
}
await finishedTask.ConfigureAwait(false);
}
@@ -255,16 +258,20 @@ namespace Grpc.Core.Internal
}
catch (Exception e)
{
- Logger.Error(e, "Exception occured in handler.");
+ if (!(e is RpcException))
+ {
+ Logger.Warning(e, "Exception occured in handler.");
+ }
status = HandlerUtils.StatusFromException(e);
}
try
{
- await responseStream.WriteStatusAsync(status, context.ResponseTrailers).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, null).ConfigureAwait(false);
}
- catch (OperationCanceledException)
+ catch (Exception)
{
- // Call has been already cancelled.
+ asyncCall.Cancel();
+ throw;
}
await finishedTask.ConfigureAwait(false);
}
@@ -282,9 +289,7 @@ namespace Grpc.Core.Internal
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
- var responseStream = new ServerResponseStream<byte[], byte[]>(asyncCall);
-
- await responseStream.WriteStatusAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty, null).ConfigureAwait(false);
await finishedTask.ConfigureAwait(false);
}
}
@@ -300,10 +305,14 @@ namespace Grpc.Core.Internal
return rpcException.Status;
}
- // TODO(jtattermusch): what is the right status code here?
return new Status(StatusCode.Unknown, "Exception was thrown by handler.");
}
+ public static WriteFlags GetWriteFlags(WriteOptions writeOptions)
+ {
+ return writeOptions != null ? writeOptions.Flags : default(WriteFlags);
+ }
+
public static ServerCallContext NewContext<TRequest, TResponse>(ServerRpcNew newRpc, string peer, ServerResponseStream<TRequest, TResponse> serverResponseStream, CancellationToken cancellationToken)
where TRequest : class
where TResponse : class
diff --git a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
index e7be82c318..d76030d1ad 100644
--- a/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerRequestStream.cs
@@ -68,9 +68,7 @@ namespace Grpc.Core.Internal
{
throw new InvalidOperationException("Cancellation of individual reads is not supported.");
}
- var taskSource = new AsyncCompletionTaskSource<TRequest>();
- call.StartReadMessage(taskSource.CompletionDelegate);
- var result = await taskSource.Task.ConfigureAwait(false);
+ var result = await call.ReadMessageAsync().ConfigureAwait(false);
this.current = result;
return result != null;
}
diff --git a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
index 03e39efc02..ecfee0bfdd 100644
--- a/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerResponseStream.cs
@@ -57,13 +57,6 @@ namespace Grpc.Core.Internal
return taskSource.Task;
}
- public Task WriteStatusAsync(Status status, Metadata trailers)
- {
- var taskSource = new AsyncCompletionTaskSource<object>();
- call.StartSendStatusFromServer(status, trailers, taskSource.CompletionDelegate);
- return taskSource.Task;
- }
-
public Task WriteResponseHeadersAsync(Metadata responseHeaders)
{
var taskSource = new AsyncCompletionTaskSource<object>();
diff --git a/src/csharp/Grpc.Core/VersionInfo.cs b/src/csharp/Grpc.Core/VersionInfo.cs
index f7a9cb9c1c..e1609341d9 100644
--- a/src/csharp/Grpc.Core/VersionInfo.cs
+++ b/src/csharp/Grpc.Core/VersionInfo.cs
@@ -48,11 +48,11 @@ namespace Grpc.Core
/// <summary>
/// Current <c>AssemblyFileVersion</c> of gRPC C# assemblies
/// </summary>
- public const string CurrentAssemblyFileVersion = "0.14.0.0";
+ public const string CurrentAssemblyFileVersion = "0.15.0.0";
/// <summary>
/// Current version of gRPC C#
/// </summary>
- public const string CurrentVersion = "0.14.0-dev";
+ public const string CurrentVersion = "0.15.0-dev";
}
}
diff --git a/src/csharp/build_packages.bat b/src/csharp/build_packages.bat
index 9a60be26b6..7520b0f81a 100644
--- a/src/csharp/build_packages.bat
+++ b/src/csharp/build_packages.bat
@@ -1,7 +1,7 @@
@rem Builds gRPC NuGet packages
@rem Current package versions
-set VERSION=0.14.0-dev
+set VERSION=0.15.0-dev
set PROTOBUF_VERSION=3.0.0-beta2
@rem Packages that depend on prerelease packages (like Google.Protobuf) need to have prerelease suffix as well.
diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c
index aeef8a79e9..5b8ff9b819 100644
--- a/src/csharp/ext/grpc_csharp_ext.c
+++ b/src/csharp/ext/grpc_csharp_ext.c
@@ -715,10 +715,11 @@ grpcsharp_call_send_close_from_client(grpc_call *call,
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(
grpc_call *call, grpcsharp_batch_context *ctx, grpc_status_code status_code,
const char *status_details, grpc_metadata_array *trailing_metadata,
- int32_t send_empty_initial_metadata) {
+ int32_t send_empty_initial_metadata, const char* optional_send_buffer,
+ size_t optional_send_buffer_len, uint32_t write_flags) {
/* TODO: don't use magic number */
- grpc_op ops[2];
- size_t nops = send_empty_initial_metadata ? 2 : 1;
+ grpc_op ops[3];
+ size_t nops = 1;
ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
ops[0].data.send_status_from_server.status = status_code;
ops[0].data.send_status_from_server.status_details =
@@ -731,12 +732,23 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(
ctx->send_status_from_server.trailing_metadata.metadata;
ops[0].flags = 0;
ops[0].reserved = NULL;
- ops[1].op = GRPC_OP_SEND_INITIAL_METADATA;
- ops[1].data.send_initial_metadata.count = 0;
- ops[1].data.send_initial_metadata.metadata = NULL;
- ops[1].flags = 0;
- ops[1].reserved = NULL;
-
+ if (optional_send_buffer) {
+ ops[nops].op = GRPC_OP_SEND_MESSAGE;
+ ctx->send_message = string_to_byte_buffer(optional_send_buffer,
+ optional_send_buffer_len);
+ ops[nops].data.send_message = ctx->send_message;
+ ops[nops].flags = write_flags;
+ ops[nops].reserved = NULL;
+ nops ++;
+ }
+ if (send_empty_initial_metadata) {
+ ops[nops].op = GRPC_OP_SEND_INITIAL_METADATA;
+ ops[nops].data.send_initial_metadata.count = 0;
+ ops[nops].data.send_initial_metadata.metadata = NULL;
+ ops[nops].flags = 0;
+ ops[nops].reserved = NULL;
+ nops++;
+ }
return grpc_call_start_batch(call, ops, nops, ctx, NULL);
}
diff --git a/src/csharp/tests.json b/src/csharp/tests.json
index f733352a31..f6af3408d5 100644
--- a/src/csharp/tests.json
+++ b/src/csharp/tests.json
@@ -1,5 +1,6 @@
{
"Grpc.Core.Tests": [
+ "Grpc.Core.Internal.Tests.AsyncCallServerTest",
"Grpc.Core.Internal.Tests.AsyncCallTest",
"Grpc.Core.Internal.Tests.ChannelArgsSafeHandleTest",
"Grpc.Core.Internal.Tests.CompletionQueueEventTest",
diff --git a/src/node/tools/bin/protoc.js b/src/node/tools/bin/protoc.js
index 0c6d7ce017..4d50c94b0f 100755
--- a/src/node/tools/bin/protoc.js
+++ b/src/node/tools/bin/protoc.js
@@ -43,7 +43,9 @@
var path = require('path');
var execFile = require('child_process').execFile;
-var protoc = path.resolve(__dirname, 'protoc');
+var exe_ext = process.platform === 'win32' ? '.exe' : '';
+
+var protoc = path.resolve(__dirname, 'protoc' + exe_ext);
execFile(protoc, process.argv.slice(2), function(error, stdout, stderr) {
if (error) {
diff --git a/src/node/tools/bin/protoc_plugin.js b/src/node/tools/bin/protoc_plugin.js
index 0e0bb9406e..281ec0d85e 100755
--- a/src/node/tools/bin/protoc_plugin.js
+++ b/src/node/tools/bin/protoc_plugin.js
@@ -43,9 +43,11 @@
var path = require('path');
var execFile = require('child_process').execFile;
-var protoc = path.resolve(__dirname, 'grpc_node_plugin');
+var exe_ext = process.platform === 'win32' ? '.exe' : '';
-execFile(protoc, process.argv.slice(2), function(error, stdout, stderr) {
+var plugin = path.resolve(__dirname, 'grpc_node_plugin' + exe_ext);
+
+execFile(plugin, process.argv.slice(2), function(error, stdout, stderr) {
if (error) {
throw error;
}
diff --git a/src/node/tools/package.json b/src/node/tools/package.json
index d98ed0b1fc..efdfa81124 100644
--- a/src/node/tools/package.json
+++ b/src/node/tools/package.json
@@ -1,6 +1,6 @@
{
"name": "grpc-tools",
- "version": "0.14.0-dev",
+ "version": "0.15.0-dev",
"author": "Google Inc.",
"description": "Tools for developing with gRPC on Node.js",
"homepage": "http://www.grpc.io/",
diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m
index 1847d6016f..0eb10656dd 100644
--- a/src/objective-c/GRPCClient/GRPCCall.m
+++ b/src/objective-c/GRPCClient/GRPCCall.m
@@ -136,6 +136,10 @@ NSString * const kGRPCTrailersKey = @"io.grpc.TrailersKey";
#pragma mark Finish
- (void)finishWithError:(NSError *)errorOrNil {
+ @synchronized(self) {
+ _state = GRXWriterStateFinished;
+ }
+
// If the call isn't retained anywhere else, it can be deallocated now.
_retainSelf = nil;
@@ -342,6 +346,10 @@ NSString * const kGRPCTrailersKey = @"io.grpc.TrailersKey";
#pragma mark GRXWriter implementation
- (void)startWithWriteable:(id<GRXWriteable>)writeable {
+ @synchronized(self) {
+ _state = GRXWriterStateStarted;
+ }
+
// Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled).
// This makes RPCs in which the call isn't externally retained possible (as long as it is started
// before being autoreleased).
@@ -375,30 +383,32 @@ NSString * const kGRPCTrailersKey = @"io.grpc.TrailersKey";
}
- (void)setState:(GRXWriterState)newState {
- // Manual transitions are only allowed from the started or paused states.
- if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
- return;
- }
-
- switch (newState) {
- case GRXWriterStateFinished:
- _state = newState;
- // Per GRXWriter's contract, setting the state to Finished manually
- // means one doesn't wish the writeable to be messaged anymore.
- [_responseWriteable cancelSilently];
- _responseWriteable = nil;
- return;
- case GRXWriterStatePaused:
- _state = newState;
+ @synchronized(self) {
+ // Manual transitions are only allowed from the started or paused states.
+ if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
return;
- case GRXWriterStateStarted:
- if (_state == GRXWriterStatePaused) {
+ }
+
+ switch (newState) {
+ case GRXWriterStateFinished:
_state = newState;
- [self startNextRead];
- }
- return;
- case GRXWriterStateNotStarted:
- return;
+ // Per GRXWriter's contract, setting the state to Finished manually
+ // means one doesn't wish the writeable to be messaged anymore.
+ [_responseWriteable cancelSilently];
+ _responseWriteable = nil;
+ return;
+ case GRXWriterStatePaused:
+ _state = newState;
+ return;
+ case GRXWriterStateStarted:
+ if (_state == GRXWriterStatePaused) {
+ _state = newState;
+ [self startNextRead];
+ }
+ return;
+ case GRXWriterStateNotStarted:
+ return;
+ }
}
}
diff --git a/src/objective-c/tests/InteropTests.m b/src/objective-c/tests/InteropTests.m
index 26877b1ae8..4f096b9efa 100644
--- a/src/objective-c/tests/InteropTests.m
+++ b/src/objective-c/tests/InteropTests.m
@@ -272,8 +272,14 @@
XCTAssertEqual(error.code, GRPC_STATUS_CANCELLED);
[expectation fulfill];
}];
+ XCTAssertEqual(call.state, GRXWriterStateNotStarted);
+
[call start];
+ XCTAssertEqual(call.state, GRXWriterStateStarted);
+
[call cancel];
+ XCTAssertEqual(call.state, GRXWriterStateFinished);
+
[self waitForExpectationsWithTimeout:1 handler:nil];
}
diff --git a/src/proto/grpc/testing/echo.proto b/src/proto/grpc/testing/echo.proto
index 0eef53a92a..c596aabfcc 100644
--- a/src/proto/grpc/testing/echo.proto
+++ b/src/proto/grpc/testing/echo.proto
@@ -45,3 +45,7 @@ service EchoTestService {
service UnimplementedService {
rpc Unimplemented(EchoRequest) returns (EchoResponse);
}
+
+// A service without any rpc defined to test coverage.
+service NoRpcService {
+}
diff --git a/src/python/grpcio/README.rst b/src/python/grpcio/README.rst
index cb3f6b87fe..afc4fe6a37 100644
--- a/src/python/grpcio/README.rst
+++ b/src/python/grpcio/README.rst
@@ -48,6 +48,7 @@ package named :code:`python-dev`).
$ export REPO_ROOT=grpc # REPO_ROOT can be any directory of your choice
$ git clone https://github.com/grpc/grpc.git $REPO_ROOT
$ cd $REPO_ROOT
+ $ git submodule update --init
# For the next two commands do `sudo pip install` if you get permission-denied errors
$ pip install -rrequirements.txt
diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py
index 7086519106..b844a14c48 100644
--- a/src/python/grpcio/grpc/__init__.py
+++ b/src/python/grpcio/grpc/__init__.py
@@ -1,4 +1,4 @@
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -27,4 +27,5 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+__import__('pkg_resources').declare_namespace(__name__)
diff --git a/src/python/grpcio/grpc/_cython/imports.generated.c b/src/python/grpcio/grpc/_cython/imports.generated.c
index f0a40dbb35..09551472b5 100644
--- a/src/python/grpcio/grpc/_cython/imports.generated.c
+++ b/src/python/grpcio/grpc/_cython/imports.generated.c
@@ -125,6 +125,7 @@ grpc_header_key_is_legal_type grpc_header_key_is_legal_import;
grpc_header_nonbin_value_is_legal_type grpc_header_nonbin_value_is_legal_import;
grpc_is_binary_header_type grpc_is_binary_header_import;
grpc_call_error_to_string_type grpc_call_error_to_string_import;
+grpc_cronet_secure_channel_create_type grpc_cronet_secure_channel_create_import;
grpc_auth_property_iterator_next_type grpc_auth_property_iterator_next_import;
grpc_auth_context_property_iterator_type grpc_auth_context_property_iterator_import;
grpc_auth_context_peer_identity_type grpc_auth_context_peer_identity_import;
@@ -395,6 +396,7 @@ void pygrpc_load_imports(HMODULE library) {
grpc_header_nonbin_value_is_legal_import = (grpc_header_nonbin_value_is_legal_type) GetProcAddress(library, "grpc_header_nonbin_value_is_legal");
grpc_is_binary_header_import = (grpc_is_binary_header_type) GetProcAddress(library, "grpc_is_binary_header");
grpc_call_error_to_string_import = (grpc_call_error_to_string_type) GetProcAddress(library, "grpc_call_error_to_string");
+ grpc_cronet_secure_channel_create_import = (grpc_cronet_secure_channel_create_type) GetProcAddress(library, "grpc_cronet_secure_channel_create");
grpc_auth_property_iterator_next_import = (grpc_auth_property_iterator_next_type) GetProcAddress(library, "grpc_auth_property_iterator_next");
grpc_auth_context_property_iterator_import = (grpc_auth_context_property_iterator_type) GetProcAddress(library, "grpc_auth_context_property_iterator");
grpc_auth_context_peer_identity_import = (grpc_auth_context_peer_identity_type) GetProcAddress(library, "grpc_auth_context_peer_identity");
diff --git a/src/python/grpcio/grpc/_cython/imports.generated.h b/src/python/grpcio/grpc/_cython/imports.generated.h
index d5e810b7cf..54c8aaad13 100644
--- a/src/python/grpcio/grpc/_cython/imports.generated.h
+++ b/src/python/grpcio/grpc/_cython/imports.generated.h
@@ -43,6 +43,7 @@
#include <grpc/census.h>
#include <grpc/compression.h>
#include <grpc/grpc.h>
+#include <grpc/grpc_cronet.h>
#include <grpc/grpc_security.h>
#include <grpc/impl/codegen/alloc.h>
#include <grpc/impl/codegen/byte_buffer.h>
@@ -325,6 +326,9 @@ extern grpc_is_binary_header_type grpc_is_binary_header_import;
typedef const char *(*grpc_call_error_to_string_type)(grpc_call_error error);
extern grpc_call_error_to_string_type grpc_call_error_to_string_import;
#define grpc_call_error_to_string grpc_call_error_to_string_import
+typedef grpc_channel *(*grpc_cronet_secure_channel_create_type)(void *engine, const char *target, const grpc_channel_args *args, void *reserved);
+extern grpc_cronet_secure_channel_create_type grpc_cronet_secure_channel_create_import;
+#define grpc_cronet_secure_channel_create grpc_cronet_secure_channel_create_import
typedef const grpc_auth_property *(*grpc_auth_property_iterator_next_type)(grpc_auth_property_iterator *it);
extern grpc_auth_property_iterator_next_type grpc_auth_property_iterator_next_import;
#define grpc_auth_property_iterator_next grpc_auth_property_iterator_next_import
diff --git a/src/python/grpcio/grpc/beta/implementations.py b/src/python/grpcio/grpc/beta/implementations.py
index 742e94dc65..822f593323 100644
--- a/src/python/grpcio/grpc/beta/implementations.py
+++ b/src/python/grpcio/grpc/beta/implementations.py
@@ -1,4 +1,4 @@
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -188,12 +188,13 @@ def insecure_channel(host, port):
Args:
host: The name of the remote host to which to connect.
port: The port of the remote host to which to connect.
+ If None only the 'host' part will be used.
Returns:
A Channel to the remote host through which RPCs may be conducted.
"""
intermediary_low_channel = _intermediary_low.Channel(
- '%s:%d' % (host, port), None)
+ '%s:%d' % (host, port) if port else host, None)
return Channel(intermediary_low_channel._internal, intermediary_low_channel) # pylint: disable=protected-access
@@ -203,13 +204,15 @@ def secure_channel(host, port, channel_credentials):
Args:
host: The name of the remote host to which to connect.
port: The port of the remote host to which to connect.
+ If None only the 'host' part will be used.
channel_credentials: A ChannelCredentials.
Returns:
A secure Channel to the remote host through which RPCs may be conducted.
"""
intermediary_low_channel = _intermediary_low.Channel(
- '%s:%d' % (host, port), channel_credentials._low_credentials)
+ '%s:%d' % (host, port) if port else host,
+ channel_credentials._low_credentials)
return Channel(intermediary_low_channel._internal, intermediary_low_channel) # pylint: disable=protected-access
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index dab62530aa..5314329c2c 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -222,6 +222,9 @@ CORE_SOURCE_FILES = [
'src/core/ext/client_config/uri_parser.c',
'src/core/ext/transport/chttp2/server/insecure/server_chttp2.c',
'src/core/ext/transport/chttp2/client/insecure/channel_create.c',
+ 'src/core/ext/transport/cronet/client/secure/cronet_channel_create.c',
+ 'src/core/ext/transport/cronet/transport/cronet_api_dummy.c',
+ 'src/core/ext/transport/cronet/transport/cronet_transport.c',
'src/core/ext/lb_policy/grpclb/load_balancer_api.c',
'src/core/ext/lb_policy/grpclb/proto/grpc/lb/v0/load_balancer.pb.c',
'third_party/nanopb/pb_common.c',
diff --git a/src/python/grpcio/grpc_version.py b/src/python/grpcio/grpc_version.py
index 873b4e2a91..0c13104d9d 100644
--- a/src/python/grpcio/grpc_version.py
+++ b/src/python/grpcio/grpc_version.py
@@ -29,4 +29,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc_version.py.template`!!!
-VERSION='0.14.0.dev0'
+VERSION='0.15.0.dev0'
diff --git a/src/python/grpcio/tests/stress/__init__.py b/src/python/grpcio/tests/stress/__init__.py
new file mode 100644
index 0000000000..100a624dc9
--- /dev/null
+++ b/src/python/grpcio/tests/stress/__init__.py
@@ -0,0 +1,28 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/src/python/grpcio/tests/stress/client.py b/src/python/grpcio/tests/stress/client.py
new file mode 100644
index 0000000000..a733741b73
--- /dev/null
+++ b/src/python/grpcio/tests/stress/client.py
@@ -0,0 +1,132 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Entry point for running stress tests."""
+
+import argparse
+import Queue
+import threading
+
+from grpc.beta import implementations
+from src.proto.grpc.testing import metrics_pb2
+from src.proto.grpc.testing import test_pb2
+
+from tests.interop import methods
+from tests.qps import histogram
+from tests.stress import metrics_server
+from tests.stress import test_runner
+
+
+def _args():
+ parser = argparse.ArgumentParser(description='gRPC Python stress test client')
+ parser.add_argument(
+ '--server_addresses',
+ help='comma seperated list of hostname:port to run servers on',
+ default='localhost:8080', type=str)
+ parser.add_argument(
+ '--test_cases',
+ help='comma seperated list of testcase:weighting of tests to run',
+ default='large_unary:100',
+ type=str)
+ parser.add_argument(
+ '--test_duration_secs',
+ help='number of seconds to run the stress test',
+ default=-1, type=int)
+ parser.add_argument(
+ '--num_channels_per_server',
+ help='number of channels per server',
+ default=1, type=int)
+ parser.add_argument(
+ '--num_stubs_per_channel',
+ help='number of stubs to create per channel',
+ default=1, type=int)
+ parser.add_argument(
+ '--metrics_port',
+ help='the port to listen for metrics requests on',
+ default=8081, type=int)
+ return parser.parse_args()
+
+
+def _test_case_from_arg(test_case_arg):
+ for test_case in methods.TestCase:
+ if test_case_arg == test_case.value:
+ return test_case
+ else:
+ raise ValueError('No test case {}!'.format(test_case_arg))
+
+
+def _parse_weighted_test_cases(test_case_args):
+ weighted_test_cases = {}
+ for test_case_arg in test_case_args.split(','):
+ name, weight = test_case_arg.split(':', 1)
+ test_case = _test_case_from_arg(name)
+ weighted_test_cases[test_case] = int(weight)
+ return weighted_test_cases
+
+
+def run_test(args):
+ test_cases = _parse_weighted_test_cases(args.test_cases)
+ test_servers = args.server_addresses.split(',')
+ # Propagate any client exceptions with a queue
+ exception_queue = Queue.Queue()
+ stop_event = threading.Event()
+ hist = histogram.Histogram(1, 1)
+ runners = []
+
+ server = metrics_pb2.beta_create_MetricsService_server(
+ metrics_server.MetricsServer(hist))
+ server.add_insecure_port('[::]:{}'.format(args.metrics_port))
+ server.start()
+
+ for test_server in test_servers:
+ host, port = test_server.split(':', 1)
+ for _ in xrange(args.num_channels_per_server):
+ channel = implementations.insecure_channel(host, int(port))
+ for _ in xrange(args.num_stubs_per_channel):
+ stub = test_pb2.beta_create_TestService_stub(channel)
+ runner = test_runner.TestRunner(stub, test_cases, hist,
+ exception_queue, stop_event)
+ runners.append(runner)
+
+ for runner in runners:
+ runner.start()
+ try:
+ raise exception_queue.get(block=True, timeout=args.test_duration_secs)
+ except Queue.Empty:
+ # No exceptions thrown, success
+ pass
+ finally:
+ stop_event.set()
+ for runner in runners:
+ runner.join()
+ runner = None
+ server.stop(0)
+
+if __name__ == '__main__':
+ run_test(_args())
diff --git a/src/python/grpcio/tests/stress/metrics_server.py b/src/python/grpcio/tests/stress/metrics_server.py
new file mode 100644
index 0000000000..b994e4643e
--- /dev/null
+++ b/src/python/grpcio/tests/stress/metrics_server.py
@@ -0,0 +1,60 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""MetricsService for publishing stress test qps data."""
+
+import time
+
+from src.proto.grpc.testing import metrics_pb2
+
+GAUGE_NAME = 'python_overall_qps'
+
+
+class MetricsServer(metrics_pb2.BetaMetricsServiceServicer):
+
+ def __init__(self, histogram):
+ self._start_time = time.time()
+ self._histogram = histogram
+
+ def _get_qps(self):
+ count = self._histogram.get_data().count
+ delta = time.time() - self._start_time
+ self._histogram.reset()
+ self._start_time = time.time()
+ return int(count/delta)
+
+ def GetAllGauges(self, request, context):
+ qps = self._get_qps()
+ return [metrics_pb2.GaugeResponse(name=GAUGE_NAME, long_value=qps)]
+
+ def GetGauge(self, request, context):
+ if request.name != GAUGE_NAME:
+ raise Exception('Gauge {} does not exist'.format(request.name))
+ qps = self._get_qps()
+ return metrics_pb2.GaugeResponse(name=GAUGE_NAME, long_value=qps)
diff --git a/src/python/grpcio/tests/stress/test_runner.py b/src/python/grpcio/tests/stress/test_runner.py
new file mode 100644
index 0000000000..88f13727e3
--- /dev/null
+++ b/src/python/grpcio/tests/stress/test_runner.py
@@ -0,0 +1,73 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Thread that sends random weighted requests on a TestService stub."""
+
+import random
+import threading
+import time
+import traceback
+
+
+def _weighted_test_case_generator(weighted_cases):
+ weight_sum = sum(weighted_cases.itervalues())
+
+ while True:
+ val = random.uniform(0, weight_sum)
+ partial_sum = 0
+ for case in weighted_cases:
+ partial_sum += weighted_cases[case]
+ if val <= partial_sum:
+ yield case
+ break
+
+
+class TestRunner(threading.Thread):
+
+ def __init__(self, stub, test_cases, hist, exception_queue, stop_event):
+ super(TestRunner, self).__init__()
+ self._exception_queue = exception_queue
+ self._stop_event = stop_event
+ self._stub = stub
+ self._test_cases = _weighted_test_case_generator(test_cases)
+ self._histogram = hist
+
+ def run(self):
+ while not self._stop_event.is_set():
+ try:
+ test_case = next(self._test_cases)
+ start_time = time.time()
+ test_case.test_interoperability(self._stub, None)
+ end_time = time.time()
+ self._histogram.add((end_time - start_time)*1e9)
+ except Exception as e:
+ traceback.print_exc()
+ self._exception_queue.put(
+ Exception("An exception occured during test {}"
+ .format(test_case), e))
diff --git a/src/ruby/.rubocop.yml b/src/ruby/.rubocop.yml
index d13ce42655..34bb477543 100644
--- a/src/ruby/.rubocop.yml
+++ b/src/ruby/.rubocop.yml
@@ -11,10 +11,10 @@ AllCops:
- 'pb/test/**/*'
Metrics/CyclomaticComplexity:
- Max: 8
+ Max: 9
Metrics/PerceivedComplexity:
- Max: 8
+ Max: 9
Metrics/ClassLength:
Max: 250
diff --git a/src/ruby/ext/grpc/extconf.rb b/src/ruby/ext/grpc/extconf.rb
index 82b6d313c8..6d65db8306 100644
--- a/src/ruby/ext/grpc/extconf.rb
+++ b/src/ruby/ext/grpc/extconf.rb
@@ -60,31 +60,25 @@ grpc_root = File.expand_path(File.join(File.dirname(__FILE__), '../../../..'))
grpc_config = ENV['GRPC_CONFIG'] || 'opt'
-if ENV.key?('GRPC_LIB_DIR')
- grpc_lib_dir = File.join(grpc_root, ENV['GRPC_LIB_DIR'])
-else
- grpc_lib_dir = File.join(grpc_root, 'libs', grpc_config)
-end
-
ENV['MACOSX_DEPLOYMENT_TARGET'] = '10.7'
-unless File.exist?(File.join(grpc_lib_dir, 'libgrpc.a')) or windows
- ENV['AR'] = RbConfig::CONFIG['AR'] + ' rcs'
- ENV['CC'] = RbConfig::CONFIG['CC']
- ENV['LD'] = ENV['CC']
+ENV['AR'] = RbConfig::CONFIG['AR'] + ' rcs'
+ENV['CC'] = RbConfig::CONFIG['CC']
+ENV['LD'] = ENV['CC']
- ENV['AR'] = 'libtool -o' if RUBY_PLATFORM =~ /darwin/
+ENV['AR'] = 'libtool -o' if RUBY_PLATFORM =~ /darwin/
- ENV['EMBED_OPENSSL'] = 'true'
- ENV['EMBED_ZLIB'] = 'true'
- ENV['ARCH_FLAGS'] = RbConfig::CONFIG['ARCH_FLAG']
- ENV['ARCH_FLAGS'] = '-arch i386 -arch x86_64' if RUBY_PLATFORM =~ /darwin/
- ENV['CFLAGS'] = '-DGPR_BACKWARDS_COMPATIBILITY_MODE'
+ENV['EMBED_OPENSSL'] = 'true'
+ENV['EMBED_ZLIB'] = 'true'
+ENV['ARCH_FLAGS'] = RbConfig::CONFIG['ARCH_FLAG']
+ENV['ARCH_FLAGS'] = '-arch i386 -arch x86_64' if RUBY_PLATFORM =~ /darwin/
+ENV['CFLAGS'] = '-DGPR_BACKWARDS_COMPATIBILITY_MODE'
- output_dir = File.expand_path(RbConfig::CONFIG['topdir'])
- grpc_lib_dir = File.join(output_dir, 'libs', grpc_config)
- ENV['BUILDDIR'] = output_dir
+output_dir = File.expand_path(RbConfig::CONFIG['topdir'])
+grpc_lib_dir = File.join(output_dir, 'libs', grpc_config)
+ENV['BUILDDIR'] = output_dir
+unless windows
puts 'Building internal gRPC into ' + grpc_lib_dir
system("make -j -C #{grpc_root} #{grpc_lib_dir}/libgrpc.a CONFIG=#{grpc_config}")
exit 1 unless $? == 0
diff --git a/src/ruby/ext/grpc/rb_byte_buffer.c b/src/ruby/ext/grpc/rb_byte_buffer.c
index cba910d832..1172691116 100644
--- a/src/ruby/ext/grpc/rb_byte_buffer.c
+++ b/src/ruby/ext/grpc/rb_byte_buffer.c
@@ -32,11 +32,10 @@
*/
#include <ruby/ruby.h>
+
#include "rb_grpc_imports.generated.h"
#include "rb_byte_buffer.h"
-#include <ruby/ruby.h>
-
#include <grpc/grpc.h>
#include <grpc/byte_buffer_reader.h>
#include <grpc/support/slice.h>
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index 48c49a21e9..1b06273af9 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -32,11 +32,10 @@
*/
#include <ruby/ruby.h>
+
#include "rb_grpc_imports.generated.h"
#include "rb_call.h"
-#include <ruby/ruby.h>
-
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
diff --git a/src/ruby/ext/grpc/rb_call_credentials.c b/src/ruby/ext/grpc/rb_call_credentials.c
index 38bf1f7710..79ca5b32ce 100644
--- a/src/ruby/ext/grpc/rb_call_credentials.c
+++ b/src/ruby/ext/grpc/rb_call_credentials.c
@@ -32,10 +32,10 @@
*/
#include <ruby/ruby.h>
+
#include "rb_grpc_imports.generated.h"
#include "rb_call_credentials.h"
-#include <ruby/ruby.h>
#include <ruby/thread.h>
#include <grpc/grpc.h>
@@ -86,11 +86,11 @@ static VALUE grpc_rb_call_credentials_callback_rescue(VALUE args,
rb_funcall(exception_object, rb_intern("backtrace"), 0),
rb_intern("join"),
1, rb_str_new2("\n\tfrom "));
- VALUE exception_info = rb_funcall(exception_object, rb_intern("to_s"), 0);
+ VALUE rb_exception_info = rb_funcall(exception_object, rb_intern("to_s"), 0);
const char *exception_classname = rb_obj_classname(exception_object);
(void)args;
gpr_log(GPR_INFO, "Call credentials callback failed: %s: %s\n%s",
- exception_classname, StringValueCStr(exception_info),
+ exception_classname, StringValueCStr(rb_exception_info),
StringValueCStr(backtrace));
rb_hash_aset(result, rb_str_new2("metadata"), Qnil);
/* Currently only gives the exception class name. It should be possible get
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c
index 984afad107..013321ffc8 100644
--- a/src/ruby/ext/grpc/rb_channel.c
+++ b/src/ruby/ext/grpc/rb_channel.c
@@ -32,11 +32,10 @@
*/
#include <ruby/ruby.h>
+
#include "rb_grpc_imports.generated.h"
#include "rb_channel.h"
-#include <ruby/ruby.h>
-
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/support/alloc.h>
diff --git a/src/ruby/ext/grpc/rb_channel_args.c b/src/ruby/ext/grpc/rb_channel_args.c
index 2ffb8f41da..87c0e0a705 100644
--- a/src/ruby/ext/grpc/rb_channel_args.c
+++ b/src/ruby/ext/grpc/rb_channel_args.c
@@ -32,11 +32,10 @@
*/
#include <ruby/ruby.h>
+
#include "rb_grpc_imports.generated.h"
#include "rb_channel_args.h"
-#include <ruby/ruby.h>
-
#include <grpc/grpc.h>
#include "rb_grpc.h"
diff --git a/src/ruby/ext/grpc/rb_channel_credentials.c b/src/ruby/ext/grpc/rb_channel_credentials.c
index 09bd3093a9..cbb23885aa 100644
--- a/src/ruby/ext/grpc/rb_channel_credentials.c
+++ b/src/ruby/ext/grpc/rb_channel_credentials.c
@@ -31,14 +31,13 @@
*
*/
+#include <ruby/ruby.h>
+
#include <string.h>
-#include <ruby/ruby.h>
#include "rb_grpc_imports.generated.h"
#include "rb_channel_credentials.h"
-#include <ruby/ruby.h>
-
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/support/alloc.h>
diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c
index 2a2eee190c..4bb615f8be 100644
--- a/src/ruby/ext/grpc/rb_completion_queue.c
+++ b/src/ruby/ext/grpc/rb_completion_queue.c
@@ -32,10 +32,10 @@
*/
#include <ruby/ruby.h>
+
#include "rb_grpc_imports.generated.h"
#include "rb_completion_queue.h"
-#include <ruby/ruby.h>
#include <ruby/thread.h>
#include <grpc/grpc.h>
diff --git a/src/ruby/ext/grpc/rb_event_thread.c b/src/ruby/ext/grpc/rb_event_thread.c
index 2649a1087f..9e85bbcfbf 100644
--- a/src/ruby/ext/grpc/rb_event_thread.c
+++ b/src/ruby/ext/grpc/rb_event_thread.c
@@ -32,12 +32,12 @@
*/
#include <ruby/ruby.h>
+
#include "rb_grpc_imports.generated.h"
#include "rb_event_thread.h"
#include <stdbool.h>
-#include <ruby/ruby.h>
#include <ruby/thread.h>
#include <grpc/support/alloc.h>
#include <grpc/support/sync.h>
diff --git a/src/ruby/ext/grpc/rb_grpc.c b/src/ruby/ext/grpc/rb_grpc.c
index acb47b0055..06a07ac646 100644
--- a/src/ruby/ext/grpc/rb_grpc.c
+++ b/src/ruby/ext/grpc/rb_grpc.c
@@ -32,11 +32,11 @@
*/
#include <ruby/ruby.h>
+
#include "rb_grpc_imports.generated.h"
#include "rb_grpc.h"
#include <math.h>
-#include <ruby/ruby.h>
#include <ruby/vm.h>
#include <sys/time.h>
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
index bc43f9d36b..cebbe8c40f 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
@@ -125,6 +125,7 @@ grpc_header_key_is_legal_type grpc_header_key_is_legal_import;
grpc_header_nonbin_value_is_legal_type grpc_header_nonbin_value_is_legal_import;
grpc_is_binary_header_type grpc_is_binary_header_import;
grpc_call_error_to_string_type grpc_call_error_to_string_import;
+grpc_cronet_secure_channel_create_type grpc_cronet_secure_channel_create_import;
grpc_auth_property_iterator_next_type grpc_auth_property_iterator_next_import;
grpc_auth_context_property_iterator_type grpc_auth_context_property_iterator_import;
grpc_auth_context_peer_identity_type grpc_auth_context_peer_identity_import;
@@ -391,6 +392,7 @@ void grpc_rb_load_imports(HMODULE library) {
grpc_header_nonbin_value_is_legal_import = (grpc_header_nonbin_value_is_legal_type) GetProcAddress(library, "grpc_header_nonbin_value_is_legal");
grpc_is_binary_header_import = (grpc_is_binary_header_type) GetProcAddress(library, "grpc_is_binary_header");
grpc_call_error_to_string_import = (grpc_call_error_to_string_type) GetProcAddress(library, "grpc_call_error_to_string");
+ grpc_cronet_secure_channel_create_import = (grpc_cronet_secure_channel_create_type) GetProcAddress(library, "grpc_cronet_secure_channel_create");
grpc_auth_property_iterator_next_import = (grpc_auth_property_iterator_next_type) GetProcAddress(library, "grpc_auth_property_iterator_next");
grpc_auth_context_property_iterator_import = (grpc_auth_context_property_iterator_type) GetProcAddress(library, "grpc_auth_context_property_iterator");
grpc_auth_context_peer_identity_import = (grpc_auth_context_peer_identity_type) GetProcAddress(library, "grpc_auth_context_peer_identity");
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
index b67361ca25..d7ea6c574c 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
@@ -43,6 +43,7 @@
#include <grpc/census.h>
#include <grpc/compression.h>
#include <grpc/grpc.h>
+#include <grpc/grpc_cronet.h>
#include <grpc/grpc_security.h>
#include <grpc/impl/codegen/alloc.h>
#include <grpc/impl/codegen/byte_buffer.h>
@@ -325,6 +326,9 @@ extern grpc_is_binary_header_type grpc_is_binary_header_import;
typedef const char *(*grpc_call_error_to_string_type)(grpc_call_error error);
extern grpc_call_error_to_string_type grpc_call_error_to_string_import;
#define grpc_call_error_to_string grpc_call_error_to_string_import
+typedef grpc_channel *(*grpc_cronet_secure_channel_create_type)(void *engine, const char *target, const grpc_channel_args *args, void *reserved);
+extern grpc_cronet_secure_channel_create_type grpc_cronet_secure_channel_create_import;
+#define grpc_cronet_secure_channel_create grpc_cronet_secure_channel_create_import
typedef const grpc_auth_property *(*grpc_auth_property_iterator_next_type)(grpc_auth_property_iterator *it);
extern grpc_auth_property_iterator_next_type grpc_auth_property_iterator_next_import;
#define grpc_auth_property_iterator_next grpc_auth_property_iterator_next_import
diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c
index 96e60c6776..2b3acaaf59 100644
--- a/src/ruby/ext/grpc/rb_server.c
+++ b/src/ruby/ext/grpc/rb_server.c
@@ -32,11 +32,10 @@
*/
#include <ruby/ruby.h>
+
#include "rb_grpc_imports.generated.h"
#include "rb_server.h"
-#include <ruby/ruby.h>
-
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include "rb_call.h"
diff --git a/src/ruby/ext/grpc/rb_server_credentials.c b/src/ruby/ext/grpc/rb_server_credentials.c
index b2d7280a30..3b0fb6c910 100644
--- a/src/ruby/ext/grpc/rb_server_credentials.c
+++ b/src/ruby/ext/grpc/rb_server_credentials.c
@@ -32,11 +32,10 @@
*/
#include <ruby/ruby.h>
+
#include "rb_grpc_imports.generated.h"
#include "rb_server_credentials.h"
-#include <ruby/ruby.h>
-
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index 7f3a38a9f4..a0f4071adc 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -332,15 +332,13 @@ module GRPC
# the current thread to terminate it.
def run_till_terminated
GRPC.trap_signals
- stopped = false
t = Thread.new do
run
- stopped = true
end
+ t.abort_on_exception = true
wait_till_running
- loop do
+ until running_state == :stopped
sleep SIGNAL_CHECK_PERIOD
- break if stopped
break unless GRPC.handle_signals
end
stop
@@ -416,7 +414,7 @@ module GRPC
GRPC.logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
noop = proc { |x| x }
c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
- c.send_status(StatusCodes::RESOURCE_EXHAUSTED, '')
+ c.send_status(GRPC::Core::StatusCodes::RESOURCE_EXHAUSTED, '')
nil
end
@@ -427,7 +425,7 @@ module GRPC
GRPC.logger.warn("UNIMPLEMENTED: #{an_rpc}")
noop = proc { |x| x }
c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
- c.send_status(StatusCodes::UNIMPLEMENTED, '')
+ c.send_status(GRPC::Core::StatusCodes::UNIMPLEMENTED, '')
nil
end
@@ -443,7 +441,12 @@ module GRPC
unless active_call.nil?
@pool.schedule(active_call) do |ac|
c, mth = ac
- rpc_descs[mth].run_server_method(c, rpc_handlers[mth])
+ begin
+ rpc_descs[mth].run_server_method(c, rpc_handlers[mth])
+ rescue StandardError
+ c.send_status(GRPC::Core::StatusCodes::INTERNAL,
+ 'Server handler failed')
+ end
end
end
rescue Core::CallError, RuntimeError => e
diff --git a/src/ruby/lib/grpc/version.rb b/src/ruby/lib/grpc/version.rb
index 67c6a5d5a1..01c8c5ac8f 100644
--- a/src/ruby/lib/grpc/version.rb
+++ b/src/ruby/lib/grpc/version.rb
@@ -29,5 +29,5 @@
# GRPC contains the General RPC module.
module GRPC
- VERSION = '0.14.0.dev'
+ VERSION = '0.15.0.dev'
end
diff --git a/src/ruby/tools/README.md b/src/ruby/tools/README.md
new file mode 100644
index 0000000000..e43f223c89
--- /dev/null
+++ b/src/ruby/tools/README.md
@@ -0,0 +1,12 @@
+# Ruby gRPC Tools
+
+This package distributes protoc and the Ruby gRPC protoc plugin for Windows, Linux, and Mac.
+
+Before this package is published, the following directories should be filled with the corresponding `protoc` and `grpc_ruby_plugin` executables.
+
+ - `bin/x86-linux`
+ - `bin/x86_64-linux`
+ - `bin/x86-macos`
+ - `bin/x86_64-macos`
+ - `bin/x86-windows`
+ - `bin/x86_64-windows`
diff --git a/src/ruby/tools/bin/protoc.rb b/src/ruby/tools/bin/protoc.rb
new file mode 100755
index 0000000000..3a2a5b8dc9
--- /dev/null
+++ b/src/ruby/tools/bin/protoc.rb
@@ -0,0 +1,41 @@
+#!/usr/bin/env ruby
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+require 'rbconfig'
+
+require_relative '../os_check'
+
+protoc_name = 'protoc' + RbConfig::CONFIG['EXEEXT']
+
+protoc_path = File.join(File.dirname(__FILE__),
+ RbConfig::CONFIG['host_cpu'] + '-' + OS.os_name,
+ protoc_name)
+
+exec([ protoc_path, protoc_path ], *ARGV)
diff --git a/src/ruby/tools/bin/protoc_grpc_ruby_plugin.rb b/src/ruby/tools/bin/protoc_grpc_ruby_plugin.rb
new file mode 100755
index 0000000000..4b296dedc7
--- /dev/null
+++ b/src/ruby/tools/bin/protoc_grpc_ruby_plugin.rb
@@ -0,0 +1,41 @@
+#!/usr/bin/env ruby
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+require 'rbconfig'
+
+require_relative '../os_check'
+
+plugin_name = 'grpc_ruby_plugin' + RbConfig::CONFIG['EXEEXT']
+
+plugin_path = File.join(File.dirname(__FILE__),
+ RbConfig::CONFIG['host_cpu'] + '-' + OS.os_name,
+ plugin_name)
+
+exec([ plugin_path, plugin_path ], *ARGV)
diff --git a/src/ruby/tools/grpc-tools.gemspec b/src/ruby/tools/grpc-tools.gemspec
new file mode 100644
index 0000000000..af904de4a9
--- /dev/null
+++ b/src/ruby/tools/grpc-tools.gemspec
@@ -0,0 +1,22 @@
+# -*- ruby -*-
+# encoding: utf-8
+require_relative 'version.rb'
+Gem::Specification.new do |s|
+ s.name = 'grpc-tools'
+ s.version = GRPC::Tools::VERSION
+ s.authors = ['grpc Authors']
+ s.email = 'grpc-io@googlegroups.com'
+ s.homepage = 'https://github.com/google/grpc/tree/master/src/ruby/tools'
+ s.summary = 'Development tools for Ruby gRPC'
+ s.description = 'protoc and the Ruby gRPC protoc plugin'
+ s.license = 'BSD-3-Clause'
+
+ s.files = %w( version.rb os_check.rb README.md )
+ s.files += Dir.glob('bin/**/*')
+
+ s.bindir = 'bin'
+
+ s.platform = Gem::Platform::RUBY
+
+ s.executables = %w( protoc.rb protoc_grpc_ruby_plugin.rb )
+end
diff --git a/src/ruby/tools/os_check.rb b/src/ruby/tools/os_check.rb
new file mode 100644
index 0000000000..2677306457
--- /dev/null
+++ b/src/ruby/tools/os_check.rb
@@ -0,0 +1,45 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+# This is based on http://stackoverflow.com/a/171011/159388 by Aaron Hinni
+
+require 'rbconfig'
+
+module OS
+ def OS.os_name
+ case RbConfig::CONFIG['host_os']
+ when /cygwin|mswin|mingw|bccwin|wince|emx/
+ 'windows'
+ when /darwin/
+ 'macos'
+ else
+ 'linux'
+ end
+ end
+end
diff --git a/src/ruby/tools/version.rb b/src/ruby/tools/version.rb
new file mode 100644
index 0000000000..dca7fd7e72
--- /dev/null
+++ b/src/ruby/tools/version.rb
@@ -0,0 +1,34 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+module GRPC
+ module Tools
+ VERSION = '0.15.0.dev'
+ end
+end