aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/compiler/csharp_generator.cc4
-rw-r--r--src/compiler/ruby_generator.cc4
-rw-r--r--src/core/ext/client_config/channel_connectivity.c3
-rw-r--r--src/core/ext/client_config/resolver_registry.c8
-rw-r--r--src/core/ext/client_config/subchannel.c6
-rw-r--r--src/core/ext/client_config/subchannel_call_holder.c1
-rw-r--r--src/core/ext/lb_policy/grpclb/load_balancer_api.c14
-rw-r--r--src/core/ext/lb_policy/grpclb/load_balancer_api.h10
-rw-r--r--src/core/ext/lb_policy/grpclb/proto/grpc/lb/v0/load_balancer.pb.h182
-rw-r--r--src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c (renamed from src/core/ext/lb_policy/grpclb/proto/grpc/lb/v0/load_balancer.pb.c)59
-rw-r--r--src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h178
-rw-r--r--src/core/ext/lb_policy/round_robin/round_robin.c6
-rw-r--r--src/core/ext/transport/chttp2/transport/chttp2_transport.c163
-rw-r--r--src/core/ext/transport/chttp2/transport/frame_rst_stream.c5
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_encoder.c6
-rw-r--r--src/core/ext/transport/chttp2/transport/hpack_table.c4
-rw-r--r--src/core/ext/transport/chttp2/transport/incoming_metadata.c1
-rw-r--r--src/core/ext/transport/chttp2/transport/incoming_metadata.h1
-rw-r--r--src/core/ext/transport/chttp2/transport/internal.h25
-rw-r--r--src/core/ext/transport/chttp2/transport/parsing.c62
-rw-r--r--src/core/lib/iomgr/tcp_posix.c4
-rw-r--r--src/core/lib/surface/completion_queue.c10
-rw-r--r--src/core/lib/surface/version.c2
-rw-r--r--src/core/lib/transport/metadata.h4
-rw-r--r--src/core/lib/transport/metadata_batch.c9
-rw-r--r--src/core/lib/transport/metadata_batch.h3
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs2
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs51
-rw-r--r--src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs3
-rw-r--r--src/csharp/Grpc.Core/GrpcEnvironment.cs34
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCall.cs15
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallBase.cs2
-rw-r--r--src/csharp/Grpc.Core/Internal/AsyncCallServer.cs12
-rw-r--r--src/csharp/Grpc.Core/Internal/CallSafeHandle.cs7
-rw-r--r--src/csharp/Grpc.Core/Internal/INativeCall.cs2
-rw-r--r--src/csharp/Grpc.Core/Internal/NativeMethods.cs18
-rw-r--r--src/csharp/Grpc.Core/Internal/ServerCallHandler.cs72
-rw-r--r--src/csharp/Grpc.Core/Server.cs24
-rw-r--r--src/csharp/Grpc.Core/VersionInfo.cs4
-rw-r--r--src/csharp/Grpc.Examples/MathGrpc.cs16
-rw-r--r--src/csharp/Grpc.HealthCheck/HealthGrpc.cs4
-rw-r--r--src/csharp/Grpc.IntegrationTesting/ClientRunners.cs23
-rw-r--r--src/csharp/Grpc.IntegrationTesting/InteropClient.cs4
-rw-r--r--src/csharp/Grpc.IntegrationTesting/MetricsGrpc.cs8
-rw-r--r--src/csharp/Grpc.IntegrationTesting/ServicesGrpc.cs24
-rw-r--r--src/csharp/Grpc.IntegrationTesting/TestGrpc.cs36
-rw-r--r--src/csharp/build_packages.bat2
-rw-r--r--src/csharp/ext/grpc_csharp_ext.c30
-rw-r--r--src/node/test/math/math_server.js2
-rwxr-xr-xsrc/node/tools/bin/protoc.js7
-rwxr-xr-xsrc/node/tools/bin/protoc_plugin.js8
-rw-r--r--src/node/tools/package.json6
-rw-r--r--src/objective-c/BoringSSL.podspec4
-rwxr-xr-xsrc/php/tests/interop/interop_client.php259
-rw-r--r--src/php/tests/interop/metrics_client.php49
-rw-r--r--src/php/tests/interop/stress_client.php116
-rw-r--r--src/proto/grpc/lb/v0/load_balancer.options6
-rw-r--r--src/proto/grpc/lb/v1/load_balancer.options6
-rw-r--r--src/proto/grpc/lb/v1/load_balancer.proto (renamed from src/proto/grpc/lb/v0/load_balancer.proto)11
-rw-r--r--src/python/grpcio/README.rst1
-rw-r--r--src/python/grpcio/grpc/__init__.py3
-rw-r--r--src/python/grpcio/grpc_core_dependencies.py2
-rw-r--r--src/python/grpcio/grpc_version.py2
-rw-r--r--src/python/grpcio/precompiled.py114
-rw-r--r--src/python/grpcio/tests/qps/benchmark_client.py60
-rw-r--r--src/python/grpcio/tests/qps/client_runner.py2
-rw-r--r--src/python/grpcio/tests/qps/worker_server.py5
-rw-r--r--src/python/grpcio/tests/stress/client.py5
-rw-r--r--src/python/grpcio/tests/unit/_cython/_channel_test.py5
-rw-r--r--src/ruby/ext/grpc/extconf.rb8
-rw-r--r--src/ruby/ext/grpc/rb_byte_buffer.c3
-rw-r--r--src/ruby/ext/grpc/rb_call.c3
-rw-r--r--src/ruby/ext/grpc/rb_call_credentials.c6
-rw-r--r--src/ruby/ext/grpc/rb_channel.c3
-rw-r--r--src/ruby/ext/grpc/rb_channel_args.c3
-rw-r--r--src/ruby/ext/grpc/rb_channel_credentials.c5
-rw-r--r--src/ruby/ext/grpc/rb_completion_queue.c2
-rw-r--r--src/ruby/ext/grpc/rb_event_thread.c2
-rw-r--r--src/ruby/ext/grpc/rb_grpc.c4
-rw-r--r--src/ruby/ext/grpc/rb_server.c3
-rw-r--r--src/ruby/ext/grpc/rb_server_credentials.c3
-rw-r--r--src/ruby/ext/grpc/rb_signal.c70
-rw-r--r--src/ruby/ext/grpc/rb_signal.h39
-rw-r--r--src/ruby/lib/grpc.rb3
-rw-r--r--src/ruby/lib/grpc/generic/active_call.rb6
-rw-r--r--src/ruby/lib/grpc/generic/rpc_server.rb63
-rw-r--r--src/ruby/lib/grpc/generic/service.rb16
-rw-r--r--src/ruby/lib/grpc/signals.rb69
-rw-r--r--src/ruby/lib/grpc/version.rb2
-rw-r--r--src/ruby/spec/generic/rpc_server_spec.rb23
-rw-r--r--src/ruby/spec/generic/service_spec.rb69
-rwxr-xr-xsrc/ruby/tools/bin/grpc_tools_ruby_protoc.rb (renamed from src/ruby/tools/bin/protoc.rb)0
-rwxr-xr-xsrc/ruby/tools/bin/grpc_tools_ruby_protoc_plugin.rb (renamed from src/ruby/tools/bin/protoc_grpc_ruby_plugin.rb)0
-rw-r--r--src/ruby/tools/grpc-tools.gemspec2
-rw-r--r--src/ruby/tools/version.rb2
95 files changed, 1351 insertions, 898 deletions
diff --git a/src/compiler/csharp_generator.cc b/src/compiler/csharp_generator.cc
index ac0fee1ec4..29c359c539 100644
--- a/src/compiler/csharp_generator.cc
+++ b/src/compiler/csharp_generator.cc
@@ -214,10 +214,10 @@ std::string GetMethodReturnTypeServer(const MethodDescriptor *method) {
switch (GetMethodType(method)) {
case METHODTYPE_NO_STREAMING:
case METHODTYPE_CLIENT_STREAMING:
- return "Task<" + GetClassName(method->output_type()) + ">";
+ return "global::System.Threading.Tasks.Task<" + GetClassName(method->output_type()) + ">";
case METHODTYPE_SERVER_STREAMING:
case METHODTYPE_BIDI_STREAMING:
- return "Task";
+ return "global::System.Threading.Tasks.Task";
}
GOOGLE_LOG(FATAL)<< "Can't get here.";
return "";
diff --git a/src/compiler/ruby_generator.cc b/src/compiler/ruby_generator.cc
index 5ac56ad289..936a186beb 100644
--- a/src/compiler/ruby_generator.cc
+++ b/src/compiler/ruby_generator.cc
@@ -98,8 +98,8 @@ void PrintService(const ServiceDescriptor *service, const grpc::string &package,
out->Print("self.marshal_class_method = :encode\n");
out->Print("self.unmarshal_class_method = :decode\n");
std::map<grpc::string, grpc::string> pkg_vars =
- ListToDict({"service.name", service->name(), "pkg.name", package, });
- out->Print(pkg_vars, "self.service_name = '$pkg.name$.$service.name$'\n");
+ ListToDict({"service_full_name", service->full_name()});
+ out->Print(pkg_vars, "self.service_name = '$service_full_name$'\n");
out->Print("\n");
for (int i = 0; i < service->method_count(); ++i) {
PrintMethod(service->method(i), package, out);
diff --git a/src/core/ext/client_config/channel_connectivity.c b/src/core/ext/client_config/channel_connectivity.c
index bef43d84ca..e20670e51f 100644
--- a/src/core/ext/client_config/channel_connectivity.c
+++ b/src/core/ext/client_config/channel_connectivity.c
@@ -132,7 +132,8 @@ static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w,
gpr_mu_lock(&w->mu);
const char *msg = grpc_error_string(error);
- gpr_log(GPR_DEBUG, "partly_done: d2c=%d phs=%d err=%s", due_to_completion, w->phase, msg);
+ gpr_log(GPR_DEBUG, "partly_done: d2c=%d phs=%d err=%s", due_to_completion,
+ w->phase, msg);
grpc_error_free_string(msg);
if (due_to_completion) {
diff --git a/src/core/ext/client_config/resolver_registry.c b/src/core/ext/client_config/resolver_registry.c
index 07f29bcb27..e7a4abd568 100644
--- a/src/core/ext/client_config/resolver_registry.c
+++ b/src/core/ext/client_config/resolver_registry.c
@@ -47,7 +47,6 @@ static int g_number_of_resolvers = 0;
static char *g_default_resolver_prefix;
void grpc_resolver_registry_init(const char *default_resolver_prefix) {
- g_number_of_resolvers = 0;
g_default_resolver_prefix = gpr_strdup(default_resolver_prefix);
}
@@ -57,6 +56,13 @@ void grpc_resolver_registry_shutdown(void) {
grpc_resolver_factory_unref(g_all_of_the_resolvers[i]);
}
gpr_free(g_default_resolver_prefix);
+ // FIXME(ctiller): this should live in grpc_resolver_registry_init,
+ // however that would have the client_config plugin call this AFTER we start
+ // registering resolvers from third party plugins, and so they'd never show
+ // up.
+ // We likely need some kind of dependency system for plugins.... what form
+ // that takes is TBD.
+ g_number_of_resolvers = 0;
}
void grpc_register_resolver_type(grpc_resolver_factory *factory) {
diff --git a/src/core/ext/client_config/subchannel.c b/src/core/ext/client_config/subchannel.c
index 3bfba47ea9..4ddacfd7c7 100644
--- a/src/core/ext/client_config/subchannel.c
+++ b/src/core/ext/client_config/subchannel.c
@@ -639,9 +639,11 @@ static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
exec_ctx, &c->state_tracker, GRPC_CHANNEL_TRANSIENT_FAILURE,
GRPC_ERROR_CREATE_REFERENCING("Connect Failed", &error, 1),
"connect_failed");
- gpr_timespec time_til_next = gpr_time_sub(c->next_attempt, gpr_now(c->next_attempt.clock_type));
+ gpr_timespec time_til_next =
+ gpr_time_sub(c->next_attempt, gpr_now(c->next_attempt.clock_type));
const char *errmsg = grpc_error_string(error);
- gpr_log(GPR_INFO, "Connect failed, retry in %d.%09d seconds: %s", time_til_next.tv_sec, time_til_next.tv_nsec, errmsg);
+ gpr_log(GPR_INFO, "Connect failed, retry in %d.%09d seconds: %s",
+ time_til_next.tv_sec, time_til_next.tv_nsec, errmsg);
grpc_error_free_string(errmsg);
grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now);
}
diff --git a/src/core/ext/client_config/subchannel_call_holder.c b/src/core/ext/client_config/subchannel_call_holder.c
index b65127b627..07b7813482 100644
--- a/src/core/ext/client_config/subchannel_call_holder.c
+++ b/src/core/ext/client_config/subchannel_call_holder.c
@@ -181,6 +181,7 @@ static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg,
GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL);
holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
if (holder->connected_subchannel == NULL) {
+ gpr_atm_no_barrier_store(&holder->subchannel_call, 1);
fail_locked(exec_ctx, holder,
GRPC_ERROR_CREATE_REFERENCING("Failed to create subchannel",
&error, 1));
diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.c b/src/core/ext/lb_policy/grpclb/load_balancer_api.c
index 459d6d9954..59b89997dd 100644
--- a/src/core/ext/lb_policy/grpclb/load_balancer_api.c
+++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.c
@@ -50,7 +50,7 @@ static bool decode_serverlist(pb_istream_t *stream, const pb_field_t *field,
decode_serverlist_arg *dec_arg = *arg;
if (dec_arg->first_pass != 0) { /* first pass */
grpc_grpclb_server server;
- if (!pb_decode(stream, grpc_lb_v0_Server_fields, &server)) {
+ if (!pb_decode(stream, grpc_lb_v1_Server_fields, &server)) {
return false;
}
dec_arg->num_servers++;
@@ -61,7 +61,7 @@ static bool decode_serverlist(pb_istream_t *stream, const pb_field_t *field,
dec_arg->servers =
gpr_malloc(sizeof(grpc_grpclb_server *) * dec_arg->num_servers);
}
- if (!pb_decode(stream, grpc_lb_v0_Server_fields, server)) {
+ if (!pb_decode(stream, grpc_lb_v1_Server_fields, server)) {
return false;
}
dec_arg->servers[dec_arg->i++] = server;
@@ -87,13 +87,13 @@ gpr_slice grpc_grpclb_request_encode(const grpc_grpclb_request *request) {
pb_ostream_t outputstream;
gpr_slice slice;
memset(&sizestream, 0, sizeof(pb_ostream_t));
- pb_encode(&sizestream, grpc_lb_v0_LoadBalanceRequest_fields, request);
+ pb_encode(&sizestream, grpc_lb_v1_LoadBalanceRequest_fields, request);
encoded_length = sizestream.bytes_written;
slice = gpr_slice_malloc(encoded_length);
outputstream =
pb_ostream_from_buffer(GPR_SLICE_START_PTR(slice), encoded_length);
- GPR_ASSERT(pb_encode(&outputstream, grpc_lb_v0_LoadBalanceRequest_fields,
+ GPR_ASSERT(pb_encode(&outputstream, grpc_lb_v1_LoadBalanceRequest_fields,
request) != 0);
return slice;
}
@@ -109,7 +109,7 @@ grpc_grpclb_response *grpc_grpclb_response_parse(gpr_slice encoded_response) {
GPR_SLICE_LENGTH(encoded_response));
grpc_grpclb_response *res = gpr_malloc(sizeof(grpc_grpclb_response));
memset(res, 0, sizeof(*res));
- status = pb_decode(&stream, grpc_lb_v0_LoadBalanceResponse_fields, res);
+ status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, res);
if (!status) {
grpc_grpclb_response_destroy(res);
return NULL;
@@ -132,7 +132,7 @@ grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist(
res->server_list.servers.funcs.decode = decode_serverlist;
res->server_list.servers.arg = &arg;
arg.first_pass = 1;
- status = pb_decode(&stream, grpc_lb_v0_LoadBalanceResponse_fields, res);
+ status = pb_decode(&stream, grpc_lb_v1_LoadBalanceResponse_fields, res);
if (!status) {
grpc_grpclb_response_destroy(res);
return NULL;
@@ -140,7 +140,7 @@ grpc_grpclb_serverlist *grpc_grpclb_response_parse_serverlist(
arg.first_pass = 0;
status =
- pb_decode(&stream_at_start, grpc_lb_v0_LoadBalanceResponse_fields, res);
+ pb_decode(&stream_at_start, grpc_lb_v1_LoadBalanceResponse_fields, res);
if (!status) {
grpc_grpclb_response_destroy(res);
return NULL;
diff --git a/src/core/ext/lb_policy/grpclb/load_balancer_api.h b/src/core/ext/lb_policy/grpclb/load_balancer_api.h
index 968f7d278a..71b5616d0c 100644
--- a/src/core/ext/lb_policy/grpclb/load_balancer_api.h
+++ b/src/core/ext/lb_policy/grpclb/load_balancer_api.h
@@ -37,7 +37,7 @@
#include <grpc/support/slice_buffer.h>
#include "src/core/ext/client_config/lb_policy_factory.h"
-#include "src/core/ext/lb_policy/grpclb/proto/grpc/lb/v0/load_balancer.pb.h"
+#include "src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h"
#ifdef __cplusplus
extern "C" {
@@ -45,10 +45,10 @@ extern "C" {
#define GRPC_GRPCLB_SERVICE_NAME_MAX_LENGTH 128
-typedef grpc_lb_v0_LoadBalanceRequest grpc_grpclb_request;
-typedef grpc_lb_v0_LoadBalanceResponse grpc_grpclb_response;
-typedef grpc_lb_v0_Server grpc_grpclb_server;
-typedef grpc_lb_v0_Duration grpc_grpclb_duration;
+typedef grpc_lb_v1_LoadBalanceRequest grpc_grpclb_request;
+typedef grpc_lb_v1_LoadBalanceResponse grpc_grpclb_response;
+typedef grpc_lb_v1_Server grpc_grpclb_server;
+typedef grpc_lb_v1_Duration grpc_grpclb_duration;
typedef struct grpc_grpclb_serverlist {
grpc_grpclb_server **servers;
size_t num_servers;
diff --git a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v0/load_balancer.pb.h b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v0/load_balancer.pb.h
deleted file mode 100644
index 3599f881bb..0000000000
--- a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v0/load_balancer.pb.h
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- *
- * Copyright 2016, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-/* Automatically generated nanopb header */
-/* Generated by nanopb-0.3.5-dev */
-
-#ifndef PB_LOAD_BALANCER_PB_H_INCLUDED
-#define PB_LOAD_BALANCER_PB_H_INCLUDED
-#include "third_party/nanopb/pb.h"
-#if PB_PROTO_HEADER_VERSION != 30
-#error Regenerate this file with the current version of nanopb generator.
-#endif
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-/* Struct definitions */
-typedef struct _grpc_lb_v0_ClientStats {
- bool has_total_requests;
- int64_t total_requests;
- bool has_client_rpc_errors;
- int64_t client_rpc_errors;
- bool has_dropped_requests;
- int64_t dropped_requests;
-} grpc_lb_v0_ClientStats;
-
-typedef struct _grpc_lb_v0_Duration {
- bool has_seconds;
- int64_t seconds;
- bool has_nanos;
- int32_t nanos;
-} grpc_lb_v0_Duration;
-
-typedef struct _grpc_lb_v0_InitialLoadBalanceRequest {
- bool has_name;
- char name[128];
-} grpc_lb_v0_InitialLoadBalanceRequest;
-
-typedef PB_BYTES_ARRAY_T(64) grpc_lb_v0_Server_load_balance_token_t;
-typedef struct _grpc_lb_v0_Server {
- bool has_ip_address;
- char ip_address[46];
- bool has_port;
- int32_t port;
- bool has_load_balance_token;
- grpc_lb_v0_Server_load_balance_token_t load_balance_token;
- bool has_drop_request;
- bool drop_request;
-} grpc_lb_v0_Server;
-
-typedef struct _grpc_lb_v0_InitialLoadBalanceResponse {
- bool has_client_config;
- char client_config[64];
- bool has_load_balancer_delegate;
- char load_balancer_delegate[64];
- bool has_client_stats_report_interval;
- grpc_lb_v0_Duration client_stats_report_interval;
-} grpc_lb_v0_InitialLoadBalanceResponse;
-
-typedef struct _grpc_lb_v0_LoadBalanceRequest {
- bool has_initial_request;
- grpc_lb_v0_InitialLoadBalanceRequest initial_request;
- bool has_client_stats;
- grpc_lb_v0_ClientStats client_stats;
-} grpc_lb_v0_LoadBalanceRequest;
-
-typedef struct _grpc_lb_v0_ServerList {
- pb_callback_t servers;
- bool has_expiration_interval;
- grpc_lb_v0_Duration expiration_interval;
-} grpc_lb_v0_ServerList;
-
-typedef struct _grpc_lb_v0_LoadBalanceResponse {
- bool has_initial_response;
- grpc_lb_v0_InitialLoadBalanceResponse initial_response;
- bool has_server_list;
- grpc_lb_v0_ServerList server_list;
-} grpc_lb_v0_LoadBalanceResponse;
-
-/* Default values for struct fields */
-
-/* Initializer values for message structs */
-#define grpc_lb_v0_Duration_init_default {false, 0, false, 0}
-#define grpc_lb_v0_LoadBalanceRequest_init_default {false, grpc_lb_v0_InitialLoadBalanceRequest_init_default, false, grpc_lb_v0_ClientStats_init_default}
-#define grpc_lb_v0_InitialLoadBalanceRequest_init_default {false, ""}
-#define grpc_lb_v0_ClientStats_init_default {false, 0, false, 0, false, 0}
-#define grpc_lb_v0_LoadBalanceResponse_init_default {false, grpc_lb_v0_InitialLoadBalanceResponse_init_default, false, grpc_lb_v0_ServerList_init_default}
-#define grpc_lb_v0_InitialLoadBalanceResponse_init_default {false, "", false, "", false, grpc_lb_v0_Duration_init_default}
-#define grpc_lb_v0_ServerList_init_default {{{NULL}, NULL}, false, grpc_lb_v0_Duration_init_default}
-#define grpc_lb_v0_Server_init_default {false, "", false, 0, false, {0, {0}}, false, 0}
-#define grpc_lb_v0_Duration_init_zero {false, 0, false, 0}
-#define grpc_lb_v0_LoadBalanceRequest_init_zero {false, grpc_lb_v0_InitialLoadBalanceRequest_init_zero, false, grpc_lb_v0_ClientStats_init_zero}
-#define grpc_lb_v0_InitialLoadBalanceRequest_init_zero {false, ""}
-#define grpc_lb_v0_ClientStats_init_zero {false, 0, false, 0, false, 0}
-#define grpc_lb_v0_LoadBalanceResponse_init_zero {false, grpc_lb_v0_InitialLoadBalanceResponse_init_zero, false, grpc_lb_v0_ServerList_init_zero}
-#define grpc_lb_v0_InitialLoadBalanceResponse_init_zero {false, "", false, "", false, grpc_lb_v0_Duration_init_zero}
-#define grpc_lb_v0_ServerList_init_zero {{{NULL}, NULL}, false, grpc_lb_v0_Duration_init_zero}
-#define grpc_lb_v0_Server_init_zero {false, "", false, 0, false, {0, {0}}, false, 0}
-
-/* Field tags (for use in manual encoding/decoding) */
-#define grpc_lb_v0_ClientStats_total_requests_tag 1
-#define grpc_lb_v0_ClientStats_client_rpc_errors_tag 2
-#define grpc_lb_v0_ClientStats_dropped_requests_tag 3
-#define grpc_lb_v0_Duration_seconds_tag 1
-#define grpc_lb_v0_Duration_nanos_tag 2
-#define grpc_lb_v0_InitialLoadBalanceRequest_name_tag 1
-#define grpc_lb_v0_Server_ip_address_tag 1
-#define grpc_lb_v0_Server_port_tag 2
-#define grpc_lb_v0_Server_load_balance_token_tag 3
-#define grpc_lb_v0_Server_drop_request_tag 4
-#define grpc_lb_v0_InitialLoadBalanceResponse_client_config_tag 1
-#define grpc_lb_v0_InitialLoadBalanceResponse_load_balancer_delegate_tag 2
-#define grpc_lb_v0_InitialLoadBalanceResponse_client_stats_report_interval_tag 3
-#define grpc_lb_v0_LoadBalanceRequest_initial_request_tag 1
-#define grpc_lb_v0_LoadBalanceRequest_client_stats_tag 2
-#define grpc_lb_v0_ServerList_servers_tag 1
-#define grpc_lb_v0_ServerList_expiration_interval_tag 3
-#define grpc_lb_v0_LoadBalanceResponse_initial_response_tag 1
-#define grpc_lb_v0_LoadBalanceResponse_server_list_tag 2
-
-/* Struct field encoding specification for nanopb */
-extern const pb_field_t grpc_lb_v0_Duration_fields[3];
-extern const pb_field_t grpc_lb_v0_LoadBalanceRequest_fields[3];
-extern const pb_field_t grpc_lb_v0_InitialLoadBalanceRequest_fields[2];
-extern const pb_field_t grpc_lb_v0_ClientStats_fields[4];
-extern const pb_field_t grpc_lb_v0_LoadBalanceResponse_fields[3];
-extern const pb_field_t grpc_lb_v0_InitialLoadBalanceResponse_fields[4];
-extern const pb_field_t grpc_lb_v0_ServerList_fields[3];
-extern const pb_field_t grpc_lb_v0_Server_fields[5];
-
-/* Maximum encoded size of messages (where known) */
-#define grpc_lb_v0_Duration_size 22
-#define grpc_lb_v0_LoadBalanceRequest_size 169
-#define grpc_lb_v0_InitialLoadBalanceRequest_size 131
-#define grpc_lb_v0_ClientStats_size 33
-#define grpc_lb_v0_LoadBalanceResponse_size (165 + grpc_lb_v0_ServerList_size)
-#define grpc_lb_v0_InitialLoadBalanceResponse_size 156
-#define grpc_lb_v0_Server_size 127
-
-/* Message IDs (where set with "msgid" option) */
-#ifdef PB_MSGID
-
-#define LOAD_BALANCER_MESSAGES \
-
-
-#endif
-
-#ifdef __cplusplus
-} /* extern "C" */
-#endif
-
-#endif
diff --git a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v0/load_balancer.pb.c b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
index 9719673181..52e11c40bb 100644
--- a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v0/load_balancer.pb.c
+++ b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c
@@ -33,7 +33,7 @@
/* Automatically generated nanopb constant definitions */
/* Generated by nanopb-0.3.5-dev */
-#include "src/core/ext/lb_policy/grpclb/proto/grpc/lb/v0/load_balancer.pb.h"
+#include "src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h"
#if PB_PROTO_HEADER_VERSION != 30
#error Regenerate this file with the current version of nanopb generator.
@@ -41,54 +41,53 @@
-const pb_field_t grpc_lb_v0_Duration_fields[3] = {
- PB_FIELD( 1, INT64 , OPTIONAL, STATIC , FIRST, grpc_lb_v0_Duration, seconds, seconds, 0),
- PB_FIELD( 2, INT32 , OPTIONAL, STATIC , OTHER, grpc_lb_v0_Duration, nanos, seconds, 0),
+const pb_field_t grpc_lb_v1_Duration_fields[3] = {
+ PB_FIELD( 1, INT64 , OPTIONAL, STATIC , FIRST, grpc_lb_v1_Duration, seconds, seconds, 0),
+ PB_FIELD( 2, INT32 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Duration, nanos, seconds, 0),
PB_LAST_FIELD
};
-const pb_field_t grpc_lb_v0_LoadBalanceRequest_fields[3] = {
- PB_FIELD( 1, MESSAGE , OPTIONAL, STATIC , FIRST, grpc_lb_v0_LoadBalanceRequest, initial_request, initial_request, &grpc_lb_v0_InitialLoadBalanceRequest_fields),
- PB_FIELD( 2, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v0_LoadBalanceRequest, client_stats, initial_request, &grpc_lb_v0_ClientStats_fields),
+const pb_field_t grpc_lb_v1_LoadBalanceRequest_fields[3] = {
+ PB_FIELD( 1, MESSAGE , OPTIONAL, STATIC , FIRST, grpc_lb_v1_LoadBalanceRequest, initial_request, initial_request, &grpc_lb_v1_InitialLoadBalanceRequest_fields),
+ PB_FIELD( 2, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v1_LoadBalanceRequest, client_stats, initial_request, &grpc_lb_v1_ClientStats_fields),
PB_LAST_FIELD
};
-const pb_field_t grpc_lb_v0_InitialLoadBalanceRequest_fields[2] = {
- PB_FIELD( 1, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v0_InitialLoadBalanceRequest, name, name, 0),
+const pb_field_t grpc_lb_v1_InitialLoadBalanceRequest_fields[2] = {
+ PB_FIELD( 1, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v1_InitialLoadBalanceRequest, name, name, 0),
PB_LAST_FIELD
};
-const pb_field_t grpc_lb_v0_ClientStats_fields[4] = {
- PB_FIELD( 1, INT64 , OPTIONAL, STATIC , FIRST, grpc_lb_v0_ClientStats, total_requests, total_requests, 0),
- PB_FIELD( 2, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v0_ClientStats, client_rpc_errors, total_requests, 0),
- PB_FIELD( 3, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v0_ClientStats, dropped_requests, client_rpc_errors, 0),
+const pb_field_t grpc_lb_v1_ClientStats_fields[4] = {
+ PB_FIELD( 1, INT64 , OPTIONAL, STATIC , FIRST, grpc_lb_v1_ClientStats, total_requests, total_requests, 0),
+ PB_FIELD( 2, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, client_rpc_errors, total_requests, 0),
+ PB_FIELD( 3, INT64 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ClientStats, dropped_requests, client_rpc_errors, 0),
PB_LAST_FIELD
};
-const pb_field_t grpc_lb_v0_LoadBalanceResponse_fields[3] = {
- PB_FIELD( 1, MESSAGE , OPTIONAL, STATIC , FIRST, grpc_lb_v0_LoadBalanceResponse, initial_response, initial_response, &grpc_lb_v0_InitialLoadBalanceResponse_fields),
- PB_FIELD( 2, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v0_LoadBalanceResponse, server_list, initial_response, &grpc_lb_v0_ServerList_fields),
+const pb_field_t grpc_lb_v1_LoadBalanceResponse_fields[3] = {
+ PB_FIELD( 1, MESSAGE , OPTIONAL, STATIC , FIRST, grpc_lb_v1_LoadBalanceResponse, initial_response, initial_response, &grpc_lb_v1_InitialLoadBalanceResponse_fields),
+ PB_FIELD( 2, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v1_LoadBalanceResponse, server_list, initial_response, &grpc_lb_v1_ServerList_fields),
PB_LAST_FIELD
};
-const pb_field_t grpc_lb_v0_InitialLoadBalanceResponse_fields[4] = {
- PB_FIELD( 1, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v0_InitialLoadBalanceResponse, client_config, client_config, 0),
- PB_FIELD( 2, STRING , OPTIONAL, STATIC , OTHER, grpc_lb_v0_InitialLoadBalanceResponse, load_balancer_delegate, client_config, 0),
- PB_FIELD( 3, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v0_InitialLoadBalanceResponse, client_stats_report_interval, load_balancer_delegate, &grpc_lb_v0_Duration_fields),
+const pb_field_t grpc_lb_v1_InitialLoadBalanceResponse_fields[3] = {
+ PB_FIELD( 2, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v1_InitialLoadBalanceResponse, load_balancer_delegate, load_balancer_delegate, 0),
+ PB_FIELD( 3, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval, load_balancer_delegate, &grpc_lb_v1_Duration_fields),
PB_LAST_FIELD
};
-const pb_field_t grpc_lb_v0_ServerList_fields[3] = {
- PB_FIELD( 1, MESSAGE , REPEATED, CALLBACK, FIRST, grpc_lb_v0_ServerList, servers, servers, &grpc_lb_v0_Server_fields),
- PB_FIELD( 3, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v0_ServerList, expiration_interval, servers, &grpc_lb_v0_Duration_fields),
+const pb_field_t grpc_lb_v1_ServerList_fields[3] = {
+ PB_FIELD( 1, MESSAGE , REPEATED, CALLBACK, FIRST, grpc_lb_v1_ServerList, servers, servers, &grpc_lb_v1_Server_fields),
+ PB_FIELD( 3, MESSAGE , OPTIONAL, STATIC , OTHER, grpc_lb_v1_ServerList, expiration_interval, servers, &grpc_lb_v1_Duration_fields),
PB_LAST_FIELD
};
-const pb_field_t grpc_lb_v0_Server_fields[5] = {
- PB_FIELD( 1, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v0_Server, ip_address, ip_address, 0),
- PB_FIELD( 2, INT32 , OPTIONAL, STATIC , OTHER, grpc_lb_v0_Server, port, ip_address, 0),
- PB_FIELD( 3, BYTES , OPTIONAL, STATIC , OTHER, grpc_lb_v0_Server, load_balance_token, port, 0),
- PB_FIELD( 4, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v0_Server, drop_request, load_balance_token, 0),
+const pb_field_t grpc_lb_v1_Server_fields[5] = {
+ PB_FIELD( 1, STRING , OPTIONAL, STATIC , FIRST, grpc_lb_v1_Server, ip_address, ip_address, 0),
+ PB_FIELD( 2, INT32 , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, port, ip_address, 0),
+ PB_FIELD( 3, STRING , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, load_balance_token, port, 0),
+ PB_FIELD( 4, BOOL , OPTIONAL, STATIC , OTHER, grpc_lb_v1_Server, drop_request, load_balance_token, 0),
PB_LAST_FIELD
};
@@ -102,7 +101,7 @@ const pb_field_t grpc_lb_v0_Server_fields[5] = {
* numbers or field sizes that are larger than what can fit in 8 or 16 bit
* field descriptors.
*/
-PB_STATIC_ASSERT((pb_membersize(grpc_lb_v0_LoadBalanceRequest, initial_request) < 65536 && pb_membersize(grpc_lb_v0_LoadBalanceRequest, client_stats) < 65536 && pb_membersize(grpc_lb_v0_LoadBalanceResponse, initial_response) < 65536 && pb_membersize(grpc_lb_v0_LoadBalanceResponse, server_list) < 65536 && pb_membersize(grpc_lb_v0_InitialLoadBalanceResponse, client_stats_report_interval) < 65536 && pb_membersize(grpc_lb_v0_ServerList, servers) < 65536 && pb_membersize(grpc_lb_v0_ServerList, expiration_interval) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_grpc_lb_v0_Duration_grpc_lb_v0_LoadBalanceRequest_grpc_lb_v0_InitialLoadBalanceRequest_grpc_lb_v0_ClientStats_grpc_lb_v0_LoadBalanceResponse_grpc_lb_v0_InitialLoadBalanceResponse_grpc_lb_v0_ServerList_grpc_lb_v0_Server)
+PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 65536 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 65536 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 65536 && pb_membersize(grpc_lb_v1_ServerList, servers) < 65536 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 65536), YOU_MUST_DEFINE_PB_FIELD_32BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
#endif
#if !defined(PB_FIELD_16BIT) && !defined(PB_FIELD_32BIT)
@@ -113,7 +112,7 @@ PB_STATIC_ASSERT((pb_membersize(grpc_lb_v0_LoadBalanceRequest, initial_request)
* numbers or field sizes that are larger than what can fit in the default
* 8 bit descriptors.
*/
-PB_STATIC_ASSERT((pb_membersize(grpc_lb_v0_LoadBalanceRequest, initial_request) < 256 && pb_membersize(grpc_lb_v0_LoadBalanceRequest, client_stats) < 256 && pb_membersize(grpc_lb_v0_LoadBalanceResponse, initial_response) < 256 && pb_membersize(grpc_lb_v0_LoadBalanceResponse, server_list) < 256 && pb_membersize(grpc_lb_v0_InitialLoadBalanceResponse, client_stats_report_interval) < 256 && pb_membersize(grpc_lb_v0_ServerList, servers) < 256 && pb_membersize(grpc_lb_v0_ServerList, expiration_interval) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_grpc_lb_v0_Duration_grpc_lb_v0_LoadBalanceRequest_grpc_lb_v0_InitialLoadBalanceRequest_grpc_lb_v0_ClientStats_grpc_lb_v0_LoadBalanceResponse_grpc_lb_v0_InitialLoadBalanceResponse_grpc_lb_v0_ServerList_grpc_lb_v0_Server)
+PB_STATIC_ASSERT((pb_membersize(grpc_lb_v1_LoadBalanceRequest, initial_request) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceRequest, client_stats) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, initial_response) < 256 && pb_membersize(grpc_lb_v1_LoadBalanceResponse, server_list) < 256 && pb_membersize(grpc_lb_v1_InitialLoadBalanceResponse, client_stats_report_interval) < 256 && pb_membersize(grpc_lb_v1_ServerList, servers) < 256 && pb_membersize(grpc_lb_v1_ServerList, expiration_interval) < 256), YOU_MUST_DEFINE_PB_FIELD_16BIT_FOR_MESSAGES_grpc_lb_v1_Duration_grpc_lb_v1_LoadBalanceRequest_grpc_lb_v1_InitialLoadBalanceRequest_grpc_lb_v1_ClientStats_grpc_lb_v1_LoadBalanceResponse_grpc_lb_v1_InitialLoadBalanceResponse_grpc_lb_v1_ServerList_grpc_lb_v1_Server)
#endif
diff --git a/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
new file mode 100644
index 0000000000..d5dc39ab94
--- /dev/null
+++ b/src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.h
@@ -0,0 +1,178 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+/* Automatically generated nanopb header */
+/* Generated by nanopb-0.3.5-dev */
+
+#ifndef PB_LOAD_BALANCER_PB_H_INCLUDED
+#define PB_LOAD_BALANCER_PB_H_INCLUDED
+#include "third_party/nanopb/pb.h"
+#if PB_PROTO_HEADER_VERSION != 30
+#error Regenerate this file with the current version of nanopb generator.
+#endif
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* Struct definitions */
+typedef struct _grpc_lb_v1_ClientStats {
+ bool has_total_requests;
+ int64_t total_requests;
+ bool has_client_rpc_errors;
+ int64_t client_rpc_errors;
+ bool has_dropped_requests;
+ int64_t dropped_requests;
+} grpc_lb_v1_ClientStats;
+
+typedef struct _grpc_lb_v1_Duration {
+ bool has_seconds;
+ int64_t seconds;
+ bool has_nanos;
+ int32_t nanos;
+} grpc_lb_v1_Duration;
+
+typedef struct _grpc_lb_v1_InitialLoadBalanceRequest {
+ bool has_name;
+ char name[128];
+} grpc_lb_v1_InitialLoadBalanceRequest;
+
+typedef struct _grpc_lb_v1_Server {
+ bool has_ip_address;
+ char ip_address[46];
+ bool has_port;
+ int32_t port;
+ bool has_load_balance_token;
+ char load_balance_token[64];
+ bool has_drop_request;
+ bool drop_request;
+} grpc_lb_v1_Server;
+
+typedef struct _grpc_lb_v1_InitialLoadBalanceResponse {
+ bool has_load_balancer_delegate;
+ char load_balancer_delegate[64];
+ bool has_client_stats_report_interval;
+ grpc_lb_v1_Duration client_stats_report_interval;
+} grpc_lb_v1_InitialLoadBalanceResponse;
+
+typedef struct _grpc_lb_v1_LoadBalanceRequest {
+ bool has_initial_request;
+ grpc_lb_v1_InitialLoadBalanceRequest initial_request;
+ bool has_client_stats;
+ grpc_lb_v1_ClientStats client_stats;
+} grpc_lb_v1_LoadBalanceRequest;
+
+typedef struct _grpc_lb_v1_ServerList {
+ pb_callback_t servers;
+ bool has_expiration_interval;
+ grpc_lb_v1_Duration expiration_interval;
+} grpc_lb_v1_ServerList;
+
+typedef struct _grpc_lb_v1_LoadBalanceResponse {
+ bool has_initial_response;
+ grpc_lb_v1_InitialLoadBalanceResponse initial_response;
+ bool has_server_list;
+ grpc_lb_v1_ServerList server_list;
+} grpc_lb_v1_LoadBalanceResponse;
+
+/* Default values for struct fields */
+
+/* Initializer values for message structs */
+#define grpc_lb_v1_Duration_init_default {false, 0, false, 0}
+#define grpc_lb_v1_LoadBalanceRequest_init_default {false, grpc_lb_v1_InitialLoadBalanceRequest_init_default, false, grpc_lb_v1_ClientStats_init_default}
+#define grpc_lb_v1_InitialLoadBalanceRequest_init_default {false, ""}
+#define grpc_lb_v1_ClientStats_init_default {false, 0, false, 0, false, 0}
+#define grpc_lb_v1_LoadBalanceResponse_init_default {false, grpc_lb_v1_InitialLoadBalanceResponse_init_default, false, grpc_lb_v1_ServerList_init_default}
+#define grpc_lb_v1_InitialLoadBalanceResponse_init_default {false, "", false, grpc_lb_v1_Duration_init_default}
+#define grpc_lb_v1_ServerList_init_default {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_default}
+#define grpc_lb_v1_Server_init_default {false, "", false, 0, false, "", false, 0}
+#define grpc_lb_v1_Duration_init_zero {false, 0, false, 0}
+#define grpc_lb_v1_LoadBalanceRequest_init_zero {false, grpc_lb_v1_InitialLoadBalanceRequest_init_zero, false, grpc_lb_v1_ClientStats_init_zero}
+#define grpc_lb_v1_InitialLoadBalanceRequest_init_zero {false, ""}
+#define grpc_lb_v1_ClientStats_init_zero {false, 0, false, 0, false, 0}
+#define grpc_lb_v1_LoadBalanceResponse_init_zero {false, grpc_lb_v1_InitialLoadBalanceResponse_init_zero, false, grpc_lb_v1_ServerList_init_zero}
+#define grpc_lb_v1_InitialLoadBalanceResponse_init_zero {false, "", false, grpc_lb_v1_Duration_init_zero}
+#define grpc_lb_v1_ServerList_init_zero {{{NULL}, NULL}, false, grpc_lb_v1_Duration_init_zero}
+#define grpc_lb_v1_Server_init_zero {false, "", false, 0, false, "", false, 0}
+
+/* Field tags (for use in manual encoding/decoding) */
+#define grpc_lb_v1_ClientStats_total_requests_tag 1
+#define grpc_lb_v1_ClientStats_client_rpc_errors_tag 2
+#define grpc_lb_v1_ClientStats_dropped_requests_tag 3
+#define grpc_lb_v1_Duration_seconds_tag 1
+#define grpc_lb_v1_Duration_nanos_tag 2
+#define grpc_lb_v1_InitialLoadBalanceRequest_name_tag 1
+#define grpc_lb_v1_Server_ip_address_tag 1
+#define grpc_lb_v1_Server_port_tag 2
+#define grpc_lb_v1_Server_load_balance_token_tag 3
+#define grpc_lb_v1_Server_drop_request_tag 4
+#define grpc_lb_v1_InitialLoadBalanceResponse_load_balancer_delegate_tag 2
+#define grpc_lb_v1_InitialLoadBalanceResponse_client_stats_report_interval_tag 3
+#define grpc_lb_v1_LoadBalanceRequest_initial_request_tag 1
+#define grpc_lb_v1_LoadBalanceRequest_client_stats_tag 2
+#define grpc_lb_v1_ServerList_servers_tag 1
+#define grpc_lb_v1_ServerList_expiration_interval_tag 3
+#define grpc_lb_v1_LoadBalanceResponse_initial_response_tag 1
+#define grpc_lb_v1_LoadBalanceResponse_server_list_tag 2
+
+/* Struct field encoding specification for nanopb */
+extern const pb_field_t grpc_lb_v1_Duration_fields[3];
+extern const pb_field_t grpc_lb_v1_LoadBalanceRequest_fields[3];
+extern const pb_field_t grpc_lb_v1_InitialLoadBalanceRequest_fields[2];
+extern const pb_field_t grpc_lb_v1_ClientStats_fields[4];
+extern const pb_field_t grpc_lb_v1_LoadBalanceResponse_fields[3];
+extern const pb_field_t grpc_lb_v1_InitialLoadBalanceResponse_fields[3];
+extern const pb_field_t grpc_lb_v1_ServerList_fields[3];
+extern const pb_field_t grpc_lb_v1_Server_fields[5];
+
+/* Maximum encoded size of messages (where known) */
+#define grpc_lb_v1_Duration_size 22
+#define grpc_lb_v1_LoadBalanceRequest_size 169
+#define grpc_lb_v1_InitialLoadBalanceRequest_size 131
+#define grpc_lb_v1_ClientStats_size 33
+#define grpc_lb_v1_LoadBalanceResponse_size (98 + grpc_lb_v1_ServerList_size)
+#define grpc_lb_v1_InitialLoadBalanceResponse_size 90
+#define grpc_lb_v1_Server_size 127
+
+/* Message IDs (where set with "msgid" option) */
+#ifdef PB_MSGID
+
+#define LOAD_BALANCER_MESSAGES \
+
+
+#endif
+
+#ifdef __cplusplus
+} /* extern "C" */
+#endif
+
+#endif
diff --git a/src/core/ext/lb_policy/round_robin/round_robin.c b/src/core/ext/lb_policy/round_robin/round_robin.c
index ada2ef9b7c..2154250846 100644
--- a/src/core/ext/lb_policy/round_robin/round_robin.c
+++ b/src/core/ext/lb_policy/round_robin/round_robin.c
@@ -308,8 +308,10 @@ static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
size_t i;
p->started_picking = 1;
- gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%d", p,
- p->num_subchannels);
+ if (grpc_lb_round_robin_trace) {
+ gpr_log(GPR_DEBUG, "LB_POLICY: p=%p num_subchannels=%d", p,
+ p->num_subchannels);
+ }
for (i = 0; i < p->num_subchannels; i++) {
subchannel_data *sd = p->subchannels[i];
diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
index d994aa3871..395dd132e3 100644
--- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c
+++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c
@@ -56,6 +56,8 @@
#define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
#define MAX_WINDOW 0x7fffffffu
+#define DEFAULT_MAX_HEADER_LIST_SIZE (16 * 1024)
+
#define MAX_CLIENT_STREAM_ID 0x7fffffffu
int grpc_http_trace = 0;
@@ -65,8 +67,8 @@ int grpc_flowctl_trace = 0;
((grpc_chttp2_transport *)((char *)(tw)-offsetof(grpc_chttp2_transport, \
writing)))
-#define TRANSPORT_FROM_PARSING(tw) \
- ((grpc_chttp2_transport *)((char *)(tw)-offsetof(grpc_chttp2_transport, \
+#define TRANSPORT_FROM_PARSING(tp) \
+ ((grpc_chttp2_transport *)((char *)(tp)-offsetof(grpc_chttp2_transport, \
parsing)))
#define TRANSPORT_FROM_GLOBAL(tg) \
@@ -312,6 +314,8 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
}
push_setting(t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, DEFAULT_WINDOW);
+ push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
+ DEFAULT_MAX_HEADER_LIST_SIZE);
if (channel_args) {
for (i = 0; i < channel_args->num_args; i++) {
@@ -379,6 +383,18 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
&t->writing.hpack_compressor,
(uint32_t)channel_args->args[i].value.integer);
}
+ } else if (0 == strcmp(channel_args->args[i].key,
+ GRPC_ARG_MAX_METADATA_SIZE)) {
+ if (channel_args->args[i].type != GRPC_ARG_INTEGER) {
+ gpr_log(GPR_ERROR, "%s: must be an integer",
+ GRPC_ARG_MAX_METADATA_SIZE);
+ } else if (channel_args->args[i].value.integer < 0) {
+ gpr_log(GPR_ERROR, "%s: must be non-negative",
+ GRPC_ARG_MAX_METADATA_SIZE);
+ } else {
+ push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE,
+ (uint32_t)channel_args->args[i].value.integer);
+ }
}
}
}
@@ -511,7 +527,7 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
[GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
*t->accepting_stream = s;
grpc_chttp2_stream_map_add(&t->parsing_stream_map, s->global.id, s);
- s->global.in_stream_map = 1;
+ s->global.in_stream_map = true;
}
grpc_chttp2_run_with_global_lock(exec_ctx, t, s, finish_init_stream_locked,
@@ -786,7 +802,8 @@ void grpc_chttp2_add_incoming_goaway(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
uint32_t goaway_error, gpr_slice goaway_text) {
char *msg = gpr_dump_slice(goaway_text, GPR_DUMP_HEX | GPR_DUMP_ASCII);
- gpr_log(GPR_DEBUG, "got goaway [%d]: %s", goaway_error, msg);
+ GRPC_CHTTP2_IF_TRACING(
+ gpr_log(GPR_DEBUG, "got goaway [%d]: %s", goaway_error, msg));
gpr_slice_unref(goaway_text);
transport_global->seen_goaway = 1;
connectivity_state_set(
@@ -841,7 +858,7 @@ static void maybe_start_some_streams(
grpc_chttp2_stream_map_add(
&TRANSPORT_FROM_GLOBAL(transport_global)->new_stream_map,
stream_global->id, STREAM_FROM_GLOBAL(stream_global));
- stream_global->in_stream_map = 1;
+ stream_global->in_stream_map = true;
transport_global->concurrent_stream_count++;
grpc_chttp2_become_writable(transport_global, stream_global);
}
@@ -947,26 +964,40 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
stream_global->send_initial_metadata_finished =
add_closure_barrier(on_complete);
stream_global->send_initial_metadata = op->send_initial_metadata;
- if (contains_non_ok_status(transport_global, op->send_initial_metadata)) {
- stream_global->seen_error = 1;
- grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
- }
- if (!stream_global->write_closed) {
- if (transport_global->is_client) {
- GPR_ASSERT(stream_global->id == 0);
- grpc_chttp2_list_add_waiting_for_concurrency(transport_global,
- stream_global);
- maybe_start_some_streams(exec_ctx, transport_global);
+ const size_t metadata_size =
+ grpc_metadata_batch_size(op->send_initial_metadata);
+ const size_t metadata_peer_limit =
+ transport_global->settings[GRPC_PEER_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
+ if (metadata_size > metadata_peer_limit) {
+ gpr_log(GPR_DEBUG,
+ "to-be-sent initial metadata size exceeds peer limit "
+ "(%lu vs. %lu)",
+ metadata_size, metadata_peer_limit);
+ cancel_from_api(exec_ctx, transport_global, stream_global,
+ GRPC_STATUS_RESOURCE_EXHAUSTED);
+ } else {
+ if (contains_non_ok_status(transport_global, op->send_initial_metadata)) {
+ stream_global->seen_error = true;
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ }
+ if (!stream_global->write_closed) {
+ if (transport_global->is_client) {
+ GPR_ASSERT(stream_global->id == 0);
+ grpc_chttp2_list_add_waiting_for_concurrency(transport_global,
+ stream_global);
+ maybe_start_some_streams(exec_ctx, transport_global);
+ } else {
+ GPR_ASSERT(stream_global->id != 0);
+ grpc_chttp2_become_writable(transport_global, stream_global);
+ }
} else {
- GPR_ASSERT(stream_global->id != 0);
- grpc_chttp2_become_writable(transport_global, stream_global);
+ grpc_chttp2_complete_closure_step(
+ exec_ctx, transport_global, stream_global,
+ &stream_global->send_initial_metadata_finished,
+ GRPC_ERROR_CREATE(
+ "Attempt to send initial metadata after stream was closed"));
}
- } else {
- grpc_chttp2_complete_closure_step(
- exec_ctx, transport_global, stream_global,
- &stream_global->send_initial_metadata_finished,
- GRPC_ERROR_CREATE(
- "Attempt to send initial metadata after stream was closed"));
}
}
@@ -992,22 +1023,37 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
stream_global->send_trailing_metadata_finished =
add_closure_barrier(on_complete);
stream_global->send_trailing_metadata = op->send_trailing_metadata;
- if (contains_non_ok_status(transport_global, op->send_trailing_metadata)) {
- stream_global->seen_error = 1;
- grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
- }
- if (stream_global->write_closed) {
- grpc_chttp2_complete_closure_step(
- exec_ctx, transport_global, stream_global,
- &stream_global->send_trailing_metadata_finished,
- grpc_metadata_batch_is_empty(op->send_trailing_metadata)
- ? GRPC_ERROR_NONE
- : GRPC_ERROR_CREATE("Attempt to send trailing metadata after "
- "stream was closed"));
- } else if (stream_global->id != 0) {
- /* TODO(ctiller): check if there's flow control for any outstanding
- bytes before going writable */
- grpc_chttp2_become_writable(transport_global, stream_global);
+ const size_t metadata_size =
+ grpc_metadata_batch_size(op->send_trailing_metadata);
+ const size_t metadata_peer_limit =
+ transport_global->settings[GRPC_PEER_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
+ if (metadata_size > metadata_peer_limit) {
+ gpr_log(GPR_DEBUG,
+ "to-be-sent trailing metadata size exceeds peer limit "
+ "(%lu vs. %lu)",
+ metadata_size, metadata_peer_limit);
+ cancel_from_api(exec_ctx, transport_global, stream_global,
+ GRPC_STATUS_RESOURCE_EXHAUSTED);
+ } else {
+ if (contains_non_ok_status(transport_global,
+ op->send_trailing_metadata)) {
+ stream_global->seen_error = true;
+ grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
+ }
+ if (stream_global->write_closed) {
+ grpc_chttp2_complete_closure_step(
+ exec_ctx, transport_global, stream_global,
+ &stream_global->send_trailing_metadata_finished,
+ grpc_metadata_batch_is_empty(op->send_trailing_metadata)
+ ? GRPC_ERROR_NONE
+ : GRPC_ERROR_CREATE("Attempt to send trailing metadata after "
+ "stream was closed"));
+ } else if (stream_global->id != 0) {
+ /* TODO(ctiller): check if there's flow control for any outstanding
+ bytes before going writable */
+ grpc_chttp2_become_writable(transport_global, stream_global);
+ }
}
}
@@ -1173,6 +1219,16 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
grpc_chttp2_list_pop_check_read_ops(transport_global, &stream_global)) {
if (stream_global->recv_initial_metadata_ready != NULL &&
stream_global->published_initial_metadata) {
+ if (stream_global->seen_error) {
+ while ((bs = grpc_chttp2_incoming_frame_queue_pop(
+ &stream_global->incoming_frames)) != NULL) {
+ incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs);
+ }
+ if (stream_global->exceeded_metadata_size) {
+ cancel_from_api(exec_ctx, transport_global, stream_global,
+ GRPC_STATUS_RESOURCE_EXHAUSTED);
+ }
+ }
grpc_chttp2_incoming_metadata_buffer_publish(
&stream_global->received_initial_metadata,
stream_global->recv_initial_metadata);
@@ -1202,10 +1258,15 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
}
if (stream_global->recv_trailing_metadata_finished != NULL &&
stream_global->read_closed && stream_global->write_closed) {
- while (stream_global->seen_error &&
- (bs = grpc_chttp2_incoming_frame_queue_pop(
- &stream_global->incoming_frames)) != NULL) {
- incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs);
+ if (stream_global->seen_error) {
+ while ((bs = grpc_chttp2_incoming_frame_queue_pop(
+ &stream_global->incoming_frames)) != NULL) {
+ incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs);
+ }
+ if (stream_global->exceeded_metadata_size) {
+ cancel_from_api(exec_ctx, transport_global, stream_global,
+ GRPC_STATUS_RESOURCE_EXHAUSTED);
+ }
}
if (stream_global->all_incoming_byte_streams_finished) {
grpc_chttp2_incoming_metadata_buffer_publish(
@@ -1237,7 +1298,7 @@ static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
s = grpc_chttp2_stream_map_delete(&t->new_stream_map, id);
}
GPR_ASSERT(s);
- s->global.in_stream_map = 0;
+ s->global.in_stream_map = false;
if (t->parsing.incoming_stream == &s->parsing) {
t->parsing.incoming_stream = NULL;
grpc_chttp2_parsing_become_skip_parser(exec_ctx, &t->parsing);
@@ -1283,7 +1344,7 @@ static void cancel_from_api(grpc_exec_ctx *exec_ctx,
NULL);
}
if (status != GRPC_STATUS_OK && !stream_global->seen_error) {
- stream_global->seen_error = 1;
+ stream_global->seen_error = true;
grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
grpc_chttp2_mark_stream_closed(
@@ -1297,7 +1358,7 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx,
grpc_chttp2_stream_global *stream_global,
grpc_status_code status, gpr_slice *slice) {
if (status != GRPC_STATUS_OK) {
- stream_global->seen_error = 1;
+ stream_global->seen_error = true;
grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
/* stream_global->recv_trailing_metadata_finished gives us a
@@ -1321,7 +1382,7 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx,
GRPC_MDSTR_GRPC_MESSAGE,
grpc_mdstr_from_slice(gpr_slice_ref(*slice))));
}
- stream_global->published_trailing_metadata = 1;
+ stream_global->published_trailing_metadata = true;
grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
if (slice) {
@@ -1355,13 +1416,13 @@ void grpc_chttp2_mark_stream_closed(
}
grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
if (close_reads && !stream_global->read_closed) {
- stream_global->read_closed = 1;
- stream_global->published_initial_metadata = 1;
- stream_global->published_trailing_metadata = 1;
+ stream_global->read_closed = true;
+ stream_global->published_initial_metadata = true;
+ stream_global->published_trailing_metadata = true;
decrement_active_streams_locked(exec_ctx, transport_global, stream_global);
}
if (close_writes && !stream_global->write_closed) {
- stream_global->write_closed = 1;
+ stream_global->write_closed = true;
if (TRANSPORT_FROM_GLOBAL(transport_global)->executor.writing_active) {
GRPC_CHTTP2_STREAM_REF(stream_global, "finish_writes");
grpc_chttp2_list_add_closed_waiting_for_writing(transport_global,
diff --git a/src/core/ext/transport/chttp2/transport/frame_rst_stream.c b/src/core/ext/transport/chttp2/transport/frame_rst_stream.c
index ea71123054..08804fa553 100644
--- a/src/core/ext/transport/chttp2/transport/frame_rst_stream.c
+++ b/src/core/ext/transport/chttp2/transport/frame_rst_stream.c
@@ -47,15 +47,20 @@ gpr_slice grpc_chttp2_rst_stream_create(uint32_t id, uint32_t code,
stats->framing_bytes += frame_size;
uint8_t *p = GPR_SLICE_START_PTR(slice);
+ // Frame size.
*p++ = 0;
*p++ = 0;
*p++ = 4;
+ // Frame type.
*p++ = GRPC_CHTTP2_FRAME_RST_STREAM;
+ // Flags.
*p++ = 0;
+ // Stream ID.
*p++ = (uint8_t)(id >> 24);
*p++ = (uint8_t)(id >> 16);
*p++ = (uint8_t)(id >> 8);
*p++ = (uint8_t)(id);
+ // Error code.
*p++ = (uint8_t)(code >> 24);
*p++ = (uint8_t)(code >> 16);
*p++ = (uint8_t)(code >> 8);
diff --git a/src/core/ext/transport/chttp2/transport/hpack_encoder.c b/src/core/ext/transport/chttp2/transport/hpack_encoder.c
index 555027c866..ebeee37f0d 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_encoder.c
+++ b/src/core/ext/transport/chttp2/transport/hpack_encoder.c
@@ -63,6 +63,8 @@
/* don't consider adding anything bigger than this to the hpack table */
#define MAX_DECODER_SPACE_USAGE 512
+extern int grpc_http_trace;
+
typedef struct {
int is_first_frame;
/* number of bytes in 'output' when we started the frame - used to calculate
@@ -532,7 +534,9 @@ void grpc_chttp2_hpack_compressor_set_max_table_size(
}
}
c->advertise_table_size_change = 1;
- gpr_log(GPR_DEBUG, "set max table size from encoder to %d", max_table_size);
+ if (grpc_http_trace) {
+ gpr_log(GPR_DEBUG, "set max table size from encoder to %d", max_table_size);
+ }
}
void grpc_chttp2_encode_header(grpc_chttp2_hpack_compressor *c,
diff --git a/src/core/ext/transport/chttp2/transport/hpack_table.c b/src/core/ext/transport/chttp2/transport/hpack_table.c
index 0684d49899..2b73ec969e 100644
--- a/src/core/ext/transport/chttp2/transport/hpack_table.c
+++ b/src/core/ext/transport/chttp2/transport/hpack_table.c
@@ -254,7 +254,9 @@ void grpc_chttp2_hptbl_set_max_bytes(grpc_chttp2_hptbl *tbl,
if (tbl->max_bytes == max_bytes) {
return;
}
- gpr_log(GPR_DEBUG, "Update hpack parser max size to %d", max_bytes);
+ if (grpc_http_trace) {
+ gpr_log(GPR_DEBUG, "Update hpack parser max size to %d", max_bytes);
+ }
while (tbl->mem_used > max_bytes) {
evict1(tbl);
}
diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.c b/src/core/ext/transport/chttp2/transport/incoming_metadata.c
index db21744f0c..3e463a7995 100644
--- a/src/core/ext/transport/chttp2/transport/incoming_metadata.c
+++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.c
@@ -65,6 +65,7 @@ void grpc_chttp2_incoming_metadata_buffer_add(
gpr_realloc(buffer->elems, sizeof(*buffer->elems) * buffer->capacity);
}
buffer->elems[buffer->count++].md = elem;
+ buffer->size += GRPC_MDELEM_LENGTH(elem);
}
void grpc_chttp2_incoming_metadata_buffer_set_deadline(
diff --git a/src/core/ext/transport/chttp2/transport/incoming_metadata.h b/src/core/ext/transport/chttp2/transport/incoming_metadata.h
index 17ecf8e181..df4343b93e 100644
--- a/src/core/ext/transport/chttp2/transport/incoming_metadata.h
+++ b/src/core/ext/transport/chttp2/transport/incoming_metadata.h
@@ -42,6 +42,7 @@ typedef struct {
size_t capacity;
gpr_timespec deadline;
int published;
+ size_t size; // total size of metadata
} grpc_chttp2_incoming_metadata_buffer;
/** assumes everything initially zeroed */
diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h
index ff01f70319..dba9056a50 100644
--- a/src/core/ext/transport/chttp2/transport/internal.h
+++ b/src/core/ext/transport/chttp2/transport/internal.h
@@ -422,23 +422,21 @@ typedef struct {
/** number of streams that are currently being read */
gpr_refcount active_streams;
- /** when the application requests writes be closed, the write_closed is
- 'queued'; when the close is flow controlled into the send path, we are
- 'sending' it; when the write has been performed it is 'sent' */
+ /** Is this stream closed for writing. */
bool write_closed;
- /** is this stream reading half-closed (boolean) */
+ /** Is this stream reading half-closed. */
bool read_closed;
- /** are all published incoming byte streams closed */
+ /** Are all published incoming byte streams closed. */
bool all_incoming_byte_streams_finished;
- /** is this stream in the stream map? (boolean) */
+ /** Is this stream in the stream map. */
bool in_stream_map;
- /** has this stream seen an error? if 1, then pending incoming frames
- can be thrown away */
+ /** Has this stream seen an error.
+ If true, then pending incoming frames can be thrown away. */
bool seen_error;
+ bool exceeded_metadata_size;
bool published_initial_metadata;
bool published_trailing_metadata;
- bool faked_trailing_metadata;
grpc_chttp2_incoming_metadata_buffer received_initial_metadata;
grpc_chttp2_incoming_metadata_buffer received_trailing_metadata;
@@ -470,6 +468,8 @@ typedef struct {
} grpc_chttp2_stream_writing;
struct grpc_chttp2_stream_parsing {
+ /** saw some stream level error */
+ grpc_error *forced_close_error;
/** HTTP2 stream id for this stream, or zero if one has not been assigned */
uint32_t id;
/** has this stream received a close */
@@ -478,10 +478,9 @@ struct grpc_chttp2_stream_parsing {
uint8_t header_frames_received;
/** which metadata did we get (on this parse) */
uint8_t got_metadata_on_parse[2];
- /** saw some stream level error */
- grpc_error *forced_close_error;
- /** should we raise the seen_error flag in stream_global */
- uint8_t seen_error;
+ /** should we raise the seen_error flag in transport_global */
+ bool seen_error;
+ bool exceeded_metadata_size;
/** window available for peer to send to us */
int64_t incoming_window;
/** parsing state for data frames */
diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c
index e0bb3a02d7..445805b61f 100644
--- a/src/core/ext/transport/chttp2/transport/parsing.c
+++ b/src/core/ext/transport/chttp2/transport/parsing.c
@@ -45,6 +45,10 @@
#include "src/core/lib/profiling/timers.h"
#include "src/core/lib/transport/static_metadata.h"
+#define TRANSPORT_FROM_PARSING(tp) \
+ ((grpc_chttp2_transport *)((char *)(tp)-offsetof(grpc_chttp2_transport, \
+ parsing)))
+
static grpc_error *init_frame_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing);
static grpc_error *init_header_frame_parser(
@@ -83,8 +87,8 @@ void grpc_chttp2_prepare_to_read(
transport_global->settings[GRPC_SENT_SETTINGS],
sizeof(transport_parsing->last_sent_settings));
transport_parsing->max_frame_size =
- transport_global
- ->settings[GRPC_ACKED_SETTINGS][GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE];
+ transport_global->settings[GRPC_ACKED_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_MAX_FRAME_SIZE];
/* update the parsing view of incoming window */
while (grpc_chttp2_list_pop_unannounced_incoming_window_available(
@@ -170,7 +174,9 @@ void grpc_chttp2_publish_reads(
while (grpc_chttp2_list_pop_parsing_seen_stream(
transport_global, transport_parsing, &stream_global, &stream_parsing)) {
if (stream_parsing->seen_error) {
- stream_global->seen_error = 1;
+ stream_global->seen_error = true;
+ stream_global->exceeded_metadata_size =
+ stream_parsing->exceeded_metadata_size;
grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
@@ -643,7 +649,7 @@ static void on_initial_header(void *tp, grpc_mdelem *md) {
if (md->key == GRPC_MDSTR_GRPC_STATUS && md != GRPC_MDELEM_GRPC_STATUS_0) {
/* TODO(ctiller): check for a status like " 0" */
- stream_parsing->seen_error = 1;
+ stream_parsing->seen_error = true;
}
if (md->key == GRPC_MDSTR_GRPC_TIMEOUT) {
@@ -664,8 +670,26 @@ static void on_initial_header(void *tp, grpc_mdelem *md) {
gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), *cached_timeout));
GRPC_MDELEM_UNREF(md);
} else {
- grpc_chttp2_incoming_metadata_buffer_add(
- &stream_parsing->metadata_buffer[0], md);
+ const size_t new_size =
+ stream_parsing->metadata_buffer[0].size + GRPC_MDELEM_LENGTH(md);
+ grpc_chttp2_transport_global *transport_global =
+ &TRANSPORT_FROM_PARSING(transport_parsing)->global;
+ const size_t metadata_size_limit =
+ transport_global->settings[GRPC_LOCAL_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
+ if (new_size > metadata_size_limit) {
+ if (!stream_parsing->exceeded_metadata_size) {
+ gpr_log(GPR_DEBUG,
+ "received initial metadata size exceeds limit (%lu vs. %lu)",
+ new_size, metadata_size_limit);
+ stream_parsing->seen_error = true;
+ stream_parsing->exceeded_metadata_size = true;
+ }
+ GRPC_MDELEM_UNREF(md);
+ } else {
+ grpc_chttp2_incoming_metadata_buffer_add(
+ &stream_parsing->metadata_buffer[0], md);
+ }
}
grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing);
@@ -689,12 +713,30 @@ static void on_trailing_header(void *tp, grpc_mdelem *md) {
if (md->key == GRPC_MDSTR_GRPC_STATUS && md != GRPC_MDELEM_GRPC_STATUS_0) {
/* TODO(ctiller): check for a status like " 0" */
- stream_parsing->seen_error = 1;
+ stream_parsing->seen_error = true;
+ }
+
+ const size_t new_size =
+ stream_parsing->metadata_buffer[1].size + GRPC_MDELEM_LENGTH(md);
+ grpc_chttp2_transport_global *transport_global =
+ &TRANSPORT_FROM_PARSING(transport_parsing)->global;
+ const size_t metadata_size_limit =
+ transport_global->settings[GRPC_LOCAL_SETTINGS]
+ [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
+ if (new_size > metadata_size_limit) {
+ if (!stream_parsing->exceeded_metadata_size) {
+ gpr_log(GPR_DEBUG,
+ "received trailing metadata size exceeds limit (%lu vs. %lu)",
+ new_size, metadata_size_limit);
+ stream_parsing->seen_error = true;
+ stream_parsing->exceeded_metadata_size = true;
+ }
+ GRPC_MDELEM_UNREF(md);
+ } else {
+ grpc_chttp2_incoming_metadata_buffer_add(
+ &stream_parsing->metadata_buffer[1], md);
}
- grpc_chttp2_incoming_metadata_buffer_add(&stream_parsing->metadata_buffer[1],
- md);
-
grpc_chttp2_list_add_parsing_seen_stream(transport_parsing, stream_parsing);
GPR_TIMER_END("on_trailing_header", 0);
diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c
index 237611633e..766a976277 100644
--- a/src/core/lib/iomgr/tcp_posix.c
+++ b/src/core/lib/iomgr/tcp_posix.c
@@ -167,7 +167,7 @@ static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
for (i = 0; i < tcp->incoming_buffer->count; i++) {
char *dump = gpr_dump_slice(tcp->incoming_buffer->slices[i],
GPR_DUMP_HEX | GPR_DUMP_ASCII);
- gpr_log(GPR_DEBUG, "READ %p: %s", tcp, dump);
+ gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", tcp, tcp->peer_string, dump);
gpr_free(dump);
}
}
@@ -398,7 +398,7 @@ static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
for (i = 0; i < buf->count; i++) {
char *data =
gpr_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
- gpr_log(GPR_DEBUG, "WRITE %p: %s", tcp, data);
+ gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", tcp, tcp->peer_string, data);
gpr_free(data);
}
}
diff --git a/src/core/lib/surface/completion_queue.c b/src/core/lib/surface/completion_queue.c
index 3c04a9b734..c1a80a7ba7 100644
--- a/src/core/lib/surface/completion_queue.c
+++ b/src/core/lib/surface/completion_queue.c
@@ -228,12 +228,12 @@ void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
GPR_TIMER_BEGIN("grpc_cq_end_op", 0);
if (grpc_api_trace) {
- const char *errmsg = grpc_error_string(error);
+ const char *errmsg = grpc_error_string(error);
GRPC_API_TRACE(
- "grpc_cq_end_op(exec_ctx=%p, cc=%p, tag=%p, error=%s, done=%p, "
- "done_arg=%p, storage=%p)",
- 7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage));
- grpc_error_free_string(errmsg);
+ "grpc_cq_end_op(exec_ctx=%p, cc=%p, tag=%p, error=%s, done=%p, "
+ "done_arg=%p, storage=%p)",
+ 7, (exec_ctx, cc, tag, errmsg, done, done_arg, storage));
+ grpc_error_free_string(errmsg);
}
storage->tag = tag;
diff --git a/src/core/lib/surface/version.c b/src/core/lib/surface/version.c
index fe954cbefb..aca76d2bb7 100644
--- a/src/core/lib/surface/version.c
+++ b/src/core/lib/surface/version.c
@@ -36,4 +36,4 @@
#include <grpc/grpc.h>
-const char *grpc_version_string(void) { return "0.14.0-dev"; }
+const char *grpc_version_string(void) { return "0.15.0-dev"; }
diff --git a/src/core/lib/transport/metadata.h b/src/core/lib/transport/metadata.h
index 713d9e6782..6d82f4d681 100644
--- a/src/core/lib/transport/metadata.h
+++ b/src/core/lib/transport/metadata.h
@@ -147,6 +147,10 @@ const char *grpc_mdstr_as_c_string(grpc_mdstr *s);
#define GRPC_MDSTR_LENGTH(s) (GPR_SLICE_LENGTH(s->slice))
+/* We add 32 bytes of padding as per RFC-7540 section 6.5.2. */
+#define GRPC_MDELEM_LENGTH(e) \
+ (GRPC_MDSTR_LENGTH((e)->key) + GRPC_MDSTR_LENGTH((e)->value) + 32)
+
int grpc_mdstr_is_legal_header(grpc_mdstr *s);
int grpc_mdstr_is_legal_nonbin_header(grpc_mdstr *s);
int grpc_mdstr_is_bin_suffixed(grpc_mdstr *s);
diff --git a/src/core/lib/transport/metadata_batch.c b/src/core/lib/transport/metadata_batch.c
index 4567221a48..e4398abeb7 100644
--- a/src/core/lib/transport/metadata_batch.c
+++ b/src/core/lib/transport/metadata_batch.c
@@ -192,3 +192,12 @@ int grpc_metadata_batch_is_empty(grpc_metadata_batch *batch) {
gpr_time_cmp(gpr_inf_future(batch->deadline.clock_type),
batch->deadline) == 0;
}
+
+size_t grpc_metadata_batch_size(grpc_metadata_batch *batch) {
+ size_t size = 0;
+ for (grpc_linked_mdelem *elem = batch->list.head; elem != NULL;
+ elem = elem->next) {
+ size += GRPC_MDELEM_LENGTH(elem->md);
+ }
+ return size;
+}
diff --git a/src/core/lib/transport/metadata_batch.h b/src/core/lib/transport/metadata_batch.h
index b62668876e..7af823f7ca 100644
--- a/src/core/lib/transport/metadata_batch.h
+++ b/src/core/lib/transport/metadata_batch.h
@@ -66,6 +66,9 @@ void grpc_metadata_batch_destroy(grpc_metadata_batch *batch);
void grpc_metadata_batch_clear(grpc_metadata_batch *batch);
int grpc_metadata_batch_is_empty(grpc_metadata_batch *batch);
+/* Returns the transport size of the batch. */
+size_t grpc_metadata_batch_size(grpc_metadata_batch *batch);
+
/** Moves the metadata information from \a src to \a dst. Upon return, \a src is
* zeroed. */
void grpc_metadata_batch_move(grpc_metadata_batch *dst,
diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
index 058371521d..0e204761f6 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallServerTest.cs
@@ -168,7 +168,7 @@ namespace Grpc.Core.Internal.Tests
var responseStream = new ServerResponseStream<string, string>(asyncCallServer);
var writeTask = responseStream.WriteAsync("request1");
- var writeStatusTask = asyncCallServer.SendStatusFromServerAsync(Status.DefaultSuccess, new Metadata());
+ var writeStatusTask = asyncCallServer.SendStatusFromServerAsync(Status.DefaultSuccess, new Metadata(), null);
fakeCall.SendCompletionHandler(true);
fakeCall.SendStatusFromServerHandler(true);
diff --git a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
index abe9d4a2e6..777a1c8c50 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/AsyncCallTest.cs
@@ -181,13 +181,14 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
- public void ClientStreaming_WriteFailure()
+ public void ClientStreaming_WriteCompletionFailure()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
var writeTask = requestStream.WriteAsync("request1");
fakeCall.SendCompletionHandler(false);
+ // TODO: maybe IOException or waiting for RPCException is more appropriate here.
Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await writeTask);
fakeCall.UnaryResponseClientHandler(true,
@@ -199,7 +200,7 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
- public void ClientStreaming_WriteAfterReceivingStatusFails()
+ public void ClientStreaming_WriteAfterReceivingStatusThrowsRpcException()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
@@ -210,7 +211,44 @@ namespace Grpc.Core.Internal.Tests
new Metadata());
AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
+ var ex = Assert.Throws<RpcException>(() => requestStream.WriteAsync("request1"));
+ Assert.AreEqual(Status.DefaultSuccess, ex.Status);
+ }
+
+ [Test]
+ public void ClientStreaming_WriteAfterReceivingStatusThrowsRpcException2()
+ {
+ var resultTask = asyncCall.ClientStreamingCallAsync();
+ var requestStream = new ClientRequestStream<string, string>(asyncCall);
+
+ fakeCall.UnaryResponseClientHandler(true,
+ new ClientSideStatus(new Status(StatusCode.OutOfRange, ""), new Metadata()),
+ CreateResponsePayload(),
+ new Metadata());
+
+ AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.OutOfRange);
+ var ex = Assert.Throws<RpcException>(() => requestStream.WriteAsync("request1"));
+ Assert.AreEqual(StatusCode.OutOfRange, ex.Status.StatusCode);
+ }
+
+ [Test]
+ public void ClientStreaming_WriteAfterCompleteThrowsInvalidOperationException()
+ {
+ var resultTask = asyncCall.ClientStreamingCallAsync();
+ var requestStream = new ClientRequestStream<string, string>(asyncCall);
+
+ requestStream.CompleteAsync();
+
Assert.Throws(typeof(InvalidOperationException), () => requestStream.WriteAsync("request1"));
+
+ fakeCall.SendCompletionHandler(true);
+
+ fakeCall.UnaryResponseClientHandler(true,
+ new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
+ CreateResponsePayload(),
+ new Metadata());
+
+ AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
}
[Test]
@@ -229,7 +267,7 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
- public void ClientStreaming_WriteAfterCancellationRequestFails()
+ public void ClientStreaming_WriteAfterCancellationRequestThrowsOperationCancelledException()
{
var resultTask = asyncCall.ClientStreamingCallAsync();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
@@ -340,7 +378,7 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
- public void DuplexStreaming_WriteAfterReceivingStatusFails()
+ public void DuplexStreaming_WriteAfterReceivingStatusThrowsRpcException()
{
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
@@ -352,7 +390,8 @@ namespace Grpc.Core.Internal.Tests
AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
- Assert.ThrowsAsync(typeof(InvalidOperationException), async () => await requestStream.WriteAsync("request1"));
+ var ex = Assert.ThrowsAsync<RpcException>(async () => await requestStream.WriteAsync("request1"));
+ Assert.AreEqual(Status.DefaultSuccess, ex.Status);
}
[Test]
@@ -372,7 +411,7 @@ namespace Grpc.Core.Internal.Tests
}
[Test]
- public void DuplexStreaming_WriteAfterCancellationRequestFails()
+ public void DuplexStreaming_WriteAfterCancellationRequestThrowsOperationCancelledException()
{
asyncCall.StartDuplexStreamingCall();
var requestStream = new ClientRequestStream<string, string>(asyncCall);
diff --git a/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs b/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs
index 1bec258ca2..909112a47c 100644
--- a/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs
+++ b/src/csharp/Grpc.Core.Tests/Internal/FakeNativeCall.cs
@@ -165,7 +165,8 @@ namespace Grpc.Core.Internal.Tests
SendCompletionHandler = callback;
}
- public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata)
+ public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
+ byte[] optionalPayload, WriteFlags writeFlags)
{
SendStatusFromServerHandler = callback;
}
diff --git a/src/csharp/Grpc.Core/GrpcEnvironment.cs b/src/csharp/Grpc.Core/GrpcEnvironment.cs
index a5c78cc9d7..bee0ef1d62 100644
--- a/src/csharp/Grpc.Core/GrpcEnvironment.cs
+++ b/src/csharp/Grpc.Core/GrpcEnvironment.cs
@@ -45,11 +45,12 @@ namespace Grpc.Core
/// </summary>
public class GrpcEnvironment
{
- const int THREAD_POOL_SIZE = 4;
+ const int MinDefaultThreadPoolSize = 4;
static object staticLock = new object();
static GrpcEnvironment instance;
static int refCount;
+ static int? customThreadPoolSize;
static ILogger logger = new ConsoleLogger();
@@ -123,13 +124,30 @@ namespace Grpc.Core
}
/// <summary>
+ /// Sets the number of threads in the gRPC thread pool that polls for internal RPC events.
+ /// Can be only invoke before the <c>GrpcEnviroment</c> is started and cannot be changed afterwards.
+ /// Setting thread pool size is an advanced setting and you should only use it if you know what you are doing.
+ /// Most users should rely on the default value provided by gRPC library.
+ /// Note: this method is part of an experimental API that can change or be removed without any prior notice.
+ /// </summary>
+ public static void SetThreadPoolSize(int threadCount)
+ {
+ lock (staticLock)
+ {
+ GrpcPreconditions.CheckState(instance == null, "Can only be set before GrpcEnvironment is initialized");
+ GrpcPreconditions.CheckArgument(threadCount > 0, "threadCount needs to be a positive number");
+ customThreadPoolSize = threadCount;
+ }
+ }
+
+ /// <summary>
/// Creates gRPC environment.
/// </summary>
private GrpcEnvironment()
{
GrpcNativeInit();
completionRegistry = new CompletionRegistry(this);
- threadPool = new GrpcThreadPool(this, THREAD_POOL_SIZE);
+ threadPool = new GrpcThreadPool(this, GetThreadPoolSizeOrDefault());
threadPool.Start();
}
@@ -200,5 +218,17 @@ namespace Grpc.Core
debugStats.CheckOK();
}
+
+ private int GetThreadPoolSizeOrDefault()
+ {
+ if (customThreadPoolSize.HasValue)
+ {
+ return customThreadPoolSize.Value;
+ }
+ // In systems with many cores, use half of the cores for GrpcThreadPool
+ // and the other half for .NET thread pool. This heuristic definitely needs
+ // more work, but seems to work reasonably well for a start.
+ return Math.Max(MinDefaultThreadPoolSize, Environment.ProcessorCount / 2);
+ }
}
}
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCall.cs b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
index f522174bd0..55351869b5 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCall.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCall.cs
@@ -57,7 +57,7 @@ namespace Grpc.Core.Internal
// Completion of a pending unary response if not null.
TaskCompletionSource<TResponse> unaryResponseTcs;
- // Indicates that steaming call has finished.
+ // Indicates that response streaming call has finished.
TaskCompletionSource<object> streamingCallFinishedTcs = new TaskCompletionSource<object>();
// Response headers set here once received.
@@ -443,6 +443,19 @@ namespace Grpc.Core.Internal
}
}
+ protected override void CheckSendingAllowed(bool allowFinished)
+ {
+ base.CheckSendingAllowed(true);
+
+ // throwing RpcException if we already received status on client
+ // side makes the most sense.
+ // Note that this throws even for StatusCode.OK.
+ if (!allowFinished && finishedStatus.HasValue)
+ {
+ throw new RpcException(finishedStatus.Value.Status);
+ }
+ }
+
/// <summary>
/// Handles receive status completion for calls with streaming response.
/// </summary>
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
index 42234dcac2..4de23706b2 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallBase.cs
@@ -213,7 +213,7 @@ namespace Grpc.Core.Internal
{
}
- protected void CheckSendingAllowed(bool allowFinished)
+ protected virtual void CheckSendingAllowed(bool allowFinished)
{
GrpcPreconditions.CheckState(started);
CheckNotCancelled();
diff --git a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
index eafe2ccab8..b1566b44a7 100644
--- a/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
+++ b/src/csharp/Grpc.Core/Internal/AsyncCallServer.cs
@@ -139,8 +139,11 @@ namespace Grpc.Core.Internal
/// Sends call result status, indicating we are done with writes.
/// Sending a status different from StatusCode.OK will also implicitly cancel the call.
/// </summary>
- public Task SendStatusFromServerAsync(Status status, Metadata trailers)
+ public Task SendStatusFromServerAsync(Status status, Metadata trailers, Tuple<TResponse, WriteFlags> optionalWrite)
{
+ byte[] payload = optionalWrite != null ? UnsafeSerialize(optionalWrite.Item1) : null;
+ var writeFlags = optionalWrite != null ? optionalWrite.Item2 : default(WriteFlags);
+
lock (myLock)
{
GrpcPreconditions.CheckState(started);
@@ -149,11 +152,16 @@ namespace Grpc.Core.Internal
using (var metadataArray = MetadataArraySafeHandle.Create(trailers))
{
- call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent);
+ call.StartSendStatusFromServer(HandleSendStatusFromServerFinished, status, metadataArray, !initialMetadataSent,
+ payload, writeFlags);
}
halfcloseRequested = true;
initialMetadataSent = true;
sendStatusFromServerTcs = new TaskCompletionSource<object>();
+ if (optionalWrite != null)
+ {
+ streamingWritesCounter++;
+ }
return sendStatusFromServerTcs.Task;
}
}
diff --git a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
index 500653ba5d..244b97d4a4 100644
--- a/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
+++ b/src/csharp/Grpc.Core/Internal/CallSafeHandle.cs
@@ -135,13 +135,16 @@ namespace Grpc.Core.Internal
}
}
- public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata)
+ public void StartSendStatusFromServer(SendCompletionHandler callback, Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
+ byte[] optionalPayload, WriteFlags writeFlags)
{
using (completionQueue.NewScope())
{
var ctx = BatchContextSafeHandle.Create();
+ var optionalPayloadLength = optionalPayload != null ? new UIntPtr((ulong)optionalPayload.Length) : UIntPtr.Zero;
completionRegistry.RegisterBatchCompletion(ctx, (success, context) => callback(success));
- Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata).CheckOk();
+ Native.grpcsharp_call_send_status_from_server(this, ctx, status.StatusCode, status.Detail, metadataArray, sendEmptyInitialMetadata,
+ optionalPayload, optionalPayloadLength, writeFlags).CheckOk();
}
}
diff --git a/src/csharp/Grpc.Core/Internal/INativeCall.cs b/src/csharp/Grpc.Core/Internal/INativeCall.cs
index cbef599139..cd3719cb50 100644
--- a/src/csharp/Grpc.Core/Internal/INativeCall.cs
+++ b/src/csharp/Grpc.Core/Internal/INativeCall.cs
@@ -78,7 +78,7 @@ namespace Grpc.Core.Internal
void StartSendCloseFromClient(SendCompletionHandler callback);
- void StartSendStatusFromServer(SendCompletionHandler callback, Grpc.Core.Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata);
+ void StartSendStatusFromServer(SendCompletionHandler callback, Grpc.Core.Status status, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata, byte[] optionalPayload, Grpc.Core.WriteFlags writeFlags);
void StartServerSide(ReceivedCloseOnServerHandler callback);
}
diff --git a/src/csharp/Grpc.Core/Internal/NativeMethods.cs b/src/csharp/Grpc.Core/Internal/NativeMethods.cs
index 9ee0ba3bc0..c277c73ef0 100644
--- a/src/csharp/Grpc.Core/Internal/NativeMethods.cs
+++ b/src/csharp/Grpc.Core/Internal/NativeMethods.cs
@@ -421,20 +421,21 @@ namespace Grpc.Core.Internal
public delegate GRPCCallError grpcsharp_call_cancel_delegate(CallSafeHandle call);
public delegate GRPCCallError grpcsharp_call_cancel_with_status_delegate(CallSafeHandle call, StatusCode status, string description);
public delegate GRPCCallError grpcsharp_call_start_unary_delegate(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
+ BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
public delegate GRPCCallError grpcsharp_call_start_client_streaming_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
public delegate GRPCCallError grpcsharp_call_start_server_streaming_delegate(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len,
+ BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen,
MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
public delegate GRPCCallError grpcsharp_call_start_duplex_streaming_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx, MetadataArraySafeHandle metadataArray);
public delegate GRPCCallError grpcsharp_call_send_message_delegate(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
+ BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
public delegate GRPCCallError grpcsharp_call_send_close_from_client_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx);
public delegate GRPCCallError grpcsharp_call_send_status_from_server_delegate(CallSafeHandle call,
- BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata);
+ BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
+ byte[] optionalSendBuffer, UIntPtr optionalSendBufferLen, WriteFlags writeFlags);
public delegate GRPCCallError grpcsharp_call_recv_message_delegate(CallSafeHandle call,
BatchContextSafeHandle ctx);
public delegate GRPCCallError grpcsharp_call_recv_initial_metadata_delegate(CallSafeHandle call,
@@ -593,7 +594,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
public static extern GRPCCallError grpcsharp_call_start_unary(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
+ BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
public static extern GRPCCallError grpcsharp_call_start_client_streaming(CallSafeHandle call,
@@ -601,7 +602,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
public static extern GRPCCallError grpcsharp_call_start_server_streaming(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len,
+ BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen,
MetadataArraySafeHandle metadataArray, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
@@ -610,7 +611,7 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
public static extern GRPCCallError grpcsharp_call_send_message(CallSafeHandle call,
- BatchContextSafeHandle ctx, byte[] send_buffer, UIntPtr send_buffer_len, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
+ BatchContextSafeHandle ctx, byte[] sendBuffer, UIntPtr sendBufferLen, WriteFlags writeFlags, bool sendEmptyInitialMetadata);
[DllImport("grpc_csharp_ext.dll")]
public static extern GRPCCallError grpcsharp_call_send_close_from_client(CallSafeHandle call,
@@ -618,7 +619,8 @@ namespace Grpc.Core.Internal
[DllImport("grpc_csharp_ext.dll")]
public static extern GRPCCallError grpcsharp_call_send_status_from_server(CallSafeHandle call,
- BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata);
+ BatchContextSafeHandle ctx, StatusCode statusCode, string statusMessage, MetadataArraySafeHandle metadataArray, bool sendEmptyInitialMetadata,
+ byte[] optionalSendBuffer, UIntPtr optionalSendBufferLen, WriteFlags writeFlags);
[DllImport("grpc_csharp_ext.dll")]
public static extern GRPCCallError grpcsharp_call_recv_message(CallSafeHandle call,
diff --git a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
index 00d82d51e8..85b7a4b01e 100644
--- a/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
+++ b/src/csharp/Grpc.Core/Internal/ServerCallHandler.cs
@@ -75,27 +75,32 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
+ Tuple<TResponse,WriteFlags> responseTuple = null;
var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
GrpcPreconditions.CheckArgument(await requestStream.MoveNext().ConfigureAwait(false));
var request = requestStream.Current;
- var result = await handler(request, context).ConfigureAwait(false);
+ var response = await handler(request, context).ConfigureAwait(false);
status = context.Status;
- await responseStream.WriteAsync(result).ConfigureAwait(false);
+ responseTuple = Tuple.Create(response, HandlerUtils.GetWriteFlags(context.WriteOptions));
}
catch (Exception e)
{
- Logger.Error(e, "Exception occured in handler.");
+ if (!(e is RpcException))
+ {
+ Logger.Warning(e, "Exception occured in handler.");
+ }
status = HandlerUtils.StatusFromException(e);
}
try
{
- await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseTuple).ConfigureAwait(false);
}
- catch (OperationCanceledException)
+ catch (Exception)
{
- // Call has been already cancelled.
+ asyncCall.Cancel();
+ throw;
}
await finishedTask.ConfigureAwait(false);
}
@@ -139,17 +144,21 @@ namespace Grpc.Core.Internal
}
catch (Exception e)
{
- Logger.Error(e, "Exception occured in handler.");
+ if (!(e is RpcException))
+ {
+ Logger.Warning(e, "Exception occured in handler.");
+ }
status = HandlerUtils.StatusFromException(e);
}
try
{
- await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, null).ConfigureAwait(false);
}
- catch (OperationCanceledException)
+ catch (Exception)
{
- // Call has been already cancelled.
+ asyncCall.Cancel();
+ throw;
}
await finishedTask.ConfigureAwait(false);
}
@@ -183,33 +192,31 @@ namespace Grpc.Core.Internal
var responseStream = new ServerResponseStream<TRequest, TResponse>(asyncCall);
Status status;
+ Tuple<TResponse,WriteFlags> responseTuple = null;
var context = HandlerUtils.NewContext(newRpc, asyncCall.Peer, responseStream, asyncCall.CancellationToken);
try
{
- var result = await handler(requestStream, context).ConfigureAwait(false);
+ var response = await handler(requestStream, context).ConfigureAwait(false);
status = context.Status;
- try
- {
- await responseStream.WriteAsync(result).ConfigureAwait(false);
- }
- catch (OperationCanceledException)
- {
- status = Status.DefaultCancelled;
- }
+ responseTuple = Tuple.Create(response, HandlerUtils.GetWriteFlags(context.WriteOptions));
}
catch (Exception e)
{
- Logger.Error(e, "Exception occured in handler.");
+ if (!(e is RpcException))
+ {
+ Logger.Warning(e, "Exception occured in handler.");
+ }
status = HandlerUtils.StatusFromException(e);
}
try
{
- await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, responseTuple).ConfigureAwait(false);
}
- catch (OperationCanceledException)
+ catch (Exception)
{
- // Call has been already cancelled.
+ asyncCall.Cancel();
+ throw;
}
await finishedTask.ConfigureAwait(false);
}
@@ -251,16 +258,20 @@ namespace Grpc.Core.Internal
}
catch (Exception e)
{
- Logger.Error(e, "Exception occured in handler.");
+ if (!(e is RpcException))
+ {
+ Logger.Warning(e, "Exception occured in handler.");
+ }
status = HandlerUtils.StatusFromException(e);
}
try
{
- await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(status, context.ResponseTrailers, null).ConfigureAwait(false);
}
- catch (OperationCanceledException)
+ catch (Exception)
{
- // Call has been already cancelled.
+ asyncCall.Cancel();
+ throw;
}
await finishedTask.ConfigureAwait(false);
}
@@ -278,7 +289,7 @@ namespace Grpc.Core.Internal
asyncCall.Initialize(newRpc.Call);
var finishedTask = asyncCall.ServerSideCallAsync();
- await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty).ConfigureAwait(false);
+ await asyncCall.SendStatusFromServerAsync(new Status(StatusCode.Unimplemented, ""), Metadata.Empty, null).ConfigureAwait(false);
await finishedTask.ConfigureAwait(false);
}
}
@@ -297,6 +308,11 @@ namespace Grpc.Core.Internal
return new Status(StatusCode.Unknown, "Exception was thrown by handler.");
}
+ public static WriteFlags GetWriteFlags(WriteOptions writeOptions)
+ {
+ return writeOptions != null ? writeOptions.Flags : default(WriteFlags);
+ }
+
public static ServerCallContext NewContext<TRequest, TResponse>(ServerRpcNew newRpc, string peer, ServerResponseStream<TRequest, TResponse> serverResponseStream, CancellationToken cancellationToken)
where TRequest : class
where TResponse : class
diff --git a/src/csharp/Grpc.Core/Server.cs b/src/csharp/Grpc.Core/Server.cs
index 5b61b7f060..d538a4671f 100644
--- a/src/csharp/Grpc.Core/Server.cs
+++ b/src/csharp/Grpc.Core/Server.cs
@@ -48,6 +48,7 @@ namespace Grpc.Core
/// </summary>
public class Server
{
+ const int InitialAllowRpcTokenCount = 10;
static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<Server>();
readonly AtomicCounter activeCallCounter = new AtomicCounter();
@@ -65,7 +66,7 @@ namespace Grpc.Core
readonly TaskCompletionSource<object> shutdownTcs = new TaskCompletionSource<object>();
bool startRequested;
- bool shutdownRequested;
+ volatile bool shutdownRequested;
/// <summary>
/// Create a new server.
@@ -129,7 +130,13 @@ namespace Grpc.Core
startRequested = true;
handle.Start();
- AllowOneRpc();
+
+ // Starting with more than one AllowOneRpc tokens can significantly increase
+ // unary RPC throughput.
+ for (int i = 0; i < InitialAllowRpcTokenCount; i++)
+ {
+ AllowOneRpc();
+ }
}
}
@@ -239,12 +246,9 @@ namespace Grpc.Core
/// </summary>
private void AllowOneRpc()
{
- lock (myLock)
+ if (!shutdownRequested)
{
- if (!shutdownRequested)
- {
- handle.RequestCall(HandleNewServerRpc, environment);
- }
+ handle.RequestCall(HandleNewServerRpc, environment);
}
}
@@ -283,6 +287,8 @@ namespace Grpc.Core
/// </summary>
private void HandleNewServerRpc(bool success, BatchContextSafeHandle ctx)
{
+ Task.Run(() => AllowOneRpc());
+
if (success)
{
ServerRpcNew newRpc = ctx.GetServerRpcNew(this);
@@ -290,11 +296,9 @@ namespace Grpc.Core
// after server shutdown, the callback returns with null call
if (!newRpc.Call.IsInvalid)
{
- Task.Run(async () => await HandleCallAsync(newRpc)).ConfigureAwait(false);
+ HandleCallAsync(newRpc); // we don't need to await.
}
}
-
- AllowOneRpc();
}
/// <summary>
diff --git a/src/csharp/Grpc.Core/VersionInfo.cs b/src/csharp/Grpc.Core/VersionInfo.cs
index f7a9cb9c1c..e1609341d9 100644
--- a/src/csharp/Grpc.Core/VersionInfo.cs
+++ b/src/csharp/Grpc.Core/VersionInfo.cs
@@ -48,11 +48,11 @@ namespace Grpc.Core
/// <summary>
/// Current <c>AssemblyFileVersion</c> of gRPC C# assemblies
/// </summary>
- public const string CurrentAssemblyFileVersion = "0.14.0.0";
+ public const string CurrentAssemblyFileVersion = "0.15.0.0";
/// <summary>
/// Current version of gRPC C#
/// </summary>
- public const string CurrentVersion = "0.14.0-dev";
+ public const string CurrentVersion = "0.15.0-dev";
}
}
diff --git a/src/csharp/Grpc.Examples/MathGrpc.cs b/src/csharp/Grpc.Examples/MathGrpc.cs
index 2d3034d28b..d700a18778 100644
--- a/src/csharp/Grpc.Examples/MathGrpc.cs
+++ b/src/csharp/Grpc.Examples/MathGrpc.cs
@@ -151,25 +151,25 @@ namespace Math {
/// Div divides args.dividend by args.divisor and returns the quotient and
/// remainder.
/// </summary>
- Task<global::Math.DivReply> Div(global::Math.DivArgs request, ServerCallContext context);
+ global::System.Threading.Tasks.Task<global::Math.DivReply> Div(global::Math.DivArgs request, ServerCallContext context);
/// <summary>
/// DivMany accepts an arbitrary number of division args from the client stream
/// and sends back the results in the reply stream. The stream continues until
/// the client closes its end; the server does the same after sending all the
/// replies. The stream ends immediately if either end aborts.
/// </summary>
- Task DivMany(IAsyncStreamReader<global::Math.DivArgs> requestStream, IServerStreamWriter<global::Math.DivReply> responseStream, ServerCallContext context);
+ global::System.Threading.Tasks.Task DivMany(IAsyncStreamReader<global::Math.DivArgs> requestStream, IServerStreamWriter<global::Math.DivReply> responseStream, ServerCallContext context);
/// <summary>
/// Fib generates numbers in the Fibonacci sequence. If args.limit > 0, Fib
/// generates up to limit numbers; otherwise it continues until the call is
/// canceled. Unlike Fib above, Fib has no final FibReply.
/// </summary>
- Task Fib(global::Math.FibArgs request, IServerStreamWriter<global::Math.Num> responseStream, ServerCallContext context);
+ global::System.Threading.Tasks.Task Fib(global::Math.FibArgs request, IServerStreamWriter<global::Math.Num> responseStream, ServerCallContext context);
/// <summary>
/// Sum sums a stream of numbers, returning the final result once the stream
/// is closed.
/// </summary>
- Task<global::Math.Num> Sum(IAsyncStreamReader<global::Math.Num> requestStream, ServerCallContext context);
+ global::System.Threading.Tasks.Task<global::Math.Num> Sum(IAsyncStreamReader<global::Math.Num> requestStream, ServerCallContext context);
}
/// <summary>Base class for server-side implementations of Math</summary>
@@ -179,7 +179,7 @@ namespace Math {
/// Div divides args.dividend by args.divisor and returns the quotient and
/// remainder.
/// </summary>
- public virtual Task<global::Math.DivReply> Div(global::Math.DivArgs request, ServerCallContext context)
+ public virtual global::System.Threading.Tasks.Task<global::Math.DivReply> Div(global::Math.DivArgs request, ServerCallContext context)
{
throw new RpcException(new Status(StatusCode.Unimplemented, ""));
}
@@ -190,7 +190,7 @@ namespace Math {
/// the client closes its end; the server does the same after sending all the
/// replies. The stream ends immediately if either end aborts.
/// </summary>
- public virtual Task DivMany(IAsyncStreamReader<global::Math.DivArgs> requestStream, IServerStreamWriter<global::Math.DivReply> responseStream, ServerCallContext context)
+ public virtual global::System.Threading.Tasks.Task DivMany(IAsyncStreamReader<global::Math.DivArgs> requestStream, IServerStreamWriter<global::Math.DivReply> responseStream, ServerCallContext context)
{
throw new RpcException(new Status(StatusCode.Unimplemented, ""));
}
@@ -200,7 +200,7 @@ namespace Math {
/// generates up to limit numbers; otherwise it continues until the call is
/// canceled. Unlike Fib above, Fib has no final FibReply.
/// </summary>
- public virtual Task Fib(global::Math.FibArgs request, IServerStreamWriter<global::Math.Num> responseStream, ServerCallContext context)
+ public virtual global::System.Threading.Tasks.Task Fib(global::Math.FibArgs request, IServerStreamWriter<global::Math.Num> responseStream, ServerCallContext context)
{
throw new RpcException(new Status(StatusCode.Unimplemented, ""));
}
@@ -209,7 +209,7 @@ namespace Math {
/// Sum sums a stream of numbers, returning the final result once the stream
/// is closed.
/// </summary>
- public virtual Task<global::Math.Num> Sum(IAsyncStreamReader<global::Math.Num> requestStream, ServerCallContext context)
+ public virtual global::System.Threading.Tasks.Task<global::Math.Num> Sum(IAsyncStreamReader<global::Math.Num> requestStream, ServerCallContext context)
{
throw new RpcException(new Status(StatusCode.Unimplemented, ""));
}
diff --git a/src/csharp/Grpc.HealthCheck/HealthGrpc.cs b/src/csharp/Grpc.HealthCheck/HealthGrpc.cs
index 967d1170be..51c6a39b1d 100644
--- a/src/csharp/Grpc.HealthCheck/HealthGrpc.cs
+++ b/src/csharp/Grpc.HealthCheck/HealthGrpc.cs
@@ -72,13 +72,13 @@ namespace Grpc.Health.V1 {
[System.Obsolete("Service implementations should inherit from the generated abstract base class instead.")]
public interface IHealth
{
- Task<global::Grpc.Health.V1.HealthCheckResponse> Check(global::Grpc.Health.V1.HealthCheckRequest request, ServerCallContext context);
+ global::System.Threading.Tasks.Task<global::Grpc.Health.V1.HealthCheckResponse> Check(global::Grpc.Health.V1.HealthCheckRequest request, ServerCallContext context);
}
/// <summary>Base class for server-side implementations of Health</summary>
public abstract class HealthBase
{
- public virtual Task<global::Grpc.Health.V1.HealthCheckResponse> Check(global::Grpc.Health.V1.HealthCheckRequest request, ServerCallContext context)
+ public virtual global::System.Threading.Tasks.Task<global::Grpc.Health.V1.HealthCheckResponse> Check(global::Grpc.Health.V1.HealthCheckRequest request, ServerCallContext context)
{
throw new RpcException(new Status(StatusCode.Unimplemented, ""));
}
diff --git a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
index b4572756f2..9eaf6bf7ce 100644
--- a/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
+++ b/src/csharp/Grpc.IntegrationTesting/ClientRunners.cs
@@ -142,8 +142,7 @@ namespace Grpc.IntegrationTesting
for (int i = 0; i < outstandingRpcsPerChannel; i++)
{
var timer = CreateTimer(loadParams, 1.0 / this.channels.Count / outstandingRpcsPerChannel);
- var threadBody = GetThreadBody(channel, timer);
- this.runnerTasks.Add(Task.Factory.StartNew(threadBody, TaskCreationOptions.LongRunning));
+ this.runnerTasks.Add(RunClientAsync(channel, timer));
}
}
}
@@ -269,38 +268,30 @@ namespace Grpc.IntegrationTesting
}
}
- private Action GetThreadBody(Channel channel, IInterarrivalTimer timer)
+ private Task RunClientAsync(Channel channel, IInterarrivalTimer timer)
{
if (payloadConfig.PayloadCase == PayloadConfig.PayloadOneofCase.BytebufParams)
{
GrpcPreconditions.CheckArgument(clientType == ClientType.ASYNC_CLIENT, "Generic client only supports async API");
GrpcPreconditions.CheckArgument(rpcType == RpcType.STREAMING, "Generic client only supports streaming calls");
- return () =>
- {
- RunGenericStreamingAsync(channel, timer).Wait();
- };
+ return RunGenericStreamingAsync(channel, timer);
}
GrpcPreconditions.CheckNotNull(payloadConfig.SimpleParams);
if (clientType == ClientType.SYNC_CLIENT)
{
GrpcPreconditions.CheckArgument(rpcType == RpcType.UNARY, "Sync client can only be used for Unary calls in C#");
- return () => RunUnary(channel, timer);
+ // create a dedicated thread for the synchronous client
+ return Task.Factory.StartNew(() => RunUnary(channel, timer), TaskCreationOptions.LongRunning);
}
else if (clientType == ClientType.ASYNC_CLIENT)
{
switch (rpcType)
{
case RpcType.UNARY:
- return () =>
- {
- RunUnaryAsync(channel, timer).Wait();
- };
+ return RunUnaryAsync(channel, timer);
case RpcType.STREAMING:
- return () =>
- {
- RunStreamingPingPongAsync(channel, timer).Wait();
- };
+ return RunStreamingPingPongAsync(channel, timer);
}
}
throw new ArgumentException("Unsupported configuration.");
diff --git a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
index b3b1abf1bc..cff8508631 100644
--- a/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
+++ b/src/csharp/Grpc.IntegrationTesting/InteropClient.cs
@@ -492,6 +492,10 @@ namespace Grpc.IntegrationTesting
{
// Deadline was reached before write has started. Eat the exception and continue.
}
+ catch (RpcException)
+ {
+ // Deadline was reached before write has started. Eat the exception and continue.
+ }
var ex = Assert.ThrowsAsync<RpcException>(async () => await call.ResponseStream.MoveNext());
// We can't guarantee the status code always DeadlineExceeded. See issue #2685.
diff --git a/src/csharp/Grpc.IntegrationTesting/MetricsGrpc.cs b/src/csharp/Grpc.IntegrationTesting/MetricsGrpc.cs
index aa4f1c5c3e..9d31d1c514 100644
--- a/src/csharp/Grpc.IntegrationTesting/MetricsGrpc.cs
+++ b/src/csharp/Grpc.IntegrationTesting/MetricsGrpc.cs
@@ -112,11 +112,11 @@ namespace Grpc.Testing {
/// Returns the values of all the gauges that are currently being maintained by
/// the service
/// </summary>
- Task GetAllGauges(global::Grpc.Testing.EmptyMessage request, IServerStreamWriter<global::Grpc.Testing.GaugeResponse> responseStream, ServerCallContext context);
+ global::System.Threading.Tasks.Task GetAllGauges(global::Grpc.Testing.EmptyMessage request, IServerStreamWriter<global::Grpc.Testing.GaugeResponse> responseStream, ServerCallContext context);
/// <summary>
/// Returns the value of one gauge
/// </summary>
- Task<global::Grpc.Testing.GaugeResponse> GetGauge(global::Grpc.Testing.GaugeRequest request, ServerCallContext context);
+ global::System.Threading.Tasks.Task<global::Grpc.Testing.GaugeResponse> GetGauge(global::Grpc.Testing.GaugeRequest request, ServerCallContext context);
}
/// <summary>Base class for server-side implementations of MetricsService</summary>
@@ -126,7 +126,7 @@ namespace Grpc.Testing {
/// Returns the values of all the gauges that are currently being maintained by
/// the service
/// </summary>
- public virtual Task GetAllGauges(global::Grpc.Testing.EmptyMessage request, IServerStreamWriter<global::Grpc.Testing.GaugeResponse> responseStream, ServerCallContext context)
+ public virtual global::System.Threading.Tasks.Task GetAllGauges(global::Grpc.Testing.EmptyMessage request, IServerStreamWriter<global::Grpc.Testing.GaugeResponse> responseStream, ServerCallContext context)
{
throw new RpcException(new Status(StatusCode.Unimplemented, ""));
}
@@ -134,7 +134,7 @@ namespace Grpc.Testing {
/// <summary>
/// Returns the value of one gauge
/// </summary>
- public virtual Task<global::Grpc.Testing.GaugeResponse> GetGauge(global::Grpc.Testing.GaugeRequest request, ServerCallContext context)
+ public virtual global::System.Threading.Tasks.Task<global::Grpc.Testing.GaugeResponse> GetGauge(global::Grpc.Testing.GaugeRequest request, ServerCallContext context)
{
throw new RpcException(new Status(StatusCode.Unimplemented, ""));
}
diff --git a/src/csharp/Grpc.IntegrationTesting/ServicesGrpc.cs b/src/csharp/Grpc.IntegrationTesting/ServicesGrpc.cs
index 42bf5e0b58..f7071ebf6b 100644
--- a/src/csharp/Grpc.IntegrationTesting/ServicesGrpc.cs
+++ b/src/csharp/Grpc.IntegrationTesting/ServicesGrpc.cs
@@ -111,12 +111,12 @@ namespace Grpc.Testing {
/// One request followed by one response.
/// The server returns the client payload as-is.
/// </summary>
- Task<global::Grpc.Testing.SimpleResponse> UnaryCall(global::Grpc.Testing.SimpleRequest request, ServerCallContext context);
+ global::System.Threading.Tasks.Task<global::Grpc.Testing.SimpleResponse> UnaryCall(global::Grpc.Testing.SimpleRequest request, ServerCallContext context);
/// <summary>
/// One request followed by one response.
/// The server returns the client payload as-is.
/// </summary>
- Task StreamingCall(IAsyncStreamReader<global::Grpc.Testing.SimpleRequest> requestStream, IServerStreamWriter<global::Grpc.Testing.SimpleResponse> responseStream, ServerCallContext context);
+ global::System.Threading.Tasks.Task StreamingCall(IAsyncStreamReader<global::Grpc.Testing.SimpleRequest> requestStream, IServerStreamWriter<global::Grpc.Testing.SimpleResponse> responseStream, ServerCallContext context);
}
/// <summary>Base class for server-side implementations of BenchmarkService</summary>
@@ -126,7 +126,7 @@ namespace Grpc.Testing {
/// One request followed by one response.
/// The server returns the client payload as-is.
/// </summary>
- public virtual Task<global::Grpc.Testing.SimpleResponse> UnaryCall(global::Grpc.Testing.SimpleRequest request, ServerCallContext context)
+ public virtual global::System.Threading.Tasks.Task<global::Grpc.Testing.SimpleResponse> UnaryCall(global::Grpc.Testing.SimpleRequest request, ServerCallContext context)
{
throw new RpcException(new Status(StatusCode.Unimplemented, ""));
}
@@ -135,7 +135,7 @@ namespace Grpc.Testing {
/// One request followed by one response.
/// The server returns the client payload as-is.
/// </summary>
- public virtual Task StreamingCall(IAsyncStreamReader<global::Grpc.Testing.SimpleRequest> requestStream, IServerStreamWriter<global::Grpc.Testing.SimpleResponse> responseStream, ServerCallContext context)
+ public virtual global::System.Threading.Tasks.Task StreamingCall(IAsyncStreamReader<global::Grpc.Testing.SimpleRequest> requestStream, IServerStreamWriter<global::Grpc.Testing.SimpleResponse> responseStream, ServerCallContext context)
{
throw new RpcException(new Status(StatusCode.Unimplemented, ""));
}
@@ -375,7 +375,7 @@ namespace Grpc.Testing {
/// and once the shutdown has finished, the OK status is sent to terminate
/// this RPC.
/// </summary>
- Task RunServer(IAsyncStreamReader<global::Grpc.Testing.ServerArgs> requestStream, IServerStreamWriter<global::Grpc.Testing.ServerStatus> responseStream, ServerCallContext context);
+ global::System.Threading.Tasks.Task RunServer(IAsyncStreamReader<global::Grpc.Testing.ServerArgs> requestStream, IServerStreamWriter<global::Grpc.Testing.ServerStatus> responseStream, ServerCallContext context);
/// <summary>
/// Start client with specified workload.
/// First request sent specifies the ClientConfig followed by ClientStatus
@@ -384,15 +384,15 @@ namespace Grpc.Testing {
/// and once the shutdown has finished, the OK status is sent to terminate
/// this RPC.
/// </summary>
- Task RunClient(IAsyncStreamReader<global::Grpc.Testing.ClientArgs> requestStream, IServerStreamWriter<global::Grpc.Testing.ClientStatus> responseStream, ServerCallContext context);
+ global::System.Threading.Tasks.Task RunClient(IAsyncStreamReader<global::Grpc.Testing.ClientArgs> requestStream, IServerStreamWriter<global::Grpc.Testing.ClientStatus> responseStream, ServerCallContext context);
/// <summary>
/// Just return the core count - unary call
/// </summary>
- Task<global::Grpc.Testing.CoreResponse> CoreCount(global::Grpc.Testing.CoreRequest request, ServerCallContext context);
+ global::System.Threading.Tasks.Task<global::Grpc.Testing.CoreResponse> CoreCount(global::Grpc.Testing.CoreRequest request, ServerCallContext context);
/// <summary>
/// Quit this worker
/// </summary>
- Task<global::Grpc.Testing.Void> QuitWorker(global::Grpc.Testing.Void request, ServerCallContext context);
+ global::System.Threading.Tasks.Task<global::Grpc.Testing.Void> QuitWorker(global::Grpc.Testing.Void request, ServerCallContext context);
}
/// <summary>Base class for server-side implementations of WorkerService</summary>
@@ -406,7 +406,7 @@ namespace Grpc.Testing {
/// and once the shutdown has finished, the OK status is sent to terminate
/// this RPC.
/// </summary>
- public virtual Task RunServer(IAsyncStreamReader<global::Grpc.Testing.ServerArgs> requestStream, IServerStreamWriter<global::Grpc.Testing.ServerStatus> responseStream, ServerCallContext context)
+ public virtual global::System.Threading.Tasks.Task RunServer(IAsyncStreamReader<global::Grpc.Testing.ServerArgs> requestStream, IServerStreamWriter<global::Grpc.Testing.ServerStatus> responseStream, ServerCallContext context)
{
throw new RpcException(new Status(StatusCode.Unimplemented, ""));
}
@@ -419,7 +419,7 @@ namespace Grpc.Testing {
/// and once the shutdown has finished, the OK status is sent to terminate
/// this RPC.
/// </summary>
- public virtual Task RunClient(IAsyncStreamReader<global::Grpc.Testing.ClientArgs> requestStream, IServerStreamWriter<global::Grpc.Testing.ClientStatus> responseStream, ServerCallContext context)
+ public virtual global::System.Threading.Tasks.Task RunClient(IAsyncStreamReader<global::Grpc.Testing.ClientArgs> requestStream, IServerStreamWriter<global::Grpc.Testing.ClientStatus> responseStream, ServerCallContext context)
{
throw new RpcException(new Status(StatusCode.Unimplemented, ""));
}
@@ -427,7 +427,7 @@ namespace Grpc.Testing {
/// <summary>
/// Just return the core count - unary call
/// </summary>
- public virtual Task<global::Grpc.Testing.CoreResponse> CoreCount(global::Grpc.Testing.CoreRequest request, ServerCallContext context)
+ public virtual global::System.Threading.Tasks.Task<global::Grpc.Testing.CoreResponse> CoreCount(global::Grpc.Testing.CoreRequest request, ServerCallContext context)
{
throw new RpcException(new Status(StatusCode.Unimplemented, ""));
}
@@ -435,7 +435,7 @@ namespace Grpc.Testing {
/// <summary>
/// Quit this worker
/// </summary>
- public virtual Task<global::Grpc.Testing.Void> QuitWorker(global::Grpc.Testing.Void request, ServerCallContext context)
+ public virtual global::System.Threading.Tasks.Task<global::Grpc.Testing.Void> QuitWorker(global::Grpc.Testing.Void request, ServerCallContext context)
{
throw new RpcException(new Status(StatusCode.Unimplemented, ""));
}
diff --git a/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs b/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs
index f1878cbb55..cf43a77118 100644
--- a/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs
+++ b/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs
@@ -196,34 +196,34 @@ namespace Grpc.Testing {
/// <summary>
/// One empty request followed by one empty response.
/// </summary>
- Task<global::Grpc.Testing.Empty> EmptyCall(global::Grpc.Testing.Empty request, ServerCallContext context);
+ global::System.Threading.Tasks.Task<global::Grpc.Testing.Empty> EmptyCall(global::Grpc.Testing.Empty request, ServerCallContext context);
/// <summary>
/// One request followed by one response.
/// </summary>
- Task<global::Grpc.Testing.SimpleResponse> UnaryCall(global::Grpc.Testing.SimpleRequest request, ServerCallContext context);
+ global::System.Threading.Tasks.Task<global::Grpc.Testing.SimpleResponse> UnaryCall(global::Grpc.Testing.SimpleRequest request, ServerCallContext context);
/// <summary>
/// One request followed by a sequence of responses (streamed download).
/// The server returns the payload with client desired type and sizes.
/// </summary>
- Task StreamingOutputCall(global::Grpc.Testing.StreamingOutputCallRequest request, IServerStreamWriter<global::Grpc.Testing.StreamingOutputCallResponse> responseStream, ServerCallContext context);
+ global::System.Threading.Tasks.Task StreamingOutputCall(global::Grpc.Testing.StreamingOutputCallRequest request, IServerStreamWriter<global::Grpc.Testing.StreamingOutputCallResponse> responseStream, ServerCallContext context);
/// <summary>
/// A sequence of requests followed by one response (streamed upload).
/// The server returns the aggregated size of client payload as the result.
/// </summary>
- Task<global::Grpc.Testing.StreamingInputCallResponse> StreamingInputCall(IAsyncStreamReader<global::Grpc.Testing.StreamingInputCallRequest> requestStream, ServerCallContext context);
+ global::System.Threading.Tasks.Task<global::Grpc.Testing.StreamingInputCallResponse> StreamingInputCall(IAsyncStreamReader<global::Grpc.Testing.StreamingInputCallRequest> requestStream, ServerCallContext context);
/// <summary>
/// A sequence of requests with each request served by the server immediately.
/// As one request could lead to multiple responses, this interface
/// demonstrates the idea of full duplexing.
/// </summary>
- Task FullDuplexCall(IAsyncStreamReader<global::Grpc.Testing.StreamingOutputCallRequest> requestStream, IServerStreamWriter<global::Grpc.Testing.StreamingOutputCallResponse> responseStream, ServerCallContext context);
+ global::System.Threading.Tasks.Task FullDuplexCall(IAsyncStreamReader<global::Grpc.Testing.StreamingOutputCallRequest> requestStream, IServerStreamWriter<global::Grpc.Testing.StreamingOutputCallResponse> responseStream, ServerCallContext context);
/// <summary>
/// A sequence of requests followed by a sequence of responses.
/// The server buffers all the client requests and then serves them in order. A
/// stream of responses are returned to the client when the server starts with
/// first request.
/// </summary>
- Task HalfDuplexCall(IAsyncStreamReader<global::Grpc.Testing.StreamingOutputCallRequest> requestStream, IServerStreamWriter<global::Grpc.Testing.StreamingOutputCallResponse> responseStream, ServerCallContext context);
+ global::System.Threading.Tasks.Task HalfDuplexCall(IAsyncStreamReader<global::Grpc.Testing.StreamingOutputCallRequest> requestStream, IServerStreamWriter<global::Grpc.Testing.StreamingOutputCallResponse> responseStream, ServerCallContext context);
}
/// <summary>Base class for server-side implementations of TestService</summary>
@@ -232,7 +232,7 @@ namespace Grpc.Testing {
/// <summary>
/// One empty request followed by one empty response.
/// </summary>
- public virtual Task<global::Grpc.Testing.Empty> EmptyCall(global::Grpc.Testing.Empty request, ServerCallContext context)
+ public virtual global::System.Threading.Tasks.Task<global::Grpc.Testing.Empty> EmptyCall(global::Grpc.Testing.Empty request, ServerCallContext context)
{
throw new RpcException(new Status(StatusCode.Unimplemented, ""));
}
@@ -240,7 +240,7 @@ namespace Grpc.Testing {
/// <summary>
/// One request followed by one response.
/// </summary>
- public virtual Task<global::Grpc.Testing.SimpleResponse> UnaryCall(global::Grpc.Testing.SimpleRequest request, ServerCallContext context)
+ public virtual global::System.Threading.Tasks.Task<global::Grpc.Testing.SimpleResponse> UnaryCall(global::Grpc.Testing.SimpleRequest request, ServerCallContext context)
{
throw new RpcException(new Status(StatusCode.Unimplemented, ""));
}
@@ -249,7 +249,7 @@ namespace Grpc.Testing {
/// One request followed by a sequence of responses (streamed download).
/// The server returns the payload with client desired type and sizes.
/// </summary>
- public virtual Task StreamingOutputCall(global::Grpc.Testing.StreamingOutputCallRequest request, IServerStreamWriter<global::Grpc.Testing.StreamingOutputCallResponse> responseStream, ServerCallContext context)
+ public virtual global::System.Threading.Tasks.Task StreamingOutputCall(global::Grpc.Testing.StreamingOutputCallRequest request, IServerStreamWriter<global::Grpc.Testing.StreamingOutputCallResponse> responseStream, ServerCallContext context)
{
throw new RpcException(new Status(StatusCode.Unimplemented, ""));
}
@@ -258,7 +258,7 @@ namespace Grpc.Testing {
/// A sequence of requests followed by one response (streamed upload).
/// The server returns the aggregated size of client payload as the result.
/// </summary>
- public virtual Task<global::Grpc.Testing.StreamingInputCallResponse> StreamingInputCall(IAsyncStreamReader<global::Grpc.Testing.StreamingInputCallRequest> requestStream, ServerCallContext context)
+ public virtual global::System.Threading.Tasks.Task<global::Grpc.Testing.StreamingInputCallResponse> StreamingInputCall(IAsyncStreamReader<global::Grpc.Testing.StreamingInputCallRequest> requestStream, ServerCallContext context)
{
throw new RpcException(new Status(StatusCode.Unimplemented, ""));
}
@@ -268,7 +268,7 @@ namespace Grpc.Testing {
/// As one request could lead to multiple responses, this interface
/// demonstrates the idea of full duplexing.
/// </summary>
- public virtual Task FullDuplexCall(IAsyncStreamReader<global::Grpc.Testing.StreamingOutputCallRequest> requestStream, IServerStreamWriter<global::Grpc.Testing.StreamingOutputCallResponse> responseStream, ServerCallContext context)
+ public virtual global::System.Threading.Tasks.Task FullDuplexCall(IAsyncStreamReader<global::Grpc.Testing.StreamingOutputCallRequest> requestStream, IServerStreamWriter<global::Grpc.Testing.StreamingOutputCallResponse> responseStream, ServerCallContext context)
{
throw new RpcException(new Status(StatusCode.Unimplemented, ""));
}
@@ -279,7 +279,7 @@ namespace Grpc.Testing {
/// stream of responses are returned to the client when the server starts with
/// first request.
/// </summary>
- public virtual Task HalfDuplexCall(IAsyncStreamReader<global::Grpc.Testing.StreamingOutputCallRequest> requestStream, IServerStreamWriter<global::Grpc.Testing.StreamingOutputCallResponse> responseStream, ServerCallContext context)
+ public virtual global::System.Threading.Tasks.Task HalfDuplexCall(IAsyncStreamReader<global::Grpc.Testing.StreamingOutputCallRequest> requestStream, IServerStreamWriter<global::Grpc.Testing.StreamingOutputCallResponse> responseStream, ServerCallContext context)
{
throw new RpcException(new Status(StatusCode.Unimplemented, ""));
}
@@ -525,7 +525,7 @@ namespace Grpc.Testing {
/// <summary>
/// A call that no server should implement
/// </summary>
- Task<global::Grpc.Testing.Empty> UnimplementedCall(global::Grpc.Testing.Empty request, ServerCallContext context);
+ global::System.Threading.Tasks.Task<global::Grpc.Testing.Empty> UnimplementedCall(global::Grpc.Testing.Empty request, ServerCallContext context);
}
/// <summary>Base class for server-side implementations of UnimplementedService</summary>
@@ -534,7 +534,7 @@ namespace Grpc.Testing {
/// <summary>
/// A call that no server should implement
/// </summary>
- public virtual Task<global::Grpc.Testing.Empty> UnimplementedCall(global::Grpc.Testing.Empty request, ServerCallContext context)
+ public virtual global::System.Threading.Tasks.Task<global::Grpc.Testing.Empty> UnimplementedCall(global::Grpc.Testing.Empty request, ServerCallContext context)
{
throw new RpcException(new Status(StatusCode.Unimplemented, ""));
}
@@ -669,19 +669,19 @@ namespace Grpc.Testing {
[System.Obsolete("Service implementations should inherit from the generated abstract base class instead.")]
public interface IReconnectService
{
- Task<global::Grpc.Testing.Empty> Start(global::Grpc.Testing.ReconnectParams request, ServerCallContext context);
- Task<global::Grpc.Testing.ReconnectInfo> Stop(global::Grpc.Testing.Empty request, ServerCallContext context);
+ global::System.Threading.Tasks.Task<global::Grpc.Testing.Empty> Start(global::Grpc.Testing.ReconnectParams request, ServerCallContext context);
+ global::System.Threading.Tasks.Task<global::Grpc.Testing.ReconnectInfo> Stop(global::Grpc.Testing.Empty request, ServerCallContext context);
}
/// <summary>Base class for server-side implementations of ReconnectService</summary>
public abstract class ReconnectServiceBase
{
- public virtual Task<global::Grpc.Testing.Empty> Start(global::Grpc.Testing.ReconnectParams request, ServerCallContext context)
+ public virtual global::System.Threading.Tasks.Task<global::Grpc.Testing.Empty> Start(global::Grpc.Testing.ReconnectParams request, ServerCallContext context)
{
throw new RpcException(new Status(StatusCode.Unimplemented, ""));
}
- public virtual Task<global::Grpc.Testing.ReconnectInfo> Stop(global::Grpc.Testing.Empty request, ServerCallContext context)
+ public virtual global::System.Threading.Tasks.Task<global::Grpc.Testing.ReconnectInfo> Stop(global::Grpc.Testing.Empty request, ServerCallContext context)
{
throw new RpcException(new Status(StatusCode.Unimplemented, ""));
}
diff --git a/src/csharp/build_packages.bat b/src/csharp/build_packages.bat
index 9a60be26b6..7520b0f81a 100644
--- a/src/csharp/build_packages.bat
+++ b/src/csharp/build_packages.bat
@@ -1,7 +1,7 @@
@rem Builds gRPC NuGet packages
@rem Current package versions
-set VERSION=0.14.0-dev
+set VERSION=0.15.0-dev
set PROTOBUF_VERSION=3.0.0-beta2
@rem Packages that depend on prerelease packages (like Google.Protobuf) need to have prerelease suffix as well.
diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c
index aeef8a79e9..5b8ff9b819 100644
--- a/src/csharp/ext/grpc_csharp_ext.c
+++ b/src/csharp/ext/grpc_csharp_ext.c
@@ -715,10 +715,11 @@ grpcsharp_call_send_close_from_client(grpc_call *call,
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(
grpc_call *call, grpcsharp_batch_context *ctx, grpc_status_code status_code,
const char *status_details, grpc_metadata_array *trailing_metadata,
- int32_t send_empty_initial_metadata) {
+ int32_t send_empty_initial_metadata, const char* optional_send_buffer,
+ size_t optional_send_buffer_len, uint32_t write_flags) {
/* TODO: don't use magic number */
- grpc_op ops[2];
- size_t nops = send_empty_initial_metadata ? 2 : 1;
+ grpc_op ops[3];
+ size_t nops = 1;
ops[0].op = GRPC_OP_SEND_STATUS_FROM_SERVER;
ops[0].data.send_status_from_server.status = status_code;
ops[0].data.send_status_from_server.status_details =
@@ -731,12 +732,23 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(
ctx->send_status_from_server.trailing_metadata.metadata;
ops[0].flags = 0;
ops[0].reserved = NULL;
- ops[1].op = GRPC_OP_SEND_INITIAL_METADATA;
- ops[1].data.send_initial_metadata.count = 0;
- ops[1].data.send_initial_metadata.metadata = NULL;
- ops[1].flags = 0;
- ops[1].reserved = NULL;
-
+ if (optional_send_buffer) {
+ ops[nops].op = GRPC_OP_SEND_MESSAGE;
+ ctx->send_message = string_to_byte_buffer(optional_send_buffer,
+ optional_send_buffer_len);
+ ops[nops].data.send_message = ctx->send_message;
+ ops[nops].flags = write_flags;
+ ops[nops].reserved = NULL;
+ nops ++;
+ }
+ if (send_empty_initial_metadata) {
+ ops[nops].op = GRPC_OP_SEND_INITIAL_METADATA;
+ ops[nops].data.send_initial_metadata.count = 0;
+ ops[nops].data.send_initial_metadata.metadata = NULL;
+ ops[nops].flags = 0;
+ ops[nops].reserved = NULL;
+ nops++;
+ }
return grpc_call_start_batch(call, ops, nops, ctx, NULL);
}
diff --git a/src/node/test/math/math_server.js b/src/node/test/math/math_server.js
index fa05ed0165..5e3d1dd864 100644
--- a/src/node/test/math/math_server.js
+++ b/src/node/test/math/math_server.js
@@ -68,7 +68,7 @@ function mathDiv(call, cb) {
function mathFib(stream) {
// Here, call is a standard writable Node object Stream
var previous = 0, current = 1;
- for (var i = 0; i < stream.request.limit; i++) {
+ for (var i = 0; i < stream.request.getLimit(); i++) {
var response = new math.Num();
response.setNum(current);
stream.write(response);
diff --git a/src/node/tools/bin/protoc.js b/src/node/tools/bin/protoc.js
index 4d50c94b0f..53fc5dc428 100755
--- a/src/node/tools/bin/protoc.js
+++ b/src/node/tools/bin/protoc.js
@@ -47,10 +47,11 @@ var exe_ext = process.platform === 'win32' ? '.exe' : '';
var protoc = path.resolve(__dirname, 'protoc' + exe_ext);
-execFile(protoc, process.argv.slice(2), function(error, stdout, stderr) {
+var child_process = execFile(protoc, process.argv.slice(2), function(error, stdout, stderr) {
if (error) {
throw error;
}
- console.log(stdout);
- console.log(stderr);
});
+
+child_process.stdout.pipe(process.stdout);
+child_process.stderr.pipe(process.stderr);
diff --git a/src/node/tools/bin/protoc_plugin.js b/src/node/tools/bin/protoc_plugin.js
index 281ec0d85e..857882e1c3 100755
--- a/src/node/tools/bin/protoc_plugin.js
+++ b/src/node/tools/bin/protoc_plugin.js
@@ -47,10 +47,12 @@ var exe_ext = process.platform === 'win32' ? '.exe' : '';
var plugin = path.resolve(__dirname, 'grpc_node_plugin' + exe_ext);
-execFile(plugin, process.argv.slice(2), function(error, stdout, stderr) {
+var child_process = execFile(plugin, process.argv.slice(2), {encoding: 'buffer'}, function(error, stdout, stderr) {
if (error) {
throw error;
}
- console.log(stdout);
- console.log(stderr);
});
+
+process.stdin.pipe(child_process.stdin);
+child_process.stdout.pipe(process.stdout);
+child_process.stderr.pipe(process.stderr);
diff --git a/src/node/tools/package.json b/src/node/tools/package.json
index d98ed0b1fc..d4849c2e38 100644
--- a/src/node/tools/package.json
+++ b/src/node/tools/package.json
@@ -1,6 +1,6 @@
{
"name": "grpc-tools",
- "version": "0.14.0-dev",
+ "version": "0.15.0-dev",
"author": "Google Inc.",
"description": "Tools for developing with gRPC on Node.js",
"homepage": "http://www.grpc.io/",
@@ -16,8 +16,8 @@
}
],
"bin": {
- "grpc-tools-protoc": "./bin/protoc.js",
- "grpc-tools-plugin": "./bin/protoc_plugin.js"
+ "grpc_tools_node_protoc": "./bin/protoc.js",
+ "grpc_tools_node_protoc_plugin": "./bin/protoc_plugin.js"
},
"scripts": {
"install": "./node_modules/.bin/node-pre-gyp install"
diff --git a/src/objective-c/BoringSSL.podspec b/src/objective-c/BoringSSL.podspec
index 4a6df910a7..7d1de80716 100644
--- a/src/objective-c/BoringSSL.podspec
+++ b/src/objective-c/BoringSSL.podspec
@@ -31,7 +31,7 @@
Pod::Spec.new do |s|
s.name = 'BoringSSL'
- s.version = '2.0'
+ s.version = '3.0'
s.summary = 'BoringSSL is a fork of OpenSSL that is designed to meet Google’s needs.'
# Adapted from the homepage:
s.description = <<-DESC
@@ -67,7 +67,7 @@ Pod::Spec.new do |s|
s.authors = 'Adam Langley', 'David Benjamin', 'Matt Braithwaite'
s.source = { :git => 'https://boringssl.googlesource.com/boringssl',
- :tag => 'version_for_cocoapods_2.0' }
+ :tag => 'version_for_cocoapods_3.0' }
s.source_files = 'ssl/*.{h,c}',
'ssl/**/*.{h,c}',
diff --git a/src/php/tests/interop/interop_client.php b/src/php/tests/interop/interop_client.php
index aebf80f6bf..565bfce74f 100755
--- a/src/php/tests/interop/interop_client.php
+++ b/src/php/tests/interop/interop_client.php
@@ -1,7 +1,7 @@
<?php
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -388,141 +388,158 @@ function timeoutOnSleepingServer($stub)
'Call status was not DEADLINE_EXCEEDED');
}
-$args = getopt('', ['server_host:', 'server_port:', 'test_case:',
- 'use_tls::', 'use_test_ca::',
- 'server_host_override:', 'oauth_scope:',
- 'default_service_account:', ]);
-if (!array_key_exists('server_host', $args)) {
- throw new Exception('Missing argument: --server_host is required');
-}
-if (!array_key_exists('server_port', $args)) {
- throw new Exception('Missing argument: --server_port is required');
-}
-if (!array_key_exists('test_case', $args)) {
- throw new Exception('Missing argument: --test_case is required');
-}
-
-if ($args['server_port'] == 443) {
- $server_address = $args['server_host'];
-} else {
- $server_address = $args['server_host'].':'.$args['server_port'];
-}
+function _makeStub($args)
+{
+ if (!array_key_exists('server_host', $args)) {
+ throw new Exception('Missing argument: --server_host is required');
+ }
+ if (!array_key_exists('server_port', $args)) {
+ throw new Exception('Missing argument: --server_port is required');
+ }
+ if (!array_key_exists('test_case', $args)) {
+ throw new Exception('Missing argument: --test_case is required');
+ }
-$test_case = $args['test_case'];
+ if ($args['server_port'] == 443) {
+ $server_address = $args['server_host'];
+ } else {
+ $server_address = $args['server_host'].':'.$args['server_port'];
+ }
-$host_override = 'foo.test.google.fr';
-if (array_key_exists('server_host_override', $args)) {
- $host_override = $args['server_host_override'];
-}
+ $test_case = $args['test_case'];
-$use_tls = false;
-if (array_key_exists('use_tls', $args) &&
- $args['use_tls'] != 'false') {
- $use_tls = true;
-}
+ $host_override = 'foo.test.google.fr';
+ if (array_key_exists('server_host_override', $args)) {
+ $host_override = $args['server_host_override'];
+ }
-$use_test_ca = false;
-if (array_key_exists('use_test_ca', $args) &&
- $args['use_test_ca'] != 'false') {
- $use_test_ca = true;
-}
+ $use_tls = false;
+ if (array_key_exists('use_tls', $args) &&
+ $args['use_tls'] != 'false') {
+ $use_tls = true;
+ }
-$opts = [];
+ $use_test_ca = false;
+ if (array_key_exists('use_test_ca', $args) &&
+ $args['use_test_ca'] != 'false') {
+ $use_test_ca = true;
+ }
-if ($use_tls) {
- if ($use_test_ca) {
- $ssl_credentials = Grpc\ChannelCredentials::createSsl(
- file_get_contents(dirname(__FILE__).'/../data/ca.pem'));
+ $opts = [];
+
+ if ($use_tls) {
+ if ($use_test_ca) {
+ $ssl_credentials = Grpc\ChannelCredentials::createSsl(
+ file_get_contents(dirname(__FILE__).'/../data/ca.pem'));
+ } else {
+ $ssl_credentials = Grpc\ChannelCredentials::createSsl();
+ }
+ $opts['credentials'] = $ssl_credentials;
+ $opts['grpc.ssl_target_name_override'] = $host_override;
} else {
- $ssl_credentials = Grpc\ChannelCredentials::createSsl();
+ $opts['credentials'] = Grpc\ChannelCredentials::createInsecure();
}
- $opts['credentials'] = $ssl_credentials;
- $opts['grpc.ssl_target_name_override'] = $host_override;
-} else {
- $opts['credentials'] = Grpc\ChannelCredentials::createInsecure();
-}
-if (in_array($test_case, ['service_account_creds',
- 'compute_engine_creds', 'jwt_token_creds', ])) {
- if ($test_case == 'jwt_token_creds') {
- $auth_credentials = ApplicationDefaultCredentials::getCredentials();
- } else {
+ if (in_array($test_case, ['service_account_creds',
+ 'compute_engine_creds', 'jwt_token_creds', ])) {
+ if ($test_case == 'jwt_token_creds') {
+ $auth_credentials = ApplicationDefaultCredentials::getCredentials();
+ } else {
+ $auth_credentials = ApplicationDefaultCredentials::getCredentials(
+ $args['oauth_scope']
+ );
+ }
+ $opts['update_metadata'] = $auth_credentials->getUpdateMetadataFunc();
+ }
+
+ if ($test_case == 'oauth2_auth_token') {
$auth_credentials = ApplicationDefaultCredentials::getCredentials(
$args['oauth_scope']
);
+ $token = $auth_credentials->fetchAuthToken();
+ $update_metadata =
+ function ($metadata,
+ $authUri = null,
+ ClientInterface $client = null) use ($token) {
+ $metadata_copy = $metadata;
+ $metadata_copy[CredentialsLoader::AUTH_METADATA_KEY] =
+ [sprintf('%s %s',
+ $token['token_type'],
+ $token['access_token'])];
+
+ return $metadata_copy;
+ };
+ $opts['update_metadata'] = $update_metadata;
}
- $opts['update_metadata'] = $auth_credentials->getUpdateMetadataFunc();
+
+ $stub = new grpc\testing\TestServiceClient($server_address, $opts);
+
+ return $stub;
}
-if ($test_case == 'oauth2_auth_token') {
- $auth_credentials = ApplicationDefaultCredentials::getCredentials(
- $args['oauth_scope']
- );
- $token = $auth_credentials->fetchAuthToken();
- $update_metadata =
- function ($metadata,
- $authUri = null,
- ClientInterface $client = null) use ($token) {
- $metadata_copy = $metadata;
- $metadata_copy[CredentialsLoader::AUTH_METADATA_KEY] =
- [sprintf('%s %s',
- $token['token_type'],
- $token['access_token'])];
-
- return $metadata_copy;
- };
- $opts['update_metadata'] = $update_metadata;
+function interop_main($args, $stub = false) {
+ if (!$stub) {
+ $stub = _makeStub($args);
+ }
+
+ $test_case = $args['test_case'];
+ echo "Running test case $test_case\n";
+
+ switch ($test_case) {
+ case 'empty_unary':
+ emptyUnary($stub);
+ break;
+ case 'large_unary':
+ largeUnary($stub);
+ break;
+ case 'client_streaming':
+ clientStreaming($stub);
+ break;
+ case 'server_streaming':
+ serverStreaming($stub);
+ break;
+ case 'ping_pong':
+ pingPong($stub);
+ break;
+ case 'empty_stream':
+ emptyStream($stub);
+ break;
+ case 'cancel_after_begin':
+ cancelAfterBegin($stub);
+ break;
+ case 'cancel_after_first_response':
+ cancelAfterFirstResponse($stub);
+ break;
+ case 'timeout_on_sleeping_server':
+ timeoutOnSleepingServer($stub);
+ break;
+ case 'service_account_creds':
+ serviceAccountCreds($stub, $args);
+ break;
+ case 'compute_engine_creds':
+ computeEngineCreds($stub, $args);
+ break;
+ case 'jwt_token_creds':
+ jwtTokenCreds($stub, $args);
+ break;
+ case 'oauth2_auth_token':
+ oauth2AuthToken($stub, $args);
+ break;
+ case 'per_rpc_creds':
+ perRpcCreds($stub, $args);
+ break;
+ default:
+ echo "Unsupported test case $test_case\n";
+ exit(1);
+ }
+
+ return $stub;
}
-$stub = new grpc\testing\TestServiceClient($server_address, $opts);
-
-echo "Connecting to $server_address\n";
-echo "Running test case $test_case\n";
-
-switch ($test_case) {
- case 'empty_unary':
- emptyUnary($stub);
- break;
- case 'large_unary':
- largeUnary($stub);
- break;
- case 'client_streaming':
- clientStreaming($stub);
- break;
- case 'server_streaming':
- serverStreaming($stub);
- break;
- case 'ping_pong':
- pingPong($stub);
- break;
- case 'empty_stream':
- emptyStream($stub);
- break;
- case 'cancel_after_begin':
- cancelAfterBegin($stub);
- break;
- case 'cancel_after_first_response':
- cancelAfterFirstResponse($stub);
- break;
- case 'timeout_on_sleeping_server':
- timeoutOnSleepingServer($stub);
- break;
- case 'service_account_creds':
- serviceAccountCreds($stub, $args);
- break;
- case 'compute_engine_creds':
- computeEngineCreds($stub, $args);
- break;
- case 'jwt_token_creds':
- jwtTokenCreds($stub, $args);
- break;
- case 'oauth2_auth_token':
- oauth2AuthToken($stub, $args);
- break;
- case 'per_rpc_creds':
- perRpcCreds($stub, $args);
- break;
- default:
- echo "Unsupported test case $test_case\n";
- exit(1);
+if (isset($_SERVER['PHP_SELF']) && preg_match('/interop_client/', $_SERVER['PHP_SELF'])) {
+ $args = getopt('', ['server_host:', 'server_port:', 'test_case:',
+ 'use_tls::', 'use_test_ca::',
+ 'server_host_override:', 'oauth_scope:',
+ 'default_service_account:', ]);
+ interop_main($args);
}
diff --git a/src/php/tests/interop/metrics_client.php b/src/php/tests/interop/metrics_client.php
new file mode 100644
index 0000000000..46f4212f77
--- /dev/null
+++ b/src/php/tests/interop/metrics_client.php
@@ -0,0 +1,49 @@
+<?php
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+$args = getopt('', ['metrics_server_address:', 'total_only::']);
+$parts = explode(':', $args['metrics_server_address']);
+$server_host = $parts[0];
+$server_port = (count($parts) == 2) ? $parts[1] : '';
+
+$socket = socket_create(AF_INET, SOCK_STREAM, 0);
+if (@!socket_connect($socket, $server_host, $server_port)) {
+ echo "Cannot connect to merics server...\n";
+ exit(1);
+}
+socket_write($socket, 'qps');
+while ($out = socket_read($socket, 1024)) {
+ echo "$out\n";
+}
+socket_close($socket);
diff --git a/src/php/tests/interop/stress_client.php b/src/php/tests/interop/stress_client.php
new file mode 100644
index 0000000000..2339f0d105
--- /dev/null
+++ b/src/php/tests/interop/stress_client.php
@@ -0,0 +1,116 @@
+<?php
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+include_once('interop_client.php');
+
+function stress_main($args) {
+ mt_srand();
+ set_time_limit(0);
+
+ // open socket to listen as metrics server
+ $socket = socket_create(AF_INET, SOCK_STREAM, 0);
+ socket_set_option($socket, SOL_SOCKET, SO_REUSEADDR, 1);
+ if (@!socket_bind($socket, 'localhost', $args['metrics_port'])) {
+ echo "Cannot create socket for metrics server...\n";
+ exit(1);
+ }
+ socket_listen($socket);
+ socket_set_nonblock($socket);
+
+ $start_time = microtime(true);
+ $count = 0;
+ $deadline = $args['test_duration_secs'] ?
+ ($start_time + $args['test_duration_secs']) : false;
+ $num_test_cases = count($args['test_cases']);
+ $stub = false;
+
+ while (true) {
+ $current_time = microtime(true);
+ if ($deadline && $current_time > $deadline) {
+ break;
+ }
+ if ($client_connection = socket_accept($socket)) {
+ // there is an incoming request, respond with qps metrics
+ $input = socket_read($client_connection, 1024);
+ $qps = round($count / ($current_time - $start_time));
+ socket_write($client_connection, "qps: $qps");
+ socket_close($client_connection);
+ } else {
+ // do actual work, run one interop test case
+ $args['test_case'] =
+ $args['test_cases'][mt_rand(0, $num_test_cases - 1)];
+ $stub = @interop_main($args, $stub);
+ $count++;
+ }
+ }
+ socket_close($socket);
+ echo "Number of interop tests run in $args[test_duration_secs] seconds: $count.\n";
+}
+
+// process command line arguments
+$raw_args = getopt('',
+ ['server_addresses::',
+ 'test_cases:',
+ 'metrics_port::',
+ 'test_duration_secs::',
+ 'num_channels_per_server::',
+ 'num_stubs_per_channel::']);
+
+$args = [];
+
+if (empty($raw_args['server_addresses'])) {
+ $args['server_host'] = 'localhost';
+ $args['server_port'] = '8080';
+} else {
+ $parts = explode(':', $raw_args['server_addresses']);
+ $args['server_host'] = $parts[0];
+ $args['server_port'] = (count($parts) == 2) ? $parts[1] : '';
+}
+
+$args['metrics_port'] = empty($raw_args['metrics_port']) ?
+ '8081' : $args['metrics_port'];
+
+$args['test_duration_secs'] = empty($raw_args['test_duration_secs']) ||
+ $raw_args['test_duration_secs'] == -1 ?
+ false : $raw_args['test_duration_secs'];
+
+$test_cases = [];
+$test_case_strs = explode(',', $raw_args['test_cases']);
+foreach ($test_case_strs as $test_case_str) {
+ $parts = explode(':', $test_case_str);
+ $test_cases = array_merge($test_cases, array_fill(0, $parts[1], $parts[0]));
+}
+$args['test_cases'] = $test_cases;
+
+stress_main($args);
diff --git a/src/proto/grpc/lb/v0/load_balancer.options b/src/proto/grpc/lb/v0/load_balancer.options
deleted file mode 100644
index 6d4528f838..0000000000
--- a/src/proto/grpc/lb/v0/load_balancer.options
+++ /dev/null
@@ -1,6 +0,0 @@
-grpc.lb.v0.InitialLoadBalanceRequest.name max_size:128
-grpc.lb.v0.InitialLoadBalanceResponse.client_config max_size:64
-grpc.lb.v0.InitialLoadBalanceResponse.load_balancer_delegate max_size:64
-grpc.lb.v0.Server.ip_address max_size:46
-grpc.lb.v0.Server.load_balance_token max_size:64
-load_balancer.proto no_unions:true
diff --git a/src/proto/grpc/lb/v1/load_balancer.options b/src/proto/grpc/lb/v1/load_balancer.options
new file mode 100644
index 0000000000..d90366996e
--- /dev/null
+++ b/src/proto/grpc/lb/v1/load_balancer.options
@@ -0,0 +1,6 @@
+grpc.lb.v1.InitialLoadBalanceRequest.name max_size:128
+grpc.lb.v1.InitialLoadBalanceResponse.client_config max_size:64
+grpc.lb.v1.InitialLoadBalanceResponse.load_balancer_delegate max_size:64
+grpc.lb.v1.Server.ip_address max_size:46
+grpc.lb.v1.Server.load_balance_token max_size:64
+load_balancer.proto no_unions:true
diff --git a/src/proto/grpc/lb/v0/load_balancer.proto b/src/proto/grpc/lb/v1/load_balancer.proto
index e88a4f8c4a..1bcad0b1d4 100644
--- a/src/proto/grpc/lb/v0/load_balancer.proto
+++ b/src/proto/grpc/lb/v1/load_balancer.proto
@@ -29,7 +29,7 @@
syntax = "proto3";
-package grpc.lb.v0;
+package grpc.lb.v1;
message Duration {
@@ -94,9 +94,8 @@ message LoadBalanceResponse {
message InitialLoadBalanceResponse {
oneof initial_response_type {
- // Contains gRPC config options like RPC deadline or flow control.
- // TODO(yetianx): Change to ClientConfig after it is defined.
- string client_config = 1;
+ // TODO(zhangkun83): ClientConfig not yet defined
+ //ClientConfig client_config = 1;
// This is an application layer redirect that indicates the client should
// use the specified server for load balancing. When this field is set in
@@ -134,9 +133,7 @@ message Server {
// An opaque token that is passed from the client to the server in metadata.
// The server may expect this token to indicate that the request from the
// client was load balanced.
- // TODO(yetianx): Not used right now, and will be used after implementing
- // load report.
- bytes load_balance_token = 3;
+ string load_balance_token = 3;
// Indicates whether this particular request should be dropped by the client
// when this server is chosen from the list.
diff --git a/src/python/grpcio/README.rst b/src/python/grpcio/README.rst
index cb3f6b87fe..afc4fe6a37 100644
--- a/src/python/grpcio/README.rst
+++ b/src/python/grpcio/README.rst
@@ -48,6 +48,7 @@ package named :code:`python-dev`).
$ export REPO_ROOT=grpc # REPO_ROOT can be any directory of your choice
$ git clone https://github.com/grpc/grpc.git $REPO_ROOT
$ cd $REPO_ROOT
+ $ git submodule update --init
# For the next two commands do `sudo pip install` if you get permission-denied errors
$ pip install -rrequirements.txt
diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py
index 7086519106..b844a14c48 100644
--- a/src/python/grpcio/grpc/__init__.py
+++ b/src/python/grpcio/grpc/__init__.py
@@ -1,4 +1,4 @@
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -27,4 +27,5 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+__import__('pkg_resources').declare_namespace(__name__)
diff --git a/src/python/grpcio/grpc_core_dependencies.py b/src/python/grpcio/grpc_core_dependencies.py
index 5d39cef772..7ba4fc1193 100644
--- a/src/python/grpcio/grpc_core_dependencies.py
+++ b/src/python/grpcio/grpc_core_dependencies.py
@@ -233,7 +233,7 @@ CORE_SOURCE_FILES = [
'src/core/ext/transport/chttp2/server/insecure/server_chttp2.c',
'src/core/ext/transport/chttp2/client/insecure/channel_create.c',
'src/core/ext/lb_policy/grpclb/load_balancer_api.c',
- 'src/core/ext/lb_policy/grpclb/proto/grpc/lb/v0/load_balancer.pb.c',
+ 'src/core/ext/lb_policy/grpclb/proto/grpc/lb/v1/load_balancer.pb.c',
'third_party/nanopb/pb_common.c',
'third_party/nanopb/pb_decode.c',
'third_party/nanopb/pb_encode.c',
diff --git a/src/python/grpcio/grpc_version.py b/src/python/grpcio/grpc_version.py
index 873b4e2a91..0c13104d9d 100644
--- a/src/python/grpcio/grpc_version.py
+++ b/src/python/grpcio/grpc_version.py
@@ -29,4 +29,4 @@
# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc_version.py.template`!!!
-VERSION='0.14.0.dev0'
+VERSION='0.15.0.dev0'
diff --git a/src/python/grpcio/precompiled.py b/src/python/grpcio/precompiled.py
deleted file mode 100644
index b6aa7fc90e..0000000000
--- a/src/python/grpcio/precompiled.py
+++ /dev/null
@@ -1,114 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-import os
-import platform
-import shutil
-import sys
-import sysconfig
-
-import setuptools
-
-import commands
-import grpc_version
-
-try:
- from urllib2 import urlopen
-except ImportError:
- from urllib.request import urlopen
-
-PYTHON_STEM = os.path.dirname(os.path.abspath(__file__))
-BINARIES_REPOSITORY = os.environ.get(
- 'GRPC_PYTHON_BINARIES_REPOSITORY',
- 'https://storage.googleapis.com/grpc-precompiled-binaries/python')
-USE_PRECOMPILED_BINARIES = bool(int(os.environ.get(
- 'GRPC_PYTHON_USE_PRECOMPILED_BINARIES', '1')))
-
-def _tagged_ext_name(base):
- uname = platform.uname()
- tags = (
- grpc_version.VERSION,
- 'py{}'.format(sysconfig.get_python_version()),
- uname[0],
- uname[4],
- )
- ucs = 'ucs{}'.format(sysconfig.get_config_var('Py_UNICODE_SIZE'))
- return '{base}-{tags}-{ucs}'.format(
- base=base, tags='-'.join(tags), ucs=ucs)
-
-
-class BuildTaggedExt(setuptools.Command):
-
- description = 'build the gRPC tagged extensions'
- user_options = []
-
- def initialize_options(self):
- # distutils requires this override.
- pass
-
- def finalize_options(self):
- # distutils requires this override.
- pass
-
- def run(self):
- if 'linux' in sys.platform:
- self.run_command('build_ext')
- try:
- os.makedirs('dist/')
- except OSError:
- pass
- shutil.copyfile(
- os.path.join(PYTHON_STEM, 'grpc/_cython/cygrpc.so'),
- 'dist/{}.so'.format(_tagged_ext_name('cygrpc')))
- else:
- sys.stderr.write('nothing to do for build_tagged_ext\n')
-
-
-def update_setup_arguments(setup_arguments):
- if not USE_PRECOMPILED_BINARIES:
- sys.stderr.write('not using precompiled extension')
- return
- url = '{}/{}.so'.format(BINARIES_REPOSITORY, _tagged_ext_name('cygrpc'))
- target_path = os.path.join(PYTHON_STEM, 'grpc/_cython/cygrpc.so')
- try:
- extension = urlopen(url).read()
- except:
- sys.stderr.write(
- 'could not download precompiled extension: {}\n'.format(url))
- return
- try:
- with open(target_path, 'w') as target:
- target.write(extension)
- setup_arguments['ext_modules'] = []
- except:
- sys.stderr.write(
- 'could not write precompiled extension to directory: {} -> {}\n'
- .format(url, target_path))
- return
- setup_arguments['package_data']['grpc._cython'].append('cygrpc.so')
diff --git a/src/python/grpcio/tests/qps/benchmark_client.py b/src/python/grpcio/tests/qps/benchmark_client.py
index eed0b0c6da..b372ea01ad 100644
--- a/src/python/grpcio/tests/qps/benchmark_client.py
+++ b/src/python/grpcio/tests/qps/benchmark_client.py
@@ -39,6 +39,7 @@ except ImportError:
from concurrent import futures
from grpc.beta import implementations
+from grpc.framework.interfaces.face import face
from src.proto.grpc.testing import messages_pb2
from src.proto.grpc.testing import services_pb2
from tests.unit import resources
@@ -141,10 +142,10 @@ class UnaryAsyncBenchmarkClient(BenchmarkClient):
self._stub = None
-class StreamingAsyncBenchmarkClient(BenchmarkClient):
+class StreamingSyncBenchmarkClient(BenchmarkClient):
def __init__(self, server, config, hist):
- super(StreamingAsyncBenchmarkClient, self).__init__(server, config, hist)
+ super(StreamingSyncBenchmarkClient, self).__init__(server, config, hist)
self._is_streaming = False
self._pool = futures.ThreadPoolExecutor(max_workers=1)
# Use a thread-safe queue to put requests on the stream
@@ -167,12 +168,12 @@ class StreamingAsyncBenchmarkClient(BenchmarkClient):
def _request_stream(self):
self._is_streaming = True
if self._generic:
- response_stream = self._stub.inline_stream_stream(
- 'grpc.testing.BenchmarkService', 'StreamingCall',
- self._request_generator(), _TIMEOUT)
+ stream_callable = self._stub.stream_stream(
+ 'grpc.testing.BenchmarkService', 'StreamingCall')
else:
- response_stream = self._stub.StreamingCall(self._request_generator(),
- _TIMEOUT)
+ stream_callable = self._stub.StreamingCall
+
+ response_stream = stream_callable(self._request_generator(), _TIMEOUT)
for _ in response_stream:
end_time = time.time()
self._handle_response(end_time - self._send_time_queue.get_nowait())
@@ -184,3 +185,48 @@ class StreamingAsyncBenchmarkClient(BenchmarkClient):
yield request
except queue.Empty:
pass
+
+
+class AsyncReceiver(face.ResponseReceiver):
+ """Receiver for async stream responses."""
+
+ def __init__(self, send_time_queue, response_handler):
+ self._send_time_queue = send_time_queue
+ self._response_handler = response_handler
+
+ def initial_metadata(self, initial_mdetadata):
+ pass
+
+ def response(self, response):
+ end_time = time.time()
+ self._response_handler(end_time - self._send_time_queue.get_nowait())
+
+ def complete(self, terminal_metadata, code, details):
+ pass
+
+
+class StreamingAsyncBenchmarkClient(BenchmarkClient):
+
+ def __init__(self, server, config, hist):
+ super(StreamingAsyncBenchmarkClient, self).__init__(server, config, hist)
+ self._send_time_queue = queue.Queue()
+ self._receiver = AsyncReceiver(self._send_time_queue, self._handle_response)
+ self._rendezvous = None
+
+ def send_request(self):
+ if self._rendezvous is not None:
+ self._send_time_queue.put(time.time())
+ self._rendezvous.consume(self._request)
+
+ def start(self):
+ if self._generic:
+ stream_callable = self._stub.stream_stream(
+ 'grpc.testing.BenchmarkService', 'StreamingCall')
+ else:
+ stream_callable = self._stub.StreamingCall
+ self._rendezvous = stream_callable.event(
+ self._receiver, lambda *args: None, _TIMEOUT)
+
+ def stop(self):
+ self._rendezvous.terminate()
+ self._rendezvous = None
diff --git a/src/python/grpcio/tests/qps/client_runner.py b/src/python/grpcio/tests/qps/client_runner.py
index a36c30ccc0..1ede7d2af1 100644
--- a/src/python/grpcio/tests/qps/client_runner.py
+++ b/src/python/grpcio/tests/qps/client_runner.py
@@ -89,9 +89,9 @@ class ClosedLoopClientRunner(ClientRunner):
def start(self):
self._is_running = True
+ self._client.start()
for _ in xrange(self._request_count):
self._client.send_request()
- self._client.start()
def stop(self):
self._is_running = False
diff --git a/src/python/grpcio/tests/qps/worker_server.py b/src/python/grpcio/tests/qps/worker_server.py
index 0b3acc14e7..1f9af5482c 100644
--- a/src/python/grpcio/tests/qps/worker_server.py
+++ b/src/python/grpcio/tests/qps/worker_server.py
@@ -146,8 +146,9 @@ class WorkerServer(services_pb2.BetaWorkerServiceServicer):
if config.rpc_type == control_pb2.UNARY:
client = benchmark_client.UnarySyncBenchmarkClient(
server, config, qps_data)
- else:
- raise Exception('STREAMING SYNC client not supported')
+ elif config.rpc_type == control_pb2.STREAMING:
+ client = benchmark_client.StreamingSyncBenchmarkClient(
+ server, config, qps_data)
elif config.client_type == control_pb2.ASYNC_CLIENT:
if config.rpc_type == control_pb2.UNARY:
client = benchmark_client.UnaryAsyncBenchmarkClient(
diff --git a/src/python/grpcio/tests/stress/client.py b/src/python/grpcio/tests/stress/client.py
index a733741b73..e2e016760c 100644
--- a/src/python/grpcio/tests/stress/client.py
+++ b/src/python/grpcio/tests/stress/client.py
@@ -117,7 +117,10 @@ def run_test(args):
for runner in runners:
runner.start()
try:
- raise exception_queue.get(block=True, timeout=args.test_duration_secs)
+ timeout_secs = args.test_duration_secs
+ if timeout_secs < 0:
+ timeout_secs = None
+ raise exception_queue.get(block=True, timeout=timeout_secs)
except Queue.Empty:
# No exceptions thrown, success
pass
diff --git a/src/python/grpcio/tests/unit/_cython/_channel_test.py b/src/python/grpcio/tests/unit/_cython/_channel_test.py
index b414f8e6f6..931cd9083e 100644
--- a/src/python/grpcio/tests/unit/_cython/_channel_test.py
+++ b/src/python/grpcio/tests/unit/_cython/_channel_test.py
@@ -33,8 +33,7 @@ import unittest
from grpc._cython import cygrpc
-# TODO(nathaniel): This should be at least one hundred. Why not one thousand?
-_PARALLELISM = 4
+from tests.unit.framework.common import test_constants
def _channel_and_completion_queue():
@@ -61,7 +60,7 @@ def _create_loop_destroy():
def _in_parallel(behavior, arguments):
threads = tuple(
threading.Thread(target=behavior, args=arguments)
- for _ in range(_PARALLELISM))
+ for _ in range(test_constants.PARALLELISM))
for thread in threads:
thread.start()
for thread in threads:
diff --git a/src/ruby/ext/grpc/extconf.rb b/src/ruby/ext/grpc/extconf.rb
index 07f7bb93b8..6d65db8306 100644
--- a/src/ruby/ext/grpc/extconf.rb
+++ b/src/ruby/ext/grpc/extconf.rb
@@ -78,9 +78,11 @@ output_dir = File.expand_path(RbConfig::CONFIG['topdir'])
grpc_lib_dir = File.join(output_dir, 'libs', grpc_config)
ENV['BUILDDIR'] = output_dir
-puts 'Building internal gRPC into ' + grpc_lib_dir
-system("make -j -C #{grpc_root} #{grpc_lib_dir}/libgrpc.a CONFIG=#{grpc_config}")
-exit 1 unless $? == 0
+unless windows
+ puts 'Building internal gRPC into ' + grpc_lib_dir
+ system("make -j -C #{grpc_root} #{grpc_lib_dir}/libgrpc.a CONFIG=#{grpc_config}")
+ exit 1 unless $? == 0
+end
$CFLAGS << ' -I' + File.join(grpc_root, 'include')
$LDFLAGS << ' ' + File.join(grpc_lib_dir, 'libgrpc.a') unless windows
diff --git a/src/ruby/ext/grpc/rb_byte_buffer.c b/src/ruby/ext/grpc/rb_byte_buffer.c
index cba910d832..1172691116 100644
--- a/src/ruby/ext/grpc/rb_byte_buffer.c
+++ b/src/ruby/ext/grpc/rb_byte_buffer.c
@@ -32,11 +32,10 @@
*/
#include <ruby/ruby.h>
+
#include "rb_grpc_imports.generated.h"
#include "rb_byte_buffer.h"
-#include <ruby/ruby.h>
-
#include <grpc/grpc.h>
#include <grpc/byte_buffer_reader.h>
#include <grpc/support/slice.h>
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index 48c49a21e9..1b06273af9 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -32,11 +32,10 @@
*/
#include <ruby/ruby.h>
+
#include "rb_grpc_imports.generated.h"
#include "rb_call.h"
-#include <ruby/ruby.h>
-
#include <grpc/grpc.h>
#include <grpc/support/alloc.h>
diff --git a/src/ruby/ext/grpc/rb_call_credentials.c b/src/ruby/ext/grpc/rb_call_credentials.c
index 38bf1f7710..79ca5b32ce 100644
--- a/src/ruby/ext/grpc/rb_call_credentials.c
+++ b/src/ruby/ext/grpc/rb_call_credentials.c
@@ -32,10 +32,10 @@
*/
#include <ruby/ruby.h>
+
#include "rb_grpc_imports.generated.h"
#include "rb_call_credentials.h"
-#include <ruby/ruby.h>
#include <ruby/thread.h>
#include <grpc/grpc.h>
@@ -86,11 +86,11 @@ static VALUE grpc_rb_call_credentials_callback_rescue(VALUE args,
rb_funcall(exception_object, rb_intern("backtrace"), 0),
rb_intern("join"),
1, rb_str_new2("\n\tfrom "));
- VALUE exception_info = rb_funcall(exception_object, rb_intern("to_s"), 0);
+ VALUE rb_exception_info = rb_funcall(exception_object, rb_intern("to_s"), 0);
const char *exception_classname = rb_obj_classname(exception_object);
(void)args;
gpr_log(GPR_INFO, "Call credentials callback failed: %s: %s\n%s",
- exception_classname, StringValueCStr(exception_info),
+ exception_classname, StringValueCStr(rb_exception_info),
StringValueCStr(backtrace));
rb_hash_aset(result, rb_str_new2("metadata"), Qnil);
/* Currently only gives the exception class name. It should be possible get
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c
index 984afad107..013321ffc8 100644
--- a/src/ruby/ext/grpc/rb_channel.c
+++ b/src/ruby/ext/grpc/rb_channel.c
@@ -32,11 +32,10 @@
*/
#include <ruby/ruby.h>
+
#include "rb_grpc_imports.generated.h"
#include "rb_channel.h"
-#include <ruby/ruby.h>
-
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/support/alloc.h>
diff --git a/src/ruby/ext/grpc/rb_channel_args.c b/src/ruby/ext/grpc/rb_channel_args.c
index 2ffb8f41da..87c0e0a705 100644
--- a/src/ruby/ext/grpc/rb_channel_args.c
+++ b/src/ruby/ext/grpc/rb_channel_args.c
@@ -32,11 +32,10 @@
*/
#include <ruby/ruby.h>
+
#include "rb_grpc_imports.generated.h"
#include "rb_channel_args.h"
-#include <ruby/ruby.h>
-
#include <grpc/grpc.h>
#include "rb_grpc.h"
diff --git a/src/ruby/ext/grpc/rb_channel_credentials.c b/src/ruby/ext/grpc/rb_channel_credentials.c
index 09bd3093a9..cbb23885aa 100644
--- a/src/ruby/ext/grpc/rb_channel_credentials.c
+++ b/src/ruby/ext/grpc/rb_channel_credentials.c
@@ -31,14 +31,13 @@
*
*/
+#include <ruby/ruby.h>
+
#include <string.h>
-#include <ruby/ruby.h>
#include "rb_grpc_imports.generated.h"
#include "rb_channel_credentials.h"
-#include <ruby/ruby.h>
-
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/support/alloc.h>
diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c
index 2a2eee190c..4bb615f8be 100644
--- a/src/ruby/ext/grpc/rb_completion_queue.c
+++ b/src/ruby/ext/grpc/rb_completion_queue.c
@@ -32,10 +32,10 @@
*/
#include <ruby/ruby.h>
+
#include "rb_grpc_imports.generated.h"
#include "rb_completion_queue.h"
-#include <ruby/ruby.h>
#include <ruby/thread.h>
#include <grpc/grpc.h>
diff --git a/src/ruby/ext/grpc/rb_event_thread.c b/src/ruby/ext/grpc/rb_event_thread.c
index 2649a1087f..9e85bbcfbf 100644
--- a/src/ruby/ext/grpc/rb_event_thread.c
+++ b/src/ruby/ext/grpc/rb_event_thread.c
@@ -32,12 +32,12 @@
*/
#include <ruby/ruby.h>
+
#include "rb_grpc_imports.generated.h"
#include "rb_event_thread.h"
#include <stdbool.h>
-#include <ruby/ruby.h>
#include <ruby/thread.h>
#include <grpc/support/alloc.h>
#include <grpc/support/sync.h>
diff --git a/src/ruby/ext/grpc/rb_grpc.c b/src/ruby/ext/grpc/rb_grpc.c
index acb47b0055..5277148fc9 100644
--- a/src/ruby/ext/grpc/rb_grpc.c
+++ b/src/ruby/ext/grpc/rb_grpc.c
@@ -32,11 +32,11 @@
*/
#include <ruby/ruby.h>
+
#include "rb_grpc_imports.generated.h"
#include "rb_grpc.h"
#include <math.h>
-#include <ruby/ruby.h>
#include <ruby/vm.h>
#include <sys/time.h>
@@ -50,6 +50,7 @@
#include "rb_loader.h"
#include "rb_server.h"
#include "rb_server_credentials.h"
+#include "rb_signal.h"
static VALUE grpc_rb_cTimeVal = Qnil;
@@ -332,6 +333,7 @@ void Init_grpc_c() {
Init_grpc_channel_credentials();
Init_grpc_server();
Init_grpc_server_credentials();
+ Init_grpc_signals();
Init_grpc_status_codes();
Init_grpc_time_consts();
}
diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c
index 96e60c6776..2b3acaaf59 100644
--- a/src/ruby/ext/grpc/rb_server.c
+++ b/src/ruby/ext/grpc/rb_server.c
@@ -32,11 +32,10 @@
*/
#include <ruby/ruby.h>
+
#include "rb_grpc_imports.generated.h"
#include "rb_server.h"
-#include <ruby/ruby.h>
-
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include "rb_call.h"
diff --git a/src/ruby/ext/grpc/rb_server_credentials.c b/src/ruby/ext/grpc/rb_server_credentials.c
index b2d7280a30..3b0fb6c910 100644
--- a/src/ruby/ext/grpc/rb_server_credentials.c
+++ b/src/ruby/ext/grpc/rb_server_credentials.c
@@ -32,11 +32,10 @@
*/
#include <ruby/ruby.h>
+
#include "rb_grpc_imports.generated.h"
#include "rb_server_credentials.h"
-#include <ruby/ruby.h>
-
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
diff --git a/src/ruby/ext/grpc/rb_signal.c b/src/ruby/ext/grpc/rb_signal.c
new file mode 100644
index 0000000000..a9e512374b
--- /dev/null
+++ b/src/ruby/ext/grpc/rb_signal.c
@@ -0,0 +1,70 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <ruby/ruby.h>
+#include <signal.h>
+#include <stdbool.h>
+
+#include <grpc/support/log.h>
+
+#include "rb_grpc.h"
+
+static void (*old_sigint_handler)(int);
+static void (*old_sigterm_handler)(int);
+
+static volatile bool signal_received = false;
+
+/* This has to be handled at the C level instead of Ruby, because Ruby signal
+ * handlers are constrained to run in the main interpreter thread. If that main
+ * thread is blocked on grpc_completion_queue_pluck, the signal handlers will
+ * never run */
+static void handle_signal(int signum) {
+ signal_received = true;
+ if (signum == SIGINT) {
+ old_sigint_handler(signum);
+ } else if (signum == SIGTERM) {
+ old_sigterm_handler(signum);
+ }
+}
+
+static VALUE grpc_rb_signal_received(VALUE self) {
+ (void)self;
+ return signal_received ? Qtrue : Qfalse;
+}
+
+void Init_grpc_signals() {
+ old_sigint_handler = signal(SIGINT, handle_signal);
+ old_sigterm_handler = signal(SIGTERM, handle_signal);
+ rb_define_singleton_method(grpc_rb_mGrpcCore, "signal_received?",
+ grpc_rb_signal_received, 0);
+}
diff --git a/src/ruby/ext/grpc/rb_signal.h b/src/ruby/ext/grpc/rb_signal.h
new file mode 100644
index 0000000000..07e49c0a8b
--- /dev/null
+++ b/src/ruby/ext/grpc/rb_signal.h
@@ -0,0 +1,39 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_RB_SIGNAL_H_
+#define GRPC_RB_SIGNAL_H_
+
+void Init_grpc_signals();
+
+#endif /* GRPC_RB_SIGNAL_H_ */
diff --git a/src/ruby/lib/grpc.rb b/src/ruby/lib/grpc.rb
index 79fa705b1c..7c9aae30e9 100644
--- a/src/ruby/lib/grpc.rb
+++ b/src/ruby/lib/grpc.rb
@@ -33,6 +33,7 @@ require_relative 'grpc/errors'
require_relative 'grpc/grpc'
require_relative 'grpc/logconfig'
require_relative 'grpc/notifier'
+require_relative 'grpc/signals'
require_relative 'grpc/version'
require_relative 'grpc/core/time_consts'
require_relative 'grpc/generic/active_call'
@@ -47,3 +48,5 @@ begin
ensure
file.close
end
+
+GRPC::Signals.wait_for_signals
diff --git a/src/ruby/lib/grpc/generic/active_call.rb b/src/ruby/lib/grpc/generic/active_call.rb
index ecf3cc3293..fd20a86144 100644
--- a/src/ruby/lib/grpc/generic/active_call.rb
+++ b/src/ruby/lib/grpc/generic/active_call.rb
@@ -28,7 +28,9 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
require 'forwardable'
+require 'weakref'
require_relative 'bidi_call'
+require_relative '../signals'
class Struct
# BatchResult is the struct returned by calls to call#start_batch.
@@ -121,6 +123,10 @@ module GRPC
@unmarshal = unmarshal
@metadata_tag = metadata_tag
@op_notifier = nil
+ weak_self = WeakRef.new(self)
+ remove_handler = GRPC::Signals.register_handler(&weak_self
+ .method(:cancel))
+ ObjectSpace.define_finalizer(self, remove_handler)
end
# output_metadata are provides access to hash that can be used to
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index a0f4071adc..238aaa9656 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -28,46 +28,13 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
require_relative '../grpc'
+require_relative '../signals'
require_relative 'active_call'
require_relative 'service'
require 'thread'
-# A global that contains signals the gRPC servers should respond to.
-$grpc_signals = []
-
# GRPC contains the General RPC module.
module GRPC
- # Handles the signals in $grpc_signals.
- #
- # @return false if the server should exit, true if not.
- def handle_signals
- loop do
- sig = $grpc_signals.shift
- case sig
- when 'INT'
- return false
- when 'TERM'
- return false
- when nil
- return true
- end
- end
- true
- end
- module_function :handle_signals
-
- # Sets up a signal handler that adds signals to the signal handling global.
- #
- # Signal handlers should do as little as humanly possible.
- # Here, they just add themselves to $grpc_signals
- #
- # RpcServer (and later other parts of gRPC) monitors the signals
- # $grpc_signals in its own non-signal context.
- def trap_signals
- %w(INT TERM).each { |sig| trap(sig) { $grpc_signals << sig } }
- end
- module_function :trap_signals
-
# Pool is a simple thread pool.
class Pool
# Default keep alive period is 1s
@@ -328,23 +295,6 @@ module GRPC
end
end
- # Runs the server in its own thread, then waits for signal INT or TERM on
- # the current thread to terminate it.
- def run_till_terminated
- GRPC.trap_signals
- t = Thread.new do
- run
- end
- t.abort_on_exception = true
- wait_till_running
- until running_state == :stopped
- sleep SIGNAL_CHECK_PERIOD
- break unless GRPC.handle_signals
- end
- stop
- t.join
- end
-
# handle registration of classes
#
# service is either a class that includes GRPC::GenericService and whose
@@ -403,9 +353,14 @@ module GRPC
transition_running_state(:running)
@run_cond.broadcast
end
+ remove_signal_handler = GRPC::Signals.register_handler { stop }
loop_handle_server_calls
+ # Remove signal handler when server stops
+ remove_signal_handler.call
end
+ alias_method :run_till_terminated, :run
+
# Sends RESOURCE_EXHAUSTED if there are too many unprocessed jobs
def available?(an_rpc)
jobs_count, max = @pool.jobs_waiting, @max_waiting_requests
@@ -503,10 +458,8 @@ module GRPC
unless cls.include?(GenericService)
fail "#{cls} must 'include GenericService'"
end
- if cls.rpc_descs.size.zero?
- fail "#{cls} should specify some rpc descriptions"
- end
- cls.assert_rpc_descs_have_methods
+ fail "#{cls} should specify some rpc descriptions" if
+ cls.rpc_descs.size.zero?
end
# This should be called while holding @run_mutex
diff --git a/src/ruby/lib/grpc/generic/service.rb b/src/ruby/lib/grpc/generic/service.rb
index 8e940b5b13..0a166e823e 100644
--- a/src/ruby/lib/grpc/generic/service.rb
+++ b/src/ruby/lib/grpc/generic/service.rb
@@ -110,6 +110,9 @@ module GRPC
rpc_descs[name] = RpcDesc.new(name, input, output,
marshal_class_method,
unmarshal_class_method)
+ define_method(name) do
+ fail GRPC::BadStatus, GRPC::Core::StatusCodes::UNIMPLEMENTED
+ end
end
def inherited(subclass)
@@ -199,19 +202,6 @@ module GRPC
end
end
end
-
- # Asserts that the appropriate methods are defined for each added rpc
- # spec. Is intended to aid verifying that server classes are correctly
- # implemented.
- def assert_rpc_descs_have_methods
- rpc_descs.each_pair do |m, spec|
- mth_name = GenericService.underscore(m.to_s).to_sym
- unless instance_methods.include?(mth_name)
- fail "#{self} does not provide instance method '#{mth_name}'"
- end
- spec.assert_arity_matches(instance_method(mth_name))
- end
- end
end
def self.included(o)
diff --git a/src/ruby/lib/grpc/signals.rb b/src/ruby/lib/grpc/signals.rb
new file mode 100644
index 0000000000..2ab85c8bb1
--- /dev/null
+++ b/src/ruby/lib/grpc/signals.rb
@@ -0,0 +1,69 @@
+# Copyright 2016, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+require 'thread'
+require_relative 'grpc'
+
+# GRPC contains the General RPC module.
+module GRPC
+ # Signals contains gRPC functions related to signal handling
+ module Signals
+ @interpreter_exiting = false
+ @signal_handlers = []
+ @handlers_mutex = Mutex.new
+
+ def register_handler(&handler)
+ @handlers_mutex.synchronize do
+ @signal_handlers.push(handler)
+ handler.call if @exit_signal_received
+ end
+ # Returns a function to remove the handler
+ lambda do
+ @handlers_mutex.synchronize { @signal_handlers.delete(handler) }
+ end
+ end
+ module_function :register_handler
+
+ def wait_for_signals
+ t = Thread.new do
+ sleep 0.1 until GRPC::Core.signal_received? || @interpreter_exiting
+ unless @interpreter_exiting
+ @handlers_mutex.synchronize do
+ @signal_handlers.each(&:call)
+ end
+ end
+ end
+ at_exit do
+ @interpreter_exiting = true
+ t.join
+ end
+ end
+ module_function :wait_for_signals
+ end
+end
diff --git a/src/ruby/lib/grpc/version.rb b/src/ruby/lib/grpc/version.rb
index 67c6a5d5a1..01c8c5ac8f 100644
--- a/src/ruby/lib/grpc/version.rb
+++ b/src/ruby/lib/grpc/version.rb
@@ -29,5 +29,5 @@
# GRPC contains the General RPC module.
module GRPC
- VERSION = '0.14.0.dev'
+ VERSION = '0.15.0.dev'
end
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb
index e688057cb1..2a42736237 100644
--- a/src/ruby/spec/generic/rpc_server_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_spec.rb
@@ -308,10 +308,6 @@ describe GRPC::RpcServer do
expect { @srv.handle(EmptyService) }.to raise_error
end
- it 'raises if the service does not define its rpc methods' do
- expect { @srv.handle(NoRpcImplementation) }.to raise_error
- end
-
it 'raises if a handler method is already registered' do
@srv.handle(EchoService)
expect { r.handle(EchoService) }.to raise_error
@@ -349,6 +345,25 @@ describe GRPC::RpcServer do
t.join
end
+ it 'should return UNIMPLEMENTED on unimplemented methods', server: true do
+ @srv.handle(NoRpcImplementation)
+ t = Thread.new { @srv.run }
+ @srv.wait_till_running
+ req = EchoMsg.new
+ blk = proc do
+ cq = GRPC::Core::CompletionQueue.new
+ stub = GRPC::ClientStub.new(@host, cq, :this_channel_is_insecure,
+ **client_opts)
+ stub.request_response('/an_rpc', req, marshal, unmarshal)
+ end
+ expect(&blk).to raise_error do |error|
+ expect(error).to be_a(GRPC::BadStatus)
+ expect(error.code).to be(GRPC::Core::StatusCodes::UNIMPLEMENTED)
+ end
+ @srv.stop
+ t.join
+ end
+
it 'should handle multiple sequential requests', server: true do
@srv.handle(EchoService)
t = Thread.new { @srv.run }
diff --git a/src/ruby/spec/generic/service_spec.rb b/src/ruby/spec/generic/service_spec.rb
index 5e7b6c7aba..76034e4f74 100644
--- a/src/ruby/spec/generic/service_spec.rb
+++ b/src/ruby/spec/generic/service_spec.rb
@@ -273,73 +273,4 @@ describe GenericService do
end
end
end
-
- describe '#assert_rpc_descs_have_methods' do
- it 'fails if there is no instance method for an rpc descriptor' do
- c1 = Class.new do
- include GenericService
- rpc :AnRpc, GoodMsg, GoodMsg
- end
- expect { c1.assert_rpc_descs_have_methods }.to raise_error
-
- c2 = Class.new do
- include GenericService
- rpc :AnRpc, GoodMsg, GoodMsg
- rpc :AnotherRpc, GoodMsg, GoodMsg
-
- def an_rpc
- end
- end
- expect { c2.assert_rpc_descs_have_methods }.to raise_error
- end
-
- it 'passes if there are corresponding methods for each descriptor' do
- c = Class.new do
- include GenericService
- rpc :AnRpc, GoodMsg, GoodMsg
- rpc :AServerStreamer, GoodMsg, stream(GoodMsg)
- rpc :AClientStreamer, stream(GoodMsg), GoodMsg
- rpc :ABidiStreamer, stream(GoodMsg), stream(GoodMsg)
-
- def an_rpc(_req, _call)
- end
-
- def a_server_streamer(_req, _call)
- end
-
- def a_client_streamer(_call)
- end
-
- def a_bidi_streamer(_call)
- end
- end
- expect { c.assert_rpc_descs_have_methods }.to_not raise_error
- end
-
- it 'passes for subclasses of that include GenericService' do
- base = Class.new do
- include GenericService
- rpc :AnRpc, GoodMsg, GoodMsg
-
- def an_rpc(_req, _call)
- end
- end
- c = Class.new(base)
- expect { c.assert_rpc_descs_have_methods }.to_not raise_error
- expect(c.include?(GenericService)).to be(true)
- end
-
- it 'passes if subclasses define the rpc methods' do
- base = Class.new do
- include GenericService
- rpc :AnRpc, GoodMsg, GoodMsg
- end
- c = Class.new(base) do
- def an_rpc(_req, _call)
- end
- end
- expect { c.assert_rpc_descs_have_methods }.to_not raise_error
- expect(c.include?(GenericService)).to be(true)
- end
- end
end
diff --git a/src/ruby/tools/bin/protoc.rb b/src/ruby/tools/bin/grpc_tools_ruby_protoc.rb
index 3a2a5b8dc9..3a2a5b8dc9 100755
--- a/src/ruby/tools/bin/protoc.rb
+++ b/src/ruby/tools/bin/grpc_tools_ruby_protoc.rb
diff --git a/src/ruby/tools/bin/protoc_grpc_ruby_plugin.rb b/src/ruby/tools/bin/grpc_tools_ruby_protoc_plugin.rb
index 4b296dedc7..4b296dedc7 100755
--- a/src/ruby/tools/bin/protoc_grpc_ruby_plugin.rb
+++ b/src/ruby/tools/bin/grpc_tools_ruby_protoc_plugin.rb
diff --git a/src/ruby/tools/grpc-tools.gemspec b/src/ruby/tools/grpc-tools.gemspec
index af904de4a9..9fa4b66392 100644
--- a/src/ruby/tools/grpc-tools.gemspec
+++ b/src/ruby/tools/grpc-tools.gemspec
@@ -18,5 +18,5 @@ Gem::Specification.new do |s|
s.platform = Gem::Platform::RUBY
- s.executables = %w( protoc.rb protoc_grpc_ruby_plugin.rb )
+ s.executables = %w( grpc_tools_ruby_protoc.rb grpc_tools_ruby_protoc_plugin.rb )
end
diff --git a/src/ruby/tools/version.rb b/src/ruby/tools/version.rb
index 12ad21b80e..dca7fd7e72 100644
--- a/src/ruby/tools/version.rb
+++ b/src/ruby/tools/version.rb
@@ -29,6 +29,6 @@
module GRPC
module Tools
- VERSION = '0.14.0.dev'
+ VERSION = '0.15.0.dev'
end
end